Skip to content

Commit 255c911

Browse files
committed
memory management
1 parent 2e5127d commit 255c911

2 files changed

Lines changed: 128 additions & 28 deletions

File tree

infra/bigquery-export/firestore.js

Lines changed: 107 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -20,22 +20,64 @@ export class FirestoreBatch {
2020
// Configuration constants
2121
this.config = {
2222
timeout: 10 * 60 * 1000, // 10 minutes
23-
progressReportInterval: 200000, // Report progress every N operations
24-
flushThreshold: 200000 // Flush BulkWriter every N operations
23+
progressReportInterval: 100000, // Report progress every N operations
24+
flushThreshold: 50000, // Flush BulkWriter every N operations
25+
gcInterval: 50000 // Force garbage collection interval
2526
}
2627

2728
this.reset()
2829
}
2930

31+
// Memory monitoring utility
32+
logMemoryUsage (operation = '') {
33+
const used = process.memoryUsage()
34+
const memoryInfo = {
35+
rss: Math.round(used.rss / 1024 / 1024 * 100) / 100,
36+
heapTotal: Math.round(used.heapTotal / 1024 / 1024 * 100) / 100,
37+
heapUsed: Math.round(used.heapUsed / 1024 / 1024 * 100) / 100,
38+
external: Math.round(used.external / 1024 / 1024 * 100) / 100
39+
}
40+
41+
console.log(`Memory usage ${operation}: RSS ${memoryInfo.rss}MB, Heap Used ${memoryInfo.heapUsed}MB, Heap Total ${memoryInfo.heapTotal}MB, External ${memoryInfo.external}MB`)
42+
43+
// Configurable memory warning threshold from environment
44+
const warningThreshold = parseInt(process.env.MEMORY_WARNING_THRESHOLD_MB || '1500')
45+
if (memoryInfo.heapUsed > warningThreshold) {
46+
console.warn(`⚠️ High memory usage detected: ${memoryInfo.heapUsed}MB heap used (threshold: ${warningThreshold}MB)`)
47+
}
48+
49+
return memoryInfo
50+
}
51+
52+
// Enhanced reset with memory cleanup
3053
reset () {
3154
this.processedDocs = 0
3255
this.totalDocs = 0
56+
57+
// Clean up existing BulkWriter if it exists
58+
if (this.bulkWriter) {
59+
try {
60+
this.bulkWriter.close()
61+
} catch (error) {
62+
console.warn('Error closing existing BulkWriter:', error.message)
63+
}
64+
}
3365
this.bulkWriter = null
66+
67+
// Force garbage collection if available
68+
if (global.gc) {
69+
global.gc()
70+
}
71+
72+
// Log memory usage after reset
73+
this.logMemoryUsage('after reset')
3474
}
3575

