Skip to content

Commit

Permalink
Merge pull request akkadotnet#2301 from Silv3rcircl3/streams_update_2…
Browse files Browse the repository at this point in the history
….4.4

Update Streams to 2.4.4.
  • Loading branch information
Aaronontheweb authored Sep 7, 2016
2 parents 62cbc56 + 8a8cb42 commit 3db8e49
Show file tree
Hide file tree
Showing 15 changed files with 336 additions and 91 deletions.
57 changes: 57 additions & 0 deletions src/core/Akka.Streams.Tests/Actor/ActorPublisherSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -434,6 +434,24 @@ public void ActorPublisher_should_use_dispatcher_from_props()
actorRef.Tell(ThreadName.Instance);
ExpectMsg<string>().Should().Contain("my-dispatcher1");
}

[Fact]
public void ActorPublisher_should_handle_stash()
{
var probe = this.CreateTestProbe();
var actorRef = Sys.ActorOf(TestPublisherWithStash.Props(probe.Ref));
var p = new ActorPublisherImpl<string>(actorRef);
var s = this.CreateProbe<string>();
p.Subscribe(s);
s.Request(2);
s.Request(3);
actorRef.Tell("unstash");
probe.ExpectMsg(new TotalDemand(5));
probe.ExpectMsg(new TotalDemand(5));
s.Request(4);
probe.ExpectMsg(new TotalDemand(9));
s.Cancel();
}
}

internal class TestPublisher : ActorPublisher<string>
Expand Down Expand Up @@ -466,6 +484,34 @@ protected override bool Receive(object message)
}
}

internal class TestPublisherWithStash : TestPublisher, IWithUnboundedStash
{
public TestPublisherWithStash(IActorRef probe) : base(probe)
{
}

public new static Props Props(IActorRef probe, bool useTestDispatcher = true)
{
var p = Akka.Actor.Props.Create(() => new TestPublisherWithStash(probe));
return useTestDispatcher ? p.WithDispatcher("akka.test.stream-dispatcher") : p;
}

protected override bool Receive(object message)
{
if ("unstash".Equals(message))
{
Stash.UnstashAll();
Context.Become(base.Receive);
}
else
Stash.Stash();

return true;
}

public IStash Stash { get; set; }
}

internal class Sender : ActorPublisher<int>
{
public static Props Props { get; } = Props.Create<Sender>().WithDispatcher("akka.test.stream-dispatcher");
Expand Down Expand Up @@ -571,6 +617,17 @@ public TotalDemand(long elements)
{
Elements = elements;
}

public override bool Equals(object obj)
{
if (ReferenceEquals(null, obj)) return false;
if (ReferenceEquals(this, obj)) return true;
return obj.GetType() == GetType() && Equals((TotalDemand) obj);
}

protected bool Equals(TotalDemand other) => Elements == other.Elements;

public override int GetHashCode() => Elements.GetHashCode();
}

internal class Produce
Expand Down
12 changes: 12 additions & 0 deletions src/core/Akka.Streams.Tests/Dsl/FlowGroupedWithinSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

using System;
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Linq;
using Akka.Streams.Dsl;
using Akka.Streams.TestKit;
Expand Down Expand Up @@ -240,5 +241,16 @@ public void A_GroupedWithin_must_group_with_rest()
RandomTestRange(Sys)
.ForEach(_ => RunScript(script(), Settings, flow => flow.GroupedWithin(3, TimeSpan.FromMinutes(10))));
}

[Fact]
public void A_GroupedWithin_must_group_with_small_groups_with_backpressure()
{
var t = Source.From(Enumerable.Range(1, 10))
.GroupedWithin(1, TimeSpan.FromDays(1))
.Throttle(1, TimeSpan.FromMilliseconds(110), 0, ThrottleMode.Shaping)
.RunWith(Sink.Seq<IEnumerable<int>>(), Materializer);
t.Wait(TimeSpan.FromSeconds(3)).Should().BeTrue();
t.Result.ShouldAllBeEquivalentTo(Enumerable.Range(1, 10).Select(i => new List<int> {i}));
}
}
}
2 changes: 1 addition & 1 deletion src/core/Akka.Streams.Tests/Dsl/FlowSelectAsyncSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ public void A_Flow_with_SelectAsync_must_finish_after_task_failure()
.Grouped(10)
.RunWith(Sink.First<IEnumerable<int>>(), Materializer);

