# Source code for geno4sd.ml_tools.rubricoe.feature_ranking

from .helper_funcs import save_to_pickle, save_to_csv
import multiprocessing as mp
from sklearn.model_selection import  train_test_split
from sklearn.metrics import confusion_matrix, make_scorer
import numpy
import time
import os


# Module-level dataset (X) and targets (y) shared with multiprocessing workers.
# rank_features/score_curve assign these before creating the Pool; worker
# functions read them via `global X` / `global y`.
# NOTE(review): this presumably relies on fork-style process start so children
# inherit the assigned values — confirm behavior on spawn-based platforms.
X = None
y = None


def _tn(y_true, y_pred):
    """Return the true-negative count: cell [0, 0] of the confusion matrix."""
    cm = confusion_matrix(y_true, y_pred)
    return cm[0, 0]
def _fp(y_true, y_pred):
    """Return the false-positive count: cell [0, 1] of the confusion matrix."""
    cm = confusion_matrix(y_true, y_pred)
    return cm[0, 1]
def _fn(y_true, y_pred):
    """Return the false-negative count: cell [1, 0] of the confusion matrix."""
    cm = confusion_matrix(y_true, y_pred)
    return cm[1, 0]
def _tp(y_true, y_pred):
    """Return the true-positive count: cell [1, 1] of the confusion matrix."""
    cm = confusion_matrix(y_true, y_pred)
    return cm[1, 1]


def _tpr(y_true, y_pred):
    """Return the true-positive rate (sensitivity): TP / (TP + FN).

    Computes the confusion matrix once instead of three times
    (the original delegated to _tp twice and _fn once, each of which
    rebuilds the full confusion matrix).
    """
    cm = confusion_matrix(y_true, y_pred)
    tp, fn = cm[1, 1], cm[1, 0]
    return tp / (tp + fn)


def _tnr(y_true, y_pred):
    """Return the true-negative rate (specificity): TN / (TN + FP).

    Computes the confusion matrix once instead of three times
    (the original delegated to _tn twice and _fp once, each of which
    rebuilds the full confusion matrix).
    """
    cm = confusion_matrix(y_true, y_pred)
    tn, fp = cm[0, 0], cm[0, 1]
    return tn / (tn + fp)


# Scorer dictionary exposing the four confusion-matrix cells, suitable for
# passing to sklearn cross-validation utilities via their `scoring` parameter.
scoring = {'tp': make_scorer(_tp), 'tn': make_scorer(_tn),
           'fp': make_scorer(_fp), 'fn': make_scorer(_fn)}


def youden_index(y_true, y_pred):
    """Return Youden's J statistic: sensitivity + specificity - 1.

    Ranges from -1 to 1; 0 means the classifier is no better than chance.
    (Fixed: removed the stray ``[docs]`` Sphinx-extraction artifact that made
    the original line invalid Python.)
    """
    return _tpr(y_true, y_pred) + _tnr(y_true, y_pred) - 1
