Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
172 changes: 172 additions & 0 deletions crates/integration_tests/tests/read_tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,29 @@ fn extract_id_name_dt(batches: &[RecordBatch]) -> Vec<(i32, String, String)> {
rows
}

/// Flattens `batches` into one string-rendered row per record, sorted.
///
/// Each cell is rendered as follows:
/// * a null value becomes the literal `"NULL"`;
/// * an `Int32Array` or `StringArray` value is rendered via `to_string()`.
///
/// The returned rows are sorted with `Vec::sort`, i.e. lexicographically on
/// the rendered strings (not numerically), so the result does not depend on
/// the order in which batches or rows were produced.
///
/// # Panics
///
/// Panics with `unsupported column type for <field name>` when a non-null
/// cell belongs to a column that is neither `Int32Array` nor `StringArray`.
fn collect_rows_as_strings(batches: &[RecordBatch]) -> Vec<Vec<String>> {
    let mut rendered_rows = Vec::new();
    for batch in batches {
        // One schema handle per batch is enough; it only supplies field names
        // for the panic message below.
        let schema = batch.schema();
        rendered_rows.extend((0..batch.num_rows()).map(|row_index| {
            schema
                .fields()
                .iter()
                .zip(batch.columns())
                .map(|(field, column)| {
                    // The null check comes first so that nulls in any column
                    // type render as "NULL" without needing a downcast.
                    if column.is_null(row_index) {
                        return "NULL".to_string();
                    }
                    let any = column.as_any();
                    if let Some(ints) = any.downcast_ref::<Int32Array>() {
                        ints.value(row_index).to_string()
                    } else if let Some(strings) = any.downcast_ref::<StringArray>() {
                        strings.value(row_index).to_string()
                    } else {
                        panic!("unsupported column type for {}", field.name())
                    }
                })
                .collect::<Vec<String>>()
        }));
    }
    rendered_rows.sort();
    rendered_rows
}

fn extract_plan_partitions(plan: &Plan) -> HashSet<String> {
plan.splits()
.iter()
Expand Down Expand Up @@ -2372,6 +2395,155 @@ async fn test_time_travel_by_tag_name() {
);
}

