IMGW Hydrology: Improve speed of downloading data
gutzbenj committed Jan 7, 2024
1 parent 1650539 commit 41fff03
Showing 5 changed files with 77 additions and 42 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.rst
@@ -23,6 +23,7 @@ Development
  - EA Hydrology: Read json data from bytes
  - Replace partial with lambda in most places
  - IMGW: Use ttl of 5 minutes for caching
+ - IMGW Hydrology: Improve speed of downloading data

  0.69.0 (18.12.2023)
  *******************
88 changes: 49 additions & 39 deletions wetterdienst/provider/imgw/hydrology/api.py
@@ -5,12 +5,14 @@
  import re
  from concurrent.futures import ThreadPoolExecutor
  from enum import Enum
+ from io import BytesIO
  from typing import Optional, Union

  import pandas as pd
  import polars as pl
  from dateutil.relativedelta import relativedelta
  from fsspec.implementations.zip import ZipFileSystem
+ from zoneinfo import ZoneInfo

  from wetterdienst import Kind, Period, Provider, Resolution, Settings
  from wetterdienst.core.timeseries.request import TimeseriesRequest
@@ -21,9 +23,10 @@
  from wetterdienst.metadata.resolution import ResolutionType
  from wetterdienst.metadata.timezone import Timezone
  from wetterdienst.metadata.unit import OriginUnit, SIUnit, UnitEnum
+ from wetterdienst.provider.imgw.util import _try_download_file
  from wetterdienst.util.cache import CacheExpiry
  from wetterdienst.util.geo import convert_dms_string_to_dd
- from wetterdienst.util.network import download_file, list_remote_files_fsspec
+ from wetterdienst.util.network import _download_file, download_file
  from wetterdienst.util.parameter import DatasetTreeCore


@@ -140,14 +143,14 @@ def _collect_station_parameter(self, station_id: str, parameter: Enum, dataset:
          :param dataset:
          :return:
          """
-         urls = self._get_urls(dataset)
+         urls = self._generate_urls(dataset)
          with ThreadPoolExecutor() as p:
-             files_in_bytes = p.map(
-                 lambda file: download_file(url=file, settings=self.sr.settings, ttl=CacheExpiry.FIVE_MINUTES), urls
-             )
+             files_in_bytes = p.map(lambda file: _try_download_file(url=file, settings=self.sr.settings, ttl=CacheExpiry.FIVE_MINUTES), urls)
          data = []
          file_schema = self._file_schema[self.sr.resolution.name.lower()][dataset.name.lower()]
          for file_in_bytes in files_in_bytes:
+             if not file_in_bytes:
+                 continue
              df = self._parse_file(file_in_bytes=file_in_bytes, station_id=station_id, file_schema=file_schema)
              if not df.is_empty():
                  data.append(df)
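The map-and-skip pattern above lets a missing remote file degrade to "no data" instead of an exception: `_try_download_file` returns `None` on failure and the loop skips falsy results. A minimal, self-contained sketch of the same pattern, with `fetch` and `candidate_urls` as illustrative stand-ins rather than wetterdienst API:

```python
# Download many URLs in parallel; treat any failure as a missing file.
from concurrent.futures import ThreadPoolExecutor
from io import BytesIO
from typing import Optional
from urllib.request import urlopen


def fetch(url: str) -> Optional[BytesIO]:
    """Return the file content, or None if the URL cannot be fetched."""
    try:
        with urlopen(url, timeout=10) as response:
            return BytesIO(response.read())
    except Exception:
        return None


candidate_urls = [f"https://example.com/codz_{year}_01.zip" for year in (1951, 1952)]
with ThreadPoolExecutor() as pool:
    buffers = pool.map(fetch, candidate_urls)
payloads = [buffer for buffer in buffers if buffer]  # failed downloads are skipped
```

This trades strict error reporting for throughput: a URL that was generated but never published costs one failed request instead of aborting the whole collection.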
@@ -163,7 +166,7 @@ def _collect_station_parameter(self, station_id: str, parameter: Enum, dataset:
              pl.lit(None, dtype=pl.Float64).alias("quality"),
          )

-     def _parse_file(self, file_in_bytes: bytes, station_id: str, file_schema: dict) -> pl.DataFrame:
+     def _parse_file(self, file_in_bytes: BytesIO, station_id: str, file_schema: dict) -> pl.DataFrame:
          """Function to parse hydrological zip file, parses all files and combines
          them
@@ -227,7 +230,7 @@ def __parse_file(self, file: bytes, station_id: str, schema: dict) -> pl.DataFrame:
          if self.sr.resolution == Resolution.DAILY:
              id_vars = ["station_id", "date"]
          else:
-             # monthly data includes extrimity column (1: min, 2: mean, 3: max)
+             # monthly data includes extremity column (1: min, 2: mean, 3: max)
              id_vars = ["station_id", "date", "extremity"]
          df = df.melt(id_vars=id_vars, variable_name="parameter", value_name="value")
          df = df.with_columns(pl.col("value").cast(pl.Float64))
@@ -246,28 +249,31 @@ def __parse_file(self, file: bytes, station_id: str, schema: dict) -> pl.DataFrame:
          )
          return df

-     def _get_urls(self, dataset: Enum) -> pl.Series:
+     def _generate_urls(self, dataset: Enum) -> pl.Series:
          """Get file urls from server

          :param dataset: dataset for which the filelist is retrieved
          :return:
          """
          res = self.sr.stations._resolution_base[self.sr.resolution.name]
          url = self._endpoint.format(resolution=res.value, dataset=dataset.value)
-         files = list_remote_files_fsspec(url, self.sr.settings)
-         df_files = pl.DataFrame({"url": files})
-         df_files = df_files.with_columns(pl.col("url").str.split("/").list.last().alias("file"))
-         df_files = df_files.filter(pl.col("file").str.ends_with(".zip") & ~pl.col("file").str.starts_with("zjaw"))
+         if self.sr.resolution == Resolution.DAILY:
+             months = list(range(1, 13))
+             file_prefix = "codz"
+         else:
+             months = [None]
+             file_prefix = "mies"
+         start_year = 1951
+         end_year = dt.date.today().year - 1
+         files_generated = [
+             {"file": f"{file_prefix}_{year}{f'_{month:02d}' if month else ''}.zip", "year": year, "month": month}
+             for year in range(start_year, end_year + 1)
+             for month in months
+         ]
+         df_files = pl.DataFrame(files_generated)
          if self.sr.start_date:
              interval = pd.Interval(pd.Timestamp(self.sr.start_date), pd.Timestamp(self.sr.end_date), closed="both")
              if self.sr.resolution == Resolution.DAILY:
-                 df_files = df_files.with_columns(
-                     pl.col("file").str.strip_chars_end(".zip").str.split("_").list.slice(1).alias("year_month")
-                 )
-                 df_files = df_files.with_columns(
-                     pl.col("year_month").list.first().cast(pl.Int64).alias("year"),
-                     pl.col("year_month").list.last().cast(pl.Int64).alias("month"),
-                 )
                  df_files = df_files.with_columns(
                      pl.when(pl.col("month") <= 2).then(pl.col("year") - 1).otherwise(pl.col("year")).alias("year"),
                      pl.when(pl.col("month").add(10).gt(12))
@@ -279,38 +285,42 @@ def _get_urls(self, dataset: Enum) -> pl.Series:
                      pl.struct(["year", "month"])
                      .map_elements(
                          lambda x: [
-                             dt.datetime(x["year"], x["month"], 1),
-                             dt.datetime(x["year"], x["month"], 1) + relativedelta(months=1) - relativedelta(days=1),
+                             dt.datetime(x["year"], x["month"], 1, tzinfo=ZoneInfo("UTC")),
+                             dt.datetime(x["year"], x["month"], 1, tzinfo=ZoneInfo("UTC"))
+                             + relativedelta(months=1)
+                             - relativedelta(days=1),
                          ]
                      )
                      .alias("date_range")
                  )
              else:
                  df_files = df_files.with_columns(
-                     pl.col("file")
-                     .str.strip_chars_end(".zip")
-                     .str.split("_")
-                     .list.last()
-                     .str.to_datetime("%Y", time_zone="UTC", strict=False)
+                     pl.col("year")
                      .map_elements(
-                         lambda d: [d - relativedelta(months=2), d + relativedelta(months=11) - relativedelta(days=1)]
+                         lambda year: [
+                             dt.datetime(
+                                 year=year,
+                                 month=1,
+                                 day=1,
+                                 tzinfo=ZoneInfo("UTC"),
+                             )
+                             - relativedelta(months=2),
+                             dt.datetime(year=year, month=1, day=1, tzinfo=ZoneInfo("UTC"))
+                             + relativedelta(months=11)
+                             - relativedelta(days=1),
+                         ]
                      )
                      .alias("date_range")
                  )
-             df_files = df_files.select(
-                 pl.col("url"),
-                 pl.col("date_range").list.first().cast(pl.Datetime(time_zone="UTC")).alias("start_date"),
-                 pl.col("date_range").list.last().cast(pl.Datetime(time_zone="UTC")).alias("end_date"),
-             )
-             df_files = df_files.with_columns(
-                 pl.struct(["start_date", "end_date"])
-                 .map_elements(
-                     lambda x: pd.Interval(pd.Timestamp(x["start_date"]), pd.Timestamp(x["end_date"]), closed="both")
-                 )
-                 .alias("interval")
-             )
-             df_files = df_files.filter(pl.col("interval").map_elements(lambda i: i.overlaps(interval)))
-         return df_files.get_column("url")
+             df_files = df_files.filter(
+                 pl.col("date_range").map_elements(
+                     lambda dr: pd.Interval(*[pd.Timestamp(d) for d in dr], closed="both").overlaps(interval)
+                 )
+             )
+         return df_files.select(
+             pl.lit(url).add(pl.col("year").cast(pl.Utf8)).add("/").add(pl.col("file")).alias("url")
+         ).get_column("url")


class ImgwHydrologyRequest(TimeseriesRequest):
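The heart of the speedup is that `_generate_urls` no longer asks the server for a directory listing: IMGW file names are fully predictable (`codz_{year}_{month}.zip` for daily data, `mies_{year}.zip` for monthly, from 1951 through last year), so the candidate list is built locally and narrowed against the requested interval before any network traffic happens. The `pl.when` expressions account for the Polish hydrological year, which starts on 1 November: file months 1 and 2 hold November and December of the previous calendar year. A plain-Python sketch of both ideas, with illustrative helper names that are not wetterdienst API:

```python
# What _generate_urls encodes, restated without polars.
import datetime as dt


def candidate_files(daily: bool) -> list[str]:
    """Daily files: codz_{year}_{month}.zip; monthly files: mies_{year}.zip."""
    end_year = dt.date.today().year - 1
    if daily:
        return [
            f"codz_{year}_{month:02d}.zip"
            for year in range(1951, end_year + 1)
            for month in range(1, 13)
        ]
    return [f"mies_{year}.zip" for year in range(1951, end_year + 1)]


def hydrological_to_calendar(year: int, month: int) -> tuple[int, int]:
    """Mirror the pl.when(...) shift above: file months 1-2 belong to
    November/December of the previous calendar year."""
    calendar_year = year - 1 if month <= 2 else year
    calendar_month = month + 10 if month + 10 <= 12 else month - 2
    return calendar_year, calendar_month


assert hydrological_to_calendar(2000, 1) == (1999, 11)  # codz_2000_01.zip starts in Nov 1999
assert hydrological_to_calendar(2000, 3) == (2000, 1)
assert hydrological_to_calendar(2000, 12) == (2000, 10)
```

Each surviving row is then turned into a URL by concatenating the endpoint, the year directory, and the file name; rows whose `date_range` does not overlap the requested `pd.Interval` are dropped before any download is attempted.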
9 changes: 6 additions & 3 deletions wetterdienst/provider/imgw/meteorology/api.py
@@ -22,6 +22,7 @@
  from wetterdienst.metadata.resolution import ResolutionType
  from wetterdienst.metadata.timezone import Timezone
  from wetterdienst.metadata.unit import OriginUnit, SIUnit, UnitEnum
+ from wetterdienst.provider.imgw.util import _try_download_file
  from wetterdienst.util.cache import CacheExpiry
  from wetterdienst.util.geo import convert_dms_string_to_dd
  from wetterdienst.util.network import download_file, list_remote_files_fsspec
@@ -362,14 +363,16 @@ def _collect_station_parameter(self, station_id: str, parameter: Enum, dataset:
          :param dataset:
          :return:
          """
-         urls = self._get_urls(dataset)
+         urls = self._generate_urls(dataset)
          with ThreadPoolExecutor() as p:
              files_in_bytes = p.map(
-                 lambda file: download_file(url=file, settings=self.sr.settings, ttl=CacheExpiry.FIVE_MINUTES), urls
+                 lambda file: _try_download_file(url=file, settings=self.sr.settings, ttl=CacheExpiry.FIVE_MINUTES), urls
              )
          data = []
          file_schema = self._file_schema[self.sr.resolution.name.lower()][dataset.name.lower()]
          for file_in_bytes in files_in_bytes:
+             if not file_in_bytes:
+                 continue
              df = self._parse_file(file_in_bytes, station_id, file_schema)
              if not df.is_empty():
                  data.append(df)
@@ -435,7 +438,7 @@ def __parse_file(self, file: bytes, station_id: str, schema: dict) -> pl.DataFrame:
          df = df.melt(id_vars=["station_id", "date"], variable_name="parameter", value_name="value")
          return df.with_columns(pl.col("value").cast(pl.Float64))

-     def _get_urls(self, dataset: Enum) -> pl.Series:
+     def _generate_urls(self, dataset: Enum) -> pl.Series:
          """Get file urls from server

          :param dataset: dataset for which the filelist is retrieved
13 changes: 13 additions & 0 deletions wetterdienst/provider/imgw/util.py
@@ -0,0 +1,13 @@
+ from io import BytesIO
+ from typing import Optional
+
+ from wetterdienst import Settings
+ from wetterdienst.util.cache import CacheExpiry
+ from wetterdienst.util.network import _download_file
+
+
+ def _try_download_file(url: str, settings: Settings, ttl: CacheExpiry) -> Optional[BytesIO]:
+     try:
+         return _download_file(url=url, settings=settings, ttl=ttl)
+     except Exception:
+         return None
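Since the URLs are generated rather than discovered, some of them will point at files the server never published; `_try_download_file` turns exactly that case into `None`. A hedged usage sketch: the URL is a placeholder, and constructing `Settings()` with defaults is assumed here:

```python
# Hypothetical call site for _try_download_file; the URL is a placeholder.
from wetterdienst import Settings
from wetterdienst.provider.imgw.util import _try_download_file
from wetterdienst.util.cache import CacheExpiry

buffer = _try_download_file(
    url="https://example.com/codz_1951_01.zip",
    settings=Settings(),
    ttl=CacheExpiry.FIVE_MINUTES,
)
if buffer is None:
    print("not on the server, skipping")  # the caller treats this as "no data"
```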
8 changes: 8 additions & 0 deletions wetterdienst/util/network.py
@@ -81,11 +81,19 @@ def list_remote_files_fsspec(url: str, settings: Settings, ttl: CacheExpiry = Ca
+ @stamina.retry(on=Exception, attempts=3)
  def download_file(
      url: str, settings: Settings, ttl: Optional[Union[int, CacheExpiry]] = CacheExpiry.NO_CACHE
+ ) -> BytesIO:
+     """A version of _download_file that retries on failure."""
+     return _download_file(url=url, settings=settings, ttl=ttl)
+
+
+ def _download_file(
+     url: str, settings: Settings, ttl: Optional[Union[int, CacheExpiry]] = CacheExpiry.NO_CACHE
  ) -> BytesIO:
      """
      A function used to download a specified file from the server.

      :param url: The url to the file on the dwd server
      :param settings: The settings object.
      :param ttl: How long the resource should be cached.

      :returns: Bytes of the file.
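Splitting `download_file` into a retrying facade over a single-attempt `_download_file` is the other half of the speed story: `stamina.retry` re-attempts transient failures up to three times, while IMGW's `_try_download_file` calls `_download_file` directly, so a generated URL that is genuinely absent fails once instead of three times. A self-contained sketch of the same wrapper pattern; `fetch` and `fetch_once` are stand-ins, not wetterdienst code:

```python
# Retrying facade over a single-attempt helper, mirroring the split above.
import stamina

attempts_seen = {"count": 0}


def fetch_once(url: str) -> bytes:
    """One attempt; fails the first time to simulate a transient error."""
    attempts_seen["count"] += 1
    if attempts_seen["count"] < 2:
        raise ConnectionError(f"transient error fetching {url}")
    return b"payload"


@stamina.retry(on=ConnectionError, attempts=3)
def fetch(url: str) -> bytes:
    """Retry up to three attempts, like download_file above."""
    return fetch_once(url)


assert fetch("https://example.com/file.zip") == b"payload"  # succeeds on attempt 2
```

Expected failures, such as probing for files that were never published, bypass the facade entirely, which is exactly what `_try_download_file` does with `_download_file`.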
