IMGW Hydrology: Improve speed of downloading data
gutzbenj committed Dec 29, 2023
1 parent 41e1e63 commit 7339c1c
Showing 3 changed files with 60 additions and 31 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.rst
@@ -12,6 +12,7 @@ Development
- EA Hydrology: Read json data from bytes
- Replace partial with lambda in most places
- IMGW: Use ttl of 5 minutes for caching
- IMGW Hydrology: Improve speed of downloading data

0.69.0 (18.12.2023)
*******************
82 changes: 51 additions & 31 deletions wetterdienst/provider/imgw/hydrology/api.py
@@ -5,12 +5,14 @@
import re
from concurrent.futures import ThreadPoolExecutor
from enum import Enum
from io import BytesIO
from typing import Optional, Union

import pandas as pd
import polars as pl
from dateutil.relativedelta import relativedelta
from fsspec.implementations.zip import ZipFileSystem
from zoneinfo import ZoneInfo

from wetterdienst import Kind, Period, Provider, Resolution, Settings
from wetterdienst.core.timeseries.request import TimeseriesRequest
@@ -23,7 +25,7 @@
from wetterdienst.metadata.unit import OriginUnit, SIUnit, UnitEnum
from wetterdienst.util.cache import CacheExpiry
from wetterdienst.util.geo import convert_dms_string_to_dd
from wetterdienst.util.network import download_file, list_remote_files_fsspec
from wetterdienst.util.network import _download_file, download_file
from wetterdienst.util.parameter import DatasetTreeCore


@@ -140,14 +142,21 @@ def _collect_station_parameter(self, station_id: str, parameter: Enum, dataset:
:param dataset:
:return:
"""
urls = self._get_urls(dataset)

def _try_download_file(url: str) -> Optional[BytesIO]:
try:
return _download_file(url=url, settings=self.sr.settings, ttl=CacheExpiry.FIVE_MINUTES)
except Exception:
return None

urls = self._generate_urls(dataset)
with ThreadPoolExecutor() as p:
files_in_bytes = p.map(
lambda file: download_file(url=file, settings=self.sr.settings, ttl=CacheExpiry.FIVE_MINUTES), urls
)
files_in_bytes = p.map(lambda file: _try_download_file(url=file), urls)
data = []
file_schema = self._file_schema[self.sr.resolution.name.lower()][dataset.name.lower()]
for file_in_bytes in files_in_bytes:
if not file_in_bytes:
continue
df = self._parse_file(file_in_bytes=file_in_bytes, station_id=station_id, file_schema=file_schema)
if not df.is_empty():
data.append(df)
@@ -163,7 +172,7 @@ def _collect_station_parameter(self, station_id: str, parameter: Enum, dataset:
pl.lit(None, dtype=pl.Float64).alias("quality"),
)

def _parse_file(self, file_in_bytes: bytes, station_id: str, file_schema: dict) -> pl.DataFrame:
def _parse_file(self, file_in_bytes: BytesIO, station_id: str, file_schema: dict) -> pl.DataFrame:
"""Function to parse hydrological zip file, parses all files and combines
them
@@ -227,7 +236,7 @@ def __parse_file(self, file: bytes, station_id: str, schema: dict) -> pl.DataFra
if self.sr.resolution == Resolution.DAILY:
id_vars = ["station_id", "date"]
else:
# monthly data includes extrimity column (1: min, 2: mean, 3: max)
# monthly data includes extremity column (1: min, 2: mean, 3: max)
id_vars = ["station_id", "date", "extremity"]
df = df.melt(id_vars=id_vars, variable_name="parameter", value_name="value")
df = df.with_columns(pl.col("value").cast(pl.Float64))
@@ -246,28 +255,34 @@ def __parse_file(self, file: bytes, station_id: str, schema: dict) -> pl.DataFra
)
return df

def _get_urls(self, dataset: Enum) -> pl.Series:
def _generate_urls(self, dataset: Enum) -> pl.Series:
"""Get file urls from server
:param dataset: dataset for which the filelist is retrieved
:return:
"""
res = self.sr.stations._resolution_base[self.sr.resolution.name]
url = self._endpoint.format(resolution=res.value, dataset=dataset.value)
files = list_remote_files_fsspec(url, self.sr.settings)
df_files = pl.DataFrame({"url": files})
df_files = df_files.with_columns(pl.col("url").str.split("/").list.last().alias("file"))
df_files = df_files.filter(pl.col("file").str.ends_with(".zip") & ~pl.col("file").str.starts_with("zjaw"))
if self.sr.resolution == Resolution.DAILY:
months = list(range(1, 13))
file_prefix = "codz"
else:
months = [None]
file_prefix = "mies"
start_year = self.sr.start_date.year if self.sr.start_date else 1951
start_year = max(start_year, 1951)
current_year = dt.date.today().year - 1
end_year = self.sr.end_date.year if self.sr.end_date else current_year
end_year = min(end_year, current_year)
files_generated = [
{"file": f"{file_prefix}_{year}{f'_{month:02d}' if month else ''}.zip", "year": year, "month": month}
for year in range(start_year, end_year + 1)
for month in months
]
df_files = pl.DataFrame(files_generated)
if self.sr.start_date:
interval = pd.Interval(pd.Timestamp(self.sr.start_date), pd.Timestamp(self.sr.end_date), closed="both")
if self.sr.resolution == Resolution.DAILY:
df_files = df_files.with_columns(
pl.col("file").str.strip_chars_end(".zip").str.split("_").list.slice(1).alias("year_month")
)
df_files = df_files.with_columns(
pl.col("year_month").list.first().cast(pl.Int64).alias("year"),
pl.col("year_month").list.last().cast(pl.Int64).alias("month"),
)
df_files = df_files.with_columns(
pl.when(pl.col("month") <= 2).then(pl.col("year") - 1).otherwise(pl.col("year")).alias("year"),
pl.when(pl.col("month").add(10).gt(12))
@@ -287,21 +302,23 @@ def _get_urls(self, dataset: Enum) -> pl.Series:
)
else:
df_files = df_files.with_columns(
pl.col("file")
.str.strip_chars_end(".zip")
.str.split("_")
.list.last()
.str.to_datetime("%Y", time_zone="UTC", strict=False)
pl.col("year")
.map_elements(
lambda d: [d - relativedelta(months=2), d + relativedelta(months=11) - relativedelta(days=1)]
lambda year: [
dt.datetime(
year=year,
month=1,
day=1,
tzinfo=ZoneInfo("UTC"),
)
- relativedelta(months=2),
dt.datetime(year=year, month=1, day=1, tzinfo=ZoneInfo("UTC"))
+ relativedelta(months=11)
- relativedelta(days=1),
]
)
.alias("date_range")
)
df_files = df_files.select(
pl.col("url"),
pl.col("date_range").list.first().cast(pl.Datetime(time_zone="UTC")).alias("start_date"),
pl.col("date_range").list.last().cast(pl.Datetime(time_zone="UTC")).alias("end_date"),
)
df_files = df_files.with_columns(
pl.struct(["start_date", "end_date"])
.map_elements(
Expand All @@ -310,7 +327,10 @@ def _get_urls(self, dataset: Enum) -> pl.Series:
.alias("interval")
)
df_files = df_files.filter(pl.col("interval").map_elements(lambda i: i.overlaps(interval)))
return df_files.get_column("url")

return df_files.select(
pl.lit(url).add(pl.col("year").cast(pl.Utf8)).add("/").add(pl.col("file")).alias("url")
).get_column("url")


class ImgwHydrologyRequest(TimeseriesRequest):
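The api.py change speeds things up in two ways: candidate file URLs are now generated locally from IMGW's archive naming scheme (codz_{year}_{month}.zip per month for daily data, mies_{year}.zip per year for monthly data) instead of listing the remote directory, and the archives are fetched concurrently through a non-retrying helper so that URLs which do not exist on the server are simply skipped. A minimal standalone sketch of that pattern follows; the base URL, helper names, and the plain requests-based download are illustrative assumptions, not the wetterdienst implementation.

from concurrent.futures import ThreadPoolExecutor
from io import BytesIO
from typing import Optional

import requests

# Hypothetical base URL for illustration only; the real endpoint lives on the API class.
BASE_URL = "https://example.invalid/dane_hydrologiczne/dobowe"


def generate_urls(start_year: int, end_year: int) -> list[str]:
    """Build daily-resolution archive URLs from the known codz_{year}_{month}.zip scheme."""
    return [
        f"{BASE_URL}/{year}/codz_{year}_{month:02d}.zip"
        for year in range(start_year, end_year + 1)
        for month in range(1, 13)
    ]


def try_download(url: str) -> Optional[BytesIO]:
    """Single download attempt; a missing or unreachable file yields None instead of retries."""
    try:
        response = requests.get(url, timeout=30)
        response.raise_for_status()
        return BytesIO(response.content)
    except Exception:
        return None


def download_available(start_year: int, end_year: int) -> list[BytesIO]:
    """Fetch all generated URLs concurrently and keep only the files that actually exist."""
    urls = generate_urls(start_year, end_year)
    with ThreadPoolExecutor() as executor:
        maybe_files = executor.map(try_download, urls)
    return [f for f in maybe_files if f is not None]

Skipping retries for absent files matters here because the generated URL list may include archives the server has not published, and retrying each of those several times would dominate the runtime.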
8 changes: 8 additions & 0 deletions wetterdienst/util/network.py
@@ -81,11 +81,19 @@ def list_remote_files_fsspec(url: str, settings: Settings, ttl: CacheExpiry = Ca
@stamina.retry(on=Exception, attempts=3)
def download_file(
url: str, settings: Settings, ttl: Optional[Union[int, CacheExpiry]] = CacheExpiry.NO_CACHE
) -> BytesIO:
"""A version of _download_file that retries on failure."""
return _download_file(url=url, settings=settings, ttl=ttl)


def _download_file(
url: str, settings: Settings, ttl: Optional[Union[int, CacheExpiry]] = CacheExpiry.NO_CACHE
) -> BytesIO:
"""
A function used to download a specified file from the server.
:param url: The url to the file on the dwd server
:param settings: The settings object.
:param ttl: How long the resource should be cached.
:returns: Bytes of the file.
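The network.py change separates retrying from downloading: download_file keeps the @stamina.retry(on=Exception, attempts=3) decorator for callers that expect the URL to resolve, while the new _download_file performs a single attempt, which is what the IMGW hydrology code calls so a missing archive fails fast instead of being retried three times. A rough sketch of that split, assuming a plain requests-based fetch and hypothetical function names:

from io import BytesIO

import requests
import stamina


def fetch_once(url: str, timeout: float = 30.0) -> BytesIO:
    """One download attempt; any connection problem or HTTP error status raises immediately."""
    response = requests.get(url, timeout=timeout)
    response.raise_for_status()
    return BytesIO(response.content)


@stamina.retry(on=Exception, attempts=3)
def fetch_with_retry(url: str, timeout: float = 30.0) -> BytesIO:
    """Retrying wrapper around fetch_once for URLs that should always resolve."""
    return fetch_once(url, timeout=timeout)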