def _rank_features_job(test_size, stratify_by, clf, fold_number, details_files,
                       details_files_path, seed):
    """
    Worker function that ranks feature indices of X, according to clf, from
    higher to lower importance. Used by the main rank_features function. The
    final result is obtained through fitting on multiple data subsamplings,
    as many as number_of_folds.

    (Fixed: restored proper multi-line formatting — the original function was
    collapsed onto a single line by documentation extraction — and documented
    the previously undocumented `stratify_by` parameter.)

    Parameters
    ----------
    test_size: float
        Proportion of samples (rows of X0) to be used for testing.
    stratify_by: array-like
        Labels used to stratify the train/test split (passed to
        train_test_split's `stratify` argument).
    clf
        Estimator used to produce scores. Expected to have an attribute
        `coef_` and `fit` and `predict` methods.
    fold_number: int
        Index of current data split that the worker is assigned to.
    details_files: bool
        Flag to enable or disable writing detailed population splits
        information to files.
    details_files_path: str, optional
        Path where output files will be written to. Not optional if
        `details_files` is `True`.
    seed: int, optional
        Seed value used for reproducibility. If not specified, no
        reproducibility is enforced.

    Returns
    -------
    tuple
        (normalized coefficient vector, fold_number).
    """
    global X
    global y
    # Split row indices (not the data itself) so only the training rows are materialized.
    idx_train, _ = train_test_split(
        range(X.shape[0]), test_size=test_size, stratify=stratify_by,
        random_state=seed)
    X_train = X[idx_train]
    y_train = y[idx_train]
    clf.fit(X_train, y_train)
    # Normalize coefficients to [-1, 1] by the largest absolute coefficient.
    result = clf.coef_.ravel() / numpy.max(numpy.abs(clf.coef_.ravel()))
    if details_files:
        save_to_csv(idx_train, details_files_path,
                    "ranking_fold{}_train_indices.csv".format(fold_number))
        save_to_pickle(clf, details_files_path,
                       "ranking_fold{}_ranking_object.pickle".format(fold_number))
    return result, fold_number
def rank_features(clf, X0, y0, test_size=.2, number_of_folds=5, verbose=0,
                  return_ranking_coefs=False, n_jobs=3, details_files=False,
                  details_files_path=None, seed=None):
    """
    Returns ranked feature indices of X, according to clf, from higher to
    lower importance. The final result is obtained through fitting on
    multiple data subsamplings, as many as number_of_folds.

    (Fixed: restored proper multi-line formatting and removed the stray
    ``[docs]`` Sphinx-extraction artifact that made the original invalid
    Python.)

    Parameters
    ----------
    clf:
        Estimator used to produce scores. Expected to have an attribute
        `coef_` and `fit` and `predict` methods.
    X0: array-like
        Dataset with rows observations and columns features/variables.
    y0: array-like
        Targets of X0.
    test_size: float, default .2
        Proportion of samples (rows of X0) to be used for testing.
    number_of_folds: int, default 5
        Number of splits for cross-validation purposes.
    verbose: int, default 0
        Level of stdout verbosity.
    return_ranking_coefs: bool, default False
        If True, also return the sorted, fold-averaged absolute coefficients.
    n_jobs: int, default 3
        Number of workers used by the multiprocessing pool.
    details_files: bool, default False
        Flag to enable or disable writing detailed population splits
        information to files.
    details_files_path: str, optional
        Path where output files will be written to. Not optional if
        `details_files` is `True`.
    seed: int, optional
        Seed value used for reproducibility. If not specified, no
        reproducibility is enforced.

    Returns
    -------
    numpy.ndarray, or (numpy.ndarray, numpy.ndarray)
        Feature indices sorted from most to least important; with
        `return_ranking_coefs`, also the matching averaged coefficients.
    """
    if verbose > 0:
        print("Ranking features...")
    # Publish the dataset through module globals so pool workers can read it.
    global X
    global y
    X = X0
    y = y0
    influences = [None]*number_of_folds
    if seed is None:
        random_generator = numpy.random.RandomState()
    else:
        random_generator = numpy.random.RandomState(seed)
    # One derived seed per fold keeps folds distinct yet reproducible.
    fold_seeds = random_generator.randint(2**32, size=number_of_folds)

    def errors(result):
        # error_callback: surface worker exceptions on stdout.
        print(result)

    def gather_influences(result):
        # Success callback: slot each fold's coefficients by its fold index.
        nonlocal influences
        x, fold_number = result
        influences[fold_number] = x
        if verbose > 0:
            print("Gathered fold {} of {}.".format(
                fold_number, number_of_folds), end="\n")

    params = [(test_size, y, clf, i, details_files, details_files_path,
               fold_seeds[i]) for i in range(number_of_folds)]
    pool = mp.Pool(n_jobs)
    for x in params:
        pool.apply_async(_rank_features_job, args=x,
                         callback=gather_influences, error_callback=errors)
    pool.close()
    pool.join()
    influences = numpy.array(influences)
    # Aggregate importance as the sum of absolute per-fold coefficients.
    feature_coefs = numpy.sum(numpy.abs(influences), axis=0)
    sorted_features_indices = numpy.argsort(feature_coefs)[::-1]
    if return_ranking_coefs:
        return sorted_features_indices, \
            numpy.sort(feature_coefs)[::-1]/number_of_folds
    return sorted_features_indices
