Skip to content

Commit

Permalink
Make more parameters summarizable/interpolatable and other small fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
gutzbenj committed Sep 17, 2023
1 parent 110ad1a commit a00f295
Show file tree
Hide file tree
Showing 8 changed files with 64 additions and 41 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ Changelog
Development
***********

- Make parameters TEMPERATURE_AIR_MAX_200 and TEMPERATURE_AIR_MIN_200 summarizable/interpolatable

0.60.0 (16.09.2023)
*******************

Expand Down
15 changes: 10 additions & 5 deletions benchmarks/interpolation_over_time.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import matplotlib.pyplot as plt
import numpy as np
import polars as pl
from sklearn.feature_selection import r_regression
from sklearn.metrics import mean_squared_error

from wetterdienst import Parameter
from wetterdienst.provider.dwd.observation import (
Expand Down Expand Up @@ -37,13 +39,16 @@ def get_regular_df(parameter: str, start_date: datetime, end_date: datetime, exc
return df.filter(pl.col("station_id").eq(first_station_id))


def get_rmse(regular_values: pl.Series, interpolated_values: pl.Series):
n = regular_values.len()
return (((regular_values - interpolated_values).drop_nulls() ** 2).sum() / n) ** 0.5
def get_rmse(regular_values: pl.Series, interpolated_values: pl.Series) -> float:
return mean_squared_error(
regular_values.reshape((-1, 1)).to_list(), interpolated_values.reshape((-1, 1)).to_list(), squared=False
)


def get_corr(regular_values: pl.Series, interpolated_values: pl.Series):
return np.corrcoef(regular_values.to_list(), interpolated_values.to_list())[0][1].item()
def get_corr(regular_values: pl.Series, interpolated_values: pl.Series) -> float:
return r_regression(
regular_values.reshape((-1, 1)).to_list(), interpolated_values.reshape((-1, 1)).to_list()
).item()


def visualize(parameter: str, unit: str, regular_df: pl.DataFrame, interpolated_df: pl.DataFrame):
Expand Down
6 changes: 4 additions & 2 deletions benchmarks/interpolation_precipitation_difference.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
DwdObservationResolution,
)

LATLON = (52.52, 13.40)


def get_interpolated_df(start_date: datetime, end_date: datetime) -> pl.DataFrame:
stations = DwdObservationRequest(
Expand All @@ -16,7 +18,7 @@ def get_interpolated_df(start_date: datetime, end_date: datetime) -> pl.DataFram
start_date=start_date,
end_date=end_date,
)
return stations.interpolate(latlon=(50.0, 8.9)).df
return stations.interpolate(latlon=LATLON).df


def get_regular_df(start_date: datetime, end_date: datetime, exclude_stations: list) -> pl.DataFrame:
Expand All @@ -26,7 +28,7 @@ def get_regular_df(start_date: datetime, end_date: datetime, exclude_stations: l
start_date=start_date,
end_date=end_date,
)
request = stations.filter_by_distance(latlon=(50.0, 8.9), distance=30)
request = stations.filter_by_distance(latlon=LATLON, distance=30)
df = request.values.all().df.drop_nulls()
station_ids = df.get_column("station_id")
first_station_id = set(station_ids).difference(set(exclude_stations)).pop()
Expand Down
16 changes: 9 additions & 7 deletions benchmarks/summary_over_time.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,10 @@ def main():
lon = 13.8470

summarized_df = get_summarized_df(start_date, end_date, lat, lon)
summarized_df["color"] = summarized_df.station_id.map(
lambda x: {"01050": "yellow", "01048": "green", "01051": "blue", "05282": "violet"}.get(x)
summarized_df = summarized_df.with_columns(
pl.col("station_id")
.map_dict({"01050": "yellow", "01048": "green", "01051": "blue", "05282": "violet"})
.alias("color")
)

regular_df_01050 = get_regular_df(start_date, end_date, "01050")
Expand All @@ -51,11 +53,11 @@ def main():

fig, ax = plt.subplots(nrows=5, tight_layout=True, sharex=True)

summarized_df.plot("date", "value", c="color", label="summarized", kind="scatter", ax=ax[0], s=5)
regular_df_01050.plot("date", "value", color="yellow", label="01050", ax=ax[1])
regular_df_01051.plot("date", "value", color="blue", label="01051", ax=ax[2])
regular_df_01048.plot("date", "value", color="green", label="01048", ax=ax[3])
regular_df_05282.plot("date", "value", color="pink", label="05282", ax=ax[4])
summarized_df.to_pandas().plot("date", "value", c="color", label="summarized", kind="scatter", ax=ax[0], s=5)
regular_df_01050.to_pandas().plot("date", "value", color="yellow", label="01050", ax=ax[1])
regular_df_01051.to_pandas().plot("date", "value", color="blue", label="01051", ax=ax[2])
regular_df_01048.to_pandas().plot("date", "value", color="green", label="01048", ax=ax[3])
regular_df_05282.to_pandas().plot("date", "value", color="pink", label="05282", ax=ax[4])

ax[0].set_ylabel(None)

Expand Down
28 changes: 16 additions & 12 deletions wetterdienst/core/timeseries/interpolate.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
# Copyright (c) 2018-2022, earthobservations developers.
# Distributed under the MIT License. See LICENSE for more info.
import logging
from collections import defaultdict
from datetime import datetime
from enum import Enum
from functools import lru_cache
Expand All @@ -24,6 +25,17 @@

log = logging.getLogger(__name__)

INTERPOLATION_STATION_KM_LIMIT = defaultdict(
lambda: 40,
{
Parameter.TEMPERATURE_AIR_MEAN_200.name: 40,
Parameter.TEMPERATURE_AIR_MAX_200.name: 40,
Parameter.TEMPERATURE_AIR_MIN_200.name: 40,
Parameter.WIND_SPEED.name: 40,
Parameter.PRECIPITATION_HEIGHT.name: 20,
},
)


def get_interpolated_df(request: "TimeseriesRequest", latitude: float, longitude: float) -> pl.DataFrame:
utm_x, utm_y, _, _ = utm.from_latlon(latitude, longitude)
Expand All @@ -36,12 +48,11 @@ def request_stations(
) -> Tuple[dict, dict]:
param_dict = {}
stations_dict = {}
hard_distance_km_limit = 40
stations_ranked = request.filter_by_rank(latlon=(latitude, longitude), rank=20)
stations_ranked_df = stations_ranked.df.drop_nulls()

for station, result in zip(stations_ranked_df.iter_rows(named=True), stations_ranked.values.query()):
if station[Columns.DISTANCE.value] > hard_distance_km_limit:
if station[Columns.DISTANCE.value] > max(INTERPOLATION_STATION_KM_LIMIT.values()):
break

valid_station_groups_exists = not get_valid_station_groups(stations_dict, utm_x, utm_y).empty()
Expand All @@ -65,13 +76,7 @@ def apply_station_values_per_parameter(
param_dict: dict,
station: dict,
valid_station_groups_exists: bool,
):
km_limit = {
Parameter.TEMPERATURE_AIR_MEAN_200.name: 40,
Parameter.WIND_SPEED.name: 40,
Parameter.PRECIPITATION_HEIGHT.name: 20,
}

) -> None:
for parameter, dataset in stations_ranked.stations.parameter:
if parameter == dataset:
log.info("only individual parameters can be interpolated")
Expand All @@ -81,7 +86,7 @@ def apply_station_values_per_parameter(
log.info(f"parameter {parameter.name} can not be interpolated")
continue

if station["distance"] > km_limit[parameter.name]:
if station["distance"] > INTERPOLATION_STATION_KM_LIMIT[parameter.name]:
log.info(f"Station for parameter {parameter.name} is too far away")
continue

Expand Down Expand Up @@ -219,7 +224,6 @@ def apply_interpolation(
if valid_values:
first_station = list(valid_values.keys())[0]
return parameter, valid_values[first_station], stations_dict[first_station[1:]][2], [first_station[1:]]

vals = {s: v for s, v in row.items() if v is not None}
station_group_ids = get_station_group_ids(valid_station_groups, frozenset([s[1:] for s in vals.keys()]))

Expand All @@ -230,7 +234,7 @@ def apply_interpolation(
vals = None

if not vals or len(vals) < 4:
return parameter, None, None, station_group_ids
return parameter, None, None, []

xs, ys, distances = map(list, zip(*[stations_dict[station_id] for station_id in station_group_ids]))
distance_mean = sum(distances) / len(distances)
Expand Down
2 changes: 2 additions & 0 deletions wetterdienst/core/timeseries/request.py
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,8 @@ def _values(self):
# - homogeneous parameters such as temperature_air_200
interpolatable_parameters = [
Parameter.TEMPERATURE_AIR_MEAN_200.name,
Parameter.TEMPERATURE_AIR_MAX_200.name,
Parameter.TEMPERATURE_AIR_MIN_200.name,
Parameter.WIND_SPEED.name,
Parameter.PRECIPITATION_HEIGHT.name,
]
Expand Down
34 changes: 20 additions & 14 deletions wetterdienst/core/timeseries/summarize.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import logging
from collections import defaultdict
from datetime import datetime
from typing import Tuple
from enum import Enum
from typing import Optional, Tuple

import polars as pl

Expand All @@ -12,6 +14,17 @@

log = logging.getLogger(__name__)

SUMMARY_STATION_KM_LIMIT = defaultdict(
lambda: 40,
{
Parameter.TEMPERATURE_AIR_MEAN_200.name: 40,
Parameter.TEMPERATURE_AIR_MAX_200.name: 40,
Parameter.TEMPERATURE_AIR_MIN_200.name: 40,
Parameter.WIND_SPEED.name: 40,
Parameter.PRECIPITATION_HEIGHT.name: 20,
},
)


def get_summarized_df(request: "TimeseriesRequest", latitude: float, longitude: float) -> pl.DataFrame:
stations_dict, param_dict = request_stations(request, latitude, longitude)
Expand All @@ -21,13 +34,12 @@ def get_summarized_df(request: "TimeseriesRequest", latitude: float, longitude:
def request_stations(request: "TimeseriesRequest", latitude: float, longitude: float) -> Tuple[dict, dict]:
param_dict = {}
stations_dict = {}
hard_distance_km_limit = 40

stations_ranked = request.filter_by_rank(latlon=(latitude, longitude), rank=20)
stations_ranked_df = stations_ranked.df.drop_nulls()

for station, result in zip(stations_ranked_df.iter_rows(named=True), stations_ranked.values.query()):
if station[Columns.DISTANCE.value] > hard_distance_km_limit:
if station[Columns.DISTANCE.value] > max(SUMMARY_STATION_KM_LIMIT.values()):
break

# check if all parameters found enough stations and the stations build a valid station group
Expand All @@ -48,13 +60,7 @@ def apply_station_values_per_parameter(
stations_ranked: "StationsResult",
param_dict: dict,
station: dict,
):
km_limit = {
Parameter.TEMPERATURE_AIR_MEAN_200.name: 40,
Parameter.WIND_SPEED.name: 40,
Parameter.PRECIPITATION_HEIGHT.name: 20,
}

) -> None:
for parameter, dataset in stations_ranked.stations.parameter:
if parameter == dataset:
log.info("only individual parameters can be interpolated")
Expand All @@ -64,7 +70,7 @@ def apply_station_values_per_parameter(
log.info(f"parameter {parameter.name} can not be interpolated")
continue

if station["distance"] > km_limit[parameter.name]:
if station["distance"] > SUMMARY_STATION_KM_LIMIT[parameter.name]:
log.info(f"Station for parameter {parameter.name} is too far away")
continue

Expand Down Expand Up @@ -142,12 +148,12 @@ def calculate_summary(stations_dict: dict, param_dict: dict) -> pl.DataFrame:
def apply_summary(
row: dict,
stations_dict: dict,
parameter,
):
parameter: Enum,
) -> Tuple[Enum, Optional[float], Optional[float], Optional[str]]:
vals = {s: v for s, v in row.items() if v is not None}

if not vals:
return None, None, None, None
return parameter, None, None, None

value = list(vals.values())[0]
station_id = list(vals.keys())[0][1:]
Expand Down
2 changes: 1 addition & 1 deletion wetterdienst/core/timeseries/tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ def __init__(self, values: pl.DataFrame, station_ids: List[str] = None, extra_st

def extract_station_values(
param_data: _ParameterData, result_series_param: pl.Series, valid_station_groups_exists: bool
):
) -> None:
# Three rules:
# 1. only add further stations if not a minimum of 4 stations is reached OR
# 2. a gain of 10% of timestamps with at least 4 existing values over all stations is seen OR
Expand Down

0 comments on commit a00f295

Please sign in to comment.