From 3afbc7d8caee6234ba88d6e42b95873c840e367a Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Sun, 17 May 2026 12:41:09 -0400 Subject: [PATCH 1/4] feat: accept distinct kwarg on sum and avg MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Upstream exposes `sum_distinct` / `avg_distinct` / `count_distinct` as sibling functions that call the same underlying UDAF with `distinct: bool = true`. The Rust binding side already routes `distinct=Some(true)` through the aggregate builder for `sum`, `avg`, and `count` — but only `count` exposed the kwarg on the Python wrapper. Add `distinct: bool = False` to `sum()` and `avg()` mirroring the existing `count()` signature, and update SKILL.md so the check-upstream audit does not re-flag the three upstream `*_distinct` shortcuts as gaps. The plan emitted by `sum(col, distinct=True)` matches what upstream's `sum_distinct(col)` builds. Co-Authored-By: Claude Opus 4.7 (1M context) --- .ai/skills/check-upstream/SKILL.md | 9 ++++++++- python/datafusion/functions.py | 30 ++++++++++++++++++++++++++---- python/tests/test_functions.py | 30 ++++++++++++++++++++++++++++++ 3 files changed, 64 insertions(+), 5 deletions(-) diff --git a/.ai/skills/check-upstream/SKILL.md b/.ai/skills/check-upstream/SKILL.md index 3bac018ef..529add0c3 100644 --- a/.ai/skills/check-upstream/SKILL.md +++ b/.ai/skills/check-upstream/SKILL.md @@ -82,11 +82,18 @@ The user may specify an area via `$ARGUMENTS`. If no area is specified or "all" - Python API: `python/datafusion/functions.py` (aggregate functions are mixed in with scalar functions) - Rust bindings: `crates/core/src/functions.rs` +**Evaluated and not requiring separate Python exposure:** +- `count_distinct` — covered by `count(expr, distinct=True)`. Both forms call + `count_udaf` with `distinct: bool = true` and produce the same logical plan. +- `sum_distinct` — covered by `sum(expr, distinct=True)`. +- `avg_distinct` — covered by `avg(expr, distinct=True)`. 
+ **How to check:** 1. Fetch the upstream aggregate function documentation page 2. Compare against aggregate functions in `python/datafusion/functions.py` (check `__all__` list and function definitions) 3. A function is covered if it exists in the Python API, even if it aliases another function's Rust binding -4. Report only functions missing from the Python API +4. Check against the "Evaluated and not requiring separate Python exposure" list above before flagging a function as a gap +5. Report only functions missing from the Python API ### 3. Window Functions diff --git a/python/datafusion/functions.py b/python/datafusion/functions.py index 9761d1879..83af282ff 100644 --- a/python/datafusion/functions.py +++ b/python/datafusion/functions.py @@ -4521,6 +4521,7 @@ def grouping( def avg( expression: Expr, + distinct: bool = False, filter: Expr | None = None, ) -> Expr: """Returns the average value. @@ -4528,10 +4529,12 @@ def avg( This aggregate function expects a numeric expression and will return a float. If using the builder functions described in ref:`_aggregation` this function ignores - the options ``order_by``, ``null_treatment``, and ``distinct``. + the options ``order_by`` and ``null_treatment``. Args: expression: Values to combine into an array + distinct: If True, only distinct values are averaged. Equivalent to the + upstream ``avg_distinct`` shortcut. filter: If provided, only compute against rows for which the filter is True Examples: @@ -4551,9 +4554,17 @@ def avg( ... ).alias("v")]) >>> result.collect_column("v")[0].as_py() 2.5 + + >>> df = ctx.from_pydict({"a": [1.0, 1.0, 2.0, 3.0]}) + >>> result = df.aggregate( + ... [], [dfn.functions.avg( + ... dfn.col("a"), distinct=True, + ... 
).alias("v")]) + >>> result.collect_column("v")[0].as_py() + 2.0 """ filter_raw = filter.expr if filter is not None else None - return Expr(f.avg(expression.expr, filter=filter_raw)) + return Expr(f.avg(expression.expr, distinct=distinct, filter=filter_raw)) def corr(value_y: Expr, value_x: Expr, filter: Expr | None = None) -> Expr: @@ -4838,6 +4849,7 @@ def min(expression: Expr, filter: Expr | None = None) -> Expr: def sum( expression: Expr, + distinct: bool = False, filter: Expr | None = None, ) -> Expr: """Computes the sum of a set of numbers. @@ -4845,10 +4857,12 @@ def sum( This aggregate function expects a numeric expression. If using the builder functions described in ref:`_aggregation` this function ignores - the options ``order_by``, ``null_treatment``, and ``distinct``. + the options ``order_by`` and ``null_treatment``. Args: expression: Values to combine into an array + distinct: If True, only distinct values are summed. Equivalent to the + upstream ``sum_distinct`` shortcut. filter: If provided, only compute against rows for which the filter is True Examples: @@ -4868,9 +4882,17 @@ def sum( ... ).alias("v")]) >>> result.collect_column("v")[0].as_py() 5 + + >>> df = ctx.from_pydict({"a": [1, 1, 2, 3]}) + >>> result = df.aggregate( + ... [], [dfn.functions.sum( + ... dfn.col("a"), distinct=True, + ... 
).alias("v")]) + >>> result.collect_column("v")[0].as_py() + 6 """ filter_raw = filter.expr if filter is not None else None - return Expr(f.sum(expression.expr, filter=filter_raw)) + return Expr(f.sum(expression.expr, distinct=distinct, filter=filter_raw)) def stddev(expression: Expr, filter: Expr | None = None) -> Expr: diff --git a/python/tests/test_functions.py b/python/tests/test_functions.py index 5538fc33b..34435ac12 100644 --- a/python/tests/test_functions.py +++ b/python/tests/test_functions.py @@ -1957,6 +1957,36 @@ def test_get_field(df): assert result.column(1) == pa.array([4, 5, 6]) +def test_sum_distinct_kwarg(): + ctx = SessionContext() + df = ctx.from_pydict({"a": [1, 1, 2, 3]}) + distinct = ( + df.aggregate([], [f.sum(column("a"), distinct=True).alias("v")]) + .collect_column("v")[0] + .as_py() + ) + total = ( + df.aggregate([], [f.sum(column("a")).alias("v")]).collect_column("v")[0].as_py() + ) + assert distinct == 6 + assert total == 7 + + +def test_avg_distinct_kwarg(): + ctx = SessionContext() + df = ctx.from_pydict({"a": [1.0, 1.0, 2.0, 3.0]}) + distinct = ( + df.aggregate([], [f.avg(column("a"), distinct=True).alias("v")]) + .collect_column("v")[0] + .as_py() + ) + mean = ( + df.aggregate([], [f.avg(column("a")).alias("v")]).collect_column("v")[0].as_py() + ) + assert distinct == 2.0 + assert mean == 1.75 + + def test_arrow_metadata(): ctx = SessionContext() field = pa.field("val", pa.int64(), metadata={"key1": "value1", "key2": "value2"}) From 6a000ec48eeee1130e0e35a59485e50477958bed Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Wed, 27 May 2026 16:47:54 -0400 Subject: [PATCH 2/4] test: fold sum/avg distinct tests into parameterized aggregation test Move the standalone test_sum_distinct_kwarg and test_avg_distinct_kwarg from test_functions.py into the existing test_aggregation::test_aggregation parameterization, matching how distinct is already covered for median, array_agg, count, and bit_xor. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- python/tests/test_aggregation.py | 2 ++ python/tests/test_functions.py | 30 ------------------------------ 2 files changed, 2 insertions(+), 30 deletions(-) diff --git a/python/tests/test_aggregation.py b/python/tests/test_aggregation.py index f5c54f756..ef51343aa 100644 --- a/python/tests/test_aggregation.py +++ b/python/tests/test_aggregation.py @@ -192,7 +192,9 @@ def test_aggregation_stats(df, agg_expr, calc_expected): False, ), (f.avg(column("b"), filter=column("a") != lit(1)), pa.array([5.0]), False), + (f.avg(column("b"), distinct=True), pa.array([5.0]), False), (f.sum(column("b"), filter=column("a") != lit(1)), pa.array([10]), False), + (f.sum(column("b"), distinct=True), pa.array([10]), False), (f.count(column("b"), distinct=True), pa.array([2]), False), (f.count(column("b"), filter=column("a") != 3), pa.array([2]), False), (f.count(), pa.array([3]), False), diff --git a/python/tests/test_functions.py b/python/tests/test_functions.py index 34435ac12..5538fc33b 100644 --- a/python/tests/test_functions.py +++ b/python/tests/test_functions.py @@ -1957,36 +1957,6 @@ def test_get_field(df): assert result.column(1) == pa.array([4, 5, 6]) -def test_sum_distinct_kwarg(): - ctx = SessionContext() - df = ctx.from_pydict({"a": [1, 1, 2, 3]}) - distinct = ( - df.aggregate([], [f.sum(column("a"), distinct=True).alias("v")]) - .collect_column("v")[0] - .as_py() - ) - total = ( - df.aggregate([], [f.sum(column("a")).alias("v")]).collect_column("v")[0].as_py() - ) - assert distinct == 6 - assert total == 7 - - -def test_avg_distinct_kwarg(): - ctx = SessionContext() - df = ctx.from_pydict({"a": [1.0, 1.0, 2.0, 3.0]}) - distinct = ( - df.aggregate([], [f.avg(column("a"), distinct=True).alias("v")]) - .collect_column("v")[0] - .as_py() - ) - mean = ( - df.aggregate([], [f.avg(column("a")).alias("v")]).collect_column("v")[0].as_py() - ) - assert distinct == 2.0 - assert mean == 1.75 - - def test_arrow_metadata(): ctx = 
SessionContext() field = pa.field("val", pa.int64(), metadata={"key1": "value1", "key2": "value2"}) From d16b5685e587d4503a2830b3fc051de01aea6f6e Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Wed, 27 May 2026 16:48:27 -0400 Subject: [PATCH 3/4] docs: clarify distinct kwarg on sum and avg Drop the unhelpful "upstream avg_distinct/sum_distinct shortcut" reference in favor of describing the actual behavior. Co-Authored-By: Claude Opus 4.7 (1M context) --- python/datafusion/functions.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/python/datafusion/functions.py b/python/datafusion/functions.py index 83af282ff..c427b2f4b 100644 --- a/python/datafusion/functions.py +++ b/python/datafusion/functions.py @@ -4533,8 +4533,7 @@ def avg( Args: expression: Values to combine into an array - distinct: If True, only distinct values are averaged. Equivalent to the - upstream ``avg_distinct`` shortcut. + distinct: If True, duplicate values are removed before averaging. filter: If provided, only compute against rows for which the filter is True Examples: @@ -4861,8 +4860,7 @@ def sum( Args: expression: Values to combine into an array - distinct: If True, only distinct values are summed. Equivalent to the - upstream ``sum_distinct`` shortcut. + distinct: If True, duplicate values are removed before summing. filter: If provided, only compute against rows for which the filter is True Examples: From 1a50d99f2d8ef651929b31bfd73699f21888e452 Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Wed, 27 May 2026 16:53:50 -0400 Subject: [PATCH 4/4] docs: note sum/avg distinct argument-order breaking change distinct is inserted before filter on sum and avg for consistency with the other aggregate functions, breaking positional filter callers. Add a DataFusion 54.0.0 upgrade-guide entry covering the migration. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- docs/source/user-guide/upgrade-guides.rst | 23 +++++++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/docs/source/user-guide/upgrade-guides.rst b/docs/source/user-guide/upgrade-guides.rst index 2ac7f7703..a953b1da4 100644 --- a/docs/source/user-guide/upgrade-guides.rst +++ b/docs/source/user-guide/upgrade-guides.rst @@ -18,6 +18,29 @@ Upgrade Guides ============== +DataFusion 54.0.0 +----------------- + +The aggregate functions :py:func:`~datafusion.functions.sum` and +:py:func:`~datafusion.functions.avg` now accept a ``distinct`` argument, matching +the other aggregate functions. ``distinct`` is inserted *before* ``filter`` in the +argument list, so any code that passed ``filter`` positionally must be updated to +pass it as a keyword argument. + +Before: + +.. code-block:: python + + f.sum(column("a"), my_filter) + f.avg(column("a"), my_filter) + +Now: + +.. code-block:: python + + f.sum(column("a"), filter=my_filter) + f.avg(column("a"), filter=my_filter) + DataFusion 53.0.0 -----------------