import copy
import torch
import numpy as np
import torch.nn as nn
# Define the base neural network class
class BaseNN():
'''
Base class for neural networks.
:meta private:
'''
def __init__(self):
'''
Initialize the base neural network class.
INPUT:
- None.
OUTPUT:
- None.
'''
self.set_regularizer() # Define the regularization
def set_regularizer(self):
'''
Set the regularization according to the parameter 'reg_type' (default: no regularization).
INPUT:
- None.
OUTPUT:
- None.
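Example of usage (a minimal sketch; it assumes 'self.params', 'self.dev', and 'self.mdl' are already set, and the values below are purely illustrative):
.. code-block:: python
self.params.update({'reg_type': 'l1_l2', 'reg': [1e-5, 1e-4]})
self.set_regularizer()
penalty = self.regularizer(self.mdl) #Scalar tensor added to the training loss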
'''
#Three available regularization types: l1, l2, and l1_l2
self.reg = torch.tensor(self.params['reg']).to(self.dev)
#If requested, apply l1 regularization
if self.params['reg_type'] == 'l1':
def l1_model_reg(model):
regularization_loss = torch.tensor(0.0).to(self.dev)
for param in model.parameters():
regularization_loss += torch.norm(param, 1)
return self.reg*regularization_loss
self.regularizer = l1_model_reg
#If requested, apply l2 regularization
elif self.params['reg_type'] == 'l2':
def l2_model_reg(model):
regularization_loss = torch.tensor(0.0).to(self.dev)
for param in model.parameters():
regularization_loss += torch.norm(param, 2)
return self.reg*regularization_loss
self.regularizer = l2_model_reg
#If requested, apply l1_l2 regularization
elif self.params['reg_type'] == 'l1_l2':
def l1_l2_model_reg(model):
l1_loss, l2_loss = torch.tensor(0.0).to(self.dev), torch.tensor(0.0).to(self.dev)
for param in model.parameters():
l1_loss += torch.norm(param, 1)
l2_loss += torch.norm(param, 2)
return self.reg[0]*l1_loss + self.reg[1]*l2_loss
self.regularizer = l1_l2_model_reg
#Otherwise, apply no regularization
else:
def no_reg(model):
return torch.tensor(0.0)
self.regularizer = no_reg
def set_optimizer(self):
'''
Set the optimizer according to the parameter 'optimizer' (default: Adam).
INPUT:
- None.
OUTPUT:
- None.
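Example of usage (a minimal sketch; the values below are purely illustrative):
.. code-block:: python
self.params.update({'optimizer': 'rmsprop', 'lr': 3e-4})
self.set_optimizer() #self.opt is now a torch.optim.RMSprop bound to self.mdl.parameters()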
'''
#Two available optimizers: Adam, RMSprop
temp = self.params['optimizer'].lower() if type(self.params['optimizer']) == str else None
if temp == 'adam':
from torch.optim import Adam
self.opt = Adam(self.mdl.parameters(), self.params['lr'])
elif temp == 'rmsprop':
from torch.optim import RMSprop
self.opt = RMSprop(self.mdl.parameters(), self.params['lr'])
else:
raise ValueError(f"Optimizer {temp} not recognized")
def train_single_batch(self, x_train, y_train):
'''
Perform one training step on the full batch.
INPUT:
- x_train: torch.Tensor
model's input of shape (batch_size, n_features).
- y_train: torch.Tensor
model's output of shape (batch_size, 1).
OUTPUT:
- None.
'''
self.mdl.train() #Set the model in training mode
self.opt.zero_grad() # Zero the gradients
outputs = self.mdl(x_train) # Forward pass
loss = self.loss(outputs, y_train) + self.regularizer(self.mdl)
loss.backward() # Backpropagation
self.opt.step() # Update weights
self.train_loss.append(loss.item()) #Save the training loss
def train_multi_batch(self, x_train, y_train, indices):
'''
Perform one training epoch using mini-batches.
INPUT:
- x_train: torch.Tensor
model's input of shape (n_samples, n_features).
- y_train: torch.Tensor
model's output of shape (n_samples, 1).
- indices: torch.Tensor
tensor of sample indices (torch.arange(n_samples)); it is shuffled at every call.
OUTPUT:
- None.
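Example (a minimal sketch of how mini-batches are sliced; the sizes below are purely illustrative):
.. code-block:: python
indices = torch.arange(1000) #1000 training samples
x_batch = x_train[indices[0:128]] #First of ceil(1000 / 128) = 8 mini-batches (batch_size=128)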
'''
self.mdl.train() #Set the model in training mode
#Prepare batch training
total_loss = 0.0
indices = indices[torch.randperm(indices.size(0))] #Shuffle the indices
# Training
for i in range(0, len(indices), self.params['batch_size']):
#Construct the batch
batch_indices = indices[i:i+self.params['batch_size']] #Select the indices
x_batch = x_train[batch_indices] #Select the batch
y_batch = y_train[batch_indices]
self.opt.zero_grad() # Zero the gradients
outputs = self.mdl(x_batch) # Forward pass
loss = self.loss(outputs, y_batch) + self.regularizer(self.mdl)
loss.backward() # Backpropagation
self.opt.step() # Update weights
total_loss += loss.item()
total_loss /= np.ceil(len(indices) / self.params['batch_size'])
self.train_loss.append(total_loss) #Save the training loss
def early_stopping(self, curr_loss):
'''
Early stopping function.
INPUT:
- curr_loss: float,
current loss.
OUTPUT:
- bool,
True if early stopping is satisfied, False otherwise.
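Example (a minimal sketch of the intended usage at the end of an epoch):
.. code-block:: python
stop = self.early_stopping(self.val_loss[-1]) #True once there is no improvement for more than 'patience' epochs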
'''
if curr_loss < self.best_loss:
self.best_loss = curr_loss #Save the best loss
self.best_model = copy.deepcopy(self.mdl.state_dict()) #Save the best model
self.patience = 0 #Reset the patience counter
else:
self.patience += 1
#Exit when the patience counter exceeds the patience threshold
return self.patience > self.params['patience']
def fit(self, x_train, y_train, x_val=None, y_val=None):
'''
Fit the model, with optional early stopping and mini-batch training.
INPUT:
- x_train: torch.Tensor
model's train input of shape (batch_size, n_features).
- y_train: torch.Tensor
model's train output of shape (batch_size, 1).
- x_val: torch.Tensor, optional
model's validation input of shape (batch_size, n_features). Default is None.
- y_val: torch.Tensor, optional
model's validation output of shape (batch_size, 1). Default is None.
OUTPUT:
- None.
'''
#Initialize the best model
self.best_model = self.mdl.state_dict()
self.best_loss = np.inf
#Initialize the patience counter
self.patience = 0
#Initialize the training and validation losses
self.train_loss = []
if isinstance(x_val, torch.Tensor):
self.val_loss = []
else:
self.val_loss = None
#Determine whether to use a single batch or mini-batches
single_batch = (self.params['batch_size'] == -1) or\
(self.params['batch_size'] >= x_train.shape[0])
# If using mini-batches, create the list of training indices
if not single_batch: indices = torch.arange(x_train.shape[0])
# Set the verbosity if it is provided as a boolean:
if isinstance(self.verbose, bool):
self.verbose = int(self.verbose) - 1
#Train the model
if self.verbose >= 0: #If verbose is non-negative, use the progress bar
from tqdm.auto import tqdm
it_base = tqdm(range(self.params['n_epochs']), desc='Training the network') #Create the progress bar
else: #Otherwise, use the standard iterator
it_base = range(self.params['n_epochs']) #Create the iterator
for epoch in it_base:
if (epoch==0) and (self.verbose > 0):
print_base = '{:<10}{:<15}{:<15}'
print(print_base.format('Epoch', 'Train Loss', 'Val Loss'))
# Training
if single_batch:
self.train_single_batch(x_train, y_train)
else:
self.train_multi_batch(x_train, y_train, indices)
# If validation data are provided
if isinstance(x_val, torch.Tensor):
self.mdl.eval() #Set the model in evaluation mode
with torch.no_grad():
val_loss = self.loss(self.mdl(x_val), y_val) #Compute the validation loss
self.val_loss.append(val_loss.item()) #Save the validation loss
# Update the best model and, if needed, stop early
if self.early_stopping(val_loss):
if self.verbose >= 0: print(f"Early stopping at epoch {epoch+1}")
break
else: #Otherwise, use the training loss for model selection
# Update the best model and, if needed, stop early
if self.early_stopping(self.train_loss[-1]):
if self.verbose >= 0: print(f"Early stopping at epoch {epoch+1}")
break
# If requested, print the losses
if self.verbose > 0:
if (epoch+1) % self.verbose == 0:
print(print_base.format(epoch+1,
format(self.train_loss[-1], '.20f')[:10],
format(self.val_loss[-1], '.20f')[:10] if isinstance(x_val, torch.Tensor) else '-'))
#Load the best model
self.mdl.load_state_dict(self.best_model)
def plot_losses(self, yscale='log'):
'''
Plot the training loss and, if available, the validation loss.
INPUT:
- yscale: str, optional
scale of the y-axis. Default is 'log'.
OUTPUT:
- None.
'''
import seaborn as sns
import matplotlib.pyplot as plt
sns.set_theme()
#Plot the losses
fig, ax = plt.subplots(1, 1, figsize=(10,5))
ax.set(yscale=yscale)
sns.lineplot(self.train_loss, label='Train')
if isinstance(self.val_loss, list):
sns.lineplot(self.val_loss, label='Validation')
ax.set_xlabel("Epoch")
ax.set_ylabel("Loss")
ax.set_title("Training and Validation Losses")
plt.legend()
plt.show()
# Define the feedforward neural network class
class FFN(nn.Module):
'''
Class for Feedforward neural networks.
:meta private:
'''
def __init__(self, layers, init, activation, drop, DTYPE=torch.float64):
'''
INPUT:
- layers: list of int
list such that each component is the number of neurons in the corresponding layer.
- init: str
the type of initialization to use for the weights. Either 'glorot_normal' or 'glorot_uniform'.
- activation: str
name of the activation function. Either 'tanh', 'relu', or 'sigmoid'.
- drop: float
dropout rate.
- DTYPE: torch data type.
OUTPUT:
- None.
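Example (a minimal sketch; the architecture below is purely illustrative):
.. code-block:: python
net = FFN(layers=[25, 16, 16, 10], init='glorot_normal', activation='relu', drop=0.2)
out = net(torch.randn(32, 25, dtype=torch.float64)) #Output of shape (32, 10)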
'''
super().__init__()
# Define the layers
self.layers = nn.ModuleList([
nn.Linear(layers[l-1], layers[l], dtype=DTYPE) for\
l in range(1,len(layers))
])
# Initialize the weights
self.weights_initializer(init)
#Define activation function and dropout
self.set_activation_function(activation) #Define activation function
self.dropout = nn.Dropout(drop)
def weights_initializer(self, init):
'''
Initialize the weights.
INPUT:
- init: str
type of initialization to use for the weights.
OUTPUT:
- None.
'''
#Two available initializers: glorot_normal, glorot_uniform
temp = init.lower() if type(init) == str else None
if temp == 'glorot_normal':
for layer in self.layers:
nn.init.xavier_normal_(layer.weight)
elif temp == 'glorot_uniform':
for layer in self.layers:
nn.init.xavier_uniform_(layer.weight)
else:
raise ValueError(f"Initializer {init} not recognized")
def set_activation_function(self, activation):
'''
Set the activation function.
INPUT:
- activation: str
type of activation function to use.
OUTPUT:
- None.
'''
#Three available activation functions: tanh, relu, sigmoid
if activation == 'tanh':
self.activation = nn.Tanh()
elif activation == 'relu':
self.activation = nn.ReLU()
elif activation == 'sigmoid':
self.activation = nn.Sigmoid()
else:
raise ValueError(f"Activation function {activation} not recognized")
def forward(self, x):
'''
INPUT:
- x: torch.Tensor
input of the network; shape (batch_size, n_features).
OUTPUT:
- torch.Tensor
output of the network; shape (batch_size, output_size).
'''
# Forward pass through the network
for layer in self.layers[:-1]:
x = self.activation(layer(x)) #Hidden layers
x = self.dropout(x) #Dropout
output = self.layers[-1](x) #Output layer
return output
# Define the loss function class
class PinballLoss_MultiQ(nn.Module):
'''
Class for the Pinball loss function.
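For a single quantile level q and error e = y_true - y_pred, the pinball loss computed here is
.. math::
\ell_q(e) = \max\left(q \, e, \ (q - 1) \, e\right),
and the forward method returns the mean over the batch of the per-sample sum of this loss across the requested quantile levels.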
:meta private:
'''
def __init__(self, quantiles):
'''
Initialize the Pinball loss function.
INPUT:
- quantiles: list of float
each element is between 0 and 1 and represents a target confidence level.
OUTPUT:
- None.
'''
super().__init__()
self.quantiles = quantiles
def forward(self, y_pred, y_true):
'''
INPUT:
- y_pred: torch.Tensor
quantile forecasts with shape (batch_size, n_quantiles).
- y_true: torch.Tensor
actual values with shape (batch_size, 1).
OUTPUT:
- float
mean pinball loss.
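Example (a minimal sketch with random data; shapes and levels are purely illustrative):
.. code-block:: python
loss_fn = PinballLoss_MultiQ([0.01, 0.025, 0.05])
y_pred = torch.randn(64, 3) #One column per quantile level
y_true = torch.randn(64, 1)
loss = loss_fn(y_pred, y_true) #Scalar pinball loss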
'''
# Ensure to work with torch tensors
if isinstance(y_pred, np.ndarray):
y_pred = torch.from_numpy(y_pred)
if isinstance(y_true, np.ndarray):
y_true = torch.from_numpy(y_true)
#Check consistency in the dimensions
if len(y_pred.shape) == 1:
y_pred = torch.unsqueeze(y_pred, dim=1)
if len(y_true.shape) == 1:
y_true = torch.unsqueeze(y_true, dim=1)
if y_pred.shape[0] != y_true.shape[0]:
raise ValueError(f'Shape[0] of y_pred ({y_pred.shape}) and y_true ({y_true.shape}) do not match!!!')
if y_pred.shape[1] != len(self.quantiles):
raise ValueError(f'Shape[1] of y_pred ({y_pred.shape}) and len(quantiles) ({len(self.quantiles)}) do not match!!!')
if y_true.shape[1] != 1:
raise ValueError(f'Shape[1] of y_true ({y_true.shape}) should be 1!!!')
# Compute the pinball loss
error = y_true - y_pred
loss = torch.zeros(y_true.shape).to(y_true.device)
for q, quantile in enumerate(self.quantiles):
loss += torch.max(quantile * error[:,q:q+1], (quantile - 1) * error[:,q:q+1])
loss = torch.mean(loss)
return loss
# Define the K-QRNN class
class K_QRNN(BaseNN):
'''
Expected Shortfall estimation via the Kratz approach [1], using a Quantile Regression Neural Network (QRNN) for the quantile forecasts.
[1] Kratz, M., Lok, Y. H., & McNeil, A. J. (2018). Multinomial VaR backtests: A simple implicit approach to backtesting expected shortfall. Journal of Banking & Finance, 88, 393-407.
Parameters:
----------------
- theta: float
desired confidence level.
- params: dict
parameters of the model:
- optimizer: str, optional
optimizer to use, either 'Adam' or 'RMSProp'. Default is 'Adam'.
- reg_type: str or None, optional
type of regularization. Either None, 'l1', 'l2', or 'l1_l2'. Default is None.
- reg: float or list of float, optional
regularization parameter. Not considered when reg_type=None.
float when reg_type='l1' or 'l2'. List with two floats (l1 and l2) when
reg_type='l1_l2'. Default is 0.
- initializer: str, optional
initializer for the weights. Either 'glorot_normal' or 'glorot_uniform'.
Default is 'glorot_normal'.
- activation: str, optional
activation function. Either 'relu', 'sigmoid', or 'tanh'. Default is 'relu'.
- lr: float, optional
learning rate. Default is 0.01.
- dropout: float, optional
dropout rate. Default is 0.
- batch_size: int, optional
batch size. Default is -1, that is, full batch. When
batch_size < x_train.shape[0], mini-batch training is performed.
- patience: int, optional
patience for early stopping. Default is np.inf, that is, no early stopping.
- verbose: int, optional
sets after how many epochs the information on the losses is printed. Default is 1.
- dev: torch.device
indicates the device where the model will be trained.
- verbose: bool, optional
if True, print the training progress. Default is True.
Example of usage
----------------
.. code-block:: python
import torch
import numpy as np
from models.kqrnn import K_QRNN #Import the model
y = np.random.randn(1500) #Replace with your data
device = torch.device("cuda:0") #Device to use
theta = 0.05 #Set the desired confidence level
ti, tv = 1000, 1250 #Train and (train+val) lengths
x_lags_Q = 25 #Number of lags fed to the neural network
# Parameters of the neural network
qrnn_par = {
'activation': 'tanh', 'dropout': 0.2, 'reg': 1e-05, 'lr': 3e-05, 'batch_size': 128,
'initializer': 'glorot_normal', 'optimizer': 'rmsprop', 'reg_type': 'l1',
'n_epochs': 15000, 'patience': 500, 'theta': 0.05, 'n_points': 10,
'layers': [x_lags_Q, 16, 128, 128, 10]}
# Prepare the data for the neural models
x = np.concatenate([
y.reshape(-1,1)[k:-x_lags_Q+k] for k in range(x_lags_Q)], axis=1) #Create lagged data
x = torch.tensor(x, dtype=torch.float64).to(device) #x contains the past values of y
y_torch = torch.tensor(
y.reshape(-1,1)[x_lags_Q:], dtype=torch.float64).to(device) #y contains the target
x_train, y_train = x[:ti-x_lags_Q], y_torch[:ti-x_lags_Q] #Train data
x_val, y_val = x[ti-x_lags_Q:tv-x_lags_Q], y_torch[ti-x_lags_Q:tv-x_lags_Q] #Val data
x_test = x[tv-x_lags_Q:] #Test data
mdl = K_QRNN(theta, qrnn_par, device, verbose=False) # Initialize the model
mdl.fit(x_train, y_train, x_val, y_val) # Fit the model
res = mdl.predict(x_test) #Predict
q_pred = res['qf'] #Quantile forecast
es_pred = res['ef'] #Expected shortfall forecast
Methods:
----------------
'''
def __init__(self, theta, params, dev, verbose=True):
self.set_params(params) #Set the parameters
self.dev = dev
self.verbose = verbose
super().__init__()
# Define the model and optimizer
self.mdl = FFN(self.params['layers'], self.params['initializer'],
self.params['activation'], self.params['dropout']).to(self.dev)
self.set_optimizer() #Define the optimizer
# Define the loss function
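# The target levels are n_points equally spaced quantiles in (0, theta]:
# np.linspace(0, theta, n_points+1)[1:] = [theta/n_points, 2*theta/n_points, ..., theta]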
self.loss = PinballLoss_MultiQ(
np.linspace(0, theta, self.params['n_points']+1, endpoint=True)[1:])
def set_params(self, params):
'''
Build the final parameter dictionary by merging the parameters
defined by the user with the default ones.
INPUT:
- params: dict
parameters defined by the user.
OUTPUT:
- None.
:meta private:
'''
self.params = {'optimizer': 'Adam', 'reg_type': None,
'reg': 0, 'initializer': 'glorot_normal', 'activation': 'relu',
'lr': 0.01, 'dropout': 0, 'batch_size':-1,
'patience': np.inf, 'verbose': 1} #Default parameters
self.params.update(params) #Update default parameters with those provided by the user
def fit(self, x_train, y_train, x_val=None, y_val=None):
'''
Fit the model, with optional early stopping and mini-batch training.
INPUT:
- x_train: torch.Tensor
model's train input of shape (batch_size, n_features).
- y_train: torch.Tensor
model's train output of shape (batch_size, 1).
- x_val: torch.Tensor, optional
model's validation input of shape (batch_size, n_features). Default is None.
- y_val: torch.Tensor, optional
model's validation output of shape (batch_size, 1). Default is None.
OUTPUT:
- None.
'''
#Initialize the best model
self.best_model = self.mdl.state_dict()
self.best_loss = np.inf
#Initialize the patience counter
self.patience = 0
#Initialize the training and validation losses
self.train_loss = []
if isinstance(x_val, torch.Tensor):
self.val_loss = []
else:
self.val_loss = None
#Determine whether to use a single batch or mini-batches
single_batch = (self.params['batch_size'] == -1) or\
(self.params['batch_size'] >= x_train.shape[0])
# If using mini-batches, create the list of training indices
if not single_batch: indices = torch.arange(x_train.shape[0])
# Set the verbosity if it is provided as a boolean:
if isinstance(self.verbose, bool):
self.verbose = int(self.verbose) - 1
#Train the model
if self.verbose >= 0: #If verbose is non-negative, use the progress bar
from tqdm.auto import tqdm
it_base = tqdm(range(self.params['n_epochs']), desc='Training the network') #Create the progress bar
else: #Otherwise, use the standard iterator
it_base = range(self.params['n_epochs']) #Create the iterator
for epoch in it_base:
if (epoch==0) and (self.verbose > 0):
print_base = '{:<10}{:<15}{:<15}'
print(print_base.format('Epoch', 'Train Loss', 'Val Loss'))
# Training
if single_batch:
self.train_single_batch(x_train, y_train)
else:
self.train_multi_batch(x_train, y_train, indices)
# If validation data are provided
if isinstance(x_val, torch.Tensor):
self.mdl.eval() #Set the model in evaluation mode
with torch.no_grad():
val_loss = self.loss(self.mdl(x_val), y_val) #Compute the validation loss
self.val_loss.append(val_loss.item()) #Save the validation loss
# Update the best model and, if needed, stop early
if self.early_stopping(val_loss):
if self.verbose >= 0: print(f"Early stopping at epoch {epoch+1}")
break
else: #Otherwise, use the training loss for model selection
# Update the best model and, if needed, stop early
if self.early_stopping(self.train_loss[-1]):
if self.verbose >= 0: print(f"Early stopping at epoch {epoch+1}")
break
# If requested, print the losses
if self.verbose > 0:
if (epoch+1) % self.verbose == 0:
print(print_base.format(epoch+1,
format(self.train_loss[-1], '.20f')[:10],
format(self.val_loss[-1], '.20f')[:10] if isinstance(x_val, torch.Tensor) else '-'))
#Load the best model
self.mdl.load_state_dict(self.best_model)
def predict(self, x_test):
'''
Predict the quantile forecast and the expected shortfall.
INPUT:
- x_test: torch.Tensor
input of the model; shape (batch_size, n_features).
OUTPUT:
- dict, with keys:
- qf: ndarray
quantile forecast of the model at level theta.
- ef: ndarray
expected shortfall predicted by the model.
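Following the Kratz approach used here, the expected shortfall at level theta is estimated as the average of the sub-theta quantile forecasts:
.. math::
\widehat{ES}_\theta = \frac{1}{N} \sum_{j=1}^{N} \widehat{q}_{\,j\theta/N},
where N is 'n_points' and the quantile forecast 'qf' corresponds to the last level, j = N.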
'''
res = self.mdl(x_test)
return {'qf':res[:,-1].cpu().detach().numpy(),
'ef':torch.mean(res, dim=1).cpu().detach().numpy()}
def __call__(self, x_test):
'''
Predict the quantile forecast and the expected shortfall.
INPUT:
- x_test: torch.Tensor
input of the model; shape (batch_size, n_features).
OUTPUT:
- dict, with keys:
- qf: ndarray
quantile forecast of the model at level theta.
- ef: ndarray
expected shortfall predicted by the model.
'''
res = self.mdl(x_test)
return {'qf':res[:,-1].cpu().detach().numpy(),
'ef':torch.mean(res, dim=1).cpu().detach().numpy()}