From 67ee7b3cc02c3a0c17e40a352b3c6fcadc1ba62c Mon Sep 17 00:00:00 2001 From: liujiwen-up Date: Tue, 23 Jun 2026 20:40:38 +0800 Subject: [PATCH] test: add partitioned schema evolution coverage --- crates/integration_tests/tests/read_tables.rs | 130 ++++++++++++++++++ .../datafusion/tests/read_tables.rs | 31 +++++ dev/spark/provision.py | 46 +++++++ 3 files changed, 207 insertions(+) diff --git a/crates/integration_tests/tests/read_tables.rs b/crates/integration_tests/tests/read_tables.rs index 90f16cb5..991ea014 100644 --- a/crates/integration_tests/tests/read_tables.rs +++ b/crates/integration_tests/tests/read_tables.rs @@ -1277,6 +1277,136 @@ async fn test_read_format_schema_evolution_add_column() { ); } +#[tokio::test] +async fn test_read_partitioned_format_schema_evolution_add_column() { + use paimon::spec::{Datum, Predicate, PredicateBuilder}; + + let table_name = "partitioned_format_schema_evolution_add_column"; + let catalog = create_file_system_catalog(); + let table = get_table_from_catalog(&catalog, table_name).await; + let (plan, batches) = scan_and_read(&catalog, table_name, None).await; + assert_plan_file_formats(&plan, &["avro", "orc", "parquet"], table_name); + assert_plan_has_multiple_schema_ids(&plan, table_name); + assert_eq!( + extract_plan_partitions(&plan), + HashSet::from([ + "2024-01-01".to_string(), + "2024-01-02".to_string(), + "2024-01-03".to_string(), + ]), + "Full scan should include all dt partitions" + ); + + let mut rows: Vec<(String, i32, String, Option)> = Vec::new(); + for batch in &batches { + let dt = batch + .column_by_name("dt") + .and_then(|c| c.as_any().downcast_ref::()) + .expect("dt"); + let id = batch + .column_by_name("id") + .and_then(|c| c.as_any().downcast_ref::()) + .expect("id"); + let name = batch + .column_by_name("name") + .and_then(|c| c.as_any().downcast_ref::()) + .expect("name"); + let extra = batch + .column_by_name("extra") + .and_then(|c| c.as_any().downcast_ref::()) + .expect("extra"); + for i in 0..batch.num_rows() { + rows.push(( + dt.value(i).to_string(), + id.value(i), + name.value(i).to_string(), + (!extra.is_null(i)).then(|| extra.value(i).to_string()), + )); + } + } + rows.sort_by(|left, right| left.0.cmp(&right.0).then(left.1.cmp(&right.1))); + + assert_eq!( + rows, + vec![ + ("2024-01-01".into(), 1, "alice".into(), None), + ( + "2024-01-01".into(), + 3, + "carol".into(), + Some("orc-extra-1".into()), + ), + ("2024-01-02".into(), 2, "bob".into(), None), + ( + "2024-01-02".into(), + 5, + "eve".into(), + Some("avro-extra-1".into()), + ), + ( + "2024-01-03".into(), + 4, + "dave".into(), + Some("orc-extra-2".into()), + ), + ( + "2024-01-03".into(), + 6, + "frank".into(), + Some("avro-extra-2".into()), + ), + ], + "Old partitioned Parquet rows should null-fill extra and new ORC/Avro rows should keep values" + ); + + let pb = PredicateBuilder::new(table.schema().fields()); + let filter = pb + .equal("dt", Datum::String("2024-01-02".into())) + .expect("Failed to build predicate"); + let (plan, batches) = scan_and_read_with_filter(&table, filter).await; + assert_eq!( + extract_plan_partitions(&plan), + HashSet::from(["2024-01-02".to_string()]), + "dt filter should prune unrelated partitions" + ); + assert_eq!( + extract_id_name_dt(&batches), + vec![ + (2, "bob".into(), "2024-01-02".into()), + (5, "eve".into(), "2024-01-02".into()), + ] + ); + + let filter = Predicate::and(vec![ + pb.equal("dt", Datum::String("2024-01-03".into())).unwrap(), + pb.equal("extra", Datum::String("avro-extra-2".into())) + .unwrap(), + ]); + let (plan, batches) = + scan_and_read_with_projection_and_filter(&table, Some(&["dt", "id", "extra"]), filter) + .await; + assert_eq!( + extract_plan_partitions(&plan), + HashSet::from(["2024-01-03".to_string()]), + "Partition predicate should still prune when projection includes dt and filter uses extra" + ); + assert_eq!(extract_ids(&batches), vec![6]); + + let filter = Predicate::and(vec![ + pb.equal("dt", Datum::String("2024-01-01".into())).unwrap(), + pb.is_null("extra").unwrap(), + ]); + let (plan, batches) = + scan_and_read_with_projection_and_filter(&table, Some(&["dt", "id", "extra"]), filter) + .await; + assert_eq!( + extract_plan_partitions(&plan), + HashSet::from(["2024-01-01".to_string()]), + "extra IS NULL with dt filter should retain only the matching old-schema partition file" + ); + assert_eq!(extract_ids(&batches), vec![1]); +} + /// Test reading mixed-format files after ALTER TABLE ALTER COLUMN TYPE (INT -> BIGINT). /// Old Parquet files have INT; newer ORC/Avro files have BIGINT. #[tokio::test] diff --git a/crates/integrations/datafusion/tests/read_tables.rs b/crates/integrations/datafusion/tests/read_tables.rs index f0b45bbb..f88f3bbf 100644 --- a/crates/integrations/datafusion/tests/read_tables.rs +++ b/crates/integrations/datafusion/tests/read_tables.rs @@ -795,6 +795,37 @@ async fn test_sql_read_format_schema_evolution_add_column() { .await; } +#[tokio::test] +async fn test_sql_read_partitioned_format_schema_evolution_add_column() { + assert_sql_rows( + "SELECT id, name, extra FROM paimon.default.partitioned_format_schema_evolution_add_column WHERE dt = '2024-01-02' ORDER BY id", + &[ + &["2", "bob", "NULL"], + &["5", "eve", "avro-extra-1"], + ], + ) + .await; + + assert_sql_rows( + "SELECT id, name, extra FROM paimon.default.partitioned_format_schema_evolution_add_column WHERE dt = '2024-01-01' AND extra IS NULL ORDER BY id", + &[&["1", "alice", "NULL"]], + ) + .await; + + assert_sql_rows( + "SELECT dt, id, extra FROM paimon.default.partitioned_format_schema_evolution_add_column ORDER BY id", + &[ + &["2024-01-01", "1", "NULL"], + &["2024-01-02", "2", "NULL"], + &["2024-01-01", "3", "orc-extra-1"], + &["2024-01-03", "4", "orc-extra-2"], + &["2024-01-02", "5", "avro-extra-1"], + &["2024-01-03", "6", "avro-extra-2"], + ], + ) + .await; +} + #[tokio::test] async fn test_sql_read_format_schema_evolution_type_promotion() { assert_sql_rows( diff --git a/dev/spark/provision.py b/dev/spark/provision.py index 2222d0d2..3e1b5190 100644 --- a/dev/spark/provision.py +++ b/dev/spark/provision.py @@ -396,6 +396,52 @@ def main(): "INSERT INTO format_schema_evolution_add_column VALUES (5, 'eve', 50), (6, 'frank', 60)" ) + # ===== Partitioned Mixed-format Schema Evolution: Add Column ===== + # Old Parquet files lack extra; new ORC/Avro files contain extra across dt partitions. + spark.sql( + """ + CREATE TABLE IF NOT EXISTS partitioned_format_schema_evolution_add_column ( + id INT, + name STRING, + dt STRING + ) USING paimon + PARTITIONED BY (dt) + TBLPROPERTIES ( + 'file.format' = 'parquet' + ) + """ + ) + spark.sql( + """ + INSERT INTO partitioned_format_schema_evolution_add_column VALUES + (1, 'alice', '2024-01-01'), + (2, 'bob', '2024-01-02') + """ + ) + spark.sql( + "ALTER TABLE partitioned_format_schema_evolution_add_column ADD COLUMNS (extra STRING)" + ) + spark.sql( + "ALTER TABLE partitioned_format_schema_evolution_add_column SET TBLPROPERTIES ('file.format' = 'orc')" + ) + spark.sql( + """ + INSERT INTO partitioned_format_schema_evolution_add_column (id, name, extra, dt) VALUES + (3, 'carol', 'orc-extra-1', '2024-01-01'), + (4, 'dave', 'orc-extra-2', '2024-01-03') + """ + ) + spark.sql( + "ALTER TABLE partitioned_format_schema_evolution_add_column SET TBLPROPERTIES ('file.format' = 'avro')" + ) + spark.sql( + """ + INSERT INTO partitioned_format_schema_evolution_add_column (id, name, extra, dt) VALUES + (5, 'eve', 'avro-extra-1', '2024-01-02'), + (6, 'frank', 'avro-extra-2', '2024-01-03') + """ + ) + # ===== Mixed-format Schema Evolution: Type Promotion (INT -> BIGINT) ===== # Old Parquet files have value as INT; new ORC/Avro files have value as BIGINT. spark.sql(