-
Notifications
You must be signed in to change notification settings - Fork 28
fix: conversational events - client side tool event responses should be queued #1760
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
53303a4
bd51afa
09e4444
37ff62c
fd04b7e
b45bf33
eac2553
f896222
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -4,6 +4,7 @@ | |
| import json | ||
| import logging | ||
| import os | ||
| from collections import deque | ||
| from typing import Any | ||
| from urllib.parse import urlparse | ||
|
|
||
|
|
@@ -25,6 +26,14 @@ | |
|
|
||
| logger = logging.getLogger(__name__) | ||
|
|
||
| # Type for tool call resume values (confirmToolCall or endToolCall payloads) | ||
| ToolResumeValue = ( | ||
| UiPathConversationToolCallConfirmationEvent | UiPathConversationToolCallEndEvent | ||
| ) | ||
|
|
||
| # Wrapper that pairs a resume value with its tool_call_id for keyed matching | ||
| ToolResumeItem = dict[str, Any] # {"tool_call_id": str, "value": ToolResumeValue} | ||
|
|
||
|
|
||
| class CASErrorId: | ||
| """Error IDs for the Conversational Agent Service (CAS), matching the Temporal backend.""" | ||
|
|
@@ -129,12 +138,29 @@ def __init__( | |
| self._client: Any | None = None | ||
| self._connected_event = asyncio.Event() | ||
|
|
||
| self._tool_resume_event = asyncio.Event() | ||
| self._tool_resume_value: ( | ||
| UiPathConversationToolCallConfirmationEvent | ||
| | UiPathConversationToolCallEndEvent | ||
| | None | ||
| ) = None | ||
| # --- Tool call resume state --- | ||
| # When the LLM invokes multiple tools in one turn, the client can send | ||
| # back confirmToolCall / endToolCall responses concurrently and in any | ||
| # order. Three data structures coordinate matching each response to the | ||
| # correct wait_for_resume() call: | ||
| # | ||
| # 1. _expected_tool_call_ids (deque): | ||
| # Ordered queue of tool_call_ids populated by emit_interrupt_event() | ||
| # (called by the runtime BEFORE each wait_for_resume()). Tells | ||
| # wait_for_resume() WHICH tool_call_id it should consume next. | ||
| # | ||
| # 2. _tool_resume_results (dict): | ||
| # Responses that arrived BEFORE wait_for_resume() was called for that | ||
| # tool_call_id. When wait_for_resume() runs, it checks here first | ||
| # and returns immediately if a match exists — no blocking needed. | ||
| # | ||
| # 3. _tool_resume_pending (dict of Futures): | ||
| # Created by wait_for_resume() when the response hasn't arrived yet. | ||
| # When the response later arrives in _handle_conversation_event, the | ||
| # Future is resolved and wait_for_resume() unblocks. | ||
| self._tool_resume_results: dict[str, ToolResumeItem] = {} | ||
| self._tool_resume_pending: dict[str, asyncio.Future[ToolResumeItem]] = {} | ||
| self._expected_tool_call_ids: deque[str] = deque() | ||
| self._current_message_id: str | None = None | ||
|
|
||
| # Set CAS_WEBSOCKET_DISABLED when using the debugger to prevent websocket errors from | ||
|
|
@@ -376,14 +402,19 @@ async def emit_exchange_error_event(self, error: Exception) -> None: | |
| raise RuntimeError(f"Failed to send exchange error event: {e}") from e | ||
|
|
||
| async def emit_interrupt_event(self, resume_trigger: UiPathResumeTrigger): | ||
| """No-op. | ||
| """Register the trigger's tool_call_id for the upcoming wait_for_resume(). | ||
|
|
||
| Tool confirmation is handled end-to-end via ``startToolCall`` with | ||
| ``requireConfirmation: true`` paired with ``wait_for_resume()``. | ||
| executingToolCall is emitted by the MessageMapper (non-confirmed | ||
| tools) and the runtime loop post-confirmation (confirmed tools). | ||
| Does not emit any websocket event — tool confirmation and execution | ||
| events are handled elsewhere. The runtime calls this immediately | ||
| before wait_for_resume() for each trigger, so we record the | ||
| tool_call_id here so wait_for_resume() knows which response to match. | ||
| """ | ||
| return None | ||
| if resume_trigger.api_resume and isinstance( | ||
| resume_trigger.api_resume.request, dict | ||
| ): | ||
| tool_call_id = resume_trigger.api_resume.request.get("tool_call_id") | ||
| if isinstance(tool_call_id, str) and tool_call_id: | ||
| self._expected_tool_call_ids.append(tool_call_id) | ||
|
|
||
| async def emit_executing_tool_call_event( | ||
| self, | ||
|
|
@@ -410,21 +441,77 @@ async def emit_executing_tool_call_event( | |
| await self.emit_message_event(executing_event) | ||
|
|
||
| async def wait_for_resume(self) -> dict[str, Any]: | ||
| """Wait for a tool resume event (confirmToolCall or endToolCall) to be received.""" | ||
| if self._tool_resume_value is None: | ||
| self._tool_resume_event.clear() | ||
| await self._tool_resume_event.wait() | ||
| """Wait for a tool resume event (confirmToolCall or endToolCall). | ||
|
|
||
| Pops the next expected tool_call_id (registered by emit_interrupt_event) | ||
| and returns the matching response. Two cases: | ||
|
|
||
| 1. Response already arrived (stored in _tool_resume_results) — return | ||
| immediately without blocking. | ||
| 2. Response hasn't arrived yet — create a Future in _tool_resume_pending, | ||
| block until _handle_conversation_event resolves it. | ||
|
|
||
| Returns: | ||
| The resume data dict, including ``tool_call_id``. | ||
| """ | ||
|
Comment on lines
+444
to
+456
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Nice! Thanks for the detailed flow + demos. Two questions:
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Good questions.
for trigger in api_triggers:
emit_interrupt_event(trigger) # "I'm about to wait for toolcall-3"
resume_data = wait_for_resume() # "give me toolcall-3's result"
resume_map[trigger.interrupt_id] = resume_data
Each call must return the result for that specific trigger. Without the deque, One option was to pass
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Makes sense, thanks! |
||
| if not self._expected_tool_call_ids: | ||
| raise RuntimeError( | ||
| "wait_for_resume() called but no tool_call_id was registered " | ||
| "by emit_interrupt_event(). This indicates a caller/protocol mismatch." | ||
| ) | ||
|
|
||
| expected_id = self._expected_tool_call_ids.popleft() | ||
|
|
||
| value = self._tool_resume_value | ||
| self._tool_resume_value = None | ||
| self._tool_resume_event.clear() | ||
| if expected_id in self._tool_resume_results: | ||
| # Response arrived before we got here — return it immediately | ||
| item = self._tool_resume_results.pop(expected_id) | ||
| else: | ||
| # Response hasn't arrived yet — wait for it | ||
| future: asyncio.Future[ToolResumeItem] = ( | ||
| asyncio.get_running_loop().create_future() | ||
| ) | ||
| self._tool_resume_pending[expected_id] = future | ||
| item = await future | ||
|
norman-le marked this conversation as resolved.
|
||
|
|
||
|
norman-le marked this conversation as resolved.
|
||
| value = item["value"] | ||
| result = value.model_dump(mode="python", by_alias=False) | ||
| result["tool_call_id"] = item["tool_call_id"] | ||
| return result | ||
|
|
||
| def _resolve_or_store_resume( | ||
| self, tool_call_id: str, value: ToolResumeValue | ||
| ) -> None: | ||
| """Route an incoming confirmToolCall/endToolCall to the correct consumer. | ||
|
|
||
| """For the case where there's no tool confirmation and the client side tool sends endToolCall back before wait_for_resume is called. | ||
| Unlikely in practice, but possible in theory, since executingToolCall is emitted during the streaming. | ||
| Called from _handle_conversation_event when a tool resume response | ||
| arrives from the client. Two cases: | ||
|
|
||
| 1. wait_for_resume() is already waiting (Future in _tool_resume_pending) | ||
| — resolve the Future so it unblocks immediately. | ||
| 2. wait_for_resume() hasn't been called yet for this tool_call_id | ||
| — store in _tool_resume_results so it's found instantly when | ||
| wait_for_resume() runs later. | ||
| """ | ||
| if value: | ||
| return value.model_dump(mode="python", by_alias=False) | ||
| return {} | ||
| item: ToolResumeItem = {"tool_call_id": tool_call_id, "value": value} | ||
| if tool_call_id in self._tool_resume_pending: | ||
| future = self._tool_resume_pending.pop(tool_call_id) | ||
| if not future.done(): | ||
| future.set_result(item) | ||
| else: | ||
| # Future was cancelled or already resolved — store the payload | ||
| # so a subsequent wait_for_resume() can still find it. | ||
| logger.warning( | ||
| f"Resume for tool_call_id={tool_call_id} — " | ||
| "future already done, storing as fallback." | ||
| ) | ||
| self._tool_resume_results[tool_call_id] = item | ||
| else: | ||
| if tool_call_id in self._tool_resume_results: | ||
| logger.warning( | ||
| f"Duplicate resume for tool_call_id={tool_call_id} — " | ||
| "overwriting previously stored result." | ||
| ) | ||
| self._tool_resume_results[tool_call_id] = item | ||
|
|
||
| @property | ||
| def is_connected(self) -> bool: | ||
|
|
@@ -461,11 +548,9 @@ async def _handle_conversation_event( | |
| and (tool_call := parsed_event.exchange.message.tool_call) | ||
| ): | ||
| if confirm := tool_call.confirm: | ||
| self._tool_resume_value = confirm | ||
| self._tool_resume_event.set() | ||
| self._resolve_or_store_resume(tool_call.tool_call_id, confirm) | ||
| elif end := tool_call.end: | ||
| self._tool_resume_value = end | ||
| self._tool_resume_event.set() | ||
| self._resolve_or_store_resume(tool_call.tool_call_id, end) | ||
| except Exception as e: | ||
| logger.warning(f"Error parsing conversation event: {e}") | ||
|
|
||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.