"""Download and preprocess ERA5 data.

Variables of Interest:

- 2 metre temperature (t2m)
- Total precipitation (tp)
- Snow Fall (sf)
- Snow cover (snowc)
- Snow depth (sde)
- Surface sensible heat flux (sshf)
- Lake ice bottom temperature (lblt)

Aggregations:

- Summer / Winter
20-bin histogram?

Spatial -> Enrich -> Temporal ?

Author: Tobias Hölzer
Date: 09. June 2025
"""

import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
from typing import Literal

import cyclopts
import dask.distributed as dd
import geopandas as gpd
import odc.geo
import odc.geo.xr  # noqa: F401 - registers the ``.odc`` accessor on xarray objects
import pandas as pd
import shapely
import shapely.ops
import xarray as xr
from numcodecs.zarr3 import Blosc
from rich import pretty, print, traceback
from shapely.geometry import LineString, Polygon

traceback.install(show_locals=True)
pretty.install()

DATA_DIR = Path("data/era5")
AGG_PATH = DATA_DIR / "era5_agg.zarr"
ALIGNED_PATH = DATA_DIR / "era5_spatial_aligned.zarr"
MONTHLY_PATH = DATA_DIR / "era5_monthly.zarr"
YEARLY_PATH = DATA_DIR / "era5_yearly.zarr"

# Spatial / temporal subset of the ERA5 source: northern high latitudes, 2022-2024.
min_lat = 50
max_lat = 85
min_time = "2022-01-01"
max_time = "2024-12-31"
# ERA5 latitude is stored descending, hence slice(max, min).
subset = {"latitude": slice(max_lat, min_lat), "time": slice(min_time, max_time)}

# NOTE(review): this reassignment happens AFTER the zarr paths above were derived,
# so AGG_PATH & co. still point at "data/era5" while only the grid parquet lookup in
# cli() uses this directory. Looks suspicious - confirm this split is intentional.
DATA_DIR = Path("/isipd/projects/p_aicore_pf/tohoel001/era5_thawing_data")

today = time.strftime("%Y-%m-%d")  # currently unused in this file

# TODO: I think it would be better to aggregate via hours instead of days
# Pipeline would be:
# Download hourly data -> Spatially match hourly data ->
# For {daily, monthly, yearly}:
#   Enrich -> Aggregate temporally


def create_encoding(ds: xr.Dataset) -> dict:
    """Build a zarr encoding that zstd-compresses every data variable and coordinate.

    Args:
        ds (xr.Dataset): Dataset about to be written with ``to_zarr``.

    Returns:
        dict: Mapping of variable name -> ``{"compressors": Blosc(...)}``.
    """
    return {var: {"compressors": Blosc(cname="zstd", clevel=9)} for var in [*ds.data_vars, *ds.coords]}


