Finalize era5 and alphaearth

This commit is contained in:
Tobias Hölzer 2025-10-24 16:36:18 +02:00
parent ce4c728e1a
commit a562b2cf72
6 changed files with 1993 additions and 1392 deletions

View file

@ -8,6 +8,7 @@ requires-python = ">=3.12"
dependencies = [
"aiohttp>=3.12.11",
"bokeh>=3.7.3",
"bottleneck>=1.6.0",
"cartopy>=0.24.1",
"cdsapi>=0.7.6",
"cyclopts>=4.0.0",
@ -27,8 +28,11 @@ dependencies = [
"mapclassify>=2.9.0",
"matplotlib>=3.10.3",
"netcdf4>=1.7.2",
"numba>=0.62.1",
"numbagg>=0.9.3",
"numpy>=2.3.0",
"odc-geo[all]>=0.4.10",
"opt-einsum>=3.4.0",
"pyarrow>=20.0.0",
"requests>=2.32.3",
"rich>=14.0.0",

View file

@ -11,6 +11,7 @@ import geemap
import geopandas as gpd
import numpy as np
import pandas as pd
import xarray as xr
from rich import pretty, print, traceback
from rich.progress import track
@ -26,7 +27,11 @@ EMBEDDINGS_DIR = DATA_DIR / "embeddings"
EMBEDDINGS_DIR.mkdir(parents=True, exist_ok=True)
def cli(grid: Literal["hex", "healpix"], level: int, backup_intermediate: bool = False):
cli = cyclopts.App(name="alpha-earth")
@cli.command()
def download(grid: Literal["hex", "healpix"], level: int, backup_intermediate: bool = False):
"""Extract satellite embeddings from Google Earth Engine and map them to a grid.
Args:
@ -93,8 +98,49 @@ def cli(grid: Literal["hex", "healpix"], level: int, backup_intermediate: bool =
print(f"Saved embeddings for year {year} to {embeddings_file.resolve()}.")
@cli.command()
def combine_to_zarr(grid: Literal["hex", "healpix"], level: int):
"""Combine yearly embeddings parquet files into a single zarr store.
Args:
grid (Literal["hex", "healpix"]): The grid type to use.
level (int): The grid level to use.
"""
embs = gpd.read_parquet(DATA_DIR / "embeddings" / f"permafrost_{grid}{level}_embeddings-2017.parquet")
# ? Converting cell IDs from hex strings to integers for xdggs compatibility
cells = [int(cid, 16) for cid in embs.cell_id.to_list()]
years = list(range(2017, 2025))
aggs = ["median", "stdDev", "min", "max", "mean", "p1", "p5", "p25", "p75", "p95", "p99"]
bands = [f"A{str(i).zfill(2)}" for i in range(64)]
a = xr.DataArray(
np.nan,
dims=("year", "cell", "band", "agg"),
coords={"year": years, "cell": cells, "band": bands, "agg": aggs},
)
# ? These attributes are needed for xdggs
a.cell.attrs = {
"grid_name": "h3" if grid == "hex" else "healpix",
"level": level,
}
if grid == "healpix":
a.cell.attrs["indexing_scheme"] = "nested"
for year in track(years, total=len(years), description="Processing years..."):
embs = gpd.read_parquet(DATA_DIR / "embeddings" / f"permafrost_{grid}{level}_embeddings-{year}.parquet")
for band in bands:
for agg in aggs:
col = f"{band}_{agg}"
a.loc[{"band": band, "agg": agg, "year": year}] = embs[col].to_list()
zarr_path = EMBEDDINGS_DIR / f"permafrost_{grid}{level}_embeddings.zarr"
a.to_zarr(zarr_path, consolidated=False, mode="w")
print(f"Saved combined embeddings to {zarr_path.resolve()}.")
def main(): # noqa: D103
cyclopts.run(cli)
cli()
if __name__ == "__main__":

View file

@ -1,9 +1,17 @@
#!/bin/bash
# uv run alpha-earth --grid hex --level 3
uv run alpha-earth --grid hex --level 4
uv run alpha-earth --grid hex --level 5
uv run alpha-earth --grid healpix --level 6
uv run alpha-earth --grid healpix --level 7
uv run alpha-earth --grid healpix --level 8
uv run alpha-earth --grid healpix --level 9
# uv run alpha-earth download --grid hex --level 3
# uv run alpha-earth download --grid hex --level 4
# uv run alpha-earth download --grid hex --level 5
# uv run alpha-earth download --grid healpix --level 6
# uv run alpha-earth download --grid healpix --level 7
# uv run alpha-earth download --grid healpix --level 8
# uv run alpha-earth download --grid healpix --level 9
uv run alpha-earth combine-to-zarr --grid hex --level 3
uv run alpha-earth combine-to-zarr --grid hex --level 4
uv run alpha-earth combine-to-zarr --grid hex --level 5
uv run alpha-earth combine-to-zarr --grid healpix --level 6
uv run alpha-earth combine-to-zarr --grid healpix --level 7
uv run alpha-earth combine-to-zarr --grid healpix --level 8
uv run alpha-earth combine-to-zarr --grid healpix --level 9

View file

@ -12,11 +12,12 @@ Variables of Interest:
Naming patterns:
- Instant Variables are downloaded already as statistically aggregated (lossy),
therefore their names get the aggregation as suffix
- Accumulation Variables are downloaded as totals, their names stay the same
- Accumulation Variables are downloaded as totals (sum), their names stay the same
Daily Variables (downloaded from hourly data):
- t2m_max
- t2m_min
- t2m_mean
- snowc_mean
- sde_mean
- lblt_max
@ -25,80 +26,79 @@ Daily Variables (downloaded from hourly data):
- sshf
Derived Daily Variables:
- t2m_daily_avg
- t2m_daily_range
- t2m_daily_skew
- thawing_degree_days
- freezing_degree_days
- thawing_days
- freezing_days
- precipitation_occurrences
- snowfall_occurrences
- snow_isolation (snowc * sde)
- t2m_range [instant]: t2m_max - t2m_min
- t2m_avg [instant]: (t2m_max - t2m_min) / 2
- t2m_skew [instant]: (t2m_mean - t2m_min) / t2m_range
- thawing_degree_days [accum]: (t2m_avg - 273.15).clip(min=0)
- freezing_degree_days [accum]: (273.15 - t2m_avg).clip(min=0)
- thawing_days [accum]: (t2m_avg > 273.15).astype(int)
- freezing_days [accum]: (t2m_avg < 273.15).astype(int)
- precipitation_occurrences [accum]: (tp > 0.001).astype(int)
- snowfall_occurrences [accum]: (sf > 0.001).astype(int)
- naive_snow_isolation [instant]: snowc_mean * sde_mean
Monthly Variables:
- t2m_monthly_max
- t2m_monthly_min
- tp_monthly_sum
- sf_monthly_sum
- snowc_monthly_mean
- sde_monthly_mean
- sshf_monthly_sum
- lblt_monthly_max
- t2m_monthly_avg
- t2m_monthly_range_avg
- t2m_monthly_skew_avg
- thawing_degree_days_monthly
- freezing_degree_days_monthly
- thawing_days_monthly
- freezing_days_monthly
- precipitation_occurrences_monthly TODO: Rename to precipitation_days_monthly?
- snowfall_occurrences_monthly TODO: Rename to snowfall_days_monthly?
- snow_isolation_monthly_mean
Yearly Variables:
- TODO
Monthly, Winter, Summer & Yearly Aggregations (Names don't change):
- instant variables:
- *_min -> min
- *_max -> max
- *_rest -> median
- accum variables: sum
# TODO Variables:
- Day of first thaw (yearly)
- Day of last thaw (yearly)
- Thawing period length (yearly)
- Freezing period length (yearly)
Derived & (from monthly) Aggregated Winter Variables:
- effective_snow_depth [instant]: (sde_mean * M + 1 - m).sum(M) / (m).sum(M),see also https://tc.copernicus.org/articles/11/989/2017/tc-11-989-2017.pdf
Derived & (from daily) Aggregated Yearly Variables:
- day_of_first_thaw [yearly]: First day in year where t2m_daily_avg > 273.15
- day_of_last_thaw [yearly]: Last day in year where t2m_daily_avg > 273.15
- thawing_period_length [yearly]: day_of_last_thaw - day_of_first_thaw
- day_of_first_freeze [yearly]: First day in year where t2m_daily_avg < 273.15
- day_of_last_freeze [yearly]: Last day in year where t2m_daily_avg < 273.15
About yearly aggregates:
- A year always starts on 1st October and ends on 30th September of the next year
to better capture the Arctic seasonal cycle.
- Thus year == 2020 means 1st Oct 2019 - 30th Sep 2020
- Thus winter == 2020 means 1st Oct 2019 - 31th March 2020
- Thus summer == 2020 means 1st April 2020 - 30th Sep 2020
Author: Tobias Hölzer
Date: 09. June 2025
Date: June to October 2025
"""
import os
import time
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor, as_completed
from pathlib import Path
from typing import Literal
import cyclopts
import dask.distributed as dd
import geopandas as gpd
import numpy as np
import odc.geo
import odc.geo.xr
import pandas as pd
import shapely
import shapely.ops
import xarray as xr
from numcodecs.zarr3 import Blosc
from rich import pretty, print, traceback
from rich.progress import track
from shapely.geometry import LineString, Polygon
from zarr.codecs import BloscCodec
traceback.install(show_locals=True, suppress=[cyclopts, xr, pd])
pretty.install()
cli = cyclopts.App()
# TODO: Directly handle download on a grid level - this is more what the zarr access is indented to do
DATA_DIR = Path(os.environ.get("DATA_DIR", "data")) / "entropyc-rts"
ERA5_DIR = DATA_DIR / "era5"
DAILY_RAW_PATH = ERA5_DIR / "daily_raw.zarr"
DAILY_ENRICHED_PATH = ERA5_DIR / "daily_enriched.zarr"
MONTHLY_RAW_PATH = ERA5_DIR / "monthly_raw.zarr"
YEARLY_RAW_PATH = ERA5_DIR / "yearly_aligned.zarr"
SUMMER_RAW_PATH = ERA5_DIR / "summer_aligned.zarr"
WINTER_RAW_PATH = ERA5_DIR / "winter_aligned.zarr"
def _get_grid_paths(
@ -119,16 +119,37 @@ max_time = "2024-12-31"
today = time.strftime("%Y-%m-%d")
# ================
# === Download ===
# ================
instants = {
"t2m_max",
"t2m_min",
"t2m_mean",
"snowc_mean",
"sde_mean",
"lblt_max",
"t2m_range",
"t2m_avg",
"t2m_skew",
"naive_snow_isolation",
}
accums = {
"tp",
"sf",
"sshf",
"thawing_degree_days",
"freezing_degree_days",
"thawing_days",
"freezing_days",
"precipitation_occurrences",
"snowfall_occurrences",
}
def create_encoding(ds: xr.Dataset):
"""Create compression encoding for zarr dataset storage.
Creates Blosc compression configuration for all data variables and coordinates
in the dataset using zstd compression with level 9.
in the dataset using zstd compression with level 5.
Args:
ds (xr.Dataset): The xarray Dataset to create encoding for.
@ -138,10 +159,15 @@ def create_encoding(ds: xr.Dataset):
"""
# encoding = {var: {"compressors": BloscCodec(cname="zlib", clevel=9)} for var in ds.data_vars}
encoding = {var: {"compressors": Blosc(cname="zstd", clevel=9)} for var in [*ds.data_vars, *ds.coords]}
encoding = {var: {"compressors": BloscCodec(cname="zstd", clevel=5)} for var in [*ds.data_vars, *ds.coords]}
return encoding
# ================
# === Download ===
# ================
def download_daily_aggregated():
"""Download and aggregate ERA5 data to daily resolution.
@ -184,6 +210,7 @@ def download_daily_aggregated():
# Instant
era5.t2m.resample(time="1D").max().rename("t2m_max"),
era5.t2m.resample(time="1D").min().rename("t2m_min"),
era5.t2m.resample(time="1D").mean().rename("t2m_mean"),
era5.snowc.resample(time="1D").mean().rename("snowc_mean"),
era5.sde.resample(time="1D").mean().rename("sde_mean"),
era5.lblt.resample(time="1D").max().rename("lblt_max"),
@ -197,6 +224,7 @@ def download_daily_aggregated():
# Assign attributes
daily_raw["t2m_max"].attrs = {"long_name": "Daily maximum 2 metre temperature", "units": "K"}
daily_raw["t2m_min"].attrs = {"long_name": "Daily minimum 2 metre temperature", "units": "K"}
daily_raw["t2m_mean"].attrs = {"long_name": "Daily mean 2 metre temperature", "units": "K"}
daily_raw["tp"].attrs = {"long_name": "Daily total precipitation", "units": "m"}
daily_raw["sf"].attrs = {"long_name": "Daily total snow fall", "units": "m"}
daily_raw["snowc_mean"].attrs = {"long_name": "Daily mean snow cover", "units": "m"}
@ -227,6 +255,309 @@ def download():
print(f"Downloaded and aggregated ERA5 data to {DAILY_RAW_PATH.resolve()}.")
# ============================
# === Temporal Aggregation ===
# ============================
def daily_enrich():
"""Enrich daily ERA5 data with derived climate variables.
Loads downloaded daily ERA5 data and computes additional climate variables.
Creates derived variables including temperature statistics, degree days, and occurrence indicators.
Derived variables include:
- Daily average and range temperature
- Temperature skewness
- Thawing and freezing degree days
- Thawing and freezing day counts
- Precipitation and snowfall occurrences
- Snow isolation index
"""
daily = xr.open_zarr(DAILY_RAW_PATH, consolidated=False).set_coords("spatial_ref")
assert "time" in daily.dims, f"Expected dim 'time' to be in {daily.dims=}"
# Formulas based on Groeke et. al. (2025) Stochastic Weather generation...
daily["t2m_avg"] = (daily.t2m_max + daily.t2m_min) / 2
daily.t2m_avg.attrs = {"long_name": "Daily average 2 metre temperature", "units": "K"}
daily["t2m_range"] = daily.t2m_max - daily.t2m_min
daily.t2m_range.attrs = {"long_name": "Daily range of 2 metre temperature", "units": "K"}
daily["t2m_skew"] = (daily.t2m_mean - daily.t2m_min) / daily.t2m_range
daily.t2m_skew.attrs = {"long_name": "Daily skewness of 2 metre temperature"}
daily["thawing_degree_days"] = (daily.t2m_avg - 273.15).clip(min=0)
daily.thawing_degree_days.attrs = {"long_name": "Thawing degree days", "units": "K"}
daily["freezing_degree_days"] = (273.15 - daily.t2m_avg).clip(min=0)
daily.freezing_degree_days.attrs = {"long_name": "Freezing degree days", "units": "K"}
daily["thawing_days"] = (daily.t2m_avg > 273.15).astype(int)
daily.thawing_days.attrs = {"long_name": "Thawing days"}
daily["freezing_days"] = (daily.t2m_avg < 273.15).astype(int)
daily.freezing_days.attrs = {"long_name": "Freezing days"}
daily["precipitation_occurrences"] = (daily.tp > 0).astype(int)
daily.precipitation_occurrences.attrs = {"long_name": "Precipitation occurrences"}
daily["snowfall_occurrences"] = (daily.sf > 0).astype(int)
daily.snowfall_occurrences.attrs = {"long_name": "Snowfall occurrences"}
daily["naive_snow_isolation"] = daily.snowc_mean * daily.sde_mean
daily.naive_snow_isolation.attrs = {"long_name": "Naive snow isolation"}
daily.to_zarr(DAILY_ENRICHED_PATH, mode="w", encoding=create_encoding(daily), consolidated=False)
def monthly_aggregate():
"""Aggregate enriched daily ERA5 data to monthly resolution.
Takes the enriched daily ERA5 data and creates monthly aggregates using
appropriate statistical functions for each variable type.
Instant variables use min, max, or median aggregations, while accumulative
variables are summed over the month.
The aggregated monthly data is saved to a zarr file for further processing.
"""
daily = xr.open_zarr(DAILY_ENRICHED_PATH, consolidated=False)
assert "time" in daily.dims, f"Expected dim 'time' to be in {daily.dims=}"
daily = daily.sel(time=slice(min_time, max_time))
# Monthly instant aggregates
monthly_instants = []
for var in instants:
if var.endswith("_min"):
agg = daily[var].resample(time="1ME").min().rename(var)
agg.attrs = daily[var].attrs
agg.attrs["long_name"] = f"Monthly minimum of {daily[var].attrs.get('long_name', var)}"
monthly_instants.append(agg)
elif var.endswith("_max"):
agg = daily[var].resample(time="1ME").max().rename(var)
agg.attrs = daily[var].attrs
agg.attrs["long_name"] = f"Monthly maximum of {daily[var].attrs.get('long_name', var)}"
monthly_instants.append(agg)
else:
agg = daily[var].resample(time="1ME").median().rename(var)
agg.attrs = daily[var].attrs
agg.attrs["long_name"] = f"Monthly median of {daily[var].attrs.get('long_name', var)}"
monthly_instants.append(agg)
monthly_accums = []
for var in accums:
agg = daily[var].resample(time="1ME").sum().rename(var)
agg.attrs = daily[var].attrs
monthly_accums.append(agg)
monthly = xr.merge(monthly_instants + monthly_accums)
monthly = monthly.chunk({"time": len(monthly.time), "latitude": 64, "longitude": 64})
monthly.to_zarr(MONTHLY_RAW_PATH, mode="w", encoding=create_encoding(monthly), consolidated=False)
def multi_monthly_aggregate(monthly: xr.Dataset, n: int = 12) -> xr.Dataset:
"""Aggregate monthly ERA5 data to a multi-month resolution.
Takes monthly aggregated data and creates multi-month aggregates using a shifted
calendar (October to September) to better capture Arctic seasonal patterns.
Args:
monthly (xr.Dataset): The monthly aggregates
n (int, optional): Number of months to aggregate over. Defaults to 12.
Returns:
xr.Dataset: The aggregated dataset
"""
# Instants
multimonthly_instants = []
for var in instants:
if var.endswith("_min"):
agg = monthly[var].resample(time=f"{n}MS", label="right").min().rename(var)
agg.attrs = monthly[var].attrs
agg.attrs["long_name"] = f"{n}-Monthly minimum of {monthly[var].attrs.get('long_name', var)}"
multimonthly_instants.append(agg)
elif var.endswith("_max"):
agg = monthly[var].resample(time=f"{n}MS", label="right").max().rename(var)
agg.attrs = monthly[var].attrs
agg.attrs["long_name"] = f"{n}-Monthly maximum of {monthly[var].attrs.get('long_name', var)}"
multimonthly_instants.append(agg)
else:
agg = monthly[var].resample(time=f"{n}MS", label="right").median().rename(var)
agg.attrs = monthly[var].attrs
agg.attrs["long_name"] = f"{n}-Monthly median of {monthly[var].attrs.get('long_name', var)}"
multimonthly_instants.append(agg)
# Accums
multimonthly_accums = []
for var in accums:
agg = monthly[var].resample(time=f"{n}MS", label="right").sum().rename(var)
agg.attrs = monthly[var].attrs
multimonthly_accums.append(agg)
multimonthly = xr.merge(multimonthly_instants + multimonthly_accums)
# Effective snow depth
m = np.resize(np.arange(1, n + 1), len(monthly.time))
m = xr.DataArray(m, coords={"time": monthly.time}, dims=["time"])
n_sum = n * (n + 1) // 2
multimonthly["effective_snow_depth"] = (monthly.sde_mean * (n + 1 - m)).resample(time=f"{n}MS").sum().rename(
"effective_snow_depth"
) / n_sum
multimonthly["effective_snow_depth"].attrs = {
"long_name": "Effective Snow Density",
"reference": "Slater et. al. (2017)",
"link": "https://tc.copernicus.org/articles/11/989/2017/tc-11-989-2017.pdf",
}
multimonthly = multimonthly.chunk({"time": len(multimonthly.time), "latitude": 64, "longitude": 64})
return multimonthly
def yearly_and_seasonal_aggregate():
"""Aggregate monthly ERA5 data to yearly resolution with seasonal splits.
Takes monthly aggregated data and creates yearly aggregates using a shifted
calendar (October to September) to better capture Arctic seasonal patterns.
Creates separate aggregates for full year, winter (Oct-Apr), and summer
(May-Sep) periods.
The first and last incomplete years are excluded from the analysis.
Winter months are defined as months 1-7 in the shifted calendar,
and summer months are 8-12.
The final dataset includes yearly, winter, and summer aggregates for all
climate variables, saved to a zarr file.
"""
monthly = xr.open_zarr(MONTHLY_RAW_PATH, consolidated=False).set_coords("spatial_ref")
assert "time" in monthly.dims, f"Expected dim 'time' to be in {monthly.dims=}"
# "Shift" the calendar by slicing the first Jan-Sep and the last Oct-Dec months
first_year = monthly.time.dt.year.min().item()
last_year = monthly.time.dt.year.max().item()
monthly = monthly.sel(time=slice(f"{first_year}-10-01", f"{last_year}-09-30"))
yearly = multi_monthly_aggregate(monthly, n=12)
yearly = derive_yearly_variables(yearly)
yearly.to_zarr(YEARLY_RAW_PATH, mode="w", encoding=create_encoding(yearly), consolidated=False)
summer_winter = multi_monthly_aggregate(monthly, n=6)
summer = summer_winter.sel(time=summer_winter.time.dt.month == 4)
summer.to_zarr(SUMMER_RAW_PATH, mode="w", encoding=create_encoding(summer), consolidated=False)
winter = summer_winter.sel(time=summer_winter.time.dt.month == 10)
winter.to_zarr(WINTER_RAW_PATH, mode="w", encoding=create_encoding(winter), consolidated=False)
def derive_yearly_variables(yearly: xr.Dataset) -> xr.Dataset:
"""Derive additional variables from daily data and add them to the yearly dataset.
Args:
yearly (xr.Dataset): The yearly aggregated dataset to enrich.
Returns:
xr.Dataset: The enriched yearly dataset with additional derived variables.
"""
assert "time" in yearly.dims, f"Expected dim 'time' to be in {yearly.dims=}"
daily = xr.open_zarr(DAILY_ENRICHED_PATH, consolidated=False).set_coords("spatial_ref")
assert "time" in daily.dims, f"Expected dim 'time' to be in {daily.dims=}"
daily = daily.sel(time=slice(min_time, max_time))
# ? Note: The functions do not really account for leap years
# n_days_in_year = daily.time.groupby("time.year").count().rename("n_days_in_year")
n_days_in_year = 365
# A mask to check which places never thaws
# Persist in memory because we need it twice and this dramatically reduces the Dask-Graph size
never_thaws = (daily.thawing_days.groupby("time.year").sum(dim="time") == 0).compute()
# ? First and last thaw day is NOT calculated within the october-september year, but within the calendar year
# This results in a much more correct representation of thawing periods in regions where the last thawing day
# is between october and december.
# This assumes that the 01-01 is almost everywhere one of the coldest days in the year
first_thaw_day = daily.thawing_days.groupby("time.year").apply(lambda x: x.argmax(dim="time")) + 1
first_thaw_day = first_thaw_day.where(~never_thaws).rename("day_of_first_thaw").rename(year="time")
first_thaw_day["time"] = pd.to_datetime([f"{y}-10-01" for y in first_thaw_day.time.values]) # noqa: PD011
first_thaw_day.attrs = {"long_name": "Day of first thaw in year", "units": "day of year"}
yearly["day_of_first_thaw"] = first_thaw_day.sel(time=yearly.time)
last_thaw_day = (
n_days_in_year - daily.thawing_days[::-1].groupby("time.year").apply(lambda x: x.argmax(dim="time")) + 1
)
last_thaw_day = last_thaw_day.where(~never_thaws).rename("day_of_last_thaw").rename(year="time")
last_thaw_day["time"] = pd.to_datetime([f"{y}-10-01" for y in last_thaw_day.time.values]) # noqa: PD011
last_thaw_day.attrs = {"long_name": "Day of last thaw in year", "units": "day of year"}
yearly["day_of_last_thaw"] = last_thaw_day.sel(time=yearly.time)
yearly["thawing_period_length"] = (yearly.day_of_last_thaw - yearly.day_of_first_thaw).rename(
"thawing_period_length"
)
yearly.thawing_period_length.attrs = {"long_name": "Thawing period length in year", "units": "days"}
# ? First and last freeze day is NOT calculated within the october-september year, but within an july-june year
# This results, similar to the thawing days, in a much more correct representation of freezing periods in regions
# where the first freezing day is between july and september.
# This assumes that the 01-07 is almost everywhere one of the warmest days in the year
daily_shifted = daily.copy()
daily_shifted["time"] = pd.to_datetime(daily_shifted.time.values) + pd.DateOffset(months=6)
# A mask to check which places never freeze
# Persist in memory because we need it twice and this dramatically reduces the Dask-Graph size
never_freezes = (daily_shifted.freezing_days.groupby("time.year").sum(dim="time") == 0).compute()
first_freezing_day = daily_shifted.freezing_days.groupby("time.year").apply(lambda x: x.argmax(dim="time")) + 1
first_freezing_day = first_freezing_day.where(~never_freezes).rename("day_of_first_freeze").rename(year="time")
first_freezing_day["time"] = pd.to_datetime([f"{y}-10-01" for y in first_freezing_day.time.values]) # noqa: PD011
first_freezing_day.attrs = {"long_name": "Day of first freeze in year", "units": "day of year"}
yearly["day_of_first_freeze"] = first_freezing_day.sel(time=yearly.time)
last_freezing_day = (
n_days_in_year
- daily_shifted.freezing_days[::-1].groupby("time.year").apply(lambda x: x.argmax(dim="time"))
+ 1
)
last_freezing_day = last_freezing_day.where(~never_freezes).rename("day_of_last_freeze").rename(year="time")
last_freezing_day["time"] = pd.to_datetime([f"{y}-10-01" for y in last_freezing_day.time.values]) # noqa: PD011
last_freezing_day.attrs = {"long_name": "Day of last freeze in year", "units": "day of year"}
yearly["day_of_last_freeze"] = last_freezing_day.sel(time=yearly.time)
yearly["freezing_period_length"] = (yearly.day_of_last_freeze - yearly.day_of_first_freeze).rename(
"freezing_period_length"
)
yearly.freezing_period_length.attrs = {"long_name": "Freezing period length in year", "units": "days"}
return yearly
@cli.command
def enrich(n_workers: int = 10, monthly: bool = True, yearly: bool = True, daily: bool = True):
"""Enrich data and pPerform temporal aggregation of ERA5 data using Dask cluster.
Creates a Dask cluster and runs both monthly and yearly aggregation
functions to generate temporally aggregated climate datasets. The
processing uses parallel workers for efficient computation.
Args:
n_workers (int, optional): Number of Dask workers to use. Defaults to 10.
monthly (bool, optional): Whether to perform monthly aggregation. Defaults to True.
yearly (bool, optional): Whether to perform yearly aggregation. Defaults to True.
daily (bool, optional): Whether to perform daily enrichment. Defaults to True.
"""
with (
dd.LocalCluster(n_workers=n_workers, threads_per_worker=20, memory_limit="10GB") as cluster,
dd.Client(cluster) as client,
):
print(client)
print(client.dashboard_link)
if daily:
daily_enrich()
if monthly:
monthly_aggregate()
if yearly:
yearly_and_seasonal_aggregate()
print("Enriched ERA5 data with additional features and aggregated it temporally.")
# ===========================
# === Spatial Aggregation ===
# ===========================
@ -250,26 +581,27 @@ def _split_antimeridian_cell(geom: Polygon) -> list[Polygon]:
return list(polys.geoms)
def _check_geobox(geobox):
x, y = geobox.shape
return x > 1 and y > 1
def _check_geom(geobox: odc.geo.geobox.GeoBox, geom: odc.geo.Geometry) -> bool:
enclosing = geobox.enclosing(geom)
x, y = enclosing.shape
if x <= 1 or y <= 1:
return False
roi: tuple[slice, slice] = geobox.overlap_roi(enclosing)
roix, roiy = roi
return (roix.stop - roix.start) > 1 and (roiy.stop - roiy.start) > 1
def extract_cell_data(idx: int, geom: Polygon) -> xr.Dataset:
def extract_cell_data(yearly: xr.Dataset, geom: Polygon):
"""Extract ERA5 data for a specific grid cell geometry.
Extracts and spatially averages ERA5 data within the bounds of a grid cell.
Handles antimeridian-crossing cells by splitting them appropriately.
Args:
idx (int): Index of the grid cell.
yearly (xr.Dataset): Yearly aggregated ERA5 dataset.
geom (Polygon): Polygon geometry of the grid cell.
Returns:
xr.Dataset: The computed cell dataset
"""
daily_raw = xr.open_zarr(DAILY_RAW_PATH, consolidated=False).set_coords("spatial_ref")
# cell.geometry is a shapely Polygon
if not _crosses_antimeridian(geom):
geoms = [geom]
@ -279,17 +611,16 @@ def extract_cell_data(idx: int, geom: Polygon) -> xr.Dataset:
cell_data = []
for geom in geoms:
geom = odc.geo.Geometry(geom, crs="epsg:4326")
if not _check_geobox(daily_raw.odc.geobox.enclosing(geom)):
if not _check_geom(yearly.odc.geobox, geom):
continue
# TODO: use mean for instant variables, sum for accum variables
cell_data.append(daily_raw.odc.crop(geom).drop_vars("spatial_ref").mean(["latitude", "longitude"]))
cell_data.append(yearly.odc.crop(geom).drop_vars("spatial_ref").mean(["latitude", "longitude"]))
if len(cell_data) == 0:
return False
elif len(cell_data) == 1:
cell_data = cell_data[0]
else:
cell_data = xr.concat(cell_data, dim="part").mean("part")
cell_data = cell_data.expand_dims({"cell": [idx]}).compute()
cell_data = cell_data.compute()
return cell_data
@ -297,8 +628,8 @@ def extract_cell_data(idx: int, geom: Polygon) -> xr.Dataset:
def spatial_agg(
grid: Literal["hex", "healpix"],
level: int,
agg: Literal["summer", "winter", "yearly"] = "yearly",
n_workers: int = 10,
executor: Literal["threads", "processes"] = "threads",
):
"""Perform spatial aggregation of ERA5 data to grid cells.
@ -309,267 +640,71 @@ def spatial_agg(
Args:
grid ("hex" | "healpix"): Grid type.
level (int): Grid resolution level.
agg ("summer" | "winter" | "yearly"): Type of aggregation to perform. Defaults to yearly.
n_workers (int, optional): Number of parallel workers to use. Defaults to 10.
executor ("threads" | "processes"): The type of parallel executor pool to use. Defaults to threads.
"""
gridname = f"permafrost_{grid}{level}"
daily_grid_path = _get_grid_paths("daily", grid, level)
grid = gpd.read_parquet(DATA_DIR / f"grids/{gridname}_grid.parquet")
agg_grid_path = _get_grid_paths(agg, grid, level)
grid_df = gpd.read_parquet(DATA_DIR / f"grids/{gridname}_grid.parquet")
# Create an empty zarr array with the right dimensions
daily_raw = xr.open_zarr(DAILY_RAW_PATH, consolidated=False).set_coords("spatial_ref")
assert {"latitude", "longitude", "time"} == set(daily_raw.dims), (
f"Expected dims ('latitude', 'longitude', 'time'), got {daily_raw.dims}"
if agg == "summer":
agg_data_path = SUMMER_RAW_PATH
elif agg == "winter":
agg_data_path = WINTER_RAW_PATH
elif agg == "yearly":
agg_data_path = YEARLY_RAW_PATH
else:
raise ValueError(f"Unknown aggregation type: {agg}")
agg_raw = (
xr.open_zarr(agg_data_path, consolidated=False, decode_timedelta=False)
.set_coords("spatial_ref")
.drop_vars(["surface", "number", "depthBelowLandLayer"])
.load()
)
assert daily_raw.odc.crs == "epsg:4326", f"Expected CRS 'epsg:4326', got {daily_raw.odc.crs}"
daily = (
xr.zeros_like(daily_raw.isel(latitude=0, longitude=0))
.expand_dims({"cell": [idx for idx, _ in grid.iterrows()]})
.chunk({"cell": min(len(grid), 1000), "time": len(daily_raw.time)}) # ~50MB chunks
assert {"latitude", "longitude", "time"} == set(agg_raw.dims), (
f"Expected dims ('latitude', 'longitude', 'time'), got {agg_raw.dims}"
)
daily.to_zarr(daily_grid_path, mode="w", consolidated=False, encoding=create_encoding(daily))
print(f"Created empty zarr at {daily_grid_path.resolve()} with shape {daily.sizes}.")
assert agg_raw.odc.crs == "epsg:4326", f"Expected CRS 'epsg:4326', got {agg_raw.odc.crs}"
print(f"Starting spatial matching of {len(grid)} cells with {n_workers} workers...")
ExecutorCls = ThreadPoolExecutor if executor == "threads" else ProcessPoolExecutor
with ExecutorCls(max_workers=n_workers) as executor:
futures = {
executor.submit(extract_cell_data, idx, row.geometry): idx
for idx, row in grid.to_crs("epsg:4326").iterrows()
}
for future in track(as_completed(futures), total=len(futures), description="Processing cells"):
idx = futures[future]
try:
cell_data = future.result()
if not cell_data:
print(f"Cell {idx} did not overlap with ERA5 data.")
cell_data.to_zarr(daily_grid_path, region="auto", consolidated=False)
print(f"Successfully written cell {idx}")
except Exception as e:
print(f"{type(e)} processing cell {idx}: {e}")
print("Finished spatial matching.")
# Convert lons to -180 to 180 instead of 0 to 360
agg_raw = agg_raw.assign_coords(longitude=(((agg_raw.longitude + 180) % 360) - 180)).sortby("longitude")
# ? Converting cell IDs from hex strings to integers for xdggs compatibility
cells = [int(cid, 16) for cid in grid_df.cell_id.to_list()]
# ============================
# === Temporal Aggregation ===
# ============================
def daily_enrich(grid: Literal["hex", "healpix"], level: int) -> xr.Dataset:
"""Enrich daily ERA5 data with derived climate variables.
Loads spatially aligned ERA5 data and computes additional climate variables.
Creates derived variables including temperature statistics, degree days, and occurrence indicators.
Derived variables include:
- Daily average and range temperature
- Temperature skewness
- Thawing and freezing degree days
- Thawing and freezing day counts
- Precipitation and snowfall occurrences
- Snow isolation index
Args:
grid ("hex", "healpix"): Grid type.
level (int): Grid resolution level.
Returns:
xr.Dataset: Enriched dataset with original and derived variables.
"""
daily_grid_path = _get_grid_paths("daily", grid, level)
daily = xr.open_zarr(daily_grid_path, consolidated=False).set_coords("spatial_ref")
assert {"cell", "time"} == set(daily.dims), f"Expected dims ('cell', 'time'), got {daily.dims}"
# Formulas based on Groeke et. al. (2025) Stochastic Weather generation...
daily["t2m_avg"] = (daily.t2m_max + daily.t2m_min) / 2
daily.t2m_avg.attrs = {"long_name": "Daily average 2 metre temperature", "units": "K"}
daily["t2m_range"] = daily.t2m_max - daily.t2m_min
daily.t2m_range.attrs = {"long_name": "Daily range of 2 metre temperature", "units": "K"}
daily["t2m_skew"] = (daily.t2m_avg - daily.t2m_min) / daily.t2m_range
daily.t2m_skew.attrs = {"long_name": "Daily skewness of 2 metre temperature"}
daily["thawing_degree_days"] = (daily.t2m_avg - 273.15).clip(min=0)
daily.thawing_degree_days.attrs = {"long_name": "Thawing degree days", "units": "K"}
daily["freezing_degree_days"] = (273.15 - daily.t2m_avg).clip(min=0)
daily.freezing_degree_days.attrs = {"long_name": "Freezing degree days", "units": "K"}
daily["thawing_days"] = (daily.t2m_avg > 273.15).astype(int)
daily.thawing_days.attrs = {"long_name": "Thawing days"}
daily["freezing_days"] = (daily.t2m_avg < 273.15).astype(int)
daily.freezing_days.attrs = {"long_name": "Freezing days"}
daily["precipitation_occurrences"] = (daily.tp > 0).astype(int)
daily.precipitation_occurrences.attrs = {"long_name": "Precipitation occurrences"}
daily["snowfall_occurrences"] = (daily.sf > 0).astype(int)
daily.snowfall_occurrences.attrs = {"long_name": "Snowfall occurrences"}
daily["snow_isolation"] = daily.snowc_mean * daily.sde_mean
daily.snow_isolation.attrs = {"long_name": "Snow isolation"}
return daily
def monthly_aggregate(grid: Literal["hex", "healpix"], level: int):
"""Aggregate enriched daily ERA5 data to monthly resolution.
Takes the enriched daily ERA5 data and creates monthly aggregates using
appropriate statistical functions for each variable type. Temperature
variables use min/max/mean, accumulation variables use sums, and derived
variables use appropriate aggregations.
The aggregated monthly data is saved to a zarr file for further processing.
Args:
grid ("hex", "healpix"): Grid type.
level (int): Grid resolution level.
"""
daily = daily_enrich(grid, level)
assert {"cell", "time"} == set(daily.dims), f"Expected dims ('cell', 'time'), got {daily.dims}"
# Monthly aggregates
monthly = xr.merge(
[
# Original variables
daily.t2m_min.resample(time="1ME").min().rename("t2m_min"),
daily.t2m_max.resample(time="1ME").max().rename("t2m_max"),
daily.snowc_mean.resample(time="1ME").mean().rename("snowc_mean"),
daily.sde_mean.resample(time="1ME").mean().rename("sde_mean"),
daily.lblt_max.resample(time="1ME").max().rename("lblt_max"),
daily.tp.resample(time="1ME").sum().rename("tp"),
daily.sf.resample(time="1ME").sum().rename("sf"),
daily.sshf.resample(time="1ME").sum().rename("sshf"),
# Enriched variables
daily.t2m_avg.resample(time="1ME").mean().rename("t2m_avg"),
daily.t2m_range.resample(time="1ME").mean().rename("t2m_mean_range"),
daily.t2m_skew.resample(time="1ME").mean().rename("t2m_mean_skew"),
daily.thawing_degree_days.resample(time="1ME").sum().rename("thawing_degree_days"),
daily.freezing_degree_days.resample(time="1ME").sum().rename("freezing_degree_days"),
daily.thawing_days.resample(time="1ME").sum().rename("thawing_days"),
daily.freezing_days.resample(time="1ME").sum().rename("freezing_days"),
daily.precipitation_occurrences.resample(time="1ME").sum().rename("precipitation_occurrences"),
daily.snowfall_occurrences.resample(time="1ME").sum().rename("snowfall_occurrences"),
daily.snow_isolation.resample(time="1ME").mean().rename("snow_mean_isolation"),
]
agg_aligned = (
xr.zeros_like(agg_raw.isel(latitude=0, longitude=0).drop_vars(["latitude", "longitude"]))
.expand_dims({"cell_ids": cells})
.chunk({"cell_ids": min(len(grid_df), 10000), "time": len(agg_raw.time)})
)
monthly_grid_path = _get_grid_paths("monthly", grid, level)
monthly.to_zarr(monthly_grid_path, mode="w", encoding=create_encoding(monthly), consolidated=False)
agg_aligned.cell_ids.attrs = {
"grid_name": "h3" if grid == "hex" else grid,
"level": level,
}
if grid == "healpix":
agg_aligned.cell_ids.attrs["indexing_scheme"] = "nested"
from stopuhr import stopwatch
def yearly_aggregate(monthly: xr.Dataset) -> xr.Dataset:
"""Aggregate monthly ERA5 data to yearly resolution.
Takes monthly aggregated data and creates yearly aggregates using a shifted
calendar (October to September) to better capture Arctic seasonal patterns.
Args:
monthly (xr.Dataset): The monthly aggregates
Returns:
xr.Dataset: The aggregated dataset
"""
return xr.merge(
[
# Original variables
monthly.t2m_min.resample(time="1YE").min().rename("t2m_min"),
monthly.t2m_max.resample(time="1YE").max().rename("t2m_max"),
monthly.snowc_mean.resample(time="1YE").mean().rename("snowc_mean"),
monthly.sde_mean.resample(time="1YE").mean().rename("sde_mean"),
monthly.lblt_max.resample(time="1YE").max().rename("lblt_max"),
monthly.tp.resample(time="1YE").sum().rename("tp"),
monthly.sf.resample(time="1YE").sum().rename("sf"),
monthly.sshf.resample(time="1YE").sum().rename("sshf"),
# Enriched variables
monthly.t2m_avg.resample(time="1YE").mean().rename("t2m_avg"),
# TODO: Check if this is correct -> use daily / hourly data instead for range and skew?
monthly.t2m_mean_range.resample(time="1YE").mean().rename("t2m_mean_range"),
monthly.t2m_mean_skew.resample(time="1YE").mean().rename("t2m_mean_skew"),
monthly.thawing_degree_days.resample(time="1YE").sum().rename("thawing_degree_days"),
monthly.freezing_degree_days.resample(time="1YE").sum().rename("freezing_degree_days"),
monthly.thawing_days.resample(time="1YE").sum().rename("thawing_days"),
monthly.freezing_days.resample(time="1YE").sum().rename("freezing_days"),
monthly.precipitation_occurrences.resample(time="1YE").sum().rename("precipitation_occurrences"),
monthly.snowfall_occurrences.resample(time="1YE").sum().rename("snowfall_occurrences"),
monthly.snow_mean_isolation.resample(time="1YE").mean().rename("snow_mean_isolation"),
]
)
def yearly_and_seasonal_aggregate(grid: Literal["hex", "healpix"], level: int):
"""Aggregate monthly ERA5 data to yearly resolution with seasonal splits.
Takes monthly aggregated data and creates yearly aggregates using a shifted
calendar (October to September) to better capture Arctic seasonal patterns.
Creates separate aggregates for full year, winter (Oct-Apr), and summer
(May-Sep) periods.
The first and last incomplete years are excluded from the analysis.
Winter months are defined as months 1-7 in the shifted calendar,
and summer months are 8-12.
The final dataset includes yearly, winter, and summer aggregates for all
climate variables, saved to a zarr file.
Args:
grid ("hex", "healpix"): Grid type.
level (int): Grid resolution level.
"""
monthly_grid_path = _get_grid_paths("monthly", grid, level)
monthly = xr.open_zarr(monthly_grid_path, consolidated=False).set_coords("spatial_ref")
assert {"cell", "time"} == set(monthly.dims), f"Expected dims ('cell', 'time'), got {monthly.dims}"
valid_years = slice(str(monthly.time.min().dt.year.item() + 1), str(monthly.time.max().dt.year.item()))
# Summer aggregates
summer = yearly_aggregate(monthly.sel(time=monthly.time.dt.month.isin([5, 6, 7, 8, 9])).sel(time=valid_years))
# Yearly aggregates (shifted by +8 months to start in Oktober, first and last years will be cropped)
monthly_shifted = monthly.copy()
monthly_shifted["time"] = monthly_shifted.get_index("time") + pd.DateOffset(months=8)
monthly_shifted = monthly_shifted.sel(time=valid_years)
yearly = yearly_aggregate(monthly_shifted)
# Winter aggregates (shifted by +8 months to start in Oktober, first and last years will be cropped)
monthly_shifted = monthly.copy().sel(time=monthly.time.dt.month.isin([1, 2, 3, 4, 10, 11, 12]))
monthly_shifted["time"] = monthly_shifted.get_index("time") + pd.DateOffset(months=8)
monthly_shifted = monthly_shifted.sel(time=valid_years)
winter = yearly_aggregate(monthly_shifted)
yearly_grid_path = _get_grid_paths("yearly", grid, level)
yearly.to_zarr(yearly_grid_path, mode="w", encoding=create_encoding(yearly), consolidated=False)
winter_grid_path = _get_grid_paths("winter", grid, level)
winter.to_zarr(winter_grid_path, mode="w", encoding=create_encoding(winter), consolidated=False)
summer_grid_path = _get_grid_paths("summer", grid, level)
summer.to_zarr(summer_grid_path, mode="w", encoding=create_encoding(summer), consolidated=False)
@cli.command
def temporal_agg(n_workers: int = 10):
"""Perform temporal aggregation of ERA5 data using Dask cluster.
Creates a Dask cluster and runs both monthly and yearly aggregation
functions to generate temporally aggregated climate datasets. The
processing uses parallel workers for efficient computation.
Args:
n_workers (int, optional): Number of Dask workers to use. Defaults to 10.
"""
with (
dd.LocalCluster(n_workers=n_workers, threads_per_worker=20, memory_limit="10GB") as cluster,
dd.Client(cluster) as client,
for _, row in track(
grid_df.to_crs("epsg:4326").iterrows(),
total=len(grid_df),
description="Spatially aggregating ERA5 data...",
):
print(client)
print(client.dashboard_link)
monthly_aggregate()
yearly_and_seasonal_aggregate()
print("Enriched ERA5 data with additional features and aggregated it temporally.")
cell_id = int(row.cell_id, 16)
with stopwatch("Extracting cell data", log=False):
cell_data = extract_cell_data(agg_raw, row.geometry)
if cell_data is False:
print(f"Warning: No data found for cell {cell_id}, skipping.")
continue
with stopwatch("Assigning cell data", log=False):
agg_aligned.loc[{"cell_ids": cell_id}] = cell_data
agg_aligned.to_zarr(agg_grid_path, mode="w", consolidated=False, encoding=create_encoding(agg_aligned))
print("Finished spatial matching.")
stopwatch.summary()
if __name__ == "__main__":

18
steps/s1_1_era5/era5.sh Normal file
View file

@ -0,0 +1,18 @@
#!/bin/bash
# uv run era5 download
# uv run era5 enrich
# Can be summer, winter or yearly
agg=$1
echo "Running ERA5 spatial aggregation for aggregation type: $agg"
uv run era5 spatial-agg --grid hex --level 3 --agg $agg
uv run era5 spatial-agg --grid hex --level 4 --agg $agg
uv run era5 spatial-agg --grid hex --level 5 --agg $agg
uv run era5 spatial-agg --grid healpix --level 6 --agg $agg
uv run era5 spatial-agg --grid healpix --level 7 --agg $agg
uv run era5 spatial-agg --grid healpix --level 8 --agg $agg
uv run era5 spatial-agg --grid healpix --level 9 --agg $agg

2542
uv.lock generated

File diff suppressed because it is too large Load diff