Skip to content

Commit f5d2480

Browse files
committed
chore: script to block project organization affiliations
1 parent 801d643 commit f5d2480

10 files changed

Lines changed: 197 additions & 103 deletions

File tree

services/apps/script_executor_worker/src/activities.ts

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
import {
22
blockMemberOrganizationAffiliation,
3-
getOrganizationMembers,
4-
} from './activities/block-organization-affiliation'
3+
fetchProjectMemberOrganizationsToBlock,
4+
getMembersForAffiliationRecalc,
5+
markMemberForAffiliationRecalc,
6+
} from './activities/block-project-organization-affiliations'
57
import {
68
findDuplicateMembersAfterDate,
79
moveMemberActivityRelations,
@@ -78,6 +80,8 @@ export {
7880
unlinkOrganizationFromBotActivities,
7981
syncMember,
8082
blockMemberOrganizationAffiliation,
81-
getOrganizationMembers,
83+
fetchProjectMemberOrganizationsToBlock,
84+
markMemberForAffiliationRecalc,
85+
getMembersForAffiliationRecalc,
8286
calculateMemberAffiliations,
8387
}

services/apps/script_executor_worker/src/activities/block-organization-affiliation.ts

Lines changed: 0 additions & 40 deletions
This file was deleted.
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
import { pgpQx } from '@crowd/data-access-layer'
2+
import { changeMemberOrganizationAffiliationOverrides } from '@crowd/data-access-layer/src/member_organization_affiliation_overrides'
3+
import OrganizationRepository from '@crowd/data-access-layer/src/old/apps/script_executor_worker/organization.repo'
4+
import { IChangeAffiliationOverrideData, IMemberOrganization } from '@crowd/types'
5+
6+
import { svc } from '../main'
7+
8+
export async function fetchProjectMemberOrganizationsToBlock(
9+
limit: number,
10+
afterId?: string,
11+
): Promise<Pick<IMemberOrganization, 'memberId' | 'id'>[]> {
12+
try {
13+
const orgRepo = new OrganizationRepository(svc.postgres.reader.connection(), svc.log)
14+
return orgRepo.fetchProjectMemberOrganizationsToBlock(limit, afterId)
15+
} catch (error) {
16+
svc.log.error(error, 'Error fetching project member organizations to block!')
17+
throw error
18+
}
19+
}
20+
21+
export async function blockMemberOrganizationAffiliation(
22+
data: IChangeAffiliationOverrideData[],
23+
): Promise<void> {
24+
try {
25+
const qx = pgpQx(svc.postgres.writer.connection())
26+
return changeMemberOrganizationAffiliationOverrides(qx, data)
27+
} catch (error) {
28+
svc.log.error(error, 'Error blocking organization affiliation!')
29+
throw error
30+
}
31+
}
32+
33+
export async function markMemberForAffiliationRecalc(memberIds: string[]): Promise<void> {
34+
try {
35+
await svc.redis.sAdd('queue:recalculate:members:affiliation', memberIds)
36+
} catch (error) {
37+
svc.log.error(error, 'Error marking member for affiliation recalc!')
38+
throw error
39+
}
40+
}
41+
42+
export async function getMembersForAffiliationRecalc(batchSize: number): Promise<string[]> {
43+
try {
44+
return svc.redis.sPop('queue:recalculate:members:affiliation', batchSize)
45+
} catch (error) {
46+
svc.log.error(error, 'Error getting members for affiliation recalc!')
47+
throw error
48+
}
49+
}

services/apps/script_executor_worker/src/activities/process-llm-verified-merges/index.ts renamed to services/apps/script_executor_worker/src/activities/process-llm-verified-merges.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import MergeActionRepository from '@crowd/data-access-layer/src/old/apps/script_executor_worker/mergeAction.repo'
22
import { EntityType } from '@crowd/data-access-layer/src/old/apps/script_executor_worker/types'
33

4-
import { svc } from '../../main'
4+
import { svc } from '../main'
55

66
export async function getUnprocessedLLMApprovedSuggestions(
77
batchSize: number,

services/apps/script_executor_worker/src/types.ts

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,5 @@ export interface IDedupActivityRelationsArgs extends IScriptBatchTestArgs {
5050
}
5151

5252
export interface IBlockOrganizationAffiliationArgs {
53-
organizationId: string
54-
offset?: number
53+
afterId?: string
5554
}

services/apps/script_executor_worker/src/workflows.ts

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { blockOrganizationAffiliation } from './workflows/block-organization-affiliation'
1+
import { blockProjectOrganizationAffiliations } from './workflows/block-project-organization-affiliations'
22
import { cleanupDuplicateMembers } from './workflows/cleanup/duplicate-members'
33
import { cleanupMembers } from './workflows/cleanup/members'
44
import { cleanupOrganizations } from './workflows/cleanup/organizations'
@@ -8,6 +8,7 @@ import { findAndMergeMembersWithSameVerifiedEmailsInDifferentPlatforms } from '.
88
import { fixBotMembersAffiliation } from './workflows/fix-bot-members-affiliation'
99
import { fixOrgIdentitiesWithWrongUrls } from './workflows/fixOrgIdentitiesWithWrongUrls'
1010
import { processLLMVerifiedMerges } from './workflows/processLLMVerifiedMerges'
11+
import { recalculateMemberAffiliations } from './workflows/recalculate-member-affiliations'
1112

1213
export {
1314
findAndMergeMembersWithSameVerifiedEmailsInDifferentPlatforms,
@@ -19,5 +20,6 @@ export {
1920
processLLMVerifiedMerges,
2021
cleanupDuplicateMembers,
2122
fixBotMembersAffiliation,
22-
blockOrganizationAffiliation,
23+
blockProjectOrganizationAffiliations,
24+
recalculateMemberAffiliations,
2325
}

services/apps/script_executor_worker/src/workflows/block-organization-affiliation.ts

Lines changed: 0 additions & 46 deletions
This file was deleted.
Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
import {
2+
ChildWorkflowCancellationType,
3+
ParentClosePolicy,
4+
continueAsNew,
5+
proxyActivities,
6+
startChild,
7+
} from '@temporalio/workflow'
8+
9+
import * as activities from '../activities'
10+
import { IBlockOrganizationAffiliationArgs } from '../types'
11+
import { chunkArray } from '../utils/common'
12+
13+
import { recalculateMemberAffiliations } from './recalculate-member-affiliations'
14+
15+
const {
16+
fetchProjectMemberOrganizationsToBlock,
17+
blockMemberOrganizationAffiliation,
18+
markMemberForAffiliationRecalc,
19+
} = proxyActivities<typeof activities>({
20+
startToCloseTimeout: '30 minutes',
21+
})
22+
23+
export async function blockProjectOrganizationAffiliations(
24+
args: IBlockOrganizationAffiliationArgs,
25+
): Promise<void> {
26+
const MEMBERS_PER_RUN = 1000
27+
const afterId = args.afterId ?? undefined
28+
29+
const memberOrganizations = await fetchProjectMemberOrganizationsToBlock(MEMBERS_PER_RUN, afterId)
30+
31+
if (memberOrganizations?.length === 0) {
32+
console.log('No more organization members to block!')
33+
34+
await startChild(recalculateMemberAffiliations, {
35+
workflowId: `recalculateMemberAffiliations/${Date.now()}`,
36+
cancellationType: ChildWorkflowCancellationType.ABANDON,
37+
parentClosePolicy: ParentClosePolicy.PARENT_CLOSE_POLICY_ABANDON,
38+
retry: {
39+
backoffCoefficient: 2,
40+
initialInterval: 2 * 1000,
41+
maximumInterval: 30 * 1000,
42+
},
43+
args: [
44+
{
45+
batchSize: 500,
46+
},
47+
],
48+
})
49+
50+
return
51+
}
52+
53+
const CONCURRENCY = 10
54+
const BATCH_SIZE = 50
55+
56+
// Step 1: Block affiliations in batches
57+
const batches = chunkArray(memberOrganizations, BATCH_SIZE).map((chunk) =>
58+
chunk.map((mo) => ({
59+
memberId: mo.memberId,
60+
memberOrganizationId: mo.id,
61+
allowAffiliation: false,
62+
})),
63+
)
64+
65+
for (let i = 0; i < batches.length; i += CONCURRENCY) {
66+
const slice = batches.slice(i, i + CONCURRENCY)
67+
await Promise.all(slice.map((batch) => blockMemberOrganizationAffiliation(batch)))
68+
}
69+
70+
// Step 2: Deduplicate memberIds and mark for affiliation recalculation
71+
const uniqueMemberIds = Array.from(new Set(memberOrganizations.map((mo) => mo.memberId)))
72+
73+
await markMemberForAffiliationRecalc(uniqueMemberIds)
74+
75+
const lastProcessedId = memberOrganizations[memberOrganizations.length - 1]?.id
76+
77+
// Step 3: Continue pagination
78+
await continueAsNew<typeof blockProjectOrganizationAffiliations>({
79+
...args,
80+
afterId: lastProcessedId,
81+
})
82+
}
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
import { continueAsNew, proxyActivities } from '@temporalio/workflow'
2+
3+
import * as activities from '../activities'
4+
import { IScriptBatchTestArgs } from '../types'
5+
import { chunkArray } from '../utils/common'
6+
7+
const { getMembersForAffiliationRecalc, calculateMemberAffiliations } = proxyActivities<
8+
typeof activities
9+
>({
10+
startToCloseTimeout: '30 minutes',
11+
})
12+
13+
export async function recalculateMemberAffiliations(args: IScriptBatchTestArgs): Promise<void> {
14+
const MEMBERS_PER_RUN = args.batchSize ?? 200
15+
16+
const memberIds = await getMembersForAffiliationRecalc(MEMBERS_PER_RUN)
17+
18+
if (memberIds?.length === 0) {
19+
console.log('No more members to recalculate affiliations!')
20+
return
21+
}
22+
23+
for (const chunk of chunkArray(memberIds, 10)) {
24+
await Promise.all(chunk.map((memberId) => calculateMemberAffiliations(memberId)))
25+
}
26+
27+
await continueAsNew<typeof recalculateMemberAffiliations>(args)
28+
}

services/libs/data-access-layer/src/old/apps/script_executor_worker/organization.repo.ts

Lines changed: 25 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -165,18 +165,34 @@ class OrganizationRepository {
165165
)
166166
}
167167

168-
async findOrganizationMembers(
169-
organizationId: string,
168+
async fetchProjectMemberOrganizationsToBlock(
170169
limit: number,
171-
offset: number,
172-
): Promise<IMemberOrganization[]> {
170+
afterId?: string,
171+
): Promise<Pick<IMemberOrganization, 'memberId' | 'id'>[]> {
173172
return this.connection.any(
174173
`
175-
SELECT * FROM "memberOrganizations"
176-
WHERE "organizationId" = $(organizationId) AND "deletedAt" IS NULL
177-
ORDER BY "memberId", "id"
178-
LIMIT $(limit) OFFSET $(offset)`,
179-
{ organizationId, limit, offset },
174+
select mo.id, mo."memberId"
175+
from "memberOrganizations" mo
176+
join organizations o on mo."organizationId" = o.id
177+
where o."deletedAt" is null
178+
and mo."deletedAt" is null
179+
and exists (
180+
select 1
181+
from segments s
182+
where trim(lower(o."displayName")) = trim(lower(s.name))
183+
)
184+
and not exists (
185+
select 1
186+
from "memberOrganizationAffiliationOverrides" mao
187+
where mao."memberId" = mo."memberId"
188+
and mao."memberOrganizationId" = mo.id
189+
and mao."allowAffiliation" = false
190+
)
191+
${afterId !== undefined ? 'and mo.id > $(afterId)' : ''}
192+
order by mo.id
193+
limit $(limit);
194+
`,
195+
{ limit, afterId },
180196
)
181197
}
182198
}

0 commit comments

Comments
 (0)