Skip to content

Commit c66fc38

Browse files
committed
chore: refactor cursors into a separate table
Signed-off-by: Uroš Marolt <uros@marolt.me>
1 parent ab6a871 commit c66fc38

11 files changed

Lines changed: 279 additions & 185 deletions

File tree

backend/src/database/migrations/U1770818540__createNangoCursorsTable.sql

Whitespace-only changes.
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
-- Backup settings before any modifications
2+
CREATE TABLE integration.integrations_settings_backup_02_13_2026 AS
3+
SELECT id, settings FROM integrations;
4+
5+
CREATE TABLE integration.nango_cursors (
6+
"integrationId" UUID NOT NULL,
7+
"connectionId" TEXT NOT NULL,
8+
platform TEXT NOT NULL,
9+
model TEXT NOT NULL,
10+
cursor TEXT NOT NULL,
11+
"lastCheckedAt" TIMESTAMPTZ,
12+
"createdAt" TIMESTAMPTZ NOT NULL DEFAULT now(),
13+
"updatedAt" TIMESTAMPTZ NOT NULL DEFAULT now(),
14+
PRIMARY KEY ("integrationId", "connectionId", model),
15+
FOREIGN KEY ("integrationId") REFERENCES integrations(id) ON DELETE CASCADE
16+
);
17+
18+
CREATE INDEX ix_nango_cursors_lastCheckedAt ON integration.nango_cursors ("lastCheckedAt" NULLS FIRST);
19+
CREATE INDEX ix_nango_cursors_connectionId ON integration.nango_cursors ("connectionId");
20+
21+
-- GitHub-nango: unnest nangoMapping keys x cursor models
22+
INSERT INTO integration.nango_cursors ("integrationId", "connectionId", platform, model, cursor)
23+
SELECT
24+
i.id,
25+
nm.key,
26+
'github',
27+
cm.key,
28+
cm.value #>> '{}'
29+
FROM integrations i,
30+
jsonb_each(i.settings->'nangoMapping') nm,
31+
jsonb_each(COALESCE(i.settings->'cursors'->nm.key, '{}'::jsonb)) cm
32+
WHERE i.platform = 'github-nango'
33+
AND i."deletedAt" IS NULL
34+
AND i.settings->'nangoMapping' IS NOT NULL
35+
ON CONFLICT DO NOTHING;
36+
37+
-- Non-GitHub nango: connectionId = integrationId, unnest cursor models
38+
INSERT INTO integration.nango_cursors ("integrationId", "connectionId", platform, model, cursor)
39+
SELECT
40+
i.id,
41+
i.id::text,
42+
CASE
43+
WHEN i.platform = 'gerrit' THEN 'gerrit'
44+
WHEN i.platform = 'jira' THEN COALESCE(i.settings->>'nangoIntegrationName', 'jira-basic')
45+
WHEN i.platform = 'confluence' THEN COALESCE(i.settings->>'nangoIntegrationName', 'confluence')
46+
ELSE i.platform
47+
END,
48+
cm.key,
49+
cm.value #>> '{}'
50+
FROM integrations i,
51+
jsonb_each(COALESCE(i.settings->'cursors'->i.id::text, '{}'::jsonb)) cm
52+
WHERE i.platform IN ('gerrit', 'jira', 'confluence')
53+
AND i."deletedAt" IS NULL
54+
ON CONFLICT DO NOTHING;
55+
56+
-- Clean up settings.cursors
57+
UPDATE integrations
58+
SET settings = settings - 'cursors'
59+
WHERE settings->'cursors' IS NOT NULL;

