Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
130 changes: 130 additions & 0 deletions crates/integration_tests/tests/read_tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1277,6 +1277,136 @@ async fn test_read_format_schema_evolution_add_column() {
);
}

#[tokio::test]
async fn test_read_partitioned_format_schema_evolution_add_column() {
use paimon::spec::{Datum, Predicate, PredicateBuilder};

let table_name = "partitioned_format_schema_evolution_add_column";
let catalog = create_file_system_catalog();
let table = get_table_from_catalog(&catalog, table_name).await;
let (plan, batches) = scan_and_read(&catalog, table_name, None).await;
assert_plan_file_formats(&plan, &["avro", "orc", "parquet"], table_name);
assert_plan_has_multiple_schema_ids(&plan, table_name);
assert_eq!(
extract_plan_partitions(&plan),
HashSet::from([
"2024-01-01".to_string(),
"2024-01-02".to_string(),
"2024-01-03".to_string(),
]),
"Full scan should include all dt partitions"
);

let mut rows: Vec<(String, i32, String, Option<String>)> = Vec::new();
for batch in &batches {
let dt = batch
.column_by_name("dt")
.and_then(|c| c.as_any().downcast_ref::<StringArray>())
.expect("dt");
let id = batch
.column_by_name("id")
.and_then(|c| c.as_any().downcast_ref::<Int32Array>())
.expect("id");
let name = batch
.column_by_name("name")
.and_then(|c| c.as_any().downcast_ref::<StringArray>())
.expect("name");
let extra = batch
.column_by_name("extra")
.and_then(|c| c.as_any().downcast_ref::<StringArray>())
.expect("extra");
for i in 0..batch.num_rows() {
rows.push((
dt.value(i).to_string(),
id.value(i),
name.value(i).to_string(),
(!extra.is_null(i)).then(|| extra.value(i).to_string()),
));
}
}
rows.sort_by(|left, right| left.0.cmp(&right.0).then(left.1.cmp(&right.1)));

assert_eq!(
rows,
vec![
("2024-01-01".into(), 1, "alice".into(), None),
(
"2024-01-01".into(),
3,
"carol".into(),
Some("orc-extra-1".into()),
),
("2024-01-02".into(), 2, "bob".into(), None),
(
"2024-01-02".into(),
5,
"eve".into(),
Some("avro-extra-1".into()),
),
(
"2024-01-03".into(),
4,
"dave".into(),
Some("orc-extra-2".into()),
),
(
"2024-01-03".into(),
6,
"frank".into(),
Some("avro-extra-2".into()),
),
],
"Old partitioned Parquet rows should null-fill extra and new ORC/Avro rows should keep values"
);

let pb = PredicateBuilder::new(table.schema().fields());
let filter = pb
.equal("dt", Datum::String("2024-01-02".into()))
.expect("Failed to build predicate");
let (plan, batches) = scan_and_read_with_filter(&table, filter).await;
assert_eq!(
extract_plan_partitions(&plan),
HashSet::from(["2024-01-02".to_string()]),
"dt filter should prune unrelated partitions"
);
assert_eq!(
extract_id_name_dt(&batches),
vec![
(2, "bob".into(), "2024-01-02".into()),
(5, "eve".into(), "2024-01-02".into()),
]
);

let filter = Predicate::and(vec![
pb.equal("dt", Datum::String("2024-01-03".into())).unwrap(),
pb.equal("extra", Datum::String("avro-extra-2".into()))
.unwrap(),
]);
let (plan, batches) =
scan_and_read_with_projection_and_filter(&table, Some(&["dt", "id", "extra"]), filter)
.await;
assert_eq!(
extract_plan_partitions(&plan),
HashSet::from(["2024-01-03".to_string()]),
"Partition predicate should still prune when projection includes dt and filter uses extra"
);
assert_eq!(extract_ids(&batches), vec![6]);

let filter = Predicate::and(vec![
pb.equal("dt", Datum::String("2024-01-01".into())).unwrap(),
pb.is_null("extra").unwrap(),
]);
let (plan, batches) =
scan_and_read_with_projection_and_filter(&table, Some(&["dt", "id", "extra"]), filter)
.await;
assert_eq!(
extract_plan_partitions(&plan),
HashSet::from(["2024-01-01".to_string()]),
"extra IS NULL with dt filter should retain only the matching old-schema partition file"
);
assert_eq!(extract_ids(&batches), vec![1]);
}

