16 changes: 16 additions & 0 deletions .changeset/agent-skills.md
@@ -0,0 +1,16 @@
---
"@trigger.dev/sdk": patch
"@trigger.dev/core": patch
"@trigger.dev/build": patch
"trigger.dev": patch
---

Add Agent Skills for `chat.agent`. Drop a folder with a `SKILL.md` and any helper scripts/references next to your task code, register it with `skills.define({ id, path })`, and the CLI bundles it into the deploy image automatically — no `trigger.config.ts` changes. The agent gets a one-line summary in its system prompt and discovers full instructions on demand via `loadSkill`, with `bash` and `readFile` tools scoped per-skill (path-traversal guards, output caps, abort-signal propagation).

```ts
const pdfSkill = skills.define({ id: "pdf-extract", path: "./skills/pdf-extract" });

chat.skills.set([await pdfSkill.local()]);
```

Built on the [AI SDK cookbook pattern](https://ai-sdk.dev/cookbook/guides/agent-skills) — portable across providers. SDK + CLI only for now; dashboard-editable `SKILL.md` text is on the roadmap.
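
As a rough illustration of what a per-skill path-traversal guard can look like, a `readFile`-style tool might resolve every requested path against the skill folder and reject anything that escapes it. This is only a sketch under that assumption, not the SDK's actual implementation; `resolveInsideSkill` is a hypothetical name.

```ts
import * as path from "node:path";

// Hypothetical helper (not part of @trigger.dev/sdk): resolve `requested`
// relative to `skillRoot` and throw if the result lands outside the folder.
export function resolveInsideSkill(skillRoot: string, requested: string): string {
  const root = path.resolve(skillRoot);
  const target = path.resolve(root, requested);
  const rel = path.relative(root, target);

  // `rel` is ".." or starts with "../" when the target is above the root;
  // it is absolute when the target sits on a different drive (Windows).
  const escapes = rel === ".." || rel.startsWith(".." + path.sep) || path.isAbsolute(rel);
  if (escapes) {
    throw new Error(`Path escapes skill directory: ${requested}`);
  }
  return target;
}
```

With this guard, `resolveInsideSkill("./skills/pdf-extract", "scripts/run.sh")` resolves normally, while `"../../etc/passwd"` or an absolute path outside the folder throws before any file is touched.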
33 changes: 33 additions & 0 deletions .changeset/chat-actions-no-turn.md
@@ -0,0 +1,33 @@
---
"@trigger.dev/sdk": minor
---

`chat.agent` actions are no longer treated as turns. They fire `hydrateMessages` and `onAction` only — no `onTurnStart` / `prepareMessages` / `onBeforeTurnComplete` / `onTurnComplete`, no `run()`, no turn-counter increment. The trace span is named `chat action` instead of `chat turn N`.

`onAction` can now return a `StreamTextResult`, `string`, or `UIMessage` to produce a model response from the action; returning `void` (the previous behavior and still the default) keeps the action side-effect-only.

**Migration**: if you previously had `run()` branching on `payload.trigger === "action"`, return your `streamText(...)` from `onAction` instead. If you persisted in `onTurnComplete`, do that work inside `onAction`. For any other state-only action, just remove your skip-the-model workaround — the default is now correct.

```ts
// before
onAction: async ({ action }) => {
  if (action.type === "regenerate") {
    chat.store.set({ skipModelCall: false });
    chat.history.slice(0, -1);
  }
},
run: async ({ messages, signal }) => {
  if (chat.store.get()?.skipModelCall) return;
  return streamText({ model, messages, abortSignal: signal });
},

// after
onAction: async ({ action, messages, signal }) => {
  if (action.type === "regenerate") {
    chat.history.slice(0, -1);
    return streamText({ model, messages, abortSignal: signal });
  }
},
run: async ({ messages, signal }) =>
  streamText({ model, messages, abortSignal: signal }),
```
8 changes: 8 additions & 0 deletions .changeset/chat-agent-delta-wire-snapshots.md
@@ -0,0 +1,8 @@
---
"@trigger.dev/sdk": patch
"@trigger.dev/core": patch
---

`chat.agent` wire is now delta-only — clients ship at most one new message per `.in/append` instead of the full `UIMessage[]` history. The agent rebuilds prior history at run boot from a JSON snapshot in object storage plus a `wait=0` replay of the `session.out` tail. Long chats stop hitting the 512 KiB body cap on `/realtime/v1/sessions/{id}/in/append`. Snapshot writes happen after every `onTurnComplete`, awaited so they survive idle suspend; reads happen only at run boot. Registering a `hydrateMessages` hook short-circuits both the snapshot read/write and the replay — the customer is the source of truth for history.

Custom transports that constructed `ChatTaskWirePayload` directly need to drop the `messages: UIMessage[]` field and use `message?: UIMessage` (singular). Built-in transports (`TriggerChatTransport`, `AgentChat`) handle the change below the customer-facing surface — most apps need no changes. Configure object-store env vars (`OBJECT_STORE_*`) on your webapp deployment if you haven't already; without an object store and without `hydrateMessages`, conversations don't survive run boundaries.
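
For a custom transport, the migration could look roughly like the sketch below. The types are simplified, hypothetical stand-ins (the real `ChatTaskWirePayload` and `UIMessage` carry more fields, and the `chatId` field is an assumption made for illustration); the point is only that the new payload carries at most the newest message rather than the whole history.

```ts
// Hypothetical, simplified shapes for illustration only.
type SketchMessage = { id: string; role: "user" | "assistant" | "system"; parts: unknown[] };
type LegacyPayload = { chatId: string; messages: SketchMessage[] };
type DeltaPayload = { chatId: string; message?: SketchMessage };

// Convert a full-history payload into a delta payload: keep only the most
// recent message, and omit `message` entirely when there is nothing new.
export function toDeltaPayload(legacy: LegacyPayload): DeltaPayload {
  if (legacy.messages.length === 0) {
    return { chatId: legacy.chatId };
  }
  const newest = legacy.messages[legacy.messages.length - 1];
  return { chatId: legacy.chatId, message: newest };
}
```

Because the payload size no longer grows with the conversation, each append stays small regardless of how many turns came before it.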
30 changes: 30 additions & 0 deletions .changeset/chat-agent.md
@@ -0,0 +1,30 @@
---
"@trigger.dev/sdk": minor
"@trigger.dev/core": patch
---

Run AI chats as durable Trigger.dev tasks. Define the agent in one function, wire `useChat` to it from React, and the conversation survives page refreshes, network blips, and process restarts — with built-in support for tools, HITL approvals, multi-turn state, and stop-mid-stream cancellation.

```ts
import { chat } from "@trigger.dev/sdk/ai";
import { streamText } from "ai";
import { openai } from "@ai-sdk/openai";

export const myChat = chat.agent({
  id: "my-chat",
  run: async ({ messages, signal }) =>
    streamText({ model: openai("gpt-4o"), messages, abortSignal: signal }),
});
```

```tsx
import { useChat } from "@ai-sdk/react";
import { useTriggerChatTransport } from "@trigger.dev/sdk/chat/react";

const transport = useTriggerChatTransport({ task: "my-chat", accessToken });
const { messages, sendMessage } = useChat({ transport });
```

Lifecycle hooks (`onPreload`, `onTurnStart`, `onTurnComplete`, etc.) cover the common needs around persistence, validation, and post-turn work. `chat.store` gives you a typed shared-data slot the agent and client both read and write. `chat.endRun()` exits cleanly when the agent decides it's done. The transport's `watch` mode lets a dashboard tab observe a run without driving it.

Drops the pre-Sessions chat stream constants (`CHAT_STREAM_KEY`, `CHAT_MESSAGES_STREAM_ID`, `CHAT_STOP_STREAM_ID`) — migrate to `sessions.open(id).out` / `.in`.
34 changes: 34 additions & 0 deletions .changeset/chat-head-start.md
@@ -0,0 +1,34 @@
---
"@trigger.dev/sdk": minor
---

Add `chat.headStart` — an opt-in fast-path that runs the first turn's `streamText` step in your warm Next.js / Hono / Workers / Express handler while the trigger agent run boots in parallel. Cold-start TTFC drops by ~50% on the first message; the agent owns step 2+ (tool execution, persistence, hooks) so heavy deps stay where they belong.

```ts
// app/api/chat/route.ts (Next.js / any Web Fetch framework)
import { chat } from "@trigger.dev/sdk/chat-server";
import { streamText } from "ai";
import { openai } from "@ai-sdk/openai";
import { headStartTools } from "@/lib/chat-tools-schemas"; // schema-only

export const POST = chat.headStart({
  agentId: "ai-chat",
  run: async ({ chat: chatHelper }) =>
    streamText({
      ...chatHelper.toStreamTextOptions({ tools: headStartTools }),
      model: openai("gpt-4o-mini"),
      system: "You are a helpful AI assistant.",
    }),
});
```

```tsx
// browser — opt in by pointing the transport at your handler
const transport = useTriggerChatTransport({
  task: "ai-chat",
  accessToken,
  headStart: "/api/chat", // first-turn-only; turn 2+ bypasses the endpoint
});
```

For Node-only frameworks (Express, Fastify, Koa, raw `node:http`) use `chat.toNodeListener(handler)` to bridge the Web Fetch handler to `(req, res)`. Adds a new `@trigger.dev/sdk/chat-server` subpath; bundle stays Web Fetch–only with no `node:*` imports.
21 changes: 21 additions & 0 deletions .changeset/chat-history-read-primitives.md
@@ -0,0 +1,21 @@
---
"@trigger.dev/sdk": minor
---

Add read primitives to `chat.history` for HITL flows: `getPendingToolCalls()`, `getResolvedToolCalls()`, `extractNewToolResults(message)`, `getChain()`, and `findMessage(messageId)`. These lift the accumulator-walking logic that customers building human-in-the-loop tools were re-implementing into the SDK.

Use `getPendingToolCalls()` to gate fresh user turns while a tool call is awaiting an answer. Use `extractNewToolResults(message)` to dedup tool results when persisting to your own store — the helper returns only the parts whose `toolCallId` is not already resolved on the chain.

```ts
const pending = chat.history.getPendingToolCalls();
if (pending.length > 0) {
  // an addToolOutput is expected before a new user message
}

onTurnComplete: async ({ responseMessage }) => {
  const newResults = chat.history.extractNewToolResults(responseMessage);
  for (const r of newResults) {
    await db.toolResults.upsert({ id: r.toolCallId, output: r.output, errorText: r.errorText });
  }
},
```
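
To make the accumulator-walking idea concrete, here is a self-contained sketch of the kind of logic these helpers replace. It is not the SDK's implementation: the part shape is a simplified, hypothetical model (real AI SDK tool parts have more fields and states), and `pendingToolCallIds` / `newToolResults` are illustrative names.

```ts
// Hypothetical, simplified shapes for illustration only.
type ToolPart = {
  type: "tool";
  toolCallId: string;
  state: "input-available" | "output-available" | "output-error";
};
type TextPart = { type: "text"; text: string };
type SketchMessage = { id: string; role: "user" | "assistant"; parts: Array<ToolPart | TextPart> };

// Walk the chain and return the ids of tool calls still awaiting an answer.
export function pendingToolCallIds(chain: SketchMessage[]): string[] {
  const latest = new Map<string, ToolPart["state"]>();
  for (const message of chain) {
    for (const part of message.parts) {
      if (part.type === "tool") latest.set(part.toolCallId, part.state);
    }
  }
  const pending: string[] = [];
  latest.forEach((state, id) => {
    if (state === "input-available") pending.push(id);
  });
  return pending;
}

// Return the resolved tool parts in `message` whose toolCallId is not
// already resolved somewhere on the chain.
export function newToolResults(chain: SketchMessage[], message: SketchMessage): ToolPart[] {
  const resolved = new Set<string>();
  for (const m of chain) {
    for (const part of m.parts) {
      if (part.type === "tool" && part.state !== "input-available") resolved.add(part.toolCallId);
    }
  }
  const fresh: ToolPart[] = [];
  for (const part of message.parts) {
    if (part.type === "tool" && part.state !== "input-available" && !resolved.has(part.toolCallId)) {
      fresh.push(part);
    }
  }
  return fresh;
}
```

Tracking the latest state per `toolCallId` is what makes the dedup safe to run repeatedly: once a result is on the chain, a second pass over the same message yields nothing new.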
6 changes: 6 additions & 0 deletions .changeset/chat-session-attributes.md
@@ -0,0 +1,6 @@
---
"@trigger.dev/sdk": patch
"@trigger.dev/core": patch
---

Stamp `gen_ai.conversation.id` (the chat id) on every span and metric emitted from inside a `chat.task` or `chat.agent` run. Lets you filter dashboard spans, runs, and metrics by the chat conversation that produced them — independent of the run boundary, so multi-run chats correlate cleanly. No code changes required on the user side.
8 changes: 8 additions & 0 deletions .changeset/mock-chat-agent-test-harness.md
@@ -0,0 +1,8 @@
---
"@trigger.dev/sdk": patch
"@trigger.dev/core": patch
---

Unit-test `chat.agent` definitions offline with `mockChatAgent` from `@trigger.dev/sdk/ai/test`. Drives a real agent's turn loop in-process — no network, no task runtime — so you can send messages, actions, and stop signals via driver methods, inspect captured output chunks, and verify hooks fire. Pairs with `MockLanguageModelV3` from `ai/test` for model mocking. `setupLocals` lets you pre-seed `locals` (DB clients, service stubs) before `run()` starts.

The broader `runInMockTaskContext` harness it's built on lives at `@trigger.dev/core/v3/test` — useful for unit-testing any task code, not just chat.
235 changes: 235 additions & 0 deletions apps/webapp/test/chat-snapshot-integration.test.ts
@@ -0,0 +1,235 @@
// Plan F.3: integration test that round-trips a `ChatSnapshotV1` blob
// through the SDK's snapshot helpers + a real MinIO backing store. Mirrors
// the testcontainer pattern from `objectStore.test.ts`.
//
// What this verifies end-to-end:
// - SDK's `writeChatSnapshot` calls `apiClient.createUploadPayloadUrl`
// to mint a presigned PUT, then PUTs JSON to it.
// - SDK's `readChatSnapshot` calls `apiClient.getPayloadUrl` to mint a
// presigned GET, then fetches and parses.
// - The webapp's `generatePresignedUrl` produces URLs MinIO accepts.
// - The blob round-trips with `version: 1` shape preserved.
// - 404 (no snapshot for a fresh session) returns `undefined`, not an
// error.
//
// This is the integration safety net behind the unit tests in
// `packages/trigger-sdk/test/chat-snapshot.test.ts` — those tests mock
// `fetch`; this one drives a real S3-compatible backend.

import { postgresAndMinioTest } from "@internal/testcontainers";
import { apiClientManager } from "@trigger.dev/core/v3";
import {
  __readChatSnapshotProductionPathForTests as readChatSnapshot,
  __writeChatSnapshotProductionPathForTests as writeChatSnapshot,
  type ChatSnapshotV1,
} from "@trigger.dev/sdk/ai";
import type { UIMessage } from "ai";
import { afterEach, describe, expect, vi } from "vitest";
import { env } from "~/env.server";
import { generatePresignedUrl } from "~/v3/objectStore.server";

vi.setConfig({ testTimeout: 60_000 });

// ── Helpers ────────────────────────────────────────────────────────────

function makeSnapshot(opts: { messages?: UIMessage[]; lastOutEventId?: string } = {}): ChatSnapshotV1 {
  return {
    version: 1,
    savedAt: 1_700_000_000_000,
    messages: opts.messages ?? [
      {
        id: "u-1",
        role: "user",
        parts: [{ type: "text", text: "hello" }],
      },
      {
        id: "a-1",
        role: "assistant",
        parts: [{ type: "text", text: "world" }],
      },
    ],
    lastOutEventId: opts.lastOutEventId ?? "evt-42",
    lastOutTimestamp: 1_700_000_000_500,
  };
}

/**
 * Stub `apiClientManager.clientOrThrow()` so the SDK helpers see a fake
 * api client whose `getPayloadUrl` / `createUploadPayloadUrl` return
 * presigned URLs minted by the webapp's real `generatePresignedUrl`
 * (which signs against MinIO).
 *
 * The SDK helpers internally do `fetch(presignedUrl, ...)` to read/write
 * the blob, so MinIO ends up holding the actual bytes.
 */
function stubApiClient(opts: { projectRef: string; envSlug: string }) {
  vi.spyOn(apiClientManager, "clientOrThrow").mockReturnValue({
    async getPayloadUrl(filename: string) {
      const result = await generatePresignedUrl(opts.projectRef, opts.envSlug, filename, "GET");
      if (!result.success) throw new Error(result.error);
      return { presignedUrl: result.url };
    },
    async createUploadPayloadUrl(filename: string) {
      const result = await generatePresignedUrl(opts.projectRef, opts.envSlug, filename, "PUT");
      if (!result.success) throw new Error(result.error);
      return { presignedUrl: result.url };
    },
  } as never);
}

// Suppress noisy warnings from logger.warn during error-path tests.
let warnSpy: ReturnType<typeof vi.spyOn>;

afterEach(() => {
  vi.restoreAllMocks();
  warnSpy?.mockRestore();
});

// ── Tests ──────────────────────────────────────────────────────────────

describe("chat snapshot integration (MinIO + SDK helpers)", () => {
  postgresAndMinioTest("round-trips a snapshot through real MinIO", async ({ minioConfig }) => {
    env.OBJECT_STORE_BASE_URL = minioConfig.baseUrl;
    env.OBJECT_STORE_ACCESS_KEY_ID = minioConfig.accessKeyId;
    env.OBJECT_STORE_SECRET_ACCESS_KEY = minioConfig.secretAccessKey;
    env.OBJECT_STORE_REGION = minioConfig.region;
    env.OBJECT_STORE_DEFAULT_PROTOCOL = undefined;

    stubApiClient({ projectRef: "proj_snap_rt", envSlug: "dev" });

    const sessionId = "sess_round_trip_1";
    const snapshot = makeSnapshot();

    // Write through the SDK helper — should land in MinIO at
    // `packets/proj_snap_rt/dev/sessions/sess_round_trip_1/snapshot.json`.
    await writeChatSnapshot(sessionId, snapshot);

    // Read back through the SDK helper — should reconstruct the original.
    const result = await readChatSnapshot(sessionId);

    expect(result).toEqual(snapshot);
  });

  postgresAndMinioTest("returns undefined for a fresh session with no snapshot", async ({ minioConfig }) => {
    env.OBJECT_STORE_BASE_URL = minioConfig.baseUrl;
    env.OBJECT_STORE_ACCESS_KEY_ID = minioConfig.accessKeyId;
    env.OBJECT_STORE_SECRET_ACCESS_KEY = minioConfig.secretAccessKey;
    env.OBJECT_STORE_REGION = minioConfig.region;
    env.OBJECT_STORE_DEFAULT_PROTOCOL = undefined;

    stubApiClient({ projectRef: "proj_snap_404", envSlug: "dev" });

    warnSpy = vi.spyOn(console, "warn").mockImplementation(() => {});

    // Session never had a snapshot written — read returns undefined.
    const result = await readChatSnapshot("sess_never_existed");
    expect(result).toBeUndefined();
  });

  postgresAndMinioTest("overwrites a prior snapshot in place (single-writer)", async ({ minioConfig }) => {
    // The runtime guarantees one attempt alive at a time, and
    // `writeChatSnapshot` runs awaited after `onTurnComplete`. Verify
    // that a second write to the same key replaces the first cleanly —
    // the read-after-write reflects the latest blob.
    env.OBJECT_STORE_BASE_URL = minioConfig.baseUrl;
    env.OBJECT_STORE_ACCESS_KEY_ID = minioConfig.accessKeyId;
    env.OBJECT_STORE_SECRET_ACCESS_KEY = minioConfig.secretAccessKey;
    env.OBJECT_STORE_REGION = minioConfig.region;
    env.OBJECT_STORE_DEFAULT_PROTOCOL = undefined;

    stubApiClient({ projectRef: "proj_snap_overwrite", envSlug: "dev" });

    const sessionId = "sess_overwrite";

    const turn1 = makeSnapshot({
      messages: [
        { id: "u-1", role: "user", parts: [{ type: "text", text: "first" }] },
      ],
      lastOutEventId: "evt-turn1",
    });
    const turn2 = makeSnapshot({
      messages: [
        { id: "u-1", role: "user", parts: [{ type: "text", text: "first" }] },
        { id: "a-1", role: "assistant", parts: [{ type: "text", text: "reply-1" }] },
        { id: "u-2", role: "user", parts: [{ type: "text", text: "second" }] },
        { id: "a-2", role: "assistant", parts: [{ type: "text", text: "reply-2" }] },
      ],
      lastOutEventId: "evt-turn2",
    });

    await writeChatSnapshot(sessionId, turn1);
    await writeChatSnapshot(sessionId, turn2);

    const result = await readChatSnapshot(sessionId);
    expect(result).toEqual(turn2);
    expect(result?.messages).toHaveLength(4);
    expect(result?.lastOutEventId).toBe("evt-turn2");
  });

  postgresAndMinioTest("isolates snapshots by sessionId (no cross-talk)", async ({ minioConfig }) => {
    env.OBJECT_STORE_BASE_URL = minioConfig.baseUrl;
    env.OBJECT_STORE_ACCESS_KEY_ID = minioConfig.accessKeyId;
    env.OBJECT_STORE_SECRET_ACCESS_KEY = minioConfig.secretAccessKey;
    env.OBJECT_STORE_REGION = minioConfig.region;
    env.OBJECT_STORE_DEFAULT_PROTOCOL = undefined;

    stubApiClient({ projectRef: "proj_snap_iso", envSlug: "dev" });

    const sessA = "sess_iso_A";
    const sessB = "sess_iso_B";
    const snapA = makeSnapshot({ lastOutEventId: "evt-A" });
    const snapB = makeSnapshot({ lastOutEventId: "evt-B" });

    await writeChatSnapshot(sessA, snapA);
    await writeChatSnapshot(sessB, snapB);

    const readA = await readChatSnapshot(sessA);
    const readB = await readChatSnapshot(sessB);

    expect(readA?.lastOutEventId).toBe("evt-A");
    expect(readB?.lastOutEventId).toBe("evt-B");
    // Distinct objects — modifying one shouldn't affect the other.
    expect(readA?.lastOutEventId).not.toBe(readB?.lastOutEventId);
  });

  postgresAndMinioTest("handles snapshots with large message lists (50 turns, 100 messages)", async ({ minioConfig }) => {
    // Stress test: a 50-turn chat snapshot. Plan F.4 mentions the
    // pre-change baseline grew past 512 KiB around turn 10-30 with tool
    // use; the post-slim wire keeps wire payloads small but the snapshot
    // itself can still get large. Verify the helpers handle a realistic
    // payload size.
    env.OBJECT_STORE_BASE_URL = minioConfig.baseUrl;
    env.OBJECT_STORE_ACCESS_KEY_ID = minioConfig.accessKeyId;
    env.OBJECT_STORE_SECRET_ACCESS_KEY = minioConfig.secretAccessKey;
    env.OBJECT_STORE_REGION = minioConfig.region;
    env.OBJECT_STORE_DEFAULT_PROTOCOL = undefined;

    stubApiClient({ projectRef: "proj_snap_big", envSlug: "dev" });

    // Each turn contributes one user and one assistant message (100 total).
    const messages: UIMessage[] = [];
    for (let i = 0; i < 50; i++) {
      messages.push({
        id: `u-${i}`,
        role: "user",
        parts: [{ type: "text", text: `user message ${i}: ${"x".repeat(200)}` }],
      });
      messages.push({
        id: `a-${i}`,
        role: "assistant",
        parts: [{ type: "text", text: `assistant reply ${i}: ${"y".repeat(500)}` }],
      });
    }
    const snapshot = makeSnapshot({ messages, lastOutEventId: "evt-50" });

    await writeChatSnapshot("sess_big_chat", snapshot);
    const result = await readChatSnapshot("sess_big_chat");

    expect(result).toBeDefined();
    expect(result!.messages).toHaveLength(100);
    expect(result!.lastOutEventId).toBe("evt-50");
    // Spot-check ordering integrity — the messages array round-tripped
    // in the same order.
    expect(result!.messages[0]!.id).toBe("u-0");
    expect(result!.messages[99]!.id).toBe("a-49");
  });
});