Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
125 changes: 119 additions & 6 deletions src/uipath/runtime/governance/_audit/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,12 @@
from abc import ABC, abstractmethod
from dataclasses import asdict, dataclass, field
from datetime import datetime, timezone
from typing import Any
from typing import Any, Callable

from uipath.core.governance import EnforcementMode

from .metadata import GovernanceRuntimeMetadata

logger = logging.getLogger(__name__)


Expand Down Expand Up @@ -244,6 +246,9 @@ def __init__(
async_mode: bool = True,
queue_maxsize: int = _DEFAULT_QUEUE_MAXSIZE,
register_default_sinks: bool = True,
*,
track_event: Callable[..., None] | None = None,
runtime_metadata: GovernanceRuntimeMetadata | None = None,
) -> None:
"""Initialize the audit manager.

Expand All @@ -252,10 +257,26 @@ def __init__(
thread. If False, events are processed synchronously.
queue_maxsize: Max queued events in async mode. On overflow the
oldest queued event is dropped to make room.
register_default_sinks: If True (default), register the platform
mandated ``traces`` sink and an atexit cleanup
handler. Tests that want a bare manager can pass
``False`` and register sinks explicitly.
register_default_sinks: If True (default), register the
platform-mandated ``traces`` and ``track_events``
sinks and an atexit cleanup handler. Tests that
want a bare manager can pass ``False`` and
register sinks explicitly.
track_event: Platform-supplied telemetry callable matching
``UiPathPlatformGovernanceProvider.track_event``.
The host is expected to wire this in production
deployments. When ``None`` (or any sink-
construction error), the ``track_events`` sink
is skipped and a warning is logged via the
registration helper's try/except — the runtime
continues so a wiring bug never breaks the
agent run.
runtime_metadata: Constants stamped on every telemetry event
(execution engine, agent type, agent framework,
runtime version). Defaults to
:class:`GovernanceRuntimeMetadata` () — auto-resolved
version + ``unknown`` agent type / framework.
The host overrides with concrete values.
"""
self._sinks: list[AuditSink] = []
# Single lock guards _sinks, _sink_failures, _tripped_sinks — every
Expand All @@ -277,6 +298,7 @@ def __init__(

if register_default_sinks:
self._register_traces_sink()
self._register_track_event_sink(track_event, runtime_metadata)
# Process-level atexit (one shared handler, weakref-tracked
# set) instead of per-instance ``atexit.register(self.method)``:
# avoids unbounded atexit list growth and the strong reference
Expand All @@ -299,6 +321,49 @@ def _register_traces_sink(self) -> None:
self.register_sink(sink)
logger.info("Governance audit sink registered: traces")

def _register_track_event_sink(
self,
track_event: Callable[..., None] | None,
runtime_metadata: GovernanceRuntimeMetadata | None,
) -> None:
"""Register the platform-mandated ``track_events`` sink.

Mirrors the shape of :meth:`_register_traces_sink`: deferred
import, construct, register, log. ``track_events`` is part of
the standard platform audit surface (App Insights
``customEvents``) and is expected to be wired by the host's
platform layer.

Wrapped in a broad ``except`` so a misconfigured wiring layer
(missing callable, sink construction error) never crashes the
agent — the runtime logs and proceeds without the sink. The
sink-level circuit breaker handles per-emit failures
separately.
"""
try:
from .metadata import GovernanceRuntimeMetadata as _Metadata
from .track_events import TrackEventAuditSink

if track_event is None:
raise ValueError(
"Platform-mandated track_event callable was not supplied. "
"The host wiring layer must pass "
"UiPathPlatformGovernanceProvider.track_event to "
"AuditManager(...)."
)
meta = runtime_metadata if runtime_metadata is not None else _Metadata()
sink = TrackEventAuditSink(track_event, meta)
self.register_sink(sink)
logger.info("Governance audit sink registered: track_events")
except Exception as exc: # noqa: BLE001 - registration must not crash the agent
# ``str(exc)`` instead of passing ``exc`` directly: the
# logging LogRecord retains its ``args`` tuple until the
# handler formats the record, and a raw exception there
# carries its ``__traceback__`` → frame chain → ``self``,
# which would keep the AuditManager alive in any
# log-capturing test. Stringifying breaks that ref.
logger.warning("Failed to register track_events sink: %s", str(exc))

def _start_worker(self) -> None:
"""Start the background worker thread."""
if self._worker_thread is not None and self._worker_thread.is_alive():
Expand Down Expand Up @@ -534,13 +599,20 @@ def emit_rule_evaluation(
agent_name: str = "agent",
trace_id: str = "",
description: str = "",
duration_ms: float = 0.0,
mapped_to_uipath: bool = False,
) -> None:
"""Convenience method to emit a rule evaluation event.

``enforcement_mode`` travels on the event so sinks don't have to
read a process-global. With instance-scoped runtimes the global
wouldn't be authoritative anyway — parallel runtimes can run in
different modes simultaneously.

``duration_ms`` and ``mapped_to_uipath`` are stamped on the
event's data dict for telemetry sinks (App Insights customEvents
via :class:`TrackEventAuditSink`); existing OTel-trace sinks
ignore them.
"""
self.emit(
AuditEvent(
Expand All @@ -558,6 +630,8 @@ def emit_rule_evaluation(
"detail": detail,
"description": description,
"status": "MATCHED" if matched else "PASS",
"duration_ms": duration_ms,
"mapped_to_uipath": mapped_to_uipath,
},
)
)
Expand All @@ -571,8 +645,41 @@ def emit_hook_summary(
final_action: str,
enforcement_mode: EnforcementMode,
trace_id: str = "",
duration_ms: float = 0.0,
skipped_policy_names: list[str] | None = None,
guardrail_dispatched_count: int = 0,
denied_count: int | None = None,
) -> None:
"""Convenience method to emit a hook summary event."""
"""Convenience method to emit a hook summary event.

``matched_rules`` keeps its historical meaning — any rule whose
checks matched, regardless of the configured action — for
backward compatibility with existing sinks. The newer
``denied_count`` captures only the rules the evaluator actually
wanted to act on (matched **and** configured action ≠
``allow``). A matched rule whose action is ``allow`` is a
positive informational match and is folded into
``passed_count``, not ``denied_count``.

Args:
duration_ms: Total wall time spent evaluating this hook.
skipped_policy_names: Rules in the pack that were not
evaluated (currently disabled). The summary carries
their ids so operators can spot which policies a tenant
turned off.
guardrail_dispatched_count: How many UiPath-mapped
guardrail-fallback rules fired for this hook (POSTed
to ``/runtime/govern``). Lets dashboards compute the
native-vs-server-dispatched ratio.
denied_count: Rules that matched **and** would have acted
(action ∈ {``deny``, ``escalate``, ``audit``}). When
``None`` (legacy callers), falls back to
``matched_rules`` so the old "matched == denial"
semantic is preserved.
"""
skipped = list(skipped_policy_names or [])
actual_denied = denied_count if denied_count is not None else matched_rules
passed_count = max(total_rules - actual_denied, 0)
self.emit(
AuditEvent(
event_type=EventType.HOOK_END,
Expand All @@ -584,6 +691,12 @@ def emit_hook_summary(
"matched_rules": matched_rules,
"final_action": final_action,
"enforcement_mode": enforcement_mode,
"duration_ms": duration_ms,
"passed_count": passed_count,
"denied_count": actual_denied,
"skipped_count": len(skipped),
"skipped_policy_names": skipped,
"guardrail_dispatched_count": guardrail_dispatched_count,
},
)
)
Expand Down
69 changes: 69 additions & 0 deletions src/uipath/runtime/governance/_audit/metadata.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
"""Per-runtime metadata stamped on every governance telemetry event.

The host (uipath CLI) constructs a :class:`GovernanceRuntimeMetadata`
once per agent run and passes it to the telemetry sink. Every
``customEvents`` row produced by :class:`TrackEventAuditSink` carries
these fields so KQL queries in App Insights can pivot on engine /
agent type / framework / runtime version.
"""

from __future__ import annotations

from dataclasses import dataclass, field
from importlib.metadata import PackageNotFoundError, version

NATIVE_EXECUTION_ENGINE = "uipath_native_governance_checker"


def _resolve_runtime_version() -> str:
"""Read the ``uipath-runtime`` package version, or ``"unknown"``.

``importlib.metadata.version`` fails when the package is imported
from a source checkout that was never installed (CI fixtures,
editable installs with stripped metadata). Telemetry must keep
flowing in those cases, so the fallback is a sentinel rather than
a raise.
"""
try:
return version("uipath-runtime")
except PackageNotFoundError:
return "unknown"


@dataclass(frozen=True)
class GovernanceRuntimeMetadata:
"""Constants stamped on every governance telemetry event.

Attributes:
execution_engine: Implementation behind the evaluator. Default
``"uipath_native_governance_checker"``. When a future engine
(e.g. AGT) replaces the native checker, the host supplies
its own identifier here so the App Insights row records
which engine produced the verdict.
agent_type: Category of agent under governance — e.g.
``"uipath_coded"``, ``"uipath_lowcode"``, ``"servicenow"``,
or any other identifier the host wants to attach. External
agents (ServiceNow, etc.) join this taxonomy when they
land. ``"unknown"`` keeps telemetry flowing if the host
forgets to set it.
agent_framework: Framework that drives the agent — e.g.
``"langchain"``, ``"openai_agents"``, ``"llamaindex"``,
``"google_adk"``, ``"agent_framework"``, ``"mcp"``.
runtime_version: ``uipath-runtime`` package version. Resolved
from installed package metadata at construction; falls
back to ``"unknown"`` for source checkouts.
"""

execution_engine: str = NATIVE_EXECUTION_ENGINE
agent_type: str = "unknown"
agent_framework: str = "unknown"
runtime_version: str = field(default_factory=_resolve_runtime_version)

def as_payload(self) -> dict[str, str]:
"""Return the metadata as a dict ready to merge into an event payload."""
return {
"execution_engine": self.execution_engine,
"agent_type": self.agent_type,
"agent_framework": self.agent_framework,
"runtime_version": self.runtime_version,
}
Loading
Loading