diff --git a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveStreams.Core.verified.txt b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveStreams.Core.verified.txt index ceceed21077..38b670b0b01 100644 --- a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveStreams.Core.verified.txt +++ b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveStreams.Core.verified.txt @@ -1935,11 +1935,11 @@ namespace Akka.Streams.Dsl public static Akka.Streams.Dsl.Sink> FanoutPublisher() { } public static Akka.Streams.Dsl.Sink> First() { } public static Akka.Streams.Dsl.Sink> FirstOrDefault() { } - public static Akka.Streams.Dsl.Sink ForEach(System.Action action) { } - public static Akka.Streams.Dsl.Sink ForEachParallel(int parallelism, System.Action action) { } + public static Akka.Streams.Dsl.Sink> ForEach(System.Action action) { } + public static Akka.Streams.Dsl.Sink> ForEachParallel(int parallelism, System.Action action) { } public static Akka.Streams.Dsl.Sink FromGraph(Akka.Streams.IGraph, TMat> graph) { } public static Akka.Streams.Dsl.Sink FromSubscriber(Reactive.Streams.ISubscriber subscriber) { } - public static Akka.Streams.Dsl.Sink Ignore() { } + public static Akka.Streams.Dsl.Sink> Ignore() { } public static Akka.Streams.Dsl.Sink> Last() { } public static Akka.Streams.Dsl.Sink> LastOrDefault() { } public static Akka.Streams.Dsl.Sink>> LazyInitAsync(System.Func>> sinkFactory) { } @@ -4209,13 +4209,13 @@ namespace Akka.Streams.Implementation.Fusing protected override Akka.Streams.Stage.GraphStageLogic CreateLogic(Akka.Streams.Attributes inheritedAttributes) { } } [Akka.Annotations.InternalApiAttribute()] - public sealed class IgnoreSink : Akka.Streams.Stage.GraphStageWithMaterializedValue, System.Threading.Tasks.Task> + public sealed class IgnoreSink : Akka.Streams.Stage.GraphStageWithMaterializedValue, System.Threading.Tasks.Task> { public IgnoreSink() { } protected override Akka.Streams.Attributes InitialAttributes { get; } public Akka.Streams.Inlet Inlet { get; } public override Akka.Streams.SinkShape Shape { get; } - public override Akka.Streams.Stage.ILogicAndMaterializedValue CreateLogicAndMaterializedValue(Akka.Streams.Attributes inheritedAttributes) { } + public override Akka.Streams.Stage.ILogicAndMaterializedValue> CreateLogicAndMaterializedValue(Akka.Streams.Attributes inheritedAttributes) { } public override string ToString() { } } [Akka.Annotations.InternalApiAttribute()] diff --git a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveStreams.DotNet.verified.txt b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveStreams.DotNet.verified.txt index a7535dbedfc..d8d48193023 100644 --- a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveStreams.DotNet.verified.txt +++ b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveStreams.DotNet.verified.txt @@ -1935,11 +1935,11 @@ namespace Akka.Streams.Dsl public static Akka.Streams.Dsl.Sink> FanoutPublisher() { } public static Akka.Streams.Dsl.Sink> First() { } public static Akka.Streams.Dsl.Sink> FirstOrDefault() { } - public static Akka.Streams.Dsl.Sink ForEach(System.Action action) { } - public static Akka.Streams.Dsl.Sink ForEachParallel(int parallelism, System.Action action) { } + public static Akka.Streams.Dsl.Sink> ForEach(System.Action action) { } + public static Akka.Streams.Dsl.Sink> ForEachParallel(int parallelism, System.Action action) { } public static Akka.Streams.Dsl.Sink FromGraph(Akka.Streams.IGraph, TMat> graph) { } public static Akka.Streams.Dsl.Sink FromSubscriber(Reactive.Streams.ISubscriber subscriber) { } - public static Akka.Streams.Dsl.Sink Ignore() { } + public static Akka.Streams.Dsl.Sink> Ignore() { } public static Akka.Streams.Dsl.Sink> Last() { } public static Akka.Streams.Dsl.Sink> LastOrDefault() { } public static Akka.Streams.Dsl.Sink>> LazyInitAsync(System.Func>> sinkFactory) { } @@ -4221,13 +4221,13 @@ namespace Akka.Streams.Implementation.Fusing protected override Akka.Streams.Stage.GraphStageLogic CreateLogic(Akka.Streams.Attributes inheritedAttributes) { } } [Akka.Annotations.InternalApiAttribute()] - public sealed class IgnoreSink : Akka.Streams.Stage.GraphStageWithMaterializedValue, System.Threading.Tasks.Task> + public sealed class IgnoreSink : Akka.Streams.Stage.GraphStageWithMaterializedValue, System.Threading.Tasks.Task> { public IgnoreSink() { } protected override Akka.Streams.Attributes InitialAttributes { get; } public Akka.Streams.Inlet Inlet { get; } public override Akka.Streams.SinkShape Shape { get; } - public override Akka.Streams.Stage.ILogicAndMaterializedValue CreateLogicAndMaterializedValue(Akka.Streams.Attributes inheritedAttributes) { } + public override Akka.Streams.Stage.ILogicAndMaterializedValue> CreateLogicAndMaterializedValue(Akka.Streams.Attributes inheritedAttributes) { } public override string ToString() { } } [Akka.Annotations.InternalApiAttribute()] diff --git a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveStreams.Net.verified.txt b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveStreams.Net.verified.txt index ceceed21077..38b670b0b01 100644 --- a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveStreams.Net.verified.txt +++ b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveStreams.Net.verified.txt @@ -1935,11 +1935,11 @@ namespace Akka.Streams.Dsl public static Akka.Streams.Dsl.Sink> FanoutPublisher() { } public static Akka.Streams.Dsl.Sink> First() { } public static Akka.Streams.Dsl.Sink> FirstOrDefault() { } - public static Akka.Streams.Dsl.Sink ForEach(System.Action action) { } - public static Akka.Streams.Dsl.Sink ForEachParallel(int parallelism, System.Action action) { } + public static Akka.Streams.Dsl.Sink> ForEach(System.Action action) { } + public static Akka.Streams.Dsl.Sink> ForEachParallel(int parallelism, System.Action action) { } public static Akka.Streams.Dsl.Sink FromGraph(Akka.Streams.IGraph, TMat> graph) { } public static Akka.Streams.Dsl.Sink FromSubscriber(Reactive.Streams.ISubscriber subscriber) { } - public static Akka.Streams.Dsl.Sink Ignore() { } + public static Akka.Streams.Dsl.Sink> Ignore() { } public static Akka.Streams.Dsl.Sink> Last() { } public static Akka.Streams.Dsl.Sink> LastOrDefault() { } public static Akka.Streams.Dsl.Sink>> LazyInitAsync(System.Func>> sinkFactory) { } @@ -4209,13 +4209,13 @@ namespace Akka.Streams.Implementation.Fusing protected override Akka.Streams.Stage.GraphStageLogic CreateLogic(Akka.Streams.Attributes inheritedAttributes) { } } [Akka.Annotations.InternalApiAttribute()] - public sealed class IgnoreSink : Akka.Streams.Stage.GraphStageWithMaterializedValue, System.Threading.Tasks.Task> + public sealed class IgnoreSink : Akka.Streams.Stage.GraphStageWithMaterializedValue, System.Threading.Tasks.Task> { public IgnoreSink() { } protected override Akka.Streams.Attributes InitialAttributes { get; } public Akka.Streams.Inlet Inlet { get; } public override Akka.Streams.SinkShape Shape { get; } - public override Akka.Streams.Stage.ILogicAndMaterializedValue CreateLogicAndMaterializedValue(Akka.Streams.Attributes inheritedAttributes) { } + public override Akka.Streams.Stage.ILogicAndMaterializedValue> CreateLogicAndMaterializedValue(Akka.Streams.Attributes inheritedAttributes) { } public override string ToString() { } } [Akka.Annotations.InternalApiAttribute()] diff --git a/src/core/Akka.Streams.Tests/Dsl/GraphMatValueSpec.cs b/src/core/Akka.Streams.Tests/Dsl/GraphMatValueSpec.cs index fe0fe2b307e..626bc42c6da 100644 --- a/src/core/Akka.Streams.Tests/Dsl/GraphMatValueSpec.cs +++ b/src/core/Akka.Streams.Tests/Dsl/GraphMatValueSpec.cs @@ -208,7 +208,7 @@ public void A_Graph_with_materialized_value_must_perform_side_effecting_transfor }); var r = RunnableGraph.FromGraph(GraphDsl.Create(Sink.Ignore(), (b, sink) => { - var source = Source.From(Enumerable.Range(1, 10)).MapMaterializedValue(_ => Task.FromResult(0)); + var source = Source.From(Enumerable.Range(1, 10)).MapMaterializedValue(_ => Task.FromResult(Done.Instance)); b.Add(g); b.From(source).To(sink); return ClosedShape.Instance; diff --git a/src/core/Akka.Streams.Tests/Dsl/GraphZipSpec.cs b/src/core/Akka.Streams.Tests/Dsl/GraphZipSpec.cs index 59aaeb56d49..6ef5ab69ca7 100644 --- a/src/core/Akka.Streams.Tests/Dsl/GraphZipSpec.cs +++ b/src/core/Akka.Streams.Tests/Dsl/GraphZipSpec.cs @@ -86,8 +86,8 @@ public void Zip_must_complete_if_one_side_is_available_but_other_already_complet var completed = RunnableGraph.FromGraph(GraphDsl.Create(Sink.Ignore<(int, string)>(), (b, sink) => { var zip = b.Add(new Zip()); - var source1 = Source.FromPublisher(upstream1).MapMaterializedValue(_ => null); - var source2 = Source.FromPublisher(upstream2).MapMaterializedValue(_ => null); + var source1 = Source.FromPublisher(upstream1).MapMaterializedValue(_ => Task.FromResult(Done.Instance)); + var source2 = Source.FromPublisher(upstream2).MapMaterializedValue(_ => Task.FromResult(Done.Instance)); b.From(source1).To(zip.In0); b.From(source2).To(zip.In1); diff --git a/src/core/Akka.Streams.Tests/IO/TcpSpec.cs b/src/core/Akka.Streams.Tests/IO/TcpSpec.cs index 90f9adea6c7..95cfe3eb036 100644 --- a/src/core/Akka.Streams.Tests/IO/TcpSpec.cs +++ b/src/core/Akka.Streams.Tests/IO/TcpSpec.cs @@ -546,7 +546,7 @@ private async Task ValidateServerClientCommunicationAsync(ByteString testData, S (await serverConnection.WaitReadAsync()).Should().BeEquivalentTo(testData); } - private Sink EchoHandler() => + private Sink> EchoHandler() => Sink.ForEach(c => c.Flow.Join(Flow.Create()).Run(Materializer)); [Fact] diff --git a/src/core/Akka.Streams/Dsl/Internal/InternalFlowOperations.cs b/src/core/Akka.Streams/Dsl/Internal/InternalFlowOperations.cs index e0224cb6e41..5e0ebe6addc 100644 --- a/src/core/Akka.Streams/Dsl/Internal/InternalFlowOperations.cs +++ b/src/core/Akka.Streams/Dsl/Internal/InternalFlowOperations.cs @@ -1325,17 +1325,17 @@ public static SubFlow GroupBy( this IFlow flow, int maxSubstreams, Func groupingFunc, - Func, TMat>, Sink, Task>, TClosed> toFunc, + Func, TMat>, Sink, Task>, TClosed> toFunc, bool allowClosedSubstreamRecreation) { var merge = new GroupByMergeBack(flow, maxSubstreams, groupingFunc, allowClosedSubstreamRecreation); - Func, TClosed> finish = s => + TClosed finish(Sink s) { return toFunc( flow.Via(new Fusing.GroupBy(maxSubstreams, groupingFunc, allowClosedSubstreamRecreation)), Sink.ForEach>(e => e.RunWith(s, Fusing.GraphInterpreter.Current.Materializer))); - }; + } return new SubFlowImpl(Flow.Create(), merge, finish); } @@ -1349,7 +1349,7 @@ public static SubFlow GroupBy( /// infinite number of keys this can cause memory issues. Elements belonging to those keys are drained directly /// and not send to the substream. /// - /// See also + /// See also /// /// /// @@ -1364,7 +1364,7 @@ public static SubFlow GroupBy( this IFlow flow, int maxSubstreams, Func groupingFunc, - Func, TMat>, Sink, Task>, TClosed> toFunc) => GroupBy(flow, maxSubstreams, groupingFunc, toFunc, allowClosedSubstreamRecreation: false); + Func, TMat>, Sink, Task>, TClosed> toFunc) => GroupBy(flow, maxSubstreams, groupingFunc, toFunc, allowClosedSubstreamRecreation: false); /// /// TBD @@ -1472,15 +1472,15 @@ public IFlow Apply(Flow flow, int breadth) /// TBD public static SubFlow SplitWhen(this IFlow flow, SubstreamCancelStrategy substreamCancelStrategy, Func predicate, - Func, TMat>, Sink, Task>, TClosed> toFunc) + Func, TMat>, Sink, Task>, TClosed> toFunc) { var merge = new SplitWhenMergeBack(flow, predicate, substreamCancelStrategy); - Func, TClosed> finish = s => + TClosed finish(Sink s) { return toFunc(flow.Via(Fusing.Split.When(predicate, substreamCancelStrategy)), Sink.ForEach>(e => e.RunWith(s, Fusing.GraphInterpreter.Current.Materializer))); - }; + } return new SubFlowImpl(Flow.Create(), merge, finish); } @@ -1577,15 +1577,15 @@ public IFlow Apply(Flow flow, int breadth) /// TBD public static SubFlow SplitAfter(this IFlow flow, SubstreamCancelStrategy substreamCancelStrategy, Func predicate, - Func, TMat>, Sink, Task>, TClosed> toFunc) + Func, TMat>, Sink, Task>, TClosed> toFunc) { var merge = new SplitAfterMergeBack(flow, predicate, substreamCancelStrategy); - Func, TClosed> finish = s => + TClosed finish(Sink s) { return toFunc(flow.Via(Fusing.Split.After(predicate, substreamCancelStrategy)), Sink.ForEach>(e => e.RunWith(s, Fusing.GraphInterpreter.Current.Materializer))); - }; + } return new SubFlowImpl(Flow.Create(), merge, finish); } diff --git a/src/core/Akka.Streams/Dsl/Sink.cs b/src/core/Akka.Streams/Dsl/Sink.cs index b9b2ceb2caa..683d9e1d0b2 100644 --- a/src/core/Akka.Streams/Dsl/Sink.cs +++ b/src/core/Akka.Streams/Dsl/Sink.cs @@ -295,7 +295,7 @@ public static Sink> DistinctRetainingFanOutPublisher(A /// /// TBD /// TBD - public static Sink Ignore() => FromGraph(new IgnoreSink()); + public static Sink> Ignore() => FromGraph(new IgnoreSink()); /// /// A that will invoke the given for each received element. @@ -306,7 +306,7 @@ public static Sink> DistinctRetainingFanOutPublisher(A /// TBD /// TBD /// TBD - public static Sink ForEach(Action action) => Flow.Create() + public static Sink> ForEach(Action action) => Flow.Create() .Select(input => { action(input); @@ -357,7 +357,7 @@ public static Sink Combine(FuncTBD /// TBD /// TBD - public static Sink ForEachParallel(int parallelism, Action action) => Flow.Create() + public static Sink> ForEachParallel(int parallelism, Action action) => Flow.Create() .SelectAsyncUnordered(parallelism, input => Task.Run(() => { action(input); diff --git a/src/core/Akka.Streams/Implementation/Fusing/GraphStages.cs b/src/core/Akka.Streams/Implementation/Fusing/GraphStages.cs index 58fa9b11627..283d2a93949 100644 --- a/src/core/Akka.Streams/Implementation/Fusing/GraphStages.cs +++ b/src/core/Akka.Streams/Implementation/Fusing/GraphStages.cs @@ -1017,16 +1017,16 @@ public TaskSource(Task task) /// Discards all received elements. /// [InternalApi] - public sealed class IgnoreSink : GraphStageWithMaterializedValue, Task> + public sealed class IgnoreSink : GraphStageWithMaterializedValue, Task> { #region Internal classes private sealed class Logic : InGraphStageLogic { private readonly IgnoreSink _stage; - private readonly TaskCompletionSource _completion; + private readonly TaskCompletionSource _completion; - public Logic(IgnoreSink stage, TaskCompletionSource completion) : base(stage.Shape) + public Logic(IgnoreSink stage, TaskCompletionSource completion) : base(stage.Shape) { _stage = stage; _completion = completion; @@ -1041,7 +1041,7 @@ public Logic(IgnoreSink stage, TaskCompletionSource completion) : base(s public override void OnUpstreamFinish() { base.OnUpstreamFinish(); - _completion.TrySetResult(0); + _completion.TrySetResult(Done.Instance); } public override void OnUpstreamFailure(Exception e) @@ -1053,10 +1053,7 @@ public override void OnUpstreamFailure(Exception e) #endregion - public IgnoreSink() - { - Shape = new SinkShape(Inlet); - } + public IgnoreSink() => Shape = new SinkShape(Inlet); protected override Attributes InitialAttributes { get; } = DefaultAttributes.IgnoreSink; @@ -1064,11 +1061,11 @@ public IgnoreSink() public override SinkShape Shape { get; } - public override ILogicAndMaterializedValue CreateLogicAndMaterializedValue(Attributes inheritedAttributes) + public override ILogicAndMaterializedValue> CreateLogicAndMaterializedValue(Attributes inheritedAttributes) { - var completion = new TaskCompletionSource(); + var completion = new TaskCompletionSource(); var logic = new Logic(this, completion); - return new LogicAndMaterializedValue(logic, completion.Task); + return new LogicAndMaterializedValue>(logic, completion.Task); } public override string ToString() => "IgnoreSink";