Skip to content

Commit eb8020e

Browse files
committed
Ensure concurrent prompts are handled.
This basically implements the pending prompt queueing RFD. When a prompt request is received while the agent is already generating, we immediately forward the message to Claude and then wait until it replays the message back to us. When it replays it, we respond to the previous prompt request with "end_turn", which also singifies to the client that the message is now being processed by the agent. See agentclientprotocol/agent-client-protocol#484.
1 parent b57a429 commit eb8020e

1 file changed

Lines changed: 79 additions & 11 deletions

File tree

src/acp-agent.ts

Lines changed: 79 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,9 @@ type Session = {
149149
permissionMode: PermissionMode;
150150
settingsManager: SettingsManager;
151151
configOptions: SessionConfigOption[];
152+
promptRunning: boolean;
153+
pendingMessages: Map<string, { resolve: (cancelled: boolean) => void; order: number }>;
154+
nextPendingOrder: number;
152155
};
153156

154157
type SessionHistoryEntry = {
@@ -292,6 +295,11 @@ export class ClaudeAcpAgent implements Agent {
292295
return {
293296
protocolVersion: 1,
294297
agentCapabilities: {
298+
_meta: {
299+
claudeCode: {
300+
promptQueueing: true,
301+
},
302+
},
295303
promptCapabilities: {
296304
image: true,
297305
embeddedContext: true,
@@ -611,20 +619,39 @@ export class ClaudeAcpAgent implements Agent {
611619
}
612620

613621
async prompt(params: PromptRequest): Promise<PromptResponse> {
614-
if (!this.sessions[params.sessionId]) {
622+
const session = this.sessions[params.sessionId];
623+
if (!session) {
615624
throw new Error("Session not found");
616625
}
617626

618-
this.sessions[params.sessionId].cancelled = false;
627+
session.cancelled = false;
619628

620-
const { query, input } = this.sessions[params.sessionId];
629+
const userMessage = promptToClaude(params);
621630

622-
input.push(promptToClaude(params));
631+
if (session.promptRunning) {
632+
const uuid = randomUUID();
633+
userMessage.uuid = uuid;
634+
session.input.push(userMessage);
635+
const order = session.nextPendingOrder++;
636+
const cancelled = await new Promise<boolean>((resolve) => {
637+
session.pendingMessages.set(uuid, { resolve, order });
638+
});
639+
if (cancelled) {
640+
return { stopReason: "cancelled" };
641+
}
642+
} else {
643+
session.input.push(userMessage);
644+
}
645+
646+
session.promptRunning = true;
647+
let handedOff = false;
648+
649+
try {
623650
while (true) {
624-
const { value: message, done } = await (query as AsyncGenerator<SDKMessageTemp, void>).next();
651+
const { value: message, done } = await (session.query as AsyncGenerator<SDKMessageTemp, void>).next();
625652

626653
if (done || !message) {
627-
if (this.sessions[params.sessionId].cancelled) {
654+
if (session.cancelled) {
628655
return { stopReason: "cancelled" };
629656
}
630657
break;
@@ -651,7 +678,7 @@ export class ClaudeAcpAgent implements Agent {
651678
}
652679
break;
653680
case "result": {
654-
if (this.sessions[params.sessionId].cancelled) {
681+
if (session.cancelled) {
655682
return { stopReason: "cancelled" };
656683
}
657684

@@ -704,10 +731,23 @@ export class ClaudeAcpAgent implements Agent {
704731
}
705732
case "user":
706733
case "assistant": {
707-
if (this.sessions[params.sessionId].cancelled) {
734+
if (session.cancelled) {
708735
break;
709736
}
710737

738+
// Check for queued prompt replay
739+
if (message.type === "user" && "uuid" in message && message.uuid) {
740+
const pending = session.pendingMessages.get(message.uuid as string);
741+
if (pending) {
742+
pending.resolve(false);
743+
session.pendingMessages.delete(message.uuid as string);
744+
handedOff = true;
745+
// the current loop stops with end_turn,
746+
// the loop of the next prompt continues running
747+
return { stopReason: "end_turn" };
748+
}
749+
}
750+
711751
// Slash commands like /compact can generate invalid output... doesn't match
712752
// their own docs: https://docs.anthropic.com/en/docs/claude-code/sdk/sdk-slash-commands#%2Fcompact-compact-conversation-history
713753
if (
@@ -793,14 +833,35 @@ export class ClaudeAcpAgent implements Agent {
793833
}
794834
}
795835
throw new Error("Session did not end in result");
836+
} finally {
837+
if (!handedOff) {
838+
session.promptRunning = false;
839+
// This usually should not happen, but in case the loop finishes
840+
// without claude sending all message replays, we resolve the
841+
// next pending prompt call to ensure no prompts get stuck.
842+
if (session.pendingMessages.size > 0) {
843+
const next = [...session.pendingMessages.entries()]
844+
.sort((a, b) => a[1].order - b[1].order)[0];
845+
if (next) {
846+
next[1].resolve(false);
847+
session.pendingMessages.delete(next[0]);
848+
}
849+
}
850+
}
851+
}
796852
}
797853

798854
async cancel(params: CancelNotification): Promise<void> {
799-
if (!this.sessions[params.sessionId]) {
855+
const session = this.sessions[params.sessionId];
856+
if (!session) {
800857
throw new Error("Session not found");
801858
}
802-
this.sessions[params.sessionId].cancelled = true;
803-
await this.sessions[params.sessionId].query.interrupt();
859+
session.cancelled = true;
860+
for (const [, pending] of session.pendingMessages) {
861+
pending.resolve(true);
862+
}
863+
session.pendingMessages.clear();
864+
await session.query.interrupt();
804865
}
805866

806867
async unstable_setSessionModel(
@@ -1246,6 +1307,10 @@ export class ClaudeAcpAgent implements Agent {
12461307
// here works to find zed's managed node version.
12471308
executable: process.execPath as any,
12481309
executableArgs: isStaticBinary() ? ["--cli"] : undefined,
1310+
extraArgs: {
1311+
...userProvidedOptions?.extraArgs,
1312+
"replay-user-messages": "",
1313+
},
12491314
...(process.env.CLAUDE_CODE_EXECUTABLE
12501315
? { pathToClaudeCodeExecutable: process.env.CLAUDE_CODE_EXECUTABLE }
12511316
: isStaticBinary()
@@ -1305,6 +1370,9 @@ export class ClaudeAcpAgent implements Agent {
13051370
permissionMode,
13061371
settingsManager,
13071372
configOptions: [],
1373+
promptRunning: false,
1374+
pendingMessages: new Map(),
1375+
nextPendingOrder: 0,
13081376
};
13091377

13101378
const initializationResult = await q.initializationResult();

0 commit comments

Comments
 (0)