Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion packages/uipath-platform/pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[project]
name = "uipath-platform"
version = "0.1.64"
version = "0.1.65"
description = "HTTP client library for programmatic access to UiPath Platform"
readme = { file = "README.md", content-type = "text/markdown" }
requires-python = ">=3.11"
Expand Down
41 changes: 41 additions & 0 deletions packages/uipath-platform/src/uipath/platform/mcp_jobs/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
"""Framework-neutral client core for long-running UiPath jobs over MCP.

This package owns the ``uipath.com/job`` ``_meta`` contract and the executor
abstraction used to suspend/await a UiPath job that an MCP server started behind a
``tools/call``. It never imports the ``mcp`` SDK or any agent framework, so every
integration (uipath-langchain, future SDKs) can reuse it and supply only its own
:class:`McpJobExecutor`.
"""

from ._executor import (
BlockingJobExecutor,
FetchFn,
JobStatusReader,
McpJobExecutor,
StartFn,
)
from ._handle import JobStart, UiPathJobHandle
from ._meta import (
JOB_META_KEY,
JOB_PROTOCOL_VERSION,
build_fetch_meta,
build_start_meta,
read_job_handle,
read_job_version,
)

__all__ = [
"JOB_META_KEY",
"JOB_PROTOCOL_VERSION",
"BlockingJobExecutor",
"FetchFn",
"JobStart",
"JobStatusReader",
"McpJobExecutor",
"StartFn",
"UiPathJobHandle",
"build_fetch_meta",
"build_start_meta",
"read_job_handle",
"read_job_version",
]
141 changes: 141 additions & 0 deletions packages/uipath-platform/src/uipath/platform/mcp_jobs/_executor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
"""Framework-neutral execution of MCP-backed UiPath jobs.

The START ``tools/call`` and the FETCH re-call are MCP-shaped concerns owned by the
caller (they touch the ``mcp`` SDK). This module models only HOW to await the started
job — by suspending the host (a framework-specific adapter) or by polling (the neutral
:class:`BlockingJobExecutor` default).
"""

from __future__ import annotations

import asyncio
from typing import Any, Awaitable, Callable, Optional, Protocol, runtime_checkable

from ..orchestrator.job import Job, JobState
from ._handle import JobStart, UiPathJobHandle

__all__ = [
"BlockingJobExecutor",
"FetchFn",
"McpJobExecutor",
"StartFn",
]

StartFn = Callable[[], Awaitable[JobStart]]
"""Issues the START ``tools/call`` once and returns its :class:`JobStart` outcome."""

FetchFn = Callable[[UiPathJobHandle], Awaitable[Any]]
"""Re-calls the tool with the FETCH ``_meta`` for a handle; returns the job result."""

_TERMINAL_STATES = frozenset({JobState.SUCCESSFUL.value, JobState.FAULTED.value})


@runtime_checkable
class McpJobExecutor(Protocol):
"""Awaits a job-backed MCP tool call and returns its final output.

An implementation owns the START → await → FETCH lifecycle for one tool call:
it invokes ``start`` (exactly once, inside its durable boundary when it
suspends), waits for the job to finish (by suspending the host or by polling),
then returns ``await fetch(handle)``. Implementations differ only in *how* they
wait, never in the wire contract.
"""

async def run(self, *, start: StartFn, fetch: FetchFn, tool_name: str) -> Any:
"""Run one job-backed tool call to completion.

Args:
start: Issues the START ``tools/call`` once; returns a :class:`JobStart`.
fetch: Re-calls the tool with the FETCH ``_meta`` for a handle.
tool_name: The MCP tool name (for diagnostics/tracing).

Returns:
The tool's final output — the FETCH result for a job-backed call, or the
normal tool result when the call did not start a job.
"""
...


@runtime_checkable
class JobStatusReader(Protocol):
"""Minimal jobs-service shape consumed by :class:`BlockingJobExecutor`."""

async def retrieve_async(
self, job_key: str, *, folder_key: Optional[str] = None
) -> Job:
"""Retrieve the job identified by ``job_key`` in folder ``folder_key``."""
...


