Event-sourced pipeline orchestration with declarative workflow definitions, reactive event handling, and real-time SSE monitoring.
Without @auto-engineer/pipeline, you would have to manually wire event-to-command mappings, coordinate phased execution across ordered groups, track scatter-gather completion for parallel commands, implement settled-handler logic for fan-in aggregation, and build your own SSE streaming layer for real-time pipeline visibility.
- Event -- An immutable fact that has occurred (e.g. CodeAnalyzed, DeploySucceeded).
- Command -- An intention to perform an action (e.g. AnalyzeCode, Deploy).
- Handler Descriptor -- A declarative rule that maps events to commands. Five types exist: emit, custom, run-await, foreach-phased, and settled.
- Pipeline -- A named collection of handler descriptors plus a toGraph() method that produces a GraphIR for visualization.
- PipelineRuntime -- Indexes handler descriptors by event type and executes matching handlers when events arrive.
- PipelineServer -- An Express HTTP server that wires everything together: command dispatch, event routing, settled/phased bridges, SSE broadcast, and an in-memory event store with projections.
- GraphIR -- An intermediate representation of the pipeline as typed nodes (event, command, settled, phased, await) and edges, used for Mermaid rendering and UI visualization.
pnpm add @auto-engineer/pipeline

import { define, PipelineServer } from '@auto-engineer/pipeline';
const pipeline = define('my-sdlc')
.on('CodePushed')
.emit('AnalyzeCode', (e) => ({ filePath: e.data.path }))
.on('CodeAnalyzed')
.emit('Deploy', { version: '1.0.0' })
.build();
const server = new PipelineServer({ port: 3000 });
server.registerPipeline(pipeline);
await server.start();
console.log(`Pipeline server running on port ${server.port}`);

import { define } from '@auto-engineer/pipeline';
const pipeline = define('simple')
.on('TriggerEvent')
.emit('DoWork', { key: 'value' })
.build();

const pipeline = define('conditional')
.on('CodeAnalyzed')
.when((e) => e.data.issues.length === 0)
.emit('Deploy', {})
.build();

const pipeline = define('custom')
.on('SpecialEvent')
.handle(async (event, ctx) => {
await ctx.emit('ProcessingDone', { result: event.data.value });
}, { emits: ['ProcessingDone'] })
.build();

import { dispatch } from '@auto-engineer/pipeline';
const pipeline = define('scatter-gather')
.on('FilesDiscovered')
.run((e) => e.data.files.map((f) => dispatch('ProcessFile', { path: f })))
.awaitAll('ProcessFile', (e) => e.data.path)
.onSuccess('AllFilesProcessed', (ctx) => ({
count: ctx.results.length,
duration: ctx.duration,
}))
.onFailure('FileProcessingFailed', (ctx) => ({
failures: ctx.failures,
}))
.build();

Process items in ordered groups where each phase completes before the next begins.
const pipeline = define('phased')
.on('ComponentsReady')
.forEach((e) => e.data.components)
.groupInto(['critical', 'normal'], (c) => c.priority)
.process('ProcessComponent', (c) => ({ path: c.path }))
.stopOnFailure()
.onComplete({
success: 'AllComplete',
failure: 'ProcessingFailed',
itemKey: (e) => e.data.path,
})
.build();

Fire logic when all specified commands have completed.
const pipeline = define('aggregator')
.settled(['TaskA', 'TaskB', 'TaskC'])
.dispatch({ dispatches: ['FinalizeProcess'] }, (events, send) => {
send('FinalizeProcess', { combined: Object.values(events).flat() });
})
.build();

import { PipelineServer } from '@auto-engineer/pipeline';
import type { CommandHandlerWithMetadata } from '@auto-engineer/pipeline';
const handlers: CommandHandlerWithMetadata[] = [
{
name: 'AnalyzeCode',
events: ['CodeAnalyzed'],
handle: async (cmd) => ({
type: 'CodeAnalyzed',
data: { path: cmd.data.filePath, issues: [] },
}),
},
];
const server = new PipelineServer({ port: 3000 });
server.registerCommandHandlers(handlers);
server.registerPipeline(pipeline);
await server.start();

server.registerConcurrency('ProcessFile', {
strategy: 'cancel-in-progress', // or 'queue'
groupKey: (data) => data.path,
});

import { pipelineConfig, loadPipelineConfig } from '@auto-engineer/pipeline';
const config = pipelineConfig({
plugins: ['@auto-engineer/code-analyzer', '@auto-engineer/deployer'],
pipeline: myPipeline,
});
const { handlers, pipeline } = await loadPipelineConfig(config);

