From 2cefe35690aef580b12b3fc5bd96446f57261f99 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tobias=20H=C3=B6lzer?= Date: Sun, 11 Jan 2026 19:48:19 +0100 Subject: [PATCH] Refactor training script --- src/entropice/ml/dataset.py | 62 +++++- src/entropice/ml/inference.py | 49 ++-- src/entropice/ml/models.py | 299 +++++++++++++++++++++++++ src/entropice/ml/training.py | 405 +++++++--------------------------- 4 files changed, 458 insertions(+), 357 deletions(-) create mode 100644 src/entropice/ml/models.py diff --git a/src/entropice/ml/dataset.py b/src/entropice/ml/dataset.py index 275319a..2495a08 100644 --- a/src/entropice/ml/dataset.py +++ b/src/entropice/ml/dataset.py @@ -149,6 +149,36 @@ class SplittedArrays: train: torch.Tensor | np.ndarray | cp.ndarray test: torch.Tensor | np.ndarray | cp.ndarray + @cached_property + def combined(self) -> torch.Tensor | np.ndarray | cp.ndarray: + """Combined train and test arrays.""" + if isinstance(self.train, torch.Tensor) and isinstance(self.test, torch.Tensor): + return torch.cat([self.train, self.test], dim=0) + elif isinstance(self.train, cp.ndarray) and isinstance(self.test, cp.ndarray): + return cp.concatenate([self.train, self.test], axis=0) + elif isinstance(self.train, np.ndarray) and isinstance(self.test, np.ndarray): + return np.concatenate([self.train, self.test], axis=0) + else: + raise TypeError("Incompatible types for train and test arrays.") + + def as_numpy(self) -> "SplittedArrays": + """Return the arrays as numpy arrays.""" + train_np = ( + self.train.cpu().numpy() + if isinstance(self.train, torch.Tensor) + else self.train.get() + if isinstance(self.train, cp.ndarray) + else self.train + ) + test_np = ( + self.test.cpu().numpy() + if isinstance(self.test, torch.Tensor) + else self.test.get() + if isinstance(self.test, cp.ndarray) + else self.test + ) + return SplittedArrays(train=train_np, test=test_np) + @dataclass(frozen=True, eq=False) class TrainingSet: @@ -185,12 +215,28 @@ class TrainingSet: 
intervals.append((category_raw_values.min(), category_raw_values.max())) return intervals - @cached_property - def target_labels(self) -> list[str]: - """Labels of the target categories.""" + @property + def target_labels(self) -> list[str] | None: + """Labels of the target categories, None if not categorical.""" binned = self.targets["y"] - assert binned.dtype.name == "category", "Target labels are not categorical." - return list(binned.cat.categories) + if binned.dtype.name == "category": + return list(binned.cat.categories) + else: + return None + + @property + def target_codes(self) -> list[int] | None: + """Label-Codes of the target categories, None if not categorical.""" + binned = self.targets["y"] + if binned.dtype.name == "category": + return list(binned.cat.codes) + else: + return None + + @property + def feature_names(self) -> list[str]: + """Names of the features.""" + return self.features.columns.tolist() def __len__(self): return len(self.z) @@ -538,7 +584,7 @@ class DatasetEnsemble: "none": No caching. "read": Read from cache if exists, otherwise create and save to cache. "overwrite": Always create and save to cache, overwriting existing cache. - Defaults to "none". + Defaults to "read". Yields: Generator[pd.DataFrame]: Generator yielding DataFrames with features for inference. @@ -572,7 +618,7 @@ class DatasetEnsemble: "none": No caching. "read": Read from cache if exists, otherwise create and save to cache. "overwrite": Always create and save to cache, overwriting existing cache. - Defaults to "none". + Defaults to "read". Returns: pd.DataFrame: The training DataFrame. @@ -604,7 +650,7 @@ class DatasetEnsemble: "none": No caching. "read": Read from cache if exists, otherwise create and save to cache. "overwrite": Always create and save to cache, overwriting existing cache. - Defaults to "none". + Defaults to "read". Returns: TrainingSet: The training set. 
diff --git a/src/entropice/ml/inference.py b/src/entropice/ml/inference.py index 99708b1..5d75b6d 100644 --- a/src/entropice/ml/inference.py +++ b/src/entropice/ml/inference.py @@ -1,17 +1,17 @@ # ruff: noqa: N806 """Inference runs on trained models.""" +from typing import Literal + import cupy as cp import geopandas as gpd import pandas as pd import torch -from cuml.ensemble import RandomForestClassifier -from entropy import ESPAClassifier from rich import pretty, traceback from sklearn import set_config -from xgboost.sklearn import XGBClassifier from entropice.ml.dataset import DatasetEnsemble +from entropice.ml.models import SupportedModel traceback.install() pretty.install() @@ -21,41 +21,40 @@ set_config(array_api_dispatch=True) def predict_proba( e: DatasetEnsemble, - clf: RandomForestClassifier | ESPAClassifier | XGBClassifier, - classes: list, + model: SupportedModel, + device: Literal["cpu", "cuda", "torch"] = "cuda", ) -> gpd.GeoDataFrame: """Get predicted probabilities for each cell. Args: e (DatasetEnsemble): The dataset ensemble configuration. - clf (BaseEstimator): The trained classifier to use for predictions. - classes (list): List of class names. + model (SupportedModel): The trained model to use for predictions. + device (Literal["cpu", "cuda", "torch"]): The device to use for predictions. + This must match with the state of the model! Returns: - list: A list of predicted probabilities for each cell. + gpd.GeoDataFrame: A GeoDataFrame with cell_id, predicted probability, and geometry. 
""" # Predict in batches to avoid memory issues - batch_size = 10000 + batch_size = 50000 preds = [] - for batch in e.create_batches(batch_size=batch_size): - cols_to_drop = ["geometry"] - if e.target == "darts_mllabels": - cols_to_drop += [col for col in batch.columns if col.startswith("dartsml_")] - else: - cols_to_drop += [col for col in batch.columns if col.startswith("darts_")] - X_batch = batch.drop(columns=cols_to_drop).dropna() - + grid_gdf = e.read_grid() + for batch in e.create_inference_df(batch_size=batch_size): # Skip empty batches (all rows had NaN values) - if len(X_batch) == 0: + if len(batch) == 0: continue - cell_ids = X_batch.index.to_numpy() - cell_geoms = batch.loc[X_batch.index, "geometry"].to_numpy() - X_batch = X_batch.to_numpy(dtype="float64") - X_batch = torch.asarray(X_batch, device=0) - batch_preds = clf.predict(X_batch) + cell_ids = batch.index.to_numpy() + cell_geoms = grid_gdf.loc[batch.index, "geometry"].to_numpy() + + X_batch = batch.to_numpy(dtype="float64") + if device == "torch": + X_batch = torch.from_numpy(X_batch).to("cuda") + elif device == "cuda": + X_batch = cp.asarray(X_batch) + batch_preds = model.predict(X_batch) if isinstance(batch_preds, cp.ndarray): batch_preds = batch_preds.get() elif torch.is_tensor(batch_preds): @@ -63,9 +62,9 @@ def predict_proba( batch_preds = gpd.GeoDataFrame( { "cell_id": cell_ids, - "predicted_class": [classes[i] for i in batch_preds], + "predicted": batch_preds, "geometry": cell_geoms, }, - ).set_crs(epsg=3413, inplace=False) + ).set_crs(grid_gdf.crs, inplace=False) preds.append(batch_preds) return gpd.GeoDataFrame(pd.concat(preds, ignore_index=True)) diff --git a/src/entropice/ml/models.py b/src/entropice/ml/models.py new file mode 100644 index 0000000..f9777b5 --- /dev/null +++ b/src/entropice/ml/models.py @@ -0,0 +1,299 @@ +"""Training helper to create and configure ML models with hyperparameter search.""" + +from dataclasses import dataclass, field +from typing import TypedDict + +import 
scipy.stats +import xarray as xr +from cuml.ensemble import RandomForestClassifier, RandomForestRegressor +from cuml.neighbors import KNeighborsClassifier, KNeighborsRegressor +from entropy import ESPAClassifier +from scipy.stats._distn_infrastructure import rv_continuous_frozen, rv_discrete_frozen +from xgboost.sklearn import XGBClassifier, XGBRegressor + +from entropice.ml.dataset import TrainingSet +from entropice.utils.types import Task + + +class Distribution(TypedDict): + """Distribution specification for hyperparameter optimization.""" + + distribution: str + low: float | int + high: float | int + + +type HPConfig = dict[str, Distribution | list] +type SupportedModel = ( + ESPAClassifier + | XGBClassifier + | XGBRegressor + | RandomForestClassifier + | RandomForestRegressor + | KNeighborsClassifier + | KNeighborsRegressor +) + + +@dataclass(frozen=True) +class ModelHPOConfig: + """Model - Hyperparameter Optimization Configuration.""" + + model: SupportedModel + hp_config: HPConfig + fit_params: dict[str, object] = field(default_factory=dict) + + @property + def search_space(self) -> dict[str, list | rv_continuous_frozen | rv_discrete_frozen]: + """Convert the HPConfig into a search space dictionary usable by sklearn's RandomizedSearchCV.""" + search_space = {} + for key, dist in self.hp_config.items(): + if isinstance(dist, list): + search_space[key] = dist + continue + assert hasattr(scipy.stats, dist["distribution"]), ( + f"Unknown distribution type for {key}: {dist['distribution']}" + ) + distfn = getattr(scipy.stats, dist["distribution"]) + search_space[key] = distfn(dist["low"], dist["high"]) + return search_space + + +# Hardcode Search Settings for now +espa_hpconfig: HPConfig = { + "eps_cl": {"distribution": "loguniform", "low": 1e-11, "high": 1e-6}, + "eps_e": {"distribution": "loguniform", "low": 1e4, "high": 1e8}, + "initial_K": {"distribution": "randint", "low": 400, "high": 800}, +} +xgboost_hpconfig: HPConfig = { + "learning_rate": 
{"distribution": "loguniform", "low": 1e-3, "high": 1e-1}, + "n_estimators": {"distribution": "randint", "low": 50, "high": 2000}, +} +rf_hpconfig: HPConfig = { + "max_depth": {"distribution": "randint", "low": 5, "high": 50}, + "n_estimators": {"distribution": "randint", "low": 50, "high": 1000}, +} +knn_hpconfig: HPConfig = { + "n_neighbors": {"distribution": "randint", "low": 10, "high": 200}, + "weights": ["uniform", "distance"], # Categorical parameter +} + + +def get_model_hpo_config(model: str, task: Task, **model_kwargs) -> ModelHPOConfig: + """Create a model and its hyperparameter optimization configuration based on the specified model type and task. + + Args: + model (str): The type of model to create. Supported values are "espa", "xgboost", "rf", and "knn". + task (Task): The type of task to perform. Supported values are "classifier" and "regressor". + Kwargs: + model_kwargs: Additional keyword arguments to pass to the model constructor. + + Returns: + ModelHPOConfig: A dataclass containing the model instance, hyperparameter configuration, and any fit parameters. 
+ + """ + model_type = "classifier" if task in ("binary", "count_regimes", "density_regimes") else "regressor" + match (model, model_type): + case ("espa", "classifier"): + clf = ESPAClassifier(20, 0.1, 0.1, robust=True, **model_kwargs) + fit_params = {"max_iter": 300} + return ModelHPOConfig(clf, espa_hpconfig, fit_params) + case ("xgboost", "classifier"): + clf = XGBClassifier( + objective="multi:softprob" if task != "binary" else "binary:logistic", + eval_metric="mlogloss" if task != "binary" else "logloss", + tree_method="hist", + max_depth=10, + **model_kwargs, + ) + return ModelHPOConfig(clf, xgboost_hpconfig) + case ("xgboost", "regressor"): + reg = XGBRegressor( + objective="reg:squarederror", + eval_metric="rmse", + tree_method="hist", + max_depth=10, + **model_kwargs, + ) + return ModelHPOConfig(reg, xgboost_hpconfig) + case ("rf", "classifier"): + clf = RandomForestClassifier(split_criterion="entropy", **model_kwargs) + return ModelHPOConfig(clf, rf_hpconfig) + case ("rf", "regressor"): + reg = RandomForestRegressor(split_criterion="variance", **model_kwargs) + return ModelHPOConfig(reg, rf_hpconfig) + case ("knn", "classifier"): + clf = KNeighborsClassifier(**model_kwargs) + return ModelHPOConfig(clf, knn_hpconfig) + case ("knn", "regressor"): + reg = KNeighborsRegressor(**model_kwargs) + return ModelHPOConfig(reg, knn_hpconfig) + case _: + raise ValueError(f"Unsupported model/task combination: {model}/{task}") + + +def extract_espa_feature_importance(model: ESPAClassifier, training_data: TrainingSet) -> xr.Dataset: + """Extract the inner state of a trained ESPAClassifier as an xarray Dataset.""" + # Annotate the state with xarray metadata + boxes = list(range(model.K_)) + box_centers = xr.DataArray( + model.S_.cpu().numpy(), + dims=["feature", "box"], + coords={"feature": training_data.feature_names, "box": boxes}, + name="box_centers", + attrs={"description": "Centers of the boxes in feature space."}, + ) + box_assignments = xr.DataArray( + 
model.Lambda_.cpu().numpy(), + dims=["class", "box"], + coords={"class": training_data.target_labels, "box": boxes}, + name="box_assignments", + attrs={"description": "Assignments of samples to boxes."}, + ) + feature_weights = xr.DataArray( + model.W_.cpu().numpy(), + dims=["feature"], + coords={"feature": training_data.feature_names}, + name="feature_weights", + attrs={"description": "Feature weights for each box."}, + ) + state = xr.Dataset( + { + "box_centers": box_centers, + "box_assignments": box_assignments, + "feature_weights": feature_weights, + }, + attrs={ + "description": "Inner state of the best ESPAClassifier from RandomizedSearchCV.", + }, + ) + return state + + +def extract_xgboost_feature_importance(model: XGBClassifier | XGBRegressor, training_data: TrainingSet) -> xr.Dataset: + """Extract feature importance from a trained XGBoost model as an xarray Dataset.""" + # Extract XGBoost-specific information + # Get the underlying booster + booster = model.get_booster() + + # Feature importance with different importance types + # Note: get_score() returns dict with keys like 'f0', 'f1', etc. (feature indices) + importance_weight = booster.get_score(importance_type="weight") + importance_gain = booster.get_score(importance_type="gain") + importance_cover = booster.get_score(importance_type="cover") + importance_total_gain = booster.get_score(importance_type="total_gain") + importance_total_cover = booster.get_score(importance_type="total_cover") + + # Create aligned arrays for all features (including zero-importance) + def align_importance(importance_dict, features): + """Align importance dict to feature list, filling missing with 0. + + XGBoost returns feature indices (f0, f1, ...) as keys, so we need to map them. 
+ """ + return [importance_dict.get(f"f{i}", 0.0) for i in range(len(features))] + + feature_importance_weight = xr.DataArray( + align_importance(importance_weight, training_data.feature_names), + dims=["feature"], + coords={"feature": training_data.feature_names}, + name="feature_importance_weight", + attrs={"description": "Number of times a feature is used to split the data across all trees."}, + ) + feature_importance_gain = xr.DataArray( + align_importance(importance_gain, training_data.feature_names), + dims=["feature"], + coords={"feature": training_data.feature_names}, + name="feature_importance_gain", + attrs={"description": "Average gain across all splits the feature is used in."}, + ) + feature_importance_cover = xr.DataArray( + align_importance(importance_cover, training_data.feature_names), + dims=["feature"], + coords={"feature": training_data.feature_names}, + name="feature_importance_cover", + attrs={"description": "Average coverage across all splits the feature is used in."}, + ) + feature_importance_total_gain = xr.DataArray( + align_importance(importance_total_gain, training_data.feature_names), + dims=["feature"], + coords={"feature": training_data.feature_names}, + name="feature_importance_total_gain", + attrs={"description": "Total gain across all splits the feature is used in."}, + ) + feature_importance_total_cover = xr.DataArray( + align_importance(importance_total_cover, training_data.feature_names), + dims=["feature"], + coords={"feature": training_data.feature_names}, + name="feature_importance_total_cover", + attrs={"description": "Total coverage across all splits the feature is used in."}, + ) + + # Store tree information + n_trees = booster.num_boosted_rounds() + + state = xr.Dataset( + { + "feature_importance_weight": feature_importance_weight, + "feature_importance_gain": feature_importance_gain, + "feature_importance_cover": feature_importance_cover, + "feature_importance_total_gain": feature_importance_total_gain, + 
"feature_importance_total_cover": feature_importance_total_cover, + }, + attrs={ + "description": "Inner state of the best XGBClassifier from RandomizedSearchCV.", + "n_trees": n_trees, + "objective": str(model.objective), + }, + ) + return state + + +def extract_rf_feature_importance( + model: RandomForestClassifier | RandomForestRegressor, training_data: TrainingSet +) -> xr.Dataset: + """Extract feature importance from a trained RandomForest model as an xarray Dataset.""" + # Extract Random Forest-specific information + # Note: cuML's RandomForestClassifier doesn't expose individual trees (estimators_) + # like sklearn does, so we can only extract feature importances and model parameters + + # Feature importances (Gini importance) + feature_importances = model.feature_importances_ + + feature_importance = xr.DataArray( + feature_importances, + dims=["feature"], + coords={"feature": training_data.feature_names}, + name="feature_importance", + attrs={"description": "Gini importance (impurity-based feature importance)."}, + ) + + # cuML RF doesn't expose individual trees, so we store model parameters instead + n_estimators = model.n_estimators + max_depth = model.max_depth + + # OOB score if available + oob_score = None + if hasattr(model, "oob_score_") and model.oob_score: + oob_score = float(model.oob_score_) + + # cuML RandomForest doesn't provide per-tree statistics like sklearn + # Store what we have: feature importances and model configuration + attrs = { + "description": "Inner state of the best RandomForestClassifier from RandomizedSearchCV (cuML).", + "n_estimators": int(n_estimators), + "note": "cuML RandomForest does not expose individual tree statistics like sklearn", + } + + # Only add optional attributes if they have values + if max_depth is not None: + attrs["max_depth"] = int(max_depth) + if oob_score is not None: + attrs["oob_score"] = oob_score + + state = xr.Dataset( + { + "feature_importance": feature_importance, + }, + attrs=attrs, + ) + return 
state diff --git a/src/entropice/ml/training.py b/src/entropice/ml/training.py index b8317ec..a4d4327 100644 --- a/src/entropice/ml/training.py +++ b/src/entropice/ml/training.py @@ -4,42 +4,31 @@ import pickle from dataclasses import asdict, dataclass from functools import partial -import cupy as cp import cyclopts +import numpy as np import pandas as pd import toml -import torch import xarray as xr -from cuml.ensemble import RandomForestClassifier -from cuml.neighbors import KNeighborsClassifier -from entropy import ESPAClassifier from rich import pretty, traceback -from scipy.stats import loguniform, randint -from scipy.stats._distn_infrastructure import rv_continuous_frozen, rv_discrete_frozen from sklearn import set_config -from sklearn.metrics import ( - accuracy_score, - confusion_matrix, - f1_score, - jaccard_score, - precision_score, - recall_score, -) +from sklearn.metrics import accuracy_score, confusion_matrix, f1_score, jaccard_score, precision_score, recall_score from sklearn.model_selection import KFold, RandomizedSearchCV from stopuhr import stopwatch -from xgboost.sklearn import XGBClassifier -from entropice.ml.dataset import DatasetEnsemble +from entropice.ml.dataset import DatasetEnsemble, SplittedArrays from entropice.ml.inference import predict_proba +from entropice.ml.models import ( + extract_espa_feature_importance, + extract_rf_feature_importance, + extract_xgboost_feature_importance, + get_model_hpo_config, +) from entropice.utils.paths import get_cv_results_dir from entropice.utils.types import Model, Task traceback.install() pretty.install() -# Disabled array_api_dispatch to avoid namespace conflicts between NumPy and CuPy -# when using XGBoost with device="cuda" -set_config(array_api_dispatch=True) cli = cyclopts.App("entropice-training", config=cyclopts.config.Toml("training-config.toml")) # ty:ignore[invalid-argument-type] @@ -82,93 +71,21 @@ _metric_functions = { @cyclopts.Parameter("*") @dataclass(frozen=True, kw_only=True) class 
CVSettings: + """Cross-validation settings for model training.""" + n_iter: int = 2000 - robust: bool = False task: Task = "binary" model: Model = "espa" @dataclass(frozen=True, kw_only=True) class TrainingSettings(DatasetEnsemble, CVSettings): + """Helper Wrapper to store combined training and dataset ensemble settings.""" + param_grid: dict cv_splits: int metrics: list[str] - classes: list[str] - - -def _create_clf( - settings: CVSettings, -): - if settings.model == "espa": - if settings.task == "binary": - param_grid = { - "eps_cl": loguniform(1e-4, 1e1), - "eps_e": loguniform(1e1, 1e7), - "initial_K": randint(20, 400), - } - else: - param_grid = { - "eps_cl": loguniform(1e-11, 1e-6), - "eps_e": loguniform(1e4, 1e8), - "initial_K": randint(400, 800), - } - clf = ESPAClassifier(20, 0.1, 0.1, random_state=42, robust=settings.robust) - fit_params = {"max_iter": 300} - elif settings.model == "xgboost": - param_grid = { - "learning_rate": loguniform(1e-4, 1e-1), - "max_depth": randint(5, 50), - "n_estimators": randint(50, 1000), - } - clf = XGBClassifier( - objective="multi:softprob" if settings.task != "binary" else "binary:logistic", - eval_metric="mlogloss" if settings.task != "binary" else "logloss", - random_state=42, - tree_method="hist", - device="gpu", # Using CPU to avoid CuPy/NumPy namespace conflicts in sklearn metrics - ) - fit_params = {} - elif settings.model == "rf": - param_grid = { - "max_depth": randint(5, 50), - "n_estimators": randint(50, 1000), - } - clf = RandomForestClassifier(random_state=42, split_criterion="entropy") - fit_params = {} - elif settings.model == "knn": - param_grid = { - "n_neighbors": randint(10, 200), - "weights": ["uniform", "distance"], - } - clf = KNeighborsClassifier() - fit_params = {} - else: - raise ValueError(f"Unknown model: {settings.model}") - - return clf, param_grid, fit_params - - -def _serialize_param_grid(param_grid): - param_grid_serializable = {} - for key, dist in param_grid.items(): - # ! 
Hacky, but I can't find a better way to serialize scipy distributions once they are created - if isinstance(dist, rv_continuous_frozen): - param_grid_serializable[key] = { - "distribution": "loguniform", - "low": dist.a, - "high": dist.b, - } - elif isinstance(dist, rv_discrete_frozen): - param_grid_serializable[key] = { - "distribution": "randint", - "low": dist.a, - "high": dist.b, - } - elif isinstance(dist, list): - param_grid_serializable[key] = dist - else: - raise ValueError(f"Unknown distribution type for {key}: {type(dist)}") - return param_grid_serializable + classes: list[str] | None @cli.default @@ -179,24 +96,24 @@ def random_cv( """Perform random cross-validation on the training dataset. Args: - grid (Grid): The grid type to use. - level (int): The grid level to use. - n_iter (int, optional): Number of parameter settings that are sampled. Defaults to 2000. - robust (bool, optional): Whether to use robust training. Defaults to False. - task (Literal["binary", "count", "density"], optional): The classification task type. Defaults to "binary". + dataset_ensemble (DatasetEnsemble): The dataset ensemble configuration. + settings (CVSettings): The cross-validation settings. 
"""- device = "torch" if settings.model in ["espa"] else "cuda" - print("Creating training data...") - training_data = dataset_ensemble.create_cat_training_dataset(task=settings.task, device=device) + # Since we use cuml and xgboost libraries, we can only enable array API for ESPA + use_array_api = settings.model == "espa" + device = "torch" if use_array_api else "cuda" + set_config(array_api_dispatch=use_array_api) - clf, param_grid, fit_params = _create_clf(settings) - print(f"Using model: {settings.model} with parameters: {param_grid}") + print("Creating training data...") + training_data = dataset_ensemble.create_training_set(task=settings.task, device=device) + model_hpo_config = get_model_hpo_config(settings.model, settings.task) + print(f"Using model: {settings.model} with parameters: {model_hpo_config.hp_config}") cv = KFold(n_splits=5, shuffle=True, random_state=42) metrics = _metrics["binary" if settings.task == "binary" else "multiclass"] search = RandomizedSearchCV( - clf, - param_grid, + model_hpo_config.model, + model_hpo_config.search_space, n_iter=settings.n_iter, n_jobs=1, cv=cv, @@ -208,47 +125,27 @@ print(f"Starting RandomizedSearchCV with {search.n_iter} candidates...") with stopwatch(f"RandomizedSearchCV fitting for {search.n_iter} candidates"): - y_train = ( - training_data.y.train.get() - if settings.model == "xgboost" and isinstance(training_data.y.train, cp.ndarray) - else training_data.y.train + search.fit( + training_data.X.train, + # XGBoost returns its labels as numpy arrays instead of cupy arrays + # Thus, for the scoring to work, we need to convert them back to numpy + training_data.y.as_numpy().train if settings.model == "xgboost" else training_data.y.train, + **model_hpo_config.fit_params, ) - search.fit(training_data.X.train, y_train, **fit_params) print("Best parameters combination found:") best_estimator = search.best_estimator_ best_parameters = best_estimator.get_params() - for param_name in 
sorted(param_grid.keys()): + for param_name in sorted(model_hpo_config.hp_config.keys()): print(f"{param_name}: {best_parameters[param_name]}") - y_test = ( - training_data.y.test.get() - if settings.model == "xgboost" and isinstance(training_data.y.test, cp.ndarray) - else training_data.y.test + test_accuracy = search.score( + training_data.X.test, + training_data.y.as_numpy().test if settings.model == "xgboost" else training_data.y.test, ) - test_accuracy = search.score(training_data.X.test, y_test) print(f"Accuracy of the best parameters using the inner CV of the random search: {search.best_score_:.3f}") print(f"Accuracy on test set: {test_accuracy:.3f}") - # Compute predictions on the test set - y_pred = best_estimator.predict(training_data.X.test) - labels = list(range(len(training_data.y.labels))) - y_test = torch.asarray(y_test, device="cuda") - y_pred = torch.asarray(y_pred, device="cuda") - labels = torch.asarray(labels, device="cuda") - - test_metrics = {metric: _metric_functions[metric](y_test, y_pred) for metric in metrics} - - # Get a confusion matrix - cm = confusion_matrix(y_test, y_pred, labels=labels) - label_names = [training_data.y.labels[i] for i in range(len(training_data.y.labels))] - cm = xr.DataArray( - cm.cpu().numpy(), - dims=["true_label", "predicted_label"], - coords={"true_label": label_names, "predicted_label": label_names}, - name="confusion_matrix", - ) - results_dir = get_cv_results_dir( "random_search", grid=dataset_ensemble.grid, @@ -257,15 +154,13 @@ def random_cv( ) # Store the search settings - # First convert the param_grid distributions to a serializable format - param_grid_serializable = _serialize_param_grid(param_grid) combined_settings = TrainingSettings( **asdict(settings), **asdict(dataset_ensemble), - param_grid=param_grid_serializable, + param_grid=model_hpo_config.hp_config, cv_splits=cv.get_n_splits(), metrics=metrics, - classes=training_data.y.labels, + classes=training_data.target_labels, ) settings_file = 
results_dir / "search_settings.toml" print(f"Storing search settings to {settings_file}") @@ -288,209 +183,71 @@ def random_cv( print(f"Storing CV results to {results_file}") results.to_parquet(results_file) - # Store the test metrics - test_metrics_file = results_dir / "test_metrics.toml" + # Compute predictions on all sets and move them to numpy for metric computations + y_pred = SplittedArrays( + train=best_estimator.predict(training_data.X.train), + test=best_estimator.predict(training_data.X.test), + ).as_numpy() + + # Compute and Store Metrics + y = training_data.y.as_numpy() + test_metrics = {metric: _metric_functions[metric](y.test, y_pred.test) for metric in metrics} + train_metrics = {metric: _metric_functions[metric](y.train, y_pred.train) for metric in metrics} + combined_metrics = {metric: _metric_functions[metric](y.combined, y_pred.combined) for metric in metrics} + all_metrics = { + "test_metrics": test_metrics, + "train_metrics": train_metrics, + "combined_metrics": combined_metrics, + } + test_metrics_file = results_dir / "metrics.toml" print(f"Storing test metrics to {test_metrics_file}") with open(test_metrics_file, "w") as f: - toml.dump({"test_metrics": test_metrics}, f) + toml.dump(all_metrics, f) - # Store the confusion matrix - cm_file = results_dir / "confusion_matrix.nc" - print(f"Storing confusion matrix to {cm_file}") - cm.to_netcdf(cm_file, engine="h5netcdf") + # Make confusion matrices for classification tasks + if settings.task in ["binary", "count_regimes", "density_regimes"]: + labels = np.array(training_data.target_codes) + cm = xr.Dataset( + { + "test": (("true_label", "predicted_label"), confusion_matrix(y.test, y_pred.test, labels=labels)), + "train": (("true_label", "predicted_label"), confusion_matrix(y.train, y_pred.train, labels=labels)), + "combined": ( + ("true_label", "predicted_label"), + confusion_matrix(y.combined, y_pred.combined, labels=labels), + ), + }, + coords={"true_label": training_data.target_labels, 
"predicted_label": training_data.target_labels}, + ) + # Store the confusion matrices + cm_file = results_dir / "confusion_matrix.nc" + print(f"Storing confusion matrices to {cm_file}") + cm.to_netcdf(cm_file, engine="h5netcdf") # Get the inner state of the best estimator - features = training_data.X.data.columns.tolist() - if settings.model == "espa": - # Annotate the state with xarray metadata - boxes = list(range(best_estimator.K_)) - box_centers = xr.DataArray( - best_estimator.S_.cpu().numpy(), - dims=["feature", "box"], - coords={"feature": features, "box": boxes}, - name="box_centers", - attrs={"description": "Centers of the boxes in feature space."}, - ) - box_assignments = xr.DataArray( - best_estimator.Lambda_.cpu().numpy(), - dims=["class", "box"], - coords={"class": training_data.y.labels, "box": boxes}, - name="box_assignments", - attrs={"description": "Assignments of samples to boxes."}, - ) - feature_weights = xr.DataArray( - best_estimator.W_.cpu().numpy(), - dims=["feature"], - coords={"feature": features}, - name="feature_weights", - attrs={"description": "Feature weights for each box."}, - ) - state = xr.Dataset( - { - "box_centers": box_centers, - "box_assignments": box_assignments, - "feature_weights": feature_weights, - }, - attrs={ - "description": "Inner state of the best ESPAClassifier from RandomizedSearchCV.", - }, - ) + state = extract_espa_feature_importance(best_estimator, training_data) state_file = results_dir / "best_estimator_state.nc" print(f"Storing best estimator state to {state_file}") state.to_netcdf(state_file, engine="h5netcdf") elif settings.model == "xgboost": - # Extract XGBoost-specific information - # Get the underlying booster - booster = best_estimator.get_booster() - - # Feature importance with different importance types - # Note: get_score() returns dict with keys like 'f0', 'f1', etc. 
(feature indices) - importance_weight = booster.get_score(importance_type="weight") - importance_gain = booster.get_score(importance_type="gain") - importance_cover = booster.get_score(importance_type="cover") - importance_total_gain = booster.get_score(importance_type="total_gain") - importance_total_cover = booster.get_score(importance_type="total_cover") - - # Create aligned arrays for all features (including zero-importance) - def align_importance(importance_dict, features): - """Align importance dict to feature list, filling missing with 0. - - XGBoost returns feature indices (f0, f1, ...) as keys, so we need to map them. - """ - return [importance_dict.get(f"f{i}", 0.0) for i in range(len(features))] - - feature_importance_weight = xr.DataArray( - align_importance(importance_weight, features), - dims=["feature"], - coords={"feature": features}, - name="feature_importance_weight", - attrs={"description": "Number of times a feature is used to split the data across all trees."}, - ) - feature_importance_gain = xr.DataArray( - align_importance(importance_gain, features), - dims=["feature"], - coords={"feature": features}, - name="feature_importance_gain", - attrs={"description": "Average gain across all splits the feature is used in."}, - ) - feature_importance_cover = xr.DataArray( - align_importance(importance_cover, features), - dims=["feature"], - coords={"feature": features}, - name="feature_importance_cover", - attrs={"description": "Average coverage across all splits the feature is used in."}, - ) - feature_importance_total_gain = xr.DataArray( - align_importance(importance_total_gain, features), - dims=["feature"], - coords={"feature": features}, - name="feature_importance_total_gain", - attrs={"description": "Total gain across all splits the feature is used in."}, - ) - feature_importance_total_cover = xr.DataArray( - align_importance(importance_total_cover, features), - dims=["feature"], - coords={"feature": features}, - 
name="feature_importance_total_cover", - attrs={"description": "Total coverage across all splits the feature is used in."}, - ) - - # Store tree information - n_trees = booster.num_boosted_rounds() - - state = xr.Dataset( - { - "feature_importance_weight": feature_importance_weight, - "feature_importance_gain": feature_importance_gain, - "feature_importance_cover": feature_importance_cover, - "feature_importance_total_gain": feature_importance_total_gain, - "feature_importance_total_cover": feature_importance_total_cover, - }, - attrs={ - "description": "Inner state of the best XGBClassifier from RandomizedSearchCV.", - "n_trees": n_trees, - "objective": str(best_estimator.objective), - }, - ) + state = extract_xgboost_feature_importance(best_estimator, training_data) state_file = results_dir / "best_estimator_state.nc" print(f"Storing best estimator state to {state_file}") state.to_netcdf(state_file, engine="h5netcdf") elif settings.model == "rf": - # Extract Random Forest-specific information - # Note: cuML's RandomForestClassifier doesn't expose individual trees (estimators_) - # like sklearn does, so we can only extract feature importances and model parameters - - # Feature importances (Gini importance) - feature_importances = best_estimator.feature_importances_ - - feature_importance = xr.DataArray( - feature_importances, - dims=["feature"], - coords={"feature": features}, - name="feature_importance", - attrs={"description": "Gini importance (impurity-based feature importance)."}, - ) - - # cuML RF doesn't expose individual trees, so we store model parameters instead - n_estimators = best_estimator.n_estimators - max_depth = best_estimator.max_depth - - # OOB score if available - oob_score = None - if hasattr(best_estimator, "oob_score_") and best_estimator.oob_score: - oob_score = float(best_estimator.oob_score_) - - # cuML RandomForest doesn't provide per-tree statistics like sklearn - # Store what we have: feature importances and model configuration - attrs 
= { - "description": "Inner state of the best RandomForestClassifier from RandomizedSearchCV (cuML).", - "n_estimators": int(n_estimators), - "note": "cuML RandomForest does not expose individual tree statistics like sklearn", - } - - # Only add optional attributes if they have values - if max_depth is not None: - attrs["max_depth"] = int(max_depth) - if oob_score is not None: - attrs["oob_score"] = oob_score - - state = xr.Dataset( - { - "feature_importance": feature_importance, - }, - attrs=attrs, - ) + state = extract_rf_feature_importance(best_estimator, training_data) state_file = results_dir / "best_estimator_state.nc" print(f"Storing best estimator state to {state_file}") state.to_netcdf(state_file, engine="h5netcdf") - elif settings.model == "knn": - # KNN doesn't have traditional feature importance - # Store information about the training data and neighbors - n_neighbors = best_estimator.n_neighbors - - # We can't extract meaningful "state" from KNN in the same way - # but we can store metadata about the model - state = xr.Dataset( - attrs={ - "description": "Metadata of the best KNeighborsClassifier from RandomizedSearchCV.", - "n_neighbors": n_neighbors, - "weights": str(best_estimator.weights), - "algorithm": str(best_estimator.algorithm), - "metric": str(best_estimator.metric), - "n_samples_fit": best_estimator.n_samples_fit_, - }, - ) - state_file = results_dir / "best_estimator_state.nc" - print(f"Storing best estimator metadata to {state_file}") - state.to_netcdf(state_file, engine="h5netcdf") - # Predict probabilities for all cells print("Predicting probabilities for all cells...") - preds = predict_proba(dataset_ensemble, clf=best_estimator, classes=training_data.y.labels) + preds = predict_proba(dataset_ensemble, model=best_estimator, device=device) + if training_data.targets["y"].dtype == "category": + preds["predicted"] = preds["predicted"].astype("category") + preds["predicted"].cat.categories = training_data.targets["y"].cat.categories 
print(f"Predicted probabilities DataFrame with {len(preds)} entries.") preds_file = results_dir / "predicted_probabilities.parquet" print(f"Storing predicted probabilities to {preds_file}")