Skip to content

Commit

Permalink
KillSwitches: flow stage from CancellationToken (#3568)
Browse files Browse the repository at this point in the history
* KillSwitches: flow stage from CancellationToken

* removed cancellation token from materializer value type

* added API approvals and docs
  • Loading branch information
Horusiath authored and marcpiechura committed Aug 18, 2018
1 parent 1234561 commit aa01127
Show file tree
Hide file tree
Showing 4 changed files with 338 additions and 0 deletions.
6 changes: 6 additions & 0 deletions docs/articles/streams/stream-dynamic.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,12 @@ by the switch. Refer to the below for usage examples.
> [!NOTE]
> A `UniqueKillSwitch` is always a result of a materialization, whilst `SharedKillSwitch` needs to be constructed before any materialization takes place.
### Using `CancellationToken`s as kill switches

Plain old .NET cancellation tokens can also be used as kill switch stages via extension method: `cancellationToken.AsFlow(cancelGracefully: true)`. Their behavior is very similar to what a `SharedKillSwitch` has to offer with one exception - while normal kill switch recognizes difference between closing a stream gracefully (via. `Shutdown()`) and abruptly (via. `Abort(exception)`), .NET cancellation tokens have no such distinction.

Therefore you need to explicitly specify at the moment of defining a flow stage, if cancellation token call should cause stream to close with completion or failure, by using `cancelGracefully` parameter. If it's set to `false`, calling cancel on a token's source will cause stream to fail with an `OperationCanceledException`.

## Dynamic fan-in and fan-out with MergeHub and BroadcastHub

There are many cases when consumers or producers of a certain service (represented as a Sink, Source, or possibly Flow) are dynamic and not known in advance. The Graph DSL does not allow to represent this, all connections of the graph must be known in advance and must be connected upfront. To allow dynamic fan-in and fan-out streaming, the Hubs should be used. They provide means to construct Sink and Source pairs that are “attached” to each other, but one of them can be materialized multiple times to implement dynamic fan-in or fan-out.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -642,6 +642,7 @@ namespace Akka.Streams
}
public class static KillSwitches
{
public static Akka.Streams.IGraph<Akka.Streams.FlowShape<T, T>, Akka.NotUsed> AsFlow<T>(this System.Threading.CancellationToken cancellationToken, bool cancelGracefully = False) { }
public static Akka.Streams.SharedKillSwitch Shared(string name) { }
public static Akka.Streams.IGraph<Akka.Streams.FlowShape<T, T>, Akka.Streams.UniqueKillSwitch> Single<T>() { }
public static Akka.Streams.IGraph<Akka.Streams.BidiShape<T1, T1, T2, T2>, Akka.Streams.UniqueKillSwitch> SingleBidi<T1, T2>() { }
Expand Down
237 changes: 237 additions & 0 deletions src/core/Akka.Streams.Tests/Dsl/FlowKillSwitchSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

using System;
using System.Linq;
using System.Threading;
using Akka.Streams.Dsl;
using Akka.Streams.TestKit;
using Akka.Streams.TestKit.Tests;
Expand All @@ -28,6 +29,8 @@ public FlowKillSwitchSpec(ITestOutputHelper helper) : base(helper)
Materializer = ActorMaterializer.Create(Sys);
}

#region unique kill switch

[Fact]
public void A_UniqueKillSwitch_must_stop_a_stream_if_requested()
{
Expand Down Expand Up @@ -123,6 +126,9 @@ public void A_UniqueKillSwitch_must_ignore_completion_after_already_completed()
downstream.ExpectNoMsg(TimeSpan.FromMilliseconds(100));
}

#endregion

#region shared kill switch

[Fact]
public void A_SharedKillSwitch_must_stop_a_stream_if_requested()
Expand Down Expand Up @@ -491,5 +497,236 @@ public void A_SharedKillSwitch_must_use_its_name_on_the_flows_it_hands_out()
killSwitch.Flow<int>().ToString().Should().Be("Flow(KillSwitch(MySwitchName))");
}, Materializer);
}