/// Reads the `time_travel_schema_evolution` table at each tagged version and
/// checks both the schema and the data visible at that tag.
///
/// For every `(tag, expected_fields, expected_rows)` case the test:
/// 1. time-travels by passing the tag name as the `scan.version` option,
/// 2. plans and reads all splits into Arrow record batches,
/// 3. compares the field names of the first batch with `expected_fields`
///    (order matters),
/// 4. compares the rows rendered by `collect_rows_as_strings` with
///    `expected_rows`.
///
/// `collect_rows_as_strings` returns its rows sorted lexicographically, so
/// each `expected_rows` table below must be listed in that same order.
#[tokio::test]
async fn time_travel_schema_evolution() {
    let catalog = create_file_system_catalog();
    let table = get_table_from_catalog(&catalog, "time_travel_schema_evolution").await;

    for (tag, expected_fields, expected_rows) in [
        (
            "before_add_column",
            vec!["id", "name"],
            vec![vec!["1", "alice"], vec!["2", "bob"]],
        ),
        (
            "after_add_column",
            vec!["id", "name", "age"],
            vec![
                vec!["1", "alice", "NULL"],
                vec!["2", "bob", "NULL"],
                vec!["3", "carol", "30"],
                vec!["4", "dave", "40"],
            ],
        ),
        (
            "before_rename",
            vec!["id", "name", "age"],
            vec![
                vec!["1", "alice", "NULL"],
                vec!["2", "bob", "NULL"],
                vec!["3", "carol", "30"],
                vec!["4", "dave", "40"],
            ],
        ),
        (
            "after_rename",
            vec!["id", "full_name", "age"],
            vec![
                vec!["1", "alice", "NULL"],
                vec!["2", "bob", "NULL"],
                vec!["3", "carol", "30"],
                vec!["4", "dave", "40"],
                vec!["5", "erin", "50"],
                vec!["6", "frank", "60"],
            ],
        ),
        (
            "before_drop",
            vec!["id", "full_name", "age"],
            vec![
                vec!["1", "alice", "NULL"],
                vec!["2", "bob", "NULL"],
                vec!["3", "carol", "30"],
                vec!["4", "dave", "40"],
                vec!["5", "erin", "50"],
                vec!["6", "frank", "60"],
            ],
        ),
        (
            "after_drop",
            vec!["id", "full_name"],
            vec![
                vec!["1", "alice"],
                vec!["2", "bob"],
                vec!["3", "carol"],
                vec!["4", "dave"],
                vec!["5", "erin"],
                vec!["6", "frank"],
                vec!["7", "grace"],
                vec!["8", "hank"],
            ],
        ),
        (
            "before_reorder",
            vec!["id", "full_name"],
            vec![
                vec!["1", "alice"],
                vec!["2", "bob"],
                vec!["3", "carol"],
                vec!["4", "dave"],
                vec!["5", "erin"],
                vec!["6", "frank"],
                vec!["7", "grace"],
                vec!["8", "hank"],
            ],
        ),
        (
            "after_reorder",
            vec!["full_name", "id"],
            vec![
                vec!["alice", "1"],
                vec!["bob", "2"],
                vec!["carol", "3"],
                vec!["dave", "4"],
                vec!["erin", "5"],
                vec!["frank", "6"],
                vec!["grace", "7"],
                vec!["hank", "8"],
                vec!["ivy", "9"],
                vec!["jane", "10"],
            ],
        ),
    ] {
        let versioned_table = table
            .copy_with_time_travel(HashMap::from([(
                "scan.version".to_string(),
                tag.to_string(),
            )]))
            .await
            .unwrap_or_else(|err| panic!("failed to time travel to tag {tag}: {err}"));
        let read_builder = versioned_table.new_read_builder();
        let plan = read_builder
            .new_scan()
            .plan()
            .await
            .unwrap_or_else(|err| panic!("failed to plan tag {tag}: {err}"));
        let read = read_builder
            .new_read()
            .unwrap_or_else(|err| panic!("failed to create read for tag {tag}: {err}"));
        let batches: Vec<RecordBatch> = read
            .to_arrow(plan.splits())
            .unwrap_or_else(|err| panic!("failed to create stream for tag {tag}: {err}"))
            .try_collect()
            .await
            .unwrap_or_else(|err| panic!("failed to collect tag {tag}: {err}"));

        // The schema is taken from the first batch, so an empty result is a
        // failure in its own right. Name the tag, like every other failure
        // path in this loop, so the offending version is identifiable.
        let schema = batches
            .first()
            .unwrap_or_else(|| panic!("time-travel tag {tag} should return rows"))
            .schema();
        let field_names: Vec<&str> = schema
            .fields()
            .iter()
            .map(|field| field.name().as_str())
            .collect();
        assert_eq!(
            field_names, expected_fields,
            "unexpected schema for tag {tag}"
        );

        // Convert the borrowed literals to owned strings so they compare
        // against the rendered rows.
        let expected_rows: Vec<Vec<String>> = expected_rows
            .into_iter()
            .map(|row| row.into_iter().map(str::to_string).collect())
            .collect();
        assert_eq!(
            collect_rows_as_strings(&batches),
            expected_rows,
            "unexpected rows for tag {tag}"
        );
    }
}