t.Wait(TimeSpan.FromSeconds(1)).Should().BeTrue();
t.Wait(TimeSpan.FromSeconds(3)).Should().BeTrue();
t.Result.ShouldAllBeEquivalentTo(new[] {1, 2});
}, Materializer);
}
Expand Down
43 changes: 43 additions & 0 deletions src/core/Akka.Streams.Tests/Dsl/FlowSumSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

namespace Akka.Streams.Tests.Dsl
{
//JVMN : FlowReduceSpec
public class FlowSumSpec : AkkaSpec
{
private ActorMaterializer Materializer { get; }
Expand Down Expand Up @@ -128,5 +129,47 @@ public void A_Sum_must_complete_task_with_failure_when_reducing_function_throws(
task.Invoking(t => t.Wait(TimeSpan.FromSeconds(3))).ShouldThrow<TestException>().WithMessage("test");
}, Materializer);
}

[Fact]
public void A_Sum_must_fail_on_Empty_stream_using_Source_RunSum()
{
this.AssertAllStagesStopped(() =>
{
var result = Source.Empty<int>().RunSum((i, i1) => i + i1, Materializer);
result.Invoking(t => t.Wait(TimeSpan.FromSeconds(3)))
.ShouldThrow<NoSuchElementException>()
.And.Message.Should()
.Contain("empty stream");
}, Materializer);
}

[Fact]
public void A_Sum_must_fail_on_Empty_stream_using_Flow_Sum()
{
this.AssertAllStagesStopped(() =>
{
var result = Source.Empty<int>()
.Via(SumFlow)
.RunWith(Sink.Aggregate<int, int>(0, (i, i1) => i + i1), Materializer);
result.Invoking(t => t.Wait(TimeSpan.FromSeconds(3)))
.ShouldThrow<NoSuchElementException>()
.And.Message.Should()
.Contain("empty stream");
}, Materializer);
}

[Fact]
public void A_Sum_must_fail_on_Empty_stream_using_Sink_Sum()
{
this.AssertAllStagesStopped(() =>
{
var result = Source.Empty<int>()
.RunWith(SumSink, Materializer);
result.Invoking(t => t.Wait(TimeSpan.FromSeconds(3)))
.ShouldThrow<NoSuchElementException>()
.And.Message.Should()
.Contain("empty stream");
}, Materializer);
}
}
}
22 changes: 22 additions & 0 deletions src/core/Akka.Streams.Tests/Dsl/FlowThrottleSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,28 @@ public void Throttle_for_single_cost_elements_must_accept_very_low_rates()
}, Materializer);
}

[Fact]
public void Throttle_for_single_cost_elements_must_()
{
var sharedThrottle = Flow.Create<int>().Throttle(1, TimeSpan.FromDays(1), 1, ThrottleMode.Enforcing);

// If there is accidental shared state then we would not be able to pass through the single element
var t = Source.Single(1)
.Via(sharedThrottle)
.Via(sharedThrottle)
.RunWith(Sink.First<int>(), Materializer);
t.Wait(TimeSpan.FromSeconds(3)).Should().BeTrue();
t.Result.Should().Be(1);

// It works with a new stream, too
t = Source.Single(2)
.Via(sharedThrottle)
.Via(sharedThrottle)
.RunWith(Sink.First<int>(), Materializer);
t.Wait(TimeSpan.FromSeconds(3)).Should().BeTrue();
t.Result.Should().Be(2);
}

[Fact]
public void Throttle_for_single_cost_elements_must_emit_single_element_per_tick()
{
Expand Down
26 changes: 26 additions & 0 deletions src/core/Akka.Streams.Tests/Dsl/GraphMatValueSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,32 @@ public void A_Graph_with_materialized_value_must_produce_NotUsed_when_starting_f
})), Keep.Right).To(Sink.Ignore<int>()).Run(materializer).Should().Be(NotUsed.Instance);
done.Should().BeTrue();
}

