Skip to content

Commit

Permalink
Update Streams 2.4.18 (#3241)
Browse files Browse the repository at this point in the history
* Backport: Fail all stages on abrupt termination

* fix termination

* fixed corruption of ordering in MergePreferred caused by using emit (for validation)

* Make viaMat on identity Flow preserve upstreams/downstreams

* - Add additional SetHandler overload for InAndOutLogic
- Convert some stages GraphStage and use SimpleLinearGraphStage if possible
- Backport: Fail framing for negative frame sizes

* Truncated contents in existing files for FileIO.toPath/toFile

* fix ClassCastException in SubSink

* GroupedWithin cold emit fix

* rewrite Source.empty to GraphStage

* restore unfoldresource source

* fix GraphStage

* api approval
  • Loading branch information
marcpiechura authored and Aaronontheweb committed Jan 3, 2018
1 parent 8d0380c commit 8a05676
Show file tree
Hide file tree
Showing 35 changed files with 1,032 additions and 515 deletions.
13 changes: 13 additions & 0 deletions docs/articles/streams/custom-stream-processing.md
Original file line number Diff line number Diff line change
Expand Up @@ -713,6 +713,19 @@ In essence, the above guarantees are similar to what `Actor`'s provide, if one t
> [!WARNING]
> It is **not** safe to access the state of any custom stage outside of the callbacks that it provides, just like it is unsafe to access the state of an actor from the outside. This means that Future callbacks should not close over internal state of custom stages because such access can be concurrent with the provided callbacks, leading to undefined behavior.

## Resources and the stage lifecycle

If a stage manages a resource with a lifecycle, for example objects that need to be shutdown when they are not
used anymore it is important to make sure this will happen in all circumstances when the stage shuts down.

Cleaning up resources should be done in `GraphStageLogic.PostStop` and not in the `InHandler` and `OutHandler`
callbacks. The reason for this is that when the stage itself completes or is failed there is no signal from the upstreams
for the downstreams. Even for stages that do not complete or fail in this manner, this can happen when the
`Materializer` is shutdown or the `ActorSystem` is terminated while a stream is still running, what is called an
"abrupt termination".


## Extending Flow Combinators with Custom Operators

The most general way of extending any `Source`, `Flow` or `SubFlow` (e.g. from `GroupBy`) is demonstrated above: create a graph of flow-shape like the `Filter` example given above and use the `.Via(...)` combinator to integrate it into your stream topology. This works with all `IFlow` sub-types, including the ports that you connect with the graph DSL.
Expand Down
6 changes: 6 additions & 0 deletions src/Akka.sln.DotSettings
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
<s:String x:Key="/Default/CodeInspection/Highlighting/InspectionSeverities/=MergeSequentialChecks/@EntryIndexedValue">HINT</s:String>
<s:String x:Key="/Default/CodeInspection/Highlighting/InspectionSeverities/=PatternAlwaysOfType/@EntryIndexedValue">DO_NOT_SHOW</s:String>
<s:String x:Key="/Default/CodeInspection/Highlighting/InspectionSeverities/=UseObjectOrCollectionInitializer/@EntryIndexedValue">HINT</s:String>
<s:String x:Key="/Default/CodeStyle/CodeFormatting/CSharpFormat/PLACE_ACCESSORHOLDER_ATTRIBUTE_ON_SAME_LINE_EX/@EntryValue">NEVER</s:String>
<s:Boolean x:Key="/Default/CodeStyle/CodeFormatting/CSharpFormat/SPACE_AFTER_TYPECAST_PARENTHESES/@EntryValue">False</s:Boolean>
<s:Boolean x:Key="/Default/CodeStyle/CodeFormatting/CSharpFormat/SPACE_AROUND_MULTIPLICATIVE_OP/@EntryValue">True</s:Boolean>
<s:Boolean x:Key="/Default/CodeStyle/CodeFormatting/CSharpFormat/SPACE_WITHIN_SINGLE_LINE_ARRAY_INITIALIZER_BRACES/@EntryValue">True</s:Boolean>
Expand All @@ -14,7 +15,12 @@
// &lt;/copyright&gt;&#xD;
//-----------------------------------------------------------------------</s:String>
<s:String x:Key="/Default/CodeStyle/Naming/CSharpNaming/Abbreviations/=FSM/@EntryIndexedValue">FSM</s:String>
<s:Boolean x:Key="/Default/Environment/SettingsMigration/IsMigratorApplied/=JetBrains_002EReSharper_002EPsi_002ECSharp_002ECodeStyle_002ECSharpAttributeForSingleLineMethodUpgrade/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/Environment/SettingsMigration/IsMigratorApplied/=JetBrains_002EReSharper_002EPsi_002ECSharp_002ECodeStyle_002ECSharpKeepExistingMigration/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/Environment/SettingsMigration/IsMigratorApplied/=JetBrains_002EReSharper_002EPsi_002ECSharp_002ECodeStyle_002ECSharpPlaceEmbeddedOnSameLineMigration/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/Environment/SettingsMigration/IsMigratorApplied/=JetBrains_002EReSharper_002EPsi_002ECSharp_002ECodeStyle_002ECSharpRenamePlacementToArrangementMigration/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/Environment/SettingsMigration/IsMigratorApplied/=JetBrains_002EReSharper_002EPsi_002ECSharp_002ECodeStyle_002ESettingsUpgrade_002EAddAccessorOwnerDeclarationBracesMigration/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/Environment/SettingsMigration/IsMigratorApplied/=JetBrains_002EReSharper_002EPsi_002ECSharp_002ECodeStyle_002ESettingsUpgrade_002ECSharpPlaceAttributeOnSameLineMigration/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/Environment/SettingsMigration/IsMigratorApplied/=JetBrains_002EReSharper_002EPsi_002ECSharp_002ECodeStyle_002ESettingsUpgrade_002EMigrateBlankLinesAroundFieldToBlankLinesAroundProperty/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/Environment/SettingsMigration/IsMigratorApplied/=JetBrains_002EReSharper_002EPsi_002ECSharp_002ECodeStyle_002ESettingsUpgrade_002EMigrateThisQualifierSettings/@EntryIndexedValue">True</s:Boolean>
</wpf:ResourceDictionary>
23 changes: 19 additions & 4 deletions src/core/Akka.API.Tests/CoreAPISpec.ApproveStreams.approved.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@
[assembly: System.Runtime.Versioning.TargetFrameworkAttribute(".NETFramework,Version=v4.5", FrameworkDisplayName=".NET Framework 4.5")]
namespace Akka.Streams
{
public sealed class AbruptStageTerminationException : System.Exception
{
public AbruptStageTerminationException(Akka.Streams.Stage.GraphStageLogic logic) { }
}
public class AbruptTerminationException : System.Exception
{
public readonly Akka.Actor.IActorRef Actor;
Expand Down Expand Up @@ -2359,6 +2363,15 @@ namespace Akka.Streams.Implementation
public void Subscribe(Reactive.Streams.ISubscriber<T> subscriber) { }
public override string ToString() { }
}
public sealed class EmptySource<TOut> : Akka.Streams.Stage.GraphStage<Akka.Streams.SourceShape<TOut>>
{
public EmptySource() { }
protected override Akka.Streams.Attributes InitialAttributes { get; }
public Akka.Streams.Outlet<TOut> Out { get; }
public override Akka.Streams.SourceShape<TOut> Shape { get; }
protected override Akka.Streams.Stage.GraphStageLogic CreateLogic(Akka.Streams.Attributes inheritedAttributes) { }
public override string ToString() { }
}
public abstract class EnumerableActorName
{
protected EnumerableActorName() { }
Expand Down Expand Up @@ -3633,12 +3646,13 @@ namespace Akka.Streams.Implementation.Fusing
public override string ToString() { }
}
[Akka.Annotations.InternalApiAttribute()]
public sealed class OnCompleted<TIn, TOut> : Akka.Streams.Stage.PushStage<TIn, TOut>
public sealed class OnCompleted<T> : Akka.Streams.Stage.GraphStage<Akka.Streams.FlowShape<T, Akka.NotUsed>>
{
public OnCompleted(System.Action success, System.Action<System.Exception> failure) { }
public override Akka.Streams.Stage.ISyncDirective OnPush(TIn element, Akka.Streams.Stage.IContext<TOut> context) { }
public override Akka.Streams.Stage.ITerminationDirective OnUpstreamFailure(System.Exception cause, Akka.Streams.Stage.IContext<TOut> context) { }
public override Akka.Streams.Stage.ITerminationDirective OnUpstreamFinish(Akka.Streams.Stage.IContext<TOut> context) { }
public Akka.Streams.Inlet<T> In { get; }
public Akka.Streams.Outlet<Akka.NotUsed> Out { get; }
public override Akka.Streams.FlowShape<T, Akka.NotUsed> Shape { get; }
protected override Akka.Streams.Stage.GraphStageLogic CreateLogic(Akka.Streams.Attributes inheritedAttributes) { }
}
[Akka.Annotations.InternalApiAttribute()]
public sealed class Recover<T> : Akka.Streams.Implementation.Fusing.SimpleLinearGraphStage<T>
Expand Down Expand Up @@ -4137,6 +4151,7 @@ namespace Akka.Streams.Stage
protected internal void SetHandler(Akka.Streams.Inlet inlet, System.Action onPush, System.Action onUpstreamFinish = null, System.Action<System.Exception> onUpstreamFailure = null) { }
protected internal void SetHandler(Akka.Streams.Outlet outlet, Akka.Streams.Stage.IOutHandler handler) { }
protected internal void SetHandler(Akka.Streams.Outlet outlet, System.Action onPull, System.Action onDownstreamFinish = null) { }
protected internal void SetHandler(Akka.Streams.Inlet inlet, Akka.Streams.Outlet outlet, Akka.Streams.Stage.InAndOutGraphStageLogic handler) { }
protected void SetKeepGoing(bool enabled) { }
protected internal void TryPull(Akka.Streams.Inlet inlet) { }
protected internal void TryPull<T>(Akka.Streams.Inlet<T> inlet) { }
Expand Down
15 changes: 15 additions & 0 deletions src/core/Akka.Streams.Tests/Dsl/ActorRefBackpressureSinkSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -229,5 +229,20 @@ public void ActorBackpressurSink_should_fail_to_materialize_with_zero_sized_inpu
.WithAttributes(Attributes.CreateInputBuffer(0, 0));
Source.Single(1).Invoking(s => s.RunWith(badSink, Materializer)).ShouldThrow<ArgumentException>();
}

[Fact]
public void ActorBackpressurSink_should_signal_failure_on_abrupt_termination()
{
var materializer = ActorMaterializer.Create(Sys);
var probe = CreateTestProbe();
var sink = Sink.ActorRefWithAck<string>(probe.Ref, InitMessage, AckMessage, CompleteMessage)
.WithAttributes(Attributes.CreateInputBuffer(1, 1));

Source.Maybe<string>().To(sink).Run(materializer);

probe.ExpectMsg(InitMessage);
materializer.Shutdown();
probe.ExpectMsg<Status.Failure>();
}
}
}
6 changes: 2 additions & 4 deletions src/core/Akka.Streams.Tests/Dsl/FlowGroupedWithinSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ public void A_GroupedWithin_must_not_emit_empty_group_when_finished_while_not_be
var c = this.CreateManualSubscriberProbe<IEnumerable<int>>();

Source.FromPublisher(p)
.GroupedWithin(1000, TimeSpan.FromMilliseconds(500))
.GroupedWithin(1000, TimeSpan.FromMilliseconds(50))
.To(Sink.FromSubscriber(c))
.Run(Materializer);

Expand All @@ -180,12 +180,10 @@ public void A_GroupedWithin_must_not_emit_empty_group_when_finished_while_not_be

cSub.Request(1);
pSub.ExpectRequest();
c.ExpectNoMsg(TimeSpan.FromMilliseconds(600));
pSub.SendComplete();
c.ExpectComplete();
c.ExpectNoMsg(TimeSpan.FromMilliseconds(100));
}

[Fact]
public void A_GroupedWithin_must_reset_time_window_when_max_elements_reached()
{
Expand Down
13 changes: 13 additions & 0 deletions src/core/Akka.Streams.Tests/Dsl/FlowMonitorSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
using Akka.Streams.TestKit;
using Akka.Streams.TestKit.Tests;
using Akka.TestKit;
using FluentAssertions;
using Xunit;
using Xunit.Abstractions;

Expand Down Expand Up @@ -152,5 +153,17 @@ public void A_FlowMonitor_must_return_Received_after_receiving_a_StreamState_mes
throw new Exception();
});
}

[Fact]
public void A_FlowMonitor_must_return_failed_when_stream_is_abruptly_terminated()
{
var materializer = ActorMaterializer.Create(Sys);

var t = this.SourceProbe<string>().Monitor(Keep.Both).To(Sink.Ignore<string>()).Run(materializer);
var monitor = t.Item2;

materializer.Shutdown();
AwaitAssert(() => monitor.State.Should().BeOfType<FlowMonitor.Failed>(), RemainingOrDefault);
}
}
}
24 changes: 21 additions & 3 deletions src/core/Akka.Streams.Tests/Dsl/FlowOnCompleteSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public void A_Flow_with_OnComplete_must_yield_the_first_error()
var onCompleteProbe = CreateTestProbe();
var p = this.CreateManualPublisherProbe<int>();
Source.FromPublisher(p)
.To(Sink.OnComplete<int>(() => {}, ex => onCompleteProbe.Ref.Tell(ex)))
.To(Sink.OnComplete<int>(() => { }, ex => onCompleteProbe.Ref.Tell(ex)))
.Run(Materializer);
var proc = p.ExpectSubscription();
proc.ExpectRequest();
Expand All @@ -71,7 +71,7 @@ public void A_Flow_with_OnComplete_must_invoke_callback_for_an_empty_stream()
var onCompleteProbe = CreateTestProbe();
var p = this.CreateManualPublisherProbe<int>();
Source.FromPublisher(p)
.To(Sink.OnComplete<int>(() => onCompleteProbe.Ref.Tell("done"), _ => {}))
.To(Sink.OnComplete<int>(() => onCompleteProbe.Ref.Tell("done"), _ => { }))
.Run(Materializer);
var proc = p.ExpectSubscription();
proc.ExpectRequest();
Expand All @@ -95,7 +95,7 @@ public void A_Flow_with_OnComplete_must_invoke_callback_after_transform_and_fore
return x;
}).RunWith(foreachSink, Materializer);
future.ContinueWith(t => onCompleteProbe.Tell(t.IsCompleted ? "done" : "failure"));

