Skip to content

Commit d49e12e

Browse files
committed
fix hitl cases
1 parent 00596de commit d49e12e

17 files changed

Lines changed: 499 additions & 129 deletions

File tree

apps/sim/executor/dag/construction/edges.test.ts

Lines changed: 39 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -220,6 +220,36 @@ describe('EdgeConstructor', () => {
220220
})
221221
expect(dag.nodes.get(sourceEndId)!.incomingEdges).not.toContain(sourceStartId)
222222
})
223+
224+
it('wires terminal top-level loop start exit through its own sentinel end', () => {
225+
const loopId = 'loop-1'
226+
const taskId = 'task-1'
227+
const loopStartId = `loop-${loopId}-sentinel-start`
228+
const loopEndId = `loop-${loopId}-sentinel-end`
229+
const dag = createMockDAG([loopStartId, loopEndId, taskId])
230+
dag.loopConfigs.set(loopId, { id: loopId, nodes: [taskId], iterations: 1 })
231+
const workflow = createMockWorkflow(
232+
[createMockBlock(loopId, 'loop'), createMockBlock(taskId)],
233+
[]
234+
)
235+
236+
edgeConstructor.execute(
237+
workflow,
238+
dag,
239+
new Set(),
240+
new Set([taskId]),
241+
new Set([taskId]),
242+
new Map()
243+
)
244+
245+
const loopStartTargets = Array.from(dag.nodes.get(loopStartId)!.outgoingEdges.values())
246+
expect(loopStartTargets).toContainEqual({
247+
target: loopEndId,
248+
sourceHandle: 'loop_exit',
249+
targetHandle: undefined,
250+
})
251+
expect(dag.nodes.get(loopEndId)!.incomingEdges).not.toContain(loopStartId)
252+
})
223253
})
224254

