Skip to content

Commit

Permalink
Merge branch 'dev' into benchmark-dot-net
Browse files Browse the repository at this point in the history
  • Loading branch information
Horusiath authored May 31, 2018
2 parents 914b7c5 + c2c1df0 commit 6ab0b14
Show file tree
Hide file tree
Showing 6 changed files with 138 additions and 1 deletion.
9 changes: 8 additions & 1 deletion docs/articles/streams/basics.md
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,14 @@ RunnableGraph<Tuple<TaskCompletionSource<int>, ICancelable, Task<int>>> r12 =
> [!NOTE]
> In Graphs it is possible to access the materialized value from inside the stream processing graph.
> For details see [Accessing the materialized value inside the Graph](xref:streams-working-with-graphs#accessing-the-materialized-value-inside-the-graph).
### Source pre-materialization
There are situations in which you require a `Source` materialized value **before** the `Source` gets hooked up to the rest of the graph.
This is particularly useful in the case of "materialized value powered" `Source`s, like `Source.Queue`, `Source.ActorRef` or `Source.Maybe`.

By using the `PreMaterialize` operator on a `Source`, you can obtain its materialized value and another `Source`. The latter can be used to consume messages from the original `Source`. Note that this can be materialized multiple times.

[!code-csharp[FlowDocTests.cs](../../examples/DocsExamples/Streams/FlowDocTests.cs?name=source-prematerialization)]

## Stream ordering
In Akka Streams almost all computation stages *preserve input order* of elements. This means that if inputs ``{IA1,IA2,...,IAn}``
Expand Down
34 changes: 34 additions & 0 deletions docs/examples/DocsExamples/Streams/FlowDocTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
using System;
using Akka;
using Akka.Actor;
using Akka.Streams;
using Akka.Streams.Dsl;
using Akka.TestKit.Xunit2;
using Xunit;

namespace DocsExamples.Streams
{
public class FlowDocTests : TestKit
{
[Fact]
public void Source_prematerialization()
{
#region source-prematerialization

var matPoweredSource =
Source.ActorRef<string>(bufferSize: 100, overflowStrategy: OverflowStrategy.Fail);

Tuple<IActorRef, Source<string, NotUsed>> materialized = matPoweredSource.PreMaterialize(Sys.Materializer());

var actorRef = materialized.Item1;
var source = materialized.Item2;

actorRef.Tell("hit");

// pass source around for materialization
source.RunWith(Sink.ForEach<string>(Console.WriteLine), Sys.Materializer());

#endregion
}
}
}
4 changes: 4 additions & 0 deletions docs/examples/Tutorials/Tutorials.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,8 @@
<PackageReference Include="xunit.runner.visualstudio" Version="2.2.0" />
</ItemGroup>

<ItemGroup>
<Service Include="{82a7f48d-3b50-4b1e-b82e-3ada8210c358}" />
</ItemGroup>

</Project>
Original file line number Diff line number Diff line change
Expand Up @@ -1655,6 +1655,7 @@ namespace Akka.Streams.Dsl
public Akka.Streams.Dsl.Source<TOut, TMat3> ConcatMaterialized<TMat2, TMat3>(Akka.Streams.IGraph<Akka.Streams.SourceShape<TOut>, TMat2> that, System.Func<TMat, TMat2, TMat3> materializedFunction) { }
public Akka.Streams.Dsl.Source<TOut, TMat2> MapMaterializedValue<TMat2>(System.Func<TMat, TMat2> mapFunc) { }
public Akka.Streams.Dsl.Source<TOut, TMat> Named(string name) { }
public System.Tuple<TMat, Akka.Streams.Dsl.Source<TOut, Akka.NotUsed>> PreMaterialize(Akka.Streams.IMaterializer materializer) { }
public System.Threading.Tasks.Task<TOut2> RunAggregate<TOut2>(TOut2 zero, System.Func<TOut2, TOut, TOut2> aggregate, Akka.Streams.IMaterializer materializer) { }
public System.Threading.Tasks.Task<TOut2> RunAggregateAsync<TOut2>(TOut2 zero, System.Func<TOut2, TOut, System.Threading.Tasks.Task<TOut2>> aggregate, Akka.Streams.IMaterializer materializer) { }
public System.Threading.Tasks.Task RunForeach(System.Action<TOut> action, Akka.Streams.IMaterializer materializer) { }
Expand Down
79 changes: 79 additions & 0 deletions src/core/Akka.Streams.Tests/Dsl/SourceSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -442,5 +442,84 @@ public void A_ZipWithN_Source_must_properly_ZipWithN()
.AwaitResult()
.ShouldAllBeEquivalentTo(new[] {111, 222, 333});
}