[Fact]
public void A_Graph_with_Identity_Flow_optimization_even_if_port_are_wired_in_an_arbitrary_higher_nesting_level()
{
var mat2 = Sys.Materializer(ActorMaterializerSettings.Create(Sys).WithAutoFusing(false));

var subFlow = GraphDsl.Create(b =>
{
var zip = b.Add(new Zip<string, string>());
var bc = b.Add(new Broadcast<string>(2));

b.From(bc.Out(0)).To(zip.In0);
b.From(bc.Out(1)).To(zip.In1);

return new FlowShape<string, Tuple<string, string>>(bc.In, zip.Out);
}).Named("NestedFlow");

var nest1 = Flow.Create<string>().Via(subFlow);
var nest2 = Flow.Create<string>().Via(nest1);
var nest3 = Flow.Create<string>().Via(nest2);
var nest4 = Flow.Create<string>().Via(nest3);

//fails
var matValue = Source.Single("").Via(nest4).To(Sink.Ignore<Tuple<string, string>>()).Run(mat2);
matValue.Should().Be(NotUsed.Instance);
}
}
}

30 changes: 30 additions & 0 deletions src/core/Akka.Streams.Tests/IO/TcpSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -464,6 +464,36 @@ public void Outgoing_TCP_stream_must_handle_when_connection_actor_terminates_une
system2.Terminate().Wait();
}

[Fact(Skip = "Fix me")]
public void Outgoing_TCP_stream_must_not_thrown_on_unbind_after_system_has_been_shut_down()
{
var sys2 = ActorSystem.Create("shutdown-test-system");
var mat2 = sys2.Materializer();

try
{
var address = TestUtils.TemporaryServerAddress();
var bindingTask = sys2.TcpStream()
.BindAndHandle(Flow.Create<ByteString>(), mat2, address.Address.ToString(), address.Port);

// Ensure server is running
var t = Source.Single(ByteString.FromString(""))
.Via(sys2.TcpStream().OutgoingConnection(address))
.RunWith(Sink.Ignore<ByteString>(), mat2);
t.Wait(TimeSpan.FromSeconds(3)).Should().BeTrue();

sys2.Terminate().Wait(TimeSpan.FromSeconds(3)).Should().BeTrue();

bindingTask.Wait(TimeSpan.FromSeconds(3)).Should().BeTrue();
var binding = bindingTask.Result;
binding.Unbind().Wait(TimeSpan.FromSeconds(3)).Should().BeTrue();
}
finally
{
sys2.Terminate().Wait(TimeSpan.FromSeconds(5));
}
}