class BlockingJobExecutor:
"""Neutral default executor: poll the job to a terminal state, then FETCH.

This executor does **not** suspend the host. It is correct in any environment
(a CLI, an eval harness, a framework without durable interrupts): the child job
stays running while we poll, but the tool always returns the right result. Hosts
that *can* suspend should inject a framework-specific executor instead (for
example a LangGraph one that interrupts on the job and resumes when it finishes).
"""

def __init__(
self,
jobs: Optional[JobStatusReader] = None,
*,
poll_interval: float = 5.0,
timeout: Optional[float] = None,
) -> None:
"""Initialize the executor.

Args:
jobs: A jobs service exposing
``retrieve_async(job_key, *, folder_key)``. Defaults to
``UiPath().jobs`` (constructed lazily) when ``None``.
poll_interval: Seconds to wait between status polls.
timeout: Optional overall timeout in seconds; ``None`` waits
indefinitely.
"""
self._jobs = jobs
self._poll_interval = poll_interval
self._timeout = timeout

def _jobs_service(self) -> JobStatusReader:
if self._jobs is None:
from .._uipath import UiPath

self._jobs = UiPath().jobs
return self._jobs

async def run(self, *, start: StartFn, fetch: FetchFn, tool_name: str) -> Any:

Check warning on line 108 in packages/uipath-platform/src/uipath/platform/mcp_jobs/_executor.py

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Remove the unused function parameter "tool_name".

See more on https://sonarcloud.io/project/issues?id=UiPath_uipath-python&issues=AZ7Jum9Lz0W_bWZeTGAK&open=AZ7Jum9Lz0W_bWZeTGAK&pullRequest=1717
"""Start the job, poll until terminal, then FETCH its result.

Args:
start: Issues the START ``tools/call`` once.
fetch: Re-calls the tool with the FETCH ``_meta`` for a handle.
tool_name: The MCP tool name (for diagnostics/tracing).

Returns:
The FETCH result for a job-backed call, or the normal tool result when
the call did not start a job.
"""
outcome = await start()
if outcome.handle is None:
return outcome.result
await self._wait_until_terminal(outcome.handle)
return await fetch(outcome.handle)

async def _wait_until_terminal(self, handle: UiPathJobHandle) -> None:
jobs = self._jobs_service()
loop = asyncio.get_event_loop()
deadline = None if self._timeout is None else loop.time() + self._timeout
Comment on lines +128 to +129
while True:
job = await jobs.retrieve_async(
handle.job_key, folder_key=handle.folder_key
)
if (job.state or "").lower() in _TERMINAL_STATES:
return
if deadline is not None and loop.time() >= deadline:
raise TimeoutError(
f"Job {handle.job_key} did not reach a terminal state "
f"within {self._timeout}s"
)
await asyncio.sleep(self._poll_interval)
44 changes: 44 additions & 0 deletions packages/uipath-platform/src/uipath/platform/mcp_jobs/_handle.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
"""Framework-neutral value objects for MCP-backed UiPath jobs."""

from __future__ import annotations

from dataclasses import dataclass
from typing import Any, Optional

__all__ = ["JobStart", "UiPathJobHandle"]


@dataclass(frozen=True)
class UiPathJobHandle:
"""Handle to a UiPath job started behind an MCP ``tools/call``.

The server returns this in the START response ``_meta`` (under
``uipath.com/job``). It is used to drive the job to completion (suspend +
resume, or poll) and to FETCH its result with a follow-up ``tools/call``.

Attributes:
job_key: The Orchestrator job key (GUID) — also the resume-trigger
``item_key`` when the host suspends on the job.
folder_key: The key of the folder the job runs in.
"""

job_key: str
folder_key: str


@dataclass(frozen=True)
class JobStart:
"""Outcome of the START ``tools/call``.

Either the server handed back a job handle (the tool is job-backed and the
server started a job) or it returned a normal tool result (a non-job tool, or
no opt-in / an older server).

Attributes:
handle: The job handle when the call started a job, else ``None``.
result: The normalized tool result when ``handle`` is ``None``, else
``None``.
"""