3676
createBulkWriter (operation) {
3777
const bulkWriter = this.firestore.bulkWriter()
3878

79+
bulkWriter.maxBatchSize = 500 // Reduce batch size for memory efficiency
80+
3981
// Configure error handling with progress info
4082
bulkWriter.onWriteError((error) => {
4183
const progressInfo = this.totalDocs > 0 ? ` (${this.processedDocs}/${this.totalDocs})` : ''
@@ -54,6 +96,11 @@ export class FirestoreBatch {
5496
if (this.processedDocs % this.config.progressReportInterval === 0) {
5597
const progressInfo = this.totalDocs > 0 ? ` (${this.processedDocs}/${this.totalDocs})` : ` (${this.processedDocs} processed)`
5698
console.log(`Progress${progressInfo} - ${operation}ing documents in ${this.collectionName}`)
99+
100+
// Force garbage collection periodically
101+
if (this.processedDocs % this.config.gcInterval === 0 && global.gc) {
102+
global.gc()
103+
}
57104
}
58105
})
59106

@@ -115,20 +162,26 @@ export class FirestoreBatch {
115162
if (snapshot.empty) break
116163

117164
// Add all delete operations to BulkWriter
118-
snapshot.docs.forEach(doc => {
165+
for (const doc of snapshot.docs) {
119166
this.bulkWriter.delete(doc.ref)
120167
deletedCount++
121-
})
122168

123-
// Periodically flush to manage memory
124-
// if (deletedCount % this.config.flushThreshold === 0) {
125-
console.log(`Flushing BulkWriter at ${deletedCount} operations...`)
126-
await this.bulkWriter.flush()
127-
// }
169+
// Frequent flushing to prevent memory buildup
170+
if (deletedCount % this.config.flushThreshold === 0) {
171+
console.log(`Flushing BulkWriter at ${deletedCount} operations...`)
172+
await this.bulkWriter.flush()
173+
174+
// Force garbage collection after flush
175+
if (global.gc) {
176+
global.gc()
177+
}
178+
}
179+
}
128180
}
129181

130182
// Final flush and close
131183
console.log('Finalizing deletion operations...')
184+
await this.bulkWriter.flush()
132185
await this.bulkWriter.close()
133186

134187
const duration = (Date.now() - startTime) / 1000
@@ -144,32 +197,52 @@ export class FirestoreBatch {
144197
this.bulkWriter = this.createBulkWriter('writ')
145198

146199
let rowCount = 0
200+
let batchCount = 0
147201
const collectionRef = this.firestore.collection(this.collectionName)
148202

149-
for await (const row of rowStream) {
150-
// Add document to BulkWriter
151-
const docRef = collectionRef.doc()
152-
this.bulkWriter.set(docRef, row)
153-
154-
rowCount++
155-
this.totalDocs = rowCount // Update total as we go since we can't predict BigQuery result size
156-
157-
// Periodically flush to manage memory
158-
if (rowCount % this.config.flushThreshold === 0) {
159-
console.log(`Flushing BulkWriter at ${rowCount} operations...`)
160-
await this.bulkWriter.flush()
203+
try {
204+
for await (const row of rowStream) {
205+
// Add document to BulkWriter
206+
const docRef = collectionRef.doc()
207+
this.bulkWriter.set(docRef, row)
208+
209+
rowCount++
210+
this.totalDocs = rowCount // Update totalDocs for progress tracking
211+
212+
if (rowCount % this.config.flushThreshold === 0) {
213+
console.log(`Flushing BulkWriter at ${rowCount} operations...`)
214+
await this.bulkWriter.flush()
215+
batchCount++
216+
217+
if (batchCount % 5 === 0 && global.gc) {
218+
console.log(`Forcing garbage collection at batch ${batchCount}...`)
219+
global.gc()
220+
}
221+
}
161222
}
223+
} catch (error) {
224+
console.error('Error during BigQuery streaming:', error)
225+
throw error
162226
}
163227

164228
// Final flush and close
165229
console.log('Finalizing write operations...')
230+
await this.bulkWriter.flush()
166231
await this.bulkWriter.close()
167232

233+
// Final garbage collection
234+
if (global.gc) {
235+
global.gc()
236+
}
237+
168238
const duration = (Date.now() - startTime) / 1000
169239
console.info(`Transfer to ${this.collectionName} complete. Total rows processed: ${this.processedDocs}. Time: ${duration} seconds`)
170240
}
171241

172242
async export (query, exportConfig) {
243+
console.log(`Starting export to ${exportConfig.collection}...`)
244+
this.logMemoryUsage('at start')
245+
173246
// Configure Firestore settings
174247
this.firestore.settings({
175248
databaseId: exportConfig.database,
@@ -183,9 +256,19 @@ export class FirestoreBatch {
183256
date: exportConfig.date
184257
})
185258

186-
await this.batchDelete()
259+
try {
260+
await this.batchDelete()
261+
this.logMemoryUsage('after deletion')
187262

188-
const rowStream = await this.bigquery.queryResultsStream(query)
189-
await this.streamFromBigQuery(rowStream)
263+
const rowStream = await this.bigquery.queryResultsStream(query)
264+
await this.streamFromBigQuery(rowStream)
265+
266+
this.logMemoryUsage('at completion')
267+
console.log(`✅ Export to ${exportConfig.collection} completed successfully`)
268+
} catch (error) {
269+
this.logMemoryUsage('on error')
270+
console.error(`❌ Export to ${exportConfig.collection} failed:`, error)
271+
throw error
272+
}
190273
}
191274
}

infra/tf/bigquery_export/main.tf

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,14 @@ resource "google_cloud_run_v2_job" "bigquery_export" {
2020
deletion_protection = false
2121

2222
template {
23-
parallelism = 0
23+
parallelism = 1
24+
task_count = 1 # Ensure single task execution
25+
2426
template {
27+
timeout = "10800s" # 3 hours
28+
service_account = var.function_identity
29+
max_retries = 0 # No retries
30+
2531
containers {
2632
image = "${var.location}.gcr.io/${var.project}/cloud-run/${var.function_name}:latest"
2733
resources {
@@ -34,10 +40,21 @@ resource "google_cloud_run_v2_job" "bigquery_export" {
3440
name = "EXPORT_CONFIG"
3541
value = ""
3642
}
43+
env {
44+
name = "NODE_OPTIONS"
45+
value = "--expose-gc --max-old-space-size=6144" # 6GB heap limit with GC
46+
}
47+
48+
env {
49+
name = "MEMORY_WARNING_THRESHOLD_MB"
50+
value = "4915" # 80% of max heap size = 6144MB
51+
}
52+
53+
env {
54+
name = "LOG_LEVEL"
55+
value = "info"
56+
}
3757
}
38-
timeout = "7200s"
39-
service_account = var.function_identity
40-
max_retries = 1
4158
}
4259
}
4360
}

0 commit comments

Comments
 (0)