From 45715b6fa2080ccf13b2275658378cc498065cc8 Mon Sep 17 00:00:00 2001 From: Raphael Date: Sun, 21 Jun 2026 19:59:51 +0200 Subject: [PATCH 1/3] deps: added base64 --- Cargo.lock | 1 + adapter/rest/Cargo.toml | 1 + 2 files changed, 2 insertions(+) diff --git a/Cargo.lock b/Cargo.lock index a8e4e4c..2a651cd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1534,6 +1534,7 @@ version = "0.0.0" dependencies = [ "anyhow", "base", + "base64", "code0-flow", "http-body-util", "hyper", diff --git a/adapter/rest/Cargo.toml b/adapter/rest/Cargo.toml index b0b4de9..ce11b19 100644 --- a/adapter/rest/Cargo.toml +++ b/adapter/rest/Cargo.toml @@ -13,6 +13,7 @@ regex = { workspace = true } tonic = { workspace = true } base = { workspace = true } anyhow = { workspace = true } +base64 = "0.22.1" hyper-util = "0.1.19" hyper = "1.8.1" http-body-util = "0.1.3" From 1264ce371da997ba0773dd5cd8c3085d75e3e40b Mon Sep 17 00:00:00 2001 From: Raphael Date: Sun, 21 Jun 2026 20:37:56 +0200 Subject: [PATCH 2/3] feat: added auth module --- adapter/rest/Cargo.toml | 1 + adapter/rest/src/auth/credentials.rs | 140 +++++++++++++++++++++++++++ adapter/rest/src/auth/jwt.rs | 134 +++++++++++++++++++++++++ adapter/rest/src/auth/mod.rs | 65 +++++++++++++ adapter/rest/src/auth/settings.rs | 51 ++++++++++ adapter/rest/src/auth/types.rs | 113 +++++++++++++++++++++ adapter/rest/src/main.rs | 41 +++++--- 7 files changed, 530 insertions(+), 15 deletions(-) create mode 100644 adapter/rest/src/auth/credentials.rs create mode 100644 adapter/rest/src/auth/jwt.rs create mode 100644 adapter/rest/src/auth/mod.rs create mode 100644 adapter/rest/src/auth/settings.rs create mode 100644 adapter/rest/src/auth/types.rs diff --git a/adapter/rest/Cargo.toml b/adapter/rest/Cargo.toml index ce11b19..dfe21f8 100644 --- a/adapter/rest/Cargo.toml +++ b/adapter/rest/Cargo.toml @@ -14,6 +14,7 @@ tonic = { workspace = true } base = { workspace = true } anyhow = { workspace = true } base64 = "0.22.1" +ring = "0.17.14" hyper-util = "0.1.19" hyper = "1.8.1" http-body-util = "0.1.3" diff --git a/adapter/rest/src/auth/credentials.rs b/adapter/rest/src/auth/credentials.rs new file mode 100644 index 0000000..084ec94 --- /dev/null +++ b/adapter/rest/src/auth/credentials.rs @@ -0,0 +1,140 @@ +use base64::Engine; +use tucana::shared::{Struct, Value, value::Kind}; + +use super::jwt::validate_hs256_jwt; +use super::types::AuthenticationType; + +pub(super) fn matches_authorization( + auth_type: AuthenticationType, + auth_value: &Value, + authorization: &str, +) -> bool { + match auth_type { + AuthenticationType::BearerJwt => { + let Some(secret) = value_as_string(auth_value) else { + return false; + }; + + validate_hs256_jwt(authorization, secret) + } + AuthenticationType::BearerStatic => { + let Some(expected_token) = value_as_string(auth_value) else { + return false; + }; + + authorization.trim() == format!("Bearer {}", expected_token.trim()) + } + AuthenticationType::Basic => { + let Some(credentials) = basic_credentials(auth_value) else { + return false; + }; + + let expected_encoded = + base64::engine::general_purpose::STANDARD.encode(credentials.as_bytes()); + authorization.trim() == format!("Basic {}", expected_encoded) + } + } +} + +fn basic_credentials(value: &Value) -> Option { + if let Some(credentials) = value_as_string(value) { + return Some(credentials.trim().to_string()); + } + + let Some(Kind::StructValue(Struct { fields })) = value.kind.as_ref() else { + return None; + }; + + let username = fields + .get("username") + .or_else(|| fields.get("user")) + .and_then(value_as_string)?; + let password = fields + .get("password") + .or_else(|| fields.get("pass")) + .and_then(value_as_string)?; + + Some(format!("{username}:{password}")) +} + +fn value_as_string(value: &Value) -> Option<&str> { + match value.kind.as_ref() { + Some(Kind::StringValue(value)) => Some(value.as_str()), + _ => None, + } +} + +#[cfg(test)] +mod tests { + use std::collections::HashMap; + use tucana::shared::{Struct, Value, value::Kind}; + + use super::matches_authorization; + use crate::auth::jwt::tests::create_hs256_jwt; + use crate::auth::types::AuthenticationType; + + #[test] + fn bearer_static_matches_expected_token() { + let value = string_value("secret"); + + assert!(matches_authorization( + AuthenticationType::BearerStatic, + &value, + "Bearer secret" + )); + assert!(!matches_authorization( + AuthenticationType::BearerStatic, + &value, + "Bearer other" + )); + } + + #[test] + fn bearer_jwt_verifies_hs256_token() { + let secret = string_value("jwt-secret"); + let token = create_hs256_jwt("jwt-secret", r#"{"sub":"123"}"#); + + assert!(matches_authorization( + AuthenticationType::BearerJwt, + &secret, + &format!("Bearer {token}") + )); + assert!(!matches_authorization( + AuthenticationType::BearerJwt, + &secret, + "Bearer header.payload.bad-signature" + )); + } + + #[test] + fn basic_matches_encoded_username_password_object_pair() { + let value = basic_value("user", "pass"); + + assert!(matches_authorization( + AuthenticationType::Basic, + &value, + "Basic dXNlcjpwYXNz" + )); + assert!(!matches_authorization( + AuthenticationType::Basic, + &value, + "Basic dXNlcjpvdGhlcg==" + )); + } + + fn string_value(value: &str) -> Value { + Value { + kind: Some(Kind::StringValue(value.to_string())), + } + } + + fn basic_value(username: &str, password: &str) -> Value { + let mut fields = HashMap::new(); + fields.insert("username".to_string(), string_value(username)); + fields.insert("password".to_string(), string_value(password)); + + Value { + kind: Some(Kind::StructValue(Struct { fields })), + } + } +} diff --git a/adapter/rest/src/auth/jwt.rs b/adapter/rest/src/auth/jwt.rs new file mode 100644 index 0000000..dc32178 --- /dev/null +++ b/adapter/rest/src/auth/jwt.rs @@ -0,0 +1,134 @@ +use base64::Engine; +use ring::hmac; +use std::time::{SystemTime, UNIX_EPOCH}; + +pub(super) fn validate_hs256_jwt(authorization: &str, secret: &str) -> bool { + let Some(token) = authorization.trim().strip_prefix("Bearer ") else { + return false; + }; + + validate_token(token, secret) +} + +fn validate_token(token: &str, secret: &str) -> bool { + let mut segments = token.split('.'); + let Some(header_segment) = segments.next() else { + return false; + }; + let Some(payload_segment) = segments.next() else { + return false; + }; + let Some(signature_segment) = segments.next() else { + return false; + }; + + if segments.next().is_some() { + return false; + } + + let Some(header) = decode_json_segment(header_segment) else { + return false; + }; + + // The flow stores only one shared secret, so JWT auth is intentionally + // limited to HS256. Supporting RS/ES algorithms would require a public key + // or JWKS setting instead of a secret string. + if header.get("alg").and_then(|alg| alg.as_str()) != Some("HS256") { + return false; + } + + if !verify_signature(header_segment, payload_segment, signature_segment, secret) { + return false; + } + + let Some(payload) = decode_json_segment(payload_segment) else { + return false; + }; + + token_is_not_expired(&payload) +} + +fn verify_signature( + header_segment: &str, + payload_segment: &str, + signature_segment: &str, + secret: &str, +) -> bool { + let Some(signature) = decode_base64_url(signature_segment) else { + return false; + }; + + let signing_input = format!("{header_segment}.{payload_segment}"); + let key = hmac::Key::new(hmac::HMAC_SHA256, secret.as_bytes()); + + hmac::verify(&key, signing_input.as_bytes(), &signature).is_ok() +} + +fn token_is_not_expired(payload: &serde_json::Value) -> bool { + let Some(exp) = payload.get("exp").and_then(|exp| exp.as_i64()) else { + return true; + }; + + let Ok(now) = SystemTime::now().duration_since(UNIX_EPOCH) else { + return false; + }; + + exp > now.as_secs() as i64 +} + +fn decode_json_segment(segment: &str) -> Option { + let bytes = decode_base64_url(segment)?; + serde_json::from_slice::(&bytes).ok() +} + +fn decode_base64_url(value: &str) -> Option> { + base64::engine::general_purpose::URL_SAFE_NO_PAD + .decode(value) + .or_else(|_| base64::engine::general_purpose::URL_SAFE.decode(value)) + .ok() +} + +#[cfg(test)] +pub(crate) mod tests { + use base64::Engine; + use ring::hmac; + + use super::validate_hs256_jwt; + + #[test] + fn verifies_hs256_token() { + let token = create_hs256_jwt("jwt-secret", r#"{"sub":"123"}"#); + + assert!(validate_hs256_jwt(&format!("Bearer {token}"), "jwt-secret")); + } + + #[test] + fn rejects_expired_token() { + let token = create_hs256_jwt("jwt-secret", r#"{"exp":1}"#); + + assert!(!validate_hs256_jwt( + &format!("Bearer {token}"), + "jwt-secret" + )); + } + + #[test] + fn rejects_wrong_secret() { + let token = create_hs256_jwt("jwt-secret", r#"{"sub":"123"}"#); + + assert!(!validate_hs256_jwt(&format!("Bearer {token}"), "wrong")); + } + + pub(crate) fn create_hs256_jwt(secret: &str, payload: &str) -> String { + let header = r#"{"alg":"HS256","typ":"JWT"}"#; + let header_segment = base64::engine::general_purpose::URL_SAFE_NO_PAD.encode(header); + let payload_segment = base64::engine::general_purpose::URL_SAFE_NO_PAD.encode(payload); + let signing_input = format!("{header_segment}.{payload_segment}"); + let key = hmac::Key::new(hmac::HMAC_SHA256, secret.as_bytes()); + let signature = hmac::sign(&key, signing_input.as_bytes()); + let signature_segment = + base64::engine::general_purpose::URL_SAFE_NO_PAD.encode(signature.as_ref()); + + format!("{signing_input}.{signature_segment}") + } +} diff --git a/adapter/rest/src/auth/mod.rs b/adapter/rest/src/auth/mod.rs new file mode 100644 index 0000000..9e18e64 --- /dev/null +++ b/adapter/rest/src/auth/mod.rs @@ -0,0 +1,65 @@ +mod credentials; +mod jwt; +mod settings; +mod types; + +use hyper::{ + HeaderMap, + header::{AUTHORIZATION, HeaderValue, WWW_AUTHENTICATE}, +}; +use tucana::shared::ValidationFlow; + +use self::credentials::matches_authorization; +use self::settings::{FlowAuthConfig, flow_auth_config}; +pub use self::types::AuthenticationError; + +pub fn validate_flow_auth( + flow: &ValidationFlow, + headers: &HeaderMap, +) -> Result<(), AuthenticationError> { + let auth_type = match flow_auth_config(flow) { + FlowAuthConfig::Unauthenticated => return Ok(()), + FlowAuthConfig::Invalid => { + log::warn!( + "auth reject: flow_id={} reason=invalid_httpAuth", + flow.flow_id + ); + return Err(AuthenticationError::InvalidAuthorization); + } + FlowAuthConfig::Authenticated(auth_type) => auth_type, + }; + + let Some(auth_value) = settings::flow_setting_value(flow, "httpAuthValue") else { + log::warn!( + "auth reject: flow_id={} reason=missing_or_invalid_httpAuthValue", + flow.flow_id + ); + return Err(AuthenticationError::invalid_for(auth_type)); + }; + + let Some(authorization) = headers + .get(AUTHORIZATION) + .and_then(|value| value.to_str().ok()) + else { + log::debug!( + "auth reject: flow_id={} reason=missing_authorization", + flow.flow_id + ); + return Err(AuthenticationError::missing_for(auth_type)); + }; + + if matches_authorization(auth_type, auth_value, authorization) { + log::debug!("auth accepted: flow_id={}", flow.flow_id); + Ok(()) + } else { + log::debug!( + "auth reject: flow_id={} reason=authorization_mismatch", + flow.flow_id + ); + Err(AuthenticationError::invalid_for(auth_type)) + } +} + +pub fn authenticate_header_name() -> hyper::header::HeaderName { + WWW_AUTHENTICATE +} diff --git a/adapter/rest/src/auth/settings.rs b/adapter/rest/src/auth/settings.rs new file mode 100644 index 0000000..fbb4d79 --- /dev/null +++ b/adapter/rest/src/auth/settings.rs @@ -0,0 +1,51 @@ +use tucana::shared::{ValidationFlow, Value, value::Kind}; + +use super::types::{AuthenticationType, is_unauthenticated_value}; + +pub(super) enum FlowAuthConfig { + Unauthenticated, + Authenticated(AuthenticationType), + Invalid, +} + +pub(super) fn flow_auth_config(flow: &ValidationFlow) -> FlowAuthConfig { + let Some(raw_auth_type) = flow_setting_as_string(flow, "httpAuth") else { + return FlowAuthConfig::Unauthenticated; + }; + + if is_unauthenticated_value(raw_auth_type) { + return FlowAuthConfig::Unauthenticated; + } + + match AuthenticationType::parse(raw_auth_type) { + Some(auth_type) => FlowAuthConfig::Authenticated(auth_type), + None => { + log::warn!( + "auth config invalid: flow_id={} httpAuth={:?}", + flow.flow_id, + raw_auth_type + ); + FlowAuthConfig::Invalid + } + } +} + +pub(super) fn flow_setting_value<'a>( + flow: &'a ValidationFlow, + flow_setting_id: &str, +) -> Option<&'a Value> { + flow.settings + .iter() + .find(|setting| setting.flow_setting_id == flow_setting_id) + .and_then(|setting| setting.value.as_ref()) +} + +fn flow_setting_as_string<'a>(flow: &'a ValidationFlow, flow_setting_id: &str) -> Option<&'a str> { + flow_setting_value(flow, flow_setting_id) + .and_then(|value| value.kind.as_ref()) + .and_then(|kind| match kind { + Kind::StringValue(value) => Some(value.as_str()), + // Missing or null/non-string httpAuth means the flow remains public. + _ => None, + }) +} diff --git a/adapter/rest/src/auth/types.rs b/adapter/rest/src/auth/types.rs new file mode 100644 index 0000000..aa4ad47 --- /dev/null +++ b/adapter/rest/src/auth/types.rs @@ -0,0 +1,113 @@ +use hyper::{StatusCode, header::HeaderValue}; + +#[derive(Debug, Clone, Copy, Eq, PartialEq)] +pub(crate) enum AuthenticationType { + BearerJwt, + BearerStatic, + Basic, +} + +#[derive(Debug, Clone, Copy, Eq, PartialEq)] +pub enum AuthenticationError { + MissingAuthorization(AuthenticationType), + InvalidAuthorizationFor(AuthenticationType), + InvalidAuthorization, +} + +impl AuthenticationError { + pub(super) fn missing_for(auth_type: AuthenticationType) -> Self { + Self::MissingAuthorization(auth_type) + } + + pub(super) fn invalid_for(auth_type: AuthenticationType) -> Self { + Self::InvalidAuthorizationFor(auth_type) + } + + pub fn status_code(self) -> StatusCode { + StatusCode::UNAUTHORIZED + } + + pub fn message(self) -> &'static str { + match self { + Self::MissingAuthorization(_) => "Missing authorization", + Self::InvalidAuthorizationFor(_) | Self::InvalidAuthorization => { + "Invalid authorization" + } + } + } + + pub fn challenge(self) -> HeaderValue { + match self.auth_type() { + Some(AuthenticationType::BearerJwt | AuthenticationType::BearerStatic) => { + HeaderValue::from_static("Bearer") + } + Some(AuthenticationType::Basic) => HeaderValue::from_static("Basic"), + None => HeaderValue::from_static("Bearer"), + } + } + + fn auth_type(self) -> Option { + match self { + Self::MissingAuthorization(auth_type) | Self::InvalidAuthorizationFor(auth_type) => { + Some(auth_type) + } + Self::InvalidAuthorization => None, + } + } +} + +impl AuthenticationType { + pub(super) fn parse(value: &str) -> Option { + let normalized = normalize_auth_type(value); + + match normalized.as_str() { + "bearerjwt" | "jwt" => Some(Self::BearerJwt), + "bearerstatic" | "bearer" | "staticbearer" => Some(Self::BearerStatic), + "basicaccessauth" | "basic" | "basicauth" => Some(Self::Basic), + _ => None, + } + } +} + +pub(super) fn is_unauthenticated_value(value: &str) -> bool { + matches!( + normalize_auth_type(value).as_str(), + "" | "none" | "noauth" | "unauthenticated" + ) +} + +fn normalize_auth_type(value: &str) -> String { + value + .trim() + .replace(['_', '-', ' '], "") + .to_ascii_lowercase() +} + +#[cfg(test)] +mod tests { + use super::{AuthenticationType, is_unauthenticated_value}; + + #[test] + fn rest_auth_type_values_parse() { + assert_eq!( + AuthenticationType::parse("Bearer JWT"), + Some(AuthenticationType::BearerJwt) + ); + assert_eq!( + AuthenticationType::parse("Bearer static"), + Some(AuthenticationType::BearerStatic) + ); + assert_eq!( + AuthenticationType::parse("Basic"), + Some(AuthenticationType::Basic) + ); + } + + #[test] + fn unauthenticated_values_are_explicit() { + assert!(is_unauthenticated_value("unauthenticated")); + assert!(is_unauthenticated_value("no-auth")); + assert!(is_unauthenticated_value("none")); + assert!(is_unauthenticated_value("")); + } +} diff --git a/adapter/rest/src/main.rs b/adapter/rest/src/main.rs index 9a8f40f..466aca3 100644 --- a/adapter/rest/src/main.rs +++ b/adapter/rest/src/main.rs @@ -21,8 +21,10 @@ use tucana::shared::{ Endpoint, ModuleDefinition, Struct, ValidationFlow, Value, helper::value::ToValue, value::Kind, }; +use crate::auth::{authenticate_header_name, validate_flow_auth}; use crate::response::{error_to_http_response, value_to_http_response}; +mod auth; mod config; mod content_type; mod response; @@ -112,21 +114,6 @@ pub async fn handle_request( } }; - let request_body_value = match content_type::parse_body_from_headers(&headers, &body_bytes) { - Ok(value) => value, - Err(err) => { - log::warn!("Failed to parse request body: {}", err); - let status_code = match err { - content_type::BodyParseError::UnsupportedContentType { .. } => { - StatusCode::UNSUPPORTED_MEDIA_TYPE - } - _ => StatusCode::BAD_REQUEST, - }; - - return Ok(error_to_http_response(status_code, &err.to_string())); - } - }; - // slug matching let Some(slug) = route::extract_slug_from_path(&path) else { return Ok(error_to_http_response( @@ -143,6 +130,30 @@ pub async fn handle_request( let resp = match store.get_possible_flow_match(pattern, route).await { FlowIdentifyResult::Single(flow) => { + if let Err(err) = validate_flow_auth(&flow, &headers) { + let mut response = error_to_http_response(err.status_code(), err.message()); + response + .headers_mut() + .insert(authenticate_header_name(), err.challenge()); + return Ok(response); + } + + let request_body_value = + match content_type::parse_body_from_headers(&headers, &body_bytes) { + Ok(value) => value, + Err(err) => { + log::warn!("Failed to parse request body: {}", err); + let status_code = match err { + content_type::BodyParseError::UnsupportedContentType { .. } => { + StatusCode::UNSUPPORTED_MEDIA_TYPE + } + _ => StatusCode::BAD_REQUEST, + }; + + return Ok(error_to_http_response(status_code, &err.to_string())); + } + }; + let mut header_fields = std::collections::HashMap::new(); let mut fields = std::collections::HashMap::new(); From f1f3271ae72d8d343d9aa929ca80378c3add0618 Mon Sep 17 00:00:00 2001 From: Raphael Date: Sun, 21 Jun 2026 22:21:40 +0200 Subject: [PATCH 3/3] ref: major refactoring --- Cargo.lock | 3 + adapter/rest/Cargo.toml | 2 + adapter/rest/src/main.rs | 141 +-------- adapter/rest/src/request/input.rs | 167 +++++++++++ adapter/rest/src/request/mod.rs | 96 ++++++ .../rest/src/{response.rs => response/mod.rs} | 31 +- adapter/rest/src/route.rs | 274 +++++++++++++++++- 7 files changed, 570 insertions(+), 144 deletions(-) create mode 100644 adapter/rest/src/request/input.rs create mode 100644 adapter/rest/src/request/mod.rs rename adapter/rest/src/{response.rs => response/mod.rs} (80%) diff --git a/Cargo.lock b/Cargo.lock index 2a651cd..00d4ae3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1536,12 +1536,15 @@ dependencies = [ "base", "base64", "code0-flow", + "form_urlencoded", "http-body-util", "hyper", "hyper-util", "log", + "percent-encoding", "prost", "regex", + "ring", "serde_json", "tokio", "tonic", diff --git a/adapter/rest/Cargo.toml b/adapter/rest/Cargo.toml index dfe21f8..48aa466 100644 --- a/adapter/rest/Cargo.toml +++ b/adapter/rest/Cargo.toml @@ -15,6 +15,8 @@ base = { workspace = true } anyhow = { workspace = true } base64 = "0.22.1" ring = "0.17.14" +form_urlencoded = "1.2.1" +percent-encoding = "2.3.1" hyper-util = "0.1.19" hyper = "1.8.1" http-body-util = "0.1.3" diff --git a/adapter/rest/src/main.rs b/adapter/rest/src/main.rs index 466aca3..59459a3 100644 --- a/adapter/rest/src/main.rs +++ b/adapter/rest/src/main.rs @@ -1,32 +1,20 @@ use base::{ runner::{ServerContext, ServerRunner}, - store::{FlowExecutionResult, FlowIdentifyResult}, traits::Server as ServerTrait, }; use code0_flow::flow_service::ModuleDefinitionAppendix; -use http_body_util::{BodyExt, Full}; use hyper::server::conn::http1; -use hyper::{Request, Response}; -use hyper::{ - StatusCode, - body::{Bytes, Incoming}, -}; use hyper_util::rt::TokioIo; -use std::convert::Infallible; use std::net::SocketAddr; use std::sync::Arc; use tokio::net::TcpListener; use tonic::async_trait; -use tucana::shared::{ - Endpoint, ModuleDefinition, Struct, ValidationFlow, Value, helper::value::ToValue, value::Kind, -}; - -use crate::auth::{authenticate_header_name, validate_flow_auth}; -use crate::response::{error_to_http_response, value_to_http_response}; +use tucana::shared::{Endpoint, ModuleDefinition}; mod auth; mod config; mod content_type; +mod request; mod response; mod route; @@ -69,129 +57,6 @@ struct HttpServer { addr: Option, } -async fn execute_flow_to_hyper_response( - flow: ValidationFlow, - body: Value, - store: Arc, -) -> Response> { - match store.execute_flow_with_emitter(flow, Some(body)).await { - FlowExecutionResult::Ongoing(result) => { - log::debug!("Received first ongoing response from emitter"); - value_to_http_response(result) - } - FlowExecutionResult::Failed => { - log::error!("Flow execution failed event received from emitter"); - error_to_http_response(StatusCode::INTERNAL_SERVER_ERROR, "Internal server error") - } - FlowExecutionResult::FinishedWithoutOngoing => Response::builder() - .status(StatusCode::NO_CONTENT) - .body(Full::new(Bytes::new())) - .unwrap(), - FlowExecutionResult::TransportError => { - log::error!("Flow execution transport error"); - error_to_http_response(StatusCode::INTERNAL_SERVER_ERROR, "Internal server error") - } - } -} - -pub async fn handle_request( - req: Request, - store: Arc, -) -> Result>, Infallible> { - let method = req.method().clone(); - let path = req.uri().path().to_string(); - let headers = req.headers().clone(); - - // Read full body - let body_bytes = match BodyExt::collect(req.into_body()).await { - Ok(collected) => collected.to_bytes().to_vec(), - Err(err) => { - log::error!("Failed to read request body: {}", err); - return Ok(error_to_http_response( - StatusCode::BAD_REQUEST, - "Failed to read request body", - )); - } - }; - - // slug matching - let Some(slug) = route::extract_slug_from_path(&path) else { - return Ok(error_to_http_response( - StatusCode::BAD_REQUEST, - "Missing slug in path", - )); - }; - - let pattern = format!("REST.{}.*", slug); - let route = route::RequestRoute { - url: path.clone(), - method, - }; - - let resp = match store.get_possible_flow_match(pattern, route).await { - FlowIdentifyResult::Single(flow) => { - if let Err(err) = validate_flow_auth(&flow, &headers) { - let mut response = error_to_http_response(err.status_code(), err.message()); - response - .headers_mut() - .insert(authenticate_header_name(), err.challenge()); - return Ok(response); - } - - let request_body_value = - match content_type::parse_body_from_headers(&headers, &body_bytes) { - Ok(value) => value, - Err(err) => { - log::warn!("Failed to parse request body: {}", err); - let status_code = match err { - content_type::BodyParseError::UnsupportedContentType { .. } => { - StatusCode::UNSUPPORTED_MEDIA_TYPE - } - _ => StatusCode::BAD_REQUEST, - }; - - return Ok(error_to_http_response(status_code, &err.to_string())); - } - }; - - let mut header_fields = std::collections::HashMap::new(); - let mut fields = std::collections::HashMap::new(); - - for (name, value) in headers.iter() { - let key = name.as_str().to_owned(); - let value_str = value - .to_str() - .map(str::to_owned) - .unwrap_or_else(|_| String::from_utf8_lossy(value.as_bytes()).into_owned()); - - header_fields.insert(key, value_str.to_value()); - } - - if let Some(v) = request_body_value { - fields.insert(String::from("payload"), v); - }; - - fields.insert( - String::from("headers"), - Value { - kind: Some(Kind::StructValue(Struct { - fields: header_fields, - })), - }, - ); - - let input = Value { - kind: Some(Kind::StructValue(Struct { fields })), - }; - - execute_flow_to_hyper_response(flow, input, store).await - } - _ => error_to_http_response(StatusCode::NOT_FOUND, "No flow found for path"), - }; - - Ok(resp) -} - #[async_trait] impl ServerTrait for HttpServer { async fn init(&mut self, ctx: &ServerContext) -> anyhow::Result<()> { @@ -247,7 +112,7 @@ impl ServerTrait for HttpServer { tokio::spawn(async move { let svc = hyper::service::service_fn(move |req| { let store = Arc::clone(&store); - async move { handle_request(req, store).await } + async move { request::handle(req, store).await } }); let conn = http1::Builder::new().serve_connection(io, svc); diff --git a/adapter/rest/src/request/input.rs b/adapter/rest/src/request/input.rs new file mode 100644 index 0000000..98fe71b --- /dev/null +++ b/adapter/rest/src/request/input.rs @@ -0,0 +1,167 @@ +use hyper::{HeaderMap, header::HeaderValue}; +use std::collections::HashMap; +use tucana::shared::{Struct, ValidationFlow, Value, helper::value::ToValue, value::Kind}; + +use crate::route; + +pub(super) fn build_flow_input( + flow: &ValidationFlow, + path: &str, + query: Option<&str>, + headers: &HeaderMap, + payload: Option, +) -> Value { + let mut fields = HashMap::new(); + + if let Some(payload) = payload { + fields.insert(String::from("payload"), payload); + } + + fields.insert( + String::from("headers"), + string_map_to_value(header_map(headers)), + ); + fields.insert( + String::from("query_params"), + string_map_to_value(query_params(query)), + ); + fields.insert( + String::from("path_params"), + string_map_to_value(route::extract_path_params(flow, path)), + ); + + Value { + kind: Some(Kind::StructValue(Struct { fields })), + } +} + +fn header_map(headers: &HeaderMap) -> HashMap { + headers + .iter() + .map(|(name, value)| { + let value = value + .to_str() + .map(str::to_owned) + .unwrap_or_else(|_| String::from_utf8_lossy(value.as_bytes()).into_owned()); + + (name.as_str().to_owned(), value) + }) + .collect() +} + +fn query_params(query: Option<&str>) -> HashMap { + let Some(query) = query else { + return HashMap::new(); + }; + + // Repeated query keys currently use last-write-wins because Taurus receives + // a simple object here, not a multi-map. + form_urlencoded::parse(query.as_bytes()) + .map(|(key, value)| (key.into_owned(), value.into_owned())) + .collect() +} + +fn string_map_to_value(map: HashMap) -> Value { + Value { + kind: Some(Kind::StructValue(Struct { + fields: map + .into_iter() + .map(|(key, value)| (key, value.to_value())) + .collect(), + })), + } +} + +#[cfg(test)] +mod tests { + use super::{build_flow_input, query_params, string_map_to_value}; + use hyper::HeaderMap; + use tucana::shared::{FlowSetting, Struct, ValidationFlow, Value, value::Kind}; + + #[test] + fn query_params_are_percent_decoded() { + let params = query_params(Some("search=hello+world&tag=a%2Fb&empty=")); + + assert_eq!( + params.get("search").map(String::as_str), + Some("hello world") + ); + assert_eq!(params.get("tag").map(String::as_str), Some("a/b")); + assert_eq!(params.get("empty").map(String::as_str), Some("")); + } + + #[test] + fn string_map_is_converted_to_struct_value() { + let value = + string_map_to_value([("id".to_string(), "42".to_string())].into_iter().collect()); + + let Value { + kind: Some(Kind::StructValue(Struct { fields })), + } = value + else { + panic!("expected struct value"); + }; + + assert_eq!( + fields.get("id").and_then(|value| value.kind.as_ref()), + Some(&Kind::StringValue("42".to_string())) + ); + } + + #[test] + fn flow_input_contains_query_and_path_params() { + let flow = ValidationFlow { + flow_id: 1, + project_slug: "project".to_string(), + settings: vec![FlowSetting { + database_id: None, + flow_setting_id: "httpURL".to_string(), + value: Some(Value { + kind: Some(Kind::StringValue("/users/:user_id".to_string())), + }), + cast: None, + }], + ..ValidationFlow::default() + }; + + let input = build_flow_input( + &flow, + "/project/users/42", + Some("search=hello+world"), + &HeaderMap::new(), + None, + ); + + assert_eq!( + nested_string_field(&input, "query_params", "search"), + Some("hello world") + ); + assert_eq!( + nested_string_field(&input, "path_params", "user_id"), + Some("42") + ); + } + + fn nested_string_field<'a>(value: &'a Value, field: &str, nested: &str) -> Option<&'a str> { + let Some(Kind::StructValue(Struct { fields })) = value.kind.as_ref() else { + return None; + }; + let Some(Value { + kind: + Some(Kind::StructValue(Struct { + fields: nested_fields, + })), + }) = fields.get(field) + else { + return None; + }; + let Some(Value { + kind: Some(Kind::StringValue(value)), + }) = nested_fields.get(nested) + else { + return None; + }; + + Some(value.as_str()) + } +} diff --git a/adapter/rest/src/request/mod.rs b/adapter/rest/src/request/mod.rs new file mode 100644 index 0000000..12c529f --- /dev/null +++ b/adapter/rest/src/request/mod.rs @@ -0,0 +1,96 @@ +mod input; + +use base::store::FlowIdentifyResult; +use http_body_util::{BodyExt, Full}; +use hyper::{ + Request, Response, StatusCode, + body::{Bytes, Incoming}, +}; +use std::convert::Infallible; +use std::sync::Arc; + +use crate::auth::{authenticate_header_name, validate_flow_auth}; +use crate::content_type; +use crate::response::{error_to_http_response, flow_execution_to_http_response}; +use crate::route::{self, RequestRoute}; + +pub async fn handle( + req: Request, + store: Arc, +) -> Result>, Infallible> { + let method = req.method().clone(); + let path = req.uri().path().to_string(); + let query = req.uri().query().map(str::to_owned); + let headers = req.headers().clone(); + + let body_bytes = match BodyExt::collect(req.into_body()).await { + Ok(collected) => collected.to_bytes().to_vec(), + Err(err) => { + log::error!("Failed to read request body: {}", err); + return Ok(error_to_http_response( + StatusCode::BAD_REQUEST, + "Failed to read request body", + )); + } + }; + + let Some(slug) = route::extract_slug_from_path(&path) else { + return Ok(error_to_http_response( + StatusCode::BAD_REQUEST, + "Missing slug in path", + )); + }; + + let pattern = format!("REST.{}.*", slug); + let route = RequestRoute { + url: path.clone(), + method, + }; + + let response = match store.get_possible_flow_match(pattern, route).await { + FlowIdentifyResult::Single(flow) => { + if let Err(err) = validate_flow_auth(&flow, &headers) { + let mut response = error_to_http_response(err.status_code(), err.message()); + response + .headers_mut() + .insert(authenticate_header_name(), err.challenge()); + return Ok(response); + } + + let request_body_value = match parse_request_body(&headers, &body_bytes) { + Ok(value) => value, + Err(response) => return Ok(response), + }; + + let input = input::build_flow_input( + &flow, + &path, + query.as_deref(), + &headers, + request_body_value, + ); + + flow_execution_to_http_response(flow, input, store).await + } + _ => error_to_http_response(StatusCode::NOT_FOUND, "No flow found for path"), + }; + + Ok(response) +} + +fn parse_request_body( + headers: &hyper::HeaderMap, + body_bytes: &[u8], +) -> Result, Response>> { + content_type::parse_body_from_headers(headers, body_bytes).map_err(|err| { + log::warn!("Failed to parse request body: {}", err); + let status_code = match err { + content_type::BodyParseError::UnsupportedContentType { .. } => { + StatusCode::UNSUPPORTED_MEDIA_TYPE + } + _ => StatusCode::BAD_REQUEST, + }; + + error_to_http_response(status_code, &err.to_string()) + }) +} diff --git a/adapter/rest/src/response.rs b/adapter/rest/src/response/mod.rs similarity index 80% rename from adapter/rest/src/response.rs rename to adapter/rest/src/response/mod.rs index d0230a2..85a37d2 100644 --- a/adapter/rest/src/response.rs +++ b/adapter/rest/src/response/mod.rs @@ -5,12 +5,39 @@ use hyper::{ header::{HeaderName, HeaderValue}, }; use std::collections::HashMap; +use std::sync::Arc; use tucana::shared::{ - Struct, Value, + Struct, ValidationFlow, Value, value::Kind::{self, StructValue}, }; use crate::content_type; +use base::store::FlowExecutionResult; + +pub async fn flow_execution_to_http_response( + flow: ValidationFlow, + input: Value, + store: Arc, +) -> Response> { + match store.execute_flow_with_emitter(flow, Some(input)).await { + FlowExecutionResult::Ongoing(result) => { + log::debug!("Received first ongoing response from emitter"); + value_to_http_response(result) + } + FlowExecutionResult::Failed => { + log::error!("Flow execution failed event received from emitter"); + error_to_http_response(StatusCode::INTERNAL_SERVER_ERROR, "Internal server error") + } + FlowExecutionResult::FinishedWithoutOngoing => Response::builder() + .status(StatusCode::NO_CONTENT) + .body(Full::new(Bytes::new())) + .unwrap(), + FlowExecutionResult::TransportError => { + log::error!("Flow execution transport error"); + error_to_http_response(StatusCode::INTERNAL_SERVER_ERROR, "Internal server error") + } + } +} pub fn error_to_http_response(status: StatusCode, msg: &str) -> Response> { let body = format!(r#"{{"error": "{}"}}"#, msg); @@ -51,7 +78,6 @@ pub fn value_to_http_response(value: Value) -> Response> { ); }; - // headers struct let Value { kind: Some(Kind::StructValue(Struct { fields: header_fields, @@ -82,7 +108,6 @@ pub fn value_to_http_response(value: Value) -> Response> { http_headers.insert("content-type".to_string(), "application/json".to_string()); } - // status_code number let Some(Kind::NumberValue(code)) = status_code_val.kind else { return error_to_http_response( StatusCode::INTERNAL_SERVER_ERROR, diff --git a/adapter/rest/src/route.rs b/adapter/rest/src/route.rs index c255e49..bb5a8ec 100644 --- a/adapter/rest/src/route.rs +++ b/adapter/rest/src/route.rs @@ -1,4 +1,5 @@ use base::traits::IdentifiableFlow; +use std::collections::HashMap; use tucana::shared::{ValidationFlow, value::Kind}; pub struct RequestRoute { @@ -52,7 +53,7 @@ impl IdentifiableFlow for RequestRoute { return false; }; - let route_pattern = format!("/{}{}", flow.project_slug, flow_http_url); + let route_pattern = flow_route_pattern(flow, flow_http_url); log::debug!( "route identify route check: flow_id={} httpURL={:?} resolved_pattern={:?} request_path={:?}", flow.flow_id, @@ -76,8 +77,30 @@ pub fn extract_slug_from_path(path: &str) -> Option<&str> { trimmed.split('/').next().filter(|s| !s.is_empty()) } +pub fn extract_path_params(flow: &ValidationFlow, path: &str) -> HashMap { + let Some(flow_http_url) = extract_flow_setting_as_string(flow, "httpURL") else { + return HashMap::new(); + }; + + extract_named_route_captures(&flow_route_pattern(flow, flow_http_url), path) +} + +fn flow_route_pattern(flow: &ValidationFlow, flow_http_url: &str) -> String { + format!("/{}{}", flow.project_slug, flow_http_url) +} + fn matches_route_pattern(pattern: &str, route: &str) -> bool { - let anchored_pattern = format!("^{}$", pattern); + let anchored_pattern = match compile_route_pattern(pattern) { + Ok(pattern) => pattern, + Err(err) => { + log::error!( + "route pattern invalid: raw_pattern={:?} error={}", + pattern, + err + ); + return false; + } + }; log::debug!( "route pattern eval: raw_pattern={:?} anchored_pattern={:?} route={:?}", pattern, @@ -107,6 +130,153 @@ fn matches_route_pattern(pattern: &str, route: &str) -> bool { is_match } +fn extract_named_route_captures(pattern: &str, route: &str) -> HashMap { + let anchored_pattern = match compile_route_pattern(pattern) { + Ok(pattern) => pattern, + Err(err) => { + log::error!( + "route path params invalid pattern: raw_pattern={:?} error={}", + pattern, + err + ); + return HashMap::new(); + } + }; + let regex = match regex::Regex::new(&anchored_pattern) { + Ok(regex) => regex, + Err(err) => { + log::error!( + "route path params invalid regex: anchored_pattern={:?} error={}", + anchored_pattern, + err + ); + return HashMap::new(); + } + }; + + let Some(captures) = regex.captures(route) else { + return HashMap::new(); + }; + + regex + .capture_names() + .flatten() + .filter_map(|name| { + captures.name(name).map(|value| { + ( + name.to_string(), + percent_encoding::percent_decode_str(value.as_str()) + .decode_utf8_lossy() + .into_owned(), + ) + }) + }) + .collect() +} + +fn compile_route_pattern(pattern: &str) -> Result { + if is_url_pattern_style(pattern) { + return compile_url_pattern_path(pattern).map(|pattern| format!("^{}$", pattern)); + } + + Ok(format!("^{}$", pattern)) +} + +fn is_url_pattern_style(pattern: &str) -> bool { + let bytes = pattern.as_bytes(); + + bytes.iter().enumerate().any(|(index, byte)| { + (*byte == b':' + && bytes + .get(index + 1) + .is_some_and(|next| next.is_ascii_alphabetic() || *next == b'_')) + || (*byte == b'*' && bytes.get(index.wrapping_sub(1)) != Some(&b'.')) + }) +} + +fn compile_url_pattern_path(pattern: &str) -> Result { + let mut compiled = String::new(); + let chars: Vec = pattern.chars().collect(); + let mut index = 0; + + while index < chars.len() { + match chars[index] { + ':' if is_param_start(chars.get(index + 1).copied()) => { + let (name, next_index) = read_param_name(&chars, index + 1); + index = next_index; + + let (capture_pattern, next_index) = if chars.get(index) == Some(&'(') { + read_balanced_group(&chars, index)? + } else { + (String::from("[^/]+"), index) + }; + + compiled.push_str(&format!("(?P<{name}>{capture_pattern})")); + index = next_index; + } + '*' => { + compiled.push_str(".*"); + index += 1; + } + value => { + compiled.push_str(®ex::escape(&value.to_string())); + index += 1; + } + } + } + + Ok(compiled) +} + +fn is_param_start(value: Option) -> bool { + value.is_some_and(|value| value.is_ascii_alphabetic() || value == '_') +} + +fn read_param_name(chars: &[char], start: usize) -> (String, usize) { + let mut index = start; + let mut name = String::new(); + + while let Some(value) = chars.get(index) { + if value.is_ascii_alphanumeric() || *value == '_' { + name.push(*value); + index += 1; + } else { + break; + } + } + + (name, index) +} + +fn read_balanced_group(chars: &[char], start: usize) -> Result<(String, usize), String> { + let mut depth = 0; + let mut index = start; + let mut pattern = String::new(); + + while let Some(value) = chars.get(index) { + match value { + '(' => { + depth += 1; + if depth > 1 { + pattern.push(*value); + } + } + ')' => { + depth -= 1; + if depth == 0 { + return Ok((pattern, index + 1)); + } + pattern.push(*value); + } + _ => pattern.push(*value), + } + + index += 1; + } + + Err(String::from("unclosed parameter regex group")) +} + fn extract_flow_setting_as_string<'a>( flow: &'a ValidationFlow, flow_setting_id: &str, @@ -167,7 +337,7 @@ fn extract_flow_setting_as_string<'a>( #[cfg(test)] mod tests { - use super::matches_route_pattern; + use super::{compile_route_pattern, extract_named_route_captures, matches_route_pattern}; #[test] fn exact_literal_match_works() { @@ -188,8 +358,106 @@ mod tests { )); } + #[test] + fn dynamic_route_params_match_path_segments() { + assert!(matches_route_pattern( + "/project/users/:id", + "/project/users/42" + )); + assert!(!matches_route_pattern( + "/project/users/:id", + "/project/users/42/orders" + )); + } + + #[test] + fn dynamic_route_params_are_returned_as_path_params() { + let params = extract_named_route_captures( + "/project/books/:category/:id", + "/project/books/classics/12345", + ); + + assert_eq!(params.get("category").map(String::as_str), Some("classics")); + assert_eq!(params.get("id").map(String::as_str), Some("12345")); + } + + #[test] + fn dynamic_route_params_support_regex_constraints() { + assert!(matches_route_pattern( + "/project/users/:user_id(\\d+)", + "/project/users/123" + )); + assert!(!matches_route_pattern( + "/project/users/:user_id(\\d+)", + "/project/users/abc" + )); + + let params = + extract_named_route_captures("/project/users/:user_id(\\d+)", "/project/users/123"); + assert_eq!(params.get("user_id").map(String::as_str), Some("123")); + } + + #[test] + fn dynamic_route_wildcards_match_remaining_path() { + assert!(matches_route_pattern( + "/project/assets/*", + "/project/assets/images/profile.jpg" + )); + } + + #[test] + fn legacy_regex_patterns_still_work() { + assert_eq!( + compile_route_pattern("/project/users/(?P[^/]+)").unwrap(), + "^/project/users/(?P[^/]+)$" + ); + assert!(matches_route_pattern( + "/project/users/(?P[^/]+)", + "/project/users/42" + )); + assert!(matches_route_pattern( + "/project/assets/.*", + "/project/assets/images/profile.jpg" + )); + } + #[test] fn invalid_regex_returns_false() { assert!(!matches_route_pattern("(", "/test")); } + + #[test] + fn named_route_captures_are_returned_as_path_params() { + let params = extract_named_route_captures( + "/project/users/(?P[^/]+)/orders/(?P[^/]+)", + "/project/users/42/orders/abc", + ); + + assert_eq!(params.get("user_id").map(String::as_str), Some("42")); + assert_eq!(params.get("order_id").map(String::as_str), Some("abc")); + } + + #[test] + fn named_route_captures_are_percent_decoded() { + let params = extract_named_route_captures( + "/project/files/(?P[^/]+)", + "/project/files/report%202026.txt", + ); + + assert_eq!( + params.get("file_name").map(String::as_str), + Some("report 2026.txt") + ); + } + + #[test] + fn unnamed_route_captures_are_ignored() { + let params = extract_named_route_captures( + "/project/users/([^/]+)/orders/(?P[^/]+)", + "/project/users/42/orders/abc", + ); + + assert!(!params.contains_key("1")); + assert_eq!(params.get("order_id").map(String::as_str), Some("abc")); + } }