Skip to content

Commit

Permalink
Improve Streams IO docs (#3127)
Browse files Browse the repository at this point in the history
* Improve Streams IO docs

* fixed remarks

* changed example

* Fixed an example

* api approval
  • Loading branch information
alexvaluyskiy authored and Aaronontheweb committed Oct 2, 2017
1 parent 716d330 commit 67bced3
Show file tree
Hide file tree
Showing 7 changed files with 346 additions and 2 deletions.
55 changes: 55 additions & 0 deletions docs/articles/streams/workingwithstreamingio.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,61 @@ title: Working with streaming IO
---

# Working with streaming IO
Akka Streams provides a way of handling File IO and TCP connections with Streams. While the general approach is very similar to the [Actor based TCP handling using Akka IO](xref:akka-io), by using Akka Streams you are freed of having to manually react to back-pressure signals, as the library does it transparently for you.

## Streaming TCP

### Accepting connections: Echo Server
In order to implement a simple EchoServer we bind to a given address, which returns a `Source<Tcp.IncomingConnection, Task<Tcp.ServerBinding>>`, which will emit an `IncomingConnection` element for each new connection that the Server s`hould handle:

[!code-csharp[StreamTcpDocTests.cs](../../examples/DocsExamples/Streams/StreamTcpDocTests.cs?name=echo-server-simple-bind)]

![tcp stream bind](/images/tcp-stream-bind.png)

Next, we simply handle each incoming connection using a `Flow` which will be used as the processing stage to handle and emit `ByteString` from and to the TCP Socket. Since one `ByteString` does not have to necessarily correspond to exactly one line of text (the client might be sending the line in chunks) we use the `Framing.Delimiter` helper to chunk the inputs up into actual lines of text. The last boolean argument indicates that we require an explicit line ending even for the last message before the connection is closed. In this example we simply add exclamation marks to each incoming text message and push it through the flow:

[!code-csharp[StreamTcpDocTests.cs](../../examples/DocsExamples/Streams/StreamTcpDocTests.cs?name=echo-server-simple-handle)]

Notice that while most building blocks in Akka Streams are reusable and freely shareable, this is not the case for the incoming connection Flow, since it directly corresponds to an existing, already accepted connection its handling can only ever be materialized once.

Closing connections is possible by cancelling the incoming connection `Flow` from your server logic (e.g. by connecting its downstream to a `Sink.Cancelled` and its upstream to a `Source.Empty`). It is also possible `to shut down the server’s socket by cancelling the `IncomingConnection` source `connections`.

[!code-csharp[StreamTcpDocTests.cs](../../examples/DocsExamples/Streams/StreamTcpDocTests.cs?name=close-incoming-connection)]

We can then test the TCP server by sending data to the TCP Socket using `netcat` (on Windows it is possible to use Linux Subsystem for Windows):
```
echo -n "Hello World" | netcat 127.0.0.1 8888
Hello World!!!
```

![tcp stream run](/images/tcp-stream-run.png)

### Connecting: REPL Client

In this example we implement a rather naive Read Evaluate Print Loop client over TCP. Let’s say we know a server has exposed a simple command line interface over TCP, and would like to interact with it using Akka Streams over TCP. To open an outgoing connection socket we use the `OutgoingConnection` method:

[!code-csharp[StreamTcpDocTests.cs](../../examples/DocsExamples/Streams/StreamTcpDocTests.cs?name=repl-client)]

The `repl` flow we use to handle the server interaction first prints the servers response, then awaits on input from the command line (this blocking call is used here just for the sake of simplicity) and converts it to a `ByteString` which is then sent over the wire to the server. Then we simply connect the TCP pipeline to this processing stage–at this point it will be materialized and start processing data once the server responds with an initial message.

A resilient REPL client would be more sophisticated than this, for example it should split out the input reading into a separate `SelectAsync` step and have a way to let the server write more data than one `ByteString` chunk at any given time, these improvements however are left as exercise for the reader.

### Avoiding deadlocks and liveness issues in back-pressured cycles
When writing such end-to-end back-pressured systems you may sometimes end up in a situation of a loop, in which either side is waiting for the other one to start the conversation. One does not need to look far to find examples of such back-pressure loops. In the two examples shown previously, we always assumed that the side we are connecting to would start the conversation, which effectively means both sides are back-pressured and can not get the conversation started. There are multiple ways of dealing with this which are explained in depth in [Graph cycles, liveness and deadlocks](xref:streams-working-with-graphs#graph-cycles-liveness-and-deadlocks), however in client-server scenarios it is often the simplest to make either side simply send an initial message.

> [!NOTE]
> In case of back-pressured cycles (which can occur even between different systems) sometimes you have to decide which of the sides has start the conversation in order to kick it off. This can be often done by injecting an initial message from one of the sides–a conversation starter.
To break this back-pressure cycle we need to inject some initial message, a “conversation starter”. First, we need to decide which side of the connection should remain passive and which active. Thankfully in most situations finding the right spot to start the conversation is rather simple, as it often is inherent to the protocol we are trying to implement using Streams. In chat-like applications, which our examples resemble, it makes sense to make the Server initiate the conversation by emitting a “hello” message:

[!code-csharp[StreamTcpDocTests.cs](../../examples/DocsExamples/Streams/StreamTcpDocTests.cs?name=welcome-banner-chat-server)]

To emit the initial message we merge a `Source` with a single element, after the command processing but before the framing and transformation to `ByteString` this way we do not have to repeat such logic.

In this example both client and server may need to close the stream based on a parsed command - `BYE` in the case of the server, and `q` in the case of the client. This is implemented by taking from the stream until `q` and and concatenating a `Source` with a single `BYE` element which will then be sent after the original source completed.

### Using framing in your protocol
Streaming transport protocols like TCP just pass streams of bytes, and does not know what is a logical chunk of bytes from the application's point of view. Often when implementing network protocols you will want to introduce your own framing. This can be done in two ways: An end-of-frame marker, e.g. end line `\n`, can do framing via `Framing.Delimiter`. Or a length-field can be used to build a framing protocol.

## Streaming File IO

Expand Down
131 changes: 129 additions & 2 deletions docs/examples/DocsExamples.sln
Original file line number Diff line number Diff line change
@@ -1,11 +1,29 @@

Microsoft Visual Studio Solution File, Format Version 12.00
# Visual Studio 15
VisualStudioVersion = 15.0.26430.14
VisualStudioVersion = 15.0.26730.12
MinimumVisualStudioVersion = 10.0.40219.1
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "DocsExamples", "DocsExamples\DocsExamples.csproj", "{47E0257B-23D0-4D0E-B567-4DFCC3B2B2E1}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Tutorials", "Tutorials\Tutorials.csproj", "{3C3559CC-D45D-4621-BB81-FCD6E7B38B4A}"
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Tutorials", "Tutorials\Tutorials.csproj", "{3C3559CC-D45D-4621-BB81-FCD6E7B38B4A}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Akka", "..\..\src\core\Akka\Akka.csproj", "{E02A1282-A92D-4958-83BD-BA887843BB3E}"
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Internal", "Internal", "{8FF8A4B3-FCC0-4892-B5FF-F11C827AB1F3}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Akka.Streams", "..\..\src\core\Akka.Streams\Akka.Streams.csproj", "{BE36174C-0FC4-4A90-B12E-A1FEB1498244}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Akka.Persistence", "..\..\src\core\Akka.Persistence\Akka.Persistence.csproj", "{9E25D89B-0516-4C2A-A976-36A4DB2FEE3B}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Akka.TestKit", "..\..\src\core\Akka.TestKit\Akka.TestKit.csproj", "{D5CA0DCD-7E61-4D59-B3C1-45797BFDE65C}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Akka.Cluster.Tools", "..\..\src\contrib\cluster\Akka.Cluster.Tools\Akka.Cluster.Tools.csproj", "{D25C1249-6732-425E-BE76-72A0E5351AAB}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Akka.TestKit.Xunit2", "..\..\src\contrib\testkits\Akka.TestKit.Xunit2\Akka.TestKit.Xunit2.csproj", "{E9DAAAA6-2638-4388-9D11-8ADDC6A01C9C}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Akka.Cluster", "..\..\src\core\Akka.Cluster\Akka.Cluster.csproj", "{3A67944B-05B5-410C-93C7-654DC91E7F1C}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Akka.Remote", "..\..\src\core\Akka.Remote\Akka.Remote.csproj", "{E5AB69B8-8D2A-406A-AB00-FBA6FBA0D0BA}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Expand Down Expand Up @@ -41,8 +59,117 @@ Global
{3C3559CC-D45D-4621-BB81-FCD6E7B38B4A}.Release|x64.Build.0 = Release|Any CPU
{3C3559CC-D45D-4621-BB81-FCD6E7B38B4A}.Release|x86.ActiveCfg = Release|Any CPU
{3C3559CC-D45D-4621-BB81-FCD6E7B38B4A}.Release|x86.Build.0 = Release|Any CPU
{E02A1282-A92D-4958-83BD-BA887843BB3E}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{E02A1282-A92D-4958-83BD-BA887843BB3E}.Debug|Any CPU.Build.0 = Debug|Any CPU
{E02A1282-A92D-4958-83BD-BA887843BB3E}.Debug|x64.ActiveCfg = Debug|Any CPU
{E02A1282-A92D-4958-83BD-BA887843BB3E}.Debug|x64.Build.0 = Debug|Any CPU
{E02A1282-A92D-4958-83BD-BA887843BB3E}.Debug|x86.ActiveCfg = Debug|Any CPU
{E02A1282-A92D-4958-83BD-BA887843BB3E}.Debug|x86.Build.0 = Debug|Any CPU
{E02A1282-A92D-4958-83BD-BA887843BB3E}.Release|Any CPU.ActiveCfg = Release|Any CPU
{E02A1282-A92D-4958-83BD-BA887843BB3E}.Release|Any CPU.Build.0 = Release|Any CPU
{E02A1282-A92D-4958-83BD-BA887843BB3E}.Release|x64.ActiveCfg = Release|Any CPU
{E02A1282-A92D-4958-83BD-BA887843BB3E}.Release|x64.Build.0 = Release|Any CPU
{E02A1282-A92D-4958-83BD-BA887843BB3E}.Release|x86.ActiveCfg = Release|Any CPU
{E02A1282-A92D-4958-83BD-BA887843BB3E}.Release|x86.Build.0 = Release|Any CPU
{BE36174C-0FC4-4A90-B12E-A1FEB1498244}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{BE36174C-0FC4-4A90-B12E-A1FEB1498244}.Debug|Any CPU.Build.0 = Debug|Any CPU
{BE36174C-0FC4-4A90-B12E-A1FEB1498244}.Debug|x64.ActiveCfg = Debug|Any CPU
{BE36174C-0FC4-4A90-B12E-A1FEB1498244}.Debug|x64.Build.0 = Debug|Any CPU
{BE36174C-0FC4-4A90-B12E-A1FEB1498244}.Debug|x86.ActiveCfg = Debug|Any CPU
{BE36174C-0FC4-4A90-B12E-A1FEB1498244}.Debug|x86.Build.0 = Debug|Any CPU
{BE36174C-0FC4-4A90-B12E-A1FEB1498244}.Release|Any CPU.ActiveCfg = Release|Any CPU
{BE36174C-0FC4-4A90-B12E-A1FEB1498244}.Release|Any CPU.Build.0 = Release|Any CPU
{BE36174C-0FC4-4A90-B12E-A1FEB1498244}.Release|x64.ActiveCfg = Release|Any CPU
{BE36174C-0FC4-4A90-B12E-A1FEB1498244}.Release|x64.Build.0 = Release|Any CPU
{BE36174C-0FC4-4A90-B12E-A1FEB1498244}.Release|x86.ActiveCfg = Release|Any CPU
{BE36174C-0FC4-4A90-B12E-A1FEB1498244}.Release|x86.Build.0 = Release|Any CPU
{9E25D89B-0516-4C2A-A976-36A4DB2FEE3B}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{9E25D89B-0516-4C2A-A976-36A4DB2FEE3B}.Debug|Any CPU.Build.0 = Debug|Any CPU
{9E25D89B-0516-4C2A-A976-36A4DB2FEE3B}.Debug|x64.ActiveCfg = Debug|Any CPU
{9E25D89B-0516-4C2A-A976-36A4DB2FEE3B}.Debug|x64.Build.0 = Debug|Any CPU
{9E25D89B-0516-4C2A-A976-36A4DB2FEE3B}.Debug|x86.ActiveCfg = Debug|Any CPU
{9E25D89B-0516-4C2A-A976-36A4DB2FEE3B}.Debug|x86.Build.0 = Debug|Any CPU
{9E25D89B-0516-4C2A-A976-36A4DB2FEE3B}.Release|Any CPU.ActiveCfg = Release|Any CPU
{9E25D89B-0516-4C2A-A976-36A4DB2FEE3B}.Release|Any CPU.Build.0 = Release|Any CPU
{9E25D89B-0516-4C2A-A976-36A4DB2FEE3B}.Release|x64.ActiveCfg = Release|Any CPU
{9E25D89B-0516-4C2A-A976-36A4DB2FEE3B}.Release|x64.Build.0 = Release|Any CPU
{9E25D89B-0516-4C2A-A976-36A4DB2FEE3B}.Release|x86.ActiveCfg = Release|Any CPU
{9E25D89B-0516-4C2A-A976-36A4DB2FEE3B}.Release|x86.Build.0 = Release|Any CPU
{D5CA0DCD-7E61-4D59-B3C1-45797BFDE65C}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{D5CA0DCD-7E61-4D59-B3C1-45797BFDE65C}.Debug|Any CPU.Build.0 = Debug|Any CPU
{D5CA0DCD-7E61-4D59-B3C1-45797BFDE65C}.Debug|x64.ActiveCfg = Debug|Any CPU
{D5CA0DCD-7E61-4D59-B3C1-45797BFDE65C}.Debug|x64.Build.0 = Debug|Any CPU
{D5CA0DCD-7E61-4D59-B3C1-45797BFDE65C}.Debug|x86.ActiveCfg = Debug|Any CPU
{D5CA0DCD-7E61-4D59-B3C1-45797BFDE65C}.Debug|x86.Build.0 = Debug|Any CPU
{D5CA0DCD-7E61-4D59-B3C1-45797BFDE65C}.Release|Any CPU.ActiveCfg = Release|Any CPU
{D5CA0DCD-7E61-4D59-B3C1-45797BFDE65C}.Release|Any CPU.Build.0 = Release|Any CPU
{D5CA0DCD-7E61-4D59-B3C1-45797BFDE65C}.Release|x64.ActiveCfg = Release|Any CPU
{D5CA0DCD-7E61-4D59-B3C1-45797BFDE65C}.Release|x64.Build.0 = Release|Any CPU
{D5CA0DCD-7E61-4D59-B3C1-45797BFDE65C}.Release|x86.ActiveCfg = Release|Any CPU
{D5CA0DCD-7E61-4D59-B3C1-45797BFDE65C}.Release|x86.Build.0 = Release|Any CPU
{D25C1249-6732-425E-BE76-72A0E5351AAB}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{D25C1249-6732-425E-BE76-72A0E5351AAB}.Debug|Any CPU.Build.0 = Debug|Any CPU
{D25C1249-6732-425E-BE76-72A0E5351AAB}.Debug|x64.ActiveCfg = Debug|Any CPU
{D25C1249-6732-425E-BE76-72A0E5351AAB}.Debug|x64.Build.0 = Debug|Any CPU
{D25C1249-6732-425E-BE76-72A0E5351AAB}.Debug|x86.ActiveCfg = Debug|Any CPU
{D25C1249-6732-425E-BE76-72A0E5351AAB}.Debug|x86.Build.0 = Debug|Any CPU
{D25C1249-6732-425E-BE76-72A0E5351AAB}.Release|Any CPU.ActiveCfg = Release|Any CPU
{D25C1249-6732-425E-BE76-72A0E5351AAB}.Release|Any CPU.Build.0 = Release|Any CPU
{D25C1249-6732-425E-BE76-72A0E5351AAB}.Release|x64.ActiveCfg = Release|Any CPU
{D25C1249-6732-425E-BE76-72A0E5351AAB}.Release|x64.Build.0 = Release|Any CPU
{D25C1249-6732-425E-BE76-72A0E5351AAB}.Release|x86.ActiveCfg = Release|Any CPU
{D25C1249-6732-425E-BE76-72A0E5351AAB}.Release|x86.Build.0 = Release|Any CPU
{E9DAAAA6-2638-4388-9D11-8ADDC6A01C9C}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{E9DAAAA6-2638-4388-9D11-8ADDC6A01C9C}.Debug|Any CPU.Build.0 = Debug|Any CPU
{E9DAAAA6-2638-4388-9D11-8ADDC6A01C9C}.Debug|x64.ActiveCfg = Debug|Any CPU
{E9DAAAA6-2638-4388-9D11-8ADDC6A01C9C}.Debug|x64.Build.0 = Debug|Any CPU
{E9DAAAA6-2638-4388-9D11-8ADDC6A01C9C}.Debug|x86.ActiveCfg = Debug|Any CPU
{E9DAAAA6-2638-4388-9D11-8ADDC6A01C9C}.Debug|x86.Build.0 = Debug|Any CPU
{E9DAAAA6-2638-4388-9D11-8ADDC6A01C9C}.Release|Any CPU.ActiveCfg = Release|Any CPU
{E9DAAAA6-2638-4388-9D11-8ADDC6A01C9C}.Release|Any CPU.Build.0 = Release|Any CPU
{E9DAAAA6-2638-4388-9D11-8ADDC6A01C9C}.Release|x64.ActiveCfg = Release|Any CPU
{E9DAAAA6-2638-4388-9D11-8ADDC6A01C9C}.Release|x64.Build.0 = Release|Any CPU
{E9DAAAA6-2638-4388-9D11-8ADDC6A01C9C}.Release|x86.ActiveCfg = Release|Any CPU
{E9DAAAA6-2638-4388-9D11-8ADDC6A01C9C}.Release|x86.Build.0 = Release|Any CPU
{3A67944B-05B5-410C-93C7-654DC91E7F1C}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{3A67944B-05B5-410C-93C7-654DC91E7F1C}.Debug|Any CPU.Build.0 = Debug|Any CPU
{3A67944B-05B5-410C-93C7-654DC91E7F1C}.Debug|x64.ActiveCfg = Debug|Any CPU
{3A67944B-05B5-410C-93C7-654DC91E7F1C}.Debug|x64.Build.0 = Debug|Any CPU
{3A67944B-05B5-410C-93C7-654DC91E7F1C}.Debug|x86.ActiveCfg = Debug|Any CPU
{3A67944B-05B5-410C-93C7-654DC91E7F1C}.Debug|x86.Build.0 = Debug|Any CPU
{3A67944B-05B5-410C-93C7-654DC91E7F1C}.Release|Any CPU.ActiveCfg = Release|Any CPU
{3A67944B-05B5-410C-93C7-654DC91E7F1C}.Release|Any CPU.Build.0 = Release|Any CPU
{3A67944B-05B5-410C-93C7-654DC91E7F1C}.Release|x64.ActiveCfg = Release|Any CPU
{3A67944B-05B5-410C-93C7-654DC91E7F1C}.Release|x64.Build.0 = Release|Any CPU
{3A67944B-05B5-410C-93C7-654DC91E7F1C}.Release|x86.ActiveCfg = Release|Any CPU
{3A67944B-05B5-410C-93C7-654DC91E7F1C}.Release|x86.Build.0 = Release|Any CPU
{E5AB69B8-8D2A-406A-AB00-FBA6FBA0D0BA}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{E5AB69B8-8D2A-406A-AB00-FBA6FBA0D0BA}.Debug|Any CPU.Build.0 = Debug|Any CPU
{E5AB69B8-8D2A-406A-AB00-FBA6FBA0D0BA}.Debug|x64.ActiveCfg = Debug|Any CPU
{E5AB69B8-8D2A-406A-AB00-FBA6FBA0D0BA}.Debug|x64.Build.0 = Debug|Any CPU
{E5AB69B8-8D2A-406A-AB00-FBA6FBA0D0BA}.Debug|x86.ActiveCfg = Debug|Any CPU
{E5AB69B8-8D2A-406A-AB00-FBA6FBA0D0BA}.Debug|x86.Build.0 = Debug|Any CPU
{E5AB69B8-8D2A-406A-AB00-FBA6FBA0D0BA}.Release|Any CPU.ActiveCfg = Release|Any CPU
{E5AB69B8-8D2A-406A-AB00-FBA6FBA0D0BA}.Release|Any CPU.Build.0 = Release|Any CPU
{E5AB69B8-8D2A-406A-AB00-FBA6FBA0D0BA}.Release|x64.ActiveCfg = Release|Any CPU
{E5AB69B8-8D2A-406A-AB00-FBA6FBA0D0BA}.Release|x64.Build.0 = Release|Any CPU
{E5AB69B8-8D2A-406A-AB00-FBA6FBA0D0BA}.Release|x86.ActiveCfg = Release|Any CPU
{E5AB69B8-8D2A-406A-AB00-FBA6FBA0D0BA}.Release|x86.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
EndGlobalSection
GlobalSection(NestedProjects) = preSolution
{E02A1282-A92D-4958-83BD-BA887843BB3E} = {8FF8A4B3-FCC0-4892-B5FF-F11C827AB1F3}
{BE36174C-0FC4-4A90-B12E-A1FEB1498244} = {8FF8A4B3-FCC0-4892-B5FF-F11C827AB1F3}
{9E25D89B-0516-4C2A-A976-36A4DB2FEE3B} = {8FF8A4B3-FCC0-4892-B5FF-F11C827AB1F3}
{D5CA0DCD-7E61-4D59-B3C1-45797BFDE65C} = {8FF8A4B3-FCC0-4892-B5FF-F11C827AB1F3}
{D25C1249-6732-425E-BE76-72A0E5351AAB} = {8FF8A4B3-FCC0-4892-B5FF-F11C827AB1F3}
{E9DAAAA6-2638-4388-9D11-8ADDC6A01C9C} = {8FF8A4B3-FCC0-4892-B5FF-F11C827AB1F3}
{3A67944B-05B5-410C-93C7-654DC91E7F1C} = {8FF8A4B3-FCC0-4892-B5FF-F11C827AB1F3}
{E5AB69B8-8D2A-406A-AB00-FBA6FBA0D0BA} = {8FF8A4B3-FCC0-4892-B5FF-F11C827AB1F3}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {76767832-3C78-4846-B241-FBCCD4996F8D}
EndGlobalSection
EndGlobal
Loading

0 comments on commit 67bced3

Please sign in to comment.