Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
180 changes: 180 additions & 0 deletions crates/integrations/datafusion/tests/dynamic_bucket_tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,54 @@ async fn collect_partial_update_rows(
rows
}

async fn collect_aggregation_rows(
sql_context: &paimon_datafusion::SQLContext,
sql: &str,
) -> Vec<(i32, Option<i32>, Option<String>, Option<String>)> {
let batches = sql_context.sql(sql).await.unwrap().collect().await.unwrap();
let mut rows = Vec::new();
for batch in &batches {
let ids = batch
.column_by_name("id")
.and_then(|c| c.as_any().downcast_ref::<Int32Array>())
.unwrap();
let amounts = batch
.column_by_name("amount")
.and_then(|c| c.as_any().downcast_ref::<Int32Array>())
.unwrap();
let tags = batch
.column_by_name("tag")
.and_then(|c| c.as_any().downcast_ref::<StringArray>())
.unwrap();
let notes = batch
.column_by_name("note")
.and_then(|c| c.as_any().downcast_ref::<StringArray>())
.unwrap();
for i in 0..batch.num_rows() {
rows.push((
ids.value(i),
if amounts.is_null(i) {
None
} else {
Some(amounts.value(i))
},
if tags.is_null(i) {
None
} else {
Some(tags.value(i).to_string())
},
if notes.is_null(i) {
None
} else {
Some(notes.value(i).to_string())
},
));
}
}
rows.sort_by_key(|row| row.0);
rows
}

#[tokio::test]
async fn test_pk_dynamic_bucket_partial_update() {
let (_tmp, sql_context) = setup_sql_context().await;
Expand Down Expand Up @@ -187,6 +235,138 @@ async fn test_pk_dynamic_bucket_partial_update() {
);
}

/// End-to-end check that an aggregation-engine primary-key table using dynamic
/// buckets (`'bucket' = '-1'`) keeps routing an existing key to the bucket it
/// was first assigned, even when each subsequent write goes through a freshly
/// created SQL context, and that the final read returns the aggregated rows.
///
/// The table sums `amount`, list-aggregates `tag` with a `|` delimiter, and
/// falls back to `last_non_null_value` for the remaining field (`note`).
#[tokio::test]
async fn test_pk_dynamic_bucket_aggregation_restores_existing_bucket_after_reload() {
// The catalog handle is kept so that new SQL contexts and table lookups can
// be created from it after each write.
let (_tmp, catalog) = create_test_env();
let sql_context = create_sql_context(catalog.clone()).await;
sql_context
.sql("CREATE SCHEMA paimon.test_db")
.await
.expect("CREATE SCHEMA failed");

// 'dynamic-bucket.target-row-num' = '1' is what lets the assertions below
// expect one bucket per distinct key.
sql_context
.sql(
"CREATE TABLE paimon.test_db.t_dyn_agg (
id INT NOT NULL, amount INT, tag STRING, note STRING,
PRIMARY KEY (id)
) WITH (
'bucket' = '-1',
'dynamic-bucket.target-row-num' = '1',
'merge-engine' = 'aggregation',
'fields.amount.aggregate-function' = 'sum',
'fields.tag.aggregate-function' = 'listagg',
'fields.tag.list-agg-delimiter' = '|',
'fields.default-aggregate-function' = 'last_non_null_value'
)",
)
.await
.unwrap();

// Write 1: two brand-new keys (id=2 and id=1) through the original context.
sql_context
.sql(
"INSERT INTO paimon.test_db.t_dyn_agg VALUES
(2, 20, 'x', 'old-2'),
(1, 10, 'a', 'old-1')",
)
.await
.unwrap()
.collect()
.await
.unwrap();

// Re-fetch the table from the catalog before inspecting bucket state.
let table = catalog
.get_table(&Identifier::new("test_db", "t_dyn_agg"))
.await
.unwrap();
assert_eq!(
index_bucket_count(&table).await,
2,
"target row number 1 should create one HASH index bucket per new key"
);
// Remember where id=1 landed; later writes must not move it.
let id1_bucket = bucket_containing_id(&table, 1).await;

// Write 2: a new context built from the same catalog re-writes id=1 (with a
// NULL note) and introduces id=3.
let reloaded_context = create_sql_context(catalog.clone()).await;
reloaded_context
.sql(
"INSERT INTO paimon.test_db.t_dyn_agg VALUES
(1, 5, 'b', CAST(NULL AS STRING)),
(3, 99, 'solo', 'only-3')",
)
.await
.unwrap()
.collect()
.await
.unwrap();

let table = catalog
.get_table(&Identifier::new("test_db", "t_dyn_agg"))
.await
.unwrap();
assert_eq!(
bucket_containing_id(&table, 1).await,
id1_bucket,
"reloaded writer should restore the HASH index and route id=1 to its original bucket"
);
assert_eq!(
index_bucket_count(&table).await,
3,
"new key id=3 should allocate a third bucket when target row number is 1"
);

// Write 3: yet another new context; only existing keys are written, with a
// NULL tag for id=2 and a NULL amount for id=1.
let reloaded_context = create_sql_context(catalog.clone()).await;
reloaded_context
.sql(
"INSERT INTO paimon.test_db.t_dyn_agg VALUES
(2, 7, CAST(NULL AS STRING), 'new-2'),
(1, CAST(NULL AS INT), 'c', 'new-1')",
)
.await
.unwrap()
.collect()
.await
.unwrap();

