Skip to content

Commit

Permalink
Stream update 2.4.8 (#2333)
Browse files Browse the repository at this point in the history
* Update Streams to 2.4.8

* Fix log messages

* Fix failure message
  • Loading branch information
Marc Piechura authored and alexvaluyskiy committed Oct 6, 2016
1 parent aa913c2 commit 8f35fdc
Show file tree
Hide file tree
Showing 44 changed files with 2,473 additions and 501 deletions.
1,216 changes: 1,213 additions & 3 deletions src/core/Akka.API.Tests/CoreAPISpec.ApproveStreams.approved.txt

Large diffs are not rendered by default.

27 changes: 23 additions & 4 deletions src/core/Akka.Streams.Tests/Actor/ActorPublisherSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,15 @@
using Akka.Pattern;
using Akka.Streams.Actors;
using Akka.Streams.Dsl;
using Akka.Streams.Implementation;
using Akka.Streams.TestKit;
using Akka.Streams.TestKit.Tests;
using Akka.TestKit;
using FluentAssertions;
using Xunit;
using Xunit.Abstractions;
using ActorPublisher = Akka.Streams.Actors.ActorPublisher;
using Cancel = Akka.Streams.Actors.Cancel;

namespace Akka.Streams.Tests.Actor
{
Expand Down Expand Up @@ -247,7 +250,23 @@ public void ActorPublisher_should_only_allow_one_subscriber()
s.ExpectSubscription();
var s2 = this.CreateManualSubscriberProbe<string>();
ActorPublisher.Create<string>(actorRef).Subscribe(s2);
s2.ExpectSubscriptionAndError().Should().BeOfType<IllegalStateException>();
s2.ExpectSubscriptionAndError()
.Should()
.BeOfType<IllegalStateException>()
.Which.Message.Should()
.Be($"ActorPublisher {ReactiveStreamsCompliance.SupportsOnlyASingleSubscriber}");
}

[Fact]
public void ActorPublisher_should_not_subscribe_the_same_subscriber_multiple_times()
{
var probe = CreateTestProbe();
var actorRef = Sys.ActorOf(TestPublisher.Props(probe.Ref));
var s = this.CreateManualSubscriberProbe<string>();
ActorPublisher.Create<string>(actorRef).Subscribe(s);
s.ExpectSubscription();
ActorPublisher.Create<string>(actorRef).Subscribe(s);
s.ExpectError().Message.Should().Be(ReactiveStreamsCompliance.CanNotSubscribeTheSameSubscriberMultipleTimes);
}

[Fact]
Expand Down Expand Up @@ -454,7 +473,7 @@ public void ActorPublisher_should_handle_stash()
}
}

internal class TestPublisher : ActorPublisher<string>
internal class TestPublisher : Actors.ActorPublisher<string>
{
public static Props Props(IActorRef probe, bool useTestDispatcher = true)
{
Expand Down Expand Up @@ -512,7 +531,7 @@ protected override bool Receive(object message)
public IStash Stash { get; set; }
}

internal class Sender : ActorPublisher<int>
internal class Sender : Actors.ActorPublisher<int>
{
public static Props Props { get; } = Props.Create<Sender>().WithDispatcher("akka.test.stream-dispatcher");

Expand Down Expand Up @@ -559,7 +578,7 @@ private void DeliverBuffer()
}
}

internal class TimeoutingPublisher : ActorPublisher<int>
internal class TimeoutingPublisher : Actors.ActorPublisher<int>
{
public static Props Props(IActorRef probe, TimeSpan timeout) =>
Akka.Actor.Props.Create(() => new TimeoutingPublisher(probe, timeout))
Expand Down
1 change: 1 addition & 0 deletions src/core/Akka.Streams.Tests/Akka.Streams.Tests.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@
<Compile Include="Dsl\GraphZipNSpec.cs" />
<Compile Include="Dsl\GraphZipWithNSpec.cs" />
<Compile Include="Dsl\GraphZipWithSpec.cs" />
<Compile Include="Dsl\LazySinkSpec.cs" />
<Compile Include="Dsl\LiftExtensions.cs" />
<Compile Include="Dsl\PublisherSinkSpec.cs" />
<Compile Include="Dsl\QueueSinkSpec.cs" />
Expand Down
36 changes: 36 additions & 0 deletions src/core/Akka.Streams.Tests/Dsl/FlowGroupBySpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
using Akka.Streams.Dsl;
using Akka.Streams.Dsl.Internal;
using Akka.Streams.Implementation;
using Akka.Streams.Implementation.Fusing;
using Akka.Streams.Supervision;
using Akka.Streams.TestKit;
using Akka.Streams.TestKit.Tests;
Expand Down Expand Up @@ -481,6 +482,41 @@ public void GroupBy_must_work_under_fuzzing_stress_test()
}, Materializer);
}

