import numpy as np
import multiprocessing as mp
from models.caviar import CAViaR
import warnings
class CAESar_base():
'''
:meta private:
'''
def __init__(self, theta, lambdas=dict()):
self.theta = theta #Initialize theta
self.lambdas = {'r':10, 'q':10, 'e':10}
self.lambdas.update(lambdas) #Initialize lambdas for soft constraints
def loss_function(self, v, r, y):
'''
Compute the loss function (Barrera) of the model.
INPUTS:
- v: ndarray
Value at Risk.
- r: ndarray
difference between Expected Shortfall forecast and Value at Risk.
- y: ndarray
target time series.
OUTPUTS:
- float
Barrera loss value.
'''
loss_val = np.mean(
(r - np.where(y<v, (y-v)/self.theta, 0))**2
) + self.lambdas['r'] * np.mean( np.where(r>0, r, 0) )
return loss_val
def joint_loss(self, v, e, y):
'''
Compute the loss function (Patton) of the model.
INPUTS:
- v: ndarray
Value at Risk forecast.
- e: ndarray
Expected Shortfall forecast.
- y: ndarray
target time series.
OUTPUTS:
- float
Patton loss value.
'''
loss_val = np.mean(
np.where(y<=v, (y-v)/(self.theta*e), 0) + v/e + np.log(-e)
) + self.lambdas['e'] * np.mean( np.where(e>v, e-v, 0) ) +\
self.lambdas['q'] * np.mean( np.where(v>0, v, 0) )
return loss_val
def ESloss(self, beta, y, q, r0):
'''
Compute the Barrera loss of the model.
INPUTS:
- beta: ndarray
parameters of the model. The shape is (self.n_parameters,).
- y: ndarray
target time series.
- q: ndarray
quantile forecast.
- r0: float
initial point for the ^r forecast.
OUTPUTS:
- float
loss value.
'''
r = self.loop(beta, y, q, r0) #Compute ^r
loss_val = self.loss_function(q, r, y) #Compute loss
return loss_val
def Jointloss(self, beta, y, q0, e0):
'''
Compute the Patton loss of the model.
INPUTS:
- beta: ndarray
parameters of the model. The shape is (2*self.n_parameters,).
- y: ndarray
target time series.
- q0: float
initial point for the ^q forecast.
- e0: float
initial point for the ^e forecast.
OUTPUTS:
- float
loss value.
'''
q, e = self.joint_loop(beta, y, q0, e0) #Compute ^q and ^e
loss_val = self.joint_loss(q, e, y) #Compute loss
return loss_val
def optim4mp(self, yi, qi, r0, beta0, n_rep, pipend):
'''
Optimization routine for multiprocessing.
INPUTS:
- yi: ndarray
target time series.
- qi: ndarray
quantile forecast.
- r0: float
initial point for the ^r forecast.
- beta0: ndarray
initial point for the optimization. The shape is (self.n_parameters,).
- n_rep: int
number of repetitions for the optimization.
- pipend: multiprocessing.connection.Connection
pipe end for communicating multiprocessing.
OUTPUTS:
- None.
'''
from scipy.optimize import minimize
# First iteration
res = minimize(
lambda x: self.ESloss(x, yi, qi, r0), beta0,
method='SLSQP', options={'disp':False})
beta_worker, fval_beta_worker, exitflag_worker = res.x, res.fun, int(res.success)
# Iterate until the optimization is successful or the maximum number of repetitions is reached
for _ in range(n_rep):
res = minimize(
lambda x: self.ESloss(x, yi, qi, r0), beta_worker,
method='SLSQP', options={'disp':False})
beta_worker, fval_beta_worker, exitflag_worker = res.x, res.fun, int(res.success)
#If optimization is successful, exit the loop (no need to iterate further repetitions)
if exitflag_worker == 1:
break
# Communicate the results to the main process
pipend.send((beta_worker, fval_beta_worker, exitflag_worker))
def joint_optim(self, yi, q0, e0, beta0, n_rep):
'''
Joint optimization routine.
INPUTS:
- yi: ndarray
target time series.
- q0: float
initial point for the ^q forecast.
- e0: float
initial point for the ^e forecast.
- beta0: ndarray
initial point for the optimization. The shape is (2*self.n_parameters,).
- n_rep: int
number of repetitions for the optimization.
OUTPUTS:
- ndarray
optimized coefficients of the model. The shape is (2*self.n_parameters,).
'''
from scipy.optimize import minimize
# First iteration
res = minimize(
lambda x: self.Jointloss(x, yi, q0, e0), beta0,
method='SLSQP', options={'disp':False})
beta_worker, exitflag_worker = res.x, int(res.success)
# Iterate until the optimization is successful or the maximum number of repetitions is reached
for _ in range(n_rep):
res = minimize(
lambda x: self.Jointloss(x, yi, q0, e0), beta_worker,
method='SLSQP', options={'disp':False})
beta_worker, exitflag_worker = res.x, int(res.success)
#If optimization is successful, exit the loop (no need to iterate further repetitions)
if exitflag_worker == 1:
break
return beta_worker
def fit_predict(self, y, ti, seed=None, return_train=True, q0=None, nV=102, n_init=3, n_rep=5):
'''
Fit and predict the CAESar model.
INPUTS:
- y: ndarray
target time series.
- ti: int
train set length.
- seed: int or None, optional
random seed. Default is None.
- return_train: bool, optional
return the train set. Default is True.
- q0: list of float or None, optional
[initial quantile, initial expected shortfall]. If None, the initial quantile
is computed as the empirical quantile in the first 10% of the
training set; the initial expected shortfall as the tail mean in the same
subset. Default is None.
- nV: int, optional
number of random initializations of the model coefficients. Default is 102.
- n_init: int, optional
number of best initializations to work with. Default is 3.
- n_rep: int, optional
number of repetitions for the optimization. Default is 5.
OUTPUTS:
- ndarray
quantile forecast in the training set (if return_train=True).
- ndarray
expected shortfall forecast in the training set (if return_train=True).
- ndarray
quantile forecast in the test set.
- ndarray
expected shortfall forecast in the test set.
- ndarray
optimized coefficients of the model. The shape is (2, self.n_parameters).
'''
yi, yf = y[:ti], y[ti:] #Split train and test
if return_train:
res_train = self.fit(yi, seed=seed, return_train=True, q0=q0, nV=nV, n_init=n_init, n_rep=n_rep) #Train AE
res_test = self.predict(yf)
return {'qi':res_train['qi'], 'ei':res_train['ei'],
'qf':res_test['qf'], 'ef':res_test['ef'],
'beta':self.beta.reshape((2, self.n_parameters))} #Return prediction
else:
self.fit(yi, seed=seed, return_train=False, q0=q0, nV=nV, n_init=n_init, n_rep=n_rep) #Train AE
res_test = self.predict(yf)
return {'qf':res_test['qf'], 'ef':res_test['ef'],
'beta':self.beta.reshape((2, self.n_parameters))} #Return prediction
class CAESar_general(CAESar_base):
'''
CAESar for joint quantile and expected shortfall estimation
:meta private:
'''
def __init__(self, theta, spec='AS', lambdas=dict(), p=1, u=1):
'''
Initialization of the CAESar model.
INPUTS:
- theta: float
desired confidence level.
- spec: str, optional
specification of the model (SAV, AS, GARCH). Default is AS.
- lambdas: dict, optional
lambdas for the soft constraints. Default is {'r':10, 'q':10, 'e':10}.
- p: int, optional
number of y_t lags for the model. Default is 1.
- u: int, optional
number of ^q_t lags for the model. Default is 1.
OUTPUTS:
- None.
'''
super().__init__(theta, lambdas) #Initialize the base class
# Initialize p, u, and v
self.p, self.u, self.max_lag = p, u, np.max([p, u])
# According to the desired model, initialize the number of parameters and the loop
if spec == 'SAV':
self.mdl_spec = 'SAV'
self.n_parameters = 1+p+2*u
self.loop = self.R_SAVloop
self.joint_loop = self.Joint_SAVloop
elif spec == 'AS':
self.mdl_spec = 'AS'
self.n_parameters = 1+2*p+2*u
self.loop = self.R_ASloop
self.joint_loop = self.Joint_ASloop
elif spec == 'GARCH':
self.mdl_spec = 'GARCH'
self.n_parameters = 1+p+2*u
self.loop = self.R_GARCHloop
self.joint_loop = self.Joint_GARCHloop
else:
raise ValueError(f'Specification {spec} not recognized!\nChoose between "SAV", "AS", and "GARCH"')
def R_SAVloop(self, beta, y, q, r0, pred_mode=False):
'''
Loop for the ^r estimation via SAV specification.
INPUTS:
- beta: ndarray
parameters of the model. The shape is (4,).
- y: ndarray
target time series.
- q: ndarray
quantile forecast.
- r0: float or list of floats
if pred_mode=False, r0 is a float describing initial point for the
^r forecast. If pred_mode=True, r0 is a list of floats describing the
last state of the system, that is r0=[y[-1], q[-1], ^r[-1]].
- pred_mode: bool, optional
if True, the loop is in prediction mode and r0 is assumed to contain
the last state. Default is False.
OUTPUTS:
- ndarray
difference between the Expected Shortfall forecast and the quantile.
'''
# Initial point
if pred_mode: #If we are in prediction mode, we have y, q, and r at step -1 and we need to compute r at step 0
r = list()
r.append( beta[0] + beta[1] * np.abs(r0[0]) + beta[2] * r0[1] + beta[3] * r0[2] ) #In pred_mode, r0 is assumed to be [y[-1], q[-1], r[-1]]
else: #If we are in training mode, we have an approximation of r at step 0
r = [r0]
# Loop
for t in range(1, len(y)):
r.append( beta[0] + beta[1] * np.abs(y[t-1]) + beta[2] * q[t-1] + beta[3] * r[t-1] )
r = np.array(r)
return r
def Joint_SAVloop(self, beta, y, q0, e0, pred_mode=False):
'''
Loop for the joint ^q and ^e estimation via SAV specification.
INPUTS:
- beta: ndarray
parameters of the model. The shape is (8,).
- y: ndarray
target time series.
- q0: float
if pred_mode=False, it contains ^q starting point. Otherwise, it is not used.
- e0: float or list of floats
if pred_mode=False, r0 is a float describing initial point for the
^r forecast. If pred_mode=True, r0 is a list of floats describing the
last state of the system, that is r0=[y[-1], q[-1], e[-1]].
- pred_mode: bool, optional
if True, the loop is in prediction mode and r0 is assumed to contain
the last state. Default is False.
OUTPUTS:
- ndarray
quantile forecast.
- ndarray
Expected Shortfall forecast.
'''
# Initial point
if pred_mode: #If we are in prediction mode, we have y, q, and e at step -1 and we need to compute q and e at step 0
q = list()
e = list()
q.append( beta[0] + beta[1] * np.abs(e0[0]) + beta[2] * e0[1] + beta[3] * e0[2] ) #In pred_mode, e0 is assumed to be [y[-1], q[-1], e[-1]]
e.append( beta[4] + beta[5] * np.abs(e0[0]) + beta[6] * e0[1] + beta[7] * e0[2] )
else: #If we are in training mode, we have an approximation of q and e at step 0
q = [q0]
e = [e0]
# Loop
for t in range(1, len(y)):
q.append( beta[0] + beta[1] * np.abs(y[t-1]) + beta[2] * q[t-1] + beta[3] * e[t-1] )
e.append( beta[4] + beta[5] * np.abs(y[t-1]) + beta[6] * q[t-1] + beta[7] * e[t-1] )
q, e = np.array(q), np.array(e)
return q, e
def R_ASloop(self, beta, y, q, r0, pred_mode=False):
'''
Loop for the ^r estimation via AS specification.
INPUTS:
- beta: ndarray
parameters of the model. The shape is (5,).
- y: ndarray
target time series.
- q: ndarray
quantile forecast.
- r0: float or list of floats
if pred_mode=False, r0 is a float describing initial point for the
^r forecast. If pred_mode=True, r0 is a list of floats describing the
last state of the system, that is r0=[y[-1], q[-1], ^r[-1]].
- pred_mode: bool, optional
if True, the loop is in prediction mode and r0 is assumed to contain
the last state. Default is False.
OUTPUTS:
- ndarray
difference between the Expected Shortfall forecast and the quantile.
'''
# Initial point
if pred_mode: #If we are in prediction mode, we have y, q, and r at step -1 and we need to compute r at step 0
# In pred_mode, r0 is assumed to be [y[-max_lag:], ^q[-max_lag:], ^r[-max_lag:]]
r = list()
for t in range(self.max_lag):
r.append( r0[2][t])
y = np.concatenate([r0[0], y])
q = np.concatenate([r0[1], q])
# Loop
y_coeff_list = [np.where(y>0, beta[2*i-1], beta[2*i]) for i in range(1, self.p+1)]
if len(y) > 0:
for t in range(self.max_lag, len(y)):
r.append(beta[0] +\
np.sum([y_coeff_list[i-1][t-i]*y[t-i] for i in range(1, self.p+1)]) +\
np.sum([beta[self.p*2+j]*q[t-j] for j in range(1, self.u+1)]) +\
np.sum([beta[self.p*2+self.u+j]*r[t-j] for j in range(1, self.u+1)]) )
else:
t = self.max_lag + 1
r.append(beta[0] +\
np.sum([y_coeff_list[i-1][t-i]*y[t-i] for i in range(1, self.p+1)]) +\
np.sum([beta[self.p*2+j]*q[t-j] for j in range(1, self.u+1)]) +\
np.sum([beta[self.p*2+self.u+j]*r[t-j] for j in range(1, self.u+1)]) )
r = r[self.max_lag:]
else: #If we are in training mode, we have an approximation of q at step 0
r = [r0]*self.max_lag
# Loop
y_coeff_list = [np.where(y>0, beta[2*i-1], beta[2*i]) for i in range(1, self.p+1)]
for t in range(self.max_lag, len(y)):
r.append(beta[0] +\
np.sum([y_coeff_list[i-1][t-i]*y[t-i] for i in range(1, self.p+1)]) +\
np.sum([beta[self.p*2+j]*q[t-j] for j in range(1, self.u+1)]) +\
np.sum([beta[self.p*2+self.u+j]*r[t-j] for j in range(1, self.u+1)]) )
r = np.array(r)
return r
def Joint_ASloop(self, beta, y, q0, e0, pred_mode=False):
'''
Loop for the joint ^q and ^e estimation via AS specification.
INPUTS:
- beta: ndarray
parameters of the model. The shape is (10,).
- y: ndarray
target time series.
- q0: float
if pred_mode=False, it contains ^q starting point. Otherwise, it is not used.
- e0: float or list of floats
if pred_mode=False, r0 is a float describing initial point for the
^r forecast. If pred_mode=True, r0 is a list of floats describing the
last state of the system, that is r0=[y[-1], q[-1], e[-1]].
- pred_mode: bool, optional
if True, the loop is in prediction mode and r0 is assumed to contain
the last state. Default is False.
OUTPUTS:
- ndarray
quantile forecast.
- ndarray
Expected Shortfall forecast.
'''
# Initial point
if pred_mode: #If we are in prediction mode, we have y, q, and e at step -1 and we need to compute q and e at step 0
# In pred_mode, e0 is assumed to be [y[-max_lag:], ^q[-max_lag:], ^e[-max_lag:]]
q = list()
for t in range(self.max_lag):
q.append( e0[1][t])
e = list()
for t in range(self.max_lag):
e.append( e0[2][t])
y = np.concatenate([e0[0], y])
# Loop
y_coeff_list_q = [np.where(y>0, beta[2*i-1], beta[2*i]) for i in range(1, self.p+1)]
y_coeff_list_e = [np.where(y>0, beta[self.n_parameters+2*i-1],
beta[self.n_parameters+2*i]) for i in range(1, self.p+1)]
if len(y) > 0:
for t in range(self.max_lag, len(y)):
q.append(beta[0] +\
np.sum([y_coeff_list_q[i-1][t-i]*y[t-i] for i in range(1, self.p+1)]) +\
np.sum([beta[self.p*2+j]*q[t-j] for j in range(1, self.u+1)]) +\
np.sum([beta[self.p*2+self.u+j]*e[t-j] for j in range(1, self.u+1)]) )
e.append(beta[self.n_parameters] +\
np.sum([y_coeff_list_e[i-1][t-i]*y[t-i] for i in range(1, self.p+1)]) +\
np.sum([beta[self.n_parameters+self.p*2+j]*q[t-j] for j in range(1, self.u+1)]) +\
np.sum([beta[self.n_parameters+self.p*2+self.u+j]*e[t-j] for j in range(1, self.u+1)]) )
else:
t = self.max_lag + 1
q.append(beta[0] +\
np.sum([y_coeff_list_q[i-1][t-i]*y[t-i] for i in range(1, self.p+1)]) +\
np.sum([beta[self.p*2+j]*q[t-j] for j in range(1, self.u+1)]) +\
np.sum([beta[self.p*2+self.u+j]*e[t-j] for j in range(1, self.u+1)]) )
e.append(beta[self.n_parameters] +\
np.sum([y_coeff_list_e[i-1][t-i]*y[t-i] for i in range(1, self.p+1)]) +\
np.sum([beta[self.n_parameters+self.p*2+j]*q[t-j] for j in range(1, self.u+1)]) +\
np.sum([beta[self.n_parameters+self.p*2+self.u+j]*e[t-j] for j in range(1, self.u+1)]) )
q, e = q[self.max_lag:], e[self.max_lag:]
else: #If we are in training mode, we have an approximation of q at step 0
q = [q0]*self.max_lag
e = [e0]*self.max_lag
# Loop
y_coeff_list_q = [np.where(y>0, beta[2*i-1], beta[2*i]) for i in range(1, self.p+1)]
y_coeff_list_e = [np.where(y>0, beta[self.n_parameters+2*i-1],
beta[self.n_parameters+2*i]) for i in range(1, self.p+1)]
for t in range(self.max_lag, len(y)):
q.append(beta[0] +\
np.sum([y_coeff_list_q[i-1][t-i]*y[t-i] for i in range(1, self.p+1)]) +\
np.sum([beta[self.p*2+j]*q[t-j] for j in range(1, self.u+1)]) +\
np.sum([beta[self.p*2+self.u+j]*e[t-j] for j in range(1, self.u+1)]) )
e.append(beta[self.n_parameters] +\
np.sum([y_coeff_list_e[i-1][t-i]*y[t-i] for i in range(1, self.p+1)]) +\
np.sum([beta[self.n_parameters+self.p*2+j]*q[t-j] for j in range(1, self.u+1)]) +\
np.sum([beta[self.n_parameters+self.p*2+self.u+j]*e[t-j] for j in range(1, self.u+1)]) )
q, e = np.array(q), np.array(e)
return q, e
def R_GARCHloop(self, beta, y, q, r0, pred_mode=False):
'''
Loop for the ^r estimation via GARCH specification.
INPUTS:
- beta: ndarray
parameters of the model. The shape is (4,).
- y: ndarray
target time series.
- q: ndarray
quantile forecast.
- r0: float or list of floats
if pred_mode=False, r0 is a float describing initial point for the
^r forecast. If pred_mode=True, r0 is a list of floats describing the
last state of the system, that is r0=[y[-1], q[-1], ^r[-1]].
- pred_mode: bool, optional
if True, the loop is in prediction mode and r0 is assumed to contain
the last state. Default is False.
OUTPUTS:
- ndarray
difference between the Expected Shortfall forecast and the quantile.
'''
# Initial point
if pred_mode: #If we are in prediction mode, we have y, q, and e at step -1 and we need to compute e at step 0
r = list()
r.append( -np.sqrt(beta[0]+beta[1]*r0[0]**2+beta[2]*r0[1]**2+beta[3]*r0[2]**2) ) #In pred_mode, r0 is assumed to be [y[-1], q[-1], r[-1]]
else: #If we are in training mode, we have an approximation of e at step 0
r = [r0]
# Loop
for t in range(1, len(y)):
r.append( -np.sqrt(beta[0]+beta[1]*y[t-1]**2+beta[2]*q[t-1]**2+beta[3]*r[t-1]**2) )
r = np.array(r)
return r
def Joint_GARCHloop(self, beta, y, q0, e0, pred_mode=False):
'''
Loop for the joint ^q and ^e estimation via GARCH specification.
INPUTS:
- beta: ndarray
parameters of the model. The shape is (8,).
- y: ndarray
target time series.
- q0: float
if pred_mode=False, it contains ^q starting point. Otherwise, it is not used.
- e0: float or list of floats
if pred_mode=False, r0 is a float describing initial point for the
^r forecast. If pred_mode=True, r0 is a list of floats describing the
last state of the system, that is r0=[y[-1], q[-1], e[-1]].
- pred_mode: bool, optional
if True, the loop is in prediction mode and r0 is assumed to contain
the last state. Default is False.
OUTPUTS:
- ndarray
quantile forecast.
- ndarray
Expected Shortfall forecast.
'''
# Initial point
if pred_mode: #If we are in prediction mode, we have y, q, and e at step -1 and we need to compute q and e at step 0
q = list()
e = list()
q.append( -np.sqrt(beta[0]+beta[1]*e0[0]**2+beta[2]*e0[1]**2+beta[3]*e0[2]**2) ) #In pred_mode, e0 is assumed to be [y[-1], q[-1], e[-1]]
e.append( -np.sqrt(beta[4]+beta[5]*e0[0]**2+beta[6]*e0[1]**2+beta[7]*e0[2]**2) )
else: #If we are in training mode, we have an approximation of q and e at step 0
q = [q0]
e = [e0]
# Loop
for t in range(1, len(y)):
q.append( -np.sqrt(beta[0]+beta[1]*y[t-1]**2+beta[2]*q[t-1]**2+beta[3]*e[t-1]**2) )
e.append( -np.sqrt(beta[4]+beta[5]*y[t-1]**2+beta[6]*q[t-1]**2+beta[7]*e[t-1]**2) )
q, e = np.array(q), np.array(e)
return q, e
def fit(self, yi, seed=None, return_train=False, q0=None, nV=102, n_init=3, n_rep=5):
'''
Fit the CAESar model.
INPUTS:
- yi: ndarray
target time series.
- seed: int or None, optional
random seed. Default is None.
- return_train: bool, optional
if True, the function returns the fitted values. Default is False.
- q0: list of float or None, optional
[initial quantile, initial expected shortfall]. If None, the initial quantile
is computed as the empirical quantile in the first 10% of the
training set; the initial expected shortfall as the tail mean in the same
subset. Default is None.
- nV: int, optional
number of random initializations of the model coefficients. Default is 102.
- n_init: int, optional
number of best initializations to work with. Default is 3.
- n_rep: int, optional
number of repetitions for the optimization. Default is 5.
OUTPUTS:
- ndarray
quantile forecast in the training set (if return_train=True).
- ndarray
expected shortfall forecast in the training set (if return_train=True).
- ndarray
optimized coefficients of the model. The shape is
(2, self.n_parameters) (if return_train=True).
'''
warnings.simplefilter(action='ignore', category=RuntimeWarning) #Ignore warnings
#-------------------- Step 0: CAViaR for quantile initial guess
cav_res = CAViaR(
self.theta, self.mdl_spec, p=self.p, u=self.u).fit(
yi, seed=seed, return_train=True, q0=q0) #Train CAViaR
qi, beta_cav = cav_res['qi'], cav_res['beta']
del(cav_res)
#-------------------- Step 1: Initialization
if isinstance(yi, list):
yi = np.array(yi)
if isinstance(q0, type(None)):
#The starting forecast ^q_0 and ^e_0 is the empricial quantile of the first part of the trainig set (for computational reason)
n_emp = int(np.ceil(0.1 * len(yi))) #Select onyl 1/10 of the training set
if round(n_emp * self.theta) == 0: n_emp = len(yi) #In case the training dimension is too small wrt theta
y_sort = np.sort(yi[:n_emp])
quantile0 = int(round(n_emp * self.theta))-1
if quantile0 < 0: quantile0 = 0
e0 = np.mean(y_sort[:quantile0+1])
q0 = y_sort[quantile0]
else:
q0 = q0[0]
e0 = q0[1]
#-------------------- Step 2: Initial guess
np.random.seed(seed)
nInitialVectors = [nV//3, self.n_parameters] #Define the shape of the random initializations
beta0 = [np.random.uniform(0, 1, nInitialVectors)]
beta0.append(np.random.uniform(-1, 1, nInitialVectors))
beta0.append(np.random.randn(*nInitialVectors))
beta0 = np.concatenate(beta0, axis=0)
AEfval = np.empty(nV) #Initialize the loss function vector
#Iterates over the random initializations
for i in range(nV):
AEfval[i] = self.ESloss(beta0[i, :], yi, qi, e0-q0) #Compute starting loss
beta0 = beta0[AEfval.argsort()][0:n_init] #Sort initializations by loss and select only the n_init best initializations
#-------------------- Step 3: Optimization Routine - I step (barrera)
beta = np.empty((n_init, self.n_parameters)) #Initialize the parameters vector
fval_beta = np.empty(n_init) #Initialize the loss function vector
exitflag = np.empty(n_init) #Initialize the exit flag vector
# Multiprocessing: create and start worker processes
workers = list()
for i in range(n_init):
parent_pipend, child_pipend = mp.Pipe() # Create a pipe to communicate with the worker
worker = mp.Process(target=self.optim4mp,
args=(yi, qi, e0-q0, beta0[i, :],
n_rep, child_pipend))
workers.append([worker, parent_pipend])
worker.start()
# Gather results from workers
for i, worker_list in enumerate(workers):
worker, parent_pipend = worker_list
beta_worker, fval_beta_worker, exitflag_worker =\
parent_pipend.recv() # Get the result from the worker
worker.join()
beta[i, :] = beta_worker
fval_beta[i] = fval_beta_worker
exitflag[i] = exitflag_worker
ind_min = np.argmin(fval_beta) #Select the index of the best loss
self.beta = beta[ind_min, :] #Store the best parameters
self.fval_beta = fval_beta[ind_min] #Store the best loss
self.exitflag = exitflag[ind_min] #Store the best exit flag
#-------------------- Step 4: Optimization Routine - II step (patton)
# Concatenate ^q and ^r parameters, then convert ^r into ^e parameters
joint_beta = np.concatenate([beta_cav, [0]*self.u, self.beta]) #Concatenate
if self.mdl_spec == 'SAV':
joint_beta[4] += joint_beta[0]
joint_beta[5] += joint_beta[1]
joint_beta[6] += joint_beta[2] - joint_beta[7]
elif self.mdl_spec == 'AS':
for i in range(2*self.p +1):
joint_beta[self.n_parameters + i] += joint_beta[i]
for j in range(2*self.p +1, 2*self.p + self.u +1):
joint_beta[self.n_parameters + j] += joint_beta[j] -\
joint_beta[self.n_parameters + self.u +j]
elif self.mdl_spec == 'GARCH':
joint_beta[4] += joint_beta[0]
joint_beta[5] += joint_beta[1]
joint_beta[6] += joint_beta[2] + joint_beta[7]
joint_beta_temp = self.joint_optim(yi, q0, e0, joint_beta, n_rep)
if not np.isnan(joint_beta_temp).any():
joint_beta = joint_beta_temp
self.beta = joint_beta
#Compute the fit output: optimal beta vector, optimization info, and the last pair ^q, ^e
#in sample variables
qi, ei = self.joint_loop(self.beta, yi, q0, e0) #Train CAESar and recover fitted quantiles
self.last_state = [yi[-self.max_lag:], qi[-self.max_lag:], ei[-self.max_lag:]] #Store the last state
if return_train: #If return_train is True, return the training prediction and coefficients
return {'qi':qi, 'ei':ei, 'beta':self.beta.reshape((2, self.n_parameters))}
def predict(self, yf=np.array(list())):
'''
Predict the quantile.
INPUTS:
- yf: ndarray, optional
test data. If yf is not empty, the internal state is updated
with the last observation. Default is an empty list.
OUTPUTS:
- ndarray
quantile estimate.
- ndarray
Expected Shortfall estimate.
'''
qf, ef = self.joint_loop(self.beta, yf, None, self.last_state, pred_mode=True)
if len(yf) > 0:
self.last_state = [
np.concatenate([self.last_state[0], yf])[-self.max_lag:],
np.concatenate([self.last_state[1], qf])[-self.max_lag:],
np.concatenate([self.last_state[2], ef])[-self.max_lag:] ]
return {'qf':qf, 'ef':ef}
class CAESar_1_1(CAESar_base):
'''
CAESar for joint quantile and expected shortfall estimation - y lags=1, q lags=1.
:meta private:
'''
def __init__(self, theta, spec='AS', lambdas=dict()):
'''
Initialization of the CAESar model.
INPUTS:
- theta: float
desired confidence level.
- spec: str, optional
specification of the model (SAV, AS, GARCH). Default is AS.
- lambdas: dict, optional
lambdas for the soft constraints. Default is {'r':10, 'q':10, 'e':10}.
OUTPUTS:
- None.
'''
super().__init__(theta, lambdas) #Initialize the base class
# According to the desired model, initialize the number of parameters and the loop
if spec == 'SAV':
self.mdl_spec = 'SAV'
self.n_parameters = 4
self.loop = self.R_SAVloop
self.joint_loop = self.Joint_SAVloop
elif spec == 'AS':
self.mdl_spec = 'AS'
self.n_parameters = 5
self.loop = self.R_ASloop
self.joint_loop = self.Joint_ASloop
elif spec == 'GARCH':
self.mdl_spec = 'GARCH'
self.n_parameters = 4
self.loop = self.R_GARCHloop
self.joint_loop = self.Joint_GARCHloop
else:
raise ValueError(f'Specification {spec} not recognized!\nChoose between "SAV", "AS", and "GARCH"')
def R_SAVloop(self, beta, y, q, r0, pred_mode=False):
'''
Loop for the ^r estimation via SAV specification.
INPUTS:
- beta: ndarray
parameters of the model. The shape is (4,).
- y: ndarray
target time series.
- q: ndarray
quantile forecast.
- r0: float or list of floats
if pred_mode=False, r0 is a float describing initial point for the
^r forecast. If pred_mode=True, r0 is a list of floats describing the
last state of the system, that is r0=[y[-1], q[-1], ^r[-1]].
- pred_mode: bool, optional
if True, the loop is in prediction mode and r0 is assumed to contain
the last state. Default is False.
OUTPUTS:
- ndarray
difference between the Expected Shortfall forecast and the quantile.
'''
# Initial point
if pred_mode: #If we are in prediction mode, we have y, q, and r at step -1 and we need to compute r at step 0
r = list()
r.append( beta[0] + beta[1] * np.abs(r0[0]) + beta[2] * r0[1] + beta[3] * r0[2] ) #In pred_mode, r0 is assumed to be [y[-1], q[-1], r[-1]]
else: #If we are in training mode, we have an approximation of r at step 0
r = [r0]
# Loop
for t in range(1, len(y)):
r.append( beta[0] + beta[1] * np.abs(y[t-1]) + beta[2] * q[t-1] + beta[3] * r[t-1] )
r = np.array(r)
return r
def Joint_SAVloop(self, beta, y, q0, e0, pred_mode=False):
'''
Loop for the joint ^q and ^e estimation via SAV specification.
INPUTS:
- beta: ndarray
parameters of the model. The shape is (8,).
- y: ndarray
target time series.
- q0: float
if pred_mode=False, it contains ^q starting point. Otherwise, it is not used.
- e0: float or list of floats
if pred_mode=False, r0 is a float describing initial point for the
^r forecast. If pred_mode=True, r0 is a list of floats describing the
last state of the system, that is r0=[y[-1], q[-1], e[-1]].
- pred_mode: bool, optional
if True, the loop is in prediction mode and r0 is assumed to contain
the last state. Default is False.
OUTPUTS:
- ndarray
quantile forecast.
- ndarray
Expected Shortfall forecast.
'''
# Initial point
if pred_mode: #If we are in prediction mode, we have y, q, and e at step -1 and we need to compute q and e at step 0
q = list()
e = list()
q.append( beta[0] + beta[1] * np.abs(e0[0]) + beta[2] * e0[1] + beta[3] * e0[2] ) #In pred_mode, e0 is assumed to be [y[-1], q[-1], e[-1]]
e.append( beta[4] + beta[5] * np.abs(e0[0]) + beta[6] * e0[1] + beta[7] * e0[2] )
else: #If we are in training mode, we have an approximation of q and e at step 0
q = [q0]
e = [e0]
# Loop
for t in range(1, len(y)):
q.append( beta[0] + beta[1] * np.abs(y[t-1]) + beta[2] * q[t-1] + beta[3] * e[t-1] )
e.append( beta[4] + beta[5] * np.abs(y[t-1]) + beta[6] * q[t-1] + beta[7] * e[t-1] )
q, e = np.array(q), np.array(e)
return q, e
def R_ASloop(self, beta, y, q, r0, pred_mode=False):
'''
Loop for the ^r estimation via AS specification.
INPUTS:
- beta: ndarray
parameters of the model. The shape is (5,).
- y: ndarray
target time series.
- q: ndarray
quantile forecast.
- r0: float or list of floats
if pred_mode=False, r0 is a float describing initial point for the
^r forecast. If pred_mode=True, r0 is a list of floats describing the
last state of the system, that is r0=[y[-1], q[-1], ^r[-1]].
- pred_mode: bool, optional
if True, the loop is in prediction mode and r0 is assumed to contain
the last state. Default is False.
OUTPUTS:
- ndarray
difference between the Expected Shortfall forecast and the quantile.
'''
# Initial point
if pred_mode: #If we are in prediction mode, we have y, q, and e at step -1 and we need to compute e at step 0
r = list()
r.append( beta[0] + np.where(r0[0]>0, beta[1], beta[2]) * r0[0] + beta[3] * r0[1] + beta[4] * r0[2] ) #In pred_mode, r0 is assumed to be [y[-1], q[-1], r[-1]]
else: #If we are in training mode, we have an approximation of e at step 0
r = [r0]
# Loop
y_plus_coeff = np.where(y>0, beta[1], beta[2]) #Only one between positive and negative y part gives a contribution
for t in range(1, len(y)):
r.append( beta[0] + y_plus_coeff[t-1] * y[t-1] + beta[3] * q[t-1] + beta[4] * r[t-1] )
r = np.array(r)
return r
def Joint_ASloop(self, beta, y, q0, e0, pred_mode=False):
'''
Loop for the joint ^q and ^e estimation via AS specification.
INPUTS:
- beta: ndarray
parameters of the model. The shape is (10,).
- y: ndarray
target time series.
- q0: float
if pred_mode=False, it contains ^q starting point. Otherwise, it is not used.
- e0: float or list of floats
if pred_mode=False, r0 is a float describing initial point for the
^r forecast. If pred_mode=True, r0 is a list of floats describing the
last state of the system, that is r0=[y[-1], q[-1], e[-1]].
- pred_mode: bool, optional
if True, the loop is in prediction mode and r0 is assumed to contain
the last state. Default is False.
OUTPUTS:
- ndarray
quantile forecast.
- ndarray
Expected Shortfall forecast.
'''
# Initial point
if pred_mode: #If we are in prediction mode, we have y, q, and e at step -1 and we need to compute q and e at step 0
q = list()
e = list()
q.append( beta[0] + np.where(e0[0]>0, beta[1], beta[2]) * e0[0] + beta[3] * e0[1] + beta[4] * e0[2] ) #In pred_mode, e0 is assumed to be [y[-1], q[-1], e[-1]]
e.append( beta[5] + np.where(e0[0]>0, beta[6], beta[7]) * e0[0] + beta[8] * e0[1] + beta[9] * e0[2] )
else: #If we are in training mode, we have an approximation of q and e at step 0
q = [q0]
e = [e0]
# Loop
y_plus_coeff_q = np.where(y>0, beta[1], beta[2]) #Only one between positive and negative y part gives a contribution
y_plus_coeff_e = np.where(y>0, beta[6], beta[7])
for t in range(1, len(y)):
q.append( beta[0] + y_plus_coeff_q[t-1] * y[t-1] + beta[3] * q[t-1] + beta[4] * e[t-1] )
e.append( beta[5] + y_plus_coeff_e[t-1] * y[t-1] + beta[8] * q[t-1] + beta[9] * e[t-1] )
q, e = np.array(q), np.array(e)
return q, e
def R_GARCHloop(self, beta, y, q, r0, pred_mode=False):
'''
Loop for the ^r estimation via GARCH specification.
INPUTS:
- beta: ndarray
parameters of the model. The shape is (4,).
- y: ndarray
target time series.
- q: ndarray
quantile forecast.
- r0: float or list of floats
if pred_mode=False, r0 is a float describing initial point for the
^r forecast. If pred_mode=True, r0 is a list of floats describing the
last state of the system, that is r0=[y[-1], q[-1], ^r[-1]].
- pred_mode: bool, optional
if True, the loop is in prediction mode and r0 is assumed to contain
the last state. Default is False.
OUTPUTS:
- ndarray
difference between the Expected Shortfall forecast and the quantile.
'''
# Initial point
if pred_mode: #If we are in prediction mode, we have y, q, and e at step -1 and we need to compute e at step 0
r = list()
r.append( -np.sqrt(beta[0]+beta[1]*r0[0]**2+beta[2]*r0[1]**2+beta[3]*r0[2]**2) ) #In pred_mode, r0 is assumed to be [y[-1], q[-1], r[-1]]
else: #If we are in training mode, we have an approximation of e at step 0
r = [r0]
# Loop
for t in range(1, len(y)):
r.append( -np.sqrt(beta[0]+beta[1]*y[t-1]**2+beta[2]*q[t-1]**2+beta[3]*r[t-1]**2) )
r = np.array(r)
return r
def Joint_GARCHloop(self, beta, y, q0, e0, pred_mode=False):
'''
Loop for the joint ^q and ^e estimation via GARCH specification.
INPUTS:
- beta: ndarray
parameters of the model. The shape is (8,).
- y: ndarray
target time series.
- q0: float
if pred_mode=False, it contains ^q starting point. Otherwise, it is not used.
- e0: float or list of floats
if pred_mode=False, r0 is a float describing initial point for the
^r forecast. If pred_mode=True, r0 is a list of floats describing the
last state of the system, that is r0=[y[-1], q[-1], e[-1]].
- pred_mode: bool, optional
if True, the loop is in prediction mode and r0 is assumed to contain
the last state. Default is False.
OUTPUTS:
- ndarray
quantile forecast.
- ndarray
Expected Shortfall forecast.
'''
# Initial point
if pred_mode: #If we are in prediction mode, we have y, q, and e at step -1 and we need to compute q and e at step 0
q = list()
e = list()
q.append( -np.sqrt(beta[0]+beta[1]*e0[0]**2+beta[2]*e0[1]**2+beta[3]*e0[2]**2) ) #In pred_mode, e0 is assumed to be [y[-1], q[-1], e[-1]]
e.append( -np.sqrt(beta[4]+beta[5]*e0[0]**2+beta[6]*e0[1]**2+beta[7]*e0[2]**2) )
else: #If we are in training mode, we have an approximation of q and e at step 0
q = [q0]
e = [e0]
# Loop
for t in range(1, len(y)):
q.append( -np.sqrt(beta[0]+beta[1]*y[t-1]**2+beta[2]*q[t-1]**2+beta[3]*e[t-1]**2) )
e.append( -np.sqrt(beta[4]+beta[5]*y[t-1]**2+beta[6]*q[t-1]**2+beta[7]*e[t-1]**2) )
q, e = np.array(q), np.array(e)
return q, e
def fit(self, yi, seed=None, return_train=False, q0=None, nV=102, n_init=3, n_rep=5):
'''
Fit the CAESar model.
INPUTS:
- yi: ndarray
target time series.
- seed: int or None, optional
random seed. Default is None.
- return_train: bool, optional
if True, the function returns the fitted values. Default is False.
- q0: list of float or None, optional
[initial quantile, initial expected shortfall]. If None, the initial quantile
is computed as the empirical quantile in the first 10% of the
training set; the initial expected shortfall as the tail mean in the same
subset. Default is None.
- nV: int, optional
number of random initializations of the model coefficients. Default is 102.
- n_init: int, optional
number of best initializations to work with. Default is 3.
- n_rep: int, optional
number of repetitions for the optimization. Default is 5.
OUTPUTS:
- ndarray
quantile forecast in the training set (if return_train=True).
- ndarray
expected shortfall forecast in the training set (if return_train=True).
- ndarray
optimized coefficients of the model. The shape is
(2, self.n_parameters) (if return_train=True).
'''
warnings.simplefilter(action='ignore', category=RuntimeWarning) #Ignore Runtime Warnings
#-------------------- Step 0: CAViaR for quantile initial guess
cav_res = CAViaR(
self.theta, self.mdl_spec, p=1, u=1).fit(
yi, seed=seed, return_train=True, q0=q0) #Train CAViaR
qi, beta_cav = cav_res['qi'], cav_res['beta']
del(cav_res)
#-------------------- Step 1: Initialization
if isinstance(yi, list):
yi = np.array(yi)
if isinstance(q0, type(None)):
#The starting forecast ^q_0 and ^e_0 is the empricial quantile of the first part of the trainig set (for computational reason)
n_emp = int(np.ceil(0.1 * len(yi))) #Select onyl 1/10 of the training set
if round(n_emp * self.theta) == 0: n_emp = len(yi) #In case the training dimension is too small wrt theta
y_sort = np.sort(yi[:n_emp])
quantile0 = int(round(n_emp * self.theta))-1
if quantile0 < 0: quantile0 = 0
e0 = np.mean(y_sort[:quantile0+1])
q0 = y_sort[quantile0]
else:
q0 = q0[0]
e0 = q0[1]
#-------------------- Step 2: Initial guess
np.random.seed(seed)
nInitialVectors = [nV//3, self.n_parameters] #Define the shape of the random initializations
beta0 = [np.random.uniform(0, 1, nInitialVectors)]
beta0.append(np.random.uniform(-1, 1, nInitialVectors))
beta0.append(np.random.randn(*nInitialVectors))
beta0 = np.concatenate(beta0, axis=0)
AEfval = np.empty(nV) #Initialize the loss function vector
#Iterates over the random initializations
for i in range(nV):
AEfval[i] = self.ESloss(beta0[i, :], yi, qi, e0-q0) #Compute starting loss
beta0 = beta0[AEfval.argsort()][0:n_init] #Sort initializations by loss and select only the n_init best initializations
#-------------------- Step 3: Optimization Routine - I step (barrera)
beta = np.empty((n_init, self.n_parameters)) #Initialize the parameters vector
fval_beta = np.empty(n_init) #Initialize the loss function vector
exitflag = np.empty(n_init) #Initialize the exit flag vector
# Multiprocessing: create and start worker processes
workers = list()
for i in range(n_init):
parent_pipend, child_pipend = mp.Pipe() # Create a pipe to communicate with the worker
worker = mp.Process(target=self.optim4mp,
args=(yi, qi, e0-q0, beta0[i, :],
n_rep, child_pipend))
workers.append([worker, parent_pipend])
worker.start()
# Gather results from workers
for i, worker_list in enumerate(workers):
worker, parent_pipend = worker_list
beta_worker, fval_beta_worker, exitflag_worker =\
parent_pipend.recv() # Get the result from the worker
worker.join()
beta[i, :] = beta_worker
fval_beta[i] = fval_beta_worker
exitflag[i] = exitflag_worker
ind_min = np.argmin(fval_beta) #Select the index of the best loss
self.beta = beta[ind_min, :] #Store the best parameters
self.fval_beta = fval_beta[ind_min] #Store the best loss
self.exitflag = exitflag[ind_min] #Store the best exit flag
#-------------------- Step 4: Optimization Routine - II step (patton)
# Concatenate ^q and ^r parameters, then convert ^r into ^e parameters
joint_beta = np.concatenate([beta_cav, [0], self.beta]) #Concatenate
if self.mdl_spec == 'SAV':
joint_beta[4] += joint_beta[0]
joint_beta[5] += joint_beta[1]
joint_beta[6] += joint_beta[2] - joint_beta[7]
elif self.mdl_spec == 'AS':
joint_beta[5] += joint_beta[0]
joint_beta[6] += joint_beta[1]
joint_beta[7] += joint_beta[2]
joint_beta[8] += joint_beta[3] - joint_beta[9]
elif self.mdl_spec == 'GARCH':
joint_beta[4] += joint_beta[0]
joint_beta[5] += joint_beta[1]
joint_beta[6] += joint_beta[2] + joint_beta[7]
joint_beta_temp = self.joint_optim(yi, q0, e0, joint_beta, n_rep)
if not np.isnan(joint_beta_temp).any():
joint_beta = joint_beta_temp
self.beta = joint_beta
#Compute the fit output: optimal beta vector, optimization info, and the last pair ^q, ^e
#in sample variables
qi, ei = self.joint_loop(self.beta, yi, q0, e0) #Train CAESar and recover fitted quantiles
self.last_state = [yi[-1], qi[-1], ei[-1]] #Store the last state
if return_train: #If return_train is True, return the training prediction and coefficients
return {'qi':qi, 'ei':ei, 'beta':self.beta.reshape((2, self.n_parameters))}
def predict(self, yf=np.array(list())):
'''
Predict the quantile.
INPUTS:
- yf: ndarray, optional
test data. If yf is not empty, the internal state is updated
with the last observation. Default is an empty list.
OUTPUTS:
- ndarray
quantile estimate.
- ndarray
Expected Shortfall estimate.
'''
qf, ef = self.joint_loop(self.beta, yf, None, self.last_state, pred_mode=True)
if len(yf) > 0:
self.last_state = [yf[-1], qf[-1], ef[-1]]
return {'qf':qf, 'ef':ef}
class CAESar_2_2(CAESar_base):
'''
CAESar for joint quantile and expected shortfall estimation - y lags=2, q lags=2.
:meta private:
'''
def __init__(self, theta, spec='AS', lambdas=dict()):
'''
Initialization of the CAESar model.
INPUTS:
- theta: float
desired confidence level.
- spec: str, optional
specification of the model (SAV, AS, GARCH). Default is AS.
- lambdas: dict, optional
lambdas for the soft constraints. Default is {'r':10, 'q':10, 'e':10}.
OUTPUTS:
- None.
'''
super().__init__(theta, lambdas) #Initialize the base class
# According to the desired model, initialize the number of parameters and the loop
if spec == 'SAV':
self.mdl_spec = 'SAV'
self.n_parameters = 7
self.loop = self.R_SAVloop
self.joint_loop = self.Joint_SAVloop
elif spec == 'AS':
self.mdl_spec = 'AS'
self.n_parameters = 9
self.loop = self.R_ASloop
self.joint_loop = self.Joint_ASloop
elif spec == 'GARCH':
self.mdl_spec = 'GARCH'
self.n_parameters = 7
self.loop = self.R_GARCHloop
self.joint_loop = self.Joint_GARCHloop
else:
raise ValueError(f'Specification {spec} not recognized!\nChoose between "SAV", "AS", and "GARCH"')
def R_SAVloop(self, beta, y, q, r0, pred_mode=False):
'''
Loop for the ^r estimation via SAV specification.
INPUTS:
- beta: ndarray
parameters of the model. The shape is (4,).
- y: ndarray
target time series.
- q: ndarray
quantile forecast.
- r0: float or list of floats
if pred_mode=False, r0 is a float describing initial point for the
^r forecast. If pred_mode=True, r0 is a list of floats describing the
last state of the system, that is r0=[y[-1], q[-1], ^r[-1]].
- pred_mode: bool, optional
if True, the loop is in prediction mode and r0 is assumed to contain
the last state. Default is False.
OUTPUTS:
- ndarray
difference between the Expected Shortfall forecast and the quantile.
'''
# Initial point
if pred_mode: #If we are in prediction mode, we have y, q, and r at step -1 and we need to compute r at step 0
r = list()
r.append( beta[0] + beta[1] * np.abs(r0[0]) + beta[2] * r0[1] + beta[3] * r0[2] ) #In pred_mode, r0 is assumed to be [y[-1], q[-1], r[-1]]
else: #If we are in training mode, we have an approximation of r at step 0
r = [r0]
# Loop
for t in range(1, len(y)):
r.append( beta[0] + beta[1] * np.abs(y[t-1]) + beta[2] * q[t-1] + beta[3] * r[t-1] )
r = np.array(r)
return r
def Joint_SAVloop(self, beta, y, q0, e0, pred_mode=False):
'''
Loop for the joint ^q and ^e estimation via SAV specification.
INPUTS:
- beta: ndarray
parameters of the model. The shape is (8,).
- y: ndarray
target time series.
- q0: float
if pred_mode=False, it contains ^q starting point. Otherwise, it is not used.
- e0: float or list of floats
if pred_mode=False, r0 is a float describing initial point for the
^r forecast. If pred_mode=True, r0 is a list of floats describing the
last state of the system, that is r0=[y[-1], q[-1], e[-1]].
- pred_mode: bool, optional
if True, the loop is in prediction mode and r0 is assumed to contain
the last state. Default is False.
OUTPUTS:
- ndarray
quantile forecast.
- ndarray
Expected Shortfall forecast.
'''
# Initial point
if pred_mode: #If we are in prediction mode, we have y, q, and e at step -1 and we need to compute q and e at step 0
q = list()
e = list()
q.append( beta[0] + beta[1] * np.abs(e0[0]) + beta[2] * e0[1] + beta[3] * e0[2] ) #In pred_mode, e0 is assumed to be [y[-1], q[-1], e[-1]]
e.append( beta[4] + beta[5] * np.abs(e0[0]) + beta[6] * e0[1] + beta[7] * e0[2] )
else: #If we are in training mode, we have an approximation of q and e at step 0
q = [q0]
e = [e0]
# Loop
for t in range(1, len(y)):
q.append( beta[0] + beta[1] * np.abs(y[t-1]) + beta[2] * q[t-1] + beta[3] * e[t-1] )
e.append( beta[4] + beta[5] * np.abs(y[t-1]) + beta[6] * q[t-1] + beta[7] * e[t-1] )
q, e = np.array(q), np.array(e)
return q, e
def R_ASloop(self, beta, y, q, r0, pred_mode=False):
'''
Loop for the ^r estimation via AS specification.
INPUTS:
- beta: ndarray
parameters of the model. The shape is (5,).
- y: ndarray
target time series.
- q: ndarray
quantile forecast.
- r0: float or list of floats
if pred_mode=False, r0 is a float describing initial point for the
^r forecast. If pred_mode=True, r0 is a list of floats describing the
last state of the system, that is r0=[y[-1], q[-1], ^r[-1]].
- pred_mode: bool, optional
if True, the loop is in prediction mode and r0 is assumed to contain
the last state. Default is False.
OUTPUTS:
- ndarray
difference between the Expected Shortfall forecast and the quantile.
'''
# Initial point
if pred_mode: #If we are in prediction mode, we have y, q, and r at step -1 and we need to compute r at step 0
# In pred_mode, r0 is assumed to be [y[-max_lag:], ^q[-max_lag:], ^r[-max_lag:]]
r = list()
for t in range(2):
r.append( r0[2][t])
y = np.concatenate([r0[0], y])
q = np.concatenate([r0[1], q])
# Loop
y_coeff_list_l1 = np.where(y>0, beta[1], beta[2])
y_coeff_list_l2 = np.where(y>0, beta[3], beta[4])
if len(y) > 0:
for t in range(2, len(y)):
r.append(beta[0] +\
y_coeff_list_l1[t-1]*y[t-1] + y_coeff_list_l2[t-1]*y[t-2] +\
beta[5]*q[t-1] + beta[6]*q[t-2] +\
beta[7]*r[t-1] + beta[8]*r[t-2] )
else:
t = 3
r.append(beta[0] +\
y_coeff_list_l1[t-1]*y[t-1] + y_coeff_list_l2[t-1]*y[t-2] +\
beta[5]*q[t-1] + beta[6]*q[t-2] +\
beta[7]*r[t-1] + beta[8]*r[t-2] )
r = r[2:]
else: #If we are in training mode, we have an approximation of q at step 0
r = [r0]*2
# Loop
y_coeff_list_l1 = np.where(y>0, beta[1], beta[2])
y_coeff_list_l2 = np.where(y>0, beta[3], beta[4])
for t in range(2, len(y)):
r.append(beta[0] +\
y_coeff_list_l1[t-1]*y[t-1] + y_coeff_list_l2[t-1]*y[t-2] +\
beta[5]*q[t-1] + beta[6]*q[t-2] +\
beta[7]*r[t-1] + beta[8]*r[t-2] )
r = np.array(r)
return r
def Joint_ASloop(self, beta, y, q0, e0, pred_mode=False):
'''
Loop for the joint ^q and ^e estimation via AS specification.
INPUTS:
- beta: ndarray
parameters of the model. The shape is (10,).
- y: ndarray
target time series.
- q0: float
if pred_mode=False, it contains ^q starting point. Otherwise, it is not used.
- e0: float or list of floats
if pred_mode=False, r0 is a float describing initial point for the
^r forecast. If pred_mode=True, r0 is a list of floats describing the
last state of the system, that is r0=[y[-1], q[-1], e[-1]].
- pred_mode: bool, optional
if True, the loop is in prediction mode and r0 is assumed to contain
the last state. Default is False.
OUTPUTS:
- ndarray
quantile forecast.
- ndarray
Expected Shortfall forecast.
'''
# Initial point
if pred_mode: #If we are in prediction mode, we have y, q, and e at step -1 and we need to compute q and e at step 0
# In pred_mode, e0 is assumed to be [y[-max_lag:], ^q[-max_lag:], ^e[-max_lag:]]
q = list()
for t in range(2):
q.append( e0[1][t])
e = list()
for t in range(2):
e.append( e0[2][t])
y = np.concatenate([e0[0], y])
# Loop
y_coeff_list_q_l1 = np.where(y>0, beta[1], beta[2])
y_coeff_list_q_l2 = np.where(y>0, beta[3], beta[4])
y_coeff_list_e_l1 = np.where(y>0, beta[10], beta[11])
y_coeff_list_e_l2 = np.where(y>0, beta[12], beta[13])
if len(y) > 0:
for t in range(2, len(y)):
q.append(beta[0] +\
y_coeff_list_q_l1[t-1]*y[t-1] + y_coeff_list_q_l2[t-2]*y[t-2] +\
beta[5]*q[t-1] + beta[6]*q[t-2] +\
beta[7]*e[t-1] + beta[8]*e[t-2] )
e.append(beta[9] +\
y_coeff_list_e_l1[t-1]*y[t-1] + y_coeff_list_e_l2[t-2]*y[t-2] +\
beta[14]*q[t-1] + beta[15]*q[t-2] +\
beta[16]*e[t-1] + beta[17]*e[t-2] )
else:
t = 3
q.append(beta[0] +\
y_coeff_list_q_l1[t-1]*y[t-1] + y_coeff_list_q_l2[t-2]*y[t-2] +\
beta[5]*q[t-1] + beta[6]*q[t-2] +\
beta[7]*e[t-1] + beta[8]*e[t-2] )
e.append(beta[9] +\
y_coeff_list_e_l1[t-1]*y[t-1] + y_coeff_list_e_l2[t-2]*y[t-2] +\
beta[14]*q[t-1] + beta[15]*q[t-2] +\
beta[16]*e[t-1] + beta[17]*e[t-2] )
q, e = q[2:], e[2:]
else: #If we are in training mode, we have an approximation of q at step 0
q = [q0]*2
e = [e0]*2
# Loop
y_coeff_list_q_l1 = np.where(y>0, beta[1], beta[2])
y_coeff_list_q_l2 = np.where(y>0, beta[3], beta[4])
y_coeff_list_e_l1 = np.where(y>0, beta[10], beta[11])
y_coeff_list_e_l2 = np.where(y>0, beta[12], beta[13])
for t in range(2, len(y)):
q.append(beta[0] +\
y_coeff_list_q_l1[t-1]*y[t-1] + y_coeff_list_q_l2[t-2]*y[t-2] +\
beta[5]*q[t-1] + beta[6]*q[t-2] +\
beta[7]*e[t-1] + beta[8]*e[t-2] )
e.append(beta[9] +\
y_coeff_list_e_l1[t-1]*y[t-1] + y_coeff_list_e_l2[t-2]*y[t-2] +\
beta[14]*q[t-1] + beta[15]*q[t-2] +\
beta[16]*e[t-1] + beta[17]*e[t-2] )
q, e = np.array(q), np.array(e)
return q, e
def R_GARCHloop(self, beta, y, q, r0, pred_mode=False):
'''
Loop for the ^r estimation via GARCH specification.
INPUTS:
- beta: ndarray
parameters of the model. The shape is (4,).
- y: ndarray
target time series.
- q: ndarray
quantile forecast.
- r0: float or list of floats
if pred_mode=False, r0 is a float describing initial point for the
^r forecast. If pred_mode=True, r0 is a list of floats describing the
last state of the system, that is r0=[y[-1], q[-1], ^r[-1]].
- pred_mode: bool, optional
if True, the loop is in prediction mode and r0 is assumed to contain
the last state. Default is False.
OUTPUTS:
- ndarray
difference between the Expected Shortfall forecast and the quantile.
'''
# Initial point
if pred_mode: #If we are in prediction mode, we have y, q, and e at step -1 and we need to compute e at step 0
r = list()
r.append( -np.sqrt(beta[0]+beta[1]*r0[0]**2+beta[2]*r0[1]**2+beta[3]*r0[2]**2) ) #In pred_mode, r0 is assumed to be [y[-1], q[-1], r[-1]]
else: #If we are in training mode, we have an approximation of e at step 0
r = [r0]
# Loop
for t in range(1, len(y)):
r.append( -np.sqrt(beta[0]+beta[1]*y[t-1]**2+beta[2]*q[t-1]**2+beta[3]*r[t-1]**2) )
r = np.array(r)
return r
def Joint_GARCHloop(self, beta, y, q0, e0, pred_mode=False):
'''
Loop for the joint ^q and ^e estimation via GARCH specification.
INPUTS:
- beta: ndarray
parameters of the model. The shape is (8,).
- y: ndarray
target time series.
- q0: float
if pred_mode=False, it contains ^q starting point. Otherwise, it is not used.
- e0: float or list of floats
if pred_mode=False, r0 is a float describing initial point for the
^r forecast. If pred_mode=True, r0 is a list of floats describing the
last state of the system, that is r0=[y[-1], q[-1], e[-1]].
- pred_mode: bool, optional
if True, the loop is in prediction mode and r0 is assumed to contain
the last state. Default is False.
OUTPUTS:
- ndarray
quantile forecast.
- ndarray
Expected Shortfall forecast.
'''
# Initial point
if pred_mode: #If we are in prediction mode, we have y, q, and e at step -1 and we need to compute q and e at step 0
q = list()
e = list()
q.append( -np.sqrt(beta[0]+beta[1]*e0[0]**2+beta[2]*e0[1]**2+beta[3]*e0[2]**2) ) #In pred_mode, e0 is assumed to be [y[-1], q[-1], e[-1]]
e.append( -np.sqrt(beta[4]+beta[5]*e0[0]**2+beta[6]*e0[1]**2+beta[7]*e0[2]**2) )
else: #If we are in training mode, we have an approximation of q and e at step 0
q = [q0]
e = [e0]
# Loop
for t in range(1, len(y)):
q.append( -np.sqrt(beta[0]+beta[1]*y[t-1]**2+beta[2]*q[t-1]**2+beta[3]*e[t-1]**2) )
e.append( -np.sqrt(beta[4]+beta[5]*y[t-1]**2+beta[6]*q[t-1]**2+beta[7]*e[t-1]**2) )
q, e = np.array(q), np.array(e)
return q, e
def fit(self, yi, seed=None, return_train=False, q0=None, nV=102, n_init=3, n_rep=5):
'''
Fit the CAESar model.
INPUTS:
- yi: ndarray
target time series.
- seed: int or None, optional
random seed. Default is None.
- return_train: bool, optional
if True, the function returns the fitted values. Default is False.
- q0: list of float or None, optional
[initial quantile, initial expected shortfall]. If None, the initial quantile
is computed as the empirical quantile in the first 10% of the
training set; the initial expected shortfall as the tail mean in the same
subset. Default is None.
- nV: int, optional
number of random initializations of the model coefficients. Default is 102.
- n_init: int, optional
number of best initializations to work with. Default is 3.
- n_rep: int, optional
number of repetitions for the optimization. Default is 5.
OUTPUTS:
- ndarray
quantile forecast in the training set (if return_train=True).
- ndarray
expected shortfall forecast in the training set (if return_train=True).
- ndarray
optimized coefficients of the model. The shape is
(2, self.n_parameters) (if return_train=True).
'''
warnings.simplefilter(action='ignore', category=RuntimeWarning) #Ignore Runtime Warnings
#-------------------- Step 0: CAViaR for quantile initial guess
cav_res = CAViaR(
self.theta, self.mdl_spec, p=2, u=2).fit(
yi, seed=seed, return_train=True, q0=q0) #Train CAViaR
qi, beta_cav = cav_res['qi'], cav_res['beta']
del(cav_res)
#-------------------- Step 1: Initialization
if isinstance(yi, list):
yi = np.array(yi)
if isinstance(q0, type(None)):
#The starting forecast ^q_0 and ^e_0 is the empricial quantile of the first part of the trainig set (for computational reason)
n_emp = int(np.ceil(0.1 * len(yi))) #Select onyl 1/10 of the training set
if round(n_emp * self.theta) == 0: n_emp = len(yi) #In case the training dimension is too small wrt theta
y_sort = np.sort(yi[:n_emp])
quantile0 = int(round(n_emp * self.theta))-1
if quantile0 < 0: quantile0 = 0
e0 = np.mean(y_sort[:quantile0+1])
q0 = y_sort[quantile0]
else:
q0 = q0[0]
e0 = q0[1]
#-------------------- Step 2: Initial guess
np.random.seed(seed)
nInitialVectors = [nV//3, self.n_parameters] #Define the shape of the random initializations
beta0 = [np.random.uniform(0, 1, nInitialVectors)]
beta0.append(np.random.uniform(-1, 1, nInitialVectors))
beta0.append(np.random.randn(*nInitialVectors))
beta0 = np.concatenate(beta0, axis=0)
AEfval = np.empty(nV) #Initialize the loss function vector
#Iterates over the random initializations
for i in range(nV):
AEfval[i] = self.ESloss(beta0[i, :], yi, qi, e0-q0) #Compute starting loss
beta0 = beta0[AEfval.argsort()][0:n_init] #Sort initializations by loss and select only the n_init best initializations
#-------------------- Step 3: Optimization Routine - I step (barrera)
beta = np.empty((n_init, self.n_parameters)) #Initialize the parameters vector
fval_beta = np.empty(n_init) #Initialize the loss function vector
exitflag = np.empty(n_init) #Initialize the exit flag vector
# Multiprocessing: create and start worker processes
workers = list()
for i in range(n_init):
parent_pipend, child_pipend = mp.Pipe() # Create a pipe to communicate with the worker
worker = mp.Process(target=self.optim4mp,
args=(yi, qi, e0-q0, beta0[i, :],
n_rep, child_pipend))
workers.append([worker, parent_pipend])
worker.start()
# Gather results from workers
for i, worker_list in enumerate(workers):
worker, parent_pipend = worker_list
beta_worker, fval_beta_worker, exitflag_worker =\
parent_pipend.recv() # Get the result from the worker
worker.join()
beta[i, :] = beta_worker
fval_beta[i] = fval_beta_worker
exitflag[i] = exitflag_worker
ind_min = np.argmin(fval_beta) #Select the index of the best loss
self.beta = beta[ind_min, :] #Store the best parameters
self.fval_beta = fval_beta[ind_min] #Store the best loss
self.exitflag = exitflag[ind_min] #Store the best exit flag
#-------------------- Step 4: Optimization Routine - II step (patton)
# Concatenate ^q and ^r parameters, then convert ^r into ^e parameters
joint_beta = np.concatenate([beta_cav, [0, 0], self.beta]) #Concatenate
if self.mdl_spec == 'SAV':
joint_beta[4] += joint_beta[0]
joint_beta[5] += joint_beta[1]
joint_beta[6] += joint_beta[2] - joint_beta[7]
elif self.mdl_spec == 'AS':
for i in range(5):
joint_beta[9 + i] += joint_beta[i]
for j in range(5, 7):
joint_beta[9 + j] += joint_beta[j] - joint_beta[11 +j]
elif self.mdl_spec == 'GARCH':
joint_beta[4] += joint_beta[0]
joint_beta[5] += joint_beta[1]
joint_beta[6] += joint_beta[2] + joint_beta[7]
joint_beta_temp = self.joint_optim(yi, q0, e0, joint_beta, n_rep)
if not np.isnan(joint_beta_temp).any():
joint_beta = joint_beta_temp
self.beta = joint_beta
#Compute the fit output: optimal beta vector, optimization info, and the last pair ^q, ^e
#in sample variables
qi, ei = self.joint_loop(self.beta, yi, q0, e0) #Train CAESar and recover fitted quantiles
self.last_state = [yi[-2:], qi[-2:], ei[-2:]] #Store the last state
if return_train: #If return_train is True, return the training prediction and coefficients
return {'qi':qi, 'ei':ei, 'beta':self.beta.reshape((2, self.n_parameters))}
def predict(self, yf=np.array(list())):
'''
Predict the quantile.
INPUTS:
- yf: ndarray, optional
test data. If yf is not empty, the internal state is updated
with the last observation. Default is an empty list.
OUTPUTS:
- ndarray
quantile estimate.
- ndarray
Expected Shortfall estimate.
'''
qf, ef = self.joint_loop(self.beta, yf, None, self.last_state, pred_mode=True)
if len(yf) > 0:
self.last_state = [
np.concatenate([self.last_state[0], yf])[-2:],
np.concatenate([self.last_state[1], qf])[-2:],
np.concatenate([self.last_state[2], ef])[-2:] ]
return {'qf':qf, 'ef':ef}
class CAESar_1_2(CAESar_base):
'''
CAESar for joint quantile and expected shortfall estimation - y lags=1, q lags=2.
:meta private:
'''
def __init__(self, theta, spec='AS', lambdas=dict()):
'''
Initialization of the CAESar model.
INPUTS:
- theta: float
desired confidence level.
- spec: str, optional
specification of the model (SAV, AS, GARCH). Default is AS.
- lambdas: dict, optional
lambdas for the soft constraints. Default is {'r':10, 'q':10, 'e':10}.
OUTPUTS:
- None.
'''
super().__init__(theta, lambdas) #Initialize the base class
# According to the desired model, initialize the number of parameters and the loop
if spec == 'SAV':
self.mdl_spec = 'SAV'
self.n_parameters = 6
self.loop = self.R_SAVloop
self.joint_loop = self.Joint_SAVloop
elif spec == 'AS':
self.mdl_spec = 'AS'
self.n_parameters = 7
self.loop = self.R_ASloop
self.joint_loop = self.Joint_ASloop
elif spec == 'GARCH':
self.mdl_spec = 'GARCH'
self.n_parameters = 6
self.loop = self.R_GARCHloop
self.joint_loop = self.Joint_GARCHloop
else:
raise ValueError(f'Specification {spec} not recognized!\nChoose between "SAV", "AS", and "GARCH"')
def R_SAVloop(self, beta, y, q, r0, pred_mode=False):
'''
Loop for the ^r estimation via SAV specification.
INPUTS:
- beta: ndarray
parameters of the model. The shape is (4,).
- y: ndarray
target time series.
- q: ndarray
quantile forecast.
- r0: float or list of floats
if pred_mode=False, r0 is a float describing initial point for the
^r forecast. If pred_mode=True, r0 is a list of floats describing the
last state of the system, that is r0=[y[-1], q[-1], ^r[-1]].
- pred_mode: bool, optional
if True, the loop is in prediction mode and r0 is assumed to contain
the last state. Default is False.
OUTPUTS:
- ndarray
difference between the Expected Shortfall forecast and the quantile.
'''
# Initial point
if pred_mode: #If we are in prediction mode, we have y, q, and r at step -1 and we need to compute r at step 0
r = list()
r.append( beta[0] + beta[1] * np.abs(r0[0]) + beta[2] * r0[1] + beta[3] * r0[2] ) #In pred_mode, r0 is assumed to be [y[-1], q[-1], r[-1]]
else: #If we are in training mode, we have an approximation of r at step 0
r = [r0]
# Loop
for t in range(1, len(y)):
r.append( beta[0] + beta[1] * np.abs(y[t-1]) + beta[2] * q[t-1] + beta[3] * r[t-1] )
r = np.array(r)
return r
def Joint_SAVloop(self, beta, y, q0, e0, pred_mode=False):
'''
Loop for the joint ^q and ^e estimation via SAV specification.
INPUTS:
- beta: ndarray
parameters of the model. The shape is (8,).
- y: ndarray
target time series.
- q0: float
if pred_mode=False, it contains ^q starting point. Otherwise, it is not used.
- e0: float or list of floats
if pred_mode=False, r0 is a float describing initial point for the
^r forecast. If pred_mode=True, r0 is a list of floats describing the
last state of the system, that is r0=[y[-1], q[-1], e[-1]].
- pred_mode: bool, optional
if True, the loop is in prediction mode and r0 is assumed to contain
the last state. Default is False.
OUTPUTS:
- ndarray
quantile forecast.
- ndarray
Expected Shortfall forecast.
'''
# Initial point
if pred_mode: #If we are in prediction mode, we have y, q, and e at step -1 and we need to compute q and e at step 0
q = list()
e = list()
q.append( beta[0] + beta[1] * np.abs(e0[0]) + beta[2] * e0[1] + beta[3] * e0[2] ) #In pred_mode, e0 is assumed to be [y[-1], q[-1], e[-1]]
e.append( beta[4] + beta[5] * np.abs(e0[0]) + beta[6] * e0[1] + beta[7] * e0[2] )
else: #If we are in training mode, we have an approximation of q and e at step 0
q = [q0]
e = [e0]
# Loop
for t in range(1, len(y)):
q.append( beta[0] + beta[1] * np.abs(y[t-1]) + beta[2] * q[t-1] + beta[3] * e[t-1] )
e.append( beta[4] + beta[5] * np.abs(y[t-1]) + beta[6] * q[t-1] + beta[7] * e[t-1] )
q, e = np.array(q), np.array(e)
return q, e
def R_ASloop(self, beta, y, q, r0, pred_mode=False):
'''
Loop for the ^r estimation via AS specification.
INPUTS:
- beta: ndarray
parameters of the model. The shape is (5,).
- y: ndarray
target time series.
- q: ndarray
quantile forecast.
- r0: float or list of floats
if pred_mode=False, r0 is a float describing initial point for the
^r forecast. If pred_mode=True, r0 is a list of floats describing the
last state of the system, that is r0=[y[-1], q[-1], ^r[-1]].
- pred_mode: bool, optional
if True, the loop is in prediction mode and r0 is assumed to contain
the last state. Default is False.
OUTPUTS:
- ndarray
difference between the Expected Shortfall forecast and the quantile.
'''
# Initial point
if pred_mode: #If we are in prediction mode, we have y, q, and r at step -1 and we need to compute r at step 0
# In pred_mode, r0 is assumed to be [y[-max_lag:], ^q[-max_lag:], ^r[-max_lag:]]
r = list()
for t in range(2):
r.append( r0[2][t])
y = np.concatenate([r0[0], y])
q = np.concatenate([r0[1], q])
# Loop
y_coeff_list_l1 = np.where(y>0, beta[1], beta[2])
if len(y) > 0:
for t in range(2, len(y)):
r.append(beta[0] +\
y_coeff_list_l1[t-1]*y[t-1] +\
beta[3]*q[t-1] + beta[4]*q[t-2] +\
beta[5]*r[t-1] + beta[6]*r[t-2] )
else:
t = 3
r.append(beta[0] +\
y_coeff_list_l1[t-1]*y[t-1] +\
beta[3]*q[t-1] + beta[4]*q[t-2] +\
beta[5]*r[t-1] + beta[6]*r[t-2] )
r = r[2:]
else: #If we are in training mode, we have an approximation of q at step 0
r = [r0]*2
# Loop
y_coeff_list_l1 = np.where(y>0, beta[1], beta[2])
for t in range(2, len(y)):
r.append(beta[0] +\
y_coeff_list_l1[t-1]*y[t-1] +\
beta[3]*q[t-1] + beta[4]*q[t-2] +\
beta[5]*r[t-1] + beta[6]*r[t-2] )
r = np.array(r)
return r
def Joint_ASloop(self, beta, y, q0, e0, pred_mode=False):
'''
Loop for the joint ^q and ^e estimation via AS specification.
INPUTS:
- beta: ndarray
parameters of the model. The shape is (10,).
- y: ndarray
target time series.
- q0: float
if pred_mode=False, it contains ^q starting point. Otherwise, it is not used.
- e0: float or list of floats
if pred_mode=False, r0 is a float describing initial point for the
^r forecast. If pred_mode=True, r0 is a list of floats describing the
last state of the system, that is r0=[y[-1], q[-1], e[-1]].
- pred_mode: bool, optional
if True, the loop is in prediction mode and r0 is assumed to contain
the last state. Default is False.
OUTPUTS:
- ndarray
quantile forecast.
- ndarray
Expected Shortfall forecast.
'''
# Initial point
if pred_mode: #If we are in prediction mode, we have y, q, and e at step -1 and we need to compute q and e at step 0
# In pred_mode, e0 is assumed to be [y[-max_lag:], ^q[-max_lag:], ^e[-max_lag:]]
q = list()
for t in range(2):
q.append( e0[1][t])
e = list()
for t in range(2):
e.append( e0[2][t])
y = np.concatenate([e0[0], y])
# Loop
y_coeff_list_q_l1 = np.where(y>0, beta[1], beta[2])
y_coeff_list_e_l1 = np.where(y>0, beta[8], beta[9])
if len(y) > 0:
for t in range(2, len(y)):
q.append(beta[0] +\
y_coeff_list_q_l1[t-1]*y[t-1] +\
beta[3]*q[t-1] + beta[4]*q[t-2] +\
beta[5]*e[t-1] + beta[6]*e[t-2] )
e.append(beta[7] +\
y_coeff_list_e_l1[t-1]*y[t-1] +\
beta[10]*q[t-1] + beta[11]*q[t-2] +\
beta[12]*e[t-1] + beta[13]*e[t-2] )
else:
t = 3
q.append(beta[0] +\
y_coeff_list_q_l1[t-1]*y[t-1] +\
beta[3]*q[t-1] + beta[4]*q[t-2] +\
beta[5]*e[t-1] + beta[6]*e[t-2] )
e.append(beta[7] +\
y_coeff_list_e_l1[t-1]*y[t-1] +\
beta[10]*q[t-1] + beta[11]*q[t-2] +\
beta[12]*e[t-1] + beta[13]*e[t-2] )
q, e = q[2:], e[2:]
else: #If we are in training mode, we have an approximation of q at step 0
q = [q0]*2
e = [e0]*2
# Loop
y_coeff_list_q_l1 = np.where(y>0, beta[1], beta[2])
y_coeff_list_e_l1 = np.where(y>0, beta[8], beta[9])
for t in range(2, len(y)):
q.append(beta[0] +\
y_coeff_list_q_l1[t-1]*y[t-1] +\
beta[3]*q[t-1] + beta[4]*q[t-2] +\
beta[5]*e[t-1] + beta[6]*e[t-2] )
e.append(beta[7] +\
y_coeff_list_e_l1[t-1]*y[t-1] +\
beta[10]*q[t-1] + beta[11]*q[t-2] +\
beta[12]*e[t-1] + beta[13]*e[t-2] )
q, e = np.array(q), np.array(e)
return q, e
def R_GARCHloop(self, beta, y, q, r0, pred_mode=False):
'''
Loop for the ^r estimation via GARCH specification.
INPUTS:
- beta: ndarray
parameters of the model. The shape is (4,).
- y: ndarray
target time series.
- q: ndarray
quantile forecast.
- r0: float or list of floats
if pred_mode=False, r0 is a float describing initial point for the
^r forecast. If pred_mode=True, r0 is a list of floats describing the
last state of the system, that is r0=[y[-1], q[-1], ^r[-1]].
- pred_mode: bool, optional
if True, the loop is in prediction mode and r0 is assumed to contain
the last state. Default is False.
OUTPUTS:
- ndarray
difference between the Expected Shortfall forecast and the quantile.
'''
# Initial point
if pred_mode: #If we are in prediction mode, we have y, q, and e at step -1 and we need to compute e at step 0
r = list()
r.append( -np.sqrt(beta[0]+beta[1]*r0[0]**2+beta[2]*r0[1]**2+beta[3]*r0[2]**2) ) #In pred_mode, r0 is assumed to be [y[-1], q[-1], r[-1]]
else: #If we are in training mode, we have an approximation of e at step 0
r = [r0]
# Loop
for t in range(1, len(y)):
r.append( -np.sqrt(beta[0]+beta[1]*y[t-1]**2+beta[2]*q[t-1]**2+beta[3]*r[t-1]**2) )
r = np.array(r)
return r
def Joint_GARCHloop(self, beta, y, q0, e0, pred_mode=False):
'''
Loop for the joint ^q and ^e estimation via GARCH specification.
INPUTS:
- beta: ndarray
parameters of the model. The shape is (8,).
- y: ndarray
target time series.
- q0: float
if pred_mode=False, it contains ^q starting point. Otherwise, it is not used.
- e0: float or list of floats
if pred_mode=False, r0 is a float describing initial point for the
^r forecast. If pred_mode=True, r0 is a list of floats describing the
last state of the system, that is r0=[y[-1], q[-1], e[-1]].
- pred_mode: bool, optional
if True, the loop is in prediction mode and r0 is assumed to contain
the last state. Default is False.
OUTPUTS:
- ndarray
quantile forecast.
- ndarray
Expected Shortfall forecast.
'''
# Initial point
if pred_mode: #If we are in prediction mode, we have y, q, and e at step -1 and we need to compute q and e at step 0
q = list()
e = list()
q.append( -np.sqrt(beta[0]+beta[1]*e0[0]**2+beta[2]*e0[1]**2+beta[3]*e0[2]**2) ) #In pred_mode, e0 is assumed to be [y[-1], q[-1], e[-1]]
e.append( -np.sqrt(beta[4]+beta[5]*e0[0]**2+beta[6]*e0[1]**2+beta[7]*e0[2]**2) )
else: #If we are in training mode, we have an approximation of q and e at step 0
q = [q0]
e = [e0]
# Loop
for t in range(1, len(y)):
q.append( -np.sqrt(beta[0]+beta[1]*y[t-1]**2+beta[2]*q[t-1]**2+beta[3]*e[t-1]**2) )
e.append( -np.sqrt(beta[4]+beta[5]*y[t-1]**2+beta[6]*q[t-1]**2+beta[7]*e[t-1]**2) )
q, e = np.array(q), np.array(e)
return q, e
def fit(self, yi, seed=None, return_train=False, q0=None, nV=102, n_init=3, n_rep=5):
'''
Fit the CAESar model.
INPUTS:
- yi: ndarray
target time series.
- seed: int or None, optional
random seed. Default is None.
- return_train: bool, optional
if True, the function returns the fitted values. Default is False.
- q0: list of float or None, optional
[initial quantile, initial expected shortfall]. If None, the initial quantile
is computed as the empirical quantile in the first 10% of the
training set; the initial expected shortfall as the tail mean in the same
subset. Default is None.
- nV: int, optional
number of random initializations of the model coefficients. Default is 102.
- n_init: int, optional
number of best initializations to work with. Default is 3.
- n_rep: int, optional
number of repetitions for the optimization. Default is 5.
OUTPUTS:
- ndarray
quantile forecast in the training set (if return_train=True).
- ndarray
expected shortfall forecast in the training set (if return_train=True).
- ndarray
optimized coefficients of the model. The shape is
(2, self.n_parameters) (if return_train=True).
'''
warnings.simplefilter(action='ignore', category=RuntimeWarning) #Ignore Runtime Warnings
#-------------------- Step 0: CAViaR for quantile initial guess
cav_res = CAViaR(
self.theta, self.mdl_spec, p=1, u=2).fit(
yi, seed=seed, return_train=True, q0=q0) #Train CAViaR
qi, beta_cav = cav_res['qi'], cav_res['beta']
del(cav_res)
#-------------------- Step 1: Initialization
if isinstance(yi, list):
yi = np.array(yi)
if isinstance(q0, type(None)):
#The starting forecast ^q_0 and ^e_0 is the empricial quantile of the first part of the trainig set (for computational reason)
n_emp = int(np.ceil(0.1 * len(yi))) #Select onyl 1/10 of the training set
if round(n_emp * self.theta) == 0: n_emp = len(yi) #In case the training dimension is too small wrt theta
y_sort = np.sort(yi[:n_emp])
quantile0 = int(round(n_emp * self.theta))-1
if quantile0 < 0: quantile0 = 0
e0 = np.mean(y_sort[:quantile0+1])
q0 = y_sort[quantile0]
else:
q0 = q0[0]
e0 = q0[1]
#-------------------- Step 2: Initial guess
np.random.seed(seed)
nInitialVectors = [nV//3, self.n_parameters] #Define the shape of the random initializations
beta0 = [np.random.uniform(0, 1, nInitialVectors)]
beta0.append(np.random.uniform(-1, 1, nInitialVectors))
beta0.append(np.random.randn(*nInitialVectors))
beta0 = np.concatenate(beta0, axis=0)
AEfval = np.empty(nV) #Initialize the loss function vector
#Iterates over the random initializations
for i in range(nV):
AEfval[i] = self.ESloss(beta0[i, :], yi, qi, e0-q0) #Compute starting loss
beta0 = beta0[AEfval.argsort()][0:n_init] #Sort initializations by loss and select only the n_init best initializations
#-------------------- Step 3: Optimization Routine - I step (barrera)
beta = np.empty((n_init, self.n_parameters)) #Initialize the parameters vector
fval_beta = np.empty(n_init) #Initialize the loss function vector
exitflag = np.empty(n_init) #Initialize the exit flag vector
# Multiprocessing: create and start worker processes
workers = list()
for i in range(n_init):
parent_pipend, child_pipend = mp.Pipe() # Create a pipe to communicate with the worker
worker = mp.Process(target=self.optim4mp,
args=(yi, qi, e0-q0, beta0[i, :],
n_rep, child_pipend))
workers.append([worker, parent_pipend])
worker.start()
# Gather results from workers
for i, worker_list in enumerate(workers):
worker, parent_pipend = worker_list
beta_worker, fval_beta_worker, exitflag_worker =\
parent_pipend.recv() # Get the result from the worker
worker.join()
beta[i, :] = beta_worker
fval_beta[i] = fval_beta_worker
exitflag[i] = exitflag_worker
ind_min = np.argmin(fval_beta) #Select the index of the best loss
self.beta = beta[ind_min, :] #Store the best parameters
self.fval_beta = fval_beta[ind_min] #Store the best loss
self.exitflag = exitflag[ind_min] #Store the best exit flag
#-------------------- Step 4: Optimization Routine - II step (patton)
# Concatenate ^q and ^r parameters, then convert ^r into ^e parameters
joint_beta = np.concatenate([beta_cav, [0, 0], self.beta]) #Concatenate
if self.mdl_spec == 'SAV':
joint_beta[4] += joint_beta[0]
joint_beta[5] += joint_beta[1]
joint_beta[6] += joint_beta[2] - joint_beta[7]
elif self.mdl_spec == 'AS':
for i in range(3):
joint_beta[7 + i] += joint_beta[i]
for j in range(3, 5):
joint_beta[7 + j] += joint_beta[j] - joint_beta[9 +j]
elif self.mdl_spec == 'GARCH':
joint_beta[4] += joint_beta[0]
joint_beta[5] += joint_beta[1]
joint_beta[6] += joint_beta[2] + joint_beta[7]
joint_beta_temp = self.joint_optim(yi, q0, e0, joint_beta, n_rep)
if not np.isnan(joint_beta_temp).any():
joint_beta = joint_beta_temp
self.beta = joint_beta
#Compute the fit output: optimal beta vector, optimization info, and the last pair ^q, ^e
#in sample variables
qi, ei = self.joint_loop(self.beta, yi, q0, e0) #Train CAESar and recover fitted quantiles
self.last_state = [yi[-2:], qi[-2:], ei[-2:]] #Store the last state
if return_train: #If return_train is True, return the training prediction and coefficients
return {'qi':qi, 'ei':ei, 'beta':self.beta.reshape((2, self.n_parameters))}
def predict(self, yf=np.array(list())):
'''
Predict the quantile.
INPUTS:
- yf: ndarray, optional
test data. If yf is not empty, the internal state is updated
with the last observation. Default is an empty list.
OUTPUTS:
- ndarray
quantile estimate.
- ndarray
Expected Shortfall estimate.
'''
qf, ef = self.joint_loop(self.beta, yf, None, self.last_state, pred_mode=True)
if len(yf) > 0:
self.last_state = [
np.concatenate([self.last_state[0], yf])[-2:],
np.concatenate([self.last_state[1], qf])[-2:],
np.concatenate([self.last_state[2], ef])[-2:] ]
return {'qf':qf, 'ef':ef}
class CAESar_2_1(CAESar_base):
'''
CAESar for joint quantile and expected shortfall estimation - y lags=2, q lags=1.
:meta private:
'''
def __init__(self, theta, spec='AS', lambdas=dict()):
'''
Initialization of the CAESar model.
INPUTS:
- theta: float
desired confidence level.
- spec: str, optional
specification of the model (SAV, AS, GARCH). Default is AS.
- lambdas: dict, optional
lambdas for the soft constraints. Default is {'r':10, 'q':10, 'e':10}.
OUTPUTS:
- None.
'''
super().__init__(theta, lambdas) #Initialize the base class
# According to the desired model, initialize the number of parameters and the loop
if spec == 'SAV':
self.mdl_spec = 'SAV'
self.n_parameters = 5
self.loop = self.R_SAVloop
self.joint_loop = self.Joint_SAVloop
elif spec == 'AS':
self.mdl_spec = 'AS'
self.n_parameters = 7
self.loop = self.R_ASloop
self.joint_loop = self.Joint_ASloop
elif spec == 'GARCH':
self.mdl_spec = 'GARCH'
self.n_parameters = 5
self.loop = self.R_GARCHloop
self.joint_loop = self.Joint_GARCHloop
else:
raise ValueError(f'Specification {spec} not recognized!\nChoose between "SAV", "AS", and "GARCH"')
def R_SAVloop(self, beta, y, q, r0, pred_mode=False):
'''
Loop for the ^r estimation via SAV specification.
INPUTS:
- beta: ndarray
parameters of the model. The shape is (4,).
- y: ndarray
target time series.
- q: ndarray
quantile forecast.
- r0: float or list of floats
if pred_mode=False, r0 is a float describing initial point for the
^r forecast. If pred_mode=True, r0 is a list of floats describing the
last state of the system, that is r0=[y[-1], q[-1], ^r[-1]].
- pred_mode: bool, optional
if True, the loop is in prediction mode and r0 is assumed to contain
the last state. Default is False.
OUTPUTS:
- ndarray
difference between the Expected Shortfall forecast and the quantile.
'''
# Initial point
if pred_mode: #If we are in prediction mode, we have y, q, and r at step -1 and we need to compute r at step 0
r = list()
r.append( beta[0] + beta[1] * np.abs(r0[0]) + beta[2] * r0[1] + beta[3] * r0[2] ) #In pred_mode, r0 is assumed to be [y[-1], q[-1], r[-1]]
else: #If we are in training mode, we have an approximation of r at step 0
r = [r0]
# Loop
for t in range(1, len(y)):
r.append( beta[0] + beta[1] * np.abs(y[t-1]) + beta[2] * q[t-1] + beta[3] * r[t-1] )
r = np.array(r)
return r
def Joint_SAVloop(self, beta, y, q0, e0, pred_mode=False):
'''
Loop for the joint ^q and ^e estimation via SAV specification.
INPUTS:
- beta: ndarray
parameters of the model. The shape is (8,).
- y: ndarray
target time series.
- q0: float
if pred_mode=False, it contains ^q starting point. Otherwise, it is not used.
- e0: float or list of floats
if pred_mode=False, r0 is a float describing initial point for the
^r forecast. If pred_mode=True, r0 is a list of floats describing the
last state of the system, that is r0=[y[-1], q[-1], e[-1]].
- pred_mode: bool, optional
if True, the loop is in prediction mode and r0 is assumed to contain
the last state. Default is False.
OUTPUTS:
- ndarray
quantile forecast.
- ndarray
Expected Shortfall forecast.
'''
# Initial point
if pred_mode: #If we are in prediction mode, we have y, q, and e at step -1 and we need to compute q and e at step 0
q = list()
e = list()
q.append( beta[0] + beta[1] * np.abs(e0[0]) + beta[2] * e0[1] + beta[3] * e0[2] ) #In pred_mode, e0 is assumed to be [y[-1], q[-1], e[-1]]
e.append( beta[4] + beta[5] * np.abs(e0[0]) + beta[6] * e0[1] + beta[7] * e0[2] )
else: #If we are in training mode, we have an approximation of q and e at step 0
q = [q0]
e = [e0]
# Loop
for t in range(1, len(y)):
q.append( beta[0] + beta[1] * np.abs(y[t-1]) + beta[2] * q[t-1] + beta[3] * e[t-1] )
e.append( beta[4] + beta[5] * np.abs(y[t-1]) + beta[6] * q[t-1] + beta[7] * e[t-1] )
q, e = np.array(q), np.array(e)
return q, e
def R_ASloop(self, beta, y, q, r0, pred_mode=False):
'''
Loop for the ^r estimation via AS specification.
INPUTS:
- beta: ndarray
parameters of the model. The shape is (5,).
- y: ndarray
target time series.
- q: ndarray
quantile forecast.
- r0: float or list of floats
if pred_mode=False, r0 is a float describing initial point for the
^r forecast. If pred_mode=True, r0 is a list of floats describing the
last state of the system, that is r0=[y[-1], q[-1], ^r[-1]].
- pred_mode: bool, optional
if True, the loop is in prediction mode and r0 is assumed to contain
the last state. Default is False.
OUTPUTS:
- ndarray
difference between the Expected Shortfall forecast and the quantile.
'''
# Initial point
if pred_mode: #If we are in prediction mode, we have y, q, and r at step -1 and we need to compute r at step 0
# In pred_mode, r0 is assumed to be [y[-max_lag:], ^q[-max_lag:], ^r[-max_lag:]]
r = list()
for t in range(2):
r.append( r0[2][t])
y = np.concatenate([r0[0], y])
q = np.concatenate([r0[1], q])
# Loop
y_coeff_list_l1 = np.where(y>0, beta[1], beta[2])
y_coeff_list_l2 = np.where(y>0, beta[3], beta[4])
if len(y) > 0:
for t in range(2, len(y)):
r.append(beta[0] +\
y_coeff_list_l1[t-1]*y[t-1] + y_coeff_list_l2[t-1]*y[t-2] +\
beta[5]*q[t-1] +\
beta[6]*r[t-1] )
else:
t = 3
r.append(beta[0] +\
y_coeff_list_l1[t-1]*y[t-1] + y_coeff_list_l2[t-1]*y[t-2] +\
beta[5]*q[t-1] +\
beta[6]*r[t-1] )
r = r[2:]
else: #If we are in training mode, we have an approximation of q at step 0
r = [r0]*2
# Loop
y_coeff_list_l1 = np.where(y>0, beta[1], beta[2])
y_coeff_list_l2 = np.where(y>0, beta[3], beta[4])
for t in range(2, len(y)):
r.append(beta[0] +\
y_coeff_list_l1[t-1]*y[t-1] + y_coeff_list_l2[t-1]*y[t-2] +\
beta[5]*q[t-1] +\
beta[6]*r[t-1] )
r = np.array(r)
return r
def Joint_ASloop(self, beta, y, q0, e0, pred_mode=False):
'''
Loop for the joint ^q and ^e estimation via AS specification.
INPUTS:
- beta: ndarray
parameters of the model. The shape is (10,).
- y: ndarray
target time series.
- q0: float
if pred_mode=False, it contains ^q starting point. Otherwise, it is not used.
- e0: float or list of floats
if pred_mode=False, r0 is a float describing initial point for the
^r forecast. If pred_mode=True, r0 is a list of floats describing the
last state of the system, that is r0=[y[-1], q[-1], e[-1]].
- pred_mode: bool, optional
if True, the loop is in prediction mode and r0 is assumed to contain
the last state. Default is False.
OUTPUTS:
- ndarray
quantile forecast.
- ndarray
Expected Shortfall forecast.
'''
# Initial point
if pred_mode: #If we are in prediction mode, we have y, q, and e at step -1 and we need to compute q and e at step 0
# In pred_mode, e0 is assumed to be [y[-max_lag:], ^q[-max_lag:], ^e[-max_lag:]]
q = list()
for t in range(2):
q.append( e0[1][t])
e = list()
for t in range(2):
e.append( e0[2][t])
y = np.concatenate([e0[0], y])
# Loop
y_coeff_list_q_l1 = np.where(y>0, beta[1], beta[2])
y_coeff_list_q_l2 = np.where(y>0, beta[3], beta[4])
y_coeff_list_e_l1 = np.where(y>0, beta[8], beta[9])
y_coeff_list_e_l2 = np.where(y>0, beta[10], beta[11])
if len(y) > 0:
for t in range(2, len(y)):
q.append(beta[0] +\
y_coeff_list_q_l1[t-1]*y[t-1] + y_coeff_list_q_l2[t-2]*y[t-2] +\
beta[5]*q[t-1] +\
beta[6]*e[t-1] )
e.append(beta[7] +\
y_coeff_list_e_l1[t-1]*y[t-1] + y_coeff_list_e_l2[t-2]*y[t-2] +\
beta[12]*q[t-1] +\
beta[13]*e[t-1] )
else:
t = 3
q.append(beta[0] +\
y_coeff_list_q_l1[t-1]*y[t-1] + y_coeff_list_q_l2[t-2]*y[t-2] +\
beta[5]*q[t-1] +\
beta[6]*e[t-1] )
e.append(beta[7] +\
y_coeff_list_e_l1[t-1]*y[t-1] + y_coeff_list_e_l2[t-2]*y[t-2] +\
beta[12]*q[t-1] +\
beta[13]*e[t-1] )
q, e = q[2:], e[2:]
else: #If we are in training mode, we have an approximation of q at step 0
q = [q0]*2
e = [e0]*2
# Loop
y_coeff_list_q_l1 = np.where(y>0, beta[1], beta[2])
y_coeff_list_q_l2 = np.where(y>0, beta[3], beta[4])
y_coeff_list_e_l1 = np.where(y>0, beta[8], beta[9])
y_coeff_list_e_l2 = np.where(y>0, beta[10], beta[11])
for t in range(2, len(y)):
q.append(beta[0] +\
y_coeff_list_q_l1[t-1]*y[t-1] + y_coeff_list_q_l2[t-2]*y[t-2] +\
beta[5]*q[t-1] +\
beta[6]*e[t-1] )
e.append(beta[7] +\
y_coeff_list_e_l1[t-1]*y[t-1] + y_coeff_list_e_l2[t-2]*y[t-2] +\
beta[12]*q[t-1] +\
beta[13]*e[t-1] )
q, e = np.array(q), np.array(e)
return q, e
def R_GARCHloop(self, beta, y, q, r0, pred_mode=False):
'''
Loop for the ^r estimation via GARCH specification.
INPUTS:
- beta: ndarray
parameters of the model. The shape is (4,).
- y: ndarray
target time series.
- q: ndarray
quantile forecast.
- r0: float or list of floats
if pred_mode=False, r0 is a float describing initial point for the
^r forecast. If pred_mode=True, r0 is a list of floats describing the
last state of the system, that is r0=[y[-1], q[-1], ^r[-1]].
- pred_mode: bool, optional
if True, the loop is in prediction mode and r0 is assumed to contain
the last state. Default is False.
OUTPUTS:
- ndarray
difference between the Expected Shortfall forecast and the quantile.
'''
# Initial point
if pred_mode: #If we are in prediction mode, we have y, q, and e at step -1 and we need to compute e at step 0
r = list()
r.append( -np.sqrt(beta[0]+beta[1]*r0[0]**2+beta[2]*r0[1]**2+beta[3]*r0[2]**2) ) #In pred_mode, r0 is assumed to be [y[-1], q[-1], r[-1]]
else: #If we are in training mode, we have an approximation of e at step 0
r = [r0]
# Loop
for t in range(1, len(y)):
r.append( -np.sqrt(beta[0]+beta[1]*y[t-1]**2+beta[2]*q[t-1]**2+beta[3]*r[t-1]**2) )
r = np.array(r)
return r
def Joint_GARCHloop(self, beta, y, q0, e0, pred_mode=False):
'''
Loop for the joint ^q and ^e estimation via GARCH specification.
INPUTS:
- beta: ndarray
parameters of the model. The shape is (8,).
- y: ndarray
target time series.
- q0: float
if pred_mode=False, it contains ^q starting point. Otherwise, it is not used.
- e0: float or list of floats
if pred_mode=False, r0 is a float describing initial point for the
^r forecast. If pred_mode=True, r0 is a list of floats describing the
last state of the system, that is r0=[y[-1], q[-1], e[-1]].
- pred_mode: bool, optional
if True, the loop is in prediction mode and r0 is assumed to contain
the last state. Default is False.
OUTPUTS:
- ndarray
quantile forecast.
- ndarray
Expected Shortfall forecast.
'''
# Initial point
if pred_mode: #If we are in prediction mode, we have y, q, and e at step -1 and we need to compute q and e at step 0
q = list()
e = list()
q.append( -np.sqrt(beta[0]+beta[1]*e0[0]**2+beta[2]*e0[1]**2+beta[3]*e0[2]**2) ) #In pred_mode, e0 is assumed to be [y[-1], q[-1], e[-1]]
e.append( -np.sqrt(beta[4]+beta[5]*e0[0]**2+beta[6]*e0[1]**2+beta[7]*e0[2]**2) )
else: #If we are in training mode, we have an approximation of q and e at step 0
q = [q0]
e = [e0]
# Loop
for t in range(1, len(y)):
q.append( -np.sqrt(beta[0]+beta[1]*y[t-1]**2+beta[2]*q[t-1]**2+beta[3]*e[t-1]**2) )
e.append( -np.sqrt(beta[4]+beta[5]*y[t-1]**2+beta[6]*q[t-1]**2+beta[7]*e[t-1]**2) )
q, e = np.array(q), np.array(e)
return q, e
def fit(self, yi, seed=None, return_train=False, q0=None, nV=102, n_init=3, n_rep=5):
'''
Fit the CAESar model.
INPUTS:
- yi: ndarray
target time series.
- seed: int or None, optional
random seed. Default is None.
- return_train: bool, optional
if True, the function returns the fitted values. Default is False.
- q0: list of float or None, optional
[initial quantile, initial expected shortfall]. If None, the initial quantile
is computed as the empirical quantile in the first 10% of the
training set; the initial expected shortfall as the tail mean in the same
subset. Default is None.
- nV: int, optional
number of random initializations of the model coefficients. Default is 102.
- n_init: int, optional
number of best initializations to work with. Default is 3.
- n_rep: int, optional
number of repetitions for the optimization. Default is 5.
OUTPUTS:
- ndarray
quantile forecast in the training set (if return_train=True).
- ndarray
expected shortfall forecast in the training set (if return_train=True).
- ndarray
optimized coefficients of the model. The shape is
(2, self.n_parameters) (if return_train=True).
'''
warnings.simplefilter(action='ignore', category=RuntimeWarning) #Ignore Runtime Warnings
#-------------------- Step 0: CAViaR for quantile initial guess
cav_res = CAViaR(
self.theta, self.mdl_spec, p=2, u=1).fit(
yi, seed=seed, return_train=True, q0=q0) #Train CAViaR
qi, beta_cav = cav_res['qi'], cav_res['beta']
del(cav_res)
#-------------------- Step 1: Initialization
if isinstance(yi, list):
yi = np.array(yi)
if isinstance(q0, type(None)):
#The starting forecast ^q_0 and ^e_0 is the empricial quantile of the first part of the trainig set (for computational reason)
n_emp = int(np.ceil(0.1 * len(yi))) #Select onyl 1/10 of the training set
if round(n_emp * self.theta) == 0: n_emp = len(yi) #In case the training dimension is too small wrt theta
y_sort = np.sort(yi[:n_emp])
quantile0 = int(round(n_emp * self.theta))-1
if quantile0 < 0: quantile0 = 0
e0 = np.mean(y_sort[:quantile0+1])
q0 = y_sort[quantile0]
else:
q0 = q0[0]
e0 = q0[1]
#-------------------- Step 2: Initial guess
np.random.seed(seed)
nInitialVectors = [nV//3, self.n_parameters] #Define the shape of the random initializations
beta0 = [np.random.uniform(0, 1, nInitialVectors)]
beta0.append(np.random.uniform(-1, 1, nInitialVectors))
beta0.append(np.random.randn(*nInitialVectors))
beta0 = np.concatenate(beta0, axis=0)
AEfval = np.empty(nV) #Initialize the loss function vector
#Iterates over the random initializations
for i in range(nV):
AEfval[i] = self.ESloss(beta0[i, :], yi, qi, e0-q0) #Compute starting loss
beta0 = beta0[AEfval.argsort()][0:n_init] #Sort initializations by loss and select only the n_init best initializations
#-------------------- Step 3: Optimization Routine - I step (barrera)
beta = np.empty((n_init, self.n_parameters)) #Initialize the parameters vector
fval_beta = np.empty(n_init) #Initialize the loss function vector
exitflag = np.empty(n_init) #Initialize the exit flag vector
# Multiprocessing: create and start worker processes
workers = list()
for i in range(n_init):
parent_pipend, child_pipend = mp.Pipe() # Create a pipe to communicate with the worker
worker = mp.Process(target=self.optim4mp,
args=(yi, qi, e0-q0, beta0[i, :],
n_rep, child_pipend))
workers.append([worker, parent_pipend])
worker.start()
# Gather results from workers
for i, worker_list in enumerate(workers):
worker, parent_pipend = worker_list
beta_worker, fval_beta_worker, exitflag_worker =\
parent_pipend.recv() # Get the result from the worker
worker.join()
beta[i, :] = beta_worker
fval_beta[i] = fval_beta_worker
exitflag[i] = exitflag_worker
ind_min = np.argmin(fval_beta) #Select the index of the best loss
self.beta = beta[ind_min, :] #Store the best parameters
self.fval_beta = fval_beta[ind_min] #Store the best loss
self.exitflag = exitflag[ind_min] #Store the best exit flag
#-------------------- Step 4: Optimization Routine - II step (patton)
# Concatenate ^q and ^r parameters, then convert ^r into ^e parameters
joint_beta = np.concatenate([beta_cav, [0], self.beta]) #Concatenate
if self.mdl_spec == 'SAV':
joint_beta[4] += joint_beta[0]
joint_beta[5] += joint_beta[1]
joint_beta[6] += joint_beta[2] - joint_beta[7]
elif self.mdl_spec == 'AS':
for i in range(5):
joint_beta[7 + i] += joint_beta[i]
for j in range(5, 6):
joint_beta[7 + j] += joint_beta[j] - joint_beta[8 +j]
elif self.mdl_spec == 'GARCH':
joint_beta[4] += joint_beta[0]
joint_beta[5] += joint_beta[1]
joint_beta[6] += joint_beta[2] + joint_beta[7]
joint_beta_temp = self.joint_optim(yi, q0, e0, joint_beta, n_rep)
if not np.isnan(joint_beta_temp).any():
joint_beta = joint_beta_temp
self.beta = joint_beta
#Compute the fit output: optimal beta vector, optimization info, and the last pair ^q, ^e
#in sample variables
qi, ei = self.joint_loop(self.beta, yi, q0, e0) #Train CAESar and recover fitted quantiles
self.last_state = [yi[-2:], qi[-2:], ei[-2:]] #Store the last state
if return_train: #If return_train is True, return the training prediction and coefficients
return {'qi':qi, 'ei':ei, 'beta':self.beta.reshape((2, self.n_parameters))}
def predict(self, yf=np.array(list())):
'''
Predict the quantile.
INPUTS:
- yf: ndarray, optional
test data. If yf is not empty, the internal state is updated
with the last observation. Default is an empty list.
OUTPUTS:
- ndarray
quantile estimate.
- ndarray
Expected Shortfall estimate.
'''
qf, ef = self.joint_loop(self.beta, yf, None, self.last_state, pred_mode=True)
if len(yf) > 0:
self.last_state = [
np.concatenate([self.last_state[0], yf])[-2:],
np.concatenate([self.last_state[1], qf])[-2:],
np.concatenate([self.last_state[2], ef])[-2:] ]
return {'qf':qf, 'ef':ef}
[docs]
class CAESar():
'''
Conditional Autoregressive Expected Shortfall (CAESar) model for joint quantile and expected shortfall estimation. The model has been introduced is:
Gatta, F., Lillo, F., & Mazzarisi, P. (2024). CAESar: Conditional Autoregressive Expected Shortfall. arXiv preprint arXiv:2407.06619.
Parameters:
----------------
- theta: float
quantile level.
- spec: str, optional
specification of the model (SAV, AS, GARCH). Default is AS.
- p: int, optional
number of lags for the y variable. Default is 1.
- u: int, optional
number of lags for ^q and ^e. Default is 1.
Example of usage
----------------
.. code-block:: python
import numpy as np
from models.caesar import CAESar #Import the model
y = np.random.randn(1500) #Replace with your data
tv = 1250 #Training set length
theta = 0.05 #Set the desired confidence level
mdl = CAESar(theta, 'AS') #Initialize the model
res = mdl.fit_predict(y, tv, seed=2, return_train=True) # Fit and predict
q_fitted = res['qi'] #Quantile fitted values
es_fitted = res['ei'] #Expected shortfall fitted values
coef = res['beta'] #Fitted coefficients
q_pred = res['qf'] #Quantile forecast
e_pred = res['ef'] #Expected shortfall forecast
Methods:
----------------
'''
def __init__(self, theta, spec='AS', lambdas=dict(), p=1, u=1):
p, u = int(p), int(u) #Convert to integer
# Ensure that p and r are greater than 0
assert p > 0, 'p must be greater than 0'
assert u > 0, 'u must be greater than 0'
# Check for optimized versions, otherwise use the general one and allert the user
if p == 1:
if u == 1:
self.wrapped_self = CAESar_1_1(theta, spec=spec, lambdas=lambdas)
elif u == 2:
self.wrapped_self = CAESar_1_2(theta, spec=spec, lambdas=lambdas)
else:
warnings.warn('The selected model is not optimized, resulting in high computational times. Consider using the optimized versions for p and u in {1,2}.')
self.wrapped_self = CAESar_general(theta, spec=spec, lambdas=lambdas, p=p, u=u)
elif p == 2:
if u == 1:
self.wrapped_self = CAESar_2_1(theta, spec=spec, lambdas=lambdas)
elif u == 2:
self.wrapped_self = CAESar_2_2(theta, spec=spec, lambdas=lambdas)
else:
warnings.warn('The selected model is not optimized, resulting in high computational times. Consider using the optimized versions for p and u in {1,2}.')
self.wrapped_self = CAESar_general(theta, spec=spec, lambdas=lambdas, p=p, u=u)
else:
warnings.warn('The selected model is not optimized, resulting in high computational times. Consider using the optimized versions for p and u in {1,2}.')
self.wrapped_self = CAESar_general(theta, spec=spec, lambdas=lambdas, p=p, u=u)
self.n_parameters = self.wrapped_self.n_parameters
[docs]
def fit(self, yi, seed=None, return_train=False, q0=None, nV=102, n_init=3, n_rep=5):
'''
Fit the CAESar model.
INPUTS:
- yi: ndarray
target time series.
- seed: int or None, optional
random seed. Default is None.
- return_train: bool, optional
if True, the function returns the fitted values. Default is False.
- q0: list of float or None, optional
[initial quantile, initial expected shortfall]. If None, the initial quantile
is computed as the empirical quantile in the first 10% of the
training set; the initial expected shortfall as the tail mean in the same
subset. Default is None.
- nV: int, optional
number of random initializations of the model coefficients. Default is 102.
- n_init: int, optional
number of best initializations to work with. Default is 3.
- n_rep: int, optional
number of repetitions for the optimization. Default is 5.
OUTPUTS:
- qi: ndarray
quantile forecast in the training set (if return_train=True).
- ei: ndarray
expected shortfall forecast in the training set (if return_train=True).
- beta: ndarray
optimized coefficients of the model. The shape is
(2, self.n_parameters) (if return_train=True).
'''
temp_out = self.wrapped_self.fit(yi, seed, return_train, q0, nV, n_init, n_rep)
self.last_state = self.wrapped_self.last_state
self.beta = self.wrapped_self.beta
if return_train: #If return_train is True, return the training prediction and coefficients
return temp_out
[docs]
def predict(self, yf=np.array(list())):
'''
Predict the quantile.
INPUTS:
- yf: ndarray, optional
test data. If yf is not empty, the internal state is updated
with the last observation. Default is an empty list.
OUTPUTS:
- qf: ndarray
quantile forecast in the test set.
- ef: ndarray
expected shortfall forecast in the test set.
'''
temp_out = self.wrapped_self.predict(yf)
self.last_state = self.wrapped_self.last_state
return temp_out
[docs]
def fit_predict(self, y, ti, seed=None, return_train=True, q0=None, nV=102, n_init=3, n_rep=5):
'''
Fit and predict the CAESar model.
INPUTS:
- y: ndarray
target time series.
- ti: int
train set length.
- seed: int or None, optional
random seed. Default is None.
- return_train: bool, optional
return the train set. Default is True.
- q0: list of float or None, optional
[initial quantile, initial expected shortfall]. If None, the initial quantile
is computed as the empirical quantile in the first 10% of the
training set; the initial expected shortfall as the tail mean in the same
subset. Default is None.
- nV: int, optional
number of random initializations of the model coefficients. Default is 102.
- n_init: int, optional
number of best initializations to work with. Default is 3.
- n_rep: int, optional
number of repetitions for the optimization. Default is 5.
OUTPUTS:
- qi: ndarray
quantile forecast in the training set (if return_train=True).
- ei: ndarray
expected shortfall forecast in the training set (if return_train=True).
- qf: ndarray
quantile forecast in the test set.
- ef: ndarray
expected shortfall forecast in the test set.
- beta: ndarray
optimized coefficients of the model. The shape is (2, self.n_parameters).
'''
temp_out = self.wrapped_self.fit_predict(
y, ti, seed, return_train, q0, nV, n_init, n_rep)
self.last_state = self.wrapped_self.last_state
self.beta = self.wrapped_self.beta
return temp_out