Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Restart(Source|Flow|Sink): Configurable stream restart deadline #5122

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions docs/articles/streams/error-handling.md
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,16 @@ when a WebSocket connection fails due to the HTTP server it's running on going d
By using an exponential backoff, we avoid going into a tight reconnect look, which both gives the HTTP server some time
to recover, and it avoids using needless resources on the client side.

The various restart shapes mentioned all expect an `Akka.Stream.RestartSettings` which configures the restart behavior.

Configurable parameters are:

* `minBackoff` is the initial duration until the underlying stream is restarted
* `maxBackoff` caps the exponential backoff
* `randomFactor` allows addition of a random delay following backoff calculation
* `maxRestarts` caps the total number of restarts
* `maxRestartsWithin` sets a timeframe during which restarts are counted towards the same total for `maxRestarts`

The following snippet shows how to create a backoff supervisor using `Akka.Streams.Dsl.RestartSource`
which will supervise the given `Source`. The `Source` in this case is a
`HttpResponseMessage`, produced by `HttpCLient`. If the stream fails or completes at any point, the request will
Expand Down
29 changes: 29 additions & 0 deletions src/core/Akka.API.Tests/CoreAPISpec.ApproveStreams.approved.txt
Original file line number Diff line number Diff line change
Expand Up @@ -859,6 +859,20 @@ namespace Akka.Streams
public RemoteStreamRefActorTerminatedException(string message) { }
protected RemoteStreamRefActorTerminatedException(System.Runtime.Serialization.SerializationInfo info, System.Runtime.Serialization.StreamingContext context) { }
}
public class RestartSettings
{
public System.TimeSpan MaxBackoff { get; }
public int MaxRestarts { get; }
public System.TimeSpan MaxRestartsWithin { get; }
public System.TimeSpan MinBackoff { get; }
public double RandomFactor { get; }
public static Akka.Streams.RestartSettings Create(System.TimeSpan minBackoff, System.TimeSpan maxBackoff, double randomFactor) { }
public override string ToString() { }
public Akka.Streams.RestartSettings WithMaxBackoff(System.TimeSpan value) { }
public Akka.Streams.RestartSettings WithMaxRestarts(int count, System.TimeSpan within) { }
public Akka.Streams.RestartSettings WithMinBackoff(System.TimeSpan value) { }
public Akka.Streams.RestartSettings WithRandomFactor(double value) { }
}
public abstract class Shape : System.ICloneable
{
protected Shape() { }
Expand Down Expand Up @@ -1763,22 +1777,37 @@ namespace Akka.Streams.Dsl
}
public class static RestartFlow
{
[System.ObsoleteAttribute("Use the overloaded method which accepts Akka.Stream.RestartSettings instead.")]
public static Akka.Streams.Dsl.Flow<TIn, TOut, Akka.NotUsed> OnFailuresWithBackoff<TIn, TOut, TMat>(System.Func<Akka.Streams.Dsl.Flow<TIn, TOut, TMat>> flowFactory, System.TimeSpan minBackoff, System.TimeSpan maxBackoff, double randomFactor) { }
[System.ObsoleteAttribute("Use the overloaded method which accepts Akka.Stream.RestartSettings instead.")]
public static Akka.Streams.Dsl.Flow<TIn, TOut, Akka.NotUsed> OnFailuresWithBackoff<TIn, TOut, TMat>(System.Func<Akka.Streams.Dsl.Flow<TIn, TOut, TMat>> flowFactory, System.TimeSpan minBackoff, System.TimeSpan maxBackoff, double randomFactor, int maxRestarts) { }
public static Akka.Streams.Dsl.Flow<TIn, TOut, Akka.NotUsed> OnFailuresWithBackoff<TIn, TOut, TMat>(System.Func<Akka.Streams.Dsl.Flow<TIn, TOut, TMat>> flowFactory, Akka.Streams.RestartSettings settings) { }
[System.ObsoleteAttribute("Use the overloaded method which accepts Akka.Stream.RestartSettings instead.")]
public static Akka.Streams.Dsl.Flow<TIn, TOut, Akka.NotUsed> WithBackoff<TIn, TOut, TMat>(System.Func<Akka.Streams.Dsl.Flow<TIn, TOut, TMat>> flowFactory, System.TimeSpan minBackoff, System.TimeSpan maxBackoff, double randomFactor) { }
[System.ObsoleteAttribute("Use the overloaded method which accepts Akka.Stream.RestartSettings instead.")]
public static Akka.Streams.Dsl.Flow<TIn, TOut, Akka.NotUsed> WithBackoff<TIn, TOut, TMat>(System.Func<Akka.Streams.Dsl.Flow<TIn, TOut, TMat>> flowFactory, System.TimeSpan minBackoff, System.TimeSpan maxBackoff, double randomFactor, int maxRestarts) { }
public static Akka.Streams.Dsl.Flow<TIn, TOut, Akka.NotUsed> WithBackoff<TIn, TOut, TMat>(System.Func<Akka.Streams.Dsl.Flow<TIn, TOut, TMat>> flowFactory, Akka.Streams.RestartSettings settings) { }
}
public class static RestartSink
{
[System.ObsoleteAttribute("Use the overloaded method which accepts Akka.Stream.RestartSettings instead.")]
public static Akka.Streams.Dsl.Sink<T, Akka.NotUsed> WithBackoff<T, TMat>(System.Func<Akka.Streams.Dsl.Sink<T, TMat>> sinkFactory, System.TimeSpan minBackoff, System.TimeSpan maxBackoff, double randomFactor) { }
[System.ObsoleteAttribute("Use the overloaded method which accepts Akka.Stream.RestartSettings instead.")]
public static Akka.Streams.Dsl.Sink<T, Akka.NotUsed> WithBackoff<T, TMat>(System.Func<Akka.Streams.Dsl.Sink<T, TMat>> sinkFactory, System.TimeSpan minBackoff, System.TimeSpan maxBackoff, double randomFactor, int maxRestarts) { }
public static Akka.Streams.Dsl.Sink<T, Akka.NotUsed> WithBackoff<T, TMat>(System.Func<Akka.Streams.Dsl.Sink<T, TMat>> sinkFactory, Akka.Streams.RestartSettings settings) { }
}
public class static RestartSource
{
[System.ObsoleteAttribute("Use the overloaded method which accepts Akka.Stream.RestartSettings instead.")]
public static Akka.Streams.Dsl.Source<T, Akka.NotUsed> OnFailuresWithBackoff<T, TMat>(System.Func<Akka.Streams.Dsl.Source<T, TMat>> sourceFactory, System.TimeSpan minBackoff, System.TimeSpan maxBackoff, double randomFactor) { }
[System.ObsoleteAttribute("Use the overloaded method which accepts Akka.Stream.RestartSettings instead.")]
public static Akka.Streams.Dsl.Source<T, Akka.NotUsed> OnFailuresWithBackoff<T, TMat>(System.Func<Akka.Streams.Dsl.Source<T, TMat>> sourceFactory, System.TimeSpan minBackoff, System.TimeSpan maxBackoff, double randomFactor, int maxRestarts) { }
public static Akka.Streams.Dsl.Source<T, Akka.NotUsed> OnFailuresWithBackoff<T, TMat>(System.Func<Akka.Streams.Dsl.Source<T, TMat>> sourceFactory, Akka.Streams.RestartSettings settings) { }
[System.ObsoleteAttribute("Use the overloaded method which accepts Akka.Stream.RestartSettings instead.")]
public static Akka.Streams.Dsl.Source<T, Akka.NotUsed> WithBackoff<T, TMat>(System.Func<Akka.Streams.Dsl.Source<T, TMat>> sourceFactory, System.TimeSpan minBackoff, System.TimeSpan maxBackoff, double randomFactor) { }
[System.ObsoleteAttribute("Use the overloaded method which accepts Akka.Stream.RestartSettings instead.")]
public static Akka.Streams.Dsl.Source<T, Akka.NotUsed> WithBackoff<T, TMat>(System.Func<Akka.Streams.Dsl.Source<T, TMat>> sourceFactory, System.TimeSpan minBackoff, System.TimeSpan maxBackoff, double randomFactor, int maxRestarts) { }
public static Akka.Streams.Dsl.Source<T, Akka.NotUsed> WithBackoff<T, TMat>(System.Func<Akka.Streams.Dsl.Source<T, TMat>> sourceFactory, Akka.Streams.RestartSettings settings) { }
}
public class static Retry
{
Expand Down
25 changes: 12 additions & 13 deletions src/core/Akka.Docs.Tests/Streams/RestartDocTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,9 @@
using System;
using System.Linq;
using System.Net.Http;
using System.Threading.Tasks;
using Akka;
using Akka.Streams;
using Akka.Streams.Dsl;
using Akka.TestKit.Xunit2;
using FluentAssertions;
using Xunit;
using Xunit.Abstractions;

Expand All @@ -39,19 +36,21 @@ public void Restart_stages_should_demonstrate_a_restart_with_backoff_source()
#region restart-with-backoff-source
var httpClient = new HttpClient();

var restartSource = RestartSource.WithBackoff(() =>
{
// Create a source from a task
return Source.FromTask(
httpClient.GetAsync("http://example.com/eventstream") // Make a single request
)
.Select(c => c.Content.ReadAsStringAsync())
.Select(c => c.Result);
},
var settings = RestartSettings.Create(
minBackoff: TimeSpan.FromSeconds(3),
maxBackoff: TimeSpan.FromSeconds(30),
randomFactor: 0.2 // adds 20% "noise" to vary the intervals slightly
);
).WithMaxRestarts(20, TimeSpan.FromMinutes(5)); // limits the amount of restarts to 20 within 5 minutes

var restartSource = RestartSource.WithBackoff(() =>
{
// Create a source from a task
return Source.FromTask(
httpClient.GetAsync("http://example.com/eventstream") // Make a single request
)
.Select(c => c.Content.ReadAsStringAsync())
.Select(c => c.Result);
}, settings);
#endregion

#region with-kill-switch
Expand Down
Loading