diff --git a/src/core/Akka.API.Tests/CoreAPISpec.ApproveStreams.approved.txt b/src/core/Akka.API.Tests/CoreAPISpec.ApproveStreams.approved.txt index a1bfdf964d4..e8680bd6602 100644 --- a/src/core/Akka.API.Tests/CoreAPISpec.ApproveStreams.approved.txt +++ b/src/core/Akka.API.Tests/CoreAPISpec.ApproveStreams.approved.txt @@ -1220,8 +1220,6 @@ namespace Akka.Streams.Dsl where TShape2 : Akka.Streams.Shape where TShape3 : Akka.Streams.Shape where TShape4 : Akka.Streams.Shape { } - public static Akka.Streams.IGraph CreateMaterialized(System.Func, TShape> buildBlock) - where TShape : Akka.Streams.Shape { } public sealed class Builder { public Akka.Streams.Outlet MaterializedValue { get; } @@ -1638,6 +1636,7 @@ namespace Akka.Streams.Dsl public static Akka.Streams.Dsl.SubFlow Prepend(this Akka.Streams.Dsl.SubFlow flow, Akka.Streams.IGraph, TMat> that) where TOut1 : TOut2 { } public static Akka.Streams.Dsl.SubFlow, TMat, TClosed> Recover(this Akka.Streams.Dsl.SubFlow flow, System.Func> partialFunc) { } + [System.ObsoleteAttribute("Use RecoverWithRetries instead.")] public static Akka.Streams.Dsl.SubFlow RecoverWith(this Akka.Streams.Dsl.SubFlow flow, System.Func, TMat>> partialFunc) { } public static Akka.Streams.Dsl.SubFlow RecoverWithRetries(this Akka.Streams.Dsl.SubFlow flow, System.Func, TMat>> partialFunc, int attempts) { } public static Akka.Streams.Dsl.SubFlow Scan(this Akka.Streams.Dsl.SubFlow flow, TOut2 zero, System.Func scan) { } diff --git a/src/core/Akka.Streams.Tests/Dsl/FlowDelaySpec.cs b/src/core/Akka.Streams.Tests/Dsl/FlowDelaySpec.cs index 3aa1ccff47f..33505a4386f 100644 --- a/src/core/Akka.Streams.Tests/Dsl/FlowDelaySpec.cs +++ b/src/core/Akka.Streams.Tests/Dsl/FlowDelaySpec.cs @@ -8,9 +8,11 @@ using System; using System.Collections.Generic; using System.Linq; +using Akka.Actor; using Akka.Streams.Dsl; using Akka.Streams.TestKit; using Akka.Streams.TestKit.Tests; +using Akka.Streams.Tests.Actor; using Akka.TestKit; using Akka.Util.Internal; using FluentAssertions; @@ -155,6 +157,7 @@ public void A_Delay_must_pass_elements_with_delay_through_normally_in_backpressu { Source.From(Enumerable.Range(1, 3)) .Delay(TimeSpan.FromMilliseconds(300), DelayOverflowStrategy.Backpressure) + .WithAttributes(Attributes.CreateInputBuffer(1,1)) .RunWith(this.SinkProbe(), Materializer) .Request(5) .ExpectNoMsg(TimeSpan.FromMilliseconds(200)) @@ -208,5 +211,36 @@ public void A_Delay_must_emit_early_when_buffer_is_full_and_in_EmitEarly_mode() pSub.SendError(new Exception()); }, Materializer); } + + [Fact] + public void A_Delay_must_properly_delay_according_to_buffer_size() + { + // With a buffer size of 1, delays add up + var task = Source.From(Enumerable.Range(1, 5)) + .Delay(TimeSpan.FromMilliseconds(500), DelayOverflowStrategy.Backpressure) + .WithAttributes(Attributes.CreateInputBuffer(1, 1)) + .RunWith(Sink.Ignore(), Materializer); + + task.Wait(TimeSpan.FromSeconds(2)).ShouldBeFalse(); + task.Wait(TimeSpan.FromSeconds(1)).ShouldBeTrue(); + + // With a buffer large enough to hold all arriving elements, delays don't add up + task = Source.From(Enumerable.Range(1, 100)) + .Delay(TimeSpan.FromSeconds(1), DelayOverflowStrategy.Backpressure) + .WithAttributes(Attributes.CreateInputBuffer(100, 100)) + .RunWith(Sink.Ignore(), Materializer); + + task.Wait(TimeSpan.FromSeconds(2)).ShouldBeTrue(); + + // Delays that are already present are preserved when buffer is large enough + task = Source.Tick(TimeSpan.FromMilliseconds(100), TimeSpan.FromMilliseconds(100), NotUsed.Instance) + .Take(10) + .Delay(TimeSpan.FromSeconds(1), DelayOverflowStrategy.Backpressure) + .WithAttributes(Attributes.CreateInputBuffer(10, 10)) + .RunWith(Sink.Ignore(), Materializer); + + task.Wait(TimeSpan.FromMilliseconds(900)).ShouldBeFalse(); + task.Wait(TimeSpan.FromSeconds(1)).ShouldBeTrue(); + } } } diff --git a/src/core/Akka.Streams.Tests/Dsl/FlowGroupBySpec.cs b/src/core/Akka.Streams.Tests/Dsl/FlowGroupBySpec.cs index 7643acbec80..8a7aa74babe 100644 --- a/src/core/Akka.Streams.Tests/Dsl/FlowGroupBySpec.cs +++ b/src/core/Akka.Streams.Tests/Dsl/FlowGroupBySpec.cs @@ -9,12 +9,16 @@ using System.Collections.Generic; using System.Collections.Immutable; using System.Linq; +using System.Threading.Tasks; +using Akka.IO; using Akka.Streams.Dsl; using Akka.Streams.Dsl.Internal; +using Akka.Streams.Implementation; using Akka.Streams.Supervision; using Akka.Streams.TestKit; using Akka.Streams.TestKit.Tests; using Akka.TestKit; +using Akka.Util; using FluentAssertions; using Reactive.Streams; using Xunit; @@ -29,7 +33,7 @@ public class FlowGroupBySpec : AkkaSpec public FlowGroupBySpec(ITestOutputHelper helper) : base(helper) { - var settings = ActorMaterializerSettings.Create(Sys).WithInputBuffer(2, 2); + var settings = ActorMaterializerSettings.Create(Sys).WithInputBuffer(2, 2); Materializer = ActorMaterializer.Create(Sys, settings); } @@ -66,8 +70,8 @@ private void WithSubstreamsSupport(int groupCount = 2, int elementCount = 6, int var max = maxSubstream > 0 ? maxSubstream : groupCount; var groupStream = Source.FromPublisher(source) - .GroupBy(max, x => x%groupCount) - .Lift(x => x%groupCount) + .GroupBy(max, x => x % groupCount) + .Lift(x => x % groupCount) .RunWith(Sink.AsPublisher>>(false), Materializer); var masterSubscriber = this.CreateManualSubscriberProbe>>(); @@ -77,12 +81,19 @@ private void WithSubstreamsSupport(int groupCount = 2, int elementCount = 6, int run?.Invoke(masterSubscriber, masterSubscription, expectedKey => { masterSubscription.Request(1); - var tuple = masterSubscriber.ExpectNext(); + var tuple = masterSubscriber.ExpectNext(); tuple.Item1.Should().Be(expectedKey); return tuple.Item2; }); } + private ByteString RandomByteString(int size) + { + var a = new byte[size]; + ThreadLocalRandom.Current.NextBytes(a); + return ByteString.Create(a); + } + [Fact] public void GroupBy_must_work_in_the_happy_case() { @@ -127,17 +138,31 @@ public void GroupBy_must_work_in_the_happy_case() [Fact] public void GroupBy_must_work_in_normal_user_scenario() { - var source = Source.From(new[] {"Aaa", "Abb", "Bcc", "Cdd", "Cee"}) + var source = Source.From(new[] { "Aaa", "Abb", "Bcc", "Cdd", "Cee" }) .GroupBy(3, s => s.Substring(0, 1)) .Grouped(10) .MergeSubstreams() .Grouped(10); var task = - ((Source>, NotUsed>) source).RunWith( + ((Source>, NotUsed>)source).RunWith( Sink.First>>(), Materializer); task.Wait(TimeSpan.FromSeconds(3)).Should().BeTrue(); task.Result.OrderBy(e => e.First()) - .ShouldBeEquivalentTo(new[] {new[] {"Aaa", "Abb"}, new[] {"Bcc"}, new[] {"Cdd", "Cee"}}); + .ShouldBeEquivalentTo(new[] { new[] { "Aaa", "Abb" }, new[] { "Bcc" }, new[] { "Cdd", "Cee" } }); + } + + [Fact] + public void GroupBy_must_fail_when_key_function_returns_null() + { + var source = (Source, NotUsed>)Source.From(new[] { "Aaa", "Abb", "Bcc", "Cdd", "Cee" }) + .GroupBy(3, s => s.StartsWith("A") ? null : s.Substring(0, 1)) + .Grouped(10) + .MergeSubstreams(); + var down = source.RunWith(this.SinkProbe>(), Materializer); + down.Request(1); + var ex = down.ExpectError(); + ex.Message.Should().Contain("Key cannot be null"); + ex.Should().BeOfType(); } [Fact] @@ -149,7 +174,7 @@ public void GroupBy_must_support_cancelling_substreams() { new StreamPuppet(getSubFlow(1).RunWith(Sink.AsPublisher(false), Materializer), this).Cancel(); var substream = new StreamPuppet(getSubFlow(0).RunWith(Sink.AsPublisher(false), Materializer), this); - + substream.Request(2); substream.ExpectNext(2); substream.ExpectNext(4); @@ -173,8 +198,8 @@ public void GroupBy_must_accept_cancellation_of_master_stream_when_not_consume_a var publisherProbe = this.CreateManualPublisherProbe(); var publisher = Source.FromPublisher(publisherProbe) - .GroupBy(2, x => x%2) - .Lift(x => x%2) + .GroupBy(2, x => x % 2) + .Lift(x => x % 2) .RunWith(Sink.AsPublisher>>(false), Materializer); var subscriber = this.CreateManualSubscriberProbe>>(); publisher.Subscribe(subscriber); @@ -193,8 +218,8 @@ public void GroupBy_must_work_with_empty_input_stream() { var publisher = Source.From(new List()) - .GroupBy(2, x => x%2) - .Lift(x => x%2) + .GroupBy(2, x => x % 2) + .Lift(x => x % 2) .RunWith(Sink.AsPublisher>>(false), Materializer); var subscriber = this.CreateManualSubscriberProbe>>(); publisher.Subscribe(subscriber); @@ -270,9 +295,9 @@ public void GroupBy_must_fail_stream_when_GroupBy_function_throws() { if (i == 2) throw ex; - return i%2; + return i % 2; }) - .Lift(x => x%2) + .Lift(x => x % 2) .RunWith(Sink.AsPublisher>>(false), Materializer); @@ -309,9 +334,9 @@ public void GroupBy_must_resume_stream_when_GroupBy_function_throws() { if (i == 2) throw ex; - return i%2; + return i % 2; }) - .Lift(x => x%2) + .Lift(x => x % 2) .WithAttributes(ActorAttributes.CreateSupervisionStrategy(Deciders.ResumingDecider)) .RunWith(Sink.AsPublisher>>(false), Materializer); @@ -362,10 +387,10 @@ public void GroupBy_must_pass_along_early_cancellation() var flowSubscriber = Source.AsSubscriber() - .GroupBy(2, x => x%2) - .Lift(x => x%2) + .GroupBy(2, x => x % 2) + .Lift(x => x % 2) .To(Sink.FromSubscriber(down)).Run(Materializer); - + var downstream = down.ExpectSubscription(); downstream.Cancel(); up.Subscribe(flowSubscriber); @@ -379,8 +404,8 @@ public void GroupBy_must_fail_when_exceeding_maxSubstreams() { this.AssertAllStagesStopped(() => { - var f = Flow.Create().GroupBy(1, x => x%2).PrefixAndTail(0).MergeSubstreams(); - var t = ((Flow, Source>, NotUsed>) f) + var f = Flow.Create().GroupBy(1, x => x % 2).PrefixAndTail(0).MergeSubstreams(); + var t = ((Flow, Source>, NotUsed>)f) .RunWith(this.SourceProbe(), this.SinkProbe, Source>>(), Materializer); var up = t.Item1; var down = t.Item2; @@ -400,5 +425,231 @@ public void GroupBy_must_fail_when_exceeding_maxSubstreams() s1.ExpectError(ex); }, Materializer); } + + [Fact] + public void GroupBy_must_emit_subscribe_before_completed() + { + this.AssertAllStagesStopped(() => + { + var source = (Source, NotUsed>)Source.Single(0).GroupBy(1, _ => "all") + .PrefixAndTail(0) + .Select(t => t.Item2) + .ConcatSubstream(); + var futureGroupSource = source.RunWith(Sink.First>(), Materializer); + + var publisher = futureGroupSource.AwaitResult().RunWith(Sink.AsPublisher(false), Materializer); + var probe = this.CreateSubscriberProbe(); + publisher.Subscribe(probe); + var subscription = probe.ExpectSubscription(); + subscription.Request(1); + probe.ExpectNext(0); + probe.ExpectComplete(); + }, Materializer); + } + + [Fact] + public void GroupBy_must_work_under_fuzzing_stress_test() + { + this.AssertAllStagesStopped(() => + { + var publisherProbe = this.CreateManualPublisherProbe(); + var subscriber = this.CreateManualSubscriberProbe>(); + + var firstGroup = (Source, NotUsed>)Source.FromPublisher(publisherProbe) + .GroupBy(256, element => element.Head) + .Select(b => b.Reverse()) + .MergeSubstreams(); + var secondGroup = (Source, NotUsed>)firstGroup.GroupBy(256, bytes => bytes.First()) + .Select(b => b.Reverse()) + .MergeSubstreams(); + var publisher = secondGroup.RunWith(Sink.AsPublisher>(false), Materializer); + publisher.Subscribe(subscriber); + + var upstreamSubscription = publisherProbe.ExpectSubscription(); + var downstreamSubscription = subscriber.ExpectSubscription(); + + downstreamSubscription.Request(300); + for (var i = 1; i <= 300; i++) + { + var byteString = RandomByteString(10); + upstreamSubscription.ExpectRequest(); + upstreamSubscription.SendNext(byteString); + subscriber.ExpectNext().ShouldBeEquivalentTo(byteString); + } + + upstreamSubscription.SendComplete(); + }, Materializer); + } + + [Fact] + public void GroupBy_must_work_with_random_demand() + { + this.AssertAllStagesStopped(() => + { + var settings = ActorMaterializerSettings.Create(Sys).WithInputBuffer(1, 1); + var materializer = Sys.Materializer(settings); + + var props = new RandomDemandProperties + { + Kit = this + }; + Enumerable.Range(0, 100) + .ToList() + .ForEach(_ => props.Probes.Add(new TaskCompletionSource>())); + + var map = new Dictionary(); + + var publisherProbe = this.CreateManualPublisherProbe(); + var probeShape = new SinkShape(new Inlet("ProbeSink.in")); + var probeSink = new ProbeSink(probeShape, props, Attributes.None); + Source.FromPublisher(publisherProbe) + .GroupBy(100, element => Math.Abs(element.Head % 100)) + .To(new Sink>(probeSink)) + .Run(materializer); + + var upstreamSubscription = publisherProbe.ExpectSubscription(); + + for (var i = 1; i <= 400; i++) + { + var byteString = RandomByteString(10); + var index = Math.Abs(byteString.Head % 100); + + upstreamSubscription.ExpectRequest(); + upstreamSubscription.SendNext(byteString); + + if (!map.ContainsKey(index)) + { + var probe = props.Probes[props.PropesReaderTop].Task.AwaitResult(); + props.PropesReaderTop++; + map[index] = new SubFlowState(probe, false, byteString); + //stream automatically requests next element + } + else + { + var state = map[index]; + if (state.FirstElement != null) //first element in subFlow + { + if (!state.HasDemand) + props.BlockingNextElement = byteString; + RandomDemand(map, props); + } + else if (state.HasDemand) + { + if (props.BlockingNextElement == null) + { + state.Probe.ExpectNext().ShouldBeEquivalentTo(byteString); + map[index] = new SubFlowState(state.Probe, false, null); + RandomDemand(map, props); + } + else + true.ShouldBeFalse("INVALID CASE"); + } + else + { + props.BlockingNextElement = byteString; + RandomDemand(map, props); + } + } + } + upstreamSubscription.SendComplete(); + }, Materializer); + } + + private sealed class SubFlowState + { + public SubFlowState(TestSubscriber.Probe probe, bool hasDemand, ByteString firstElement) + { + Probe = probe; + HasDemand = hasDemand; + FirstElement = firstElement; + } + + public TestSubscriber.Probe Probe { get; } + + public bool HasDemand { get; } + + public ByteString FirstElement { get; } + } + + private sealed class ProbeSink : SinkModule> + { + private readonly RandomDemandProperties _properties; + + public ProbeSink(SinkShape shape, RandomDemandProperties properties, Attributes attributes) : base(shape) + { + _properties = properties; + Attributes = attributes; + } + + public override Attributes Attributes { get; } + + public override IModule WithAttributes(Attributes attributes) + { + return new ProbeSink(AmendShape(attributes), _properties, attributes); + } + + protected override SinkModule> NewInstance(SinkShape shape) + { + return new ProbeSink(shape, _properties, Attributes); + } + + public override object Create(MaterializationContext context, out TestSubscriber.Probe materializer) + { + var promise = _properties.Probes[_properties.ProbesWriterTop]; + var probe = TestSubscriber.CreateSubscriberProbe(_properties.Kit); + promise.SetResult(probe); + _properties.ProbesWriterTop++; + materializer = probe; + return probe; + } + } + + private sealed class RandomDemandProperties + { + public TestKitBase Kit { get; set; } + + public int ProbesWriterTop { get; set; } + + public int PropesReaderTop { get; set; } + + public List>> Probes { get; } = + new List>>(100); + + public ByteString BlockingNextElement { get; set; } + } + + private void RandomDemand(Dictionary map, RandomDemandProperties props) + { + while (true) + { + + var nextIndex = ThreadLocalRandom.Current.Next(0, map.Count); + var key = map.Keys.ToArray()[nextIndex]; + if (!map[key].HasDemand) + { + var state = map[key]; + map[key] = new SubFlowState(state.Probe, true, state.FirstElement); + + state.Probe.Request(1); + + // need to verify elements that are first element in subFlow or is in nextElement buffer before + // pushing next element from upstream + if (state.FirstElement != null) + { + state.Probe.ExpectNext().ShouldBeEquivalentTo(state.FirstElement); + map[key] = new SubFlowState(state.Probe, false, null); + } + else if (props.BlockingNextElement != null && Math.Abs(props.BlockingNextElement.Head % 100) == key) + { + state.Probe.ExpectNext().ShouldBeEquivalentTo(props.BlockingNextElement); + props.BlockingNextElement = null; + map[key] = new SubFlowState(state.Probe, false, null); + } + else if (props.BlockingNextElement == null) + break; + } + } + + } } } diff --git a/src/core/Akka.Streams.Tests/Dsl/FlowRecoverWithSpec.cs b/src/core/Akka.Streams.Tests/Dsl/FlowRecoverWithSpec.cs index e37664eb9d9..d90fe716424 100644 --- a/src/core/Akka.Streams.Tests/Dsl/FlowRecoverWithSpec.cs +++ b/src/core/Akka.Streams.Tests/Dsl/FlowRecoverWithSpec.cs @@ -251,15 +251,32 @@ public void A_RecoverWith_must_terminate_with_exception_after_set_number_of_retr throw new IndexOutOfRangeException(); return x; }) - .RecoverWithRetries(_ => Source.From(new[] {11, 22}).Concat(Source.Failed(Ex)), 3) + .RecoverWithRetries(_ => Source.From(new[] {11, 22, 33}) + .Select(x => + { + if (x == 33) + throw Ex; + return x; + }), 3) .RunWith(this.SinkProbe(), Materializer); - probe.Request(2).ExpectNext(1,2); + probe.Request(2).ExpectNext(1, 2); probe.Request(2).ExpectNext(11, 22); probe.Request(2).ExpectNext(11, 22); probe.Request(2).ExpectNext(11, 22); probe.Request(1).ExpectError().Should().Be(Ex); }, Materializer); } + + [Fact] + public void A_RecoverWith_must_throw_ArgumentException_if_number_of_retries_is_less_than_minus_one() + { + this.AssertAllStagesStopped(() => + { + Flow.Create() + .Invoking(f => f.RecoverWithRetries(exception => Source.Empty(), -2)) + .ShouldThrow(); + }, Materializer); + } } } diff --git a/src/core/Akka.Streams.Tests/Dsl/FlowSelectAsyncSpec.cs b/src/core/Akka.Streams.Tests/Dsl/FlowSelectAsyncSpec.cs index 1bc9d430673..2e3897da69e 100644 --- a/src/core/Akka.Streams.Tests/Dsl/FlowSelectAsyncSpec.cs +++ b/src/core/Akka.Streams.Tests/Dsl/FlowSelectAsyncSpec.cs @@ -300,18 +300,13 @@ public void A_Flow_with_SelectAsync_must_handle_cancel_properly() [Fact] public void A_Flow_with_SelectAsync_must_not_run_more_futures_than_configured() { - // TODO: 9/14/2016: Mono 4.4.2 blows up the XUnit test runner upon calling Thread.Interrupt below (@Aaronontheweb) - // SEE: https://github.com/xunit/xunit/issues/979 - if (IsMono) - return; - this.AssertAllStagesStopped(() => { const int parallelism = 8; var counter = new AtomicCounter(); var queue = new BlockingQueue, long>>(); - - var timer = new Thread(() => + var cancellation = new CancellationTokenSource(); + Task.Run(() => { var delay = 500; // 50000 nanoseconds var count = 0; @@ -320,7 +315,7 @@ public void A_Flow_with_SelectAsync_must_not_run_more_futures_than_configured() { try { - var t = queue.Take(CancellationToken.None); + var t = queue.Take(cancellation.Token); var promise = t.Item1; var enqueued = t.Item2; var wakeup = enqueued + delay; @@ -334,9 +329,7 @@ public void A_Flow_with_SelectAsync_must_not_run_more_futures_than_configured() cont = false; } } - }); - - timer.Start(); + }, cancellation.Token); Func> deferred = () => { @@ -356,12 +349,11 @@ public void A_Flow_with_SelectAsync_must_not_run_more_futures_than_configured() .SelectAsync(parallelism, _ => deferred()) .RunAggregate(0, (c, _) => c + 1, Materializer); - task.Wait(TimeSpan.FromSeconds(3)).Should().BeTrue(); - task.Result.Should().Be(n); + task.AwaitResult().Should().Be(n); } finally { - timer.Interrupt(); + cancellation.Cancel(false); } }, Materializer); } diff --git a/src/core/Akka.Streams.Tests/Dsl/FlowSelectAsyncUnorderedSpec.cs b/src/core/Akka.Streams.Tests/Dsl/FlowSelectAsyncUnorderedSpec.cs index 580a21cfaf9..e13a89c7800 100644 --- a/src/core/Akka.Streams.Tests/Dsl/FlowSelectAsyncUnorderedSpec.cs +++ b/src/core/Akka.Streams.Tests/Dsl/FlowSelectAsyncUnorderedSpec.cs @@ -294,8 +294,9 @@ public void A_Flow_with_SelectAsyncUnordered_must_not_run_more_futures_than_conf const int parallelism = 8; var counter = new AtomicCounter(); var queue = new BlockingQueue, long>>(); + var cancellation = new CancellationTokenSource(); - var timer = new Thread(() => + Task.Run(() => { var delay = 500; // 50000 nanoseconds var count = 0; @@ -304,7 +305,7 @@ public void A_Flow_with_SelectAsyncUnordered_must_not_run_more_futures_than_conf { try { - var t = queue.Take(CancellationToken.None); + var t = queue.Take(cancellation.Token); var promise = t.Item1; var enqueued = t.Item2; var wakeup = enqueued + delay; @@ -318,10 +319,8 @@ public void A_Flow_with_SelectAsyncUnordered_must_not_run_more_futures_than_conf cont = false; } } - }); - - timer.Start(); - + }, cancellation.Token); + Func> deferred = () => { var promise = new TaskCompletionSource(); @@ -344,7 +343,7 @@ public void A_Flow_with_SelectAsyncUnordered_must_not_run_more_futures_than_conf } finally { - timer.Interrupt(); + cancellation.Cancel(false); } }, Materializer); } diff --git a/src/core/Akka.Streams.Tests/Dsl/FlowThrottleSpec.cs b/src/core/Akka.Streams.Tests/Dsl/FlowThrottleSpec.cs index ad7fcefc307..093b4954a25 100644 --- a/src/core/Akka.Streams.Tests/Dsl/FlowThrottleSpec.cs +++ b/src/core/Akka.Streams.Tests/Dsl/FlowThrottleSpec.cs @@ -183,7 +183,7 @@ public void Throttle_for_single_cost_elements_must_send_elements_downstream_as_s .Throttle(2, TimeSpan.FromMilliseconds(750), 0, ThrottleMode.Shaping) .RunWith(this.SinkProbe(), Materializer); probe.Request(5); - var result = probe.ReceiveWhile(TimeSpan.FromMilliseconds(950), filter: x => x); + var result = probe.ReceiveWhile(TimeSpan.FromMilliseconds(900), filter: x => x); probe.ExpectNoMsg(TimeSpan.FromMilliseconds(150)) .ExpectNext(3) .ExpectNoMsg(TimeSpan.FromMilliseconds(150)) diff --git a/src/core/Akka.Streams.Tests/Dsl/FramingSpec.cs b/src/core/Akka.Streams.Tests/Dsl/FramingSpec.cs index 83637227009..f881fa942e3 100644 --- a/src/core/Akka.Streams.Tests/Dsl/FramingSpec.cs +++ b/src/core/Akka.Streams.Tests/Dsl/FramingSpec.cs @@ -12,6 +12,7 @@ using Akka.IO; using Akka.Streams.Dsl; using Akka.Streams.Stage; +using Akka.Streams.TestKit.Tests; using Akka.TestKit; using Akka.Util; using FluentAssertions; @@ -122,6 +123,13 @@ public void Delimiter_bytes_based_framing_must_respect_maximum_line_settings() .Limit(100) .RunWith(Sink.Seq(), Materializer); task2.Invoking(t => t.Wait(TimeSpan.FromSeconds(3))).ShouldThrow(); + + var task3 = + Source.Single(ByteString.FromString("aaa")) + .Via(SimpleLines("\n", 2)) + .Limit(100) + .RunWith(Sink.Seq(), Materializer); + task3.Invoking(t => t.Wait(TimeSpan.FromSeconds(3))).ShouldThrow(); } [Fact] @@ -157,8 +165,7 @@ public void Delimiter_bytes_based_framing_must_allow_truncated_frames_if_configu .Grouped(1000) .RunWith(Sink.First>(), Materializer); - task.Wait(TimeSpan.FromSeconds(3)).Should().BeTrue(); - task.Result.Should().ContainSingle(s => s.Equals("I have no end")); + task.AwaitResult().Should().ContainSingle(s => s.Equals("I have no end")); } private static string RandomString(int length) diff --git a/src/core/Akka.Streams.Tests/Dsl/GraphMatValueSpec.cs b/src/core/Akka.Streams.Tests/Dsl/GraphMatValueSpec.cs index 31cce03b93f..eb7190c54eb 100644 --- a/src/core/Akka.Streams.Tests/Dsl/GraphMatValueSpec.cs +++ b/src/core/Akka.Streams.Tests/Dsl/GraphMatValueSpec.cs @@ -14,6 +14,7 @@ using FluentAssertions; using Xunit; using Xunit.Abstractions; +using Akka.Actor; // ReSharper disable InvokeAsExtensionMethod namespace Akka.Streams.Tests.Dsl @@ -245,6 +246,60 @@ public void A_Graph_with_materialized_value_must_produce_NotUsed_when_starting_f done.Should().BeTrue(); } + [Theory] + [InlineData(true)] + [InlineData(false)] + public void A_Graph_with_materialized_value_must_ignore_materialized_values_for_a_graph_with_no_materialized_values_exposed(bool autoFusing) + { + // The bug here was that the default behavior for "compose" in Module is Keep.left, but + // EmptyModule.compose broke this by always returning the other module intact, which means + // that the materialized value was that of the other module (which is inconsistent with Keep.left behavior). + + var sink = Sink.Ignore(); + + var g = RunnableGraph.FromGraph(GraphDsl.Create(b => + { + var s = b.Add(sink); + var source = b.Add(Source.From(Enumerable.Range(1, 3))); + var flow = b.Add(Flow.Create()); + + b.From(source).Via(flow).To(s); + return ClosedShape.Instance; + })); + + var result = g.Run(CreateMaterializer(autoFusing)); + } + + [Theory] + [InlineData(true)] + [InlineData(false)] + public void A_Graph_with_materialized_value_must_ignore_materialized_values_for_a_graph_with_no_materialized_values_exposed_but_keep_side_effects(bool autoFusing) + { + // The bug here was that the default behavior for "compose" in Module is Keep.left, but + // EmptyModule.compose broke this by always returning the other module intact, which means + // that the materialized value was that of the other module (which is inconsistent with Keep.left behavior). + + var sink = Sink.Ignore().MapMaterializedValue(_=> + { + TestActor.Tell("side effect!"); + return _; + }); + + var g = RunnableGraph.FromGraph(GraphDsl.Create(b => + { + var s = b.Add(sink); + var source = b.Add(Source.From(Enumerable.Range(1, 3))); + var flow = b.Add(Flow.Create()); + + b.From(source).Via(flow).To(s); + return ClosedShape.Instance; + })); + + var result = g.Run(CreateMaterializer(autoFusing)); + + ExpectMsg("side effect!"); + } + [Fact] public void A_Graph_with_Identity_Flow_optimization_even_if_port_are_wired_in_an_arbitrary_higher_nesting_level() { diff --git a/src/core/Akka.Streams.Tests/IO/FileSourceSpec.cs b/src/core/Akka.Streams.Tests/IO/FileSourceSpec.cs index 3d8014fc2f1..d77f17a6362 100644 --- a/src/core/Akka.Streams.Tests/IO/FileSourceSpec.cs +++ b/src/core/Akka.Streams.Tests/IO/FileSourceSpec.cs @@ -151,16 +151,22 @@ public void FileSource_should_complete_only_when_all_contents_of_a_file_have_bee } [Fact] - public void FileSource_should_onError_when_trying_to_read_from_file_which_does_not_exist() + public void FileSource_should_onError_with_failure_and_return_a_failed_IOResult_when_trying_to_read_from_file_which_does_not_exist() { this.AssertAllStagesStopped(() => { - var p = FileIO.FromFile(NotExistingFile()).RunWith(Sink.AsPublisher(false), _materializer); + var t = FileIO.FromFile(NotExistingFile()) + .ToMaterialized(Sink.AsPublisher(false), Keep.Both) + .Run(_materializer); + var r = t.Item1; + var p = t.Item2; + var c = this.CreateManualSubscriberProbe(); p.Subscribe(c); c.ExpectSubscription(); c.ExpectError(); + r.AwaitResult(Dilated(TimeSpan.FromSeconds(3))).WasSuccessful.ShouldBeFalse(); }, _materializer); } diff --git a/src/core/Akka.Streams.Tests/IO/OutputStreamSourceSpec.cs b/src/core/Akka.Streams.Tests/IO/OutputStreamSourceSpec.cs index bccb758707d..04f330c09ad 100644 --- a/src/core/Akka.Streams.Tests/IO/OutputStreamSourceSpec.cs +++ b/src/core/Akka.Streams.Tests/IO/OutputStreamSourceSpec.cs @@ -28,7 +28,7 @@ namespace Akka.Streams.Tests.IO { public class OutputStreamSourceSpec : AkkaSpec { - private static readonly TimeSpan Timeout = TimeSpan.FromMilliseconds(300); + private static readonly TimeSpan Timeout = TimeSpan.FromSeconds(3); private readonly ActorMaterializer _materializer; private readonly byte[] _bytesArray; diff --git a/src/core/Akka.Streams.Tests/IO/TcpSpec.cs b/src/core/Akka.Streams.Tests/IO/TcpSpec.cs index 0d9b068206a..ba0f138c531 100644 --- a/src/core/Akka.Streams.Tests/IO/TcpSpec.cs +++ b/src/core/Akka.Streams.Tests/IO/TcpSpec.cs @@ -477,6 +477,8 @@ public void Outgoing_TCP_stream_must_not_thrown_on_unbind_after_system_has_been_ .BindAndHandle(Flow.Create(), mat2, address.Address.ToString(), address.Port); // Ensure server is running + bindingTask.Wait(TimeSpan.FromSeconds(3)).Should().BeTrue(); + // and is possible to communicate with var t = Source.Single(ByteString.FromString("")) .Via(sys2.TcpStream().OutgoingConnection(address)) .RunWith(Sink.Ignore(), mat2); @@ -484,7 +486,6 @@ public void Outgoing_TCP_stream_must_not_thrown_on_unbind_after_system_has_been_ sys2.Terminate().Wait(TimeSpan.FromSeconds(3)).Should().BeTrue(); - bindingTask.Wait(TimeSpan.FromSeconds(3)).Should().BeTrue(); var binding = bindingTask.Result; binding.Unbind().Wait(TimeSpan.FromSeconds(3)).Should().BeTrue(); } diff --git a/src/core/Akka.Streams.Tests/Implementation/Fusing/InterpreterSupervisionSpec.cs b/src/core/Akka.Streams.Tests/Implementation/Fusing/InterpreterSupervisionSpec.cs index 77b8d301874..0621b0a807f 100644 --- a/src/core/Akka.Streams.Tests/Implementation/Fusing/InterpreterSupervisionSpec.cs +++ b/src/core/Akka.Streams.Tests/Implementation/Fusing/InterpreterSupervisionSpec.cs @@ -165,100 +165,6 @@ public void Interpreter_error_handling_should_complete_after_resume_when_Map_thr }); } - [Fact] - public void Interpreter_error_handling_should_restart_when_OnPush_throws() - { - var stage = new RestartTestStage(onPush: (_, element, context) => - { - if (element <= 0) throw TE(); - return null; - }); - WithOneBoundedSetup(new IStage[] { - new Select(x => x + 1, resumingDecider), - stage, - new Select(x => x + 100, resumingDecider) - }, - (lastEvents, upstream, downstream) => - { - downstream.RequestOne(); - lastEvents().Should().BeEquivalentTo(new RequestOne()); - upstream.OnNext(2); - lastEvents().Should().BeEquivalentTo(new OnNext(103)); - - downstream.RequestOne(); - lastEvents().Should().BeEquivalentTo(new RequestOne()); - upstream.OnNext(-1); // boom - lastEvents().Should().BeEquivalentTo(new RequestOne()); - - upstream.OnNext(3); - lastEvents().Should().BeEquivalentTo(new OnNext(104)); - }); - } - - [Fact] - public void Interpreter_error_handling_should_restart_when_OnPush_throws_after_context_Push() - { - var stage = new RestartTestStage(onPush: (_, element, context) => - { - var result = context.Push(element); - if (element <= 0) throw TE(); - return result; - }); - WithOneBoundedSetup(new IStage[] { - new Select(x => x + 1, resumingDecider), - stage, - new Select(x => x + 100, resumingDecider) - }, - (lastEvents, upstream, downstream) => - { - downstream.RequestOne(); - lastEvents().Should().BeEquivalentTo(new RequestOne()); - upstream.OnNext(2); - lastEvents().Should().BeEquivalentTo(new OnNext(103)); - - downstream.RequestOne(); - lastEvents().Should().BeEquivalentTo(new RequestOne()); - upstream.OnNext(-1); // boom - // The element has been pushed before the exception, there is no way back - lastEvents().Should().BeEquivalentTo(new OnNext(100)); - - downstream.RequestOne(); - lastEvents().Should().BeEquivalentTo(new RequestOne()); - - upstream.OnNext(3); - lastEvents().Should().BeEquivalentTo(new OnNext(104)); - }); - } - - [Fact] - public void Interpreter_error_handling_should_fail_when_OnPull_throws() - { - var stage = new RestartTestStage(onPull: (stg, context) => - { - if (stg.Sum < 0) throw TE(); - return null; - }); - WithOneBoundedSetup(new IStage[] { - new Select(x => x + 1, resumingDecider), - stage, - new Select(x => x + 100, resumingDecider) - }, - (lastEvents, upstream, downstream) => - { - downstream.RequestOne(); - lastEvents().Should().BeEquivalentTo(new RequestOne()); - upstream.OnNext(2); - lastEvents().Should().BeEquivalentTo(new OnNext(103)); - - downstream.RequestOne(); - lastEvents().Should().BeEquivalentTo(new RequestOne()); - upstream.OnNext(-5); // this will trigger failure of next requestOne (pull) - lastEvents().Should().BeEquivalentTo(new OnNext(99)); - - downstream.RequestOne(); // boom - lastEvents().Should().BeEquivalentTo(new OnError(TE()), new Cancel()); - }); - } [Fact] public void Interpreter_error_handling_should_fail_when_Expand_seed_throws() @@ -310,142 +216,15 @@ public void Interpreter_error_handling_should_fail_when_Expand_extrapolate_throw }); } - [Fact] - public void Interpreter_error_handling_should_fail_when_OnPull_throws_before_pushing_all_generated_elements() - { - Action test = (decider, absorbTermination) => - { - WithOneBoundedSetup(new OneToManyTestStage(decider, absorbTermination), - (lastEvents, upstream, downstream) => - { - downstream.RequestOne(); - lastEvents().Should().BeEquivalentTo(new RequestOne()); - - upstream.OnNext(1); - lastEvents().Should().BeEquivalentTo(new OnNext(1)); - - if (absorbTermination) - { - upstream.OnComplete(); - lastEvents().Should().BeEmpty(); - } - - downstream.RequestOne(); - lastEvents().Should().BeEquivalentTo(new OnNext(2)); - - downstream.RequestOne(); - // 3 => boom - if (absorbTermination) - lastEvents().Should().BeEquivalentTo(new OnError(TE())); - else - lastEvents().Should().BeEquivalentTo(new OnError(TE()), new Cancel()); - }); - }; - - test(resumingDecider, false); - test(restartingDecider, false); - test(resumingDecider, true); - test(restartingDecider, true); - - } - private Exception TE() { return new TestException("TEST"); } - public class RestartTestStage : PushPullStage - { - public int Sum; - private readonly Func, ISyncDirective> _onPush; - private readonly Func, ISyncDirective> _onPull; - - public RestartTestStage(Func, ISyncDirective> onPush = null, Func, ISyncDirective> onPull = null) - { - _onPush = onPush; - _onPull = onPull; - } - - public override ISyncDirective OnPush(int element, IContext context) - { - var result = _onPush?.Invoke(this, element, context); - if (result != null) - return result; - Sum += element; - return context.Push(Sum); - } - - public override ISyncDirective OnPull(IContext context) - { - var result = _onPull?.Invoke(this, context); - if (result != null) - return result; - return context.Pull(); - } - - public override Directive Decide(Exception cause) - { - return Directive.Restart; - } - - public override IStage Restart() - { - Sum = 0; - return this; - } - } - private IEnumerator ContinuallyThrow() { Func thrower = () => { throw TE(); }; yield return thrower(); } - - public class OneToManyTestStage : PushPullStage - { - private readonly Decider _decider; - private readonly bool _absorbTermination; - private Queue _buffer; - - public OneToManyTestStage(Decider decider, bool absorbTermination) - { - _decider = decider; - _absorbTermination = absorbTermination; - _buffer = new Queue(); - } - - public override ISyncDirective OnPush(int element, IContext context) - { - _buffer = new Queue(new [] {element + 1, element + 2, element + 3}); - return context.Push(element); - } - - public override ISyncDirective OnPull(IContext context) - { - if (_buffer.Count == 0 && context.IsFinishing) - return context.Finish(); - if (_buffer.Count == 0) - return context.Pull(); - var element = _buffer.Dequeue(); - if (element == 3) throw new TestException("TEST"); - return context.Push(element); - } - - public override ITerminationDirective OnUpstreamFinish(IContext context) - { - return _absorbTermination ? context.AbsorbTermination() : context.Finish(); - } - - // note that resume will be turned into failure in the Interpreter if exception is thrown from OnPull - public override Directive Decide(Exception cause) - { - return _decider(cause); - } - - public override IStage Restart() - { - return new OneToManyTestStage(_decider, _absorbTermination); - } - } } } \ No newline at end of file diff --git a/src/core/Akka.Streams.Tests/Implementation/StreamLayoutSpec.cs b/src/core/Akka.Streams.Tests/Implementation/StreamLayoutSpec.cs index ce0c74e5019..01b5d77cbe1 100644 --- a/src/core/Akka.Streams.Tests/Implementation/StreamLayoutSpec.cs +++ b/src/core/Akka.Streams.Tests/Implementation/StreamLayoutSpec.cs @@ -11,6 +11,7 @@ using System.Linq; using Akka.Streams.Dsl; using Akka.Streams.Implementation; +using Akka.Streams.TestKit.Tests; using FluentAssertions; using Reactive.Streams; using Xunit; @@ -137,7 +138,8 @@ protected override object MaterializeAtomic(AtomicModule atomic, Attributes effe #endregion private const int TooDeepForStack = 5000; - + // Seen tests run in 9-10 seconds, these test cases are heavy on the GC + private static readonly TimeSpan VeryPatient = TimeSpan.FromSeconds(20); private readonly IMaterializer _materializer; private static TestAtomicModule TestStage() => new TestAtomicModule(1, 1); @@ -256,7 +258,7 @@ public void StreamLayout_should_not_fail_materialization_when_building_a_large_g var t = g.ToMaterialized(Sink.Seq(), Keep.Both).Run(_materializer); var materialized = t.Item1; - var result = t.Item2.Result; + var result = t.Item2.AwaitResult(VeryPatient); materialized.Should().Be(1); result.Count.Should().Be(1); @@ -271,7 +273,7 @@ public void StreamLayout_should_not_fail_materialization_when_building_a_large_g var t = g.RunWith(Source.Single(42).MapMaterializedValue(_ => 1), Sink.Seq(), _materializer); var materialized = t.Item1; - var result = t.Item2.Result; + var result = t.Item2.AwaitResult(VeryPatient); materialized.Should().Be(1); result.Count.Should().Be(1); @@ -286,7 +288,7 @@ public void StreamLayout_should_not_fail_materialization_when_building_a_large_g var t = g.ToMaterialized(Sink.Seq(), Keep.Both).Run(_materializer); var materialized = t.Item1; - var result = t.Item2.Result; + var result = t.Item2.AwaitResult(VeryPatient); materialized.Should().Be(1); result.Count.Should().Be(1); @@ -302,7 +304,7 @@ public void StreamLayout_should_not_fail_fusing_and_materialization_when_buildin var m = g.ToMaterialized(Sink.Seq(), Keep.Both); var t = m.Run(_materializer); var materialized = t.Item1; - var result = t.Item2.Result; + var result = t.Item2.AwaitResult(VeryPatient); materialized.Should().Be(1); result.Count.Should().Be(1); @@ -317,7 +319,7 @@ public void StreamLayout_should_not_fail_fusing_and_materialization_when_buildin var t = g.RunWith(Source.Single(42).MapMaterializedValue(_ => 1), Sink.Seq(), _materializer); var materialized = t.Item1; - var result = t.Item2.Result; + var result = t.Item2.AwaitResult(VeryPatient); materialized.Should().Be(1); result.Count.Should().Be(1); @@ -332,7 +334,7 @@ public void StreamLayout_should_not_fail_fusing_and_materialization_when_buildin var t = g.ToMaterialized(Sink.Seq(), Keep.Both).Run(_materializer); var materialized = t.Item1; - var result = t.Item2.Result; + var result = t.Item2.AwaitResult(VeryPatient); materialized.Should().Be(1); result.Count.Should().Be(1); diff --git a/src/core/Akka.Streams/Akka.Streams.csproj b/src/core/Akka.Streams/Akka.Streams.csproj index 75904832bb6..b14359994fb 100644 --- a/src/core/Akka.Streams/Akka.Streams.csproj +++ b/src/core/Akka.Streams/Akka.Streams.csproj @@ -109,7 +109,6 @@ - @@ -118,7 +117,6 @@ - @@ -133,7 +131,6 @@ - diff --git a/src/core/Akka.Streams/CodeGen/Dsl/GraphApply.cs b/src/core/Akka.Streams/CodeGen/Dsl/GraphApply.cs index 066078cc707..15189b5f37c 100644 --- a/src/core/Akka.Streams/CodeGen/Dsl/GraphApply.cs +++ b/src/core/Akka.Streams/CodeGen/Dsl/GraphApply.cs @@ -1,4 +1,4 @@ -// --- auto generated: 19.05.2016 16:27:08 --- // +// --- auto generated: 01.10.2016 20:12:05 --- // //----------------------------------------------------------------------- // // Copyright (C) 2015-2016 Lightbend Inc. @@ -15,19 +15,14 @@ public partial class GraphDsl /// /// Creates a new by passing a to the given create function. /// - public static IGraph CreateMaterialized(Func, TShape> buildBlock) where TShape: Shape + public static IGraph Create(Func, TShape> buildBlock) where TShape: Shape { - var builder = new Builder(); + var builder = new Builder(); var shape = buildBlock(builder); var module = builder.Module.ReplaceShape(shape); - return new GraphImpl(shape, module); + return new GraphImpl(shape, module); } - - /// - /// Creates a new by passing a to the given create function. - /// - public static IGraph Create(Func, TShape> buildBlock) where TShape : Shape => CreateMaterialized(buildBlock); /// /// Creates a new by importing the given graph @@ -38,7 +33,7 @@ public static IGraph Create(IGraph(); - var shape1 = builder.Add(g1); + var shape1 = builder.Add(g1, Keep.Right); var shape = buildBlock(builder, shape1); var module = builder.Module.ReplaceShape(shape); diff --git a/src/core/Akka.Streams/CodeGen/Dsl/GraphApply.tt b/src/core/Akka.Streams/CodeGen/Dsl/GraphApply.tt index 231aa08be11..f1c732be85c 100644 --- a/src/core/Akka.Streams/CodeGen/Dsl/GraphApply.tt +++ b/src/core/Akka.Streams/CodeGen/Dsl/GraphApply.tt @@ -39,7 +39,7 @@ namespace Akka.Streams.Dsl where TShape1: Shape { var builder = new Builder(); - var shape1 = builder.Add(g1); + var shape1 = builder.Add(g1, Keep.Right); var shape = buildBlock(builder, shape1); var module = builder.Module.ReplaceShape(shape); diff --git a/src/core/Akka.Streams/Dsl/Flow.cs b/src/core/Akka.Streams/Dsl/Flow.cs index 8a34b9dabcf..5dcf712603c 100644 --- a/src/core/Akka.Streams/Dsl/Flow.cs +++ b/src/core/Akka.Streams/Dsl/Flow.cs @@ -260,32 +260,6 @@ public IRunnableGraph JoinMaterialized(IGraph DeprecatedAndThen(StageModule op) - { - //No need to copy here, op is a fresh instance - return IsIdentity - ? new Flow(op) - : new Flow( - Module.Fuse(op, Shape.Outlet, op.In).ReplaceShape(new FlowShape(Shape.Inlet, op.Out))); - } - - internal Flow DeprecatedAndThenMaterialized(Func, TMat2>> factory) - { - var op = new DirectProcessor(() => - { - var t = factory(); - return Tuple.Create, object>(t.Item1, t.Item2); - }); - - if(IsIdentity) - return new Flow(op); - - return - new Flow( - Module.Fuse(op, Shape.Outlet, op.In , Keep.Right) - .ReplaceShape(new FlowShape(Shape.Inlet, op.Out))); - } /// /// Connect the to this and then connect it to the and run it. @@ -328,8 +302,8 @@ public static Flow FromProcessor(Func /// Creates a Flow from a Reactive Streams and returns a materialized value. /// - public static Flow FromProcessorMaterialized( - Func, TMat>> factory) => Create().DeprecatedAndThenMaterialized(factory); + public static Flow FromProcessorMaterialized(Func, TMat>> factory) + => new Flow(new ProcessorModule(factory)); /// /// Helper to create a without a or . diff --git a/src/core/Akka.Streams/Dsl/FlowOperations.cs b/src/core/Akka.Streams/Dsl/FlowOperations.cs index 3467d7f95e7..dff4be1a105 100644 --- a/src/core/Akka.Streams/Dsl/FlowOperations.cs +++ b/src/core/Akka.Streams/Dsl/FlowOperations.cs @@ -70,7 +70,7 @@ public static Flow RecoverWith(this Flow /// RecoverWithRetries allows to switch to alternative Source on flow failure. It will stay in effect after /// a failure has been recovered up to number of times so that each time there is a failure it is fed into the and a new - /// Source may be materialized. Note that if you pass in 0, this won't attempt to recover at all. Passing in a negative number will behave exactly the same as . + /// Source may be materialized. Note that if you pass in 0, this won't attempt to recover at all. Passing in -1 will behave exactly the same as . /// /// Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements. /// This stage can recover the failure signal, but not the skipped elements, which will be dropped. @@ -86,6 +86,9 @@ public static Flow RecoverWith(this Flow /// Cancels when downstream cancels /// + /// Receives the failure cause and returns the new Source to be materialized if any + /// Maximum number of retries or -1 to retry indefinitely + /// if is a negative number other than -1 public static Flow RecoverWithRetries(this Flow flow, Func, TMat>> partialFunc, int attempts) { @@ -881,6 +884,8 @@ public static Flow, Source>, TMat /// If the group by throws an exception and the supervision decision /// is or /// the element is dropped and the stream and substreams continue. + /// + /// Function MUST NOT return null. This will throw exception and trigger supervision decision mechanism. /// /// Emits when an element for which the grouping function returns a group that has not yet been created. /// Emits the new group @@ -894,9 +899,7 @@ public static Flow, Source>, TMat public static SubFlow> GroupBy(this Flow flow, int maxSubstreams, Func groupingFunc) { return flow.GroupBy(maxSubstreams, groupingFunc, - (f, s) => ((Flow, TMat>) f).To(s), - (f, o) => ((Flow) f).DeprecatedAndThen(o) - ); + (f, s) => ((Flow, TMat>) f).To(s)); } /// diff --git a/src/core/Akka.Streams/Dsl/Framing.cs b/src/core/Akka.Streams/Dsl/Framing.cs index 9ca0b5f432f..9bdbbfa2828 100644 --- a/src/core/Akka.Streams/Dsl/Framing.cs +++ b/src/core/Akka.Streams/Dsl/Framing.cs @@ -6,8 +6,8 @@ //----------------------------------------------------------------------- using System; -using System.Linq; using Akka.IO; +using Akka.Streams.Implementation.Stages; using Akka.Streams.Stage; using Akka.Util; @@ -32,7 +32,7 @@ public static Flow Delimiter(ByteString delimit bool allowTruncation = false) { return Flow.Create() - .Transform(() => new DelimiterFramingStage(delimiter, maximumFrameLength, allowTruncation)) + .Via(new DelimiterFramingStage(delimiter, maximumFrameLength, allowTruncation)) .Named("DelimiterFraming"); } @@ -162,95 +162,142 @@ public override ISyncDirective OnPush(ByteString element, IContext c } } - private sealed class DelimiterFramingStage : PushPullStage + private sealed class DelimiterFramingStage : GraphStage> { - private readonly ByteString _separatorBytes; - private readonly int _maximumLineBytes; - private readonly bool _allowTruncation; - private readonly byte _firstSeperatorByte; - private ByteString _buffer; - private int _nextPossibleMatch; + #region Logic - public DelimiterFramingStage(ByteString separatorBytes, int maximumLineBytes, bool allowTruncation) + private sealed class Logic : GraphStageLogic { - _separatorBytes = separatorBytes; - _maximumLineBytes = maximumLineBytes; - _allowTruncation = allowTruncation; - _firstSeperatorByte = separatorBytes.Head; - _buffer = ByteString.Empty; - } + private readonly DelimiterFramingStage _stage; + private readonly byte _firstSeparatorByte; + private ByteString _buffer; + private int _nextPossibleMatch; - public override ISyncDirective OnPush(ByteString element, IContext context) - { - _buffer += element; - return DoParse(context); - } - - public override ISyncDirective OnPull(IContext context) => DoParse(context); + public Logic(DelimiterFramingStage stage) : base (stage.Shape) + { + _stage = stage; + _firstSeparatorByte = stage._separatorBytes.Head; + _buffer = ByteString.Empty; + + SetHandler(stage.In, onPush: () => + { + _buffer += Grab(stage.In); + DoParse(); + }, onUpstreamFinish: () => + { + if(_buffer.IsEmpty) + CompleteStage(); + else if (IsAvailable(stage.Out)) + DoParse(); + + // else swallow the termination and wait for pull + }); + + SetHandler(stage.Out, onPull: DoParse); + } - public override ITerminationDirective OnUpstreamFinish(IContext context) - { - return _buffer.NonEmpty ? context.AbsorbTermination() : context.Finish(); - } + private void TryPull() + { + if (IsClosed(_stage.In)) + { + if (_stage._allowTruncation) + { + Push(_stage.Out, _buffer); + CompleteStage(); + } + else + FailStage( + new FramingException( + "Stream finished but there was a truncated final frame in the buffer")); + } + else + Pull(_stage.In); + } - public override void PostStop() => _buffer = null; + private void DoParse() + { + var possibleMatchPosition = -1; - private ISyncDirective TryPull(IContext context) - { - if (!context.IsFinishing) - return context.Pull(); + for (var i = _nextPossibleMatch; i < _buffer.Count; i++) + { + if (_buffer[i] == _firstSeparatorByte) + { + possibleMatchPosition = i; + break; + } + } - return _allowTruncation - ? context.PushAndFinish(_buffer) - : context.Fail(new FramingException("Stream finished but there was a truncated final frame in the buffer")); + if (possibleMatchPosition > _stage._maximumLineBytes) + FailStage( + new FramingException( + $"Read {_buffer.Count} bytes which is more than {_stage._maximumLineBytes} without seeing a line terminator")); + else if (possibleMatchPosition == -1) + { + if (_buffer.Count > _stage._maximumLineBytes) + FailStage( + new FramingException( + $"Read {_buffer.Count} bytes which is more than {_stage._maximumLineBytes} without seeing a line terminator")); + else + { + // No matching character, we need to accumulate more bytes into the buffer + _nextPossibleMatch = _buffer.Count; + TryPull(); + } + } + else if (possibleMatchPosition + _stage._separatorBytes.Count > _buffer.Count) + { + // We have found a possible match (we found the first character of the terminator + // sequence) but we don't have yet enough bytes. We remember the position to + // retry from next time. + _nextPossibleMatch = possibleMatchPosition; + TryPull(); + } + else if (Equals(_buffer.Slice(possibleMatchPosition, possibleMatchPosition + _stage._separatorBytes.Count), _stage._separatorBytes)) + { + // Found a match + var parsedFrame = _buffer.Slice(0, possibleMatchPosition).Compact(); + _buffer = _buffer.Drop(possibleMatchPosition + _stage._separatorBytes.Count); + _nextPossibleMatch = 0; + Push(_stage.Out, parsedFrame); + + if (IsClosed(_stage.In) && _buffer.IsEmpty) + CompleteStage(); + } + else + { + // possibleMatchPos was not actually a match + _nextPossibleMatch++; + DoParse(); + } + } } - private ISyncDirective DoParse(IContext context) + #endregion + + private readonly ByteString _separatorBytes; + private readonly int _maximumLineBytes; + private readonly bool _allowTruncation; + + public DelimiterFramingStage(ByteString separatorBytes, int maximumLineBytes, bool allowTruncation) { - var possibleMatchPosition = -1; + _separatorBytes = separatorBytes; + _maximumLineBytes = maximumLineBytes; + _allowTruncation = allowTruncation; - for (var i = _nextPossibleMatch; i < _buffer.Count; i++) - { - if (_buffer[i] == _firstSeperatorByte) - { - possibleMatchPosition = i; - break; - } - } + Shape = new FlowShape(In, Out); + } + + private Inlet In = new Inlet("DelimiterFraming.in"); - if (possibleMatchPosition > _maximumLineBytes) - return context.Fail(new FramingException($"Read {_buffer.Count} bytes which is more than {_maximumLineBytes} without seeing a line terminator")); + private Outlet Out = new Outlet("DelimiterFraming.in"); - if (possibleMatchPosition == -1) - { - // No matching character, we need to accumulate more bytes into the buffer - _nextPossibleMatch = _buffer.Count; - return TryPull(context); - } + public override FlowShape Shape { get; } - if (possibleMatchPosition + _separatorBytes.Count > _buffer.Count) - { - // We have found a possible match (we found the first character of the terminator - // sequence) but we don't have yet enough bytes. We remember the position to - // retry from next time. - _nextPossibleMatch = possibleMatchPosition; - return TryPull(context); - } + protected override Attributes InitialAttributes { get; } = DefaultAttributes.DelimiterFraming; - if (_buffer.Slice(possibleMatchPosition, possibleMatchPosition + _separatorBytes.Count).SequenceEqual(_separatorBytes)) - { - // Found a match - var parsedFrame = _buffer.Slice(0, possibleMatchPosition).Compact(); - _buffer = _buffer.Drop(possibleMatchPosition + _separatorBytes.Count); - _nextPossibleMatch = 0; - if (context.IsFinishing && _buffer.IsEmpty) - return context.PushAndFinish(parsedFrame); - return context.Push(parsedFrame); - } + protected override GraphStageLogic CreateLogic(Attributes inheritedAttributes) => new Logic(this); - _nextPossibleMatch += 1; - return DoParse(context); - } + public override string ToString() => "DelimiterFraming"; } private sealed class LengthFieldFramingStage : PushPullStage diff --git a/src/core/Akka.Streams/Dsl/GraphDsl.cs b/src/core/Akka.Streams/Dsl/GraphDsl.cs index 63da61b00cc..919f011b720 100644 --- a/src/core/Akka.Streams/Dsl/GraphDsl.cs +++ b/src/core/Akka.Streams/Dsl/GraphDsl.cs @@ -38,7 +38,7 @@ internal TShape Add(IGraph graph, Func(copy.TransformMaterializedValue(transform), Keep.Right); return (TShape)graph.Shape.CopyFromPorts(copy.Shape.Inlets, copy.Shape.Outlets); } @@ -70,7 +70,7 @@ public TShape Add(IGraph graph) StreamLayout.Validate(graph.Module); var copy = graph.Module.CarbonCopy(); - _moduleInProgress = _moduleInProgress.Compose(copy); + _moduleInProgress = _moduleInProgress.Compose(copy, Keep.Left); return (TShape)graph.Shape.CopyFromPorts(copy.Shape.Inlets, copy.Shape.Outlets); } diff --git a/src/core/Akka.Streams/Dsl/Internal/InternalFlowOperations.cs b/src/core/Akka.Streams/Dsl/Internal/InternalFlowOperations.cs index fe9f3aded24..5da9651c1b3 100644 --- a/src/core/Akka.Streams/Dsl/Internal/InternalFlowOperations.cs +++ b/src/core/Akka.Streams/Dsl/Internal/InternalFlowOperations.cs @@ -81,7 +81,7 @@ public static IFlow RecoverWith(this IFlow f /// /// RecoverWithRetries allows to switch to alternative Source on flow failure. It will stay in effect after /// a failure has been recovered up to number of times so that each time there is a failure it is fed into the and a new - /// Source may be materialized. Note that if you pass in 0, this won't attempt to recover at all. Passing in a negative number will behave exactly the same as . + /// Source may be materialized. Note that if you pass in 0, this won't attempt to recover at all. Passing in -1 will behave exactly the same as . /// /// Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements. /// This stage can recover the failure signal, but not the skipped elements, which will be dropped. @@ -97,6 +97,9 @@ public static IFlow RecoverWith(this IFlow f /// /// Cancels when downstream cancels /// + /// Receives the failure cause and returns the new Source to be materialized if any + /// Maximum number of retries or -1 to retry indefinitely + /// if is a negative number other than -1 public static IFlow RecoverWithRetries(this IFlow flow, Func, TMat>> partialFunc, int attempts) { @@ -907,6 +910,8 @@ public static IFlow, Source>, TMat> PrefixAn /// If the group by throws an exception and the supervision decision /// is or /// the element is dropped and the stream and substreams continue. + /// + /// Function MUST NOT return null. This will throw exception and trigger supervision decision mechanism. /// /// Emits when an element for which the grouping function returns a group that has not yet been created. /// Emits the new group @@ -921,15 +926,14 @@ public static SubFlow GroupBy( this IFlow flow, int maxSubstreams, Func groupingFunc, - Func, TMat>, Sink, Task>, TClosed> toFunc, - Func, StageModule>, IFlow, TMat>> deprecatedAndThenFunc) + Func, TMat>, Sink, Task>, TClosed> toFunc) { - var merge = new GroupByMergeBack(flow, deprecatedAndThenFunc, maxSubstreams, groupingFunc); + var merge = new GroupByMergeBack(flow, maxSubstreams, groupingFunc); Func, TClosed> finish = s => { return toFunc( - deprecatedAndThenFunc(flow, new GroupBy(maxSubstreams, groupingFunc)), + flow.Via(new Fusing.GroupBy(maxSubstreams, groupingFunc)), Sink.ForEach>(e => e.RunWith(s, Fusing.GraphInterpreter.Current.Materializer))); }; @@ -939,24 +943,21 @@ public static SubFlow GroupBy( internal class GroupByMergeBack : IMergeBack { private readonly IFlow _self; - private readonly Func, StageModule>, IFlow, TMat>> _deprecatedAndThenFunc; private readonly int _maxSubstreams; private readonly Func _groupingFunc; public GroupByMergeBack(IFlow self, - Func, StageModule>, IFlow, TMat>> deprecatedAndThenFunc, int maxSubstreams, Func groupingFunc) { _self = self; - _deprecatedAndThenFunc = deprecatedAndThenFunc; _maxSubstreams = maxSubstreams; _groupingFunc = groupingFunc; } public IFlow Apply(Flow flow, int breadth) { - return _deprecatedAndThenFunc(_self, new GroupBy(_maxSubstreams, o => _groupingFunc(o))) + return _self.Via(new Fusing.GroupBy(_maxSubstreams, _groupingFunc)) .Select(f => f.Via(flow)) .Via(new Fusing.FlattenMerge, T, NotUsed>(breadth)); } diff --git a/src/core/Akka.Streams/Dsl/Source.cs b/src/core/Akka.Streams/Dsl/Source.cs index 7805eea72c0..ec4f7ea97a1 100644 --- a/src/core/Akka.Streams/Dsl/Source.cs +++ b/src/core/Akka.Streams/Dsl/Source.cs @@ -182,14 +182,6 @@ IFlow IFlow.MapMaterializedValue(Func MapMaterializedValue(Func mapFunc) => new Source(Module.TransformMaterializedValue(mapFunc)); - internal Source DeprecatedAndThen(StageModule op) - { - //No need to copy here, op is a fresh instance - return new Source(Module - .Fuse(op, Shape.Outlet, op.In) - .ReplaceShape(new SourceShape(op.Out))); - } - /// /// Connect this to a and run it. The returned value is the materialized value /// of the , e.g. the of a . diff --git a/src/core/Akka.Streams/Dsl/SourceOperations.cs b/src/core/Akka.Streams/Dsl/SourceOperations.cs index 6dbbd61ae3c..4b35673e408 100644 --- a/src/core/Akka.Streams/Dsl/SourceOperations.cs +++ b/src/core/Akka.Streams/Dsl/SourceOperations.cs @@ -70,7 +70,7 @@ public static Source RecoverWith(this Source /// /// RecoverWithRetries allows to switch to alternative Source on flow failure. It will stay in effect after /// a failure has been recovered up to number of times so that each time there is a failure it is fed into the and a new - /// Source may be materialized. Note that if you pass in 0, this won't attempt to recover at all. Passing in a negative number will behave exactly the same as . + /// Source may be materialized. Note that if you pass in 0, this won't attempt to recover at all. Passing in -1 will behave exactly the same as . /// /// Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements. /// This stage can recover the failure signal, but not the skipped elements, which will be dropped. @@ -85,7 +85,10 @@ public static Source RecoverWith(this Source /// Completes when upstream completes or upstream failed with exception can handle /// /// Cancels when downstream cancels - /// + /// + /// /// Receives the failure cause and returns the new Source to be materialized if any + /// Maximum number of retries or -1 to retry indefinitely + /// if is a negative number other than -1 public static Source RecoverWithRetries(this Source flow, Func, TMat>> partialFunc, int attempts) { @@ -893,10 +896,7 @@ public static Source, Source>, TMat> P /// public static SubFlow> GroupBy(this Source flow, int maxSubstreams, Func groupingFunc) { - return flow.GroupBy(maxSubstreams, groupingFunc, - (f, s) => ((Source, TMat>) f).To(s), - (f, o) => ((Source) f).DeprecatedAndThen(o) - ); + return flow.GroupBy(maxSubstreams, groupingFunc, (f, s) => ((Source, TMat>) f).To(s)); } /// diff --git a/src/core/Akka.Streams/Dsl/SubFlowOperations.cs b/src/core/Akka.Streams/Dsl/SubFlowOperations.cs index 84a8d54fb5b..30190c27479 100644 --- a/src/core/Akka.Streams/Dsl/SubFlowOperations.cs +++ b/src/core/Akka.Streams/Dsl/SubFlowOperations.cs @@ -60,6 +60,7 @@ public static SubFlow, TMat, TClosed> Recover( /// /// Cancels when downstream cancels /// + [Obsolete("Use RecoverWithRetries instead.")] public static SubFlow RecoverWith(this SubFlow flow, Func, TMat>> partialFunc) { @@ -69,7 +70,7 @@ public static SubFlow RecoverWith(this /// /// RecoverWithRetries allows to switch to alternative Source on flow failure. It will stay in effect after /// a failure has been recovered up to number of times so that each time there is a failure it is fed into the and a new - /// Source may be materialized. Note that if you pass in 0, this won't attempt to recover at all. Passing in a negative number will behave exactly the same as . + /// Source may be materialized. Note that if you pass in 0, this won't attempt to recover at all. Passing in -1 will behave exactly the same as . /// /// Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements. /// This stage can recover the failure signal, but not the skipped elements, which will be dropped. @@ -85,6 +86,9 @@ public static SubFlow RecoverWith(this /// /// Cancels when downstream cancels /// + /// Receives the failure cause and returns the new Source to be materialized if any + /// Maximum number of retries or -1 to retry indefinitely + /// if is a negative number other than -1 public static SubFlow RecoverWithRetries(this SubFlow flow, Func, TMat>> partialFunc, int attempts) { diff --git a/src/core/Akka.Streams/Implementation/ActorMaterializerImpl.cs b/src/core/Akka.Streams/Implementation/ActorMaterializerImpl.cs index a0c41ac090c..7892649c314 100644 --- a/src/core/Akka.Streams/Implementation/ActorMaterializerImpl.cs +++ b/src/core/Akka.Streams/Implementation/ActorMaterializerImpl.cs @@ -66,16 +66,13 @@ protected override object MaterializeAtomic(AtomicModule atomic, Attributes effe AssignPort(source.Shape.Outlets.First(), publisher); materializedValues.Add(atomic, materialized); } - else if (atomic is IStageModule) + else if (atomic is IProcessorModule) { - // FIXME: Remove this, only stream-of-stream ops need it - var stage = (IStageModule) atomic; - // assumes BaseType is StageModule<> - var methodInfo = ProcessorForMethod.MakeGenericMethod(atomic.GetType().BaseType.GenericTypeArguments); - var parameters = new object[] - {stage, effectiveAttributes, _materializer.EffectiveSettings(effectiveAttributes), null}; - var processor = methodInfo.Invoke(this, parameters); - object materialized = parameters[3]; + var stage = atomic as IProcessorModule; + var t = stage.CreateProcessor(); + var processor = t.Item1; + var materialized = t.Item2; + AssignPort(stage.In, UntypedSubscriber.FromTyped(processor)); AssignPort(stage.Out, UntypedPublisher.FromTyped(processor)); materializedValues.Add(atomic, materialized); @@ -143,21 +140,6 @@ private void MaterializeGraph(GraphModule graph, Attributes effectiveAttributes, i++; } } - - // ReSharper disable once UnusedMember.Local - private IProcessor ProcessorFor(StageModule op, Attributes effectiveAttributes, ActorMaterializerSettings settings, out object materialized) - { - DirectProcessor processor; - if ((processor = op as DirectProcessor) != null) - { - var t = processor.ProcessorFactory(); - materialized = t.Item2; - return t.Item1; - } - - var props = ActorProcessorFactory.Props(_materializer, op, effectiveAttributes, out materialized); - return ActorProcessorFactory.Create(_materializer.ActorOf(props, StageName(effectiveAttributes), settings.Dispatcher)); - } } #endregion @@ -410,37 +392,4 @@ protected override bool Receive(object message) protected override void PostStop() => HaveShutdown.Value = true; } - - internal static class ActorProcessorFactory - { - public static Props Props(ActorMaterializer materializer, StageModule op, Attributes parentAttributes, out object materialized) - { - var attr = parentAttributes.And(op.Attributes); - // USE THIS TO AVOID CLOSING OVER THE MATERIALIZER BELOW - // Also, otherwise the attributes will not affect the settings properly! - var settings = materializer.EffectiveSettings(attr); - Props result; - materialized = null; - - if (op is IGroupBy) - { - var groupBy = (IGroupBy) op; - result = GroupByProcessorImpl.Props(settings, groupBy.MaxSubstreams, groupBy.Extractor); - } - else if (op.GetType().IsGenericType && op.GetType().GetGenericTypeDefinition() == typeof(DirectProcessor<,>)) - throw new ArgumentException("DirectProcessor cannot end up in ActorProcessorFactory"); - else - throw new ArgumentException($"StageModule type {op.GetType()} is not supported"); - - return result; - } - - public static ActorProcessor Create(IActorRef impl) - { - var p = new ActorProcessor(impl); - // Resolve cyclic dependency with actor. This MUST be the first message no matter what. - impl.Tell(new ExposedPublisher(p)); - return p; - } - } } \ No newline at end of file diff --git a/src/core/Akka.Streams/Implementation/FlowModule.cs b/src/core/Akka.Streams/Implementation/FlowModule.cs deleted file mode 100644 index 04996313a3c..00000000000 --- a/src/core/Akka.Streams/Implementation/FlowModule.cs +++ /dev/null @@ -1,36 +0,0 @@ -//----------------------------------------------------------------------- -// -// Copyright (C) 2015-2016 Lightbend Inc. -// Copyright (C) 2013-2016 Akka.NET project -// -//----------------------------------------------------------------------- - -using System; - -namespace Akka.Streams.Implementation -{ - internal abstract class FlowModule : AtomicModule - { - public readonly Inlet In = new Inlet("Flow.in"); - public readonly Outlet Out = new Outlet("Flow.out"); - - protected FlowModule() - { - Shape = new FlowShape(In, Out); - } - - public override Shape Shape { get; } - - public override IModule ReplaceShape(Shape shape) - { - if (Shape.Equals(shape)) - return this; - throw new NotSupportedException("cannot replace the shape of a FlowModule"); - } - - protected virtual string Label => GetType().Name; - - public sealed override string ToString() => $"{Label} [{GetHashCode()}%08x]"; - } - -} \ No newline at end of file diff --git a/src/core/Akka.Streams/Implementation/Fusing/Ops.cs b/src/core/Akka.Streams/Implementation/Fusing/Ops.cs index 535d2a42576..c10909d4a62 100644 --- a/src/core/Akka.Streams/Implementation/Fusing/Ops.cs +++ b/src/core/Akka.Streams/Implementation/Fusing/Ops.cs @@ -1795,7 +1795,7 @@ public Logic(Attributes inheritedAttributes, Delay stage) : base(stage.Shape) if (_buffer.IsFull) overflowStrategy(); else { - GrabAndPull(_stage._strategy != DelayOverflowStrategy.Backpressure || _buffer.Capacity < _size - 1); + GrabAndPull(_stage._strategy != DelayOverflowStrategy.Backpressure || _buffer.Used < _size - 1); if (!IsTimerActive(TimerName)) ScheduleOnce(TimerName, _stage._delay); } @@ -2121,6 +2121,10 @@ private void SwitchTo(IGraph, TMat> source) public RecoverWith(Func, TMat>> partialFunction, int maximumRetries) { + if (maximumRetries < -1) + throw new ArgumentException("number of retries must be non-negative or equal to -1", + nameof(maximumRetries)); + _partialFunction = partialFunction; _maximumRetries = maximumRetries; } diff --git a/src/core/Akka.Streams/Implementation/Fusing/StreamOfStreams.cs b/src/core/Akka.Streams/Implementation/Fusing/StreamOfStreams.cs index 6b889d2ac6d..7d5c87d5a5f 100644 --- a/src/core/Akka.Streams/Implementation/Fusing/StreamOfStreams.cs +++ b/src/core/Akka.Streams/Implementation/Fusing/StreamOfStreams.cs @@ -13,6 +13,7 @@ using Akka.Streams.Dsl; using Akka.Streams.Implementation.Stages; using Akka.Streams.Stage; +using Akka.Streams.Supervision; using Akka.Streams.Util; using Akka.Util; using Akka.Util.Internal; @@ -307,6 +308,287 @@ public PrefixAndTail(int count) public override string ToString() => $"PrefixAndTail({_count})"; } + /// + /// INTERNAL API + /// + internal sealed class GroupBy : GraphStage>> + { + #region Loigc + + private sealed class Logic : TimerGraphStageLogic + { + private readonly GroupBy _stage; + private readonly Dictionary _activeSubstreams = new Dictionary(); + private readonly HashSet _closedSubstreams = new HashSet(); + private readonly HashSet _substreamsJustStarted = new HashSet(); + private readonly Lazy _decider; + private TimeSpan _timeout; + private SubstreamSource _substreamWaitingToBePushed; + private Option _nextElementKey = Option.None; + private Option _nextElementValue = Option.None; + private long _nextId; + private int _firstPushCounter; + + + public Logic(GroupBy stage, Attributes inheritedAttributes) : base(stage.Shape) + { + _stage = stage; + + _decider = new Lazy(() => + { + var attribute = inheritedAttributes.GetAttribute(null); + return attribute != null ? attribute.Decider : Deciders.StoppingDecider; ; + }); + + SetHandler(_stage.In, onPush: () => + { + try + { + var element = Grab(_stage.In); + var key = _stage._keyFor(element); + if (key == null) + throw new ArgumentNullException(nameof(key), "Key cannot be null"); + + if (_activeSubstreams.ContainsKey(key)) + { + var substreamSource = _activeSubstreams[key]; + if (substreamSource.IsAvailable) + substreamSource.Push(element); + else + { + _nextElementKey = key; + _nextElementValue = element; + } + } + else + { + if (_activeSubstreams.Count == _stage._maxSubstreams) + Fail(new IllegalStateException($"Cannot open substream for key {key}: too many substreams open")); + else if (_closedSubstreams.Contains(key) && !HasBeenPulled(_stage.In)) + Pull(_stage.In); + else + RunSubstream(key, element); + } + } + catch (Exception ex) + { + var directive = _decider.Value(ex); + if (directive == Directive.Stop) + Fail(ex); + else if (!HasBeenPulled(_stage.In)) + Pull(_stage.In); + } + }, onUpstreamFinish: () => + { + if (!TryCompleteAll()) + SetKeepGoing(true); + }, onUpstreamFailure: Fail); + + SetHandler(_stage.Out, onPull: () => + { + if (_substreamWaitingToBePushed != null) + { + Push(_stage.Out, Source.FromGraph(_substreamWaitingToBePushed.Source)); + ScheduleOnce(_substreamWaitingToBePushed.Key.Value, _timeout); + _substreamWaitingToBePushed = null; + } + else + { + if (HasNextElement) + { + var subSubstreamSource = _activeSubstreams[_nextElementKey.Value]; + if (subSubstreamSource.IsAvailable) + { + subSubstreamSource.Push(_nextElementValue.Value); + ClearNextElement(); + } + } + else + TryPull(_stage.In); + } + }, + onDownstreamFinish: () => + { + if (_activeSubstreams.Count == 0) + CompleteStage(); + else + SetKeepGoing(true); + }); + } + + private long NextId => ++_nextId; + + private bool HasNextElement => _nextElementKey.HasValue; + + private void ClearNextElement() + { + _nextElementKey = Option.None; + _nextElementValue = Option.None; + } + + private bool TryCompleteAll() + { + if (_activeSubstreams.Count == 0 || (!HasNextElement && _firstPushCounter == 0)) + { + foreach (var value in _activeSubstreams.Values) + value.Complete(); + CompleteStage(); + return true; + } + + return false; + } + + private void Fail(Exception ex) + { + foreach (var value in _activeSubstreams.Values) + value.Fail(ex); + + FailStage(ex); + } + + private bool NeedToPull => !(HasBeenPulled(_stage.In) || IsClosed(_stage.In) || HasNextElement); + + public override void PreStart() + { + var settings = ActorMaterializer.Downcast(Interpreter.Materializer).Settings; + _timeout = settings.SubscriptionTimeoutSettings.Timeout; + } + + protected internal override void OnTimer(object timerKey) + { + var key = (TKey) timerKey; + if (_activeSubstreams.ContainsKey(key)) + { + var substreamSource = _activeSubstreams[key]; + substreamSource.Timeout(_timeout); + _closedSubstreams.Add(key); + _activeSubstreams.Remove(key); + if (IsClosed(_stage.In)) + TryCompleteAll(); + } + } + + private void RunSubstream(TKey key, T value) + { + var substreamSource = new SubstreamSource(this, "GroupBySource " + NextId, key, value); + _activeSubstreams.Add(key, substreamSource); + _firstPushCounter++; + if (IsAvailable(_stage.Out)) + { + Push(_stage.Out, Source.FromGraph(substreamSource.Source)); + ScheduleOnce(key, _timeout); + _substreamWaitingToBePushed = null; + } + else + { + SetKeepGoing(true); + _substreamsJustStarted.Add(substreamSource); + _substreamWaitingToBePushed = substreamSource; + } + } + + private sealed class SubstreamSource : SubSourceOutlet, IOutHandler + { + private readonly Logic _logic; + private Option _firstElement; + + public SubstreamSource(Logic logic, string name, Option key, Option firstElement) : base(logic, name) + { + _logic = logic; + _firstElement = firstElement; + Key = key; + + SetHandler(this); + } + + private bool FirstPush => _firstElement.HasValue; + + private bool HasNextForSubSource => _logic.HasNextElement && _logic._nextElementKey.Equals(Key); + + public Option Key { get; } + + private void CompleteSubStream() + { + Complete(); + _logic._activeSubstreams.Remove(Key.Value); + _logic._closedSubstreams.Add(Key.Value); + } + + private void TryCompleteHandler() + { + if (_logic.IsClosed(_logic._stage.In) && !HasNextForSubSource) + { + CompleteSubStream(); + _logic.TryCompleteAll(); + } + } + + public void OnPull() + { + _logic.CancelTimer(Key.Value); + if (FirstPush) + { + _logic._firstPushCounter--; + Push(_firstElement.Value); + _firstElement = Option.None; + _logic._substreamsJustStarted.Remove(this); + if(_logic._substreamsJustStarted.Count == 0) + _logic.SetKeepGoing(false); + } + else if (HasNextForSubSource) + { + Push(_logic._nextElementValue.Value); + _logic.ClearNextElement(); + } + else if (_logic.NeedToPull) + _logic.Pull(_logic._stage.In); + + TryCompleteHandler(); + } + + public void OnDownstreamFinish() + { + if(_logic.HasNextElement && _logic._nextElementKey.Equals(Key)) + _logic.ClearNextElement(); + if (FirstPush) + _logic._firstPushCounter--; + CompleteSubStream(); + if (_logic.IsClosed(_logic._stage.In)) + _logic.TryCompleteAll(); + else if (_logic.NeedToPull) + _logic.Pull(_logic._stage.In); + } + } + } + + #endregion + + private readonly int _maxSubstreams; + private readonly Func _keyFor; + + public GroupBy(int maxSubstreams, Func keyFor) + { + _maxSubstreams = maxSubstreams; + _keyFor = keyFor; + + Shape = new FlowShape>(In, Out); + } + + private Inlet In { get; } = new Inlet("GroupBy.in"); + + private Outlet> Out { get; } = new Outlet>("GroupBy.out"); + + protected override Attributes InitialAttributes { get; } = DefaultAttributes.GroupBy; + + public override FlowShape> Shape { get; } + + protected override GraphStageLogic CreateLogic(Attributes inheritedAttributes) + => new Logic(this, inheritedAttributes); + + public override string ToString() => "GroupBy"; + } + /// /// INTERNAL API /// @@ -447,7 +729,7 @@ public override void OnUpstreamFailure(Exception ex) private TimeSpan _timeout; private SubSourceOutlet _substreamSource; - private bool _substreamPushed; + private bool _substreamWaitingToBePushed; private bool _substreamCancelled; private readonly Split _stage; @@ -458,16 +740,16 @@ public Logic(Split stage) : base(stage.Shape) { if (_substreamSource == null) Pull(stage._in); - else if (!_substreamPushed) + else if (!_substreamWaitingToBePushed) { Push(stage._out, Source.FromGraph(_substreamSource.Source)); ScheduleOnce(SubscriptionTimer, _timeout); - _substreamPushed = true; + _substreamWaitingToBePushed = true; } }, onDownstreamFinish: () => { // If the substream is already cancelled or it has not been handed out, we can go away - if (!_substreamPushed || _substreamCancelled) + if (!_substreamWaitingToBePushed || _substreamCancelled) CompleteStage(); }); @@ -509,10 +791,10 @@ private void HandOver(SubstreamHandler handler) { Push(_stage._out, Source.FromGraph(_substreamSource.Source)); ScheduleOnce(SubscriptionTimer, _timeout); - _substreamPushed = true; + _substreamWaitingToBePushed = true; } else - _substreamPushed = false; + _substreamWaitingToBePushed = false; } } @@ -540,6 +822,8 @@ public Split(Split.SplitDecision decision, Func predicate, SubstreamCan public override FlowShape> Shape { get; } protected override GraphStageLogic CreateLogic(Attributes inheritedAttributes) => new Logic(this); + + public override string ToString() => "Split"; } /// diff --git a/src/core/Akka.Streams/Implementation/GroupByProcessorImpl.cs b/src/core/Akka.Streams/Implementation/GroupByProcessorImpl.cs deleted file mode 100644 index e019995d341..00000000000 --- a/src/core/Akka.Streams/Implementation/GroupByProcessorImpl.cs +++ /dev/null @@ -1,135 +0,0 @@ -//----------------------------------------------------------------------- -// -// Copyright (C) 2015-2016 Lightbend Inc. -// Copyright (C) 2013-2016 Akka.NET project -// -//----------------------------------------------------------------------- - -using System; -using System.Collections.Generic; -using Akka.Actor; -using Akka.Pattern; -using Akka.Streams.Dsl; -using Decider = Akka.Streams.Supervision.Decider; -using Directive = Akka.Streams.Supervision.Directive; - -namespace Akka.Streams.Implementation -{ - internal sealed class GroupByProcessorImpl : MultiStreamOutputProcessor - { - public static Props Props(ActorMaterializerSettings settings, int maxSubstreams, Func keyFor) - => Actor.Props.Create(() => new GroupByProcessorImpl(settings, maxSubstreams, keyFor)) - .WithDeploy(Deploy.Local); - - private readonly int _maxSubstreams; - private readonly Func _keyFor; - private readonly Decider _decider; - private readonly IDictionary _keyToSubstreamOutput = new Dictionary(); - // No substream is open yet. If downstream cancels now, we are complete - // some substreams are open now. If downstream cancels, we still continue until the substreams are closed - private readonly TransferPhase _waitNext; - - private SubstreamOutput _pendingSubstreamOutput; - - public GroupByProcessorImpl(ActorMaterializerSettings settings, int maxSubstreams, Func keyFor) : base(settings) - { - _maxSubstreams = maxSubstreams; - _keyFor = keyFor; - _decider = settings.SupervisionDecider; - var waitFirst = new TransferPhase(PrimaryInputs.NeedsInput.And(PrimaryOutputs.NeedsDemand), () => - { - var element = PrimaryInputs.DequeueInputElement(); - object key; - if (TryKeyFor(element, out key)) - NextPhase(OpenSubstream(element, key)); - }); - _waitNext = new TransferPhase(PrimaryInputs.NeedsInput, () => - { - var element = PrimaryInputs.DequeueInputElement(); - object key; - if (TryKeyFor(element, out key)) - { - SubstreamOutput substream; - if (_keyToSubstreamOutput.TryGetValue(key, out substream)) - { - if (substream.IsOpen) - NextPhase(DispatchToSubstream(element, substream)); - } - else if (PrimaryOutputs.IsOpen) - NextPhase(OpenSubstream(element, key)); - } - }); - - InitialPhase(1, waitFirst); - } - - protected override void InvalidateSubstreamOutput(SubstreamKey substream) - { - if (!ReferenceEquals(_pendingSubstreamOutput, null) && substream == _pendingSubstreamOutput.Key) - { - _pendingSubstreamOutput = null; - NextPhase(_waitNext); - } - - base.InvalidateSubstreamOutput(substream); - } - - private bool TryKeyFor(object key, out object element) - { - try - { - element = _keyFor(key); - return true; - } - catch (Exception cause) - { - if (_decider(cause) != Directive.Stop) - { - element = null; - if (Settings.IsDebugLogging) - Log.Debug("Dropped element [{0}] due to exception from groupBy function: {1}", key, cause.Message); - return false; - } - throw; - } - } - - private TransferPhase OpenSubstream(object element, object key) - { - return new TransferPhase(PrimaryOutputs.NeedsDemandOrCancel, () => - { - if (PrimaryOutputs.IsClosed) - { - // Just drop, we do not open any more substreams - NextPhase(_waitNext); - } - else - { - if (_keyToSubstreamOutput.Count == _maxSubstreams) - throw new IllegalStateException($"Cannot open substream for key '{key}': too many substreams open"); - var substreamOutput = CreateSubstreamOutput(); - var substreamFlow = Source.FromPublisher(substreamOutput); - PrimaryOutputs.EnqueueOutputElement(substreamFlow); - - if (_keyToSubstreamOutput.ContainsKey(key)) - _keyToSubstreamOutput[key] = substreamOutput; - else - _keyToSubstreamOutput.Add(key, substreamOutput); - - NextPhase(DispatchToSubstream(element, substreamOutput)); - } - }); - } - - private TransferPhase DispatchToSubstream(object element, SubstreamOutput substream) - { - _pendingSubstreamOutput = substream; - return new TransferPhase(substream.NeedsDemand, () => - { - substream.EnqueueOutputElement(element); - _pendingSubstreamOutput = null; - NextPhase(_waitNext); - }); - } - } -} \ No newline at end of file diff --git a/src/core/Akka.Streams/Implementation/IO/FilePublisher.cs b/src/core/Akka.Streams/Implementation/IO/FilePublisher.cs index d8311343b25..fef226e3d90 100644 --- a/src/core/Akka.Streams/Implementation/IO/FilePublisher.cs +++ b/src/core/Akka.Streams/Implementation/IO/FilePublisher.cs @@ -77,6 +77,7 @@ protected override void PreStart() } catch (Exception ex) { + _completionPromise.TrySetResult(new IOResult(0, Result.Failure(ex))); OnErrorThenStop(ex); } diff --git a/src/core/Akka.Streams/Implementation/IO/OutputStreamSourceStage.cs b/src/core/Akka.Streams/Implementation/IO/OutputStreamSourceStage.cs index ad3e0930e69..802226aaaa2 100644 --- a/src/core/Akka.Streams/Implementation/IO/OutputStreamSourceStage.cs +++ b/src/core/Akka.Streams/Implementation/IO/OutputStreamSourceStage.cs @@ -8,6 +8,8 @@ using System; using System.Collections.Concurrent; using System.IO; +using System.Linq; +using System.Threading; using System.Threading.Tasks; using Akka.Dispatch; using Akka.IO; @@ -44,7 +46,6 @@ internal class Close : IAdapterToStageMessage private Close() { - } } @@ -78,19 +79,23 @@ internal interface IStageWithCallback private sealed class Logic : GraphStageLogic, IStageWithCallback { private readonly OutputStreamSourceStage _stage; - private BlockingCollection _dataQueue; private readonly AtomicReference _downstreamStatus; - private TaskCompletionSource _flush; - private TaskCompletionSource _close; + private readonly string _dispatcherId; private readonly Action>> _upstreamCallback; private readonly OnPullRunnable _pullTask; + private readonly CancellationTokenSource _cancellation = new CancellationTokenSource(); + private BlockingCollection _dataQueue; + private TaskCompletionSource _flush; + private TaskCompletionSource _close; + private MessageDispatcher _dispatcher; - public Logic(OutputStreamSourceStage stage, BlockingCollection dataQueue, - AtomicReference downstreamStatus) : base(stage.Shape) + public Logic(OutputStreamSourceStage stage, BlockingCollection dataQueue, AtomicReference downstreamStatus, string dispatcherId) : base(stage.Shape) { _stage = stage; _dataQueue = dataQueue; _downstreamStatus = downstreamStatus; + _dispatcherId = dispatcherId; + var downstreamCallback = GetAsyncCallback((Either result) => { if (result.IsLeft) @@ -100,15 +105,27 @@ public Logic(OutputStreamSourceStage stage, BlockingCollection dataQ }); _upstreamCallback = GetAsyncCallback>>(OnAsyncMessage); - _pullTask = new OnPullRunnable(downstreamCallback, dataQueue); + _pullTask = new OnPullRunnable(downstreamCallback, dataQueue, _cancellation.Token); SetHandler(_stage._out, onPull: OnPull, onDownstreamFinish: OnDownstreamFinish); } + public override void PreStart() + { + _dispatcher = ActorMaterializer.Downcast(Materializer).System.Dispatchers.Lookup(_dispatcherId); + base.PreStart(); + } + + public override void PostStop() + { + // interrupt any pending blocking take + _cancellation.Cancel(false); + base.PostStop(); + } + private void OnDownstreamFinish() { //assuming there can be no further in messages _downstreamStatus.Value = Canceled.Instance; - _dataQueue.Add(ByteString.Empty); _dataQueue = null; CompleteStage(); } @@ -117,18 +134,24 @@ private sealed class OnPullRunnable : IRunnable { private readonly Action> _callback; private readonly BlockingCollection _dataQueue; + private readonly CancellationToken _cancellationToken; - public OnPullRunnable(Action> callback, BlockingCollection dataQueue) + public OnPullRunnable(Action> callback, BlockingCollection dataQueue, CancellationToken cancellationToken) { _callback = callback; _dataQueue = dataQueue; + _cancellationToken = cancellationToken; } public void Run() { try { - _callback(new Left(_dataQueue.Take())); + _callback(new Left(_dataQueue.Take(_cancellationToken))); + } + catch (OperationCanceledException) + { + _callback(new Left(ByteString.Empty)); } catch (Exception ex) { @@ -137,10 +160,7 @@ public void Run() } } - private void OnPull() - { - Interpreter.Materializer.ExecutionContext.Schedule(_pullTask); - } + private void OnPull() => _dispatcher.Schedule(_pullTask); private void OnPush(ByteString data) { @@ -207,7 +227,6 @@ private void SendResponseIfNeeded() private readonly TimeSpan _writeTimeout; private readonly Outlet _out = new Outlet("OutputStreamSource.out"); - public OutputStreamSourceStage(TimeSpan writeTimeout) { _writeTimeout = writeTimeout; @@ -227,8 +246,11 @@ public override ILogicAndMaterializedValue CreateLogicAndMaterializedVal var dataQueue = new BlockingCollection(maxBuffer); var downstreamStatus = new AtomicReference(Ok.Instance); - - var logic = new Logic(this, dataQueue, downstreamStatus); + + var dispatcherId = + inheritedAttributes.GetAttribute( + DefaultAttributes.IODispatcher.GetAttributeList().First()).Name; + var logic = new Logic(this, dataQueue, downstreamStatus, dispatcherId); return new LogicAndMaterializedValue(logic, new OutputStreamAdapter(dataQueue, downstreamStatus, logic, _writeTimeout)); } diff --git a/src/core/Akka.Streams/Implementation/Sinks.cs b/src/core/Akka.Streams/Implementation/Sinks.cs index beb770fdb36..a2794225ff5 100644 --- a/src/core/Akka.Streams/Implementation/Sinks.cs +++ b/src/core/Akka.Streams/Implementation/Sinks.cs @@ -140,8 +140,10 @@ public override object Create(MaterializationContext context, out IPublisher.Props(settings)); - var fanoutProcessor = ActorProcessorFactory.Create(fanoutRef); + var impl = actorMaterializer.ActorOf(context, FanoutProcessorImpl.Props(settings)); + var fanoutProcessor = new ActorProcessor(impl); + impl.Tell(new ExposedPublisher(fanoutProcessor)); + // Resolve cyclic dependency with actor. This MUST be the first message no matter what. materializer = fanoutProcessor; return fanoutProcessor; } diff --git a/src/core/Akka.Streams/Implementation/Stages/Stages.cs b/src/core/Akka.Streams/Implementation/Stages/Stages.cs index b0c81483ba9..5068aba04e4 100644 --- a/src/core/Akka.Streams/Implementation/Stages/Stages.cs +++ b/src/core/Akka.Streams/Implementation/Stages/Stages.cs @@ -54,6 +54,7 @@ internal static class DefaultAttributes public static readonly Attributes Processor = Attributes.CreateName("processor"); public static readonly Attributes ProcessorWithKey = Attributes.CreateName("processorWithKey"); public static readonly Attributes IdentityOp = Attributes.CreateName("identityOp"); + public static readonly Attributes DelimiterFraming = Attributes.CreateName("delimiterFraming"); public static readonly Attributes Initial = Attributes.CreateName("initial"); public static readonly Attributes Completion = Attributes.CreateName("completion"); @@ -78,7 +79,7 @@ internal static class DefaultAttributes public static readonly Attributes UnfoldResourceSource = Attributes.CreateName("unfoldResourceSource").And(IODispatcher); public static readonly Attributes UnfoldResourceSourceAsync = Attributes.CreateName("unfoldResourceSourceAsync").And(IODispatcher); public static readonly Attributes TerminationWatcher = Attributes.CreateName("terminationWatcher"); - public static readonly Attributes Delay = Attributes.CreateName("delay").And(new Attributes.InputBuffer(16, 16)); + public static readonly Attributes Delay = Attributes.CreateName("delay"); public static readonly Attributes ZipN = Attributes.CreateName("zipN"); public static readonly Attributes ZipWithN = Attributes.CreateName("zipWithN"); @@ -119,19 +120,6 @@ internal static class DefaultAttributes public static readonly Attributes FileSink = Attributes.CreateName("fileSink").And(IODispatcher); public static readonly Attributes SeqSink = Attributes.CreateName("seqSink"); } - - // FIXME: To be deprecated as soon as stream-of-stream operations are stages - internal interface IStageModule - { - InPort In { get; } - OutPort Out { get; } - } - - internal abstract class StageModule : FlowModule, IStageModule - { - InPort IStageModule.In => In; - OutPort IStageModule.Out => Out; - } /// /// Stage that is backed by a GraphStage but can be symbolically introspected @@ -383,61 +371,4 @@ public override ILogicAndMaterializedValue> CreateLogicAndMaterialized public override string ToString() => "LastOrDefaultStage"; } - - // FIXME: These are not yet proper stages, therefore they use the deprecated StageModule infrastructure - - internal interface IGroupBy - { - int MaxSubstreams { get; } - Func Extractor { get; } - } - - internal sealed class GroupBy : StageModule>, IGroupBy - { - private readonly Func _extractor; - private readonly Func _extractorWrapper; - - public GroupBy(int maxSubstreams, Func extractor, Attributes attributes = null) - { - MaxSubstreams = maxSubstreams; - _extractor = extractor; - _extractorWrapper = _ => _extractor((TIn) _); - Attributes = attributes ?? DefaultAttributes.GroupBy; - - Label = $"GroupBy({maxSubstreams})"; - } - - public int MaxSubstreams { get; } - - public Func Extractor => _extractor; - - Func IGroupBy.Extractor => _extractorWrapper; - - public override IModule CarbonCopy() => new GroupBy(MaxSubstreams, Extractor, Attributes); - - public override Attributes Attributes { get; } - - public override IModule WithAttributes(Attributes attributes) - => new GroupBy(MaxSubstreams, Extractor, attributes); - - protected override string Label { get; } - } - - internal sealed class DirectProcessor : StageModule - { - public readonly Func, object>> ProcessorFactory; - - public DirectProcessor(Func, object>> processorFactory, Attributes attributes = null) - { - ProcessorFactory = processorFactory; - Attributes = attributes ?? DefaultAttributes.Processor; - } - - public override IModule CarbonCopy() => new DirectProcessor(ProcessorFactory, Attributes); - - public override Attributes Attributes { get; } - - public override IModule WithAttributes(Attributes attributes) - => new DirectProcessor(ProcessorFactory, attributes); - } } \ No newline at end of file diff --git a/src/core/Akka.Streams/Implementation/StreamLayout.cs b/src/core/Akka.Streams/Implementation/StreamLayout.cs index 3208a7fb0f6..eb413dc9130 100644 --- a/src/core/Akka.Streams/Implementation/StreamLayout.cs +++ b/src/core/Akka.Streams/Implementation/StreamLayout.cs @@ -13,6 +13,7 @@ using Akka.Pattern; using Akka.Streams.Dsl; using Akka.Streams.Implementation.Fusing; +using Akka.Streams.Implementation.Stages; using Akka.Streams.Util; using Akka.Util; using Reactive.Streams; @@ -567,8 +568,7 @@ public virtual IModule Nest() public virtual IImmutableDictionary Upstreams => ImmutableDictionary.Empty; - public virtual StreamLayout.IMaterializedValueNode MaterializedValueComputation => new StreamLayout.Atomic(this) - ; + public virtual StreamLayout.IMaterializedValueNode MaterializedValueComputation => new StreamLayout.Atomic(this); public abstract Shape Shape { get; } @@ -599,8 +599,7 @@ private EmptyModule() public override bool IsRunnable => false; - public override StreamLayout.IMaterializedValueNode MaterializedValueComputation => StreamLayout.Ignore.Instance - ; + public override StreamLayout.IMaterializedValueNode MaterializedValueComputation => StreamLayout.Ignore.Instance; public override IModule ReplaceShape(Shape shape) { @@ -610,11 +609,28 @@ public override IModule ReplaceShape(Shape shape) throw new NotSupportedException("Cannot replace the shape of empty module"); } - public override IModule Compose(IModule other) => other; + public override IModule Compose(IModule other) => Compose(other, Keep.Left); public override IModule Compose(IModule other, Func matFunc) { - throw new NotSupportedException("It is invalid to combine materialized value with EmptyModule"); + if (Keep.IsRight(matFunc)) + return other; + + if (Keep.IsLeft(matFunc)) + { + // If "that" has a fully ignorable materialized value, we ignore it, otherwise we keep the side effect and + // explicitly map to NotUsed + var materialized = + IgnorableMaterializedValueComposites.Apply(other) + ? (StreamLayout.IMaterializedValueNode) StreamLayout.Ignore.Instance + : new StreamLayout.Transform(_ => NotUsed.Instance, other.MaterializedValueComputation); + + return new CompositeModule(other.IsSealed ? ImmutableArray.Create(other) : other.SubModules, other.Shape, + other.Downstreams, other.Upstreams, materialized, IsSealed ? Attributes.None : Attributes); + } + + throw new NotSupportedException( + "It is invalid to combine materialized value with EmptyModule except with Keep.Left or Keep.Right"); } public override IModule Nest() => this; @@ -1026,6 +1042,9 @@ private void EstablishSubscription(ISubscriber subscriber, ISubscription subs try { subscriber.OnSubscribe(wrapped); + // Requests will be only allowed once onSubscribe has returned to avoid reentering on an onNext before + // onSubscribe completed + wrapped.UngateDemandAndRequestBuffered(); } catch (Exception ex) { @@ -1235,12 +1254,37 @@ public void OnNext(T element) } } - private sealed class WrappedSubscription : ISubscription + private interface ISubscriptionState + { + long Demand { get; } + } + + private sealed class PassThrough : ISubscriptionState + { + public static PassThrough Instance { get; } = new PassThrough(); + + private PassThrough() { } + + public long Demand { get; } = 0; + } + + private sealed class Buffering : ISubscriptionState + { + public Buffering(long demand) + { + Demand = demand; + } + + public long Demand { get; } + } + + private sealed class WrappedSubscription : AtomicReference, ISubscription { + private static readonly Buffering NoBufferedDemand = new Buffering(0); private readonly ISubscription _real; private readonly VirtualProcessor _processor; - - public WrappedSubscription(ISubscription real, VirtualProcessor processor) + + public WrappedSubscription(ISubscription real, VirtualProcessor processor) : base(NoBufferedDemand) { _real = real; _processor = processor; @@ -1265,7 +1309,26 @@ public void Request(long n) } } else - _real.Request(n); + { + // NOTE: At this point, batched requests might not have been dispatched, i.e. this can reorder requests. + // This does not violate the Spec though, since we are a "Processor" here and although we, in reality, + // proxy downstream requests, it is virtually *us* that emit the requests here and we are free to follow + // any pattern of emitting them. + // The only invariant we need to keep is to never emit more requests than the downstream emitted so far. + + while (true) + { + var current = Value; + if (current == PassThrough.Instance) + { + _real.Request(n); + break; + } + if (!CompareAndSet(current, new Buffering(current.Demand + n))) + continue; + break; + } + } } public void Cancel() @@ -1273,6 +1336,15 @@ public void Cancel() _processor.Value = Inert.Instance; _real.Cancel(); } + + public void UngateDemandAndRequestBuffered() + { + // Ungate demand + var requests = GetAndSet(PassThrough.Instance).Demand; + // And request buffered demand + if(requests > 0) + _real.Request(requests); + } } } @@ -1683,4 +1755,50 @@ private void DoSubscribe(IUntypedPublisher publisher, object subscriberOrVirtual publisher.Subscribe(UntypedSubscriber.FromTyped(subscriberOrVirtual)); } } + + internal interface IProcessorModule + { + Inlet In { get; } + + Outlet Out { get; } + + Tuple CreateProcessor(); + } + + internal sealed class ProcessorModule : AtomicModule, IProcessorModule + { + private readonly Func, TMat>> _createProcessor; + + public ProcessorModule(Func, TMat>> createProcessor, Attributes attributes = null) + { + _createProcessor = createProcessor; + Attributes = attributes ?? DefaultAttributes.Processor; + Shape = new FlowShape((Inlet)In, (Outlet)Out); + } + + public Inlet In { get; } = new Inlet("ProcessorModule.in"); + + public Outlet Out { get; } = new Outlet("ProcessorModule.out"); + + public override Shape Shape { get; } + + public override IModule ReplaceShape(Shape shape) + { + if(shape != Shape) + throw new NotSupportedException("Cannot replace the shape of a FlowModule"); + return this; + } + + public override IModule CarbonCopy() => WithAttributes(Attributes); + + public override Attributes Attributes { get; } + + public override IModule WithAttributes(Attributes attributes) => new ProcessorModule(_createProcessor, attributes); + + public Tuple CreateProcessor() + { + var result = _createProcessor(); + return Tuple.Create(result.Item1, result.Item2); + } + } } diff --git a/src/core/Akka.Streams/Implementation/StreamOfStreamProcessors.cs b/src/core/Akka.Streams/Implementation/StreamOfStreamProcessors.cs deleted file mode 100644 index 8cdd10045d8..00000000000 --- a/src/core/Akka.Streams/Implementation/StreamOfStreamProcessors.cs +++ /dev/null @@ -1,464 +0,0 @@ -//----------------------------------------------------------------------- -// -// Copyright (C) 2015-2016 Lightbend Inc. -// Copyright (C) 2013-2016 Akka.NET project -// -//----------------------------------------------------------------------- - -using System; -using System.Collections.Generic; -using System.Linq; -using Akka.Actor; -using Akka.Event; -using Akka.Pattern; -using Akka.Util; -using Reactive.Streams; - -namespace Akka.Streams.Implementation -{ - internal abstract class MultiStreamOutputProcessor : ActorProcessorImpl, IStreamSubscriptionTimeoutSupport - { - #region internal classes - - [Serializable] - internal struct SubstreamKey : IEquatable - { - public readonly long Id; - - public SubstreamKey(long id) - { - Id = id; - } - - public bool Equals(SubstreamKey other) => Id == other.Id; - - public override bool Equals(object obj) => obj is SubstreamKey && Equals((SubstreamKey)obj); - - public override int GetHashCode() => Id.GetHashCode(); - - public static bool operator ==(SubstreamKey x, SubstreamKey y) => Equals(x, y); - - public static bool operator !=(SubstreamKey x, SubstreamKey y) => !(x == y); - } - - [Serializable] - internal struct SubstreamRequestMore : INoSerializationVerificationNeeded - { - public readonly SubstreamKey Substream; - public readonly long Demand; - - public SubstreamRequestMore(SubstreamKey substream, long demand) - { - Substream = substream; - Demand = demand; - } - } - - [Serializable] - internal struct SubstreamCancel : INoSerializationVerificationNeeded - { - public readonly SubstreamKey Substream; - - public SubstreamCancel(SubstreamKey substream) - { - Substream = substream; - } - } - - [Serializable] - internal struct SubstreamSubscribe : INoSerializationVerificationNeeded - { - public readonly SubstreamKey Substream; - public readonly ISubscriber Subscriber; - - public SubstreamSubscribe(SubstreamKey substream, ISubscriber subscriber) - { - Substream = substream; - Subscriber = subscriber; - } - } - - [Serializable] - internal struct SubstreamSubscriptionTimeout : INoSerializationVerificationNeeded - { - public readonly SubstreamKey Substream; - - public SubstreamSubscriptionTimeout(SubstreamKey substream) - { - Substream = substream; - } - } - - internal struct SubstreamSubscription : ISubscription, IEquatable - { - public readonly IActorRef Parent; - public readonly SubstreamKey Substream; - - public SubstreamSubscription(IActorRef parent, SubstreamKey substream) - { - if (ReferenceEquals(parent, null)) throw new ArgumentNullException(nameof(parent), "Parent actor ref cannot be null"); - Parent = parent; - Substream = substream; - } - - public void Request(long n) => Parent.Tell(new SubstreamRequestMore(Substream, n)); - - public void Cancel() => Parent.Tell(new SubstreamCancel(Substream)); - - public bool Equals(SubstreamSubscription other) => Substream == other.Substream && Equals(Parent, other.Parent); - - public override bool Equals(object obj) => obj is SubstreamSubscription && Equals((SubstreamSubscription)obj); - - public override int GetHashCode() - { - unchecked - { - return (Parent.GetHashCode() * 397) ^ Substream.GetHashCode(); - } - } - - public override string ToString() => "SubstreamSubscription" + GetHashCode(); - } - - internal class SubstreamOutput : SimpleOutputs, IPublisher - { - #region Internal classes - - public interface IPublisherState { } - public interface ICompletedState : IPublisherState { } - - [Serializable] - public sealed class Open : IPublisherState - { - public static readonly Open Instance = new Open(); - private Open() { } - } - - [Serializable] - public sealed class Attached : IPublisherState - { - public readonly ISubscriber Subscriber; - - public Attached(ISubscriber subscriber) - { - Subscriber = subscriber; - } - } - - [Serializable] - public sealed class Completed : ICompletedState - { - public static readonly Completed Instance = new Completed(); - private Completed() { } - } - - [Serializable] - public sealed class Cancelled : ICompletedState - { - public static readonly Cancelled Instance = new Cancelled(); - private Cancelled() { } - } - - [Serializable] - public sealed class Failed : ICompletedState - { - public readonly Exception Reason; - - public Failed(Exception reason) - { - Reason = reason; - } - } - - #endregion - - public readonly SubstreamKey Key; - public readonly ICancelable SubscriptionTimeout; - - private readonly SubstreamSubscription _subscription; - private readonly AtomicReference _state = new AtomicReference(Open.Instance); - - public SubstreamOutput(SubstreamKey key, IActorRef actor, IPump pump, ICancelable subscriptionTimeout) : base(actor, pump) - { - Key = key; - SubscriptionTimeout = subscriptionTimeout; - _subscription = new SubstreamSubscription(actor, key); - } - - public override SubReceive SubReceive { get { throw new NotSupportedException("Substream outputs are managed in a dedicated receive block"); } } - public bool IsAttached => _state.Value is Attached; - - public void EnqueueOutputDemand(long demand) - { - DownstreamDemand += demand; - Pump.Pump(); - } - - public override void Error(Exception e) - { - if (!IsDownstreamCompleted) - { - ClosePublisher(new Failed(e)); - IsDownstreamCompleted = true; - } - } - - public override void Cancel() - { - if (!IsDownstreamCompleted) - { - ClosePublisher(Cancelled.Instance); - IsDownstreamCompleted = true; - } - } - - public override void Complete() - { - if (!IsDownstreamCompleted) - { - ClosePublisher(Completed.Instance); - IsDownstreamCompleted = true; - } - } - - public void Subscribe(ISubscriber subscriber) - { - ReactiveStreamsCompliance.RequireNonNullSubscriber(subscriber); - SubscriptionTimeout.Cancel(); - - if (_state.CompareAndSet(Open.Instance, new Attached(subscriber))) - Actor.Tell(new SubstreamSubscribe(Key, subscriber)); - else - { - var value = _state.Value; - if (value is Attached || value is Cancelled) - ReactiveStreamsCompliance.RejectAdditionalSubscriber(subscriber, "Substream publisher"); - else if (value is ICompletedState) - { - ReactiveStreamsCompliance.TryOnSubscribe(subscriber, CancelledSubscription.Instance); - CloseSubscriber(subscriber, (ICompletedState)value); - } - else if (value is Open) - throw new IllegalStateException("Publisher cannot become open after being used before"); - } - } - - public void AttachSubscriber(ISubscriber subscriber) - { - if (Subscriber == null) - { - Subscriber = UntypedSubscriber.FromTyped(subscriber); - ReactiveStreamsCompliance.TryOnSubscribe(subscriber, _subscription); - } - else - ReactiveStreamsCompliance.RejectAdditionalSubscriber(subscriber, "Substream publisher"); - } - - private void ClosePublisher(ICompletedState withState) - { - SubscriptionTimeout.Cancel(); - var prev = _state.Value; - _state.Value = withState; - - if (prev is ICompletedState) - throw new IllegalStateException("Attempted to double shutdown publisher"); - - if (prev is Attached) - { - var sub = ((Attached)prev).Subscriber; - if (Subscriber == null) - ReactiveStreamsCompliance.TryOnSubscribe(sub, CancelledSubscription.Instance); - CloseSubscriber(sub, withState); - } - } - - private void CloseSubscriber(ISubscriber subscriber, ICompletedState withState) - { - var f = withState as Failed; - if (withState is Completed) - ReactiveStreamsCompliance.TryOnComplete(subscriber); - else if (f != null && !(f.Reason is ISpecViolation)) - ReactiveStreamsCompliance.TryOnError(subscriber, f.Reason); - } - } - - #endregion - - // stream keys will be removed from this map on cancellation/subscription-timeout, never assume a key is present - private readonly IDictionary _substreamOutputs = new Dictionary(); - private long _nextId; - - protected MultiStreamOutputProcessor(ActorMaterializerSettings settings) : base(settings) { } - - private ILoggingAdapter _log; - protected new ILoggingAdapter Log => _log ?? (_log = Context.GetLogger()); - - public StreamSubscriptionTimeoutSettings SubscriptionTimeoutSettings => Settings.SubscriptionTimeoutSettings; - - protected override void Fail(Exception e) - { - FailOutputs(e); - base.Fail(e); - } - - public override void PumpFinished() - { - FinishOutputs(); - base.PumpFinished(); - } - - protected override bool ActiveReceive(object message) - { - return PrimaryInputs.SubReceive.CurrentReceive(message) - || PrimaryOutputs.SubReceive.CurrentReceive(message) - || OutputSubstreamManagement(message); - } - - #region MultiStreamOutputProcessorLike - - protected long NextId() => (++_nextId); - - protected SubstreamOutput CreateSubstreamOutput() - { - var id = new SubstreamKey(NextId()); - var cancellable = ScheduleSubscriptionTimeout(Self, new SubstreamSubscriptionTimeout(id)); - var output = new SubstreamOutput(id, Self, this, cancellable); - _substreamOutputs.Add(output.Key, output); - return output; - } - - protected virtual void InvalidateSubstreamOutput(SubstreamKey substream) - { - CancelSubstreamOutput(substream); - Pump(); - } - - protected void CancelSubstreamOutput(SubstreamKey substream) - { - SubstreamOutput output; - if (_substreamOutputs.TryGetValue(substream, out output)) - { - output.Cancel(); - _substreamOutputs.Remove(substream); - } - } - - protected void CompleteSubstreamOutput(SubstreamKey substream) - { - SubstreamOutput output; - if (_substreamOutputs.TryGetValue(substream, out output)) - { - output.Complete(); - _substreamOutputs.Remove(substream); - } - } - - protected void FailOutputs(Exception cause) - { - foreach (var output in _substreamOutputs.Values) - output.Error(cause); - } - - protected void FinishOutputs() - { - foreach (var output in _substreamOutputs.Values) - output.Complete(); - } - - public bool OutputSubstreamManagement(object message) - { - return message.Match() - .With(request => - { - SubstreamOutput output; - if (_substreamOutputs.TryGetValue(request.Substream, out output)) - { - if (request.Demand < 1) - output.Error(ReactiveStreamsCompliance.NumberOfElementsInRequestMustBePositiveException); - else - output.EnqueueOutputDemand(request.Demand); - } - }) - .With(subscribe => - { - SubstreamOutput output; - if (_substreamOutputs.TryGetValue(subscribe.Substream, out output)) - output.AttachSubscriber(subscribe.Subscriber); - }) - .With(timeout => - { - SubstreamOutput output; - if (_substreamOutputs.TryGetValue(timeout.Substream, out output) && !output.IsAttached) - SubscriptionTimedOut(UntypedPublisher.FromTyped(output)); - }) - .With(cancel => InvalidateSubstreamOutput(cancel.Substream)) - .WasHandled; - } - - #endregion - - #region StreamSubscriptionTimeoutSupport - - public ICancelable ScheduleSubscriptionTimeout(IActorRef actorRef, object message) - { - return SubscriptionTimeoutSettings.Mode == StreamSubscriptionTimeoutTerminationMode.NoopTermination - ? NoopSubscriptionTimeout.Instance - : Context.System.Scheduler.ScheduleTellOnceCancelable(SubscriptionTimeoutSettings.Timeout, actorRef, message, Self); - } - - public void SubscriptionTimedOut(IUntypedPublisher target) - { - switch (SubscriptionTimeoutSettings.Mode) - { - case StreamSubscriptionTimeoutTerminationMode.CancelTermination: Cancel(target, SubscriptionTimeoutSettings.Timeout); break; - case StreamSubscriptionTimeoutTerminationMode.WarnTermination: Warn(target, SubscriptionTimeoutSettings.Timeout); break; - } - } - - /// - /// Callback that should ensure that the target is canceled with the given cause. - /// - public void HandleSubscriptionTimeout(IUntypedPublisher target, Exception cause) - { - SubstreamOutput output; - if ((output = UntypedPublisher.ToTyped(target) as SubstreamOutput) != null) - { - output.Error(cause); - output.AttachSubscriber(CancelingSubscriber.Instance); - } - } - - private void Cancel(IUntypedPublisher target, TimeSpan timeout) - { - var typedTarget = UntypedPublisher.ToTyped(target); - if ( - typedTarget.GetType() - .GetInterfaces() - .Any(i => i.IsGenericType && i.GetGenericTypeDefinition() == typeof(IProcessor<,>))) - { - if (Log.IsDebugEnabled) - Log.Debug( - $"Cancelling {typedTarget} Processor's publisher and subscriber sides (after {timeout.TotalMilliseconds} ms)"); - HandleSubscriptionTimeout(target, new SubscriptionTimeoutException( - $"Publisher was not attached to upstream within deadline {timeout.TotalMilliseconds} ms")); - - } - else - if (Log.IsDebugEnabled) - Log.Debug( - $"Cancelling {typedTarget} (after {timeout.TotalMilliseconds} ms)"); - HandleSubscriptionTimeout(target, new SubscriptionTimeoutException( - $"Publisher {typedTarget} you are trying to subscribe to has been shut-down because exceeing its subscription timeout")); - } - - private void Warn(IUntypedPublisher target, TimeSpan timeout) - { - var typedTarget = UntypedPublisher.ToTyped(target); - Log.Warning( - "Timed out {0} detected (after {1})! You should investigate if you either cancel or consume all {2} instances", - typedTarget, timeout, typedTarget.GetType().Name); - } - - #endregion - } -} \ No newline at end of file diff --git a/src/core/Akka.Streams/KillSwitch.cs b/src/core/Akka.Streams/KillSwitch.cs index 87c9762ed24..4584d78c82c 100644 --- a/src/core/Akka.Streams/KillSwitch.cs +++ b/src/core/Akka.Streams/KillSwitch.cs @@ -246,7 +246,7 @@ internal UniqueKillSwitch(TaskCompletionSource promise) /// A is a provider for s of that can be completed or failed from the outside. /// /// A returned by the switch can be materialized arbitrary amount of times: every newly materialized - /// belongs to the switch from which it was aquired.Multiple instances are isolated from each other, + /// belongs to the switch from which it was acquired. Multiple instances are isolated from each other, /// shutting down or aborting on instance does not affect the s provided by another instance. /// /// @@ -338,7 +338,7 @@ internal SharedKillSwitch(string name) public void Abort(Exception cause) => _shutdownPromise.TrySetException(cause); /// - /// Retrurns a typed Flow of a requested type that will be linked to this instance. By invoking + /// Returns a typed Flow of a requested type that will be linked to this instance. By invoking /// or all running instances of all provided s by this /// switch will be stopped normally or failed. ///