diff --git a/crates/paimon/src/spec/core_options.rs b/crates/paimon/src/spec/core_options.rs index a25fa5ca..e26319cc 100644 --- a/crates/paimon/src/spec/core_options.rs +++ b/crates/paimon/src/spec/core_options.rs @@ -45,6 +45,10 @@ const CHANGELOG_FILE_PREFIX_OPTION: &str = "changelog-file.prefix"; const CHANGELOG_FILE_FORMAT_OPTION: &str = "changelog-file.format"; const CHANGELOG_FILE_COMPRESSION_OPTION: &str = "changelog-file.compression"; const CHANGELOG_FILE_STATS_MODE_OPTION: &str = "changelog-file.stats-mode"; +const MANIFEST_TARGET_FILE_SIZE_OPTION: &str = "manifest.target-file-size"; +const MANIFEST_FULL_COMPACTION_THRESHOLD_SIZE_OPTION: &str = + "manifest.full-compaction-threshold-size"; +const MANIFEST_MERGE_MIN_COUNT_OPTION: &str = "manifest.merge-min-count"; const ROW_TRACKING_ENABLED_OPTION: &str = "row-tracking.enabled"; const WRITE_PARQUET_BUFFER_SIZE_OPTION: &str = "write.parquet-buffer-size"; pub(crate) const SEQUENCE_FIELD_OPTION: &str = "sequence.field"; @@ -65,6 +69,9 @@ const DEFAULT_SOURCE_SPLIT_OPEN_FILE_COST: i64 = 4 * 1024 * 1024; const DEFAULT_PARTITION_DEFAULT_NAME: &str = "__DEFAULT_PARTITION__"; const DEFAULT_CHANGELOG_FILE_PREFIX: &str = "changelog-"; const DEFAULT_TARGET_FILE_SIZE: i64 = 256 * 1024 * 1024; +const DEFAULT_MANIFEST_TARGET_FILE_SIZE: i64 = 8 * 1024 * 1024; +const DEFAULT_MANIFEST_FULL_COMPACTION_THRESHOLD_SIZE: i64 = 16 * 1024 * 1024; +const DEFAULT_MANIFEST_MERGE_MIN_COUNT: usize = 30; const DEFAULT_WRITE_PARQUET_BUFFER_SIZE: i64 = 256 * 1024 * 1024; const DYNAMIC_BUCKET_TARGET_ROW_NUM_OPTION: &str = "dynamic-bucket.target-row-num"; const DEFAULT_DYNAMIC_BUCKET_TARGET_ROW_NUM: i64 = 200_000; @@ -414,6 +421,37 @@ impl<'a> CoreOptions<'a> { .unwrap_or_else(|| self.target_file_size()) } + /// Suggested file size of a manifest file. + /// + /// Corresponds to Java `CoreOptions.MANIFEST_TARGET_FILE_SIZE`. + pub fn manifest_target_file_size(&self) -> i64 { + self.options + .get(MANIFEST_TARGET_FILE_SIZE_OPTION) + .and_then(|v| parse_memory_size(v)) + .unwrap_or(DEFAULT_MANIFEST_TARGET_FILE_SIZE) + } + + /// Size threshold for triggering full compaction of manifests. + /// + /// Corresponds to Java `CoreOptions.MANIFEST_FULL_COMPACTION_FILE_SIZE`. + pub fn manifest_full_compaction_threshold_size(&self) -> i64 { + self.options + .get(MANIFEST_FULL_COMPACTION_THRESHOLD_SIZE_OPTION) + .and_then(|v| parse_memory_size(v)) + .unwrap_or(DEFAULT_MANIFEST_FULL_COMPACTION_THRESHOLD_SIZE) + } + + /// Minimum number of trailing manifest files to merge. + /// + /// Corresponds to Java `CoreOptions.MANIFEST_MERGE_MIN_COUNT`. + pub fn manifest_merge_min_count(&self) -> usize { + self.options + .get(MANIFEST_MERGE_MIN_COUNT_OPTION) + .and_then(|v| v.parse::().ok()) + .filter(|count| *count > 0) + .unwrap_or(DEFAULT_MANIFEST_MERGE_MIN_COUNT) + } + /// File format for data files (e.g. "parquet", "orc", "avro", "vortex"). /// Default is "parquet". pub fn file_format(&self) -> &str { @@ -619,6 +657,34 @@ mod tests { assert_eq!(parse_memory_size("abc"), None); } + #[test] + fn test_manifest_compaction_options_match_java_defaults_and_overrides() { + let options = HashMap::new(); + let core = CoreOptions::new(&options); + assert_eq!(core.manifest_target_file_size(), 8 * 1024 * 1024); + assert_eq!( + core.manifest_full_compaction_threshold_size(), + 16 * 1024 * 1024 + ); + assert_eq!(core.manifest_merge_min_count(), 30); + + let options = HashMap::from([ + ( + MANIFEST_TARGET_FILE_SIZE_OPTION.to_string(), + "500B".to_string(), + ), + ( + MANIFEST_FULL_COMPACTION_THRESHOLD_SIZE_OPTION.to_string(), + "200B".to_string(), + ), + (MANIFEST_MERGE_MIN_COUNT_OPTION.to_string(), "3".to_string()), + ]); + let core = CoreOptions::new(&options); + assert_eq!(core.manifest_target_file_size(), 500); + assert_eq!(core.manifest_full_compaction_threshold_size(), 200); + assert_eq!(core.manifest_merge_min_count(), 3); + } + #[test] fn test_partition_options_defaults() { let options = HashMap::new(); diff --git a/crates/paimon/src/spec/manifest.rs b/crates/paimon/src/spec/manifest.rs index bebd09e7..09a3cb1b 100644 --- a/crates/paimon/src/spec/manifest.rs +++ b/crates/paimon/src/spec/manifest.rs @@ -68,8 +68,9 @@ impl Manifest { } } -/// Merge ADD/DELETE entries by file identifier, returning only the active ADD set. -/// Mirrors Java [FileEntry.mergeEntries](https://github.com/apache/paimon/blob/release-1.4/paimon-core/src/main/java/org/apache/paimon/manifest/FileEntry.java). +/// Merge ADD/DELETE entries by file identifier for scan-style visibility, +/// returning only the active ADD set. Manifest compaction must preserve +/// unmatched DELETE entries and uses a table-commit-specific merge path. /// Return order is unspecified. pub(crate) fn merge_active_entries(entries: Vec) -> Vec { use std::collections::HashMap; @@ -94,9 +95,39 @@ pub(crate) fn merge_active_entries(entries: Vec) -> Vec ManifestEntry { + let stats = BinaryTableStats::empty(); + let file = DataFileMeta { + file_name: file_name.to_string(), + file_size: 100, + row_count: 10, + min_key: vec![], + max_key: vec![], + key_stats: stats.clone(), + value_stats: stats, + min_sequence_number: 0, + max_sequence_number: 0, + schema_id: 0, + level, + extra_files: vec![], + creation_time: None, + delete_row_count: None, + embedded_index: None, + file_source: None, + value_stats_cols: None, + external_path: None, + first_row_id: None, + write_cols: None, + }; + ManifestEntry::new(kind, vec![], 0, 1, file, 2) + } + #[tokio::test] async fn test_read_manifest_from_file() { let workdir = current_dir().unwrap(); @@ -120,37 +151,6 @@ mod tests { #[test] fn test_merge_active_entries_cancels_add_then_delete() { - use crate::spec::data_file::DataFileMeta; - use crate::spec::stats::BinaryTableStats; - use crate::spec::ManifestEntry; - - fn entry(kind: FileKind, file_name: &str, level: i32) -> ManifestEntry { - let stats = BinaryTableStats::empty(); - let file = DataFileMeta { - file_name: file_name.to_string(), - file_size: 100, - row_count: 10, - min_key: vec![], - max_key: vec![], - key_stats: stats.clone(), - value_stats: stats, - min_sequence_number: 0, - max_sequence_number: 0, - schema_id: 0, - level, - extra_files: vec![], - creation_time: None, - delete_row_count: None, - embedded_index: None, - file_source: None, - value_stats_cols: None, - external_path: None, - first_row_id: None, - write_cols: None, - }; - ManifestEntry::new(kind, vec![], 0, 1, file, 2) - } - let cancelled = merge_active_entries(vec![ entry(FileKind::Add, "f.parquet", 0), entry(FileKind::Delete, "f.parquet", 0), diff --git a/crates/paimon/src/table/manifest_file_merger.rs b/crates/paimon/src/table/manifest_file_merger.rs new file mode 100644 index 00000000..d2c0d5ab --- /dev/null +++ b/crates/paimon/src/table/manifest_file_merger.rs @@ -0,0 +1,919 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::io::FileIO; +use crate::spec::stats::BinaryTableStats; +use crate::spec::{ + extract_datum, BinaryRow, BinaryRowBuilder, DataField, DataType, Datum, FileKind, Identifier, + Manifest, ManifestEntry, ManifestFileMeta, +}; +use crate::Result; +use std::collections::hash_map::Entry; +use std::collections::{HashMap, HashSet}; + +/// Manifest file merger with Java `ManifestFileMerger`-style full and minor compaction. +pub(crate) struct ManifestFileMerger<'a> { + file_io: &'a FileIO, + manifest_dir: &'a str, + partition_fields: &'a [DataField], + schema_id: i64, + target_file_size: i64, + full_compaction_threshold_size: i64, + merge_min_count: usize, +} + +impl<'a> ManifestFileMerger<'a> { + pub(crate) fn new( + file_io: &'a FileIO, + manifest_dir: &'a str, + partition_fields: &'a [DataField], + schema_id: i64, + target_file_size: i64, + full_compaction_threshold_size: i64, + merge_min_count: usize, + ) -> Self { + Self { + file_io, + manifest_dir, + partition_fields, + schema_id, + target_file_size, + full_compaction_threshold_size, + merge_min_count, + } + } + + pub(crate) async fn merge( + &self, + manifest_files: Vec, + ) -> Result> { + if manifest_files.len() <= 1 { + return Ok(manifest_files); + } + + if let Some(compacted) = self.try_full_compaction(&manifest_files).await? { + return Ok(compacted); + } + + self.try_minor_compaction(manifest_files).await + } + + fn should_full_compact(&self, manifest_files: &[ManifestFileMeta]) -> bool { + let delta_size: i64 = manifest_files + .iter() + .filter(|file| self.must_change(file)) + .map(ManifestFileMeta::file_size) + .sum(); + delta_size >= self.full_compaction_threshold_size + } + + async fn try_full_compaction( + &self, + manifest_files: &[ManifestFileMeta], + ) -> Result>> { + if !self.should_full_compact(manifest_files) { + return Ok(None); + } + + let delete_identifiers = self.read_deleted_identifiers(manifest_files).await?; + let delete_partitions: HashSet> = delete_identifiers + .iter() + .map(|identifier| identifier.partition.clone()) + .collect(); + + let mut result = Vec::new(); + let mut to_be_merged = Vec::new(); + for manifest_file in manifest_files { + if self.must_change(manifest_file) + || self.may_contain_deleted_partitions(manifest_file, &delete_partitions) + { + to_be_merged.push(manifest_file.clone()); + } else { + result.push(manifest_file.clone()); + } + } + + if to_be_merged.len() <= 1 { + return Ok(None); + } + + let mut new_files = Vec::new(); + for manifest_file in to_be_merged { + let read_result = self + .read_for_full_compaction(&manifest_file, &delete_identifiers) + .await?; + if read_result.require_change { + new_files.extend(self.write_compacted_entries(&read_result.entries).await?); + } else { + result.push(read_result.file); + } + } + + result.extend(new_files); + Ok(Some(result)) + } + + async fn read_deleted_identifiers( + &self, + manifest_files: &[ManifestFileMeta], + ) -> Result> { + let mut identifiers = HashSet::new(); + for manifest_file in manifest_files { + if manifest_file.num_deleted_files() == 0 { + continue; + } + let path = self.manifest_path(manifest_file.file_name()); + for entry in Manifest::read(self.file_io, &path).await? { + if *entry.kind() == FileKind::Delete { + identifiers.insert(entry.into_identifier()); + } + } + } + Ok(identifiers) + } + + async fn read_for_full_compaction( + &self, + manifest_file: &ManifestFileMeta, + delete_identifiers: &HashSet, + ) -> Result { + let path = self.manifest_path(manifest_file.file_name()); + let mut require_change = self.must_change(manifest_file); + let mut entries = Vec::new(); + + for entry in Manifest::read(self.file_io, &path).await? { + if *entry.kind() != FileKind::Add { + continue; + } + if delete_identifiers.contains(&entry.identifier()) { + require_change = true; + } else { + entries.push(entry); + } + } + + Ok(FullCompactionReadResult { + file: manifest_file.clone(), + require_change, + entries, + }) + } + + fn must_change(&self, manifest_file: &ManifestFileMeta) -> bool { + manifest_file.num_deleted_files() > 0 || manifest_file.file_size() < self.target_file_size + } + + fn may_contain_deleted_partitions( + &self, + manifest_file: &ManifestFileMeta, + delete_partitions: &HashSet>, + ) -> bool { + if delete_partitions.is_empty() { + return false; + } + + if self.partition_fields.is_empty() { + return true; + } + + delete_partitions.iter().any(|partition| { + self.partition_may_match_manifest_stats(partition, manifest_file.partition_stats()) + }) + } + + fn partition_may_match_manifest_stats( + &self, + partition: &[u8], + stats: &BinaryTableStats, + ) -> bool { + let Ok(partition_row) = BinaryRow::from_serialized_bytes(partition) else { + return true; + }; + let Ok(min_row) = BinaryRow::from_serialized_bytes(stats.min_values()) else { + return true; + }; + let Ok(max_row) = BinaryRow::from_serialized_bytes(stats.max_values()) else { + return true; + }; + if partition_row.arity() < self.partition_fields.len() as i32 + || min_row.arity() < self.partition_fields.len() as i32 + || max_row.arity() < self.partition_fields.len() as i32 + { + return true; + } + + for (idx, field) in self.partition_fields.iter().enumerate() { + let data_type = field.data_type(); + let Ok(partition_datum) = extract_datum(&partition_row, idx, data_type) else { + return true; + }; + let Ok(min_datum) = extract_datum(&min_row, idx, data_type) else { + return true; + }; + let Ok(max_datum) = extract_datum(&max_row, idx, data_type) else { + return true; + }; + + match partition_datum { + Some(datum) => { + let (Some(min), Some(max)) = (min_datum, max_datum) else { + return true; + }; + if datum < min || datum > max { + return false; + } + } + None => { + if matches!(stats.null_counts().get(idx), Some(Some(0))) { + return false; + } + } + } + } + + true + } + + async fn try_minor_compaction( + &self, + manifest_files: Vec, + ) -> Result> { + let mut result = Vec::new(); + let mut candidates = Vec::new(); + let mut total_size = 0; + + for manifest_file in manifest_files { + total_size += manifest_file.file_size(); + candidates.push(manifest_file); + if total_size >= self.target_file_size { + let merged = self.merge_candidates(&candidates).await?; + result.extend(merged); + candidates.clear(); + total_size = 0; + } + } + + if candidates.len() >= self.merge_min_count { + let merged = self.merge_candidates(&candidates).await?; + result.extend(merged); + } else { + result.extend(candidates); + } + + Ok(result) + } + + async fn merge_candidates( + &self, + candidates: &[ManifestFileMeta], + ) -> Result> { + if candidates.len() == 1 { + return Ok(vec![candidates[0].clone()]); + } + + let mut entries = Vec::new(); + for manifest_file in candidates { + let path = self.manifest_path(manifest_file.file_name()); + entries.extend(Manifest::read(self.file_io, &path).await?); + } + + let merged_entries = merge_entries(entries)?; + self.write_compacted_entries(&merged_entries).await + } + + async fn write_compacted_entries( + &self, + entries: &[ManifestEntry], + ) -> Result> { + if entries.is_empty() { + return Ok(vec![]); + } + + let manifest_name = format!("manifest-{}-0", uuid::Uuid::new_v4()); + let manifest_path = self.manifest_path(&manifest_name); + let manifest_meta = self + .write_manifest_file(&manifest_path, &manifest_name, entries) + .await?; + Ok(vec![manifest_meta]) + } + + async fn write_manifest_file( + &self, + path: &str, + file_name: &str, + entries: &[ManifestEntry], + ) -> Result { + Manifest::write(self.file_io, path, entries).await?; + + let mut added_file_count: i64 = 0; + let mut deleted_file_count: i64 = 0; + let mut min_bucket: Option = None; + let mut max_bucket: Option = None; + let mut min_level: Option = None; + let mut max_level: Option = None; + for entry in entries { + match entry.kind() { + FileKind::Add => added_file_count += 1, + FileKind::Delete => deleted_file_count += 1, + } + let b = entry.bucket(); + min_bucket = Some(min_bucket.map_or(b, |cur| cur.min(b))); + max_bucket = Some(max_bucket.map_or(b, |cur| cur.max(b))); + let l = entry.file().level; + min_level = Some(min_level.map_or(l, |cur| cur.min(l))); + max_level = Some(max_level.map_or(l, |cur| cur.max(l))); + } + + let status = self.file_io.get_status(path).await?; + let partition_stats = compute_partition_stats(entries, self.partition_fields)?; + + Ok(ManifestFileMeta::new( + file_name.to_string(), + status.size as i64, + added_file_count, + deleted_file_count, + partition_stats, + self.schema_id, + ) + .with_bucket_level_stats(min_bucket, max_bucket, min_level, max_level)) + } + + fn manifest_path(&self, file_name: &str) -> String { + format!("{}/{}", self.manifest_dir, file_name) + } +} + +fn merge_entries(entries: impl IntoIterator) -> Result> { + let mut merged_entries = HashMap::new(); + for entry in entries { + merge_entry(&mut merged_entries, entry)?; + } + Ok(merged_entries.into_values().collect()) +} + +fn merge_entry( + merged_entries: &mut HashMap, + entry: ManifestEntry, +) -> Result<()> { + let identifier = entry.identifier(); + match *entry.kind() { + FileKind::Add => match merged_entries.entry(identifier) { + Entry::Vacant(entry_slot) => { + entry_slot.insert(entry); + } + Entry::Occupied(entry_slot) => { + return Err(crate::Error::DataInvalid { + message: format!( + "Trying to add file {:?} which is already in the manifest entry map", + entry_slot.key() + ), + source: None, + }) + } + }, + FileKind::Delete => { + if merged_entries.remove(&identifier).is_none() { + merged_entries.insert(identifier, entry); + } + } + } + Ok(()) +} + +fn compute_partition_stats( + entries: &[ManifestEntry], + partition_fields: &[DataField], +) -> Result { + let num_fields = partition_fields.len(); + + if num_fields == 0 || entries.is_empty() { + return Ok(BinaryTableStats::empty()); + } + + let data_types: Vec<_> = partition_fields + .iter() + .map(|f| f.data_type().clone()) + .collect(); + let mut mins: Vec> = vec![None; num_fields]; + let mut maxs: Vec> = vec![None; num_fields]; + let mut null_counts: Vec = vec![0; num_fields]; + + for entry in entries { + let partition_bytes = entry.partition(); + if partition_bytes.is_empty() { + continue; + } + let row = BinaryRow::from_serialized_bytes(partition_bytes)?; + for i in 0..num_fields { + match extract_datum(&row, i, &data_types[i])? { + Some(datum) => { + mins[i] = Some(match mins[i].take() { + Some(cur) if cur <= datum => cur, + Some(_) => datum.clone(), + None => datum.clone(), + }); + maxs[i] = Some(match maxs[i].take() { + Some(cur) if cur >= datum => cur, + Some(_) => datum, + None => datum, + }); + } + None => { + null_counts[i] += 1; + } + } + } + } + + let min_bytes = build_partition_stats_row(&mins, &data_types); + let max_bytes = build_partition_stats_row(&maxs, &data_types); + let null_counts = null_counts.into_iter().map(Some).collect(); + + Ok(BinaryTableStats::new(min_bytes, max_bytes, null_counts)) +} + +fn build_partition_stats_row(datums: &[Option], data_types: &[DataType]) -> Vec { + let mut builder = BinaryRowBuilder::new(datums.len() as i32); + for (pos, (datum_opt, data_type)) in datums.iter().zip(data_types.iter()).enumerate() { + match datum_opt { + Some(d) => builder.write_datum(pos, d, data_type), + None => builder.set_null_at(pos), + } + } + builder.build_serialized() +} + +struct FullCompactionReadResult { + file: ManifestFileMeta, + require_change: bool, + entries: Vec, +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::io::FileIOBuilder; + use crate::spec::stats::BinaryTableStats; + use crate::spec::{DataFileMeta, IntType, Schema, TableSchema, VarCharType}; + use chrono::{DateTime, Utc}; + use std::collections::HashMap; + + fn test_file_io() -> FileIO { + FileIOBuilder::new("memory").build().unwrap() + } + + fn test_schema() -> TableSchema { + let schema = Schema::builder() + .column("id", crate::spec::DataType::Int(IntType::new())) + .build() + .unwrap(); + TableSchema::new(0, &schema) + } + + fn test_partitioned_schema() -> TableSchema { + let schema = Schema::builder() + .column( + "pt", + crate::spec::DataType::VarChar(VarCharType::string_type()), + ) + .column("id", crate::spec::DataType::Int(IntType::new())) + .partition_keys(["pt"]) + .build() + .unwrap(); + TableSchema::new(0, &schema) + } + + fn test_data_file(name: &str, row_count: i64) -> DataFileMeta { + DataFileMeta { + file_name: name.to_string(), + file_size: 1024, + row_count, + min_key: vec![], + max_key: vec![], + key_stats: BinaryTableStats::empty(), + value_stats: BinaryTableStats::empty(), + min_sequence_number: 0, + max_sequence_number: 0, + schema_id: 0, + level: 0, + extra_files: vec![], + creation_time: Some( + "2024-09-06T07:45:55.039+00:00" + .parse::>() + .unwrap(), + ), + delete_row_count: Some(0), + embedded_index: None, + first_row_id: None, + write_cols: None, + external_path: None, + file_source: None, + value_stats_cols: None, + } + } + + async fn setup_dirs(file_io: &FileIO, table_path: &str) { + file_io + .mkdirs(&format!("{table_path}/snapshot/")) + .await + .unwrap(); + file_io + .mkdirs(&format!("{table_path}/manifest/")) + .await + .unwrap(); + } + + fn partition_bytes(pt: &str) -> Vec { + let mut builder = BinaryRowBuilder::new(1); + if pt.len() <= 7 { + builder.write_string_inline(0, pt); + } else { + builder.write_string(0, pt); + } + builder.build_serialized() + } + + fn manifest_merger<'a>( + file_io: &'a FileIO, + manifest_dir: &'a str, + partition_fields: &'a [DataField], + schema_id: i64, + options: HashMap, + ) -> ManifestFileMerger<'a> { + let core_options = crate::spec::CoreOptions::new(&options); + ManifestFileMerger::new( + file_io, + manifest_dir, + partition_fields, + schema_id, + core_options.manifest_target_file_size(), + core_options.manifest_full_compaction_threshold_size(), + core_options.manifest_merge_min_count(), + ) + } + + async fn write_manifest( + merger: &ManifestFileMerger<'_>, + name: &str, + entries: &[ManifestEntry], + ) -> ManifestFileMeta { + merger + .write_manifest_file(&format!("{}/{}", merger.manifest_dir, name), name, entries) + .await + .unwrap() + } + + #[test] + fn test_merge_entries_preserves_unmatched_delete() { + let merged = merge_entries(vec![ + ManifestEntry::new( + FileKind::Delete, + vec![], + 0, + 1, + test_data_file("deleted-in-base.parquet", 10), + 2, + ), + ManifestEntry::new( + FileKind::Add, + vec![], + 0, + 1, + test_data_file("unrelated.parquet", 5), + 2, + ), + ]) + .unwrap(); + + assert_eq!(merged.len(), 2); + assert!(merged.iter().any(|entry| { + *entry.kind() == FileKind::Delete && entry.file().file_name == "deleted-in-base.parquet" + })); + } + + #[tokio::test] + async fn test_minor_merge_preserves_unmatched_delete_entries() { + let file_io = test_file_io(); + let table_path = "memory:/test_minor_merge_preserves_unmatched_delete_entries"; + setup_dirs(&file_io, table_path).await; + let schema = test_schema(); + let partition_fields = schema.partition_fields(); + let manifest_dir = format!("{table_path}/manifest"); + let merger = manifest_merger( + &file_io, + &manifest_dir, + &partition_fields, + schema.id(), + HashMap::from([("manifest.merge-min-count".to_string(), "2".to_string())]), + ); + + let deleted_file = test_data_file("deleted-in-base.parquet", 10); + let delete_meta = write_manifest( + &merger, + "manifest-delete-only-0", + &[ManifestEntry::new( + FileKind::Delete, + vec![], + 0, + 1, + deleted_file.clone(), + 2, + )], + ) + .await; + let add_meta = write_manifest( + &merger, + "manifest-add-only-0", + &[ManifestEntry::new( + FileKind::Add, + vec![], + 0, + 1, + test_data_file("unrelated.parquet", 5), + 2, + )], + ) + .await; + + let compacted = merger.merge(vec![delete_meta, add_meta]).await.unwrap(); + assert_eq!(compacted.len(), 1); + + let compacted_entries = Manifest::read( + &file_io, + &format!("{}/{}", merger.manifest_dir, compacted[0].file_name()), + ) + .await + .unwrap(); + assert_eq!(compacted_entries.len(), 2); + assert!(compacted_entries.iter().any(|entry| { + *entry.kind() == FileKind::Delete && entry.file().file_name == deleted_file.file_name + })); + } + + #[tokio::test] + async fn test_minor_merge_keeps_tail_below_merge_min_count() { + let file_io = test_file_io(); + let table_path = "memory:/test_minor_merge_keeps_tail_below_merge_min_count"; + setup_dirs(&file_io, table_path).await; + let schema = test_schema(); + let partition_fields = schema.partition_fields(); + let manifest_dir = format!("{table_path}/manifest"); + let merger = manifest_merger( + &file_io, + &manifest_dir, + &partition_fields, + schema.id(), + HashMap::from([ + ("manifest.merge-min-count".to_string(), "3".to_string()), + ("manifest.target-file-size".to_string(), "1GB".to_string()), + ]), + ); + + let first_meta = write_manifest( + &merger, + "manifest-first-0", + &[ManifestEntry::new( + FileKind::Add, + vec![], + 0, + 1, + test_data_file("first.parquet", 5), + 2, + )], + ) + .await; + let second_meta = write_manifest( + &merger, + "manifest-second-0", + &[ManifestEntry::new( + FileKind::Add, + vec![], + 0, + 1, + test_data_file("second.parquet", 5), + 2, + )], + ) + .await; + + let merged = merger + .merge(vec![first_meta.clone(), second_meta.clone()]) + .await + .unwrap(); + assert_eq!(merged, vec![first_meta, second_meta]); + } + + #[tokio::test] + async fn test_full_manifest_compaction_filters_delete_entries() { + let file_io = test_file_io(); + let table_path = "memory:/test_full_manifest_compaction_filters_delete_entries"; + setup_dirs(&file_io, table_path).await; + let schema = test_schema(); + let partition_fields = schema.partition_fields(); + let manifest_dir = format!("{table_path}/manifest"); + let merger = manifest_merger( + &file_io, + &manifest_dir, + &partition_fields, + schema.id(), + HashMap::from([ + ( + "manifest.full-compaction-threshold-size".to_string(), + "1B".to_string(), + ), + ("manifest.target-file-size".to_string(), "1B".to_string()), + ]), + ); + + let delete_meta = write_manifest( + &merger, + "manifest-delete-only-0", + &[ManifestEntry::new( + FileKind::Delete, + vec![], + 0, + 1, + test_data_file("deleted-in-base.parquet", 10), + 2, + )], + ) + .await; + let add_meta = write_manifest( + &merger, + "manifest-add-only-0", + &[ManifestEntry::new( + FileKind::Add, + vec![], + 0, + 1, + test_data_file("unrelated.parquet", 5), + 2, + )], + ) + .await; + + let compacted = merger + .merge(vec![delete_meta, add_meta.clone()]) + .await + .unwrap(); + assert_eq!(compacted.len(), 1); + assert_eq!(compacted[0].file_name(), add_meta.file_name()); + } + + #[tokio::test] + async fn test_full_manifest_compaction_writes_each_changed_manifest_without_accumulating() { + let file_io = test_file_io(); + let table_path = + "memory:/test_full_manifest_compaction_writes_each_changed_manifest_without_accumulating"; + setup_dirs(&file_io, table_path).await; + let schema = test_schema(); + let partition_fields = schema.partition_fields(); + let manifest_dir = format!("{table_path}/manifest"); + let merger = manifest_merger( + &file_io, + &manifest_dir, + &partition_fields, + schema.id(), + HashMap::from([ + ( + "manifest.full-compaction-threshold-size".to_string(), + "1B".to_string(), + ), + ("manifest.target-file-size".to_string(), "1GB".to_string()), + ]), + ); + + let first_meta = write_manifest( + &merger, + "manifest-first-0", + &[ManifestEntry::new( + FileKind::Add, + vec![], + 0, + 1, + test_data_file("first.parquet", 5), + 2, + )], + ) + .await; + let second_meta = write_manifest( + &merger, + "manifest-second-0", + &[ManifestEntry::new( + FileKind::Add, + vec![], + 0, + 1, + test_data_file("second.parquet", 5), + 2, + )], + ) + .await; + + let compacted = merger.merge(vec![first_meta, second_meta]).await.unwrap(); + + assert_eq!(compacted.len(), 2); + let mut compacted_file_names = Vec::new(); + for file in &compacted { + let entries = Manifest::read( + &file_io, + &format!("{}/{}", merger.manifest_dir, file.file_name()), + ) + .await + .unwrap(); + assert_eq!(entries.len(), 1); + compacted_file_names.push(entries[0].file().file_name.clone()); + } + compacted_file_names.sort(); + assert_eq!( + compacted_file_names, + vec!["first.parquet".to_string(), "second.parquet".to_string()] + ); + } + + #[tokio::test] + async fn test_full_manifest_compaction_skips_unaffected_base_manifests() { + let file_io = test_file_io(); + let table_path = "memory:/test_full_manifest_compaction_skips_unaffected_base_manifests"; + setup_dirs(&file_io, table_path).await; + let schema = test_partitioned_schema(); + let partition_fields = schema.partition_fields(); + let manifest_dir = format!("{table_path}/manifest"); + let merger = manifest_merger( + &file_io, + &manifest_dir, + &partition_fields, + schema.id(), + HashMap::from([ + ( + "manifest.full-compaction-threshold-size".to_string(), + "1B".to_string(), + ), + ("manifest.target-file-size".to_string(), "1B".to_string()), + ]), + ); + + let keep_meta = write_manifest( + &merger, + "manifest-keep-0", + &[ManifestEntry::new( + FileKind::Add, + partition_bytes("keep"), + 0, + 1, + test_data_file("keep.parquet", 10), + 2, + )], + ) + .await; + + let drop_file = test_data_file("drop.parquet", 10); + let drop_meta = write_manifest( + &merger, + "manifest-drop-0", + &[ManifestEntry::new( + FileKind::Add, + partition_bytes("drop"), + 0, + 1, + drop_file.clone(), + 2, + )], + ) + .await; + let delete_meta = write_manifest( + &merger, + "manifest-delete-0", + &[ManifestEntry::new( + FileKind::Delete, + partition_bytes("drop"), + 0, + 1, + drop_file, + 2, + )], + ) + .await; + + let compacted = merger + .merge(vec![keep_meta.clone(), drop_meta, delete_meta]) + .await + .unwrap(); + assert_eq!(compacted.len(), 1); + assert_eq!(compacted[0].file_name(), keep_meta.file_name()); + } +} diff --git a/crates/paimon/src/table/mod.rs b/crates/paimon/src/table/mod.rs index 3158ffca..5b919af1 100644 --- a/crates/paimon/src/table/mod.rs +++ b/crates/paimon/src/table/mod.rs @@ -39,6 +39,7 @@ pub(crate) mod global_index_scanner; mod kv_file_reader; mod kv_file_writer; mod lumina_index_build_builder; +mod manifest_file_merger; pub(crate) mod merge_tree_split_generator; mod partition_filter; mod postpone_file_writer; diff --git a/crates/paimon/src/table/table_commit.rs b/crates/paimon/src/table/table_commit.rs index 938a0239..c5956a42 100644 --- a/crates/paimon/src/table/table_commit.rs +++ b/crates/paimon/src/table/table_commit.rs @@ -29,6 +29,7 @@ use crate::spec::{ ManifestList, PartitionStatistics, Snapshot, }; use crate::table::commit_message::CommitMessage; +use crate::table::manifest_file_merger::ManifestFileMerger; use crate::table::partition_filter::PartitionFilter; use crate::table::snapshot_commit::SnapshotCommit; use crate::table::{SnapshotManager, Table, TableScan}; @@ -56,6 +57,9 @@ pub struct TableCommit { commit_max_retry_wait_ms: u64, row_tracking_enabled: bool, partition_default_name: String, + manifest_target_file_size: i64, + manifest_full_compaction_threshold_size: i64, + manifest_merge_min_count: usize, } impl TableCommit { @@ -76,6 +80,10 @@ impl TableCommit { let commit_max_retry_wait_ms = core_options.commit_max_retry_wait_ms(); let row_tracking_enabled = core_options.row_tracking_enabled(); let partition_default_name = core_options.partition_default_name().to_string(); + let manifest_target_file_size = core_options.manifest_target_file_size(); + let manifest_full_compaction_threshold_size = + core_options.manifest_full_compaction_threshold_size(); + let manifest_merge_min_count = core_options.manifest_merge_min_count(); Self { table, snapshot_manager, @@ -88,6 +96,9 @@ impl TableCommit { commit_max_retry_wait_ms, row_tracking_enabled, partition_default_name, + manifest_target_file_size, + manifest_full_compaction_threshold_size, + manifest_merge_min_count, } } @@ -505,6 +516,9 @@ impl TableCommit { } else { vec![] }; + let existing_manifest_files = self + .compact_manifest_files_if_needed(file_io, &manifest_dir, existing_manifest_files) + .await?; ManifestList::write(file_io, &base_manifest_list_path, &existing_manifest_files).await?; @@ -541,6 +555,26 @@ impl TableCommit { self.snapshot_commit.commit(&snapshot, &statistics).await } + async fn compact_manifest_files_if_needed( + &self, + file_io: &FileIO, + manifest_dir: &str, + manifest_files: Vec, + ) -> Result> { + let partition_fields = self.table.schema().partition_fields(); + ManifestFileMerger::new( + file_io, + manifest_dir, + &partition_fields, + self.table.schema().id(), + self.manifest_target_file_size, + self.manifest_full_compaction_threshold_size, + self.manifest_merge_min_count, + ) + .merge(manifest_files) + .await + } + /// Write an index manifest file from already-merged entries. /// /// Returns `None` if `merged_index_entries` is empty. @@ -1430,11 +1464,19 @@ mod tests { } fn test_table(file_io: &FileIO, table_path: &str) -> Table { + test_table_with_options(file_io, table_path, HashMap::new()) + } + + fn test_table_with_options( + file_io: &FileIO, + table_path: &str, + options: HashMap, + ) -> Table { Table::new( file_io.clone(), Identifier::new("default", "test_table"), table_path.to_string(), - test_schema(), + test_schema().copy_with_options(options), None, ) }