Skip to content

Commit

Permalink
Streams update 2.4.6 & 2.4.7 (#2319)
Browse files Browse the repository at this point in the history
* only accept -1 for infinite retries in recoverWithRetries

* groupBy should be a Graph stage

* Converts DelimiterFramingStage from PushPullStage to GraphStage

* fix GroupBy

* Fixing wrong initial buffer sizes in delay, and one logic bug

* Return failed IOResult for non-existint file

* fix memory leaks in tests

* Fix onSubscribe-request-onNext reentrancy in VirtualProcessor

* Remove obsolete RestartTestStage

* Increase LayoutSpec stresstest patience

* Remove obsolete OneToManyTestStage

* Fix racy RecoverWithSpec

* check that the server has started before trying to connect

* Fix compose method of EmptyModule to be able to Keep.left or right

* Adding docs for KillSwitch

* replace all Thread.Interrupt calls

* fix OutputStreamSourceStage

* fix Throttle spec
  • Loading branch information
Marc Piechura authored and alexvaluyskiy committed Oct 5, 2016
1 parent 2c50507 commit aa913c2
Show file tree
Hide file tree
Showing 37 changed files with 1,059 additions and 1,228 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1220,8 +1220,6 @@ namespace Akka.Streams.Dsl
where TShape2 : Akka.Streams.Shape
where TShape3 : Akka.Streams.Shape
where TShape4 : Akka.Streams.Shape { }
public static Akka.Streams.IGraph<TShape, TMat> CreateMaterialized<TShape, TMat>(System.Func<Akka.Streams.Dsl.GraphDsl.Builder<TMat>, TShape> buildBlock)
where TShape : Akka.Streams.Shape { }
public sealed class Builder<T>
{
public Akka.Streams.Outlet<T> MaterializedValue { get; }
Expand Down Expand Up @@ -1638,6 +1636,7 @@ namespace Akka.Streams.Dsl
public static Akka.Streams.Dsl.SubFlow<TOut2, TMat, TClosed> Prepend<TOut1, TOut2, TMat, TClosed>(this Akka.Streams.Dsl.SubFlow<TOut1, TMat, TClosed> flow, Akka.Streams.IGraph<Akka.Streams.SourceShape<TOut2>, TMat> that)
where TOut1 : TOut2 { }
public static Akka.Streams.Dsl.SubFlow<Akka.Streams.Util.Option<TOut>, TMat, TClosed> Recover<TOut, TMat, TClosed>(this Akka.Streams.Dsl.SubFlow<TOut, TMat, TClosed> flow, System.Func<System.Exception, Akka.Streams.Util.Option<TOut>> partialFunc) { }
[System.ObsoleteAttribute("Use RecoverWithRetries instead.")]
public static Akka.Streams.Dsl.SubFlow<TOut, TMat, TClosed> RecoverWith<TOut, TMat, TClosed>(this Akka.Streams.Dsl.SubFlow<TOut, TMat, TClosed> flow, System.Func<System.Exception, Akka.Streams.IGraph<Akka.Streams.SourceShape<TOut>, TMat>> partialFunc) { }
public static Akka.Streams.Dsl.SubFlow<TOut, TMat, TClosed> RecoverWithRetries<TOut, TMat, TClosed>(this Akka.Streams.Dsl.SubFlow<TOut, TMat, TClosed> flow, System.Func<System.Exception, Akka.Streams.IGraph<Akka.Streams.SourceShape<TOut>, TMat>> partialFunc, int attempts) { }
public static Akka.Streams.Dsl.SubFlow<TOut2, TMat, TClosed> Scan<TOut1, TOut2, TMat, TClosed>(this Akka.Streams.Dsl.SubFlow<TOut1, TMat, TClosed> flow, TOut2 zero, System.Func<TOut2, TOut1, TOut2> scan) { }
Expand Down
34 changes: 34 additions & 0 deletions src/core/Akka.Streams.Tests/Dsl/FlowDelaySpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,11 @@
using System;
using System.Collections.Generic;
using System.Linq;
using Akka.Actor;
using Akka.Streams.Dsl;
using Akka.Streams.TestKit;
using Akka.Streams.TestKit.Tests;
using Akka.Streams.Tests.Actor;
using Akka.TestKit;
using Akka.Util.Internal;
using FluentAssertions;
Expand Down Expand Up @@ -155,6 +157,7 @@ public void A_Delay_must_pass_elements_with_delay_through_normally_in_backpressu
{
Source.From(Enumerable.Range(1, 3))
.Delay(TimeSpan.FromMilliseconds(300), DelayOverflowStrategy.Backpressure)
.WithAttributes(Attributes.CreateInputBuffer(1,1))
.RunWith(this.SinkProbe<int>(), Materializer)
.Request(5)
.ExpectNoMsg(TimeSpan.FromMilliseconds(200))
Expand Down Expand Up @@ -208,5 +211,36 @@ public void A_Delay_must_emit_early_when_buffer_is_full_and_in_EmitEarly_mode()
pSub.SendError(new Exception());
}, Materializer);
}

[Fact]
public void A_Delay_must_properly_delay_according_to_buffer_size()
{
// With a buffer size of 1, delays add up
var task = Source.From(Enumerable.Range(1, 5))
.Delay(TimeSpan.FromMilliseconds(500), DelayOverflowStrategy.Backpressure)
.WithAttributes(Attributes.CreateInputBuffer(1, 1))
.RunWith(Sink.Ignore<int>(), Materializer);

task.Wait(TimeSpan.FromSeconds(2)).ShouldBeFalse();
task.Wait(TimeSpan.FromSeconds(1)).ShouldBeTrue();

// With a buffer large enough to hold all arriving elements, delays don't add up
task = Source.From(Enumerable.Range(1, 100))
.Delay(TimeSpan.FromSeconds(1), DelayOverflowStrategy.Backpressure)
.WithAttributes(Attributes.CreateInputBuffer(100, 100))
.RunWith(Sink.Ignore<int>(), Materializer);

task.Wait(TimeSpan.FromSeconds(2)).ShouldBeTrue();

// Delays that are already present are preserved when buffer is large enough
task = Source.Tick(TimeSpan.FromMilliseconds(100), TimeSpan.FromMilliseconds(100), NotUsed.Instance)
.Take(10)
.Delay(TimeSpan.FromSeconds(1), DelayOverflowStrategy.Backpressure)
.WithAttributes(Attributes.CreateInputBuffer(10, 10))
.RunWith(Sink.Ignore<NotUsed>(), Materializer);

task.Wait(TimeSpan.FromMilliseconds(900)).ShouldBeFalse();
task.Wait(TimeSpan.FromSeconds(1)).ShouldBeTrue();
}
}
}
Loading

0 comments on commit aa913c2

Please sign in to comment.