Skip to content

Commit a52c00a

Browse files
authored
refactor(workflow-executor): encapsulate pending-data business logic in Runner (#1503)
1 parent 6e7858a commit a52c00a

7 files changed

Lines changed: 318 additions & 411 deletions

File tree

packages/workflow-executor/CLAUDE.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,9 @@ src/
7676
- **Privacy** — Zero client data leaves the client's infrastructure. `StepOutcome` is sent to the orchestrator and must NEVER contain client data. Privacy-sensitive information (e.g. AI reasoning) must stay in `StepExecutionData` (persisted in the RunStore, client-side only).
7777
- **Ports (IO injection)** — All external IO goes through injected port interfaces, keeping the core pure and testable.
7878
- **AI integration** — Uses `@langchain/core` (`BaseChatModel`, `DynamicStructuredTool`) for AI-powered steps. `ExecutionContext.model` is a `BaseChatModel`.
79-
- **Error hierarchy** — All domain errors must extend `WorkflowExecutorError` (defined in `src/errors.ts`). This ensures executors can distinguish domain errors (caught → error outcome) from infrastructure errors (uncaught → propagate to caller). Never throw a plain `Error` for a domain error case.
79+
- **Error hierarchy** — Two families of errors coexist in `src/errors.ts`:
80+
- **Domain errors** (`extends WorkflowExecutorError`) — Thrown during step execution (e.g. `RecordNotFoundError`, `MissingToolCallError`). Caught by `base-step-executor.ts` and converted into `stepOutcome.error` sent to the orchestrator. All domain errors must extend `WorkflowExecutorError`.
81+
- **Boundary errors** (`extends Error`) — Thrown outside step execution, at the HTTP or Runner layer (e.g. `RunNotFoundError`, `PendingDataNotFoundError`, `ConfigurationError`). Caught by the HTTP server and translated into HTTP status codes (404, 400, etc.). These intentionally do NOT extend `WorkflowExecutorError` to prevent `base-step-executor` from catching them as step failures.
8082
- **Dual error messages**`WorkflowExecutorError` carries two messages: `message` (technical, for dev logs) and `userMessage` (human-readable, surfaced to the Forest Admin UI via `stepOutcome.error`). The mapping happens in a single place: `base-step-executor.ts` uses `error.userMessage` when building the error outcome. When adding a new error subclass, always provide a distinct `userMessage` oriented toward end-users (no collection names, field names, or AI internals). If `userMessage` is omitted in the constructor call, it falls back to `message`.
8183
- **displayName in AI tools** — All `DynamicStructuredTool` schemas and system message prompts must use `displayName`, never `fieldName`. `displayName` is a Forest Admin frontend feature that replaces the technical field/relation/action name with a product-oriented label configured by the Forest Admin admin. End users write their workflow prompts using these display names, not the underlying technical names. After an AI tool call returns display names, map them back to `fieldName`/`name` before using them in datasource operations (e.g. filtering record values, calling `getRecord`).
8284
- **No recovery/retry** — Once the executor returns a step result to the orchestrator, the step is considered executed. There is no mechanism to re-dispatch a step, so executors must NOT include recovery checks (e.g. checking the RunStore for cached results before executing). Each step executes exactly once.

packages/workflow-executor/src/errors.ts

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -215,3 +215,26 @@ export class RunNotFoundError extends Error {
215215
if (cause !== undefined) this.cause = cause;
216216
}
217217
}
218+
219+
export class PendingDataNotFoundError extends Error {
220+
constructor(runId: string, stepIndex: number) {
221+
super(`Step ${stepIndex} in run "${runId}" not found or has no pending data`);
222+
this.name = 'PendingDataNotFoundError';
223+
}
224+
}
225+
226+
/** Minimal mirror of ZodIssue — avoids importing Zod types into errors.ts. */
227+
export interface ValidationIssue {
228+
path: (string | number)[];
229+
message: string;
230+
code: string;
231+
}
232+
233+
export class InvalidPendingDataError extends WorkflowExecutorError {
234+
readonly issues: ValidationIssue[];
235+
236+
constructor(issues: ValidationIssue[]) {
237+
super('Invalid pending data', 'The request body is invalid.');
238+
this.issues = issues;
239+
}
240+
}

packages/workflow-executor/src/http/executor-http-server.ts

Lines changed: 16 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,6 @@
11
import type { Logger } from '../ports/logger-port';
2-
import type { RunStore } from '../ports/run-store';
32
import type { WorkflowPort } from '../ports/workflow-port';
43
import type Runner from '../runner';
5-
import type { StepExecutionData } from '../types/step-execution-data';
64
import type { Server } from 'http';
75

86
import bodyParser from '@koa/bodyparser';
@@ -11,12 +9,10 @@ import http from 'http';
119
import Koa from 'koa';
1210
import koaJwt from 'koa-jwt';
1311

14-
import { RunNotFoundError } from '../errors';
15-
import patchBodySchemas from './pending-data-validators';
12+
import { InvalidPendingDataError, PendingDataNotFoundError, RunNotFoundError } from '../errors';
1613

