diff --git a/apps/docs/content/docs/en/api-reference/(generated)/workflows/meta.json b/apps/docs/content/docs/en/api-reference/(generated)/workflows/meta.json index cf845b451b9..d75b3184cd4 100644 --- a/apps/docs/content/docs/en/api-reference/(generated)/workflows/meta.json +++ b/apps/docs/content/docs/en/api-reference/(generated)/workflows/meta.json @@ -1,3 +1,10 @@ { - "pages": ["executeWorkflow", "cancelExecution", "listWorkflows", "getWorkflow", "getJobStatus"] + "pages": [ + "executeWorkflow", + "getWorkflowExecution", + "cancelExecution", + "listWorkflows", + "getWorkflow", + "getJobStatus" + ] } diff --git a/apps/docs/openapi.json b/apps/docs/openapi.json index 04e135f4aaf..0eb9df7846c 100644 --- a/apps/docs/openapi.json +++ b/apps/docs/openapi.json @@ -169,6 +169,158 @@ } } }, + "/api/workflows/{id}/executions/{executionId}": { + "get": { + "operationId": "getWorkflowExecution", + "summary": "Get Execution Status", + "description": "Get the current status of a workflow execution. Returns the run's lifecycle state (`running`, `paused`, `completed`, `failed`, etc.), timing, error, and optionally per-block outputs. Designed for polling \u2014 works for any execution, including ones that pause and resume.", + "tags": ["Workflows"], + "x-codeSamples": [ + { + "id": "curl", + "label": "cURL", + "lang": "bash", + "source": "curl \\\n \"https://www.sim.ai/api/workflows/{id}/executions/{executionId}\" \\\n -H \"X-API-Key: YOUR_API_KEY\"" + }, + { + "id": "curl-with-outputs", + "label": "cURL (with block outputs)", + "lang": "bash", + "source": "curl \\\n \"https://www.sim.ai/api/workflows/{id}/executions/{executionId}?selectedOutputs=blockId,blockId.field&includeOutput=true\" \\\n -H \"X-API-Key: YOUR_API_KEY\"" + } + ], + "parameters": [ + { + "name": "id", + "in": "path", + "required": true, + "description": "The unique identifier of the workflow.", + "schema": { + "type": "string", + "example": "wf_1a2b3c4d5e" + } + }, + { + "name": "executionId", + "in": "path", + "required": true, + "description": "The unique identifier of the execution.", + "schema": { + "type": "string", + "example": "exec_9f8e7d6c5b" + } + }, + { + "name": "includeOutput", + "in": "query", + "required": false, + "description": "When `true` and the execution has `status: completed`, include the workflow's final output in the response.", + "schema": { + "type": "string", + "enum": ["true", "false"] + } + }, + { + "name": "selectedOutputs", + "in": "query", + "required": false, + "description": "Comma-separated block-output selectors. A bare `blockId` returns that block's full output; a dot-path like `blockId.field` or `blockId.nested.path` returns just that value. Results are returned in the `blockOutputs` map keyed by the selector string.", + "schema": { + "type": "string", + "example": "c1b90bce-8a82-42a5-b6a5-5762846c2eaf,c1b90bce-8a82-42a5-b6a5-5762846c2eaf.waitDuration" + } + } + ], + "responses": { + "200": { + "description": "Execution status returned.", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/WorkflowExecutionStatus" + }, + "examples": { + "completed": { + "summary": "Completed run", + "value": { + "executionId": "9254f1c9-5a11-4a12-91e3-8065293f3609", + "workflowId": "81f661e1-d704-4861-b5c1-5bb3cf57e6a7", + "status": "completed", + "trigger": "api", + "level": "info", + "startedAt": "2026-05-15T19:43:12.189Z", + "endedAt": "2026-05-15T19:45:45.224Z", + "totalDurationMs": 153035, + "paused": null, + "cost": { + "total": 0.005 + }, + "error": null, + "finalOutput": null, + "blockOutputs": null + } + }, + "paused": { + "summary": "Currently paused run", + "value": { + "executionId": "772749f6-ee81-414c-a2c3-671549dd62b8", + "workflowId": "81f661e1-d704-4861-b5c1-5bb3cf57e6a7", + "status": "paused", + "trigger": "manual", + "level": "info", + "startedAt": "2026-05-15T22:25:57.178Z", + "endedAt": "2026-05-15T22:25:57.215Z", + "totalDurationMs": 1, + "paused": { + "pausedAt": "2026-05-15T22:25:57.216Z", + "resumeAt": "2026-05-16T18:25:57.200Z", + "pauseKind": "time", + "blockedOnBlockId": "c1b90bce-8a82-42a5-b6a5-5762846c2eaf", + "pausedExecutionId": "438bf05b-bd3c-4011-b78e-b19c112eeb66", + "pausePointCount": 1, + "resumedCount": 0 + }, + "cost": { + "total": 0.005 + }, + "error": null, + "finalOutput": null, + "blockOutputs": null + } + }, + "failed": { + "summary": "Failed run", + "value": { + "executionId": "3ccfdeed-a63c-4e86-98e2-8bec723bca52", + "workflowId": "81f661e1-d704-4861-b5c1-5bb3cf57e6a7", + "status": "failed", + "trigger": "api", + "level": "error", + "startedAt": "2026-05-15T22:24:50.991Z", + "endedAt": "2026-05-15T22:24:50.999Z", + "totalDurationMs": 2, + "paused": null, + "cost": { + "total": 0.005 + }, + "error": "Wait 1: Wait time exceeds maximum of 5 minutes; enable async mode to wait up to 30 days", + "finalOutput": null, + "blockOutputs": null + } + } + } + } + } + }, + "401": { + "$ref": "#/components/responses/Unauthorized" + }, + "404": { + "$ref": "#/components/responses/NotFound" + } + } + } + }, "/api/workflows/{id}/executions/{executionId}/cancel": { "post": { "operationId": "cancelExecution", @@ -5831,6 +5983,141 @@ } } }, + "WorkflowExecutionStatus": { + "type": "object", + "description": "Current status of a workflow execution.", + "properties": { + "executionId": { + "type": "string", + "description": "The unique identifier of the execution.", + "example": "9254f1c9-5a11-4a12-91e3-8065293f3609" + }, + "workflowId": { + "type": "string", + "description": "The unique identifier of the workflow.", + "example": "81f661e1-d704-4861-b5c1-5bb3cf57e6a7" + }, + "status": { + "type": "string", + "enum": ["pending", "running", "paused", "completed", "failed", "cancelled"], + "description": "Current normalized lifecycle status. `paused` is set when a row exists in pausedExecutions with status `paused` or `partially_resumed`; otherwise the workflowExecutionLogs row's status field is used.", + "example": "completed" + }, + "trigger": { + "type": "string", + "enum": ["api", "manual", "schedule", "webhook", "chat"], + "description": "What triggered the execution.", + "example": "api" + }, + "level": { + "type": "string", + "enum": ["info", "warning", "error"], + "description": "Log level of the execution.", + "example": "info" + }, + "startedAt": { + "type": "string", + "format": "date-time", + "description": "ISO 8601 timestamp when execution started.", + "example": "2026-05-15T19:43:12.189Z" + }, + "endedAt": { + "type": "string", + "format": "date-time", + "nullable": true, + "description": "ISO 8601 timestamp when execution ended. Null while the run is in flight.", + "example": "2026-05-15T19:45:45.224Z" + }, + "totalDurationMs": { + "type": "integer", + "nullable": true, + "description": "Total duration of the execution in milliseconds. Null while the run is in flight.", + "example": 153035 + }, + "paused": { + "type": "object", + "nullable": true, + "description": "Pause-state details. Present only when status is `paused`.", + "properties": { + "pausedAt": { + "type": "string", + "format": "date-time", + "description": "ISO 8601 timestamp when the workflow was paused.", + "example": "2026-05-15T22:25:57.216Z" + }, + "resumeAt": { + "type": "string", + "format": "date-time", + "nullable": true, + "description": "Earliest scheduled resume time across active pause points. Null for human-only pauses.", + "example": "2026-05-16T18:25:57.200Z" + }, + "pauseKind": { + "type": "string", + "enum": ["time", "human"], + "nullable": true, + "description": "What kind of pause the workflow is waiting on.", + "example": "time" + }, + "blockedOnBlockId": { + "type": "string", + "nullable": true, + "description": "The block currently blocking resume.", + "example": "c1b90bce-8a82-42a5-b6a5-5762846c2eaf" + }, + "pausedExecutionId": { + "type": "string", + "description": "ID of the paused-execution row, useful for cross-referencing with the human-in-the-loop endpoints.", + "example": "438bf05b-bd3c-4011-b78e-b19c112eeb66" + }, + "pausePointCount": { + "type": "integer", + "description": "Total number of pause points recorded for this execution.", + "example": 1 + }, + "resumedCount": { + "type": "integer", + "description": "Number of pause points already resumed.", + "example": 0 + } + } + }, + "cost": { + "type": "object", + "nullable": true, + "description": "Cost summary. Detailed token / model breakdown lives on the /v1/logs detail endpoint.", + "properties": { + "total": { + "type": "number", + "description": "Total cost in USD.", + "example": 0.005 + } + } + }, + "error": { + "type": "string", + "nullable": true, + "description": "Error message. Present only when status is `failed`.", + "example": null + }, + "finalOutput": { + "type": "object", + "nullable": true, + "description": "The workflow's final output. Returned only when ?includeOutput=true AND status is `completed`.", + "example": null + }, + "blockOutputs": { + "type": "object", + "nullable": true, + "description": "Per-block outputs keyed by the selector string. Returned only when `?selectedOutputs` is set.", + "additionalProperties": true, + "example": { + "c1b90bce-8a82-42a5-b6a5-5762846c2eaf.waitDuration": 60000, + "c1b90bce-8a82-42a5-b6a5-5762846c2eaf.status": "completed" + } + } + } + }, "AuditLogEntry": { "type": "object", "description": "An enterprise audit log entry recording an action taken in the workspace.", diff --git a/apps/sim/app/api/resume/poll/route.ts b/apps/sim/app/api/resume/poll/route.ts index 12949569575..ad07ea009b5 100644 --- a/apps/sim/app/api/resume/poll/route.ts +++ b/apps/sim/app/api/resume/poll/route.ts @@ -3,7 +3,7 @@ import { pausedExecutions } from '@sim/db/schema' import { createLogger } from '@sim/logger' import { toError } from '@sim/utils/errors' import { generateShortId } from '@sim/utils/id' -import { and, asc, eq, isNotNull, lte } from 'drizzle-orm' +import { and, asc, inArray, isNotNull, lte } from 'drizzle-orm' import { type NextRequest, NextResponse } from 'next/server' import { verifyCronAuth } from '@/lib/auth/internal' import { acquireLock, releaseLock } from '@/lib/core/config/redis' @@ -62,7 +62,10 @@ export const GET = withRouteHandler(async (request: NextRequest) => { .from(pausedExecutions) .where( and( - eq(pausedExecutions.status, 'paused'), + // 'partially_resumed' rows occur when a chained-pause workflow advanced past + // an earlier wait — e.g. wait1 → agent → wait2 — and now wait2's time pause + // is the one waiting for the cron. Include it alongside fresh 'paused' rows. + inArray(pausedExecutions.status, ['paused', 'partially_resumed']), isNotNull(pausedExecutions.nextResumeAt), lte(pausedExecutions.nextResumeAt, now) ) diff --git a/apps/sim/app/api/workflows/[id]/executions/[executionId]/route.ts b/apps/sim/app/api/workflows/[id]/executions/[executionId]/route.ts new file mode 100644 index 00000000000..6efff82a7cc --- /dev/null +++ b/apps/sim/app/api/workflows/[id]/executions/[executionId]/route.ts @@ -0,0 +1,223 @@ +import { db } from '@sim/db' +import { pausedExecutions, workflowExecutionLogs } from '@sim/db/schema' +import { createLogger } from '@sim/logger' +import { and, eq } from 'drizzle-orm' +import { type NextRequest, NextResponse } from 'next/server' +import { + getWorkflowExecutionContract, + type WorkflowExecutionStatusResponse, +} from '@/lib/api/contracts/workflows' +import { parseRequest } from '@/lib/api/server' +import { withRouteHandler } from '@/lib/core/utils/with-route-handler' +import { validateWorkflowAccess } from '@/app/api/workflows/middleware' +import type { PausePoint } from '@/executor/types' + +const logger = createLogger('WorkflowExecutionStatusAPI') + +type LogStatus = 'pending' | 'running' | 'completed' | 'failed' | 'cancelled' + +interface TraceSpanShape { + blockId?: string + output?: Record + children?: TraceSpanShape[] +} + +interface ExecutionDataShape { + finalOutput?: { error?: string } & Record + error?: { message?: string } | string + completionFailure?: string + traceSpans?: TraceSpanShape[] +} + +function collectBlockOutputs(spans: TraceSpanShape[] | undefined): Map { + const map = new Map() + const visit = (list?: TraceSpanShape[]): void => { + if (!list) return + for (const span of list) { + if (span.blockId && span.output !== undefined && !map.has(span.blockId)) { + map.set(span.blockId, span.output) + } + if (span.children) visit(span.children) + } + } + visit(spans) + return map +} + +function resolvePath(value: unknown, path: string[]): unknown { + let current: unknown = value + for (const segment of path) { + if (current == null || typeof current !== 'object') return undefined + current = (current as Record)[segment] + } + return current +} + +function pickSelectedOutputs( + selectedOutputs: string[], + blockOutputs: Map +): Record { + const out: Record = {} + for (const selector of selectedOutputs) { + const [head, ...rest] = selector.split('.') + if (!head) continue + if (!blockOutputs.has(head)) continue + const blockValue = blockOutputs.get(head) + out[selector] = rest.length === 0 ? blockValue : resolvePath(blockValue, rest) + } + return out +} + +function pickEarliestPausePoint(points: PausePoint[]): PausePoint | null { + const active = points.filter((p) => p.resumeStatus === 'paused') + if (active.length === 0) return null + return active.reduce((best, current) => { + if (!best) return current + if (!current.resumeAt) return best + if (!best.resumeAt) return current + return current.resumeAt < best.resumeAt ? current : best + }, null) +} + +function normalizePausePoints(raw: unknown): PausePoint[] { + if (!raw) return [] + if (Array.isArray(raw)) return raw as PausePoint[] + if (typeof raw === 'object') return Object.values(raw as Record) + return [] +} + +function extractError(executionData: unknown): string | null { + if (!executionData || typeof executionData !== 'object') return null + const data = executionData as ExecutionDataShape + if (typeof data.error === 'string') return data.error + if (data.error && typeof data.error === 'object' && typeof data.error.message === 'string') { + return data.error.message + } + if (typeof data.finalOutput?.error === 'string') return data.finalOutput.error + if (typeof data.completionFailure === 'string') return data.completionFailure + return null +} + +export const GET = withRouteHandler( + async ( + request: NextRequest, + context: { params: Promise<{ id: string; executionId: string }> } + ) => { + const parsed = await parseRequest(getWorkflowExecutionContract, request, context) + if (!parsed.success) return parsed.response + const { id: workflowId, executionId } = parsed.data.params + const { includeOutput, selectedOutputs } = parsed.data.query + + const access = await validateWorkflowAccess(request, workflowId, false) + if (access.error) { + return NextResponse.json({ error: access.error.message }, { status: access.error.status }) + } + + const [logRow] = await db + .select({ + executionId: workflowExecutionLogs.executionId, + workflowId: workflowExecutionLogs.workflowId, + status: workflowExecutionLogs.status, + level: workflowExecutionLogs.level, + trigger: workflowExecutionLogs.trigger, + startedAt: workflowExecutionLogs.startedAt, + endedAt: workflowExecutionLogs.endedAt, + totalDurationMs: workflowExecutionLogs.totalDurationMs, + executionData: workflowExecutionLogs.executionData, + cost: workflowExecutionLogs.cost, + }) + .from(workflowExecutionLogs) + .where( + and( + eq(workflowExecutionLogs.executionId, executionId), + eq(workflowExecutionLogs.workflowId, workflowId) + ) + ) + .limit(1) + + if (!logRow) { + return NextResponse.json({ error: 'Execution not found' }, { status: 404 }) + } + + const [pausedRow] = await db + .select({ + id: pausedExecutions.id, + status: pausedExecutions.status, + pausePoints: pausedExecutions.pausePoints, + resumedCount: pausedExecutions.resumedCount, + pausedAt: pausedExecutions.pausedAt, + nextResumeAt: pausedExecutions.nextResumeAt, + }) + .from(pausedExecutions) + .where(eq(pausedExecutions.executionId, executionId)) + .limit(1) + + const isCurrentlyPaused = + !!pausedRow && (pausedRow.status === 'paused' || pausedRow.status === 'partially_resumed') + + let status: WorkflowExecutionStatusResponse['status'] + if (isCurrentlyPaused) { + status = 'paused' + } else { + status = logRow.status as LogStatus + } + + let paused: WorkflowExecutionStatusResponse['paused'] = null + if (isCurrentlyPaused && pausedRow) { + const points = normalizePausePoints(pausedRow.pausePoints) + const earliest = pickEarliestPausePoint(points) + paused = { + pausedAt: pausedRow.pausedAt.toISOString(), + resumeAt: pausedRow.nextResumeAt?.toISOString() ?? earliest?.resumeAt ?? null, + pauseKind: earliest?.pauseKind ?? null, + blockedOnBlockId: earliest?.blockId ?? null, + pausedExecutionId: pausedRow.id, + pausePointCount: points.length, + resumedCount: pausedRow.resumedCount, + } + } + + const cost = logRow.cost + ? { total: Number((logRow.cost as { total?: number }).total ?? 0) } + : null + + const error = status === 'failed' ? extractError(logRow.executionData) : null + + const executionData = logRow.executionData as ExecutionDataShape | undefined + + const finalOutput = + includeOutput && status === 'completed' && executionData + ? (executionData.finalOutput ?? null) + : null + + const blockOutputs = + selectedOutputs.length > 0 + ? pickSelectedOutputs(selectedOutputs, collectBlockOutputs(executionData?.traceSpans)) + : null + + const response: WorkflowExecutionStatusResponse = { + executionId: logRow.executionId, + workflowId: logRow.workflowId ?? workflowId, + status, + trigger: logRow.trigger, + level: logRow.level, + startedAt: logRow.startedAt.toISOString(), + endedAt: logRow.endedAt?.toISOString() ?? null, + totalDurationMs: logRow.totalDurationMs ?? null, + paused, + cost, + error, + finalOutput, + blockOutputs, + } + + logger.debug('Fetched execution status', { + workflowId, + executionId, + status, + paused: !!paused, + }) + + return NextResponse.json(response) + } +) diff --git a/apps/sim/blocks/blocks/wait.ts b/apps/sim/blocks/blocks/wait.ts index 3f043df09f7..25f882025be 100644 --- a/apps/sim/blocks/blocks/wait.ts +++ b/apps/sim/blocks/blocks/wait.ts @@ -8,14 +8,13 @@ const WaitIcon = (props: SVGProps) => createElement(PauseCircle, export const WaitBlock: BlockConfig = { type: 'wait', name: 'Wait', - description: 'Pause workflow execution for up to 30 days', + description: 'Pause workflow execution for a time interval', longDescription: - 'Pauses workflow execution for a specified time interval. Waits up to five minutes are held in-process; longer waits suspend the workflow and resume automatically once the configured duration elapses.', + 'Pauses workflow execution for a specified time interval. By default the wait runs in-process for up to 5 minutes. Enable Async to pause the run on disk and resume automatically for waits up to 30 days.', bestPractices: ` - - Configure the wait amount and unit (seconds, minutes, hours, or days) - - Maximum wait duration is 30 days - - Waits up to 5 minutes execute in-process and are interruptible via workflow cancellation - - Longer waits suspend the workflow; the execution resumes automatically when the timer fires + - Configure the wait amount and unit + - Default mode runs in-process and caps at 5 minutes + - Enable Async for longer waits (up to 30 days); seconds are not available in this mode - Enter a positive number for the wait amount `, category: 'blocks', @@ -27,7 +26,7 @@ export const WaitBlock: BlockConfig = { id: 'timeValue', title: 'Wait Amount', type: 'short-input', - description: 'Max: 30 days', + description: 'Max 5 minutes (300 seconds). Enable Async for up to 30 days.', placeholder: '10', value: () => '10', required: true, @@ -39,24 +38,49 @@ export const WaitBlock: BlockConfig = { options: [ { label: 'Seconds', id: 'seconds' }, { label: 'Minutes', id: 'minutes' }, + ], + value: () => 'seconds', + required: true, + condition: { field: 'async', value: true, not: true }, + }, + { + id: 'timeUnitLong', + title: 'Unit', + type: 'dropdown', + options: [ + { label: 'Minutes', id: 'minutes' }, { label: 'Hours', id: 'hours' }, { label: 'Days', id: 'days' }, ], - value: () => 'seconds', + value: () => 'minutes', required: true, + condition: { field: 'async', value: true }, + }, + { + id: 'async', + title: 'Async', + type: 'switch', }, ], tools: { access: [], }, inputs: { + async: { + type: 'boolean', + description: 'Run the wait asynchronously to allow durations up to 30 days', + }, timeValue: { type: 'string', description: 'Wait duration value', }, timeUnit: { type: 'string', - description: 'Wait duration unit (seconds, minutes, hours, or days)', + description: 'Wait duration unit when async is off (seconds or minutes)', + }, + timeUnitLong: { + type: 'string', + description: 'Wait duration unit when async is on (minutes, hours, or days)', }, }, outputs: { diff --git a/apps/sim/executor/handlers/wait/wait-handler.test.ts b/apps/sim/executor/handlers/wait/wait-handler.test.ts index 9e1aa288d3e..2f5957299d2 100644 --- a/apps/sim/executor/handlers/wait/wait-handler.test.ts +++ b/apps/sim/executor/handlers/wait/wait-handler.test.ts @@ -120,12 +120,45 @@ describe('WaitBlockHandler', () => { ).rejects.toThrow('Unknown wait unit: fortnights') }) - it('should reject waits longer than the 30-day ceiling', async () => { + it('should reject async waits longer than the 30-day ceiling', async () => { await expect( - handler.execute(mockContext, mockBlock, { timeValue: '31', timeUnit: 'days' }) + handler.execute(mockContext, mockBlock, { + async: true, + timeValue: '31', + timeUnitLong: 'days', + }) ).rejects.toThrow('Wait time exceeds maximum of 30 days') }) + it('should reject synchronous waits longer than 5 minutes', async () => { + await expect( + handler.execute(mockContext, mockBlock, { timeValue: '10', timeUnit: 'minutes' }) + ).rejects.toThrow('Wait time exceeds maximum of 5 minutes') + }) + + it('should default the async unit to minutes when timeUnitLong is missing', async () => { + vi.setSystemTime(new Date('2026-04-28T00:00:00.000Z')) + + const result = (await handler.execute(mockContext, mockBlock, { + async: true, + timeValue: '3', + })) as Record + + const waitMs = 3 * 60 * 1000 + expect(result.waitDuration).toBe(waitMs) + expect(result.status).toBe('waiting') + }) + + it('should reject seconds as a unit in async mode', async () => { + await expect( + handler.execute(mockContext, mockBlock, { + async: true, + timeValue: '30', + timeUnitLong: 'seconds', + }) + ).rejects.toThrow('Seconds are not allowed in async mode') + }) + it('should still execute in-process at the 5-minute boundary', async () => { const inputs = { timeValue: '5', timeUnit: 'minutes' } @@ -144,7 +177,7 @@ describe('WaitBlockHandler', () => { it('should suspend the workflow when wait exceeds the in-process threshold', async () => { vi.setSystemTime(new Date('2026-04-28T00:00:00.000Z')) - const inputs = { timeValue: '10', timeUnit: 'minutes' } + const inputs = { async: true, timeValue: '10', timeUnitLong: 'minutes' } const result = (await handler.execute(mockContext, mockBlock, inputs)) as Record @@ -167,7 +200,7 @@ describe('WaitBlockHandler', () => { it('should suspend the workflow for multi-day waits', async () => { vi.setSystemTime(new Date('2026-04-28T00:00:00.000Z')) - const inputs = { timeValue: '2', timeUnit: 'days' } + const inputs = { async: true, timeValue: '2', timeUnitLong: 'days' } const result = (await handler.execute(mockContext, mockBlock, inputs)) as Record @@ -185,8 +218,9 @@ describe('WaitBlockHandler', () => { vi.setSystemTime(new Date('2026-04-28T00:00:00.000Z')) const result = (await handler.execute(mockContext, mockBlock, { + async: true, timeValue: '3', - timeUnit: 'hours', + timeUnitLong: 'hours', })) as Record const waitMs = 3 * 60 * 60 * 1000 @@ -237,8 +271,9 @@ describe('WaitBlockHandler', () => { mockContext.abortSignal = abortController.signal const result = (await handler.execute(mockContext, mockBlock, { + async: true, timeValue: '1', - timeUnit: 'hours', + timeUnitLong: 'hours', })) as Record expect(result.status).toBe('waiting') @@ -264,8 +299,9 @@ describe('WaitBlockHandler', () => { vi.setSystemTime(new Date('2026-04-28T00:00:00.000Z')) const result = (await handler.execute(mockContext, mockBlock, { + async: true, timeValue: '1.5', - timeUnit: 'days', + timeUnitLong: 'days', })) as Record const waitMs = 1.5 * 24 * 60 * 60 * 1000 @@ -273,4 +309,23 @@ describe('WaitBlockHandler', () => { expect(result.status).toBe('waiting') expect(result._pauseMetadata.pauseKind).toBe('time') }) + + it('should always suspend when async is enabled, even for short waits', async () => { + vi.setSystemTime(new Date('2026-04-28T00:00:00.000Z')) + + const result = (await handler.execute(mockContext, mockBlock, { + async: true, + timeValue: '2', + timeUnitLong: 'minutes', + })) as Record + + const waitMs = 2 * 60 * 1000 + const expectedResumeAt = new Date(Date.now() + waitMs).toISOString() + + expect(result.status).toBe('waiting') + expect(result.waitDuration).toBe(waitMs) + expect(result.resumeAt).toBe(expectedResumeAt) + expect(result._pauseMetadata.pauseKind).toBe('time') + expect(result._pauseMetadata.resumeAt).toBe(expectedResumeAt) + }) }) diff --git a/apps/sim/executor/handlers/wait/wait-handler.ts b/apps/sim/executor/handlers/wait/wait-handler.ts index 43fbc4a2f31..1e490869d9d 100644 --- a/apps/sim/executor/handlers/wait/wait-handler.ts +++ b/apps/sim/executor/handlers/wait/wait-handler.ts @@ -10,11 +10,11 @@ import type { SerializedBlock } from '@/serializer/types' const CANCELLATION_CHECK_INTERVAL_MS = 500 -/** Threshold below which we hold the wait in-process; above, we suspend via PauseMetadata. */ -const INPROCESS_MAX_MS = 5 * 60 * 1000 +/** Hard ceiling for in-process (synchronous) waits. */ +const MAX_INPROCESS_WAIT_MS = 5 * 60 * 1000 -/** Hard ceiling on configurable wait duration. */ -const MAX_WAIT_MS = 30 * 24 * 60 * 60 * 1000 +/** Hard ceiling for async waits. */ +const MAX_ASYNC_WAIT_MS = 30 * 24 * 60 * 60 * 1000 interface SleepOptions { signal?: AbortSignal @@ -91,10 +91,10 @@ function isWaitUnit(value: string): value is WaitUnit { /** * Handler for Wait blocks that pause workflow execution for a time delay. * - * Waits up to {@link INPROCESS_MAX_MS} are held in-process via an interruptible sleep. - * Longer waits suspend the workflow by returning {@link PauseMetadata} with - * `pauseKind: 'time'`; the cron-driven resume poller (see `/api/resume/poll`) picks - * the execution back up once `resumeAt` is reached. + * Default (async=false) waits are held in-process via an interruptible sleep and capped at 5 minutes. + * When async=true is set, the workflow is always suspended by returning {@link PauseMetadata} with + * `pauseKind: 'time'`; the cron-driven resume poller (see `/api/resume/poll`) picks the execution back + * up once `resumeAt` is reached. Async caps at 30 days. */ export class WaitBlockHandler implements BlockHandler { canHandle(block: SerializedBlock): boolean { @@ -124,8 +124,9 @@ export class WaitBlockHandler implements BlockHandler { executionOrder?: number } ): Promise { + const isAsync = inputs.async === true || inputs.async === 'true' const timeValue = Number.parseFloat(inputs.timeValue || '10') - const timeUnit = inputs.timeUnit || 'seconds' + const timeUnit = isAsync ? inputs.timeUnitLong || 'minutes' : inputs.timeUnit || 'seconds' if (!Number.isFinite(timeValue) || timeValue <= 0) { throw new Error('Wait amount must be a positive number') @@ -134,13 +135,24 @@ export class WaitBlockHandler implements BlockHandler { if (!isWaitUnit(timeUnit)) { throw new Error(`Unknown wait unit: ${timeUnit}`) } + + if (isAsync && timeUnit === 'seconds') { + throw new Error('Seconds are not allowed in async mode') + } + const waitMs = Math.round(timeValue * UNIT_TO_MS[timeUnit]) - if (waitMs > MAX_WAIT_MS) { - throw new Error('Wait time exceeds maximum of 30 days') + if (isAsync) { + if (waitMs > MAX_ASYNC_WAIT_MS) { + throw new Error('Wait time exceeds maximum of 30 days') + } + } else if (waitMs > MAX_INPROCESS_WAIT_MS) { + throw new Error( + 'Wait time exceeds maximum of 5 minutes; enable async mode to wait up to 30 days' + ) } - if (waitMs <= INPROCESS_MAX_MS) { + if (!isAsync) { const completed = await sleep(waitMs, { signal: ctx.abortSignal, executionId: ctx.executionId, diff --git a/apps/sim/lib/api/contracts/workflows.ts b/apps/sim/lib/api/contracts/workflows.ts index 9ee11a84bfe..f623bcdaf55 100644 --- a/apps/sim/lib/api/contracts/workflows.ts +++ b/apps/sim/lib/api/contracts/workflows.ts @@ -513,6 +513,61 @@ const pausedWorkflowExecutionDetailSchema = pausedWorkflowExecutionSummarySchema queue: z.array(z.record(z.string(), z.unknown())), }) +const workflowExecutionStatusEnum = z.enum([ + 'pending', + 'running', + 'paused', + 'completed', + 'failed', + 'cancelled', +]) + +const workflowExecutionPausedDetailSchema = z.object({ + pausedAt: z.string(), + resumeAt: z.string().nullable(), + pauseKind: z.enum(['time', 'human']).nullable(), + blockedOnBlockId: z.string().nullable(), + pausedExecutionId: z.string(), + pausePointCount: z.number(), + resumedCount: z.number(), +}) + +const workflowExecutionStatusResponseSchema = z.object({ + executionId: z.string(), + workflowId: z.string(), + status: workflowExecutionStatusEnum, + trigger: z.string(), + level: z.string(), + startedAt: z.string(), + endedAt: z.string().nullable(), + totalDurationMs: z.number().nullable(), + paused: workflowExecutionPausedDetailSchema.nullable(), + cost: z.object({ total: z.number() }).nullable(), + error: z.string().nullable(), + finalOutput: z.unknown().nullable(), + blockOutputs: z.record(z.string(), z.unknown()).nullable(), +}) + +export type WorkflowExecutionStatusResponse = z.output + +const workflowExecutionStatusQuerySchema = z.object({ + includeOutput: z + .enum(['true', 'false']) + .optional() + .transform((value) => value === 'true'), + selectedOutputs: z + .string() + .optional() + .transform((value) => + value + ? value + .split(',') + .map((s) => s.trim()) + .filter(Boolean) + : [] + ), +}) + const cancelWorkflowExecutionResponseSchema = z.object({ success: z.boolean(), executionId: z.string(), @@ -797,6 +852,17 @@ export const cancelWorkflowExecutionContract = defineRouteContract({ }, }) +export const getWorkflowExecutionContract = defineRouteContract({ + method: 'GET', + path: '/api/workflows/[id]/executions/[executionId]', + params: workflowExecutionParamsSchema, + query: workflowExecutionStatusQuerySchema, + response: { + mode: 'json', + schema: workflowExecutionStatusResponseSchema, + }, +}) + export const streamWorkflowExecutionContract = defineRouteContract({ method: 'GET', path: '/api/workflows/[id]/executions/[executionId]/stream', diff --git a/apps/sim/lib/workflows/executor/human-in-the-loop-manager.ts b/apps/sim/lib/workflows/executor/human-in-the-loop-manager.ts index 96054e5108e..717a9f0fcc9 100644 --- a/apps/sim/lib/workflows/executor/human-in-the-loop-manager.ts +++ b/apps/sim/lib/workflows/executor/human-in-the-loop-manager.ts @@ -1770,9 +1770,11 @@ export class PauseResumeManager { } /** - * Updates `next_resume_at` only when the row is still `status='paused'`. + * Updates `next_resume_at` only when the row is still in a poll-eligible state. * Guard prevents the cron poller from clobbering a freshly-written value when a - * concurrent manual resume has already advanced the row's state. + * concurrent manual resume has already advanced the row's state. `partially_resumed` + * rows must also be writable so the cron poller can null out their `nextResumeAt` + * after dispatch; otherwise the row keeps reappearing in every poll batch. */ static async setNextResumeAt(args: { pausedExecutionId: string @@ -1782,7 +1784,10 @@ export class PauseResumeManager { .update(pausedExecutions) .set({ nextResumeAt: args.nextResumeAt }) .where( - and(eq(pausedExecutions.id, args.pausedExecutionId), eq(pausedExecutions.status, 'paused')) + and( + eq(pausedExecutions.id, args.pausedExecutionId), + inArray(pausedExecutions.status, ['paused', 'partially_resumed']) + ) ) } diff --git a/scripts/check-api-validation-contracts.ts b/scripts/check-api-validation-contracts.ts index a888f7cbaae..a2b994937e8 100644 --- a/scripts/check-api-validation-contracts.ts +++ b/scripts/check-api-validation-contracts.ts @@ -9,8 +9,8 @@ const QUERY_HOOKS_DIR = path.join(ROOT, 'apps/sim/hooks/queries') const SELECTOR_HOOKS_DIR = path.join(ROOT, 'apps/sim/hooks/selectors') const BASELINE = { - totalRoutes: 744, - zodRoutes: 744, + totalRoutes: 745, + zodRoutes: 745, nonZodRoutes: 0, } as const