Skip to content

Commit

Permalink
Forecast Scenario Notebook for Local and Remote Inferencing (#3429)
Browse files Browse the repository at this point in the history
* Initial commit with the codes

* Working batch inference with gap

* Cleanup code and delete outputs

* Fix format issues

* Working batch inf e2e

* Fix formatting issues

---------

Co-authored-by: Rahul Kumar <74648335+iamrk04@users.noreply.github.com>
  • Loading branch information
SamGos93 and iamrk04 authored Oct 29, 2024
1 parent b0b3ff7 commit bc167ff
Show file tree
Hide file tree
Showing 8 changed files with 2,036 additions and 0 deletions.

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
"""
This is the script that is executed on the compute instance. It relies
on the model.pkl file which is uploaded along with this script to the
compute instance.
"""

import os

import pandas as pd

from azureml.core import Dataset, Run
import joblib
from pandas.tseries.frequencies import to_offset


def init():
global target_column_name
global fitted_model

target_column_name = os.environ["TARGET_COLUMN_NAME"]
# AZUREML_MODEL_DIR is an environment variable created during deployment
# It is the path to the model folder (./azureml-models)
# Please provide your model's folder name if there's one
model_path = os.path.join(os.environ["AZUREML_MODEL_DIR"], "model.pkl")
fitted_model = joblib.load(model_path)


def run(mini_batch):
print(f"run method start: {__file__}, run({mini_batch})")
resultList = []
for test in mini_batch:
if os.path.splitext(test)[-1] == ".parquet":
X_test = pd.read_parquet(test)
elif os.path.splitext(test)[-1] == ".csv":
X_test = pd.read_csv(test, parse_dates=[fitted_model.time_column_name])
else:
continue # Skip if it's neither a Parquet nor CSV file

y_test = X_test.pop(target_column_name).values

# We have default quantiles values set as below(95th percentile)
quantiles = [0.025, 0.5, 0.975]
predicted_column_name = "predicted"
PI = "prediction_interval"
fitted_model.quantiles = quantiles
pred_quantiles = fitted_model.forecast_quantiles(
X_test, ignore_data_errors=True
)
pred_quantiles[PI] = pred_quantiles[[min(quantiles), max(quantiles)]].apply(
lambda x: "[{}, {}]".format(x[0], x[1]), axis=1
)
X_test[target_column_name] = y_test
X_test[PI] = pred_quantiles[PI].values
X_test[predicted_column_name] = pred_quantiles[0.5].values
# drop rows where prediction or actuals are nan
# happens because of missing actuals
# or at edges of time due to lags/rolling windows
clean = X_test[
X_test[[target_column_name, predicted_column_name]].notnull().all(axis=1)
]
print(
f"The predictions have {clean.shape[0]} rows and {clean.shape[1]} columns."
)

resultList.append(clean)

return pd.concat(resultList, sort=False, ignore_index=True)
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"append_row": {"pandas.DataFrame.to_csv": {"sep": ","}}}
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
# Generate synthetic data

import pandas as pd
import numpy as np


def get_timeseries(
train_len: int,
test_len: int,
time_column_name: str,
target_column_name: str,
time_series_id_column_name: str,
time_series_number: int = 1,
freq: str = "H",
):
"""
Return the time series of designed length.
:param train_len: The length of training data (one series).
:type train_len: int
:param test_len: The length of testing data (one series).
:type test_len: int
:param time_column_name: The desired name of a time column.
:type time_column_name: str
:param time_series_number: The number of time series in the data set.
:type time_series_number: int
:param freq: The frequency string representing pandas offset.
see https://pandas.pydata.org/pandas-docs/stable/user_guide/timeseries.html
:type freq: str
:returns: the tuple of train and test data sets.
:rtype: tuple
"""
data_train = [] # type: List[pd.DataFrame]
data_test = [] # type: List[pd.DataFrame]
data_length = train_len + test_len
for i in range(time_series_number):
X = pd.DataFrame(
{
time_column_name: pd.date_range(
start="2000-01-01", periods=data_length, freq=freq
),
target_column_name: np.arange(data_length).astype(float)
+ np.random.rand(data_length)
+ i * 5,
"ext_predictor": np.asarray(range(42, 42 + data_length)),
time_series_id_column_name: np.repeat("ts{}".format(i), data_length),
}
)
data_train.append(X[:train_len])
data_test.append(X[train_len:])
X_train = pd.concat(data_train)
y_train = X_train.pop(target_column_name).values
X_test = pd.concat(data_test)
y_test = X_test.pop(target_column_name).values
return X_train, y_train, X_test, y_test


def make_forecasting_query(
fulldata, time_column_name, target_column_name, forecast_origin, horizon, lookback
):

"""
This function will take the full dataset, and create the query
to predict all values of the time series from the `forecast_origin`
forward for the next `horizon` horizons. Context from previous
`lookback` periods will be included.
fulldata: pandas.DataFrame a time series dataset. Needs to contain X and y.
time_column_name: string which column (must be in fulldata) is the time axis
target_column_name: string which column (must be in fulldata) is to be forecast
forecast_origin: datetime type the last time we (pretend to) have target values
horizon: timedelta how far forward, in time units (not periods)
lookback: timedelta how far back does the model look
Example:
```
forecast_origin = pd.to_datetime("2012-09-01") + pd.DateOffset(days=5) # forecast 5 days after end of training
print(forecast_origin)
X_query, y_query = make_forecasting_query(data,
forecast_origin = forecast_origin,
horizon = pd.DateOffset(days=7), # 7 days into the future
lookback = pd.DateOffset(days=1), # model has lag 1 period (day)
)
```
"""

X_past = fulldata[
(fulldata[time_column_name] > forecast_origin - lookback)
& (fulldata[time_column_name] <= forecast_origin)
]

X_future = fulldata[
(fulldata[time_column_name] > forecast_origin)
& (fulldata[time_column_name] <= forecast_origin + horizon)
]

y_past = X_past.pop(target_column_name).values.astype(float)
y_future = X_future.pop(target_column_name).values.astype(float)

# Now take y_future and turn it into question marks
y_query = y_future.copy().astype(float) # because sometimes life hands you an int
y_query.fill(np.nan)

print("X_past is " + str(X_past.shape) + " - shaped")
print("X_future is " + str(X_future.shape) + " - shaped")
print("y_past is " + str(y_past.shape) + " - shaped")
print("y_query is " + str(y_query.shape) + " - shaped")

X_pred = pd.concat([X_past, X_future])
y_pred = np.concatenate([y_past, y_query])
return X_pred, y_pred
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.

0 comments on commit bc167ff

Please sign in to comment.