simulation glue code #5688
- Add RemoteSession client class for controlling agents via the lk.agent.session byte stream protocol (run, wait_for_ready, get_chat_history, get_agent_info, get_session_state)
- Remove text_input_cb from SessionHost RunInput handler; always use session.run() to properly collect response items
- Simulator mode: detect lk.simulator attribute, disable audio I/O
- Propagate errors to RunResult

- TcpSessionTransport for local development via TCP socket
- SessionHost integration for console mode with TCP audio I/O
- Fix IPC: use AgentDevMessage, serialize job as proto bytes
- Fix TCP event loop mismatch: defer connection to start()

- Replace rich CLI with minimal argparse-based wrapper
- Add --dev flag for development mode
- Add --log-format flag (json/colored)
- Read LIVEKIT_AGENT_NAME from environment

- Drop tracebacks from expected warnings
- Log room name instead of room SID in job metadata
- Decouple log format from dev mode flag
  try:
-     await server.run(devmode=args.devmode, unregistered=jupyter)
+     await server.run(devmode=devmode, unregistered=False)
🔴 Jupyter integration broken: _run_worker lost jupyter=True special handling
The old _run_worker accepted a jupyter: bool = False parameter. When jupyter=True (used by jupyter.py), it would: (1) skip installing signal handlers, (2) pass unregistered=True to server.run(), (3) skip draining on shutdown, (4) only do loop.close() for cleanup. The new _run_worker removed the jupyter parameter entirely. Now jupyter.py at line 131 calls cli._run_worker(server, args) without any Jupyter accommodation:
- Signal handlers are installed that raise _ExitCli on SIGINT/SIGTERM, overriding Jupyter's own handlers. A second Ctrl+C calls os._exit(1), killing the kernel.
- unregistered=False is hardcoded, so the worker tries WebSocket registration instead of just simulating a job.
- args.dev defaults to False (the old code passed devmode=True), so drain() is called on shutdown, potentially hanging the notebook.
- Full aggressive cleanup (cancel all tasks, shutdown asyncgens, shutdown default executor) runs instead of just loop.close().
Additionally, the old CliArgs had devmode=True for Jupyter but the new CliArgs field is dev (defaulting to False), so the Jupyter path no longer gets dev-mode behavior.
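The four Jupyter accommodations listed in this comment can be summarized as a small decision table. This is a minimal sketch, assuming those four differences are the complete set; RunPlan and plan_run are hypothetical names for illustration and are not part of livekit-agents.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class RunPlan:
    """Hypothetical summary of what a worker run does; not a livekit-agents type."""

    install_signal_handlers: bool
    unregistered: bool
    drain_on_shutdown: bool
    full_cleanup: bool


def plan_run(*, jupyter: bool = False) -> RunPlan:
    """Mirror the old jupyter=True accommodations listed in the review comment."""
    return RunPlan(
        install_signal_handlers=not jupyter,  # leave Jupyter's own handlers alone
        unregistered=jupyter,  # simulate a job instead of registering over WebSocket
        drain_on_shutdown=not jupyter,  # draining can hang a notebook cell
        full_cleanup=not jupyter,  # in Jupyter, only loop.close() ran
    )


notebook_plan = plan_run(jupyter=True)
cli_plan = plan_run()
```

Restoring a jupyter keyword on _run_worker, or giving jupyter.py its own runner, would both amount to selecting the first plan for notebooks.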
self._text_input_cb: TextInputCallback | None = None
self._text_stream_handler_registered = False

self._text_input_cb: TextInputCallback | None = None
self._chat_handler_registered = False
🟡 Duplicate register_text_input method definition causes dead code and duplicate handler registration
The RoomIO class defines register_text_input twice: at line 96 and again at line 349. Python uses the second definition, making the first one dead code. The first definition (line 96) registers _on_chat_text_stream with _chat_handler_registered flag, while the second (line 349) registers _on_user_text_input with _text_stream_handler_registered flag. Similarly, _text_input_cb is declared twice (lines 90 and 93). The stale first method body, _chat_handler_registered flag, and its cleanup in aclose() (lines 216-221) are all dead code. This also means _on_chat_text_stream (line 534) is never registered — if it was intended to serve as the handler, it silently goes unused.
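The shadowing described here is ordinary Python class-body semantics: the second def rebinds the name, and the first function object is discarded. A minimal sketch, using a hypothetical RoomIOSketch class rather than the real RoomIO:

```python
class RoomIOSketch:
    """Hypothetical stand-in for RoomIO; only the shadowing behavior is the point."""

    def __init__(self) -> None:
        self.registered: list[str] = []

    def register_text_input(self) -> None:
        # First definition: replaced below, so this body never runs.
        self.registered.append("_on_chat_text_stream")

    def register_text_input(self) -> None:  # noqa: F811 (intentional redefinition)
        # Second definition: this is the one bound on the class.
        self.registered.append("_on_user_text_input")


io = RoomIOSketch()
io.register_text_input()
print(io.registered)  # ['_on_user_text_input']
```

Linters report this as a redefinition (F811 in pyflakes/ruff), which is one way such a merge leftover can be caught before review.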
…t dev log format
- Log user input and agent responses at DEBUG level in agent_session
- Add ChatContext.to_proto() for serializing to proto ChatContext
- Dev mode logs: time-only (no date), no dash separators
if tts_task and tts_task.done() and not tts_task.cancelled() and (exc := tts_task.exception()):
    speech_handle._mark_done(error=exc)
    return
🔴 Unreachable _mark_done(error=exc) for tts_task due to earlier raise exc in for loop
In _do_play_speech, the for loop at lines 2293-2297 iterates over (tts_task, forward_audio_task, forward_text_task) and raises the first exception it finds (raise exc). The tts_task-specific check at lines 2299-2301 that calls speech_handle._mark_done(error=exc) is therefore unreachable when tts_task has an exception — the for loop already raised it. The intent of the new code was to gracefully propagate TTS errors through the speech handle (via _mark_done) instead of raising, but the pre-existing for loop short-circuits that path. The same pattern works correctly in _generate_llm_reply_and_play_speech (lines 2641-2647) because there is no preceding for-loop raising the exception.
Prompt for agents
In _do_play_speech, lines 2293-2297 contain a for loop that raises exceptions from tts_task/forward_audio_task/forward_text_task. This pre-empts the new graceful error propagation via _mark_done at lines 2299-2301. The fix should either: (a) remove or refactor the for loop at 2293-2297 to use _mark_done instead of raise, similar to how _generate_llm_reply_and_play_speech handles it at lines 2641-2647; or (b) remove the dead code at 2299-2301. The intent appears to be (a), since the parallel method _generate_llm_reply_and_play_speech uses the _mark_done pattern without a preceding for-loop raise.
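The ordering problem, and option (a) from the prompt above, can be reproduced in isolation. This is a sketch under stated assumptions: SpeechHandleSketch, failing_tts, forward_audio, task_error, and play are hypothetical names, and the real _do_play_speech does considerably more work.

```python
from __future__ import annotations

import asyncio


class SpeechHandleSketch:
    """Hypothetical stand-in for the speech handle; only records the error."""

    def __init__(self) -> None:
        self.error: BaseException | None = None

    def _mark_done(self, error: BaseException | None = None) -> None:
        self.error = error


async def failing_tts() -> None:
    raise RuntimeError("tts failed")


async def forward_audio() -> None:
    return None


def task_error(task: asyncio.Task) -> BaseException | None:
    """Return the task's exception if it finished with one, else None."""
    if task.done() and not task.cancelled():
        return task.exception()
    return None


async def play(handle: SpeechHandleSketch, *, raise_first: bool) -> None:
    tts_task = asyncio.create_task(failing_tts())
    audio_task = asyncio.create_task(forward_audio())
    await asyncio.wait([tts_task, audio_task])

    if raise_first:
        # Reported ordering: the loop raises before the handle is touched.
        for task in (tts_task, audio_task):
            if (exc := task_error(task)) is not None:
                raise exc

    if (exc := task_error(tts_task)) is not None:
        handle._mark_done(error=exc)  # unreachable for TTS errors when raise_first
        return

    # Option (a): the remaining tasks still raise, but only after the TTS check.
    for task in (audio_task,):
        if (exc := task_error(task)) is not None:
            raise exc


async def main() -> tuple[bool, BaseException | None, BaseException | None]:
    reported, reordered = SpeechHandleSketch(), SpeechHandleSketch()
    try:
        await play(reported, raise_first=True)
        raised = False
    except RuntimeError:
        raised = True
    await play(reordered, raise_first=False)
    return raised, reported.error, reordered.error


raised, reported_error, reordered_error = asyncio.run(main())
```

With the reported ordering the exception escapes and the handle never sees it; with the check moved ahead of the loop, the handle records the error and nothing is raised.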
if c._tcp_transport is not None:
    self._session_host = SessionHost(
        c._tcp_transport,
        audio_input=c._tcp_audio_input,
        audio_output=c._tcp_audio_output,
    )
    self._session_host.register_session(self)
🔴 TCP console SessionHost never started — transport connection never established
In the TCP console code path, a TcpSessionTransport is created in _run_tcp_console (cli/cli.py:215) but start() is never called on it. When AgentSession.start() creates a SessionHost using this transport (agent_session.py:692-698), neither SessionHost.start() nor transport.start() is invoked. Because TcpSessionTransport.start() is what opens the TCP connection (setting _reader and _writer), both remain None. This means send_message silently no-ops (remote_session.py:177: if self._closed or self._writer is None: return), and the _recv_loop never runs. As a result, the SessionHost cannot receive any messages — including audio input frames dispatched via _dispatch_transport_message to TcpAudioInput.push_frame() — making the entire TCP console mode non-functional.
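The failure mode is easy to miss because nothing raises: sends are dropped until start() has run. A minimal sketch follows, with an in-memory list standing in for the socket writer. TransportSketch and SessionHostSketch are hypothetical, and the assumption that the host's start() should forward to the transport's start() is mine, not confirmed by the source.

```python
from __future__ import annotations

import asyncio


class TransportSketch:
    """Hypothetical transport; a list stands in for the TCP stream writer."""

    def __init__(self) -> None:
        self._writer: list[bytes] | None = None
        self._closed = False

    async def start(self) -> None:
        # The real transport would open the TCP connection here.
        self._writer = []

    async def send_message(self, data: bytes) -> bool:
        if self._closed or self._writer is None:
            # The source returns silently; a bool makes the drop visible here.
            return False
        self._writer.append(data)
        return True


class SessionHostSketch:
    """Hypothetical host; assumes start() is responsible for starting the transport."""

    def __init__(self, transport: TransportSketch) -> None:
        self._transport = transport

    async def start(self) -> None:
        await self._transport.start()


async def demo() -> tuple[bool, bool]:
    transport = TransportSketch()
    host = SessionHostSketch(transport)
    sent_before_start = await transport.send_message(b"hello")  # dropped
    await host.start()
    sent_after_start = await transport.send_message(b"hello")  # delivered
    return sent_before_start, sent_after_start


sent_before_start, sent_after_start = asyncio.run(demo())
```

Logging or raising on a send before start(), rather than returning quietly, would have surfaced this during local testing.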
- simulation.py: proto-backed Scenario/ScenarioGroup, load_scenarios, SimulationContext
- rtc_session on_simulation_end; JobContext.simulation_context() from dispatch metadata
- thread simulation_end_fnc through ipc; SessionHost SimulationFinalize handler
- hotel example scenarios.yaml + on_simulation_end (db vs target_state)
# Conflicts:
#	livekit-agents/livekit/agents/cli/cli.py
#	livekit-agents/livekit/agents/voice/__init__.py
#	livekit-agents/livekit/agents/voice/agent_activity.py
#	livekit-agents/livekit/agents/voice/audio_recognition.py
#	livekit-agents/livekit/agents/voice/remote_session.py
#	livekit-agents/livekit/agents/worker.py
#	livekit-agents/pyproject.toml
#	uv.lock
- cli.run_app delegates to the deprecated rich CLI (cli/_legacy.py)
- new thin start/console(TCP) interface lives in livekit.agents.__main__
- restore worker.py unregistered/agent-name/aclose handling the thin CLI needs
- keep rich-CLI deps (typer/sounddevice/watchfiles) required until it's removed

- room_io: remove duplicate register_text_input left by an earlier main merge (keep main's _on_chat_text_stream path; the branch dup was shadowing it)
- type-check now green: cast tcp_console __anext__ return, drop unused ignores
- tcp_console: raise StopAsyncIteration from None (B904)
- test_ipc: pass simulation_end_fnc to ProcPool/ProcJobExecutor (branch arg)
- add unit category markers to branch-only test modules (unblocks --unit collection)
- test_remote_session: run_input -> run (method was renamed)

- test_remote_session: fix mock AgentSessionOptions (preemptive_generation is dict-like, not bool) and give FakeRunResult an event so handlers succeed
- test_run_input_errors: wrap conn_options in SessionConnectOptions(llm_conn_options=...) and call RemoteSession.run (renamed from run_input)
- xfail the two direct session.run() error-propagation tests: LLM errors aren't surfaced to RunResult yet (the e2e SessionHost path does propagate them)
When the LLM task fails (and was not interrupted/cancelled), record the exception on the SpeechHandle (_maybe_run_final_output) so RunResult raises it on await, instead of swallowing it in the speech-generation path. Also retrieves the task exception (no 'never retrieved' warning). Removes the xfail on the two error-propagation tests and asserts APIStatusError.
🚩 Removal of _disable_vad_interruption_soon simplifies backchannel boundary handling
The old _disable_vad_interruption_soon method delayed disabling VAD interruption until the backchannel boundary expired, allowing brief utterances (like 'uh-huh') to not interrupt the agent. The new code immediately sets self._interruption_by_audio_activity_enabled = False without any backchannel boundary check. This changes the interruption behavior: previously, users could produce short backchannel utterances during the boundary window without triggering interruption through VAD. Now, VAD-based interruption is always disabled immediately when the agent starts speaking. The adaptive interruption detector (_interruption_detection) still handles the intelligent interruption decisions, so this may be intentional if backchannel handling is now delegated entirely to the adaptive detector.
(Refers to line 2241)
🚩 Duplicate tts_task error check is unreachable dead code
In _do_play_speech, lines 2294-2297 iterate over (tts_task, forward_audio_task, forward_text_task) and raise exc if any has an exception. Lines 2299-2301 then check tts_task again with speech_handle._mark_done(error=exc) + return. If tts_task has an exception, the for-loop raises it first (line 2297), so lines 2299-2301 are unreachable for that case. The two blocks also have different semantics (raise vs _mark_done+return). The intent appears to be that the _mark_done pattern is the new error-propagation approach for this PR, but the pre-existing raise in the loop fires first, making the new code dead. This same pattern appears in the _do_generate_reply method at lines 2641-2647 where the new _mark_done checks come after wait_if_not_interrupted but before the for-loop raise check, so they ARE reachable there.
(Refers to lines 2299-2301)
🚩 VAD/STT disabled when audio input is not enabled, even if models are configured
At agent_activity.py:789-793, the new code passes stt=None when not self._session.input.audio_enabled and vad=None when audio is not enabled. Previously, STT and VAD were passed based solely on whether models were configured. This means if a session starts with audio disabled but later enables it, the AudioRecognition was initialized without STT/VAD and would need to be updated. The update_stt and update_vad methods exist for this purpose, but it's unclear if the audio-enabled callback path triggers them. This could be intentional for the simulator/TCP console modes that don't use audio.
(Refers to lines 789-793)
      log_level="DEBUG", url=ws_url, api_key=api_key, api_secret=api_secret
  )
- cli._run_worker(server, args, jupyter=True)
+ cli._run_worker(server, args)
🚩 Jupyter path no longer has special cleanup handling
The old _run_worker accepted a jupyter=True parameter that provided Jupyter-specific behavior: (1) it didn't install signal handlers (if not jupyter), (2) it called loop.close() separately since it must be called from the main thread, and (3) it used _run_worker(server, args, jupyter=True). The new _run_worker at livekit-agents/livekit/agents/cli/cli.py:281 removes all Jupyter-specific handling. The call at livekit-agents/livekit/agents/jupyter.py:133 is now cli._run_worker(server, args) without jupyter=True. This means: (a) signal handlers are now installed in Jupyter (which may cause issues), (b) on second Ctrl+C, os._exit(1) is called instead of graceful cleanup, (c) the final cleanup phase cancels all tasks and calls loop.close() which previously was skipped in Jupyter. This behavioral change may cause issues in Jupyter environments where signal handling works differently.
  items_list: list[agent_pb.ChatContext.ChatItem] = []
  error: str | None = None
  text = req.run_input.text
- if text:
-     if self._text_input_cb is not None:
-         from .room_io.types import TextInputEvent
-
-         cb_result = self._text_input_cb(
-             self._session,
-             TextInputEvent(text=text, info=None, participant=None),
-         )
-         if asyncio.iscoroutine(cb_result):
-             await cb_result
-     else:
-         try:
-             await self._session.interrupt(force=True)
-         except RuntimeError:
-             pass
-
-         try:
-             result: RunResult[None] = self._session.run(user_input=text)
-             await result
-             items_list = [_chat_item_to_proto(ev.item) for ev in result.events]
-         except Exception as e:
-             error = str(e)
+ if not text:
+     error = "empty run_input text"
+ else:
+     try:
+         await self._session.interrupt(force=True)
+     except RuntimeError:
+         pass
+
+     try:
+         result: RunResult[None] = self._session.run(user_input=text)
+         await result
+         items_list = [_chat_item_to_proto(ev.item) for ev in result.events]
+     except Exception as e:
+         error = str(e)
+
+ if not items_list and not error:
+     error = "agent produced no response items"
🚩 SessionHost run_input no longer delegates to text_input_cb
The old run_input handler in remote_session.py checked for self._text_input_cb (registered by RoomIO) and delegated text input handling to it. The new code removes _text_input_cb entirely and always calls self._session.interrupt(force=True) followed by self._session.run(user_input=text) directly. The register_text_input method was also removed from SessionHost. This simplifies the code path but changes behavior: previously, custom text input callbacks (set via RoomIO.register_text_input) would be invoked for remote text input; now they're bypassed for the SessionHost path. This only affects the RoomSessionTransport path (not TCP console), since the TCP console path creates the SessionHost separately from RoomIO.
(Refers to lines 638-658)
self._text_input_cb: TextInputCallback | None = None
self._chat_handler_registered = False
🚩 Duplicate register_text_input method in RoomIO creates dead code
Two register_text_input methods are defined in RoomIO: the first at line 96 registers _on_chat_text_stream with _chat_handler_registered, and the second at line 349 registers _on_user_text_input with _text_stream_handler_registered. Python uses the last definition, so the first method is dead code. The _text_input_cb attribute is also initialized twice at lines 90 and 93 in __init__. The aclose() method at lines 216-228 tries to clean up both _chat_handler_registered and _text_stream_handler_registered, but _chat_handler_registered is always False since the shadowed method is never called. This doesn't cause a runtime error but the first method definition and associated _on_chat_text_stream handler should be removed.
…ormat
The merged simulation feature imports livekit.protocol.agent_simulation (and FinalizeSimulation/SimulationVerdict messages), which only exist in livekit-protocol 1.1.14. CI installed 1.1.13, so conftest's import failed and cascaded to every unit/plugin/example/type-check job.
- bump livekit-protocol pin to >=1.1.14 and re-lock
- ruff: sort imports + format (simulation files, tests, examples)
aclose is untyped in the released livekit-api wheel CI installs (no-untyped-call), so the type: ignore is required. It only looked unused locally because the dev build links the typed livekit-api source.
    recoverable=True,
)

# response errors are handled by _handle_response_done via _done_fut.
🚩 Realtime model: removal of error→response_created_futures propagation
The old _handle_error at realtime_model.py:2103-2108 would pop a pending _response_created_futures entry matching the error's event_id and set an exception on it. This handled the case where OpenAI sends an error event instead of (or before) response.created. The new code removes this entirely, with a comment that response errors are handled by _handle_response_done via _done_fut. This assumes that response.done with a failed/incomplete status always fires for response-related errors. If OpenAI ever sends an error event WITHOUT a subsequent response.done, the _response_created_futures future would never resolve, potentially causing a hang in generate(). This is likely safe given OpenAI's current API behavior but represents a reduced safety net.
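The difference between the two behaviors shows up as how long the caller waits. A minimal sketch, assuming a pending future keyed by event_id; RealtimeSessionSketch and wait_for_created are hypothetical names, RuntimeError stands in for llm.RealtimeError, and the timeout is shortened from the real value so the example runs quickly.

```python
from __future__ import annotations

import asyncio


class RealtimeSessionSketch:
    """Hypothetical session tracking response.create requests by event id."""

    def __init__(self) -> None:
        self._response_created_futures: dict[str, asyncio.Future[None]] = {}

    def create_response(self, event_id: str) -> asyncio.Future[None]:
        fut: asyncio.Future[None] = asyncio.get_running_loop().create_future()
        self._response_created_futures[event_id] = fut
        return fut

    def handle_error(self, event_id: str | None, message: str, *, reject_pending: bool) -> None:
        if not reject_pending or not event_id:
            return  # new behavior: the pending future is left untouched
        fut = self._response_created_futures.pop(event_id, None)
        if fut is not None and not fut.done():
            fut.set_exception(RuntimeError(message))  # old behavior: fail fast


async def wait_for_created(*, reject_pending: bool) -> str:
    session = RealtimeSessionSketch()
    fut = session.create_response("evt_1")
    # The server answers with an error event instead of response.created.
    session.handle_error("evt_1", "invalid request", reject_pending=reject_pending)
    try:
        await asyncio.wait_for(fut, timeout=0.05)
    except asyncio.TimeoutError:
        return "timeout"
    except RuntimeError as e:
        return f"error: {e}"
    return "created"


without_rejection = asyncio.run(wait_for_created(reject_pending=False))
with_rejection = asyncio.run(wait_for_created(reject_pending=True))
```

Without the rejection the caller learns nothing until the timeout elapses; with it, the original error message reaches the caller immediately.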
if not items_list and not error:
    error = "agent produced no response items"
🚩 run_input handler now returns error for empty response instead of silently succeeding
The SessionHost's run_input handler at remote_session.py:656-657 adds a check if not items_list and not error: error = 'agent produced no response items'. This means that a successful run that produces no conversation items (e.g., the agent decided not to respond) will now be treated as an error by the remote session caller. This might be too strict — there are legitimate scenarios where an agent produces no response items (e.g., silent tool execution, or the agent intentionally choosing not to reply).
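If the stricter behavior turns out to be unwanted, one alternative is to let an empty item list stand as a successful result and reserve the error field for actual exceptions. The sketch below is hypothetical: RunOutcome and build_outcome are illustrative names, not part of the SessionHost protocol.

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass
class RunOutcome:
    """Hypothetical reply payload: response items plus an optional error string."""

    items: list[str] = field(default_factory=list)
    error: str | None = None


def build_outcome(items: list[str], exc: Exception | None = None) -> RunOutcome:
    """Report an error only when the run raised; an empty reply is still a success."""
    if exc is not None:
        return RunOutcome(error=str(exc))
    return RunOutcome(items=list(items))


silent = build_outcome([])
failed = build_outcome([], ValueError("llm unavailable"))
```

A caller that needs a reply can still treat an empty items list as a failure on its own side, without the host deciding that for every client.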
# response errors are handled by _handle_response_done via _done_fut.
# error events here are for non-response errors (e.g. invalid request).
🟡 Realtime model: removed _response_created_futures rejection on error events causes 10s hangs
The old _handle_error method resolved _response_created_futures immediately when an error event referenced the originating event_id (e.g., a response.create that triggered an invalid_request_error). The new code removed this logic with the comment "response errors are handled by _handle_response_done via _done_fut." However, _response_created_futures and _done_fut are separate mechanisms: _done_fut lives on _ResponseGeneration which is only created upon response.created. If the error occurs before response.created (e.g., rate limit or invalid tool config), no _ResponseGeneration exists and _handle_response_done never fires. The future in _response_created_futures will hang until its 10-second timeout (realtime_model.py:1588) instead of failing immediately.
- # response errors are handled by _handle_response_done via _done_fut.
- # error events here are for non-response errors (e.g. invalid request).
+ # response errors are handled by _handle_response_done via _done_fut.
+ # error events here are for non-response errors (e.g. invalid request).
+ # If this error references a pending response.create, reject its future
+ # immediately so generate_reply doesn't hang until the 10s timeout.
+ if (event_id := event.error.event_id) and (
+     fut := self._response_created_futures.pop(event_id, None)
+ ):
+     if not fut.done():
+         fut.set_exception(llm.RealtimeError(event.error.message))
…op entrypoint priming (#6018)
if event.response.status in ("failed", "incomplete"):
    details = event.response.status_details
    msg = f"response {event.response.status}"
    if details and details.error:
        msg = f"{msg}: [{details.error.type}] {details.error.code}"
    elif details and details.reason:
        msg = f"{msg}: {details.reason}"
    self._current_generation._done_fut.set_exception(llm.RealtimeError(msg))
else:
    self._current_generation._done_fut.set_result(None)
🚩 Realtime model now raises RealtimeError for 'incomplete' response status
The old _handle_response_done called _done_fut.set_result(None) for ALL response statuses. The new code at realtime_model.py:1994-2003 sets _done_fut.set_exception(llm.RealtimeError(...)) for both failed AND incomplete statuses. In OpenAI's Realtime API, incomplete occurs for normal scenarios like hitting max_output_tokens or content filtering — not just errors. Converting these to exceptions changes the contract for any code awaiting _done_fut. If downstream consumers treat a _done_fut exception as a hard failure, partial responses that previously succeeded silently would now error. The cancelled status (user interruptions) is handled separately and is unaffected. This appears intentional but is a semantic change worth verifying against all consumers of _ResponseGeneration._done_fut.
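What the contract change means for a consumer can be seen with a bare future. This is a sketch only: settle_done_future and await_generation are hypothetical helpers, RuntimeError stands in for llm.RealtimeError, and the status strings are the ones quoted in the diff.

```python
from __future__ import annotations

import asyncio


def settle_done_future(fut: asyncio.Future[None], status: str, reason: str | None = None) -> None:
    """Resolve the future the way the new handler does, using simplified inputs."""
    if status in ("failed", "incomplete"):
        msg = f"response {status}"
        if reason:
            msg = f"{msg}: {reason}"
        fut.set_exception(RuntimeError(msg))
    else:
        fut.set_result(None)


async def await_generation(status: str, reason: str | None = None) -> str:
    fut: asyncio.Future[None] = asyncio.get_running_loop().create_future()
    settle_done_future(fut, status, reason)
    try:
        await fut
    except RuntimeError as e:
        # A consumer that wants to keep partial output has to catch this now.
        return str(e)
    return "ok"


completed = asyncio.run(await_generation("completed"))
truncated = asyncio.run(await_generation("incomplete", "max_output_tokens"))
```

Any code path that awaits the future without a try/except will now treat a token-limit truncation the same way it treats a failed response.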
…unused run_id/job_id + yaml helpers
…call_soon_threadsafe
…beats LIVEKIT_AGENT_NAME default (#6022)
No description provided.