Skip to content

Commit cf8e699

Browse files
authored
feat(workflow-executor): add buildInMemoryExecutor and buildDatabaseExecutor factories (#1510)
1 parent cf7d069 commit cf8e699

37 files changed

Lines changed: 2156 additions & 509 deletions

WORKFLOW-EXECUTOR-CONTRACT.md

Lines changed: 43 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
# Workflow Executor — Contract Types
22

33
> Types exchanged between the **orchestrator (server)**, the **executor (agent-nodejs)**, and the **frontend**.
4-
> Last updated: 2026-03-25
4+
> Last updated: 2026-03-26
55
66
---
77

@@ -12,18 +12,59 @@
1212
The executor polls for the current pending step of a run. The server must return **one object** (not an array), or `null` if the run is not found.
1313

1414
```typescript
15+
interface StepUser {
16+
id: number;
17+
email: string;
18+
firstName: string;
19+
lastName: string;
20+
team: string;
21+
renderingId: number;
22+
role: string;
23+
permissionLevel: string;
24+
tags: Record<string, string>;
25+
}
26+
1527
interface PendingStepExecution {
1628
runId: string;
1729
stepId: string;
1830
stepIndex: number;
1931
baseRecordRef: RecordRef;
2032
stepDefinition: StepDefinition;
2133
previousSteps: Step[];
34+
user: StepUser; // identity of the user who initiated the step
2235
}
2336
```
2437

2538
> **`null` response** → executor throws `RunNotFoundError` → HTTP 404 returned to caller.
2639
40+
### CollectionSchema
41+
42+
Schema of a collection, returned by the orchestrator via `GET /liana/v1/collections/:collectionName`. Used by the executor to resolve primary keys and action endpoints.
43+
44+
```typescript
45+
interface CollectionSchema {
46+
collectionName: string;
47+
collectionDisplayName: string;
48+
primaryKeyFields: string[];
49+
fields: FieldSchema[];
50+
actions: ActionSchema[];
51+
}
52+
53+
interface FieldSchema {
54+
fieldName: string;
55+
displayName: string;
56+
isRelationship: boolean;
57+
relationType?: "BelongsTo" | "HasMany" | "HasOne";
58+
relatedCollectionName?: string;
59+
}
60+
61+
interface ActionSchema {
62+
name: string;
63+
displayName: string;
64+
endpoint: string; // route path used by the agent to execute the action
65+
}
66+
```
67+
2768
### RecordRef
2869

2970
Lightweight pointer to a specific record.
@@ -97,7 +138,7 @@ interface ConditionStepOutcome {
97138
type: "condition";
98139
stepId: string;
99140
stepIndex: number;
100-
status: "success" | "error" | "manual-decision";
141+
status: "success" | "error";
101142
selectedOption?: string; // present when status = "success"
102143
error?: string; // present when status = "error"
103144
}

packages/workflow-executor/src/adapters/agent-client-agent-port.ts

Lines changed: 69 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,13 @@ import type {
55
GetRelatedDataQuery,
66
UpdateRecordQuery,
77
} from '../ports/agent-port';
8-
import type { CollectionSchema } from '../types/record';
9-
import type { RemoteAgentClient, SelectOptions } from '@forestadmin/agent-client';
8+
import type SchemaCache from '../schema-cache';
9+
import type { StepUser } from '../types/execution';
10+
import type { CollectionSchema, RecordData } from '../types/record';
11+
import type { SelectOptions } from '@forestadmin/agent-client';
12+
13+
import { createRemoteAgentClient } from '@forestadmin/agent-client';
14+
import jsonwebtoken from 'jsonwebtoken';
1015

1116
import { RecordNotFoundError } from '../errors';
1217

@@ -41,20 +46,20 @@ function extractRecordId(
4146
}
4247

4348
export default class AgentClientAgentPort implements AgentPort {
44-
private readonly client: RemoteAgentClient;
45-
private readonly collectionSchemas: Record<string, CollectionSchema>;
46-
47-
constructor(params: {
48-
client: RemoteAgentClient;
49-
collectionSchemas: Record<string, CollectionSchema>;
50-
}) {
51-
this.client = params.client;
52-
this.collectionSchemas = params.collectionSchemas;
49+
private readonly agentUrl: string;
50+
private readonly authSecret: string;
51+
private readonly schemaCache: SchemaCache;
52+
53+
constructor(params: { agentUrl: string; authSecret: string; schemaCache: SchemaCache }) {
54+
this.agentUrl = params.agentUrl;
55+
this.authSecret = params.authSecret;
56+
this.schemaCache = params.schemaCache;
5357
}
5458

55-
async getRecord({ collection, id, fields }: GetRecordQuery) {
59+
async getRecord({ collection, id, fields }: GetRecordQuery, user: StepUser): Promise<RecordData> {
60+
const client = this.createClient(user);
5661
const schema = this.resolveSchema(collection);
57-
const records = await this.client.collection(collection).list<Record<string, unknown>>({
62+
const records = await client.collection(collection).list<Record<string, unknown>>({
5863
filters: buildPkFilter(schema.primaryKeyFields, id),
5964
pagination: { size: 1, number: 1 },
6065
...(fields?.length && { fields }),
@@ -67,18 +72,26 @@ export default class AgentClientAgentPort implements AgentPort {
6772
return { collectionName: collection, recordId: id, values: records[0] };
6873
}
6974

70-
async updateRecord({ collection, id, values }: UpdateRecordQuery) {
71-
const updatedRecord = await this.client
75+
async updateRecord(
76+
{ collection, id, values }: UpdateRecordQuery,
77+
user: StepUser,
78+
): Promise<RecordData> {
79+
const client = this.createClient(user);
80+
const updatedRecord = await client
7281
.collection(collection)
7382
.update<Record<string, unknown>>(encodePk(id), values);
7483

7584
return { collectionName: collection, recordId: id, values: updatedRecord };
7685
}
7786

78-
async getRelatedData({ collection, id, relation, limit, fields }: GetRelatedDataQuery) {
87+
async getRelatedData(
88+
{ collection, id, relation, limit, fields }: GetRelatedDataQuery,
89+
user: StepUser,
90+
): Promise<RecordData[]> {
91+
const client = this.createClient(user);
7992
const relatedSchema = this.resolveSchema(relation);
8093

81-
const records = await this.client
94+
const records = await client
8295
.collection(collection)
8396
.relation(relation, encodePk(id))
8497
.list<Record<string, unknown>>({
@@ -93,26 +106,55 @@ export default class AgentClientAgentPort implements AgentPort {
93106
}));
94107
}
95108

96-
async executeAction({ collection, action, id }: ExecuteActionQuery): Promise<unknown> {
109+
async executeAction(
110+
{ collection, action, id }: ExecuteActionQuery,
111+
user: StepUser,
112+
): Promise<unknown> {
113+
const client = this.createClient(user);
97114
const encodedId = id?.length ? [encodePk(id)] : [];
98-
const act = await this.client.collection(collection).action(action, { recordIds: encodedId });
115+
const act = await client.collection(collection).action(action, { recordIds: encodedId });
99116

100117
return act.execute();
101118
}
102119

103-
private resolveSchema(collectionName: string): CollectionSchema {
104-
const schema = this.collectionSchemas[collectionName];
120+
private createClient(user: StepUser) {
121+
const token = jsonwebtoken.sign({ ...user, scope: 'step-execution' }, this.authSecret, {
122+
expiresIn: '5m',
123+
});
124+
125+
return createRemoteAgentClient({
126+
url: this.agentUrl,
127+
token,
128+
actionEndpoints: this.buildActionEndpoints(),
129+
});
130+
}
131+
132+
private buildActionEndpoints() {
133+
const endpoints: Record<string, Record<string, { name: string; endpoint: string }>> = {};
134+
135+
for (const [collectionName, schema] of this.schemaCache) {
136+
endpoints[collectionName] = {};
105137

106-
if (!schema) {
107-
return {
138+
for (const action of schema.actions) {
139+
endpoints[collectionName][action.name] = {
140+
name: action.name,
141+
endpoint: action.endpoint,
142+
};
143+
}
144+
}
145+
146+
return endpoints;
147+
}
148+
149+
private resolveSchema(collectionName: string): CollectionSchema {
150+
return (
151+
this.schemaCache.get(collectionName) ?? {
108152
collectionName,
109153
collectionDisplayName: collectionName,
110154
primaryKeyFields: ['id'],
111155
fields: [],
112156
actions: [],
113-
};
114-
}
115-
116-
return schema;
157+
}
158+
);
117159
}
118160
}

packages/workflow-executor/src/adapters/forest-server-workflow-port.ts

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import type { McpConfiguration, WorkflowPort } from '../ports/workflow-port';
2-
import type { PendingStepExecution } from '../types/execution';
2+
import type { PendingStepExecution, StepUser } from '../types/execution';
33
import type { CollectionSchema } from '../types/record';
44
import type { StepOutcome } from '../types/step-outcome';
55
import type { HttpOptions } from '@forestadmin/forestadmin-client';
@@ -62,10 +62,8 @@ export default class ForestServerWorkflowPort implements WorkflowPort {
6262
}
6363

6464
// eslint-disable-next-line @typescript-eslint/no-unused-vars
65-
async hasRunAccess(_runId: string, _userToken: string): Promise<boolean> {
65+
async hasRunAccess(_runId: string, _user: StepUser): Promise<boolean> {
6666
// TODO: implement once GET /liana/v1/workflow-runs/:runId/access is available.
67-
// When live: call ServerUtils.query with extra header 'forest-user-token': userToken
68-
// to let the orchestrator verify ownership.
6967
return true;
7068
}
7169
}
Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
import type { Logger } from './ports/logger-port';
2+
import type { AiConfiguration } from '@forestadmin/ai-proxy';
3+
import type { Options as SequelizeOptions } from 'sequelize';
4+
5+
import { AiClient } from '@forestadmin/ai-proxy';
6+
import { Sequelize } from 'sequelize';
7+
8+
import AgentClientAgentPort from './adapters/agent-client-agent-port';
9+
import ForestServerWorkflowPort from './adapters/forest-server-workflow-port';
10+
import Runner from './runner';
11+
import SchemaCache from './schema-cache';
12+
import DatabaseStore from './stores/database-store';
13+
import InMemoryStore from './stores/in-memory-store';
14+
15+
const DEFAULT_FOREST_SERVER_URL = 'https://api.forestadmin.com';
16+
const DEFAULT_POLLING_INTERVAL_MS = 5000;
17+
18+
export interface WorkflowExecutor {
19+
start(): Promise<void>;
20+
stop(): Promise<void>;
21+
}
22+
23+
export interface ExecutorOptions {
24+
envSecret: string;
25+
authSecret: string;
26+
agentUrl: string;
27+
forestServerUrl?: string;
28+
aiConfigurations: AiConfiguration[];
29+
pollingIntervalMs?: number;
30+
httpPort?: number;
31+
logger?: Logger;
32+
}
33+
34+
export type DatabaseExecutorOptions = ExecutorOptions &
35+
({ database: SequelizeOptions & { uri: string } } | { database: SequelizeOptions });
36+
37+
function buildCommonDependencies(options: ExecutorOptions) {
38+
const forestServerUrl = options.forestServerUrl ?? DEFAULT_FOREST_SERVER_URL;
39+
40+
const workflowPort = new ForestServerWorkflowPort({
41+
envSecret: options.envSecret,
42+
forestServerUrl,
43+
});
44+
45+
const aiClient = new AiClient({
46+
aiConfigurations: options.aiConfigurations,
47+
});
48+
49+
const schemaCache = new SchemaCache();
50+
51+
const agentPort = new AgentClientAgentPort({
52+
agentUrl: options.agentUrl,
53+
authSecret: options.authSecret,
54+
schemaCache,
55+
});
56+
57+
return {
58+
agentPort,
59+
schemaCache,
60+
workflowPort,
61+
aiClient,
62+
pollingIntervalMs: options.pollingIntervalMs ?? DEFAULT_POLLING_INTERVAL_MS,
63+
envSecret: options.envSecret,
64+
authSecret: options.authSecret,
65+
httpPort: options.httpPort,
66+
logger: options.logger,
67+
};
68+
}
69+
70+
export function buildInMemoryExecutor(options: ExecutorOptions): WorkflowExecutor {
71+
return new Runner({
72+
...buildCommonDependencies(options),
73+
runStore: new InMemoryStore(),
74+
});
75+
}
76+
77+
export function buildDatabaseExecutor(options: DatabaseExecutorOptions): WorkflowExecutor {
78+
const { uri, ...sequelizeOptions } = options.database as SequelizeOptions & { uri?: string };
79+
const sequelize = uri ? new Sequelize(uri, sequelizeOptions) : new Sequelize(sequelizeOptions);
80+
81+
return new Runner({
82+
...buildCommonDependencies(options),
83+
runStore: new DatabaseStore({ sequelize }),
84+
});
85+
}

packages/workflow-executor/src/errors.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -216,6 +216,13 @@ export class RunNotFoundError extends Error {
216216
}
217217
}
218218

219+
export class UserMismatchError extends Error {
220+
constructor(runId: string) {
221+
super(`User not authorized for run "${runId}"`);
222+
this.name = 'UserMismatchError';
223+
}
224+
}
225+
219226
export class PendingDataNotFoundError extends Error {
220227
constructor(runId: string, stepIndex: number) {
221228
super(`Step ${stepIndex} in run "${runId}" not found or has no pending data`);

packages/workflow-executor/src/executors/base-step-executor.ts

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,6 @@ export default abstract class BaseStepExecutor<TStep extends StepDefinition = St
2929

3030
protected readonly agentPort: AgentPort;
3131

32-
protected readonly schemaCache = new Map<string, CollectionSchema>();
33-
3432
constructor(context: ExecutionContext<TStep>) {
3533
this.context = context;
3634
this.agentPort = new SafeAgentPort(context.agentPort);
@@ -260,13 +258,13 @@ export default abstract class BaseStepExecutor<TStep extends StepDefinition = St
260258
return records[selectedIndex];
261259
}
262260

263-
/** Fetches a collection schema from WorkflowPort, with caching. */
261+
/** Fetches a collection schema from WorkflowPort, with TTL-based caching. */
264262
protected async getCollectionSchema(collectionName: string): Promise<CollectionSchema> {
265-
const cached = this.schemaCache.get(collectionName);
263+
const cached = this.context.schemaCache.get(collectionName);
266264
if (cached) return cached;
267265

268266
const schema = await this.context.workflowPort.getCollectionSchema(collectionName);
269-
this.schemaCache.set(collectionName, schema);
267+
this.context.schemaCache.set(collectionName, schema);
270268

271269
return schema;
272270
}

packages/workflow-executor/src/executors/condition-step-executor.ts

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import type { StepExecutionResult } from '../types/execution';
22
import type { ConditionStepDefinition } from '../types/step-definition';
3-
import type { ConditionStepStatus } from '../types/step-outcome';
3+
import type { BaseStepStatus } from '../types/step-outcome';
44

55
import { DynamicStructuredTool, HumanMessage, SystemMessage } from '@forestadmin/ai-proxy';
66
import { z } from 'zod';
@@ -38,7 +38,7 @@ const GATEWAY_SYSTEM_PROMPT = `You are an AI agent selecting the correct option
3838

3939
export default class ConditionStepExecutor extends BaseStepExecutor<ConditionStepDefinition> {
4040
protected buildOutcomeResult(outcome: {
41-
status: ConditionStepStatus;
41+
status: BaseStepStatus;
4242
error?: string;
4343
selectedOption?: string;
4444
}): StepExecutionResult {
@@ -97,7 +97,10 @@ export default class ConditionStepExecutor extends BaseStepExecutor<ConditionSte
9797
}
9898

9999
if (!selectedOption) {
100-
return this.buildOutcomeResult({ status: 'manual-decision' });
100+
return this.buildOutcomeResult({
101+
status: 'error',
102+
error: "The AI couldn't decide. Try rephrasing the step's prompt.",
103+
});
101104
}
102105

103106
return this.buildOutcomeResult({ status: 'success', selectedOption });

0 commit comments

Comments
 (0)