Increase Era5 Performance

This commit is contained in:
Tobias Hölzer 2025-10-28 20:44:00 +01:00
parent eeab8fff1e
commit ad3d7aae73
6 changed files with 352 additions and 264 deletions

3
.gitignore vendored
View file

@ -9,8 +9,9 @@ wheels/
# Virtual environments
.venv
# Data
# Data & figures
data
figures
# Editors
.vscode/

View file

@ -23,6 +23,7 @@ dependencies = [
"geopandas>=1.1.0",
"h3>=4.2.2",
"h5netcdf>=1.6.4",
"ipycytoscape>=1.3.3",
"ipykernel>=6.29.5",
"ipywidgets>=8.1.7",
"mapclassify>=2.9.0",
@ -34,6 +35,7 @@ dependencies = [
"odc-geo[all]>=0.4.10",
"opt-einsum>=3.4.0",
"pyarrow>=20.0.0",
"rechunker>=0.5.2",
"requests>=2.32.3",
"rich>=14.0.0",
"rioxarray>=0.19.0",
@ -41,6 +43,7 @@ dependencies = [
"seaborn>=0.13.2",
"smart-geocubes[gee,dask,stac,viz]>=0.0.9",
"stopuhr>=0.0.10",
"ultraplot>=1.63.0",
"xanimate",
"xarray>=2025.9.0",
"xdggs>=0.2.1",

View file

@ -4,7 +4,9 @@ import xarray as xr
from zarr.codecs import BloscCodec
def from_ds(ds: xr.Dataset, store_floats_as_float32: bool = True, include_coords: bool = True) -> dict:
def from_ds(
ds: xr.Dataset, store_floats_as_float32: bool = True, include_coords: bool = True, filter_existing: bool = True
) -> dict:
"""Create compression encoding for zarr dataset storage.
Creates Blosc compression configuration for all data variables and coordinates
@ -17,12 +19,17 @@ def from_ds(ds: xr.Dataset, store_floats_as_float32: bool = True, include_coords
include_coords (bool, optional): Whether to include coordinates in the encoding.
This is useful when appending to an existing store.
Defaults to True.
filter_existing (bool, optional): Whether to filter out variables that already have
encoding defined. Useful when appending to existing stores. Defaults to True.
Returns:
dict: Encoding dictionary with compression settings for each variable.
"""
var_names = [*ds.data_vars, *ds.coords] if include_coords else ds.data_vars
# Filter out variables which already have encoding (e.g., when appending to existing store)
if filter_existing:
var_names = [var for var in var_names if not ds[var].encoding]
encoding = {var: {"compressors": BloscCodec(cname="zstd", clevel=5)} for var in var_names}
if store_floats_as_float32:
for var in ds.data_vars:

View file

@ -1,4 +1,4 @@
# ruff: noqa: PD011
# ruff: noqa: PD003, PD011
"""Download and preprocess ERA5 data.
Variables of Interest:
@ -85,14 +85,16 @@ import odc.geo.xr
import pandas as pd
import shapely
import shapely.ops
import ultraplot as uplt
import xarray as xr
import xdggs
from rich import pretty, print, traceback
from rich.progress import track
from shapely.geometry import LineString, Polygon
from stopuhr import stopwatch
from entropice import codecs, grids, watermask
from entropice.paths import get_era5_stores
from entropice.paths import FIGURES_DIR, get_era5_stores
traceback.install(show_locals=True, suppress=[cyclopts, xr, pd, cProfile])
pretty.install()
@ -207,6 +209,7 @@ def download_daily_aggregated():
daily_raw = daily_raw.odc.assign_crs("epsg:4326")
daily_raw = daily_raw.drop_vars(["surface", "number", "depthBelowLandLayer"])
daily_raw.attrs = era5.attrs
daily_store = get_era5_stores("daily")
print(f"Saving downloaded and daily aggregated ERA5 data to {daily_store}.")
daily_raw.to_zarr(daily_store, mode="w", encoding=codecs.from_ds(daily_raw), consolidated=False)
@ -253,6 +256,8 @@ def daily_enrich():
daily_store = get_era5_stores("daily")
daily = xr.open_zarr(daily_store, consolidated=False).set_coords("spatial_ref")
assert "time" in daily.dims, f"Expected dim 'time' to be in {daily.dims=}"
# Keep track of stored variables for re-calculation and encodings
stored_vars = list(daily.data_vars)
# For better dask performance, all variables are written immediately after calculation
# The smart scheduling which could performantly handle more is according to docs not yet implemented.
@ -260,10 +265,11 @@ def daily_enrich():
def _store(v: str):
nonlocal daily
encoding = codecs.from_ds(daily[[v]], include_coords=False)
encoding = codecs.from_ds(daily[[v]], include_coords=False) if v not in stored_vars else None
print(f"Storing enriched daily variable {v} to {daily_store}...")
with stopwatch("Storing enriched daily variable"):
daily[[v]].to_zarr(daily_store, mode="a", encoding=encoding, consolidated=False)
stored_vars.append(v)
daily = xr.open_zarr(daily_store, consolidated=False).set_coords("spatial_ref")
# Formulas based on Groeke et. al. (2025) Stochastic Weather generation...
@ -275,9 +281,10 @@ def daily_enrich():
daily.t2m_range.attrs = {"long_name": "Daily range of 2 metre temperature", "units": "K"}
_store("t2m_range")
daily["t2m_skew"] = (daily.t2m_mean - daily.t2m_min) / daily.t2m_range
daily.t2m_skew.attrs = {"long_name": "Daily skewness of 2 metre temperature"}
_store("t2m_skew")
with np.errstate(invalid="ignore"):
daily["t2m_skew"] = (daily.t2m_mean - daily.t2m_min) / daily.t2m_range
daily.t2m_skew.attrs = {"long_name": "Daily skewness of 2 metre temperature"}
_store("t2m_skew")
daily["thawing_degree_days"] = (daily.t2m_avg - 273.15).clip(min=0)
daily.thawing_degree_days.attrs = {"long_name": "Thawing degree days", "units": "K"}
@ -287,19 +294,19 @@ def daily_enrich():
daily.freezing_degree_days.attrs = {"long_name": "Freezing degree days", "units": "K"}
_store("freezing_degree_days")
daily["thawing_days"] = (daily.t2m_avg > 273.15).astype(int)
daily["thawing_days"] = (daily.t2m_avg > 273.15).where(~daily.t2m_avg.isnull())
daily.thawing_days.attrs = {"long_name": "Thawing days"}
_store("thawing_days")
daily["freezing_days"] = (daily.t2m_avg < 273.15).astype(int)
daily["freezing_days"] = (daily.t2m_avg < 273.15).where(~daily.t2m_avg.isnull())
daily.freezing_days.attrs = {"long_name": "Freezing days"}
_store("freezing_days")
daily["precipitation_occurrences"] = (daily.tp > 0.001).astype(int)
daily["precipitation_occurrences"] = (daily.tp > 0.001).where(~daily.tp.isnull())
daily.precipitation_occurrences.attrs = {"long_name": "Precipitation occurrences"}
_store("precipitation_occurrences")
daily["snowfall_occurrences"] = (daily.sf > 0.001).astype(int)
daily["snowfall_occurrences"] = (daily.sf > 0.001).where(~daily.sf.isnull())
daily.snowfall_occurrences.attrs = {"long_name": "Snowfall occurrences"}
_store("snowfall_occurrences")
@ -308,6 +315,14 @@ def daily_enrich():
_store("naive_snow_isolation")
def _load_daily():
daily_store = get_era5_stores("daily")
daily = xr.open_zarr(daily_store, consolidated=False, chunks={"latitude": -1, "longitude": 640})
daily = daily.set_coords("spatial_ref")
assert "time" in daily.dims, f"Expected dim 'time' to be in {daily.dims=}"
return daily
def monthly_aggregate():
"""Aggregate enriched daily ERA5 data to monthly resolution.
@ -319,230 +334,206 @@ def monthly_aggregate():
The aggregated monthly data is saved to a zarr file for further processing.
"""
daily_store = get_era5_stores("daily")
daily = xr.open_zarr(daily_store, consolidated=False).set_coords("spatial_ref")
assert "time" in daily.dims, f"Expected dim 'time' to be in {daily.dims=}"
daily = _load_daily()
daily = daily.sel(time=slice(min_time, max_time))
# Monthly instant aggregates
monthly_instants = []
for var in instants:
if var.endswith("_min"):
agg = daily[var].resample(time="1ME").min().rename(var)
agg.attrs = daily[var].attrs
agg.attrs["long_name"] = f"Monthly minimum of {daily[var].attrs.get('long_name', var)}"
monthly_instants.append(agg)
elif var.endswith("_max"):
agg = daily[var].resample(time="1ME").max().rename(var)
agg.attrs = daily[var].attrs
agg.attrs["long_name"] = f"Monthly maximum of {daily[var].attrs.get('long_name', var)}"
monthly_instants.append(agg)
else:
agg = daily[var].resample(time="1ME").median().rename(var)
agg.attrs = daily[var].attrs
agg.attrs["long_name"] = f"Monthly median of {daily[var].attrs.get('long_name', var)}"
monthly_instants.append(agg)
monthly = (
xr.Dataset(
{},
coords={
"time": daily.time.resample(time="1MS").last(),
"latitude": daily.latitude,
"longitude": daily.longitude,
},
attrs=daily.attrs,
)
.odc.assign_crs("epsg:4326")
.chunk({"time": 12, "latitude": -1, "longitude": 640})
)
monthly_accums = []
for var in accums:
agg = daily[var].resample(time="1ME").sum().rename(var)
agg.attrs = daily[var].attrs
monthly_accums.append(agg)
monthly = xr.merge(monthly_instants + monthly_accums)
monthly = monthly.chunk({"time": len(monthly.time), "latitude": 256, "longitude": 256}) # ~ 100Mb / chunk for f32
monthly_store = get_era5_stores("monthly")
print(f"Saving monthly aggregated ERA5 data to {monthly_store}.")
print(f"Saving empty monthly ERA5 data to {monthly_store}.")
monthly.to_zarr(monthly_store, mode="w", encoding=codecs.from_ds(monthly), consolidated=False)
# Monthly instant aggregates
for var in instants | accums:
if var in accums:
monthly[var] = daily[var].resample(time="1MS").sum(skipna=False).rename(var)
elif var.endswith("_min"):
monthly[var] = daily[var].resample(time="1MS").min().rename(var)
elif var.endswith("_max"):
monthly[var] = daily[var].resample(time="1MS").max().rename(var)
else:
monthly[var] = daily[var].resample(time="1MS").median().rename(var)
def multi_monthly_aggregate(monthly: xr.Dataset, n: int = 12) -> xr.Dataset:
monthly[var].attrs = daily[var].attrs
if var in daily or var.endswith(("_min", "_max")):
monthly[var].attrs["long_name"] = monthly[var].attrs.get("long_name", "").replace("Daily", "Monthly")
else:
monthly[var].attrs["long_name"] = f"Monthly median of {monthly[var].attrs.get('long_name', var)}"
# Similar storing logic as in daily
encoding = codecs.from_ds(monthly[[var]], include_coords=False)
print(f"Storing enriched monthly variable {var} to {monthly_store}...")
with stopwatch("Storing enriched monthly variable"):
monthly[[var]].chunk({"time": 12, "latitude": -1, "longitude": 640}).to_zarr(
monthly_store, mode="a", encoding=encoding, consolidated=False
)
monthly = xr.open_zarr(monthly_store, consolidated=False).set_coords("spatial_ref")
def multi_monthly_aggregate(agg: Literal["yearly", "seasonal", "shoulder"] = "yearly"):
"""Aggregate monthly ERA5 data to a multi-month resolution.
Takes monthly aggregated data and creates multi-month aggregates using a shifted
calendar (October to September) to better capture Arctic seasonal patterns.
Args:
monthly (xr.Dataset): The monthly aggregates
n (int, optional): Number of months to aggregate over. Defaults to 12.
agg (Literal["yearly", "seasonal", "shoulder"], optional): The type of multi-month aggregation.
Returns:
xr.Dataset: The aggregated dataset
"""
# Instants
multimonthly_instants = []
for var in instants:
if var.endswith("_min"):
agg = monthly[var].resample(time=f"{n}MS", label="right").min().rename(var)
agg.attrs = monthly[var].attrs
agg.attrs["long_name"] = f"{n}-Monthly minimum of {monthly[var].attrs.get('long_name', var)}"
multimonthly_instants.append(agg)
n = {"yearly": 12, "seasonal": 6, "shoulder": 3}[agg]
daily = _load_daily()
# "Shift" the calendar by slicing the first Jan-Sep and the last Oct-Dec months
first_year = daily.time.dt.year.min().item()
last_year = daily.time.dt.year.max().item()
daily = daily.sel(time=slice(f"{first_year}-10-01", f"{last_year}-09-30"))
chunks = {"time": 3, "latitude": -1, "longitude": 640}
multimonthly = (
xr.Dataset(
{},
coords={
"time": daily.resample(time=f"{n}MS").last().time,
"latitude": daily.latitude,
"longitude": daily.longitude,
},
attrs=daily.attrs,
)
.odc.assign_crs("epsg:4326")
.chunk(chunks)
)
multimonthly_store = get_era5_stores(agg)
print(f"Saving empty multi-monthly ERA5 data to {multimonthly_store}.")
multimonthly.to_zarr(multimonthly_store, mode="w", encoding=codecs.from_ds(multimonthly), consolidated=False)
def _store(var):
nonlocal multimonthly
# Similar storing logic as in daily
encoding = codecs.from_ds(multimonthly[[var]], include_coords=False)
print(f"Storing enriched multi-monthly variable {var} to {multimonthly_store}...")
with stopwatch("Storing enriched multi-monthly variable"):
multimonthly[[var]].chunk(chunks).to_zarr(
multimonthly_store, mode="a", encoding=encoding, consolidated=False
)
multimonthly = xr.open_zarr(multimonthly_store, consolidated=False).set_coords("spatial_ref")
# Multi-Monthly instant aggregates
for var in instants | accums:
if var in accums:
multimonthly[var] = daily[var].resample(time=f"{n}MS").sum(skipna=False).rename(var)
elif var.endswith("_min"):
multimonthly[var] = daily[var].resample(time=f"{n}MS").min().rename(var)
elif var.endswith("_max"):
agg = monthly[var].resample(time=f"{n}MS", label="right").max().rename(var)
agg.attrs = monthly[var].attrs
agg.attrs["long_name"] = f"{n}-Monthly maximum of {monthly[var].attrs.get('long_name', var)}"
multimonthly_instants.append(agg)
multimonthly[var] = daily[var].resample(time=f"{n}MS").max().rename(var)
else:
agg = monthly[var].resample(time=f"{n}MS", label="right").median().rename(var)
agg.attrs = monthly[var].attrs
agg.attrs["long_name"] = f"{n}-Monthly median of {monthly[var].attrs.get('long_name', var)}"
multimonthly_instants.append(agg)
multimonthly[var] = daily[var].resample(time=f"{n}MS").median().rename(var)
# Accums
multimonthly_accums = []
for var in accums:
agg = monthly[var].resample(time=f"{n}MS", label="right").sum().rename(var)
agg.attrs = monthly[var].attrs
multimonthly_accums.append(agg)
multimonthly = xr.merge(multimonthly_instants + multimonthly_accums)
multimonthly[var].attrs = daily[var].attrs
if var in accums or var.endswith(("_min", "_max")):
multimonthly[var].attrs["long_name"] = (
multimonthly[var].attrs.get("long_name", "").replace("Daily", agg.capitalize())
)
else:
multimonthly[var].attrs["long_name"] = (
f"{agg.capitalize()} median of {multimonthly[var].attrs.get('long_name', var)}"
)
_store(var)
# Effective snow depth
m = np.resize(np.arange(1, n + 1), len(monthly.time))
m = xr.DataArray(m, coords={"time": monthly.time}, dims=["time"])
n_sum = n * (n + 1) // 2
multimonthly["effective_snow_depth"] = (monthly.sde_mean * (n + 1 - m)).resample(time=f"{n}MS").sum().rename(
"effective_snow_depth"
) / n_sum
m = (
daily.time.resample(time=f"{n}MS")
.map(lambda x: np.arange(1, x.count() + 1))
.assign_coords({"time": daily.time})
)
nd = (
daily.time.resample(time=f"{n}MS")
.map(lambda x: np.ones(x.count().item()) * x.count().item())
.assign_coords({"time": daily.time})
)
n_sum = (nd * (nd + 1) // 2).resample(time=f"{n}MS").sum()
efd = (daily.sde_mean * (nd + 1 - m)).resample(time=f"{n}MS").sum(skipna=False) / n_sum
multimonthly["effective_snow_depth"] = efd.rename("effective_snow_depth")
multimonthly["effective_snow_depth"].attrs = {
"long_name": "Effective Snow Density",
"reference": "Slater et. al. (2017)",
"link": "https://tc.copernicus.org/articles/11/989/2017/tc-11-989-2017.pdf",
}
multimonthly = multimonthly.chunk(
{"time": len(multimonthly.time), "latitude": 128, "longitude": 1024}
) # ~36Mb / chunk for f64
return multimonthly
_store("effective_snow_depth")
def derive_yearly_variables(yearly: xr.Dataset) -> xr.Dataset:
"""Derive additional variables from daily data and add them to the yearly dataset.
Args:
yearly (xr.Dataset): The yearly aggregated dataset to enrich.
Returns:
xr.Dataset: The enriched yearly dataset with additional derived variables.
"""
assert "time" in yearly.dims, f"Expected dim 'time' to be in {yearly.dims=}"
daily_store = get_era5_stores("daily")
daily = xr.open_zarr(daily_store, consolidated=False).set_coords("spatial_ref")
assert "time" in daily.dims, f"Expected dim 'time' to be in {daily.dims=}"
daily = daily.sel(time=slice(min_time, max_time))
# ? Note: The functions do not really account for leap years
# n_days_in_year = daily.time.groupby("time.year").count().rename("n_days_in_year")
n_days_in_year = 365
# A mask to check which places never thaws
# Persist in memory because we need it twice and this dramatically reduces the Dask-Graph size
never_thaws = (daily.thawing_days.groupby("time.year").sum(dim="time") == 0).compute()
# ? First and last thaw day is NOT calculated within the october-september year, but within the calendar year
# This results in a much more correct representation of thawing periods in regions where the last thawing day
# is between october and december.
# This assumes that the 01-01 is almost everywhere one of the coldest days in the year
first_thaw_day = daily.thawing_days.groupby("time.year").apply(lambda x: x.argmax(dim="time")) + 1
first_thaw_day = first_thaw_day.where(~never_thaws).rename("day_of_first_thaw").rename(year="time")
first_thaw_day["time"] = pd.to_datetime([f"{y}-10-01" for y in first_thaw_day.time.values])
first_thaw_day.attrs = {"long_name": "Day of first thaw in year", "units": "day of year"}
yearly["day_of_first_thaw"] = first_thaw_day.sel(time=yearly.time)
last_thaw_day = (
n_days_in_year - daily.thawing_days[::-1].groupby("time.year").apply(lambda x: x.argmax(dim="time")) + 1
)
last_thaw_day = last_thaw_day.where(~never_thaws).rename("day_of_last_thaw").rename(year="time")
last_thaw_day["time"] = pd.to_datetime([f"{y}-10-01" for y in last_thaw_day.time.values])
last_thaw_day.attrs = {"long_name": "Day of last thaw in year", "units": "day of year"}
yearly["day_of_last_thaw"] = last_thaw_day.sel(time=yearly.time)
yearly["thawing_period_length"] = (yearly.day_of_last_thaw - yearly.day_of_first_thaw).rename(
"thawing_period_length"
)
yearly.thawing_period_length.attrs = {"long_name": "Thawing period length in year", "units": "days"}
# ? First and last freeze day is NOT calculated within the october-september year, but within an july-june year
# This results, similar to the thawing days, in a much more correct representation of freezing periods in regions
# where the first freezing day is between july and september.
# This assumes that the 01-07 is almost everywhere one of the warmest days in the year
daily_shifted = daily.copy()
daily_shifted["time"] = pd.to_datetime(daily_shifted.time.values) + pd.DateOffset(months=6)
# A mask to check which places never freeze
# Persist in memory because we need it twice and this dramatically reduces the Dask-Graph size
never_freezes = (daily_shifted.freezing_days.groupby("time.year").sum(dim="time") == 0).compute()
first_freezing_day = daily_shifted.freezing_days.groupby("time.year").apply(lambda x: x.argmax(dim="time")) + 1
first_freezing_day = first_freezing_day.where(~never_freezes).rename("day_of_first_freeze").rename(year="time")
first_freezing_day["time"] = pd.to_datetime([f"{y}-10-01" for y in first_freezing_day.time.values])
first_freezing_day.attrs = {"long_name": "Day of first freeze in year", "units": "day of year"}
yearly["day_of_first_freeze"] = first_freezing_day.sel(time=yearly.time)
last_freezing_day = (
n_days_in_year
- daily_shifted.freezing_days[::-1].groupby("time.year").apply(lambda x: x.argmax(dim="time"))
+ 1
)
last_freezing_day = last_freezing_day.where(~never_freezes).rename("day_of_last_freeze").rename(year="time")
last_freezing_day["time"] = pd.to_datetime([f"{y}-10-01" for y in last_freezing_day.time.values])
last_freezing_day.attrs = {"long_name": "Day of last freeze in year", "units": "day of year"}
yearly["day_of_last_freeze"] = last_freezing_day.sel(time=yearly.time)
yearly["freezing_period_length"] = (yearly.day_of_last_freeze - yearly.day_of_first_freeze).rename(
"freezing_period_length"
)
yearly.freezing_period_length.attrs = {"long_name": "Freezing period length in year", "units": "days"}
return yearly
def _get_first_day(x):
return x.argmax(dim="time") + 1
def yearly_and_seasonal_aggregate():
"""Aggregate monthly ERA5 data to yearly resolution with seasonal splits.
Takes monthly aggregated data and creates yearly aggregates using a shifted
calendar (October to September) to better capture Arctic seasonal patterns.
Creates separate aggregates for full year, winter (Oct-Apr), and summer
(May-Sep) periods.
The first and last incomplete years are excluded from the analysis.
Winter months are defined as months 1-7 in the shifted calendar,
and summer months are 8-12.
The final dataset includes yearly, winter, and summer aggregates for all
climate variables, saved to a zarr file.
"""
monthly_store = get_era5_stores("monthly")
monthly = xr.open_zarr(monthly_store, consolidated=False).set_coords("spatial_ref")
assert "time" in monthly.dims, f"Expected dim 'time' to be in {monthly.dims=}"
# "Shift" the calendar by slicing the first Jan-Sep and the last Oct-Dec months
first_year = monthly.time.dt.year.min().item()
last_year = monthly.time.dt.year.max().item()
monthly = monthly.sel(time=slice(f"{first_year}-10-01", f"{last_year}-09-30"))
yearly = multi_monthly_aggregate(monthly, n=12)
yearly = derive_yearly_variables(yearly)
yearly = yearly.chunk({"time": len(yearly.time), "latitude": 256, "longitude": 1024}) # ~36Mb / chunk for f32
def _store_in_yearly(da, var):
yearly_store = get_era5_stores("yearly")
print(f"Saving yearly aggregated ERA5 data to {yearly_store}.")
yearly.to_zarr(yearly_store, mode="w", encoding=codecs.from_ds(yearly), consolidated=False)
yearly = xr.open_zarr(yearly_store, consolidated=False).set_coords("spatial_ref")
assert "time" in yearly.dims, f"Expected dim 'time' to be in {yearly.dims=}"
summer_winter = multi_monthly_aggregate(monthly, n=6)
da["time"] = yearly.time
yearly[var] = da.rename(var)
summer = summer_winter.sel(time=summer_winter.time.dt.month == 4)
summer = summer.chunk({"time": len(summer.time), "latitude": 256, "longitude": 1024}) # ~36Mb / chunk for f32
summer_store = get_era5_stores("summer")
print(f"Saving summer aggregated ERA5 data to {summer_store}.")
summer.to_zarr(summer_store, mode="w", encoding=codecs.from_ds(summer), consolidated=False)
# Similar storing logic as in daily
encoding = codecs.from_ds(yearly[[var]], include_coords=False)
chunks = {"time": 20, "latitude": -1, "longitude": 640}
print(f"Storing enriched yearly variable {var} to {yearly_store}...")
with stopwatch("Storing enriched yearly variable"):
yearly[[var]].chunk(chunks).to_zarr(yearly_store, mode="a", encoding=encoding, consolidated=False)
winter = summer_winter.sel(time=summer_winter.time.dt.month == 10)
winter = winter.chunk({"time": len(winter.time), "latitude": 256, "longitude": 1024}) # ~36Mb / chunk for f32
winter_store = get_era5_stores("winter")
print(f"Saving winter aggregated ERA5 data to {winter_store}.")
winter.to_zarr(winter_store, mode="w", encoding=codecs.from_ds(winter), consolidated=False)
def yearly_thaw_periods():
daily = _load_daily()
first_year = daily.time.dt.year.min().item() + 1
last_year = daily.time.dt.year.max().item()
daily = daily.sel(time=slice(f"{first_year}-01-01", f"{last_year}-12-31"))
never_thaws = (daily.thawing_days.resample(time="12MS").sum(dim="time") == 0).persist()
first_thaw_day = daily.thawing_days.resample(time="12MS").map(_get_first_day).where(~never_thaws)
first_thaw_day.attrs = {"long_name": "Day of first thaw in year", "units": "day of year"}
_store_in_yearly(first_thaw_day, "day_of_first_thaw")
n_days_in_year = xr.where(366, 365, daily.time[::-1].dt.is_leap_year).resample(time="12MS").first()
last_thaw_day = n_days_in_year - daily.thawing_days[::-1].resample(time="12MS").map(_get_first_day).where(
~never_thaws
)
last_thaw_day.attrs = {"long_name": "Day of last thaw in year", "units": "day of year"}
_store_in_yearly(last_thaw_day, "day_of_last_thaw")
def yearly_freeze_periods():
daily = _load_daily()
first_year = daily.time.dt.year.min().item()
last_year = daily.time.dt.year.max().item() - 1
daily = daily.sel(time=slice(f"{first_year}-07-01", f"{last_year}-06-30"))
never_freezes = (daily.freezing_days.resample(time="12MS").sum(dim="time") == 0).persist()
first_freeze_day = daily.freezing_days.resample(time="12MS").map(_get_first_day).where(~never_freezes)
first_freeze_day.attrs = {"long_name": "Day of first freeze in year", "units": "day of year"}
_store_in_yearly(first_freeze_day, "day_of_first_freeze")
n_days_in_year = xr.where(366, 365, daily.time[::-1].dt.is_leap_year).resample(time="12MS").last()
last_freeze_day = n_days_in_year - daily.freezing_days[::-1].resample(time="12MS").map(_get_first_day).where(
~never_freezes
)
last_freeze_day.attrs = {"long_name": "Day of last freeze in year", "units": "day of year"}
_store_in_yearly(last_freeze_day, "day_of_last_freeze")
@cli.command
@ -571,8 +562,48 @@ def enrich(n_workers: int = 10, monthly: bool = True, yearly: bool = True, daily
if monthly:
monthly_aggregate()
if yearly:
yearly_and_seasonal_aggregate()
multi_monthly_aggregate(agg="shoulder")
multi_monthly_aggregate(agg="seasonal")
multi_monthly_aggregate(agg="yearly")
# yearly_thaw_periods()
# yearly_freeze_periods()
print("Enriched ERA5 data with additional features and aggregated it temporally.")
stopwatch.summary()
@cli.command
def viz(agg: Literal["daily", "monthly", "yearly", "summer", "winter"]):
"""Visualize a small overview of ERA5 variables for a given aggregation.
Args:
agg (Literal["daily", "monthly", "yearly", "summer", "winter"]):
Aggregation identifier used to locate the appropriate ERA5 Zarr store via
get_era5_stores. Determines which dataset is opened and visualized.
Example:
>>> # produce and save an overview for monthly ERA5 data
>>> viz("monthly")
"""
store = get_era5_stores(agg)
ds = xr.open_zarr(store, consolidated=False).set_coords("spatial_ref")
tis = [0, 1, -2, -1]
ts = [str(ds.time.isel(time=t).values)[:10] for t in tis]
vnames = [ds[var].attrs.get("long_name", var) for var in ds.data_vars]
vunits = [ds[var].attrs.get("units", "") for var in ds.data_vars]
vlabels = [f"{name} [{unit}]" if unit else name for name, unit in zip(vnames, vunits)]
fig, axs = uplt.subplots(ncols=len(tis), nrows=len(vlabels), proj="npaeqd", share=0)
axs.format(boundinglat=50, coast=True, toplabels=ts, leftlabels=vlabels)
for t in tis:
for i, var in enumerate(ds.data_vars):
subset = ds[var].isel(time=t).load()
m = axs[i, t].pcolormesh(subset, cmap="viridis")
axs[i, t].colorbar(m, loc="ll", label=var)
fig.format(suptitle="ERA5 Data")
(FIGURES_DIR / "era5").mkdir(parents=True, exist_ok=True)
fig.savefig(FIGURES_DIR / "era5" / f"{agg}_overview_unaligned.png")
# ===========================
@ -686,6 +717,7 @@ def _create_aligned(
aligned[var].attrs = ds[var].attrs
aligned = aligned.chunk({"cell_ids": min(len(aligned.cell_ids), 10000), "time": len(aligned.time)})
aligned = xdggs.decode(aligned)
return aligned
@ -708,69 +740,45 @@ def spatial_agg(
grid_gdf = grids.open(grid, level)
# ? Mask out water, since we don't want to aggregate over oceans
grid_gdf = watermask.clip_grid(grid_gdf)
grid_gdf = grid_gdf.to_crs("epsg:4326")
summer_unaligned_store = get_era5_stores("summer")
winter_unaligned_store = get_era5_stores("winter")
yearly_unaligned_store = get_era5_stores("yearly")
with stopwatch("Loading summer ERA5 data"):
summer_unaligned = xr.open_zarr(summer_unaligned_store, consolidated=False).set_coords("spatial_ref")
assert {"latitude", "longitude", "time"} == set(summer_unaligned.dims)
assert summer_unaligned.odc.crs == "epsg:4326", f"Expected CRS 'epsg:4326', got {summer_unaligned.odc.crs}"
summer_unaligned = _correct_longs(summer_unaligned)
with stopwatch("Loading winter ERA5 data"):
winter_unaligned = xr.open_zarr(winter_unaligned_store, consolidated=False).set_coords("spatial_ref")
assert {"latitude", "longitude", "time"} == set(winter_unaligned.dims)
assert winter_unaligned.odc.crs == "epsg:4326", f"Expected CRS 'epsg:4326', got {winter_unaligned.odc.crs}"
winter_unaligned = _correct_longs(winter_unaligned)
with stopwatch("Loading yearly ERA5 data"):
yearly_unaligned = xr.open_zarr(yearly_unaligned_store, consolidated=False).set_coords("spatial_ref")
assert {"latitude", "longitude", "time"} == set(yearly_unaligned.dims)
assert yearly_unaligned.odc.crs == "epsg:4326", f"Expected CRS 'epsg:4326', got {yearly_unaligned.odc.crs}"
yearly_unaligned = _correct_longs(yearly_unaligned)
# Precompute the geometries to clip later
daily_store = get_era5_stores("daily")
daily_unaligned = xr.open_zarr(daily_store, consolidated=False).set_coords("spatial_ref")
assert {"latitude", "longitude", "time"} == set(daily_unaligned.dims)
assert daily_unaligned.odc.crs == "epsg:4326", f"Expected CRS 'epsg:4326', got {daily_unaligned.odc.crs}"
daily_unaligned = _correct_longs(daily_unaligned)
summer_data = {
var: np.full((len(grid_gdf), len(summer_unaligned.time)), np.nan, dtype=np.float32)
for var in summer_unaligned.data_vars
}
winter_data = {
var: np.full((len(grid_gdf), len(winter_unaligned.time)), np.nan, dtype=np.float32)
for var in winter_unaligned.data_vars
}
yearly_data = {
var: np.full((len(grid_gdf), len(yearly_unaligned.time)), np.nan, dtype=np.float32)
for var in yearly_unaligned.data_vars
}
for i, (_, row) in track(
enumerate(grid_gdf.to_crs("epsg:4326").iterrows()),
total=len(grid_gdf),
description="Spatially aggregating ERA5 data...",
):
geoms = _get_corrected_geoms(row.geometry, summer_unaligned.odc.geobox)
if len(geoms) == 0:
print(f"Warning: No valid geometry for cell {row.cell_id}.")
continue
cell_geometries = [_get_corrected_geoms(row.geometry, daily_unaligned.odc.geobox) for _, row in grid_gdf.iterrows()]
cell_data = extract_cell_data(summer_unaligned, geoms)
for var in summer_unaligned.data_vars:
summer_data[var][i, :] = cell_data[var].values.astype()
cell_data = extract_cell_data(winter_unaligned, geoms)
for var in winter_unaligned.data_vars:
winter_data[var][i, :] = cell_data[var].values
cell_data = extract_cell_data(yearly_unaligned, geoms)
for var in yearly_unaligned.data_vars:
yearly_data[var][i, :] = cell_data[var].values
for agg in ["summer", "winter", "yearly"]:
unaligned_store = get_era5_stores(agg)
with stopwatch(f"Loading {agg} ERA5 data"):
unaligned = xr.open_zarr(unaligned_store, consolidated=False).set_coords("spatial_ref")
assert {"latitude", "longitude", "time"} == set(unaligned.dims)
assert unaligned.odc.crs == "epsg:4326", f"Expected CRS 'epsg:4326', got {unaligned.odc.crs}"
unaligned = _correct_longs(unaligned)
summer = _create_aligned(summer_unaligned, summer_data, grid, level)
winter = _create_aligned(winter_unaligned, winter_data, grid, level)
yearly = _create_aligned(yearly_unaligned, yearly_data, grid, level)
data = {
var: np.full((len(grid_gdf), len(unaligned.time)), np.nan, dtype=np.float32) for var in unaligned.data_vars
}
for i, geoms in track(
enumerate(cell_geometries),
total=len(grid_gdf),
description=f"Spatially aggregating {agg} ERA5 data...",
):
if len(geoms) == 0:
print(f"Warning: No valid geometry for cell {grid_gdf.iloc[i].cell_id}.")
continue
summer_store = get_era5_stores("summer", grid, level)
winter_store = get_era5_stores("winter", grid, level)
yearly_store = get_era5_stores("yearly", grid, level)
summer.to_zarr(summer_store, mode="w", consolidated=False, encoding=codecs.from_ds(summer))
winter.to_zarr(winter_store, mode="w", consolidated=False, encoding=codecs.from_ds(winter))
yearly.to_zarr(yearly_store, mode="w", consolidated=False, encoding=codecs.from_ds(yearly))
print("Finished spatial matching.")
cell_data = extract_cell_data(unaligned, geoms)
for var in unaligned.data_vars:
data[var][i, :] = cell_data[var].values
aggregated = _create_aligned(unaligned, data, grid, level)
store = get_era5_stores(agg, grid, level)
aggregated.to_zarr(store, mode="w", consolidated=False, encoding=codecs.from_ds(aggregated))
print(f"Finished spatial matching for {agg} data.")
stopwatch.summary()

View file

@ -8,7 +8,7 @@ from typing import Literal
DATA_DIR = Path(os.environ.get("DATA_DIR", "data")).resolve() / "entropice"
GRIDS_DIR = DATA_DIR / "grids"
FIGURES_DIR = DATA_DIR / "figures"
FIGURES_DIR = Path("figures")
DARTS_DIR = DATA_DIR / "darts"
ERA5_DIR = DATA_DIR / "era5"
EMBEDDINGS_DIR = DATA_DIR / "embeddings"
@ -63,12 +63,13 @@ def get_embeddings_store(grid: Literal["hex", "healpix"], level: int) -> Path:
def get_era5_stores(
agg: Literal["daily", "monthly", "summer", "winter", "yearly"],
agg: Literal["daily", "monthly", "summer", "winter", "yearly", "seasonal", "shoulder"] = "daily",
grid: Literal["hex", "healpix"] | None = None,
level: int | None = None,
):
if grid is None or level is None:
return ERA5_DIR / f"{agg}_climate_unaligned.zarr"
(ERA5_DIR / "intermediate").mkdir(parents=True, exist_ok=True)
return ERA5_DIR / "intermediate" / f"{agg}_climate.zarr"
gridname = _get_gridname(grid, level)
aligned_path = ERA5_DIR / f"{gridname}_{agg}_climate.zarr"

68
uv.lock generated
View file

@ -1022,6 +1022,10 @@ complete = [
{ name = "pandas" },
{ name = "pyarrow" },
]
diagnostics = [
{ name = "bokeh" },
{ name = "jinja2" },
]
distributed = [
{ name = "distributed" },
]
@ -1220,6 +1224,7 @@ dependencies = [
{ name = "geopandas" },
{ name = "h3" },
{ name = "h5netcdf" },
{ name = "ipycytoscape" },
{ name = "ipykernel" },
{ name = "ipywidgets" },
{ name = "mapclassify" },
@ -1231,6 +1236,7 @@ dependencies = [
{ name = "odc-geo", extra = ["all"] },
{ name = "opt-einsum" },
{ name = "pyarrow" },
{ name = "rechunker" },
{ name = "requests" },
{ name = "rich" },
{ name = "rioxarray" },
@ -1238,6 +1244,7 @@ dependencies = [
{ name = "seaborn" },
{ name = "smart-geocubes", extra = ["dask", "gee", "stac", "viz"] },
{ name = "stopuhr" },
{ name = "ultraplot" },
{ name = "xanimate" },
{ name = "xarray" },
{ name = "xdggs" },
@ -1264,6 +1271,7 @@ requires-dist = [
{ name = "geopandas", specifier = ">=1.1.0" },
{ name = "h3", specifier = ">=4.2.2" },
{ name = "h5netcdf", specifier = ">=1.6.4" },
{ name = "ipycytoscape", specifier = ">=1.3.3" },
{ name = "ipykernel", specifier = ">=6.29.5" },
{ name = "ipywidgets", specifier = ">=8.1.7" },
{ name = "mapclassify", specifier = ">=2.9.0" },
@ -1275,6 +1283,7 @@ requires-dist = [
{ name = "odc-geo", extras = ["all"], specifier = ">=0.4.10" },
{ name = "opt-einsum", specifier = ">=3.4.0" },
{ name = "pyarrow", specifier = ">=20.0.0" },
{ name = "rechunker", specifier = ">=0.5.2" },
{ name = "requests", specifier = ">=2.32.3" },
{ name = "rich", specifier = ">=14.0.0" },
{ name = "rioxarray", specifier = ">=0.19.0" },
@ -1282,6 +1291,7 @@ requires-dist = [
{ name = "seaborn", specifier = ">=0.13.2" },
{ name = "smart-geocubes", extras = ["gee", "dask", "stac", "viz"], specifier = ">=0.0.9" },
{ name = "stopuhr", specifier = ">=0.0.10" },
{ name = "ultraplot", specifier = ">=1.63.0" },
{ name = "xanimate", git = "https://github.com/davbyr/xAnimate" },
{ name = "xarray", specifier = ">=2025.9.0" },
{ name = "xdggs", specifier = ">=0.2.1" },
@ -1970,6 +1980,19 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/cb/bd/b394387b598ed84d8d0fa90611a90bee0adc2021820ad5729f7ced74a8e2/imageio-2.37.0-py3-none-any.whl", hash = "sha256:11efa15b87bc7871b61590326b2d635439acc321cf7f8ce996f812543ce10eed", size = 315796 },
]
[[package]]
name = "ipycytoscape"
version = "1.3.3"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "ipywidgets" },
{ name = "spectate" },
]
sdist = { url = "https://files.pythonhosted.org/packages/21/4b/dca529aa566ce225107580c6c8625c7dc5ecb1532f7d73259e2888d2187a/ipycytoscape-1.3.3.tar.gz", hash = "sha256:b6f3199df034f088e92d388e27e629f58ae2901b213cb9299e5b564272f9a2f8", size = 3885550 }
wheels = [
{ url = "https://files.pythonhosted.org/packages/4c/0f/b66d63d4a5426c09005d3713b056e634e00e69788fdc88d1ffe40e5b7654/ipycytoscape-1.3.3-py2.py3-none-any.whl", hash = "sha256:4bc205724971f5f7a3fc2b09dfec20c357c4c6dfa2b4bd41e7c33c995c3f6906", size = 3595634 },
]
[[package]]
name = "ipyevents"
version = "2.0.4"
@ -2675,6 +2698,15 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/93/cf/be4e93afbfa0def2cd6fac9302071db0bd6d0617999ecbf53f92b9398de3/multiurl-0.3.7-py3-none-any.whl", hash = "sha256:054f42974064f103be0ed55b43f0c32fc435a47dc7353a9adaffa643b99fa380", size = 21524 },
]
[[package]]
name = "mypy-extensions"
version = "1.1.0"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/a2/6e/371856a3fb9d31ca8dac321cda606860fa4548858c0cc45d9d1d4ca2628b/mypy_extensions-1.1.0.tar.gz", hash = "sha256:52e68efc3284861e772bbcd66823fde5ae21fd2fdb51c62a211403730b916558", size = 6343 }
wheels = [
{ url = "https://files.pythonhosted.org/packages/79/7b/2c79738432f5c924bef5071f933bcc9efd0473bac3b4aa584a6f7c1c8df8/mypy_extensions-1.1.0-py3-none-any.whl", hash = "sha256:1be4cccdb0f2482337c4743e60421de3a356cd97508abadd57d47403e94f5505", size = 4963 },
]
[[package]]
name = "narwhals"
version = "2.9.0"
@ -3742,6 +3774,20 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/f2/98/7e6d147fd16a10a5f821db6e25f192265d6ecca3d82957a4fdd592cad49c/ratelim-0.1.6-py2.py3-none-any.whl", hash = "sha256:e1a7dd39e6b552b7cc7f52169cd66cdb826a1a30198e355d7016012987c9ad08", size = 4017 },
]
[[package]]
name = "rechunker"
version = "0.5.2"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "dask", extra = ["array", "diagnostics"] },
{ name = "mypy-extensions" },
{ name = "zarr" },
]
sdist = { url = "https://files.pythonhosted.org/packages/55/8e/9a76d6762d0db09ab289a836ea423a2feacc2f341885bea61f10312fa245/rechunker-0.5.2.tar.gz", hash = "sha256:18c610cc65854b3627c4d511138a7b962281f8e00838f78148cbf765e1ba2fb2", size = 480044 }
wheels = [
{ url = "https://files.pythonhosted.org/packages/34/83/a485250bc09db55e4b4389d99e583fac871ceeaaa4620b67a31d8db95ef5/rechunker-0.5.2-py3-none-any.whl", hash = "sha256:e09585d69b429ae466470047a2b828f003c123dffe6a265720c0ab0ca78f4937", size = 22088 },
]
[[package]]
name = "referencing"
version = "0.37.0"
@ -4159,6 +4205,15 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/14/a0/bb38d3b76b8cae341dad93a2dd83ab7462e6dbcdd84d43f54ee60a8dc167/soupsieve-2.8-py3-none-any.whl", hash = "sha256:0cc76456a30e20f5d7f2e14a98a4ae2ee4e5abdc7c5ea0aafe795f344bc7984c", size = 36679 },
]
[[package]]
name = "spectate"
version = "1.0.1"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/c8/8d/78dbadaeea943cc0fb9d3cd6b0a4f4668a46f84de1c5507fe3c9f02b8973/spectate-1.0.1.tar.gz", hash = "sha256:49a2dde0962fcecf120cb361cc293989489078eb29ba1d8c3d342a741e898b7e", size = 14573 }
wheels = [
{ url = "https://files.pythonhosted.org/packages/81/ec/8bdccea3ff7d557601183581340c3768b7bb7b1e32c8991f1130f0c1e2c4/spectate-1.0.1-py2.py3-none-any.whl", hash = "sha256:c4585194c238979f953fbf2ecf9f94c84d9d0a929432c7104e39984f52c9e718", size = 11077 },
]
[[package]]
name = "stack-data"
version = "0.6.3"
@ -4291,6 +4346,19 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/5c/23/c7abc0ca0a1526a0774eca151daeb8de62ec457e77262b66b359c3c7679e/tzdata-2025.2-py2.py3-none-any.whl", hash = "sha256:1a403fada01ff9221ca8044d701868fa132215d84beb92242d9acd2147f667a8", size = 347839 },
]
[[package]]
name = "ultraplot"
version = "1.63.0"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "matplotlib" },
{ name = "numpy" },
]
sdist = { url = "https://files.pythonhosted.org/packages/c3/f6/64cf6ec8e98d8534b6f4530d2517f00705b25100169a30bfcd27fc4f4e35/ultraplot-1.63.0.tar.gz", hash = "sha256:e3e2fd7029a7c8bf5577709c3c454aab7d156439123ac8ba841602127e2ae163", size = 14783448 }
wheels = [
{ url = "https://files.pythonhosted.org/packages/47/fd/48a7ba597876e97b29b900f9191808ebd29340923627eadfd172295e8425/ultraplot-1.63.0-py3-none-any.whl", hash = "sha256:8e57063a627167bb1c835877c75347d6f1883434709a04a9666a680e42e5546a", size = 13574760 },
]
[[package]]
name = "uritemplate"
version = "4.2.0"