[Fact]
public void GroupBy_must_Work_if_pull_is_exercised_from_both_substream_and_main()
{
this.AssertAllStagesStopped(() =>
{
var upstream = this.CreatePublisherProbe<int>();
var downstreamMaster = this.CreateSubscriberProbe<Source<int, NotUsed>>();

Source.FromPublisher(upstream)
.Via(new GroupBy<int, bool>(2, element => element == 0))
.RunWith(Sink.FromSubscriber(downstreamMaster), Materializer);

var substream = this.CreateSubscriberProbe<int>();

downstreamMaster.Request(1);
upstream.SendNext(1);
downstreamMaster.ExpectNext().RunWith(Sink.FromSubscriber(substream), Materializer);

// Read off first buffered element from subsource
substream.Request(1);
substream.ExpectNext(1);

// Both will attempt to pull upstream
substream.Request(1);
substream.ExpectNoMsg(TimeSpan.FromMilliseconds(100));
downstreamMaster.Request(1);
downstreamMaster.ExpectNoMsg(TimeSpan.FromMilliseconds(100));

// Cleanup, not part of the actual test
substream.Cancel();
downstreamMaster.Cancel();
upstream.SendComplete();
}, Materializer);
}

[Fact]
public void GroupBy_must_work_with_random_demand()
{
Expand Down
22 changes: 22 additions & 0 deletions src/core/Akka.Streams.Tests/Dsl/FlowScanSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -130,5 +130,27 @@ public void A_Scan_must_resume_properly()
.ToStrict(TimeSpan.FromSeconds(1))
.ShouldAllBeEquivalentTo(new[] {0, 1, 4, 9, 16});
}

[Fact]
public void A_Scan_must_scan_normally_for_empty_source()
{
Source.Empty<int>()
.Scan(0, (i, i1) => i + i1)
.RunWith(this.SinkProbe<int>(), Materializer)
.Request(2)
.ExpectNext(0)
.ExpectComplete();
}

[Fact]
public void A_Scan_must_fail_when_upstream_failed()
{
var cause = new TestException("");
Source.Failed<int>(cause)
.Scan(0, (i, i1) => i + i1)
.RunWith(this.SinkProbe<int>(), Materializer)
.Request(2)
.ExpectError().Should().Be(cause);
}
}
}
242 changes: 242 additions & 0 deletions src/core/Akka.Streams.Tests/Dsl/LazySinkSpec.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,242 @@
//-----------------------------------------------------------------------
// <copyright file="LazySinkSpec.cs" company="Akka.NET Project">
// Copyright (C) 2015-2016 Lightbend Inc. <http://www.lightbend.com>
// Copyright (C) 2013-2016 Akka.NET project <https://github.com/akkadotnet/akka.net>
// </copyright>
//-----------------------------------------------------------------------

using System;
using System.Linq;
using System.Threading.Tasks;
using Akka.Streams.Dsl;
using Akka.Streams.Supervision;
using Akka.Streams.TestKit;
using Akka.Streams.TestKit.Tests;
using Akka.TestKit;
using Akka.Util.Internal;
using FluentAssertions;
using Xunit;
using Xunit.Abstractions;

