diff --git a/src/uipath/runtime/governance/_audit/base.py b/src/uipath/runtime/governance/_audit/base.py index 13b7cde..92138c3 100644 --- a/src/uipath/runtime/governance/_audit/base.py +++ b/src/uipath/runtime/governance/_audit/base.py @@ -22,10 +22,12 @@ from abc import ABC, abstractmethod from dataclasses import asdict, dataclass, field from datetime import datetime, timezone -from typing import Any +from typing import Any, Callable from uipath.core.governance import EnforcementMode +from .metadata import GovernanceRuntimeMetadata + logger = logging.getLogger(__name__) @@ -244,6 +246,9 @@ def __init__( async_mode: bool = True, queue_maxsize: int = _DEFAULT_QUEUE_MAXSIZE, register_default_sinks: bool = True, + *, + track_event: Callable[..., None] | None = None, + runtime_metadata: GovernanceRuntimeMetadata | None = None, ) -> None: """Initialize the audit manager. @@ -252,10 +257,26 @@ def __init__( thread. If False, events are processed synchronously. queue_maxsize: Max queued events in async mode. On overflow the oldest queued event is dropped to make room. - register_default_sinks: If True (default), register the platform - mandated ``traces`` sink and an atexit cleanup - handler. Tests that want a bare manager can pass - ``False`` and register sinks explicitly. + register_default_sinks: If True (default), register the + platform-mandated ``traces`` and ``track_events`` + sinks and an atexit cleanup handler. Tests that + want a bare manager can pass ``False`` and + register sinks explicitly. + track_event: Platform-supplied telemetry callable matching + ``UiPathPlatformGovernanceProvider.track_event``. + The host is expected to wire this in production + deployments. When ``None`` (or any sink- + construction error), the ``track_events`` sink + is skipped and a warning is logged via the + registration helper's try/except — the runtime + continues so a wiring bug never breaks the + agent run. + runtime_metadata: Constants stamped on every telemetry event + (execution engine, agent type, agent framework, + runtime version). Defaults to + :class:`GovernanceRuntimeMetadata` () — auto-resolved + version + ``unknown`` agent type / framework. + The host overrides with concrete values. """ self._sinks: list[AuditSink] = [] # Single lock guards _sinks, _sink_failures, _tripped_sinks — every @@ -277,6 +298,7 @@ def __init__( if register_default_sinks: self._register_traces_sink() + self._register_track_event_sink(track_event, runtime_metadata) # Process-level atexit (one shared handler, weakref-tracked # set) instead of per-instance ``atexit.register(self.method)``: # avoids unbounded atexit list growth and the strong reference @@ -299,6 +321,49 @@ def _register_traces_sink(self) -> None: self.register_sink(sink) logger.info("Governance audit sink registered: traces") + def _register_track_event_sink( + self, + track_event: Callable[..., None] | None, + runtime_metadata: GovernanceRuntimeMetadata | None, + ) -> None: + """Register the platform-mandated ``track_events`` sink. + + Mirrors the shape of :meth:`_register_traces_sink`: deferred + import, construct, register, log. ``track_events`` is part of + the standard platform audit surface (App Insights + ``customEvents``) and is expected to be wired by the host's + platform layer. + + Wrapped in a broad ``except`` so a misconfigured wiring layer + (missing callable, sink construction error) never crashes the + agent — the runtime logs and proceeds without the sink. The + sink-level circuit breaker handles per-emit failures + separately. + """ + try: + from .metadata import GovernanceRuntimeMetadata as _Metadata + from .track_events import TrackEventAuditSink + + if track_event is None: + raise ValueError( + "Platform-mandated track_event callable was not supplied. " + "The host wiring layer must pass " + "UiPathPlatformGovernanceProvider.track_event to " + "AuditManager(...)." + ) + meta = runtime_metadata if runtime_metadata is not None else _Metadata() + sink = TrackEventAuditSink(track_event, meta) + self.register_sink(sink) + logger.info("Governance audit sink registered: track_events") + except Exception as exc: # noqa: BLE001 - registration must not crash the agent + # ``str(exc)`` instead of passing ``exc`` directly: the + # logging LogRecord retains its ``args`` tuple until the + # handler formats the record, and a raw exception there + # carries its ``__traceback__`` → frame chain → ``self``, + # which would keep the AuditManager alive in any + # log-capturing test. Stringifying breaks that ref. + logger.warning("Failed to register track_events sink: %s", str(exc)) + def _start_worker(self) -> None: """Start the background worker thread.""" if self._worker_thread is not None and self._worker_thread.is_alive(): @@ -534,6 +599,8 @@ def emit_rule_evaluation( agent_name: str = "agent", trace_id: str = "", description: str = "", + duration_ms: float = 0.0, + mapped_to_uipath: bool = False, ) -> None: """Convenience method to emit a rule evaluation event. @@ -541,6 +608,11 @@ def emit_rule_evaluation( read a process-global. With instance-scoped runtimes the global wouldn't be authoritative anyway — parallel runtimes can run in different modes simultaneously. + + ``duration_ms`` and ``mapped_to_uipath`` are stamped on the + event's data dict for telemetry sinks (App Insights customEvents + via :class:`TrackEventAuditSink`); existing OTel-trace sinks + ignore them. """ self.emit( AuditEvent( @@ -558,6 +630,8 @@ def emit_rule_evaluation( "detail": detail, "description": description, "status": "MATCHED" if matched else "PASS", + "duration_ms": duration_ms, + "mapped_to_uipath": mapped_to_uipath, }, ) ) @@ -571,8 +645,41 @@ def emit_hook_summary( final_action: str, enforcement_mode: EnforcementMode, trace_id: str = "", + duration_ms: float = 0.0, + skipped_policy_names: list[str] | None = None, + guardrail_dispatched_count: int = 0, + denied_count: int | None = None, ) -> None: - """Convenience method to emit a hook summary event.""" + """Convenience method to emit a hook summary event. + + ``matched_rules`` keeps its historical meaning — any rule whose + checks matched, regardless of the configured action — for + backward compatibility with existing sinks. The newer + ``denied_count`` captures only the rules the evaluator actually + wanted to act on (matched **and** configured action ≠ + ``allow``). A matched rule whose action is ``allow`` is a + positive informational match and is folded into + ``passed_count``, not ``denied_count``. + + Args: + duration_ms: Total wall time spent evaluating this hook. + skipped_policy_names: Rules in the pack that were not + evaluated (currently disabled). The summary carries + their ids so operators can spot which policies a tenant + turned off. + guardrail_dispatched_count: How many UiPath-mapped + guardrail-fallback rules fired for this hook (POSTed + to ``/runtime/govern``). Lets dashboards compute the + native-vs-server-dispatched ratio. + denied_count: Rules that matched **and** would have acted + (action ∈ {``deny``, ``escalate``, ``audit``}). When + ``None`` (legacy callers), falls back to + ``matched_rules`` so the old "matched == denial" + semantic is preserved. + """ + skipped = list(skipped_policy_names or []) + actual_denied = denied_count if denied_count is not None else matched_rules + passed_count = max(total_rules - actual_denied, 0) self.emit( AuditEvent( event_type=EventType.HOOK_END, @@ -584,6 +691,12 @@ def emit_hook_summary( "matched_rules": matched_rules, "final_action": final_action, "enforcement_mode": enforcement_mode, + "duration_ms": duration_ms, + "passed_count": passed_count, + "denied_count": actual_denied, + "skipped_count": len(skipped), + "skipped_policy_names": skipped, + "guardrail_dispatched_count": guardrail_dispatched_count, }, ) ) diff --git a/src/uipath/runtime/governance/_audit/metadata.py b/src/uipath/runtime/governance/_audit/metadata.py new file mode 100644 index 0000000..5295e43 --- /dev/null +++ b/src/uipath/runtime/governance/_audit/metadata.py @@ -0,0 +1,69 @@ +"""Per-runtime metadata stamped on every governance telemetry event. + +The host (uipath CLI) constructs a :class:`GovernanceRuntimeMetadata` +once per agent run and passes it to the telemetry sink. Every +``customEvents`` row produced by :class:`TrackEventAuditSink` carries +these fields so KQL queries in App Insights can pivot on engine / +agent type / framework / runtime version. +""" + +from __future__ import annotations + +from dataclasses import dataclass, field +from importlib.metadata import PackageNotFoundError, version + +NATIVE_EXECUTION_ENGINE = "uipath_native_governance_checker" + + +def _resolve_runtime_version() -> str: + """Read the ``uipath-runtime`` package version, or ``"unknown"``. + + ``importlib.metadata.version`` fails when the package is imported + from a source checkout that was never installed (CI fixtures, + editable installs with stripped metadata). Telemetry must keep + flowing in those cases, so the fallback is a sentinel rather than + a raise. + """ + try: + return version("uipath-runtime") + except PackageNotFoundError: + return "unknown" + + +@dataclass(frozen=True) +class GovernanceRuntimeMetadata: + """Constants stamped on every governance telemetry event. + + Attributes: + execution_engine: Implementation behind the evaluator. Default + ``"uipath_native_governance_checker"``. When a future engine + (e.g. AGT) replaces the native checker, the host supplies + its own identifier here so the App Insights row records + which engine produced the verdict. + agent_type: Category of agent under governance — e.g. + ``"uipath_coded"``, ``"uipath_lowcode"``, ``"servicenow"``, + or any other identifier the host wants to attach. External + agents (ServiceNow, etc.) join this taxonomy when they + land. ``"unknown"`` keeps telemetry flowing if the host + forgets to set it. + agent_framework: Framework that drives the agent — e.g. + ``"langchain"``, ``"openai_agents"``, ``"llamaindex"``, + ``"google_adk"``, ``"agent_framework"``, ``"mcp"``. + runtime_version: ``uipath-runtime`` package version. Resolved + from installed package metadata at construction; falls + back to ``"unknown"`` for source checkouts. + """ + + execution_engine: str = NATIVE_EXECUTION_ENGINE + agent_type: str = "unknown" + agent_framework: str = "unknown" + runtime_version: str = field(default_factory=_resolve_runtime_version) + + def as_payload(self) -> dict[str, str]: + """Return the metadata as a dict ready to merge into an event payload.""" + return { + "execution_engine": self.execution_engine, + "agent_type": self.agent_type, + "agent_framework": self.agent_framework, + "runtime_version": self.runtime_version, + } diff --git a/src/uipath/runtime/governance/_audit/track_events.py b/src/uipath/runtime/governance/_audit/track_events.py new file mode 100644 index 0000000..b2c0e42 --- /dev/null +++ b/src/uipath/runtime/governance/_audit/track_events.py @@ -0,0 +1,294 @@ +"""Telemetry sink that emits governance evaluation events to ``/runtime/log``. + +Posts custom telemetry events through a caller-supplied ``track_event`` +callable so the runtime stays decoupled from any specific platform-side +service implementation. The host (uipath CLI) wires +``UiPathPlatformGovernanceProvider.track_event`` as the callable; +tests pass a mock that captures payloads. + +Event-rate policy (volume control): + +- DENIED rules (``matched=True``) → one ``governance.rule.denied`` event + per matched rule. +- PASSED + SKIPPED rules → one ``governance.hook.summary`` event per + hook with counts + the names of skipped rules + the count of + UiPath-mapped guardrails that fell back to ``/runtime/govern``. + +Why: a 50-rule pack evaluated on every hook of every agent step would +multiply per-step telemetry calls by 50. Bundling the "nothing +happened" cases into a single hook summary cuts that to 1. +""" + +from __future__ import annotations + +import logging +from typing import Any, Callable + +from uipath.core.governance import EnforcementMode + +from .base import AuditEvent, AuditSink, EventType +from .metadata import GovernanceRuntimeMetadata + +logger = logging.getLogger(__name__) + + +TrackEventCallable = Callable[..., None] +"""Signature matches :meth:`UiPathPlatformGovernanceProvider.track_event`. + +Expected kwargs: ``event_name: str``, ``data: dict | None``, +``operation_id: str | None``. The sink calls this synchronously from +the audit-manager worker thread. +""" + + +EVENT_RULE_DENIED = "governance.rule.denied" +EVENT_HOOK_SUMMARY = "governance.hook.summary" + + +def _mode_str(mode: Any) -> str: + """Coerce an enforcement-mode field to its canonical uppercase string.""" + if isinstance(mode, EnforcementMode): + return mode.value.upper() + if isinstance(mode, str): + return mode.upper() + return "AUDIT" + + +def _evaluator_result(action: str) -> str: + """Map a rule's configured action to the spec-vocabulary verdict. + + Mirrors :func:`uipath.runtime.governance._audit.traces._derive_results` + so both sinks agree on the (matched, action) → verdict mapping. + """ + action_lc = action.lower() + if action_lc == "deny": + return "DENY" + if action_lc == "escalate": + return "HITL" + if action_lc == "audit": + # The rule wanted to deny but was tagged audit-only at the + # check level — the evaluator's intent is still "deny". + return "DENY" + return "ALLOW" + + +def _action_applied(evaluator_result: str, configured_action: str, mode: str) -> str: + """Mode-adjusted action: AUDIT mode collapses DENY/HITL into AUDIT.""" + if mode == "AUDIT": + if evaluator_result in ("DENY", "HITL"): + return "AUDIT" + return "NONE" + # ENFORCE mode: per-rule ``audit`` override stays AUDIT. + if configured_action.lower() == "audit": + return "AUDIT" + return evaluator_result if evaluator_result != "ALLOW" else "NONE" + + +class TrackEventAuditSink(AuditSink): + """Sink that POSTs governance telemetry to App Insights via ``track_event``. + + Volume-controlled per the module docstring: individual events only + for denials, one aggregated event per hook for passed / skipped / + dispatched. Other event types are dropped at :meth:`accepts`. + + The sink relies on the :class:`AuditManager` worker thread for + asynchronous dispatch — the ``track_event`` callable runs on the + worker, not on the agent's hook thread. + """ + + SINK_NAME = "track_events" + + def __init__( + self, + track_event: TrackEventCallable, + runtime_metadata: GovernanceRuntimeMetadata, + *, + name: str = SINK_NAME, + ) -> None: + """Initialize the sink. + + Args: + track_event: The platform-side telemetry callable. Receives + ``event_name``, ``data``, ``operation_id`` kwargs. + Wired by the host to + ``UiPathPlatformGovernanceProvider.track_event``; tests + pass a capture callable. **Required** — the platform + wiring layer is expected to provide this. ``None`` is a + wiring bug and is rejected here so the manager's + registration try/except surfaces it as a clear warning + rather than letting a ``TypeError`` fire on the first + event. + runtime_metadata: Per-run constants stamped on every event + (execution engine, agent type, agent framework, runtime + version). + name: Sink name used for circuit-breaker accounting in the + manager. Override only if more than one telemetry sink + of this kind is registered against the same manager. + """ + if track_event is None: + raise ValueError( + "TrackEventAuditSink requires a non-None track_event callable. " + "The platform wiring layer is expected to supply " + "UiPathPlatformGovernanceProvider.track_event." + ) + self._track_event = track_event + self._meta = runtime_metadata + self._name = name + + @property + def name(self) -> str: + return self._name + + def accepts(self, event: AuditEvent) -> bool: + """Filter the audit stream down to denials + non-empty hook summaries. + + Drops: + + - **Any event whose mode is** :attr:`EnforcementMode.DISABLED` + — governance is off; emit nothing. The evaluator short-circuits + before ``_emit_audit`` in this case, so in practice the sink + shouldn't see these, but the guard here is belt-and-suspenders + against any future emitter that bypasses the short-circuit. + - Rules that didn't match (``matched=False``) — rolled into the + hook summary's ``passed_count``. + - Rules that matched but whose configured ``action`` is + ``allow`` — a positive informational match isn't a denial; + rolled into the hook summary too. + - **Empty hook summaries** — ``total_rules=0`` AND + ``skipped_count=0`` means nothing was evaluated and nothing + was deliberately skipped. Zero operator value; suppressed. + (A summary with ``skipped_count>0`` still fires because the + ``skipped_policy_names`` payload is operator-relevant — it + shows which policies a tenant turned off.) + - Other event types (``hook_start``, ``session_*``, generic + violations) — out of scope for this sink's telemetry surface. + + The result is bounded volume: at most one ``rule.denied`` event + per matched-and-restrictive rule + at most one + ``hook.summary`` per hook end (and only when it carries data). + """ + if _mode_str(event.data.get("enforcement_mode")) == "DISABLED": + return False + + if event.event_type == EventType.RULE_EVALUATION: + if not event.data.get("matched"): + return False + action = str(event.data.get("action") or "allow").lower() + return action != "allow" + if event.event_type == EventType.HOOK_END: + data = event.data + total = int(data.get("total_rules") or 0) + skipped = int(data.get("skipped_count") or 0) + return not (total == 0 and skipped == 0) + return False + + def emit(self, event: AuditEvent) -> None: + """Render the event and dispatch via the injected callable. + + Re-checks ``matched`` for rule evaluations as a defense-in-depth + guard. :meth:`accepts` already drops passed rules at the + :class:`AuditManager` dispatch boundary, but a direct + ``sink.emit(event)`` call (tests, future alternate dispatch + paths) must not route a passed rule into the + ``governance.rule.denied`` telemetry stream. + + Errors from ``track_event`` propagate to the audit manager, + which has its own circuit breaker (10 consecutive failures → + sink tripped for the rest of the process). The sink itself + doesn't catch — letting the manager track failure rate is the + whole point of the sink-failure protocol. + """ + # DISABLED governance = no telemetry, full stop. Same + # defense-in-depth rationale as the matched/allow check below: + # ``accepts`` drops these at the manager boundary, but a + # direct ``sink.emit`` call must not leak a DISABLED-mode + # event into App Insights either. + if _mode_str(event.data.get("enforcement_mode")) == "DISABLED": + return + + if event.event_type == EventType.RULE_EVALUATION: + if not event.data.get("matched"): + return + action = str(event.data.get("action") or "allow").lower() + if action == "allow": + # Positive informational match — not a denial. Same + # defense-in-depth rationale as the ``matched`` check + # above: ``accepts`` drops these at the manager + # boundary, but a direct ``sink.emit`` call must not + # leak them into the ``rule.denied`` stream. + return + self._emit_rule_denied(event) + elif event.event_type == EventType.HOOK_END: + data = event.data + total = int(data.get("total_rules") or 0) + skipped = int(data.get("skipped_count") or 0) + if total == 0 and skipped == 0: + # Empty hook — same defense-in-depth as the disabled + # mode and matched-allow filters. + return + self._emit_hook_summary(event) + # Other event types are filtered out at accepts(); reaching + # here for anything else would be a contract violation by the + # manager, not the sink. + + def _common_payload(self, event: AuditEvent) -> dict[str, Any]: + """Per-runtime constants + event identifiers stamped on every payload.""" + payload: dict[str, Any] = dict(self._meta.as_payload()) + payload["agent_name"] = event.agent_name + payload["hook"] = event.hook + payload["timestamp"] = event.timestamp.isoformat() + return payload + + def _emit_rule_denied(self, event: AuditEvent) -> None: + """Emit one ``governance.rule.denied`` per matched rule.""" + data = event.data + mode = _mode_str(data.get("enforcement_mode")) + configured_action = str(data.get("action") or "allow") + evaluator_result = _evaluator_result(configured_action) + action_applied = _action_applied(evaluator_result, configured_action, mode) + + payload = self._common_payload(event) + payload.update( + { + "pack": str(data.get("pack_name") or ""), + "clause": str(data.get("policy_id") or ""), + "rule_name": str(data.get("rule_name") or ""), + "mode": mode, + "evaluator_result": evaluator_result, + "action_applied": action_applied, + "duration_ms": float(data.get("duration_ms") or 0.0), + "mapped_to_uipath": bool(data.get("mapped_to_uipath", False)), + "detail": str(data.get("detail") or ""), + } + ) + self._track_event( + event_name=EVENT_RULE_DENIED, + data=payload, + operation_id=event.trace_id or None, + ) + + def _emit_hook_summary(self, event: AuditEvent) -> None: + """Emit one ``governance.hook.summary`` per hook end.""" + data = event.data + mode = _mode_str(data.get("enforcement_mode")) + + payload = self._common_payload(event) + payload.update( + { + "mode": mode, + "passed_count": int(data.get("passed_count") or 0), + "skipped_count": int(data.get("skipped_count") or 0), + "skipped_policy_names": list(data.get("skipped_policy_names") or []), + "denied_count": int(data.get("denied_count") or 0), + "guardrail_dispatched_count": int( + data.get("guardrail_dispatched_count") or 0 + ), + "duration_ms": float(data.get("duration_ms") or 0.0), + "final_action": str(data.get("final_action") or "allow").upper(), + } + ) + self._track_event( + event_name=EVENT_HOOK_SUMMARY, + data=payload, + operation_id=event.trace_id or None, + ) diff --git a/src/uipath/runtime/governance/native/evaluator.py b/src/uipath/runtime/governance/native/evaluator.py index 2290361..5fb79f7 100644 --- a/src/uipath/runtime/governance/native/evaluator.py +++ b/src/uipath/runtime/governance/native/evaluator.py @@ -12,6 +12,7 @@ import logging import math import re +import time from collections import Counter from datetime import datetime, timezone from functools import lru_cache @@ -374,14 +375,27 @@ def evaluate(self, context: CheckContext) -> AuditRecord: rules = self._policy_index.get_rules_for_hook(context.hook) evaluations: list[RuleEvaluation] = [] + # Per-rule wall time (ms), keyed by rule_id. Forwarded to the + # telemetry sink via emit_rule_evaluation(duration_ms=...) so + # operators can see which rules are slow. + rule_durations: dict[str, float] = {} + # Disabled rules — never evaluated, surfaced in the hook + # summary's skipped_policy_names so dashboards can spot which + # policies a tenant turned off. + skipped_policy_ids: list[str] = [] raw_action = Action.ALLOW # The action before mode adjustment deny_would_fire = False # Track if DENY would have fired + hook_start = time.monotonic() + for rule in rules: if not rule.enabled: + skipped_policy_ids.append(rule.rule_id) continue + rule_start = time.monotonic() evaluation = self._evaluate_rule(rule, context) + rule_durations[rule.rule_id] = (time.monotonic() - rule_start) * 1000.0 evaluations.append(evaluation) if evaluation.matched: @@ -397,6 +411,8 @@ def evaluate(self, context: CheckContext) -> AuditRecord: elif eval_action == Action.AUDIT and raw_action == Action.ALLOW: raw_action = Action.AUDIT + hook_duration_ms = (time.monotonic() - hook_start) * 1000.0 + # Apply enforcement mode final_action = self._apply_enforcement_mode(raw_action) @@ -417,7 +433,13 @@ def evaluate(self, context: CheckContext) -> AuditRecord: metadata=record_metadata, ) - self._emit_audit(audit, mode) + self._emit_audit( + audit, + mode, + rule_durations=rule_durations, + skipped_policy_ids=skipped_policy_ids, + hook_duration_ms=hook_duration_ms, + ) # For any guardrail mapped to UiPath but currently disabled, hand # the disabled guardrails to the governance-server's @@ -484,30 +506,60 @@ def _dispatch_compensation( "Failed to dispatch compensating governance call: %s", exc ) - def _emit_audit(self, audit: AuditRecord, mode: EnforcementMode) -> None: + def _emit_audit( + self, + audit: AuditRecord, + mode: EnforcementMode, + *, + rule_durations: dict[str, float] | None = None, + skipped_policy_ids: list[str] | None = None, + hook_duration_ms: float = 0.0, + ) -> None: """Emit per-rule and hook-summary events to the injected audit manager. No-op when no audit manager was supplied at construction. The per-runtime :class:`AuditManager` handles sink-level circuit breaking; emission errors stay there and never break evaluation. + + Args: + audit: Resolved :class:`AuditRecord` produced by + :meth:`evaluate`. + mode: Active :class:`EnforcementMode`. + rule_durations: Per-rule wall time in ms, keyed by + ``rule_id``. Looked up when emitting individual + rule-evaluation events; unknown ids default to 0. + skipped_policy_ids: Ids of rules that were disabled and + therefore not evaluated. Folded into the hook + summary's ``skipped_policy_names``. + hook_duration_ms: Total wall time spent evaluating this + hook. Folded into the hook summary's ``duration_ms``. """ manager = self._audit_manager if manager is None: return hook_name = audit.hook.name + durations = rule_durations or {} # ``guardrail_fallback`` rules are server-traced: the agent POSTs # to ``/runtime/govern`` (see :meth:`_dispatch_compensation`) and # the governance-server emits the audit event with the actual # validator verdict. Emitting a Python-side ``rule_evaluation`` # event here would produce a duplicate trace carrying no - # verdict, so filter these rules out of every event the Python - # evaluator emits (per-rule AND the hook summary's counts). + # verdict, so filter these rules out of every per-rule event the + # Python evaluator emits AND out of the hook summary's + # passed/denied counts. The summary keeps a separate + # ``guardrail_dispatched_count`` so the mapped-vs-unmapped + # dimension is still queryable. emittable = [ ev for ev in audit.evaluations if not self._is_guardrail_fallback_rule(ev.rule_id) ] + guardrail_dispatched_count = sum( + 1 + for ev in audit.evaluations + if ev.matched and self._is_guardrail_fallback_rule(ev.rule_id) + ) for evaluation in emittable: manager.emit_rule_evaluation( @@ -522,16 +574,37 @@ def _emit_audit(self, audit: AuditRecord, mode: EnforcementMode) -> None: agent_name=audit.agent_name, trace_id=audit.trace_id, description=evaluation.description, + duration_ms=durations.get(evaluation.rule_id, 0.0), + # Per-rule events the Python evaluator emits are never + # for UiPath-mapped guardrails (those are filtered out + # of ``emittable`` above and traced by the server). + # Stamping ``False`` keeps the schema stable for sinks. + mapped_to_uipath=False, ) + # ``matched_rules`` keeps the historical "any check matched" + # semantic (used by the legacy traces sink). ``denied_count`` + # is the precise spec verdict — only rules whose configured + # action would actually act (not ``allow``). A matched rule + # configured as ``allow`` is an explicit positive match and + # rolls into ``passed_count`` via the manager's arithmetic. + matched_rules = sum(1 for ev in emittable if ev.matched) + denied_count = sum( + 1 for ev in emittable if ev.matched and ev.action != Action.ALLOW + ) + manager.emit_hook_summary( hook=hook_name, agent_name=audit.agent_name, total_rules=len(emittable), - matched_rules=sum(1 for ev in emittable if ev.matched), + matched_rules=matched_rules, final_action=audit.final_action.value, enforcement_mode=mode, trace_id=audit.trace_id, + duration_ms=hook_duration_ms, + skipped_policy_names=list(skipped_policy_ids or []), + guardrail_dispatched_count=guardrail_dispatched_count, + denied_count=denied_count, ) def _is_guardrail_fallback_rule(self, rule_id: str) -> bool: diff --git a/tests/test_audit_manager_track_event_wiring.py b/tests/test_audit_manager_track_event_wiring.py new file mode 100644 index 0000000..d090d47 --- /dev/null +++ b/tests/test_audit_manager_track_event_wiring.py @@ -0,0 +1,186 @@ +"""Tests for ``AuditManager`` auto-registration of the ``track_events`` sink. + +The sink is platform-mandated, like ``traces``. The host wires the +``track_event`` callable + ``GovernanceRuntimeMetadata`` at +construction. ``_register_track_event_sink`` mirrors the simple shape +of ``_register_traces_sink`` — deferred import, construct, register, +log — wrapped in a broad ``except`` so a misconfigured wiring layer +(missing callable, sink-construction error) is surfaced as a warning +instead of crashing the agent. +""" + +from __future__ import annotations + +import logging +from typing import Any + +import pytest + +from uipath.runtime.governance._audit.base import AuditManager +from uipath.runtime.governance._audit.metadata import GovernanceRuntimeMetadata +from uipath.runtime.governance._audit.track_events import TrackEventAuditSink + + +class _Capture: + """Stand-in for ``provider.track_event`` — records calls.""" + + def __init__(self) -> None: + self.calls: list[dict[str, Any]] = [] + + def __call__(self, **kwargs: Any) -> None: + self.calls.append(kwargs) + + +# --------------------------------------------------------------------------- +# Happy path — host wires the callable +# --------------------------------------------------------------------------- + + +def test_track_event_sink_registered_when_callable_supplied() -> None: + mgr = AuditManager( + async_mode=False, + track_event=_Capture(), + runtime_metadata=GovernanceRuntimeMetadata(agent_type="uipath_coded"), + ) + try: + sinks = mgr.list_sinks() + assert TrackEventAuditSink.SINK_NAME in sinks + sink = mgr.get_sink(TrackEventAuditSink.SINK_NAME) + assert isinstance(sink, TrackEventAuditSink) + finally: + mgr.close() + + +def test_traces_sink_is_still_registered_alongside() -> None: + """track_events doesn't replace traces — both are mandatory.""" + mgr = AuditManager( + async_mode=False, + track_event=_Capture(), + runtime_metadata=GovernanceRuntimeMetadata(), + ) + try: + sinks = mgr.list_sinks() + assert "traces" in sinks + assert TrackEventAuditSink.SINK_NAME in sinks + finally: + mgr.close() + + +def test_supplied_runtime_metadata_reaches_sink() -> None: + mgr = AuditManager( + async_mode=False, + track_event=_Capture(), + runtime_metadata=GovernanceRuntimeMetadata( + agent_type="uipath_coded", agent_framework="langchain" + ), + ) + try: + sink = mgr.get_sink(TrackEventAuditSink.SINK_NAME) + assert isinstance(sink, TrackEventAuditSink) + # Internal: confirm the sink carries the host-supplied metadata. + assert sink._meta.agent_type == "uipath_coded" + assert sink._meta.agent_framework == "langchain" + finally: + mgr.close() + + +def test_metadata_defaults_when_not_supplied() -> None: + """Helper falls back to ``GovernanceRuntimeMetadata()`` defaults.""" + mgr = AuditManager(async_mode=False, track_event=_Capture()) + try: + sink = mgr.get_sink(TrackEventAuditSink.SINK_NAME) + assert isinstance(sink, TrackEventAuditSink) + assert sink._meta.agent_type == "unknown" + assert sink._meta.agent_framework == "unknown" + finally: + mgr.close() + + +# --------------------------------------------------------------------------- +# Missing-callable path — caught by the helper's try/except, logged +# --------------------------------------------------------------------------- + + +def test_missing_callable_logs_warning_and_skips_sink( + caplog: pytest.LogCaptureFixture, +) -> None: + """``track_event=None`` → sink __init__ raises → helper catches + warns.""" + caplog.set_level(logging.WARNING, logger="uipath.runtime.governance._audit.base") + mgr = AuditManager(async_mode=False) # no track_event supplied + try: + # Traces sink still wired; track_events skipped because sink + # construction raised ValueError caught by the helper. + assert "traces" in mgr.list_sinks() + assert TrackEventAuditSink.SINK_NAME not in mgr.list_sinks() + + warnings = [ + r.message for r in caplog.records if r.levelno == logging.WARNING + ] + assert any( + "Failed to register track_events sink" in m for m in warnings + ), f"expected registration-failure warning, got: {warnings}" + finally: + mgr.close() + + +def test_sink_construction_error_does_not_crash_manager( + caplog: pytest.LogCaptureFixture, monkeypatch: pytest.MonkeyPatch +) -> None: + """An exception inside ``TrackEventAuditSink.__init__`` is swallowed. + + Forces a non-``ValueError`` exception (a raise inside ``__init__`` + triggered via the runtime_metadata path) to confirm the helper's + ``except Exception`` covers the broad case the user asked for, not + just the validation error. + """ + caplog.set_level(logging.WARNING, logger="uipath.runtime.governance._audit.base") + + def _boom(*args: Any, **kwargs: Any) -> Any: + raise RuntimeError("synthetic registration failure") + + from uipath.runtime.governance._audit import track_events as te + monkeypatch.setattr(te, "TrackEventAuditSink", _boom) + + mgr = AuditManager(async_mode=False, track_event=_Capture()) + try: + # Helper swallowed the RuntimeError; manager kept the traces sink + # and reached a constructed state without crashing. + assert "traces" in mgr.list_sinks() + assert TrackEventAuditSink.SINK_NAME not in mgr.list_sinks() + warnings = [ + r.message for r in caplog.records if r.levelno == logging.WARNING + ] + assert any( + "Failed to register track_events sink" in m + and "synthetic registration failure" in m + for m in warnings + ), f"expected wrapped registration failure, got: {warnings}" + finally: + mgr.close() + + +# --------------------------------------------------------------------------- +# Opt-out path — register_default_sinks=False +# --------------------------------------------------------------------------- + + +def test_opt_out_skips_both_sinks() -> None: + """register_default_sinks=False keeps the audit pipeline empty for tests.""" + mgr = AuditManager(async_mode=False, register_default_sinks=False) + try: + assert mgr.list_sinks() == [] + finally: + mgr.close() + + +def test_opt_out_does_not_warn(caplog: pytest.LogCaptureFixture) -> None: + """Opting out is an explicit signal — no missing-callable warning.""" + caplog.set_level(logging.WARNING, logger="uipath.runtime.governance._audit.base") + mgr = AuditManager(async_mode=False, register_default_sinks=False) + try: + warnings = [ + r.message for r in caplog.records if r.levelno == logging.WARNING + ] + assert not any("track_events" in m for m in warnings) + finally: + mgr.close() diff --git a/tests/test_evaluator_telemetry.py b/tests/test_evaluator_telemetry.py new file mode 100644 index 0000000..eeab93d --- /dev/null +++ b/tests/test_evaluator_telemetry.py @@ -0,0 +1,290 @@ +"""Tests that the evaluator passes the new telemetry fields downstream. + +The evaluator must: + +- Measure per-rule and per-hook wall-clock duration. +- Collect disabled rule ids into ``skipped_policy_names``. +- Count matched UiPath-mapped guardrails (``guardrail_dispatched_count``). +- Hand all of the above to ``AuditManager.emit_rule_evaluation`` and + ``emit_hook_summary`` so the ``TrackEventAuditSink`` payload carries + them. + +We capture events via a manager registered with no default sinks + +one ``_CapturingSink`` so the assertions see exactly the per-rule and +hook-summary events the evaluator emits. +""" + +from __future__ import annotations + +import pytest +from uipath.core.governance import EnforcementMode +from uipath.core.governance.models import Action, LifecycleHook + +from uipath.runtime.governance._audit.base import ( + AuditEvent, + AuditManager, + AuditSink, + EventType, +) +from uipath.runtime.governance.native.evaluator import GovernanceEvaluator +from uipath.runtime.governance.native.models import ( + Check, + CheckContext, + Condition, + PolicyIndex, + PolicyPack, + Rule, +) + + +class _CapturingSink(AuditSink): + def __init__(self) -> None: + self.events: list[AuditEvent] = [] + + @property + def name(self) -> str: + return "capturing" + + def emit(self, event: AuditEvent) -> None: + self.events.append(event) + + +def _rule( + rule_id: str, + *, + enabled: bool = True, + matches: bool = True, + action: Action = Action.DENY, +) -> Rule: + """A rule whose ``contains`` check always (or never) matches the input.""" + return Rule( + rule_id=rule_id, + name=f"rule-{rule_id}", + clause=rule_id, + hook=LifecycleHook.BEFORE_AGENT, + action=action, + enabled=enabled, + checks=[ + Check( + conditions=[ + Condition( + operator="contains", + field="agent_input", + value="needle" if matches else "absent-needle", + ) + ], + action=action, + message=f"matched {rule_id}", + ) + ], + ) + + +def _pack(*rules: Rule) -> PolicyIndex: + idx = PolicyIndex() + idx.add_pack( + PolicyPack(name="test_pack", version="1.0", description="t", rules=list(rules)) + ) + return idx + + +def _ctx() -> CheckContext: + return CheckContext( + hook=LifecycleHook.BEFORE_AGENT, + agent_name="agent-x", + runtime_id="run-1", + trace_id="trace-1", + agent_input="needle", + ) + + +@pytest.fixture +def sink_and_manager() -> tuple[_CapturingSink, AuditManager]: + """Sink-capturing manager with no default sinks (no traces, no track_events).""" + sink = _CapturingSink() + mgr = AuditManager(async_mode=False, register_default_sinks=False) + mgr.register_sink(sink) + return sink, mgr + + +def _rule_events(sink: _CapturingSink) -> list[AuditEvent]: + return [e for e in sink.events if e.event_type == EventType.RULE_EVALUATION] + + +def _hook_summary(sink: _CapturingSink) -> AuditEvent: + summaries = [e for e in sink.events if e.event_type == EventType.HOOK_END] + assert len(summaries) == 1, f"expected 1 hook summary, got {len(summaries)}" + return summaries[0] + + +# --------------------------------------------------------------------------- +# Per-rule timing +# --------------------------------------------------------------------------- + + +def test_rule_evaluation_carries_duration_ms( + sink_and_manager: tuple[_CapturingSink, AuditManager], +) -> None: + sink, mgr = sink_and_manager + try: + evaluator = GovernanceEvaluator( + _pack(_rule("A.1.1")), + enforcement_mode=EnforcementMode.AUDIT, + audit_manager=mgr, + ) + evaluator.evaluate(_ctx()) + finally: + mgr.close() + + rule_evs = _rule_events(sink) + assert len(rule_evs) == 1 + duration = rule_evs[0].data["duration_ms"] + assert isinstance(duration, float) + assert duration >= 0.0 + + +def test_rule_evaluation_mapped_to_uipath_is_false_for_native( + sink_and_manager: tuple[_CapturingSink, AuditManager], +) -> None: + """Native rules (no guardrail_fallback) always emit ``mapped_to_uipath=False``.""" + sink, mgr = sink_and_manager + try: + evaluator = GovernanceEvaluator( + _pack(_rule("A.1.1")), + enforcement_mode=EnforcementMode.AUDIT, + audit_manager=mgr, + ) + evaluator.evaluate(_ctx()) + finally: + mgr.close() + + rule_evs = _rule_events(sink) + assert rule_evs[0].data["mapped_to_uipath"] is False + + +# --------------------------------------------------------------------------- +# Hook-summary aggregates +# --------------------------------------------------------------------------- + + +def test_hook_summary_carries_total_duration( + sink_and_manager: tuple[_CapturingSink, AuditManager], +) -> None: + sink, mgr = sink_and_manager + try: + evaluator = GovernanceEvaluator( + _pack(_rule("A.1.1"), _rule("A.1.2", matches=False)), + enforcement_mode=EnforcementMode.AUDIT, + audit_manager=mgr, + ) + evaluator.evaluate(_ctx()) + finally: + mgr.close() + + summary = _hook_summary(sink) + duration = summary.data["duration_ms"] + assert isinstance(duration, float) + assert duration >= 0.0 + + +def test_hook_summary_tracks_skipped_disabled_rules( + sink_and_manager: tuple[_CapturingSink, AuditManager], +) -> None: + """Disabled rules appear in ``skipped_policy_names`` (and skipped_count).""" + sink, mgr = sink_and_manager + try: + evaluator = GovernanceEvaluator( + _pack( + _rule("A.1.1"), # enabled, matches + _rule("A.1.2", enabled=False), # DISABLED + _rule("A.1.3", enabled=False), # DISABLED + ), + enforcement_mode=EnforcementMode.AUDIT, + audit_manager=mgr, + ) + evaluator.evaluate(_ctx()) + finally: + mgr.close() + + summary = _hook_summary(sink) + assert summary.data["skipped_count"] == 2 + assert set(summary.data["skipped_policy_names"]) == {"A.1.2", "A.1.3"} + + +def test_hook_summary_passed_and_denied_counts( + sink_and_manager: tuple[_CapturingSink, AuditManager], +) -> None: + """One match + two non-matches → denied=1, passed=2, matched_rules=1.""" + sink, mgr = sink_and_manager + try: + evaluator = GovernanceEvaluator( + _pack( + _rule("A.1.1"), # matches (DENY) + _rule("A.1.2", matches=False), # passes + _rule("A.1.3", matches=False), # passes + ), + enforcement_mode=EnforcementMode.AUDIT, + audit_manager=mgr, + ) + evaluator.evaluate(_ctx()) + finally: + mgr.close() + + summary = _hook_summary(sink) + assert summary.data["denied_count"] == 1 + assert summary.data["passed_count"] == 2 + # ``matched_rules`` keeps its historical "any check matched" sense. + assert summary.data["matched_rules"] == 1 + + +def test_hook_summary_matched_allow_rule_counts_as_passed( + sink_and_manager: tuple[_CapturingSink, AuditManager], +) -> None: + """A matched rule with action=ALLOW is a positive informational match. + + It should NOT contribute to ``denied_count`` — it rolls into + ``passed_count`` instead. ``matched_rules`` (raw count) still + includes it for backward compatibility with the legacy traces sink. + """ + sink, mgr = sink_and_manager + try: + evaluator = GovernanceEvaluator( + _pack( + _rule("A.1.1"), # matches (DENY) + _rule("A.1.2", action=Action.ALLOW), # matches (ALLOW) — positive + _rule("A.1.3", matches=False), # passes (unmatched) + ), + enforcement_mode=EnforcementMode.AUDIT, + audit_manager=mgr, + ) + evaluator.evaluate(_ctx()) + finally: + mgr.close() + + summary = _hook_summary(sink) + # 3 total rules; only the explicit DENY counts as a denial. + assert summary.data["denied_count"] == 1 + # ``passed_count`` includes the unmatched rule AND the matched-allow rule. + assert summary.data["passed_count"] == 2 + # Raw ``matched_rules`` still reflects "any check matched" — both + # the DENY and the ALLOW match contribute. + assert summary.data["matched_rules"] == 2 + + +def test_hook_summary_guardrail_dispatched_count_for_native_rules_is_zero( + sink_and_manager: tuple[_CapturingSink, AuditManager], +) -> None: + """Without guardrail_fallback conditions, dispatched count is 0.""" + sink, mgr = sink_and_manager + try: + evaluator = GovernanceEvaluator( + _pack(_rule("A.1.1")), + enforcement_mode=EnforcementMode.AUDIT, + audit_manager=mgr, + ) + evaluator.evaluate(_ctx()) + finally: + mgr.close() + + summary = _hook_summary(sink) + assert summary.data["guardrail_dispatched_count"] == 0 diff --git a/tests/test_governance_metadata.py b/tests/test_governance_metadata.py new file mode 100644 index 0000000..0feb264 --- /dev/null +++ b/tests/test_governance_metadata.py @@ -0,0 +1,81 @@ +"""Tests for :class:`GovernanceRuntimeMetadata`. + +The dataclass carries the per-runtime constants every governance +telemetry event stamps. Defaults must keep telemetry flowing when the +host hasn't populated agent type / framework yet, and the runtime +version must resolve from installed package metadata when available. +""" + +from __future__ import annotations + +from unittest.mock import patch + +from uipath.runtime.governance._audit.metadata import ( + NATIVE_EXECUTION_ENGINE, + GovernanceRuntimeMetadata, + _resolve_runtime_version, +) + + +def test_defaults() -> None: + """Default-constructed metadata uses native engine + ``unknown`` slots.""" + meta = GovernanceRuntimeMetadata() + assert meta.execution_engine == NATIVE_EXECUTION_ENGINE + assert meta.agent_type == "unknown" + assert meta.agent_framework == "unknown" + # runtime_version is resolved at construction; either real or "unknown" + assert isinstance(meta.runtime_version, str) + assert meta.runtime_version != "" + + +def test_explicit_overrides_persist() -> None: + """Host-supplied values override the defaults verbatim.""" + meta = GovernanceRuntimeMetadata( + execution_engine="agt", + agent_type="uipath_coded", + agent_framework="langchain", + runtime_version="1.2.3", + ) + assert meta.execution_engine == "agt" + assert meta.agent_type == "uipath_coded" + assert meta.agent_framework == "langchain" + assert meta.runtime_version == "1.2.3" + + +def test_frozen() -> None: + """Dataclass is frozen — host can't mutate per-run constants mid-run.""" + meta = GovernanceRuntimeMetadata() + try: + meta.agent_type = "other" # type: ignore[misc] + except Exception as exc: + assert "frozen" in str(exc).lower() or "cannot assign" in str(exc).lower() + else: + raise AssertionError("frozen dataclass must reject attribute writes") + + +def test_as_payload_contains_all_four_fields() -> None: + """``as_payload`` is the canonical merge-into-event-data dict shape.""" + meta = GovernanceRuntimeMetadata( + execution_engine="agt", + agent_type="uipath_coded", + agent_framework="langchain", + runtime_version="1.2.3", + ) + payload = meta.as_payload() + assert payload == { + "execution_engine": "agt", + "agent_type": "uipath_coded", + "agent_framework": "langchain", + "runtime_version": "1.2.3", + } + + +def test_runtime_version_fallback_on_missing_package() -> None: + """A source checkout with no installed metadata falls back to ``unknown``.""" + from importlib.metadata import PackageNotFoundError + + with patch( + "uipath.runtime.governance._audit.metadata.version", + side_effect=PackageNotFoundError("uipath-runtime"), + ): + assert _resolve_runtime_version() == "unknown" diff --git a/tests/test_track_events_sink.py b/tests/test_track_events_sink.py new file mode 100644 index 0000000..31312b8 --- /dev/null +++ b/tests/test_track_events_sink.py @@ -0,0 +1,405 @@ +"""Tests for :class:`TrackEventAuditSink`. + +Verifies the volume-control filter (``accepts``), event-name vocabulary, +payload shape (runtime metadata + per-event fields), and trace-id → +operation-id correlation. Uses a capture-callable in place of the real +``UiPathPlatformGovernanceProvider.track_event`` so no HTTP fires. +""" + +from __future__ import annotations + +from datetime import datetime, timezone +from typing import Any + +import pytest +from uipath.core.governance import EnforcementMode + +from uipath.runtime.governance._audit.base import AuditEvent, EventType +from uipath.runtime.governance._audit.metadata import GovernanceRuntimeMetadata +from uipath.runtime.governance._audit.track_events import ( + EVENT_HOOK_SUMMARY, + EVENT_RULE_DENIED, + TrackEventAuditSink, +) + + +class _Capture: + """Stand-in for ``provider.track_event`` — records every call.""" + + def __init__(self) -> None: + self.calls: list[dict[str, Any]] = [] + + def __call__( + self, + *, + event_name: str, + data: dict[str, Any] | None = None, + operation_id: str | None = None, + ) -> None: + self.calls.append( + {"event_name": event_name, "data": data, "operation_id": operation_id} + ) + + +@pytest.fixture +def capture() -> _Capture: + return _Capture() + + +@pytest.fixture +def metadata() -> GovernanceRuntimeMetadata: + return GovernanceRuntimeMetadata( + execution_engine="uipath_native_governance_checker", + agent_type="uipath_coded", + agent_framework="langchain", + runtime_version="9.9.9-test", + ) + + +@pytest.fixture +def sink( + capture: _Capture, metadata: GovernanceRuntimeMetadata +) -> TrackEventAuditSink: + return TrackEventAuditSink(capture, metadata) + + +def _rule_event( + *, + matched: bool, + action: str = "deny", + mode: EnforcementMode = EnforcementMode.AUDIT, + trace_id: str = "trace-1", + duration_ms: float = 12.5, + mapped_to_uipath: bool = False, +) -> AuditEvent: + return AuditEvent( + event_type=EventType.RULE_EVALUATION, + trace_id=trace_id, + agent_name="agent-x", + hook="after_model", + data={ + "policy_id": "A.6.1.4", + "rule_name": "commitment-language", + "pack_name": "iso42001", + "matched": matched, + "action": action, + "enforcement_mode": mode, + "detail": "Detected a commitment phrase.", + "duration_ms": duration_ms, + "mapped_to_uipath": mapped_to_uipath, + }, + timestamp=datetime(2026, 6, 25, 12, 0, 0, tzinfo=timezone.utc), + ) + + +def _hook_event( + *, + mode: EnforcementMode = EnforcementMode.AUDIT, + duration_ms: float = 45.0, + passed_count: int = 4, + denied_count: int = 1, + skipped_policy_names: list[str] | None = None, + guardrail_dispatched_count: int = 0, + trace_id: str = "trace-1", +) -> AuditEvent: + return AuditEvent( + event_type=EventType.HOOK_END, + trace_id=trace_id, + agent_name="agent-x", + hook="after_model", + data={ + "total_rules": passed_count + denied_count, + "matched_rules": denied_count, + "final_action": "audit", + "enforcement_mode": mode, + "duration_ms": duration_ms, + "passed_count": passed_count, + "denied_count": denied_count, + "skipped_count": len(skipped_policy_names or []), + "skipped_policy_names": list(skipped_policy_names or []), + "guardrail_dispatched_count": guardrail_dispatched_count, + }, + timestamp=datetime(2026, 6, 25, 12, 0, 0, tzinfo=timezone.utc), + ) + + +# --------------------------------------------------------------------------- +# accepts() — volume-control filter +# --------------------------------------------------------------------------- + + +def test_accepts_matched_rule_evaluation(sink: TrackEventAuditSink) -> None: + assert sink.accepts(_rule_event(matched=True)) is True + + +def test_rejects_unmatched_rule_evaluation(sink: TrackEventAuditSink) -> None: + assert sink.accepts(_rule_event(matched=False)) is False + + +def test_rejects_matched_allow_rule(sink: TrackEventAuditSink) -> None: + """A matched rule with action=allow is a positive informational + match — it should NOT trigger the ``rule.denied`` stream. It rolls + into the hook summary's ``passed_count`` instead. + """ + assert sink.accepts(_rule_event(matched=True, action="allow")) is False + + +def test_direct_emit_of_unmatched_rule_is_noop( + sink: TrackEventAuditSink, capture: _Capture +) -> None: + """Defense-in-depth: even if a caller bypasses ``accepts``, passed + rules must not be routed to ``governance.rule.denied``. + + The AuditManager dispatch path always consults ``accepts`` first, + but a direct ``sink.emit(...)`` call (tests, future alternate + dispatch) must still produce zero ``track_event`` calls for a + ``matched=False`` rule. + """ + sink.emit(_rule_event(matched=False)) + assert capture.calls == [] + + +def test_direct_emit_of_matched_allow_rule_is_noop( + sink: TrackEventAuditSink, capture: _Capture +) -> None: + """Defense-in-depth equivalent of the matched-but-allow filter.""" + sink.emit(_rule_event(matched=True, action="allow")) + assert capture.calls == [] + + +# --------------------------------------------------------------------------- +# DISABLED mode — zero telemetry across every event type +# --------------------------------------------------------------------------- + + +def test_rejects_disabled_mode_rule_evaluation(sink: TrackEventAuditSink) -> None: + """Governance off → ``accepts`` returns False for rule events.""" + assert ( + sink.accepts(_rule_event(matched=True, mode=EnforcementMode.DISABLED)) + is False + ) + + +def test_rejects_disabled_mode_hook_summary(sink: TrackEventAuditSink) -> None: + """Governance off → ``accepts`` returns False for hook summaries.""" + assert sink.accepts(_hook_event(mode=EnforcementMode.DISABLED)) is False + + +def test_direct_emit_of_disabled_rule_is_noop( + sink: TrackEventAuditSink, capture: _Capture +) -> None: + """Defense-in-depth: ``emit`` drops DISABLED-mode events too.""" + sink.emit(_rule_event(matched=True, mode=EnforcementMode.DISABLED)) + assert capture.calls == [] + + +def test_direct_emit_of_disabled_hook_is_noop( + sink: TrackEventAuditSink, capture: _Capture +) -> None: + """Defense-in-depth for hook summary in DISABLED mode.""" + sink.emit(_hook_event(mode=EnforcementMode.DISABLED)) + assert capture.calls == [] + + +# --------------------------------------------------------------------------- +# Empty hook summary suppression +# --------------------------------------------------------------------------- + + +def test_rejects_empty_hook_summary(sink: TrackEventAuditSink) -> None: + """``total_rules=0`` AND ``skipped_count=0`` → nothing to report.""" + ev = _hook_event(passed_count=0, denied_count=0, skipped_policy_names=[]) + assert sink.accepts(ev) is False + + +def test_accepts_hook_summary_with_only_skipped(sink: TrackEventAuditSink) -> None: + """Hook with ``total=0`` but ``skipped_count>0`` is still operator-useful.""" + ev = _hook_event( + passed_count=0, + denied_count=0, + skipped_policy_names=["A.1.1", "A.1.2"], + ) + assert sink.accepts(ev) is True + + +def test_direct_emit_of_empty_hook_is_noop( + sink: TrackEventAuditSink, capture: _Capture +) -> None: + """Defense-in-depth equivalent of the empty-hook filter.""" + sink.emit(_hook_event(passed_count=0, denied_count=0, skipped_policy_names=[])) + assert capture.calls == [] + + +def test_accepts_hook_end(sink: TrackEventAuditSink) -> None: + assert sink.accepts(_hook_event()) is True + + +def test_rejects_other_event_types(sink: TrackEventAuditSink) -> None: + for et in ( + EventType.HOOK_START, + EventType.SESSION_START, + EventType.SESSION_END, + EventType.POLICY_VIOLATION, + EventType.POLICY_ALLOW, + EventType.PACKS_LOADED, + ): + ev = AuditEvent(event_type=et, agent_name="agent-x") + assert sink.accepts(ev) is False, f"sink must drop {et}" + + +# --------------------------------------------------------------------------- +# emit() — rule-denied event shape +# --------------------------------------------------------------------------- + + +def test_rule_denied_event_name_and_operation_id( + sink: TrackEventAuditSink, capture: _Capture +) -> None: + sink.emit(_rule_event(matched=True, trace_id="trace-7")) + assert len(capture.calls) == 1 + call = capture.calls[0] + assert call["event_name"] == EVENT_RULE_DENIED + assert call["operation_id"] == "trace-7" + + +def test_rule_denied_payload_carries_metadata( + sink: TrackEventAuditSink, capture: _Capture +) -> None: + sink.emit(_rule_event(matched=True)) + data = capture.calls[0]["data"] + assert data["execution_engine"] == "uipath_native_governance_checker" + assert data["agent_type"] == "uipath_coded" + assert data["agent_framework"] == "langchain" + assert data["runtime_version"] == "9.9.9-test" + assert data["agent_name"] == "agent-x" + assert data["hook"] == "after_model" + + +def test_rule_denied_payload_splits_pack_and_clause( + sink: TrackEventAuditSink, capture: _Capture +) -> None: + """``pack`` and ``clause`` are separate fields so KQL can aggregate.""" + sink.emit(_rule_event(matched=True)) + data = capture.calls[0]["data"] + assert data["pack"] == "iso42001" + assert data["clause"] == "A.6.1.4" + assert data["rule_name"] == "commitment-language" + + +def test_rule_denied_audit_mode_collapses_to_audit( + sink: TrackEventAuditSink, capture: _Capture +) -> None: + """AUDIT mode never escalates DENY past AUDIT, but evaluator_result is true.""" + sink.emit(_rule_event(matched=True, action="deny", mode=EnforcementMode.AUDIT)) + data = capture.calls[0]["data"] + assert data["mode"] == "AUDIT" + assert data["evaluator_result"] == "DENY" + assert data["action_applied"] == "AUDIT" + + +def test_rule_denied_enforce_mode_deny( + sink: TrackEventAuditSink, capture: _Capture +) -> None: + sink.emit(_rule_event(matched=True, action="deny", mode=EnforcementMode.ENFORCE)) + data = capture.calls[0]["data"] + assert data["mode"] == "ENFORCE" + assert data["evaluator_result"] == "DENY" + assert data["action_applied"] == "DENY" + + +def test_rule_denied_enforce_mode_escalate_is_hitl( + sink: TrackEventAuditSink, capture: _Capture +) -> None: + sink.emit( + _rule_event(matched=True, action="escalate", mode=EnforcementMode.ENFORCE) + ) + data = capture.calls[0]["data"] + assert data["evaluator_result"] == "HITL" + assert data["action_applied"] == "HITL" + + +def test_rule_denied_carries_duration( + sink: TrackEventAuditSink, capture: _Capture +) -> None: + sink.emit(_rule_event(matched=True, duration_ms=42.5)) + assert capture.calls[0]["data"]["duration_ms"] == 42.5 + + +def test_rule_denied_mapped_to_uipath_passes_through( + sink: TrackEventAuditSink, capture: _Capture +) -> None: + """Field defaults False for native events but the schema is stable.""" + sink.emit(_rule_event(matched=True, mapped_to_uipath=False)) + assert capture.calls[0]["data"]["mapped_to_uipath"] is False + + +# --------------------------------------------------------------------------- +# emit() — hook-summary event shape +# --------------------------------------------------------------------------- + + +def test_hook_summary_event_name_and_operation_id( + sink: TrackEventAuditSink, capture: _Capture +) -> None: + sink.emit(_hook_event(trace_id="trace-9")) + assert len(capture.calls) == 1 + assert capture.calls[0]["event_name"] == EVENT_HOOK_SUMMARY + assert capture.calls[0]["operation_id"] == "trace-9" + + +def test_hook_summary_carries_counts( + sink: TrackEventAuditSink, capture: _Capture +) -> None: + sink.emit( + _hook_event( + passed_count=10, + denied_count=2, + skipped_policy_names=["A.1.2", "A.3.4"], + guardrail_dispatched_count=3, + ) + ) + data = capture.calls[0]["data"] + assert data["passed_count"] == 10 + assert data["denied_count"] == 2 + assert data["skipped_count"] == 2 + assert data["skipped_policy_names"] == ["A.1.2", "A.3.4"] + assert data["guardrail_dispatched_count"] == 3 + + +def test_hook_summary_duration_ms( + sink: TrackEventAuditSink, capture: _Capture +) -> None: + sink.emit(_hook_event(duration_ms=123.4)) + assert capture.calls[0]["data"]["duration_ms"] == 123.4 + + +def test_hook_summary_carries_metadata( + sink: TrackEventAuditSink, capture: _Capture +) -> None: + """Same per-runtime metadata stamping as rule-denied events.""" + sink.emit(_hook_event()) + data = capture.calls[0]["data"] + assert data["execution_engine"] == "uipath_native_governance_checker" + assert data["agent_type"] == "uipath_coded" + assert data["agent_framework"] == "langchain" + assert data["runtime_version"] == "9.9.9-test" + + +def test_hook_summary_mode_string( + sink: TrackEventAuditSink, capture: _Capture +) -> None: + sink.emit(_hook_event(mode=EnforcementMode.ENFORCE)) + assert capture.calls[0]["data"]["mode"] == "ENFORCE" + + +# --------------------------------------------------------------------------- +# operation_id fallback when trace_id is empty +# --------------------------------------------------------------------------- + + +def test_operation_id_none_when_trace_id_empty( + sink: TrackEventAuditSink, capture: _Capture +) -> None: + """Empty trace_id resolves to ``None`` so App Insights generates its own.""" + sink.emit(_rule_event(matched=True, trace_id="")) + assert capture.calls[0]["operation_id"] is None