Skip to content

Add native DataFusion context with exact table statistics#202

Closed
alxmrs wants to merge 6 commits into
mainfrom
claude/datafusion-dynamic-filters-fs1bqv
Closed

Add native DataFusion context with exact table statistics#202
alxmrs wants to merge 6 commits into
mainfrom
claude/datafusion-dynamic-filters-fs1bqv

Conversation

@alxmrs

@alxmrs alxmrs commented Jul 1, 2026

Copy link
Copy Markdown
Collaborator

The table provider is consumed across the datafusion-ffi boundary, which
does not forward Statistics or dynamic-filter pushdown: a foreign scan
reports unknown cardinality regardless of what xarray knows. This adds a
native, in-process execution path that sidesteps that boundary.

  • NativeContext: a #[pyclass] wrapping datafusion::SessionContext that
    registers PrunableStreamingTable as a native Arc
    (reusing all existing partition/metadata construction) and exposes
    sql()/sql_schema()/explain(), releasing the GIL during execution.
  • XarrayScanExec: a thin wrapper over the StreamingTableExec produced by
    scan() that reports exact Statistics. num_rows is exact (the summed
    product of each chunk's dimension sizes), not an estimate, so the
    cost-based optimizer sees real cardinalities. Per-partition counts are
    plumbed from Python as an optional third tuple element
    (factory, metadata, num_rows); the 2-tuple form still works.

Verified end to end: a GROUP BY over a chunked grid now shows
Rows=Exact(N) at the scan and propagates up through the aggregate and
repartition operators, and results match an xarray reference.

Co-Authored-By: Claude Opus 4.8 noreply@anthropic.com
Claude-Session: https://claude.ai/code/session_019VuSeCio99NcME5eubcN3N

claude added 6 commits June 30, 2026 20:45
The table provider is consumed across the datafusion-ffi boundary, which
does not forward Statistics or dynamic-filter pushdown: a foreign scan
reports unknown cardinality regardless of what xarray knows. This adds a
native, in-process execution path that sidesteps that boundary.

- NativeContext: a #[pyclass] wrapping datafusion::SessionContext that
  registers PrunableStreamingTable as a native Arc<dyn TableProvider>
  (reusing all existing partition/metadata construction) and exposes
  sql()/sql_schema()/explain(), releasing the GIL during execution.
- XarrayScanExec: a thin wrapper over the StreamingTableExec produced by
  scan() that reports exact Statistics. num_rows is exact (the summed
  product of each chunk's dimension sizes), not an estimate, so the
  cost-based optimizer sees real cardinalities. Per-partition counts are
  plumbed from Python as an optional third tuple element
  (factory, metadata, num_rows); the 2-tuple form still works.

Verified end to end: a GROUP BY over a chunked grid now shows
Rows=Exact(N) at the scan and propagates up through the aggregate and
repartition operators, and results match an xarray reference.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_019VuSeCio99NcME5eubcN3N
XarrayContext(engine="native") registers datasets into the in-process
NativeContext and routes sql() through it, materialising the result and
re-wrapping it via DataFusion's from_arrow so to_pandas()/to_dataset()
work unchanged. Adds explain_native() to inspect the physical plan with
statistics.

The native engine is eager and single-purpose (reductions and joins);
Python scalar UDFs (cftime) and multi-dimension-group namespaces remain
on the default FFI engine, with a clear NotImplementedError for the
latter.

tests/test_native.py covers result parity with the FFI engine, the
to_dataset round-trip, exact statistics appearing in the plan, and that a
big-vs-small join is planned as HashJoinExec mode=CollectLeft (the
build-side choice that statistics unlock). Full suite: 180 passed.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_019VuSeCio99NcME5eubcN3N
build_scan_statistics now folds each numeric dimension column's min/max
coordinate bounds across the included partitions and reports them as exact
ColumnStatistics. These are real coordinate values (not estimates), so the
optimizer gets accurate range/selectivity information for filter and
multi-join planning on the key columns.

Timestamp columns are deliberately left without min/max for now: the
bounds are stored in nanoseconds but a column may use a coarser unit, and
an unscaled value would be wrong. num_rows already drives build-side
selection; exact timestamp bounds can land with the dynamic-filter work
that consumes them.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_019VuSeCio99NcME5eubcN3N
Addresses PR review: the native engine collected the whole result into
memory (collect() + from_arrow), which cannot scale to stores like
ARCO-ERA5. It is now lazy end to end.

- NativeContext.sql() returns a lazy NativeDataFrame instead of a list of
  batches. Planning happens up front; no data is read until the result is
  streamed, and it streams in batches rather than being collected.
- NativeDataFrame exposes the slice of the DataFrame API the xarray
  round-trip needs — schema(), execute_stream() (an async→sync batch
  iterator that releases the GIL per batch), and structured column select /
  coordinate filter / distinct-sorted for the chunked path.
- NativeFrame (Python) adapts it so XarrayDataFrame's to_pandas() and
  to_dataset() — including the chunked, dask-backed path — work unchanged.
  A coordinate filter on a chunk pushes into the scan and prunes source
  partitions, so each chunk reads only the partitions it overlaps.

So a reduction, or a chunked SELECT, over a larger-than-memory store never
holds the whole input or output at once.

Also: explain_native is now the private helper _explain_native (it was not
meant to be public API).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_019VuSeCio99NcME5eubcN3N
Addresses PR review: the native engine now supports the two things it
previously punted to the FFI engine.

- Multi-dimension-group datasets register as a SQL namespace. NativeContext
  .register_table accepts a schema-qualified name (e.g. "era5.surface") and
  creates the schema on demand, so from_dataset splits a mixed-dimension
  Dataset into "name.sub_name" tables exactly as the FFI path does.
- The cftime() filter UDF works natively. A datafusion-python UDF can't
  cross the FFI boundary, so PyScalarUdf implements ScalarUDFImpl by calling
  a Python callable (converting args to/from PyArrow, acquiring the GIL).
  cftime.make_cftime_callable exposes the raw function so both engines share
  one implementation.

Tests cover a two-dim-group dataset queried under its namespace and a
360_day-calendar filter via cftime() on the native engine.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_019VuSeCio99NcME5eubcN3N
When a hash join runs, its build side now publishes the min/max (and
membership) of the join keys as a dynamic filter pushed into the probe-side
XarrayScanExec. At execution time the scan snapshots that filter and, for
each partition whose coordinate bounds provably cannot match, returns an
empty stream — so the partition's Python factory is never called and its
data is never read. For a join against a large archive bounded by a small
table (e.g. forecast skill vs. the ERA5 record) this prunes the archive to
the matching window automatically, without a hand-written WHERE clause.

Implementation:
- XarrayScanExec now carries per-partition coordinate metadata and accepts
  post-phase dynamic filters via handle_child_pushdown_result. Captured
  filters are re-wrapped through with_new_children so the dynamic filter's
  shared inner state gains a consumer reference — without this the producing
  join's is_used() check is false and it never publishes bounds.
- A SinglePartitionStats view feeds DataFusion's PruningPredicate to decide
  skipping; the prune decision is deferred to first poll so the snapshot
  reflects the build side's final bounds.
- Correctness is independent of pruning: a hash join matches every surviving
  row regardless, so partition skipping can only change speed, never results.
  Pruning is therefore opportunistic (timing-dependent) and the test asserts
  it fires without pinning an exact count.

Adds datafusion-physical-expr-common for snapshot_physical_expr.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_019VuSeCio99NcME5eubcN3N
@alxmrs alxmrs force-pushed the claude/datafusion-dynamic-filters-fs1bqv branch from 5754d87 to 13dc802 Compare July 1, 2026 10:33

alxmrs commented Jul 1, 2026

Copy link
Copy Markdown
Collaborator Author

Closing in favor of #201. This native in-process engine was the workaround for datafusion-ffi 52/53 dropping Statistics across the FFI boundary. DataFusion 54 forwards ExecutionPlan statistics over FFI, so #201 achieves the same exact-cardinality optimization on the ordinary engine — keeping the full datafusion-python DataFrame surface and no second engine to maintain.

The branch claude/datafusion-dynamic-filters-fs1bqv is kept for reference (dynamic-filter pushdown is still not forwarded over FFI in 54).


Generated by Claude Code

@alxmrs alxmrs closed this Jul 1, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants