Source code for tdub.ml_train

"""Module for training BDTs."""

# stdlib
import json
import logging
import os
from pathlib import PosixPath
from pprint import pformat
from typing import Optional, Tuple, List, Union, Dict, Any

# externals
import matplotlib

matplotlib.use("pdf")
import joblib
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
from pygram11 import histogram
from scipy import interp
from sklearn.base import BaseEstimator
from sklearn.model_selection import KFold, train_test_split
from sklearn.metrics import auc, roc_auc_score, roc_curve, plot_roc_curve
from sklearn.experimental import enable_hist_gradient_boosting  # noqa
from sklearn.ensemble import HistGradientBoostingClassifier

# tdub
from tdub.art import setup_tdub_style, draw_atlas_label
from tdub.data import Region, features_for, quick_files, selection_for, selection_as_numexpr
from tdub.frames import iterative_selection, drop_cols
from tdub.hist import bin_centers
from tdub.math import ks_twosample_binned
import tdub.config

setup_tdub_style()
log = logging.getLogger(__name__)


[docs]class SingleTrainingSummary: """Describes some properties of a single training. Parameters ---------- auc : float the AUC value for the model ks_test_sig : float the binned KS test value for signal ks_pvalue_sig : float the binned KS test p-value for signal ks_test_bkg : float the binned KS test value for background ks_pvalue_bkg : float the binned KS test p-value for background kwargs : dict currently unused Attributes ---------- auc : float the AUC value for the model ks_test_sig : float the binned KS test value for signal ks_pvalue_sig : float the binned KS test p-value for signal ks_test_bkg : float the binned KS test value for background ks_pvalue_bkg : float the binned KS test p-value for background """ def __init__( self, *, auc: float = -1.0, ks_test_sig: float = -1.0, ks_pvalue_sig: float = -1.0, ks_test_bkg: float = -1.0, ks_pvalue_bkg: float = -1.0, **kwargs, ) -> None: """Class init.""" self.auc = auc self.ks_test_sig = ks_test_sig self.ks_pvalue_sig = ks_pvalue_sig self.ks_test_bkg = ks_test_bkg self.ks_pvalue_bkg = ks_pvalue_bkg self.bad_ks = self.ks_pvalue_sig < 0.2 or self.ks_pvalue_bkg < 0.2 def __repr__(self) -> str: """Clean representation of the result.""" p1 = f"auc={self.auc:0.3}" p2 = f"ks_test_sig={self.ks_test_sig:0.5}" p3 = f"ks_pvalue_sig={self.ks_pvalue_sig:0.5}" p4 = f"ks_test_bkg={self.ks_test_bkg:0.5}" p5 = f"ks_pvalue_bkg={self.ks_pvalue_bkg:0.5}" return f"SingleTrainingSummary({p1}, {p2}, {p3}, {p4}, {p5})"
[docs]class ResponseHistograms: """Create and use histogrammed model response information. Parameters ---------- response_type : str Models provide different types of response, like a raw prediction or a probability of signal. This class supports: - `"predict"` (for LGBM), - `"decision_function"` (for Scikit-learn) - `"proba"` (for either). model : BaseEstimator The trained model. X_train : array_like Training data feature matrix. X_test : array_like Testing data feature matrix. y_train : array_like Training data labels. y_test : array_like Testing data labels. w_train : array_like Training data event weights w_test : array_like Testing data event weights nbins : int Number of bins to use. """ def __init__( self, response_type, model, X_train, X_test, y_train, y_test, w_train, w_test, nbins: int = 30, ): """Class constructor.""" self.response_type = response_type r_train, r_test = self._eval_model(model, X_train, X_test) self._calculate(r_train, r_test, y_train, y_test, w_train, w_test, nbins) def _eval_model(self, model, X_train, X_test): if self.response_type == "predict": try: r_test = model.predict(X_test, raw_score=True) r_train = model.predict(X_train, raw_score=True) except TypeError: r_test = model.predict(X_test) r_train = model.predict(X_train) elif self.response_type == "decision_function": r_test = model.decision_function(X_test) r_train = model.decision_function(X_train) elif self.response_type == "proba": r_test = model.predict_proba(X_test)[:, 1] r_train = model.predict_proba(X_train)[:, 1] else: raise ValueError("response_type must be 'predict' or 'proba'") return r_train, r_test def _calculate(self, r_train, r_test, y_train, y_test, w_train, w_test, nbins): sig_test_p = y_test == 1 sig_train_p = y_train == 1 bkg_test_p = np.invert(sig_test_p) bkg_train_p = np.invert(sig_train_p) sig_train = r_train[sig_train_p] sig_test = r_test[sig_test_p] bkg_train = r_train[bkg_train_p] bkg_test = r_test[bkg_test_p] xmin = min(bkg_train.min(), bkg_test.min()) xmax = max(sig_train.max(), sig_test.max()) self.bins = np.linspace(xmin, xmax, nbins + 1) # fmt: off sig_test_h = histogram(sig_test, bins=self.bins, density=True, weights=w_test[sig_test_p]) bkg_test_h = histogram(bkg_test, bins=self.bins, density=True, weights=w_test[bkg_test_p]) sig_train_h = histogram(sig_train, bins=self.bins, density=True, weights=w_train[sig_train_p]) bkg_train_h = histogram(bkg_train, bins=self.bins, density=True, weights=w_train[bkg_train_p]) # fmt: on self.train_sig_h = sig_train_h[0] self.train_sig_e = sig_train_h[1] self.train_bkg_h = bkg_train_h[0] self.train_bkg_e = bkg_train_h[1] self.test_sig_h = sig_test_h[0] self.test_sig_e = sig_test_h[1] self.test_bkg_h = bkg_test_h[0] self.test_bkg_e = bkg_test_h[1] self.ks_sig = ks_twosample_binned( self.train_sig_h, self.test_sig_h, self.train_sig_e, self.test_sig_e ) self.ks_bkg = ks_twosample_binned( self.train_bkg_h, self.test_bkg_h, self.train_bkg_e, self.test_bkg_e ) @property def ks_sig_test(self) -> float: """float: Two sample binned KS test for signal.""" return round(float(self.ks_sig[0]), 5) @property def ks_sig_pval(self) -> float: """float: Two sample binned KS p-value for signal.""" return round(float(self.ks_sig[1]), 5) @property def ks_bkg_test(self) -> float: """float: Two sample binned KS test for background.""" return round(float(self.ks_bkg[0]), 5) @property def ks_bkg_pval(self) -> float: """float: Two sample binned KS p-value for background.""" return round(float(self.ks_bkg[1]), 5)
[docs] def draw( self, ax: Optional[plt.Axes] = None, xlabel: Optional[str] = None, ) -> Tuple[plt.Figure, plt.Axes]: """Draw the response histograms. Parameters ---------- ax : matplotlib.axes.Axes, optional Predefined matplotlib axes to use. xlabel : str, optional Override the automated xlabel definition. Returns ------- matplotlib.figure.Figure The matplotlib figure object. matplotlib.axes.Axes The matplotlib axes object. """ if ax is None: fig, ax = plt.subplots(figsize=(5.06, 4.5)) else: fig = ax.get_figure() bc = bin_centers(self.bins) ax.hist( bc, bins=self.bins, weights=self.train_sig_h, label=r"$tW$ (train)", histtype="stepfilled", alpha=0.5, edgecolor="C0", color="C0", ) ax.errorbar( bc, self.test_sig_h, yerr=self.test_sig_e, label=r"$tW$ (test)", color="C0", fmt="o", markersize=4, ) ax.hist( bc, bins=self.bins, weights=self.train_bkg_h, label=r"$t\bar{t}$ (train)", histtype="stepfilled", alpha=0.4, edgecolor="C3", color="C3", ) ax.errorbar( bc, self.test_bkg_h, yerr=self.test_bkg_e, label=r"$t\bar{t}$ (test)", color="C3", fmt="o", markersize=4, ) if self.response_type == "proba": ax.set_xlim([0, 1]) else: ax.set_xlim([self.bins[0], self.bins[-1]]) ax.set_ylim([0, 1.35 * ax.get_ylim()[1]]) ax.legend(loc="upper right", ncol=1, frameon=False, numpoints=1) handles, labels = ax.get_legend_handles_labels() handles = [handles[0], handles[2], handles[1], handles[3]] labels = [labels[0], labels[2], labels[1], labels[3]] ax.legend(handles, labels, loc="upper right", ncol=1, frameon=False, numpoints=1) ax.set_ylabel("Arbitrary Units") draw_atlas_label( ax, cme_and_lumi=False, extra_lines=["$tW$ BDT Training"], follow_shift=0.22, ) if xlabel is None: if self.response_type == "proba": ax.set_xlabel("Classifier Signal Probability") else: ax.set_xlabel("Classifier Response") else: ax.set_xlabel(xlabel) return fig, ax
[docs]def prepare_from_root( sig_files: List[str], bkg_files: List[str], region: Union[Region, str], branches: Optional[List[str]] = None, override_selection: Optional[str] = None, weight_mean: Optional[float] = None, weight_scale: Optional[float] = None, scale_sum_weights: bool = True, use_campaign_weight: bool = False, use_tptrw: bool = False, use_trrw: bool = False, test_case_size: Optional[int] = None, bkg_sample_frac: Optional[float] = None, ) -> Tuple[pd.DataFrame, np.ndarray, np.ndarray]: """Prepare the data to train in a region with signal and background ROOT files. Parameters ---------- sig_files : list(str) List of signal ROOT files. bkg_files : list(str) List of background ROOT files. region : Region or str Region where we're going to perform the training. branches : list(str), optional Override the list of features (usually defined by the region). override_selection : str, optional Manual selection string to apply to the dataset (this will override the region defined selection). weight_mean : float, optional Scale all weights such that the mean weight is this value. Cannot be used with `weight_scale`. weight_scale : float, optional Value to scale all weights by, cannot be used with `weight_mean`. scale_sum_weights : bool Scale sum of weights of signal to be sum of weights of background. use_campaign_weight : bool See the parameter description for :py:func:`tdub.frames.iterative_selection`. use_tptrw : bool Apply the top pt reweighting factor. use_trrw : bool Apply the top recursive reweighting factor. test_case_size : int, optional Prepare a small test case dataset using this many training and testing samples. bkg_sample_frac : float, optional Sample a fraction of the background data. Returns ------- :py:obj:`pandas.DataFrame` Event feature matrix. :py:obj:`numpy.ndarray` Event labels (0 for background; 1 for signal). :py:obj:`numpy.ndarray` Event weights. Examples -------- >>> from tdub.data import quick_files >>> from tdub.train import prepare_from_root >>> qfiles = quick_files("/path/to/data") >>> df, labels, weights = prepare_from_root(qfiles["tW_DR"], qfiles["ttbar"], "2j2b") """ if weight_scale is not None and weight_mean is not None: raise ValueError("weight_scale and weight_mean cannot be used together") log.info("Preparing a single dataset from ROOT files") log.info("Signal files:") for f in sig_files: log.info(" - %s" % f) log.info("Background files:") for f in bkg_files: log.info(" - %s" % f) if override_selection is not None: selection = override_selection log.info(f"Overriding selection (in region {region}) to {override_selection}") else: selection = selection_for(region) selection = selection_as_numexpr(selection) log.info("Total selection is: '%s'" % selection) if branches is None: log.info("Using features defined by the region") branches = features_for(region) sig_df = iterative_selection( files=sig_files, selection=selection, weight_name="weight_nominal", keep_category="kinematics", branches=branches, exclude_avoids=True, use_campaign_weight=use_campaign_weight, ) bkg_df = iterative_selection( files=bkg_files, selection=selection, weight_name="weight_nominal", keep_category="kinematics", branches=branches, exclude_avoids=True, use_campaign_weight=use_campaign_weight, use_tptrw=use_tptrw, use_trrw=use_trrw, sample_frac=bkg_sample_frac, ) if test_case_size is not None: if test_case_size > 5000: log.warn("why bother with test_case_size > 5000?") sig_df = sig_df.sample(n=test_case_size, random_state=tdub.config.RANDOM_STATE) bkg_df = bkg_df.sample(n=test_case_size, random_state=tdub.config.RANDOM_STATE) w_sig = sig_df.pop("weight_nominal").to_numpy() w_bkg = bkg_df.pop("weight_nominal").to_numpy() w_sig[w_sig < 0] = 0.0 w_bkg[w_bkg < 0] = 0.0 if scale_sum_weights: w_sig *= w_bkg.sum() / w_sig.sum() if "weight_campaign" in sig_df: drop_cols(sig_df, "weight_campaign") if "weight_campaign" in bkg_df: drop_cols(bkg_df, "weight_campaign") sorted_cols = sorted(sig_df.columns.to_list(), key=str.lower) sig_df = sig_df[sorted_cols] bkg_df = bkg_df[sorted_cols] cols = sig_df.columns.to_list() assert cols == bkg_df.columns.to_list(), "sig/bkg columns are different. bad." log.info("Features in prepared dataset:") for c in cols: log.info(" - %s" % c) df = pd.concat([sig_df, bkg_df]) y = np.concatenate([np.ones_like(w_sig), np.zeros_like(w_bkg)]) w = np.concatenate([w_sig, w_bkg]) if weight_scale is not None: w *= weight_scale if weight_mean is not None: w *= weight_mean * len(w) / np.sum(w) df.selection_used = selection return df, y, w
[docs]def persist_prepared_data( out_dir: Union[str, os.PathLike], df: pd.DataFrame, labels: np.ndarray, weights: np.ndarray, ) -> None: """Persist prepared data to disk. The product of :py:func:`tdub.ml_train.prepare_from_root` is easily persistable to disk; this function performs that task. If the same prepared data is going to be used for multiple training executations, one can save CPU cycles by saving the prepared data instead of starting higher upstream with our ROOT ntuples. Parameters ---------- out_dir : str or os.PathLike Directory to save output to. df : pandas.DataFrame Prepared DataFrame object. labels : numpy.ndarray Prepared labels. weights : numpy.ndarray Prepared weights. Examples -------- >>> from tdub.data import quick_files >>> from tdub.train import prepare_from_root, persist_prepared_data >>> qfiles = quick_files("/path/to/data") >>> df, y, w = prepare_from_root(qfiles["tW_DR"], qfiles["ttbar"], "2j2b") >>> persist_prepared_data("/path/to/output/data", df, y, w) """ out_dir = PosixPath(out_dir) out_dir.mkdir(exist_ok=True, parents=True) df.to_hdf(out_dir / "df.h5", "df", mode="w", complevel=0) np.save(out_dir / "labels.npy", labels) np.save(out_dir / "weights.npy", weights) selection_file = PosixPath(out_dir / "selection.txt") selection_file.write_text(f"{df.selection_used}\n")
[docs]def tdub_train_axes( learning_rate: float = 0.1, max_depth: int = 5, min_child_samples: int = 50, num_leaves: int = 31, reg_lambda: float = 0.0, **kwargs, ) -> Dict[str, Any]: """Construct a dictionary of default tdub training tune. Extra keyword arguments are swallowed but never used. Parameters ---------- learning_rate : float Learning rate for a classifier. max_depth : int Max depth for a classifier. min_child_samples : int Min child samples for a classifier. num_leaves : int Num leaves for a classifier. reg_lambda : float Lambda regularation (L2 regularation). Returns ------- dict(str, Any) The argument names and values """ return dict( learning_rate=learning_rate, max_depth=max_depth, min_child_samples=min_child_samples, num_leaves=num_leaves, reg_lambda=reg_lambda, )
[docs]def sklearn_gen_classifier( early_stopping_rounds: int = 10, validation_fraction: float = 0.20, train_axes: Optional[Dict[str, Any]] = None, **clf_params, ) -> BaseEstimator: """Create a classifier using scikit-learn. This uses Scikit-learn's :py:obj:`sklearn.ensemble.HistGradientBoostingClassifier`. The constructor to define early stopping rounds. Extra keyword arguments passed to the classifier initialization Parameters ---------- early_stopping_rounds : int Passed as the `n_iter_no_change` argument to scikit-learn's HistGradientBoostingClassifier. validation_fraction : float Passed to the `validation_fraction` argument in scikit-learn's HistGradientBoostingClassifier. train_axes : dict[str, Any] Values of required tdub training parameters. clf_params : kwargs Extra arguments passed to the constructor. Returns ------- sklearn.ensemble.HistGradientBoostingClassifier The classifier. """ if train_axes is None: train_axes = tdub_train_axes() params = dict( loss="binary_crossentropy", early_stopping=True, verbose=1, n_iter_no_change=early_stopping_rounds, validation_fraction=validation_fraction, max_iter=500, learning_rate=train_axes.get("learning_rate"), max_depth=train_axes.get("max_depth"), min_samples_leaf=train_axes.get("min_child_samples"), max_leaf_nodes=train_axes.get("num_leaves"), l2_regularization=train_axes.get("reg_lambda"), ) for k, v in clf_params.items(): if k not in params.keys(): params[k] = v clf = HistGradientBoostingClassifier(**params) log.info("Prepared sklearn classifier with parameters:") for k, v in clf.get_params().items(): log.info(f"{k:>20} = {v}") return clf
[docs]def sklearn_train_classifier( clf: BaseEstimator, X_train: Any, y_train: Any, w_train: Any, **fit_params, ) -> BaseEstimator: """Train a Scikit-learn classifier. Parameters ---------- clf : sklearn.ensemble.HistGradientBoostingClassifier The classifier X_train : array_like Training events matrix y_train : array_like Training event labels w_train : array_like Training event weights fit_params : kwargs Extra keyword arguments passed to the classifier. Returns ------- sklearn.ensemble.HistGradientBoostingClassifier The same classifier object passed to the function. """ params = {} for k, v in fit_params.items(): if k not in params.keys(): params[k] = v return clf.fit(X_train, y_train, sample_weight=w_train)
[docs]def lgbm_gen_classifier(train_axes: Dict[str, Any] = None, **clf_params) -> BaseEstimator: """Create a classifier using LightGBM. Parameters ---------- train_axes : dict[str, Any] Values of required tdub training parameters. clf_params : kwargs Extra arguments passed to the constructor. Returns ------- lightgbm.LGBMClassifier The classifier. """ import lightgbm as lgbm if train_axes is None: train_axes = tdub_train_axes() params = dict( learning_rate=train_axes.get("learning_rate", 0.1), max_depth=train_axes.get("max_depth", 5), min_child_samples=train_axes.get("min_child_samples", 50), num_leaves=train_axes.get("num_leaves", 50), reg_lambda=train_axes.get("reg_lambda", 0.0), ) for k, v in clf_params.items(): if k not in params.keys(): params[k] = v clf = lgbm.LGBMClassifier(boosting_type="gbdt", n_estimators=500, **params) for k, v in clf.get_params().items(): log.info(f"{k:>20} = {v}") return clf
[docs]def lgbm_train_classifier( clf: BaseEstimator, X_train: Any, y_train: Any, w_train: Any, validation_fraction: float = 0.20, early_stopping_rounds: int = 10, **fit_params, ) -> BaseEstimator: """Train a LGBMClassifier. Parameters ---------- clf : lightgbm.LGBMClassifier The classifier X_train : array_like Training events matrix y_train : array_like Training event labels w_train : array_like Training event weights validation_fraction : float Fraction of training events to use in validation set. early_stopping_rounds : int Number of early stopping rounds to use in training. fit_params : keyword arguments Extra keyword arguments passed to the classifier. Returns ------- lightgbm.LGBMClassifier The same classifier object passed to the function """ X_t, X_v, y_t, y_v, w_t, w_v = train_test_split( X_train, y_train, w_train, test_size=validation_fraction, random_state=tdub.config.RANDOM_STATE, shuffle=True, ) return clf.fit( X_t, y_t, sample_weight=w_t, eval_set=[(X_v, y_v)], eval_metric="auc", eval_sample_weight=[w_v], early_stopping_rounds=early_stopping_rounds, )
def xgb_gen_classifier(train_axes: Dict[str, Any] = None, **clf_params) -> BaseEstimator: """Create a classifier using XGBoost. Parameters ---------- train_axes : dict[str, Any] Values of required tdub training parameters. clf_params : kwargs Extra arguments passed to the constructor. Returns ------- xgboost.XGBClassifier The classifier. """ import xgboost as xgb if train_axes is None: train_axes = tdub_train_axes() params = dict( learning_rate=train_axes.get("learning_rate", 0.1), max_depth=train_axes.get("max_depth", 5), min_child_weight=train_axes.get("min_child_samples", 50), reg_lambda=train_axes.get("reg_lambda", 0.0), ) clf = xgb.XGBClassifier(booster="gbtree", n_estimators=500, **params) for k, v in clf.get_params().items(): log.info(f"{k:>20} = {v}") return clf def xgb_train_classifier( clf: BaseEstimator, X_train: Any, y_train: Any, w_train: Any, validation_fraction: float = 0.20, early_stopping_rounds: int = 10, **fit_params, ) -> BaseEstimator: """Train a XGBClassifier. clf : xgboost.XGBClassifier The classifier X_train : array_like Training events matrix y_train : array_like Training event labels w_train : array_like Training event weights validation_fraction : float Fraction of training events to use in validation set. early_stopping_rounds : int Number of early stopping rounds to use in training. fit_params : keyword arguments Extra keyword arguments passed to the classifier. Returns ------- xgboost.XGBClassifier The same classifier object passed to the function """ X_t, X_v, y_t, y_v, w_t, w_v = train_test_split( X_train, y_train, w_train, test_size=validation_fraction, random_state=tdub.config.RANDOM_STATE, shuffle=True, ) return clf.fit( X_t, y_t, sample_weight=w_t, eval_set=[(X_v, y_v)], eval_metric="auc", sample_weight_eval_set=[w_v], early_stopping_rounds=early_stopping_rounds, )
[docs]def single_training( df: pd.DataFrame, labels: np.ndarray, weights: np.ndarray, train_axes: Dict[str, Any], output_dir: Union[str, os.PathLike], test_size: float = 0.40, early_stopping_rounds: Optional[int] = None, extra_summary_entries: Optional[Dict[str, Any]] = None, use_sklearn: bool = False, use_xgboost: bool = False, save_lgbm_txt: bool = False, ) -> None: """Execute a single training with some parameters. The model and some useful information (mostly plots) are saved to `output_dir`. Parameters ---------- df : pandas.DataFrame Feature matrix in dataframe format labels : numpy.ndarray Event labels (`1` for signal; `0` for background) weights : numpy.ndarray Event weights train_axes : dict(str, Any) Dictionary of parameters defining the tdub train axes. output_dir : str or os.PathLike Directory to save results of training test_size : float Test size for splitting into training and testing sets early_stopping_rounds : int, optional Number of rounds to have no improvement for stopping training. extra_summary_entries : dict, optional Extra entries to save in the JSON output summary. use_sklearn : bool Use Scikit-learn's HistGradientBoostingClassifier. use_xgboost : bool Use XGBoost's XGBClassifier. save_lgbm_txt : bool Save fitted LGBM model to text file (ignored if either ``use_sklearn`` or ``use_xgboost`` is ``True``). Returns ------- SingleTrainingSummary Useful information about the training Examples -------- >>> from tdub.data import quick_files >>> from tdub.train import prepare_from_root, single_round, tdub_train_axes >>> qfiles = quick_files("/path/to/data") >>> df, labels, weights = prepare_from_root(qfiles["tW_DR"], qfiles["ttbar"], "2j2b") >>> train_axes = tdub_train_axes() ... learning_rate=0.05 ... max_depth=5, ... ) >>> single_round( ... df, ... labels, ... weights, ... tdub_train_axes, ... "training_output", ... ) """ use_lgbm = not use_sklearn and not use_xgboost if sum([use_sklearn, use_lgbm, use_xgboost]) != 1: raise RuntimeError("BDT provider not defined properly.") starting_dir = os.getcwd() output_path = PosixPath(output_dir) output_path.mkdir(exist_ok=True, parents=True) log.info("Saving training output to %s" % output_path.resolve()) os.chdir(output_path) X_train, X_test, y_train, y_test, w_train, w_test = train_test_split( df, labels, weights, test_size=test_size, random_state=tdub.config.RANDOM_STATE, shuffle=True, ) log.info(f"test size used: {test_size}") log.info("selection used on the datasets:") log.info(" '%s'" % df.selection_used) log.info("features training with:") for c in X_train.columns: log.info(" - %s" % c) # Create and train model train_axes = tdub_train_axes(**train_axes) if use_sklearn: model = sklearn_gen_classifier( early_stopping_rounds=early_stopping_rounds, validation_fraction=0.33, train_axes=train_axes, ) sklearn_train_classifier(model, X_train, y_train, w_train) elif use_xgboost: model = xgb_gen_classifier(train_axes=train_axes) xgb_train_classifier( model, X_train, y_train, w_train, early_stopping_rounds=early_stopping_rounds ) elif use_lgbm: model = lgbm_gen_classifier(train_axes=train_axes) lgbm_train_classifier( model, X_train, y_train, w_train, early_stopping_rounds=early_stopping_rounds ) # Save the model joblib.dump(model, "model.joblib.gz", compress=("gzip", 3)) if use_lgbm and save_lgbm_txt: model.booster_.save_model("model.txt") # ROC curve fig_roc, ax_roc = plt.subplots(figsize=(4.5, 4)) rcd = plot_roc_curve(model, X_test, y_test, sample_weight=w_test, ax=ax_roc, lw=2) ax_roc.set_ylabel("True postive rate") ax_roc.set_xlabel("False positive rate") ax_roc.grid(color="black", alpha=0.125) ax_roc.legend(loc="lower right") fig_roc.subplots_adjust(bottom=0.125, left=0.15) fig_roc.savefig("roc.pdf") importances_gain = {} importances_split = {} # Plot Importances if use_lgbm: import lightgbm as lgbm fig_imp, (ax_imp_gain, ax_imp_split) = plt.subplots(2, 1) lgbm.plot_importance(model, ax=ax_imp_gain, importance_type="gain", precision=2) lgbm.plot_importance(model, ax=ax_imp_split, importance_type="split", precision=2) ax_imp_gain.set_xlabel("Importance (gain)") ax_imp_split.set_xlabel("Importance (split)") ax_imp_gain.set_title("") ax_imp_split.set_title("") fig_imp.subplots_adjust(left=0.475, top=0.975, bottom=0.09, right=0.925) fig_imp.savefig("imp.pdf") imp_gain = model.booster_.feature_importance(importance_type="gain") imp_split = model.booster_.feature_importance(importance_type="split") feat_name = model.booster_.feature_name() imp_gain = np.array(imp_gain, dtype=np.float64) imp_split = np.array(imp_split, dtype=np.float64) imp_gain *= 1.0 / np.sum(imp_gain) imp_split *= 1.0 / np.sum(imp_split) for n, g, s in zip(feat_name, imp_gain, imp_split): importances_gain[n] = float(g) importances_split[n] = float(s) # Histograms: plot and extract information from them proba_histograms = ResponseHistograms( "proba", model, X_train, X_test, y_train, y_test, w_train, w_test ) if use_sklearn: pred_histograms = ResponseHistograms( "decision_function", model, X_train, X_test, y_train, y_test, w_train, w_test ) elif use_xgboost: pred_histograms = ResponseHistograms( "predict", model, X_train, X_test, y_train, y_test, w_train, w_test ) elif use_lgbm: pred_histograms = ResponseHistograms( "predict", model, X_train, X_test, y_train, y_test, w_train, w_test ) fig_pred, ax_pred = pred_histograms.draw() fig_proba, ax_proba = proba_histograms.draw() fig_pred.subplots_adjust(bottom=0.125, left=0.15) fig_pred.savefig("pred.pdf") fig_proba.subplots_adjust(bottom=0.125, left=0.15) fig_proba.savefig("proba.pdf") sts = SingleTrainingSummary( auc=float(rcd.roc_auc), ks_test_sig=proba_histograms.ks_sig_test, ks_pvalue_sig=proba_histograms.ks_sig_pval, ks_test_bkg=proba_histograms.ks_bkg_test, ks_pvalue_bkg=proba_histograms.ks_bkg_pval, ) # JSON Summary summary = {"auc": round(rcd.roc_auc, 5)} summary["selection_used"] = df.selection_used summary["bad_ks"] = sts.bad_ks summary["ks_test_sig"] = sts.ks_test_sig summary["ks_test_bkg"] = sts.ks_test_bkg summary["ks_pvalue_sig"] = sts.ks_pvalue_sig summary["ks_pvalue_bkg"] = sts.ks_pvalue_bkg summary["features"] = [c for c in df.columns] summary["set_params"] = train_axes summary["all_params"] = model.get_params() summary["best_iteration"] = -1 summary["importances_gain"] = importances_gain summary["importances_split"] = importances_split if early_stopping_rounds is not None: if hasattr(model, "n_iter_"): summary["best_iteration"] = int(model.n_iter_) elif hasattr(model, "best_iteration"): summary["best_iteration"] = int(model.best_iteration) elif hasattr(model, "best_iteration_"): summary["best_iteration"] = int(model.best_iteration_) else: log.warn("best iteration undetected") if extra_summary_entries is not None: for k, v in extra_summary_entries.items(): summary[k] = v with open("summary.json", "w") as f: json.dump(summary, f, indent=4) # Finish by going back to starting directory os.chdir(starting_dir)
[docs]def folded_training( df: pd.DataFrame, labels: np.ndarray, weights: np.ndarray, params: Dict[str, Any], fit_kw: Dict[str, Any], output_dir: Union[str, os.PathLike], region: str, kfold_kw: Dict[str, Any] = None, ) -> float: """Execute a folded training. Train a :obj:`lightgbm.LGBMClassifier` model using :math:`k`-fold cross validation using the given input data and parameters. The models resulting from the training (and other important training information) are saved to ``output_dir``. The entries in the ``kfold_kw`` argument are forwarded to the :obj:`sklearn.model_selection.KFold` class for data preprocessing. The default arguments that we use are (random_state is controlled by the tdub.config module): - ``n_splits``: 3 - ``shuffle``: ``True`` Parameters ---------- df : pandas.DataFrame Feature matrix in dataframe format labels : numpy.ndarray Event labels (``1`` for signal; ``0`` for background) weights : :obj:`numpy.ndarray` Event weights params : dict(str, Any) Dictionary of :obj:`lightgbm.LGBMClassifier` parameters fit_kw : dict(str, Any) Dictionary of arguments forwarded to :py:func:`lightgbm.LGBMClassifier.fit`. output_dir : str or os.PathLike Directory to save results of training region : str String representing the region kfold_kw : optional dict(str, Any) Arguments passed to :obj:`sklearn.model_selection.KFold` Returns ------- float Negative mean area under the ROC curve (AUC) Examples -------- >>> from tdub.data import quick_files >>> from tdub.train import prepare_from_root >>> from tdub.train import folded_training >>> qfiles = quick_files("/path/to/data") >>> df, labels, weights = prepare_from_root(qfiles["tW_DR"], qfiles["ttbar"], "2j2b") >>> params = dict( ... boosting_type="gbdt", ... num_leaves=42, ... learning_rate=0.05 ... reg_alpha=0.2, ... reg_lambda=0.8, ... max_depth=5, ... ) >>> folded_training( ... df, ... labels, ... weights, ... params, ... {"verbose": 20}, ... "/path/to/train/output", ... "2j2b", ... kfold_kw={"n_splits": 5, "shuffle": True}, ... ) """ import lightgbm as lgbm starting_dir = os.getcwd() output_path = PosixPath(output_dir) output_path.mkdir(exist_ok=True, parents=True) os.chdir(output_path) log.info("selection used on the datasets:") log.info(" '%s'" % df.selection_used) log.info("features training with:") for c in df.columns: log.info(" - %s" % c) log.info("model parameters:") for k, v in params.items(): if v is not None: log.info(f"{k:>20} | {v:<12}") log.info("saving output to %s" % output_path.resolve()) fig_proba_hists, ax_proba_hists = plt.subplots() fig_pred_hists, ax_pred_hists = plt.subplots() fig_rocs, ax_rocs = plt.subplots() tprs = [] aucs = [] importances = np.zeros(len(df.columns)) mean_fpr = np.linspace(0, 1, 100) folder = KFold(**kfold_kw, random_state=tdub.config.RANDOM_STATE) fold_number = 0 nfits = 0 for train_idx, test_idx in folder.split(df): # fmt: off X_train, X_test = df.iloc[train_idx], df.iloc[test_idx] y_train, y_test = labels[train_idx], labels[test_idx] w_train, w_test = weights[train_idx], weights[test_idx] validation_data = [(X_test, y_test)] validation_w = w_test # n_sig = y_train[y_train == 1].shape[0] # n_bkg = y_train[y_train == 0].shape[0] # scale_pos_weight = n_bkg / n_sig # log.info(f"n_bkg / n_sig = {n_bkg} / {n_sig} = {scale_pos_weight}") # params["scale_pos_weight"] = scale_pos_weight model = lgbm.LGBMClassifier(**params) fitted_model = model.fit( X_train, y_train, sample_weight=w_train, eval_set=validation_data, eval_metric="auc", eval_sample_weight=[validation_w], **fit_kw, ) joblib.dump(fitted_model, f"model_fold{fold_number}.joblib.gz", compress=("gzip", 3)) nfits += 1 importances += fitted_model.feature_importances_ fold_fig_proba, fold_ax_proba = plt.subplots() fold_fig_pred, fold_ax_pred = plt.subplots() test_proba = fitted_model.predict_proba(X_test)[:, 1] train_proba = fitted_model.predict_proba(X_train)[:, 1] test_pred = fitted_model.predict(X_test, raw_score=True) train_pred = fitted_model.predict(X_train, raw_score=True) proba_sig_test = test_proba[y_test == 1] proba_bkg_test = test_proba[y_test == 0] proba_sig_train = train_proba[y_train == 1] proba_bkg_train = train_proba[y_train == 0] pred_sig_test = test_pred[y_test == 1] pred_bkg_test = test_pred[y_test == 0] pred_sig_train = train_pred[y_train == 1] pred_bkg_train = train_pred[y_train == 0] w_sig_test = w_test[y_test == 1] w_bkg_test = w_test[y_test == 0] w_sig_train = w_train[y_train == 1] w_bkg_train = w_train[y_train == 0] proba_bins = np.linspace(0, 1, 31) proba_bc = bin_centers(proba_bins) predxmin = min(pred_bkg_test.min(), pred_bkg_train.min()) predxmax = max(pred_sig_test.max(), pred_sig_train.max()) pred_bins = np.linspace(predxmin, predxmax, 31) # Axis with all folds (proba histograms) ax_proba_hists.hist(proba_sig_test, bins=proba_bins, label=f"F{fold_number} Sig. (test)", density=True, histtype="step", weights=w_sig_test) ax_proba_hists.hist(proba_bkg_test, bins=proba_bins, label=f"F{fold_number} Bkg. (test)", density=True, histtype="step", weights=w_bkg_test) ax_proba_hists.hist(proba_sig_train, bins=proba_bins, label=f"F{fold_number} Sig. (train)", density=True, histtype="step", weights=w_sig_train) ax_proba_hists.hist(proba_bkg_train, bins=proba_bins, label=f"F{fold_number} Bkg. (train)", density=True, histtype="step", weights=w_bkg_train) # Axis specific to the fold (proba histograms) fold_ax_proba.hist(proba_sig_train, bins=proba_bins, label=f"F{fold_number} Sig. (train)", density=True, weights=w_sig_train, histtype="stepfilled", color="C0", edgecolor="C0", alpha=0.5, linewidth=1) fold_ax_proba.hist(proba_bkg_train, bins=proba_bins, label=f"F{fold_number} Bkg. (train)", density=True, weights=w_bkg_train, histtype="stepfilled", color="C3", edgecolor="C3", alpha=0.4, linewidth=1) train_h_sig = histogram(proba_sig_test, bins=proba_bins, weights=w_sig_test, flow=False, density=True) train_h_bkg = histogram(proba_bkg_test, bins=proba_bins, weights=w_bkg_test, flow=False, density=True) fold_ax_proba.errorbar(proba_bc, train_h_sig[0], yerr=train_h_sig[1], markersize=4, color="C0", fmt="o", label=f"F{fold_number} Sig. (test)") fold_ax_proba.errorbar(proba_bc, train_h_bkg[0], yerr=train_h_bkg[1], markersize=4, color="C3", fmt="o", label=f"F{fold_number} Bkg. (test)") fold_ax_proba.set_ylim([0, 1.5 * fold_ax_proba.get_ylim()[1]]) # Axis with all ax_pred_hists.hist(pred_sig_test, bins=pred_bins, label=f"F{fold_number} Sig. (test)", density=True, histtype="step", weights=w_sig_test) ax_pred_hists.hist(pred_bkg_test, bins=pred_bins, label=f"F{fold_number} Bkg. (test)", density=True, histtype="step", weights=w_bkg_test) ax_pred_hists.hist(pred_sig_train, bins=pred_bins, label=f"F{fold_number} Sig. (train)", density=True, histtype="step", weights=w_sig_train) ax_pred_hists.hist(pred_bkg_train, bins=pred_bins, label=f"F{fold_number} Bkg. (train)", density=True, histtype="step", weights=w_bkg_train) fold_ax_pred.hist(pred_sig_test, bins=pred_bins, label=f"F{fold_number} Sig. (test)", density=True, histtype="step", weights=w_sig_test) fold_ax_pred.hist(pred_bkg_test, bins=pred_bins, label=f"F{fold_number} Bkg. (test)", density=True, histtype="step", weights=w_bkg_test) fold_ax_pred.hist(pred_sig_train, bins=pred_bins, label=f"F{fold_number} Sig. (train)", density=True, histtype="step", weights=w_sig_train) fold_ax_pred.hist(pred_bkg_train, bins=pred_bins, label=f"F{fold_number} Bkg. (train)", density=True, histtype="step", weights=w_bkg_train) fold_ax_pred.set_ylim([0, 1.5 * fold_ax_pred.get_ylim()[1]]) fpr, tpr, thresholds = roc_curve(y_test, test_proba, sample_weight=w_test) tprs.append(interp(mean_fpr, fpr, tpr)) tprs[-1][0] = 0.0 roc_auc = auc(fpr, tpr) aucs.append(roc_auc) ax_rocs.plot(fpr, tpr, lw=1, alpha=0.45, label=f"fold {fold_number}, AUC = {roc_auc:0.3}") fold_ax_proba.set_ylabel("Arb. Units") fold_ax_proba.set_xlabel("Classifier Output") fold_ax_proba.legend(ncol=2, loc="upper center") fold_ax_pred.set_ylabel("Arb. Units") fold_ax_pred.set_xlabel("Classifier Output") fold_ax_pred.legend(ncol=2, loc="upper center") # fold_fig_proba.subplots_adjust(**_fig_adjustment_dict) fold_fig_proba.savefig(f"fold{fold_number}_histograms_proba.pdf") # fold_fig_pred.subplots_adjust(**_fig_adjustment_dict) fold_fig_pred.savefig(f"fold{fold_number}_histograms_pred.pdf") plt.close(fold_fig_proba) plt.close(fold_fig_pred) # fmt: on fold_number += 1 relative_importances = importances / nfits relative_importances = relative_importances / relative_importances.sum() mean_tpr = np.mean(tprs, axis=0) mean_tpr[-1] = 1.0 mean_auc = auc(mean_fpr, mean_tpr) std_auc = np.std(aucs) ax_rocs.plot( mean_fpr, mean_tpr, color="b", label=f"AUC = {mean_auc:0.2} $\\pm$ {std_auc:0.2}", lw=2, alpha=0.8, ) ax_rocs.set_xlabel("False Positive Rate") ax_rocs.set_ylabel("True Positive Rate") ax_proba_hists.set_ylabel("Arb. Units") ax_proba_hists.set_xlabel("Classifier Output") ax_proba_hists.legend(ncol=3, loc="upper center", fontsize="small") ax_proba_hists.set_ylim([0, 1.5 * ax_proba_hists.get_ylim()[1]]) # fig_proba_hists.subplots_adjust(**_fig_adjustment_dict) fig_proba_hists.savefig("histograms_proba.pdf") ax_pred_hists.set_ylabel("Arb. Units") ax_pred_hists.set_xlabel("Classifier Output") ax_pred_hists.legend(ncol=3, loc="upper center", fontsize="small") ax_pred_hists.set_ylim([0, 1.5 * ax_pred_hists.get_ylim()[1]]) # fig_pred_hists.subplots_adjust(**_fig_adjustment_dict) fig_pred_hists.savefig("histograms_pred.pdf") ax_rocs.grid(color="black", alpha=0.15) ax_rocs.legend(ncol=2, loc="lower right") # fig_rocs.subplots_adjust(**_fig_adjustment_dict) fig_rocs.savefig("roc.pdf") summary: Dict[str, Any] = {} summary["selection_used"] = df.selection_used summary["region"] = region summary["features"] = [str(c) for c in df.columns] summary["importances"] = list(relative_importances) summary["kfold"] = kfold_kw summary["roc"] = { "auc": mean_auc, "std": std_auc, "mean_fpr": list(mean_fpr), "mean_tpr": list(mean_tpr), } with open("summary.json", "w") as f: json.dump(summary, f, indent=4) os.chdir(starting_dir) negative_roc_score = -1.0 * np.mean(aucs) return negative_roc_score
_best_fit = None _best_auc = None _best_parameters = None _best_paramdict = None _ifit = None def gp_minimize_auc( data_dir: str, region: Union[Region, str], nlo_method: str, output_dir: Union[str, os.PathLike] = "_unnamed_optimization", n_calls: int = 15, esr: Optional[int] = 10, ): """Minimize AUC using Gaussian processes. This is our hyperparameter optimization procedure which uses the :py:func:`skopt.gp_minimize` functions from Scikit-Optimize. Parameters ---------- data_dir : str Path containing ROOT files region : Region or str Rregion where we're going to perform the training nlo_method : str Which tW NLO method sample ('DR' or 'DS' or 'Both') output_dir : str or os.PathLike Path to save optimization output n_calls : int Number of times to train during the minimization procedure esr : int, optional Early stopping rounds for fitting the model random_state: int Random state for splitting data into training/testing Examples -------- >>> from tdub.data import Region >>> from tdub.train import prepare_from_root, gp_minimize_auc >>> gp_minimize_auc("/path/to/data", Region.r2j1b, "DS", "opt_DS_2j1b") >>> gp_minimize_auc("/path/to/data", Region.r2j1b, "DR", "opt_DR_2j1b") """ from skopt.utils import use_named_args from skopt.space import Real, Integer from skopt.plots import plot_convergence from skopt import gp_minimize import lightgbm as lgbm qfiles = quick_files(f"{data_dir}") if nlo_method == "DR": tW_files = qfiles["tW_DR"] elif nlo_method == "DS": tW_files = qfiles["tW_DS"] elif nlo_method == "Both": tW_files = qfiles["tW_DR"] + qfiles["tW_DS"] tW_files.sort() else: raise ValueError("nlo_method must be 'DR' or 'DS' or 'Both'") df, labels, weights = prepare_from_root(tW_files, qfiles["ttbar"], region) X_train, X_test, y_train, y_test, w_train, w_test = train_test_split( df, labels, weights, train_size=0.333, random_state=tdub.config.RANDOM_STATE, shuffle=True, ) validation_data = [(X_test, y_test)] validation_w = w_test # n_sig = y_train[y_train == 1].shape[0] # n_bkg = y_train[y_train == 0].shape[0] # scale_pos_weight = n_bkg / n_sig # sample_size = n_bkg + n_sig # log.info(f"n_bkg / n_sig = {n_bkg} / {n_sig} = {scale_pos_weight}") dimensions = [ Real(low=0.01, high=0.2, prior="log-uniform", name="learning_rate"), Integer(low=40, high=250, name="num_leaves"), Integer(low=20, high=250, name="min_child_samples"), Integer(low=3, high=10, name="max_depth"), ] default_parameters = [0.1, 100, 50, 5] run_from_dir = os.getcwd() save_dir = PosixPath(output_dir) save_dir.mkdir(exist_ok=True, parents=True) os.chdir(save_dir) global _best_fit global _best_auc global _best_parameters global _best_paramdict global _ifit _best_fit = 0 _best_auc = 0.0 _best_parameters = [{"teste": 1}] _best_paramdict = {} _ifit = 0 @use_named_args(dimensions=dimensions) def afit( learning_rate, num_leaves, min_child_samples, max_depth, ): global _ifit global _best_fit global _best_auc global _best_parameters global _best_paramdict log.info(f"on iteration {_ifit} out of {n_calls}") log.info(f"learning_rate: {learning_rate}") log.info(f"num_leaves: {num_leaves}") log.info(f"min_child_samples: {min_child_samples}") log.info(f"max_depth: {max_depth}") curdir = os.getcwd() p = PosixPath(f"training_{_ifit}") p.mkdir(exist_ok=False) os.chdir(p.resolve()) with open("features.txt", "w") as f: for c in df.columns: print(c, file=f) model = lgbm.LGBMClassifier( boosting_type="gbdt", learning_rate=learning_rate, num_leaves=num_leaves, min_child_samples=min_child_samples, max_depth=max_depth, n_estimators=500, ) fitted_model = model.fit( X_train, y_train, sample_weight=w_train, eval_set=validation_data, eval_metric="auc", verbose=20, early_stopping_rounds=esr, eval_sample_weight=[validation_w], ) pred = fitted_model.predict_proba(X_test)[:, 1] score = roc_auc_score(y_test, pred, sample_weight=w_test) train_pred = fitted_model.predict_proba(X_train)[:, 1] fig, ax = plt.subplots() bins = np.linspace(0, 1, 31) ax.hist( train_pred[y_train == 0], bins=bins, label="Bkg. (train)", density=True, histtype="step", weights=w_train[y_train == 0], ) ax.hist( train_pred[y_train == 1], bins=bins, label="Sig. (train)", density=True, histtype="step", weights=w_train[y_train == 1], ) ax.hist( pred[y_test == 0], bins=bins, label="Bkg. (test)", density=True, histtype="step", weights=w_test[y_test == 0], ) ax.hist( pred[y_test == 1], bins=bins, label="Sig. (test)", density=True, histtype="step", weights=w_test[y_test == 1], ) ax.set_ylim([0, 1.5 * ax.get_ylim()[1]]) ax.legend(ncol=2, loc="upper center") # fig.subplots_adjust(**_fig_adjustment_dict) fig.savefig("histograms.pdf") plt.close(fig) binning_sig = np.linspace(0, 1, 31) binning_bkg = np.linspace(0, 1, 31) h_sig_test, err_sig_test = histogram( pred[y_test == 1], bins=binning_sig, weights=w_test[y_test == 1] ) h_sig_train, err_sig_train = histogram( train_pred[y_train == 1], bins=binning_sig, weights=w_train[y_train == 1] ) h_bkg_test, err_bkg_test = histogram( pred[y_test == 0], bins=binning_bkg, weights=w_test[y_test == 0] ) h_bkg_train, err_bkg_train = histogram( train_pred[y_train == 0], bins=binning_bkg, weights=w_train[y_train == 0] ) ks_statistic_sig, ks_pvalue_sig = ks_twosample_binned( h_sig_test, h_sig_train, err_sig_test, err_sig_train ) ks_statistic_bkg, ks_pvalue_bkg = ks_twosample_binned( h_bkg_test, h_bkg_train, err_bkg_test, err_bkg_train ) if ks_pvalue_sig < 0.1 or ks_pvalue_bkg < 0.1: score = score * 0.9 log.info(f"ksp sig: {ks_pvalue_sig}") log.info(f"ksp bkg: {ks_pvalue_bkg}") curp = pformat(model.get_params()) curp = eval(curp) with open("params.json", "w") as f: json.dump(curp, f, indent=4) with open("auc.txt", "w") as f: print(f"{score}", file=f) with open("ks.txt", "w") as f: print(f"sig: {ks_pvalue_sig}", file=f) print(f"bkg: {ks_pvalue_bkg}", file=f) os.chdir(curdir) if score > _best_auc: _best_parameters[0] = model.get_params() _best_auc = score _best_fit = _ifit _best_paramdict = curp _ifit += 1 del model return -score search_result = gp_minimize( func=afit, dimensions=dimensions, acq_func="gp_hedge", n_calls=n_calls, x0=default_parameters, ) summary = { "region": region, "nlo_method": nlo_method, "features": list(df.columns), "best_iteration": _best_fit, "best_auc": _best_auc, "best_params": _best_paramdict, } with open("summary.json", "w") as f: json.dump(summary, f, indent=4) fig, ax = plt.subplots() plot_convergence(search_result, ax=ax) # fig.subplots_adjust(**_fig_adjustment_dict) fig.savefig("gpmin_convergence.pdf") os.chdir(run_from_dir) return 0 def load_prepped(datadir: PosixPath) -> Tuple[pd.DataFrame, np.ndarray, np.ndarray]: """Load prepped training data. Parameters ---------- datadir : pathlib.PosixPath Path to the preppared output. Returns ------- :py:obj:`pandas.DataFrame` DataFrame containing training variables :py:obj:`numpy.ndarray` Labels for each event. :py:obj:`numpy.ndarray` Weights for each event. """ df = pd.read_hdf(datadir / "df.h5", "df") y = np.load(datadir / "labels.npy") w = np.load(datadir / "weights.npy") return df, y, w def var_and_binning_for_region( df: pd.DataFrame, region: Union[str, Region], meta_table: Any ) -> Any: """Create list of training variables associated with a region. Parameters ---------- df : pandas.DataFrame DataFrame containg training variables region : str String representation of region. meta_table : dict(str, Any) Variable meta data table. Returns ------- list List of variables with associated region and binning. """ results = [] cols = list(df.columns) for entry in meta_table["regions"][f"r{str(Region.from_str(str(region)))}"]: if entry["var"] in cols: results.append( (entry["var"], region, (entry["nbins"], entry["xmin"], entry["xmax"])) ) return results