diff --git a/src/server/action_transfer_service_server_impl.rs b/src/server/action_transfer_service_server_impl.rs index 0edd17e..e5a5621 100644 --- a/src/server/action_transfer_service_server_impl.rs +++ b/src/server/action_transfer_service_server_impl.rs @@ -1,4 +1,7 @@ -use crate::configuration::service::ServiceConfiguration; +use crate::{ + configuration::service::ServiceConfiguration, + sagittarius::module_service_client_impl::SagittariusModuleServiceClient, +}; use async_nats::{Subject, Subscriber}; use futures::StreamExt; use futures_core::Stream; @@ -16,12 +19,19 @@ use tucana::{ shared::{ExecutionFlow, Flows, ValidationFlow, Value}, }; -type PendingReplies = Arc>>; +type PendingReplies = Arc>>; + +#[derive(Clone)] +struct PendingReply { + reply_subject: Subject, + keys: Vec, +} pub struct AquilaActionTransferServiceServer { client: async_nats::Client, kv: async_nats::jetstream::kv::Store, actions: ServiceConfiguration, + module_service: Option>>, action_config_tx: tokio::sync::broadcast::Sender, is_static: bool, } @@ -31,6 +41,7 @@ impl AquilaActionTransferServiceServer { client: async_nats::Client, kv: async_nats::jetstream::kv::Store, actions: ServiceConfiguration, + module_service: Option>>, action_config_tx: tokio::sync::broadcast::Sender, is_static: bool, ) -> Self { @@ -38,6 +49,7 @@ impl AquilaActionTransferServiceServer { client, kv, actions, + module_service, action_config_tx, is_static, } @@ -131,6 +143,87 @@ fn applies_to_action( }) } +fn overwrite_module_definition_sources( + module: &mut tucana::shared::Module, + action_identifier: &str, +) { + let source = format!("action.{}", action_identifier); + + for flow_type in &mut module.flow_types { + flow_type.definition_source = Some(source.clone()); + } + for runtime_flow_type in &mut module.runtime_flow_types { + runtime_flow_type.definition_source = Some(source.clone()); + } + for function_definition in &mut module.function_definitions { + function_definition.definition_source = source.clone(); + } + for runtime_function_definition in &mut module.runtime_function_definitions { + runtime_function_definition.definition_source = source.clone(); + } + for definition_data_type in &mut module.definition_data_types { + definition_data_type.definition_source = source.clone(); + } +} + +fn subject_execution_identifier(subject: &Subject) -> Option { + subject + .as_str() + .rsplit('.') + .next() + .filter(|execution_id| !execution_id.is_empty()) + .map(ToString::to_string) +} + +fn pending_reply_keys( + request_execution_id: &str, + subject_execution_id: Option<&str>, +) -> Vec { + let mut keys = Vec::new(); + + if !request_execution_id.is_empty() { + keys.push(request_execution_id.to_string()); + } + + if let Some(subject_execution_id) = subject_execution_id { + if !subject_execution_id.is_empty() && !keys.iter().any(|key| key == subject_execution_id) { + keys.push(subject_execution_id.to_string()); + } + } + + keys +} + +fn insert_pending_reply( + pending: &mut HashMap, + reply_subject: Subject, + keys: Vec, +) { + let pending_reply = PendingReply { + reply_subject, + keys: keys.clone(), + }; + + for key in keys { + pending.insert(key, pending_reply.clone()); + } +} + +fn remove_pending_reply( + pending: &mut HashMap, + execution_id: &str, +) -> Option { + let pending_reply = pending.remove(execution_id)?; + + for key in &pending_reply.keys { + if key != execution_id { + pending.remove(key); + } + } + + Some(pending_reply) +} + /// Extracts the bearer token from gRPC metadata. fn extract_token( request: &tonic::Request>, @@ -161,8 +254,9 @@ fn extract_token( /// Validates the logon request, starts NATS + config forwarders, and returns the accepted logon. async fn handle_logon( token: &str, - action_logon: ActionLogon, + mut action_logon: ActionLogon, actions: Arc>, + module_service: Option>>, client: async_nats::Client, cfg_tx: tokio::sync::broadcast::Sender, tx: tokio::sync::mpsc::Sender>, @@ -171,21 +265,46 @@ async fn handle_logon( ) -> Result { log::info!("Action logon attempt payload={:?}", action_logon); - let identifier = match action_logon.module { - Some(ref m) => m.identifier.clone(), + let module = match action_logon.module.as_mut() { + Some(m) => m, None => { return Err(Status::aborted("Please provide a module configuration.")); } }; - let lock = actions.lock().await; - if !lock.has_action(&token.to_string(), &identifier) { - log::warn!( - "Rejected action logon identifier={} reason=token_not_registered", - identifier - ); - return Err(Status::unauthenticated( - "token not matching to action identifier", - )); + let identifier = module.identifier.clone(); + + { + let lock = actions.lock().await; + if !lock.has_action(&token.to_string(), &identifier) { + log::warn!( + "Rejected action logon identifier={} reason=token_not_registered", + identifier + ); + return Err(Status::unauthenticated( + "token not matching to action identifier", + )); + } + } + + overwrite_module_definition_sources(module, &identifier); + + if let Some(module_service) = module_service { + let mut client = module_service.lock().await; + let response = client + .update_modules(tucana::aquila::ModuleUpdateRequest { + modules: vec![module.clone()], + }) + .await; + + if !response.success { + log::error!( + "Rejected action logon identifier={} reason=sagittarius_module_update_failed", + identifier + ); + return Err(Status::internal( + "could not update action module via Sagittarius", + )); + } } log::debug!("Action connected identifier={}", identifier); @@ -204,6 +323,17 @@ async fn handle_logon( } }; + if let Err(err) = client.flush().await { + log::error!( + "Could not flush action subscription: {}. Reason: {:?}", + identifier, + err + ); + return Err(Status::internal( + "could not register action subscription with NATS", + )); + } + log::debug!("Subscribed to action subject action.{}.*", identifier); let tx_clone = tx.clone(); @@ -283,6 +413,8 @@ async fn handle_event( let bytes = execution_flow.encode_to_vec(); let topic = format!("execution.{}", uuid); + log::info!("{:#?}", execution_flow); + log::info!( "Requesting execution flow_id={} execution_id={}", flow_id, @@ -307,12 +439,12 @@ async fn handle_result( ) { let execution_id = execution_result.execution_identifier.clone(); - let reply_subject = { + let pending_reply = { let mut pending = pending_replies.lock().await; - pending.remove(&execution_id) + remove_pending_reply(&mut pending, &execution_id) }; - let Some(reply_subject) = reply_subject else { + let Some(pending_reply) = pending_reply else { log::error!( "No pending NATS reply subject found execution_id={}", execution_id @@ -323,16 +455,28 @@ async fn handle_result( log::debug!( "Publishing execution result for {} to reply subject {}", execution_id, - reply_subject + pending_reply.reply_subject ); let payload = execution_result.encode_to_vec(); - if let Err(err) = client.publish(reply_subject, payload.into()).await { + if let Err(err) = client + .publish(pending_reply.reply_subject, payload.into()) + .await + { log::error!( "Failed to publish action result for execution {}: {:?}", execution_id, err ); + return; + } + + if let Err(err) = client.flush().await { + log::error!( + "Failed to flush action result for execution {}: {:?}", + execution_id, + err + ); } } @@ -355,6 +499,7 @@ impl ActionTransferService for AquilaActionTransferServiceServer { let actions = Arc::new(Mutex::new(self.actions.clone())); let kv = self.kv.clone(); let client = self.client.clone(); + let module_service = self.module_service.clone(); let cfg_tx = self.action_config_tx.clone(); let is_static = self.is_static; let pending_replies: PendingReplies = Arc::new(Mutex::new(HashMap::new())); @@ -402,6 +547,7 @@ impl ActionTransferService for AquilaActionTransferServiceServer { &token, action_logon, actions.clone(), + module_service.clone(), client.clone(), cfg_tx.clone(), tx.clone(), @@ -465,9 +611,10 @@ impl ActionTransferService for AquilaActionTransferServiceServer { } tucana::aquila::action_transfer_request::Data::Result(execution_result) => { log::debug!( - "Received execution result execution_id={} action={}", + "Received execution result execution_id={} action={}, result={:#?}", execution_result.execution_identifier, identifier, + execution_result ); handle_result(execution_result, client.clone(), pending_replies.clone()) @@ -494,7 +641,7 @@ async fn forward_nats_to_action( while let Some(msg) = sub.next().await { log::debug!("Received RemoteRuntime execution request"); - let execution = match ActionExecutionRequest::decode(msg.payload.as_ref()) { + let mut execution = match ActionExecutionRequest::decode(msg.payload.as_ref()) { Ok(req) => req, Err(err) => { log::error!("Invalid execution request payload: {:?}", err); @@ -502,6 +649,18 @@ async fn forward_nats_to_action( } }; + let subject_execution_id = subject_execution_identifier(&msg.subject); + if execution.execution_identifier.is_empty() { + if let Some(subject_execution_id) = subject_execution_id.as_ref() { + log::warn!( + "Filled missing action execution identifier from NATS subject subject={} execution_id={}", + msg.subject, + subject_execution_id + ); + execution.execution_identifier = subject_execution_id.clone(); + } + } + let execution_id = execution.execution_identifier.clone(); let Some(reply_subject) = msg.reply.clone() else { @@ -512,15 +671,32 @@ async fn forward_nats_to_action( continue; }; + let keys = pending_reply_keys(&execution_id, subject_execution_id.as_deref()); + if keys.is_empty() { + log::error!( + "Cannot store NATS reply subject without execution identifier subject={} reply_subject={}", + msg.subject, + reply_subject + ); + continue; + } + { let mut pending = pending_replies.lock().await; - pending.insert(execution_id.clone(), reply_subject.clone()); + insert_pending_reply(&mut pending, reply_subject.clone(), keys.clone()); } log::debug!( - "Stored reply subject reply_subject={} execution_id={}", + "Stored reply subject reply_subject={} execution_id={} keys={:?}", reply_subject, - execution_id + execution_id, + keys + ); + + log::debug!( + "Forwarding execution request to action execution_id={} request={:#?}", + execution_id, + execution ); let resp = ActionTransferResponse { @@ -534,7 +710,7 @@ async fn forward_nats_to_action( // cleanup, since the request can no longer be delivered to the action let mut pending = pending_replies.lock().await; - pending.remove(&execution_id); + remove_pending_reply(&mut pending, &execution_id); break; } @@ -542,3 +718,100 @@ async fn forward_nats_to_action( log::debug!("Execution forwarder stopped"); } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn overwrite_module_definition_sources_uses_action_source() { + let mut module = tucana::shared::Module { + flow_types: vec![tucana::shared::FlowType { + definition_source: Some("module.old".to_string()), + ..Default::default() + }], + runtime_flow_types: vec![tucana::shared::RuntimeFlowType { + definition_source: Some("module.old".to_string()), + ..Default::default() + }], + function_definitions: vec![tucana::shared::FunctionDefinition { + definition_source: "module.old".to_string(), + ..Default::default() + }], + runtime_function_definitions: vec![tucana::shared::RuntimeFunctionDefinition { + definition_source: "module.old".to_string(), + ..Default::default() + }], + definition_data_types: vec![tucana::shared::DefinitionDataType { + definition_source: "module.old".to_string(), + ..Default::default() + }], + ..Default::default() + }; + + overwrite_module_definition_sources(&mut module, "send-email"); + + assert_eq!( + module.flow_types[0].definition_source.as_deref(), + Some("action.send-email") + ); + assert_eq!( + module.runtime_flow_types[0].definition_source.as_deref(), + Some("action.send-email") + ); + assert_eq!( + module.function_definitions[0].definition_source, + "action.send-email" + ); + assert_eq!( + module.runtime_function_definitions[0].definition_source, + "action.send-email" + ); + assert_eq!( + module.definition_data_types[0].definition_source, + "action.send-email" + ); + } + + #[test] + fn pending_reply_keys_include_payload_and_subject_ids_once() { + assert_eq!( + pending_reply_keys("payload-id", Some("subject-id")), + vec!["payload-id".to_string(), "subject-id".to_string()] + ); + assert_eq!( + pending_reply_keys("same-id", Some("same-id")), + vec!["same-id".to_string()] + ); + assert_eq!( + pending_reply_keys("", Some("subject-id")), + vec!["subject-id".to_string()] + ); + } + + #[test] + fn remove_pending_reply_removes_all_aliases() { + let reply_subject = Subject::from("_INBOX.reply"); + let mut pending = HashMap::new(); + + insert_pending_reply( + &mut pending, + reply_subject.clone(), + vec!["payload-id".to_string(), "subject-id".to_string()], + ); + + let removed = remove_pending_reply(&mut pending, "subject-id") + .expect("pending reply should be found by alias"); + + assert_eq!(removed.reply_subject, reply_subject); + assert!(pending.is_empty()); + } + + #[test] + fn subject_execution_identifier_uses_last_subject_token() { + assert_eq!( + subject_execution_identifier(&Subject::from("action.example.execution-id")), + Some("execution-id".to_string()) + ); + } +} diff --git a/src/server/dynamic_server.rs b/src/server/dynamic_server.rs index 7e6c739..9b66ce3 100644 --- a/src/server/dynamic_server.rs +++ b/src/server/dynamic_server.rs @@ -137,6 +137,7 @@ impl AquilaDynamicServer { self.nats_client.clone(), self.kv_store.as_ref().clone(), self.service_configuration.clone(), + Some(module_service.clone()), self.action_config_tx.clone(), false, ); diff --git a/src/server/static_server.rs b/src/server/static_server.rs index 48c924f..55dd6d4 100644 --- a/src/server/static_server.rs +++ b/src/server/static_server.rs @@ -56,6 +56,7 @@ impl AquilaStaticServer { self.nats_client.clone(), self.kv_store.as_ref().clone(), self.service_configuration.clone(), + None, self.action_config_tx.clone(), true, );