Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@

from __future__ import annotations

from typing import Any

from uipath.core.governance import GovernRequest, PolicyContext, PolicyResponse

from ..common._config import UiPathApiConfig
Expand Down Expand Up @@ -79,3 +81,33 @@ def compensate(self, request: GovernRequest) -> None:
async def compensate_async(self, request: GovernRequest) -> None:
"""Async variant of :meth:`compensate`."""
await self._service._compensate_async(request)

# ── Custom telemetry events ──────────────────────────────────────

def track_event(
self,
*,
event_name: str,
data: dict[str, Any] | None = None,
operation_id: str | None = None,
) -> None:
"""Record a custom telemetry event — delegates to ``GovernanceService``.

See :meth:`GovernanceService.track_event` for parameter semantics
— in particular, the ``operation_id`` → trace-id fallback.
"""
self._service.track_event(
event_name=event_name, data=data, operation_id=operation_id
)

async def track_event_async(
self,
*,
event_name: str,
data: dict[str, Any] | None = None,
operation_id: str | None = None,
) -> None:
"""Async variant of :meth:`track_event`."""
await self._service.track_event_async(
event_name=event_name, data=data, operation_id=operation_id
)
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
"""Service for the ``agenticgovernance_`` ingress.

Wraps the two governance backend endpoints UiPath exposes:
Wraps the three governance backend endpoints UiPath exposes:

- ``GET /{org}/agenticgovernance_/api/v1/runtime/policy`` — fetch the
tenant-managed policy pack (see :meth:`GovernanceService.retrieve_policy`).
- ``POST /{org}/agenticgovernance_/api/v1/runtime/govern`` — compensating
governance call fired when a ``guardrail_fallback`` rule matches
(see :meth:`GovernanceService.compensate`).
- ``POST /{org}/agenticgovernance_/api/v1/runtime/log`` — agent-emitted
custom telemetry events forwarded to App Insights
(see :meth:`GovernanceService.track_event`).

Org/tenant scoping is read from :class:`UiPathConfig`; auth, retries,
trace context, and error enrichment come from :class:`BaseService`.
Expand All @@ -22,7 +25,7 @@
PolicyResponse,
)

from ..common._base_service import BaseService
from ..common._base_service import BaseService, resolve_trace_id
from ..common._config import UiPathConfig
from ..common._service_url_overrides import (
inject_routing_headers,
Expand All @@ -35,18 +38,26 @@
GOVERNANCE_SERVICE_PREFIX = "agenticgovernance_"
POLICY_API_PATH = "api/v1/runtime/policy"
GOVERN_API_PATH = "api/v1/runtime/govern"
LOG_API_PATH = "api/v1/runtime/log"
AGENT_TYPE_PARAM = "agentType"

# Caller-set correlation id that becomes the App Insights ``operation_Id``
# stamped on every customEvent produced by the matching ``/runtime/log``
# request — see the spec on the platform-side ``postLogHandler``.
HEADER_OPERATION_ID = "x-uipath-operation-id"


class GovernanceService(BaseService):
"""Service for the agenticgovernance_ ingress.

Exposes two endpoints:
Exposes three endpoints:

- :meth:`retrieve_policy` — GET the tenant-managed policy pack.
- :meth:`compensate` — POST a compensating ``/runtime/govern`` call
so the server can run a disabled centralized guardrail and write
the per-rule LLMOps audit records itself.
- :meth:`track_event` — POST a custom telemetry event to
``/runtime/log`` (forwarded to App Insights ``customEvents``).

Org and tenant scoping come from :attr:`UiPathConfig.organization_id`
and :attr:`UiPathConfig.tenant_id`; the tenant travels in the
Expand Down Expand Up @@ -274,6 +285,85 @@ async def _compensate_async(self, request: GovernRequest) -> None:
payload = self._build_govern_payload(request)
await self.request_async("POST", url=url, headers=headers, json=payload)

# ── Custom telemetry events ──────────────────────────────────────

