Refactor
This commit is contained in:
parent
3ad332b5a8
commit
eeab8fff1e
16 changed files with 536 additions and 943 deletions
|
|
@ -1,5 +1,5 @@
|
|||
[project]
|
||||
name = "entropic-perma-risk"
|
||||
name = "entropice"
|
||||
version = "0.1.0"
|
||||
description = "Add your description here"
|
||||
readme = "README.md"
|
||||
|
|
@ -49,10 +49,14 @@ dependencies = [
|
|||
]
|
||||
|
||||
[project.scripts]
|
||||
create-grid = "steps.s0_0_grids.create_grid:main"
|
||||
rts = "steps.s0_1_rts.rts:main"
|
||||
alpha-earth = "steps.s1_0_alphaearth.alphaearth:main"
|
||||
era5 = "steps.s1_1_era5.era5:cli"
|
||||
create-grid = "entropice.grids:main"
|
||||
darts = "entropice.darts:main"
|
||||
alpha-earth = "entropice.alphaearth:main"
|
||||
era5 = "entropice.era5:cli"
|
||||
|
||||
[build-system]
|
||||
requires = ["hatchling"]
|
||||
build-backend = "hatchling.build"
|
||||
|
||||
[tool.uv]
|
||||
package = true
|
||||
|
|
|
|||
2
src/entropice/__init__.py
Normal file
2
src/entropice/__init__.py
Normal file
|
|
@ -0,0 +1,2 @@
|
|||
def hello() -> str:
|
||||
return "Hello from entropice!"
|
||||
|
|
@ -4,9 +4,7 @@ Author: Tobias Hölzer
|
|||
Date: October 2025
|
||||
"""
|
||||
|
||||
import os
|
||||
import warnings
|
||||
from pathlib import Path
|
||||
from typing import Literal
|
||||
|
||||
import cyclopts
|
||||
|
|
@ -16,9 +14,13 @@ import geopandas as gpd
|
|||
import numpy as np
|
||||
import pandas as pd
|
||||
import xarray as xr
|
||||
import xdggs
|
||||
from rich import pretty, print, traceback
|
||||
from rich.progress import track
|
||||
|
||||
from entropice import codecs, grids
|
||||
from entropice.paths import get_annual_embeddings_file, get_embeddings_store
|
||||
|
||||
# Filter out the GeoDataFrame.swapaxes deprecation warning
|
||||
warnings.filterwarnings("ignore", message=".*GeoDataFrame.swapaxes.*", category=FutureWarning)
|
||||
|
||||
|
|
@ -26,26 +28,19 @@ pretty.install()
|
|||
traceback.install()
|
||||
ee.Initialize(project="ee-tobias-hoelzer")
|
||||
|
||||
DATA_DIR = Path(os.environ.get("DATA_DIR", "../../data")) / "entropyc-rts"
|
||||
EMBEDDINGS_DIR = DATA_DIR / "embeddings"
|
||||
EMBEDDINGS_DIR.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
|
||||
cli = cyclopts.App(name="alpha-earth")
|
||||
|
||||
|
||||
@cli.command()
|
||||
def download(grid: Literal["hex", "healpix"], level: int, backup_intermediate: bool = False):
|
||||
def download(grid: Literal["hex", "healpix"], level: int):
|
||||
"""Extract satellite embeddings from Google Earth Engine and map them to a grid.
|
||||
|
||||
Args:
|
||||
grid (Literal["hex", "healpix"]): The grid type to use.
|
||||
level (int): The grid level to use.
|
||||
backup_intermediate (bool, optional): Whether to backup intermediate results. Defaults to False.
|
||||
|
||||
"""
|
||||
gridname = f"permafrost_{grid}{level}"
|
||||
grid = gpd.read_parquet(DATA_DIR / f"grids/{gridname}_grid.parquet")
|
||||
grid_gdf = grids.open(grid, level)
|
||||
|
||||
for year in track(range(2017, 2025), total=8, description="Processing years..."):
|
||||
embedding_collection = ee.ImageCollection("GOOGLE/SATELLITE_EMBEDDING/V1/ANNUAL")
|
||||
|
|
@ -72,9 +67,9 @@ def download(grid: Literal["hex", "healpix"], level: int, backup_intermediate: b
|
|||
# Process grid in batches of 100
|
||||
batch_size = 100
|
||||
all_results = []
|
||||
n_batches = len(grid) // batch_size
|
||||
n_batches = len(grid_gdf) // batch_size
|
||||
for batch_num, batch_grid in track(
|
||||
enumerate(np.array_split(grid, n_batches)),
|
||||
enumerate(np.array_split(grid_gdf, n_batches)),
|
||||
description="Processing batches...",
|
||||
total=n_batches,
|
||||
):
|
||||
|
|
@ -88,18 +83,12 @@ def download(grid: Literal["hex", "healpix"], level: int, backup_intermediate: b
|
|||
# Store batch results
|
||||
all_results.append(df_batch)
|
||||
|
||||
# Save batch immediately to disk as backup
|
||||
if backup_intermediate:
|
||||
batch_filename = f"{gridname}_embeddings-{year}_batch{batch_num:06d}.parquet"
|
||||
batch_result = batch_grid.merge(df_batch[[*bands, "cell_id"]], on="cell_id", how="left")
|
||||
batch_result.to_parquet(EMBEDDINGS_DIR / f"{batch_filename}")
|
||||
|
||||
# Combine all batch results
|
||||
df = pd.concat(all_results, ignore_index=True)
|
||||
embeddings_on_grid = grid.merge(df[[*bands, "cell_id"]], on="cell_id", how="left")
|
||||
embeddings_file = EMBEDDINGS_DIR / f"{gridname}_embeddings-{year}.parquet"
|
||||
embeddings_file = get_annual_embeddings_file(grid, level, year)
|
||||
embeddings_on_grid.to_parquet(embeddings_file)
|
||||
print(f"Saved embeddings for year {year} to {embeddings_file.resolve()}.")
|
||||
print(f"Saved embeddings for year {year} to {embeddings_file}.")
|
||||
|
||||
|
||||
@cli.command()
|
||||
|
|
@ -111,36 +100,38 @@ def combine_to_zarr(grid: Literal["hex", "healpix"], level: int):
|
|||
level (int): The grid level to use.
|
||||
|
||||
"""
|
||||
embs = gpd.read_parquet(DATA_DIR / "embeddings" / f"permafrost_{grid}{level}_embeddings-2017.parquet")
|
||||
# ? Converting cell IDs from hex strings to integers for xdggs compatibility
|
||||
cells = [int(cid, 16) for cid in embs.cell_id.to_list()]
|
||||
cell_ids = grids.get_cell_ids(grid, level)
|
||||
years = list(range(2017, 2025))
|
||||
aggs = ["median", "stdDev", "min", "max", "mean", "p1", "p5", "p25", "p75", "p95", "p99"]
|
||||
bands = [f"A{str(i).zfill(2)}" for i in range(64)]
|
||||
|
||||
a = xr.DataArray(
|
||||
np.nan,
|
||||
dims=("year", "cell", "band", "agg"),
|
||||
coords={"year": years, "cell": cells, "band": bands, "agg": aggs},
|
||||
)
|
||||
dims=("year", "cell_ids", "band", "agg"),
|
||||
coords={"year": years, "cell_ids": cell_ids, "band": bands, "agg": aggs},
|
||||
).astype(np.float32)
|
||||
|
||||
# ? These attributes are needed for xdggs
|
||||
a.cell.attrs = {
|
||||
a.cell_ids.attrs = {
|
||||
"grid_name": "h3" if grid == "hex" else "healpix",
|
||||
"level": level,
|
||||
}
|
||||
if grid == "healpix":
|
||||
a.cell.attrs["indexing_scheme"] = "nested"
|
||||
a.cell_ids.attrs["indexing_scheme"] = "nested"
|
||||
|
||||
for year in track(years, total=len(years), description="Processing years..."):
|
||||
embs = gpd.read_parquet(DATA_DIR / "embeddings" / f"permafrost_{grid}{level}_embeddings-{year}.parquet")
|
||||
embeddings_file = get_annual_embeddings_file(grid, level, year)
|
||||
embs = gpd.read_parquet(embeddings_file)
|
||||
for band in bands:
|
||||
for agg in aggs:
|
||||
col = f"{band}_{agg}"
|
||||
a.loc[{"band": band, "agg": agg, "year": year}] = embs[col].to_list()
|
||||
|
||||
zarr_path = EMBEDDINGS_DIR / f"permafrost_{grid}{level}_embeddings.zarr"
|
||||
a.to_zarr(zarr_path, consolidated=False, mode="w")
|
||||
print(f"Saved combined embeddings to {zarr_path.resolve()}.")
|
||||
a = xdggs.decode(a)
|
||||
|
||||
zarr_path = get_embeddings_store(grid, level)
|
||||
a.to_zarr(zarr_path, consolidated=False, mode="w", encoding=codecs.from_ds(a))
|
||||
print(f"Saved combined embeddings to {zarr_path}.")
|
||||
|
||||
|
||||
def main(): # noqa: D103
|
||||
31
src/entropice/codecs.py
Normal file
31
src/entropice/codecs.py
Normal file
|
|
@ -0,0 +1,31 @@
|
|||
"""Encoding utilities for zarr dataset storage."""
|
||||
|
||||
import xarray as xr
|
||||
from zarr.codecs import BloscCodec
|
||||
|
||||
|
||||
def from_ds(ds: xr.Dataset, store_floats_as_float32: bool = True, include_coords: bool = True) -> dict:
|
||||
"""Create compression encoding for zarr dataset storage.
|
||||
|
||||
Creates Blosc compression configuration for all data variables and coordinates
|
||||
in the dataset using zstd compression with level 5.
|
||||
|
||||
Args:
|
||||
ds (xr.Dataset): The xarray Dataset to create encoding for.
|
||||
store_floats_as_float32 (bool, optional): Whether to store floating point data as float32.
|
||||
Defaults to True.
|
||||
include_coords (bool, optional): Whether to include coordinates in the encoding.
|
||||
This is useful when appending to an existing store.
|
||||
Defaults to True.
|
||||
|
||||
Returns:
|
||||
dict: Encoding dictionary with compression settings for each variable.
|
||||
|
||||
"""
|
||||
var_names = [*ds.data_vars, *ds.coords] if include_coords else ds.data_vars
|
||||
encoding = {var: {"compressors": BloscCodec(cname="zstd", clevel=5)} for var in var_names}
|
||||
if store_floats_as_float32:
|
||||
for var in ds.data_vars:
|
||||
if ds[var].dtype == "float64":
|
||||
encoding[var]["dtype"] = "float32"
|
||||
return encoding
|
||||
72
src/entropice/darts.py
Normal file
72
src/entropice/darts.py
Normal file
|
|
@ -0,0 +1,72 @@
|
|||
"""Labels of Retrogressive-Thaw-Slumps (RTS).
|
||||
|
||||
Assumes that the level 1 and level 2 DARTS features have been downloaded into $DATA_DIR / entropyc-rts / darts: https://arcticdata.io/catalog/view/doi:10.18739/A22B8VD7C
|
||||
|
||||
Author: Tobias Hölzer
|
||||
Date: October 2025
|
||||
"""
|
||||
|
||||
from typing import Literal
|
||||
|
||||
import cyclopts
|
||||
import geopandas as gpd
|
||||
from rich import pretty, print, traceback
|
||||
from rich.progress import track
|
||||
from stopuhr import stopwatch
|
||||
|
||||
from entropice import grids
|
||||
from entropice.paths import dartsl2_cov_file, dartsl2_file, get_darts_rts_file
|
||||
|
||||
traceback.install()
|
||||
pretty.install()
|
||||
|
||||
|
||||
def extract_darts_rts(grid: Literal["hex", "healpix"], level: int):
|
||||
"""Extract RTS labels from DARTS dataset.
|
||||
|
||||
Args:
|
||||
grid (Literal["hex", "healpix"]): The grid type to use.
|
||||
level (int): The grid level to use.
|
||||
|
||||
"""
|
||||
with stopwatch("Load data"):
|
||||
darts_l2 = gpd.read_parquet(dartsl2_file)
|
||||
darts_cov_l2 = gpd.read_parquet(dartsl2_cov_file)
|
||||
grid_gdf = grids.open(grid, level)
|
||||
|
||||
with stopwatch("Extract RTS labels"):
|
||||
grid_l2 = grid_gdf.overlay(darts_l2.to_crs(grid_gdf.crs), how="intersection")
|
||||
grid_cov_l2 = grid_gdf.overlay(darts_cov_l2.to_crs(grid_gdf.crs), how="intersection")
|
||||
|
||||
years = list(grid_cov_l2["year"].unique())
|
||||
for year in track(years, total=len(years), description="Processing years..."):
|
||||
with stopwatch("Processing RTS", log=False):
|
||||
subset = grid_l2[grid_l2["year"] == year]
|
||||
subset_cov = grid_cov_l2[grid_cov_l2["year"] == year]
|
||||
|
||||
counts = subset.groupby("cell_id").size()
|
||||
grid_gdf[f"darts_{year}_rts_count"] = grid_gdf.cell_id.map(counts)
|
||||
|
||||
areas = subset.groupby("cell_id").apply(lambda x: x.geometry.area.sum(), include_groups=False)
|
||||
grid_gdf[f"darts_{year}_rts_area"] = grid_gdf.cell_id.map(areas)
|
||||
|
||||
areas_cov = subset_cov.groupby("cell_id").apply(lambda x: x.geometry.area.sum(), include_groups=False)
|
||||
grid_gdf[f"darts_{year}_covered_area"] = grid_gdf.cell_id.map(areas_cov)
|
||||
grid_gdf[f"darts_{year}_coverage"] = grid_gdf[f"darts_{year}_covered_area"] / grid_gdf.geometry.area
|
||||
|
||||
grid_gdf[f"darts_{year}_rts_density"] = (
|
||||
grid_gdf[f"darts_{year}_rts_area"] / grid_gdf[f"darts_{year}_covered_area"]
|
||||
)
|
||||
|
||||
output_path = get_darts_rts_file(grid, level)
|
||||
grid_gdf.to_parquet(output_path)
|
||||
print(f"Saved RTS labels to {output_path}")
|
||||
stopwatch.summary()
|
||||
|
||||
|
||||
def main(): # noqa: D103
|
||||
cyclopts.run(extract_darts_rts)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
|
|
@ -1,3 +1,4 @@
|
|||
# ruff: noqa: PD011
|
||||
"""Download and preprocess ERA5 data.
|
||||
|
||||
Variables of Interest:
|
||||
|
|
@ -9,6 +10,12 @@ Variables of Interest:
|
|||
- Surface sensible heat flux (sshf) [accum]
|
||||
- Lake ice bottom temperature (lblt) [instant]
|
||||
|
||||
Snow Fall and Total precipitation are both further accumulated over a day -
|
||||
thus instead of taking the daily sum, only the last value of the day is taken.
|
||||
This is 00:00 of the next day, e.g.:
|
||||
tp_2020_06_23 <- tp_2020_06_24_00:00
|
||||
See: https://confluence.ecmwf.int/display/CKB/ERA5-Land%3A+data+documentation#ERA5Land:datadocumentation-accumulationsAccumulations
|
||||
|
||||
Naming patterns:
|
||||
- Instant Variables are downloaded already as statistically aggregated (lossy),
|
||||
therefore their names get the aggregation as suffix
|
||||
|
|
@ -66,14 +73,12 @@ Author: Tobias Hölzer
|
|||
Date: June to October 2025
|
||||
"""
|
||||
|
||||
import os
|
||||
import cProfile
|
||||
import time
|
||||
from pathlib import Path
|
||||
from typing import Literal
|
||||
|
||||
import cyclopts
|
||||
import dask.distributed as dd
|
||||
import geopandas as gpd
|
||||
import numpy as np
|
||||
import odc.geo
|
||||
import odc.geo.xr
|
||||
|
|
@ -84,32 +89,16 @@ import xarray as xr
|
|||
from rich import pretty, print, traceback
|
||||
from rich.progress import track
|
||||
from shapely.geometry import LineString, Polygon
|
||||
from zarr.codecs import BloscCodec
|
||||
from stopuhr import stopwatch
|
||||
|
||||
traceback.install(show_locals=True, suppress=[cyclopts, xr, pd])
|
||||
from entropice import codecs, grids, watermask
|
||||
from entropice.paths import get_era5_stores
|
||||
|
||||
traceback.install(show_locals=True, suppress=[cyclopts, xr, pd, cProfile])
|
||||
pretty.install()
|
||||
|
||||
cli = cyclopts.App()
|
||||
|
||||
DATA_DIR = Path(os.environ.get("DATA_DIR", "data")) / "entropyc-rts"
|
||||
ERA5_DIR = DATA_DIR / "era5"
|
||||
DAILY_RAW_PATH = ERA5_DIR / "daily_raw.zarr"
|
||||
DAILY_ENRICHED_PATH = ERA5_DIR / "daily_enriched.zarr"
|
||||
MONTHLY_RAW_PATH = ERA5_DIR / "monthly_raw.zarr"
|
||||
YEARLY_RAW_PATH = ERA5_DIR / "yearly_aligned.zarr"
|
||||
SUMMER_RAW_PATH = ERA5_DIR / "summer_aligned.zarr"
|
||||
WINTER_RAW_PATH = ERA5_DIR / "winter_aligned.zarr"
|
||||
|
||||
|
||||
def _get_grid_paths(
|
||||
agg: Literal["daily", "monthly", "summer", "winter", "yearly"],
|
||||
grid: Literal["hex", "healpix"],
|
||||
level: int,
|
||||
):
|
||||
gridname = f"permafrost_{grid}{level}"
|
||||
aligned_path = ERA5_DIR / f"{agg}_{gridname}.zarr"
|
||||
return aligned_path
|
||||
|
||||
|
||||
min_lat = 50
|
||||
max_lat = 83.7 # Ensures the right Chunks Size (90 - 64 / 10 + 0.1)
|
||||
|
|
@ -145,24 +134,6 @@ accums = {
|
|||
}
|
||||
|
||||
|
||||
def create_encoding(ds: xr.Dataset):
|
||||
"""Create compression encoding for zarr dataset storage.
|
||||
|
||||
Creates Blosc compression configuration for all data variables and coordinates
|
||||
in the dataset using zstd compression with level 5.
|
||||
|
||||
Args:
|
||||
ds (xr.Dataset): The xarray Dataset to create encoding for.
|
||||
|
||||
Returns:
|
||||
dict: Encoding dictionary with compression settings for each variable.
|
||||
|
||||
"""
|
||||
# encoding = {var: {"compressors": BloscCodec(cname="zlib", clevel=9)} for var in ds.data_vars}
|
||||
encoding = {var: {"compressors": BloscCodec(cname="zstd", clevel=5)} for var in [*ds.data_vars, *ds.coords]}
|
||||
return encoding
|
||||
|
||||
|
||||
# ================
|
||||
# === Download ===
|
||||
# ================
|
||||
|
|
@ -208,16 +179,18 @@ def download_daily_aggregated():
|
|||
daily_raw = xr.merge(
|
||||
[
|
||||
# Instant
|
||||
era5.t2m.resample(time="1D").max().rename("t2m_max"),
|
||||
era5.t2m.resample(time="1D").min().rename("t2m_min"),
|
||||
era5.t2m.resample(time="1D").mean().rename("t2m_mean"),
|
||||
era5.snowc.resample(time="1D").mean().rename("snowc_mean"),
|
||||
era5.sde.resample(time="1D").mean().rename("sde_mean"),
|
||||
era5.lblt.resample(time="1D").max().rename("lblt_max"),
|
||||
era5.t2m.resample(time="1D").max().rename("t2m_max").astype(np.float32),
|
||||
era5.t2m.resample(time="1D").min().rename("t2m_min").astype(np.float32),
|
||||
era5.t2m.resample(time="1D").mean().rename("t2m_mean").astype(np.float32),
|
||||
era5.snowc.resample(time="1D").mean().rename("snowc_mean").astype(np.float32),
|
||||
era5.sde.resample(time="1D").mean().rename("sde_mean").astype(np.float32),
|
||||
era5.lblt.resample(time="1D").max().rename("lblt_max").astype(np.float32),
|
||||
# Accum
|
||||
era5.tp.resample(time="1D").sum().rename("tp"),
|
||||
era5.sf.resample(time="1D").sum().rename("sf"),
|
||||
era5.sshf.resample(time="1D").sum().rename("sshf"),
|
||||
era5.sshf.resample(time="1D").sum(skipna=False).rename("sshf").astype(np.float32),
|
||||
# Precipitation and snow fall are special
|
||||
# Take only the last value of the day (00:00 of next day)
|
||||
era5.tp.resample(time="1D").first().shift(time=-1).rename("tp").astype(np.float32),
|
||||
era5.sf.resample(time="1D").first().shift(time=-1).rename("sf").astype(np.float32),
|
||||
]
|
||||
)
|
||||
|
||||
|
|
@ -225,16 +198,18 @@ def download_daily_aggregated():
|
|||
daily_raw["t2m_max"].attrs = {"long_name": "Daily maximum 2 metre temperature", "units": "K"}
|
||||
daily_raw["t2m_min"].attrs = {"long_name": "Daily minimum 2 metre temperature", "units": "K"}
|
||||
daily_raw["t2m_mean"].attrs = {"long_name": "Daily mean 2 metre temperature", "units": "K"}
|
||||
daily_raw["tp"].attrs = {"long_name": "Daily total precipitation", "units": "m"}
|
||||
daily_raw["sf"].attrs = {"long_name": "Daily total snow fall", "units": "m"}
|
||||
daily_raw["snowc_mean"].attrs = {"long_name": "Daily mean snow cover", "units": "m"}
|
||||
daily_raw["sde_mean"].attrs = {"long_name": "Daily mean snow depth", "units": "m"}
|
||||
daily_raw["sshf"].attrs = {"long_name": "Daily total surface sensible heat flux", "units": "J/m²"}
|
||||
daily_raw["lblt_max"].attrs = {"long_name": "Daily maximum lake ice bottom temperature", "units": "K"}
|
||||
daily_raw["tp"].attrs = {"long_name": "Daily total precipitation", "units": "m"} # Units are rather m^3 / m^2
|
||||
daily_raw["sf"].attrs = {"long_name": "Daily total snow fall", "units": "m"} # Units are rather m^3 / m^2
|
||||
daily_raw["sshf"].attrs = {"long_name": "Daily total surface sensible heat flux", "units": "J/m²"}
|
||||
|
||||
daily_raw = daily_raw.odc.assign_crs("epsg:4326")
|
||||
daily_raw = daily_raw.drop_vars(["surface", "number", "depthBelowLandLayer"])
|
||||
daily_raw.to_zarr(DAILY_RAW_PATH, mode="w", encoding=create_encoding(daily_raw), consolidated=False)
|
||||
daily_store = get_era5_stores("daily")
|
||||
print(f"Saving downloaded and daily aggregated ERA5 data to {daily_store}.")
|
||||
daily_raw.to_zarr(daily_store, mode="w", encoding=codecs.from_ds(daily_raw), consolidated=False)
|
||||
|
||||
|
||||
@cli.command
|
||||
|
|
@ -252,7 +227,7 @@ def download():
|
|||
print(client)
|
||||
print(client.dashboard_link)
|
||||
download_daily_aggregated()
|
||||
print(f"Downloaded and aggregated ERA5 data to {DAILY_RAW_PATH.resolve()}.")
|
||||
print("Downloaded and aggregated ERA5 data.")
|
||||
|
||||
|
||||
# ============================
|
||||
|
|
@ -275,36 +250,62 @@ def daily_enrich():
|
|||
- Snow isolation index
|
||||
|
||||
"""
|
||||
daily = xr.open_zarr(DAILY_RAW_PATH, consolidated=False).set_coords("spatial_ref")
|
||||
daily_store = get_era5_stores("daily")
|
||||
daily = xr.open_zarr(daily_store, consolidated=False).set_coords("spatial_ref")
|
||||
assert "time" in daily.dims, f"Expected dim 'time' to be in {daily.dims=}"
|
||||
|
||||
# For better dask performance, all variables are written immediately after calculation
|
||||
# The smart scheduling which could performantly handle more is according to docs not yet implemented.
|
||||
# See https://docs.xarray.dev/en/stable/user-guide/dask.html#best-practices point 3 or https://github.com/dask/dask/issues/874
|
||||
def _store(v: str):
|
||||
nonlocal daily
|
||||
|
||||
encoding = codecs.from_ds(daily[[v]], include_coords=False)
|
||||
print(f"Storing enriched daily variable {v} to {daily_store}...")
|
||||
with stopwatch("Storing enriched daily variable"):
|
||||
daily[[v]].to_zarr(daily_store, mode="a", encoding=encoding, consolidated=False)
|
||||
daily = xr.open_zarr(daily_store, consolidated=False).set_coords("spatial_ref")
|
||||
|
||||
# Formulas based on Groeke et. al. (2025) Stochastic Weather generation...
|
||||
daily["t2m_avg"] = (daily.t2m_max + daily.t2m_min) / 2
|
||||
daily.t2m_avg.attrs = {"long_name": "Daily average 2 metre temperature", "units": "K"}
|
||||
_store("t2m_avg")
|
||||
|
||||
daily["t2m_range"] = daily.t2m_max - daily.t2m_min
|
||||
daily.t2m_range.attrs = {"long_name": "Daily range of 2 metre temperature", "units": "K"}
|
||||
_store("t2m_range")
|
||||
|
||||
daily["t2m_skew"] = (daily.t2m_mean - daily.t2m_min) / daily.t2m_range
|
||||
daily.t2m_skew.attrs = {"long_name": "Daily skewness of 2 metre temperature"}
|
||||
_store("t2m_skew")
|
||||
|
||||
daily["thawing_degree_days"] = (daily.t2m_avg - 273.15).clip(min=0)
|
||||
daily.thawing_degree_days.attrs = {"long_name": "Thawing degree days", "units": "K"}
|
||||
_store("thawing_degree_days")
|
||||
|
||||
daily["freezing_degree_days"] = (273.15 - daily.t2m_avg).clip(min=0)
|
||||
daily.freezing_degree_days.attrs = {"long_name": "Freezing degree days", "units": "K"}
|
||||
_store("freezing_degree_days")
|
||||
|
||||
daily["thawing_days"] = (daily.t2m_avg > 273.15).astype(int)
|
||||
daily.thawing_days.attrs = {"long_name": "Thawing days"}
|
||||
_store("thawing_days")
|
||||
|
||||
daily["freezing_days"] = (daily.t2m_avg < 273.15).astype(int)
|
||||
daily.freezing_days.attrs = {"long_name": "Freezing days"}
|
||||
_store("freezing_days")
|
||||
|
||||
daily["precipitation_occurrences"] = (daily.tp > 0).astype(int)
|
||||
daily["precipitation_occurrences"] = (daily.tp > 0.001).astype(int)
|
||||
daily.precipitation_occurrences.attrs = {"long_name": "Precipitation occurrences"}
|
||||
daily["snowfall_occurrences"] = (daily.sf > 0).astype(int)
|
||||
_store("precipitation_occurrences")
|
||||
|
||||
daily["snowfall_occurrences"] = (daily.sf > 0.001).astype(int)
|
||||
daily.snowfall_occurrences.attrs = {"long_name": "Snowfall occurrences"}
|
||||
_store("snowfall_occurrences")
|
||||
|
||||
daily["naive_snow_isolation"] = daily.snowc_mean * daily.sde_mean
|
||||
daily.naive_snow_isolation.attrs = {"long_name": "Naive snow isolation"}
|
||||
|
||||
daily.to_zarr(DAILY_ENRICHED_PATH, mode="w", encoding=create_encoding(daily), consolidated=False)
|
||||
_store("naive_snow_isolation")
|
||||
|
||||
|
||||
def monthly_aggregate():
|
||||
|
|
@ -318,7 +319,8 @@ def monthly_aggregate():
|
|||
The aggregated monthly data is saved to a zarr file for further processing.
|
||||
|
||||
"""
|
||||
daily = xr.open_zarr(DAILY_ENRICHED_PATH, consolidated=False)
|
||||
daily_store = get_era5_stores("daily")
|
||||
daily = xr.open_zarr(daily_store, consolidated=False).set_coords("spatial_ref")
|
||||
assert "time" in daily.dims, f"Expected dim 'time' to be in {daily.dims=}"
|
||||
daily = daily.sel(time=slice(min_time, max_time))
|
||||
|
||||
|
|
@ -348,8 +350,10 @@ def monthly_aggregate():
|
|||
monthly_accums.append(agg)
|
||||
|
||||
monthly = xr.merge(monthly_instants + monthly_accums)
|
||||
monthly = monthly.chunk({"time": len(monthly.time), "latitude": 64, "longitude": 64})
|
||||
monthly.to_zarr(MONTHLY_RAW_PATH, mode="w", encoding=create_encoding(monthly), consolidated=False)
|
||||
monthly = monthly.chunk({"time": len(monthly.time), "latitude": 256, "longitude": 256}) # ~ 100Mb / chunk for f32
|
||||
monthly_store = get_era5_stores("monthly")
|
||||
print(f"Saving monthly aggregated ERA5 data to {monthly_store}.")
|
||||
monthly.to_zarr(monthly_store, mode="w", encoding=codecs.from_ds(monthly), consolidated=False)
|
||||
|
||||
|
||||
def multi_monthly_aggregate(monthly: xr.Dataset, n: int = 12) -> xr.Dataset:
|
||||
|
|
@ -407,47 +411,12 @@ def multi_monthly_aggregate(monthly: xr.Dataset, n: int = 12) -> xr.Dataset:
|
|||
"link": "https://tc.copernicus.org/articles/11/989/2017/tc-11-989-2017.pdf",
|
||||
}
|
||||
|
||||
multimonthly = multimonthly.chunk({"time": len(multimonthly.time), "latitude": 64, "longitude": 64})
|
||||
multimonthly = multimonthly.chunk(
|
||||
{"time": len(multimonthly.time), "latitude": 128, "longitude": 1024}
|
||||
) # ~36Mb / chunk for f64
|
||||
return multimonthly
|
||||
|
||||
|
||||
def yearly_and_seasonal_aggregate():
|
||||
"""Aggregate monthly ERA5 data to yearly resolution with seasonal splits.
|
||||
|
||||
Takes monthly aggregated data and creates yearly aggregates using a shifted
|
||||
calendar (October to September) to better capture Arctic seasonal patterns.
|
||||
Creates separate aggregates for full year, winter (Oct-Apr), and summer
|
||||
(May-Sep) periods.
|
||||
|
||||
The first and last incomplete years are excluded from the analysis.
|
||||
Winter months are defined as months 1-7 in the shifted calendar,
|
||||
and summer months are 8-12.
|
||||
|
||||
The final dataset includes yearly, winter, and summer aggregates for all
|
||||
climate variables, saved to a zarr file.
|
||||
|
||||
"""
|
||||
monthly = xr.open_zarr(MONTHLY_RAW_PATH, consolidated=False).set_coords("spatial_ref")
|
||||
assert "time" in monthly.dims, f"Expected dim 'time' to be in {monthly.dims=}"
|
||||
|
||||
# "Shift" the calendar by slicing the first Jan-Sep and the last Oct-Dec months
|
||||
first_year = monthly.time.dt.year.min().item()
|
||||
last_year = monthly.time.dt.year.max().item()
|
||||
monthly = monthly.sel(time=slice(f"{first_year}-10-01", f"{last_year}-09-30"))
|
||||
|
||||
yearly = multi_monthly_aggregate(monthly, n=12)
|
||||
yearly = derive_yearly_variables(yearly)
|
||||
yearly.to_zarr(YEARLY_RAW_PATH, mode="w", encoding=create_encoding(yearly), consolidated=False)
|
||||
|
||||
summer_winter = multi_monthly_aggregate(monthly, n=6)
|
||||
|
||||
summer = summer_winter.sel(time=summer_winter.time.dt.month == 4)
|
||||
summer.to_zarr(SUMMER_RAW_PATH, mode="w", encoding=create_encoding(summer), consolidated=False)
|
||||
|
||||
winter = summer_winter.sel(time=summer_winter.time.dt.month == 10)
|
||||
winter.to_zarr(WINTER_RAW_PATH, mode="w", encoding=create_encoding(winter), consolidated=False)
|
||||
|
||||
|
||||
def derive_yearly_variables(yearly: xr.Dataset) -> xr.Dataset:
|
||||
"""Derive additional variables from daily data and add them to the yearly dataset.
|
||||
|
||||
|
|
@ -459,7 +428,8 @@ def derive_yearly_variables(yearly: xr.Dataset) -> xr.Dataset:
|
|||
|
||||
"""
|
||||
assert "time" in yearly.dims, f"Expected dim 'time' to be in {yearly.dims=}"
|
||||
daily = xr.open_zarr(DAILY_ENRICHED_PATH, consolidated=False).set_coords("spatial_ref")
|
||||
daily_store = get_era5_stores("daily")
|
||||
daily = xr.open_zarr(daily_store, consolidated=False).set_coords("spatial_ref")
|
||||
assert "time" in daily.dims, f"Expected dim 'time' to be in {daily.dims=}"
|
||||
daily = daily.sel(time=slice(min_time, max_time))
|
||||
# ? Note: The functions do not really account for leap years
|
||||
|
|
@ -476,7 +446,7 @@ def derive_yearly_variables(yearly: xr.Dataset) -> xr.Dataset:
|
|||
# This assumes that the 01-01 is almost everywhere one of the coldest days in the year
|
||||
first_thaw_day = daily.thawing_days.groupby("time.year").apply(lambda x: x.argmax(dim="time")) + 1
|
||||
first_thaw_day = first_thaw_day.where(~never_thaws).rename("day_of_first_thaw").rename(year="time")
|
||||
first_thaw_day["time"] = pd.to_datetime([f"{y}-10-01" for y in first_thaw_day.time.values]) # noqa: PD011
|
||||
first_thaw_day["time"] = pd.to_datetime([f"{y}-10-01" for y in first_thaw_day.time.values])
|
||||
first_thaw_day.attrs = {"long_name": "Day of first thaw in year", "units": "day of year"}
|
||||
yearly["day_of_first_thaw"] = first_thaw_day.sel(time=yearly.time)
|
||||
|
||||
|
|
@ -484,7 +454,7 @@ def derive_yearly_variables(yearly: xr.Dataset) -> xr.Dataset:
|
|||
n_days_in_year - daily.thawing_days[::-1].groupby("time.year").apply(lambda x: x.argmax(dim="time")) + 1
|
||||
)
|
||||
last_thaw_day = last_thaw_day.where(~never_thaws).rename("day_of_last_thaw").rename(year="time")
|
||||
last_thaw_day["time"] = pd.to_datetime([f"{y}-10-01" for y in last_thaw_day.time.values]) # noqa: PD011
|
||||
last_thaw_day["time"] = pd.to_datetime([f"{y}-10-01" for y in last_thaw_day.time.values])
|
||||
last_thaw_day.attrs = {"long_name": "Day of last thaw in year", "units": "day of year"}
|
||||
yearly["day_of_last_thaw"] = last_thaw_day.sel(time=yearly.time)
|
||||
|
||||
|
|
@ -506,7 +476,7 @@ def derive_yearly_variables(yearly: xr.Dataset) -> xr.Dataset:
|
|||
|
||||
first_freezing_day = daily_shifted.freezing_days.groupby("time.year").apply(lambda x: x.argmax(dim="time")) + 1
|
||||
first_freezing_day = first_freezing_day.where(~never_freezes).rename("day_of_first_freeze").rename(year="time")
|
||||
first_freezing_day["time"] = pd.to_datetime([f"{y}-10-01" for y in first_freezing_day.time.values]) # noqa: PD011
|
||||
first_freezing_day["time"] = pd.to_datetime([f"{y}-10-01" for y in first_freezing_day.time.values])
|
||||
first_freezing_day.attrs = {"long_name": "Day of first freeze in year", "units": "day of year"}
|
||||
yearly["day_of_first_freeze"] = first_freezing_day.sel(time=yearly.time)
|
||||
|
||||
|
|
@ -516,7 +486,7 @@ def derive_yearly_variables(yearly: xr.Dataset) -> xr.Dataset:
|
|||
+ 1
|
||||
)
|
||||
last_freezing_day = last_freezing_day.where(~never_freezes).rename("day_of_last_freeze").rename(year="time")
|
||||
last_freezing_day["time"] = pd.to_datetime([f"{y}-10-01" for y in last_freezing_day.time.values]) # noqa: PD011
|
||||
last_freezing_day["time"] = pd.to_datetime([f"{y}-10-01" for y in last_freezing_day.time.values])
|
||||
last_freezing_day.attrs = {"long_name": "Day of last freeze in year", "units": "day of year"}
|
||||
yearly["day_of_last_freeze"] = last_freezing_day.sel(time=yearly.time)
|
||||
|
||||
|
|
@ -528,6 +498,53 @@ def derive_yearly_variables(yearly: xr.Dataset) -> xr.Dataset:
|
|||
return yearly
|
||||
|
||||
|
||||
def yearly_and_seasonal_aggregate():
|
||||
"""Aggregate monthly ERA5 data to yearly resolution with seasonal splits.
|
||||
|
||||
Takes monthly aggregated data and creates yearly aggregates using a shifted
|
||||
calendar (October to September) to better capture Arctic seasonal patterns.
|
||||
Creates separate aggregates for full year, winter (Oct-Apr), and summer
|
||||
(May-Sep) periods.
|
||||
|
||||
The first and last incomplete years are excluded from the analysis.
|
||||
Winter months are defined as months 1-7 in the shifted calendar,
|
||||
and summer months are 8-12.
|
||||
|
||||
The final dataset includes yearly, winter, and summer aggregates for all
|
||||
climate variables, saved to a zarr file.
|
||||
|
||||
"""
|
||||
monthly_store = get_era5_stores("monthly")
|
||||
monthly = xr.open_zarr(monthly_store, consolidated=False).set_coords("spatial_ref")
|
||||
assert "time" in monthly.dims, f"Expected dim 'time' to be in {monthly.dims=}"
|
||||
|
||||
# "Shift" the calendar by slicing the first Jan-Sep and the last Oct-Dec months
|
||||
first_year = monthly.time.dt.year.min().item()
|
||||
last_year = monthly.time.dt.year.max().item()
|
||||
monthly = monthly.sel(time=slice(f"{first_year}-10-01", f"{last_year}-09-30"))
|
||||
|
||||
yearly = multi_monthly_aggregate(monthly, n=12)
|
||||
yearly = derive_yearly_variables(yearly)
|
||||
yearly = yearly.chunk({"time": len(yearly.time), "latitude": 256, "longitude": 1024}) # ~36Mb / chunk for f32
|
||||
yearly_store = get_era5_stores("yearly")
|
||||
print(f"Saving yearly aggregated ERA5 data to {yearly_store}.")
|
||||
yearly.to_zarr(yearly_store, mode="w", encoding=codecs.from_ds(yearly), consolidated=False)
|
||||
|
||||
summer_winter = multi_monthly_aggregate(monthly, n=6)
|
||||
|
||||
summer = summer_winter.sel(time=summer_winter.time.dt.month == 4)
|
||||
summer = summer.chunk({"time": len(summer.time), "latitude": 256, "longitude": 1024}) # ~36Mb / chunk for f32
|
||||
summer_store = get_era5_stores("summer")
|
||||
print(f"Saving summer aggregated ERA5 data to {summer_store}.")
|
||||
summer.to_zarr(summer_store, mode="w", encoding=codecs.from_ds(summer), consolidated=False)
|
||||
|
||||
winter = summer_winter.sel(time=summer_winter.time.dt.month == 10)
|
||||
winter = winter.chunk({"time": len(winter.time), "latitude": 256, "longitude": 1024}) # ~36Mb / chunk for f32
|
||||
winter_store = get_era5_stores("winter")
|
||||
print(f"Saving winter aggregated ERA5 data to {winter_store}.")
|
||||
winter.to_zarr(winter_store, mode="w", encoding=codecs.from_ds(winter), consolidated=False)
|
||||
|
||||
|
||||
@cli.command
|
||||
def enrich(n_workers: int = 10, monthly: bool = True, yearly: bool = True, daily: bool = True):
|
||||
"""Enrich data and pPerform temporal aggregation of ERA5 data using Dask cluster.
|
||||
|
|
@ -591,15 +608,16 @@ def _check_geom(geobox: odc.geo.geobox.GeoBox, geom: odc.geo.Geometry) -> bool:
|
|||
return (roix.stop - roix.start) > 1 and (roiy.stop - roiy.start) > 1
|
||||
|
||||
|
||||
def extract_cell_data(yearly: xr.Dataset, geom: Polygon):
|
||||
"""Extract ERA5 data for a specific grid cell geometry.
|
||||
|
||||
Extracts and spatially averages ERA5 data within the bounds of a grid cell.
|
||||
Handles antimeridian-crossing cells by splitting them appropriately.
|
||||
@stopwatch("Getting corrected geometries", log=False)
|
||||
def _get_corrected_geoms(geom: Polygon, gbox: odc.geo.geobox.GeoBox) -> list[odc.geo.Geometry]:
|
||||
"""Get corrected geometries for antimeridian-crossing polygons.
|
||||
|
||||
Args:
|
||||
yearly (xr.Dataset): Yearly aggregated ERA5 dataset.
|
||||
geom (Polygon): Polygon geometry of the grid cell.
|
||||
geom (Polygon): Input polygon geometry.
|
||||
gbox (odc.geo.geobox.GeoBox): GeoBox for spatial reference.
|
||||
|
||||
Returns:
|
||||
list[odc.geo.Geometry]: List of corrected, georeferenced geometries.
|
||||
|
||||
"""
|
||||
# cell.geometry is a shapely Polygon
|
||||
|
|
@ -608,28 +626,73 @@ def extract_cell_data(yearly: xr.Dataset, geom: Polygon):
|
|||
# Split geometry in case it crossed antimeridian
|
||||
else:
|
||||
geoms = _split_antimeridian_cell(geom)
|
||||
cell_data = []
|
||||
for geom in geoms:
|
||||
geom = odc.geo.Geometry(geom, crs="epsg:4326")
|
||||
if not _check_geom(yearly.odc.geobox, geom):
|
||||
continue
|
||||
cell_data.append(yearly.odc.crop(geom).drop_vars("spatial_ref").mean(["latitude", "longitude"]))
|
||||
if len(cell_data) == 0:
|
||||
|
||||
geoms = [odc.geo.Geometry(g, crs="epsg:4326") for g in geoms]
|
||||
geoms = filter(lambda g: _check_geom(gbox, g), geoms)
|
||||
return geoms
|
||||
|
||||
|
||||
@stopwatch("Extracting cell data", log=False)
|
||||
def extract_cell_data(ds: xr.Dataset, geoms: list[odc.geo.Geometry]) -> xr.Dataset | bool:
|
||||
"""Extract ERA5 data for a specific grid cell geometry.
|
||||
|
||||
Extracts and spatially averages ERA5 data within the bounds of a grid cell.
|
||||
Handles antimeridian-crossing cells by splitting them appropriately.
|
||||
|
||||
Args:
|
||||
ds (xr.Dataset): An ERA5 dataset.
|
||||
geoms (list[odc.geo.Geometry]): List of (valid) geometries of the grid cell.
|
||||
|
||||
"""
|
||||
if len(geoms) == 0:
|
||||
return False
|
||||
elif len(cell_data) == 1:
|
||||
cell_data = cell_data[0]
|
||||
elif len(geoms) == 1:
|
||||
return ds.odc.crop(geoms[0]).drop_vars("spatial_ref").mean(["latitude", "longitude"], skipna=True).compute()
|
||||
else:
|
||||
cell_data = xr.concat(cell_data, dim="part").mean("part")
|
||||
cell_data = cell_data.compute()
|
||||
return cell_data
|
||||
parts = [
|
||||
ds.odc.crop(geom).drop_vars("spatial_ref").mean(["latitude", "longitude"], skipna=True) for geom in geoms
|
||||
]
|
||||
parts = [part for part in parts if part.latitude.size > 0 and part.longitude.size > 0]
|
||||
if len(parts) == 0:
|
||||
raise ValueError("No valid parts found for geometry. This should not happen!")
|
||||
elif len(parts) == 1:
|
||||
return parts[0].compute()
|
||||
else:
|
||||
return xr.concat(parts, dim="part").mean("part", skipna=True).compute()
|
||||
|
||||
|
||||
def _correct_longs(ds: xr.Dataset) -> xr.Dataset:
|
||||
return ds.assign_coords(longitude=(((ds.longitude + 180) % 360) - 180)).sortby("longitude")
|
||||
|
||||
|
||||
@stopwatch("Creating aligned dataset", log=False)
|
||||
def _create_aligned(
|
||||
ds: xr.Dataset, data: dict[str, np.ndarray], grid: Literal["hex", "healpix"], level: int
|
||||
) -> xr.Dataset:
|
||||
cell_ids = grids.get_cell_ids(grid, level)
|
||||
data_vars = {var: (["cell_ids", "time"], values) for var, values in data.items()}
|
||||
aligned = xr.Dataset(
|
||||
data_vars,
|
||||
coords={"cell_ids": cell_ids, "time": ds.time},
|
||||
)
|
||||
gridinfo = {
|
||||
"grid_name": "h3" if grid == "hex" else grid,
|
||||
"level": level,
|
||||
}
|
||||
if grid == "healpix":
|
||||
gridinfo["indexing_scheme"] = "nested"
|
||||
aligned.cell_ids.attrs = gridinfo
|
||||
for var in ds.data_vars:
|
||||
aligned[var].attrs = ds[var].attrs
|
||||
|
||||
aligned = aligned.chunk({"cell_ids": min(len(aligned.cell_ids), 10000), "time": len(aligned.time)})
|
||||
return aligned
|
||||
|
||||
|
||||
@cli.command
|
||||
def spatial_agg(
|
||||
grid: Literal["hex", "healpix"],
|
||||
level: int,
|
||||
agg: Literal["summer", "winter", "yearly"] = "yearly",
|
||||
n_workers: int = 10,
|
||||
):
|
||||
"""Perform spatial aggregation of ERA5 data to grid cells.
|
||||
|
||||
|
|
@ -640,69 +703,73 @@ def spatial_agg(
|
|||
Args:
|
||||
grid ("hex" | "healpix"): Grid type.
|
||||
level (int): Grid resolution level.
|
||||
agg ("summer" | "winter" | "yearly"): Type of aggregation to perform. Defaults to yearly.
|
||||
n_workers (int, optional): Number of parallel workers to use. Defaults to 10.
|
||||
|
||||
"""
|
||||
gridname = f"permafrost_{grid}{level}"
|
||||
agg_grid_path = _get_grid_paths(agg, grid, level)
|
||||
grid_df = gpd.read_parquet(DATA_DIR / f"grids/{gridname}_grid.parquet")
|
||||
# Create an empty zarr array with the right dimensions
|
||||
if agg == "summer":
|
||||
agg_data_path = SUMMER_RAW_PATH
|
||||
elif agg == "winter":
|
||||
agg_data_path = WINTER_RAW_PATH
|
||||
elif agg == "yearly":
|
||||
agg_data_path = YEARLY_RAW_PATH
|
||||
else:
|
||||
raise ValueError(f"Unknown aggregation type: {agg}")
|
||||
agg_raw = (
|
||||
xr.open_zarr(agg_data_path, consolidated=False, decode_timedelta=False)
|
||||
.set_coords("spatial_ref")
|
||||
.drop_vars(["surface", "number", "depthBelowLandLayer"])
|
||||
.load()
|
||||
)
|
||||
assert {"latitude", "longitude", "time"} == set(agg_raw.dims), (
|
||||
f"Expected dims ('latitude', 'longitude', 'time'), got {agg_raw.dims}"
|
||||
)
|
||||
assert agg_raw.odc.crs == "epsg:4326", f"Expected CRS 'epsg:4326', got {agg_raw.odc.crs}"
|
||||
grid_gdf = grids.open(grid, level)
|
||||
# ? Mask out water, since we don't want to aggregate over oceans
|
||||
grid_gdf = watermask.clip_grid(grid_gdf)
|
||||
|
||||
# Convert lons to -180 to 180 instead of 0 to 360
|
||||
agg_raw = agg_raw.assign_coords(longitude=(((agg_raw.longitude + 180) % 360) - 180)).sortby("longitude")
|
||||
summer_unaligned_store = get_era5_stores("summer")
|
||||
winter_unaligned_store = get_era5_stores("winter")
|
||||
yearly_unaligned_store = get_era5_stores("yearly")
|
||||
with stopwatch("Loading summer ERA5 data"):
|
||||
summer_unaligned = xr.open_zarr(summer_unaligned_store, consolidated=False).set_coords("spatial_ref")
|
||||
assert {"latitude", "longitude", "time"} == set(summer_unaligned.dims)
|
||||
assert summer_unaligned.odc.crs == "epsg:4326", f"Expected CRS 'epsg:4326', got {summer_unaligned.odc.crs}"
|
||||
summer_unaligned = _correct_longs(summer_unaligned)
|
||||
with stopwatch("Loading winter ERA5 data"):
|
||||
winter_unaligned = xr.open_zarr(winter_unaligned_store, consolidated=False).set_coords("spatial_ref")
|
||||
assert {"latitude", "longitude", "time"} == set(winter_unaligned.dims)
|
||||
assert winter_unaligned.odc.crs == "epsg:4326", f"Expected CRS 'epsg:4326', got {winter_unaligned.odc.crs}"
|
||||
winter_unaligned = _correct_longs(winter_unaligned)
|
||||
with stopwatch("Loading yearly ERA5 data"):
|
||||
yearly_unaligned = xr.open_zarr(yearly_unaligned_store, consolidated=False).set_coords("spatial_ref")
|
||||
assert {"latitude", "longitude", "time"} == set(yearly_unaligned.dims)
|
||||
assert yearly_unaligned.odc.crs == "epsg:4326", f"Expected CRS 'epsg:4326', got {yearly_unaligned.odc.crs}"
|
||||
yearly_unaligned = _correct_longs(yearly_unaligned)
|
||||
|
||||
# ? Converting cell IDs from hex strings to integers for xdggs compatibility
|
||||
cells = [int(cid, 16) for cid in grid_df.cell_id.to_list()]
|
||||
|
||||
agg_aligned = (
|
||||
xr.zeros_like(agg_raw.isel(latitude=0, longitude=0).drop_vars(["latitude", "longitude"]))
|
||||
.expand_dims({"cell_ids": cells})
|
||||
.chunk({"cell_ids": min(len(grid_df), 10000), "time": len(agg_raw.time)})
|
||||
)
|
||||
|
||||
agg_aligned.cell_ids.attrs = {
|
||||
"grid_name": "h3" if grid == "hex" else grid,
|
||||
"level": level,
|
||||
summer_data = {
|
||||
var: np.full((len(grid_gdf), len(summer_unaligned.time)), np.nan, dtype=np.float32)
|
||||
for var in summer_unaligned.data_vars
|
||||
}
|
||||
if grid == "healpix":
|
||||
agg_aligned.cell_ids.attrs["indexing_scheme"] = "nested"
|
||||
|
||||
from stopuhr import stopwatch
|
||||
|
||||
for _, row in track(
|
||||
grid_df.to_crs("epsg:4326").iterrows(),
|
||||
total=len(grid_df),
|
||||
winter_data = {
|
||||
var: np.full((len(grid_gdf), len(winter_unaligned.time)), np.nan, dtype=np.float32)
|
||||
for var in winter_unaligned.data_vars
|
||||
}
|
||||
yearly_data = {
|
||||
var: np.full((len(grid_gdf), len(yearly_unaligned.time)), np.nan, dtype=np.float32)
|
||||
for var in yearly_unaligned.data_vars
|
||||
}
|
||||
for i, (_, row) in track(
|
||||
enumerate(grid_gdf.to_crs("epsg:4326").iterrows()),
|
||||
total=len(grid_gdf),
|
||||
description="Spatially aggregating ERA5 data...",
|
||||
):
|
||||
cell_id = int(row.cell_id, 16)
|
||||
with stopwatch("Extracting cell data", log=False):
|
||||
cell_data = extract_cell_data(agg_raw, row.geometry)
|
||||
if cell_data is False:
|
||||
print(f"Warning: No data found for cell {cell_id}, skipping.")
|
||||
geoms = _get_corrected_geoms(row.geometry, summer_unaligned.odc.geobox)
|
||||
if len(geoms) == 0:
|
||||
print(f"Warning: No valid geometry for cell {row.cell_id}.")
|
||||
continue
|
||||
with stopwatch("Assigning cell data", log=False):
|
||||
agg_aligned.loc[{"cell_ids": cell_id}] = cell_data
|
||||
|
||||
agg_aligned.to_zarr(agg_grid_path, mode="w", consolidated=False, encoding=create_encoding(agg_aligned))
|
||||
cell_data = extract_cell_data(summer_unaligned, geoms)
|
||||
for var in summer_unaligned.data_vars:
|
||||
summer_data[var][i, :] = cell_data[var].values.astype()
|
||||
cell_data = extract_cell_data(winter_unaligned, geoms)
|
||||
for var in winter_unaligned.data_vars:
|
||||
winter_data[var][i, :] = cell_data[var].values
|
||||
cell_data = extract_cell_data(yearly_unaligned, geoms)
|
||||
for var in yearly_unaligned.data_vars:
|
||||
yearly_data[var][i, :] = cell_data[var].values
|
||||
|
||||
summer = _create_aligned(summer_unaligned, summer_data, grid, level)
|
||||
winter = _create_aligned(winter_unaligned, winter_data, grid, level)
|
||||
yearly = _create_aligned(yearly_unaligned, yearly_data, grid, level)
|
||||
|
||||
summer_store = get_era5_stores("summer", grid, level)
|
||||
winter_store = get_era5_stores("winter", grid, level)
|
||||
yearly_store = get_era5_stores("yearly", grid, level)
|
||||
summer.to_zarr(summer_store, mode="w", consolidated=False, encoding=codecs.from_ds(summer))
|
||||
winter.to_zarr(winter_store, mode="w", consolidated=False, encoding=codecs.from_ds(winter))
|
||||
yearly.to_zarr(yearly_store, mode="w", consolidated=False, encoding=codecs.from_ds(yearly))
|
||||
print("Finished spatial matching.")
|
||||
stopwatch.summary()
|
||||
|
||||
|
|
@ -4,8 +4,6 @@ Author: Tobias Hölzer
|
|||
Date: 09. June 2025
|
||||
"""
|
||||
|
||||
import os
|
||||
from pathlib import Path
|
||||
from typing import Literal
|
||||
|
||||
import cartopy.crs as ccrs
|
||||
|
|
@ -25,14 +23,45 @@ from shapely.ops import transform
|
|||
from stopuhr import stopwatch
|
||||
from xdggs.healpix import HealpixInfo
|
||||
|
||||
from entropice.paths import get_grid_file, get_grid_viz_file, watermask_file
|
||||
|
||||
traceback.install()
|
||||
pretty.install()
|
||||
|
||||
DATA_DIR = Path(os.environ.get("DATA_DIR", "../../data")) / "entropyc-rts"
|
||||
GRIDS_DIR = DATA_DIR / "grids"
|
||||
FIGURES_DIR = DATA_DIR / "figures"
|
||||
GRIDS_DIR.mkdir(parents=True, exist_ok=True)
|
||||
FIGURES_DIR.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
def open(grid: Literal["hex", "healpix"], level: int):
|
||||
"""Open a saved grid from parquet file.
|
||||
|
||||
Args:
|
||||
grid (Literal["hex", "healpix"]): The grid type to use.
|
||||
level (int): The grid level to use.
|
||||
|
||||
Returns:
|
||||
GeoDataFrame: The loaded grid.
|
||||
|
||||
"""
|
||||
gridfile = get_grid_file(grid, level)
|
||||
grid = gpd.read_parquet(gridfile)
|
||||
return grid
|
||||
|
||||
|
||||
def get_cell_ids(grid: Literal["hex", "healpix"], level: int):
|
||||
"""Get the cell IDs of a saved grid.
|
||||
|
||||
Args:
|
||||
grid (Literal["hex", "healpix"]): The grid type to use.
|
||||
level (int): The grid level to use.
|
||||
|
||||
Returns:
|
||||
List[str]: The list of cell IDs.
|
||||
|
||||
"""
|
||||
grid_gdf = open(grid, level)
|
||||
cell_ids = grid_gdf["cell_id"].tolist()
|
||||
if grid == "hex":
|
||||
# ? Converting cell IDs from hex strings to integers for xdggs compatibility
|
||||
cell_ids = [int(cid, 16) for cid in cell_ids]
|
||||
return cell_ids
|
||||
|
||||
|
||||
@stopwatch("Create a global hex grid")
|
||||
|
|
@ -131,7 +160,7 @@ def filter_permafrost_grid(grid: gpd.GeoDataFrame):
|
|||
grid = grid.to_crs("EPSG:3413")
|
||||
|
||||
# Filter out non-land areas (e.g., oceans)
|
||||
water_mask = gpd.read_file(DATA_DIR / "simplified-water-polygons-split-3857/simplified_water_polygons.shp")
|
||||
water_mask = gpd.read_file(watermask_file)
|
||||
water_mask = water_mask.to_crs("EPSG:3413")
|
||||
|
||||
ov = gpd.overlay(grid, water_mask, how="intersection")
|
||||
|
|
@ -226,14 +255,14 @@ def cli(grid: Literal["hex", "healpix"], level: int):
|
|||
print("No valid grid cells found.")
|
||||
return
|
||||
|
||||
grid_file = GRIDS_DIR / f"permafrost_{grid}{level}_grid.parquet"
|
||||
grid_file = get_grid_file(grid, level)
|
||||
grid_gdf.to_parquet(grid_file)
|
||||
print(f"Saved to {grid_file.resolve()}")
|
||||
print(f"Saved to {grid_file}")
|
||||
|
||||
fig = vizualize_grid(grid_gdf, grid, level)
|
||||
fig_file = FIGURES_DIR / f"permafrost_{grid}{level}_grid.png"
|
||||
fig_file = get_grid_viz_file(grid, level)
|
||||
fig.savefig(fig_file, dpi=300)
|
||||
print(f"Saved figure to {fig_file.resolve()}")
|
||||
print(f"Saved figure to {fig_file}")
|
||||
plt.close(fig)
|
||||
|
||||
|
||||
75
src/entropice/paths.py
Normal file
75
src/entropice/paths.py
Normal file
|
|
@ -0,0 +1,75 @@
|
|||
# ruff: noqa: D103
|
||||
"""Paths for entropice data storage."""
|
||||
|
||||
import os
|
||||
from pathlib import Path
|
||||
from typing import Literal
|
||||
|
||||
DATA_DIR = Path(os.environ.get("DATA_DIR", "data")).resolve() / "entropice"
|
||||
|
||||
GRIDS_DIR = DATA_DIR / "grids"
|
||||
FIGURES_DIR = DATA_DIR / "figures"
|
||||
DARTS_DIR = DATA_DIR / "darts"
|
||||
ERA5_DIR = DATA_DIR / "era5"
|
||||
EMBEDDINGS_DIR = DATA_DIR / "embeddings"
|
||||
WATERMASK_DIR = DATA_DIR / "watermask"
|
||||
|
||||
GRIDS_DIR.mkdir(parents=True, exist_ok=True)
|
||||
FIGURES_DIR.mkdir(parents=True, exist_ok=True)
|
||||
DARTS_DIR.mkdir(parents=True, exist_ok=True)
|
||||
ERA5_DIR.mkdir(parents=True, exist_ok=True)
|
||||
EMBEDDINGS_DIR.mkdir(parents=True, exist_ok=True)
|
||||
WATERMASK_DIR.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
|
||||
watermask_file = DATA_DIR / "simplified_water_polygons.shp"
|
||||
|
||||
dartsl2_file = DARTS_DIR / "DARTS_NitzeEtAl_v1-2_features_2018-2023_level2.parquet"
|
||||
dartsl2_cov_file = DARTS_DIR / "DARTS_NitzeEtAl_v1-2_coverage_2018-2023_level2.parquet"
|
||||
|
||||
|
||||
def _get_gridname(grid: Literal["hex", "healpix"], level: int) -> str:
|
||||
return f"permafrost_{grid}{level}"
|
||||
|
||||
|
||||
def get_grid_file(grid: Literal["hex", "healpix"], level: int) -> Path:
|
||||
gridname = _get_gridname(grid, level)
|
||||
gridfile = GRIDS_DIR / f"{gridname}_grid.parquet"
|
||||
return gridfile
|
||||
|
||||
|
||||
def get_grid_viz_file(grid: Literal["hex", "healpix"], level: int) -> Path:
|
||||
gridname = _get_gridname(grid, level)
|
||||
vizfile = FIGURES_DIR / f"{gridname}_grid.png"
|
||||
return vizfile
|
||||
|
||||
|
||||
def get_darts_rts_file(grid: Literal["hex", "healpix"], level: int) -> Path:
|
||||
gridname = _get_gridname(grid, level)
|
||||
rtsfile = DARTS_DIR / f"{gridname}_rts.parquet"
|
||||
return rtsfile
|
||||
|
||||
|
||||
def get_annual_embeddings_file(grid: Literal["hex", "healpix"], level: int, year: int) -> Path:
|
||||
gridname = _get_gridname(grid, level)
|
||||
embfile = EMBEDDINGS_DIR / f"{gridname}_embeddings-{year}.parquet"
|
||||
return embfile
|
||||
|
||||
|
||||
def get_embeddings_store(grid: Literal["hex", "healpix"], level: int) -> Path:
|
||||
gridname = _get_gridname(grid, level)
|
||||
embstore = EMBEDDINGS_DIR / f"{gridname}_embeddings.zarr"
|
||||
return embstore
|
||||
|
||||
|
||||
def get_era5_stores(
|
||||
agg: Literal["daily", "monthly", "summer", "winter", "yearly"],
|
||||
grid: Literal["hex", "healpix"] | None = None,
|
||||
level: int | None = None,
|
||||
):
|
||||
if grid is None or level is None:
|
||||
return ERA5_DIR / f"{agg}_climate_unaligned.zarr"
|
||||
|
||||
gridname = _get_gridname(grid, level)
|
||||
aligned_path = ERA5_DIR / f"{gridname}_{agg}_climate.zarr"
|
||||
return aligned_path
|
||||
32
src/entropice/watermask.py
Normal file
32
src/entropice/watermask.py
Normal file
|
|
@ -0,0 +1,32 @@
|
|||
"""Helpers for the watermask."""
|
||||
|
||||
import geopandas as gpd
|
||||
|
||||
from entropice.paths import watermask_file
|
||||
|
||||
|
||||
def open():
|
||||
"""Open the watermask shapefile.
|
||||
|
||||
Returns:
|
||||
GeoDataFrame: The watermask polygons.
|
||||
|
||||
"""
|
||||
watermask = gpd.read_file(watermask_file)
|
||||
return watermask
|
||||
|
||||
|
||||
def clip_grid(gdf: gpd.GeoDataFrame) -> gpd.GeoDataFrame:
|
||||
"""Clip the input GeoDataFrame with the watermask.
|
||||
|
||||
Args:
|
||||
gdf (gpd.GeoDataFrame): The input GeoDataFrame to clip.
|
||||
|
||||
Returns:
|
||||
gpd.GeoDataFrame: The clipped GeoDataFrame.
|
||||
|
||||
"""
|
||||
watermask = open()
|
||||
watermask = watermask.to_crs("EPSG:3413")
|
||||
gdf = gdf.overlay(watermask, how="difference")
|
||||
return gdf
|
||||
|
|
@ -1,66 +0,0 @@
|
|||
"""Labels of Retrogressive-Thaw-Slumps (RTS).
|
||||
|
||||
Assumes that the level 1 and level 2 DARTS features have been downloaded into $DATA_DIR / entropyc-rts / darts: https://arcticdata.io/catalog/view/doi:10.18739/A22B8VD7C
|
||||
|
||||
Author: Tobias Hölzer
|
||||
Date: October 2025
|
||||
"""
|
||||
|
||||
import os
|
||||
from pathlib import Path
|
||||
from typing import Literal
|
||||
|
||||
import cyclopts
|
||||
import geopandas as gpd
|
||||
from rich.progress import track
|
||||
|
||||
DATA_DIR = Path(os.environ.get("DATA_DIR", "../../data")) / "entropyc-rts"
|
||||
LEVEL2_PATH = DATA_DIR / "darts" / "DARTS_NitzeEtAl_v1-2_features_2018-2023_level2.parquet"
|
||||
LEVEL2_COV_PATH = DATA_DIR / "darts" / "DARTS_NitzeEtAl_v1-2_coverage_2018-2023_level2.parquet"
|
||||
|
||||
|
||||
def extract_darts_rts(grid: Literal["hex", "healpix"], level: int):
|
||||
"""Extract RTS labels from DARTS dataset.
|
||||
|
||||
Args:
|
||||
grid (Literal["hex", "healpix"]): The grid type to use.
|
||||
level (int): The grid level to use.
|
||||
|
||||
"""
|
||||
darts_l2 = gpd.read_parquet(LEVEL2_PATH)
|
||||
darts_cov_l2 = gpd.read_parquet(LEVEL2_COV_PATH)
|
||||
|
||||
gridname = f"permafrost_{grid}{level}"
|
||||
grid = gpd.read_parquet(DATA_DIR / f"grids/{gridname}_grid.parquet")
|
||||
|
||||
grid_l2 = grid.overlay(darts_l2.to_crs(grid.crs), how="intersection")
|
||||
grid_cov_l2 = grid.overlay(darts_cov_l2.to_crs(grid.crs), how="intersection")
|
||||
|
||||
years = list(grid_cov_l2["year"].unique())
|
||||
for year in track(years, total=len(years), description="Processing years..."):
|
||||
subset = grid_l2[grid_l2["year"] == year]
|
||||
subset_cov = grid_cov_l2[grid_cov_l2["year"] == year]
|
||||
|
||||
counts = subset.groupby("cell_id").size()
|
||||
grid[f"darts_{year}_rts_count"] = grid.cell_id.map(counts)
|
||||
|
||||
areas = subset.groupby("cell_id").apply(lambda x: x.geometry.area.sum(), include_groups=False)
|
||||
grid[f"darts_{year}_rts_area"] = grid.cell_id.map(areas)
|
||||
|
||||
areas_cov = subset_cov.groupby("cell_id").apply(lambda x: x.geometry.area.sum(), include_groups=False)
|
||||
grid[f"darts_{year}_covered_area"] = grid.cell_id.map(areas_cov)
|
||||
grid[f"darts_{year}_coverage"] = grid[f"darts_{year}_covered_area"] / grid.geometry.area
|
||||
|
||||
grid[f"darts_{year}_rts_density"] = grid[f"darts_{year}_rts_area"] / grid[f"darts_{year}_covered_area"]
|
||||
|
||||
output_path = DATA_DIR / f"darts/{gridname}_darts.parquet"
|
||||
grid.to_parquet(output_path)
|
||||
print(f"Saved RTS labels to {output_path}")
|
||||
|
||||
|
||||
def main(): # noqa: D103
|
||||
cyclopts.run(extract_darts_rts)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
File diff suppressed because one or more lines are too long
2
uv.lock
generated
2
uv.lock
generated
|
|
@ -1199,7 +1199,7 @@ wheels = [
|
|||
]
|
||||
|
||||
[[package]]
|
||||
name = "entropic-perma-risk"
|
||||
name = "entropice"
|
||||
version = "0.1.0"
|
||||
source = { editable = "." }
|
||||
dependencies = [
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue