diff --git a/crates/integration_tests/tests/read_tables.rs b/crates/integration_tests/tests/read_tables.rs index 90f16cb5..7f945e0a 100644 --- a/crates/integration_tests/tests/read_tables.rs +++ b/crates/integration_tests/tests/read_tables.rs @@ -182,6 +182,29 @@ fn extract_id_name_dt(batches: &[RecordBatch]) -> Vec<(i32, String, String)> { rows } +fn collect_rows_as_strings(batches: &[RecordBatch]) -> Vec> { + let mut rows = Vec::new(); + for batch in batches { + for row_index in 0..batch.num_rows() { + let mut row = Vec::with_capacity(batch.num_columns()); + for (field, column) in batch.schema().fields().iter().zip(batch.columns()) { + if column.is_null(row_index) { + row.push("NULL".to_string()); + } else if let Some(values) = column.as_any().downcast_ref::() { + row.push(values.value(row_index).to_string()); + } else if let Some(values) = column.as_any().downcast_ref::() { + row.push(values.value(row_index).to_string()); + } else { + panic!("unsupported column type for {}", field.name()); + } + } + rows.push(row); + } + } + rows.sort(); + rows +} + fn extract_plan_partitions(plan: &Plan) -> HashSet { plan.splits() .iter() @@ -2372,6 +2395,155 @@ async fn test_time_travel_by_tag_name() { ); } +#[tokio::test] +async fn time_travel_schema_evolution() { + let catalog = create_file_system_catalog(); + let table = get_table_from_catalog(&catalog, "time_travel_schema_evolution").await; + + for (tag, expected_fields, expected_rows) in [ + ( + "before_add_column", + vec!["id", "name"], + vec![vec!["1", "alice"], vec!["2", "bob"]], + ), + ( + "after_add_column", + vec!["id", "name", "age"], + vec![ + vec!["1", "alice", "NULL"], + vec!["2", "bob", "NULL"], + vec!["3", "carol", "30"], + vec!["4", "dave", "40"], + ], + ), + ( + "before_rename", + vec!["id", "name", "age"], + vec![ + vec!["1", "alice", "NULL"], + vec!["2", "bob", "NULL"], + vec!["3", "carol", "30"], + vec!["4", "dave", "40"], + ], + ), + ( + "after_rename", + vec!["id", "full_name", "age"], + vec![ + vec!["1", "alice", "NULL"], + vec!["2", "bob", "NULL"], + vec!["3", "carol", "30"], + vec!["4", "dave", "40"], + vec!["5", "erin", "50"], + vec!["6", "frank", "60"], + ], + ), + ( + "before_drop", + vec!["id", "full_name", "age"], + vec![ + vec!["1", "alice", "NULL"], + vec!["2", "bob", "NULL"], + vec!["3", "carol", "30"], + vec!["4", "dave", "40"], + vec!["5", "erin", "50"], + vec!["6", "frank", "60"], + ], + ), + ( + "after_drop", + vec!["id", "full_name"], + vec![ + vec!["1", "alice"], + vec!["2", "bob"], + vec!["3", "carol"], + vec!["4", "dave"], + vec!["5", "erin"], + vec!["6", "frank"], + vec!["7", "grace"], + vec!["8", "hank"], + ], + ), + ( + "before_reorder", + vec!["id", "full_name"], + vec![ + vec!["1", "alice"], + vec!["2", "bob"], + vec!["3", "carol"], + vec!["4", "dave"], + vec!["5", "erin"], + vec!["6", "frank"], + vec!["7", "grace"], + vec!["8", "hank"], + ], + ), + ( + "after_reorder", + vec!["full_name", "id"], + vec![ + vec!["alice", "1"], + vec!["bob", "2"], + vec!["carol", "3"], + vec!["dave", "4"], + vec!["erin", "5"], + vec!["frank", "6"], + vec!["grace", "7"], + vec!["hank", "8"], + vec!["ivy", "9"], + vec!["jane", "10"], + ], + ), + ] { + let versioned_table = table + .copy_with_time_travel(HashMap::from([( + "scan.version".to_string(), + tag.to_string(), + )])) + .await + .unwrap_or_else(|err| panic!("failed to time travel to tag {tag}: {err}")); + let read_builder = versioned_table.new_read_builder(); + let plan = read_builder + .new_scan() + .plan() + .await + .unwrap_or_else(|err| panic!("failed to plan tag {tag}: {err}")); + let read = read_builder + .new_read() + .unwrap_or_else(|err| panic!("failed to create read for tag {tag}: {err}")); + let batches: Vec = read + .to_arrow(plan.splits()) + .unwrap_or_else(|err| panic!("failed to create stream for tag {tag}: {err}")) + .try_collect() + .await + .unwrap_or_else(|err| panic!("failed to collect tag {tag}: {err}")); + + let schema = batches + .first() + .expect("time-travel tag should return rows") + .schema(); + let field_names: Vec<&str> = schema + .fields() + .iter() + .map(|field| field.name().as_str()) + .collect(); + assert_eq!( + field_names, expected_fields, + "unexpected schema for tag {tag}" + ); + + let expected_rows: Vec> = expected_rows + .into_iter() + .map(|row| row.into_iter().map(str::to_string).collect()) + .collect(); + assert_eq!( + collect_rows_as_strings(&batches), + expected_rows, + "unexpected rows for tag {tag}" + ); + } +} + #[tokio::test] async fn test_time_travel_conflicting_selectors_fail() { let catalog = create_file_system_catalog(); diff --git a/crates/integrations/datafusion/tests/read_tables.rs b/crates/integrations/datafusion/tests/read_tables.rs index f0b45bbb..9c70255c 100644 --- a/crates/integrations/datafusion/tests/read_tables.rs +++ b/crates/integrations/datafusion/tests/read_tables.rs @@ -129,6 +129,23 @@ async fn assert_sql_rows(sql: &str, expected: &[&[&str]]) { assert_eq!(actual, expected, "unexpected result for SQL: {sql}"); } +async fn assert_time_travel_schema_evolution_rows( + tag: &str, + select_list: &str, + where_clause: Option<&str>, + expected: &[&[&str]], +) { + let sql = match where_clause { + Some(where_clause) => format!( + "SELECT {select_list} FROM paimon.default.time_travel_schema_evolution VERSION AS OF '{tag}' WHERE {where_clause} ORDER BY id" + ), + None => format!( + "SELECT {select_list} FROM paimon.default.time_travel_schema_evolution VERSION AS OF '{tag}' ORDER BY id" + ), + }; + assert_sql_rows(&sql, expected).await; +} + async fn create_physical_plan(sql: &str) -> datafusion::error::Result> { let ctx = create_context().await; ctx.sql(sql).await?.create_physical_plan().await @@ -657,6 +674,140 @@ async fn test_time_travel_by_tag_name() { ); } +#[tokio::test] +async fn time_travel_schema_evolution() { + assert_time_travel_schema_evolution_rows( + "before_add_column", + "id, name", + None, + &[&["1", "alice"], &["2", "bob"]], + ) + .await; + assert_time_travel_schema_evolution_rows( + "before_add_column", + "id, name", + Some("id = 2"), + &[&["2", "bob"]], + ) + .await; + + assert_time_travel_schema_evolution_rows( + "after_add_column", + "id, name, age", + None, + &[ + &["1", "alice", "NULL"], + &["2", "bob", "NULL"], + &["3", "carol", "30"], + &["4", "dave", "40"], + ], + ) + .await; + assert_time_travel_schema_evolution_rows( + "after_add_column", + "id, name, age", + Some("age IS NOT NULL"), + &[&["3", "carol", "30"], &["4", "dave", "40"]], + ) + .await; + + assert_time_travel_schema_evolution_rows( + "before_rename", + "id, name, age", + Some("id >= 3"), + &[&["3", "carol", "30"], &["4", "dave", "40"]], + ) + .await; + + assert_time_travel_schema_evolution_rows( + "after_rename", + "id, full_name, age", + None, + &[ + &["1", "alice", "NULL"], + &["2", "bob", "NULL"], + &["3", "carol", "30"], + &["4", "dave", "40"], + &["5", "erin", "50"], + &["6", "frank", "60"], + ], + ) + .await; + assert_time_travel_schema_evolution_rows( + "after_rename", + "id, full_name, age", + Some("full_name LIKE 'f%'"), + &[&["6", "frank", "60"]], + ) + .await; + + assert_time_travel_schema_evolution_rows( + "before_drop", + "id, full_name, age", + Some("age >= 50"), + &[&["5", "erin", "50"], &["6", "frank", "60"]], + ) + .await; + + assert_time_travel_schema_evolution_rows( + "after_drop", + "id, full_name", + None, + &[ + &["1", "alice"], + &["2", "bob"], + &["3", "carol"], + &["4", "dave"], + &["5", "erin"], + &["6", "frank"], + &["7", "grace"], + &["8", "hank"], + ], + ) + .await; + assert_time_travel_schema_evolution_rows( + "after_drop", + "id, full_name", + Some("id > 6"), + &[&["7", "grace"], &["8", "hank"]], + ) + .await; + + assert_time_travel_schema_evolution_rows( + "before_reorder", + "id, full_name", + Some("id = 8"), + &[&["8", "hank"]], + ) + .await; + + assert_time_travel_schema_evolution_rows( + "after_reorder", + "id, full_name", + None, + &[ + &["1", "alice"], + &["2", "bob"], + &["3", "carol"], + &["4", "dave"], + &["5", "erin"], + &["6", "frank"], + &["7", "grace"], + &["8", "hank"], + &["9", "ivy"], + &["10", "jane"], + ], + ) + .await; + assert_time_travel_schema_evolution_rows( + "after_reorder", + "id, full_name", + Some("id >= 9"), + &[&["9", "ivy"], &["10", "jane"]], + ) + .await; +} + #[tokio::test] async fn test_time_travel_conflicting_selectors_fail() { // When both scan.version and scan.timestamp-millis are set on the same diff --git a/dev/spark/provision.py b/dev/spark/provision.py index 2222d0d2..a10a8ccd 100644 --- a/dev/spark/provision.py +++ b/dev/spark/provision.py @@ -21,6 +21,7 @@ # for paimon-rust integration tests to read. import shutil +import time from pathlib import Path from urllib.parse import unquote, urlparse @@ -52,10 +53,17 @@ def _reset_warehouse_dir(warehouse_path: Path) -> None: warehouse_path.mkdir(parents=True, exist_ok=True) for child in warehouse_path.iterdir(): - if child.is_symlink() or child.is_file(): - child.unlink() - else: - shutil.rmtree(child) + for attempt in range(3): + try: + if child.is_symlink() or child.is_file(): + child.unlink() + else: + shutil.rmtree(child) + break + except OSError: + if attempt == 2: + raise + time.sleep(0.1) def main(): @@ -330,6 +338,69 @@ def main(): spark.sql("CALL sys.create_tag('default.time_travel_table', 'snapshot1', 1)") spark.sql("CALL sys.create_tag('default.time_travel_table', 'snapshot2', 2)") + # ===== Time travel + schema evolution table ===== + # Each tag points at a stable schema boundary. Tests read by tag name rather + # than depending on global warehouse snapshot numbering. + spark.sql( + """ + CREATE TABLE IF NOT EXISTS time_travel_schema_evolution ( + id INT, + name STRING + ) USING paimon + """ + ) + spark.sql( + """ + INSERT INTO time_travel_schema_evolution VALUES + (1, 'alice'), + (2, 'bob') + """ + ) + spark.sql("CALL sys.create_tag('default.time_travel_schema_evolution', 'before_add_column', 1)") + + spark.sql("ALTER TABLE time_travel_schema_evolution ADD COLUMNS (age INT)") + spark.sql( + """ + INSERT INTO time_travel_schema_evolution VALUES + (3, 'carol', 30), + (4, 'dave', 40) + """ + ) + spark.sql("CALL sys.create_tag('default.time_travel_schema_evolution', 'after_add_column', 2)") + spark.sql("CALL sys.create_tag('default.time_travel_schema_evolution', 'before_rename', 2)") + + spark.sql("ALTER TABLE time_travel_schema_evolution RENAME COLUMN name TO full_name") + spark.sql( + """ + INSERT INTO time_travel_schema_evolution VALUES + (5, 'erin', 50), + (6, 'frank', 60) + """ + ) + spark.sql("CALL sys.create_tag('default.time_travel_schema_evolution', 'after_rename', 3)") + spark.sql("CALL sys.create_tag('default.time_travel_schema_evolution', 'before_drop', 3)") + + spark.sql("ALTER TABLE time_travel_schema_evolution DROP COLUMN age") + spark.sql( + """ + INSERT INTO time_travel_schema_evolution VALUES + (7, 'grace'), + (8, 'hank') + """ + ) + spark.sql("CALL sys.create_tag('default.time_travel_schema_evolution', 'after_drop', 4)") + spark.sql("CALL sys.create_tag('default.time_travel_schema_evolution', 'before_reorder', 4)") + + spark.sql("ALTER TABLE time_travel_schema_evolution ALTER COLUMN full_name FIRST") + spark.sql( + """ + INSERT INTO time_travel_schema_evolution VALUES + ('ivy', 9), + ('jane', 10) + """ + ) + spark.sql("CALL sys.create_tag('default.time_travel_schema_evolution', 'after_reorder', 5)") + # ===== Schema Evolution: Add Column ===== # Old files have (id, name); after ALTER TABLE ADD COLUMNS, new files have (id, name, age). # Reader must fill nulls for 'age' when reading old files.