225255
describe('Condition block edge wiring', () => {
@@ -530,8 +560,9 @@ describe('EdgeConstructor', () => {
530560

531561
// Sentinel start should have edge to node in loop (it's a start node - no incoming from loop)
532562
const sentinelStartNode = dag.nodes.get(sentinelStartId)!
533-
expect(sentinelStartNode.outgoingEdges.size).toBe(1)
534-
const startEdge = Array.from(sentinelStartNode.outgoingEdges.values())[0]
563+
const startEdge = Array.from(sentinelStartNode.outgoingEdges.values()).find(
564+
(edge) => edge.target === nodeInLoopId
565+
)
535566
expect(startEdge.target).toBe(nodeInLoopId)
536567

537568
// Node in loop should have edge to sentinel end (it's a terminal node - no outgoing to loop)
@@ -589,7 +620,10 @@ describe('EdgeConstructor', () => {
589620

590621
// Sentinel start should have edges to both nodes (both are start nodes)
591622
const sentinelStartNode = dag.nodes.get(sentinelStartId)!
592-
expect(sentinelStartNode.outgoingEdges.size).toBe(2)
623+
const bodyStartEdges = Array.from(sentinelStartNode.outgoingEdges.values()).filter(
624+
(edge) => edge.target === node1Id || edge.target === node2Id
625+
)
626+
expect(bodyStartEdges).toHaveLength(2)
593627

594628
// Both nodes should have edges to sentinel end (both are terminal nodes)
595629
const node1 = dag.nodes.get(node1Id)!
@@ -791,7 +825,7 @@ describe('EdgeConstructor', () => {
791825

792826
const loop1StartNode = dag.nodes.get(loop1SentinelStart)!
793827
const earlyExitEdges = Array.from(loop1StartNode.outgoingEdges.values()).filter(
794-
(e) => e.target === loop2SentinelStart && e.sourceHandle === 'loop_exit'
828+
(e) => e.target === loop1SentinelEnd && e.sourceHandle === 'loop_exit'
795829
)
796830
expect(earlyExitEdges.length).toBeGreaterThan(0)
797831
})
@@ -953,7 +987,7 @@ describe('EdgeConstructor', () => {
953987

954988
const loopStartNode = dag.nodes.get(loopSentinelStart)!
955989
const earlyExitEdges = Array.from(loopStartNode.outgoingEdges.values()).filter(
956-
(e) => e.target === parallelSentinelStart && e.sourceHandle === 'loop_exit'
990+
(e) => e.target === loopSentinelEnd && e.sourceHandle === 'loop_exit'
957991
)
958992
expect(earlyExitEdges.length).toBeGreaterThan(0)
959993
})

apps/sim/executor/dag/construction/edges.ts

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -236,16 +236,15 @@ export class EdgeConstructor {
236236
continue
237237
}
238238

239-
let loopSentinelStartId: string | undefined
240-
241239
if (sourceIsLoopBlock) {
242240
const sentinelEndId = buildSentinelEndId(originalSource)
243-
loopSentinelStartId = buildSentinelStartId(originalSource)
241+
const loopSentinelStartId = buildSentinelStartId(originalSource)
244242
if (!dag.nodes.has(sentinelEndId) || !dag.nodes.has(loopSentinelStartId)) {
245243
continue
246244
}
247245
source = sentinelEndId
248246
sourceHandle = EDGE.LOOP_EXIT
247+
this.addSubflowStartExitBypass(dag, originalSource)
249248
}
250249

251250
if (targetIsLoopBlock) {
@@ -286,11 +285,6 @@ export class EdgeConstructor {
286285
continue
287286
}
288287

289-
const sourceLoopNodes = dag.loopConfigs.get(originalSource)?.nodes
290-
if (loopSentinelStartId && !sourceLoopNodes?.includes(originalTarget)) {
291-
this.addEdge(dag, loopSentinelStartId, target, EDGE.LOOP_EXIT, targetHandle)
292-
}
293-
294288
if (!this.isEdgeReachable(source, target, reachableBlocks, dag)) {
295289
continue
296290
}
@@ -326,6 +320,8 @@ export class EdgeConstructor {
326320
continue
327321
}
328322

323+
this.addSubflowStartExitBypass(dag, loopId)
324+
329325
const { startNodes, terminalNodes } = this.findLoopBoundaryNodes(nodes, dag)
330326

331327
for (const startNodeId of startNodes) {
@@ -367,6 +363,8 @@ export class EdgeConstructor {
367363
continue
368364
}
369365

366+
this.addSubflowStartExitBypass(dag, parallelId)
367+
370368
const { entryNodes, terminalNodes } = this.findParallelBoundaryNodes(nodes, dag)
371369

372370
for (const entryNodeId of entryNodes) {

apps/sim/executor/execution/edge-manager.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,10 @@ export class EdgeManager {
141141
this.nodesWithActivatedEdge = new Set(activatedNodeIds ?? [])
142142
}
143143

144+
markNodeWithActivatedEdge(nodeId: string): void {
145+
this.nodesWithActivatedEdge.add(nodeId)
146+
}
147+
144148
/**
145149
* Clear deactivated edges for a set of nodes (used when restoring loop state for next iteration).
146150
*

apps/sim/executor/execution/engine.test.ts

Lines changed: 102 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ vi.mock('@/lib/execution/cancellation', () => ({
99
}))
1010

1111
import { isExecutionCancelled, isRedisCancellationEnabled } from '@/lib/execution/cancellation'
12+
import { EDGE } from '@/executor/constants'
1213
import type { DAG, DAGNode } from '@/executor/dag/builder'
1314
import type { EdgeManager } from '@/executor/execution/edge-manager'
1415
import type { NodeExecutionOrchestrator } from '@/executor/orchestrators/node'
@@ -89,6 +90,9 @@ function createMockEdgeManager(
8990
restoreIncomingEdge: vi.fn(),
9091
clearDeactivatedEdges: vi.fn(),
9192
clearDeactivatedEdgesForNodes: vi.fn(),
93+
getDeactivatedEdges: vi.fn(() => []),
94+
getNodesWithActivatedEdge: vi.fn(() => []),
95+
markNodeWithActivatedEdge: vi.fn(),
9296
} as unknown as MockEdgeManager
9397
}
9498

@@ -130,7 +134,9 @@ describe('ExecutionEngine', () => {
130134
endNode.incomingEdges.add('start')
131135

132136
const dag = createMockDAG([startNode, endNode])
133-
const context = createMockContext()
137+
const context = createMockContext({
138+
decisions: { router: new Map(), condition: new Map() },
139+
})
134140
const edgeManager = createMockEdgeManager((node) => {
135141
if (node.id === 'start') return ['end']
136142
return []
@@ -147,7 +153,9 @@ describe('ExecutionEngine', () => {
147153
it('should mark execution as successful when completed without cancellation', async () => {
148154
const startNode = createMockNode('start', 'starter')
149155
const dag = createMockDAG([startNode])
150-
const context = createMockContext()
156+
const context = createMockContext({
157+
decisions: { router: new Map(), condition: new Map() },
158+
})
151159
const edgeManager = createMockEdgeManager()
152160
const nodeOrchestrator = createMockNodeOrchestrator()
153161

@@ -173,12 +181,36 @@ describe('ExecutionEngine', () => {
173181
const nodeOrchestrator = createMockNodeOrchestrator()
174182

175183
const engine = new ExecutionEngine(context, dag, edgeManager, nodeOrchestrator)
176-
const result = await engine.run()
184+
const result = await engine.run('hitl')
177185

178186
expect(result.success).toBe(true)
179187
expect(nodeOrchestrator.executionCount).toBe(0)
180188
})
181189

190+
it('marks resumed pause edge targets as activated before readiness checks', async () => {
191+
const targetNode = createMockNode('join', 'function')
192+
targetNode.incomingEdges.add('pause-block')
193+
targetNode.incomingEdges.add('condition-block')
194+
const dag = createMockDAG([targetNode])
195+
const context = createMockContext({
196+
metadata: {
197+
executionId: 'test-execution',
198+
startTime: new Date().toISOString(),
199+
pendingBlocks: [],
200+
remainingEdges: [{ source: 'pause-block', target: 'join' }],
201+
} as any,
202+
})
203+
const edgeManager = createMockEdgeManager(() => [])
204+
vi.mocked(edgeManager.isNodeReady).mockReturnValue(false)
205+
const nodeOrchestrator = createMockNodeOrchestrator()
206+
207+
const engine = new ExecutionEngine(context, dag, edgeManager, nodeOrchestrator)
208+
await engine.run()
209+
210+
expect(edgeManager.markNodeWithActivatedEdge).toHaveBeenCalledWith('join')
211+
expect(nodeOrchestrator.executeNode).not.toHaveBeenCalled()
212+
})
213+
182214
it('should execute all nodes in a multi-node workflow', async () => {
183215
const nodes = [
184216
createMockNode('start', 'starter'),
@@ -207,6 +239,73 @@ describe('ExecutionEngine', () => {
207239
expect(result.success).toBe(true)
208240
expect(nodeOrchestrator.executionCount).toBe(4)
209241
})
242+
243+
it('records paused block completion before returning paused result', async () => {
244+
const node = createMockNode('hitl', 'function')
245+
const dag = createMockDAG([node])
246+
const context = createMockContext({
247+
decisions: { router: new Map(), condition: new Map() },
248+
})
249+
const edgeManager = createMockEdgeManager()
250+
const nodeOrchestrator = createMockNodeOrchestrator()
251+
const pauseOutput = {
252+
response: { status: 'paused' },
253+
_pauseMetadata: {
254+
contextId: 'pause-1',
255+
blockId: 'hitl',
256+
response: { status: 'paused' },
257+
timestamp: new Date().toISOString(),
258+
pauseKind: 'hitl',
259+
},
260+
}
261+
vi.mocked(nodeOrchestrator.executeNode).mockResolvedValue({
262+
nodeId: 'hitl',
263+
output: pauseOutput,
264+
isFinalOutput: false,
265+
})
266+
267+
const engine = new ExecutionEngine(context, dag, edgeManager, nodeOrchestrator)
268+
const result = await engine.run('hitl')
269+
270+
expect(result.status).toBe('paused')
271+
expect(nodeOrchestrator.handleNodeCompletion).toHaveBeenCalledWith(
272+
context,
273+
'hitl',
274+
pauseOutput
275+
)
276+
})
277+
278+
it('does not stop run-until execution on parallel batch continuation', async () => {
279+
const parallelEnd = createMockNode('parallel-end', 'parallel')
280+
const nextNode = createMockNode('next', 'function')
281+
parallelEnd.outgoingEdges.set('continue', {
282+
target: 'next',
283+
sourceHandle: EDGE.PARALLEL_CONTINUE,
284+
})
285+
nextNode.incomingEdges.add('parallel-end')
286+
const dag = createMockDAG([parallelEnd, nextNode])
287+
const context = createMockContext({
288+
stopAfterBlockId: 'parallel-end',
289+
decisions: { router: new Map(), condition: new Map() },
290+
})
291+
const edgeManager = createMockEdgeManager((node) =>
292+
node.id === 'parallel-end' ? ['next'] : []
293+
)
294+
const nodeOrchestrator = createMockNodeOrchestrator()
295+
vi.mocked(nodeOrchestrator.executeNode).mockImplementation(async (_ctx, nodeId) => ({
296+
nodeId,
297+
output:
298+
nodeId === 'parallel-end'
299+
? { selectedRoute: EDGE.PARALLEL_CONTINUE }
300+
: { result: 'done' },
301+
isFinalOutput: nodeId === 'next',
302+
}))
303+
304+
const engine = new ExecutionEngine(context, dag, edgeManager, nodeOrchestrator)
305+
await engine.run('parallel-end')
306+
307+
expect(nodeOrchestrator.executeNode).toHaveBeenCalledWith(context, 'next')
308+
})
210309
})
211310

212311
describe('Cancellation via AbortSignal', () => {

apps/sim/executor/execution/engine.ts

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import { createLogger, type Logger } from '@sim/logger'
22
import { toError } from '@sim/utils/errors'
33
import { isExecutionCancelled, isRedisCancellationEnabled } from '@/lib/execution/cancellation'
4-
import { BlockType } from '@/executor/constants'
4+
import { BlockType, EDGE } from '@/executor/constants'
55
import type { DAG } from '@/executor/dag/builder'
66
import type { EdgeManager } from '@/executor/execution/edge-manager'
77
import { serializePauseSnapshot } from '@/executor/execution/snapshot-serializer'
@@ -303,6 +303,9 @@ export class ExecutionEngine {
303303
if (targetNode) {
304304
const hadEdge = targetNode.incomingEdges.has(edge.source)
305305
targetNode.incomingEdges.delete(edge.source)
306+
if (hadEdge) {
307+
this.edgeManager.markNodeWithActivatedEdge(targetNode.id)
308+
}
306309

307310
if (this.edgeManager.isNodeReady(targetNode)) {
308311
this.execLogger.info('Node became ready after edge removal', { nodeId: targetNode.id })
@@ -412,6 +415,8 @@ export class ExecutionEngine {
412415
}
413416

414417
if (output._pauseMetadata) {
418+
await this.nodeOrchestrator.handleNodeCompletion(this.context, nodeId, output)
419+
415420
const pauseMetadata = output._pauseMetadata
416421
this.pausedBlocks.set(pauseMetadata.contextId, pauseMetadata)
417422
this.context.metadata.status = 'paused'
@@ -439,8 +444,9 @@ export class ExecutionEngine {
439444
if (this.context.stopAfterBlockId === nodeId) {
440445
// For loop/parallel sentinels, only stop if the subflow has fully exited (all iterations done)
441446
// shouldContinue: true means more iterations, shouldExit: true means loop is done
442-
const shouldContinueLoop = output.shouldContinue === true
443-
if (!shouldContinueLoop) {
447+
const shouldContinue =
448+
output.shouldContinue === true || output.selectedRoute === EDGE.PARALLEL_CONTINUE
449+
if (!shouldContinue) {
444450
this.execLogger.info('Stopping execution after target block', { nodeId })
445451
this.stoppedEarlyFlag = true
446452
return

apps/sim/executor/orchestrators/loop.test.ts

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,4 +81,47 @@ describe('LoopOrchestrator', () => {
8181
expect(parallelStart.incomingEdges.has(loopStartId)).toBe(true)
8282
expect(parallelStart.incomingEdges.has(parallelEndId)).toBe(false)
8383
})
84+
85+
it('resolves forEach collections with the loop start sentinel scope', async () => {
86+
const loopId = 'loop-1'
87+
const dag: DAG = {
88+
nodes: new Map(),
89+
loopConfigs: new Map([
90+
[
91+
loopId,
92+
{
93+
id: loopId,
94+
nodes: ['task-1'],
95+
loopType: 'forEach',
96+
forEachItems: '<Producer.items>',
97+
},
98+
],
99+
]),
100+
parallelConfigs: new Map(),
101+
}
102+
const resolver = {
103+
resolveSingleReference: vi.fn().mockResolvedValue(['item-1']),
104+
}
105+
const orchestrator = new LoopOrchestrator(dag, createState(), resolver as any, {}, {
106+
clearDeactivatedEdgesForNodes: vi.fn(),
107+
} as unknown as EdgeManager)
108+
const ctx = {
109+
workflowId: 'workflow-1',
110+
workspaceId: 'workspace-1',
111+
executionId: 'execution-1',
112+
userId: 'user-1',
113+
loopExecutions: new Map(),
114+
blockLogs: [],
115+
metadata: {},
116+
}
117+
118+
const scope = await orchestrator.initializeLoopScope(ctx as any, loopId)
119+
120+
expect(resolver.resolveSingleReference).toHaveBeenCalledWith(
121+
expect.any(Object),
122+
'loop-loop-1-sentinel-start',
123+
'<Producer.items>'
124+
)
125+
expect(scope.maxIterations).toBe(1)
126+
})
84127
})

apps/sim/executor/orchestrators/loop.ts

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,12 @@ export class LoopOrchestrator {
139139
}
140140
let items: any[]
141141
try {
142-
items = await resolveArrayInputAsync(ctx, loopConfig.forEachItems, this.resolver)
142+
items = await resolveArrayInputAsync(
143+
ctx,
144+
loopConfig.forEachItems,
145+
this.resolver,
146+
buildSentinelStartId(loopId)
147+
)
143148
} catch (error) {
144149
const errorMessage = `ForEach loop resolution failed: ${toError(error).message}`
145150
logger.error(errorMessage, { loopId, forEachItems: loopConfig.forEachItems })

0 commit comments

Comments
 (0)