private void ValidateServerClientCommunication(ByteString testData, ServerConnection serverConnection, TcpReadProbe readProbe, TcpWriteProbe writeProbe)
{
serverConnection.Write(testData);
Expand Down
41 changes: 33 additions & 8 deletions src/core/Akka.Streams/Actors/ActorPublisher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,21 @@ public interface IActorPublisherMessage: IDeadLetterSuppression { }
public sealed class Request : IActorPublisherMessage
{
public readonly long Count;

public Request(long count)
{
Count = count;
}

/// <summary>
/// INTERNAL API: needed for stash support
/// </summary>
internal void MarkProcessed() => IsProcessed = true;

/// <summary>
/// INTERNAL API: needed for stash support
/// </summary>
internal bool IsProcessed { get; private set; }
}

/// <summary>
Expand Down Expand Up @@ -386,16 +397,26 @@ protected override bool AroundReceive(Receive receive, object message)
if (message is Request)
{
var req = (Request) message;
if (req.Count < 1)
if (req.IsProcessed)
{
if (_lifecycleState == LifecycleState.Active)
OnError(new ArgumentException("Number of requested elements must be positive. Rule 3.9"));
// it's an unstashed Request, demand is already handled
base.AroundReceive(receive, req);
}
else
{
_demand += req.Count;
if (_demand < 0) _demand = long.MaxValue; // long overflow: effectively unbounded
base.AroundReceive(receive, message);
if (req.Count < 1)
{
if (_lifecycleState == LifecycleState.Active)
OnError(new ArgumentException("Number of requested elements must be positive. Rule 3.9"));
}
else
{
_demand += req.Count;
if (_demand < 0)
_demand = long.MaxValue; // long overflow: effectively unbounded
req.MarkProcessed();
base.AroundReceive(receive, message);
}
}
}
else if (message is Subscribe<T>)
Expand Down Expand Up @@ -435,8 +456,12 @@ protected override bool AroundReceive(Receive receive, object message)
}
else if (message is Cancel)
{
CancelSelf();
base.AroundReceive(receive, message);
if (_lifecycleState != LifecycleState.Canceled)
{
// possible to receive again in case of stash
CancelSelf();
base.AroundReceive(receive, message);
}
}
else if (message is SubscriptionTimeoutExceeded)
{
Expand Down
4 changes: 4 additions & 0 deletions src/core/Akka.Streams/Dsl/FlowOperations.cs
Original file line number Diff line number Diff line change
Expand Up @@ -451,6 +451,10 @@ public static Flow<TIn, TOut2, TMat> Aggregate<TIn, TOut1, TOut2, TMat>(this Flo
/// Similar to <see cref="Aggregate{TIn,TOut1,TOut2,TMat}"/> but uses first element as zero element.
/// Applies the given function <paramref name="reduce"/> towards its current and next value,
/// yielding the next current value.
///
/// If the stream is empty (i.e. completes before signalling any elements),
/// the sum stage will fail its downstream with a <see cref="NoSuchElementException"/>,
/// which is semantically in-line with that standard library collections do in such situations.
/// <para>
/// Emits when upstream completes
/// </para>
Expand Down
6 changes: 5 additions & 1 deletion src/core/Akka.Streams/Dsl/Sink.cs
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,11 @@ public static Sink<TIn, Task<TOut>> Aggregate<TIn, TOut>(TOut zero, Func<TOut, T
/// output (from the second element) and the element as input.
/// The returned <see cref="Task{TIn}"/> will be completed with value of the final
/// function evaluation when the input stream ends, or completed with `Failure`
/// if there is a failure signaled in the stream.
/// if there is a failure signaled in the stream.
///
/// If the stream is empty (i.e. completes before signalling any elements),
/// the sum stage will fail its downstream with a <see cref="NoSuchElementException"/>,
/// which is semantically in-line with that standard library collections do in such situations.
/// </summary>
public static Sink<TIn, Task<TIn>> Sum<TIn>(Func<TIn, TIn, TIn> reduce) => Flow.Create<TIn>()
.Sum(reduce)
Expand Down
4 changes: 4 additions & 0 deletions src/core/Akka.Streams/Dsl/SourceOperations.cs
Original file line number Diff line number Diff line change
Expand Up @@ -451,6 +451,10 @@ public static Source<TOut2, TMat> Aggregate<TOut1, TOut2, TMat>(this Source<TOut
/// Similar to <see cref="Aggregate{TIn,TOut,TMat}"/> but uses first element as zero element.
/// Applies the given function <paramref name="reduce"/> towards its current and next value,
/// yielding the next current value.
///
/// If the stream is empty (i.e. completes before signalling any elements),
/// the sum stage will fail its downstream with a <see cref="NoSuchElementException"/>,
/// which is semantically in-line with that standard library collections do in such situations.
/// <para>
/// Emits when upstream completes
/// </para>
Expand Down
4 changes: 4 additions & 0 deletions src/core/Akka.Streams/Dsl/SubFlowOperations.cs
Original file line number Diff line number Diff line change
Expand Up @@ -451,6 +451,10 @@ public static SubFlow<TOut2, TMat, TClosed> Aggregate<TOut1, TOut2, TMat, TClose
/// Similar to <see cref="Aggregate{TOut1,TOut2,TMat,TClosed}"/> but uses first element as zero element.
/// Applies the given function <paramref name="reduce"/> towards its current and next value,
/// yielding the next current value.
///
/// If the stream is empty (i.e. completes before signalling any elements),
/// the sum stage will fail its downstream with a <see cref="NoSuchElementException"/>,
/// which is semantically in-line with that standard library collections do in such situations.
/// <para>
/// Emits when upstream completes
/// </para>
Expand Down
Loading

0 comments on commit 3db8e49

Please sign in to comment.