Refactor training script

This commit is contained in:
Tobias Hölzer 2026-01-11 19:48:19 +01:00
parent ad5f810f34
commit 2cefe35690
4 changed files with 458 additions and 357 deletions

View file

@ -149,6 +149,36 @@ class SplittedArrays:
train: torch.Tensor | np.ndarray | cp.ndarray
test: torch.Tensor | np.ndarray | cp.ndarray
@cached_property
def combined(self) -> torch.Tensor | np.ndarray | cp.ndarray:
    """Concatenation of the train and test splits along the first axis.

    Both splits must be of the same array kind (torch, cupy, or numpy);
    the result is cached after the first access.
    """
    a, b = self.train, self.test
    # Dispatch on the array kind; guard clauses instead of an if/elif chain.
    if isinstance(a, torch.Tensor) and isinstance(b, torch.Tensor):
        return torch.cat([a, b], dim=0)
    if isinstance(a, cp.ndarray) and isinstance(b, cp.ndarray):
        return cp.concatenate([a, b], axis=0)
    if isinstance(a, np.ndarray) and isinstance(b, np.ndarray):
        return np.concatenate([a, b], axis=0)
    raise TypeError("Incompatible types for train and test arrays.")
def as_numpy(self) -> "SplittedArrays":
    """Return a copy of this split with both arrays converted to numpy arrays."""

    def _to_numpy(arr):
        # Torch tensors may live on a GPU, hence the explicit .cpu() hop;
        # cupy arrays are fetched with .get(); anything else passes through.
        if isinstance(arr, torch.Tensor):
            return arr.cpu().numpy()
        if isinstance(arr, cp.ndarray):
            return arr.get()
        return arr

    return SplittedArrays(train=_to_numpy(self.train), test=_to_numpy(self.test))
@dataclass(frozen=True, eq=False)
class TrainingSet:
@ -185,12 +215,28 @@ class TrainingSet:
intervals.append((category_raw_values.min(), category_raw_values.max()))
return intervals
@cached_property
def target_labels(self) -> list[str]:
"""Labels of the target categories."""
@property
def target_labels(self) -> list[str] | None:
"""Labels of the target categories, None if not categorical."""
binned = self.targets["y"]
assert binned.dtype.name == "category", "Target labels are not categorical."
return list(binned.cat.categories)
if binned.dtype.name == "category":
return list(binned.cat.categories)
else:
return None
@property
def target_codes(self) -> list[int] | None:
"""Label-Codes of the target categories, None if not categorical."""
binned = self.targets["y"]
if binned.dtype.name == "category":
return list(binned.cat.codes)
else:
return None
@property
def feature_names(self) -> list[str]:
"""Names of the features."""
return self.features.columns.tolist()
def __len__(self):
return len(self.z)
@ -538,7 +584,7 @@ class DatasetEnsemble:
"none": No caching.
"read": Read from cache if exists, otherwise create and save to cache.
"overwrite": Always create and save to cache, overwriting existing cache.
Defaults to "none".
Defaults to "read".
Yields:
Generator[pd.DataFrame]: Generator yielding DataFrames with features for inference.
@ -572,7 +618,7 @@ class DatasetEnsemble:
"none": No caching.
"read": Read from cache if exists, otherwise create and save to cache.
"overwrite": Always create and save to cache, overwriting existing cache.
Defaults to "none".
Defaults to "read".
Returns:
pd.DataFrame: The training DataFrame.
@ -604,7 +650,7 @@ class DatasetEnsemble:
"none": No caching.
"read": Read from cache if exists, otherwise create and save to cache.
"overwrite": Always create and save to cache, overwriting existing cache.
Defaults to "none".
Defaults to "read".
Returns:
TrainingSet: The training set.

View file