/// Test reading mixed-format files after ALTER TABLE ALTER COLUMN TYPE (INT -> BIGINT).
/// Old Parquet files have INT; newer ORC/Avro files have BIGINT.
#[tokio::test]
Expand Down
31 changes: 31 additions & 0 deletions crates/integrations/datafusion/tests/read_tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -795,6 +795,37 @@ async fn test_sql_read_format_schema_evolution_add_column() {
.await;
}

#[tokio::test]
async fn test_sql_read_partitioned_format_schema_evolution_add_column() {
assert_sql_rows(
"SELECT id, name, extra FROM paimon.default.partitioned_format_schema_evolution_add_column WHERE dt = '2024-01-02' ORDER BY id",
&[
&["2", "bob", "NULL"],
&["5", "eve", "avro-extra-1"],
],
)
.await;

assert_sql_rows(
"SELECT id, name, extra FROM paimon.default.partitioned_format_schema_evolution_add_column WHERE dt = '2024-01-01' AND extra IS NULL ORDER BY id",
&[&["1", "alice", "NULL"]],
)
.await;

assert_sql_rows(
"SELECT dt, id, extra FROM paimon.default.partitioned_format_schema_evolution_add_column ORDER BY id",
&[
&["2024-01-01", "1", "NULL"],
&["2024-01-02", "2", "NULL"],
&["2024-01-01", "3", "orc-extra-1"],
&["2024-01-03", "4", "orc-extra-2"],
&["2024-01-02", "5", "avro-extra-1"],
&["2024-01-03", "6", "avro-extra-2"],
],
)
.await;
}

#[tokio::test]
async fn test_sql_read_format_schema_evolution_type_promotion() {
assert_sql_rows(
Expand Down
46 changes: 46 additions & 0 deletions dev/spark/provision.py
Original file line number Diff line number Diff line change
Expand Up @@ -396,6 +396,52 @@ def main():
"INSERT INTO format_schema_evolution_add_column VALUES (5, 'eve', 50), (6, 'frank', 60)"
)

# ===== Partitioned Mixed-format Schema Evolution: Add Column =====
# Old Parquet files lack extra; new ORC/Avro files contain extra across dt partitions.
spark.sql(
"""
CREATE TABLE IF NOT EXISTS partitioned_format_schema_evolution_add_column (
id INT,
name STRING,
dt STRING
) USING paimon
PARTITIONED BY (dt)
TBLPROPERTIES (
'file.format' = 'parquet'
)
"""
)
spark.sql(
"""
INSERT INTO partitioned_format_schema_evolution_add_column VALUES
(1, 'alice', '2024-01-01'),
(2, 'bob', '2024-01-02')
"""
)
spark.sql(
"ALTER TABLE partitioned_format_schema_evolution_add_column ADD COLUMNS (extra STRING)"
)
spark.sql(
"ALTER TABLE partitioned_format_schema_evolution_add_column SET TBLPROPERTIES ('file.format' = 'orc')"
)
spark.sql(
"""
INSERT INTO partitioned_format_schema_evolution_add_column (id, name, extra, dt) VALUES
(3, 'carol', 'orc-extra-1', '2024-01-01'),
(4, 'dave', 'orc-extra-2', '2024-01-03')
"""
)
spark.sql(
"ALTER TABLE partitioned_format_schema_evolution_add_column SET TBLPROPERTIES ('file.format' = 'avro')"
)
spark.sql(
"""
INSERT INTO partitioned_format_schema_evolution_add_column (id, name, extra, dt) VALUES
(5, 'eve', 'avro-extra-1', '2024-01-02'),
(6, 'frank', 'avro-extra-2', '2024-01-03')
"""
)

# ===== Mixed-format Schema Evolution: Type Promotion (INT -> BIGINT) =====
# Old Parquet files have value as INT; new ORC/Avro files have value as BIGINT.
spark.sql(
Expand Down
Loading