-
Notifications
You must be signed in to change notification settings - Fork 37
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
2 changed files
with
327 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,161 @@ | ||
#!/usr/bin/env python3 | ||
""" | ||
HAR (Heterogeneous Autoregressive) Model for Realized Volatility with Forecasting | ||
Usage: | ||
./har_realised_vol.py -h | ||
""" | ||
|
||
from argparse import ArgumentParser, RawDescriptionHelpFormatter | ||
from datetime import timedelta | ||
from typing import Tuple | ||
|
||
import matplotlib.pyplot as plt | ||
import numpy as np | ||
import pandas as pd | ||
import statsmodels.api as sm | ||
import yfinance as yf | ||
from statsmodels.regression.linear_model import OLS | ||
|
||
|
||
def parse_args(): | ||
parser = ArgumentParser( | ||
description=__doc__, formatter_class=RawDescriptionHelpFormatter | ||
) | ||
parser.add_argument("--start", default="2020-01-01", help="Start date (YYYY-MM-DD)") | ||
parser.add_argument("--end", default="2024-01-01", help="End date (YYYY-MM-DD)") | ||
parser.add_argument("--symbol", default="^GSPC", help="Stock symbol") | ||
parser.add_argument( | ||
"--forecast_days", type=int, default=5, help="Number of days to forecast" | ||
) | ||
return parser.parse_args() | ||
|
||
|
||
def calculate_realized_volatility(returns: pd.Series) -> pd.Series: | ||
"""Calculate realized volatility""" | ||
return returns**2 | ||
|
||
|
||
def calculate_har_components(rv: pd.Series) -> pd.DataFrame: | ||
"""Calculate HAR components (daily, weekly, monthly)""" | ||
df = pd.DataFrame(index=rv.index) | ||
df["RV_d"] = rv | ||
df["RV_w"] = rv.rolling(window=5).mean() # weekly | ||
df["RV_m"] = rv.rolling(window=22).mean() # monthly | ||
return df | ||
|
||
|
||
def prepare_har_data(rv: pd.Series) -> Tuple[pd.DataFrame, pd.Series]: | ||
"""Prepare data for HAR model""" | ||
components = calculate_har_components(rv) | ||
|
||
# Shift components by 1 day to avoid look-ahead bias | ||
X = components.shift(1).dropna() | ||
y = rv[X.index] | ||
|
||
return X, y | ||
|
||
|
||
def fit_har_model(rv: pd.Series) -> Tuple[OLS, pd.Series]: | ||
"""Fit HAR model""" | ||
X, y = prepare_har_data(rv) | ||
X = sm.add_constant(X) | ||
model = OLS(y, X).fit() | ||
predictions = model.predict(X) | ||
return model, predictions | ||
|
||
|
||
def create_forecast_features(rv: pd.Series) -> pd.DataFrame: | ||
"""Create features for forecasting""" | ||
features = pd.DataFrame(index=[0]) | ||
# Add features in the same order as training data | ||
features["const"] = 1 # Add constant term explicitly | ||
features["RV_d"] = rv.iloc[-1] | ||
features["RV_w"] = rv.iloc[-5:].mean() | ||
features["RV_m"] = rv.iloc[-22:].mean() | ||
return features | ||
|
||
|
||
def forecast_volatility(model: OLS, rv: pd.Series, forecast_days: int) -> pd.Series: | ||
"""Generate forecasts""" | ||
forecasts = [] | ||
current_rv = rv.copy() | ||
|
||
for i in range(forecast_days): | ||
# Create features for forecasting | ||
features = create_forecast_features(current_rv) | ||
# Remove sm.add_constant since we're adding it explicitly in create_forecast_features | ||
forecast = model.predict(features)[0] | ||
forecasts.append(forecast) | ||
|
||
# Update series with new forecast | ||
new_index = current_rv.index[-1] + timedelta(days=1) | ||
current_rv.loc[new_index] = forecast | ||
|
||
# Create forecast series | ||
forecast_dates = [ | ||
rv.index[-1] + timedelta(days=i + 1) for i in range(len(forecasts)) | ||
] | ||
return pd.Series(forecasts, index=forecast_dates, name="Forecast") | ||
|
||
|
||
def plot_results( | ||
rv: pd.Series, train_rv: pd.Series, predictions: pd.Series, forecasts: pd.Series | ||
): | ||
"""Plot results""" | ||
# Full plot | ||
plt.figure(figsize=(15, 8)) | ||
plt.plot(rv.index, rv, label="Historical RV", alpha=0.5) | ||
plt.plot(predictions.index, predictions, label="In-sample Predictions", alpha=0.5) | ||
plt.plot(forecasts.index, forecasts, "r--", label="Forecasts", linewidth=2) | ||
plt.axvline(x=train_rv.index[-1], color="gray", linestyle="--", alpha=0.5) | ||
plt.legend() | ||
plt.title("HAR Model: Historical, Fitted, and Forecasted Realized Volatility") | ||
|
||
# Recent window plot | ||
plt.figure(figsize=(15, 8)) | ||
days_to_show = 60 | ||
plt.plot( | ||
rv.index[-days_to_show:], | ||
rv.iloc[-days_to_show:], | ||
label="Historical RV", | ||
alpha=0.5, | ||
) | ||
plt.plot(forecasts.index, forecasts, "r--", label="Forecasts", linewidth=2) | ||
plt.legend() | ||
plt.title(f"HAR Model: Last {days_to_show} Days and Forecasts") | ||
|
||
plt.show() | ||
|
||
|
||
def main(args): | ||
# Download data | ||
sp500 = yf.download(args.symbol, start=args.start, end=args.end) | ||
|
||
# Calculate returns and realized volatility | ||
returns = np.log(sp500["Close"]).diff() | ||
rv = calculate_realized_volatility(returns) | ||
|
||
# Split data | ||
train_size = int(len(rv) * 0.8) | ||
train_rv = rv[:train_size] | ||
|
||
# Fit model | ||
model, predictions = fit_har_model(train_rv) | ||
|
||
# Generate forecasts | ||
forecasts = forecast_volatility(model, rv, args.forecast_days) | ||
|
||
# Print results | ||
print("\nModel Summary:") | ||
print(model.summary()) | ||
print("\nVolatility Forecasts:") | ||
print(forecasts) | ||
|
||
# Plot results | ||
plot_results(rv, train_rv, predictions, forecasts) | ||
|
||
|
||
if __name__ == "__main__": | ||
args = parse_args() | ||
main(args) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,166 @@ | ||
#!/usr/bin/env python3 | ||
""" | ||
HARQ Model Implementation with Future Volatility Predictions | ||
""" | ||
|
||
from datetime import datetime, timedelta | ||
|
||
import matplotlib.pyplot as plt | ||
import numpy as np | ||
import pandas as pd | ||
import yfinance as yf | ||
|
||
|
||
class HARQModel: | ||
def __init__(self): | ||
self.params = None | ||
self.beta0 = None | ||
self.beta1 = None | ||
self.beta1q = None | ||
self.beta2 = None | ||
self.beta3 = None | ||
|
||
def compute_realized_volatility(self, returns): | ||
"""Compute realized volatility""" | ||
return np.sqrt(np.sum(returns**2)) * np.sqrt(252) | ||
|
||
def compute_realized_quarticity(self, returns): | ||
"""Compute realized quarticity""" | ||
return np.sum(returns**4) * (252**2) | ||
|
||
def prepare_features(self, rv_series): | ||
"""Prepare HAR features""" | ||
features = pd.DataFrame(index=rv_series.index) | ||
|
||
# Daily (previous day) volatility | ||
features["daily_rv"] = rv_series.shift(1) | ||
|
||
# Weekly (previous 5 days) average volatility | ||
features["weekly_rv"] = rv_series.rolling(window=5).mean().shift(1) | ||
|
||
# Monthly (previous 22 days) average volatility | ||
features["monthly_rv"] = rv_series.rolling(window=22).mean().shift(1) | ||
|
||
return features.fillna(method="ffill") | ||
|
||
def fit(self, returns, rv_series): | ||
"""Fit the HARQ model""" | ||
# Calculate realized quarticity | ||
rq_series = returns.rolling(window=22).apply(self.compute_realized_quarticity) | ||
|
||
# Prepare features | ||
features = self.prepare_features(rv_series) | ||
features["daily_rv_rq"] = features["daily_rv"] * rq_series | ||
|
||
# Remove NaN values | ||
features = features.dropna() | ||
y = rv_series[features.index] | ||
|
||
# Fit model using OLS | ||
X = features.values | ||
X = np.column_stack([np.ones(len(X)), X]) | ||
betas = np.linalg.pinv(X.T @ X) @ X.T @ y | ||
|
||
# Store parameters | ||
self.beta0 = betas[0] | ||
self.beta1 = betas[1] | ||
self.beta1q = betas[2] | ||
self.beta2 = betas[3] | ||
self.beta3 = betas[4] | ||
|
||
self.params = { | ||
"beta0": self.beta0, | ||
"beta1": self.beta1, | ||
"beta1q": self.beta1q, | ||
"beta2": self.beta2, | ||
"beta3": self.beta3, | ||
} | ||
|
||
return features, y | ||
|
||
def forecast_n_days(self, returns, rv_series, n_days=5): | ||
"""Forecast volatility for next n days""" | ||
if self.params is None: | ||
raise ValueError("Model must be fitted before forecasting") | ||
|
||
# Initialize forecasts | ||
forecasts = [] | ||
|
||
# Get latest values | ||
latest_rv = rv_series.iloc[-1] | ||
latest_weekly = rv_series.iloc[-5:].mean() | ||
latest_monthly = rv_series.iloc[-22:].mean() | ||
latest_rq = self.compute_realized_quarticity(returns.iloc[-22:]) | ||
|
||
# Generate forecasts | ||
for _ in range(n_days): | ||
forecast = ( | ||
self.beta0 | ||
+ self.beta1 * latest_rv | ||
+ self.beta1q * (latest_rv * latest_rq) | ||
+ self.beta2 * latest_weekly | ||
+ self.beta3 * latest_monthly | ||
) | ||
|
||
forecasts.append(forecast) | ||
|
||
# Update for next iteration | ||
latest_rv = forecast | ||
latest_weekly = (latest_weekly * 4 + forecast) / 5 | ||
latest_monthly = (latest_monthly * 21 + forecast) / 22 | ||
|
||
return forecasts | ||
|
||
|
||
def main(): | ||
# Download data | ||
symbol = "SPY" | ||
end_date = datetime.now() | ||
start_date = end_date - timedelta(days=365) | ||
|
||
df = yf.download(symbol, start=start_date, end=end_date) | ||
|
||
# Calculate daily returns | ||
df["returns"] = df["Adj Close"].pct_change() | ||
|
||
# Calculate realized volatility | ||
df["rv"] = df["returns"].rolling(window=22).std() * np.sqrt(252) | ||
|
||
# Initialize and fit model | ||
model = HARQModel() | ||
_, _ = model.fit(df["returns"], df["rv"]) | ||
|
||
# Make predictions | ||
n_days = 5 | ||
forecasts = model.forecast_n_days(df["returns"], df["rv"], n_days) | ||
|
||
# Generate forecast dates | ||
last_date = df.index[-1] | ||
forecast_dates = [ | ||
(last_date + timedelta(days=i + 1)).strftime("%Y-%m-%d") for i in range(n_days) | ||
] | ||
|
||
# Print results | ||
print( | ||
f"\nCurrent volatility ({last_date.strftime('%Y-%m-%d')}): {df['rv'].iloc[-1]:.1%}" | ||
) | ||
print("\nForecasted annualized volatility:") | ||
for date, forecast in zip(forecast_dates, forecasts): | ||
print(f"- {date}: {forecast:.1%}") | ||
|
||
# Plot results | ||
plt.figure(figsize=(12, 6)) | ||
plt.plot(df.index[-60:], df["rv"].iloc[-60:], label="Historical Volatility") | ||
|
||
# Plot forecasts | ||
forecast_dates = [pd.to_datetime(date) for date in forecast_dates] | ||
plt.plot(forecast_dates, forecasts, "r--", label="Forecast") | ||
|
||
plt.title(f"{symbol} Volatility Forecast") | ||
plt.legend() | ||
plt.grid(True) | ||
plt.show() | ||
|
||
|
||
if __name__ == "__main__": | ||
main() |