docs/structured-streaming-programming-guide.md: 49 changes (23 additions, 26 deletions)
@@ -626,52 +626,49 @@ The result tables would look something like the following.

![Window Operations](img/structured-streaming-window.png)

-Since this windowing is similar to grouping, in code, you can use `groupBy()` and `window()` operations to express windowed aggregations.
+Since this windowing is similar to grouping, in code, you can use `groupBy()` and `window()` operations to express windowed aggregations. You can see the full code for the below examples in
+[Scala]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredNetworkWordCountWindowed.scala)/
+[Java]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCountWindowed.java)/
+[Python]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/python/sql/streaming/structured_network_wordcount_windowed.py).
Contributor:
These changes are good. Do not revert this based on what I have said below.


<div class="codetabs">
<div data-lang="scala" markdown="1">

{% highlight scala %}
-// Number of events in every 1 minute time windows
-df.groupBy(window(df.col("time"), "1 minute"))
-  .count()
-
-// Average number of events for each device type in every 1 minute time windows
-df.groupBy(
-  df.col("type"),
-  window(df.col("time"), "1 minute"))
-  .avg("signal")
+import spark.implicits._
+
+val words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }
+
+// Group the data by window and word and compute the count of each group
+val windowedCounts = words.groupBy(
Contributor:
I took a look at the built doc again and imagined what it would look like. This would look very verbose. I think since the nearest example in the doc (Basic Operations - Selection, Projection, Aggregation) uses device data and already has all the boilerplate code to define the DeviceData class, etc., let's not change the code snippet to the exact one in the example.

Can you revert all the code snippet changes, and just do one change for the Scala snippet?

  • Change df.col("..") to use $"..."
  • Add import spark.implicits._

(See the illustrative sketch after the Scala tab below.)

+  window($"timestamp", "10 minutes", "5 minutes"),
+  $"word"
+).count()
{% endhighlight %}

</div>
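For illustration only, and not part of this PR's diff: a sketch of what the reviewer's suggested minimal Scala-only change might look like if applied to the original device-data snippet. Here `df` is assumed to be the streaming DataFrame of device data (with the `time`, `type`, and `signal` columns used by the original snippet).

{% highlight scala %}
// Hypothetical sketch of the reviewer's suggestion, not code from this PR:
// keep the original device-data example, replace df.col("...") with $-syntax,
// and add the implicits import that $-syntax requires.
import org.apache.spark.sql.functions.window
import spark.implicits._

// Number of events in every 1 minute time windows
df.groupBy(window($"time", "1 minute"))
  .count()

// Average signal for each device type in every 1 minute time windows
df.groupBy(
  $"type",
  window($"time", "1 minute"))
  .avg("signal")
{% endhighlight %}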
<div data-lang="java" markdown="1">

{% highlight java %}
-import static org.apache.spark.sql.functions.window;
-
-// Number of events in every 1 minute time windows
-df.groupBy(window(df.col("time"), "1 minute"))
-  .count();
-
-// Average number of events for each device type in every 1 minute time windows
-df.groupBy(
-  df.col("type"),
-  window(df.col("time"), "1 minute"))
-  .avg("signal");
+Dataset<Row> words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }
+
+// Group the data by window and word and compute the count of each group
+Dataset<Row> windowedCounts = words.groupBy(
+  functions.window(words.col("timestamp"), "10 minutes", "5 minutes"),
+  words.col("word")
+).count();
{% endhighlight %}

</div>
<div data-lang="python" markdown="1">
{% highlight python %}
-from pyspark.sql.functions import window
-
-# Number of events in every 1 minute time windows
-df.groupBy(window("time", "1 minute")).count()
-
-# Average number of events for each device type in every 1 minute time windows
-df.groupBy("type", window("time", "1 minute")).avg("signal")
+words = ...  # streaming DataFrame of schema { timestamp: Timestamp, word: String }
+
+# Group the data by window and word and compute the count of each group
+windowedCounts = words.groupBy(
+    window(words.timestamp, '10 minutes', '5 minutes'),
+    words.word
+).count()
{% endhighlight %}

</div>
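For context, the snippets above leave `words = ...` unspecified. Here is a sketch of how that streaming Dataset might be constructed, modeled on the linked StructuredNetworkWordCountWindowed example, which reads from the socket source with the `includeTimestamp` option; the host and port values below are placeholders.

{% highlight scala %}
// Sketch only, modeled on StructuredNetworkWordCountWindowed; host/port are placeholders.
import java.sql.Timestamp

import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder
  .appName("StructuredNetworkWordCountWindowed")
  .getOrCreate()

import spark.implicits._

// Read lines from a socket; includeTimestamp adds the arrival time of each line
val lines = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .option("includeTimestamp", true)
  .load()

// Split each line into words, keeping the timestamp of the line it came from,
// which yields the word and timestamp columns grouped on above
val words = lines.as[(String, Timestamp)]
  .flatMap(line => line._1.split(" ").map(word => (word, line._2)))
  .toDF("word", "timestamp")
{% endhighlight %}

The windowed `groupBy` from the new snippets can then be applied directly to this `words` DataFrame.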