
API Documentation

Data Ingestion

lightsaber.data_utils.pt_dataset.BaseDataset (Dataset)

__init__(self, tgt_file, feat_file, idx_col, tgt_col, feat_columns=None, time_order_col=None, category_map={}, transform=[transform_drop_cols, transform_fillna], filter=[identity_nd], device='cpu') special

Base dataset class

Parameters:

    tgt_file : (required)
        target file path
    feat_file : (required)
        feature file path
    idx_col : str or List[str] (required)
        index column(s) in the data; present in both tgt_file and feat_file
    tgt_col : str or List[str] (required)
        target column(s) present in tgt_file
    feat_columns : default=None
        feature columns to select from: either a single regex or a list of columns (a partial regex that
        matches the complete column name is allowed, e.g. CCS matches only CCS whereas CCS.* matches both
        CCS and CCS_XYZ). Default: None -> all columns
    time_order_col : default=None
        column(s) that define the time ordering within a single example. Default: None -> no time ordering
    category_map : default={}
        dictionary of column maps
    transform : single callable or list/tuple of callables, default=[transform_drop_cols, transform_fillna]
        how to transform the data. If a list of callables [f, g] is provided, g(f(x)) is applied.
        Default: drop lightsaber.constants::DEFAULT_DROP_COLS and fill missing values
    filter : single callable or list/tuple of callables, default=[identity_nd]
        how to filter the data. If a list of callables [f, g] is provided, g(f(x)) is applied.
        Default: no operation
    device : str, default='cpu'
        valid pytorch device: 'cpu' or 'gpu'

Examples:

Example of feature columns.
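
A minimal sketch of selecting feature columns when constructing a BaseDataset; the file paths and column names below are illustrative assumptions, not part of the library.

from lightsaber.data_utils import pt_dataset as ptd

# Hypothetical file paths and column names, for illustration only
dataset = ptd.BaseDataset(
    tgt_file='./target_train.csv',
    feat_file='./features_train.csv',
    idx_col='PATIENT_ID',
    tgt_col='LABEL',
    # a partial regex must match the complete column name:
    # 'CCS' selects only CCS, while 'CCS.*' selects CCS as well as CCS_XYZ
    feat_columns=['AGE', 'CCS.*'],
    time_order_col='VISIT_ORDER',
)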

lightsaber.data_utils.pt_dataset.collate_fn(batch)

Provides mechanism to collate the batch

ref: https://github.com/dhpollack/programming_notebooks/blob/master/pytorch_attention_audio.py#L245

Puts data, and lengths into a packed_padded_sequence then returns the packed_padded_sequence and the labels.

Parameters:

    batch : List[Tuples] (required)
        [(*data, target)] where data is all the different data inputs from __getattr__ and target is the target y

Returns:

    Tuple
        (dx, dy, lengths, idx)

Source code in lightsaber/data_utils/pt_dataset.py
def collate_fn(batch): 
    """Provides mechanism to collate the batch

    ref: https://github.com/dhpollack/programming_notebooks/blob/master/pytorch_attention_audio.py#L245

    Puts data, and lengths into a packed_padded_sequence then returns
    the packed_padded_sequence and the labels.

    Parameters
    ----------
    batch : List[Tuples]
        [(*data, target)] data: all the different data input from `__getattr__`  target: target y

    Returns
    -------
    Tuple
        (dx, dy, lengths, idx)
    """
    pad = C.PAD

    if len(batch) == 1:
        dx_t, dy_t, lengths, idx = batch[0]
        #  sigs = sigs.t()
        dx_t.unsqueeze_(0)
        dy_t.unsqueeze_(0)
        lengths = [lengths]
        idx = np.atleast_2d(idx)

    else:
        dx_t, dy_t, lengths, idx = zip(*[(dx, dy, length, idx)
                                         for (dx, dy, length, idx) in sorted(batch, key=lambda x: x[2],
                                                                             reverse=True)])
        max_len, n_feats = dx_t[0].size()
        device = dx_t[0].device

        dx_t = [T.cat((s, T.empty(max_len - s.size(0), n_feats, device=device).fill_(pad)), 0)
                if s.size(0) != max_len else s
                for s in dx_t]
        dx_t = T.stack(dx_t, 0).to(device)  # bs * max_seq_len * n_feat

        dy_t = T.stack(dy_t, 0).to(device)  # bs * n_out

        # Handling the other variables
        lengths = list(lengths)
        idx = np.vstack(idx) # bs * 1
    return dx_t, dy_t, lengths, idx
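
A hedged usage sketch: wiring collate_fn into a standard torch DataLoader, assuming `dataset` is a BaseDataset built as shown above and the batch size is arbitrary.

from torch.utils.data import DataLoader
from lightsaber.data_utils.pt_dataset import collate_fn

loader = DataLoader(dataset, batch_size=32, collate_fn=collate_fn, shuffle=True)

for dx, dy, lengths, idx in loader:
    # dx: (batch, max_seq_len, n_feat) padded with C.PAD; lengths holds the original sequence lengths
    pass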

lightsaber.data_utils.sk_dataloader.SKDataLoader

Custom data loaders for scikit-learn

Source code in lightsaber/data_utils/sk_dataloader.py
class SKDataLoader(object):
    """Custom data loaders for scikit-learn"""

    def __init__(self, 
                 tgt_file, feat_file, idx_col, tgt_col, 
                 feat_columns=None, 
                 time_order_col=None, 
                 category_map=C.DEFAULT_MAP, 
                 filter=DEFAULT_FILTER,
                 fill_value=0.,
                 flatten=C.DEFAULT_FLATTEN, 
                 cols_to_drop=C.DEFAULT_DROP_COLS,
                 ):
        """
        Parameters
        ----------
        tgt_file:
            target file path
        feat_file:
            feature file path
        idx_col:
            columns to specify the unique examples from the feature and target set
        tgt_col:
            columns to specify the target column from the target set.
        feat_columns:
            feature columns to select from. either list of columns (partial columns using `*` allowed) or a single regex
            Default: `None` -> implies all columns
        time_order_col:
            column(s) that signify the time ordering for a single example.
            Default: `None` -> implies no columns
        category_map:
            dictionary of column maps
        filter: single callable or list/tuple of callables
            how to filter data. if list of callables provided eg `[f, g]`, `g(f(x))` used 
            Default: no operation
        fill_value:
            pandas compatible function or value to fill missing data
        flatten:
            Functions to aggregate and flatten temporal data
        cols_to_drop:
            list of columns to drop
        """

        self._tgt_file = tgt_file
        self._feat_file = feat_file
        self._idx_col = idx_col
        self._tgt_col = tgt_col
        self._feat_columns = feat_columns
        self._time_order_col= time_order_col
        self._category_map = category_map

        # Enforcing a flatten function to make sure sklearn modules get
        # flattened data
        _filter_flatten_filled_drop_cols = filter_flatten_filled_drop_cols(cols_to_drop=cols_to_drop,
                                                                           aggfunc=flatten,
                                                                           fill_value=fill_value)
        self._filter = []
        if filter is not None:
            if isinstance(filter, (list, tuple)):
                self._filter += filter
            else:
                self._filter.append(filter)
        self._filter.append(_filter_flatten_filled_drop_cols)

        # Reading data
        self.read_data()
        return

    def read_data(self):
        device = DEFAULT_DEVICE
        transform = DEFAULT_TRANSFORM

        self._dataset = ptd.BaseDataset(self._tgt_file, 
                                        self._feat_file, 
                                        self._idx_col, 
                                        self._tgt_col,
                                        feat_columns=self._feat_columns, 
                                        time_order_col=self._time_order_col,
                                        category_map=self._category_map,
                                        filter=self._filter,
                                        transform=transform,
                                        device=device
                                        )
        return

    @property
    def shape(self):
        return self._dataset.shape

    @property
    def sample_idx(self):
        return self._dataset.sample_idx

    def __len__(self):
        return len(self._dataset)

    def get_data(self):
        X = self._dataset.data
        y = self._dataset.target
        return X, y

    def get_patient(self, patient_id):
        p_idx = self._dataset.sample_idx.index.get_loc(patient_id)
        full_X, full_y = self.get_data()
        p_X = full_X.iloc[[p_idx]]
        if full_y is not None:
            p_y = full_y.iloc[[p_idx]]
        else:
            p_y = None
        return p_X, p_y

__init__(self, tgt_file, feat_file, idx_col, tgt_col, feat_columns=None, time_order_col=None, category_map={}, filter=None, fill_value=0.0, flatten='sum', cols_to_drop=['INDEX_CLAIM_THRU_DT', 'INDEX_CLAIM_ORDER', 'CLAIM_NO', 'CLM_ADMSN_DT', 'CLM_THRU_DT', 'CLAIM_ORDER']) special

Parameters:

    tgt_file : (required)
        target file path
    feat_file : (required)
        feature file path
    idx_col : (required)
        columns that identify the unique examples in the feature and target sets
    tgt_col : (required)
        columns that specify the target column in the target set
    feat_columns : default=None
        feature columns to select from: either a list of columns (partial columns using * allowed) or a
        single regex. Default: None -> all columns
    time_order_col : default=None
        column(s) that define the time ordering within a single example. Default: None -> no time ordering
    category_map : default={}
        dictionary of column maps
    filter : single callable or list/tuple of callables, default=None
        how to filter the data. If a list of callables [f, g] is provided, g(f(x)) is applied.
        Default: no operation
    fill_value : default=0.0
        pandas-compatible function or value used to fill missing data
    flatten : default='sum'
        function used to aggregate and flatten temporal data
    cols_to_drop : default=['INDEX_CLAIM_THRU_DT', 'INDEX_CLAIM_ORDER', 'CLAIM_NO', 'CLM_ADMSN_DT', 'CLM_THRU_DT', 'CLAIM_ORDER']
        list of columns to drop
Source code in lightsaber/data_utils/sk_dataloader.py
def __init__(self, 
             tgt_file, feat_file, idx_col, tgt_col, 
             feat_columns=None, 
             time_order_col=None, 
             category_map=C.DEFAULT_MAP, 
             filter=DEFAULT_FILTER,
             fill_value=0.,
             flatten=C.DEFAULT_FLATTEN, 
             cols_to_drop=C.DEFAULT_DROP_COLS,
             ):
    """
    Parameters
    ----------
    tgt_file:
        target file path
    feat_file:
        feature file path
    idx_col:
        columns to specify the unique examples from the feature and target set
    tgt_col:
        columns to specify the target column from the target set.
    feat_columns:
        feature columns to select from. either list of columns (partial columns using `*` allowed) or a single regex
        Default: `None` -> implies all columns
    time_order_col:
        column(s) that signify the time ordering for a single example.
        Default: `None` -> implies no columns
    category_map:
        dictionary of column maps
    filter: single callable or list/tuple of callables
        how to filter data. if list of callables provided eg `[f, g]`, `g(f(x))` used 
        Default: no operation
    fill_value:
        pandas compatible function or value to fill missing data
    flatten:
        Functions to aggregate and flatten temporal data
    cols_to_drop:
        list of columns to drop
    """

    self._tgt_file = tgt_file
    self._feat_file = feat_file
    self._idx_col = idx_col
    self._tgt_col = tgt_col
    self._feat_columns = feat_columns
    self._time_order_col= time_order_col
    self._category_map = category_map

    # Enforcing a flatten function to make sure sklearn modules get
    # flattened data
    _filter_flatten_filled_drop_cols = filter_flatten_filled_drop_cols(cols_to_drop=cols_to_drop,
                                                                       aggfunc=flatten,
                                                                       fill_value=fill_value)
    self._filter = []
    if filter is not None:
        if isinstance(filter, (list, tuple)):
            self._filter += filter
        else:
            self._filter.append(filter)
    self._filter.append(_filter_flatten_filled_drop_cols)

    # Reading data
    self.read_data()
    return
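
A minimal sketch of constructing an SKDataLoader and extracting the flattened matrices; the file paths and column names are illustrative assumptions.

from lightsaber.data_utils.sk_dataloader import SKDataLoader

train_dataloader = SKDataLoader(
    tgt_file='./target_train.csv',       # assumed target file
    feat_file='./features_train.csv',    # assumed feature file
    idx_col='PATIENT_ID',                # assumed index column
    tgt_col='LABEL',                     # assumed target column
)
X_train, y_train = train_dataloader.get_data()   # flattened feature matrix and targets
print(train_dataloader.shape, len(train_dataloader))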

Filters and Transforms

lightsaber.data_utils.pt_dataset.identity_2d(x, y)

Identity function for 2 variables

Parameters:

    x : (required)
        first param
    y : (required)
        second param

Returns:

    x : object
        first param
    y : object
        second param

Source code in lightsaber/data_utils/pt_dataset.py
def identity_2d(x, y):
    """
    Identity function for 2 variables

    Parameters
    ----------
    x : 
        first param
    y : 
        second param

    Returns
    -------
    x : object
        first param
    y : object
        second param
    """
    return x, y
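
Filters compose left to right: for filter=[f, g], g(f(x)) is applied. Below is a hedged sketch of a custom two-argument filter following the same convention as identity_2d; it assumes, as that signature suggests, that a filter receives and returns the feature and target frames, and the column and label names are illustrative.

import pandas as pd
from lightsaber.data_utils.pt_dataset import identity_2d

def drop_rows_missing_target(data, target):
    """Illustrative filter: keep rows whose target is not missing (same (x, y) -> (x, y) shape as identity_2d)."""
    mask = target.notna().all(axis=1)
    return data.loc[mask], target.loc[mask]

# For filter=[identity_2d, drop_rows_missing_target] the composition is
# drop_rows_missing_target(*identity_2d(data, target))
data = pd.DataFrame({'AGE': [40, 62]}, index=['p1', 'p2'])
target = pd.DataFrame({'LABEL': [1.0, None]}, index=['p1', 'p2'])
data, target = drop_rows_missing_target(*identity_2d(data, target))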

Model Training

lightsaber.trainers.pt_trainer.PyModel (LightningModule)

PyModel

Source code in lightsaber/trainers/pt_trainer.py
class PyModel(pl.LightningModule):
    """PyModel"""
    def __init__(self, 
                 hparams:Namespace, 
                 model:nn.Module,
                 train_dataset: Optional[Dataset] = None, 
                 val_dataset: Optional[Dataset] = None,
                 cal_dataset: Optional[Dataset] = None, 
                 test_dataset: Optional[Dataset] = None,
                 collate_fn: Optional[Callable] = None, 
                 optimizer: Optional[Optimizer] = None,
                 loss_func: Optional[Callable] = None, 
                 out_transform: Optional[Callable] = None, 
                 num_workers: Optional[int] = 0, 
                 debug: Optional[bool] = False,
                 **kwargs):
        """
        Parameters
        ----------
        hparams: Namespace
            hyper-parameters for the base model
        model:
            base pytorch model defining the model logic. The model's forward should output logits for classification
            and accept a single positional tensor (`x`) for input data and, at least, a `lengths` keyword tensor.
            Optionally, a `hidden` keyword argument can be provided for sequential models to ingest past hidden state.
        train_dataset: torch.utils.data.Dataset, optional
            training dataset 
        val_dataset: torch.utils.data.Dataset, optional
            validation dataset 
        cal_dataset: torch.utils.data.Dataset, optional
            calibration dataset - if provided post-hoc calibration is performed
        test_dataset: torch.utils.data.Dataset, optional
            test dataset - if provided, training also reports test performance
        collate_fn:
            collate function to handle unequal sample sizes within a batch
        optimizer: torch.optim.Optimizer, optional
            pytorch optimizer. If not provided, Adam is used with standard parameters
        loss_func: callable
            if provided, used to compute the loss. Default: cross entropy loss
        out_transform: callable
            if provided, converts logits to the expected format. Default: softmax
        num_workers: int, Default: 0
            if provided, sets the number of workers used by the DataLoaders.
        kwargs: dict, optional
            other parameters accepted by pl.LightningModule
        """
        super(PyModel, self).__init__()
        #  self.bk_hparams = hparams
        self.model = model

        self._debug = debug

        self.train_dataset = train_dataset
        self.val_dataset = val_dataset
        self.cal_dataset = cal_dataset
        self.test_dataset = test_dataset

        self.num_workers = num_workers

        self.collate_fn = collate_fn

        self._optimizer = optimizer
        self._scheduler = kwargs.get('scheduler', None)
        self._kwargs = kwargs

        # save hyper-parameters
        self.save_hyperparameters(hparams)

        # -------------------------------------------
        # TODO: Move to classifier
        if loss_func is None:
            self.loss_func = nn.CrossEntropyLoss()
        else:
            self.loss_func = loss_func

        if out_transform is None:
            self.out_transform = nn.Softmax(dim=1)
        else:
            self.out_transform = out_transform

        self.temperature = nn.Parameter(T.ones(1) * 1.)
        # -------------------------------------------
        return

    def configure_optimizers(self):
        # REQUIRED
        # can return multiple optimizers and learning_rate schedulers
        if self._optimizer is None:

            optimizer = T.optim.Adam(self.model.parameters(),
                                     lr=self.hparams.lr,
                                     weight_decay=1e-5  # standard value)
                                     )
        else:
            optimizer = self._optimizer

        if self._scheduler is None:
            return optimizer
        else:
            print("Here")
            return [optimizer], [self._scheduler]

    def on_load_checkpoint(self, checkpoint):
        # give sub model a chance to mess with the checkpoint
        if hasattr(self.model, 'on_load_checkpoint'):
            self.model.on_load_checkpoint(checkpoint)
        return

    # --------------------------------------------------------------
    #  Lightsaber:: 
    #  providing extra capabilities to model and compatibility with lightning 
    # -------------------------------------------------------------
    def forward(self, *args, **kwargs):
        return self.model.forward(*args, **kwargs)

    def apply_regularization(self):
        """
        Applies regularizations on the model parameter
        """
        loss = 0.0
        if hasattr(self.hparams, 'l1_reg') and self.hparams.l1_reg > 0:
            loss += l1_regularization(self.parameters(), self.hparams.l1_reg)
        if hasattr(self.hparams, 'l2_reg') and self.hparams.l2_reg > 0:
            loss += l2_regularization(self.parameters(), self.hparams.l2_reg)
        return loss

    def freeze_except_last_layer(self):
        n_layers = sum([1 for _ in self.model.parameters()])
        freeze_layers = n_layers - 2
        i = 0
        freeze_number = 0
        free_number = 0
        for param in self.model.parameters():
            if i <= freeze_layers - 1:
                print('freezing %d-th layer' % i)
                param.requires_grad = False
                freeze_number += param.nelement()
            else:
                free_number += param.nelement()
            i += 1
        print('Total frozen parameters', freeze_number)
        print('Total free parameters', free_number)
        return 

    def clone(self):
        return copy.copy(self)

    # --------------------------------------------------------------
    #  Lightning:: step logic for train, test. validation
    # -------------------------------------------------------------
    def _common_step(self, batch, batch_idx):
        """Common step that is run over a batch of data. 

        Currently supports two types of data
        1. batch containing only X, y, corresponding lengths, and idx
        2. batch containing an extra dimension. Currently assuming its the summary data
        """
        # REQUIRED
        if len(batch) == 4:
            x, y, lengths, idx = batch
            y_out, _ = self.forward(x, lengths=lengths)
        elif len(batch) == 5:
            x, summary, y, lengths, idx = batch
            y_out, _ = self.forward(x, lengths=lengths, summary=summary)

        y_pred = self.out_transform(y_out)
        return (y_pred, y_out, y, x)

    def _shared_eval_step(self, y_pred, y_out, y, x, is_training=False):
        # Supporting loss functions that takes in X as well
        score = self._calculate_score(y_pred, y)
        n_examples = y.shape[0]

        is_x_included = False
        for param in signature(self.loss_func).parameters:
            if param == 'X':
                is_x_included = True

        if is_x_included:    
            loss = self.loss_func(y_out, y, X=x)
        else:
            loss = self.loss_func(y_out, y)

        if is_training:
            loss += (self.apply_regularization() / n_examples)
        # General way of classification
        return loss, n_examples, score

    # TODO: move this to classification
    def _process_common_output(self, y_pred):
        _, y_hat = T.max(y_pred.data, 1)
        return y_hat

    # TODO: make this an abstractmethod. currently done for classification
    def _calculate_score(self, y_pred, y):
        y_hat = self._process_common_output(y_pred)
        score = accuracy(y_hat, y)
        return score

    def training_step(self, batch, batch_idx):
        y_pred, y_out, y, x = self._common_step(batch, batch_idx)
        loss, n_examples, score = self._shared_eval_step(y_pred, y_out, y, x, is_training=True)

        # Making it independent of loggers used
        metrics = {"loss": loss, "train_score": score}
        self.log_dict(metrics, on_step=self._debug, on_epoch=True, prog_bar=True, logger=True) 
        if self._debug:
            self.log("train_n_examples", n_examples, on_step=True, on_epoch=True)
        #  tensorboard_log = {'batch_train_loss': loss, 'batch_train_score': train_score}
        return metrics  #, train_n_correct=n_correct, train_n_examples=n_examples, log=tensorboard_log)

    def validation_step(self, batch, batch_idx):
        y_pred, y_out, y, x = self._common_step(batch, batch_idx)
        loss, n_examples, score = self._shared_eval_step(y_pred, y_out, y, x)

        # Making it independent of loggers used
        metrics = {"val_loss": loss, "val_score": score}
        self.log_dict(metrics, on_step=False, on_epoch=True, prog_bar=True, logger=True) 
        if self._debug:
            self.log("val_n_examples", n_examples, on_step=True, on_epoch=True)
        #  tensorboard_log = {'batch_val_loss': loss, 'batch_val_score': val_score}
        return metrics  #, val_n_correct=n_correct, val_n_examples=n_examples, log=tensorboard_log)

    def test_step(self, batch, batch_idx):
        y_pred, y_out, y, x = self._common_step(batch, batch_idx)
        loss, n_examples, score = self._shared_eval_step(y_pred, y_out, y, x)

        # Making it independent of loggers used
        metrics = {"test_loss": loss, "test_score": score}
        self.log_dict(metrics, on_step=False, on_epoch=True, prog_bar=True, logger=True) 
        #  tensorboard_log = {'batch_test_loss': loss, 'batch_test_score': test_score}
        # For test returning both outputs and y
        #  y_pred = self._process_common_output(y_hat)
        #  metrics.update(dict(y_pred=y_pred, y_hat=y, y=y))
        return metrics #, test_n_correct=n_correct, test_n_examples=n_examples, log=tensorboard_log)

    def predict_step(self, batch, batch_idx):
        y_pred, y_out, y, x = self._common_step(batch, batch_idx)
        y_hat = self._process_common_output(y_pred)

        payload={'y_hat': y_hat, 'y_pred': y_pred, 'y': y}
        return payload

    def _on_predict_epoch_end(self, results):
        # TODO: this should be working directly as a model hook
        # Not working
        def _process_single_dataloader(res_dataloader):
            y_hat = T.cat([r['y_hat'] for r in res_dataloader])
            y_pred = T.cat([r['y_pred'] for r in res_dataloader])
            y = T.cat([r['y'] for r in res_dataloader])
            return dict(y_hat=y_hat, y_pred=y_pred, y=y)

        # making the code adaptive for multiple dataloaders
        log.debug(f"Number of predict dataloader: {len(self.trainer.predict_dataloaders)}")
        if len(self.trainer.predict_dataloaders) == 1:
            payload = _process_single_dataloader(results)
        else:
            payload = [_process_single_dataloader(res_dataloader) 
                       for res_dataloader in results]
        return payload

    # def validation_end(self, outputs):
    #     # OPTIONAL
    #     try:
    #         avg_val_loss = T.stack([x['batch_val_loss'] for x in outputs]).mean()
    #     except Exception:
    #         avg_val_loss = T.FloatTensor([0.])
    #     
    #     try:
    #         val_score = (np.stack([x['val_n_correct'] for x in outputs]).sum() 
    #                      / np.stack([x['val_n_examples'] for x in outputs]).sum())
    #     except Exception:
    #         val_score = T.FloatTensor([0.])
    #         
    #     tensorboard_log = {'val_loss': avg_val_loss, 'val_score': val_score}
    #     return dict(val_loss=avg_val_loss, val_score=val_score, log=tensorboard_log)

    # --------------------------------------------------------------
    #   Classifier specific section:: calibration
    # -------------------------------------------------------------
    def temperature_scale(self, logits):
        """
        Perform temperature scaling on logits
        """
        # Expand temperature to match the size of logits
        temperature = self.temperature.unsqueeze(1).expand(logits.size(0), logits.size(1))
        return logits / temperature

    def set_temperature(self, cal_loader):
        """
        Tune the temperature of the model (using the validation set).
        We're going to set it to optimize NLL.
        valid_loader (DataLoader): validation set loader
        """
        _orig_device = self.device
        try:
            if self.trainer.on_gpu:
                self.to(self.trainer.root_gpu)
        except Exception:
            pass
        #  self.cuda()
        self.temperature.data = T.ones(1, device=self.temperature.device) * 1.5

        # nll_criterion = nn.CrossEntropyLoss()
        nll_criterion = self.loss_func
        ece_criterion = _ECELoss()
        n_batches = len(cal_loader)

        # First: collect all the logits and labels for the validation set
        logits_list = []
        labels_list = []
        with T.no_grad():
            # making it compatible with non trainer run
            try:
                if self.trainer.on_gpu:
                    nll_criterion = self.trainer.transfer_to_gpu(nll_criterion, self.trainer.root_gpu)
                    ece_criterion = self.trainer.transfer_to_gpu(ece_criterion, self.trainer.root_gpu)
            except Exception:
                pass

            for (bIdx, batch) in tqdm.tqdm(enumerate(cal_loader), total=n_batches):
                if bIdx == n_batches:
                    break

                # making it compatible with non trainer run
                try:       
                    if self.trainer.on_gpu:
                        batch = self.trainer.transfer_batch_to_gpu(batch, self.trainer.root_gpu)
                except Exception:
                    pass
            #  for input, label in cal_loader:
                if len(batch) == 4:
                    x, y, lengths, idx = batch 
                    logits, _ = self.forward(x, lengths)
                elif len(batch) == 5:
                    x, summary, y, lengths, idx = batch
                    logits, _ = self.forward(x, lengths, summary)
                logits_list.append(logits)
                labels_list.append(y)
            logits = T.cat(logits_list)
            labels = T.cat(labels_list)

        # Calculate NLL and ECE before temperature scaling

        before_temperature_nll = nll_criterion(logits, labels).item()
        before_temperature_ece = ece_criterion(logits, labels).item()
        print('Before temperature - NLL: %.3f, ECE: %.3f' % (before_temperature_nll, before_temperature_ece))

        # Next: optimize the temperature w.r.t. NLL
        optimizer = optim.LBFGS([self.temperature], lr=0.01, max_iter=50)

        def eval():
            loss = nll_criterion(self.temperature_scale(logits), labels)
            loss.backward()
            return loss
        optimizer.step(eval)

        # Calculate NLL and ECE after temperature scaling
        after_temperature_nll = nll_criterion(self.temperature_scale(logits), labels).item()
        after_temperature_ece = ece_criterion(self.temperature_scale(logits), labels).item()
        print('Optimal temperature: %.3f' % self.temperature.item())
        print('After temperature - NLL: %.3f, ECE: %.3f' % (after_temperature_nll, after_temperature_ece))

        self.to(_orig_device)
        return self

    # --------------------------------------------------------------
    #  Scikit-learn compatibility section
    # -------------------------------------------------------------
    def get_params(self):
        """Return a dicitonary of param_name: param_value
        """
        _params = vars(self.hparams)
        return _params

    # TODO: Move to classifier
    def predict_proba(self, *args, **kwargs):
        logit, _ = self.forward(*args, **kwargs)
        pred = self.out_transform(self.temperature_scale(logit))
        return pred

    # TODO: Move to classifier
    def predict(self, *args, **kwargs):
        proba = self.predict_proba(*args, **kwargs)
        pred = T.argmax(proba, dim=-1)
        return pred

    # DPM360:: connector
    # Given the patient id, find the array index of the patient
    def predict_patient(self, patient_id, test_dataset):
        p_x, _, p_lengths, _ = test_dataset.get_patient(patient_id)
        proba = self.predict_proba(p_x, lengths=p_lengths)
        return proba

    # --------------------------------------------------------------
    #  Dataset handling section
    # TODO: move to dataset class
    # -------------------------------------------------------------
    def _pin_memory(self):
        pin_memory = False
        try:
            if self.trainer.on_gpu:
                pin_memory=True
        except AttributeError:
            pass
        return pin_memory

    def train_dataloader(self):
        warnings.warn(f'{C._deprecation_warn_msg}. Pass dataloader directly', DeprecationWarning, stacklevel=2)
        sampler = self._kwargs.get('train_sampler', None)
        shuffle = True if sampler is None else False

        pin_memory = self._pin_memory()
        # REQUIRED
        dataloader = DataLoader(self.train_dataset, 
                                collate_fn=self.collate_fn, 
                                shuffle=shuffle,
                                batch_size=self.hparams.batch_size,
                                sampler=sampler,
                                pin_memory=pin_memory,
                                num_workers=self.num_workers
                                )
        return dataloader

    def val_dataloader(self):
        warnings.warn(f'{C._deprecation_warn_msg}. Pass dataloader directly', DeprecationWarning, stacklevel=2)
        if self.val_dataset is None:
            dataset = ptd.EmptyDataset()
            dataloader = DataLoader(dataset)
        else:
            dataset = self.val_dataset
            pin_memory = self._pin_memory()
            dataloader = DataLoader(self.val_dataset, 
                                    collate_fn=self.collate_fn, 
                                    pin_memory=pin_memory,
                                    batch_size=self.hparams.batch_size,
                                    num_workers=self.num_workers
                                    )
        return dataloader

    def test_dataloader(self):
        warnings.warn(f'{C._deprecation_warn_msg}. Pass dataloader directly', DeprecationWarning, stacklevel=2)
        if self.test_dataset is None:
            dataset = ptd.EmptyDataset()
            dataloader = DataLoader(dataset)
        else:
            dataset = self.test_dataset
            pin_memory = self._pin_memory()
            dataloader = DataLoader(self.test_dataset, 
                                    collate_fn=self.collate_fn,
                                    pin_memory=pin_memory,
                                    batch_size=self.hparams.batch_size,
                                    num_workers=self.num_workers)
        return dataloader

    def cal_dataloader(self):
        warnings.warn(f'{C._deprecation_warn_msg}. Pass dataloader directly', DeprecationWarning, stacklevel=2)
        if self.cal_dataset is None:
            dataset = ptd.EmptyDataset()
            dataloader = DataLoader(dataset)
        else:
            dataset = self.cal_dataset
            pin_memory = self._pin_memory()
            dataloader = DataLoader(self.cal_dataset, collate_fn=self.collate_fn, pin_memory=pin_memory,
                                    batch_size=self.hparams.batch_size,num_workers=self.num_workers)
        return dataloader

__init__(self, hparams, model, train_dataset=None, val_dataset=None, cal_dataset=None, test_dataset=None, collate_fn=None, optimizer=None, loss_func=None, out_transform=None, num_workers=0, debug=False, **kwargs) special

Parameters:

    hparams : Namespace (required)
        hyper-parameters for the base model
    model : Module (required)
        base pytorch model defining the model logic. The model's forward should output logits for
        classification and accept a single positional tensor (x) for input data and, at least, a
        lengths keyword tensor. Optionally, a hidden keyword argument can be provided for sequential
        models to ingest past hidden state.
    train_dataset : Optional[torch.utils.data.dataset.Dataset], default=None
        training dataset
    val_dataset : Optional[torch.utils.data.dataset.Dataset], default=None
        validation dataset
    cal_dataset : Optional[torch.utils.data.dataset.Dataset], default=None
        calibration dataset - if provided, post-hoc calibration is performed
    test_dataset : Optional[torch.utils.data.dataset.Dataset], default=None
        test dataset - if provided, training also reports test performance
    collate_fn : Optional[Callable], default=None
        collate function to handle unequal sample sizes within a batch
    optimizer : Optional[torch.optim.optimizer.Optimizer], default=None
        pytorch optimizer. If not provided, Adam is used with standard parameters
    loss_func : Optional[Callable], default=None
        if provided, used to compute the loss. Default: cross-entropy loss
    out_transform : Optional[Callable], default=None
        if provided, converts logits to the expected output format. Default: softmax
    num_workers : Optional[int], default=0
        if provided, sets the number of workers used by the DataLoaders
    kwargs : dict, default={}
        other parameters accepted by pl.LightningModule
Source code in lightsaber/trainers/pt_trainer.py
def __init__(self, 
             hparams:Namespace, 
             model:nn.Module,
             train_dataset: Optional[Dataset] = None, 
             val_dataset: Optional[Dataset] = None,
             cal_dataset: Optional[Dataset] = None, 
             test_dataset: Optional[Dataset] = None,
             collate_fn: Optional[Callable] = None, 
             optimizer: Optional[Optimizer] = None,
             loss_func: Optional[Callable] = None, 
             out_transform: Optional[Callable] = None, 
             num_workers: Optional[int] = 0, 
             debug: Optional[bool] = False,
             **kwargs):
    """
    Parameters
    ----------
    hparams: Namespace
        hyper-parameters for the base model
    model:
        base pytorch model defining the model logic. The model's forward should output logits for classification
        and accept a single positional tensor (`x`) for input data and, at least, a `lengths` keyword tensor.
        Optionally, a `hidden` keyword argument can be provided for sequential models to ingest past hidden state.
    train_dataset: torch.utils.data.Dataset, optional
        training dataset 
    val_dataset: torch.utils.data.Dataset, optional
        validation dataset 
    cal_dataset: torch.utils.data.Dataset, optional
        calibration dataset - if provided post-hoc calibration is performed
    test_dataset: torch.utils.data.Dataset, optional
        test dataset - if provided, training also reports test performance
    collate_fn:
        collate function to handle unequal sample sizes within a batch
    optimizer: torch.optim.Optimizer, optional
        pytorch optimizer. If not provided, Adam is used with standard parameters
    loss_func: callable
        if provided, used to compute the loss. Default: cross entropy loss
    out_transform: callable
        if provided, converts logits to the expected format. Default: softmax
    num_workers: int, Default: 0
        if provided, sets the number of workers used by the DataLoaders.
    kwargs: dict, optional
        other parameters accepted by pl.LightningModule
    """
    super(PyModel, self).__init__()
    #  self.bk_hparams = hparams
    self.model = model

    self._debug = debug

    self.train_dataset = train_dataset
    self.val_dataset = val_dataset
    self.cal_dataset = cal_dataset
    self.test_dataset = test_dataset

    self.num_workers = num_workers

    self.collate_fn = collate_fn

    self._optimizer = optimizer
    self._scheduler = kwargs.get('scheduler', None)
    self._kwargs = kwargs

    # save hyper-parameters
    self.save_hyperparameters(hparams)

    # -------------------------------------------
    # TODO: Move to classifier
    if loss_func is None:
        self.loss_func = nn.CrossEntropyLoss()
    else:
        self.loss_func = loss_func

    if out_transform is None:
        self.out_transform = nn.Softmax(dim=1)
    else:
        self.out_transform = out_transform

    self.temperature = nn.Parameter(T.ones(1) * 1.)
    # -------------------------------------------
    return
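
A hedged sketch of wrapping a user-defined PyTorch model in PyModel; TinyGRUClassifier, the hyper-parameter values, and the train_dataset/val_dataset objects (BaseDataset instances as built earlier) are assumptions for illustration.

from argparse import Namespace
import torch.nn as nn
from lightsaber.data_utils.pt_dataset import collate_fn
from lightsaber.trainers.pt_trainer import PyModel

class TinyGRUClassifier(nn.Module):
    """Illustrative base model: forward takes x plus a lengths keyword and returns (logits, hidden)."""
    def __init__(self, input_dim, hidden_dim, n_classes):
        super().__init__()
        self.rnn = nn.GRU(input_dim, hidden_dim, batch_first=True)
        self.head = nn.Linear(hidden_dim, n_classes)

    def forward(self, x, lengths=None, hidden=None):
        out, hidden = self.rnn(x, hidden)
        logits = self.head(out[:, -1, :])     # logits from the last time step
        return logits, hidden

hparams = Namespace(lr=1e-3, batch_size=32)   # read by the default Adam optimizer and the dataloaders
wrapped_model = PyModel(
    hparams,
    TinyGRUClassifier(input_dim=128, hidden_dim=64, n_classes=2),
    train_dataset=train_dataset,              # assumed BaseDataset instances built as shown earlier
    val_dataset=val_dataset,
    collate_fn=collate_fn,
)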

lightsaber.trainers.pt_trainer.run_training_with_mlflow(mlflow_conf, train_args, wrapped_model, train_dataloader=None, val_dataloader=None, test_dataloader=None, cal_dataloader=None, **kwargs)

Function to run supervised training for classification

Parameters:

    mlflow_conf : dict (required)
        mlflow configuration, e.g. MLFLOW_URI
    train_args : Namespace (required)
        namespace with arguments for the pl.Trainer instance. See pytorch_lightning.trainer.Trainer for
        supported options. TODO: potentially hyper-parameters for the model
    wrapped_model : PyModel (required)
        wrapped PyModel
    train_dataloader : DataLoader, default=None
        training dataloader. If not provided, the dataloader is extracted from wrapped_model (backwards compatibility)
    val_dataloader : DataLoader, default=None
        validation dataloader. If not provided, the dataloader is extracted from wrapped_model (backwards compatibility)
    test_dataloader : DataLoader, default=None
        test dataloader. If not provided, the dataloader is extracted from wrapped_model (backwards compatibility)
    cal_dataloader : DataLoader, default=None
        calibration dataloader. If not provided, the dataloader is extracted from wrapped_model (backwards compatibility)
    model_path : str (required)
        prefix for storing the model in MLflow
    artifacts : dict (required)
        any artifacts to be logged by the user
    metrics : Callable (required)
        if specified, used to calculate all metrics; else inferred from the problem type
    run_id : str (required)
        if specified, uses an existing mlflow run
    auto_init_logger : bool, default=True
        if specified, loggers are generated automatically; else assumes the user passed them (planned, not yet implemented)
    kwargs : dict, default={}
        remaining keyword arguments are used as experiment tags

Returns:

    (run_id, run_metrics, y_val, y_val_hat, y_val_pred, y_test, y_test_hat, y_test_pred)
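
A hedged sketch of running training with MLflow tracking; the mlflow_conf keys, the trainer arguments, and the dataloaders are assumptions building on the objects above.

from argparse import Namespace
from torch.utils.data import DataLoader
from lightsaber.trainers import pt_trainer

mlflow_conf = dict(experiment_name='classifier_demo')   # assumed configuration keys, e.g. an MLflow URI may also go here
train_args = Namespace(max_epochs=10)                   # passed through to the pytorch_lightning Trainer

train_dataloader = DataLoader(train_dataset, batch_size=32, collate_fn=collate_fn, shuffle=True)
val_dataloader = DataLoader(val_dataset, batch_size=32, collate_fn=collate_fn)

(run_id, run_metrics,
 y_val, y_val_hat, y_val_pred,
 y_test, y_test_hat, y_test_pred) = pt_trainer.run_training_with_mlflow(
    mlflow_conf, train_args, wrapped_model,
    train_dataloader=train_dataloader,
    val_dataloader=val_dataloader,
)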

lightsaber.trainers.sk_trainer.SKModel

SKModel

Source code in lightsaber/trainers/sk_trainer.py
class SKModel(object):
    """SKModel
    """
    def __init__(self,
                 base_model,
                 model_params=None,
                 name="undefined_model_name"):
        """
        Parameters
        ----------
        base_model:
            base scikit-learn compatible model (classifier) defining model logic
        model_params:
            if provided, sets the model parameters for base_model
        name:
            name of the model
        """
        super(SKModel, self).__init__()
        self.model = base_model
        if model_params is not None:
            try:
                self.set_params(**model_params)
            except Exception as e:
                warnings.warn("couldnt set model params - base_model/model_params inconsitent with scikit-learn")
                log.debug(f'Error in model params:{e}')
        self.__name__ = name

        self.metrics = {}
        self.proba = []
        # self.params = self.model.get_params()

    @property
    def params(self):
        try:
            params = self.model.get_params()
        except AttributeError:
            raise DeprecationWarning("This is deprecated. will be dropped in v0.3. models should be sklearn compatible i.e. should have get_params. moving forward but this will be inconsistent with tuning")
            params = self.model_params
        return params

    def set_params(self, **parameters):
        self.model.set_params(**parameters)
        return self

    def fit(self, X, y, experiment_name=""): # default exp name is timestamp
        """
        Fits self.model to X, given y.
        Args:
          X (np.array): Feature matrix
          y (np.array): Binary labels for prediction
          experiment_name (str): Name for experiment as defined in config, construction of SKModel object
        Returns np.array predictions for each instance in X.
        """
        self.model.fit(X,y)
        # self.params = self.model.get_params()
        return self

    def predict(self, X):
        """
        Uses model to predict labels given input X.
        Args:
          X (np.array): Feature matrix
        Returns np.array predictions for each instance in X.
        """
        return self.model.predict(X)

    def calibrate(self, X, y):
        ccc = CalibratedClassifierCV(self.model, method='isotonic', cv='prefit')
        ccc.fit(X, y)
        self.model = ccc
        #  self.params = self.model.get_params()
        return self

    def tune(self,
             X, y,
             hyper_params,
             experiment_name,
             cv=C.DEFAULT_CV,
             scoring=C.DEFAULT_SCORING_CLASSIFIER,
             ):  ## NEEDS MODIFICATION
        """Tune hyperparameters for model. Uses mlflow to log best model, Gridsearch model, scaler, and best_score

        Parameters
        ----------
        X: np.array
            Feature matrix
        y: np.array
            Binary labels for prediction
        hyper_params:  dict
            Dictionary of hyperparameters and values/settings for model's hyperparameters.
        experiment_name: str
            Name for experiment as defined in config, construction of SKModel object
        cv: int or cv fold
            pre-defined cv generator or number
        """
        gs = GridSearchCV(estimator=self.model,
                          cv=cv,
                          param_grid=hyper_params,
                          verbose=2,
                          scoring=scoring)
        gs.fit(X, y)
        self.model = gs.best_estimator_
        self.set_params(**gs.best_params_)
        return self.model, gs

    def predict_proba(self, X):
        """
        Predicts on X and returns class probabilities

        Parameters
        ----------
        X: np.array
            Feature matrix

        Returns
        -------
        array of shape (n_samples, n_classes)
        """
        return self.model.predict_proba(X)

    def score(self, X, y):
        return self.model.score(X,y)

    def predict_patient(self, patient_id, test_dataloader):
        p_X, _ = test_dataloader.get_patient(patient_id)
        return self.predict_proba(p_X)

__init__(self, base_model, model_params=None, name='undefined_model_name') special

Parameters:

    base_model : (required)
        base scikit-learn compatible model (classifier) defining the model logic
    model_params : default=None
        if provided, sets the model parameters for base_model
    name : default='undefined_model_name'
        name of the model
Source code in lightsaber/trainers/sk_trainer.py
def __init__(self,
             base_model,
             model_params=None,
             name="undefined_model_name"):
    """
    Parameters
    ----------
    base_model:
        base scikit-learn compatible model (classifier) defining model logic
    model_params:
        if provided, sets the model parameters for base_model
    name:
        name of the model
    """
    super(SKModel, self).__init__()
    self.model = base_model
    if model_params is not None:
        try:
            self.set_params(**model_params)
        except Exception as e:
            warnings.warn("couldnt set model params - base_model/model_params inconsitent with scikit-learn")
            log.debug(f'Error in model params:{e}')
    self.__name__ = name

    self.metrics = {}
    self.proba = []
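
A minimal sketch of wrapping a scikit-learn classifier; the estimator and hyper-parameters are arbitrary choices for illustration.

from sklearn.ensemble import RandomForestClassifier
from lightsaber.trainers.sk_trainer import SKModel

# Any scikit-learn compatible classifier can be wrapped; the parameter values are illustrative
wrapped_model = SKModel(
    RandomForestClassifier(),
    model_params=dict(n_estimators=100, max_depth=5),
    name='rf_demo',
)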

lightsaber.trainers.sk_trainer.run_training_with_mlflow(mlflow_conf, wrapped_model, train_dataloader, val_dataloader=None, test_dataloader=None, **kwargs)

Function to run supervised training for classification

Parameters:

    mlflow_conf : dict (required)
        mlflow configuration, e.g. MLFLOW_URI
    wrapped_model : SKModel (required)
        wrapped SKModel
    train_dataloader : SKDataLoader (required)
        training dataloader
    val_dataloader : SKDataLoader, default=None
        validation dataloader
    test_dataloader : SKDataLoader, default=None
        test dataloader
    model_path : str (required)
        prefix for storing the model in MLflow
    artifacts : dict (required)
        any artifacts to be logged by the user
    metrics : Callable (required)
        if specified, used to calculate all metrics; else inferred from the problem type
    tune : bool, default=False
        if specified, tune the model using inner cross-validation. Default: False
    scoring : Callable, default=C.DEFAULT_SCORING_CLASSIFIER
        used when tune=True. sklearn-compatible scoring function used to score models during grid search
    inner_cv : object, default=C.DEFAULT_CV
        used when tune=True. sklearn-compatible cross-validation folds used for grid search
    h_search : dict (required when tune=True)
        sklearn-compatible search space for grid search
    run_id : str (required)
        if specified, uses an existing mlflow run
    kwargs : dict, default={}
        remaining keyword arguments are used as experiment tags

Returns:

    (run_id, run_metrics, y_val, y_val_hat, y_val_pred, y_test, y_test_hat, y_test_pred)
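
A hedged sketch of training (and optionally tuning) the wrapped scikit-learn model; the mlflow_conf keys, the dataloaders, and the search space are assumptions carried over from the examples above.

from lightsaber.trainers import sk_trainer

mlflow_conf = dict(experiment_name='sk_classifier_demo')   # assumed configuration keys

(run_id, run_metrics,
 y_val, y_val_hat, y_val_pred,
 y_test, y_test_hat, y_test_pred) = sk_trainer.run_training_with_mlflow(
    mlflow_conf, wrapped_model, train_dataloader,
    val_dataloader=val_dataloader,                             # SKDataLoader instances assumed built as shown earlier
    tune=True,                                                 # grid search with inner cross-validation
    h_search=dict(n_estimators=[50, 100], max_depth=[3, 5]),   # illustrative search space for the RandomForest above
)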

Model Registration and Load

PyTorch

lightsaber.trainers.pt_trainer.register_model_with_mlflow(run_id, mlflow_conf, wrapped_model, registered_model_name, model_path='model_checkpoint', **artifacts)

Method to register a trained model

Parameters:

    run_id : str (required)
        mlflow run id for the trained model
    mlflow_conf : dict (required)
        mlflow configuration, e.g. MLFLOW_URI
    wrapped_model : PyModel (required)
        model architecture to be logged
    registered_model_name : str (required)
        name under which to register the model
    model_path : str, default='model_checkpoint'
        output path where the model will be logged
    artifacts : dict, default={}
        dictionary of objects to log with the model

lightsaber.trainers.pt_trainer.load_model_from_mlflow(run_id, mlflow_conf, wrapped_model, model_path='model_checkpoint')

Method to load a trained model from mlflow

Parameters:

    run_id : str (required)
        mlflow run id for the trained model
    mlflow_conf : dict (required)
        mlflow configuration, e.g. MLFLOW_URI
    wrapped_model : PyModel (required)
        model architecture into which the saved weights are loaded
    model_path : str, default='model_checkpoint'
        output path where model checkpoints are logged

Returns:

    wrapped model with saved weights and parameters from the run
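
A hedged sketch of registering and later reloading the trained PyTorch model; run_id, mlflow_conf, and wrapped_model are assumed to come from the training example above, and the registry name is arbitrary.

from lightsaber.trainers import pt_trainer

# register the checkpoint from the completed run under a chosen registry name
pt_trainer.register_model_with_mlflow(
    run_id, mlflow_conf, wrapped_model,
    registered_model_name='IHM_GRU_classifier',
)

# later, restore the saved weights into a freshly constructed PyModel of the same architecture
restored_model = pt_trainer.load_model_from_mlflow(run_id, mlflow_conf, wrapped_model)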

Scikit-learn

lightsaber.trainers.sk_trainer.register_model_with_mlflow(run_id, mlflow_conf, wrapped_model=None, registered_model_name=None, model_path='model', **artifacts)

Method to register a trained model

Parameters:

    run_id : str (required)
        mlflow run id for the trained model
    mlflow_conf : dict (required)
        mlflow configuration, e.g. MLFLOW_URI
    wrapped_model : SKModel, default=None
        model architecture to be logged. If not provided, the model is read directly from mlflow
    registered_model_name : str, default=None
        name under which to register the model
    model_path : str, default='model'
        output path where the model will be logged
    artifacts : dict, default={}
        dictionary of objects to log with the model

lightsaber.trainers.sk_trainer.load_model_from_mlflow(run_id, mlflow_conf, wrapped_model=None, model_path='model')

Method to load a trained model from mlflow

Parameters:

    run_id : str (required)
        mlflow run id for the trained model
    mlflow_conf : dict (required)
        mlflow configuration, e.g. MLFLOW_URI
    wrapped_model : SKModel, default=None
        model architecture into which the saved weights are loaded
    model_path : str, default='model'
        output path where model checkpoints are logged

Returns:

    wrapped model with saved weights and parameters from the run
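
A hedged sketch of the equivalent scikit-learn workflow; run_id, mlflow_conf, and wrapped_model are assumed to come from the scikit-learn training example above.

from lightsaber.trainers import sk_trainer

# register the trained scikit-learn model from the completed run
sk_trainer.register_model_with_mlflow(
    run_id, mlflow_conf, wrapped_model,
    registered_model_name='IHM_RF_classifier',
)

# later, reload the saved model and parameters from the run
restored_model = sk_trainer.load_model_from_mlflow(run_id, mlflow_conf, wrapped_model=wrapped_model)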