#endregion

#region cancellable kill switch

[Fact]
public void A_CancellationToken_flow_must_stop_a_stream_if_requested()
{
this.AssertAllStagesStopped(() =>
{
var cancel = new CancellationTokenSource();

var t = this.SourceProbe<int>()
.Via(cancel.Token.AsFlow<int>(cancelGracefully: true))
.ToMaterialized(this.SinkProbe<int>(), Keep.Both)
.Run(Materializer);
var upstream = t.Item1;
var downstream = t.Item2;

downstream.Request(1);
upstream.SendNext(1);
downstream.ExpectNext(1);

cancel.Cancel();
upstream.ExpectCancellation();
downstream.ExpectComplete();
}, Materializer);
}

[Fact]
public void A_CancellationToken_flow_must_fail_a_stream_if_requested()
{
this.AssertAllStagesStopped(() =>
{
var cancel = new CancellationTokenSource();

var t = this.SourceProbe<int>()
.Via(cancel.Token.AsFlow<int>(cancelGracefully: false))
.ToMaterialized(this.SinkProbe<int>(), Keep.Both)
.Run(Materializer);
var upstream = t.Item1;
var downstream = t.Item2;

downstream.Request(1);
upstream.SendNext(1);
downstream.ExpectNext(1);

cancel.Cancel();
upstream.ExpectCancellation();
downstream.ExpectError().Should().BeOfType<OperationCanceledException>();
}, Materializer);
}

[Fact]
public void A_CancellationToken_flow_must_pass_through_all_elements_unmodified()
{
this.AssertAllStagesStopped(() =>
{
var cancel = new CancellationTokenSource();
var task = Source.From(Enumerable.Range(1, 100))
.Via(cancel.Token.AsFlow<int>())
.RunWith(Sink.Seq<int>(), Materializer);
task.Wait(TimeSpan.FromSeconds(3)).Should().BeTrue();
task.Result.ShouldAllBeEquivalentTo(Enumerable.Range(1, 100));
}, Materializer);
}

[Fact]
public void A_CancellationToken_flow_must_provide_a_flow_that_if_materialized_multiple_times_with_multiple_types_stops_all_streams_if_requested()
{
this.AssertAllStagesStopped(() =>
{
var cancel = new CancellationTokenSource();

var t1 = this.SourceProbe<int>()
.Via(cancel.Token.AsFlow<int>(cancelGracefully: true))
.ToMaterialized(this.SinkProbe<int>(), Keep.Both)
.Run(Materializer);
var t2 = this.SourceProbe<string>()
.Via(cancel.Token.AsFlow<string>(cancelGracefully: true))
.ToMaterialized(this.SinkProbe<string>(), Keep.Both)
.Run(Materializer);

var upstream1 = t1.Item1;
var downstream1 = t1.Item2;
var upstream2 = t2.Item1;
var downstream2 = t2.Item2;

downstream1.Request(1);
upstream1.SendNext(1);
downstream1.ExpectNext(1);

downstream2.Request(2);
upstream2.SendNext("A").SendNext("B");
downstream2.ExpectNext("A", "B");

cancel.Cancel();

upstream1.ExpectCancellation();
upstream2.ExpectCancellation();
downstream1.ExpectComplete();
downstream2.ExpectComplete();
}, Materializer);
}