def _compute_score(index, upper_bound):
    """
    Worker function: score `clf` using only the first `upper_bound` (ranked)
    feature columns, averaged over the precomputed `splits`.

    (Fixed: restored proper multi-line formatting — the original two
    functions were collapsed onto a single line by documentation extraction.)

    Relies on worker-process globals set by `_init_compute` (clf, splits,
    score_function) and on the module-level X.

    Returns
    -------
    tuple
        (index, mean score, elapsed seconds, worker pid).
    """
    global X
    global y
    results = []
    start_time = time.time()
    for ix_train, ix_test, y_train, y_test in splits:
        # Restrict both partitions to the leading `upper_bound` feature columns.
        X_train, X_test = X[ix_train, :upper_bound], X[ix_test, :upper_bound]
        clf.fit(X_train, y_train)
        y_pred = clf.predict(X_test)
        results.append(score_function(y_test, y_pred))
    return index, numpy.mean(results), time.time() - start_time, os.getpid()


def _init_compute(clf_, splits_, details_files_, details_files_path_,
                  score_function_=youden_index):
    """
    Initializer for multiprocessing workers, we use the fact that labels y
    are the same for all workers. Similarly for classifiers, test size and
    scoring function.
    """
    global clf
    global splits
    global score_function
    global details_files
    global details_files_path
    clf = clf_
    splits = splits_
    score_function = score_function_
    details_files = details_files_
    details_files_path = details_files_path_
