diff --git a/ons-spark/_toc.yml b/ons-spark/_toc.yml
index 791d88c9..aad53361 100644
--- a/ons-spark/_toc.yml
+++ b/ons-spark/_toc.yml
@@ -54,6 +54,7 @@ parts:
   - file: spark-analysis/interpolation
   - file: spark-analysis/logistic-regression
   - file: spark-analysis/visualisation
+  - file: spark-analysis/flags
   - file: spark-analysis/bin-continuous-variable
   - file: spark-analysis/cramer_v
 - caption: Testing and Debugging
diff --git a/ons-spark/raw-notebooks/flags/flags.ipynb b/ons-spark/raw-notebooks/flags/flags.ipynb
new file mode 100755
index 00000000..22beae9c
--- /dev/null
+++ b/ons-spark/raw-notebooks/flags/flags.ipynb
@@ -0,0 +1,267 @@
+{
+ "cells": [
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "# Flags in Spark\n",
+    "\n",
+    "A flag is a column that indicates whether a certain condition, such as a threshold value, is met. The flag is typically 0 or 1, depending on whether the condition has been met.\n",
+    "\n",
+    "Our example will show how you can create a flag for age differences greater than a threshold within a given group. **Note**: Additional use cases for creating flags will be added to this page at a later date.\n",
+    "\n",
+    "## Creating an age-difference flag\n",
+    "\n",
+    "Given `group` and `age` columns, we want to highlight whether the top two values in the `age` column per `group` differ by more than a specified threshold value.\n",
+    "\n",
+    "It is likely that there are *many* ways of doing this. Below is one method using the functions available from `pyspark.sql` and some dummy data.\n",
+    "\n",
+    "As is good practice, first import the packages you will be using and start your Spark session:"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 1,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "import pyspark.sql.functions as F\n",
+    "from pyspark.sql import SparkSession, Window"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "```r\n",
+    "library(sparklyr)\n",
+    "library(dplyr)\n",
+    "```"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 2,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "spark = (SparkSession.builder.master(\"local[2]\")\n",
+    "         .appName(\"flags\")\n",
+    "         .getOrCreate())"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "```r\n",
+    "default_config <- sparklyr::spark_config()\n",
+    "\n",
+    "sc <- sparklyr::spark_connect(\n",
+    "    master = \"local[2]\",\n",
+    "    app_name = \"flags\",\n",
+    "    config = default_config)\n",
+    "\n",
+    "```"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "### Creating your dummy data\n",
+    "\n",
+    "The dataframe `df` will have a `group` column with the values 0 to 4, created from Spark's `id` range column. To create multiple entries per `group` we will do a `.crossJoin()` with the numbers 0 to 2 and then drop that column. To find more information on cross joins please refer to the [page on cross joins](https://best-practice-and-impact.github.io/ons-spark/spark-functions/cross-joins.html). Finally, we will add an `age` column with random numbers from 1 to 10."
" + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "+-----+---+\n", + "|group|age|\n", + "+-----+---+\n", + "| 0| 7|\n", + "| 0| 9|\n", + "| 0| 10|\n", + "| 1| 9|\n", + "| 1| 5|\n", + "| 1| 6|\n", + "| 2| 4|\n", + "| 2| 3|\n", + "| 2| 10|\n", + "| 3| 1|\n", + "| 3| 4|\n", + "| 3| 10|\n", + "| 4| 8|\n", + "| 4| 7|\n", + "| 4| 3|\n", + "+-----+---+\n", + "\n" + ] + } + ], + "source": [ + "df = (\n", + " spark.range(5).select((F.col(\"id\")).alias(\"group\"))\n", + " .crossJoin(\n", + " spark.range(3)\n", + " .withColumnRenamed(\"id\",\"drop\")\n", + " ).drop(\"drop\")\n", + " .withColumn(\"age\", F.ceil(F.rand(seed=42)*10)))\n", + "\n", + "df.show()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "```r\n", + "set.seed(42)\n", + "\n", + "df = sparklyr::sdf_seq(sc, 0, 4) %>%\n", + " rename(\"group\" = \"id\") %>%\n", + " cross_join(sparklyr::sdf_seq(sc, 0, 2)) %>%\n", + " rename(\"drop\" = \"id\") %>%\n", + " select(-c(drop)) %>%\n", + " mutate(age = ceil(rand()*10))\n", + " \n", + "print(df,n=15)\n", + "```" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Creating your columns \n", + "\n", + "There will be more than one way to get the desired result. Note that the method outlined here spells out the process in detail; please feel free to combine some of the steps (without sacrificing code readability of course!).\n", + "\n", + "Now that we have dummy data we want to create a `window` over `id` that is ordered by `age` in descending order. We then want to create the following columns:\n", + "\n", + "- `age_lag` showing the next highest age within an id\n", + "- `age_diff` the difference between `age` and `age_lag` columns\n", + "- `age_order` numbered oldest to youngest\n", + "- `top_two_age_diff` returns the age difference between two oldest entries within an id, 0 for other rows\n", + "- `age_diff_flag` flag to tell us if the age difference is greater than some threshold, 5 chosen here\n", + "\n", + "The two columns at the end are intermediary, therefore these could be dropped if no longer needed, using `.drop`." 
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 4,
+   "metadata": {},
+   "outputs": [
+    {
+     "name": "stdout",
+     "output_type": "stream",
+     "text": [
+      "+-----+---+-------+--------+---------+----------------+-------------+\n",
+      "|group|age|age_lag|age_diff|age_order|top_two_age_diff|age_diff_flag|\n",
+      "+-----+---+-------+--------+---------+----------------+-------------+\n",
+      "|    0| 10|      9|       1|        1|               1|            0|\n",
+      "|    0|  9|      7|       2|        2|               0|            0|\n",
+      "|    0|  7|   null|    null|        3|               0|            0|\n",
+      "|    1|  9|      6|       3|        1|               3|            0|\n",
+      "|    1|  6|      5|       1|        2|               0|            0|\n",
+      "|    1|  5|   null|    null|        3|               0|            0|\n",
+      "|    3| 10|      4|       6|        1|               6|            1|\n",
+      "|    3|  4|      1|       3|        2|               0|            0|\n",
+      "|    3|  1|   null|    null|        3|               0|            0|\n",
+      "|    2| 10|      4|       6|        1|               6|            1|\n",
+      "|    2|  4|      3|       1|        2|               0|            0|\n",
+      "|    2|  3|   null|    null|        3|               0|            0|\n",
+      "|    4|  8|      7|       1|        1|               1|            0|\n",
+      "|    4|  7|      3|       4|        2|               0|            0|\n",
+      "|    4|  3|   null|    null|        3|               0|            0|\n",
+      "+-----+---+-------+--------+---------+----------------+-------------+\n",
+      "\n"
+     ]
+    }
+   ],
+   "source": [
+    "order_window = Window.partitionBy(\"group\").orderBy(F.desc(\"age\"))\n",
+    "\n",
+    "df = df.withColumn(\"age_lag\", F.lag(F.col(\"age\"), -1).over(order_window))\n",
+    "df = df.withColumn(\"age_diff\", F.col(\"age\") - F.col(\"age_lag\"))\n",
+    "df = df.withColumn(\"age_order\", F.row_number().over(order_window))\n",
+    "df = df.withColumn(\"top_two_age_diff\", F.when((F.col(\"age_order\") == 1), F.col(\"age_diff\")).otherwise(0))\n",
+    "df = df.withColumn(\"age_diff_flag\", F.when(F.col(\"top_two_age_diff\") > 5, 1).otherwise(0))\n",
+    "\n",
+    "df.show()"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "```r\n",
+    "df <- df %>%\n",
+    "    group_by(group) %>%\n",
+    "    arrange(desc(age)) %>%\n",
+    "    mutate(age_lag = lag(age, n = -1)) %>%\n",
+    "    mutate(age_diff = age - age_lag) %>%\n",
+    "    group_by(group) %>%\n",
+    "    mutate(age_order = row_number()) %>%\n",
+    "    mutate(top_two_age_diff = ifelse(age_order == 1,\n",
+    "                                     age_diff,\n",
+    "                                     0\n",
+    "                                     )) %>%\n",
+    "    mutate(age_diff_flag = ifelse(top_two_age_diff > 5,\n",
+    "                                  1,\n",
+    "                                  0\n",
+    "                                  ))\n",
+    "\n",
+    "print(df, n = 15)\n",
+    "```"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "As you can see in the table above, a flag column has been created with the values 0 and 1. If the age difference between the two oldest entries in a group is greater than 5 then the flag is 1, otherwise it is 0.\n",
+    "\n",
+    "### Further Resources"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "Spark at the ONS articles:\n",
+    "- [Cross joins](https://best-practice-and-impact.github.io/ons-spark/spark-functions/cross-joins.html)\n",
+    "- [Window functions](https://best-practice-and-impact.github.io/ons-spark/spark-functions/window-functions.html)"
+   ]
+  }
+ ],
+ "metadata": {
+  "kernelspec": {
+   "display_name": "Python 3",
+   "language": "python",
+   "name": "python3"
+  },
+  "language_info": {
+   "codemirror_mode": {
+    "name": "ipython",
+    "version": 3
+   },
+   "file_extension": ".py",
+   "mimetype": "text/x-python",
+   "name": "python",
+   "nbconvert_exporter": "python",
+   "pygments_lexer": "ipython3",
+   "version": "3.6.8"
+  }
+ },
+ "nbformat": 4,
+ "nbformat_minor": 2
+}
diff --git a/ons-spark/raw-notebooks/flags/outputs.csv b/ons-spark/raw-notebooks/flags/outputs.csv
new file mode 100644
index 00000000..e2a3405e
--- /dev/null
+++ b/ons-spark/raw-notebooks/flags/outputs.csv
@@ -0,0 +1,81 @@
+cell_no,execution_count,tidy_python_output,r_output
+1,1,,
+3,2,,
+6,3,"+-----+---+
+|group|age|
++-----+---+
+|    0|  7|
+|    0|  9|
+|    0| 10|
+|    1|  9|
+|    1|  5|
+|    1|  6|
+|    2|  4|
+|    2|  3|
+|    2| 10|
+|    3|  1|
+|    3|  4|
+|    3| 10|
+|    4|  8|
+|    4|  7|
+|    4|  3|
++-----+---+
+","# Source: spark [?? x 2]
+ group age
+
+ 1 0 7
+ 2 1 1
+ 3 0 3
+ 4 0 6
+ 5 1 5
+ 6 1 5
+ 7 2 1
+ 8 3 1
+ 9 4 2
+10 2 8
+11 2 7
+12 3 5
+13 3 10
+14 4 3
+15 4 4
+"
+9,4,"+-----+---+-------+--------+---------+----------------+-------------+
+|group|age|age_lag|age_diff|age_order|top_two_age_diff|age_diff_flag|
++-----+---+-------+--------+---------+----------------+-------------+
+|    0| 10|      9|       1|        1|               1|            0|
+|    0|  9|      7|       2|        2|               0|            0|
+|    0|  7|   null|    null|        3|               0|            0|
+|    1|  9|      6|       3|        1|               3|            0|
+|    1|  6|      5|       1|        2|               0|            0|
+|    1|  5|   null|    null|        3|               0|            0|
+|    3| 10|      4|       6|        1|               6|            1|
+|    3|  4|      1|       3|        2|               0|            0|
+|    3|  1|   null|    null|        3|               0|            0|
+|    2| 10|      4|       6|        1|               6|            1|
+|    2|  4|      3|       1|        2|               0|            0|
+|    2|  3|   null|    null|        3|               0|            0|
+|    4|  8|      7|       1|        1|               1|            0|
+|    4|  7|      3|       4|        2|               0|            0|
+|    4|  3|   null|    null|        3|               0|            0|
++-----+---+-------+--------+---------+----------------+-------------+
+","# Source: spark [?? x 7]
+# Groups: group
+# Ordered by: desc(age)
+ group age age_lag age_diff age_order top_two_age_diff age_diff_flag
+
+ 1 1 9 8 1 1 1 0
+ 2 1 8 4 4 2 0 0
+ 3 1 4 NA NA 3 0 0
+ 4 3 10 8 2 1 2 0
+ 5 3 8 5 3 2 0 0
+ 6 3 5 NA NA 3 0 0
+ 7 0 8 8 0 1 0 0
+ 8 0 8 4 4 2 0 0
+ 9 0 4 NA NA 3 0 0
+10 2 9 6 3 1 3 0
+11 2 6 2 4 2 0 0
+12 2 2 NA NA 3 0 0
+13 4 6 2 4 1 4 0
+14 4 2 1 1 2 0 0
+15 4 1 NA NA 3 0 0
+"
diff --git a/ons-spark/raw-notebooks/flags/r_input.R b/ons-spark/raw-notebooks/flags/r_input.R
new file mode 100644
index 00000000..2559e386
--- /dev/null
+++ b/ons-spark/raw-notebooks/flags/r_input.R
@@ -0,0 +1,40 @@
+options(warn = -1)
+library(sparklyr)
+library(dplyr)
+
+default_config <- sparklyr::spark_config()
+
+sc <- sparklyr::spark_connect(
+    master = "local[2]",
+    app_name = "flags",
+    config = default_config)
+
+
+set.seed(42)
+
+df = sparklyr::sdf_seq(sc, 0, 4) %>%
+    rename("group" = "id") %>%
+    cross_join(sparklyr::sdf_seq(sc, 0, 2)) %>%
+    rename("drop" = "id") %>%
+    select(-c(drop)) %>%
+    mutate(age = ceil(rand()*10))
+
+print(df, n = 15)
+
+df <- df %>%
+    group_by(group) %>%
+    arrange(desc(age)) %>%
+    mutate(age_lag = lag(age, n = -1)) %>%
+    mutate(age_diff = age - age_lag) %>%
+    group_by(group) %>%
+    mutate(age_order = row_number()) %>%
+    mutate(top_two_age_diff = ifelse(age_order == 1,
+                                     age_diff,
+                                     0
+                                     )) %>%
+    mutate(age_diff_flag = ifelse(top_two_age_diff > 5,
+                                  1,
+                                  0
+                                  ))
+
+print(df, n = 15)
diff --git a/ons-spark/spark-analysis/flags.md b/ons-spark/spark-analysis/flags.md
new file mode 100644
index 00000000..fa7f2060
--- /dev/null
+++ b/ons-spark/spark-analysis/flags.md
@@ -0,0 +1,226 @@
+# Flags in Spark
+
+A flag is a column that indicates whether a certain condition, such as a threshold value, is met. The flag is typically 0 or 1, depending on whether the condition has been met.
+
+Our example will show how you can create a flag for age differences greater than a threshold within a given group. **Note**: Additional use cases for creating flags will be added to this page at a later date.
+
+## Creating an age-difference flag
+
+Given `group` and `age` columns, we want to highlight whether the top two values in the `age` column per `group` differ by more than a specified threshold value.
+
+It is likely that there are *many* ways of doing this. Below is one method using the functions available from `pyspark.sql` and some dummy data.
+
+As is good practice, first import the packages you will be using and start your Spark session:
+````{tabs}
+```{code-tab} py
+import pyspark.sql.functions as F
+from pyspark.sql import SparkSession, Window
+```
+
+```{code-tab} r R
+
+library(sparklyr)
+library(dplyr)
+
+```
+````
+
+````{tabs}
+```{code-tab} py
+spark = (SparkSession.builder.master("local[2]")
+         .appName("flags")
+         .getOrCreate())
+```
+
+```{code-tab} r R
+
+default_config <- sparklyr::spark_config()
+
+sc <- sparklyr::spark_connect(
+    master = "local[2]",
+    app_name = "flags",
+    config = default_config)
+
+
+```
+````
+### Creating your dummy data
+
+The dataframe `df` will have a `group` column with the values 0 to 4, created from Spark's `id` range column. To create multiple entries per `group` we will do a `.crossJoin()` with the numbers 0 to 2 and then drop that column. To find more information on cross joins please refer to the [page on cross joins](https://best-practice-and-impact.github.io/ons-spark/spark-functions/cross-joins.html). Finally, we will add an `age` column with random numbers from 1 to 10.
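+
+The next code block builds this using `.crossJoin()`. Equivalently, for a small amount of dummy data you could list the rows explicitly with `spark.createDataFrame`; a minimal Python-only sketch (the `df_manual` name and hard-coded ages are purely illustrative, not part of the worked example) might look like this:
+
+```python
+# Hypothetical alternative to the cross join: write the dummy rows out by hand.
+# The ages below are made-up values for illustration only.
+rows = [(0, 7), (0, 9), (0, 10),
+        (1, 9), (1, 5), (1, 6),
+        (2, 4), (2, 3), (2, 10)]
+df_manual = spark.createDataFrame(rows, ["group", "age"])
+df_manual.show()
+```
+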
+````{tabs}
+```{code-tab} py
+df = (
+    spark.range(5).select((F.col("id")).alias("group"))
+    .crossJoin(
+        spark.range(3)
+        .withColumnRenamed("id","drop")
+    ).drop("drop")
+    .withColumn("age", F.ceil(F.rand(seed=42)*10)))
+
+df.show()
+```
+
+```{code-tab} r R
+
+set.seed(42)
+
+df = sparklyr::sdf_seq(sc, 0, 4) %>%
+    rename("group" = "id") %>%
+    cross_join(sparklyr::sdf_seq(sc, 0, 2)) %>%
+    rename("drop" = "id") %>%
+    select(-c(drop)) %>%
+    mutate(age = ceil(rand()*10))
+
+print(df, n = 15)
+
+```
+````
+
+````{tabs}
+
+```{code-tab} plaintext Python Output
++-----+---+
+|group|age|
++-----+---+
+|    0|  7|
+|    0|  9|
+|    0| 10|
+|    1|  9|
+|    1|  5|
+|    1|  6|
+|    2|  4|
+|    2|  3|
+|    2| 10|
+|    3|  1|
+|    3|  4|
+|    3| 10|
+|    4|  8|
+|    4|  7|
+|    4|  3|
++-----+---+
+```
+
+```{code-tab} plaintext R Output
+# Source: spark [?? x 2]
+ group age
+
+ 1 0 7
+ 2 1 1
+ 3 0 3
+ 4 0 6
+ 5 1 5
+ 6 1 5
+ 7 2 1
+ 8 3 1
+ 9 4 2
+10 2 8
+11 2 7
+12 3 5
+13 3 10
+14 4 3
+15 4 4
+```
+````
+### Creating your columns
+
+There will be more than one way to get the desired result. Note that the method outlined here spells out the process in detail; please feel free to combine some of the steps (without sacrificing code readability, of course!).
+
+Now that we have dummy data, we want to create a window over `group` that is ordered by `age` in descending order. We then want to create the following columns:
+
+- `age_lag` showing the next highest age within a group
+- `age_diff` the difference between the `age` and `age_lag` columns
+- `age_order` numbering the ages within a group from oldest to youngest
+- `top_two_age_diff` the age difference between the two oldest entries within a group, and 0 for other rows
+- `age_diff_flag` a flag telling us whether the age difference is greater than some threshold; 5 is chosen here
+
+Columns such as `age_lag`, `age_diff` and `age_order` are intermediary, so they could be dropped once they are no longer needed, using `.drop()`.
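+
+Before building the full set of columns, it may help to see the shape of a flag on its own: it is just a conditional column. A minimal Python-only sketch (using the `age` column from the dummy data with an illustrative threshold of 5, separate from the worked example below) would be:
+
+```python
+# Minimal flag pattern: 1 when the condition holds, 0 otherwise.
+# "simple_flag" and the threshold of 5 are illustrative only.
+simple_flag_df = df.withColumn(
+    "simple_flag",
+    F.when(F.col("age") > 5, 1).otherwise(0)
+)
+```
+
+The worked example below applies the same `F.when().otherwise()` pattern, but to the age difference between the two oldest entries in each group rather than to `age` directly.
+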
+````{tabs}
+```{code-tab} py
+order_window = Window.partitionBy("group").orderBy(F.desc("age"))
+
+df = df.withColumn("age_lag", F.lag(F.col("age"), -1).over(order_window))
+df = df.withColumn("age_diff", F.col("age") - F.col("age_lag"))
+df = df.withColumn("age_order", F.row_number().over(order_window))
+df = df.withColumn("top_two_age_diff", F.when((F.col("age_order") == 1), F.col("age_diff")).otherwise(0))
+df = df.withColumn("age_diff_flag", F.when(F.col("top_two_age_diff") > 5, 1).otherwise(0))
+
+df.show()
+```
+
+```{code-tab} r R
+
+df <- df %>%
+    group_by(group) %>%
+    arrange(desc(age)) %>%
+    mutate(age_lag = lag(age, n = -1)) %>%
+    mutate(age_diff = age - age_lag) %>%
+    group_by(group) %>%
+    mutate(age_order = row_number()) %>%
+    mutate(top_two_age_diff = ifelse(age_order == 1,
+                                     age_diff,
+                                     0
+                                     )) %>%
+    mutate(age_diff_flag = ifelse(top_two_age_diff > 5,
+                                  1,
+                                  0
+                                  ))
+
+print(df, n = 15)
+
+```
+````
+
+````{tabs}
+
+```{code-tab} plaintext Python Output
++-----+---+-------+--------+---------+----------------+-------------+
+|group|age|age_lag|age_diff|age_order|top_two_age_diff|age_diff_flag|
++-----+---+-------+--------+---------+----------------+-------------+
+|    0| 10|      9|       1|        1|               1|            0|
+|    0|  9|      7|       2|        2|               0|            0|
+|    0|  7|   null|    null|        3|               0|            0|
+|    1|  9|      6|       3|        1|               3|            0|
+|    1|  6|      5|       1|        2|               0|            0|
+|    1|  5|   null|    null|        3|               0|            0|
+|    3| 10|      4|       6|        1|               6|            1|
+|    3|  4|      1|       3|        2|               0|            0|
+|    3|  1|   null|    null|        3|               0|            0|
+|    2| 10|      4|       6|        1|               6|            1|
+|    2|  4|      3|       1|        2|               0|            0|
+|    2|  3|   null|    null|        3|               0|            0|
+|    4|  8|      7|       1|        1|               1|            0|
+|    4|  7|      3|       4|        2|               0|            0|
+|    4|  3|   null|    null|        3|               0|            0|
++-----+---+-------+--------+---------+----------------+-------------+
+```
+
+```{code-tab} plaintext R Output
+# Source: spark [?? x 7]
+# Groups: group
+# Ordered by: desc(age)
+ group age age_lag age_diff age_order top_two_age_diff age_diff_flag
+
+ 1 1 9 8 1 1 1 0
+ 2 1 8 4 4 2 0 0
+ 3 1 4 NA NA 3 0 0
+ 4 3 10 8 2 1 2 0
+ 5 3 8 5 3 2 0 0
+ 6 3 5 NA NA 3 0 0
+ 7 0 8 8 0 1 0 0
+ 8 0 8 4 4 2 0 0
+ 9 0 4 NA NA 3 0 0
+10 2 9 6 3 1 3 0
+11 2 6 2 4 2 0 0
+12 2 2 NA NA 3 0 0
+13 4 6 2 4 1 4 0
+14 4 2 1 1 2 0 0
+15 4 1 NA NA 3 0 0
+```
+````
+As you can see in the table above, a flag column has been created with the values 0 and 1. If the age difference between the two oldest entries in a group is greater than 5 then the flag is 1, otherwise it is 0.
+
+### Further Resources
+
+Spark at the ONS articles:
+- [Cross joins](https://best-practice-and-impact.github.io/ons-spark/spark-functions/cross-joins.html)
+- [Window functions](https://best-practice-and-impact.github.io/ons-spark/spark-functions/window-functions.html)
\ No newline at end of file