"""Download and preprocess ERA5 data. Variables of Interest: - 2 metre temperature (t2m) [instant] - Total precipitation (tp) [accum] - Snow Fall (sf) [accum] - Snow cover (snowc) [instant] - Snow depth (sde) [instant] - Surface sensible heat flux (sshf) [accum] - Lake ice bottom temperature (lblt) [instant] Daily Variables (downloaded from hourly data): - t2m_daily_max - t2m_daily_min - tp_daily_sum - sf_daily_sum - snowc_daily_mean - sde_daily_mean - sshf_daily_sum - lblt_daily_max Derived Daily Variables: - t2m_daily_avg - t2m_daily_range - t2m_daily_skew - thawing_degree_days - freezing_degree_days - thawing_days - freezing_days - precipitation_occurrences - snowfall_occurrences - snow_isolation (snowc * sde) Monthly Variables: - t2m_monthly_max - t2m_monthly_min - tp_monthly_sum - sf_monthly_sum - snowc_monthly_mean - sde_monthly_mean - sshf_monthly_sum - lblt_monthly_max - t2m_monthly_avg - t2m_monthly_range_avg - t2m_monthly_skew_avg - thawing_degree_days_monthly - freezing_degree_days_monthly - thawing_days_monthly - freezing_days_monthly - precipitation_occurrences_monthly TODO: Rename to precipitation_days_monthly? - snowfall_occurrences_monthly TODO: Rename to snowfall_days_monthly? - snow_isolation_monthly_mean Yearly Variables: - TODO # TODO Variables: - Day of first thaw (yearly) - Day of last thaw (yearly) - Thawing period length (yearly) - Freezing period length (yearly) Author: Tobias Hölzer Date: 09. June 2025 """ import os import time from concurrent.futures import ThreadPoolExecutor, as_completed from pathlib import Path from typing import Literal import cyclopts import dask.distributed as dd import geopandas as gpd import odc.geo import odc.geo.xr import pandas as pd import shapely import shapely.ops import xarray as xr from numcodecs.zarr3 import Blosc from rich import pretty, print, traceback from rich.progress import track from shapely.geometry import LineString, Polygon traceback.install(show_locals=True, suppress=[cyclopts, xr, pd]) pretty.install() cli = cyclopts.App() # TODO: Directly handle stuff on a grid level - this is more what the zarr access is indented to do DATA_DIR = Path(os.environ.get("DATA_DIR", "data")) / "entropyc-rts" # DATA_DIR = Path("data") ERA5_DIR = DATA_DIR / "era5" AGG_PATH = ERA5_DIR / "era5_agg.zarr" ALIGNED_PATH = ERA5_DIR / "era5_spatial_aligned.zarr" MONTHLY_PATH = ERA5_DIR / "era5_monthly.zarr" YEARLY_PATH = ERA5_DIR / "era5_yearly.zarr" min_lat = 50 max_lat = 83.7 # Ensures the right Chunks Size (90 - 64 / 10 + 0.1) min_time = "1990-01-01" max_time = "2024-12-31" today = time.strftime("%Y-%m-%d") # TODO: I think it would be better to aggregate via hours instead of days # Pipeline would be: # Download hourly data -> Spatially match hourly data -> # For {daily, monthly, yearly}: # Enrich -> Aggregate temporally # TODO: Rethink aggregations by differentiating between "instant" and "accum" variables: # https://consensus.app/search/instantaneous-versus-accumulated-weather/JBaNbhc1R_-BwN5E9Un0Fw/ # ================ # === Download === # ================ def create_encoding(ds: xr.Dataset): """Create compression encoding for zarr dataset storage. Creates Blosc compression configuration for all data variables and coordinates in the dataset using zstd compression with level 9. Args: ds (xr.Dataset): The xarray Dataset to create encoding for. Returns: dict: Encoding dictionary with compression settings for each variable. """ # encoding = {var: {"compressors": BloscCodec(cname="zlib", clevel=9)} for var in ds.data_vars} encoding = {var: {"compressors": Blosc(cname="zstd", clevel=9)} for var in [*ds.data_vars, *ds.coords]} return encoding def download_daily_aggregated(): """Download and aggregate ERA5 data to daily resolution. Downloads ERA5 reanalysis data from the DESTINE Earth Data Hub and aggregates it to daily resolution. Includes temperature extremes, precipitation, snow, and surface heat flux variables. The function downloads hourly data and creates daily aggregates: - Temperature: daily min/max - Precipitation and snowfall: daily totals - Snow cover and depth: daily means - Surface heat flux: daily totals - Lake ice temperature: daily max The aggregated data is saved to a zarr file with compression. """ era5 = xr.open_dataset( "https://data.earthdatahub.destine.eu/era5/reanalysis-era5-land-no-antartica-v0.zarr", storage_options={"client_kwargs": {"trust_env": True}}, chunks={}, # chunks={}, engine="zarr", ).rename({"valid_time": "time"}) subset = { "latitude": slice(max_lat, min_lat), } # Compute the clostest chunk-start to min_time, to avoid problems with cropped chunks at the start tchunksize = era5.chunksizes["time"][0] era5_chunk_starts = pd.date_range(era5.time.min().item(), era5.time.max().item(), freq=f"{tchunksize}h") closest_chunk_start = era5_chunk_starts[ era5_chunk_starts.get_indexer([pd.to_datetime(min_time)], method="ffill")[0] ] subset["time"] = slice(str(closest_chunk_start), max_time) era5 = era5.sel(**subset) era5_agg = xr.merge( [ era5.t2m.resample(time="1D").max().rename("t2m_daily_max"), era5.t2m.resample(time="1D").min().rename("t2m_daily_min"), era5.tp.resample(time="1D").sum().rename("tp_daily_sum"), era5.sf.resample(time="1D").sum().rename("sf_daily_sum"), era5.snowc.resample(time="1D").mean().rename("snowc_daily_mean"), era5.sde.resample(time="1D").mean().rename("sde_daily_mean"), era5.sshf.resample(time="1D").sum().rename("sshf_daily_sum"), era5.lblt.resample(time="1D").max().rename("lblt_daily_max"), ] ) # Assign attributes era5_agg["t2m_daily_max"].attrs = {"long_name": "Daily maximum 2 metre temperature", "units": "K"} era5_agg["t2m_daily_min"].attrs = {"long_name": "Daily minimum 2 metre temperature", "units": "K"} era5_agg["tp_daily_sum"].attrs = {"long_name": "Daily total precipitation", "units": "m"} era5_agg["sf_daily_sum"].attrs = {"long_name": "Daily total snow fall", "units": "m"} era5_agg["snowc_daily_mean"].attrs = {"long_name": "Daily mean snow cover", "units": "m"} era5_agg["sde_daily_mean"].attrs = {"long_name": "Daily mean snow depth", "units": "m"} era5_agg["sshf_daily_sum"].attrs = {"long_name": "Daily total surface sensible heat flux", "units": "J/m²"} era5_agg["lblt_daily_max"].attrs = {"long_name": "Daily maximum lake ice bottom temperature", "units": "K"} era5_agg = era5_agg.odc.assign_crs("epsg:4326") era5_agg = era5_agg.drop_vars(["surface", "number", "depthBelowLandLayer"]) era5_agg.to_zarr(AGG_PATH, mode="w", encoding=create_encoding(era5_agg), consolidated=False) @cli.command def download(): """Download ERA5 data using Dask cluster for parallel processing. Creates a local Dask cluster and downloads daily aggregated ERA5 data. The cluster is configured with a single worker with 10 threads and 100GB memory limit for optimal performance. """ with ( dd.LocalCluster(n_workers=1, threads_per_worker=10, memory_limit="100GB") as cluster, dd.Client(cluster) as client, ): print(client) print(client.dashboard_link) download_daily_aggregated() print("Downloaded and aggregated ERA5 data.") # =========================== # === Spatial Aggregation === # =========================== def _crosses_antimeridian(geom: Polygon) -> bool: coords = shapely.get_coordinates(geom) crosses_any_meridian = (coords[:, 0] > 0).any() and (coords[:, 0] < 0).any() return crosses_any_meridian and abs(coords[:, 0]).max() > 90 def _split_antimeridian_cell(geom: Polygon) -> list[Polygon]: # Assumes that it is a antimeridian hex coords = shapely.get_coordinates(geom) for i in range(coords.shape[0]): if coords[i, 0] < 0: coords[i, 0] += 360 geom = Polygon(coords) antimeridian = LineString([[180, -90], [180, 90]]) polys = shapely.ops.split(geom, antimeridian) return list(polys.geoms) def _check_geobox(geobox): x, y = geobox.shape return x > 1 and y > 1 def extract_cell_data(idx: int, geom: Polygon) -> xr.Dataset: """Extract ERA5 data for a specific grid cell geometry. Extracts and spatially averages ERA5 data within the bounds of a grid cell. Handles antimeridian-crossing cells by splitting them appropriately. The extracted data is written to the aligned zarr file. Args: idx (int): Index of the grid cell. geom (Polygon): Polygon geometry of the grid cell. Returns: xr.Dataset or bool: Returns True if successful, False if cell doesn't overlap with ERA5 data. """ era5_agg = ( xr.open_zarr(AGG_PATH, consolidated=False) .set_coords("spatial_ref") .drop_vars(["surface", "number", "depthBelowLandLayer"]) ) # cell.geometry is a shapely Polygon if not _crosses_antimeridian(geom): geoms = [geom] # Split geometry in case it crossed antimeridian else: geoms = _split_antimeridian_cell(geom) cell_data = [] for geom in geoms: geom = odc.geo.Geometry(geom, crs="epsg:4326") if not _check_geobox(era5_agg.odc.geobox.enclosing(geom)): continue cell_data.append(era5_agg.odc.crop(geom).drop_vars("spatial_ref").mean(["latitude", "longitude"])) if len(cell_data) == 0: return False elif len(cell_data) == 1: cell_data = cell_data[0] else: cell_data = xr.concat(cell_data, dim="part").mean("part") cell_data = cell_data.expand_dims({"cell": [idx]}).compute() cell_data.to_zarr(ALIGNED_PATH, region="auto", consolidated=False) return True @cli.command def spatial_agg(grid: Literal["hex", "healpix"], level: int, n_workers: int = 10): """Perform spatial aggregation of ERA5 data to grid cells. Loads a grid and spatially aggregates ERA5 data to each grid cell using parallel processing. Creates an empty zarr file first, then fills it with extracted data for each cell. Args: grid ("hex", "healpix"): Grid type. level (int): Grid resolution level. n_workers (int, optional): Number of parallel workers to use. Defaults to 10. """ gridname = f"permafrost_{grid}{level}" grid = gpd.read_parquet(DATA_DIR / f"grids/{gridname}_grid.parquet") # Create an empty zarr array with the right dimensions era5_agg = ( xr.open_zarr(AGG_PATH, consolidated=False) .set_coords("spatial_ref") .drop_vars(["surface", "number", "depthBelowLandLayer"]) ) assert {"latitude", "longitude", "time"} == set(era5_agg.dims), ( f"Expected dims ('latitude', 'longitude', 'time'), got {era5_agg.dims}" ) assert era5_agg.odc.crs == "epsg:4326", f"Expected CRS 'epsg:4326', got {era5_agg.odc.crs}" empty = ( xr.zeros_like(era5_agg.isel(latitude=0, longitude=0)) .expand_dims({"cell": [idx for idx, _ in grid.iterrows()]}) .chunk({"cell": 1, "time": len(era5_agg.time)}) ) empty.to_zarr(ALIGNED_PATH, mode="w", consolidated=False, encoding=create_encoding(empty)) print(f"Starting spatial matching of {len(grid)} cells with {n_workers} workers...") # TODO: Maybe change to process pool executor? with ThreadPoolExecutor(max_workers=n_workers) as executor: futures = { executor.submit(extract_cell_data, idx, row.geometry): idx for idx, row in grid.to_crs("epsg:4326").iterrows() } for future in track(as_completed(futures), total=len(futures), description="Processing cells"): idx = futures[future] try: flag = future.result() if flag: print(f"Successfully written cell {idx}") else: print(f"Cell {idx} did not overlap with ERA5 data.") except Exception as e: print(f"Error processing cell {idx}: {e}") print(type(e)) print("Finished spatial matching.") # ============================ # === Temporal Aggregation === # ============================ def daily_enrich() -> xr.Dataset: """Enrich daily ERA5 data with derived climate variables. Loads spatially aligned ERA5 data and computes additional climate variables. Creates derived variables including temperature statistics, degree days, and occurrence indicators. Derived variables include: - Daily average and range temperature - Temperature skewness - Thawing and freezing degree days - Thawing and freezing day counts - Precipitation and snowfall occurrences - Snow isolation index Returns: xr.Dataset: Enriched dataset with original and derived variables. """ era5 = xr.open_zarr(ALIGNED_PATH, consolidated=False).set_coords("spatial_ref") assert {"cell", "time"} == set(era5.dims), f"Expected dims ('cell', 'time'), got {era5.dims}" # Formulas based on Groeke et. al. (2025) Stochastic Weather generation... era5["t2m_daily_avg"] = (era5.t2m_daily_max + era5.t2m_daily_min) / 2 era5.t2m_daily_avg.attrs = {"long_name": "Daily average 2 metre temperature", "units": "K"} era5["t2m_daily_range"] = era5.t2m_daily_max - era5.t2m_daily_min era5.t2m_daily_range.attrs = {"long_name": "Daily range of 2 metre temperature", "units": "K"} era5["t2m_daily_skew"] = (era5.t2m_daily_avg - era5.t2m_daily_min) / era5.t2m_daily_range era5.t2m_daily_skew.attrs = {"long_name": "Daily skewness of 2 metre temperature"} era5["thawing_degree_days"] = (era5.t2m_daily_avg - 273.15).clip(min=0) era5.thawing_degree_days.attrs = {"long_name": "Thawing degree days", "units": "K"} era5["freezing_degree_days"] = (273.15 - era5.t2m_daily_avg).clip(min=0) era5.freezing_degree_days.attrs = {"long_name": "Freezing degree days", "units": "K"} era5["thawing_days"] = (era5.t2m_daily_avg > 273.15).astype(int) era5.thawing_days.attrs = {"long_name": "Thawing days"} era5["freezing_days"] = (era5.t2m_daily_avg < 273.15).astype(int) era5.freezing_days.attrs = {"long_name": "Freezing days"} era5["precipitation_occurrences"] = (era5.tp_daily_sum > 0).astype(int) era5.precipitation_occurrences.attrs = {"long_name": "Precipitation occurrences"} era5["snowfall_occurrences"] = (era5.sf_daily_sum > 0).astype(int) era5.snowfall_occurrences.attrs = {"long_name": "Snowfall occurrences"} era5["snow_isolation"] = era5.snowc_daily_mean * era5.sde_daily_mean era5.snow_isolation.attrs = {"long_name": "Snow isolation"} return era5 def monthly_aggregate(): """Aggregate enriched daily ERA5 data to monthly resolution. Takes the enriched daily ERA5 data and creates monthly aggregates using appropriate statistical functions for each variable type. Temperature variables use min/max/mean, accumulation variables use sums, and derived variables use appropriate aggregations. The aggregated monthly data is saved to a zarr file for further processing. """ era5 = daily_enrich() assert {"cell", "time"} == set(era5.dims), f"Expected dims ('cell', 'time'), got {era5.dims}" # Monthly aggregates monthly = xr.merge( [ # Original variables era5.t2m_daily_min.resample(time="1ME").min().rename("t2m_monthly_min"), era5.t2m_daily_max.resample(time="1ME").max().rename("t2m_monthly_max"), era5.tp_daily_sum.resample(time="1ME").sum().rename("tp_monthly_sum"), era5.sf_daily_sum.resample(time="1ME").sum().rename("sf_monthly_sum"), era5.snowc_daily_mean.resample(time="1ME").mean().rename("snowc_monthly_mean"), era5.sde_daily_mean.resample(time="1ME").mean().rename("sde_monthly_mean"), era5.sshf_daily_sum.resample(time="1ME").sum().rename("sshf_monthly_sum"), era5.lblt_daily_max.resample(time="1ME").max().rename("lblt_monthly_max"), # Enriched variables era5.t2m_daily_avg.resample(time="1ME").mean().rename("t2m_monthly_avg"), era5.t2m_daily_range.resample(time="1ME").mean().rename("t2m_monthly_range_avg"), era5.t2m_daily_skew.resample(time="1ME").mean().rename("t2m_monthly_skew_avg"), era5.thawing_degree_days.resample(time="1ME").sum().rename("thawing_degree_days_monthly"), era5.freezing_degree_days.resample(time="1ME").sum().rename("freezing_degree_days_monthly"), era5.thawing_days.resample(time="1ME").sum().rename("thawing_days_monthly"), era5.freezing_days.resample(time="1ME").sum().rename("freezing_days_monthly"), era5.precipitation_occurrences.resample(time="1ME").sum().rename("precipitation_occurrences_monthly"), era5.snowfall_occurrences.resample(time="1ME").sum().rename("snowfall_occurrences_monthly"), era5.snow_isolation.resample(time="1ME").mean().rename("snow_isolation_monthly_mean"), ] ) monthly.to_zarr(MONTHLY_PATH, mode="w", encoding=create_encoding(monthly), consolidated=False) def yearly_aggregate(): """Aggregate monthly ERA5 data to yearly resolution with seasonal splits. Takes monthly aggregated data and creates yearly aggregates using a shifted calendar (October to September) to better capture Arctic seasonal patterns. Creates separate aggregates for full year, winter (Oct-Apr), and summer (May-Sep) periods. The first and last incomplete years are excluded from the analysis. Winter months are defined as months 1-7 in the shifted calendar, and summer months are 8-12. The final dataset includes yearly, winter, and summer aggregates for all climate variables, saved to a zarr file. """ monthly = xr.open_zarr(MONTHLY_PATH, consolidated=False).set_coords("spatial_ref") assert {"cell", "time"} == set(monthly.dims), f"Expected dims ('cell', 'time'), got {monthly.dims}" # Yearly aggregates (shifted by +10 months to start in Oktober, first and last years will be cropped) monthly_shifted = monthly.copy() monthly_shifted["time"] = monthly_shifted.get_index("time") + pd.DateOffset(months=10) incomplete_years = {monthly_shifted.time.dt.year.min().item(), monthly_shifted.time.dt.year.max().item()} monthly_shifted = monthly_shifted.sel(time=~monthly_shifted.time.dt.year.isin(incomplete_years)) yearly = xr.merge( [ # Original variables monthly_shifted.t2m_monthly_min.resample(time="1YE").min().rename("t2m_yearly_min"), monthly_shifted.t2m_monthly_max.resample(time="1YE").max().rename("t2m_yearly_max"), monthly_shifted.tp_monthly_sum.resample(time="1YE").sum().rename("tp_yearly_sum"), monthly_shifted.sf_monthly_sum.resample(time="1YE").sum().rename("sf_yearly_sum"), monthly_shifted.snowc_monthly_mean.resample(time="1YE").mean().rename("snowc_yearly_mean"), monthly_shifted.sde_monthly_mean.resample(time="1YE").mean().rename("sde_yearly_mean"), monthly_shifted.sshf_monthly_sum.resample(time="1YE").sum().rename("sshf_yearly_sum"), monthly_shifted.lblt_monthly_max.resample(time="1YE").max().rename("lblt_yearly_max"), # Enriched variables monthly_shifted.t2m_monthly_avg.resample(time="1YE").mean().rename("t2m_yearly_avg"), # TODO: Check if this is correct -> use daily / hourly data instead for range and skew? monthly_shifted.t2m_monthly_range_avg.resample(time="1YE").mean().rename("t2m_daily_range_yearly_avg"), monthly_shifted.t2m_monthly_skew_avg.resample(time="1YE").mean().rename("t2m_daily_skew_yearly_avg"), monthly_shifted.thawing_degree_days_monthly.resample(time="1YE").sum().rename("thawing_degree_days_yearly"), monthly_shifted.freezing_degree_days_monthly.resample(time="1YE") .sum() .rename("freezing_degree_days_yearly"), monthly_shifted.thawing_days_monthly.resample(time="1YE").sum().rename("thawing_days_yearly"), monthly_shifted.freezing_days_monthly.resample(time="1YE").sum().rename("freezing_days_yearly"), monthly_shifted.precipitation_occurrences_monthly.resample(time="1YE") .sum() .rename("precipitation_occurrences_yearly"), monthly_shifted.snowfall_occurrences_monthly.resample(time="1YE") .sum() .rename("snowfall_occurrences_yearly"), monthly_shifted.snow_isolation_monthly_mean.resample(time="1YE") .mean() .rename("snow_isolation_yearly_mean"), ] ) # Summer / Winter aggregates winter_months = [1, 2, 3, 4, 5, 6, 7] # These do NOT correspond to calendar months, but to the shifted months summer_months = [8, 9, 10, 11, 12] monthly_shifted_winter = monthly_shifted.sel(time=monthly_shifted.time.dt.month.isin(winter_months)) monthly_shifted_summer = monthly_shifted.sel(time=monthly_shifted.time.dt.month.isin(summer_months)) winter = xr.merge( [ # Original variables monthly_shifted_winter.t2m_monthly_min.resample(time="1YE").min().rename("t2m_winter_min"), monthly_shifted_winter.t2m_monthly_max.resample(time="1YE").max().rename("t2m_winter_max"), monthly_shifted_winter.tp_monthly_sum.resample(time="1YE").sum().rename("tp_winter_sum"), monthly_shifted_winter.sf_monthly_sum.resample(time="1YE").sum().rename("sf_winter_sum"), monthly_shifted_winter.snowc_monthly_mean.resample(time="1YE").mean().rename("snowc_winter_mean"), monthly_shifted_winter.sde_monthly_mean.resample(time="1YE").mean().rename("sde_winter_mean"), monthly_shifted_winter.sshf_monthly_sum.resample(time="1YE").sum().rename("sshf_winter_sum"), monthly_shifted_winter.lblt_monthly_max.resample(time="1YE").max().rename("lblt_winter_max"), # Enriched variables monthly_shifted_winter.t2m_monthly_avg.resample(time="1YE").mean().rename("t2m_winter_avg"), # TODO: Check if this is correct -> use daily / hourly data instead for range and skew? monthly_shifted_winter.t2m_monthly_range_avg.resample(time="1YE") .mean() .rename("t2m_daily_range_winter_avg"), monthly_shifted_winter.t2m_monthly_skew_avg.resample(time="1YE").mean().rename("t2m_daily_skew_winter_avg"), monthly_shifted_winter.thawing_degree_days_monthly.resample(time="1YE") .sum() .rename("thawing_degree_days_winter"), monthly_shifted_winter.freezing_degree_days_monthly.resample(time="1YE") .sum() .rename("freezing_degree_days_winter"), monthly_shifted_winter.thawing_days_monthly.resample(time="1YE").sum().rename("thawing_days_winter"), monthly_shifted_winter.freezing_days_monthly.resample(time="1YE").sum().rename("freezing_days_winter"), monthly_shifted_winter.precipitation_occurrences_monthly.resample(time="1YE") .sum() .rename("precipitation_occurrences_winter"), monthly_shifted_winter.snowfall_occurrences_monthly.resample(time="1YE") .sum() .rename("snowfall_occurrences_winter"), monthly_shifted_winter.snow_isolation_monthly_mean.resample(time="1YE") .mean() .rename("snow_isolation_winter_mean"), ] ) summer = xr.merge( [ # Original variables monthly_shifted_summer.t2m_monthly_min.resample(time="1YE").min().rename("t2m_summer_min"), monthly_shifted_summer.t2m_monthly_max.resample(time="1YE").max().rename("t2m_summer_max"), monthly_shifted_summer.tp_monthly_sum.resample(time="1YE").sum().rename("tp_summer_sum"), monthly_shifted_summer.sf_monthly_sum.resample(time="1YE").sum().rename("sf_summer_sum"), monthly_shifted_summer.snowc_monthly_mean.resample(time="1YE").mean().rename("snowc_summer_mean"), monthly_shifted_summer.sde_monthly_mean.resample(time="1YE").mean().rename("sde_summer_mean"), monthly_shifted_summer.sshf_monthly_sum.resample(time="1YE").sum().rename("sshf_summer_sum"), monthly_shifted_summer.lblt_monthly_max.resample(time="1YE").max().rename("lblt_summer_max"), # Enriched variables monthly_shifted_summer.t2m_monthly_avg.resample(time="1YE").mean().rename("t2m_summer_avg"), # TODO: Check if this is correct -> use daily / hourly data instead for range and skew? monthly_shifted_summer.t2m_monthly_range_avg.resample(time="1YE") .mean() .rename("t2m_daily_range_summer_avg"), monthly_shifted_summer.t2m_monthly_skew_avg.resample(time="1YE").mean().rename("t2m_daily_skew_summer_avg"), monthly_shifted_summer.thawing_degree_days_monthly.resample(time="1YE") .sum() .rename("thawing_degree_days_summer"), monthly_shifted_summer.freezing_degree_days_monthly.resample(time="1YE") .sum() .rename("freezing_degree_days_summer"), monthly_shifted_summer.thawing_days_monthly.resample(time="1YE").sum().rename("thawing_days_summer"), monthly_shifted_summer.freezing_days_monthly.resample(time="1YE").sum().rename("freezing_days_summer"), monthly_shifted_summer.precipitation_occurrences_monthly.resample(time="1YE") .sum() .rename("precipitation_occurrences_summer"), monthly_shifted_summer.snowfall_occurrences_monthly.resample(time="1YE") .sum() .rename("snowfall_occurrences_summer"), monthly_shifted_summer.snow_isolation_monthly_mean.resample(time="1YE") .mean() .rename("snow_isolation_summer_mean"), ] ) combined = xr.merge([yearly, summer, winter]) combined.to_zarr(YEARLY_PATH, mode="w", encoding=create_encoding(combined), consolidated=False) @cli.command def temporal_agg(n_workers: int = 10): """Perform temporal aggregation of ERA5 data using Dask cluster. Creates a Dask cluster and runs both monthly and yearly aggregation functions to generate temporally aggregated climate datasets. The processing uses parallel workers for efficient computation. Args: n_workers (int, optional): Number of Dask workers to use. Defaults to 10. """ with ( dd.LocalCluster(n_workers=n_workers, threads_per_worker=20, memory_limit="10GB") as cluster, dd.Client(cluster) as client, ): print(client) print(client.dashboard_link) monthly_aggregate() yearly_aggregate() print("Enriched ERA5 data with additional features and aggregated it temporally.") if __name__ == "__main__": cli()