#[tokio::test]
async fn test_time_travel_conflicting_selectors_fail() {
let catalog = create_file_system_catalog();
Expand Down
151 changes: 151 additions & 0 deletions crates/integrations/datafusion/tests/read_tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,23 @@ async fn assert_sql_rows(sql: &str, expected: &[&[&str]]) {
assert_eq!(actual, expected, "unexpected result for SQL: {sql}");
}

/// Queries `paimon.default.time_travel_schema_evolution` at the version named
/// by `tag` and asserts the result via `assert_sql_rows`.
///
/// * `tag` - value placed in the `VERSION AS OF '<tag>'` clause.
/// * `select_list` - projection text inserted after `SELECT`.
/// * `where_clause` - optional predicate; when present it is inserted as
///   `WHERE <predicate>` before the ordering clause.
/// * `expected` - rows handed to `assert_sql_rows` for comparison.
///
/// The query always ends with `ORDER BY id`, so `id` has to be resolvable at
/// the selected version.
async fn assert_time_travel_schema_evolution_rows(
    tag: &str,
    select_list: &str,
    where_clause: Option<&str>,
    expected: &[&[&str]],
) {
    let sql = where_clause.map_or_else(
        || {
            format!(
                "SELECT {select_list} FROM paimon.default.time_travel_schema_evolution VERSION AS OF '{tag}' ORDER BY id"
            )
        },
        |where_clause| {
            format!(
                "SELECT {select_list} FROM paimon.default.time_travel_schema_evolution VERSION AS OF '{tag}' WHERE {where_clause} ORDER BY id"
            )
        },
    );
    assert_sql_rows(&sql, expected).await;
}

async fn create_physical_plan(sql: &str) -> datafusion::error::Result<Arc<dyn ExecutionPlan>> {
let ctx = create_context().await;
ctx.sql(sql).await?.create_physical_plan().await
Expand Down Expand Up @@ -657,6 +674,140 @@ async fn test_time_travel_by_tag_name() {
);
}

/// Queries the `time_travel_schema_evolution` table through SQL at each tagged
/// version, with and without a filter, and checks the returned rows.
///
/// Every case is run through `assert_time_travel_schema_evolution_rows`, one
/// after another, in the order listed.
#[tokio::test]
async fn time_travel_schema_evolution() {
    // Each entry: (tag, select list, optional WHERE predicate, expected rows).
    let cases: &[(&str, &str, Option<&str>, &[&[&str]])] = &[
        (
            "before_add_column",
            "id, name",
            None,
            &[&["1", "alice"], &["2", "bob"]],
        ),
        (
            "before_add_column",
            "id, name",
            Some("id = 2"),
            &[&["2", "bob"]],
        ),
        (
            "after_add_column",
            "id, name, age",
            None,
            &[
                &["1", "alice", "NULL"],
                &["2", "bob", "NULL"],
                &["3", "carol", "30"],
                &["4", "dave", "40"],
            ],
        ),
        (
            "after_add_column",
            "id, name, age",
            Some("age IS NOT NULL"),
            &[&["3", "carol", "30"], &["4", "dave", "40"]],
        ),
        (
            "before_rename",
            "id, name, age",
            Some("id >= 3"),
            &[&["3", "carol", "30"], &["4", "dave", "40"]],
        ),
        (
            "after_rename",
            "id, full_name, age",
            None,
            &[
                &["1", "alice", "NULL"],
                &["2", "bob", "NULL"],
                &["3", "carol", "30"],
                &["4", "dave", "40"],
                &["5", "erin", "50"],
                &["6", "frank", "60"],
            ],
        ),
        (
            "after_rename",
            "id, full_name, age",
            Some("full_name LIKE 'f%'"),
            &[&["6", "frank", "60"]],
        ),
        (
            "before_drop",
            "id, full_name, age",
            Some("age >= 50"),
            &[&["5", "erin", "50"], &["6", "frank", "60"]],
        ),
        (
            "after_drop",
            "id, full_name",
            None,
            &[
                &["1", "alice"],
                &["2", "bob"],
                &["3", "carol"],
                &["4", "dave"],
                &["5", "erin"],
                &["6", "frank"],
                &["7", "grace"],
                &["8", "hank"],
            ],
        ),
        (
            "after_drop",
            "id, full_name",
            Some("id > 6"),
            &[&["7", "grace"], &["8", "hank"]],
        ),
        (
            "before_reorder",
            "id, full_name",
            Some("id = 8"),
            &[&["8", "hank"]],
        ),
        (
            "after_reorder",
            "id, full_name",
            None,
            &[
                &["1", "alice"],
                &["2", "bob"],
                &["3", "carol"],
                &["4", "dave"],
                &["5", "erin"],
                &["6", "frank"],
                &["7", "grace"],
                &["8", "hank"],
                &["9", "ivy"],
                &["10", "jane"],
            ],
        ),
        (
            "after_reorder",
            "id, full_name",
            Some("id >= 9"),
            &[&["9", "ivy"], &["10", "jane"]],
        ),
    ];

    for &(tag, select_list, where_clause, expected) in cases {
        assert_time_travel_schema_evolution_rows(tag, select_list, where_clause, expected).await;
    }
}

#[tokio::test]
async fn test_time_travel_conflicting_selectors_fail() {
// When both scan.version and scan.timestamp-millis are set on the same
Expand Down
Loading
Loading