
feat: Sessions primitive — durable run-aware streams + dashboard (1/5)#3542

Open
ericallam wants to merge 1 commit into main from feature/sessions-primitive

Conversation

@ericallam
Member

Layer 1 of 5 in the chat.agent stack split

Adds Sessions, a durable, run-aware stream primitive that scopes
session.in / session.out records to a session (not a single run).
Records survive run boundaries; reconnect-from-last-event-id is built in.
This is the foundation everything else builds on.
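The reconnect-from-last-event-id contract can be sketched in a few lines: every record carries a monotonically increasing sequence number, and a client that remembers the last event id it saw can resume with no gaps or duplicates. A minimal sketch (the `SessionRecord` shape and `resumeFrom` helper are illustrative, not the actual SDK API):

```typescript
type SessionRecord = { seqNum: number; data: string };

// Hypothetical resume helper: return only records strictly after the cursor.
// Passing `undefined` (no Last-Event-ID) replays the whole session.
function resumeFrom(
  records: SessionRecord[],
  lastEventId: number | undefined
): SessionRecord[] {
  if (lastEventId === undefined) return records;
  return records.filter((r) => r.seqNum > lastEventId);
}

const all: SessionRecord[] = [
  { seqNum: 1, data: "hello" },
  { seqNum: 2, data: "from run A" },
  { seqNum: 3, data: "from run B" }, // records span run boundaries
];

// After a dropped connection that last saw seqNum 2, only record 3 is replayed.
const replay = resumeFrom(all, 2);
```

Because records are scoped to the session rather than a run, the same cursor keeps working even when the producing run changes between connections.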

Targets main — merge this first

Replaces #3173 (closed). Original branch backed up at backup/tri-7532-pre-stack-split-20260510.

Server foundation

  • New /realtime/v1/sessions/:session/:io/append + /records routes
  • sessionRunManager + sessionsRepository + clickhouseSessionsRepository
  • mintRunToken for short-lived per-session tokens
  • s2Append retry-with-backoff + undici cause diagnostics
  • /api/v[12]/packets/* exempt from customer rate limits
  • BackgroundWorker gains taskKind enum (TASK, AGENT, SCHEDULED)
  • TaskRun.taskKind column + clickhouse 029_add_task_kind_to_task_runs_v2

Core types

  • New sessionStreams, inputStreams, realtimeStreams modules in @trigger.dev/core
  • session-streams-api / realtime-streams-api surface

Sessions dashboard UI (the primitive's own viewer)

  • /sessions index + detail routes
  • SessionsTable, SessionFilters, SessionStatus, CloseSessionDialog
  • AGENT/SCHEDULED filter in RunFilters + TaskTriggerSource

Stack

  • L1 → main (this PR)
  • L2 → L1: chat.agent runtime
  • L3 → L2: browser chat client + transport
  • L4 → L3: agent-view dashboard
  • L5 → L4: ai-chat reference + MCP tooling

Includes the sessions-primitive changeset.
@changeset-bot

changeset-bot Bot commented May 10, 2026

🦋 Changeset detected

Latest commit: b84d537

The changes in this PR will be included in the next version bump.

This PR includes changesets to release 29 packages
Name Type
@trigger.dev/core Patch
@trigger.dev/sdk Patch
@trigger.dev/build Patch
trigger.dev Patch
@trigger.dev/python Patch
@trigger.dev/redis-worker Patch
@trigger.dev/schema-to-json Patch
@internal/cache Patch
@internal/clickhouse Patch
@internal/llm-model-catalog Patch
@internal/redis Patch
@internal/replication Patch
@internal/run-engine Patch
@internal/schedule-engine Patch
@internal/testcontainers Patch
@internal/tracing Patch
@internal/tsql Patch
@internal/zod-worker Patch
@internal/sdk-compat-tests Patch
d3-chat Patch
references-d3-openai-agents Patch
references-nextjs-realtime Patch
references-realtime-hooks-test Patch
references-realtime-streams Patch
references-telemetry Patch
@trigger.dev/react-hooks Patch
@trigger.dev/rsc Patch
@trigger.dev/database Patch
@trigger.dev/otlp-importer Patch


@coderabbitai
Contributor

coderabbitai Bot commented May 10, 2026

Walkthrough

Adds sessions list/detail routes and UI, server presenters, and a close-session action. Introduces session stream APIs/managers and SSE endpoints. Upgrades SSE client retries/timeouts and S2 writer with size limits and completion result. Adds AGENT trigger source, propagates taskKind through runs, filters, and ClickHouse. Updates repositories, path builders, and rate limits. Includes DB/ClickHouse migrations and extensive tests.

Estimated code review effort

🎯 5 (Critical) | ⏱️ ~120 minutes


Contributor

@devin-ai-integration devin-ai-integration Bot left a comment


Devin Review found 1 potential issue.

View 6 additional findings in Devin Review.



🔴 getTaskQueueInfo early-return path omits taskKind, misclassifying AGENT/SCHEDULED runs as STANDARD

When getTaskQueueInfo is called for a non-locked-worker trigger that provides both a queue override and a per-trigger TTL, the early return at line 219 skips the DB query entirely and returns { queueName, taskTtl: undefined } without taskKind. The caller in triggerTask.server.ts:316 then falls through to taskKind ?? "STANDARD", permanently stamping AGENT or SCHEDULED runs as STANDARD in the ClickHouse task_kind column.

This directly contradicts the PR's own intent — the comment block at queues.server.ts:110-116 on the locked-worker path explicitly says "Always fetch the task so we can resolve triggerSource" to avoid exactly this bug. But the non-locked-worker path has the same early-return optimization from before the PR that was never updated to also fetch triggerSource.

Affected scenario: any AGENT task triggered via the API/SDK with queue: { name: "custom" } and ttl: "5m" (both set). The run will appear as "Standard" in the dashboard's Source filter and won't match a sources=["AGENT"] filter.

(Refers to lines 218-219)

Prompt for agents
In `getTaskQueueInfo` at apps/webapp/app/runEngine/concerns/queues.server.ts, the early-return at line 218-219 fires when both `overriddenQueueName` and `body.options?.ttl` are set and skips the DB query entirely — returning no `taskKind`. This means AGENT and SCHEDULED runs hitting this path get misclassified as STANDARD in ClickHouse.

The fix should either:
1. Remove this early-return optimization entirely (simplest — the subsequent DB queries are cheap and already cached by the replica). The locked-worker path at line 110-129 already does this: it always fetches the task row.
2. Or keep the optimization but still query `backgroundWorkerTask.findFirst` for just `triggerSource` so `taskKind` is populated. This is what the locked-worker path does at lines 117-129.

The same issue affects the `no worker found` fallback at line 231 (`return { queueName: overriddenQueueName ?? defaultQueueName, taskTtl: undefined }`) and the `no task found` fallback at line 265 — though those are edge cases where there's no task row to look up. Still, the line-218 path is the common case and must be fixed.
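The shape of the proposed fix (option 2) can be sketched as follows. This is a simplified stand-in, not the actual `queues.server.ts` code: `getQueueInfoFixed` models only the early-return path, and `lookupTriggerSource` stands in for the `backgroundWorkerTask.findFirst` query; all other paths are elided:

```typescript
type TaskKind = "STANDARD" | "AGENT" | "SCHEDULED";
type QueueInfo = { queueName: string; taskTtl?: string; taskKind?: TaskKind };

// Sketch of the repaired early-return: even when both a queue override and a
// per-trigger TTL are supplied, still resolve triggerSource so taskKind is
// populated instead of falling through to the "STANDARD" default.
async function getQueueInfoFixed(
  overriddenQueueName: string | undefined,
  ttl: string | undefined,
  lookupTriggerSource: () => Promise<TaskKind | undefined>
): Promise<QueueInfo | undefined> {
  if (overriddenQueueName && ttl) {
    const taskKind = await lookupTriggerSource(); // the query the buggy path skipped
    return { queueName: overriddenQueueName, taskTtl: undefined, taskKind };
  }
  return undefined; // remaining branches elided in this sketch
}
```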


Contributor

@coderabbitai coderabbitai Bot left a comment


Actionable comments posted: 12

Note

Due to the large number of review comments, Critical, Major severity comments were prioritized as inline comments.

🟡 Minor comments (11)
packages/core/src/v3/test/test-realtime-streams-manager.ts-158-161 (1)

158-161: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

reset() leaks writeListeners across tests.

reset() clears buffers and pipeWaits but leaves writeListeners intact. For a manager whose primary purpose is per-test isolation, any listener registered in one test (and not explicitly unsubscribed) will continue to fire for writes in subsequent tests that share this instance, leading to flaky/cross-talking tests.

🧹 Proposed fix
   reset(): void {
     this.buffers.clear();
     this.pipeWaits.clear();
+    this.writeListeners.clear();
   }
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@packages/core/src/v3/test/test-realtime-streams-manager.ts` around lines 158
- 161, The reset() method currently clears buffers and pipeWaits but leaves
writeListeners populated, causing listeners to leak between tests; update the
reset() implementation (the reset() method in the test realtime streams manager)
to also remove all registered write listeners (e.g., clear or reinitialize the
writeListeners collection) so any callbacks registered via writeListeners are
unsubscribed/cleared between tests to ensure per-test isolation.
packages/core/src/v3/apiClient/runStream.ts-480-491 (1)

480-491: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Abort during backoff is not wakeable

The retry delay promise only listens to retryNowController. If caller signal aborts during backoff, shutdown can be delayed until the timer expires.

Suggested fix
     this.retryNowController = new AbortController();
     await new Promise<void>((resolve) => {
+      if (this.options.signal?.aborted) {
+        resolve();
+        return;
+      }
       const timer = setTimeout(() => {
         this.retryNowController?.signal.removeEventListener("abort", onAbort);
+        this.options.signal?.removeEventListener("abort", onUserAbort);
         resolve();
       }, delay);
       const onAbort = () => {
         clearTimeout(timer);
+        this.options.signal?.removeEventListener("abort", onUserAbort);
+        resolve();
+      };
+      const onUserAbort = () => {
+        clearTimeout(timer);
+        this.retryNowController?.signal.removeEventListener("abort", onAbort);
         resolve();
       };
       this.retryNowController!.signal.addEventListener("abort", onAbort, { once: true });
+      this.options.signal?.addEventListener("abort", onUserAbort, { once: true });
     });
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@packages/core/src/v3/apiClient/runStream.ts` around lines 480 - 491, The
backoff promise only listens to this.retryNowController and ignores the caller's
abort signal, causing shutdown to wait for the timer; update the wait logic in
runStream.ts to also listen to the external/caller signal (e.g., this.signal or
the passed-in signal) alongside this.retryNowController by adding an event
listener on that signal which clears the timeout, resolves the promise, and
removes both listeners (mirror the existing onAbort cleanup for
retryNowController.signal and the timer) to ensure immediate wake-up on caller
abort while preventing listener leaks.
packages/core/src/v3/inputStreams/manager.ts-181-188 (1)

181-188: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

disconnectStream may be silently undone by auto-reconnect when handlers or waiters still exist.

The tail's .finally block auto-reconnects whenever this.handlers or this.onceWaiters still have entries for streamId. If any .on() handler or pending .once() waiter is active when disconnectStream runs, the abort will complete but the tail immediately reconnects—negating the contract: "disconnect before .wait() suspends so the tail doesn't buffer duplicates delivered through the waitpoint path."

Add an explicitlyDisconnected flag (as already implemented in sessionStreams/manager.ts) to prevent auto-reconnect after intentional disconnect. Mark the stream before abort, check the flag in the .finally reconnect branch, and clear it on the next on()/once() call.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@packages/core/src/v3/inputStreams/manager.ts` around lines 181 - 188, The
disconnectStream implementation currently aborts and removes the tail but can be
immediately undone by the tail's .finally auto-reconnect if handlers or
onceWaiters still reference streamId; add an explicitlyDisconnected flag (same
pattern used in sessionStreams/manager.ts) to the manager: set
explicitlyDisconnected[streamId] = true before calling
tail.abortController.abort() in disconnectStream, check explicitlyDisconnected
inside the tail's .finally reconnect branch to skip reconnect when true, and
ensure explicitlyDisconnected[streamId] is cleared when a new .on() or .once()
call for that streamId occurs so reconnects resume normally.
packages/core/src/v3/test/test-session-stream-manager.ts-215-247 (1)

215-247: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Handler invocation in __sendFromTest doesn't use the safe invoke helper — and the helper itself is dead.

__sendFromTest calls handlers directly via Array.from(handlers).map((h) => Promise.resolve().then(() => h(data))) (Line 225). A thrown/rejected handler will reject the Promise.all and propagate out of __sendFromTest, which is the opposite of the comment on Line 278 ("Never let a handler error break test state"). Meanwhile, the invoke private method defined on Lines 270-279 (which does exactly that catch) is never referenced anywhere in the file — dead code.

Either drop invoke, or route handler dispatch through it so a misbehaving handler doesn't blow up the test harness.

🛡️ Suggested fix
     const handlers = this.handlers.get(key);
     if (handlers && handlers.size > 0) {
-      await Promise.all(
-        Array.from(handlers).map((h) => Promise.resolve().then(() => h(data)))
-      );
+      await Promise.all(
+        Array.from(handlers).map(async (h) => {
+          try {
+            await h(data);
+          } catch {
+            // Never let a handler error break test state
+          }
+        })
+      );
     }
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@packages/core/src/v3/test/test-session-stream-manager.ts` around lines 215 -
247, The handler loop in __sendFromTest currently calls handlers directly with
Promise.all(Array.from(handlers).map((h) => Promise.resolve().then(() =>
h(data)))) so a thrown/rejected handler will reject the whole send; instead
reuse the existing private invoke helper (or restore its implementation) to
swallow or handle handler errors as intended. Replace the direct handler
invocation in __sendFromTest to call this.invoke(h, data) for each handler
(i.e., Promise.resolve().then(() => this.invoke(h, data)) or map to this.invoke)
so individual handler failures are caught and do not break test state; if invoke
is currently dead/incorrect, fix its implementation to catch errors (and
optionally log) and return a resolved promise. Ensure the change references the
handlers map and the invoke method so behavior is consistent with the "Never let
a handler error break test state" comment.
packages/core/src/v3/sessionStreams/manager.ts-121-145 (1)

121-145: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Abort listener and timeout handle leak on normal once() resolution.

When a once() waiter resolves via #dispatch (Line 359), the signal's abort listener is never removed. The listener is registered with { once: true }, so it self-clears if the signal eventually fires, but in the common case where the signal outlives this once() (e.g., a long-lived run-level AbortSignal reused across many once() calls), the listener, which captures the waiter and this, accumulates on the signal until it finally aborts. #dispatch does clear the timeout handle for the leading waiter (via clearTimeout on Line 358), but it never invokes signal.removeEventListener.

Compare with TestSessionStreamManager which keeps abortHandler on the waiter and removes it on dispatch (packages/core/src/v3/test/test-session-stream-manager.ts, Lines 234-236).

🛡️ Suggested fix
 type OnceWaiter = {
   resolve: (result: InputStreamOnceResult<unknown>) => void;
   reject: (error: Error) => void;
   timeoutHandle?: ReturnType<typeof setTimeout>;
+  signal?: AbortSignal;
+  abortHandler?: () => void;
 };
     return new InputStreamOncePromise<unknown>((resolve, reject) => {
       const waiter: OnceWaiter = { resolve, reject };

       if (options?.signal) {
         if (options.signal.aborted) {
           reject(new Error("Aborted"));
           return;
         }
-        options.signal.addEventListener(
-          "abort",
-          () => {
-            if (waiter.timeoutHandle) clearTimeout(waiter.timeoutHandle);
-            this.#removeOnceWaiter(key, waiter);
-            reject(new Error("Aborted"));
-          },
-          { once: true }
-        );
+        const abortHandler = () => {
+          if (waiter.timeoutHandle) clearTimeout(waiter.timeoutHandle);
+          this.#removeOnceWaiter(key, waiter);
+          reject(new Error("Aborted"));
+        };
+        waiter.signal = options.signal;
+        waiter.abortHandler = abortHandler;
+        options.signal.addEventListener("abort", abortHandler, { once: true });
       }

And in #dispatch (and the buffered-shift path) clear the listener when the waiter is resolved:

       const waiter = waiters.shift()!;
       if (waiters.length === 0) this.onceWaiters.delete(key);
       if (waiter.timeoutHandle) clearTimeout(waiter.timeoutHandle);
+      if (waiter.signal && waiter.abortHandler) {
+        waiter.signal.removeEventListener("abort", waiter.abortHandler);
+      }
       waiter.resolve({ ok: true, output: data });
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@packages/core/src/v3/sessionStreams/manager.ts` around lines 121 - 145, The
once() waiter installs an abort listener and timeout but does not remove the
abort listener when the waiter is resolved via `#dispatch`, causing
listener/closure leaks; update once() to store the abort handler on the waiter
(e.g., waiter.abortHandler) when adding options.signal.addEventListener, and in
`#dispatch` (and any buffered-shift resolution path) call
options.signal.removeEventListener("abort", waiter.abortHandler) and
clearTimeout(waiter.timeoutHandle) when resolving the waiter so both the
listener and timeout are cleaned up; reference the waiter object used in once()
and the `#dispatch` method to apply these removals.
apps/webapp/app/services/runsRepository/runsRepository.server.ts-45-46 (1)

45-46: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Constrain taskKinds to known values.

Line 45 currently accepts any string, so invalid values can silently flow into filtering and produce confusing empty results. Tightening this to known task kinds improves input correctness.

Suggested diff
 const RunStatus = z.enum(Object.values(TaskRunStatus) as [TaskRunStatus, ...TaskRunStatus[]]);
+const TaskKind = z.enum(["TASK", "AGENT", "SCHEDULED"]);

 const RunListInputOptionsSchema = z.object({
@@
-  taskKinds: z.array(z.string()).optional(),
+  taskKinds: z.array(TaskKind).optional(),
 });
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@apps/webapp/app/services/runsRepository/runsRepository.server.ts` around
lines 45 - 46, The taskKinds schema currently allows any string (taskKinds:
z.array(z.string()).optional()), which lets invalid values pass; change it to
constrain to the known task kinds (e.g., replace z.string() with z.enum([...])
or z.nativeEnum(TaskKind) referencing your central TaskKind enum) so only valid
task kind values are accepted; update imports to bring in the TaskKind enum or
explicitly list allowed strings and adjust any callers/types if needed.
apps/webapp/app/routes/resources.sessions.$sessionParam.close.ts-10-14 (1)

10-14: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Reuse the shared close-session schema here.

This local schema has already drifted from the public contract: it accepts arbitrarily long reason strings, while CloseSessionRequestBody caps them at 256 chars. Import the shared schema (or at least mirror its limit) so the dashboard and API reject the same payloads.

As per coding guidelines: apps/webapp/**/*.{ts,tsx} should use subpath exports from @trigger.dev/core package instead of importing from the root @trigger.dev/core path.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@apps/webapp/app/routes/resources.sessions.$sessionParam.close.ts` around
lines 10 - 14, The local closeSessionSchema has drifted from the shared
CloseSessionRequestBody contract (it allows unlimited reason length); update
closeSessionSchema to match the shared schema by either importing the shared
CloseSessionRequestBody schema from the `@trigger.dev/core` subpath export and
using it directly, or at minimum constrain reason to z.string().max(256). Ensure
the import uses a subpath export from `@trigger.dev/core` (e.g. import {
CloseSessionRequestBody } from '@trigger.dev/core/…') rather than importing from
the package root, and replace or remove the local schema definition accordingly.
apps/webapp/app/routes/realtime.v1.sessions.$session.$io.records.ts-18-23 (1)

18-23: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Reject oversized cursors before coercing them to Number.

afterEventId currently accepts any digit string via z.string().regex(/^\d+$/), but line 74 coerces it unsafely with Number(searchParams.afterEventId). Long inputs can become Infinity or lose precision, causing the endpoint to skip/duplicate records or pass an invalid cursor downstream. Tighten SearchSchema to validate finite safe integers before conversion.

Suggested fix
 const SearchSchema = z.object({
   // S2 sequence number — same cursor format as the SSE Last-Event-ID
   // (the SSE `id:` field on session-channel events is the seq_num,
   // stringified). Records returned have `seqNum > afterEventId`.
-  afterEventId: z.string().regex(/^\d+$/).optional(),
+  afterEventId: z
+    .string()
+    .regex(/^\d+$/)
+    .refine((value) => Number.isSafeInteger(Number(value)), {
+      message: "afterEventId must be a safe integer",
+    })
+    .optional(),
 });
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@apps/webapp/app/routes/realtime.v1.sessions.$session.$io.records.ts` around
lines 18 - 23, SearchSchema currently allows any digit string for afterEventId
but the code later does Number(searchParams.afterEventId) (see usage of Number
in the handler), which can produce Infinity or lose precision for oversized
values; update SearchSchema to reject values outside JavaScript safe integer
range (or use a z.preprocess that coerces the string to a number and validates
Number.isFinite() && Number.isSafeInteger() and >= 0) so only finite safe
non-negative integers are accepted before conversion, then replace the unsafe
Number(...) usage with the validated numeric value from the parsed schema (keep
references to SearchSchema and afterEventId and the Number(...) conversion to
locate the change).
apps/webapp/app/components/runs/v3/TaskRunsTable.tsx-357-361 (1)

357-361: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Remove the unsafe type cast for taskKind.

The taskKind field is typed as TaskKind, which allows arbitrary strings via .or(anyString) in the Zod schema, while TaskTriggerSource is limited to three specific enum values: STANDARD, SCHEDULED, and AGENT. The cast bypasses type safety and could pass invalid values to TaskTriggerSourceIcon. Either narrow the TaskKind schema to exclude arbitrary strings, or validate the value before casting.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@apps/webapp/app/components/runs/v3/TaskRunsTable.tsx` around lines 357 - 361,
The code unsafely casts run.taskKind to TaskTriggerSource before passing it to
TaskTriggerSourceIcon; instead validate or narrow the value: check that
run.taskKind is one of the TaskTriggerSource enum values (e.g., compare against
Object.values(TaskTriggerSource) or use a helper isTaskTriggerSource) and only
pass it when valid, otherwise pass a safe fallback (or undefined) or render a
default icon; alternatively tighten the TaskKind schema to only allow the
TaskTriggerSource union so no cast is needed. Ensure references:
TaskRunsTable.tsx, run.taskKind, TaskTriggerSource, TaskTriggerSourceIcon, and
TaskKind.
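A cast-free narrowing along the lines the comment suggests could look like this (the `isTaskTriggerSource` guard, `toTriggerSource` helper, and the `"STANDARD"` fallback are illustrative choices, not the repository's code):

```typescript
const TASK_TRIGGER_SOURCES = ["STANDARD", "SCHEDULED", "AGENT"] as const;
type TaskTriggerSource = (typeof TASK_TRIGGER_SOURCES)[number];

// Type guard: narrows an arbitrary taskKind string to the enum without
// an unsafe `as` cast.
function isTaskTriggerSource(value: string): value is TaskTriggerSource {
  return (TASK_TRIGGER_SOURCES as readonly string[]).includes(value);
}

// Validate before use, falling back to a safe default for unknown values
// instead of passing them through to the icon component.
function toTriggerSource(value: string): TaskTriggerSource {
  return isTaskTriggerSource(value) ? value : "STANDARD";
}
```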
apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.sessions.$sessionParam.realtime.v1.$io.ts-61-69 (1)

61-69: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Reject zero and partially-numeric Timeout-Seconds values.

Line 63 uses parseInt(), so values like "10foo" are accepted, and Line 66’s truthy guard lets "0" through even though this route documents a 1..600 range.

Suggested fix
-  const timeoutInSecondsRaw = request.headers.get("Timeout-Seconds") ?? undefined;
-  const timeoutInSeconds = timeoutInSecondsRaw ? parseInt(timeoutInSecondsRaw) : undefined;
+  const timeoutInSecondsRaw = request.headers.get("Timeout-Seconds") ?? undefined;
+  const timeoutInSeconds =
+    timeoutInSecondsRaw === undefined ? undefined : Number(timeoutInSecondsRaw);

-  if (
-    timeoutInSeconds &&
-    (isNaN(timeoutInSeconds) || timeoutInSeconds < 1 || timeoutInSeconds > 600)
-  ) {
+  if (
+    timeoutInSeconds !== undefined &&
+    (!Number.isInteger(timeoutInSeconds) || timeoutInSeconds < 1 || timeoutInSeconds > 600)
+  ) {
     return new Response("Invalid timeout", { status: 400 });
   }
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In
`@apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.sessions.$sessionParam.realtime.v1.$io.ts`
around lines 61 - 69, The timeout parsing currently uses parseInt on
timeoutInSecondsRaw (from the "Timeout-Seconds" header) which accepts
partially-numeric strings like "10foo" and treats "0" as falsy; change the
validation in the timeoutInSeconds calculation (and the subsequent if) to first
verify the header matches a pure integer regex (e.g. /^\d+$/) before parsing,
then parse to a number (Number or parseInt) and explicitly reject 0 and values
outside 1..600; update the checks around timeoutInSecondsRaw/timeoutInSeconds
and the error return to ensure partially-numeric and "0" values return 400.
apps/webapp/app/components/runs/v3/RunFilters.tsx-194-196 (1)

194-196: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Validate sources against the supported trigger-source set.

Right now sources accepts any non-empty string, so ?sources=foo survives parsing, shows up as an applied filter, and gets passed downstream even though this UI only supports STANDARD, SCHEDULED, and AGENT.

Suggested fix
+const RunSource = z.enum(["STANDARD", "SCHEDULED", "AGENT"]);
+
 export const TaskRunListSearchFilters = z.object({
   cursor: z.string().optional().describe("Cursor for pagination - used internally for navigation"),
   direction: z
     .enum(["forward", "backward"])
     .optional()
@@
-  sources: StringOrStringArray.describe(
+  sources: z
+    .preprocess((value) => {
+      if (typeof value === "string") {
+        return value.length > 0 ? [value] : undefined;
+      }
+
+      if (Array.isArray(value)) {
+        return value.filter((v) => typeof v === "string" && v.length > 0);
+      }
+
+      return undefined;
+    }, RunSource.array().optional())
+    .describe(
     "Task trigger sources to filter by (STANDARD, SCHEDULED, AGENT)"
-  ),
+  ),
 });

As per coding guidelines, {packages/core,apps/webapp}/**/*.{ts,tsx}: Use zod for validation in packages/core and apps/webapp.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@apps/webapp/app/components/runs/v3/RunFilters.tsx` around lines 194 - 196,
Replace the loose StringOrStringArray.describe for the "sources" field with a
zod-based schema that only allows the supported trigger sources: "STANDARD",
"SCHEDULED", "AGENT". Specifically, change the "sources" schema in
RunFilters.tsx to use zod.enum (or z.union of z.string and z.array(z.enum(...))
if you need single-or-array behavior) instead of StringOrStringArray.describe,
and ensure parsing/validation uses that zod schema so queries like ?sources=foo
are rejected/normalized before becoming an applied filter or passed downstream.
🧹 Nitpick comments (13)
packages/core/src/v3/test/test-realtime-streams-manager.ts (1)

22-22: 💤 Low value

pipeWaits is write-only dead state.

pipeWaits is populated in pipe() (lines 83-84) and cleared in reset() (line 160), but nothing ever reads from it. Either remove it, or expose the waitAll(key?) helper this state seems to anticipate (useful for tests that fan out multiple pipe() calls and want to await them collectively without holding each returned instance).

♻️ Option A — drop the unused field
-  private pipeWaits = new Map<string, Promise<void>[]>();
   private writeListeners = new Set<WriteListener>();
-    if (!this.pipeWaits.has(key)) this.pipeWaits.set(key, []);
-    this.pipeWaits.get(key)!.push(done);
-
     return {
   reset(): void {
     this.buffers.clear();
-    this.pipeWaits.clear();
   }

Option B — keep it and add a __waitAllFromTest(key?) helper that awaits the tracked done promises so tests can drain before assertions.

Also applies to: 83-84, 160-160

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@packages/core/src/v3/test/test-realtime-streams-manager.ts` at line 22, The
pipeWaits Map field is never read (populated in pipe() and cleared in reset())
so either remove pipeWaits entirely or implement a test-facing waiter so it’s
useful; to fix, choose one: (A) delete the private pipeWaits property and any
code that pushes into it inside pipe() and reset(), or (B) keep pipeWaits and
add a method like __waitAllFromTest(key?: string) / waitAll(key?: string) that
returns Promise<void> which awaits Promise.all(this.pipeWaits.get(key) || [])
and clears the stored array, and update reset() to use that helper to drain
entries; reference the pipeWaits field, the pipe() method where entries are
added, and the reset() method where they’re cleared when making the change.
packages/core/src/v3/test/test-input-stream-manager.ts (1)

117-121: ⚡ Quick win

Test manager's shiftBuffer/disconnectStream diverge from real manager semantics.

shiftBuffer always returns false and disconnectStream is a complete no-op, but the real StandardInputStreamManager shifts/clears the buffered head used by once(). Since pendingSends plays exactly that role in this test fixture (drained by once() at lines 62–68), tests that exercise .wait()-style flows (drop-the-duplicate / disconnect-before-suspend) will not observe correct behavior here.

Aligning the stubs would make tests faithfully reproduce production semantics.

♻️ Suggested alignment
-  shiftBuffer(_streamId: string): boolean {
-    return false;
-  }
-
-  disconnectStream(_streamId: string): void {}
+  shiftBuffer(streamId: string): boolean {
+    const buffered = this.pendingSends.get(streamId);
+    if (buffered && buffered.length > 0) {
+      buffered.shift();
+      if (buffered.length === 0) this.pendingSends.delete(streamId);
+      return true;
+    }
+    return false;
+  }
+
+  disconnectStream(streamId: string): void {
+    this.pendingSends.delete(streamId);
+  }
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@packages/core/src/v3/test/test-input-stream-manager.ts` around lines 117 -
121, The test fixture's shiftBuffer and disconnectStream do not mimic production
semantics: update shiftBuffer(streamId) to remove/shift the buffered head from
the pendingSends structure for that stream and return true when it actually
removed an entry (false otherwise), and update disconnectStream(streamId) to
clear any buffered entries in pendingSends for that stream (and/or call
shiftBuffer repeatedly) so that the once() consumer behavior that drains
pendingSends is reproduced; reference the shiftBuffer and disconnectStream
methods and the pendingSends collection and once() consumer when making the
changes.
packages/core/src/v3/sessionStreams/types.ts (1)

24-76: 💤 Low value

Prefer type alias over interface for the new SessionStreamManager declaration.

Per the repository's TypeScript guideline, types are preferred over interfaces. Since this is a brand-new declaration in a new file, it's a good opportunity to follow the guideline (the pre-existing InputStreamManager interface can be migrated separately).

♻️ Proposed change
-export interface SessionStreamManager {
-  /** Register a handler that fires every time data arrives on the given channel. */
-  on(
-    sessionId: string,
-    io: SessionChannelIO,
-    handler: (data: unknown) => void | Promise<void>
-  ): { off: () => void };
-  ...
-}
+export type SessionStreamManager = {
+  /** Register a handler that fires every time data arrives on the given channel. */
+  on(
+    sessionId: string,
+    io: SessionChannelIO,
+    handler: (data: unknown) => void | Promise<void>
+  ): { off: () => void };
+  ...
+};

As per coding guidelines: "Use types over interfaces for TypeScript".

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@packages/core/src/v3/sessionStreams/types.ts` around lines 24 - 76, The
declaration for SessionStreamManager should be converted from an interface to a
type alias per repository TypeScript guidelines: replace the "interface
SessionStreamManager { ... }" with "type SessionStreamManager = { ... }"
preserving all member signatures (on, once, peek, lastSeqNum, setLastSeqNum,
setMinTimestamp, shiftBuffer, disconnectStream, clearHandlers, reset,
disconnect) and exported name; no API or runtime behavior should change, just
the syntactic form (leave InputStreamManager untouched for separate migration).
packages/core/src/v3/sessionStreams/manager.ts (1)

137-145: 💤 Low value

timeoutMs: 0 silently disables the timeout.

The truthy check on Line 137 treats timeoutMs: 0 as "no timeout", but TestSessionStreamManager.once (test-session-stream-manager.ts Line 102) uses options?.timeoutMs !== undefined and would arm a 0ms timer that resolves immediately. The two implementations therefore disagree on 0. If 0 is not a meaningful "fire immediately" value here, leave a comment; otherwise switch to an explicit !== undefined check for parity.
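A minimal sketch of the disagreement, with illustrative function names (the real code arms `waiter.timeoutHandle` inline rather than calling helpers like these):

```typescript
// Truthy check: timeoutMs: 0 is falsy, so no timer is armed at all.
function armsTimerTruthy(timeoutMs?: number): boolean {
  return Boolean(timeoutMs);
}

// Explicit check: timeoutMs: 0 arms a timer that fires immediately.
function armsTimerExplicit(timeoutMs?: number): boolean {
  return timeoutMs !== undefined;
}

// The two styles only disagree on 0:
console.assert(armsTimerTruthy(0) === false);
console.assert(armsTimerExplicit(0) === true);
console.assert(armsTimerTruthy(5000) === armsTimerExplicit(5000));
console.assert(armsTimerTruthy(undefined) === armsTimerExplicit(undefined));
```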

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@packages/core/src/v3/sessionStreams/manager.ts` around lines 137 - 145, The
current truthy check in the once waiter (where waiter.timeoutHandle is set)
treats timeoutMs: 0 as unset and thus disables the timer; align behavior with
TestSessionStreamManager.once by checking explicitly for undefined (use
options.timeoutMs !== undefined) so a 0ms timeout is honored; update the
condition around waiter.timeoutHandle and ensure the timeout callback still uses
options.timeoutMs (and constructs the InputStreamTimeoutError(key,
options.timeoutMs!)) and leaves `#removeOnceWaiter` and resolve logic unchanged.
packages/core/src/v3/test/test-session-stream-manager.ts (2)

144-146: 💤 Low value

setLastSeqNum is unconditionally monotonic in production but not here.

StandardSessionStreamManager.setLastSeqNum only advances the stored seqNum when seqNum > current (packages/core/src/v3/sessionStreams/manager.ts Lines 168-171). The test manager overwrites unconditionally, which can mask exactly the class of regressions these tests exist to catch (e.g., resume-after-reconnect with out-of-order acks). Consider matching production semantics unless there's a specific reason to diverge.

♻️ Suggested fix
   setLastSeqNum(sessionId: string, io: SessionChannelIO, seqNum: number): void {
-    this.seqNums.set(keyFor(sessionId, io), seqNum);
+    const key = keyFor(sessionId, io);
+    const current = this.seqNums.get(key);
+    if (current === undefined || seqNum > current) {
+      this.seqNums.set(key, seqNum);
+    }
   }
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@packages/core/src/v3/test/test-session-stream-manager.ts` around lines 144 -
146, The test implementation of setLastSeqNum currently overwrites seqNums
unconditionally; change it to match production behavior in
StandardSessionStreamManager by only updating this.seqNums for keyFor(sessionId,
io) when the incoming seqNum is greater than the existing value (i.e., retrieve
current via this.seqNums.get(...), compare and only call this.seqNums.set(...)
if seqNum > current), preserving monotonic advancement semantics used for
resume-after-reconnect/out-of-order ack tests.

197-214: 💤 Low value

Docstring misstates production behavior.

The comment on Lines 207-213 says "Production discards records that only match handlers — but in production the SSE tail introduces enough latency that the next .once is usually registered before the next record arrives." Looking at StandardSessionStreamManager.#dispatch (packages/core/src/v3/sessionStreams/manager.ts Lines 364-378), production also buffers the record when there's no waiter (handlers fire and the record is appended to the buffer). The test manager's behavior actually mirrors production here; the only deviation is that production buffers from onPart, not from a test-driver entry point. Update the comment so future readers don't get a misleading mental model.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@packages/core/src/v3/test/test-session-stream-manager.ts` around lines 197 -
214, The docstring in test-session-stream-manager.ts incorrectly claims
production discards records that only match handlers; update the comment to
reflect that StandardSessionStreamManager.#dispatch also appends to the buffer
when no waiter is present (handlers fire and the record is buffered), and
clarify that the real difference is where buffering is triggered (production
buffers from onPart while the test manager buffers from the test-driver entry
point). Edit the block describing dispatch rules (the paragraph starting with
"If no waiter, the record is buffered...") to state this corrected behavior and
mention the actual deviation between production and tests (buffer origin),
referencing the TestSessionStreamManager and
StandardSessionStreamManager.#dispatch to guide readers.
packages/core/src/v3/sessionStreams/manager.test.ts (2)

101-134: 💤 Low value

Test name vs. assertion is slightly off; consider tightening.

The test claims it verifies that a filter on "in" doesn't bleed into "out", but the first manager only ever asserts on "out" and never proves the "in" filter on that same manager (Line 113 receives { kind: "in-record" }, which the singleShot mock delivers to whichever io subscribes first). The actual filter assertion uses a second manager (manager2) with separate state (Lines 120-127). The proof you want — same manager, same session, different io, only one is filtered — isn't actually exercised. Consider building one mock that routes records per io (e.g., emit different fixtures on different _io calls) so a single manager can demonstrate the per-io isolation end-to-end.
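One way to build that per-io mock, sketched with hypothetical record and io shapes (the real SSEStreamPart types and subscribe signature in the repo will differ):

```typescript
type IO = "in" | "out";
type StreamRecord = { kind: string; timestamp: number };

// Routes fixtures by the requested io, so a single manager subscribing to
// both channels receives distinct records and per-io filtering can be
// asserted end-to-end on the same (sessionId, io) state.
function makePerIoApiClient(fixtures: { [K in IO]: StreamRecord[] }) {
  return {
    subscribeToSessionStream: async function* (_sessionId: string, io: IO) {
      for (const record of fixtures[io]) yield record;
    },
  };
}

// "in" records predate a hypothetical setMinTimestamp(sessionId, "in", 5000)
// filter; "out" records should pass through untouched.
const client = makePerIoApiClient({
  in: [{ kind: "in-record", timestamp: 1000 }],
  out: [{ kind: "out-record", timestamp: 9000 }],
});
```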

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@packages/core/src/v3/sessionStreams/manager.test.ts` around lines 101 - 134,
The test currently proves filtering on "in" using a separate manager (manager2)
instead of demonstrating per-(sessionId, io) isolation on the same
StandardSessionStreamManager; update the test to use a single
StandardSessionStreamManager and a single mock API that emits different fixture
records depending on the requested _io so that the same manager subscribes to
both "in" and "out" and you can call setMinTimestamp(sessionId, "in", 5000) then
assert once(sessionId, "out") returns the expected record while once(sessionId,
"in") times out; modify or replace singleShotApiClient to inspect the _io
argument and return distinct records for "in" vs "out", then keep using
manager.disconnectStream/manager.disconnect to clean up.

13-44: 💤 Low value

Mock cast bypasses ApiClient type checking.

as unknown as ApiClient (Line 43) silently absorbs any breaking change to the ApiClient shape, and the inner cast on Line 41 papers over the async-generator vs AsyncIterableStream mismatch. If subscribeToSessionStream ever gains required surface (or its return type changes), these tests will keep compiling while the manager breaks at runtime against the real client. Consider typing the partial mock as Pick<ApiClient, "subscribeToSessionStream"> and avoiding the outer double-cast — the manager only depends on that one method.
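A sketch of the narrower typing; the ApiClient shape below is a simplified stand-in for illustration, not the real client:

```typescript
// Illustrative stand-in for the real ApiClient surface.
type ApiClient = {
  subscribeToSessionStream(sessionId: string, io: string): AsyncIterable<unknown>;
  triggerTask(id: string): Promise<void>;
  // ...many other methods the manager never calls
};

// Pick keeps the mock honest about the one method the manager depends on.
// If subscribeToSessionStream's signature ever drifts, this assignment
// stops compiling instead of failing at runtime behind a double cast.
const mock: Pick<ApiClient, "subscribeToSessionStream"> = {
  subscribeToSessionStream: async function* () {
    yield { kind: "fixture" };
  },
};
```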

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@packages/core/src/v3/sessionStreams/manager.test.ts` around lines 13 - 44,
The mock currently double-casts to ApiClient (singleShotApiClient) which hides
shape mismatches; change the mock's type to return Pick<ApiClient,
"subscribeToSessionStream"> instead of ApiClient and remove the outer `as
unknown as ApiClient` cast; ensure the subscribeToSessionStream method signature
and its returned async-iterable value match ApiClient's actual return type (use
ReturnType<ApiClient["subscribeToSessionStream"]> / Awaited<...> for the
function return) so you only need to satisfy the single method the manager uses
(subscribeToSessionStream) and avoid masking future API shape changes; keep
SSEStreamPart usage and the delivered logic intact.
packages/core/src/v3/realtimeStreams/streamsWriterV2.test.ts (1)

5-16: ⚡ Quick win

Assert lastEventId on the success path.

The new contract here is StreamWriteResult, but this test only proves that wait() resolves. If wait() regresses back to {} or undefined, this suite still passes. Seed lastAckedPosition() with a known seqNum and assert the returned { lastEventId }.

Also applies to: 132-148
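A hedged sketch of the tightened assertion; makeWriter, the seqNum-to-lastEventId mapping, and the StreamWriteResult shape here are assumptions for illustration, not the repo's real signatures:

```typescript
type StreamWriteResult = { lastEventId?: string };

// Minimal stand-in for the writer under test: wait() derives lastEventId
// from the injected lastAckedPosition, mirroring the contract the test
// should pin down.
function makeWriter(lastAckedPosition: () => number) {
  return {
    async wait(): Promise<StreamWriteResult> {
      return { lastEventId: String(lastAckedPosition()) };
    },
  };
}

(async () => {
  const writer = makeWriter(() => 42); // seed a known seqNum
  const result = await writer.wait();
  // Assert the contract, not just that wait() resolves:
  console.assert(result.lastEventId === "42");
})();
```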

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@packages/core/src/v3/realtimeStreams/streamsWriterV2.test.ts` around lines 5
- 16, The test currently stubs lastAckedPosition and appendSession but never
asserts the StreamWriteResult.lastEventId; update the mock for lastAckedPosition
(e.g., have lastAckedPosition return a known seqNum/object) and add an assertion
after calling wait() that the resolved value includes lastEventId equal to that
known seqNum; apply the same change to the second occurrence referenced (the
block around lines 132-148) so both success-path tests verify lastEventId from
lastAckedPosition via the appendSession mock and wait() result.
packages/core/src/v3/realtimeStreams/sessionStreamInstance.ts (2)

45-50: 💤 Low value

Best practice: Specify radix parameter for parseInt.

Lines 46 and 49 call parseInt without the radix parameter. While the header values should be base-10 strings, explicitly passing 10 as the radix prevents potential issues and follows best practices.

♻️ Add radix parameter
     const flushIntervalMs = headers["x-s2-flush-interval-ms"]
-      ? parseInt(headers["x-s2-flush-interval-ms"])
+      ? parseInt(headers["x-s2-flush-interval-ms"], 10)
       : undefined;
     const maxRetries = headers["x-s2-max-retries"]
-      ? parseInt(headers["x-s2-max-retries"])
+      ? parseInt(headers["x-s2-max-retries"], 10)
       : undefined;
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@packages/core/src/v3/realtimeStreams/sessionStreamInstance.ts` around lines
45 - 50, The parseInt calls used to parse headers["x-s2-flush-interval-ms"] and
headers["x-s2-max-retries"] (producing flushIntervalMs and maxRetries) should
specify a radix to avoid ambiguous parsing; update those parseInt invocations to
pass 10 as the second argument (e.g., parseInt(..., 10)) so the header strings
are parsed explicitly as base-10 integers.

29-38: 💤 Low value

Optional: Remove redundant optional chaining.

Line 37 uses this.options?.requestOptions, but options is a required constructor parameter and cannot be undefined. The ?. operator can be simplified to ..

♻️ Simplify optional chaining
     const response = await this.options.apiClient.initializeSessionStream(
       this.options.sessionId,
       this.options.io,
-      this.options?.requestOptions
+      this.options.requestOptions
     );
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@packages/core/src/v3/realtimeStreams/sessionStreamInstance.ts` around lines
29 - 38, The code uses redundant optional chaining for requestOptions in
initializeWriter; since options is required on the SessionStreamInstance
constructor (type SessionStreamInstanceOptions<T>), replace
this.options?.requestOptions with this.options.requestOptions inside the
initializeWriter method (the call to
this.options.apiClient.initializeSessionStream) and remove the unnecessary ?. to
simplify the expression.
apps/webapp/app/presenters/v3/SessionPresenter.server.ts (1)

99-109: 💤 Low value

Optional: Consider extracting currentRun lookup for clarity.

The ternary and nullish coalescing on lines 99-108 are correct but require a non-null assertion on line 105. While this is safe (the outer ternary ensures session.currentRunId is truthy), extracting this logic into a helper or using an if-statement could improve readability.

♻️ Alternative: Extract to helper or use if-statement
-    const currentRun = session.currentRunId
-      ? runsById.get(session.currentRunId) ??
-        (await startActiveSpan(
-          "SessionPresenter.findCurrentRunFallback",
-          () =>
-            this.replica.taskRun.findFirst({
-              where: { id: session.currentRunId! },
-              select: { id: true, friendlyId: true, status: true },
-            })
-        ))
-      : null;
+    let currentRun = null;
+    if (session.currentRunId) {
+      currentRun = runsById.get(session.currentRunId);
+      if (!currentRun) {
+        currentRun = await startActiveSpan(
+          "SessionPresenter.findCurrentRunFallback",
+          () =>
+            this.replica.taskRun.findFirst({
+              where: { id: session.currentRunId },
+              select: { id: true, friendlyId: true, status: true },
+            })
+        );
+      }
+    }
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@apps/webapp/app/presenters/v3/SessionPresenter.server.ts` around lines 99 -
109, The currentRun lookup uses a nested ternary/nullish coalescing with a
non-null assertion on session.currentRunId; extract this into a small helper or
an if-statement to improve readability and remove the need for the assertion.
Specifically, replace the inline expression that builds currentRun (which
references session.currentRunId, runsById,
startActiveSpan("SessionPresenter.findCurrentRunFallback", ...), and
this.replica.taskRun.findFirst) with a clear helper function (e.g.,
findCurrentRun) or an explicit if-block that: 1) checks if session.currentRunId
is set, 2) returns runsById.get(session.currentRunId) if present, and 3)
otherwise calls startActiveSpan(...)/this.replica.taskRun.findFirst to fetch and
return the run; update the currentRun assignment to call that helper or use the
if-block so no non-null assertion is required.
apps/webapp/app/v3/services/createBackgroundWorker.server.ts (1)

348-348: ⚡ Quick win

The as any cast bypasses type safety.

Casting task.agentConfig as any removes compile-time guarantees that the config is JSON-serializable. While this may be necessary for Prisma's Json type, consider using a Zod schema to validate the config structure before persistence, or at minimum add a comment explaining why the cast is safe.

🛡️ Safer alternative with Zod validation

As per coding guidelines, use Zod for validation in apps/webapp:

+import { z } from "zod";
+
+const AgentConfigSchema = z.record(z.unknown()).optional();
+
 async function createWorkerTask(
   task: TaskResource,
   ...
 ) {
   ...
+  const validatedConfig = task.agentConfig 
+    ? AgentConfigSchema.parse(task.agentConfig) 
+    : undefined;
+
   await prisma.backgroundWorkerTask.create({
     data: {
       ...
-      config: task.agentConfig ? (task.agentConfig as any) : undefined,
+      config: validatedConfig as any,
     },
   });
 }
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@apps/webapp/app/v3/services/createBackgroundWorker.server.ts` at line 348,
The cast "task.agentConfig as any" in the createBackgroundWorker flow removes
type safety and should be replaced by runtime validation: define a Zod schema
(e.g., AgentConfigSchema) that matches the expected JSON-serializable shape,
call AgentConfigSchema.parse or safeParse on task.agentConfig inside the
function that builds the Prisma payload (the spot assigning config), and use the
validated value (or undefined) instead of the as any cast; if validation fails,
handle the error (reject/create a clear log and abort persisting) or default to
undefined. If a Zod schema cannot be introduced now, add a clear comment
explaining why the cast is safe and what invariants guarantee
JSON-serializability, referencing the config assignment (config:
task.agentConfig ...) so future reviewers know why it's exempt.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

Run ID: 9a6d7e41-895b-4cd9-874d-48dd9f05eb37

📥 Commits

Reviewing files that changed from the base of the PR and between 6cdd881 and b84d537.

📒 Files selected for processing (88)
  • .changeset/sessions-primitive.md
  • .gitignore
  • CLAUDE.md
  • apps/webapp/app/components/BulkActionFilterSummary.tsx
  • apps/webapp/app/components/runs/v3/RunFilters.tsx
  • apps/webapp/app/components/runs/v3/TaskRunsTable.tsx
  • apps/webapp/app/components/runs/v3/TaskTriggerSource.tsx
  • apps/webapp/app/components/sessions/v1/CloseSessionDialog.tsx
  • apps/webapp/app/components/sessions/v1/SessionFilters.tsx
  • apps/webapp/app/components/sessions/v1/SessionStatus.tsx
  • apps/webapp/app/components/sessions/v1/SessionsTable.tsx
  • apps/webapp/app/presenters/RunFilters.server.ts
  • apps/webapp/app/presenters/SessionFilters.server.ts
  • apps/webapp/app/presenters/v3/ApiRunListPresenter.server.ts
  • apps/webapp/app/presenters/v3/NextRunListPresenter.server.ts
  • apps/webapp/app/presenters/v3/SessionListPresenter.server.ts
  • apps/webapp/app/presenters/v3/SessionPresenter.server.ts
  • apps/webapp/app/presenters/v3/TaskListPresenter.server.ts
  • apps/webapp/app/presenters/v3/TestPresenter.server.ts
  • apps/webapp/app/presenters/v3/TestTaskPresenter.server.ts
  • apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.sessions.$sessionParam/route.tsx
  • apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.sessions._index/route.tsx
  • apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.sessions/route.tsx
  • apps/webapp/app/routes/api.v1.deployments.current.ts
  • apps/webapp/app/routes/realtime.v1.sessions.$session.$io.records.ts
  • apps/webapp/app/routes/realtime.v1.streams.$runId.$streamId.ts
  • apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.sessions.$sessionParam.realtime.v1.$io.ts
  • apps/webapp/app/routes/resources.sessions.$sessionParam.close.ts
  • apps/webapp/app/routes/runs.$runParam.ts
  • apps/webapp/app/runEngine/concerns/queues.server.ts
  • apps/webapp/app/runEngine/services/triggerTask.server.ts
  • apps/webapp/app/runEngine/types.ts
  • apps/webapp/app/services/apiRateLimit.server.ts
  • apps/webapp/app/services/realtime/mintRunToken.server.ts
  • apps/webapp/app/services/realtime/s2realtimeStreams.server.ts
  • apps/webapp/app/services/realtime/sessionRunManager.server.ts
  • apps/webapp/app/services/runsReplicationService.server.ts
  • apps/webapp/app/services/runsRepository/clickhouseRunsRepository.server.ts
  • apps/webapp/app/services/runsRepository/runsRepository.server.ts
  • apps/webapp/app/services/sessionsRepository/clickhouseSessionsRepository.server.ts
  • apps/webapp/app/services/sessionsRepository/sessionsRepository.server.ts
  • apps/webapp/app/utils/pathBuilder.ts
  • apps/webapp/app/v3/services/createBackgroundWorker.server.ts
  • internal-packages/clickhouse/schema/029_add_task_kind_to_task_runs_v2.sql
  • internal-packages/clickhouse/src/taskRuns.test.ts
  • internal-packages/clickhouse/src/taskRuns.ts
  • internal-packages/database/prisma/migrations/20260329100903_add_agent_trigger_source_and_task_config/migration.sql
  • internal-packages/database/prisma/migrations/20260330113734_add_playground_conversation/migration.sql
  • internal-packages/database/prisma/migrations/20260330135232_add_messages_and_last_event_id_to_playground/migration.sql
  • internal-packages/database/prisma/schema.prisma
  • packages/core/src/v3/apiClient/errors.ts
  • packages/core/src/v3/apiClient/runStream.test.ts
  • packages/core/src/v3/apiClient/runStream.ts
  • packages/core/src/v3/inputStreams/index.ts
  • packages/core/src/v3/inputStreams/manager.ts
  • packages/core/src/v3/inputStreams/noopManager.ts
  • packages/core/src/v3/inputStreams/types.ts
  • packages/core/src/v3/realtime-streams-api.ts
  • packages/core/src/v3/realtimeStreams/index.ts
  • packages/core/src/v3/realtimeStreams/manager.ts
  • packages/core/src/v3/realtimeStreams/noopManager.ts
  • packages/core/src/v3/realtimeStreams/sessionStreamInstance.ts
  • packages/core/src/v3/realtimeStreams/streamInstance.ts
  • packages/core/src/v3/realtimeStreams/streamsWriterV1.ts
  • packages/core/src/v3/realtimeStreams/streamsWriterV2.test.ts
  • packages/core/src/v3/realtimeStreams/streamsWriterV2.ts
  • packages/core/src/v3/realtimeStreams/types.ts
  • packages/core/src/v3/schemas/api.ts
  • packages/core/src/v3/schemas/build.ts
  • packages/core/src/v3/schemas/resources.ts
  • packages/core/src/v3/schemas/runEngine.ts
  • packages/core/src/v3/schemas/schemas.ts
  • packages/core/src/v3/semanticInternalAttributes.ts
  • packages/core/src/v3/session-streams-api.ts
  • packages/core/src/v3/sessionStreams/index.ts
  • packages/core/src/v3/sessionStreams/manager.test.ts
  • packages/core/src/v3/sessionStreams/manager.ts
  • packages/core/src/v3/sessionStreams/noopManager.ts
  • packages/core/src/v3/sessionStreams/types.ts
  • packages/core/src/v3/test/test-input-stream-manager.ts
  • packages/core/src/v3/test/test-realtime-streams-manager.ts
  • packages/core/src/v3/test/test-run-metadata-manager.ts
  • packages/core/src/v3/test/test-session-stream-manager.ts
  • packages/core/src/v3/types/tasks.ts
  • packages/core/src/v3/utils/globals.ts
  • packages/core/src/v3/workers/index.ts
  • packages/core/src/v3/workers/taskExecutor.ts
  • packages/core/test/runStream.test.ts

Comment on lines +1 to +3
import { CheckCircleIcon, ClockIcon } from "@heroicons/react/20/solid";
import assertNever from "assert-never";
import { type SessionStatus } from "~/services/sessionsRepository/sessionsRepository.server";

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Verify the server-only import and locate a shared non-server SessionStatus definition.
rg -n 'sessionsRepository\.server' apps/webapp/app/components/sessions/v1/SessionStatus.tsx
rg -n '\b(SessionStatus|allSessionStatuses)\b' apps/webapp/app packages/core -g '!**/*.server.ts'

Repository: triggerdotdev/trigger.dev

Length of output: 3838


Replace the .server module import with the type from @trigger.dev/core.

SessionStatus.tsx is a client-side component but imports SessionStatus from a .server module, which violates Remix's server/client boundary and will fail bundling. Import the type from packages/core/src/v3/schemas/api.ts instead, which provides the same SessionStatus type definition.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@apps/webapp/app/components/sessions/v1/SessionStatus.tsx` around lines 1 - 3,
SessionStatus.tsx currently imports the SessionStatus type from a ".server"
module which breaks the client/server boundary; change the import to pull the
type from the core package (import type { SessionStatus } from
"@trigger.dev/core") so the component remains client-safe, and use a type-only
import to avoid bundling server code while keeping existing symbols like
SessionStatus, CheckCircleIcon, and ClockIcon unchanged.

Comment on lines +163 to +166
? this.replica.taskRun.findMany({
where: { id: { in: currentRunIds } },
select: { id: true, friendlyId: true },
})

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Scope the current-run lookup to the same project/environment.

Session.currentRunId is a plain string column with no FK, so resolving runs by id alone can surface another tenant’s run if that pointer is ever stale or corrupted. This follow-up lookup should be constrained to the same projectId and environmentId.

Suggested fix
         return currentRunIds.length > 0
           ? this.replica.taskRun.findMany({
-              where: { id: { in: currentRunIds } },
+              where: {
+                id: { in: currentRunIds },
+                projectId,
+                runtimeEnvironmentId: environmentId,
+              },
               select: { id: true, friendlyId: true },
             })
           : [];
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@apps/webapp/app/presenters/v3/SessionListPresenter.server.ts` around lines
163 - 166, The current lookup using this.replica.taskRun.findMany({ where: { id:
{ in: currentRunIds } }, ... }) is unsafe because Session.currentRunId has no FK
and can return runs from other projects/environments; update the where clause to
also constrain by projectId and environmentId (e.g. include projectId: { in:
currentProjectIds } and environmentId: { in: currentEnvironmentIds } or match
the session's projectId/environmentId context) so the findMany call only returns
taskRun rows belonging to the same project/environment as the sessions; adjust
how currentRunIds are grouped/mapped if needed to ensure correct scoping for
this.replica.taskRun.findMany and references to Session.currentRunId.

Comment on lines +40 to +42
const environment = await findEnvironmentBySlug(project.id, envParam, userId);
if (!environment) {
throw new Error("Environment not found");

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Return a user-facing 404 here instead of throwing a generic error.

Line 42 turns a bad or unauthorized environment slug into an unhandled route error, while the adjacent project-not-found path already uses user-facing handling.

Suggested fix
   const environment = await findEnvironmentBySlug(project.id, envParam, userId);
   if (!environment) {
-    throw new Error("Environment not found");
+    throw new Response("Environment not found", { status: 404 });
   }
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In
`@apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.sessions._index/route.tsx`
around lines 40 - 42, Replace the generic throw in the environment lookup with a
user-facing 404 response: when findEnvironmentBySlug(project.id, envParam,
userId) returns falsy, throw or return a Response with status 404 (e.g., throw
new Response("Environment not found", { status: 404 })) so the route renders a
proper Not Found page instead of bubbling a generic Error; update the branch
that currently throws new Error("Environment not found") to use this 404
Response (referencing findEnvironmentBySlug and the environment check in the
route handler).

Comment on lines +72 to +75
const environment = await findEnvironmentBySlug(project.id, envParam, userId);
if (!environment) {
throw new Error("Environment not found");
}

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Handle a missing environment as a 404, not a generic error.

Line 74 makes an invalid or unauthorized environment slug fall into the route error boundary/500 path instead of returning a user-facing not-found response.

Suggested fix
```diff
   const environment = await findEnvironmentBySlug(project.id, envParam, userId);
   if (!environment) {
-    throw new Error("Environment not found");
+    throw new Response("Environment not found", { status: 404 });
   }
```
🤖 Prompt for AI Agents
In `apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.sessions.$sessionParam/route.tsx` around lines 72-75, the code currently throws a generic Error when findEnvironmentBySlug returns null; change this to return a 404 response so missing/unauthorized env slugs hit the not-found UI instead of the error boundary. Replace the throw new Error("Environment not found") in the block after await findEnvironmentBySlug(project.id, envParam, userId) with a 404 response (e.g., throw new Response("Not Found", { status: 404 }) or use your framework's notFound() helper) so the route returns a proper 404 for a missing environment.

Comment on lines +20 to +33
```ts
select: {
  deployment: {
    select: {
      friendlyId: true,
      createdAt: true,
      shortCode: true,
      version: true,
      runtime: true,
      runtimeVersion: true,
      status: true,
      deployedAt: true,
      git: true,
      errorData: true,
    },
```

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Return updatedAt to match the shared response schema.

RetrieveCurrentDeploymentResponseBody now reuses ApiDeploymentListResponseItem, which requires updatedAt, but this loader neither selects nor returns it. Clients validating against the shared schema will fail on this endpoint until the field is included.

Suggested fix
```diff
         select: {
           deployment: {
             select: {
               friendlyId: true,
               createdAt: true,
+              updatedAt: true,
               shortCode: true,
               version: true,
               runtime: true,
               runtimeVersion: true,
               status: true,
@@
     return json({
       id: deployment.friendlyId,
       createdAt: deployment.createdAt,
+      updatedAt: deployment.updatedAt,
       shortCode: deployment.shortCode,
       version: deployment.version,
       runtime: deployment.runtime,
```

Also applies to: 42-53

🤖 Prompt for AI Agents
In `apps/webapp/app/routes/api.v1.deployments.current.ts` around lines 20-33, the loader is missing the updatedAt field required by ApiDeploymentListResponseItem / RetrieveCurrentDeploymentResponseBody; update the deployment select in this route so the nested deployment.select includes updatedAt (add updatedAt: true wherever deployment is being selected in this file) so the returned object matches the shared response schema.

Comment on lines +276 to 278
```ts
cancel() {
  self.options.onComplete?.();
},
```

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Stream cancel does not terminate in-flight connection/retry work

On Line 276, cancel() only calls onComplete; it does not abort the active fetch/read or pending backoff. This can leave background reconnect logic running after consumer cancellation.

Suggested fix
```diff
 export class SSEStreamSubscription implements StreamSubscription {
+  private cancelled = false;
   private lastEventId: string | undefined;
   private retryCount = 0;
@@
   async subscribe(): Promise<ReadableStream<SSEStreamPart>> {
     const self = this;

     return new ReadableStream({
       async start(controller) {
         await self.connectStream(controller);
       },
       cancel() {
+        self.cancelled = true;
+        self.internalAbort?.abort();
+        self.retryNowController?.abort();
         self.options.onComplete?.();
       },
     });
   }
@@
-      if (this.options.signal?.aborted) {
+      if (this.options.signal?.aborted || this.cancelled) {
         controller.close();
         this.options.onComplete?.();
         return;
       }
@@
-    if (this.options.signal?.aborted) {
+    if (this.options.signal?.aborted || this.cancelled) {
       controller.close();
       this.options.onComplete?.();
       return;
     }
```
🤖 Prompt for AI Agents
In `packages/core/src/v3/apiClient/runStream.ts` around lines 276-278, the cancel() implementation currently only calls self.options.onComplete and doesn't stop the active fetch/read or any pending backoff/reconnects; update cancel() to: signal an AbortController used by the active fetch/read loop (e.g., a stored this.abortController) so the in-flight fetch/read is aborted, clear or cancel any pending retry/backoff timers (e.g., this.backoffTimer) and set a cancelled flag (e.g., this.isCancelled) to prevent further reconnect attempts, then call self.options.onComplete?.(); ensure any code that starts fetch/retry checks the cancelled flag and uses the same AbortController so cancellation fully terminates background work.
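A minimal, dependency-free sketch of that wiring (class and field names hypothetical, not the actual SSEStreamSubscription): cancel() trips a flag and an AbortController so the connect/retry loop stops instead of reconnecting in the background.

```typescript
class CancellableSubscription {
  cancelled = false;
  readonly abortController = new AbortController();
  attempts = 0;

  subscribe(): ReadableStream<string> {
    const self = this;
    return new ReadableStream<string>({
      async start(controller) {
        // Stand-in for the connect/retry loop; a real implementation would
        // fetch here with self.abortController.signal and back off between tries.
        while (!self.cancelled && self.attempts < 3) {
          self.attempts++;
          controller.enqueue(`connect-${self.attempts}`);
          await new Promise((resolve) => setTimeout(resolve, 1));
        }
        if (!self.cancelled) controller.close();
      },
      cancel() {
        self.cancelled = true;          // prevents further reconnect attempts
        self.abortController.abort();   // aborts any in-flight fetch/read
      },
    });
  }
}
```

Because the loop re-checks `cancelled` after every await and the fetch would share `abortController.signal`, a consumer-side `reader.cancel()` fully terminates background work instead of only firing `onComplete`.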

Comment on lines +18 to +32
```ts
vi.mock("@s2-dev/streamstore", async (importOriginal) => {
  const actual = await importOriginal<typeof import("@s2-dev/streamstore")>();
  return {
    ...actual,
    S2: class FakeS2 {
      basin() {
        return {
          stream: () => ({
            appendSession,
          }),
        };
      }
    },
  };
});
```

🛠️ Refactor suggestion | 🟠 Major | ⚡ Quick win

Avoid vi.mock() here; test this path without module mocking.

This mock sidesteps the exact appendSession() / lastAckedPosition() behavior that StreamsWriterV2.wait() now depends on, and it also conflicts with the repo’s test rules. A small extraction of the JSON-envelope/size-check logic into a pure helper would let you cover the oversize branch without mocking @s2-dev/streamstore.

As per coding guidelines, "Use vitest exclusively for testing. Never mock anything - use testcontainers instead."

🤖 Prompt for AI Agents
In `packages/core/src/v3/realtimeStreams/streamsWriterV2.test.ts` around lines 18-32, remove the vi.mock block and test this path without mocking; instead extract the JSON-envelope/size-check logic from StreamsWriterV2.wait() into a pure helper (e.g., computeEnvelopeSize or isEnvelopeOversize) and update StreamsWriterV2.wait() to call that helper so you can unit-test the oversize branch directly in streamsWriterV2.test.ts. For tests that require real appendSession/lastAckedPosition behavior, replace the mocked module usage with an integration test using testcontainers to start the real `@s2-dev/streamstore` service and assert StreamsWriterV2.wait() behavior against real appendSession/lastAckedPosition semantics. Ensure references to appendSession, lastAckedPosition, and StreamsWriterV2.wait are updated to call the new helper and to use testcontainers-based integration tests instead of vi.mock.
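The suggested extraction could look like this; the helper name, envelope shape, and size limit are hypothetical, since the real envelope lives inside StreamsWriterV2:

```typescript
// Assumed per-record size limit, for illustration only.
const MAX_RECORD_BYTES = 1024 * 1024;

// Pure helper: serialize the same JSON envelope the writer would append and
// check its byte size, so the oversize branch is testable without mocking
// @s2-dev/streamstore.
function isEnvelopeOversize(chunk: unknown, limit: number = MAX_RECORD_BYTES): boolean {
  const envelope = JSON.stringify({ data: chunk });
  return new TextEncoder().encode(envelope).length > limit;
}
```

A pure function like this needs no module mocks: the oversize branch becomes a one-line vitest assertion, and the real append semantics stay covered by integration tests.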

Comment on lines +1498 to +1503
```ts
/** Per-run wall-clock cap (seconds). Forwarded to `TaskRunOptions.maxDuration`. */
maxDuration: z.number().int().positive().optional(),
/** Pin every run to a specific worker version. Forwarded to `TaskRunOptions.lockToVersion`. */
lockToVersion: z.string().optional(),
/** Region to schedule runs in. Forwarded to `TaskRunOptions.region`. */
region: z.string().optional(),
```

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Match session maxDuration to the existing 5-second floor.

TriggerOptions.maxDuration is documented in this file as requiring at least 5 seconds, but SessionTriggerConfig.maxDuration currently accepts 1..4. That lets callers persist a session config that only fails later when the session tries to trigger a run.

Suggested fix
```diff
-  maxDuration: z.number().int().positive().optional(),
+  maxDuration: z.number().int().min(5).optional(),
```

As per coding guidelines: {packages/core,apps/webapp}/**/*.{ts,tsx} should use Zod for validation in packages/core and apps/webapp.

🤖 Prompt for AI Agents
In `packages/core/src/v3/schemas/api.ts` around lines 1498-1503, SessionTriggerConfig.maxDuration allows values 1–4 but must match TriggerOptions.maxDuration's 5-second minimum; update the Zod schema for SessionTriggerConfig.maxDuration to enforce an integer minimum of 5 seconds (e.g., use .int().min(5) or .int().gte(5) before .optional()), ensuring the same validation as TriggerOptions.maxDuration so invalid configs are rejected at persist time.
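Expressed without zod, so the snippet is dependency-free (function and constant names hypothetical), the floor being asked for is:

```typescript
const MIN_MAX_DURATION_SECONDS = 5; // documented TriggerOptions.maxDuration floor

// Returns an error message, or null when the value is acceptable.
function validateMaxDuration(value: number | undefined): string | null {
  if (value === undefined) return null; // field is optional
  if (!Number.isInteger(value)) return "maxDuration must be an integer";
  if (value < MIN_MAX_DURATION_SECONDS) {
    return `maxDuration must be at least ${MIN_MAX_DURATION_SECONDS} seconds`;
  }
  return null;
}
```

With zod the whole change is the one token in the suggested fix: `.positive()` becomes `.min(5)`, so a session config that would only fail at trigger time is rejected at persist time instead.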

Comment on lines +287 to +351
```ts
async #runTail(
  sessionId: string,
  io: SessionChannelIO,
  signal: AbortSignal
): Promise<void> {
  const key = keyFor(sessionId, io);
  try {
    const lastSeq = this.seqNums.get(key);
    // Dispatch is driven from `onPart` (not the for-await loop) so each
    // record reaches dispatch with its full SSE metadata in scope —
    // specifically the timestamp, which we need for the per-stream
    // min-timestamp filter. The for-await loop below just drains the
    // pipeThrough output to keep the source flowing.
    const stream = await this.apiClient.subscribeToSessionStream<unknown>(sessionId, io, {
      signal,
      baseUrl: this.baseUrl,
      timeoutInSeconds: 600,
      lastEventId: lastSeq !== undefined ? String(lastSeq) : undefined,
      onPart: (part) => {
        if (signal.aborted) return;
        const seqNum = parseInt(part.id, 10);
        if (Number.isFinite(seqNum)) {
          this.seqNums.set(key, seqNum);
        }

        // Min-timestamp filter: drop records older than (or at) the
        // bound. Used to skip already-processed records on OOM-retry
        // boot.
        const minTs = this.minTimestamps.get(key);
        if (minTs !== undefined && part.timestamp <= minTs) {
          return;
        }

        let data: unknown = part.chunk;
        if (typeof data === "string") {
          try {
            data = JSON.parse(data);
          } catch {
            // keep as string
          }
        }
        this.#dispatch(key, data);
      },
      onComplete: () => {
        if (this.debug) {
          console.log(`[SessionStreamManager] Tail completed for "${key}"`);
        }
      },
      onError: (error) => {
        if (this.debug) {
          console.error(`[SessionStreamManager] Tail error for "${key}":`, error);
        }
      },
    });

    // Drain to keep the pipeThrough flowing. Records were already
    // dispatched in `onPart`, so the body here is a no-op.
    for await (const _record of stream) {
      if (signal.aborted) break;
    }
  } catch (error) {
    if (error instanceof Error && error.name === "AbortError") return;
    throw error;
  }
}
```

⚠️ Potential issue | 🟠 Major | 🏗️ Heavy lift

Add backoff and error handling to prevent reconnect loops on persistent failures.

In #runTail (lines 335-339), onError only logs (when debug is on) and never aborts or rethrows. If subscribeToSessionStream fails with a non-abort error, the for-await loop exits normally, #runTail returns, and the .finally reconnect path at lines 251-260 immediately re-attaches whenever hasHandlers || hasWaiters. With a persistent failure (auth rejected, 5xx, DNS, etc.) and an on() handler installed, this creates an unbounded tight reconnect loop with no backoff.

The same issue exists in StandardInputStreamManager—both managers lack exponential backoff on reconnect failures. Consider adding minimum backoff before reconnect, capping retry attempts, or surfacing terminal errors through persistent handlers / pending once() waiters so callers can react.

🤖 Prompt for AI Agents
In `packages/core/src/v3/sessionStreams/manager.ts` around lines 287-351, the tail loop in `#runTail` uses subscribeToSessionStream with an onError that only logs, causing immediate reconnection loops when persistent errors occur; update onError to surface terminal failures by aborting the provided AbortSignal or throwing a distinct error so the outer caller can detect a failure, and implement exponential backoff with a capped delay and retry limit in the reconnect logic that currently decides to re-attach when hasHandlers || hasWaiters (the same pattern should be applied to StandardInputStreamManager). Concretely: modify the onError handler passed into subscribeToSessionStream to call signal.abort() or forward a non-Abort terminal Error to cause `#runTail` to exit with an error, then in the surrounding reconnect/finally path add a backoff loop (backoff base, multiplier, maxDelay, and maxAttempts) before re-invoking `#runTail`; ensure seqNums/minTimestamps/#dispatch behavior is unchanged and that aborts still return cleanly for AbortError.
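The capped exponential backoff being asked for can be sketched as follows (constants, names, and wrapper shape are all hypothetical, not the manager's actual code):

```typescript
// Tunables for the reconnect loop; values here are illustrative.
const BACKOFF = { baseMs: 250, factor: 2, maxDelayMs: 10_000, maxAttempts: 6 };

function backoffDelayMs(attempt: number): number {
  return Math.min(BACKOFF.baseMs * BACKOFF.factor ** attempt, BACKOFF.maxDelayMs);
}

// Wraps a tail function (e.g. #runTail) so failures reconnect with growing
// delays instead of a tight loop, and give up after maxAttempts.
async function withReconnect(run: () => Promise<void>, signal: AbortSignal): Promise<void> {
  for (let attempt = 0; ; attempt++) {
    try {
      await run();
      return; // clean completion: no reconnect needed
    } catch (error) {
      if (signal.aborted || attempt + 1 >= BACKOFF.maxAttempts) throw error;
      await new Promise((resolve) => setTimeout(resolve, backoffDelayMs(attempt)));
    }
  }
}
```

Resetting `attempt` after a sufficiently long healthy connection is a common refinement, so a flaky-but-recovering stream is not penalized forever.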

Comment on lines +650 to +672
````ts
/**
 * Trigger a task and subscribe to its updates via realtime. Unlike `triggerAndWait`,
 * this does NOT suspend the parent run — the parent stays alive and polls for updates.
 * This enables parallel tool calls and proper abort signal handling.
 *
 * @param payload
 * @param options - Options for the task run, including an optional `signal` to cancel the subscription and child run
 * @returns TaskRunPromise
 * @example
 * ```
 * const result = await task.triggerAndSubscribe({ foo: "bar" }, { signal: abortSignal });
 *
 * if (result.ok) {
 *   console.log(result.output);
 * } else {
 *   console.error(result.error);
 * }
 * ```
 */
triggerAndSubscribe: (
  payload: TInput,
  options?: TriggerAndSubscribeOptions,
) => TaskRunPromise<TIdentifier, TOutput>;
````

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Add a changeset for this SDK surface change.

triggerAndSubscribe()/TriggerAndSubscribeOptions extend the public packages/core API, so this PR needs a changeset before merge or consumers won't get a versioned release note for the new surface.

Based on learnings: When modifying any public package (packages/* or integrations/*), add a changeset using pnpm run changeset:add.

🤖 Prompt for AI Agents
In `packages/core/src/v3/types/tasks.ts` around lines 650-672, this PR adds a new public API surface (triggerAndSubscribe and TriggerAndSubscribeOptions) in packages/core, so add a changeset describing the addition: run `pnpm run changeset:add`, select the packages/core package, choose a version bump (likely patch), and write a short summary noting the new Task.triggerAndSubscribe and TriggerAndSubscribeOptions surface so consumers receive a release note; commit the generated changeset file with the PR.
