diff --git a/.gitignore b/.gitignore index 7644bf5..6f0e547 100755 --- a/.gitignore +++ b/.gitignore @@ -9,8 +9,9 @@ wheels/ # Virtual environments .venv -# Data +# Data & figures data +figures # Editors .vscode/ diff --git a/pyproject.toml b/pyproject.toml index 8b19e81..ade1bdd 100755 --- a/pyproject.toml +++ b/pyproject.toml @@ -23,6 +23,7 @@ dependencies = [ "geopandas>=1.1.0", "h3>=4.2.2", "h5netcdf>=1.6.4", + "ipycytoscape>=1.3.3", "ipykernel>=6.29.5", "ipywidgets>=8.1.7", "mapclassify>=2.9.0", @@ -34,6 +35,7 @@ dependencies = [ "odc-geo[all]>=0.4.10", "opt-einsum>=3.4.0", "pyarrow>=20.0.0", + "rechunker>=0.5.2", "requests>=2.32.3", "rich>=14.0.0", "rioxarray>=0.19.0", @@ -41,6 +43,7 @@ dependencies = [ "seaborn>=0.13.2", "smart-geocubes[gee,dask,stac,viz]>=0.0.9", "stopuhr>=0.0.10", + "ultraplot>=1.63.0", "xanimate", "xarray>=2025.9.0", "xdggs>=0.2.1", diff --git a/src/entropice/codecs.py b/src/entropice/codecs.py index 8a0fa44..b8db1e6 100644 --- a/src/entropice/codecs.py +++ b/src/entropice/codecs.py @@ -4,7 +4,9 @@ import xarray as xr from zarr.codecs import BloscCodec -def from_ds(ds: xr.Dataset, store_floats_as_float32: bool = True, include_coords: bool = True) -> dict: +def from_ds( + ds: xr.Dataset, store_floats_as_float32: bool = True, include_coords: bool = True, filter_existing: bool = True +) -> dict: """Create compression encoding for zarr dataset storage. Creates Blosc compression configuration for all data variables and coordinates @@ -17,12 +19,17 @@ def from_ds(ds: xr.Dataset, store_floats_as_float32: bool = True, include_coords include_coords (bool, optional): Whether to include coordinates in the encoding. This is useful when appending to an existing store. Defaults to True. + filter_existing (bool, optional): Whether to filter out variables that already have + encoding defined. Useful when appending to existing stores. Defaults to True. Returns: dict: Encoding dictionary with compression settings for each variable. 
""" var_names = [*ds.data_vars, *ds.coords] if include_coords else ds.data_vars + # Filter out variables which already have encoding (e.g., when appending to existing store) + if filter_existing: + var_names = [var for var in var_names if not ds[var].encoding] encoding = {var: {"compressors": BloscCodec(cname="zstd", clevel=5)} for var in var_names} if store_floats_as_float32: for var in ds.data_vars: diff --git a/src/entropice/era5.py b/src/entropice/era5.py index b941ec7..dc69859 100644 --- a/src/entropice/era5.py +++ b/src/entropice/era5.py @@ -1,4 +1,4 @@ -# ruff: noqa: PD011 +# ruff: noqa: PD003, PD011 """Download and preprocess ERA5 data. Variables of Interest: @@ -85,14 +85,16 @@ import odc.geo.xr import pandas as pd import shapely import shapely.ops +import ultraplot as uplt import xarray as xr +import xdggs from rich import pretty, print, traceback from rich.progress import track from shapely.geometry import LineString, Polygon from stopuhr import stopwatch from entropice import codecs, grids, watermask -from entropice.paths import get_era5_stores +from entropice.paths import FIGURES_DIR, get_era5_stores traceback.install(show_locals=True, suppress=[cyclopts, xr, pd, cProfile]) pretty.install() @@ -207,6 +209,7 @@ def download_daily_aggregated(): daily_raw = daily_raw.odc.assign_crs("epsg:4326") daily_raw = daily_raw.drop_vars(["surface", "number", "depthBelowLandLayer"]) + daily_raw.attrs = era5.attrs daily_store = get_era5_stores("daily") print(f"Saving downloaded and daily aggregated ERA5 data to {daily_store}.") daily_raw.to_zarr(daily_store, mode="w", encoding=codecs.from_ds(daily_raw), consolidated=False) @@ -253,6 +256,8 @@ def daily_enrich(): daily_store = get_era5_stores("daily") daily = xr.open_zarr(daily_store, consolidated=False).set_coords("spatial_ref") assert "time" in daily.dims, f"Expected dim 'time' to be in {daily.dims=}" + # Keep track of stored variables for re-calculation and encodings + stored_vars = list(daily.data_vars) # For 
better dask performance, all variables are written immediately after calculation # The smart scheduling which could performantly handle more is according to docs not yet implemented. @@ -260,10 +265,11 @@ def daily_enrich(): def _store(v: str): nonlocal daily - encoding = codecs.from_ds(daily[[v]], include_coords=False) + encoding = codecs.from_ds(daily[[v]], include_coords=False) if v not in stored_vars else None print(f"Storing enriched daily variable {v} to {daily_store}...") with stopwatch("Storing enriched daily variable"): daily[[v]].to_zarr(daily_store, mode="a", encoding=encoding, consolidated=False) + stored_vars.append(v) daily = xr.open_zarr(daily_store, consolidated=False).set_coords("spatial_ref") # Formulas based on Groeke et. al. (2025) Stochastic Weather generation... @@ -275,9 +281,10 @@ def daily_enrich(): daily.t2m_range.attrs = {"long_name": "Daily range of 2 metre temperature", "units": "K"} _store("t2m_range") - daily["t2m_skew"] = (daily.t2m_mean - daily.t2m_min) / daily.t2m_range - daily.t2m_skew.attrs = {"long_name": "Daily skewness of 2 metre temperature"} - _store("t2m_skew") + with np.errstate(invalid="ignore"): + daily["t2m_skew"] = (daily.t2m_mean - daily.t2m_min) / daily.t2m_range + daily.t2m_skew.attrs = {"long_name": "Daily skewness of 2 metre temperature"} + _store("t2m_skew") daily["thawing_degree_days"] = (daily.t2m_avg - 273.15).clip(min=0) daily.thawing_degree_days.attrs = {"long_name": "Thawing degree days", "units": "K"} @@ -287,19 +294,19 @@ def daily_enrich(): daily.freezing_degree_days.attrs = {"long_name": "Freezing degree days", "units": "K"} _store("freezing_degree_days") - daily["thawing_days"] = (daily.t2m_avg > 273.15).astype(int) + daily["thawing_days"] = (daily.t2m_avg > 273.15).where(~daily.t2m_avg.isnull()) daily.thawing_days.attrs = {"long_name": "Thawing days"} _store("thawing_days") - daily["freezing_days"] = (daily.t2m_avg < 273.15).astype(int) + daily["freezing_days"] = (daily.t2m_avg < 
273.15).where(~daily.t2m_avg.isnull()) daily.freezing_days.attrs = {"long_name": "Freezing days"} _store("freezing_days") - daily["precipitation_occurrences"] = (daily.tp > 0.001).astype(int) + daily["precipitation_occurrences"] = (daily.tp > 0.001).where(~daily.tp.isnull()) daily.precipitation_occurrences.attrs = {"long_name": "Precipitation occurrences"} _store("precipitation_occurrences") - daily["snowfall_occurrences"] = (daily.sf > 0.001).astype(int) + daily["snowfall_occurrences"] = (daily.sf > 0.001).where(~daily.sf.isnull()) daily.snowfall_occurrences.attrs = {"long_name": "Snowfall occurrences"} _store("snowfall_occurrences") @@ -308,6 +315,14 @@ def daily_enrich(): _store("naive_snow_isolation") +def _load_daily(): + daily_store = get_era5_stores("daily") + daily = xr.open_zarr(daily_store, consolidated=False, chunks={"latitude": -1, "longitude": 640}) + daily = daily.set_coords("spatial_ref") + assert "time" in daily.dims, f"Expected dim 'time' to be in {daily.dims=}" + return daily + + def monthly_aggregate(): """Aggregate enriched daily ERA5 data to monthly resolution. @@ -319,230 +334,206 @@ def monthly_aggregate(): The aggregated monthly data is saved to a zarr file for further processing. 
""" - daily_store = get_era5_stores("daily") - daily = xr.open_zarr(daily_store, consolidated=False).set_coords("spatial_ref") - assert "time" in daily.dims, f"Expected dim 'time' to be in {daily.dims=}" + daily = _load_daily() daily = daily.sel(time=slice(min_time, max_time)) - # Monthly instant aggregates - monthly_instants = [] - for var in instants: - if var.endswith("_min"): - agg = daily[var].resample(time="1ME").min().rename(var) - agg.attrs = daily[var].attrs - agg.attrs["long_name"] = f"Monthly minimum of {daily[var].attrs.get('long_name', var)}" - monthly_instants.append(agg) - elif var.endswith("_max"): - agg = daily[var].resample(time="1ME").max().rename(var) - agg.attrs = daily[var].attrs - agg.attrs["long_name"] = f"Monthly maximum of {daily[var].attrs.get('long_name', var)}" - monthly_instants.append(agg) - else: - agg = daily[var].resample(time="1ME").median().rename(var) - agg.attrs = daily[var].attrs - agg.attrs["long_name"] = f"Monthly median of {daily[var].attrs.get('long_name', var)}" - monthly_instants.append(agg) + monthly = ( + xr.Dataset( + {}, + coords={ + "time": daily.time.resample(time="1MS").last(), + "latitude": daily.latitude, + "longitude": daily.longitude, + }, + attrs=daily.attrs, + ) + .odc.assign_crs("epsg:4326") + .chunk({"time": 12, "latitude": -1, "longitude": 640}) + ) - monthly_accums = [] - for var in accums: - agg = daily[var].resample(time="1ME").sum().rename(var) - agg.attrs = daily[var].attrs - monthly_accums.append(agg) - - monthly = xr.merge(monthly_instants + monthly_accums) - monthly = monthly.chunk({"time": len(monthly.time), "latitude": 256, "longitude": 256}) # ~ 100Mb / chunk for f32 monthly_store = get_era5_stores("monthly") - print(f"Saving monthly aggregated ERA5 data to {monthly_store}.") + print(f"Saving empty monthly ERA5 data to {monthly_store}.") monthly.to_zarr(monthly_store, mode="w", encoding=codecs.from_ds(monthly), consolidated=False) + # Monthly instant aggregates + for var in instants | accums: + 
if var in accums: + monthly[var] = daily[var].resample(time="1MS").sum(skipna=False).rename(var) + elif var.endswith("_min"): + monthly[var] = daily[var].resample(time="1MS").min().rename(var) + elif var.endswith("_max"): + monthly[var] = daily[var].resample(time="1MS").max().rename(var) + else: + monthly[var] = daily[var].resample(time="1MS").median().rename(var) -def multi_monthly_aggregate(monthly: xr.Dataset, n: int = 12) -> xr.Dataset: + monthly[var].attrs = daily[var].attrs + if var in accums or var.endswith(("_min", "_max")): + monthly[var].attrs["long_name"] = monthly[var].attrs.get("long_name", "").replace("Daily", "Monthly") + else: + monthly[var].attrs["long_name"] = f"Monthly median of {monthly[var].attrs.get('long_name', var)}" + + # Similar storing logic as in daily + encoding = codecs.from_ds(monthly[[var]], include_coords=False) + print(f"Storing enriched monthly variable {var} to {monthly_store}...") + with stopwatch("Storing enriched monthly variable"): + monthly[[var]].chunk({"time": 12, "latitude": -1, "longitude": 640}).to_zarr( + monthly_store, mode="a", encoding=encoding, consolidated=False + ) + monthly = xr.open_zarr(monthly_store, consolidated=False).set_coords("spatial_ref") + + +def multi_monthly_aggregate(agg: Literal["yearly", "seasonal", "shoulder"] = "yearly"): """Aggregate monthly ERA5 data to a multi-month resolution. Takes monthly aggregated data and creates multi-month aggregates using a shifted calendar (October to September) to better capture Arctic seasonal patterns. Args: - monthly (xr.Dataset): The monthly aggregates - n (int, optional): Number of months to aggregate over. Defaults to 12. + agg (Literal["yearly", "seasonal", "shoulder"], optional): The type of multi-month aggregation. 
- Returns: - xr.Dataset: The aggregated dataset """ - # Instants - multimonthly_instants = [] - for var in instants: - if var.endswith("_min"): - agg = monthly[var].resample(time=f"{n}MS", label="right").min().rename(var) - agg.attrs = monthly[var].attrs - agg.attrs["long_name"] = f"{n}-Monthly minimum of {monthly[var].attrs.get('long_name', var)}" - multimonthly_instants.append(agg) + n = {"yearly": 12, "seasonal": 6, "shoulder": 3}[agg] + daily = _load_daily() + + # "Shift" the calendar by slicing the first Jan-Sep and the last Oct-Dec months + first_year = daily.time.dt.year.min().item() + last_year = daily.time.dt.year.max().item() + daily = daily.sel(time=slice(f"{first_year}-10-01", f"{last_year}-09-30")) + + chunks = {"time": 3, "latitude": -1, "longitude": 640} + multimonthly = ( + xr.Dataset( + {}, + coords={ + "time": daily.resample(time=f"{n}MS").last().time, + "latitude": daily.latitude, + "longitude": daily.longitude, + }, + attrs=daily.attrs, + ) + .odc.assign_crs("epsg:4326") + .chunk(chunks) + ) + + multimonthly_store = get_era5_stores(agg) + print(f"Saving empty multi-monthly ERA5 data to {multimonthly_store}.") + multimonthly.to_zarr(multimonthly_store, mode="w", encoding=codecs.from_ds(multimonthly), consolidated=False) + + def _store(var): + nonlocal multimonthly + + # Similar storing logic as in daily + encoding = codecs.from_ds(multimonthly[[var]], include_coords=False) + print(f"Storing enriched multi-monthly variable {var} to {multimonthly_store}...") + with stopwatch("Storing enriched multi-monthly variable"): + multimonthly[[var]].chunk(chunks).to_zarr( + multimonthly_store, mode="a", encoding=encoding, consolidated=False + ) + multimonthly = xr.open_zarr(multimonthly_store, consolidated=False).set_coords("spatial_ref") + + # Multi-Monthly instant aggregates + for var in instants | accums: + if var in accums: + multimonthly[var] = daily[var].resample(time=f"{n}MS").sum(skipna=False).rename(var) + elif var.endswith("_min"): + 
multimonthly[var] = daily[var].resample(time=f"{n}MS").min().rename(var) elif var.endswith("_max"): - agg = monthly[var].resample(time=f"{n}MS", label="right").max().rename(var) - agg.attrs = monthly[var].attrs - agg.attrs["long_name"] = f"{n}-Monthly maximum of {monthly[var].attrs.get('long_name', var)}" - multimonthly_instants.append(agg) + multimonthly[var] = daily[var].resample(time=f"{n}MS").max().rename(var) else: - agg = monthly[var].resample(time=f"{n}MS", label="right").median().rename(var) - agg.attrs = monthly[var].attrs - agg.attrs["long_name"] = f"{n}-Monthly median of {monthly[var].attrs.get('long_name', var)}" - multimonthly_instants.append(agg) + multimonthly[var] = daily[var].resample(time=f"{n}MS").median().rename(var) - # Accums - multimonthly_accums = [] - for var in accums: - agg = monthly[var].resample(time=f"{n}MS", label="right").sum().rename(var) - agg.attrs = monthly[var].attrs - multimonthly_accums.append(agg) - - multimonthly = xr.merge(multimonthly_instants + multimonthly_accums) + multimonthly[var].attrs = daily[var].attrs + if var in accums or var.endswith(("_min", "_max")): + multimonthly[var].attrs["long_name"] = ( + multimonthly[var].attrs.get("long_name", "").replace("Daily", agg.capitalize()) + ) + else: + multimonthly[var].attrs["long_name"] = ( + f"{agg.capitalize()} median of {multimonthly[var].attrs.get('long_name', var)}" + ) + _store(var) # Effective snow depth - m = np.resize(np.arange(1, n + 1), len(monthly.time)) - m = xr.DataArray(m, coords={"time": monthly.time}, dims=["time"]) - n_sum = n * (n + 1) // 2 - multimonthly["effective_snow_depth"] = (monthly.sde_mean * (n + 1 - m)).resample(time=f"{n}MS").sum().rename( - "effective_snow_depth" - ) / n_sum + m = ( + daily.time.resample(time=f"{n}MS") + .map(lambda x: np.arange(1, x.count() + 1)) + .assign_coords({"time": daily.time}) + ) + nd = ( + daily.time.resample(time=f"{n}MS") + .map(lambda x: np.ones(x.count().item()) * x.count().item()) + .assign_coords({"time": 
daily.time}) + ) + n_sum = (nd * (nd + 1) // 2).resample(time=f"{n}MS").sum() + efd = (daily.sde_mean * (nd + 1 - m)).resample(time=f"{n}MS").sum(skipna=False) / n_sum + multimonthly["effective_snow_depth"] = efd.rename("effective_snow_depth") multimonthly["effective_snow_depth"].attrs = { "long_name": "Effective Snow Density", "reference": "Slater et. al. (2017)", "link": "https://tc.copernicus.org/articles/11/989/2017/tc-11-989-2017.pdf", } - - multimonthly = multimonthly.chunk( - {"time": len(multimonthly.time), "latitude": 128, "longitude": 1024} - ) # ~36Mb / chunk for f64 - return multimonthly + _store("effective_snow_depth") -def derive_yearly_variables(yearly: xr.Dataset) -> xr.Dataset: - """Derive additional variables from daily data and add them to the yearly dataset. - - Args: - yearly (xr.Dataset): The yearly aggregated dataset to enrich. - - Returns: - xr.Dataset: The enriched yearly dataset with additional derived variables. - - """ - assert "time" in yearly.dims, f"Expected dim 'time' to be in {yearly.dims=}" - daily_store = get_era5_stores("daily") - daily = xr.open_zarr(daily_store, consolidated=False).set_coords("spatial_ref") - assert "time" in daily.dims, f"Expected dim 'time' to be in {daily.dims=}" - daily = daily.sel(time=slice(min_time, max_time)) - # ? Note: The functions do not really account for leap years - # n_days_in_year = daily.time.groupby("time.year").count().rename("n_days_in_year") - n_days_in_year = 365 - - # A mask to check which places never thaws - # Persist in memory because we need it twice and this dramatically reduces the Dask-Graph size - never_thaws = (daily.thawing_days.groupby("time.year").sum(dim="time") == 0).compute() - - # ? First and last thaw day is NOT calculated within the october-september year, but within the calendar year - # This results in a much more correct representation of thawing periods in regions where the last thawing day - # is between october and december. 
- # This assumes that the 01-01 is almost everywhere one of the coldest days in the year - first_thaw_day = daily.thawing_days.groupby("time.year").apply(lambda x: x.argmax(dim="time")) + 1 - first_thaw_day = first_thaw_day.where(~never_thaws).rename("day_of_first_thaw").rename(year="time") - first_thaw_day["time"] = pd.to_datetime([f"{y}-10-01" for y in first_thaw_day.time.values]) - first_thaw_day.attrs = {"long_name": "Day of first thaw in year", "units": "day of year"} - yearly["day_of_first_thaw"] = first_thaw_day.sel(time=yearly.time) - - last_thaw_day = ( - n_days_in_year - daily.thawing_days[::-1].groupby("time.year").apply(lambda x: x.argmax(dim="time")) + 1 - ) - last_thaw_day = last_thaw_day.where(~never_thaws).rename("day_of_last_thaw").rename(year="time") - last_thaw_day["time"] = pd.to_datetime([f"{y}-10-01" for y in last_thaw_day.time.values]) - last_thaw_day.attrs = {"long_name": "Day of last thaw in year", "units": "day of year"} - yearly["day_of_last_thaw"] = last_thaw_day.sel(time=yearly.time) - - yearly["thawing_period_length"] = (yearly.day_of_last_thaw - yearly.day_of_first_thaw).rename( - "thawing_period_length" - ) - yearly.thawing_period_length.attrs = {"long_name": "Thawing period length in year", "units": "days"} - - # ? First and last freeze day is NOT calculated within the october-september year, but within an july-june year - # This results, similar to the thawing days, in a much more correct representation of freezing periods in regions - # where the first freezing day is between july and september. 
- # This assumes that the 01-07 is almost everywhere one of the warmest days in the year - daily_shifted = daily.copy() - daily_shifted["time"] = pd.to_datetime(daily_shifted.time.values) + pd.DateOffset(months=6) - - # A mask to check which places never freeze - # Persist in memory because we need it twice and this dramatically reduces the Dask-Graph size - never_freezes = (daily_shifted.freezing_days.groupby("time.year").sum(dim="time") == 0).compute() - - first_freezing_day = daily_shifted.freezing_days.groupby("time.year").apply(lambda x: x.argmax(dim="time")) + 1 - first_freezing_day = first_freezing_day.where(~never_freezes).rename("day_of_first_freeze").rename(year="time") - first_freezing_day["time"] = pd.to_datetime([f"{y}-10-01" for y in first_freezing_day.time.values]) - first_freezing_day.attrs = {"long_name": "Day of first freeze in year", "units": "day of year"} - yearly["day_of_first_freeze"] = first_freezing_day.sel(time=yearly.time) - - last_freezing_day = ( - n_days_in_year - - daily_shifted.freezing_days[::-1].groupby("time.year").apply(lambda x: x.argmax(dim="time")) - + 1 - ) - last_freezing_day = last_freezing_day.where(~never_freezes).rename("day_of_last_freeze").rename(year="time") - last_freezing_day["time"] = pd.to_datetime([f"{y}-10-01" for y in last_freezing_day.time.values]) - last_freezing_day.attrs = {"long_name": "Day of last freeze in year", "units": "day of year"} - yearly["day_of_last_freeze"] = last_freezing_day.sel(time=yearly.time) - - yearly["freezing_period_length"] = (yearly.day_of_last_freeze - yearly.day_of_first_freeze).rename( - "freezing_period_length" - ) - yearly.freezing_period_length.attrs = {"long_name": "Freezing period length in year", "units": "days"} - - return yearly +def _get_first_day(x): + return x.argmax(dim="time") + 1 -def yearly_and_seasonal_aggregate(): - """Aggregate monthly ERA5 data to yearly resolution with seasonal splits. 
- - Takes monthly aggregated data and creates yearly aggregates using a shifted - calendar (October to September) to better capture Arctic seasonal patterns. - Creates separate aggregates for full year, winter (Oct-Apr), and summer - (May-Sep) periods. - - The first and last incomplete years are excluded from the analysis. - Winter months are defined as months 1-7 in the shifted calendar, - and summer months are 8-12. - - The final dataset includes yearly, winter, and summer aggregates for all - climate variables, saved to a zarr file. - - """ - monthly_store = get_era5_stores("monthly") - monthly = xr.open_zarr(monthly_store, consolidated=False).set_coords("spatial_ref") - assert "time" in monthly.dims, f"Expected dim 'time' to be in {monthly.dims=}" - - # "Shift" the calendar by slicing the first Jan-Sep and the last Oct-Dec months - first_year = monthly.time.dt.year.min().item() - last_year = monthly.time.dt.year.max().item() - monthly = monthly.sel(time=slice(f"{first_year}-10-01", f"{last_year}-09-30")) - - yearly = multi_monthly_aggregate(monthly, n=12) - yearly = derive_yearly_variables(yearly) - yearly = yearly.chunk({"time": len(yearly.time), "latitude": 256, "longitude": 1024}) # ~36Mb / chunk for f32 +def _store_in_yearly(da, var): yearly_store = get_era5_stores("yearly") - print(f"Saving yearly aggregated ERA5 data to {yearly_store}.") - yearly.to_zarr(yearly_store, mode="w", encoding=codecs.from_ds(yearly), consolidated=False) + yearly = xr.open_zarr(yearly_store, consolidated=False).set_coords("spatial_ref") + assert "time" in yearly.dims, f"Expected dim 'time' to be in {yearly.dims=}" - summer_winter = multi_monthly_aggregate(monthly, n=6) + da["time"] = yearly.time + yearly[var] = da.rename(var) - summer = summer_winter.sel(time=summer_winter.time.dt.month == 4) - summer = summer.chunk({"time": len(summer.time), "latitude": 256, "longitude": 1024}) # ~36Mb / chunk for f32 - summer_store = get_era5_stores("summer") - print(f"Saving summer aggregated 
ERA5 data to {summer_store}.") - summer.to_zarr(summer_store, mode="w", encoding=codecs.from_ds(summer), consolidated=False) + # Similar storing logic as in daily + encoding = codecs.from_ds(yearly[[var]], include_coords=False) + chunks = {"time": 20, "latitude": -1, "longitude": 640} + print(f"Storing enriched yearly variable {var} to {yearly_store}...") + with stopwatch("Storing enriched yearly variable"): + yearly[[var]].chunk(chunks).to_zarr(yearly_store, mode="a", encoding=encoding, consolidated=False) - winter = summer_winter.sel(time=summer_winter.time.dt.month == 10) - winter = winter.chunk({"time": len(winter.time), "latitude": 256, "longitude": 1024}) # ~36Mb / chunk for f32 - winter_store = get_era5_stores("winter") - print(f"Saving winter aggregated ERA5 data to {winter_store}.") - winter.to_zarr(winter_store, mode="w", encoding=codecs.from_ds(winter), consolidated=False) + + +def yearly_thaw_periods(): + daily = _load_daily() + first_year = daily.time.dt.year.min().item() + 1 + last_year = daily.time.dt.year.max().item() + daily = daily.sel(time=slice(f"{first_year}-01-01", f"{last_year}-12-31")) + + never_thaws = (daily.thawing_days.resample(time="12MS").sum(dim="time") == 0).persist() + + first_thaw_day = daily.thawing_days.resample(time="12MS").map(_get_first_day).where(~never_thaws) + first_thaw_day.attrs = {"long_name": "Day of first thaw in year", "units": "day of year"} + _store_in_yearly(first_thaw_day, "day_of_first_thaw") + + n_days_in_year = xr.where(daily.time[::-1].dt.is_leap_year, 366, 365).resample(time="12MS").first() + last_thaw_day = n_days_in_year - daily.thawing_days[::-1].resample(time="12MS").map(_get_first_day).where( + ~never_thaws + ) + last_thaw_day.attrs = {"long_name": "Day of last thaw in year", "units": "day of year"} + _store_in_yearly(last_thaw_day, "day_of_last_thaw") + + +def yearly_freeze_periods(): + daily = _load_daily() + first_year = daily.time.dt.year.min().item() + last_year = daily.time.dt.year.max().item() - 1 + 
daily = daily.sel(time=slice(f"{first_year}-07-01", f"{last_year}-06-30")) + + never_freezes = (daily.freezing_days.resample(time="12MS").sum(dim="time") == 0).persist() + + first_freeze_day = daily.freezing_days.resample(time="12MS").map(_get_first_day).where(~never_freezes) + first_freeze_day.attrs = {"long_name": "Day of first freeze in year", "units": "day of year"} + _store_in_yearly(first_freeze_day, "day_of_first_freeze") + + n_days_in_year = xr.where(daily.time[::-1].dt.is_leap_year, 366, 365).resample(time="12MS").last() + last_freeze_day = n_days_in_year - daily.freezing_days[::-1].resample(time="12MS").map(_get_first_day).where( + ~never_freezes + ) + last_freeze_day.attrs = {"long_name": "Day of last freeze in year", "units": "day of year"} + _store_in_yearly(last_freeze_day, "day_of_last_freeze") @@ -571,8 +562,48 @@ def enrich(n_workers: int = 10, monthly: bool = True, yearly: bool = True, daily if monthly: monthly_aggregate() if yearly: - yearly_and_seasonal_aggregate() + multi_monthly_aggregate(agg="shoulder") + multi_monthly_aggregate(agg="seasonal") + multi_monthly_aggregate(agg="yearly") + # yearly_thaw_periods() + # yearly_freeze_periods() print("Enriched ERA5 data with additional features and aggregated it temporally.") + stopwatch.summary() + + +@cli.command +def viz(agg: Literal["daily", "monthly", "yearly", "summer", "winter"]): + """Visualize a small overview of ERA5 variables for a given aggregation. + + Args: + agg (Literal["daily", "monthly", "yearly", "summer", "winter"]): + Aggregation identifier used to locate the appropriate ERA5 Zarr store via + get_era5_stores. Determines which dataset is opened and visualized. 
+ + Example: + >>> # produce and save an overview for monthly ERA5 data + >>> viz("monthly") + + """ + store = get_era5_stores(agg) + ds = xr.open_zarr(store, consolidated=False).set_coords("spatial_ref") + + tis = [0, 1, -2, -1] + ts = [str(ds.time.isel(time=t).values)[:10] for t in tis] + vnames = [ds[var].attrs.get("long_name", var) for var in ds.data_vars] + vunits = [ds[var].attrs.get("units", "") for var in ds.data_vars] + vlabels = [f"{name} [{unit}]" if unit else name for name, unit in zip(vnames, vunits)] + + fig, axs = uplt.subplots(ncols=len(tis), nrows=len(vlabels), proj="npaeqd", share=0) + axs.format(boundinglat=50, coast=True, toplabels=ts, leftlabels=vlabels) + for t in tis: + for i, var in enumerate(ds.data_vars): + subset = ds[var].isel(time=t).load() + m = axs[i, t].pcolormesh(subset, cmap="viridis") + axs[i, t].colorbar(m, loc="ll", label=var) + fig.format(suptitle="ERA5 Data") + (FIGURES_DIR / "era5").mkdir(parents=True, exist_ok=True) + fig.savefig(FIGURES_DIR / "era5" / f"{agg}_overview_unaligned.png") # =========================== @@ -686,6 +717,7 @@ def _create_aligned( aligned[var].attrs = ds[var].attrs aligned = aligned.chunk({"cell_ids": min(len(aligned.cell_ids), 10000), "time": len(aligned.time)}) + aligned = xdggs.decode(aligned) return aligned @@ -708,69 +740,45 @@ def spatial_agg( grid_gdf = grids.open(grid, level) # ? 
Mask out water, since we don't want to aggregate over oceans grid_gdf = watermask.clip_grid(grid_gdf) + grid_gdf = grid_gdf.to_crs("epsg:4326") - summer_unaligned_store = get_era5_stores("summer") - winter_unaligned_store = get_era5_stores("winter") - yearly_unaligned_store = get_era5_stores("yearly") - with stopwatch("Loading summer ERA5 data"): - summer_unaligned = xr.open_zarr(summer_unaligned_store, consolidated=False).set_coords("spatial_ref") - assert {"latitude", "longitude", "time"} == set(summer_unaligned.dims) - assert summer_unaligned.odc.crs == "epsg:4326", f"Expected CRS 'epsg:4326', got {summer_unaligned.odc.crs}" - summer_unaligned = _correct_longs(summer_unaligned) - with stopwatch("Loading winter ERA5 data"): - winter_unaligned = xr.open_zarr(winter_unaligned_store, consolidated=False).set_coords("spatial_ref") - assert {"latitude", "longitude", "time"} == set(winter_unaligned.dims) - assert winter_unaligned.odc.crs == "epsg:4326", f"Expected CRS 'epsg:4326', got {winter_unaligned.odc.crs}" - winter_unaligned = _correct_longs(winter_unaligned) - with stopwatch("Loading yearly ERA5 data"): - yearly_unaligned = xr.open_zarr(yearly_unaligned_store, consolidated=False).set_coords("spatial_ref") - assert {"latitude", "longitude", "time"} == set(yearly_unaligned.dims) - assert yearly_unaligned.odc.crs == "epsg:4326", f"Expected CRS 'epsg:4326', got {yearly_unaligned.odc.crs}" - yearly_unaligned = _correct_longs(yearly_unaligned) + # Precompute the geometries to clip later + daily_store = get_era5_stores("daily") + daily_unaligned = xr.open_zarr(daily_store, consolidated=False).set_coords("spatial_ref") + assert {"latitude", "longitude", "time"} == set(daily_unaligned.dims) + assert daily_unaligned.odc.crs == "epsg:4326", f"Expected CRS 'epsg:4326', got {daily_unaligned.odc.crs}" + daily_unaligned = _correct_longs(daily_unaligned) - summer_data = { - var: np.full((len(grid_gdf), len(summer_unaligned.time)), np.nan, dtype=np.float32) - for var in 
summer_unaligned.data_vars - } - winter_data = { - var: np.full((len(grid_gdf), len(winter_unaligned.time)), np.nan, dtype=np.float32) - for var in winter_unaligned.data_vars - } - yearly_data = { - var: np.full((len(grid_gdf), len(yearly_unaligned.time)), np.nan, dtype=np.float32) - for var in yearly_unaligned.data_vars - } - for i, (_, row) in track( - enumerate(grid_gdf.to_crs("epsg:4326").iterrows()), - total=len(grid_gdf), - description="Spatially aggregating ERA5 data...", - ): - geoms = _get_corrected_geoms(row.geometry, summer_unaligned.odc.geobox) - if len(geoms) == 0: - print(f"Warning: No valid geometry for cell {row.cell_id}.") - continue + cell_geometries = [_get_corrected_geoms(row.geometry, daily_unaligned.odc.geobox) for _, row in grid_gdf.iterrows()] - cell_data = extract_cell_data(summer_unaligned, geoms) - for var in summer_unaligned.data_vars: - summer_data[var][i, :] = cell_data[var].values.astype() - cell_data = extract_cell_data(winter_unaligned, geoms) - for var in winter_unaligned.data_vars: - winter_data[var][i, :] = cell_data[var].values - cell_data = extract_cell_data(yearly_unaligned, geoms) - for var in yearly_unaligned.data_vars: - yearly_data[var][i, :] = cell_data[var].values + for agg in ["summer", "winter", "yearly"]: + unaligned_store = get_era5_stores(agg) + with stopwatch(f"Loading {agg} ERA5 data"): + unaligned = xr.open_zarr(unaligned_store, consolidated=False).set_coords("spatial_ref") + assert {"latitude", "longitude", "time"} == set(unaligned.dims) + assert unaligned.odc.crs == "epsg:4326", f"Expected CRS 'epsg:4326', got {unaligned.odc.crs}" + unaligned = _correct_longs(unaligned) - summer = _create_aligned(summer_unaligned, summer_data, grid, level) - winter = _create_aligned(winter_unaligned, winter_data, grid, level) - yearly = _create_aligned(yearly_unaligned, yearly_data, grid, level) + data = { + var: np.full((len(grid_gdf), len(unaligned.time)), np.nan, dtype=np.float32) for var in unaligned.data_vars + } + for i, 
geoms in track( + enumerate(cell_geometries), + total=len(grid_gdf), + description=f"Spatially aggregating {agg} ERA5 data...", + ): + if len(geoms) == 0: + print(f"Warning: No valid geometry for cell {grid_gdf.iloc[i].cell_id}.") + continue - summer_store = get_era5_stores("summer", grid, level) - winter_store = get_era5_stores("winter", grid, level) - yearly_store = get_era5_stores("yearly", grid, level) - summer.to_zarr(summer_store, mode="w", consolidated=False, encoding=codecs.from_ds(summer)) - winter.to_zarr(winter_store, mode="w", consolidated=False, encoding=codecs.from_ds(winter)) - yearly.to_zarr(yearly_store, mode="w", consolidated=False, encoding=codecs.from_ds(yearly)) - print("Finished spatial matching.") + cell_data = extract_cell_data(unaligned, geoms) + for var in unaligned.data_vars: + data[var][i, :] = cell_data[var].values + + aggregated = _create_aligned(unaligned, data, grid, level) + store = get_era5_stores(agg, grid, level) + aggregated.to_zarr(store, mode="w", consolidated=False, encoding=codecs.from_ds(aggregated)) + print(f"Finished spatial matching for {agg} data.") stopwatch.summary() diff --git a/src/entropice/paths.py b/src/entropice/paths.py index beae350..5c95930 100644 --- a/src/entropice/paths.py +++ b/src/entropice/paths.py @@ -8,7 +8,7 @@ from typing import Literal DATA_DIR = Path(os.environ.get("DATA_DIR", "data")).resolve() / "entropice" GRIDS_DIR = DATA_DIR / "grids" -FIGURES_DIR = DATA_DIR / "figures" +FIGURES_DIR = Path("figures") DARTS_DIR = DATA_DIR / "darts" ERA5_DIR = DATA_DIR / "era5" EMBEDDINGS_DIR = DATA_DIR / "embeddings" @@ -63,12 +63,13 @@ def get_embeddings_store(grid: Literal["hex", "healpix"], level: int) -> Path: def get_era5_stores( - agg: Literal["daily", "monthly", "summer", "winter", "yearly"], + agg: Literal["daily", "monthly", "summer", "winter", "yearly", "seasonal", "shoulder"] = "daily", grid: Literal["hex", "healpix"] | None = None, level: int | None = None, ): if grid is None or level is None: - 
return ERA5_DIR / f"{agg}_climate_unaligned.zarr" + (ERA5_DIR / "intermediate").mkdir(parents=True, exist_ok=True) + return ERA5_DIR / "intermediate" / f"{agg}_climate.zarr" gridname = _get_gridname(grid, level) aligned_path = ERA5_DIR / f"{gridname}_{agg}_climate.zarr" diff --git a/uv.lock b/uv.lock index 3c1ca14..6159e36 100755 --- a/uv.lock +++ b/uv.lock @@ -1022,6 +1022,10 @@ complete = [ { name = "pandas" }, { name = "pyarrow" }, ] +diagnostics = [ + { name = "bokeh" }, + { name = "jinja2" }, +] distributed = [ { name = "distributed" }, ] @@ -1220,6 +1224,7 @@ dependencies = [ { name = "geopandas" }, { name = "h3" }, { name = "h5netcdf" }, + { name = "ipycytoscape" }, { name = "ipykernel" }, { name = "ipywidgets" }, { name = "mapclassify" }, @@ -1231,6 +1236,7 @@ dependencies = [ { name = "odc-geo", extra = ["all"] }, { name = "opt-einsum" }, { name = "pyarrow" }, + { name = "rechunker" }, { name = "requests" }, { name = "rich" }, { name = "rioxarray" }, @@ -1238,6 +1244,7 @@ dependencies = [ { name = "seaborn" }, { name = "smart-geocubes", extra = ["dask", "gee", "stac", "viz"] }, { name = "stopuhr" }, + { name = "ultraplot" }, { name = "xanimate" }, { name = "xarray" }, { name = "xdggs" }, @@ -1264,6 +1271,7 @@ requires-dist = [ { name = "geopandas", specifier = ">=1.1.0" }, { name = "h3", specifier = ">=4.2.2" }, { name = "h5netcdf", specifier = ">=1.6.4" }, + { name = "ipycytoscape", specifier = ">=1.3.3" }, { name = "ipykernel", specifier = ">=6.29.5" }, { name = "ipywidgets", specifier = ">=8.1.7" }, { name = "mapclassify", specifier = ">=2.9.0" }, @@ -1275,6 +1283,7 @@ requires-dist = [ { name = "odc-geo", extras = ["all"], specifier = ">=0.4.10" }, { name = "opt-einsum", specifier = ">=3.4.0" }, { name = "pyarrow", specifier = ">=20.0.0" }, + { name = "rechunker", specifier = ">=0.5.2" }, { name = "requests", specifier = ">=2.32.3" }, { name = "rich", specifier = ">=14.0.0" }, { name = "rioxarray", specifier = ">=0.19.0" }, @@ -1282,6 +1291,7 @@ 
requires-dist = [ { name = "seaborn", specifier = ">=0.13.2" }, { name = "smart-geocubes", extras = ["gee", "dask", "stac", "viz"], specifier = ">=0.0.9" }, { name = "stopuhr", specifier = ">=0.0.10" }, + { name = "ultraplot", specifier = ">=1.63.0" }, { name = "xanimate", git = "https://github.com/davbyr/xAnimate" }, { name = "xarray", specifier = ">=2025.9.0" }, { name = "xdggs", specifier = ">=0.2.1" }, @@ -1970,6 +1980,19 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/cb/bd/b394387b598ed84d8d0fa90611a90bee0adc2021820ad5729f7ced74a8e2/imageio-2.37.0-py3-none-any.whl", hash = "sha256:11efa15b87bc7871b61590326b2d635439acc321cf7f8ce996f812543ce10eed", size = 315796 }, ] +[[package]] +name = "ipycytoscape" +version = "1.3.3" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "ipywidgets" }, + { name = "spectate" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/21/4b/dca529aa566ce225107580c6c8625c7dc5ecb1532f7d73259e2888d2187a/ipycytoscape-1.3.3.tar.gz", hash = "sha256:b6f3199df034f088e92d388e27e629f58ae2901b213cb9299e5b564272f9a2f8", size = 3885550 } +wheels = [ + { url = "https://files.pythonhosted.org/packages/4c/0f/b66d63d4a5426c09005d3713b056e634e00e69788fdc88d1ffe40e5b7654/ipycytoscape-1.3.3-py2.py3-none-any.whl", hash = "sha256:4bc205724971f5f7a3fc2b09dfec20c357c4c6dfa2b4bd41e7c33c995c3f6906", size = 3595634 }, +] + [[package]] name = "ipyevents" version = "2.0.4" @@ -2675,6 +2698,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/93/cf/be4e93afbfa0def2cd6fac9302071db0bd6d0617999ecbf53f92b9398de3/multiurl-0.3.7-py3-none-any.whl", hash = "sha256:054f42974064f103be0ed55b43f0c32fc435a47dc7353a9adaffa643b99fa380", size = 21524 }, ] +[[package]] +name = "mypy-extensions" +version = "1.1.0" +source = { registry = "https://pypi.org/simple" } +sdist = { url = 
"https://files.pythonhosted.org/packages/a2/6e/371856a3fb9d31ca8dac321cda606860fa4548858c0cc45d9d1d4ca2628b/mypy_extensions-1.1.0.tar.gz", hash = "sha256:52e68efc3284861e772bbcd66823fde5ae21fd2fdb51c62a211403730b916558", size = 6343 } +wheels = [ + { url = "https://files.pythonhosted.org/packages/79/7b/2c79738432f5c924bef5071f933bcc9efd0473bac3b4aa584a6f7c1c8df8/mypy_extensions-1.1.0-py3-none-any.whl", hash = "sha256:1be4cccdb0f2482337c4743e60421de3a356cd97508abadd57d47403e94f5505", size = 4963 }, +] + [[package]] name = "narwhals" version = "2.9.0" @@ -3742,6 +3774,20 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/f2/98/7e6d147fd16a10a5f821db6e25f192265d6ecca3d82957a4fdd592cad49c/ratelim-0.1.6-py2.py3-none-any.whl", hash = "sha256:e1a7dd39e6b552b7cc7f52169cd66cdb826a1a30198e355d7016012987c9ad08", size = 4017 }, ] +[[package]] +name = "rechunker" +version = "0.5.2" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "dask", extra = ["array", "diagnostics"] }, + { name = "mypy-extensions" }, + { name = "zarr" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/55/8e/9a76d6762d0db09ab289a836ea423a2feacc2f341885bea61f10312fa245/rechunker-0.5.2.tar.gz", hash = "sha256:18c610cc65854b3627c4d511138a7b962281f8e00838f78148cbf765e1ba2fb2", size = 480044 } +wheels = [ + { url = "https://files.pythonhosted.org/packages/34/83/a485250bc09db55e4b4389d99e583fac871ceeaaa4620b67a31d8db95ef5/rechunker-0.5.2-py3-none-any.whl", hash = "sha256:e09585d69b429ae466470047a2b828f003c123dffe6a265720c0ab0ca78f4937", size = 22088 }, +] + [[package]] name = "referencing" version = "0.37.0" @@ -4159,6 +4205,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/14/a0/bb38d3b76b8cae341dad93a2dd83ab7462e6dbcdd84d43f54ee60a8dc167/soupsieve-2.8-py3-none-any.whl", hash = "sha256:0cc76456a30e20f5d7f2e14a98a4ae2ee4e5abdc7c5ea0aafe795f344bc7984c", size = 36679 }, ] +[[package]] +name = "spectate" +version = "1.0.1" +source = { registry = 
"https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/c8/8d/78dbadaeea943cc0fb9d3cd6b0a4f4668a46f84de1c5507fe3c9f02b8973/spectate-1.0.1.tar.gz", hash = "sha256:49a2dde0962fcecf120cb361cc293989489078eb29ba1d8c3d342a741e898b7e", size = 14573 } +wheels = [ + { url = "https://files.pythonhosted.org/packages/81/ec/8bdccea3ff7d557601183581340c3768b7bb7b1e32c8991f1130f0c1e2c4/spectate-1.0.1-py2.py3-none-any.whl", hash = "sha256:c4585194c238979f953fbf2ecf9f94c84d9d0a929432c7104e39984f52c9e718", size = 11077 }, +] + [[package]] name = "stack-data" version = "0.6.3" @@ -4291,6 +4346,19 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/5c/23/c7abc0ca0a1526a0774eca151daeb8de62ec457e77262b66b359c3c7679e/tzdata-2025.2-py2.py3-none-any.whl", hash = "sha256:1a403fada01ff9221ca8044d701868fa132215d84beb92242d9acd2147f667a8", size = 347839 }, ] +[[package]] +name = "ultraplot" +version = "1.63.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "matplotlib" }, + { name = "numpy" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/c3/f6/64cf6ec8e98d8534b6f4530d2517f00705b25100169a30bfcd27fc4f4e35/ultraplot-1.63.0.tar.gz", hash = "sha256:e3e2fd7029a7c8bf5577709c3c454aab7d156439123ac8ba841602127e2ae163", size = 14783448 } +wheels = [ + { url = "https://files.pythonhosted.org/packages/47/fd/48a7ba597876e97b29b900f9191808ebd29340923627eadfd172295e8425/ultraplot-1.63.0-py3-none-any.whl", hash = "sha256:8e57063a627167bb1c835877c75347d6f1883434709a04a9666a680e42e5546a", size = 13574760 }, +] + [[package]] name = "uritemplate" version = "4.2.0"