From 61f9e42ef215485b16d83be9726040b829d5a3e7 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Erik=20B=C3=A5venstrand?=
Date: Tue, 11 Jul 2023 19:40:34 +0200
Subject: [PATCH] =?UTF-8?q?fix(data=20cleaning):=20=F0=9F=90=9B=20Switched?=
 =?UTF-8?q?=20to=20HDF5=20as=20file=20format=20for=20faster=20I/O=20and=20?=
 =?UTF-8?q?better=20SageMaker=20support.?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Arrow file format is no longer used for storing output in the cache.
---
 mleko/cache/format/vaex_cache_format_mixin.py | 28 ++++++++++---------
 .../dataset/convert/csv_to_vaex_converter.py  | 13 +++++----
 mleko/dataset/ingest/s3_ingester.py           |  7 +++--
 poetry.lock                                   |  2 +-
 pyproject.toml                                |  1 +
 .../test_vaex_arrow_cache_format_mixin.py     |  2 +-
 tests/conftest.py                             |  8 +++---
 .../convert/test_csv_to_vaex_converter.py     | 20 ++++++-------
 8 files changed, 44 insertions(+), 37 deletions(-)

diff --git a/mleko/cache/format/vaex_cache_format_mixin.py b/mleko/cache/format/vaex_cache_format_mixin.py
index 9648dacd..139b7404 100644
--- a/mleko/cache/format/vaex_cache_format_mixin.py
+++ b/mleko/cache/format/vaex_cache_format_mixin.py
@@ -1,6 +1,7 @@
-"""The module containing the mixin class for `vaex` DataFrames to provide Arrow format caching capabilities."""
+"""The module containing the mixin class for `vaex` DataFrames to provide HDF5 format caching capabilities."""
 from __future__ import annotations
 
+import warnings
 from pathlib import Path
 
 import vaex
@@ -10,9 +11,9 @@
 
 
 class VaexCacheFormatMixin:
-    """A mixin class for `vaex` DataFrames to provide Arrow format caching capabilities.
+    """A mixin class for `vaex` DataFrames to provide HDF5 format caching capabilities.
 
-    This mixin class adds methods for reading and writing arrow cache files for `vaex` DataFrames.
+    This mixin class adds methods for reading and writing HDF5 cache files for `vaex` DataFrames.
     This mixin class is intended to be used with the `Cache` class. It is not intended to be used directly.
 
     Warning:
@@ -23,7 +24,7 @@ class VaexCacheFormatMixin:
        >>> pass
    """
 
-    _cache_file_suffix = "arrow"
+    _cache_file_suffix = "hdf5"
     """The file extension to use for cache files."""
 
     def _read_cache_file(self, cache_file_path: Path) -> vaex.DataFrame:
@@ -38,17 +39,18 @@ def _read_cache_file(self, cache_file_path: Path) -> vaex.DataFrame:
         return vaex.open(cache_file_path)
 
     def _write_cache_file(self, cache_file_path: Path, output: vaex.DataFrame) -> None:
-        """Writes the results of the DataFrame conversion to Arrow format in a cache file with `.arrow` suffix.
+        """Writes the results of the DataFrame conversion to HDF5 format in a cache file with `.hdf5` suffix.
 
         Args:
             cache_file_path: The path of the cache file to be written.
             output: The Vaex DataFrame to be saved in the cache file.
""" - with tqdm(total=100, desc=f"Writing DataFrame to .{self._cache_file_suffix} file") as pbar: - output.export( - cache_file_path, - progress=set_tqdm_percent_wrapper(pbar), - parallel=True, - chunk_size=100_000, - ) - output.close() + with warnings.catch_warnings(): + warnings.filterwarnings("ignore", "invalid value encountered in cast") + with tqdm(total=100, desc=f"Writing DataFrame to .{self._cache_file_suffix} file") as pbar: + output.export( + cache_file_path, + progress=set_tqdm_percent_wrapper(pbar), + parallel=True, + chunk_size=100_000, + ) diff --git a/mleko/dataset/convert/csv_to_vaex_converter.py b/mleko/dataset/convert/csv_to_vaex_converter.py index ef01f4e9..7894933f 100644 --- a/mleko/dataset/convert/csv_to_vaex_converter.py +++ b/mleko/dataset/convert/csv_to_vaex_converter.py @@ -163,7 +163,6 @@ def convert( def _convert_csv_file_to_arrow( file_path: Path | str, output_directory: Path, - dataframe_suffix: str, forced_numerical_columns: tuple[str, ...], forced_categorical_columns: tuple[str, ...], forced_boolean_columns: tuple[str, ...], @@ -181,7 +180,6 @@ def _convert_csv_file_to_arrow( Args: file_path: The path of the CSV file to be converted. output_directory: The directory where the converted file should be saved. - dataframe_suffix: The suffix for the converted dataframe files. forced_numerical_columns: A sequence of column names to be forced to numerical type. forced_categorical_columns: A sequence of column names to be forced to categorical type. forced_boolean_columns: A sequence of column names to be forced to boolean type. @@ -229,7 +227,7 @@ def _convert_csv_file_to_arrow( if get_column(df_chunk, column_name).dtype in (pa.date32(), pa.date64()): df_chunk[column_name] = get_column(df_chunk, column_name).astype("datetime64[s]") - output_path = output_directory / f"df_chunk_{file_path.stem}.{dataframe_suffix}" + output_path = output_directory / f"df_chunk_{file_path.stem}.arrow" df_chunk.export(output_path, chunk_size=100_000, parallel=False) df_chunk.close() @@ -250,7 +248,6 @@ def _convert(self, file_paths: list[Path] | list[str]) -> vaex.DataFrame: CSVToVaexConverter._convert_csv_file_to_arrow, file_paths, repeat(self._cache_directory), - repeat(self._cache_file_suffix), repeat(self._forced_numerical_columns), repeat(self._forced_categorical_columns), repeat(self._forced_boolean_columns), @@ -262,7 +259,11 @@ def _convert(self, file_paths: list[Path] | list[str]) -> vaex.DataFrame: ): pbar.update(1) - return vaex.open(self._cache_directory / f"df_chunk_*.{self._cache_file_suffix}") + df = vaex.open(self._cache_directory / "df_chunk_*.arrow") + for column_name in df.get_column_names(dtype=pa.null()): + df[column_name] = df[column_name].astype("string") + + return df def _write_cache_file(self, cache_file_path: Path, output: vaex.DataFrame) -> None: """Writes the results of the DataFrame conversion to Arrow format in a cache file with arrow suffix. @@ -272,4 +273,4 @@ def _write_cache_file(self, cache_file_path: Path, output: vaex.DataFrame) -> No output: The Vaex DataFrame to be saved in the cache file. 
""" super()._write_cache_file(cache_file_path, output) - clear_directory(cache_file_path.parent, pattern=f"df_chunk_*.{self._cache_file_suffix}") + clear_directory(cache_file_path.parent, pattern="df_chunk_*.arrow") diff --git a/mleko/dataset/ingest/s3_ingester.py b/mleko/dataset/ingest/s3_ingester.py index a1cb24f4..5178b389 100644 --- a/mleko/dataset/ingest/s3_ingester.py +++ b/mleko/dataset/ingest/s3_ingester.py @@ -79,7 +79,9 @@ def __init__( super().__init__(destination_directory) self._s3_bucket_name = s3_bucket_name self._s3_key_prefix = s3_key_prefix - self._s3_client = self._get_s3_client(aws_profile_name, aws_region_name) + self._aws_profile_name = aws_profile_name + self._aws_region_name = aws_region_name + self._s3_client = self._get_s3_client(self._aws_profile_name, self._aws_region_name) self._num_workers = num_workers self._manifest_file_name = manifest_file_name self._check_s3_timestamps = check_s3_timestamps @@ -100,6 +102,7 @@ def fetch_data(self, force_recompute: bool = False) -> list[Path]: Returns: A list of Path objects pointing to the downloaded data files. """ + self._s3_client = self._get_s3_client(self._aws_profile_name, self._aws_region_name) resp = self._s3_client.list_objects( Bucket=self._s3_bucket_name, Prefix=self._s3_key_prefix, @@ -107,7 +110,7 @@ def fetch_data(self, force_recompute: bool = False) -> list[Path]: if self._check_s3_timestamps: modification_dates = {key["LastModified"].day for key in resp["Contents"] if "LastModified" in key} - if len(modification_dates) != 1: + if len(modification_dates) > 1: raise Exception( "Files in S3 are from muliples dates. This might mean the data is corrupted/duplicated." ) diff --git a/poetry.lock b/poetry.lock index e457cc0b..cbb8e3f6 100644 --- a/poetry.lock +++ b/poetry.lock @@ -5414,4 +5414,4 @@ testing = ["big-O", "flake8 (<5)", "jaraco.functools", "jaraco.itertools", "more [metadata] lock-version = "2.0" python-versions = ">=3.8,<3.11.dev0" -content-hash = "2e8d1c908373f3e294c6d872ace04a13b5e637b41ab26ef7d98a459f543828b5" +content-hash = "cc932638dfd02e88968c4751eccfc778d554a81563cd20d2ef19517d6557535d" diff --git a/pyproject.toml b/pyproject.toml index 12d44487..1312ecac 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -23,6 +23,7 @@ tqdm = "^4.65.0" vaex = "^4.16.0" scikit-learn = "^1.2.2" joblib = "^1.3.1" +pandas = "^2.0.3" [tool.poetry.group.dev.dependencies] Pygments = ">=2.10.0" diff --git a/tests/cache/format/test_vaex_arrow_cache_format_mixin.py b/tests/cache/format/test_vaex_arrow_cache_format_mixin.py index ab286096..7bac6338 100644 --- a/tests/cache/format/test_vaex_arrow_cache_format_mixin.py +++ b/tests/cache/format/test_vaex_arrow_cache_format_mixin.py @@ -33,4 +33,4 @@ def test_vaex_dataframe_arrow_mixin(self, temporary_directory: Path): df = my_test_instance.my_method(vaex.from_arrays(x=[1, 2, 3])) assert df.x.tolist() == [1, 2, 3] - assert len(list(temporary_directory.glob("*.arrow"))) == 1 + assert len(list(temporary_directory.glob("*.hdf5"))) == 1 diff --git a/tests/conftest.py b/tests/conftest.py index 06ad6ce9..a046c9de 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -41,10 +41,10 @@ def generate_csv_files(directory_path: Path, n_files: int, gzipped: bool = False file_path = directory_path / f"{uuid.uuid4()}.csv" with open(file_path, "w", newline="") as file: writer = csv.writer(file) - writer.writerow(["Time", "Date", "Count", "Name", "Is Best"]) - writer.writerow(["2023-01-01 20:00:00", "2023-01-01", 3, "Linux", False]) - writer.writerow(["2023-01-01 20:00:00", 
"2023-01-01", 5.4, "Windows", False]) - writer.writerow(["2023-01-01 20:00:00", "2023-01-01", -1, "-9999", True]) + writer.writerow(["Time", "Date", "Count", "Name", "Is_Best", "Extra_Column"]) + writer.writerow(["2023-01-01 20:00:00", "2023-01-01", 3, "Linux", False, None]) + writer.writerow(["2023-01-01 20:00:00", "2023-01-01", 5.4, "Windows", False, None]) + writer.writerow(["2023-01-01 20:00:00", "2023-01-01", -1, "-9999", True, None]) if gzipped: with open(file_path, "rb") as f_in, gzip.open(file_path.with_suffix(".gz"), "wb") as f_out: diff --git a/tests/dataset/convert/test_csv_to_vaex_converter.py b/tests/dataset/convert/test_csv_to_vaex_converter.py index 167538fc..187cb1b7 100644 --- a/tests/dataset/convert/test_csv_to_vaex_converter.py +++ b/tests/dataset/convert/test_csv_to_vaex_converter.py @@ -28,9 +28,9 @@ def test_convert(self, temporary_directory: Path): dfs = [vaex.open(f) for f in arrow_files] for df in dfs: - assert str(list(df.dtypes)) == "[datetime64[s], datetime64[s], float64, string, bool]" - assert df.column_names == ["Time", "Date", "Count", "Name", "Is Best"] - assert df.shape == (3, 5) + assert str(list(df.dtypes)) == "[datetime64[s], datetime64[s], float64, string, bool, null]" + assert df.column_names == ["Time", "Date", "Count", "Name", "Is_Best", "Extra_Column"] + assert df.shape == (3, 6) assert df.Name.countna() == 1 df.close() @@ -40,7 +40,7 @@ def test_cache_hit(self, temporary_directory: Path): temporary_directory, forced_numerical_columns=["Count"], forced_categorical_columns=["Name"], - forced_boolean_columns=["Is Best"], + forced_boolean_columns=["Is_Best"], num_workers=1, ) @@ -48,19 +48,19 @@ def test_cache_hit(self, temporary_directory: Path): file_paths = generate_csv_files(temporary_directory, n_files) df = csv_to_arrow_converter.convert(file_paths, force_recompute=False) - assert str(list(df.dtypes)) == "[datetime64[s], datetime64[s], float64, string, bool]" - assert df.column_names == ["Time", "Date", "Count", "Name", "Is Best"] - assert df.shape == (n_files * 3, 5) + assert str(list(df.dtypes)) == "[datetime64[s], datetime64[s], float64, string, bool, string]" + assert df.column_names == ["Time", "Date", "Count", "Name", "Is_Best", "Extra_Column"] + assert df.shape == (n_files * 3, 6) assert df.Name.countna() == n_files assert len(glob.glob(str(temporary_directory / "df_chunk_*.arrow"))) == 0 - assert len(glob.glob(str(temporary_directory / "*.arrow"))) == 1 + assert len(glob.glob(str(temporary_directory / "*.hdf5"))) == 1 with patch.object(CSVToVaexConverter, "_convert") as patched_convert: new_csv_to_arrow_converter = CSVToVaexConverter( temporary_directory, forced_numerical_columns=["Count"], forced_categorical_columns=["Name"], - forced_boolean_columns=["Is Best"], + forced_boolean_columns=["Is_Best"], num_workers=1, ) new_csv_to_arrow_converter.convert(file_paths, force_recompute=False) @@ -82,6 +82,6 @@ def test_cache_miss(self, temporary_directory: Path): ) df_new = new_csv_to_arrow_converter.convert(file_paths, force_recompute=False) - assert len(glob.glob(str(temporary_directory / "*.arrow"))) == 2 + assert len(glob.glob(str(temporary_directory / "*.hdf5"))) == 2 df.close() df_new.close()