backend/src/services/integrationService.ts

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -869,11 +869,6 @@ export default class IntegrationService {
869869
platform: PlatformType.GITHUB_NANGO,
870870
settings: {
871871
...settings,
872-
...(integration.settings.cursors
873-
? {
874-
cursors: integration.settings.cursors,
875-
}
876-
: {}),
877872
...(integration.settings.nangoMapping
878873
? {
879874
nangoMapping: integration.settings.nangoMapping,

services/apps/cron_service/src/jobs/nangoMonitoring.job.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import {
1010
import { READ_DB_CONFIG, getDbConnection } from '@crowd/data-access-layer/src/database'
1111
import {
1212
INangoIntegrationData,
13+
fetchNangoCursorRowsForIntegration,
1314
fetchNangoIntegrationData,
1415
} from '@crowd/data-access-layer/src/integrations'
1516
import { pgpQx } from '@crowd/data-access-layer/src/queryExecutor'
@@ -98,9 +99,12 @@ const job: IJobDefinition = {
9899

99100
// then collect nango connection status checks for each connection
100101
if (int.settings.nangoMapping) {
102+
const cursorRows = await fetchNangoCursorRowsForIntegration(pgpQx(dbConnection), int.id)
103+
const connectionIdsWithCursors = new Set(cursorRows.map((r) => r.connectionId))
104+
101105
for (const connectionId of Object.keys(int.settings.nangoMapping)) {
102106
// check if we have cursors already for this connection
103-
if (!int.settings.cursors || !int.settings.cursors[connectionId]) {
107+
if (!connectionIdsWithCursors.has(connectionId)) {
104108
if (ghNoCursorsYet.has(int.id)) {
105109
ghNoCursorsYet.set(int.id, ghNoCursorsYet.get(int.id) + 1)
106110
} else {

services/apps/cron_service/src/jobs/nangoTrigger.job.ts

Lines changed: 74 additions & 112 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,10 @@ import CronTime from 'cron-time-generator'
22

33
import { ConcurrencyLimiter, IS_DEV_ENV } from '@crowd/common'
44
import { READ_DB_CONFIG, getDbConnection } from '@crowd/data-access-layer/src/database'
5-
import { fetchNangoIntegrationDataForCheck } from '@crowd/data-access-layer/src/integrations'
5+
import {
6+
fetchNangoIntegrationDataForCheck,
7+
fetchNangoLastCheckedAt,
8+
} from '@crowd/data-access-layer/src/integrations'
69
import { pgpQx } from '@crowd/data-access-layer/src/queryExecutor'
710
import {
811
ALL_NANGO_INTEGRATIONS,
@@ -12,7 +15,12 @@ import {
1215
nangoIntegrationToPlatform,
1316
platformToNangoIntegration,
1417
} from '@crowd/nango'
15-
import { TEMPORAL_CONFIG, WorkflowIdReusePolicy, getTemporalClient } from '@crowd/temporal'
18+
import {
19+
TEMPORAL_CONFIG,
20+
WorkflowIdConflictPolicy,
21+
WorkflowIdReusePolicy,
22+
getTemporalClient,
23+
} from '@crowd/temporal'
1624
import { PlatformType } from '@crowd/types'
1725

1826
import { IJobDefinition } from '../types'
@@ -22,16 +30,15 @@ const AGE_THRESHOLD_MS = IS_DEV_ENV
2230
? 20 * 60 * 1000 // 20 minutes for local testing
2331
: 30 * 24 * 60 * 60 * 1000 // 1 month
2432

25-
// How often the cron runs (used to determine if old integrations should be triggered this run)
26-
const OLD_INTEGRATION_INTERVAL_HOURS = IS_DEV_ENV ? 0 : 6
27-
const OLD_INTEGRATION_INTERVAL_MINUTES = IS_DEV_ENV ? 15 : 0
33+
// Minimum interval between checks for new integrations
34+
const NEW_INTERVAL_MS = IS_DEV_ENV
35+
? 5 * 60 * 1000 // 5 minutes
36+
: 60 * 60 * 1000 // 1 hour
2837

29-
function shouldTriggerOldIntegrations(now: Date): boolean {
30-
if (IS_DEV_ENV) {
31-
return now.getMinutes() % OLD_INTEGRATION_INTERVAL_MINUTES === 0
32-
}
33-
return now.getHours() % OLD_INTEGRATION_INTERVAL_HOURS === 0
34-
}
38+
// Minimum interval between checks for old integrations
39+
const OLD_INTERVAL_MS = IS_DEV_ENV
40+
? 15 * 60 * 1000 // 15 minutes
41+
: 6 * 60 * 60 * 1000 // 6 hours
3542

3643
const job: IJobDefinition = {
3744
name: 'nango-trigger',
@@ -43,100 +50,63 @@ const job: IJobDefinition = {
4350
const temporal = await getTemporalClient(TEMPORAL_CONFIG())
4451

4552
const dbConnection = await getDbConnection(READ_DB_CONFIG(), 3, 0)
53+
const qx = pgpQx(dbConnection)
4654

47-
const allIntegrations = await fetchNangoIntegrationDataForCheck(pgpQx(dbConnection), [
48-
...new Set(ALL_NANGO_INTEGRATIONS.map(nangoIntegrationToPlatform)),
49-
])
55+
const platforms = [...new Set(ALL_NANGO_INTEGRATIONS.map(nangoIntegrationToPlatform))]
5056

51-
const now = new Date()
52-
const triggerOld = shouldTriggerOldIntegrations(now)
57+
const allIntegrations = await fetchNangoIntegrationDataForCheck(qx, platforms)
5358

54-
const integrationsToTrigger = allIntegrations.filter((int) => {
55-
const ageMs = now.getTime() - new Date(int.createdAt).getTime()
56-
const isOld = ageMs >= AGE_THRESHOLD_MS
57-
return !isOld || triggerOld
58-
})
59-
60-
ctx.log.info(
61-
`Total integrations: ${allIntegrations.length}, triggering: ${integrationsToTrigger.length} (old integrations ${triggerOld ? 'included' : 'skipped'})`,
62-
)
59+
// Batch-fetch lastCheckedAt for all connections
60+
const lastCheckedAtRows = await fetchNangoLastCheckedAt(qx, platforms)
61+
const lastCheckedAtMap = new Map<string, string | null>()
62+
for (const row of lastCheckedAtRows) {
63+
lastCheckedAtMap.set(`${row.integrationId}/${row.connectionId}`, row.lastCheckedAt)
64+
}
6365

66+
const now = new Date()
6467
const limiter = new ConcurrencyLimiter(5)
65-
66-
// Collect all workflow start operations
6768
const workflowStarts: Array<() => Promise<void>> = []
69+
let skippedConnections = 0
6870

69-
for (let i = 0; i < integrationsToTrigger.length; i++) {
70-
const int = integrationsToTrigger[i]
71-
71+
for (let i = 0; i < allIntegrations.length; i++) {
72+
const int = allIntegrations[i]
7273
const { id, settings } = int
7374

74-
ctx.log.info(
75-
`${i + 1}/${integrationsToTrigger.length} Triggering nango integration check for ${id} (${int.platform})`,
76-
)
77-
7875
const platform = platformToNangoIntegration(int.platform as PlatformType, settings)
7976

8077
if (platform === NangoIntegration.GITHUB && !settings.nangoMapping) {
8178
// ignore non-nango github integrations
8279
continue
8380
}
8481

85-
for (const model of Object.values(NANGO_INTEGRATION_CONFIG[platform].models)) {
86-
ctx.log.debug(
87-
{
88-
integrationId: id,
89-
platform,
90-
model,
91-
},
92-
'Triggering nango integration check!',
93-
)
82+
const integrationAgeMs = now.getTime() - new Date(int.createdAt).getTime()
83+
const isOld = integrationAgeMs >= AGE_THRESHOLD_MS
84+
const requiredInterval = isOld ? OLD_INTERVAL_MS : NEW_INTERVAL_MS
9485

95-
if (platform === NangoIntegration.GITHUB) {
96-
// trigger for each connection id - could be multiple because 1 integration can have multiple repositories and each repository has a connection id on nango
97-
for (const connectionId of Object.keys(settings.nangoMapping)) {
98-
const payload: INangoWebhookPayload = {
99-
connectionId: connectionId,
100-
providerConfigKey: platform,
101-
syncName: 'not important',
102-
model,
103-
responseResults: { added: 1, updated: 1, deleted: 1 },
104-
syncType: 'INCREMENTAL',
105-
modifiedAfter: new Date().toISOString(),
106-
}
107-
108-
workflowStarts.push(async () => {
109-
try {
110-
await temporal.workflow.start('processNangoWebhook', {
111-
taskQueue: 'nango',
112-
workflowId: `nango-webhook/${platform}/${id}/${connectionId}/${model}/cron-triggered`,
113-
workflowIdReusePolicy:
114-
WorkflowIdReusePolicy.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE,
115-
retry: {
116-
maximumAttempts: 10,
117-
},
118-
args: [payload],
119-
})
120-
} catch (error) {
121-
if (error.name === 'WorkflowExecutionAlreadyStartedError') {
122-
ctx.log.debug(
123-
{
124-
integrationId: id,
125-
platform,
126-
model,
127-
connectionId,
128-
},
129-
'Workflow already running, skipping...',
130-
)
131-
return
132-
}
133-
throw error
134-
}
135-
})
86+
// Determine connectionIds for this integration
87+
const connectionIds: string[] =
88+
platform === NangoIntegration.GITHUB ? Object.keys(settings.nangoMapping) : [id]
89+
90+
for (const connectionId of connectionIds) {
91+
const key = `${id}/${connectionId}`
92+
const lastCheckedAt = lastCheckedAtMap.get(key)
93+
94+
// Skip if checked recently enough
95+
if (lastCheckedAt) {
96+
const elapsed = now.getTime() - new Date(lastCheckedAt).getTime()
97+
if (elapsed < requiredInterval) {
98+
skippedConnections++
99+
continue
136100
}
137-
} else {
101+
}
102+
103+
ctx.log.info(
104+
`${i + 1}/${allIntegrations.length} Triggering nango integration check for ${id} / ${connectionId} (${platform})`,
105+
)
106+
107+
for (const model of Object.values(NANGO_INTEGRATION_CONFIG[platform].models)) {
138108
const payload: INangoWebhookPayload = {
139-
connectionId: id,
109+
connectionId,
140110
providerConfigKey: platform,
141111
syncName: 'not important',
142112
model,
@@ -145,38 +115,30 @@ const job: IJobDefinition = {
145115
modifiedAfter: new Date().toISOString(),
146116
}
147117

118+
const workflowId =
119+
platform === NangoIntegration.GITHUB
120+
? `nango-webhook/${platform}/${id}/${connectionId}/${model}/cron-triggered`
121+
: `nango-webhook/${platform}/${id}/${model}/cron-triggered`
122+
148123
workflowStarts.push(async () => {
149-
try {
150-
await temporal.workflow.start('processNangoWebhook', {
151-
taskQueue: 'nango',
152-
workflowId: `nango-webhook/${platform}/${id}/${model}/cron-triggered`,
153-
workflowIdReusePolicy:
154-
WorkflowIdReusePolicy.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE,
155-
retry: {
156-
maximumAttempts: 10,
157-
},
158-
args: [payload],
159-
})
160-
} catch (error) {
161-
if (error.name === 'WorkflowExecutionAlreadyStartedError') {
162-
ctx.log.debug(
163-
{
164-
integrationId: id,
165-
platform,
166-
model,
167-
},
168-
'Workflow already running, skipping...',
169-
)
170-
return
171-
}
172-
throw error
173-
}
124+
await temporal.workflow.start('processNangoWebhook', {
125+
taskQueue: 'nango',
126+
workflowId,
127+
workflowIdReusePolicy: WorkflowIdReusePolicy.ALLOW_DUPLICATE,
128+
workflowIdConflictPolicy: WorkflowIdConflictPolicy.USE_EXISTING,
129+
retry: {
130+
maximumAttempts: 10,
131+
},
132+
args: [payload],
133+
})
174134
})
175135
}
176136
}
177137
}
178138

179-
ctx.log.info(`Triggering nango integration checks with ${workflowStarts.length} workflows!`)
139+
ctx.log.info(
140+
`Triggering ${workflowStarts.length} workflows (skipped ${skippedConnections} connections due to recent checks)`,
141+
)
180142

181143
// Track completed workflows
182144
let completedWorkflows = 0

0 commit comments

Comments
 (0)