Skip to content

Commit 4355b36

Browse files
committed
chore: lint & format
1 parent 289f5b2 commit 4355b36

13 files changed

Lines changed: 114 additions & 67 deletions

File tree

services/apps/cron_service/src/jobs/snowflakeS3Export.job.ts

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,6 @@
11
import CronTime from 'cron-time-generator'
22

3-
import {
4-
TEMPORAL_CONFIG,
5-
WorkflowIdReusePolicy,
6-
getTemporalClient,
7-
} from '@crowd/temporal'
3+
import { TEMPORAL_CONFIG, WorkflowIdReusePolicy, getTemporalClient } from '@crowd/temporal'
84

95
import { IJobDefinition } from '../types'
106

services/apps/snowflake_connectors/src/activities/exportActivity.ts

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,14 @@
44
* This activity is invoked by the exportWorkflow and performs
55
* the actual Snowflake export and metadata bookkeeping.
66
*/
7-
8-
import { PlatformType } from '@crowd/types'
7+
import { WRITE_DB_CONFIG, getDbConnection } from '@crowd/database'
98
import { getServiceChildLogger } from '@crowd/logging'
10-
import { getDbConnection, WRITE_DB_CONFIG } from '@crowd/database'
9+
import { PlatformType } from '@crowd/types'
1110

12-
import { SnowflakeExporter } from '../core/snowflakeExporter'
1311
import { MetadataStore } from '../core/metadataStore'
12+
import { SnowflakeExporter } from '../core/snowflakeExporter'
1413
import { getPlatform } from '../integrations'
14+
1515
export { getEnabledPlatforms } from '../integrations'
1616

1717
const log = getServiceChildLogger('exportActivity')
@@ -21,7 +21,10 @@ function buildS3FilenamePrefix(platform: string): string {
2121
const year = now.getFullYear()
2222
const month = String(now.getMonth() + 1).padStart(2, '0')
2323
const day = String(now.getDate()).padStart(2, '0')
24-
const s3BucketPath = process.env.CROWD_SNOWFLAKE_S3_BUCKET_PATH!
24+
const s3BucketPath = process.env.CROWD_SNOWFLAKE_S3_BUCKET_PATH
25+
if (!s3BucketPath) {
26+
throw new Error('Missing required env var CROWD_SNOWFLAKE_S3_BUCKET_PATH')
27+
}
2528
return `${s3BucketPath}/${platform}/${year}/${month}/${day}`
2629
}
2730

@@ -53,6 +56,8 @@ export async function executeExport(platform: PlatformType): Promise<void> {
5356
log.error({ platform, err }, 'Export failed')
5457
throw err
5558
} finally {
56-
await exporter.destroy().catch((err) => log.warn({ err }, 'Failed to close Snowflake connection'))
59+
await exporter
60+
.destroy()
61+
.catch((err) => log.warn({ err }, 'Failed to close Snowflake connection'))
5762
}
5863
}

services/apps/snowflake_connectors/src/consumer/transformerConsumer.ts

Lines changed: 23 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -4,18 +4,17 @@
44
* Continuously polls the metadata store for pending jobs
55
* that need transformation, then runs the appropriate transformer.
66
*/
7-
8-
import { PlatformType } from '@crowd/types'
9-
import { getServiceChildLogger } from '@crowd/logging'
10-
import { getDbConnection, WRITE_DB_CONFIG } from '@crowd/database'
11-
import { QueueFactory, QUEUE_CONFIG } from '@crowd/queue'
12-
import { getRedisClient, REDIS_CONFIG, RedisCache } from '@crowd/redis'
137
import { DataSinkWorkerEmitter } from '@crowd/common_services'
8+
import { WRITE_DB_CONFIG, getDbConnection } from '@crowd/database'
9+
import { getServiceChildLogger } from '@crowd/logging'
10+
import { QUEUE_CONFIG, QueueFactory } from '@crowd/queue'
11+
import { REDIS_CONFIG, RedisCache, getRedisClient } from '@crowd/redis'
12+
import { PlatformType } from '@crowd/types'
1413

