Skip to content

Commit

Permalink
Restart(Source|Flow|Sink): Configurable stream restart deadline (#5122)
Browse files Browse the repository at this point in the history
* Configurable stream restart deadline

* Update documentation and code snippets
  • Loading branch information
ismaelhamed authored Jul 13, 2021
1 parent 1927ac1 commit d81767e
Show file tree
Hide file tree
Showing 6 changed files with 601 additions and 262 deletions.
10 changes: 10 additions & 0 deletions docs/articles/streams/error-handling.md
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,16 @@ when a WebSocket connection fails due to the HTTP server it's running on going d
By using an exponential backoff, we avoid going into a tight reconnect look, which both gives the HTTP server some time
to recover, and it avoids using needless resources on the client side.

The various restart shapes mentioned all expect an `Akka.Stream.RestartSettings` which configures the restart behavior.

Configurable parameters are:

* `minBackoff` is the initial duration until the underlying stream is restarted
* `maxBackoff` caps the exponential backoff
* `randomFactor` allows addition of a random delay following backoff calculation
* `maxRestarts` caps the total number of restarts
* `maxRestartsWithin` sets a timeframe during which restarts are counted towards the same total for `maxRestarts`

The following snippet shows how to create a backoff supervisor using `Akka.Streams.Dsl.RestartSource`
which will supervise the given `Source`. The `Source` in this case is a
`HttpResponseMessage`, produced by `HttpCLient`. If the stream fails or completes at any point, the request will
Expand Down
29 changes: 29 additions & 0 deletions src/core/Akka.API.Tests/CoreAPISpec.ApproveStreams.approved.txt
Original file line number Diff line number Diff line change
Expand Up @@ -859,6 +859,20 @@ namespace Akka.Streams
public RemoteStreamRefActorTerminatedException(string message) { }
protected RemoteStreamRefActorTerminatedException(System.Runtime.Serialization.SerializationInfo info, System.Runtime.Serialization.StreamingContext context) { }
}
public class RestartSettings
{
public System.TimeSpan MaxBackoff { get; }
public int MaxRestarts { get; }
public System.TimeSpan MaxRestartsWithin { get; }
public System.TimeSpan MinBackoff { get; }
public double RandomFactor { get; }
public static Akka.Streams.RestartSettings Create(System.TimeSpan minBackoff, System.TimeSpan maxBackoff, double randomFactor) { }
public override string ToString() { }
public Akka.Streams.RestartSettings WithMaxBackoff(System.TimeSpan value) { }
public Akka.Streams.RestartSettings WithMaxRestarts(int count, System.TimeSpan within) { }
public Akka.Streams.RestartSettings WithMinBackoff(System.TimeSpan value) { }
public Akka.Streams.RestartSettings WithRandomFactor(double value) { }
}
public abstract class Shape : System.ICloneable
{
protected Shape() { }
Expand Down Expand Up @@ -1763,22 +1777,37 @@ namespace Akka.Streams.Dsl
}
public class static RestartFlow
{
[System.ObsoleteAttribute("Use the overloaded method which accepts Akka.Stream.RestartSettings instead.")]
public static Akka.Streams.Dsl.Flow<TIn, TOut, Akka.NotUsed> OnFailuresWithBackoff<TIn, TOut, TMat>(System.Func<Akka.Streams.Dsl.Flow<TIn, TOut, TMat>> flowFactory, System.TimeSpan minBackoff, System.TimeSpan maxBackoff, double randomFactor) { }
[System.ObsoleteAttribute("Use the overloaded method which accepts Akka.Stream.RestartSettings instead.")]
public static Akka.Streams.Dsl.Flow<TIn, TOut, Akka.NotUsed> OnFailuresWithBackoff<TIn, TOut, TMat>(System.Func<Akka.Streams.Dsl.Flow<TIn, TOut, TMat>> flowFactory, System.TimeSpan minBackoff, System.TimeSpan maxBackoff, double randomFactor, int maxRestarts) { }
public static Akka.Streams.Dsl.Flow<TIn, TOut, Akka.NotUsed> OnFailuresWithBackoff<TIn, TOut, TMat>(System.Func<Akka.Streams.Dsl.Flow<TIn, TOut, TMat>> flowFactory, Akka.Streams.RestartSettings settings) { }
[System.ObsoleteAttribute("Use the overloaded method which accepts Akka.Stream.RestartSettings instead.")]
public static Akka.Streams.Dsl.Flow<TIn, TOut, Akka.NotUsed> WithBackoff<TIn, TOut, TMat>(System.Func<Akka.Streams.Dsl.Flow<TIn, TOut, TMat>> flowFactory, System.TimeSpan minBackoff, System.TimeSpan maxBackoff, double randomFactor) { }
[System.ObsoleteAttribute("Use the overloaded method which accepts Akka.Stream.RestartSettings instead.")]
public static Akka.Streams.Dsl.Flow<TIn, TOut, Akka.NotUsed> WithBackoff<TIn, TOut, TMat>(System.Func<Akka.Streams.Dsl.Flow<TIn, TOut, TMat>> flowFactory, System.TimeSpan minBackoff, System.TimeSpan maxBackoff, double randomFactor, int maxRestarts) { }
public static Akka.Streams.Dsl.Flow<TIn, TOut, Akka.NotUsed> WithBackoff<TIn, TOut, TMat>(System.Func<Akka.Streams.Dsl.Flow<TIn, TOut, TMat>> flowFactory, Akka.Streams.RestartSettings settings) { }
}
public class static RestartSink
{
[System.ObsoleteAttribute("Use the overloaded method which accepts Akka.Stream.RestartSettings instead.")]
public static Akka.Streams.Dsl.Sink<T, Akka.NotUsed> WithBackoff<T, TMat>(System.Func<Akka.Streams.Dsl.Sink<T, TMat>> sinkFactory, System.TimeSpan minBackoff, System.TimeSpan maxBackoff, double randomFactor) { }
[System.ObsoleteAttribute("Use the overloaded method which accepts Akka.Stream.RestartSettings instead.")]
public static Akka.Streams.Dsl.Sink<T, Akka.NotUsed> WithBackoff<T, TMat>(System.Func<Akka.Streams.Dsl.Sink<T, TMat>> sinkFactory, System.TimeSpan minBackoff, System.TimeSpan maxBackoff, double randomFactor, int maxRestarts) { }
public static Akka.Streams.Dsl.Sink<T, Akka.NotUsed> WithBackoff<T, TMat>(System.Func<Akka.Streams.Dsl.Sink<T, TMat>> sinkFactory, Akka.Streams.RestartSettings settings) { }
}
public class static RestartSource
{
[System.ObsoleteAttribute("Use the overloaded method which accepts Akka.Stream.RestartSettings instead.")]
public static Akka.Streams.Dsl.Source<T, Akka.NotUsed> OnFailuresWithBackoff<T, TMat>(System.Func<Akka.Streams.Dsl.Source<T, TMat>> sourceFactory, System.TimeSpan minBackoff, System.TimeSpan maxBackoff, double randomFactor) { }
[System.ObsoleteAttribute("Use the overloaded method which accepts Akka.Stream.RestartSettings instead.")]
public static Akka.Streams.Dsl.Source<T, Akka.NotUsed> OnFailuresWithBackoff<T, TMat>(System.Func<Akka.Streams.Dsl.Source<T, TMat>> sourceFactory, System.TimeSpan minBackoff, System.TimeSpan maxBackoff, double randomFactor, int maxRestarts) { }
public static Akka.Streams.Dsl.Source<T, Akka.NotUsed> OnFailuresWithBackoff<T, TMat>(System.Func<Akka.Streams.Dsl.Source<T, TMat>> sourceFactory, Akka.Streams.RestartSettings settings) { }
[System.ObsoleteAttribute("Use the overloaded method which accepts Akka.Stream.RestartSettings instead.")]
public static Akka.Streams.Dsl.Source<T, Akka.NotUsed> WithBackoff<T, TMat>(System.Func<Akka.Streams.Dsl.Source<T, TMat>> sourceFactory, System.TimeSpan minBackoff, System.TimeSpan maxBackoff, double randomFactor) { }
[System.ObsoleteAttribute("Use the overloaded method which accepts Akka.Stream.RestartSettings instead.")]
public static Akka.Streams.Dsl.Source<T, Akka.NotUsed> WithBackoff<T, TMat>(System.Func<Akka.Streams.Dsl.Source<T, TMat>> sourceFactory, System.TimeSpan minBackoff, System.TimeSpan maxBackoff, double randomFactor, int maxRestarts) { }
public static Akka.Streams.Dsl.Source<T, Akka.NotUsed> WithBackoff<T, TMat>(System.Func<Akka.Streams.Dsl.Source<T, TMat>> sourceFactory, Akka.Streams.RestartSettings settings) { }
}
public class static Retry
{
Expand Down
25 changes: 12 additions & 13 deletions src/core/Akka.Docs.Tests/Streams/RestartDocTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,9 @@
using System;
using System.Linq;
using System.Net.Http;
using System.Threading.Tasks;
using Akka;
using Akka.Streams;
using Akka.Streams.Dsl;
using Akka.TestKit.Xunit2;
using FluentAssertions;
using Xunit;
using Xunit.Abstractions;

Expand All @@ -39,19 +36,21 @@ public void Restart_stages_should_demonstrate_a_restart_with_backoff_source()
#region restart-with-backoff-source
var httpClient = new HttpClient();

var restartSource = RestartSource.WithBackoff(() =>
{
// Create a source from a task
return Source.FromTask(
httpClient.GetAsync("http://example.com/eventstream") // Make a single request
)
.Select(c => c.Content.ReadAsStringAsync())
.Select(c => c.Result);
},
var settings = RestartSettings.Create(
minBackoff: TimeSpan.FromSeconds(3),
maxBackoff: TimeSpan.FromSeconds(30),
randomFactor: 0.2 // adds 20% "noise" to vary the intervals slightly
);
).WithMaxRestarts(20, TimeSpan.FromMinutes(5)); // limits the amount of restarts to 20 within 5 minutes

var restartSource = RestartSource.WithBackoff(() =>
{
// Create a source from a task
return Source.FromTask(
httpClient.GetAsync("http://example.com/eventstream") // Make a single request
)
.Select(c => c.Content.ReadAsStringAsync())
.Select(c => c.Result);
}, settings);
#endregion

#region with-kill-switch
Expand Down
Loading

0 comments on commit d81767e

Please sign in to comment.