Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 39 additions & 1 deletion docs/configuration/storage-config.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ Quickwit currently supports four types of storage providers:
## Storage URIs

Storage URIs refer to different storage providers identified by a URI "protocol" or "scheme". Quickwit supports the following storage URI protocols:
- `s3://` for Amazon S3 and S3-compatible
- `s3://` for Amazon S3 and S3-compatible (per-bucket overrides via `storage.s3.profiles`, see [Per-bucket S3 profiles](#per-bucket-s3-profiles))
- `azure://` for Azure Blob Storage
- `file://` for local file systems
- `gs://` for Google Cloud Storage
Expand Down Expand Up @@ -104,6 +104,44 @@ storage:
endpoint: https://storage.googleapis.com
```

#### Per-bucket S3 profiles

In addition to the primary `s3:` block, you can declare per-bucket overrides under `storage.s3.profiles`. The map key is the bucket name; when an `s3://<bucket>/...` URI is resolved, an exact match supplies that bucket's own endpoint, credentials, region, and flags. Any bucket not listed falls back to the fields on the primary `s3:` block. URIs stay canonical `s3://` — routing is by bucket name, so nothing extra is persisted in the index metadata.

Each profile accepts the same fields as the primary `s3:` block, *except* `profiles` itself (no recursion). If `access_key_id` / `secret_access_key` are omitted on a profile, the global AWS SDK credential chain is used (env vars, instance metadata, etc.).

Profiles are self-contained: the process-wide `QW_S3_ENDPOINT` and `QW_S3_FORCE_PATH_STYLE_ACCESS` overrides apply to the primary `s3:` backend only. A profile always uses its own `endpoint` and `force_path_style_access` values.

> Because routing keys on bucket name, a given bucket name maps to exactly one backend. If you need the *same* bucket name on two different endpoints, give the buckets distinct names.

```yaml
storage:
s3:
# Primary backend — used for any bucket not listed under `profiles`.
endpoint: https://s3.us-east-1.amazonaws.com
region: us-east-1
profiles:
# Buckets named `logs-bucket-eu` resolve to this endpoint.
logs-bucket-eu:
endpoint: https://s3.eu-west-3.amazonaws.com
region: eu-west-3
access_key_id: ${SECONDARY_S3_ACCESS_KEY_ID}
secret_access_key: ${SECONDARY_S3_SECRET_ACCESS_KEY}
# Buckets named `seaweed-logs` resolve here. Falls back to the global AWS
# SDK credentials when keys are omitted.
seaweed-logs:
endpoint: http://seaweedfs-s3:8333
region: us-east-1
force_path_style_access: true
```

An index simply points at the bucket by its canonical URI; the matching profile is applied automatically:

```yaml
index_id: logs-eu
index_uri: s3://logs-bucket-eu/logs-eu
```

### Azure storage configuration

| Property | Description | Default value |
Expand Down
284 changes: 272 additions & 12 deletions quickwit/quickwit-config/src/storage_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
// limitations under the License.

use std::ops::Deref;
use std::sync::OnceLock;
use std::{env, fmt};

use anyhow::ensure;
Expand Down Expand Up @@ -362,6 +361,119 @@ pub struct S3StorageConfig {
pub disable_stalled_stream_protection_upload: bool,
#[serde(default)]
pub disable_stalled_stream_protection_download: bool,
/// Per-bucket S3-compatible backend overrides, keyed by bucket name. When an
/// `s3://<bucket>/...` URI is resolved, an exact match here supplies that
/// bucket's own endpoint, credentials, region, and flags; any bucket not
/// listed falls back to the fields on this (primary) backend.
#[serde(default)]
#[serde(skip_serializing_if = "std::collections::BTreeMap::is_empty")]
pub profiles: std::collections::BTreeMap<String, S3ProfileConfig>,
/// Set when this config is the projection of a per-bucket profile. Profiles
/// are self-contained, so the process-wide `QW_S3_ENDPOINT` /
/// `QW_S3_FORCE_PATH_STYLE_ACCESS` overrides apply to the primary backend
/// only. Not serialized; defaults to `false` (the primary backend).
#[serde(skip)]
pub is_profile: bool,
}

/// Per-bucket S3-compatible backend override nested under
/// `storage.s3.profiles.<bucket>`. Mirrors `S3StorageConfig` but cannot itself
/// nest further profiles (no recursion).
#[derive(Clone, Default, Eq, PartialEq, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct S3ProfileConfig {
#[serde(default)]
#[serde(skip_serializing_if = "Option::is_none")]
pub flavor: Option<StorageBackendFlavor>,
#[serde(default)]
pub access_key_id: Option<String>,
#[serde(default)]
pub secret_access_key: Option<String>,
#[serde(default)]
pub region: Option<String>,
#[serde(default)]
pub endpoint: Option<String>,
#[serde(default)]
pub force_path_style_access: bool,
#[serde(alias = "disable_multi_object_delete_requests")]
#[serde(default)]
pub disable_multi_object_delete: bool,
#[serde(default)]
pub disable_multipart_upload: bool,
#[serde(default)]
pub checksum_algorithm: ChecksumAlgorithm,
/// Deprecated: applies into `checksum_algorithm: disabled`.
#[serde(default, skip_serializing)]
pub disable_checksums: bool,
#[serde(default)]
pub disable_stalled_stream_protection_upload: bool,
#[serde(default)]
pub disable_stalled_stream_protection_download: bool,
}

impl S3ProfileConfig {
/// Project this profile back into a full `S3StorageConfig`
/// (with an empty `profiles` map) so it can flow through the existing
/// S3 client construction code unchanged.
pub fn as_s3_config(&self) -> S3StorageConfig {
let mut s3_config = S3StorageConfig {
flavor: self.flavor,
access_key_id: self.access_key_id.clone(),
secret_access_key: self.secret_access_key.clone(),
region: self.region.clone(),
endpoint: self.endpoint.clone(),
force_path_style_access: self.force_path_style_access,
disable_multi_object_delete: self.disable_multi_object_delete,
disable_multipart_upload: self.disable_multipart_upload,
checksum_algorithm: self.checksum_algorithm,
disable_checksums: self.disable_checksums,
disable_stalled_stream_protection_upload: self.disable_stalled_stream_protection_upload,
disable_stalled_stream_protection_download: self
.disable_stalled_stream_protection_download,
profiles: Default::default(),
is_profile: true,
};
// Expand `flavor` shortcuts (region/path-style/checksum defaults) the
// same way the primary backend does at config load time.
s3_config.apply_flavor();
s3_config
}

pub fn redact(&mut self) {
if let Some(secret_access_key) = self.secret_access_key.as_mut() {
*secret_access_key = "***redacted***".to_string();
}
}
}

impl fmt::Debug for S3ProfileConfig {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("S3ProfileConfig")
.field("flavor", &self.flavor)
.field("access_key_id", &self.access_key_id)
.field(
"secret_access_key",
&self.secret_access_key.as_ref().map(|_| "***redacted***"),
)
.field("region", &self.region)
.field("endpoint", &self.endpoint)
.field("force_path_style_access", &self.force_path_style_access)
.field(
"disable_multi_object_delete",
&self.disable_multi_object_delete,
)
.field("disable_multipart_upload", &self.disable_multipart_upload)
.field("checksum_algorithm", &self.checksum_algorithm)
.field(
"disable_stalled_stream_protection_upload",
&self.disable_stalled_stream_protection_upload,
)
.field(
"disable_stalled_stream_protection_download",
&self.disable_stalled_stream_protection_download,
)
.finish()
}
}

