diff --git a/.ai/skills/check-upstream/SKILL.md b/.ai/skills/check-upstream/SKILL.md index 3bac018ef..529add0c3 100644 --- a/.ai/skills/check-upstream/SKILL.md +++ b/.ai/skills/check-upstream/SKILL.md @@ -82,11 +82,18 @@ The user may specify an area via `$ARGUMENTS`. If no area is specified or "all" - Python API: `python/datafusion/functions.py` (aggregate functions are mixed in with scalar functions) - Rust bindings: `crates/core/src/functions.rs` +**Evaluated and not requiring separate Python exposure:** +- `count_distinct` — covered by `count(expr, distinct=True)`. Both forms call + `count_udaf` with `distinct: bool = true` and produce the same logical plan. +- `sum_distinct` — covered by `sum(expr, distinct=True)`. +- `avg_distinct` — covered by `avg(expr, distinct=True)`. + **How to check:** 1. Fetch the upstream aggregate function documentation page 2. Compare against aggregate functions in `python/datafusion/functions.py` (check `__all__` list and function definitions) 3. A function is covered if it exists in the Python API, even if it aliases another function's Rust binding -4. Report only functions missing from the Python API +4. Check against the "evaluated and not requiring exposure" list before flagging as a gap +5. Report only functions missing from the Python API ### 3. Window Functions diff --git a/docs/source/user-guide/upgrade-guides.rst b/docs/source/user-guide/upgrade-guides.rst index 2ac7f7703..a953b1da4 100644 --- a/docs/source/user-guide/upgrade-guides.rst +++ b/docs/source/user-guide/upgrade-guides.rst @@ -18,6 +18,29 @@ Upgrade Guides ============== +DataFusion 54.0.0 +----------------- + +The aggregate functions :py:func:`~datafusion.functions.sum` and +:py:func:`~datafusion.functions.avg` now accept a ``distinct`` argument, matching +the other aggregate functions. ``distinct`` is inserted *before* ``filter`` in the +argument list, so any code that passed ``filter`` positionally must be updated to +pass it as a keyword argument. + +Before: + +.. code-block:: python + + f.sum(column("a"), my_filter) + f.avg(column("a"), my_filter) + +Now: + +.. code-block:: python + + f.sum(column("a"), filter=my_filter) + f.avg(column("a"), filter=my_filter) + DataFusion 53.0.0 ----------------- diff --git a/python/datafusion/functions.py b/python/datafusion/functions.py index 9761d1879..c427b2f4b 100644 --- a/python/datafusion/functions.py +++ b/python/datafusion/functions.py @@ -4521,6 +4521,7 @@ def grouping( def avg( expression: Expr, + distinct: bool = False, filter: Expr | None = None, ) -> Expr: """Returns the average value. @@ -4528,10 +4529,11 @@ def avg( This aggregate function expects a numeric expression and will return a float. If using the builder functions described in ref:`_aggregation` this function ignores - the options ``order_by``, ``null_treatment``, and ``distinct``. + the options ``order_by`` and ``null_treatment``. Args: expression: Values to combine into an array + distinct: If True, duplicate values are removed before averaging. filter: If provided, only compute against rows for which the filter is True Examples: @@ -4551,9 +4553,17 @@ def avg( ... ).alias("v")]) >>> result.collect_column("v")[0].as_py() 2.5 + + >>> df = ctx.from_pydict({"a": [1.0, 1.0, 2.0, 3.0]}) + >>> result = df.aggregate( + ... [], [dfn.functions.avg( + ... dfn.col("a"), distinct=True, + ... ).alias("v")]) + >>> result.collect_column("v")[0].as_py() + 2.0 """ filter_raw = filter.expr if filter is not None else None - return Expr(f.avg(expression.expr, filter=filter_raw)) + return Expr(f.avg(expression.expr, distinct=distinct, filter=filter_raw)) def corr(value_y: Expr, value_x: Expr, filter: Expr | None = None) -> Expr: @@ -4838,6 +4848,7 @@ def min(expression: Expr, filter: Expr | None = None) -> Expr: def sum( expression: Expr, + distinct: bool = False, filter: Expr | None = None, ) -> Expr: """Computes the sum of a set of numbers. @@ -4845,10 +4856,11 @@ def sum( This aggregate function expects a numeric expression. If using the builder functions described in ref:`_aggregation` this function ignores - the options ``order_by``, ``null_treatment``, and ``distinct``. + the options ``order_by`` and ``null_treatment``. Args: expression: Values to combine into an array + distinct: If True, duplicate values are removed before summing. filter: If provided, only compute against rows for which the filter is True Examples: @@ -4868,9 +4880,17 @@ def sum( ... ).alias("v")]) >>> result.collect_column("v")[0].as_py() 5 + + >>> df = ctx.from_pydict({"a": [1, 1, 2, 3]}) + >>> result = df.aggregate( + ... [], [dfn.functions.sum( + ... dfn.col("a"), distinct=True, + ... ).alias("v")]) + >>> result.collect_column("v")[0].as_py() + 6 """ filter_raw = filter.expr if filter is not None else None - return Expr(f.sum(expression.expr, filter=filter_raw)) + return Expr(f.sum(expression.expr, distinct=distinct, filter=filter_raw)) def stddev(expression: Expr, filter: Expr | None = None) -> Expr: diff --git a/python/tests/test_aggregation.py b/python/tests/test_aggregation.py index f5c54f756..ef51343aa 100644 --- a/python/tests/test_aggregation.py +++ b/python/tests/test_aggregation.py @@ -192,7 +192,9 @@ def test_aggregation_stats(df, agg_expr, calc_expected): False, ), (f.avg(column("b"), filter=column("a") != lit(1)), pa.array([5.0]), False), + (f.avg(column("b"), distinct=True), pa.array([5.0]), False), (f.sum(column("b"), filter=column("a") != lit(1)), pa.array([10]), False), + (f.sum(column("b"), distinct=True), pa.array([10]), False), (f.count(column("b"), distinct=True), pa.array([2]), False), (f.count(column("b"), filter=column("a") != 3), pa.array([2]), False), (f.count(), pa.array([3]), False),