let table = catalog
.get_table(&Identifier::new("test_db", "t_dyn_agg"))
.await
.unwrap();
assert_eq!(
bucket_containing_id(&table, 1).await,
id1_bucket,
"a second reload should still keep duplicate PK writes in the existing bucket"
);

// Final read through a separate context (this consumes the catalog handle).
let read_context = create_sql_context(catalog).await;
let rows = collect_aggregation_rows(
&read_context,
"SELECT id, amount, tag, note FROM paimon.test_db.t_dyn_agg",
)
.await;
// Expected merge results across the three writes:
//   id=1: amount 10 + 5 (+ NULL) = 15, tags "a", "b", "c" joined by '|',
//         note is the last non-null value "new-1".
//   id=2: amount 20 + 7 = 27, tag stays "x" (second write had a NULL tag),
//         note is "new-2".
//   id=3: written once, returned as inserted.
assert_eq!(
rows,
vec![
(
1,
Some(15),
Some("a|b|c".to_string()),
Some("new-1".to_string())
),
(
2,
Some(27),
Some("x".to_string()),
Some("new-2".to_string())
),
(
3,
Some(99),
Some("solo".to_string()),
Some("only-3".to_string())
),
]
);
}

async fn latest_splits(table: &paimon::Table) -> Vec<DataSplit> {
table
.new_read_builder()
Expand Down
4 changes: 2 additions & 2 deletions crates/integrations/datafusion/tests/pk_tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2284,7 +2284,7 @@ async fn test_pk_partial_update_merges_across_tiny_splits() {
/// Basic: aggregation engine sums numeric column and concatenates string
/// column across overlapping primary keys.
#[tokio::test]
async fn test_pk_aggregation_sum_and_listagg_fixed_bucket_e2e() {
async fn test_pk_aggregation_sum_and_listagg_fixed_multi_bucket_e2e() {
let (_tmp, sql_context) = setup_sql_context().await;

sql_context
Expand All @@ -2293,7 +2293,7 @@ async fn test_pk_aggregation_sum_and_listagg_fixed_bucket_e2e() {
id INT NOT NULL, amount INT, tag STRING,
PRIMARY KEY (id)
) WITH (
'bucket' = '1',
'bucket' = '4',
'merge-engine' = 'aggregation',
'fields.amount.aggregate-function' = 'sum',
'fields.tag.aggregate-function' = 'listagg',
Expand Down
36 changes: 36 additions & 0 deletions crates/paimon/src/table/table_write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1149,6 +1149,42 @@ mod tests {
.unwrap();
}

/// A primary-key table declared with `bucket = -1` and
/// `merge-engine = aggregation` must produce a `TableWrite` whose bucket
/// assigner is the `Dynamic` variant, and that writer must accept a batch
/// without error.
#[tokio::test]
async fn test_allows_aggregation_dynamic_bucket_table() {
    let table_path = "memory:/test_aggregation_dynamic_bucket_table";
    let file_io = test_file_io();
    setup_dirs(&file_io, table_path).await;

    // Two INT columns keyed on `id`; `value` is aggregated with `sum`.
    let schema = Schema::builder()
        .column("id", DataType::Int(IntType::new()))
        .column("value", DataType::Int(IntType::new()))
        .primary_key(["id"])
        .option("bucket", "-1")
        .option("merge-engine", "aggregation")
        .option("fields.value.aggregate-function", "sum")
        .build()
        .unwrap();
    let identifier = Identifier::new("default", "test_aggregation_dynamic_bucket_table");
    let table = Table::new(
        file_io,
        identifier,
        table_path.to_string(),
        TableSchema::new(0, &schema),
        None,
    );

    // Creating the writer must succeed and select the dynamic assigner.
    let mut table_write = TableWrite::new(&table, "test-user".to_string()).unwrap();
    assert!(matches!(
        table_write.bucket_assigner,
        BucketAssignerEnum::Dynamic(_)
    ));

    // A single-row write must go through as well.
    let batch = make_batch(vec![1], vec![10]);
    table_write.write_arrow_batch(&batch).await.unwrap();
}

#[tokio::test]
async fn test_rejects_partial_update_with_deletion_vectors_when_creating_writer() {
let file_io = test_file_io();
Expand Down
13 changes: 13 additions & 0 deletions docs/src/sql.md
Original file line number Diff line number Diff line change
Expand Up @@ -958,6 +958,19 @@ Set via `WITH ('key' = 'value')` at table creation time, or dynamically via `SET
|---|---|
| `'merge-engine' = 'deduplicate'` | Deduplicate engine (default for PK tables), last write wins |
| `'merge-engine' = 'first-row'` | Keeps the first written row |
| `'merge-engine' = 'partial-update'` | Basic partial-update engine for PK tables |
| `'merge-engine' = 'aggregation'` | Basic aggregation engine for PK tables |

The Rust implementation currently supports `'merge-engine' = 'aggregation'` in
basic mode only. It works with fixed buckets and ordinary dynamic buckets
(`'bucket' = '-1'`) when the primary key includes all partition columns. It
supports per-field aggregate functions such as `sum`, `min`, `max`, value
functions (for example `last_non_null_value`), boolean functions, and
`listagg`, plus a table-wide default set via `fields.default-aggregate-function`.

This is not full feature parity with the Java implementation. Aggregation
tables do not support retract rows (`DELETE` / `UPDATE_BEFORE`), deletion
vectors, cross-partition dynamic bucket writes, or advanced aggregation options
such as `ignore-retract`, `distinct`, `nested-key`, `count-limit`, or sequence
groups.

### Other Options

Expand Down
Loading