Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improved Streams Error-Handling docs #3085

Merged
merged 4 commits into from
Sep 8, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
149 changes: 113 additions & 36 deletions docs/articles/streams/error-handling.md
Original file line number Diff line number Diff line change
@@ -1,30 +1,107 @@
---
layout: docs.hbs
title: Error Handling
uid: error-handling
title: Error Handling in Streams
---
# Error Handling in Streams

Strategies for how to handle exceptions from processing stream elements can be defined when
materializing the stream. The error handling strategies are inspired by actor supervision
strategies, but the semantics have been adapted to the domain of stream processing.
When a stage in a stream fails this will normally lead to the entire stream being torn down. Each of the stages downstream gets informed about the failure and each upstream stage sees a cancellation.

> [!WARNING]
> *ZipWith*, *GraphStage* junction, *ActorPublisher* source and *ActorSubscriber* sink
components do not honour the supervision strategy attribute yet.
In many cases you may want to avoid complete stream failure, this can be done in a few different ways:

# Supervision Strategies
- `Recover` to emit a final element then complete the stream normally on upstream failure
- `RecoverWithRetries` to create a new upstream and start consuming from that on failure
- Using a supervision strategy for stages that support it

There are three ways to handle exceptions from application code:
In addition to these built in tools for error handling, a common pattern is to wrap the stream inside an actor, and have the actor restart the entire stream on failure.

## Recover
`Recover` allows you to emit a final element and then complete the stream on an upstream failure. Deciding which exceptions should be recovered is done through a `delegate`. If an exception does not have a matching case the stream is failed.

Recovering can be useful if you want to gracefully complete a stream on failure while letting downstream know that there was a failure.

```C#
Source.From(Enumerable.Range(0, 6)).Select(n =>
{
if (n < 5)
return n.ToString();

throw new ArithmeticException("Boom!");
})
.Recover(exception =>
{
if (exception is ArithmeticException)
return new Option<string>("stream truncated");
return Option<string>.None;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Silv3rcircl3 I wondered that we have such API. Why not just string?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Option<> is like Nullable<>, but it can also work with non-value types. We needed a universal primitive that will notify about value absence and will work with both (classes and structs).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also note that NULL is not allowed as element, so return null would fail the stream.

Copy link
Contributor Author

@alexvaluyskiy alexvaluyskiy Sep 7, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Silv3rcircl3 Is null accepted in RecoveryWithRetries?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yup, because there you return a complete new source and null indicates that you don't want to recover but rather fail the stream. By element I mean a single message that is processed by the stream, i.e. Select(x => null as string) would fail the stream

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, thanks

})
.RunForeach(Console.WriteLine, materializer);
```
This will output:
```
0
1
2
3
4
stream truncated
```

## Recover with retries
`RecoverWithRetries` allows you to put a new upstream in place of the failed one, recovering stream failures up to a specified maximum number of times.

Deciding which exceptions should be recovered is done through a `delegate`. If an exception does not have a matching case the stream is failed.

```scala
var planB = Source.From(new List<string> {"five", "six", "seven", "eight"});

Source.From(Enumerable.Range(0, 10)).Select(n =>
{
if (n < 5)
return n.ToString();

* ``Stop`` - The stream is completed with failure.
* ``Resume`` - The element is dropped and the stream continues.
* ``Restart`` - The element is dropped and the stream continues after restarting the stage.
Restarting a stage means that any accumulated state is cleared. This is typically
performed by creating a new instance of the stage.
throw new ArithmeticException("Boom!");
})
.RecoverWithRetries(attempts: 1, partialFunc: exception =>
{
if (exception is ArithmeticException)
return planB;
return null;
})
.RunForeach(Console.WriteLine, materializer);
```

This will output:

```
0
1
2
3
4
five
six
seven
eight
```

## Supervision Strategies

> [!NOTE]
> The stages that support supervision strategies are explicitly documented to do so, if there is nothing in the documentation of a stage saying that it adheres to the supervision strategy it means it fails rather than applies supervision..

The error handling strategies are inspired by actor supervision strategies, but the semantics have been adapted to the domain of stream processing. The most important difference is that supervision is not automatically applied to stream stages but instead something that each stage has to implement explicitly.

For many stages it may not even make sense to implement support for supervision strategies, this is especially true for stages connecting to external technologies where for example a failed connection will likely still fail if a new connection is tried immediately.

For stages that do implement supervision, the strategies for how to handle exceptions from processing stream elements can be selected when materializing the stream through use of an attribute.

There are three ways to handle exceptions from application code:
- `Stop` - The stream is completed with failure.
- `Resume` - The element is dropped and the stream continues.
- `Restart` - The element is dropped and the stream continues after restarting the stage. Restarting a stage means that any accumulated state is cleared. This is typically performed by creating a new instance of the stage.

By default the stopping strategy is used for all exceptions, i.e. the stream will be completed with
failure when an exception is thrown.
By default the stopping strategy is used for all exceptions, i.e. the stream will be completed with failure when an exception is thrown.

