@auto-engineer/message-store — Stream-based persistence for commands and events
Purpose
Without @auto-engineer/message-store, you would have to build your own stream-oriented message persistence with revision tracking, session management, filtering, and optimistic concurrency control.
Key Concepts
Streams — Messages are grouped by streamId. Each stream tracks its own revision number, incremented on every append.
Global position — Every message gets a monotonically increasing position across all streams, giving a total ordering of all writes.
Sessions — A session groups related messages together. Creating a session returns a sessionId that is stamped onto every message saved while the session is active.
Optimistic concurrency — Pass an expectedRevision when saving. If the stream's current revision does not match, the write is rejected.
Message type inference — The store infers whether a message is a command or event from an explicit parameter, the stream name, or the presence of a requestId field.
Pluggable backends — The IMessageStore interface allows alternate implementations. The built-in MemoryMessageStore stores everything in-process.
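To make the interplay between stream revisions, the global position, and expectedRevision concrete, here is a minimal in-memory sketch. It is not the package's implementation: SketchStore, StoredMessage, append, and currentRevision are hypothetical names, and plain numbers stand in for the bigint values the real store reports. It assumes, as the examples in this README suggest, that an empty stream sits at revision -1, the first message in a stream gets revision 0, and the first global position is 1.

```typescript
// Hypothetical sketch only; not the API of @auto-engineer/message-store.
interface StoredMessage {
  streamId: string;
  type: string;
  revision: number; // per-stream counter (the real store reports bigint)
  position: number; // global counter across all streams
}

class SketchStore {
  private streams = new Map<string, StoredMessage[]>();
  private globalPosition = 0;

  // Revision of the newest message in the stream, or -1 for an empty stream.
  currentRevision(streamId: string): number {
    const stream = this.streams.get(streamId);
    return stream ? stream.length - 1 : -1;
  }

  append(streamId: string, type: string, expectedRevision?: number): StoredMessage {
    const current = this.currentRevision(streamId);
    // Optimistic concurrency: reject the write if the caller's view is stale.
    if (expectedRevision !== undefined && expectedRevision !== current) {
      throw new Error(
        `Expected revision ${expectedRevision} but stream is at revision ${current}`,
      );
    }
    this.globalPosition += 1;
    const stored: StoredMessage = {
      streamId,
      type,
      revision: current + 1,
      position: this.globalPosition,
    };
    const stream = this.streams.get(streamId) || [];
    stream.push(stored);
    this.streams.set(streamId, stream);
    return stored;
  }
}

const sketch = new SketchStore();
sketch.append('orders-123', 'CreateOrder');  // revision 0, position 1
sketch.append('users-1', 'CreateUser');      // revision 0, position 2
sketch.append('orders-123', 'ShipOrder', 0); // revision 1, position 3
```

Revisions restart for each stream while positions keep counting across streams, which is why the second append above has revision 0 but position 2.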
Installation
pnpm add @auto-engineer/message-store
Quick Start
import { MemoryMessageStore } from '@auto-engineer/message-store';

const store = new MemoryMessageStore();

// Save a command
await store.saveMessage('user-commands', {
  type: 'CreateUser',
  data: { name: 'Alice', email: 'alice@example.com' },
  requestId: 'req-123',
});

// Read it back
const messages = await store.getMessages('user-commands');
// [{ streamId: 'user-commands', message: {...}, revision: 0n, position: 1n, ... }]
// Commands from the last hour with specific names
const recentCommands = await store.getAllCommands({
  fromTimestamp: new Date(Date.now() - 3_600_000),
  messageNames: ['CreateUser', 'UpdateUser'],
});

// All messages sharing a correlation ID
const correlated = await store.getAllMessages({
  correlationId: 'corr-456',
});

// Filter on message data fields
const matched = await store.getAllMessages({
  jsonFilter: { orderId: 'ord-1' },
});
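One way to picture how such filters could be evaluated is as a predicate applied to each stored message. The sketch below is a guess at reasonable semantics, not the store's documented behavior: it assumes that all supplied criteria must match (logical AND), that fromTimestamp is inclusive, and that jsonFilter compares top-level data fields by strict equality. SketchMessage, SketchFilter, and matchesFilter are hypothetical names; only the filter field names are taken from the examples above.

```typescript
// Hypothetical shapes; only the filter field names come from the README examples.
interface SketchMessage {
  type: string;
  data: Record<string, unknown>;
  correlationId?: string;
  timestamp: Date;
}

interface SketchFilter {
  messageNames?: string[];
  correlationId?: string;
  fromTimestamp?: Date;
  jsonFilter?: Record<string, unknown>;
}

// Returns true only if the message satisfies every criterion that is present.
function matchesFilter(message: SketchMessage, filter: SketchFilter): boolean {
  if (filter.messageNames && filter.messageNames.indexOf(message.type) === -1) {
    return false;
  }
  if (filter.correlationId !== undefined && message.correlationId !== filter.correlationId) {
    return false;
  }
  // Assumption: the lower time bound is inclusive.
  if (filter.fromTimestamp && message.timestamp.getTime() < filter.fromTimestamp.getTime()) {
    return false;
  }
  if (filter.jsonFilter) {
    // Assumption: shallow, strict-equality match on top-level data fields.
    for (const key of Object.keys(filter.jsonFilter)) {
      if (message.data[key] !== filter.jsonFilter[key]) {
        return false;
      }
    }
  }
  return true;
}

const sample: SketchMessage = {
  type: 'CreateOrder',
  data: { orderId: 'ord-1', total: 42 },
  correlationId: 'corr-456',
  timestamp: new Date(),
};

matchesFilter(sample, { jsonFilter: { orderId: 'ord-1' } }); // true
matchesFilter(sample, { messageNames: ['CreateUser'] });     // false
```

Whether the real store supports nested paths or partial matches in jsonFilter is not stated here, so check the package's own documentation before relying on anything beyond flat equality.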
Enforce optimistic concurrency
await store.saveMessage('orders-123', command1); // revision becomes 0n

try {
  // This expects the stream to be empty (revision -1n), but it is at 0n
  await store.saveMessage('orders-123', command2, BigInt(-1));
} catch (err) {
  // "Expected revision -1 but stream is at revision 0"
}
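A rejected write is usually handled by re-reading the stream's revision and trying again. The sketch below shows that retry loop in isolation. Everything in it is hypothetical: RevisionedStream, conflictError, and appendWithRetry are illustrative names rather than parts of the package, the code is synchronous for brevity, and it assumes a conflict can be recognized by the error's name. How the real store signals a conflict beyond its message text is not specified here.

```typescript
// Hypothetical helper types; not part of @auto-engineer/message-store.
interface RevisionedStream {
  currentRevision(): number; // -1 when the stream is empty
  append(message: string, expectedRevision: number): void; // throws on mismatch
}

// Builds an error that the retry loop can recognize as a revision conflict.
function conflictError(expected: number, actual: number): Error {
  const err = new Error(`Expected revision ${expected} but stream is at revision ${actual}`);
  err.name = 'RevisionConflict';
  return err;
}

// Appends with optimistic concurrency, re-reading the revision after each
// conflict. Returns the revision assigned to the new message.
function appendWithRetry(stream: RevisionedStream, message: string, maxAttempts = 3): number {
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    const expected = stream.currentRevision();
    try {
      stream.append(message, expected);
      return expected + 1;
    } catch (err) {
      // Anything other than a revision conflict is not ours to handle.
      if (!(err instanceof Error) || err.name !== 'RevisionConflict') {
        throw err;
      }
      // Another writer appended first; loop to pick up the new revision.
    }
  }
  throw new Error(`Gave up after ${maxAttempts} conflicting attempts`);
}
```

Blind retries are only safe when the message is still valid against the newer stream state. If the decision to write depended on what was in the stream, re-run that decision inside the loop instead of just re-sending the same message.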