@auto-engineer/pipeline

Event-sourced pipeline orchestration with declarative workflow definitions, reactive event handling, and real-time SSE monitoring.


Purpose

Without @auto-engineer/pipeline, you would have to manually wire event-to-command mappings, coordinate phased execution across ordered groups, track scatter-gather completion for parallel commands, implement settled-handler logic for fan-in aggregation, and build your own SSE streaming layer for real-time pipeline visibility.


Key Concepts

  • Event -- An immutable fact that has occurred (e.g. CodeAnalyzed, DeploySucceeded).
  • Command -- An intention to perform an action (e.g. AnalyzeCode, Deploy).
  • Handler Descriptor -- A declarative rule that maps events to commands. Five types exist: emit, custom, run-await, foreach-phased, and settled.
  • Pipeline -- A named collection of handler descriptors plus a toGraph() method that produces a GraphIR for visualization.
  • PipelineRuntime -- Indexes handler descriptors by event type and executes matching handlers when events arrive.
  • PipelineServer -- An Express HTTP server that wires everything together: command dispatch, event routing, settled/phased bridges, SSE broadcast, and an in-memory event store with projections.
  • GraphIR -- An intermediate representation of the pipeline as typed nodes (event, command, settled, phased, await) and edges, used for Mermaid rendering and UI visualization.
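
To make the relationship between events, commands, and handler descriptors more concrete, here is a minimal, self-contained sketch of indexing emit-style rules by event type and routing one event through them. All names in it (`SketchEvent`, `EmitRule`, `indexRules`, `route`) are hypothetical and chosen for illustration; it does not reflect how PipelineRuntime is actually implemented.

```typescript
// Hypothetical shapes for illustration; not the package's real types.
type SketchEvent = { type: string; data: Record<string, unknown> };
type SketchCommand = { type: string; data: Record<string, unknown> };

interface EmitRule {
  eventType: string;
  commandType: string;
  // Static payload, or a factory that derives the payload from the event.
  data: Record<string, unknown> | ((event: SketchEvent) => Record<string, unknown>);
}

// Index rules by event type so each incoming event needs one Map lookup.
function indexRules(rules: EmitRule[]): Map<string, EmitRule[]> {
  const byEventType = new Map<string, EmitRule[]>();
  for (const rule of rules) {
    const bucket = byEventType.get(rule.eventType) ?? [];
    bucket.push(rule);
    byEventType.set(rule.eventType, bucket);
  }
  return byEventType;
}

// Produce the commands that the matching rules emit for one event.
function route(byEventType: Map<string, EmitRule[]>, event: SketchEvent): SketchCommand[] {
  return (byEventType.get(event.type) ?? []).map((rule) => ({
    type: rule.commandType,
    data: typeof rule.data === 'function' ? rule.data(event) : rule.data,
  }));
}

const ruleIndex = indexRules([
  { eventType: 'CodePushed', commandType: 'AnalyzeCode', data: (e) => ({ filePath: e.data['path'] }) },
  { eventType: 'CodeAnalyzed', commandType: 'Deploy', data: { version: '1.0.0' } },
]);

const routed = route(ruleIndex, { type: 'CodePushed', data: { path: 'src/app.ts' } });
```

Here `routed` holds a single AnalyzeCode command whose payload was derived from the triggering event; an event type with no matching rule yields an empty array.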

Installation

pnpm add @auto-engineer/pipeline

Quick Start

import { define, PipelineServer } from '@auto-engineer/pipeline';

const pipeline = define('my-sdlc')
  .on('CodePushed')
  .emit('AnalyzeCode', (e) => ({ filePath: e.data.path }))

  .on('CodeAnalyzed')
  .emit('Deploy', { version: '1.0.0' })

  .build();

const server = new PipelineServer({ port: 3000 });
server.registerPipeline(pipeline);

await server.start();
console.log(`Pipeline server running on port ${server.port}`);

How-to Guides

Define a pipeline with emit handlers

import { define } from '@auto-engineer/pipeline';

const pipeline = define('simple')
  .on('TriggerEvent')
  .emit('DoWork', { key: 'value' })
  .build();

Use conditional predicates

import { define } from '@auto-engineer/pipeline';

const pipeline = define('conditional')
  .on('CodeAnalyzed')
  .when((e) => e.data.issues.length === 0)
  .emit('Deploy', {})
  .build();

Use custom handlers

import { define } from '@auto-engineer/pipeline';

const pipeline = define('custom')
  .on('SpecialEvent')
  .handle(async (event, ctx) => {
    await ctx.emit('ProcessingDone', { result: event.data.value });
  }, { emits: ['ProcessingDone'] })
  .build();

Run commands and await completion

import { define, dispatch } from '@auto-engineer/pipeline';

const pipeline = define('scatter-gather')
  .on('FilesDiscovered')
  .run((e) => e.data.files.map((f) => dispatch('ProcessFile', { path: f })))
  .awaitAll('ProcessFile', (e) => e.data.path)
  .onSuccess('AllFilesProcessed', (ctx) => ({
    count: ctx.results.length,
    duration: ctx.duration,
  }))
  .onFailure('FileProcessingFailed', (ctx) => ({
    failures: ctx.failures,
  }))
  .build();
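
The bookkeeping behind a scatter-gather step can be sketched roughly as follows: remember which keys are outstanding, record each completion, and report a summary once nothing is pending. This is a simplified illustration under assumed names (`GatherTracker`, `ItemOutcome`, `GatherSummary`); it is not the package's AwaitTracker.

```typescript
// Hypothetical sketch of scatter-gather bookkeeping; illustrative names only.
type ItemOutcome = { key: string; ok: boolean };
type GatherSummary = { results: ItemOutcome[]; failures: ItemOutcome[] };

class GatherTracker {
  private readonly pending = new Set<string>();
  private readonly outcomes: ItemOutcome[] = [];

  constructor(keys: string[]) {
    keys.forEach((key) => this.pending.add(key));
  }

  // Record one completion. Returns the summary once every key has reported,
  // or undefined while work is still outstanding.
  complete(key: string, ok: boolean): GatherSummary | undefined {
    if (!this.pending.delete(key)) return undefined; // unknown or duplicate key
    this.outcomes.push({ key, ok });
    if (this.pending.size > 0) return undefined;
    return {
      results: this.outcomes.filter((o) => o.ok),
      failures: this.outcomes.filter((o) => !o.ok),
    };
  }
}

const gatherTracker = new GatherTracker(['a.ts', 'b.ts']);
const afterFirst = gatherTracker.complete('a.ts', true);   // still waiting on b.ts
const afterSecond = gatherTracker.complete('b.ts', false); // every key has reported
```

In this sketch a non-empty `failures` list would correspond to taking the onFailure branch, and an empty one to onSuccess.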

Use phased execution

Process items in ordered groups where each phase completes before the next begins.

import { define } from '@auto-engineer/pipeline';

const pipeline = define('phased')
  .on('ComponentsReady')
  .forEach((e) => e.data.components)
  .groupInto(['critical', 'normal'], (c) => c.priority)
  .process('ProcessComponent', (c) => ({ path: c.path }))
  .stopOnFailure()
  .onComplete({
    success: 'AllComplete',
    failure: 'ProcessingFailed',
    itemKey: (e) => e.data.path,
  })
  .build();
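
As a rough mental model for phased execution, the sketch below groups items into ordered phases and refuses to start a later phase once an earlier one has failed. It is deliberately synchronous and uses hypothetical names (`groupIntoPhases`, `runPhases`, `SketchComponent`); the real builder dispatches commands and tracks their completion asynchronously.

```typescript
// Hypothetical sketch of phased execution; illustrative names only.
interface SketchComponent { path: string; priority: string }

// Bucket items by phase, preserving the declared phase order.
// Items whose classification matches no phase are dropped in this sketch.
function groupIntoPhases<T>(items: T[], phases: string[], classify: (item: T) => string): T[][] {
  return phases.map((phase) => items.filter((item) => classify(item) === phase));
}

// Run each phase to completion before starting the next, and stop after
// the first phase that reports a failure (stop-on-failure behaviour).
function runPhases<T>(groups: T[][], handle: (item: T) => boolean): { processed: T[]; failed: T[] } {
  const processed: T[] = [];
  const failed: T[] = [];
  for (const group of groups) {
    for (const item of group) {
      (handle(item) ? processed : failed).push(item);
    }
    if (failed.length > 0) break; // later phases never start
  }
  return { processed, failed };
}

const sketchComponents: SketchComponent[] = [
  { path: 'ui/button.ts', priority: 'normal' },
  { path: 'core/auth.ts', priority: 'critical' },
];

const phaseGroups = groupIntoPhases(sketchComponents, ['critical', 'normal'], (c) => c.priority);
// Simulate a failure in the critical phase.
const phaseOutcome = runPhases(phaseGroups, (c) => c.path !== 'core/auth.ts');
```

Because the critical item fails, the normal phase is never started and `phaseOutcome.processed` stays empty.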

Use settled handlers

Fire logic when all specified commands have completed.

import { define } from '@auto-engineer/pipeline';

const pipeline = define('aggregator')
  .settled(['TaskA', 'TaskB', 'TaskC'])
  .dispatch({ dispatches: ['FinalizeProcess'] }, (events, send) => {
    send('FinalizeProcess', { combined: Object.values(events).flat() });
  })
  .build();
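
The fan-in idea can be illustrated with a small collector that waits until every watched command type has reported, then hands the collected events to a callback. This is only a sketch with assumed names (`SettledCollector`, `SettledEvent`); how the package actually correlates and resets settled state is not shown in this README.

```typescript
// Hypothetical sketch of fan-in ("settled") bookkeeping; illustrative names only.
type SettledEvent = { type: string; data: unknown };
type SettledCallback = (events: Record<string, SettledEvent[]>) => void;

class SettledCollector {
  private collected: Record<string, SettledEvent[]> = {};
  private readonly commandTypes: string[];
  private readonly onSettled: SettledCallback;

  constructor(commandTypes: string[], onSettled: SettledCallback) {
    this.commandTypes = commandTypes;
    this.onSettled = onSettled;
  }

  // Record the result events of one finished command. Once every watched
  // command type has reported, fire the callback and start a fresh round.
  record(commandType: string, events: SettledEvent[]): void {
    if (!this.commandTypes.includes(commandType)) return; // not watched
    this.collected[commandType] = events;
    const allReported = this.commandTypes.every((t) => this.collected[t] !== undefined);
    if (allReported) {
      const snapshot = this.collected;
      this.collected = {};
      this.onSettled(snapshot);
    }
  }
}

const settledRounds: string[] = [];
const collector = new SettledCollector(['TaskA', 'TaskB'], (events) => {
  settledRounds.push(Object.keys(events).sort().join(','));
});

collector.record('TaskA', [{ type: 'TaskADone', data: 1 }]); // still waiting on TaskB
collector.record('TaskB', [{ type: 'TaskBDone', data: 2 }]); // callback fires here
```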

Register command handlers and start the server

import { PipelineServer } from '@auto-engineer/pipeline';
import type { CommandHandlerWithMetadata } from '@auto-engineer/pipeline';

const handlers: CommandHandlerWithMetadata[] = [
  {
    name: 'AnalyzeCode',
    events: ['CodeAnalyzed'],
    handle: async (cmd) => ({
      type: 'CodeAnalyzed',
      data: { path: cmd.data.filePath, issues: [] },
    }),
  },
];

const server = new PipelineServer({ port: 3000 });
server.registerCommandHandlers(handlers);
server.registerPipeline(pipeline);
await server.start();

Configure concurrency control

server.registerConcurrency('ProcessFile', {
  strategy: 'cancel-in-progress',  // or 'queue'
  groupKey: (data) => data.path,
});
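
One plausible way to picture the 'cancel-in-progress' strategy is a per-group AbortController: starting new work for a group aborts whatever was already running for that group. The sketch below uses a hypothetical `CancelInProgressGate` class. Whether the server wires this signal into `PipelineContext.signal` is an assumption rather than something this README states.

```typescript
// Hypothetical sketch of a cancel-in-progress gate; not the package's implementation.
class CancelInProgressGate {
  private readonly running = new Map<string, AbortController>();

  // Start work for a group, aborting any run already in flight for that group.
  begin(groupKey: string): AbortSignal {
    this.running.get(groupKey)?.abort();
    const controller = new AbortController();
    this.running.set(groupKey, controller);
    return controller.signal;
  }

  // Mark work as finished. The slot is cleared only if this signal still
  // belongs to the current run, so a superseded run cannot evict its successor.
  end(groupKey: string, signal: AbortSignal): void {
    if (this.running.get(groupKey)?.signal === signal) {
      this.running.delete(groupKey);
    }
  }
}

const gate = new CancelInProgressGate();
const firstRun = gate.begin('src/a.ts');
const secondRun = gate.begin('src/a.ts'); // supersedes firstRun
const otherRun = gate.begin('src/b.ts');  // different group, unaffected
```

A handler that receives such a signal can check `signal.aborted` between steps and stop early when a newer command for the same group has replaced it.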

Load plugins from packages

import { pipelineConfig, loadPipelineConfig } from '@auto-engineer/pipeline';

const config = pipelineConfig({
  plugins: ['@auto-engineer/code-analyzer', '@auto-engineer/deployer'],
  pipeline: myPipeline,
});

const { handlers, pipeline } = await loadPipelineConfig(config);

Consume SSE events

const url = new URL('/events', 'http://localhost:3000');
url.searchParams.set('correlationId', 'corr-123'); // optional filter

const eventSource = new EventSource(url.toString());
eventSource.onmessage = (msg) => {
  const event = JSON.parse(msg.data);
  console.log(event.type, event.data);
};
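
In environments without a built-in EventSource, the same stream can be read as text and split into frames by hand. The helper below is a minimal sketch of the standard SSE wire format (frames separated by a blank line, payload carried on `data:` lines). `parseSseChunk` is a hypothetical name, and the sketch assumes each chunk contains whole frames with JSON payloads, as the `JSON.parse(msg.data)` call above suggests.

```typescript
// Parse SSE text into JSON payloads taken from `data:` lines.
// Assumes `chunk` contains only complete frames.
function parseSseChunk(chunk: string): unknown[] {
  const payloads: unknown[] = [];
  // Frames are separated by a blank line.
  for (const frame of chunk.split(/\r?\n\r?\n/)) {
    const dataLines = frame
      .split(/\r?\n/)
      .filter((line) => line.startsWith('data:'))
      // Drop the field name and the single optional space that may follow it.
      .map((line) => line.slice(5).replace(/^ /, ''));
    // Multiple data lines in one frame are joined with newlines.
    if (dataLines.length > 0) payloads.push(JSON.parse(dataLines.join('\n')));
  }
  return payloads;
}

// A comment frame (leading ':') followed by one event frame.
const sseSample = ': keep-alive\n\ndata: {"type":"CodeAnalyzed","data":{"issues":[]}}\n\n';
const parsedPayloads = parseSseChunk(sseSample);
```

The comment frame carries no `data:` line and is skipped, so `parsedPayloads` contains only the CodeAnalyzed event.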

Use the v2 builder API

import { defineV2, toGraphV2 } from '@auto-engineer/pipeline';

const pipeline = defineV2('my-pipeline')
  .on('Start')
  .emit('DoWork', { key: 'value' })
  .build();

const graph = toGraphV2(pipeline);

API Reference

Package Exports

// Builder
export { define } from './builder/define';
export { defineV2, toGraphV2 } from './builder/define-v2';

// Core helpers
export { dispatch } from './core/types';

// Runtime
export { PipelineRuntime } from './runtime/pipeline-runtime';
export { EventCommandMapper } from './runtime/event-command-map';
export { AwaitTracker } from './runtime/await-tracker';

// Server
export { PipelineServer } from './server/pipeline-server';
export { SSEManager } from './server/sse-manager';

// Logging
export { EventLogger } from './logging/event-logger';

// Engine (v2)
export { createPipelineEngine } from './engine/pipeline-engine';
export { createCommandDispatcher, dispatchAndStore } from './engine/command-dispatcher';
export { createEventRouter } from './engine/event-router';
export { createWorkflowProcessor } from './engine/workflow-processor';
export { createPipelineStore } from './engine/sqlite-store';
export { createConsumer } from './engine/sqlite-consumer';
export { createAwaitWorkflow } from './engine/workflows/await-workflow';
export { createPhasedWorkflow } from './engine/workflows/phased-workflow';
export { createSettledWorkflow } from './engine/workflows/settled-workflow';

// Engine projections
export { itemStatusProjection } from './engine/projections/item-status';
export { latestRunProjection } from './engine/projections/latest-run';
export { messageLogProjection } from './engine/projections/message-log';
export { nodeStatusProjection } from './engine/projections/node-status';
export { statsProjection } from './engine/projections/stats';

// Store
export { createPipelineEventStore } from './store/pipeline-event-store';
export { PipelineReadModel } from './store/pipeline-read-model';

// Testing utilities
export {
  compareEventSequence,
  containsSubsequence,
  findMissingEvents,
  findUnexpectedEvents,
  formatSnapshotDiff,
} from './testing/snapshot-compare';

Key Types

interface Pipeline {
  descriptor: Readonly<PipelineDescriptor>;
  toGraph(): GraphIR;
}

interface PipelineDescriptor {
  name: string;
  version?: string;
  description?: string;
  keys: Map<string, KeyExtractor>;
  handlers: HandlerDescriptor[];
}

type HandlerDescriptor =
  | EmitHandlerDescriptor
  | RunAwaitHandlerDescriptor
  | ForEachPhasedDescriptor
  | CustomHandlerDescriptor
  | SettledHandlerDescriptor
  | AcceptsDescriptor;

interface CommandDispatch<D = Record<string, unknown>> {
  commandType: string;
  data: D | ((event: Event) => D);
}

interface PipelineServerConfig {
  port: number;
  storeFileName?: string;  // SQLite file for persistent event storage
}

interface CommandHandlerWithMetadata {
  name: string;
  alias?: string;
  description?: string;
  displayName?: string;
  fields?: Record<string, unknown>;
  examples?: unknown[];
  events?: EventDefinition[];
  handle: (command: Command, context?: PipelineContext) => Promise<Event | Event[]>;
}

interface PipelineContext {
  emit: (type: string, data: unknown) => Promise<void>;
  sendCommand: (type: string, data: unknown, correlationId?: string) => Promise<void>;
  correlationId: string;
  signal?: AbortSignal;
}

interface ConcurrencyConfig {
  strategy: 'cancel-in-progress' | 'queue';
  groupKey?: (data: unknown) => string;
}

interface GraphIR {
  nodes: GraphNode[];
  edges: GraphEdge[];
}

interface GraphNode {
  id: string;
  type: 'event' | 'command' | 'settled' | 'phased' | 'await';
  label: string;
  status?: 'idle' | 'running' | 'success' | 'error';
}
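
To show how a GraphIR-like structure could be turned into Mermaid text, here is a small sketch. `SketchNode` mirrors the GraphNode fields listed above, but the edge shape (`from`, `to`), the `toMermaid` function, and the choice of node shapes are assumptions made for illustration; the package's own renderer may differ.

```typescript
// SketchNode mirrors the GraphNode fields above; SketchEdge's fields are assumed.
interface SketchNode {
  id: string;
  type: 'event' | 'command' | 'settled' | 'phased' | 'await';
  label: string;
}
interface SketchEdge { from: string; to: string }
interface SketchGraph { nodes: SketchNode[]; edges: SketchEdge[] }

// Render the graph as a Mermaid flowchart definition.
function toMermaid(graph: SketchGraph): string {
  const lines = ['flowchart LR'];
  for (const node of graph.nodes) {
    // Events are drawn stadium-shaped, everything else as rectangles (arbitrary choice).
    const shape = node.type === 'event' ? `(["${node.label}"])` : `["${node.label}"]`;
    lines.push(`  ${node.id}${shape}`);
  }
  for (const edge of graph.edges) {
    lines.push(`  ${edge.from} --> ${edge.to}`);
  }
  return lines.join('\n');
}

const diagramText = toMermaid({
  nodes: [
    { id: 'evt_CodePushed', type: 'event', label: 'CodePushed' },
    { id: 'cmd_AnalyzeCode', type: 'command', label: 'AnalyzeCode' },
  ],
  edges: [{ from: 'evt_CodePushed', to: 'cmd_AnalyzeCode' }],
});
```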

define(name: string): PipelineBuilder

Entry point for the fluent builder API. Chain .on(), .emit(), .run(), .forEach(), .settled(), .handle(), and .build().

defineV2(name: string): PipelineV2Builder

Alternative builder that produces a PipelineV2 with flat registration arrays. Convert it to a graph with toGraphV2().

dispatch(commandType: string, data: DataOrFactory): CommandDispatch

Helper to create CommandDispatch objects for use with .run().

PipelineServer

class PipelineServer {
  constructor(config: PipelineServerConfig);
  registerCommandHandlers(handlers: CommandHandlerWithMetadata[]): void;
  registerPipeline(pipeline: Pipeline): void;
  registerConcurrency(commandType: string, config: ConcurrencyConfig): void;
  registerItemKeyExtractor(commandType: string, extractor: (data: unknown) => string | undefined): void;
  use(handler: express.RequestHandler): this;
  start(): Promise<void>;
  stop(): Promise<void>;
  readonly port: number;
  getHttpServer(): HttpServer;
  getMessageBus(): MessageBus;
  getRegisteredCommands(): string[];
  getPipelineNames(): string[];
}

Server Endpoints

| Endpoint | Method | Description |
| --- | --- | --- |
| /health | GET | Health check with uptime |
| /registry | GET | List registered event and command handlers |
| /pipeline | GET | Pipeline graph with live node status |
| /pipeline/mermaid | GET | Mermaid diagram as plain text |
| /pipeline/diagram | GET | Self-contained HTML page with rendered diagram |
| /command | POST | Dispatch a command (returns ack with correlationId) |
| /execute | POST | Dispatch and wait for synchronous result |
| /events | GET | SSE stream (optional ?correlationId= filter) |
| /messages | GET | Full message log |
| /stats | GET | Aggregate message statistics |
| /run-stats | GET | Per-run item/node statistics |

Testing Utilities

function compareEventSequence(expected: string[], actual: string[]): SnapshotResult;
function containsSubsequence(sequence: string[], subsequence: string[]): boolean;
function findMissingEvents(sequence: string[], required: string[]): string[];
function findUnexpectedEvents(sequence: string[], allowed: string[]): string[];
function formatSnapshotDiff(result: SnapshotResult): string;
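
The signatures above suggest straightforward sequence checks. The sketch below shows one plausible reading of three of them, using `Sketch`-suffixed names to make clear these are illustrative re-implementations and not the package's code. In particular, treating a subsequence as "in order but not necessarily contiguous" is an assumption.

```typescript
// Assumed semantics: every element of `subsequence` appears in `sequence`
// in the same relative order, with other events allowed in between.
function containsSubsequenceSketch(sequence: string[], subsequence: string[]): boolean {
  let matched = 0;
  for (const item of sequence) {
    if (matched < subsequence.length && item === subsequence[matched]) matched++;
  }
  return matched === subsequence.length;
}

// Assumed semantics: required event types that never occur in the sequence.
function findMissingEventsSketch(sequence: string[], required: string[]): string[] {
  return required.filter((r) => !sequence.includes(r));
}

// Assumed semantics: event types in the sequence that are not on the allow list.
function findUnexpectedEventsSketch(sequence: string[], allowed: string[]): string[] {
  return sequence.filter((s) => !allowed.includes(s));
}

const observedTypes = ['CodePushed', 'CodeAnalyzed', 'DeploySucceeded'];

const inOrder = containsSubsequenceSketch(observedTypes, ['CodePushed', 'DeploySucceeded']);
const outOfOrder = containsSubsequenceSketch(observedTypes, ['DeploySucceeded', 'CodePushed']);
const missingTypes = findMissingEventsSketch(observedTypes, ['CodeAnalyzed', 'TestsPassed']);
const unexpectedTypes = findUnexpectedEventsSketch(observedTypes, ['CodePushed', 'CodeAnalyzed']);
```

Checks like these are typically applied to the list of event types captured during a test run, so that assertions stay readable even when payloads vary.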

Architecture

src/
├── builder/          # define() and defineV2() fluent builder APIs
│   ├── define.ts             # Primary builder with graph extraction
│   └── define-v2.ts          # Simplified v2 builder
├── config/           # Pipeline configuration and plugin loading
│   └── pipeline-config.ts
├── core/             # Shared types and descriptor definitions
│   ├── descriptors.ts        # Handler descriptor types
│   └── types.ts              # Event, Command, CommandDispatch
├── engine/           # v2 engine: dispatcher, router, workflows, SQLite store
│   ├── command-dispatcher.ts
│   ├── event-router.ts
│   ├── pipeline-engine.ts
│   ├── sqlite-consumer.ts
│   ├── sqlite-store.ts
│   ├── workflow-processor.ts
│   ├── projections/          # Event-driven projections (item, node, run, stats)
│   └── workflows/            # Await, phased, and settled workflow implementations
├── graph/            # GraphIR types and filtering
│   ├── types.ts
│   └── filter-graph.ts
├── logging/          # EventLogger for recording pipeline activity
├── plugins/          # Dynamic plugin loading and handler adaptation
│   ├── plugin-loader.ts
│   └── handler-adapter.ts
├── projections/      # In-memory event store projections
│   ├── await-tracker-projection.ts
│   ├── item-status-projection.ts
│   ├── latest-run-projection.ts
│   ├── message-log-projection.ts
│   ├── node-status-projection.ts
│   └── stats-projection.ts
├── runtime/          # PipelineRuntime, AwaitTracker, EventCommandMapper
├── server/           # PipelineServer, SSE, command gate, quiescence tracking
│   ├── pipeline-server.ts
│   ├── pipeline-server-v2.ts
│   ├── sse-manager.ts
│   ├── command-gate.ts
│   ├── quiescence-tracker.ts
│   ├── phased-bridge.ts
│   └── v2-runtime-bridge.ts
├── store/            # In-memory event store with Emmett projections
│   ├── pipeline-event-store.ts
│   └── pipeline-read-model.ts
└── testing/          # Test helpers: mock handlers, event capture, snapshots
    ├── event-capture.ts
    ├── mock-handlers.ts
    ├── snapshot-compare.ts
    └── snapshot-sanitize.ts

flowchart TB
    subgraph Builder
        define["define() / defineV2()"]
    end

    subgraph Core
        Pipeline["Pipeline + Descriptor"]
        GraphIR["GraphIR"]
    end

    subgraph Runtime
        PR["PipelineRuntime"]
        AT["AwaitTracker"]
        ECM["EventCommandMapper"]
    end

    subgraph Server
        PS["PipelineServer"]
        SSE["SSEManager"]
        CG["CommandGate"]
        QT["QuiescenceTracker"]
    end

    subgraph Store
        ES["EventStore + Projections"]
        RM["PipelineReadModel"]
    end

    define --> Pipeline
    Pipeline --> GraphIR
    Pipeline --> PR
    PS --> PR
    PS --> SSE
    PS --> CG
    PS --> QT
    PS --> ES
    ES --> RM
    PR --> AT
    PR --> ECM

Dependencies

| Package | Usage |
| --- | --- |
| @auto-engineer/message-bus | Command/event messaging infrastructure |
| @auto-engineer/file-store | File storage utilities |
| @event-driven-io/emmett | In-memory event store with projections |
| @event-driven-io/emmett-sqlite | SQLite-backed persistent event store |
| express | HTTP server for REST + SSE endpoints |
| nanoid | Compact unique ID generation |
| chokidar | File system watching |
| jose | JWT handling |
| get-port | Automatic port selection |