14+
import { IntegrationResolver } from '../core/integrationResolver'
1515
import { MetadataStore, SnowflakeExportJob } from '../core/metadataStore'
1616
import { S3Consumer } from '../core/s3Consumer'
17-
import { IntegrationResolver } from '../core/integrationResolver'
18-
import { getPlatform, getEnabledPlatforms } from '../integrations'
17+
import { getEnabledPlatforms, getPlatform } from '../integrations'
1918

2019
const log = getServiceChildLogger('transformerConsumer')
2120

@@ -56,9 +55,15 @@ export class TransformerConsumer {
5655
continue
5756
}
5857

59-
log.info({ currentPollingIntervalMs: this.currentPollingIntervalMs }, 'No pending jobs, backing off')
58+
log.info(
59+
{ currentPollingIntervalMs: this.currentPollingIntervalMs },
60+
'No pending jobs, backing off',
61+
)
6062
await this.sleep(this.currentPollingIntervalMs)
61-
this.currentPollingIntervalMs = Math.min(this.currentPollingIntervalMs * 2, MAX_POLLING_INTERVAL_MS)
63+
this.currentPollingIntervalMs = Math.min(
64+
this.currentPollingIntervalMs * 2,
65+
MAX_POLLING_INTERVAL_MS,
66+
)
6267
}
6368

6469
log.info('Transformer consumer stopped')
@@ -97,7 +102,11 @@ export class TransformerConsumer {
97102
continue
98103
}
99104

100-
await this.emitter.createAndProcessActivityResult(resolved.segmentId, resolved.integrationId, result.activity)
105+
await this.emitter.createAndProcessActivityResult(
106+
resolved.segmentId,
107+
resolved.integrationId,
108+
result.activity,
109+
)
101110
transformedCount++
102111
}
103112

