diff --git a/docs/articles/streams/builtinstages.md b/docs/articles/streams/builtinstages.md index 17be551d20c..35289f2b736 100644 --- a/docs/articles/streams/builtinstages.md +++ b/docs/articles/streams/builtinstages.md @@ -148,6 +148,12 @@ elements or failing the stream, the strategy is chosen by the user. **completes** when the actorref is sent ``Akka.Actor.Status.Success`` or ``PoisonPill`` +#### PreMaterialize + +Materializes this Source, immediately returning (1) its materialized value, and (2) a new Source that can consume elements 'into' the pre-materialized one. + +Useful for when you need a materialized value of a Source when handing it out to someone to materialize it for you. + #### Combine Combine several sources, using a given strategy such as merge or concat, into one source. @@ -359,7 +365,13 @@ to provide back pressure onto the sink. **cancels** when the actor terminates -**backpressures** when the actor acknowledgement has not arrived +**backpressures** when the actor acknowledgement has not arrived. + +#### PreMaterialize + +Materializes this Sink, immediately returning (1) its materialized value, and (2) a new Sink that can consume elements 'into' the pre-materialized one. + +Useful for when you need a materialized value of a Sink when handing it out to someone to materialize it for you. #### ActorSubscriber diff --git a/src/core/Akka.API.Tests/CoreAPISpec.ApproveStreams.approved.txt b/src/core/Akka.API.Tests/CoreAPISpec.ApproveStreams.approved.txt index 465c6622eba..595d16b2a59 100644 --- a/src/core/Akka.API.Tests/CoreAPISpec.ApproveStreams.approved.txt +++ b/src/core/Akka.API.Tests/CoreAPISpec.ApproveStreams.approved.txt @@ -1609,6 +1609,7 @@ namespace Akka.Streams.Dsl public Akka.Streams.Dsl.Sink ContraMap(System.Func function) { } public Akka.Streams.Dsl.Sink MapMaterializedValue(System.Func fn) { } public Akka.Streams.Dsl.Sink Named(string name) { } + public System.Tuple> PreMaterialize(Akka.Streams.IMaterializer materializer) { } public TMat2 RunWith(Akka.Streams.IGraph, TMat2> source, Akka.Streams.IMaterializer materializer) { } public override string ToString() { } public Akka.Streams.Dsl.Sink WithAttributes(Akka.Streams.Attributes attributes) { } diff --git a/src/core/Akka.API.Tests/CoreAPISpec.cs b/src/core/Akka.API.Tests/CoreAPISpec.cs index 6bbff100a08..ddd1521169b 100644 --- a/src/core/Akka.API.Tests/CoreAPISpec.cs +++ b/src/core/Akka.API.Tests/CoreAPISpec.cs @@ -83,6 +83,7 @@ public void ApproveClusterTools() var publicApi = Filter(PublicApiGenerator.CreatePublicApiForAssembly(asm)); Approvals.Verify(publicApi); } + [Fact] [MethodImpl(MethodImplOptions.NoInlining)] public void ApproveStreams() diff --git a/src/core/Akka.Streams.Tests/Dsl/SinkSpec.cs b/src/core/Akka.Streams.Tests/Dsl/SinkSpec.cs index 13be3cc25bb..df370a3361d 100644 --- a/src/core/Akka.Streams.Tests/Dsl/SinkSpec.cs +++ b/src/core/Akka.Streams.Tests/Dsl/SinkSpec.cs @@ -5,6 +5,7 @@ // //----------------------------------------------------------------------- +using System; using System.Linq; using Akka.Streams.Dsl; using Akka.Streams.TestKit; @@ -190,6 +191,68 @@ public void A_Sink_must_support_contramap() .Run(Materializer) .Result.ShouldAllBeEquivalentTo(Enumerable.Range(1, 9)); } + + [Fact] + public void Sink_prematerialization_must_materialize_the_sink_and_wrap_its_exposed_publisher_in_a_Source() + { + var publisherSink = Sink.AsPublisher(false); + var tup = publisherSink.PreMaterialize(Sys.Materializer()); + var matPub = tup.Item1; + var sink = tup.Item2; + + var probe = Source.FromPublisher(matPub).RunWith(this.SinkProbe(), Sys.Materializer()); + probe.ExpectNoMsg(TimeSpan.FromMilliseconds(100)); + + Source.Single("hello").RunWith(sink, Sys.Materializer()); + + probe.EnsureSubscription(); + probe.RequestNext("hello"); + probe.ExpectComplete(); + } + + [Fact] + public void Sink_prematerialization_must_materialize_the_sink_and_wrap_its_exposed_fanout_publisher_in_a_Source_twice() + { + var publisherSink = Sink.AsPublisher(true); + var tup = publisherSink.PreMaterialize(Sys.Materializer()); + var matPub = tup.Item1; + var sink = tup.Item2; + + var probe1 = Source.FromPublisher(matPub).RunWith(this.SinkProbe(), Sys.Materializer()); + var probe2 = Source.FromPublisher(matPub).RunWith(this.SinkProbe(), Sys.Materializer()); + + Source.Single("hello").RunWith(sink, Sys.Materializer()); + + probe1.EnsureSubscription(); + probe1.RequestNext("hello"); + probe1.ExpectComplete(); + + probe2.EnsureSubscription(); + probe2.RequestNext("hello"); + probe2.ExpectComplete(); + } + + [Fact] + public void + Sink_prematerialization_must_materialize_the_sink_and_wrap_its_exposed_nonfanout_publisher_and_fail_the_second_materialization() + { + var publisherSink = Sink.AsPublisher(false); + var tup = publisherSink.PreMaterialize(Sys.Materializer()); + var matPub = tup.Item1; + var sink = tup.Item2; + + var probe1 = Source.FromPublisher(matPub).RunWith(this.SinkProbe(), Sys.Materializer()); + var probe2 = Source.FromPublisher(matPub).RunWith(this.SinkProbe(), Sys.Materializer()); + + Source.Single("hello").RunWith(sink, Sys.Materializer()); + + probe1.EnsureSubscription(); + probe1.RequestNext("hello"); + probe1.ExpectComplete(); + + probe2.EnsureSubscription(); + probe2.ExpectError().Message.Should().Contain("only supports one subscriber"); + } } } diff --git a/src/core/Akka.Streams/Dsl/Sink.cs b/src/core/Akka.Streams/Dsl/Sink.cs index 6efae2b32bb..14eb39b0bd9 100644 --- a/src/core/Akka.Streams/Dsl/Sink.cs +++ b/src/core/Akka.Streams/Dsl/Sink.cs @@ -81,6 +81,20 @@ public TMat2 RunWith(IGraph, TMat2> source, IMaterialize public Sink MapMaterializedValue(Func fn) => new Sink(Module.TransformMaterializedValue(fn)); + /// + /// Materializes this Sink immediately. + /// + /// Useful for when you need a materialized value of a Sink when handing it out to someone to materialize it for you. + /// + /// The materializer. + /// A tuple containing the (1) materialized value and (2) a new + /// that can be used to consume elements from the newly materialized . + public Tuple> PreMaterialize(IMaterializer materializer) + { + var sub = Source.AsSubscriber().ToMaterialized(this, Keep.Both).Run(materializer); + return Tuple.Create(sub.Item2, Sink.FromSubscriber(sub.Item1)); + } + /// /// Change the attributes of this to the given ones /// and seal the list of attributes. This means that further calls will not be able @@ -178,7 +192,7 @@ public static class Sink /// TBD public static Sink Wrap(IGraph, TMat> graph) => graph is Sink - ? (Sink) graph + ? (Sink)graph : new Sink(graph.Module); /// @@ -205,7 +219,7 @@ public static Sink> First() { if (!e.IsFaulted && e.IsCompleted && e.Result == null) throw new InvalidOperationException("Sink.First materialized on an empty stream"); - + return e; }); @@ -226,7 +240,7 @@ public static Sink> FirstOrDefault() /// /// TBD /// TBD - public static Sink> Last() + public static Sink> Last() => FromGraph(new LastOrDefault(throwOnDefault: true)).WithAttributes(DefaultAttributes.LastOrDefaultSink); @@ -541,7 +555,7 @@ public static Sink> LazySink(FuncTBD public static Sink FromGraph(IGraph, TMat> graph) => graph is Sink - ? (Sink) graph + ? (Sink)graph : new Sink(graph.Module); ///