diff --git a/docs/img/structured-streaming-watermark.png b/docs/img/structured-streaming-watermark.png
new file mode 100644
index 000000000000..f21fbda17101
Binary files /dev/null and b/docs/img/structured-streaming-watermark.png differ
diff --git a/docs/img/structured-streaming.pptx b/docs/img/structured-streaming.pptx
index 6aad2ed33e92..f5bdfc078cad 100644
Binary files a/docs/img/structured-streaming.pptx and b/docs/img/structured-streaming.pptx differ
diff --git a/docs/structured-streaming-programming-guide.md b/docs/structured-streaming-programming-guide.md
index 77b66b3b3a49..3b7d0c400317 100644
--- a/docs/structured-streaming-programming-guide.md
+++ b/docs/structured-streaming-programming-guide.md
@@ -10,7 +10,7 @@ title: Structured Streaming Programming Guide
# Overview
Structured Streaming is a scalable and fault-tolerant stream processing engine built on the Spark SQL engine. You can express your streaming computation the same way you would express a batch computation on static data. The Spark SQL engine will take care of running it incrementally and continuously and updating the final result as streaming data continues to arrive. You can use the [Dataset/DataFrame API](sql-programming-guide.html) in Scala, Java or Python to express streaming aggregations, event-time windows, stream-to-batch joins, etc. The computation is executed on the same optimized Spark SQL engine. Finally, the system ensures end-to-end exactly-once fault-tolerance guarantees through checkpointing and Write Ahead Logs. In short, *Structured Streaming provides fast, scalable, fault-tolerant, end-to-end exactly-once stream processing without the user having to reason about streaming.*
-**Spark 2.0 is the ALPHA RELEASE of Structured Streaming** and the APIs are still experimental. In this guide, we are going to walk you through the programming model and the APIs. First, let's start with a simple example - a streaming word count.
+**Structured Streaming is still ALPHA in Spark 2.1** and the APIs are still experimental. In this guide, we are going to walk you through the programming model and the APIs. First, let's start with a simple example - a streaming word count.
# Quick Example
Let’s say you want to maintain a running word count of text data received from a data server listening on a TCP socket. Let’s see how you can express this using Structured Streaming. You can see the full code in
@@ -400,7 +400,14 @@ see how this model handles event-time based processing and late arriving data.
## Handling Event-time and Late Data
Event-time is the time embedded in the data itself. For many applications, you may want to operate on this event-time. For example, if you want to get the number of events generated by IoT devices every minute, then you probably want to use the time when the data was generated (that is, event-time in the data), rather than the time Spark receives them. This event-time is very naturally expressed in this model -- each event from the devices is a row in the table, and event-time is a column value in the row. This allows window-based aggregations (e.g. number of events every minute) to be just a special type of grouping and aggregation on the event-time column -- each time window is a group and each row can belong to multiple windows/groups. Therefore, such event-time-window-based aggregation queries can be defined consistently on both a static dataset (e.g. from collected device event logs) as well as on a data stream, making the life of the user much easier.
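To make the "windows are just groups" idea concrete, here is a minimal sketch (not part of this guide; the names `events`, `timestamp`, and `word` are hypothetical) of a sliding event-time window expressed as an ordinary grouped aggregation:

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.window

// `events` is assumed to be a streaming DataFrame with an event-time column `timestamp`
// and a string column `word` (hypothetical names). Each 10-minute window, sliding every
// 5 minutes, becomes a group key, and a row can fall into more than one window.
def windowedCounts(events: DataFrame): DataFrame =
  events.groupBy(
    window(events("timestamp"), "10 minutes", "5 minutes"),
    events("word")
  ).count()
```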
-Furthermore, this model naturally handles data that has arrived later than expected based on its event-time. Since Spark is updating the Result Table, it has full control over updating/cleaning up the aggregates when there is late data. While not yet implemented in Spark 2.0, event-time watermarking will be used to manage this data. These are explained later in more details in the [Window Operations](#window-operations-on-event-time) section.
+Furthermore, this model naturally handles data that has arrived later than
+expected based on its event-time. Since Spark is updating the Result Table,
+it has full control over updating old aggregates when there is late data,
+as well as cleaning up old aggregates to limit the size of intermediate
+state data. Since Spark 2.1, we have support for watermarking, which
+allows the user to specify the threshold of late data, and allows the engine
+to accordingly clean up old state. These are explained later in more
+detail in the [Window Operations](#window-operations-on-event-time) section.
## Fault Tolerance Semantics
Delivering end-to-end exactly-once semantics was one of the key goals behind the design of Structured Streaming. To achieve that, we have designed the Structured Streaming sources, the sinks and the execution engine to reliably track the exact progress of the processing so that it can handle any kind of failure by restarting and/or reprocessing. Every streaming source is assumed to have offsets (similar to Kafka offsets, or Kinesis sequence numbers)
@@ -671,12 +678,123 @@ windowedCounts = words.groupBy(
+### Handling Late Data and Watermarking
Now consider what happens if one of the events arrives late to the application.
-For example, a word that was generated at 12:04 but it was received at 12:11.
-Since this windowing is based on the time in the data, the time 12:04 should be considered for windowing. This occurs naturally in our window-based grouping – the late data is automatically placed in the proper windows and the correct aggregates are updated as illustrated below.
+For example, say, a word generated at 12:04 (i.e. event time) could be received by
+the application at 12:11. The application should use the time 12:04 instead of 12:11
+to update the older counts for the window `12:00 - 12:10`. This occurs
+naturally in our window-based grouping – Structured Streaming can maintain the intermediate state
+for partial aggregates for a long period of time such that late data can update aggregates of
+old windows correctly, as illustrated below.
+
+![Handling Late Data](img/structured-streaming-late-data.png)
+
+However, to run this query for days, it is necessary for the system to bound the amount of
+intermediate in-memory state it accumulates. This means the system needs to know when an old
+aggregate can be dropped from the in-memory state because the application is not going to receive
+late data for that aggregate any more. To enable this, in Spark 2.1, we have introduced
+**watermarking**, which lets the engine automatically track the current event time in the data
+and attempt to clean up old state accordingly. You can define the watermark of a query by
+specifying the event time column and the threshold on how late the data is expected to be in terms of
+event time. For a specific window starting at time `T`, the engine will maintain state and allow late
+data to update the state until `(max event time seen by the engine - late threshold > T)`.
+In other words, late data within the threshold will be aggregated,
+but data later than the threshold will be dropped.
+Let's understand this with an example. We can
+easily define watermarking on the previous example using `withWatermark()` as shown below.
+
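As a rough sketch of what that could look like, continuing the hypothetical `events` DataFrame from the earlier sketch (the guide's own tabbed snippet may differ):

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.window

// Watermark on the hypothetical event-time column `timestamp`: the engine tracks the
// maximum event time it has seen and drops window state once a window falls more than
// 10 minutes behind that maximum.
def watermarkedCounts(events: DataFrame): DataFrame =
  events
    .withWatermark("timestamp", "10 minutes")
    .groupBy(
      window(events("timestamp"), "10 minutes", "5 minutes"),
      events("word"))
    .count()
```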
+| Query Type | | Supported Output Modes | Notes |
+|---|---|---|---|
+| Queries without aggregation | | Append | Complete mode not supported as it is infeasible to keep all data in the Result Table. |
+| Queries with aggregation | Aggregation on event-time with watermark | Append, Complete | Append mode uses watermark to drop old aggregation state. But the output of a windowed aggregation is delayed by the late threshold specified in `withWatermark()` as, by the mode's semantics, rows can be added to the Result Table only once after they are finalized (i.e. after the watermark is crossed). See the [Late Data](#handling-late-data-and-watermarking) section for more details. Complete mode does not drop old aggregation state since by definition this mode preserves all data in the Result Table. |
+| Queries with aggregation | Other aggregations | Complete | Append mode is not supported as aggregates can update thus violating the semantics of this mode. Complete mode does not drop old aggregation state since by definition this mode preserves all data in the Result Table. |
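To make the difference between the modes concrete, here is a small hypothetical sketch (not from the guide): the same watermarked windowed count started once in append mode and once in complete mode, both writing to the console sink.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.streaming.StreamingQuery

// `windowedCounts` is assumed to be a watermarked, windowed streaming aggregation
// (e.g. the sketch shown earlier); both queries below write to the console sink.
def startInAppendMode(windowedCounts: DataFrame): StreamingQuery =
  windowedCounts.writeStream
    .outputMode("append")    // a window's row is emitted only once, after the watermark passes it
    .format("console")
    .start()

def startInCompleteMode(windowedCounts: DataFrame): StreamingQuery =
  windowedCounts.writeStream
    .outputMode("complete")  // the whole updated Result Table is emitted on every trigger
    .format("console")
    .start()
```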
writeStream
.format("parquet")
.start()
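As a reference point, a hedged sketch of how the parquet file sink is typically wired up; `df` is a hypothetical streaming DataFrame and the paths are placeholders. The file sink needs both an output path and a checkpoint location.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.streaming.StreamingQuery

// `df` is a hypothetical streaming DataFrame; the paths below are placeholders.
// The file sink works in append mode and requires a checkpoint location for fault tolerance.
def writeAsParquet(df: DataFrame): StreamingQuery =
  df.writeStream
    .format("parquet")
    .option("path", "/tmp/streaming-output")            // placeholder output directory
    .option("checkpointLocation", "/tmp/checkpoints")   // placeholder checkpoint directory
    .start()
```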
writeStream
.format("memory")
.queryName("table")
.start()
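Similarly, a hedged sketch of using the memory sink for interactive debugging; `aggDF` and the SparkSession handle are hypothetical, and the in-memory table name matches the snippet above.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

// `aggDF` is a hypothetical aggregated streaming DataFrame. The memory sink keeps the
// output in the driver's memory as an in-memory table named by queryName(), so it is
// intended for debugging on low data volumes.
def debugWithMemorySink(spark: SparkSession, aggDF: DataFrame): Unit = {
  aggDF.writeStream
    .format("memory")
    .queryName("table")                        // same table name as the snippet above
    .outputMode("complete")
    .start()

  spark.sql("SELECT * FROM `table`").show()    // interactively inspect the current result
}
```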