From eaf917753da13d2ca6cf0dda027616a6eb4993e1 Mon Sep 17 00:00:00 2001 From: Aitozi Date: Fri, 19 Jun 2026 23:18:58 +0800 Subject: [PATCH 1/5] feat: support manifest compaction in table commits --- crates/paimon/src/spec/core_options.rs | 66 +++ crates/paimon/src/spec/manifest.rs | 5 +- crates/paimon/src/table/table_commit.rs | 582 +++++++++++++++++++++++- 3 files changed, 650 insertions(+), 3 deletions(-) diff --git a/crates/paimon/src/spec/core_options.rs b/crates/paimon/src/spec/core_options.rs index a25fa5ca..e26319cc 100644 --- a/crates/paimon/src/spec/core_options.rs +++ b/crates/paimon/src/spec/core_options.rs @@ -45,6 +45,10 @@ const CHANGELOG_FILE_PREFIX_OPTION: &str = "changelog-file.prefix"; const CHANGELOG_FILE_FORMAT_OPTION: &str = "changelog-file.format"; const CHANGELOG_FILE_COMPRESSION_OPTION: &str = "changelog-file.compression"; const CHANGELOG_FILE_STATS_MODE_OPTION: &str = "changelog-file.stats-mode"; +const MANIFEST_TARGET_FILE_SIZE_OPTION: &str = "manifest.target-file-size"; +const MANIFEST_FULL_COMPACTION_THRESHOLD_SIZE_OPTION: &str = + "manifest.full-compaction-threshold-size"; +const MANIFEST_MERGE_MIN_COUNT_OPTION: &str = "manifest.merge-min-count"; const ROW_TRACKING_ENABLED_OPTION: &str = "row-tracking.enabled"; const WRITE_PARQUET_BUFFER_SIZE_OPTION: &str = "write.parquet-buffer-size"; pub(crate) const SEQUENCE_FIELD_OPTION: &str = "sequence.field"; @@ -65,6 +69,9 @@ const DEFAULT_SOURCE_SPLIT_OPEN_FILE_COST: i64 = 4 * 1024 * 1024; const DEFAULT_PARTITION_DEFAULT_NAME: &str = "__DEFAULT_PARTITION__"; const DEFAULT_CHANGELOG_FILE_PREFIX: &str = "changelog-"; const DEFAULT_TARGET_FILE_SIZE: i64 = 256 * 1024 * 1024; +const DEFAULT_MANIFEST_TARGET_FILE_SIZE: i64 = 8 * 1024 * 1024; +const DEFAULT_MANIFEST_FULL_COMPACTION_THRESHOLD_SIZE: i64 = 16 * 1024 * 1024; +const DEFAULT_MANIFEST_MERGE_MIN_COUNT: usize = 30; const DEFAULT_WRITE_PARQUET_BUFFER_SIZE: i64 = 256 * 1024 * 1024; const DYNAMIC_BUCKET_TARGET_ROW_NUM_OPTION: &str = "dynamic-bucket.target-row-num"; const DEFAULT_DYNAMIC_BUCKET_TARGET_ROW_NUM: i64 = 200_000; @@ -414,6 +421,37 @@ impl<'a> CoreOptions<'a> { .unwrap_or_else(|| self.target_file_size()) } + /// Suggested file size of a manifest file. + /// + /// Corresponds to Java `CoreOptions.MANIFEST_TARGET_FILE_SIZE`. + pub fn manifest_target_file_size(&self) -> i64 { + self.options + .get(MANIFEST_TARGET_FILE_SIZE_OPTION) + .and_then(|v| parse_memory_size(v)) + .unwrap_or(DEFAULT_MANIFEST_TARGET_FILE_SIZE) + } + + /// Size threshold for triggering full compaction of manifests. + /// + /// Corresponds to Java `CoreOptions.MANIFEST_FULL_COMPACTION_FILE_SIZE`. + pub fn manifest_full_compaction_threshold_size(&self) -> i64 { + self.options + .get(MANIFEST_FULL_COMPACTION_THRESHOLD_SIZE_OPTION) + .and_then(|v| parse_memory_size(v)) + .unwrap_or(DEFAULT_MANIFEST_FULL_COMPACTION_THRESHOLD_SIZE) + } + + /// Minimum number of trailing manifest files to merge. + /// + /// Corresponds to Java `CoreOptions.MANIFEST_MERGE_MIN_COUNT`. + pub fn manifest_merge_min_count(&self) -> usize { + self.options + .get(MANIFEST_MERGE_MIN_COUNT_OPTION) + .and_then(|v| v.parse::().ok()) + .filter(|count| *count > 0) + .unwrap_or(DEFAULT_MANIFEST_MERGE_MIN_COUNT) + } + /// File format for data files (e.g. "parquet", "orc", "avro", "vortex"). /// Default is "parquet". pub fn file_format(&self) -> &str { @@ -619,6 +657,34 @@ mod tests { assert_eq!(parse_memory_size("abc"), None); } + #[test] + fn test_manifest_compaction_options_match_java_defaults_and_overrides() { + let options = HashMap::new(); + let core = CoreOptions::new(&options); + assert_eq!(core.manifest_target_file_size(), 8 * 1024 * 1024); + assert_eq!( + core.manifest_full_compaction_threshold_size(), + 16 * 1024 * 1024 + ); + assert_eq!(core.manifest_merge_min_count(), 30); + + let options = HashMap::from([ + ( + MANIFEST_TARGET_FILE_SIZE_OPTION.to_string(), + "500B".to_string(), + ), + ( + MANIFEST_FULL_COMPACTION_THRESHOLD_SIZE_OPTION.to_string(), + "200B".to_string(), + ), + (MANIFEST_MERGE_MIN_COUNT_OPTION.to_string(), "3".to_string()), + ]); + let core = CoreOptions::new(&options); + assert_eq!(core.manifest_target_file_size(), 500); + assert_eq!(core.manifest_full_compaction_threshold_size(), 200); + assert_eq!(core.manifest_merge_min_count(), 3); + } + #[test] fn test_partition_options_defaults() { let options = HashMap::new(); diff --git a/crates/paimon/src/spec/manifest.rs b/crates/paimon/src/spec/manifest.rs index bebd09e7..dfbc0a6c 100644 --- a/crates/paimon/src/spec/manifest.rs +++ b/crates/paimon/src/spec/manifest.rs @@ -68,8 +68,9 @@ impl Manifest { } } -/// Merge ADD/DELETE entries by file identifier, returning only the active ADD set. -/// Mirrors Java [FileEntry.mergeEntries](https://github.com/apache/paimon/blob/release-1.4/paimon-core/src/main/java/org/apache/paimon/manifest/FileEntry.java). +/// Merge ADD/DELETE entries by file identifier for scan-style visibility, +/// returning only the active ADD set. Manifest compaction must preserve +/// unmatched DELETE entries and uses a table-commit-specific merge path. /// Return order is unspecified. pub(crate) fn merge_active_entries(entries: Vec) -> Vec { use std::collections::HashMap; diff --git a/crates/paimon/src/table/table_commit.rs b/crates/paimon/src/table/table_commit.rs index 938a0239..707aefb7 100644 --- a/crates/paimon/src/table/table_commit.rs +++ b/crates/paimon/src/table/table_commit.rs @@ -56,6 +56,9 @@ pub struct TableCommit { commit_max_retry_wait_ms: u64, row_tracking_enabled: bool, partition_default_name: String, + manifest_target_file_size: i64, + manifest_full_compaction_threshold_size: i64, + manifest_merge_min_count: usize, } impl TableCommit { @@ -76,6 +79,10 @@ impl TableCommit { let commit_max_retry_wait_ms = core_options.commit_max_retry_wait_ms(); let row_tracking_enabled = core_options.row_tracking_enabled(); let partition_default_name = core_options.partition_default_name().to_string(); + let manifest_target_file_size = core_options.manifest_target_file_size(); + let manifest_full_compaction_threshold_size = + core_options.manifest_full_compaction_threshold_size(); + let manifest_merge_min_count = core_options.manifest_merge_min_count(); Self { table, snapshot_manager, @@ -88,6 +95,9 @@ impl TableCommit { commit_max_retry_wait_ms, row_tracking_enabled, partition_default_name, + manifest_target_file_size, + manifest_full_compaction_threshold_size, + manifest_merge_min_count, } } @@ -505,6 +515,9 @@ impl TableCommit { } else { vec![] }; + let existing_manifest_files = self + .compact_manifest_files_if_needed(file_io, &manifest_dir, existing_manifest_files) + .await?; ManifestList::write(file_io, &base_manifest_list_path, &existing_manifest_files).await?; @@ -541,6 +554,263 @@ impl TableCommit { self.snapshot_commit.commit(&snapshot, &statistics).await } + async fn compact_manifest_files_if_needed( + &self, + file_io: &FileIO, + manifest_dir: &str, + manifest_files: Vec, + ) -> Result> { + if manifest_files.len() <= 1 { + return Ok(manifest_files); + } + + if let Some(compacted) = self + .full_compact_manifest_files(file_io, manifest_dir, &manifest_files) + .await? + { + return Ok(compacted); + } + + self.minor_compact_manifest_files(file_io, manifest_dir, manifest_files) + .await + } + + fn should_full_compact_manifests(&self, manifest_files: &[ManifestFileMeta]) -> bool { + let delta_size: i64 = manifest_files + .iter() + .filter(|file| { + file.num_deleted_files() > 0 || file.file_size() < self.manifest_target_file_size + }) + .map(ManifestFileMeta::file_size) + .sum(); + delta_size >= self.manifest_full_compaction_threshold_size + } + + async fn full_compact_manifest_files( + &self, + file_io: &FileIO, + manifest_dir: &str, + manifest_files: &[ManifestFileMeta], + ) -> Result>> { + if !self.should_full_compact_manifests(manifest_files) { + return Ok(None); + } + + let delete_identifiers = self + .read_deleted_manifest_identifiers(file_io, manifest_dir, manifest_files) + .await?; + let delete_partitions: HashSet> = delete_identifiers + .iter() + .map(|identifier| identifier.partition.clone()) + .collect(); + + let mut result = Vec::new(); + let mut candidates = Vec::new(); + for manifest_file in manifest_files { + let must_change = self.manifest_must_change(manifest_file); + let affected_by_deletes = + self.manifest_may_contain_deleted_partitions(manifest_file, &delete_partitions); + if must_change || affected_by_deletes { + candidates.push(manifest_file.clone()); + } else { + result.push(manifest_file.clone()); + } + } + + if candidates.len() <= 1 { + return Ok(None); + } + + result.extend( + self.merge_manifest_candidates(file_io, manifest_dir, &candidates) + .await?, + ); + Ok(Some(result)) + } + + async fn read_deleted_manifest_identifiers( + &self, + file_io: &FileIO, + manifest_dir: &str, + manifest_files: &[ManifestFileMeta], + ) -> Result> { + let mut identifiers = HashSet::new(); + for manifest_file in manifest_files { + if manifest_file.num_deleted_files() == 0 { + continue; + } + let path = format!("{manifest_dir}/{}", manifest_file.file_name()); + for entry in Manifest::read(file_io, &path).await? { + if *entry.kind() == FileKind::Delete { + identifiers.insert(entry.into_identifier()); + } + } + } + Ok(identifiers) + } + + fn manifest_must_change(&self, manifest_file: &ManifestFileMeta) -> bool { + manifest_file.num_deleted_files() > 0 + || manifest_file.file_size() < self.manifest_target_file_size + } + + fn manifest_may_contain_deleted_partitions( + &self, + manifest_file: &ManifestFileMeta, + delete_partitions: &HashSet>, + ) -> bool { + if delete_partitions.is_empty() { + return false; + } + + let partition_fields = self.table.schema().partition_fields(); + if partition_fields.is_empty() { + return true; + } + + delete_partitions.iter().any(|partition| { + self.partition_may_match_manifest_stats( + partition, + manifest_file.partition_stats(), + &partition_fields, + ) + }) + } + + fn partition_may_match_manifest_stats( + &self, + partition: &[u8], + stats: &BinaryTableStats, + partition_fields: &[crate::spec::DataField], + ) -> bool { + let Ok(partition_row) = BinaryRow::from_serialized_bytes(partition) else { + return true; + }; + let Ok(min_row) = BinaryRow::from_serialized_bytes(stats.min_values()) else { + return true; + }; + let Ok(max_row) = BinaryRow::from_serialized_bytes(stats.max_values()) else { + return true; + }; + if partition_row.arity() < partition_fields.len() as i32 + || min_row.arity() < partition_fields.len() as i32 + || max_row.arity() < partition_fields.len() as i32 + { + return true; + } + + for (idx, field) in partition_fields.iter().enumerate() { + let data_type = field.data_type(); + let Ok(partition_datum) = extract_datum(&partition_row, idx, data_type) else { + return true; + }; + let Ok(min_datum) = extract_datum(&min_row, idx, data_type) else { + return true; + }; + let Ok(max_datum) = extract_datum(&max_row, idx, data_type) else { + return true; + }; + + match partition_datum { + Some(datum) => { + let (Some(min), Some(max)) = (min_datum, max_datum) else { + return true; + }; + if datum < min || datum > max { + return false; + } + } + None => { + if matches!(stats.null_counts().get(idx), Some(Some(0))) { + return false; + } + } + } + } + + true + } + + async fn minor_compact_manifest_files( + &self, + file_io: &FileIO, + manifest_dir: &str, + manifest_files: Vec, + ) -> Result> { + let mut result = Vec::new(); + let mut candidates = Vec::new(); + let mut total_size = 0; + + for manifest_file in manifest_files { + total_size += manifest_file.file_size(); + candidates.push(manifest_file); + if total_size >= self.manifest_target_file_size { + let merged = self + .merge_manifest_candidates(file_io, manifest_dir, &candidates) + .await?; + result.extend(merged); + candidates.clear(); + total_size = 0; + } + } + + if candidates.len() >= self.manifest_merge_min_count { + let merged = self + .merge_manifest_candidates(file_io, manifest_dir, &candidates) + .await?; + result.extend(merged); + } else { + result.extend(candidates); + } + + Ok(result) + } + + async fn merge_manifest_candidates( + &self, + file_io: &FileIO, + manifest_dir: &str, + candidates: &[ManifestFileMeta], + ) -> Result> { + if candidates.len() == 1 { + return Ok(vec![candidates[0].clone()]); + } + + let merged_entries = self + .merge_manifest_candidate_entries(file_io, manifest_dir, candidates) + .await?; + if merged_entries.is_empty() { + return Ok(vec![]); + } + let compacted_manifest_name = format!("manifest-{}-0", uuid::Uuid::new_v4()); + let compacted_manifest_path = format!("{manifest_dir}/{compacted_manifest_name}"); + let compacted_meta = self + .write_manifest_file( + file_io, + &compacted_manifest_path, + &compacted_manifest_name, + &merged_entries, + ) + .await?; + Ok(vec![compacted_meta]) + } + + async fn merge_manifest_candidate_entries( + &self, + file_io: &FileIO, + manifest_dir: &str, + manifest_files: &[ManifestFileMeta], + ) -> Result> { + let mut merged_entries = HashMap::new(); + for manifest_file in manifest_files { + let path = format!("{manifest_dir}/{}", manifest_file.file_name()); + for entry in Manifest::read(file_io, &path).await? { + merge_manifest_entry_for_compaction(&mut merged_entries, entry)?; + } + } + Ok(merged_entries.into_values().collect()) + } + /// Write an index manifest file from already-merged entries. /// /// Returns `None` if `merged_index_entries` is empty. @@ -1295,6 +1565,35 @@ fn build_partition_stats_row(datums: &[Option], data_types: &[DataType]) builder.build_serialized() } +fn merge_manifest_entry_for_compaction( + entries: &mut HashMap, + entry: ManifestEntry, +) -> Result<()> { + let identifier = entry.identifier(); + match *entry.kind() { + FileKind::Add => { + if entries.contains_key(&identifier) { + return Err(crate::Error::DataInvalid { + message: format!( + "Trying to add file {:?} which is already in the manifest entry map", + identifier + ), + source: None, + }); + } + entries.insert(identifier, entry); + } + FileKind::Delete => { + if entries.contains_key(&identifier) { + entries.remove(&identifier); + } else { + entries.insert(identifier, entry); + } + } + } + Ok(()) +} + /// Plan for resolving commit entries. enum CommitEntriesPlan { /// Caller-provided entries. May contain `FileKind::Delete` entries from CoW @@ -1430,11 +1729,19 @@ mod tests { } fn test_table(file_io: &FileIO, table_path: &str) -> Table { + test_table_with_options(file_io, table_path, HashMap::new()) + } + + fn test_table_with_options( + file_io: &FileIO, + table_path: &str, + options: HashMap, + ) -> Table { Table::new( file_io.clone(), Identifier::new("default", "test_table"), table_path.to_string(), - test_schema(), + test_schema().copy_with_options(options), None, ) } @@ -1505,11 +1812,35 @@ mod tests { TableCommit::new(table, "test-user".to_string()) } + fn setup_commit_with_options( + file_io: &FileIO, + table_path: &str, + options: HashMap, + ) -> TableCommit { + let table = test_table_with_options(file_io, table_path, options); + TableCommit::new(table, "test-user".to_string()) + } + fn setup_partitioned_commit(file_io: &FileIO, table_path: &str) -> TableCommit { let table = test_partitioned_table(file_io, table_path); TableCommit::new(table, "test-user".to_string()) } + fn setup_partitioned_commit_with_options( + file_io: &FileIO, + table_path: &str, + options: HashMap, + ) -> TableCommit { + let table = Table::new( + file_io.clone(), + Identifier::new("default", "test_table"), + table_path.to_string(), + test_partitioned_schema().copy_with_options(options), + None, + ); + TableCommit::new(table, "test-user".to_string()) + } + fn partition_bytes(pt: &str) -> Vec { let mut builder = BinaryRowBuilder::new(1); if pt.len() <= 7 { @@ -1605,6 +1936,255 @@ mod tests { assert_eq!(snapshot.delta_record_count(), Some(200)); } + #[tokio::test] + async fn test_commit_keeps_manifest_tail_below_default_merge_min_count() { + let file_io = test_file_io(); + let table_path = "memory:/test_commit_keeps_manifest_tail_below_default_merge_min_count"; + setup_dirs(&file_io, table_path).await; + + let commit = setup_commit(&file_io, table_path); + for i in 0..6 { + commit + .commit(vec![CommitMessage::new( + vec![], + 0, + vec![test_data_file(&format!("data-{i}.parquet"), 10)], + )]) + .await + .unwrap(); + } + + let snap_manager = SnapshotManager::new(file_io.clone(), table_path.to_string()); + let snapshot = snap_manager.get_latest_snapshot().await.unwrap().unwrap(); + assert_eq!(snapshot.id(), 6); + assert_eq!(snapshot.total_record_count(), Some(60)); + + let manifest_dir = format!("{table_path}/manifest"); + let base_path = format!("{manifest_dir}/{}", snapshot.base_manifest_list()); + let base_metas = ManifestList::read(&file_io, &base_path).await.unwrap(); + assert_eq!( + base_metas.len(), + 5, + "default manifest.merge-min-count=30 should keep a small tail unchanged" + ); + + let delta_path = format!("{manifest_dir}/{}", snapshot.delta_manifest_list()); + let delta_metas = ManifestList::read(&file_io, &delta_path).await.unwrap(); + assert_eq!(delta_metas.len(), 1); + } + + #[tokio::test] + async fn test_commit_compacts_manifest_tail_when_merge_min_count_is_reached() { + let file_io = test_file_io(); + let table_path = + "memory:/test_commit_compacts_manifest_tail_when_merge_min_count_is_reached"; + setup_dirs(&file_io, table_path).await; + + let commit = setup_commit_with_options( + &file_io, + table_path, + HashMap::from([("manifest.merge-min-count".to_string(), "3".to_string())]), + ); + for i in 0..4 { + commit + .commit(vec![CommitMessage::new( + vec![], + 0, + vec![test_data_file(&format!("data-{i}.parquet"), 10)], + )]) + .await + .unwrap(); + } + + let snap_manager = SnapshotManager::new(file_io.clone(), table_path.to_string()); + let snapshot = snap_manager.get_latest_snapshot().await.unwrap().unwrap(); + assert_eq!(snapshot.id(), 4); + assert_eq!(snapshot.total_record_count(), Some(40)); + + let manifest_dir = format!("{table_path}/manifest"); + let base_path = format!("{manifest_dir}/{}", snapshot.base_manifest_list()); + let base_metas = ManifestList::read(&file_io, &base_path).await.unwrap(); + assert_eq!( + base_metas.len(), + 1, + "manifest.merge-min-count=3 should merge the tail of 3 manifests" + ); + + let compacted_entries = Manifest::read( + &file_io, + &format!("{manifest_dir}/{}", base_metas[0].file_name()), + ) + .await + .unwrap(); + assert_eq!(compacted_entries.len(), 3); + assert!( + !base_metas[0].file_name().ends_with("-compact"), + "compacted manifest file names should match Java-style manifest--" + ); + + let delta_path = format!("{manifest_dir}/{}", snapshot.delta_manifest_list()); + let delta_metas = ManifestList::read(&file_io, &delta_path).await.unwrap(); + assert_eq!(delta_metas.len(), 1); + } + + #[tokio::test] + async fn test_full_manifest_compaction_skips_unaffected_base_manifests() { + let file_io = test_file_io(); + let table_path = "memory:/test_full_manifest_compaction_skips_unaffected_base_manifests"; + setup_dirs(&file_io, table_path).await; + + let commit = setup_partitioned_commit_with_options( + &file_io, + table_path, + HashMap::from([ + ( + "manifest.full-compaction-threshold-size".to_string(), + "1B".to_string(), + ), + ("manifest.target-file-size".to_string(), "1B".to_string()), + ]), + ); + let manifest_dir = format!("{table_path}/manifest"); + + let keep_entry = ManifestEntry::new( + FileKind::Add, + partition_bytes("keep"), + 0, + 1, + test_data_file("keep.parquet", 10), + 2, + ); + let keep_meta = commit + .write_manifest_file( + &file_io, + &format!("{manifest_dir}/manifest-keep-0"), + "manifest-keep-0", + &[keep_entry], + ) + .await + .unwrap(); + + let drop_file = test_data_file("drop.parquet", 10); + let drop_entry = ManifestEntry::new( + FileKind::Add, + partition_bytes("drop"), + 0, + 1, + drop_file.clone(), + 2, + ); + let drop_meta = commit + .write_manifest_file( + &file_io, + &format!("{manifest_dir}/manifest-drop-0"), + "manifest-drop-0", + &[drop_entry], + ) + .await + .unwrap(); + + let delete_entry = ManifestEntry::new( + FileKind::Delete, + partition_bytes("drop"), + 0, + 1, + drop_file, + 2, + ); + let delete_meta = commit + .write_manifest_file( + &file_io, + &format!("{manifest_dir}/manifest-delete-0"), + "manifest-delete-0", + &[delete_entry], + ) + .await + .unwrap(); + + assert!(commit.should_full_compact_manifests(&[ + keep_meta.clone(), + drop_meta.clone(), + delete_meta.clone(), + ])); + + let compacted = commit + .compact_manifest_files_if_needed( + &file_io, + &manifest_dir, + vec![keep_meta.clone(), drop_meta, delete_meta], + ) + .await + .unwrap(); + + assert_eq!( + compacted.len(), + 1, + "full compaction should preserve unaffected base manifests and drop fully canceled candidates" + ); + assert_eq!(compacted[0].file_name(), keep_meta.file_name()); + } + + #[tokio::test] + async fn test_manifest_compaction_preserves_unmatched_delete_entries() { + let file_io = test_file_io(); + let table_path = "memory:/test_manifest_compaction_preserves_unmatched_delete_entries"; + setup_dirs(&file_io, table_path).await; + + let commit = setup_commit(&file_io, table_path); + let manifest_dir = format!("{table_path}/manifest"); + + let deleted_file = test_data_file("deleted-in-base.parquet", 10); + let delete_entry = + ManifestEntry::new(FileKind::Delete, vec![], 0, 1, deleted_file.clone(), 2); + let delete_meta = commit + .write_manifest_file( + &file_io, + &format!("{manifest_dir}/manifest-delete-only-0"), + "manifest-delete-only-0", + &[delete_entry], + ) + .await + .unwrap(); + + let add_entry = ManifestEntry::new( + FileKind::Add, + vec![], + 0, + 1, + test_data_file("unrelated.parquet", 5), + 2, + ); + let add_meta = commit + .write_manifest_file( + &file_io, + &format!("{manifest_dir}/manifest-add-only-0"), + "manifest-add-only-0", + &[add_entry], + ) + .await + .unwrap(); + + let compacted = commit + .merge_manifest_candidates(&file_io, &manifest_dir, &[delete_meta, add_meta]) + .await + .unwrap(); + assert_eq!(compacted.len(), 1); + + let compacted_entries = Manifest::read( + &file_io, + &format!("{manifest_dir}/{}", compacted[0].file_name()), + ) + .await + .unwrap(); + assert_eq!(compacted_entries.len(), 2); + assert!( + compacted_entries.iter().any(|entry| { + *entry.kind() == FileKind::Delete && entry.file().file_name == deleted_file.file_name + }), + "Java FileEntry.mergeEntries keeps unmatched DELETE entries because the ADD can live in an older manifest" + ); + } + #[tokio::test] async fn test_empty_commit_is_noop() { let file_io = test_file_io(); From 031a3b56d06b51408aff8e6c55fff46d2c13d4c4 Mon Sep 17 00:00:00 2001 From: Aitozi Date: Sat, 20 Jun 2026 12:11:26 +0800 Subject: [PATCH 2/5] fix: align manifest compaction delete handling --- crates/paimon/src/spec/manifest.rs | 121 ++++++++++----- crates/paimon/src/spec/mod.rs | 2 +- crates/paimon/src/table/table_commit.rs | 188 +++++++++++++++++------- 3 files changed, 225 insertions(+), 86 deletions(-) diff --git a/crates/paimon/src/spec/manifest.rs b/crates/paimon/src/spec/manifest.rs index dfbc0a6c..67dd1d2f 100644 --- a/crates/paimon/src/spec/manifest.rs +++ b/crates/paimon/src/spec/manifest.rs @@ -22,6 +22,7 @@ use crate::spec::manifest_entry::MANIFEST_ENTRY_SCHEMA; use crate::spec::FileKind; use crate::Result; +use std::collections::HashMap; /// Manifest file reader and writer. /// @@ -68,13 +69,54 @@ impl Manifest { } } +/// Merge ADD/DELETE entries by file identifier, matching Java `FileEntry.mergeEntries`. +/// +/// An unmatched DELETE is preserved because its matching ADD may live in an +/// older manifest that is not part of this merge batch. +pub(crate) fn merge_entries( + entries: impl IntoIterator, +) -> Result> { + let mut merged_entries = HashMap::new(); + for entry in entries { + merge_entry(&mut merged_entries, entry)?; + } + Ok(merged_entries.into_values().collect()) +} + +fn merge_entry( + merged_entries: &mut HashMap, + entry: ManifestEntry, +) -> Result<()> { + let identifier = entry.identifier(); + match *entry.kind() { + FileKind::Add => { + if merged_entries.contains_key(&identifier) { + return Err(crate::Error::DataInvalid { + message: format!( + "Trying to add file {:?} which is already in the manifest entry map", + identifier + ), + source: None, + }); + } + merged_entries.insert(identifier, entry); + } + FileKind::Delete => { + if merged_entries.contains_key(&identifier) { + merged_entries.remove(&identifier); + } else { + merged_entries.insert(identifier, entry); + } + } + } + Ok(()) +} + /// Merge ADD/DELETE entries by file identifier for scan-style visibility, /// returning only the active ADD set. Manifest compaction must preserve /// unmatched DELETE entries and uses a table-commit-specific merge path. /// Return order is unspecified. pub(crate) fn merge_active_entries(entries: Vec) -> Vec { - use std::collections::HashMap; - use crate::spec::manifest_entry::Identifier; let mut map: HashMap = HashMap::new(); for entry in entries { @@ -95,9 +137,39 @@ pub(crate) fn merge_active_entries(entries: Vec) -> Vec ManifestEntry { + let stats = BinaryTableStats::empty(); + let file = DataFileMeta { + file_name: file_name.to_string(), + file_size: 100, + row_count: 10, + min_key: vec![], + max_key: vec![], + key_stats: stats.clone(), + value_stats: stats, + min_sequence_number: 0, + max_sequence_number: 0, + schema_id: 0, + level, + extra_files: vec![], + creation_time: None, + delete_row_count: None, + embedded_index: None, + file_source: None, + value_stats_cols: None, + external_path: None, + first_row_id: None, + write_cols: None, + }; + ManifestEntry::new(kind, vec![], 0, 1, file, 2) + } + #[tokio::test] async fn test_read_manifest_from_file() { let workdir = current_dir().unwrap(); @@ -121,37 +193,6 @@ mod tests { #[test] fn test_merge_active_entries_cancels_add_then_delete() { - use crate::spec::data_file::DataFileMeta; - use crate::spec::stats::BinaryTableStats; - use crate::spec::ManifestEntry; - - fn entry(kind: FileKind, file_name: &str, level: i32) -> ManifestEntry { - let stats = BinaryTableStats::empty(); - let file = DataFileMeta { - file_name: file_name.to_string(), - file_size: 100, - row_count: 10, - min_key: vec![], - max_key: vec![], - key_stats: stats.clone(), - value_stats: stats, - min_sequence_number: 0, - max_sequence_number: 0, - schema_id: 0, - level, - extra_files: vec![], - creation_time: None, - delete_row_count: None, - embedded_index: None, - file_source: None, - value_stats_cols: None, - external_path: None, - first_row_id: None, - write_cols: None, - }; - ManifestEntry::new(kind, vec![], 0, 1, file, 2) - } - let cancelled = merge_active_entries(vec![ entry(FileKind::Add, "f.parquet", 0), entry(FileKind::Delete, "f.parquet", 0), @@ -172,4 +213,18 @@ mod tests { assert_eq!(compacted.len(), 1); assert_eq!(compacted[0].file().level, 1); } + + #[test] + fn test_merge_entries_preserves_unmatched_delete() { + let merged = merge_entries(vec![ + entry(FileKind::Delete, "deleted-in-base.parquet", 0), + entry(FileKind::Add, "unrelated.parquet", 0), + ]) + .unwrap(); + + assert_eq!(merged.len(), 2); + assert!(merged.iter().any(|entry| { + *entry.kind() == FileKind::Delete && entry.file().file_name == "deleted-in-base.parquet" + })); + } } diff --git a/crates/paimon/src/spec/mod.rs b/crates/paimon/src/spec/mod.rs index 88fec22f..fa14f8ad 100644 --- a/crates/paimon/src/spec/mod.rs +++ b/crates/paimon/src/spec/mod.rs @@ -61,8 +61,8 @@ pub use index_file_meta::*; mod index_manifest; pub use index_manifest::{IndexManifest, IndexManifestEntry}; mod manifest; -pub(crate) use manifest::merge_active_entries; pub use manifest::Manifest; +pub(crate) use manifest::{merge_active_entries, merge_entries}; mod manifest_common; pub use manifest_common::FileKind; mod manifest_entry; diff --git a/crates/paimon/src/table/table_commit.rs b/crates/paimon/src/table/table_commit.rs index 707aefb7..9d91094d 100644 --- a/crates/paimon/src/table/table_commit.rs +++ b/crates/paimon/src/table/table_commit.rs @@ -24,9 +24,9 @@ use crate::io::FileIO; use crate::spec::stats::BinaryTableStats; use crate::spec::FileKind; use crate::spec::{ - datums_to_binary_row, extract_datum, BinaryRow, BinaryRowBuilder, CommitKind, CoreOptions, - DataType, Datum, IndexManifest, IndexManifestEntry, Manifest, ManifestEntry, ManifestFileMeta, - ManifestList, PartitionStatistics, Snapshot, + datums_to_binary_row, extract_datum, merge_entries, BinaryRow, BinaryRowBuilder, CommitKind, + CoreOptions, DataType, Datum, IndexManifest, IndexManifestEntry, Manifest, ManifestEntry, + ManifestFileMeta, ManifestList, PartitionStatistics, Snapshot, }; use crate::table::commit_message::CommitMessage; use crate::table::partition_filter::PartitionFilter; @@ -621,13 +621,56 @@ impl TableCommit { return Ok(None); } + let mut merged_entries = Vec::new(); + for manifest_file in candidates { + let (require_change, entries) = self + .read_manifest_for_full_compaction( + file_io, + manifest_dir, + &manifest_file, + &delete_identifiers, + ) + .await?; + + if require_change { + merged_entries.extend(entries); + } else { + result.push(manifest_file); + } + } + result.extend( - self.merge_manifest_candidates(file_io, manifest_dir, &candidates) + self.write_compacted_manifest_entries(file_io, manifest_dir, &merged_entries) .await?, ); Ok(Some(result)) } + async fn read_manifest_for_full_compaction( + &self, + file_io: &FileIO, + manifest_dir: &str, + manifest_file: &ManifestFileMeta, + delete_identifiers: &HashSet, + ) -> Result<(bool, Vec)> { + let path = format!("{manifest_dir}/{}", manifest_file.file_name()); + let mut require_change = self.manifest_must_change(manifest_file); + let mut entries = Vec::new(); + + for entry in Manifest::read(file_io, &path).await? { + if *entry.kind() != FileKind::Add { + continue; + } + if delete_identifiers.contains(&entry.identifier()) { + require_change = true; + } else { + entries.push(entry); + } + } + + Ok((require_change, entries)) + } + async fn read_deleted_manifest_identifiers( &self, file_io: &FileIO, @@ -779,20 +822,8 @@ impl TableCommit { let merged_entries = self .merge_manifest_candidate_entries(file_io, manifest_dir, candidates) .await?; - if merged_entries.is_empty() { - return Ok(vec![]); - } - let compacted_manifest_name = format!("manifest-{}-0", uuid::Uuid::new_v4()); - let compacted_manifest_path = format!("{manifest_dir}/{compacted_manifest_name}"); - let compacted_meta = self - .write_manifest_file( - file_io, - &compacted_manifest_path, - &compacted_manifest_name, - &merged_entries, - ) - .await?; - Ok(vec![compacted_meta]) + self.write_compacted_manifest_entries(file_io, manifest_dir, &merged_entries) + .await } async fn merge_manifest_candidate_entries( @@ -801,14 +832,30 @@ impl TableCommit { manifest_dir: &str, manifest_files: &[ManifestFileMeta], ) -> Result> { - let mut merged_entries = HashMap::new(); + let mut entries = Vec::new(); for manifest_file in manifest_files { let path = format!("{manifest_dir}/{}", manifest_file.file_name()); - for entry in Manifest::read(file_io, &path).await? { - merge_manifest_entry_for_compaction(&mut merged_entries, entry)?; - } + entries.extend(Manifest::read(file_io, &path).await?); } - Ok(merged_entries.into_values().collect()) + merge_entries(entries) + } + + async fn write_compacted_manifest_entries( + &self, + file_io: &FileIO, + manifest_dir: &str, + entries: &[ManifestEntry], + ) -> Result> { + if entries.is_empty() { + return Ok(vec![]); + } + + let manifest_name = format!("manifest-{}-0", uuid::Uuid::new_v4()); + let manifest_path = format!("{manifest_dir}/{manifest_name}"); + let manifest_meta = self + .write_manifest_file(file_io, &manifest_path, &manifest_name, entries) + .await?; + Ok(vec![manifest_meta]) } /// Write an index manifest file from already-merged entries. @@ -1565,35 +1612,6 @@ fn build_partition_stats_row(datums: &[Option], data_types: &[DataType]) builder.build_serialized() } -fn merge_manifest_entry_for_compaction( - entries: &mut HashMap, - entry: ManifestEntry, -) -> Result<()> { - let identifier = entry.identifier(); - match *entry.kind() { - FileKind::Add => { - if entries.contains_key(&identifier) { - return Err(crate::Error::DataInvalid { - message: format!( - "Trying to add file {:?} which is already in the manifest entry map", - identifier - ), - source: None, - }); - } - entries.insert(identifier, entry); - } - FileKind::Delete => { - if entries.contains_key(&identifier) { - entries.remove(&identifier); - } else { - entries.insert(identifier, entry); - } - } - } - Ok(()) -} - /// Plan for resolving commit entries. enum CommitEntriesPlan { /// Caller-provided entries. May contain `FileKind::Delete` entries from CoW @@ -2185,6 +2203,72 @@ mod tests { ); } + #[tokio::test] + async fn test_full_manifest_compaction_filters_delete_entries() { + let file_io = test_file_io(); + let table_path = "memory:/test_full_manifest_compaction_filters_delete_entries"; + setup_dirs(&file_io, table_path).await; + + let commit = setup_commit_with_options( + &file_io, + table_path, + HashMap::from([ + ( + "manifest.full-compaction-threshold-size".to_string(), + "1B".to_string(), + ), + ("manifest.target-file-size".to_string(), "1B".to_string()), + ]), + ); + let manifest_dir = format!("{table_path}/manifest"); + + let deleted_file = test_data_file("deleted-in-base.parquet", 10); + let delete_entry = + ManifestEntry::new(FileKind::Delete, vec![], 0, 1, deleted_file.clone(), 2); + let delete_meta = commit + .write_manifest_file( + &file_io, + &format!("{manifest_dir}/manifest-delete-only-0"), + "manifest-delete-only-0", + &[delete_entry], + ) + .await + .unwrap(); + + let add_entry = ManifestEntry::new( + FileKind::Add, + vec![], + 0, + 1, + test_data_file("unrelated.parquet", 5), + 2, + ); + let add_meta = commit + .write_manifest_file( + &file_io, + &format!("{manifest_dir}/manifest-add-only-0"), + "manifest-add-only-0", + &[add_entry], + ) + .await + .unwrap(); + + let compacted = commit + .compact_manifest_files_if_needed( + &file_io, + &manifest_dir, + vec![delete_meta, add_meta.clone()], + ) + .await + .unwrap(); + assert_eq!( + compacted.len(), + 1, + "full compaction should remove delete-only manifests instead of writing DELETE entries back" + ); + assert_eq!(compacted[0].file_name(), add_meta.file_name()); + } + #[tokio::test] async fn test_empty_commit_is_noop() { let file_io = test_file_io(); From 600dad32970b3cf37b5bd6839351b0964e8323f8 Mon Sep 17 00:00:00 2001 From: Aitozi Date: Sat, 20 Jun 2026 12:25:17 +0800 Subject: [PATCH 3/5] refactor: extract manifest file merger --- crates/paimon/src/spec/manifest.rs | 60 +- crates/paimon/src/spec/mod.rs | 2 +- .../paimon/src/table/manifest_file_merger.rs | 794 ++++++++++++++++++ crates/paimon/src/table/mod.rs | 1 + crates/paimon/src/table/table_commit.rs | 327 +------- 5 files changed, 819 insertions(+), 365 deletions(-) create mode 100644 crates/paimon/src/table/manifest_file_merger.rs diff --git a/crates/paimon/src/spec/manifest.rs b/crates/paimon/src/spec/manifest.rs index 67dd1d2f..09a3cb1b 100644 --- a/crates/paimon/src/spec/manifest.rs +++ b/crates/paimon/src/spec/manifest.rs @@ -22,7 +22,6 @@ use crate::spec::manifest_entry::MANIFEST_ENTRY_SCHEMA; use crate::spec::FileKind; use crate::Result; -use std::collections::HashMap; /// Manifest file reader and writer. /// @@ -69,54 +68,13 @@ impl Manifest { } } -/// Merge ADD/DELETE entries by file identifier, matching Java `FileEntry.mergeEntries`. -/// -/// An unmatched DELETE is preserved because its matching ADD may live in an -/// older manifest that is not part of this merge batch. -pub(crate) fn merge_entries( - entries: impl IntoIterator, -) -> Result> { - let mut merged_entries = HashMap::new(); - for entry in entries { - merge_entry(&mut merged_entries, entry)?; - } - Ok(merged_entries.into_values().collect()) -} - -fn merge_entry( - merged_entries: &mut HashMap, - entry: ManifestEntry, -) -> Result<()> { - let identifier = entry.identifier(); - match *entry.kind() { - FileKind::Add => { - if merged_entries.contains_key(&identifier) { - return Err(crate::Error::DataInvalid { - message: format!( - "Trying to add file {:?} which is already in the manifest entry map", - identifier - ), - source: None, - }); - } - merged_entries.insert(identifier, entry); - } - FileKind::Delete => { - if merged_entries.contains_key(&identifier) { - merged_entries.remove(&identifier); - } else { - merged_entries.insert(identifier, entry); - } - } - } - Ok(()) -} - /// Merge ADD/DELETE entries by file identifier for scan-style visibility, /// returning only the active ADD set. Manifest compaction must preserve /// unmatched DELETE entries and uses a table-commit-specific merge path. /// Return order is unspecified. pub(crate) fn merge_active_entries(entries: Vec) -> Vec { + use std::collections::HashMap; + use crate::spec::manifest_entry::Identifier; let mut map: HashMap = HashMap::new(); for entry in entries { @@ -213,18 +171,4 @@ mod tests { assert_eq!(compacted.len(), 1); assert_eq!(compacted[0].file().level, 1); } - - #[test] - fn test_merge_entries_preserves_unmatched_delete() { - let merged = merge_entries(vec![ - entry(FileKind::Delete, "deleted-in-base.parquet", 0), - entry(FileKind::Add, "unrelated.parquet", 0), - ]) - .unwrap(); - - assert_eq!(merged.len(), 2); - assert!(merged.iter().any(|entry| { - *entry.kind() == FileKind::Delete && entry.file().file_name == "deleted-in-base.parquet" - })); - } } diff --git a/crates/paimon/src/spec/mod.rs b/crates/paimon/src/spec/mod.rs index fa14f8ad..88fec22f 100644 --- a/crates/paimon/src/spec/mod.rs +++ b/crates/paimon/src/spec/mod.rs @@ -61,8 +61,8 @@ pub use index_file_meta::*; mod index_manifest; pub use index_manifest::{IndexManifest, IndexManifestEntry}; mod manifest; +pub(crate) use manifest::merge_active_entries; pub use manifest::Manifest; -pub(crate) use manifest::{merge_active_entries, merge_entries}; mod manifest_common; pub use manifest_common::FileKind; mod manifest_entry; diff --git a/crates/paimon/src/table/manifest_file_merger.rs b/crates/paimon/src/table/manifest_file_merger.rs new file mode 100644 index 00000000..bebce3fc --- /dev/null +++ b/crates/paimon/src/table/manifest_file_merger.rs @@ -0,0 +1,794 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::io::FileIO; +use crate::spec::stats::BinaryTableStats; +use crate::spec::{ + extract_datum, BinaryRow, BinaryRowBuilder, DataField, DataType, Datum, FileKind, Identifier, + Manifest, ManifestEntry, ManifestFileMeta, +}; +use crate::Result; +use std::collections::{HashMap, HashSet}; + +/// Manifest file merger with Java `ManifestFileMerger`-style full and minor compaction. +pub(crate) struct ManifestFileMerger<'a> { + file_io: &'a FileIO, + manifest_dir: &'a str, + partition_fields: &'a [DataField], + schema_id: i64, + target_file_size: i64, + full_compaction_threshold_size: i64, + merge_min_count: usize, +} + +impl<'a> ManifestFileMerger<'a> { + pub(crate) fn new( + file_io: &'a FileIO, + manifest_dir: &'a str, + partition_fields: &'a [DataField], + schema_id: i64, + target_file_size: i64, + full_compaction_threshold_size: i64, + merge_min_count: usize, + ) -> Self { + Self { + file_io, + manifest_dir, + partition_fields, + schema_id, + target_file_size, + full_compaction_threshold_size, + merge_min_count, + } + } + + pub(crate) async fn merge( + &self, + manifest_files: Vec, + ) -> Result> { + if manifest_files.len() <= 1 { + return Ok(manifest_files); + } + + if let Some(compacted) = self.try_full_compaction(&manifest_files).await? { + return Ok(compacted); + } + + self.try_minor_compaction(manifest_files).await + } + + fn should_full_compact(&self, manifest_files: &[ManifestFileMeta]) -> bool { + let delta_size: i64 = manifest_files + .iter() + .filter(|file| self.must_change(file)) + .map(ManifestFileMeta::file_size) + .sum(); + delta_size >= self.full_compaction_threshold_size + } + + async fn try_full_compaction( + &self, + manifest_files: &[ManifestFileMeta], + ) -> Result>> { + if !self.should_full_compact(manifest_files) { + return Ok(None); + } + + let delete_identifiers = self.read_deleted_identifiers(manifest_files).await?; + let delete_partitions: HashSet> = delete_identifiers + .iter() + .map(|identifier| identifier.partition.clone()) + .collect(); + + let mut result = Vec::new(); + let mut to_be_merged = Vec::new(); + for manifest_file in manifest_files { + if self.must_change(manifest_file) + || self.may_contain_deleted_partitions(manifest_file, &delete_partitions) + { + to_be_merged.push(manifest_file.clone()); + } else { + result.push(manifest_file.clone()); + } + } + + if to_be_merged.len() <= 1 { + return Ok(None); + } + + let mut merged_entries = Vec::new(); + for manifest_file in to_be_merged { + let read_result = self + .read_for_full_compaction(&manifest_file, &delete_identifiers) + .await?; + if read_result.require_change { + merged_entries.extend(read_result.entries); + } else { + result.push(read_result.file); + } + } + + result.extend(self.write_compacted_entries(&merged_entries).await?); + Ok(Some(result)) + } + + async fn read_deleted_identifiers( + &self, + manifest_files: &[ManifestFileMeta], + ) -> Result> { + let mut identifiers = HashSet::new(); + for manifest_file in manifest_files { + if manifest_file.num_deleted_files() == 0 { + continue; + } + let path = self.manifest_path(manifest_file.file_name()); + for entry in Manifest::read(self.file_io, &path).await? { + if *entry.kind() == FileKind::Delete { + identifiers.insert(entry.into_identifier()); + } + } + } + Ok(identifiers) + } + + async fn read_for_full_compaction( + &self, + manifest_file: &ManifestFileMeta, + delete_identifiers: &HashSet, + ) -> Result { + let path = self.manifest_path(manifest_file.file_name()); + let mut require_change = self.must_change(manifest_file); + let mut entries = Vec::new(); + + for entry in Manifest::read(self.file_io, &path).await? { + if *entry.kind() != FileKind::Add { + continue; + } + if delete_identifiers.contains(&entry.identifier()) { + require_change = true; + } else { + entries.push(entry); + } + } + + Ok(FullCompactionReadResult { + file: manifest_file.clone(), + require_change, + entries, + }) + } + + fn must_change(&self, manifest_file: &ManifestFileMeta) -> bool { + manifest_file.num_deleted_files() > 0 || manifest_file.file_size() < self.target_file_size + } + + fn may_contain_deleted_partitions( + &self, + manifest_file: &ManifestFileMeta, + delete_partitions: &HashSet>, + ) -> bool { + if delete_partitions.is_empty() { + return false; + } + + if self.partition_fields.is_empty() { + return true; + } + + delete_partitions.iter().any(|partition| { + self.partition_may_match_manifest_stats(partition, manifest_file.partition_stats()) + }) + } + + fn partition_may_match_manifest_stats( + &self, + partition: &[u8], + stats: &BinaryTableStats, + ) -> bool { + let Ok(partition_row) = BinaryRow::from_serialized_bytes(partition) else { + return true; + }; + let Ok(min_row) = BinaryRow::from_serialized_bytes(stats.min_values()) else { + return true; + }; + let Ok(max_row) = BinaryRow::from_serialized_bytes(stats.max_values()) else { + return true; + }; + if partition_row.arity() < self.partition_fields.len() as i32 + || min_row.arity() < self.partition_fields.len() as i32 + || max_row.arity() < self.partition_fields.len() as i32 + { + return true; + } + + for (idx, field) in self.partition_fields.iter().enumerate() { + let data_type = field.data_type(); + let Ok(partition_datum) = extract_datum(&partition_row, idx, data_type) else { + return true; + }; + let Ok(min_datum) = extract_datum(&min_row, idx, data_type) else { + return true; + }; + let Ok(max_datum) = extract_datum(&max_row, idx, data_type) else { + return true; + }; + + match partition_datum { + Some(datum) => { + let (Some(min), Some(max)) = (min_datum, max_datum) else { + return true; + }; + if datum < min || datum > max { + return false; + } + } + None => { + if matches!(stats.null_counts().get(idx), Some(Some(0))) { + return false; + } + } + } + } + + true + } + + async fn try_minor_compaction( + &self, + manifest_files: Vec, + ) -> Result> { + let mut result = Vec::new(); + let mut candidates = Vec::new(); + let mut total_size = 0; + + for manifest_file in manifest_files { + total_size += manifest_file.file_size(); + candidates.push(manifest_file); + if total_size >= self.target_file_size { + let merged = self.merge_candidates(&candidates).await?; + result.extend(merged); + candidates.clear(); + total_size = 0; + } + } + + if candidates.len() >= self.merge_min_count { + let merged = self.merge_candidates(&candidates).await?; + result.extend(merged); + } else { + result.extend(candidates); + } + + Ok(result) + } + + async fn merge_candidates( + &self, + candidates: &[ManifestFileMeta], + ) -> Result> { + if candidates.len() == 1 { + return Ok(vec![candidates[0].clone()]); + } + + let mut entries = Vec::new(); + for manifest_file in candidates { + let path = self.manifest_path(manifest_file.file_name()); + entries.extend(Manifest::read(self.file_io, &path).await?); + } + + let merged_entries = merge_entries(entries)?; + self.write_compacted_entries(&merged_entries).await + } + + async fn write_compacted_entries( + &self, + entries: &[ManifestEntry], + ) -> Result> { + if entries.is_empty() { + return Ok(vec![]); + } + + let manifest_name = format!("manifest-{}-0", uuid::Uuid::new_v4()); + let manifest_path = self.manifest_path(&manifest_name); + let manifest_meta = self + .write_manifest_file(&manifest_path, &manifest_name, entries) + .await?; + Ok(vec![manifest_meta]) + } + + async fn write_manifest_file( + &self, + path: &str, + file_name: &str, + entries: &[ManifestEntry], + ) -> Result { + Manifest::write(self.file_io, path, entries).await?; + + let mut added_file_count: i64 = 0; + let mut deleted_file_count: i64 = 0; + let mut min_bucket: Option = None; + let mut max_bucket: Option = None; + let mut min_level: Option = None; + let mut max_level: Option = None; + for entry in entries { + match entry.kind() { + FileKind::Add => added_file_count += 1, + FileKind::Delete => deleted_file_count += 1, + } + let b = entry.bucket(); + min_bucket = Some(min_bucket.map_or(b, |cur| cur.min(b))); + max_bucket = Some(max_bucket.map_or(b, |cur| cur.max(b))); + let l = entry.file().level; + min_level = Some(min_level.map_or(l, |cur| cur.min(l))); + max_level = Some(max_level.map_or(l, |cur| cur.max(l))); + } + + let status = self.file_io.get_status(path).await?; + let partition_stats = compute_partition_stats(entries, self.partition_fields)?; + + Ok(ManifestFileMeta::new( + file_name.to_string(), + status.size as i64, + added_file_count, + deleted_file_count, + partition_stats, + self.schema_id, + ) + .with_bucket_level_stats(min_bucket, max_bucket, min_level, max_level)) + } + + fn manifest_path(&self, file_name: &str) -> String { + format!("{}/{}", self.manifest_dir, file_name) + } +} + +fn merge_entries(entries: impl IntoIterator) -> Result> { + let mut merged_entries = HashMap::new(); + for entry in entries { + merge_entry(&mut merged_entries, entry)?; + } + Ok(merged_entries.into_values().collect()) +} + +fn merge_entry( + merged_entries: &mut HashMap, + entry: ManifestEntry, +) -> Result<()> { + let identifier = entry.identifier(); + match *entry.kind() { + FileKind::Add => { + if merged_entries.contains_key(&identifier) { + return Err(crate::Error::DataInvalid { + message: format!( + "Trying to add file {:?} which is already in the manifest entry map", + identifier + ), + source: None, + }); + } + merged_entries.insert(identifier, entry); + } + FileKind::Delete => { + if merged_entries.contains_key(&identifier) { + merged_entries.remove(&identifier); + } else { + merged_entries.insert(identifier, entry); + } + } + } + Ok(()) +} + +fn compute_partition_stats( + entries: &[ManifestEntry], + partition_fields: &[DataField], +) -> Result { + let num_fields = partition_fields.len(); + + if num_fields == 0 || entries.is_empty() { + return Ok(BinaryTableStats::empty()); + } + + let data_types: Vec<_> = partition_fields + .iter() + .map(|f| f.data_type().clone()) + .collect(); + let mut mins: Vec> = vec![None; num_fields]; + let mut maxs: Vec> = vec![None; num_fields]; + let mut null_counts: Vec = vec![0; num_fields]; + + for entry in entries { + let partition_bytes = entry.partition(); + if partition_bytes.is_empty() { + continue; + } + let row = BinaryRow::from_serialized_bytes(partition_bytes)?; + for i in 0..num_fields { + match extract_datum(&row, i, &data_types[i])? { + Some(datum) => { + mins[i] = Some(match mins[i].take() { + Some(cur) if cur <= datum => cur, + Some(_) => datum.clone(), + None => datum.clone(), + }); + maxs[i] = Some(match maxs[i].take() { + Some(cur) if cur >= datum => cur, + Some(_) => datum, + None => datum, + }); + } + None => { + null_counts[i] += 1; + } + } + } + } + + let min_bytes = build_partition_stats_row(&mins, &data_types); + let max_bytes = build_partition_stats_row(&maxs, &data_types); + let null_counts = null_counts.into_iter().map(Some).collect(); + + Ok(BinaryTableStats::new(min_bytes, max_bytes, null_counts)) +} + +fn build_partition_stats_row(datums: &[Option], data_types: &[DataType]) -> Vec { + let mut builder = BinaryRowBuilder::new(datums.len() as i32); + for (pos, (datum_opt, data_type)) in datums.iter().zip(data_types.iter()).enumerate() { + match datum_opt { + Some(d) => builder.write_datum(pos, d, data_type), + None => builder.set_null_at(pos), + } + } + builder.build_serialized() +} + +struct FullCompactionReadResult { + file: ManifestFileMeta, + require_change: bool, + entries: Vec, +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::io::FileIOBuilder; + use crate::spec::stats::BinaryTableStats; + use crate::spec::{DataFileMeta, IntType, Schema, TableSchema, VarCharType}; + use chrono::{DateTime, Utc}; + use std::collections::HashMap; + + fn test_file_io() -> FileIO { + FileIOBuilder::new("memory").build().unwrap() + } + + fn test_schema() -> TableSchema { + let schema = Schema::builder() + .column("id", crate::spec::DataType::Int(IntType::new())) + .build() + .unwrap(); + TableSchema::new(0, &schema) + } + + fn test_partitioned_schema() -> TableSchema { + let schema = Schema::builder() + .column( + "pt", + crate::spec::DataType::VarChar(VarCharType::string_type()), + ) + .column("id", crate::spec::DataType::Int(IntType::new())) + .partition_keys(["pt"]) + .build() + .unwrap(); + TableSchema::new(0, &schema) + } + + fn test_data_file(name: &str, row_count: i64) -> DataFileMeta { + DataFileMeta { + file_name: name.to_string(), + file_size: 1024, + row_count, + min_key: vec![], + max_key: vec![], + key_stats: BinaryTableStats::empty(), + value_stats: BinaryTableStats::empty(), + min_sequence_number: 0, + max_sequence_number: 0, + schema_id: 0, + level: 0, + extra_files: vec![], + creation_time: Some( + "2024-09-06T07:45:55.039+00:00" + .parse::>() + .unwrap(), + ), + delete_row_count: Some(0), + embedded_index: None, + first_row_id: None, + write_cols: None, + external_path: None, + file_source: None, + value_stats_cols: None, + } + } + + async fn setup_dirs(file_io: &FileIO, table_path: &str) { + file_io + .mkdirs(&format!("{table_path}/snapshot/")) + .await + .unwrap(); + file_io + .mkdirs(&format!("{table_path}/manifest/")) + .await + .unwrap(); + } + + fn partition_bytes(pt: &str) -> Vec { + let mut builder = BinaryRowBuilder::new(1); + if pt.len() <= 7 { + builder.write_string_inline(0, pt); + } else { + builder.write_string(0, pt); + } + builder.build_serialized() + } + + fn manifest_merger<'a>( + file_io: &'a FileIO, + manifest_dir: &'a str, + partition_fields: &'a [DataField], + schema_id: i64, + options: HashMap, + ) -> ManifestFileMerger<'a> { + let core_options = crate::spec::CoreOptions::new(&options); + ManifestFileMerger::new( + file_io, + manifest_dir, + partition_fields, + schema_id, + core_options.manifest_target_file_size(), + core_options.manifest_full_compaction_threshold_size(), + core_options.manifest_merge_min_count(), + ) + } + + async fn write_manifest( + merger: &ManifestFileMerger<'_>, + name: &str, + entries: &[ManifestEntry], + ) -> ManifestFileMeta { + merger + .write_manifest_file(&format!("{}/{}", merger.manifest_dir, name), name, entries) + .await + .unwrap() + } + + #[test] + fn test_merge_entries_preserves_unmatched_delete() { + let merged = merge_entries(vec![ + ManifestEntry::new( + FileKind::Delete, + vec![], + 0, + 1, + test_data_file("deleted-in-base.parquet", 10), + 2, + ), + ManifestEntry::new( + FileKind::Add, + vec![], + 0, + 1, + test_data_file("unrelated.parquet", 5), + 2, + ), + ]) + .unwrap(); + + assert_eq!(merged.len(), 2); + assert!(merged.iter().any(|entry| { + *entry.kind() == FileKind::Delete && entry.file().file_name == "deleted-in-base.parquet" + })); + } + + #[tokio::test] + async fn test_minor_merge_preserves_unmatched_delete_entries() { + let file_io = test_file_io(); + let table_path = "memory:/test_minor_merge_preserves_unmatched_delete_entries"; + setup_dirs(&file_io, table_path).await; + let schema = test_schema(); + let partition_fields = schema.partition_fields(); + let manifest_dir = format!("{table_path}/manifest"); + let merger = manifest_merger( + &file_io, + &manifest_dir, + &partition_fields, + schema.id(), + HashMap::from([("manifest.merge-min-count".to_string(), "2".to_string())]), + ); + + let deleted_file = test_data_file("deleted-in-base.parquet", 10); + let delete_meta = write_manifest( + &merger, + "manifest-delete-only-0", + &[ManifestEntry::new( + FileKind::Delete, + vec![], + 0, + 1, + deleted_file.clone(), + 2, + )], + ) + .await; + let add_meta = write_manifest( + &merger, + "manifest-add-only-0", + &[ManifestEntry::new( + FileKind::Add, + vec![], + 0, + 1, + test_data_file("unrelated.parquet", 5), + 2, + )], + ) + .await; + + let compacted = merger.merge(vec![delete_meta, add_meta]).await.unwrap(); + assert_eq!(compacted.len(), 1); + + let compacted_entries = Manifest::read( + &file_io, + &format!("{}/{}", merger.manifest_dir, compacted[0].file_name()), + ) + .await + .unwrap(); + assert_eq!(compacted_entries.len(), 2); + assert!(compacted_entries.iter().any(|entry| { + *entry.kind() == FileKind::Delete && entry.file().file_name == deleted_file.file_name + })); + } + + #[tokio::test] + async fn test_full_manifest_compaction_filters_delete_entries() { + let file_io = test_file_io(); + let table_path = "memory:/test_full_manifest_compaction_filters_delete_entries"; + setup_dirs(&file_io, table_path).await; + let schema = test_schema(); + let partition_fields = schema.partition_fields(); + let manifest_dir = format!("{table_path}/manifest"); + let merger = manifest_merger( + &file_io, + &manifest_dir, + &partition_fields, + schema.id(), + HashMap::from([ + ( + "manifest.full-compaction-threshold-size".to_string(), + "1B".to_string(), + ), + ("manifest.target-file-size".to_string(), "1B".to_string()), + ]), + ); + + let delete_meta = write_manifest( + &merger, + "manifest-delete-only-0", + &[ManifestEntry::new( + FileKind::Delete, + vec![], + 0, + 1, + test_data_file("deleted-in-base.parquet", 10), + 2, + )], + ) + .await; + let add_meta = write_manifest( + &merger, + "manifest-add-only-0", + &[ManifestEntry::new( + FileKind::Add, + vec![], + 0, + 1, + test_data_file("unrelated.parquet", 5), + 2, + )], + ) + .await; + + let compacted = merger + .merge(vec![delete_meta, add_meta.clone()]) + .await + .unwrap(); + assert_eq!(compacted.len(), 1); + assert_eq!(compacted[0].file_name(), add_meta.file_name()); + } + + #[tokio::test] + async fn test_full_manifest_compaction_skips_unaffected_base_manifests() { + let file_io = test_file_io(); + let table_path = "memory:/test_full_manifest_compaction_skips_unaffected_base_manifests"; + setup_dirs(&file_io, table_path).await; + let schema = test_partitioned_schema(); + let partition_fields = schema.partition_fields(); + let manifest_dir = format!("{table_path}/manifest"); + let merger = manifest_merger( + &file_io, + &manifest_dir, + &partition_fields, + schema.id(), + HashMap::from([ + ( + "manifest.full-compaction-threshold-size".to_string(), + "1B".to_string(), + ), + ("manifest.target-file-size".to_string(), "1B".to_string()), + ]), + ); + + let keep_meta = write_manifest( + &merger, + "manifest-keep-0", + &[ManifestEntry::new( + FileKind::Add, + partition_bytes("keep"), + 0, + 1, + test_data_file("keep.parquet", 10), + 2, + )], + ) + .await; + + let drop_file = test_data_file("drop.parquet", 10); + let drop_meta = write_manifest( + &merger, + "manifest-drop-0", + &[ManifestEntry::new( + FileKind::Add, + partition_bytes("drop"), + 0, + 1, + drop_file.clone(), + 2, + )], + ) + .await; + let delete_meta = write_manifest( + &merger, + "manifest-delete-0", + &[ManifestEntry::new( + FileKind::Delete, + partition_bytes("drop"), + 0, + 1, + drop_file, + 2, + )], + ) + .await; + + let compacted = merger + .merge(vec![keep_meta.clone(), drop_meta, delete_meta]) + .await + .unwrap(); + assert_eq!(compacted.len(), 1); + assert_eq!(compacted[0].file_name(), keep_meta.file_name()); + } +} diff --git a/crates/paimon/src/table/mod.rs b/crates/paimon/src/table/mod.rs index 3158ffca..5b919af1 100644 --- a/crates/paimon/src/table/mod.rs +++ b/crates/paimon/src/table/mod.rs @@ -39,6 +39,7 @@ pub(crate) mod global_index_scanner; mod kv_file_reader; mod kv_file_writer; mod lumina_index_build_builder; +mod manifest_file_merger; pub(crate) mod merge_tree_split_generator; mod partition_filter; mod postpone_file_writer; diff --git a/crates/paimon/src/table/table_commit.rs b/crates/paimon/src/table/table_commit.rs index 9d91094d..320e1153 100644 --- a/crates/paimon/src/table/table_commit.rs +++ b/crates/paimon/src/table/table_commit.rs @@ -24,11 +24,12 @@ use crate::io::FileIO; use crate::spec::stats::BinaryTableStats; use crate::spec::FileKind; use crate::spec::{ - datums_to_binary_row, extract_datum, merge_entries, BinaryRow, BinaryRowBuilder, CommitKind, - CoreOptions, DataType, Datum, IndexManifest, IndexManifestEntry, Manifest, ManifestEntry, - ManifestFileMeta, ManifestList, PartitionStatistics, Snapshot, + datums_to_binary_row, extract_datum, BinaryRow, BinaryRowBuilder, CommitKind, CoreOptions, + DataType, Datum, IndexManifest, IndexManifestEntry, Manifest, ManifestEntry, ManifestFileMeta, + ManifestList, PartitionStatistics, Snapshot, }; use crate::table::commit_message::CommitMessage; +use crate::table::manifest_file_merger::ManifestFileMerger; use crate::table::partition_filter::PartitionFilter; use crate::table::snapshot_commit::SnapshotCommit; use crate::table::{SnapshotManager, Table, TableScan}; @@ -560,302 +561,18 @@ impl TableCommit { manifest_dir: &str, manifest_files: Vec, ) -> Result> { - if manifest_files.len() <= 1 { - return Ok(manifest_files); - } - - if let Some(compacted) = self - .full_compact_manifest_files(file_io, manifest_dir, &manifest_files) - .await? - { - return Ok(compacted); - } - - self.minor_compact_manifest_files(file_io, manifest_dir, manifest_files) - .await - } - - fn should_full_compact_manifests(&self, manifest_files: &[ManifestFileMeta]) -> bool { - let delta_size: i64 = manifest_files - .iter() - .filter(|file| { - file.num_deleted_files() > 0 || file.file_size() < self.manifest_target_file_size - }) - .map(ManifestFileMeta::file_size) - .sum(); - delta_size >= self.manifest_full_compaction_threshold_size - } - - async fn full_compact_manifest_files( - &self, - file_io: &FileIO, - manifest_dir: &str, - manifest_files: &[ManifestFileMeta], - ) -> Result>> { - if !self.should_full_compact_manifests(manifest_files) { - return Ok(None); - } - - let delete_identifiers = self - .read_deleted_manifest_identifiers(file_io, manifest_dir, manifest_files) - .await?; - let delete_partitions: HashSet> = delete_identifiers - .iter() - .map(|identifier| identifier.partition.clone()) - .collect(); - - let mut result = Vec::new(); - let mut candidates = Vec::new(); - for manifest_file in manifest_files { - let must_change = self.manifest_must_change(manifest_file); - let affected_by_deletes = - self.manifest_may_contain_deleted_partitions(manifest_file, &delete_partitions); - if must_change || affected_by_deletes { - candidates.push(manifest_file.clone()); - } else { - result.push(manifest_file.clone()); - } - } - - if candidates.len() <= 1 { - return Ok(None); - } - - let mut merged_entries = Vec::new(); - for manifest_file in candidates { - let (require_change, entries) = self - .read_manifest_for_full_compaction( - file_io, - manifest_dir, - &manifest_file, - &delete_identifiers, - ) - .await?; - - if require_change { - merged_entries.extend(entries); - } else { - result.push(manifest_file); - } - } - - result.extend( - self.write_compacted_manifest_entries(file_io, manifest_dir, &merged_entries) - .await?, - ); - Ok(Some(result)) - } - - async fn read_manifest_for_full_compaction( - &self, - file_io: &FileIO, - manifest_dir: &str, - manifest_file: &ManifestFileMeta, - delete_identifiers: &HashSet, - ) -> Result<(bool, Vec)> { - let path = format!("{manifest_dir}/{}", manifest_file.file_name()); - let mut require_change = self.manifest_must_change(manifest_file); - let mut entries = Vec::new(); - - for entry in Manifest::read(file_io, &path).await? { - if *entry.kind() != FileKind::Add { - continue; - } - if delete_identifiers.contains(&entry.identifier()) { - require_change = true; - } else { - entries.push(entry); - } - } - - Ok((require_change, entries)) - } - - async fn read_deleted_manifest_identifiers( - &self, - file_io: &FileIO, - manifest_dir: &str, - manifest_files: &[ManifestFileMeta], - ) -> Result> { - let mut identifiers = HashSet::new(); - for manifest_file in manifest_files { - if manifest_file.num_deleted_files() == 0 { - continue; - } - let path = format!("{manifest_dir}/{}", manifest_file.file_name()); - for entry in Manifest::read(file_io, &path).await? { - if *entry.kind() == FileKind::Delete { - identifiers.insert(entry.into_identifier()); - } - } - } - Ok(identifiers) - } - - fn manifest_must_change(&self, manifest_file: &ManifestFileMeta) -> bool { - manifest_file.num_deleted_files() > 0 - || manifest_file.file_size() < self.manifest_target_file_size - } - - fn manifest_may_contain_deleted_partitions( - &self, - manifest_file: &ManifestFileMeta, - delete_partitions: &HashSet>, - ) -> bool { - if delete_partitions.is_empty() { - return false; - } - let partition_fields = self.table.schema().partition_fields(); - if partition_fields.is_empty() { - return true; - } - - delete_partitions.iter().any(|partition| { - self.partition_may_match_manifest_stats( - partition, - manifest_file.partition_stats(), - &partition_fields, - ) - }) - } - - fn partition_may_match_manifest_stats( - &self, - partition: &[u8], - stats: &BinaryTableStats, - partition_fields: &[crate::spec::DataField], - ) -> bool { - let Ok(partition_row) = BinaryRow::from_serialized_bytes(partition) else { - return true; - }; - let Ok(min_row) = BinaryRow::from_serialized_bytes(stats.min_values()) else { - return true; - }; - let Ok(max_row) = BinaryRow::from_serialized_bytes(stats.max_values()) else { - return true; - }; - if partition_row.arity() < partition_fields.len() as i32 - || min_row.arity() < partition_fields.len() as i32 - || max_row.arity() < partition_fields.len() as i32 - { - return true; - } - - for (idx, field) in partition_fields.iter().enumerate() { - let data_type = field.data_type(); - let Ok(partition_datum) = extract_datum(&partition_row, idx, data_type) else { - return true; - }; - let Ok(min_datum) = extract_datum(&min_row, idx, data_type) else { - return true; - }; - let Ok(max_datum) = extract_datum(&max_row, idx, data_type) else { - return true; - }; - - match partition_datum { - Some(datum) => { - let (Some(min), Some(max)) = (min_datum, max_datum) else { - return true; - }; - if datum < min || datum > max { - return false; - } - } - None => { - if matches!(stats.null_counts().get(idx), Some(Some(0))) { - return false; - } - } - } - } - - true - } - - async fn minor_compact_manifest_files( - &self, - file_io: &FileIO, - manifest_dir: &str, - manifest_files: Vec, - ) -> Result> { - let mut result = Vec::new(); - let mut candidates = Vec::new(); - let mut total_size = 0; - - for manifest_file in manifest_files { - total_size += manifest_file.file_size(); - candidates.push(manifest_file); - if total_size >= self.manifest_target_file_size { - let merged = self - .merge_manifest_candidates(file_io, manifest_dir, &candidates) - .await?; - result.extend(merged); - candidates.clear(); - total_size = 0; - } - } - - if candidates.len() >= self.manifest_merge_min_count { - let merged = self - .merge_manifest_candidates(file_io, manifest_dir, &candidates) - .await?; - result.extend(merged); - } else { - result.extend(candidates); - } - - Ok(result) - } - - async fn merge_manifest_candidates( - &self, - file_io: &FileIO, - manifest_dir: &str, - candidates: &[ManifestFileMeta], - ) -> Result> { - if candidates.len() == 1 { - return Ok(vec![candidates[0].clone()]); - } - - let merged_entries = self - .merge_manifest_candidate_entries(file_io, manifest_dir, candidates) - .await?; - self.write_compacted_manifest_entries(file_io, manifest_dir, &merged_entries) - .await - } - - async fn merge_manifest_candidate_entries( - &self, - file_io: &FileIO, - manifest_dir: &str, - manifest_files: &[ManifestFileMeta], - ) -> Result> { - let mut entries = Vec::new(); - for manifest_file in manifest_files { - let path = format!("{manifest_dir}/{}", manifest_file.file_name()); - entries.extend(Manifest::read(file_io, &path).await?); - } - merge_entries(entries) - } - - async fn write_compacted_manifest_entries( - &self, - file_io: &FileIO, - manifest_dir: &str, - entries: &[ManifestEntry], - ) -> Result> { - if entries.is_empty() { - return Ok(vec![]); - } - - let manifest_name = format!("manifest-{}-0", uuid::Uuid::new_v4()); - let manifest_path = format!("{manifest_dir}/{manifest_name}"); - let manifest_meta = self - .write_manifest_file(file_io, &manifest_path, &manifest_name, entries) - .await?; - Ok(vec![manifest_meta]) + ManifestFileMerger::new( + file_io, + manifest_dir, + &partition_fields, + self.table.schema().id(), + self.manifest_target_file_size, + self.manifest_full_compaction_threshold_size, + self.manifest_merge_min_count, + ) + .merge(manifest_files) + .await } /// Write an index manifest file from already-merged entries. @@ -2119,12 +1836,6 @@ mod tests { .await .unwrap(); - assert!(commit.should_full_compact_manifests(&[ - keep_meta.clone(), - drop_meta.clone(), - delete_meta.clone(), - ])); - let compacted = commit .compact_manifest_files_if_needed( &file_io, @@ -2148,7 +1859,11 @@ mod tests { let table_path = "memory:/test_manifest_compaction_preserves_unmatched_delete_entries"; setup_dirs(&file_io, table_path).await; - let commit = setup_commit(&file_io, table_path); + let commit = setup_commit_with_options( + &file_io, + table_path, + HashMap::from([("manifest.merge-min-count".to_string(), "2".to_string())]), + ); let manifest_dir = format!("{table_path}/manifest"); let deleted_file = test_data_file("deleted-in-base.parquet", 10); @@ -2183,7 +1898,7 @@ mod tests { .unwrap(); let compacted = commit - .merge_manifest_candidates(&file_io, &manifest_dir, &[delete_meta, add_meta]) + .compact_manifest_files_if_needed(&file_io, &manifest_dir, vec![delete_meta, add_meta]) .await .unwrap(); assert_eq!(compacted.len(), 1); From 51e6f6e0f2ef7d0d58375feb189f42d5d6ea2dbf Mon Sep 17 00:00:00 2001 From: Aitozi Date: Sat, 20 Jun 2026 12:45:47 +0800 Subject: [PATCH 4/5] test: move manifest compaction coverage --- .../paimon/src/table/manifest_file_merger.rs | 53 +++ crates/paimon/src/table/table_commit.rs | 337 ------------------ 2 files changed, 53 insertions(+), 337 deletions(-) diff --git a/crates/paimon/src/table/manifest_file_merger.rs b/crates/paimon/src/table/manifest_file_merger.rs index bebce3fc..143c36a3 100644 --- a/crates/paimon/src/table/manifest_file_merger.rs +++ b/crates/paimon/src/table/manifest_file_merger.rs @@ -663,6 +663,59 @@ mod tests { })); } + #[tokio::test] + async fn test_minor_merge_keeps_tail_below_merge_min_count() { + let file_io = test_file_io(); + let table_path = "memory:/test_minor_merge_keeps_tail_below_merge_min_count"; + setup_dirs(&file_io, table_path).await; + let schema = test_schema(); + let partition_fields = schema.partition_fields(); + let manifest_dir = format!("{table_path}/manifest"); + let merger = manifest_merger( + &file_io, + &manifest_dir, + &partition_fields, + schema.id(), + HashMap::from([ + ("manifest.merge-min-count".to_string(), "3".to_string()), + ("manifest.target-file-size".to_string(), "1GB".to_string()), + ]), + ); + + let first_meta = write_manifest( + &merger, + "manifest-first-0", + &[ManifestEntry::new( + FileKind::Add, + vec![], + 0, + 1, + test_data_file("first.parquet", 5), + 2, + )], + ) + .await; + let second_meta = write_manifest( + &merger, + "manifest-second-0", + &[ManifestEntry::new( + FileKind::Add, + vec![], + 0, + 1, + test_data_file("second.parquet", 5), + 2, + )], + ) + .await; + + let merged = merger + .merge(vec![first_meta.clone(), second_meta.clone()]) + .await + .unwrap(); + assert_eq!(merged, vec![first_meta, second_meta]); + } + #[tokio::test] async fn test_full_manifest_compaction_filters_delete_entries() { let file_io = test_file_io(); diff --git a/crates/paimon/src/table/table_commit.rs b/crates/paimon/src/table/table_commit.rs index 320e1153..c5956a42 100644 --- a/crates/paimon/src/table/table_commit.rs +++ b/crates/paimon/src/table/table_commit.rs @@ -1547,35 +1547,11 @@ mod tests { TableCommit::new(table, "test-user".to_string()) } - fn setup_commit_with_options( - file_io: &FileIO, - table_path: &str, - options: HashMap, - ) -> TableCommit { - let table = test_table_with_options(file_io, table_path, options); - TableCommit::new(table, "test-user".to_string()) - } - fn setup_partitioned_commit(file_io: &FileIO, table_path: &str) -> TableCommit { let table = test_partitioned_table(file_io, table_path); TableCommit::new(table, "test-user".to_string()) } - fn setup_partitioned_commit_with_options( - file_io: &FileIO, - table_path: &str, - options: HashMap, - ) -> TableCommit { - let table = Table::new( - file_io.clone(), - Identifier::new("default", "test_table"), - table_path.to_string(), - test_partitioned_schema().copy_with_options(options), - None, - ); - TableCommit::new(table, "test-user".to_string()) - } - fn partition_bytes(pt: &str) -> Vec { let mut builder = BinaryRowBuilder::new(1); if pt.len() <= 7 { @@ -1671,319 +1647,6 @@ mod tests { assert_eq!(snapshot.delta_record_count(), Some(200)); } - #[tokio::test] - async fn test_commit_keeps_manifest_tail_below_default_merge_min_count() { - let file_io = test_file_io(); - let table_path = "memory:/test_commit_keeps_manifest_tail_below_default_merge_min_count"; - setup_dirs(&file_io, table_path).await; - - let commit = setup_commit(&file_io, table_path); - for i in 0..6 { - commit - .commit(vec![CommitMessage::new( - vec![], - 0, - vec![test_data_file(&format!("data-{i}.parquet"), 10)], - )]) - .await - .unwrap(); - } - - let snap_manager = SnapshotManager::new(file_io.clone(), table_path.to_string()); - let snapshot = snap_manager.get_latest_snapshot().await.unwrap().unwrap(); - assert_eq!(snapshot.id(), 6); - assert_eq!(snapshot.total_record_count(), Some(60)); - - let manifest_dir = format!("{table_path}/manifest"); - let base_path = format!("{manifest_dir}/{}", snapshot.base_manifest_list()); - let base_metas = ManifestList::read(&file_io, &base_path).await.unwrap(); - assert_eq!( - base_metas.len(), - 5, - "default manifest.merge-min-count=30 should keep a small tail unchanged" - ); - - let delta_path = format!("{manifest_dir}/{}", snapshot.delta_manifest_list()); - let delta_metas = ManifestList::read(&file_io, &delta_path).await.unwrap(); - assert_eq!(delta_metas.len(), 1); - } - - #[tokio::test] - async fn test_commit_compacts_manifest_tail_when_merge_min_count_is_reached() { - let file_io = test_file_io(); - let table_path = - "memory:/test_commit_compacts_manifest_tail_when_merge_min_count_is_reached"; - setup_dirs(&file_io, table_path).await; - - let commit = setup_commit_with_options( - &file_io, - table_path, - HashMap::from([("manifest.merge-min-count".to_string(), "3".to_string())]), - ); - for i in 0..4 { - commit - .commit(vec![CommitMessage::new( - vec![], - 0, - vec![test_data_file(&format!("data-{i}.parquet"), 10)], - )]) - .await - .unwrap(); - } - - let snap_manager = SnapshotManager::new(file_io.clone(), table_path.to_string()); - let snapshot = snap_manager.get_latest_snapshot().await.unwrap().unwrap(); - assert_eq!(snapshot.id(), 4); - assert_eq!(snapshot.total_record_count(), Some(40)); - - let manifest_dir = format!("{table_path}/manifest"); - let base_path = format!("{manifest_dir}/{}", snapshot.base_manifest_list()); - let base_metas = ManifestList::read(&file_io, &base_path).await.unwrap(); - assert_eq!( - base_metas.len(), - 1, - "manifest.merge-min-count=3 should merge the tail of 3 manifests" - ); - - let compacted_entries = Manifest::read( - &file_io, - &format!("{manifest_dir}/{}", base_metas[0].file_name()), - ) - .await - .unwrap(); - assert_eq!(compacted_entries.len(), 3); - assert!( - !base_metas[0].file_name().ends_with("-compact"), - "compacted manifest file names should match Java-style manifest--" - ); - - let delta_path = format!("{manifest_dir}/{}", snapshot.delta_manifest_list()); - let delta_metas = ManifestList::read(&file_io, &delta_path).await.unwrap(); - assert_eq!(delta_metas.len(), 1); - } - - #[tokio::test] - async fn test_full_manifest_compaction_skips_unaffected_base_manifests() { - let file_io = test_file_io(); - let table_path = "memory:/test_full_manifest_compaction_skips_unaffected_base_manifests"; - setup_dirs(&file_io, table_path).await; - - let commit = setup_partitioned_commit_with_options( - &file_io, - table_path, - HashMap::from([ - ( - "manifest.full-compaction-threshold-size".to_string(), - "1B".to_string(), - ), - ("manifest.target-file-size".to_string(), "1B".to_string()), - ]), - ); - let manifest_dir = format!("{table_path}/manifest"); - - let keep_entry = ManifestEntry::new( - FileKind::Add, - partition_bytes("keep"), - 0, - 1, - test_data_file("keep.parquet", 10), - 2, - ); - let keep_meta = commit - .write_manifest_file( - &file_io, - &format!("{manifest_dir}/manifest-keep-0"), - "manifest-keep-0", - &[keep_entry], - ) - .await - .unwrap(); - - let drop_file = test_data_file("drop.parquet", 10); - let drop_entry = ManifestEntry::new( - FileKind::Add, - partition_bytes("drop"), - 0, - 1, - drop_file.clone(), - 2, - ); - let drop_meta = commit - .write_manifest_file( - &file_io, - &format!("{manifest_dir}/manifest-drop-0"), - "manifest-drop-0", - &[drop_entry], - ) - .await - .unwrap(); - - let delete_entry = ManifestEntry::new( - FileKind::Delete, - partition_bytes("drop"), - 0, - 1, - drop_file, - 2, - ); - let delete_meta = commit - .write_manifest_file( - &file_io, - &format!("{manifest_dir}/manifest-delete-0"), - "manifest-delete-0", - &[delete_entry], - ) - .await - .unwrap(); - - let compacted = commit - .compact_manifest_files_if_needed( - &file_io, - &manifest_dir, - vec![keep_meta.clone(), drop_meta, delete_meta], - ) - .await - .unwrap(); - - assert_eq!( - compacted.len(), - 1, - "full compaction should preserve unaffected base manifests and drop fully canceled candidates" - ); - assert_eq!(compacted[0].file_name(), keep_meta.file_name()); - } - - #[tokio::test] - async fn test_manifest_compaction_preserves_unmatched_delete_entries() { - let file_io = test_file_io(); - let table_path = "memory:/test_manifest_compaction_preserves_unmatched_delete_entries"; - setup_dirs(&file_io, table_path).await; - - let commit = setup_commit_with_options( - &file_io, - table_path, - HashMap::from([("manifest.merge-min-count".to_string(), "2".to_string())]), - ); - let manifest_dir = format!("{table_path}/manifest"); - - let deleted_file = test_data_file("deleted-in-base.parquet", 10); - let delete_entry = - ManifestEntry::new(FileKind::Delete, vec![], 0, 1, deleted_file.clone(), 2); - let delete_meta = commit - .write_manifest_file( - &file_io, - &format!("{manifest_dir}/manifest-delete-only-0"), - "manifest-delete-only-0", - &[delete_entry], - ) - .await - .unwrap(); - - let add_entry = ManifestEntry::new( - FileKind::Add, - vec![], - 0, - 1, - test_data_file("unrelated.parquet", 5), - 2, - ); - let add_meta = commit - .write_manifest_file( - &file_io, - &format!("{manifest_dir}/manifest-add-only-0"), - "manifest-add-only-0", - &[add_entry], - ) - .await - .unwrap(); - - let compacted = commit - .compact_manifest_files_if_needed(&file_io, &manifest_dir, vec![delete_meta, add_meta]) - .await - .unwrap(); - assert_eq!(compacted.len(), 1); - - let compacted_entries = Manifest::read( - &file_io, - &format!("{manifest_dir}/{}", compacted[0].file_name()), - ) - .await - .unwrap(); - assert_eq!(compacted_entries.len(), 2); - assert!( - compacted_entries.iter().any(|entry| { - *entry.kind() == FileKind::Delete && entry.file().file_name == deleted_file.file_name - }), - "Java FileEntry.mergeEntries keeps unmatched DELETE entries because the ADD can live in an older manifest" - ); - } - - #[tokio::test] - async fn test_full_manifest_compaction_filters_delete_entries() { - let file_io = test_file_io(); - let table_path = "memory:/test_full_manifest_compaction_filters_delete_entries"; - setup_dirs(&file_io, table_path).await; - - let commit = setup_commit_with_options( - &file_io, - table_path, - HashMap::from([ - ( - "manifest.full-compaction-threshold-size".to_string(), - "1B".to_string(), - ), - ("manifest.target-file-size".to_string(), "1B".to_string()), - ]), - ); - let manifest_dir = format!("{table_path}/manifest"); - - let deleted_file = test_data_file("deleted-in-base.parquet", 10); - let delete_entry = - ManifestEntry::new(FileKind::Delete, vec![], 0, 1, deleted_file.clone(), 2); - let delete_meta = commit - .write_manifest_file( - &file_io, - &format!("{manifest_dir}/manifest-delete-only-0"), - "manifest-delete-only-0", - &[delete_entry], - ) - .await - .unwrap(); - - let add_entry = ManifestEntry::new( - FileKind::Add, - vec![], - 0, - 1, - test_data_file("unrelated.parquet", 5), - 2, - ); - let add_meta = commit - .write_manifest_file( - &file_io, - &format!("{manifest_dir}/manifest-add-only-0"), - "manifest-add-only-0", - &[add_entry], - ) - .await - .unwrap(); - - let compacted = commit - .compact_manifest_files_if_needed( - &file_io, - &manifest_dir, - vec![delete_meta, add_meta.clone()], - ) - .await - .unwrap(); - assert_eq!( - compacted.len(), - 1, - "full compaction should remove delete-only manifests instead of writing DELETE entries back" - ); - assert_eq!(compacted[0].file_name(), add_meta.file_name()); - } - #[tokio::test] async fn test_empty_commit_is_noop() { let file_io = test_file_io(); From 2782e68c2484f8330f5c831fbd6a43e985feb431 Mon Sep 17 00:00:00 2001 From: Aitozi Date: Sat, 20 Jun 2026 13:23:09 +0800 Subject: [PATCH 5/5] fix: avoid manifest merger check failure --- .../paimon/src/table/manifest_file_merger.rs | 96 ++++++++++++++++--- 1 file changed, 84 insertions(+), 12 deletions(-) diff --git a/crates/paimon/src/table/manifest_file_merger.rs b/crates/paimon/src/table/manifest_file_merger.rs index 143c36a3..d2c0d5ab 100644 --- a/crates/paimon/src/table/manifest_file_merger.rs +++ b/crates/paimon/src/table/manifest_file_merger.rs @@ -22,6 +22,7 @@ use crate::spec::{ Manifest, ManifestEntry, ManifestFileMeta, }; use crate::Result; +use std::collections::hash_map::Entry; use std::collections::{HashMap, HashSet}; /// Manifest file merger with Java `ManifestFileMerger`-style full and minor compaction. @@ -110,19 +111,19 @@ impl<'a> ManifestFileMerger<'a> { return Ok(None); } - let mut merged_entries = Vec::new(); + let mut new_files = Vec::new(); for manifest_file in to_be_merged { let read_result = self .read_for_full_compaction(&manifest_file, &delete_identifiers) .await?; if read_result.require_change { - merged_entries.extend(read_result.entries); + new_files.extend(self.write_compacted_entries(&read_result.entries).await?); } else { result.push(read_result.file); } } - result.extend(self.write_compacted_entries(&merged_entries).await?); + result.extend(new_files); Ok(Some(result)) } @@ -370,22 +371,22 @@ fn merge_entry( ) -> Result<()> { let identifier = entry.identifier(); match *entry.kind() { - FileKind::Add => { - if merged_entries.contains_key(&identifier) { + FileKind::Add => match merged_entries.entry(identifier) { + Entry::Vacant(entry_slot) => { + entry_slot.insert(entry); + } + Entry::Occupied(entry_slot) => { return Err(crate::Error::DataInvalid { message: format!( "Trying to add file {:?} which is already in the manifest entry map", - identifier + entry_slot.key() ), source: None, - }); + }) } - merged_entries.insert(identifier, entry); - } + }, FileKind::Delete => { - if merged_entries.contains_key(&identifier) { - merged_entries.remove(&identifier); - } else { + if merged_entries.remove(&identifier).is_none() { merged_entries.insert(identifier, entry); } } @@ -773,6 +774,77 @@ mod tests { assert_eq!(compacted[0].file_name(), add_meta.file_name()); } + #[tokio::test] + async fn test_full_manifest_compaction_writes_each_changed_manifest_without_accumulating() { + let file_io = test_file_io(); + let table_path = + "memory:/test_full_manifest_compaction_writes_each_changed_manifest_without_accumulating"; + setup_dirs(&file_io, table_path).await; + let schema = test_schema(); + let partition_fields = schema.partition_fields(); + let manifest_dir = format!("{table_path}/manifest"); + let merger = manifest_merger( + &file_io, + &manifest_dir, + &partition_fields, + schema.id(), + HashMap::from([ + ( + "manifest.full-compaction-threshold-size".to_string(), + "1B".to_string(), + ), + ("manifest.target-file-size".to_string(), "1GB".to_string()), + ]), + ); + + let first_meta = write_manifest( + &merger, + "manifest-first-0", + &[ManifestEntry::new( + FileKind::Add, + vec![], + 0, + 1, + test_data_file("first.parquet", 5), + 2, + )], + ) + .await; + let second_meta = write_manifest( + &merger, + "manifest-second-0", + &[ManifestEntry::new( + FileKind::Add, + vec![], + 0, + 1, + test_data_file("second.parquet", 5), + 2, + )], + ) + .await; + + let compacted = merger.merge(vec![first_meta, second_meta]).await.unwrap(); + + assert_eq!(compacted.len(), 2); + let mut compacted_file_names = Vec::new(); + for file in &compacted { + let entries = Manifest::read( + &file_io, + &format!("{}/{}", merger.manifest_dir, file.file_name()), + ) + .await + .unwrap(); + assert_eq!(entries.len(), 1); + compacted_file_names.push(entries[0].file().file_name.clone()); + } + compacted_file_names.sort(); + assert_eq!( + compacted_file_names, + vec!["first.parquet".to_string(), "second.parquet".to_string()] + ); + } + #[tokio::test] async fn test_full_manifest_compaction_skips_unaffected_base_manifests() { let file_io = test_file_io();