[Fact]
public void A_CancellationToken_flow_must_provide_a_flow_that_if_materialized_multiple_times_with_multiple_types_fails_all_streams_if_requested()
{
this.AssertAllStagesStopped(() =>
{
var cancel = new CancellationTokenSource();

var t1 = this.SourceProbe<int>()
.Via(cancel.Token.AsFlow<int>(cancelGracefully: false))
.ToMaterialized(this.SinkProbe<int>(), Keep.Both)
.Run(Materializer);
var t2 = this.SourceProbe<string>()
.Via(cancel.Token.AsFlow<string>(cancelGracefully: false))
.ToMaterialized(this.SinkProbe<string>(), Keep.Both)
.Run(Materializer);

var upstream1 = t1.Item1;
var downstream1 = t1.Item2;
var upstream2 = t2.Item1;
var downstream2 = t2.Item2;

downstream1.Request(1);
upstream1.SendNext(1);
downstream1.ExpectNext(1);

downstream2.Request(2);
upstream2.SendNext("A").SendNext("B");
downstream2.ExpectNext("A", "B");

cancel.Cancel();
upstream1.ExpectCancellation();
upstream2.ExpectCancellation();

downstream1.ExpectError().Should().BeOfType<OperationCanceledException>();
downstream2.ExpectError().Should().BeOfType<OperationCanceledException>();
}, Materializer);
}

[Fact]
public void A_CancellationToken_flow_must_ignore_subsequent_aborts_and_shutdowns_after_shutdown()
{
this.AssertAllStagesStopped(() =>
{
var cancel = new CancellationTokenSource();

var t = this.SourceProbe<int>()
.Via(cancel.Token.AsFlow<int>(cancelGracefully: true))
.ToMaterialized(this.SinkProbe<int>(), Keep.Both)
.Run(Materializer);
var upstream = t.Item1;
var downstream = t.Item2;

downstream.Request(1);
upstream.SendNext(1);
downstream.ExpectNext(1);

cancel.Cancel();
upstream.ExpectCancellation();
downstream.ExpectComplete();

cancel.Cancel();
upstream.ExpectNoMsg(TimeSpan.FromMilliseconds(100));
downstream.ExpectNoMsg(TimeSpan.FromMilliseconds(100));

cancel.Cancel();
upstream.ExpectNoMsg(TimeSpan.FromMilliseconds(100));
downstream.ExpectNoMsg(TimeSpan.FromMilliseconds(100));
}, Materializer);
}

[Fact]
public void A_CancellationToken_flow_must_complete_immediately_flows_materialized_after_switch_shutdown()
{
this.AssertAllStagesStopped(() =>
{
var cancel = new CancellationTokenSource();
cancel.Cancel();

var t = this.SourceProbe<int>()
.Via(cancel.Token.AsFlow<int>(cancelGracefully: true))
.ToMaterialized(this.SinkProbe<int>(), Keep.Both)
.Run(Materializer);
var upstream = t.Item1;
var downstream = t.Item2;

upstream.ExpectCancellation();
downstream.ExpectSubscriptionAndComplete();
}, Materializer);
}

[Fact]
public void A_CancellationToken_flow_must_fail_immediately_flows_materialized_after_switch_failure()
{
this.AssertAllStagesStopped(() =>
{
var cancel = new CancellationTokenSource();
cancel.Cancel();

var t = this.SourceProbe<int>()
.Via(cancel.Token.AsFlow<int>(cancelGracefully: false))
.ToMaterialized(this.SinkProbe<int>(), Keep.Both)
.Run(Materializer);
var upstream = t.Item1;
var downstream = t.Item2;

upstream.ExpectCancellation();
downstream.ExpectSubscriptionAndError().Should().BeOfType<OperationCanceledException>();
}, Materializer);
}

[Fact]
public void A_CancellationToken_flow_should_not_cause_problems_if_switch_is_shutdown_after_flow_completed_normally()
{
this.AssertAllStagesStopped(() =>
{
var cancel = new CancellationTokenSource();
var task = Source.From(Enumerable.Range(1, 10))
.Via(cancel.Token.AsFlow<int>(cancelGracefully: true))
.RunWith(Sink.Seq<int>(), Materializer);
task.Wait(TimeSpan.FromSeconds(3)).Should().BeTrue();
task.Result.ShouldAllBeEquivalentTo(Enumerable.Range(1, 10));
cancel.Cancel();
}, Materializer);
}

