diff --git a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveStreams.DotNet.verified.txt b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveStreams.DotNet.verified.txt index 442fb717694..e993e240eb2 100644 --- a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveStreams.DotNet.verified.txt +++ b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveStreams.DotNet.verified.txt @@ -3967,7 +3967,8 @@ namespace Akka.Streams.Implementation.Fusing public readonly object Event; public readonly System.Action Handler; public readonly Akka.Streams.Stage.GraphStageLogic Logic; - public AsyncInput(Akka.Streams.Implementation.Fusing.GraphInterpreterShell shell, Akka.Streams.Stage.GraphStageLogic logic, object @event, System.Action handler) { } + public readonly System.Threading.Tasks.TaskCompletionSource Promise; + public AsyncInput(Akka.Streams.Implementation.Fusing.GraphInterpreterShell shell, Akka.Streams.Stage.GraphStageLogic logic, object @event, System.Threading.Tasks.TaskCompletionSource promise, System.Action handler) { } public Akka.Streams.Implementation.Fusing.GraphInterpreterShell Shell { get; } } public class BatchingActorInputBoundary : Akka.Streams.Implementation.Fusing.GraphInterpreter.UpstreamBoundaryStageLogic @@ -4202,7 +4203,7 @@ namespace Akka.Streams.Implementation.Fusing public readonly Akka.Streams.Stage.GraphStageLogic[] Logics; public readonly Akka.Streams.IMaterializer Materializer; public const Akka.Streams.Implementation.Fusing.GraphInterpreter.Connection NoEvent = null; - public readonly System.Action> OnAsyncInput; + public readonly System.Action, System.Action> OnAsyncInput; public const int OutClosed = 32; public const int OutReady = 8; public const int PullEndFlip = 10; @@ -4213,7 +4214,7 @@ namespace Akka.Streams.Implementation.Fusing public const int Pushing = 4; public int RunningStagesCount; public static readonly Akka.Streams.Attributes[] SingleNoAttribute; - public GraphInterpreter(Akka.Streams.Implementation.Fusing.GraphAssembly assembly, Akka.Streams.IMaterializer materializer, Akka.Event.ILoggingAdapter log, Akka.Streams.Stage.GraphStageLogic[] logics, Connection[] connections, System.Action> onAsyncInput, bool fuzzingMode, Akka.Actor.IActorRef context) { } + public GraphInterpreter(Akka.Streams.Implementation.Fusing.GraphAssembly assembly, Akka.Streams.IMaterializer materializer, Akka.Event.ILoggingAdapter log, Akka.Streams.Stage.GraphStageLogic[] logics, Connection[] connections, System.Action, System.Action> onAsyncInput, bool fuzzingMode, Akka.Actor.IActorRef context) { } public Akka.Actor.IActorRef Context { get; } public static Akka.Streams.Implementation.Fusing.GraphInterpreter Current { get; } public static Akka.Streams.Implementation.Fusing.GraphInterpreter CurrentInterpreterOrNull { get; } @@ -4228,7 +4229,7 @@ namespace Akka.Streams.Implementation.Fusing public int Execute(int eventLimit) { } public void Finish() { } public void Init(Akka.Streams.IMaterializer subMaterializer) { } - public void RunAsyncInput(Akka.Streams.Stage.GraphStageLogic logic, object evt, System.Action handler) { } + public void RunAsyncInput(Akka.Streams.Stage.GraphStageLogic logic, object evt, System.Threading.Tasks.TaskCompletionSource promise, System.Action handler) { } public void SetHandler(Akka.Streams.Implementation.Fusing.GraphInterpreter.Connection connection, Akka.Streams.Stage.IInHandler handler) { } public void SetHandler(Akka.Streams.Implementation.Fusing.GraphInterpreter.Connection connection, Akka.Streams.Stage.IOutHandler handler) { } public override string ToString() { } @@ -4878,12 +4879,13 @@ namespace Akka.Streams.Stage } public abstract class GraphStageLogic : Akka.Streams.Stage.IStageLogging { - public static System.Action DoNothing; + public static readonly System.Action DoNothing; public static readonly Akka.Streams.Stage.InHandler EagerTerminateInput; public static readonly Akka.Streams.Stage.OutHandler EagerTerminateOutput; public static readonly Akka.Streams.Stage.InHandler IgnoreTerminateInput; public static readonly Akka.Streams.Stage.OutHandler IgnoreTerminateOutput; public readonly int InCount; + public static readonly System.Threading.Tasks.TaskCompletionSource NoPromise; public readonly int OutCount; public static readonly Akka.Streams.Stage.InHandler TotallyIgnorantInput; protected GraphStageLogic(int inCount, int outCount) { } @@ -4922,6 +4924,7 @@ namespace Akka.Streams.Stage protected Akka.Streams.Stage.IOutHandler GetHandler(Akka.Streams.Outlet outlet) { } [Akka.Annotations.ApiMayChangeAttribute()] protected Akka.Streams.Stage.StageActor GetStageActor(Akka.Streams.Stage.StageActorRef.Receive receive) { } + protected Akka.Streams.Stage.IAsyncCallback GetTypedAsyncCallback(System.Action handler) { } protected T Grab(Akka.Streams.Inlet inlet) { } protected bool HasBeenPulled(Akka.Streams.Inlet inlet) { } [Akka.Annotations.InternalApiAttribute()] @@ -5010,6 +5013,11 @@ namespace Akka.Streams.Stage protected abstract Akka.Streams.Stage.GraphStageLogic CreateLogic(Akka.Streams.Attributes inheritedAttributes); public virtual Akka.Streams.Stage.ILogicAndMaterializedValue CreateLogicAndMaterializedValue(Akka.Streams.Attributes inheritedAttributes) { } } + public interface IAsyncCallback + { + void Invoke(T input); + System.Threading.Tasks.Task InvokeWithFeedback(T input); + } public interface IAsyncContext : Akka.Streams.Stage.IContext, Akka.Streams.Stage.IDetachedContext, Akka.Streams.Stage.ILifecycleContext { Akka.Streams.Stage.AsyncCallback GetAsyncCallback(); diff --git a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveStreams.Net.verified.txt b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveStreams.Net.verified.txt index e23359dba13..badb6f52d94 100644 --- a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveStreams.Net.verified.txt +++ b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveStreams.Net.verified.txt @@ -3949,7 +3949,8 @@ namespace Akka.Streams.Implementation.Fusing public readonly object Event; public readonly System.Action Handler; public readonly Akka.Streams.Stage.GraphStageLogic Logic; - public AsyncInput(Akka.Streams.Implementation.Fusing.GraphInterpreterShell shell, Akka.Streams.Stage.GraphStageLogic logic, object @event, System.Action handler) { } + public readonly System.Threading.Tasks.TaskCompletionSource Promise; + public AsyncInput(Akka.Streams.Implementation.Fusing.GraphInterpreterShell shell, Akka.Streams.Stage.GraphStageLogic logic, object @event, System.Threading.Tasks.TaskCompletionSource promise, System.Action handler) { } public Akka.Streams.Implementation.Fusing.GraphInterpreterShell Shell { get; } } public class BatchingActorInputBoundary : Akka.Streams.Implementation.Fusing.GraphInterpreter.UpstreamBoundaryStageLogic @@ -4175,7 +4176,7 @@ namespace Akka.Streams.Implementation.Fusing public readonly Akka.Streams.Stage.GraphStageLogic[] Logics; public readonly Akka.Streams.IMaterializer Materializer; public const Akka.Streams.Implementation.Fusing.GraphInterpreter.Connection NoEvent = null; - public readonly System.Action> OnAsyncInput; + public readonly System.Action, System.Action> OnAsyncInput; public const int OutClosed = 32; public const int OutReady = 8; public const int PullEndFlip = 10; @@ -4186,7 +4187,7 @@ namespace Akka.Streams.Implementation.Fusing public const int Pushing = 4; public int RunningStagesCount; public static readonly Akka.Streams.Attributes[] SingleNoAttribute; - public GraphInterpreter(Akka.Streams.Implementation.Fusing.GraphAssembly assembly, Akka.Streams.IMaterializer materializer, Akka.Event.ILoggingAdapter log, Akka.Streams.Stage.GraphStageLogic[] logics, Connection[] connections, System.Action> onAsyncInput, bool fuzzingMode, Akka.Actor.IActorRef context) { } + public GraphInterpreter(Akka.Streams.Implementation.Fusing.GraphAssembly assembly, Akka.Streams.IMaterializer materializer, Akka.Event.ILoggingAdapter log, Akka.Streams.Stage.GraphStageLogic[] logics, Connection[] connections, System.Action, System.Action> onAsyncInput, bool fuzzingMode, Akka.Actor.IActorRef context) { } public Akka.Actor.IActorRef Context { get; } public static Akka.Streams.Implementation.Fusing.GraphInterpreter Current { get; } public static Akka.Streams.Implementation.Fusing.GraphInterpreter CurrentInterpreterOrNull { get; } @@ -4201,7 +4202,7 @@ namespace Akka.Streams.Implementation.Fusing public int Execute(int eventLimit) { } public void Finish() { } public void Init(Akka.Streams.IMaterializer subMaterializer) { } - public void RunAsyncInput(Akka.Streams.Stage.GraphStageLogic logic, object evt, System.Action handler) { } + public void RunAsyncInput(Akka.Streams.Stage.GraphStageLogic logic, object evt, System.Threading.Tasks.TaskCompletionSource promise, System.Action handler) { } public void SetHandler(Akka.Streams.Implementation.Fusing.GraphInterpreter.Connection connection, Akka.Streams.Stage.IInHandler handler) { } public void SetHandler(Akka.Streams.Implementation.Fusing.GraphInterpreter.Connection connection, Akka.Streams.Stage.IOutHandler handler) { } public override string ToString() { } @@ -4851,12 +4852,13 @@ namespace Akka.Streams.Stage } public abstract class GraphStageLogic : Akka.Streams.Stage.IStageLogging { - public static System.Action DoNothing; + public static readonly System.Action DoNothing; public static readonly Akka.Streams.Stage.InHandler EagerTerminateInput; public static readonly Akka.Streams.Stage.OutHandler EagerTerminateOutput; public static readonly Akka.Streams.Stage.InHandler IgnoreTerminateInput; public static readonly Akka.Streams.Stage.OutHandler IgnoreTerminateOutput; public readonly int InCount; + public static readonly System.Threading.Tasks.TaskCompletionSource NoPromise; public readonly int OutCount; public static readonly Akka.Streams.Stage.InHandler TotallyIgnorantInput; protected GraphStageLogic(int inCount, int outCount) { } @@ -4895,6 +4897,7 @@ namespace Akka.Streams.Stage protected Akka.Streams.Stage.IOutHandler GetHandler(Akka.Streams.Outlet outlet) { } [Akka.Annotations.ApiMayChangeAttribute()] protected Akka.Streams.Stage.StageActor GetStageActor(Akka.Streams.Stage.StageActorRef.Receive receive) { } + protected Akka.Streams.Stage.IAsyncCallback GetTypedAsyncCallback(System.Action handler) { } protected T Grab(Akka.Streams.Inlet inlet) { } protected bool HasBeenPulled(Akka.Streams.Inlet inlet) { } [Akka.Annotations.InternalApiAttribute()] @@ -4983,6 +4986,11 @@ namespace Akka.Streams.Stage protected abstract Akka.Streams.Stage.GraphStageLogic CreateLogic(Akka.Streams.Attributes inheritedAttributes); public virtual Akka.Streams.Stage.ILogicAndMaterializedValue CreateLogicAndMaterializedValue(Akka.Streams.Attributes inheritedAttributes) { } } + public interface IAsyncCallback + { + void Invoke(T input); + System.Threading.Tasks.Task InvokeWithFeedback(T input); + } public interface IAsyncContext : Akka.Streams.Stage.IContext, Akka.Streams.Stage.IDetachedContext, Akka.Streams.Stage.ILifecycleContext { Akka.Streams.Stage.AsyncCallback GetAsyncCallback(); diff --git a/src/core/Akka.Streams.Tests/Dsl/HubSpec.cs b/src/core/Akka.Streams.Tests/Dsl/HubSpec.cs index d8b0c3bd39b..be4c3fb09d4 100644 --- a/src/core/Akka.Streams.Tests/Dsl/HubSpec.cs +++ b/src/core/Akka.Streams.Tests/Dsl/HubSpec.cs @@ -615,6 +615,38 @@ await Awaiting(async () => }, Materializer); } + [Fact] + public async Task BroadcastHub_must_handle_cancelled_Sink() + { + await this.AssertAllStagesStoppedAsync(async () => + { + var upstream = this.CreatePublisherProbe(); + var hubSource = Source.FromPublisher(upstream).RunWith(BroadcastHub.Sink(4), Materializer); + var downstream = this.CreateSubscriberProbe(); + + hubSource.RunWith(Sink.Cancelled(), Materializer); + hubSource.RunWith(Sink.FromSubscriber(downstream), Materializer); + + await downstream.EnsureSubscriptionAsync(); + + await downstream.RequestAsync(10); + await upstream.ExpectRequestAsync(); + await upstream.SendNextAsync(1); + await downstream.ExpectNextAsync(1); + await upstream.SendNextAsync(2); + await downstream.ExpectNextAsync(2); + await upstream.SendNextAsync(3); + await downstream.ExpectNextAsync(3); + await upstream.SendNextAsync(4); + await downstream.ExpectNextAsync(4); + await upstream.SendNextAsync(5); + await downstream.ExpectNextAsync(5); + + await upstream.SendCompleteAsync(); + await downstream.ExpectCompleteAsync(); + }, Materializer); + } + [Fact] public async Task PartitionHub_must_work_in_the_happy_case_with_one_stream() { diff --git a/src/core/Akka.Streams.Tests/Implementation/Fusing/AsyncCallbackSpec.cs b/src/core/Akka.Streams.Tests/Implementation/Fusing/AsyncCallbackSpec.cs new file mode 100644 index 00000000000..2c6667386d0 --- /dev/null +++ b/src/core/Akka.Streams.Tests/Implementation/Fusing/AsyncCallbackSpec.cs @@ -0,0 +1,334 @@ +// ----------------------------------------------------------------------- +// +// Copyright (C) 2009-2022 Lightbend Inc. +// Copyright (C) 2013-2025 .NET Foundation +// +// ----------------------------------------------------------------------- + +using System; +using System.Linq; +using System.Threading.Tasks; +using Akka.Actor; +using Akka.Streams.Dsl; +using Akka.Streams.Stage; +using Akka.Streams.TestKit; +using Akka.TestKit; +using Akka.Util; +using Xunit; +using Xunit.Abstractions; +using FluentAssertions; +using static FluentAssertions.FluentActions; + +namespace Akka.Streams.Tests.Implementation.Fusing; + +public class AsyncCallbackSpec: AkkaSpec +{ + private ActorMaterializer Materializer { get; } + + internal sealed class Started + { + public static readonly Started Instance = new(); + private Started() { } + } + + internal sealed record Elem(int N); + + internal sealed record ThrowException(string Message); + + internal sealed class Stopped + { + public static readonly Stopped Instance = new(); + private Stopped() { } + } + + internal sealed record Callbacks(Action Callback, IAsyncCallback CallbackAsync); + + internal sealed class AsyncCallbackGraphStage: GraphStageWithMaterializedValue, Callbacks> + { + #region Logic + + internal sealed class Logic: InAndOutGraphStageLogic + { + private readonly AsyncCallbackGraphStage _stage; + public readonly Action AsyncCallback; + public readonly IAsyncCallback AsyncCallbackAsync; + + public Logic(AsyncCallbackGraphStage stage, Shape shape) : base(shape) + { + _stage = stage; + AsyncCallback = GetAsyncCallback(Callback); + AsyncCallbackAsync = GetTypedAsyncCallback(Callback); + if (_stage._early.HasValue) + _stage._early.Value(AsyncCallbackAsync); + SetHandlers(_stage.In, _stage.Out, this); + } + + public override void PreStart() + { + base.PreStart(); + _stage._probe.Tell(Started.Instance); + } + + public override void PostStop() + { + base.PostStop(); + _stage._probe.Tell(Stopped.Instance); + } + + public override void OnPush() + { + var n = Grab(_stage.In); + _stage._probe.Tell(new Elem(n)); + Push(_stage.Out, n); + } + + public override void OnPull() + { + Pull(_stage.In); + } + + private void Callback(object whatever) + { + switch (whatever) + { + case ThrowException t: + throw new TestException(t.Message); + case "fail-the-stage": + FailStage(new Exception("failing the stage")); + break; + default: + _stage._probe.Tell(whatever); + break; + } + } + } + + #endregion + private readonly IActorRef _probe; + private readonly Option>> _early; + + public AsyncCallbackGraphStage(IActorRef probe, Option>>? early = null) + { + Shape = new FlowShape(In, Out); + _probe = probe; + _early = early ?? Option>>.None; + } + + public Inlet In { get; } = new("In"); + public Outlet Out { get; } = new("Out"); + public override FlowShape Shape { get; } + + public override ILogicAndMaterializedValue CreateLogicAndMaterializedValue(Attributes inheritedAttributes) + { + var logic = new Logic(this, Shape); + return new LogicAndMaterializedValue(logic, new Callbacks(logic.AsyncCallback, logic.AsyncCallbackAsync)); + } + } + + public AsyncCallbackSpec(ITestOutputHelper output) : base(output, "akka.loglevel = DEBUG") + { + Materializer = ActorMaterializer.Create(Sys, ActorMaterializerSettings.Create(Sys).WithFuzzingMode(false)); + } + + [Fact(DisplayName = "The support for async callbacks must invoke without feedback, happy path")] + public async Task WithoutFeedbackHappyPathTest() + { + var probe = CreateTestProbe(); + var upstream = this.CreatePublisherProbe(); + var downstream = this.CreateSubscriberProbe(); + var (callback, _) = Source.FromPublisher(upstream) + .ViaMaterialized(new AsyncCallbackGraphStage(probe.Ref), Keep.Right) + .To(Sink.FromSubscriber(downstream)) + .Run(Materializer); + + await downstream.EnsureSubscriptionAsync(); + + await probe.ExpectMsgAsync(); + await downstream.RequestAsync(1); + await upstream.ExpectRequestAsync(); + + foreach (var n in Enumerable.Range(0, 10)) + { + var msg = $"whatever{n}"; + callback(msg); + await probe.ExpectMsgAsync(msg); + } + + await upstream.SendCompleteAsync(); + await downstream.ExpectCompleteAsync(); + + probe.ExpectMsg(); + } + + [Fact(DisplayName = "The support for async callbacks must invoke with feedback, happy path")] + public async Task WithFeedbackHappyPathTest() + { + var probe = CreateTestProbe(); + var upstream = this.CreatePublisherProbe(); + var downstream = this.CreateSubscriberProbe(); + var (_, callback) = Source.FromPublisher(upstream) + .ViaMaterialized(new AsyncCallbackGraphStage(probe.Ref), Keep.Right) + .To(Sink.FromSubscriber(downstream)) + .Run(Materializer); + + await probe.ExpectMsgAsync(); + + await downstream.EnsureSubscriptionAsync(); + await downstream.RequestAsync(1); + await upstream.ExpectRequestAsync(); + + foreach (var n in Enumerable.Range(0, 10)) + { + var msg = $"whatever{n}"; + var feedback = callback.InvokeWithFeedback(msg); + await probe.ExpectMsgAsync(msg); + await feedback; + feedback.IsCompleted.Should().BeTrue(); + feedback.Result.Should().Be(Done.Instance); + } + + await upstream.SendCompleteAsync(); + await downstream.ExpectCompleteAsync(); + + probe.ExpectMsg(); + } + + [Fact(DisplayName = "The support for async callbacks must fail the feedback if stage is stopped")] + public async Task FailedFeedbackStageStoppedTest() + { + var probe = CreateTestProbe(); + var (_, callback) = Source.Empty() + .ViaMaterialized(new AsyncCallbackGraphStage(probe.Ref), Keep.Right) + .To(Sink.Ignore()) + .Run(Materializer); + + await probe.ExpectMsgAsync(); + await probe.ExpectMsgAsync(); + + var feedback = callback.InvokeWithFeedback("whatever"); + + Invoking(() => feedback.GetAwaiter().GetResult()) + .Should().Throw(); + } + + [Fact(DisplayName = "The support for async callbacks must invoke early")] + public async Task InvokeEarlyTest() + { + var probe = CreateTestProbe(); + var upstream = this.CreatePublisherProbe(); + var (callback, _) = Source.FromPublisher(upstream) + .ViaMaterialized(new AsyncCallbackGraphStage( + probe.Ref, + Option>>.Create(asyncCb => asyncCb.InvokeWithFeedback("early"))), Keep.Right) + .To(Sink.Ignore()) + .Run(Materializer); + + await Task.Delay(100); + // and deliver in order + callback("later"); + + await probe.ExpectMsgAsync(); + await probe.ExpectMsgAsync("early"); + await probe.ExpectMsgAsync("later"); + + await upstream.SendCompleteAsync(); + probe.ExpectMsg(); + } + + [Fact(DisplayName = "The support for async callbacks must invoke with feedback early")] + public async Task InvokeFeedbackEarlyTest() + { + var probe = CreateTestProbe(); + var earlyFeedback = new TaskCompletionSource(); + var upstream = this.CreatePublisherProbe(); + var (_, callback) = Source.FromPublisher(upstream) + .ViaMaterialized(new AsyncCallbackGraphStage( + probe.Ref, + Option>>.Create(asyncCb => + { + asyncCb.InvokeWithFeedback("early"); + earlyFeedback.SetResult(Done.Instance); + }) + ), Keep.Right) + .To(Sink.Ignore()) + .Run(Materializer); + + await Task.Delay(100); + // and deliver in order + var laterFeedback = callback.InvokeWithFeedback("later"); + + await probe.ExpectMsgAsync(); + await probe.ExpectMsgAsync("early"); + earlyFeedback.Task.Result.Should().Be(Done.Instance); + + await probe.ExpectMsgAsync("later"); + laterFeedback.Result.Should().Be(Done.Instance); + + await upstream.SendCompleteAsync(); + probe.ExpectMsg(); + } + + [Fact(DisplayName = "The support for async callbacks must accept concurrent inputs")] + public async Task ConcurrentInputTest() + { + var probe = CreateTestProbe(); + var upstream = this.CreatePublisherProbe(); + var (_, callback) = Source.FromPublisher(upstream) + .ViaMaterialized(new AsyncCallbackGraphStage(probe.Ref), Keep.Right) + .To(Sink.Ignore()) + .Run(Materializer); + + await probe.ExpectMsgAsync(); + + var feedbacks = Enumerable.Range(1, 100) + .Select(n => callback.InvokeWithFeedback(n.ToString())); + + var cbResults = await Task.WhenAll(feedbacks); + cbResults.Length.Should().Be(100); + Enumerable.Range(1, 100) + .Select(_ => probe.ExpectMsg()) + .ToHashSet().Count.Should().Be(100); + + await upstream.SendCompleteAsync(); + probe.ExpectMsg(); + } + + [Fact(DisplayName = "The support for async callbacks must fail the feedback if the handler throws")] + public async Task FailingFeedbackHandlerThrowsTest() + { + var probe = CreateTestProbe(); + var upstream = this.CreatePublisherProbe(); + var (_, callback) = Source.FromPublisher(upstream) + .ViaMaterialized(new AsyncCallbackGraphStage(probe.Ref), Keep.Right) + .To(Sink.Ignore()) + .Run(Materializer); + + await probe.ExpectMsgAsync(); + (await callback.InvokeWithFeedback("happy-case")).Should().Be(Done.Instance); + await probe.ExpectMsgAsync("happy-case"); + + var feedback = callback.InvokeWithFeedback(new ThrowException("oh my gosh, whale of a wash!")); + await Awaiting(async () => await feedback) + .Should().ThrowAsync() + .WithMessage("oh my gosh, whale of a wash!"); + + await upstream.ExpectCancellationAsync(); + } + + [Fact(DisplayName = "The support for async callbacks must fail the feedback if the handler fails the stage")] + public async Task FailingFeedbackHandlerFailsStageTest() + { + var probe = CreateTestProbe(); + var (_, callback) = Source.Empty() + .ViaMaterialized(new AsyncCallbackGraphStage(probe.Ref), Keep.Right) + .To(Sink.Ignore()) + .Run(Materializer); + + await probe.ExpectMsgAsync(); + await probe.ExpectMsgAsync(); + + var feedback = callback.InvokeWithFeedback("fail-the-stage"); + await Awaiting(async () => await feedback) + .Should().ThrowAsync(); + } +} \ No newline at end of file diff --git a/src/core/Akka.Streams.Tests/Implementation/Fusing/GraphInterpreterSpecKit.cs b/src/core/Akka.Streams.Tests/Implementation/Fusing/GraphInterpreterSpecKit.cs index 8ffd7269c77..ba3b44881cc 100644 --- a/src/core/Akka.Streams.Tests/Implementation/Fusing/GraphInterpreterSpecKit.cs +++ b/src/core/Akka.Streams.Tests/Implementation/Fusing/GraphInterpreterSpecKit.cs @@ -124,7 +124,7 @@ public void Init() new Dictionary(), _ => { }); var connections = mat.Item1; var logics = mat.Item2; - var interpreter = new GraphInterpreter(assembly, NoMaterializer.Instance, _logger, logics, connections, (_, _, _) => {}, false, null); + var interpreter = new GraphInterpreter(assembly, NoMaterializer.Instance, _logger, logics, connections, (_, _, _, _) => {}, false, null); var i = 0; foreach (var upstream in _upstreams) @@ -147,7 +147,7 @@ public void ManualInit(GraphAssembly assembly) new Dictionary(), _ => { }); var connections = mat.Item1; var logics = mat.Item2; - _interpreter = new GraphInterpreter(assembly, NoMaterializer.Instance, _logger, logics, connections, (_, _, _) => {}, false, null); + _interpreter = new GraphInterpreter(assembly, NoMaterializer.Instance, _logger, logics, connections, (_, _, _, _) => {}, false, null); } public AssemblyBuilder Builder(params IGraphStageWithMaterializedValue[] stages) diff --git a/src/core/Akka.Streams/Dsl/Hub.cs b/src/core/Akka.Streams/Dsl/Hub.cs index 187b22ec545..07e55c8df34 100644 --- a/src/core/Akka.Streams/Dsl/Hub.cs +++ b/src/core/Akka.Streams/Dsl/Hub.cs @@ -542,67 +542,16 @@ private sealed class RegistrationPending : IHubEvent private RegistrationPending() { - } } - private sealed class UnRegister : IHubEvent - { - public UnRegister(long id, int previousOffset, int finalOffset) - { - Id = id; - PreviousOffset = previousOffset; - FinalOffset = finalOffset; - } - - public long Id { get; } - - public int PreviousOffset { get; } + private sealed record UnRegister(long Id, int PreviousOffset, int FinalOffset) : IHubEvent; - public int FinalOffset { get; } - } - - private sealed class Advanced : IHubEvent - { - public Advanced(long id, int previousOffset) - { - Id = id; - PreviousOffset = previousOffset; - } - - public long Id { get; } - - public int PreviousOffset { get; } - } - - private sealed class NeedWakeup : IHubEvent - { - public NeedWakeup(long id, int previousOffset, int currentOffset) - { - Id = id; - PreviousOffset = previousOffset; - CurrentOffset = currentOffset; - } - - public long Id { get; } - - public int PreviousOffset { get; } - - public int CurrentOffset { get; } - } - - private sealed class Consumer - { - public Consumer(long id, Action callback) - { - Id = id; - Callback = callback; - } + private sealed record Advanced(long Id, int PreviousOffset) : IHubEvent; - public long Id { get; } + private sealed record NeedWakeup(long Id, int PreviousOffset, int CurrentOffset) : IHubEvent; - public Action Callback { get; } - } + private sealed record Consumer(long Id, IAsyncCallback Callback); private sealed class Completed @@ -611,35 +560,15 @@ private sealed class Completed private Completed() { - } } private interface IHubState { } - private sealed class Open : IHubState - { - public Open(Task> callbackTask, ImmutableList registrations) - { - CallbackTask = callbackTask; - Registrations = registrations; - } + private sealed record Open(Task> CallbackTask, ImmutableList Registrations) : IHubState; - public Task> CallbackTask { get; } - - public ImmutableList Registrations { get; } - } - - private sealed class Closed : IHubState - { - public Closed(Exception failure = null) - { - Failure = failure; - } - - public Exception Failure { get; } - } + private sealed record Closed(Exception? Failure = null) : IHubState; private interface IConsumerEvent { } @@ -650,29 +579,12 @@ private sealed class Wakeup : IConsumerEvent private Wakeup() { - - } - } - - private sealed class HubCompleted : IConsumerEvent - { - public HubCompleted(Exception failure = null) - { - Failure = failure; } - - public Exception Failure { get; } } - private sealed class Initialize : IConsumerEvent - { - public Initialize(int offset) - { - Offset = offset; - } + private sealed record HubCompleted(Exception? Failure = null) : IConsumerEvent; - public int Offset { get; } - } + private sealed record Initialize(int Offset) : IConsumerEvent; #endregion @@ -747,69 +659,96 @@ public override void OnPush() private void OnEvent(IHubEvent hubEvent) { - if (hubEvent == RegistrationPending.Instance) + switch (hubEvent) { - var open = (Open)State.GetAndSet(_noRegistrationState); - foreach (var c in open.Registrations) + case RegistrationPending: { - var startFrom = _head; - _activeConsumer++; - AddConsumer(c, startFrom); - c.Callback(new Initialize(startFrom)); - } + var open = (Open)State.GetAndSet(_noRegistrationState); + foreach (var consumer in open.Registrations) + { + var startFrom = _head; + _activeConsumer++; + AddConsumer(consumer, startFrom); - return; - } + _ = ProcessConsumerRegistration(); + continue; - if (hubEvent is UnRegister unregister) - { - _activeConsumer--; - FindAndRemoveConsumer(unregister.Id, unregister.PreviousOffset); - if (_activeConsumer == 0) + async Task ProcessConsumerRegistration() + { + // in case the consumer is already stopped we need to undo registration + try + { + await consumer.Callback.InvokeWithFeedback(new Initialize(startFrom)); + } + catch (StreamDetachedException) // stopped + { + try + { + // Make sure that the task completed successfully + var result = await _callbackCompletion.Task; + result(new UnRegister(consumer.Id, startFrom, startFrom)); + } + catch + { + // no-op + } + } + } + } + + return; + } + case UnRegister unregister: { - if (IsClosed(_stage.In)) - CompleteStage(); - else if (_head != unregister.FinalOffset) + if(FindAndRemoveConsumer(unregister.Id, unregister.PreviousOffset) is not null) + _activeConsumer--; + + if (_activeConsumer == 0) { - // If our final consumer goes away, we roll forward the buffer so a subsequent consumer does not - // see the already consumed elements. This feature is quite handy. - - while (_head != unregister.FinalOffset) + if (IsClosed(_stage.In)) + CompleteStage(); + else if (_head != unregister.FinalOffset) { - _queue[_head & _stage._mask] = null; - _head++; + // If our final consumer goes away, we roll forward the buffer so a subsequent consumer does not + // see the already consumed elements. This feature is quite handy. + + while (_head != unregister.FinalOffset) + { + _queue[_head & _stage._mask] = null; + _head++; + } + + _head = unregister.FinalOffset; + if (!HasBeenPulled(_stage.In)) + Pull(_stage.In); } - - _head = unregister.FinalOffset; - if (!HasBeenPulled(_stage.In)) - Pull(_stage.In); } + else + CheckUnblock(unregister.PreviousOffset); + return; + } + case Advanced advance: + { + var newOffset = advance.PreviousOffset + _stage._demandThreshold; + // Move the consumer from its last known offset to its new one. Check if we are unblocked. + var customer = FindAndRemoveConsumer(advance.Id, advance.PreviousOffset); + AddConsumer(customer, newOffset); + CheckUnblock(advance.PreviousOffset); + return; + } + case NeedWakeup wakeup: + { + // Move the consumer from its last known offset to its new one. Check if we are unblocked. + var consumer = FindAndRemoveConsumer(wakeup.Id, wakeup.PreviousOffset); + AddConsumer(consumer, wakeup.CurrentOffset); + + // Also check if the consumer is now unblocked since we published an element since it went asleep. + if (wakeup.CurrentOffset != _tail) + consumer.Callback.Invoke(Wakeup.Instance); + CheckUnblock(wakeup.PreviousOffset); + return; } - else - CheckUnblock(unregister.PreviousOffset); - return; - } - - if (hubEvent is Advanced advance) - { - var newOffset = advance.PreviousOffset + _stage._demandThreshold; - // Move the consumer from its last known offset to its new one. Check if we are unblocked. - var c = FindAndRemoveConsumer(advance.Id, advance.PreviousOffset); - AddConsumer(c, newOffset); - CheckUnblock(advance.PreviousOffset); - return; } - - // only NeedWakeup left - var wakeup = (NeedWakeup)hubEvent; - // Move the consumer from its last known offset to its new one. Check if we are unblocked. - var consumer = FindAndRemoveConsumer(wakeup.Id, wakeup.PreviousOffset); - AddConsumer(consumer, wakeup.CurrentOffset); - - // Also check if the consumer is now unblocked since we published an element since it went asleep. - if (wakeup.CurrentOffset != _tail) - consumer.Callback(Wakeup.Instance); - CheckUnblock(wakeup.PreviousOffset); } // Producer API @@ -823,10 +762,10 @@ public override void OnUpstreamFailure(Exception e) // Notify pending consumers and set tombstone var open = (Open)State.GetAndSet(new Closed(e)); - open.Registrations.ForEach(c => c.Callback(failMessage)); + open.Registrations.ForEach(c => c.Callback.Invoke(failMessage)); // Notify registered consumers - _consumerWheel.SelectMany(x => x).ForEach(c => c.Callback(failMessage)); + _consumerWheel.SelectMany(x => x).ForEach(c => c.Callback.Invoke(failMessage)); FailStage(e); } @@ -904,7 +843,7 @@ private void AddConsumer(Consumer consumer, int offset) /// /// TBD private void WakeupIndex(int index) - => _consumerWheel[index].ForEach(c => c.Callback(Wakeup.Instance)); + => _consumerWheel[index].ForEach(c => c.Callback.Invoke(Wakeup.Instance)); private void Complete() { @@ -931,7 +870,7 @@ public override void PostStop() { var completedMessage = new HubCompleted(); foreach (var consumer in open.Registrations) - consumer.Callback(completedMessage); + consumer.Callback.Invoke(completedMessage); } else continue; @@ -989,20 +928,7 @@ public Logic(HubSourceLogic stage, long id) : base(stage.Shape) public override void PreStart() { - var callback = GetAsyncCallback(OnCommand); - - void OnHubReady(Result> result) - { - if (result.IsSuccess) - { - _hubCallback = result.Value; - if (IsAvailable(_stage.Out) && _offsetInitialized) - OnPull(); - _hubCallback(RegistrationPending.Instance); - } - else - FailStage(result.Exception); - } + var callback = GetTypedAsyncCallback(OnCommand); /* * Note that there is a potential race here. First we add ourselves to the pending registrations, then @@ -1039,6 +965,21 @@ void OnHubReady(Result> result) break; } + + return; + + void OnHubReady(Result> result) + { + if (result.IsSuccess) + { + _hubCallback = result.Value; + if (IsAvailable(_stage.Out) && _offsetInitialized) + OnPull(); + _hubCallback!(RegistrationPending.Instance); + } + else + FailStage(result.Exception); + } } public override void OnPull() diff --git a/src/core/Akka.Streams/Implementation/Fusing/ActorGraphInterpreter.cs b/src/core/Akka.Streams/Implementation/Fusing/ActorGraphInterpreter.cs index e9a26385542..c5247e7e3bd 100644 --- a/src/core/Akka.Streams/Implementation/Fusing/ActorGraphInterpreter.cs +++ b/src/core/Akka.Streams/Implementation/Fusing/ActorGraphInterpreter.cs @@ -9,6 +9,7 @@ using System.Collections.Generic; using System.Linq; using System.Text; +using System.Threading.Tasks; using Akka.Actor; using Akka.Annotations; using Akka.Event; @@ -281,7 +282,7 @@ public int Receive(ActorGraphInterpreter.IBoundaryEvent e, int eventLimit) return eventLimit; case ActorGraphInterpreter.AsyncInput asyncInput: - Interpreter.RunAsyncInput(asyncInput.Logic, asyncInput.Event, asyncInput.Handler); + Interpreter.RunAsyncInput(asyncInput.Logic, asyncInput.Event, asyncInput.Promise, asyncInput.Handler); if (eventLimit == 1 && _interpreter.IsSuspended) { SendResume(true); @@ -409,12 +410,12 @@ private void SendResume(bool sendResume) private GraphInterpreter GetInterpreter() { return new GraphInterpreter(_assembly, Materializer, Log, _logics, _connections, - (logic, @event, handler) => + (logic, @event, promise, handler) => { - var asyncInput = new ActorGraphInterpreter.AsyncInput(this, logic, @event, handler); + var asyncInput = new ActorGraphInterpreter.AsyncInput(this, logic, @event, promise, handler); var currentInterpreter = CurrentInterpreterOrNull; if (currentInterpreter == null || !Equals(currentInterpreter.Context, Self)) - Self.Tell(new ActorGraphInterpreter.AsyncInput(this, logic, @event, handler)); + Self.Tell(asyncInput); else _enqueueToShortCircuit(asyncInput); }, _settings.IsFuzzingMode, Self); @@ -694,36 +695,18 @@ public ExposedPublisher(GraphInterpreterShell shell, int id, IActorPublisher pub public GraphInterpreterShell Shell { get; } } - /// - /// TBD - /// public readonly struct AsyncInput : IBoundaryEvent { - /// - /// TBD - /// public readonly GraphStageLogic Logic; - /// - /// TBD - /// public readonly object Event; - /// - /// TBD - /// + public readonly TaskCompletionSource Promise; public readonly Action Handler; - /// - /// TBD - /// - /// TBD - /// TBD - /// TBD - /// TBD - /// TBD - public AsyncInput(GraphInterpreterShell shell, GraphStageLogic logic, object @event, Action handler) + public AsyncInput(GraphInterpreterShell shell, GraphStageLogic logic, object @event, TaskCompletionSource promise, Action handler) { Shell = shell; Logic = logic; Event = @event; + Promise = promise; Handler = handler; } diff --git a/src/core/Akka.Streams/Implementation/Fusing/GraphInterpreter.cs b/src/core/Akka.Streams/Implementation/Fusing/GraphInterpreter.cs index 6bc899b50e7..6d694e60da4 100644 --- a/src/core/Akka.Streams/Implementation/Fusing/GraphInterpreter.cs +++ b/src/core/Akka.Streams/Implementation/Fusing/GraphInterpreter.cs @@ -10,6 +10,7 @@ using System.Runtime.CompilerServices; using System.Text; using System.Threading; +using System.Threading.Tasks; using Akka.Actor; using Akka.Annotations; using Akka.Event; @@ -409,7 +410,7 @@ public static GraphInterpreter Current /// /// TBD /// - public readonly Action> OnAsyncInput; + public readonly Action, Action> OnAsyncInput; /// /// TBD /// @@ -457,7 +458,7 @@ public GraphInterpreter( ILoggingAdapter log, GraphStageLogic[] logics, Connection[] connections, - Action> onAsyncInput, + Action, Action> onAsyncInput, bool fuzzingMode, IActorRef context) { @@ -799,13 +800,7 @@ private void ReportStageError(Exception e) #pragma warning disable CS0162 // Disabled since the flag can be set while debugging - /// - /// TBD - /// - /// TBD - /// TBD - /// TBD - public void RunAsyncInput(GraphStageLogic logic, object evt, Action handler) + public void RunAsyncInput(GraphStageLogic logic, object evt, TaskCompletionSource promise, Action handler) { if (!IsStageCompleted(logic)) { @@ -819,9 +814,19 @@ public void RunAsyncInput(GraphStageLogic logic, object evt, Action hand try { handler(evt); + if (!ReferenceEquals(promise, GraphStageLogic.NoPromise)) + { + promise.TrySetResult(Done.Instance); + logic.OnFeedbackDispatched(); + } } catch (Exception e) { + if (!ReferenceEquals(promise, GraphStageLogic.NoPromise)) + { + promise.TrySetException(e); + logic.OnFeedbackDispatched(); + } logic.FailStage(e); } AfterStageHasRun(logic); diff --git a/src/core/Akka.Streams/Stage/GraphStage.cs b/src/core/Akka.Streams/Stage/GraphStage.cs index a193658a3e0..ff5182aa7c5 100644 --- a/src/core/Akka.Streams/Stage/GraphStage.cs +++ b/src/core/Akka.Streams/Stage/GraphStage.cs @@ -6,11 +6,13 @@ //----------------------------------------------------------------------- using System; +using System.Collections.Concurrent; using System.Collections.Generic; using System.Collections.Immutable; using System.Linq; using System.Runtime.Serialization; using System.Threading; +using System.Threading.Tasks; using Akka.Actor; using Akka.Annotations; using Akka.Dispatch.SysMsg; @@ -36,6 +38,7 @@ public interface ILogicAndMaterializedValue /// TBD /// GraphStageLogic Logic { get; } + /// /// TBD /// @@ -63,6 +66,7 @@ public LogicAndMaterializedValue(GraphStageLogic logic, TMaterialized materializ /// TBD /// public GraphStageLogic Logic { get; } + /// /// TBD /// @@ -74,7 +78,8 @@ public LogicAndMaterializedValue(GraphStageLogic logic, TMaterialized materializ /// /// TBD /// TBD - public interface IGraphStageWithMaterializedValue : IGraph where TShape : Shape + public interface IGraphStageWithMaterializedValue : IGraph + where TShape : Shape { /// /// TBD @@ -89,7 +94,9 @@ public interface IGraphStageWithMaterializedValue /// /// TBD /// TBD - public abstract class GraphStageWithMaterializedValue : IGraphStageWithMaterializedValue where TShape : Shape + public abstract class + GraphStageWithMaterializedValue : IGraphStageWithMaterializedValue + where TShape : Shape { #region anonymous graph class @@ -105,13 +112,16 @@ public Graph(TShape shape, IModule module, Attributes attributes) public IModule Module { get; } - public IGraph WithAttributes(Attributes attributes) => new Graph(Shape, Module, attributes); + public IGraph WithAttributes(Attributes attributes) => + new Graph(Shape, Module, attributes); - public IGraph AddAttributes(Attributes attributes) => WithAttributes(Module.Attributes.And(attributes)); + public IGraph AddAttributes(Attributes attributes) => + WithAttributes(Module.Attributes.And(attributes)); public IGraph Named(string name) => AddAttributes(Attributes.CreateName(name)); - public IGraph Async() => AddAttributes(new Attributes(Attributes.AsyncBoundary.Instance)); + public IGraph Async() => + AddAttributes(new Attributes(Attributes.AsyncBoundary.Instance)); } #endregion @@ -145,14 +155,16 @@ protected GraphStageWithMaterializedValue() /// /// TBD /// TBD - public IGraph WithAttributes(Attributes attributes) => new Graph(Shape, Module, attributes); + public IGraph WithAttributes(Attributes attributes) => + new Graph(Shape, Module, attributes); /// /// TBD /// /// TBD /// TBD - public abstract ILogicAndMaterializedValue CreateLogicAndMaterializedValue(Attributes inheritedAttributes); + public abstract ILogicAndMaterializedValue CreateLogicAndMaterializedValue( + Attributes inheritedAttributes); /// /// TBD @@ -164,7 +176,8 @@ protected GraphStageWithMaterializedValue() /// /// TBD /// TBD - public IGraph AddAttributes(Attributes attributes) => WithAttributes(Module.Attributes.And(attributes)); + public IGraph AddAttributes(Attributes attributes) => + WithAttributes(Module.Attributes.And(attributes)); /// /// TBD @@ -177,7 +190,8 @@ protected GraphStageWithMaterializedValue() /// TBD /// /// TBD - public IGraph Async() => AddAttributes(new Attributes(Attributes.AsyncBoundary.Instance)); + public IGraph Async() => + AddAttributes(new Attributes(Attributes.AsyncBoundary.Instance)); } /// @@ -193,7 +207,8 @@ public abstract class GraphStage : GraphStageWithMaterializedValue /// TBD /// TBD - public sealed override ILogicAndMaterializedValue CreateLogicAndMaterializedValue(Attributes inheritedAttributes) + public sealed override ILogicAndMaterializedValue CreateLogicAndMaterializedValue( + Attributes inheritedAttributes) => new LogicAndMaterializedValue(CreateLogic(inheritedAttributes), NotUsed.Instance); /// @@ -205,11 +220,59 @@ public sealed override ILogicAndMaterializedValue CreateLogicAndMateria } /// - /// TBD + /// Asynchronous callback holder that is attached to a . + /// + /// Initializing will eventually lead to the registered callback function being called. + /// + /// This holder has the same lifecycle as a stream and cannot be used before materialization is done. /// + /// + /// Typical use cases for this are exchanging messages between stream and substreams or sending external events + /// to a stream. + /// + /// The input type accepted by the stage. + public interface IAsyncCallback + { + /// + /// Dispatch an asynchronous notification. This method is thread-safe and may + /// be invoked from external execution contexts. + /// + /// + /// For cases where it's important to know if the notification was ever processed + /// or not, please see + /// + void Invoke(T input); + + /// + /// Dispatch an asynchronous notification. This method is thread-safe and may + /// be invoked from external execution contexts. + /// + /// This method returns directly and the returned is then completed + /// once the event has been handled by the operator. If the event triggers an exception + /// from the handler the returned will be completed with that exception. + /// + /// If the operator was stopped before the event was handled then the Task will fail with a + /// . + /// + /// + /// The handling of the returned incurs slight overhead, so in cases where you don't + /// need an explicit reply please use instead. + /// + Task InvokeWithFeedback(T input); + } + + /// + /// Timer-driven graph stage logic. + /// + /// + /// To be thread safe the methods of this class must only be called from either the constructor of the graph operator during + /// materialization or one of the methods invoked by the graph operator machinery, such as + /// public abstract class TimerGraphStageLogic : GraphStageLogic { - private readonly IDictionary _keyToTimers = new Dictionary(); + private readonly IDictionary _keyToTimers = + new Dictionary(); + private readonly AtomicCounter _timerIdGen = new(0); private Action _timerAsyncCallback; @@ -262,10 +325,8 @@ protected internal void ScheduleRepeatedly(object timerKey, TimeSpan initialDela { CancelTimer(timerKey); var id = _timerIdGen.IncrementAndGet(); - var task = Interpreter.Materializer.ScheduleRepeatedly(initialDelay, interval, () => - { - TimerAsyncCallback(new TimerMessages.Scheduled(timerKey, id, isRepeating: true)); - }); + var task = Interpreter.Materializer.ScheduleRepeatedly(initialDelay, interval, + () => { TimerAsyncCallback(new TimerMessages.Scheduled(timerKey, id, isRepeating: true)); }); _keyToTimers[timerKey] = new TimerMessages.Timer(id, task); } @@ -291,10 +352,8 @@ protected internal void ScheduleOnce(object timerKey, TimeSpan delay) { CancelTimer(timerKey); var id = _timerIdGen.IncrementAndGet(); - var task = Interpreter.Materializer.ScheduleOnce(delay, () => - { - TimerAsyncCallback(new TimerMessages.Scheduled(timerKey, id, isRepeating: false)); - }); + var task = Interpreter.Materializer.ScheduleOnce(delay, + () => { TimerAsyncCallback(new TimerMessages.Scheduled(timerKey, id, isRepeating: false)); }); _keyToTimers[timerKey] = new TimerMessages.Timer(id, task); } @@ -351,10 +410,12 @@ public sealed class Scheduled : IDeadLetterSuppression /// TBD /// public readonly object TimerKey; + /// /// TBD /// public readonly int TimerId; + /// /// TBD /// @@ -386,6 +447,7 @@ public sealed class Timer /// TBD /// public readonly int Id; + /// /// TBD /// @@ -423,6 +485,23 @@ public Timer(int id, ICancelable task) /// public abstract class GraphStageLogic : IStageLogging { + private ImmutableList _callbacksWaitingForInterpreter = ImmutableList.Empty; + + // needs to be accessible via the ConcurrentAsyncCallback + private AtomicReference>?> _asyncCallbacksInProgress = + new(ImmutableList>.Empty); + + public static readonly TaskCompletionSource NoPromise; + + static GraphStageLogic() + { + NoPromise = new TaskCompletionSource(); + NoPromise.SetResult(Done.Instance); + } + + private StreamDetachedException StreamDetachedException => + new StreamDetachedException($"Stage with GraphStageLogic [{this}] stopped before async invocation was processed"); + #region internal classes private sealed class Reading : InHandler @@ -434,7 +513,8 @@ private sealed class Reading : InHandler private readonly Action _onComplete; private readonly GraphStageLogic _logic; - public Reading(Inlet inlet, int n, IInHandler previous, Action andThen, Action onComplete, GraphStageLogic logic) + public Reading(Inlet inlet, int n, IInHandler previous, Action andThen, Action onComplete, + GraphStageLogic logic) { _inlet = inlet; _n = n; @@ -571,7 +651,9 @@ private Emitting Dequeue() private sealed class EmittingSingle : Emitting { private readonly T _element; - public EmittingSingle(Outlet @out, T element, IOutHandler previous, Action andThen, GraphStageLogic logic) : base(@out, previous, andThen, logic) + + public EmittingSingle(Outlet @out, T element, IOutHandler previous, Action andThen, + GraphStageLogic logic) : base(@out, previous, andThen, logic) { _element = element; } @@ -587,7 +669,8 @@ private sealed class EmittingIterator : Emitting { private readonly IEnumerator _enumerator; - public EmittingIterator(Outlet @out, IEnumerator enumerator, IOutHandler previous, Action andThen, GraphStageLogic logic) : base(@out, previous, andThen, logic) + public EmittingIterator(Outlet @out, IEnumerator enumerator, IOutHandler previous, Action andThen, + GraphStageLogic logic) : base(@out, previous, andThen, logic) { _enumerator = enumerator; } @@ -602,7 +685,10 @@ public override void OnPull() private sealed class EmittingCompletion : Emitting { - public EmittingCompletion(Outlet @out, IOutHandler previous, GraphStageLogic logic) : base(@out, previous, DoNothing, logic) { } + public EmittingCompletion(Outlet @out, IOutHandler previous, GraphStageLogic logic) : base(@out, previous, + DoNothing, logic) + { + } public override void OnPull() => Logic.Complete(Out); } @@ -657,7 +743,8 @@ protected sealed class LambdaInHandler : InHandler /// TBD /// TBD /// TBD - public LambdaInHandler(Action onPush, Action onUpstreamFinish = null, Action onUpstreamFailure = null) + public LambdaInHandler(Action onPush, Action onUpstreamFinish = null, + Action onUpstreamFailure = null) { _onPush = onPush; _onUpstreamFinish = onUpstreamFinish; @@ -729,6 +816,193 @@ public override void OnDownstreamFinish(Exception cause) } } + /// + /// INTERNAL API + /// + internal static class ConcurrentAsyncCallbackState + { + public interface IState; + + /// + /// Event with feedback promise. + /// + public sealed record Event(TE Evt, TaskCompletionSource HandlingPromise); + + /// + /// Waiting for materialization completion or during dispatching of internally queued events + /// + public sealed record Pending(ImmutableList> PendingEvents) : IState + { + public static readonly Pending Empty = new(ImmutableList>.Empty); + } + + /// + /// Stream is initialized and so now threads can send events without any sync overhead + /// + public sealed class Initialized : IState + { + public static readonly Initialized Instance = new(); + + private Initialized() + { + } + } + } + + /// + /// Used by the to invoke all queue events + /// once the interpreter is started. + /// + private interface IConcurrentAsyncCallback + { + void OnStart(); + } + + + /// + /// ConcurrentAsyncCallback allows to send events to a stream from multiple threads regardless of + /// the initialization / materialization state of the stream in a thread-safe manner. + /// + /// The state of this object can be changed both "internally" by the owning or + /// by the external world, i.e. external threads and callers. + /// + /// Specifically, calls to this class can be made: + /// - From the owning , TO + /// + /// The type of input argument accepted + internal sealed class ConcurrentAsyncCallback : IAsyncCallback, IConcurrentAsyncCallback + { + // need this to make the GraphInterpreter happy + private readonly Action _wrappedHandler; + private readonly GraphStageLogic _ownedStage; + + private readonly AtomicReference> _state = + new(ConcurrentAsyncCallbackState.Pending.Empty); + + public ConcurrentAsyncCallback(Action handler, GraphStageLogic ownedStage) + { + var handler1 = handler; + _ownedStage = ownedStage; + _wrappedHandler = obj => + { + if (obj is T e) + handler1(e); + else + throw new ArgumentException( + $"Expected {nameof(obj)} to be of type {typeof(T)}, but was {obj.GetType()}"); + }; + } + + /// + /// Called from the owning . + /// + /// + public void OnStart() + { + while (true) + { + // dispatch callbacks that have been queued from before the interpreter was started + var current = _state.GetAndSet(ConcurrentAsyncCallbackState.Pending.Empty); + switch (current) + { + case ConcurrentAsyncCallbackState.Pending pending: + if (pending.PendingEvents.Count > 0) + { + foreach (var (evt, promise) in pending.PendingEvents) + { + OnAsyncInput(evt, promise); + } + } + + break; + default: + throw new IllegalStateException($"Unexpected callback state: {current}"); + } + + // in the meantime, more callbacks might have been queued (we keep queuing them to ensure order) + if (!_state.CompareAndSet(ConcurrentAsyncCallbackState.Pending.Empty, + ConcurrentAsyncCallbackState.Initialized.Instance)) + // state guaranteed to still be pending + continue; + break; + } + } + + private void OnAsyncInput(T e, TaskCompletionSource promise) + { + _ownedStage.Interpreter.OnAsyncInput(_ownedStage, e, promise, _wrappedHandler); + } + + // External call + public void Invoke(T input) + { + InvokeWithPromise(input, NoPromise); + } + + // External call + public Task InvokeWithFeedback(T input) + { + var promise = new TaskCompletionSource(); + + if (AddToWaiting()) + { + InvokeWithPromise(input, promise); + return promise.Task; + } + + return Task.FromException(_ownedStage.StreamDetachedException); + + /* + * Add this task completion source to the owning logic, so it can be completed `AfterPostStop` + * if it was never handled otherwise. Returns `true` if the logic is still running, `false` otherwise. + */ + bool AddToWaiting() + { + while (true) + { + var previous = _ownedStage._asyncCallbacksInProgress.Value; + if (previous != null) // not stopped + { + // prepend to front of list + var updated = previous.Add(promise); + if (!_ownedStage._asyncCallbacksInProgress.CompareAndSet(previous, updated)) + continue; + return true; + } + + // logic was already stopped + return false; + } + } + } + + private void InvokeWithPromise(T evt, TaskCompletionSource promise) + { + while (true) + { + var state = _state.Value; + switch (state) + { + case ConcurrentAsyncCallbackState.Initialized: + // started - can just dispatch async message to interpreter + OnAsyncInput(evt, promise); + break; + + case ConcurrentAsyncCallbackState.Pending list: + { + // not started yet - queue the event + var e = new ConcurrentAsyncCallbackState.Event(evt, promise); + var newList = list.PendingEvents.Add(e); + if (!_state.CompareAndSet(list, new ConcurrentAsyncCallbackState.Pending(newList))) continue; + break; + } + } + + break; + } + } + } + #endregion @@ -749,7 +1023,8 @@ public override void OnDownstreamFinish(Exception cause) /// /// TBD /// TBD - public static InHandler ConditionalTerminateInput(Func predicate) => new ConditionalTerminateInput(predicate); + public static InHandler ConditionalTerminateInput(Func predicate) => + new ConditionalTerminateInput(predicate); /// /// Input handler that does not terminate the stage upon receiving completion @@ -768,9 +1043,9 @@ public override void OnDownstreamFinish(Exception cause) public static readonly OutHandler IgnoreTerminateOutput = Stage.IgnoreTerminateOutput.Instance; /// - /// TBD + /// Empty cached delegate /// - public static Action DoNothing = () => { }; + public static readonly Action DoNothing = () => { }; /// /// Output handler that terminates the state upon receiving completion if the @@ -778,12 +1053,14 @@ public override void OnDownstreamFinish(Exception cause) /// /// TBD /// TBD - public static OutHandler ConditionalTerminateOutput(Func predicate) => new ConditionalTerminateOutput(predicate); + public static OutHandler ConditionalTerminateOutput(Func predicate) => + new ConditionalTerminateOutput(predicate); /// /// TBD /// public readonly int InCount; + /// /// TBD /// @@ -793,18 +1070,21 @@ public override void OnDownstreamFinish(Exception cause) /// TBD /// internal readonly object[] Handlers; + /// /// TBD /// internal readonly Connection[] PortToConn; + /// /// TBD /// internal int StageId = int.MinValue; + private GraphInterpreter _interpreter; /// - /// TBD + /// INTERNAL API: The that is running this stage. /// /// /// This exception is thrown when the class is not initialized. @@ -814,13 +1094,18 @@ internal GraphInterpreter Interpreter get { if (_interpreter == null) - throw new IllegalStateException("Not yet initialized: only SetHandler is allowed in GraphStageLogic constructor. " + + throw new IllegalStateException( + "Not yet initialized: only SetHandler is allowed in GraphStageLogic constructor. " + "To access materializer use Source/Flow/Sink.Setup factory"); return _interpreter; } - set => _interpreter = value; + set + { + _interpreter = value; + } } + /// /// TBD /// @@ -921,7 +1206,8 @@ protected internal void SetHandler(Inlet inlet, IInHandler handler) /// /// This exception is thrown when the specified is undefined. /// - protected internal void SetHandler(Inlet inlet, Action onPush, Action onUpstreamFinish = null, Action onUpstreamFailure = null) + protected internal void SetHandler(Inlet inlet, Action onPush, Action onUpstreamFinish = null, + Action onUpstreamFailure = null) { if (onPush == null) throw new ArgumentNullException(nameof(onPush), "GraphStageLogic onPush handler must be provided"); @@ -952,7 +1238,8 @@ private void SetHandler(Outlet outlet, IOutHandler handler) /// /// TBD /// TBD - protected internal void SetHandler(Outlet outlet, IOutHandler handler) => SetHandler((Outlet)outlet, handler); + protected internal void SetHandler(Outlet outlet, IOutHandler handler) => + SetHandler((Outlet)outlet, handler); /// /// Assigns callbacks for the events for an . @@ -963,7 +1250,8 @@ private void SetHandler(Outlet outlet, IOutHandler handler) /// /// This exception is thrown when the specified is undefined. /// - protected internal void SetHandler(Outlet outlet, Action onPull, Action onDownstreamFinish = null) + protected internal void SetHandler(Outlet outlet, Action onPull, + Action onDownstreamFinish = null) { if (onPull == null) throw new ArgumentNullException(nameof(onPull), "GraphStageLogic onPull handler must be provided"); @@ -974,13 +1262,15 @@ protected internal void SetHandler(Outlet outlet, Action onPull, Action and . /// [Obsolete("Use method `SetHandlers` instead. Will be removed in v1.5")] - protected internal void SetHandler(Inlet inlet, Outlet outlet, InAndOutGraphStageLogic handler) => + protected internal void SetHandler(Inlet inlet, Outlet outlet, + InAndOutGraphStageLogic handler) => SetHandlers(inlet, outlet, handler); /// /// Assigns callbacks for the events for an and . /// - protected internal void SetHandlers(Inlet inlet, Outlet outlet, InAndOutGraphStageLogic handler) + protected internal void SetHandlers(Inlet inlet, Outlet outlet, + InAndOutGraphStageLogic handler) { SetHandler(inlet, handler); SetHandler(outlet, handler); @@ -1072,7 +1362,9 @@ protected internal void TryPull(Inlet inlet) /// TBD /// protected void Cancel(Inlet inlet, Exception cause) => Interpreter.Cancel(GetConnection(inlet), cause); - protected void Cancel(Inlet inlet) => Interpreter.Cancel(GetConnection(inlet), SubscriptionWithCancelException.NoMoreElementsNeeded.Instance); + + protected void Cancel(Inlet inlet) => Interpreter.Cancel(GetConnection(inlet), + SubscriptionWithCancelException.NoMoreElementsNeeded.Instance); /// /// Once the callback for an input port has been invoked, the element that has been pushed @@ -1092,13 +1384,14 @@ private T Grab(Inlet inlet) var connection = GetConnection(inlet); var element = connection.Slot; - if ((connection.PortState & (InReady | InFailed | InClosed)) == InReady && !ReferenceEquals(element, Empty.Instance)) + if ((connection.PortState & (InReady | InFailed | InClosed)) == InReady && + !ReferenceEquals(element, Empty.Instance)) { // fast path connection.Slot = Empty.Instance; return (T)element; } - + // Slow path for grabbing element from already failed or completed connections if (!IsAvailable(inlet)) throw new ArgumentException($"Cannot get element from already empty input port ({inlet})"); @@ -1110,9 +1403,9 @@ private T Grab(Inlet inlet) connection.Slot = new GraphInterpreter.Failed(failed.Reason, Empty.Instance); return (T)failed.PreviousElement; } - + // Completed - var elem = (T) connection.Slot; + var elem = (T)connection.Slot; connection.Slot = Empty.Instance; return elem; } @@ -1175,12 +1468,13 @@ private bool IsAvailable(Inlet inlet) _ => true // completed but element still there to grab }; } - + if ((connection.PortState & (InReady | InFailed)) == (InReady | InFailed)) { return connection.Slot switch { - GraphInterpreter.Failed failed => !ReferenceEquals(failed.PreviousElement, Empty.Instance), // failed but element still there to grab + GraphInterpreter.Failed failed => !ReferenceEquals(failed.PreviousElement, + Empty.Instance), // failed but element still there to grab _ => false }; } @@ -1292,14 +1586,14 @@ private void Complete(Outlet outlet) /// [InternalApi] private Exception _lastCancellationCause = null; - [InternalApi] + [InternalApi] public void InternalOnDownstreamFinish(Exception cause) { try { if (_lastCancellationCause != null) throw new ArgumentException("OnDownstreamFinish must not be called recursively", nameof(cause)); - + // Some stages might propagate null exceptions due to improper Task continuation handling // (see https://github.com/akkadotnet/Akka.Persistence.Sql/issues/498) // This is a stop gap solution to make sure that Akka.Streams doesn't behave improperly @@ -1315,7 +1609,7 @@ public void InternalOnDownstreamFinish(Exception cause) cause = e; } } - + _lastCancellationCause = cause; CancelStage(_lastCancellationCause); } @@ -1324,10 +1618,9 @@ public void InternalOnDownstreamFinish(Exception cause) _lastCancellationCause = null; } } - + public void CancelStage(Exception cause) { - switch (cause) { case SubscriptionWithCancelException.NonFailureCancellation _: @@ -1338,13 +1631,14 @@ public void CancelStage(Exception cause) break; } } - + /// /// Automatically invokes or on all the input or output ports that have been called, /// then marks the stage as stopped. /// - public void CompleteStage() - => InternalCompleteStage(SubscriptionWithCancelException.StageWasCompleted.Instance, Option.None); + public void CompleteStage() + => InternalCompleteStage(SubscriptionWithCancelException.StageWasCompleted.Instance, + Option.None); /// /// Automatically invokes or on all the input or output ports that have been called, @@ -1414,7 +1708,8 @@ protected bool IsClosed(Outlet outlet) /// /// This exception is thrown when the specified is currently reading. /// - protected void ReadMany(Inlet inlet, int n, Action> andThen, Action> onComplete) + protected void ReadMany(Inlet inlet, int n, Action> andThen, + Action> onComplete) { if (n < 0) throw new ArgumentException("Cannot read negative number of elements"); @@ -1576,7 +1871,7 @@ protected internal void EmitMultiple(Outlet outlet, IEnumerator enumera /// Emit an element through the given outlet and continue with the given thunk /// afterwards, suspending execution if necessary. /// This action replaces the for the given outlet if suspension - /// is needed and reinstalls the current handler upon receiving an + /// is needed and re-installs the current handler upon receiving an /// signal (before invoking the function). /// /// TBD @@ -1591,14 +1886,15 @@ protected internal void Emit(Outlet outlet, T element, Action andThen) andThen(); } else - SetOrAddEmitting(outlet, new EmittingSingle(outlet, element, GetNonEmittingHandler(outlet), andThen, this)); + SetOrAddEmitting(outlet, + new EmittingSingle(outlet, element, GetNonEmittingHandler(outlet), andThen, this)); } /// /// Emit an element through the given outlet and continue with the given thunk /// afterwards, suspending execution if necessary. /// This action replaces the for the given outlet if suspension - /// is needed and reinstalls the current handler upon receiving an . + /// is needed and re-installs the current handler upon receiving an . /// /// TBD /// TBD @@ -1639,7 +1935,8 @@ private void SetOrAddEmitting(Outlet outlet, Emitting next) /// TBD /// TBD /// TBD - protected void PassAlong(Inlet from, Outlet to, bool doFinish = true, bool doFail = true, bool doPull = false) + protected void PassAlong(Inlet from, Outlet to, bool doFinish = true, bool doFail = true, + bool doPull = false) where TIn : TOut { var passHandler = new PassAlongHandler(from, to, this, doFinish, doFail); @@ -1662,14 +1959,49 @@ protected void PassAlong(Inlet from, Outlet to, bool doFin /// is safe to be called from other threads and it will in the background thread-safely /// delegate to the passed callback function. I.e. it will be called by the external world and /// the passed handler will be invoked eventually in a thread-safe way by the execution environment. + /// + /// In case the stream is not yet materialized, the callback will buffer events until the stream is available. + /// + /// has an internal + /// that will be failed if the event cannot be processed due to stream completion. + /// + /// To be thread safe this method must only be called from either the constructor of the graph operator during + /// materialization or one of the methods invoked by the graph operator machinery, such as `OnPush` and `OnPull`. /// /// This object can be cached and reused within the same . /// - /// TBD - /// TBD - /// TBD protected Action GetAsyncCallback(Action handler) - => @event => Interpreter.OnAsyncInput(this, @event, x => handler((T)x)); + { + // backwards compat + return GetTypedAsyncCallback(handler).Invoke; + } + + /// + /// Obtain a callback object that can be used asynchronously to re-enter the + /// current with an asynchronous notification. The delegate returned + /// is safe to be called from other threads and it will in the background thread-safely + /// delegate to the passed callback function. I.e. it will be called by the external world and + /// the passed handler will be invoked eventually in a thread-safe way by the execution environment. + /// + /// In case the stream is not yet materialized, the callback will buffer events until the stream is available. + /// + /// has an internal + /// that will be failed if the event cannot be processed due to stream completion. + /// + /// To be thread safe this method must only be called from either the constructor of the graph operator during + /// materialization or one of the methods invoked by the graph operator machinery, such as `OnPush` and `OnPull`. + /// + /// This object can be cached and reused within the same . + /// + protected IAsyncCallback GetTypedAsyncCallback(Action handler) + { + var callback = new ConcurrentAsyncCallback(handler, this); + if (_interpreter != null) callback.OnStart(); + else _callbacksWaitingForInterpreter = _callbacksWaitingForInterpreter.Add(callback); + + // backwards compatibility + return callback; + } /// /// Obtain a callback object that can be used asynchronously to re-enter the @@ -1680,10 +2012,12 @@ protected Action GetAsyncCallback(Action handler) /// /// This object can be cached and reused within the same . /// - /// TBD - /// TBD protected Action GetAsyncCallback(Action handler) - => () => Interpreter.OnAsyncInput(this, NotUsed.Instance, _ => handler()); + { + // ugly + var callback = GetTypedAsyncCallback(_ => handler()); + return () => callback.Invoke(NotUsed.Instance); + } /// /// Initialize a which can be used to interact with from the outside world "as-if" an actor. @@ -1731,14 +2065,15 @@ protected StageActor GetStageActor(StageActorRef.Receive receive) [ApiMayChange] protected virtual string StageActorName => ""; - /// - /// TBD - /// - protected internal virtual void BeforePreStart() { } + protected internal virtual void BeforePreStart() + { + foreach(var cb in _callbacksWaitingForInterpreter) + { + cb.OnStart(); + } + _callbacksWaitingForInterpreter = _callbacksWaitingForInterpreter.Clear(); + } - /// - /// TBD - /// protected internal virtual void AfterPostStop() { if (_stageActor != null) @@ -1746,17 +2081,67 @@ protected internal virtual void AfterPostStop() _stageActor.Stop(); _stageActor = null; } + + // make sure any InvokeWithFeedback after this fails fast + var inProgress = _asyncCallbacksInProgress.GetAndSet(null); + if (inProgress is { Count: > 0 }) + { + var ex = StreamDetachedException; + foreach (var tcs in inProgress) + { + tcs.TrySetException(ex); + } + } + // TODO: cleanUpSubstreams + } + + private long _asyncCleanupCounter = 0; + + /// + /// Called from interpreter thread by + /// + internal void OnFeedbackDispatched() + { + _asyncCleanupCounter++; + + // 256 seemed to be a sweet spot in JVM benchmarks + // It means that at most 255 completed promises are retained per logic that + // uses invokeWithFeedback callbacks. + if(_asyncCleanupCounter % 256 == 0) + { + void Cleanup() + { + while (true) + { + var previous = _asyncCallbacksInProgress.Value; + if (previous is not null) + { + var updated = previous.Where(c => !c.Task.IsCompleted).ToImmutableList(); + if (!_asyncCallbacksInProgress.CompareAndSet(previous, updated)) continue; + } + + break; + } + } + + Cleanup(); + } } + /// /// Invoked before any external events are processed, at the startup of the stage. /// - public virtual void PreStart() { } + public virtual void PreStart() + { + } /// /// Invoked after processing of external events stopped because the stage is about to stop or fail. /// - public virtual void PostStop() { } + public virtual void PostStop() + { + } /// /// INTERNAL API @@ -1766,7 +2151,6 @@ public virtual void PostStop() { } /// the ). Care needs to be taken to cancel this Inlet /// when the stage shuts down lest the corresponding Sink be left hanging. /// - /// TBD [InternalApi] protected class SubSinkInlet { @@ -1777,11 +2161,6 @@ protected class SubSinkInlet private bool _pulled; private readonly SubSink _sink; - /// - /// TBD - /// - /// TBD - /// TBD public SubSinkInlet(GraphStageLogic logic, string name) { _name = name; @@ -1795,58 +2174,55 @@ public SubSinkInlet(GraphStageLogic logic, string name) { _elem = (T)next.Element; _pulled = false; - _handler.OnPush(); + _handler!.OnPush(); } else if (msg is OnComplete) { _closed = true; - _handler.OnUpstreamFinish(); + _handler!.OnUpstreamFinish(); } else if (msg is OnError error) { _closed = true; - _handler.OnUpstreamFailure(error.Cause); + _handler!.OnUpstreamFailure(error.Cause); } })); } - /// - /// TBD - /// public IGraph, NotUsed> Sink => _sink; /// - /// TBD + /// Sets the input handler. /// - /// TBD public void SetHandler(IInHandler handler) => _handler = handler; /// - /// TBD + /// Returns true if there is an element available to be grabbed. /// public bool IsAvailable => _elem.HasValue; /// - /// TBD + /// Returns true if this inlet is closed. /// public bool IsClosed => _closed; /// - /// TBD + /// Returns true if this inlet has been pulled and is not closed. /// public bool HasBeenPulled => _pulled && !IsClosed; /// - /// TBD + /// Grab the most recent element from this inlet. /// /// /// This exception is thrown when this inlet is empty. /// - /// TBD + /// The element public T Grab() { if (!_elem.HasValue) - throw new IllegalStateException($"cannot grab element from port {this} when data has not yet arrived"); + throw new IllegalStateException( + $"cannot grab element from port {this} when data has not yet arrived"); var ret = _elem.Value; _elem = Option.None; @@ -1854,7 +2230,7 @@ public T Grab() } /// - /// TBD + /// Pull data from upstream. /// /// /// This exception is thrown when this inlet is closed or already pulled. @@ -1871,12 +2247,12 @@ public void Pull() } /// - /// TBD + /// Cancel this graph stage using a default reason. /// public void Cancel() => Cancel(SubscriptionWithCancelException.NoMoreElementsNeeded.Instance); - + /// - /// TBD + /// Cancel this graph stage with a specific reason. /// public void Cancel(Exception cause) { @@ -1884,16 +2260,9 @@ public void Cancel(Exception cause) _sink.CancelSubstream(cause); } - /// public override string ToString() => $"SubSinkInlet{_name}"; } - /// - /// TBD - /// - /// TBD - /// TBD - /// TBD protected SubSinkInlet CreateSubSinkInlet(string name) => new(this, name); /// @@ -1907,7 +2276,6 @@ public void Cancel(Exception cause) /// Outlet in case the corresponding Source is not materialized within a /// given time limit, see e.g. ActorMaterializerSettings. /// - /// TBD [InternalApi] protected class SubSourceOutlet { @@ -1917,11 +2285,6 @@ protected class SubSourceOutlet private bool _available; private bool _closed; - /// - /// TBD - /// - /// TBD - /// TBD public SubSourceOutlet(GraphStageLogic logic, string name) { _name = name; @@ -1933,7 +2296,7 @@ public SubSourceOutlet(GraphStageLogic logic, string name) if (!_closed) { _available = true; - _handler.OnPull(); + _handler!.OnPull(); } } else if (command is SubSink.Cancel cancel) @@ -1942,7 +2305,7 @@ public SubSourceOutlet(GraphStageLogic logic, string name) { _available = false; _closed = true; - _handler.OnDownstreamFinish(SubscriptionWithCancelException.StageWasCompleted.Instance); + _handler!.OnDownstreamFinish(SubscriptionWithCancelException.StageWasCompleted.Instance); } } })); @@ -1980,13 +2343,12 @@ public void Timeout(TimeSpan d) /// Set OutHandler for this dynamic output port; this needs to be done before /// the first substream callback can arrive. /// - /// TBD public void SetHandler(IOutHandler handler) => _handler = handler; /// /// Push to this output port. /// - /// TBD + /// The element to be processed public void Push(T elem) { _available = false; @@ -2006,7 +2368,6 @@ public void Complete() /// /// Fail this output port. /// - /// TBD public void Fail(Exception ex) { _available = false; @@ -2104,7 +2465,6 @@ public virtual void OnDownstreamFinish(Exception cause) => /// public abstract class InAndOutHandler : IInHandler, IOutHandler { - /// /// Called when the input port has a new element available. The actual element can be retrieved via the method. /// @@ -2279,14 +2639,21 @@ public class StageActorRefNotInitializedException : Exception /// The singleton instance of this exception /// public static readonly StageActorRefNotInitializedException Instance = new(); - private StageActorRefNotInitializedException() : base("You must first call GetStageActorRef(StageActorRef.Receive), to initialize the actor's behavior") { } + + private StageActorRefNotInitializedException() : base( + "You must first call GetStageActorRef(StageActorRef.Receive), to initialize the actor's behavior") + { + } /// /// Initializes a new instance of the class. /// /// The that holds the serialized object data about the exception being thrown. /// The that contains contextual information about the source or destination. - protected StageActorRefNotInitializedException(SerializationInfo info, StreamingContext context) : base(info, context) { } + protected StageActorRefNotInitializedException(SerializationInfo info, StreamingContext context) : base(info, + context) + { + } } /// @@ -2299,12 +2666,16 @@ public sealed class EagerTerminateInput : InHandler /// public static readonly EagerTerminateInput Instance = new(); - private EagerTerminateInput() { } + private EagerTerminateInput() + { + } /// /// TBD /// - public override void OnPush() { } + public override void OnPush() + { + } } /// @@ -2317,16 +2688,23 @@ public sealed class IgnoreTerminateInput : InHandler /// public static readonly IgnoreTerminateInput Instance = new(); - private IgnoreTerminateInput() { } + private IgnoreTerminateInput() + { + } /// /// TBD /// - public override void OnPush() { } + public override void OnPush() + { + } + /// /// TBD /// - public override void OnUpstreamFinish() { } + public override void OnUpstreamFinish() + { + } } /// @@ -2346,7 +2724,10 @@ public class ConditionalTerminateInput : InHandler /// /// TBD /// - public override void OnPush() { } + public override void OnPush() + { + } + /// /// TBD /// @@ -2367,21 +2748,31 @@ public sealed class TotallyIgnorantInput : InHandler /// public static readonly TotallyIgnorantInput Instance = new(); - private TotallyIgnorantInput() { } + private TotallyIgnorantInput() + { + } /// /// TBD /// - public override void OnPush() { } + public override void OnPush() + { + } + /// /// TBD /// - public override void OnUpstreamFinish() { } + public override void OnUpstreamFinish() + { + } + /// /// TBD /// /// TBD - public override void OnUpstreamFailure(Exception e) { } + public override void OnUpstreamFailure(Exception e) + { + } } /// @@ -2394,11 +2785,16 @@ public sealed class EagerTerminateOutput : OutHandler /// public static readonly EagerTerminateOutput Instance = new(); - private EagerTerminateOutput() { } + private EagerTerminateOutput() + { + } + /// /// TBD /// - public override void OnPull() { } + public override void OnPull() + { + } } /// @@ -2411,16 +2807,23 @@ public sealed class IgnoreTerminateOutput : OutHandler /// public static readonly IgnoreTerminateOutput Instance = new(); - private IgnoreTerminateOutput() { } + private IgnoreTerminateOutput() + { + } /// /// TBD /// - public override void OnPull() { } + public override void OnPull() + { + } + /// /// TBD /// - public override void OnDownstreamFinish(Exception cause) { } + public override void OnDownstreamFinish(Exception cause) + { + } } /// @@ -2440,7 +2843,10 @@ public class ConditionalTerminateOutput : OutHandler /// /// TBD /// - public override void OnPull() { } + public override void OnPull() + { + } + /// /// TBD /// @@ -2479,7 +2885,9 @@ public StageActor( { case LocalActorRef r: _cell = r.Cell; break; case RepointableActorRef r: _cell = (ActorCell)r.Underlying; break; - default: throw new IllegalStateException($"Stream supervisor must be a local actor, was [{materializer.Supervisor.GetType()}]"); + default: + throw new IllegalStateException( + $"Stream supervisor must be a local actor, was [{materializer.Supervisor.GetType()}]"); } _functionRef = _cell.AddFunctionRef((sender, message) => @@ -2488,8 +2896,9 @@ public StageActor( { case PoisonPill _: case Kill _: - materializer.Logger.Warning("{0} message sent to StageActor({1}) will be ignored, since it is not a real Actor. " + - "Use a custom message type to communicate with it instead.", message, _functionRef.Path); + materializer.Logger.Warning( + "{0} message sent to StageActor({1}) will be ignored, since it is not a real Actor. " + + "Use a custom message type to communicate with it instead.", message, _functionRef.Path); break; default: _callback((sender, message)); break; } @@ -2559,7 +2968,9 @@ internal void InternalReceive((IActorRef, object) pack) /// TBD internal class GraphStageLogicWithCallbackWrapper : GraphStageLogic { - private interface ICallbackState { } + private interface ICallbackState + { + } private sealed class NotInitialized : ICallbackState { @@ -2640,4 +3051,4 @@ private void Locked(Action body) } } } -} +} \ No newline at end of file