
Commit 486e1fb

feat: cleanup s3 files after processing snowflake activities [CM-1003] (#3880)
1 parent 67f97ce commit 486e1fb

14 files changed

Lines changed: 177 additions & 13 deletions


services/apps/snowflake_connectors/package.json

Lines changed: 2 additions & 1 deletion
@@ -10,7 +10,8 @@
     "format": "npx prettier --write \"src/**/*.ts\"",
     "format-check": "npx prettier --check .",
     "tsc-check": "tsc --noEmit",
-    "trigger-export": "SERVICE=snowflake-connectors-worker tsx src/scripts/triggerExport.ts"
+    "trigger-export": "SERVICE=snowflake-connectors-worker tsx src/scripts/triggerExport.ts",
+    "trigger-cleanup": "SERVICE=snowflake-connectors-worker tsx src/scripts/triggerCleanup.ts"
   },
   "dependencies": {
     "@crowd/archetype-standard": "workspace:*",
Lines changed: 33 additions & 0 deletions
@@ -0,0 +1,33 @@
+import { WRITE_DB_CONFIG, getDbConnection } from '@crowd/database'
+import { getServiceChildLogger } from '@crowd/logging'
+import { SlackChannel, SlackPersona, sendSlackNotification } from '@crowd/slack'
+
+import { MetadataStore } from '../core/metadataStore'
+import { S3Service } from '../core/s3Service'
+
+const log = getServiceChildLogger('cleanupActivity')
+
+export async function executeCleanup(intervalHours = 24): Promise<void> {
+  const db = await getDbConnection(WRITE_DB_CONFIG())
+  const metadataStore = new MetadataStore(db)
+  const s3Service = new S3Service()
+
+  const jobs = await metadataStore.getCleanableJobS3Paths(intervalHours)
+  log.info({ jobCount: jobs.length, intervalHours }, 'Found cleanable jobs')
+
+  for (const job of jobs) {
+    try {
+      await s3Service.deleteFile(job.s3Path)
+      await metadataStore.markCleaned(job.id)
+      log.info({ jobId: job.id, s3Path: job.s3Path }, 'Cleaned job')
+    } catch (err) {
+      log.error({ jobId: job.id, s3Path: job.s3Path, err }, 'Failed to clean job, skipping')
+      sendSlackNotification(
+        SlackChannel.INTEGRATION_NOTIFICATIONS,
+        SlackPersona.ERROR_REPORTER,
+        'Snowflake S3 Cleanup Failed',
+        `Failed to clean job \`${job.id}\` at \`${job.s3Path}\`.\n\n*Error:* ${err instanceof Error ? err.message : err}`,
+      )
+    }
+  }
+}
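
Note: each job is handled in its own try/catch, so a failed delete is logged, reported to Slack, and skipped rather than aborting the rest of the run. The retention window defaults to 24 hours, but callers can pass a different value, for example:

// Only clean up jobs that completed at least 72 hours ago
await executeCleanup(72)
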
Lines changed: 1 addition & 0 deletions
@@ -1 +1,2 @@
 export { getEnabledPlatforms, executeExport } from './exportActivity'
+export { executeCleanup } from './cleanupActivity'

services/apps/snowflake_connectors/src/consumer/transformerConsumer.ts

Lines changed: 5 additions & 5 deletions
@@ -13,7 +13,7 @@ import { PlatformType } from '@crowd/types'
 
 import { IntegrationResolver } from '../core/integrationResolver'
 import { MetadataStore, SnowflakeExportJob } from '../core/metadataStore'
-import { S3Consumer } from '../core/s3Consumer'
+import { S3Service } from '../core/s3Service'
 import { getEnabledPlatforms, getPlatform } from '../integrations'
 
 const log = getServiceChildLogger('transformerConsumer')
@@ -26,7 +26,7 @@ export class TransformerConsumer {
 
   constructor(
     private readonly metadataStore: MetadataStore,
-    private readonly s3Consumer: S3Consumer,
+    private readonly s3Service: S3Service,
     private readonly integrationResolver: IntegrationResolver,
     private readonly emitter: DataSinkWorkerEmitter,
     private readonly pollingIntervalMs: number,
@@ -81,7 +81,7 @@ export class TransformerConsumer {
       try {
         const platformDef = getPlatform(job.platform as PlatformType)
 
-        const rows = await this.s3Consumer.readParquetRows(job.s3Path)
+        const rows = await this.s3Service.readParquetRows(job.s3Path)
 
         let transformedCount = 0
         let transformSkippedCount = 0
@@ -146,7 +146,7 @@ export class TransformerConsumer {
 export async function createTransformerConsumer(): Promise<TransformerConsumer> {
   const db = await getDbConnection(WRITE_DB_CONFIG())
   const metadataStore = new MetadataStore(db)
-  const s3Consumer = new S3Consumer()
+  const s3Service = new S3Service()
   const redisClient = await getRedisClient(REDIS_CONFIG(), true)
   const cache = new RedisCache('snowflake-integration-resolver', redisClient, log)
   const resolver = new IntegrationResolver(db, cache)
@@ -158,5 +158,5 @@ export async function createTransformerConsumer(): Promise<TransformerConsumer>
 
   const pollingIntervalMs = 10_000 // 10 seconds
 
-  return new TransformerConsumer(metadataStore, s3Consumer, resolver, emitter, pollingIntervalMs)
+  return new TransformerConsumer(metadataStore, s3Service, resolver, emitter, pollingIntervalMs)
 }

services/apps/snowflake_connectors/src/core/metadataStore.ts

Lines changed: 25 additions & 2 deletions
@@ -89,8 +89,31 @@ export class MetadataStore {
     return row ? mapRowToJob(row) : null
   }
 
-  // TODO: Add a cleanup workflow that deletes S3 files for completed/failed jobs
-  // and sets "cleanedAt" to reclaim storage.
+  async getCleanableJobS3Paths(intervalHours = 24): Promise<{ id: number; s3Path: string }[]> {
+    const rows = await this.db.manyOrNone<{ id: number; s3_path: string }>(
+      `SELECT id, s3_path
+       FROM integration."snowflakeExportJobs"
+       WHERE "completedAt" IS NOT NULL
+         AND "cleanedAt" IS NULL
+         AND error IS NULL
+         AND metrics IS NOT NULL
+         AND metrics ? 'skippedCount'
+         AND (metrics->>'skippedCount')::int = 0
+         AND "completedAt" <= NOW() - make_interval(hours => $1)
+       ORDER BY "completedAt" ASC`,
+      [intervalHours],
+    )
+    return rows.map((r) => ({ id: r.id, s3Path: r.s3_path }))
+  }
+
+  async markCleaned(jobId: number): Promise<void> {
+    await this.db.none(
+      `UPDATE integration."snowflakeExportJobs"
+       SET "cleanedAt" = NOW(), "updatedAt" = NOW()
+       WHERE id = $1`,
+      [jobId],
+    )
+  }
 
   async markCompleted(jobId: number, metrics?: Partial<JobMetrics>): Promise<void> {
     await this.db.none(
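
Note: per the query above, a job only becomes eligible for cleanup when it completed without error, has not been cleaned yet, reports a zero skippedCount in its metrics JSONB, and finished at least intervalHours ago. For illustration only, a row in integration."snowflakeExportJobs" that would qualify (the id and path values here are made up):

// Illustrative only: a row that would match getCleanableJobS3Paths(24)
const eligible = {
  id: 42,                                        // hypothetical job id
  s3_path: 's3://bucket/exports/job-42.parquet', // hypothetical export path
  completedAt: '2024-01-01T02:00:00Z',           // finished more than 24 hours ago
  cleanedAt: null,                               // not yet cleaned
  error: null,                                   // completed without error
  metrics: { skippedCount: 0 },                  // skippedCount present and zero
}
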

services/apps/snowflake_connectors/src/core/s3Consumer.ts renamed to services/apps/snowflake_connectors/src/core/s3Service.ts

Lines changed: 7 additions & 2 deletions
@@ -4,7 +4,7 @@
  * Responsible for reading exported files from S3
  * (e.g., Parquet manifests or raw data) for downstream transformation.
  */
-import { GetObjectCommand, S3Client } from '@aws-sdk/client-s3'
+import { DeleteObjectCommand, GetObjectCommand, S3Client } from '@aws-sdk/client-s3'
 import { ParquetReader } from '@dsnp/parquetjs'
 
 export interface S3Object {
@@ -13,7 +13,7 @@ export interface S3Object {
   lastModified: Date
 }
 
-export class S3Consumer {
+export class S3Service {
   private readonly s3: S3Client
 
   constructor() {
@@ -54,6 +54,11 @@ export class S3Consumer {
     return rows
   }
 
+  async deleteFile(s3Uri: string): Promise<void> {
+    const { bucket, key } = this.parseS3Uri(s3Uri)
+    await this.s3.send(new DeleteObjectCommand({ Bucket: bucket, Key: key }))
+  }
+
   private parseS3Uri(s3Uri: string): { bucket: string; key: string } {
     const url = new URL(s3Uri)
     return { bucket: url.hostname, key: url.pathname.slice(1) }
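
Note: parseS3Uri leans on the WHATWG URL parser, which treats the bucket as the hostname and the object key as the path. A quick illustration (the URI value is just an example):

// Splitting an s3:// URI into bucket and key with the URL class
const url = new URL('s3://my-bucket/exports/2024/job-42.parquet')
console.log(url.hostname)          // 'my-bucket'
console.log(url.pathname.slice(1)) // 'exports/2024/job-42.parquet'
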

services/apps/snowflake_connectors/src/index.ts

Lines changed: 2 additions & 1 deletion
@@ -5,14 +5,15 @@ import { getServiceChildLogger } from '@crowd/logging'
 
 import { createTransformerConsumer } from './consumer/transformerConsumer'
 import { svc } from './main'
-import { scheduleSnowflakeS3Export } from './schedules/snowflakeS3Export'
+import { scheduleSnowflakeS3Cleanup, scheduleSnowflakeS3Export } from './schedules'
 
 const log = getServiceChildLogger('main')
 
 setImmediate(async () => {
   await svc.init()
 
   await scheduleSnowflakeS3Export()
+  await scheduleSnowflakeS3Cleanup()
 
   const consumer = await createTransformerConsumer()
   consumer.start()

services/apps/snowflake_connectors/src/integrations/cvent/buildSourceQuery.ts

Lines changed: 8 additions & 1 deletion
@@ -1,3 +1,5 @@
+import { IS_PROD_ENV } from '@crowd/common'
+
 // Exclude TIMESTAMP_TZ columns and re-add as TIMESTAMP_NTZ so Parquet export gets timezone-normalized values.
 const TIMESTAMP_TZ_COLUMNS = [
   'EVENT_START_DATE',
@@ -34,7 +36,7 @@ export const buildSourceQuery = (sinceTimestamp?: string): string => {
     ', ',
   )
 
-  const select = `
+  let select = `
     SELECT
       er.* EXCLUDE (${excludeClause}),
       ${castClauses},
@@ -55,6 +57,11 @@ export const buildSourceQuery = (sinceTimestamp?: string): string => {
         ON er.account_id = org.account_id
     WHERE ${LFID_COALESCE} IS NOT NULL`
 
+  // Limit to a single project in non-prod to avoid exporting all projects data
+  if (!IS_PROD_ENV) {
+    select += ` AND er.project_slug = 'pytorch'`
+  }
+
   const dedup = `
     QUALIFY ROW_NUMBER() OVER (PARTITION BY er.registration_id ORDER BY org.website DESC) = 1`
Lines changed: 2 additions & 0 deletions
@@ -0,0 +1,2 @@
+export { scheduleSnowflakeS3Cleanup } from './snowflakeS3Cleanup'
+export { scheduleSnowflakeS3Export } from './snowflakeS3Export'
Lines changed: 46 additions & 0 deletions
@@ -0,0 +1,46 @@
+import { ScheduleAlreadyRunning, ScheduleOverlapPolicy } from '@temporalio/client'
+
+import { SlackChannel, SlackPersona, sendSlackNotification } from '@crowd/slack'
+
+import { svc } from '../main'
+import { snowflakeS3CleanupScheduler } from '../workflows'
+
+export const scheduleSnowflakeS3Cleanup = async () => {
+  try {
+    await svc.temporal.schedule.create({
+      scheduleId: 'snowflake-s3-cleanup',
+      spec: {
+        // Run at 02:00 every day
+        cronExpressions: ['00 2 * * *'],
+      },
+      policies: {
+        overlap: ScheduleOverlapPolicy.SKIP,
+        catchupWindow: '1 minute',
+      },
+      action: {
+        type: 'startWorkflow',
+        workflowType: snowflakeS3CleanupScheduler,
+        taskQueue: 'snowflakeConnectors',
+        retry: {
+          initialInterval: '15 seconds',
+          backoffCoefficient: 2,
+          maximumAttempts: 3,
+        },
+        args: [],
+      },
+    })
+  } catch (err) {
+    if (err instanceof ScheduleAlreadyRunning) {
+      svc.log.info('Cleanup schedule already registered in Temporal.')
+      svc.log.info('Configuration may have changed since. Please make sure they are in sync.')
+    } else {
+      svc.log.error({ err }, 'Failed to create snowflake-s3-cleanup schedule')
+      sendSlackNotification(
+        SlackChannel.INTEGRATION_NOTIFICATIONS,
+        SlackPersona.ERROR_REPORTER,
+        'Snowflake S3 Cleanup Schedule Failed',
+        `Failed to create the \`snowflake-s3-cleanup\` Temporal schedule.\n\n*Error:* ${err.message || err}`,
+      )
+    }
+  }
+}
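
Note: the schedule starts the snowflakeS3CleanupScheduler workflow exported from ../workflows; that workflow file is among the 14 changed files but is not shown in this excerpt. Assuming it is a thin Temporal wrapper around the new cleanup activity (the timeout below is a guess, not the committed value), it would look roughly like this:

import { proxyActivities } from '@temporalio/workflow'

import * as activities from '../activities'

// Hypothetical sketch of the workflow the schedule triggers
const activity = proxyActivities<typeof activities>({
  startToCloseTimeout: '15 minutes', // assumed timeout, not from the commit
})

export async function snowflakeS3CleanupScheduler(): Promise<void> {
  await activity.executeCleanup()
}
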
