Skip to content

Commit

Permalink
Akka.Streams: add PreMaterialize support for Sinks (#3477)
Browse files Browse the repository at this point in the history
* added PreMaterialize for Sinks

* added API approval and implemented specs for Sink.PreMaterialize

* added docs
  • Loading branch information
Aaronontheweb authored and marcpiechura committed Jun 1, 2018
1 parent 1b1146b commit d866409
Show file tree
Hide file tree
Showing 5 changed files with 96 additions and 5 deletions.
14 changes: 13 additions & 1 deletion docs/articles/streams/builtinstages.md
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,12 @@ elements or failing the stream, the strategy is chosen by the user.

**completes** when the actorref is sent ``Akka.Actor.Status.Success`` or ``PoisonPill``

#### PreMaterialize

Materializes this Source, immediately returning (1) its materialized value, and (2) a new Source that can consume elements 'into' the pre-materialized one.

Useful for when you need a materialized value of a Source when handing it out to someone to materialize it for you.

#### Combine

Combine several sources, using a given strategy such as merge or concat, into one source.
Expand Down Expand Up @@ -359,7 +365,13 @@ to provide back pressure onto the sink.

**cancels** when the actor terminates

**backpressures** when the actor acknowledgement has not arrived
**backpressures** when the actor acknowledgement has not arrived.

#### PreMaterialize

Materializes this Sink, immediately returning (1) its materialized value, and (2) a new Sink that can consume elements 'into' the pre-materialized one.

Useful for when you need a materialized value of a Sink when handing it out to someone to materialize it for you.


#### ActorSubscriber
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1609,6 +1609,7 @@ namespace Akka.Streams.Dsl
public Akka.Streams.Dsl.Sink<TIn2, TMat> ContraMap<TIn2>(System.Func<TIn2, TIn> function) { }
public Akka.Streams.Dsl.Sink<TIn, TMat2> MapMaterializedValue<TMat2>(System.Func<TMat, TMat2> fn) { }
public Akka.Streams.Dsl.Sink<TIn, TMat> Named(string name) { }
public System.Tuple<TMat, Akka.Streams.Dsl.Sink<TIn, Akka.NotUsed>> PreMaterialize(Akka.Streams.IMaterializer materializer) { }
public TMat2 RunWith<TMat2>(Akka.Streams.IGraph<Akka.Streams.SourceShape<TIn>, TMat2> source, Akka.Streams.IMaterializer materializer) { }
public override string ToString() { }
public Akka.Streams.Dsl.Sink<TIn, TMat> WithAttributes(Akka.Streams.Attributes attributes) { }
Expand Down
1 change: 1 addition & 0 deletions src/core/Akka.API.Tests/CoreAPISpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ public void ApproveClusterTools()
var publicApi = Filter(PublicApiGenerator.CreatePublicApiForAssembly(asm));
Approvals.Verify(publicApi);
}

[Fact]
[MethodImpl(MethodImplOptions.NoInlining)]
public void ApproveStreams()
Expand Down
63 changes: 63 additions & 0 deletions src/core/Akka.Streams.Tests/Dsl/SinkSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
// </copyright>
//-----------------------------------------------------------------------

using System;
using System.Linq;
using Akka.Streams.Dsl;
using Akka.Streams.TestKit;
Expand Down Expand Up @@ -190,6 +191,68 @@ public void A_Sink_must_support_contramap()
.Run(Materializer)
.Result.ShouldAllBeEquivalentTo(Enumerable.Range(1, 9));
}

[Fact]
public void Sink_prematerialization_must_materialize_the_sink_and_wrap_its_exposed_publisher_in_a_Source()
{
var publisherSink = Sink.AsPublisher<string>(false);
var tup = publisherSink.PreMaterialize(Sys.Materializer());
var matPub = tup.Item1;
var sink = tup.Item2;

var probe = Source.FromPublisher(matPub).RunWith(this.SinkProbe<string>(), Sys.Materializer());
probe.ExpectNoMsg(TimeSpan.FromMilliseconds(100));

Source.Single("hello").RunWith(sink, Sys.Materializer());

probe.EnsureSubscription();
probe.RequestNext("hello");
probe.ExpectComplete();
}

