Skip to content

Commit 37ee3e9

Browse files
authored
fix: prevent deadlocks in member affiliation updates (#3305)
1 parent 60c3b76 commit 37ee3e9

4 files changed

Lines changed: 192 additions & 222 deletions

File tree

services/apps/entity_merging_worker/src/activities/members.ts

Lines changed: 40 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -27,21 +27,47 @@ export async function deleteMember(memberId: string): Promise<void> {
2727
export async function recalculateActivityAffiliationsOfMemberAsync(
2828
memberId: string,
2929
): Promise<void> {
30-
await svc.temporal.workflow.start('memberUpdate', {
31-
taskQueue: 'profiles',
32-
workflowId: `${TemporalWorkflowId.MEMBER_UPDATE}/${memberId}`,
33-
workflowIdReusePolicy: WorkflowIdReusePolicy.WORKFLOW_ID_REUSE_POLICY_TERMINATE_IF_RUNNING,
34-
retry: {
35-
maximumAttempts: 10,
36-
},
37-
args: [
38-
{
39-
member: {
40-
id: memberId,
41-
},
30+
const workflowId = `${TemporalWorkflowId.MEMBER_UPDATE}/${memberId}`
31+
32+
try {
33+
const handle = svc.temporal.workflow.getHandle(workflowId)
34+
const { status } = await handle.describe()
35+
36+
if (status.name === 'RUNNING') {
37+
await handle.result()
38+
}
39+
} catch (err) {
40+
if (err.name !== 'WorkflowNotFoundError') {
41+
svc.log.error({ err }, 'Failed to check workflow state')
42+
throw err
43+
}
44+
}
45+
46+
try {
47+
await svc.temporal.workflow.start('memberUpdate', {
48+
taskQueue: 'profiles',
49+
workflowId,
50+
// if the workflow is already running, this policy will throw an error
51+
workflowIdReusePolicy: WorkflowIdReusePolicy.REJECT_DUPLICATE,
52+
retry: {
53+
maximumAttempts: 10,
4254
},
43-
],
44-
})
55+
args: [
56+
{
57+
member: {
58+
id: memberId,
59+
},
60+
},
61+
],
62+
})
63+
} catch (err) {
64+
if (err.name === 'WorkflowExecutionAlreadyStartedError') {
65+
svc.log.info({ workflowId }, 'Workflow already started, skipping')
66+
return
67+
}
68+
69+
throw err
70+
}
4571
}
4672

4773
export async function syncMember(memberId: string): Promise<void> {

services/apps/profiles_worker/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
{
22
"name": "@crowd/profiles-worker",
33
"scripts": {
4-
"start": "CROWD_TEMPORAL_TASKQUEUE=profiles SERVICE=profiles-worker tsx src/main.ts",
4+
"start": "CROWD_TEMPORAL_TASKQUEUE=profiles SERVICE=profiles-worker LOG_LEVEL=debug tsx src/main.ts",
55
"start:debug:local": "set -a && . ../../../backend/.env.dist.local && . ../../../backend/.env.override.local && set +a && CROWD_TEMPORAL_TASKQUEUE=profiles SERVICE=profiles-worker LOG_LEVEL=trace tsx --inspect=0.0.0.0:9232 src/main.ts",
66
"start:debug": "CROWD_TEMPORAL_TASKQUEUE=profiles SERVICE=profiles-worker LOG_LEVEL=trace tsx --inspect=0.0.0.0:9232 src/main.ts",
77
"dev:local": "nodemon --watch src --watch ../../libs --ext ts --exec pnpm run start:debug:local",

0 commit comments

Comments
 (0)