handle: Optional[UiPathJobHandle]
result: Any = None
109 changes: 109 additions & 0 deletions packages/uipath-platform/src/uipath/platform/mcp_jobs/_meta.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
"""The ``uipath.com/job`` ``_meta`` contract (framework- and MCP-SDK-neutral).

All proprietary signaling for long-running UiPath jobs over MCP rides ``_meta``
under the single key ``uipath.com/job``:

* **Advertise** (server → client, ``InitializeResult._meta``): ``{"version": N}``
— present only when the server can back tools with jobs.
* **START** (client → server, request ``params._meta``): ``{"version": N}`` — no
``key`` means "start the job and hand me a handle".
* **Handle** (server → client, result ``_meta``): ``{"key", "folderKey"}`` — the
started job.
* **FETCH** (client → server, request ``params._meta``): ``{"key", "folderKey"}``
— ``key`` present means "return the job's current status/result".

These helpers build and parse that key as plain ``dict`` / ``Mapping`` values, so
they work against any MCP SDK version (the wire ``_meta`` is a plain JSON object)
without importing ``mcp``.
"""

from __future__ import annotations

from typing import Any, Dict, Mapping, Optional

from ._handle import UiPathJobHandle

__all__ = [
"JOB_META_KEY",
"JOB_PROTOCOL_VERSION",
"build_fetch_meta",
"build_start_meta",
"read_job_handle",
"read_job_version",
]

JOB_META_KEY = "uipath.com/job"
"""Reverse-DNS ``_meta`` key under which all job signaling lives."""

JOB_PROTOCOL_VERSION = 1
"""Current ``uipath.com/job`` contract version emitted by this client."""


def build_start_meta(version: int = JOB_PROTOCOL_VERSION) -> Dict[str, Any]:
"""Build the START opt-in ``_meta`` (no ``key`` ⇒ START intent).

Args:
version: The contract version to send on the call.

Returns:
A ``_meta`` mapping to merge into ``CallToolRequest.params._meta``.
"""
return {JOB_META_KEY: {"version": version}}


def build_fetch_meta(handle: UiPathJobHandle) -> Dict[str, Any]:
"""Build the FETCH ``_meta`` for a started job (``key`` present ⇒ FETCH intent).

Args:
handle: The job handle returned by the START response.

Returns:
A ``_meta`` mapping to merge into ``CallToolRequest.params._meta``.
"""
return {JOB_META_KEY: {"key": handle.job_key, "folderKey": handle.folder_key}}


def _job_section(meta: Optional[Mapping[str, Any]]) -> Optional[Mapping[str, Any]]:
"""Return the ``uipath.com/job`` sub-object of a ``_meta`` mapping, if present."""
if not meta:
return None
section = meta.get(JOB_META_KEY)
return section if isinstance(section, Mapping) else None


def read_job_handle(meta: Optional[Mapping[str, Any]]) -> Optional[UiPathJobHandle]:
"""Parse a job handle from a result's ``_meta`` mapping.

Args:
meta: The result ``_meta`` mapping (``result._meta`` / ``result.meta``).

Returns:
A :class:`UiPathJobHandle` when both ``key`` and ``folderKey`` are present
(a START response), else ``None`` (a normal result, or a version-only
opt-in echoed back).
"""
section = _job_section(meta)
if not section:
return None
key = section.get("key")
folder_key = section.get("folderKey")
if isinstance(key, str) and key and isinstance(folder_key, str) and folder_key:
return UiPathJobHandle(job_key=key, folder_key=folder_key)
return None


def read_job_version(meta: Optional[Mapping[str, Any]]) -> Optional[int]:
"""Parse the advertised / opted-in contract version from a ``_meta`` mapping.

Args:
meta: A ``_meta`` mapping (an ``InitializeResult._meta`` advertisement, or
a request opt-in).

Returns:
The integer ``version`` when present, else ``None``.
"""
section = _job_section(meta)
if not section:
return None
version = section.get("version")
return version if isinstance(version, int) else None
Comment on lines +108 to +109
Loading
Loading