diff --git a/.gitignore b/.gitignore index 00f25b7..d670061 100755 --- a/.gitignore +++ b/.gitignore @@ -11,3 +11,6 @@ wheels/ # Data data + +# Editors +.vscode/ diff --git a/alphaearth.py b/alphaearth.py index 1ebd4cf..0a4d29b 100644 --- a/alphaearth.py +++ b/alphaearth.py @@ -1,5 +1,6 @@ """Extract satellite embeddings from Google Earth Engine and map them to a grid.""" +import os from pathlib import Path from typing import Literal @@ -16,68 +17,69 @@ pretty.install() traceback.install() ee.Initialize(project="ee-tobias-hoelzer") -DATA_DIR = Path("data") +DATA_DIR = Path(os.environ.get("DATA_DIR", "data")) / "entropyc-rts" EMBEDDINGS_DIR = DATA_DIR / "embeddings" EMBEDDINGS_DIR.mkdir(parents=True, exist_ok=True) -def cli(grid: Literal["hex", "healpix"], level: int, year: int): +def cli(grid: Literal["hex", "healpix"], level: int, backup_intermediate: bool = False): """Extract satellite embeddings from Google Earth Engine and map them to a grid. Args: grid (Literal["hex", "healpix"]): The grid type to use. level (int): The grid level to use. - year (int): The year to extract embeddings for. Must be between 2017 and 2024. + backup_intermediate (bool, optional): Whether to backup intermediate results. Defaults to False. """ gridname = f"permafrost_{grid}{level}" grid = gpd.read_parquet(DATA_DIR / f"grids/{gridname}_grid.parquet") - embedding_collection = ee.ImageCollection("GOOGLE/SATELLITE_EMBEDDING/V1/ANNUAL") - embedding_collection = embedding_collection.filterDate(f"{year}-01-01", f"{year}-12-31") - bands = [f"A{str(i).zfill(2)}" for i in range(64)] - def extract_embedding(feature): - # Filter collection by geometry - geom = feature.geometry() - embedding = embedding_collection.filterBounds(geom).mosaic() - # Get mean embedding value for the geometry - mean_dict = embedding.reduceRegion( - reducer=ee.Reducer.median(), - geometry=geom, - ) - # Add mean embedding values as properties to the feature - return feature.set(mean_dict) + for year in track(range(2022, 2025), total=3, description="Processing years..."): + embedding_collection = ee.ImageCollection("GOOGLE/SATELLITE_EMBEDDING/V1/ANNUAL") + embedding_collection = embedding_collection.filterDate(f"{year}-01-01", f"{year}-12-31") + bands = [f"A{str(i).zfill(2)}" for i in range(64)] - # Process grid in batches of 100 - batch_size = 100 - all_results = [] - n_batches = len(grid) // batch_size - for batch_num, batch_grid in track( - enumerate(np.array_split(grid, n_batches)), - description="Processing batches...", - total=n_batches, - ): - print(f"Processing batch with {len(batch_grid)} items") + def extract_embedding(feature): + # Filter collection by geometry + geom = feature.geometry() + embedding = embedding_collection.filterBounds(geom).mosaic() + # Get mean embedding value for the geometry + mean_dict = embedding.reduceRegion( + reducer=ee.Reducer.median(), + geometry=geom, + ) + # Add mean embedding values as properties to the feature + return feature.set(mean_dict) - # Convert batch to EE FeatureCollection - eegrid_batch = ee.FeatureCollection(batch_grid.to_crs("epsg:4326").__geo_interface__) + # Process grid in batches of 100 + batch_size = 100 + all_results = [] + n_batches = len(grid) // batch_size + for batch_num, batch_grid in track( + enumerate(np.array_split(grid, n_batches)), + description="Processing batches...", + total=n_batches, + ): + # Convert batch to EE FeatureCollection + eegrid_batch = ee.FeatureCollection(batch_grid.to_crs("epsg:4326").__geo_interface__) - # Apply embedding extraction to batch - eeegrid_batch = eegrid_batch.map(extract_embedding) - df_batch = geemap.ee_to_df(eeegrid_batch) + # Apply embedding extraction to batch + eeegrid_batch = eegrid_batch.map(extract_embedding) + df_batch = geemap.ee_to_df(eeegrid_batch) - # Save batch immediately to disk as backup - batch_filename = f"{gridname}_embeddings-{year}_batch{batch_num:06d}.parquet" - batch_result = batch_grid.merge(df_batch[[*bands, "cell_id"]], on="cell_id", how="left") - batch_result.to_parquet(EMBEDDINGS_DIR / f"{batch_filename}") + # Store batch results + all_results.append(df_batch) - # Store batch results - all_results.append(df_batch) + # Save batch immediately to disk as backup + if backup_intermediate: + batch_filename = f"{gridname}_embeddings-{year}_batch{batch_num:06d}.parquet" + batch_result = batch_grid.merge(df_batch[[*bands, "cell_id"]], on="cell_id", how="left") + batch_result.to_parquet(EMBEDDINGS_DIR / f"{batch_filename}") - # Combine all batch results - df = pd.concat(all_results, ignore_index=True) - embeddings_on_grid = grid.merge(df[[*bands, "cell_id"]], on="cell_id", how="left") - embeddings_on_grid.to_parquet(EMBEDDINGS_DIR / f"{gridname}_embeddings-{year}.parquet") + # Combine all batch results + df = pd.concat(all_results, ignore_index=True) + embeddings_on_grid = grid.merge(df[[*bands, "cell_id"]], on="cell_id", how="left") + embeddings_on_grid.to_parquet(EMBEDDINGS_DIR / f"{gridname}_embeddings-{year}.parquet") if __name__ == "__main__": diff --git a/cds.py b/cds.py index adad49f..4ed77b2 100644 --- a/cds.py +++ b/cds.py @@ -3,6 +3,7 @@ Web platform: https://cds.climate.copernicus.eu """ +import os import re from pathlib import Path @@ -13,6 +14,8 @@ from rich import pretty, print, traceback traceback.install() pretty.install() +DATA_DIR = Path(os.environ.get("DATA_DIR", "data")) / "entropyc-rts" + def hourly(years: str): """Download ERA5 data from the Copernicus Data Store. @@ -28,7 +31,7 @@ def hourly(years: str): dataset = "reanalysis-era5-single-levels" client = cdsapi.Client(wait_until_complete=False) - outdir = Path("/isipd/projects/p_aicore_pf/tohoel001/era5-cds").resolve() + outdir = (DATA_DIR / "era5/cds").resolve() outdir.mkdir(parents=True, exist_ok=True) print(f"Downloading ERA5 data from {start_year} to {end_year}...") diff --git a/era5.py b/era5.py index d4b421e..c0deebc 100644 --- a/era5.py +++ b/era5.py @@ -1,23 +1,70 @@ """Download and preprocess ERA5 data. Variables of Interest: -- 2 metre temperature (t2m) -- Total precipitation (tp) -- Snow Fall (sf) -- Snow cover (snowc) -- Snow depth (sde) -- Surface sensible heat flux (sshf) -- Lake ice bottom temperature (lblt) +- 2 metre temperature (t2m) [instant] +- Total precipitation (tp) [accum] +- Snow Fall (sf) [accum] +- Snow cover (snowc) [instant] +- Snow depth (sde) [instant] +- Surface sensible heat flux (sshf) [accum] +- Lake ice bottom temperature (lblt) [instant] -Aggregations: -- Summer / Winter 20-bin histogram? +Daily Variables (downloaded from hourly data): +- t2m_daily_max +- t2m_daily_min +- tp_daily_sum +- sf_daily_sum +- snowc_daily_mean +- sde_daily_mean +- sshf_daily_sum +- lblt_daily_max -Spatial -> Enrich -> Temporal ? +Derived Daily Variables: +- t2m_daily_avg +- t2m_daily_range +- t2m_daily_skew +- thawing_degree_days +- freezing_degree_days +- thawing_days +- freezing_days +- precipitation_occurrences +- snowfall_occurrences +- snow_isolation (snowc * sde) + +Monthly Variables: +- t2m_monthly_max +- t2m_monthly_min +- tp_monthly_sum +- sf_monthly_sum +- snowc_monthly_mean +- sde_monthly_mean +- sshf_monthly_sum +- lblt_monthly_max +- t2m_monthly_avg +- t2m_monthly_range_avg +- t2m_monthly_skew_avg +- thawing_degree_days_monthly +- freezing_degree_days_monthly +- thawing_days_monthly +- freezing_days_monthly +- precipitation_occurrences_monthly TODO: Rename to precipitation_days_monthly? +- snowfall_occurrences_monthly TODO: Rename to snowfall_days_monthly? +- snow_isolation_monthly_mean + +Yearly Variables: +- TODO + +# TODO Variables: +- Day of first thaw (yearly) +- Day of last thaw (yearly) +- Thawing period length (yearly) +- Freezing period length (yearly) Author: Tobias Hölzer Date: 09. June 2025 """ +import os import time from concurrent.futures import ThreadPoolExecutor, as_completed from pathlib import Path @@ -34,24 +81,29 @@ import shapely.ops import xarray as xr from numcodecs.zarr3 import Blosc from rich import pretty, print, traceback +from rich.progress import track from shapely.geometry import LineString, Polygon -traceback.install(show_locals=True) +traceback.install(show_locals=True, suppress=[cyclopts, xr, pd]) pretty.install() -DATA_DIR = Path("data/era5") -AGG_PATH = DATA_DIR / "era5_agg.zarr" -ALIGNED_PATH = DATA_DIR / "era5_spatial_aligned.zarr" -MONTHLY_PATH = DATA_DIR / "era5_monthly.zarr" -YEARLY_PATH = DATA_DIR / "era5_yearly.zarr" +cli = cyclopts.App() + +# TODO: Directly handle stuff on a grid level - this is more what the zarr access is indented to do + +DATA_DIR = Path(os.environ.get("DATA_DIR", "data")) / "entropyc-rts" +# DATA_DIR = Path("data") +ERA5_DIR = DATA_DIR / "era5" +AGG_PATH = ERA5_DIR / "era5_agg.zarr" +ALIGNED_PATH = ERA5_DIR / "era5_spatial_aligned.zarr" +MONTHLY_PATH = ERA5_DIR / "era5_monthly.zarr" +YEARLY_PATH = ERA5_DIR / "era5_yearly.zarr" min_lat = 50 -max_lat = 85 -min_time = "2022-01-01" +max_lat = 83.7 # Ensures the right Chunks Size (90 - 64 / 10 + 0.1) +min_time = "1990-01-01" max_time = "2024-12-31" -subset = {"latitude": slice(max_lat, min_lat), "time": slice(min_time, max_time)} -DATA_DIR = Path("/isipd/projects/p_aicore_pf/tohoel001/era5_thawing_data") today = time.strftime("%Y-%m-%d") @@ -63,20 +115,67 @@ today = time.strftime("%Y-%m-%d") # Enrich -> Aggregate temporally +# TODO: Rethink aggregations by differentiating between "instant" and "accum" variables: +# https://consensus.app/search/instantaneous-versus-accumulated-weather/JBaNbhc1R_-BwN5E9Un0Fw/ + +# ================ +# === Download === +# ================ + + def create_encoding(ds: xr.Dataset): + """Create compression encoding for zarr dataset storage. + + Creates Blosc compression configuration for all data variables and coordinates + in the dataset using zstd compression with level 9. + + Args: + ds (xr.Dataset): The xarray Dataset to create encoding for. + + Returns: + dict: Encoding dictionary with compression settings for each variable. + + """ # encoding = {var: {"compressors": BloscCodec(cname="zlib", clevel=9)} for var in ds.data_vars} encoding = {var: {"compressors": Blosc(cname="zstd", clevel=9)} for var in [*ds.data_vars, *ds.coords]} return encoding def download_daily_aggregated(): + """Download and aggregate ERA5 data to daily resolution. + + Downloads ERA5 reanalysis data from the DESTINE Earth Data Hub and aggregates + it to daily resolution. Includes temperature extremes, precipitation, snow, + and surface heat flux variables. + + The function downloads hourly data and creates daily aggregates: + - Temperature: daily min/max + - Precipitation and snowfall: daily totals + - Snow cover and depth: daily means + - Surface heat flux: daily totals + - Lake ice temperature: daily max + + The aggregated data is saved to a zarr file with compression. + """ era5 = xr.open_dataset( "https://data.earthdatahub.destine.eu/era5/reanalysis-era5-land-no-antartica-v0.zarr", storage_options={"client_kwargs": {"trust_env": True}}, - chunks={"latitude": 64 * 4, "longitude": 64 * 4}, + chunks={}, # chunks={}, engine="zarr", ).rename({"valid_time": "time"}) + subset = { + "latitude": slice(max_lat, min_lat), + } + + # Compute the clostest chunk-start to min_time, to avoid problems with cropped chunks at the start + tchunksize = era5.chunksizes["time"][0] + era5_chunk_starts = pd.date_range(era5.time.min().item(), era5.time.max().item(), freq=f"{tchunksize}h") + closest_chunk_start = era5_chunk_starts[ + era5_chunk_starts.get_indexer([pd.to_datetime(min_time)], method="ffill")[0] + ] + subset["time"] = slice(str(closest_chunk_start), max_time) + era5 = era5.sel(**subset) era5_agg = xr.merge( @@ -84,38 +183,59 @@ def download_daily_aggregated(): era5.t2m.resample(time="1D").max().rename("t2m_daily_max"), era5.t2m.resample(time="1D").min().rename("t2m_daily_min"), era5.tp.resample(time="1D").sum().rename("tp_daily_sum"), - # era5.sf.resample(time="1D").sum().rename("sf_daily_sum"), - # era5.snowc.resample(time="1D").mean().rename("snowc_daily_mean"), - # era5.sde.resample(time="1D").mean().rename("sde_daily_mean"), - # era5.sshf.resample(time="1D").sum().rename("sshf_daily_sum"), - # era5.lblt.resample(time="1D").max().rename("lblt_daily_max"), + era5.sf.resample(time="1D").sum().rename("sf_daily_sum"), + era5.snowc.resample(time="1D").mean().rename("snowc_daily_mean"), + era5.sde.resample(time="1D").mean().rename("sde_daily_mean"), + era5.sshf.resample(time="1D").sum().rename("sshf_daily_sum"), + era5.lblt.resample(time="1D").max().rename("lblt_daily_max"), ] ) - # Rechunk if the first time chunk is not the same as the middle ones - if era5_agg.chunksizes["time"][0] != era5_agg.chunksizes["time"][1]: - era5_agg = era5_agg.chunk({"time": 120}) - # Assign attributes era5_agg["t2m_daily_max"].attrs = {"long_name": "Daily maximum 2 metre temperature", "units": "K"} era5_agg["t2m_daily_min"].attrs = {"long_name": "Daily minimum 2 metre temperature", "units": "K"} era5_agg["tp_daily_sum"].attrs = {"long_name": "Daily total precipitation", "units": "m"} - # era5_agg["sf_daily_sum"].attrs = {"long_name": "Daily total snow fall", "units": "m"} - # era5_agg["snowc_daily_mean"].attrs = {"long_name": "Daily mean snow cover", "units": "m"} - # era5_agg["sde_daily_mean"].attrs = {"long_name": "Daily mean snow depth", "units": "m"} - # era5_agg["sshf_daily_sum"].attrs = {"long_name": "Daily total surface sensible heat flux", "units": "J/m²"} - # era5_agg["lblt_daily_max"].attrs = {"long_name": "Daily maximum lake ice bottom temperature", "units": "K"} + era5_agg["sf_daily_sum"].attrs = {"long_name": "Daily total snow fall", "units": "m"} + era5_agg["snowc_daily_mean"].attrs = {"long_name": "Daily mean snow cover", "units": "m"} + era5_agg["sde_daily_mean"].attrs = {"long_name": "Daily mean snow depth", "units": "m"} + era5_agg["sshf_daily_sum"].attrs = {"long_name": "Daily total surface sensible heat flux", "units": "J/m²"} + era5_agg["lblt_daily_max"].attrs = {"long_name": "Daily maximum lake ice bottom temperature", "units": "K"} + era5_agg = era5_agg.odc.assign_crs("epsg:4326") + era5_agg = era5_agg.drop_vars(["surface", "number", "depthBelowLandLayer"]) era5_agg.to_zarr(AGG_PATH, mode="w", encoding=create_encoding(era5_agg), consolidated=False) -def crosses_antimeridian(geom: Polygon) -> bool: +@cli.command +def download(): + """Download ERA5 data using Dask cluster for parallel processing. + + Creates a local Dask cluster and downloads daily aggregated ERA5 data. + The cluster is configured with a single worker with 10 threads and 100GB + memory limit for optimal performance. + """ + with ( + dd.LocalCluster(n_workers=1, threads_per_worker=10, memory_limit="100GB") as cluster, + dd.Client(cluster) as client, + ): + print(client) + print(client.dashboard_link) + download_daily_aggregated() + print("Downloaded and aggregated ERA5 data.") + + +# =========================== +# === Spatial Aggregation === +# =========================== + + +def _crosses_antimeridian(geom: Polygon) -> bool: coords = shapely.get_coordinates(geom) crosses_any_meridian = (coords[:, 0] > 0).any() and (coords[:, 0] < 0).any() return crosses_any_meridian and abs(coords[:, 0]).max() > 90 -def split_antimeridian_cell(geom: Polygon) -> list[Polygon]: +def _split_antimeridian_cell(geom: Polygon) -> list[Polygon]: # Assumes that it is a antimeridian hex coords = shapely.get_coordinates(geom) for i in range(coords.shape[0]): @@ -127,53 +247,134 @@ def split_antimeridian_cell(geom: Polygon) -> list[Polygon]: return list(polys.geoms) -def check_geobox(geobox): +def _check_geobox(geobox): x, y = geobox.shape return x > 1 and y > 1 def extract_cell_data(idx: int, geom: Polygon) -> xr.Dataset: - era5_agg = xr.open_zarr(AGG_PATH) - assert {"latitude", "longitude", "time"} == set(era5_agg.dims), ( - f"Expected dims ('latitude', 'longitude', 'time'), got {era5_agg.dims}" + """Extract ERA5 data for a specific grid cell geometry. + + Extracts and spatially averages ERA5 data within the bounds of a grid cell. + Handles antimeridian-crossing cells by splitting them appropriately. + The extracted data is written to the aligned zarr file. + + Args: + idx (int): Index of the grid cell. + geom (Polygon): Polygon geometry of the grid cell. + + Returns: + xr.Dataset or bool: Returns True if successful, False if cell doesn't + overlap with ERA5 data. + + """ + era5_agg = ( + xr.open_zarr(AGG_PATH, consolidated=False) + .set_coords("spatial_ref") + .drop_vars(["surface", "number", "depthBelowLandLayer"]) ) # cell.geometry is a shapely Polygon - if not crosses_antimeridian(geom): + if not _crosses_antimeridian(geom): geoms = [geom] # Split geometry in case it crossed antimeridian else: - geoms = split_antimeridian_cell(geom) + geoms = _split_antimeridian_cell(geom) cell_data = [] for geom in geoms: geom = odc.geo.Geometry(geom, crs="epsg:4326") - if not check_geobox(era5_agg.odc.geobox.enclosing(geom)): + if not _check_geobox(era5_agg.odc.geobox.enclosing(geom)): continue cell_data.append(era5_agg.odc.crop(geom).drop_vars("spatial_ref").mean(["latitude", "longitude"])) if len(cell_data) == 0: - return None + return False elif len(cell_data) == 1: - return cell_data[0].expand_dims({"cell": [idx]}).chunk({"cell": 1}) + cell_data = cell_data[0] else: - return xr.concat(cell_data, dim="part").mean("part").expand_dims({"cell": [idx]}).chunk({"cell": 1}) + cell_data = xr.concat(cell_data, dim="part").mean("part") + cell_data = cell_data.expand_dims({"cell": [idx]}).compute() + cell_data.to_zarr(ALIGNED_PATH, region="auto", consolidated=False) + return True -def spatial_matching(grid: gpd.GeoDataFrame, n_workers: int = 10): +@cli.command +def spatial_agg(grid: Literal["hex", "healpix"], level: int, n_workers: int = 10): + """Perform spatial aggregation of ERA5 data to grid cells. + + Loads a grid and spatially aggregates ERA5 data to each grid cell using + parallel processing. Creates an empty zarr file first, then fills it + with extracted data for each cell. + + Args: + grid ("hex", "healpix"): Grid type. + level (int): Grid resolution level. + n_workers (int, optional): Number of parallel workers to use. Defaults to 10. + + """ + gridname = f"permafrost_{grid}{level}" + grid = gpd.read_parquet(DATA_DIR / f"grids/{gridname}_grid.parquet") + # Create an empty zarr array with the right dimensions + era5_agg = ( + xr.open_zarr(AGG_PATH, consolidated=False) + .set_coords("spatial_ref") + .drop_vars(["surface", "number", "depthBelowLandLayer"]) + ) + assert {"latitude", "longitude", "time"} == set(era5_agg.dims), ( + f"Expected dims ('latitude', 'longitude', 'time'), got {era5_agg.dims}" + ) + assert era5_agg.odc.crs == "epsg:4326", f"Expected CRS 'epsg:4326', got {era5_agg.odc.crs}" + empty = ( + xr.zeros_like(era5_agg.isel(latitude=0, longitude=0)) + .expand_dims({"cell": [idx for idx, _ in grid.iterrows()]}) + .chunk({"cell": 1, "time": len(era5_agg.time)}) + ) + empty.to_zarr(ALIGNED_PATH, mode="w", consolidated=False, encoding=create_encoding(empty)) + + print(f"Starting spatial matching of {len(grid)} cells with {n_workers} workers...") + + # TODO: Maybe change to process pool executor? with ThreadPoolExecutor(max_workers=n_workers) as executor: futures = { executor.submit(extract_cell_data, idx, row.geometry): idx for idx, row in grid.to_crs("epsg:4326").iterrows() } - for future in as_completed(futures): + for future in track(as_completed(futures), total=len(futures), description="Processing cells"): idx = futures[future] try: - data = future.result() - data.to_zarr(ALIGNED_PATH, append_dim="cell", consolidated=False, encoding=create_encoding(data)) + flag = future.result() + if flag: + print(f"Successfully written cell {idx}") + else: + print(f"Cell {idx} did not overlap with ERA5 data.") except Exception as e: print(f"Error processing cell {idx}: {e}") + print(type(e)) + print("Finished spatial matching.") + + +# ============================ +# === Temporal Aggregation === +# ============================ def daily_enrich() -> xr.Dataset: - era5 = xr.open_zarr(ALIGNED_PATH) + """Enrich daily ERA5 data with derived climate variables. + + Loads spatially aligned ERA5 data and computes additional climate variables. + Creates derived variables including temperature statistics, degree days, and occurrence indicators. + + Derived variables include: + - Daily average and range temperature + - Temperature skewness + - Thawing and freezing degree days + - Thawing and freezing day counts + - Precipitation and snowfall occurrences + - Snow isolation index + + Returns: + xr.Dataset: Enriched dataset with original and derived variables. + + """ + era5 = xr.open_zarr(ALIGNED_PATH, consolidated=False).set_coords("spatial_ref") assert {"cell", "time"} == set(era5.dims), f"Expected dims ('cell', 'time'), got {era5.dims}" # Formulas based on Groeke et. al. (2025) Stochastic Weather generation... @@ -206,6 +407,15 @@ def daily_enrich() -> xr.Dataset: def monthly_aggregate(): + """Aggregate enriched daily ERA5 data to monthly resolution. + + Takes the enriched daily ERA5 data and creates monthly aggregates using + appropriate statistical functions for each variable type. Temperature + variables use min/max/mean, accumulation variables use sums, and derived + variables use appropriate aggregations. + + The aggregated monthly data is saved to a zarr file for further processing. + """ era5 = daily_enrich() assert {"cell", "time"} == set(era5.dims), f"Expected dims ('cell', 'time'), got {era5.dims}" @@ -213,32 +423,46 @@ def monthly_aggregate(): monthly = xr.merge( [ # Original variables - era5.t2m_daily_min.resample(time="1M").min().rename("t2m_monthly_min"), - era5.t2m_daily_max.resample(time="1M").max().rename("t2m_monthly_max"), - era5.tp_daily_sum.resample(time="1M").sum().rename("tp_monthly_sum"), - era5.sf_daily_sum.resample(time="1M").sum().rename("sf_monthly_sum"), - era5.snowc_daily_mean.resample(time="1M").mean().rename("snowc_monthly_mean"), - era5.sde_daily_mean.resample(time="1M").mean().rename("sde_monthly_mean"), - era5.sshf_daily_sum.resample(time="1M").sum().rename("sshf_monthly_sum"), - era5.lblt_daily_max.resample(time="1M").max().rename("lblt_monthly_max"), + era5.t2m_daily_min.resample(time="1ME").min().rename("t2m_monthly_min"), + era5.t2m_daily_max.resample(time="1ME").max().rename("t2m_monthly_max"), + era5.tp_daily_sum.resample(time="1ME").sum().rename("tp_monthly_sum"), + era5.sf_daily_sum.resample(time="1ME").sum().rename("sf_monthly_sum"), + era5.snowc_daily_mean.resample(time="1ME").mean().rename("snowc_monthly_mean"), + era5.sde_daily_mean.resample(time="1ME").mean().rename("sde_monthly_mean"), + era5.sshf_daily_sum.resample(time="1ME").sum().rename("sshf_monthly_sum"), + era5.lblt_daily_max.resample(time="1ME").max().rename("lblt_monthly_max"), # Enriched variables - era5.t2m_daily_avg.resample(time="1M").mean().rename("t2m_monthly_avg"), - era5.t2m_daily_range.resample(time="1M").mean().rename("t2m_daily_range_monthly_avg"), - era5.t2m_daily_skew.resample(time="1M").mean().rename("t2m_daily_skew_monthly_avg"), - era5.thawing_degree_days.resample(time="1M").sum().rename("thawing_degree_days_monthly"), - era5.freezing_degree_days.resample(time="1M").sum().rename("freezing_degree_days_monthly"), - era5.thawing_days.resample(time="1M").sum().rename("thawing_days_monthly"), - era5.freezing_days.resample(time="1M").sum().rename("freezing_days_monthly"), - era5.precipitation_occurrences.resample(time="1M").sum().rename("precipitation_occurrences_monthly"), - era5.snowfall_occurrences.resample(time="1M").sum().rename("snowfall_occurrences_monthly"), - era5.snow_isolation.resample(time="1M").mean().rename("snow_isolation_monthly_mean"), + era5.t2m_daily_avg.resample(time="1ME").mean().rename("t2m_monthly_avg"), + era5.t2m_daily_range.resample(time="1ME").mean().rename("t2m_monthly_range_avg"), + era5.t2m_daily_skew.resample(time="1ME").mean().rename("t2m_monthly_skew_avg"), + era5.thawing_degree_days.resample(time="1ME").sum().rename("thawing_degree_days_monthly"), + era5.freezing_degree_days.resample(time="1ME").sum().rename("freezing_degree_days_monthly"), + era5.thawing_days.resample(time="1ME").sum().rename("thawing_days_monthly"), + era5.freezing_days.resample(time="1ME").sum().rename("freezing_days_monthly"), + era5.precipitation_occurrences.resample(time="1ME").sum().rename("precipitation_occurrences_monthly"), + era5.snowfall_occurrences.resample(time="1ME").sum().rename("snowfall_occurrences_monthly"), + era5.snow_isolation.resample(time="1ME").mean().rename("snow_isolation_monthly_mean"), ] ) monthly.to_zarr(MONTHLY_PATH, mode="w", encoding=create_encoding(monthly), consolidated=False) def yearly_aggregate(): - monthly = xr.open_zarr(MONTHLY_PATH) + """Aggregate monthly ERA5 data to yearly resolution with seasonal splits. + + Takes monthly aggregated data and creates yearly aggregates using a shifted + calendar (October to September) to better capture Arctic seasonal patterns. + Creates separate aggregates for full year, winter (Oct-Apr), and summer + (May-Sep) periods. + + The first and last incomplete years are excluded from the analysis. + Winter months are defined as months 1-7 in the shifted calendar, + and summer months are 8-12. + + The final dataset includes yearly, winter, and summer aggregates for all + climate variables, saved to a zarr file. + """ + monthly = xr.open_zarr(MONTHLY_PATH, consolidated=False).set_coords("spatial_ref") assert {"cell", "time"} == set(monthly.dims), f"Expected dims ('cell', 'time'), got {monthly.dims}" # Yearly aggregates (shifted by +10 months to start in Oktober, first and last years will be cropped) @@ -249,32 +473,34 @@ def yearly_aggregate(): yearly = xr.merge( [ # Original variables - monthly_shifted.t2m_monthly_min.resample(time="1Y").min().rename("t2m_yearly_min"), - monthly_shifted.t2m_monthly_max.resample(time="1Y").max().rename("t2m_yearly_max"), - monthly_shifted.tp_monthly_sum.resample(time="1Y").sum().rename("tp_yearly_sum"), - monthly_shifted.sf_monthly_sum.resample(time="1Y").sum().rename("sf_yearly_sum"), - monthly_shifted.snowc_monthly_mean.resample(time="1Y").mean().rename("snowc_yearly_mean"), - monthly_shifted.sde_monthly_mean.resample(time="1Y").mean().rename("sde_yearly_mean"), - monthly_shifted.sshf_monthly_sum.resample(time="1Y").sum().rename("sshf_yearly_sum"), - monthly_shifted.lblt_monthly_max.resample(time="1Y").max().rename("lblt_yearly_max"), + monthly_shifted.t2m_monthly_min.resample(time="1YE").min().rename("t2m_yearly_min"), + monthly_shifted.t2m_monthly_max.resample(time="1YE").max().rename("t2m_yearly_max"), + monthly_shifted.tp_monthly_sum.resample(time="1YE").sum().rename("tp_yearly_sum"), + monthly_shifted.sf_monthly_sum.resample(time="1YE").sum().rename("sf_yearly_sum"), + monthly_shifted.snowc_monthly_mean.resample(time="1YE").mean().rename("snowc_yearly_mean"), + monthly_shifted.sde_monthly_mean.resample(time="1YE").mean().rename("sde_yearly_mean"), + monthly_shifted.sshf_monthly_sum.resample(time="1YE").sum().rename("sshf_yearly_sum"), + monthly_shifted.lblt_monthly_max.resample(time="1YE").max().rename("lblt_yearly_max"), # Enriched variables - monthly_shifted.t2m_monthly_avg.resample(time="1Y").mean().rename("t2m_yearly_avg"), + monthly_shifted.t2m_monthly_avg.resample(time="1YE").mean().rename("t2m_yearly_avg"), # TODO: Check if this is correct -> use daily / hourly data instead for range and skew? - monthly_shifted.t2m_monthly_range.resample(time="1Y").mean().rename("t2m_daily_range_yearly_avg"), - monthly_shifted.t2m_monthly_skew.resample(time="1Y").mean().rename("t2m_daily_skew_yearly_avg"), - monthly_shifted.thawing_degree_days_monthly.resample(time="1Y").sum().rename("thawing_degree_days_yearly"), - monthly_shifted.freezing_degree_days_monthly.resample(time="1Y") + monthly_shifted.t2m_monthly_range_avg.resample(time="1YE").mean().rename("t2m_daily_range_yearly_avg"), + monthly_shifted.t2m_monthly_skew_avg.resample(time="1YE").mean().rename("t2m_daily_skew_yearly_avg"), + monthly_shifted.thawing_degree_days_monthly.resample(time="1YE").sum().rename("thawing_degree_days_yearly"), + monthly_shifted.freezing_degree_days_monthly.resample(time="1YE") .sum() .rename("freezing_degree_days_yearly"), - monthly_shifted.thawing_days_monthly.resample(time="1Y").sum().rename("thawing_days_yearly"), - monthly_shifted.freezing_days_monthly.resample(time="1Y").sum().rename("freezing_days_yearly"), - monthly_shifted.precipitation_occurrences_monthly.resample(time="1Y") + monthly_shifted.thawing_days_monthly.resample(time="1YE").sum().rename("thawing_days_yearly"), + monthly_shifted.freezing_days_monthly.resample(time="1YE").sum().rename("freezing_days_yearly"), + monthly_shifted.precipitation_occurrences_monthly.resample(time="1YE") .sum() .rename("precipitation_occurrences_yearly"), - monthly_shifted.snowfall_occurrences_monthly.resample(time="1Y") + monthly_shifted.snowfall_occurrences_monthly.resample(time="1YE") .sum() .rename("snowfall_occurrences_yearly"), - monthly_shifted.snow_isolation_monthly_mean.resample(time="1Y").mean().rename("snow_isolation_yearly_mean"), + monthly_shifted.snow_isolation_monthly_mean.resample(time="1YE") + .mean() + .rename("snow_isolation_yearly_mean"), ] ) # Summer / Winter aggregates @@ -286,34 +512,36 @@ def yearly_aggregate(): winter = xr.merge( [ # Original variables - monthly_shifted_winter.t2m_monthly_min.resample(time="1Y").min().rename("t2m_winter_min"), - monthly_shifted_winter.t2m_monthly_max.resample(time="1Y").max().rename("t2m_winter_max"), - monthly_shifted_winter.tp_monthly_sum.resample(time="1Y").sum().rename("tp_winter_sum"), - monthly_shifted_winter.sf_monthly_sum.resample(time="1Y").sum().rename("sf_winter_sum"), - monthly_shifted_winter.snowc_monthly_mean.resample(time="1Y").mean().rename("snowc_winter_mean"), - monthly_shifted_winter.sde_monthly_mean.resample(time="1Y").mean().rename("sde_winter_mean"), - monthly_shifted_winter.sshf_monthly_sum.resample(time="1Y").sum().rename("sshf_winter_sum"), - monthly_shifted_winter.lblt_monthly_max.resample(time="1Y").max().rename("lblt_winter_max"), + monthly_shifted_winter.t2m_monthly_min.resample(time="1YE").min().rename("t2m_winter_min"), + monthly_shifted_winter.t2m_monthly_max.resample(time="1YE").max().rename("t2m_winter_max"), + monthly_shifted_winter.tp_monthly_sum.resample(time="1YE").sum().rename("tp_winter_sum"), + monthly_shifted_winter.sf_monthly_sum.resample(time="1YE").sum().rename("sf_winter_sum"), + monthly_shifted_winter.snowc_monthly_mean.resample(time="1YE").mean().rename("snowc_winter_mean"), + monthly_shifted_winter.sde_monthly_mean.resample(time="1YE").mean().rename("sde_winter_mean"), + monthly_shifted_winter.sshf_monthly_sum.resample(time="1YE").sum().rename("sshf_winter_sum"), + monthly_shifted_winter.lblt_monthly_max.resample(time="1YE").max().rename("lblt_winter_max"), # Enriched variables - monthly_shifted_winter.t2m_monthly_avg.resample(time="1Y").mean().rename("t2m_winter_avg"), + monthly_shifted_winter.t2m_monthly_avg.resample(time="1YE").mean().rename("t2m_winter_avg"), # TODO: Check if this is correct -> use daily / hourly data instead for range and skew? - monthly_shifted_winter.t2m_monthly_range.resample(time="1Y").mean().rename("t2m_daily_range_winter_avg"), - monthly_shifted_winter.t2m_monthly_skew.resample(time="1Y").mean().rename("t2m_daily_skew_winter_avg"), - monthly_shifted_winter.thawing_degree_days_monthly.resample(time="1Y") + monthly_shifted_winter.t2m_monthly_range_avg.resample(time="1YE") + .mean() + .rename("t2m_daily_range_winter_avg"), + monthly_shifted_winter.t2m_monthly_skew_avg.resample(time="1YE").mean().rename("t2m_daily_skew_winter_avg"), + monthly_shifted_winter.thawing_degree_days_monthly.resample(time="1YE") .sum() .rename("thawing_degree_days_winter"), - monthly_shifted_winter.freezing_degree_days_monthly.resample(time="1Y") + monthly_shifted_winter.freezing_degree_days_monthly.resample(time="1YE") .sum() .rename("freezing_degree_days_winter"), - monthly_shifted_winter.thawing_days_monthly.resample(time="1Y").sum().rename("thawing_days_winter"), - monthly_shifted_winter.freezing_days_monthly.resample(time="1Y").sum().rename("freezing_days_winter"), - monthly_shifted_winter.precipitation_occurrences_monthly.resample(time="1Y") + monthly_shifted_winter.thawing_days_monthly.resample(time="1YE").sum().rename("thawing_days_winter"), + monthly_shifted_winter.freezing_days_monthly.resample(time="1YE").sum().rename("freezing_days_winter"), + monthly_shifted_winter.precipitation_occurrences_monthly.resample(time="1YE") .sum() .rename("precipitation_occurrences_winter"), - monthly_shifted_winter.snowfall_occurrences_monthly.resample(time="1Y") + monthly_shifted_winter.snowfall_occurrences_monthly.resample(time="1YE") .sum() .rename("snowfall_occurrences_winter"), - monthly_shifted_winter.snow_isolation_monthly_mean.resample(time="1Y") + monthly_shifted_winter.snow_isolation_monthly_mean.resample(time="1YE") .mean() .rename("snow_isolation_winter_mean"), ] @@ -322,34 +550,36 @@ def yearly_aggregate(): summer = xr.merge( [ # Original variables - monthly_shifted_summer.t2m_monthly_min.resample(time="1Y").min().rename("t2m_summer_min"), - monthly_shifted_summer.t2m_monthly_max.resample(time="1Y").max().rename("t2m_summer_max"), - monthly_shifted_summer.tp_monthly_sum.resample(time="1Y").sum().rename("tp_summer_sum"), - monthly_shifted_summer.sf_monthly_sum.resample(time="1Y").sum().rename("sf_summer_sum"), - monthly_shifted_summer.snowc_monthly_mean.resample(time="1Y").mean().rename("snowc_summer_mean"), - monthly_shifted_summer.sde_monthly_mean.resample(time="1Y").mean().rename("sde_summer_mean"), - monthly_shifted_summer.sshf_monthly_sum.resample(time="1Y").sum().rename("sshf_summer_sum"), - monthly_shifted_summer.lblt_monthly_max.resample(time="1Y").max().rename("lblt_summer_max"), + monthly_shifted_summer.t2m_monthly_min.resample(time="1YE").min().rename("t2m_summer_min"), + monthly_shifted_summer.t2m_monthly_max.resample(time="1YE").max().rename("t2m_summer_max"), + monthly_shifted_summer.tp_monthly_sum.resample(time="1YE").sum().rename("tp_summer_sum"), + monthly_shifted_summer.sf_monthly_sum.resample(time="1YE").sum().rename("sf_summer_sum"), + monthly_shifted_summer.snowc_monthly_mean.resample(time="1YE").mean().rename("snowc_summer_mean"), + monthly_shifted_summer.sde_monthly_mean.resample(time="1YE").mean().rename("sde_summer_mean"), + monthly_shifted_summer.sshf_monthly_sum.resample(time="1YE").sum().rename("sshf_summer_sum"), + monthly_shifted_summer.lblt_monthly_max.resample(time="1YE").max().rename("lblt_summer_max"), # Enriched variables - monthly_shifted_summer.t2m_monthly_avg.resample(time="1Y").mean().rename("t2m_summer_avg"), + monthly_shifted_summer.t2m_monthly_avg.resample(time="1YE").mean().rename("t2m_summer_avg"), # TODO: Check if this is correct -> use daily / hourly data instead for range and skew? - monthly_shifted_summer.t2m_monthly_range.resample(time="1Y").mean().rename("t2m_daily_range_summer_avg"), - monthly_shifted_summer.t2m_monthly_skew.resample(time="1Y").mean().rename("t2m_daily_skew_summer_avg"), - monthly_shifted_summer.thawing_degree_days_summer.resample(time="1Y") + monthly_shifted_summer.t2m_monthly_range_avg.resample(time="1YE") + .mean() + .rename("t2m_daily_range_summer_avg"), + monthly_shifted_summer.t2m_monthly_skew_avg.resample(time="1YE").mean().rename("t2m_daily_skew_summer_avg"), + monthly_shifted_summer.thawing_degree_days_monthly.resample(time="1YE") .sum() .rename("thawing_degree_days_summer"), - monthly_shifted_summer.freezing_degree_days_summer.resample(time="1Y") + monthly_shifted_summer.freezing_degree_days_monthly.resample(time="1YE") .sum() .rename("freezing_degree_days_summer"), - monthly_shifted_summer.thawing_days_summer.resample(time="1Y").sum().rename("thawing_days_summer"), - monthly_shifted_summer.freezing_days_summer.resample(time="1Y").sum().rename("freezing_days_summer"), - monthly_shifted_summer.precipitation_occurrences_summer.resample(time="1Y") + monthly_shifted_summer.thawing_days_monthly.resample(time="1YE").sum().rename("thawing_days_summer"), + monthly_shifted_summer.freezing_days_monthly.resample(time="1YE").sum().rename("freezing_days_summer"), + monthly_shifted_summer.precipitation_occurrences_monthly.resample(time="1YE") .sum() .rename("precipitation_occurrences_summer"), - monthly_shifted_summer.snowfall_occurrences_summer.resample(time="1Y") + monthly_shifted_summer.snowfall_occurrences_monthly.resample(time="1YE") .sum() .rename("snowfall_occurrences_summer"), - monthly_shifted_summer.snow_isolation_summer.resample(time="1Y") + monthly_shifted_summer.snow_isolation_monthly_mean.resample(time="1YE") .mean() .rename("snow_isolation_summer_mean"), ] @@ -359,32 +589,28 @@ def yearly_aggregate(): combined.to_zarr(YEARLY_PATH, mode="w", encoding=create_encoding(combined), consolidated=False) -def cli(grid: Literal["hex", "healpix"], level: int, download: bool = False, n_workers: int = 10): - """Run the CLI for ERA5 data processing. +@cli.command +def temporal_agg(n_workers: int = 10): + """Perform temporal aggregation of ERA5 data using Dask cluster. + + Creates a Dask cluster and runs both monthly and yearly aggregation + functions to generate temporally aggregated climate datasets. The + processing uses parallel workers for efficient computation. Args: - grid (Literal["hex", "healpix"]): The grid type to use. - level (int): The processing level. - download (bool, optional): Whether to download data. Defaults to False. - n_workers (int, optional): Number of workers for parallel processing. Defaults to 10. + n_workers (int, optional): Number of Dask workers to use. Defaults to 10. """ - cluster = dd.LocalCluster(n_workers=n_workers, threads_per_worker=4, memory_limit="20GB") - client = dd.Client(cluster) - print(client) - print(client.dashboard_link) - - if download: - download_daily_aggregated() - print("Downloaded and aggregated ERA5 data.") - - grid = gpd.read_parquet(DATA_DIR / f"grids/permafrost_{grid}{level}_grid.parquet") - spatial_matching(grid, n_workers=n_workers) - print("Spatially matched ERA5 data to grid.") - monthly_aggregate() - yearly_aggregate() - print("Enriched ERA5 data with additional features and aggregated it temporally.") + with ( + dd.LocalCluster(n_workers=n_workers, threads_per_worker=20, memory_limit="10GB") as cluster, + dd.Client(cluster) as client, + ): + print(client) + print(client.dashboard_link) + monthly_aggregate() + yearly_aggregate() + print("Enriched ERA5 data with additional features and aggregated it temporally.") if __name__ == "__main__": - cyclopts.run(cli) + cli() diff --git a/pyproject.toml b/pyproject.toml index 2fde7b0..e74046b 100755 --- a/pyproject.toml +++ b/pyproject.toml @@ -15,7 +15,7 @@ dependencies = [ "distributed>=2025.5.1", "earthengine-api>=1.6.9", "eemont>=2025.7.1", - # "entropyc", + "entropyc", "flox>=0.10.4", "folium>=0.19.7", "geemap>=0.36.3", @@ -37,12 +37,12 @@ dependencies = [ "smart-geocubes[arcticdem,dask,stac,viz]>=0.0.9", "stopuhr>=0.0.10", "xanimate", - "xarray>=2025.4.0", + "xarray>=2025.9.0", "xdggs>=0.2.1", "xvec>=0.5.1", - "zarr[remote]>=3.0.8", + "zarr[remote]>=3.1.3", ] [tool.uv.sources] -# entropyc = { git = "ssh://git@github.com/AlbertEMC2Stein/entropyc", branch = "refactor/tobi" } +entropyc = { git = "ssh://git@github.com/AlbertEMC2Stein/entropyc", branch = "refactor/tobi" } xanimate = { git = "https://github.com/davbyr/xAnimate" } diff --git a/uv.lock b/uv.lock index cb33562..ec1a9a0 100755 --- a/uv.lock +++ b/uv.lock @@ -1075,6 +1075,7 @@ dependencies = [ { name = "distributed" }, { name = "earthengine-api" }, { name = "eemont" }, + { name = "entropyc" }, { name = "flox" }, { name = "folium" }, { name = "geemap" }, @@ -1113,6 +1114,7 @@ requires-dist = [ { name = "distributed", specifier = ">=2025.5.1" }, { name = "earthengine-api", specifier = ">=1.6.9" }, { name = "eemont", specifier = ">=2025.7.1" }, + { name = "entropyc", git = "ssh://git@github.com/AlbertEMC2Stein/entropyc?branch=refactor%2Ftobi" }, { name = "flox", specifier = ">=0.10.4" }, { name = "folium", specifier = ">=0.19.7" }, { name = "geemap", specifier = ">=0.36.3" }, @@ -1134,10 +1136,19 @@ requires-dist = [ { name = "smart-geocubes", extras = ["arcticdem", "dask", "stac", "viz"], specifier = ">=0.0.9" }, { name = "stopuhr", specifier = ">=0.0.10" }, { name = "xanimate", git = "https://github.com/davbyr/xAnimate" }, - { name = "xarray", specifier = ">=2025.4.0" }, + { name = "xarray", specifier = ">=2025.9.0" }, { name = "xdggs", specifier = ">=0.2.1" }, { name = "xvec", specifier = ">=0.5.1" }, - { name = "zarr", extras = ["remote"], specifier = ">=3.0.8" }, + { name = "zarr", extras = ["remote"], specifier = ">=3.1.3" }, +] + +[[package]] +name = "entropyc" +version = "0.1.0" +source = { git = "ssh://git@github.com/AlbertEMC2Stein/entropyc?branch=refactor%2Ftobi#22a191d194a76b6c182481acb2af1bde3f60b49e" } +dependencies = [ + { name = "numpy" }, + { name = "scipy" }, ] [[package]]