Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 66 additions & 0 deletions crates/paimon/src/spec/core_options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ const CHANGELOG_FILE_PREFIX_OPTION: &str = "changelog-file.prefix";
const CHANGELOG_FILE_FORMAT_OPTION: &str = "changelog-file.format";
const CHANGELOG_FILE_COMPRESSION_OPTION: &str = "changelog-file.compression";
const CHANGELOG_FILE_STATS_MODE_OPTION: &str = "changelog-file.stats-mode";
// Option keys for manifest compaction tuning. They are read by
// `manifest_target_file_size`, `manifest_full_compaction_threshold_size`
// and `manifest_merge_min_count` respectively.
const MANIFEST_TARGET_FILE_SIZE_OPTION: &str = "manifest.target-file-size";
const MANIFEST_FULL_COMPACTION_THRESHOLD_SIZE_OPTION: &str =
"manifest.full-compaction-threshold-size";
const MANIFEST_MERGE_MIN_COUNT_OPTION: &str = "manifest.merge-min-count";
const ROW_TRACKING_ENABLED_OPTION: &str = "row-tracking.enabled";
const WRITE_PARQUET_BUFFER_SIZE_OPTION: &str = "write.parquet-buffer-size";
pub(crate) const SEQUENCE_FIELD_OPTION: &str = "sequence.field";
Expand All @@ -65,6 +69,9 @@ const DEFAULT_SOURCE_SPLIT_OPEN_FILE_COST: i64 = 4 * 1024 * 1024;
const DEFAULT_PARTITION_DEFAULT_NAME: &str = "__DEFAULT_PARTITION__";
const DEFAULT_CHANGELOG_FILE_PREFIX: &str = "changelog-";
const DEFAULT_TARGET_FILE_SIZE: i64 = 256 * 1024 * 1024;
// Fallbacks for the manifest compaction options, used when the option is
// absent or its value cannot be parsed: 8 MiB and 16 MiB (expressed in bytes)
// for the two size options, and 30 for the merge-min-count option.
const DEFAULT_MANIFEST_TARGET_FILE_SIZE: i64 = 8 * 1024 * 1024;
const DEFAULT_MANIFEST_FULL_COMPACTION_THRESHOLD_SIZE: i64 = 16 * 1024 * 1024;
const DEFAULT_MANIFEST_MERGE_MIN_COUNT: usize = 30;
const DEFAULT_WRITE_PARQUET_BUFFER_SIZE: i64 = 256 * 1024 * 1024;
const DYNAMIC_BUCKET_TARGET_ROW_NUM_OPTION: &str = "dynamic-bucket.target-row-num";
const DEFAULT_DYNAMIC_BUCKET_TARGET_ROW_NUM: i64 = 200_000;
Expand Down Expand Up @@ -414,6 +421,37 @@ impl<'a> CoreOptions<'a> {
.unwrap_or_else(|| self.target_file_size())
}

/// Target size for a single manifest file.
///
/// Reads the `manifest.target-file-size` option as a memory size. When the
/// option is not set, or its value cannot be parsed, the built-in default
/// is returned instead.
///
/// Corresponds to Java `CoreOptions.MANIFEST_TARGET_FILE_SIZE`.
pub fn manifest_target_file_size(&self) -> i64 {
    match self.options.get(MANIFEST_TARGET_FILE_SIZE_OPTION) {
        // An unparsable value is treated the same as a missing one.
        Some(raw) => parse_memory_size(raw).unwrap_or(DEFAULT_MANIFEST_TARGET_FILE_SIZE),
        None => DEFAULT_MANIFEST_TARGET_FILE_SIZE,
    }
}

/// Size threshold above which manifests are fully compacted.
///
/// Reads the `manifest.full-compaction-threshold-size` option as a memory
/// size. A missing or unparsable value yields the built-in default.
///
/// Corresponds to Java `CoreOptions.MANIFEST_FULL_COMPACTION_FILE_SIZE`.
pub fn manifest_full_compaction_threshold_size(&self) -> i64 {
    if let Some(raw) = self
        .options
        .get(MANIFEST_FULL_COMPACTION_THRESHOLD_SIZE_OPTION)
    {
        if let Some(threshold) = parse_memory_size(raw) {
            return threshold;
        }
    }
    // Either the option is unset or its value did not parse.
    DEFAULT_MANIFEST_FULL_COMPACTION_THRESHOLD_SIZE
}

/// Minimum number of trailing manifest files required before merging.
///
/// Reads the `manifest.merge-min-count` option as a `usize`. The built-in
/// default is used when the option is missing, is not a valid `usize`, or
/// is zero.
///
/// Corresponds to Java `CoreOptions.MANIFEST_MERGE_MIN_COUNT`.
pub fn manifest_merge_min_count(&self) -> usize {
    let configured = self.options.get(MANIFEST_MERGE_MIN_COUNT_OPTION);
    match configured.map(|raw| raw.parse::<usize>()) {
        // Only strictly positive counts are accepted.
        Some(Ok(count)) if count > 0 => count,
        _ => DEFAULT_MANIFEST_MERGE_MIN_COUNT,
    }
}

