"""Collection of layers that are using producing weight allocations."""
import cvxpy as cp
from cvxpylayers.torch import CvxpyLayer
import torch
from torch.distributions import MultivariateNormal
import torch.nn as nn
from .misc import Cov2Corr, CovarianceMatrix, KMeans
[docs]class AnalyticalMarkowitz(nn.Module):
"""Minimum variance and maximum sharpe ratio with no constraints.
There exists known analytical solutions so numerical solutions are necessary.
References
----------
[1] http://faculty.washington.edu/ezivot/econ424/portfolioTheoryMatrix.pdf
"""
[docs] def forward(self, covmat, rets=None):
"""Perform forward pass.
Parameters
----------
covmat : torch.Tensor
Covariance matrix of shape `(n_samples, n_assets, n_assets)`.
rets : torch.Tensor or None
If tensor then of shape `(n_samples, n_assets)` representing expected returns. If provided triggers
computation of maximum share ratio. Else None triggers computation of minimum variance portfolio.
Returns
-------
weights : torch.Tensor
Of shape (n_samples, n_assets) representing the optimal weights. If `rets` provided, then it represents
maximum sharpe ratio portfolio (tangency portfolio). Otherwise minimum variance portfolio.
"""
n_samples, n_assets, _ = covmat.shape
device = covmat.device
dtype = covmat.dtype
ones = torch.ones(n_samples, n_assets, 1).to(
device=device, dtype=dtype
)
if rets is not None:
expected_returns = rets.view(n_samples, n_assets, 1)
else:
expected_returns = ones
w_unscaled = torch.linalg.solve(covmat, expected_returns)
denominator = torch.matmul(ones.permute(0, 2, 1), w_unscaled)
w = w_unscaled / denominator
return w.squeeze(-1)
[docs]class NCO(nn.Module):
"""Nested cluster optimization.
This optimization algorithm performs the following steps:
1. Divide all assets into clusters
2. Run standard optimization inside of each of these clusters (intra step)
3. Run standard optimization on the resulting portfolios (inter step)
4. Compute the final weights
Parameters
----------
n_clusters : int
Number of clusters to find in the data. Note that the underlying clustering model is
KMeans - ``deepdow.layers.KMeans``.
n_init : int
Number of runs of the clustering algorithm.
init : str, {'random', 'k-means++'}
Initialization strategy of the clustering algorithm.
random_state : int or None
Random state passed to the stochastic k-means clustering.
See Also
--------
deepdow.layers.KMeans : k-means clustering algorithm
References
----------
[1] M Lopez de Prado.
"A Robust Estimator of the Efficient Frontier"
Available at SSRN 3469961, 2019
"""
def __init__(
self, n_clusters, n_init=10, init="random", random_state=None
):
super().__init__()
self.n_clusters = n_clusters
self.n_init = n_init
self.init = init
self.random_state = random_state
self.cov2corr_layer = Cov2Corr()
self.kmeans_layer = KMeans(
n_clusters=self.n_clusters,
n_init=self.n_init,
init=self.init,
random_state=self.random_state,
)
self.analytical_markowitz_layer = AnalyticalMarkowitz()
[docs] def forward(self, covmat, rets=None):
"""Perform forward pass.
Parameters
----------
covmat : torch.Tensor
Covariance matrix of shape `(n_samples, n_assets, n_assets)`.
rets : torch.Tensor or None
If tensor then of shape `(n_samples, n_assets)` representing expected returns. If provided triggers
computation of maximum share ratio. Else None triggers computation of minimum variance portfolio.
Returns
-------
weights : torch.Tensor
Of shape (n_samples, n_assets) representing the optimal weights. If `rets` provided, then
maximum sharpe ratio portfolio (tangency portfolio) used both on intra and inter cluster level. Otherwise
minimum variance portfolio.
Notes
-----
Currently there is not batching over the sample dimension - simple for loop is used.
"""
n_samples, n_assets, _ = covmat.shape
dtype, device = covmat.dtype, covmat.device
corrmat = Cov2Corr()(covmat)
w_l = (
[]
) # we need to iterate over the sample dimension (currently no speedup)
for i in range(n_samples):
cluster_ixs, cluster_centers = self.kmeans_layer(corrmat[i])
w_intra_clusters = torch.zeros(
(n_assets, self.n_clusters), dtype=dtype, device=device
)
for c in range(self.n_clusters):
in_cluster = torch.where(cluster_ixs == c)[
0
] # indices from the same cluster
intra_covmat = (
covmat[[i]]
.index_select(1, in_cluster)
.index_select(2, in_cluster)
) # (1, ?, ?)
intra_rets = (
None
if rets is None
else rets[[i]].index_select(1, in_cluster)
) # (1, ?)
w_intra_clusters[
in_cluster, c
] = self.analytical_markowitz_layer(intra_covmat, intra_rets)[
0
]
inter_covmat = w_intra_clusters.T @ (
covmat[i] @ w_intra_clusters
) # (n_clusters, n_clusters)
inter_rets = (
None
if rets is None
else (w_intra_clusters.T @ rets[i]).view(1, -1)
) # (1, n_clusters)
w_inter_clusters = self.analytical_markowitz_layer(
inter_covmat.view(1, self.n_clusters, self.n_clusters),
inter_rets,
) # (1, n_clusters)
w_final = (w_intra_clusters * w_inter_clusters).sum(
dim=1
) # (n_assets,)
w_l.append(w_final)
res = torch.stack(w_l, dim=0)
return res
[docs]class NumericalMarkowitz(nn.Module):
"""Convex optimization layer stylized into portfolio optimization problem.
Parameters
----------
n_assets : int
Number of assets.
Attributes
----------
cvxpylayer : CvxpyLayer
Custom layer used by a third party package called cvxpylayers.
References
----------
[1] https://github.com/cvxgrp/cvxpylayers
"""
def __init__(self, n_assets, max_weight=1):
"""Construct."""
super().__init__()
covmat_sqrt = cp.Parameter((n_assets, n_assets))
rets = cp.Parameter(n_assets)
alpha = cp.Parameter(nonneg=True)
w = cp.Variable(n_assets)
ret = rets @ w
risk = cp.sum_squares(covmat_sqrt @ w)
reg = alpha * (cp.norm(w) ** 2)
prob = cp.Problem(
cp.Maximize(ret - risk - reg),
[cp.sum(w) == 1, w >= 0, w <= max_weight],
)
assert prob.is_dpp()
self.cvxpylayer = CvxpyLayer(
prob, parameters=[rets, covmat_sqrt, alpha], variables=[w]
)
[docs] def forward(self, rets, covmat_sqrt, gamma_sqrt, alpha):
"""Perform forward pass.
Parameters
----------
rets : torch.Tensor
Of shape (n_samples, n_assets) representing expected returns (or whatever the feature extractor decided
to encode).
covmat_sqrt : torch.Tensor
Of shape (n_samples, n_assets, n_assets) representing the square of the covariance matrix.
gamma_sqrt : torch.Tensor
Of shape (n_samples,) representing the tradeoff between risk and return - where on efficient frontier
we are.
alpha : torch.Tensor
Of shape (n_samples,) representing how much L2 regularization is applied to weights. Note that
we pass the absolute value of this variable into the optimizer since when creating the problem
we asserted it is going to be nonnegative.
Returns
-------
weights : torch.Tensor
Of shape (n_samples, n_assets) representing the optimal weights as determined by the convex optimizer.
"""
n_samples, n_assets = rets.shape
gamma_sqrt_ = gamma_sqrt.repeat((1, n_assets * n_assets)).view(
n_samples, n_assets, n_assets
)
alpha_abs = torch.abs(alpha) # it needs to be nonnegative
return self.cvxpylayer(rets, gamma_sqrt_ * covmat_sqrt, alpha_abs)[0]
[docs]class Resample(nn.Module):
"""Meta allocator that bootstraps the input expected returns and covariance matrix.
The idea is to take the input covmat and expected returns and view them as parameters of a Multivariate
Normal distribution. After that, we iterate the below steps `n_portfolios` times:
1. Sample `n_draws` from the distribution
2. Estimate expected_returns and covariance matrix
3. Use the `allocator` to compute weights.
This will results in `n_portfolios` portfolios that we simply average to get the final weights.
Parameters
----------
allocator : AnalyticalMarkowitz or NCO or NumericalMarkowitz
Instance of an allocator.
n_draws : int or None
Number of draws. If None then set equal to number of assets to prevent numerical problems.
n_portfolios : int
Number of samples.
sqrt : bool
If True, then the input array represent the square root of the covariance matrix. Else it is the actual
covariance matrix.
random_state : int or None
Random state (forward passes with same parameters will have same results).
References
----------
[1] Michaud, Richard O., and Robert Michaud.
"Estimation error and portfolio optimization: a resampling solution."
Available at SSRN 2658657 (2007)
"""
def __init__(
self,
allocator,
n_draws=None,
n_portfolios=5,
sqrt=False,
random_state=None,
):
super().__init__()
if not isinstance(
allocator, (AnalyticalMarkowitz, NCO, NumericalMarkowitz)
):
raise TypeError(
"Unsupported type of allocator: {}".format(type(allocator))
)
self.allocator = allocator
self.sqrt = sqrt
self.n_draws = n_draws
self.n_portfolios = n_portfolios
self.random_state = random_state
mapper = {
"AnalyticalMarkowitz": False,
"NCO": True,
"NumericalMarkowitz": True,
}
self.uses_sqrt = mapper[allocator.__class__.__name__]
[docs] def forward(self, matrix, rets=None, **kwargs):
"""Perform forward pass.
Only accepts keyword arguments to avoid ambiguity.
Parameters
----------
matrix : torch.Tensor
Of shape (n_samples, n_assets, n_assets) representing the square of the covariance matrix if
`self.square=True` else the covariance matrix itself.
rets : torch.Tensor or None
Of shape (n_samples, n_assets) representing expected returns (or whatever the feature extractor decided
to encode). Note that `NCO` and `AnalyticalMarkowitz` allow for `rets=None` (using only minimum variance).
kwargs : dict
All additional input arguments the `self.allocator` needs to perform forward pass.
Returns
-------
weights : torch.Tensor
Of shape (n_samples, n_assets) representing the optimal weights.
"""
if self.random_state is not None:
torch.manual_seed(self.random_state)
n_samples, n_assets, _ = matrix.shape
dtype, device = matrix.dtype, matrix.device
n_draws = (
self.n_draws or n_assets
) # make sure that if None then we have the same N=M
covmat = matrix @ matrix if self.sqrt else matrix
dist_rets = (
torch.zeros(n_samples, n_assets, dtype=dtype, device=device)
if rets is None
else rets
)
dist = MultivariateNormal(loc=dist_rets, covariance_matrix=covmat)
portfolios = [] # n_portfolios elements of (n_samples, n_assets)
for _ in range(self.n_portfolios):
draws = dist.rsample((n_draws,)) # (n_draws, n_samples, n_assets)
rets_ = (
draws.mean(dim=0) if rets is not None else None
) # (n_samples, n_assets)
covmat_ = CovarianceMatrix(sqrt=self.uses_sqrt)(
draws.permute(1, 0, 2)
) # (n_samples, n_assets, ...)
if isinstance(self.allocator, (AnalyticalMarkowitz, NCO)):
portfolio = self.allocator(covmat=covmat_, rets=rets_)
elif isinstance(self.allocator, NumericalMarkowitz):
gamma = kwargs["gamma"]
alpha = kwargs["alpha"]
portfolio = self.allocator(rets_, covmat_, gamma, alpha)
portfolios.append(portfolio)
portfolios_t = torch.stack(
portfolios, dim=0
) # (n_portfolios, n_samples, n_assets)
return portfolios_t.mean(dim=0)
[docs]class SoftmaxAllocator(torch.nn.Module):
"""Portfolio creation by computing a softmax over the asset dimension with temperature.
Parameters
----------
temperature : None or float
If None, then needs to be provided per sample during forward pass. If ``float`` then assumed
to be always the same.
formulation : str, {'analytical', 'variational'}
Controls what way the problem is solved. If 'analytical' then using an explicit formula,
however, one cannot decide on a `max_weight` different than 1. If `variational` then solved
via convex optimization and one can set any `max_weight`.
n_assets : None or int
Only required and used if `formulation='variational`.
max_weight : float
A float between (0, 1] representing the maximum weight per asset.
"""
def __init__(
self,
temperature=1,
formulation="analytical",
n_assets=None,
max_weight=1,
):
super().__init__()
self.temperature = temperature
if formulation not in {"analytical", "variational"}:
raise ValueError("Unrecognized formulation {}".format(formulation))
if formulation == "variational" and n_assets is None:
raise ValueError(
"One needs to provide n_assets for the variational formulation."
)
if formulation == "analytical" and max_weight != 1:
raise ValueError(
"Cannot constraint weights via max_weight for analytical formulation"
)
if formulation == "variational" and n_assets * max_weight < 1:
raise ValueError(
"One cannot create fully invested portfolio with the given max_weight"
)
self.formulation = formulation
if formulation == "analytical":
self.layer = torch.nn.Softmax(dim=1)
else:
x = cp.Parameter(n_assets)
w = cp.Variable(n_assets)
obj = -x @ w - cp.sum(cp.entr(w))
cons = [cp.sum(w) == 1.0, w <= max_weight]
prob = cp.Problem(cp.Minimize(obj), cons)
self.layer = CvxpyLayer(prob, [x], [w])
[docs] def forward(self, x, temperature=None):
"""Perform forward pass.
Parameters
----------
x : torch.Tensor
Tensor of shape `(n_samples, n_assets`).
temperature : None or torch.Tensor
If None, then using the `temperature` provided at construction time. Otherwise a `torch.Tensor` of shape
`(n_samples,)` representing a per sample temperature.
Returns
-------
weights : torch.Tensor
Tensor of shape `(n_samples, n_assets`).
"""
n_samples, _ = x.shape
device, dtype = x.device, x.dtype
if not ((temperature is None) ^ (self.temperature is None)):
raise ValueError("Not clear which temperature to use")
if temperature is not None:
temperature_ = temperature # (n_samples,)
else:
temperature_ = float(self.temperature) * torch.ones(
n_samples, dtype=dtype, device=device
)
inp = x / temperature_[..., None]
return (
self.layer(inp)
if self.formulation == "analytical"
else self.layer(inp)[0]
)
[docs]class SparsemaxAllocator(torch.nn.Module):
"""Portfolio creation by computing a sparsemax over the asset dimension with temperature.
Parameters
----------
n_assets : int
Number of assets. Note that we require this quantity at construction to make sure
the underlying cvxpylayer does not need to be reinitialized every forward pass.
temperature : None or float
If None, then needs to be provided per sample during forward pass. If ``float`` then
assumed to be always the same.
max_weight : float
A float between (0, 1] representing the maximum weight per asset.
References
----------
[1] Martins, Andre, and Ramon Astudillo. "From softmax to sparsemax: A sparse model of attention
and multi-label classification." International Conference on Machine Learning. 2016.
[2] Malaviya, Chaitanya, Pedro Ferreira, and André FT Martins. "Sparse and constrained attention
for neural machine translation." arXiv preprint arXiv:1805.08241 (2018)
"""
def __init__(self, n_assets, temperature=1, max_weight=1):
super().__init__()
if n_assets * max_weight < 1:
raise ValueError(
"One cannot create fully invested portfolio with the given max_weight"
)
self.n_assets = n_assets
self.temperature = temperature
# Construct convex optimization problem
x = cp.Parameter(n_assets)
w = cp.Variable(n_assets)
obj = cp.sum_squares(x - w)
cons = [cp.sum(w) == 1, 0.0 <= w, w <= max_weight]
prob = cp.Problem(cp.Minimize(obj), cons)
self.layer = CvxpyLayer(prob, parameters=[x], variables=[w])
[docs] def forward(self, x, temperature=None):
"""Perform forward pass.
Parameters
----------
x : torch.Tensor
Tensor of shape `(n_samples, n_assets`).
temperature : None or torch.Tensor
If None, then using the `temperature` provided at construction time. Otherwise a
`torch.Tensor` of shape `(n_samples,)` representing a per sample temperature.
Returns
-------
weights : torch.Tensor
Tensor of shape `(n_samples, n_assets`).
"""
n_samples, _ = x.shape
device, dtype = x.device, x.dtype
if not ((temperature is None) ^ (self.temperature is None)):
raise ValueError("Not clear which temperature to use")
if temperature is not None:
temperature_ = temperature # (n_samples,)
else:
temperature_ = float(self.temperature) * torch.ones(
n_samples, dtype=dtype, device=device
)
inp = x / temperature_[..., None]
return self.layer(inp)[0]
[docs]class WeightNorm(torch.nn.Module):
"""Allocation via weight normalization.
We learn a single weight for each asset and make sure that they sum up to one.
"""
def __init__(self, n_assets):
super().__init__()
self.asset_weights = torch.nn.Parameter(
torch.ones(n_assets), requires_grad=True
)
[docs] def forward(self, x):
"""Perform forward pass.
Parameters
----------
x : torch.Tensor
Tensor of shape `(n_samples, dim_1, ...., dim_N)`.
Returns
-------
weights : torch.Tensor
Tensor of shape `(n_samples, n_assets`).
"""
n_samples = x.shape[0]
clamped = torch.clamp(self.asset_weights, min=0)
normalized = clamped / clamped.sum()
return torch.stack(n_samples * [normalized], dim=0)
[docs]class NumericalRiskBudgeting(nn.Module):
"""Convex optimization layer stylized into portfolio optimization problem.
Parameters
----------
n_assets : int
Number of assets.
Attributes
----------
cvxpylayer : CvxpyLayer
Custom layer used by a third party package called cvxpylayers.
References
----------
[1] https://github.com/cvxgrp/cvxpylayers
[2] https://papers.ssrn.com/sol3/papers.cfm?abstract_id=2297383
[3] https://mpra.ub.uni-muenchen.de/37749/2/MPRA_paper_37749.pdf
"""
def __init__(self, n_assets, max_weight=1):
"""Construct."""
super().__init__()
covmat_sqrt = cp.Parameter((n_assets, n_assets))
b = cp.Parameter(n_assets, nonneg=True)
w = cp.Variable(n_assets)
term_1 = 0.5 * cp.sum_squares(covmat_sqrt @ w)
term_2 = b @ cp.log(w)
objective = cp.Minimize(term_1 - term_2) # refer [2]
constraint = [cp.sum(w) == 1, w >= 0, w <= max_weight] # refer [2]
prob = cp.Problem(objective, constraint)
assert prob.is_dpp()
self.cvxpylayer = CvxpyLayer(
prob, parameters=[covmat_sqrt, b], variables=[w]
)
[docs] def forward(self, covmat_sqrt, b):
"""Perform forward pass.
Parameters
----------
covmat : torch.Tensor
Of shape (n_samples, n_assets, n_assets) representing the covariance matrix.
b : torch.Tensor
Of shape (n_samples, n_assets) representing the budget,
risk contribution from each component (asset) is equal to the budget, refer [3]
Returns
-------
weights : torch.Tensor
Of shape (n_samples, n_assets) representing the optimal weights as determined by the convex optimizer.
"""
return self.cvxpylayer(covmat_sqrt, b)[0]