diff --git a/docs/configuration/storage-config.md b/docs/configuration/storage-config.md index 5c5d24af416..4a9c163fd28 100644 --- a/docs/configuration/storage-config.md +++ b/docs/configuration/storage-config.md @@ -14,7 +14,7 @@ Quickwit currently supports four types of storage providers: ## Storage URIs Storage URIs refer to different storage providers identified by a URI "protocol" or "scheme". Quickwit supports the following storage URI protocols: -- `s3://` for Amazon S3 and S3-compatible +- `s3://` for Amazon S3 and S3-compatible (per-bucket overrides via `storage.s3.profiles`, see [Per-bucket S3 profiles](#per-bucket-s3-profiles)) - `azure://` for Azure Blob Storage - `file://` for local file systems - `gs://` for Google Cloud Storage @@ -104,6 +104,44 @@ storage: endpoint: https://storage.googleapis.com ``` +#### Per-bucket S3 profiles + +In addition to the primary `s3:` block, you can declare per-bucket overrides under `storage.s3.profiles`. The map key is the bucket name; when an `s3:///...` URI is resolved, an exact match supplies that bucket's own endpoint, credentials, region, and flags. Any bucket not listed falls back to the fields on the primary `s3:` block. URIs stay canonical `s3://` — routing is by bucket name, so nothing extra is persisted in the index metadata. + +Each profile accepts the same fields as the primary `s3:` block, *except* `profiles` itself (no recursion). If `access_key_id` / `secret_access_key` are omitted on a profile, the global AWS SDK credential chain is used (env vars, instance metadata, etc.). + +Profiles are self-contained: the process-wide `QW_S3_ENDPOINT` and `QW_S3_FORCE_PATH_STYLE_ACCESS` overrides apply to the primary `s3:` backend only. A profile always uses its own `endpoint` and `force_path_style_access` values. + +> Because routing keys on bucket name, a given bucket name maps to exactly one backend. If you need the *same* bucket name on two different endpoints, give the buckets distinct names. + +```yaml +storage: + s3: + # Primary backend — used for any bucket not listed under `profiles`. + endpoint: https://s3.us-east-1.amazonaws.com + region: us-east-1 + profiles: + # Buckets named `logs-bucket-eu` resolve to this endpoint. + logs-bucket-eu: + endpoint: https://s3.eu-west-3.amazonaws.com + region: eu-west-3 + access_key_id: ${SECONDARY_S3_ACCESS_KEY_ID} + secret_access_key: ${SECONDARY_S3_SECRET_ACCESS_KEY} + # Buckets named `seaweed-logs` resolve here. Falls back to the global AWS + # SDK credentials when keys are omitted. + seaweed-logs: + endpoint: http://seaweedfs-s3:8333 + region: us-east-1 + force_path_style_access: true +``` + +An index simply points at the bucket by its canonical URI; the matching profile is applied automatically: + +```yaml +index_id: logs-eu +index_uri: s3://logs-bucket-eu/logs-eu +``` + ### Azure storage configuration | Property | Description | Default value | diff --git a/quickwit/quickwit-config/src/storage_config.rs b/quickwit/quickwit-config/src/storage_config.rs index d04cd93aaa0..b4526b0c0d5 100644 --- a/quickwit/quickwit-config/src/storage_config.rs +++ b/quickwit/quickwit-config/src/storage_config.rs @@ -13,7 +13,6 @@ // limitations under the License. use std::ops::Deref; -use std::sync::OnceLock; use std::{env, fmt}; use anyhow::ensure; @@ -362,6 +361,119 @@ pub struct S3StorageConfig { pub disable_stalled_stream_protection_upload: bool, #[serde(default)] pub disable_stalled_stream_protection_download: bool, + /// Per-bucket S3-compatible backend overrides, keyed by bucket name. When an + /// `s3:///...` URI is resolved, an exact match here supplies that + /// bucket's own endpoint, credentials, region, and flags; any bucket not + /// listed falls back to the fields on this (primary) backend. + #[serde(default)] + #[serde(skip_serializing_if = "std::collections::BTreeMap::is_empty")] + pub profiles: std::collections::BTreeMap, + /// Set when this config is the projection of a per-bucket profile. Profiles + /// are self-contained, so the process-wide `QW_S3_ENDPOINT` / + /// `QW_S3_FORCE_PATH_STYLE_ACCESS` overrides apply to the primary backend + /// only. Not serialized; defaults to `false` (the primary backend). + #[serde(skip)] + pub is_profile: bool, +} + +/// Per-bucket S3-compatible backend override nested under +/// `storage.s3.profiles.`. Mirrors `S3StorageConfig` but cannot itself +/// nest further profiles (no recursion). +#[derive(Clone, Default, Eq, PartialEq, Serialize, Deserialize)] +#[serde(deny_unknown_fields)] +pub struct S3ProfileConfig { + #[serde(default)] + #[serde(skip_serializing_if = "Option::is_none")] + pub flavor: Option, + #[serde(default)] + pub access_key_id: Option, + #[serde(default)] + pub secret_access_key: Option, + #[serde(default)] + pub region: Option, + #[serde(default)] + pub endpoint: Option, + #[serde(default)] + pub force_path_style_access: bool, + #[serde(alias = "disable_multi_object_delete_requests")] + #[serde(default)] + pub disable_multi_object_delete: bool, + #[serde(default)] + pub disable_multipart_upload: bool, + #[serde(default)] + pub checksum_algorithm: ChecksumAlgorithm, + /// Deprecated: applies into `checksum_algorithm: disabled`. + #[serde(default, skip_serializing)] + pub disable_checksums: bool, + #[serde(default)] + pub disable_stalled_stream_protection_upload: bool, + #[serde(default)] + pub disable_stalled_stream_protection_download: bool, +} + +impl S3ProfileConfig { + /// Project this profile back into a full `S3StorageConfig` + /// (with an empty `profiles` map) so it can flow through the existing + /// S3 client construction code unchanged. + pub fn as_s3_config(&self) -> S3StorageConfig { + let mut s3_config = S3StorageConfig { + flavor: self.flavor, + access_key_id: self.access_key_id.clone(), + secret_access_key: self.secret_access_key.clone(), + region: self.region.clone(), + endpoint: self.endpoint.clone(), + force_path_style_access: self.force_path_style_access, + disable_multi_object_delete: self.disable_multi_object_delete, + disable_multipart_upload: self.disable_multipart_upload, + checksum_algorithm: self.checksum_algorithm, + disable_checksums: self.disable_checksums, + disable_stalled_stream_protection_upload: self.disable_stalled_stream_protection_upload, + disable_stalled_stream_protection_download: self + .disable_stalled_stream_protection_download, + profiles: Default::default(), + is_profile: true, + }; + // Expand `flavor` shortcuts (region/path-style/checksum defaults) the + // same way the primary backend does at config load time. + s3_config.apply_flavor(); + s3_config + } + + pub fn redact(&mut self) { + if let Some(secret_access_key) = self.secret_access_key.as_mut() { + *secret_access_key = "***redacted***".to_string(); + } + } +} + +impl fmt::Debug for S3ProfileConfig { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("S3ProfileConfig") + .field("flavor", &self.flavor) + .field("access_key_id", &self.access_key_id) + .field( + "secret_access_key", + &self.secret_access_key.as_ref().map(|_| "***redacted***"), + ) + .field("region", &self.region) + .field("endpoint", &self.endpoint) + .field("force_path_style_access", &self.force_path_style_access) + .field( + "disable_multi_object_delete", + &self.disable_multi_object_delete, + ) + .field("disable_multipart_upload", &self.disable_multipart_upload) + .field("checksum_algorithm", &self.checksum_algorithm) + .field( + "disable_stalled_stream_protection_upload", + &self.disable_stalled_stream_protection_upload, + ) + .field( + "disable_stalled_stream_protection_download", + &self.disable_stalled_stream_protection_download, + ) + .finish() + } } impl S3StorageConfig { @@ -397,23 +509,32 @@ impl S3StorageConfig { if let Some(secret_access_key) = self.secret_access_key.as_mut() { *secret_access_key = "***redacted***".to_string(); } + for profile in self.profiles.values_mut() { + profile.redact(); + } } pub fn endpoint(&self) -> Option { - env::var("QW_S3_ENDPOINT") - .ok() - .or_else(|| self.endpoint.clone()) + // `QW_S3_ENDPOINT` overrides the primary backend only; per-bucket + // profiles are self-contained and use their own configured endpoint. + if !self.is_profile + && let Ok(endpoint) = env::var("QW_S3_ENDPOINT") + { + return Some(endpoint); + } + self.endpoint.clone() } pub fn force_path_style_access(&self) -> Option { - static FORCE_PATH_STYLE: OnceLock> = OnceLock::new(); - *FORCE_PATH_STYLE.get_or_init(|| { - let force_path_style_access = get_bool_from_env( - "QW_S3_FORCE_PATH_STYLE_ACCESS", - self.force_path_style_access, - ); - Some(force_path_style_access) - }) + // `QW_S3_FORCE_PATH_STYLE_ACCESS` overrides the primary backend only. + // No process-wide cache: each backend must honor its own setting. + if self.is_profile { + return Some(self.force_path_style_access); + } + Some(get_bool_from_env( + "QW_S3_FORCE_PATH_STYLE_ACCESS", + self.force_path_style_access, + )) } } @@ -442,6 +563,7 @@ impl fmt::Debug for S3StorageConfig { "disable_stalled_stream_protection_download", &self.disable_stalled_stream_protection_download, ) + .field("profiles", &self.profiles) .finish() } } @@ -740,4 +862,142 @@ mod tests { assert_eq!(s3_storage_config.flavor, Some(StorageBackendFlavor::MinIO)); } } + + #[test] + fn test_storage_s3_profiles_serde() { + let s3_storage_config_yaml = r#" + endpoint: https://primary.example.com + region: us-east-1 + profiles: + logs-bucket-eu: + endpoint: https://alt.example.com + region: eu-west-3 + force_path_style_access: true + access_key_id: alt-key + secret_access_key: alt-secret + seaweed-logs: + endpoint: http://seaweedfs-s3:8333 + region: us-east-1 + force_path_style_access: true + "#; + let s3_storage_config: S3StorageConfig = + serde_yaml::from_str(s3_storage_config_yaml).unwrap(); + assert_eq!(s3_storage_config.profiles.len(), 2); + + let eu = s3_storage_config.profiles.get("logs-bucket-eu").unwrap(); + assert_eq!(eu.region.as_deref(), Some("eu-west-3")); + assert_eq!(eu.access_key_id.as_deref(), Some("alt-key")); + assert!(eu.force_path_style_access); + + // `as_s3_config` projects a profile back into a full S3StorageConfig + // (with an empty `profiles` map) so it can drive the S3 client builder + // unchanged. + let projected = eu.as_s3_config(); + assert_eq!(projected.region.as_deref(), Some("eu-west-3")); + assert_eq!(projected.access_key_id.as_deref(), Some("alt-key")); + assert!(projected.force_path_style_access); + assert!(projected.profiles.is_empty()); + } + + #[test] + fn test_storage_s3_profiles_bucket_name_keys() { + // The map key is a bucket name, so dotted bucket names are accepted as-is + // — no URI-scheme syntax is imposed on the key. + let s3_storage_config_yaml = r#" + profiles: + my.dotted.bucket: + endpoint: https://logs.example.com + "#; + let s3_storage_config: S3StorageConfig = + serde_yaml::from_str(s3_storage_config_yaml).unwrap(); + assert!(s3_storage_config.profiles.contains_key("my.dotted.bucket")); + let storage_configs = StorageConfigs::new(vec![s3_storage_config.into()]); + storage_configs.validate().unwrap(); + } + + #[test] + fn test_storage_s3_profiles_redact() { + let mut profile = S3ProfileConfig { + access_key_id: Some("public-key".to_string()), + secret_access_key: Some("super-secret".to_string()), + ..Default::default() + }; + profile.redact(); + assert_eq!(profile.access_key_id.as_deref(), Some("public-key")); + assert_eq!(profile.secret_access_key.as_deref(), Some("***redacted***")); + } + + #[test] + fn test_storage_s3_profiles_field_parity() { + // Profiles accept the same fields as the primary S3 block, including the + // legacy `disable_multi_object_delete_requests` alias and the + // stalled-stream toggles, and project them through `as_s3_config`. + let s3_storage_config_yaml = r#" + profiles: + logs-bucket: + endpoint: https://alt.example.com + disable_multi_object_delete_requests: true + disable_stalled_stream_protection_upload: true + disable_stalled_stream_protection_download: true + checksum_algorithm: disabled + "#; + let s3_storage_config: S3StorageConfig = + serde_yaml::from_str(s3_storage_config_yaml).unwrap(); + let profile = s3_storage_config.profiles.get("logs-bucket").unwrap(); + assert!(profile.disable_multi_object_delete); + assert!(profile.disable_stalled_stream_protection_upload); + assert!(profile.disable_stalled_stream_protection_download); + + let projected = profile.as_s3_config(); + assert!(projected.is_profile); + assert!(projected.disable_multi_object_delete); + assert!(projected.disable_stalled_stream_protection_upload); + assert!(projected.disable_stalled_stream_protection_download); + assert_eq!(projected.checksum_algorithm, ChecksumAlgorithm::Disabled); + + // A genuinely unknown field is still rejected. + let invalid_yaml = r#" + profiles: + logs-bucket: + bogus_field: true + "#; + assert!(serde_yaml::from_str::(invalid_yaml).is_err()); + } + + #[test] + fn test_storage_s3_profile_applies_flavor() { + // `flavor` shortcuts expand for profiles just like the primary backend. + let s3_storage_config_yaml = r#" + profiles: + minio-bucket: + flavor: minio + endpoint: http://minio.example.com:9000 + "#; + let s3_storage_config: S3StorageConfig = + serde_yaml::from_str(s3_storage_config_yaml).unwrap(); + let projected = s3_storage_config + .profiles + .get("minio-bucket") + .unwrap() + .as_s3_config(); + assert_eq!(projected.region.as_deref(), Some("minio")); + assert!(projected.force_path_style_access); + } + + #[test] + fn test_storage_s3_profile_uses_own_endpoint() { + // A profile is self-contained: `endpoint()` returns its configured + // endpoint regardless of the process-wide `QW_S3_ENDPOINT` override, + // which applies to the primary backend only. + let profile = S3ProfileConfig { + endpoint: Some("https://profile.example.com".to_string()), + ..Default::default() + }; + let projected = profile.as_s3_config(); + assert!(projected.is_profile); + assert_eq!( + projected.endpoint(), + Some("https://profile.example.com".to_string()) + ); + } } diff --git a/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage_resolver.rs b/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage_resolver.rs index 92b4406486b..50b70ac03d4 100644 --- a/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage_resolver.rs +++ b/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage_resolver.rs @@ -12,7 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::sync::Arc; +use std::collections::HashMap; +use std::sync::{Arc, Mutex}; use async_trait::async_trait; use aws_sdk_s3::Client as S3Client; @@ -20,7 +21,7 @@ use quickwit_common::uri::Uri; use quickwit_config::{S3StorageConfig, StorageBackend}; use tokio::sync::OnceCell; -use super::s3_compatible_storage::create_s3_client; +use super::s3_compatible_storage::{create_s3_client, parse_s3_uri}; use crate::{ DebouncedStorage, S3CompatibleObjectStorage, Storage, StorageFactory, StorageResolverError, }; @@ -34,6 +35,9 @@ pub struct S3CompatibleObjectStorageFactory { // end up being used, or if something like azure, gcs, or even local files, will be used // instead. s3_client: OnceCell, + // Per-bucket client cell for buckets matched by a `storage.s3.profiles.` entry. + // The mutex is held only to fetch/insert the cell, never across the client build. + profile_s3_clients: Mutex>>>, } impl S3CompatibleObjectStorageFactory { @@ -42,6 +46,7 @@ impl S3CompatibleObjectStorageFactory { Self { storage_config, s3_client: OnceCell::new(), + profile_s3_clients: Mutex::new(HashMap::new()), } } } @@ -53,6 +58,28 @@ impl StorageFactory for S3CompatibleObjectStorageFactory { } async fn resolve(&self, uri: &Uri) -> Result, StorageResolverError> { + // A `storage.s3.profiles.` entry, if present, supplies that bucket's + // own endpoint/credentials/region; any unlisted bucket uses the primary backend. + if let Some((bucket, _prefix)) = parse_s3_uri(uri) + && let Some(profile) = self.storage_config.profiles.get(&bucket) + { + let profile_config = profile.as_s3_config(); + let client_cell = { + let mut clients = self + .profile_s3_clients + .lock() + .unwrap_or_else(std::sync::PoisonError::into_inner); + Arc::clone(clients.entry(bucket).or_default()) + }; + let client = client_cell + .get_or_init(|| create_s3_client(&profile_config)) + .await + .clone(); + let storage = + S3CompatibleObjectStorage::from_uri_and_client(&profile_config, uri, client) + .await?; + return Ok(Arc::new(DebouncedStorage::new(storage))); + } let s3_client = self .s3_client .get_or_init(|| create_s3_client(&self.storage_config))