const url = new URL('/events', 'http://localhost:3000');
url.searchParams.set('correlationId', 'corr-123'); // optional filter
const eventSource = new EventSource(url.toString());
eventSource.onmessage = (msg) => {
const event = JSON.parse(msg.data);
console.log(event.type, event.data);
};

import { defineV2, toGraphV2 } from '@auto-engineer/pipeline';
const pipeline = defineV2('my-pipeline')
.on('Start')
.emit('DoWork', { key: 'value' })
.build();
const graph = toGraphV2(pipeline);

// Builder
export { define } from './builder/define';
export { defineV2, toGraphV2 } from './builder/define-v2';
// Core helpers
export { dispatch } from './core/types';
// Runtime
export { PipelineRuntime } from './runtime/pipeline-runtime';
export { EventCommandMapper } from './runtime/event-command-map';
export { AwaitTracker } from './runtime/await-tracker';
// Server
export { PipelineServer } from './server/pipeline-server';
export { SSEManager } from './server/sse-manager';
// Logging
export { EventLogger } from './logging/event-logger';
// Engine (v2)
export { createPipelineEngine } from './engine/pipeline-engine';
export { createCommandDispatcher, dispatchAndStore } from './engine/command-dispatcher';
export { createEventRouter } from './engine/event-router';
export { createWorkflowProcessor } from './engine/workflow-processor';
export { createPipelineStore } from './engine/sqlite-store';
export { createConsumer } from './engine/sqlite-consumer';
export { createAwaitWorkflow } from './engine/workflows/await-workflow';
export { createPhasedWorkflow } from './engine/workflows/phased-workflow';
export { createSettledWorkflow } from './engine/workflows/settled-workflow';
// Engine projections
export { itemStatusProjection } from './engine/projections/item-status';
export { latestRunProjection } from './engine/projections/latest-run';
export { messageLogProjection } from './engine/projections/message-log';
export { nodeStatusProjection } from './engine/projections/node-status';
export { statsProjection } from './engine/projections/stats';
// Store
export { createPipelineEventStore } from './store/pipeline-event-store';
export { PipelineReadModel } from './store/pipeline-read-model';
// Testing utilities
export {
compareEventSequence,
containsSubsequence,
findMissingEvents,
findUnexpectedEvents,
formatSnapshotDiff,
} from './testing/snapshot-compare';

interface Pipeline {
descriptor: Readonly<PipelineDescriptor>;
toGraph(): GraphIR;
}
interface PipelineDescriptor {
name: string;
version?: string;
description?: string;
keys: Map<string, KeyExtractor>;
handlers: HandlerDescriptor[];
}
type HandlerDescriptor =
| EmitHandlerDescriptor
| RunAwaitHandlerDescriptor
| ForEachPhasedDescriptor
| CustomHandlerDescriptor
| SettledHandlerDescriptor
| AcceptsDescriptor;
interface CommandDispatch<D = Record<string, unknown>> {
commandType: string;
data: D | ((event: Event) => D);
}
interface PipelineServerConfig {
port: number;
storeFileName?: string; // SQLite file for persistent event storage
}
interface CommandHandlerWithMetadata {
name: string;
alias?: string;
description?: string;
displayName?: string;
fields?: Record<string, unknown>;
examples?: unknown[];
events?: EventDefinition[];
handle: (command: Command, context?: PipelineContext) => Promise<Event | Event[]>;
}
interface PipelineContext {
emit: (type: string, data: unknown) => Promise<void>;
sendCommand: (type: string, data: unknown, correlationId?: string) => Promise<void>;
correlationId: string;
signal?: AbortSignal;
}
interface ConcurrencyConfig {
strategy: 'cancel-in-progress' | 'queue';
groupKey?: (data: unknown) => string;
}
interface GraphIR {
nodes: GraphNode[];
edges: GraphEdge[];
}
interface GraphNode {
id: string;
type: 'event' | 'command' | 'settled' | 'phased' | 'await';
label: string;
status?: 'idle' | 'running' | 'success' | 'error';
}

Entry point for the fluent builder API. Chain .on(), .emit(), .run(), .forEach(), .settled(), .handle(), and .build().
Alternative builder that produces a PipelineV2 with flat registration arrays. Convert to graph with toGraphV2().
Helper to create CommandDispatch objects for use with .run().
class PipelineServer {
constructor(config: PipelineServerConfig);
registerCommandHandlers(handlers: CommandHandlerWithMetadata[]): void;
registerPipeline(pipeline: Pipeline): void;
registerConcurrency(commandType: string, config: ConcurrencyConfig): void;
registerItemKeyExtractor(commandType: string, extractor: (data: unknown) => string | undefined): void;
use(handler: express.RequestHandler): this;
start(): Promise<void>;
stop(): Promise<void>;
readonly port: number;
getHttpServer(): HttpServer;
getMessageBus(): MessageBus;
getRegisteredCommands(): string[];
getPipelineNames(): string[];
}