@ -1,17 +1,17 @@
# ruff: noqa: N806
"""Inference runs on trained models."""
from typing import Literal
import cupy as cp
import geopandas as gpd
import pandas as pd
import torch
from cuml.ensemble import RandomForestClassifier
from entropy import ESPAClassifier
from rich import pretty, traceback
from sklearn import set_config
from xgboost.sklearn import XGBClassifier
from entropice.ml.dataset import DatasetEnsemble
from entropice.ml.models import SupportedModel
traceback.install()
pretty.install()
@ -21,41 +21,40 @@ set_config(array_api_dispatch=True)
def predict_proba(
e: DatasetEnsemble,
clf: RandomForestClassifier | ESPAClassifier | XGBClassifier,
classes: list,
model: SupportedModel,
device: Literal["cpu", "cuda", "torch"] = "cuda",
) -> gpd.GeoDataFrame:
"""Get predicted probabilities for each cell.
Args:
e (DatasetEnsemble): The dataset ensemble configuration.
clf (BaseEstimator): The trained classifier to use for predictions.
classes (list): List of class names.
model: SupportedModel: The trained model to use for predictions.
device (Literal["cpu", "cuda", "torch"]): The device to use for predictions.
This must match with the state of the model!
Returns:
list: A list of predicted probabilities for each cell.
gpd.GeoDataFrame: A GeoDataFrame with cell_id, predicted probability, and geometry.
"""
# Predict in batches to avoid memory issues
batch_size = 10000
batch_size = 50000
preds = []
for batch in e.create_batches(batch_size=batch_size):
cols_to_drop = ["geometry"]
if e.target == "darts_mllabels":
cols_to_drop += [col for col in batch.columns if col.startswith("dartsml_")]
else:
cols_to_drop += [col for col in batch.columns if col.startswith("darts_")]
X_batch = batch.drop(columns=cols_to_drop).dropna()
grid_gdf = e.read_grid()
for batch in e.create_inference_df(batch_size=batch_size):
# Skip empty batches (all rows had NaN values)
if len(X_batch) == 0:
if len(batch) == 0:
continue
cell_ids = X_batch.index.to_numpy()
cell_geoms = batch.loc[X_batch.index, "geometry"].to_numpy()
X_batch = X_batch.to_numpy(dtype="float64")
X_batch = torch.asarray(X_batch, device=0)
batch_preds = clf.predict(X_batch)
cell_ids = batch.index.to_numpy()
cell_geoms = grid_gdf.loc[batch.index, "geometry"].to_numpy()
X_batch = batch.to_numpy(dtype="float64")
if device == "torch":
X_batch = torch.from_numpy(X_batch).to("cuda")
elif device == "cuda":
X_batch = cp.asarray(X_batch)
batch_preds = model.predict(X_batch)
if isinstance(batch_preds, cp.ndarray):
batch_preds = batch_preds.get()
elif torch.is_tensor(batch_preds):
@ -63,9 +62,9 @@ def predict_proba(
batch_preds = gpd.GeoDataFrame(
{
"cell_id": cell_ids,
"predicted_class": [classes[i] for i in batch_preds],
"predicted": batch_preds,
"geometry": cell_geoms,
},
).set_crs(epsg=3413, inplace=False)
).set_crs(grid_gdf.crs, inplace=False)
preds.append(batch_preds)
return gpd.GeoDataFrame(pd.concat(preds, ignore_index=True))

299
src/entropice/ml/models.py Normal file
View file

@ -0,0 +1,299 @@
"""Training helper to create and configure ML models with hyperparameter search."""
from dataclasses import dataclass, field
from typing import TypedDict
import scipy.stats
import xarray as xr
from cuml.ensemble import RandomForestClassifier, RandomForestRegressor
from cuml.neighbors import KNeighborsClassifier, KNeighborsRegressor
from entropy import ESPAClassifier
from scipy.stats._distn_infrastructure import rv_continuous_frozen, rv_discrete_frozen
from xgboost.sklearn import XGBClassifier, XGBRegressor
from entropice.ml.dataset import TrainingSet
from entropice.utils.types import Task
class Distribution(TypedDict):
    """Distribution specification for hyperparameter optimization."""

    distribution: str  # name of a scipy.stats distribution, e.g. "loguniform" or "randint"
    low: float | int  # lower bound passed to the distribution constructor
    high: float | int  # upper bound passed to the distribution constructor
# A hyperparameter search configuration: maps a parameter name to either a
# Distribution spec (sampled) or an explicit list of categorical choices.
type HPConfig = dict[str, Distribution | list]

# Union of every model class this module can create and configure.
type SupportedModel = (
    ESPAClassifier
    | XGBClassifier
    | XGBRegressor
    | RandomForestClassifier
    | RandomForestRegressor
    | KNeighborsClassifier
    | KNeighborsRegressor
)
@dataclass(frozen=True)
class ModelHPOConfig:
    """Model - Hyperparameter Optimization Configuration.

    Bundles an (unfitted) model instance with its hyperparameter search
    configuration and any extra keyword arguments for the ``fit`` call.
    """

    # The (unfitted) model instance to tune.
    model: SupportedModel
    # Search configuration: parameter name -> Distribution spec or list of choices.
    hp_config: HPConfig
    # Extra keyword arguments forwarded to the estimator's fit method.
    fit_params: dict[str, object] = field(default_factory=dict)

    @property
    def search_space(self) -> dict[str, list | rv_continuous_frozen | rv_discrete_frozen]:
        """Convert the HPConfig into a search space dictionary usable by sklearn's RandomizedSearchCV.

        Lists are passed through as categorical choices; Distribution specs are
        turned into frozen scipy distributions.

        Raises:
            ValueError: If a Distribution names an unknown scipy.stats distribution.
        """
        search_space = {}
        for key, dist in self.hp_config.items():
            if isinstance(dist, list):
                search_space[key] = dist
                continue
            # Raise instead of assert: asserts are stripped under `python -O`,
            # and a bad config should fail loudly either way.
            if not hasattr(scipy.stats, dist["distribution"]):
                raise ValueError(f"Unknown distribution type for {key}: {dist['distribution']}")
            distfn = getattr(scipy.stats, dist["distribution"])
            search_space[key] = distfn(dist["low"], dist["high"])
        return search_space
