diff --git a/src/core/Akka.Streams.Tests/Actor/ActorPublisherSpec.cs b/src/core/Akka.Streams.Tests/Actor/ActorPublisherSpec.cs index e4b8c61b73e..89faa930035 100644 --- a/src/core/Akka.Streams.Tests/Actor/ActorPublisherSpec.cs +++ b/src/core/Akka.Streams.Tests/Actor/ActorPublisherSpec.cs @@ -434,6 +434,24 @@ public void ActorPublisher_should_use_dispatcher_from_props() actorRef.Tell(ThreadName.Instance); ExpectMsg().Should().Contain("my-dispatcher1"); } + + [Fact] + public void ActorPublisher_should_handle_stash() + { + var probe = this.CreateTestProbe(); + var actorRef = Sys.ActorOf(TestPublisherWithStash.Props(probe.Ref)); + var p = new ActorPublisherImpl(actorRef); + var s = this.CreateProbe(); + p.Subscribe(s); + s.Request(2); + s.Request(3); + actorRef.Tell("unstash"); + probe.ExpectMsg(new TotalDemand(5)); + probe.ExpectMsg(new TotalDemand(5)); + s.Request(4); + probe.ExpectMsg(new TotalDemand(9)); + s.Cancel(); + } } internal class TestPublisher : ActorPublisher @@ -466,6 +484,34 @@ protected override bool Receive(object message) } } + internal class TestPublisherWithStash : TestPublisher, IWithUnboundedStash + { + public TestPublisherWithStash(IActorRef probe) : base(probe) + { + } + + public new static Props Props(IActorRef probe, bool useTestDispatcher = true) + { + var p = Akka.Actor.Props.Create(() => new TestPublisherWithStash(probe)); + return useTestDispatcher ? p.WithDispatcher("akka.test.stream-dispatcher") : p; + } + + protected override bool Receive(object message) + { + if ("unstash".Equals(message)) + { + Stash.UnstashAll(); + Context.Become(base.Receive); + } + else + Stash.Stash(); + + return true; + } + + public IStash Stash { get; set; } + } + internal class Sender : ActorPublisher { public static Props Props { get; } = Props.Create().WithDispatcher("akka.test.stream-dispatcher"); @@ -571,6 +617,17 @@ public TotalDemand(long elements) { Elements = elements; } + + public override bool Equals(object obj) + { + if (ReferenceEquals(null, obj)) return false; + if (ReferenceEquals(this, obj)) return true; + return obj.GetType() == GetType() && Equals((TotalDemand) obj); + } + + protected bool Equals(TotalDemand other) => Elements == other.Elements; + + public override int GetHashCode() => Elements.GetHashCode(); } internal class Produce diff --git a/src/core/Akka.Streams.Tests/Dsl/FlowGroupedWithinSpec.cs b/src/core/Akka.Streams.Tests/Dsl/FlowGroupedWithinSpec.cs index c3a413b7e4c..ba33105c83e 100644 --- a/src/core/Akka.Streams.Tests/Dsl/FlowGroupedWithinSpec.cs +++ b/src/core/Akka.Streams.Tests/Dsl/FlowGroupedWithinSpec.cs @@ -7,6 +7,7 @@ using System; using System.Collections.Generic; +using System.Collections.Immutable; using System.Linq; using Akka.Streams.Dsl; using Akka.Streams.TestKit; @@ -240,5 +241,16 @@ public void A_GroupedWithin_must_group_with_rest() RandomTestRange(Sys) .ForEach(_ => RunScript(script(), Settings, flow => flow.GroupedWithin(3, TimeSpan.FromMinutes(10)))); } + + [Fact] + public void A_GroupedWithin_must_group_with_small_groups_with_backpressure() + { + var t = Source.From(Enumerable.Range(1, 10)) + .GroupedWithin(1, TimeSpan.FromDays(1)) + .Throttle(1, TimeSpan.FromMilliseconds(110), 0, ThrottleMode.Shaping) + .RunWith(Sink.Seq>(), Materializer); + t.Wait(TimeSpan.FromSeconds(3)).Should().BeTrue(); + t.Result.ShouldAllBeEquivalentTo(Enumerable.Range(1, 10).Select(i => new List {i})); + } } } diff --git a/src/core/Akka.Streams.Tests/Dsl/FlowSelectAsyncSpec.cs b/src/core/Akka.Streams.Tests/Dsl/FlowSelectAsyncSpec.cs index 763108435d7..30976c59c81 100644 --- a/src/core/Akka.Streams.Tests/Dsl/FlowSelectAsyncSpec.cs +++ b/src/core/Akka.Streams.Tests/Dsl/FlowSelectAsyncSpec.cs @@ -222,7 +222,7 @@ public void A_Flow_with_SelectAsync_must_finish_after_task_failure() .Grouped(10) .RunWith(Sink.First>(), Materializer); - t.Wait(TimeSpan.FromSeconds(1)).Should().BeTrue(); + t.Wait(TimeSpan.FromSeconds(3)).Should().BeTrue(); t.Result.ShouldAllBeEquivalentTo(new[] {1, 2}); }, Materializer); } diff --git a/src/core/Akka.Streams.Tests/Dsl/FlowSumSpec.cs b/src/core/Akka.Streams.Tests/Dsl/FlowSumSpec.cs index 43e66bcd854..992ae7cfb27 100644 --- a/src/core/Akka.Streams.Tests/Dsl/FlowSumSpec.cs +++ b/src/core/Akka.Streams.Tests/Dsl/FlowSumSpec.cs @@ -18,6 +18,7 @@ namespace Akka.Streams.Tests.Dsl { + //JVMN : FlowReduceSpec public class FlowSumSpec : AkkaSpec { private ActorMaterializer Materializer { get; } @@ -128,5 +129,47 @@ public void A_Sum_must_complete_task_with_failure_when_reducing_function_throws( task.Invoking(t => t.Wait(TimeSpan.FromSeconds(3))).ShouldThrow().WithMessage("test"); }, Materializer); } + + [Fact] + public void A_Sum_must_fail_on_Empty_stream_using_Source_RunSum() + { + this.AssertAllStagesStopped(() => + { + var result = Source.Empty().RunSum((i, i1) => i + i1, Materializer); + result.Invoking(t => t.Wait(TimeSpan.FromSeconds(3))) + .ShouldThrow() + .And.Message.Should() + .Contain("empty stream"); + }, Materializer); + } + + [Fact] + public void A_Sum_must_fail_on_Empty_stream_using_Flow_Sum() + { + this.AssertAllStagesStopped(() => + { + var result = Source.Empty() + .Via(SumFlow) + .RunWith(Sink.Aggregate(0, (i, i1) => i + i1), Materializer); + result.Invoking(t => t.Wait(TimeSpan.FromSeconds(3))) + .ShouldThrow() + .And.Message.Should() + .Contain("empty stream"); + }, Materializer); + } + + [Fact] + public void A_Sum_must_fail_on_Empty_stream_using_Sink_Sum() + { + this.AssertAllStagesStopped(() => + { + var result = Source.Empty() + .RunWith(SumSink, Materializer); + result.Invoking(t => t.Wait(TimeSpan.FromSeconds(3))) + .ShouldThrow() + .And.Message.Should() + .Contain("empty stream"); + }, Materializer); + } } } diff --git a/src/core/Akka.Streams.Tests/Dsl/FlowThrottleSpec.cs b/src/core/Akka.Streams.Tests/Dsl/FlowThrottleSpec.cs index 0cae3904e37..988d6f59f77 100644 --- a/src/core/Akka.Streams.Tests/Dsl/FlowThrottleSpec.cs +++ b/src/core/Akka.Streams.Tests/Dsl/FlowThrottleSpec.cs @@ -87,6 +87,28 @@ public void Throttle_for_single_cost_elements_must_accept_very_low_rates() }, Materializer); } + [Fact] + public void Throttle_for_single_cost_elements_must_() + { + var sharedThrottle = Flow.Create().Throttle(1, TimeSpan.FromDays(1), 1, ThrottleMode.Enforcing); + + // If there is accidental shared state then we would not be able to pass through the single element + var t = Source.Single(1) + .Via(sharedThrottle) + .Via(sharedThrottle) + .RunWith(Sink.First(), Materializer); + t.Wait(TimeSpan.FromSeconds(3)).Should().BeTrue(); + t.Result.Should().Be(1); + + // It works with a new stream, too + t = Source.Single(2) + .Via(sharedThrottle) + .Via(sharedThrottle) + .RunWith(Sink.First(), Materializer); + t.Wait(TimeSpan.FromSeconds(3)).Should().BeTrue(); + t.Result.Should().Be(2); + } + [Fact] public void Throttle_for_single_cost_elements_must_emit_single_element_per_tick() { diff --git a/src/core/Akka.Streams.Tests/Dsl/GraphMatValueSpec.cs b/src/core/Akka.Streams.Tests/Dsl/GraphMatValueSpec.cs index 5cdfcee338e..c41d5fdf0ca 100644 --- a/src/core/Akka.Streams.Tests/Dsl/GraphMatValueSpec.cs +++ b/src/core/Akka.Streams.Tests/Dsl/GraphMatValueSpec.cs @@ -244,6 +244,32 @@ public void A_Graph_with_materialized_value_must_produce_NotUsed_when_starting_f })), Keep.Right).To(Sink.Ignore()).Run(materializer).Should().Be(NotUsed.Instance); done.Should().BeTrue(); } + + [Fact] + public void A_Graph_with_Identity_Flow_optimization_even_if_port_are_wired_in_an_arbitrary_higher_nesting_level() + { + var mat2 = Sys.Materializer(ActorMaterializerSettings.Create(Sys).WithAutoFusing(false)); + + var subFlow = GraphDsl.Create(b => + { + var zip = b.Add(new Zip()); + var bc = b.Add(new Broadcast(2)); + + b.From(bc.Out(0)).To(zip.In0); + b.From(bc.Out(1)).To(zip.In1); + + return new FlowShape>(bc.In, zip.Out); + }).Named("NestedFlow"); + + var nest1 = Flow.Create().Via(subFlow); + var nest2 = Flow.Create().Via(nest1); + var nest3 = Flow.Create().Via(nest2); + var nest4 = Flow.Create().Via(nest3); + + //fails + var matValue = Source.Single("").Via(nest4).To(Sink.Ignore>()).Run(mat2); + matValue.Should().Be(NotUsed.Instance); + } } } diff --git a/src/core/Akka.Streams.Tests/IO/TcpSpec.cs b/src/core/Akka.Streams.Tests/IO/TcpSpec.cs index 74f86b42d0c..a6a63387ffd 100644 --- a/src/core/Akka.Streams.Tests/IO/TcpSpec.cs +++ b/src/core/Akka.Streams.Tests/IO/TcpSpec.cs @@ -464,6 +464,36 @@ public void Outgoing_TCP_stream_must_handle_when_connection_actor_terminates_une system2.Terminate().Wait(); } + [Fact(Skip = "Fix me")] + public void Outgoing_TCP_stream_must_not_thrown_on_unbind_after_system_has_been_shut_down() + { + var sys2 = ActorSystem.Create("shutdown-test-system"); + var mat2 = sys2.Materializer(); + + try + { + var address = TestUtils.TemporaryServerAddress(); + var bindingTask = sys2.TcpStream() + .BindAndHandle(Flow.Create(), mat2, address.Address.ToString(), address.Port); + + // Ensure server is running + var t = Source.Single(ByteString.FromString("")) + .Via(sys2.TcpStream().OutgoingConnection(address)) + .RunWith(Sink.Ignore(), mat2); + t.Wait(TimeSpan.FromSeconds(3)).Should().BeTrue(); + + sys2.Terminate().Wait(TimeSpan.FromSeconds(3)).Should().BeTrue(); + + bindingTask.Wait(TimeSpan.FromSeconds(3)).Should().BeTrue(); + var binding = bindingTask.Result; + binding.Unbind().Wait(TimeSpan.FromSeconds(3)).Should().BeTrue(); + } + finally + { + sys2.Terminate().Wait(TimeSpan.FromSeconds(5)); + } + } + private void ValidateServerClientCommunication(ByteString testData, ServerConnection serverConnection, TcpReadProbe readProbe, TcpWriteProbe writeProbe) { serverConnection.Write(testData); diff --git a/src/core/Akka.Streams/Actors/ActorPublisher.cs b/src/core/Akka.Streams/Actors/ActorPublisher.cs index b58a800fc7b..b8b98d2baaf 100644 --- a/src/core/Akka.Streams/Actors/ActorPublisher.cs +++ b/src/core/Akka.Streams/Actors/ActorPublisher.cs @@ -51,10 +51,21 @@ public interface IActorPublisherMessage: IDeadLetterSuppression { } public sealed class Request : IActorPublisherMessage { public readonly long Count; + public Request(long count) { Count = count; } + + /// + /// INTERNAL API: needed for stash support + /// + internal void MarkProcessed() => IsProcessed = true; + + /// + /// INTERNAL API: needed for stash support + /// + internal bool IsProcessed { get; private set; } } /// @@ -386,16 +397,26 @@ protected override bool AroundReceive(Receive receive, object message) if (message is Request) { var req = (Request) message; - if (req.Count < 1) + if (req.IsProcessed) { - if (_lifecycleState == LifecycleState.Active) - OnError(new ArgumentException("Number of requested elements must be positive. Rule 3.9")); + // it's an unstashed Request, demand is already handled + base.AroundReceive(receive, req); } else { - _demand += req.Count; - if (_demand < 0) _demand = long.MaxValue; // long overflow: effectively unbounded - base.AroundReceive(receive, message); + if (req.Count < 1) + { + if (_lifecycleState == LifecycleState.Active) + OnError(new ArgumentException("Number of requested elements must be positive. Rule 3.9")); + } + else + { + _demand += req.Count; + if (_demand < 0) + _demand = long.MaxValue; // long overflow: effectively unbounded + req.MarkProcessed(); + base.AroundReceive(receive, message); + } } } else if (message is Subscribe) @@ -435,8 +456,12 @@ protected override bool AroundReceive(Receive receive, object message) } else if (message is Cancel) { - CancelSelf(); - base.AroundReceive(receive, message); + if (_lifecycleState != LifecycleState.Canceled) + { + // possible to receive again in case of stash + CancelSelf(); + base.AroundReceive(receive, message); + } } else if (message is SubscriptionTimeoutExceeded) { diff --git a/src/core/Akka.Streams/Dsl/FlowOperations.cs b/src/core/Akka.Streams/Dsl/FlowOperations.cs index 9d60f5f6b35..449cc319606 100644 --- a/src/core/Akka.Streams/Dsl/FlowOperations.cs +++ b/src/core/Akka.Streams/Dsl/FlowOperations.cs @@ -451,6 +451,10 @@ public static Flow Aggregate(this Flo /// Similar to but uses first element as zero element. /// Applies the given function towards its current and next value, /// yielding the next current value. + /// + /// If the stream is empty (i.e. completes before signalling any elements), + /// the sum stage will fail its downstream with a , + /// which is semantically in-line with that standard library collections do in such situations. /// /// Emits when upstream completes /// diff --git a/src/core/Akka.Streams/Dsl/Sink.cs b/src/core/Akka.Streams/Dsl/Sink.cs index 1dfc89973b4..a4bad45c62a 100644 --- a/src/core/Akka.Streams/Dsl/Sink.cs +++ b/src/core/Akka.Streams/Dsl/Sink.cs @@ -271,7 +271,11 @@ public static Sink> Aggregate(TOut zero, Func will be completed with value of the final /// function evaluation when the input stream ends, or completed with `Failure` - /// if there is a failure signaled in the stream. + /// if there is a failure signaled in the stream. + /// + /// If the stream is empty (i.e. completes before signalling any elements), + /// the sum stage will fail its downstream with a , + /// which is semantically in-line with that standard library collections do in such situations. /// public static Sink> Sum(Func reduce) => Flow.Create() .Sum(reduce) diff --git a/src/core/Akka.Streams/Dsl/SourceOperations.cs b/src/core/Akka.Streams/Dsl/SourceOperations.cs index f38a8cab49e..5eecadcebfc 100644 --- a/src/core/Akka.Streams/Dsl/SourceOperations.cs +++ b/src/core/Akka.Streams/Dsl/SourceOperations.cs @@ -451,6 +451,10 @@ public static Source Aggregate(this Source but uses first element as zero element. /// Applies the given function towards its current and next value, /// yielding the next current value. + /// + /// If the stream is empty (i.e. completes before signalling any elements), + /// the sum stage will fail its downstream with a , + /// which is semantically in-line with that standard library collections do in such situations. /// /// Emits when upstream completes /// diff --git a/src/core/Akka.Streams/Dsl/SubFlowOperations.cs b/src/core/Akka.Streams/Dsl/SubFlowOperations.cs index e3dda1098b4..e147aea260c 100644 --- a/src/core/Akka.Streams/Dsl/SubFlowOperations.cs +++ b/src/core/Akka.Streams/Dsl/SubFlowOperations.cs @@ -451,6 +451,10 @@ public static SubFlow Aggregate but uses first element as zero element. /// Applies the given function towards its current and next value, /// yielding the next current value. + /// + /// If the stream is empty (i.e. completes before signalling any elements), + /// the sum stage will fail its downstream with a , + /// which is semantically in-line with that standard library collections do in such situations. /// /// Emits when upstream completes /// diff --git a/src/core/Akka.Streams/Implementation/Fusing/Ops.cs b/src/core/Akka.Streams/Implementation/Fusing/Ops.cs index 0f1646717be..3976aa7d456 100644 --- a/src/core/Akka.Streams/Implementation/Fusing/Ops.cs +++ b/src/core/Akka.Streams/Implementation/Fusing/Ops.cs @@ -1197,7 +1197,23 @@ private sealed class Logic : GraphStageLogic { private class Holder { - public Result Elem { get; set; } + private readonly Action> _callback; + + public Holder(Result element, Action> callback) + { + _callback = callback; + Element = element; + } + + public Result Element { get; private set; } + + public void Invoke(Result result) + { + Element = result.IsSuccess && result.Value == null + ? Result.Failure(ReactiveStreamsCompliance.ElementMustNotBeNullException) + : result; + _callback(this); + } } private static readonly Result NotYetThere = Result.Failure(new Exception()); @@ -1205,7 +1221,7 @@ private class Holder private readonly SelectAsync _stage; private readonly Decider _decider; private IBuffer> _buffer; - private readonly Action, Result>> _taskCallback; + private readonly Action> _taskCallback; public Logic(Attributes inheritedAttributes, SelectAsync stage) : base(stage.Shape) { @@ -1213,23 +1229,20 @@ public Logic(Attributes inheritedAttributes, SelectAsync stage) : bas var attr = inheritedAttributes.GetAttribute(null); _decider = attr != null ? attr.Decider : Deciders.StoppingDecider; - _taskCallback = GetAsyncCallback, Result>>(t => + _taskCallback = GetAsyncCallback>(t => { - var holder = t.Item1; - var result = t.Item2; - if (!result.IsSuccess) - FailOrPull(holder, result); - else + var element = t.Element; + if (!element.IsSuccess) { - if (result.Value == null) - FailOrPull(holder, Result.Failure(ReactiveStreamsCompliance.ElementMustNotBeNullException)); - else + if (_decider(element.Exception) == Directive.Stop) { - holder.Elem = result; - if (IsAvailable(_stage.Out)) - PushOne(); + FailStage(element.Exception); + return; } } + + if (IsAvailable(stage.Out)) + PushOne(); }); SetHandler(_stage.In, onPush: () => @@ -1237,9 +1250,9 @@ public Logic(Attributes inheritedAttributes, SelectAsync stage) : bas try { var task = _stage._mapFunc(Grab(_stage.In)); - var holder = new Holder() {Elem = NotYetThere}; + var holder = new Holder(NotYetThere, _taskCallback); _buffer.Enqueue(holder); - task.ContinueWith(t => _taskCallback(Tuple.Create(holder, Result.FromTask(t))), TaskContinuationOptions.ExecuteSynchronously); + task.ContinueWith(continuationAction: t => holder.Invoke(Result.FromTask(t)), continuationOptions: TaskContinuationOptions.ExecuteSynchronously); } catch (Exception e) { @@ -1260,18 +1273,6 @@ public Logic(Attributes inheritedAttributes, SelectAsync stage) : bas public override void PreStart() => _buffer = Buffer.Create>(_stage._parallelism, Materializer); - private void FailOrPull(Holder holder, Result failure) - { - if (_decider(failure.Exception) == Directive.Stop) - FailStage(failure.Exception); - else - { - holder.Elem = failure; - if (IsAvailable(_stage.Out)) - PushOne(); - } - } - private void PushOne() { var inlet = _stage.In; @@ -1279,17 +1280,19 @@ private void PushOne() { if (_buffer.IsEmpty) { - if (IsClosed(inlet)) CompleteStage(); - else if (!HasBeenPulled(inlet)) Pull(inlet); + if (IsClosed(inlet)) + CompleteStage(); + else if (!HasBeenPulled(inlet)) + Pull(inlet); } - else if (_buffer.Peek().Elem == NotYetThere) + else if (_buffer.Peek().Element == NotYetThere) { if (Todo < _stage._parallelism && !HasBeenPulled(inlet)) TryPull(inlet); } else { - var result = _buffer.Dequeue().Elem; + var result = _buffer.Dequeue().Element; if (!result.IsSuccess) continue; @@ -1301,6 +1304,8 @@ private void PushOne() break; } } + + public override string ToString() => $"SelectAsync.Logic(buffer={_buffer})"; } #endregion @@ -1308,8 +1313,8 @@ private void PushOne() private readonly int _parallelism; private readonly Func> _mapFunc; - public readonly Inlet In = new Inlet("in"); - public readonly Outlet Out = new Outlet("out"); + public readonly Inlet In = new Inlet("SelectAsync.in"); + public readonly Outlet Out = new Outlet("SelectAsync.out"); public SelectAsync(int parallelism, Func> mapFunc) { @@ -1345,25 +1350,33 @@ public Logic(Attributes inheritedAttributes, SelectAsyncUnordered sta _stage = stage; var attr = inheritedAttributes.GetAttribute(null); _decider = attr != null ? attr.Decider : Deciders.StoppingDecider; + _taskCallback = GetAsyncCallback>(result => { _inFlight--; - - if (!result.IsSuccess) - FailOrPull(result.Exception); - else + if (result.IsSuccess && result.Value != null) { - if (result.Value == null) - FailOrPull(ReactiveStreamsCompliance.ElementMustNotBeNullException); - else if (IsAvailable(_stage.Out)) + if (IsAvailable(stage.Out)) { - if (!HasBeenPulled(_stage.In)) - TryPull(_stage.In); - Push(_stage.Out, result.Value); + if (!HasBeenPulled(stage.In)) + TryPull(stage.In); + Push(stage.Out, result.Value); } else _buffer.Enqueue(result.Value); } + else + { + var ex = !result.IsSuccess + ? result.Exception + : ReactiveStreamsCompliance.ElementMustNotBeNullException; + if (_decider(ex) == Directive.Stop) + FailStage(ex); + else if (IsClosed(stage.In) && Todo == 0) + CompleteStage(); + else if (!HasBeenPulled(stage.In)) + TryPull(stage.In); + } }); SetHandler(_stage.In, onPush: () => @@ -1404,24 +1417,15 @@ public Logic(Attributes inheritedAttributes, SelectAsyncUnordered sta public override void PreStart() => _buffer = Buffer.Create(_stage._parallelism, Materializer); - private void FailOrPull(Exception failure) - { - var inlet = _stage.In; - if (_decider(failure) == Directive.Stop) - FailStage(failure); - else if (IsClosed(inlet) && Todo == 0) - CompleteStage(); - else if (!HasBeenPulled(inlet)) - TryPull(inlet); - } + public override string ToString() => $"SelectAsyncUnordered.Logic(InFlight={_inFlight}, buffer= {_buffer}"; } #endregion private readonly int _parallelism; private readonly Func> _mapFunc; - public readonly Inlet In = new Inlet("in"); - public readonly Outlet Out = new Outlet("out"); + public readonly Inlet In = new Inlet("SelectAsyncUnordered.in"); + public readonly Outlet Out = new Outlet("SelectAsyncUnordered.out"); public SelectAsyncUnordered(int parallelism, Func> mapFunc) { @@ -1548,6 +1552,7 @@ private sealed class Logic : TimerGraphStageLogic // AND // - timer fired OR group is full private bool _groupClosed; + private bool _groupEmitted; private bool _finished; private int _elements; @@ -1563,17 +1568,17 @@ public Logic(GroupedWithin stage) : base(stage.Shape) }, onUpstreamFinish: () => { _finished = true; - if (!_groupClosed && _elements > 0) - CloseGroup(); - else + if (_groupEmitted) CompleteStage(); - }, onUpstreamFailure: FailStage); + else + CloseGroup(); + }); SetHandler(_stage._out, onPull: () => { if (_groupClosed) EmitGroup(); - }, onDownstreamFinish: CompleteStage); + }); } public override void PreStart() @@ -1584,6 +1589,7 @@ public override void PreStart() private void NextElement(T element) { + _groupEmitted = false; _buffer.Add(element); _elements++; if (_elements == _stage._count) @@ -1604,6 +1610,7 @@ private void CloseGroup() private void EmitGroup() { + _groupEmitted = true; Push(_stage._out, _buffer); _buffer = new List(); if (!_finished) @@ -1890,27 +1897,32 @@ private sealed class Logic : GraphStageLogic public Logic(Sum stage) : base(stage.Shape) { - var rest = new LambdaInHandler(onPush: () => - { - _aggregator = stage._reduce(_aggregator, Grab(stage.Inlet)); - Pull(stage.Inlet); - }, onUpstreamFinish: () => - { - Push(stage.Outlet, _aggregator); - CompleteStage(); - }); + var rest = new LambdaInHandler( + onPush: () => + { + _aggregator = stage._reduce(_aggregator, Grab(stage.Inlet)); + Pull(stage.Inlet); + }, + onUpstreamFinish: () => + { + Push(stage.Outlet, _aggregator); + CompleteStage(); + }); - SetHandler(stage.Inlet, onPush: () => - { - _aggregator = Grab(stage.Inlet); - Pull(stage.Inlet); - SetHandler(stage.Inlet, rest); - }); + // Initial input handler + SetHandler(stage.Inlet, + onPush: () => + { + _aggregator = Grab(stage.Inlet); + Pull(stage.Inlet); + SetHandler(stage.Inlet, rest); + }, + onUpstreamFinish: () => FailStage(new NoSuchElementException("sum over empty stream"))); SetHandler(stage.Outlet, onPull: () => Pull(stage.Inlet)); } - public override string ToString() => $"Reduce.Logic(aggregator={_aggregator}"; + public override string ToString() => $"Sum.Logic(aggregator={_aggregator}"; } #endregion diff --git a/src/core/Akka.Streams/Implementation/IO/TcpStages.cs b/src/core/Akka.Streams/Implementation/IO/TcpStages.cs index 2fa4f8cec07..36dab5af4e7 100644 --- a/src/core/Akka.Streams/Implementation/IO/TcpStages.cs +++ b/src/core/Akka.Streams/Implementation/IO/TcpStages.cs @@ -107,10 +107,11 @@ private void Receive(Tuple args) if (IsAvailable(_stage._out)) _listener.Tell(new Tcp.ResumeAccepting(1), StageActorRef); - var target = StageActorRef; + var thisStage = StageActorRef; _bindingPromise.TrySetResult(new StreamTcp.ServerBinding(bound.LocalAddress, () => { - target.Tell(Tcp.Unbind.Instance, StageActorRef); + // Beware, sender must be explicit since stageActor.ref will be invalid to access after the stage stopped + thisStage.Tell(Tcp.Unbind.Instance, thisStage); return _unbindPromise.Task; })); }) diff --git a/src/core/Akka.Streams/Implementation/StreamLayout.cs b/src/core/Akka.Streams/Implementation/StreamLayout.cs index 57e1a5fbf17..3a30e61442e 100644 --- a/src/core/Akka.Streams/Implementation/StreamLayout.cs +++ b/src/core/Akka.Streams/Implementation/StreamLayout.cs @@ -1641,8 +1641,9 @@ private object ResolveMaterialized(StreamLayout.IMaterializedValueNode node, IDi protected void AssignPort(InPort inPort, object subscriberOrVirtual) { Subscribers[inPort] = subscriberOrVirtual; + // Interface (unconnected) ports of the current scope will be wired when exiting the scope - if (!CurrentLayout.InPorts.Contains(inPort)) + if (CurrentLayout.Upstreams.ContainsKey(inPort)) { IUntypedPublisher publisher; if (Publishers.TryGetValue(CurrentLayout.Upstreams[inPort], out publisher)) @@ -1655,7 +1656,7 @@ protected void AssignPort(OutPort outPort, IUntypedPublisher publisher) { Publishers[outPort] = publisher; // Interface (unconnected) ports of the current scope will be wired when exiting the scope - if (!CurrentLayout.OutPorts.Contains(outPort)) + if (CurrentLayout.Downstreams.ContainsKey(outPort)) { object subscriber; if (Subscribers.TryGetValue(CurrentLayout.Downstreams[outPort], out subscriber))