Skip to content

Commit dbef72c

Browse files
Prevent unbounded alarm scheduling promise chain in ActorSqlite.
When user code repeatedly moves a Durable Object alarm to a later time and scheduleRun calls to the alarm manager take longer than local SQLite commits (common in production), the alarmLaterChain promise chain grew without bound. This caused alarms to fire at times far in the past and could block SRS commits for arbitrarily long when a subsequent setAlarm moved the alarm earlier. Replace the unbounded chain with a coalescing mechanism: at most one scheduleRun is in-flight at a time, and rapid intermediate alarm times are collapsed into a single pending value.
1 parent b5dbc80 commit dbef72c

3 files changed

Lines changed: 210 additions & 39 deletions

File tree

src/workerd/io/actor-sqlite-test.c++

Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@ static constexpr kj::Date twoMs = 2 * kj::MILLISECONDS + kj::UNIX_EPOCH;
2222
static constexpr kj::Date threeMs = 3 * kj::MILLISECONDS + kj::UNIX_EPOCH;
2323
static constexpr kj::Date fourMs = 4 * kj::MILLISECONDS + kj::UNIX_EPOCH;
2424
static constexpr kj::Date fiveMs = 5 * kj::MILLISECONDS + kj::UNIX_EPOCH;
25+
static constexpr kj::Date sixMs = 6 * kj::MILLISECONDS + kj::UNIX_EPOCH;
26+
static constexpr kj::Date tenMs = 10 * kj::MILLISECONDS + kj::UNIX_EPOCH;
2527
// Used as the "current time" parameter for armAlarmHandler in tests.
2628
// Set to epoch (before all test alarm times) so existing tests aren't affected by
2729
// the overdue alarm check.
@@ -1143,6 +1145,134 @@ KJ_TEST("rejected move-later alarm scheduling request does not break gate") {
11431145
test.pollAndExpectCalls({"commit"})[0]->fulfill();
11441146
}
11451147

