feat: add support for _partition metadata column#2668
Conversation
|
@advancedxy fyi |
074f903 to
caa7fb0
Compare
| /// # Arguments | ||
| /// * `partition_specs` - Iterator over all partition specs in the table | ||
| /// * `schema` - The current table schema (needed to determine result types of transforms) | ||
| pub fn compute_unified_partition_type<'a>( |
There was a problem hiding this comment.
nit: it's a bit of odd that this function is added in metadata_column.rs. Do you think it's a good idea to create partitioning.rs and put this function into partitioning.rs?
There was a problem hiding this comment.
Created crates/iceberg/src/partitioning.rs, moved the function there, registered as pub mod partitioning in lib.rs
| let mut seen_field_ids = std::collections::HashSet::new(); | ||
| let mut struct_fields: Vec<NestedFieldRef> = Vec::new(); | ||
|
|
||
| for spec in partition_specs { |
There was a problem hiding this comment.
I notice some inconsistent with java's impl:
- unknown specs are rejected in java.
- specs are sorted by spec id first(in reverse order), which means newer partition spec's field name will be picked fist.
- V1 table's void transform field is also handled in java: the partition field that was dropped later.
There was a problem hiding this comment.
Good point!
Updated the implementation (partitioning.rs) with -
- Sort specs by spec_id descending (newer field names take precedence)
- Skips Transform::Void fields (dropped partition columns)
- Deduplicates by field_id
| let partition_column_constant = | ||
| if let Some(ref unified_partition_type) = self.unified_partition_type { | ||
| let partition_spec = self | ||
| .table_metadata | ||
| .partition_spec_by_id(self.partition_spec_id); | ||
| if let Some(spec) = partition_spec { | ||
| let constant = build_partition_column_constant( | ||
| unified_partition_type, | ||
| spec, | ||
| &self.manifest_entry.data_file.partition, | ||
| )?; | ||
| Some(Arc::new(constant)) | ||
| } else { | ||
| None | ||
| } | ||
| } else { | ||
| None | ||
| }; |
There was a problem hiding this comment.
I'm not sure this is a good idea to calculate the partition column in the plan/scan phase and it adds build_partition_column_constant dep from record_batch_transformer' mod into the scan side.
There was a problem hiding this comment.
Fair point. Removed build_partition_column_constant from scan/context.rs. The scan phase only passes unified_partition_type through to the task. The actual struct constant is computed lazily at read time in pipeline.rs.
| #[serde(serialize_with = "serialize_not_implemented")] | ||
| #[serde(deserialize_with = "deserialize_not_implemented")] | ||
| #[builder(default)] | ||
| pub partition_column_constant: Option<Arc<PartitionColumnConstant>>, |
There was a problem hiding this comment.
Like the comment in https://github.com/apache/iceberg-rust/pull/2668/changes#r3448373215, I think it's better to pass the unified partition type here rather than passing the actual unified partition value. It would be easier for comet to pooling the type rather than the actual value.
BTW, this would be unnecessary if we can rebuild/access the table/metadata when reading on the executor side. Java archives this by SerializableTable and with Spark's broadcast. We don't have similar thing on the rust yet.
There was a problem hiding this comment.
FileScanTask now carries unified_partition_type: Option<Arc<StructType>> instead of partition_column_constant: Option<Arc<PartitionColumnConstant>>. The reader computes the value from type + spec + partition data.
Also removed the table_metadata field from ManifestEntryContext and ManifestFileContext (it was only used for the old precomputation path)
| let mut ids: Vec<i32> = value.identifier_field_ids.into_iter().collect(); | ||
| ids.sort_unstable(); | ||
| Some(ids) | ||
| }, |
There was a problem hiding this comment.
the changes in this file seem unrelated? And I don't think the spec requires sorting identifier field ids.
There was a problem hiding this comment.
You're right, this got left over by accident. Removed.
| (DataType::LargeBinary, Some(PrimitiveLiteral::Binary(value))) => { | ||
| Arc::new(LargeBinaryArray::from_vec(vec![value; num_rows])) | ||
| } | ||
| (DataType::LargeBinary, None) => { | ||
| let vals: Vec<Option<&[u8]>> = vec![None; num_rows]; | ||
| Arc::new(LargeBinaryArray::from_opt_vec(vals)) | ||
| } | ||
| (DataType::FixedSizeBinary(len), Some(PrimitiveLiteral::Binary(value))) => { | ||
| let repeated: Vec<&[u8]> = vec![value.as_slice(); num_rows]; | ||
| Arc::new(FixedSizeBinaryArray::try_from_iter(repeated.into_iter()).map_err(|e| { | ||
| Error::new( | ||
| ErrorKind::DataInvalid, | ||
| format!("Failed to create FixedSizeBinary({len}) array: {e}"), | ||
| ) | ||
| })?) | ||
| } | ||
| (DataType::FixedSizeBinary(len), None) => { | ||
| let repeated: Vec<Option<&[u8]>> = vec![None; num_rows]; | ||
| Arc::new(FixedSizeBinaryArray::try_from_sparse_iter_with_size(repeated.into_iter(), *len).map_err(|e| { | ||
| Error::new( | ||
| ErrorKind::DataInvalid, | ||
| format!("Failed to create null FixedSizeBinary({len}) array: {e}"), | ||
| ) | ||
| })?) | ||
| } |
There was a problem hiding this comment.
these seems not related?
Our internal integration also shows iceberg mapping Iceberg's binary to Arrow's LargeBinary though, which should also be updated.
There was a problem hiding this comment.
these seems not related?
This is needed, I think. When a table is partitioned by a Binary, UUID, or Time column, create_primitive_array_repeated must handle those Arrow types to produce the child arrays of the _partition struct.
Our internal integration also shows iceberg mapping Iceberg's binary to Arrow's LargeBinary though, which should also be updated.
the default type_to_arrow_type mapping for Iceberg's Binary type should produce LargeBinary (matching Java). However, that change affects all Binary column reads (not just _partition), so it should probably be a follow up.
The LargeBinary arm in create_primitive_array_repeated ensures we handle it correctly if/when that mapping change lands.
There was a problem hiding this comment.
This is needed, I think. When a table is partitioned by a Binary, UUID, or Time column, create_primitive_array_repeated must handle those Arrow types to produce the child arrays of the _partition struct.
thanks for the explanation.
the default type_to_arrow_type mapping for Iceberg's Binary type should produce LargeBinary (matching Java)
Actually, java maps Iceberg's Binary to Arrow's Binary at least in the java arrow reader, see https://github.com/apache/iceberg/blob/main/arrow/src/main/java/org/apache/iceberg/arrow/ArrowSchemaUtil.java#L134 and I think parquet-rs reads the binary column in parquet as binary rather than large binary as well.
However, that change affects all Binary column reads (not just _partition), so it should probably be a follow up.
Yes, it should be addressed in a follow-up.
The LargeBinary arm in create_primitive_array_repeated ensures we handle it correctly if/when that mapping change lands.
That totally makes sense.
|
|
||
| // A struct column where each child is a constant primitive value. | ||
| // Used for the _partition metadata column. | ||
| AddStructConstant { |
There was a problem hiding this comment.
hmmm. why this is not added to the constant fields? I think java's impl simply add partition's field constant map with a struct projection to mapping the task's partition data into the unified type?
There was a problem hiding this comment.
Java's approach uses ConstantVectorReader with a StructProjection to map partition data into the unified type. This works because Java's constant map can hold arbitrary values including struct projections. In iceberg-rust, the constant_fields: HashMap<i32, Datum> infrastructure only supports primitive Datum values — Datum has no struct variant. Adding struct support to Datum would be a significant separate effort (touching the spec, serde, and value layers).
The AddStructConstant column source achieves the same functional result through a different mechanism - it builds the struct array directly from the precomputed child values. The per-batch cost is the same (materialize N primitive arrays + wrap in StructArray). If Datum gains struct support in the future, we can consolidated it into the existing constant field infrastructure but for now, the separate variant keeps the change self-contained without requiring changes to the Datum type system.
There was a problem hiding this comment.
In iceberg-rust, the constant_fields: HashMap<i32, Datum> infrastructure only supports primitive Datum values — Datum has no struct variant. Adding struct support to Datum would be a significant separate effort (touching the spec, serde, and value layers).
I see the trade off now. I think it's better to choose your approach now to avoid touching too much internals.
However, I think iceberg-rust already has Struct literal in crates/iceberg/src/spec/values/literal.rs, we can use that to replace the child_values?
And BTW, taking a step forward, the AddStructConstant is essentially the same as
Add {
target_type: DataType,
value: Option<PrimitiveLiteral>,
},which should also be refactor to use Option<Literal> in value.
Maybe, we can create a follow-up issue now to refactor these operations. And we can go with your approach for now.
There was a problem hiding this comment.
Thanks for logging the issue.
I was wondering if we can simply use Struct type for child _values? It's not required, I'm ok to do that in follow-up.
|
@parthchandra thanks for pinging me and working on this. I think I'm concerned that the unified partition value is carried in the file scan task, which seems a bit of odd. |
caa7fb0 to
33b690a
Compare
| schema: &Schema, | ||
| ) -> Result<StructType> { | ||
| let mut seen_field_ids = std::collections::HashSet::new(); | ||
| let mut struct_fields: Vec<NestedFieldRef> = Vec::new(); |
There was a problem hiding this comment.
nit: should unknown transform should be handled too?
|
@parthchandra I did another round and left some follow-up comments. I think it's in good shape overall. |
|
Comet generates the FileScanTasks from Iceberg Java. Does Iceberg Java provide |
Good point! Iceberg Java doesn't provide unified_partition_type on individual FileScanTasks — Java computes it at a higher level via Partitioning.partitionType(table) We can do the equivalent in Comet - https://github.com/parthchandra/datafusion-comet/blob/iceberg-metadata-columns/native/core/src/execution/planner.rs#L3409-L3440 |
| { | ||
| let struct_type = DataType::Struct(pc.fields.clone()); | ||
| let nullable = pc.fields.is_empty(); | ||
| let arrow_field = Field::new("_partition", struct_type, nullable) |
There was a problem hiding this comment.
nit: should use RESERVED_COL_NAME_PARTITION in metadata_columns directly.
| } | ||
|
|
||
| // Skip void transforms (dropped partition columns) and unknown transforms | ||
| if matches!(field.transform, Transform::Void | Transform::Unknown) { |
There was a problem hiding this comment.
hmm. I don't think this is consistent with iceberg-java.
Unknown transform should be checked rather than ignored.
Considering a new transform introduced in iceberg-spec but not adapted by iceberg-rust yet should be considered as unknown transform.
Are you able to demonstrate this feature working in Comet? Usually when I bring an enhancement to iceberg-rust motivated by Comet usage, I create a draft PR on the Comet repo with tests and the feature built out (using the PR branch here) to demonstrate it works with the Iceberg Java suites. |
|
Thanks @advancedxy @mbutrovich I'm on the road atm, will address these in a few days. |
4df3d67 to
8da37c9
Compare
mbutrovich
left a comment
There was a problem hiding this comment.
Thanks for this @parthchandra! Computing the unified type once per scan and threading it through the context is a nice shape, and matching Java's "empty partition struct becomes null" choice is a good call for engine compatibility. A few things I'd like to talk through before it lands, mostly around matching Partitioning.partitionType exactly and trimming some surface area:
- There looks to be an ordering dependency on #2695 (the
partition_specwiring). Could you confirm? See thecontext.rsnote. - Two spots where I think we may diverge from
Partitioning.partitionType(dropped source column; result field ordering), flagged as questions inline. - Some of the array construction could lean on arrow-rs helpers, plus a couple of API-surface trims.
- Coverage is identity-transform-only right now; I listed the cases I'd find most reassuring.
Re: tests:
The two new unit tests are identity-only. Cases that would most increase my confidence:
- non-identity transform (
bucket/truncate), confirming the transformed partition value lands in_partition, consistent withPartitionUtil.constantsMapreadingpartitionData.get(pos); - unpartitioned table end-to-end (exercises the empty-struct-to-null branch);
- explicit null partition value (distinct from evolution-missing);
_partitionselected alongside_file/_pos/_spec_idin one scan, since they share the projection/transform path.
|
|
||
| seen_field_ids.insert(field.field_id); | ||
|
|
||
| let source_field = schema.field_by_id(field.source_id).ok_or_else(|| { |
There was a problem hiding this comment.
Java's Partitioning.partitionType only considers allActiveFieldIds, which filters out fields whose source column is gone (schema.findField(field.sourceId()) != null in Partitioning.java). Here we return ErrorKind::Unexpected instead. If I'm reading both right, a table that dropped a column it used to partition on (allowed under https://iceberg.apache.org/spec/#partition-evolution) would error the whole scan when _partition is projected. Would continue-ing on a missing source column match Java's intent better? A test that drops a partition source column would settle it either way.
| } | ||
| } | ||
|
|
||
| Ok(StructType::new(struct_fields)) |
There was a problem hiding this comment.
buildPartitionProjectionType sorts its output fields by id ascending (fieldMap.keySet().stream().sorted(Comparator.naturalOrder()) in Partitioning.java). Here we emit them in spec-descending, first-seen order. In the common case these coincide, but under partition evolution I think they can differ, and since the _partition struct schema is consumed by engines (this is the Comet path), matching Java's ordering seems safer. Worth a sort plus an evolution test that would otherwise reorder? Curious whether you already considered this and decided it's fine.
| } | ||
|
|
||
| // Skip void transforms (dropped partition columns) | ||
| if matches!(field.transform, Transform::Void) { |
There was a problem hiding this comment.
Void/unknown handling was already discussed and implemented, so I won't re-litigate it. One residual nuance only if you're interested: because we continue on Void before inserting into seen_field_ids, if the newest spec marks a field Void and an older spec has it non-Void, the field is still included (good, matches Java's type-upgrade intent), but its name then comes from the older spec, whereas Java prefers the newest spec's name (Partitioning.java populates nameMap from the first/newest spec that defines the id). Minor and possibly fine as-is; a void-in-newest test would pin whichever behavior you intend.
| let vals: Vec<Option<&[u8]>> = vec![None; num_rows]; | ||
| Arc::new(BinaryArray::from_opt_vec(vals)) | ||
| } | ||
| (DataType::LargeBinary, Some(PrimitiveLiteral::Binary(value))) => { |
There was a problem hiding this comment.
@advancedxy already questioned whether these arms are related; you explained they're needed for _partition struct children and logged the Binary-to-LargeBinary mapping as #2698. This is a separate, reuse-only angle on the same lines, not a relevance question.
I'd push for using arrow-rs here rather than growing our own array-construction match. The hand-rolled arms are duplicate code we own forever: every Arrow type a partition column can take is another arm, and a missing one is a runtime error (the (dt, _) catch-all), not a compile error. arrow-rs already maintains correct, exhaustive versions:
arrow_array::new_null_array(dt, len)builds an all-null array for anyDataType, recursively forStruct(arrow-array/src/array/mod.rs:1020; struct pathstruct_array.rs:192). This replaces every(dt, None) => ...arm and the(DataType::Struct(fields), None)arm.PrimitiveArray::from_value(v, n)(arrow-array/src/array/primitive_array.rs:808) for the primitiveSomearms.GenericByteArray::new_repeated(v, n)(arrow-array/src/array/byte_array.rs:198) for String/Binary/LargeBinary.
Delegating to these shrinks the match substantially and means new partition types are covered by arrow-rs instead of by us adding (and testing) another arm. If there's a reason to keep the explicit arms (e.g. a type arrow-rs handles differently), worth a comment saying so; otherwise I'd lean on the library. A test driving _partition through decimal/timestamptz/uuid would also turn any remaining gap into a test failure rather than a read-time error. (Could fold into #2698.)
| } | ||
| } | ||
|
|
||
| fn create_struct_column( |
There was a problem hiding this comment.
create_struct_column is hand-rolled struct assembly that arrow-rs already provides: the empty/all-null path is StructArray::new_null(fields, len) / new_null_array (arrow-array/src/array/struct_array.rs:192), and the populated path is StructArray::try_new(fields, arrays, nulls), which validates fields.len() == arrays.len() and child-length agreement (struct_array.rs:106). I'd reuse those rather than maintain our own constructor: try_new also enforces the fields/child_values length invariant that the current zip silently truncates. Same principle as the value.rs note: prefer the library's validated constructors over duplicating them here.
| partition_specs: impl Iterator<Item = &'a PartitionSpec>, | ||
| schema: &Schema, | ||
| ) -> Result<StructType> { | ||
| let mut seen_field_ids = std::collections::HashSet::new(); |
There was a problem hiding this comment.
Small consistency thing: use std::collections::HashSet; / use std::cmp::Reverse; up top matches the surrounding style.
| let struct_type = DataType::Struct(pc.fields.clone()); | ||
| let nullable = pc.fields.is_empty(); | ||
| let arrow_field = | ||
| Field::new(RESERVED_COL_NAME_PARTITION, struct_type, nullable) |
There was a problem hiding this comment.
This exact block recurs several times across these PRs. A small fn field_with_id(name, dt, nullable, field_id) -> Arc<Field> helper would remove the repetition and keep the field-id metadata format in one place.
| @@ -138,6 +142,7 @@ impl ManifestEntryContext { | |||
| // TODO: Pass actual PartitionSpec through context chain for native flow | |||
| .with_partition_spec(None) | |||
There was a problem hiding this comment.
build_partition_column_constant needs task.partition_spec to be Some for partitioned tables, but this PR keeps partition_spec(None); #2695 is the one that wires the real spec through context.rs. Is the plan to rebase on #2695? If so, an end-to-end test asserting non-null partition values would confirm the wiring once rebased.
| .transpose()? | ||
| .map(Arc::new); | ||
|
|
||
| // Compute unified partition type if _partition is projected |
There was a problem hiding this comment.
Gating on "_partition projected" is a nice touch. Java builds the type from table.specs().values() (all specs). Could we confirm partition_specs_iter() enumerates every historical spec, not just current, and ideally pin it with a multi-spec test? That is what makes evolution visible in _partition.
| /// materialize the `_partition` struct column at read time. | ||
| #[serde(default)] | ||
| #[serde(skip_serializing_if = "Option::is_none")] | ||
| #[serde(serialize_with = "serialize_not_implemented")] |
There was a problem hiding this comment.
This connects to your own thread on the PR (Comet doesn't get unified_partition_type from Iceberg Java, and parthchandra's plan is for Comet to compute the equivalent, per the linked planner.rs). Independent of that: the serde attrs here match the existing partition / partition_spec / name_mapping fields, so it's consistent, not a regression. The implication worth one line in the PR text: this field (like the others) does not round-trip through serde, so the _partition machinery works in the native scan flow only. Since Comet builds tasks natively that's fine, just easy for a future reader to assume otherwise.
I've opened a draft PR in Comet: apache/datafusion-comet#4752 based on the same branch I referenced above. All existing tests pass. Working on addressing your newer comments (thanks for the detailed review!). |
Which issue does this PR close?
What changes are included in this PR?
Implements the _partition metadata column for table scans. This is a struct column whose type is the union of all partition fields across all partition specs (handling partition evolution). Each row gets the
partition values for its data file.
Are these changes tested?
Because we do not have write support yet, I made the corresponding change to comet and then tested by adding tests in Comet which uses iceberg-java to write files and then iceberg-rust to read them back.
https://github.com/parthchandra/datafusion-comet/blob/iceberg-metadata-columns/spark/src/test/resources/sql-tests/iceberg/metadata_column_partition.sql