diff --git a/crates/integrations/datafusion/tests/dynamic_bucket_tables.rs b/crates/integrations/datafusion/tests/dynamic_bucket_tables.rs index 4c8cbcb5..97d7052b 100644 --- a/crates/integrations/datafusion/tests/dynamic_bucket_tables.rs +++ b/crates/integrations/datafusion/tests/dynamic_bucket_tables.rs @@ -121,6 +121,54 @@ async fn collect_partial_update_rows( rows } +async fn collect_aggregation_rows( + sql_context: &paimon_datafusion::SQLContext, + sql: &str, +) -> Vec<(i32, Option, Option, Option)> { + let batches = sql_context.sql(sql).await.unwrap().collect().await.unwrap(); + let mut rows = Vec::new(); + for batch in &batches { + let ids = batch + .column_by_name("id") + .and_then(|c| c.as_any().downcast_ref::()) + .unwrap(); + let amounts = batch + .column_by_name("amount") + .and_then(|c| c.as_any().downcast_ref::()) + .unwrap(); + let tags = batch + .column_by_name("tag") + .and_then(|c| c.as_any().downcast_ref::()) + .unwrap(); + let notes = batch + .column_by_name("note") + .and_then(|c| c.as_any().downcast_ref::()) + .unwrap(); + for i in 0..batch.num_rows() { + rows.push(( + ids.value(i), + if amounts.is_null(i) { + None + } else { + Some(amounts.value(i)) + }, + if tags.is_null(i) { + None + } else { + Some(tags.value(i).to_string()) + }, + if notes.is_null(i) { + None + } else { + Some(notes.value(i).to_string()) + }, + )); + } + } + rows.sort_by_key(|row| row.0); + rows +} + #[tokio::test] async fn test_pk_dynamic_bucket_partial_update() { let (_tmp, sql_context) = setup_sql_context().await; @@ -187,6 +235,138 @@ async fn test_pk_dynamic_bucket_partial_update() { ); } +#[tokio::test] +async fn test_pk_dynamic_bucket_aggregation_restores_existing_bucket_after_reload() { + let (_tmp, catalog) = create_test_env(); + let sql_context = create_sql_context(catalog.clone()).await; + sql_context + .sql("CREATE SCHEMA paimon.test_db") + .await + .expect("CREATE SCHEMA failed"); + + sql_context + .sql( + "CREATE TABLE paimon.test_db.t_dyn_agg ( + id INT NOT NULL, amount INT, tag STRING, note STRING, + PRIMARY KEY (id) + ) WITH ( + 'bucket' = '-1', + 'dynamic-bucket.target-row-num' = '1', + 'merge-engine' = 'aggregation', + 'fields.amount.aggregate-function' = 'sum', + 'fields.tag.aggregate-function' = 'listagg', + 'fields.tag.list-agg-delimiter' = '|', + 'fields.default-aggregate-function' = 'last_non_null_value' + )", + ) + .await + .unwrap(); + + sql_context + .sql( + "INSERT INTO paimon.test_db.t_dyn_agg VALUES + (2, 20, 'x', 'old-2'), + (1, 10, 'a', 'old-1')", + ) + .await + .unwrap() + .collect() + .await + .unwrap(); + + let table = catalog + .get_table(&Identifier::new("test_db", "t_dyn_agg")) + .await + .unwrap(); + assert_eq!( + index_bucket_count(&table).await, + 2, + "target row number 1 should create one HASH index bucket per new key" + ); + let id1_bucket = bucket_containing_id(&table, 1).await; + + let reloaded_context = create_sql_context(catalog.clone()).await; + reloaded_context + .sql( + "INSERT INTO paimon.test_db.t_dyn_agg VALUES + (1, 5, 'b', CAST(NULL AS STRING)), + (3, 99, 'solo', 'only-3')", + ) + .await + .unwrap() + .collect() + .await + .unwrap(); + + let table = catalog + .get_table(&Identifier::new("test_db", "t_dyn_agg")) + .await + .unwrap(); + assert_eq!( + bucket_containing_id(&table, 1).await, + id1_bucket, + "reloaded writer should restore the HASH index and route id=1 to its original bucket" + ); + assert_eq!( + index_bucket_count(&table).await, + 3, + "new key id=3 should allocate a third bucket when target row number is 1" + ); + + let reloaded_context = create_sql_context(catalog.clone()).await; + reloaded_context + .sql( + "INSERT INTO paimon.test_db.t_dyn_agg VALUES + (2, 7, CAST(NULL AS STRING), 'new-2'), + (1, CAST(NULL AS INT), 'c', 'new-1')", + ) + .await + .unwrap() + .collect() + .await + .unwrap(); + + let table = catalog + .get_table(&Identifier::new("test_db", "t_dyn_agg")) + .await + .unwrap(); + assert_eq!( + bucket_containing_id(&table, 1).await, + id1_bucket, + "a second reload should still keep duplicate PK writes in the existing bucket" + ); + + let read_context = create_sql_context(catalog).await; + let rows = collect_aggregation_rows( + &read_context, + "SELECT id, amount, tag, note FROM paimon.test_db.t_dyn_agg", + ) + .await; + assert_eq!( + rows, + vec![ + ( + 1, + Some(15), + Some("a|b|c".to_string()), + Some("new-1".to_string()) + ), + ( + 2, + Some(27), + Some("x".to_string()), + Some("new-2".to_string()) + ), + ( + 3, + Some(99), + Some("solo".to_string()), + Some("only-3".to_string()) + ), + ] + ); +} + async fn latest_splits(table: &paimon::Table) -> Vec { table .new_read_builder() diff --git a/crates/integrations/datafusion/tests/pk_tables.rs b/crates/integrations/datafusion/tests/pk_tables.rs index deed4887..4fd38b2e 100644 --- a/crates/integrations/datafusion/tests/pk_tables.rs +++ b/crates/integrations/datafusion/tests/pk_tables.rs @@ -2284,7 +2284,7 @@ async fn test_pk_partial_update_merges_across_tiny_splits() { /// Basic: aggregation engine sums numeric column and concatenates string /// column across overlapping primary keys. #[tokio::test] -async fn test_pk_aggregation_sum_and_listagg_fixed_bucket_e2e() { +async fn test_pk_aggregation_sum_and_listagg_fixed_multi_bucket_e2e() { let (_tmp, sql_context) = setup_sql_context().await; sql_context @@ -2293,7 +2293,7 @@ async fn test_pk_aggregation_sum_and_listagg_fixed_bucket_e2e() { id INT NOT NULL, amount INT, tag STRING, PRIMARY KEY (id) ) WITH ( - 'bucket' = '1', + 'bucket' = '4', 'merge-engine' = 'aggregation', 'fields.amount.aggregate-function' = 'sum', 'fields.tag.aggregate-function' = 'listagg', diff --git a/crates/paimon/src/table/table_write.rs b/crates/paimon/src/table/table_write.rs index 2eaa6d20..af9734d3 100644 --- a/crates/paimon/src/table/table_write.rs +++ b/crates/paimon/src/table/table_write.rs @@ -1149,6 +1149,42 @@ mod tests { .unwrap(); } + #[tokio::test] + async fn test_allows_aggregation_dynamic_bucket_table() { + let file_io = test_file_io(); + let table_path = "memory:/test_aggregation_dynamic_bucket_table"; + setup_dirs(&file_io, table_path).await; + + let table = Table::new( + file_io, + Identifier::new("default", "test_aggregation_dynamic_bucket_table"), + table_path.to_string(), + TableSchema::new( + 0, + &Schema::builder() + .column("id", DataType::Int(IntType::new())) + .column("value", DataType::Int(IntType::new())) + .primary_key(["id"]) + .option("bucket", "-1") + .option("merge-engine", "aggregation") + .option("fields.value.aggregate-function", "sum") + .build() + .unwrap(), + ), + None, + ); + + let mut table_write = TableWrite::new(&table, "test-user".to_string()).unwrap(); + assert!(matches!( + table_write.bucket_assigner, + BucketAssignerEnum::Dynamic(_) + )); + table_write + .write_arrow_batch(&make_batch(vec![1], vec![10])) + .await + .unwrap(); + } + #[tokio::test] async fn test_rejects_partial_update_with_deletion_vectors_when_creating_writer() { let file_io = test_file_io(); diff --git a/docs/src/sql.md b/docs/src/sql.md index 90898a5c..7d68f5fc 100644 --- a/docs/src/sql.md +++ b/docs/src/sql.md @@ -958,6 +958,19 @@ Set via `WITH ('key' = 'value')` at table creation time, or dynamically via `SET |---|---| | `'merge-engine' = 'deduplicate'` | Deduplicate engine (default for PK tables), last write wins | | `'merge-engine' = 'first-row'` | Keeps the first written row | +| `'merge-engine' = 'partial-update'` | Basic partial-update engine for PK tables | +| `'merge-engine' = 'aggregation'` | Basic aggregation engine for PK tables | + +Rust currently supports `merge-engine=aggregation` in basic mode only. It works +with fixed buckets and ordinary dynamic buckets (`'bucket' = '-1'`) when the +primary key includes all partition columns. It supports per-field aggregate +functions such as `sum`, `min`, `max`, value functions, boolean functions, and +`listagg`, plus `fields.default-aggregate-function`. + +This is not full Java feature parity. Aggregation tables do not support retract +rows (`DELETE` / `UPDATE_BEFORE`), deletion vectors, cross-partition dynamic +bucket writes, or advanced aggregation options such as `ignore-retract`, +`distinct`, `nested-key`, `count-limit`, and sequence groups. ### Other Options