def download_daily_aggregated():
    """Stream hourly ERA5-Land from Destination Earth, aggregate to daily, write AGG_PATH.

    Daily aggregates currently produced: t2m min/max and tp sum. The remaining
    variables of interest (sf, snowc, sde, sshf, lblt) are stubbed out below.
    """
    era5 = xr.open_dataset(
        "https://data.earthdatahub.destine.eu/era5/reanalysis-era5-land-no-antartica-v0.zarr",
        storage_options={"client_kwargs": {"trust_env": True}},
        chunks={"latitude": 64 * 4, "longitude": 64 * 4},
        # chunks={},
        engine="zarr",
    ).rename({"valid_time": "time"})
    era5 = era5.sel(**subset)
    era5_agg = xr.merge(
        [
            era5.t2m.resample(time="1D").max().rename("t2m_daily_max"),
            era5.t2m.resample(time="1D").min().rename("t2m_daily_min"),
            era5.tp.resample(time="1D").sum().rename("tp_daily_sum"),
            # NOTE(review): these are consumed downstream by daily_enrich() /
            # monthly_aggregate(); enabling them (or removing the consumers) is
            # required for the full pipeline to run - confirm intended state.
            # era5.sf.resample(time="1D").sum().rename("sf_daily_sum"),
            # era5.snowc.resample(time="1D").mean().rename("snowc_daily_mean"),
            # era5.sde.resample(time="1D").mean().rename("sde_daily_mean"),
            # era5.sshf.resample(time="1D").sum().rename("sshf_daily_sum"),
            # era5.lblt.resample(time="1D").max().rename("lblt_daily_max"),
        ]
    )
    # Rechunk if the first time chunk is not the same as the middle ones
    if era5_agg.chunksizes["time"][0] != era5_agg.chunksizes["time"][1]:
        era5_agg = era5_agg.chunk({"time": 120})
    # Assign attributes
    era5_agg["t2m_daily_max"].attrs = {"long_name": "Daily maximum 2 metre temperature", "units": "K"}
    era5_agg["t2m_daily_min"].attrs = {"long_name": "Daily minimum 2 metre temperature", "units": "K"}
    era5_agg["tp_daily_sum"].attrs = {"long_name": "Daily total precipitation", "units": "m"}
    # era5_agg["sf_daily_sum"].attrs = {"long_name": "Daily total snow fall", "units": "m"}
    # era5_agg["snowc_daily_mean"].attrs = {"long_name": "Daily mean snow cover", "units": "m"}
    # era5_agg["sde_daily_mean"].attrs = {"long_name": "Daily mean snow depth", "units": "m"}
    # era5_agg["sshf_daily_sum"].attrs = {"long_name": "Daily total surface sensible heat flux", "units": "J/m²"}
    # era5_agg["lblt_daily_max"].attrs = {"long_name": "Daily maximum lake ice bottom temperature", "units": "K"}
    era5_agg.to_zarr(AGG_PATH, mode="w", encoding=create_encoding(era5_agg), consolidated=False)


def crosses_antimeridian(geom: Polygon) -> bool:
    """Heuristically detect whether a grid-cell polygon straddles the ±180° meridian.

    A cell is flagged when it has both positive and negative longitudes AND its
    largest |longitude| exceeds 90°, which distinguishes antimeridian crossings
    from cells straddling the 0° (Greenwich) meridian.
    """
    coords = shapely.get_coordinates(geom)
    crosses_any_meridian = (coords[:, 0] > 0).any() and (coords[:, 0] < 0).any()
    return crosses_any_meridian and abs(coords[:, 0]).max() > 90


def split_antimeridian_cell(geom: Polygon) -> list[Polygon]:
    """Split an antimeridian-crossing polygon into parts on either side of ±180°.

    Assumes ``geom`` actually crosses the antimeridian (see crosses_antimeridian).
    Negative longitudes are shifted into the 0-360 range so the polygon becomes
    contiguous, then it is cut along the 180° line.
    """
    coords = shapely.get_coordinates(geom)
    for i in range(coords.shape[0]):
        if coords[i, 0] < 0:
            coords[i, 0] += 360
    geom = Polygon(coords)
    antimeridian = LineString([[180, -90], [180, 90]])
    polys = shapely.ops.split(geom, antimeridian)
    return list(polys.geoms)


def check_geobox(geobox) -> bool:
    """Return True if the geobox covers more than one pixel in both dimensions."""
    x, y = geobox.shape
    return x > 1 and y > 1


def extract_cell_data(idx: int, geom: Polygon) -> xr.Dataset | None:
    """Average the daily ERA5 aggregate over one grid cell.

    Args:
        idx (int): Cell index, used as the ``cell`` coordinate value.
        geom (Polygon): Cell geometry in EPSG:4326.

    Returns:
        xr.Dataset | None: Cell-mean time series with a singleton ``cell`` dim,
        or None when the cell does not overlap enough ERA5 pixels.
    """
    era5_agg = xr.open_zarr(AGG_PATH)
    assert {"latitude", "longitude", "time"} == set(era5_agg.dims), (
        f"Expected dims ('latitude', 'longitude', 'time'), got {era5_agg.dims}"
    )
    # cell.geometry is a shapely Polygon
    if not crosses_antimeridian(geom):
        geoms = [geom]
    # Split geometry in case it crossed antimeridian
    else:
        geoms = split_antimeridian_cell(geom)
    cell_data = []
    for part_geom in geoms:
        part_geom = odc.geo.Geometry(part_geom, crs="epsg:4326")
        # Skip parts too small to enclose more than a single ERA5 pixel.
        if not check_geobox(era5_agg.odc.geobox.enclosing(part_geom)):
            continue
        cell_data.append(era5_agg.odc.crop(part_geom).drop_vars("spatial_ref").mean(["latitude", "longitude"]))
    if len(cell_data) == 0:
        return None
    elif len(cell_data) == 1:
        return cell_data[0].expand_dims({"cell": [idx]}).chunk({"cell": 1})
    else:
        # Antimeridian cells: average the parts before collapsing to one cell.
        return xr.concat(cell_data, dim="part").mean("part").expand_dims({"cell": [idx]}).chunk({"cell": 1})


