Skip to content

Commit 4dd3457

Browse files
committed
chore: replace cronjob with temporal schedule
1 parent 64d1639 commit 4dd3457

5 files changed

Lines changed: 79 additions & 2 deletions

File tree

services/apps/snowflake_connectors/package.json

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,8 @@
99
"lint": "npx eslint --ext .ts src --max-warnings=0",
1010
"format": "npx prettier --write \"src/**/*.ts\"",
1111
"format-check": "npx prettier --check .",
12-
"tsc-check": "tsc --noEmit"
12+
"tsc-check": "tsc --noEmit",
13+
"trigger-export": "SERVICE=snowflake-connectors tsx src/scripts/triggerExport.ts"
1314
},
1415
"dependencies": {
1516
"@crowd/archetype-standard": "workspace:*",
@@ -20,11 +21,13 @@
2021
"@crowd/archetype-worker": "workspace:*",
2122
"@crowd/queue": "workspace:*",
2223
"@crowd/redis": "workspace:*",
24+
"@crowd/slack": "workspace:*",
2325
"@crowd/snowflake": "workspace:*",
2426
"@crowd/temporal": "workspace:*",
2527
"@crowd/types": "workspace:*",
2628
"@aws-sdk/client-s3": "^3.700.0",
2729
"@dsnp/parquetjs": "^1.7.0",
30+
"@temporalio/client": "~1.11.8",
2831
"@temporalio/workflow": "~1.11.8",
2932
"tsx": "^4.7.1",
3033
"typescript": "^5.6.3"

services/apps/snowflake_connectors/src/index.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,15 @@ import { getServiceChildLogger } from '@crowd/logging'
55

66
import { createTransformerConsumer } from './consumer/transformerConsumer'
77
import { svc } from './main'
8+
import { scheduleSnowflakeS3Export } from './schedules/snowflakeS3Export'
89

910
const log = getServiceChildLogger('main')
1011

1112
setImmediate(async () => {
1213
await svc.init()
1314

15+
await scheduleSnowflakeS3Export()
16+
1417
const consumer = await createTransformerConsumer()
1518
consumer.start()
1619

services/apps/snowflake_connectors/src/main.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ const config: Config = {
2020
enabled: false,
2121
},
2222
temporal: {
23-
enabled: false,
23+
enabled: true,
2424
},
2525
redis: {
2626
enabled: true,
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
import { ScheduleAlreadyRunning, ScheduleOverlapPolicy } from '@temporalio/client'
2+
3+
import { SlackChannel, SlackPersona, sendSlackNotification } from '@crowd/slack'
4+
5+
import { svc } from '../main'
6+
import { snowflakeS3ExportScheduler } from '../workflows'
7+
8+
export const scheduleSnowflakeS3Export = async () => {
9+
try {
10+
await svc.temporal.schedule.create({
11+
scheduleId: 'snowflake-s3-export',
12+
spec: {
13+
cronExpressions: ['0 0 * * *'],
14+
},
15+
policies: {
16+
overlap: ScheduleOverlapPolicy.SKIP,
17+
catchupWindow: '1 minute',
18+
},
19+
action: {
20+
type: 'startWorkflow',
21+
workflowType: snowflakeS3ExportScheduler,
22+
taskQueue: 'snowflakeConnectors',
23+
retry: {
24+
initialInterval: '15 seconds',
25+
backoffCoefficient: 2,
26+
maximumAttempts: 3,
27+
},
28+
args: [],
29+
},
30+
})
31+
} catch (err) {
32+
if (err instanceof ScheduleAlreadyRunning) {
33+
svc.log.info('Schedule already registered in Temporal.')
34+
svc.log.info('Configuration may have changed since. Please make sure they are in sync.')
35+
} else {
36+
svc.log.error({ err }, 'Failed to create snowflake-s3-export schedule')
37+
sendSlackNotification(
38+
SlackChannel.INTEGRATION_NOTIFICATIONS,
39+
SlackPersona.ERROR_REPORTER,
40+
'Snowflake S3 Export Schedule Failed',
41+
`Failed to create the \`snowflake-s3-export\` Temporal schedule.\n\n*Error:* ${err.message || err}`,
42+
)
43+
}
44+
}
45+
}
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
import { TEMPORAL_CONFIG, getTemporalClient } from '../config/settings'
2+
3+
async function main() {
4+
const client = await getTemporalClient(TEMPORAL_CONFIG())
5+
6+
const workflowId = `snowflake-export/manual/${new Date().toISOString().slice(0, 19)}`
7+
8+
await client.workflow.start('snowflakeS3ExportScheduler', {
9+
taskQueue: 'snowflakeConnectors',
10+
workflowId,
11+
retry: {
12+
initialInterval: '15 seconds',
13+
backoffCoefficient: 2,
14+
maximumAttempts: 3,
15+
},
16+
args: [],
17+
})
18+
19+
console.log(`Snowflake S3 export workflow started: ${workflowId}`)
20+
process.exit(0)
21+
}
22+
23+
main().catch((err) => {
24+
console.error('Failed to trigger workflow:', err)
25+
process.exit(1)
26+
})

0 commit comments

Comments
 (0)