diff --git a/notebooks/community/aisamples/AIsample - Book Recommendation.ipynb b/notebooks/community/aisamples/AIsample - Book Recommendation.ipynb
index 9c7c2bf396..de432e5ca9 100644
--- a/notebooks/community/aisamples/AIsample - Book Recommendation.ipynb
+++ b/notebooks/community/aisamples/AIsample - Book Recommendation.ipynb
@@ -137,7 +137,9 @@
     }
    },
    "source": [
-    "### Download dataset and Upload to lakehouse"
+    "### Download dataset and Upload to lakehouse\n",
+    "\n",
+    "**Please add a lakehouse to the notebook before running it.**"
    ]
   },
   {
@@ -158,36 +160,21 @@
    "source": [
     "if not IS_CUSTOM_DATA:\n",
     "    # Download demo data files into lakehouse if not exist\n",
+    "    import os, requests\n",
+    "\n",
     "    remote_url = \"https://synapseaisolutionsa.blob.core.windows.net/public/Book-Recommendation-Dataset\"\n",
     "    file_list = [\"Books.csv\", \"Ratings.csv\", \"Users.csv\"]\n",
-    "\n",
-    "    # For this demo, we first check if the dataset files are already prepared in the default lakehouse. If not, we'll download the dataset.\n",
-    "    import os\n",
-    "    import requests\n",
+    "    download_path = f\"/lakehouse/default/{DATA_FOLDER}/raw\"\n",
     "\n",
     "    if not os.path.exists(\"/lakehouse/default\"):\n",
-    "        # ask user to add a lakehouse if no default lakehouse added to the notebook.\n",
-    "        # a new notebook will not link to any lakehouse by default.\n",
-    "        raise FileNotFoundError(\n",
-    "            \"Default lakehouse not found, please add a lakehouse for the notebook.\"\n",
-    "        )\n",
-    "    else:\n",
-    "        # check if the needed files are already in the lakehouse, try to download if not.\n",
-    "        # raise an error if downloading failed.\n",
-    "        os.makedirs(f\"/lakehouse/default/{DATA_FOLDER}/raw/\", exist_ok=True)\n",
-    "        for fname in file_list:\n",
-    "            if not os.path.exists(f\"/lakehouse/default/{DATA_FOLDER}/raw/{fname}\"):\n",
-    "                try:\n",
-    "                    r = requests.get(f\"{remote_url}/{fname}\", timeout=30)\n",
-    "                    with open(\n",
-    "                        f\"/lakehouse/default/{DATA_FOLDER}/raw/{fname}\", \"wb\"\n",
-    "                    ) as f:\n",
-    "                        f.write(r.content)\n",
-    "                    print(f\"Downloaded {fname} into {DATA_FOLDER}/raw/.\")\n",
-    "                except Exception as e:\n",
-    "                    print(f\"Failed on downloading {fname}, error message: {e}\")\n",
-    "            else:\n",
-    "                print(f\"{fname} already exists in {DATA_FOLDER}/raw/.\")"
+    "        raise FileNotFoundError(\"Default lakehouse not found, please add a lakehouse.\")\n",
+    "    os.makedirs(download_path, exist_ok=True)\n",
+    "    for fname in file_list:\n",
+    "        if not os.path.exists(f\"{download_path}/{fname}\"):\n",
+    "            r = requests.get(f\"{remote_url}/{fname}\", timeout=30)\n",
+    "            with open(f\"{download_path}/{fname}\", \"wb\") as f:\n",
+    "                f.write(r.content)\n",
+    "    print(\"Downloaded demo data files into lakehouse.\")"
    ]
   },
   {
@@ -1419,7 +1406,9 @@
    "outputs": [],
    "source": [
     "# code for saving userRecs into lakehouse\n",
-    "userRecs.write.mode(\"overwrite\").parquet(f\"{DATA_FOLDER}/predictions/userRecs\")"
+    "userRecs.write.format(\"delta\").mode(\"overwrite\").save(\n",
+    "    f\"{DATA_FOLDER}/predictions/userRecs\"\n",
+    ")"
    ]
   },
   {
diff --git a/notebooks/community/aisamples/AIsample - Fraud Detection.ipynb b/notebooks/community/aisamples/AIsample - Fraud Detection.ipynb
index 39e066cd81..58432bb15e 100644
--- a/notebooks/community/aisamples/AIsample - Fraud Detection.ipynb
+++ b/notebooks/community/aisamples/AIsample - Fraud Detection.ipynb
@@ -123,7 +123,9 @@
     }
    },
    "source": [
-    "### Download dataset and Upload to lakehouse"
+    "### Download dataset and Upload to lakehouse\n",
+    "\n",
+    "**Please add a lakehouse to the notebook before running it.**"
    ]
   },
   {
@@ -144,36 +146,20 @@
    "source": [
     "if not IS_CUSTOM_DATA:\n",
     "    # Download demo data files into lakehouse if not exist\n",
-    "    remote_url = \"https://synapseaisolutionsa.blob.core.windows.net/public/Credit_Card_Fraud_Detection\"\n",
-    "    file_list = [\"creditcard.csv\"]\n",
+    "    import os, requests\n",
     "\n",
-    "    # For this demo, we first check if the dataset files are already prepared in the default lakehouse. If not, we'll download the dataset.\n",
-    "    import os\n",
-    "    import requests\n",
+    "    remote_url = \"https://synapseaisolutionsa.blob.core.windows.net/public/Credit_Card_Fraud_Detection\"\n",
+    "    fname = \"creditcard.csv\"\n",
+    "    download_path = f\"/lakehouse/default/{DATA_FOLDER}/raw\"\n",
     "\n",
     "    if not os.path.exists(\"/lakehouse/default\"):\n",
-    "        # ask user to add a lakehouse if no default lakehouse added to the notebook.\n",
-    "        # a new notebook will not link to any lakehouse by default.\n",
-    "        raise FileNotFoundError(\n",
-    "            \"Default lakehouse not found, please add a lakehouse for the notebook.\"\n",
-    "        )\n",
-    "    else:\n",
-    "        # check if the needed files are already in the lakehouse, try to download if not.\n",
-    "        # raise an error if downloading failed.\n",
-    "        os.makedirs(f\"/lakehouse/default/{DATA_FOLDER}/raw/\", exist_ok=True)\n",
-    "        for fname in file_list:\n",
-    "            if not os.path.exists(f\"/lakehouse/default/{DATA_FOLDER}/raw/{fname}\"):\n",
-    "                try:\n",
-    "                    r = requests.get(f\"{remote_url}/{fname}\", timeout=30)\n",
-    "                    with open(\n",
-    "                        f\"/lakehouse/default/{DATA_FOLDER}/raw/{fname}\", \"wb\"\n",
-    "                    ) as f:\n",
-    "                        f.write(r.content)\n",
-    "                    print(f\"Downloaded {fname} into {DATA_FOLDER}/raw/.\")\n",
-    "                except Exception as e:\n",
-    "                    print(f\"Failed on downloading {fname}, error message: {e}\")\n",
-    "            else:\n",
-    "                print(f\"{fname} already exists in {DATA_FOLDER}/raw/.\")"
+    "        raise FileNotFoundError(\"Default lakehouse not found, please add a lakehouse.\")\n",
+    "    os.makedirs(download_path, exist_ok=True)\n",
+    "    if not os.path.exists(f\"{download_path}/{fname}\"):\n",
+    "        r = requests.get(f\"{remote_url}/{fname}\", timeout=30)\n",
+    "        with open(f\"{download_path}/{fname}\", \"wb\") as f:\n",
+    "            f.write(r.content)\n",
+    "    print(\"Downloaded demo data files into lakehouse.\")"
    ]
   },
   {
@@ -972,7 +958,7 @@
    "outputs": [],
    "source": [
     "# code for saving predictions into lakehouse\n",
-    "batch_predictions.write.mode(\"overwrite\").parquet(\n",
+    "batch_predictions.write.format(\"delta\").mode(\"overwrite\").save(\n",
     "    f\"{DATA_FOLDER}/predictions/batch_predictions\"\n",
     ")"
    ]
diff --git a/notebooks/community/aisamples/AIsample - Title Genre Classification.ipynb b/notebooks/community/aisamples/AIsample - Title Genre Classification.ipynb
index 5cdcd65da9..893a75dbc1 100644
--- a/notebooks/community/aisamples/AIsample - Title Genre Classification.ipynb
+++ b/notebooks/community/aisamples/AIsample - Title Genre Classification.ipynb
@@ -99,7 +99,7 @@
    },
    "outputs": [],
    "source": [
-    "IS_CUSTOMER_DATA = False  # if True, dataset has to be uploaded manually by user\n",
+    "IS_CUSTOM_DATA = False  # if True, dataset has to be uploaded manually by user\n",
     "DATA_FOLDER = \"Files/title-genre-classification\"\n",
     "DATA_FILE = \"blbooksgenre.csv\"\n",
     "\n",
@@ -199,7 +199,9 @@
     }
    },
    "source": [
-    "### Download dataset and upload to lakehouse"
+    "### Download dataset and upload to lakehouse\n",
+    "\n",
+    "**Please add a lakehouse to the notebook before running it.**"
    ]
   },
   {
@@ -221,38 +223,22 @@
    },
    "outputs": [],
    "source": [
-    "if not IS_CUSTOMER_DATA:\n",
+    "if not IS_CUSTOM_DATA:\n",
     "    # Download demo data files into lakehouse if not exist\n",
-    "    remote_url = \"https://synapseaisolutionsa.blob.core.windows.net/public/Title_Genre_Classification\"\n",
-    "    file_list = [\"blbooksgenre.csv\"]\n",
+    "    import os, requests\n",
     "\n",
-    "    # For this demo, we first check if the dataset files are already prepared in the default lakehouse. If not, we'll download the dataset.\n",
-    "    import os\n",
-    "    import requests\n",
+    "    remote_url = \"https://synapseaisolutionsa.blob.core.windows.net/public/Title_Genre_Classification\"\n",
+    "    fname = \"blbooksgenre.csv\"\n",
+    "    download_path = f\"/lakehouse/default/{DATA_FOLDER}/raw\"\n",
     "\n",
     "    if not os.path.exists(\"/lakehouse/default\"):\n",
-    "        # ask user to add a lakehouse if no default lakehouse added to the notebook.\n",
-    "        # a new notebook will not link to any lakehouse by default.\n",
-    "        raise FileNotFoundError(\n",
-    "            \"Default lakehouse not found, please add a lakehouse for the notebook.\"\n",
-    "        )\n",
-    "    else:\n",
-    "        # check if the needed files are already in the lakehouse, try to download if not.\n",
-    "        # raise an error if downloading failed.\n",
-    "        os.makedirs(f\"/lakehouse/default/{DATA_FOLDER}/raw/\", exist_ok=True)\n",
-    "        for fname in file_list:\n",
-    "            if not os.path.exists(f\"/lakehouse/default/{DATA_FOLDER}/raw/{fname}\"):\n",
-    "                try:\n",
-    "                    r = requests.get(f\"{remote_url}/{fname}\", timeout=30)\n",
-    "                    with open(\n",
-    "                        f\"/lakehouse/default/{DATA_FOLDER}/raw/{fname}\", \"wb\"\n",
-    "                    ) as f:\n",
-    "                        f.write(r.content)\n",
-    "                    print(f\"Downloaded {fname} into {DATA_FOLDER}/raw/.\")\n",
-    "                except Exception as e:\n",
-    "                    print(f\"Failed on downloading {fname}, error message: {e}\")\n",
-    "            else:\n",
-    "                print(f\"{fname} already exists in {DATA_FOLDER}/raw/.\")"
+    "        raise FileNotFoundError(\"Default lakehouse not found, please add a lakehouse.\")\n",
+    "    os.makedirs(download_path, exist_ok=True)\n",
+    "    if not os.path.exists(f\"{download_path}/{fname}\"):\n",
+    "        r = requests.get(f\"{remote_url}/{fname}\", timeout=30)\n",
+    "        with open(f\"{download_path}/{fname}\", \"wb\") as f:\n",
+    "            f.write(r.content)\n",
+    "    print(\"Downloaded demo data files into lakehouse.\")"
    ]
   },
   {
diff --git a/notebooks/community/aisamples/AIsample - Uplift Modelling.ipynb b/notebooks/community/aisamples/AIsample - Uplift Modelling.ipynb
index 6ac2ab76b8..4f68d43fd8 100644
--- a/notebooks/community/aisamples/AIsample - Uplift Modelling.ipynb
+++ b/notebooks/community/aisamples/AIsample - Uplift Modelling.ipynb
@@ -89,14 +89,16 @@
    },
    "outputs": [],
    "source": [
-    "IS_CUSTOMER_DATA = False  # if True, dataset has to be uploaded manually by user\n",
+    "IS_CUSTOM_DATA = False  # if True, dataset has to be uploaded manually by user\n",
     "DATA_FOLDER = \"Files/uplift-modelling\"\n",
     "DATA_FILE = \"criteo-research-uplift-v2.1.csv\"\n",
     "\n",
     "# data schema\n",
     "FEATURE_COLUMNS = [f\"f{i}\" for i in range(12)]\n",
     "TREATMENT_COLUMN = \"treatment\"\n",
-    "LABEL_COLUMN = \"visit\""
+    "LABEL_COLUMN = \"visit\"\n",
+    "\n",
+    "EXPERIMENT_NAME = \"aisample-upliftmodelling\"  # mlflow experiment name"
    ]
   },
   {
@@ -139,7 +141,11 @@
     "from synapse.ml.train import ComputeModelStatistics\n",
     "\n",
     "import os\n",
-    "import gzip"
+    "import gzip\n",
+    "\n",
+    "import mlflow\n",
+    "import trident.mlflow\n",
+    "from trident.mlflow import get_sds_url"
    ]
   },
   {
@@ -152,7 +158,9 @@
     }
    },
    "source": [
-    "### Download dataset and upload to lakehouse"
+    "### Download dataset and upload to lakehouse\n",
+    "\n",
+    "**Please add a lakehouse to the notebook before running it.**"
    ]
   },
   {
@@ -201,47 +209,25 @@
    },
    "outputs": [],
    "source": [
-    "if not IS_CUSTOMER_DATA:\n",
+    "if not IS_CUSTOM_DATA:\n",
     "    # Download demo data files into lakehouse if not exist\n",
+    "    import os, requests\n",
+    "\n",
     "    remote_url = \"http://go.criteo.net/criteo-research-uplift-v2.1.csv.gz\"\n",
     "    download_file = \"criteo-research-uplift-v2.1.csv.gz\"\n",
-    "\n",
-    "    # For this demo, we first check if the dataset files are already prepared in the default lakehouse. If not, we'll download the dataset.\n",
-    "    import os\n",
-    "    import requests\n",
+    "    download_path = f\"/lakehouse/default/{DATA_FOLDER}/raw\"\n",
     "\n",
     "    if not os.path.exists(\"/lakehouse/default\"):\n",
-    "        # ask user to add a lakehouse if no default lakehouse added to the notebook.\n",
-    "        # a new notebook will not link to any lakehouse by default.\n",
-    "        raise FileNotFoundError(\n",
-    "            \"Default lakehouse not found, please add a lakehouse for the notebook.\"\n",
-    "        )\n",
-    "    else:\n",
-    "        # check if the needed files are already in the lakehouse, try to download if not.\n",
-    "        # raise an error if downloading failed.\n",
-    "        os.makedirs(f\"/lakehouse/default/{DATA_FOLDER}/raw/\", exist_ok=True)\n",
-    "\n",
-    "        if not os.path.exists(f\"/lakehouse/default/{DATA_FOLDER}/raw/{DATA_FILE}\"):\n",
-    "            try:\n",
-    "                r = requests.get(f\"{remote_url}\", timeout=30)\n",
-    "                with open(\n",
-    "                    f\"/lakehouse/default/{DATA_FOLDER}/raw/{download_file}\", \"wb\"\n",
-    "                ) as f:\n",
-    "                    f.write(r.content)\n",
-    "                print(f\"Downloaded {download_file} into {DATA_FOLDER}/raw/.\")\n",
-    "\n",
-    "                with gzip.open(\n",
-    "                    f\"/lakehouse/default/{DATA_FOLDER}/raw/{download_file}\", \"rb\"\n",
-    "                ) as fin:\n",
-    "                    with open(\n",
-    "                        f\"/lakehouse/default/{DATA_FOLDER}/raw/{DATA_FILE}\", \"wb\"\n",
-    "                    ) as fout:\n",
-    "                        fout.write(fin.read())\n",
-    "                print(f\"Unzip {download_file} into {DATA_FOLDER}/raw/{DATA_FILE}.\")\n",
-    "            except Exception as e:\n",
-    "                print(f\"Failed on downloading {DATA_FILE}, error message: {e}\")\n",
-    "        else:\n",
-    "            print(f\"{DATA_FILE} already exists in {DATA_FOLDER}/raw/.\")"
+    "        raise FileNotFoundError(\"Default lakehouse not found, please add a lakehouse.\")\n",
+    "    os.makedirs(download_path, exist_ok=True)\n",
+    "    if not os.path.exists(f\"{download_path}/{DATA_FILE}\"):\n",
+    "        r = requests.get(remote_url, timeout=30)\n",
+    "        with open(f\"{download_path}/{download_file}\", \"wb\") as f:\n",
+    "            f.write(r.content)\n",
+    "        with gzip.open(f\"{download_path}/{download_file}\", \"rb\") as fin:\n",
+    "            with open(f\"{download_path}/{DATA_FILE}\", \"wb\") as fout:\n",
+    "                fout.write(fin.read())\n",
+    "    print(\"Downloaded demo data files into lakehouse.\")"
    ]
   },
   {
@@ -261,7 +247,6 @@
    "cell_type": "code",
    "execution_count": null,
    "metadata": {
-    "collapsed": false,
     "jupyter": {
      "outputs_hidden": false,
      "source_hidden": false
@@ -578,7 +563,6 @@
    "cell_type": "code",
    "execution_count": null,
    "metadata": {
-    "collapsed": false,
     "jupyter": {
      "outputs_hidden": false,
      "source_hidden": false
@@ -815,6 +799,67 @@
     "\n",
     "print(\"Uplift score higher than {:.4f} are Persuadables\".format(cutoff_score))"
    ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "### Log and Load Model with MLflow\n",
+    "Now that we have a trained model, we can save it for later use. Here we use MLflow to log metrics and models. We can also use this API to load models for prediction."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# setup mlflow\n",
+    "mlflow.set_tracking_uri(get_sds_url())\n",
+    "mlflow.set_registry_uri(get_sds_url())\n",
+    "mlflow.set_experiment(EXPERIMENT_NAME)"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# log model, metrics and params\n",
+    "with mlflow.start_run() as run:\n",
+    "    print(\"log model:\")\n",
+    "    mlflow.spark.log_model(\n",
+    "        treatment_model,\n",
+    "        f\"{EXPERIMENT_NAME}-treatmentmodel\",\n",
+    "        registered_model_name=f\"{EXPERIMENT_NAME}-treatmentmodel\",\n",
+    "        dfs_tmpdir=\"Files/spark\",\n",
+    "    )\n",
+    "\n",
+    "    mlflow.spark.log_model(\n",
+    "        control_model,\n",
+    "        f\"{EXPERIMENT_NAME}-controlmodel\",\n",
+    "        registered_model_name=f\"{EXPERIMENT_NAME}-controlmodel\",\n",
+    "        dfs_tmpdir=\"Files/spark\",\n",
+    "    )\n",
+    "\n",
+    "    model_uri = f\"runs:/{run.info.run_id}/{EXPERIMENT_NAME}\"\n",
+    "    print(\"Model saved in run %s\" % run.info.run_id)\n",
+    "    print(f\"Model URI: {model_uri}-treatmentmodel\")\n",
+    "    print(f\"Model URI: {model_uri}-controlmodel\")"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# load model back\n",
+    "loaded_treatmentmodel = mlflow.spark.load_model(\n",
+    "    f\"{model_uri}-treatmentmodel\", dfs_tmpdir=\"Files/spark\"\n",
+    ")"
+   ]
   }
  ],
 "metadata": {
@@ -822,7 +867,7 @@
   "name": "synapse_pyspark"
  },
 "kernelspec": {
-  "display_name": "Python 3.8.8 ('base')",
+  "display_name": "Python 3 (ipykernel)",
  "language": "python",
  "name": "python3"
 },
@@ -862,10 +907,10 @@
 },
 "vscode": {
  "interpreter": {
-   "hash": "1d10ca6e668f54b54a282e8fffa4324e72130593ca9a1b635e16a1de3383a887"
+   "hash": "8cebba326b76ca708172f0a6a24a89689a3b64f83dbd9353b827f2f4b33d3f80"
  }
 }
},
 "nbformat": 4,
- "nbformat_minor": 0
+ "nbformat_minor": 1
 }
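
Note for readers adapting these samples: all four notebooks now repeat the same download-into-lakehouse pattern, and the prediction outputs are written as Delta tables instead of Parquet. Below is a minimal sketch of how the repeated download logic could be factored into a single helper. The name ensure_lakehouse_files is hypothetical, and the raise_for_status() call is an extra safeguard not present in the notebooks themselves; everything else mirrors the added code above.

    import os
    import requests


    def ensure_lakehouse_files(remote_url, file_list, data_folder):
        # Hypothetical helper consolidating the download pattern repeated across
        # the four notebooks; assumes a default lakehouse mounted at /lakehouse/default.
        if not os.path.exists("/lakehouse/default"):
            raise FileNotFoundError("Default lakehouse not found, please add a lakehouse.")
        download_path = f"/lakehouse/default/{data_folder}/raw"
        os.makedirs(download_path, exist_ok=True)
        for fname in file_list:
            target = f"{download_path}/{fname}"
            if not os.path.exists(target):
                r = requests.get(f"{remote_url}/{fname}", timeout=30)
                r.raise_for_status()  # assumption: fail loudly rather than write an HTTP error page to the lakehouse
                with open(target, "wb") as f:
                    f.write(r.content)


    # usage mirroring the Title Genre Classification notebook
    ensure_lakehouse_files(
        "https://synapseaisolutionsa.blob.core.windows.net/public/Title_Genre_Classification",
        ["blbooksgenre.csv"],
        "Files/title-genre-classification",
    )

Because the writes switched from .parquet(...) to .format("delta").mode("overwrite").save(...), any downstream consumer must switch its read format as well, e.g. spark.read.format("delta").load(f"{DATA_FOLDER}/predictions/userRecs") for the Book Recommendation output, assuming the same DATA_FOLDER the notebook defines.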