Source code for deepdow.callbacks

"""Collection of different callbacks."""
import pathlib
import sys

import mlflow
import numpy as np
import pandas as pd
import torch
from torch.utils.tensorboard import SummaryWriter
import tqdm

from .utils import ChangeWorkingDirectory


class Callback:
    """Parent class for all callbacks.

    General construct that allows for taking different actions at different points of the training
    process. One can provide a list of callbacks to the ``deepdow.experiments.Run``.

    Notes
    -----
    To implement new callbacks one needs to subclass this class.
    """
[docs] def on_train_begin(self, metadata): """Take actions at the beginning of the training. Parameters ---------- metadata : dict Dictionary that is going to be populated with relevant data within `Run.launch`. """ pass

    def on_train_interrupt(self, metadata):
        """Take actions on training interruption.

        Parameters
        ----------
        metadata : dict
            Dictionary that is going to be populated with relevant data within `Run.launch`.
            Keys available are 'exception', 'locals'.
        """
        pass

    def on_train_end(self, metadata):
        """Take actions at the end of the training.

        Parameters
        ----------
        metadata : dict
            Dictionary that is going to be populated with relevant data within `Run.launch`.
        """
        pass

    def on_epoch_begin(self, metadata):
        """Take actions at the beginning of an epoch.

        Parameters
        ----------
        metadata : dict
            Dictionary that is going to be populated with relevant data within `Run.launch`.
            Keys available are 'epoch'.
        """
        pass

    def on_epoch_end(self, metadata):
        """Take actions at the end of an epoch.

        Parameters
        ----------
        metadata : dict
            Dictionary that is going to be populated with relevant data within `Run.launch`.
            Keys available are 'epoch', 'n_epochs'.
        """
        pass

    def on_batch_begin(self, metadata):
        """Take actions at the beginning of a batch.

        Parameters
        ----------
        metadata : dict
            Dictionary that is going to be populated with relevant data within `Run.launch`.
            Keys available are 'asset_names', 'batch', 'epoch', 'timestamps', 'X_batch', 'y_batch'.
        """
        pass

    def on_batch_end(self, metadata):
        """Take actions at the end of a batch.

        Parameters
        ----------
        metadata : dict
            Dictionary that is going to be populated with relevant data within `Run.launch`.
            Keys available are 'asset_names', 'batch', 'batch_loss', 'epoch', 'timestamps',
            'weights', 'X_batch', 'y_batch'.
        """
        pass
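
# A minimal sketch (not part of the library) of how a custom callback could look and be
# attached to a run. Only the fact that ``deepdow.experiments.Run`` accepts a list of
# callbacks and invokes these hooks is implied by the docstrings above; the exact ``Run``
# constructor arguments and ``launch`` signature below are illustrative assumptions.
#
#   class PrintEpochCallback(Callback):
#       """Print the epoch number at the end of every epoch."""
#
#       def on_epoch_end(self, metadata):
#           print('Finished epoch {}'.format(metadata['epoch']))
#
#   run = Run(network, loss, train_dataloader, callbacks=[PrintEpochCallback()])
#   run.launch(n_epochs=5)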


class EarlyStoppingException(Exception):
    """Custom exception raised by EarlyStoppingCallback to stop the training."""


class BenchmarkCallback(Callback):
    """Computation of benchmark performance over different metrics and dataloaders.

    Parameters
    ----------
    lookbacks : list or None
        If ``list`` then list of integers representing the different lookbacks. The benchmarks
        will be run for all of them. If None then just the default one implied by the dataloader.

    Attributes
    ----------
    run : deepdow.experiments.Run
        Run instance that is using this callback.

    Notes
    -----
    Very useful for establishing baselines for deep learning models.
    """

    def __init__(self, lookbacks=None):
        self.lookbacks = lookbacks

        self.run = None
[docs] def on_train_begin(self, metadata): """Compute performance of all benchmarks.""" with torch.no_grad(): for dl_name, dl in self.run.val_dataloaders.items(): for bm_name, bm in self.run.models.items(): if bm_name == 'main': continue for batch, (X_batch, y_batch, timestamps_batch, _) in enumerate(dl): X_batch = X_batch.to(self.run.device).to(self.run.dtype) y_batch = y_batch.to(self.run.device).to(self.run.dtype) lookbacks = [] if self.lookbacks is None: lookbacks.append(X_batch.shape[2]) else: lookbacks = self.lookbacks for lb in lookbacks: weights = bm(X_batch[:, :, -lb:, :]) for metric_name, metric in self.run.metrics.items(): metric_per_s = metric(weights, y_batch).detach().cpu().numpy() for metric_value, ts in zip(metric_per_s, timestamps_batch): self.run.history.add_entry(timestamp=ts, epoch=-1, model=bm_name, batch=batch, lookback=lb, dataloader=dl_name, metric=metric_name, value=metric_value) if len(self.run.models) > 1: self.run.history.pretty_print(-1)


class EarlyStoppingCallback(Callback):
    """Early stopping callback.

    In the background, we keep a running minimum of a metric of interest. If it does not change
    for more than `patience` epochs the training is stopped.

    Parameters
    ----------
    dataloader_name : str
        Name of the dataloader, needs to correspond to a key in `val_dataloaders` in
        ``deepdow.experiments.Run``.

    metric_name : str
        Name of the metric to use (the lower the better), needs to correspond to a key in
        `metrics` in ``deepdow.experiments.Run``.

    patience : int
        Number of epochs without improvement before the training is stopped.

    Attributes
    ----------
    min : float
        Running minimum of the metric.

    n_epochs_no_improvement : int
        Number of epochs without improvement - not going below the previous minimum.
    """

    def __init__(self, dataloader_name, metric_name, patience=5):
        self.dataloader_name = dataloader_name
        self.metric_name = metric_name
        self.patience = patience

        self.min = np.inf
        self.n_epochs_no_improvement = 0
        self.run = None  # will be injected with an instance of ``Run``.
[docs] def on_train_begin(self, metadata): """Check if dataloader name and metric name even exist.""" if self.dataloader_name not in self.run.val_dataloaders: raise ValueError('Did not find the dataloader {}'.format(self.dataloader_name)) if self.metric_name not in self.run.metrics: raise ValueError('Did not find the metric {}'.format(self.metric_name))
[docs] def on_epoch_end(self, metadata): """Extract statistics and if necessary stop training.""" epoch = metadata['epoch'] stats = self.run.history.metrics_per_epoch(epoch) if not (len(stats['lookback'].unique()) == 1 and len(stats['model'].unique()) == 1): raise ValueError('EarlyStoppingCallback needs to have a single lookback and model') # pragma: no cover stats_formatted = stats.groupby(['dataloader', 'metric'])['value'].mean().unstack(-1) current_metric = stats_formatted.loc[self.dataloader_name, self.metric_name] if current_metric < self.min: self.min = current_metric self.n_epochs_no_improvement = 0 else: self.n_epochs_no_improvement += 1 # pragma: no cover if self.n_epochs_no_improvement >= self.patience: raise EarlyStoppingException()

    def on_train_interrupt(self, metadata):
        """Handle ``EarlyStoppingException``."""
        ex = metadata['exception']

        if isinstance(ex, EarlyStoppingException):
            msg = 'Training stopped early because there was no improvement in {}_{} for {} epochs'.format(
                self.dataloader_name, self.metric_name, self.patience)
            print(msg)
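
# Illustrative construction (the dataloader and metric names are placeholders that must match
# the keys passed to ``Run`` via ``val_dataloaders`` and ``metrics``):
#
#   early_stopping = EarlyStoppingCallback(dataloader_name='val',
#                                          metric_name='loss',
#                                          patience=3)
#
# Once the metric has not improved for ``patience`` epochs, ``EarlyStoppingException`` is
# raised from ``on_epoch_end`` and then handled in ``on_train_interrupt``.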


class MLFlowCallback(Callback):
    """MLFlow logging callback.

    Parameters
    ----------
    run_name : str or None
        If ``str`` then represents the name of a new run to be created. If None then the user
        either provides `run_id` of an existing run and everything will be logged into it or a
        new run with a random name will be generated.

    mlflow_path : str or pathlib.Path or None
        If ``str`` or ``pathlib.Path`` then represents the absolute path to a folder in which
        `mlruns` lie. If None then the home folder is used.

    experiment_name : str or None
        Experiment to be used. If None then using the default one.

    run_id : str or None
        If provided and `run_name` is None then continuing an existing run. If None then a new
        run is created.

    log_benchmarks : bool
        If True then all benchmarks will be logged under separate mlflow runs.

    Attributes
    ----------
    run : deepdow.experiments.Run
        Run instance that is using this callback.
    """

    def __init__(self, run_name=None, mlflow_path=None, experiment_name=None, run_id=None,
                 log_benchmarks=False):
        self.run_name = run_name
        self.mlflow_path = mlflow_path
        self.experiment_name = experiment_name

        if run_name is not None and run_id is not None:
            raise ValueError('Cannot provide both run_id and run_name')

        self.run_id = run_id
        self.log_benchmarks = log_benchmarks

        self._run_id = run_id or None
        self.run = None

        with ChangeWorkingDirectory(self.mlflow_path):
            self._client = mlflow.tracking.MlflowClient()
[docs] def on_train_begin(self, metadata): """Log hyperparameters and potentially benchmarks performance.""" with ChangeWorkingDirectory(self.mlflow_path): # log some metadata if self.experiment_name is not None: mlflow.set_experiment(self.experiment_name) with mlflow.start_run(run_name=self.run_name, run_id=self._run_id): # if run_id is not None then run_name is ignored self._run_id = mlflow.active_run().info.run_id params = { 'device': self.run.device, 'dtype': self.run.dtype, 'train_dataloader': self.run.train_dataloader.__class__.__name__ } params.update(self.run.train_dataloader.hparams) params.update(self.run.network.hparams) mlflow.log_params(params) if self.log_benchmarks: try: df = self.run.history.metrics_per_epoch(-1) # only benchmarks for bm_name in df['model'].unique(): with mlflow.start_run(run_name=bm_name): temp_df = df[df['model'] == bm_name] metrics = {'_'.join(list(map(lambda x: str(x), k))): v for k, v in temp_df.groupby(['dataloader', 'metric', 'lookback'])['value'].mean().items()} mlflow.log_metrics(metrics, step=0) mlflow.log_metrics(metrics, step=10) except KeyError: return
[docs] def on_epoch_end(self, metadata): """Read relevant results and log into MLflow.""" epoch = metadata.get('epoch') with ChangeWorkingDirectory(self.mlflow_path): # log some metadata if self.experiment_name is not None: mlflow.set_experiment(self.experiment_name) with mlflow.start_run(run_id=self._run_id): try: df = self.run.history.metrics_per_epoch(epoch) metrics = {'_'.join(list(map(lambda x: str(x), k))): v for k, v in df.groupby(['dataloader', 'metric', 'lookback'])['value'].mean().items()} mlflow.log_metrics(metrics, step=epoch) except KeyError: return


class ModelCheckpointCallback(Callback):
    """Model checkpointing callback.

    In the background, we keep a running minimum of a metric of interest.

    Parameters
    ----------
    folder_path : str or pathlib.Path
        Directory to which to save the checkpoints.

    dataloader_name : str
        Name of the dataloader, needs to correspond to a key in `val_dataloaders` in
        ``deepdow.experiments.Run``.

    metric_name : str
        Name of the metric to use (the lower the better), needs to correspond to a key in
        `metrics` in ``deepdow.experiments.Run``.

    verbose : bool
        If True, each checkpointing triggers a message.

    Attributes
    ----------
    min : float
        Running minimum of the metric.
    """

    def __init__(self, folder_path, dataloader_name, metric_name, verbose=False, save_best_only=False):
        self.folder_path = pathlib.Path(folder_path)
        if self.folder_path.is_file():
            raise NotADirectoryError('The checkpointing path needs to be a folder.')

        self.dataloader_name = dataloader_name
        self.metric_name = metric_name
        self.verbose = verbose
        self.save_best_only = save_best_only

        self.min = np.inf
        self.run = None  # will be injected with an instance of ``Run``.
[docs] def on_train_begin(self, metadata): """Check if dataloader name and metric name even exist.""" self.folder_path.mkdir(parents=True, exist_ok=True) if self.dataloader_name not in self.run.val_dataloaders: raise ValueError('Did not find the dataloader {}'.format(self.dataloader_name)) if self.metric_name not in self.run.metrics: raise ValueError('Did not find the metric {}'.format(self.metric_name))
[docs] def on_epoch_end(self, metadata): """Store checkpoint if metric is in its all time low.""" epoch = metadata['epoch'] stats = self.run.history.metrics_per_epoch(epoch) if not (len(stats['lookback'].unique()) == 1 and len(stats['model'].unique()) == 1): raise ValueError('ModelCheckpointCallback needs to have a single lookback and model') # pragma: no cover stats_formatted = stats.groupby(['dataloader', 'metric'])['value'].mean().unstack(-1) current_metric = stats_formatted.loc[self.dataloader_name, self.metric_name] if current_metric < self.min: self.min = current_metric checkpoint_path = self.folder_path / 'model_{:02d}__{:.4f}.pth'.format(epoch, current_metric) torch.save(self.run.network, str(checkpoint_path)) if self.verbose: print('Checkpointed {}'.format(checkpoint_path))


class ProgressBarCallback(Callback):
    """Progress bar reporting remaining steps and relevant metrics.

    Attributes
    ----------
    bar : tqdm.tqdm
        Bar object that is going to be instantiated at the beginning of each epoch.

    metrics : dict
        Keys are equal to `self.run.metrics.keys()` and the values are lists that are appended on
        batch end with after-gradient-step metrics.

    run : Run
        Run object that is running the main training loop. One can get access to multiple useful
        things like the network (`run.network`), train dataloader (`run.train_dataloader`) etc.

    output : str, {'stdout', 'stderr'}
        Where to output the progress bar.
    """

    def __init__(self, output='stderr', n_decimals=3):
        self.bar = None
        self.metrics = {}
        self.n_decimals = n_decimals

        if output == 'stderr':
            self.output = sys.stderr
        elif output == 'stdout':
            self.output = sys.stdout
        else:
            raise ValueError('Unrecognized output {}'.format(output))

        self.run = None

    def on_epoch_begin(self, metadata):
        """Initialize tqdm bar and metric lists."""
        self.bar = tqdm.tqdm(total=len(self.run.train_dataloader),
                             leave=True,
                             desc='Epoch {}'.format(metadata['epoch']),
                             file=self.output)
        self.metrics = {metric: [] for metric in self.run.metrics.keys()}
[docs] def on_epoch_end(self, metadata): """Update finished progress bar with latest epoch metrics.""" epoch = metadata.get('epoch') try: df = self.run.history.metrics_per_epoch(epoch) additional_metrics = {'_'.join(list(map(lambda x: str(x), k))): v for k, v in df.groupby(['dataloader', 'metric'])['value'].mean().items()} except KeyError: # no val_dataloaders additional_metrics = {} old_postfix = self.bar.postfix new_postfix = self.create_custom_postfix_str(additional_metrics) final_postfix = "{}, {}".format(old_postfix, new_postfix) self.bar.set_postfix_str(final_postfix) del self.bar
[docs] def on_batch_end(self, metadata): """Update progress bar with batch metrics.""" weights = metadata.get('weights') y_batch = metadata.get('y_batch') for metric, cal in self.run.metrics.items(): self.metrics[metric].append(cal(weights, y_batch).mean().item()) log_dict = {m: np.mean(vals) for m, vals in self.metrics.items()} self.bar.update() self.bar.set_postfix_str(self.create_custom_postfix_str(log_dict))

    @staticmethod
    def create_custom_postfix_str(metrics, n_decimals=5):
        """Create a custom string with metrics.

        Parameters
        ----------
        metrics : dict
            Keys represent metric names and the values represent the metric values.

        n_decimals : int
            Number of decimals to display.

        Returns
        -------
        formatted : str
            Nicely formatted string to be appended to the progress bar.
        """
        fmt_str = "{}={:." + str(n_decimals) + "f}"
        str_l = [fmt_str.format(k, v) for k, v in metrics.items()]

        return ", ".join(str_l)
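
# ``create_custom_postfix_str`` simply joins "name=value" pairs, e.g.:
#
#   ProgressBarCallback.create_custom_postfix_str({'loss': 0.123456, 'sharpe': 1.5})
#   # -> 'loss=0.12346, sharpe=1.50000'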


class TensorBoardCallback(Callback):
    """Tensorboard logging interface.

    Currently supports:

    - images (evolution of predicted weights over time)
    - histograms (activations of input and outputs of all layers)
    - hyperparameters
    - scalars (logged metrics)

    Parameters
    ----------
    log_dir : None or str or pathlib.Path
        Represents the folder where the checkpoints will be saved. If None then using
        `cwd/runs/CURRENT_DATETIME_HOSTNAME`. Else the exact path.

    ts : datetime.datetime or None
        If ``datetime.datetime``, then only logging specific sample corresponding to provided
        timestamp. If None then logging every sample.

    log_benchmarks : bool
        If True, then benchmark metrics are logged to scalars. The folder is `log_dir / bm_name`.

    Attributes
    ----------
    run : deepdow.experiments.Run
        Run instance that is using this callback.
    """

    def __init__(self, log_dir=None, ts=None, log_benchmarks=False):
        self.log_dir = pathlib.Path(log_dir) if log_dir is not None else pathlib.Path.cwd()
        self.writer = SummaryWriter(self.log_dir)
        self.counter = 0
        self.ts = ts
        self.log_benchmarks = log_benchmarks

        self.run = None
        self.activations = {}
        self.handles = []
        self.weights = []
[docs] def on_train_begin(self, metadata): """Log benchmarks performance.""" n_epochs = metadata.get('n_epochs') if self.log_benchmarks: try: df = self.run.history.metrics_per_epoch(-1) # only benchmarks for bm_name in df['model'].unique(): temp_df = df[df['model'] == bm_name] metrics = {'/'.join(list(map(lambda x: str(x), k))): v for k, v in temp_df.groupby(['dataloader', 'metric', 'lookback'])['value'].mean().items()} bm_writer = SummaryWriter(self.log_dir / bm_name) for metric_name, metric_value in metrics.items(): for global_step in range(n_epochs): bm_writer.add_scalar(metric_name, metric_value, global_step=global_step) except KeyError: return

    def on_batch_begin(self, metadata):
        """Set up forward hooks."""
        timestamps = metadata.get('timestamps')

        if self.ts is not None and self.ts not in timestamps:
            return

        def hook(model, inp, out):
            self.activations[model] = (inp, out)

        for layer in self.run.network._modules.values():
            self.handles.append(layer.register_forward_hook(hook))
[docs] def on_batch_end(self, metadata): """Log activations.""" timestamps = metadata.get('timestamps') weights = metadata.get('weights') # cache weights self.weights.append(pd.DataFrame(weights.detach().cpu().numpy(), index=timestamps)) # add activations self._add_activations(metadata)
[docs] def on_epoch_end(self, metadata): """Log images, metrics and hyperparamters.""" epoch = metadata.get('epoch') n_epochs = metadata.get('n_epochs') # create weight image master_df = pd.concat(self.weights).sort_index() self.writer.add_image('weights', master_df.values[np.newaxis, ...], global_step=metadata['epoch']) self.weights = [] # log scalars try: df = self.run.history.metrics_per_epoch(epoch) metrics = {'/'.join(list(map(lambda x: str(x), k))): v for k, v in df.groupby(['dataloader', 'metric', 'lookback'])['value'].mean().items()} for metric_name, metric_value in metrics.items(): self.writer.add_scalar(metric_name, metric_value, global_step=epoch) if epoch == n_epochs - 1: self.writer.add_hparams(self.run.hparams, metrics) except KeyError: pass

    def _add_activations(self, metadata):
        """Add activations."""
        X_batch = metadata.get('X_batch')
        timestamps = metadata.get('timestamps')

        if self.ts is not None and self.ts not in timestamps:
            return

        ix = timestamps.index(self.ts) if self.ts is not None else list(range(len(X_batch)))

        self.writer.add_histogram(tag='inputs', values=X_batch[ix], global_step=self.counter)

        for s, io in self.activations.items():
            for i, x in enumerate(io):
                if torch.is_tensor(x):
                    self.writer.add_histogram(s.__class__.__name__ + "_{}".format('inp' if i == 0 else 'out'),
                                              x[ix],
                                              global_step=self.counter)
                else:
                    for j, y in enumerate(x):
                        if y is None:
                            continue  # pragma: no cover
                        self.writer.add_histogram(s.__class__.__name__ + "_{}_{}".format('inp' if i == 0 else 'out', j),
                                                  y[ix],
                                                  global_step=self.counter)

        for handle in self.handles:
            handle.remove()

        self.handles = []
        self.activations = {}

        self.counter += 1
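
# Typical usage sketch (the log directory is an arbitrary example). The resulting event files
# can be inspected with ``tensorboard --logdir runs/deepdow``:
#
#   tb_cb = TensorBoardCallback(log_dir='runs/deepdow', log_benchmarks=True)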


class ValidationCallback(Callback):
    """Logging of all metrics for all validation dataloaders.

    Parameters
    ----------
    freq : int
        With what frequency to compute metrics. If equal to 1 then every epoch. The higher the
        less frequent the logging will be.

    lookbacks : list or None
        If ``list`` then list of integers representing the different lookbacks. The benchmarks
        will be run for all of them. If None then just the default one implied by the dataloader.

    Attributes
    ----------
    run : deepdow.experiments.Run
        Run instance that is using this callback.
    """

    def __init__(self, freq=1, lookbacks=None):
        self.freq = freq
        self.lookbacks = lookbacks

        self.run = None  # to be populated later
[docs] def on_epoch_end(self, metadata): """Compute metrics and log them into the history object.""" epoch = metadata.get('epoch') model = self.run.network model.eval() if epoch % self.freq == 0: with torch.no_grad(): for dl_name, dl in self.run.val_dataloaders.items(): for batch, (X_batch, y_batch, timestamps_batch, _) in enumerate(dl): X_batch = X_batch.to(self.run.device).to(self.run.dtype) y_batch = y_batch.to(self.run.device).to(self.run.dtype) lookbacks = [] if self.lookbacks is None: lookbacks.append(X_batch.shape[2]) else: lookbacks = self.lookbacks for lb in lookbacks: weights = model(X_batch[:, :, -lb:, :]) for metric_name, metric in self.run.metrics.items(): metric_per_s = metric(weights, y_batch).detach().cpu().numpy() for metric_value, ts in zip(metric_per_s, timestamps_batch): self.run.history.add_entry(timestamp=ts, model='network', epoch=epoch, batch=batch, lookback=lb, dataloader=dl_name, metric=metric_name, value=metric_value)