From 1e42770479e1b8c9c45db121147ee0d5c9d0023e Mon Sep 17 00:00:00 2001 From: Test User Date: Sat, 20 Jun 2026 23:50:22 +0200 Subject: [PATCH] feat(terraphim_server): implement WorkflowStep tracking in trace API Refs #2503 Add WorkflowStep and StepStatus types to the workflow module so that individual execution steps can be recorded against a running workflow. The /api/workflow/:id/trace endpoint now returns the accumulated steps instead of a hardcoded empty array. Changes: - Add WorkflowStep struct (id, name, status, started_at, completed_at, output) - Add StepStatus enum (Running, Completed, Failed) - Add steps: Vec to WorkflowStatus - Add record_workflow_step() public helper for handlers to push step records - Fix get_execution_trace() to serialise status.steps instead of [] - Update create_workflow_session() to initialise steps: vec![] - Fix existing simple_workflow_test.rs to include the new field - Add 3 unit tests: empty initial state, single step round-trip, ordering Co-Authored-By: Terraphim AI --- terraphim_server/src/workflows/mod.rs | 136 +++++++++++++++++- .../tests/simple_workflow_test.rs | 1 + 2 files changed, 135 insertions(+), 2 deletions(-) diff --git a/terraphim_server/src/workflows/mod.rs b/terraphim_server/src/workflows/mod.rs index a188c302b..c1a9f9eca 100644 --- a/terraphim_server/src/workflows/mod.rs +++ b/terraphim_server/src/workflows/mod.rs @@ -136,6 +136,8 @@ pub struct WorkflowStatus { pub result: Option, /// Error description; absent unless the workflow failed. pub error: Option, + /// Ordered list of steps recorded during execution. + pub steps: Vec, } /// Lifecycle phases of a workflow run. @@ -154,6 +156,35 @@ pub enum ExecutionStatus { Cancelled, } +/// Terminal state of a single workflow step. +#[derive(Debug, Clone, Serialize)] +#[serde(rename_all = "snake_case")] +pub enum StepStatus { + /// Step is currently executing. + Running, + /// Step finished without error. + Completed, + /// Step aborted due to an error. + Failed, +} + +/// A recorded execution step within a workflow run. +#[derive(Debug, Clone, Serialize)] +pub struct WorkflowStep { + /// Step identifier, unique within a workflow run. + pub id: String, + /// Human-readable step name. + pub name: String, + /// Terminal lifecycle state of this step. + pub status: StepStatus, + /// UTC timestamp when this step started. + pub started_at: chrono::DateTime, + /// UTC timestamp when this step finished; absent while the step is running. + pub completed_at: Option>, + /// Output produced by this step; absent when not available. + pub output: Option, +} + /// Event broadcast over WebSocket to connected clients. #[derive(Debug, Clone, Serialize)] pub struct WebSocketMessage { @@ -225,11 +256,10 @@ async fn get_execution_trace( let sessions = state.workflow_sessions.read().await; if let Some(status) = sessions.get(&id) { - // Return detailed execution trace let trace = serde_json::json!({ "workflow_id": id, "status": status.status, - "steps": [], // TODO: Implement detailed step tracking + "steps": status.steps, "timeline": { "started_at": status.started_at, "completed_at": status.completed_at @@ -248,6 +278,107 @@ async fn get_execution_trace( } } +/// Appends a completed step record to an existing workflow run. +pub async fn record_workflow_step( + sessions: &WorkflowSessions, + workflow_id: &str, + step: WorkflowStep, +) { + let mut sessions = sessions.write().await; + if let Some(workflow) = sessions.get_mut(workflow_id) { + workflow.steps.push(step); + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[tokio::test] + async fn trace_includes_recorded_steps() { + let sessions = RwLock::new(HashMap::new()); + let (broadcaster, _rx) = broadcast::channel(16); + let wf_id = "wf_test_123".to_string(); + + create_workflow_session( + &sessions, + &broadcaster, + wf_id.clone(), + "test_pattern".to_string(), + ) + .await; + + let step = WorkflowStep { + id: "step_1".to_string(), + name: "Parse Input".to_string(), + status: StepStatus::Completed, + started_at: chrono::Utc::now(), + completed_at: Some(chrono::Utc::now()), + output: Some("parsed successfully".to_string()), + }; + record_workflow_step(&sessions, &wf_id, step).await; + + let guard = sessions.read().await; + let wf = guard.get(&wf_id).expect("workflow should exist"); + assert_eq!(wf.steps.len(), 1); + assert_eq!(wf.steps[0].name, "Parse Input"); + assert_eq!(wf.steps[0].id, "step_1"); + assert!(matches!(wf.steps[0].status, StepStatus::Completed)); + } + + #[tokio::test] + async fn trace_starts_with_empty_steps() { + let sessions = RwLock::new(HashMap::new()); + let (broadcaster, _rx) = broadcast::channel(16); + let wf_id = "wf_empty_456".to_string(); + + create_workflow_session( + &sessions, + &broadcaster, + wf_id.clone(), + "test_pattern".to_string(), + ) + .await; + + let guard = sessions.read().await; + let wf = guard.get(&wf_id).expect("workflow should exist"); + assert!(wf.steps.is_empty()); + } + + #[tokio::test] + async fn record_multiple_steps_preserves_order() { + let sessions = RwLock::new(HashMap::new()); + let (broadcaster, _rx) = broadcast::channel(16); + let wf_id = "wf_multi_789".to_string(); + + create_workflow_session( + &sessions, + &broadcaster, + wf_id.clone(), + "test_pattern".to_string(), + ) + .await; + + for i in 1..=3u32 { + let step = WorkflowStep { + id: format!("step_{i}"), + name: format!("Step {i}"), + status: StepStatus::Completed, + started_at: chrono::Utc::now(), + completed_at: Some(chrono::Utc::now()), + output: None, + }; + record_workflow_step(&sessions, &wf_id, step).await; + } + + let guard = sessions.read().await; + let wf = guard.get(&wf_id).expect("workflow should exist"); + assert_eq!(wf.steps.len(), 3); + assert_eq!(wf.steps[0].name, "Step 1"); + assert_eq!(wf.steps[2].name, "Step 3"); + } +} + async fn list_workflows(State(state): State) -> Json> { let sessions = state.workflow_sessions.read().await; let workflows: Vec = sessions.values().cloned().collect(); @@ -312,6 +443,7 @@ pub async fn create_workflow_session( completed_at: None, result: None, error: None, + steps: vec![], }; sessions.write().await.insert(workflow_id.clone(), status); diff --git a/terraphim_server/tests/simple_workflow_test.rs b/terraphim_server/tests/simple_workflow_test.rs index 4ad053660..647f9ffba 100644 --- a/terraphim_server/tests/simple_workflow_test.rs +++ b/terraphim_server/tests/simple_workflow_test.rs @@ -24,6 +24,7 @@ async fn test_workflow_system_basic() { completed_at: None, result: None, error: None, + steps: vec![], }, ); }