1714
export interface ExecutorHttpServerOptions {
1815
port: number;
19-
runStore: RunStore;
2016
runner: Runner;
2117
authSecret: string;
2218
workflowPort: WorkflowPort;
@@ -142,9 +138,7 @@ export default class ExecutorHttpServer {
142138
}
143139

144140
private async handleGetRun(ctx: Koa.Context): Promise<void> {
145-
const { runId } = ctx.params;
146-
const steps = await this.options.runStore.getStepExecutions(runId);
147-
141+
const steps = await this.options.runner.getRunStepExecutions(ctx.params.runId);
148142
ctx.body = { steps };
149143
}
150144

@@ -179,36 +173,26 @@ export default class ExecutorHttpServer {
179173
return;
180174
}
181175

182-
const stepExecutions = await this.options.runStore.getStepExecutions(runId);
183-
const execution = stepExecutions.find(e => e.stepIndex === stepIndex);
184-
const schema = execution ? patchBodySchemas[execution.type] : undefined;
185-
186-
if (
187-
!execution ||
188-
!schema ||
189-
!('pendingData' in execution) ||
190-
execution.pendingData === undefined
191-
) {
192-
ctx.status = 404;
193-
ctx.body = { error: 'Step execution not found or has no pending data' };
176+
try {
177+
await this.options.runner.patchPendingData(runId, stepIndex, ctx.request.body);
178+
} catch (err) {
179+
if (err instanceof PendingDataNotFoundError) {
180+
ctx.status = 404;
181+
ctx.body = { error: 'Step execution not found or has no pending data' };
194182

195-
return;
196-
}
183+
return;
184+
}
197185

198-
const parsed = schema.safeParse(ctx.request.body);
186+
if (err instanceof InvalidPendingDataError) {
187+
ctx.status = 400;
188+
ctx.body = { error: 'Invalid request body', details: err.issues };
199189

200-
if (!parsed.success) {
201-
ctx.status = 400;
202-
ctx.body = { error: 'Invalid request body', details: parsed.error.issues };
190+
return;
191+
}
203192

204-
return;
193+
throw err;
205194
}
206195

207-
await this.options.runStore.saveStepExecution(runId, {
208-
...execution,
209-
pendingData: { ...(execution.pendingData as object), ...(parsed.data as object) },
210-
} as StepExecutionData);
211-
212196
ctx.status = 204;
213197
}
214198
}

packages/workflow-executor/src/http/pending-data-validators.ts renamed to packages/workflow-executor/src/pending-data-validators.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import type { StepExecutionData } from '../types/step-execution-data';
1+
import type { StepExecutionData } from './types/step-execution-data';
22

33
import { z } from 'zod';
44

packages/workflow-executor/src/runner.ts

Lines changed: 43 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,19 @@ import type { Logger } from './ports/logger-port';
44
import type { RunStore } from './ports/run-store';
55
import type { McpConfiguration, WorkflowPort } from './ports/workflow-port';
66
import type { PendingStepExecution, StepExecutionResult } from './types/execution';
7+
import type { StepExecutionData } from './types/step-execution-data';
78
import type { AiClient, RemoteTool } from '@forestadmin/ai-proxy';
89

910
import ConsoleLogger from './adapters/console-logger';
10-
import { RunNotFoundError, causeMessage } from './errors';
11+
import {
12+
InvalidPendingDataError,
13+
PendingDataNotFoundError,
14+
RunNotFoundError,
15+
causeMessage,
16+
} from './errors';
1117
import StepExecutorFactory from './executors/step-executor-factory';
1218
import ExecutorHttpServer from './http/executor-http-server';
19+
import patchBodySchemas from './pending-data-validators';
1320
import validateSecrets from './validate-secrets';
1421

1522
export interface RunnerConfig {
@@ -64,7 +71,6 @@ export default class Runner {
6471
if (this.config.httpPort !== undefined && !this.httpServer) {
6572
const server = new ExecutorHttpServer({
6673
port: this.config.httpPort,
67-
runStore: this.config.runStore,
6874
runner: this,
6975
authSecret: this.config.authSecret,
7076
workflowPort: this.config.workflowPort,
@@ -102,6 +108,41 @@ export default class Runner {
102108
// TODO: graceful drain of in-flight steps (out of scope PRD-223)
103109
}
104110

111+
async getRunStepExecutions(runId: string): Promise<StepExecutionData[]> {
112+
return this.config.runStore.getStepExecutions(runId);
113+
}
114+
115+
async patchPendingData(runId: string, stepIndex: number, body: unknown): Promise<void> {
116+
const stepExecutions = await this.config.runStore.getStepExecutions(runId);
117+
const execution = stepExecutions.find(e => e.stepIndex === stepIndex);
118+
const schema = execution ? patchBodySchemas[execution.type] : undefined;
119+
120+
// pendingData is typed as T | undefined; null is not expected (RunStore never persists null)
121+
// but `== null` guards against both for safety.
122+
if (!execution || !schema || !('pendingData' in execution) || execution.pendingData == null) {
123+
throw new PendingDataNotFoundError(runId, stepIndex);
124+
}
125+
126+
const parsed = schema.safeParse(body);
127+
128+
if (!parsed.success) {
129+
throw new InvalidPendingDataError(
130+
parsed.error.issues.map(({ path, message, code }) => ({
131+
path: path as (string | number)[],
132+
message,
133+
code,
134+
})),
135+
);
136+
}
137+
138+
// Cast is safe: the type guard above ensures `execution` is the correct union branch,
139+
// and patchBodySchemas[execution.type] only accepts keys valid for that branch.
140+
await this.config.runStore.saveStepExecution(runId, {
141+
...execution,
142+
pendingData: { ...(execution.pendingData as object), ...(parsed.data as object) },
143+
} as StepExecutionData);
144+
}
145+
105146
async triggerPoll(runId: string): Promise<void> {
106147
const step = await this.config.workflowPort.getPendingStepExecutionsForRun(runId);
107148

0 commit comments

Comments
 (0)