Creating a flag with age-diff example (#127)
* Create new page called Creating-a-view-in-HDFS.ipynb

* changing file name to be consistent with other files

* text and code added to SQL views page

* SQL_view images added and image links updated in notebook

* updating toc page to include SQL views guidance

* fixing typo

* file name change

* removing mis-named files

* removing .md file as it is not needed

* responding to feedback from JD, corrected typos, added further resources and saved python code-block

* editing SQL code block to be nested

* code block updated to a markdown block instead

* changing .PNG to .png so images show correctly

* new notebook for age-diff-flag info

* flags notebook updated to hold info from our wiki

* converted flags notebook to get md file

* table of contents updated to include flags page

* link to cross join page updated

* extra info added at start and end of notebook

* re-conversion to capture updates

* added further resources section

* markdown updated to include further resources

* added window function link to further resources

* don't need markdown file

* moved flags notebook

* moved flags notebook straight to analysis section

* files linked to R not needed as there is no R code

* Update text in the content

* Add R code in notebook for flags article

* Moved flags notebook from analysis to raw-notebooks folder

* Run notebook converter

* Add in note about future content to be added

---------

Co-authored-by: zogkoa <Anton.Zogkolli@ons.gov.uk>
Co-authored-by: kellyn <Nathan.Kelly@ons.gov.uk>
Co-authored-by: NathanKelly-ONS <93586797+NathanKelly-ONS@users.noreply.github.com>
4 people authored Jan 26, 2024
1 parent 450a315 commit 4dfa06f
Showing 5 changed files with 615 additions and 0 deletions.
1 change: 1 addition & 0 deletions ons-spark/_toc.yml
@@ -54,6 +54,7 @@ parts:
- file: spark-analysis/interpolation
- file: spark-analysis/logistic-regression
- file: spark-analysis/visualisation
- file: spark-analysis/flags
- file: spark-analysis/bin-continuous-variable
- file: spark-analysis/cramer_v
- caption: Testing and Debugging
267 changes: 267 additions & 0 deletions ons-spark/raw-notebooks/flags/flags.ipynb
@@ -0,0 +1,267 @@
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Flags in Spark\n",
"\n",
"A flag is a column that indicates whether a certain condition, such as a threshold value, is met. The flag is typically a 0 or 1 depending on whether the condition has been met.\n",
"\n",
"Our example will show how you can create a flag for age differences greater than a threshold wihtin a given group. **Note**: Additional use cases for creating flags will be added to this page at a later date.\n",
"\n",
"## Creating an age-difference flag\n",
"\n",
"Given an `group` and `age` columns, we want to highlight if the top 2 values in `age` column per `group` have difference greater than a specified threshold value.\n",
"\n",
"It is likely that there are *many* ways of doing this. Below is one method using the functions available from `pyspark.sql` and some dummy data. \n",
"\n",
"As is good practice, first import the packages you will be using and start your Spark session:"
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [],
"source": [
"import pyspark.sql.functions as F\n",
"from pyspark.sql import SparkSession, Window"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"```r\n",
"library(sparklyr)\n",
"library(dplyr)\n",
"```"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [],
"source": [
"spark = (SparkSession.builder.master(\"local[2]\")\n",
" .appName(\"flags\")\n",
" .getOrCreate())"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"```r\n",
"default_config <- sparklyr::spark_config()\n",
"\n",
"sc <- sparklyr::spark_connect(\n",
" master = \"local[2]\",\n",
" app_name = \"flags\",\n",
" config = default_config)\n",
"\n",
"```"
]
},
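{
"cell_type": "markdown",
"metadata": {},
"source": [
"Before building the grouped example, it may help to see the basic pattern for a flag column on its own. The snippet below is a minimal, illustrative sketch only: `sdf` and `value` are placeholder names, not objects created in this notebook.\n",
"\n",
"```python\n",
"# minimal flag pattern: 1 when the condition holds, 0 otherwise\n",
"# `sdf` is a placeholder DataFrame with a numeric column `value`\n",
"sdf = sdf.withColumn(\"over_threshold_flag\",\n",
"                     F.when(F.col(\"value\") > 5, 1).otherwise(0))\n",
"```\n",
"\n",
"The rest of this page applies the same `F.when().otherwise()` idea, combined with a window, to flag a difference between rows within a group."
]
},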
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Creating your dummy data\n",
"\n",
"The dataframe `df` will have an `id` column from 0 to 4. To create multiple `id` entries we will do a `.CrossJoin` with the numbers from 0 to 2 and then drop the latter column. To find more information on cross joins please refer to the [page on cross joins](https://best-practice-and-impact.github.io/ons-spark/spark-functions/cross-joins.html). Finally, we will add an `age` column with random numbers from 1 to 10. "
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+-----+---+\n",
"|group|age|\n",
"+-----+---+\n",
"| 0| 7|\n",
"| 0| 9|\n",
"| 0| 10|\n",
"| 1| 9|\n",
"| 1| 5|\n",
"| 1| 6|\n",
"| 2| 4|\n",
"| 2| 3|\n",
"| 2| 10|\n",
"| 3| 1|\n",
"| 3| 4|\n",
"| 3| 10|\n",
"| 4| 8|\n",
"| 4| 7|\n",
"| 4| 3|\n",
"+-----+---+\n",
"\n"
]
}
],
"source": [
"df = (\n",
" spark.range(5).select((F.col(\"id\")).alias(\"group\"))\n",
" .crossJoin(\n",
" spark.range(3)\n",
" .withColumnRenamed(\"id\",\"drop\")\n",
" ).drop(\"drop\")\n",
" .withColumn(\"age\", F.ceil(F.rand(seed=42)*10)))\n",
"\n",
"df.show()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"```r\n",
"set.seed(42)\n",
"\n",
"df = sparklyr::sdf_seq(sc, 0, 4) %>%\n",
" rename(\"group\" = \"id\") %>%\n",
" cross_join(sparklyr::sdf_seq(sc, 0, 2)) %>%\n",
" rename(\"drop\" = \"id\") %>%\n",
" select(-c(drop)) %>%\n",
" mutate(age = ceil(rand()*10))\n",
" \n",
"print(df,n=15)\n",
"```"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Creating your columns \n",
"\n",
"There will be more than one way to get the desired result. Note that the method outlined here spells out the process in detail; please feel free to combine some of the steps (without sacrificing code readability of course!).\n",
"\n",
"Now that we have dummy data we want to create a `window` over `id` that is ordered by `age` in descending order. We then want to create the following columns:\n",
"\n",
"- `age_lag` showing the next highest age within an id\n",
"- `age_diff` the difference between `age` and `age_lag` columns\n",
"- `age_order` numbered oldest to youngest\n",
"- `top_two_age_diff` returns the age difference between two oldest entries within an id, 0 for other rows\n",
"- `age_diff_flag` flag to tell us if the age difference is greater than some threshold, 5 chosen here\n",
"\n",
"The two columns at the end are intermediary, therefore these could be dropped if no longer needed, using `.drop`."
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+-----+---+-------+--------+---------+----------------+-------------+\n",
"|group|age|age_lag|age_diff|age_order|top_two_age_diff|age_diff_flag|\n",
"+-----+---+-------+--------+---------+----------------+-------------+\n",
"| 0| 10| 9| 1| 1| 1| 0|\n",
"| 0| 9| 7| 2| 2| 0| 0|\n",
"| 0| 7| null| null| 3| 0| 0|\n",
"| 1| 9| 6| 3| 1| 3| 0|\n",
"| 1| 6| 5| 1| 2| 0| 0|\n",
"| 1| 5| null| null| 3| 0| 0|\n",
"| 3| 10| 4| 6| 1| 6| 1|\n",
"| 3| 4| 1| 3| 2| 0| 0|\n",
"| 3| 1| null| null| 3| 0| 0|\n",
"| 2| 10| 4| 6| 1| 6| 1|\n",
"| 2| 4| 3| 1| 2| 0| 0|\n",
"| 2| 3| null| null| 3| 0| 0|\n",
"| 4| 8| 7| 1| 1| 1| 0|\n",
"| 4| 7| 3| 4| 2| 0| 0|\n",
"| 4| 3| null| null| 3| 0| 0|\n",
"+-----+---+-------+--------+---------+----------------+-------------+\n",
"\n"
]
}
],
"source": [
"order_window = Window.partitionBy(\"group\").orderBy(F.desc(\"age\"))\n",
"\n",
"df = df.withColumn(\"age_lag\", F.lag(F.col(\"age\"), -1).over(order_window))\n",
"df = df.withColumn(\"age_diff\", F.col(\"age\") - F.col(\"age_lag\"))\n",
"df = df.withColumn(\"age_order\", F.row_number().over(order_window))\n",
"df = df.withColumn(\"top_two_age_diff\", F.when((F.col(\"age_order\") == 1), F.col(\"age_diff\")).otherwise(0))\n",
"df = df.withColumn(\"age_diff_flag\", F.when(F.col(\"top_two_age_diff\") > 5, 1).otherwise(0))\n",
"\n",
"df.show()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"```r\n",
"df <- df %>%\n",
" group_by(group) %>%\n",
" arrange(desc(age)) %>%\n",
" mutate(age_lag = lag(age,n = -1)) %>%\n",
" mutate(age_diff = age - age_lag) %>%\n",
" group_by(group) %>%\n",
" mutate(age_order = row_number()) %>%\n",
" mutate(top_two_age_diff = ifelse(age_order == 1,\n",
" age_diff,\n",
" 0\n",
" )) %>%\n",
" mutate(age_diff_flag = ifelse(top_two_age_diff > 5,\n",
" 1,\n",
" 0\n",
" ))\n",
"\n",
"print(df, n = 15)\n",
"```"
]
},
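{
"cell_type": "markdown",
"metadata": {},
"source": [
"Once the flag has been created, the intermediary columns can be dropped and the flag used to select the rows of interest. The snippet below is a short, illustrative sketch only; which columns you keep and how you use the flag will depend on your own analysis.\n",
"\n",
"```python\n",
"# drop the intermediary columns, keeping just the flag alongside the original data\n",
"flagged = df.drop(\"age_lag\", \"age_diff\", \"age_order\", \"top_two_age_diff\")\n",
"\n",
"# for example, keep only the flagged rows (one per group where the age difference exceeded 5)\n",
"flagged.filter(F.col(\"age_diff_flag\") == 1).show()\n",
"```"
]
},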
{
"cell_type": "markdown",
"metadata": {},
"source": [
"As you can see in the table above, a flag column has been created with the values 0 and 1. If the age difference is greater than 5 then the flag = 1, if less than 5 then the flag = 0. \n",
"\n",
"### Further Resources"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Spark at the ONS articles:\n",
"- [Cross joins](https://best-practice-and-impact.github.io/ons-spark/spark-functions/cross-joins.html)\n",
"- [Window functions](https://best-practice-and-impact.github.io/ons-spark/spark-functions/window-functions.html)"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.6.8"
}
},
"nbformat": 4,
"nbformat_minor": 2
}
81 changes: 81 additions & 0 deletions ons-spark/raw-notebooks/flags/outputs.csv
@@ -0,0 +1,81 @@
cell_no,execution_count,tidy_python_output,r_output
1,1,,
3,2,,
6,3,"+-----+---+
|group|age|
+-----+---+
| 0| 7|
| 0| 9|
| 0| 10|
| 1| 9|
| 1| 5|
| 1| 6|
| 2| 4|
| 2| 3|
| 2| 10|
| 3| 1|
| 3| 4|
| 3| 10|
| 4| 8|
| 4| 7|
| 4| 3|
+-----+---+
","# Source: spark<?> [?? x 2]
group age
<int> <dbl>
1 0 7
2 1 1
3 0 3
4 0 6
5 1 5
6 1 5
7 2 1
8 3 1
9 4 2
10 2 8
11 2 7
12 3 5
13 3 10
14 4 3
15 4 4
"
9,4,"+-----+---+-------+--------+---------+----------------+-------------+
|group|age|age_lag|age_diff|age_order|top_two_age_diff|age_diff_flag|
+-----+---+-------+--------+---------+----------------+-------------+
| 0| 10| 9| 1| 1| 1| 0|
| 0| 9| 7| 2| 2| 0| 0|
| 0| 7| null| null| 3| 0| 0|
| 1| 9| 6| 3| 1| 3| 0|
| 1| 6| 5| 1| 2| 0| 0|
| 1| 5| null| null| 3| 0| 0|
| 3| 10| 4| 6| 1| 6| 1|
| 3| 4| 1| 3| 2| 0| 0|
| 3| 1| null| null| 3| 0| 0|
| 2| 10| 4| 6| 1| 6| 1|
| 2| 4| 3| 1| 2| 0| 0|
| 2| 3| null| null| 3| 0| 0|
| 4| 8| 7| 1| 1| 1| 0|
| 4| 7| 3| 4| 2| 0| 0|
| 4| 3| null| null| 3| 0| 0|
+-----+---+-------+--------+---------+----------------+-------------+
","# Source: spark<?> [?? x 7]
# Groups: group
# Ordered by: desc(age)
group age age_lag age_diff age_order top_two_age_diff age_diff_flag
<int> <dbl> <dbl> <dbl> <int> <dbl> <dbl>
1 1 9 8 1 1 1 0
2 1 8 4 4 2 0 0
3 1 4 NA NA 3 0 0
4 3 10 8 2 1 2 0
5 3 8 5 3 2 0 0
6 3 5 NA NA 3 0 0
7 0 8 8 0 1 0 0
8 0 8 4 4 2 0 0
9 0 4 NA NA 3 0 0
10 2 9 6 3 1 3 0
11 2 6 2 4 2 0 0
12 2 2 NA NA 3 0 0
13 4 6 2 4 1 4 0
14 4 2 1 1 2 0 0
15 4 1 NA NA 3 0 0
"
40 changes: 40 additions & 0 deletions ons-spark/raw-notebooks/flags/r_input.R
@@ -0,0 +1,40 @@
options(warn = -1)
library(sparklyr)
library(dplyr)

default_config <- sparklyr::spark_config()

sc <- sparklyr::spark_connect(
master = "local[2]",
app_name = "flags",
config = default_config)


set.seed(42)

df = sparklyr::sdf_seq(sc, 0, 4) %>%
rename("group" = "id") %>%
cross_join(sparklyr::sdf_seq(sc, 0, 2)) %>%
rename("drop" = "id") %>%
select(-c(drop)) %>%
mutate(age = ceil(rand()*10))

print(df,n=15)

df <- df %>%
group_by(group) %>%
arrange(desc(age)) %>%
mutate(age_lag = lag(age,n = -1)) %>%
mutate(age_diff = age - age_lag) %>%
group_by(group) %>%
mutate(age_order = row_number()) %>%
mutate(top_two_age_diff = ifelse(age_order == 1,
age_diff,
0
)) %>%
mutate(age_diff_flag = ifelse(top_two_age_diff > 5,
1,
0
))

print(df, n = 15)