Skip to content
Open
Show file tree
Hide file tree
Changes from 14 commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
1107759
feat(workflow-executor): scaffold @forestadmin/workflow-executor package
matthv Mar 17, 2026
4510b7b
feat(workflow-executor): finalize scaffold — clean CLAUDE.md, remove …
Mar 17, 2026
17f26ca
fix(workflow-executor): address review — lint test dir, improve test,…
Mar 17, 2026
29f5646
feat(workflow-executor): define foundational types and port interface…
Scra3 Mar 18, 2026
127b579
feat(workflow-executor): implement condition step executor (AI-only) …
Scra3 Mar 18, 2026
cb8036b
feat(workflow-executor): implement AgentPort adapter using agent-clie…
matthv Mar 18, 2026
0ebae51
feat(ai-proxy): add programmatic API to aiClient (#1492)
Scra3 Mar 18, 2026
c25a953
feat(workflow-executor): implement WorkflowPort adapter using foresta…
matthv Mar 18, 2026
c9877fe
feat(workflow-executor): add ReadRecordStepExecutor (#1497)
Scra3 Mar 19, 2026
613ec1b
feat(workflow-executor): add HTTP server and WorkflowRunner scaffold …
matthv Mar 20, 2026
39de72a
refactor(workflow-executor): workflow steps (#1502)
Scra3 Mar 24, 2026
de39c30
feat(workflow-executor): add JWT auth and secret validation to HTTP s…
matthv Mar 24, 2026
9399bb7
refactor(workflow-executor): move userConfirmed to RunStore pendingDa…
Scra3 Mar 25, 2026
50583e8
feat(workflow-executor): add RunStore implementations (InMemoryStore …
matthv Mar 25, 2026
6e7858a
feat(workflow-executor): type-specific PATCH body validation (#1508)
Scra3 Mar 25, 2026
a52c00a
refactor(workflow-executor): encapsulate pending-data business logic …
Scra3 Mar 25, 2026
cf7d069
docs(workflow-executor): update contract to match implementation (#1511)
Scra3 Mar 25, 2026
cf8e699
feat(workflow-executor): add buildInMemoryExecutor and buildDatabaseE…
Scra3 Mar 26, 2026
7142e6c
feat(workflow-executor): add graceful shutdown with in-flight step dr…
matthv Mar 27, 2026
df7e43a
feat(workflow-executor): add info logging around step execution (#1514)
Scra3 Mar 30, 2026
6ed7182
feat(workflow-executor): add pre-recorded AI decisions for record ste…
Scra3 Mar 31, 2026
252b69d
refactor(workflow-executor): remove redundant Task from all naming (#…
Scra3 Mar 31, 2026
5ae1445
feat(workflow-executor): add example setup with Docker and buildDatab…
matthv Apr 1, 2026
78902c0
refactor(ai-proxy): align McpClient API with main branch
matthv Apr 1, 2026
5db77cd
style(ai-proxy): fix prettier formatting in ai-client.ts
matthv Apr 2, 2026
61abf3c
feat(workflow-executor): add sourceId to McpToolRef for MCP tool trac…
matthv Apr 2, 2026
f970c8a
style(workflow-executor): fix prettier formatting in tests
matthv Apr 2, 2026
6744696
fix(workflow-executor): guard selectedTool lookup and resolve related…
matthv Apr 2, 2026
dd78bf5
fix: merge main and resolve conflicts
matthv Apr 2, 2026
3c51658
fix(workflow-executor): reject non-integer record index in bounds check
matthv Apr 2, 2026
f815eca
feat(workflow-executor): server-side AI fallback (#1530)
Scra3 Apr 7, 2026
22450d1
feat(workflow-executor): add user context and date/time to AI prompts…
Scra3 Apr 7, 2026
1798709
feat(workflow-executor): add guidance step type (PRD-272) (#1532)
Scra3 Apr 7, 2026
13990f4
refactor(workflow-executor): use envSecret instead of JWT for server …
Scra3 Apr 7, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ jobs:
- plugin-aws-s3
- plugin-export-advanced
- plugin-flattener
- workflow-executor
steps:
- uses: actions/checkout@v4
- uses: actions/setup-node@v4
Expand Down
5 changes: 5 additions & 0 deletions CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,11 @@ yarn workspace @forestadmin/agent test
5. Are edge cases handled?
6. Is the naming clear and consistent?

## Git Workflow

The **main working branch** for workflow-executor development is `feat/prd-214-setup-workflow-executor-package`.
All feature branches for this area should be based on and PRs targeted at this branch (not `main`).

## Linear Tickets

### MCP Setup
Expand Down
232 changes: 232 additions & 0 deletions WORKFLOW-EXECUTOR-CONTRACT.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,232 @@
# Workflow Executor — Contract Types

> Types exchanged between the **orchestrator (server)**, the **executor (agent-nodejs)**, and the **frontend**.
> Last updated: 2026-03-24

---

## 1. Polling

**`GET /liana/v1/workflow-step-executions/pending?runId=<runId>`**

The executor polls for the current pending step of a run. The server must return **one object** (not an array), or `null` if the run is not found.

```typescript
interface PendingStepExecution {
runId: string;
stepId: string;
stepIndex: number;
baseRecordRef: RecordRef;
stepDefinition: StepDefinition;
previousSteps: Step[];
userConfirmed?: boolean; // true = user confirmed a pending action on this step
}
```

> **`null` response** → executor throws `RunNotFoundError` → HTTP 404 returned to caller.

### RecordRef

Lightweight pointer to a specific record.

```typescript
interface RecordRef {
collectionName: string;
recordId: Array<string | number>;
stepIndex: number; // index of the workflow step that loaded this record
}
```

### Step

History entry for an already-executed step (used in `previousSteps`).

```typescript
interface Step {
stepDefinition: StepDefinition;
stepOutcome: StepOutcome;
}
```

### StepDefinition

Discriminated union on `type`.

```typescript
type StepDefinition =
| ConditionStepDefinition
| RecordTaskStepDefinition
| McpTaskStepDefinition;

interface ConditionStepDefinition {
type: "condition";
options: [string, ...string[]]; // at least one option required
prompt?: string;
aiConfigName?: string;
}

interface RecordTaskStepDefinition {
type: "read-record"
| "update-record"
| "trigger-action"
| "load-related-record";
prompt?: string;
aiConfigName?: string;
automaticExecution?: boolean;
}

interface McpTaskStepDefinition {
type: "mcp-task";
mcpServerId?: string;
prompt?: string;
aiConfigName?: string;
automaticExecution?: boolean;
}
```

### StepOutcome

What the executor previously reported for each past step (used in `previousSteps`).

```typescript
type StepOutcome =
| ConditionStepOutcome
| RecordTaskStepOutcome
| McpTaskStepOutcome;

interface ConditionStepOutcome {
type: "condition";
stepId: string;
stepIndex: number;
status: "success" | "error" | "manual-decision";
selectedOption?: string; // present when status = "success"
error?: string; // present when status = "error"
}

interface RecordTaskStepOutcome {
type: "record-task";
stepId: string;
stepIndex: number;
status: "success" | "error" | "awaiting-input";
error?: string; // present when status = "error"
}

interface McpTaskStepOutcome {
type: "mcp-task";
stepId: string;
stepIndex: number;
status: "success" | "error" | "awaiting-input";
error?: string; // present when status = "error"
}
```

---

## 2. Step Result

**`POST /liana/v1/workflow-step-executions/<runId>/complete`**

After executing a step, the executor posts the outcome back to the server. The body is one of the `StepOutcome` shapes above.

> ⚠️ **NEVER contains client data** (field values, AI reasoning, etc.) — those stay in the `RunStore` on the client side.

---

## 3. Pending Data

Steps that require user input pause with `status: "awaiting-input"`. The frontend writes `pendingData` to unblock them via a dedicated endpoint on the executor HTTP server.

> **TODO** — The pending-data write endpoint is not yet implemented. Route, method, and per-step-type body shapes are TBD (PRD-240).

Once written, the frontend calls `POST /runs/:runId/trigger` and the executor resumes with `userConfirmed: true`.

### update-record — user picks a field + value to write

> **TODO** — Pending-data write endpoint TBD (PRD-240).

```typescript
interface UpdateRecordPendingData {
name: string; // technical field name
displayName: string; // label shown in the UI
value: string; // value chosen by the user
}
```

### trigger-action — user confirmation only

No payload required from the frontend. The executor selects the action and writes `pendingData` itself (action name + displayName) to the RunStore. The frontend just confirms:

```
POST /runs/:runId/trigger
```

### load-related-record — user picks the relation and/or the record

The frontend can override **both** the relation (field) and the selected record.

> **Current status** — The frontend cannot yet override the AI selection. The executor HTTP server does not yet expose the pending-data write endpoint. Until it is implemented, the executor writes the AI's pick directly into `selectedRecordId`.

```typescript
// Written by the executor; overwritable by the frontend via the pending-data endpoint (TBD)
interface LoadRelatedRecordPendingData {
name: string; // technical relation name
displayName: string; // label shown in the UI
relatedCollectionName: string; // collection of the related record
suggestedFields?: string[]; // fields suggested for display
selectedRecordId: Array<string|number>; // AI's pick; overwritten by the frontend via the pending-data endpoint
}
```

The executor initially writes the AI's pick into `selectedRecordId`. The pending-data endpoint overwrites it (and optionally `name`, `displayName`, `relatedCollectionName`) when the user changes the selection.

#### Future endpoint — pending-data write (not yet implemented)

> **TODO** — Route and method TBD (PRD-240).

Request body:

```typescript
{
selectedRecordId?: Array<string | number>; // record chosen by the user
name?: string; // relation changed
displayName?: string; // relation changed
relatedCollectionName?: string; // required if name is provided
}
```

Response: `204 No Content`.

The frontend calls this endpoint **before** `POST /runs/:runId/trigger`. On the next poll, `userConfirmed: true` and the executor reads `selectedRecordId` from the RunStore.

### mcp-task — user confirmation only

No payload required from the frontend. The executor selects the tool and writes `pendingData` itself (tool name + input) to the RunStore. The frontend just confirms:

```
POST /runs/:runId/trigger
```

The executor resumes with `userConfirmed: true` and executes the pre-selected tool.

---

## Flow Summary

```
Orchestrator ──► GET pending?runId=X ──► Executor
executes step
┌───────────────┴───────────────┐
needs input done
│ │
status: awaiting-input POST /complete
│ (StepOutcome)
Frontend writes pendingData
to executor HTTP server TODO: route TBD
POST /runs/:runId/trigger
(next poll: userConfirmed = true)
Executor resumes
```
65 changes: 65 additions & 0 deletions packages/ai-proxy/src/ai-client.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
import type { McpConfiguration } from './mcp-client';
import type { AiConfiguration } from './provider';
import type { Logger } from '@forestadmin/datasource-toolkit';
import type { BaseChatModel } from '@langchain/core/language_models/chat_models';

import { createBaseChatModel } from './create-base-chat-model';
import { AINotConfiguredError } from './errors';
import getAiConfiguration from './get-ai-configuration';
import McpClient from './mcp-client';
import validateAiConfigurations from './validate-ai-configurations';

// eslint-disable-next-line import/prefer-default-export
export class AiClient {
private readonly aiConfigurations: AiConfiguration[];
private readonly logger?: Logger;
private readonly modelCache = new Map<string, BaseChatModel>();
private mcpClient?: McpClient;

constructor(params?: { aiConfigurations?: AiConfiguration[]; logger?: Logger }) {
this.aiConfigurations = params?.aiConfigurations ?? [];
this.logger = params?.logger;

validateAiConfigurations(this.aiConfigurations);
}

getModel(aiName?: string): BaseChatModel {
const config = getAiConfiguration(this.aiConfigurations, aiName, this.logger);
if (!config) throw new AINotConfiguredError();

const cached = this.modelCache.get(config.name);
if (cached) return cached;

const model = createBaseChatModel(config);
this.modelCache.set(config.name, model);

return model;
}

async loadRemoteTools(mcpConfig: McpConfiguration): Promise<McpClient['tools']> {
await this.closeMcpClient('Error closing previous MCP connection');

const newClient = new McpClient(mcpConfig, this.logger);
const tools = await newClient.loadTools();
this.mcpClient = newClient;

return tools;
}
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 Medium src/ai-client.ts:39

In loadRemoteTools, if newClient.loadTools() throws, the McpClient instance is orphaned and never closed. Since this.mcpClient is only assigned after success, closeConnections() cannot reach the failed client, leaving resources leaked. Consider wrapping loadTools() in a try/finally to ensure cleanup on failure.

  async loadRemoteTools(mcpConfig: McpConfiguration): Promise<McpClient['tools']> {
     await this.closeMcpClient('Error closing previous MCP connection');
 
     const newClient = new McpClient(mcpConfig, this.logger);
-    const tools = await newClient.loadTools();
-    this.mcpClient = newClient;
+    try {
+      const tools = await newClient.loadTools();
+      this.mcpClient = newClient;
 
-    return tools;
+      return tools;
+    } catch (error) {
+      await newClient.closeConnections();
+      throw error;
+    }
   }
🚀 Reply "fix it for me" or copy this AI Prompt for your agent:
In file packages/ai-proxy/src/ai-client.ts around lines 39-47:

In `loadRemoteTools`, if `newClient.loadTools()` throws, the `McpClient` instance is orphaned and never closed. Since `this.mcpClient` is only assigned after success, `closeConnections()` cannot reach the failed client, leaving resources leaked. Consider wrapping `loadTools()` in a try/finally to ensure cleanup on failure.

Evidence trail:
packages/ai-proxy/src/ai-client.ts lines 39-45 (loadRemoteTools function) and lines 52-60 (closeMcpClient function) at REVIEWED_COMMIT. The code shows newClient is instantiated at line 42, loadTools() is called at line 43, and this.mcpClient is assigned at line 44. If line 43 throws, line 44 never executes, leaving newClient orphaned with no cleanup path.


async closeConnections(): Promise<void> {
await this.closeMcpClient('Error during MCP connection cleanup');
}

private async closeMcpClient(errorMessage: string): Promise<void> {
if (!this.mcpClient) return;

try {
await this.mcpClient.closeConnections();
} catch (error) {
const err = error instanceof Error ? error : new Error(String(error));
this.logger?.('Error', errorMessage, err);
} finally {
this.mcpClient = undefined;
}
}
}
26 changes: 26 additions & 0 deletions packages/ai-proxy/src/create-base-chat-model.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import type { AiConfiguration } from './provider';
import type { BaseChatModel } from '@langchain/core/language_models/chat_models';

import { ChatAnthropic } from '@langchain/anthropic';
import { ChatOpenAI } from '@langchain/openai';

import { AIBadRequestError } from './errors';

// eslint-disable-next-line import/prefer-default-export
export function createBaseChatModel(config: AiConfiguration): BaseChatModel {
if (config.provider === 'openai') {
const { provider, name, ...opts } = config;

return new ChatOpenAI({ maxRetries: 0, ...opts });
}

if (config.provider === 'anthropic') {
const { provider, name, model, ...opts } = config;

return new ChatAnthropic({ maxRetries: 0, ...opts, model });
}

throw new AIBadRequestError(
`Unsupported AI provider '${(config as { provider: string }).provider}'.`,
);
}
28 changes: 28 additions & 0 deletions packages/ai-proxy/src/get-ai-configuration.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
import type { AiConfiguration } from './provider';
import type { Logger } from '@forestadmin/datasource-toolkit';

export default function getAiConfiguration(
aiConfigurations: AiConfiguration[],
aiName?: string,
logger?: Logger,
): AiConfiguration | null {
if (aiConfigurations.length === 0) return null;

if (aiName) {
const config = aiConfigurations.find(c => c.name === aiName);

if (!config) {
const fallback = aiConfigurations[0];
logger?.(
'Warn',
`AI configuration '${aiName}' not found. Falling back to '${fallback.name}' (provider: ${fallback.provider}, model: ${fallback.model})`,
);

return fallback;
}

return config;
}

return aiConfigurations[0];
}
9 changes: 9 additions & 0 deletions packages/ai-proxy/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,12 @@ import type { McpConfiguration } from './mcp-client';
import McpConfigChecker from './mcp-config-checker';

export { createAiProvider } from './create-ai-provider';
export { createBaseChatModel } from './create-base-chat-model';
export { default as ProviderDispatcher } from './provider-dispatcher';
export * from './provider-dispatcher';
export * from './ai-client';
export * from './remote-tools';
export { default as RemoteTool } from './remote-tool';
export * from './router';
export * from './mcp-client';
export * from './oauth-token-injector';
Expand All @@ -14,3 +17,9 @@ export * from './errors';
export function validMcpConfigurationOrThrow(mcpConfig: McpConfiguration) {
return McpConfigChecker.check(mcpConfig);
}

export type { BaseChatModel } from '@langchain/core/language_models/chat_models';
export type { BaseMessage } from '@langchain/core/messages';
export { HumanMessage, SystemMessage } from '@langchain/core/messages';
export type { StructuredToolInterface } from '@langchain/core/tools';
export { DynamicStructuredTool } from '@langchain/core/tools';
Loading
Loading