@traced(name="governance_track_event", run_type="uipath")
def track_event(
self,
*,
event_name: str,
data: dict[str, Any] | None = None,
operation_id: str | None = None,
) -> None:
"""POST a custom telemetry event to ``/runtime/log``.

The server forwards the event to App Insights as a
``customEvents`` row. Account / tenant / organization are
stamped server-side from the gateway headers and JWT.

Args:
event_name: Non-empty event name — becomes the App Insights
row ``name``. The platform redactor runs over this before
it reaches the sink.
data: Optional properties flattened into the event. Non-dict
values are dropped server-side.
operation_id: Optional correlation id forwarded as the
``x-uipath-operation-id`` header. When omitted, falls
back to :func:`resolve_trace_id` so events emitted from
the same agent trace share an ``operation_Id`` and are
queryable together in KQL. When neither is available,
the header is omitted and App Insights generates its
own id per event.

Raises:
ValueError: If ``event_name`` is empty / whitespace-only, or
if ``UiPathConfig.organization_id`` /
``UiPathConfig.tenant_id`` is not set.
EnrichedException: If the backend returns a non-2xx response.
"""
self._validate_event_name(event_name)
url, headers = self._build_org_scoped_request(LOG_API_PATH)
resolved_op_id = operation_id or resolve_trace_id()
if resolved_op_id:
headers[HEADER_OPERATION_ID] = resolved_op_id
payload: dict[str, Any] = {"eventName": event_name}
if data is not None:
payload["data"] = data
self.request("POST", url=url, headers=headers, json=payload)

@traced(name="governance_track_event", run_type="uipath")
async def track_event_async(
self,
*,
event_name: str,
data: dict[str, Any] | None = None,
operation_id: str | None = None,
) -> None:
"""Asynchronously POST a custom telemetry event to ``/runtime/log``.

See :meth:`track_event` for parameter semantics.
"""
self._validate_event_name(event_name)
url, headers = self._build_org_scoped_request(LOG_API_PATH)
resolved_op_id = operation_id or resolve_trace_id()
if resolved_op_id:
headers[HEADER_OPERATION_ID] = resolved_op_id
payload: dict[str, Any] = {"eventName": event_name}
if data is not None:
payload["data"] = data
await self.request_async("POST", url=url, headers=headers, json=payload)

@staticmethod
def _validate_event_name(event_name: str) -> None:
"""Reject empty/whitespace-only event names client-side.

The platform's ``/runtime/log`` handler rejects these with a
4xx; failing fast here gives the caller a clearer error and
avoids the round trip.
"""
if not event_name or not event_name.strip():
raise ValueError("event_name must be a non-empty string.")

# ── Internals ────────────────────────────────────────────────────

def _build_org_scoped_request(self, path: str) -> tuple[str, dict[str, str]]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,3 +164,39 @@ async def test_compensate_async_delegates_to_service(
requests = httpx_mock.get_requests()
assert len(requests) == 1
assert requests[0].method == "POST"

def test_track_event_delegates_to_service(
self,
httpx_mock: HTTPXMock,
provider: UiPathPlatformGovernanceProvider,
base_url: str,
) -> None:
httpx_mock.add_response(
url=f"{base_url}/{ORG_ID}/agenticgovernance_/api/v1/runtime/log",
method="POST",
status_code=204,
)

provider.track_event(event_name="ev", data={"k": "v"}, operation_id="op-1")

sent = httpx_mock.get_requests()[-1]
assert sent.method == "POST"
assert sent.headers["x-uipath-operation-id"] == "op-1"

async def test_track_event_async_delegates_to_service(
self,
httpx_mock: HTTPXMock,
provider: UiPathPlatformGovernanceProvider,
base_url: str,
) -> None:
httpx_mock.add_response(
url=f"{base_url}/{ORG_ID}/agenticgovernance_/api/v1/runtime/log",
method="POST",
status_code=204,
)

await provider.track_event_async(event_name="ev", operation_id="op-2")

sent = httpx_mock.get_requests()[-1]
assert sent.method == "POST"
assert sent.headers["x-uipath-operation-id"] == "op-2"
Loading
Loading