var proc = p.ExpectSubscription();
proc.ExpectRequest();
proc.SendNext(42);
Expand All @@ -105,5 +105,23 @@ public void A_Flow_with_OnComplete_must_invoke_callback_after_transform_and_fore
onCompleteProbe.ExpectMsg("done");
}, Materializer);
}

[Fact]
public void A_Flow_with_OnComplete_must_yield_error_on_abrupt_termination()
{
var materializer = ActorMaterializer.Create(Sys);
var onCompleteProbe = CreateTestProbe();
var publisher = this.CreateManualPublisherProbe<int>();

Source.FromPublisher(publisher).To(Sink.OnComplete<int>(() => onCompleteProbe.Ref.Tell("done"),
ex => onCompleteProbe.Ref.Tell(ex)))
.Run(materializer);
var proc = publisher.ExpectSubscription();
proc.ExpectRequest();
materializer.Shutdown();

onCompleteProbe.ExpectMsg<AbruptTerminationException>();
}
}
}

16 changes: 14 additions & 2 deletions src/core/Akka.Streams.Tests/Dsl/FlowSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -658,12 +658,24 @@ public void A_broken_Flow_must_cancel_upstream_and_call_onError_on_current_and_f
public void A_broken_Flow_must_suitably_override_attribute_handling_methods()
{
var f = Flow.Create<int>().Select(x => x + 1).Async().AddAttributes(Attributes.None).Named("name");
f.Module.Attributes.GetFirstAttribute<Attributes.Name>().Value.Should().Be("name");
f.Module.Attributes.GetFirstAttribute<Attributes.AsyncBoundary>()
f.Module.Attributes.GetAttribute<Attributes.Name>().Value.Should().Be("name");
f.Module.Attributes.GetAttribute<Attributes.AsyncBoundary>()
.Should()
.Be(Attributes.AsyncBoundary.Instance);
}

