Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 1 addition & 11 deletions crates/taurus/src/app/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ use tucana::shared::module_status::StatusVariant;

use crate::client::runtime_execution::TaurusRuntimeExecutionService;
use crate::client::runtime_status::TaurusRuntimeStatusService;
use crate::client::runtime_usage::TaurusRuntimeUsageService;
use crate::config::Config;

pub async fn run() {
Expand All @@ -32,7 +31,6 @@ pub async fn run() {
let (
runtime_status_service,
runtime_execution_service,
runtime_usage_service,
mut runtime_status_heartbeat_task,
) = setup_dynamic_services_if_needed(&config).await;

Expand All @@ -44,7 +42,6 @@ pub async fn run() {
nats_remote,
runtime_emitter,
runtime_execution_service,
runtime_usage_service,
);

wait_for_shutdown(&mut worker_task, &mut health_task).await;
Expand Down Expand Up @@ -112,11 +109,10 @@ async fn setup_dynamic_services_if_needed(
) -> (
Option<Arc<TaurusRuntimeStatusService>>,
Option<TaurusRuntimeExecutionService>,
Option<TaurusRuntimeUsageService>,
Option<JoinHandle<()>>,
) {
if config.mode != DYNAMIC {
return (None, None, None, None);
return (None, None, None);
}

push_definitions_until_success(config).await;
Expand All @@ -129,11 +125,6 @@ async fn setup_dynamic_services_if_needed(
.await,
);

let runtime_usage_service = Some(
TaurusRuntimeUsageService::from_url(config.aquila_url.clone(), config.aquila_token.clone())
.await,
);

let runtime_status_service = Some(Arc::new(
TaurusRuntimeStatusService::from_url(
config.aquila_url.clone(),
Expand Down Expand Up @@ -184,7 +175,6 @@ async fn setup_dynamic_services_if_needed(
(
runtime_status_service,
runtime_execution_service,
runtime_usage_service,
runtime_status_heartbeat_task,
)
}
Expand Down
25 changes: 3 additions & 22 deletions crates/taurus/src/app/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,16 @@ use taurus_provider::providers::emitter::nats_emitter::NATSRespondEmitter;
use taurus_provider::providers::remote::nats_remote_runtime::NATSRemoteRuntime;
use tokio::task::JoinHandle;
use tucana::shared::execution_result;
use tucana::shared::{ExecutionFlow, ExecutionResult, NodeExecutionResult, RuntimeUsage, Value};
use tucana::shared::{ExecutionFlow, ExecutionResult, NodeExecutionResult, Value};

use crate::client::runtime_execution::TaurusRuntimeExecutionService;
use crate::client::runtime_usage::TaurusRuntimeUsageService;

pub fn spawn_worker(
client: async_nats::Client,
engine: ExecutionEngine,
nats_remote: NATSRemoteRuntime,
runtime_emitter: NATSRespondEmitter,
mut runtime_execution_service: Option<TaurusRuntimeExecutionService>,
runtime_usage_service: Option<TaurusRuntimeUsageService>,
) -> JoinHandle<()> {
tokio::spawn(async move {
let mut execution_subscription = match client
Expand Down Expand Up @@ -52,7 +50,6 @@ pub fn spawn_worker(
&nats_remote,
&runtime_emitter,
runtime_execution_service.as_mut(),
runtime_usage_service.as_ref(),
).await;
}
None => {
Expand All @@ -74,7 +71,6 @@ async fn process_execution_message(
nats_remote: &NATSRemoteRuntime,
runtime_emitter: &NATSRespondEmitter,
mut runtime_execution_service: Option<&mut TaurusRuntimeExecutionService>,
runtime_usage_service: Option<&TaurusRuntimeUsageService>,
) {
let requested_execution_id = parse_execution_id_from_subject(&message.subject, "execution")
.unwrap_or_else(|| {
Expand Down Expand Up @@ -138,12 +134,6 @@ async fn process_execution_message(
.update_runtime_execution(execution_result)
.await;
}

if let Some(usage_service) = runtime_usage_service {
usage_service
.update_runtime_usage(run_result.runtime_usage)
.await;
}
}

#[derive(Clone)]
Expand All @@ -155,7 +145,6 @@ struct FlowRunResult {
input: Option<Value>,
signal: Signal,
node_execution_results: Vec<NodeExecutionResult>,
runtime_usage: RuntimeUsage,
}

async fn execute_flow(
Expand All @@ -166,7 +155,6 @@ async fn execute_flow(
respond_emitter: Option<&dyn RespondEmitter>,
) -> FlowRunResult {
let started_at = now_unix_micros();
let start = Instant::now();
let flow_id = flow.flow_id;
let input = flow.input_value.clone();
let report = engine
Expand All @@ -179,7 +167,6 @@ async fn execute_flow(
)
.await;
let finished_at = now_unix_micros();
let duration_micros = start.elapsed().as_micros() as i64;

FlowRunResult {
execution_id,
Expand All @@ -189,11 +176,7 @@ async fn execute_flow(
input,
signal: report.signal,
node_execution_results: report.node_execution_results,
runtime_usage: RuntimeUsage {
flow_id,
duration: duration_micros,
},
}
}
}

fn parse_execution_id_from_subject(
Expand Down Expand Up @@ -347,17 +330,15 @@ mod tests {
let run_result = execute_flow(execution_id, flow, &engine, None, None).await;

println!(
"started_at={} finished_at={} delta={} runtime_usage.duration={}",
"started_at={} finished_at={} delta={}",
run_result.started_at,
run_result.finished_at,
run_result.finished_at - run_result.started_at,
run_result.runtime_usage.duration
);

assert_eq!(run_result.execution_id, execution_id);
assert!(run_result.started_at >= 1_000_000_000_000_000);
assert!(run_result.finished_at >= run_result.started_at);
assert!(run_result.runtime_usage.duration > 0);
assert!(
run_result
.node_execution_results
Expand Down
1 change: 0 additions & 1 deletion crates/taurus/src/client/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,2 @@
pub mod runtime_execution;
pub mod runtime_status;
pub mod runtime_usage;
48 changes: 0 additions & 48 deletions crates/taurus/src/client/runtime_usage.rs

This file was deleted.

6 changes: 2 additions & 4 deletions docs/dev.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ Taurus is the execution runtime in the CodeZero execution block.
- Executes flow graphs via `taurus-core::runtime::engine::ExecutionEngine`
- Emits lifecycle events to NATS (`runtime.emitter.<execution_id>`)
- Delegates remote nodes to external services over NATS (`action.<service>.<execution_id>`)
- Reports runtime status, runtime usage, and execution results to Aquila in dynamic mode
- Reports runtime status and execution results to Aquila in dynamic mode

## Workspace Layout

Expand Down Expand Up @@ -50,7 +50,7 @@ graph TD
Core --> Remote
Remote -->|action.<service>.<execution_id>| NATS
NATS --> Service
Taurus -->|runtime status + usage + execution result| Aquila
Taurus -->|runtime status + execution result| Aquila
```

### Execution details
Expand All @@ -73,7 +73,6 @@ Taurus mode is controlled by `MODE`.

- Sends definitions to Aquila (retry loop until success)
- Starts runtime status reporting (including heartbeat)
- Sends runtime usage updates after each flow run
- Sends execution result updates after each flow run

### `static`
Expand All @@ -83,7 +82,6 @@ Taurus mode is controlled by `MODE`.
- Taurus still executes flows from NATS
- No definition push
- No runtime status updates
- No runtime usage updates
- No execution result updates

## Environment Variables
Expand Down