Skip to content

Commit 2fb9d6a

Browse files
dahlia and claude committed
Fix advisory lock leak in PostgresMessageQueue
The original implementation called pg_try_advisory_lock() inside the WHERE clause of a subquery. PostgreSQL evaluates WHERE conditions for each row, causing the advisory lock to be acquired multiple times when there were multiple messages with the same ordering_key. However, pg_advisory_unlock() was only called once after processing, leaving the lock partially held due to PostgreSQL's reentrant lock counter.

This rewrites the listen() method to:

- Process messages WITHOUT an ordering key first, using a CTE with FOR UPDATE SKIP LOCKED
- Process messages WITH an ordering key by:
  1. Selecting a candidate message
  2. Calling pg_try_advisory_lock() in a separate query (exactly once)
  3. If the lock is acquired, deleting and processing the message
  4. Always releasing the lock in a finally block
  5. If the lock is not acquired, trying other candidates with different ordering keys

This ensures the advisory lock is acquired exactly once per ordering key, so pg_advisory_unlock() properly releases it.

Fixes #538

Co-Authored-By: Claude <[email protected]>
1 parent 6aedee4 commit 2fb9d6a

2 files changed

Lines changed: 204 additions & 24 deletions

File tree

packages/postgres/src/mq.test.ts

Lines changed: 119 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -1,7 +1,9 @@
11
import { test } from "@fedify/fixture";
22
import { PostgresMessageQueue } from "@fedify/postgres/mq";
33
import { getRandomKey, testMessageQueue } from "@fedify/testing";
4+
import { deepStrictEqual } from "node:assert/strict";
45
import process from "node:process";
6+
import { test as nodeTest } from "node:test";
57
import postgres from "postgres";
68

79
const dbUrl = process.env.POSTGRES_URL;
@@ -35,4 +37,121 @@ test("PostgresMessageQueue", { ignore: dbUrl == null }, () => {
3537
);
3638
});
3739

