Skip to content

Commit ab6a871

Browse files
committed
chore: optimized nango trigger frequencies
Signed-off-by: Uroš Marolt <[email protected]>
1 parent ba87406 commit ab6a871

2 files changed

Lines changed: 33 additions & 3 deletions

File tree

services/apps/cron_service/src/jobs/nangoTrigger.job.ts

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,25 @@ import { PlatformType } from '@crowd/types'
1717

1818
import { IJobDefinition } from '../types'
1919

20+
// How old an integration must be before we reduce its check frequency
21+
const AGE_THRESHOLD_MS = IS_DEV_ENV
22+
? 20 * 60 * 1000 // 20 minutes for local testing
23+
: 30 * 24 * 60 * 60 * 1000 // 1 month
24+
25+
// How often the cron runs (used to determine if old integrations should be triggered this run)
26+
const OLD_INTEGRATION_INTERVAL_HOURS = IS_DEV_ENV ? 0 : 6
27+
const OLD_INTEGRATION_INTERVAL_MINUTES = IS_DEV_ENV ? 15 : 0
28+
29+
function shouldTriggerOldIntegrations(now: Date): boolean {
30+
if (IS_DEV_ENV) {
31+
return now.getMinutes() % OLD_INTEGRATION_INTERVAL_MINUTES === 0
32+
}
33+
return now.getHours() % OLD_INTEGRATION_INTERVAL_HOURS === 0
34+
}
35+
2036
const job: IJobDefinition = {
2137
name: 'nango-trigger',
22-
cronTime: IS_DEV_ENV ? CronTime.everyMinute() : CronTime.everyHour(),
38+
cronTime: IS_DEV_ENV ? CronTime.every(5).minutes() : CronTime.everyHour(),
2339
timeout: 4 * 60 * 60, // 4 hours
2440
process: async (ctx) => {
2541
ctx.log.info('Triggering nango API check as if a webhook was received!')
@@ -28,10 +44,23 @@ const job: IJobDefinition = {
2844

2945
const dbConnection = await getDbConnection(READ_DB_CONFIG(), 3, 0)
3046

31-
const integrationsToTrigger = await fetchNangoIntegrationDataForCheck(pgpQx(dbConnection), [
47+
const allIntegrations = await fetchNangoIntegrationDataForCheck(pgpQx(dbConnection), [
3248
...new Set(ALL_NANGO_INTEGRATIONS.map(nangoIntegrationToPlatform)),
3349
])
3450

51+
const now = new Date()
52+
const triggerOld = shouldTriggerOldIntegrations(now)
53+
54+
const integrationsToTrigger = allIntegrations.filter((int) => {
55+
const ageMs = now.getTime() - new Date(int.createdAt).getTime()
56+
const isOld = ageMs >= AGE_THRESHOLD_MS
57+
return !isOld || triggerOld
58+
})
59+
60+
ctx.log.info(
61+
`Total integrations: ${allIntegrations.length}, triggering: ${integrationsToTrigger.length} (old integrations ${triggerOld ? 'included' : 'skipped'})`,
62+
)
63+
3564
const limiter = new ConcurrencyLimiter(5)
3665

3766
// Collect all workflow start operations

services/libs/data-access-layer/src/integrations/index.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -281,6 +281,7 @@ export interface INangoIntegrationData {
281281
segmentId: string
282282
platform: string
283283
settings: any
284+
createdAt: string
284285
}
285286

286287
export async function fetchIntegrationById(
@@ -323,7 +324,7 @@ export async function fetchNangoIntegrationDataForCheck(
323324
): Promise<INangoIntegrationData[]> {
324325
return qx.select(
325326
`
326-
select id, platform, settings
327+
select id, platform, settings, "createdAt"
327328
from integrations
328329
where platform in ($(platforms:csv)) and "deletedAt" is null
329330
order by (settings->'cursors' IS NULL) desc, "updatedAt" asc

0 commit comments

Comments
 (0)