Skip to content

Latest commit

 

History

History

Folders and files

NameName
Last commit message
Last commit date

parent directory

..
 
 
 
 
 
 
 
 
 
 

README.md

@auto-engineer/message-store — Stream-based persistence for commands and events

Purpose

Without @auto-engineer/message-store, you would have to build your own stream-oriented message persistence with revision tracking, session management, filtering, and optimistic concurrency control.

Key Concepts

  • Streams — Messages are grouped by streamId. Each stream tracks its own revision number, incremented on every append.
  • Global position — Every message gets a monotonically increasing position across all streams, giving a total ordering of all writes.
  • Sessions — A session groups related messages together. Creating a session returns a sessionId that is stamped onto every message saved while the session is active.
  • Optimistic concurrency — Pass an expectedRevision when saving. If the stream's current revision does not match, the write is rejected.
  • Message type inference — The store infers whether a message is a command or event from an explicit parameter, the stream name, or the presence of a requestId field.
  • Pluggable backends — The IMessageStore interface allows alternate implementations. The built-in MemoryMessageStore stores everything in-process.

Installation

pnpm add @auto-engineer/message-store

Quick Start

import { MemoryMessageStore } from '@auto-engineer/message-store';

const store = new MemoryMessageStore();

// Save a command
await store.saveMessage('user-commands', {
  type: 'CreateUser',
  data: { name: 'Alice', email: 'alice@example.com' },
  requestId: 'req-123',
});

// Read it back
const messages = await store.getMessages('user-commands');
// [{ streamId: 'user-commands', message: {...}, revision: 0n, position: 1n, ... }]

How-to Guides

Save multiple messages in one call

await store.saveMessages('order-events', [
  { type: 'OrderPlaced', data: { orderId: 'ord-1', total: 42 } },
  { type: 'OrderConfirmed', data: { orderId: 'ord-1' } },
]);

Use sessions to group related activity

const sessionId = await store.createSession();

await store.saveMessage('commands', { type: 'StartProcess', data: {} });
await store.saveMessage('events', { type: 'ProcessStarted', data: {} });

const sessionMessages = await store.getSessionMessages(sessionId);
await store.endSession(sessionId);

Filter messages

// Commands from the last hour with specific names
const recentCommands = await store.getAllCommands({
  fromTimestamp: new Date(Date.now() - 3_600_000),
  messageNames: ['CreateUser', 'UpdateUser'],
});

// All messages sharing a correlation ID
const correlated = await store.getAllMessages({
  correlationId: 'corr-456',
});

// Filter on message data fields
const matched = await store.getAllMessages({
  jsonFilter: { orderId: 'ord-1' },
});

Enforce optimistic concurrency

await store.saveMessage('orders-123', command1); // revision becomes 0n

try {
  // This expects the stream to be empty (revision -1n), but it is at 0n
  await store.saveMessage('orders-123', command2, BigInt(-1));
} catch (err) {
  // "Expected revision -1 but stream is at revision 0"
}

API Reference

Exports

import {
  MemoryMessageStore,
  type IMessageStore,
  type ILocalMessageStore,
  type Message,
  type MessageType,
  type PositionalMessage,
  type MessageFilter,
  type StreamInfo,
  type SessionInfo,
  type MessageStoreStats,
} from '@auto-engineer/message-store';

IMessageStore

Method Signature Description
saveMessage (streamId: string, message: Message, expectedRevision?: bigint, messageType?: MessageType) => Promise<void> Save a single message to a stream
saveMessages (streamId: string, messages: Message[], expectedRevision?: bigint, messageType?: MessageType) => Promise<void> Save multiple messages to a stream
getMessages (streamId: string, fromRevision?: bigint, count?: number) => Promise<PositionalMessage[]> Read messages from a stream
getAllMessages (filter?: MessageFilter, count?: number) => Promise<PositionalMessage[]> Read all messages across streams
getAllCommands (filter?, count?) => Promise<PositionalMessage[]> Shorthand for getAllMessages with messageType: 'command'
getAllEvents (filter?, count?) => Promise<PositionalMessage[]> Shorthand for getAllMessages with messageType: 'event'
getStreamInfo (streamId: string) => Promise<StreamInfo | null> Get metadata about a stream
getStreams () => Promise<string[]> List all stream IDs
getSessions () => Promise<SessionInfo[]> List all sessions
getSessionInfo (sessionId: string) => Promise<SessionInfo | null> Get metadata about a session
getSessionMessages (sessionId: string, filter?, count?) => Promise<PositionalMessage[]> Get messages for a session
getStats () => Promise<MessageStoreStats> Get aggregate statistics
dispose () => Promise<void> Release resources

ILocalMessageStore (extends IMessageStore)

Method Signature Description
reset () => Promise<void> Clear all stored messages, sessions, and state
createSession () => Promise<string> Start a new session, returns the session ID
endSession (sessionId: string) => Promise<void> Mark a session as ended

Types

type Message = Command | Event; // from @auto-engineer/message-bus

type MessageType = 'command' | 'event';

interface PositionalMessage {
  streamId: string;
  message: Message;
  messageType: MessageType;
  revision: bigint;   // position within the stream
  position: bigint;   // global position across all streams
  timestamp: Date;
  sessionId: string;
}

interface MessageFilter {
  messageType?: MessageType;
  messageNames?: string[];
  streamId?: string;
  sessionId?: string;
  correlationId?: string;
  requestId?: string;
  fromPosition?: bigint;
  toPosition?: bigint;
  fromTimestamp?: Date;
  toTimestamp?: Date;
  jsonFilter?: Record<string, unknown>;
}

interface StreamInfo {
  streamId: string;
  revision: bigint;
  messageCount: number;
  firstPosition: bigint;
  lastPosition: bigint;
  createdAt: Date;
  lastUpdated: Date;
}

interface SessionInfo {
  sessionId: string;
  startedAt: Date;
  messageCount: number;
  commandCount: number;
  eventCount: number;
  lastActivity: Date;
}

interface MessageStoreStats {
  totalMessages: number;
  totalCommands: number;
  totalEvents: number;
  totalStreams: number;
  totalSessions: number;
  memoryUsage?: number;
  oldestMessage?: Date;
  newestMessage?: Date;
}

Architecture

src/
├── index.ts                          # Re-exports all public API
├── interfaces/
│   ├── IMessageStore.ts              # IMessageStore and ILocalMessageStore interfaces
│   └── types.ts                      # Message, PositionalMessage, filter and info types
└── memory/
    └── MemoryMessageStore.ts         # In-memory implementation

Dependencies

Package Role
@auto-engineer/message-bus Provides Command and Event types that compose Message
debug Namespaced debug logging (auto:message-store:memory)
nanoid Generates unique session IDs