entropice/era5.py

391 lines
19 KiB
Python
Raw Normal View History

2025-09-26 11:05:29 +02:00
"""Download and preprocess ERA5 data.
Variables of Interest:
- 2 metre temperature (t2m)
- Total precipitation (tp)
- Snow Fall (sf)
- Snow cover (snowc)
- Snow depth (sde)
- Surface sensible heat flux (sshf)
- Lake ice bottom temperature (lblt)
Aggregations:
- Summer / Winter 20-bin histogram?
Spatial -> Enrich -> Temporal ?
Author: Tobias Hölzer
Date: 09. June 2025
"""
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
from typing import Literal
import cyclopts
import dask.distributed as dd
import geopandas as gpd
import odc.geo
import odc.geo.xr
import pandas as pd
import shapely
import shapely.ops
import xarray as xr
from numcodecs.zarr3 import Blosc
from rich import pretty, print, traceback
from shapely.geometry import LineString, Polygon
traceback.install(show_locals=True)
pretty.install()
DATA_DIR = Path("data/era5")
AGG_PATH = DATA_DIR / "era5_agg.zarr"
ALIGNED_PATH = DATA_DIR / "era5_spatial_aligned.zarr"
MONTHLY_PATH = DATA_DIR / "era5_monthly.zarr"
YEARLY_PATH = DATA_DIR / "era5_yearly.zarr"
min_lat = 50
max_lat = 85
min_time = "2022-01-01"
max_time = "2024-12-31"
subset = {"latitude": slice(max_lat, min_lat), "time": slice(min_time, max_time)}
DATA_DIR = Path("/isipd/projects/p_aicore_pf/tohoel001/era5_thawing_data")
today = time.strftime("%Y-%m-%d")
# TODO: I think it would be better to aggregate via hours instead of days
# Pipeline would be:
# Download hourly data -> Spatially match hourly data ->
# For {daily, monthly, yearly}:
# Enrich -> Aggregate temporally
def create_encoding(ds: xr.Dataset):
# encoding = {var: {"compressors": BloscCodec(cname="zlib", clevel=9)} for var in ds.data_vars}
encoding = {var: {"compressors": Blosc(cname="zstd", clevel=9)} for var in [*ds.data_vars, *ds.coords]}
return encoding
def download_daily_aggregated():
era5 = xr.open_dataset(
"https://data.earthdatahub.destine.eu/era5/reanalysis-era5-land-no-antartica-v0.zarr",
storage_options={"client_kwargs": {"trust_env": True}},
chunks={"latitude": 64 * 4, "longitude": 64 * 4},
# chunks={},
engine="zarr",
).rename({"valid_time": "time"})
era5 = era5.sel(**subset)
era5_agg = xr.merge(
[
era5.t2m.resample(time="1D").max().rename("t2m_daily_max"),
era5.t2m.resample(time="1D").min().rename("t2m_daily_min"),
era5.tp.resample(time="1D").sum().rename("tp_daily_sum"),
# era5.sf.resample(time="1D").sum().rename("sf_daily_sum"),
# era5.snowc.resample(time="1D").mean().rename("snowc_daily_mean"),
# era5.sde.resample(time="1D").mean().rename("sde_daily_mean"),
# era5.sshf.resample(time="1D").sum().rename("sshf_daily_sum"),
# era5.lblt.resample(time="1D").max().rename("lblt_daily_max"),
]
)
# Rechunk if the first time chunk is not the same as the middle ones
if era5_agg.chunksizes["time"][0] != era5_agg.chunksizes["time"][1]:
era5_agg = era5_agg.chunk({"time": 120})
# Assign attributes
era5_agg["t2m_daily_max"].attrs = {"long_name": "Daily maximum 2 metre temperature", "units": "K"}
era5_agg["t2m_daily_min"].attrs = {"long_name": "Daily minimum 2 metre temperature", "units": "K"}
era5_agg["tp_daily_sum"].attrs = {"long_name": "Daily total precipitation", "units": "m"}
# era5_agg["sf_daily_sum"].attrs = {"long_name": "Daily total snow fall", "units": "m"}
# era5_agg["snowc_daily_mean"].attrs = {"long_name": "Daily mean snow cover", "units": "m"}
# era5_agg["sde_daily_mean"].attrs = {"long_name": "Daily mean snow depth", "units": "m"}
# era5_agg["sshf_daily_sum"].attrs = {"long_name": "Daily total surface sensible heat flux", "units": "J/m²"}
# era5_agg["lblt_daily_max"].attrs = {"long_name": "Daily maximum lake ice bottom temperature", "units": "K"}
era5_agg.to_zarr(AGG_PATH, mode="w", encoding=create_encoding(era5_agg), consolidated=False)
def crosses_antimeridian(geom: Polygon) -> bool:
coords = shapely.get_coordinates(geom)
crosses_any_meridian = (coords[:, 0] > 0).any() and (coords[:, 0] < 0).any()
return crosses_any_meridian and abs(coords[:, 0]).max() > 90
def split_antimeridian_cell(geom: Polygon) -> list[Polygon]:
# Assumes that it is a antimeridian hex
coords = shapely.get_coordinates(geom)
for i in range(coords.shape[0]):
if coords[i, 0] < 0:
coords[i, 0] += 360
geom = Polygon(coords)
antimeridian = LineString([[180, -90], [180, 90]])
polys = shapely.ops.split(geom, antimeridian)
return list(polys.geoms)
def check_geobox(geobox):
x, y = geobox.shape
return x > 1 and y > 1
def extract_cell_data(idx: int, geom: Polygon) -> xr.Dataset:
era5_agg = xr.open_zarr(AGG_PATH)
assert {"latitude", "longitude", "time"} == set(era5_agg.dims), (
f"Expected dims ('latitude', 'longitude', 'time'), got {era5_agg.dims}"
)
# cell.geometry is a shapely Polygon
if not crosses_antimeridian(geom):
geoms = [geom]
# Split geometry in case it crossed antimeridian
else:
geoms = split_antimeridian_cell(geom)
cell_data = []
for geom in geoms:
geom = odc.geo.Geometry(geom, crs="epsg:4326")
if not check_geobox(era5_agg.odc.geobox.enclosing(geom)):
continue
cell_data.append(era5_agg.odc.crop(geom).drop_vars("spatial_ref").mean(["latitude", "longitude"]))
if len(cell_data) == 0:
return None
elif len(cell_data) == 1:
return cell_data[0].expand_dims({"cell": [idx]}).chunk({"cell": 1})
else:
return xr.concat(cell_data, dim="part").mean("part").expand_dims({"cell": [idx]}).chunk({"cell": 1})
def spatial_matching(grid: gpd.GeoDataFrame, n_workers: int = 10):
with ThreadPoolExecutor(max_workers=n_workers) as executor:
futures = {
executor.submit(extract_cell_data, idx, row.geometry): idx
for idx, row in grid.to_crs("epsg:4326").iterrows()
}
for future in as_completed(futures):
idx = futures[future]
try:
data = future.result()
data.to_zarr(ALIGNED_PATH, append_dim="cell", consolidated=False, encoding=create_encoding(data))
except Exception as e:
print(f"Error processing cell {idx}: {e}")
def daily_enrich() -> xr.Dataset:
era5 = xr.open_zarr(ALIGNED_PATH)
assert {"cell", "time"} == set(era5.dims), f"Expected dims ('cell', 'time'), got {era5.dims}"
# Formulas based on Groeke et. al. (2025) Stochastic Weather generation...
era5["t2m_daily_avg"] = (era5.t2m_daily_max + era5.t2m_daily_min) / 2
era5.t2m_daily_avg.attrs = {"long_name": "Daily average 2 metre temperature", "units": "K"}
era5["t2m_daily_range"] = era5.t2m_daily_max - era5.t2m_daily_min
era5.t2m_daily_range.attrs = {"long_name": "Daily range of 2 metre temperature", "units": "K"}
era5["t2m_daily_skew"] = (era5.t2m_daily_avg - era5.t2m_daily_min) / era5.t2m_daily_range
era5.t2m_daily_skew.attrs = {"long_name": "Daily skewness of 2 metre temperature"}
era5["thawing_degree_days"] = (era5.t2m_daily_avg - 273.15).clip(min=0)
era5.thawing_degree_days.attrs = {"long_name": "Thawing degree days", "units": "K"}
era5["freezing_degree_days"] = (273.15 - era5.t2m_daily_avg).clip(min=0)
era5.freezing_degree_days.attrs = {"long_name": "Freezing degree days", "units": "K"}
era5["thawing_days"] = (era5.t2m_daily_avg > 273.15).astype(int)
era5.thawing_days.attrs = {"long_name": "Thawing days"}
era5["freezing_days"] = (era5.t2m_daily_avg < 273.15).astype(int)
era5.freezing_days.attrs = {"long_name": "Freezing days"}
era5["precipitation_occurrences"] = (era5.tp_daily_sum > 0).astype(int)
era5.precipitation_occurrences.attrs = {"long_name": "Precipitation occurrences"}
era5["snowfall_occurrences"] = (era5.sf_daily_sum > 0).astype(int)
era5.snowfall_occurrences.attrs = {"long_name": "Snowfall occurrences"}
era5["snow_isolation"] = era5.snowc_daily_mean * era5.sde_daily_mean
era5.snow_isolation.attrs = {"long_name": "Snow isolation"}
return era5
def monthly_aggregate():
era5 = daily_enrich()
assert {"cell", "time"} == set(era5.dims), f"Expected dims ('cell', 'time'), got {era5.dims}"
# Monthly aggregates
monthly = xr.merge(
[
# Original variables
era5.t2m_daily_min.resample(time="1M").min().rename("t2m_monthly_min"),
era5.t2m_daily_max.resample(time="1M").max().rename("t2m_monthly_max"),
era5.tp_daily_sum.resample(time="1M").sum().rename("tp_monthly_sum"),
era5.sf_daily_sum.resample(time="1M").sum().rename("sf_monthly_sum"),
era5.snowc_daily_mean.resample(time="1M").mean().rename("snowc_monthly_mean"),
era5.sde_daily_mean.resample(time="1M").mean().rename("sde_monthly_mean"),
era5.sshf_daily_sum.resample(time="1M").sum().rename("sshf_monthly_sum"),
era5.lblt_daily_max.resample(time="1M").max().rename("lblt_monthly_max"),
# Enriched variables
era5.t2m_daily_avg.resample(time="1M").mean().rename("t2m_monthly_avg"),
era5.t2m_daily_range.resample(time="1M").mean().rename("t2m_daily_range_monthly_avg"),
era5.t2m_daily_skew.resample(time="1M").mean().rename("t2m_daily_skew_monthly_avg"),
era5.thawing_degree_days.resample(time="1M").sum().rename("thawing_degree_days_monthly"),
era5.freezing_degree_days.resample(time="1M").sum().rename("freezing_degree_days_monthly"),
era5.thawing_days.resample(time="1M").sum().rename("thawing_days_monthly"),
era5.freezing_days.resample(time="1M").sum().rename("freezing_days_monthly"),
era5.precipitation_occurrences.resample(time="1M").sum().rename("precipitation_occurrences_monthly"),
era5.snowfall_occurrences.resample(time="1M").sum().rename("snowfall_occurrences_monthly"),
era5.snow_isolation.resample(time="1M").mean().rename("snow_isolation_monthly_mean"),
]
)
monthly.to_zarr(MONTHLY_PATH, mode="w", encoding=create_encoding(monthly), consolidated=False)
def yearly_aggregate():
monthly = xr.open_zarr(MONTHLY_PATH)
assert {"cell", "time"} == set(monthly.dims), f"Expected dims ('cell', 'time'), got {monthly.dims}"
# Yearly aggregates (shifted by +10 months to start in Oktober, first and last years will be cropped)
monthly_shifted = monthly.copy()
monthly_shifted["time"] = monthly_shifted.get_index("time") + pd.DateOffset(months=10)
incomplete_years = {monthly_shifted.time.dt.year.min().item(), monthly_shifted.time.dt.year.max().item()}
monthly_shifted = monthly_shifted.sel(time=~monthly_shifted.time.dt.year.isin(incomplete_years))
yearly = xr.merge(
[
# Original variables
monthly_shifted.t2m_monthly_min.resample(time="1Y").min().rename("t2m_yearly_min"),
monthly_shifted.t2m_monthly_max.resample(time="1Y").max().rename("t2m_yearly_max"),
monthly_shifted.tp_monthly_sum.resample(time="1Y").sum().rename("tp_yearly_sum"),
monthly_shifted.sf_monthly_sum.resample(time="1Y").sum().rename("sf_yearly_sum"),
monthly_shifted.snowc_monthly_mean.resample(time="1Y").mean().rename("snowc_yearly_mean"),
monthly_shifted.sde_monthly_mean.resample(time="1Y").mean().rename("sde_yearly_mean"),
monthly_shifted.sshf_monthly_sum.resample(time="1Y").sum().rename("sshf_yearly_sum"),
monthly_shifted.lblt_monthly_max.resample(time="1Y").max().rename("lblt_yearly_max"),
# Enriched variables
monthly_shifted.t2m_monthly_avg.resample(time="1Y").mean().rename("t2m_yearly_avg"),
# TODO: Check if this is correct -> use daily / hourly data instead for range and skew?
monthly_shifted.t2m_monthly_range.resample(time="1Y").mean().rename("t2m_daily_range_yearly_avg"),
monthly_shifted.t2m_monthly_skew.resample(time="1Y").mean().rename("t2m_daily_skew_yearly_avg"),
monthly_shifted.thawing_degree_days_monthly.resample(time="1Y").sum().rename("thawing_degree_days_yearly"),
monthly_shifted.freezing_degree_days_monthly.resample(time="1Y")
.sum()
.rename("freezing_degree_days_yearly"),
monthly_shifted.thawing_days_monthly.resample(time="1Y").sum().rename("thawing_days_yearly"),
monthly_shifted.freezing_days_monthly.resample(time="1Y").sum().rename("freezing_days_yearly"),
monthly_shifted.precipitation_occurrences_monthly.resample(time="1Y")
.sum()
.rename("precipitation_occurrences_yearly"),
monthly_shifted.snowfall_occurrences_monthly.resample(time="1Y")
.sum()
.rename("snowfall_occurrences_yearly"),
monthly_shifted.snow_isolation_monthly_mean.resample(time="1Y").mean().rename("snow_isolation_yearly_mean"),
]
)
# Summer / Winter aggregates
winter_months = [1, 2, 3, 4, 5, 6, 7] # These do NOT correspond to calendar months, but to the shifted months
summer_months = [8, 9, 10, 11, 12]
monthly_shifted_winter = monthly_shifted.sel(time=monthly_shifted.time.dt.month.isin(winter_months))
monthly_shifted_summer = monthly_shifted.sel(time=monthly_shifted.time.dt.month.isin(summer_months))
winter = xr.merge(
[
# Original variables
monthly_shifted_winter.t2m_monthly_min.resample(time="1Y").min().rename("t2m_winter_min"),
monthly_shifted_winter.t2m_monthly_max.resample(time="1Y").max().rename("t2m_winter_max"),
monthly_shifted_winter.tp_monthly_sum.resample(time="1Y").sum().rename("tp_winter_sum"),
monthly_shifted_winter.sf_monthly_sum.resample(time="1Y").sum().rename("sf_winter_sum"),
monthly_shifted_winter.snowc_monthly_mean.resample(time="1Y").mean().rename("snowc_winter_mean"),
monthly_shifted_winter.sde_monthly_mean.resample(time="1Y").mean().rename("sde_winter_mean"),
monthly_shifted_winter.sshf_monthly_sum.resample(time="1Y").sum().rename("sshf_winter_sum"),
monthly_shifted_winter.lblt_monthly_max.resample(time="1Y").max().rename("lblt_winter_max"),
# Enriched variables
monthly_shifted_winter.t2m_monthly_avg.resample(time="1Y").mean().rename("t2m_winter_avg"),
# TODO: Check if this is correct -> use daily / hourly data instead for range and skew?
monthly_shifted_winter.t2m_monthly_range.resample(time="1Y").mean().rename("t2m_daily_range_winter_avg"),
monthly_shifted_winter.t2m_monthly_skew.resample(time="1Y").mean().rename("t2m_daily_skew_winter_avg"),
monthly_shifted_winter.thawing_degree_days_monthly.resample(time="1Y")
.sum()
.rename("thawing_degree_days_winter"),
monthly_shifted_winter.freezing_degree_days_monthly.resample(time="1Y")
.sum()
.rename("freezing_degree_days_winter"),
monthly_shifted_winter.thawing_days_monthly.resample(time="1Y").sum().rename("thawing_days_winter"),
monthly_shifted_winter.freezing_days_monthly.resample(time="1Y").sum().rename("freezing_days_winter"),
monthly_shifted_winter.precipitation_occurrences_monthly.resample(time="1Y")
.sum()
.rename("precipitation_occurrences_winter"),
monthly_shifted_winter.snowfall_occurrences_monthly.resample(time="1Y")
.sum()
.rename("snowfall_occurrences_winter"),
monthly_shifted_winter.snow_isolation_monthly_mean.resample(time="1Y")
.mean()
.rename("snow_isolation_winter_mean"),
]
)
summer = xr.merge(
[
# Original variables
monthly_shifted_summer.t2m_monthly_min.resample(time="1Y").min().rename("t2m_summer_min"),
monthly_shifted_summer.t2m_monthly_max.resample(time="1Y").max().rename("t2m_summer_max"),
monthly_shifted_summer.tp_monthly_sum.resample(time="1Y").sum().rename("tp_summer_sum"),
monthly_shifted_summer.sf_monthly_sum.resample(time="1Y").sum().rename("sf_summer_sum"),
monthly_shifted_summer.snowc_monthly_mean.resample(time="1Y").mean().rename("snowc_summer_mean"),
monthly_shifted_summer.sde_monthly_mean.resample(time="1Y").mean().rename("sde_summer_mean"),
monthly_shifted_summer.sshf_monthly_sum.resample(time="1Y").sum().rename("sshf_summer_sum"),
monthly_shifted_summer.lblt_monthly_max.resample(time="1Y").max().rename("lblt_summer_max"),
# Enriched variables
monthly_shifted_summer.t2m_monthly_avg.resample(time="1Y").mean().rename("t2m_summer_avg"),
# TODO: Check if this is correct -> use daily / hourly data instead for range and skew?
monthly_shifted_summer.t2m_monthly_range.resample(time="1Y").mean().rename("t2m_daily_range_summer_avg"),
monthly_shifted_summer.t2m_monthly_skew.resample(time="1Y").mean().rename("t2m_daily_skew_summer_avg"),
monthly_shifted_summer.thawing_degree_days_summer.resample(time="1Y")
.sum()
.rename("thawing_degree_days_summer"),
monthly_shifted_summer.freezing_degree_days_summer.resample(time="1Y")
.sum()
.rename("freezing_degree_days_summer"),
monthly_shifted_summer.thawing_days_summer.resample(time="1Y").sum().rename("thawing_days_summer"),
monthly_shifted_summer.freezing_days_summer.resample(time="1Y").sum().rename("freezing_days_summer"),
monthly_shifted_summer.precipitation_occurrences_summer.resample(time="1Y")
.sum()
.rename("precipitation_occurrences_summer"),
monthly_shifted_summer.snowfall_occurrences_summer.resample(time="1Y")
.sum()
.rename("snowfall_occurrences_summer"),
monthly_shifted_summer.snow_isolation_summer.resample(time="1Y")
.mean()
.rename("snow_isolation_summer_mean"),
]
)
combined = xr.merge([yearly, summer, winter])
combined.to_zarr(YEARLY_PATH, mode="w", encoding=create_encoding(combined), consolidated=False)
def cli(grid: Literal["hex", "healpix"], level: int, download: bool = False, n_workers: int = 10):
"""Run the CLI for ERA5 data processing.
Args:
grid (Literal["hex", "healpix"]): The grid type to use.
level (int): The processing level.
download (bool, optional): Whether to download data. Defaults to False.
n_workers (int, optional): Number of workers for parallel processing. Defaults to 10.
"""
cluster = dd.LocalCluster(n_workers=n_workers, threads_per_worker=4, memory_limit="20GB")
client = dd.Client(cluster)
print(client)
print(client.dashboard_link)
if download:
download_daily_aggregated()
print("Downloaded and aggregated ERA5 data.")
grid = gpd.read_parquet(DATA_DIR / f"grids/permafrost_{grid}{level}_grid.parquet")
spatial_matching(grid, n_workers=n_workers)
print("Spatially matched ERA5 data to grid.")
monthly_aggregate()
yearly_aggregate()
print("Enriched ERA5 data with additional features and aggregated it temporally.")
if __name__ == "__main__":
cyclopts.run(cli)