40+
// Regression test for advisory lock not being fully released after processing
41+
// a message with an ordering key. This test verifies that after processing
42+
// a message through PostgresMessageQueue.listen(), the advisory lock is fully
43+
// released and another session can immediately acquire it.
44+
//
45+
// The original bug: pg_try_advisory_lock() in a WHERE clause was called
46+
// multiple times during query execution (once per row with the same
47+
// ordering_key), causing the lock's reentrant counter to be > 1. But
48+
// pg_advisory_unlock() was only called once, leaving the lock partially held.
49+
//
50+
// To reproduce the bug, we need MULTIPLE messages with the SAME ordering key.
51+
// This causes the WHERE clause to evaluate pg_try_advisory_lock() for each row,
52+
// incrementing the lock counter multiple times.
53+
//
54+
// See: https://github.com/fedify-dev/fedify/issues/538
55+
nodeTest(
56+
"PostgresMessageQueue advisory lock release",
57+
{ skip: dbUrl == null },
58+
async () => {
59+
const tableName = getRandomKey("message");
60+
const channelName = getRandomKey("channel");
61+
62+
// Use two separate connections to verify lock behavior across sessions
63+
const sql1 = postgres(dbUrl!);
64+
const sql2 = postgres(dbUrl!);
65+
66+
const mq = new PostgresMessageQueue(sql1, {
67+
tableName,
68+
channelName,
69+
pollInterval: { milliseconds: 100 },
70+
});
71+
72+
try {
73+
await mq.initialize();
74+
75+
// CRITICAL: Enqueue MULTIPLE messages with the SAME ordering key.
76+
// This is what triggers the bug - the WHERE clause evaluates
77+
// pg_try_advisory_lock() for each row, incrementing the lock counter
78+
// multiple times, but pg_advisory_unlock() is only called once.
79+
const orderingKey = "test-ordering-key";
80+
await mq.enqueue({ value: 1 }, { orderingKey });
81+
await mq.enqueue({ value: 2 }, { orderingKey });
82+
await mq.enqueue({ value: 3 }, { orderingKey });
83+
84+
// Track when the FIRST message is processed
85+
let firstMessageProcessed = false;
86+
let lockReleasedAfterProcessing = false;
87+
88+
const controller = new AbortController();
89+
90+
// Start listening - we only care about processing the FIRST message
91+
const listening = mq.listen(
92+
() => {
93+
if (!firstMessageProcessed) {
94+
firstMessageProcessed = true;
95+
// Abort after processing first message to stop the listener
96+
controller.abort();
97+
}
98+
},
99+
{ signal: controller.signal },
100+
);
101+
102+
// Wait for the first message to be processed
103+
const startTime = Date.now();
104+
while (!firstMessageProcessed && Date.now() - startTime < 10000) {
105+
await new Promise((resolve) => setTimeout(resolve, 100));
106+
}
107+
deepStrictEqual(
108+
firstMessageProcessed,
109+
true,
110+
"First message should be processed",
111+
);
112+
113+
// Wait for listener to fully stop and release locks
114+
await listening;
115+
await new Promise((resolve) => setTimeout(resolve, 200));
116+
117+
// THE KEY TEST: After processing ONE message (with multiple messages
118+
// having the same ordering key in the queue), the advisory lock should
119+
// be FULLY released. With the bug, the lock counter would be > 0
120+
// because pg_try_advisory_lock was called N times but pg_advisory_unlock
121+
// was only called once.
122+
const lockAfterProcessing = await sql2`
123+
SELECT pg_try_advisory_lock(
124+
hashtext(${tableName}),
125+
hashtext(${orderingKey})
126+
) AS acquired
127+
`;
128+
lockReleasedAfterProcessing = lockAfterProcessing[0].acquired;
129+
130+
// Release the lock we just acquired in sql2
131+
if (lockReleasedAfterProcessing) {
132+
await sql2`
133+
SELECT pg_advisory_unlock(
134+
hashtext(${tableName}),
135+
hashtext(${orderingKey})
136+
)
137+
`;
138+
}
139+
140+
// THE FIX: After processing, the lock should be fully released
141+
// and another session should be able to acquire it
142+
deepStrictEqual(
143+
lockReleasedAfterProcessing,
144+
true,
145+
"Lock should be fully released after message processing " +
146+
"(bug: lock counter was incremented multiple times but only " +
147+
"decremented once)",
148+
);
149+
} finally {
150+
await mq.drop();
151+
await sql1.end();
152+
await sql2.end();
153+
}
154+
},
155+
);
156+
38157
// cspell: ignore sqls

packages/postgres/src/mq.ts

