from datetime import datetime
import random, string
import os
from . import feature_ranking
from . import lreb
from sklearn.model_selection import train_test_split
from sklearn.svm import SVC
import numpy
import time
import pandas as pd
from .helper_funcs import save_to_pickle, save_to_csv
def compute_curves(X, y,
iterations,
curve_steps,
validation_size,
clf_ranking,
clf_scoring,
n_features=None,
ranking_test_size = .2,
ranking_number_of_folds = 5,
return_ranking_coefs = False,
scoring_test_size = .2,
scoring_number_of_folds = 5,
score_function = feature_ranking.youden_index,
scoring_n_jobs = 1,
ranking_n_jobs = 1,
scoring_adaptive_features = False,
scoring_tolerance_steps = 10,
scoring_window_size = 10,
verbose = 0,
details_files = True,
details_files_parent_path = "./",
seed = None):
"""
    Splits the data into *working* and *validation* sets, then computes as many score curves as specified in `iterations` using
    only the *working* data. Each iteration has its own set of random splits for ranking and random splits for scoring. Returns
    a list with ranked features and scores for each iteration, along with indices for the samples in the *working* and *validation* sets.
    If `return_ranking_coefs` is `True`, it also returns the rankings (coefficients) for each feature, one list per iteration.
Parameters:
-----------
X:
2D array with rows observations and columns variables.
y:
1D array with integer labels (0 or 1) for each row of X.
    iterations:
        Number of repetitions of the inner RubricOE loop.
    curve_steps:
        Approximate maximum number of points in each score curve; used to determine the step size between consecutive feature counts.
    validation_size:
        Proportion of observations that will be held out during the entire procedure for validation.
    clf_ranking:
        Estimator that will be used for ranking features in the inner loop. An example is lreb.LinRidgeRegSVD().
        Generally, any estimator with a `.fit(data, labels)` method and a `coef_` attribute will work.
    clf_scoring:
        Estimator that will be used for the training and testing computations in the inner loop on subsets of features.
        An example is sklearn's SVC(). Generally, any estimator with `.fit(train_data, labels)` and `.predict(test_data)`
        methods will work.
    n_features:
        If specified, upper bound on the number of top-ranked features used to generate the score curves. Note: all features
        are still used for feature ranking; only the score curves are affected.
ranking_test_size:
Proportion of observations in working data that will be randomly discarded in each fold when training the ranking procedure.
ranking_number_of_folds:
Number of repetitions of training the ranking procedure.
    return_ranking_coefs:
        Flag specifying whether the actual ranking coefficients of the features should also be returned.
scoring_test_size:
Proportion of observations in working data that will be used in each fold for testing performance of `clf_scoring`.
    scoring_number_of_folds:
        Number of repetitions of training and testing the scoring procedure.
    score_function:
        Function that will be used to generate the values in the score curve. By default it uses the Youden index; more generally,
        any function of the form `score(y_true, y_pred)` will work.
    scoring_n_jobs:
        Number of jobs that will be spawned to compute curve points in parallel.
    ranking_n_jobs:
        Number of jobs that will be spawned for the ranking computations in parallel.
    verbose:
        Verbosity level. 0 disables all messages. Greater than 0 prints progress messages.
    details_files:
        Whether to write detailed intermediate results (indices, rankings, curves) to disk.
    details_files_parent_path:
        Directory under which a uniquely named folder with the detail files will be created.
    seed:
        Seed for the random number generator controlling the working/validation split and the per-iteration splits.
        If `None`, results are not reproducible.
Returns:
--------
ranked_features_list, curve_list, idx_working, idx_validation
A tuple with, in order:
* a list with lists of ranked feature indices from higher to lower importance (one per iteration),
* a list with lists of scores obtained by the classifier when increasing the number of features selected (one per iteration),
* an array with the indices of the observations corresponding to the working set
* an array with the indices of the observations corresponding to the validation set
If the flag `return_ranking_coefs` is set to `True`, the return tuple is
ranked_features_list, ranking_coefs, curve_list, idx_working, idx_validation
With, in order:
* a list with lists of ranked feature indices from higher to lower importance (one per iteration),
* a list with lists of rankings of the features from higher to lower importance (one per iteration),
* a list with lists of scores obtained by the classifier when increasing the number of features selected (one per iteration),
* an array with the indices of the observations corresponding to the working set
* an array with the indices of the observations corresponding to the validation set
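    Example:
    --------
    A minimal, illustrative call (assumes `X` is a 2D numpy array and `y` a matching binary label array;
    `lreb` and `SVC` are imported as in this module, and the parameter values below are placeholders):
        ranked_features, curves, idx_working, idx_validation = compute_curves(
            X, y,
            iterations=10,
            curve_steps=50,
            validation_size=0.2,
            clf_ranking=lreb.LinRidgeRegSVD(),
            clf_scoring=SVC(kernel="linear"),
            seed=42)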
"""
    # Unique identifier for this run (timestamp plus a random suffix), used to name the details directory.
    unique_id = "RUBRICOE_" + datetime.now().isoformat().replace("-","_").replace(":","_").replace(".","_").replace("T","_") + \
''.join(random.SystemRandom().choice(string.ascii_uppercase + string.digits) for _ in range(8))
if details_files:
details_files_path = os.path.join(details_files_parent_path, unique_id)
os.mkdir(details_files_path)
idx_working, idx_validation = train_test_split(range(X.shape[0]),test_size=validation_size,stratify=y,random_state=seed)
if details_files:
save_to_csv(idx_working, details_files_path, "indices_working.csv")
save_to_csv(idx_validation, details_files_path, "indices_validation.csv")
X_working = X[idx_working]
y_working = y[idx_working]
if return_ranking_coefs:
ranking_coefs_list = []
ranked_features_list = []
curve_list = []
if seed is None:
random_generator = numpy.random.RandomState()
else:
random_generator = numpy.random.RandomState(seed)
    # Pre-draw one scoring seed and one ranking seed per iteration so that all splits are reproducible given `seed`.
    iteration_scoring_seeds = random_generator.randint(2**32, size=iterations)
    iteration_ranking_seeds = random_generator.randint(2**32, size=iterations)
for i in range(iterations):
if details_files:
details_files_iteration_path = os.path.join(details_files_path,"ITERATION{}".format(i))
os.mkdir(details_files_iteration_path)
start_time = time.time()
if verbose>0:
print("Iteration {} of {}".format(i+1,iterations))
        if n_features is None:
            n_features = X.shape[1]
        # Choose the smallest step size that keeps the score curve at no more than `curve_steps` points.
        step_size = 1
        while (n_features // step_size) > curve_steps:
            step_size += 1
ranked_features = feature_ranking.rank_features(clf_ranking,
X_working,
y_working,
test_size=ranking_test_size,
number_of_folds=ranking_number_of_folds,
verbose=verbose,
return_ranking_coefs=return_ranking_coefs,
n_jobs = ranking_n_jobs,
details_files=details_files,
details_files_path=details_files_iteration_path,
                                                        seed=iteration_ranking_seeds[i])
if return_ranking_coefs:
ranked_features, ranking_coefs = ranked_features
if isinstance(clf_ranking,lreb.LinRidgeRegSVD): # WORKAROUND FOR CURRENT IMPLEMENTATION OF LREB
if verbose>0:
print("Applying workaround for Linear Regression with Error Bars.")
if return_ranking_coefs:
temp = [(x,y) for (x,y) in zip(ranked_features,ranking_coefs) if x<X_working.shape[1]]
ranked_features, ranking_coefs = tuple(zip(*temp))
else:
ranked_features = [x for x in ranked_features if x<X_working.shape[1]]
ranked_features_list.append(ranked_features)
if return_ranking_coefs:
ranking_coefs_list.append(ranking_coefs)
if verbose>0:
print(f"Feature ranking completed. {time.time()-start_time} seconds.")
start_time = time.time()
curve = feature_ranking.score_curve(clf_scoring,
X_working[:,ranked_features[:n_features]],
y_working,
test_size=scoring_test_size,
number_of_folds=scoring_number_of_folds,
verbose=verbose,
n_jobs=scoring_n_jobs,
step_size=step_size,
n_features=n_features,
score_function=score_function,
adaptive_features=scoring_adaptive_features,
tolerance_steps = scoring_tolerance_steps,
window_size = scoring_window_size,
details_files=details_files,
details_files_path=details_files_iteration_path,
                                            seed=iteration_scoring_seeds[i])
if verbose>0:
print(f"Curve scoring completed. {time.time()-start_time} seconds.")
if details_files:
save_to_csv(ranked_features,details_files_iteration_path,"ranked_features.csv")
save_to_csv(curve,details_files_iteration_path,"curve.csv")
curve_list.append(curve)
if return_ranking_coefs:
return ranked_features_list, ranking_coefs_list, curve_list, idx_working, idx_validation
return ranked_features_list, curve_list, idx_working, idx_validation
def compute_feature_counts(ranked_features_list, curve_list, step_size):
"""
    Computes, for each feature, the proportion of iterations in which it was selected as a top feature according to the corresponding score curve.
    Expects `ranked_features_list` and `curve_list` as returned by `compute_curves`, together with the `step_size` used to generate those curves.
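    Example (illustrative; assumes `ranked_features` and `curves` come from `compute_curves` and the step size is a placeholder
    matching the one used to build those curves):
        feature_counts = compute_feature_counts(ranked_features, curves, step_size=5)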
"""
if len(ranked_features_list)!=len(curve_list):
        raise ValueError("The numbers of iterations in the rankings and curves do not match.")
scale = len(ranked_features_list)
top_features_list = []
for i,c in enumerate(curve_list):
threshold = (numpy.argmax([x[1] for x in c])+1)*step_size
top_features_list.append(ranked_features_list[i][:threshold+1])
counts = numpy.zeros(len(ranked_features_list[0]))
for y in range(len(counts)):
for l in top_features_list:
if y in l:
counts[y] += 1
feature_counts = counts/scale
return feature_counts
def compute_top_features(feature_counts, threshold=1.):
"""
    Selects the top features according to `threshold`. A `threshold` value of 1 means a feature must have been selected as
    a top feature in all iterations. Expects `feature_counts` to be the output of `compute_feature_counts`.
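    Example (illustrative; assumes `feature_counts` comes from `compute_feature_counts`; the threshold value is a placeholder):
        top_features = compute_top_features(feature_counts, threshold=0.8)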
"""
return numpy.where(feature_counts>=threshold)[0]
def plot_scores(eval_scores, extra_string=""):
"""
    Basic function to plot the score distribution of a repeated experiment, with the mean and 95% confidence interval bounds.
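    Example (illustrative; assumes `eval_scores` is a 1D array or list of scores):
        plot_scores(eval_scores, extra_string="validation scores")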
"""
import matplotlib.pyplot as plt
import seaborn as sns
plt.figure(figsize=(5,5),dpi=100)
cil,cir = numpy.percentile(eval_scores,[2.5,97.5])
sns.histplot(eval_scores, label="Density Estimate",kde=True)
    xc = 0  # reference line at a score of zero
    xg = numpy.mean(eval_scores)  # mean score across repetitions
plt.suptitle("Score frequency plot\n{}".format(extra_string))
plt.axvline(x=xc, label='Score = {}'.format(xc),linestyle="dashed", c="red")
plt.axvline(x=cil, label='CI Left = {:0.3f}'.format(cil),linestyle="dotted", c="blue")
plt.axvline(x=cir, label='CI Right = {:0.3f}'.format(cir),linestyle="dotted", c="blue")
plt.axvline(x=xg, label='Mean = {:0.3f}'.format(xg),linestyle="dashed", c="green")
plt.xlabel("Score")
plt.legend()
plt.show()
plt.close()
def compute(df,
iterations,
curve_steps,
validation_size,
n_features,
ranking_test_size,
scoring_test_size,
ranking_number_of_folds,
scoring_number_of_folds,
C,
scoring_n_jobs,
threshold,
output_filename,
details=True,
details_files_parent_path = "./",
verbose=0):
"""
    Function to run the full RubricOE analysis.
Parameters:
-----------
    df: pandas.DataFrame
        Input data matrix as a dataframe, including a column 'phenotype' containing the binary labels.
iterations: int
Number of iterations of main RubricOE loop
    curve_steps: int
        Score curve resolution (approximate maximum number of points in each score curve).
validation_size: float
Proportion of data to use for validation
    n_features: int or "all"
        Number of features to use for the score curve. Set to "all" to use all features.
ranking_test_size: float
Proportion of non-validation data to use per split on ranking step.
scoring_test_size: float
Proportion of non-validation data to use per split on scoring step.
    ranking_number_of_folds: int
        Number of splits used in the ranking step.
    scoring_number_of_folds: int
        Number of splits used in the scoring step.
C: float
Regularization coefficient in Ridge Regression with Error Bars
scoring_n_jobs: int
Number of parallel processes to run in scoring step
    threshold: float
        Proportion of iterations in which a SNP must be selected as a top feature for it to be reported as a top SNP.
output_filename: str,
Base name of output files.
details: Boolean, default=True,
Whether to output files with detailed intermediate results.
    details_files_parent_path: str, default="./"
        Parent path where detailed output files will be written when `details` is `True`.
verbose: int, default=0,
Set to 0 to omit progress notifications.
Returns:
--------
    A dataframe with the names of the input columns (features) selected as top features.
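    Example:
    --------
    A minimal, illustrative call (assumes `df` is a dataframe with a 'phenotype' column; all values below are placeholders):
        top_features_df = compute(df,
                                  iterations=10,
                                  curve_steps=50,
                                  validation_size=0.2,
                                  n_features="all",
                                  ranking_test_size=0.2,
                                  scoring_test_size=0.2,
                                  ranking_number_of_folds=5,
                                  scoring_number_of_folds=5,
                                  C=1.0,
                                  scoring_n_jobs=1,
                                  threshold=1.0,
                                  output_filename="top_features.csv")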
"""
    # Keep the feature column names so that feature indices can be mapped back to the correct columns
    # (the 'phenotype' column may appear anywhere in the dataframe).
    feature_columns = numpy.array([x for x in df.columns if x != "phenotype"])
    X = df[list(feature_columns)].values
    y = df["phenotype"].values
if n_features == "all":
n_features = X.shape[1]
else:
n_features = int(n_features)
    # Use the same step-size rule as compute_curves so the curves are interpreted consistently below.
    step_size = 1
    while (n_features // step_size) > curve_steps:
        step_size += 1
    ranked_features, curves, idx_working, idx_validation = compute_curves(X, y,
iterations = iterations,
curve_steps = curve_steps,
validation_size = validation_size,
n_features = n_features,
ranking_test_size = ranking_test_size,
scoring_test_size = scoring_test_size,
ranking_number_of_folds = ranking_number_of_folds,
scoring_number_of_folds = scoring_number_of_folds,
                                                                          clf_ranking = lreb.LinRidgeRegSVD(C = C),
clf_scoring = SVC(kernel = "linear"),
scoring_n_jobs = scoring_n_jobs,
verbose = verbose,
details_files=details,
details_files_parent_path = details_files_parent_path)
feature_counts = compute_feature_counts(ranked_features,curves,step_size)
top_features = compute_top_features(feature_counts, threshold)
if verbose>0:
print("Finished computing. Saving files...")
    if details:
        output_df = pd.DataFrame(feature_columns[top_features])
        output_df.to_csv(output_filename, header=False, index=False)
if verbose>0:
print("Finished.")
    return pd.DataFrame(feature_columns[top_features])