From ef25cb3b59209eea28b7b1bd02cedeacfe71c907 Mon Sep 17 00:00:00 2001 From: Felix Cheung Date: Wed, 26 Apr 2017 01:32:07 -0700 Subject: [PATCH 1/7] programming guide and example --- .../structured-streaming-programming-guide.md | 501 +++++++++++++----- .../streaming/structured_network_wordcount.R | 57 ++ 2 files changed, 412 insertions(+), 146 deletions(-) create mode 100644 examples/src/main/r/streaming/structured_network_wordcount.R diff --git a/docs/structured-streaming-programming-guide.md b/docs/structured-streaming-programming-guide.md index 5b18cf2f3c2e..a8810e132486 100644 --- a/docs/structured-streaming-programming-guide.md +++ b/docs/structured-streaming-programming-guide.md @@ -8,13 +8,13 @@ title: Structured Streaming Programming Guide {:toc} # Overview -Structured Streaming is a scalable and fault-tolerant stream processing engine built on the Spark SQL engine. You can express your streaming computation the same way you would express a batch computation on static data. The Spark SQL engine will take care of running it incrementally and continuously and updating the final result as streaming data continues to arrive. You can use the [Dataset/DataFrame API](sql-programming-guide.html) in Scala, Java or Python to express streaming aggregations, event-time windows, stream-to-batch joins, etc. The computation is executed on the same optimized Spark SQL engine. Finally, the system ensures end-to-end exactly-once fault-tolerance guarantees through checkpointing and Write Ahead Logs. In short, *Structured Streaming provides fast, scalable, fault-tolerant, end-to-end exactly-once stream processing without the user having to reason about streaming.* +Structured Streaming is a scalable and fault-tolerant stream processing engine built on the Spark SQL engine. You can express your streaming computation the same way you would express a batch computation on static data. The Spark SQL engine will take care of running it incrementally and continuously and updating the final result as streaming data continues to arrive. You can use the [Dataset/DataFrame API](sql-programming-guide.html) in Scala, Java, Python or R to express streaming aggregations, event-time windows, stream-to-batch joins, etc. The computation is executed on the same optimized Spark SQL engine. Finally, the system ensures end-to-end exactly-once fault-tolerance guarantees through checkpointing and Write Ahead Logs. In short, *Structured Streaming provides fast, scalable, fault-tolerant, end-to-end exactly-once stream processing without the user having to reason about streaming.* -**Structured Streaming is still ALPHA in Spark 2.1** and the APIs are still experimental. In this guide, we are going to walk you through the programming model and the APIs. First, let's start with a simple example - a streaming word count. +**Structured Streaming is still ALPHA in Spark 2.1** and the APIs are still experimental. In this guide, we are going to walk you through the programming model and the APIs. First, let's start with a simple example - a streaming word count. # Quick Example -Let’s say you want to maintain a running word count of text data received from a data server listening on a TCP socket. Let’s see how you can express this using Structured Streaming. 
You can see the full code in -[Scala]({{site.SPARK_GITHUB_URL}}/blob/v{{site.SPARK_VERSION_SHORT}}/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredNetworkWordCount.scala)/[Java]({{site.SPARK_GITHUB_URL}}/blob/v{{site.SPARK_VERSION_SHORT}}/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCount.java)/[Python]({{site.SPARK_GITHUB_URL}}/blob/v{{site.SPARK_VERSION_SHORT}}/examples/src/main/python/sql/streaming/structured_network_wordcount.py). +Let’s say you want to maintain a running word count of text data received from a data server listening on a TCP socket. Let’s see how you can express this using Structured Streaming. You can see the full code in +[Scala]({{site.SPARK_GITHUB_URL}}/blob/v{{site.SPARK_VERSION_SHORT}}/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredNetworkWordCount.scala)/[Java]({{site.SPARK_GITHUB_URL}}/blob/v{{site.SPARK_VERSION_SHORT}}/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCount.java)/[Python]({{site.SPARK_GITHUB_URL}}/blob/v{{site.SPARK_VERSION_SHORT}}/examples/src/main/python/sql/streaming/structured_network_wordcount.py)/[R]({{site.SPARK_GITHUB_URL}}/blob/v{{site.SPARK_VERSION_SHORT}}/examples/src/main/r/streaming/structured_network_wordcount.R). And if you [download Spark](http://spark.apache.org/downloads.html), you can directly run the example. In any case, let’s walk through the example step-by-step and understand how it works. First, we have to import the necessary classes and create a local SparkSession, the starting point of all functionalities related to Spark.
@@ -28,7 +28,7 @@ val spark = SparkSession .builder .appName("StructuredNetworkWordCount") .getOrCreate() - + import spark.implicits._ {% endhighlight %} @@ -63,6 +63,13 @@ spark = SparkSession \ .getOrCreate() {% endhighlight %} +
+
+ +{% highlight r %} +sparkR.session(appName = "StructuredNetworkWordCount") +{% endhighlight %} +
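In R, the equivalent of the imports above is attaching the SparkR package before creating the session; the bundled R example script does exactly this. A minimal sketch:

{% highlight r %}
# Load SparkR library into your R session, then create the SparkSession
library(SparkR)

sparkR.session(appName = "StructuredNetworkWordCount")
{% endhighlight %}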
@@ -136,6 +143,22 @@ wordCounts = words.groupBy("word").count() This `lines` DataFrame represents an unbounded table containing the streaming text data. This table contains one column of strings named "value", and each line in the streaming text data becomes a row in the table. Note, that this is not currently receiving any data as we are just setting up the transformation, and have not yet started it. Next, we have used two built-in SQL functions - split and explode, to split each line into multiple rows with a word each. In addition, we use the function `alias` to name the new column as "word". Finally, we have defined the `wordCounts` DataFrame by grouping by the unique values in the Dataset and counting them. Note that this is a streaming DataFrame which represents the running word counts of the stream. + +
+ +{% highlight r %} +# Create DataFrame representing the stream of input lines from connection to localhost:9999 +lines <- read.stream("socket", host = "localhost", port = 9999) + +# Split the lines into words +words <- selectExpr(lines, "explode(split(value, ' ')) as word") + +# Generate running word count +wordCounts <- count(group_by(words, "word")) +{% endhighlight %} + +This `lines` SparkDataFrame represents an unbounded table containing the streaming text data. This table contains one column of strings named "value", and each line in the streaming text data becomes a row in the table. Note, that this is not currently receiving any data as we are just setting up the transformation, and have not yet started it. Next, we have a SQL expression with two SQL functions - split and explode, to split each line into multiple rows with a word each. In addition, we name the new column as "word". Finally, we have defined the `wordCounts` DataFrame by grouping by the unique values in the SparkDataFrame and counting them. Note that this is a streaming SparkDataFrame which represents the running word counts of the stream. +
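Before wiring up the output, it can be reassuring to confirm that these are indeed streaming SparkDataFrames and that the derived schema is what we expect. A small sketch using functions described later in this guide:

{% highlight r %}
isStreaming(wordCounts)   # TRUE, since it was derived from a streaming source
printSchema(wordCounts)   # the grouping column "word" plus the running "count"
{% endhighlight %}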
@@ -181,13 +204,23 @@ query = wordCounts \ query.awaitTermination() {% endhighlight %} + +
+ +{% highlight r %} +# Start running the query that prints the running counts to the console +query <- write.stream(wordCounts, "console", outputMode = "complete") + +awaitTermination(query) +{% endhighlight %} +
-After this code is executed, the streaming computation will have started in the background. The `query` object is a handle to that active streaming query, and we have decided to wait for the termination of the query using `query.awaitTermination()` to prevent the process from exiting while the query is active. +After this code is executed, the streaming computation will have started in the background. The `query` object is a handle to that active streaming query, and we have decided to wait for the termination of the query using `awaitTermination()` to prevent the process from exiting while the query is active. -To actually execute this example code, you can either compile the code in your own -[Spark application](quick-start.html#self-contained-applications), or simply +To actually execute this example code, you can either compile the code in your own +[Spark application](quick-start.html#self-contained-applications), or simply [run the example](index.html#running-the-examples-and-shell) once you have downloaded Spark. We are showing the latter. You will first need to run Netcat (a small utility found in most Unix-like systems) as a data server by using @@ -211,6 +244,11 @@ $ ./bin/run-example org.apache.spark.examples.sql.streaming.JavaStructuredNetwor $ ./bin/spark-submit examples/src/main/python/sql/streaming/structured_network_wordcount.py localhost 9999 {% endhighlight %} +
+{% highlight bash %} +$ ./bin/spark-submit examples/src/main/r/streaming/structured_network_wordcount.R localhost 9999 +{% endhighlight %} +
Then, any lines typed in the terminal running the netcat server will be counted and printed on screen every second. It will look something like the following. @@ -325,6 +363,35 @@ Batch: 0 | spark| 1| +------+-----+ +------------------------------------------- +Batch: 1 +------------------------------------------- ++------+-----+ +| value|count| ++------+-----+ +|apache| 2| +| spark| 1| +|hadoop| 1| ++------+-----+ +... +{% endhighlight %} + +
+{% highlight bash %} +# TERMINAL 2: RUNNING structured_network_wordcount.R + +$ ./bin/spark-submit examples/src/main/r/streaming/structured_network_wordcount.R localhost 9999 + +------------------------------------------- +Batch: 0 +------------------------------------------- ++------+-----+ +| value|count| ++------+-----+ +|apache| 1| +| spark| 1| ++------+-----+ + ------------------------------------------- Batch: 1 ------------------------------------------- @@ -345,62 +412,62 @@ Batch: 1 # Programming Model -The key idea in Structured Streaming is to treat a live data stream as a -table that is being continuously appended. This leads to a new stream -processing model that is very similar to a batch processing model. You will -express your streaming computation as standard batch-like query as on a static -table, and Spark runs it as an *incremental* query on the *unbounded* input +The key idea in Structured Streaming is to treat a live data stream as a +table that is being continuously appended. This leads to a new stream +processing model that is very similar to a batch processing model. You will +express your streaming computation as standard batch-like query as on a static +table, and Spark runs it as an *incremental* query on the *unbounded* input table. Let’s understand this model in more detail. ## Basic Concepts -Consider the input data stream as the "Input Table". Every data item that is +Consider the input data stream as the "Input Table". Every data item that is arriving on the stream is like a new row being appended to the Input Table. ![Stream as a Table](img/structured-streaming-stream-as-a-table.png "Stream as a Table") -A query on the input will generate the "Result Table". Every trigger interval (say, every 1 second), new rows get appended to the Input Table, which eventually updates the Result Table. Whenever the result table gets updated, we would want to write the changed result rows to an external sink. +A query on the input will generate the "Result Table". Every trigger interval (say, every 1 second), new rows get appended to the Input Table, which eventually updates the Result Table. Whenever the result table gets updated, we would want to write the changed result rows to an external sink. ![Model](img/structured-streaming-model.png) The "Output" is defined as what gets written out to the external storage. The output can be defined in a different mode: - - *Complete Mode* - The entire updated Result Table will be written to the external storage. It is up to the storage connector to decide how to handle writing of the entire table. + - *Complete Mode* - The entire updated Result Table will be written to the external storage. It is up to the storage connector to decide how to handle writing of the entire table. - *Append Mode* - Only the new rows appended in the Result Table since the last trigger will be written to the external storage. This is applicable only on the queries where existing rows in the Result Table are not expected to change. - + - *Update Mode* - Only the rows that were updated in the Result Table since the last trigger will be written to the external storage (available since Spark 2.1.1). Note that this is different from the Complete Mode in that this mode only outputs the rows that have changed since the last trigger. If the query doesn't contain aggregations, it will be equivalent to Append mode. Note that each mode is applicable on certain types of queries. This is discussed in detail [later](#output-modes). 
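In the language APIs the output mode is simply a parameter of the write path; for example, in the R API shown in this guide it is the `outputMode` argument of `write.stream()`. A minimal sketch reusing the `wordCounts` table from the quick example:

{% highlight r %}
# Complete mode - rewrite the entire Result Table on every trigger
query <- write.stream(wordCounts, "console", outputMode = "complete")

# "append" and "update" are the other accepted values, subject to the
# per-query restrictions described in the Output Modes section later on
{% endhighlight %}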
-To illustrate the use of this model, let’s understand the model in context of -the [Quick Example](#quick-example) above. The first `lines` DataFrame is the input table, and -the final `wordCounts` DataFrame is the result table. Note that the query on -streaming `lines` DataFrame to generate `wordCounts` is *exactly the same* as -it would be a static DataFrame. However, when this query is started, Spark -will continuously check for new data from the socket connection. If there is -new data, Spark will run an "incremental" query that combines the previous +To illustrate the use of this model, let’s understand the model in context of +the [Quick Example](#quick-example) above. The first `lines` DataFrame is the input table, and +the final `wordCounts` DataFrame is the result table. Note that the query on +streaming `lines` DataFrame to generate `wordCounts` is *exactly the same* as +it would be a static DataFrame. However, when this query is started, Spark +will continuously check for new data from the socket connection. If there is +new data, Spark will run an "incremental" query that combines the previous running counts with the new data to compute updated counts, as shown below. ![Model](img/structured-streaming-example-model.png) -This model is significantly different from many other stream processing -engines. Many streaming systems require the user to maintain running -aggregations themselves, thus having to reason about fault-tolerance, and -data consistency (at-least-once, or at-most-once, or exactly-once). In this -model, Spark is responsible for updating the Result Table when there is new -data, thus relieving the users from reasoning about it. As an example, let’s +This model is significantly different from many other stream processing +engines. Many streaming systems require the user to maintain running +aggregations themselves, thus having to reason about fault-tolerance, and +data consistency (at-least-once, or at-most-once, or exactly-once). In this +model, Spark is responsible for updating the Result Table when there is new +data, thus relieving the users from reasoning about it. As an example, let’s see how this model handles event-time based processing and late arriving data. ## Handling Event-time and Late Data Event-time is the time embedded in the data itself. For many applications, you may want to operate on this event-time. For example, if you want to get the number of events generated by IoT devices every minute, then you probably want to use the time when the data was generated (that is, event-time in the data), rather than the time Spark receives them. This event-time is very naturally expressed in this model -- each event from the devices is a row in the table, and event-time is a column value in the row. This allows window-based aggregations (e.g. number of events every minute) to be just a special type of grouping and aggregation on the event-time column -- each time window is a group and each row can belong to multiple windows/groups. Therefore, such event-time-window-based aggregation queries can be defined consistently on both a static dataset (e.g. from collected device events logs) as well as on a data stream, making the life of the user much easier. -Furthermore, this model naturally handles data that has arrived later than -expected based on its event-time. 
Since Spark is updating the Result Table, -it has full control over updating old aggregates when there is late data, +Furthermore, this model naturally handles data that has arrived later than +expected based on its event-time. Since Spark is updating the Result Table, +it has full control over updating old aggregates when there is late data, as well as cleaning up old aggregates to limit the size of intermediate -state data. Since Spark 2.1, we have support for watermarking which +state data. Since Spark 2.1, we have support for watermarking which allows the user to specify the threshold of late data, and allows the engine -to accordingly clean up old state. These are explained later in more +to accordingly clean up old state. These are explained later in more detail in the [Window Operations](#window-operations-on-event-time) section. ## Fault Tolerance Semantics @@ -409,14 +476,14 @@ to track the read position in the stream. The engine uses checkpointing and writ # API using Datasets and DataFrames Since Spark 2.0, DataFrames and Datasets can represent static, bounded data, as well as streaming, unbounded data. Similar to static Datasets/DataFrames, you can use the common entry point `SparkSession` -([Scala](api/scala/index.html#org.apache.spark.sql.SparkSession)/[Java](api/java/org/apache/spark/sql/SparkSession.html)/[Python](api/python/pyspark.sql.html#pyspark.sql.SparkSession) docs) +([Scala](api/scala/index.html#org.apache.spark.sql.SparkSession)/[Java](api/java/org/apache/spark/sql/SparkSession.html)/[Python](api/python/pyspark.sql.html#pyspark.sql.SparkSession)/[R](api/R/sparkR.session.html) docs) to create streaming DataFrames/Datasets from streaming sources, and apply the same operations on them as static DataFrames/Datasets. If you are not familiar with Datasets/DataFrames, you are strongly advised to familiarize yourself with them using the [DataFrame/Dataset Programming Guide](sql-programming-guide.html). ## Creating streaming DataFrames and streaming Datasets -Streaming DataFrames can be created through the `DataStreamReader` interface +Streaming DataFrames can be created through the `DataStreamReader` interface ([Scala](api/scala/index.html#org.apache.spark.sql.streaming.DataStreamReader)/[Java](api/java/org/apache/spark/sql/streaming/DataStreamReader.html)/[Python](api/python/pyspark.sql.html#pyspark.sql.streaming.DataStreamReader) docs) -returned by `SparkSession.readStream()`. Similar to the read interface for creating static DataFrame, you can specify the details of the source – data format, schema, options, etc. +returned by `SparkSession.readStream()`. In [R](api/R/read.stream.html), with the `read.stream()` method. Similar to the read interface for creating static DataFrame, you can specify the details of the source – data format, schema, options, etc. #### Input Sources In Spark 2.0, there are a few built-in sources. @@ -425,10 +492,10 @@ In Spark 2.0, there are a few built-in sources. - **Kafka source** - Poll data from Kafka. It's compatible with Kafka broker versions 0.10.0 or higher. See the [Kafka Integration Guide](structured-streaming-kafka-integration.html) for more details. - - **Socket source (for testing)** - Reads UTF8 text data from a socket connection. The listening server socket is at the driver. Note that this should be used only for testing as this does not provide end-to-end fault-tolerance guarantees. + - **Socket source (for testing)** - Reads UTF8 text data from a socket connection. The listening server socket is at the driver. 
Note that this should be used only for testing as this does not provide end-to-end fault-tolerance guarantees. -Some sources are not fault-tolerant because they do not guarantee that data can be replayed using -checkpointed offsets after a failure. See the earlier section on +Some sources are not fault-tolerant because they do not guarantee that data can be replayed using +checkpointed offsets after a failure. See the earlier section on [fault-tolerance semantics](#fault-tolerance-semantics). Here are the details of all the sources in Spark. @@ -445,7 +512,8 @@ Here are the details of all the sources in Spark. path: path to the input directory, and common to all file formats.

For file-format-specific options, see the related methods in DataStreamReader - (Scala/Java/Python). + (Scala/Java/Python/R). E.g. for "parquet" format options see DataStreamReader.parquet() Yes Supports glob paths, but does not support multiple comma-separated paths/globs. @@ -483,7 +551,7 @@ Here are some examples. {% highlight scala %} val spark: SparkSession = ... -// Read text from socket +// Read text from socket val socketDF = spark .readStream .format("socket") @@ -493,7 +561,7 @@ val socketDF = spark socketDF.isStreaming // Returns True for DataFrames that have streaming sources -socketDF.printSchema +socketDF.printSchema // Read all the csv files written atomically in a directory val userSchema = new StructType().add("name", "string").add("age", "integer") @@ -510,7 +578,7 @@ val csvDF = spark {% highlight java %} SparkSession spark = ... -// Read text from socket +// Read text from socket Dataset socketDF = spark .readStream() .format("socket") @@ -537,7 +605,7 @@ Dataset csvDF = spark {% highlight python %} spark = SparkSession. ... -# Read text from socket +# Read text from socket socketDF = spark \ .readStream \ .format("socket") \ @@ -547,7 +615,7 @@ socketDF = spark \ socketDF.isStreaming() # Returns True for DataFrames that have streaming sources -socketDF.printSchema() +socketDF.printSchema() # Read all the csv files written atomically in a directory userSchema = StructType().add("name", "string").add("age", "integer") @@ -558,6 +626,25 @@ csvDF = spark \ .csv("/path/to/directory") # Equivalent to format("csv").load("/path/to/directory") {% endhighlight %} +
+
+ +{% highlight r %} +sparkR.session(...) + +# Read text from socket +socketDF <- read.stream("socket", host = hostname, port = port) + +isStreaming(socketDF) # Returns TRUE for SparkDataFrames that have streaming sources + +printSchema(socketDF) + +# Read all the csv files written atomically in a directory +schema <- structType(structField("name", "string"), + structField("age", "integer")) +csvDF <- read.stream("csv", path = "/path/to/directory", schema = schema, sep = ";") +{% endhighlight %} +
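The same `read.stream()` call covers the other built-in file formats as well; for example, a directory of JSON files can be read as a stream by swapping the format name. The path below is a placeholder, and the schema defined above is reused:

{% highlight r %}
# Read all the JSON files written atomically in a directory
jsonDF <- read.stream("json", path = "/path/to/json/directory", schema = schema)
{% endhighlight %}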
@@ -638,16 +725,28 @@ ds.groupByKey((MapFunction) value -> value.getDeviceType(), df = ... # streaming DataFrame with IOT device data with schema { device: string, deviceType: string, signal: double, time: DateType } # Select the devices which have signal more than 10 -df.select("device").where("signal > 10") +df.select("device").where("signal > 10") # Running count of the number of updates for each device type df.groupBy("deviceType").count() {% endhighlight %} +
+ +{% highlight r %} +df <- ... # streaming DataFrame with IOT device data with schema { device: string, deviceType: string, signal: double, time: DateType } + +# Select the devices which have signal more than 10 +select(where(df, "signal > 10"), "device") + +# Running count of the number of updates for each device type +count(groupBy(df, "deviceType")) +{% endhighlight %} +
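The same untyped operations can also be expressed with SparkR column expressions instead of SQL strings; a roughly equivalent sketch for the selection above:

{% highlight r %}
# Select the devices which have signal more than 10, using column expressions
select(filter(df, df$signal > 10), "device")
{% endhighlight %}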
### Window Operations on Event Time -Aggregations over a sliding event-time window are straightforward with Structured Streaming and are very similar to grouped aggregations. In a grouped aggregation, aggregate values (e.g. counts) are maintained for each unique value in the user-specified grouping column. In case of window-based aggregations, aggregate values are maintained for each window the event-time of a row falls into. Let's understand this with an illustration. +Aggregations over a sliding event-time window are straightforward with Structured Streaming and are very similar to grouped aggregations. In a grouped aggregation, aggregate values (e.g. counts) are maintained for each unique value in the user-specified grouping column. In case of window-based aggregations, aggregate values are maintained for each window the event-time of a row falls into. Let's understand this with an illustration. Imagine our [quick example](#quick-example) is modified and the stream now contains lines along with the time when the line was generated. Instead of running word counts, we want to count words within 10 minute windows, updating every 5 minutes. That is, word counts in words received between 10 minute windows 12:00 - 12:10, 12:05 - 12:15, 12:10 - 12:20, etc. Note that 12:00 - 12:10 means data that arrived after 12:00 but before 12:10. Now, consider a word that was received at 12:07. This word should increment the counts corresponding to two windows 12:00 - 12:10 and 12:05 - 12:15. So the counts will be indexed by both, the grouping key (i.e. the word) and the window (can be calculated from the event-time). @@ -704,26 +803,26 @@ windowedCounts = words.groupBy( ### Handling Late Data and Watermarking Now consider what happens if one of the events arrives late to the application. -For example, say, a word generated at 12:04 (i.e. event time) could be received by +For example, say, a word generated at 12:04 (i.e. event time) could be received by the application at 12:11. The application should use the time 12:04 instead of 12:11 -to update the older counts for the window `12:00 - 12:10`. This occurs -naturally in our window-based grouping – Structured Streaming can maintain the intermediate state -for partial aggregates for a long period of time such that late data can update aggregates of +to update the older counts for the window `12:00 - 12:10`. This occurs +naturally in our window-based grouping – Structured Streaming can maintain the intermediate state +for partial aggregates for a long period of time such that late data can update aggregates of old windows correctly, as illustrated below. ![Handling Late Data](img/structured-streaming-late-data.png) -However, to run this query for days, it's necessary for the system to bound the amount of -intermediate in-memory state it accumulates. This means the system needs to know when an old -aggregate can be dropped from the in-memory state because the application is not going to receive -late data for that aggregate any more. To enable this, in Spark 2.1, we have introduced +However, to run this query for days, it's necessary for the system to bound the amount of +intermediate in-memory state it accumulates. This means the system needs to know when an old +aggregate can be dropped from the in-memory state because the application is not going to receive +late data for that aggregate any more. 
To enable this, in Spark 2.1, we have introduced **watermarking**, which lets the engine automatically track the current event time in the data -and attempt to clean up old state accordingly. You can define the watermark of a query by -specifying the event time column and the threshold on how late the data is expected to be in terms of +and attempt to clean up old state accordingly. You can define the watermark of a query by +specifying the event time column and the threshold on how late the data is expected to be in terms of event time. For a specific window starting at time `T`, the engine will maintain state and allow late -data to update the state until `(max event time seen by the engine - late threshold > T)`. -In other words, late data within the threshold will be aggregated, -but data later than the threshold will be dropped. Let's understand this with an example. We can +data to update the state until `(max event time seen by the engine - late threshold > T)`. +In other words, late data within the threshold will be aggregated, +but data later than the threshold will be dropped. Let's understand this with an example. We can easily define watermarking on the previous example using `withWatermark()` as shown below.
@@ -775,27 +874,27 @@ windowedCounts = words \
-In this example, we are defining the watermark of the query on the value of the column "timestamp", -and also defining "10 minutes" as the threshold of how late is the data allowed to be. If this query -is run in Update output mode (discussed later in [Output Modes](#output-modes) section), +In this example, we are defining the watermark of the query on the value of the column "timestamp", +and also defining "10 minutes" as the threshold of how late is the data allowed to be. If this query +is run in Update output mode (discussed later in [Output Modes](#output-modes) section), the engine will keep updating counts of a window in the Result Table until the window is older than the watermark, which lags behind the current event time in column "timestamp" by 10 minutes. -Here is an illustration. +Here is an illustration. ![Watermarking in Update Mode](img/structured-streaming-watermark-update-mode.png) -As shown in the illustration, the maximum event time tracked by the engine is the +As shown in the illustration, the maximum event time tracked by the engine is the *blue dashed line*, and the watermark set as `(max event time - '10 mins')` -at the beginning of every trigger is the red line For example, when the engine observes the data +at the beginning of every trigger is the red line For example, when the engine observes the data `(12:14, dog)`, it sets the watermark for the next trigger as `12:04`. This watermark lets the engine maintain intermediate state for additional 10 minutes to allow late data to be counted. For example, the data `(12:09, cat)` is out of order and late, and it falls in -windows `12:05 - 12:15` and `12:10 - 12:20`. Since, it is still ahead of the watermark `12:04` in -the trigger, the engine still maintains the intermediate counts as state and correctly updates the -counts of the related windows. However, when the watermark is updated to `12:11`, the intermediate -state for window `(12:00 - 12:10)` is cleared, and all subsequent data (e.g. `(12:04, donkey)`) -is considered "too late" and therefore ignored. Note that after every trigger, -the updated counts (i.e. purple rows) are written to sink as the trigger output, as dictated by +windows `12:05 - 12:15` and `12:10 - 12:20`. Since, it is still ahead of the watermark `12:04` in +the trigger, the engine still maintains the intermediate counts as state and correctly updates the +counts of the related windows. However, when the watermark is updated to `12:11`, the intermediate +state for window `(12:00 - 12:10)` is cleared, and all subsequent data (e.g. `(12:04, donkey)`) +is considered "too late" and therefore ignored. Note that after every trigger, +the updated counts (i.e. purple rows) are written to sink as the trigger output, as dictated by the Update mode. Some sinks (e.g. files) may not supported fine-grained updates that Update Mode requires. To work @@ -804,31 +903,31 @@ This is illustrated below. ![Watermarking in Append Mode](img/structured-streaming-watermark-append-mode.png) -Similar to the Update Mode earlier, the engine maintains intermediate counts for each window. +Similar to the Update Mode earlier, the engine maintains intermediate counts for each window. However, the partial counts are not updated to the Result Table and not written to sink. The engine -waits for "10 mins" for late date to be counted, +waits for "10 mins" for late date to be counted, then drops intermediate state of a window < watermark, and appends the final -counts to the Result Table/sink. 
For example, the final counts of window `12:00 - 12:10` is -appended to the Result Table only after the watermark is updated to `12:11`. +counts to the Result Table/sink. For example, the final counts of window `12:00 - 12:10` is +appended to the Result Table only after the watermark is updated to `12:11`. **Conditions for watermarking to clean aggregation state** -It is important to note that the following conditions must be satisfied for the watermarking to +It is important to note that the following conditions must be satisfied for the watermarking to clean the state in aggregation queries *(as of Spark 2.1.1, subject to change in the future)*. -- **Output mode must be Append or Update.** Complete mode requires all aggregate data to be preserved, -and hence cannot use watermarking to drop intermediate state. See the [Output Modes](#output-modes) +- **Output mode must be Append or Update.** Complete mode requires all aggregate data to be preserved, +and hence cannot use watermarking to drop intermediate state. See the [Output Modes](#output-modes) section for detailed explanation of the semantics of each output mode. -- The aggregation must have either the event-time column, or a `window` on the event-time column. +- The aggregation must have either the event-time column, or a `window` on the event-time column. -- `withWatermark` must be called on the -same column as the timestamp column used in the aggregate. For example, -`df.withWatermark("time", "1 min").groupBy("time2").count()` is invalid +- `withWatermark` must be called on the +same column as the timestamp column used in the aggregate. For example, +`df.withWatermark("time", "1 min").groupBy("time2").count()` is invalid in Append output mode, as watermark is defined on a different column from the aggregation column. -- `withWatermark` must be called before the aggregation for the watermark details to be used. -For example, `df.groupBy("time").count().withWatermark("time", "1 min")` is invalid in Append +- `withWatermark` must be called before the aggregation for the watermark details to be used. +For example, `df.groupBy("time").count().withWatermark("time", "1 min")` is invalid in Append output mode. @@ -840,7 +939,7 @@ Streaming DataFrames can be joined with static DataFrames to create new streamin {% highlight scala %} val staticDf = spark.read. ... -val streamingDf = spark.readStream. ... +val streamingDf = spark.readStream. ... streamingDf.join(staticDf, "type") // inner equi-join with a static DF streamingDf.join(staticDf, "type", "right_join") // right outer join with a static DF @@ -928,12 +1027,12 @@ streamingDf \ ### Arbitrary Stateful Operations -Many uscases require more advanced stateful operations than aggregations. For example, in many usecases, you have to track sessions from data streams of events. For doing such sessionization, you will have to save arbitrary types of data as state, and perform arbitrary operations on the state using the data stream events in every trigger. Since Spark 2.2, this can be done using the operation `mapGroupsWithState` and the more powerful operation `flatMapGroupsWithState`. Both operations allow you to apply user-defined code on grouped Datasets to update user-defined state. 
For more concrete details, take a look at the API documentation ([Scala](api/scala/index.html#org.apache.spark.sql.streaming.GroupState)/[Java](api/java/org/apache/spark/sql/streaming/GroupState.html)) and the examples ([Scala]({{site.SPARK_GITHUB_URL}}/blob/v{{site.SPARK_VERSION_SHORT}}/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredSessionization.scala)/[Java]({{site.SPARK_GITHUB_URL}}/blob/v{{site.SPARK_VERSION_SHORT}}/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredSessionization.java)). +Many uscases require more advanced stateful operations than aggregations. For example, in many usecases, you have to track sessions from data streams of events. For doing such sessionization, you will have to save arbitrary types of data as state, and perform arbitrary operations on the state using the data stream events in every trigger. Since Spark 2.2, this can be done using the operation `mapGroupsWithState` and the more powerful operation `flatMapGroupsWithState`. Both operations allow you to apply user-defined code on grouped Datasets to update user-defined state. For more concrete details, take a look at the API documentation ([Scala](api/scala/index.html#org.apache.spark.sql.streaming.GroupState)/[Java](api/java/org/apache/spark/sql/streaming/GroupState.html)) and the examples ([Scala]({{site.SPARK_GITHUB_URL}}/blob/v{{site.SPARK_VERSION_SHORT}}/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredSessionization.scala)/[Java]({{site.SPARK_GITHUB_URL}}/blob/v{{site.SPARK_VERSION_SHORT}}/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredSessionization.java)). ### Unsupported Operations -There are a few DataFrame/Dataset operations that are not supported with streaming DataFrames/Datasets. +There are a few DataFrame/Dataset operations that are not supported with streaming DataFrames/Datasets. Some of them are as follows. - + - Multiple streaming aggregations (i.e. a chain of aggregations on a streaming DF) are not yet supported on streaming Datasets. - Limit and take first N rows are not supported on streaming Datasets. @@ -954,17 +1053,17 @@ Some of them are as follows. In addition, there are some Dataset methods that will not work on streaming Datasets. They are actions that will immediately run queries and return results, which does not make sense on a streaming Dataset. Rather, those functionalities can be done by explicitly starting a streaming query (see the next section regarding that). -- `count()` - Cannot return a single count from a streaming Dataset. Instead, use `ds.groupBy.count()` which returns a streaming Dataset containing a running count. +- `count()` - Cannot return a single count from a streaming Dataset. Instead, use `ds.groupBy.count()` which returns a streaming Dataset containing a running count. - `foreach()` - Instead use `ds.writeStream.foreach(...)` (see next section). - `show()` - Instead use the console sink (see next section). If you try any of these operations, you will see an `AnalysisException` like "operation XYZ is not supported with streaming DataFrames/Datasets". -While some of them may be supported in future releases of Spark, -there are others which are fundamentally hard to implement on streaming data efficiently. -For example, sorting on the input stream is not supported, as it requires keeping -track of all the data received in the stream. 
This is therefore fundamentally hard to execute +While some of them may be supported in future releases of Spark, +there are others which are fundamentally hard to implement on streaming data efficiently. +For example, sorting on the input stream is not supported, as it requires keeping +track of all the data received in the stream. This is therefore fundamentally hard to execute efficiently. ## Starting Streaming Queries @@ -972,7 +1071,7 @@ Once you have defined the final result DataFrame/Dataset, all that is left is fo ([Scala](api/scala/index.html#org.apache.spark.sql.streaming.DataStreamWriter)/[Java](api/java/org/apache/spark/sql/streaming/DataStreamWriter.html)/[Python](api/python/pyspark.sql.html#pyspark.sql.streaming.DataStreamWriter) docs) returned through `Dataset.writeStream()`. You will have to specify one or more of the following in this interface. -- *Details of the output sink:* Data format, location, etc. +- *Details of the output sink:* Data format, location, etc. - *Output mode:* Specify what gets written to the output sink. @@ -985,19 +1084,19 @@ returned through `Dataset.writeStream()`. You will have to specify one or more o #### Output Modes There are a few types of output modes. -- **Append mode (default)** - This is the default mode, where only the -new rows added to the Result Table since the last trigger will be -outputted to the sink. This is supported for only those queries where -rows added to the Result Table is never going to change. Hence, this mode -guarantees that each row will be output only once (assuming -fault-tolerant sink). For example, queries with only `select`, +- **Append mode (default)** - This is the default mode, where only the +new rows added to the Result Table since the last trigger will be +outputted to the sink. This is supported for only those queries where +rows added to the Result Table is never going to change. Hence, this mode +guarantees that each row will be output only once (assuming +fault-tolerant sink). For example, queries with only `select`, `where`, `map`, `flatMap`, `filter`, `join`, etc. will support Append mode. - **Complete mode** - The whole Result Table will be outputted to the sink after every trigger. This is supported for aggregation queries. -- **Update mode** - (*Available since Spark 2.1.1*) Only the rows in the Result Table that were -updated since the last trigger will be outputted to the sink. +- **Update mode** - (*Available since Spark 2.1.1*) Only the rows in the Result Table that were +updated since the last trigger will be outputted to the sink. More information to be added in future releases. Different types of streaming queries support different output modes. @@ -1015,9 +1114,9 @@ Here is the compatibility matrix. Aggregation on event-time with watermark Append, Update, Complete - Append mode uses watermark to drop old aggregation state. But the output of a + Append mode uses watermark to drop old aggregation state. But the output of a windowed aggregation is delayed the late threshold specified in `withWatermark()` as by - the modes semantics, rows can be added to the Result Table only once after they are + the modes semantics, rows can be added to the Result Table only once after they are finalized (i.e. after watermark is crossed). See the Late Data section for more details.

@@ -1031,10 +1130,10 @@ Here is the compatibility matrix. Other aggregations Complete, Update - Since no watermark is defined (only defined in other category), + Since no watermark is defined (only defined in other category), old aggregation state is not dropped.

- Append mode is not supported as aggregates can update thus violating the semantics of + Append mode is not supported as aggregates can update thus violating the semantics of this mode. @@ -1077,7 +1176,7 @@ Here is the compatibility matrix. #### Output Sinks There are a few types of built-in output sinks. -- **File sink** - Stores the output to a directory. +- **File sink** - Stores the output to a directory. {% highlight scala %} writeStream @@ -1114,9 +1213,9 @@ writeStream .start() {% endhighlight %} -Some sinks are not fault-tolerant because they do not guarantee persistence of the output and are -meant for debugging purposes only. See the earlier section on -[fault-tolerance semantics](#fault-tolerance-semantics). +Some sinks are not fault-tolerant because they do not guarantee persistence of the output and are +meant for debugging purposes only. See the earlier section on +[fault-tolerance semantics](#fault-tolerance-semantics). Here are the details of all the sinks in Spark. @@ -1145,7 +1244,8 @@ Here are the details of all the sinks in Spark. · "s3a://a/b/c/dataset.txt"

For file-format-specific options, see the related methods in DataFrameWriter - (Scala/Java/Python). + (Scala/Java/Python/R). E.g. for "parquet" format options see DataFrameWriter.parquet() @@ -1208,7 +1308,7 @@ noAggDF .option("checkpointLocation", "path/to/checkpoint/dir") .option("path", "path/to/destination/dir") .start() - + // ========== DF with aggregation ========== val aggDF = df.groupBy("device").count() @@ -1219,7 +1319,7 @@ aggDF .format("console") .start() -// Have all the aggregates in an in-memory table +// Have all the aggregates in an in-memory table aggDF .writeStream .queryName("aggregates") // this query name will be the table name @@ -1250,7 +1350,7 @@ noAggDF .option("checkpointLocation", "path/to/checkpoint/dir") .option("path", "path/to/destination/dir") .start(); - + // ========== DF with aggregation ========== Dataset aggDF = df.groupBy("device").count(); @@ -1261,7 +1361,7 @@ aggDF .format("console") .start(); -// Have all the aggregates in an in-memory table +// Have all the aggregates in an in-memory table aggDF .writeStream() .queryName("aggregates") // this query name will be the table name @@ -1292,7 +1392,7 @@ noAggDF \ .option("checkpointLocation", "path/to/checkpoint/dir") \ .option("path", "path/to/destination/dir") \ .start() - + # ========== DF with aggregation ========== aggDF = df.groupBy("device").count() @@ -1314,6 +1414,35 @@ aggDF \ spark.sql("select * from aggregates").show() # interactively query in-memory table {% endhighlight %} + +
+
+{% highlight r %}
+# ========== DF with no aggregations ==========
+noAggDF <- select(where(deviceDataDf, "signal > 10"), "device")
+
+# Print new data to console
+write.stream(noAggDF, "console")
+
+# Write new data to Parquet files
+write.stream(noAggDF,
+             "parquet",
+             path = "path/to/destination/dir",
+             checkpointLocation = "path/to/checkpoint/dir")
+
+# ========== DF with aggregation ==========
+aggDF <- count(groupBy(df, "device"))
+
+# Print updated aggregations to console
+write.stream(aggDF, "console", outputMode = "complete")
+
+# Have all the aggregates in an in-memory table. The query name will be the table name
+write.stream(aggDF, "memory", queryName = "aggregates", outputMode = "complete")
+
+# Interactively query in-memory table
+head(sql("select * from aggregates"))
+{% endhighlight %}
+</div>
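The file sink is not limited to Parquet from R; other formats the file sink supports (for example JSON) can be selected the same way, by passing the format name as the second argument. A sketch with placeholder paths:

{% highlight r %}
# Write new data as JSON files (any supported file-sink format name can be used here)
write.stream(noAggDF,
             "json",
             path = "path/to/destination/dir",
             checkpointLocation = "path/to/checkpoint/dir")
{% endhighlight %}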
@@ -1330,12 +1459,12 @@ which has methods that get called whenever there is a sequence of rows generated - `version` and `partition` are two parameters in `open` that uniquely represent a set of rows that needs to be pushed out. `version` is a monotonically increasing id that increases with every trigger. `partition` is an id that represents a partition of the output, since the output is distributed and will be processed on multiple executors. -- `open` can use the `version` and `partition` to choose whether it needs to write the sequence of rows. Accordingly, it can return `true` (proceed with writing), or `false` (no need to write). If `false` is returned, then `process` will not be called on any row. For example, after a partial failure, some of the output partitions of the failed trigger may have already been committed to a database. Based on metadata stored in the database, the writer can identify partitions that have already been committed and accordingly return false to skip committing them again. +- `open` can use the `version` and `partition` to choose whether it needs to write the sequence of rows. Accordingly, it can return `true` (proceed with writing), or `false` (no need to write). If `false` is returned, then `process` will not be called on any row. For example, after a partial failure, some of the output partitions of the failed trigger may have already been committed to a database. Based on metadata stored in the database, the writer can identify partitions that have already been committed and accordingly return false to skip committing them again. - Whenever `open` is called, `close` will also be called (unless the JVM exits due to some error). This is true even if `open` returns false. If there is any error in processing and writing the data, `close` will be called with the error. It is your responsibility to clean up state (e.g. connections, transactions, etc.) that have been created in `open` such that there are no resource leaks. ## Managing Streaming Queries -The `StreamingQuery` object created when a query is started can be used to monitor and manage the query. +The `StreamingQuery` object created when a query is started can be used to monitor and manage the query.
@@ -1351,7 +1480,7 @@ query.name // get the name of the auto-generated or user-specified name query.explain() // print detailed explanations of the query -query.stop() // stop the query +query.stop() // stop the query query.awaitTermination() // block until query is terminated, with stop() or with error @@ -1403,7 +1532,7 @@ query.name() # get the name of the auto-generated or user-specified name query.explain() # print detailed explanations of the query -query.stop() # stop the query +query.stop() # stop the query query.awaitTermination() # block until query is terminated, with stop() or with error @@ -1415,6 +1544,24 @@ query.lastProgress() # the most recent progress update of this streaming quer {% endhighlight %} +
+
+ +{% highlight r %} +query <- write.stream(df, "console") # get the query object + +queryName(query) # get the name of the auto-generated or user-specified name + +explain(query) # print detailed explanations of the query + +stopQuery(query) # stop the query + +awaitTermination(query) # block until query is terminated, with stop() or with error + +lastProgress(query) # the most recent progress update of this streaming query + +{% endhighlight %} +
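When working interactively it is often preferable not to block the R session indefinitely. Assuming `awaitTermination` in R accepts the same optional timeout in milliseconds as its Scala/Java/Python counterparts (an assumption; check the SparkR API docs), a hedged sketch:

{% highlight r %}
# Wait up to 10 seconds; the Scala/Java/Python versions return TRUE if the
# query terminated within the timeout (assumed to carry over to R)
terminated <- awaitTermination(query, 10000)
if (isTRUE(terminated)) {
  print("query terminated")
} else {
  stopQuery(query)   # stop it ourselves instead of waiting longer
}
{% endhighlight %}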
@@ -1461,28 +1608,34 @@ spark.streams().get(id) # get a query object by its unique id spark.streams().awaitAnyTermination() # block until any one of them terminates {% endhighlight %} + +
+{% highlight bash %} +Not available in R. +{% endhighlight %} +
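Even without a query manager in R, the handles returned by `write.stream()` can be collected and managed directly from the session that started them. A minimal sketch reusing names from earlier examples (paths are placeholders):

{% highlight r %}
# Keep the handles of all queries started in this session
queries <- list(
  write.stream(wordCounts, "console", outputMode = "complete"),
  write.stream(noAggDF, "parquet",
               path = "path/to/destination/dir",
               checkpointLocation = "path/to/checkpoint/dir")
)

# Stop them all when finished
invisible(lapply(queries, stopQuery))
{% endhighlight %}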
## Monitoring Streaming Queries -There are two APIs for monitoring and debugging active queries - +There are two APIs for monitoring and debugging active queries - interactively and asynchronously. ### Interactive APIs -You can directly get the current status and metrics of an active query using -`streamingQuery.lastProgress()` and `streamingQuery.status()`. -`lastProgress()` returns a `StreamingQueryProgress` object -in [Scala](api/scala/index.html#org.apache.spark.sql.streaming.StreamingQueryProgress) +You can directly get the current status and metrics of an active query using +`streamingQuery.lastProgress()` and `streamingQuery.status()`. +`lastProgress()` returns a `StreamingQueryProgress` object +in [Scala](api/scala/index.html#org.apache.spark.sql.streaming.StreamingQueryProgress) and [Java](api/java/org/apache/spark/sql/streaming/StreamingQueryProgress.html) and a dictionary with the same fields in Python. It has all the information about -the progress made in the last trigger of the stream - what data was processed, -what were the processing rates, latencies, etc. There is also +the progress made in the last trigger of the stream - what data was processed, +what were the processing rates, latencies, etc. There is also `streamingQuery.recentProgress` which returns an array of last few progresses. -In addition, `streamingQuery.status()` returns a `StreamingQueryStatus` object -in [Scala](api/scala/index.html#org.apache.spark.sql.streaming.StreamingQueryStatus) +In addition, `streamingQuery.status()` returns a `StreamingQueryStatus` object +in [Scala](api/scala/index.html#org.apache.spark.sql.streaming.StreamingQueryStatus) and [Java](api/java/org/apache/spark/sql/streaming/StreamingQueryStatus.html) and a dictionary with the same fields in Python. It gives information about what the query is immediately doing - is a trigger active, is data being processed, etc. @@ -1637,13 +1790,65 @@ Will print something like the following. ''' print(query.status) -''' +''' Will print something like the following. {u'message': u'Waiting for data to arrive', u'isTriggerActive': False, u'isDataAvailable': False} ''' {% endhighlight %} + +
+ +{% highlight r %} +query <- ... # a StreamingQuery +lastProgress(query) + +''' +Will print something like the following. + +{ + "id" : "8c57e1ec-94b5-4c99-b100-f694162df0b9", + "runId" : "ae505c5a-a64e-4896-8c28-c7cbaf926f16", + "name" : null, + "timestamp" : "2017-04-26T08:27:28.835Z", + "numInputRows" : 0, + "inputRowsPerSecond" : 0.0, + "processedRowsPerSecond" : 0.0, + "durationMs" : { + "getOffset" : 0, + "triggerExecution" : 1 + }, + "stateOperators" : [ { + "numRowsTotal" : 4, + "numRowsUpdated" : 0 + } ], + "sources" : [ { + "description" : "TextSocketSource[host: localhost, port: 9999]", + "startOffset" : 1, + "endOffset" : 1, + "numInputRows" : 0, + "inputRowsPerSecond" : 0.0, + "processedRowsPerSecond" : 0.0 + } ], + "sink" : { + "description" : "org.apache.spark.sql.execution.streaming.ConsoleSink@76b37531" + } +} +''' + +status(query) +''' +Will print something like the following. + +{ + "message" : "Waiting for data to arrive", + "isDataAvailable" : false, + "isTriggerActive" : false +} +''' +{% endhighlight %} +
@@ -1703,11 +1908,17 @@ spark.streams().addListener(new StreamingQueryListener() { Not available in Python. {% endhighlight %} + +
+{% highlight bash %} +Not available in R. +{% endhighlight %} +
-## Recovering from Failures with Checkpointing -In case of a failure or intentional shutdown, you can recover the previous progress and state of a previous query, and continue where it left off. This is done using checkpointing and write ahead logs. You can configure a query with a checkpoint location, and the query will save all the progress information (i.e. range of offsets processed in each trigger) and the running aggregates (e.g. word counts in the [quick example](#quick-example)) to the checkpoint location. This checkpoint location has to be a path in an HDFS compatible file system, and can be set as an option in the DataStreamWriter when [starting a query](#starting-streaming-queries). +## Recovering from Failures with Checkpointing +In case of a failure or intentional shutdown, you can recover the previous progress and state of a previous query, and continue where it left off. This is done using checkpointing and write ahead logs. You can configure a query with a checkpoint location, and the query will save all the progress information (i.e. range of offsets processed in each trigger) and the running aggregates (e.g. word counts in the [quick example](#quick-example)) to the checkpoint location. This checkpoint location has to be a path in an HDFS compatible file system, and can be set as an option in the DataStreamWriter when [starting a query](#starting-streaming-queries).
@@ -1745,20 +1956,18 @@ aggDF \ .start() {% endhighlight %} +
+
+ +{% highlight r %} +write.stream(aggDF, "memory", outputMode = "complete", checkpointLocation = "path/to/HDFS/dir") +{% endhighlight %} +
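To make the word-count query from the quick example recoverable in R, the same option is simply added to its `write.stream()` call; a sketch with a placeholder checkpoint directory:

{% highlight r %}
# Restarting this query with the same checkpoint location resumes where it left off
query <- write.stream(wordCounts, "console", outputMode = "complete",
                      checkpointLocation = "path/to/HDFS/dir")
{% endhighlight %}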
# Where to go from here -- Examples: See and run the -[Scala]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/scala/org/apache/spark/examples/sql/streaming)/[Java]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/java/org/apache/spark/examples/sql/streaming)/[Python]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/python/sql/streaming) +- Examples: See and run the +[Scala]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/scala/org/apache/spark/examples/sql/streaming)/[Java]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/java/org/apache/spark/examples/sql/streaming)/[Python]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/python/sql/streaming)/[R]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/r/streaming) examples. - Spark Summit 2016 Talk - [A Deep Dive into Structured Streaming](https://spark-summit.org/2016/events/a-deep-dive-into-structured-streaming/) - - - - - - - - - diff --git a/examples/src/main/r/streaming/structured_network_wordcount.R b/examples/src/main/r/streaming/structured_network_wordcount.R new file mode 100644 index 000000000000..cda18ebc072e --- /dev/null +++ b/examples/src/main/r/streaming/structured_network_wordcount.R @@ -0,0 +1,57 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# Counts words in UTF8 encoded, '\n' delimited text received from the network. 
+ +# To run this on your local machine, you need to first run a Netcat server +# $ nc -lk 9999 +# and then run the example +# ./bin/spark-submit examples/src/main/r/streaming/structured_network_wordcount.R localhost 9999 + +# Load SparkR library into your R session +library(SparkR) + +# Initialize SparkSession +sparkR.session(appName = "SparkR-Streaming-structured-network-wordcount-example") + +args <- commandArgs(trailing = TRUE) + +if (length(args) != 2) { + print("Usage: structured_network_wordcount.R ") + print(" and describe the TCP server that Structured Streaming") + print("would connect to receive data.") + q("no") +} + +hostname <- args[[1]] +port <- as.integer(args[[2]]) + +# Create DataFrame representing the stream of input lines from connection to localhost:9999 +lines <- read.stream("socket", host = hostname, port = port) + +# Split the lines into words +words <- selectExpr(lines, "explode(split(value, ' ')) as word") + +# Generate running word count +wordCounts <- count(groupBy(words, "word")) + +# Start running the query that prints the running counts to the console +query <- write.stream(wordCounts, "console", outputMode = "complete") + +awaitTermination(query) + +sparkR.session.stop() From 707824eb054d2b83bef0d98ee434fa141ccc4a88 Mon Sep 17 00:00:00 2001 From: Felix Cheung Date: Sun, 30 Apr 2017 13:47:29 -0700 Subject: [PATCH 2/7] doc --- R/pkg/vignettes/sparkr-vignettes.Rmd | 79 +++++++++++++++++++++++++--- docs/sparkr.md | 4 ++ 2 files changed, 77 insertions(+), 6 deletions(-) diff --git a/R/pkg/vignettes/sparkr-vignettes.Rmd b/R/pkg/vignettes/sparkr-vignettes.Rmd index 4b9d6c380609..6294726ed4ac 100644 --- a/R/pkg/vignettes/sparkr-vignettes.Rmd +++ b/R/pkg/vignettes/sparkr-vignettes.Rmd @@ -182,7 +182,7 @@ head(df) ``` ### Data Sources -SparkR supports operating on a variety of data sources through the `SparkDataFrame` interface. You can check the Spark SQL programming guide for more [specific options](https://spark.apache.org/docs/latest/sql-programming-guide.html#manually-specifying-options) that are available for the built-in data sources. +SparkR supports operating on a variety of data sources through the `SparkDataFrame` interface. You can check the Spark SQL Programming Guide for more [specific options](https://spark.apache.org/docs/latest/sql-programming-guide.html#manually-specifying-options) that are available for the built-in data sources. The general method for creating `SparkDataFrame` from data sources is `read.df`. This method takes in the path for the file to load and the type of data source, and the currently active Spark Session will be used automatically. SparkR supports reading CSV, JSON and Parquet files natively and through Spark Packages you can find data source connectors for popular file formats like Avro. These packages can be added with `sparkPackages` parameter when initializing SparkSession using `sparkR.session`. @@ -232,7 +232,7 @@ write.df(people, path = "people.parquet", source = "parquet", mode = "overwrite" ``` ### Hive Tables -You can also create SparkDataFrames from Hive tables. To do this we will need to create a SparkSession with Hive support which can access tables in the Hive MetaStore. Note that Spark should have been built with Hive support and more details can be found in the [SQL programming guide](https://spark.apache.org/docs/latest/sql-programming-guide.html). In SparkR, by default it will attempt to create a SparkSession with Hive support enabled (`enableHiveSupport = TRUE`). 
+You can also create SparkDataFrames from Hive tables. To do this we will need to create a SparkSession with Hive support which can access tables in the Hive MetaStore. Note that Spark should have been built with Hive support and more details can be found in the [SQL Programming Guide](https://spark.apache.org/docs/latest/sql-programming-guide.html). In SparkR, by default it will attempt to create a SparkSession with Hive support enabled (`enableHiveSupport = TRUE`). ```{r, eval=FALSE} sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)") @@ -314,7 +314,7 @@ Use `cube` or `rollup` to compute subtotals across multiple dimensions. mean(cube(carsDF, "cyl", "gear", "am"), "mpg") ``` -generates groupings for {(`cyl`, `gear`, `am`), (`cyl`, `gear`), (`cyl`), ()}, while +generates groupings for {(`cyl`, `gear`, `am`), (`cyl`, `gear`), (`cyl`), ()}, while ```{r} mean(rollup(carsDF, "cyl", "gear", "am"), "mpg") @@ -672,6 +672,7 @@ head(select(naiveBayesPrediction, "Class", "Sex", "Age", "Survived", "prediction Survival analysis studies the expected duration of time until an event happens, and often the relationship with risk factors or treatment taken on the subject. In contrast to standard regression analysis, survival modeling has to deal with special characteristics in the data including non-negative survival time and censoring. Accelerated Failure Time (AFT) model is a parametric survival model for censored data that assumes the effect of a covariate is to accelerate or decelerate the life course of an event by some constant. For more information, refer to the Wikipedia page [AFT Model](https://en.wikipedia.org/wiki/Accelerated_failure_time_model) and the references there. Different from a [Proportional Hazards Model](https://en.wikipedia.org/wiki/Proportional_hazards_model) designed for the same purpose, the AFT model is easier to parallelize because each instance contributes to the objective function independently. + ```{r, warning=FALSE} library(survival) ovarianDF <- createDataFrame(ovarian) @@ -902,7 +903,7 @@ perplexity There are multiple options that can be configured in `spark.als`, including `rank`, `reg`, `nonnegative`. For a complete list, refer to the help file. -```{r} +```{r, eval=FALSE} ratings <- list(list(0, 0, 4.0), list(0, 1, 2.0), list(1, 1, 3.0), list(1, 2, 4.0), list(2, 1, 1.0), list(2, 2, 5.0)) df <- createDataFrame(ratings, c("user", "item", "rating")) @@ -910,7 +911,7 @@ model <- spark.als(df, "rating", "user", "item", rank = 10, reg = 0.1, nonnegati ``` Extract latent factors. -```{r} +```{r, eval=FALSE} stats <- summary(model) userFactors <- stats$userFactors itemFactors <- stats$itemFactors @@ -920,7 +921,7 @@ head(itemFactors) Make predictions. -```{r} +```{r, eval=FALSE} predicted <- predict(model, df) head(predicted) ``` @@ -1002,6 +1003,72 @@ unlink(modelPath) ``` +## Structured Streaming + +SparkR supports the Structured Streaming API (experimental). + +You can check the Structured Streaming Programming Guide for [an introduction](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#programming-model) to its programming model and basic concepts. + +### Simple Source and Sink + +Spark has a few built-in input sources. 
As an example, to test with a socket source reading text into words and displaying the computed word counts: + +```{r, eval=FALSE} +# Create DataFrame representing the stream of input lines from connection +lines <- read.stream("socket", host = hostname, port = port) + +# Split the lines into words +words <- selectExpr(lines, "explode(split(value, ' ')) as word") + +# Generate running word count +wordCounts <- count(groupBy(words, "word")) + +# Start running the query that prints the running counts to the console +query <- write.stream(wordCounts, "console", outputMode = "complete") +``` + +### Kafka Source + +It is simple to read data from Kafka. For more information, see [Input Sources](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#input-sources) supported by Structured Streaming. + +```{r, eval=FALSE} +topic <- read.stream("kafka", + kafka.bootstrap.servers = "host1:port1,host2:port2", + subscribe = "topic1") +keyvalue <- selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") +``` + +### Operations and Sinks + +Most of the common operations on `SparkDataFrame` are supported for streaming, including selection, projection, and aggregation. Once you have defined the final result, to start the streaming computation, you will call the `write.stream` method setting a sink and `outputMode`. + +A streaming `SparkDataFrame` can be written for debugging to the console, to a temporary in-memory table, or for further processing in a fault-tolerant manner to the File Sink in different formats. + +```{r, eval=FALSE} +noAggDF <- select(where(deviceDataDf, "signal > 10"), "device") + +# Print new data to console +write.stream(noAggDF, "console") + +# Write new data to Parquet files +write.stream(noAggDF, + "parquet", + path = "path/to/destination/dir", + checkpointLocation = "path/to/checkpoint/dir") + +# Aggregate +aggDF <- count(groupBy(noAggDF, "device")) + +# Print updated aggregations to console +write.stream(aggDF, "console", outputMode = "complete") + +# Have all the aggregates in an in memory table. The query name will be the table name +write.stream(aggDF, "memory", queryName = "aggregates", outputMode = "complete") + +head(sql("select * from aggregates")) +``` + + ## Advanced Topics ### SparkR Object Classes diff --git a/docs/sparkr.md b/docs/sparkr.md index e015ab260fca..de6a382eabcb 100644 --- a/docs/sparkr.md +++ b/docs/sparkr.md @@ -587,6 +587,10 @@ The following example shows how to save/load a MLlib model by SparkR.
Yes
+# Structured Streaming + +SparkR supports the Structured Streaming API (experimental). Structured Streaming is a scalable and fault-tolerant stream processing engine built on the Spark SQL engine. For more information see the R API on the [Structured Streaming Programming Guide](structured-streaming-programming-guide.html) + # R Function Name Conflicts When loading and attaching a new package in R, it is possible to have a name [conflict](https://stat.ethz.ch/R-manual/R-devel/library/base/html/library.html), where a From 8cea361ee6dc2f006603d7f3a7ad9c5ae1ee0b85 Mon Sep 17 00:00:00 2001 From: Felix Cheung Date: Sun, 30 Apr 2017 16:03:07 -0700 Subject: [PATCH 3/7] update --- R/pkg/vignettes/sparkr-vignettes.Rmd | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/R/pkg/vignettes/sparkr-vignettes.Rmd b/R/pkg/vignettes/sparkr-vignettes.Rmd index 6294726ed4ac..d38ec4f1b6f3 100644 --- a/R/pkg/vignettes/sparkr-vignettes.Rmd +++ b/R/pkg/vignettes/sparkr-vignettes.Rmd @@ -1035,17 +1035,17 @@ It is simple to read data from Kafka. For more information, see [Input Sources]( topic <- read.stream("kafka", kafka.bootstrap.servers = "host1:port1,host2:port2", subscribe = "topic1") -keyvalue <- selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") +keyvalue <- selectExpr(topic, "CAST(key AS STRING)", "CAST(value AS STRING)") ``` ### Operations and Sinks Most of the common operations on `SparkDataFrame` are supported for streaming, including selection, projection, and aggregation. Once you have defined the final result, to start the streaming computation, you will call the `write.stream` method setting a sink and `outputMode`. -A streaming `SparkDataFrame` can be written for debugging to the console, to a temporary in-memory table, or for further processing in a fault-tolerant manner to the File Sink in different formats. +A streaming `SparkDataFrame` can be written for debugging to the console, to a temporary in-memory table, or for further processing in a fault-tolerant manner to a File Sink in different formats. ```{r, eval=FALSE} -noAggDF <- select(where(deviceDataDf, "signal > 10"), "device") +noAggDF <- select(where(deviceDataStreamingDf, "signal > 10"), "device") # Print new data to console write.stream(noAggDF, "console") From 5953667ea86aa6bb0dbf18dcef4f49f5fc562e05 Mon Sep 17 00:00:00 2001 From: Felix Cheung Date: Sun, 30 Apr 2017 16:08:48 -0700 Subject: [PATCH 4/7] reverse space change --- .../structured-streaming-programming-guide.md | 188 +++++++++--------- 1 file changed, 94 insertions(+), 94 deletions(-) diff --git a/docs/structured-streaming-programming-guide.md b/docs/structured-streaming-programming-guide.md index a8810e132486..c038afddd189 100644 --- a/docs/structured-streaming-programming-guide.md +++ b/docs/structured-streaming-programming-guide.md @@ -28,7 +28,7 @@ val spark = SparkSession .builder .appName("StructuredNetworkWordCount") .getOrCreate() - + import spark.implicits._ {% endhighlight %} @@ -412,62 +412,62 @@ Batch: 1 # Programming Model -The key idea in Structured Streaming is to treat a live data stream as a -table that is being continuously appended. This leads to a new stream -processing model that is very similar to a batch processing model. You will -express your streaming computation as standard batch-like query as on a static -table, and Spark runs it as an *incremental* query on the *unbounded* input +The key idea in Structured Streaming is to treat a live data stream as a +table that is being continuously appended. 
This leads to a new stream +processing model that is very similar to a batch processing model. You will +express your streaming computation as standard batch-like query as on a static +table, and Spark runs it as an *incremental* query on the *unbounded* input table. Let’s understand this model in more detail. ## Basic Concepts -Consider the input data stream as the "Input Table". Every data item that is +Consider the input data stream as the "Input Table". Every data item that is arriving on the stream is like a new row being appended to the Input Table. ![Stream as a Table](img/structured-streaming-stream-as-a-table.png "Stream as a Table") -A query on the input will generate the "Result Table". Every trigger interval (say, every 1 second), new rows get appended to the Input Table, which eventually updates the Result Table. Whenever the result table gets updated, we would want to write the changed result rows to an external sink. +A query on the input will generate the "Result Table". Every trigger interval (say, every 1 second), new rows get appended to the Input Table, which eventually updates the Result Table. Whenever the result table gets updated, we would want to write the changed result rows to an external sink. ![Model](img/structured-streaming-model.png) The "Output" is defined as what gets written out to the external storage. The output can be defined in a different mode: - - *Complete Mode* - The entire updated Result Table will be written to the external storage. It is up to the storage connector to decide how to handle writing of the entire table. + - *Complete Mode* - The entire updated Result Table will be written to the external storage. It is up to the storage connector to decide how to handle writing of the entire table. - *Append Mode* - Only the new rows appended in the Result Table since the last trigger will be written to the external storage. This is applicable only on the queries where existing rows in the Result Table are not expected to change. - + - *Update Mode* - Only the rows that were updated in the Result Table since the last trigger will be written to the external storage (available since Spark 2.1.1). Note that this is different from the Complete Mode in that this mode only outputs the rows that have changed since the last trigger. If the query doesn't contain aggregations, it will be equivalent to Append mode. Note that each mode is applicable on certain types of queries. This is discussed in detail [later](#output-modes). -To illustrate the use of this model, let’s understand the model in context of -the [Quick Example](#quick-example) above. The first `lines` DataFrame is the input table, and -the final `wordCounts` DataFrame is the result table. Note that the query on -streaming `lines` DataFrame to generate `wordCounts` is *exactly the same* as -it would be a static DataFrame. However, when this query is started, Spark -will continuously check for new data from the socket connection. If there is -new data, Spark will run an "incremental" query that combines the previous +To illustrate the use of this model, let’s understand the model in context of +the [Quick Example](#quick-example) above. The first `lines` DataFrame is the input table, and +the final `wordCounts` DataFrame is the result table. Note that the query on +streaming `lines` DataFrame to generate `wordCounts` is *exactly the same* as +it would be a static DataFrame. However, when this query is started, Spark +will continuously check for new data from the socket connection. 
If there is +new data, Spark will run an "incremental" query that combines the previous running counts with the new data to compute updated counts, as shown below. ![Model](img/structured-streaming-example-model.png) -This model is significantly different from many other stream processing -engines. Many streaming systems require the user to maintain running -aggregations themselves, thus having to reason about fault-tolerance, and -data consistency (at-least-once, or at-most-once, or exactly-once). In this -model, Spark is responsible for updating the Result Table when there is new -data, thus relieving the users from reasoning about it. As an example, let’s +This model is significantly different from many other stream processing +engines. Many streaming systems require the user to maintain running +aggregations themselves, thus having to reason about fault-tolerance, and +data consistency (at-least-once, or at-most-once, or exactly-once). In this +model, Spark is responsible for updating the Result Table when there is new +data, thus relieving the users from reasoning about it. As an example, let’s see how this model handles event-time based processing and late arriving data. ## Handling Event-time and Late Data Event-time is the time embedded in the data itself. For many applications, you may want to operate on this event-time. For example, if you want to get the number of events generated by IoT devices every minute, then you probably want to use the time when the data was generated (that is, event-time in the data), rather than the time Spark receives them. This event-time is very naturally expressed in this model -- each event from the devices is a row in the table, and event-time is a column value in the row. This allows window-based aggregations (e.g. number of events every minute) to be just a special type of grouping and aggregation on the event-time column -- each time window is a group and each row can belong to multiple windows/groups. Therefore, such event-time-window-based aggregation queries can be defined consistently on both a static dataset (e.g. from collected device events logs) as well as on a data stream, making the life of the user much easier. -Furthermore, this model naturally handles data that has arrived later than -expected based on its event-time. Since Spark is updating the Result Table, -it has full control over updating old aggregates when there is late data, +Furthermore, this model naturally handles data that has arrived later than +expected based on its event-time. Since Spark is updating the Result Table, +it has full control over updating old aggregates when there is late data, as well as cleaning up old aggregates to limit the size of intermediate -state data. Since Spark 2.1, we have support for watermarking which +state data. Since Spark 2.1, we have support for watermarking which allows the user to specify the threshold of late data, and allows the engine -to accordingly clean up old state. These are explained later in more +to accordingly clean up old state. These are explained later in more detail in the [Window Operations](#window-operations-on-event-time) section. ## Fault Tolerance Semantics @@ -492,10 +492,10 @@ In Spark 2.0, there are a few built-in sources. - **Kafka source** - Poll data from Kafka. It's compatible with Kafka broker versions 0.10.0 or higher. See the [Kafka Integration Guide](structured-streaming-kafka-integration.html) for more details. - - **Socket source (for testing)** - Reads UTF8 text data from a socket connection. 
The listening server socket is at the driver. Note that this should be used only for testing as this does not provide end-to-end fault-tolerance guarantees. + - **Socket source (for testing)** - Reads UTF8 text data from a socket connection. The listening server socket is at the driver. Note that this should be used only for testing as this does not provide end-to-end fault-tolerance guarantees. -Some sources are not fault-tolerant because they do not guarantee that data can be replayed using -checkpointed offsets after a failure. See the earlier section on +Some sources are not fault-tolerant because they do not guarantee that data can be replayed using +checkpointed offsets after a failure. See the earlier section on [fault-tolerance semantics](#fault-tolerance-semantics). Here are the details of all the sources in Spark. @@ -803,26 +803,26 @@ windowedCounts = words.groupBy( ### Handling Late Data and Watermarking Now consider what happens if one of the events arrives late to the application. -For example, say, a word generated at 12:04 (i.e. event time) could be received by +For example, say, a word generated at 12:04 (i.e. event time) could be received by the application at 12:11. The application should use the time 12:04 instead of 12:11 -to update the older counts for the window `12:00 - 12:10`. This occurs -naturally in our window-based grouping – Structured Streaming can maintain the intermediate state -for partial aggregates for a long period of time such that late data can update aggregates of +to update the older counts for the window `12:00 - 12:10`. This occurs +naturally in our window-based grouping – Structured Streaming can maintain the intermediate state +for partial aggregates for a long period of time such that late data can update aggregates of old windows correctly, as illustrated below. ![Handling Late Data](img/structured-streaming-late-data.png) -However, to run this query for days, it's necessary for the system to bound the amount of -intermediate in-memory state it accumulates. This means the system needs to know when an old -aggregate can be dropped from the in-memory state because the application is not going to receive -late data for that aggregate any more. To enable this, in Spark 2.1, we have introduced +However, to run this query for days, it's necessary for the system to bound the amount of +intermediate in-memory state it accumulates. This means the system needs to know when an old +aggregate can be dropped from the in-memory state because the application is not going to receive +late data for that aggregate any more. To enable this, in Spark 2.1, we have introduced **watermarking**, which lets the engine automatically track the current event time in the data -and attempt to clean up old state accordingly. You can define the watermark of a query by -specifying the event time column and the threshold on how late the data is expected to be in terms of +and attempt to clean up old state accordingly. You can define the watermark of a query by +specifying the event time column and the threshold on how late the data is expected to be in terms of event time. For a specific window starting at time `T`, the engine will maintain state and allow late -data to update the state until `(max event time seen by the engine - late threshold > T)`. -In other words, late data within the threshold will be aggregated, -but data later than the threshold will be dropped. Let's understand this with an example. 
We can +data to update the state until `(max event time seen by the engine - late threshold > T)`. +In other words, late data within the threshold will be aggregated, +but data later than the threshold will be dropped. Let's understand this with an example. We can easily define watermarking on the previous example using `withWatermark()` as shown below.
@@ -874,27 +874,27 @@ windowedCounts = words \

-In this example, we are defining the watermark of the query on the value of the column "timestamp",
-and also defining "10 minutes" as the threshold of how late is the data allowed to be. If this query
-is run in Update output mode (discussed later in [Output Modes](#output-modes) section),
+In this example, we are defining the watermark of the query on the value of the column "timestamp",
+and also defining "10 minutes" as the threshold of how late the data is allowed to be. If this query
+is run in Update output mode (discussed later in the [Output Modes](#output-modes) section),
 the engine will keep updating counts of a window in the Result Table until the window is older
 than the watermark, which lags behind the current event time in column "timestamp" by 10 minutes.
-Here is an illustration.
+Here is an illustration.
 
 ![Watermarking in Update Mode](img/structured-streaming-watermark-update-mode.png)
 
-As shown in the illustration, the maximum event time tracked by the engine is the
+As shown in the illustration, the maximum event time tracked by the engine is the
 *blue dashed line*, and the watermark set as `(max event time - '10 mins')`
-at the beginning of every trigger is the red line For example, when the engine observes the data
+at the beginning of every trigger is the red line. For example, when the engine observes the data
 `(12:14, dog)`, it sets the watermark for the next trigger as `12:04`. This watermark lets the engine maintain intermediate state for additional 10 minutes to allow late data to be counted. For example, the data `(12:09, cat)` is out of order and late, and it falls in
-windows `12:05 - 12:15` and `12:10 - 12:20`. Since, it is still ahead of the watermark `12:04` in
-the trigger, the engine still maintains the intermediate counts as state and correctly updates the
-counts of the related windows. However, when the watermark is updated to `12:11`, the intermediate
-state for window `(12:00 - 12:10)` is cleared, and all subsequent data (e.g. `(12:04, donkey)`)
-is considered "too late" and therefore ignored. Note that after every trigger,
-the updated counts (i.e. purple rows) are written to sink as the trigger output, as dictated by
+windows `12:05 - 12:15` and `12:10 - 12:20`. Since it is still ahead of the watermark `12:04` in
+the trigger, the engine still maintains the intermediate counts as state and correctly updates the
+counts of the related windows. However, when the watermark is updated to `12:11`, the intermediate
+state for window `(12:00 - 12:10)` is cleared, and all subsequent data (e.g. `(12:04, donkey)`)
+is considered "too late" and therefore ignored. Note that after every trigger,
+the updated counts (i.e. purple rows) are written to the sink as the trigger output, as dictated by
 the Update mode.
 
 Some sinks (e.g. files) may not support fine-grained updates that Update Mode requires. To work
@@ -903,31 +903,31 @@ This is illustrated below.
 
 ![Watermarking in Append Mode](img/structured-streaming-watermark-append-mode.png)
 
-Similar to the Update Mode earlier, the engine maintains intermediate counts for each window.
+Similar to the Update Mode earlier, the engine maintains intermediate counts for each window.
 However, the partial counts are not updated to the Result Table and not written to sink. The engine
-waits for "10 mins" for late date to be counted,
+waits for "10 mins" for late data to be counted,
 then drops intermediate state of a window < watermark, and appends the final
-counts to the Result Table/sink.
For example, the final counts of window `12:00 - 12:10` is
-appended to the Result Table only after the watermark is updated to `12:11`.
+counts to the Result Table/sink. For example, the final counts of window `12:00 - 12:10` are
+appended to the Result Table only after the watermark is updated to `12:11`.
 
 **Conditions for watermarking to clean aggregation state**
 
-It is important to note that the following conditions must be satisfied for the watermarking to
+It is important to note that the following conditions must be satisfied for the watermarking to
 clean the state in aggregation queries *(as of Spark 2.1.1, subject to change in the future)*.
 
-- **Output mode must be Append or Update.** Complete mode requires all aggregate data to be preserved,
-and hence cannot use watermarking to drop intermediate state. See the [Output Modes](#output-modes)
+- **Output mode must be Append or Update.** Complete mode requires all aggregate data to be preserved,
+and hence cannot use watermarking to drop intermediate state. See the [Output Modes](#output-modes)
 section for detailed explanation of the semantics of each output mode.
 
-- The aggregation must have either the event-time column, or a `window` on the event-time column.
+- The aggregation must have either the event-time column, or a `window` on the event-time column.
 
-- `withWatermark` must be called on the
-same column as the timestamp column used in the aggregate. For example,
-`df.withWatermark("time", "1 min").groupBy("time2").count()` is invalid
+- `withWatermark` must be called on the
+same column as the timestamp column used in the aggregate. For example,
+`df.withWatermark("time", "1 min").groupBy("time2").count()` is invalid
 in Append output mode, as watermark is defined on a different column from the aggregation column.
 
-- `withWatermark` must be called before the aggregation for the watermark details to be used.
-For example, `df.groupBy("time").count().withWatermark("time", "1 min")` is invalid in Append
+- `withWatermark` must be called before the aggregation for the watermark details to be used.
+For example, `df.groupBy("time").count().withWatermark("time", "1 min")` is invalid in Append
 output mode.
 
 
@@ -1027,12 +1027,12 @@ streamingDf \
 
 ### Arbitrary Stateful Operations
-Many uscases require more advanced stateful operations than aggregations. For example, in many usecases, you have to track sessions from data streams of events. For doing such sessionization, you will have to save arbitrary types of data as state, and perform arbitrary operations on the state using the data stream events in every trigger. Since Spark 2.2, this can be done using the operation `mapGroupsWithState` and the more powerful operation `flatMapGroupsWithState`. Both operations allow you to apply user-defined code on grouped Datasets to update user-defined state. For more concrete details, take a look at the API documentation ([Scala](api/scala/index.html#org.apache.spark.sql.streaming.GroupState)/[Java](api/java/org/apache/spark/sql/streaming/GroupState.html)) and the examples ([Scala]({{site.SPARK_GITHUB_URL}}/blob/v{{site.SPARK_VERSION_SHORT}}/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredSessionization.scala)/[Java]({{site.SPARK_GITHUB_URL}}/blob/v{{site.SPARK_VERSION_SHORT}}/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredSessionization.java)).
+Many use cases require more advanced stateful operations than aggregations.
For example, in many usecases, you have to track sessions from data streams of events. For doing such sessionization, you will have to save arbitrary types of data as state, and perform arbitrary operations on the state using the data stream events in every trigger. Since Spark 2.2, this can be done using the operation `mapGroupsWithState` and the more powerful operation `flatMapGroupsWithState`. Both operations allow you to apply user-defined code on grouped Datasets to update user-defined state. For more concrete details, take a look at the API documentation ([Scala](api/scala/index.html#org.apache.spark.sql.streaming.GroupState)/[Java](api/java/org/apache/spark/sql/streaming/GroupState.html)) and the examples ([Scala]({{site.SPARK_GITHUB_URL}}/blob/v{{site.SPARK_VERSION_SHORT}}/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredSessionization.scala)/[Java]({{site.SPARK_GITHUB_URL}}/blob/v{{site.SPARK_VERSION_SHORT}}/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredSessionization.java)). ### Unsupported Operations -There are a few DataFrame/Dataset operations that are not supported with streaming DataFrames/Datasets. +There are a few DataFrame/Dataset operations that are not supported with streaming DataFrames/Datasets. Some of them are as follows. - + - Multiple streaming aggregations (i.e. a chain of aggregations on a streaming DF) are not yet supported on streaming Datasets. - Limit and take first N rows are not supported on streaming Datasets. @@ -1053,17 +1053,17 @@ Some of them are as follows. In addition, there are some Dataset methods that will not work on streaming Datasets. They are actions that will immediately run queries and return results, which does not make sense on a streaming Dataset. Rather, those functionalities can be done by explicitly starting a streaming query (see the next section regarding that). -- `count()` - Cannot return a single count from a streaming Dataset. Instead, use `ds.groupBy.count()` which returns a streaming Dataset containing a running count. +- `count()` - Cannot return a single count from a streaming Dataset. Instead, use `ds.groupBy.count()` which returns a streaming Dataset containing a running count. - `foreach()` - Instead use `ds.writeStream.foreach(...)` (see next section). - `show()` - Instead use the console sink (see next section). If you try any of these operations, you will see an `AnalysisException` like "operation XYZ is not supported with streaming DataFrames/Datasets". -While some of them may be supported in future releases of Spark, -there are others which are fundamentally hard to implement on streaming data efficiently. -For example, sorting on the input stream is not supported, as it requires keeping -track of all the data received in the stream. This is therefore fundamentally hard to execute +While some of them may be supported in future releases of Spark, +there are others which are fundamentally hard to implement on streaming data efficiently. +For example, sorting on the input stream is not supported, as it requires keeping +track of all the data received in the stream. This is therefore fundamentally hard to execute efficiently. ## Starting Streaming Queries @@ -1084,19 +1084,19 @@ returned through `Dataset.writeStream()`. You will have to specify one or more o #### Output Modes There are a few types of output modes. 
-- **Append mode (default)** - This is the default mode, where only the -new rows added to the Result Table since the last trigger will be -outputted to the sink. This is supported for only those queries where -rows added to the Result Table is never going to change. Hence, this mode -guarantees that each row will be output only once (assuming -fault-tolerant sink). For example, queries with only `select`, +- **Append mode (default)** - This is the default mode, where only the +new rows added to the Result Table since the last trigger will be +outputted to the sink. This is supported for only those queries where +rows added to the Result Table is never going to change. Hence, this mode +guarantees that each row will be output only once (assuming +fault-tolerant sink). For example, queries with only `select`, `where`, `map`, `flatMap`, `filter`, `join`, etc. will support Append mode. - **Complete mode** - The whole Result Table will be outputted to the sink after every trigger. This is supported for aggregation queries. -- **Update mode** - (*Available since Spark 2.1.1*) Only the rows in the Result Table that were -updated since the last trigger will be outputted to the sink. +- **Update mode** - (*Available since Spark 2.1.1*) Only the rows in the Result Table that were +updated since the last trigger will be outputted to the sink. More information to be added in future releases. Different types of streaming queries support different output modes. @@ -1114,9 +1114,9 @@ Here is the compatibility matrix. Aggregation on event-time with watermark Append, Update, Complete - Append mode uses watermark to drop old aggregation state. But the output of a + Append mode uses watermark to drop old aggregation state. But the output of a windowed aggregation is delayed the late threshold specified in `withWatermark()` as by - the modes semantics, rows can be added to the Result Table only once after they are + the modes semantics, rows can be added to the Result Table only once after they are finalized (i.e. after watermark is crossed). See the Late Data section for more details.

@@ -1130,10 +1130,10 @@ Here is the compatibility matrix. Other aggregations Complete, Update - Since no watermark is defined (only defined in other category), + Since no watermark is defined (only defined in other category), old aggregation state is not dropped.

- Append mode is not supported as aggregates can update thus violating the semantics of + Append mode is not supported as aggregates can update thus violating the semantics of this mode. @@ -1213,9 +1213,9 @@ writeStream .start() {% endhighlight %} -Some sinks are not fault-tolerant because they do not guarantee persistence of the output and are -meant for debugging purposes only. See the earlier section on -[fault-tolerance semantics](#fault-tolerance-semantics). +Some sinks are not fault-tolerant because they do not guarantee persistence of the output and are +meant for debugging purposes only. See the earlier section on +[fault-tolerance semantics](#fault-tolerance-semantics). Here are the details of all the sinks in Spark. @@ -1459,12 +1459,12 @@ which has methods that get called whenever there is a sequence of rows generated - `version` and `partition` are two parameters in `open` that uniquely represent a set of rows that needs to be pushed out. `version` is a monotonically increasing id that increases with every trigger. `partition` is an id that represents a partition of the output, since the output is distributed and will be processed on multiple executors. -- `open` can use the `version` and `partition` to choose whether it needs to write the sequence of rows. Accordingly, it can return `true` (proceed with writing), or `false` (no need to write). If `false` is returned, then `process` will not be called on any row. For example, after a partial failure, some of the output partitions of the failed trigger may have already been committed to a database. Based on metadata stored in the database, the writer can identify partitions that have already been committed and accordingly return false to skip committing them again. +- `open` can use the `version` and `partition` to choose whether it needs to write the sequence of rows. Accordingly, it can return `true` (proceed with writing), or `false` (no need to write). If `false` is returned, then `process` will not be called on any row. For example, after a partial failure, some of the output partitions of the failed trigger may have already been committed to a database. Based on metadata stored in the database, the writer can identify partitions that have already been committed and accordingly return false to skip committing them again. - Whenever `open` is called, `close` will also be called (unless the JVM exits due to some error). This is true even if `open` returns false. If there is any error in processing and writing the data, `close` will be called with the error. It is your responsibility to clean up state (e.g. connections, transactions, etc.) that have been created in `open` such that there are no resource leaks. ## Managing Streaming Queries -The `StreamingQuery` object created when a query is started can be used to monitor and manage the query. +The `StreamingQuery` object created when a query is started can be used to monitor and manage the query.
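+
+For example, assuming the SparkR `StreamingQuery` helpers (`queryName`, `isActive`, `lastProgress`, `stopQuery`) are available in your Spark version, a minimal R sketch of managing a query looks like this:
+
+{% highlight r %}
+# Start a named query on the running word counts from the quick example
+query <- write.stream(wordCounts, "console", outputMode = "complete", queryName = "word_counts")
+
+queryName(query)     # the name of the query
+isActive(query)      # TRUE while the query is running
+lastProgress(query)  # details of the most recent trigger
+
+stopQuery(query)     # stop the query
+{% endhighlight %}
+
+Passing `queryName` as a `write.stream` option follows the same pattern as the in-memory sink example shown earlier.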
From 3337bcd044f3d6ee6287a51038bf353af557d81f Mon Sep 17 00:00:00 2001 From: Felix Cheung Date: Sun, 30 Apr 2017 16:14:38 -0700 Subject: [PATCH 5/7] more space --- .../structured-streaming-programming-guide.md | 24 +++++++++---------- 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/docs/structured-streaming-programming-guide.md b/docs/structured-streaming-programming-guide.md index c038afddd189..fd72dd1b07f4 100644 --- a/docs/structured-streaming-programming-guide.md +++ b/docs/structured-streaming-programming-guide.md @@ -219,8 +219,8 @@ awaitTermination(query) After this code is executed, the streaming computation will have started in the background. The `query` object is a handle to that active streaming query, and we have decided to wait for the termination of the query using `awaitTermination()` to prevent the process from exiting while the query is active. -To actually execute this example code, you can either compile the code in your own -[Spark application](quick-start.html#self-contained-applications), or simply +To actually execute this example code, you can either compile the code in your own +[Spark application](quick-start.html#self-contained-applications), or simply [run the example](index.html#running-the-examples-and-shell) once you have downloaded Spark. We are showing the latter. You will first need to run Netcat (a small utility found in most Unix-like systems) as a data server by using @@ -746,7 +746,7 @@ count(groupBy(df, "deviceType"))
### Window Operations on Event Time -Aggregations over a sliding event-time window are straightforward with Structured Streaming and are very similar to grouped aggregations. In a grouped aggregation, aggregate values (e.g. counts) are maintained for each unique value in the user-specified grouping column. In case of window-based aggregations, aggregate values are maintained for each window the event-time of a row falls into. Let's understand this with an illustration. +Aggregations over a sliding event-time window are straightforward with Structured Streaming and are very similar to grouped aggregations. In a grouped aggregation, aggregate values (e.g. counts) are maintained for each unique value in the user-specified grouping column. In case of window-based aggregations, aggregate values are maintained for each window the event-time of a row falls into. Let's understand this with an illustration. Imagine our [quick example](#quick-example) is modified and the stream now contains lines along with the time when the line was generated. Instead of running word counts, we want to count words within 10 minute windows, updating every 5 minutes. That is, word counts in words received between 10 minute windows 12:00 - 12:10, 12:05 - 12:15, 12:10 - 12:20, etc. Note that 12:00 - 12:10 means data that arrived after 12:00 but before 12:10. Now, consider a word that was received at 12:07. This word should increment the counts corresponding to two windows 12:00 - 12:10 and 12:05 - 12:15. So the counts will be indexed by both, the grouping key (i.e. the word) and the window (can be calculated from the event-time). @@ -1619,23 +1619,23 @@ Not available in R. ## Monitoring Streaming Queries -There are two APIs for monitoring and debugging active queries - +There are two APIs for monitoring and debugging active queries - interactively and asynchronously. ### Interactive APIs -You can directly get the current status and metrics of an active query using -`streamingQuery.lastProgress()` and `streamingQuery.status()`. -`lastProgress()` returns a `StreamingQueryProgress` object -in [Scala](api/scala/index.html#org.apache.spark.sql.streaming.StreamingQueryProgress) +You can directly get the current status and metrics of an active query using +`streamingQuery.lastProgress()` and `streamingQuery.status()`. +`lastProgress()` returns a `StreamingQueryProgress` object +in [Scala](api/scala/index.html#org.apache.spark.sql.streaming.StreamingQueryProgress) and [Java](api/java/org/apache/spark/sql/streaming/StreamingQueryProgress.html) and a dictionary with the same fields in Python. It has all the information about -the progress made in the last trigger of the stream - what data was processed, -what were the processing rates, latencies, etc. There is also +the progress made in the last trigger of the stream - what data was processed, +what were the processing rates, latencies, etc. There is also `streamingQuery.recentProgress` which returns an array of last few progresses. -In addition, `streamingQuery.status()` returns a `StreamingQueryStatus` object -in [Scala](api/scala/index.html#org.apache.spark.sql.streaming.StreamingQueryStatus) +In addition, `streamingQuery.status()` returns a `StreamingQueryStatus` object +in [Scala](api/scala/index.html#org.apache.spark.sql.streaming.StreamingQueryStatus) and [Java](api/java/org/apache/spark/sql/streaming/StreamingQueryStatus.html) and a dictionary with the same fields in Python. 
It gives information about what the query is immediately doing - is a trigger active, is data being processed, etc. From 197eee34a9a2469821838ca7d8eb995483ee3417 Mon Sep 17 00:00:00 2001 From: Felix Cheung Date: Sun, 30 Apr 2017 16:18:18 -0700 Subject: [PATCH 6/7] more space --- docs/structured-streaming-programming-guide.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/structured-streaming-programming-guide.md b/docs/structured-streaming-programming-guide.md index fd72dd1b07f4..15d4d41852cc 100644 --- a/docs/structured-streaming-programming-guide.md +++ b/docs/structured-streaming-programming-guide.md @@ -1790,7 +1790,7 @@ Will print something like the following. ''' print(query.status) -''' +''' Will print something like the following. {u'message': u'Waiting for data to arrive', u'isTriggerActive': False, u'isDataAvailable': False} @@ -1917,7 +1917,7 @@ Not available in R.
-## Recovering from Failures with Checkpointing +## Recovering from Failures with Checkpointing In case of a failure or intentional shutdown, you can recover the previous progress and state of a previous query, and continue where it left off. This is done using checkpointing and write ahead logs. You can configure a query with a checkpoint location, and the query will save all the progress information (i.e. range of offsets processed in each trigger) and the running aggregates (e.g. word counts in the [quick example](#quick-example)) to the checkpoint location. This checkpoint location has to be a path in an HDFS compatible file system, and can be set as an option in the DataStreamWriter when [starting a query](#starting-streaming-queries).
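+
+As a minimal R sketch of this (reusing the `aggDF` streaming SparkDataFrame from the earlier sink examples; the path below is only a placeholder), the checkpoint location is passed as just another `write.stream` option:
+
+{% highlight r %}
+# Sketch: save progress information and running aggregates to an HDFS-compatible path
+query <- write.stream(aggDF,
+                      "memory",
+                      queryName = "aggregates",
+                      outputMode = "complete",
+                      checkpointLocation = "path/to/HDFS/dir")
+{% endhighlight %}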
From f2b10afc22e48fe4f2699755907d58d03e153f07 Mon Sep 17 00:00:00 2001 From: Felix Cheung Date: Wed, 3 May 2017 21:37:57 -0700 Subject: [PATCH 7/7] rename --- docs/structured-streaming-programming-guide.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/structured-streaming-programming-guide.md b/docs/structured-streaming-programming-guide.md index 15d4d41852cc..53b3db21da76 100644 --- a/docs/structured-streaming-programming-guide.md +++ b/docs/structured-streaming-programming-guide.md @@ -157,7 +157,7 @@ words <- selectExpr(lines, "explode(split(value, ' ')) as word") wordCounts <- count(group_by(words, "word")) {% endhighlight %} -This `lines` SparkDataFrame represents an unbounded table containing the streaming text data. This table contains one column of strings named "value", and each line in the streaming text data becomes a row in the table. Note, that this is not currently receiving any data as we are just setting up the transformation, and have not yet started it. Next, we have a SQL expression with two SQL functions - split and explode, to split each line into multiple rows with a word each. In addition, we name the new column as "word". Finally, we have defined the `wordCounts` DataFrame by grouping by the unique values in the SparkDataFrame and counting them. Note that this is a streaming SparkDataFrame which represents the running word counts of the stream. +This `lines` SparkDataFrame represents an unbounded table containing the streaming text data. This table contains one column of strings named "value", and each line in the streaming text data becomes a row in the table. Note, that this is not currently receiving any data as we are just setting up the transformation, and have not yet started it. Next, we have a SQL expression with two SQL functions - split and explode, to split each line into multiple rows with a word each. In addition, we name the new column as "word". Finally, we have defined the `wordCounts` SparkDataFrame by grouping by the unique values in the SparkDataFrame and counting them. Note that this is a streaming SparkDataFrame which represents the running word counts of the stream.
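+
+A minimal way to then start the computation on `wordCounts` (the same pattern as the full word count example above) is to write it to the console sink and wait for the query to terminate:
+
+{% highlight r %}
+# Start running the query that prints the running counts to the console
+query <- write.stream(wordCounts, "console", outputMode = "complete")
+
+# Block until the query terminates
+awaitTermination(query)
+{% endhighlight %}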