#endregion
}
}
94 changes: 94 additions & 0 deletions src/core/Akka.Streams/KillSwitch.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
//-----------------------------------------------------------------------

using System;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using Akka.Streams.Stage;

Expand Down Expand Up @@ -52,6 +54,98 @@ public static class KillSwitches
public static IGraph<BidiShape<T1, T1, T2, T2>, UniqueKillSwitch> SingleBidi<T1, T2>
() => UniqueBidiKillSwitchStage<T1, T2>.Instance;

/// <summary>
/// Returns a flow, which works like a kill switch stage based on a provided <paramref name="cancellationToken"/>.
/// Since unlike cancellation tokens, kill switches expose ability to finish a stream either gracefully via
/// <see cref="IKillSwitch.Shutdown"/> or abruptly via <see cref="IKillSwitch.Abort"/>, this distinction is
/// handled by specifying <paramref name="cancelGracefully"/> parameter.
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="cancellationToken">Cancellation token used to create a cancellation flow.</param>
/// <param name="cancelGracefully">
/// When set to true, will close stream gracefully via completting the stage.
/// When set to false, will close stream by failing the stage with <see cref="OperationCanceledException"/>.
/// </param>
/// <returns></returns>
public static IGraph<FlowShape<T, T>, NotUsed> AsFlow<T>(this CancellationToken cancellationToken, bool cancelGracefully = false)
{
return new CancellableKillSwitchStage<T>(cancellationToken, cancelGracefully);
}

internal sealed class CancellableKillSwitchStage<T> : GraphStage<FlowShape<T, T>>
{
#region logic

private sealed class Logic : InAndOutGraphStageLogic
{
private readonly CancellableKillSwitchStage<T> _stage;
private CancellationTokenRegistration? _registration = null;

public Logic(CancellableKillSwitchStage<T> stage)
: base(stage.Shape)
{
_stage = stage;
SetHandler(stage.Inlet, this);
SetHandler(stage.Outlet, this);
}

public override void PreStart()
{
if (_stage._cancellationToken.IsCancellationRequested)
{
if (_stage._cancelGracefully)
OnCancelComplete();
else
OnCancelFail();
}
else
{
var onCancel = _stage._cancelGracefully
? GetAsyncCallback(OnCancelComplete)
: GetAsyncCallback(OnCancelFail);

_registration = _stage._cancellationToken.Register(onCancel);
}
}

public override void PostStop()
{
_registration?.Dispose();
base.PostStop();
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
public override void OnPush() => Push(_stage.Outlet, Grab(_stage.Inlet));

[MethodImpl(MethodImplOptions.AggressiveInlining)]
public override void OnPull() => Pull(_stage.Inlet);

[MethodImpl(MethodImplOptions.AggressiveInlining)]
private void OnCancelComplete() => CompleteStage();

[MethodImpl(MethodImplOptions.AggressiveInlining)]
private void OnCancelFail() => FailStage(new OperationCanceledException($"Stage cancelled due to cancellation token request.", _stage._cancellationToken));
}

#endregion

private readonly CancellationToken _cancellationToken;
private readonly bool _cancelGracefully;

public CancellableKillSwitchStage(CancellationToken cancellationToken, bool cancelGracefully)
{
_cancellationToken = cancellationToken;
_cancelGracefully = cancelGracefully;
Shape = new FlowShape<T, T>(Inlet, Outlet);
}

public Inlet<T> Inlet { get; } = new Inlet<T>("cancel.in");
public Outlet<T> Outlet { get; } = new Outlet<T>("cancel.out");

public override FlowShape<T, T> Shape { get; }
protected override GraphStageLogic CreateLogic(Attributes inheritedAttributes) => new Logic(this);
}

/// <summary>
/// TBD
/// </summary>
Expand Down

0 comments on commit aa01127

Please sign in to comment.