Skip to content

Commit e679565

Browse files
committed
chore: refactor trigger workflows into parent - child workflows for better grouping in temporal ui
Signed-off-by: Uroš Marolt <[email protected]>
1 parent d49716c commit e679565

4 files changed

Lines changed: 83 additions & 28 deletions

File tree

services/apps/cron_service/src/jobs/nangoTrigger.job.ts

Lines changed: 31 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ import {
99
import { pgpQx } from '@crowd/data-access-layer/src/queryExecutor'
1010
import {
1111
ALL_NANGO_INTEGRATIONS,
12-
INangoWebhookPayload,
1312
NANGO_INTEGRATION_CONFIG,
1413
NangoIntegration,
1514
nangoIntegrationToPlatform,
@@ -40,6 +39,12 @@ const OLD_INTERVAL_MS = IS_DEV_ENV
4039
? 15 * 60 * 1000 // 15 minutes
4140
: 6 * 60 * 60 * 1000 // 6 hours
4241

42+
interface INangoConnectionToCheck {
43+
connectionId: string
44+
models: string[]
45+
workflowIdPrefix: string
46+
}
47+
4348
const job: IJobDefinition = {
4449
name: 'nango-trigger',
4550
cronTime: IS_DEV_ENV ? CronTime.every(5).minutes() : CronTime.everyHour(),
@@ -87,6 +92,9 @@ const job: IJobDefinition = {
8792
const connectionIds: string[] =
8893
platform === NangoIntegration.GITHUB ? Object.keys(settings.nangoMapping) : [id]
8994

95+
const models = Object.values(NANGO_INTEGRATION_CONFIG[platform].models) as string[]
96+
const connections: INangoConnectionToCheck[] = []
97+
9098
for (const connectionId of connectionIds) {
9199
const key = `${id}/${connectionId}`
92100
const lastCheckedAt = lastCheckedAtMap.get(key)
@@ -104,35 +112,30 @@ const job: IJobDefinition = {
104112
`${i + 1}/${allIntegrations.length} Triggering nango integration check for ${id} / ${connectionId} (${platform})`,
105113
)
106114

107-
for (const model of Object.values(NANGO_INTEGRATION_CONFIG[platform].models)) {
108-
const payload: INangoWebhookPayload = {
109-
connectionId,
110-
providerConfigKey: platform,
111-
syncName: 'not important',
112-
model,
113-
responseResults: { added: 1, updated: 1, deleted: 1 },
114-
syncType: 'INCREMENTAL',
115-
modifiedAfter: new Date().toISOString(),
116-
}
115+
let workflowIdPrefix = ''
116+
if (platform === NangoIntegration.GITHUB) {
117+
const mapping = settings.nangoMapping[connectionId]
118+
workflowIdPrefix = `${mapping.owner}/${mapping.repoName}/${connectionId}`
119+
}
120+
121+
connections.push({ connectionId, models, workflowIdPrefix })
122+
}
117123

118-
const workflowId =
119-
platform === NangoIntegration.GITHUB
120-
? `nango-webhook/${platform}/${id}/${connectionId}/${model}/cron-triggered`
121-
: `nango-webhook/${platform}/${id}/${model}/cron-triggered`
122-
123-
workflowStarts.push(async () => {
124-
await temporal.workflow.start('processNangoWebhook', {
125-
taskQueue: 'nango',
126-
workflowId,
127-
workflowIdReusePolicy: WorkflowIdReusePolicy.ALLOW_DUPLICATE,
128-
workflowIdConflictPolicy: WorkflowIdConflictPolicy.USE_EXISTING,
129-
retry: {
130-
maximumAttempts: 10,
131-
},
132-
args: [payload],
133-
})
124+
if (connections.length > 0) {
125+
const workflowId = `nango-trigger/${platform}/${id}/cron-triggered`
126+
127+
workflowStarts.push(async () => {
128+
await temporal.workflow.start('triggerNangoIntegrationCheck', {
129+
taskQueue: 'nango',
130+
workflowId,
131+
workflowIdReusePolicy: WorkflowIdReusePolicy.ALLOW_DUPLICATE,
132+
workflowIdConflictPolicy: WorkflowIdConflictPolicy.USE_EXISTING,
133+
retry: {
134+
maximumAttempts: 10,
135+
},
136+
args: [{ integrationId: id, providerConfigKey: platform, connections }],
134137
})
135-
}
138+
})
136139
}
137140
}
138141

services/apps/nango_worker/src/types.ts

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,3 +52,15 @@ export interface ISyncGithubRepoArgs {
5252
export interface ISyncGithubRepoResult {
5353
skipped: boolean
5454
}
55+
56+
export interface INangoConnectionToCheck {
57+
connectionId: string
58+
models: string[]
59+
workflowIdPrefix: string
60+
}
61+
62+
export interface ITriggerNangoIntegrationCheckArguments {
63+
integrationId: string
64+
providerConfigKey: string
65+
connections: INangoConnectionToCheck[]
66+
}

services/apps/nango_worker/src/workflows.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,13 @@ import { deleteGithubRepoConnection } from './workflows/deleteGithubRepoConnecti
33
import { processNangoWebhook } from './workflows/processNangoWebhook'
44
import { syncGithubIntegration } from './workflows/syncGithubIntegration'
55
import { syncGithubRepo } from './workflows/syncGithubRepo'
6+
import { triggerNangoIntegrationCheck } from './workflows/triggerNangoIntegrationCheck'
67

78
export {
89
deleteDuplicateGithubConnection,
910
deleteGithubRepoConnection,
1011
processNangoWebhook,
1112
syncGithubIntegration,
1213
syncGithubRepo,
14+
triggerNangoIntegrationCheck,
1315
}
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
import { ParentClosePolicy, WorkflowIdReusePolicy, startChild } from '@temporalio/workflow'
2+
3+
import { ITriggerNangoIntegrationCheckArguments } from '../types'
4+
5+
import { processNangoWebhook } from './processNangoWebhook'
6+
7+
export async function triggerNangoIntegrationCheck(
8+
args: ITriggerNangoIntegrationCheckArguments,
9+
): Promise<void> {
10+
for (const connection of args.connections) {
11+
for (const model of connection.models) {
12+
const workflowId = connection.workflowIdPrefix
13+
? `nango-webhook/${args.providerConfigKey}/${args.integrationId}/${connection.workflowIdPrefix}/${model}/cron-triggered`
14+
: `nango-webhook/${args.providerConfigKey}/${args.integrationId}/${model}/cron-triggered`
15+
16+
try {
17+
await startChild(processNangoWebhook, {
18+
workflowId,
19+
parentClosePolicy: ParentClosePolicy.PARENT_CLOSE_POLICY_ABANDON,
20+
workflowIdReusePolicy: WorkflowIdReusePolicy.ALLOW_DUPLICATE,
21+
args: [
22+
{
23+
connectionId: connection.connectionId,
24+
providerConfigKey: args.providerConfigKey,
25+
model,
26+
syncType: 'INCREMENTAL' as const,
27+
},
28+
],
29+
})
30+
} catch (err) {
31+
if (err instanceof Error && err.name === 'WorkflowExecutionAlreadyStartedError') {
32+
continue
33+
}
34+
throw err
35+
}
36+
}
37+
}
38+
}

0 commit comments

Comments
 (0)