@@ -17,6 +17,8 @@ const log = getServiceChildLogger('integrationResolver')
1717
1818const CACHE_TTL_SECONDS = 24 * 60 * 60 // 24 hours
1919const LOCK_TTL_SECONDS = 30
20+ const LOCK_WAIT_RETRIES = 5
21+ const LOCK_WAIT_BASE_MS = 200
2022
2123export interface ResolvedIntegration {
2224 segmentId : string
@@ -121,20 +123,32 @@ export class IntegrationResolver {
121123 } else {
122124 log . info (
123125 { platform, segmentId : segmentRow . id } ,
124- 'Lock held by another consumer, fetching existing' ,
125- )
126- integration = await this . db . oneOrNone < { id : string } > (
127- `SELECT id
128- FROM integrations
129- WHERE platform = $1
130- AND "segmentId" = $2
131- AND "deletedAt" IS NULL
132- LIMIT 1` ,
133- [ platform , segmentRow . id ] ,
126+ 'Lock held by another consumer, waiting for commit' ,
134127 )
128+ for ( let attempt = 0 ; attempt < LOCK_WAIT_RETRIES ; attempt ++ ) {
129+ await new Promise ( ( r ) => setTimeout ( r , LOCK_WAIT_BASE_MS * 2 ** attempt ) )
130+ integration = await this . db . oneOrNone < { id : string } > (
131+ `SELECT id
132+ FROM integrations
133+ WHERE platform = $1
134+ AND "segmentId" = $2
135+ AND "deletedAt" IS NULL
136+ LIMIT 1` ,
137+ [ platform , segmentRow . id ] ,
138+ )
139+ if ( integration ) break
140+ }
135141 }
136142 }
137143
144+ if ( ! integration ) {
145+ log . warn (
146+ { platform, segmentId : segmentRow . id } ,
147+ 'Integration not found after lock contention, skipping' ,
148+ )
149+ return null
150+ }
151+
138152 const result : ResolvedIntegration = {
139153 segmentId : segmentRow . id ,
140154 integrationId : integration . id ,
0 commit comments