Skip to content

Commit bb8cfea

Browse files
matthvalban bertolini
andcommitted
feat(workflow-executor): add graceful shutdown with in-flight step drain (#1512)
Co-authored-by: alban bertolini <[email protected]>
1 parent 87118c1 commit bb8cfea

13 files changed

Lines changed: 619 additions & 99 deletions

packages/workflow-executor/CLAUDE.md

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ Front ◀──▶ Orchestrator ◀──pull/push──▶ Executor ──
4343
```
4444
src/
4545
├── errors.ts # WorkflowExecutorError, MissingToolCallError, MalformedToolCallError, NoRecordsError, NoReadableFieldsError, NoWritableFieldsError, NoActionsError, StepPersistenceError, NoRelationshipFieldsError, RelatedRecordNotFoundError
46-
├── runner.ts # Runner class — main entry point (start/stop/triggerPoll, HTTP server wiring)
46+
├── runner.ts # Runner class — main entry point (start/stop/triggerPoll, HTTP server wiring, graceful drain)
4747
├── types/ # Core type definitions (@draft)
4848
│ ├── step-definition.ts # StepType enum + step definition interfaces
4949
│ ├── step-outcome.ts # Step outcome tracking types (StepOutcome, sent to orchestrator)
@@ -54,6 +54,10 @@ src/
5454
│ ├── agent-port.ts # Interface to the Forest Admin agent (datasource)
5555
│ ├── workflow-port.ts # Interface to the orchestrator
5656
│ └── run-store.ts # Interface for persisting run state (scoped to a run)
57+
├── stores/ # RunStore implementations
58+
│ ├── in-memory-store.ts # InMemoryStore — Map-based, for tests
59+
│ ├── database-store.ts # DatabaseStore — Sequelize + umzug migrations
60+
│ └── build-run-store.ts # Factory functions: buildDatabaseRunStore, buildInMemoryRunStore
5761
├── adapters/ # Port implementations
5862
│ ├── agent-client-agent-port.ts # AgentPort via @forestadmin/agent-client
5963
│ └── forest-server-workflow-port.ts # WorkflowPort via HTTP (forestadmin-client ServerUtils)
@@ -83,6 +87,7 @@ src/
8387
- **displayName in AI tools** — All `DynamicStructuredTool` schemas and system message prompts must use `displayName`, never `fieldName`. `displayName` is a Forest Admin frontend feature that replaces the technical field/relation/action name with a product-oriented label configured by the Forest Admin admin. End users write their workflow prompts using these display names, not the underlying technical names. After an AI tool call returns display names, map them back to `fieldName`/`name` before using them in datasource operations (e.g. filtering record values, calling `getRecord`).
8488
- **No recovery/retry** — Once the executor returns a step result to the orchestrator, the step is considered executed. There is no mechanism to re-dispatch a step, so executors must NOT include recovery checks (e.g. checking the RunStore for cached results before executing). Each step executes exactly once.
8589
- **Fetched steps must be executed** — Any step retrieved from the orchestrator via `getPendingStepExecutions()` must be executed. Silently discarding a fetched step (e.g. filtering it out by `runId` after fetching) violates the executor contract: the orchestrator assumes execution is guaranteed once the step is dispatched. The only valid filter before executing is deduplication via `inFlightSteps` (to avoid running the same step twice concurrently).
90+
- **Graceful shutdown**`stop()` drains in-flight steps before closing resources. The `state` getter exposes the lifecycle: `idle → running → draining → stopped`. `stopTimeoutMs` (default 30s) prevents `stop()` from hanging forever if a step is stuck. The HTTP server stays up during drain so the frontend can still query run status. Signal handling (`SIGTERM`/`SIGINT`) is the consumer's responsibility — the Runner is a library class.
8691

8792
## Commands
8893

packages/workflow-executor/src/adapters/console-logger.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,4 +4,8 @@ export default class ConsoleLogger implements Logger {
44
error(message: string, context: Record<string, unknown>): void {
55
console.error(JSON.stringify({ message, timestamp: new Date().toISOString(), ...context }));
66
}
7+
8+
info(message: string, context: Record<string, unknown>): void {
9+
console.info(JSON.stringify({ message, timestamp: new Date().toISOString(), ...context }));
10+
}
711
}

packages/workflow-executor/src/build-workflow-executor.ts

Lines changed: 101 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,41 +1,48 @@
11
import type { Logger } from './ports/logger-port';
2+
import type { RunnerState } from './runner';
23
import type { AiConfiguration } from '@forestadmin/ai-proxy';
34
import type { Options as SequelizeOptions } from 'sequelize';
45

56
import { AiClient } from '@forestadmin/ai-proxy';
67
import { Sequelize } from 'sequelize';
78

89
import AgentClientAgentPort from './adapters/agent-client-agent-port';
10+
import ConsoleLogger from './adapters/console-logger';
911
import ForestServerWorkflowPort from './adapters/forest-server-workflow-port';
12+
import ExecutorHttpServer from './http/executor-http-server';
1013
import Runner from './runner';
1114
import SchemaCache from './schema-cache';
1215
import DatabaseStore from './stores/database-store';
1316
import InMemoryStore from './stores/in-memory-store';
1417

1518
const DEFAULT_FOREST_SERVER_URL = 'https://api.forestadmin.com';
1619
const DEFAULT_POLLING_INTERVAL_MS = 5000;
20+
const FORCE_EXIT_DELAY_MS = 5000;
1721

1822
export interface WorkflowExecutor {
1923
start(): Promise<void>;
2024
stop(): Promise<void>;
25+
readonly state: RunnerState;
2126
}
2227

2328
export interface ExecutorOptions {
2429
envSecret: string;
2530
authSecret: string;
2631
agentUrl: string;
32+
httpPort: number;
2733
forestServerUrl?: string;
2834
aiConfigurations: AiConfiguration[];
2935
pollingIntervalMs?: number;
30-
httpPort?: number;
3136
logger?: Logger;
37+
stopTimeoutMs?: number;
3238
}
3339

3440
export type DatabaseExecutorOptions = ExecutorOptions &
3541
({ database: SequelizeOptions & { uri: string } } | { database: SequelizeOptions });
3642

3743
function buildCommonDependencies(options: ExecutorOptions) {
3844
const forestServerUrl = options.forestServerUrl ?? DEFAULT_FOREST_SERVER_URL;
45+
const logger = options.logger ?? new ConsoleLogger();
3946

4047
const workflowPort = new ForestServerWorkflowPort({
4148
envSecret: options.envSecret,
@@ -59,27 +66,114 @@ function buildCommonDependencies(options: ExecutorOptions) {
5966
schemaCache,
6067
workflowPort,
6168
aiClient,
69+
logger,
6270
pollingIntervalMs: options.pollingIntervalMs ?? DEFAULT_POLLING_INTERVAL_MS,
6371
envSecret: options.envSecret,
6472
authSecret: options.authSecret,
65-
httpPort: options.httpPort,
66-
logger: options.logger,
73+
stopTimeoutMs: options.stopTimeoutMs,
74+
};
75+
}
76+
77+
function createWorkflowExecutor(
78+
runner: Runner,
79+
server: ExecutorHttpServer,
80+
logger: Logger,
81+
): WorkflowExecutor {
82+
let shutdownPromise: Promise<void> | null = null;
83+
84+
const shutdown = async () => {
85+
try {
86+
await server.stop();
87+
} catch (err) {
88+
logger.error('HTTP server close failed during shutdown', {
89+
error: err instanceof Error ? err.message : String(err),
90+
});
91+
}
92+
93+
await runner.stop();
94+
};
95+
96+
const onSignal = async () => {
97+
logger.info?.('Received shutdown signal, stopping gracefully...', {});
98+
99+
try {
100+
if (!shutdownPromise) shutdownPromise = shutdown();
101+
await shutdownPromise;
102+
process.exitCode = 0;
103+
} catch (error) {
104+
logger.error('Graceful shutdown failed', {
105+
error: error instanceof Error ? error.message : String(error),
106+
});
107+
process.exitCode = 1;
108+
}
109+
110+
// Safety net: force exit if the event loop doesn't drain
111+
// eslint-disable-next-line no-console
112+
setTimeout(() => {
113+
logger.error('Process did not exit after shutdown — forcing exit', {});
114+
process.exit(process.exitCode ?? 1);
115+
}, FORCE_EXIT_DELAY_MS).unref();
116+
};
117+
118+
return {
119+
get state() {
120+
return runner.state;
121+
},
122+
123+
async start() {
124+
await runner.start();
125+
await server.start();
126+
127+
process.on('SIGTERM', onSignal);
128+
process.on('SIGINT', onSignal);
129+
},
130+
131+
async stop() {
132+
process.removeListener('SIGTERM', onSignal);
133+
process.removeListener('SIGINT', onSignal);
134+
135+
if (!shutdownPromise) shutdownPromise = shutdown();
136+
await shutdownPromise;
137+
},
67138
};
68139
}
69140

70141
export function buildInMemoryExecutor(options: ExecutorOptions): WorkflowExecutor {
71-
return new Runner({
72-
...buildCommonDependencies(options),
142+
const deps = buildCommonDependencies(options);
143+
144+
const runner = new Runner({
145+
...deps,
73146
runStore: new InMemoryStore(),
74147
});
148+
149+
const server = new ExecutorHttpServer({
150+
port: options.httpPort,
151+
runner,
152+
authSecret: options.authSecret,
153+
workflowPort: deps.workflowPort,
154+
logger: deps.logger,
155+
});
156+
157+
return createWorkflowExecutor(runner, server, deps.logger);
75158
}
76159

77160
export function buildDatabaseExecutor(options: DatabaseExecutorOptions): WorkflowExecutor {
161+
const deps = buildCommonDependencies(options);
78162
const { uri, ...sequelizeOptions } = options.database as SequelizeOptions & { uri?: string };
79163
const sequelize = uri ? new Sequelize(uri, sequelizeOptions) : new Sequelize(sequelizeOptions);
80164

81-
return new Runner({
82-
...buildCommonDependencies(options),
165+
const runner = new Runner({
166+
...deps,
83167
runStore: new DatabaseStore({ sequelize }),
84168
});
169+
170+
const server = new ExecutorHttpServer({
171+
port: options.httpPort,
172+
runner,
173+
authSecret: options.authSecret,
174+
workflowPort: deps.workflowPort,
175+
logger: deps.logger,
176+
});
177+
178+
return createWorkflowExecutor(runner, server, deps.logger);
85179
}

packages/workflow-executor/src/http/executor-http-server.ts

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,19 @@ export default class ExecutorHttpServer {
6262
}
6363
});
6464

65+
// Health endpoint — before JWT so it's publicly accessible (infra probes don't send tokens)
66+
this.app.use(async (ctx, next) => {
67+
if (ctx.method === 'GET' && ctx.path === '/health') {
68+
const { state } = this.options.runner;
69+
ctx.status = state === 'running' || state === 'draining' ? 200 : 503;
70+
ctx.body = { state };
71+
72+
return;
73+
}
74+
75+
await next();
76+
});
77+
6578
this.app.use(bodyParser());
6679

6780
// JWT middleware — validates Bearer token using authSecret

packages/workflow-executor/src/index.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ export { default as ForestServerWorkflowPort } from './adapters/forest-server-wo
101101
export { default as ExecutorHttpServer } from './http/executor-http-server';
102102
export type { ExecutorHttpServerOptions } from './http/executor-http-server';
103103
export { default as Runner } from './runner';
104-
export type { RunnerConfig } from './runner';
104+
export type { RunnerConfig, RunnerState } from './runner';
105105
export { default as validateSecrets } from './validate-secrets';
106106
export { default as SchemaCache } from './schema-cache';
107107
export { default as InMemoryStore } from './stores/in-memory-store';
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
11
export interface Logger {
22
error(message: string, context: Record<string, unknown>): void;
3+
info?(message: string, context: Record<string, unknown>): void;
34
}

0 commit comments

Comments
 (0)