import numpy as np
import multiprocessing as mp
from models.caviar import CAViaR
[docs]
class B_CAESar():
'''
CAESar model for Expected Shortfall estimation - optimization only with Barrera loss. See [1], Supplementary Material B, for further details.
[1] Gatta, F., Lillo, F., & Mazzarisi, P. (2024). CAESar: Conditional Autoregressive Expected Shortfall. arXiv preprint arXiv:2407.06619.
Parameters:
----------------
- theta: float
desired confidence level.
- spec: str, optional
specification of the model (SAV, AS, GARCH). Default is AS.
- lambdas: dict, optional
lambdas for the soft constraints. Default is {'r':10}.
Example of usage
----------------
.. code-block:: python
import numpy as np
from models.special import B_CAESar #Import the model
y = np.random.randn(1500) #Replace with your data
tv = 1250 #Training set length
theta = 0.05 #Set the desired confidence level
mdl = B_CAESar(theta, 'AS') # Initialize the model
res = mdl.fit_predict(y, tv, seed=2) # Fit and predict
q_pred = res['qf'] #Quantile forecast
es_pred = res['ef'] #Expected shortfall forecast
Methods:
----------------
'''
def __init__(self, theta, spec='AS', lambdas=dict()):
self.theta = theta #Initialize theta
self.lambdas = {'r':10}
self.lambdas.update(lambdas) #Initialize lambdas for soft constraints
# According to the desired model, initialize the number of parameters and the loop
if spec == 'SAV':
self.mdl_spec = 'SAV'
self.n_parameters = 4
self.loop = self.R_SAVloop
self.joint_loop = self.Joint_SAVloop
elif spec == 'AS':
self.mdl_spec = 'AS'
self.n_parameters = 5
self.loop = self.R_ASloop
self.joint_loop = self.Joint_ASloop
elif spec == 'GARCH':
self.mdl_spec = 'GARCH'
self.n_parameters = 4
self.loop = self.R_GARCHloop
self.joint_loop = self.Joint_GARCHloop
else:
raise ValueError(f'Specification {spec} not recognized!\nChoose between "SAV", "AS", and "GARCH"')
def loss_function(self, v, r, y):
'''
Compute the loss function (Barrera) of the model.
INPUTS:
- v: ndarray
Value at Risk.
- r: ndarray
difference between Expected Shortfall forecast and Value at Risk.
- y: ndarray
target time series.
OUTPUTS:
- float
loss value.
:meta private:
'''
loss_val = np.mean(
(r - np.where(y<v, (y-v)/self.theta, 0))**2
) + self.lambdas['r'] * np.mean( np.where(r>0, r, 0) )
return loss_val
def R_SAVloop(self, beta, y, q, r0, pred_mode=False):
'''
Loop for the ^r estimation via SAV specification.
INPUTS:
- beta: ndarray
parameters of the model. The shape is (4,).
- y: ndarray
target time series.
- q: ndarray
quantile forecast.
- r0: float or list of floats
if pred_mode=False, r0 is a float describing initial point for the
^r forecast. If pred_mode=True, r0 is a list of floats describing the
last state of the system, that is r0=[y[-1], q[-1], ^r[-1]].
- pred_mode: bool, optional
if True, the loop is in prediction mode and r0 is assumed to contain
the last state. Default is False.
OUTPUTS:
- ndarray
difference between the Expected Shortfall forecast and the quantile.
:meta private:
'''
# Initial point
if pred_mode: #If we are in prediction mode, we have y, q, and r at step -1 and we need to compute r at step 0
r = list()
r.append( beta[0] + beta[1] * np.abs(r0[0]) + beta[2] * r0[1] + beta[3] * r0[2] ) #In pred_mode, r0 is assumed to be [y[-1], q[-1], r[-1]]
else: #If we are in training mode, we have an approximation of r at step 0
r = [r0]
# Loop
for t in range(1, len(y)):
r.append( beta[0] + beta[1] * np.abs(y[t-1]) + beta[2] * q[t-1] + beta[3] * r[t-1] )
r = np.array(r)
return r
def Joint_SAVloop(self, beta, y, q0, e0, pred_mode=False):
'''
Loop for the joint ^q and ^e estimation via SAV specification.
INPUTS:
- beta: ndarray
parameters of the model. The shape is (8,).
- y: ndarray
target time series.
- q0: float
if pred_mode=False, it contains ^q starting point. Otherwise, it is not used.
- e0: float or list of floats
if pred_mode=False, r0 is a float describing initial point for the
^r forecast. If pred_mode=True, r0 is a list of floats describing the
last state of the system, that is r0=[y[-1], q[-1], e[-1]].
- pred_mode: bool, optional
if True, the loop is in prediction mode and r0 is assumed to contain
the last state. Default is False.
OUTPUTS:
- ndarray
quantile forecast.
- ndarray
Expected Shortfall forecast.
:meta private:
'''
# Initial point
if pred_mode: #If we are in prediction mode, we have y, q, and e at step -1 and we need to compute q and e at step 0
q = list()
e = list()
q.append( beta[0] + beta[1] * np.abs(e0[0]) + beta[2] * e0[1] + beta[3] * e0[2] ) #In pred_mode, e0 is assumed to be [y[-1], q[-1], e[-1]]
e.append( beta[4] + beta[5] * np.abs(e0[0]) + beta[6] * e0[1] + beta[7] * e0[2] )
else: #If we are in training mode, we have an approximation of q and e at step 0
q = [q0]
e = [e0]
# Loop
for t in range(1, len(y)):
q.append( beta[0] + beta[1] * np.abs(y[t-1]) + beta[2] * q[t-1] + beta[3] * e[t-1] )
e.append( beta[4] + beta[5] * np.abs(y[t-1]) + beta[6] * q[t-1] + beta[7] * e[t-1] )
q, e = np.array(q), np.array(e)
return q, e
def R_ASloop(self, beta, y, q, r0, pred_mode=False):
'''
Loop for the ^r estimation via AS specification.
INPUTS:
- beta: ndarray
parameters of the model. The shape is (5,).
- y: ndarray
target time series.
- q: ndarray
quantile forecast.
- r0: float or list of floats
if pred_mode=False, r0 is a float describing initial point for the
^r forecast. If pred_mode=True, r0 is a list of floats describing the
last state of the system, that is r0=[y[-1], q[-1], ^r[-1]].
- pred_mode: bool, optional
if True, the loop is in prediction mode and r0 is assumed to contain
the last state. Default is False.
OUTPUTS:
- ndarray
difference between the Expected Shortfall forecast and the quantile.
:meta private:
'''
# Initial point
if pred_mode: #If we are in prediction mode, we have y, q, and e at step -1 and we need to compute e at step 0
r = list()
r.append( beta[0] + np.where(r0[0]>0, beta[1], beta[2]) * r0[0] + beta[3] * r0[1] + beta[4] * r0[2] ) #In pred_mode, r0 is assumed to be [y[-1], q[-1], r[-1]]
else: #If we are in training mode, we have an approximation of e at step 0
r = [r0]
# Loop
y_plus_coeff = np.where(y>0, beta[1], beta[2]) #Only one between positive and negative y part gives a contribution
for t in range(1, len(y)):
r.append( beta[0] + y_plus_coeff[t-1] * y[t-1] + beta[3] * q[t-1] + beta[4] * r[t-1] )
r = np.array(r)
return r
def Joint_ASloop(self, beta, y, q0, e0, pred_mode=False):
'''
Loop for the joint ^q and ^e estimation via AS specification.
INPUTS:
- beta: ndarray
parameters of the model. The shape is (10,).
- y: ndarray
target time series.
- q0: float
if pred_mode=False, it contains ^q starting point. Otherwise, it is not used.
- e0: float or list of floats
if pred_mode=False, r0 is a float describing initial point for the
^r forecast. If pred_mode=True, r0 is a list of floats describing the
last state of the system, that is r0=[y[-1], q[-1], e[-1]].
- pred_mode: bool, optional
if True, the loop is in prediction mode and r0 is assumed to contain
the last state. Default is False.
OUTPUTS:
- ndarray
quantile forecast.
- ndarray
Expected Shortfall forecast.
:meta private:
'''
# Initial point
if pred_mode: #If we are in prediction mode, we have y, q, and e at step -1 and we need to compute q and e at step 0
q = list()
e = list()
q.append( beta[0] + np.where(e0[0]>0, beta[1], beta[2]) * e0[0] + beta[3] * e0[1] + beta[4] * e0[2] ) #In pred_mode, e0 is assumed to be [y[-1], q[-1], e[-1]]
e.append( beta[5] + np.where(e0[0]>0, beta[6], beta[7]) * e0[0] + beta[8] * e0[1] + beta[9] * e0[2] )
else: #If we are in training mode, we have an approximation of q and e at step 0
q = [q0]
e = [e0]
# Loop
y_plus_coeff_q = np.where(y>0, beta[1], beta[2]) #Only one between positive and negative y part gives a contribution
y_plus_coeff_e = np.where(y>0, beta[6], beta[7])
for t in range(1, len(y)):
q.append( beta[0] + y_plus_coeff_q[t-1] * y[t-1] + beta[3] * q[t-1] + beta[4] * e[t-1] )
e.append( beta[5] + y_plus_coeff_e[t-1] * y[t-1] + beta[8] * q[t-1] + beta[9] * e[t-1] )
q, e = np.array(q), np.array(e)
return q, e
def R_GARCHloop(self, beta, y, q, r0, pred_mode=False):
'''
Loop for the ^r estimation via GARCH specification.
INPUTS:
- beta: ndarray
parameters of the model. The shape is (4,).
- y: ndarray
target time series.
- q: ndarray
quantile forecast.
- r0: float or list of floats
if pred_mode=False, r0 is a float describing initial point for the
^r forecast. If pred_mode=True, r0 is a list of floats describing the
last state of the system, that is r0=[y[-1], q[-1], ^r[-1]].
- pred_mode: bool, optional
if True, the loop is in prediction mode and r0 is assumed to contain
the last state. Default is False.
OUTPUTS:
- ndarray
difference between the Expected Shortfall forecast and the quantile.
:meta private:
'''
# Initial point
if pred_mode: #If we are in prediction mode, we have y, q, and e at step -1 and we need to compute e at step 0
r = list()
r.append( -np.sqrt(beta[0]+beta[1]*r0[0]**2+beta[2]*r0[1]**2+beta[3]*r0[2]**2) ) #In pred_mode, r0 is assumed to be [y[-1], q[-1], r[-1]]
else: #If we are in training mode, we have an approximation of e at step 0
r = [r0]
# Loop
for t in range(1, len(y)):
r.append( -np.sqrt(beta[0]+beta[1]*y[t-1]**2+beta[2]*q[t-1]**2+beta[3]*r[t-1]**2) )
r = np.array(r)
return r
def Joint_GARCHloop(self, beta, y, q0, e0, pred_mode=False):
'''
Loop for the joint ^q and ^e estimation via GARCH specification.
INPUTS:
- beta: ndarray
parameters of the model. The shape is (8,).
- y: ndarray
target time series.
- q0: float
if pred_mode=False, it contains ^q starting point. Otherwise, it is not used.
- e0: float or list of floats
if pred_mode=False, r0 is a float describing initial point for the
^r forecast. If pred_mode=True, r0 is a list of floats describing the
last state of the system, that is r0=[y[-1], q[-1], e[-1]].
- pred_mode: bool, optional
if True, the loop is in prediction mode and r0 is assumed to contain
the last state. Default is False.
OUTPUTS:
- ndarray
quantile forecast.
- ndarray
Expected Shortfall forecast.
:meta private:
'''
# Initial point
if pred_mode: #If we are in prediction mode, we have y, q, and e at step -1 and we need to compute q and e at step 0
q = list()
e = list()
q.append( -np.sqrt(beta[0]+beta[1]*e0[0]**2+beta[2]*e0[1]**2+beta[3]*e0[2]**2) ) #In pred_mode, e0 is assumed to be [y[-1], q[-1], e[-1]]
e.append( -np.sqrt(beta[4]+beta[5]*e0[0]**2+beta[6]*e0[1]**2+beta[7]*e0[2]**2) )
else: #If we are in training mode, we have an approximation of q and e at step 0
q = [q0]
e = [e0]
# Loop
for t in range(1, len(y)):
q.append( -np.sqrt(beta[0]+beta[1]*y[t-1]**2+beta[2]*q[t-1]**2+beta[3]*e[t-1]**2) )
e.append( -np.sqrt(beta[4]+beta[5]*y[t-1]**2+beta[6]*q[t-1]**2+beta[7]*e[t-1]**2) )
q, e = np.array(q), np.array(e)
return q, e
def ESloss(self, beta, y, q, r0):
'''
Compute the Barrera loss of the model.
INPUTS:
- beta: ndarray
parameters of the model. The shape is (self.n_parameters,).
- y: ndarray
target time series.
- q: ndarray
quantile forecast.
- r0: float
initial point for the ^r forecast.
OUTPUTS:
- float
loss value.
:meta private:
'''
r = self.loop(beta, y, q, r0)
loss_val = self.loss_function(q, r, y) #Compute loss
return loss_val
def optim4mp(self, yi, qi, r0, beta0, n_rep, pipend):
'''
Optimization routine for multiprocessing.
INPUTS:
- yi: ndarray
target time series.
- qi: ndarray
quantile forecast.
- r0: float
initial point for the ^r forecast.
- beta0: ndarray
initial point for the optimization. The shape is (self.n_parameters,).
- n_rep: int
number of repetitions for the optimization.
- pipend: multiprocessing.connection.Connection
pipe end for communicating multiprocessing.
OUTPUTS:
- None
:meta private:
'''
from scipy.optimize import minimize
# First iteration
res = minimize(
lambda x: self.ESloss(x, yi, qi, r0), beta0,
method='SLSQP', options={'disp':False})
beta_worker, fval_beta_worker, exitflag_worker = res.x, res.fun, int(res.success)
# Iterate until the optimization is successful or the maximum number of repetitions is reached
for _ in range(n_rep):
res = minimize(
lambda x: self.ESloss(x, yi, qi, r0), beta_worker,
method='SLSQP', options={'disp':False})
beta_worker, fval_beta_worker, exitflag_worker = res.x, res.fun, int(res.success)
#If optimization is successful, exit the loop (no need to iterate further repetitions)
if exitflag_worker == 1:
break
# Communicate the results to the main process
pipend.send((beta_worker, fval_beta_worker, exitflag_worker))
[docs]
def fit(self, yi, seed=None, return_train=False, q0=None, nV=102, n_init=3, n_rep=5):
'''
Fit the CAESar model.
INPUTS:
- yi: ndarray
target time series.
- seed: int or None, optional
random seed. Default is None.
- return_train: bool, optional
if True, the function returns the fitted values. Default is False.
- q0: list of float or None, optional
[initial quantile, initial expected shortfall]. If None, the initial quantile
is computed as the empirical quantile in the first 10% of the
training set; the initial expected shortfall as the tail mean in the same
subset. Default is None.
- nV: int, optional
number of random initializations of the model coefficients. Default is 102.
- n_init: int, optional
number of best initializations to work with. Default is 3.
- n_rep: int, optional
number of repetitions for the optimization. Default is 5.
OUTPUTS:
- qi: ndarray
Quantile forecast in the training set (if return_train=True).
- ei: ndarray
Expected Shortfall forecast in the training set (if return_train=True).
- beta: ndarray
optimized coefficients of the model. The shape is
(2, n_parameters) (if return_train=True).
'''
#-------------------- Step 0: CAViaR for quantile initial guess
cav_res = CAViaR(
self.theta, self.mdl_spec).fit(
yi, seed=seed, return_train=True, q0=q0) #Train CAViaR
qi, beta_cav = cav_res['qi'], cav_res['beta']
del(cav_res)
#-------------------- Step 1: Initialization
if isinstance(yi, list):
yi = np.array(yi)
if isinstance(q0, type(None)):
#The starting forecast ^q_0 and ^e_0 is the empricial quantile of the first part of the trainig set (for computational reason)
n_emp = int(np.ceil(0.1 * len(yi))) #Select onyl 1/10 of the training set
if round(n_emp * self.theta) == 0: n_emp = len(yi) #In case the training dimension is too small wrt theta
y_sort = np.sort(yi[:n_emp])
quantile0 = int(round(n_emp * self.theta))-1
if quantile0 < 0: quantile0 = 0
e0 = np.mean(y_sort[:quantile0+1])
q0 = y_sort[quantile0]
else:
q0 = q0[0]
e0 = q0[1]
#-------------------- Step 2: Initial guess
np.random.seed(seed)
nInitialVectors = [nV//3, self.n_parameters] #Define the shape of the random initializations
beta0 = [np.random.uniform(0, 1, nInitialVectors)]
beta0.append(np.random.uniform(-1, 1, nInitialVectors))
beta0.append(np.random.randn(*nInitialVectors))
beta0 = np.concatenate(beta0, axis=0)
AEfval = np.empty(nV) #Initialize the loss function vector
#Iterates over the random initializations
for i in range(nV):
AEfval[i] = self.ESloss(beta0[i, :], yi, qi, e0-q0) #Compute starting loss
beta0 = beta0[AEfval.argsort()][0:n_init] #Sort initializations by loss and select only the n_init best initializations
#-------------------- Step 3: Optimization Routine - Only Barrera
beta = np.empty((n_init, self.n_parameters)) #Initialize the parameters vector
fval_beta = np.empty(n_init) #Initialize the loss function vector
exitflag = np.empty(n_init) #Initialize the exit flag vector
# Multiprocessing: create and start worker processes
workers = list()
for i in range(n_init):
parent_pipend, child_pipend = mp.Pipe() # Create a pipe to communicate with the worker
worker = mp.Process(target=self.optim4mp,
args=(yi, qi, e0-q0, beta0[i, :],
n_rep, child_pipend))
workers.append([worker, parent_pipend])
worker.start()
# Gather results from workers
for i, worker_list in enumerate(workers):
worker, parent_pipend = worker_list
beta_worker, fval_beta_worker, exitflag_worker =\
parent_pipend.recv() # Get the result from the worker
worker.join()
beta[i, :] = beta_worker
fval_beta[i] = fval_beta_worker
exitflag[i] = exitflag_worker
ind_min = np.argmin(fval_beta) #Select the index of the best loss
self.beta = beta[ind_min, :] #Store the best parameters
self.fval_beta = fval_beta[ind_min] #Store the best loss
self.exitflag = exitflag[ind_min] #Store the best exit flag
# Concatenate ^q and ^r parameters, then convert ^r into ^e parameters
joint_beta = np.concatenate([beta_cav, [0], self.beta]) #Concatenate
if self.mdl_spec == 'SAV':
joint_beta[4] += joint_beta[0]
joint_beta[5] += joint_beta[1]
joint_beta[6] += joint_beta[2] - joint_beta[7]
elif self.mdl_spec == 'AS':
joint_beta[5] += joint_beta[0]
joint_beta[6] += joint_beta[1]
joint_beta[7] += joint_beta[2]
joint_beta[8] += joint_beta[3] - joint_beta[9]
elif self.mdl_spec == 'GARCH':
joint_beta[4] += joint_beta[0]
joint_beta[5] += joint_beta[1]
joint_beta[6] += joint_beta[2] + joint_beta[7]
self.beta = joint_beta
#Compute the fit output: optimal beta vector, optimization info, and the last pair ^q, ^e
#in sample variables
qi, ei = self.joint_loop(self.beta, yi, q0, e0)
self.last_state = [yi[-1], qi[-1], ei[-1]]
if return_train:
return {'qi':qi, 'ei':ei, 'beta':self.beta.reshape((2, self.n_parameters))}
[docs]
def predict(self, yf=np.array(list())):
'''
Predict the quantile.
INPUTS:
- yf: ndarray, optional
test data. If yf is not empty, the internal state is updated
with the last observation. Default is an empty list.
OUTPUTS:
- qf: ndarray
Quantile forecast in the test set.
- ef: ndarray
Expected Shortfall forecast in the test set.
'''
qf, ef = self.joint_loop(self.beta, yf, None, self.last_state, pred_mode=True)
if len(yf) > 0:
self.last_state = [yf[-1], qf[-1], ef[-1]]
return {'qf':qf, 'ef':ef}
[docs]
def fit_predict(self, y, ti, seed=None, return_train=True, q0=None, nV=102, n_init=3, n_rep=5):
'''
Fit and predict the CAESar model.
INPUTS:
- y: ndarray
target time series.
- ti: int
train set length.
- seed: int or None, optional
random seed. Default is None.
- return_train: bool, optional
return the train set. Default is True.
- q0: list of float or None, optional
[initial quantile, initial expected shortfall]. If None, the initial quantile
is computed as the empirical quantile in the first 10% of the
training set; the initial expected shortfall as the tail mean in the same
subset. Default is None.
- nV: int, optional
number of random initializations of the model coefficients. Default is 102.
- n_init: int, optional
number of best initializations to work with. Default is 3.
- n_rep: int, optional
number of repetitions for the optimization. Default is 5.
OUTPUTS:
- qi: ndarray
Quantile forecast in the training set (if return_train=True).
- ei: ndarray
Expected Shortfall forecast in the training set (if return_train=True).
- qf: ndarray
Quantile forecast in the test set.
- ef: ndarray
Expected Shortfall forecast in the test set.
- beta: ndarray
optimized coefficients of the model. The shape is (2, n_parameters).
'''
yi, yf = y[:ti], y[ti:] #Split train and test
if return_train:
res_train = self.fit(yi, seed=seed, return_train=True, q0=q0) #Train AE
res_test = self.predict(yf)
return {'qi':res_train['qi'], 'ei':res_train['ei'],
'qf':res_test['qf'], 'ef':res_test['ef'],
'beta':self.beta.reshape((2, self.n_parameters))} #Return prediction
else:
self.fit(yi, seed=seed, return_train=False, q0=q0) #Train AE
res_test = self.predict(yf)
return {'qf':res_test['qf'], 'ef':res_test['ef'],
'beta':self.beta.reshape((2, self.n_parameters))} #Return prediction
[docs]
class P_CAESar():
'''
CAESar model for Expected Shortfall estimation - optimization only with Patton loss. See [1], Supplementary Material B, for further details.
[1] Gatta, F., Lillo, F., & Mazzarisi, P. (2024). CAESar: Conditional Autoregressive Expected Shortfall. arXiv preprint arXiv:2407.06619.
Parameters:
----------------
- theta: float
desired confidence level.
- spec: str, optional
specification of the model (SAV, AS, GARCH). Default is AS.
- lambdas: dict, optional
lambdas for the soft constraints. Default is {'q':10, 'e':10}.
Example of usage
----------------
.. code-block:: python
import numpy as np
from models.special import P_CAESar #Import the model
y = np.random.randn(1500) #Replace with your data
tv = 1250 #Training set length
theta = 0.05 #Set the desired confidence level
mdl = P_CAESar(theta, 'AS') # Initialize the model
res = mdl.fit_predict(y, tv, seed=2) # Fit and predict
q_pred = res['qf'] #Quantile forecast
es_pred = res['ef'] #Expected shortfall forecast
Methods:
----------------
'''
def __init__(self, theta, spec='AS', lambdas=dict()):
self.theta = theta #Initialize theta
self.lambdas = {'q':10, 'e':10}
self.lambdas.update(lambdas) #Initialize lambdas for soft constraints
# According to the desired model, initialize the number of parameters and the loop
if spec == 'SAV':
self.mdl_spec = 'SAV'
self.n_parameters = 4
self.joint_loop = self.Joint_SAVloop
elif spec == 'AS':
self.mdl_spec = 'AS'
self.n_parameters = 5
self.joint_loop = self.Joint_ASloop
elif spec == 'GARCH':
self.mdl_spec = 'GARCH'
self.n_parameters = 4
self.joint_loop = self.Joint_GARCHloop
else:
raise ValueError(f'Specification {spec} not recognized!\nChoose between "SAV", "AS", and "GARCH"')
def joint_loss(self, v, e, y):
'''
Compute the loss function (Patton) of the model.
INPUTS:
- v: ndarray
Value at Risk forecast.
- e: ndarray
Expected Shortfall forecast.
- y: ndarray
target time series.
OUTPUTS:
- float
Patton loss value.
:meta private:
'''
loss_val = np.mean(
np.where(y<=v, (y-v)/(self.theta*e), 0) + v/e + np.log(-e)
) + self.lambdas['e'] * np.mean( np.where(e>v, e-v, 0) ) +\
self.lambdas['q'] * np.mean( np.where(v>0, v, 0) )
return loss_val
def Joint_SAVloop(self, beta, y, q0, e0, pred_mode=False):
'''
Loop for the joint ^q and ^e estimation via SAV specification.
INPUTS:
- beta: ndarray
parameters of the model. The shape is (8,).
- y: ndarray
target time series.
- q0: float
if pred_mode=False, it contains ^q starting point. Otherwise, it is not used.
- e0: float or list of floats
if pred_mode=False, r0 is a float describing initial point for the
^r forecast. If pred_mode=True, r0 is a list of floats describing the
last state of the system, that is r0=[y[-1], q[-1], e[-1]].
- pred_mode: bool, optional
if True, the loop is in prediction mode and r0 is assumed to contain
the last state. Default is False.
OUTPUTS:
- ndarray
quantile forecast.
- ndarray
Expected Shortfall forecast.
:meta private:
'''
# Initial point
if pred_mode: #If we are in prediction mode, we have y, q, and e at step -1 and we need to compute q and e at step 0
q = list()
e = list()
q.append( beta[0] + beta[1] * np.abs(e0[0]) + beta[2] * e0[1] + beta[3] * e0[2] ) #In pred_mode, e0 is assumed to be [y[-1], q[-1], e[-1]]
e.append( beta[4] + beta[5] * np.abs(e0[0]) + beta[6] * e0[1] + beta[7] * e0[2] )
else: #If we are in training mode, we have an approximation of q and e at step 0
q = [q0]
e = [e0]
# Loop
for t in range(1, len(y)):
q.append( beta[0] + beta[1] * np.abs(y[t-1]) + beta[2] * q[t-1] + beta[3] * e[t-1] )
e.append( beta[4] + beta[5] * np.abs(y[t-1]) + beta[6] * q[t-1] + beta[7] * e[t-1] )
q, e = np.array(q), np.array(e)
return q, e
def Joint_ASloop(self, beta, y, q0, e0, pred_mode=False):
'''
Loop for the joint ^q and ^e estimation via AS specification.
INPUTS:
- beta: ndarray
parameters of the model. The shape is (10,).
- y: ndarray
target time series.
- q0: float
if pred_mode=False, it contains ^q starting point. Otherwise, it is not used.
- e0: float or list of floats
if pred_mode=False, r0 is a float describing initial point for the
^r forecast. If pred_mode=True, r0 is a list of floats describing the
last state of the system, that is r0=[y[-1], q[-1], e[-1]].
- pred_mode: bool, optional
if True, the loop is in prediction mode and r0 is assumed to contain
the last state. Default is False.
OUTPUTS:
- ndarray
quantile forecast.
- ndarray
Expected Shortfall forecast.
:meta private:
'''
# Initial point
if pred_mode: #If we are in prediction mode, we have y, q, and e at step -1 and we need to compute q and e at step 0
q = list()
e = list()
q.append( beta[0] + np.where(e0[0]>0, beta[1], beta[2]) * e0[0] + beta[3] * e0[1] + beta[4] * e0[2] ) #In pred_mode, e0 is assumed to be [y[-1], q[-1], e[-1]]
e.append( beta[5] + np.where(e0[0]>0, beta[6], beta[7]) * e0[0] + beta[8] * e0[1] + beta[9] * e0[2] )
else: #If we are in training mode, we have an approximation of q and e at step 0
q = [q0]
e = [e0]
# Loop
y_plus_coeff_q = np.where(y>0, beta[1], beta[2]) #Only one between positive and negative y part gives a contribution
y_plus_coeff_e = np.where(y>0, beta[6], beta[7])
for t in range(1, len(y)):
q.append( beta[0] + y_plus_coeff_q[t-1] * y[t-1] + beta[3] * q[t-1] + beta[4] * e[t-1] )
e.append( beta[5] + y_plus_coeff_e[t-1] * y[t-1] + beta[8] * q[t-1] + beta[9] * e[t-1] )
q, e = np.array(q), np.array(e)
return q, e
def Joint_GARCHloop(self, beta, y, q0, e0, pred_mode=False):
'''
Loop for the joint ^q and ^e estimation via GARCH specification.
INPUTS:
- beta: ndarray
parameters of the model. The shape is (8,).
- y: ndarray
target time series.
- q0: float
if pred_mode=False, it contains ^q starting point. Otherwise, it is not used.
- e0: float or list of floats
if pred_mode=False, r0 is a float describing initial point for the
^r forecast. If pred_mode=True, r0 is a list of floats describing the
last state of the system, that is r0=[y[-1], q[-1], e[-1]].
- pred_mode: bool, optional
if True, the loop is in prediction mode and r0 is assumed to contain
the last state. Default is False.
OUTPUTS:
- ndarray
quantile forecast.
- ndarray
Expected Shortfall forecast.
:meta private:
'''
# Initial point
if pred_mode: #If we are in prediction mode, we have y, q, and e at step -1 and we need to compute q and e at step 0
q = list()
e = list()
q.append( -np.sqrt(beta[0]+beta[1]*e0[0]**2+beta[2]*e0[1]**2+beta[3]*e0[2]**2) ) #In pred_mode, e0 is assumed to be [y[-1], q[-1], e[-1]]
e.append( -np.sqrt(beta[4]+beta[5]*e0[0]**2+beta[6]*e0[1]**2+beta[7]*e0[2]**2) )
else: #If we are in training mode, we have an approximation of q and e at step 0
q = [q0]
e = [e0]
# Loop
for t in range(1, len(y)):
q.append( -np.sqrt(beta[0]+beta[1]*y[t-1]**2+beta[2]*q[t-1]**2+beta[3]*e[t-1]**2) )
e.append( -np.sqrt(beta[4]+beta[5]*y[t-1]**2+beta[6]*q[t-1]**2+beta[7]*e[t-1]**2) )
q, e = np.array(q), np.array(e)
return q, e
def Jointloss(self, beta, y, q0, e0):
'''
Compute the Patton loss of the model.
INPUTS:
- beta: ndarray
parameters of the model. The shape is (2*self.n_parameters,).
- y: ndarray
target time series.
- q0: float
initial point for the ^q forecast.
- e0: float
initial point for the ^e forecast.
OUTPUTS:
- float
loss value.
:meta private:
'''
q, e = self.joint_loop(beta, y, q0, e0)
loss_val = self.joint_loss(q, e, y) #Compute loss
return loss_val
def joint_fit4mp(self, yi, q0, e0, beta0, n_rep, pipend):
'''
Joint optimization routine.
INPUTS:
- yi: ndarray
target time series.
- q0: float
initial point for the ^q forecast.
- e0: float
initial point for the ^e forecast.
- beta0: ndarray
initial point for the optimization. The shape is (2*self.n_parameters,).
- n_rep: int
number of repetitions for the optimization.
- pipend: multiprocessing.connection.Connection
pipe end for communicating multiprocessing.
OUTPUTS:
- None.
:meta private:
'''
from scipy.optimize import minimize
# First iteration
res = minimize(
lambda x: self.Jointloss(x, yi, q0, e0), beta0,
method='SLSQP', options={'disp':False})
beta_worker, fval_beta_worker, exitflag_worker = res.x, res.fun, int(res.success)
# Iterate until the optimization is successful or the maximum number of repetitions is reached
for _ in range(n_rep):
res = minimize(
lambda x: self.Jointloss(x, yi, q0, e0), beta_worker,
method='SLSQP', options={'disp':False})
beta_worker, fval_beta_worker, exitflag_worker = res.x, res.fun, int(res.success)
#If optimization is successful, exit the loop (no need to iterate further repetitions)
if exitflag_worker == 1:
break
# Communicate the results to the main process
pipend.send((beta_worker, fval_beta_worker, exitflag_worker))
[docs]
def fit(self, yi, seed=None, return_train=False, q0=None, nV=102, n_init=3, n_rep=5):
'''
Fit the CAESar model.
INPUTS:
- yi: ndarray
target time series.
- seed: int or None, optional
random seed. Default is None.
- return_train: bool, optional
if True, the function returns the fitted values. Default is False.
- q0: list of float or None, optional
[initial quantile, initial expected shortfall]. If None, the initial quantile
is computed as the empirical quantile in the first 10% of the
training set; the initial expected shortfall as the tail mean in the same
subset. Default is None.
- nV: int, optional
number of random initializations of the model coefficients. Default is 102.
- n_init: int, optional
number of best initializations to work with. Default is 3.
- n_rep: int, optional
number of repetitions for the optimization. Default is 5.
OUTPUTS:
- qi: ndarray
Quantile forecast in the training set (if return_train=True).
- ei: ndarray
Expected Shortfall forecast in the training set (if return_train=True).
- beta: ndarray
optimized coefficients of the model. The shape is
(2, n_parameters) (if return_train=True).
'''
#-------------------- Step 0: CAViaR for quantile initial guess
cav_res = CAViaR(
self.theta, self.mdl_spec).fit(
yi, seed=seed, return_train=True, q0=q0) #Train CAViaR
qi, beta_cav = cav_res['qi'], cav_res['beta']
del(cav_res)
#-------------------- Step 1: Initialization
if isinstance(yi, list):
yi = np.array(yi)
if isinstance(q0, type(None)):
#The starting forecast ^q_0 and ^e_0 is the empricial quantile of the first part of the trainig set (for computational reason)
n_emp = int(np.ceil(0.1 * len(yi))) #Select onyl 1/10 of the training set
if round(n_emp * self.theta) == 0: n_emp = len(yi) #In case the training dimension is too small wrt theta
y_sort = np.sort(yi[:n_emp])
quantile0 = int(round(n_emp * self.theta))-1
if quantile0 < 0: quantile0 = 0
e0 = np.mean(y_sort[:quantile0+1])
q0 = y_sort[quantile0]
else:
q0 = q0[0]
e0 = q0[1]
#-------------------- Step 2: Initial guess
np.random.seed(seed)
nInitialVectors = [nV//3, self.n_parameters] #Define the shape of the random initializations
beta0 = [np.random.uniform(0, 1, nInitialVectors)]
beta0.append(np.random.uniform(-1, 1, nInitialVectors))
beta0.append(np.random.randn(*nInitialVectors))
beta0 = np.concatenate(beta0, axis=0)
beta0 = np.concatenate([np.concatenate([beta_cav.reshape(1,-1)]*nV,
axis=0), np.zeros((nV,1)), beta0], axis=1)
AEfval = np.empty(nV) #Initialize the loss function vector
#Iterates over the random initializations
for i in range(nV):
AEfval[i] = self.Jointloss(beta0[i, :], yi, q0, e0) #Compute starting loss
beta0 = beta0[AEfval.argsort()][0:n_init] #Sort initializations by loss and select only the n_init best initializations
#-------------------- Step 3: Optimization Routine
beta = np.empty((n_init, 2*self.n_parameters)) #Initialize the parameters vector
fval_beta = np.empty(n_init) #Initialize the loss function vector
exitflag = np.empty(n_init) #Initialize the exit flag vector
# Multiprocessing: create and start worker processes
workers = list()
for i in range(n_init):
parent_pipend, child_pipend = mp.Pipe() # Create a pipe to communicate with the worker
worker = mp.Process(target=self.joint_fit4mp,
args=(yi, q0, e0, beta0[i, :],
n_rep, child_pipend))
workers.append([worker, parent_pipend])
worker.start()
# Gather results from workers
for i, worker_list in enumerate(workers):
worker, parent_pipend = worker_list
beta_worker, fval_beta_worker, exitflag_worker =\
parent_pipend.recv() # Get the result from the worker
worker.join()
beta[i, :] = beta_worker
fval_beta[i] = fval_beta_worker
exitflag[i] = exitflag_worker
ind_min = np.argmin(fval_beta) #Select the index of the best loss
self.beta = beta[ind_min, :] #Store the best parameters
self.fval_beta = fval_beta[ind_min] #Store the best loss
self.exitflag = exitflag[ind_min] #Store the best exit flag
#Compute the fit output: optimal beta vector, optimization info, and the last pair ^q, ^e
#in sample variables
qi, ei = self.joint_loop(self.beta, yi, q0, e0)
self.last_state = [yi[-1], qi[-1], ei[-1]]
if return_train:
return {'qi':qi, 'ei':ei, 'beta':self.beta.reshape((2, self.n_parameters))}
[docs]
def predict(self, yf=np.array(list())):
'''
Predict the quantile.
INPUTS:
- yf: ndarray, optional
test data. If yf is not empty, the internal state is updated
with the last observation. Default is an empty list.
OUTPUTS:
- qf: ndarray
Quantile forecast in the test set.
- ef: ndarray
Expected Shortfall forecast in the test set.
'''
qf, ef = self.joint_loop(self.beta, yf, None, self.last_state, pred_mode=True)
if len(yf) > 0:
self.last_state = [yf[-1], qf[-1], ef[-1]]
return {'qf':qf, 'ef':ef}
[docs]
def fit_predict(self, y, ti, seed=None, return_train=True, q0=None, nV=102, n_init=3, n_rep=5):
'''
Fit and predict the CAESar model.
INPUTS:
- y: ndarray
target time series.
- ti: int
train set length.
- seed: int or None, optional
random seed. Default is None.
- return_train: bool, optional
return the train set. Default is True.
- q0: list of float or None, optional
[initial quantile, initial expected shortfall]. If None, the initial quantile
is computed as the empirical quantile in the first 10% of the
training set; the initial expected shortfall as the tail mean in the same
subset. Default is None.
- nV: int, optional
number of random initializations of the model coefficients. Default is 102.
- n_init: int, optional
number of best initializations to work with. Default is 3.
- n_rep: int, optional
number of repetitions for the optimization. Default is 5.
OUTPUTS:
- qi: ndarray
Quantile forecast in the training set (if return_train=True).
- ei: ndarray
Expected Shortfall forecast in the training set (if return_train=True).
- qf: ndarray
Quantile forecast in the test set.
- ef: ndarray
Expected Shortfall forecast in the test set.
- beta: ndarray
optimized coefficients of the model. The shape is (2, n_parameters).
'''
yi, yf = y[:ti], y[ti:] #Split train and test
if return_train:
res_train = self.fit(yi, seed=seed, return_train=True, q0=q0) #Train AE
res_test = self.predict(yf)
return {'qi':res_train['qi'], 'ei':res_train['ei'], 'qf':res_test['qf'], 'ef':res_test['ef'],
'beta':self.beta.reshape((2, self.n_parameters))} #Return prediction
else:
self.fit(yi, seed=seed, return_train=False, q0=q0) #Train AE
res_test = self.predict(yf)
return {'qf':res_test['qf'], 'ef':res_test['ef'],
'beta':self.beta.reshape((2, self.n_parameters))} #Return prediction
[docs]
class CAESar_No_Cross():
'''
CAESar model for Expected Shortfall estimation - No cross term ^q <-> ^e. See [1], Supplementary Material A, for further details.
[1] Gatta, F., Lillo, F., & Mazzarisi, P. (2024). CAESar: Conditional Autoregressive Expected Shortfall. arXiv preprint arXiv:2407.06619.
Parameters:
----------------
- theta: float
desired confidence level.
- spec: str, optional
specification of the model (SAV, AS, GARCH). Default is AS.
- lambdas: dict, optional
lambdas for the soft constraints. Default is {'q':10, 'e':10}.
Example of usage
----------------
.. code-block:: python
import numpy as np
from models.special import CAESar_No_Cross #Import the model
y = np.random.randn(1500) #Replace with your data
tv = 1250 #Training set length
theta = 0.05 #Set the desired confidence level
mdl = CAESar_No_Cross(theta, 'AS') # Initialize the model
res = mdl.fit_predict(y, tv, seed=2) # Fit and predict
q_pred = res['qf'] #Quantile forecast
es_pred = res['ef'] #Expected shortfall forecast
Methods:
----------------
'''
def __init__(self, theta, spec='AS', lambdas=dict()):
self.theta = theta #Initialize theta
self.lambdas = {'q':10, 'e':10}
self.lambdas.update(lambdas) #Initialize lambdas for soft constraints
# According to the desired model, initialize the number of parameters and the loop
if spec == 'SAV':
self.mdl_spec = 'SAV'
self.n_parameters = 3
self.joint_loop = self.Joint_SAVloop
elif spec == 'AS':
self.mdl_spec = 'AS'
self.n_parameters = 4
self.joint_loop = self.Joint_ASloop
elif spec == 'GARCH':
self.mdl_spec = 'GARCH'
self.n_parameters = 3
self.joint_loop = self.Joint_GARCHloop
else:
raise ValueError(f'Specification {spec} not recognized!\nChoose between "SAV", "AS", and "GARCH"')
def joint_loss(self, v, e, y):
'''
Compute the loss function (Patton) of the model.
INPUTS:
- v: ndarray
Value at Risk forecast.
- e: ndarray
Expected Shortfall forecast.
- y: ndarray
target time series.
OUTPUTS:
- float
Patton loss value.
:meta private:
'''
loss_val = np.mean(
np.where(y<=v, (y-v)/(self.theta*e), 0) + v/e + np.log(-e)
) + self.lambdas['e'] * np.mean( np.where(e>v, e-v, 0) ) +\
self.lambdas['q'] * np.mean( np.where(v>0, v, 0) )
return loss_val
def Joint_SAVloop(self, beta, y, q0, e0, pred_mode=False):
'''
Loop for the joint ^q and ^e estimation via SAV specification.
INPUTS:
- beta: ndarray
parameters of the model. The shape is (8,).
- y: ndarray
target time series.
- q0: float
if pred_mode=False, it contains ^q starting point. Otherwise, it is not used.
- e0: float or list of floats
if pred_mode=False, r0 is a float describing initial point for the
^r forecast. If pred_mode=True, r0 is a list of floats describing the
last state of the system, that is r0=[y[-1], q[-1], e[-1]].
- pred_mode: bool, optional
if True, the loop is in prediction mode and r0 is assumed to contain
the last state. Default is False.
OUTPUTS:
- ndarray
quantile forecast.
- ndarray
Expected Shortfall forecast.
:meta private:
'''
# Initial point
if pred_mode: #If we are in prediction mode, we have y, q, and e at step -1 and we need to compute q and e at step 0
q = list()
e = list()
q.append( beta[0] + beta[1] * np.abs(e0[0]) + beta[2] * e0[1] ) #In pred_mode, e0 is assumed to be [y[-1], q[-1], e[-1]]
e.append( beta[3] + beta[4] * np.abs(e0[0]) + beta[5] * e0[2] )
else: #If we are in training mode, we have an approximation of q and e at step 0
q = [q0]
e = [e0]
# Loop
for t in range(1, len(y)):
q.append( beta[0] + beta[1] * np.abs(y[t-1]) + beta[2] * q[t-1] )
e.append( beta[3] + beta[4] * np.abs(y[t-1]) + beta[5] * e[t-1] )
q, e = np.array(q), np.array(e)
return q, e
def Joint_ASloop(self, beta, y, q0, e0, pred_mode=False):
'''
Loop for the joint ^q and ^e estimation via AS specification.
INPUTS:
- beta: ndarray
parameters of the model. The shape is (10,).
- y: ndarray
target time series.
- q0: float
if pred_mode=False, it contains ^q starting point. Otherwise, it is not used.
- e0: float or list of floats
if pred_mode=False, r0 is a float describing initial point for the
^r forecast. If pred_mode=True, r0 is a list of floats describing the
last state of the system, that is r0=[y[-1], q[-1], e[-1]].
- pred_mode: bool, optional
if True, the loop is in prediction mode and r0 is assumed to contain
the last state. Default is False.
OUTPUTS:
- ndarray
quantile forecast.
- ndarray
Expected Shortfall forecast.
:meta private:
'''
# Initial point
if pred_mode: #If we are in prediction mode, we have y, q, and e at step -1 and we need to compute q and e at step 0
q = list()
e = list()
q.append( beta[0] + np.where(e0[0]>0, beta[1], beta[2]) * e0[0] + beta[3] * e0[1] ) #In pred_mode, e0 is assumed to be [y[-1], q[-1], e[-1]]
e.append( beta[4] + np.where(e0[0]>0, beta[5], beta[6]) * e0[0] + beta[7] * e0[2] )
else: #If we are in training mode, we have an approximation of q and e at step 0
q = [q0]
e = [e0]
# Loop
y_plus_coeff_q = np.where(y>0, beta[1], beta[2]) #Only one between positive and negative y part gives a contribution
y_plus_coeff_e = np.where(y>0, beta[5], beta[6])
for t in range(1, len(y)):
q.append( beta[0] + y_plus_coeff_q[t-1] * y[t-1] + beta[3] * q[t-1] )
e.append( beta[4] + y_plus_coeff_e[t-1] * y[t-1] + beta[7] * e[t-1] )
q, e = np.array(q), np.array(e)
return q, e
def Joint_GARCHloop(self, beta, y, q0, e0, pred_mode=False):
'''
Loop for the joint ^q and ^e estimation via GARCH specification.
INPUTS:
- beta: ndarray
parameters of the model. The shape is (8,).
- y: ndarray
target time series.
- q0: float
if pred_mode=False, it contains ^q starting point. Otherwise, it is not used.
- e0: float or list of floats
if pred_mode=False, r0 is a float describing initial point for the
^r forecast. If pred_mode=True, r0 is a list of floats describing the
last state of the system, that is r0=[y[-1], q[-1], e[-1]].
- pred_mode: bool, optional
if True, the loop is in prediction mode and r0 is assumed to contain
the last state. Default is False.
OUTPUTS:
- ndarray
quantile forecast.
- ndarray
Expected Shortfall forecast.
:meta private:
'''
# Initial point
if pred_mode: #If we are in prediction mode, we have y, q, and e at step -1 and we need to compute q and e at step 0
q = list()
e = list()
q.append( -np.sqrt(beta[0]+beta[1]*e0[0]**2+beta[2]*e0[1]**2) ) #In pred_mode, e0 is assumed to be [y[-1], q[-1], e[-1]]
e.append( -np.sqrt(beta[3]+beta[4]*e0[0]**2+beta[5]*e0[2]**2) )
else: #If we are in training mode, we have an approximation of q and e at step 0
q = [q0]
e = [e0]
# Loop
for t in range(1, len(y)):
q.append( -np.sqrt(beta[0]+beta[1]*y[t-1]**2+beta[2]*q[t-1]**2) )
e.append( -np.sqrt(beta[3]+beta[4]*y[t-1]**2+beta[5]*e[t-1]**2) )
q, e = np.array(q), np.array(e)
return q, e
def Jointloss(self, beta, y, q0, e0):
'''
Compute the Patton loss of the model.
INPUTS:
- beta: ndarray
parameters of the model. The shape is (2*self.n_parameters,).
- y: ndarray
target time series.
- q0: float
initial point for the ^q forecast.
- e0: float
initial point for the ^e forecast.
OUTPUTS:
- float
loss value.
:meta private:
'''
q, e = self.joint_loop(beta, y, q0, e0)
loss_val = self.joint_loss(q, e, y) #Compute loss
return loss_val
def joint_fit4mp(self, yi, q0, e0, beta0, n_rep, pipend):
'''
Joint optimization routine.
INPUTS:
- yi: ndarray
target time series.
- q0: float
initial point for the ^q forecast.
- e0: float
initial point for the ^e forecast.
- beta0: ndarray
initial point for the optimization. The shape is (2*self.n_parameters,).
- n_rep: int
number of repetitions for the optimization.
- pipend: multiprocessing.connection.Connection
pipe end for communicating multiprocessing.
OUTPUTS:
- None.
:meta private:
'''
from scipy.optimize import minimize
# First iteration
res = minimize(
lambda x: self.Jointloss(x, yi, q0, e0), beta0,
method='SLSQP', options={'disp':False})
beta_worker, fval_beta_worker, exitflag_worker = res.x, res.fun, int(res.success)
# Iterate until the optimization is successful or the maximum number of repetitions is reached
for _ in range(n_rep):
res = minimize(
lambda x: self.Jointloss(x, yi, q0, e0), beta_worker,
method='SLSQP', options={'disp':False})
beta_worker, fval_beta_worker, exitflag_worker = res.x, res.fun, int(res.success)
#If optimization is successful, exit the loop (no need to iterate further repetitions)
if exitflag_worker == 1:
break
# Communicate the results to the main process
pipend.send((beta_worker, fval_beta_worker, exitflag_worker))
[docs]
def fit(self, yi, seed=None, return_train=False, q0=None, nV=102, n_init=3, n_rep=5):
'''
Fit the CAESar model.
INPUTS:
- yi: ndarray
target time series.
- seed: int or None, optional
random seed. Default is None.
- return_train: bool, optional
if True, the function returns the fitted values. Default is False.
- q0: list of float or None, optional
[initial quantile, initial expected shortfall]. If None, the initial quantile
is computed as the empirical quantile in the first 10% of the
training set; the initial expected shortfall as the tail mean in the same
subset. Default is None.
- nV: int, optional
number of random initializations of the model coefficients. Default is 102.
- n_init: int, optional
number of best initializations to work with. Default is 3.
- n_rep: int, optional
number of repetitions for the optimization. Default is 5.
OUTPUTS:
- qi: ndarray
Quantile forecast in the training set (if return_train=True).
- ei: ndarray
Expected Shortfall forecast in the training set (if return_train=True).
- beta: ndarray
optimized coefficients of the model. The shape is
(2, self.n_parameters) (if return_train=True).
'''
#-------------------- Step 0: CAViaR for quantile initial guess
cav_res = CAViaR(
self.theta, self.mdl_spec).fit(
yi, seed=seed, return_train=True, q0=q0) #Train CAViaR
qi, beta_cav = cav_res['qi'], cav_res['beta']
del(cav_res)
#-------------------- Step 1: Initialization
if isinstance(yi, list):
yi = np.array(yi)
if isinstance(q0, type(None)):
#The starting forecast ^q_0 and ^e_0 is the empricial quantile of the first part of the trainig set (for computational reason)
n_emp = int(np.ceil(0.1 * len(yi))) #Select onyl 1/10 of the training set
if round(n_emp * self.theta) == 0: n_emp = len(yi) #In case the training dimension is too small wrt theta
y_sort = np.sort(yi[:n_emp])
quantile0 = int(round(n_emp * self.theta))-1
if quantile0 < 0: quantile0 = 0
e0 = np.mean(y_sort[:quantile0+1])
q0 = y_sort[quantile0]
else:
q0 = q0[0]
e0 = q0[1]
#-------------------- Step 2: Initial guess
np.random.seed(seed)
nInitialVectors = [nV//3, self.n_parameters] #Define the shape of the random initializations
beta0 = [np.random.uniform(0, 1, nInitialVectors)]
beta0.append(np.random.uniform(-1, 1, nInitialVectors))
beta0.append(np.random.randn(*nInitialVectors))
beta0 = np.concatenate(beta0, axis=0)
beta0 = np.concatenate([np.concatenate([beta_cav.reshape(1,-1)]*nV,
axis=0), beta0], axis=1)
AEfval = np.empty(nV) #Initialize the loss function vector
#Iterates over the random initializations
for i in range(nV):
AEfval[i] = self.Jointloss(beta0[i, :], yi, q0, e0) #Compute starting loss
beta0 = beta0[AEfval.argsort()][0:n_init] #Sort initializations by loss and select only the n_init best initializations
#-------------------- Step 3: Optimization Routine
beta = np.empty((n_init, 2*self.n_parameters)) #Initialize the parameters vector
fval_beta = np.empty(n_init) #Initialize the loss function vector
exitflag = np.empty(n_init) #Initialize the exit flag vector
# Multiprocessing: create and start worker processes
workers = list()
for i in range(n_init):
parent_pipend, child_pipend = mp.Pipe() # Create a pipe to communicate with the worker
worker = mp.Process(target=self.joint_fit4mp,
args=(yi, q0, e0, beta0[i, :],
n_rep, child_pipend))
workers.append([worker, parent_pipend])
worker.start()
# Gather results from workers
for i, worker_list in enumerate(workers):
worker, parent_pipend = worker_list
beta_worker, fval_beta_worker, exitflag_worker =\
parent_pipend.recv() # Get the result from the worker
worker.join()
beta[i, :] = beta_worker
fval_beta[i] = fval_beta_worker
exitflag[i] = exitflag_worker
ind_min = np.argmin(fval_beta) #Select the index of the best loss
self.beta = beta[ind_min, :] #Store the best parameters
self.fval_beta = fval_beta[ind_min] #Store the best loss
self.exitflag = exitflag[ind_min] #Store the best exit flag
#Compute the fit output: optimal beta vector, optimization info, and the last pair ^q, ^e
#in sample variables
qi, ei = self.joint_loop(self.beta, yi, q0, e0)
self.last_state = [yi[-1], qi[-1], ei[-1]]
if return_train:
return {'qi':qi, 'ei':ei, 'beta':self.beta.reshape((2, self.n_parameters))}
[docs]
def predict(self, yf=np.array(list())):
'''
Predict the quantile.
INPUTS:
- yf: ndarray, optional
test data. If yf is not empty, the internal state is updated
with the last observation. Default is an empty list.
OUTPUTS:
- qf: ndarray
Quantile forecast in the test set.
- ef: ndarray
Expected Shortfall forecast in the test set.
'''
qf, ef = self.joint_loop(self.beta, yf, None, self.last_state, pred_mode=True)
if len(yf) > 0:
self.last_state = [yf[-1], qf[-1], ef[-1]]
return {'qf':qf, 'ef':ef}
[docs]
def fit_predict(self, y, ti, seed=None, return_train=True, q0=None, nV=102, n_init=3, n_rep=5):
'''
Fit and predict the CAESar model.
INPUTS:
- y: ndarray
target time series.
- ti: int
train set length.
- seed: int or None, optional
random seed. Default is None.
- return_train: bool, optional
return the train set. Default is True.
- q0: list of float or None, optional
[initial quantile, initial expected shortfall]. If None, the initial quantile
is computed as the empirical quantile in the first 10% of the
training set; the initial expected shortfall as the tail mean in the same
subset. Default is None.
- nV: int, optional
number of random initializations of the model coefficients. Default is 102.
- n_init: int, optional
number of best initializations to work with. Default is 3.
- n_rep: int, optional
number of repetitions for the optimization. Default is 5.
OUTPUTS:
- qi: ndarray
Quantile forecast in the training set (if return_train=True).
- ei: ndarray
Expected Shortfall forecast in the training set (if return_train=True).
- qf: ndarray
Quantile forecast in the test set.
- ef: ndarray
Expected Shortfall forecast in the test set.
- beta: ndarray
optimized coefficients of the model. The shape is (2, n_parameters).
'''
yi, yf = y[:ti], y[ti:] #Split train and test
if return_train:
res_train = self.fit(yi, seed=seed, return_train=True, q0=q0) #Train AE
res_test = self.predict(yf)
return {'qi':res_train['qi'], 'ei':res_train['ei'], 'qf':res_test['qf'], 'ef':res_test['ef'],
'beta':self.beta.reshape((2, self.n_parameters))} #Return prediction
else:
self.fit(yi, seed=seed, return_train=False, q0=q0) #Train AE
res_test = self.predict(yf)
return {'qf':res_test['qf'], 'ef':res_test['ef'],
'beta':self.beta.reshape((2, self.n_parameters))} #Return prediction