Source code for deepdow.utils

"""Collection of utilities and helpers."""
import os
import pathlib

import numpy as np
import pandas as pd


class ChangeWorkingDirectory:
    """Context manager that changes current working directory.

    The working directory that is current at the moment the ``with`` block is
    entered is the one restored when the block exits. The same instance can be
    re-used and nested.

    Parameters
    ----------
    directory : str or pathlib.Path or None
        The new working directory. If None then staying in the current one.

    Attributes
    ----------
    directory : pathlib.Path
        Directory that becomes the working directory inside the context.

    _previous : pathlib.Path
        The most recently recorded original working directory. Initialised at
        construction time and refreshed each time the context is entered.

    _origins : list of pathlib.Path
        Stack of directories to return to, one entry per active (nested) entry.

    Raises
    ------
    NotADirectoryError
        If `directory` is not an existing directory at construction time.
    """

    def __init__(self, directory):
        self.directory = pathlib.Path(directory) if directory is not None else pathlib.Path.cwd()

        if not self.directory.is_dir():
            raise NotADirectoryError('{} is not a directory'.format(str(self.directory)))

        self._previous = pathlib.Path.cwd()
        self._origins = []

    def __enter__(self):
        """Change directory and return the context manager itself."""
        origin = pathlib.Path.cwd()
        os.chdir(str(self.directory))

        # Record the origin only after a successful chdir: if chdir raises,
        # __exit__ is never called and the stack must stay balanced.
        self._origins.append(origin)
        self._previous = origin

        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Go back to the original directory."""
        # Fall back to `_previous` when __exit__ is invoked without a matching
        # __enter__ (e.g. called manually).
        target = self._origins.pop() if self._origins else self._previous
        os.chdir(str(target))
class PandasChecks:
    """Namespace of validation helpers for pandas objects.

    Every check returns None on success and raises on failure.
    """

    @staticmethod
    def check_no_gaps(index):
        """Verify that a time index is gapless.

        The index is compared against a regular range that starts at its first
        element, has the same length and uses the frequency stored on the index.

        Parameters
        ----------
        index : pd.DatetimeIndex
            Time index to be checked for gaps.

        Raises
        ------
        TypeError
            If `index` is not a `pd.DatetimeIndex`.

        IndexError
            If the index differs from the regular range, i.e. there is a gap.
        """
        if not isinstance(index, pd.DatetimeIndex):
            raise TypeError('Unsupported type: {}'.format(type(index)))

        expected = pd.date_range(start=index[0], periods=len(index), freq=index.freq)

        if expected.equals(index):
            return

        raise IndexError('Index has gaps.')

    @staticmethod
    def check_valid_entries(table):
        """Verify that every entry of the table is finite (no nan, no +-inf).

        Parameters
        ----------
        table : pd.Series or pd.DataFrame
            Input table.

        Raises
        ------
        TypeError
            Inappropriate type of `table`.

        ValueError
            At least one entry invalid.
        """
        if isinstance(table, (pd.Series, pd.DataFrame)):
            everything_finite = np.isfinite(table.values).all()
        else:
            raise TypeError('Unsupported type: {}'.format(type(table)))

        if not everything_finite:
            raise ValueError('There is an invalid entry')

    @staticmethod
    def check_indices_agree(*frames):
        """Verify that all inputs share the index of the first input.

        For `pd.DataFrame` inputs the columns are additionally required to be
        equal to that same reference index.

        Parameters
        ----------
        frames : list
            Elements are either `pd.Series` or `pd.DataFrame`.

        Raises
        ------
        TypeError
            If elements are not `pd.Series` or `pd.DataFrame`.

        IndexError
            If indices/columns do not agree.
        """
        for candidate in frames:
            if not isinstance(candidate, (pd.Series, pd.DataFrame)):
                raise TypeError('Some elements are not pd.Series or pd.DataFrame')

        expected = frames[0].index

        for position, frame in enumerate(frames):
            if not frame.index.equals(expected):
                raise IndexError('The {} entry has wrong index: {}'.format(position, frame.index))

            if not isinstance(frame, pd.DataFrame):
                continue

            # Columns are compared against the reference *index*.
            if not frame.columns.equals(expected):
                raise IndexError('The {} entry has wrong columns: {}'.format(position, frame.columns))
def prices_to_returns(prices, use_log=True):
    """Turn a table of prices into a table of per-period returns.

    Parameters
    ----------
    prices : pd.DataFrame
        Rows represent different time points and the columns represent different assets. Note that the columns
        can also be a ``pd.MultiIndex``.

    use_log : bool
        If True, then logarithmic returns are used (natural logarithm). If False, then simple returns.

    Returns
    -------
    returns : pd.DataFrame
        Returns per asset per period. The first period is deleted, so the result has one row less than
        `prices` and keeps its columns.
    """
    current = prices.values
    # Prices of the preceding period, aligned row by row with `current`.
    previous = prices.shift(1).values

    if use_log:
        changes = np.log(current) - np.log(previous)
    else:
        changes = (current - previous) / previous

    # The first row has no predecessor and is dropped.
    return pd.DataFrame(changes[1:, :], index=prices.index[1:], columns=prices.columns)
def returns_to_Xy(returns, lookback=10, horizon=10, gap=0):
    """Slice a table of returns into rolling feature / label windows (in memory).

    For every admissible position ``t`` the features are the `lookback` rows ending right before ``t`` and
    the label is made of the `horizon` rows starting `gap` rows after ``t``.

    Parameters
    ----------
    returns : pd.DataFrame
        Returns where columns represent assets and rows timestamps. The last row
        is the most recent.

    lookback : int
        Number of timesteps to include in the features.

    horizon : int
        Number of timesteps to include in the label.

    gap : int
        Integer representing the number of time periods one cannot act after observing the features.

    Returns
    -------
    X : np.ndarray
        Array of shape `(N, 1, lookback, n_assets)`. Generated out of the entire dataset.

    timestamps : pd.DatetimeIndex
        Index corresponding to the feature matrix `X`. Each entry is the timestamp of the last row of the
        respective feature window.

    y : np.ndarray
        Array of shape `(N, 1, horizon, n_assets)`. Generated out of the entire dataset.

    Raises
    ------
    ValueError
        If there are too few rows to build even a single sample.
    """
    n_timesteps = len(returns.index)
    stop = n_timesteps - horizon - gap + 1  # exclusive upper bound of the window positions

    if lookback >= stop:
        raise ValueError('Not enough timesteps to extract X and y.')

    positions = range(lookback, stop)

    features = np.array([returns.iloc[t - lookback: t, :].values for t in positions])
    labels = np.array([returns.iloc[t + gap: t + gap + horizon, :].values for t in positions])
    timestamps = pd.DatetimeIndex([returns.index[t - 1] for t in positions], freq=returns.index.freq)

    # Insert a singleton channel axis right after the sample axis.
    return features[:, np.newaxis, :, :], timestamps, labels[:, np.newaxis, :, :]
def raw_to_Xy(raw_data, lookback=10, horizon=10, gap=0, freq='B', included_assets=None, included_indicators=None,
              use_log=True):
    """Convert raw data to features.

    The raw table is put on a regular time grid (missing rows are forward filled and then backward filled),
    assets containing a non-finite or non-positive entry are dropped, the remaining values are converted to
    returns and finally cut into rolling windows per indicator.

    Parameters
    ----------
    raw_data : pd.DataFrame
        Rows represents different timestamps stored in index. Note that there can be gaps. Columns are
        pd.MultiIndex with the zero level being assets and the first level indicator.

    lookback : int
        Number of timesteps to include in the features.

    horizon : int
        Number of timesteps to include in the label.

    gap : int
        Integer representing the number of time periods one cannot act after observing the features.

    freq : str
        Periodicity of the data.

    included_assets : None or list
        Assets to be included. If None then all assets present in the columns of `raw_data`.

    included_indicators : None or list
        Indicators to be included. If None then all indicators present in the columns of `raw_data`.

    use_log : bool
        If True, then logarithmic returns are used (natural logarithm). If False, then simple returns.

    Returns
    -------
    X : np.ndarray
        Feature array of shape `(n_samples, n_indicators, lookback, n_assets)`.

    timestamps : pd.DatetimeIndex
        Per row timestamp of length `n_samples`.

    y : np.ndarray
        Targets array of shape `(n_samples, n_indicators, horizon, n_assets)`.

    asset_names : list
        Sorted names of the assets that survived the validity filter.

    indicators : list
        List of indicators.

    Raises
    ------
    ValueError
        If `freq` is None.
    """
    if freq is None:
        raise ValueError('Frequency freq needs to be specified.')

    # `MultiIndex.levels` keeps entries that are no longer used by any column (typical after
    # column-slicing a bigger frame). Drop them first so the defaults only name existing columns.
    if included_assets is not None:
        asset_names = included_assets
    else:
        asset_names = raw_data.columns.remove_unused_levels().levels[0].to_list()

    if included_indicators is not None:
        indicators = included_indicators
    else:
        indicators = raw_data.columns.remove_unused_levels().levels[1].to_list()

    # Regular time grid from the first to the last raw timestamp; holes are filled from neighbours.
    index = pd.date_range(start=raw_data.index[0], end=raw_data.index[-1], freq=freq)
    new = pd.DataFrame(raw_data, index=index).ffill().bfill()

    # An asset is unusable as soon as any of its columns has a non-finite or non-positive entry.
    to_exclude = []
    for a in asset_names:
        is_valid = np.all(np.isfinite(new[a])) and np.all(new[a] > 0)
        if not is_valid:
            to_exclude.append(a)

    asset_names = sorted(set(asset_names) - set(to_exclude))

    # Selecting with the sorted list also orders the asset level of the columns.
    absolute = new.iloc[:, new.columns.get_level_values(0).isin(asset_names)][asset_names]
    absolute = absolute.iloc[:, absolute.columns.get_level_values(1).isin(indicators)]

    returns = prices_to_returns(absolute, use_log=use_log)

    X_list = []
    y_list = []
    for ind in indicators:
        # `timestamps` is rebound on every pass; all indicators share the same time index.
        X, timestamps, y = returns_to_Xy(returns.xs(ind, axis=1, level=1),
                                         lookback=lookback,
                                         horizon=horizon,
                                         gap=gap)
        X_list.append(X)
        y_list.append(y)

    # Each per-indicator array has a singleton axis 1; concatenating there yields the indicator axis.
    X = np.concatenate(X_list, axis=1)
    y = np.concatenate(y_list, axis=1)

    return X, timestamps, y, asset_names, indicators