1148+
KJ_TEST("rapid move-later alarm changes coalesce into bounded scheduleRun calls") {
1149+
// When many commits each move the alarm time later while a scheduleRun is already in-flight,
1150+
// the scheduleLaterAlarm mechanism should coalesce them into at most one pending request,
1151+
// rather than chaining N promises (one per commit).
1152+
ActorSqliteTest test;
1153+
1154+
// Initialize alarm state to 1ms.
1155+
test.setAlarm(oneMs);
1156+
test.pollAndExpectCalls({"scheduleRun(1ms)"})[0]->fulfill();
1157+
test.pollAndExpectCalls({"commit"})[0]->fulfill();
1158+
test.pollAndExpectCalls({});
1159+
KJ_ASSERT(expectSync(test.getAlarm()) == oneMs);
1160+
1161+
// Move alarm to 2ms. The db commit completes, triggering a post-commit scheduleRun(2ms)
1162+
// since the alarm moved later.
1163+
test.setAlarm(twoMs);
1164+
test.pollAndExpectCalls({"commit"})[0]->fulfill();
1165+
KJ_ASSERT(expectSync(test.getAlarm()) == twoMs);
1166+
// The first move-later scheduleRun starts.
1167+
auto fulfiller2Ms = kj::mv(test.pollAndExpectCalls({"scheduleRun(2ms)"})[0]);
1168+
1169+
// While 2ms scheduleRun is in-flight, move alarm to 3ms, 4ms, 5ms in rapid succession.
1170+
// Each commit completes immediately but the scheduleRun for 2ms is still pending.
1171+
// Only the final value (5ms) should be scheduled after the 2ms scheduleRun completes.
1172+
test.setAlarm(threeMs);
1173+
test.pollAndExpectCalls({"commit"})[0]->fulfill();
1174+
test.pollAndExpectCalls({}); // No new scheduleRun -- coalesced into pending.
1175+
KJ_ASSERT(expectSync(test.getAlarm()) == threeMs);
1176+
1177+
test.setAlarm(fourMs);
1178+
test.pollAndExpectCalls({"commit"})[0]->fulfill();
1179+
test.pollAndExpectCalls({}); // No new scheduleRun -- coalesced into pending.
1180+
KJ_ASSERT(expectSync(test.getAlarm()) == fourMs);
1181+
1182+
test.setAlarm(fiveMs);
1183+
test.pollAndExpectCalls({"commit"})[0]->fulfill();
1184+
test.pollAndExpectCalls({}); // No new scheduleRun -- coalesced into pending.
1185+
KJ_ASSERT(expectSync(test.getAlarm()) == fiveMs);
1186+
1187+
// Now fulfill the 2ms scheduleRun. The coalesced pending time (5ms) should be scheduled next.
1188+
fulfiller2Ms->fulfill();
1189+
auto fulfiller5Ms = kj::mv(test.pollAndExpectCalls({"scheduleRun(5ms)"})[0]);
1190+
// Importantly, there is exactly one scheduleRun(5ms), not three separate calls for 3ms, 4ms, 5ms.
1191+
1192+
fulfiller5Ms->fulfill();
1193+
test.pollAndExpectCalls({});
1194+
1195+
KJ_ASSERT(expectSync(test.getAlarm()) == fiveMs);
1196+
}
1197+
1198+
KJ_TEST("armAlarmHandler with coalesced pending alarms schedules reschedule exactly once") {
1199+
// Verifies two properties:
1200+
// 1. No duplicate scheduleRun(6ms): armAlarmHandler clears pendingLaterAlarmTime so the
1201+
// FORK_A completion handler does not re-issue it.
1202+
// 2. Future commits (10ms) that arrive after armAlarmHandler fires are correctly handled:
1203+
// they queue in pendingLaterAlarmTime, get picked up by FORK_A's completion handler,
1204+
// and chain off FORK_B (armAlarmHandler's fork) so the order is 3ms -> 6ms -> 10ms.
1205+
ActorSqliteTest test;
1206+
1207+
// Initialize alarm to 1ms and fully commit it so lastConfirmedAlarmDbState = 1ms.
1208+
test.setAlarm(oneMs);
1209+
test.pollAndExpectCalls({"scheduleRun(1ms)"})[0]->fulfill();
1210+
test.pollAndExpectCalls({"commit"})[0]->fulfill();
1211+
test.pollAndExpectCalls({});
1212+
KJ_ASSERT(expectSync(test.getAlarm()) == oneMs);
1213+
1214+
// Move alarm to 3ms -- scheduleRun(3ms) goes in-flight via scheduleLaterAlarm.
1215+
// alarmLaterIsInFlight=true, alarmLaterInFlight=FORK_A.
1216+
test.setAlarm(threeMs);
1217+
test.pollAndExpectCalls({"commit"})[0]->fulfill();
1218+
auto fulfiller3Ms = kj::mv(test.pollAndExpectCalls({"scheduleRun(3ms)"})[0]);
1219+
1220+
// While 3ms scheduleRun is in-flight, rapidly move to 4ms then 6ms.
1221+
// Both coalesce into pendingLaterAlarmTime=6ms; no new scheduleRun issued.
1222+
test.setAlarm(fourMs);
1223+
test.pollAndExpectCalls({"commit"})[0]->fulfill();
1224+
test.pollAndExpectCalls({});
1225+
1226+
test.setAlarm(sixMs);
1227+
test.pollAndExpectCalls({"commit"})[0]->fulfill();
1228+
test.pollAndExpectCalls({});
1229+
KJ_ASSERT(expectSync(test.getAlarm()) == sixMs);
1230+
1231+
// The 1ms alarm fires. armAlarmHandler sees scheduledTime=1ms, localAlarmState=6ms.
1232+
// willFireEarlier(1ms, 6ms) => reschedule-later path:
1233+
// requestScheduledAlarm(6ms, FORK_A.addBranch()) called synchronously -> FORK_B
1234+
// pendingLaterAlarmTime cleared to kj::none
1235+
// alarmLaterInFlight = FORK_B
1236+
// alarmLaterIsInFlight unchanged (still true, owned by FORK_A lifecycle)
1237+
auto armResult = test.actor.armAlarmHandler(oneMs, nullptr, testCurrentTime);
1238+
KJ_ASSERT(armResult.is<ActorSqlite::CancelAlarmHandler>());
1239+
auto& cancelResult = armResult.get<ActorSqlite::CancelAlarmHandler>();
1240+
1241+
// scheduleRun(6ms) issued exactly once -- synchronously inside armAlarmHandler.
1242+
auto fulfiller6Ms = kj::mv(test.pollAndExpectCalls({"scheduleRun(6ms)"})[0]);
1243+
1244+
// Commit for 10ms arrives while scheduleRun(3ms) is still in-flight.
1245+
// alarmLaterIsInFlight=true (FORK_A lifecycle still active) so 10ms is correctly
1246+
// queued: pendingLaterAlarmTime=Some(10ms). FORK_B is referenced by alarmLaterInFlight.
1247+
test.setAlarm(tenMs);
1248+
test.pollAndExpectCalls({"commit"})[0]->fulfill();
1249+
test.pollAndExpectCalls({}); // No scheduleRun yet -- coalesced into pending.
1250+
1251+
// Fulfill scheduleRun(3ms). FORK_A resolves and FORK_A's completion handler fires:
1252+
// alarmLaterIsInFlight=false
1253+
// pendingLaterAlarmTime=Some(10ms) -> scheduleLaterAlarm(10ms)
1254+
// requestScheduledAlarm(10ms, FORK_B.addBranch()) -> FORK_C chains off FORK_B
1255+
// scheduleRun(10ms) issued synchronously
1256+
// Importantly: scheduleRun(6ms) is NOT issued again here -- pendingLaterAlarmTime
1257+
// held 10ms (not 6ms), because armAlarmHandler had already cleared the 6ms.
1258+
fulfiller3Ms->fulfill();
1259+
auto fulfiller10Ms = kj::mv(test.pollAndExpectCalls({"scheduleRun(10ms)"})[0]);
1260+
1261+
// Fulfill scheduleRun(6ms). FORK_B resolves cleanly -- it has no completion handler,
1262+
// so overwriting alarmLaterInFlight with FORK_C is safe: FORK_C captured a branch of
1263+
// FORK_B as priorTask before the field was overwritten, keeping FORK_B alive. FORK_B
1264+
// resolving propagates into FORK_C's priorTask silently. No new scheduleRun here.
1265+
fulfiller6Ms->fulfill();
1266+
KJ_ASSERT(cancelResult.waitBeforeCancel.poll(test.ws));
1267+
test.pollAndExpectCalls({});
1268+
1269+
// Fulfill scheduleRun(10ms). FORK_C resolves and its completion handler fires with no
1270+
// pending times. Done.
1271+
fulfiller10Ms->fulfill();
1272+
test.pollAndExpectCalls({});
1273+
KJ_ASSERT(expectSync(test.getAlarm()) == tenMs);
1274+
}
1275+
11461276
KJ_TEST("an exception thrown during merged commits does not hang") {
11471277
ActorSqliteTest test({.monitorOutputGate = false});
11481278

src/workerd/io/actor-sqlite.c++

Lines changed: 61 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -383,19 +383,51 @@ kj::Promise<void> ActorSqlite::requestScheduledAlarm(
383383
});
384384
}
385385

