API Documentation
Data Ingestion
lightsaber.data_utils.pt_dataset.BaseDataset (Dataset)
__init__(self, tgt_file, feat_file, idx_col, tgt_col, feat_columns=None, time_order_col=None, category_map={}, transform=[transform_drop_cols, transform_fillna], filter=[identity_nd], device='cpu')
special
Base dataset class
Parameters:
Name | Type | Description | Default |
---|---|---|---|
tgt_file | | target file path | required |
feat_file | | feature file path | required |
idx_col | str or List[str] | index columns in the data; present in both the feature and target files | required |
tgt_col | str or List[str] | target column(s) present in the target file | required |
feat_columns | | feature columns to select from; either a single regex or a list of columns (a partial regex that matches the complete column name is accepted). Default: None -> all columns | None |
time_order_col | | column(s) that signify the time ordering for a single example. Default: None -> no time ordering | None |
category_map | | dictionary of column maps | {} |
transform | single callable or list/tuple of callables | how to transform data; if a list of callables [f, g] is provided, g(f(x)) is applied | [transform_drop_cols, transform_fillna] |
filter | single callable or list/tuple of callables | how to filter data; if a list of callables [f, g] is provided, g(f(x)) is applied | [identity_nd] |
device | str | valid pytorch device | 'cpu' |
Examples:
Example of feature columns (see the sketch below).
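A minimal sketch of passing feature columns to BaseDataset. The file paths and column names are hypothetical; `feat_columns` mixes an exact name with a partial pattern, as described in the table above.

```python
from lightsaber.data_utils import pt_dataset as ptd

# Hypothetical files and columns, for illustration only
dataset = ptd.BaseDataset(
    tgt_file="target.csv",
    feat_file="features.csv",
    idx_col="patient_id",
    tgt_col="label",
    feat_columns=["age", "lab_*"],   # exact column name plus a partial pattern
    time_order_col="visit_no",
)
print(len(dataset))   # number of examples read
```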
lightsaber.data_utils.pt_dataset.collate_fn(batch)
Provides mechanism to collate the batch
ref: https://github.com/dhpollack/programming_notebooks/blob/master/pytorch_attention_audio.py#L245
Puts data and lengths into a packed_padded_sequence, then returns the packed_padded_sequence and the labels.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
batch | List[Tuple] | [(*data, target)]; data: all the different data inputs from __getattr__, target: target y | required |
Returns:
Type | Description |
---|---|
Tuple | (dx, dy, lengths, idx) |
Source code in lightsaber/data_utils/pt_dataset.py
def collate_fn(batch):
    """Provides mechanism to collate the batch

    ref: https://github.com/dhpollack/programming_notebooks/blob/master/pytorch_attention_audio.py#L245

    Puts data and lengths into a packed_padded_sequence then returns
    the packed_padded_sequence and the labels.

    Parameters
    ----------
    batch : List[Tuples]
        [(*data, target)] data: all the different data input from `__getattr__` target: target y

    Returns
    -------
    Tuple
        (dx, dy, lengths, idx)
    """
    pad = C.PAD

    if len(batch) == 1:
        dx_t, dy_t, lengths, idx = batch[0]
        # sigs = sigs.t()
        dx_t.unsqueeze_(0)
        dy_t.unsqueeze_(0)
        lengths = [lengths]
        idx = np.atleast_2d(idx)
    else:
        dx_t, dy_t, lengths, idx = zip(*[(dx, dy, length, idx)
                                         for (dx, dy, length, idx) in sorted(batch, key=lambda x: x[2],
                                                                             reverse=True)])
        max_len, n_feats = dx_t[0].size()
        device = dx_t[0].device

        # pad each sequence to max_len along the time dimension
        dx_t = [T.cat((s, T.empty(max_len - s.size(0), n_feats, device=device).fill_(pad)), 0)
                if s.size(0) != max_len else s
                for s in dx_t]
        dx_t = T.stack(dx_t, 0).to(device)   # bs * max_seq_len * n_feat
        dy_t = T.stack(dy_t, 0).to(device)   # bs * n_out

        # Handling the other variables
        lengths = list(lengths)
        idx = np.vstack(idx)                 # bs * 1
    return dx_t, dy_t, lengths, idx
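A hedged sketch of plugging collate_fn into a standard PyTorch DataLoader. The file paths and column names are hypothetical.

```python
from torch.utils.data import DataLoader
from lightsaber.data_utils import pt_dataset as ptd

# Hypothetical files and columns, for illustration only
dataset = ptd.BaseDataset("target.csv", "features.csv",
                          idx_col="patient_id", tgt_col="label")
loader = DataLoader(dataset, batch_size=32, collate_fn=ptd.collate_fn)

for dx, dy, lengths, idx in loader:
    # dx: bs * max_seq_len * n_feat, dy: bs * n_out
    pass
```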
lightsaber.data_utils.sk_dataloader.SKDataLoader
Custom data loaders for scikit-learn
Source code in lightsaber/data_utils/sk_dataloader.py
class SKDataLoader(object):
    """Custom data loaders for scikit-learn"""

    def __init__(self,
                 tgt_file, feat_file, idx_col, tgt_col,
                 feat_columns=None,
                 time_order_col=None,
                 category_map=C.DEFAULT_MAP,
                 filter=DEFAULT_FILTER,
                 fill_value=0.,
                 flatten=C.DEFAULT_FLATTEN,
                 cols_to_drop=C.DEFAULT_DROP_COLS,
                 ):
        """
        Parameters
        ----------
        tgt_file:
            target file path
        feat_file:
            feature file path
        idx_col:
            columns to specify the unique examples from the feature and target set
        tgt_col:
            columns to specify the target column from the target set.
        feat_columns:
            feature columns to select from. either list of columns (partial columns using `*` allowed) or a single regex
            Default: `None` -> implies all columns
        time_order_col:
            column(s) that signify the time ordering for a single example.
            Default: `None` -> implies no columns
        category_map:
            dictionary of column maps
        filter: single callable or list/tuple of callables
            how to filter data. if list of callables provided e.g. `[f, g]`, `g(f(x))` used
            Default: no operation
        fill_value:
            pandas compatible function or value to fill missing data
        flatten:
            functions to aggregate and flatten temporal data
        cols_to_drop:
            list of columns to drop
        """
        self._tgt_file = tgt_file
        self._feat_file = feat_file
        self._idx_col = idx_col
        self._tgt_col = tgt_col
        self._feat_columns = feat_columns
        self._time_order_col = time_order_col
        self._category_map = category_map

        # Enforcing a flatten function to make sure sklearn modules get
        # flattened data
        _filter_flatten_filled_drop_cols = filter_flatten_filled_drop_cols(cols_to_drop=cols_to_drop,
                                                                           aggfunc=flatten,
                                                                           fill_value=fill_value)
        self._filter = []
        if filter is not None:
            if isinstance(filter, (list, tuple)):
                self._filter += filter
            else:
                self._filter.append(filter)
        self._filter.append(_filter_flatten_filled_drop_cols)

        # Reading data
        self.read_data()
        return

    def read_data(self):
        device = DEFAULT_DEVICE
        transform = DEFAULT_TRANSFORM

        self._dataset = ptd.BaseDataset(self._tgt_file,
                                        self._feat_file,
                                        self._idx_col,
                                        self._tgt_col,
                                        feat_columns=self._feat_columns,
                                        time_order_col=self._time_order_col,
                                        category_map=self._category_map,
                                        filter=self._filter,
                                        transform=transform,
                                        device=device
                                        )
        return

    @property
    def shape(self):
        return self._dataset.shape

    @property
    def sample_idx(self):
        return self._dataset.sample_idx

    def __len__(self):
        return len(self._dataset)

    def get_data(self):
        X = self._dataset.data
        y = self._dataset.target
        return X, y

    def get_patient(self, patient_id):
        p_idx = self._dataset.sample_idx.index.get_loc(patient_id)
        full_X, full_y = self.get_data()
        p_X = full_X.iloc[[p_idx]]
        if full_y is not None:
            p_y = full_y.iloc[[p_idx]]
        else:
            p_y = None
        return p_X, p_y
__init__(self, tgt_file, feat_file, idx_col, tgt_col, feat_columns=None, time_order_col=None, category_map={}, filter=None, fill_value=0.0, flatten='sum', cols_to_drop=['INDEX_CLAIM_THRU_DT', 'INDEX_CLAIM_ORDER', 'CLAIM_NO', 'CLM_ADMSN_DT', 'CLM_THRU_DT', 'CLAIM_ORDER'])
special
Parameters:
Name | Type | Description | Default |
---|---|---|---|
tgt_file | | target file path | required |
feat_file | | feature file path | required |
idx_col | | columns to specify the unique examples from the feature and target set | required |
tgt_col | | columns to specify the target column from the target set | required |
feat_columns | | feature columns to select from; either a list of columns (partial columns using `*` allowed) or a single regex. Default: None -> all columns | None |
time_order_col | | column(s) that signify the time ordering for a single example. Default: None -> no columns | None |
category_map | | dictionary of column maps | {} |
filter | single callable or list/tuple of callables | how to filter data; if a list of callables [f, g] is provided, g(f(x)) is applied. Default: no operation | None |
fill_value | | pandas compatible function or value to fill missing data | 0.0 |
flatten | | functions to aggregate and flatten temporal data | 'sum' |
cols_to_drop | | list of columns to drop | ['INDEX_CLAIM_THRU_DT', 'INDEX_CLAIM_ORDER', 'CLAIM_NO', 'CLM_ADMSN_DT', 'CLM_THRU_DT', 'CLAIM_ORDER'] |
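A minimal usage sketch, assuming hypothetical CSV paths and column names. `get_data` returns the flattened feature matrix and targets, ready for a scikit-learn estimator; `get_patient` slices out a single example.

```python
from sklearn.linear_model import LogisticRegression
from lightsaber.data_utils.sk_dataloader import SKDataLoader

# File paths and column names are hypothetical
train_dataloader = SKDataLoader(tgt_file="train_target.csv",
                                feat_file="train_features.csv",
                                idx_col="patient_id",
                                tgt_col="label")

X_train, y_train = train_dataloader.get_data()   # flattened features and targets
clf = LogisticRegression().fit(X_train, y_train)

p_X, p_y = train_dataloader.get_patient("some_patient_id")   # single-patient slice
```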
Filters and Transforms
lightsaber.data_utils.pt_dataset.identity_2d(x, y)
Identity function for 2 variables
Parameters:
Name | Type | Description | Default |
---|---|---|---|
x |
first param |
required | |
y |
second param |
required |
Returns:
Type | Description |
---|---|
object | first param (x) |
object | second param (y) |
Source code in lightsaber/data_utils/pt_dataset.py
def identity_2d(x, y):
    """
    Identity function for 2 variables

    Parameters
    ----------
    x :
        first param
    y :
        second param

    Returns
    -------
    x : object
        first param
    y : object
        second param
    """
    return x, y
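The `transform` and `filter` arguments documented above follow a left-to-right composition convention: a list `[f, g]` is applied as `g(f(x))`. The snippet below is a standalone illustration of that convention only, not the library's internal implementation.

```python
# Standalone illustration of the [f, g] -> g(f(x)) convention
def compose(callables):
    def _composed(x):
        for fn in callables:
            x = fn(x)   # each callable sees the output of the previous one
        return x
    return _composed

double = lambda x: 2 * x
add_one = lambda x: x + 1
assert compose([double, add_one])(3) == 7   # add_one(double(3))
```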
Model Training
lightsaber.trainers.pt_trainer.PyModel (LightningModule)
PyModel
Source code in lightsaber/trainers/pt_trainer.py
class PyModel(pl.LightningModule):
    """PyModel"""
    def __init__(self,
                 hparams: Namespace,
                 model: nn.Module,
                 train_dataset: Optional[Dataset] = None,
                 val_dataset: Optional[Dataset] = None,
                 cal_dataset: Optional[Dataset] = None,
                 test_dataset: Optional[Dataset] = None,
                 collate_fn: Optional[Callable] = None,
                 optimizer: Optional[Optimizer] = None,
                 loss_func: Optional[Callable] = None,
                 out_transform: Optional[Callable] = None,
                 num_workers: Optional[int] = 0,
                 debug: Optional[bool] = False,
                 **kwargs):
        """
        Parameters
        ----------
        hparams: Namespace
            hyper-parameters for base model
        model:
            base pytorch model defining the model logic. model forward should output logits for classification and accept
            a single positional tensor (`x`) for input data and keyword tensors (at least `lengths`).
            Optionally can provide a `hidden` keyword argument for sequential models to ingest past hidden state.
        train_dataset: torch.utils.data.Dataset, optional
            training dataset
        val_dataset: torch.utils.data.Dataset, optional
            validation dataset
        cal_dataset: torch.utils.data.Dataset, optional
            calibration dataset - if provided, post-hoc calibration is performed
        test_dataset: torch.utils.data.Dataset, optional
            test dataset - if provided, training also reports test performance
        collate_fn:
            collate function to handle unequal sample sizes in a batch
        optimizer: torch.optim.Optimizer, optional
            pytorch optimizer. If not provided, Adam is used with standard parameters
        loss_func: callable
            if provided, used to compute the loss. Default: cross entropy loss
        out_transform: callable
            if provided, converts logits to the expected format. Default: softmax
        num_workers: int, Default: 0
            if provided, sets the number of workers used by the DataLoaders
        kwargs: dict, optional
            other parameters accepted by pl.LightningModule
        """
        super(PyModel, self).__init__()
        # self.bk_hparams = hparams
        self.model = model
        self._debug = debug

        self.train_dataset = train_dataset
        self.val_dataset = val_dataset
        self.cal_dataset = cal_dataset
        self.test_dataset = test_dataset

        self.num_workers = num_workers
        self.collate_fn = collate_fn

        self._optimizer = optimizer
        self._scheduler = kwargs.get('scheduler', None)
        self._kwargs = kwargs

        # save hyper-parameters
        self.save_hyperparameters(hparams)

        # -------------------------------------------
        # TODO: Move to classifier
        if loss_func is None:
            self.loss_func = nn.CrossEntropyLoss()
        else:
            self.loss_func = loss_func

        if out_transform is None:
            self.out_transform = nn.Softmax(dim=1)
        else:
            self.out_transform = out_transform

        self.temperature = nn.Parameter(T.ones(1) * 1.)
        # -------------------------------------------
        return

    def configure_optimizers(self):
        # REQUIRED
        # can return multiple optimizers and learning_rate schedulers
        if self._optimizer is None:
            optimizer = T.optim.Adam(self.model.parameters(),
                                     lr=self.hparams.lr,
                                     weight_decay=1e-5  # standard value
                                     )
        else:
            optimizer = self._optimizer

        if self._scheduler is None:
            return optimizer
        else:
            print("Here")
            return [optimizer], [self._scheduler]

    def on_load_checkpoint(self, checkpoint):
        # give sub model a chance to mess with the checkpoint
        if hasattr(self.model, 'on_load_checkpoint'):
            self.model.on_load_checkpoint(checkpoint)
        return

    # --------------------------------------------------------------
    # Lightsaber::
    # providing extra capabilities to model and compatibility with lightning
    # -------------------------------------------------------------
    def forward(self, *args, **kwargs):
        return self.model.forward(*args, **kwargs)

    def apply_regularization(self):
        """
        Applies regularizations on the model parameters
        """
        loss = 0.0
        if hasattr(self.hparams, 'l1_reg') and self.hparams.l1_reg > 0:
            loss += l1_regularization(self.parameters(), self.hparams.l1_reg)
        if hasattr(self.hparams, 'l2_reg') and self.hparams.l2_reg > 0:
            loss += l2_regularization(self.parameters(), self.hparams.l2_reg)
        return loss

    def freeze_except_last_layer(self):
        n_layers = sum([1 for _ in self.model.parameters()])
        freeze_layers = n_layers - 2

        i = 0
        freeze_number = 0
        free_number = 0
        for param in self.model.parameters():
            if i <= freeze_layers - 1:
                print('freezing %d-th layer' % i)
                param.requires_grad = False
                freeze_number += param.nelement()
            else:
                free_number += param.nelement()
            i += 1
        print('Total frozen parameters', freeze_number)
        print('Total free parameters', free_number)
        return

    def clone(self):
        return copy.copy(self)

    # --------------------------------------------------------------
    # Lightning:: step logic for train, test, validation
    # -------------------------------------------------------------
    def _common_step(self, batch, batch_idx):
        """Common step that is run over a batch of data.

        Currently supports two types of data
        1. batch containing only X, y, corresponding lengths, and idx
        2. batch containing an extra dimension. Currently assuming it is the summary data
        """
        # REQUIRED
        if len(batch) == 4:
            x, y, lengths, idx = batch
            y_out, _ = self.forward(x, lengths=lengths)
        elif len(batch) == 5:
            x, summary, y, lengths, idx = batch
            y_out, _ = self.forward(x, lengths=lengths, summary=summary)
        y_pred = self.out_transform(y_out)
        return (y_pred, y_out, y, x)

    def _shared_eval_step(self, y_pred, y_out, y, x, is_training=False):
        # Supporting loss functions that take in X as well
        score = self._calculate_score(y_pred, y)
        n_examples = y.shape[0]

        is_x_included = False
        for param in signature(self.loss_func).parameters:
            if param == 'X':
                is_x_included = True

        if is_x_included:
            loss = self.loss_func(y_out, y, X=x)
        else:
            loss = self.loss_func(y_out, y)

        if is_training:
            loss += (self.apply_regularization() / n_examples)

        # General way of classification
        return loss, n_examples, score

    # TODO: move this to classification
    def _process_common_output(self, y_pred):
        _, y_hat = T.max(y_pred.data, 1)
        return y_hat

    # TODO: make this an abstractmethod. currently done for classification
    def _calculate_score(self, y_pred, y):
        y_hat = self._process_common_output(y_pred)
        score = accuracy(y_hat, y)
        return score

    def training_step(self, batch, batch_idx):
        y_pred, y_out, y, x = self._common_step(batch, batch_idx)
        loss, n_examples, score = self._shared_eval_step(y_pred, y_out, y, x, is_training=True)

        # Making it independent of loggers used
        metrics = {"loss": loss, "train_score": score}
        self.log_dict(metrics, on_step=self._debug, on_epoch=True, prog_bar=True, logger=True)
        if self._debug:
            self.log("train_n_examples", n_examples, on_step=True, on_epoch=True)
        # tensorboard_log = {'batch_train_loss': loss, 'batch_train_score': train_score}
        return metrics  # , train_n_correct=n_correct, train_n_examples=n_examples, log=tensorboard_log)

    def validation_step(self, batch, batch_idx):
        y_pred, y_out, y, x = self._common_step(batch, batch_idx)
        loss, n_examples, score = self._shared_eval_step(y_pred, y_out, y, x)

        # Making it independent of loggers used
        metrics = {"val_loss": loss, "val_score": score}
        self.log_dict(metrics, on_step=False, on_epoch=True, prog_bar=True, logger=True)
        if self._debug:
            self.log("val_n_examples", n_examples, on_step=True, on_epoch=True)
        # tensorboard_log = {'batch_val_loss': loss, 'batch_val_score': val_score}
        return metrics  # , val_n_correct=n_correct, val_n_examples=n_examples, log=tensorboard_log)

    def test_step(self, batch, batch_idx):
        y_pred, y_out, y, x = self._common_step(batch, batch_idx)
        loss, n_examples, score = self._shared_eval_step(y_pred, y_out, y, x)

        # Making it independent of loggers used
        metrics = {"test_loss": loss, "test_score": score}
        self.log_dict(metrics, on_step=False, on_epoch=True, prog_bar=True, logger=True)
        # tensorboard_log = {'batch_test_loss': loss, 'batch_test_score': test_score}

        # For test returning both outputs and y
        # y_pred = self._process_common_output(y_hat)
        # metrics.update(dict(y_pred=y_pred, y_hat=y, y=y))
        return metrics  # , test_n_correct=n_correct, test_n_examples=n_examples, log=tensorboard_log)

    def predict_step(self, batch, batch_idx):
        y_pred, y_out, y, x = self._common_step(batch, batch_idx)
        y_hat = self._process_common_output(y_pred)
        payload = {'y_hat': y_hat, 'y_pred': y_pred, 'y': y}
        return payload

    def _on_predict_epoch_end(self, results):
        # TODO: this should be working directly as a model hook
        # Not working
        def _process_single_dataloader(res_dataloader):
            y_hat = T.cat([r['y_hat'] for r in res_dataloader])
            y_pred = T.cat([r['y_pred'] for r in res_dataloader])
            y = T.cat([r['y'] for r in res_dataloader])
            return dict(y_hat=y_hat, y_pred=y_pred, y=y)

        # making the code adaptive for multiple dataloaders
        log.debug(f"Number of predict dataloader: {len(self.trainer.predict_dataloaders)}")
        if len(self.trainer.predict_dataloaders) == 1:
            payload = _process_single_dataloader(results)
        else:
            payload = [_process_single_dataloader(res_dataloader)
                       for res_dataloader in results]
        return payload

    # def validation_end(self, outputs):
    #     # OPTIONAL
    #     try:
    #         avg_val_loss = T.stack([x['batch_val_loss'] for x in outputs]).mean()
    #     except Exception:
    #         avg_val_loss = T.FloatTensor([0.])
    #
    #     try:
    #         val_score = (np.stack([x['val_n_correct'] for x in outputs]).sum()
    #                      / np.stack([x['val_n_examples'] for x in outputs]).sum())
    #     except Exception:
    #         val_score = T.FloatTensor([0.])
    #
    #     tensorboard_log = {'val_loss': avg_val_loss, 'val_score': val_score}
    #     return dict(val_loss=avg_val_loss, val_score=val_score, log=tensorboard_log)

    # --------------------------------------------------------------
    # Classifier specific section:: calibration
    # -------------------------------------------------------------
    def temperature_scale(self, logits):
        """
        Perform temperature scaling on logits
        """
        # Expand temperature to match the size of logits
        temperature = self.temperature.unsqueeze(1).expand(logits.size(0), logits.size(1))
        return logits / temperature

    def set_temperature(self, cal_loader):
        """
        Tune the temperature of the model (using the validation set).
        We're going to set it to optimize NLL.

        valid_loader (DataLoader): validation set loader
        """
        _orig_device = self.device
        try:
            if self.trainer.on_gpu:
                self.to(self.trainer.root_gpu)
        except Exception:
            pass
        # self.cuda()
        self.temperature.data = T.ones(1, device=self.temperature.device) * 1.5
        # nll_criterion = nn.CrossEntropyLoss()
        nll_criterion = self.loss_func
        ece_criterion = _ECELoss()
        n_batches = len(cal_loader)

        # First: collect all the logits and labels for the validation set
        logits_list = []
        labels_list = []
        with T.no_grad():
            # making it compatible with non trainer run
            try:
                if self.trainer.on_gpu:
                    nll_criterion = self.trainer.transfer_to_gpu(nll_criterion, self.trainer.root_gpu)
                    ece_criterion = self.trainer.transfer_to_gpu(ece_criterion, self.trainer.root_gpu)
            except Exception:
                pass

            for (bIdx, batch) in tqdm.tqdm(enumerate(cal_loader), total=n_batches):
                if bIdx == n_batches:
                    break
                # making it compatible with non trainer run
                try:
                    if self.trainer.on_gpu:
                        batch = self.trainer.transfer_batch_to_gpu(batch, self.trainer.root_gpu)
                except Exception:
                    pass

                # for input, label in cal_loader:
                if len(batch) == 4:
                    x, y, lengths, idx = batch
                    logits, _ = self.forward(x, lengths)
                elif len(batch) == 5:
                    x, summary, y, lengths, idx = batch
                    logits, _ = self.forward(x, lengths, summary)

                logits_list.append(logits)
                labels_list.append(y)
            logits = T.cat(logits_list)
            labels = T.cat(labels_list)

        # Calculate NLL and ECE before temperature scaling
        before_temperature_nll = nll_criterion(logits, labels).item()
        before_temperature_ece = ece_criterion(logits, labels).item()
        print('Before temperature - NLL: %.3f, ECE: %.3f' % (before_temperature_nll, before_temperature_ece))

        # Next: optimize the temperature w.r.t. NLL
        optimizer = optim.LBFGS([self.temperature], lr=0.01, max_iter=50)

        def eval():
            loss = nll_criterion(self.temperature_scale(logits), labels)
            loss.backward()
            return loss
        optimizer.step(eval)

        # Calculate NLL and ECE after temperature scaling
        after_temperature_nll = nll_criterion(self.temperature_scale(logits), labels).item()
        after_temperature_ece = ece_criterion(self.temperature_scale(logits), labels).item()
        print('Optimal temperature: %.3f' % self.temperature.item())
        print('After temperature - NLL: %.3f, ECE: %.3f' % (after_temperature_nll, after_temperature_ece))

        self.to(_orig_device)
        return self

    # --------------------------------------------------------------
    # Scikit-learn compatibility section
    # -------------------------------------------------------------
    def get_params(self):
        """Return a dictionary of param_name: param_value
        """
        _params = vars(self.hparams)
        return _params

    # TODO: Move to classifier
    def predict_proba(self, *args, **kwargs):
        logit, _ = self.forward(*args, **kwargs)
        pred = self.out_transform(self.temperature_scale(logit))
        return pred

    # TODO: Move to classifier
    def predict(self, *args, **kwargs):
        proba = self.predict_proba(*args, **kwargs)
        pred = T.argmax(proba, dim=-1)
        return pred

    # DPM360:: connector
    # Given the patient id, find the array index of the patient
    def predict_patient(self, patient_id, test_dataset):
        p_x, _, p_lengths, _ = test_dataset.get_patient(patient_id)
        proba = self.predict_proba(p_x, lengths=p_lengths)
        return proba

    # --------------------------------------------------------------
    # Dataset handling section
    # TODO: move to dataset class
    # -------------------------------------------------------------
    def _pin_memory(self):
        pin_memory = False
        try:
            if self.trainer.on_gpu:
                pin_memory = True
        except AttributeError:
            pass
        return pin_memory

    def train_dataloader(self):
        warnings.warn(f'{C._deprecation_warn_msg}. Pass dataloader directly', DeprecationWarning, stacklevel=2)
        sampler = self._kwargs.get('train_sampler', None)
        shuffle = True if sampler is None else False
        pin_memory = self._pin_memory()

        # REQUIRED
        dataloader = DataLoader(self.train_dataset,
                                collate_fn=self.collate_fn,
                                shuffle=shuffle,
                                batch_size=self.hparams.batch_size,
                                sampler=sampler,
                                pin_memory=pin_memory,
                                num_workers=self.num_workers
                                )
        return dataloader

    def val_dataloader(self):
        warnings.warn(f'{C._deprecation_warn_msg}. Pass dataloader directly', DeprecationWarning, stacklevel=2)
        if self.val_dataset is None:
            dataset = ptd.EmptyDataset()
            dataloader = DataLoader(dataset)
        else:
            dataset = self.val_dataset
            pin_memory = self._pin_memory()
            dataloader = DataLoader(self.val_dataset,
                                    collate_fn=self.collate_fn,
                                    pin_memory=pin_memory,
                                    batch_size=self.hparams.batch_size,
                                    num_workers=self.num_workers
                                    )
        return dataloader

    def test_dataloader(self):
        warnings.warn(f'{C._deprecation_warn_msg}. Pass dataloader directly', DeprecationWarning, stacklevel=2)
        if self.test_dataset is None:
            dataset = ptd.EmptyDataset()
            dataloader = DataLoader(dataset)
        else:
            dataset = self.test_dataset
            pin_memory = self._pin_memory()
            dataloader = DataLoader(self.test_dataset,
                                    collate_fn=self.collate_fn,
                                    pin_memory=pin_memory,
                                    batch_size=self.hparams.batch_size,
                                    num_workers=self.num_workers)
        return dataloader

    def cal_dataloader(self):
        warnings.warn(f'{C._deprecation_warn_msg}. Pass dataloader directly', DeprecationWarning, stacklevel=2)
        if self.cal_dataset is None:
            dataset = ptd.EmptyDataset()
            dataloader = DataLoader(dataset)
        else:
            dataset = self.cal_dataset
            pin_memory = self._pin_memory()
            dataloader = DataLoader(self.cal_dataset, collate_fn=self.collate_fn, pin_memory=pin_memory,
                                    batch_size=self.hparams.batch_size, num_workers=self.num_workers)
        return dataloader
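A minimal sketch of wrapping a base network in PyModel. `SimpleRNNClassifier`, its sizes, and the hyper-parameter values are hypothetical; the only requirements taken from the contract above are that the base model's forward returns (logits, hidden) and accepts `x` plus a `lengths` keyword.

```python
from argparse import Namespace
import torch.nn as nn
from lightsaber.trainers.pt_trainer import PyModel

class SimpleRNNClassifier(nn.Module):
    """Hypothetical base model following the PyModel contract."""
    def __init__(self, n_feat, hidden_dim, n_classes):
        super().__init__()
        self.rnn = nn.GRU(n_feat, hidden_dim, batch_first=True)
        self.out = nn.Linear(hidden_dim, n_classes)

    def forward(self, x, lengths=None, hidden=None):
        output, hidden = self.rnn(x, hidden)
        logits = self.out(output[:, -1, :])   # logits from the last time step
        return logits, hidden

hparams = Namespace(lr=1e-3, batch_size=32)   # illustrative hyper-parameters
wrapped_model = PyModel(hparams, SimpleRNNClassifier(n_feat=10, hidden_dim=16, n_classes=2))
```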
__init__(self, hparams, model, train_dataset=None, val_dataset=None, cal_dataset=None, test_dataset=None, collate_fn=None, optimizer=None, loss_func=None, out_transform=None, num_workers=0, debug=False, **kwargs)
special
Parameters:
Name | Type | Description | Default |
---|---|---|---|
hparams | Namespace | hyper-parameters for base model | required |
model | Module | base pytorch model defining the model logic. model forward should output logits for classification and accept a single positional tensor (x) for input data and at least a lengths keyword tensor; optionally a hidden keyword argument for sequential models to ingest past hidden state | required |
train_dataset | Optional[torch.utils.data.dataset.Dataset] | training dataset | None |
val_dataset | Optional[torch.utils.data.dataset.Dataset] | validation dataset | None |
cal_dataset | Optional[torch.utils.data.dataset.Dataset] | calibration dataset - if provided, post-hoc calibration is performed | None |
test_dataset | Optional[torch.utils.data.dataset.Dataset] | test dataset - if provided, training also reports test performance | None |
collate_fn | Optional[Callable] | collate function to handle unequal sample sizes in a batch | None |
optimizer | Optional[torch.optim.optimizer.Optimizer] | pytorch optimizer. If not provided, Adam is used with standard parameters | None |
loss_func | Optional[Callable] | if provided, used to compute the loss. Default: cross entropy loss | None |
out_transform | Optional[Callable] | if provided, converts logits to the expected format. Default: softmax | None |
num_workers | Optional[int] | if provided, sets the number of workers used by the DataLoaders | 0 |
kwargs | dict | other parameters accepted by pl.LightningModule | {} |
lightsaber.trainers.pt_trainer.run_training_with_mlflow(mlflow_conf, train_args, wrapped_model, train_dataloader=None, val_dataloader=None, test_dataloader=None, cal_dataloader=None, **kwargs)
Function to run supervised training for classification
Parameters:
Name | Type | Description | Default |
---|---|---|---|
mlflow_conf | dict | mlflow configuration, e.g., MLFLOW_URI | required |
train_args | Namespace | namespace with arguments for the pl.Trainer instance | required |
wrapped_model | PyModel | wrapped PyModel | required |
train_dataloader | DataLoader | training dataloader. If not provided, the dataloader is extracted from wrapped_model | None |
val_dataloader | DataLoader | validation dataloader. If not provided, the dataloader is extracted from wrapped_model | None |
test_dataloader | DataLoader | test dataloader. If not provided, the dataloader is extracted from wrapped_model | None |
cal_dataloader | DataLoader | calibration dataloader. If not provided, the dataloader is extracted from wrapped_model | None |
model_path | str | prefix for storing the model in MLflow | required |
artifacts | dict | any artifacts to be logged by the user | required |
metrics | Callable | if specified, used for calculating all metrics; else inferred from the problem type | required |
run_id | str | if specified, uses an existing mlflow run | required |
auto_init_logger | bool, default: True | if specified, loggers are generated automatically; else assumes the user passed them (planned, not implemented yet) | required |
kwargs | dict | remaining keyword arguments are used as experiment tags | {} |
Returns:
Type | Description |
---|---|
Tuple | (run_id, run_metrics, y_val, y_val_hat, y_val_pred, y_test, y_test_hat, y_test_pred) |
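A hedged usage sketch. The MLflow URI and trainer arguments are assumptions for illustration; `wrapped_model` is a PyModel (for example, the one sketched earlier) and the dataloaders are PyTorch DataLoaders built with collate_fn as shown above.

```python
from argparse import Namespace
from lightsaber.trainers import pt_trainer

# Hypothetical configuration; adjust to your environment
mlflow_conf = dict(MLFLOW_URI="http://localhost:5000")
train_args = Namespace(max_epochs=10)

(run_id, run_metrics,
 y_val, y_val_hat, y_val_pred,
 y_test, y_test_hat, y_test_pred) = pt_trainer.run_training_with_mlflow(
    mlflow_conf,
    train_args,
    wrapped_model,                    # PyModel from the earlier sketch
    train_dataloader=train_dataloader,  # torch DataLoader with collate_fn
    val_dataloader=val_dataloader,
)
```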
lightsaber.trainers.sk_trainer.SKModel
SKModel
Source code in lightsaber/trainers/sk_trainer.py
class SKModel(object):
    """SKModel
    """
    def __init__(self,
                 base_model,
                 model_params=None,
                 name="undefined_model_name"):
        """
        Parameters
        ----------
        base_model:
            base scikit-learn compatible model (classifier) defining model logic
        model_params:
            if provided, sets the model parameters for base_model
        name:
            name of the model
        """
        super(SKModel, self).__init__()
        self.model = base_model
        if model_params is not None:
            try:
                self.set_params(**model_params)
            except Exception as e:
                warnings.warn("couldn't set model params - base_model/model_params inconsistent with scikit-learn")
                log.debug(f'Error in model params:{e}')
        self.__name__ = name
        self.metrics = {}
        self.proba = []
        # self.params = self.model.get_params()

    @property
    def params(self):
        try:
            params = self.model.get_params()
        except AttributeError:
            raise DeprecationWarning("This is deprecated. will be dropped in v0.3. models should be sklearn compatible i.e. should have get_params. moving forward but this will be inconsistent with tuning")
            params = self.model_params
        return params

    def set_params(self, **parameters):
        self.model.set_params(**parameters)
        return self

    def fit(self, X, y, experiment_name=""):  # default exp name is timestamp
        """
        Fits self.model to X, given y.

        Args:
            X (np.array): Feature matrix
            y (np.array): Binary labels for prediction
            experiment_name (str): Name for experiment as defined in config, construction of SKModel object

        Returns np.array predictions for each instance in X.
        """
        self.model.fit(X, y)
        # self.params = self.model.get_params()
        return self

    def predict(self, X):
        """
        Uses model to predict labels given input X.

        Args:
            X (np.array): Feature matrix

        Returns np.array predictions for each instance in X.
        """
        return self.model.predict(X)

    def calibrate(self, X, y):
        ccc = CalibratedClassifierCV(self.model, method='isotonic', cv='prefit')
        ccc.fit(X, y)
        self.model = ccc
        # self.params = self.model.get_params()
        return self

    def tune(self,
             X, y,
             hyper_params,
             experiment_name,
             cv=C.DEFAULT_CV,
             scoring=C.DEFAULT_SCORING_CLASSIFIER,
             ):  ## NEEDS MODIFICATION
        """Tune hyperparameters for model. Uses mlflow to log best model, GridSearch model, scaler, and best_score

        Parameters
        ----------
        X: np.array
            Feature matrix
        y: np.array
            Binary labels for prediction
        hyper_params: dict
            Dictionary of hyperparameters and values/settings for model's hyperparameters.
        experiment_name: str
            Name for experiment as defined in config, construction of SKModel object
        cv: int or cv fold
            pre-defined cv generator or number
        """
        gs = GridSearchCV(estimator=self.model,
                          cv=cv,
                          param_grid=hyper_params,
                          verbose=2,
                          scoring=scoring)
        gs.fit(X, y)
        self.model = gs.best_estimator_
        self.set_params(**gs.best_params_)
        return self.model, gs

    def predict_proba(self, X):
        """
        Predicts on X and returns class probabilities

        Parameters
        ----------
        X: np.array
            Feature matrix

        Returns
        -------
        array of shape (n_samples, n_classes)
        """
        return self.model.predict_proba(X)

    def score(self, X, y):
        return self.model.score(X, y)

    def predict_patient(self, patient_id, test_dataloader):
        p_X, _ = test_dataloader.get_patient(patient_id)
        return self.predict_proba(p_X)
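A minimal sketch of wrapping a scikit-learn classifier. The estimator, its parameters, and `train_dataloader` (an SKDataLoader as described above) are illustrative.

```python
from sklearn.ensemble import RandomForestClassifier
from lightsaber.trainers.sk_trainer import SKModel

# Wrap any scikit-learn compatible classifier; parameters are illustrative
sk_model = SKModel(RandomForestClassifier(),
                   model_params=dict(n_estimators=100, max_depth=5),
                   name="rf_example")

# Assuming train_dataloader is an SKDataLoader built as sketched earlier
X_train, y_train = train_dataloader.get_data()
sk_model.fit(X_train, y_train)
proba = sk_model.predict_proba(X_train)   # shape: (n_samples, n_classes)
```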
__init__(self, base_model, model_params=None, name='undefined_model_name')
special
Parameters:
Name | Type | Description | Default |
---|---|---|---|
base_model |
base scikit-learn compatible model (classifier) defining model logic |
required | |
model_params |
if provided, sets the model parameters for base_model |
None |
|
name |
name of the model |
'undefined_model_name' |
lightsaber.trainers.sk_trainer.run_training_with_mlflow(mlflow_conf, wrapped_model, train_dataloader, val_dataloader=None, test_dataloader=None, **kwargs)
Function to run supervised training for classification
Parameters:
Name | Type | Description | Default |
---|---|---|---|
mlflow_conf | dict | mlflow configuration, e.g., MLFLOW_URI | required |
wrapped_model | SKModel | wrapped SKModel | required |
train_dataloader | SKDataLoader | training dataloader | required |
val_dataloader | SKDataLoader | validation dataloader | None |
test_dataloader | SKDataLoader | test dataloader | None |
model_path | str | prefix for storing the model in MLflow | required |
artifacts | dict | any artifacts to be logged by the user | required |
metrics | Callable | if specified, used for calculating all metrics; else inferred from the problem type | required |
tune | bool | if specified, tune the model based on inner cv. Default: False | False |
scoring | Callable | used when tune=True. sklearn compatible scoring function to score the models for grid search. Default: C.DEFAULT_SCORING_CLASSIFIER | C.DEFAULT_SCORING_CLASSIFIER |
inner_cv | object | used when tune=True. sklearn compatible cross-validation folds for grid search. Default: C.DEFAULT_CV | C.DEFAULT_CV |
h_search | dict | used when tune=True (required). sklearn compatible search space for grid search | required |
run_id | str | if specified, uses an existing mlflow run | required |
kwargs | dict | remaining keyword arguments are used as experiment tags | {} |
Returns:
Type | Description |
---|---|
Tuple | (run_id, run_metrics, y_val, y_val_hat, y_val_pred, y_test, y_test_hat, y_test_pred) |
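A hedged usage sketch mirroring the PyTorch trainer example. The MLflow URI is an assumption, and `sk_model` plus the SKDataLoader variables are assumed to come from the sketches above.

```python
from lightsaber.trainers import sk_trainer

# Hypothetical MLflow configuration; adjust to your environment
mlflow_conf = dict(MLFLOW_URI="http://localhost:5000")

(run_id, run_metrics,
 y_val, y_val_hat, y_val_pred,
 y_test, y_test_hat, y_test_pred) = sk_trainer.run_training_with_mlflow(
    mlflow_conf,
    sk_model,              # SKModel from the earlier sketch
    train_dataloader,      # SKDataLoader instances
    val_dataloader=val_dataloader,
    test_dataloader=test_dataloader,
)
```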
Model Registration and Load
PyTorch
lightsaber.trainers.pt_trainer.register_model_with_mlflow(run_id, mlflow_conf, wrapped_model, registered_model_name, model_path='model_checkpoint', **artifacts)
Method to register a trained model
Parameters:
Name | Type | Description | Default |
---|---|---|---|
run_id |
str |
mlflow run id for the trained model |
required |
mlflow_conf |
dict |
mlflow configuration e,g, MLFLOW_URI |
required |
wrapped_model |
PyModel |
model architecture to be logged |
required |
registered_model_name |
str |
name for registering the model |
required |
model_path |
str |
output path where model will be logged |
'model_checkpoint' |
artifacts |
dict |
dictionary of objects to log with the model |
{} |
lightsaber.trainers.pt_trainer.load_model_from_mlflow(run_id, mlflow_conf, wrapped_model, model_path='model_checkpoint')
Method to load a trained model from mlflow
Parameters:
Name | Type | Description | Default |
---|---|---|---|
run_id |
str |
mlflow run id for the trained model |
required |
mlflow_conf |
dict |
mlflow configuration e,g, MLFLOW_URI |
required |
wrapped_model |
PyModel |
model architecture to be logged |
required |
model_path |
str |
output path where model checkpoints are logged |
'model_checkpoint' |
Returns:
Type | Description |
---|---|
 | wrapped model with saved weights and parameters from the run |
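A hedged sketch of registering a trained PyTorch model and loading it back. `run_id`, `mlflow_conf`, and `wrapped_model` are assumed to come from an earlier training run, and the registry name is illustrative.

```python
from lightsaber.trainers import pt_trainer

# run_id comes from a previous training run; registry name is illustrative
pt_trainer.register_model_with_mlflow(run_id,
                                      mlflow_conf,
                                      wrapped_model,
                                      registered_model_name="ihm_rnn_classifier")

# Later: rebuild the architecture and pull the trained weights back from MLflow
restored_model = pt_trainer.load_model_from_mlflow(run_id, mlflow_conf, wrapped_model)
```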
Scikit-learn
lightsaber.trainers.sk_trainer.register_model_with_mlflow(run_id, mlflow_conf, wrapped_model=None, registered_model_name=None, model_path='model', **artifacts)
Method to register a trained model
Parameters:
Name | Type | Description | Default |
---|---|---|---|
run_id |
str |
mlflow run id for the trained model |
required |
mlflow_conf |
dict |
mlflow configuration e,g, MLFLOW_URI |
required |
wrapped_model |
SKModel |
model architecture to be logged. If not provided, the model is directly read from mlflow |
None |
registered_model_name |
str |
name for registering the model |
None |
model_path |
str |
output path where model will be logged |
'model' |
artifacts |
dict |
dictionary of objects to log with the model |
{} |
lightsaber.trainers.sk_trainer.load_model_from_mlflow(run_id, mlflow_conf, wrapped_model=None, model_path='model')
Method to load a trained model from mlflow
Parameters:
Name | Type | Description | Default |
---|---|---|---|
run_id |
str |
mlflow run id for the trained model |
required |
mlflow_conf |
dict |
mlflow configuration e,g, MLFLOW_URI |
required |
wrapped_model |
SKModel |
model architecture to be logged |
None |
model_path |
str |
output path where model checkpoints are logged |
'model' |
Returns:
Type | Description |
---|---|
 | wrapped model with saved weights and parameters from the run |
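A hedged sketch for the scikit-learn counterparts. `run_id`, `mlflow_conf`, and `sk_model` are assumed from the earlier scikit-learn training sketch, and the registry name is illustrative.

```python
from lightsaber.trainers import sk_trainer

# run_id comes from the scikit-learn training run; registry name is illustrative
sk_trainer.register_model_with_mlflow(run_id,
                                      mlflow_conf,
                                      wrapped_model=sk_model,
                                      registered_model_name="ihm_rf_classifier")

# Later: load the trained model back from MLflow
restored_sk_model = sk_trainer.load_model_from_mlflow(run_id, mlflow_conf, wrapped_model=sk_model)
```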