```csharp
```C#
var source = Source.From(Enumerable.Range(0, 6)).Select(x => 100/x);
var result = source.RunWith(Sink.Aggregate<int, int>(0, (sum, i) => sum + i), materializer);
// division by zero will fail the stream and the
Expand All @@ -33,7 +110,7 @@ var result = source.RunWith(Sink.Aggregate<int, int>(0, (sum, i) => sum + i), ma

The default supervision strategy for a stream can be defined on the settings of the materializer.

```csharp
```C#
Decider decider = cause => cause is DivideByZeroException
? Directive.Resume
: Directive.Stop;
Expand All @@ -46,33 +123,33 @@ var result = source.RunWith(Sink.Aggregate<int, int>(0, (sum, i) => sum + i), ma
// result here will be a Task completed with Success(228)
```

Here you can see that all ``DivideByZeroException`` will resume the processing, i.e. the
Here you can see that all `DivideByZeroException` will resume the processing, i.e. the
elements that cause the division by zero are effectively dropped.

> [!NOTE]
> Be aware that dropping elements may result in deadlocks in graphs with cycles, as explained in [Graph cycles, liveness and deadlocks](workingwithgraphs#graph-cycles-liveness-and-deadlocks).
> Be aware that dropping elements may result in deadlocks in graphs with cycles, as explained in [Graph cycles, liveness and deadlocks](xref:working-with-graphs#graph-cycles-liveness-and-deadlocks).
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed the link


The supervision strategy can also be defined for all operators of a flow.

```csharp
```C#
Decider decider = cause => cause is DivideByZeroException
? Directive.Resume
: Directive.Stop;

var flow = Flow.Create<int>()
.Where(x => 100/x < 50)
.Select(x => 100/(5 - x))
.Where(x => 100 / x < 50)
.Select(x => 100 / (5 - x))
.WithAttributes(ActorAttributes.CreateSupervisionStrategy(decider));
var source = Source.From(Enumerable.Range(0, 6)).Via(flow);
var result = source.RunWith(Sink.Aggregate<int, int>(0, (sum, i) => sum + i), materializer);
// the elements causing division by zero will be dropped
// result here will be a Future completed with Success(150)
```

``Restart`` works in a similar way as ``Resume`` with the addition that accumulated state,
`Restart` works in a similar way as `Resume` with the addition that accumulated state,
if any, of the failing processing stage will be reset.

```csharp
```C#
Decider decider = cause => cause is ArgumentException
? Directive.Restart
: Directive.Stop;
Expand All @@ -92,38 +169,38 @@ var result = source.Limit(1000).RunWith(Sink.Seq<int>(), materializer);
// result here will be a Task completed with Success(List(0, 1, 4, 0, 5, 12))
```

# Errors from SelectAsync
Stream supervision can also be applied to the tasks of ``SelectAsync``.
## Errors from SelectAsync
Stream supervision can also be applied to the tasks of `SelectAsync` and `SelectAsyncUnordered` even if such failures happen in the task rather than inside the stage itself. .

Let's say that we use an external service to lookup email addresses and we would like to
discard those that cannot be found.

We start with the tweet stream of authors:

```csharp
```C#
var authors = tweets
.Where(t => t.HashTags.Contains("Akka.Net"))
.Select(t => t.Author);
```

Assume that we can lookup their email address using:

```csharp
```C#
Task<string> LookupEmail(string handle)
```

The ``Task`` is completed with ``Failure`` if the email is not found.
The `Task` is completed with `Failure` if the email is not found.

Transforming the stream of authors to a stream of email addresses by using the ``LookupEmail``
service can be done with ``SelectAsync`` and we use ``Deciders.ResumingDecider`` to drop
Transforming the stream of authors to a stream of email addresses by using the `LookupEmail`
service can be done with `SelectAsync` and we use `Deciders.ResumingDecider` to drop
unknown email addresses:

```csharp
```c#
var emailAddresses = authors.Via(
Flow.Create<Author>()
.SelectAsync(4, author => AddressSystem.LookupEmail(author.Handle))
.WithAttributes(ActorAttributes.CreateSupervisionStrategy(Deciders.ResumingDecider)));
```

If we would not use ``Resume`` the default stopping strategy would complete the stream
with failure on the first ``Task`` that was completed with ``Failure``.
If we would not use `Resume` the default stopping strategy would complete the stream
with failure on the first `Task` that was completed with `Failure`.
2 changes: 1 addition & 1 deletion docs/articles/streams/integration.md
Original file line number Diff line number Diff line change
Expand Up @@ -518,7 +518,7 @@ Please note that a factory is necessary to achieve reusability of the resulting

As described above any Akka Streams ``Source`` can be exposed as a Reactive Streams ``Publisher``
and any ``Sink`` can be exposed as a Reactive Streams ``Subscriber``. Therefore we recommend that you
implement Reactive Streams integrations with built-in stages or [custom stages](customstreamprocessing.md).
implement Reactive Streams integrations with built-in stages or [custom stages](xref:custom-stream-processing).
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed the link


For historical reasons the `ActorPublisher` and `ActorSubscriber` are
provided to support implementing Reactive Streams `Publisher` class and `Subscriber` class with
Expand Down
4 changes: 2 additions & 2 deletions docs/articles/streams/workingwithgraphs.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
---
layout: docs.hbs
uid: working-with-graphs
title: Working with Graphs
---

Expand Down Expand Up @@ -509,7 +509,7 @@ public static IMessage FromBytes(ByteString bytes)

In this way you could easily integrate any other serialization library that turns an object into a sequence of bytes.

The other stage that we talked about is a little more involved since reversing a framing protocol means that any received chunk of bytes may correspond to zero or more messages. This is best implemented using a `GraphStage` (see also [Custom processing with GraphStage](customstreamprocessing.md#custom-processing-with-graphstage)).
The other stage that we talked about is a little more involved since reversing a framing protocol means that any received chunk of bytes may correspond to zero or more messages. This is best implemented using a `GraphStage` (see also [Custom processing with GraphStage](xref:custom-stream-processing#custom-processing-with-graphstage)).
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed the link


```csharp
public static ByteString AddLengthHeader(ByteString bytes, ByteOrder order)
Expand Down