Source code for deepdow.layers.misc

"""miscellaneous layers."""

import torch
import torch.nn as nn


class Cov2Corr(nn.Module):
    """Conversion from covariance matrix to correlation matrix."""

    def forward(self, covmat):
        """Convert.

        Parameters
        ----------
        covmat : torch.Tensor
            Covariance matrix of shape (n_samples, n_assets, n_assets).

        Returns
        -------
        corrmat : torch.Tensor
            Correlation matrix of shape (n_samples, n_assets, n_assets).

        """
        n_samples, n_assets, _ = covmat.shape
        stds = torch.sqrt(torch.diagonal(covmat, dim1=1, dim2=2))
        stds_ = stds.view(n_samples, n_assets, 1)

        corr = covmat / torch.matmul(stds_, stds_.permute(0, 2, 1))

        return corr
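

# Usage sketch for ``Cov2Corr`` (illustrative, not part of the original module).
# For variances 4 and 9 with covariance 2, the off-diagonal correlation is
# 2 / (2 * 3) = 1/3.
def _demo_cov2corr():
    covmat = torch.tensor([[[4.0, 2.0], [2.0, 9.0]]])  # (n_samples=1, 2, 2)
    corrmat = Cov2Corr()(covmat)
    print(corrmat)  # tensor([[[1.0000, 0.3333], [0.3333, 1.0000]]])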


class CovarianceMatrix(nn.Module):
    """Covariance matrix or its square root.

    Parameters
    ----------
    sqrt : bool
        If True, then returning the square root of the covariance matrix.

    shrinkage_strategy : None or {'diagonal', 'identity', 'scaled_identity'}
        Strategy of combining the sample covariance matrix with some more stable matrix.

    shrinkage_coef : float or None
        If ``float`` then in the range [0, 1] representing the weight of the convex combination.
        If ``shrinkage_coef=1`` then using purely the sample covariance matrix. If
        ``shrinkage_coef=0`` then using purely the stable matrix. If None then needs to be
        provided dynamically when performing forward pass.
    """

    def __init__(
        self, sqrt=True, shrinkage_strategy="diagonal", shrinkage_coef=0.5
    ):
        """Construct."""
        super().__init__()

        self.sqrt = sqrt

        if shrinkage_strategy is not None:
            if shrinkage_strategy not in {
                "diagonal",
                "identity",
                "scaled_identity",
            }:
                raise ValueError(
                    "Unrecognized shrinkage strategy {}".format(
                        shrinkage_strategy
                    )
                )

        self.shrinkage_strategy = shrinkage_strategy
        self.shrinkage_coef = shrinkage_coef

    def forward(self, x, shrinkage_coef=None):
        """Perform forward pass.

        Parameters
        ----------
        x : torch.Tensor
            Of shape (n_samples, dim, n_assets). The middle dimension `dim`
            represents the observations we compute the covariance matrix over.

        shrinkage_coef : None or torch.Tensor
            If None then using the `self.shrinkage_coef` supplied at construction for each sample.
            Otherwise a tensor of shape `(n_samples,)`.

        Returns
        -------
        covmat : torch.Tensor
            Of shape (n_samples, n_assets, n_assets).

        """
        n_samples = x.shape[0]
        dtype, device = x.dtype, x.device

        if not ((shrinkage_coef is None) ^ (self.shrinkage_coef is None)):
            raise ValueError("Not clear which shrinkage coefficient to use")

        if shrinkage_coef is not None:
            shrinkage_coef_ = shrinkage_coef  # (n_samples,)
        else:
            shrinkage_coef_ = self.shrinkage_coef * torch.ones(
                n_samples, dtype=dtype, device=device
            )

        wrapper = self.compute_sqrt if self.sqrt else lambda h: h

        return torch.stack(
            [
                wrapper(
                    self.compute_covariance(
                        x[i].T.clone(),
                        shrinkage_strategy=self.shrinkage_strategy,
                        shrinkage_coef=shrinkage_coef_[i],
                    )
                )
                for i in range(n_samples)
            ],
            dim=0,
        )

    @staticmethod
    def compute_covariance(m, shrinkage_strategy=None, shrinkage_coef=0.5):
        """Compute covariance matrix for a single sample.

        Parameters
        ----------
        m : torch.Tensor
            Of shape (n_assets, n_channels).

        shrinkage_strategy : None or {'diagonal', 'identity', 'scaled_identity'}
            Strategy of combining the sample covariance matrix with some more stable matrix.

        shrinkage_coef : torch.Tensor
            A ``torch.Tensor`` scalar (probably in the range [0, 1]) representing the weight of
            the convex combination.

        Returns
        -------
        covmat_single : torch.Tensor
            Covariance matrix of shape (n_assets, n_assets).

        """
        fact = 1.0 / (m.size(1) - 1)
        m -= torch.mean(m, dim=1, keepdim=True)  # in-place de-meaning; callers pass a clone
        mt = m.t()

        s = fact * m.matmul(mt)  # sample covariance matrix

        if shrinkage_strategy is None:
            return s

        elif shrinkage_strategy == "identity":
            identity = torch.eye(len(s), device=s.device, dtype=s.dtype)

            return shrinkage_coef * s + (1 - shrinkage_coef) * identity

        elif shrinkage_strategy == "scaled_identity":
            identity = torch.eye(len(s), device=s.device, dtype=s.dtype)
            scaled_identity = identity * torch.diag(s).mean()

            return shrinkage_coef * s + (1 - shrinkage_coef) * scaled_identity

        elif shrinkage_strategy == "diagonal":
            diagonal = torch.diag(torch.diag(s))

            return shrinkage_coef * s + (1 - shrinkage_coef) * diagonal

    @staticmethod
    def compute_sqrt(m):
        """Compute the square root of a single positive definite matrix.

        Parameters
        ----------
        m : torch.Tensor
            Tensor of shape `(n_assets, n_assets)` representing the covariance matrix - needs
            to be PSD.

        Returns
        -------
        m_sqrt : torch.Tensor
            Tensor of shape `(n_assets, n_assets)` representing the square root of the
            covariance matrix.

        """
        _, s, v = m.svd()

        good = (
            s > s.max(-1, True).values * s.size(-1) * torch.finfo(s.dtype).eps
        )
        components = good.sum(-1)
        common = components.max()
        unbalanced = common != components.min()
        if common < s.size(-1):
            s = s[..., :common]  # pragma: no cover
            v = v[..., :common]  # pragma: no cover
            if unbalanced:  # pragma: no cover
                good = good[..., :common]  # pragma: no cover
        if unbalanced:
            s = s.where(
                good, torch.zeros((), device=s.device, dtype=s.dtype)
            )  # pragma: no cover

        return (v * s.sqrt().unsqueeze(-2)) @ v.transpose(-2, -1)
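

# Usage sketch for ``CovarianceMatrix`` (illustrative, not part of the original
# module), covering ``forward``, ``compute_covariance`` and ``compute_sqrt``.
# The ``torch.cov`` comparison assumes torch>=1.10.
def _demo_covariance_matrix():
    x = torch.randn(2, 50, 3)  # (n_samples, dim, n_assets)

    # Per-sample shrinkage coefficients supplied at forward time
    layer = CovarianceMatrix(
        sqrt=False, shrinkage_strategy="diagonal", shrinkage_coef=None
    )
    covmat = layer(x, shrinkage_coef=torch.tensor([0.3, 0.9]))
    print(covmat.shape)  # torch.Size([2, 3, 3])

    # Without shrinkage, the result matches the unbiased sample covariance;
    # note the clone since compute_covariance de-means its input in place.
    m = torch.randn(4, 100)  # (n_assets, n_channels)
    s = CovarianceMatrix.compute_covariance(m.clone(), shrinkage_strategy=None)
    print(torch.allclose(s, torch.cov(m), atol=1e-5))  # True

    # The square root multiplied by itself recovers the original PSD matrix
    a = torch.randn(3, 3)
    psd = a @ a.t() + 1e-3 * torch.eye(3)
    root = CovarianceMatrix.compute_sqrt(psd)
    print(torch.allclose(root @ root, psd, atol=1e-4))  # True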


class KMeans(torch.nn.Module):
    """K-means algorithm.

    Parameters
    ----------
    n_clusters : int
        Number of clusters to look for.

    init : str, {'random', 'k-means++', 'manual'}
        How to initialize the clusters at the beginning of the algorithm.

    n_init : int
        Number of times the algorithm is run. The best clustering is determined based on the
        potential (sum of distances of all points to the centroids).

    max_iter : int
        Maximum number of iterations of the algorithm per run. Note that a run stops
        prematurely once `abs(new_potential - old_potential) < tol`.

    tol : float
        If `abs(new_potential - old_potential) < tol` then the algorithm is stopped
        irrespective of `max_iter`.

    random_state : int or None
        Seed for randomness. If not None then `torch.manual_seed` is called before clustering.

    verbose : bool
        Control level of verbosity.
    """

    def __init__(
        self,
        n_clusters=5,
        init="random",
        n_init=1,
        max_iter=30,
        tol=1e-5,
        random_state=None,
        verbose=False,
    ):
        super().__init__()
        self.n_clusters = n_clusters
        self.init = init
        self.n_init = n_init
        self.max_iter = max_iter
        self.tol = tol
        self.random_state = random_state
        self.verbose = verbose

        if self.init not in {"manual", "random", "k-means++"}:
            raise ValueError(
                "Unrecognized initialization {}".format(self.init)
            )

    def initialize(self, x, manual_init=None):
        """Initialize the k-means algorithm.

        Parameters
        ----------
        x : torch.Tensor
            Feature matrix of shape `(n_samples, n_features)`.

        manual_init : None or torch.Tensor
            If not None then expecting a tensor of shape `(n_clusters, n_features)`. Note that
            for this feature to be used one needs to set `init='manual'` in the constructor.

        Returns
        -------
        cluster_centers : torch.Tensor
            Tensor of shape `(n_clusters, n_features)` representing the initial cluster centers.

        """
        n_samples, n_features = x.shape
        device, dtype = x.device, x.dtype

        # Note that normalization to probabilities is done automatically within torch.multinomial
        if self.init == "random":
            p = torch.ones(n_samples, dtype=dtype, device=device)
            centroid_samples = torch.multinomial(
                p, num_samples=self.n_clusters, replacement=False
            )
            cluster_centers = x[centroid_samples]

        elif self.init == "k-means++":
            p = torch.ones(n_samples, dtype=dtype, device=device)
            cluster_centers_l = []
            centroid_samples_l = []
            while len(cluster_centers_l) < self.n_clusters:
                centroid_sample = torch.multinomial(
                    p, num_samples=1, replacement=False
                )

                if centroid_sample in centroid_samples_l:
                    continue  # pragma: no cover
                centroid_samples_l.append(centroid_sample)

                cluster_center = x[[centroid_sample]]  # (1, n_features)
                cluster_centers_l.append(cluster_center)
                # Points far from the latest center are more likely to be sampled next
                p = self.compute_distances(x, cluster_center).view(-1)

            cluster_centers = torch.cat(cluster_centers_l, dim=0)

        elif self.init == "manual":
            if not torch.is_tensor(manual_init):
                raise TypeError("The manual_init needs to be a torch.Tensor")

            if manual_init.shape[0] != self.n_clusters:
                raise ValueError(
                    "The number of manually provided cluster centers is different from n_clusters"
                )

            if manual_init.shape[1] != x.shape[1]:
                raise ValueError(
                    "The feature size of manually provided cluster centers is different from the input"
                )

            cluster_centers = manual_init.to(dtype=dtype, device=device)

        return cluster_centers

    def forward(self, x, manual_init=None):
        """Perform clustering.

        Parameters
        ----------
        x : torch.Tensor
            Feature matrix of shape `(n_samples, n_features)`.

        manual_init : None or torch.Tensor
            If not None then expecting a tensor of shape `(n_clusters, n_features)`. Note that
            for this feature to be used one needs to set `init='manual'` in the constructor.

        Returns
        -------
        cluster_ixs : torch.Tensor
            1D tensor of length `n_samples` representing to what cluster each sample belongs.

        cluster_centers : torch.Tensor
            Tensor of shape `(n_clusters, n_features)` representing the cluster centers.

        """
        n_samples, n_features = x.shape
        if n_samples < self.n_clusters:
            raise ValueError(
                "The number of samples is lower than the number of clusters."
            )

        if self.random_state is not None:
            torch.manual_seed(self.random_state)

        lowest_potential = float("inf")
        lowest_potential_cluster_ixs = None
        lowest_potential_cluster_centers = None

        for run in range(self.n_init):
            cluster_centers = self.initialize(x, manual_init=manual_init)
            previous_potential = float("inf")

            for it in range(self.max_iter):
                distances = self.compute_distances(
                    x, cluster_centers
                )  # (n_samples, n_clusters)

                # E step: assign each sample to the nearest cluster center
                cluster_ixs = torch.argmin(distances, dim=1)  # (n_samples,)

                # M step: recompute each center as the mean of its assigned samples
                cluster_centers = torch.stack(
                    [
                        x[cluster_ixs == i].mean(dim=0)
                        for i in range(self.n_clusters)
                    ],
                    dim=0,
                )

                # stats
                current_potential = distances.gather(
                    1, cluster_ixs.view(-1, 1)
                ).sum()

                if (
                    abs(current_potential - previous_potential) < self.tol
                    or it == self.max_iter - 1
                ):
                    if self.verbose:
                        print(
                            "Run: {}, n_iters: {}, stop_early: {}, potential: {:.3f}".format(
                                run,
                                it,
                                it != self.max_iter - 1,
                                current_potential,
                            )
                        )
                    break

                previous_potential = current_potential

            if current_potential < lowest_potential:
                lowest_potential = current_potential
                lowest_potential_cluster_ixs = cluster_ixs.clone()
                lowest_potential_cluster_centers = cluster_centers.clone()

        if self.verbose:
            print("Lowest potential: {}".format(lowest_potential))

        return lowest_potential_cluster_ixs, lowest_potential_cluster_centers

    @staticmethod
    def compute_distances(x, cluster_centers):
        """Compute squared distances of samples to cluster centers.

        Parameters
        ----------
        x : torch.Tensor
            Tensor of shape `(n_samples, n_features)`.

        cluster_centers : torch.Tensor
            Tensor of shape `(n_clusters, n_features)`.

        Returns
        -------
        distances : torch.Tensor
            Tensor of shape `(n_samples, n_clusters)` that provides for each sample (row) the
            squared distance to a given cluster center (column).

        """
        x_n = (x ** 2).sum(dim=1).view(-1, 1)  # (n_samples, 1)
        c_n = (cluster_centers ** 2).sum(dim=1).view(1, -1)  # (1, n_clusters)

        # ||x - c||^2 = ||x||^2 + ||c||^2 - 2 <x, c>
        distances = (
            x_n + c_n - 2 * torch.mm(x, cluster_centers.permute(1, 0))
        )  # (n_samples, n_clusters)

        return torch.clamp(distances, min=0)
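

# Usage sketch for ``KMeans`` (illustrative, not part of the original module),
# covering ``forward`` with both random and manual initialization, plus a
# ``compute_distances`` consistency check against ``torch.cdist``.
def _demo_kmeans():
    x = torch.randn(100, 2)

    # Standard run with a fixed seed for reproducibility
    kmeans = KMeans(n_clusters=3, init="k-means++", n_init=3, random_state=42)
    cluster_ixs, cluster_centers = kmeans(x)
    print(cluster_ixs.shape, cluster_centers.shape)  # torch.Size([100]) torch.Size([3, 2])

    # Manual initialization requires init='manual' and a (n_clusters, n_features) tensor
    kmeans_manual = KMeans(n_clusters=2, init="manual")
    start = torch.tensor([[-1.0, 0.0], [1.0, 0.0]])
    cluster_ixs, cluster_centers = kmeans_manual(x, manual_init=start)

    # compute_distances returns squared Euclidean distances
    d = KMeans.compute_distances(x, cluster_centers)
    print(torch.allclose(d, torch.cdist(x, cluster_centers) ** 2, atol=1e-4))  # True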


class MultiplyByConstant(torch.nn.Module):
    """Multiplying by a learnable constant.

    Parameters
    ----------
    dim_size : int
        Number of input channels. We learn one constant per channel, so `dim_size` equals the
        number of trainable parameters.

    dim_ix : int
        Which dimension to apply the multiplication to.
    """

    def __init__(self, dim_size=1, dim_ix=1):
        super().__init__()
        self.dim_size = dim_size
        self.dim_ix = dim_ix
        self.constant = torch.nn.Parameter(
            torch.ones(self.dim_size), requires_grad=True
        )

    def forward(self, x):
        """Perform forward pass.

        Parameters
        ----------
        x : torch.Tensor
            N-dimensional tensor of shape (d_0, d_1, ..., d_{N-1}).

        Returns
        -------
        weights : torch.Tensor
            Tensor of shape (d_0, d_1, ..., d_{N-1}).

        """
        if self.dim_size != x.shape[self.dim_ix]:
            raise ValueError(
                "The size of dimension {} is {} which is different than {}".format(
                    self.dim_ix, x.shape[self.dim_ix], self.dim_size
                )
            )
        view = [
            self.dim_size if i == self.dim_ix else 1 for i in range(x.ndim)
        ]

        return x * self.constant.view(view)
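

# Usage sketch for ``MultiplyByConstant`` (illustrative, not part of the original
# module): one learnable constant per channel, with channels sitting at dim_ix=1.
def _demo_multiply_by_constant():
    layer = MultiplyByConstant(dim_size=3, dim_ix=1)
    x = torch.randn(2, 3, 5)
    out = layer(x)
    # Output shape matches the input; out equals x until the constants
    # (initialized to 1) are trained away from their starting values.
    print(out.shape)  # torch.Size([2, 3, 5])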