Source code for deepdow.utils

"""Collection of utilities and helpers."""
import os
import pathlib

import numpy as np
import pandas as pd


class ChangeWorkingDirectory:
    """Context manager that temporarily changes the current working directory.

    Parameters
    ----------
    directory : str or pathlib.Path or None
        The new working directory. If None, the working directory at
        construction time is used as the target.

    Attributes
    ----------
    directory : pathlib.Path
        Directory that becomes the working directory inside the context.

    _previous : pathlib.Path
        The working directory we want to return to after exiting the context
        manager. It is initialised at construction time and refreshed each
        time the context is entered.

    Raises
    ------
    NotADirectoryError
        If `directory` is not an existing directory at construction time.
    """

    def __init__(self, directory):
        self.directory = (
            pathlib.Path(directory)
            if directory is not None
            else pathlib.Path.cwd()
        )

        if not self.directory.is_dir():
            raise NotADirectoryError(
                "{} is not a directory".format(str(self.directory))
            )

        # Kept so the attribute exists even before the context is entered.
        self._previous = pathlib.Path.cwd()

    def __enter__(self):
        """Remember the current directory and change to the target one."""
        # Record the directory at entry (not at construction): the process may
        # have moved elsewhere in between, or the instance may be reused.
        self._previous = pathlib.Path.cwd()
        os.chdir(str(self.directory))

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Go back to the directory that was active when entering."""
        os.chdir(str(self._previous))
class PandasChecks:
    """Namespace of validation helpers for pandas objects.

    Every check is a static method that returns ``None`` when the input is
    acceptable and raises an exception otherwise.
    """

    @staticmethod
    def check_no_gaps(index):
        """Verify that a time index is regularly spaced.

        The index is compared with a regular range that starts at its first
        element, has the same length and uses the frequency stored in
        ``index.freq``.

        Parameters
        ----------
        index : pd.DatetimeIndex
            Time index to be checked for gaps.

        Raises
        ------
        TypeError
            If `index` is not a ``pd.DatetimeIndex``.

        IndexError
            If `index` differs from the regular range, i.e. there is a gap.
        """
        if not isinstance(index, pd.DatetimeIndex):
            raise TypeError("Unsupported type: {}".format(type(index)))

        expected = pd.date_range(
            start=index[0], periods=len(index), freq=index.freq
        )

        if expected.equals(index):
            return

        raise IndexError("Index has gaps.")

    @staticmethod
    def check_valid_entries(table):
        """Verify that a table contains only finite entries.

        Parameters
        ----------
        table : pd.Series or pd.DataFrame
            Input table.

        Raises
        ------
        TypeError
            If `table` is neither a ``pd.Series`` nor a ``pd.DataFrame``.

        ValueError
            If at least one entry is nan or +-inf.
        """
        if not isinstance(table, (pd.Series, pd.DataFrame)):
            raise TypeError("Unsupported type: {}".format(type(table)))

        if np.isfinite(table.values).all():
            return

        raise ValueError("There is an invalid entry")

    @staticmethod
    def check_indices_agree(*frames):
        """Verify that all inputs share the same index / columns.

        The index of the first input serves as the reference. Every input
        needs to have an index equal to it and, for a ``pd.DataFrame``, the
        columns need to be equal to that same reference as well.

        Parameters
        ----------
        frames : list
            Elements are either `pd.Series` or `pd.DataFrame`.

        Raises
        ------
        TypeError
            If elements are not `pd.Series` or `pd.DataFrame`.

        IndexError
            If indices/columns do not agree.
        """
        for candidate in frames:
            if not isinstance(candidate, (pd.Series, pd.DataFrame)):
                raise TypeError(
                    "Some elements are not pd.Series or pd.DataFrame"
                )

        reference = frames[0].index

        for position, frame in enumerate(frames):
            if not frame.index.equals(reference):
                raise IndexError(
                    "The {} entry has wrong index: {}".format(
                        position, frame.index
                    )
                )

            if not isinstance(frame, pd.DataFrame):
                continue

            # Columns are compared with the reference *index* on purpose.
            if not frame.columns.equals(reference):
                raise IndexError(
                    "The {} entry has wrong columns: {}".format(
                        position, frame.columns
                    )
                )
def prices_to_returns(prices, use_log=True):
    """Turn a table of prices into a table of period-over-period returns.

    Parameters
    ----------
    prices : pd.DataFrame
        Rows represent different time points and the columns represent
        different assets. Note that the columns can also be a
        ``pd.MultiIndex``.

    use_log : bool
        If True, then logarithmic returns are used (natural logarithm). If
        False, then simple returns.

    Returns
    -------
    returns : pd.DataFrame
        Returns per asset per period. It keeps the columns of `prices` and
        its index without the first entry, because the first period has no
        predecessor to compare with.
    """
    current = prices.values
    previous = prices.shift(1).values  # row t holds the price at t - 1

    if use_log:
        values = np.log(current) - np.log(previous)
    else:
        values = (current - previous) / previous

    # The first row has no previous price and is therefore dropped.
    return pd.DataFrame(
        values[1:, :],
        index=prices.index[1:],
        columns=prices.columns,
    )
def returns_to_Xy(returns, lookback=10, horizon=10, gap=0):
    """Build an in-memory deep learning dataset out of a table of returns.

    A window slides over the rows of `returns`. For each position, the
    `lookback` rows before it form the features, and the `horizon` rows that
    start `gap` rows after it form the label.

    Parameters
    ----------
    returns : pd.DataFrame
        Returns where columns represent assets and rows timestamps. The last
        row is the most recent.

    lookback : int
        Number of timesteps to include in the features.

    horizon : int
        Number of timesteps to include in the label.

    gap : int
        Integer representing the number of time periods one cannot act after
        observing the features.

    Returns
    -------
    X : np.ndarray
        Array of shape `(N, 1, lookback, n_assets)`. Generated out of the
        entire dataset.

    timestamps : pd.DatetimeIndex
        Index corresponding to the feature matrix `X`. Each entry is the
        timestamp of the last row of the respective feature window.

    y : np.ndarray
        Array of shape `(N, 1, horizon, n_assets)`. Generated out of the
        entire dataset.

    Raises
    ------
    ValueError
        If there are not enough rows to extract a single sample.
    """
    n_timesteps = len(returns.index)
    stop = n_timesteps - horizon - gap + 1  # exclusive end of split points

    if lookback >= stop:
        raise ValueError("Not enough timesteps to extract X and y.")

    # A split point is the first row that is no longer part of the features.
    split_points = range(lookback, stop)

    X = np.array(
        [returns.iloc[i - lookback : i, :].values for i in split_points]
    )
    timestamps = pd.DatetimeIndex(
        [returns.index[i - 1] for i in split_points],
        freq=returns.index.freq,
    )
    y = np.array(
        [
            returns.iloc[i + gap : i + gap + horizon, :].values
            for i in split_points
        ]
    )

    # Insert a singleton channel axis right after the sample axis.
    return X[:, np.newaxis, :, :], timestamps, y[:, np.newaxis, :, :]
def raw_to_Xy(
    raw_data,
    lookback=10,
    horizon=10,
    gap=0,
    freq="B",
    included_assets=None,
    included_indicators=None,
    use_log=True,
):
    """Turn raw, possibly gappy data into features and targets.

    The raw table is reindexed to a regular time grid and filled forward and
    then backward. Assets that still contain a non-finite or non-positive
    value are dropped. The remaining values are converted to returns and cut
    into samples, one channel per indicator.

    Parameters
    ----------
    raw_data : pd.DataFrame
        Rows represent different timestamps stored in the index. Note that
        there can be gaps. Columns are a pd.MultiIndex with the zero level
        being assets and the first level indicators.

    lookback : int
        Number of timesteps to include in the features.

    horizon : int
        Number of timesteps to include in the label.

    gap : int
        Integer representing the number of time periods one cannot act after
        observing the features.

    freq : str
        Periodicity of the data.

    included_assets : None or list
        Assets to be included. If None then all available.

    included_indicators : None or list
        Indicators to be included. If None then all available.

    use_log : bool
        If True, then logarithmic returns are used (natural logarithm). If
        False, then simple returns.

    Returns
    -------
    X : np.ndarray
        Feature array of shape `(n_samples, n_indicators, lookback, n_assets)`.

    timestamps : pd.DatetimeIndex
        Per sample timestamp of length `n_samples`.

    y : np.ndarray
        Targets array of shape
        `(n_samples, n_indicators, horizon, n_assets)`.

    asset_names : list
        Sorted names of the assets that were kept.

    indicators : list
        List of indicators.

    Raises
    ------
    ValueError
        If `freq` is None.
    """
    if freq is None:
        raise ValueError("Frequency freq needs to be specified.")

    if included_assets is None:
        asset_names = raw_data.columns.levels[0].to_list()
    else:
        asset_names = included_assets

    if included_indicators is None:
        indicators = raw_data.columns.levels[1].to_list()
    else:
        indicators = included_indicators

    # Put the data on a regular grid and fill the holes this creates.
    full_index = pd.date_range(
        start=raw_data.index[0], end=raw_data.index[-1], freq=freq
    )
    filled = pd.DataFrame(raw_data, index=full_index).ffill().bfill()

    def _is_usable(asset):
        """Tell whether all columns of the asset are finite and positive."""
        block = filled[asset]
        return np.all(np.isfinite(block)) and np.all(block > 0)

    # Note that the check looks at every column of the asset, not only at
    # the included indicators.
    asset_names = sorted({name for name in asset_names if _is_usable(name)})

    asset_mask = filled.columns.get_level_values(0).isin(asset_names)
    # Selecting by the sorted names puts the assets in sorted order.
    absolute = filled.iloc[:, asset_mask][asset_names]

    indicator_mask = absolute.columns.get_level_values(1).isin(indicators)
    absolute = absolute.iloc[:, indicator_mask]

    returns = prices_to_returns(absolute, use_log=use_log)

    X_parts = []
    y_parts = []

    for indicator in indicators:
        X_part, timestamps, y_part = returns_to_Xy(
            returns.xs(indicator, axis=1, level=1),
            lookback=lookback,
            horizon=horizon,
            gap=gap,
        )
        X_parts.append(X_part)
        y_parts.append(y_part)

    # Each indicator contributes one channel.
    X = np.concatenate(X_parts, axis=1)
    y = np.concatenate(y_parts, axis=1)

    return X, timestamps, y, asset_names, indicators