Source code for models.bcgns


import torch
import cupy as cp
import numpy as np
from copy import deepcopy
from itertools import chain
from typing import Optional, Tuple


class AffineSoftplus(torch.nn.Module):
    '''
    :meta private:
    '''
    def __init__(self, dim_in: int, dim_out: int):
        super().__init__()
        self.W = torch.nn.Parameter(torch.empty(dim_in, dim_out, dtype=torch.float32))
        self.b = torch.nn.Parameter(torch.empty(1, dim_out, dtype=torch.float32))
        self.activation = torch.nn.Softplus()
        self.diff_activation = torch.nn.Sigmoid()

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        z = torch.matmul(x, self.W) + self.b
        y = self.activation(z)
        return y
    
    @torch.jit.export
    def forward_diff(self, x: torch.Tensor, dy_prev: Optional[torch.Tensor], only_first_diff: bool) -> Tuple[torch.Tensor, torch.Tensor]:
        z = torch.matmul(x, self.W) + self.b
        diff_activation = self.diff_activation(z).unsqueeze(1)
        if only_first_diff:
            W = self.W[0].unsqueeze(0)
        else:
            W = self.W
        W = W.unsqueeze(0)
        y = self.activation(z)
        if dy_prev is not None:
            dy = (dy_prev @ W) * diff_activation
        else:
            dy = W * diff_activation
        return y, dy

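# Editorial sketch (not part of the original module): forward_diff propagates the
# derivative of the layer output with respect to the first input coordinate by hand,
# using sigmoid(z) as the analytic derivative of softplus(z). The hypothetical helper
# below cross-checks that derivative against autograd for a freshly initialized layer.
def _check_affine_softplus_diff(batch: int = 5, dim_in: int = 3, dim_out: int = 4) -> bool:
    layer = AffineSoftplus(dim_in, dim_out)
    torch.nn.init.normal_(layer.W)
    torch.nn.init.zeros_(layer.b)
    x = torch.randn(batch, dim_in, requires_grad=True)
    _, dy = layer.forward_diff(x, None, only_first_diff=True)  # dy: (batch, 1, dim_out)
    grad = torch.autograd.grad(layer(x).sum(), x)[0][:, 0]     # d(sum_j y_j)/d x_0
    return torch.allclose(dy.sum(2)[:, 0], grad, atol=1e-5)
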
class Affine(torch.nn.Module):
    '''
    :meta private:
    '''
    def __init__(self, dim_in: int, dim_out: int):
        super().__init__()
        self.W = torch.nn.Parameter(torch.empty(dim_in, dim_out, dtype=torch.float32))
        self.b = torch.nn.Parameter(torch.empty(1, dim_out, dtype=torch.float32))
        
    def forward(self, x: torch.Tensor) -> torch.Tensor:
        y = torch.matmul(x, self.W) + self.b
        return y
    
    @torch.jit.export
    def forward_diff(self, x: torch.Tensor, dy_prev: Optional[torch.Tensor], only_first_diff: bool) -> Tuple[torch.Tensor, torch.Tensor]:
        y = torch.matmul(x, self.W) + self.b
        if only_first_diff:
            W = self.W[0].unsqueeze(0)
        else:
            W = self.W
        W = W.unsqueeze(0)
        if dy_prev is not None:
            dy = dy_prev @ W
        else:
            dy = W
        return y, dy

class ModelRandomAlpha(torch.nn.Module):
    '''
    :meta private:
    '''
    def __init__(self, input_dim: int, num_hidden_layers: int, num_hidden_units: int):
        super().__init__()
        h = []
        dim_in = input_dim
        for i in range(num_hidden_layers):
            h.append(AffineSoftplus(dim_in, num_hidden_units))
            dim_in = num_hidden_units
        self.h = torch.nn.ModuleList(h)
        self.o = Affine(num_hidden_units, 1)
        self.register_buffer('x_mean', torch.zeros(1, input_dim, dtype=torch.float32))
        self.register_buffer('x_std', torch.ones(1, input_dim, dtype=torch.float32))
        self.register_buffer('y_mean', torch.zeros(1, 1, dtype=torch.float32))
        self.register_buffer('y_std', torch.ones(1, 1, dtype=torch.float32))
        for l in chain(self.h, (self.o,)):
            torch.nn.init.normal_(l.W, mean=0., std=np.sqrt(1/l.W.shape[0]))
            torch.nn.init.zeros_(l.b)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        a = (x-self.x_mean)/self.x_std
        for l in self.h:
            a = l(a)
        return self.o(a)*self.y_std+self.y_mean
    
    @torch.jit.export
    def forward_diff(self, x: torch.Tensor) -> Tuple[torch.Tensor, torch.Tensor]:
        a = (x-self.x_mean)/self.x_std
        a, da = self.h[0].forward_diff(a, None, True)
        da = da / self.x_std[:, 0, None, None]
        for l in self.h[1:]:
            a, da = l.forward_diff(a, da, False)
        a, da = self.o.forward_diff(a, da, False)
        a = a * self.y_std + self.y_mean
        da = da[:, :, 0] * self.y_std
        return a, da

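# Editorial sketch (not part of the original module): ModelRandomAlpha.forward_diff
# chains the layer-wise derivatives, rescaling by 1/x_std for the input normalization
# and by y_std for the output de-normalization, so it should match autograd's
# derivative of the prediction with respect to the first input coordinate. A
# hypothetical cross-check:
def _check_model_forward_diff(batch: int = 8, dim: int = 5) -> bool:
    mdl = ModelRandomAlpha(dim, 3, 2*dim)
    x = torch.randn(batch, dim, requires_grad=True)
    y, dy = mdl.forward_diff(x)                            # dy: (batch, 1)
    grad = torch.autograd.grad(mdl(x).sum(), x)[0][:, :1]  # d(prediction)/d x_0 per sample
    return torch.allclose(dy, grad, atol=1e-5) and torch.allclose(y, mdl(x))
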
class ModelRandomAlphaPiecewiseAffine(torch.nn.Module):
    '''
    :meta private:
    '''
    def __init__(self, input_dim, num_hidden_layers, num_hidden_units, interpolation_nodes, bottleneck=False, positive_increments=True):
        super().__init__()
        h = []
        dim_in = input_dim-1
        for i in range(num_hidden_layers):
            if bottleneck and i==num_hidden_layers-1:
                h.append(AffineSoftplus(num_hidden_units, 1))
                break
            h.append(AffineSoftplus(dim_in, num_hidden_units))
            dim_in = num_hidden_units
        self.h = torch.nn.ModuleList(h)
        self.o = Affine(num_hidden_units if not bottleneck else 1, interpolation_nodes.shape[0])
        self.positive_increments = positive_increments
        self.register_buffer('x_mean', torch.zeros(1, input_dim, dtype=torch.float32))
        self.register_buffer('x_std', torch.ones(1, input_dim, dtype=torch.float32))
        self.register_buffer('y_mean', torch.zeros(1, 1, dtype=torch.float32))
        self.register_buffer('y_std', torch.ones(1, 1, dtype=torch.float32))
        self.register_buffer('interpolation_nodes', interpolation_nodes)
        self.register_buffer('interpolation_nodes_delta', interpolation_nodes[1:]-interpolation_nodes[:-1])
        for l in chain(self.h, (self.o,)):
            torch.nn.init.normal_(l.W, mean=0., std=np.sqrt(1/l.W.shape[0]))
            torch.nn.init.zeros_(l.b)

    def forward(self, x):
        # x[:, 0] is the interpolation variable (the randomized confidence level in this
        # model family); the remaining columns feed the hidden layers.
        a = (x[:, 1:]-self.x_mean[:, 1:])/self.x_std[:, 1:]
        for l in self.h:
            a = l(a)
        a = self.o(a)
        # Piecewise-affine output in x[:, 0]: a[:, 0] is the base value and a[:, 1:]
        # parametrizes the per-interval increments between the interpolation nodes.
        increments = (
            (torch.minimum(x[:, 0, None], self.interpolation_nodes[None, 1:])
             - self.interpolation_nodes[None, :-1])
            * (self.interpolation_nodes[None, :-1] >= x[:, 0, None])
            * a[:, 1:] / self.interpolation_nodes_delta[None, :]
        )
        if self.positive_increments:
            a = a[:, 0, None] + torch.nn.functional.softplus(increments).sum(1, keepdim=True)
        else:
            a = a[:, 0, None] + increments.sum(1, keepdim=True)
        return a*self.y_std+self.y_mean

@torch.jit.script
def _var_loss(y_pred: torch.Tensor, y_true: torch.Tensor, alpha: torch.Tensor) -> torch.Tensor:
    '''
    :meta private:
    '''
    return torch.mean(torch.relu(y_true-y_pred)+alpha*y_pred)

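# Editorial sketch (not part of the original module): _var_loss(y_pred, y_true, alpha)
# differs from the pinball loss at level 1-alpha only by a term independent of y_pred,
# so the constant prediction minimizing it is the (1-alpha)-quantile of y_true. A
# hypothetical numerical check:
def _check_var_loss_minimizer(alpha: float = 0.975, n: int = 50_000) -> bool:
    torch.manual_seed(0)
    y = torch.randn(n, 1)
    a = torch.tensor(alpha)
    grid = torch.linspace(-4., 4., 1601)
    losses = torch.stack([_var_loss(torch.full_like(y, float(q)), y, a) for q in grid])
    q_loss = grid[losses.argmin()]        # constant prediction minimizing _var_loss
    q_emp = torch.quantile(y, 1 - alpha)  # empirical (1-alpha)-quantile
    return bool(abs(q_loss - q_emp) < 1e-2)
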
def train_single_var(model, X, Y, X_valid, Y_valid, alpha, batch_size, lr, num_epochs):
    '''
    :meta private:
    '''
    assert X.shape[0] == Y.shape[0]
    assert Y.shape[1] == 1
    assert isinstance(alpha, torch.Tensor)
    device = next(model.parameters()).device
    optimizer = torch.optim.Adam(model.parameters())
    model.x_mean.copy_(X.mean(0)[None, :])
    model.x_std.copy_(X.std(0)[None, :])
    model.y_mean.copy_(Y.mean(0)[None, :])
    model.y_std.copy_(Y.std(0)[None, :])
    with torch.no_grad():
        valid_loss = 0
        for b in range((X_valid.shape[0]+batch_size-1)//batch_size):
            idx = slice(b*batch_size, (b+1)*batch_size)
            Y_pred_valid = model(X_valid[idx])
            valid_loss += _var_loss(Y_pred_valid, Y_valid[idx], alpha)/model.y_std
        valid_loss /= b+1
        valid_loss = valid_loss.item()
    best_state_dict = deepcopy(model.state_dict())
    best_valid_loss = valid_loss
    losses = [(valid_loss, best_valid_loss)]
    for e in range(num_epochs):
        for group in optimizer.param_groups:
            group['lr'] = lr/np.sqrt(e+1)
        for b in range((X.shape[0]+batch_size-1)//batch_size):
            idx = slice(b*batch_size, (b+1)*batch_size)
            optimizer.zero_grad()
            Y_pred = model(X[idx])
            loss = _var_loss(Y_pred, Y[idx], alpha)/model.y_std[0, 0]
            loss.backward()
            optimizer.step()
        with torch.no_grad():
            valid_loss = 0
            for b in range((X_valid.shape[0]+batch_size-1)//batch_size):
                idx = slice(b*batch_size, (b+1)*batch_size)
                Y_pred_valid = model(X_valid[idx])
                valid_loss += _var_loss(Y_pred_valid, Y_valid[idx], alpha)/model.y_std
            valid_loss /= b+1
            valid_loss = valid_loss.item()
        if valid_loss < best_valid_loss:
            best_valid_loss = valid_loss
            for k, v in best_state_dict.items():
                v.copy_(model.state_dict()[k])
        losses.append((valid_loss, best_valid_loss))
    model.load_state_dict(best_state_dict)
    return losses

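# Editorial sketch (not part of the original module): standalone use of train_single_var
# on synthetic CPU tensors. The data, model size and the alpha convention (alpha set to
# 1-theta, with BCGNS.fit negating the series before calling this routine) are
# illustrative assumptions, not part of the original code.
def _demo_train_single_var():
    torch.manual_seed(0)
    X = torch.randn(512, 4)
    Y = X.sum(1, keepdim=True) + 0.1*torch.randn(512, 1)
    X_val = torch.randn(128, 4)
    Y_val = X_val.sum(1, keepdim=True) + 0.1*torch.randn(128, 1)
    mdl = torch.jit.script(ModelRandomAlpha(4, 3, 8))
    losses = train_single_var(mdl, X, Y, X_val, Y_val,
                              alpha=torch.tensor(0.975), batch_size=64,
                              lr=0.01, num_epochs=5)
    return losses  # list of (valid_loss, best_valid_loss), including the pre-training entry
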
def train_single_es_regr(model, X, Y, Y_var, alpha, batch_size):
    '''
    :meta private:
    '''
    assert X.shape[0] == Y.shape[0]
    assert Y.shape[1] == 1
    assert isinstance(alpha, torch.Tensor)
    device = next(model.parameters()).device
    model.x_mean.copy_(X.mean(0)[None, :])
    model.x_std.copy_(X.std(0)[None, :])
    with torch.no_grad():
        es_Y = (Y-Y_var).relu_().div_(alpha)+Y_var
    model.y_mean.copy_(es_Y.mean(0)[None, :])
    model.y_std.copy_(es_Y.std(0)[None, :])
    h_aug = torch.empty((batch_size, model.o.W.shape[0]+1), dtype=torch.float32, device=device)
    h_aug[:, 0] = 1
    with torch.no_grad():
        model.o.W.zero_()
        model.o.b.zero_()
        for b in range((X.shape[0]+batch_size-1)//batch_size):
            idx = slice(b*batch_size, (b+1)*batch_size)
            h = X[idx]
            for l in model.h:
                h = l(h)
            h_aug[:, 1:] = h
            with cp.cuda.Device(device.index):
                sol, _, _, _ = cp.linalg.lstsq(cp.asarray(h_aug), cp.asarray((es_Y[idx]-model.y_mean)/model.y_std), rcond=None)
                sol = torch.as_tensor(sol, device=device)
            model.o.W.add_(sol[1:h_aug.shape[1]])
            model.o.b.add_(sol[:1])
        model.o.W /= b+1
        model.o.b /= b+1

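# Editorial note (not part of the original module): train_single_es_regr refits only the
# output layer by least squares, regressing the plug-in ES target (Y - Y_var)^+ / alpha + Y_var
# on the hidden-layer features and averaging the per-batch solutions; the solve runs on GPU
# through CuPy. A hypothetical CPU-only variant using torch.linalg.lstsq on the full dataset
# (same targets, same unnormalized features as above) would look like this:
def _es_last_layer_lstsq_cpu(model, X, Y, Y_var, alpha):
    with torch.no_grad():
        model.x_mean.copy_(X.mean(0)[None, :])
        model.x_std.copy_(X.std(0)[None, :])
        es_Y = (Y - Y_var).relu_().div_(alpha) + Y_var
        model.y_mean.copy_(es_Y.mean(0)[None, :])
        model.y_std.copy_(es_Y.std(0)[None, :])
        h = X
        for l in model.h:
            h = l(h)
        h_aug = torch.cat([torch.ones(h.shape[0], 1), h], dim=1)  # prepend intercept column
        sol = torch.linalg.lstsq(h_aug, (es_Y - model.y_mean)/model.y_std).solution
        model.o.b.copy_(sol[:1])
        model.o.W.copy_(sol[1:])
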
class BCGNS():
    '''
    Wrapper of the neural network model proposed in:
        Barrera, D., Crépey, S., Gobet, E., Nguyen, H. D., & Saadeddine, B. (2022).
        Learning value-at-risk and expected shortfall. arXiv preprint arXiv:2209.06476.
    and implemented in: https://github.com/BouazzaSE/Learning-VaR-and-ES

    Parameters:
    ----------------
    - theta: float
        desired confidence level.
    - dim: int
        dimension of the input space.
    - device: torch.device
        device to use.

    Example of usage
    ----------------
    .. code-block:: python

        import torch
        import numpy as np
        from models.bcgns import BCGNS #Import the model

        y = np.random.randn(1500) #Replace with your data
        device = torch.device("cuda:0") #Device to use
        theta = 0.05 #Set the desired confidence level
        ti, tv = 1000, 1250 #Train and (train+val) lengths
        x_lags_B = 25 #Number of lags to feed the neural network

        # Prepare the data for the neural models
        x = np.concatenate([
            y.reshape(-1,1)[k:-x_lags_B+k] for k in range(x_lags_B)],
            axis=1) #Create lagged data
        x = torch.tensor(x, dtype=torch.float32).to(device) #x contains the past values of y
        y_torch = torch.tensor(
            y.reshape(-1,1)[x_lags_B:], dtype=torch.float32).to(device) #y contains the target
        x_train, y_train = x[:ti-x_lags_B], y_torch[:ti-x_lags_B] #Train data
        x_val, y_val = x[ti-x_lags_B:tv-x_lags_B], y_torch[ti-x_lags_B:tv-x_lags_B] #Val data
        x_test = x[tv-x_lags_B:] #Test data

        mdl = BCGNS(theta, x_train.shape[1], device) #Initialize the model
        mdl.fit(x_train, y_train, x_val, y_val, batch_size=16) #Fit the model
        res = mdl.predict(x_test) #Predict
        q_pred = res['qf'] #Quantile forecast
        es_pred = res['ef'] #Expected shortfall forecast

    Methods:
    ----------------
    '''
    def __init__(self, theta, dim, device):
        self.theta = torch.tensor(1-theta, device=device)
        # Construct the models
        self.var_mdl = torch.jit.script(ModelRandomAlpha(dim, 3, 2*dim)).to(device)
        self.es_mdl = torch.jit.script(ModelRandomAlpha(dim, 3, 2*dim)).to(device)
    def fit(self, x_train_, y_train_, x_val_, y_val_, batch_size):
        '''
        Fit the model.

        INPUTS:
            - x_train_: torch.Tensor
                training input data.
            - y_train_: torch.Tensor
                training output data.
            - x_val_: torch.Tensor
                validation input data.
            - y_val_: torch.Tensor
                validation output data.
            - batch_size: int
                batch size.
        '''
        x_train, y_train, x_val, y_val = -x_train_, -y_train_, -x_val_, -y_val_
        # Train the var model
        _ = train_single_var(self.var_mdl, x_train, y_train, x_val, y_val,
                             self.theta, batch_size, 0.01, 2000)
        # Train the es model
        x_tv = torch.cat([x_train, x_val], dim=0)
        x_tv = x_tv[x_tv.shape[0] % batch_size:]
        y_tv = torch.cat([y_train, y_val], dim=0)
        y_tv = y_tv[y_tv.shape[0] % batch_size:]
        self.es_mdl.load_state_dict(self.var_mdl.state_dict())
        with torch.no_grad():
            y_es_train = self.var_mdl(x_tv)
        train_single_es_regr(
            self.es_mdl, x_tv, y_tv, y_es_train, self.theta, batch_size)
    def predict(self, x_test_):
        '''
        Predict the quantile forecast and expected shortfall.

        INPUTS:
            - x_test_: torch.Tensor
                input data.

        OUTPUTS:
            - qf: ndarray
                quantile forecast.
            - ef: ndarray
                expected shortfall forecast.
        '''
        x_test = -x_test_
        var = self.var_mdl(x_test).cpu().detach().numpy()
        r = self.es_mdl(x_test).cpu().detach().numpy()
        return {'qf':var, 'ef':r*(1-self.theta.item()) + var}
    def __call__(self, x_test_):
        '''
        Predict the quantile forecast and expected shortfall; equivalent to :meth:`predict`.

        INPUTS:
            - x_test_: torch.Tensor
                input data.

        OUTPUTS:
            - qf: ndarray
                quantile forecast.
            - ef: ndarray
                expected shortfall forecast.
        '''
        return self.predict(x_test_)