Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
149 changes: 149 additions & 0 deletions crates/integration_tests/tests/read_tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1526,6 +1526,98 @@ async fn test_read_data_evolution_type_promotion() {
);
}

#[tokio::test]
async fn test_read_data_evolution_mixed_format_add_column() {
let table_name = "data_evolution_mixed_format_add_column";
let (plan, batches) = scan_and_read_with_fs_catalog(table_name, None).await;
assert_plan_file_formats(&plan, &["avro", "orc", "parquet"], table_name);
assert_plan_has_multiple_schema_ids(&plan, table_name);

let mut rows: Vec<(i32, String, i32, Option<String>)> = Vec::new();
for batch in &batches {
let id = batch
.column_by_name("id")
.and_then(|c| c.as_any().downcast_ref::<Int32Array>())
.expect("id");
let name = batch
.column_by_name("name")
.and_then(|c| c.as_any().downcast_ref::<StringArray>())
.expect("name");
let value = batch
.column_by_name("value")
.and_then(|c| c.as_any().downcast_ref::<Int32Array>())
.expect("value");
let extra = batch
.column_by_name("extra")
.and_then(|c| c.as_any().downcast_ref::<StringArray>())
.expect("extra");
for i in 0..batch.num_rows() {
rows.push((
id.value(i),
name.value(i).to_string(),
value.value(i),
(!extra.is_null(i)).then(|| extra.value(i).to_string()),
));
}
}
rows.sort_by_key(|(id, _, _, _)| *id);

assert_eq!(
rows,
vec![
(1, "alice".into(), 100, None),
(2, "bob".into(), 200, None),
(3, "carol".into(), 300, Some("orc-extra".into())),
(4, "dave".into(), 400, Some("avro-extra".into())),
],
"Mixed-format data evolution should null-fill old extra values and preserve new values"
);
}

#[tokio::test]
async fn test_read_data_evolution_mixed_format_type_promotion() {
let table_name = "data_evolution_mixed_format_type_promotion";
let (plan, batches) = scan_and_read_with_fs_catalog(table_name, None).await;
assert_plan_file_formats(&plan, &["avro", "orc", "parquet"], table_name);
assert_plan_has_multiple_schema_ids(&plan, table_name);

for batch in &batches {
let value_col = batch.column_by_name("value").expect("value column");
assert_eq!(
value_col.data_type(),
&arrow_array::types::Int64Type::DATA_TYPE,
"value column should be Int64 after mixed-format data-evolution type promotion"
);
}

let mut rows: Vec<(i32, i64)> = Vec::new();
for batch in &batches {
let id = batch
.column_by_name("id")
.and_then(|c| c.as_any().downcast_ref::<Int32Array>())
.expect("id");
let value = batch
.column_by_name("value")
.and_then(|c| c.as_any().downcast_ref::<Int64Array>())
.expect("value as Int64Array");
for i in 0..batch.num_rows() {
rows.push((id.value(i), value.value(i)));
}
}
rows.sort_by_key(|(id, _)| *id);

assert_eq!(
rows,
vec![
(1, 100i64),
(2, 200i64),
(3, 3_000_000_000i64),
(4, 4_000_000_000i64),
],
"Mixed-format data evolution should promote INT to BIGINT"
);
}

/// Test reading a table after ALTER TABLE DROP COLUMN.
/// Old data files have the dropped column; reader should ignore it.
#[tokio::test]
Expand Down Expand Up @@ -2866,6 +2958,63 @@ async fn test_read_data_evolution_table_only_row_id_with_row_ranges() {
);
}

