
Samples Revamp: Streaming #319


Merged
42 commits merged on Nov 9, 2019
c07f36d
Merge pull request #1 from dotnet/master
bamurtaugh Oct 9, 2019
4813b69
Merge pull request #2 from dotnet/master
bamurtaugh Oct 28, 2019
b0b2fee
Add readmes
bamurtaugh Oct 29, 2019
1944dc8
Update links in readme
bamurtaugh Oct 29, 2019
f0fe4f2
Moving general readmes to other branch
bamurtaugh Oct 29, 2019
e5a19e8
Move general readmes to other branch
bamurtaugh Oct 29, 2019
90262b4
Update streaming links and content
bamurtaugh Oct 29, 2019
c22feba
Update links
bamurtaugh Oct 29, 2019
375ae95
Merge branch 'master' into newsamples-stream
bamurtaugh Oct 29, 2019
c67f5be
Formatting for class/method names
bamurtaugh Oct 31, 2019
7b5b0cc
Add specific return type instead of var
bamurtaugh Oct 31, 2019
3c0021e
Move period inside asterisks
bamurtaugh Oct 31, 2019
ecc9191
Merge branch 'master' into newsamples-stream
bamurtaugh Nov 1, 2019
f78a74a
Merge branch 'master' into newsamples-stream
bamurtaugh Nov 4, 2019
dc7a2b2
Fix spacing
bamurtaugh Nov 7, 2019
4ed5cb9
Spacing
bamurtaugh Nov 7, 2019
4c81eaf
Merge branch 'master' into newsamples-stream
bamurtaugh Nov 7, 2019
45e76cb
Spacing
bamurtaugh Nov 7, 2019
5d17e49
Fix spacing in code snippets
bamurtaugh Nov 7, 2019
50b51e4
Add modified word count example
bamurtaugh Nov 7, 2019
dbac77f
Update udf explanation
bamurtaugh Nov 7, 2019
15f6060
Explain all samples
bamurtaugh Nov 7, 2019
7a76488
netcat context
bamurtaugh Nov 7, 2019
d2ec62f
Update code snippets, explanations
bamurtaugh Nov 7, 2019
0f23bb2
Wording
bamurtaugh Nov 7, 2019
f5de3fd
Update spark-submit
bamurtaugh Nov 7, 2019
2b64ae2
Relate back to udf streaming example
bamurtaugh Nov 7, 2019
a8e11ab
Fix indentation
bamurtaugh Nov 7, 2019
2ed31b8
Grammar
bamurtaugh Nov 7, 2019
10da44f
Update name of new sample
bamurtaugh Nov 7, 2019
d38854e
Update file name
bamurtaugh Nov 7, 2019
1371572
Add Sql.Streaming using
bamurtaugh Nov 7, 2019
8d59ab6
Merge branch 'newsamples-stream' of https://github.com/bamurtaugh/spa…
bamurtaugh Nov 7, 2019
13a58e2
Improve string output
bamurtaugh Nov 7, 2019
5586732
Fix readme code spacing
bamurtaugh Nov 7, 2019
e03d2d7
Shorten class ref in readme
bamurtaugh Nov 7, 2019
b7ec62c
Sort usings, add missing bracket
bamurtaugh Nov 7, 2019
4e9987c
Update comment
bamurtaugh Nov 7, 2019
38d4b08
Change netcat command to powershell id
bamurtaugh Nov 7, 2019
44fbf3d
Update output picture
bamurtaugh Nov 8, 2019
e268444
Merge branch 'master' into newsamples-stream
bamurtaugh Nov 8, 2019
cbdf3b9
Update spark-submit command
bamurtaugh Nov 8, 2019
130 changes: 130 additions & 0 deletions examples/Microsoft.Spark.CSharp.Examples/Sql/Streaming/README.md
@@ -0,0 +1,130 @@
# .NET for Apache Spark C# Samples: Streaming

[.NET for Apache Spark](https://dot.net/spark) is a free, open-source, and cross-platform big data analytics framework.

In the **Streaming** folder, we provide C# samples that will help you get started with the big data analytics scenario known as
**structured streaming.** Stream processing means we're analyzing data live as it's being produced.

## Problem

These samples demonstrate **stream processing:** analyzing data in real time as it arrives, rather than after it has been collected and stored.

## Solution

You'll see there are four different samples included in the *Streaming* folder:
* **[StructuredNetworkWordCount.cs](StructuredNetworkWordCount.cs)** - word count on data streamed from any source (e.g., netcat)
* **[StructuredNetworkWordCountWindowed.cs](StructuredNetworkWordCountWindowed.cs)** - word count on data with windowing logic
* **[StructuredKafkaWordCount.cs](StructuredKafkaWordCount.cs)** - word count on data streamed from Kafka
* **[StructuredNetworkCharacterCount.cs](StructuredNetworkCharacterCount.cs)** - counts the number of characters in each string read from a stream, demonstrating the power of UDFs combined with stream processing

While the steps below apply to most stream processing apps, some of the specific code snippets or submission instructions pertain specifically to the [StructuredNetworkCharacterCount.cs](StructuredNetworkCharacterCount.cs) example.

### 1. Create a Spark Session

In any Spark application, we need to establish a new `SparkSession`, which is the entry point to programming Spark with the Dataset and DataFrame API.

```CSharp
SparkSession spark = SparkSession
.Builder()
.AppName("Streaming example with a UDF")
.GetOrCreate();
```

By calling methods on the *spark* object created above, we can access Spark and DataFrame functionality throughout our program.

### 2. Establish and Connect to Data Stream

Depending upon which sample you choose to run, you'll need to set up a connection to a data stream. One popular way to test out stream processing is through **netcat.**

#### Establish Stream: Netcat

netcat (also known as *nc*) allows you to read from and write to network connections. We'll establish a network
connection with netcat through a terminal window.

[Download netcat](https://sourceforge.net/projects/nc110/files/), extract the file from the zip download, and append the
directory you extracted to your "PATH" environment variable.

To start a new connection, open a command prompt. For Linux users, run ```nc -lk 9999``` to connect to localhost on port 9999.

Windows users can run ```nc -vvv -l -p 9999``` to connect to localhost port 9999. The result should look something like this:

![NetcatConnect](https://github.com/bamurtaugh/spark/blob/StreamingLog/examples/Microsoft.Spark.CSharp.Examples/Sql/Streaming/netconnect.PNG)

Our Spark program will be listening for input we type into this command prompt.

#### Connect to Stream: ReadStream()

The `ReadStream()` method returns a `DataStreamReader` that can be used to read streaming data in as a `DataFrame`. We'll include the host and port information so that our Spark app knows where to expect its streaming data.

```CSharp
DataFrame lines = spark
.ReadStream()
.Format("socket")
.Option("host", hostname)
.Option("port", port)
.Load();
```
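The socket source is only one option. If you'd rather read from Kafka (as in [StructuredKafkaWordCount.cs](StructuredKafkaWordCount.cs)), the reader looks very similar. The following is a sketch; the server address and topic name are placeholders you'd replace with your own:

```CSharp
// Sketch: read from a Kafka topic instead of a socket
// "localhost:9092" and "mytopic" are illustrative placeholders
DataFrame kafkaLines = spark
    .ReadStream()
    .Format("kafka")
    .Option("kafka.bootstrap.servers", "localhost:9092")
    .Option("subscribe", "mytopic")
    .Load();
```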

### 3. Register a UDF

A UDF is a *user-defined function.* We can use UDFs in Spark applications to perform calculations and analysis on our data.

```CSharp
Func<Column, Column> udfArray =
    Udf<string, string[]>((str) => new string[] { str, $"{str} {str.Length}" });
```

In the above code snippet from [StructuredNetworkCharacterCount.cs](StructuredNetworkCharacterCount.cs), we register a UDF called `udfArray`. This UDF processes each string it receives from the netcat terminal to produce an array with two entries: the original string (contained in *str*), and the original string concatenated with its length.

For example, entering *Hello world* in the terminal would produce an array where:
* array[0] = Hello world
* array[1] = Hello world 11

This is just an example of how you can use UDFs to further modify and analyze your data, even live as it's being streamed in!
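To see exactly what the UDF's lambda does, here's the same logic as plain C# without the Spark `Udf` wrapper, so you can run and test it in isolation (a minimal sketch; the `toArray` name is illustrative):

```CSharp
using System;

// Same lambda body as udfArray, minus the Spark wrapper
Func<string, string[]> toArray =
    str => new string[] { str, $"{str} {str.Length}" };

string[] result = toArray("Hello world");
Console.WriteLine(result[0]); // Hello world
Console.WriteLine(result[1]); // Hello world 11
```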

### 4. Use SparkSQL

Next, we'll use SparkSQL to perform various functions on the data stored in our DataFrame. It's common to combine UDFs and SparkSQL so that we can apply a UDF to each row of our DataFrame.

```CSharp
DataFrame arrayDF = lines.Select(Explode(udfArray(lines["value"])));
```

In the above code snippet from [StructuredNetworkCharacterCount.cs](StructuredNetworkCharacterCount.cs), we apply *udfArray* to each value in our DataFrame (which represents each string read in from our netcat terminal). We then apply the SparkSQL method `Explode` to put each entry of our array in its own row. Finally, we use `Select` to place the columns we've produced in the new DataFrame *arrayDF.*
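If you're more familiar with LINQ, `Explode` plays a role similar to `SelectMany`: each array produced by the UDF becomes multiple rows. This is only a rough, Spark-free analogy of the transformation, not the actual DataFrame code:

```CSharp
using System;
using System.Linq;

string[] lines = { "Hello world", "hi" };

// Apply the UDF-style transform, then flatten each array into rows,
// loosely mirroring Select(Explode(udfArray(...)))
var rows = lines
    .Select(str => new[] { str, $"{str} {str.Length}" })
    .SelectMany(arr => arr);

foreach (string row in rows)
    Console.WriteLine(row);
// Hello world
// Hello world 11
// hi
// hi 2
```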

### 5. Display Your Stream

We can use `DataFrame.WriteStream()` to establish the characteristics of our output, such as printing results to the console and displaying only the most recent output rather than the full history.

```CSharp
StreamingQuery query = arrayDF
.WriteStream()
.Format("console")
.Start();
```
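You can also state the output mode explicitly. The sketch below assumes you want only newly arrived rows printed in each batch (*append* semantics); other modes exist for aggregating queries:

```CSharp
// Sketch: explicitly request append output, so each batch
// prints only the rows that arrived since the last batch
StreamingQuery query = arrayDF
    .WriteStream()
    .OutputMode("append")
    .Format("console")
    .Start();
```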

### 6. Running Your Code

Structured streaming in Spark processes data through a series of small **batches.**
When you run your program, the command prompt where we established the netcat connection allows you to start typing.
In our example, when you hit *enter* after typing data into that prompt, Spark treats the input as a batch and runs the UDF.
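With the console sink, each batch is printed as a small table. For the input *Hello world*, the output looks roughly like this (the column name and exact formatting may vary by Spark version):

```
-------------------------------------------
Batch: 1
-------------------------------------------
+--------------+
|           col|
+--------------+
|   Hello world|
|Hello world 11|
+--------------+
```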

![StreamingOutput](https://github.com/bamurtaugh/spark/blob/StreamingLog/examples/Microsoft.Spark.CSharp.Examples/Sql/Streaming/stream2.png)

Check out the directions for building and running this app on [Windows](../../../../../../docs/building/windows-instructions.md) or [Ubuntu](../../../../../../docs/building/ubuntu-instructions.md).

#### Windows Example:

After starting a new netcat session, open a new terminal and run your `spark-submit` command, similar to the following:

```powershell
spark-submit --class org.apache.spark.deploy.dotnet.DotnetRunner --master local /path/to/microsoft-spark-<version>.jar Microsoft.Spark.CSharp.Examples.exe Sql.Streaming.StructuredNetworkCharacterCount localhost 9999
```

> **Note:** Be sure to update the above command with the actual path to your Microsoft Spark jar file. The above command also assumes your netcat server is running on localhost port 9999.

## Additional Resources

To learn more about structured streaming with .NET for Apache Spark, check out [this video](https://channel9.msdn.com/Series/NET-for-Apache-Spark-101/Structured-Streaming-with-NET-for-Apache-Spark) from the .NET for Apache Spark 101 video series to see a streaming demo coded and ran live.

You can also [check out the demos and explanation](https://youtu.be/ZWsYMQ0Sw1o) from the .NET for Apache Spark session at .NET Conf 2019!
@@ -0,0 +1,61 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.

using System;
using Microsoft.Spark.Sql;
using Microsoft.Spark.Sql.Streaming;
using static Microsoft.Spark.Sql.Functions;

namespace Microsoft.Spark.Examples.Sql.Streaming
{
/// <summary>
    /// This is an example of using a UDF with stream processing.
///
    /// You can set up the data source as follows in a separate terminal:
/// `$ nc -lk 9999`
/// to start writing standard input to port 9999.
/// </summary>
internal sealed class StructuredNetworkCharacterCount : IExample
{
public void Run(string[] args)
{
// Default to running on localhost:9999
string hostname = "localhost";
            int port = 9999;

// User designated their own host and port
if (args.Length == 2)
{
hostname = args[0];
port = int.Parse(args[1]);
}

SparkSession spark = SparkSession
.Builder()
.AppName("Streaming example with a UDF")
.GetOrCreate();

DataFrame lines = spark
.ReadStream()
.Format("socket")
.Option("host", hostname)
.Option("port", port)
.Load();

// UDF to produce an array
// Array includes: 1) original string 2) original string + length of original string
Func<Column, Column> udfArray =
Udf<string, string[]>((str) => new string[] { str, $"{str} {str.Length}" });
DataFrame arrayDF = lines.Select(Explode(udfArray(lines["value"])));

// Process and display each incoming line
StreamingQuery query = arrayDF
.WriteStream()
.Format("console")
.Start();

query.AwaitTermination();
}
}
}