Lines changed: 85 additions & 24 deletions
Original file line number | Diff line number | Diff line change
@@ -161,50 +161,111 @@ export class PostgresMessageQueue implements MessageQueue {
161161
const { signal } = options;
162162
const poll = async () => {
163163
while (!signal?.aborted) {
164-
// Use PostgreSQL advisory locks for distributed ordering key locking.
165-
// pg_try_advisory_lock returns true if the lock was acquired, false
166-
// otherwise. We use hashtext() to convert the table name and ordering
167-
// key to integers for the lock ID. Messages without an ordering key
168-
// (null) can always be processed.
164+
let processed = false;
165+
166+
// Step 1: Try to process messages without ordering key first.
167+
// These don't need advisory locks.
169168
const query = this.#sql`
170-
DELETE FROM ${this.#sql(this.#tableName)}
171-
WHERE id = (
172-
SELECT id
169+
WITH candidate AS (
170+
SELECT id, ordering_key
173171
FROM ${this.#sql(this.#tableName)}
174172
WHERE created + delay < CURRENT_TIMESTAMP
175-
AND (ordering_key IS NULL
176-
OR pg_try_advisory_lock(
177-
hashtext(${this.#tableName}),
178-
hashtext(ordering_key)
179-
))
173+
AND ordering_key IS NULL
180174
ORDER BY created
181175
LIMIT 1
176+
FOR UPDATE SKIP LOCKED
182177
)
178+
DELETE FROM ${this.#sql(this.#tableName)}
179+
WHERE id IN (SELECT id FROM candidate)
183180
RETURNING message, ordering_key;
184181
`.execute();
185182
const cancel = query.cancel.bind(query);
186183
signal?.addEventListener("abort", cancel);
187-
let i = 0;
188184
for (const row of await query) {
189185
if (signal?.aborted) return;
190-
const orderingKey = row.ordering_key as string | null;
191-
try {
192-
await handler(row.message);
193-
} finally {
194-
// Release the distributed advisory lock if we acquired one
195-
if (orderingKey != null) {
186+
await handler(row.message);
187+
processed = true;
188+
}
189+
signal?.removeEventListener("abort", cancel);
190+
191+
// If we processed a message without ordering key, continue the loop
192+
if (processed) continue;
193+
194+
// Step 2: Try to process a message with an ordering key.
195+
// We do this separately to ensure pg_try_advisory_lock is called
196+
// exactly once per attempt.
197+
// We loop through candidates until we find one we can lock, or run out.
198+
const attemptedOrderingKeys = new Set<string>();
199+
while (!signal?.aborted) {
200+
// Find a candidate with ordering key that we haven't tried yet
201+
const candidateResult = await this.#sql`
202+
SELECT id, ordering_key
203+
FROM ${this.#sql(this.#tableName)}
204+
WHERE created + delay < CURRENT_TIMESTAMP
205+
AND ordering_key IS NOT NULL
206+
${
207+
attemptedOrderingKeys.size > 0
208+
? this.#sql`AND ordering_key NOT IN ${
209+
this.#sql([...attemptedOrderingKeys])
210+
}`
211+
: this.#sql``
212+
}
213+
ORDER BY created
214+
LIMIT 1
215+
`;
216+
217+
if (candidateResult.length === 0) {
218+
// No more candidates to try
219+
break;
220+
}
221+
222+
const candidate = candidateResult[0];
223+
const candidateId = candidate.id as string;
224+
const orderingKey = candidate.ordering_key as string;
225+
attemptedOrderingKeys.add(orderingKey);
226+
227+
// Try to acquire the advisory lock (exactly once)
228+
const lockResult = await this.#sql`
229+
SELECT pg_try_advisory_lock(
230+
hashtext(${this.#tableName}),
231+
hashtext(${orderingKey})
232+
) AS acquired
233+
`;
234+
235+
if (lockResult[0].acquired) {
236+
try {
237+
// We have the lock, now delete and process the message
238+
const deleteResult = await this.#sql`
239+
DELETE FROM ${this.#sql(this.#tableName)}
240+
WHERE id = ${candidateId}
241+
RETURNING message, ordering_key
242+
`;
243+
244+
for (const row of deleteResult) {
245+
if (signal?.aborted) return;
246+
await handler(row.message);
247+
processed = true;
248+
}
249+
} finally {
250+
// Always release the advisory lock
196251
await this.#sql`
197252
SELECT pg_advisory_unlock(
198253
hashtext(${this.#tableName}),
199254
hashtext(${orderingKey})
200-
);
255+
)
201256
`;
202257
}
258+
// If we processed a message, continue the outer loop
259+
if (processed) break;
203260
}
204-
i++;
261+
// Lock not acquired, try next candidate with different ordering key
205262
}
206-
signal?.removeEventListener("abort", cancel);
207-
if (i < 1) break;
263+
264+
// If we processed a message, continue the outer loop
265+
if (processed) continue;
266+
267+
// No messages to process, exit the loop
268+
break;
208269
}
209270
};
210271
const timeouts = new Set<ReturnType<typeof setTimeout>>();

0 commit comments

Comments
 (0)