#[tokio::test]
async fn test_read_data_evolution_mixed_format_row_id_projection() {
let catalog = create_file_system_catalog();
let table = get_table_from_catalog(&catalog, "data_evolution_mixed_format_add_column").await;

let mut read_builder = table.new_read_builder();
read_builder.with_projection(&["_ROW_ID", "id"]);
let scan = read_builder.new_scan();
let plan = scan.plan().await.expect("Failed to plan scan");

assert_plan_file_formats(
&plan,
&["avro", "orc", "parquet"],
"data_evolution_mixed_format_add_column",
);
assert_plan_has_multiple_schema_ids(&plan, "data_evolution_mixed_format_add_column");

let read = read_builder.new_read().expect("Failed to create read");
let stream = read
.to_arrow(plan.splits())
.expect("Failed to create arrow stream");
let batches: Vec<RecordBatch> = stream
.try_collect()
.await
.expect("Failed to collect batches");

let mut row_ids = Vec::new();
let mut ids = Vec::new();
for batch in &batches {
let row_id = batch
.column_by_name("_ROW_ID")
.and_then(|c| c.as_any().downcast_ref::<Int64Array>())
.expect("_ROW_ID");
let id = batch
.column_by_name("id")
.and_then(|c| c.as_any().downcast_ref::<Int32Array>())
.expect("id");
for i in 0..batch.num_rows() {
row_ids.push(row_id.value(i));
ids.push(id.value(i));
}
}

assert_eq!(row_ids.len(), ids.len());
assert_eq!(ids.len(), 4, "Expected all fixture rows to be readable");
assert!(
row_ids.iter().all(|&row_id| row_id >= 0),
"All _ROW_ID values should be non-negative"
);
let unique: HashSet<i64> = row_ids.iter().copied().collect();
assert_eq!(
unique.len(),
row_ids.len(),
"_ROW_ID values should be unique"
);
}

// ---------------------------------------------------------------------------
// Full types integration tests (parquet + orc + avro)
// ---------------------------------------------------------------------------
Expand Down
64 changes: 64 additions & 0 deletions dev/spark/provision.py
Original file line number Diff line number Diff line change
Expand Up @@ -424,6 +424,70 @@ def main():
"INSERT INTO format_schema_evolution_type_promotion VALUES (5, 5000000000), (6, 6000000000)"
)

# ===== Mixed-format Data Evolution: Add Column =====
# Combines row-tracking/data-evolution with ADD COLUMN and mixed file formats.
# Old Parquet files lack extra; new ORC/Avro files contain extra.
spark.sql(
"""
CREATE TABLE IF NOT EXISTS data_evolution_mixed_format_add_column (
id INT,
name STRING,
value INT
) USING paimon
TBLPROPERTIES (
'row-tracking.enabled' = 'true',
'data-evolution.enabled' = 'true',
'file.format' = 'parquet'
)
"""
)
spark.sql(
"""
INSERT INTO data_evolution_mixed_format_add_column VALUES
(1, 'alice', 100),
(2, 'bob', 200)
"""
)
spark.sql("ALTER TABLE data_evolution_mixed_format_add_column ADD COLUMNS (extra STRING)")
spark.sql("ALTER TABLE data_evolution_mixed_format_add_column SET TBLPROPERTIES ('file.format' = 'orc')")
spark.sql(
"INSERT INTO data_evolution_mixed_format_add_column VALUES (3, 'carol', 300, 'orc-extra')"
)
spark.sql("ALTER TABLE data_evolution_mixed_format_add_column SET TBLPROPERTIES ('file.format' = 'avro')")
spark.sql(
"INSERT INTO data_evolution_mixed_format_add_column VALUES (4, 'dave', 400, 'avro-extra')"
)

# ===== Mixed-format Data Evolution: Type Promotion =====
# Old Parquet files have INT; new ORC/Avro files have BIGINT.
spark.sql(
"""
CREATE TABLE IF NOT EXISTS data_evolution_mixed_format_type_promotion (
id INT,
value INT
) USING paimon
TBLPROPERTIES (
'row-tracking.enabled' = 'true',
'data-evolution.enabled' = 'true',
'file.format' = 'parquet'
)
"""
)
spark.sql(
"INSERT INTO data_evolution_mixed_format_type_promotion VALUES (1, 100), (2, 200)"
)
spark.sql(
"ALTER TABLE data_evolution_mixed_format_type_promotion ALTER COLUMN value TYPE BIGINT"
)
spark.sql("ALTER TABLE data_evolution_mixed_format_type_promotion SET TBLPROPERTIES ('file.format' = 'orc')")
spark.sql(
"INSERT INTO data_evolution_mixed_format_type_promotion VALUES (3, 3000000000)"
)
spark.sql("ALTER TABLE data_evolution_mixed_format_type_promotion SET TBLPROPERTIES ('file.format' = 'avro')")
spark.sql(
"INSERT INTO data_evolution_mixed_format_type_promotion VALUES (4, 4000000000)"
)

# ===== Data Evolution + Schema Evolution: Add Column =====
# Combines data-evolution (row-tracking + MERGE INTO) with ALTER TABLE ADD COLUMNS.
# Old files lack the new column; MERGE INTO produces partial-column files.
Expand Down
Loading