Skip to content

Commit bd7ac00

Browse files
matthvclaude
authored andcommitted
feat(workflow-executor): add graceful shutdown with in-flight step drain
- stop() now drains in-flight steps before closing resources - Add Runner.state getter: idle β†’ running β†’ draining β†’ stopped - Add stopTimeoutMs config (default 30s) to prevent hanging on stuck steps - Convert inFlightSteps from Set to Map to track step promises - HTTP server stays up during drain for frontend access - Add Logger.info optional method for drain status messages - 7 new tests: drain, timeout, state transitions, log messages fixes PRD-241 Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent cf8e699 commit bd7ac00

6 files changed

Lines changed: 249 additions & 9 deletions

File tree

β€Žpackages/workflow-executor/CLAUDE.mdβ€Ž

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ Front ◀──▢ Orchestrator ◀──pull/push──▢ Executor ──
4343
```
4444
src/
4545
β”œβ”€β”€ errors.ts # WorkflowExecutorError, MissingToolCallError, MalformedToolCallError, NoRecordsError, NoReadableFieldsError, NoWritableFieldsError, NoActionsError, StepPersistenceError, NoRelationshipFieldsError, RelatedRecordNotFoundError
46-
β”œβ”€β”€ runner.ts # Runner class β€” main entry point (start/stop/triggerPoll, HTTP server wiring)
46+
β”œβ”€β”€ runner.ts # Runner class β€” main entry point (start/stop/triggerPoll, HTTP server wiring, graceful drain)
4747
β”œβ”€β”€ types/ # Core type definitions (@draft)
4848
β”‚ β”œβ”€β”€ step-definition.ts # StepType enum + step definition interfaces
4949
β”‚ β”œβ”€β”€ step-outcome.ts # Step outcome tracking types (StepOutcome, sent to orchestrator)
@@ -54,6 +54,10 @@ src/
5454
β”‚ β”œβ”€β”€ agent-port.ts # Interface to the Forest Admin agent (datasource)
5555
β”‚ β”œβ”€β”€ workflow-port.ts # Interface to the orchestrator
5656
β”‚ └── run-store.ts # Interface for persisting run state (scoped to a run)
57+
β”œβ”€β”€ stores/ # RunStore implementations
58+
β”‚ β”œβ”€β”€ in-memory-store.ts # InMemoryStore β€” Map-based, for tests
59+
β”‚ β”œβ”€β”€ database-store.ts # DatabaseStore β€” Sequelize + umzug migrations
60+
β”‚ └── build-run-store.ts # Factory functions: buildDatabaseRunStore, buildInMemoryRunStore
5761
β”œβ”€β”€ adapters/ # Port implementations
5862
β”‚ β”œβ”€β”€ agent-client-agent-port.ts # AgentPort via @forestadmin/agent-client
5963
β”‚ └── forest-server-workflow-port.ts # WorkflowPort via HTTP (forestadmin-client ServerUtils)
@@ -83,6 +87,7 @@ src/
8387
- **displayName in AI tools** β€” All `DynamicStructuredTool` schemas and system message prompts must use `displayName`, never `fieldName`. `displayName` is a Forest Admin frontend feature that replaces the technical field/relation/action name with a product-oriented label configured by the Forest Admin admin. End users write their workflow prompts using these display names, not the underlying technical names. After an AI tool call returns display names, map them back to `fieldName`/`name` before using them in datasource operations (e.g. filtering record values, calling `getRecord`).
8488
- **No recovery/retry** β€” Once the executor returns a step result to the orchestrator, the step is considered executed. There is no mechanism to re-dispatch a step, so executors must NOT include recovery checks (e.g. checking the RunStore for cached results before executing). Each step executes exactly once.
8589
- **Fetched steps must be executed** β€” Any step retrieved from the orchestrator via `getPendingStepExecutions()` must be executed. Silently discarding a fetched step (e.g. filtering it out by `runId` after fetching) violates the executor contract: the orchestrator assumes execution is guaranteed once the step is dispatched. The only valid filter before executing is deduplication via `inFlightSteps` (to avoid running the same step twice concurrently).
90+
- **Graceful shutdown** β€” `stop()` drains in-flight steps before closing resources. The `state` getter exposes the lifecycle: `idle β†’ running β†’ draining β†’ stopped`. `stopTimeoutMs` (default 30s) prevents `stop()` from hanging forever if a step is stuck. The HTTP server stays up during drain so the frontend can still query run status. Signal handling (`SIGTERM`/`SIGINT`) is the consumer's responsibility β€” the Runner is a library class.
8691

8792
## Commands
8893

β€Žpackages/workflow-executor/src/adapters/console-logger.tsβ€Ž

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,4 +4,8 @@ export default class ConsoleLogger implements Logger {
44
error(message: string, context: Record<string, unknown>): void {
55
console.error(JSON.stringify({ message, timestamp: new Date().toISOString(), ...context }));
66
}
7+
8+
info(message: string, context: Record<string, unknown>): void {
9+
console.info(JSON.stringify({ message, timestamp: new Date().toISOString(), ...context }));
10+
}
711
}

β€Žpackages/workflow-executor/src/index.tsβ€Ž

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ export { default as ForestServerWorkflowPort } from './adapters/forest-server-wo
101101
export { default as ExecutorHttpServer } from './http/executor-http-server';
102102
export type { ExecutorHttpServerOptions } from './http/executor-http-server';
103103
export { default as Runner } from './runner';
104-
export type { RunnerConfig } from './runner';
104+
export type { RunnerConfig, RunnerState } from './runner';
105105
export { default as validateSecrets } from './validate-secrets';
106106
export { default as SchemaCache } from './schema-cache';
107107
export { default as InMemoryStore } from './stores/in-memory-store';
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
11
export interface Logger {
22
error(message: string, context: Record<string, unknown>): void;
3+
info?(message: string, context: Record<string, unknown>): void;
34
}

β€Žpackages/workflow-executor/src/runner.tsβ€Ž

Lines changed: 51 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ import ExecutorHttpServer from './http/executor-http-server';
2121
import patchBodySchemas from './pending-data-validators';
2222
import validateSecrets from './validate-secrets';
2323

24+
export type RunnerState = 'idle' | 'running' | 'draining' | 'stopped';
25+
2426
export interface RunnerConfig {
2527
agentPort: AgentPort;
2628
workflowPort: WorkflowPort;
@@ -32,15 +34,19 @@ export interface RunnerConfig {
3234
authSecret: string;
3335
logger?: Logger;
3436
httpPort?: number;
37+
stopTimeoutMs?: number;
3538
}
3639

40+
const DEFAULT_STOP_TIMEOUT_MS = 30_000;
41+
3742
export default class Runner {
3843
private readonly config: RunnerConfig;
3944
private httpServer: ExecutorHttpServer | null = null;
4045
private pollingTimer: NodeJS.Timeout | null = null;
41-
private readonly inFlightSteps = new Set<string>();
46+
private readonly inFlightSteps = new Map<string, Promise<void>>();
4247
private isRunning = false;
4348
private readonly logger: Logger;
49+
private _state: RunnerState = 'idle';
4450

4551
private static stepKey(step: PendingStepExecution): string {
4652
return `${step.runId}:${step.stepId}`;
@@ -51,12 +57,17 @@ export default class Runner {
5157
this.logger = config.logger ?? new ConsoleLogger();
5258
}
5359

60+
get state(): RunnerState {
61+
return this._state;
62+
}
63+
5464
async start(): Promise<void> {
5565
if (this.isRunning) return;
5666

5767
validateSecrets({ envSecret: this.config.envSecret, authSecret: this.config.authSecret });
5868

5969
this.isRunning = true;
70+
this._state = 'running';
6071

6172
try {
6273
await this.config.runStore.init(this.logger);
@@ -74,20 +85,48 @@ export default class Runner {
7485
}
7586
} catch (error) {
7687
this.isRunning = false;
88+
this._state = 'idle';
7789
throw error;
7890
}
7991

8092
this.schedulePoll();
8193
}
8294

8395
async stop(): Promise<void> {
96+
this._state = 'draining';
8497
this.isRunning = false;
8598

8699
if (this.pollingTimer !== null) {
87100
clearTimeout(this.pollingTimer);
88101
this.pollingTimer = null;
89102
}
90103

104+
// Drain in-flight steps
105+
if (this.inFlightSteps.size > 0) {
106+
this.logger.info?.('Draining in-flight steps', {
107+
count: this.inFlightSteps.size,
108+
steps: [...this.inFlightSteps.keys()],
109+
});
110+
111+
const timeout = this.config.stopTimeoutMs ?? DEFAULT_STOP_TIMEOUT_MS;
112+
const drainResult = await Promise.race([
113+
Promise.allSettled(this.inFlightSteps.values()).then(() => 'drained' as const),
114+
new Promise<'timeout'>(resolve => {
115+
setTimeout(() => resolve('timeout'), timeout);
116+
}),
117+
]);
118+
119+
if (drainResult === 'timeout') {
120+
this.logger.error('Drain timeout β€” steps still in flight', {
121+
remainingSteps: [...this.inFlightSteps.keys()],
122+
timeoutMs: timeout,
123+
});
124+
} else {
125+
this.logger.info?.('All in-flight steps drained', {});
126+
}
127+
}
128+
129+
// Close resources after drain
91130
if (this.httpServer) {
92131
await this.httpServer.stop();
93132
this.httpServer = null;
@@ -98,7 +137,7 @@ export default class Runner {
98137
this.config.runStore.close(this.logger),
99138
]);
100139

101-
// TODO: graceful drain of in-flight steps (out of scope PRD-223)
140+
this._state = 'stopped';
102141
}
103142

104143
async getRunStepExecutions(runId: string): Promise<StepExecutionData[]> {
@@ -189,10 +228,18 @@ export default class Runner {
189228
return this.config.aiClient.loadRemoteTools(mergedConfig);
190229
}
191230

192-
private async executeStep(step: PendingStepExecution): Promise<void> {
231+
private executeStep(step: PendingStepExecution): Promise<void> {
193232
const key = Runner.stepKey(step);
194-
this.inFlightSteps.add(key);
233+
const promise = this.doExecuteStep(step, key);
234+
this.inFlightSteps.set(key, promise);
195235

236+
return promise;
237+
}
238+
239+
private async doExecuteStep(
240+
step: PendingStepExecution,
241+
key: string,
242+
): Promise<void> {
196243
let result: StepExecutionResult;
197244

198245
try {

β€Žpackages/workflow-executor/test/runner.test.tsβ€Ž

Lines changed: 186 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -62,8 +62,8 @@ function createMockAiClient() {
6262
};
6363
}
6464

65-
function createMockLogger(): jest.Mocked<Logger> {
66-
return { error: jest.fn() };
65+
function createMockLogger(): jest.Mocked<Required<Logger>> {
66+
return { error: jest.fn(), info: jest.fn() };
6767
}
6868

6969
const VALID_ENV_SECRET = 'a'.repeat(64);
@@ -89,6 +89,7 @@ function createRunnerConfig(
8989
envSecret: string;
9090
authSecret: string;
9191
schemaCache: SchemaCache;
92+
stopTimeoutMs: number;
9293
}> = {},
9394
) {
9495
return {
@@ -178,7 +179,7 @@ beforeEach(() => {
178179

179180
afterEach(async () => {
180181
// eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
181-
if (runner) {
182+
if (runner && runner.state !== 'stopped') {
182183
await runner.stop();
183184
(runner as Runner | undefined) = undefined;
184185
}
@@ -285,6 +286,188 @@ describe('stop', () => {
285286
});
286287
});
287288

289+
// ---------------------------------------------------------------------------
290+
// Graceful shutdown
291+
// ---------------------------------------------------------------------------
292+
293+
describe('graceful shutdown', () => {
294+
it('state transitions: idle β†’ running β†’ draining β†’ stopped', async () => {
295+
runner = new Runner(createRunnerConfig());
296+
297+
expect(runner.state).toBe('idle');
298+
299+
await runner.start();
300+
expect(runner.state).toBe('running');
301+
302+
const stopPromise = runner.stop();
303+
expect(runner.state).toBe('draining');
304+
305+
await stopPromise;
306+
expect(runner.state).toBe('stopped');
307+
});
308+
309+
it('state resets to idle on start failure', async () => {
310+
const config = createRunnerConfig();
311+
(config.runStore.init as jest.Mock).mockRejectedValueOnce(new Error('init failed'));
312+
runner = new Runner(config);
313+
314+
await expect(runner.start()).rejects.toThrow('init failed');
315+
expect(runner.state).toBe('idle');
316+
});
317+
318+
it('stop() waits for in-flight steps before resolving', async () => {
319+
let resolveStep!: () => void;
320+
const stepPromise = new Promise<void>(resolve => {
321+
resolveStep = resolve;
322+
});
323+
324+
const workflowPort = createMockWorkflowPort();
325+
workflowPort.getPendingStepExecutions.mockResolvedValueOnce([
326+
makePendingStep({ runId: 'run-1', stepId: 'step-1' }),
327+
]);
328+
329+
jest.spyOn(StepExecutorFactory, 'create').mockResolvedValueOnce({
330+
execute: () =>
331+
stepPromise.then(() => ({
332+
stepOutcome: { type: 'condition', stepId: 'step-1', stepIndex: 0, status: 'success' },
333+
})),
334+
} as never);
335+
336+
runner = new Runner(createRunnerConfig({ workflowPort }));
337+
await runner.start();
338+
339+
jest.advanceTimersByTime(POLLING_INTERVAL_MS);
340+
await flushPromises();
341+
342+
let stopResolved = false;
343+
const stopPromise = runner.stop().then(() => {
344+
stopResolved = true;
345+
});
346+
347+
// stop() should not resolve while step is in flight
348+
await flushPromises();
349+
expect(stopResolved).toBe(false);
350+
351+
// Resolve the step
352+
resolveStep();
353+
await stopPromise;
354+
expect(stopResolved).toBe(true);
355+
});
356+
357+
it('stop() resolves after timeout when step is stuck', async () => {
358+
const workflowPort = createMockWorkflowPort();
359+
const logger = createMockLogger();
360+
workflowPort.getPendingStepExecutions.mockResolvedValueOnce([
361+
makePendingStep({ runId: 'run-1', stepId: 'stuck-step' }),
362+
]);
363+
364+
jest.spyOn(StepExecutorFactory, 'create').mockResolvedValueOnce({
365+
execute: () => new Promise(() => {}), // never resolves
366+
} as never);
367+
368+
runner = new Runner(createRunnerConfig({ workflowPort, logger, stopTimeoutMs: 50 }));
369+
await runner.start();
370+
371+
jest.advanceTimersByTime(POLLING_INTERVAL_MS);
372+
await flushPromises();
373+
374+
jest.useRealTimers();
375+
await runner.stop();
376+
jest.useFakeTimers();
377+
378+
expect(logger.error).toHaveBeenCalledWith(
379+
'Drain timeout β€” steps still in flight',
380+
expect.objectContaining({
381+
remainingSteps: ['run-1:stuck-step'],
382+
timeoutMs: 50,
383+
}),
384+
);
385+
expect(runner.state).toBe('stopped');
386+
});
387+
388+
it('stop() resolves immediately when no steps are in flight', async () => {
389+
const logger = createMockLogger();
390+
runner = new Runner(createRunnerConfig({ logger }));
391+
await runner.start();
392+
await runner.stop();
393+
394+
expect(logger.info).not.toHaveBeenCalledWith('Draining in-flight steps', expect.anything());
395+
expect(runner.state).toBe('stopped');
396+
});
397+
398+
it('HTTP server is closed after drain completes', async () => {
399+
let resolveStep!: () => void;
400+
const stepPromise = new Promise<void>(resolve => {
401+
resolveStep = resolve;
402+
});
403+
404+
const workflowPort = createMockWorkflowPort();
405+
workflowPort.getPendingStepExecutions.mockResolvedValueOnce([
406+
makePendingStep({ runId: 'run-1', stepId: 'step-1' }),
407+
]);
408+
409+
jest.spyOn(StepExecutorFactory, 'create').mockResolvedValueOnce({
410+
execute: () =>
411+
stepPromise.then(() => ({
412+
stepOutcome: { type: 'condition', stepId: 'step-1', stepIndex: 0, status: 'success' },
413+
})),
414+
} as never);
415+
416+
runner = new Runner(createRunnerConfig({ workflowPort, httpPort: 3100 }));
417+
await runner.start();
418+
419+
jest.advanceTimersByTime(POLLING_INTERVAL_MS);
420+
await flushPromises();
421+
422+
const stopPromise = runner.stop();
423+
await flushPromises();
424+
425+
// HTTP server should NOT have been stopped yet (drain in progress)
426+
expect(MockedExecutorHttpServer.prototype.stop).not.toHaveBeenCalled();
427+
428+
resolveStep();
429+
await stopPromise;
430+
431+
// Now HTTP server should be stopped
432+
expect(MockedExecutorHttpServer.prototype.stop).toHaveBeenCalled();
433+
});
434+
435+
it('logs drain info when steps are in flight', async () => {
436+
let resolveStep!: () => void;
437+
const stepPromise = new Promise<void>(resolve => {
438+
resolveStep = resolve;
439+
});
440+
441+
const workflowPort = createMockWorkflowPort();
442+
const logger = createMockLogger();
443+
workflowPort.getPendingStepExecutions.mockResolvedValueOnce([
444+
makePendingStep({ runId: 'run-1', stepId: 'step-1' }),
445+
]);
446+
447+
jest.spyOn(StepExecutorFactory, 'create').mockResolvedValueOnce({
448+
execute: () =>
449+
stepPromise.then(() => ({
450+
stepOutcome: { type: 'condition', stepId: 'step-1', stepIndex: 0, status: 'success' },
451+
})),
452+
} as never);
453+
454+
runner = new Runner(createRunnerConfig({ workflowPort, logger }));
455+
await runner.start();
456+
457+
jest.advanceTimersByTime(POLLING_INTERVAL_MS);
458+
await flushPromises();
459+
460+
resolveStep();
461+
await runner.stop();
462+
463+
expect(logger.info).toHaveBeenCalledWith('Draining in-flight steps', {
464+
count: 1,
465+
steps: ['run-1:step-1'],
466+
});
467+
expect(logger.info).toHaveBeenCalledWith('All in-flight steps drained', {});
468+
});
469+
});
470+
288471
// ---------------------------------------------------------------------------
289472
// Polling loop
290473
// ---------------------------------------------------------------------------

0 commit comments

Comments
Β (0)