Skip to content

Commit cd152ee

Browse files
committed
more cleanup
1 parent 1fd99e3 commit cd152ee

6 files changed

Lines changed: 185 additions & 7 deletions

File tree

apps/sim/executor/execution/executor.ts

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -220,7 +220,7 @@ export class DAGExecutor {
220220
})
221221
context.subflowParentMap = this.buildSubflowParentMap(dag)
222222

223-
const engine = this.buildExecutionPipeline(context, dag, state)
223+
const engine = this.buildExecutionPipeline(context, dag, state, filteredSnapshot)
224224
return await engine.run()
225225
}
226226

@@ -303,7 +303,12 @@ export class DAGExecutor {
303303
}
304304
}
305305

306-
private buildExecutionPipeline(context: ExecutionContext, dag: DAG, state: ExecutionState) {
306+
private buildExecutionPipeline(
307+
context: ExecutionContext,
308+
dag: DAG,
309+
state: ExecutionState,
310+
snapshotState = this.contextExtensions.snapshotState
311+
) {
307312
const resolver = new VariableResolver(this.workflow, this.workflowVariables, state, {
308313
navigatePathAsync,
309314
})
@@ -325,8 +330,8 @@ export class DAGExecutor {
325330
edgeManager
326331
)
327332
edgeManager.restoreDeactivatedEdges(
328-
this.contextExtensions.snapshotState?.deactivatedEdges,
329-
this.contextExtensions.snapshotState?.nodesWithActivatedEdge
333+
snapshotState?.deactivatedEdges,
334+
snapshotState?.nodesWithActivatedEdge
330335
)
331336
const nodeOrchestrator = new NodeExecutionOrchestrator(
332337
dag,

apps/sim/executor/execution/state.test.ts

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,15 @@ import { describe, expect, it } from 'vitest'
55
import { ExecutionState } from '@/executor/execution/state'
66

77
describe('ExecutionState', () => {
8+
it('returns exact suffixed cached node outputs', () => {
9+
const state = new ExecutionState()
10+
state.setBlockOutput('producer₍1₎', { value: 'branch-1' })
11+
state.setBlockOutput('producer_loop1', { value: 'loop-1' })
12+
13+
expect(state.getBlockOutput('producer₍1₎')).toEqual({ value: 'branch-1' })
14+
expect(state.getBlockOutput('producer_loop1')).toEqual({ value: 'loop-1' })
15+
})
16+
817
it('prefers branch-local cloned outputs when resolving original block references', () => {
918
const state = new ExecutionState()
1019
state.setBlockOutput('producer', { value: 'branch-0' })

apps/sim/executor/execution/state.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ export interface LoopScope {
2626
condition?: string
2727
loopType?: 'for' | 'forEach' | 'while' | 'doWhile'
2828
skipFirstConditionCheck?: boolean
29+
skippedAtStart?: boolean
2930
/** Error message if loop validation failed (e.g., exceeded max iterations) */
3031
validationError?: string
3132
}
@@ -65,7 +66,7 @@ export class ExecutionState implements BlockStateController {
6566
getBlockOutput(blockId: string, currentNodeId?: string): NormalizedBlockOutput | undefined {
6667
const normalizedId = normalizeLookupId(blockId)
6768
if (normalizedId !== blockId) {
68-
return undefined
69+
return this.blockStates.get(blockId)?.output
6970
}
7071

7172
if (currentNodeId) {

apps/sim/executor/orchestrators/loop.test.ts

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,4 +124,52 @@ describe('LoopOrchestrator', () => {
124124
)
125125
expect(scope.maxIterations).toBe(1)
126126
})
127+
128+
it('exits immediately when a loop was skipped at start', async () => {
129+
const loopId = 'loop-1'
130+
const state = createState()
131+
const dag: DAG = {
132+
nodes: new Map(),
133+
loopConfigs: new Map([[loopId, { id: loopId, nodes: ['task-1'], loopType: 'while' }]]),
134+
parallelConfigs: new Map(),
135+
}
136+
const resolver = {
137+
resolveSingleReference: vi.fn().mockResolvedValue(1),
138+
}
139+
const orchestrator = new LoopOrchestrator(dag, state, resolver as any, {}, {
140+
clearDeactivatedEdgesForNodes: vi.fn(),
141+
} as unknown as EdgeManager)
142+
const ctx = {
143+
workflowId: 'workflow-1',
144+
workspaceId: 'workspace-1',
145+
executionId: 'execution-1',
146+
userId: 'user-1',
147+
loopExecutions: new Map([
148+
[
149+
loopId,
150+
{
151+
iteration: 0,
152+
currentIterationOutputs: new Map(),
153+
allIterationOutputs: [],
154+
loopType: 'while',
155+
condition: '<loop.index> > 0',
156+
skippedAtStart: true,
157+
},
158+
],
159+
]),
160+
blockLogs: [],
161+
metadata: {},
162+
}
163+
164+
const result = await orchestrator.evaluateLoopContinuation(ctx as any, loopId)
165+
166+
expect(result).toMatchObject({
167+
shouldContinue: false,
168+
shouldExit: true,
169+
selectedRoute: EDGE.LOOP_EXIT,
170+
aggregatedResults: [],
171+
})
172+
expect(resolver.resolveSingleReference).not.toHaveBeenCalled()
173+
expect(state.setBlockOutput).toHaveBeenCalledWith(loopId, { results: [] }, 0)
174+
})
127175
})

apps/sim/executor/orchestrators/loop.ts

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -253,6 +253,11 @@ export class LoopOrchestrator {
253253
return await this.createExitResult(ctx, loopId, scope)
254254
}
255255

256+
if (scope.skippedAtStart) {
257+
scope.skippedAtStart = false
258+
return await this.createExitResult(ctx, loopId, scope)
259+
}
260+
256261
const iterationResults: NormalizedBlockOutput[] = []
257262
for (const blockOutput of scope.currentIterationOutputs.values()) {
258263
iterationResults.push(blockOutput)
@@ -600,6 +605,7 @@ export class LoopOrchestrator {
600605
if (scope.loopType === 'forEach') {
601606
if (!scope.items || scope.items.length === 0) {
602607
logger.info('ForEach loop has empty collection, skipping loop body', { loopId })
608+
scope.skippedAtStart = true
603609
return false
604610
}
605611
return true
@@ -608,6 +614,7 @@ export class LoopOrchestrator {
608614
if (scope.loopType === 'for') {
609615
if (scope.maxIterations === 0) {
610616
logger.info('For loop has 0 iterations, skipping loop body', { loopId })
617+
scope.skippedAtStart = true
611618
return false
612619
}
613620
return true
@@ -620,6 +627,7 @@ export class LoopOrchestrator {
620627
if (scope.loopType === 'while') {
621628
if (!scope.condition) {
622629
logger.warn('No condition defined for while loop', { loopId })
630+
scope.skippedAtStart = true
623631
return false
624632
}
625633

@@ -632,6 +640,7 @@ export class LoopOrchestrator {
632640

633641
if (!result) {
634642
logger.info('While loop initial condition is false, skipping loop body', { loopId })
643+
scope.skippedAtStart = true
635644
}
636645

637646
return result

apps/sim/executor/orchestrators/parallel.test.ts

Lines changed: 108 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,17 @@
22
* @vitest-environment node
33
*/
44
import { beforeEach, describe, expect, it, vi } from 'vitest'
5-
import type { DAG } from '@/executor/dag/builder'
5+
import type { DAG, DAGNode } from '@/executor/dag/builder'
66
import type { BlockStateWriter, ContextExtensions } from '@/executor/execution/types'
77
import { ParallelOrchestrator } from '@/executor/orchestrators/parallel'
88
import type { ExecutionContext } from '@/executor/types'
9-
import { buildBranchNodeId } from '@/executor/utils/subflow-utils'
9+
import {
10+
buildBranchNodeId,
11+
buildParallelSentinelEndId,
12+
buildParallelSentinelStartId,
13+
buildSentinelEndId,
14+
buildSentinelStartId,
15+
} from '@/executor/utils/subflow-utils'
1016

1117
const { mockCompactSubflowResults } = vi.hoisted(() => ({
1218
mockCompactSubflowResults: vi.fn(async (results: unknown) => results),
@@ -34,6 +40,24 @@ function createDag(): DAG {
3440
}
3541
}
3642

43+
function createDagNode(id: string, metadata: DAGNode['metadata'] = {}): DAGNode {
44+
return {
45+
id,
46+
block: {
47+
id,
48+
position: { x: 0, y: 0 },
49+
config: { tool: '', params: {} },
50+
inputs: {},
51+
outputs: {},
52+
metadata: { id: 'function', name: id },
53+
enabled: true,
54+
},
55+
incomingEdges: new Set(),
56+
outgoingEdges: new Map(),
57+
metadata,
58+
}
59+
}
60+
3761
function createState(): BlockStateWriter {
3862
return {
3963
setBlockOutput: vi.fn(),
@@ -422,6 +446,88 @@ describe('ParallelOrchestrator', () => {
422446
expect(dirtySet.has(secondBranchId)).toBe(true)
423447
})
424448

449+
it('marks cloned nested loop body nodes dirty for non-zero branches', () => {
450+
const dag = createDag()
451+
const parallelId = 'parallel-1'
452+
const loopId = 'loop-1'
453+
const taskId = 'task-1'
454+
const parallelStartId = buildParallelSentinelStartId(parallelId)
455+
const parallelEndId = buildParallelSentinelEndId(parallelId)
456+
const loopStartId = buildSentinelStartId(loopId)
457+
const loopEndId = buildSentinelEndId(loopId)
458+
459+
dag.parallelConfigs.set(parallelId, {
460+
id: parallelId,
461+
nodes: [loopId],
462+
count: 2,
463+
parallelType: 'count',
464+
})
465+
dag.loopConfigs.set(loopId, {
466+
id: loopId,
467+
nodes: [taskId],
468+
loopType: 'for',
469+
iterations: 1,
470+
})
471+
dag.nodes.set(parallelStartId, createDagNode(parallelStartId))
472+
dag.nodes.set(parallelEndId, createDagNode(parallelEndId))
473+
dag.nodes.set(
474+
loopStartId,
475+
createDagNode(loopStartId, {
476+
isSentinel: true,
477+
sentinelType: 'start',
478+
subflowId: loopId,
479+
subflowType: 'loop',
480+
})
481+
)
482+
dag.nodes.set(
483+
taskId,
484+
createDagNode(taskId, {
485+
isLoopNode: true,
486+
subflowId: loopId,
487+
subflowType: 'loop',
488+
originalBlockId: taskId,
489+
})
490+
)
491+
dag.nodes.set(
492+
loopEndId,
493+
createDagNode(loopEndId, {
494+
isSentinel: true,
495+
sentinelType: 'end',
496+
subflowId: loopId,
497+
subflowType: 'loop',
498+
})
499+
)
500+
dag.nodes.get(loopStartId)!.outgoingEdges.set(`${loopStartId}->${taskId}`, { target: taskId })
501+
dag.nodes.get(taskId)!.incomingEdges.add(loopStartId)
502+
dag.nodes.get(taskId)!.outgoingEdges.set(`${taskId}->${loopEndId}`, { target: loopEndId })
503+
dag.nodes.get(loopEndId)!.incomingEdges.add(taskId)
504+
505+
const dirtySet = new Set([parallelId])
506+
const orchestrator = new ParallelOrchestrator(dag, createState(), null, {})
507+
orchestrator.prepareCurrentBatch(
508+
createContext({
509+
runFromBlockContext: { startBlockId: parallelId, dirtySet },
510+
parallelExecutions: new Map([
511+
[
512+
parallelId,
513+
{
514+
parallelId,
515+
totalBranches: 2,
516+
batchSize: 2,
517+
currentBatchStart: 0,
518+
currentBatchSize: 2,
519+
branchOutputs: new Map(),
520+
},
521+
],
522+
]),
523+
}),
524+
parallelId
525+
)
526+
527+
expect([...dirtySet]).toContain(taskId)
528+
expect([...dirtySet].some((nodeId) => nodeId.startsWith(`${taskId}__clone`))).toBe(true)
529+
})
530+
425531
it('compacts accumulated outputs before scheduling later batches', async () => {
426532
const dag = createDag()
427533
const templateBranchId = buildBranchNodeId('task-1', 0)

0 commit comments

Comments
 (0)