diff --git a/src/uipath/_cli/_evals/_runtime.py b/src/uipath/_cli/_evals/_runtime.py index 8db78ab43..54f52dc29 100644 --- a/src/uipath/_cli/_evals/_runtime.py +++ b/src/uipath/_cli/_evals/_runtime.py @@ -18,6 +18,7 @@ import coverage from opentelemetry import context as context_api +from opentelemetry import trace from opentelemetry.sdk.trace import ReadableSpan, Span from opentelemetry.sdk.trace.export import SpanExporter, SpanExportResult from pydantic import BaseModel @@ -299,60 +300,72 @@ async def execute(self) -> UiPathRuntimeResult: ) try: with self._mocker_cache(): - ( - evaluation_set, - evaluators, - evaluation_iterable, - ) = await self.initiate_evaluation(runtime) - workers = self.context.workers or 1 - assert workers >= 1 - eval_run_result_list = await execute_parallel( - evaluation_iterable, workers - ) - results = UiPathEvalOutput( - evaluation_set_name=evaluation_set.name, - evaluation_set_results=eval_run_result_list, - ) + # Create the parent "Evaluation set run" span + tracer = trace.get_tracer(__name__) + span_attributes = { + "execution.id": self.execution_id, + "span_type": "eval_set_run", + } + if self.context.eval_set_run_id: + span_attributes["eval_set_run_id"] = self.context.eval_set_run_id + with tracer.start_as_current_span( + "Evaluation Set Run", + attributes=span_attributes + ): + ( + evaluation_set, + evaluators, + evaluation_iterable, + ) = await self.initiate_evaluation(runtime) + workers = self.context.workers or 1 + assert workers >= 1 + eval_run_result_list = await execute_parallel( + evaluation_iterable, workers + ) + results = UiPathEvalOutput( + evaluation_set_name=evaluation_set.name, + evaluation_set_results=eval_run_result_list, + ) - # Computing evaluator averages - evaluator_averages: dict[str, float] = defaultdict(float) - evaluator_count: dict[str, int] = defaultdict(int) - - # Check if any eval runs failed - any_failed = False - for eval_run_result in results.evaluation_set_results: - # Check if the agent execution had an error - if ( - eval_run_result.agent_execution_output - and eval_run_result.agent_execution_output.result.error - ): - any_failed = True - - for result_dto in eval_run_result.evaluation_run_results: - evaluator_averages[result_dto.evaluator_id] += ( - result_dto.result.score - ) - evaluator_count[result_dto.evaluator_id] += 1 + # Computing evaluator averages + evaluator_averages: dict[str, float] = defaultdict(float) + evaluator_count: dict[str, int] = defaultdict(int) + + # Check if any eval runs failed + any_failed = False + for eval_run_result in results.evaluation_set_results: + # Check if the agent execution had an error + if ( + eval_run_result.agent_execution_output + and eval_run_result.agent_execution_output.result.error + ): + any_failed = True + + for result_dto in eval_run_result.evaluation_run_results: + evaluator_averages[result_dto.evaluator_id] += ( + result_dto.result.score + ) + evaluator_count[result_dto.evaluator_id] += 1 - for eval_id in evaluator_averages: - evaluator_averages[eval_id] = ( - evaluator_averages[eval_id] / evaluator_count[eval_id] + for eval_id in evaluator_averages: + evaluator_averages[eval_id] = ( + evaluator_averages[eval_id] / evaluator_count[eval_id] + ) + await self.event_bus.publish( + EvaluationEvents.UPDATE_EVAL_SET_RUN, + EvalSetRunUpdatedEvent( + execution_id=self.execution_id, + evaluator_scores=evaluator_averages, + success=not any_failed, + ), + wait_for_completion=False, ) - await self.event_bus.publish( - EvaluationEvents.UPDATE_EVAL_SET_RUN, - EvalSetRunUpdatedEvent( - execution_id=self.execution_id, - evaluator_scores=evaluator_averages, - success=not any_failed, - ), - wait_for_completion=False, - ) - result = UiPathRuntimeResult( - output={**results.model_dump(by_alias=True)}, - status=UiPathRuntimeStatus.SUCCESSFUL, - ) - return result + result = UiPathRuntimeResult( + output={**results.model_dump(by_alias=True)}, + status=UiPathRuntimeStatus.SUCCESSFUL, + ) + return result finally: await runtime.dispose() @@ -378,165 +391,176 @@ async def _execute_eval( ), ) - evaluation_run_results = EvaluationRunResult( - evaluation_name=eval_item.name, evaluation_run_results=[] - ) + # Create the "Evaluation" span for this eval item + tracer = trace.get_tracer(__name__) + with tracer.start_as_current_span( + "Evaluation", + attributes={ + "execution.id": execution_id, + "span_type": "evaluation", + "eval_item_id": eval_item.id, + "eval_item_name": eval_item.name, + } + ): + evaluation_run_results = EvaluationRunResult( + evaluation_name=eval_item.name, evaluation_run_results=[] + ) - try: try: - agent_execution_output = await self.execute_runtime( - eval_item, execution_id, runtime - ) - except Exception as e: - if self.context.verbose: - if isinstance(e, EvaluationRuntimeException): - spans = e.spans - logs = e.logs - execution_time = e.execution_time - loggable_error = e.root_exception - else: - spans = [] - logs = [] - execution_time = 0 - loggable_error = e - - error_info = UiPathErrorContract( - code="RUNTIME_SHUTDOWN_ERROR", - title="Runtime shutdown failed", - detail=f"Error: {str(loggable_error)}", - category=UiPathErrorCategory.UNKNOWN, - ) - error_result = UiPathRuntimeResult( - status=UiPathRuntimeStatus.FAULTED, - error=error_info, + try: + agent_execution_output = await self.execute_runtime( + eval_item, execution_id, runtime ) + except Exception as e: + if self.context.verbose: + if isinstance(e, EvaluationRuntimeException): + spans = e.spans + logs = e.logs + execution_time = e.execution_time + loggable_error = e.root_exception + else: + spans = [] + logs = [] + execution_time = 0 + loggable_error = e + + error_info = UiPathErrorContract( + code="RUNTIME_SHUTDOWN_ERROR", + title="Runtime shutdown failed", + detail=f"Error: {str(loggable_error)}", + category=UiPathErrorCategory.UNKNOWN, + ) + error_result = UiPathRuntimeResult( + status=UiPathRuntimeStatus.FAULTED, + error=error_info, + ) + evaluation_run_results.agent_execution_output = ( + convert_eval_execution_output_to_serializable( + UiPathEvalRunExecutionOutput( + execution_time=execution_time, + result=error_result, + spans=spans, + logs=logs, + ) + ) + ) + raise + + if self.context.verbose: evaluation_run_results.agent_execution_output = ( convert_eval_execution_output_to_serializable( - UiPathEvalRunExecutionOutput( - execution_time=execution_time, - result=error_result, - spans=spans, - logs=logs, - ) + agent_execution_output ) ) - raise - - if self.context.verbose: - evaluation_run_results.agent_execution_output = ( - convert_eval_execution_output_to_serializable( - agent_execution_output + evaluation_item_results: list[EvalItemResult] = [] + + for evaluator in evaluators: + if evaluator.id not in eval_item.evaluation_criterias: + # Skip! + continue + evaluation_criteria = eval_item.evaluation_criterias[evaluator.id] + + evaluation_result = await self.run_evaluator( + evaluator=evaluator, + execution_output=agent_execution_output, + eval_item=eval_item, + evaluation_criteria=evaluator.evaluation_criteria_type( + **evaluation_criteria + ) + if evaluation_criteria + else evaluator.evaluator_config.default_evaluation_criteria, ) - ) - evaluation_item_results: list[EvalItemResult] = [] - for evaluator in evaluators: - if evaluator.id not in eval_item.evaluation_criterias: - # Skip! - continue - evaluation_criteria = eval_item.evaluation_criterias[evaluator.id] - - evaluation_result = await self.run_evaluator( - evaluator=evaluator, - execution_output=agent_execution_output, - eval_item=eval_item, - evaluation_criteria=evaluator.evaluation_criteria_type( - **evaluation_criteria + dto_result = EvaluationResultDto.from_evaluation_result( + evaluation_result ) - if evaluation_criteria - else evaluator.evaluator_config.default_evaluation_criteria, - ) - - dto_result = EvaluationResultDto.from_evaluation_result( - evaluation_result - ) - evaluation_run_results.evaluation_run_results.append( - EvaluationRunResultDto( - evaluator_name=evaluator.name, - result=dto_result, - evaluator_id=evaluator.id, + evaluation_run_results.evaluation_run_results.append( + EvaluationRunResultDto( + evaluator_name=evaluator.name, + result=dto_result, + evaluator_id=evaluator.id, + ) ) - ) - evaluation_item_results.append( - EvalItemResult( - evaluator_id=evaluator.id, - result=evaluation_result, + evaluation_item_results.append( + EvalItemResult( + evaluator_id=evaluator.id, + result=evaluation_result, + ) ) + + exception_details = None + agent_output = agent_execution_output.result.output + if agent_execution_output.result.status == UiPathRuntimeStatus.FAULTED: + error = agent_execution_output.result.error + if error is not None: + # we set the exception details for the run event + # Convert error contract to exception + error_exception = Exception( + f"{error.title}: {error.detail} (code: {error.code})" + ) + exception_details = EvalItemExceptionDetails( + exception=error_exception + ) + agent_output = error.model_dump() + + await self.event_bus.publish( + EvaluationEvents.UPDATE_EVAL_RUN, + EvalRunUpdatedEvent( + execution_id=execution_id, + eval_item=eval_item, + eval_results=evaluation_item_results, + success=not agent_execution_output.result.error, + agent_output=agent_output, + agent_execution_time=agent_execution_output.execution_time, + spans=agent_execution_output.spans, + logs=agent_execution_output.logs, + exception_details=exception_details, + ), + wait_for_completion=False, ) - exception_details = None - agent_output = agent_execution_output.result.output - if agent_execution_output.result.status == UiPathRuntimeStatus.FAULTED: - error = agent_execution_output.result.error - if error is not None: - # we set the exception details for the run event - # Convert error contract to exception - error_exception = Exception( - f"{error.title}: {error.detail} (code: {error.code})" - ) - exception_details = EvalItemExceptionDetails( - exception=error_exception + except Exception as e: + exception_details = EvalItemExceptionDetails(exception=e) + + for evaluator in evaluators: + evaluation_run_results.evaluation_run_results.append( + EvaluationRunResultDto( + evaluator_name=evaluator.name, + evaluator_id=evaluator.id, + result=EvaluationResultDto(score=0), + ) ) - agent_output = error.model_dump() - await self.event_bus.publish( - EvaluationEvents.UPDATE_EVAL_RUN, - EvalRunUpdatedEvent( + eval_run_updated_event = EvalRunUpdatedEvent( execution_id=execution_id, eval_item=eval_item, - eval_results=evaluation_item_results, - success=not agent_execution_output.result.error, - agent_output=agent_output, - agent_execution_time=agent_execution_output.execution_time, - spans=agent_execution_output.spans, - logs=agent_execution_output.logs, + eval_results=[], + success=False, + agent_output={}, + agent_execution_time=0.0, exception_details=exception_details, - ), - wait_for_completion=False, - ) - - except Exception as e: - exception_details = EvalItemExceptionDetails(exception=e) - - for evaluator in evaluators: - evaluation_run_results.evaluation_run_results.append( - EvaluationRunResultDto( - evaluator_name=evaluator.name, - evaluator_id=evaluator.id, - result=EvaluationResultDto(score=0), - ) + spans=[], + logs=[], ) + if isinstance(e, EvaluationRuntimeException): + eval_run_updated_event.spans = e.spans + eval_run_updated_event.logs = e.logs + if eval_run_updated_event.exception_details: + eval_run_updated_event.exception_details.exception = ( + e.root_exception + ) + eval_run_updated_event.exception_details.runtime_exception = True - eval_run_updated_event = EvalRunUpdatedEvent( - execution_id=execution_id, - eval_item=eval_item, - eval_results=[], - success=False, - agent_output={}, - agent_execution_time=0.0, - exception_details=exception_details, - spans=[], - logs=[], - ) - if isinstance(e, EvaluationRuntimeException): - eval_run_updated_event.spans = e.spans - eval_run_updated_event.logs = e.logs - if eval_run_updated_event.exception_details: - eval_run_updated_event.exception_details.exception = ( - e.root_exception - ) - eval_run_updated_event.exception_details.runtime_exception = True - - await self.event_bus.publish( - EvaluationEvents.UPDATE_EVAL_RUN, - eval_run_updated_event, - wait_for_completion=False, - ) - finally: - clear_execution_context() + await self.event_bus.publish( + EvaluationEvents.UPDATE_EVAL_RUN, + eval_run_updated_event, + wait_for_completion=False, + ) + finally: + clear_execution_context() - return evaluation_run_results + return evaluation_run_results async def _generate_input_for_eval( self, eval_item: EvaluationItem, runtime: UiPathRuntimeProtocol @@ -678,26 +702,37 @@ async def run_evaluator( *, evaluation_criteria: Any, ) -> EvaluationResult: - output_data: dict[str, Any] | str = {} - if execution_output.result.output: - if isinstance(execution_output.result.output, BaseModel): - output_data = execution_output.result.output.model_dump() - else: - output_data = execution_output.result.output - - agent_execution = AgentExecution( - agent_input=eval_item.inputs, - agent_output=output_data, - agent_trace=execution_output.spans, - expected_agent_behavior=eval_item.expected_agent_behavior, - ) - - result = await evaluator.validate_and_evaluate_criteria( - agent_execution=agent_execution, - evaluation_criteria=evaluation_criteria, - ) + # Create span for evaluator execution + tracer = trace.get_tracer(__name__) + with tracer.start_as_current_span( + f"Evaluator: {evaluator.name}", + attributes={ + "span_type": "evaluator", + "evaluator_id": evaluator.id, + "evaluator_name": evaluator.name, + "eval_item_id": eval_item.id, + } + ): + output_data: dict[str, Any] | str = {} + if execution_output.result.output: + if isinstance(execution_output.result.output, BaseModel): + output_data = execution_output.result.output.model_dump() + else: + output_data = execution_output.result.output + + agent_execution = AgentExecution( + agent_input=eval_item.inputs, + agent_output=output_data, + agent_trace=execution_output.spans, + expected_agent_behavior=eval_item.expected_agent_behavior, + ) - return result + result = await evaluator.validate_and_evaluate_criteria( + agent_execution=agent_execution, + evaluation_criteria=evaluation_criteria, + ) + + return result async def _get_agent_model(self, runtime: UiPathRuntimeProtocol) -> str | None: """Get agent model from the runtime. diff --git a/src/uipath/_cli/cli_eval.py b/src/uipath/_cli/cli_eval.py index 736d82ae8..f50fc1ef2 100644 --- a/src/uipath/_cli/cli_eval.py +++ b/src/uipath/_cli/cli_eval.py @@ -19,7 +19,7 @@ from uipath._utils._bindings import ResourceOverwritesContext from uipath.eval._helpers import auto_discover_entrypoint from uipath.platform.common import UiPathConfig -from uipath.tracing import LlmOpsHttpExporter +from uipath.tracing import JsonLinesFileExporter, LlmOpsHttpExporter from ._utils._console import ConsoleLogger from ._utils._eval_set import EvalHelpers @@ -98,6 +98,12 @@ def setup_reporting_prereq(no_report: bool) -> bool: default="default", help="Model settings ID from evaluation set to override agent settings (default: 'default')", ) +@click.option( + "--trace-file", + required=False, + type=click.Path(exists=False), + help="File path where the trace data will be exported for local testing", +) def eval( entrypoint: str | None, eval_set: str | None, @@ -109,6 +115,7 @@ def eval( enable_mocker_cache: bool, report_coverage: bool, model_settings_id: str, + trace_file: str | None, ) -> None: """Run an evaluation set against the agent. @@ -122,6 +129,7 @@ def eval( enable_mocker_cache: Enable caching for LLM mocker responses report_coverage: Report evaluation coverage model_settings_id: Model settings ID to override agent settings + trace_file: File path where the trace data will be exported for local testing """ should_register_progress_reporter = setup_reporting_prereq(no_report) @@ -179,6 +187,9 @@ async def execute_eval(): ) as ctx: if ctx.job_id: trace_manager.add_span_exporter(LlmOpsHttpExporter()) + + if trace_file: + trace_manager.add_span_exporter(JsonLinesFileExporter(trace_file)) project_id = UiPathConfig.project_id