import pandas as np
import numpy as np
import tensorflow as tf
import tensorflow_probability as tfp
import matplotlib.pyplot as plt
from datetime import timedelta
import pandas as pd
Multivariate GARCH in Tensorflow/Keras
Not investment advice - use at your own risk.
Conditional volatility and correlation using multivariate GARCH on DAX and S&P500. Implementation with Tensorflow/Keras.
Still under development - corresponding blogpost coming soon.
import yfinance as yf
= yf.download("^GDAXI ^GSPC", start="2017-09-10", end="2022-09-10", interval="1d")
data
= data["Close"]
close = np.log(close).diff().dropna()
returns
= plt.subplots(2,1, figsize = (22,5*2))
fig, axs
for i in range(2):
axs[i].plot(returns.iloc[:,i])=0.5)
axs[i].grid(alpha=0)
axs[i].margins(x"{} - log-returns".format(returns.columns[i]),size=20) axs[i].set_title(
[*********************100%***********************] 2 of 2 completed
= returns.rolling(60,min_periods=0).corr()
rolling_corrs = rolling_corrs["^GDAXI"][rolling_corrs.index.get_level_values(1)=="^GSPC"]
gdaxi_sp500_rollcorr
= (22,5))
plt.figure(figsize
"60 day rolling correlation - DAX vs. S&P500",size=20)
plt.title(30:],gdaxi_sp500_rollcorr.values[30:],c="green", label="60 day rolling correlation")
plt.plot(returns.index[=0.5)
plt.grid(alpha=0) plt.margins(x
class MGARCH_DCC(tf.keras.Model):
"""
Tensorflow/Keras implementation of multivariate GARCH under dynamic conditional correlation (DCC) specification.
Further reading:
- Engle, Robert. "Dynamic conditional correlation: A simple class of multivariate generalized autoregressive conditional heteroskedasticity models."
- Bollerslev, Tim. "Modeling the Coherence in Short-Run Nominal Exchange Rates: A Multi-variate Generalized ARCH Model."
- Lütkepohl, Helmut. "New introduction to multiple time series analysis."
"""
def __init__(self, y):
"""
Args:
y: NxM numpy.array of N observations of M correlated time-series
"""
super().__init__()
= y.shape[1]
n_dims self.n_dims = n_dims
self.MU = tf.Variable(np.mean(y,0)) #use a mean variable
self.sigma0 = tf.Variable(np.std(y,0)) #initial standard deviations at t=0
#we initialize all restricted parameters to lie inside the desired range
#by keeping the learning rate low, this should result in admissible results
#for more complex models, this might not suffice
self.alpha0 = tf.Variable(np.std(y,0))
self.alpha = tf.Variable(tf.zeros(shape=(n_dims,))+0.25)
self.beta = tf.Variable(tf.zeros(shape=(n_dims,))+0.25)
self.L0 = tf.Variable(np.float32(np.linalg.cholesky(np.corrcoef(y.T)))) #decomposition of A_0
self.A = tf.Variable(tf.zeros(shape=(1,))+0.9)
self.B = tf.Variable(tf.zeros(shape=(1,))+0.05)
def call(self, y):
"""
Args:
y: NxM numpy.array of N observations of M correlated time-series
"""
return self.get_conditional_dists(y)
def get_log_probs(self, y):
"""
Calculate log probabilities for a given matrix of time-series observations
Args:
y: NxM numpy.array of N observations of M correlated time-series
"""
return self.get_conditional_dists(y).log_prob(y)
@tf.function
def get_conditional_dists(self, y):
"""
Calculate conditional distributions for given observations
Args:
y: NxM numpy.array of N observations of M correlated time-series
"""
= tf.shape(y)[0]
T
#create containers for looping
= tf.TensorArray(tf.float32, size = T) #observation mean container
mus = tf.TensorArray(tf.float32, size = T) #observation covariance container
Sigmas
= tf.TensorArray(tf.float32, size = T+1)
sigmas = tf.TensorArray(tf.float32, size = T+1)
us = tf.TensorArray(tf.float32, size = T+1)
Qs
#initialize respective values for t=0
= sigmas.write(0, self.sigma0)
sigmas = tf.transpose(self.L0)@self.L0
A0 = Qs.write(0, A0) #set initial unnormalized correlation equal to mean matrix
Qs = us.write(0, tf.zeros(shape=(self.n_dims,))) #initial observations equal to zero
us
#convenience
= self.sigma0
sigma0 = self.alpha0**2 #ensure positivity
alpha0 = self.alpha
alpha = self.beta
beta
= self.A
A = self.B
B
for t in tf.range(T):
#tm1 = 't minus 1'
#suppress conditioning on past in notation
#1) calculate conditional standard deviations
= us.read(t)
u_tm1 = sigmas.read(t)
sigma_tm1
= (alpha0 + alpha*sigma_tm1**2 + beta*u_tm1**2)**0.5
sigma_t
#2) calculate conditional correlations
= u_tm1/sigma_tm1
u_tm1_standardized
= tf.reshape(u_tm1_standardized, (self.n_dims,1))@tf.reshape(u_tm1_standardized, (1,self.n_dims))
Psi_tilde_tm1
= Qs.read(t)
Q_tm1 = A0 + A*(Q_tm1 - A0) + B*(Psi_tilde_tm1 - A0)
Q_t = self.cov_to_corr(Q_t)
R_t
#3) calculate conditional covariance
= tf.linalg.LinearOperatorDiag(sigma_t)
D_t = D_t@R_t@D_t
Sigma_t
#4) store values for next iteration
= sigmas.write(t+1, sigma_t)
sigmas = us.write(t+1, y[t,:]-self.MU) #we want to model the zero-mean disturbances
us = Qs.write(t+1, Q_t)
Qs
= mus.write(t, self.MU)
mus = Sigmas.write(t, Sigma_t)
Sigmas
return tfp.distributions.MultivariateNormalFullCovariance(mus.stack(), Sigmas.stack())
def cov_to_corr(self, S):
"""
Transforms covariance matrix to a correlation matrix via matrix operations
Args:
S: Symmetric, positive semidefinite covariance matrix (tf.Tensor)
"""
= tf.linalg.LinearOperatorDiag(1/(tf.linalg.diag_part(S)**0.5))
D return D@S@D
def train_step(self, data):
"""
Custom training step to handle keras model.fit given that there is no input-output structure in our model
Args:
S: Symmetric, positive semidefinite covariance matrix (tf.Tensor)
"""
= data
x,y with tf.GradientTape() as tape:
= -tf.math.reduce_mean(self.get_log_probs(y))
loss
= self.trainable_weights
trainable_vars = tape.gradient(loss, trainable_vars)
gradients
self.optimizer.apply_gradients(zip(gradients, trainable_vars))
return {"Current loss": loss}
def sample_forecast(self, y, T_forecast = 30, n_samples=500):
"""
Create forecast samples to use for monte-carlo simulation of quantities of interest about the forecast (e.g. mean, var, corr, etc.)
WARNING: This is not optimized very much and can take some time to run, probably due to Python's slow loops - can likely be improved
Args:
y: numpy.array of training data, used to initialize the forecast values
T_forecast: number of periods to predict (integer)
n_samples: Number of samples to draw (integer)
"""
= tf.shape(y)[0]
T
#create lists for looping; no gradients, thus no tf.TensorArrays needed
#can initialize directly
= []
mus = []
Sigmas
= [tf.zeros(shape=(self.n_dims,))]
us = [self.sigma0]
sigmas = []
Qs
#initialize remaining values for t=0
= tf.transpose(self.L0)@self.L0
A0
Qs.append(A0)
#convenience
= self.sigma0
sigma0 = self.alpha0**2 #ensure positivity
alpha0 = self.alpha
alpha = self.beta
beta
= self.A
A = self.B
B
#'warmup' to initialize latest lagged features
for t in range(T):
#tm1 = 't minus 1'
#suppress conditioning on past in notation
= us[-1]
u_tm1 = sigmas[-1]
sigma_tm1
= (alpha0 + alpha*sigma_tm1**2 + beta*u_tm1**2)**0.5
sigma_t
= u_tm1/sigma_tm1
u_tm1_standardized
= tf.reshape(u_tm1_standardized, (self.n_dims,1))@tf.reshape(u_tm1_standardized, (1,self.n_dims))
Psi_tilde_tm1
= Qs[-1]
Q_tm1 = A0 + A*(Q_tm1 - A0) + B*(Psi_tilde_tm1 - A0)
Q_t = self.cov_to_corr(Q_t)
R_t
= tf.linalg.LinearOperatorDiag(sigma_t)
D_t = D_t@R_t@D_t
Sigma_t
sigmas.append(sigma_t)-self.MU) #we want to model the zero-mean disturbances
us.append(y[t,:]
Qs.append(Q_t)
self.MU)
mus.append(
Sigmas.append(Sigma_t)
#sample containers
= []
y_samples = []
R_samples = []
sigma_samples
for n in range(n_samples):
= []
mus_samp = []
Sigmas_samp
= [sigmas[-1]]
sigmas_samp = [us[-1]]
us_samp = [Qs[-1]]
Qs_samp
#forecast containers
= []
ys_samp = []
sig_samp = []
R_samp
for t in range(T_forecast):
= us_samp[-1]
u_tm1 = sigmas_samp[-1]
sigma_tm1
= (alpha0 + alpha**2 + beta*u_tm1**2)**0.5
sigma_t
= u_tm1/sigma_tm1
u_tm1_standardized
= tf.reshape(u_tm1_standardized, (self.n_dims,1))@tf.reshape(u_tm1_standardized, (1,self.n_dims))
Psi_tilde_tm1
= Qs_samp[-1]
Q_tm1 = A0 + A*(Q_tm1 - A0) + B*(Psi_tilde_tm1 - A0)
Q_t = self.cov_to_corr(Q_t)
R_t
= tf.linalg.LinearOperatorDiag(sigma_t)
D_t = D_t@R_t@D_t
Sigma_t
sigmas_samp.append(sigma_t)
Qs_samp.append(Q_t)
= tfp.distributions.MultivariateNormalFullCovariance(self.MU, Sigma_t).sample()
ynext 1,1,-1)))
ys_samp.append(tf.reshape(ynext,(1,1,-1)))
sig_samp.append(tf.reshape(sigma_t,(1,1,self.n_dims,self.n_dims)))
R_samp.append(tf.reshape(R_t,(
-self.MU)
us_samp.append(ynext
1))
y_samples.append(tf.concat(ys_samp,1))
R_samples.append(tf.concat(R_samp,1))
sigma_samples.append(tf.concat(sig_samp,
return tf.concat(y_samples,0).numpy(), tf.concat(R_samples,0).numpy(), tf.concat(sigma_samples,0).numpy()
= np.float32(returns)[:-90,:]
train = np.float32(returns)[-90:,:] test
= MGARCH_DCC(train)
model
from scipy.stats import norm
= model(train)
out
= out.mean().numpy()
means = out.stddev().numpy()
stds
= norm(means, stds).ppf(0.05)
lowers = norm(means, stds).ppf(0.95)
uppers
= (16,6))
plt.figure(figsize
= 0
i
= (16,6))
plt.figure(figsize
plt.plot(train[:,i])len(train)),lowers[:,i],uppers[:,i],color="red",alpha=0.5) plt.fill_between(np.arange(
<Figure size 1152x432 with 0 Axes>
= (16,6))
plt.figure(figsize
= [model.cov_to_corr(out.covariance()[i,:,:])[0,1].numpy() for i in range(len(train))]
corr12 ="red") plt.plot(corr12,c
= 1
i
= (16,6))
plt.figure(figsize
plt.plot(train[:,i])len(train)),lowers[:,i],uppers[:,i],color="red",alpha=0.5) plt.fill_between(np.arange(
compile(optimizer=tf.keras.optimizers.Adam(learning_rate=1e-2)) model.
=len(train), shuffle=False, epochs = 300, verbose=False) model.fit(train, train, batch_size
<keras.callbacks.History at 0x137bf2a40>
= model(train)
out
= out.mean().numpy()
means = out.stddev().numpy()
stds
= norm(means, stds).ppf(0.05)
lowers = norm(means, stds).ppf(0.95) uppers
= 0
i
= (16,6))
plt.figure(figsize
plt.plot(train[:,i])len(train)),lowers[:,i],uppers[:,i],color="red",alpha=0.5) plt.fill_between(np.arange(
= 1
i
= (16,6))
plt.figure(figsize
plt.plot(train[:,i])len(train)),lowers[:,i],uppers[:,i],color="red",alpha=0.5) plt.fill_between(np.arange(
= [model.cov_to_corr(out.covariance()[i,:,:])[0,1].numpy() for i in range(len(train))] corr12
123)
np.random.seed(123)
tf.random.set_seed(
= model.sample_forecast(train,90,1000) fcast
= fcast[1][:,:,0,1]
corrs = np.mean(corrs,0)
corr_means = np.quantile(corrs,0.05,0)
corr_lowers = np.quantile(corrs,0.95,0)
corr_uppers
= model(np.float32(returns.values))
conditional_dists
= [model.cov_to_corr(conditional_dists.covariance()[i,:,:])[0,1].numpy() for i in range(len(returns))]
conditional_correlations
= returns[:-90].index
idx_train = pd.date_range(returns[:-90].index[-1] + timedelta(days=1), returns[:-90].index[-1] + timedelta(days=90))
idx_test
= plt.subplots(2,1,figsize=(20,12), gridspec_kw={'height_ratios': [2, 1]})
fig, axs
0].set_title("Conditional Correlation - DAX, S&P500", size=20)
axs[
0].axhline(np.corrcoef(returns.T)[0,1], c="green",alpha=0.75,ls="dashed",lw=2, label="Unconditional sample correlation")
axs[
0].plot(idx_train[30:],conditional_correlations[30:-90],c="red", label="MGARCH in-sample conditional correlation")
axs[0].plot(idx_test,conditional_correlations[-90:],c="red",ls="dotted",lw=3, label="MGARCH out-of-sample conditional correlation")
axs[
0].plot(idx_test, corr_means,color="blue",lw=3, alpha=0.9, label="MGARCH correlation mean forecast")
axs[0].fill_between(idx_test, corr_lowers, corr_uppers, color="blue", alpha=0.2, label="MGARCH correlation 90% forecast interval")
axs[
0].grid(alpha=0.5)
axs[0].legend(prop = {"size":13})
axs[0].margins(x=0)
axs[
1].set_title("Sanity check: Model predicted VS. rolling correlation",size=20)
axs[1].plot(returns.index[30:],gdaxi_sp500_rollcorr.values[30:],c="green", label="60 day rolling correlation")
axs[1].plot(returns.index[30:],conditional_correlations[30:],c="red", label="MGARCH in-sample conditional correlation")
axs[1].grid(alpha=0.5)
axs[1].legend(prop = {"size":13})
axs[1].margins(x=0) axs[