386+
void ActorSqlite::scheduleLaterAlarm(kj::Maybe<kj::Date> newAlarmTime, SpanParent parentSpan) {
387+
if (alarmLaterIsInFlight) {
388+
// There's already a move-later request in-flight. Just store the desired time; the in-flight
389+
// request's completion handler will pick it up and start a new request. This overwrites any
390+
// previously pending time, which is fine -- only the latest value matters.
391+
pendingLaterAlarmTime = newAlarmTime;
392+
return;
393+
}
394+
395+
alarmLaterIsInFlight = true;
396+
alarmLaterInFlight = requestScheduledAlarm(newAlarmTime, alarmLaterInFlight.addBranch())
397+
.attach(parentSpan.newChild("actor_sqlite_alarm_sync"_kjc))
398+
.catch_([](kj::Exception&& e) {
399+
// If an exception occurs when scheduling the alarm later, it's OK -- the alarm will
400+
// eventually fire at the earlier time, and the rescheduling will be retried.
401+
// We catch here to prevent the chain from breaking on errors.
402+
LOG_WARNING_PERIODICALLY("NOSENTRY SQLite reschedule later alarm failed", e);
403+
}).fork();
404+
405+
commitTasks.add(alarmLaterInFlight.addBranch()
406+
.then([this]() {
407+
alarmLaterIsInFlight = false;
408+
KJ_IF_SOME(nextTime, kj::mv(pendingLaterAlarmTime)) {
409+
scheduleLaterAlarm(nextTime, nullptr);
410+
}
411+
}).catch_([](kj::Exception&& e) {
412+
// Move-later alarm failures are non-fatal; catch here to prevent taskFailed() from
413+
// breaking the output gate.
414+
LOG_WARNING_PERIODICALLY("NOSENTRY SQLite reschedule later alarm drain failed", e);
415+
}));
416+
}
417+
386418
ActorSqlite::PrecommitAlarmState ActorSqlite::startPrecommitAlarmScheduling() {
387419
PrecommitAlarmState state;
388420
if (pendingCommit == kj::none &&
389421
willFireEarlier(metadata.getAlarm(), alarmScheduledNoLaterThan)) {
390-
// We must wait on the `alarmLaterChain` here, otherwise, if there is a pending "move later"
391-
// alarm task and it fails, our "move earlier" alarm might interleave, succeed, and be followed
392-
// by a retry of the "move later" alarm. This happens because "move later" alarms complete after
393-
// we commit to local SQLite.
422+
// We must wait on the `alarmLaterInFlight` promise here, otherwise, if there is a pending
423+
// "move later" alarm task and it fails, our "move earlier" alarm might interleave, succeed,
424+
// and be followed by a retry of the "move later" alarm. This happens because "move later"
425+
// alarms complete after we commit to local SQLite.
394426
//
395-
// By waiting on any pending "move later" alarm, we correctly serialize our `scheduleRun()`
427+
// By waiting on any in-flight "move later" alarm, we correctly serialize our `scheduleRun()`
396428
// calls to the alarm manager.
397429
state.schedulingPromise =
398-
requestScheduledAlarm(metadata.getAlarm(), alarmLaterChain.addBranch());
430+
requestScheduledAlarm(metadata.getAlarm(), alarmLaterInFlight.addBranch());
399431
}
400432
return kj::mv(state);
401433
}
@@ -466,17 +498,18 @@ kj::Promise<void> ActorSqlite::commitImpl(
466498
KJ_LOG(WARNING, "NOSENTRY DEBUG_ALARM: Move earlier loop iteration", syncIterations,
467499
logDate(currentAlarmState), logDate(alarmScheduledNoLaterThan), alarmVersion);
468500
}
469-
// Note that we do not pass alarmLaterChain here. We don't need to for the following reasons:
501+
// Note that we do not pass alarmLaterInFlight here. We don't need to for the following
502+
// reasons:
470503
//
471-
// 1. We already waited for the chain in the precommitAlarmState promise above.
504+
// 1. We already waited for it in the precommitAlarmState promise above.
472505
// 2. We set the `pendingCommit` prior to yielding to the event loop earlier, so any subsequent
473506
// commits have to wait for us to fulfill the pendingCommit promise. In short, no one could
474-
// have added another "move-later" alarm to the chain, not until we finish.
507+
// have started another "move-later" alarm, not until we finish.
475508
//
476-
// While we *could* pass the alarmLaterChain promise (it wouldn't be incorrect), when calling
477-
// addBranch() on a resolved ForkedPromise, the continuation would be evaluated on a future turn
478-
// of the event loop. That means we're going to suspend, even if the promise is ready, which
479-
// means we'd take a performance hit.
509+
// While we *could* pass the alarmLaterInFlight promise (it wouldn't be incorrect), when
510+
// calling addBranch() on a resolved ForkedPromise, the continuation would be evaluated on a
511+
// future turn of the event loop. That means we're going to suspend, even if the promise is
512+
// ready, which means we'd take a performance hit.
480513
co_await requestScheduledAlarm(metadata.getAlarm(), kj::READY_NOW);
481514
syncIterations++;
482515
}
@@ -535,20 +568,7 @@ kj::Promise<void> ActorSqlite::commitImpl(
535568
KJ_LOG(WARNING, "NOSENTRY DEBUG_ALARM: Moving alarm later", "sqlite_has",
536569
logDate(alarmStateForCommit), logDate(alarmScheduledNoLaterThan), alarmVersion);
537570
}
538-
// We need to extend our alarmLaterChain now that we're adding a new "move-later" alarm task.
539-
//
540-
// Technically, we don't need serialize our "move-later" alarms since SQLite has the later
541-
// time committed locally. We could just set the `alarmLaterChain` and pass a `kj::READY_NOW`
542-
// to requestScheduledAlarm, and so if we have a partial failure we would just recover when
543-
// the alarm runs early. That said, it doesn't hurt to serialize on the client-side.
544-
alarmLaterChain = requestScheduledAlarm(alarmStateForCommit, alarmLaterChain.addBranch())
545-
.attach(commitSpan.newChild("actor_sqlite_alarm_sync"_kjc))
546-
.catch_([](kj::Exception&& e) {
547-
// If an exception occurs when scheduling the alarm later, it's OK -- the alarm will
548-
// eventually fire at the earlier time, and the rescheduling will be retried.
549-
// We catch here to prevent the chain from breaking on errors.
550-
LOG_WARNING_PERIODICALLY("NOSENTRY SQLite reschedule later alarm failed", e);
551-
}).fork();
571+
scheduleLaterAlarm(alarmStateForCommit, SpanParent(commitSpan));
552572
}
553573
}
554574
}
@@ -945,14 +965,20 @@ kj::OneOf<ActorSqlite::CancelAlarmHandler, ActorSqlite::RunAlarmHandler> ActorSq
945965
"NOSENTRY SQLite alarm handler canceled with requestScheduledAlarm.", scheduledTime,
946966
localAlarmState.orDefault(kj::UNIX_EPOCH), actorId);
947967