def spatial_matching(grid: gpd.GeoDataFrame, n_workers: int = 10):
    """Extract cell-mean ERA5 series for every grid cell and append them to ALIGNED_PATH.

    Args:
        grid (gpd.GeoDataFrame): Grid cells; reprojected to EPSG:4326 internally.
        n_workers (int, optional): Thread pool size. Defaults to 10.
    """
    with ThreadPoolExecutor(max_workers=n_workers) as executor:
        futures = {
            executor.submit(extract_cell_data, idx, row.geometry): idx
            for idx, row in grid.to_crs("epsg:4326").iterrows()
        }
        for future in as_completed(futures):
            idx = futures[future]
            try:
                data = future.result()
                # extract_cell_data returns None for cells without ERA5 overlap;
                # skip explicitly instead of letting None.to_zarr raise.
                if data is None:
                    print(f"Cell {idx} has no ERA5 overlap, skipping.")
                    continue
                data.to_zarr(ALIGNED_PATH, append_dim="cell", consolidated=False, encoding=create_encoding(data))
            except Exception as e:
                print(f"Error processing cell {idx}: {e}")


def daily_enrich() -> xr.Dataset:
    """Load the spatially aligned daily data and derive per-day features.

    Returns:
        xr.Dataset: Aligned daily data with derived variables added.
    """
    era5 = xr.open_zarr(ALIGNED_PATH)
    assert {"cell", "time"} == set(era5.dims), f"Expected dims ('cell', 'time'), got {era5.dims}"
    # Formulas based on Groeke et. al. (2025) Stochastic Weather generation...
    era5["t2m_daily_avg"] = (era5.t2m_daily_max + era5.t2m_daily_min) / 2
    era5.t2m_daily_avg.attrs = {"long_name": "Daily average 2 metre temperature", "units": "K"}
    era5["t2m_daily_range"] = era5.t2m_daily_max - era5.t2m_daily_min
    era5.t2m_daily_range.attrs = {"long_name": "Daily range of 2 metre temperature", "units": "K"}
    era5["t2m_daily_skew"] = (era5.t2m_daily_avg - era5.t2m_daily_min) / era5.t2m_daily_range
    era5.t2m_daily_skew.attrs = {"long_name": "Daily skewness of 2 metre temperature"}
    # Degree-day quantities relative to 0 °C (273.15 K).
    era5["thawing_degree_days"] = (era5.t2m_daily_avg - 273.15).clip(min=0)
    era5.thawing_degree_days.attrs = {"long_name": "Thawing degree days", "units": "K"}
    era5["freezing_degree_days"] = (273.15 - era5.t2m_daily_avg).clip(min=0)
    era5.freezing_degree_days.attrs = {"long_name": "Freezing degree days", "units": "K"}
    era5["thawing_days"] = (era5.t2m_daily_avg > 273.15).astype(int)
    era5.thawing_days.attrs = {"long_name": "Thawing days"}
    era5["freezing_days"] = (era5.t2m_daily_avg < 273.15).astype(int)
    era5.freezing_days.attrs = {"long_name": "Freezing days"}
    era5["precipitation_occurrences"] = (era5.tp_daily_sum > 0).astype(int)
    era5.precipitation_occurrences.attrs = {"long_name": "Precipitation occurrences"}
    # NOTE(review): sf_daily_sum, snowc_daily_mean and sde_daily_mean are commented
    # out in download_daily_aggregated(), so the following lines will raise unless
    # those variables exist in the aligned store - confirm.
    era5["snowfall_occurrences"] = (era5.sf_daily_sum > 0).astype(int)
    era5.snowfall_occurrences.attrs = {"long_name": "Snowfall occurrences"}
    era5["snow_isolation"] = era5.snowc_daily_mean * era5.sde_daily_mean
    era5.snow_isolation.attrs = {"long_name": "Snow isolation"}
    return era5


