Skip to content

Commit 289f5b2

Browse files
committed
feat: add metrics for observability
1 parent 16245f4 commit 289f5b2

6 files changed

Lines changed: 52 additions & 28 deletions

File tree

backend/src/database/migrations/U1771605060__addMetricsToSnowflakeExport.sql

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
-- Consolidate the per-job counter columns into a single JSONB "metrics" column
-- so export-time and processing-time metrics can live in one place.
ALTER TABLE integration."snowflakeExportJobs"
  ADD COLUMN metrics JSONB;

-- Backfill: preserve the existing counters before dropping the old columns,
-- otherwise historical totals for already-exported jobs are silently lost.
-- Key names match the JobMetrics interface used by the application.
UPDATE integration."snowflakeExportJobs"
  SET metrics = jsonb_build_object(
    'exportedRows', "totalRows",
    'exportedBytes', "totalBytes"
  )
  WHERE "totalRows" IS NOT NULL OR "totalBytes" IS NOT NULL;

ALTER TABLE integration."snowflakeExportJobs"
  DROP COLUMN "totalRows",
  DROP COLUMN "totalBytes";

services/apps/snowflake_connectors/src/activities/exportActivity.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,5 @@ export async function executeExport(platform: PlatformType): Promise<void> {
5454
throw err
5555
} finally {
5656
await exporter.destroy().catch((err) => log.warn({ err }, 'Failed to close Snowflake connection'))
57-
await db.$pool.end().catch((err) => log.warn({ err }, 'Failed to close DB connection'))
5857
}
5958
}