impl S3StorageConfig {
Expand Down Expand Up @@ -397,23 +509,32 @@ impl S3StorageConfig {
if let Some(secret_access_key) = self.secret_access_key.as_mut() {
*secret_access_key = "***redacted***".to_string();
}
for profile in self.profiles.values_mut() {
profile.redact();
}
}

pub fn endpoint(&self) -> Option<String> {
env::var("QW_S3_ENDPOINT")
.ok()
.or_else(|| self.endpoint.clone())
// `QW_S3_ENDPOINT` overrides the primary backend only; per-bucket
// profiles are self-contained and use their own configured endpoint.
if !self.is_profile
&& let Ok(endpoint) = env::var("QW_S3_ENDPOINT")
{
return Some(endpoint);
}
self.endpoint.clone()
}

pub fn force_path_style_access(&self) -> Option<bool> {
static FORCE_PATH_STYLE: OnceLock<Option<bool>> = OnceLock::new();
*FORCE_PATH_STYLE.get_or_init(|| {
let force_path_style_access = get_bool_from_env(
"QW_S3_FORCE_PATH_STYLE_ACCESS",
self.force_path_style_access,
);
Some(force_path_style_access)
})
// `QW_S3_FORCE_PATH_STYLE_ACCESS` overrides the primary backend only.
// No process-wide cache: each backend must honor its own setting.
if self.is_profile {
return Some(self.force_path_style_access);
}
Some(get_bool_from_env(
"QW_S3_FORCE_PATH_STYLE_ACCESS",
self.force_path_style_access,
))
}
}

Expand Down Expand Up @@ -442,6 +563,7 @@ impl fmt::Debug for S3StorageConfig {
"disable_stalled_stream_protection_download",
&self.disable_stalled_stream_protection_download,
)
.field("profiles", &self.profiles)
.finish()
}
}
Expand Down Expand Up @@ -740,4 +862,142 @@ mod tests {
assert_eq!(s3_storage_config.flavor, Some(StorageBackendFlavor::MinIO));
}
}

