Describe the issue
I have developed a modular way to run FLAML's AutoML on multiple groups of data (each dataset is sliced from the overall data, and each dataset is associated with one family of products, capturing its historical sales).
When I use a built-in metric, e.g. "r2", from the list of optimization metrics, I am able to train everything smoothly, without any issues. In fact, I have conducted many experiments and logged the respective results for each experiment in MLflow (each experiment consists of a list of optimized models, one for each family of products).
I then wanted to explore whether I could change my default metric from "r2" to a custom metric called "custom_adjusted_r2".
I am attaching the code for reference below.
So it looks like the r2_score computation within my custom metric is the source of the problem: if I don't use the custom metric and instead use the out-of-the-box "r2", everything works as intended, and this is the only change I made, apart from logging the correct metrics later, which is purely an arithmetic operation.
Looking at the function signature of the custom metric, I am using the variables estimator and X_val to compute Y_pred. I am unable to understand which estimator and which X_val are passed here such that Y_pred ends up empty, which is what causes the r2_score error.
I know it's a lot of code and a lot of lines, but
TL;DR -
I would like more clarity on how the arguments of the custom metric are parsed, and how X_val and Y_val are passed to this custom metric during the automl.fit() call when using the settings config dictionary.
(Also, I am not sure why X_train and Y_train are included in the signature, but looking at the example provided in the documentation, they are used to close the gap between training and validation loss; my rough understanding of that pattern is sketched below.)
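For reference, this is my rough understanding of that documentation pattern, rewritten for regression; the function name, the alpha weighting, and the returned dictionary keys are illustrative only and are not part of my actual code further below:
from sklearn.metrics import r2_score

def custom_metric_with_gap_penalty(
    X_val, Y_val, estimator, labels, X_train, Y_train,
    weight_val=None, weight_train=None, config=None,
    groups_val=None, groups_train=None,
):
    # validation loss (1 - r2), which FLAML minimizes
    val_loss = 1 - r2_score(Y_val, estimator.predict(X_val), sample_weight=weight_val)
    # training loss, used only to penalize a large train/validation gap
    train_loss = 1 - r2_score(Y_train, estimator.predict(X_train), sample_weight=weight_train)
    alpha = 0.1  # illustrative weight on the gap penalty
    return val_loss * (1 + alpha) - alpha * train_loss, {"val_loss": val_loss, "train_loss": train_loss}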
If I can find a solution for this, I would then like to explore another custom metric, WMAPE (weighted mean absolute percentage error).
It would be nice if the dev team could add these two metrics to the existing list of built-in metrics as well.
Thanks in advance!
Steps to reproduce
Code -
from flaml import AutoML
from sklearn.model_selection import train_test_split
from sklearn.metrics import r2_score, mean_squared_error, mean_absolute_error
import numpy as np
import pandas as pd
import mlflow
from mlflow.exceptions import MlflowException
import logging
def custom_adjusted_r2(
    X_val,
    Y_val,
    estimator,
    labels,
    X_train,
    Y_train,
    weight_val=None,
    weight_train=None,
    config=None,
    groups_val=None,
    groups_train=None,
):
    # get predictions for X_val
    Y_pred = estimator.predict(X_val)
    # compute basic r2
    r2 = r2_score(Y_val, Y_pred, sample_weight=weight_val) if weight_val is not None else r2_score(Y_val, Y_pred)
    n = len(Y_val)
    p = X_val.shape[1]  # assuming X_val is array-like
    # avoid division by zero if n - p - 1 <= 0
    if n - p - 1 <= 0:
        adjusted_r2 = r2
    else:
        adjusted_r2 = 1 - ((1 - r2) * (n - 1) / (n - p - 1))
    # since flaml minimizes the metric, return 1 - adjusted_r2
    metric_to_minimize = 1 - adjusted_r2
    metrics_to_log = {"r2": r2, "adjusted_r2": adjusted_r2}
    return metric_to_minimize, metrics_to_log
def custom_wmape(
    X_val,
    Y_val,
    estimator,
    labels,
    X_train,
    Y_train,
    weight_val=None,
    weight_train=None,
    config=None,
    groups_val=None,
    groups_train=None,
):
    # get predictions for X_val
    Y_pred = estimator.predict(X_val)
    # compute numerator and denominator, using weights if provided
    if weight_val is not None:
        num = np.sum(weight_val * np.abs(Y_val - Y_pred))
        den = np.sum(weight_val * np.abs(Y_val))
    else:
        num = np.sum(np.abs(Y_val - Y_pred))
        den = np.sum(np.abs(Y_val))
    wmape = num / den if den != 0 else float('inf')
    return wmape, {"wmape": wmape}
def split_data(pdf, feature_cols, target_col, test_ratio=0.2, random_state=42):
    """
    Splits the data into train and test sets.
    Args:
    - pdf: Pandas DataFrame
    - feature_cols: List of feature columns
    - target_col: Target column name
    - test_ratio: Proportion of data for the test set
    - random_state: Random seed for reproducibility
    Returns:
    - X_train, y_train, X_test, y_test
    """
    # Train-test split
    train_data, test_data = train_test_split(pdf, test_size=test_ratio, random_state=random_state)
    # Extract features and target
    X_train = train_data[feature_cols].values.astype(float)
    y_train = train_data[target_col].values.astype(float)
    X_test = test_data[feature_cols].values.astype(float)
    y_test = test_data[target_col].values.astype(float)
    return X_train, y_train, X_test, y_test
def run_flaml_with_cv(X_train, y_train, msku_bucket, log_folder_name, time_budget=300, estimator_list=None, estimator_time_ratio=None):
    """
    Runs FLAML AutoML with built-in cross-validation.
    Args:
    - X_train: Training features
    - y_train: Training target
    - time_budget: Time budget for AutoML (in seconds)
    - estimator_list: List of estimators to consider
    Returns:
    - automl: Trained FLAML AutoML object
    """
    automl = AutoML()
    automl_settings = {
        "time_budget": time_budget,
        #"max_iter": 1000,
        #"metric": "r2",
        "metric": custom_adjusted_r2,  # custom metric function defined above, passed as a callable
        "task": "regression",
        "log_file_name": f"/dbfs/mnt/sports-analytics-test/{log_folder_name}/flaml_automl_{msku_bucket}.log",
        "eval_method": "cv",  # Enable built-in cross-validation
        "n_splits": 5,  # Use 5-fold cross-validation
        "ensemble": True,
        "estimator_list": estimator_list,
        #"split_ratio": 0.2,
        "seed": 42
    }
    #logger.info("Starting FLAML AutoML with cross-validation...")
    automl.fit(X_train=X_train, y_train=y_train, **automl_settings)
    #logger.info("FLAML AutoML completed.")
    return automl
def log_flaml_model(msku_bucket, automl, X_train, y_train, X_test, y_test, experiment_name, sales_share, num_points):
    """
    Logs FLAML AutoML model and metrics to MLflow.
    Args:
    - msku_bucket: MSKU bucket identifier
    - automl: Trained FLAML AutoML object
    - X_train: Training features
    - y_train: Training labels
    - X_test: Test features
    - y_test: Test labels
    - experiment_name: Name of the MLflow experiment
    """
    mlflow.set_experiment(experiment_name)
    model_name = f"FLAML_Model_{msku_bucket}"
    # Check if this run already exists
    existing_runs = mlflow.search_runs(filter_string=f"tags.`mlflow.runName` = '{model_name}'")
    if not existing_runs.empty:
        print(f"Skipping duplicate MLflow logging for {msku_bucket}")
        return  # Prevents duplicate logging
    with mlflow.start_run(run_name=model_name) as run:
        run_id = run.info.run_id  # Capture run ID
        # Log parameters
        mlflow.log_params(automl.best_config)
        mlflow.log_param("best_model_family", automl.best_estimator)
        mlflow.log_param("best_model_train_time", automl.best_config_train_time)
        mlflow.log_param("time_to_find_best_model", automl.time_to_find_best_model)
        mlflow.log_param("sales_share", sales_share)
        mlflow.log_param("num_points", num_points)
        # Compute metrics for train, validation, and test
        y_train_pred = automl.predict(X_train)
        y_test_pred = automl.predict(X_test)
        train_r2 = r2_score(y_train, y_train_pred)
        test_r2 = r2_score(y_test, y_test_pred)
        val_r2 = 1 - automl.best_loss  # FLAML's best_loss is the validation loss (1 - R2)
        # compute adjusted r2 for the train, validation, and test sets
        n = len(y_test)
        p = X_test.shape[1]
        if n - p - 1 <= 0:
            train_adjusted_r2 = train_r2
            val_adjusted_r2 = val_r2
            test_adjusted_r2 = test_r2
        else:
            train_adjusted_r2 = 1 - ((1 - train_r2) * (n - 1) / (n - p - 1))
            val_adjusted_r2 = 1 - ((1 - val_r2) * (n - 1) / (n - p - 1))
            test_adjusted_r2 = 1 - ((1 - test_r2) * (n - 1) / (n - p - 1))
        train_metrics = {
            "train_r2": train_adjusted_r2,  #r2_score(y_train, y_train_pred),
            "train_rmse": mean_squared_error(y_train, y_train_pred, squared=False),
            "train_mape": np.mean(np.abs((y_train - y_train_pred) / y_train)) * 100,
            "train_wmape": np.sum(np.abs(y_train - y_train_pred)) / np.sum(y_train),
            "train_mae": mean_absolute_error(y_train, y_train_pred)
        }
        test_metrics = {
            "test_r2": test_adjusted_r2,  #r2_score(y_test, y_test_pred),
            "test_rmse": mean_squared_error(y_test, y_test_pred, squared=False),
            "test_mape": np.mean(np.abs((y_test - y_test_pred) / y_test)) * 100,
            "test_wmape": np.sum(np.abs(y_test - y_test_pred)) / np.sum(y_test),
            "test_mae": mean_absolute_error(y_test, y_test_pred)
        }
        # Merge train, test, and validation metrics
        all_metrics = {**train_metrics, **test_metrics, "val_r2": val_adjusted_r2}
        # Log all metrics in MLflow
        mlflow.log_metrics(all_metrics)
        # Collect log messages in a list
        log_messages = []
        # Log and register the model
        mlflow.sklearn.log_model(automl.model, artifact_path="models")
        try:
            registered_model = mlflow.register_model(f"runs:/{run_id}/models", model_name)
            log_messages.append(f"Registered model: {model_name}, version: {registered_model.version}")
        except MlflowException as e:
            log_messages.append(f"Skipping registration for {model_name}, it already exists.")
        # Log additional artifacts
        log_messages.append("FLAML AutoML training completed.")
        mlflow.log_text("\n".join(log_messages), artifact_file="log.txt")
        mlflow.log_text(str(automl.config_history), artifact_file="config_history.txt")
        #mlflow.log_text(str(automl.metrics_for_best_config), artifact_file="metrics_for_best_config.txt")
        #mlflow.log_text(str(automl.best_result), artifact_file="best_result.txt")
    mlflow.end_run()  # Ensure the run is closed
def train_evaluate_model_with_flaml(pdf, exp_name, log_folder_name, extra_features=[]):
    """
    Train and evaluate a model for each MSKU using FLAML and log with MLflow.
    Args:
    - pdf: Pandas DataFrame for one MSKU bucket
    Returns:
    - pd.DataFrame: Results for the MSKU
    """
    msku_bucket = pdf["msku_bucket"].iloc[0]
    sales_share = pdf["sales_share"].iloc[0]
    num_points = pdf.shape[0]
    mlflow.set_experiment(exp_name)
    experiment_name = mlflow.get_experiment_by_name(exp_name).name
    # Define features and target
    feature_cols = (
        ["wtd_avg_md", "event_effect"] +
        [f"day_of_week_ohe_dense_{i}" for i in range(7)] +
        [f"week_of_month_ohe_dense_{i}" for i in range(6)] +
        [f"month_of_year_ohe_dense_{i}" for i in range(12)]
    )
    feature_cols.extend(extra_features)
    target_col = "total_sold_qty"
    estimator_list = ['lgbm', 'xgboost', 'xgb_limitdepth', 'rf', 'extra_tree', 'histgb']
    time_budget = 300
    # Split data into train and test sets
    X_train, y_train, X_test, y_test = split_data(pdf, feature_cols, target_col)
    # Run FLAML AutoML with cross-validation
    automl = run_flaml_with_cv(X_train, y_train, msku_bucket, log_folder_name=log_folder_name, time_budget=time_budget, estimator_list=estimator_list)
    # Log results to MLflow
    log_flaml_model(msku_bucket, automl, X_train, y_train, X_test, y_test, experiment_name, sales_share, num_points)
    # Calculate regular r2 here, to compare whether the logged adjusted r2 is actually lower than the usual r2, just for display
    train_r2 = r2_score(y_train, automl.predict(X_train))
    test_r2 = r2_score(y_test, automl.predict(X_test))
    val_r2_mean = 1 - automl.best_loss  # Best validation R2 (logged as loss)
    return pd.DataFrame([{
        "msku_bucket": msku_bucket,
        "sales_share": sales_share,
        "num_points": num_points,
        "train_r2": train_r2,
        "test_r2": test_r2,
        "val_r2": val_r2_mean,
        "best_model": automl.best_estimator,
        "best_config": str(automl.best_config),
        "best_model_train_time": automl.best_config_train_time
        #"model_summary": model_summary.to_dict("records")  # Summary of all models
    }])
This is the error I am receiving -
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 3482.0 failed 4 times, most recent failure: Lost task 0.3 in stage 3482.0 (TID 7414) (10.50.219.10 executor 3): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/root/.ipykernel/1483/command-3206881139961060-1069190184", line 297, in
File "/root/.ipykernel/1483/command-3206881139961060-1069190184", line 270, in train_evaluate_model_with_flaml
File "/databricks/python/lib/python3.11/site-packages/sklearn/utils/_param_validation.py", line 201, in wrapper
validate_parameter_constraints(
File "/databricks/python/lib/python3.11/site-packages/sklearn/utils/_param_validation.py", line 95, in validate_parameter_constraints
raise InvalidParameterError(
sklearn.utils._param_validation.InvalidParameterError: The 'y_pred' parameter of r2_score must be an array-like. Got None instead.
Screenshots and logs
No response
Additional Information
flaml version: flaml==2.3.3
OS: Linux
Python version: 3.11
Databricks version: 15.4 LTS ML (includes Apache Spark 3.5.0, Scala 2.12)
Digging a bit into the documentation -
There does not seem to be any graceful handling of the case where X_val is None (the default argument when calling automl.fit()) combined with eval_method = 'cv'.
If eval_method is "auto", there is logic to decide the eval method, and if eval_method is "holdout", there is a condition that handles it.
I am unable to find the equivalent logic for initializing X_val and Y_val (needed for invoking a custom metric), which is why I think my Y_pred is None.
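In the meantime, a possible workaround I am considering (a minimal sketch, assuming the root cause above is correct; the split ratio and variable names here are illustrative) is to switch to eval_method="holdout" and pass an explicit validation set, so that the custom metric always receives array-like X_val and Y_val:
from sklearn.model_selection import train_test_split
from flaml import AutoML

# carve out an explicit validation set so X_val / Y_val are never None
X_tr, X_val, y_tr, y_val = train_test_split(X_train, y_train, test_size=0.2, random_state=42)

automl = AutoML()
automl.fit(
    X_train=X_tr,
    y_train=y_tr,
    X_val=X_val,  # explicitly supplied validation data
    y_val=y_val,
    metric=custom_adjusted_r2,  # the custom metric function defined above
    task="regression",
    eval_method="holdout",
    time_budget=300,
    seed=42,
)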