From 31dddcf0cac90c8f9c0e8acd09874b3906f5d633 Mon Sep 17 00:00:00 2001
From: Junhui Yang
Date: Tue, 2 Apr 2024 03:53:34 +0100
Subject: [PATCH] Best hyperparameters set

---
 config.yaml                    |   4 +-
 main.py                        | 109 +++++++++++++++++-------
 src/data_check/conda.yml       |   4 +-
 src/data_check/test_data.py    |   5 ++
 src/train_random_forest/run.py | 147 +++++++++++++++++----------
 5 files changed, 164 insertions(+), 105 deletions(-)

diff --git a/config.yaml b/config.yaml
index 933ec1f3e..15a8a48aa 100644
--- a/config.yaml
+++ b/config.yaml
@@ -22,7 +22,7 @@ modeling:
   stratify_by: "neighbourhood_group"
   # Maximum number of features to consider for the TFIDF applied to the title of the
   # insertion (the column called "name")
-  max_tfidf_features: 5
+  max_tfidf_features: 30
   # NOTE: you can put here any parameter that is accepted by the constructor of
   # RandomForestRegressor. This is a subsample, but more could be added:
   random_forest:
@@ -33,6 +33,6 @@ modeling:
     # Here -1 means all available cores
     n_jobs: -1
     criterion: squared_error
-    max_features: 0.5
+    max_features: 0.33
     # DO not change the following
     oob_score: true
diff --git a/main.py b/main.py
index 27a89b1d6..1c026ada5 100644
--- a/main.py
+++ b/main.py
@@ -8,15 +8,15 @@ from omegaconf import DictConfig

 _steps = [
-    "download",
-    "basic_cleaning",
-    "data_check",
-    "data_split",
-    "train_random_forest",
+    'download',
+    'basic_cleaning',
+    'data_check',
+    'data_split',
+    'train_random_forest',
     # NOTE: We do not include this in the steps so it is not run by mistake.
-    # You first need to promote a model export to "prod" before you can run this,
+    # You first need to promote a model export to 'prod' before you can run this,
     # then you need to run this step explicitly
-#    "test_regression_model"
+#    'test_regression_model'
 ]


@@ -25,47 +25,77 @@ def go(config: DictConfig):

     # Setup the wandb experiment. All runs will be grouped under this name
-    os.environ["WANDB_PROJECT"] = config["main"]["project_name"]
-    os.environ["WANDB_RUN_GROUP"] = config["main"]["experiment_name"]
+    os.environ['WANDB_PROJECT'] = config['main']['project_name']
+    os.environ['WANDB_RUN_GROUP'] = config['main']['experiment_name']

     # Steps to execute
     steps_par = config['main']['steps']
-    active_steps = steps_par.split(",") if steps_par != "all" else _steps
+    active_steps = steps_par.split(',') if steps_par != 'all' else _steps

     # Move to a temporary directory
     with tempfile.TemporaryDirectory() as tmp_dir:

-        if "download" in active_steps:
+        if 'download' in active_steps:
             # Download file and load in W&B
             _ = mlflow.run(
                 f"{config['main']['components_repository']}/get_data",
-                "main",
+                'main',
                 version='main',
                 parameters={
-                    "sample": config["etl"]["sample"],
-                    "artifact_name": "sample.csv",
-                    "artifact_type": "raw_data",
-                    "artifact_description": "Raw file as downloaded"
+                    'sample': config['etl']['sample'],
+                    'artifact_name': 'sample.csv',
+                    'artifact_type': 'raw_data',
+                    'artifact_description': 'Raw file as downloaded'
                 },
             )

-        if "basic_cleaning" in active_steps:
+        if 'basic_cleaning' in active_steps:
             ##################
             # Implement here #
             ##################
-            pass
-
-        if "data_check" in active_steps:
+            _ = mlflow.run(
+                os.path.join(hydra.utils.get_original_cwd(), 'src', 'basic_cleaning'),
+                'main',
+                parameters={
+                    'input_artifact': 'sample.csv:latest',
+                    'output_artifact': 'clean_sample.csv',
+                    'output_type': 'clean_sample',
+                    'output_description': 'Data with outliers and null values removed',
+                    'min_price': config['etl']['min_price'],
+                    'max_price': config['etl']['max_price']
+                },
+            )
+
+        if 'data_check' in active_steps:
             ##################
             # Implement here #
             ##################
-            pass
-
-        if "data_split" in active_steps:
+            _ = mlflow.run(
+                os.path.join(hydra.utils.get_original_cwd(), 'src', 'data_check'),
+                'main',
+                parameters={
+                    'csv': 'clean_sample.csv:latest',
+                    'ref': 'clean_sample.csv:reference',
+                    'kl_threshold': config['data_check']['kl_threshold'],
+                    'min_price': config['etl']['min_price'],
+                    'max_price': config['etl']['max_price']
+                },
+            )
+
+        if 'data_split' in active_steps:
             ##################
             # Implement here #
             ##################
-            pass
+            _ = mlflow.run(
+                os.path.join(hydra.utils.get_original_cwd(), 'components', 'train_val_test_split'),
+                'main',
+                parameters={
+                    'input': 'clean_sample.csv:latest',
+                    'test_size': config['modeling']['test_size'],
+                    'random_seed': config['modeling']['random_seed'],
+                    'stratify_by': config['modeling']['stratify_by']
+                },
+            )

         if "train_random_forest" in active_steps:

@@ -80,17 +110,34 @@ def go(config: DictConfig):
             ##################
             # Implement here #
             ##################
-
-            pass
-
-        if "test_regression_model" in active_steps:
+            _ = mlflow.run(
+                os.path.join(hydra.utils.get_original_cwd(), 'src', 'train_random_forest'),
+                'main',
+                parameters={
+                    'trainval_artifact': 'trainval_data.csv:latest',
+                    'val_size': config['modeling']['val_size'],
+                    'random_seed': config['modeling']['random_seed'],
+                    'stratify_by': config['modeling']['stratify_by'],
+                    'rf_config': rf_config,
+                    'max_tfidf_features': config['modeling']['max_tfidf_features'],
+                    'output_artifact': 'random_forest_export'
+                },
+            )
+
+        if 'test_regression_model' in active_steps:

             ##################
             # Implement here #
             ##################
-
-            pass
+            _ = mlflow.run(
+                os.path.join(hydra.utils.get_original_cwd(), 'components', 'test_regression_model'),
+                'main',
+                parameters={
+                    'mlflow_model': 'random_forest_export:prod',
+                    'test_dataset': 'test_data.csv:latest'
+                },
+            )


-if __name__ == "__main__":
+if __name__ == '__main__':
     go()
diff --git a/src/data_check/conda.yml b/src/data_check/conda.yml
index 4321977d1..11c218904 100644
--- a/src/data_check/conda.yml
+++ b/src/data_check/conda.yml
@@ -5,8 +5,8 @@ channels:
 dependencies:
   - python=3.10.0
   - pandas=2.1.3
-  - pytest=6.2.2
-  - scipy=1.5.2
+  - pytest=6.2.5
+  - scipy=1.7.3
   - pip=23.3.1
   - pip:
       - mlflow==2.8.1
diff --git a/src/data_check/test_data.py b/src/data_check/test_data.py
index 6ed3ec6fb..ed823e5d4 100644
--- a/src/data_check/test_data.py
+++ b/src/data_check/test_data.py
@@ -63,3 +63,8 @@ def test_similar_neigh_distrib(data: pd.DataFrame, ref_data: pd.DataFrame, kl_th
 ########################################################
 # Implement here test_row_count and test_price_range   #
 ########################################################
+def test_row_count(data):
+    assert 15000 < data.shape[0] < 1000000
+
+def test_price_range(data, min_price, max_price):
+    assert data['price'].between(min_price, max_price).all()
diff --git a/src/train_random_forest/run.py b/src/train_random_forest/run.py
index d8f37d41b..57368c9f6 100644
--- a/src/train_random_forest/run.py
+++ b/src/train_random_forest/run.py
@@ -1,7 +1,7 @@
 #!/usr/bin/env python
-"""
+'''
 This script trains a Random Forest
-"""
+'''
 import argparse
 import logging
 import os
@@ -26,21 +26,21 @@ def delta_date_feature(dates):
-    """
+    '''
     Given a 2d array containing dates (in any format recognized by pd.to_datetime), it returns the delta in days
     between each date and the most recent date in its column
-    """
+    '''
     date_sanitized = pd.DataFrame(dates).apply(pd.to_datetime)
     return date_sanitized.apply(lambda d: (d.max() -d).dt.days, axis=0).to_numpy()


-logging.basicConfig(level=logging.INFO, format="%(asctime)-15s %(message)s")
+logging.basicConfig(level=logging.INFO, format='%(asctime)-15s %(message)s')
 logger = logging.getLogger()


 def go(args):

-    run = wandb.init(job_type="train_random_forest")
+    run = wandb.init(job_type='train_random_forest')
     run.config.update(args)

     # Get the Random Forest configuration and update W&B
@@ -54,89 +54,96 @@ def go(args):
     ######################################
     # Use run.use_artifact(...).file() to get the train and validation artifact (args.trainval_artifact)
     # and save the returned path in train_local_pat
-    trainval_local_path = # YOUR CODE HERE
+    trainval_local_path = run.use_artifact(args.trainval_artifact).file()
     ######################################

     X = pd.read_csv(trainval_local_path)
-    y = X.pop("price")  # this removes the column "price" from X and puts it into y
+    y = X.pop('price')  # this removes the column 'price' from X and puts it into y

-    logger.info(f"Minimum price: {y.min()}, Maximum price: {y.max()}")
+    logger.info(f'Minimum price: {y.min()}, Maximum price: {y.max()}')

     X_train, X_val, y_train, y_val = train_test_split(
         X, y, test_size=args.val_size, stratify=X[args.stratify_by], random_state=args.random_seed
     )

-    logger.info("Preparing sklearn pipeline")
+    logger.info('Preparing sklearn pipeline')

     sk_pipe, processed_features = get_inference_pipeline(rf_config, args.max_tfidf_features)

     # Then fit it to the X_train, y_train data
-    logger.info("Fitting")
+    logger.info('Fitting')

     ######################################
     # Fit the pipeline sk_pipe by calling the .fit method on X_train and y_train
     # YOUR CODE HERE
+    sk_pipe.fit(X_train, y_train)
     ######################################

     # Compute r2 and MAE
-    logger.info("Scoring")
+    logger.info('Scoring')
     r_squared = sk_pipe.score(X_val, y_val)

     y_pred = sk_pipe.predict(X_val)
     mae = mean_absolute_error(y_val, y_pred)

-    logger.info(f"Score: {r_squared}")
-    logger.info(f"MAE: {mae}")
+    logger.info(f'Score: {r_squared}')
+    logger.info(f'MAE: {mae}')

-    logger.info("Exporting model")
+    logger.info('Exporting model')

     # Save model package in the MLFlow sklearn format
-    if os.path.exists("random_forest_dir"):
-        shutil.rmtree("random_forest_dir")
+    if os.path.exists('random_forest_dir'):
+        shutil.rmtree('random_forest_dir')

     ######################################
-    # Save the sk_pipe pipeline as a mlflow.sklearn model in the directory "random_forest_dir"
+    # Save the sk_pipe pipeline as a mlflow.sklearn model in the directory 'random_forest_dir'
     # HINT: use mlflow.sklearn.save_model
     # YOUR CODE HERE
+    mlflow.sklearn.save_model(sk_pipe, 'random_forest_dir')
     ######################################

     ######################################
     # Upload the model we just exported to W&B
-    # HINT: use wandb.Artifact to create an artifact. Use args.output_artifact as artifact name, "model_export" as
+    # HINT: use wandb.Artifact to create an artifact. Use args.output_artifact as artifact name, 'model_export' as
     # type, provide a description and add rf_config as metadata. Then, use the .add_dir method of the artifact instance
-    # you just created to add the "random_forest_dir" directory to the artifact, and finally use
+    # you just created to add the 'random_forest_dir' directory to the artifact, and finally use
     # run.log_artifact to log the artifact to the run
     # YOUR CODE HERE
+    artifact = wandb.Artifact(args.output_artifact, type='model_export', description='Random Forest pipeline export', metadata=rf_config)
+    artifact.add_dir('random_forest_dir')
+    run.log_artifact(artifact)
+    artifact.wait()
     ######################################

     # Plot feature importance
     fig_feat_imp = plot_feature_importance(sk_pipe, processed_features)

     ######################################
-    # Here we save r_squared under the "r2" key
+    # Here we save r_squared under the 'r2' key
     run.summary['r2'] = r_squared

-    # Now log the variable "mae" under the key "mae".
+    # Now log the variable 'mae' under the key 'mae'.
     # YOUR CODE HERE
+    run.log({'mae': mae})
     ######################################

     # Upload to W&B the feture importance visualization
     run.log(
         {
-            "feature_importance": wandb.Image(fig_feat_imp),
+            'feature_importance': wandb.Image(fig_feat_imp),
         }
     )


 def plot_feature_importance(pipe, feat_names):

     # We collect the feature importance for all non-nlp features first
-    feat_imp = pipe["random_forest"].feature_importances_[: len(feat_names)-1]
+    feat_imp = pipe['random_forest'].feature_importances_[: len(feat_names)-1]

     # For the NLP feature we sum across all the TF-IDF dimensions into a global
     # NLP importance
-    nlp_importance = sum(pipe["random_forest"].feature_importances_[len(feat_names) - 1:])
+    nlp_importance = sum(pipe['random_forest'].feature_importances_[len(feat_names) - 1:])

     feat_imp = np.append(feat_imp, nlp_importance)

     fig_feat_imp, sub_feat_imp = plt.subplots(figsize=(10, 10))
     # idx = np.argsort(feat_imp)[::-1]
-    sub_feat_imp.bar(range(feat_imp.shape[0]), feat_imp, color="r", align="center")
+    sub_feat_imp.bar(range(feat_imp.shape[0]), feat_imp, color='r', align='center')
     _ = sub_feat_imp.set_xticks(range(feat_imp.shape[0]))
     _ = sub_feat_imp.set_xticklabels(np.array(feat_names), rotation=90)
     fig_feat_imp.tight_layout()
@@ -147,8 +154,8 @@ def get_inference_pipeline(rf_config, max_tfidf_features):
     # Let's handle the categorical features first
     # Ordinal categorical are categorical values for which the order is meaningful, for example
     # for room type: 'Entire home/apt' > 'Private room' > 'Shared room'
-    ordinal_categorical = ["room_type"]
-    non_ordinal_categorical = ["neighbourhood_group"]
+    ordinal_categorical = ['room_type']
+    non_ordinal_categorical = ['neighbourhood_group']
     # NOTE: we do not need to impute room_type because the type of the room
     # is mandatory on the websites, so missing values are not possible in production
     # (nor during training). That is not true for neighbourhood_group
@@ -156,23 +163,23 @@

     ######################################
     # Build a pipeline with two steps:
-    # 1 - A SimpleImputer(strategy="most_frequent") to impute missing values
+    # 1 - A SimpleImputer(strategy='most_frequent') to impute missing values
     # 2 - A OneHotEncoder() step to encode the variable
-    non_ordinal_categorical_preproc = # YOUR CODE HERE
+    non_ordinal_categorical_preproc = make_pipeline(SimpleImputer(strategy='most_frequent'), OneHotEncoder())
     ######################################

     # Let's impute the numerical columns to make sure we can handle missing values
     # (note that we do not scale because the RF algorithm does not need that)
     zero_imputed = [
-        "minimum_nights",
-        "number_of_reviews",
-        "reviews_per_month",
-        "calculated_host_listings_count",
-        "availability_365",
-        "longitude",
-        "latitude"
+        'minimum_nights',
+        'number_of_reviews',
+        'reviews_per_month',
+        'calculated_host_listings_count',
+        'availability_365',
+        'longitude',
+        'latitude'
     ]
-    zero_imputer = SimpleImputer(strategy="constant", fill_value=0)
+    zero_imputer = SimpleImputer(strategy='constant', fill_value=0)

     # A MINIMAL FEATURE ENGINEERING step:
     # we create a feature that represents the number of days passed since the last review
@@ -183,10 +190,10 @@ def get_inference_pipeline(rf_config, max_tfidf_features):
         FunctionTransformer(delta_date_feature, check_inverse=False, validate=False)
     )

-    # Some minimal NLP for the "name" column
-    reshape_to_1d = FunctionTransformer(np.reshape, kw_args={"newshape": -1})
+    # Some minimal NLP for the 'name' column
+    reshape_to_1d = FunctionTransformer(np.reshape, kw_args={'newshape': -1})
     name_tfidf = make_pipeline(
-        SimpleImputer(strategy="constant", fill_value=""),
+        SimpleImputer(strategy='constant', fill_value=''),
         reshape_to_1d,
         TfidfVectorizer(
             binary=False,
@@ -198,80 +205,80 @@ def get_inference_pipeline(rf_config, max_tfidf_features):

     # Let's put everything together
     preprocessor = ColumnTransformer(
         transformers=[
-            ("ordinal_cat", ordinal_categorical_preproc, ordinal_categorical),
-            ("non_ordinal_cat", non_ordinal_categorical_preproc, non_ordinal_categorical),
-            ("impute_zero", zero_imputer, zero_imputed),
-            ("transform_date", date_imputer, ["last_review"]),
-            ("transform_name", name_tfidf, ["name"])
+            ('ordinal_cat', ordinal_categorical_preproc, ordinal_categorical),
+            ('non_ordinal_cat', non_ordinal_categorical_preproc, non_ordinal_categorical),
+            ('impute_zero', zero_imputer, zero_imputed),
+            ('transform_date', date_imputer, ['last_review']),
+            ('transform_name', name_tfidf, ['name'])
         ],
-        remainder="drop",  # This drops the columns that we do not transform
+        remainder='drop',  # This drops the columns that we do not transform
     )

-    processed_features = ordinal_categorical + non_ordinal_categorical + zero_imputed + ["last_review", "name"]
+    processed_features = ordinal_categorical + non_ordinal_categorical + zero_imputed + ['last_review', 'name']

     # Create random forest
     random_Forest = RandomForestRegressor(**rf_config)

     ######################################
-    # Create the inference pipeline. The pipeline must have 2 steps: a step called "preprocessor" applying the
-    # ColumnTransformer instance that we saved in the `preprocessor` variable, and a step called "random_forest"
+    # Create the inference pipeline. The pipeline must have 2 steps: a step called 'preprocessor' applying the
+    # ColumnTransformer instance that we saved in the `preprocessor` variable, and a step called 'random_forest'
     # with the random forest instance that we just saved in the `random_forest` variable.
     # HINT: Use the explicit Pipeline constructor so you can assign the names to the steps, do not use make_pipeline
-    sk_pipe = # YOUR CODE HERE
+    sk_pipe = Pipeline(steps=[('preprocessor', preprocessor), ('random_forest', random_Forest)])

     return sk_pipe, processed_features


-if __name__ == "__main__":
+if __name__ == '__main__':

-    parser = argparse.ArgumentParser(description="Basic cleaning of dataset")
+    parser = argparse.ArgumentParser(description='Train a Random Forest')

     parser.add_argument(
-        "--trainval_artifact",
+        '--trainval_artifact',
         type=str,
-        help="Artifact containing the training dataset. It will be split into train and validation"
+        help='Artifact containing the training dataset. It will be split into train and validation'
     )

     parser.add_argument(
-        "--val_size",
+        '--val_size',
         type=float,
-        help="Size of the validation split. Fraction of the dataset, or number of items",
+        help='Size of the validation split. Fraction of the dataset, or number of items',
     )

     parser.add_argument(
-        "--random_seed",
+        '--random_seed',
         type=int,
-        help="Seed for random number generator",
+        help='Seed for random number generator',
         default=42,
         required=False,
     )

     parser.add_argument(
-        "--stratify_by",
+        '--stratify_by',
         type=str,
-        help="Column to use for stratification",
+        help='Column to use for stratification',
-        default="none",
+        default='none',
         required=False,
     )

     parser.add_argument(
-        "--rf_config",
-        help="Random forest configuration. A JSON dict that will be passed to the "
-             "scikit-learn constructor for RandomForestRegressor.",
-        default="{}",
+        '--rf_config',
+        help='Random forest configuration. A JSON dict that will be passed to the '
+             'scikit-learn constructor for RandomForestRegressor.',
+        default='{}',
     )

     parser.add_argument(
-        "--max_tfidf_features",
-        help="Maximum number of words to consider for the TFIDF",
+        '--max_tfidf_features',
+        help='Maximum number of words to consider for the TFIDF',
         default=10,
         type=int
     )

     parser.add_argument(
-        "--output_artifact",
+        '--output_artifact',
         type=str,
-        help="Name for the output serialized model",
+        help='Name for the output serialized model',
         required=True,
     )
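
Note (illustrative, not part of the patch): the train_random_forest step above exports the fitted pipeline as the W&B artifact named by --output_artifact ('random_forest_export'), and main.py's test_regression_model step expects that artifact to carry the 'prod' alias. A minimal sketch of pulling the exported model back for ad-hoc predictions is shown below; the project name 'nyc_airbnb' and the input file 'new_listings.csv' are assumptions for illustration, not values taken from this diff.

    # Illustrative sketch only -- assumes the W&B project is 'nyc_airbnb' and that
    # 'random_forest_export' has already been promoted with the 'prod' alias.
    import mlflow.sklearn
    import pandas as pd
    import wandb

    run = wandb.init(project='nyc_airbnb', job_type='ad_hoc_inference')
    # Download the exported MLflow model directory from W&B
    model_dir = run.use_artifact('random_forest_export:prod').download()
    pipe = mlflow.sklearn.load_model(model_dir)

    # Hypothetical input file; it must contain the raw feature columns the pipeline expects
    new_listings = pd.read_csv('new_listings.csv')
    print(pipe.predict(new_listings))
    run.finish()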