def monthly_aggregate():
    """Aggregate the enriched daily data to calendar months and write MONTHLY_PATH."""
    era5 = daily_enrich()
    assert {"cell", "time"} == set(era5.dims), f"Expected dims ('cell', 'time'), got {era5.dims}"
    # Monthly aggregates ("1M" is the month-end alias; deprecated in pandas>=2.2
    # in favour of "1ME", kept here for identical behavior).
    monthly = xr.merge(
        [
            # Original variables
            era5.t2m_daily_min.resample(time="1M").min().rename("t2m_monthly_min"),
            era5.t2m_daily_max.resample(time="1M").max().rename("t2m_monthly_max"),
            era5.tp_daily_sum.resample(time="1M").sum().rename("tp_monthly_sum"),
            era5.sf_daily_sum.resample(time="1M").sum().rename("sf_monthly_sum"),
            era5.snowc_daily_mean.resample(time="1M").mean().rename("snowc_monthly_mean"),
            era5.sde_daily_mean.resample(time="1M").mean().rename("sde_monthly_mean"),
            era5.sshf_daily_sum.resample(time="1M").sum().rename("sshf_monthly_sum"),
            era5.lblt_daily_max.resample(time="1M").max().rename("lblt_monthly_max"),
            # Enriched variables
            era5.t2m_daily_avg.resample(time="1M").mean().rename("t2m_monthly_avg"),
            era5.t2m_daily_range.resample(time="1M").mean().rename("t2m_daily_range_monthly_avg"),
            era5.t2m_daily_skew.resample(time="1M").mean().rename("t2m_daily_skew_monthly_avg"),
            era5.thawing_degree_days.resample(time="1M").sum().rename("thawing_degree_days_monthly"),
            era5.freezing_degree_days.resample(time="1M").sum().rename("freezing_degree_days_monthly"),
            era5.thawing_days.resample(time="1M").sum().rename("thawing_days_monthly"),
            era5.freezing_days.resample(time="1M").sum().rename("freezing_days_monthly"),
            era5.precipitation_occurrences.resample(time="1M").sum().rename("precipitation_occurrences_monthly"),
            era5.snowfall_occurrences.resample(time="1M").sum().rename("snowfall_occurrences_monthly"),
            era5.snow_isolation.resample(time="1M").mean().rename("snow_isolation_monthly_mean"),
        ]
    )
    monthly.to_zarr(MONTHLY_PATH, mode="w", encoding=create_encoding(monthly), consolidated=False)


def _aggregate_monthly_to_yearly(ds: xr.Dataset, suffix: str) -> xr.Dataset:
    """Resample a (possibly season-filtered) monthly dataset to yearly values.

    Centralizing this list fixes the previous copy-paste drift where the summer
    branch referenced non-existent ``*_summer`` monthly variables and all
    branches referenced ``t2m_monthly_range``/``t2m_monthly_skew`` instead of the
    names actually written by monthly_aggregate().

    Args:
        ds (xr.Dataset): Monthly dataset (variables as written by monthly_aggregate).
        suffix (str): Inserted into output names, e.g. "yearly", "winter", "summer".

    Returns:
        xr.Dataset: Yearly aggregates with suffixed variable names.
    """
    return xr.merge(
        [
            # Original variables
            ds.t2m_monthly_min.resample(time="1Y").min().rename(f"t2m_{suffix}_min"),
            ds.t2m_monthly_max.resample(time="1Y").max().rename(f"t2m_{suffix}_max"),
            ds.tp_monthly_sum.resample(time="1Y").sum().rename(f"tp_{suffix}_sum"),
            ds.sf_monthly_sum.resample(time="1Y").sum().rename(f"sf_{suffix}_sum"),
            ds.snowc_monthly_mean.resample(time="1Y").mean().rename(f"snowc_{suffix}_mean"),
            ds.sde_monthly_mean.resample(time="1Y").mean().rename(f"sde_{suffix}_mean"),
            ds.sshf_monthly_sum.resample(time="1Y").sum().rename(f"sshf_{suffix}_sum"),
            ds.lblt_monthly_max.resample(time="1Y").max().rename(f"lblt_{suffix}_max"),
            # Enriched variables
            ds.t2m_monthly_avg.resample(time="1Y").mean().rename(f"t2m_{suffix}_avg"),
            # TODO: Check if this is correct -> use daily / hourly data instead for range and skew?
            ds.t2m_daily_range_monthly_avg.resample(time="1Y").mean().rename(f"t2m_daily_range_{suffix}_avg"),
            ds.t2m_daily_skew_monthly_avg.resample(time="1Y").mean().rename(f"t2m_daily_skew_{suffix}_avg"),
            ds.thawing_degree_days_monthly.resample(time="1Y").sum().rename(f"thawing_degree_days_{suffix}"),
            ds.freezing_degree_days_monthly.resample(time="1Y").sum().rename(f"freezing_degree_days_{suffix}"),
            ds.thawing_days_monthly.resample(time="1Y").sum().rename(f"thawing_days_{suffix}"),
            ds.freezing_days_monthly.resample(time="1Y").sum().rename(f"freezing_days_{suffix}"),
            ds.precipitation_occurrences_monthly.resample(time="1Y").sum().rename(f"precipitation_occurrences_{suffix}"),
            ds.snowfall_occurrences_monthly.resample(time="1Y").sum().rename(f"snowfall_occurrences_{suffix}"),
            ds.snow_isolation_monthly_mean.resample(time="1Y").mean().rename(f"snow_isolation_{suffix}_mean"),
        ]
    )