/// File format for data files (e.g. "parquet", "orc", "avro", "vortex").
/// Default is "parquet".
pub fn file_format(&self) -> &str {
Expand Down Expand Up @@ -619,6 +657,34 @@ mod tests {
assert_eq!(parse_memory_size("abc"), None);
}

#[test]
fn test_manifest_compaction_options_match_java_defaults_and_overrides() {
    // With nothing configured, every getter reports its built-in default.
    let unset = HashMap::new();
    let defaults = CoreOptions::new(&unset);
    assert_eq!(defaults.manifest_target_file_size(), 8 * 1024 * 1024);
    assert_eq!(
        defaults.manifest_full_compaction_threshold_size(),
        16 * 1024 * 1024
    );
    assert_eq!(defaults.manifest_merge_min_count(), 30);

    // Explicitly configured values take precedence over the defaults.
    let mut configured = HashMap::new();
    configured.insert(
        MANIFEST_TARGET_FILE_SIZE_OPTION.to_owned(),
        String::from("500B"),
    );
    configured.insert(
        MANIFEST_FULL_COMPACTION_THRESHOLD_SIZE_OPTION.to_owned(),
        String::from("200B"),
    );
    configured.insert(MANIFEST_MERGE_MIN_COUNT_OPTION.to_owned(), String::from("3"));

    let overridden = CoreOptions::new(&configured);
    assert_eq!(overridden.manifest_target_file_size(), 500);
    assert_eq!(overridden.manifest_full_compaction_threshold_size(), 200);
    assert_eq!(overridden.manifest_merge_min_count(), 3);
}

#[test]
fn test_partition_options_defaults() {
let options = HashMap::new();
Expand Down
66 changes: 33 additions & 33 deletions crates/paimon/src/spec/manifest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,9 @@ impl Manifest {
}
}

/// Merge ADD/DELETE entries by file identifier, returning only the active ADD set.
/// Mirrors Java [FileEntry.mergeEntries](https://github.com/apache/paimon/blob/release-1.4/paimon-core/src/main/java/org/apache/paimon/manifest/FileEntry.java).
/// Merge ADD/DELETE entries by file identifier for scan-style visibility,
/// returning only the active ADD set. Manifest compaction must preserve
/// unmatched DELETE entries and uses a table-commit-specific merge path.
/// Return order is unspecified.
pub(crate) fn merge_active_entries(entries: Vec<ManifestEntry>) -> Vec<ManifestEntry> {
use std::collections::HashMap;
Expand All @@ -94,9 +95,39 @@ pub(crate) fn merge_active_entries(entries: Vec<ManifestEntry>) -> Vec<ManifestE
mod tests {
use super::*;
use crate::io::FileIO;
use crate::spec::data_file::DataFileMeta;
use crate::spec::manifest_common::FileKind;
use crate::spec::stats::BinaryTableStats;
use crate::spec::ManifestEntry;
use std::env::current_dir;

/// Test helper: builds a `ManifestEntry` of the given `kind` wrapping a
/// `DataFileMeta` with the supplied file name and level. Every other field
/// is filled with a fixed placeholder value.
fn entry(kind: FileKind, file_name: &str, level: i32) -> ManifestEntry {
    let empty_stats = BinaryTableStats::empty();
    let meta = DataFileMeta {
        // Fields that vary per call.
        file_name: file_name.to_owned(),
        level,
        // Fixed placeholder values.
        file_size: 100,
        row_count: 10,
        min_key: Vec::new(),
        max_key: Vec::new(),
        key_stats: empty_stats.clone(),
        value_stats: empty_stats,
        min_sequence_number: 0,
        max_sequence_number: 0,
        schema_id: 0,
        extra_files: Vec::new(),
        // Optional metadata is left unset.
        creation_time: None,
        delete_row_count: None,
        embedded_index: None,
        file_source: None,
        value_stats_cols: None,
        external_path: None,
        first_row_id: None,
        write_cols: None,
    };
    ManifestEntry::new(kind, Vec::new(), 0, 1, meta, 2)
}

#[tokio::test]
async fn test_read_manifest_from_file() {
let workdir = current_dir().unwrap();
Expand All @@ -120,37 +151,6 @@ mod tests {

#[test]
fn test_merge_active_entries_cancels_add_then_delete() {
use crate::spec::data_file::DataFileMeta;
use crate::spec::stats::BinaryTableStats;
use crate::spec::ManifestEntry;

// Local helper: wraps a placeholder `DataFileMeta` (only the file name and
// level vary) in a `ManifestEntry` of the requested kind.
fn entry(kind: FileKind, file_name: &str, level: i32) -> ManifestEntry {
    let empty_stats = BinaryTableStats::empty();
    let meta = DataFileMeta {
        // Fields that vary per call.
        file_name: file_name.to_owned(),
        level,
        // Fixed placeholder values.
        file_size: 100,
        row_count: 10,
        min_key: Vec::new(),
        max_key: Vec::new(),
        key_stats: empty_stats.clone(),
        value_stats: empty_stats,
        min_sequence_number: 0,
        max_sequence_number: 0,
        schema_id: 0,
        extra_files: Vec::new(),
        // Optional metadata is left unset.
        creation_time: None,
        delete_row_count: None,
        embedded_index: None,
        file_source: None,
        value_stats_cols: None,
        external_path: None,
        first_row_id: None,
        write_cols: None,
    };
    ManifestEntry::new(kind, Vec::new(), 0, 1, meta, 2)
}

let cancelled = merge_active_entries(vec![
entry(FileKind::Add, "f.parquet", 0),
entry(FileKind::Delete, "f.parquet", 0),
Expand Down
Loading
Loading