[Fact]
public void Source_prematerialization_must_materialize_the_source_and_connect_it_to_a_publisher()
{
var matValPoweredSource = Source.Maybe<int>();
var matted = matValPoweredSource.PreMaterialize(Sys.Materializer());
var mat = matted.Item1;
var src = matted.Item2;

var probe = src.RunWith(this.SinkProbe<int>(), Sys.Materializer());
probe.Request(1);
mat.TrySetResult(42).Should().BeTrue();
probe.ExpectNext(42);
probe.ExpectComplete();
}

[Fact]
public async Task Source_prematerialization_must_allow_for_multiple_downstream_materialized_sources()
{
var matValPoweredSource = Source.Queue<string>(int.MaxValue, OverflowStrategy.Fail);
var matted = matValPoweredSource.PreMaterialize(Sys.Materializer());
var mat = matted.Item1;
var src = matted.Item2;

var probe1 = src.RunWith(this.SinkProbe<string>(), Sys.Materializer());
var probe2 = src.RunWith(this.SinkProbe<string>(), Sys.Materializer());

probe1.Request(1);
probe2.Request(2);
await mat.OfferAsync("One");
probe1.ExpectNext("One");
probe2.ExpectNext("One");
}

[Fact]
public async Task Source_prematerialization_must_survive_cancellation_of_downstream_materialized_sources()
{
var matValPoweredSource = Source.Queue<string>(Int32.MaxValue, OverflowStrategy.Fail);
var matted = matValPoweredSource.PreMaterialize(Sys.Materializer());
var mat = matted.Item1;
var src = matted.Item2;

var probe1 = src.RunWith(this.SinkProbe<string>(), Sys.Materializer());
src.RunWith(Sink.Cancelled<string>(), Sys.Materializer());

probe1.Request(1);
await mat.OfferAsync("One");
probe1.ExpectNext("One");
}

[Fact]
public void Source_prematerialization_must_propagate_failures_to_downstream_materialized_sources()
{
var matValPoweredSource = Source.Queue<string>(Int32.MaxValue, OverflowStrategy.Fail);
var matted = matValPoweredSource.PreMaterialize(Sys.Materializer());
var mat = matted.Item1;
var src = matted.Item2;

var probe1 = src.RunWith(this.SinkProbe<string>(), Sys.Materializer());
var probe2 = src.RunWith(this.SinkProbe<string>(), Sys.Materializer());

mat.Fail(new InvalidOperationException("boom"));

probe1.ExpectSubscription();
probe2.ExpectSubscription();

probe1.ExpectError().Message.Should().Be("boom");
probe2.ExpectError().Message.Should().Be("boom");
}

[Fact]
public void Source_prematerialization_must_propagate_materialization_failures()
{
var matValPoweredSource =
Source.Empty<int>().MapMaterializedValue<int>(_ => throw new InvalidOperationException("boom"));

Action thrower = () => matValPoweredSource.PreMaterialize(Sys.Materializer());
thrower.ShouldThrow<InvalidOperationException>();
}
}
}
12 changes: 12 additions & 0 deletions src/core/Akka.Streams/Dsl/Source.cs
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,18 @@ IFlow<TOut, TMat2> IFlow<TOut, TMat>.MapMaterializedValue<TMat2>(Func<TMat, TMat
public Source<TOut, TMat2> MapMaterializedValue<TMat2>(Func<TMat, TMat2> mapFunc)
=> new Source<TOut, TMat2>(Module.TransformMaterializedValue(mapFunc));

/// <summary>
/// Materializes this Source immediately.
/// </summary>
/// <param name="materializer">The materializer.</param>
/// <returns>A tuple containing the (1) materialized value and (2) a new <see cref="Source"/>
/// that can be used to consume elements from the newly materialized <see cref="Source"/>.</returns>
public Tuple<TMat, Source<TOut, NotUsed>> PreMaterialize(IMaterializer materializer)
{
var tup = ToMaterialized(Sink.AsPublisher<TOut>(fanout: true), Keep.Both).Run(materializer);
return Tuple.Create(tup.Item1, Source.FromPublisher(tup.Item2));
}

/// <summary>
/// Connect this <see cref="Source{TOut,TMat}"/> to a <see cref="Sink{TIn,TMat}"/> and run it. The returned value is the materialized value
/// of the <see cref="Sink{TIn,TMat}"/> , e.g. the <see cref="IPublisher{TIn}"/> of a <see cref="Sink.Publisher{TIn}"/>.
Expand Down

0 comments on commit 6ab0b14

Please sign in to comment.