[Fact]
public void A_broken_Flow_must_work_without_fusing()
{
var settings = ActorMaterializerSettings.Create(Sys).WithAutoFusing(false).WithInputBuffer(1, 1);
var noFusingMaterializer = ActorMaterializer.Create(Sys, settings);

// The map followed by a filter is to ensure that it is a CompositeModule passed in to Flow[Int].via(...)
var sink = Flow.Create<int>().Via(Flow.Create<int>().Select(i => i + 1).Where(_ => true))
.ToMaterialized(Sink.First<int>(), Keep.Right);
Source.Single(4711).RunWith(sink, noFusingMaterializer).AwaitResult().ShouldBe(4712);
}

private static Flow<TIn, TOut, TMat> Identity<TIn, TOut, TMat>(Flow<TIn, TOut, TMat> flow) => flow.Select(e => e);
private static Flow<TIn, TOut, TMat> Identity2<TIn, TOut, TMat>(Flow<TIn, TOut, TMat> flow) => Identity(flow);

Expand Down
14 changes: 14 additions & 0 deletions src/core/Akka.Streams.Tests/Dsl/FlowWatchTerminationSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -130,5 +130,19 @@ public void A_WatchTermination_must_complete_the_future_for_graph()
//sinkProbe.ExpectNextN(new[] {2, 3, 4, 5}).ExpectComplete();
}, Materializer);
}

[Fact]
public void A_WatchTermination_must_fail_task_when_abruptly_terminated()
{
var materializer = ActorMaterializer.Create(Sys);

var t = this.SourceProbe<int>().WatchTermination(Keep.Both).To(Sink.Ignore<int>()).Run(materializer);
var task = t.Item2;

materializer.Shutdown();

Action a = () => task.Wait(TimeSpan.FromSeconds(3));
a.ShouldThrow<AbruptTerminationException>();
}
}
}
Loading

0 comments on commit 8a05676

Please sign in to comment.