def yearly_aggregate():
    """Aggregate monthly data to hydrological years (Oct-Sep) and write YEARLY_PATH.

    Produces full-year, winter and summer aggregates. The time axis is shifted by
    +10 months so that October becomes month 1; the incomplete first and last
    shifted years are dropped.
    """
    monthly = xr.open_zarr(MONTHLY_PATH)
    assert {"cell", "time"} == set(monthly.dims), f"Expected dims ('cell', 'time'), got {monthly.dims}"
    # Yearly aggregates (shifted by +10 months to start in October, first and last years will be cropped)
    monthly_shifted = monthly.copy()
    monthly_shifted["time"] = monthly_shifted.get_index("time") + pd.DateOffset(months=10)
    incomplete_years = {monthly_shifted.time.dt.year.min().item(), monthly_shifted.time.dt.year.max().item()}
    monthly_shifted = monthly_shifted.sel(time=~monthly_shifted.time.dt.year.isin(incomplete_years))
    # These do NOT correspond to calendar months, but to the shifted months
    # (shifted month 1 == October, so winter = Oct-Apr, summer = May-Sep).
    winter_months = [1, 2, 3, 4, 5, 6, 7]
    summer_months = [8, 9, 10, 11, 12]
    yearly = _aggregate_monthly_to_yearly(monthly_shifted, "yearly")
    winter = _aggregate_monthly_to_yearly(
        monthly_shifted.sel(time=monthly_shifted.time.dt.month.isin(winter_months)), "winter"
    )
    summer = _aggregate_monthly_to_yearly(
        monthly_shifted.sel(time=monthly_shifted.time.dt.month.isin(summer_months)), "summer"
    )
    combined = xr.merge([yearly, summer, winter])
    combined.to_zarr(YEARLY_PATH, mode="w", encoding=create_encoding(combined), consolidated=False)


def cli(grid: Literal["hex", "healpix"], level: int, download: bool = False, n_workers: int = 10):
    """Run the CLI for ERA5 data processing.

    Args:
        grid (Literal["hex", "healpix"]): The grid type to use.
        level (int): The processing level.
        download (bool, optional): Whether to download data. Defaults to False.
        n_workers (int, optional): Number of workers for parallel processing. Defaults to 10.
    """
    cluster = dd.LocalCluster(n_workers=n_workers, threads_per_worker=4, memory_limit="20GB")
    client = dd.Client(cluster)
    print(client)
    print(client.dashboard_link)
    if download:
        download_daily_aggregated()
        print("Downloaded and aggregated ERA5 data.")
    grid = gpd.read_parquet(DATA_DIR / f"grids/permafrost_{grid}{level}_grid.parquet")
    spatial_matching(grid, n_workers=n_workers)
    print("Spatially matched ERA5 data to grid.")
    monthly_aggregate()
    yearly_aggregate()
    print("Enriched ERA5 data with additional features and aggregated it temporally.")


if __name__ == "__main__":
    cyclopts.run(cli)