@@ -115,7 +124,9 @@ export class TransformerConsumer {
115124
log.error({ jobId: job.id, err }, 'Job failed')
116125

117126
try {
118-
await this.metadataStore.markFailed(job.id, errorMessage, { processingDurationMs: Date.now() - startTime })
127+
await this.metadataStore.markFailed(job.id, errorMessage, {
128+
processingDurationMs: Date.now() - startTime,
129+
})
119130
} catch (updateErr) {
120131
log.error({ jobId: job.id, updateErr }, 'Failed to mark job as failed')
121132
}

services/apps/snowflake_connectors/src/core/integrationResolver.ts

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,11 @@
55
* integration for the given platform + segment pair. Results are cached
66
* in Redis to avoid repeated DB queries across consumer restarts.
77
*/
8-
98
import { DEFAULT_TENANT_ID, generateUUIDv4 } from '@crowd/common'
109
import type { DbConnection } from '@crowd/database'
11-
import { PlatformType, IntegrationState } from '@crowd/types'
1210
import { getServiceChildLogger } from '@crowd/logging'
1311
import { RedisCache } from '@crowd/redis'
12+
import { IntegrationState, PlatformType } from '@crowd/types'
1413

1514
import type { SegmentRef } from './transformerBase'
1615

@@ -120,7 +119,10 @@ export class IntegrationResolver {
120119
await this.cache.delete(lockKey)
121120
}
122121
} else {
123-
log.info({ platform, segmentId: segmentRow.id }, 'Lock held by another consumer, fetching existing')
122+
log.info(
123+
{ platform, segmentId: segmentRow.id },
124+
'Lock held by another consumer, fetching existing',
125+
)
124126
integration = await this.db.oneOrNone<{ id: string }>(
125127
`SELECT id
126128
FROM integrations

services/apps/snowflake_connectors/src/core/metadataStore.ts

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
* Tracks export batches, processing status, and timestamps
55
* to enable incremental exports and consumer polling.
66
*/
7-
87
import type { DbConnection } from '@crowd/database'
98

109
export interface JobMetrics {
@@ -88,7 +87,6 @@ export class MetadataStore {
8887
return row ? mapRowToJob(row) : null
8988
}
9089

91-
9290
// TODO: Add a cleanup workflow that deletes S3 files for completed/failed jobs
9391
// and sets "cleanedAt" to reclaim storage.
9492

@@ -125,7 +123,6 @@ export class MetadataStore {
125123
)
126124
return row?.max ?? null
127125
}
128-
129126
}
130127

131128
function mapRowToJob(row: {

services/apps/snowflake_connectors/src/core/s3Consumer.ts

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
* Responsible for reading exported files from S3
55
* (e.g., Parquet manifests or raw data) for downstream transformation.
66
*/
7-
87
import { GetObjectCommand, S3Client } from '@aws-sdk/client-s3'
98
import { ParquetReader } from '@dsnp/parquetjs'
109

@@ -18,19 +17,27 @@ export class S3Consumer {
1817
private readonly s3: S3Client
1918

2019
constructor() {
20+
const accessKeyId = process.env.CROWD_SNOWFLAKE_S3_ACCESS_KEY_ID
21+
const secretAccessKey = process.env.CROWD_SNOWFLAKE_S3_SECRET_ACCESS_KEY
22+
if (!accessKeyId || !secretAccessKey) {
23+
throw new Error(
24+
'Missing required env vars CROWD_SNOWFLAKE_S3_ACCESS_KEY_ID / CROWD_SNOWFLAKE_S3_SECRET_ACCESS_KEY',
25+
)
26+
}
27+
2128
this.s3 = new S3Client({
2229
region: process.env.CROWD_SNOWFLAKE_S3_REGION,
23-
credentials: {
24-
accessKeyId: process.env.CROWD_SNOWFLAKE_S3_ACCESS_KEY_ID!,
25-
secretAccessKey: process.env.CROWD_SNOWFLAKE_S3_SECRET_ACCESS_KEY!,
26-
},
30+
credentials: { accessKeyId, secretAccessKey },
2731
})
2832
}
2933

3034
async downloadFile(s3Uri: string): Promise<Buffer> {
3135
const { bucket, key } = this.parseS3Uri(s3Uri)
3236
const response = await this.s3.send(new GetObjectCommand({ Bucket: bucket, Key: key }))
33-
const byteArray = await response.Body!.transformToByteArray()
37+
if (!response.Body) {
38+
throw new Error(`Empty response body for ${s3Uri}`)
39+
}
40+
const byteArray = await response.Body.transformToByteArray()
3441
return Buffer.from(byteArray)
3542
}
3643

@@ -39,8 +46,8 @@ export class S3Consumer {
3946
const reader = await ParquetReader.openBuffer(buffer)
4047
const cursor = reader.getCursor()
4148
const rows: Record<string, unknown>[] = []
42-
let row: Record<string, unknown> | null
43-
while ((row = await cursor.next()) !== null) {
49+
let row: Record<string, unknown> | null = null
50+
while ((row = (await cursor.next()) as Record<string, unknown> | null) !== null) {
4451
rows.push(row)
4552
}
4653
await reader.close()

services/apps/snowflake_connectors/src/core/snowflakeExporter.ts

Lines changed: 24 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -4,15 +4,18 @@
44
* Responsible for executing COPY INTO queries against Snowflake
55
* to export data into S3 as Parquet files.
66
*/
7-
8-
import { SnowflakeClient } from '@crowd/snowflake'
97
import { getServiceChildLogger } from '@crowd/logging'
8+
import { SnowflakeClient } from '@crowd/snowflake'
109

1110
const log = getServiceChildLogger('snowflakeExporter')
1211

1312
const DEFAULT_BATCH_SIZE = 10_000
1413

15-
export type OnBatchComplete = (s3Path: string, totalRows: number, totalBytes: number) => Promise<void>
14+
export type OnBatchComplete = (
15+
s3Path: string,
16+
totalRows: number,
17+
totalBytes: number,
18+
) => Promise<void>
1619

1720
interface CopyIntoRow {
1821
rows_unloaded: number
@@ -37,12 +40,16 @@ export class SnowflakeExporter {
3740
s3FilenamePrefix: string,
3841
onBatchComplete?: OnBatchComplete,
3942
): Promise<void> {
40-
const storageIntegration = process.env.CROWD_SNOWFLAKE_STORAGE_INTEGRATION!
43+
const storageIntegration = process.env.CROWD_SNOWFLAKE_STORAGE_INTEGRATION
44+
if (!storageIntegration) {
45+
throw new Error('Missing required env var CROWD_SNOWFLAKE_STORAGE_INTEGRATION')
46+
}
4147
const limit = DEFAULT_BATCH_SIZE
4248
let offset = 0
4349
let batch = 0
50+
let hasMoreRows = true
4451

45-
while (true) {
52+
while (hasMoreRows) {
4653
const filename = `batch_${batch}.parquet`
4754
const s3Path = `${s3FilenamePrefix}/${filename}`
4855

@@ -58,13 +65,17 @@ export class SnowflakeExporter {
5865
OVERWRITE = TRUE
5966
`
6067

61-
log.info({ s3FilenamePrefix, storageIntegration, batch, offset, limit }, 'Executing COPY INTO batch')
68+
log.info(
69+
{ s3FilenamePrefix, storageIntegration, batch, offset, limit },
70+
'Executing COPY INTO batch',
71+
)
6272

6373
const results = await this.snowflake.run<CopyIntoRow>(copyQuery)
6474

6575
if (results.length === 0) {
6676
log.info({ batch, totalRowsExported: offset }, 'No more rows to export')
67-
break
77+
hasMoreRows = false
78+
continue
6879
}
6980

7081
const batchRows = results.reduce((sum, r) => sum + r.rows_unloaded, 0)
@@ -76,8 +87,12 @@ export class SnowflakeExporter {
7687
await onBatchComplete(s3Path, batchRows, batchBytes)
7788
}
7889
if (batchRows < limit) {
79-
log.info({ totalRowsExported: offset + batchRows }, 'Export finished (last batch was partial)')
80-
break
90+
log.info(
91+
{ totalRowsExported: offset + batchRows },
92+
'Export finished (last batch was partial)',
93+
)
94+
hasMoreRows = false
95+
continue
8196
}
8297

8398
offset += limit

services/apps/snowflake_connectors/src/core/transformerBase.ts

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,8 @@
44
* Each integration must implement this base class to define
55
* how raw exported data is transformed into activities.
66
*/
7-
8-
import { IActivityData, PlatformType } from '@crowd/types'
97
import { getServiceChildLogger } from '@crowd/logging'
8+
import { IActivityData, PlatformType } from '@crowd/types'
109

1110
const log = getServiceChildLogger('transformer')
1211

@@ -28,12 +27,12 @@ export abstract class TransformerBase {
2827
* Transform a single raw row from the S3 export into an activity
2928
* along with routing metadata. Returns null if the row should be skipped.
3029
*/
31-
abstract transformRow(row: Record<string, any>): TransformedActivity | null
30+
abstract transformRow(row: Record<string, unknown>): TransformedActivity | null
3231

3332
/**
3433
* Safe wrapper around transformRow that catches errors and returns null.
3534
*/
36-
safeTransformRow(row: Record<string, any>): TransformedActivity | null {
35+
safeTransformRow(row: Record<string, unknown>): TransformedActivity | null {
3736
try {
3837
return this.transformRow(row)
3938
} catch (err) {

services/apps/snowflake_connectors/src/index.ts

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,10 @@
11
/**
22
* Entry point: Start Temporal worker + transformer consumer loop.
33
*/
4-
54
import { getServiceChildLogger } from '@crowd/logging'
65

7-
import { svc } from './main'
86
import { createTransformerConsumer } from './consumer/transformerConsumer'
7+
import { svc } from './main'
98

109
const log = getServiceChildLogger('main')
1110

services/apps/snowflake_connectors/src/integrations/cvent/buildSourceQuery.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,9 @@ const CDP_MATCHED_SEGMENTS = `
3030

3131
export const buildSourceQuery = (sinceTimestamp?: string): string => {
3232
const excludeClause = TIMESTAMP_TZ_COLUMNS.join(', ')
33-
const castClauses = TIMESTAMP_TZ_COLUMNS.map(
34-
(col) => `er.${col}::TIMESTAMP_NTZ AS ${col}`,
35-
).join(', ')
33+
const castClauses = TIMESTAMP_TZ_COLUMNS.map((col) => `er.${col}::TIMESTAMP_NTZ AS ${col}`).join(
34+
', ',
35+
)
3636

3737
const select = `
3838
SELECT

0 commit comments

Comments
 (0)