# Hardcode Search Settings for now
# Search space for ESPAClassifier hyperparameters.
espa_hpconfig: HPConfig = {
    "eps_cl": {"distribution": "loguniform", "low": 1e-11, "high": 1e-6},
    "eps_e": {"distribution": "loguniform", "low": 1e4, "high": 1e8},
    "initial_K": {"distribution": "randint", "low": 400, "high": 800},
}
# Search space shared by XGBClassifier and XGBRegressor.
xgboost_hpconfig: HPConfig = {
    "learning_rate": {"distribution": "loguniform", "low": 1e-3, "high": 1e-1},
    "n_estimators": {"distribution": "randint", "low": 50, "high": 2000},
}
# Search space shared by the cuML RandomForest classifier and regressor.
rf_hpconfig: HPConfig = {
    "max_depth": {"distribution": "randint", "low": 5, "high": 50},
    "n_estimators": {"distribution": "randint", "low": 50, "high": 1000},
}
# Search space shared by the cuML KNeighbors classifier and regressor.
knn_hpconfig: HPConfig = {
    "n_neighbors": {"distribution": "randint", "low": 10, "high": 200},
    "weights": ["uniform", "distance"],  # Categorical parameter
}
def get_model_hpo_config(model: str, task: Task, **model_kwargs) -> ModelHPOConfig:
    """Create a model and its hyperparameter optimization configuration based on the specified model type and task.

    Args:
        model (str): The type of model to create. Supported values are "espa", "xgboost", "rf", and "knn".
        task (Task): The task to perform. "binary", "count_regimes" and "density_regimes"
            are treated as classification tasks; any other task value yields a regressor.
            Note that "espa" only supports classification tasks.

    Kwargs:
        model_kwargs: Additional keyword arguments to pass to the model constructor.

    Returns:
        ModelHPOConfig: A dataclass containing the model instance, hyperparameter configuration, and any fit parameters.

    Raises:
        ValueError: If the model/task combination is not supported.
    """
    # Only these three task values are classification problems; everything else is regression.
    model_type = "classifier" if task in ("binary", "count_regimes", "density_regimes") else "regressor"
    match (model, model_type):
        case ("espa", "classifier"):
            clf = ESPAClassifier(20, 0.1, 0.1, robust=True, **model_kwargs)
            # max_iter is a fit-time argument for ESPA, not a constructor argument.
            fit_params = {"max_iter": 300}
            return ModelHPOConfig(clf, espa_hpconfig, fit_params)
        case ("xgboost", "classifier"):
            clf = XGBClassifier(
                objective="multi:softprob" if task != "binary" else "binary:logistic",
                eval_metric="mlogloss" if task != "binary" else "logloss",
                tree_method="hist",
                max_depth=10,
                **model_kwargs,
            )
            return ModelHPOConfig(clf, xgboost_hpconfig)
        case ("xgboost", "regressor"):
            reg = XGBRegressor(
                objective="reg:squarederror",
                eval_metric="rmse",
                tree_method="hist",
                max_depth=10,
                **model_kwargs,
            )
            return ModelHPOConfig(reg, xgboost_hpconfig)
        case ("rf", "classifier"):
            clf = RandomForestClassifier(split_criterion="entropy", **model_kwargs)
            return ModelHPOConfig(clf, rf_hpconfig)
        case ("rf", "regressor"):
            reg = RandomForestRegressor(split_criterion="variance", **model_kwargs)
            return ModelHPOConfig(reg, rf_hpconfig)
        case ("knn", "classifier"):
            clf = KNeighborsClassifier(**model_kwargs)
            return ModelHPOConfig(clf, knn_hpconfig)
        case ("knn", "regressor"):
            reg = KNeighborsRegressor(**model_kwargs)
            return ModelHPOConfig(reg, knn_hpconfig)
        case _:
            raise ValueError(f"Unsupported model/task combination: {model}/{task}")
