diff --git a/.changeset/calm-queens-brush.md b/.changeset/calm-queens-brush.md new file mode 100644 index 0000000000..0618e4c459 --- /dev/null +++ b/.changeset/calm-queens-brush.md @@ -0,0 +1,5 @@ +--- +"@workflow/core": patch +--- + +test(core): cover cancel run spec version diff --git a/packages/core/src/runtime/runs.test.ts b/packages/core/src/runtime/runs.test.ts index 2c983e21c7..2928b72175 100644 --- a/packages/core/src/runtime/runs.test.ts +++ b/packages/core/src/runtime/runs.test.ts @@ -19,7 +19,7 @@ import { hydrateStepReturnValue, } from '../serialization.js'; import { Run } from './run.js'; -import { wakeUpRun } from './runs.js'; +import { cancelRun, wakeUpRun } from './runs.js'; import { setWorld } from './world.js'; function createMockWorld( @@ -108,6 +108,41 @@ describe('wakeUpRun', () => { }); }); +describe('cancelRun', () => { + it('should create a run_cancelled event with the run specVersion', async () => { + const world = createMockWorld({ run: { specVersion: 2 } }); + + await cancelRun(world, 'wrun_123'); + + expect(world.runs.get).toHaveBeenCalledWith('wrun_123', { + resolveData: 'none', + }); + expect(world.events.create).toHaveBeenCalledWith( + 'wrun_123', + { + eventType: 'run_cancelled', + specVersion: 2, + }, + { v1Compat: false } + ); + }); + + it('should use the legacy cancel endpoint for legacy runs', async () => { + const world = createMockWorld({ run: { specVersion: undefined } }); + + await cancelRun(world, 'wrun_123'); + + expect(world.events.create).toHaveBeenCalledWith( + 'wrun_123', + { + eventType: 'run_cancelled', + specVersion: 1, + }, + { v1Compat: true } + ); + }); +}); + describe('Run.exists', () => { afterEach(() => { setWorld(undefined as unknown as World); diff --git a/packages/world-postgres/src/queue.test.ts b/packages/world-postgres/src/queue.test.ts index dd2122ecde..2348401c6e 100644 --- a/packages/world-postgres/src/queue.test.ts +++ b/packages/world-postgres/src/queue.test.ts @@ -2,21 +2,19 @@ import { createServer, type Server } from 'node:http'; import { JsonTransport } from '@vercel/queue'; import { getWorkflowPort } from '@workflow/utils/get-port'; import { MessageId, type QueuePayload } from '@workflow/world'; +import { createLocalWorld } from '@workflow/world-local'; import { makeWorkerUtils, run, type WorkerUtils } from 'graphile-worker'; import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; -import { createLocalWorld } from '@workflow/world-local'; import { stepEntrypoint } from '../../core/dist/runtime/step-handler.js'; -import { createQueue } from './queue.js'; import { MessageData } from './message.js'; +import { createQueue } from './queue.js'; const transport = new JsonTransport(); const createdQueues: Array> = []; const createdServers: Server[] = []; vi.mock('graphile-worker', () => ({ - Logger: class Logger { - constructor(_: unknown) {} - }, + Logger: class Logger {}, makeWorkerUtils: vi.fn(), run: vi.fn(), })); @@ -46,8 +44,13 @@ describe('postgres queue http execution', () => { const wrappedHandler = vi.fn(async () => Response.json({ ok: true })); const localWorldClose = vi.fn(); const createQueueHandler = vi.fn(() => wrappedHandler); + const advisoryClient = { + query: vi.fn(), + release: vi.fn(), + }; const pool = { query: vi.fn(async () => ({ rows: [{ exists: false }] })), + connect: vi.fn(async () => advisoryClient), } as any; beforeEach(() => { @@ -62,6 +65,38 @@ describe('postgres queue http execution', () => { } as any); }); + it('serializes graphile migrations with an advisory lock', async () => { + const queue = buildQueue({ connectionString: 'postgres://test' }, pool); + + await queue.start(); + + expect(pool.connect).toHaveBeenCalled(); + expect(advisoryClient.query).toHaveBeenCalledWith( + 'SELECT pg_advisory_lock(hashtext($1))', + ['workflow_graphile_worker_migrate'] + ); + expect(advisoryClient.query).toHaveBeenCalledWith( + 'SELECT pg_advisory_unlock(hashtext($1))', + ['workflow_graphile_worker_migrate'] + ); + expect(advisoryClient.release).toHaveBeenCalledOnce(); + + const lockOrder = advisoryClient.query.mock.invocationCallOrder[0]; + const migrateOrder = vi.mocked(workerUtilsMock.migrate).mock + .invocationCallOrder[0]; + const unlockOrder = + advisoryClient.query.mock.invocationCallOrder[ + advisoryClient.query.mock.calls.findIndex( + ([query]) => query === 'SELECT pg_advisory_unlock(hashtext($1))' + ) + ]; + const releaseOrder = advisoryClient.release.mock.invocationCallOrder[0]; + + expect(lockOrder).toBeLessThan(migrateOrder); + expect(migrateOrder).toBeLessThan(unlockOrder); + expect(unlockOrder).toBeLessThan(releaseOrder); + }); + afterEach(async () => { await Promise.all(createdQueues.splice(0).map((queue) => queue.close())); await Promise.all( diff --git a/packages/world-postgres/src/queue.ts b/packages/world-postgres/src/queue.ts index 83624cc6b4..a79b59fcc7 100644 --- a/packages/world-postgres/src/queue.ts +++ b/packages/world-postgres/src/queue.ts @@ -17,7 +17,7 @@ import { run, type WorkerUtils, } from 'graphile-worker'; -import type { Pool } from 'pg'; +import type { Pool, PoolClient } from 'pg'; import { monotonicFactory } from 'ulid'; import { z } from 'zod/v4'; import type { PostgresWorldConfig } from './config.js'; @@ -43,6 +43,7 @@ function createGraphileLogger() { const graphileLogger = createGraphileLogger(); const COMPLETED_IDEMPOTENCY_CACHE_LIMIT = 10_000; +const GRAPHILE_MIGRATION_LOCK_KEY = 'workflow_graphile_worker_migrate'; const GraphileHelpers = z.object({ job: z.object({ attempts: z.number().int().positive(), @@ -329,16 +330,38 @@ export function createQueue( } } + async function withGraphileMigrationLock( + fn: () => Promise + ): Promise { + const client: PoolClient = await pool.connect(); + try { + await client.query('SELECT pg_advisory_lock(hashtext($1))', [ + GRAPHILE_MIGRATION_LOCK_KEY, + ]); + try { + return await fn(); + } finally { + await client.query('SELECT pg_advisory_unlock(hashtext($1))', [ + GRAPHILE_MIGRATION_LOCK_KEY, + ]); + } + } finally { + client.release(); + } + } + async function start(): Promise { if (!startPromise) { startPromise = (async () => { try { - workerUtils = await makeWorkerUtils({ - pgPool: pool, - logger: graphileLogger, + await withGraphileMigrationLock(async () => { + workerUtils = await makeWorkerUtils({ + pgPool: pool, + logger: graphileLogger, + }); + await workerUtils.migrate(); + await migratePgBossJobs(workerUtils); }); - await workerUtils.migrate(); - await migratePgBossJobs(workerUtils); await setupListeners(); } catch (err) { startPromise = null;