87 changes: 51 additions & 36 deletions docs/structured-streaming-programming-guide.md
@@ -537,21 +537,21 @@ Most of the common operations on DataFrame/Dataset are supported for streaming.
<div data-lang="scala" markdown="1">

{% highlight scala %}
case class DeviceData(device: String, type: String, signal: Double, time: DateTime)
case class DeviceData(device: String, deviceType: String, signal: Double, time: DateTime)
Contributor Author: `type` won't compile in Scala since it is a keyword, hence it is changed to `deviceType` here.
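To illustrate the point, a trimmed, hypothetical sketch (it drops the `time` field so it stands alone): Scala only accepts a keyword such as `type` as an identifier when it is backtick-escaped, so renaming the field keeps the docs example readable:

```scala
// Does not compile: `type` is a reserved word in Scala.
// case class DeviceData(device: String, type: String, signal: Double)

// Compiles, but backtick escaping reads poorly in a documentation example:
case class DeviceDataEscaped(device: String, `type`: String, signal: Double)

// Renaming the field avoids the escaping entirely, as done in this change:
case class DeviceData(device: String, deviceType: String, signal: Double)
```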


val df: DataFrame = ... // streaming DataFrame with IOT device data with schema { device: string, type: string, signal: double, time: string }
val df: DataFrame = ... // streaming DataFrame with IOT device data with schema { device: string, deviceType: string, signal: double, time: string }
val ds: Dataset[DeviceData] = df.as[DeviceData] // streaming Dataset with IOT device data

// Select the devices which have signal more than 10
df.select("device").where("signal > 10") // using untyped APIs
ds.filter(_.signal > 10).map(_.device) // using typed APIs

// Running count of the number of updates for each device type
df.groupBy("type").count() // using untyped API
df.groupBy("deviceType").count() // using untyped API

// Running average signal for each device type
import org.apache.spark.sql.expressions.scalalang.typed._
ds.groupByKey(_.type).agg(typed.avg(_.signal)) // using typed API
import org.apache.spark.sql.expressions.scalalang.typed
Contributor Author: this import should be `….scalalang.typed` rather than `….scalalang.typed._`.
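A short sketch of why the non-wildcard import matches the usage in this example (assumes a streaming `Dataset[DeviceData]` named `ds`, as above):

```scala
import org.apache.spark.sql.expressions.scalalang.typed  // brings the `typed` object itself into scope

ds.groupByKey(_.deviceType).agg(typed.avg(_.signal))     // the code refers to the object by name

// With `import ...scalalang.typed._` only the object's members (avg, count, sum, ...)
// would be imported, so the qualified call `typed.avg` would not resolve.
```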

ds.groupByKey(_.deviceType).agg(typed.avg(_.signal)) // using typed API
{% endhighlight %}

</div>
@@ -565,7 +565,7 @@ import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder;

public class DeviceData {
private String device;
private String type;
private String deviceType;
private Double signal;
private java.sql.Date time;
...
Expand All @@ -590,13 +590,13 @@ ds.filter(new FilterFunction<DeviceData>() { // using typed APIs
}, Encoders.STRING());

// Running count of the number of updates for each device type
df.groupBy("type").count(); // using untyped API
df.groupBy("deviceType").count(); // using untyped API

// Running average signal for each device type
ds.groupByKey(new MapFunction<DeviceData, String>() { // using typed API
@Override
public String call(DeviceData value) throws Exception {
return value.getType();
return value.getDeviceType();
}
}, Encoders.STRING()).agg(typed.avg(new MapFunction<DeviceData, Double>() {
@Override
@@ -611,13 +611,13 @@ ds.groupByKey(new MapFunction<DeviceData, String>() { // using typed API
<div data-lang="python" markdown="1">

{% highlight python %}
df = ... # streaming DataFrame with IOT device data with schema { device: string, type: string, signal: double, time: DateType }
df = ... # streaming DataFrame with IOT device data with schema { device: string, deviceType: string, signal: double, time: DateType }

# Select the devices which have signal more than 10
df.select("device").where("signal > 10")

# Running count of the number of updates for each device type
df.groupBy("type").count()
df.groupBy("deviceType").count()
{% endhighlight %}
</div>
</div>
@@ -973,7 +973,7 @@ Here is a table of all the sinks, and the corresponding settings.
<tr>
<td><b>File Sink</b></td>
<td>Append</td>
<td><pre>writeStream<br/> .format("parquet")<br/> .start()</pre></td>
<td><pre>writeStream<br/> .format("parquet")<br/> .option(<br/> "checkpointLocation",<br/> "path/to/checkpoint/dir")<br/> .option(<br/> "path",<br/> "path/to/destination/dir")<br/> .start()</pre></td>
Contributor Author: the file format requires `checkpointLocation` and `path` to be explicitly specified (a minimal sketch follows this table row).

<td>Yes</td>
<td>Supports writes to partitioned tables. Partitioning by time may be useful.</td>
</tr>
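As referenced in the comment above, a minimal sketch of a file-sink query with both required options spelled out (`streamingDF` and the paths are placeholders):

```scala
// File sink: both the checkpoint location and the output path must be set explicitly.
streamingDF.writeStream
  .format("parquet")                                       // file-based formats such as parquet, orc, json, csv
  .option("checkpointLocation", "path/to/checkpoint/dir")  // required
  .option("path", "path/to/destination/dir")               // required output directory
  .start()
```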
@@ -1026,7 +1026,9 @@ noAggDF
// Write new data to Parquet files
noAggDF
.writeStream
.parquet("path/to/destination/directory")
.format("parquet")
.option("checkpointLocation", "path/to/checkpoint/dir")
.option("path", "path/to/destination/dir")
.start()

// ========== DF with aggregation ==========
@@ -1066,7 +1068,9 @@ noAggDF
// Write new data to Parquet files
noAggDF
.writeStream()
.parquet("path/to/destination/directory")
.format("parquet")
.option("checkpointLocation", "path/to/checkpoint/dir")
.option("path", "path/to/destination/dir")
.start();

// ========== DF with aggregation ==========
@@ -1106,7 +1110,9 @@ noAggDF \
# Write new data to Parquet files
noAggDF \
.writeStream() \
.parquet("path/to/destination/directory") \
.format("parquet") \
.option("checkpointLocation", "path/to/checkpoint/dir") \
.option("path", "path/to/destination/dir") \
.start()

# ========== DF with aggregation ==========
@@ -1120,11 +1126,11 @@ aggDF \
.start()

# Have all the aggregates in an in memory table. The query name will be the table name
aggDF\
.writeStream()\
.queryName("aggregates")\
.outputMode("complete")\
.format("memory")\
aggDF \
.writeStream() \
.queryName("aggregates") \
.outputMode("complete") \
.format("memory") \
.start()

spark.sql("select * from aggregates").show() # interactively query in-memory table
@@ -1159,7 +1165,9 @@ The `StreamingQuery` object created when a query is started can be used to monit
{% highlight scala %}
val query = df.writeStream.format("console").start() // get the query object

query.id // get the unique identifier of the running query
query.id // get the unique identifier of the running query that persists across restarts from checkpoint data

query.runId // get the unique id of this run of the query, which will be generated at every start/restart

query.name // get the name of the auto-generated or user-specified name

Expand All @@ -1169,11 +1177,11 @@ query.stop() // stop the query

query.awaitTermination() // block until query is terminated, with stop() or with error

query.exception() // the exception if the query has been terminated with error
query.exception // the exception if the query has been terminated with error

query.sourceStatus() // progress information about data has been read from the input sources
query.recentProgress // an array of the most recent progress updates for this query

query.sinkStatus() // progress information about data written to the output sink
query.lastProgress // the most recent progress update of this streaming query
{% endhighlight %}
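A brief sketch of how these handles might be inspected in practice (assumes a started query named `query`; `lastProgress` is null until the first trigger completes):

```scala
// id is stable across restarts from the same checkpoint; runId changes on every (re)start.
println(s"id=${query.id}, runId=${query.runId}")

// The most recent progress update, if any, and the recent history.
Option(query.lastProgress).foreach(p => println(p.prettyJson))
query.recentProgress.foreach(p => println(p.numInputRows))
```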


@@ -1183,21 +1191,23 @@ query.sinkStatus() // progress information about data written to the output si
{% highlight java %}
StreamingQuery query = df.writeStream().format("console").start(); // get the query object

query.id(); // get the unique identifier of the running query
query.id(); // get the unique identifier of the running query that persists across restarts from checkpoint data

query.runId(); // get the unique id of this run of the query, which will be generated at every start/restart

query.name(); // get the name of the auto-generated or user-specified name

query.explain(); // print detailed explanations of the query

query.stop(); // stop the query
query.stop(); // stop the query

query.awaitTermination(); // block until query is terminated, with stop() or with error

query.exception(); // the exception if the query has been terminated with error
query.exception(); // the exception if the query has been terminated with error

query.sourceStatus(); // progress information about data has been read from the input sources
query.recentProgress(); // an array of the most recent progress updates for this query

query.sinkStatus(); // progress information about data written to the output sink
query.lastProgress(); // the most recent progress update of this streaming query

{% endhighlight %}

@@ -1207,7 +1217,9 @@ query.sinkStatus(); // progress information about data written to the output s
{% highlight python %}
query = df.writeStream().format("console").start() # get the query object

query.id() # get the unique identifier of the running query
query.id() # get the unique identifier of the running query that persists across restarts from checkpoint data

query.runId() # get the unique id of this run of the query, which will be generated at every start/restart

query.name() # get the name of the auto-generated or user-specified name

Expand All @@ -1217,11 +1229,11 @@ query.stop() # stop the query

query.awaitTermination() # block until query is terminated, with stop() or with error

query.exception() # the exception if the query has been terminated with error
query.exception() # the exception if the query has been terminated with error

query.sourceStatus() # progress information about data has been read from the input sources
query.recentProgress() # an array of the most recent progress updates for this query

query.sinkStatus() # progress information about data written to the output sink
query.lastProgress() # the most recent progress update of this streaming query

{% endhighlight %}

@@ -1491,14 +1503,17 @@ spark.streams.addListener(new StreamingQueryListener() {
{% highlight java %}
SparkSession spark = ...

spark.streams.addListener(new StreamingQueryListener() {
@Overrides void onQueryStarted(QueryStartedEvent queryStarted) {
spark.streams().addListener(new StreamingQueryListener() {
@Override
public void onQueryStarted(QueryStartedEvent queryStarted) {
System.out.println("Query started: " + queryStarted.id());
}
@Overrides void onQueryTerminated(QueryTerminatedEvent queryTerminated) {
@Override
public void onQueryTerminated(QueryTerminatedEvent queryTerminated) {
System.out.println("Query terminated: " + queryTerminated.id());
}
@Overrides void onQueryProgress(QueryProgressEvent queryProgress) {
@Override
public void onQueryProgress(QueryProgressEvent queryProgress) {
Contributor Author: it's @Override in Java, and these methods must be public.

System.out.println("Query made progress: " + queryProgress.progress());
}
});