Expand property tests #373

Merged · 7 commits · Jul 28, 2024
2 changes: 1 addition & 1 deletion .github/workflows/ci.yaml
@@ -62,7 +62,7 @@ jobs:
       - name: Run Tests
         id: status
         run: |
-          pytest -n auto --cov=./ --cov-report=xml --hypothesis-profile ci
+          pytest --durations=20 --durations-min=0.5 -n auto --cov=./ --cov-report=xml --hypothesis-profile ci
       - name: Upload code coverage to Codecov
         uses: codecov/[email protected]
         with:
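
The new `--durations=20 --durations-min=0.5` flags make pytest print the 20 slowest tests taking at least 0.5 s, which helps keep an eye on the runtime cost of the expanded property tests.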
2 changes: 1 addition & 1 deletion flox/aggregate_flox.py
@@ -239,7 +239,7 @@ def ffill(group_idx, array, *, axis, **kwargs):
     (group_starts,) = flag.nonzero()

     # https://stackoverflow.com/questions/41190852/most-efficient-way-to-forward-fill-nan-values-in-numpy-array
-    mask = np.isnan(array)
+    mask = isnull(array)
     # modified from the SO answer, just reset the index at the start of every group!
     mask[..., np.asarray(group_starts)] = False
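
Switching from `np.isnan` to a generic `isnull` lets `ffill` handle non-float data: `np.isnan` raises `TypeError` on string and object arrays. A minimal sketch of what a dtype-aware mask looks like (flox's actual helper is more thorough; treat the details below as an assumption):

```python
import numpy as np


def isnull(data: np.ndarray) -> np.ndarray:
    # Sketch of a dtype-aware missing-value mask; not flox's exact implementation.
    kind = data.dtype.kind
    if kind in "fc":
        return np.isnan(data)  # floats/complex: NaN marks missing
    elif kind in "mM":
        return np.isnat(data)  # timedelta64/datetime64: NaT marks missing
    elif kind in "SU":
        return np.zeros(data.shape, dtype=bool)  # fixed-width strings have no missing sentinel
    else:
        # object arrays: NaN != NaN, everything else compares equal to itself
        return data != data
```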
29 changes: 19 additions & 10 deletions flox/aggregations.py
@@ -69,11 +69,14 @@ def generic_aggregate(
     if func == "identity":
         return array

+    if func in ["nanfirst", "nanlast"] and array.dtype.kind in "US":
+        func = func[3:]
+
     if engine == "flox":
         try:
             method = getattr(aggregate_flox, func)
         except AttributeError:
-            logger.debug(f"Couldn't find {func} for engine='flox'. Falling back to numpy")
+            # logger.debug(f"Couldn't find {func} for engine='flox'. Falling back to numpy")
             method = get_npg_aggregation(func, engine="numpy")

     elif engine == "numbagg":
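
For string dtypes, `nanfirst`/`nanlast` are rewritten to `first`/`last` by stripping the `"nan"` prefix (`func[3:]`): fixed-width string arrays (`dtype.kind in "US"`) cannot contain NaN, so the skip-missing and plain variants are equivalent.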
@@ -91,7 +94,7 @@ def generic_aggregate(
             method = getattr(aggregate_numbagg, func)

         except AttributeError:
-            logger.debug(f"Couldn't find {func} for engine='numbagg'. Falling back to numpy")
+            # logger.debug(f"Couldn't find {func} for engine='numbagg'. Falling back to numpy")
             method = get_npg_aggregation(func, engine="numpy")

     elif engine in ["numpy", "numba"]:
@@ -144,6 +147,8 @@ def _maybe_promote_int(dtype) -> np.dtype:

 def _get_fill_value(dtype, fill_value):
     """Returns dtype appropriate infinity. Returns +Inf equivalent for None."""
+    if fill_value in [None, dtypes.NA] and dtype.kind in "US":
+        return ""
     if fill_value == dtypes.INF or fill_value is None:
         return dtypes.get_pos_infinity(dtype, max_for_int=True)
     if fill_value == dtypes.NINF:
@@ -153,7 +158,11 @@ def _get_fill_value(dtype, fill_value):
         return np.nan
     # This is madness, but npg checks that fill_value is compatible
     # with array dtype even if the fill_value is never used.
-    elif np.issubdtype(dtype, np.integer):
+    elif (
+        np.issubdtype(dtype, np.integer)
+        or np.issubdtype(dtype, np.timedelta64)
+        or np.issubdtype(dtype, np.datetime64)
+    ):
         return dtypes.get_neg_infinity(dtype, min_for_int=True)
     else:
         return None
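
The comment is worth unpacking: numpy_groupies (npg) validates that `fill_value` can be cast to the array dtype even when every group is non-empty and the value is never written. For integer, datetime64, and timedelta64 arrays, where NaN is not representable, a large negative sentinel from `get_neg_infinity` is therefore passed instead of `np.nan`.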
@@ -516,10 +525,10 @@ def _pick_second(*x):
     final_dtype=np.intp,
 )

-first = Aggregation("first", chunk=None, combine=None, fill_value=0)
-last = Aggregation("last", chunk=None, combine=None, fill_value=0)
-nanfirst = Aggregation("nanfirst", chunk="nanfirst", combine="nanfirst", fill_value=np.nan)
-nanlast = Aggregation("nanlast", chunk="nanlast", combine="nanlast", fill_value=np.nan)
+first = Aggregation("first", chunk=None, combine=None, fill_value=None)
+last = Aggregation("last", chunk=None, combine=None, fill_value=None)
+nanfirst = Aggregation("nanfirst", chunk="nanfirst", combine="nanfirst", fill_value=dtypes.NA)
+nanlast = Aggregation("nanlast", chunk="nanlast", combine="nanlast", fill_value=dtypes.NA)

 all_ = Aggregation(
     "all",
@@ -645,8 +654,8 @@ def __post_init__(self):


 def reverse(a: AlignedArrays) -> AlignedArrays:
-    a.group_idx = a.group_idx[::-1]
-    a.array = a.array[::-1]
+    a.group_idx = a.group_idx[..., ::-1]
+    a.array = a.array[..., ::-1]
     return a
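
The switch to `[..., ::-1]` matters for multidimensional input: `a[::-1]` flips the *first* axis, while scans operate along the *last* axis. A quick illustration:

```python
import numpy as np

x = np.arange(6).reshape(2, 3)
x[::-1]        # array([[3, 4, 5], [0, 1, 2]]) -- rows swapped, wrong axis
x[..., ::-1]   # array([[2, 1, 0], [5, 4, 3]]) -- each row reversed, as a
               # scan along axis=-1 expects
```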


@@ -808,7 +817,7 @@ def _initialize_aggregation(
     )

     final_dtype = _normalize_dtype(dtype_ or agg.dtype_init["final"], array_dtype, fill_value)
-    if agg.name not in ["min", "max", "nanmin", "nanmax"]:
+    if agg.name not in ["first", "last", "nanfirst", "nanlast", "min", "max", "nanmin", "nanmax"]:
         final_dtype = _maybe_promote_int(final_dtype)
     agg.dtype = {
         "user": dtype,  # Save to automatically choose an engine
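
`first`/`last` and their nan-variants join min/max in skipping `_maybe_promote_int`: like min/max, they return an element of the input unchanged, so there is no accumulation that could overflow a small integer dtype and the input dtype can be preserved.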
37 changes: 21 additions & 16 deletions flox/core.py
@@ -2011,7 +2011,7 @@ def _validate_reindex(
             reindex = True

     assert isinstance(reindex, bool)
-    logger.debug("Leaving _validate_reindex: reindex is {}".format(reindex))  # noqa
+    # logger.debug("Leaving _validate_reindex: reindex is {}".format(reindex))  # noqa

     return reindex

@@ -2788,20 +2788,29 @@ def groupby_scan(
     if by_.shape[-1] == 1 or by_.shape == grp_shape:
         return array.astype(agg.dtype)

+    # Made a design choice here to have `preprocess` handle both array and group_idx
+    # Example: for reversing, we need to reverse the whole array, not just reverse
+    # each block independently
+    inp = AlignedArrays(array=array, group_idx=by_)
+    if agg.preprocess:
+        inp = agg.preprocess(inp)
+
     if not has_dask:
-        final_state = chunk_scan(
-            AlignedArrays(array=array, group_idx=by_), axis=single_axis, agg=agg, dtype=agg.dtype
-        )
-        return extract_array(final_state)
+        final_state = chunk_scan(inp, axis=single_axis, agg=agg, dtype=agg.dtype)
+        result = _finalize_scan(final_state)
     else:
-        return dask_groupby_scan(array, by_, axes=axis_, agg=agg)
+        result = dask_groupby_scan(inp.array, inp.group_idx, axes=axis_, agg=agg)
+
+    # Made a design choice here to have `postprocess` handle both array and group_idx
+    out = AlignedArrays(array=result, group_idx=by_)
+    if agg.finalize:
+        out = agg.finalize(out)
+    return out.array


 def chunk_scan(inp: AlignedArrays, *, axis: int, agg: Scan, dtype=None, keepdims=None) -> ScanState:
     assert axis == inp.array.ndim - 1

-    if agg.preprocess:
-        inp = agg.preprocess(inp)
     # I don't think we need to re-factorize here unless we are grouping by a dask array
     accumulated = generic_aggregate(
         inp.group_idx,
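
The design choice in the comments is easiest to see with a backward fill: bfill is an ffill on the reversed array, and the reversal must happen on the *whole* array before dask splits it into blocks, then be undone afterwards. Hoisting `preprocess`/`finalize` out of `chunk_scan` (and out of the per-block `_zip` below) enables exactly that, and the `reverse` helper edited above suggests this pattern. A minimal runnable sketch of the idea, with a toy `ffill` standing in for flox's real one:

```python
import numpy as np


def ffill(group_idx: np.ndarray, array: np.ndarray) -> np.ndarray:
    # Toy forward fill along the last axis that resets at group boundaries.
    out = array.astype(float, copy=True)
    for i in range(1, out.shape[-1]):
        same_group = group_idx[..., i] == group_idx[..., i - 1]
        missing = np.isnan(out[..., i])
        out[..., i] = np.where(same_group & missing, out[..., i - 1], out[..., i])
    return out


def bfill(group_idx: np.ndarray, array: np.ndarray) -> np.ndarray:
    # "preprocess": reverse array *and* group_idx together along the scan axis;
    # "finalize": undo the reversal. A backward fill is a forward fill in reverse.
    return ffill(group_idx[..., ::-1], array[..., ::-1])[..., ::-1]


group_idx = np.array([0, 0, 0, 1, 1])
array = np.array([np.nan, 1.0, np.nan, np.nan, 2.0])
bfill(group_idx, array)  # -> array([ 1.,  1., nan,  2.,  2.])
```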
@@ -2813,8 +2822,6 @@ def chunk_scan(inp: AlignedArrays, *, axis: int, agg: Scan, dtype=None, keepdims=None) -> ScanState:
         fill_value=agg.identity,
     )
     result = AlignedArrays(array=accumulated, group_idx=inp.group_idx)
-    if agg.finalize:
-        result = agg.finalize(result)
     return ScanState(result=result, state=None)

@@ -2840,10 +2847,9 @@ def _zip(group_idx: np.ndarray, array: np.ndarray) -> AlignedArrays:
     return AlignedArrays(group_idx=group_idx, array=array)


-def extract_array(block: ScanState, finalize: Callable | None = None) -> np.ndarray:
+def _finalize_scan(block: ScanState) -> np.ndarray:
     assert block.result is not None
-    result = finalize(block.result) if finalize is not None else block.result
-    return result.array
+    return block.result.array


 def dask_groupby_scan(array, by, axes: T_Axes, agg: Scan) -> DaskArray:
@@ -2859,9 +2865,8 @@ def dask_groupby_scan(array, by, axes: T_Axes, agg: Scan) -> DaskArray:
     array, by = _unify_chunks(array, by)

     # 1. zip together group indices & array
-    to_map = _zip if agg.preprocess is None else tlz.compose(agg.preprocess, _zip)
     zipped = map_blocks(
-        to_map, by, array, dtype=array.dtype, meta=array._meta, name="groupby-scan-preprocess"
+        _zip, by, array, dtype=array.dtype, meta=array._meta, name="groupby-scan-preprocess"
     )

     scan_ = partial(chunk_scan, agg=agg)
@@ -2882,7 +2887,7 @@ def dask_groupby_scan(array, by, axes: T_Axes, agg: Scan) -> DaskArray:
     )

     # 3. Unzip and extract the final result array, discard groups
-    result = map_blocks(extract_array, accumulated, dtype=agg.dtype, finalize=agg.finalize)
+    result = map_blocks(_finalize_scan, accumulated, dtype=agg.dtype)

     assert result.chunks == array.chunks

6 changes: 4 additions & 2 deletions flox/xrdtypes.py
@@ -100,6 +100,7 @@ def get_pos_infinity(dtype, max_for_int=False):

     if issubclass(dtype.type, np.integer):
         if max_for_int:
+            dtype = np.int64 if dtype.kind in "Mm" else dtype
             return np.iinfo(dtype).max
         else:
             return np.inf
@@ -124,8 +125,9 @@ def get_neg_infinity(dtype, min_for_int=False):
     fill_value : positive infinity value corresponding to this dtype.
     """

-    if np.issubdtype(dtype, (np.timedelta64, np.datetime64)):
-        return dtype.type(np.iinfo(np.int64).min + 1)
+    if is_datetime_like(dtype):
+        unit, _ = np.datetime_data(dtype)
+        return dtype.type(np.iinfo(np.int64).min + 1, unit)

     if issubclass(dtype.type, np.floating):
         return -np.inf
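
Two fixes land here for datetime-like dtypes: `np.iinfo` cannot be called on `datetime64`/`timedelta64` dtypes, so `get_pos_infinity` swaps in `int64` first, and constructing a `datetime64` from a raw integer needs an explicit unit, so `get_neg_infinity` now threads the dtype's unit through. For example:

```python
import numpy as np

dtype = np.dtype("M8[ns]")
unit, _ = np.datetime_data(dtype)  # ('ns', 1)
sentinel = dtype.type(np.iinfo(np.int64).min + 1, unit)
# numpy.datetime64('1677-09-21T00:12:43.145224193') -- one tick above NaT,
# since NaT itself is represented internally by np.iinfo(np.int64).min
```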
117 changes: 107 additions & 10 deletions tests/__init__.py
@@ -101,25 +101,34 @@ def assert_equal(a, b, tolerance=None):
     else:
         tolerance = {}

-    if has_dask and isinstance(a, dask_array_type) or isinstance(b, dask_array_type):
-        # sometimes it's nice to see values and shapes
-        # rather than being dropped into some file in dask
-        np.testing.assert_allclose(a, b, **tolerance)
-        # does some validation of the dask graph
-        da.utils.assert_eq(a, b, equal_nan=True)
+    # Always run the numpy comparison first, so that we get nice error messages with dask.
+    # sometimes it's nice to see values and shapes
+    # rather than being dropped into some file in dask
+    if a.dtype != b.dtype:
+        raise AssertionError(f"a and b have different dtypes: (a: {a.dtype}, b: {b.dtype})")
+
+    if has_dask:
+        a_eager = a.compute() if isinstance(a, dask_array_type) else a
+        b_eager = b.compute() if isinstance(b, dask_array_type) else b
     else:
-        if a.dtype != b.dtype:
-            raise AssertionError(f"a and b have different dtypes: (a: {a.dtype}, b: {b.dtype})")
+        a_eager, b_eager = a, b

-        np.testing.assert_allclose(a, b, equal_nan=True, **tolerance)
+    if a.dtype.kind in "SUMmO":
+        np.testing.assert_equal(a_eager, b_eager)
+    else:
+        np.testing.assert_allclose(a_eager, b_eager, equal_nan=True, **tolerance)
+
+    if has_dask and isinstance(a, dask_array_type) or isinstance(b, dask_array_type):
+        # does some validation of the dask graph
+        dask_assert_eq(a, b, equal_nan=True)


 def assert_equal_tuple(a, b):
     """assert_equal for .blocks indexing tuples"""
     assert len(a) == len(b)

     for a_, b_ in zip(a, b):
-        assert type(a_) == type(b_)
+        assert type(a_) is type(b_)
         if isinstance(a_, np.ndarray):
             np.testing.assert_array_equal(a_, b_)
         else:
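
Bytes, str, datetime, timedelta, and object arrays (`dtype.kind in "SUMmO"`) go through `np.testing.assert_equal`, because `assert_allclose` computes numeric differences and cannot handle non-numeric dtypes.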
@@ -156,3 +165,91 @@ def assert_equal_tuple(a, b):
     "quantile",
     "nanquantile",
 ) + tuple(SCIPY_STATS_FUNCS)
+
+
+def dask_assert_eq(
+    a,
+    b,
+    check_shape=True,
+    check_graph=True,
+    check_meta=True,
+    check_chunks=True,
+    check_ndim=True,
+    check_type=True,
+    check_dtype=True,
+    equal_nan=True,
+    scheduler="sync",
+    **kwargs,
+):
+    """dask.array.utils.assert_eq modified to skip value checks. Their code is buggy for some dtypes.
+    We just check values through numpy and care about validating the graph in this function."""
+    from dask.array.utils import _get_dt_meta_computed
+
+    a_original = a
+    b_original = b
+
+    if isinstance(a, (list, int, float)):
+        a = np.array(a)
+    if isinstance(b, (list, int, float)):
+        b = np.array(b)
+
+    a, adt, a_meta, a_computed = _get_dt_meta_computed(
+        a,
+        check_shape=check_shape,
+        check_graph=check_graph,
+        check_chunks=check_chunks,
+        check_ndim=check_ndim,
+        scheduler=scheduler,
+    )
+    b, bdt, b_meta, b_computed = _get_dt_meta_computed(
+        b,
+        check_shape=check_shape,
+        check_graph=check_graph,
+        check_chunks=check_chunks,
+        check_ndim=check_ndim,
+        scheduler=scheduler,
+    )
+
+    if check_type:
+        _a = a if a.shape else a.item()
+        _b = b if b.shape else b.item()
+        assert type(_a) is type(_b), f"a and b have different types (a: {type(_a)}, b: {type(_b)})"
+    if check_meta:
+        if hasattr(a, "_meta") and hasattr(b, "_meta"):
+            dask_assert_eq(a._meta, b._meta)
+            if hasattr(a_original, "_meta"):
+                msg = (
+                    f"compute()-ing 'a' changes its number of dimensions "
+                    f"(before: {a_original._meta.ndim}, after: {a.ndim})"
+                )
+                assert a_original._meta.ndim == a.ndim, msg
+                if a_meta is not None:
+                    msg = (
+                        f"compute()-ing 'a' changes its type "
+                        f"(before: {type(a_original._meta)}, after: {type(a_meta)})"
+                    )
+                    assert type(a_original._meta) is type(a_meta), msg
+                    if not (np.isscalar(a_meta) or np.isscalar(a_computed)):
+                        msg = (
+                            f"compute()-ing 'a' results in a different type than implied by its metadata "
+                            f"(meta: {type(a_meta)}, computed: {type(a_computed)})"
+                        )
+                        assert type(a_meta) is type(a_computed), msg
+            if hasattr(b_original, "_meta"):
+                msg = (
+                    f"compute()-ing 'b' changes its number of dimensions "
+                    f"(before: {b_original._meta.ndim}, after: {b.ndim})"
+                )
+                assert b_original._meta.ndim == b.ndim, msg
+                if b_meta is not None:
+                    msg = (
+                        f"compute()-ing 'b' changes its type "
+                        f"(before: {type(b_original._meta)}, after: {type(b_meta)})"
+                    )
+                    assert type(b_original._meta) is type(b_meta), msg
+                    if not (np.isscalar(b_meta) or np.isscalar(b_computed)):
+                        msg = (
+                            f"compute()-ing 'b' results in a different type than implied by its metadata "
+                            f"(meta: {type(b_meta)}, computed: {type(b_computed)})"
+                        )
+                        assert type(b_meta) is type(b_computed), msg
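
Since `assert_equal` has already compared values through numpy, `dask_assert_eq` deliberately stops after the graph, type, and metadata checks; per its docstring, the value comparison inside `dask.array.utils.assert_eq` is what misbehaves for some dtypes.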
12 changes: 12 additions & 0 deletions tests/conftest.py
@@ -28,3 +28,15 @@
 )
 def engine(request):
     return request.param
+
+
+@pytest.fixture(
+    scope="module",
+    params=[
+        "flox",
+        "numpy",
+        pytest.param("numbagg", marks=requires_numbagg),
+    ],
+)
+def engine_no_numba(request):
+    return request.param
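
Tests that are too slow or unsupported under numba can request `engine_no_numba` instead of `engine`; pytest then parametrizes them over `flox`, `numpy`, and (when installed) `numbagg` only. A hypothetical example:

```python
# Hypothetical test using the new fixture; `test_scans` is illustrative,
# not a test added by this PR.
def test_scans(engine_no_numba):
    engine = engine_no_numba
    ...  # exercise groupby_scan with this engine; "numba" is never passed
```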