948-
// Since we're requesting to move the alarm time to later, we need to add to our
949-
// `alarmLaterChain`. Note that for the chain, we want to make sure any scheduling failure
950-
// does not break us, but for the `CancelAlarmHandler`, we want the caller to receive the
951-
// exception normally, so we do not consume the exception.
968+
// Since we're requesting to move the alarm time to later, we need to update the
969+
// alarmLaterInFlight promise. We issue a single requestScheduledAlarm call, fork it,
970+
// and use one branch for tracking (with error catching) and the other for the caller
971+
// (which propagates errors). Note that we directly update alarmLaterInFlight here
972+
// rather than using scheduleLaterAlarm(), because we need a separate un-caught branch
973+
// of the promise for the CancelAlarmHandler return value.
952974
auto schedulingPromise =
953-
requestScheduledAlarm(localAlarmState, alarmLaterChain.addBranch()).fork();
954-
alarmLaterChain = schedulingPromise.addBranch()
955-
.catch_([](kj::Exception&& e) {
975+
requestScheduledAlarm(localAlarmState, alarmLaterInFlight.addBranch()).fork();
976+
// Clear any stale pending time so that when the existing completion handler
977+
// fires it does not start a redundant scheduleLaterAlarm for the same time that
978+
// armAlarmHandler is already scheduling.
979+
pendingLaterAlarmTime = kj::none;
980+
alarmLaterInFlight = schedulingPromise.addBranch()
981+
.catch_([](kj::Exception&& e) {
956982
// If an exception occurs when scheduling the alarm later, it's OK -- the alarm will
957983
// eventually fire at the earlier time, and the rescheduling will be retried.
958984
// We catch here to prevent the chain from breaking on errors.
@@ -978,7 +1004,7 @@ kj::OneOf<ActorSqlite::CancelAlarmHandler, ActorSqlite::RunAlarmHandler> ActorSq
9781004
// handler invocation.
9791005
//
9801006
// We pass kj::READY_NOW because being in this branch (SQLite is ahead of the alarm manager)
981-
// means there's no recent move-later operation to wait for, so no need for alarmLaterChain.
1007+
// means there's no recent move-later operation to wait for, so no need for alarmLaterInFlight.
9821008
return CancelAlarmHandler{
9831009
.waitBeforeCancel = requestScheduledAlarm(localAlarmState, kj::READY_NOW)};
9841010
}

src/workerd/io/actor-sqlite.h

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,8 @@ class ActorSqlite final: public ActorCacheInterface, private kj::TaskSet::ErrorH
2828
// Makes a request to the alarm manager to run the alarm handler at the given time, returning
2929
// a promise that resolves when the scheduling has succeeded. `priorTask` is any work we must
3030
// wait on prior to scheduling the new request; as of this writing, this would be the
31-
// alarmLaterChain, which holds promises to move the alarm time "later" than is currently set.
31+
// alarmLaterInFlight promise, which tracks any in-flight request to move the alarm "later"
32+
// than is currently set.
3233
virtual kj::Promise<void> scheduleRun(
3334
kj::Maybe<kj::Date> newAlarmTime, kj::Promise<void> priorTask);
3435

@@ -263,9 +264,18 @@ class ActorSqlite final: public ActorCacheInterface, private kj::TaskSet::ErrorH
263264
// for the output gate lock hold trace when a non-allowUnconfirmed write occurs.
264265
SpanParent currentCommitSpan = nullptr;
265266

266-
// Promise chain for serializing "move alarm later" operations to prevent races
267-
// at the alarm manager. Each update waits for the previous one to complete.
268-
kj::ForkedPromise<void> alarmLaterChain = kj::Promise<void>(kj::READY_NOW).fork();
267+
// Promise for the currently in-flight "move alarm later" operation, if any.
268+
// Used to serialize move-earlier operations against any pending move-later operation.
269+
kj::ForkedPromise<void> alarmLaterInFlight = kj::Promise<void>(kj::READY_NOW).fork();
270+
271+
// True when a "move alarm later" request is currently in-flight via scheduleLaterAlarm().
272+
bool alarmLaterIsInFlight = false;
273+
274+
// When a "move alarm later" request is already in-flight and we need to schedule another one,
275+
// we store the desired alarm time here. When the in-flight request completes, it checks this variable and
276+
// starts a new request if needed. The outer Maybe indicates whether there is a pending time at
277+
// all; the inner Maybe<Date> is the alarm time to set (where kj::none means "clear the alarm").
278+
kj::Maybe<kj::Maybe<kj::Date>> pendingLaterAlarmTime;
269279

270280
// Version counter that increments on every alarm change. Used to detect if another commit
271281
// modified the alarm while we were async, allowing us to skip redundant post-commit alarm
@@ -286,6 +296,11 @@ class ActorSqlite final: public ActorCacheInterface, private kj::TaskSet::ErrorH
286296
kj::Promise<void> requestScheduledAlarm(
287297
kj::Maybe<kj::Date> requestedTime, kj::Promise<void> priorTask);
288298

299+
// Schedules a "move alarm later" operation. If no move-later is currently in-flight, starts one
300+
// immediately. If one is already in-flight, stores the desired time in `pendingLaterAlarmTime`
301+
// so it will be picked up when the current in-flight operation completes.
302+
void scheduleLaterAlarm(kj::Maybe<kj::Date> newAlarmTime, SpanParent parentSpan);
303+
289304
struct PrecommitAlarmState {
290305
// Promise for the completion of precommit alarm scheduling
291306
kj::Maybe<kj::Promise<void>> schedulingPromise;

0 commit comments

Comments
 (0)