def extract_espa_feature_importance(model: ESPAClassifier, training_data: TrainingSet) -> xr.Dataset:
    """Extract the inner state of a trained ESPAClassifier as an xarray Dataset.

    Args:
        model (ESPAClassifier): The fitted classifier. Its fitted attributes
            ``K_``, ``S_``, ``Lambda_`` and ``W_`` are read; the latter three are
            moved to host memory via ``.cpu().numpy()`` (assumed torch tensors).
        training_data (TrainingSet): Provides the feature names and target labels
            used as coordinates.

    Returns:
        xr.Dataset: Dataset with ``box_centers`` (feature x box), ``box_assignments``
            (class x box) and ``feature_weights`` (feature).
    """
    # Annotate the state with xarray metadata
    boxes = list(range(model.K_))
    box_centers = xr.DataArray(
        model.S_.cpu().numpy(),
        dims=["feature", "box"],
        coords={"feature": training_data.feature_names, "box": boxes},
        name="box_centers",
        attrs={"description": "Centers of the boxes in feature space."},
    )
    box_assignments = xr.DataArray(
        model.Lambda_.cpu().numpy(),
        dims=["class", "box"],
        coords={"class": training_data.target_labels, "box": boxes},
        name="box_assignments",
        attrs={"description": "Assignments of samples to boxes."},
    )
    feature_weights = xr.DataArray(
        model.W_.cpu().numpy(),
        dims=["feature"],
        coords={"feature": training_data.feature_names},
        name="feature_weights",
        attrs={"description": "Feature weights for each box."},
    )
    state = xr.Dataset(
        {
            "box_centers": box_centers,
            "box_assignments": box_assignments,
            "feature_weights": feature_weights,
        },
        attrs={
            "description": "Inner state of the best ESPAClassifier from RandomizedSearchCV.",
        },
    )
    return state
def extract_xgboost_feature_importance(model: XGBClassifier | XGBRegressor, training_data: TrainingSet) -> xr.Dataset:
    """Extract feature importance from a trained XGBoost model as an xarray Dataset.

    Args:
        model (XGBClassifier | XGBRegressor): The fitted model; its underlying
            booster supplies the importance scores.
        training_data (TrainingSet): Provides the feature names used as coordinates.

    Returns:
        xr.Dataset: One ``feature_importance_<type>`` array per XGBoost importance
            type ("weight", "gain", "cover", "total_gain", "total_cover"), with the
            tree count and objective stored as Dataset attributes.
    """
    # Get the underlying booster
    booster = model.get_booster()
    features = training_data.feature_names

    # Note: get_score() returns dict with keys like 'f0', 'f1', etc. (feature indices)
    def _align_importance(importance_dict: dict) -> list[float]:
        """Align importance dict to the feature list, filling zero-importance features with 0."""
        return [importance_dict.get(f"f{i}", 0.0) for i in range(len(features))]

    # One description per XGBoost importance type; drives the extraction loop below
    # (previously five near-identical copy-pasted DataArray blocks).
    descriptions = {
        "weight": "Number of times a feature is used to split the data across all trees.",
        "gain": "Average gain across all splits the feature is used in.",
        "cover": "Average coverage across all splits the feature is used in.",
        "total_gain": "Total gain across all splits the feature is used in.",
        "total_cover": "Total coverage across all splits the feature is used in.",
    }
    data_vars = {}
    for importance_type, description in descriptions.items():
        scores = booster.get_score(importance_type=importance_type)
        var_name = f"feature_importance_{importance_type}"
        data_vars[var_name] = xr.DataArray(
            _align_importance(scores),
            dims=["feature"],
            coords={"feature": features},
            name=var_name,
            attrs={"description": description},
        )
    state = xr.Dataset(
        data_vars,
        attrs={
            "description": "Inner state of the best XGBClassifier from RandomizedSearchCV.",
            "n_trees": booster.num_boosted_rounds(),
            "objective": str(model.objective),
        },
    )
    return state
def extract_rf_feature_importance(
    model: RandomForestClassifier | RandomForestRegressor, training_data: TrainingSet
) -> xr.Dataset:
    """Extract feature importance from a trained RandomForest model as an xarray Dataset."""
    # cuML's RandomForest does not expose individual trees (sklearn's `estimators_`),
    # so only the impurity-based importances plus the model configuration are stored.
    importance_da = xr.DataArray(
        model.feature_importances_,
        dims=["feature"],
        coords={"feature": training_data.feature_names},
        name="feature_importance",
        attrs={"description": "Gini importance (impurity-based feature importance)."},
    )
    dataset_attrs = {
        "description": "Inner state of the best RandomForestClassifier from RandomizedSearchCV (cuML).",
        "n_estimators": int(model.n_estimators),
        "note": "cuML RandomForest does not expose individual tree statistics like sklearn",
    }
    # Optional attributes are only recorded when they actually carry a value.
    if model.max_depth is not None:
        dataset_attrs["max_depth"] = int(model.max_depth)
    # OOB score only exists when it was requested and the model is fitted.
    if hasattr(model, "oob_score_") and model.oob_score:
        dataset_attrs["oob_score"] = float(model.oob_score_)
    return xr.Dataset({"feature_importance": importance_da}, attrs=dataset_attrs)

