diff --git a/backend/app/celery/tasks/job_execution.py b/backend/app/celery/tasks/job_execution.py index 507134e40..428d47c7f 100644 --- a/backend/app/celery/tasks/job_execution.py +++ b/backend/app/celery/tasks/job_execution.py @@ -190,6 +190,26 @@ def run_delete_collection_job( ) +@celery_app.task(bind=True, queue="low_priority", priority=1) +@gevent_timeout(settings.CELERY_TASK_SOFT_TIME_LIMIT, "run_evaluation_batch_submission") +def run_evaluation_batch_submission( + self, project_id: int, job_id: str, trace_id: str, **kwargs +): + from app.services.evaluations.batch_job import execute_evaluation_batch_submission + + _set_trace(trace_id) + return _run_with_otel_parent( + self, + lambda: execute_evaluation_batch_submission( + project_id=project_id, + job_id=job_id, + task_id=current_task.request.id, + task_instance=self, + **kwargs, + ), + ) + + @celery_app.task(bind=True, queue="low_priority", priority=1) @gevent_timeout(settings.CELERY_TASK_SOFT_TIME_LIMIT, "run_stt_batch_submission") def run_stt_batch_submission( diff --git a/backend/app/celery/utils.py b/backend/app/celery/utils.py index f0ad23ca1..49bf75c61 100644 --- a/backend/app/celery/utils.py +++ b/backend/app/celery/utils.py @@ -144,6 +144,24 @@ def start_delete_collection_job( return task_id +def start_evaluation_batch_submission( + project_id: int, job_id: str, trace_id: str = "N/A", **kwargs +) -> str: + from app.celery.tasks.job_execution import run_evaluation_batch_submission + + task_id = _enqueue_with_trace_context( + run_evaluation_batch_submission, + project_id=project_id, + job_id=job_id, + trace_id=trace_id, + **kwargs, + ) + logger.info( + f"[start_evaluation_batch_submission] Started job {job_id} with Celery task {task_id}" + ) + return task_id + + def start_stt_batch_submission( project_id: int, job_id: str, trace_id: str = "N/A", **kwargs ) -> str: diff --git a/backend/app/core/batch/gemini.py b/backend/app/core/batch/gemini.py index f121bb9ee..4222afbb6 100644 --- 
a/backend/app/core/batch/gemini.py +++ b/backend/app/core/batch/gemini.py @@ -349,7 +349,7 @@ def upload_file(self, content: str, purpose: str = "batch") -> str: uploaded_file = self._upload_to_gemini( content=content, suffix=".jsonl", - mime_type="jsonl", + mime_type="application/jsonl", display_name=f"batch-input-{int(time.time())}", ) diff --git a/backend/app/core/batch/operations.py b/backend/app/core/batch/operations.py index d02884fdb..bcd51866b 100644 --- a/backend/app/core/batch/operations.py +++ b/backend/app/core/batch/operations.py @@ -74,7 +74,8 @@ def start_batch_job( logger.error(f"[start_batch_job] Failed | {e}", exc_info=True) batch_job_update = BatchJobUpdate( - error_message=f"Batch creation failed: {str(e)}" + provider_status="failed", + error_message=f"Batch creation failed: {str(e)}", ) update_batch_job( session=session, batch_job=batch_job, batch_job_update=batch_job_update diff --git a/backend/app/crud/evaluations/batch.py b/backend/app/crud/evaluations/batch.py index 4181910b8..0ce20a4c2 100644 --- a/backend/app/crud/evaluations/batch.py +++ b/backend/app/crud/evaluations/batch.py @@ -11,15 +11,23 @@ from typing import Any from langfuse import Langfuse -from openai import OpenAI from sqlmodel import Session -from app.core.batch.base import BATCH_KEY - -from app.core.batch import OpenAIBatchProvider, start_batch_job +from app.core.batch import ( + BATCH_KEY, + GeminiBatchProvider, + OpenAIBatchProvider, + start_batch_job, +) +from app.core.batch.client import GeminiClient from app.models import EvaluationRun from app.models.batch_job import BatchJobType -from app.models.llm.request import KaapiLLMParams +from app.services.llm.mappers import ( + map_kaapi_to_google_params, + map_kaapi_to_openai_params, +) +from app.services.llm.providers.registry import LLMProvider +from app.utils import get_openai_client logger = logging.getLogger(__name__) @@ -62,149 +70,202 @@ def fetch_dataset_items(langfuse: Langfuse, dataset_name: str) -> list[dict[str, 
return items -def build_evaluation_jsonl( - dataset_items: list[dict[str, Any]], config: KaapiLLMParams +def build_openai_evaluation_jsonl( + dataset_items: list[dict[str, Any]], openai_params: dict[str, Any] ) -> list[dict[str, Any]]: + """Build OpenAI Responses API batch JSONL from Langfuse dataset items. + + Each line: + { + BATCH_KEY: , + "method": "POST", + "url": "/v1/responses", + "body": { ...openai_params, "input": } + } """ - Build JSONL data for evaluation batch using OpenAI Responses API. + jsonl_data: list[dict[str, Any]] = [] + for item in dataset_items: + question = item["input"].get("question", "") + if not question: + logger.warning( + f"[build_openai_evaluation_jsonl] Skipping item - no question found | item_id={item['id']}" + ) + continue - Each line is a dict with: - - BATCH_KEY: Unique identifier for the request (dataset item ID) - - method: POST - - url: /v1/responses - - body: Response request using config as-is with input from dataset + body = dict(openai_params) + body["input"] = question - Args: - dataset_items: List of dataset items from Langfuse - config: Evaluation configuration dict with OpenAI Responses API parameters. - This config is used as-is in the body, with only "input" being added - from the dataset. Config can include any fields like: - - model (required) - - instructions - - tools - - reasoning - - text - - temperature - - include - etc. + jsonl_data.append( + { + BATCH_KEY: item["id"], + "method": "POST", + "url": "/v1/responses", + "body": body, + } + ) + return jsonl_data - Returns: - List of dictionaries (JSONL data) + +def build_google_evaluation_jsonl( + dataset_items: list[dict[str, Any]], google_params: dict[str, Any] +) -> list[dict[str, Any]]: + """Build Gemini batch JSONL from Langfuse dataset items. + + Each line: + { + "key": , + "request": { contents, systemInstruction?, generationConfig? 
} + } """ - jsonl_data = [] + jsonl_data: list[dict[str, Any]] = [] + system_instruction = google_params.get("instructions") + + generation_config: dict[str, Any] = {} + temperature = google_params.get("temperature") + if temperature is not None: + generation_config["temperature"] = temperature + reasoning = google_params.get("reasoning") + if reasoning: + generation_config["thinkingConfig"] = { + "includeThoughts": False, + "thinkingLevel": reasoning, + } + for item in dataset_items: - # Extract question from input question = item["input"].get("question", "") if not question: logger.warning( - f"[build_evaluation_jsonl] Skipping item - no question found | item_id={item['id']}" + f"[build_google_evaluation_jsonl] Skipping item - no question found | item_id={item['id']}" ) continue - # Build the batch request object for Responses API - # Use config as-is and only add the input field - body: dict[str, Any] = { - "model": config.model, - "instructions": config.instructions, - "input": question, # Add input from dataset + request: dict[str, Any] = { + "contents": [{"parts": [{"text": question}], "role": "user"}], } + if system_instruction: + request["systemInstruction"] = {"parts": [{"text": system_instruction}]} + if generation_config: + request["generationConfig"] = generation_config - if "temperature" in config.model_fields_set: - body["temperature"] = config.temperature - - # Add reasoning only if provided - if config.reasoning: - body["reasoning"] = {"effort": config.reasoning} - - # Add tools only if knowledge_base_ids are provided - if config.knowledge_base_ids: - body["tools"] = [ - { - "type": "file_search", - "vector_store_ids": config.knowledge_base_ids, - "max_num_results": config.max_num_results or 20, - } - ] - - batch_request = { - BATCH_KEY: item["id"], - "method": "POST", - "url": "/v1/responses", - "body": body, - } + jsonl_data.append({"key": item["id"], "request": request}) - jsonl_data.append(batch_request) return jsonl_data def 
start_evaluation_batch( langfuse: Langfuse, - openai_client: OpenAI, session: Session, eval_run: EvaluationRun, - config: KaapiLLMParams, + params: dict[str, Any], + provider: str, ) -> EvaluationRun: """ - Fetch data, build JSONL, and start evaluation batch. - - This function orchestrates the evaluation-specific logic and delegates - to the generic batch infrastructure for actual batch creation. + Fetch dataset, build JSONL, submit batch via the appropriate provider. Args: langfuse: Configured Langfuse client - openai_client: Configured OpenAI client session: Database session - eval_run: EvaluationRun database object (with run_name, dataset_name, config) - config: KaapiLLMParams with model, instructions, knowledge_base_ids, etc. + eval_run: EvaluationRun database object + params: Kaapi-standardized completion params (dict) + provider: Completion provider ("openai" or "google-aistudio", with optional "-native" suffix) Returns: Updated EvaluationRun with batch_job_id populated - - Raises: - Exception: If any step fails """ try: - # Step 1: Fetch dataset items from Langfuse logger.info( - f"[start_evaluation_batch] Starting evaluation batch | run={eval_run.run_name}" + f"[start_evaluation_batch] Starting evaluation batch | run={eval_run.run_name} | provider={provider}" ) dataset_items = fetch_dataset_items( langfuse=langfuse, dataset_name=eval_run.dataset_name ) - # Step 2: Build evaluation-specific JSONL - jsonl_data = build_evaluation_jsonl(dataset_items=dataset_items, config=config) + base_provider = provider.replace("-native", "") - if not jsonl_data: - raise ValueError( - "Evaluation dataset did not produce any JSONL entries (missing questions?)." 
+ if base_provider == LLMProvider.OPENAI: + mapped_params, warnings = map_kaapi_to_openai_params( + session=session, kaapi_params=params ) + if warnings: + logger.info("[start_evaluation_batch] Mapper warnings: %s", warnings) - # Step 3: Create batch provider - provider = OpenAIBatchProvider(client=openai_client) + jsonl_data = build_openai_evaluation_jsonl( + dataset_items=dataset_items, openai_params=mapped_params + ) + if not jsonl_data: + raise ValueError( + "Evaluation dataset did not produce any JSONL entries (missing questions?)." + ) + + openai_client = get_openai_client( + session=session, + org_id=eval_run.organization_id, + project_id=eval_run.project_id, + ) + batch_provider = OpenAIBatchProvider(client=openai_client) - # Step 4: Prepare batch configuration - batch_config = { - "endpoint": "/v1/responses", - "description": f"Evaluation: {eval_run.run_name}", - "completion_window": "24h", - # Store complete config for reference - "evaluation_config": config.model_dump(exclude_unset=True), - } + batch_config = { + "endpoint": "/v1/responses", + "description": f"Evaluation: {eval_run.run_name}", + "completion_window": "24h", + "evaluation_config": params, + } - # Step 5: Start batch job using generic infrastructure - batch_job = start_batch_job( - session=session, - provider=provider, - provider_name="openai", - job_type=BatchJobType.EVALUATION, - organization_id=eval_run.organization_id, - project_id=eval_run.project_id, - jsonl_data=jsonl_data, - config=batch_config, - ) + batch_job = start_batch_job( + session=session, + provider=batch_provider, + provider_name="openai", + job_type=BatchJobType.EVALUATION, + organization_id=eval_run.organization_id, + project_id=eval_run.project_id, + jsonl_data=jsonl_data, + config=batch_config, + ) + + elif base_provider == LLMProvider.GOOGLE_AISTUDIO: + mapped_params, warnings = map_kaapi_to_google_params( + kaapi_params=params, completion_type="text" + ) + if warnings: + logger.info("[start_evaluation_batch] Mapper 
warnings: %s", warnings) + + jsonl_data = build_google_evaluation_jsonl( + dataset_items=dataset_items, google_params=mapped_params + ) + if not jsonl_data: + raise ValueError( + "Evaluation dataset did not produce any JSONL entries (missing questions?)." + ) + + gemini_client = GeminiClient.from_credentials( + session=session, + org_id=eval_run.organization_id, + project_id=eval_run.project_id, + ) + model_name = mapped_params.get("model", "gemini-2.5-pro") + batch_provider = GeminiBatchProvider( + client=gemini_client.client, model=model_name + ) + + batch_config = { + "display_name": f"evaluation-{eval_run.run_name}", + "model": f"models/{model_name}", + } + + batch_job = start_batch_job( + session=session, + provider=batch_provider, + provider_name="google-aistudio", + job_type=BatchJobType.EVALUATION, + organization_id=eval_run.organization_id, + project_id=eval_run.project_id, + jsonl_data=jsonl_data, + config=batch_config, + ) + + else: + raise ValueError(f"Unsupported provider for evaluation batches: {provider}") - # Step 6: Link batch_job to evaluation_run eval_run.batch_job_id = batch_job.id eval_run.status = "processing" eval_run.total_items = batch_job.total_items @@ -217,7 +278,8 @@ def start_evaluation_batch( f"[start_evaluation_batch] Successfully started evaluation batch | " f"batch_job_id={batch_job.id} | " f"provider_batch_id={batch_job.provider_batch_id} | " - f"run={eval_run.run_name} | items={batch_job.total_items}" + f"run={eval_run.run_name} | items={batch_job.total_items} | " + f"provider={base_provider}" ) return eval_run diff --git a/backend/app/crud/evaluations/langfuse.py b/backend/app/crud/evaluations/langfuse.py index 98c38990c..4d1255c7a 100644 --- a/backend/app/crud/evaluations/langfuse.py +++ b/backend/app/crud/evaluations/langfuse.py @@ -156,11 +156,17 @@ def create_langfuse_dataset_run( trace_id_mapping[item_id] = trace_id except Exception as e: - logger.error( - f"[create_langfuse_dataset_run] Failed to create trace | " - 
f"item_id={item_id} | {e}", - exc_info=True, - ) + if getattr(e, "status_code", None) == 429: + logger.error( + f"[create_langfuse_dataset_run] Langfuse rate limit (429) | " + f"item_id={item_id}" + ) + else: + logger.error( + f"[create_langfuse_dataset_run] Failed to create trace | " + f"item_id={item_id} | {e}", + exc_info=True, + ) continue langfuse.flush() diff --git a/backend/app/crud/evaluations/processing.py b/backend/app/crud/evaluations/processing.py index 2fa166546..d1115f2be 100644 --- a/backend/app/crud/evaluations/processing.py +++ b/backend/app/crud/evaluations/processing.py @@ -9,6 +9,7 @@ """ import ast +import asyncio import json import logging from collections import defaultdict @@ -20,13 +21,17 @@ from sqlmodel import Session, select from app.core.batch import ( + GeminiBatchProvider, OpenAIBatchProvider, download_batch_results, poll_batch_status, upload_batch_results_to_object_store, ) -from app.core.batch.base import BATCH_KEY +from app.core.batch.base import BATCH_KEY, BatchProvider +from app.core.batch.client import GeminiClient +from app.core.batch.gemini import BatchJobState, extract_text_from_response_dict from app.crud.evaluations.batch import fetch_dataset_items +from app.services.llm.providers.registry import LLMProvider from app.crud.evaluations.core import resolve_model_from_config, update_evaluation_run from app.crud.evaluations.cost import attach_cost from app.crud.evaluations.embeddings import ( @@ -48,6 +53,35 @@ logger = logging.getLogger(__name__) +def _get_batch_provider( + session: Session, + provider_name: str, + organization_id: int, + project_id: int, +) -> BatchProvider: + """Get appropriate batch provider instance for the main response batch.""" + if provider_name in (LLMProvider.OPENAI, LLMProvider.OPENAI_NATIVE): + openai_client = get_openai_client( + session=session, + org_id=organization_id, + project_id=project_id, + ) + return OpenAIBatchProvider(client=openai_client) + + if provider_name in ( + 
LLMProvider.GOOGLE_AISTUDIO, + LLMProvider.GOOGLE_AISTUDIO_NATIVE, + ): + gemini_client = GeminiClient.from_credentials( + session=session, + org_id=organization_id, + project_id=project_id, + ) + return GeminiBatchProvider(client=gemini_client.client) + + raise ValueError(f"Unsupported provider for evaluation polling: {provider_name}") + + def _extract_batch_error_message( provider: OpenAIBatchProvider, error_file_id: str, @@ -116,8 +150,35 @@ def _extract_batch_error_message( return error_msg +def _extract_gemini_usage(response: dict[str, Any]) -> dict[str, Any] | None: + """Map Gemini usageMetadata to OpenAI-style usage dict for cost tracking.""" + usage_metadata = response.get("usageMetadata") or response.get("usage_metadata") + if not isinstance(usage_metadata, dict): + return None + input_tokens = usage_metadata.get("promptTokenCount") or usage_metadata.get( + "prompt_token_count" + ) + output_tokens = usage_metadata.get("candidatesTokenCount") or usage_metadata.get( + "candidates_token_count" + ) + total_tokens = usage_metadata.get("totalTokenCount") or usage_metadata.get( + "total_token_count" + ) + reasoning_tokens = usage_metadata.get("thoughtsTokenCount") or usage_metadata.get( + "thoughts_token_count" + ) + return { + "input_tokens": input_tokens or 0, + "output_tokens": output_tokens or 0, + "total_tokens": total_tokens or 0, + "reasoning_tokens": reasoning_tokens or 0, + } + + def parse_evaluation_output( - raw_results: list[dict[str, Any]], dataset_items: list[dict[str, Any]] + raw_results: list[dict[str, Any]], + dataset_items: list[dict[str, Any]], + provider_name: str = LLMProvider.OPENAI, ) -> list[dict[str, Any]]: """ Parse batch output into evaluation results. 
@@ -149,20 +210,22 @@ def parse_evaluation_output( """ # Create lookup map for dataset items by ID dataset_map = {item["id"]: item for item in dataset_items} + is_google = provider_name in ( + LLMProvider.GOOGLE_AISTUDIO, + LLMProvider.GOOGLE_AISTUDIO_NATIVE, + ) results = [] for line_num, response in enumerate(raw_results, 1): try: - # Extract BATCH_KEY (which is our dataset item ID) - item_id = response.get(BATCH_KEY) + item_id = response.get(BATCH_KEY) or response.get("key") if not item_id: logger.warning( - f"[parse_evaluation_output] No {BATCH_KEY} found, skipping | line={line_num}" + f"[parse_evaluation_output] No item_id found, skipping | line={line_num}" ) continue - # Get original dataset item dataset_item = dataset_map.get(item_id) if not dataset_item: logger.warning( @@ -170,64 +233,70 @@ def parse_evaluation_output( ) continue - # Extract the response body - response_body = response.get("response", {}).get("body", {}) + generated_output = "" + response_id: str | None = None + usage: dict[str, Any] | None = None - # Extract response ID from response.body.id - response_id = response_body.get("id") + if is_google: + gemini_response = response.get("response") + error = response.get("error") + if error: + error_msg = str(error) + logger.warning( + f"[parse_evaluation_output] Item had error | item_id={item_id} | {error_msg}" + ) + generated_output = f"ERROR: {error_msg}" + elif isinstance(gemini_response, dict): + text = extract_text_from_response_dict(gemini_response) + generated_output = text or "" + response_id = gemini_response.get( + "responseId" + ) or gemini_response.get("response_id") + usage = _extract_gemini_usage(gemini_response) + else: + response_body = response.get("response", {}).get("body", {}) + response_id = response_body.get("id") + usage = response_body.get("usage") - # Extract usage information for cost tracking - usage = response_body.get("usage") + if response.get("error"): + error_msg = response["error"].get("message", "Unknown error") 
+ logger.warning( + f"[parse_evaluation_output] Item had error | item_id={item_id} | {error_msg}" + ) + generated_output = f"ERROR: {error_msg}" + else: + output = response_body.get("output", "") - # Handle errors in batch processing - if response.get("error"): - error_msg = response["error"].get("message", "Unknown error") - logger.warning( - f"[parse_evaluation_output] Item had error | item_id={item_id} | {error_msg}" - ) - generated_output = f"ERROR: {error_msg}" - else: - # Extract text from output (can be string, list, or complex structure) - output = response_body.get("output", "") - - # If string, try to parse it (may be JSON or Python repr of list) - if isinstance(output, str): - try: - output = json.loads(output) - except (json.JSONDecodeError, ValueError): + if isinstance(output, str): try: - output = ast.literal_eval(output) - except (ValueError, SyntaxError): - # Keep as string if parsing fails - generated_output = output - output = None - - # If we have a list structure, extract text from message items - if isinstance(output, list): - generated_output = "" - for item in output: - if isinstance(item, dict) and item.get("type") == "message": - for content in item.get("content", []): - if ( - isinstance(content, dict) - and content.get("type") == "output_text" - ): - generated_output = content.get("text", "") + output = json.loads(output) + except (json.JSONDecodeError, ValueError): + try: + output = ast.literal_eval(output) + except (ValueError, SyntaxError): + generated_output = output + output = None + + if isinstance(output, list): + for item in output: + if isinstance(item, dict) and item.get("type") == "message": + for content in item.get("content", []): + if ( + isinstance(content, dict) + and content.get("type") == "output_text" + ): + generated_output = content.get("text", "") + break + if generated_output: break - if generated_output: - break - elif output is not None: - # output was not a string and not a list - generated_output = "" - 
logger.warning( - f"[parse_evaluation_output] Unexpected output type | item_id={item_id} | type={type(output)}" - ) + elif output is not None: + generated_output = "" + logger.warning( + f"[parse_evaluation_output] Unexpected output type | item_id={item_id} | type={type(output)}" + ) - # Extract question and ground truth from dataset item question = dataset_item["input"].get("question", "") ground_truth = dataset_item["expected_output"].get("answer", "") - - # Extract question_id from dataset item metadata question_id = dataset_item.get("metadata", {}).get("question_id") results.append( @@ -301,8 +370,15 @@ async def process_completed_evaluation( logger.info( f"[process_completed_evaluation] {log_prefix} Downloading batch results | batch_job_id={batch_job.id}" ) - provider = OpenAIBatchProvider(client=openai_client) - raw_results = download_batch_results(provider=provider, batch_job=batch_job) + provider = _get_batch_provider( + session=session, + provider_name=batch_job.provider, + organization_id=eval_run.organization_id, + project_id=eval_run.project_id, + ) + raw_results = await asyncio.to_thread( + download_batch_results, provider=provider, batch_job=batch_job + ) # Step 2a: Upload raw results to object store for evaluation_run object_store_url = None @@ -319,13 +395,17 @@ async def process_completed_evaluation( logger.info( f"[process_completed_evaluation] {log_prefix} Fetching dataset items | dataset={eval_run.dataset_name}" ) - dataset_items = fetch_dataset_items( - langfuse=langfuse, dataset_name=eval_run.dataset_name + dataset_items = await asyncio.to_thread( + fetch_dataset_items, + langfuse=langfuse, + dataset_name=eval_run.dataset_name, ) # Step 4: Parse evaluation results results = parse_evaluation_output( - raw_results=raw_results, dataset_items=dataset_items + raw_results=raw_results, + dataset_items=dataset_items, + provider_name=batch_job.provider, ) if not results: @@ -349,7 +429,8 @@ async def process_completed_evaluation( 
update=EvaluationRunUpdate(cost=eval_run.cost), ) - trace_id_mapping = create_langfuse_dataset_run( + trace_id_mapping = await asyncio.to_thread( + create_langfuse_dataset_run, langfuse=langfuse, dataset_name=eval_run.dataset_name, model=model, @@ -366,7 +447,8 @@ async def process_completed_evaluation( # Step 6: Start embedding batch for similarity scoring # Pass trace_id_mapping directly without storing in DB try: - eval_run = start_embedding_batch( + eval_run = await asyncio.to_thread( + start_embedding_batch, session=session, openai_client=openai_client, eval_run=eval_run, @@ -464,8 +546,10 @@ async def process_completed_embedding_batch( # Step 2: Create provider and download results provider = OpenAIBatchProvider(client=openai_client) - raw_results = download_batch_results( - provider=provider, batch_job=embedding_batch_job + raw_results = await asyncio.to_thread( + download_batch_results, + provider=provider, + batch_job=embedding_batch_job, ) # Step 3: Parse embedding results @@ -498,7 +582,8 @@ async def process_completed_embedding_batch( per_item_scores = similarity_stats.get("per_item_scores", []) if per_item_scores: try: - update_traces_with_cosine_scores( + await asyncio.to_thread( + update_traces_with_cosine_scores, langfuse=langfuse, per_item_scores=per_item_scores, ) @@ -595,8 +680,11 @@ async def check_and_process_evaluation( if embedding_batch_job: # Poll embedding batch status provider = OpenAIBatchProvider(client=openai_client) - poll_batch_status( - session=session, provider=provider, batch_job=embedding_batch_job + await asyncio.to_thread( + poll_batch_status, + session=session, + provider=provider, + batch_job=embedding_batch_job, ) session.refresh(embedding_batch_job) @@ -667,23 +755,41 @@ async def check_and_process_evaluation( f"BatchJob {eval_run.batch_job_id} not found for evaluation {eval_run.id}" ) - # IMPORTANT: Poll OpenAI to get the latest status before checking - provider = OpenAIBatchProvider(client=openai_client) - status_result = 
poll_batch_status( - session=session, provider=provider, batch_job=batch_job + # Build batch provider from batch_job.provider (OpenAI or Gemini) + provider = _get_batch_provider( + session=session, + provider_name=batch_job.provider, + organization_id=eval_run.organization_id, + project_id=eval_run.project_id, + ) + status_result = await asyncio.to_thread( + poll_batch_status, + session=session, + provider=provider, + batch_job=batch_job, ) # Refresh batch_job to get the updated provider_status session.refresh(batch_job) provider_status = batch_job.provider_status + is_openai = batch_job.provider in ( + LLMProvider.OPENAI, + LLMProvider.OPENAI_NATIVE, + ) # Handle different provider statuses - if provider_status == "completed": - # Check if batch completed but all requests failed - # (output_file_id is absent, error_file_id is present) - if not status_result.get( - "provider_output_file_id", batch_job.provider_output_file_id - ) and status_result.get("error_file_id"): + if ( + provider_status == "completed" + or provider_status == BatchJobState.SUCCEEDED.value + ): + # OpenAI-only: batch completed but all requests failed (error file present, output file absent) + if ( + is_openai + and not status_result.get( + "provider_output_file_id", batch_job.provider_output_file_id + ) + and status_result.get("error_file_id") + ): error_msg = _extract_batch_error_message( provider=provider, error_file_id=status_result["error_file_id"], @@ -731,7 +837,14 @@ async def check_and_process_evaluation( "action": "processed", } - elif provider_status in ["failed", "expired", "cancelled"]: + elif provider_status in ( + "failed", + "expired", + "cancelled", + BatchJobState.FAILED.value, + BatchJobState.CANCELLED.value, + BatchJobState.EXPIRED.value, + ): # Mark evaluation as failed based on provider status error_msg = batch_job.error_message or f"Provider batch {provider_status}" @@ -934,6 +1047,16 @@ async def poll_all_pending_evaluations(session: Session) -> dict[str, Any]: ) 
total_failed_count += 1 + # Flush background ingestion threads before discarding this client. + # Each Langfuse() instance spawns TaskManager threads; without flush, + # they accumulate across cron runs and pile up concurrent API calls. + try: + langfuse.flush() + except Exception as flush_err: + logger.warning( + f"[poll_all_pending_evaluations] Langfuse flush failed | project_id={project_id} | {flush_err}" + ) + except Exception as e: logger.error( f"[poll_all_pending_evaluations] Failed to process project | project_id={project_id} | {e}", diff --git a/backend/app/services/evaluations/batch_job.py b/backend/app/services/evaluations/batch_job.py new file mode 100644 index 000000000..ec13045ee --- /dev/null +++ b/backend/app/services/evaluations/batch_job.py @@ -0,0 +1,92 @@ +import logging +from uuid import UUID + +from celery.exceptions import SoftTimeLimitExceeded +from gevent import Timeout +from sqlmodel import Session + +from app.core.db import engine +from app.crud.evaluations import ( + get_evaluation_run_by_id, + resolve_evaluation_config, + start_evaluation_batch, +) +from app.crud.evaluations.core import update_evaluation_run +from app.models.evaluation import EvaluationRunUpdate +from app.utils import get_langfuse_client + +logger = logging.getLogger(__name__) + + +def execute_evaluation_batch_submission( + project_id: int, + job_id: str, + task_id: str, + task_instance, + organization_id: int, + config_id: str, + config_version: int, + **kwargs, +) -> dict: + run_id = int(job_id) + logger.info( + f"[execute_evaluation_batch_submission] Starting | run_id={run_id} | task={task_id}" + ) + with Session(engine) as session: + run = get_evaluation_run_by_id( + session=session, + evaluation_id=run_id, + organization_id=organization_id, + project_id=project_id, + ) + if not run: + return {"success": False, "error": "Run not found"} + try: + config, error = resolve_evaluation_config( + session=session, + config_id=UUID(str(config_id)), + 
config_version=config_version, + project_id=project_id, + ) + if error: + update_evaluation_run( + session=session, + eval_run=run, + update=EvaluationRunUpdate(status="failed", error_message=error), + ) + return {"success": False, "error": error} + + langfuse = get_langfuse_client( + session=session, org_id=organization_id, project_id=project_id + ) + run = start_evaluation_batch( + langfuse=langfuse, + session=session, + eval_run=run, + params=config.completion.params, + provider=config.completion.provider, + ) + return {"success": True, "batch_job_id": run.batch_job_id} + except (Timeout, SoftTimeLimitExceeded): + logger.warning( + f"[execute_evaluation_batch_submission] Timeout | run_id={run_id}" + ) + update_evaluation_run( + session=session, + eval_run=run, + update=EvaluationRunUpdate( + status="failed", error_message="Task exceeded soft time limit" + ), + ) + raise + except Exception as e: + logger.error( + f"[execute_evaluation_batch_submission] Failed | run_id={run_id} | {e}", + exc_info=True, + ) + update_evaluation_run( + session=session, + eval_run=run, + update=EvaluationRunUpdate(status="failed", error_message=str(e)), + ) + raise diff --git a/backend/app/services/evaluations/evaluation.py b/backend/app/services/evaluations/evaluation.py index 0ad55e494..c8e29e60c 100644 --- a/backend/app/services/evaluations/evaluation.py +++ b/backend/app/services/evaluations/evaluation.py @@ -4,10 +4,12 @@ from typing import Any from uuid import UUID +from asgi_correlation_id import correlation_id from fastapi import HTTPException from sqlalchemy.exc import IntegrityError from sqlmodel import Session +from app.celery.utils import start_evaluation_batch_submission from app.core.cloud.storage import get_cloud_storage from app.core.storage_utils import load_json_from_object_store from app.crud.evaluations import ( @@ -20,13 +22,12 @@ resolve_evaluation_config, save_score, sort_traces_by_question_id, - start_evaluation_batch, ) +from app.crud.evaluations.core import 
update_evaluation_run from app.crud.evaluations.score import CategoryMetrics, TraceData -from app.models.evaluation import EvaluationRun, RunModeEnum -from app.models.llm.request import STTLLMParams, TextLLMParams, TTSLLMParams +from app.models.evaluation import EvaluationRun, EvaluationRunUpdate, RunModeEnum from app.services.llm.providers import LLMProvider -from app.utils import get_langfuse_client, get_openai_client +from app.utils import get_langfuse_client logger = logging.getLogger(__name__) @@ -40,6 +41,16 @@ # other IntegrityError (e.g. FK violations), which must not be masked. _RUN_NAME_UNIQUE_CONSTRAINT = "uq_evaluation_run_org_project_run_name" +# Providers whose configs can run through the async batch evaluation path. +# Mirrors app.services.assessment.service._SUPPORTED_BATCH_PROVIDERS — the +# native variants forward raw params but still submit via the same batch API. +_SUPPORTED_BATCH_PROVIDERS = { + LLMProvider.OPENAI, + LLMProvider.OPENAI_NATIVE, + LLMProvider.GOOGLE_AISTUDIO, + LLMProvider.GOOGLE_AISTUDIO_NATIVE, +} + def _is_run_name_conflict(error: IntegrityError) -> bool: """True only when the IntegrityError is the run_name uniqueness violation.""" @@ -267,28 +278,20 @@ def validate_and_start_batch_evaluation( status_code=400, detail=f"Failed to resolve config from stored config: {error}", ) - elif config.completion.provider != LLMProvider.OPENAI: + elif config.completion.provider not in _SUPPORTED_BATCH_PROVIDERS: raise HTTPException( status_code=422, - detail="Only 'openai' provider is supported for evaluation configs", + detail=( + f"Provider '{config.completion.provider}' is not supported for " + f"evaluation configs. 
Supported providers: " + f"{sorted(_SUPPORTED_BATCH_PROVIDERS)}" + ), ) logger.info( "[validate_and_start_batch_evaluation] Successfully resolved config from config management" ) - # Get API clients - openai_client = get_openai_client( - session=session, - org_id=organization_id, - project_id=project_id, - ) - langfuse = get_langfuse_client( - session=session, - org_id=organization_id, - project_id=project_id, - ) - # Step 3: Create EvaluationRun record with config references eval_run = create_evaluation_run_or_409( session=session, @@ -302,39 +305,35 @@ def validate_and_start_batch_evaluation( log_context="validate_and_start_batch_evaluation", ) - # Step 4: Start the batch evaluation + # Step 4: Queue the batch submission for asynchronous processing + trace_id = correlation_id.get() or "N/A" try: - # Convert params dict to appropriate model instance based on type - param_models = { - "text": TextLLMParams, - "stt": STTLLMParams, - "tts": TTSLLMParams, - } - model_class = param_models[config.completion.type] - validated_params = model_class.model_validate(config.completion.params) - - eval_run = start_evaluation_batch( - langfuse=langfuse, - openai_client=openai_client, - session=session, - eval_run=eval_run, - config=validated_params, + celery_task_id = start_evaluation_batch_submission( + project_id=project_id, + job_id=str(eval_run.id), + trace_id=trace_id, + organization_id=organization_id, + config_id=str(config_id), + config_version=config_version, ) - logger.info( - f"[validate_and_start_batch_evaluation] Evaluation started successfully | " - f"batch_job_id={eval_run.batch_job_id} | total_items={eval_run.total_items}" + f"[validate_and_start_batch_evaluation] Batch submission queued | " + f"run_id={eval_run.id} | celery_task_id={celery_task_id}" ) - return eval_run - except Exception as e: logger.error( - f"[validate_and_start_batch_evaluation] Failed to start evaluation | run_id={eval_run.id} | {e}", + f"[validate_and_start_batch_evaluation] Failed to queue 
batch submission | run_id={eval_run.id} | {e}", exc_info=True, ) - # Error is already handled in start_evaluation_batch - session.refresh(eval_run) + eval_run = update_evaluation_run( + session=session, + eval_run=eval_run, + update=EvaluationRunUpdate( + status="failed", + error_message=f"Failed to queue batch submission: {e}", + ), + ) return eval_run diff --git a/backend/app/tests/api/routes/test_evaluation.py b/backend/app/tests/api/routes/test_evaluation.py index 8b4e6679c..7653f6182 100644 --- a/backend/app/tests/api/routes/test_evaluation.py +++ b/backend/app/tests/api/routes/test_evaluation.py @@ -1,15 +1,27 @@ import io from typing import Any -from unittest.mock import Mock, patch +from unittest.mock import MagicMock, Mock, patch from uuid import uuid4 import pytest from fastapi.testclient import TestClient from sqlmodel import Session, select -from app.crud.evaluations.batch import build_evaluation_jsonl -from app.models import EvaluationDataset, EvaluationRun -from app.models.llm.request import TextLLMParams +from app.core.util import now +from app.crud.evaluations.batch import ( + build_google_evaluation_jsonl, + build_openai_evaluation_jsonl, + start_evaluation_batch, +) +from app.crud.evaluations.core import create_evaluation_run +from app.models import ( + BatchJob, + EvaluationDataset, + EvaluationRun, + Organization, + Project, +) +from app.models.batch_job import BatchJobType from app.tests.utils.auth import TestAuthContext from app.tests.utils.test_data import create_test_config, create_test_evaluation_dataset @@ -597,10 +609,18 @@ def test_start_batch_evaluation_without_authentication( class TestBatchEvaluationJSONLBuilding: - """Test JSONL building logic for batch evaluation.""" + """Test JSONL building logic for batch evaluation. 
+ + ``build_openai_evaluation_jsonl`` wraps an already-mapped ``openai_params`` + dict (the output of ``map_kaapi_to_openai_params``) into Responses API batch + lines: ``body`` is the params dict plus the per-item ``input``. Param shaping + (temperature inclusion, knowledge_base_ids -> tools) is the mapper's job and + is covered in tests/services/llm/test_mappers.py, so these tests pass plain + pre-mapped dicts and assert faithful pass-through. + """ def test_build_batch_jsonl_basic(self) -> None: - """Test basic JSONL building with minimal config.""" + """Test basic JSONL building with a fully populated params dict.""" dataset_items = [ { "id": "item1", @@ -610,13 +630,13 @@ def test_build_batch_jsonl_basic(self) -> None: } ] - config = TextLLMParams( - model="gpt-4o", - temperature=0.2, - instructions="You are a helpful assistant", - ) + openai_params = { + "model": "gpt-4o", + "temperature": 0.2, + "instructions": "You are a helpful assistant", + } - jsonl_data = build_evaluation_jsonl(dataset_items, config) + jsonl_data = build_openai_evaluation_jsonl(dataset_items, openai_params) assert len(jsonl_data) == 1 assert isinstance(jsonl_data[0], dict) @@ -631,7 +651,7 @@ def test_build_batch_jsonl_basic(self) -> None: assert request["body"]["input"] == "What is 2+2?" 
def test_build_batch_jsonl_with_tools(self) -> None: - """Test JSONL building with tools configuration.""" + """Tools present in the params dict are passed through to the body.""" dataset_items = [ { "id": "item1", @@ -641,13 +661,19 @@ def test_build_batch_jsonl_with_tools(self) -> None: } ] - config = TextLLMParams( - model="gpt-4o-mini", - instructions="Search documents", - knowledge_base_ids=["vs_abc123"], - ) + openai_params = { + "model": "gpt-4o-mini", + "instructions": "Search documents", + "tools": [ + { + "type": "file_search", + "vector_store_ids": ["vs_abc123"], + "max_num_results": 20, + } + ], + } - jsonl_data = build_evaluation_jsonl(dataset_items, config) + jsonl_data = build_openai_evaluation_jsonl(dataset_items, openai_params) assert len(jsonl_data) == 1 request = jsonl_data[0] @@ -655,7 +681,7 @@ def test_build_batch_jsonl_with_tools(self) -> None: assert "vs_abc123" in request["body"]["tools"][0]["vector_store_ids"] def test_build_batch_jsonl_minimal_config(self) -> None: - """Test JSONL building with minimal config (only model required).""" + """Test JSONL building with minimal params (only model).""" dataset_items = [ { "id": "item1", @@ -665,9 +691,9 @@ def test_build_batch_jsonl_minimal_config(self) -> None: } ] - config = TextLLMParams(model="gpt-4o") # Only model provided + openai_params = {"model": "gpt-4o"} - jsonl_data = build_evaluation_jsonl(dataset_items, config) + jsonl_data = build_openai_evaluation_jsonl(dataset_items, openai_params) assert len(jsonl_data) == 1 request = jsonl_data[0] @@ -697,9 +723,9 @@ def test_build_batch_jsonl_skips_empty_questions(self) -> None: }, ] - config = TextLLMParams(model="gpt-4o", instructions="Test") + openai_params = {"model": "gpt-4o", "instructions": "Test"} - jsonl_data = build_evaluation_jsonl(dataset_items, config) + jsonl_data = build_openai_evaluation_jsonl(dataset_items, openai_params) # Should only have 1 valid item assert len(jsonl_data) == 1 @@ -717,12 +743,12 @@ def 
test_build_batch_jsonl_multiple_items(self) -> None: for i in range(5) ] - config = TextLLMParams( - model="gpt-4o", - instructions="Answer questions", - ) + openai_params = { + "model": "gpt-4o", + "instructions": "Answer questions", + } - jsonl_data = build_evaluation_jsonl(dataset_items, config) + jsonl_data = build_openai_evaluation_jsonl(dataset_items, openai_params) assert len(jsonl_data) == 5 @@ -732,7 +758,7 @@ def test_build_batch_jsonl_multiple_items(self) -> None: assert request_dict["body"]["model"] == "gpt-4o" def test_build_batch_jsonl_temperature_included_when_explicitly_set(self) -> None: - """When temperature is explicitly set, it should appear in the JSONL body.""" + """A temperature present in the params dict appears in the JSONL body.""" dataset_items = [ { "id": "item1", @@ -742,16 +768,16 @@ def test_build_batch_jsonl_temperature_included_when_explicitly_set(self) -> Non } ] - config = TextLLMParams(model="gpt-4o", temperature=0.5) + openai_params = {"model": "gpt-4o", "temperature": 0.5} - jsonl_data = build_evaluation_jsonl(dataset_items, config) + jsonl_data = build_openai_evaluation_jsonl(dataset_items, openai_params) assert len(jsonl_data) == 1 assert "temperature" in jsonl_data[0]["body"] assert jsonl_data[0]["body"]["temperature"] == 0.5 def test_build_batch_jsonl_temperature_excluded_when_not_set(self) -> None: - """When temperature is not explicitly set, it should NOT appear in the JSONL body.""" + """A temperature absent from the params dict stays absent from the body.""" dataset_items = [ { "id": "item1", @@ -761,10 +787,10 @@ def test_build_batch_jsonl_temperature_excluded_when_not_set(self) -> None: } ] - # Only model provided — temperature not in model_fields_set - config = TextLLMParams(model="gpt-4o") + # Mapper omits temperature when it was never set, so it isn't in the dict. 
+ openai_params = {"model": "gpt-4o"} - jsonl_data = build_evaluation_jsonl(dataset_items, config) + jsonl_data = build_openai_evaluation_jsonl(dataset_items, openai_params) assert len(jsonl_data) == 1 assert "temperature" not in jsonl_data[0]["body"] @@ -772,7 +798,7 @@ def test_build_batch_jsonl_temperature_excluded_when_not_set(self) -> None: def test_build_batch_jsonl_temperature_zero_included_when_explicitly_set( self, ) -> None: - """When temperature is explicitly set to 0.0, it should still appear in the body.""" + """A temperature of 0.0 in the params dict is preserved in the body.""" dataset_items = [ { "id": "item1", @@ -782,15 +808,343 @@ def test_build_batch_jsonl_temperature_zero_included_when_explicitly_set( } ] - config = TextLLMParams(model="gpt-4o", temperature=0.0) + openai_params = {"model": "gpt-4o", "temperature": 0.0} - jsonl_data = build_evaluation_jsonl(dataset_items, config) + jsonl_data = build_openai_evaluation_jsonl(dataset_items, openai_params) assert len(jsonl_data) == 1 assert "temperature" in jsonl_data[0]["body"] assert jsonl_data[0]["body"]["temperature"] == 0.0 +class TestBuildGoogleEvaluationJSONL: + """Test JSONL building for Gemini batch evaluation. + + ``build_google_evaluation_jsonl`` wraps an already-mapped ``google_params`` + dict (output of ``map_kaapi_to_google_params``) into Gemini batch lines: + ``{"key": <item_id>, "request": {...}}``. The request always carries the + per-item question in ``contents``; ``instructions``/``temperature``/ + ``reasoning`` are folded into ``systemInstruction``/``generationConfig`` only + when present, so these tests pass pre-mapped dicts and assert that shaping. 
+ """ + + def _items(self, *questions: str) -> list[dict[str, Any]]: + return [ + { + "id": f"item{i}", + "input": {"question": q}, + "expected_output": {"answer": "a"}, + "metadata": {}, + } + for i, q in enumerate(questions) + ] + + def test_basic_question_only(self) -> None: + """A bare params dict produces contents but no system/generation config.""" + jsonl_data = build_google_evaluation_jsonl( + self._items("What is 2+2?"), {"model": "gemini-2.5-pro"} + ) + + assert len(jsonl_data) == 1 + line = jsonl_data[0] + assert line["key"] == "item0" + request = line["request"] + assert request["contents"] == [ + {"parts": [{"text": "What is 2+2?"}], "role": "user"} + ] + assert "systemInstruction" not in request + assert "generationConfig" not in request + + def test_instructions_become_system_instruction(self) -> None: + """``instructions`` are mapped into ``systemInstruction``.""" + jsonl_data = build_google_evaluation_jsonl( + self._items("Q"), + {"model": "gemini-2.5-pro", "instructions": "Be concise"}, + ) + + request = jsonl_data[0]["request"] + assert request["systemInstruction"] == {"parts": [{"text": "Be concise"}]} + + def test_temperature_in_generation_config(self) -> None: + """``temperature`` goes into ``generationConfig``.""" + jsonl_data = build_google_evaluation_jsonl( + self._items("Q"), {"model": "gemini-2.5-pro", "temperature": 0.3} + ) + + assert jsonl_data[0]["request"]["generationConfig"]["temperature"] == 0.3 + + def test_temperature_zero_included(self) -> None: + """A temperature of 0.0 is preserved (not treated as absent).""" + jsonl_data = build_google_evaluation_jsonl( + self._items("Q"), {"model": "gemini-2.5-pro", "temperature": 0.0} + ) + + assert jsonl_data[0]["request"]["generationConfig"]["temperature"] == 0.0 + + def test_reasoning_becomes_thinking_config(self) -> None: + """``reasoning`` is mapped into ``thinkingConfig``.""" + jsonl_data = build_google_evaluation_jsonl( + self._items("Q"), {"model": "gemini-2.5-pro", "reasoning": 
"high"} + ) + + thinking = jsonl_data[0]["request"]["generationConfig"]["thinkingConfig"] + assert thinking == {"includeThoughts": False, "thinkingLevel": "high"} + + def test_full_params(self) -> None: + """All optional fields together populate both config blocks.""" + jsonl_data = build_google_evaluation_jsonl( + self._items("Q"), + { + "model": "gemini-2.5-pro", + "instructions": "sys", + "temperature": 0.7, + "reasoning": "low", + }, + ) + + request = jsonl_data[0]["request"] + assert request["systemInstruction"] == {"parts": [{"text": "sys"}]} + assert request["generationConfig"]["temperature"] == 0.7 + assert request["generationConfig"]["thinkingConfig"]["thinkingLevel"] == "low" + + def test_skips_empty_and_missing_questions(self) -> None: + """Items with empty or missing questions are dropped.""" + dataset_items = [ + { + "id": "ok", + "input": {"question": "Real"}, + "expected_output": {}, + "metadata": {}, + }, + { + "id": "empty", + "input": {"question": ""}, + "expected_output": {}, + "metadata": {}, + }, + {"id": "missing", "input": {}, "expected_output": {}, "metadata": {}}, + ] + + jsonl_data = build_google_evaluation_jsonl( + dataset_items, {"model": "gemini-2.5-pro"} + ) + + assert len(jsonl_data) == 1 + assert jsonl_data[0]["key"] == "ok" + + def test_multiple_items(self) -> None: + """Each item yields one line keyed by its id.""" + jsonl_data = build_google_evaluation_jsonl( + self._items("Q0", "Q1", "Q2"), {"model": "gemini-2.5-pro"} + ) + + assert [line["key"] for line in jsonl_data] == ["item0", "item1", "item2"] + assert jsonl_data[1]["request"]["contents"][0]["parts"][0]["text"] == "Q1" + + +_BATCH_DATASET_ITEMS = [ + { + "id": "item1", + "input": {"question": "What is 2+2?"}, + "expected_output": {"answer": "4"}, + "metadata": {}, + } +] + + +class TestStartEvaluationBatch: + """Test start_evaluation_batch, the provider-dispatching orchestrator.""" + + @pytest.fixture + def eval_run(self, db: Session) -> EvaluationRun: + """A persisted 
EvaluationRun in the default (pending) state.""" + org = db.exec(select(Organization)).first() + project = db.exec( + select(Project).where(Project.organization_id == org.id) + ).first() + dataset = create_test_evaluation_dataset( + db=db, + organization_id=org.id, + project_id=project.id, + name="test_dataset_start_batch", + description="Test dataset", + original_items_count=1, + duplication_factor=1, + ) + config = create_test_config(db, project_id=project.id, use_kaapi_schema=True) + return create_evaluation_run( + session=db, + run_name="start_batch_run", + dataset_name=dataset.name, + dataset_id=dataset.id, + config_id=config.id, + config_version=1, + organization_id=org.id, + project_id=project.id, + ) + + def _make_batch_job( + self, db: Session, eval_run: EvaluationRun, provider: str + ) -> BatchJob: + """Persist a BatchJob to stand in for start_batch_job's return value.""" + batch_job = BatchJob( + provider=provider, + provider_batch_id="batch_xyz", + provider_status="submitted", + job_type=BatchJobType.EVALUATION, + total_items=1, + status="submitted", + organization_id=eval_run.organization_id, + project_id=eval_run.project_id, + inserted_at=now(), + updated_at=now(), + ) + db.add(batch_job) + db.commit() + db.refresh(batch_job) + return batch_job + + @patch("app.crud.evaluations.batch.start_batch_job") + @patch("app.crud.evaluations.batch.OpenAIBatchProvider") + @patch("app.crud.evaluations.batch.get_openai_client") + @patch("app.crud.evaluations.batch.map_kaapi_to_openai_params") + @patch("app.crud.evaluations.batch.fetch_dataset_items") + def test_openai_branch( + self, + mock_fetch, + mock_map, + mock_get_client, + mock_provider_cls, + mock_start_batch, + db: Session, + eval_run: EvaluationRun, + ) -> None: + mock_fetch.return_value = _BATCH_DATASET_ITEMS + mock_map.return_value = ({"model": "gpt-4o"}, []) + batch_job = self._make_batch_job(db, eval_run, "openai") + mock_start_batch.return_value = batch_job + + result = start_evaluation_batch( + 
langfuse=MagicMock(), + session=db, + eval_run=eval_run, + params={"model": "gpt-4o"}, + provider="openai", + ) + + assert result.status == "processing" + assert result.batch_job_id == batch_job.id + assert result.total_items == 1 + assert mock_start_batch.call_args.kwargs["provider_name"] == "openai" + mock_get_client.assert_called_once() + + @patch("app.crud.evaluations.batch.start_batch_job") + @patch("app.crud.evaluations.batch.OpenAIBatchProvider") + @patch("app.crud.evaluations.batch.get_openai_client") + @patch("app.crud.evaluations.batch.map_kaapi_to_openai_params") + @patch("app.crud.evaluations.batch.fetch_dataset_items") + def test_native_suffix_resolves_to_base_provider( + self, + mock_fetch, + mock_map, + mock_get_client, + mock_provider_cls, + mock_start_batch, + db: Session, + eval_run: EvaluationRun, + ) -> None: + """An 'openai-native' provider runs through the OpenAI branch.""" + mock_fetch.return_value = _BATCH_DATASET_ITEMS + mock_map.return_value = ({"model": "gpt-4o"}, ["warn"]) + mock_start_batch.return_value = self._make_batch_job(db, eval_run, "openai") + + result = start_evaluation_batch( + langfuse=MagicMock(), + session=db, + eval_run=eval_run, + params={"model": "gpt-4o"}, + provider="openai-native", + ) + + assert result.status == "processing" + assert mock_start_batch.call_args.kwargs["provider_name"] == "openai" + + @patch("app.crud.evaluations.batch.start_batch_job") + @patch("app.crud.evaluations.batch.GeminiBatchProvider") + @patch("app.crud.evaluations.batch.GeminiClient") + @patch("app.crud.evaluations.batch.map_kaapi_to_google_params") + @patch("app.crud.evaluations.batch.fetch_dataset_items") + def test_google_branch( + self, + mock_fetch, + mock_map, + mock_gemini_client, + mock_provider_cls, + mock_start_batch, + db: Session, + eval_run: EvaluationRun, + ) -> None: + mock_fetch.return_value = _BATCH_DATASET_ITEMS + mock_map.return_value = ({"model": "gemini-2.5-pro"}, []) + batch_job = self._make_batch_job(db, eval_run, 
"google-aistudio") + mock_start_batch.return_value = batch_job + + result = start_evaluation_batch( + langfuse=MagicMock(), + session=db, + eval_run=eval_run, + params={"model": "gemini-2.5-pro"}, + provider="google-aistudio", + ) + + assert result.status == "processing" + assert result.batch_job_id == batch_job.id + assert mock_start_batch.call_args.kwargs["provider_name"] == "google-aistudio" + mock_gemini_client.from_credentials.assert_called_once() + + @patch("app.crud.evaluations.batch.fetch_dataset_items") + def test_unsupported_provider_marks_failed( + self, mock_fetch, db: Session, eval_run: EvaluationRun + ) -> None: + mock_fetch.return_value = _BATCH_DATASET_ITEMS + + with pytest.raises(ValueError, match="Unsupported provider"): + start_evaluation_batch( + langfuse=MagicMock(), + session=db, + eval_run=eval_run, + params={"model": "claude"}, + provider="anthropic", + ) + + db.refresh(eval_run) + assert eval_run.status == "failed" + assert eval_run.error_message + + @patch("app.crud.evaluations.batch.map_kaapi_to_openai_params") + @patch("app.crud.evaluations.batch.fetch_dataset_items") + def test_empty_jsonl_marks_failed( + self, mock_fetch, mock_map, db: Session, eval_run: EvaluationRun + ) -> None: + """No questions in the dataset -> empty JSONL -> failure.""" + mock_fetch.return_value = [ + {"id": "x", "input": {}, "expected_output": {}, "metadata": {}} + ] + mock_map.return_value = ({"model": "gpt-4o"}, []) + + with pytest.raises(ValueError, match="did not produce any JSONL"): + start_evaluation_batch( + langfuse=MagicMock(), + session=db, + eval_run=eval_run, + params={"model": "gpt-4o"}, + provider="openai", + ) + + db.refresh(eval_run) + assert eval_run.status == "failed" + + class TestGetEvaluationRunStatus: """Test GET /evaluations/{evaluation_id} endpoint.""" diff --git a/backend/app/tests/crud/evaluations/test_langfuse.py b/backend/app/tests/crud/evaluations/test_langfuse.py index 31980b7fd..6344b8f98 100644 --- 
a/backend/app/tests/crud/evaluations/test_langfuse.py +++ b/backend/app/tests/crud/evaluations/test_langfuse.py @@ -176,6 +176,41 @@ def test_create_langfuse_dataset_run_handles_trace_error(self) -> None: assert "item_1" in trace_id_mapping assert "item_2" not in trace_id_mapping + def test_create_langfuse_dataset_run_handles_rate_limit(self) -> None: + """A 429 status_code error is handled (logged distinctly) and skipped.""" + mock_langfuse = MagicMock() + mock_dataset = MagicMock() + + rate_limit_error = Exception("Too Many Requests") + rate_limit_error.status_code = 429 + + mock_item = MagicMock() + mock_item.id = "item_1" + mock_item.observe.side_effect = rate_limit_error + + mock_dataset.items = [mock_item] + mock_langfuse.get_dataset.return_value = mock_dataset + + results = [ + { + "item_id": "item_1", + "question": "What is 2+2?", + "generated_output": "4", + "ground_truth": "4", + "response_id": "resp_123", + "usage": {"input_tokens": 10, "output_tokens": 5, "total_tokens": 15}, + } + ] + + trace_id_mapping = create_langfuse_dataset_run( + langfuse=mock_langfuse, + dataset_name="test_dataset", + run_name="test_run", + results=results, + ) + + assert trace_id_mapping == {} + def test_create_langfuse_dataset_run_empty_results(self) -> None: """Test with empty results list.""" mock_langfuse = MagicMock() diff --git a/backend/app/tests/crud/evaluations/test_processing.py b/backend/app/tests/crud/evaluations/test_processing.py index 40472ea8f..cc6922bdb 100644 --- a/backend/app/tests/crud/evaluations/test_processing.py +++ b/backend/app/tests/crud/evaluations/test_processing.py @@ -1,4 +1,5 @@ import json +from typing import Any from unittest.mock import MagicMock, patch import pytest @@ -8,6 +9,8 @@ from app.crud.evaluations.core import create_evaluation_run from app.crud.evaluations.processing import ( _extract_batch_error_message, + _extract_gemini_usage, + _get_batch_provider, check_and_process_evaluation, parse_evaluation_output, 
poll_all_pending_evaluations, @@ -268,6 +271,184 @@ def test_parse_evaluation_output_multiple_items(self) -> None: assert result["ground_truth"] == f"A{i}" +class TestParseEvaluationOutputGoogle: + """Test parsing Gemini batch output (provider_name=google-aistudio). + + Gemini lines are keyed by ``key`` (not ``custom_id``) and carry a nested + ``response`` dict with ``candidates``/``usageMetadata`` rather than the + OpenAI ``response.body`` shape. + """ + + def _dataset_items(self) -> list[dict[str, Any]]: + return [ + { + "id": "item1", + "input": {"question": "What is 2+2?"}, + "expected_output": {"answer": "4"}, + "metadata": {"question_id": 7}, + } + ] + + def test_parse_gemini_success(self) -> None: + """Text is pulled from candidates and usage from usageMetadata.""" + raw_results = [ + { + "key": "item1", + "response": { + "responseId": "resp_g1", + "candidates": [ + {"content": {"parts": [{"text": "The answer is 4"}]}} + ], + "usageMetadata": { + "promptTokenCount": 10, + "candidatesTokenCount": 5, + "totalTokenCount": 15, + "thoughtsTokenCount": 2, + }, + }, + } + ] + + results = parse_evaluation_output( + raw_results, self._dataset_items(), provider_name="google-aistudio" + ) + + assert len(results) == 1 + result = results[0] + assert result["item_id"] == "item1" + assert result["generated_output"] == "The answer is 4" + assert result["response_id"] == "resp_g1" + assert result["question_id"] == 7 + assert result["usage"] == { + "input_tokens": 10, + "output_tokens": 5, + "total_tokens": 15, + "reasoning_tokens": 2, + } + + def test_parse_gemini_error(self) -> None: + """An ``error`` on the line yields an ERROR: generated_output.""" + raw_results = [{"key": "item1", "error": {"message": "quota exceeded"}}] + + results = parse_evaluation_output( + raw_results, self._dataset_items(), provider_name="google-aistudio" + ) + + assert len(results) == 1 + assert results[0]["generated_output"].startswith("ERROR:") + assert "quota exceeded" in 
results[0]["generated_output"] + + def test_parse_gemini_native_provider(self) -> None: + """The -native provider variant is treated as Google too.""" + raw_results = [ + { + "key": "item1", + "response": { + "candidates": [{"content": {"parts": [{"text": "ok"}]}}], + }, + } + ] + + results = parse_evaluation_output( + raw_results, + self._dataset_items(), + provider_name="google-aistudio-native", + ) + + assert results[0]["generated_output"] == "ok" + assert results[0]["usage"] is None + + +class TestExtractGeminiUsage: + """Test ``_extract_gemini_usage`` mapping of Gemini usage to OpenAI shape.""" + + def test_camel_case(self) -> None: + usage = _extract_gemini_usage( + { + "usageMetadata": { + "promptTokenCount": 10, + "candidatesTokenCount": 5, + "totalTokenCount": 15, + "thoughtsTokenCount": 3, + } + } + ) + assert usage == { + "input_tokens": 10, + "output_tokens": 5, + "total_tokens": 15, + "reasoning_tokens": 3, + } + + def test_snake_case(self) -> None: + usage = _extract_gemini_usage( + { + "usage_metadata": { + "prompt_token_count": 1, + "candidates_token_count": 2, + "total_token_count": 3, + "thoughts_token_count": 4, + } + } + ) + assert usage == { + "input_tokens": 1, + "output_tokens": 2, + "total_tokens": 3, + "reasoning_tokens": 4, + } + + def test_missing_metadata_returns_none(self) -> None: + assert _extract_gemini_usage({}) is None + assert _extract_gemini_usage({"usageMetadata": None}) is None + + def test_partial_metadata_defaults_to_zero(self) -> None: + usage = _extract_gemini_usage({"usageMetadata": {"promptTokenCount": 9}}) + assert usage == { + "input_tokens": 9, + "output_tokens": 0, + "total_tokens": 0, + "reasoning_tokens": 0, + } + + +class TestGetBatchProvider: + """Test _get_batch_provider dispatch by provider name.""" + + @patch("app.crud.evaluations.processing.OpenAIBatchProvider") + @patch("app.crud.evaluations.processing.get_openai_client") + def test_openai(self, mock_get_client, mock_provider_cls) -> None: + provider = 
_get_batch_provider( + session=MagicMock(), + provider_name="openai", + organization_id=1, + project_id=2, + ) + mock_get_client.assert_called_once() + assert provider is mock_provider_cls.return_value + + @patch("app.crud.evaluations.processing.GeminiBatchProvider") + @patch("app.crud.evaluations.processing.GeminiClient") + def test_google(self, mock_gemini_client, mock_provider_cls) -> None: + provider = _get_batch_provider( + session=MagicMock(), + provider_name="google-aistudio", + organization_id=1, + project_id=2, + ) + mock_gemini_client.from_credentials.assert_called_once() + assert provider is mock_provider_cls.return_value + + def test_unsupported_raises(self) -> None: + with pytest.raises(ValueError, match="Unsupported provider"): + _get_batch_provider( + session=MagicMock(), + provider_name="anthropic", + organization_id=1, + project_id=2, + ) + + class TestProcessCompletedEvaluation: """Test processing completed evaluation batch.""" diff --git a/backend/app/tests/services/evaluations/test_evaluation_service_s3.py b/backend/app/tests/services/evaluations/test_evaluation_service_s3.py index 78089fbdc..722318ac2 100644 --- a/backend/app/tests/services/evaluations/test_evaluation_service_s3.py +++ b/backend/app/tests/services/evaluations/test_evaluation_service_s3.py @@ -1,13 +1,18 @@ -"""Tests for get_evaluation_with_scores() S3 retrieval.""" +"""Tests for app.services.evaluations.evaluation service functions.""" from collections.abc import Callable from typing import Optional from unittest.mock import MagicMock, patch +from uuid import uuid4 import pytest +from fastapi import HTTPException from app.models import EvaluationRun -from app.services.evaluations.evaluation import get_evaluation_with_scores +from app.services.evaluations.evaluation import ( + get_evaluation_with_scores, + validate_and_start_batch_evaluation, +) class TestGetEvaluationWithScoresS3: @@ -200,3 +205,113 @@ def test_resync_skipped_when_cache_unreadable( assert error is not None 
mock_fetch_langfuse.assert_not_called() # did not re-fetch mock_save_score.assert_not_called() # did not overwrite the cache + + +_MODULE = "app.services.evaluations.evaluation" + + +def _dataset() -> MagicMock: + dataset = MagicMock() + dataset.id = 1 + dataset.name = "ds" + dataset.langfuse_dataset_id = "lf_ds_1" + dataset.object_store_url = None + return dataset + + +def _config(provider: str) -> MagicMock: + config = MagicMock() + config.completion.provider = provider + config.completion.type = "text" + config.completion.params = {"model": "m"} + return config + + +class TestValidateAndStartBatchEvaluation: + """Test validate_and_start_batch_evaluation provider gating and queueing.""" + + @patch(f"{_MODULE}.resolve_evaluation_config") + @patch(f"{_MODULE}.get_dataset_by_id") + def test_unsupported_provider_raises_422( + self, mock_get_dataset, mock_resolve + ) -> None: + mock_get_dataset.return_value = _dataset() + mock_resolve.return_value = (_config("anthropic"), None) + + with pytest.raises(HTTPException) as exc: + validate_and_start_batch_evaluation( + session=MagicMock(), + dataset_id=1, + experiment_name="exp", + config_id=uuid4(), + config_version=1, + organization_id=1, + project_id=2, + ) + + assert exc.value.status_code == 422 + assert "not supported" in exc.value.detail + + @patch(f"{_MODULE}.update_evaluation_run") + @patch(f"{_MODULE}.start_evaluation_batch_submission") + @patch(f"{_MODULE}.create_evaluation_run_or_409") + @patch(f"{_MODULE}.resolve_evaluation_config") + @patch(f"{_MODULE}.get_dataset_by_id") + def test_queue_failure_marks_run_failed( + self, + mock_get_dataset, + mock_resolve, + mock_create_run, + mock_submit, + mock_update, + ) -> None: + """A failure enqueueing the celery task flips the run to failed (no raise).""" + mock_get_dataset.return_value = _dataset() + mock_resolve.return_value = (_config("openai"), None) + + eval_run = MagicMock() + eval_run.id = 99 + mock_create_run.return_value = eval_run + mock_submit.side_effect = 
Exception("broker down") + mock_update.return_value = eval_run + + result = validate_and_start_batch_evaluation( + session=MagicMock(), + dataset_id=1, + experiment_name="exp", + config_id=uuid4(), + config_version=1, + organization_id=1, + project_id=2, + ) + + assert result is eval_run + update_arg = mock_update.call_args.kwargs["update"] + assert update_arg.status == "failed" + assert "Failed to queue batch submission" in update_arg.error_message + + @patch(f"{_MODULE}.start_evaluation_batch_submission") + @patch(f"{_MODULE}.create_evaluation_run_or_409") + @patch(f"{_MODULE}.resolve_evaluation_config") + @patch(f"{_MODULE}.get_dataset_by_id") + def test_success_returns_run( + self, mock_get_dataset, mock_resolve, mock_create_run, mock_submit + ) -> None: + mock_get_dataset.return_value = _dataset() + mock_resolve.return_value = (_config("google-aistudio"), None) + eval_run = MagicMock() + mock_create_run.return_value = eval_run + mock_submit.return_value = "task-1" + + result = validate_and_start_batch_evaluation( + session=MagicMock(), + dataset_id=1, + experiment_name="exp", + config_id=uuid4(), + config_version=1, + organization_id=1, + project_id=2, + ) + + assert result is eval_run + mock_submit.assert_called_once()