def score_curve(clf, X0, y0,
                step_size=10,
                test_size=.2,
                number_of_folds=5,
                n_jobs=1,
                score_function=youden_index,
                verbose=0,
                n_features=None,
                adaptive_features=False,
                tolerance_steps=10,
                window_size=10,
                details_files=False,
                details_files_path=None,
                seed=None
                ):
    """
    Computes a score for a sequence of subsets of features.

    Computes a sequence of pairs (x_i, y_i) where y_i is the score of clf on
    a test set after fitting a training subset of X[:x_i], as many subsets as
    number_of_folds (these subsets are fixed once selected). The x_i are
    generated as a range from x_i to the total number of features in X,
    increasing by step_size.

    (Fixed: restored proper multi-line formatting and removed the stray
    ``[docs]`` Sphinx-extraction artifact that made the original invalid
    Python.)

    Parameters
    ----------
    clf:
        Estimator used to produce scores. Expected to have an attribute
        `coef_` and `fit` and `predict` methods.
    X0:
        Dataset with rows observations and columns features/variables.
    y0:
        Targets of X0.
    step_size: int, default 10
        Number of features to be added at each step of the curve.
    test_size: float, default .2
        Proportion of samples (rows of X0) to be used for testing.
    number_of_folds: int, default 5
        Number of splits for cross-validation purposes.
    n_jobs: int, default 1
        Number of workers used by the multiprocessing pool.
    score_function: Callable, default youden_index
        Callable that computes a score. Assumed to receive parameters
        `(y_true, y_pred)` and return a float.
    verbose: int, default 0
        Level of stdout verbosity.
    n_features: int, optional
        Number of features of the dataset to use. If not specified, all
        features are used.
    adaptive_features: bool, default False
        Flag to enable or disable early stopping.
    tolerance_steps: int, default 10
        Steps to wait while observing a downwards trend befor early stopping.
    window_size: int, 10
        Size of rolling average window used in early stopping.
    details_files: bool, default False
        Flag to enable or disable writing detailed population splits
        information to files.
    details_files_path: str, optional
        Path where output files will be written to. Not optional if
        `details_files` is `True`.
    seed: int, optional
        Seed value used for reproducibility. If not specified, no
        reproducibility is enforced.

    Returns
    -------
    list of tuple
        Pairs (feature count, mean score).
    """
    if verbose > 0:
        print("Computing score curve...")
    # Publish the dataset through module globals so pool workers can read it.
    global X
    global y
    X = X0
    y = y0
    if n_features is None:
        n_features = X.shape[1]
    if seed is None:
        random_generator = numpy.random.RandomState()
    else:
        random_generator = numpy.random.RandomState(seed)
    fold_seeds = random_generator.randint(2**32, size=number_of_folds)
    bounds = [x for x in range(step_size, n_features+1, step_size)]
    ix = range(len(X))
    # Fixed train/test splits reused for every feature-count bound.
    splits = [train_test_split(ix, y, stratify=y, test_size=test_size,
                               random_state=fold_seeds[i])
              for i in range(number_of_folds)]
    if details_files:
        for i, (idx_train, idx_test, _, _) in enumerate(splits):
            save_to_csv(idx_train, details_files_path,
                        "scoring_fold{}_train_indices.csv".format(i))
            save_to_csv(idx_test, details_files_path,
                        "scoring_fold{}_test_indices.csv".format(i))
    # Ensure the final bound covers all requested features.
    if n_features not in bounds:
        bounds.append(n_features)
    params = [(i, x) for i, x in enumerate(bounds)]
    sorted_scores = []
    previous_average = -1
    rolling_average_score_values = []
    tolerance = tolerance_steps
    previous_ucb = 0

    def upper_continuous_bound(l):
        # Smallest non-negative integer missing from l, i.e. the length of
        # the contiguous prefix of completed result indices.
        def gen():
            i = 0
            while True:
                yield i
                i += 1
        return next(x for x in gen() if x not in l)

    def add_to_list_and_check(result):
        # Success callback: record the point, keep results ordered by index,
        # and (optionally) early-stop the pool on a sustained downward trend.
        nonlocal sorted_scores
        nonlocal adaptive_features
        nonlocal verbose
        nonlocal tolerance
        nonlocal previous_average
        nonlocal previous_ucb
        end_time = result[2]
        to_append = [result[0], result[1]]
        sorted_scores.append(to_append)
        sorted_scores = sorted(sorted_scores, key=lambda x: x[0])
        if adaptive_features:
            ucb = upper_continuous_bound([x[0] for x in sorted_scores])
            if previous_ucb != ucb:
                # Rolling average over the last `window_size` contiguous points.
                current_average = numpy.mean(
                    [x[1] for x in sorted_scores[:ucb][-window_size:]])
                rolling_average_score_values.append(current_average)
                if previous_average > current_average:
                    tolerance -= 1
                else:
                    tolerance = tolerance_steps
                if not tolerance:
                    if verbose > 0:
                        print("Early stopping at number of features:",
                              len(sorted_scores)*step_size, ".")
                    pool.terminate()
                previous_average = current_average
                previous_ucb = ucb
        if verbose > 0:
            print(
                f"Point {len(sorted_scores)} of {len(bounds)}. Took {end_time} seconds at process {result[3]}.",
                end="\n", flush=True)

    def errors(result):
        # error_callback: surface worker exceptions on stdout.
        print(result)

    pool = mp.Pool(n_jobs, initializer=_init_compute,
                   initargs=(clf, splits, details_files, details_files_path,
                             score_function),
                   maxtasksperchild=None)
    for x in params:
        pool.apply_async(_compute_score, args=x,
                         callback=add_to_list_and_check, error_callback=errors)
    pool.close()
    pool.join()
    score_values = [x[1] for x in sorted_scores]
    curve = list(zip(bounds, score_values))
    return curve