View file

@ -4,42 +4,31 @@ import pickle
from dataclasses import asdict, dataclass
from functools import partial
import cupy as cp
import cyclopts
import numpy as np
import pandas as pd
import toml
import torch
import xarray as xr
from cuml.ensemble import RandomForestClassifier
from cuml.neighbors import KNeighborsClassifier
from entropy import ESPAClassifier
from rich import pretty, traceback
from scipy.stats import loguniform, randint
from scipy.stats._distn_infrastructure import rv_continuous_frozen, rv_discrete_frozen
from sklearn import set_config
from sklearn.metrics import (
accuracy_score,
confusion_matrix,
f1_score,
jaccard_score,
precision_score,
recall_score,
)
from sklearn.metrics import accuracy_score, confusion_matrix, f1_score, jaccard_score, precision_score, recall_score
from sklearn.model_selection import KFold, RandomizedSearchCV
from stopuhr import stopwatch
from xgboost.sklearn import XGBClassifier
from entropice.ml.dataset import DatasetEnsemble
from entropice.ml.dataset import DatasetEnsemble, SplittedArrays
from entropice.ml.inference import predict_proba
from entropice.ml.models import (
extract_espa_feature_importance,
extract_rf_feature_importance,
extract_xgboost_feature_importance,
get_model_hpo_config,
)
from entropice.utils.paths import get_cv_results_dir
from entropice.utils.types import Model, Task
traceback.install()
pretty.install()
# Disabled array_api_dispatch to avoid namespace conflicts between NumPy and CuPy
# when using XGBoost with device="cuda"
set_config(array_api_dispatch=True)
cli = cyclopts.App("entropice-training", config=cyclopts.config.Toml("training-config.toml")) # ty:ignore[invalid-argument-type]
@ -82,93 +71,21 @@ _metric_functions = {
@cyclopts.Parameter("*")
@dataclass(frozen=True, kw_only=True)
class CVSettings:
"""Cross-validation settings for model training."""
n_iter: int = 2000
robust: bool = False
task: Task = "binary"
model: Model = "espa"
@dataclass(frozen=True, kw_only=True)
class TrainingSettings(DatasetEnsemble, CVSettings):
"""Helper Wrapper to store combined training and dataset ensemble settings."""
param_grid: dict
cv_splits: int
metrics: list[str]
classes: list[str]
def _create_clf(
settings: CVSettings,
):
if settings.model == "espa":
if settings.task == "binary":
param_grid = {
"eps_cl": loguniform(1e-4, 1e1),
"eps_e": loguniform(1e1, 1e7),
"initial_K": randint(20, 400),
}
else:
param_grid = {
"eps_cl": loguniform(1e-11, 1e-6),
"eps_e": loguniform(1e4, 1e8),
"initial_K": randint(400, 800),
}
clf = ESPAClassifier(20, 0.1, 0.1, random_state=42, robust=settings.robust)
fit_params = {"max_iter": 300}
elif settings.model == "xgboost":
param_grid = {
"learning_rate": loguniform(1e-4, 1e-1),
"max_depth": randint(5, 50),
"n_estimators": randint(50, 1000),
}
clf = XGBClassifier(
objective="multi:softprob" if settings.task != "binary" else "binary:logistic",
eval_metric="mlogloss" if settings.task != "binary" else "logloss",
random_state=42,
tree_method="hist",
device="gpu", # Using CPU to avoid CuPy/NumPy namespace conflicts in sklearn metrics
)
fit_params = {}
elif settings.model == "rf":
param_grid = {
"max_depth": randint(5, 50),
"n_estimators": randint(50, 1000),
}
clf = RandomForestClassifier(random_state=42, split_criterion="entropy")
fit_params = {}
elif settings.model == "knn":
param_grid = {
"n_neighbors": randint(10, 200),
"weights": ["uniform", "distance"],
}
clf = KNeighborsClassifier()
fit_params = {}
else:
raise ValueError(f"Unknown model: {settings.model}")
return clf, param_grid, fit_params
def _serialize_param_grid(param_grid):
param_grid_serializable = {}
for key, dist in param_grid.items():
# ! Hacky, but I can't find a better way to serialize scipy distributions once they are created
if isinstance(dist, rv_continuous_frozen):
param_grid_serializable[key] = {
"distribution": "loguniform",
"low": dist.a,
"high": dist.b,
}
elif isinstance(dist, rv_discrete_frozen):
param_grid_serializable[key] = {
"distribution": "randint",
"low": dist.a,
"high": dist.b,
}
elif isinstance(dist, list):
param_grid_serializable[key] = dist
else:
raise ValueError(f"Unknown distribution type for {key}: {type(dist)}")
return param_grid_serializable
classes: list[str] | None
@cli.default
@ -179,24 +96,24 @@ def random_cv(
"""Perform random cross-validation on the training dataset.
Args:
grid (Grid): The grid type to use.
level (int): The grid level to use.
n_iter (int, optional): Number of parameter settings that are sampled. Defaults to 2000.
robust (bool, optional): Whether to use robust training. Defaults to False.
task (Literal["binary", "count", "density"], optional): The classification task type. Defaults to "binary".
dataset_ensemble (DatasetEnsemble): The dataset ensemble configuration.
settings (CVSettings): The cross-validation settings.
"""
device = "torch" if settings.model in ["espa"] else "cuda"
print("Creating training data...")
training_data = dataset_ensemble.create_cat_training_dataset(task=settings.task, device=device)
# Since we use cuml and xgboost libraries, we can only enable array API for ESPA
use_array_api = settings.model == "espa"
device = "torch" if use_array_api else "cuda"
set_config(array_api_dispatch=use_array_api)
clf, param_grid, fit_params = _create_clf(settings)
print(f"Using model: {settings.model} with parameters: {param_grid}")
print("Creating training data...")
training_data = dataset_ensemble.create_training_set(task=settings.task, device=device)
model_hpo_config = get_model_hpo_config(settings.model, settings.task)
print(f"Using model: {settings.model} with parameters: {model_hpo_config.hp_config}")
cv = KFold(n_splits=5, shuffle=True, random_state=42)
metrics = _metrics["binary" if settings.task == "binary" else "multiclass"]
search = RandomizedSearchCV(
clf,
param_grid,
model_hpo_config.model,
model_hpo_config.search_space,
n_iter=settings.n_iter,
n_jobs=1,
cv=cv,
@ -208,47 +125,27 @@ def random_cv(
print(f"Starting RandomizedSearchCV with {search.n_iter} candidates...")
with stopwatch(f"RandomizedSearchCV fitting for {search.n_iter} candidates"):
y_train = (
training_data.y.train.get()
if settings.model == "xgboost" and isinstance(training_data.y.train, cp.ndarray)
else training_data.y.train
search.fit(
training_data.X.train,
# XGBoost returns its labels as numpy arrays instead of cupy arrays
# Thus, for the scoring to work, we need to convert them back to numpy
training_data.y.as_numpy().train if settings.model == "xgboost" else training_data.y.train,
**model_hpo_config.fit_params,
)
search.fit(training_data.X.train, y_train, **fit_params)
print("Best parameters combination found:")
best_estimator = search.best_estimator_
best_parameters = best_estimator.get_params()
for param_name in sorted(param_grid.keys()):
for param_name in sorted(model_hpo_config.hp_config.keys()):
print(f"{param_name}: {best_parameters[param_name]}")
y_test = (
training_data.y.test.get()
if settings.model == "xgboost" and isinstance(training_data.y.test, cp.ndarray)
else training_data.y.test
test_accuracy = search.score(
training_data.X.test,
training_data.y.as_numpy().test if settings.model == "xgboost" else training_data.y.test,
)
test_accuracy = search.score(training_data.X.test, y_test)
print(f"Accuracy of the best parameters using the inner CV of the random search: {search.best_score_:.3f}")
print(f"Accuracy on test set: {test_accuracy:.3f}")
# Compute predictions on the test set
y_pred = best_estimator.predict(training_data.X.test)
labels = list(range(len(training_data.y.labels)))
y_test = torch.asarray(y_test, device="cuda")
y_pred = torch.asarray(y_pred, device="cuda")
labels = torch.asarray(labels, device="cuda")
test_metrics = {metric: _metric_functions[metric](y_test, y_pred) for metric in metrics}
# Get a confusion matrix
cm = confusion_matrix(y_test, y_pred, labels=labels)
label_names = [training_data.y.labels[i] for i in range(len(training_data.y.labels))]
cm = xr.DataArray(
cm.cpu().numpy(),
dims=["true_label", "predicted_label"],
coords={"true_label": label_names, "predicted_label": label_names},
name="confusion_matrix",
)
results_dir = get_cv_results_dir(
"random_search",
grid=dataset_ensemble.grid,
@ -257,15 +154,13 @@ def random_cv(
)
# Store the search settings
# First convert the param_grid distributions to a serializable format
param_grid_serializable = _serialize_param_grid(param_grid)
combined_settings = TrainingSettings(
**asdict(settings),
**asdict(dataset_ensemble),
param_grid=param_grid_serializable,
param_grid=model_hpo_config.hp_config,
cv_splits=cv.get_n_splits(),
metrics=metrics,
classes=training_data.y.labels,
classes=training_data.target_labels,
)
settings_file = results_dir / "search_settings.toml"
print(f"Storing search settings to {settings_file}")
@ -288,209 +183,71 @@ def random_cv(
print(f"Storing CV results to {results_file}")
results.to_parquet(results_file)
# Store the test metrics
test_metrics_file = results_dir / "test_metrics.toml"
# Compute predictions on the all sets and move them to numpy for metric computations
y_pred = SplittedArrays(
train=best_estimator.predict(training_data.X.train),
test=best_estimator.predict(training_data.X.test),
).as_numpy()
# Compute and store metrics
y = training_data.y.as_numpy()
test_metrics = {metric: _metric_functions[metric](y.test, y_pred.test) for metric in metrics}
train_metrics = {metric: _metric_functions[metric](y.train, y_pred.train) for metric in metrics}
combined_metrics = {metric: _metric_functions[metric](y.combined, y_pred.combined) for metric in metrics}
all_metrics = {
"test_metrics": test_metrics,
"train_metrics": train_metrics,
"combined_metrics": combined_metrics,
}
test_metrics_file = results_dir / "metrics.toml"
print(f"Storing test metrics to {test_metrics_file}")
with open(test_metrics_file, "w") as f:
toml.dump({"test_metrics": test_metrics}, f)
toml.dump(all_metrics, f)
# Store the confusion matrix
cm_file = results_dir / "confusion_matrix.nc"
print(f"Storing confusion matrix to {cm_file}")
cm.to_netcdf(cm_file, engine="h5netcdf")
# Make confusion matrices for classification tasks
if settings.task in ["binary", "count_regimes", "density_regimes"]:
labels = np.array(training_data.target_codes)
cm = xr.Dataset(
{
"test": (("true_label", "predicted_label"), confusion_matrix(y.test, y_pred.test, labels=labels)),
"train": (("true_label", "predicted_label"), confusion_matrix(y.train, y_pred.train, labels=labels)),
"combined": (
("true_label", "predicted_label"),
confusion_matrix(y.combined, y_pred.combined, labels=labels),
),
},
coords={"true_label": training_data.target_labels, "predicted_label": training_data.target_labels},
)
# Store the confusion matrices
cm_file = results_dir / "confusion_matrix.nc"
print(f"Storing confusion matrices to {cm_file}")
cm.to_netcdf(cm_file, engine="h5netcdf")
# Get the inner state of the best estimator
features = training_data.X.data.columns.tolist()
if settings.model == "espa":
# Annotate the state with xarray metadata
boxes = list(range(best_estimator.K_))
box_centers = xr.DataArray(
best_estimator.S_.cpu().numpy(),
dims=["feature", "box"],
coords={"feature": features, "box": boxes},
name="box_centers",
attrs={"description": "Centers of the boxes in feature space."},
)
box_assignments = xr.DataArray(
best_estimator.Lambda_.cpu().numpy(),
dims=["class", "box"],
coords={"class": training_data.y.labels, "box": boxes},
name="box_assignments",
attrs={"description": "Assignments of samples to boxes."},
)
feature_weights = xr.DataArray(
best_estimator.W_.cpu().numpy(),
dims=["feature"],
coords={"feature": features},
name="feature_weights",
attrs={"description": "Feature weights for each box."},
)
state = xr.Dataset(
{
"box_centers": box_centers,
"box_assignments": box_assignments,
"feature_weights": feature_weights,
},
attrs={
"description": "Inner state of the best ESPAClassifier from RandomizedSearchCV.",
},
)
state = extract_espa_feature_importance(best_estimator, training_data)
state_file = results_dir / "best_estimator_state.nc"
print(f"Storing best estimator state to {state_file}")
state.to_netcdf(state_file, engine="h5netcdf")
elif settings.model == "xgboost":
# Extract XGBoost-specific information
# Get the underlying booster
booster = best_estimator.get_booster()
# Feature importance with different importance types
# Note: get_score() returns dict with keys like 'f0', 'f1', etc. (feature indices)
importance_weight = booster.get_score(importance_type="weight")
importance_gain = booster.get_score(importance_type="gain")
importance_cover = booster.get_score(importance_type="cover")
importance_total_gain = booster.get_score(importance_type="total_gain")
importance_total_cover = booster.get_score(importance_type="total_cover")
# Create aligned arrays for all features (including zero-importance)
def align_importance(importance_dict, features):
"""Align importance dict to feature list, filling missing with 0.
XGBoost returns feature indices (f0, f1, ...) as keys, so we need to map them.
"""
return [importance_dict.get(f"f{i}", 0.0) for i in range(len(features))]
feature_importance_weight = xr.DataArray(
align_importance(importance_weight, features),
dims=["feature"],
coords={"feature": features},
name="feature_importance_weight",
attrs={"description": "Number of times a feature is used to split the data across all trees."},
)
feature_importance_gain = xr.DataArray(
align_importance(importance_gain, features),
dims=["feature"],
coords={"feature": features},
name="feature_importance_gain",
attrs={"description": "Average gain across all splits the feature is used in."},
)
feature_importance_cover = xr.DataArray(
align_importance(importance_cover, features),
dims=["feature"],
coords={"feature": features},
name="feature_importance_cover",
attrs={"description": "Average coverage across all splits the feature is used in."},
)
feature_importance_total_gain = xr.DataArray(
align_importance(importance_total_gain, features),
dims=["feature"],
coords={"feature": features},
name="feature_importance_total_gain",
attrs={"description": "Total gain across all splits the feature is used in."},
)
feature_importance_total_cover = xr.DataArray(
align_importance(importance_total_cover, features),
dims=["feature"],
coords={"feature": features},
name="feature_importance_total_cover",
attrs={"description": "Total coverage across all splits the feature is used in."},
)
# Store tree information
n_trees = booster.num_boosted_rounds()
state = xr.Dataset(
{
"feature_importance_weight": feature_importance_weight,
"feature_importance_gain": feature_importance_gain,
"feature_importance_cover": feature_importance_cover,
"feature_importance_total_gain": feature_importance_total_gain,
"feature_importance_total_cover": feature_importance_total_cover,
},
attrs={
"description": "Inner state of the best XGBClassifier from RandomizedSearchCV.",
"n_trees": n_trees,
"objective": str(best_estimator.objective),
},
)
state = extract_xgboost_feature_importance(best_estimator, training_data)
state_file = results_dir / "best_estimator_state.nc"
print(f"Storing best estimator state to {state_file}")
state.to_netcdf(state_file, engine="h5netcdf")
elif settings.model == "rf":
# Extract Random Forest-specific information
# Note: cuML's RandomForestClassifier doesn't expose individual trees (estimators_)
# like sklearn does, so we can only extract feature importances and model parameters
# Feature importances (Gini importance)
feature_importances = best_estimator.feature_importances_
feature_importance = xr.DataArray(
feature_importances,
dims=["feature"],
coords={"feature": features},
name="feature_importance",
attrs={"description": "Gini importance (impurity-based feature importance)."},
)
# cuML RF doesn't expose individual trees, so we store model parameters instead
n_estimators = best_estimator.n_estimators
max_depth = best_estimator.max_depth
# OOB score if available
oob_score = None
if hasattr(best_estimator, "oob_score_") and best_estimator.oob_score:
oob_score = float(best_estimator.oob_score_)
# cuML RandomForest doesn't provide per-tree statistics like sklearn
# Store what we have: feature importances and model configuration
attrs = {
"description": "Inner state of the best RandomForestClassifier from RandomizedSearchCV (cuML).",
"n_estimators": int(n_estimators),
"note": "cuML RandomForest does not expose individual tree statistics like sklearn",
}
# Only add optional attributes if they have values
if max_depth is not None:
attrs["max_depth"] = int(max_depth)
if oob_score is not None:
attrs["oob_score"] = oob_score
state = xr.Dataset(
{
"feature_importance": feature_importance,
},
attrs=attrs,
)
state = extract_rf_feature_importance(best_estimator, training_data)
state_file = results_dir / "best_estimator_state.nc"
print(f"Storing best estimator state to {state_file}")
state.to_netcdf(state_file, engine="h5netcdf")
elif settings.model == "knn":
# KNN doesn't have traditional feature importance
# Store information about the training data and neighbors
n_neighbors = best_estimator.n_neighbors
# We can't extract meaningful "state" from KNN in the same way
# but we can store metadata about the model
state = xr.Dataset(
attrs={
"description": "Metadata of the best KNeighborsClassifier from RandomizedSearchCV.",
"n_neighbors": n_neighbors,
"weights": str(best_estimator.weights),
"algorithm": str(best_estimator.algorithm),
"metric": str(best_estimator.metric),
"n_samples_fit": best_estimator.n_samples_fit_,
},
)
state_file = results_dir / "best_estimator_state.nc"
print(f"Storing best estimator metadata to {state_file}")
state.to_netcdf(state_file, engine="h5netcdf")
# Predict probabilities for all cells
print("Predicting probabilities for all cells...")
preds = predict_proba(dataset_ensemble, clf=best_estimator, classes=training_data.y.labels)
preds = predict_proba(dataset_ensemble, model=best_estimator, device=device)
if training_data.targets["y"].dtype == "category":
preds["predicted"] = preds["predicted"].astype("category")
preds["predicted"].cat.categories = training_data.targets["y"].cat.categories
print(f"Predicted probabilities DataFrame with {len(preds)} entries.")
preds_file = results_dir / "predicted_probabilities.parquet"
print(f"Storing predicted probabilities to {preds_file}")