diff --git a/notebooks/community/aisamples/AIsample - Book Recommendation.ipynb b/notebooks/community/aisamples/AIsample - Book Recommendation.ipynb deleted file mode 100644 index a9efdde068..0000000000 --- a/notebooks/community/aisamples/AIsample - Book Recommendation.ipynb +++ /dev/null @@ -1,1495 +0,0 @@ -{ - "cells": [ - { - "cell_type": "markdown", - "metadata": { - "nteract": { - "transient": { - "deleting": false - } - } - }, - "source": [ - "# Creating, Evaluating, and Deploying a Recommendation System" - ] - }, - { - "cell_type": "markdown", - "metadata": { - "nteract": { - "transient": { - "deleting": false - } - } - }, - "source": [ - "## Introduction\n", - "In this notebook, we'll demonstrate data engineering and data science workflow with an e2e sample. The scenario is to build a recommender for online book recommendation.\n", - "\n", - "\n", - "There are different types of recommendation algorithms, we'll use a model based collaborative filtering algorithm named Alternating Least Squares (ALS) matrix factorization in this notebook.\n", - "\n", - "\n", - "ALS attempts to estimate the ratings matrix R as the product of two lower-rank matrices, X and Y, i.e. X * Yt = R. Typically these approximations are called ‘factor’ matrices. \n", - "The general approach is iterative. During each iteration, one of the factor matrices is held constant, while the other is solved for using least squares. The newly solved factor matrix is \n", - "then held constant while solving for the other factor matrix.\n", - "\n", - "\n" - ] - }, - { - "cell_type": "markdown", - "metadata": { - "nteract": { - "transient": { - "deleting": false - } - } - }, - "source": [ - "## Step 1: Load the Data\n", - "\n", - "```\n", - "+--- Book-Recommendation-Dataset\n", - "| +--- Books.csv\n", - "| +--- Ratings.csv\n", - "| +--- Users.csv\n", - "```\n", - "- Books.csv\n", - "\n", - "|ISBN|Book-Title|Book-Author|Year-Of-Publication|Publisher|Image-URL-S|Image-URL-M|Image-URL-L|\n", - "|---|---|---|---|---|---|---|---|\n", - "|0195153448|Classical Mythology|Mark P. O. Morford|2002|Oxford University Press|http://images.amazon.com/images/P/0195153448.01.THUMBZZZ.jpg|http://images.amazon.com/images/P/0195153448.01.MZZZZZZZ.jpg|http://images.amazon.com/images/P/0195153448.01.LZZZZZZZ.jpg|\n", - "|0002005018|Clara Callan|Richard Bruce Wright|2001|HarperFlamingo Canada|http://images.amazon.com/images/P/0002005018.01.THUMBZZZ.jpg|http://images.amazon.com/images/P/0002005018.01.MZZZZZZZ.jpg|http://images.amazon.com/images/P/0002005018.01.LZZZZZZZ.jpg|\n", - "\n", - "- Ratings.csv\n", - "\n", - "|User-ID|ISBN|Book-Rating|\n", - "|---|---|---|\n", - "|276725|034545104X|0|\n", - "|276726|0155061224|5|\n", - "\n", - "- Users.csv\n", - "\n", - "|User-ID|Location|Age|\n", - "|---|---|---|\n", - "|1|\"nyc| new york| usa\"||\n", - "|2|\"stockton| california| usa\"|18.0|" - ] - }, - { - "cell_type": "markdown", - "metadata": { - "nteract": { - "transient": { - "deleting": false - } - } - }, - "source": [ - "**By defining below parameters, we can apply this notebook on different datasets easily.**" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "jupyter": { - "outputs_hidden": false, - "source_hidden": false - }, - "nteract": { - "transient": { - "deleting": false - } - } - }, - "outputs": [], - "source": [ - "IS_CUSTOM_DATA = False # if True, dataset has to be uploaded manually\n", - "\n", - "USER_ID_COL = \"User-ID\" # must not be '_user_id' for this notebook to run successfully\n", - "ITEM_ID_COL = \"ISBN\" # must not be '_item_id' for this notebook to run successfully\n", - "ITEM_INFO_COL = (\n", - " \"Book-Title\" # must not be '_item_info' for this notebook to run successfully\n", - ")\n", - "RATING_COL = (\n", - " \"Book-Rating\" # must not be '_rating' for this notebook to run successfully\n", - ")\n", - "IS_SAMPLE = True # if True, use only rows of data for training, otherwise use all data\n", - "SAMPLE_ROWS = 5000 # if IS_SAMPLE is True, use only this number of rows for training\n", - "\n", - "DATA_FOLDER = \"Files/book-recommendation/\" # folder containing the dataset\n", - "ITEMS_FILE = \"Books.csv\" # file containing the items information\n", - "USERS_FILE = \"Users.csv\" # file containing the users information\n", - "RATINGS_FILE = \"Ratings.csv\" # file containing the ratings information\n", - "\n", - "EXPERIMENT_NAME = \"aisample-recommendation\" # mlflow experiment name" - ] - }, - { - "cell_type": "markdown", - "metadata": { - "nteract": { - "transient": { - "deleting": false - } - } - }, - "source": [ - "### Download dataset and Upload to lakehouse\n", - "\n", - "**Please add a lakehouse to the notebook before running it.**" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "jupyter": { - "outputs_hidden": false, - "source_hidden": false - }, - "nteract": { - "transient": { - "deleting": false - } - } - }, - "outputs": [], - "source": [ - "if not IS_CUSTOM_DATA:\n", - " # Download demo data files into lakehouse if not exist\n", - " import os, requests\n", - "\n", - " remote_url = \"https://synapseaisolutionsa.blob.core.windows.net/public/Book-Recommendation-Dataset\"\n", - " file_list = [\"Books.csv\", \"Ratings.csv\", \"Users.csv\"]\n", - " download_path = f\"/lakehouse/default/{DATA_FOLDER}/raw\"\n", - "\n", - " if not os.path.exists(\"/lakehouse/default\"):\n", - " raise FileNotFoundError(\n", - " \"Default lakehouse not found, please add a lakehouse and restart the session.\"\n", - " )\n", - " os.makedirs(download_path, exist_ok=True)\n", - " for fname in file_list:\n", - " if not os.path.exists(f\"{download_path}/{fname}\"):\n", - " r = requests.get(f\"{remote_url}/{fname}\", timeout=30)\n", - " with open(f\"{download_path}/{fname}\", \"wb\") as f:\n", - " f.write(r.content)\n", - " print(\"Downloaded demo data files into lakehouse.\")" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "jupyter": { - "outputs_hidden": false, - "source_hidden": false - }, - "nteract": { - "transient": { - "deleting": false - } - } - }, - "outputs": [], - "source": [ - "# to record the notebook running time\n", - "import time\n", - "\n", - "ts = time.time()" - ] - }, - { - "cell_type": "markdown", - "metadata": { - "nteract": { - "transient": { - "deleting": false - } - } - }, - "source": [ - "### Read data from lakehouse" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "advisor": { - "adviceMetadata": "{\"artifactId\":\"d0fa4e54-1ffe-4239-9441-a0432a4693ed\",\"activityId\":\"4c706a11-e7cd-47d0-9419-6e506cf48447\",\"applicationId\":\"application_1660013541887_0001\",\"jobGroupId\":\"8\",\"advices\":{\"error\":1}}" - }, - "jupyter": { - "outputs_hidden": false, - "source_hidden": false - }, - "nteract": { - "transient": { - "deleting": false - } - } - }, - "outputs": [], - "source": [ - "df_items = (\n", - " spark.read.option(\"header\", True)\n", - " .option(\"inferSchema\", True)\n", - " .csv(f\"{DATA_FOLDER}/raw/{ITEMS_FILE}\")\n", - " .cache()\n", - ")\n", - "\n", - "df_ratings = (\n", - " spark.read.option(\"header\", True)\n", - " .option(\"inferSchema\", True)\n", - " .csv(f\"{DATA_FOLDER}/raw/{RATINGS_FILE}\")\n", - " .cache()\n", - ")\n", - "\n", - "df_users = (\n", - " spark.read.option(\"header\", True)\n", - " .option(\"inferSchema\", True)\n", - " .csv(f\"{DATA_FOLDER}/raw/{USERS_FILE}\")\n", - " .cache()\n", - ")" - ] - }, - { - "cell_type": "markdown", - "metadata": { - "nteract": { - "transient": { - "deleting": false - } - } - }, - "source": [ - "## Step 2. Exploratory Data Analysis\n", - "\n", - "### Display Raw Data\n", - "\n", - "We can explore the raw data with `display`, do some basic statistics or even show chart views." - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "jupyter": { - "outputs_hidden": false, - "source_hidden": false - }, - "nteract": { - "transient": { - "deleting": false - } - } - }, - "outputs": [], - "source": [ - "import pyspark.sql.functions as F\n", - "from pyspark.ml.feature import StringIndexer" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "jupyter": { - "outputs_hidden": false, - "source_hidden": false - }, - "nteract": { - "transient": { - "deleting": false - } - } - }, - "outputs": [], - "source": [ - "display(df_items, summary=True)" - ] - }, - { - "cell_type": "markdown", - "metadata": { - "nteract": { - "transient": { - "deleting": false - } - } - }, - "source": [ - "Add `_item_id` column for later usage. `_item_id` must be integer for recommendation models. Here we leverage `StringIndexer` to transform `ITEM_ID_COL` to indices." - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "jupyter": { - "outputs_hidden": false, - "source_hidden": false - }, - "nteract": { - "transient": { - "deleting": false - } - } - }, - "outputs": [], - "source": [ - "df_items = (\n", - " StringIndexer(inputCol=ITEM_ID_COL, outputCol=\"_item_id\")\n", - " .setHandleInvalid(\"skip\")\n", - " .fit(df_items)\n", - " .transform(df_items)\n", - " .withColumn(\"_item_id\", F.col(\"_item_id\").cast(\"int\"))\n", - ")" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "Display and check if the `_item_id` increases monotonically and successively as we expected." - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "jupyter": { - "outputs_hidden": false, - "source_hidden": false - }, - "nteract": { - "transient": { - "deleting": false - } - } - }, - "outputs": [], - "source": [ - "display(df_items.sort(F.col(\"_item_id\").desc()))" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "jupyter": { - "outputs_hidden": false, - "source_hidden": false - }, - "nteract": { - "transient": { - "deleting": false - } - } - }, - "outputs": [], - "source": [ - "display(df_users, summary=True)" - ] - }, - { - "cell_type": "markdown", - "metadata": { - "nteract": { - "transient": { - "deleting": false - } - } - }, - "source": [ - "There is a missing value in `User-ID`, we'll drop the row with missing value. It doesn't matter if customized dataset doesn't have missing value." - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "jupyter": { - "outputs_hidden": false, - "source_hidden": false - }, - "nteract": { - "transient": { - "deleting": false - } - } - }, - "outputs": [], - "source": [ - "df_users = df_users.dropna(subset=(USER_ID_COL))" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "jupyter": { - "outputs_hidden": false, - "source_hidden": false - }, - "nteract": { - "transient": { - "deleting": false - } - } - }, - "outputs": [], - "source": [ - "display(df_users, summary=True)" - ] - }, - { - "cell_type": "markdown", - "metadata": { - "nteract": { - "transient": { - "deleting": false - } - } - }, - "source": [ - "Add `_user_id` column for later usage. `_user_id` must be integer for recommendation models. Here we leverage `StringIndexer` to transform `USER_ID_COL` to indices.\n", - "\n", - "In this book dataset, we already have `User-ID` column which is integer. But we still add `_user_id` column \n", - "for compatibility to different datasets, making this notebook more robust." - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "jupyter": { - "outputs_hidden": false, - "source_hidden": false - }, - "nteract": { - "transient": { - "deleting": false - } - } - }, - "outputs": [], - "source": [ - "df_users = (\n", - " StringIndexer(inputCol=USER_ID_COL, outputCol=\"_user_id\")\n", - " .setHandleInvalid(\"skip\")\n", - " .fit(df_users)\n", - " .transform(df_users)\n", - " .withColumn(\"_user_id\", F.col(\"_user_id\").cast(\"int\"))\n", - ")" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "jupyter": { - "outputs_hidden": false, - "source_hidden": false - }, - "nteract": { - "transient": { - "deleting": false - } - } - }, - "outputs": [], - "source": [ - "display(df_users.sort(F.col(\"_user_id\").desc()))" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "jupyter": { - "outputs_hidden": false, - "source_hidden": false - }, - "nteract": { - "transient": { - "deleting": false - } - } - }, - "outputs": [], - "source": [ - "display(df_ratings, summary=True)" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "Get the distinct ratings and save them to a list `ratings` for later use." - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "jupyter": { - "outputs_hidden": false, - "source_hidden": false - }, - "nteract": { - "transient": { - "deleting": false - } - } - }, - "outputs": [], - "source": [ - "ratings = [i[0] for i in df_ratings.select(RATING_COL).distinct().collect()]\n", - "print(ratings)" - ] - }, - { - "cell_type": "markdown", - "metadata": { - "nteract": { - "transient": { - "deleting": false - } - } - }, - "source": [ - "### Merge data\n", - "Merge raw dataframes into one dataframe for more comprehensive analysis." - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "df_all = df_ratings.join(df_users, USER_ID_COL, \"inner\").join(\n", - " df_items, ITEM_ID_COL, \"inner\"\n", - ")\n", - "df_all_columns = [\n", - " c for c in df_all.columns if c not in [\"_user_id\", \"_item_id\", RATING_COL]\n", - "]\n", - "\n", - "# with this step, we can reorder the columns to make sure _user_id, _item_id and RATING_COL are the first three columns\n", - "df_all = (\n", - " df_all.select([\"_user_id\", \"_item_id\", RATING_COL] + df_all_columns)\n", - " .withColumn(\"id\", F.monotonically_increasing_id())\n", - " .cache()\n", - ")\n", - "\n", - "display(df_all)" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "jupyter": { - "outputs_hidden": false, - "source_hidden": false - }, - "nteract": { - "transient": { - "deleting": false - } - } - }, - "outputs": [], - "source": [ - "print(f\"Total Users: {df_users.select('_user_id').distinct().count()}\")\n", - "print(f\"Total Items: {df_items.select('_item_id').distinct().count()}\")\n", - "print(f\"Total User-Item Interactions: {df_all.count()}\")" - ] - }, - { - "cell_type": "markdown", - "metadata": { - "nteract": { - "transient": { - "deleting": false - } - } - }, - "source": [ - "### Compute and Plot most popular items" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "jupyter": { - "outputs_hidden": false, - "source_hidden": false - }, - "nteract": { - "transient": { - "deleting": false - } - } - }, - "outputs": [], - "source": [ - "# import libs\n", - "\n", - "import pandas as pd # dataframes\n", - "import matplotlib.pyplot as plt # plotting\n", - "import seaborn as sns # plotting\n", - "\n", - "color = sns.color_palette() # adjusting plotting style" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "jupyter": { - "outputs_hidden": false, - "source_hidden": false - }, - "nteract": { - "transient": { - "deleting": false - } - } - }, - "outputs": [], - "source": [ - "# compute top popular products\n", - "df_top_items = (\n", - " df_all.groupby([\"_item_id\"])\n", - " .count()\n", - " .join(df_items, \"_item_id\", \"inner\")\n", - " .sort([\"count\"], ascending=[0])\n", - ")" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "jupyter": { - "outputs_hidden": false, - "source_hidden": false - }, - "nteract": { - "transient": { - "deleting": false - } - } - }, - "outputs": [], - "source": [ - "# find top popular items\n", - "topn = 10\n", - "pd_top_items = df_top_items.limit(topn).toPandas()\n", - "pd_top_items.head()" - ] - }, - { - "cell_type": "markdown", - "metadata": { - "nteract": { - "transient": { - "deleting": false - } - } - }, - "source": [ - "Top `` popular items, which can be used for **recommendation section \"Popular\"** or **\"Top purchased\"**." - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "jupyter": { - "outputs_hidden": false, - "source_hidden": false - }, - "nteract": { - "transient": { - "deleting": false - } - } - }, - "outputs": [], - "source": [ - "# Plot top items\n", - "f, ax = plt.subplots(figsize=(12, 10))\n", - "plt.xticks(rotation=\"vertical\")\n", - "sns.barplot(x=ITEM_INFO_COL, y=\"count\", data=pd_top_items)\n", - "plt.ylabel(\"Number of Ratings for the Item\")\n", - "plt.xlabel(\"Item Name\")\n", - "plt.show()" - ] - }, - { - "cell_type": "markdown", - "metadata": { - "nteract": { - "transient": { - "deleting": false - } - } - }, - "source": [ - "## Step 3. Model development and deploy\n", - "\n", - "So far, we have explored the dataset, added unique ids to our users and items, and plotted top items. Next, we'll train an Alternating Least Squares (ALS) recommender to give users personalized recommendations" - ] - }, - { - "cell_type": "markdown", - "metadata": { - "nteract": { - "transient": { - "deleting": false - } - } - }, - "source": [ - "### Prepare training and testing data" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "jupyter": { - "outputs_hidden": false, - "source_hidden": false - }, - "nteract": { - "transient": { - "deleting": false - } - } - }, - "outputs": [], - "source": [ - "if IS_SAMPLE:\n", - " # need to sort by '_user_id' before limit, so as to make sure ALS work normally.\n", - " # if train and test dataset have no common _user_id, ALS will fail\n", - " df_all = df_all.sort(\"_user_id\").limit(SAMPLE_ROWS)\n", - "\n", - "# cast column into the correct types\n", - "df_all = df_all.withColumn(RATING_COL, F.col(RATING_COL).cast(\"float\"))\n", - "\n", - "# By using fraction between 0 to 1, it returns the approximate number of the fraction of the dataset.\n", - "# fraction = 0.8 means 80% of the dataset.\n", - "# Note that rating = 0 means the user didn't rate the item, so we can't use it for training.\n", - "# With below steps, we'll select 80% the dataset with rating > 0 as training dataset.\n", - "fractions_train = {0: 0}\n", - "fractions_test = {0: 0}\n", - "for i in ratings:\n", - " if i == 0:\n", - " continue\n", - " fractions_train[i] = 0.8\n", - " fractions_test[i] = 1\n", - "train = df_all.sampleBy(RATING_COL, fractions=fractions_train)\n", - "\n", - "# join with leftanti means not in, thus below step will select all rows from df_all\n", - "# with rating > 0 and not in train dataset, i.e., the left 20% of the dataset as test dataset.\n", - "test = df_all.join(train, on=\"id\", how=\"leftanti\").sampleBy(\n", - " RATING_COL, fractions=fractions_test\n", - ")" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "jupyter": { - "outputs_hidden": false, - "source_hidden": false - }, - "nteract": { - "transient": { - "deleting": false - } - } - }, - "outputs": [], - "source": [ - "# compute the sparsity of the dataset\n", - "def get_mat_sparsity(ratings):\n", - " # Count the total number of ratings in the dataset\n", - " count_nonzero = ratings.select(RATING_COL).count()\n", - " print(f\"Number of rows: {count_nonzero}\")\n", - "\n", - " # Count the number of distinct user_id and distinct product_id\n", - " total_elements = (\n", - " ratings.select(\"_user_id\").distinct().count()\n", - " * ratings.select(\"_item_id\").distinct().count()\n", - " )\n", - "\n", - " # Divide the numerator by the denominator\n", - " sparsity = (1.0 - (count_nonzero * 1.0) / total_elements) * 100\n", - " print(\"The ratings dataframe is \", \"%.4f\" % sparsity + \"% sparse.\")\n", - "\n", - "\n", - "get_mat_sparsity(df_all)" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "jupyter": { - "outputs_hidden": false, - "source_hidden": false - }, - "nteract": { - "transient": { - "deleting": false - } - } - }, - "outputs": [], - "source": [ - "# check the id range\n", - "# ALS only supports values in Integer range\n", - "print(f\"max user_id: {df_all.agg({'_user_id': 'max'}).collect()[0][0]}\")\n", - "print(f\"max user_id: {df_all.agg({'_item_id': 'max'}).collect()[0][0]}\")" - ] - }, - { - "cell_type": "markdown", - "metadata": { - "nteract": { - "transient": { - "deleting": false - } - } - }, - "source": [ - "### Define the Model\n", - "\n", - "With our data in place, we can now define the recommendation model. We'll apply Alternating Least Squares (ALS) \n", - "model in this notebook. \n", - "\n", - "Spark ML provides a convenient API in building the model. However, the model is not good enough at \n", - "handling problems like data sparsity and cold start. We'll combine cross validation and auto hyperparameter tuning \n", - "to improve the performance of the model." - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "jupyter": { - "outputs_hidden": false, - "source_hidden": false - }, - "nteract": { - "transient": { - "deleting": false - } - } - }, - "outputs": [], - "source": [ - "# Specify training parameters\n", - "num_epochs = 1\n", - "rank_size_list = [64, 128]\n", - "reg_param_list = [0.01, 0.1]\n", - "model_tuning_method = \"TrainValidationSplit\" # TrainValidationSplit or CrossValidator" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "jupyter": { - "outputs_hidden": false, - "source_hidden": false - }, - "nteract": { - "transient": { - "deleting": false - } - } - }, - "outputs": [], - "source": [ - "from pyspark.ml.evaluation import RegressionEvaluator\n", - "from pyspark.ml.recommendation import ALS\n", - "from pyspark.ml.tuning import ParamGridBuilder, CrossValidator, TrainValidationSplit\n", - "\n", - "# Build the recommendation model using ALS on the training data\n", - "# Note we set cold start strategy to 'drop' to ensure we don't get NaN evaluation metrics\n", - "als = ALS(\n", - " maxIter=num_epochs,\n", - " userCol=\"_user_id\",\n", - " itemCol=\"_item_id\",\n", - " ratingCol=RATING_COL,\n", - " coldStartStrategy=\"drop\",\n", - " implicitPrefs=False,\n", - " nonnegative=True,\n", - ")" - ] - }, - { - "cell_type": "markdown", - "metadata": { - "nteract": { - "transient": { - "deleting": false - } - } - }, - "source": [ - "### Model training and hyper-tunning" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "jupyter": { - "outputs_hidden": false, - "source_hidden": false - }, - "nteract": { - "transient": { - "deleting": false - } - } - }, - "outputs": [], - "source": [ - "# Define tuning parameters\n", - "param_grid = (\n", - " ParamGridBuilder()\n", - " .addGrid(als.rank, rank_size_list)\n", - " .addGrid(als.regParam, reg_param_list)\n", - " .build()\n", - ")\n", - "\n", - "print(\"Number of models to be tested: \", len(param_grid))" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "jupyter": { - "outputs_hidden": false, - "source_hidden": false - }, - "nteract": { - "transient": { - "deleting": false - } - } - }, - "outputs": [], - "source": [ - "# Define evaluator, set rmse as loss\n", - "evaluator = RegressionEvaluator(\n", - " metricName=\"rmse\", labelCol=RATING_COL, predictionCol=\"prediction\"\n", - ")" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "jupyter": { - "outputs_hidden": false, - "source_hidden": false - }, - "nteract": { - "transient": { - "deleting": false - } - } - }, - "outputs": [], - "source": [ - "# Build cross validation using CrossValidator and TrainValidationSplit\n", - "if model_tuning_method == \"CrossValidator\":\n", - " tuner = CrossValidator(\n", - " estimator=als, estimatorParamMaps=param_grid, evaluator=evaluator, numFolds=5\n", - " )\n", - "elif model_tuning_method == \"TrainValidationSplit\":\n", - " tuner = TrainValidationSplit(\n", - " estimator=als,\n", - " estimatorParamMaps=param_grid,\n", - " evaluator=evaluator,\n", - " # 80% of the data will be used for training, 20% for validation.\n", - " trainRatio=0.8,\n", - " )\n", - "else:\n", - " raise ValueError(f\"Unknown model_tuning_method: {model_tuning_method}\")" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "jupyter": { - "outputs_hidden": false, - "source_hidden": false - }, - "nteract": { - "transient": { - "deleting": false - } - } - }, - "outputs": [], - "source": [ - "import numpy as np\n", - "import pandas as pd\n", - "\n", - "# Train and Extract best model\n", - "models = tuner.fit(train)\n", - "model = models.bestModel\n", - "\n", - "if model_tuning_method == \"CrossValidator\":\n", - " metrics = models.avgMetrics\n", - "elif model_tuning_method == \"TrainValidationSplit\":\n", - " metrics = models.validationMetrics\n", - "else:\n", - " raise ValueError(f\"Unknown model_tuning_method: {model_tuning_method}\")\n", - "\n", - "param_maps = models.getEstimatorParamMaps()\n", - "best_params = param_maps[np.argmin(metrics)]\n", - "pd_metrics = pd.DataFrame(data={\"Metric\": metrics})\n", - "\n", - "print(\"** Best Model **\")\n", - "for k in best_params:\n", - " print(f\"{k.name}: {best_params[k]}\")\n", - "\n", - "# collect metrics\n", - "param_strings = []\n", - "for param_map in param_maps:\n", - " # use split to remove the prefix 'ALS__' in param name\n", - " param_strings.append(\n", - " \" \".join(f\"{str(k).split('_')[-1]}={v}\" for (k, v) in param_map.items())\n", - " )\n", - "pd_metrics[\"Params\"] = param_strings" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "jupyter": { - "outputs_hidden": false, - "source_hidden": false - }, - "nteract": { - "transient": { - "deleting": false - } - } - }, - "outputs": [], - "source": [ - "# Plot metrics of different submodels\n", - "f, ax = plt.subplots(figsize=(12, 5))\n", - "sns.lineplot(x=pd_metrics[\"Params\"], y=pd_metrics[\"Metric\"])\n", - "plt.ylabel(\"Loss: RMSE\")\n", - "plt.xlabel(\"Params\")\n", - "plt.title(\"Loss of SubModels\")\n", - "plt.show()" - ] - }, - { - "cell_type": "markdown", - "metadata": { - "nteract": { - "transient": { - "deleting": false - } - } - }, - "source": [ - "### Model Evaluation\n", - "\n", - "We now have the best model, then we can do more evaluations on the test data. \n", - "If we trained the model well, it should have high metrics on both train and test datasets.\n", - "If we see only good metrics on train, then the model is overfitted, we may need to increase training data size.\n", - "If we see bad metrics on both datasets, then the model is not defined well, \n", - "we may need to change model architecture or at least fine tune hyper parameters." - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "def evaluate(model, data):\n", - " \"\"\"\n", - " Evaluate the model by computing rmse, mae, r2 and var over the data.\n", - " \"\"\"\n", - "\n", - " predictions = model.transform(data).withColumn(\n", - " \"prediction\", F.col(\"prediction\").cast(\"double\")\n", - " )\n", - "\n", - " # show 10 predictions\n", - " predictions.select(\"_user_id\", \"_item_id\", RATING_COL, \"prediction\").limit(\n", - " 10\n", - " ).show()\n", - "\n", - " # initialize the regression evaluator\n", - " evaluator = RegressionEvaluator(predictionCol=\"prediction\", labelCol=RATING_COL)\n", - "\n", - " _evaluator = lambda metric: evaluator.setMetricName(metric).evaluate(predictions)\n", - " rmse = _evaluator(\"rmse\")\n", - " mae = _evaluator(\"mae\")\n", - " r2 = _evaluator(\"r2\")\n", - " var = _evaluator(\"var\")\n", - "\n", - " print(f\"RMSE score = {rmse}\")\n", - " print(f\"MAE score = {mae}\")\n", - " print(f\"R2 score = {r2}\")\n", - " print(f\"Explained variance = {var}\")\n", - "\n", - " return predictions, (rmse, mae, r2, var)" - ] - }, - { - "cell_type": "markdown", - "metadata": { - "nteract": { - "transient": { - "deleting": false - } - } - }, - "source": [ - "Evaluation on training data" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "_ = evaluate(model, train)" - ] - }, - { - "cell_type": "markdown", - "metadata": { - "nteract": { - "transient": { - "deleting": false - } - } - }, - "source": [ - "Evaluation on test data.\n", - "\n", - "If R2 is negative, it means the trained model is actually worse than a horizontal straight line." - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "jupyter": { - "outputs_hidden": false, - "source_hidden": false - }, - "nteract": { - "transient": { - "deleting": false - } - } - }, - "outputs": [], - "source": [ - "_, (rmse, mae, r2, var) = evaluate(model, test)" - ] - }, - { - "cell_type": "markdown", - "metadata": { - "nteract": { - "transient": { - "deleting": false - } - } - }, - "source": [ - "### Log and Load Model with MLFlow\n", - "Now we get a pretty good model, we can save it for later use. Here we use mlflow to log metrics/models, and load models back for prediction." - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "jupyter": { - "outputs_hidden": false, - "source_hidden": false - }, - "nteract": { - "transient": { - "deleting": false - } - } - }, - "outputs": [], - "source": [ - "# setup mlflow\n", - "import mlflow\n", - "\n", - "mlflow.set_experiment(EXPERIMENT_NAME)" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "# log model, metrics and params\n", - "with mlflow.start_run() as run:\n", - " print(\"log model:\")\n", - " mlflow.spark.log_model(\n", - " model,\n", - " f\"{EXPERIMENT_NAME}-alsmodel\",\n", - " registered_model_name=f\"{EXPERIMENT_NAME}-alsmodel\",\n", - " dfs_tmpdir=\"Files/spark\",\n", - " )\n", - "\n", - " print(\"log metrics:\")\n", - " mlflow.log_metrics({\"RMSE\": rmse, \"MAE\": mae, \"R2\": r2, \"Explained variance\": var})\n", - "\n", - " print(\"log parameters:\")\n", - " mlflow.log_params(\n", - " {\n", - " \"num_epochs\": num_epochs,\n", - " \"rank_size_list\": rank_size_list,\n", - " \"reg_param_list\": reg_param_list,\n", - " \"model_tuning_method\": model_tuning_method,\n", - " \"DATA_FOLDER\": DATA_FOLDER,\n", - " }\n", - " )\n", - "\n", - " model_uri = f\"runs:/{run.info.run_id}/{EXPERIMENT_NAME}-alsmodel\"\n", - " print(\"Model saved in run %s\" % run.info.run_id)\n", - " print(f\"Model URI: {model_uri}\")" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "# load model back\n", - "# mlflow will use PipelineModel to wrapper the original model, thus here we extract the original ALSModel from the stages.\n", - "loaded_model = mlflow.spark.load_model(model_uri, dfs_tmpdir=\"Files/spark\").stages[-1]" - ] - }, - { - "cell_type": "markdown", - "metadata": { - "nteract": { - "transient": { - "deleting": false - } - } - }, - "source": [ - "## Step 4. Save Prediction Results" - ] - }, - { - "cell_type": "markdown", - "metadata": { - "nteract": { - "transient": { - "deleting": false - } - } - }, - "source": [ - "### Model Deploy and Prediction" - ] - }, - { - "cell_type": "markdown", - "metadata": { - "nteract": { - "transient": { - "deleting": false - } - } - }, - "source": [ - "#### Offline Recommendation\n", - "Recommend 10 items for each user\n", - "\n", - "##### Save offline recommendation results" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "jupyter": { - "outputs_hidden": false, - "source_hidden": false - }, - "nteract": { - "transient": { - "deleting": false - } - } - }, - "outputs": [], - "source": [ - "# Generate top 10 product recommendations for each user\n", - "userRecs = loaded_model.recommendForAllUsers(10)" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "jupyter": { - "outputs_hidden": false, - "source_hidden": false - }, - "nteract": { - "transient": { - "deleting": false - } - } - }, - "outputs": [], - "source": [ - "# convert recommendations into interpretable format\n", - "userRecs = (\n", - " userRecs.withColumn(\"rec_exp\", F.explode(\"recommendations\"))\n", - " .select(\"_user_id\", F.col(\"rec_exp._item_id\"), F.col(\"rec_exp.rating\"))\n", - " .join(df_items.select([\"_item_id\", \"Book-Title\"]), on=\"_item_id\")\n", - ")\n", - "userRecs.limit(10).show()" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "jupyter": { - "outputs_hidden": false, - "source_hidden": false - }, - "nteract": { - "transient": { - "deleting": false - } - } - }, - "outputs": [], - "source": [ - "# code for saving userRecs into lakehouse\n", - "userRecs.write.format(\"delta\").mode(\"overwrite\").save(\n", - " f\"{DATA_FOLDER}/predictions/userRecs\"\n", - ")" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "jupyter": { - "outputs_hidden": false, - "source_hidden": false - }, - "nteract": { - "transient": { - "deleting": false - } - } - }, - "outputs": [], - "source": [ - "print(f\"Full run cost {int(time.time() - ts)} seconds.\")" - ] - } - ], - "metadata": { - "kernel_info": { - "name": "synapse_pyspark" - }, - "kernelspec": { - "display_name": "Python 3 (ipykernel)", - "language": "python", - "name": "python3" - }, - "language_info": { - "codemirror_mode": { - "name": "ipython", - "version": 3 - }, - "file_extension": ".py", - "mimetype": "text/x-python", - "name": "python", - "nbconvert_exporter": "python", - "pygments_lexer": "ipython3", - "version": "3.9.12" - }, - "notebook_environment": {}, - "save_output": true, - "spark_compute": { - "compute_id": "/trident/default", - "session_options": { - "conf": { - "spark.dynamicAllocation.enabled": "false", - "spark.dynamicAllocation.maxExecutors": "2", - "spark.dynamicAllocation.minExecutors": "2", - "spark.livy.synapse.ipythonInterpreter.enabled": "true" - }, - "driverCores": 8, - "driverMemory": "56g", - "enableDebugMode": false, - "executorCores": 8, - "executorMemory": "56g", - "keepAliveTimeout": 30, - "numExecutors": 5 - } - }, - "synapse_widget": { - "state": {}, - "version": "0.1" - }, - "trident": { - "lakehouse": { - "default_lakehouse": "", - "known_lakehouses": [ - { - "id": "" - } - ] - } - }, - "vscode": { - "interpreter": { - "hash": "8cebba326b76ca708172f0a6a24a89689a3b64f83dbd9353b827f2f4b33d3f80" - } - } - }, - "nbformat": 4, - "nbformat_minor": 1 -} diff --git a/notebooks/community/aisamples/AIsample - Fraud Detection.ipynb b/notebooks/community/aisamples/AIsample - Fraud Detection.ipynb deleted file mode 100644 index a03bb1c735..0000000000 --- a/notebooks/community/aisamples/AIsample - Fraud Detection.ipynb +++ /dev/null @@ -1,1047 +0,0 @@ -{ - "cells": [ - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "# Creating, Evaluating, and Deploying a Fraud Detection Model" - ] - }, - { - "cell_type": "markdown", - "metadata": { - "nteract": { - "transient": { - "deleting": false - } - } - }, - "source": [ - "## Introduction\n", - "\n", - "In this notebook, we'll demonstrate data engineering and data science work flow with an e2e sample. The scenario is to build a model for detecting fraud credit card transactions." - ] - }, - { - "cell_type": "markdown", - "metadata": { - "nteract": { - "transient": { - "deleting": false - } - } - }, - "source": [ - "## Step 1: Load the Data\n", - "\n", - "The dataset contains transactions made by credit cards in September 2013 by European cardholders.\n", - "This dataset presents transactions that occurred in two days, where we have 492 frauds out of 284,807 transactions. The dataset is highly unbalanced, the positive class (frauds) account for 0.172% of all transactions.\n", - "\n", - "It contains only numerical input variables which are the result of a PCA transformation. Unfortunately, due to confidentiality issues, we cannot provide the original features and more background information about the data. Features V1, V2, … V28 are the principal components obtained with PCA, the only features which have not been transformed with PCA are 'Time' and 'Amount'. Feature 'Time' contains the seconds elapsed between each transaction and the first transaction in the dataset. The feature 'Amount' is the transaction Amount, this feature can be used for example-dependent cost-sensitive learning. Feature 'Class' is the response variable, and it takes value 1 in case of fraud and 0 otherwise.\n", - "\n", - "Given the class imbalance ratio, we recommend measuring the accuracy using the Area Under the Precision-Recall Curve (AUPRC). Confusion matrix accuracy is not meaningful for unbalanced classification." - ] - }, - { - "cell_type": "markdown", - "metadata": { - "nteract": { - "transient": { - "deleting": false - } - } - }, - "source": [ - "- creditcard.csv\n", - "\n", - "|\"Time\"|\"V1\"|\"V2\"|\"V3\"|\"V4\"|\"V5\"|\"V6\"|\"V7\"|\"V8\"|\"V9\"|\"V10\"|\"V11\"|\"V12\"|\"V13\"|\"V14\"|\"V15\"|\"V16\"|\"V17\"|\"V18\"|\"V19\"|\"V20\"|\"V21\"|\"V22\"|\"V23\"|\"V24\"|\"V25\"|\"V26\"|\"V27\"|\"V28\"|\"Amount\"|\"Class\"|\n", - "|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|\n", - "|0|-1.3598071336738|-0.0727811733098497|2.53634673796914|1.37815522427443|-0.338320769942518|0.462387777762292|0.239598554061257|0.0986979012610507|0.363786969611213|0.0907941719789316|-0.551599533260813|-0.617800855762348|-0.991389847235408|-0.311169353699879|1.46817697209427|-0.470400525259478|0.207971241929242|0.0257905801985591|0.403992960255733|0.251412098239705|-0.018306777944153|0.277837575558899|-0.110473910188767|0.0669280749146731|0.128539358273528|-0.189114843888824|0.133558376740387|-0.0210530534538215|149.62|\"0\"|\n", - "|0|1.19185711131486|0.26615071205963|0.16648011335321|0.448154078460911|0.0600176492822243|-0.0823608088155687|-0.0788029833323113|0.0851016549148104|-0.255425128109186|-0.166974414004614|1.61272666105479|1.06523531137287|0.48909501589608|-0.143772296441519|0.635558093258208|0.463917041022171|-0.114804663102346|-0.183361270123994|-0.145783041325259|-0.0690831352230203|-0.225775248033138|-0.638671952771851|0.101288021253234|-0.339846475529127|0.167170404418143|0.125894532368176|-0.00898309914322813|0.0147241691924927|2.69|\"0\"|" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "### Install libraries\n", - "In this notebook, we'll use `imblearn` which first needs to be installed. The PySpark kernel will be restarted after `%pip install`, thus we need to install it before we run any other cells." - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "# install imblearn for SMOTE\n", - "%pip install imblearn" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "**By defining below parameters, we can apply this notebook on different datasets easily.**" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "jupyter": { - "outputs_hidden": false, - "source_hidden": false - }, - "nteract": { - "transient": { - "deleting": false - } - } - }, - "outputs": [], - "source": [ - "IS_CUSTOM_DATA = False # if True, dataset has to be uploaded manually\n", - "\n", - "TARGET_COL = \"Class\" # target column name\n", - "IS_SAMPLE = False # if True, use only rows of data for training, otherwise use all data\n", - "SAMPLE_ROWS = 5000 # if IS_SAMPLE is True, use only this number of rows for training\n", - "\n", - "DATA_FOLDER = \"Files/fraud-detection/\" # folder with data files\n", - "DATA_FILE = \"creditcard.csv\" # data file name\n", - "\n", - "EXPERIMENT_NAME = \"aisample-fraud\" # mlflow experiment name" - ] - }, - { - "cell_type": "markdown", - "metadata": { - "nteract": { - "transient": { - "deleting": false - } - } - }, - "source": [ - "### Download dataset and Upload to lakehouse\n", - "\n", - "**Please add a lakehouse to the notebook before running it.**" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "jupyter": { - "outputs_hidden": false, - "source_hidden": false - }, - "nteract": { - "transient": { - "deleting": false - } - } - }, - "outputs": [], - "source": [ - "if not IS_CUSTOM_DATA:\n", - " # Download demo data files into lakehouse if not exist\n", - " import os, requests\n", - "\n", - " remote_url = \"https://synapseaisolutionsa.blob.core.windows.net/public/Credit_Card_Fraud_Detection\"\n", - " fname = \"creditcard.csv\"\n", - " download_path = f\"/lakehouse/default/{DATA_FOLDER}/raw\"\n", - "\n", - " if not os.path.exists(\"/lakehouse/default\"):\n", - " raise FileNotFoundError(\n", - " \"Default lakehouse not found, please add a lakehouse and restart the session.\"\n", - " )\n", - " os.makedirs(download_path, exist_ok=True)\n", - " if not os.path.exists(f\"{download_path}/{fname}\"):\n", - " r = requests.get(f\"{remote_url}/{fname}\", timeout=30)\n", - " with open(f\"{download_path}/{fname}\", \"wb\") as f:\n", - " f.write(r.content)\n", - " print(\"Downloaded demo data files into lakehouse.\")" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "jupyter": { - "outputs_hidden": false, - "source_hidden": false - }, - "nteract": { - "transient": { - "deleting": false - } - } - }, - "outputs": [], - "source": [ - "# to record the notebook running time\n", - "import time\n", - "\n", - "ts = time.time()" - ] - }, - { - "cell_type": "markdown", - "metadata": { - "nteract": { - "transient": { - "deleting": false - } - } - }, - "source": [ - "### Read data from lakehouse" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "jupyter": { - "outputs_hidden": false, - "source_hidden": false - }, - "nteract": { - "transient": { - "deleting": false - } - } - }, - "outputs": [], - "source": [ - "df = (\n", - " spark.read.format(\"csv\")\n", - " .option(\"header\", \"true\")\n", - " .option(\"inferSchema\", True)\n", - " .load(f\"{DATA_FOLDER}/raw/{DATA_FILE}\")\n", - " .cache()\n", - ")" - ] - }, - { - "cell_type": "markdown", - "metadata": { - "nteract": { - "transient": { - "deleting": false - } - } - }, - "source": [ - "## Step 2. Exploratory Data Analysis\n", - "\n", - "### Display Raw Data\n", - "\n", - "We can explore the raw data with `display`, do some basic statistics or even show chart views." - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "display(df)" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "jupyter": { - "outputs_hidden": false, - "source_hidden": false - }, - "nteract": { - "transient": { - "deleting": false - } - } - }, - "outputs": [], - "source": [ - "# print dataset basic info\n", - "print(\"records read: \" + str(df.count()))\n", - "print(\"Schema: \")\n", - "df.printSchema()" - ] - }, - { - "cell_type": "markdown", - "metadata": { - "nteract": { - "transient": { - "deleting": false - } - } - }, - "source": [ - "### Cast columns into the correct types" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "jupyter": { - "outputs_hidden": false, - "source_hidden": false - }, - "nteract": { - "transient": { - "deleting": false - } - } - }, - "outputs": [], - "source": [ - "import pyspark.sql.functions as F\n", - "\n", - "df_columns = df.columns\n", - "df_columns.remove(TARGET_COL)\n", - "\n", - "# to make sure the TARGET_COL is the last column\n", - "df = df.select(df_columns + [TARGET_COL]).withColumn(\n", - " TARGET_COL, F.col(TARGET_COL).cast(\"int\")\n", - ")\n", - "\n", - "if IS_SAMPLE:\n", - " df = df.limit(SAMPLE_ROWS)" - ] - }, - { - "cell_type": "markdown", - "metadata": { - "nteract": { - "transient": { - "deleting": false - } - } - }, - "source": [ - "## Step 3. Model development and deploy\n", - "So far, we have explored the dataset, checked the scheme, adjusted the columns order, and casted the columns into correct types.\n", - "\n", - "Next, we'll train a lightgbm model to classify fraud transactions." - ] - }, - { - "cell_type": "markdown", - "metadata": { - "nteract": { - "transient": { - "deleting": false - } - } - }, - "source": [ - "### Prepare training and testing data" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "jupyter": { - "outputs_hidden": false, - "source_hidden": false - }, - "nteract": { - "transient": { - "deleting": false - } - } - }, - "outputs": [], - "source": [ - "# Split the dataset into train and test\n", - "train, test = df.randomSplit([0.85, 0.15], seed=42)" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "jupyter": { - "outputs_hidden": false, - "source_hidden": false - }, - "nteract": { - "transient": { - "deleting": false - } - } - }, - "outputs": [], - "source": [ - "# Merge Columns\n", - "from pyspark.ml.feature import VectorAssembler\n", - "\n", - "feature_cols = df.columns[:-1]\n", - "featurizer = VectorAssembler(inputCols=feature_cols, outputCol=\"features\")\n", - "train_data = featurizer.transform(train)[TARGET_COL, \"features\"]\n", - "test_data = featurizer.transform(test)[TARGET_COL, \"features\"]" - ] - }, - { - "cell_type": "markdown", - "metadata": { - "nteract": { - "transient": { - "deleting": false - } - } - }, - "source": [ - "### Check data volume and imbalance" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "jupyter": { - "outputs_hidden": false, - "source_hidden": false - }, - "nteract": { - "transient": { - "deleting": false - } - } - }, - "outputs": [], - "source": [ - "display(train_data.groupBy(TARGET_COL).count())" - ] - }, - { - "cell_type": "markdown", - "metadata": { - "nteract": { - "transient": { - "deleting": false - } - } - }, - "source": [ - "### Handle imbalance data\n", - "We'll apply [SMOTE](https://arxiv.org/abs/1106.1813) (Synthetic Minority Over-sampling Technique) to automatically handle imbalance data. A dataset is imbalanced if the classification categories are not approximately equally represented. Often real-world data sets are predominately composed of \"normal\" examples with only a small percentage of \"abnormal\" or \"interesting\" examples. It is also the case that the cost of misclassifying an abnormal (interesting) example as a normal example is often much higher than the cost of the reverse error. Under-sampling of the majority (normal) class has been proposed as a good means of increasing the sensitivity of a classifier to the minority class. This paper shows that a combination of our method of over-sampling the minority (abnormal) class and under-sampling the majority (normal) class can achieve better classifier performance (in ROC space) than only under-sampling the majority class. " - ] - }, - { - "cell_type": "markdown", - "metadata": { - "nteract": { - "transient": { - "deleting": false - } - } - }, - "source": [ - "#### Apply SMOTE for new train_data \n", - "imblearn only works for pandas dataframe, not pyspark dataframe." - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "jupyter": { - "outputs_hidden": false, - "source_hidden": false - }, - "nteract": { - "transient": { - "deleting": false - } - } - }, - "outputs": [], - "source": [ - "from pyspark.ml.functions import vector_to_array, array_to_vector\n", - "import numpy as np\n", - "from collections import Counter\n", - "from imblearn.over_sampling import SMOTE\n", - "\n", - "train_data_array = train_data.withColumn(\"features\", vector_to_array(\"features\"))\n", - "\n", - "train_data_pd = train_data_array.toPandas()\n", - "\n", - "X = train_data_pd[\"features\"].to_numpy()\n", - "y = train_data_pd[TARGET_COL].to_numpy()\n", - "print(\"Original dataset shape %s\" % Counter(y))\n", - "\n", - "X = np.array([np.array(x) for x in X])\n", - "\n", - "sm = SMOTE(random_state=42)\n", - "X_res, y_res = sm.fit_resample(X, y)\n", - "print(\"Resampled dataset shape %s\" % Counter(y_res))\n", - "\n", - "new_train_data = tuple(zip(X_res.tolist(), y_res.tolist()))\n", - "dataColumns = [\"features\", TARGET_COL]\n", - "new_train_data = spark.createDataFrame(data=new_train_data, schema=dataColumns)\n", - "new_train_data = new_train_data.withColumn(\"features\", array_to_vector(\"features\"))" - ] - }, - { - "cell_type": "markdown", - "metadata": { - "nteract": { - "transient": { - "deleting": false - } - } - }, - "source": [ - "### Define the Model\n", - "\n", - "With our data in place, we can now define the model. We'll apply lightgbm model in this notebook. \n", - "\n", - "We'll leverage SynapseML to implement the model within a few lines of code." - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "jupyter": { - "outputs_hidden": false, - "source_hidden": false - }, - "nteract": { - "transient": { - "deleting": false - } - } - }, - "outputs": [], - "source": [ - "from synapse.ml.lightgbm import LightGBMClassifier\n", - "\n", - "model = LightGBMClassifier(\n", - " objective=\"binary\", featuresCol=\"features\", labelCol=TARGET_COL, isUnbalance=True\n", - ")\n", - "smote_model = LightGBMClassifier(\n", - " objective=\"binary\", featuresCol=\"features\", labelCol=TARGET_COL, isUnbalance=False\n", - ")" - ] - }, - { - "cell_type": "markdown", - "metadata": { - "nteract": { - "transient": { - "deleting": false - } - } - }, - "source": [ - "### Model training" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "jupyter": { - "outputs_hidden": false, - "source_hidden": false - }, - "nteract": { - "transient": { - "deleting": false - } - } - }, - "outputs": [], - "source": [ - "model = model.fit(train_data)\n", - "smote_model = smote_model.fit(new_train_data)" - ] - }, - { - "cell_type": "markdown", - "metadata": { - "nteract": { - "transient": { - "deleting": false - } - } - }, - "source": [ - "### Model Explanation\n", - "Here we can show the importance of each column." - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "jupyter": { - "outputs_hidden": false, - "source_hidden": false - }, - "nteract": { - "transient": { - "deleting": false - } - } - }, - "outputs": [], - "source": [ - "import pandas as pd\n", - "import matplotlib.pyplot as plt\n", - "\n", - "feature_importances = model.getFeatureImportances()\n", - "fi = pd.Series(feature_importances, index=feature_cols)\n", - "fi = fi.sort_values(ascending=True)\n", - "f_index = fi.index\n", - "f_values = fi.values\n", - "\n", - "# print feature importances\n", - "print(\"f_index:\", f_index)\n", - "print(\"f_values:\", f_values)\n", - "\n", - "# plot\n", - "x_index = list(range(len(fi)))\n", - "x_index = [x / len(fi) for x in x_index]\n", - "plt.rcParams[\"figure.figsize\"] = (20, 20)\n", - "plt.barh(\n", - " x_index, f_values, height=0.028, align=\"center\", color=\"tan\", tick_label=f_index\n", - ")\n", - "plt.xlabel(\"importances\")\n", - "plt.ylabel(\"features\")\n", - "plt.show()" - ] - }, - { - "cell_type": "markdown", - "metadata": { - "nteract": { - "transient": { - "deleting": false - } - } - }, - "source": [ - "### Model Evaluation" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "jupyter": { - "outputs_hidden": false, - "source_hidden": false - }, - "nteract": { - "transient": { - "deleting": false - } - } - }, - "outputs": [], - "source": [ - "predictions = model.transform(test_data)\n", - "predictions.limit(10).toPandas()" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "jupyter": { - "outputs_hidden": false, - "source_hidden": false - }, - "nteract": { - "transient": { - "deleting": false - } - } - }, - "outputs": [], - "source": [ - "from synapse.ml.train import ComputeModelStatistics\n", - "\n", - "metrics = ComputeModelStatistics(\n", - " evaluationMetric=\"classification\", labelCol=TARGET_COL, scoredLabelsCol=\"prediction\"\n", - ").transform(predictions)\n", - "display(metrics)" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "jupyter": { - "outputs_hidden": false, - "source_hidden": false - }, - "nteract": { - "transient": { - "deleting": false - } - } - }, - "outputs": [], - "source": [ - "# collect confusion matrix value\n", - "cm = metrics.select(\"confusion_matrix\").collect()[0][0].toArray()\n", - "print(cm)" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "jupyter": { - "outputs_hidden": false, - "source_hidden": false - }, - "nteract": { - "transient": { - "deleting": false - } - } - }, - "outputs": [], - "source": [ - "# plot confusion matrix\n", - "import seaborn as sns\n", - "\n", - "sns.set(rc={\"figure.figsize\": (6, 4.5)})\n", - "ax = sns.heatmap(cm, annot=True, fmt=\".20g\")\n", - "ax.set_title(\"Confusion Matrix\")\n", - "ax.set_xlabel(\"Predicted label\")\n", - "ax.set_ylabel(\"True label\")" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "jupyter": { - "outputs_hidden": false, - "source_hidden": false - }, - "nteract": { - "transient": { - "deleting": false - } - } - }, - "outputs": [], - "source": [ - "from pyspark.ml.evaluation import BinaryClassificationEvaluator\n", - "\n", - "\n", - "def evaluate(predictions):\n", - " \"\"\"\n", - " Evaluate the model by computing AUROC and AUPRC with the predictions.\n", - " \"\"\"\n", - "\n", - " # initialize the binary evaluator\n", - " evaluator = BinaryClassificationEvaluator(\n", - " rawPredictionCol=\"prediction\", labelCol=TARGET_COL\n", - " )\n", - "\n", - " _evaluator = lambda metric: evaluator.setMetricName(metric).evaluate(predictions)\n", - "\n", - " # calculate AUROC, baseline 0.5\n", - " auroc = _evaluator(\"areaUnderROC\")\n", - " print(f\"AUROC: {auroc:.4f}\")\n", - "\n", - " # calculate AUPRC, baseline positive rate (0.172% in the demo data)\n", - " auprc = _evaluator(\"areaUnderPR\")\n", - " print(f\"AUPRC: {auprc:.4f}\")\n", - "\n", - " return auroc, auprc" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "# evaluate the original model\n", - "auroc, auprc = evaluate(predictions)" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "jupyter": { - "outputs_hidden": false, - "source_hidden": false - }, - "nteract": { - "transient": { - "deleting": false - } - } - }, - "outputs": [], - "source": [ - "# evaluate the SMOTE model\n", - "new_predictions = smote_model.transform(test_data)\n", - "new_auroc, new_auprc = evaluate(new_predictions)" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "jupyter": { - "outputs_hidden": false, - "source_hidden": false - }, - "nteract": { - "transient": { - "deleting": false - } - } - }, - "outputs": [], - "source": [ - "if new_auprc > auprc:\n", - " # Using model trained on SMOTE data if it has higher AUPRC\n", - " model = smote_model\n", - " auprc = new_auprc\n", - " auroc = new_auroc" - ] - }, - { - "cell_type": "markdown", - "metadata": { - "nteract": { - "transient": { - "deleting": false - } - } - }, - "source": [ - "### Log and Load Model with MLFlow\n", - "Now we get a pretty good model, we can save it for later use. Here we use mlflow to log metrics/models, and load models back for prediction." - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "jupyter": { - "outputs_hidden": false, - "source_hidden": false - }, - "nteract": { - "transient": { - "deleting": false - } - } - }, - "outputs": [], - "source": [ - "# setup mlflow\n", - "import mlflow\n", - "\n", - "mlflow.set_experiment(EXPERIMENT_NAME)" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "# log model, metrics and params\n", - "with mlflow.start_run() as run:\n", - " print(\"log model:\")\n", - " mlflow.spark.log_model(\n", - " model,\n", - " f\"{EXPERIMENT_NAME}-lightgbm\",\n", - " registered_model_name=f\"{EXPERIMENT_NAME}-lightgbm\",\n", - " dfs_tmpdir=\"Files/spark\",\n", - " )\n", - "\n", - " print(\"log metrics:\")\n", - " mlflow.log_metrics({\"AUPRC\": auprc, \"AUROC\": auroc})\n", - "\n", - " print(\"log parameters:\")\n", - " mlflow.log_params({\"DATA_FILE\": DATA_FILE})\n", - "\n", - " model_uri = f\"runs:/{run.info.run_id}/{EXPERIMENT_NAME}-lightgbm\"\n", - " print(\"Model saved in run %s\" % run.info.run_id)\n", - " print(f\"Model URI: {model_uri}\")" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "# load model back\n", - "loaded_model = mlflow.spark.load_model(model_uri, dfs_tmpdir=\"Files/spark\")" - ] - }, - { - "cell_type": "markdown", - "metadata": { - "nteract": { - "transient": { - "deleting": false - } - } - }, - "source": [ - "## Step 4. Save Prediction Results" - ] - }, - { - "cell_type": "markdown", - "metadata": { - "nteract": { - "transient": { - "deleting": false - } - } - }, - "source": [ - "### Model Deploy and Prediction" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "jupyter": { - "outputs_hidden": false, - "source_hidden": false - }, - "nteract": { - "transient": { - "deleting": false - } - } - }, - "outputs": [], - "source": [ - "batch_predictions = loaded_model.transform(test_data)" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "jupyter": { - "outputs_hidden": false, - "source_hidden": false - }, - "nteract": { - "transient": { - "deleting": false - } - } - }, - "outputs": [], - "source": [ - "# code for saving predictions into lakehouse\n", - "batch_predictions.write.format(\"delta\").mode(\"overwrite\").save(\n", - " f\"{DATA_FOLDER}/predictions/batch_predictions\"\n", - ")" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "jupyter": { - "outputs_hidden": false, - "source_hidden": false - }, - "nteract": { - "transient": { - "deleting": false - } - } - }, - "outputs": [], - "source": [ - "print(f\"Full run cost {int(time.time() - ts)} seconds.\")" - ] - } - ], - "metadata": { - "kernel_info": { - "name": "synapse_pyspark" - }, - "kernelspec": { - "display_name": "Python 3 (ipykernel)", - "language": "python", - "name": "python3" - }, - "language_info": { - "codemirror_mode": { - "name": "ipython", - "version": 3 - }, - "file_extension": ".py", - "mimetype": "text/x-python", - "name": "python", - "nbconvert_exporter": "python", - "pygments_lexer": "ipython3", - "version": "3.8.13" - }, - "notebook_environment": {}, - "save_output": true, - "spark_compute": { - "compute_id": "/trident/default", - "session_options": { - "conf": { - "spark.dynamicAllocation.enabled": "false", - "spark.dynamicAllocation.maxExecutors": "2", - "spark.dynamicAllocation.minExecutors": "2", - "spark.livy.synapse.ipythonInterpreter.enabled": "true" - }, - "driverCores": 8, - "driverMemory": "56g", - "enableDebugMode": false, - "executorCores": 8, - "executorMemory": "56g", - "keepAliveTimeout": 30, - "numExecutors": 5 - } - }, - "synapse_widget": { - "state": {}, - "version": "0.1" - }, - "trident": { - "lakehouse": { - "default_lakehouse": "", - "known_lakehouses": [ - { - "id": "" - } - ] - } - }, - "vscode": { - "interpreter": { - "hash": "8cebba326b76ca708172f0a6a24a89689a3b64f83dbd9353b827f2f4b33d3f80" - } - } - }, - "nbformat": 4, - "nbformat_minor": 1 -} diff --git a/notebooks/community/aisamples/AIsample - Time Series Forecasting.ipynb b/notebooks/community/aisamples/AIsample - Time Series Forecasting.ipynb deleted file mode 100644 index b6e325e945..0000000000 --- a/notebooks/community/aisamples/AIsample - Time Series Forecasting.ipynb +++ /dev/null @@ -1,467 +0,0 @@ -{ - "cells": [ - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "# Training and Evaluating Time Series Forecasting Model" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "## Introduction \n", - "\n", - "In this notebook, we will develop a program to forecast time series data which has seasonal cycles. We will use [NYC Property Sales data](https://www1.nyc.gov/site/finance/about/open-portal.page) range from 2003 to 2015 published by NYC Department of Finance on the [NYC Open Data Portal](https://opendata.cityofnewyork.us/). \n", - "\n", - "The dataset is a record of every building sold in the New York City property market during a 13-year period. Please refer to [Glossary of Terms for Property Sales Files](https://www1.nyc.gov/assets/finance/downloads/pdf/07pdf/glossary_rsf071607.pdf) for definition of columns in the spreadsheet. The dataset looks like below: \n", - "\n", - "|borouge|neighborhood|building_class_category|tax_class|block|lot|eastment|building_class_at_present|address|apartment_number|zip_code|residential_units|commercial_units|total_units|land_square_feet|gross_square_feet|year_built|tax_class_at_time_of_sale|building_class_at_time_of_sale|sale_price|sale_date|\n", - "|----|----|----|----|----|----|----|----|----|----|----|----|----|----|----|----|----|----|----|----|----|\n", - "|Manhattan|ALPHABET CITY|07 RENTALS - WALKUP APARTMENTS|0.0|384.0|17.0||C4|225 EAST 2ND STREET||10009.0|10.0|0.0|10.0|2145.0|6670.0|1900.0|2.0|C4|275000.0|2007-06-19|\n", - "|Manhattan|ALPHABET CITY|07 RENTALS - WALKUP APARTMENTS|2.0|405.0|12.0||C7|508 EAST 12TH STREET||10009.0|28.0|2.0|30.0|3872.0|15428.0|1930.0|2.0|C7|7794005.0|2007-05-21|\n", - "\n", - "We will build up a model to forecast the monthly volume of property trade based on historical data. To forecast, we will use [Facebook Prophet](https://facebook.github.io/prophet/), which provides fast and automated forecast procedure and handles seasonality well. " - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "## Install Prophet\n", - "\n", - "Let's first install [Facebook Prophet](https://facebook.github.io/prophet/). It uses a decomposable time series model which consists of three main components: trend, seasonality, and holidays. For the trend part, Prophet assumes piece-wise constant rate of growth with automatic change point selection. For seasonality part, Prophet models weekly and yearly seasonality using Fourier Series. Since we are using monthly data, we will not have weekly seasonality and will not consider holidays." - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "!pip install prophet" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "## Step 1: Load the Data" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "### Download dataset and Upload to Lakehouse\n", - "\n", - "There are 15 csv files holding property sales records from 5 boroughs in New York from 2003 to 2015. For your convenience, these files are compressed in `nyc_property_sales.tar` and are available in a public blob storage." - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "URL = \"https://synapseaisolutionsa.blob.core.windows.net/public/NYC_Property_Sales_Dataset/\"\n", - "TAR_FILE_NAME = \"nyc_property_sales.tar\"\n", - "DATA_FOLER = \"Files/NYC_Property_Sales_Dataset\"\n", - "TAR_FILE_PATH = f\"/lakehouse/default/{DATA_FOLER}/tar/\"\n", - "CSV_FILE_PATH = f\"/lakehouse/default/{DATA_FOLER}/csv/\"" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "import os\n", - "\n", - "if not os.path.exists(\"/lakehouse/default\"):\n", - " # ask user to add a lakehouse if no default lakehouse added to the notebook.\n", - " # a new notebook will not link to any lakehouse by default.\n", - " raise FileNotFoundError(\n", - " \"Default lakehouse not found, please add a lakehouse and restart the session.\"\n", - " )\n", - "else:\n", - " # check if the needed files are already in the lakehouse, try to download and unzip if not.\n", - " if not os.path.exists(f\"{TAR_FILE_PATH}{TAR_FILE_NAME}\"):\n", - " os.makedirs(TAR_FILE_PATH, exist_ok=True)\n", - " os.system(f\"wget {URL}{TAR_FILE_NAME} -O {TAR_FILE_PATH}{TAR_FILE_NAME}\")\n", - "\n", - " os.makedirs(CSV_FILE_PATH, exist_ok=True)\n", - " os.system(f\"tar -zxvf {TAR_FILE_PATH}{TAR_FILE_NAME} -C {CSV_FILE_PATH}\")" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "### Create Dataframe from Lakehouse\n", - "\n", - "The `display` function prints the dataframe and automatically gives chart views." - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "df = (\n", - " spark.read.format(\"csv\")\n", - " .option(\"header\", \"true\")\n", - " .load(\"Files/NYC_Property_Sales_Dataset/csv\")\n", - ")\n", - "display(df)" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "## Step 2: Data Preprocessing" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "### Type Conversion and Filtering \n", - "Let us do some necessary type conversion and filtering. \n", - "- Need to convert sale prices to integers. \n", - "- Need to exclude irregular sales data. For example, a $0 sale indicates ownership transfer without cash consideration. \n", - "- Exclude building types other than A class. \n", - "\n", - "The reason to choose only market of A class building for analysis is that seasonal effect cannot be ignored for A class building. The model we will use outperforms many others in including seasonality, which is a quite common need in time series analysis." - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "# import libs\n", - "import pyspark.sql.functions as F\n", - "from pyspark.sql.types import *" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "df = df.withColumn(\n", - " \"sale_price\", F.regexp_replace(\"sale_price\", \"[$,]\", \"\").cast(IntegerType())\n", - ")\n", - "df = df.select(\"*\").where(\n", - " 'sale_price > 0 and total_units > 0 and gross_square_feet > 0 and building_class_at_time_of_sale like \"A%\"'\n", - ")" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "monthly_sale_df = df.select(\n", - " \"sale_price\",\n", - " \"total_units\",\n", - " \"gross_square_feet\",\n", - " F.date_format(\"sale_date\", \"yyyy-MM\").alias(\"month\"),\n", - ")" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "display(df)" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "summary_df = (\n", - " monthly_sale_df.groupBy(\"month\")\n", - " .agg(\n", - " F.sum(\"sale_price\").alias(\"total_sales\"),\n", - " F.sum(\"total_units\").alias(\"units\"),\n", - " F.sum(\"gross_square_feet\").alias(\"square_feet\"),\n", - " )\n", - " .orderBy(\"month\")\n", - ")" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "display(summary_df)" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "### Visualization\n", - "Now, let us look at the trend of property trade in NYC. The yearly seasonality is quite clear in the chosen building class. The peek buying seasons are usually spring and fall." - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "df_pandas = summary_df.toPandas()" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "import matplotlib.pyplot as plt\n", - "import seaborn as sns\n", - "import numpy as np\n", - "\n", - "f, (ax1, ax2) = plt.subplots(2, 1, figsize=(35, 10))\n", - "plt.sca(ax1)\n", - "plt.xticks(np.arange(0, 15 * 12, step=12))\n", - "plt.ticklabel_format(style=\"plain\", axis=\"y\")\n", - "sns.lineplot(x=\"month\", y=\"total_sales\", data=df_pandas)\n", - "plt.ylabel(\"Total Sales\")\n", - "plt.xlabel(\"Time\")\n", - "plt.title(\"Total Property Sales by Month\")\n", - "\n", - "plt.sca(ax2)\n", - "plt.xticks(np.arange(0, 15 * 12, step=12))\n", - "plt.ticklabel_format(style=\"plain\", axis=\"y\")\n", - "sns.lineplot(x=\"month\", y=\"square_feet\", data=df_pandas)\n", - "plt.ylabel(\"Total Square Feet\")\n", - "plt.xlabel(\"Time\")\n", - "plt.title(\"Total Property Square Feet Sold by Month\")\n", - "plt.show()" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "## Step 3: Model Training and Evaluation" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "### Model Fitting\n", - "\n", - "To do model fitting, we just need to rename the time axis to 'ds' and value axis to 'y'" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "import pandas as pd\n", - "\n", - "df_pandas[\"ds\"] = pd.to_datetime(df_pandas[\"month\"])\n", - "df_pandas[\"y\"] = df_pandas[\"total_sales\"]" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "Now let us fit the model. We will choose to use 'multiplicative' seasonality, it means seasonality is no longer a constant additive factor like default assumed by Prophet. As you can see in a previous cell, we printed out the total property sale data per month, and the vibration amplitude is not consistent. It means using simple additive seasonality will not fit the data well. In addition, we will use Markov Chain Monte Carlo (MCMC) that gives mean of posteriori distribution. By default, Prophet uses Stan's L-BFGS to fit the model, which finds maximum a posteriori probability (MAP) estimate." - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "from prophet import Prophet\n", - "from prophet.plot import add_changepoints_to_plot\n", - "\n", - "m = Prophet(\n", - " seasonality_mode=\"multiplicative\", weekly_seasonality=False, mcmc_samples=1000\n", - ")\n", - "m.fit(df_pandas)" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "Let us use built-in functions in Prophet to show the model fitting results. The black dots are data points used to train the model. The blue line is the prediction, and the light blue area shows uncertainty intervals. " - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "future = m.make_future_dataframe(periods=12, freq=\"M\")\n", - "forecast = m.predict(future)\n", - "fig = m.plot(forecast)" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "The Prophet assumes piece-wise constant growth, thus you can plot the change points of the trained model " - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "fig = m.plot(forecast)\n", - "a = add_changepoints_to_plot(fig.gca(), m, forecast)" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "Visualize trend and yearly seasonality. The light blue area reflects uncertainty." - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "fig2 = m.plot_components(forecast)" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "### Cross Validation" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "We can use Prophet's built-in cross validation functionality to measure the forecast error on historical data. The below parameters mean starting with 11 years of training data, then making predictions every 30 days within 1 year horizon." - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "from prophet.diagnostics import cross_validation\n", - "from prophet.diagnostics import performance_metrics\n", - "\n", - "df_cv = cross_validation(m, initial=\"11 Y\", period=\"30 days\", horizon=\"365 days\")\n", - "df_p = performance_metrics(df_cv, monthly=True)" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "display(df_p)" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "## Step 4: Log and Load Model with MLFlow\n", - "We can now store the trained model for later use." - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "# setup mlflow\n", - "import mlflow\n", - "\n", - "EXPERIMENT_NAME = \"aisample-timeseries\"\n", - "\n", - "mlflow.set_experiment(EXPERIMENT_NAME)" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "# log the model and parameters\n", - "model_name = f\"{EXPERIMENT_NAME}-prophet\"\n", - "with mlflow.start_run() as run:\n", - " mlflow.prophet.log_model(m, model_name, registered_model_name=model_name)\n", - " mlflow.log_params({\"seasonality_mode\": \"multiplicative\", \"mcmc_samples\": 1000})\n", - " model_uri = f\"runs:/{run.info.run_id}/{model_name}\"\n", - " print(\"Model saved in run %s\" % run.info.run_id)\n", - " print(f\"Model URI: {model_uri}\")" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "# load the model back\n", - "loaded_model = mlflow.prophet.load_model(model_uri)" - ] - } - ], - "metadata": { - "kernelspec": { - "display_name": "Python 3.8.13 ('gpu_env')", - "language": "python", - "name": "python3" - }, - "language_info": { - "name": "python", - "version": "3.8.13" - }, - "orig_nbformat": 4, - "vscode": { - "interpreter": { - "hash": "8cebba326b76ca708172f0a6a24a89689a3b64f83dbd9353b827f2f4b33d3f80" - } - } - }, - "nbformat": 4, - "nbformat_minor": 0 -} diff --git a/notebooks/community/aisamples/AIsample - Title Genre Classification.ipynb b/notebooks/community/aisamples/AIsample - Title Genre Classification.ipynb deleted file mode 100644 index 8ffc7ccef8..0000000000 --- a/notebooks/community/aisamples/AIsample - Title Genre Classification.ipynb +++ /dev/null @@ -1,929 +0,0 @@ -{ - "cells": [ - { - "cell_type": "markdown", - "metadata": { - "nteract": { - "transient": { - "deleting": false - } - } - }, - "source": [ - "# Training and Evaluating a Text Classification Model" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "## Introduction \n", - "\n", - "In this notebook, we'll demonstrate how to solve a text classification task with word2vec + linear-regression model on Spark.\n", - "\n", - "The sample dataset we used here consists of metadata relating to books digitised by the British Library in partnership with Microsoft. It includes human generated labels for whether a book is 'fiction' or 'non-fiction'. We use this dataset to train a model for genre classification which predict whether a book is 'fiction' or 'non-fiction' based on its title.\n", - "\n", - "|BL record ID|Type of resource|Name|Dates associated with name|Type of name|Role|All names|Title|Variant titles|Series title|Number within series|Country of publication|Place of publication|Publisher|Date of publication|Edition|Physical description|Dewey classification|BL shelfmark|Topics|Genre|Languages|Notes|BL record ID for physical resource|classification_id|user_id|created_at|subject_ids|annotator_date_pub|annotator_normalised_date_pub|annotator_edition_statement|annotator_genre|annotator_FAST_genre_terms|annotator_FAST_subject_terms|annotator_comments|annotator_main_language|annotator_other_languages_summaries|annotator_summaries_language|annotator_translation|annotator_original_language|annotator_publisher|annotator_place_pub|annotator_country|annotator_title|Link to digitised book|annotated|\n", - "|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|\n", - "|014602826|Monograph|Yearsley, Ann|1753-1806|person||More, Hannah, 1745-1833 [person] ; Yearsley, Ann, 1753-1806 [person]|Poems on several occasions [With a prefatory letter by Hannah More.]||||England|London||1786|Fourth edition MANUSCRIPT note|||Digital Store 11644.d.32|||English||003996603||||||||||||||||||||||False|\n", - "|014602830|Monograph|A, T.||person||Oldham, John, 1653-1683 [person] ; A, T. [person]|A Satyr against Vertue. (A poem: supposed to be spoken by a Town-Hector [By John Oldham. The preface signed: T. A.])||||England|London||1679||15 pages (4°)||Digital Store 11602.ee.10. (2.)|||English||000001143||||||||||||||||||||||False|\n" - ] - }, - { - "cell_type": "markdown", - "metadata": { - "nteract": { - "transient": { - "deleting": false - } - } - }, - "source": [ - "## Step 1: Load the Data" - ] - }, - { - "cell_type": "markdown", - "metadata": { - "nteract": { - "transient": { - "deleting": false - } - } - }, - "source": [ - "### Notebook Configurations" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "#### Install libraries\n", - "In this notebook, we'll use `wordcloud` which first needs to be installed. The PySpark kernel will be restarted after `%pip install`, thus we need to install it before we run any other cells." - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "# install wordcloud for text visualization\n", - "%pip install wordcloud" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "**By defining below parameters, we can apply this notebook on different datasets easily.**" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "advisor": { - "adviceMetadata": "{\"artifactId\":\"2869283e-cc68-4bec-a46e-95bf731d931f\",\"activityId\":\"aaa495e9-53ed-4e64-85eb-900ab7d8eff4\",\"applicationId\":\"application_1657773129532_0001\",\"jobGroupId\":\"33\",\"advices\":{}}" - }, - "jupyter": { - "outputs_hidden": false, - "source_hidden": false - }, - "nteract": { - "transient": { - "deleting": false - } - } - }, - "outputs": [], - "source": [ - "IS_CUSTOM_DATA = False # if True, dataset has to be uploaded manually by user\n", - "DATA_FOLDER = \"Files/title-genre-classification\"\n", - "DATA_FILE = \"blbooksgenre.csv\"\n", - "\n", - "# data schema\n", - "TEXT_COL = \"Title\"\n", - "LABEL_COL = \"annotator_genre\"\n", - "LABELS = [\"Fiction\", \"Non-fiction\"]\n", - "\n", - "EXPERIMENT_NAME = \"aisample-textclassification\" # mlflow experiment name" - ] - }, - { - "cell_type": "markdown", - "metadata": { - "nteract": { - "transient": { - "deleting": false - } - } - }, - "source": [ - "We also define some hyper-parameters for model training (*DON'T modify these parameters unless you are aware of the meaning*)." - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "advisor": { - "adviceMetadata": "{\"artifactId\":\"2869283e-cc68-4bec-a46e-95bf731d931f\",\"activityId\":\"aaa495e9-53ed-4e64-85eb-900ab7d8eff4\",\"applicationId\":\"application_1657773129532_0001\",\"jobGroupId\":\"34\",\"advices\":{}}" - }, - "jupyter": { - "outputs_hidden": false, - "source_hidden": false - }, - "nteract": { - "transient": { - "deleting": false - } - } - }, - "outputs": [], - "source": [ - "# hyper-params\n", - "word2vec_size = 128\n", - "min_word_count = 3\n", - "max_iter = 10\n", - "k_folds = 3" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "### Import dependencies" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "import numpy as np\n", - "from itertools import chain\n", - "\n", - "from wordcloud import WordCloud\n", - "import matplotlib.pyplot as plt\n", - "import seaborn as sns\n", - "\n", - "import pyspark.sql.functions as F\n", - "\n", - "from pyspark.ml import Pipeline\n", - "from pyspark.ml.feature import *\n", - "from pyspark.ml.tuning import CrossValidator, ParamGridBuilder\n", - "from pyspark.ml.classification import LogisticRegression\n", - "from pyspark.ml.evaluation import (\n", - " BinaryClassificationEvaluator,\n", - " MulticlassClassificationEvaluator,\n", - ")\n", - "\n", - "from synapse.ml.stages import ClassBalancer\n", - "from synapse.ml.train import ComputeModelStatistics\n", - "\n", - "import mlflow" - ] - }, - { - "cell_type": "markdown", - "metadata": { - "nteract": { - "transient": { - "deleting": false - } - } - }, - "source": [ - "### Download dataset and upload to lakehouse\n", - "\n", - "**Please add a lakehouse to the notebook before running it.**" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "advisor": { - "adviceMetadata": "{\"artifactId\":\"2869283e-cc68-4bec-a46e-95bf731d931f\",\"activityId\":\"aaa495e9-53ed-4e64-85eb-900ab7d8eff4\",\"applicationId\":\"application_1657773129532_0001\",\"jobGroupId\":\"35\",\"advices\":{}}" - }, - "jupyter": { - "outputs_hidden": false, - "source_hidden": false - }, - "nteract": { - "transient": { - "deleting": false - } - } - }, - "outputs": [], - "source": [ - "if not IS_CUSTOM_DATA:\n", - " # Download demo data files into lakehouse if not exist\n", - " import os, requests\n", - "\n", - " remote_url = \"https://synapseaisolutionsa.blob.core.windows.net/public/Title_Genre_Classification\"\n", - " fname = \"blbooksgenre.csv\"\n", - " download_path = f\"/lakehouse/default/{DATA_FOLDER}/raw\"\n", - "\n", - " if not os.path.exists(\"/lakehouse/default\"):\n", - " raise FileNotFoundError(\n", - " \"Default lakehouse not found, please add a lakehouse and restart the session.\"\n", - " )\n", - " os.makedirs(download_path, exist_ok=True)\n", - " if not os.path.exists(f\"{download_path}/{fname}\"):\n", - " r = requests.get(f\"{remote_url}/{fname}\", timeout=30)\n", - " with open(f\"{download_path}/{fname}\", \"wb\") as f:\n", - " f.write(r.content)\n", - " print(\"Downloaded demo data files into lakehouse.\")" - ] - }, - { - "cell_type": "markdown", - "metadata": { - "nteract": { - "transient": { - "deleting": false - } - } - }, - "source": [ - "### Read data from lakehouse" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "advisor": { - "adviceMetadata": "{\"artifactId\":\"2869283e-cc68-4bec-a46e-95bf731d931f\",\"activityId\":\"aaa495e9-53ed-4e64-85eb-900ab7d8eff4\",\"applicationId\":\"application_1657773129532_0001\",\"jobGroupId\":\"36\",\"advices\":{}}" - }, - "jupyter": { - "outputs_hidden": false, - "source_hidden": false - }, - "nteract": { - "transient": { - "deleting": false - } - } - }, - "outputs": [], - "source": [ - "raw_df = spark.read.csv(f\"{DATA_FOLDER}/raw/{DATA_FILE}\", header=True, inferSchema=True)\n", - "\n", - "display(raw_df.limit(20))" - ] - }, - { - "cell_type": "markdown", - "metadata": { - "nteract": { - "transient": { - "deleting": false - } - } - }, - "source": [ - "## Step 2: Data Preprocess" - ] - }, - { - "cell_type": "markdown", - "metadata": { - "nteract": { - "transient": { - "deleting": false - } - } - }, - "source": [ - "### Data clean" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "advisor": { - "adviceMetadata": "{\"artifactId\":\"2869283e-cc68-4bec-a46e-95bf731d931f\",\"activityId\":\"aaa495e9-53ed-4e64-85eb-900ab7d8eff4\",\"applicationId\":\"application_1657773129532_0001\",\"jobGroupId\":\"37\",\"advices\":{}}" - }, - "jupyter": { - "outputs_hidden": false, - "source_hidden": false - }, - "nteract": { - "transient": { - "deleting": false - } - } - }, - "outputs": [], - "source": [ - "df = (\n", - " raw_df.select([TEXT_COL, LABEL_COL])\n", - " .where(F.col(LABEL_COL).isin(LABELS))\n", - " .dropDuplicates([TEXT_COL])\n", - " .cache()\n", - ")\n", - "\n", - "display(df.limit(20))" - ] - }, - { - "cell_type": "markdown", - "metadata": { - "nteract": { - "transient": { - "deleting": false - } - } - }, - "source": [ - "### Deal with unblanced data" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "advisor": { - "adviceMetadata": "{\"artifactId\":\"2869283e-cc68-4bec-a46e-95bf731d931f\",\"activityId\":\"aaa495e9-53ed-4e64-85eb-900ab7d8eff4\",\"applicationId\":\"application_1657773129532_0001\",\"jobGroupId\":\"38\",\"advices\":{}}" - }, - "jupyter": { - "outputs_hidden": false, - "source_hidden": false - }, - "nteract": { - "transient": { - "deleting": false - } - } - }, - "outputs": [], - "source": [ - "cb = ClassBalancer().setInputCol(LABEL_COL)\n", - "\n", - "df = cb.fit(df).transform(df)\n", - "display(df.limit(20))" - ] - }, - { - "cell_type": "markdown", - "metadata": { - "nteract": { - "transient": { - "deleting": false - } - } - }, - "source": [ - "### Tokenize" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "advisor": { - "adviceMetadata": "{\"artifactId\":\"2869283e-cc68-4bec-a46e-95bf731d931f\",\"activityId\":\"aaa495e9-53ed-4e64-85eb-900ab7d8eff4\",\"applicationId\":\"application_1657773129532_0001\",\"jobGroupId\":\"39\",\"advices\":{}}" - }, - "jupyter": { - "outputs_hidden": false, - "source_hidden": false - }, - "nteract": { - "transient": { - "deleting": false - } - } - }, - "outputs": [], - "source": [ - "## text transformer\n", - "tokenizer = Tokenizer(inputCol=TEXT_COL, outputCol=\"tokens\")\n", - "stopwords_remover = StopWordsRemover(inputCol=\"tokens\", outputCol=\"filtered_tokens\")\n", - "\n", - "## build the pipeline\n", - "pipeline = Pipeline(stages=[tokenizer, stopwords_remover])\n", - "\n", - "token_df = pipeline.fit(df).transform(df)\n", - "\n", - "display(token_df.limit(20))" - ] - }, - { - "cell_type": "markdown", - "metadata": { - "nteract": { - "transient": { - "deleting": false - } - } - }, - "source": [ - "### Visualization\n", - "\n", - "Display wordcloud for each class" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "advisor": { - "adviceMetadata": "{\"artifactId\":\"2869283e-cc68-4bec-a46e-95bf731d931f\",\"activityId\":\"aaa495e9-53ed-4e64-85eb-900ab7d8eff4\",\"applicationId\":\"application_1657773129532_0001\",\"jobGroupId\":\"40\",\"advices\":{}}" - }, - "jupyter": { - "outputs_hidden": false, - "source_hidden": false - }, - "nteract": { - "transient": { - "deleting": false - } - } - }, - "outputs": [], - "source": [ - "# WordCloud\n", - "for label in LABELS:\n", - " tokens = (\n", - " token_df.where(F.col(LABEL_COL) == label)\n", - " .select(F.explode(\"filtered_tokens\").alias(\"token\"))\n", - " .where(F.col(\"token\").rlike(r\"^\\w+$\"))\n", - " )\n", - "\n", - " top50_tokens = (\n", - " tokens.groupBy(\"token\").count().orderBy(F.desc(\"count\")).limit(50).collect()\n", - " )\n", - "\n", - " # Generate a word cloud image\n", - " wordcloud = WordCloud(\n", - " scale=10,\n", - " background_color=\"white\",\n", - " random_state=42, # Make sure the output is always the same for the same input\n", - " ).generate_from_frequencies(dict(top50_tokens))\n", - "\n", - " # Display the generated image the matplotlib way:\n", - " plt.figure(figsize=(10, 10))\n", - " plt.title(label, fontsize=20)\n", - " plt.axis(\"off\")\n", - " plt.imshow(wordcloud, interpolation=\"bilinear\")" - ] - }, - { - "cell_type": "markdown", - "metadata": { - "nteract": { - "transient": { - "deleting": false - } - } - }, - "source": [ - "### Vectorize\n", - "\n", - "We use word2vec to vectorize text" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "advisor": { - "adviceMetadata": "{\"artifactId\":\"2869283e-cc68-4bec-a46e-95bf731d931f\",\"activityId\":\"aaa495e9-53ed-4e64-85eb-900ab7d8eff4\",\"applicationId\":\"application_1657773129532_0001\",\"jobGroupId\":\"41\",\"advices\":{}}" - }, - "jupyter": { - "outputs_hidden": false, - "source_hidden": false - }, - "nteract": { - "transient": { - "deleting": false - } - } - }, - "outputs": [], - "source": [ - "## label transformer\n", - "label_indexer = StringIndexer(inputCol=LABEL_COL, outputCol=\"labelIdx\")\n", - "vectorizer = Word2Vec(\n", - " vectorSize=word2vec_size,\n", - " minCount=min_word_count,\n", - " inputCol=\"filtered_tokens\",\n", - " outputCol=\"features\",\n", - ")\n", - "\n", - "## build the pipeline\n", - "pipeline = Pipeline(stages=[label_indexer, vectorizer])\n", - "vec_df = (\n", - " pipeline.fit(token_df)\n", - " .transform(token_df)\n", - " .select([TEXT_COL, LABEL_COL, \"features\", \"labelIdx\", \"weight\"])\n", - ")\n", - "\n", - "display(vec_df.limit(20))" - ] - }, - { - "cell_type": "markdown", - "metadata": { - "nteract": { - "transient": { - "deleting": false - } - } - }, - "source": [ - "## Step 3: Model Training and Evaluation\n", - "\n", - "We have cleaned the dataset, dealt with unbalanced data, tokenized the text, displayed word cloud and vectorized the text.\n", - "\n", - "Next, we'll train a linear regression model to classify the vectorized text." - ] - }, - { - "cell_type": "markdown", - "metadata": { - "nteract": { - "transient": { - "deleting": false - } - } - }, - "source": [ - "### Split dataset into train and test" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "advisor": { - "adviceMetadata": "{\"artifactId\":\"2869283e-cc68-4bec-a46e-95bf731d931f\",\"activityId\":\"aaa495e9-53ed-4e64-85eb-900ab7d8eff4\",\"applicationId\":\"application_1657773129532_0001\",\"jobGroupId\":\"42\",\"advices\":{}}" - }, - "jupyter": { - "outputs_hidden": false, - "source_hidden": false - }, - "nteract": { - "transient": { - "deleting": false - } - } - }, - "outputs": [], - "source": [ - "(train_df, test_df) = vec_df.randomSplit((0.8, 0.2), seed=42)" - ] - }, - { - "cell_type": "markdown", - "metadata": { - "nteract": { - "transient": { - "deleting": false - } - } - }, - "source": [ - "### Create the model" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "advisor": { - "adviceMetadata": "{\"artifactId\":\"2869283e-cc68-4bec-a46e-95bf731d931f\",\"activityId\":\"aaa495e9-53ed-4e64-85eb-900ab7d8eff4\",\"applicationId\":\"application_1657773129532_0001\",\"jobGroupId\":\"43\",\"advices\":{}}" - }, - "jupyter": { - "outputs_hidden": false, - "source_hidden": false - }, - "nteract": { - "transient": { - "deleting": false - } - } - }, - "outputs": [], - "source": [ - "lr = (\n", - " LogisticRegression()\n", - " .setMaxIter(max_iter)\n", - " .setFeaturesCol(\"features\")\n", - " .setLabelCol(\"labelIdx\")\n", - " .setWeightCol(\"weight\")\n", - ")" - ] - }, - { - "cell_type": "markdown", - "metadata": { - "nteract": { - "transient": { - "deleting": false - } - } - }, - "source": [ - "### Train model with cross validation" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "advisor": { - "adviceMetadata": "{\"artifactId\":\"2869283e-cc68-4bec-a46e-95bf731d931f\",\"activityId\":\"aaa495e9-53ed-4e64-85eb-900ab7d8eff4\",\"applicationId\":\"application_1657773129532_0001\",\"jobGroupId\":\"44\",\"advices\":{}}" - }, - "jupyter": { - "outputs_hidden": false, - "source_hidden": false - }, - "nteract": { - "transient": { - "deleting": false - } - } - }, - "outputs": [], - "source": [ - "param_grid = (\n", - " ParamGridBuilder()\n", - " .addGrid(lr.regParam, [0.03, 0.1, 0.3])\n", - " .addGrid(lr.elasticNetParam, [0.0, 0.1, 0.2])\n", - " .build()\n", - ")\n", - "\n", - "if len(LABELS) > 2:\n", - " evaluator_cls = MulticlassClassificationEvaluator\n", - " evaluator_metrics = [\"f1\", \"accuracy\"]\n", - "else:\n", - " evaluator_cls = BinaryClassificationEvaluator\n", - " evaluator_metrics = [\"areaUnderROC\", \"areaUnderPR\"]\n", - "evaluator = evaluator_cls(labelCol=\"labelIdx\", weightCol=\"weight\")\n", - "\n", - "crossval = CrossValidator(\n", - " estimator=lr, estimatorParamMaps=param_grid, evaluator=evaluator, numFolds=k_folds\n", - ")\n", - "\n", - "model = crossval.fit(train_df)" - ] - }, - { - "cell_type": "markdown", - "metadata": { - "nteract": { - "transient": { - "deleting": false - } - } - }, - "source": [ - "### Evaluate the model" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "advisor": { - "adviceMetadata": "{\"artifactId\":\"2869283e-cc68-4bec-a46e-95bf731d931f\",\"activityId\":\"aaa495e9-53ed-4e64-85eb-900ab7d8eff4\",\"applicationId\":\"application_1657773129532_0001\",\"jobGroupId\":\"46\",\"advices\":{}}" - }, - "jupyter": { - "outputs_hidden": false, - "source_hidden": false - }, - "nteract": { - "transient": { - "deleting": false - } - } - }, - "outputs": [], - "source": [ - "predictions = model.transform(test_df)\n", - "\n", - "display(predictions)" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "advisor": { - "adviceMetadata": "{\"artifactId\":\"2869283e-cc68-4bec-a46e-95bf731d931f\",\"activityId\":\"aaa495e9-53ed-4e64-85eb-900ab7d8eff4\",\"applicationId\":\"application_1657773129532_0001\",\"jobGroupId\":\"47\",\"advices\":{}}" - }, - "jupyter": { - "outputs_hidden": false, - "source_hidden": false - }, - "nteract": { - "transient": { - "deleting": false - } - } - }, - "outputs": [], - "source": [ - "log_metrics = {}\n", - "for metric in evaluator_metrics:\n", - " value = evaluator.evaluate(predictions, {evaluator.metricName: metric})\n", - " log_metrics[metric] = value\n", - " print(f\"{metric}: {value:.4f}\")" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "advisor": { - "adviceMetadata": "{\"artifactId\":\"2869283e-cc68-4bec-a46e-95bf731d931f\",\"activityId\":\"aaa495e9-53ed-4e64-85eb-900ab7d8eff4\",\"applicationId\":\"application_1657773129532_0001\",\"jobGroupId\":\"48\",\"advices\":{}}" - }, - "jupyter": { - "outputs_hidden": false, - "source_hidden": false - }, - "nteract": { - "transient": { - "deleting": false - } - } - }, - "outputs": [], - "source": [ - "metrics = ComputeModelStatistics(\n", - " evaluationMetric=\"classification\", labelCol=\"labelIdx\", scoredLabelsCol=\"prediction\"\n", - ").transform(predictions)\n", - "display(metrics)" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "advisor": { - "adviceMetadata": "{\"artifactId\":\"2869283e-cc68-4bec-a46e-95bf731d931f\",\"activityId\":\"aaa495e9-53ed-4e64-85eb-900ab7d8eff4\",\"applicationId\":\"application_1657773129532_0001\",\"jobGroupId\":\"49\",\"advices\":{}}" - }, - "jupyter": { - "outputs_hidden": false, - "source_hidden": false - }, - "nteract": { - "transient": { - "deleting": false - } - } - }, - "outputs": [], - "source": [ - "# collect confusion matrix value\n", - "cm = metrics.select(\"confusion_matrix\").collect()[0][0].toArray()\n", - "print(cm)\n", - "\n", - "# plot confusion matrix\n", - "sns.set(rc={\"figure.figsize\": (6, 4.5)})\n", - "ax = sns.heatmap(cm, annot=True, fmt=\".20g\")\n", - "ax.set_title(\"Confusion Matrix\")\n", - "ax.set_xlabel(\"Predicted label\")\n", - "ax.set_ylabel(\"True label\")" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "### Log and Load Model with MLFlow\n", - "Now we get a pretty good model, we can save it for later use. Here we use mlflow to log metrics/models, and load models back for prediction." - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "# setup mlflow\n", - "mlflow.set_experiment(EXPERIMENT_NAME)" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "# log model, metrics and params\n", - "with mlflow.start_run() as run:\n", - " print(\"log model:\")\n", - " mlflow.spark.log_model(\n", - " model,\n", - " f\"{EXPERIMENT_NAME}-lrmodel\",\n", - " registered_model_name=f\"{EXPERIMENT_NAME}-lrmodel\",\n", - " dfs_tmpdir=\"Files/spark\",\n", - " )\n", - "\n", - " print(\"log metrics:\")\n", - " mlflow.log_metrics(log_metrics)\n", - "\n", - " print(\"log parameters:\")\n", - " mlflow.log_params(\n", - " {\n", - " \"word2vec_size\": word2vec_size,\n", - " \"min_word_count\": min_word_count,\n", - " \"max_iter\": max_iter,\n", - " \"k_folds\": k_folds,\n", - " \"DATA_FILE\": DATA_FILE,\n", - " }\n", - " )\n", - "\n", - " model_uri = f\"runs:/{run.info.run_id}/{EXPERIMENT_NAME}-lrmodel\"\n", - " print(\"Model saved in run %s\" % run.info.run_id)\n", - " print(f\"Model URI: {model_uri}\")" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "# load model back\n", - "loaded_model = mlflow.spark.load_model(model_uri, dfs_tmpdir=\"Files/spark\")\n", - "\n", - "# verify loaded model\n", - "predictions = loaded_model.transform(test_df)\n", - "display(predictions)" - ] - } - ], - "metadata": { - "kernel_info": { - "name": "synapse_pyspark" - }, - "kernelspec": { - "display_name": "Python 3 (ipykernel)", - "language": "python", - "name": "python3" - }, - "language_info": { - "codemirror_mode": { - "name": "ipython", - "version": 3 - }, - "file_extension": ".py", - "mimetype": "text/x-python", - "name": "python", - "nbconvert_exporter": "python", - "pygments_lexer": "ipython3", - "version": "3.8.13" - }, - "notebook_environment": {}, - "save_output": true, - "spark_compute": { - "compute_id": "/trident/default", - "session_options": { - "conf": { - "spark.dynamicAllocation.enabled": "false", - "spark.dynamicAllocation.maxExecutors": "2", - "spark.dynamicAllocation.minExecutors": "2", - "spark.livy.synapse.ipythonInterpreter.enabled": "true" - }, - "driverCores": 8, - "driverMemory": "56g", - "enableDebugMode": false, - "executorCores": 8, - "executorMemory": "56g", - "keepAliveTimeout": 30, - "numExecutors": 5 - } - }, - "synapse_widget": { - "state": {}, - "version": "0.1" - }, - "trident": { - "lakehouse": { - "default_lakehouse": "", - "known_lakehouses": [ - { - "id": "" - } - ] - } - }, - "vscode": { - "interpreter": { - "hash": "8cebba326b76ca708172f0a6a24a89689a3b64f83dbd9353b827f2f4b33d3f80" - } - } - }, - "nbformat": 4, - "nbformat_minor": 1 -} diff --git a/notebooks/community/aisamples/AIsample - Uplift Modelling.ipynb b/notebooks/community/aisamples/AIsample - Uplift Modelling.ipynb deleted file mode 100644 index f160694300..0000000000 --- a/notebooks/community/aisamples/AIsample - Uplift Modelling.ipynb +++ /dev/null @@ -1,914 +0,0 @@ -{ - "cells": [ - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "# Creating, Training and Evaluating Uplift Models" - ] - }, - { - "cell_type": "markdown", - "metadata": { - "nteract": { - "transient": { - "deleting": false - } - } - }, - "source": [ - "## Introduction\n", - "\n", - "In this notebook, we'll demonstrate how to create, train and evaluate uplift models and apply uplift modelling technique.\n", - "\n", - "- What is uplift modelling?\n", - "\n", - " It is a family of causal inference technology that uses machine learning models to estimate the causal impact of some treatment on an individual's behaviour.\n", - "\n", - " - **Persuadables** will only respond positive to the treatment\n", - " - **Sleeping-dogs** have a strong negative response to the treatment\n", - " - **Lost Causes** will never reach the outcome even with the treatment\n", - " - **Sure Things** will always reach the outcome with or without the treatment\n", - "\n", - " The goal of uplift modelling is to identify the \"persuadables\", not waste efforts on \"sure things\" and \"lost causes\", and avoid bothering \"sleeping dogs\"\n", - "\n", - "- How does uplift modelling work?\n", - " - **Meta Learner**: predicts the difference between an individual's behaviour when there is a treatment and when there is no treatment\n", - "\n", - " - **Uplift Tree**: a tree-based algorithm where the splitting criterion is based on differences in uplift\n", - "\n", - " - **NN-based Model**:a neural network model that usually works with observational data\n", - "\n", - "- Where can uplift modelling work?\n", - " - Marketing: help to identify persuadables to apply a treatment such as a coupon or an online advertisement\n", - " - Medical Treatment: help to understand how a treatment can impact certain groups differently\n", - " \n" - ] - }, - { - "cell_type": "markdown", - "metadata": { - "nteract": { - "transient": { - "deleting": false - } - } - }, - "source": [ - "## Step 1: Load the Data" - ] - }, - { - "cell_type": "markdown", - "metadata": { - "nteract": { - "transient": { - "deleting": false - } - } - }, - "source": [ - "### Notebook Configurations\n", - "\n", - "By defining below parameters, we can apply this notebook on different datasets easily." - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "jupyter": { - "outputs_hidden": false, - "source_hidden": false - }, - "nteract": { - "transient": { - "deleting": false - } - } - }, - "outputs": [], - "source": [ - "IS_CUSTOM_DATA = False # if True, dataset has to be uploaded manually by user\n", - "DATA_FOLDER = \"Files/uplift-modelling\"\n", - "DATA_FILE = \"criteo-research-uplift-v2.1.csv\"\n", - "\n", - "# data schema\n", - "FEATURE_COLUMNS = [f\"f{i}\" for i in range(12)]\n", - "TREATMENT_COLUMN = \"treatment\"\n", - "LABEL_COLUMN = \"visit\"\n", - "\n", - "EXPERIMENT_NAME = \"aisample-upliftmodelling\" # mlflow experiment name" - ] - }, - { - "cell_type": "markdown", - "metadata": { - "nteract": { - "transient": { - "deleting": false - } - } - }, - "source": [ - "### Import dependencies" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "import pyspark.sql.functions as F\n", - "from pyspark.sql.window import Window\n", - "from pyspark.sql.types import *\n", - "\n", - "import numpy as np\n", - "import pandas as pd\n", - "\n", - "import matplotlib as mpl\n", - "import matplotlib.pyplot as plt\n", - "import matplotlib.style as style\n", - "import seaborn as sns\n", - "\n", - "%matplotlib inline\n", - "\n", - "\n", - "from synapse.ml.featurize import Featurize\n", - "from synapse.ml.core.spark import FluentAPI\n", - "from synapse.ml.lightgbm import *\n", - "from synapse.ml.train import ComputeModelStatistics\n", - "\n", - "import os\n", - "import gzip\n", - "\n", - "import mlflow" - ] - }, - { - "cell_type": "markdown", - "metadata": { - "nteract": { - "transient": { - "deleting": false - } - } - }, - "source": [ - "### Download dataset and upload to lakehouse\n", - "\n", - "**Please add a lakehouse to the notebook before running it.**" - ] - }, - { - "cell_type": "markdown", - "metadata": { - "nteract": { - "transient": { - "deleting": false - } - } - }, - "source": [ - "- Dataset description: This dataset was created by The Criteo AI Lab. The dataset consists of 13M rows, each one representing a user with 12 features, a treatment indicator and 2 binary labels (visits and conversions).\n", - " - f0, f1, f2, f3, f4, f5, f6, f7, f8, f9, f10, f11: feature values (dense, float)\n", - " - treatment: treatment group (1 = treated, 0 = control) which indicates if a customer was targeted by advertising randomly\n", - " - conversion: whether a conversion occurred for this user (binary, label)\n", - " - visit: whether a visit occurred for this user (binary, label)\n", - "\n", - "- Dataset homepage: https://ailab.criteo.com/criteo-uplift-prediction-dataset/\n", - "\n", - "- Citation:\n", - " ```\n", - " @inproceedings{Diemert2018,\n", - " author = {{Diemert Eustache, Betlei Artem} and Renaudin, Christophe and Massih-Reza, Amini},\n", - " title={A Large Scale Benchmark for Uplift Modeling},\n", - " publisher = {ACM},\n", - " booktitle = {Proceedings of the AdKDD and TargetAd Workshop, KDD, London,United Kingdom, August, 20, 2018},\n", - " year = {2018}\n", - " }\n", - " ```" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "jupyter": { - "outputs_hidden": false, - "source_hidden": false - }, - "nteract": { - "transient": { - "deleting": false - } - } - }, - "outputs": [], - "source": [ - "if not IS_CUSTOM_DATA:\n", - " # Download demo data files into lakehouse if not exist\n", - " import os, requests\n", - "\n", - " remote_url = \"http://go.criteo.net/criteo-research-uplift-v2.1.csv.gz\"\n", - " download_file = \"criteo-research-uplift-v2.1.csv.gz\"\n", - " download_path = f\"/lakehouse/default/{DATA_FOLDER}/raw\"\n", - "\n", - " if not os.path.exists(\"/lakehouse/default\"):\n", - " raise FileNotFoundError(\n", - " \"Default lakehouse not found, please add a lakehouse and restart the session.\"\n", - " )\n", - " os.makedirs(download_path, exist_ok=True)\n", - " if not os.path.exists(f\"{download_path}/{DATA_FILE}\"):\n", - " r = requests.get(f\"{remote_url}\", timeout=30)\n", - " with open(f\"{download_path}/{download_file}\", \"wb\") as f:\n", - " f.write(r.content)\n", - " with gzip.open(f\"{download_path}/{download_file}\", \"rb\") as fin:\n", - " with open(f\"{download_path}/{DATA_FILE}\", \"wb\") as fout:\n", - " fout.write(fin.read())\n", - " print(\"Downloaded demo data files into lakehouse.\")" - ] - }, - { - "cell_type": "markdown", - "metadata": { - "nteract": { - "transient": { - "deleting": false - } - } - }, - "source": [ - "### Read data from lakehouse" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "jupyter": { - "outputs_hidden": false, - "source_hidden": false - }, - "nteract": { - "transient": { - "deleting": false - } - } - }, - "outputs": [], - "source": [ - "raw_df = spark.read.csv(\n", - " f\"{DATA_FOLDER}/raw/{DATA_FILE}\", header=True, inferSchema=True\n", - ").cache()\n", - "\n", - "display(raw_df.limit(20))" - ] - }, - { - "cell_type": "markdown", - "metadata": { - "nteract": { - "transient": { - "deleting": false - } - } - }, - "source": [ - "## Step 2: Prepare the Dataset" - ] - }, - { - "cell_type": "markdown", - "metadata": { - "nteract": { - "transient": { - "deleting": false - } - } - }, - "source": [ - "### Data exploration\n", - "\n", - "- **The overall rate of users that visit/convert**" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "advisor": { - "adviceMetadata": "{\"artifactId\":\"cbdb21d2-1420-495e-a169-c60f8314af7f\",\"activityId\":\"3806e4c7-a95e-44fb-b0e8-de2a0203b554\",\"applicationId\":\"application_1661918651252_0001\",\"jobGroupId\":\"9\",\"advices\":{\"warn\":1}}" - }, - "jupyter": { - "outputs_hidden": false, - "source_hidden": false - }, - "nteract": { - "transient": { - "deleting": false - } - } - }, - "outputs": [], - "source": [ - "raw_df.select(\n", - " F.mean(\"visit\").alias(\"Percentage of users that visit\"),\n", - " F.mean(\"conversion\").alias(\"Percentage of users that convert\"),\n", - " (F.sum(\"conversion\") / F.sum(\"visit\")).alias(\"Percentage of visitors that convert\"),\n", - ").show()" - ] - }, - { - "cell_type": "markdown", - "metadata": { - "nteract": { - "transient": { - "deleting": false - } - } - }, - "source": [ - "- **The overall average treatment effect on visit**" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "advisor": { - "adviceMetadata": "{\"artifactId\":\"cbdb21d2-1420-495e-a169-c60f8314af7f\",\"activityId\":\"3806e4c7-a95e-44fb-b0e8-de2a0203b554\",\"applicationId\":\"application_1661918651252_0001\",\"jobGroupId\":\"10\",\"advices\":{\"warn\":1}}" - }, - "jupyter": { - "outputs_hidden": false, - "source_hidden": false - }, - "nteract": { - "transient": { - "deleting": false - } - } - }, - "outputs": [], - "source": [ - "raw_df.groupby(\"treatment\").agg(\n", - " F.mean(\"visit\").alias(\"Mean of visit\"),\n", - " F.sum(\"visit\").alias(\"Sum of visit\"),\n", - " F.count(\"visit\").alias(\"Count\"),\n", - ").show()" - ] - }, - { - "cell_type": "markdown", - "metadata": { - "nteract": { - "transient": { - "deleting": false - } - } - }, - "source": [ - "- **The overall average treatment effect on conversion**" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "advisor": { - "adviceMetadata": "{\"artifactId\":\"cbdb21d2-1420-495e-a169-c60f8314af7f\",\"activityId\":\"3806e4c7-a95e-44fb-b0e8-de2a0203b554\",\"applicationId\":\"application_1661918651252_0001\",\"jobGroupId\":\"11\",\"advices\":{\"warn\":1}}" - }, - "jupyter": { - "outputs_hidden": false, - "source_hidden": false - }, - "nteract": { - "transient": { - "deleting": false - } - } - }, - "outputs": [], - "source": [ - "raw_df.groupby(\"treatment\").agg(\n", - " F.mean(\"conversion\").alias(\"Mean of conversion\"),\n", - " F.sum(\"conversion\").alias(\"Sum of conversion\"),\n", - " F.count(\"conversion\").alias(\"Count\"),\n", - ").show()" - ] - }, - { - "cell_type": "markdown", - "metadata": { - "nteract": { - "transient": { - "deleting": false - } - } - }, - "source": [ - "### Split train-test dataset" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "jupyter": { - "outputs_hidden": false, - "source_hidden": false - }, - "nteract": { - "transient": { - "deleting": false - } - } - }, - "outputs": [], - "source": [ - "transformer = (\n", - " Featurize().setOutputCol(\"features\").setInputCols(FEATURE_COLUMNS).fit(raw_df)\n", - ")\n", - "\n", - "df = transformer.transform(raw_df)" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "jupyter": { - "outputs_hidden": false, - "source_hidden": false - }, - "nteract": { - "transient": { - "deleting": false - } - } - }, - "outputs": [], - "source": [ - "train_df, test_df = df.randomSplit([0.8, 0.2], seed=42)\n", - "\n", - "print(\"Size of train dataset: %d\" % train_df.count())\n", - "print(\"Size of test dataset: %d\" % test_df.count())\n", - "\n", - "train_df.groupby(TREATMENT_COLUMN).count().show()" - ] - }, - { - "cell_type": "markdown", - "metadata": { - "nteract": { - "transient": { - "deleting": false - } - } - }, - "source": [ - "### Split treatment-control dataset" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "jupyter": { - "outputs_hidden": false, - "source_hidden": false - }, - "nteract": { - "transient": { - "deleting": false - } - } - }, - "outputs": [], - "source": [ - "treatment_train_df = train_df.where(f\"{TREATMENT_COLUMN} > 0\")\n", - "control_train_df = train_df.where(f\"{TREATMENT_COLUMN} = 0\")" - ] - }, - { - "cell_type": "markdown", - "metadata": { - "nteract": { - "transient": { - "deleting": false - } - } - }, - "source": [ - "## Step 3: Model Training and Evaluation" - ] - }, - { - "cell_type": "markdown", - "metadata": { - "nteract": { - "transient": { - "deleting": false - } - } - }, - "source": [ - "### Uplift Modelling: T-Learner with LightGBM" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "jupyter": { - "outputs_hidden": false, - "source_hidden": false - }, - "nteract": { - "transient": { - "deleting": false - } - } - }, - "outputs": [], - "source": [ - "classifier = (\n", - " LightGBMClassifier()\n", - " .setFeaturesCol(\"features\")\n", - " .setNumLeaves(10)\n", - " .setNumIterations(100)\n", - " .setObjective(\"binary\")\n", - " .setLabelCol(LABEL_COLUMN)\n", - ")\n", - "\n", - "treatment_model = classifier.fit(treatment_train_df)\n", - "control_model = classifier.fit(control_train_df)" - ] - }, - { - "cell_type": "markdown", - "metadata": { - "nteract": { - "transient": { - "deleting": false - } - } - }, - "source": [ - "### Predict on test dataset" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "jupyter": { - "outputs_hidden": false, - "source_hidden": false - }, - "nteract": { - "transient": { - "deleting": false - } - } - }, - "outputs": [], - "source": [ - "getPred = F.udf(lambda v: float(v[1]), FloatType())\n", - "\n", - "test_pred_df = (\n", - " test_df.mlTransform(treatment_model)\n", - " .withColumn(\"treatment_pred\", getPred(\"probability\"))\n", - " .drop(\"rawPrediction\", \"probability\", \"prediction\")\n", - " .mlTransform(control_model)\n", - " .withColumn(\"control_pred\", getPred(\"probability\"))\n", - " .drop(\"rawPrediction\", \"probability\", \"prediction\")\n", - " .withColumn(\"pred_uplift\", F.col(\"treatment_pred\") - F.col(\"control_pred\"))\n", - " .select(\n", - " TREATMENT_COLUMN, LABEL_COLUMN, \"treatment_pred\", \"control_pred\", \"pred_uplift\"\n", - " )\n", - " .cache()\n", - ")\n", - "\n", - "display(test_pred_df.limit(20))" - ] - }, - { - "cell_type": "markdown", - "metadata": { - "nteract": { - "transient": { - "deleting": false - } - } - }, - "source": [ - "### Model evaluation\n", - "\n", - "Since actual uplift cannot be observed for each individual, we measure the uplift over a group of customers.\n", - "\n", - "- **Uplift Curve**: plots the real cumulative uplift across the population\n", - "\n", - "First, we rank the test dataframe order by the predict uplift." - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "jupyter": { - "outputs_hidden": false, - "source_hidden": false - }, - "nteract": { - "transient": { - "deleting": false - } - } - }, - "outputs": [], - "source": [ - "test_ranked_df = test_pred_df.withColumn(\n", - " \"percent_rank\", F.percent_rank().over(Window.orderBy(F.desc(\"pred_uplift\")))\n", - ")\n", - "\n", - "display(test_ranked_df.limit(20))" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "Next, we calculate the cumulative percentage of visits in each group (treatment or control)" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "C = test_ranked_df.where(f\"{TREATMENT_COLUMN} == 0\").count()\n", - "T = test_ranked_df.where(f\"{TREATMENT_COLUMN} != 0\").count()\n", - "\n", - "test_ranked_df = (\n", - " test_ranked_df.withColumn(\n", - " \"control_label\",\n", - " F.when(F.col(TREATMENT_COLUMN) == 0, F.col(LABEL_COLUMN)).otherwise(0),\n", - " )\n", - " .withColumn(\n", - " \"treatment_label\",\n", - " F.when(F.col(TREATMENT_COLUMN) != 0, F.col(LABEL_COLUMN)).otherwise(0),\n", - " )\n", - " .withColumn(\n", - " \"control_cumsum\",\n", - " F.sum(\"control_label\").over(Window.orderBy(\"percent_rank\")) / C,\n", - " )\n", - " .withColumn(\n", - " \"treatment_cumsum\",\n", - " F.sum(\"treatment_label\").over(Window.orderBy(\"percent_rank\")) / T,\n", - " )\n", - ")\n", - "\n", - "display(test_ranked_df.limit(20))" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "Finally, we calculate the group's uplift at each percentage" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "test_ranked_df = test_ranked_df.withColumn(\n", - " \"group_uplift\", F.col(\"treatment_cumsum\") - F.col(\"control_cumsum\")\n", - ").cache()\n", - "\n", - "display(test_ranked_df.limit(20))" - ] - }, - { - "cell_type": "markdown", - "metadata": { - "nteract": { - "transient": { - "deleting": false - } - } - }, - "source": [ - "Now we can plot the uplift curve on the prediction of the test dataset. We need to convert the pyspark dataframe to pandas dataframe before plotting." - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "jupyter": { - "outputs_hidden": false, - "source_hidden": false - }, - "nteract": { - "transient": { - "deleting": false - } - } - }, - "outputs": [], - "source": [ - "def uplift_plot(uplift_df):\n", - " \"\"\"\n", - " Plot the uplift curve\n", - " \"\"\"\n", - " gain_x = uplift_df.percent_rank\n", - " gain_y = uplift_df.group_uplift\n", - " # plot the data\n", - " plt.figure(figsize=(10, 6))\n", - " mpl.rcParams[\"font.size\"] = 8\n", - "\n", - " ax = plt.plot(gain_x, gain_y, color=\"#2077B4\", label=\"Normalized Uplift Model\")\n", - "\n", - " plt.plot(\n", - " [0, gain_x.max()],\n", - " [0, gain_y.max()],\n", - " \"--\",\n", - " color=\"tab:orange\",\n", - " label=\"Random Treatment\",\n", - " )\n", - " plt.legend()\n", - " plt.xlabel(\"Porportion Targeted\")\n", - " plt.ylabel(\"Uplift\")\n", - " plt.grid(b=True, which=\"major\")\n", - "\n", - " return ax\n", - "\n", - "\n", - "test_ranked_pd_df = test_ranked_df.select(\n", - " [\"pred_uplift\", \"percent_rank\", \"group_uplift\"]\n", - ").toPandas()\n", - "uplift_plot(test_ranked_pd_df)" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "![criteo_uplift_curve.jpeg](https://mmlspark.blob.core.windows.net/graphics/notebooks/criteo_uplift_curve.jpeg)" - ] - }, - { - "cell_type": "markdown", - "metadata": { - "nteract": { - "transient": { - "deleting": false - } - } - }, - "source": [ - "From the uplift curve above, we notice that the top 20% population ranked by our prediction have a large gain if they were given the treatment, which means the are the **persuadables**. Therefore, we can print the cutoff score at 20% percentage to identify the target customers." - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "jupyter": { - "outputs_hidden": false, - "source_hidden": false - }, - "nteract": { - "transient": { - "deleting": false - } - } - }, - "outputs": [], - "source": [ - "cutoff_percentage = 0.2\n", - "cutoff_score = test_ranked_pd_df.iloc[int(len(test_ranked_pd_df) * cutoff_percentage)][\n", - " \"pred_uplift\"\n", - "]\n", - "\n", - "print(\"Uplift score higher than {:.4f} are Persuadables\".format(cutoff_score))" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "### Log and Load Model with MLFlow\n", - "Now that we have a trained model, we can save it for later use. Here we use mlflow to log metrics and models. We can also use this API to load models for prediction." - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "# setup mlflow\n", - "mlflow.set_experiment(EXPERIMENT_NAME)" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "# log model, metrics and params\n", - "with mlflow.start_run() as run:\n", - " print(\"log model:\")\n", - " mlflow.spark.log_model(\n", - " treatment_model,\n", - " f\"{EXPERIMENT_NAME}-treatmentmodel\",\n", - " registered_model_name=f\"{EXPERIMENT_NAME}-treatmentmodel\",\n", - " dfs_tmpdir=\"Files/spark\",\n", - " )\n", - "\n", - " mlflow.spark.log_model(\n", - " control_model,\n", - " f\"{EXPERIMENT_NAME}-controlmodel\",\n", - " registered_model_name=f\"{EXPERIMENT_NAME}-controlmodel\",\n", - " dfs_tmpdir=\"Files/spark\",\n", - " )\n", - "\n", - " model_uri = f\"runs:/{run.info.run_id}/{EXPERIMENT_NAME}\"\n", - " print(\"Model saved in run %s\" % run.info.run_id)\n", - " print(f\"Model URI: {model_uri}-treatmentmodel\")\n", - " print(f\"Model URI: {model_uri}-controlmodel\")" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "# load model back\n", - "loaded_treatmentmodel = mlflow.spark.load_model(\n", - " f\"{model_uri}-treatmentmodel\", dfs_tmpdir=\"Files/spark\"\n", - ")" - ] - } - ], - "metadata": { - "kernel_info": { - "name": "synapse_pyspark" - }, - "kernelspec": { - "display_name": "Python 3 (ipykernel)", - "language": "python", - "name": "python3" - }, - "language_info": { - "codemirror_mode": { - "name": "ipython", - "version": 3 - }, - "file_extension": ".py", - "mimetype": "text/x-python", - "name": "python", - "nbconvert_exporter": "python", - "pygments_lexer": "ipython3", - "version": "3.8.13" - }, - "notebook_environment": {}, - "save_output": true, - "spark_compute": { - "compute_id": "/trident/default", - "session_options": { - "conf": { - "spark.livy.synapse.ipythonInterpreter.enabled": "true" - }, - "enableDebugMode": false, - "keepAliveTimeout": 30 - } - }, - "trident": { - "lakehouse": { - "default_lakehouse": "4eb0cf6c-9e08-4640-bc5c-206cba1864b2", - "known_lakehouses": [ - { - "id": "4eb0cf6c-9e08-4640-bc5c-206cba1864b2" - } - ] - } - }, - "vscode": { - "interpreter": { - "hash": "8cebba326b76ca708172f0a6a24a89689a3b64f83dbd9353b827f2f4b33d3f80" - } - } - }, - "nbformat": 4, - "nbformat_minor": 1 -}