| Endpoint | Method | Description |
|---|---|---|
/health |
GET | Health check with uptime |
/registry |
GET | List registered event and command handlers |
/pipeline |
GET | Pipeline graph with live node status |
/pipeline/mermaid |
GET | Mermaid diagram as plain text |
/pipeline/diagram |
GET | Self-contained HTML page with rendered diagram |
/command |
POST | Dispatch a command (returns ack with correlationId) |
/execute |
POST | Dispatch and wait for synchronous result |
/events |
GET | SSE stream (optional ?correlationId= filter) |
/messages |
GET | Full message log |
/stats |
GET | Aggregate message statistics |
/run-stats |
GET | Per-run item/node statistics |
function compareEventSequence(expected: string[], actual: string[]): SnapshotResult;
function containsSubsequence(sequence: string[], subsequence: string[]): boolean;
function findMissingEvents(sequence: string[], required: string[]): string[];
function findUnexpectedEvents(sequence: string[], allowed: string[]): string[];
function formatSnapshotDiff(result: SnapshotResult): string;

src/
├── builder/ # define() and defineV2() fluent builder APIs
│ ├── define.ts # Primary builder with graph extraction
│ └── define-v2.ts # Simplified v2 builder
├── config/ # Pipeline configuration and plugin loading
│ └── pipeline-config.ts
├── core/ # Shared types and descriptor definitions
│ ├── descriptors.ts # Handler descriptor types
│ └── types.ts # Event, Command, CommandDispatch
├── engine/ # v2 engine: dispatcher, router, workflows, SQLite store
│ ├── command-dispatcher.ts
│ ├── event-router.ts
│ ├── pipeline-engine.ts
│ ├── sqlite-consumer.ts
│ ├── sqlite-store.ts
│ ├── workflow-processor.ts
│ ├── projections/ # Event-driven projections (item, node, run, stats)
│ └── workflows/ # Await, phased, and settled workflow implementations
├── graph/ # GraphIR types and filtering
│ ├── types.ts
│ └── filter-graph.ts
├── logging/ # EventLogger for recording pipeline activity
├── plugins/ # Dynamic plugin loading and handler adaptation
│ ├── plugin-loader.ts
│ └── handler-adapter.ts
├── projections/ # In-memory event store projections
│ ├── await-tracker-projection.ts
│ ├── item-status-projection.ts
│ ├── latest-run-projection.ts
│ ├── message-log-projection.ts
│ ├── node-status-projection.ts
│ └── stats-projection.ts
├── runtime/ # PipelineRuntime, AwaitTracker, EventCommandMapper
├── server/ # PipelineServer, SSE, command gate, quiescence tracking
│ ├── pipeline-server.ts
│ ├── pipeline-server-v2.ts
│ ├── sse-manager.ts
│ ├── command-gate.ts
│ ├── quiescence-tracker.ts
│ ├── phased-bridge.ts
│ └── v2-runtime-bridge.ts
├── store/ # In-memory event store with Emmett projections
│ ├── pipeline-event-store.ts
│ └── pipeline-read-model.ts
└── testing/ # Test helpers: mock handlers, event capture, snapshots
├── event-capture.ts
├── mock-handlers.ts
├── snapshot-compare.ts
└── snapshot-sanitize.ts
flowchart TB
subgraph Builder
define["define() / defineV2()"]
end
subgraph Core
Pipeline["Pipeline + Descriptor"]
GraphIR["GraphIR"]
end
subgraph Runtime
PR["PipelineRuntime"]
AT["AwaitTracker"]
ECM["EventCommandMapper"]
end
subgraph Server
PS["PipelineServer"]
SSE["SSEManager"]
CG["CommandGate"]
QT["QuiescenceTracker"]
end
subgraph Store
ES["EventStore + Projections"]
RM["PipelineReadModel"]
end
define --> Pipeline
Pipeline --> GraphIR
Pipeline --> PR
PS --> PR
PS --> SSE
PS --> CG
PS --> QT
PS --> ES
ES --> RM
PR --> AT
PR --> ECM
| Package | Usage |
|---|---|
@auto-engineer/message-bus |
Command/event messaging infrastructure |
@auto-engineer/file-store |
File storage utilities |
@event-driven-io/emmett |
In-memory event store with projections |
@event-driven-io/emmett-sqlite |
SQLite-backed persistent event store |
express |
HTTP server for REST + SSE endpoints |
nanoid |
Compact unique ID generation |
chokidar |
File system watching |
jose |
JWT handling |
get-port |
Automatic port selection |