diff --git a/all-spark-notebook/test/data/local_pyspark.ipynb b/all-spark-notebook/test/data/local_pyspark.ipynb new file mode 100644 index 0000000000..66129f52e7 --- /dev/null +++ b/all-spark-notebook/test/data/local_pyspark.ipynb @@ -0,0 +1,60 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": 2, + "metadata": {}, + "outputs": [ + { + "output_type": "error", + "ename": "Error", + "evalue": "Jupyter cannot be started. Error attempting to locate jupyter: Data Science libraries jupyter and notebook are not installed in interpreter Python 3.7.7 64-bit ('jupyter': conda).", + "traceback": [ + "Error: Jupyter cannot be started. Error attempting to locate jupyter: Data Science libraries jupyter and notebook are not installed in interpreter Python 3.7.7 64-bit ('jupyter': conda).", + "at b.startServer (/Users/romain/.vscode/extensions/ms-python.python-2020.5.80290/out/client/extension.js:92:270430)", + "at async b.createServer (/Users/romain/.vscode/extensions/ms-python.python-2020.5.80290/out/client/extension.js:92:269873)", + "at async connect (/Users/romain/.vscode/extensions/ms-python.python-2020.5.80290/out/client/extension.js:92:397876)", + "at async w.ensureConnectionAndNotebookImpl (/Users/romain/.vscode/extensions/ms-python.python-2020.5.80290/out/client/extension.js:16:556625)", + "at async w.ensureConnectionAndNotebook (/Users/romain/.vscode/extensions/ms-python.python-2020.5.80290/out/client/extension.js:16:556303)", + "at async w.clearResult (/Users/romain/.vscode/extensions/ms-python.python-2020.5.80290/out/client/extension.js:16:552346)", + "at async w.reexecuteCell (/Users/romain/.vscode/extensions/ms-python.python-2020.5.80290/out/client/extension.js:16:540374)", + "at async w.reexecuteCells (/Users/romain/.vscode/extensions/ms-python.python-2020.5.80290/out/client/extension.js:16:537541)" + ] + } + ], + "source": [ + "from pyspark.sql import SparkSession\n", + "\n", + "# Spark session & context\n", + "spark = SparkSession.builder.master('local').getOrCreate()\n", + "sc = spark.sparkContext\n", + "\n", + "# Sum of the first 100 whole numbers\n", + "rdd = sc.parallelize(range(100 + 1))\n", + "rdd.sum()\n", + "# 5050" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.7.6" + } + }, + "nbformat": 4, + "nbformat_minor": 4 +} \ No newline at end of file diff --git a/all-spark-notebook/test/data/local_sparkR.ipynb b/all-spark-notebook/test/data/local_sparkR.ipynb new file mode 100644 index 0000000000..ecf7f7c1ec --- /dev/null +++ b/all-spark-notebook/test/data/local_sparkR.ipynb @@ -0,0 +1,41 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "library(SparkR)\n", + "\n", + "# Spark session & context\n", + "sc <- sparkR.session(\"local\")\n", + "\n", + "# Sum of the first 100 whole numbers\n", + "sdf <- createDataFrame(list(1:100))\n", + "dapplyCollect(sdf,\n", + " function(x) \n", + " { x <- sum(x)}\n", + " )\n", + "# 5050" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "R", + "language": "R", + "name": "ir" + }, + "language_info": { + "codemirror_mode": "r", + "file_extension": ".r", + "mimetype": "text/x-r-source", + "name": "R", + "pygments_lexer": "r", + 
"version": "3.6.3" + } + }, + "nbformat": 4, + "nbformat_minor": 4 +} \ No newline at end of file diff --git a/all-spark-notebook/test/data/local_sparklyr.ipynb b/all-spark-notebook/test/data/local_sparklyr.ipynb new file mode 100644 index 0000000000..8f4527243b --- /dev/null +++ b/all-spark-notebook/test/data/local_sparklyr.ipynb @@ -0,0 +1,43 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "library(sparklyr)\n", + "\n", + "# get the default config\n", + "conf <- spark_config()\n", + "# Set the catalog implementation in-memory\n", + "conf$spark.sql.catalogImplementation <- \"in-memory\"\n", + "\n", + "# Spark session & context\n", + "sc <- spark_connect(master = \"local\", config = conf)\n", + "\n", + "# Sum of the first 100 whole numbers\n", + "sdf_len(sc, 100, repartition = 1) %>% \n", + " spark_apply(function(e) sum(e))\n", + "# 5050" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "R", + "language": "R", + "name": "ir" + }, + "language_info": { + "codemirror_mode": "r", + "file_extension": ".r", + "mimetype": "text/x-r-source", + "name": "R", + "pygments_lexer": "r", + "version": "3.6.3" + } + }, + "nbformat": 4, + "nbformat_minor": 4 +} \ No newline at end of file diff --git a/all-spark-notebook/test/data/local_spylon.ipynb b/all-spark-notebook/test/data/local_spylon.ipynb new file mode 100644 index 0000000000..0caf2f0a0b --- /dev/null +++ b/all-spark-notebook/test/data/local_spylon.ipynb @@ -0,0 +1,63 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": 6, + "metadata": {}, + "outputs": [], + "source": [ + "%%init_spark\n", + "# Spark session & context\n", + "launcher.master = \"local\"\n", + "launcher.conf.spark.executor.cores = 1" + ] + }, + { + "cell_type": "code", + "execution_count": 7, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[8] at parallelize at :28\n", + "res4: Double = 5050.0\n" + ] + }, + "execution_count": 7, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "// Sum of the first 100 whole numbers\n", + "val rdd = sc.parallelize(0 to 100)\n", + "rdd.sum()\n", + "// 5050" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "spylon-kernel", + "language": "scala", + "name": "spylon-kernel" + }, + "language_info": { + "codemirror_mode": "text/x-scala", + "file_extension": ".scala", + "help_links": [ + { + "text": "MetaKernel Magics", + "url": "https://metakernel.readthedocs.io/en/latest/source/README.html" + } + ], + "mimetype": "text/x-scala", + "name": "scala", + "pygments_lexer": "scala", + "version": "0.4.1" + } + }, + "nbformat": 4, + "nbformat_minor": 4 +} \ No newline at end of file diff --git a/all-spark-notebook/test/data/local_toree.ipynb b/all-spark-notebook/test/data/local_toree.ipynb new file mode 100644 index 0000000000..16a29417f7 --- /dev/null +++ b/all-spark-notebook/test/data/local_toree.ipynb @@ -0,0 +1,89 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": 1, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "Waiting for a Spark session to start..." 
+ ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "spark://master:7077\n" + ] + } + ], + "source": [ + "// should print the value of --master in the kernel spec\n", + "println(sc.master)" + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "Waiting for a Spark session to start..." + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "data": { + "text/plain": [ + "rdd = ParallelCollectionRDD[0] at parallelize at :28\n" + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "data": { + "text/plain": [ + "5050.0" + ] + }, + "execution_count": 2, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "// Sum of the first 100 whole numbers\n", + "val rdd = sc.parallelize(0 to 100)\n", + "rdd.sum()\n", + "// 5050" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Apache Toree - Scala", + "language": "scala", + "name": "apache_toree_scala" + }, + "language_info": { + "codemirror_mode": "text/x-scala", + "file_extension": ".scala", + "mimetype": "text/x-scala", + "name": "scala", + "pygments_lexer": "scala", + "version": "2.11.12" + } + }, + "nbformat": 4, + "nbformat_minor": 4 +} \ No newline at end of file diff --git a/all-spark-notebook/test/test_spark_notebooks.py b/all-spark-notebook/test/test_spark_notebooks.py new file mode 100644 index 0000000000..86eb98df10 --- /dev/null +++ b/all-spark-notebook/test/test_spark_notebooks.py @@ -0,0 +1,35 @@ +# Copyright (c) Jupyter Development Team. +# Distributed under the terms of the Modified BSD License. + +import logging + +import pytest +import os + +LOGGER = logging.getLogger(__name__) + + +@pytest.mark.parametrize( + "test_file", + # TODO: add local_sparklyr + ["local_pyspark", "local_spylon", "local_toree", "local_sparkR"], +) +def test_nbconvert(container, test_file): + """Check if Spark notebooks can be executed""" + host_data_dir = os.path.join(os.path.dirname(os.path.realpath(__file__)), "data") + cont_data_dir = "/home/jovyan/data" + output_dir = "/tmp" + timeout_ms = 600 + LOGGER.info(f"Test that {test_file} notebook can be executed ...") + command = f"jupyter nbconvert --to markdown --ExecutePreprocessor.timeout={timeout_ms} --output-dir {output_dir} --execute {cont_data_dir}/{test_file}.ipynb" + c = container.run( + volumes={host_data_dir: {"bind": cont_data_dir, "mode": "ro"}}, + tty=True, + command=["start.sh", "bash", "-c", command], + ) + rv = c.wait(timeout=timeout_ms / 10 + 10) + assert rv == 0 or rv["StatusCode"] == 0, f"Command {command} failed" + logs = c.logs(stdout=True).decode("utf-8") + LOGGER.debug(logs) + expected_file = f"{output_dir}/{test_file}.md" + assert expected_file in logs, f"Expected file {expected_file} not generated" diff --git a/docs/using/specifics.md b/docs/using/specifics.md index 420b782c0f..50c3ccb016 100644 --- a/docs/using/specifics.md +++ b/docs/using/specifics.md @@ -5,7 +5,8 @@ This page provides details about features specific to one or more images. ## Apache Spark **Specific Docker Image Options** -* `-p 4040:4040` - The `jupyter/pyspark-notebook` and `jupyter/all-spark-notebook` images open [SparkUI (Spark Monitoring and Instrumentation UI)](http://spark.apache.org/docs/latest/monitoring.html) at default port `4040`, this option map `4040` port inside docker container to `4040` port on host machine . 
Note every new spark context that is created is put onto an incrementing port (ie. 4040, 4041, 4042, etc.), and it might be necessary to open multiple ports. For example: `docker run -d -p 8888:8888 -p 4040:4040 -p 4041:4041 jupyter/pyspark-notebook` + +* `-p 4040:4040` - The `jupyter/pyspark-notebook` and `jupyter/all-spark-notebook` images open [SparkUI (Spark Monitoring and Instrumentation UI)](http://spark.apache.org/docs/latest/monitoring.html) at default port `4040`; this option maps the `4040` port inside the Docker container to the `4040` port on the host machine. Note that every new Spark context is created on an incrementing port (i.e. 4040, 4041, 4042, etc.), and it might be necessary to open multiple ports. For example: `docker run -d -p 8888:8888 -p 4040:4040 -p 4041:4041 jupyter/pyspark-notebook`. **Usage Examples** @@ -13,30 +14,66 @@ The `jupyter/pyspark-notebook` and `jupyter/all-spark-notebook` images support t ### Using Spark Local Mode -Spark local mode is useful for experimentation on small data when you do not have a Spark cluster available. +Spark **local mode** is useful for experimentation on small data when you do not have a Spark cluster available. + +#### In Python -#### In a Python Notebook +In a Python notebook. ```python from pyspark.sql import SparkSession -spark = SparkSession.builder.appName("SimpleApp").getOrCreate() -# do something to prove it works -spark.sql('SELECT "Test" as c1').show() + +# Spark session & context +spark = SparkSession.builder.master('local').getOrCreate() +sc = spark.sparkContext + +# Sum of the first 100 whole numbers +rdd = sc.parallelize(range(100 + 1)) +rdd.sum() +# 5050 ``` -#### In a R Notebook +#### In R -```r +In an R notebook with [SparkR][sparkr]. + +```R library(SparkR) -as <- sparkR.session("local[*]") +# Spark session & context +sc <- sparkR.session("local") + +# Sum of the first 100 whole numbers +sdf <- createDataFrame(list(1:100)) +dapplyCollect(sdf, + function(x) + { x <- sum(x)} + ) +# 5050 +``` + +In an R notebook with [sparklyr][sparklyr]. -# do something to prove it works -df <- as.DataFrame(iris) -head(filter(df, df$Petal_Width > 0.2)) +```R +library(sparklyr) + +# Spark configuration +conf <- spark_config() +# Set the catalog implementation in-memory +conf$spark.sql.catalogImplementation <- "in-memory" + +# Spark session & context +sc <- spark_connect(master = "local", config = conf) + +# Sum of the first 100 whole numbers +sdf_len(sc, 100, repartition = 1) %>% + spark_apply(function(e) sum(e)) +# 5050 ``` -#### In a Spylon Kernel Scala Notebook +#### In Scala + +##### In a Spylon Kernel Spylon kernel instantiates a `SparkContext` for you in variable `sc` after you configure Spark options in a `%%init_spark` magic cell. ```python %%init_spark # Configure Spark to use a local master -launcher.master = "local[*]" +launcher.master = "local" ``` ```scala -// Now run Scala code that uses the initialized SparkContext in sc -val rdd = sc.parallelize(0 to 999) -rdd.takeSample(false, 5) +// Sum of the first 100 whole numbers +val rdd = sc.parallelize(0 to 100) +rdd.sum() +// 5050 ``` -#### In an Apache Toree Scala Notebook +##### In an Apache Toree Kernel Apache Toree instantiates a local `SparkContext` for you in variable `sc` when the kernel starts.
```scala -val rdd = sc.parallelize(0 to 999) -rdd.takeSample(false, 5) +// Sum of the first 100 whole numbers +val rdd = sc.parallelize(0 to 100) +rdd.sum() +// 5050 ``` ### Connecting to a Spark Cluster in Standalone Mode -Connection to Spark Cluster on Standalone Mode requires the following set of steps: +Connection to a Spark cluster in **[Standalone Mode](https://spark.apache.org/docs/latest/spark-standalone.html)** requires the following set of steps: 0. Verify that the docker image (check the Dockerfile) and the Spark Cluster which is being deployed, run the same version of Spark. @@ -75,97 +115,104 @@ Connection to Spark Cluster on Standalone Mode requires the following set of ste * NOTE: When using `--net=host`, you must also use the flags `--pid=host -e TINI_SUBREAPER=true`. See https://github.com/jupyter/docker-stacks/issues/64 for details. -#### In a Python Notebook +**Note**: The following examples use the Spark master URL `spark://master:7077`, which should be replaced by the URL of your Spark master. + +#### In Python + +The **same Python version** needs to be used in the notebook (where the driver is located) and on the Spark workers. +The Python version used on the driver and worker sides can be adjusted by setting the environment variables `PYSPARK_PYTHON` and/or `PYSPARK_DRIVER_PYTHON`; see [Spark Configuration][spark-conf] for more information. ```python -import os -# make sure pyspark tells workers to use python3 not 2 if both are installed -os.environ['PYSPARK_PYTHON'] = '/usr/bin/python3' - -import pyspark -conf = pyspark.SparkConf() - -# Point to spark master -conf.setMaster("spark://10.10.10.10:7070") -# point to spark binary package in HDFS or on local filesystem on all slave -# nodes (e.g., file:///opt/spark/spark-2.2.0-bin-hadoop2.7.tgz) -conf.set("spark.executor.uri", "hdfs://10.10.10.10/spark/spark-2.2.0-bin-hadoop2.7.tgz") -# set other options as desired -conf.set("spark.executor.memory", "8g") -conf.set("spark.core.connection.ack.wait.timeout", "1200") - -# create the context -sc = pyspark.SparkContext(conf=conf) - -# do something to prove it works -rdd = sc.parallelize(range(100000000)) -rdd.sumApprox(3) +from pyspark.sql import SparkSession + +# Spark session & context +spark = SparkSession.builder.master('spark://master:7077').getOrCreate() +sc = spark.sparkContext + +# Sum of the first 100 whole numbers +rdd = sc.parallelize(range(100 + 1)) +rdd.sum() +# 5050 ``` -#### In a R Notebook +#### In R -```r +In an R notebook with [SparkR][sparkr]. + +```R library(SparkR) -# Point to spark master -# Point to spark binary package in HDFS or on local filesystem on all worker -# nodes (e.g., file:///opt/spark/spark-2.2.0-bin-hadoop2.7.tgz) in sparkEnvir -# Set other options in sparkEnvir -sc <- sparkR.session( - "spark://10.10.10.10:7070", - sparkEnvir=list( - spark.executor.uri="hdfs://10.10.10.10/spark/spark-2.4.3-bin-hadoop2.7.tgz", - spark.executor.memory="8g" - ) -) - -# do something to prove it works -data(iris) -df <- as.DataFrame(iris) -head(filter(df, df$Petal_Width > 0.2)) +# Spark session & context +sc <- sparkR.session("spark://master:7077") + +# Sum of the first 100 whole numbers +sdf <- createDataFrame(list(1:100)) +dapplyCollect(sdf, + function(x) + { x <- sum(x)} + ) +# 5050 ``` -#### In a Spylon Kernel Scala Notebook +In an R notebook with [sparklyr][sparklyr].
+ +```R +library(sparklyr) + +# Spark configuration +conf <- spark_config() +# Set the catalog implementation in-memory +conf$spark.sql.catalogImplementation <- "in-memory" +# Spark session & context +sc <- spark_connect(master = "spark://master:7077", config = conf) + +# Sum of the first 100 whole numbers +sdf_len(sc, 100, repartition = 1) %>% + spark_apply(function(e) sum(e)) +# 5050 +``` + +#### In Scala + +##### In a Spylon Kernel + +Spylon kernel instantiates a `SparkContext` for you in variable `sc` after you configure Spark +options in a `%%init_spark` magic cell. ```python %%init_spark -# Point to spark master -launcher.master = "spark://10.10.10.10:7070" -launcher.conf.spark.executor.uri=hdfs://10.10.10.10/spark/spark-2.4.3-bin-hadoop2.7.tgz +# Configure Spark to connect to the Spark master +launcher.master = "spark://master:7077" ``` ```scala -// Now run Scala code that uses the initialized SparkContext in sc -val rdd = sc.parallelize(0 to 999) -rdd.takeSample(false, 5) +// Sum of the first 100 whole numbers +val rdd = sc.parallelize(0 to 100) +rdd.sum() +// 5050 ``` -#### In an Apache Toree Scala Notebook +##### In an Apache Toree Kernel -The Apache Toree kernel automatically creates a `SparkContext` when it starts based on configuration -information from its command line arguments and environment variables. You can pass information -about your cluster via the `SPARK_OPTS` environment variable when you spawn a container. +The Apache Toree kernel automatically creates a `SparkContext` when it starts based on configuration information from its command line arguments and environment variables. You can pass information about your cluster via the `SPARK_OPTS` environment variable when you spawn a container. -For instance, to pass information about a standalone Spark master, Spark binary location in HDFS, -and an executor options, you could start the container like so: +For instance, to pass information about a standalone Spark master, you could start the container like so: ```bash -docker run -d -p 8888:8888 -e SPARK_OPTS='--master=spark://10.10.10.10:7070 \ - --spark.executor.uri=hdfs://10.10.10.10/spark/spark-2.4.3-bin-hadoop2.7.tgz \ - --spark.executor.memory=8g' jupyter/all-spark-notebook +docker run -d -p 8888:8888 -e SPARK_OPTS='--master=spark://master:7077' \ + jupyter/all-spark-notebook ``` -Note that this is the same information expressed in a notebook in the Python case above. Once the -kernel spec has your cluster information, you can test your cluster in an Apache Toree notebook like -so: +Note that this is the same information expressed in a notebook in the Python case above. Once the kernel spec has your cluster information, you can test your cluster in an Apache Toree notebook like so: ```scala // should print the value of --master in the kernel spec println(sc.master) -// do something to prove it works -val rdd = sc.parallelize(0 to 99999999) +// Sum of the first 100 whole numbers +val rdd = sc.parallelize(0 to 100) rdd.sum() +// 5050 ``` ## Tensorflow @@ -201,3 +248,7 @@ init = tf.global_variables_initializer() sess.run(init) sess.run(hello) ``` + +[sparkr]: https://spark.apache.org/docs/latest/sparkr.html +[sparklyr]: https://spark.rstudio.com/ +[spark-conf]: https://spark.apache.org/docs/latest/configuration.html \ No newline at end of file
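For reference, the new test drives each notebook through the `jupyter nbconvert --execute` CLI inside the container. Below is a minimal sketch of the equivalent in-process execution using nbconvert's Python API; the notebook path, kernel name, and output location are illustrative assumptions rather than part of the change above, and it presumes a local checkout with `nbconvert`, `nbformat`, and `pyspark` installed.

```python
# Sketch only: execute one of the test notebooks with nbconvert's Python API,
# mirroring the `jupyter nbconvert --to markdown --execute` command used in
# test_spark_notebooks.py.
import nbformat
from nbconvert.preprocessors import ExecutePreprocessor

notebook_path = "all-spark-notebook/test/data/local_pyspark.ipynb"  # assumed checkout-relative path

with open(notebook_path) as f:
    nb = nbformat.read(f, as_version=4)

# Same 600-second per-cell timeout as --ExecutePreprocessor.timeout in the test
ep = ExecutePreprocessor(timeout=600, kernel_name="python3")
ep.preprocess(nb, {"metadata": {"path": "all-spark-notebook/test/data"}})

# Persist the executed notebook; the test converts to Markdown instead and
# checks that the expected output file is reported in the container logs.
with open("/tmp/local_pyspark.executed.ipynb", "w") as f:
    nbformat.write(nb, f)
```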