fix(data cleaning): 🐛 Switched to HDF5 as file format for faster I/O and better SageMaker support.

The Arrow file format is no longer used for storing output in the cache.
Erik Båvenstrand committed Jul 11, 2023
1 parent ad3d8f8 commit 61f9e42
Showing 8 changed files with 44 additions and 37 deletions.
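
For orientation, a minimal sketch of the format switch in `vaex` terms, assuming a local `vaex` install; the file names are illustrative and not part of the commit:

import vaex

df = vaex.from_arrays(x=[1, 2, 3], y=["a", "b", "c"])

# Old cache format: an Arrow IPC file.
df.export("cache.arrow")

# New cache format: HDF5, which vaex.open memory-maps for fast repeated reads.
df.export("cache.hdf5")

cached = vaex.open("cache.hdf5")
print(cached)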
28 changes: 15 additions & 13 deletions mleko/cache/format/vaex_cache_format_mixin.py
@@ -1,6 +1,7 @@
"""The module containing the mixin class for `vaex` DataFrames to provide Arrow format caching capabilities."""
"""The module containing the mixin class for `vaex` DataFrames to provide HDF5 format caching capabilities."""
from __future__ import annotations

import warnings
from pathlib import Path

import vaex
@@ -10,9 +11,9 @@
 
 
 class VaexCacheFormatMixin:
-    """A mixin class for `vaex` DataFrames to provide Arrow format caching capabilities.
+    """A mixin class for `vaex` DataFrames to provide HDF5 format caching capabilities.
 
-    This mixin class adds methods for reading and writing arrow cache files for `vaex` DataFrames.
+    This mixin class adds methods for reading and writing HDF5 cache files for `vaex` DataFrames.
     This mixin class is intended to be used with the `Cache` class. It is not intended to be used directly.
 
     Warning:
@@ -23,7 +24,7 @@ class VaexCacheFormatMixin:
         >>> pass
     """
 
-    _cache_file_suffix = "arrow"
+    _cache_file_suffix = "hdf5"
     """The file extension to use for cache files."""
 
     def _read_cache_file(self, cache_file_path: Path) -> vaex.DataFrame:
@@ -38,17 +39,18 @@ def _read_cache_file(self, cache_file_path: Path) -> vaex.DataFrame:
         return vaex.open(cache_file_path)
 
     def _write_cache_file(self, cache_file_path: Path, output: vaex.DataFrame) -> None:
-        """Writes the results of the DataFrame conversion to Arrow format in a cache file with `.arrow` suffix.
+        """Writes the results of the DataFrame conversion to HDF5 format in a cache file with `.hdf5` suffix.
 
         Args:
             cache_file_path: The path of the cache file to be written.
             output: The Vaex DataFrame to be saved in the cache file.
         """
-        with tqdm(total=100, desc=f"Writing DataFrame to .{self._cache_file_suffix} file") as pbar:
-            output.export(
-                cache_file_path,
-                progress=set_tqdm_percent_wrapper(pbar),
-                parallel=True,
-                chunk_size=100_000,
-            )
-        output.close()
+        with warnings.catch_warnings():
+            warnings.filterwarnings("ignore", "invalid value encountered in cast")
+            with tqdm(total=100, desc=f"Writing DataFrame to .{self._cache_file_suffix} file") as pbar:
+                output.export(
+                    cache_file_path,
+                    progress=set_tqdm_percent_wrapper(pbar),
+                    parallel=True,
+                    chunk_size=100_000,
+                )
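
The export above is now wrapped in a scoped warning filter. A minimal sketch of the pattern, standard library only; `noisy_cast` is a hypothetical stand-in for the cast that warns during export:

import warnings

def noisy_cast() -> None:
    # Hypothetical stand-in for the numeric cast that warns during export.
    warnings.warn("invalid value encountered in cast", RuntimeWarning)

with warnings.catch_warnings():
    warnings.filterwarnings("ignore", "invalid value encountered in cast")
    noisy_cast()  # silenced: the message matches the filter

noisy_cast()  # warns again: the filter is restored on block exit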
13 changes: 7 additions & 6 deletions mleko/dataset/convert/csv_to_vaex_converter.py
@@ -163,7 +163,6 @@ def convert(
     def _convert_csv_file_to_arrow(
         file_path: Path | str,
         output_directory: Path,
-        dataframe_suffix: str,
         forced_numerical_columns: tuple[str, ...],
         forced_categorical_columns: tuple[str, ...],
         forced_boolean_columns: tuple[str, ...],
@@ -181,7 +180,6 @@ def _convert_csv_file_to_arrow(
         Args:
             file_path: The path of the CSV file to be converted.
             output_directory: The directory where the converted file should be saved.
-            dataframe_suffix: The suffix for the converted dataframe files.
             forced_numerical_columns: A sequence of column names to be forced to numerical type.
             forced_categorical_columns: A sequence of column names to be forced to categorical type.
             forced_boolean_columns: A sequence of column names to be forced to boolean type.
@@ -229,7 +227,7 @@ def _convert_csv_file_to_arrow(
             if get_column(df_chunk, column_name).dtype in (pa.date32(), pa.date64()):
                 df_chunk[column_name] = get_column(df_chunk, column_name).astype("datetime64[s]")
 
-        output_path = output_directory / f"df_chunk_{file_path.stem}.{dataframe_suffix}"
+        output_path = output_directory / f"df_chunk_{file_path.stem}.arrow"
         df_chunk.export(output_path, chunk_size=100_000, parallel=False)
         df_chunk.close()
 
@@ -250,7 +248,6 @@ def _convert(self, file_paths: list[Path] | list[str]) -> vaex.DataFrame:
             CSVToVaexConverter._convert_csv_file_to_arrow,
             file_paths,
             repeat(self._cache_directory),
-            repeat(self._cache_file_suffix),
             repeat(self._forced_numerical_columns),
             repeat(self._forced_categorical_columns),
             repeat(self._forced_boolean_columns),
@@ -262,7 +259,11 @@ def _convert(self, file_paths: list[Path] | list[str]) -> vaex.DataFrame:
         ):
             pbar.update(1)
 
-        return vaex.open(self._cache_directory / f"df_chunk_*.{self._cache_file_suffix}")
+        df = vaex.open(self._cache_directory / "df_chunk_*.arrow")
+        for column_name in df.get_column_names(dtype=pa.null()):
+            df[column_name] = df[column_name].astype("string")
+
+        return df
 
     def _write_cache_file(self, cache_file_path: Path, output: vaex.DataFrame) -> None:
         """Writes the results of the DataFrame conversion to Arrow format in a cache file with arrow suffix.
@@ -272,4 +273,4 @@ def _write_cache_file(self, cache_file_path: Path, output: vaex.DataFrame) -> None:
             output: The Vaex DataFrame to be saved in the cache file.
         """
         super()._write_cache_file(cache_file_path, output)
-        clear_directory(cache_file_path.parent, pattern=f"df_chunk_*.{self._cache_file_suffix}")
+        clear_directory(cache_file_path.parent, pattern="df_chunk_*.arrow")
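
The new null-column cast in `_convert` above guards the HDF5 export: a CSV column that is empty in every row is inferred as Arrow's null type, which the HDF5 writer presumably cannot store. A sketch under that assumption, with illustrative data; `pa.nulls` builds the all-null column:

import pyarrow as pa
import vaex

# "empty" mimics a CSV column with no values in any row.
df = vaex.from_arrays(x=[1, 2, 3], empty=pa.nulls(3))

# Same cast as in _convert above: null-typed columns become strings.
for column_name in df.get_column_names(dtype=pa.null()):
    df[column_name] = df[column_name].astype("string")

print(df.dtypes)  # "empty" is now a string column, safe to export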
7 changes: 5 additions & 2 deletions mleko/dataset/ingest/s3_ingester.py
@@ -79,7 +79,9 @@ def __init__(
         super().__init__(destination_directory)
         self._s3_bucket_name = s3_bucket_name
         self._s3_key_prefix = s3_key_prefix
-        self._s3_client = self._get_s3_client(aws_profile_name, aws_region_name)
+        self._aws_profile_name = aws_profile_name
+        self._aws_region_name = aws_region_name
+        self._s3_client = self._get_s3_client(self._aws_profile_name, self._aws_region_name)
         self._num_workers = num_workers
         self._manifest_file_name = manifest_file_name
         self._check_s3_timestamps = check_s3_timestamps
@@ -100,14 +102,15 @@ def fetch_data(self, force_recompute: bool = False) -> list[Path]:
         Returns:
             A list of Path objects pointing to the downloaded data files.
         """
+        self._s3_client = self._get_s3_client(self._aws_profile_name, self._aws_region_name)
         resp = self._s3_client.list_objects(
             Bucket=self._s3_bucket_name,
             Prefix=self._s3_key_prefix,
         )
 
         if self._check_s3_timestamps:
             modification_dates = {key["LastModified"].day for key in resp["Contents"] if "LastModified" in key}
-            if len(modification_dates) != 1:
+            if len(modification_dates) > 1:
                 raise Exception(
                     "Files in S3 are from muliples dates. This might mean the data is corrupted/duplicated."
                 )
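
The ingester now stores only the profile and region and rebuilds the boto3 client inside `fetch_data`; a plausible (assumed) motivation is that clients are not picklable while plain strings are, and the relaxed `> 1` check no longer raises when no `LastModified` timestamps turn up. A minimal sketch of the rebuild pattern, assuming configured AWS credentials; bucket, prefix, and region are illustrative:

import boto3

def get_s3_client(profile_name: str | None, region_name: str | None):
    # Rebuild the client from plain strings; sessions and clients
    # themselves cannot be pickled, but the two strings can.
    session = boto3.Session(profile_name=profile_name, region_name=region_name)
    return session.client("s3")

client = get_s3_client(None, "eu-west-1")  # profile/region are illustrative
resp = client.list_objects(Bucket="my-bucket", Prefix="data/")  # illustrative names
for obj in resp.get("Contents", []):
    print(obj["Key"], obj["LastModified"])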
2 changes: 1 addition & 1 deletion poetry.lock


1 change: 1 addition & 0 deletions pyproject.toml
@@ -23,6 +23,7 @@ tqdm = "^4.65.0"
 vaex = "^4.16.0"
 scikit-learn = "^1.2.2"
 joblib = "^1.3.1"
+pandas = "^2.0.3"
 
 [tool.poetry.group.dev.dependencies]
 Pygments = ">=2.10.0"
2 changes: 1 addition & 1 deletion tests/cache/format/test_vaex_arrow_cache_format_mixin.py
@@ -33,4 +33,4 @@ def test_vaex_dataframe_arrow_mixin(self, temporary_directory: Path):
         df = my_test_instance.my_method(vaex.from_arrays(x=[1, 2, 3]))
         assert df.x.tolist() == [1, 2, 3]
 
-        assert len(list(temporary_directory.glob("*.arrow"))) == 1
+        assert len(list(temporary_directory.glob("*.hdf5"))) == 1
8 changes: 4 additions & 4 deletions tests/conftest.py
@@ -41,10 +41,10 @@ def generate_csv_files(directory_path: Path, n_files: int, gzipped: bool = False
         file_path = directory_path / f"{uuid.uuid4()}.csv"
         with open(file_path, "w", newline="") as file:
             writer = csv.writer(file)
-            writer.writerow(["Time", "Date", "Count", "Name", "Is Best"])
-            writer.writerow(["2023-01-01 20:00:00", "2023-01-01", 3, "Linux", False])
-            writer.writerow(["2023-01-01 20:00:00", "2023-01-01", 5.4, "Windows", False])
-            writer.writerow(["2023-01-01 20:00:00", "2023-01-01", -1, "-9999", True])
+            writer.writerow(["Time", "Date", "Count", "Name", "Is_Best", "Extra_Column"])
+            writer.writerow(["2023-01-01 20:00:00", "2023-01-01", 3, "Linux", False, None])
+            writer.writerow(["2023-01-01 20:00:00", "2023-01-01", 5.4, "Windows", False, None])
+            writer.writerow(["2023-01-01 20:00:00", "2023-01-01", -1, "-9999", True, None])
 
         if gzipped:
             with open(file_path, "rb") as f_in, gzip.open(file_path.with_suffix(".gz"), "wb") as f_out:
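
A small sketch, standard library only, of what the new `Extra_Column` exercises: `csv.writer` serialises `None` as an empty field, so the column is empty in every row and reads back as a null-typed column, hitting the cast added in `_convert`:

import csv
import io

buffer = io.StringIO()
writer = csv.writer(buffer)
writer.writerow(["Name", "Extra_Column"])
writer.writerow(["Linux", None])  # None becomes an empty CSV field

print(repr(buffer.getvalue()))  # 'Name,Extra_Column\r\nLinux,\r\n'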
20 changes: 10 additions & 10 deletions tests/dataset/convert/test_csv_to_vaex_converter.py
@@ -28,9 +28,9 @@ def test_convert(self, temporary_directory: Path):
         dfs = [vaex.open(f) for f in arrow_files]
 
         for df in dfs:
-            assert str(list(df.dtypes)) == "[datetime64[s], datetime64[s], float64, string, bool]"
-            assert df.column_names == ["Time", "Date", "Count", "Name", "Is Best"]
-            assert df.shape == (3, 5)
+            assert str(list(df.dtypes)) == "[datetime64[s], datetime64[s], float64, string, bool, null]"
+            assert df.column_names == ["Time", "Date", "Count", "Name", "Is_Best", "Extra_Column"]
+            assert df.shape == (3, 6)
             assert df.Name.countna() == 1
             df.close()
 
@@ -40,27 +40,27 @@ def test_cache_hit(self, temporary_directory: Path):
             temporary_directory,
             forced_numerical_columns=["Count"],
             forced_categorical_columns=["Name"],
-            forced_boolean_columns=["Is Best"],
+            forced_boolean_columns=["Is_Best"],
             num_workers=1,
         )
 
         n_files = 1
         file_paths = generate_csv_files(temporary_directory, n_files)
         df = csv_to_arrow_converter.convert(file_paths, force_recompute=False)
 
-        assert str(list(df.dtypes)) == "[datetime64[s], datetime64[s], float64, string, bool]"
-        assert df.column_names == ["Time", "Date", "Count", "Name", "Is Best"]
-        assert df.shape == (n_files * 3, 5)
+        assert str(list(df.dtypes)) == "[datetime64[s], datetime64[s], float64, string, bool, string]"
+        assert df.column_names == ["Time", "Date", "Count", "Name", "Is_Best", "Extra_Column"]
+        assert df.shape == (n_files * 3, 6)
         assert df.Name.countna() == n_files
         assert len(glob.glob(str(temporary_directory / "df_chunk_*.arrow"))) == 0
-        assert len(glob.glob(str(temporary_directory / "*.arrow"))) == 1
+        assert len(glob.glob(str(temporary_directory / "*.hdf5"))) == 1
 
         with patch.object(CSVToVaexConverter, "_convert") as patched_convert:
             new_csv_to_arrow_converter = CSVToVaexConverter(
                 temporary_directory,
                 forced_numerical_columns=["Count"],
                 forced_categorical_columns=["Name"],
-                forced_boolean_columns=["Is Best"],
+                forced_boolean_columns=["Is_Best"],
                 num_workers=1,
             )
             new_csv_to_arrow_converter.convert(file_paths, force_recompute=False)
Expand All @@ -82,6 +82,6 @@ def test_cache_miss(self, temporary_directory: Path):
)
df_new = new_csv_to_arrow_converter.convert(file_paths, force_recompute=False)

assert len(glob.glob(str(temporary_directory / "*.arrow"))) == 2
assert len(glob.glob(str(temporary_directory / "*.hdf5"))) == 2
df.close()
df_new.close()
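
The cache-hit test above leans on `patch.object` to prove the expensive path never runs on a warm cache. A self-contained sketch of that assertion style; `Converter` here is a hypothetical stand-in, not mleko's API:

from unittest.mock import patch

class Converter:
    # Hypothetical stand-in: a warm cache short-circuits _convert().
    def convert(self, warm_cache: bool = True) -> str:
        if warm_cache:
            return "cached-result"
        return self._convert()

    def _convert(self) -> str:
        return "fresh-result"

with patch.object(Converter, "_convert") as patched_convert:
    assert Converter().convert(warm_cache=True) == "cached-result"
    patched_convert.assert_not_called()  # cache hit: no recompute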
