From d45b49433414042b6e96707b83f0b45a8bab5332 Mon Sep 17 00:00:00 2001 From: Illviljan <14371165+Illviljan@users.noreply.github.com> Date: Fri, 7 Jul 2023 19:42:19 +0200 Subject: [PATCH 01/57] Turn on cumulative sum --- flox/aggregations.py | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/flox/aggregations.py b/flox/aggregations.py index d7db7e0fd..8ee64f921 100644 --- a/flox/aggregations.py +++ b/flox/aggregations.py @@ -433,6 +433,19 @@ def _zip_index(array_, idx_): # median = Aggregation("median", chunk=None, combine=None, fill_value=None) # nanmedian = Aggregation("nanmedian", chunk=None, combine=None, fill_value=None) +# Cumulatives: +cumsum_ = Aggregation( + "cumsum", + chunk="cumsum", + combine="sum", + fill_value=0, + final_fill_value=0, + # dtypes=bool, + # final_dtype=bool, +) +# sum_ = Aggregation("sum", chunk="sum", combine="sum", fill_value=0) + + aggregations = { "any": any_, "all": all_, @@ -459,6 +472,7 @@ def _zip_index(array_, idx_): "nanfirst": nanfirst, "last": last, "nanlast": nanlast, + "cumsum": cumsum_, } From 29747f02ed5025d4d276930f337de9e482f31eb2 Mon Sep 17 00:00:00 2001 From: Illviljan <14371165+Illviljan@users.noreply.github.com> Date: Fri, 7 Jul 2023 19:45:55 +0200 Subject: [PATCH 02/57] Add basic test --- tests/test_core.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/tests/test_core.py b/tests/test_core.py index 2fc534d83..6845e1a30 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -1203,3 +1203,15 @@ def test_1d_blockwise_sort_optimization(): array, time.dt.dayofyear.values[::-1], sort=False, method="blockwise", func="count" ) assert all("getitem" not in k for k in actual.dask.layers) + + +def test_cumsum(): + import numpy_groupies as npg + + group_idx = np.array([4, 3, 3, 4, 4, 1, 1, 1, 7, 8, 7, 4, 3, 3, 1, 1]) + a = np.array([3, 4, 1, 3, 9, 9, 6, 7, 7, 0, 8, 2, 1, 8, 9, 8]) + b = npg.aggregate(group_idx, a, func="cumsum") + + bb = groupby_reduce(a, group_idx, func="cumsum", engine="numpy") + + np.testing.assert_allclose(b, bb) From 4b6e5d6ef8d25873cc63c0de1dca59f786d4a416 Mon Sep 17 00:00:00 2001 From: Illviljan <14371165+Illviljan@users.noreply.github.com> Date: Fri, 7 Jul 2023 20:05:20 +0200 Subject: [PATCH 03/57] Minimal changes to get numpy_groupies running --- flox/core.py | 18 ++++++++++++++---- 1 file changed, 14 insertions(+), 4 deletions(-) diff --git a/flox/core.py b/flox/core.py index 7cbb5659e..c797cef3c 100644 --- a/flox/core.py +++ b/flox/core.py @@ -647,8 +647,13 @@ def chunk_reduce( else: nax = by.ndim - final_array_shape = array.shape[:-nax] + (1,) * (nax - 1) - final_groups_shape = (1,) * (nax - 1) + is_cumreduction = "cumsum" in funcs + if is_cumreduction: + final_array_shape = array.shape + final_groups_shape = (1,) * (nax - 1) + else: + final_array_shape = array.shape[:-nax] + (1,) * (nax - 1) + final_groups_shape = (1,) * (nax - 1) # when axis is a tuple # collapse and move reduction dimensions to the end @@ -692,7 +697,8 @@ def chunk_reduce( # if not is_arg_reduction: group_idx, array = _prepare_for_flox(group_idx, array) - final_array_shape += results["groups"].shape + if not is_cumreduction: + final_array_shape += results["groups"].shape final_groups_shape += results["groups"].shape # we commonly have func=(..., "nanlen", "nanlen") when @@ -721,7 +727,11 @@ def chunk_reduce( if np.any(props.nanmask): # remove NaN group label which should be last result = result[..., :-1] - result = result.reshape(final_array_shape[:-1] + found_groups_shape) + + if is_cumreduction: + result = result + else: + result = result.reshape(final_array_shape[:-1] + found_groups_shape) results["intermediates"].append(result) results["groups"] = np.broadcast_to(results["groups"], final_groups_shape) From 228f2cba0a9b3361a3733466f0f5bd3bf968fc6f Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Fri, 7 Jul 2023 18:08:30 +0000 Subject: [PATCH 04/57] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- tests/test_core.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/test_core.py b/tests/test_core.py index 032ba382e..6383d2025 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -1464,6 +1464,7 @@ def test_method_check_numpy(): ) assert_equal(actual, expected) + def test_cumsum() -> None: import numpy_groupies as npg @@ -1473,4 +1474,4 @@ def test_cumsum() -> None: bb = groupby_reduce(a, group_idx, func="cumsum", engine="numpy") - np.testing.assert_allclose(b, bb) \ No newline at end of file + np.testing.assert_allclose(b, bb) From 60417494319a753741d72d84972403d2e537397d Mon Sep 17 00:00:00 2001 From: Illviljan <14371165+Illviljan@users.noreply.github.com> Date: Fri, 7 Jul 2023 20:13:17 +0200 Subject: [PATCH 05/57] Update test_core.py --- tests/test_core.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/tests/test_core.py b/tests/test_core.py index 032ba382e..a3716825b 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -1464,6 +1464,7 @@ def test_method_check_numpy(): ) assert_equal(actual, expected) + def test_cumsum() -> None: import numpy_groupies as npg @@ -1471,6 +1472,6 @@ def test_cumsum() -> None: a = np.array([3, 4, 1, 3, 9, 9, 6, 7, 7, 0, 8, 2, 1, 8, 9, 8]) b = npg.aggregate(group_idx, a, func="cumsum") - bb = groupby_reduce(a, group_idx, func="cumsum", engine="numpy") + bb = groupby_reduce(a, group_idx, func="cumsum", engine="numpy")[0] - np.testing.assert_allclose(b, bb) \ No newline at end of file + np.testing.assert_allclose(b, bb) From 78faf3f4ccebaa1d2a5141daa04d34376892fec4 Mon Sep 17 00:00:00 2001 From: Illviljan <14371165+Illviljan@users.noreply.github.com> Date: Fri, 7 Jul 2023 20:24:24 +0200 Subject: [PATCH 06/57] Update core.py --- flox/core.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/flox/core.py b/flox/core.py index 49610bdec..c6bcfeccf 100644 --- a/flox/core.py +++ b/flox/core.py @@ -2054,9 +2054,10 @@ def groupby_reduce( # nan group labels are factorized to -1, and preserved # now we get rid of them by reindexing # This also handles bins with no data - result = reindex_( - result, from_=groups[0], to=expected_groups, fill_value=fill_value - ).reshape(result.shape[:-1] + grp_shape) + result = reindex_(result, from_=groups[0], to=expected_groups, fill_value=fill_value) + is_cumreduction = func == "cumsum" + if not is_cumreduction: + result = result.reshape(result.shape[:-1] + grp_shape) groups = final_groups if is_bool_array and (_is_minmax_reduction(func) or _is_first_last_reduction(func)): From 971544079e327ea6460a1b2ee2c8a7369e65b229 Mon Sep 17 00:00:00 2001 From: Illviljan <14371165+Illviljan@users.noreply.github.com> Date: Fri, 7 Jul 2023 20:31:09 +0200 Subject: [PATCH 07/57] reduce diffs --- flox/core.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/flox/core.py b/flox/core.py index c6bcfeccf..adc2df3eb 100644 --- a/flox/core.py +++ b/flox/core.py @@ -746,10 +746,9 @@ def chunk_reduce( is_cumreduction = "cumsum" in funcs if is_cumreduction: final_array_shape = array.shape - final_groups_shape = (1,) * (nax - 1) else: final_array_shape = array.shape[:-nax] + (1,) * (nax - 1) - final_groups_shape = (1,) * (nax - 1) + final_groups_shape = (1,) * (nax - 1) if 1 < nax < by.ndim: # when axis is a tuple From 734ef95baa48991ff1bc648ff6199c8bbf8026f4 Mon Sep 17 00:00:00 2001 From: Illviljan <14371165+Illviljan@users.noreply.github.com> Date: Fri, 7 Jul 2023 20:33:13 +0200 Subject: [PATCH 08/57] reduce diffs --- flox/core.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/flox/core.py b/flox/core.py index adc2df3eb..4607f5833 100644 --- a/flox/core.py +++ b/flox/core.py @@ -832,9 +832,7 @@ def chunk_reduce( # remove NaN group label which should be last result = result[..., :-1] - if is_cumreduction: - result = result - else: + if not is_cumreduction: result = result.reshape(final_array_shape[:-1] + found_groups_shape) results["intermediates"].append(result) From 423184a57b86ef4a4152d5831a1f3e8f9cb8101e Mon Sep 17 00:00:00 2001 From: Illviljan <14371165+Illviljan@users.noreply.github.com> Date: Fri, 7 Jul 2023 22:24:30 +0200 Subject: [PATCH 09/57] Copy some reduce to accumulate --- flox/core.py | 601 ++++++++++++++++++++++++++++++++++++++++++++- tests/test_core.py | 14 +- 2 files changed, 603 insertions(+), 12 deletions(-) diff --git a/flox/core.py b/flox/core.py index 4607f5833..2578acac7 100644 --- a/flox/core.py +++ b/flox/core.py @@ -110,11 +110,13 @@ def _get_expected_groups(by: T_By, sort: bool) -> T_ExpectIndex: return _convert_expected_groups_to_index((expected,), isbin=(False,), sort=sort)[0] -def _get_chunk_reduction(reduction_type: Literal["reduce", "argreduce"]) -> Callable: +def _get_chunk_reduction(reduction_type: Literal["reduce", "argreduce", "accumulate"]) -> Callable: if reduction_type == "reduce": return chunk_reduce elif reduction_type == "argreduce": return chunk_argreduce + elif reduction_type == "accumulate": + return chunk_accumulate else: raise ValueError(f"Unknown reduction type: {reduction_type}") @@ -613,6 +615,7 @@ def factorize_( return group_idx, tuple(found_groups), grp_shape, ngroups, size, props +# %% Reduce def chunk_argreduce( array_plus_idx: tuple[np.ndarray, ...], by: np.ndarray, @@ -743,11 +746,7 @@ def chunk_reduce( assert by.ndim <= array.ndim - is_cumreduction = "cumsum" in funcs - if is_cumreduction: - final_array_shape = array.shape - else: - final_array_shape = array.shape[:-nax] + (1,) * (nax - 1) + final_array_shape = array.shape[:-nax] + (1,) * (nax - 1) final_groups_shape = (1,) * (nax - 1) if 1 < nax < by.ndim: @@ -801,8 +800,7 @@ def chunk_reduce( # if not is_arg_reduction: group_idx, array = _prepare_for_flox(group_idx, array) - if not is_cumreduction: - final_array_shape += results["groups"].shape + final_array_shape += results["groups"].shape final_groups_shape += results["groups"].shape # we commonly have func=(..., "nanlen", "nanlen") when @@ -831,9 +829,7 @@ def chunk_reduce( if np.any(props.nanmask): # remove NaN group label which should be last result = result[..., :-1] - - if not is_cumreduction: - result = result.reshape(final_array_shape[:-1] + found_groups_shape) + result = result.reshape(final_array_shape[:-1] + found_groups_shape) results["intermediates"].append(result) results["groups"] = np.broadcast_to(results["groups"], final_groups_shape) @@ -2060,3 +2056,586 @@ def groupby_reduce( if is_bool_array and (_is_minmax_reduction(func) or _is_first_last_reduction(func)): result = result.astype(bool) return (result, *groups) # type: ignore[return-value] # Unpack not in mypy yet + + +# %% Accumulate +def chunk_accumulate( + array: np.ndarray, + by: np.ndarray, + func: T_Funcs, + expected_groups: pd.Index | None, + axis: T_AxesOpt = None, + fill_value: T_FillValues = None, + dtype: T_Dtypes = None, + reindex: bool = False, + engine: T_Engine = "numpy", + kwargs: Sequence[dict] | None = None, + sort: bool = True, +) -> IntermediateDict: + """ + Wrapper for numpy_groupies aggregate that supports nD ``array`` and + mD ``by``. + + Core groupby reduction using numpy_groupies. Uses ``pandas.factorize`` to factorize + ``by``. Offsets the groups if not reducing along all dimensions of ``by``. + Always ravels ``by`` to 1D, flattens appropriate dimensions of array. + + When dask arrays are passed to groupby_reduce, this function is called on every + block. + + Parameters + ---------- + array : numpy.ndarray + Array of values to reduced + by : numpy.ndarray + Array to group by. + func : str or Callable or Sequence[str] or Sequence[Callable] + Name of reduction or function, passed to numpy_groupies. + Supports multiple reductions. + axis : (optional) int or Sequence[int] + If None, reduce along all dimensions of array. + Else reduce along specified axes. + + Returns + ------- + dict + """ + + if not (isinstance(func, str) or callable(func)): + funcs = func + else: + funcs = (func,) + nfuncs = len(funcs) + + if isinstance(dtype, Sequence): + dtypes = dtype + else: + dtypes = (dtype,) * nfuncs + assert len(dtypes) >= nfuncs + + if isinstance(fill_value, Sequence): + fill_values = fill_value + else: + fill_values = (fill_value,) * nfuncs + assert len(fill_values) >= nfuncs + + if isinstance(kwargs, Sequence): + kwargss = kwargs + else: + kwargss = ({},) * nfuncs + assert len(kwargss) >= nfuncs + + if isinstance(axis, Sequence): + axes: T_Axes = axis + nax = len(axes) + else: + nax = by.ndim + if axis is None: + axes = () + else: + axes = (axis,) * nax + + assert by.ndim <= array.ndim + + final_array_shape = array.shape + final_groups_shape = (1,) * (nax - 1) + + if 1 < nax < by.ndim: + # when axis is a tuple + # collapse and move reduction dimensions to the end + by = _collapse_axis(by, nax) + array = _collapse_axis(array, nax) + axes = (-1,) + nax = 1 + + # if indices=[2,2,2], npg assumes groups are (0, 1, 2); + # and will return a result that is bigger than necessary + # avoid by factorizing again so indices=[2,2,2] is changed to + # indices=[0,0,0]. This is necessary when combining block results + # factorize can handle strings etc unlike digitize + group_idx, grps, found_groups_shape, _, size, props = factorize_( + (by,), axes, expected_groups=(expected_groups,), reindex=reindex, sort=sort + ) + groups = grps[0] + + if nax > 1: + needs_broadcast = any( + group_idx.shape[ax] != array.shape[ax] and group_idx.shape[ax] == 1 + for ax in range(-nax, 0) + ) + if needs_broadcast: + group_idx = np.broadcast_to(group_idx, array.shape[-by.ndim :]) + # always reshape to 1D along group dimensions + newshape = array.shape[: array.ndim - by.ndim] + (math.prod(array.shape[-by.ndim :]),) + array = array.reshape(newshape) + group_idx = group_idx.reshape(-1) + + assert group_idx.ndim == 1 + empty = np.all(props.nanmask) + + results: IntermediateDict = {"groups": [], "intermediates": []} + if reindex and expected_groups is not None: + # TODO: what happens with binning here? + results["groups"] = expected_groups.to_numpy() + else: + if empty: + results["groups"] = np.array([np.nan]) + else: + results["groups"] = groups + + # npg's argmax ensures that index of first "max" is returned assuming there + # are many elements equal to the "max". Sorting messes this up totally. + # so we skip this for argreductions + if engine == "flox": + # is_arg_reduction = any("arg" in f for f in func if isinstance(f, str)) + # if not is_arg_reduction: + group_idx, array = _prepare_for_flox(group_idx, array) + + final_groups_shape += results["groups"].shape + + # we commonly have func=(..., "nanlen", "nanlen") when + # counts are needed for the final result as well as for masking + # optimize that out. + previous_reduction: T_Func = "" + for reduction, fv, kw, dt in zip(funcs, fill_values, kwargss, dtypes): + if empty: + result = np.full(shape=final_array_shape, fill_value=fv) + else: + if is_nanlen(reduction) and is_nanlen(previous_reduction): + result = results["intermediates"][-1] + + # fill_value here is necessary when reducing with "offset" groups + kw_func = dict(size=size, dtype=dt, fill_value=fv) + kw_func.update(kw) + + if callable(reduction): + # passing a custom reduction for npg to apply per-group is really slow! + # So this `reduction` has to do the groupby-aggregation + result = reduction(group_idx, array, **kw_func) + else: + result = generic_aggregate( + group_idx, array, axis=-1, engine=engine, func=reduction, **kw_func + ).astype(dt, copy=False) + if np.any(props.nanmask): + # remove NaN group label which should be last + result = result[..., :-1] + + results["intermediates"].append(result) + + results["groups"] = np.broadcast_to(results["groups"], final_groups_shape) + return results + + +def _accumulate_blockwise( + array, + by, + agg: Aggregation, + *, + axis: T_Axes, + expected_groups, + fill_value, + engine: T_Engine, + sort, + reindex, +) -> FinalResultsDict: + """ + Blockwise groupby reduction that produces the final result. This code path is + also used for non-dask array aggregations. + """ + # for pure numpy grouping, we just use npg directly and avoid "finalizing" + # (agg.finalize = None). We still need to do the reindexing step in finalize + # so that everything matches the dask version. + agg.finalize = None + + assert agg.finalize_kwargs is not None + if isinstance(agg.finalize_kwargs, Mapping): + finalize_kwargs_: tuple[dict[Any, Any], ...] = (agg.finalize_kwargs,) + else: + finalize_kwargs_ = agg.finalize_kwargs + finalize_kwargs_ += ({},) + ({},) + + results = chunk_accumulate( + array, + by, + func=agg.numpy, + axis=axis, + expected_groups=expected_groups, + # This fill_value should only apply to groups that only contain NaN observations + # BUT there is funkiness when axis is a subset of all possible values + # (see below) + fill_value=agg.fill_value["numpy"], + dtype=agg.dtype["numpy"], + kwargs=finalize_kwargs_, + engine=engine, + sort=sort, + reindex=reindex, + ) + + if _is_arg_reduction(agg): + results["intermediates"][0] = np.unravel_index(results["intermediates"][0], array.shape)[-1] + + result = _finalize_results( + results, agg, axis, expected_groups, fill_value=fill_value, reindex=reindex + ) + return result + + +def groupby_accumulate( + array: np.ndarray | DaskArray, + *by: T_By, + func: T_Agg, + expected_groups: T_ExpectedGroupsOpt = None, + sort: bool = True, + isbin: T_IsBins = False, + axis: T_AxesOpt = None, + fill_value=None, + dtype: np.typing.DTypeLike = None, + min_count: int | None = None, + method: T_Method = "map-reduce", + engine: T_Engine = "numpy", + reindex: bool | None = None, + finalize_kwargs: dict[Any, Any] | None = None, +) -> tuple[DaskArray, Unpack[tuple[np.ndarray | DaskArray, ...]]]: # type: ignore[misc] # Unpack not in mypy yet + """ + GroupBy reductions using tree reductions for dask.array + + Parameters + ---------- + array : ndarray or DaskArray + Array to be reduced, possibly nD + *by : ndarray or DaskArray + Array of labels to group over. Must be aligned with ``array`` so that + ``array.shape[-by.ndim :] == by.shape`` + func : str or Aggregation + Single function name or an Aggregation instance + expected_groups : (optional) Sequence + Expected unique labels. + isbin : bool, optional + Are ``expected_groups`` bin edges? + sort : bool, optional + Whether groups should be returned in sorted order. Only applies for dask + reductions when ``method`` is not ``"map-reduce"``. For ``"map-reduce"``, the groups + are always sorted. + axis : None or int or Sequence[int], optional + If None, reduce across all dimensions of by + Else, reduce across corresponding axes of array + Negative integers are normalized using array.ndim + fill_value : Any + Value to assign when a label in ``expected_groups`` is not present. + dtype : data-type , optional + DType for the output. Can be anything that is accepted by ``np.dtype``. + min_count : int, default: None + The required number of valid values to perform the operation. If + fewer than min_count non-NA values are present the result will be + NA. Only used if skipna is set to True or defaults to True for the + array's dtype. + method : {"map-reduce", "blockwise", "cohorts", "split-reduce"}, optional + Strategy for reduction of dask arrays only: + * ``"map-reduce"``: + First apply the reduction blockwise on ``array``, then + combine a few newighbouring blocks, apply the reduction. + Continue until finalizing. Usually, ``func`` will need + to be an Aggregation instance for this method to work. + Common aggregations are implemented. + * ``"blockwise"``: + Only reduce using blockwise and avoid aggregating blocks + together. Useful for resampling-style reductions where group + members are always together. If `by` is 1D, `array` is automatically + rechunked so that chunk boundaries line up with group boundaries + i.e. each block contains all members of any group present + in that block. For nD `by`, you must make sure that all members of a group + are present in a single block. + * ``"cohorts"``: + Finds group labels that tend to occur together ("cohorts"), + indexes out cohorts and reduces that subset using "map-reduce", + repeat for all cohorts. This works well for many time groupings + where the group labels repeat at regular intervals like 'hour', + 'month', dayofyear' etc. Optimize chunking ``array`` for this + method by first rechunking using ``rechunk_for_cohorts`` + (for 1D ``by`` only). + * ``"split-reduce"``: + Same as "cohorts" and will be removed soon. + engine : {"flox", "numpy", "numba"}, optional + Algorithm to compute the groupby reduction on non-dask arrays and on each dask chunk: + * ``"numpy"``: + Use the vectorized implementations in ``numpy_groupies.aggregate_numpy``. + This is the default choice because it works for most array types. + * ``"flox"``: + Use an internal implementation where the data is sorted so that + all members of a group occur sequentially, and then numpy.ufunc.reduceat + is to used for the reduction. This will fall back to ``numpy_groupies.aggregate_numpy`` + for a reduction that is not yet implemented. + * ``"numba"``: + Use the implementations in ``numpy_groupies.aggregate_numba``. + reindex : bool, optional + Whether to "reindex" the blockwise results to ``expected_groups`` (possibly automatically detected). + If True, the intermediate result of the blockwise groupby-reduction has a value for all expected groups, + and the final result is a simple reduction of those intermediates. In nearly all cases, this is a significant + boost in computation speed. For cases like time grouping, this may result in large intermediates relative to the + original block size. Avoid that by using ``method="cohorts"``. By default, it is turned off for argreductions. + finalize_kwargs : dict, optional + Kwargs passed to finalize the reduction such as ``ddof`` for var, std. + + Returns + ------- + result + Aggregated result + *groups + Group labels + + See Also + -------- + xarray.xarray_reduce + """ + + if engine == "flox" and _is_arg_reduction(func): + raise NotImplementedError( + "argreductions not supported for engine='flox' yet." + "Try engine='numpy' or engine='numba' instead." + ) + + bys: T_Bys = tuple(np.asarray(b) if not is_duck_array(b) else b for b in by) + nby = len(bys) + by_is_dask = tuple(is_duck_dask_array(b) for b in bys) + any_by_dask = any(by_is_dask) + + if method in ["split-reduce", "cohorts"] and any_by_dask: + raise ValueError(f"method={method!r} can only be used when grouping by numpy arrays.") + + if method == "split-reduce": + method = "cohorts" + + reindex = _validate_reindex( + reindex, func, method, expected_groups, any_by_dask, is_duck_dask_array(array) + ) + + if not is_duck_array(array): + array = np.asarray(array) + is_bool_array = np.issubdtype(array.dtype, bool) + array = array.astype(int) if is_bool_array else array + + if isinstance(isbin, Sequence): + isbins = isbin + else: + isbins = (isbin,) * nby + + _assert_by_is_aligned(array.shape, bys) + + expected_groups = _validate_expected_groups(nby, expected_groups) + + for idx, (expect, is_dask) in enumerate(zip(expected_groups, by_is_dask)): + if is_dask and (reindex or nby > 1) and expect is None: + raise ValueError( + f"`expected_groups` for array {idx} in `by` cannot be None since it is a dask.array." + ) + + # We convert to pd.Index since that lets us know if we are binning or not + # (pd.IntervalIndex or not) + expected_groups = _convert_expected_groups_to_index(expected_groups, isbins, sort) + + # Don't factorize "early only when + # grouping by dask arrays, and not having expected_groups + factorize_early = not ( + # can't do it if we are grouping by dask array but don't have expected_groups + any(is_dask and ex_ is None for is_dask, ex_ in zip(by_is_dask, expected_groups)) + ) + if factorize_early: + bys, final_groups, grp_shape = _factorize_multiple( + bys, + expected_groups, + any_by_dask=any_by_dask, + # This is the only way it makes sense I think. + # reindex controls what's actually allocated in chunk_reduce + # At this point, we care about an accurate conversion to codes. + reindex=True, + sort=sort, + ) + expected_groups = (pd.RangeIndex(math.prod(grp_shape)),) + + assert len(bys) == 1 + (by_,) = bys + (expected_groups,) = expected_groups + + if axis is None: + axis_ = tuple(array.ndim + np.arange(-by_.ndim, 0)) + else: + # TODO: How come this function doesn't exist according to mypy? + axis_ = np.core.numeric.normalize_axis_tuple(axis, array.ndim) # type: ignore + nax = len(axis_) + + has_dask = is_duck_dask_array(array) or is_duck_dask_array(by_) + + if _is_first_last_reduction(func): + if has_dask and nax != 1: + raise ValueError( + "For dask arrays: first, last, nanfirst, nanlast reductions are " + "only supported along a single axis. Please reshape appropriately." + ) + + elif nax not in [1, by_.ndim]: + raise ValueError( + "first, last, nanfirst, nanlast reductions are only supported " + "along a single axis or when reducing across all dimensions of `by`." + ) + + # TODO: make sure expected_groups is unique + if nax == 1 and by_.ndim > 1 and expected_groups is None: + if not any_by_dask: + expected_groups = _get_expected_groups(by_, sort) + else: + # When we reduce along all axes, we are guaranteed to see all + # groups in the final combine stage, so everything works. + # This is not necessarily true when reducing along a subset of axes + # (of by) + # TODO: Does this depend on chunking of by? + # For e.g., we could relax this if there is only one chunk along all + # by dim != axis? + raise NotImplementedError( + "Please provide ``expected_groups`` when not reducing along all axes." + ) + + assert nax <= by_.ndim + if nax < by_.ndim: + by_ = _move_reduce_dims_to_end(by_, tuple(-array.ndim + ax + by_.ndim for ax in axis_)) + array = _move_reduce_dims_to_end(array, axis_) + axis_ = tuple(array.ndim + np.arange(-nax, 0)) + nax = len(axis_) + + # When axis is a subset of possible values; then npg will + # apply it to groups that don't exist along a particular axis (for e.g.) + # since these count as a group that is absent. thoo! + # fill_value applies to all-NaN groups as well as labels in expected_groups that are not found. + # The only way to do this consistently is mask out using min_count + # Consider np.sum([np.nan]) = np.nan, np.nansum([np.nan]) = 0 + if min_count is None: + if nax < by_.ndim or fill_value is not None: + min_count_: int = 1 + else: + min_count_ = 0 + else: + min_count_ = min_count + + # TODO: set in xarray? + if min_count_ > 0 and func in ["nansum", "nanprod"] and fill_value is None: + # nansum, nanprod have fill_value=0, 1 + # overwrite than when min_count is set + fill_value = np.nan + + kwargs = dict(axis=axis_, fill_value=fill_value, engine=engine) + agg = _initialize_aggregation(func, dtype, array.dtype, fill_value, min_count_, finalize_kwargs) + + groups: tuple[np.ndarray | DaskArray, ...] + if not has_dask: + results = _accumulate_blockwise( + array, by_, agg, expected_groups=expected_groups, reindex=reindex, sort=sort, **kwargs + ) + groups = (results["groups"],) + result = results[agg.name] + + else: + if TYPE_CHECKING: + # TODO: How else to narrow that array.chunks is there? + assert isinstance(array, DaskArray) + + if agg.chunk[0] is None and method != "blockwise": + raise NotImplementedError( + f"Aggregation {agg.name!r} is only implemented for dask arrays when method='blockwise'." + f"\n\n Received: {func}" + ) + + if method in ["blockwise", "cohorts"] and nax != by_.ndim: + raise NotImplementedError( + "Must reduce along all dimensions of `by` when method != 'map-reduce'." + f"Received method={method!r}" + ) + + # TODO: just do this in dask_groupby_agg + # we always need some fill_value (see above) so choose the default if needed + if kwargs["fill_value"] is None: + kwargs["fill_value"] = agg.fill_value[agg.name] + + partial_agg = partial(dask_groupby_agg, **kwargs) + + if method == "blockwise" and by_.ndim == 1: + array = rechunk_for_blockwise(array, axis=-1, labels=by_) + + result, groups = partial_agg( + array, + by_, + expected_groups=None if method == "blockwise" else expected_groups, + agg=agg, + reindex=reindex, + method=method, + sort=sort, + ) + + if sort and method != "map-reduce": + assert len(groups) == 1 + sorted_idx = np.argsort(groups[0]) + # This optimization helps specifically with resampling + if not (sorted_idx[:-1] <= sorted_idx[1:]).all(): + result = result[..., sorted_idx] + groups = (groups[0][sorted_idx],) + + if factorize_early: + # nan group labels are factorized to -1, and preserved + # now we get rid of them by reindexing + # This also handles bins with no data + result = reindex_(result, from_=groups[0], to=expected_groups, fill_value=fill_value) + groups = final_groups + + if is_bool_array and (_is_minmax_reduction(func) or _is_first_last_reduction(func)): + result = result.astype(bool) + return (result, *groups) # type: ignore[return-value] # Unpack not in mypy yet + + +# %% Aggregate +def groupby_aggregate( + array: np.ndarray | DaskArray, + *by: T_By, + func: T_Agg, + expected_groups: T_ExpectedGroupsOpt = None, + sort: bool = True, + isbin: T_IsBins = False, + axis: T_AxesOpt = None, + fill_value=None, + dtype: np.typing.DTypeLike = None, + min_count: int | None = None, + method: T_Method = "map-reduce", + engine: T_Engine = "numpy", + reindex: bool | None = None, + finalize_kwargs: dict[Any, Any] | None = None, +) -> tuple[DaskArray, Unpack[tuple[np.ndarray | DaskArray, ...]]]: # type: ignore[misc] # Unpack not in mypy + is_accumulator = "cumsum" == func + if is_accumulator: + return groupby_accumulate( + array, + *by, + func=func, + expected_groups=expected_groups, + sort=sort, + isbin=isbin, + axis=axis, + fill_value=fill_value, + dtype=dtype, + min_count=min_count, + method=method, + reindex=reindex, + finalize_kwargs=finalize_kwargs, + ) + else: + return groupby_reduce( + array, + *by, + func=func, + expected_groups=expected_groups, + sort=sort, + isbin=isbin, + axis=axis, + fill_value=fill_value, + dtype=dtype, + min_count=min_count, + method=method, + reindex=reindex, + finalize_kwargs=finalize_kwargs, + ) diff --git a/tests/test_core.py b/tests/test_core.py index a3716825b..c1a78938b 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -1472,6 +1472,18 @@ def test_cumsum() -> None: a = np.array([3, 4, 1, 3, 9, 9, 6, 7, 7, 0, 8, 2, 1, 8, 9, 8]) b = npg.aggregate(group_idx, a, func="cumsum") - bb = groupby_reduce(a, group_idx, func="cumsum", engine="numpy")[0] + bb = groupby_accumulate(a, group_idx, func="cumsum", engine="numpy")[0] + + np.testing.assert_allclose(b, bb) + + +def test_groupby_aggregate() -> None: + import numpy_groupies as npg + + group_idx = np.array([4, 3, 3, 4, 4, 1, 1, 1, 7, 8, 7, 4, 3, 3, 1, 1]) + a = np.array([3, 4, 1, 3, 9, 9, 6, 7, 7, 0, 8, 2, 1, 8, 9, 8]) + b = npg.aggregate(group_idx, a, func="cumsum") + + bb = groupby_aggregate(a, group_idx, func="cumsum", engine="numpy")[0] np.testing.assert_allclose(b, bb) From d40d23da2f34a731bb0c76f99165ffed634c92b7 Mon Sep 17 00:00:00 2001 From: Illviljan <14371165+Illviljan@users.noreply.github.com> Date: Fri, 7 Jul 2023 22:25:30 +0200 Subject: [PATCH 10/57] add accumulate as reduction_type --- flox/aggregations.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/flox/aggregations.py b/flox/aggregations.py index ae13256b4..362320351 100644 --- a/flox/aggregations.py +++ b/flox/aggregations.py @@ -3,7 +3,7 @@ import copy import warnings from functools import partial -from typing import TYPE_CHECKING, Any, Callable, TypedDict +from typing import TYPE_CHECKING, Any, Callable, Literal, TypedDict import numpy as np import numpy_groupies as npg @@ -130,7 +130,7 @@ def __init__( final_fill_value=dtypes.NA, dtypes=None, final_dtype: DTypeLike | None = None, - reduction_type="reduce", + reduction_type: Literal["reduce", "argreduce", "accumulate"] = "reduce", ): """ Blueprint for computing grouped aggregations. @@ -475,8 +475,7 @@ def _pick_second(*x): "cumsum", chunk="cumsum", combine="sum", - fill_value=0, - final_fill_value=0, + reduction_type="accumulate", # dtypes=bool, # final_dtype=bool, ) From 9684ac2f696b9828d862a204a214a3be4a2a282c Mon Sep 17 00:00:00 2001 From: Illviljan <14371165+Illviljan@users.noreply.github.com> Date: Fri, 7 Jul 2023 22:25:40 +0200 Subject: [PATCH 11/57] Update aggregations.py --- flox/aggregations.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/flox/aggregations.py b/flox/aggregations.py index 362320351..b2d789373 100644 --- a/flox/aggregations.py +++ b/flox/aggregations.py @@ -476,6 +476,8 @@ def _pick_second(*x): chunk="cumsum", combine="sum", reduction_type="accumulate", + # fill_value=0, + # final_fill_value=0, # dtypes=bool, # final_dtype=bool, ) From f41e50e102e8ed8686710c76863a2599431e208a Mon Sep 17 00:00:00 2001 From: Illviljan <14371165+Illviljan@users.noreply.github.com> Date: Fri, 7 Jul 2023 22:25:56 +0200 Subject: [PATCH 12/57] add groupby_accumulate and groupby_aggregate --- flox/__init__.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/flox/__init__.py b/flox/__init__.py index cc44cbca0..3b69832f3 100644 --- a/flox/__init__.py +++ b/flox/__init__.py @@ -3,7 +3,13 @@ """Top-level module for flox .""" from . import cache from .aggregations import Aggregation # noqa -from .core import groupby_reduce, rechunk_for_blockwise, rechunk_for_cohorts # noqa +from .core import ( + groupby_accumulate, + groupby_aggregate, + groupby_reduce, + rechunk_for_blockwise, + rechunk_for_cohorts, +) # noqa def _get_version(): From d32ae4cde98ae3cc58d509794534dbdad2d84bc2 Mon Sep 17 00:00:00 2001 From: Illviljan <14371165+Illviljan@users.noreply.github.com> Date: Fri, 7 Jul 2023 22:28:19 +0200 Subject: [PATCH 13/57] Update test_core.py --- tests/test_core.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/test_core.py b/tests/test_core.py index c1a78938b..795921308 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -19,6 +19,8 @@ _validate_reindex, factorize_, find_group_cohorts, + groupby_accumulate, + groupby_aggregate, groupby_reduce, rechunk_for_cohorts, reindex_, From d369d5071d39b650317225121bdc99142e288ad5 Mon Sep 17 00:00:00 2001 From: Illviljan <14371165+Illviljan@users.noreply.github.com> Date: Fri, 7 Jul 2023 22:36:47 +0200 Subject: [PATCH 14/57] reduction_type to kind --- flox/aggregations.py | 20 ++++++++++---------- flox/core.py | 18 +++++++++--------- 2 files changed, 19 insertions(+), 19 deletions(-) diff --git a/flox/aggregations.py b/flox/aggregations.py index b2d789373..41c7bf259 100644 --- a/flox/aggregations.py +++ b/flox/aggregations.py @@ -19,7 +19,7 @@ def _is_arg_reduction(func: str | Aggregation) -> bool: if isinstance(func, str) and func in ["argmin", "argmax", "nanargmax", "nanargmin"]: return True - if isinstance(func, Aggregation) and func.reduction_type == "argreduce": + if isinstance(func, Aggregation) and func.kind == "argreduce": return True return False @@ -130,7 +130,7 @@ def __init__( final_fill_value=dtypes.NA, dtypes=None, final_dtype: DTypeLike | None = None, - reduction_type: Literal["reduce", "argreduce", "accumulate"] = "reduce", + kind: Literal["reduce", "argreduce", "accumulate"] = "reduce", ): """ Blueprint for computing grouped aggregations. @@ -159,7 +159,7 @@ def __init__( final result. preprocess : callable For dask inputs only. Preprocess inputs before ``chunk`` stage. - reduction_type : {"reduce", "argreduce"} + kind : {"reduce", "argreduce"} Type of reduction. fill_value : number or tuple(number), optional Value to use when a group has no members. If single value will be converted @@ -178,7 +178,7 @@ def __init__( # preprocess before blockwise self.preprocess = preprocess # Use "chunk_reduce" or "chunk_argreduce" - self.reduction_type = reduction_type + self.kind = kind self.numpy: FuncTuple = (numpy,) if numpy else (self.name,) # initialize blockwise reduction self.chunk: FuncTuple = _atleast_1d(chunk) @@ -222,7 +222,7 @@ def __dask_tokenize__(self): Aggregation, self.name, self.preprocess, - self.reduction_type, + self.kind, self.numpy, self.chunk, self.combine, @@ -394,7 +394,7 @@ def _pick_second(*x): preprocess=argreduce_preprocess, chunk=("max", "argmax"), # order is important combine=("max", "argmax"), - reduction_type="argreduce", + kind="argreduce", fill_value=(dtypes.NINF, 0), final_fill_value=-1, finalize=_pick_second, @@ -407,7 +407,7 @@ def _pick_second(*x): preprocess=argreduce_preprocess, chunk=("min", "argmin"), # order is important combine=("min", "argmin"), - reduction_type="argreduce", + kind="argreduce", fill_value=(dtypes.INF, 0), final_fill_value=-1, finalize=_pick_second, @@ -420,7 +420,7 @@ def _pick_second(*x): preprocess=argreduce_preprocess, chunk=("nanmax", "nanargmax"), # order is important combine=("max", "argmax"), - reduction_type="argreduce", + kind="argreduce", fill_value=(dtypes.NINF, 0), final_fill_value=-1, finalize=_pick_second, @@ -433,7 +433,7 @@ def _pick_second(*x): preprocess=argreduce_preprocess, chunk=("nanmin", "nanargmin"), # order is important combine=("min", "argmin"), - reduction_type="argreduce", + kind="argreduce", fill_value=(dtypes.INF, 0), final_fill_value=-1, finalize=_pick_second, @@ -475,7 +475,7 @@ def _pick_second(*x): "cumsum", chunk="cumsum", combine="sum", - reduction_type="accumulate", + kind="accumulate", # fill_value=0, # final_fill_value=0, # dtypes=bool, diff --git a/flox/core.py b/flox/core.py index 2578acac7..c4025a76a 100644 --- a/flox/core.py +++ b/flox/core.py @@ -87,7 +87,7 @@ def _is_arg_reduction(func: T_Agg) -> bool: if isinstance(func, str) and func in ["argmin", "argmax", "nanargmax", "nanargmin"]: return True - if isinstance(func, Aggregation) and func.reduction_type == "argreduce": + if isinstance(func, Aggregation) and func.kind == "argreduce": return True return False @@ -110,15 +110,15 @@ def _get_expected_groups(by: T_By, sort: bool) -> T_ExpectIndex: return _convert_expected_groups_to_index((expected,), isbin=(False,), sort=sort)[0] -def _get_chunk_reduction(reduction_type: Literal["reduce", "argreduce", "accumulate"]) -> Callable: - if reduction_type == "reduce": +def _get_chunk_aggregation(kind: Literal["reduce", "argreduce", "accumulate"]) -> Callable: + if kind == "reduce": return chunk_reduce - elif reduction_type == "argreduce": + elif kind == "argreduce": return chunk_argreduce - elif reduction_type == "accumulate": + elif kind == "accumulate": return chunk_accumulate else: - raise ValueError(f"Unknown reduction type: {reduction_type}") + raise ValueError(f"Unknown aggregation kind: {kind}") def is_nanlen(reduction: T_Func) -> bool: @@ -1046,7 +1046,7 @@ def _grouped_combine( groups = _conc2(x_chunk, "groups", axis=neg_axis) - if agg.reduction_type == "argreduce": + if agg.kind == "argreduce": # If "nanlen" was added for masking later, we need to account for that if agg.chunk[-1] == "nanlen": slicer = slice(None, -1) @@ -1098,7 +1098,7 @@ def _grouped_combine( )["intermediates"][0] ) - elif agg.reduction_type == "reduce": + elif agg.kind == "reduce": # Here we reduce the intermediates individually results = {"groups": None, "intermediates": []} for idx, (combine, fv, dtype) in enumerate( @@ -1365,7 +1365,7 @@ def dask_groupby_agg( else: # choose `chunk_reduce` or `chunk_argreduce` blockwise_method = partial( - _get_chunk_reduction(agg.reduction_type), + _get_chunk_aggregation(agg.kind), func=agg.chunk, fill_value=agg.fill_value["intermediate"], dtype=agg.dtype["intermediate"], From d0c29b09013656b1c4c27301890c5516a0fdd796 Mon Sep 17 00:00:00 2001 From: Illviljan <14371165+Illviljan@users.noreply.github.com> Date: Fri, 7 Jul 2023 22:40:50 +0200 Subject: [PATCH 15/57] chunk=None --- flox/aggregations.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flox/aggregations.py b/flox/aggregations.py index 41c7bf259..e1fd25d59 100644 --- a/flox/aggregations.py +++ b/flox/aggregations.py @@ -473,7 +473,7 @@ def _pick_second(*x): # Cumulatives: cumsum_ = Aggregation( "cumsum", - chunk="cumsum", + chunk=None, combine="sum", kind="accumulate", # fill_value=0, From 08bf8180e9662ce4fffc62dd0a88e1c41a99c2f5 Mon Sep 17 00:00:00 2001 From: Illviljan <14371165+Illviljan@users.noreply.github.com> Date: Sat, 8 Jul 2023 10:01:03 +0200 Subject: [PATCH 16/57] Trim unused paths --- flox/core.py | 35 +---------------------------------- 1 file changed, 1 insertion(+), 34 deletions(-) diff --git a/flox/core.py b/flox/core.py index c4025a76a..64bc27258 100644 --- a/flox/core.py +++ b/flox/core.py @@ -2271,9 +2271,6 @@ def _accumulate_blockwise( reindex=reindex, ) - if _is_arg_reduction(agg): - results["intermediates"][0] = np.unravel_index(results["intermediates"][0], array.shape)[-1] - result = _finalize_results( results, agg, axis, expected_groups, fill_value=fill_value, reindex=reindex ) @@ -2388,23 +2385,14 @@ def groupby_accumulate( xarray.xarray_reduce """ - if engine == "flox" and _is_arg_reduction(func): - raise NotImplementedError( - "argreductions not supported for engine='flox' yet." - "Try engine='numpy' or engine='numba' instead." - ) - bys: T_Bys = tuple(np.asarray(b) if not is_duck_array(b) else b for b in by) nby = len(bys) by_is_dask = tuple(is_duck_dask_array(b) for b in bys) any_by_dask = any(by_is_dask) - if method in ["split-reduce", "cohorts"] and any_by_dask: + if method in ["cohorts"] and any_by_dask: raise ValueError(f"method={method!r} can only be used when grouping by numpy arrays.") - if method == "split-reduce": - method = "cohorts" - reindex = _validate_reindex( reindex, func, method, expected_groups, any_by_dask, is_duck_dask_array(array) ) @@ -2465,19 +2453,6 @@ def groupby_accumulate( has_dask = is_duck_dask_array(array) or is_duck_dask_array(by_) - if _is_first_last_reduction(func): - if has_dask and nax != 1: - raise ValueError( - "For dask arrays: first, last, nanfirst, nanlast reductions are " - "only supported along a single axis. Please reshape appropriately." - ) - - elif nax not in [1, by_.ndim]: - raise ValueError( - "first, last, nanfirst, nanlast reductions are only supported " - "along a single axis or when reducing across all dimensions of `by`." - ) - # TODO: make sure expected_groups is unique if nax == 1 and by_.ndim > 1 and expected_groups is None: if not any_by_dask: @@ -2515,12 +2490,6 @@ def groupby_accumulate( else: min_count_ = min_count - # TODO: set in xarray? - if min_count_ > 0 and func in ["nansum", "nanprod"] and fill_value is None: - # nansum, nanprod have fill_value=0, 1 - # overwrite than when min_count is set - fill_value = np.nan - kwargs = dict(axis=axis_, fill_value=fill_value, engine=engine) agg = _initialize_aggregation(func, dtype, array.dtype, fill_value, min_count_, finalize_kwargs) @@ -2584,8 +2553,6 @@ def groupby_accumulate( result = reindex_(result, from_=groups[0], to=expected_groups, fill_value=fill_value) groups = final_groups - if is_bool_array and (_is_minmax_reduction(func) or _is_first_last_reduction(func)): - result = result.astype(bool) return (result, *groups) # type: ignore[return-value] # Unpack not in mypy yet From d8aa5f20aaf3d287aca2a2518af6167443abd4b4 Mon Sep 17 00:00:00 2001 From: Illviljan <14371165+Illviljan@users.noreply.github.com> Date: Sat, 8 Jul 2023 10:01:39 +0200 Subject: [PATCH 17/57] cumulative bool check --- flox/core.py | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/flox/core.py b/flox/core.py index 64bc27258..a5b896a53 100644 --- a/flox/core.py +++ b/flox/core.py @@ -2059,6 +2059,14 @@ def groupby_reduce( # %% Accumulate +def _is_arg_cumulative(func: T_Agg) -> bool: + if isinstance(func, str) and func in ("cumsum", "cumprod"): + return True + if isinstance(func, Aggregation) and func.kind == "accumulate": + return True + return False + + def chunk_accumulate( array: np.ndarray, by: np.ndarray, @@ -2573,8 +2581,7 @@ def groupby_aggregate( reindex: bool | None = None, finalize_kwargs: dict[Any, Any] | None = None, ) -> tuple[DaskArray, Unpack[tuple[np.ndarray | DaskArray, ...]]]: # type: ignore[misc] # Unpack not in mypy - is_accumulator = "cumsum" == func - if is_accumulator: + if _is_arg_cumulative(func): return groupby_accumulate( array, *by, From 3e30f359625ed3837bcf79d21210a164d18cb6f1 Mon Sep 17 00:00:00 2001 From: Illviljan <14371165+Illviljan@users.noreply.github.com> Date: Sat, 8 Jul 2023 10:07:19 +0200 Subject: [PATCH 18/57] rename --- flox/core.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/flox/core.py b/flox/core.py index a5b896a53..4b39b8d71 100644 --- a/flox/core.py +++ b/flox/core.py @@ -2058,7 +2058,7 @@ def groupby_reduce( return (result, *groups) # type: ignore[return-value] # Unpack not in mypy yet -# %% Accumulate +# %% Cumulate def _is_arg_cumulative(func: T_Agg) -> bool: if isinstance(func, str) and func in ("cumsum", "cumprod"): return True @@ -2067,7 +2067,7 @@ def _is_arg_cumulative(func: T_Agg) -> bool: return False -def chunk_accumulate( +def chunk_cumulate( array: np.ndarray, by: np.ndarray, func: T_Funcs, @@ -2234,7 +2234,7 @@ def chunk_accumulate( return results -def _accumulate_blockwise( +def _cumulate_blockwise( array, by, agg: Aggregation, @@ -2262,7 +2262,7 @@ def _accumulate_blockwise( finalize_kwargs_ = agg.finalize_kwargs finalize_kwargs_ += ({},) + ({},) - results = chunk_accumulate( + results = chunk_cumulate( array, by, func=agg.numpy, @@ -2503,7 +2503,7 @@ def groupby_accumulate( groups: tuple[np.ndarray | DaskArray, ...] if not has_dask: - results = _accumulate_blockwise( + results = _cumulate_blockwise( array, by_, agg, expected_groups=expected_groups, reindex=reindex, sort=sort, **kwargs ) groups = (results["groups"],) From 4b21e898cf9e710e0add5ed52293817b94821da4 Mon Sep 17 00:00:00 2001 From: Illviljan <14371165+Illviljan@users.noreply.github.com> Date: Sat, 8 Jul 2023 10:17:48 +0200 Subject: [PATCH 19/57] Add cumprod, renaming --- flox/aggregations.py | 18 +++++++++++------- flox/core.py | 2 +- tests/test_core.py | 19 ++++++++++--------- 3 files changed, 22 insertions(+), 17 deletions(-) diff --git a/flox/aggregations.py b/flox/aggregations.py index e1fd25d59..21487b275 100644 --- a/flox/aggregations.py +++ b/flox/aggregations.py @@ -130,7 +130,7 @@ def __init__( final_fill_value=dtypes.NA, dtypes=None, final_dtype: DTypeLike | None = None, - kind: Literal["reduce", "argreduce", "accumulate"] = "reduce", + kind: Literal["reduce", "argreduce", "cumulate"] = "reduce", ): """ Blueprint for computing grouped aggregations. @@ -475,16 +475,18 @@ def _pick_second(*x): "cumsum", chunk=None, combine="sum", - kind="accumulate", - # fill_value=0, - # final_fill_value=0, - # dtypes=bool, - # final_dtype=bool, + kind="cumulate", +) +cumprod_ = Aggregation( + "cumprod", + chunk=None, + combine="prod", + kind="cumulate", ) -# sum_ = Aggregation("sum", chunk="sum", combine="sum", fill_value=0) aggregations = { + # Reductions: "any": any_, "all": all_, "count": count, @@ -510,7 +512,9 @@ def _pick_second(*x): "nanfirst": nanfirst, "last": last, "nanlast": nanlast, + # Cumulatives: "cumsum": cumsum_, + "cumprod": cumprod_, } diff --git a/flox/core.py b/flox/core.py index 4b39b8d71..8a1e1e2f2 100644 --- a/flox/core.py +++ b/flox/core.py @@ -2062,7 +2062,7 @@ def groupby_reduce( def _is_arg_cumulative(func: T_Agg) -> bool: if isinstance(func, str) and func in ("cumsum", "cumprod"): return True - if isinstance(func, Aggregation) and func.kind == "accumulate": + if isinstance(func, Aggregation) and func.kind == "cumulate": return True return False diff --git a/tests/test_core.py b/tests/test_core.py index 795921308..2e542311a 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -80,7 +80,7 @@ def dask_array_ones(*args): ) if TYPE_CHECKING: - from flox.core import T_Engine, T_ExpectedGroupsOpt, T_Func2 + from flox.core import T_Engine, T_ExpectedGroupsOpt, T_Agg def _get_array_func(func: str) -> Callable: @@ -137,7 +137,7 @@ def test_alignment_error(): ) def test_groupby_reduce( engine: T_Engine, - func: T_Func2, + func: T_Agg, array: np.ndarray, by: np.ndarray, expected: list[float], @@ -1467,16 +1467,17 @@ def test_method_check_numpy(): assert_equal(actual, expected) -def test_cumsum() -> None: +@pytest.mark.parametrize("func", ["cumsum", "cumprod"]) +def test_cumulatives(func: T_Agg) -> None: import numpy_groupies as npg group_idx = np.array([4, 3, 3, 4, 4, 1, 1, 1, 7, 8, 7, 4, 3, 3, 1, 1]) a = np.array([3, 4, 1, 3, 9, 9, 6, 7, 7, 0, 8, 2, 1, 8, 9, 8]) - b = npg.aggregate(group_idx, a, func="cumsum") + expected = npg.aggregate(group_idx, a, func=func) - bb = groupby_accumulate(a, group_idx, func="cumsum", engine="numpy")[0] + actual = groupby_accumulate(a, group_idx, func=func, engine="numpy")[0] - np.testing.assert_allclose(b, bb) + np.testing.assert_allclose(expected, actual) def test_groupby_aggregate() -> None: @@ -1484,8 +1485,8 @@ def test_groupby_aggregate() -> None: group_idx = np.array([4, 3, 3, 4, 4, 1, 1, 1, 7, 8, 7, 4, 3, 3, 1, 1]) a = np.array([3, 4, 1, 3, 9, 9, 6, 7, 7, 0, 8, 2, 1, 8, 9, 8]) - b = npg.aggregate(group_idx, a, func="cumsum") + expected = npg.aggregate(group_idx, a, func="cumsum") - bb = groupby_aggregate(a, group_idx, func="cumsum", engine="numpy")[0] + actual = groupby_aggregate(a, group_idx, func="cumsum", engine="numpy")[0] - np.testing.assert_allclose(b, bb) + np.testing.assert_allclose(expected, actual) From 1848b022352b7d690d0a6ffead74050251e9d970 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Sat, 8 Jul 2023 08:18:04 +0000 Subject: [PATCH 20/57] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- tests/test_core.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_core.py b/tests/test_core.py index 2e542311a..2447337b6 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -80,7 +80,7 @@ def dask_array_ones(*args): ) if TYPE_CHECKING: - from flox.core import T_Engine, T_ExpectedGroupsOpt, T_Agg + from flox.core import T_Agg, T_Engine, T_ExpectedGroupsOpt def _get_array_func(func: str) -> Callable: From 24466deba2ffc632849e3f020bfc8d0388b0ddb7 Mon Sep 17 00:00:00 2001 From: Illviljan <14371165+Illviljan@users.noreply.github.com> Date: Sat, 8 Jul 2023 10:20:44 +0200 Subject: [PATCH 21/57] Update core.py --- flox/core.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flox/core.py b/flox/core.py index 8a1e1e2f2..186676aa5 100644 --- a/flox/core.py +++ b/flox/core.py @@ -116,7 +116,7 @@ def _get_chunk_aggregation(kind: Literal["reduce", "argreduce", "accumulate"]) - elif kind == "argreduce": return chunk_argreduce elif kind == "accumulate": - return chunk_accumulate + return chunk_cumulate else: raise ValueError(f"Unknown aggregation kind: {kind}") From 82b99b3b490adc1c1bcb1625a6a774ee11830063 Mon Sep 17 00:00:00 2001 From: Illviljan <14371165+Illviljan@users.noreply.github.com> Date: Sat, 8 Jul 2023 10:33:09 +0200 Subject: [PATCH 22/57] align typing --- flox/aggregations.py | 3 ++- flox/core.py | 4 +++- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/flox/aggregations.py b/flox/aggregations.py index 21487b275..34bb6ae6d 100644 --- a/flox/aggregations.py +++ b/flox/aggregations.py @@ -14,6 +14,7 @@ if TYPE_CHECKING: FuncTuple = tuple[Callable | str, ...] + T_Kind = Literal["reduce", "argreduce", "cumulate"] def _is_arg_reduction(func: str | Aggregation) -> bool: @@ -130,7 +131,7 @@ def __init__( final_fill_value=dtypes.NA, dtypes=None, final_dtype: DTypeLike | None = None, - kind: Literal["reduce", "argreduce", "cumulate"] = "reduce", + kind: T_Kind = "reduce", ): """ Blueprint for computing grouped aggregations. diff --git a/flox/core.py b/flox/core.py index 186676aa5..8d1ddbfd8 100644 --- a/flox/core.py +++ b/flox/core.py @@ -48,6 +48,8 @@ import dask.array.Array as DaskArray + from .aggregations import T_Kind + T_DuckArray = Union[np.ndarray, DaskArray] # Any ? T_By = T_DuckArray T_Bys = tuple[T_By, ...] @@ -110,7 +112,7 @@ def _get_expected_groups(by: T_By, sort: bool) -> T_ExpectIndex: return _convert_expected_groups_to_index((expected,), isbin=(False,), sort=sort)[0] -def _get_chunk_aggregation(kind: Literal["reduce", "argreduce", "accumulate"]) -> Callable: +def _get_chunk_aggregation(kind: T_Kind) -> Callable: if kind == "reduce": return chunk_reduce elif kind == "argreduce": From 9d5079221b036a1fb2a6441d5789d4d247f6807f Mon Sep 17 00:00:00 2001 From: Illviljan <14371165+Illviljan@users.noreply.github.com> Date: Sat, 8 Jul 2023 10:50:38 +0200 Subject: [PATCH 23/57] cumprod not supported in npg from numpy --- flox/aggregations.py | 2 +- tests/test_core.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/flox/aggregations.py b/flox/aggregations.py index 34bb6ae6d..8bf5e80d8 100644 --- a/flox/aggregations.py +++ b/flox/aggregations.py @@ -515,7 +515,7 @@ def _pick_second(*x): "nanlast": nanlast, # Cumulatives: "cumsum": cumsum_, - "cumprod": cumprod_, + # "cumprod": cumprod_, } diff --git a/tests/test_core.py b/tests/test_core.py index 2447337b6..970764113 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -1467,7 +1467,7 @@ def test_method_check_numpy(): assert_equal(actual, expected) -@pytest.mark.parametrize("func", ["cumsum", "cumprod"]) +@pytest.mark.parametrize("func", ["cumsum"]) # "cumprod" def test_cumulatives(func: T_Agg) -> None: import numpy_groupies as npg From 69fbad0a56404cee95253d424d868d97bc9c2a10 Mon Sep 17 00:00:00 2001 From: Illviljan <14371165+Illviljan@users.noreply.github.com> Date: Sat, 8 Jul 2023 15:32:09 +0200 Subject: [PATCH 24/57] cleanup --- flox/core.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/flox/core.py b/flox/core.py index 8d1ddbfd8..597e111df 100644 --- a/flox/core.py +++ b/flox/core.py @@ -2049,10 +2049,9 @@ def groupby_reduce( # nan group labels are factorized to -1, and preserved # now we get rid of them by reindexing # This also handles bins with no data - result = reindex_(result, from_=groups[0], to=expected_groups, fill_value=fill_value) - is_cumreduction = func == "cumsum" - if not is_cumreduction: - result = result.reshape(result.shape[:-1] + grp_shape) + result = reindex_( + result, from_=groups[0], to=expected_groups, fill_value=fill_value + ).reshape(result.shape[:-1] + grp_shape) groups = final_groups if is_bool_array and (_is_minmax_reduction(func) or _is_first_last_reduction(func)): From 6da99df25e9c337f1063b61fde2e0720366a99d7 Mon Sep 17 00:00:00 2001 From: Illviljan <14371165+Illviljan@users.noreply.github.com> Date: Sat, 8 Jul 2023 15:33:11 +0200 Subject: [PATCH 25/57] Update aggregations.py --- flox/aggregations.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flox/aggregations.py b/flox/aggregations.py index 8bf5e80d8..d17b476f0 100644 --- a/flox/aggregations.py +++ b/flox/aggregations.py @@ -515,7 +515,7 @@ def _pick_second(*x): "nanlast": nanlast, # Cumulatives: "cumsum": cumsum_, - # "cumprod": cumprod_, + # "cumprod": cumprod_, } From 25d36ebe14c63ebca86fa3e70c1f14f0e376643d Mon Sep 17 00:00:00 2001 From: Illviljan <14371165+Illviljan@users.noreply.github.com> Date: Sat, 8 Jul 2023 15:47:17 +0200 Subject: [PATCH 26/57] missed engine arg --- flox/aggregations.py | 2 +- flox/core.py | 2 ++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/flox/aggregations.py b/flox/aggregations.py index d17b476f0..34bb6ae6d 100644 --- a/flox/aggregations.py +++ b/flox/aggregations.py @@ -515,7 +515,7 @@ def _pick_second(*x): "nanlast": nanlast, # Cumulatives: "cumsum": cumsum_, - # "cumprod": cumprod_, + "cumprod": cumprod_, } diff --git a/flox/core.py b/flox/core.py index 597e111df..4e8a77341 100644 --- a/flox/core.py +++ b/flox/core.py @@ -2595,6 +2595,7 @@ def groupby_aggregate( dtype=dtype, min_count=min_count, method=method, + engine=engine, reindex=reindex, finalize_kwargs=finalize_kwargs, ) @@ -2611,6 +2612,7 @@ def groupby_aggregate( dtype=dtype, min_count=min_count, method=method, + engine=engine, reindex=reindex, finalize_kwargs=finalize_kwargs, ) From 03d57a4d1fbad71134df82257d1c6ee69022eec7 Mon Sep 17 00:00:00 2001 From: Illviljan <14371165+Illviljan@users.noreply.github.com> Date: Sat, 8 Jul 2023 21:28:18 +0200 Subject: [PATCH 27/57] Fix typo --- flox/core.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flox/core.py b/flox/core.py index 4e8a77341..00ee46ca4 100644 --- a/flox/core.py +++ b/flox/core.py @@ -117,7 +117,7 @@ def _get_chunk_aggregation(kind: T_Kind) -> Callable: return chunk_reduce elif kind == "argreduce": return chunk_argreduce - elif kind == "accumulate": + elif kind == "cumulate": return chunk_cumulate else: raise ValueError(f"Unknown aggregation kind: {kind}") From 4dc5b0959b92e04343f6efa9d8059b482ae7c370 Mon Sep 17 00:00:00 2001 From: Illviljan <14371165+Illviljan@users.noreply.github.com> Date: Sun, 9 Jul 2023 23:03:51 +0200 Subject: [PATCH 28/57] Apply suggestions from code review Co-authored-by: Deepak Cherian --- flox/core.py | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/flox/core.py b/flox/core.py index 4e8a77341..38a6916d8 100644 --- a/flox/core.py +++ b/flox/core.py @@ -2290,17 +2290,11 @@ def groupby_accumulate( array: np.ndarray | DaskArray, *by: T_By, func: T_Agg, - expected_groups: T_ExpectedGroupsOpt = None, - sort: bool = True, + expected_groups: T_ExpectedGroupsOpt = None, #TODO: think about this one isbin: T_IsBins = False, axis: T_AxesOpt = None, - fill_value=None, dtype: np.typing.DTypeLike = None, - min_count: int | None = None, - method: T_Method = "map-reduce", engine: T_Engine = "numpy", - reindex: bool | None = None, - finalize_kwargs: dict[Any, Any] | None = None, ) -> tuple[DaskArray, Unpack[tuple[np.ndarray | DaskArray, ...]]]: # type: ignore[misc] # Unpack not in mypy yet """ GroupBy reductions using tree reductions for dask.array @@ -2562,7 +2556,7 @@ def groupby_accumulate( result = reindex_(result, from_=groups[0], to=expected_groups, fill_value=fill_value) groups = final_groups - return (result, *groups) # type: ignore[return-value] # Unpack not in mypy yet + return result # %% Aggregate From b1f3a4099772821019c112e74ce40835ba809754 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Sun, 9 Jul 2023 21:04:01 +0000 Subject: [PATCH 29/57] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- flox/core.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flox/core.py b/flox/core.py index 38a6916d8..ba63246db 100644 --- a/flox/core.py +++ b/flox/core.py @@ -2290,7 +2290,7 @@ def groupby_accumulate( array: np.ndarray | DaskArray, *by: T_By, func: T_Agg, - expected_groups: T_ExpectedGroupsOpt = None, #TODO: think about this one + expected_groups: T_ExpectedGroupsOpt = None, # TODO: think about this one isbin: T_IsBins = False, axis: T_AxesOpt = None, dtype: np.typing.DTypeLike = None, From 1bcd2773152f60769dc1dbbb67ec8738cb7d22ae Mon Sep 17 00:00:00 2001 From: Illviljan <14371165+Illviljan@users.noreply.github.com> Date: Mon, 10 Jul 2023 14:19:26 +0200 Subject: [PATCH 30/57] trim --- flox/core.py | 25 +++++++++++-------------- 1 file changed, 11 insertions(+), 14 deletions(-) diff --git a/flox/core.py b/flox/core.py index acbb8b024..43b9392b3 100644 --- a/flox/core.py +++ b/flox/core.py @@ -2210,9 +2210,6 @@ def chunk_cumulate( if empty: result = np.full(shape=final_array_shape, fill_value=fv) else: - if is_nanlen(reduction) and is_nanlen(previous_reduction): - result = results["intermediates"][-1] - # fill_value here is necessary when reducing with "offset" groups kw_func = dict(size=size, dtype=dt, fill_value=fv) kw_func.update(kw) @@ -2509,17 +2506,17 @@ def groupby_accumulate( # TODO: How else to narrow that array.chunks is there? assert isinstance(array, DaskArray) - if agg.chunk[0] is None and method != "blockwise": - raise NotImplementedError( - f"Aggregation {agg.name!r} is only implemented for dask arrays when method='blockwise'." - f"\n\n Received: {func}" - ) - - if method in ["blockwise", "cohorts"] and nax != by_.ndim: - raise NotImplementedError( - "Must reduce along all dimensions of `by` when method != 'map-reduce'." - f"Received method={method!r}" - ) + # if agg.chunk[0] is None and method != "blockwise": + # raise NotImplementedError( + # f"Aggregation {agg.name!r} is only implemented for dask arrays when method='blockwise'." + # f"\n\n Received: {func}" + # ) + + # if method in ["blockwise", "cohorts"] and nax != by_.ndim: + # raise NotImplementedError( + # "Must reduce along all dimensions of `by` when method != 'map-reduce'." + # f"Received method={method!r}" + # ) # TODO: just do this in dask_groupby_agg # we always need some fill_value (see above) so choose the default if needed From 29b6f3f954e65e99fd312b01e87ef37807f5a8b2 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Mon, 10 Jul 2023 12:19:40 +0000 Subject: [PATCH 31/57] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- flox/core.py | 1 - 1 file changed, 1 deletion(-) diff --git a/flox/core.py b/flox/core.py index 43b9392b3..7ba3a799a 100644 --- a/flox/core.py +++ b/flox/core.py @@ -2205,7 +2205,6 @@ def chunk_cumulate( # we commonly have func=(..., "nanlen", "nanlen") when # counts are needed for the final result as well as for masking # optimize that out. - previous_reduction: T_Func = "" for reduction, fv, kw, dt in zip(funcs, fill_values, kwargss, dtypes): if empty: result = np.full(shape=final_array_shape, fill_value=fv) From cedbc4de6eb2c7b341d9d77d5f99fba5f7b6f800 Mon Sep 17 00:00:00 2001 From: Illviljan <14371165+Illviljan@users.noreply.github.com> Date: Mon, 10 Jul 2023 14:36:01 +0200 Subject: [PATCH 32/57] Remove a bunch of not needed code --- flox/core.py | 70 +++++++++++++++++----------------------------------- 1 file changed, 23 insertions(+), 47 deletions(-) diff --git a/flox/core.py b/flox/core.py index 43b9392b3..b151dab08 100644 --- a/flox/core.py +++ b/flox/core.py @@ -2390,12 +2390,8 @@ def groupby_accumulate( by_is_dask = tuple(is_duck_dask_array(b) for b in bys) any_by_dask = any(by_is_dask) - if method in ["cohorts"] and any_by_dask: - raise ValueError(f"method={method!r} can only be used when grouping by numpy arrays.") - - reindex = _validate_reindex( - reindex, func, method, expected_groups, any_by_dask, is_duck_dask_array(array) - ) + _sort = False # TODO: needed? + _fill_value = None # TODO: needed? if not is_duck_array(array): array = np.asarray(array) @@ -2412,14 +2408,14 @@ def groupby_accumulate( expected_groups = _validate_expected_groups(nby, expected_groups) for idx, (expect, is_dask) in enumerate(zip(expected_groups, by_is_dask)): - if is_dask and (reindex or nby > 1) and expect is None: + if is_dask and (nby > 1) and expect is None: raise ValueError( f"`expected_groups` for array {idx} in `by` cannot be None since it is a dask.array." ) # We convert to pd.Index since that lets us know if we are binning or not # (pd.IntervalIndex or not) - expected_groups = _convert_expected_groups_to_index(expected_groups, isbins, sort) + expected_groups = _convert_expected_groups_to_index(expected_groups, isbins, _sort) # Don't factorize "early only when # grouping by dask arrays, and not having expected_groups @@ -2436,7 +2432,7 @@ def groupby_accumulate( # reindex controls what's actually allocated in chunk_reduce # At this point, we care about an accurate conversion to codes. reindex=True, - sort=sort, + sort=_sort, ) expected_groups = (pd.RangeIndex(math.prod(grp_shape)),) @@ -2456,7 +2452,7 @@ def groupby_accumulate( # TODO: make sure expected_groups is unique if nax == 1 and by_.ndim > 1 and expected_groups is None: if not any_by_dask: - expected_groups = _get_expected_groups(by_, sort) + expected_groups = _get_expected_groups(by_, _sort) else: # When we reduce along all axes, we are guaranteed to see all # groups in the final combine stage, so everything works. @@ -2476,27 +2472,13 @@ def groupby_accumulate( axis_ = tuple(array.ndim + np.arange(-nax, 0)) nax = len(axis_) - # When axis is a subset of possible values; then npg will - # apply it to groups that don't exist along a particular axis (for e.g.) - # since these count as a group that is absent. thoo! - # fill_value applies to all-NaN groups as well as labels in expected_groups that are not found. - # The only way to do this consistently is mask out using min_count - # Consider np.sum([np.nan]) = np.nan, np.nansum([np.nan]) = 0 - if min_count is None: - if nax < by_.ndim or fill_value is not None: - min_count_: int = 1 - else: - min_count_ = 0 - else: - min_count_ = min_count - - kwargs = dict(axis=axis_, fill_value=fill_value, engine=engine) - agg = _initialize_aggregation(func, dtype, array.dtype, fill_value, min_count_, finalize_kwargs) + kwargs = dict(axis=axis_, fill_value=_fill_value, engine=engine) + agg = _initialize_aggregation(func, dtype, array.dtype, _fill_value, None, None) groups: tuple[np.ndarray | DaskArray, ...] if not has_dask: results = _cumulate_blockwise( - array, by_, agg, expected_groups=expected_groups, reindex=reindex, sort=sort, **kwargs + array, by_, agg, expected_groups=expected_groups, reindex=False, sort=_sort, **kwargs ) groups = (results["groups"],) result = results[agg.name] @@ -2525,32 +2507,32 @@ def groupby_accumulate( partial_agg = partial(dask_groupby_agg, **kwargs) - if method == "blockwise" and by_.ndim == 1: - array = rechunk_for_blockwise(array, axis=-1, labels=by_) + # if method == "blockwise" and by_.ndim == 1: + # array = rechunk_for_blockwise(array, axis=-1, labels=by_) result, groups = partial_agg( array, by_, - expected_groups=None if method == "blockwise" else expected_groups, + expected_groups=expected_groups, agg=agg, - reindex=reindex, - method=method, - sort=sort, + reindex=False, + method="", # TODO: ? + sort=_sort, ) - if sort and method != "map-reduce": - assert len(groups) == 1 - sorted_idx = np.argsort(groups[0]) - # This optimization helps specifically with resampling - if not (sorted_idx[:-1] <= sorted_idx[1:]).all(): - result = result[..., sorted_idx] - groups = (groups[0][sorted_idx],) + # if sort and method != "map-reduce": + # assert len(groups) == 1 + # sorted_idx = np.argsort(groups[0]) + # # This optimization helps specifically with resampling + # if not (sorted_idx[:-1] <= sorted_idx[1:]).all(): + # result = result[..., sorted_idx] + # groups = (groups[0][sorted_idx],) if factorize_early: # nan group labels are factorized to -1, and preserved # now we get rid of them by reindexing # This also handles bins with no data - result = reindex_(result, from_=groups[0], to=expected_groups, fill_value=fill_value) + result = reindex_(result, from_=groups[0], to=expected_groups, fill_value=_fill_value) groups = final_groups return result @@ -2579,16 +2561,10 @@ def groupby_aggregate( *by, func=func, expected_groups=expected_groups, - sort=sort, isbin=isbin, axis=axis, - fill_value=fill_value, dtype=dtype, - min_count=min_count, - method=method, engine=engine, - reindex=reindex, - finalize_kwargs=finalize_kwargs, ) else: return groupby_reduce( From ffe5f823276b2834094c47fbd4dab027cf4d6df9 Mon Sep 17 00:00:00 2001 From: Illviljan <14371165+Illviljan@users.noreply.github.com> Date: Mon, 10 Jul 2023 14:41:10 +0200 Subject: [PATCH 33/57] Update core.py --- flox/core.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/flox/core.py b/flox/core.py index 0b9fc40f5..bb36f79c2 100644 --- a/flox/core.py +++ b/flox/core.py @@ -2471,13 +2471,13 @@ def groupby_accumulate( axis_ = tuple(array.ndim + np.arange(-nax, 0)) nax = len(axis_) - kwargs = dict(axis=axis_, fill_value=_fill_value, engine=engine) - agg = _initialize_aggregation(func, dtype, array.dtype, _fill_value, None, None) + kwargs_ = dict(axis=axis_, fill_value=_fill_value, engine=engine) + agg = _initialize_aggregation(func, dtype, array.dtype, _fill_value, 0, None) groups: tuple[np.ndarray | DaskArray, ...] if not has_dask: results = _cumulate_blockwise( - array, by_, agg, expected_groups=expected_groups, reindex=False, sort=_sort, **kwargs + array, by_, agg, expected_groups=expected_groups, reindex=False, sort=_sort, **kwargs_ ) groups = (results["groups"],) result = results[agg.name] @@ -2501,10 +2501,10 @@ def groupby_accumulate( # TODO: just do this in dask_groupby_agg # we always need some fill_value (see above) so choose the default if needed - if kwargs["fill_value"] is None: - kwargs["fill_value"] = agg.fill_value[agg.name] + if kwargs_["fill_value"] is None: + kwargs_["fill_value"] = agg.fill_value[agg.name] - partial_agg = partial(dask_groupby_agg, **kwargs) + partial_agg = partial(dask_groupby_agg, **kwargs_) # if method == "blockwise" and by_.ndim == 1: # array = rechunk_for_blockwise(array, axis=-1, labels=by_) From 2e27ca567cb0e751fa8718220a079d6ab7ed0733 Mon Sep 17 00:00:00 2001 From: Illviljan <14371165+Illviljan@users.noreply.github.com> Date: Mon, 10 Jul 2023 14:56:50 +0200 Subject: [PATCH 34/57] More trimming --- flox/core.py | 56 +++------------------------------------------- tests/test_core.py | 4 ++-- 2 files changed, 5 insertions(+), 55 deletions(-) diff --git a/flox/core.py b/flox/core.py index bb36f79c2..3d49821bb 100644 --- a/flox/core.py +++ b/flox/core.py @@ -2238,10 +2238,7 @@ def _cumulate_blockwise( *, axis: T_Axes, expected_groups, - fill_value, engine: T_Engine, - sort, - reindex, ) -> FinalResultsDict: """ Blockwise groupby reduction that produces the final result. This code path is @@ -2272,13 +2269,9 @@ def _cumulate_blockwise( dtype=agg.dtype["numpy"], kwargs=finalize_kwargs_, engine=engine, - sort=sort, - reindex=reindex, ) - result = _finalize_results( - results, agg, axis, expected_groups, fill_value=fill_value, reindex=reindex - ) + result = _finalize_results(results, agg, axis, expected_groups, fill_value=None, reindex=False) return result @@ -2308,49 +2301,12 @@ def groupby_accumulate( Expected unique labels. isbin : bool, optional Are ``expected_groups`` bin edges? - sort : bool, optional - Whether groups should be returned in sorted order. Only applies for dask - reductions when ``method`` is not ``"map-reduce"``. For ``"map-reduce"``, the groups - are always sorted. axis : None or int or Sequence[int], optional If None, reduce across all dimensions of by Else, reduce across corresponding axes of array Negative integers are normalized using array.ndim - fill_value : Any - Value to assign when a label in ``expected_groups`` is not present. dtype : data-type , optional DType for the output. Can be anything that is accepted by ``np.dtype``. - min_count : int, default: None - The required number of valid values to perform the operation. If - fewer than min_count non-NA values are present the result will be - NA. Only used if skipna is set to True or defaults to True for the - array's dtype. - method : {"map-reduce", "blockwise", "cohorts", "split-reduce"}, optional - Strategy for reduction of dask arrays only: - * ``"map-reduce"``: - First apply the reduction blockwise on ``array``, then - combine a few newighbouring blocks, apply the reduction. - Continue until finalizing. Usually, ``func`` will need - to be an Aggregation instance for this method to work. - Common aggregations are implemented. - * ``"blockwise"``: - Only reduce using blockwise and avoid aggregating blocks - together. Useful for resampling-style reductions where group - members are always together. If `by` is 1D, `array` is automatically - rechunked so that chunk boundaries line up with group boundaries - i.e. each block contains all members of any group present - in that block. For nD `by`, you must make sure that all members of a group - are present in a single block. - * ``"cohorts"``: - Finds group labels that tend to occur together ("cohorts"), - indexes out cohorts and reduces that subset using "map-reduce", - repeat for all cohorts. This works well for many time groupings - where the group labels repeat at regular intervals like 'hour', - 'month', dayofyear' etc. Optimize chunking ``array`` for this - method by first rechunking using ``rechunk_for_cohorts`` - (for 1D ``by`` only). - * ``"split-reduce"``: - Same as "cohorts" and will be removed soon. engine : {"flox", "numpy", "numba"}, optional Algorithm to compute the groupby reduction on non-dask arrays and on each dask chunk: * ``"numpy"``: @@ -2363,12 +2319,6 @@ def groupby_accumulate( for a reduction that is not yet implemented. * ``"numba"``: Use the implementations in ``numpy_groupies.aggregate_numba``. - reindex : bool, optional - Whether to "reindex" the blockwise results to ``expected_groups`` (possibly automatically detected). - If True, the intermediate result of the blockwise groupby-reduction has a value for all expected groups, - and the final result is a simple reduction of those intermediates. In nearly all cases, this is a significant - boost in computation speed. For cases like time grouping, this may result in large intermediates relative to the - original block size. Avoid that by using ``method="cohorts"``. By default, it is turned off for argreductions. finalize_kwargs : dict, optional Kwargs passed to finalize the reduction such as ``ddof`` for var, std. @@ -2553,7 +2503,7 @@ def groupby_aggregate( engine: T_Engine = "numpy", reindex: bool | None = None, finalize_kwargs: dict[Any, Any] | None = None, -) -> tuple[DaskArray, Unpack[tuple[np.ndarray | DaskArray, ...]]]: # type: ignore[misc] # Unpack not in mypy +) -> DaskArray: if _is_arg_cumulative(func): return groupby_accumulate( array, @@ -2581,4 +2531,4 @@ def groupby_aggregate( engine=engine, reindex=reindex, finalize_kwargs=finalize_kwargs, - ) + )[0] diff --git a/tests/test_core.py b/tests/test_core.py index 82c00ef6f..f4692ee4c 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -1477,7 +1477,7 @@ def test_cumulatives(func: T_Agg) -> None: a = np.array([3, 4, 1, 3, 9, 9, 6, 7, 7, 0, 8, 2, 1, 8, 9, 8]) expected = npg.aggregate(group_idx, a, func=func) - actual = groupby_accumulate(a, group_idx, func=func, engine="numpy")[0] + actual = groupby_accumulate(a, group_idx, func=func, engine="numpy") np.testing.assert_allclose(expected, actual) @@ -1489,6 +1489,6 @@ def test_groupby_aggregate() -> None: a = np.array([3, 4, 1, 3, 9, 9, 6, 7, 7, 0, 8, 2, 1, 8, 9, 8]) expected = npg.aggregate(group_idx, a, func="cumsum") - actual = groupby_aggregate(a, group_idx, func="cumsum", engine="numpy")[0] + actual = groupby_aggregate(a, group_idx, func="cumsum", engine="numpy") np.testing.assert_allclose(expected, actual) From 37e5b95a46d022a2896c349bb8d02412c82ef5ef Mon Sep 17 00:00:00 2001 From: Illviljan <14371165+Illviljan@users.noreply.github.com> Date: Mon, 10 Jul 2023 15:06:13 +0200 Subject: [PATCH 35/57] Update core.py --- flox/core.py | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/flox/core.py b/flox/core.py index 3d49821bb..d04edacc2 100644 --- a/flox/core.py +++ b/flox/core.py @@ -2421,13 +2421,12 @@ def groupby_accumulate( axis_ = tuple(array.ndim + np.arange(-nax, 0)) nax = len(axis_) - kwargs_ = dict(axis=axis_, fill_value=_fill_value, engine=engine) agg = _initialize_aggregation(func, dtype, array.dtype, _fill_value, 0, None) groups: tuple[np.ndarray | DaskArray, ...] if not has_dask: results = _cumulate_blockwise( - array, by_, agg, expected_groups=expected_groups, reindex=False, sort=_sort, **kwargs_ + array, by_, agg, axis=axis_, expected_groups=expected_groups, engine=engine ) groups = (results["groups"],) result = results[agg.name] @@ -2451,10 +2450,10 @@ def groupby_accumulate( # TODO: just do this in dask_groupby_agg # we always need some fill_value (see above) so choose the default if needed - if kwargs_["fill_value"] is None: - kwargs_["fill_value"] = agg.fill_value[agg.name] + if _fill_value is None: + _fill_value = agg.fill_value[agg.name] - partial_agg = partial(dask_groupby_agg, **kwargs_) + partial_agg = partial(dask_groupby_agg, axis=axis_, fill_value=_fill_value, engine=engine) # if method == "blockwise" and by_.ndim == 1: # array = rechunk_for_blockwise(array, axis=-1, labels=by_) From 9a4ce231e369818d44f3c7dec705d8d5e2bcec41 Mon Sep 17 00:00:00 2001 From: Illviljan <14371165+Illviljan@users.noreply.github.com> Date: Mon, 10 Jul 2023 15:40:15 +0200 Subject: [PATCH 36/57] Update core.py --- flox/core.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flox/core.py b/flox/core.py index d04edacc2..dedb4e237 100644 --- a/flox/core.py +++ b/flox/core.py @@ -2284,7 +2284,7 @@ def groupby_accumulate( axis: T_AxesOpt = None, dtype: np.typing.DTypeLike = None, engine: T_Engine = "numpy", -) -> tuple[DaskArray, Unpack[tuple[np.ndarray | DaskArray, ...]]]: # type: ignore[misc] # Unpack not in mypy yet +) -> DaskArray """ GroupBy reductions using tree reductions for dask.array From 4d6c8a4412666772f4ab7e5c5cd19f0db9508df7 Mon Sep 17 00:00:00 2001 From: Illviljan <14371165+Illviljan@users.noreply.github.com> Date: Mon, 10 Jul 2023 15:41:07 +0200 Subject: [PATCH 37/57] Update core.py --- flox/core.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flox/core.py b/flox/core.py index dedb4e237..816377b6a 100644 --- a/flox/core.py +++ b/flox/core.py @@ -2284,7 +2284,7 @@ def groupby_accumulate( axis: T_AxesOpt = None, dtype: np.typing.DTypeLike = None, engine: T_Engine = "numpy", -) -> DaskArray +) -> DaskArray: """ GroupBy reductions using tree reductions for dask.array From c209100ee8b411cd254afdbb64ee1e3ca65d81f6 Mon Sep 17 00:00:00 2001 From: Illviljan <14371165+Illviljan@users.noreply.github.com> Date: Mon, 10 Jul 2023 18:00:46 +0200 Subject: [PATCH 38/57] Update flox/core.py Co-authored-by: Deepak Cherian --- flox/core.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/flox/core.py b/flox/core.py index 816377b6a..df4d311d5 100644 --- a/flox/core.py +++ b/flox/core.py @@ -2074,12 +2074,8 @@ def chunk_cumulate( func: T_Funcs, expected_groups: pd.Index | None, axis: T_AxesOpt = None, - fill_value: T_FillValues = None, dtype: T_Dtypes = None, - reindex: bool = False, engine: T_Engine = "numpy", - kwargs: Sequence[dict] | None = None, - sort: bool = True, ) -> IntermediateDict: """ Wrapper for numpy_groupies aggregate that supports nD ``array`` and From 0b01fa0a785c57098b9664a19951990e2efe896b Mon Sep 17 00:00:00 2001 From: Illviljan <14371165+Illviljan@users.noreply.github.com> Date: Mon, 10 Jul 2023 20:32:34 +0200 Subject: [PATCH 39/57] Apply suggestions from code review Co-authored-by: Deepak Cherian --- flox/core.py | 13 ------------- 1 file changed, 13 deletions(-) diff --git a/flox/core.py b/flox/core.py index df4d311d5..90fabe099 100644 --- a/flox/core.py +++ b/flox/core.py @@ -2240,18 +2240,6 @@ def _cumulate_blockwise( Blockwise groupby reduction that produces the final result. This code path is also used for non-dask array aggregations. """ - # for pure numpy grouping, we just use npg directly and avoid "finalizing" - # (agg.finalize = None). We still need to do the reindexing step in finalize - # so that everything matches the dask version. - agg.finalize = None - - assert agg.finalize_kwargs is not None - if isinstance(agg.finalize_kwargs, Mapping): - finalize_kwargs_: tuple[dict[Any, Any], ...] = (agg.finalize_kwargs,) - else: - finalize_kwargs_ = agg.finalize_kwargs - finalize_kwargs_ += ({},) + ({},) - results = chunk_cumulate( array, by, @@ -2263,7 +2251,6 @@ def _cumulate_blockwise( # (see below) fill_value=agg.fill_value["numpy"], dtype=agg.dtype["numpy"], - kwargs=finalize_kwargs_, engine=engine, ) From 0deeb1183dde18fd280bfde6b7029d2a691be6fb Mon Sep 17 00:00:00 2001 From: Illviljan <14371165+Illviljan@users.noreply.github.com> Date: Mon, 10 Jul 2023 20:41:27 +0200 Subject: [PATCH 40/57] trim --- flox/core.py | 85 +++++++--------------------------------------- tests/test_core.py | 12 ------- 2 files changed, 13 insertions(+), 84 deletions(-) diff --git a/flox/core.py b/flox/core.py index 90fabe099..5f9f8d7a4 100644 --- a/flox/core.py +++ b/flox/core.py @@ -2118,18 +2118,6 @@ def chunk_cumulate( dtypes = (dtype,) * nfuncs assert len(dtypes) >= nfuncs - if isinstance(fill_value, Sequence): - fill_values = fill_value - else: - fill_values = (fill_value,) * nfuncs - assert len(fill_values) >= nfuncs - - if isinstance(kwargs, Sequence): - kwargss = kwargs - else: - kwargss = ({},) * nfuncs - assert len(kwargss) >= nfuncs - if isinstance(axis, Sequence): axes: T_Axes = axis nax = len(axes) @@ -2158,8 +2146,10 @@ def chunk_cumulate( # avoid by factorizing again so indices=[2,2,2] is changed to # indices=[0,0,0]. This is necessary when combining block results # factorize can handle strings etc unlike digitize + _reindex = False # TODO: Needed? + _sort = False # TODO: Needed? group_idx, grps, found_groups_shape, _, size, props = factorize_( - (by,), axes, expected_groups=(expected_groups,), reindex=reindex, sort=sort + (by,), axes, expected_groups=(expected_groups,), reindex=_reindex, sort=_sort ) groups = grps[0] @@ -2179,7 +2169,7 @@ def chunk_cumulate( empty = np.all(props.nanmask) results: IntermediateDict = {"groups": [], "intermediates": []} - if reindex and expected_groups is not None: + if _reindex and expected_groups is not None: # TODO: what happens with binning here? results["groups"] = expected_groups.to_numpy() else: @@ -2201,21 +2191,19 @@ def chunk_cumulate( # we commonly have func=(..., "nanlen", "nanlen") when # counts are needed for the final result as well as for masking # optimize that out. - for reduction, fv, kw, dt in zip(funcs, fill_values, kwargss, dtypes): - if empty: - result = np.full(shape=final_array_shape, fill_value=fv) + for aggregation, dt in zip(funcs, dtypes): + if empty: # TODO: Can ever be empty? + result = np.full(shape=final_array_shape) else: - # fill_value here is necessary when reducing with "offset" groups - kw_func = dict(size=size, dtype=dt, fill_value=fv) - kw_func.update(kw) + kw_func = dict(size=size, dtype=dt) - if callable(reduction): - # passing a custom reduction for npg to apply per-group is really slow! - # So this `reduction` has to do the groupby-aggregation - result = reduction(group_idx, array, **kw_func) + if callable(aggregation): + # passing a custom aggregation for npg to apply per-group is really slow! + # So this `aggregation` has to do the groupby-aggregation + result = aggregation(group_idx, array, **kw_func) else: result = generic_aggregate( - group_idx, array, axis=-1, engine=engine, func=reduction, **kw_func + group_idx, array, axis=-1, engine=engine, func=aggregation, **kw_func ).astype(dt, copy=False) if np.any(props.nanmask): # remove NaN group label which should be last @@ -2467,50 +2455,3 @@ def groupby_accumulate( groups = final_groups return result - - -# %% Aggregate -def groupby_aggregate( - array: np.ndarray | DaskArray, - *by: T_By, - func: T_Agg, - expected_groups: T_ExpectedGroupsOpt = None, - sort: bool = True, - isbin: T_IsBins = False, - axis: T_AxesOpt = None, - fill_value=None, - dtype: np.typing.DTypeLike = None, - min_count: int | None = None, - method: T_Method = "map-reduce", - engine: T_Engine = "numpy", - reindex: bool | None = None, - finalize_kwargs: dict[Any, Any] | None = None, -) -> DaskArray: - if _is_arg_cumulative(func): - return groupby_accumulate( - array, - *by, - func=func, - expected_groups=expected_groups, - isbin=isbin, - axis=axis, - dtype=dtype, - engine=engine, - ) - else: - return groupby_reduce( - array, - *by, - func=func, - expected_groups=expected_groups, - sort=sort, - isbin=isbin, - axis=axis, - fill_value=fill_value, - dtype=dtype, - min_count=min_count, - method=method, - engine=engine, - reindex=reindex, - finalize_kwargs=finalize_kwargs, - )[0] diff --git a/tests/test_core.py b/tests/test_core.py index f4692ee4c..ffeec8410 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -1480,15 +1480,3 @@ def test_cumulatives(func: T_Agg) -> None: actual = groupby_accumulate(a, group_idx, func=func, engine="numpy") np.testing.assert_allclose(expected, actual) - - -def test_groupby_aggregate() -> None: - import numpy_groupies as npg - - group_idx = np.array([4, 3, 3, 4, 4, 1, 1, 1, 7, 8, 7, 4, 3, 3, 1, 1]) - a = np.array([3, 4, 1, 3, 9, 9, 6, 7, 7, 0, 8, 2, 1, 8, 9, 8]) - expected = npg.aggregate(group_idx, a, func="cumsum") - - actual = groupby_aggregate(a, group_idx, func="cumsum", engine="numpy") - - np.testing.assert_allclose(expected, actual) From 1a373cb2359b6ff25f569b0cca26788005a86905 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Mon, 10 Jul 2023 18:42:21 +0000 Subject: [PATCH 41/57] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- tests/test_core.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/test_core.py b/tests/test_core.py index ffeec8410..e3088c77e 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -20,7 +20,6 @@ factorize_, find_group_cohorts, groupby_accumulate, - groupby_aggregate, groupby_reduce, rechunk_for_cohorts, reindex_, From 6b4a3aed667d2b2e176dff6412bcfb46b9b2c239 Mon Sep 17 00:00:00 2001 From: Illviljan <14371165+Illviljan@users.noreply.github.com> Date: Mon, 10 Jul 2023 20:44:24 +0200 Subject: [PATCH 42/57] consistent naming --- flox/__init__.py | 3 +-- tests/test_core.py | 4 ++-- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/flox/__init__.py b/flox/__init__.py index 3b69832f3..b8552ed92 100644 --- a/flox/__init__.py +++ b/flox/__init__.py @@ -4,8 +4,7 @@ from . import cache from .aggregations import Aggregation # noqa from .core import ( - groupby_accumulate, - groupby_aggregate, + groupby_cumulate, groupby_reduce, rechunk_for_blockwise, rechunk_for_cohorts, diff --git a/tests/test_core.py b/tests/test_core.py index ffeec8410..4d51379db 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -19,7 +19,7 @@ _validate_reindex, factorize_, find_group_cohorts, - groupby_accumulate, + groupby_cumulate, groupby_aggregate, groupby_reduce, rechunk_for_cohorts, @@ -1477,6 +1477,6 @@ def test_cumulatives(func: T_Agg) -> None: a = np.array([3, 4, 1, 3, 9, 9, 6, 7, 7, 0, 8, 2, 1, 8, 9, 8]) expected = npg.aggregate(group_idx, a, func=func) - actual = groupby_accumulate(a, group_idx, func=func, engine="numpy") + actual = groupby_cumulate(a, group_idx, func=func, engine="numpy") np.testing.assert_allclose(expected, actual) From 12ee35a33287e40eaeb9283cc0670d605338ee30 Mon Sep 17 00:00:00 2001 From: Illviljan <14371165+Illviljan@users.noreply.github.com> Date: Mon, 10 Jul 2023 20:47:17 +0200 Subject: [PATCH 43/57] Update core.py --- flox/core.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flox/core.py b/flox/core.py index 5f9f8d7a4..1495d5e66 100644 --- a/flox/core.py +++ b/flox/core.py @@ -2246,7 +2246,7 @@ def _cumulate_blockwise( return result -def groupby_accumulate( +def groupby_cumulate( array: np.ndarray | DaskArray, *by: T_By, func: T_Agg, From fac463489ac89909cd5d9fb4603aceb6642a4358 Mon Sep 17 00:00:00 2001 From: Illviljan <14371165+Illviljan@users.noreply.github.com> Date: Mon, 10 Jul 2023 20:49:21 +0200 Subject: [PATCH 44/57] Apply suggestions from code review --- flox/aggregations.py | 7 ------- 1 file changed, 7 deletions(-) diff --git a/flox/aggregations.py b/flox/aggregations.py index 34bb6ae6d..4c96c92d0 100644 --- a/flox/aggregations.py +++ b/flox/aggregations.py @@ -478,12 +478,6 @@ def _pick_second(*x): combine="sum", kind="cumulate", ) -cumprod_ = Aggregation( - "cumprod", - chunk=None, - combine="prod", - kind="cumulate", -) aggregations = { @@ -515,7 +509,6 @@ def _pick_second(*x): "nanlast": nanlast, # Cumulatives: "cumsum": cumsum_, - "cumprod": cumprod_, } From 62f659eb9f1b002d680463b20a73b8aa4a98492c Mon Sep 17 00:00:00 2001 From: Illviljan <14371165+Illviljan@users.noreply.github.com> Date: Mon, 10 Jul 2023 20:57:28 +0200 Subject: [PATCH 45/57] Update core.py --- flox/core.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/flox/core.py b/flox/core.py index 1495d5e66..f7b9b1db7 100644 --- a/flox/core.py +++ b/flox/core.py @@ -2234,10 +2234,6 @@ def _cumulate_blockwise( func=agg.numpy, axis=axis, expected_groups=expected_groups, - # This fill_value should only apply to groups that only contain NaN observations - # BUT there is funkiness when axis is a subset of all possible values - # (see below) - fill_value=agg.fill_value["numpy"], dtype=agg.dtype["numpy"], engine=engine, ) From 7637313cbc1d182594b9625338969277481b2b07 Mon Sep 17 00:00:00 2001 From: Illviljan <14371165+Illviljan@users.noreply.github.com> Date: Mon, 10 Jul 2023 21:04:05 +0200 Subject: [PATCH 46/57] Update core.py --- flox/core.py | 33 +++++++++++++-------------------- 1 file changed, 13 insertions(+), 20 deletions(-) diff --git a/flox/core.py b/flox/core.py index f7b9b1db7..f6139795d 100644 --- a/flox/core.py +++ b/flox/core.py @@ -2166,17 +2166,13 @@ def chunk_cumulate( group_idx = group_idx.reshape(-1) assert group_idx.ndim == 1 - empty = np.all(props.nanmask) results: IntermediateDict = {"groups": [], "intermediates": []} if _reindex and expected_groups is not None: # TODO: what happens with binning here? results["groups"] = expected_groups.to_numpy() else: - if empty: - results["groups"] = np.array([np.nan]) - else: - results["groups"] = groups + results["groups"] = groups # npg's argmax ensures that index of first "max" is returned assuming there # are many elements equal to the "max". Sorting messes this up totally. @@ -2192,22 +2188,19 @@ def chunk_cumulate( # counts are needed for the final result as well as for masking # optimize that out. for aggregation, dt in zip(funcs, dtypes): - if empty: # TODO: Can ever be empty? - result = np.full(shape=final_array_shape) - else: - kw_func = dict(size=size, dtype=dt) + kw_func = dict(size=size, dtype=dt) - if callable(aggregation): - # passing a custom aggregation for npg to apply per-group is really slow! - # So this `aggregation` has to do the groupby-aggregation - result = aggregation(group_idx, array, **kw_func) - else: - result = generic_aggregate( - group_idx, array, axis=-1, engine=engine, func=aggregation, **kw_func - ).astype(dt, copy=False) - if np.any(props.nanmask): - # remove NaN group label which should be last - result = result[..., :-1] + if callable(aggregation): + # passing a custom aggregation for npg to apply per-group is really slow! + # So this `aggregation` has to do the groupby-aggregation + result = aggregation(group_idx, array, **kw_func) + else: + result = generic_aggregate( + group_idx, array, axis=-1, engine=engine, func=aggregation, **kw_func + ).astype(dt, copy=False) + if np.any(props.nanmask): + # remove NaN group label which should be last + result = result[..., :-1] results["intermediates"].append(result) From ace3b7730fcf9212d71494786e18b03de57dbe65 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Mon, 10 Jul 2023 19:04:22 +0000 Subject: [PATCH 47/57] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- flox/core.py | 1 - 1 file changed, 1 deletion(-) diff --git a/flox/core.py b/flox/core.py index f6139795d..3480e87cb 100644 --- a/flox/core.py +++ b/flox/core.py @@ -2130,7 +2130,6 @@ def chunk_cumulate( assert by.ndim <= array.ndim - final_array_shape = array.shape final_groups_shape = (1,) * (nax - 1) if 1 < nax < by.ndim: From 91e68846e8a8e1b145d510eb5801a0157016f215 Mon Sep 17 00:00:00 2001 From: Illviljan <14371165+Illviljan@users.noreply.github.com> Date: Mon, 10 Jul 2023 21:26:59 +0200 Subject: [PATCH 48/57] Have to send a fill_value to npg --- flox/aggregations.py | 1 + flox/core.py | 4 +++- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/flox/aggregations.py b/flox/aggregations.py index 4c96c92d0..5c72c00de 100644 --- a/flox/aggregations.py +++ b/flox/aggregations.py @@ -476,6 +476,7 @@ def _pick_second(*x): "cumsum", chunk=None, combine="sum", + fill_value=0, kind="cumulate", ) diff --git a/flox/core.py b/flox/core.py index f6139795d..25026a000 100644 --- a/flox/core.py +++ b/flox/core.py @@ -2074,6 +2074,7 @@ def chunk_cumulate( func: T_Funcs, expected_groups: pd.Index | None, axis: T_AxesOpt = None, + fill_value=None, dtype: T_Dtypes = None, engine: T_Engine = "numpy", ) -> IntermediateDict: @@ -2188,7 +2189,7 @@ def chunk_cumulate( # counts are needed for the final result as well as for masking # optimize that out. for aggregation, dt in zip(funcs, dtypes): - kw_func = dict(size=size, dtype=dt) + kw_func = dict(size=size, dtype=dt, fill_value=fill_value) if callable(aggregation): # passing a custom aggregation for npg to apply per-group is really slow! @@ -2227,6 +2228,7 @@ def _cumulate_blockwise( func=agg.numpy, axis=axis, expected_groups=expected_groups, + fill_value=agg.fill_value["numpy"], dtype=agg.dtype["numpy"], engine=engine, ) From a1e6af7b2649906c3a09b8aeef3130df969bfd52 Mon Sep 17 00:00:00 2001 From: Illviljan <14371165+Illviljan@users.noreply.github.com> Date: Mon, 10 Jul 2023 21:39:13 +0200 Subject: [PATCH 49/57] Add asv benchmark --- asv_bench/benchmarks/cumulate.py | 68 ++++++++++++++++++++++++++++++++ 1 file changed, 68 insertions(+) create mode 100644 asv_bench/benchmarks/cumulate.py diff --git a/asv_bench/benchmarks/cumulate.py b/asv_bench/benchmarks/cumulate.py new file mode 100644 index 000000000..dd328ca32 --- /dev/null +++ b/asv_bench/benchmarks/cumulate.py @@ -0,0 +1,68 @@ +import numpy as np +import numpy_groupies as npg +import pandas as pd + +import flox + +from . import parameterized + +N = 1000 +funcs = ["cumsum"] +engines = ["numpy"] +expected_groups = [None, pd.IntervalIndex.from_breaks([1, 2, 4])] + + +class ChunkCumulate: + """Time the core reduction function.""" + + def setup(self, *args, **kwargs): + # pre-compile jitted funcs + if "numba" in engines: + for func in funcs: + npg.aggregate_numba.aggregate( + np.ones((100,), dtype=int), np.ones((100,), dtype=int), func=func + ) + raise NotImplementedError + + @parameterized("func, engine, expected_groups", [funcs, engines, expected_groups]) + def time_cumulate(self, func, engine, expected_groups): + flox.groupby_reduce( + self.array, + self.labels, + func=func, + engine=engine, + axis=self.axis, + expected_groups=expected_groups, + ) + + @parameterized("func, engine, expected_groups", [funcs, engines, expected_groups]) + def peakmem_cumulate(self, func, engine, expected_groups): + flox.groupby_reduce( + self.array, + self.labels, + func=func, + engine=engine, + axis=self.axis, + expected_groups=expected_groups, + ) + + +class ChunkCumulate1D(ChunkCumulate): + def setup(self, *args, **kwargs): + self.array = np.ones((N,)) + self.labels = np.repeat(np.arange(5), repeats=N // 5) + self.axis = -1 + + +class ChunkCumulate2D(ChunkCumulate): + def setup(self, *args, **kwargs): + self.array = np.ones((N, N)) + self.labels = np.repeat(np.arange(N // 5), repeats=5) + self.axis = -1 + + +class ChunkCumulate2DAllAxes(ChunkCumulate): + def setup(self, *args, **kwargs): + self.array = np.ones((N, N)) + self.labels = np.repeat(np.arange(N // 5), repeats=5) + self.axis = None From 940659886a8f39cd44d8f3b9a4fb23481b68a1ab Mon Sep 17 00:00:00 2001 From: Illviljan <14371165+Illviljan@users.noreply.github.com> Date: Mon, 10 Jul 2023 21:40:58 +0200 Subject: [PATCH 50/57] Update cumulate.py --- asv_bench/benchmarks/cumulate.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/asv_bench/benchmarks/cumulate.py b/asv_bench/benchmarks/cumulate.py index dd328ca32..1df78775c 100644 --- a/asv_bench/benchmarks/cumulate.py +++ b/asv_bench/benchmarks/cumulate.py @@ -26,7 +26,7 @@ def setup(self, *args, **kwargs): @parameterized("func, engine, expected_groups", [funcs, engines, expected_groups]) def time_cumulate(self, func, engine, expected_groups): - flox.groupby_reduce( + flox.groupby_cumulate( self.array, self.labels, func=func, @@ -37,7 +37,7 @@ def time_cumulate(self, func, engine, expected_groups): @parameterized("func, engine, expected_groups", [funcs, engines, expected_groups]) def peakmem_cumulate(self, func, engine, expected_groups): - flox.groupby_reduce( + flox.groupby_cumulate( self.array, self.labels, func=func, From 54d6ace0790ce087ff0ee7735740c0db954dd951 Mon Sep 17 00:00:00 2001 From: Illviljan <14371165+Illviljan@users.noreply.github.com> Date: Tue, 11 Jul 2023 16:06:37 +0200 Subject: [PATCH 51/57] Update core.py --- flox/core.py | 43 +++++++++---------------------------------- 1 file changed, 9 insertions(+), 34 deletions(-) diff --git a/flox/core.py b/flox/core.py index 46c231105..ee7f4f796 100644 --- a/flox/core.py +++ b/flox/core.py @@ -2082,25 +2082,25 @@ def chunk_cumulate( Wrapper for numpy_groupies aggregate that supports nD ``array`` and mD ``by``. - Core groupby reduction using numpy_groupies. Uses ``pandas.factorize`` to factorize + Core groupby xynykRUIBA using numpy_groupies. Uses ``pandas.factorize`` to factorize ``by``. Offsets the groups if not reducing along all dimensions of ``by``. Always ravels ``by`` to 1D, flattens appropriate dimensions of array. - When dask arrays are passed to groupby_reduce, this function is called on every + When dask arrays are passed to groupby_cumulate, this function is called on every block. Parameters ---------- array : numpy.ndarray - Array of values to reduced + Array of values to cumulated by : numpy.ndarray Array to group by. func : str or Callable or Sequence[str] or Sequence[Callable] - Name of reduction or function, passed to numpy_groupies. - Supports multiple reductions. + Name of cumulation or function, passed to numpy_groupies. + Supports multiple cumulations. axis : (optional) int or Sequence[int] - If None, reduce along all dimensions of array. - Else reduce along specified axes. + If None, cumulate along all dimensions of array. + Else cumulate along specified axes. Returns ------- @@ -2218,7 +2218,7 @@ def _cumulate_blockwise( engine: T_Engine, ) -> FinalResultsDict: """ - Blockwise groupby reduction that produces the final result. This code path is + Blockwise groupby cumulation that produces the final result. This code path is also used for non-dask array aggregations. """ results = chunk_cumulate( @@ -2247,7 +2247,7 @@ def groupby_cumulate( engine: T_Engine = "numpy", ) -> DaskArray: """ - GroupBy reductions using tree reductions for dask.array + GroupBy cumulations Parameters ---------- @@ -2287,8 +2287,6 @@ def groupby_cumulate( ------- result Aggregated result - *groups - Group labels See Also -------- @@ -2397,18 +2395,6 @@ def groupby_cumulate( # TODO: How else to narrow that array.chunks is there? assert isinstance(array, DaskArray) - # if agg.chunk[0] is None and method != "blockwise": - # raise NotImplementedError( - # f"Aggregation {agg.name!r} is only implemented for dask arrays when method='blockwise'." - # f"\n\n Received: {func}" - # ) - - # if method in ["blockwise", "cohorts"] and nax != by_.ndim: - # raise NotImplementedError( - # "Must reduce along all dimensions of `by` when method != 'map-reduce'." - # f"Received method={method!r}" - # ) - # TODO: just do this in dask_groupby_agg # we always need some fill_value (see above) so choose the default if needed if _fill_value is None: @@ -2416,9 +2402,6 @@ def groupby_cumulate( partial_agg = partial(dask_groupby_agg, axis=axis_, fill_value=_fill_value, engine=engine) - # if method == "blockwise" and by_.ndim == 1: - # array = rechunk_for_blockwise(array, axis=-1, labels=by_) - result, groups = partial_agg( array, by_, @@ -2429,14 +2412,6 @@ def groupby_cumulate( sort=_sort, ) - # if sort and method != "map-reduce": - # assert len(groups) == 1 - # sorted_idx = np.argsort(groups[0]) - # # This optimization helps specifically with resampling - # if not (sorted_idx[:-1] <= sorted_idx[1:]).all(): - # result = result[..., sorted_idx] - # groups = (groups[0][sorted_idx],) - if factorize_early: # nan group labels are factorized to -1, and preserved # now we get rid of them by reindexing From 4e0d93e308263e02245c39dda7da794310293599 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Sat, 8 Jun 2024 07:49:08 +0000 Subject: [PATCH 52/57] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- tests/test_core.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_core.py b/tests/test_core.py index d1ebbc9e6..62ee94aa4 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -1812,4 +1812,4 @@ def test_cumulatives(func: T_Agg) -> None: actual = groupby_cumulate(a, group_idx, func=func, engine="numpy") - np.testing.assert_allclose(expected, actual) \ No newline at end of file + np.testing.assert_allclose(expected, actual) From 9da1246859c0e23ba0bda815f07e1c546f4396aa Mon Sep 17 00:00:00 2001 From: Illviljan <14371165+Illviljan@users.noreply.github.com> Date: Sat, 8 Jun 2024 09:53:02 +0200 Subject: [PATCH 53/57] Update core.py --- flox/core.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flox/core.py b/flox/core.py index 02d029200..e905d39bd 100644 --- a/flox/core.py +++ b/flox/core.py @@ -1866,7 +1866,7 @@ def _reduction_func(a, by, axis, start_group, num_groups): def _groupby_func(a, by, axis, intermediate_dtype, num_groups): blockwise_method = partial( - _get_chunk_reduction(agg.reduction_type), + _get_chunk_aggregation(agg.reduction_type), func=agg.chunk, fill_value=agg.fill_value["intermediate"], dtype=agg.dtype["intermediate"], From c0e9642e79c58daf6d0be576e01db4a80601e3cf Mon Sep 17 00:00:00 2001 From: Illviljan <14371165+Illviljan@users.noreply.github.com> Date: Sat, 8 Jun 2024 10:03:52 +0200 Subject: [PATCH 54/57] Update core.py --- flox/core.py | 23 +++++++++++------------ 1 file changed, 11 insertions(+), 12 deletions(-) diff --git a/flox/core.py b/flox/core.py index e905d39bd..00448390a 100644 --- a/flox/core.py +++ b/flox/core.py @@ -2891,28 +2891,27 @@ def groupby_cumulate( # (pd.IntervalIndex or not) expected_groups = _convert_expected_groups_to_index(expected_groups, isbins, _sort) - # Don't factorize "early only when + # Don't factorize early only when # grouping by dask arrays, and not having expected_groups factorize_early = not ( # can't do it if we are grouping by dask array but don't have expected_groups any(is_dask and ex_ is None for is_dask, ex_ in zip(by_is_dask, expected_groups)) ) + expected_: pd.RangeIndex | None if factorize_early: bys, final_groups, grp_shape = _factorize_multiple( bys, expected_groups, any_by_dask=any_by_dask, - # This is the only way it makes sense I think. - # reindex controls what's actually allocated in chunk_reduce - # At this point, we care about an accurate conversion to codes. - reindex=True, sort=_sort, ) - expected_groups = (pd.RangeIndex(math.prod(grp_shape)),) + expected_ = pd.RangeIndex(math.prod(grp_shape)) + else: + assert expected_groups == (None,) + expected_ = None assert len(bys) == 1 (by_,) = bys - (expected_groups,) = expected_groups if axis is None: axis_ = tuple(array.ndim + np.arange(-by_.ndim, 0)) @@ -2924,9 +2923,9 @@ def groupby_cumulate( has_dask = is_duck_dask_array(array) or is_duck_dask_array(by_) # TODO: make sure expected_groups is unique - if nax == 1 and by_.ndim > 1 and expected_groups is None: + if nax == 1 and by_.ndim > 1 and expected_ is None: if not any_by_dask: - expected_groups = _get_expected_groups(by_, _sort) + expected_ = _get_expected_groups(by_, _sort) else: # When we reduce along all axes, we are guaranteed to see all # groups in the final combine stage, so everything works. @@ -2951,7 +2950,7 @@ def groupby_cumulate( groups: tuple[np.ndarray | DaskArray, ...] if not has_dask: results = _cumulate_blockwise( - array, by_, agg, axis=axis_, expected_groups=expected_groups, engine=engine + array, by_, agg, axis=axis_, expected_groups=expected_, engine=engine ) groups = (results["groups"],) result = results[agg.name] @@ -2971,7 +2970,7 @@ def groupby_cumulate( result, groups = partial_agg( array, by_, - expected_groups=expected_groups, + expected_groups=expected_, agg=agg, reindex=False, method="", # TODO: ? @@ -2982,7 +2981,7 @@ def groupby_cumulate( # nan group labels are factorized to -1, and preserved # now we get rid of them by reindexing # This also handles bins with no data - result = reindex_(result, from_=groups[0], to=expected_groups, fill_value=_fill_value) + result = reindex_(result, from_=groups[0], to=expected_, fill_value=_fill_value) groups = final_groups return result From afb390bd2a6905c6f9ffebf56211be6548f3aaac Mon Sep 17 00:00:00 2001 From: Illviljan <14371165+Illviljan@users.noreply.github.com> Date: Sat, 8 Jun 2024 10:24:19 +0200 Subject: [PATCH 55/57] Update core.py --- flox/core.py | 164 +++++++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 140 insertions(+), 24 deletions(-) diff --git a/flox/core.py b/flox/core.py index 00448390a..19e0bb638 100644 --- a/flox/core.py +++ b/flox/core.py @@ -2917,27 +2917,37 @@ def groupby_cumulate( axis_ = tuple(array.ndim + np.arange(-by_.ndim, 0)) else: # TODO: How come this function doesn't exist according to mypy? - axis_ = np.core.numeric.normalize_axis_tuple(axis, array.ndim) # type: ignore + axis_ = normalize_axis_tuple(axis, array.ndim) nax = len(axis_) has_dask = is_duck_dask_array(array) or is_duck_dask_array(by_) + has_cubed = is_duck_cubed_array(array) or is_duck_cubed_array(by_) - # TODO: make sure expected_groups is unique - if nax == 1 and by_.ndim > 1 and expected_ is None: - if not any_by_dask: - expected_ = _get_expected_groups(by_, _sort) - else: - # When we reduce along all axes, we are guaranteed to see all - # groups in the final combine stage, so everything works. - # This is not necessarily true when reducing along a subset of axes - # (of by) - # TODO: Does this depend on chunking of by? - # For e.g., we could relax this if there is only one chunk along all - # by dim != axis? - raise NotImplementedError( - "Please provide ``expected_groups`` when not reducing along all axes." + if _is_first_last_reduction(func): + if has_dask and nax != 1: + raise ValueError( + "For dask arrays: first, last, nanfirst, nanlast reductions are " + "only supported along a single axis. Please reshape appropriately." ) + elif nax not in [1, by_.ndim]: + raise ValueError( + "first, last, nanfirst, nanlast reductions are only supported " + "along a single axis or when reducing across all dimensions of `by`." + ) + + if nax == 1 and by_.ndim > 1 and expected_ is None: + # When we reduce along all axes, we are guaranteed to see all + # groups in the final combine stage, so everything works. + # This is not necessarily true when reducing along a subset of axes + # (of by) + # TODO: Does this depend on chunking of by? + # For e.g., we could relax this if there is only one chunk along all + # by dim != axis? + raise NotImplementedError( + "Please provide ``expected_groups`` when not reducing along all axes." + ) + assert nax <= by_.ndim if nax < by_.ndim: by_ = _move_reduce_dims_to_end(by_, tuple(-array.ndim + ax + by_.ndim for ax in axis_)) @@ -2945,12 +2955,61 @@ def groupby_cumulate( axis_ = tuple(array.ndim + np.arange(-nax, 0)) nax = len(axis_) + # When axis is a subset of possible values; then npg will + # apply the fill_value to groups that don't exist along a particular axis (for e.g.) + # since these count as a group that is absent. thoo! + # fill_value applies to all-NaN groups as well as labels in expected_groups that are not found. + # The only way to do this consistently is mask out using min_count + # Consider np.sum([np.nan]) = np.nan, np.nansum([np.nan]) = 0 + if min_count is None: + if nax < by_.ndim or (fill_value is not None and provided_expected): + min_count_: int = 1 + else: + min_count_ = 0 + else: + min_count_ = min_count + + # TODO: set in xarray? + if min_count_ > 0 and func in ["nansum", "nanprod"] and fill_value is None: + # nansum, nanprod have fill_value=0, 1 + # overwrite than when min_count is set + fill_value = np.nan + + kwargs = dict(axis=axis_, fill_value=fill_value) agg = _initialize_aggregation(func, dtype, array.dtype, _fill_value, 0, None) - groups: tuple[np.ndarray | DaskArray, ...] - if not has_dask: + # Need to set this early using `agg` + # It cannot be done in the core loop of chunk_reduce + # since we "prepare" the data for flox. + kwargs["engine"] = _choose_engine(by_, agg) if engine is None else engine + + groups: tuple[np.ndarray | DaskArray, ...] + if has_cubed: + if method is None: + method = "map-reduce" + + if method not in ("map-reduce", "blockwise"): + raise NotImplementedError( + "Reduction for Cubed arrays is only implemented for methods 'map-reduce' and 'blockwise'." + ) + + partial_agg = partial(cubed_groupby_agg, **kwargs) + + result, groups = partial_agg( + array, + by_, + expected_groups=expected_, + agg=agg, + reindex=reindex, + method=method, + sort=_sort, + ) + + return (result, groups) + + elif not has_dask: results = _cumulate_blockwise( - array, by_, agg, axis=axis_, expected_groups=expected_, engine=engine + array, by_, agg, expected_groups=expected_, reindex=reindex, sort=_sort, **kwargs ) groups = (results["groups"],) result = results[agg.name] @@ -2960,23 +3019,78 @@ def groupby_cumulate( # TODO: How else to narrow that array.chunks is there? assert isinstance(array, DaskArray) + if (not any_by_dask and method is None) or method == "cohorts": + preferred_method, chunks_cohorts = find_group_cohorts( + by_, + [array.chunks[ax] for ax in range(-by_.ndim, 0)], + expected_groups=expected_, + # when provided with cohorts, we *always* 'merge' + merge=(method == "cohorts"), + ) + else: + preferred_method = "map-reduce" + chunks_cohorts = {} + + method = _choose_method(method, preferred_method, agg, by_, nax) + + if agg.chunk[0] is None and method != "blockwise": + raise NotImplementedError( + f"Aggregation {agg.name!r} is only implemented for dask arrays when method='blockwise'." + f"Received method={method!r}" + ) + + if ( + _is_arg_reduction(agg) + and method == "blockwise" + and not all(nchunks == 1 for nchunks in array.numblocks[-nax:]) + ): + raise NotImplementedError( + "arg-reductions are not supported with method='blockwise', use 'cohorts' instead." + ) + + if nax != by_.ndim and method in ["blockwise", "cohorts"]: + raise NotImplementedError( + "Must reduce along all dimensions of `by` when method != 'map-reduce'." + f"Received method={method!r}" + ) + + # TODO: clean this up + reindex = _validate_reindex( + reindex, func, method, expected_, any_by_dask, is_duck_dask_array(array) + ) + + if TYPE_CHECKING: + assert method is not None + # TODO: just do this in dask_groupby_agg # we always need some fill_value (see above) so choose the default if needed - if _fill_value is None: - _fill_value = agg.fill_value[agg.name] + if kwargs["fill_value"] is None: + kwargs["fill_value"] = agg.fill_value[agg.name] - partial_agg = partial(dask_groupby_agg, axis=axis_, fill_value=_fill_value, engine=engine) + partial_agg = partial(dask_groupby_agg, **kwargs) + + if method == "blockwise" and by_.ndim == 1: + array = rechunk_for_blockwise(array, axis=-1, labels=by_) result, groups = partial_agg( array, by_, expected_groups=expected_, agg=agg, - reindex=False, - method="", # TODO: ? + reindex=reindex, + method=method, + chunks_cohorts=chunks_cohorts, sort=_sort, ) + if _sort and method != "map-reduce": + assert len(groups) == 1 + sorted_idx = np.argsort(groups[0]) + # This optimization helps specifically with resampling + if not _issorted(sorted_idx): + result = result[..., sorted_idx] + groups = (groups[0][sorted_idx],) + if factorize_early: # nan group labels are factorized to -1, and preserved # now we get rid of them by reindexing @@ -2984,4 +3098,6 @@ def groupby_cumulate( result = reindex_(result, from_=groups[0], to=expected_, fill_value=_fill_value) groups = final_groups - return result + if is_bool_array and (_is_minmax_reduction(func) or _is_first_last_reduction(func)): + result = result.astype(bool) + return (result, *groups) \ No newline at end of file From 33d24036266e466a68148eb9106020d48438701c Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Sat, 8 Jun 2024 08:24:35 +0000 Subject: [PATCH 56/57] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- flox/core.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/flox/core.py b/flox/core.py index 19e0bb638..68a042c72 100644 --- a/flox/core.py +++ b/flox/core.py @@ -2935,7 +2935,7 @@ def groupby_cumulate( "first, last, nanfirst, nanlast reductions are only supported " "along a single axis or when reducing across all dimensions of `by`." ) - + if nax == 1 and by_.ndim > 1 and expected_ is None: # When we reduce along all axes, we are guaranteed to see all # groups in the final combine stage, so everything works. @@ -3100,4 +3100,4 @@ def groupby_cumulate( if is_bool_array and (_is_minmax_reduction(func) or _is_first_last_reduction(func)): result = result.astype(bool) - return (result, *groups) \ No newline at end of file + return (result, *groups) From c44d506931b5c4268edce2b66d4986743c43f431 Mon Sep 17 00:00:00 2001 From: Illviljan <14371165+Illviljan@users.noreply.github.com> Date: Sat, 8 Jun 2024 10:27:01 +0200 Subject: [PATCH 57/57] Update core.py --- flox/core.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flox/core.py b/flox/core.py index 19e0bb638..fb44c6ba4 100644 --- a/flox/core.py +++ b/flox/core.py @@ -2983,7 +2983,7 @@ def groupby_cumulate( # since we "prepare" the data for flox. kwargs["engine"] = _choose_engine(by_, agg) if engine is None else engine - groups: tuple[np.ndarray | DaskArray, ...] + groups: tuple[np.ndarray | DaskArray, ...] if has_cubed: if method is None: method = "map-reduce"