namespace Akka.Streams.Tests.Dsl
{
public class LazySinkSpec : AkkaSpec
{
public LazySinkSpec(ITestOutputHelper helper) : base(helper)
{
var settings = ActorMaterializerSettings.Create(Sys).WithInputBuffer(1,1);
Materializer = Sys.Materializer(settings);
}

private ActorMaterializer Materializer { get; }

private static Func<TMat> Fallback<TMat>()
{
return () =>
{
Assert.True(false, "Must not call fallback function");
return default(TMat);
};
}

private static readonly Exception Ex = new TestException("");

[Fact]
public void A_LazySink_must_work_in_the_happy_case()
{
this.AssertAllStagesStopped(() =>
{
var lazySink = Sink.LazySink((int _) => Task.FromResult(this.SinkProbe<int>()),
Fallback<TestSubscriber.Probe<int>>());
var taskProbe = Source.From(Enumerable.Range(0, 11)).RunWith(lazySink, Materializer);
var probe = taskProbe.AwaitResult(TimeSpan.FromMilliseconds(300));
probe.Request(100);
Enumerable.Range(0, 11).ForEach(i => probe.ExpectNext(i));
}, Materializer);
}

[Fact]
public void A_LazySink_must_work_with_slow_sink_init()
{
this.AssertAllStagesStopped(() =>
{
var p = new TaskCompletionSource<Sink<int, TestSubscriber.Probe<int>>>();
var sourceProbe = this.CreateManualPublisherProbe<int>();
var taskProbe = Source.FromPublisher(sourceProbe)
.RunWith(Sink.LazySink((int _) => p.Task, Fallback<TestSubscriber.Probe<int>>()), Materializer);

var sourceSub = sourceProbe.ExpectSubscription();
sourceSub.ExpectRequest(1);
sourceSub.SendNext(0);
sourceSub.ExpectRequest(1);
sourceProbe.ExpectNoMsg(TimeSpan.FromMilliseconds(200));
taskProbe.Wait(TimeSpan.FromMilliseconds(200)).ShouldBeFalse();

p.SetResult(this.SinkProbe<int>());
var probe = taskProbe.AwaitResult(TimeSpan.FromMilliseconds(300));
probe.Request(100);
probe.ExpectNext(0);
Enumerable.Range(1,10).ForEach(i =>
{
sourceSub.SendNext(i);
probe.ExpectNext(i);
});
sourceSub.SendComplete();
}, Materializer);
}

[Fact]
public void A_LazySink_must_complete_when_there_was_no_elements_in_stream()
{
this.AssertAllStagesStopped(() =>
{
var lazySink = Sink.LazySink((int _) => Task.FromResult(Sink.Aggregate(0, (int i, int i2) => i + i2)),
() => Task.FromResult(0));
var taskProbe = Source.Empty<int>().RunWith(lazySink, Materializer);
var taskResult = taskProbe.AwaitResult(TimeSpan.FromMilliseconds(300));
taskResult.AwaitResult(TimeSpan.FromMilliseconds(300)).ShouldBe(0);
}, Materializer);
}

[Fact]
public void A_LazySink_must_complete_normally_when_upstream_is_completed()
{
this.AssertAllStagesStopped(() =>
{
var lazySink = Sink.LazySink((int _) => Task.FromResult(this.SinkProbe<int>()),
Fallback<TestSubscriber.Probe<int>>());
var taskProbe = Source.Single(1).RunWith(lazySink, Materializer);
var taskResult = taskProbe.AwaitResult(TimeSpan.FromMilliseconds(300));
taskResult.Request(1).ExpectNext(1).ExpectComplete();
}, Materializer);
}

[Fact]
public void A_LazySink_must_fail_gracefully_when_sink_factory_method_failed()
{
this.AssertAllStagesStopped(() =>
{
var sourceProbe = this.CreateManualPublisherProbe<int>();
var taskProbe = Source.FromPublisher(sourceProbe).RunWith(Sink.LazySink((int _) =>
{
throw Ex;
}, Fallback<TestSubscriber.Probe<int>>()), Materializer);

var sourceSub = sourceProbe.ExpectSubscription();
sourceSub.ExpectRequest(1);
sourceSub.SendNext(0);
sourceSub.ExpectCancellation();
taskProbe.Invoking(t => t.Wait()).ShouldThrow<TestException>();
}, Materializer);
}

[Fact]
public void A_LazySink_must_fail_gracefully_when_upstream_failed()
{
this.AssertAllStagesStopped(() =>
{
var sourceProbe = this.CreateManualPublisherProbe<int>();
var lazySink = Sink.LazySink((int _) => Task.FromResult(this.SinkProbe<int>()),
Fallback<TestSubscriber.Probe<int>>());
var taskProbe = Source.FromPublisher(sourceProbe).RunWith(lazySink, Materializer);

var sourceSub = sourceProbe.ExpectSubscription();
sourceSub.ExpectRequest(1);
sourceSub.SendNext(0);
var probe = taskProbe.AwaitResult(TimeSpan.FromMilliseconds(300));
probe.Request(1).ExpectNext(0);
sourceSub.SendError(Ex);
probe.ExpectError().Should().Be(Ex);
}, Materializer);
}

[Fact]
public void A_LazySink_must_fail_gracefully_when_factory_task_failed()
{
this.AssertAllStagesStopped(() =>
{
var failedTask = new TaskFactory<Sink<int, TestSubscriber.Probe<int>>>().StartNew(() =>
{
throw Ex;
});
var sourceProbe = this.CreateManualPublisherProbe<int>();
var lazySink = Sink.LazySink((int _) => failedTask, Fallback<TestSubscriber.Probe<int>>());
var graph =
Source.FromPublisher(sourceProbe)
.ToMaterialized(lazySink, Keep.Right)
.WithAttributes(ActorAttributes.CreateSupervisionStrategy(Deciders.StoppingDecider));
var taskProbe = RunnableGraph.FromGraph(graph).Run(Materializer);

var sourceSub = sourceProbe.ExpectSubscription();
sourceSub.ExpectRequest(1);
sourceSub.SendNext(0);
taskProbe.Invoking(t => t.Wait(TimeSpan.FromMilliseconds(300))).ShouldThrow<TestException>();
}, Materializer);
}

[Fact]
public void A_LazySink_must_cancel_upstream_when_internal_sink_is_cancelled()
{
this.AssertAllStagesStopped(() =>
{
var sourceProbe = this.CreateManualPublisherProbe<int>();
var lazySink = Sink.LazySink((int _) => Task.FromResult(this.SinkProbe<int>()),
Fallback<TestSubscriber.Probe<int>>());
var taskProbe = Source.FromPublisher(sourceProbe).RunWith(lazySink, Materializer);
var sourceSub = sourceProbe.ExpectSubscription();
sourceSub.ExpectRequest(1);
sourceSub.SendNext(0);
sourceSub.ExpectRequest(1);
var probe = taskProbe.AwaitResult(TimeSpan.FromMilliseconds(300));
probe.Request(1).ExpectNext(0);
probe.Cancel();
sourceSub.ExpectCancellation();
}, Materializer);
}

[Fact]
public void A_LazySink_must_contine_if_supervision_is_resume()
{
this.AssertAllStagesStopped(() =>
{
var sourceProbe = this.CreateManualPublisherProbe<int>();
var lazySink = Sink.LazySink((int a) =>
{
if (a == 0)
throw Ex;
return Task.FromResult(this.SinkProbe<int>());
},
Fallback<TestSubscriber.Probe<int>>());
var graph =
Source.FromPublisher(sourceProbe)
.ToMaterialized(lazySink, Keep.Right)
.WithAttributes(ActorAttributes.CreateSupervisionStrategy(Deciders.ResumingDecider));
var taskProbe = RunnableGraph.FromGraph(graph).Run(Materializer);
var sourceSub = sourceProbe.ExpectSubscription();
sourceSub.ExpectRequest(1);
sourceSub.SendNext(0);
sourceSub.ExpectRequest(1);
sourceSub.SendNext(1);
var probe = taskProbe.AwaitResult(TimeSpan.FromMilliseconds(300));
probe.Request(1);
probe.ExpectNext(1);
probe.Cancel();
}, Materializer);
}

[Fact]
public void A_LazySink_must_fail_task_when_zero_throws_expception()
{
this.AssertAllStagesStopped(() =>
{
var lazySink = Sink.LazySink((int _) => Task.FromResult(Sink.Aggregate<int, int>(0, (i, i1) => i + i1)),
() =>
{
throw Ex;
});
var taskProbe = Source.Empty<int>().RunWith(lazySink, Materializer);
taskProbe.Invoking(t => t.Wait(TimeSpan.FromMilliseconds(300))).ShouldThrow<TestException>();
}, Materializer);
}
}
}
Loading

0 comments on commit 8f35fdc

Please sign in to comment.