[Fact]
public void Sink_prematerialization_must_materialize_the_sink_and_wrap_its_exposed_fanout_publisher_in_a_Source_twice()
{
var publisherSink = Sink.AsPublisher<string>(true);
var tup = publisherSink.PreMaterialize(Sys.Materializer());
var matPub = tup.Item1;
var sink = tup.Item2;

var probe1 = Source.FromPublisher(matPub).RunWith(this.SinkProbe<string>(), Sys.Materializer());
var probe2 = Source.FromPublisher(matPub).RunWith(this.SinkProbe<string>(), Sys.Materializer());

Source.Single("hello").RunWith(sink, Sys.Materializer());

probe1.EnsureSubscription();
probe1.RequestNext("hello");
probe1.ExpectComplete();

probe2.EnsureSubscription();
probe2.RequestNext("hello");
probe2.ExpectComplete();
}

[Fact]
public void
Sink_prematerialization_must_materialize_the_sink_and_wrap_its_exposed_nonfanout_publisher_and_fail_the_second_materialization()
{
var publisherSink = Sink.AsPublisher<string>(false);
var tup = publisherSink.PreMaterialize(Sys.Materializer());
var matPub = tup.Item1;
var sink = tup.Item2;

var probe1 = Source.FromPublisher(matPub).RunWith(this.SinkProbe<string>(), Sys.Materializer());
var probe2 = Source.FromPublisher(matPub).RunWith(this.SinkProbe<string>(), Sys.Materializer());

Source.Single("hello").RunWith(sink, Sys.Materializer());

probe1.EnsureSubscription();
probe1.RequestNext("hello");
probe1.ExpectComplete();

probe2.EnsureSubscription();
probe2.ExpectError().Message.Should().Contain("only supports one subscriber");
}
}
}

Expand Down
22 changes: 18 additions & 4 deletions src/core/Akka.Streams/Dsl/Sink.cs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,20 @@ public TMat2 RunWith<TMat2>(IGraph<SourceShape<TIn>, TMat2> source, IMaterialize
public Sink<TIn, TMat2> MapMaterializedValue<TMat2>(Func<TMat, TMat2> fn)
=> new Sink<TIn, TMat2>(Module.TransformMaterializedValue(fn));

/// <summary>
/// Materializes this Sink immediately.
///
/// Useful for when you need a materialized value of a Sink when handing it out to someone to materialize it for you.
/// </summary>
/// <param name="materializer">The materializer.</param>
/// <returns>A tuple containing the (1) materialized value and (2) a new <see cref="Sink"/>
/// that can be used to consume elements from the newly materialized <see cref="Sink"/>.</returns>
public Tuple<TMat, Sink<TIn, NotUsed>> PreMaterialize(IMaterializer materializer)
{
var sub = Source.AsSubscriber<TIn>().ToMaterialized(this, Keep.Both).Run(materializer);
return Tuple.Create(sub.Item2, Sink.FromSubscriber(sub.Item1));
}

/// <summary>
/// Change the attributes of this <see cref="IGraph{TShape}"/> to the given ones
/// and seal the list of attributes. This means that further calls will not be able
Expand Down Expand Up @@ -178,7 +192,7 @@ public static class Sink
/// <returns>TBD</returns>
public static Sink<TIn, TMat> Wrap<TIn, TMat>(IGraph<SinkShape<TIn>, TMat> graph)
=> graph is Sink<TIn, TMat>
? (Sink<TIn, TMat>) graph
? (Sink<TIn, TMat>)graph
: new Sink<TIn, TMat>(graph.Module);

/// <summary>
Expand All @@ -205,7 +219,7 @@ public static Sink<TIn, Task<TIn>> First<TIn>()
{
if (!e.IsFaulted && e.IsCompleted && e.Result == null)
throw new InvalidOperationException("Sink.First materialized on an empty stream");

return e;
});

Expand All @@ -226,7 +240,7 @@ public static Sink<TIn, Task<TIn>> FirstOrDefault<TIn>()
/// </summary>
/// <typeparam name="TIn">TBD</typeparam>
/// <returns>TBD</returns>
public static Sink<TIn, Task<TIn>> Last<TIn>()
public static Sink<TIn, Task<TIn>> Last<TIn>()
=> FromGraph(new LastOrDefault<TIn>(throwOnDefault: true)).WithAttributes(DefaultAttributes.LastOrDefaultSink);


Expand Down Expand Up @@ -541,7 +555,7 @@ public static Sink<TIn, Task<TMat>> LazySink<TIn, TMat>(Func<TIn, Task<Sink<TIn,
/// <returns>TBD</returns>
public static Sink<TIn, TMat> FromGraph<TIn, TMat>(IGraph<SinkShape<TIn>, TMat> graph)
=> graph is Sink<TIn, TMat>
? (Sink<TIn, TMat>) graph
? (Sink<TIn, TMat>)graph
: new Sink<TIn, TMat>(graph.Module);

/// <summary>
Expand Down

0 comments on commit d866409

Please sign in to comment.