#[test]
fn test_storage_s3_profiles_serde() {
let s3_storage_config_yaml = r#"
endpoint: https://primary.example.com
region: us-east-1
profiles:
logs-bucket-eu:
endpoint: https://alt.example.com
region: eu-west-3
force_path_style_access: true
access_key_id: alt-key
secret_access_key: alt-secret
seaweed-logs:
endpoint: http://seaweedfs-s3:8333
region: us-east-1
force_path_style_access: true
"#;
let s3_storage_config: S3StorageConfig =
serde_yaml::from_str(s3_storage_config_yaml).unwrap();
assert_eq!(s3_storage_config.profiles.len(), 2);

let eu = s3_storage_config.profiles.get("logs-bucket-eu").unwrap();
assert_eq!(eu.region.as_deref(), Some("eu-west-3"));
assert_eq!(eu.access_key_id.as_deref(), Some("alt-key"));
assert!(eu.force_path_style_access);

// `as_s3_config` projects a profile back into a full S3StorageConfig
// (with an empty `profiles` map) so it can drive the S3 client builder
// unchanged.
let projected = eu.as_s3_config();
assert_eq!(projected.region.as_deref(), Some("eu-west-3"));
assert_eq!(projected.access_key_id.as_deref(), Some("alt-key"));
assert!(projected.force_path_style_access);
assert!(projected.profiles.is_empty());
}

#[test]
fn test_storage_s3_profiles_bucket_name_keys() {
// The map key is a bucket name, so dotted bucket names are accepted as-is
// — no URI-scheme syntax is imposed on the key.
let s3_storage_config_yaml = r#"
profiles:
my.dotted.bucket:
endpoint: https://logs.example.com
"#;
let s3_storage_config: S3StorageConfig =
serde_yaml::from_str(s3_storage_config_yaml).unwrap();
assert!(s3_storage_config.profiles.contains_key("my.dotted.bucket"));
let storage_configs = StorageConfigs::new(vec![s3_storage_config.into()]);
storage_configs.validate().unwrap();
}

#[test]
fn test_storage_s3_profiles_redact() {
let mut profile = S3ProfileConfig {
access_key_id: Some("public-key".to_string()),
secret_access_key: Some("super-secret".to_string()),
..Default::default()
};
profile.redact();
assert_eq!(profile.access_key_id.as_deref(), Some("public-key"));
assert_eq!(profile.secret_access_key.as_deref(), Some("***redacted***"));
}

#[test]
fn test_storage_s3_profiles_field_parity() {
// Profiles accept the same fields as the primary S3 block, including the
// legacy `disable_multi_object_delete_requests` alias and the
// stalled-stream toggles, and project them through `as_s3_config`.
let s3_storage_config_yaml = r#"
profiles:
logs-bucket:
endpoint: https://alt.example.com
disable_multi_object_delete_requests: true
disable_stalled_stream_protection_upload: true
disable_stalled_stream_protection_download: true
checksum_algorithm: disabled
"#;
let s3_storage_config: S3StorageConfig =
serde_yaml::from_str(s3_storage_config_yaml).unwrap();
let profile = s3_storage_config.profiles.get("logs-bucket").unwrap();
assert!(profile.disable_multi_object_delete);
assert!(profile.disable_stalled_stream_protection_upload);
assert!(profile.disable_stalled_stream_protection_download);

let projected = profile.as_s3_config();
assert!(projected.is_profile);
assert!(projected.disable_multi_object_delete);
assert!(projected.disable_stalled_stream_protection_upload);
assert!(projected.disable_stalled_stream_protection_download);
assert_eq!(projected.checksum_algorithm, ChecksumAlgorithm::Disabled);

// A genuinely unknown field is still rejected.
let invalid_yaml = r#"
profiles:
logs-bucket:
bogus_field: true
"#;
assert!(serde_yaml::from_str::<S3StorageConfig>(invalid_yaml).is_err());
}

#[test]
fn test_storage_s3_profile_applies_flavor() {
// `flavor` shortcuts expand for profiles just like the primary backend.
let s3_storage_config_yaml = r#"
profiles:
minio-bucket:
flavor: minio
endpoint: http://minio.example.com:9000
"#;
let s3_storage_config: S3StorageConfig =
serde_yaml::from_str(s3_storage_config_yaml).unwrap();
let projected = s3_storage_config
.profiles
.get("minio-bucket")
.unwrap()
.as_s3_config();
assert_eq!(projected.region.as_deref(), Some("minio"));
assert!(projected.force_path_style_access);
}

#[test]
fn test_storage_s3_profile_uses_own_endpoint() {
// A profile is self-contained: `endpoint()` returns its configured
// endpoint regardless of the process-wide `QW_S3_ENDPOINT` override,
// which applies to the primary backend only.
let profile = S3ProfileConfig {
endpoint: Some("https://profile.example.com".to_string()),
..Default::default()
};
let projected = profile.as_s3_config();
assert!(projected.is_profile);
assert_eq!(
projected.endpoint(),
Some("https://profile.example.com".to_string())
);
}
}
Loading
Loading