services/apps/snowflake_connectors/src/consumer/transformerConsumer.ts

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,8 @@ export class TransformerConsumer {
7171
private async processJob(job: SnowflakeExportJob): Promise<void> {
7272
log.info({ jobId: job.id, platform: job.platform, s3Path: job.s3Path }, 'Processing job')
7373

74+
const startTime = Date.now()
75+
7476
try {
7577
const platformDef = getPlatform(job.platform as PlatformType)
7678

@@ -99,18 +101,21 @@ export class TransformerConsumer {
99101
transformedCount++
100102
}
101103

102-
await this.metadataStore.markCompleted(job.id)
104+
const processingMetrics = {
105+
transformedCount,
106+
skippedCount,
107+
processingDurationMs: Date.now() - startTime,
108+
}
109+
110+
await this.metadataStore.markCompleted(job.id, processingMetrics)
103111

104-
log.info(
105-
{ jobId: job.id, totalRows: rows.length, transformedCount, skippedCount },
106-
'Job completed',
107-
)
112+
log.info({ jobId: job.id, ...processingMetrics }, 'Job completed')
108113
} catch (err) {
109114
const errorMessage = err instanceof Error ? err.message : String(err)
110115
log.error({ jobId: job.id, err }, 'Job failed')
111116

112117
try {
113-
await this.metadataStore.markFailed(job.id, errorMessage)
118+
await this.metadataStore.markFailed(job.id, errorMessage, { processingDurationMs: Date.now() - startTime })
114119
} catch (updateErr) {
115120
log.error({ jobId: job.id, updateErr }, 'Failed to mark job as failed')
116121
}

services/apps/snowflake_connectors/src/core/metadataStore.ts

Lines changed: 33 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -7,19 +7,26 @@
77

88
import type { DbConnection } from '@crowd/database'
99

10+
export interface JobMetrics {
11+
exportedRows?: number
12+
exportedBytes?: number
13+
transformedCount?: number
14+
skippedCount?: number
15+
processingDurationMs?: number
16+
}
17+
1018
export interface SnowflakeExportJob {
1119
id: number
1220
platform: string
1321
s3Path: string
14-
totalRows: number
15-
totalBytes: number
1622
exportStartedAt: Date | null
1723
createdAt: Date
1824
updatedAt: Date
1925
processingStartedAt: Date | null
2026
completedAt: Date | null
2127
cleanedAt: Date | null
2228
error: string | null
29+
metrics: JobMetrics | null
2330
}
2431

2532
export class MetadataStore {
@@ -32,19 +39,19 @@ export class MetadataStore {
3239
totalBytes: number,
3340
exportStartedAt: Date,
3441
): Promise<void> {
42+
const metrics: JobMetrics = { exportedRows: totalRows, exportedBytes: totalBytes }
3543
await this.db.none(
36-
`INSERT INTO integration."snowflakeExportJobs" (platform, s3_path, "totalRows", "totalBytes", "exportStartedAt")
37-
VALUES ($1, $2, $3, $4, $5)
44+
`INSERT INTO integration."snowflakeExportJobs" (platform, s3_path, "exportStartedAt", metrics)
45+
VALUES ($1, $2, $3, $4)
3846
ON CONFLICT (s3_path) DO UPDATE SET
39-
"totalRows" = EXCLUDED."totalRows",
40-
"totalBytes" = EXCLUDED."totalBytes",
4147
"exportStartedAt" = EXCLUDED."exportStartedAt",
4248
"processingStartedAt" = NULL,
4349
"completedAt" = NULL,
4450
"cleanedAt" = NULL,
4551
error = NULL,
52+
metrics = EXCLUDED.metrics,
4653
"updatedAt" = NOW()`,
47-
[platform, s3Path, totalRows, totalBytes, exportStartedAt],
54+
[platform, s3Path, exportStartedAt, JSON.stringify(metrics)],
4855
)
4956
}
5057

@@ -57,15 +64,14 @@ export class MetadataStore {
5764
id: number
5865
platform: string
5966
s3_path: string
60-
totalRows: string
61-
totalBytes: string
6267
exportStartedAt: Date | null
6368
createdAt: Date
6469
updatedAt: Date
6570
processingStartedAt: Date | null
6671
completedAt: Date | null
6772
cleanedAt: Date | null
6873
error: string | null
74+
metrics: JobMetrics | null
6975
}>(
7076
`UPDATE integration."snowflakeExportJobs"
7177
SET "processingStartedAt" = NOW(), "updatedAt" = NOW()
@@ -76,27 +82,35 @@ export class MetadataStore {
7682
LIMIT 1
7783
FOR UPDATE SKIP LOCKED
7884
)
79-
RETURNING id, platform, s3_path, "totalRows", "totalBytes", "exportStartedAt",
80-
"createdAt", "updatedAt", "processingStartedAt", "completedAt", "cleanedAt", error`,
85+
RETURNING id, platform, s3_path, "exportStartedAt",
86+
"createdAt", "updatedAt", "processingStartedAt", "completedAt", "cleanedAt", error, metrics`,
8187
)
8288
return row ? mapRowToJob(row) : null
8389
}
8490

85-
async markCompleted(jobId: number): Promise<void> {
91+
92+
// TODO: Add a cleanup workflow that deletes S3 files for completed/failed jobs
93+
// and sets "cleanedAt" to reclaim storage.
94+
95+
async markCompleted(jobId: number, metrics?: Partial<JobMetrics>): Promise<void> {
8696
await this.db.none(
8797
`UPDATE integration."snowflakeExportJobs"
88-
SET "completedAt" = NOW(), "updatedAt" = NOW()
98+
SET "completedAt" = NOW(),
99+
metrics = COALESCE(metrics, '{}'::jsonb) || COALESCE($2::jsonb, '{}'::jsonb),
100+
"updatedAt" = NOW()
89101
WHERE id = $1`,
90-
[jobId],
102+
[jobId, metrics ? JSON.stringify(metrics) : null],
91103
)
92104
}
93105

94-
async markFailed(jobId: number, error: string): Promise<void> {
106+
async markFailed(jobId: number, error: string, metrics?: Partial<JobMetrics>): Promise<void> {
95107
await this.db.none(
96108
`UPDATE integration."snowflakeExportJobs"
97-
SET error = $2, "completedAt" = NOW(), "updatedAt" = NOW()
109+
SET error = $2, "completedAt" = NOW(),
110+
metrics = COALESCE(metrics, '{}'::jsonb) || COALESCE($3::jsonb, '{}'::jsonb),
111+
"updatedAt" = NOW()
98112
WHERE id = $1`,
99-
[jobId, error],
113+
[jobId, error, metrics ? JSON.stringify(metrics) : null],
100114
)
101115
}
102116

@@ -118,28 +132,26 @@ function mapRowToJob(row: {
118132
id: number
119133
platform: string
120134
s3_path: string
121-
totalRows: string
122-
totalBytes: string
123135
exportStartedAt: Date | null
124136
createdAt: Date
125137
updatedAt: Date
126138
processingStartedAt: Date | null
127139
completedAt: Date | null
128140
cleanedAt: Date | null
129141
error: string | null
142+
metrics: JobMetrics | null
130143
}): SnowflakeExportJob {
131144
return {
132145
id: row.id,
133146
platform: row.platform,
134147
s3Path: row.s3_path,
135-
totalRows: Number(row.totalRows),
136-
totalBytes: Number(row.totalBytes),
137148
exportStartedAt: row.exportStartedAt,
138149
createdAt: row.createdAt,
139150
updatedAt: row.updatedAt,
140151
processingStartedAt: row.processingStartedAt,
141152
completedAt: row.completedAt,
142153
cleanedAt: row.cleanedAt,
143154
error: row.error,
155+
metrics: row.metrics,
144156
}
145157
}

services/apps/snowflake_connectors/src/core/snowflakeExporter.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,8 @@ export class SnowflakeExporter {
4646
const filename = `batch_${batch}.parquet`
4747
const s3Path = `${s3FilenamePrefix}/${filename}`
4848

49+
// TODO: LIMIT/OFFSET over mutable data can skip or duplicate rows if the source changes between batches.
50+
// Consider materializing a temp table from the source query first, then paginating over it.
4951
const copyQuery = `
5052
COPY INTO '${s3Path}'
5153
FROM (${sourceQuery} LIMIT ${limit} OFFSET ${offset})

0 commit comments

Comments
 (0)