From ed14b05d35fe5847ad544b8e13d9562e7f26b76e Mon Sep 17 00:00:00 2001 From: Andrew El Date: Sat, 23 Jan 2021 11:26:02 -0500 Subject: [PATCH 1/4] Add `.RunAsAsyncEnumerable` and `.RunAsAsyncEnumerableBuffer` to Akka Streams Source DSL --- .../CoreAPISpec.ApproveStreams.approved.txt | 2 + .../Dsl/AsyncEnumerableSpec.cs | 120 ++++++++++++++++++ src/core/Akka.Streams/Akka.Streams.csproj | 1 + src/core/Akka.Streams/Dsl/Source.cs | 35 +++++ .../Implementation/AsyncEnumerable.cs | 77 +++++++++++ 5 files changed, 235 insertions(+) create mode 100644 src/core/Akka.Streams.Tests/Dsl/AsyncEnumerableSpec.cs create mode 100644 src/core/Akka.Streams/Implementation/AsyncEnumerable.cs diff --git a/src/core/Akka.API.Tests/CoreAPISpec.ApproveStreams.approved.txt b/src/core/Akka.API.Tests/CoreAPISpec.ApproveStreams.approved.txt index 367ae23b1a0..b1136c9600c 100644 --- a/src/core/Akka.API.Tests/CoreAPISpec.ApproveStreams.approved.txt +++ b/src/core/Akka.API.Tests/CoreAPISpec.ApproveStreams.approved.txt @@ -2031,6 +2031,8 @@ namespace Akka.Streams.Dsl public System.Threading.Tasks.Task RunAggregateAsync(TOut2 zero, System.Func> aggregate, Akka.Streams.IMaterializer materializer) { } public System.Threading.Tasks.Task RunForeach(System.Action action, Akka.Streams.IMaterializer materializer) { } public System.Threading.Tasks.Task RunSum(System.Func reduce, Akka.Streams.IMaterializer materializer) { } + public System.Collections.Generic.IAsyncEnumerable RunAsAsyncEnumerable(Akka.Sterams.IMaterializer materializer) { } + public System.Collections.Generic.IAsyncEnumerable RunAsAsyncEnumerableBuffer(Akka.Sterams.IMaterializer materializer, int minBuffer = 4, int maxBuffer = 16) { } public TMat2 RunWith(Akka.Streams.IGraph, TMat2> sink, Akka.Streams.IMaterializer materializer) { } public Akka.Streams.Dsl.IRunnableGraph To(Akka.Streams.IGraph, TMat2> sink) { } public Akka.Streams.Dsl.IRunnableGraph ToMaterialized(Akka.Streams.IGraph, TMat2> sink, System.Func combine) { } diff --git a/src/core/Akka.Streams.Tests/Dsl/AsyncEnumerableSpec.cs b/src/core/Akka.Streams.Tests/Dsl/AsyncEnumerableSpec.cs new file mode 100644 index 00000000000..a9ad75aceb5 --- /dev/null +++ b/src/core/Akka.Streams.Tests/Dsl/AsyncEnumerableSpec.cs @@ -0,0 +1,120 @@ +//----------------------------------------------------------------------- +// +// Copyright (C) 2009-2021 Lightbend Inc. +// Copyright (C) 2013-2021 .NET Foundation +// +//----------------------------------------------------------------------- + +using System; +using System.Linq; +using System.Threading.Tasks; +using Akka.Pattern; +using Akka.Routing; +using Akka.Streams.Dsl; +using Akka.Streams.TestKit; +using Akka.TestKit; +using FluentAssertions; +using Nito.AsyncEx.Synchronous; +using Xunit; +using Xunit.Abstractions; + +namespace Akka.Streams.Tests.Dsl +{ +#if NETCOREAPP + public class AsyncEnumerableSpec : AkkaSpec + { + private ActorMaterializer Materializer { get; } + + public AsyncEnumerableSpec(ITestOutputHelper helper) : base(helper) + { + var settings = ActorMaterializerSettings.Create(Sys).WithInputBuffer(2, 16); + Materializer = ActorMaterializer.Create(Sys, settings); + } + [Fact] + public async Task RunAsAsyncEnumerable_must_return_an_IAsyncEnumerableT_from_a_Source() + { + var input = Enumerable.Range(1, 6).ToList(); + var asyncEnumerable = Source.From(input).RunAsAsyncEnumerable(Materializer); + var output = input.ToArray(); + await foreach (var a in asyncEnumerable) + { + (output[0] == a).ShouldBeTrue("Did not get elements in order!"); + output = output.Skip(1).ToArray(); + } + output.Length.ShouldBe(0,"Did not receive all elements!"); + } + + [Fact] + public async Task RunAsAsyncEnumerable_must_allow_multiple_enumerations() + { + var input = Enumerable.Range(1, 6).ToList(); + var asyncEnumerable = Source.From(input).RunAsAsyncEnumerable(Materializer); + var output = input.ToArray(); + await foreach (var a in asyncEnumerable) + { + (output[0] == a).ShouldBeTrue("Did not get elements in order!"); + output = output.Skip(1).ToArray(); + } + output.Length.ShouldBe(0,"Did not receive all elements!"); + + output = input.ToArray(); + await foreach (var a in asyncEnumerable) + { + (output[0] == a).ShouldBeTrue("Did not get elements in order!"); + output = output.Skip(1).ToArray(); + } + output.Length.ShouldBe(0,"Did not receive all elements in second enumeration!!"); + } + + + [Fact] + public async Task RunAsAsyncEnumerable_Throws_on_Abrupt_Stream_termination() + { + var materializer = ActorMaterializer.Create(Sys); + var probe = this.CreatePublisherProbe(); + var task = Source.FromPublisher(probe).RunAsAsyncEnumerable(materializer); + + var a = Task.Run( async () => + { + await foreach (var B in task) + { + materializer.Shutdown(); + } + }); + //since we are collapsing the stream inside the read + //we want to send messages so we aren't just waiting forever. + probe.SendNext(1); + probe.SendNext(2); + bool thrown = false; + try + { + await a; + } + catch (AbruptTerminationException e) + { + thrown = true; + } + thrown.ShouldBeTrue(); + } + + [Fact] + public async Task RunAsAsyncEnumerable_Throws_if_materializer_gone_before_Enumeration() + { + var materializer = ActorMaterializer.Create(Sys); + var probe = this.CreatePublisherProbe(); + var task = Source.FromPublisher(probe).RunAsAsyncEnumerable(materializer); + materializer.Shutdown(); + Func a = async () => + { + await foreach (var a in task) + { + + } + }; + a.ShouldThrow(); + } + } + +#else +#endif +} diff --git a/src/core/Akka.Streams/Akka.Streams.csproj b/src/core/Akka.Streams/Akka.Streams.csproj index ca1b41846f4..6e377f6667c 100644 --- a/src/core/Akka.Streams/Akka.Streams.csproj +++ b/src/core/Akka.Streams/Akka.Streams.csproj @@ -63,6 +63,7 @@ + diff --git a/src/core/Akka.Streams/Dsl/Source.cs b/src/core/Akka.Streams/Dsl/Source.cs index f8a82cf9e6f..d0e75bdc677 100644 --- a/src/core/Akka.Streams/Dsl/Source.cs +++ b/src/core/Akka.Streams/Dsl/Source.cs @@ -362,6 +362,41 @@ public Task RunSum(Func reduce, IMaterializer materializ public Task RunForeach(Action action, IMaterializer materializer) => RunWith(Sink.ForEach(action), materializer); + /// + /// Shortcut for running this as an . + /// The given enumerable is re-runnable but will cause a re-materialization of the stream each time. + /// This is implemented using a SourceQueue and will buffer elements based on configured stream defaults. + /// For custom buffers Please use + /// + /// The materializer to use for each enumeration + /// A lazy that will run each time it is enumerated. + public IAsyncEnumerable RunAsAsyncEnumerable( + IMaterializer materializer) => + new StreamsAsyncEnumerableRerunnable( + ViaMaterialized(KillSwitches.Single(),Keep.Right). + ToMaterialized(Sink.Queue(), Keep.Both), materializer); + + /// + /// Shortcut for running this as an . + /// The given enumerable is re-runnable but will cause a re-materialization of the stream each time. + /// This is implemented using a SourceQueue and will buffer elements and/or backpressure, + /// based on the buffer values provided. + /// + /// The materializer to use for each enumeration + /// The minimum input buffer size + /// The Max input buffer size. + /// A lazy that will run each time it is enumerated. + public IAsyncEnumerable RunAsAsyncEnumerableBuffer( + IMaterializer materializer, int minBuffer = 4, + int maxBuffer = 16) => + new StreamsAsyncEnumerableRerunnable( + ViaMaterialized(KillSwitches.Single(), Keep.Right) + .ToMaterialized( + Sink.Queue().WithAttributes( + Attributes.CreateInputBuffer(minBuffer, maxBuffer)), + Keep.Both), materializer); + + /// /// Combines several sources with fun-in strategy like or and returns . /// diff --git a/src/core/Akka.Streams/Implementation/AsyncEnumerable.cs b/src/core/Akka.Streams/Implementation/AsyncEnumerable.cs new file mode 100644 index 00000000000..b57eccabf8f --- /dev/null +++ b/src/core/Akka.Streams/Implementation/AsyncEnumerable.cs @@ -0,0 +1,77 @@ +// //----------------------------------------------------------------------- +// // +// // Copyright (C) 2009-2021 Lightbend Inc. +// // Copyright (C) 2013-2021 .NET Foundation +// // +// //----------------------------------------------------------------------- + +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; +using Akka.Configuration.Hocon; + +namespace Akka.Streams.Dsl +{ + /// + /// Used to treat an of + /// as an + /// + /// + public sealed class StreamsAsyncEnumerableRerunnable : IAsyncEnumerable + { + private readonly IMaterializer _materializer; + private readonly IRunnableGraph<(UniqueKillSwitch, ISinkQueue)> _graph; + + public StreamsAsyncEnumerableRerunnable(IRunnableGraph<(UniqueKillSwitch, ISinkQueue)> graph, IMaterializer materializer) + { + _graph = graph; + _materializer = materializer; + } + + public IAsyncEnumerator GetAsyncEnumerator(CancellationToken cancellationToken) + { + cancellationToken.ThrowIfCancellationRequested(); + return new SinkQueueAsyncEnumerator(_graph.Run(_materializer), cancellationToken); + } + } + /// + /// Wraps a Sink Queue and Killswitch around + /// + /// + public sealed class SinkQueueAsyncEnumerator : IAsyncEnumerator + { + private ISinkQueue _sinkQueue; + private IKillSwitch _killSwitch; + private CancellationToken _token; + public SinkQueueAsyncEnumerator((UniqueKillSwitch killSwitch,ISinkQueue sinkQueue) queueAndSwitch, CancellationToken token) + { + _sinkQueue = queueAndSwitch.sinkQueue; + _killSwitch = queueAndSwitch.killSwitch; + _token = token; + } + public async ValueTask DisposeAsync() + { + //If we are disposing, let's shut down the stream + //so that we don't have data hanging around. + _killSwitch.Shutdown(); + _sinkQueue = null; + } + + public async ValueTask MoveNextAsync() + { + _token.ThrowIfCancellationRequested(); + var opt = await _sinkQueue.PullAsync(); + if (opt.HasValue) + { + Current = opt.Value; + return true; + } + else + { + return false; + } + } + + public T Current { get; private set; } + } +} \ No newline at end of file From 7e316799f637a07baa2c612655bb732d8ab87a7c Mon Sep 17 00:00:00 2001 From: Andrew El Date: Sat, 23 Jan 2021 14:43:00 -0500 Subject: [PATCH 2/4] Add tests for CancellationToken support, use Cancellationtoken in rest of stream flow --- .../Dsl/AsyncEnumerableSpec.cs | 31 ++++++++++++++++++- src/core/Akka.Streams/Dsl/Source.cs | 12 ++----- .../Implementation/AsyncEnumerable.cs | 30 +++++++++++++++--- 3 files changed, 58 insertions(+), 15 deletions(-) diff --git a/src/core/Akka.Streams.Tests/Dsl/AsyncEnumerableSpec.cs b/src/core/Akka.Streams.Tests/Dsl/AsyncEnumerableSpec.cs index a9ad75aceb5..7ee7de261d7 100644 --- a/src/core/Akka.Streams.Tests/Dsl/AsyncEnumerableSpec.cs +++ b/src/core/Akka.Streams.Tests/Dsl/AsyncEnumerableSpec.cs @@ -7,6 +7,7 @@ using System; using System.Linq; +using System.Threading; using System.Threading.Tasks; using Akka.Pattern; using Akka.Routing; @@ -30,6 +31,32 @@ public AsyncEnumerableSpec(ITestOutputHelper helper) : base(helper) var settings = ActorMaterializerSettings.Create(Sys).WithInputBuffer(2, 16); Materializer = ActorMaterializer.Create(Sys, settings); } + + [Fact] public async Task RunAsAsyncEnumerable_Uses_CancellationToken() + { + var input = Enumerable.Range(1, 6).ToList(); + + var cts = new CancellationTokenSource(); + var token = cts.Token; + + var asyncEnumerable = Source.From(input).RunAsAsyncEnumerable(Materializer); + var output = input.ToArray(); + bool caught = false; + try + { + await foreach (var a in asyncEnumerable.WithCancellation(token)) + { + cts.Cancel(); + } + } + catch (OperationCanceledException e) + { + caught = true; + } + + caught.ShouldBeTrue(); + } + [Fact] public async Task RunAsAsyncEnumerable_must_return_an_IAsyncEnumerableT_from_a_Source() { @@ -76,7 +103,7 @@ public async Task RunAsAsyncEnumerable_Throws_on_Abrupt_Stream_termination() var a = Task.Run( async () => { - await foreach (var B in task) + await foreach (var notused in task) { materializer.Shutdown(); } @@ -113,6 +140,8 @@ public async Task RunAsAsyncEnumerable_Throws_if_materializer_gone_before_Enumer }; a.ShouldThrow(); } + + } #else diff --git a/src/core/Akka.Streams/Dsl/Source.cs b/src/core/Akka.Streams/Dsl/Source.cs index d0e75bdc677..74bcc436f5b 100644 --- a/src/core/Akka.Streams/Dsl/Source.cs +++ b/src/core/Akka.Streams/Dsl/Source.cs @@ -372,9 +372,7 @@ public Task RunForeach(Action action, IMaterializer materializer) /// A lazy that will run each time it is enumerated. public IAsyncEnumerable RunAsAsyncEnumerable( IMaterializer materializer) => - new StreamsAsyncEnumerableRerunnable( - ViaMaterialized(KillSwitches.Single(),Keep.Right). - ToMaterialized(Sink.Queue(), Keep.Both), materializer); + new StreamsAsyncEnumerableRerunnable(this, materializer); /// /// Shortcut for running this as an . @@ -389,12 +387,8 @@ public IAsyncEnumerable RunAsAsyncEnumerable( public IAsyncEnumerable RunAsAsyncEnumerableBuffer( IMaterializer materializer, int minBuffer = 4, int maxBuffer = 16) => - new StreamsAsyncEnumerableRerunnable( - ViaMaterialized(KillSwitches.Single(), Keep.Right) - .ToMaterialized( - Sink.Queue().WithAttributes( - Attributes.CreateInputBuffer(minBuffer, maxBuffer)), - Keep.Both), materializer); + new StreamsAsyncEnumerableRerunnable( + this, materializer,minBuffer,maxBuffer); /// diff --git a/src/core/Akka.Streams/Implementation/AsyncEnumerable.cs b/src/core/Akka.Streams/Implementation/AsyncEnumerable.cs index b57eccabf8f..b1fbb600f82 100644 --- a/src/core/Akka.Streams/Implementation/AsyncEnumerable.cs +++ b/src/core/Akka.Streams/Implementation/AsyncEnumerable.cs @@ -17,21 +17,40 @@ namespace Akka.Streams.Dsl /// as an /// /// - public sealed class StreamsAsyncEnumerableRerunnable : IAsyncEnumerable + public sealed class StreamsAsyncEnumerableRerunnable : IAsyncEnumerable { + private static readonly Sink> defaultSinkqueue = + Sink.Queue(); + private readonly Source _source; private readonly IMaterializer _materializer; - private readonly IRunnableGraph<(UniqueKillSwitch, ISinkQueue)> _graph; - public StreamsAsyncEnumerableRerunnable(IRunnableGraph<(UniqueKillSwitch, ISinkQueue)> graph, IMaterializer materializer) + private readonly Sink> thisSinkQueue; + //private readonly IRunnableGraph<(UniqueKillSwitch, ISinkQueue)> _graph; + public StreamsAsyncEnumerableRerunnable(Source source, IMaterializer materializer) { - _graph = graph; + _source = source; _materializer = materializer; + thisSinkQueue = defaultSinkqueue; + } + + public StreamsAsyncEnumerableRerunnable(Source source, + IMaterializer materializer, int minBuf, int maxBuf):this(source, materializer) + { + thisSinkQueue = + defaultSinkqueue.WithAttributes( + Attributes.CreateInputBuffer(minBuf, maxBuf)); } public IAsyncEnumerator GetAsyncEnumerator(CancellationToken cancellationToken) { cancellationToken.ThrowIfCancellationRequested(); - return new SinkQueueAsyncEnumerator(_graph.Run(_materializer), cancellationToken); + + return new SinkQueueAsyncEnumerator(_source + .Via(cancellationToken.AsFlow(cancelGracefully: true)) + .ViaMaterialized(KillSwitches.Single(), Keep.Right) + .ToMaterialized(thisSinkQueue, Keep.Both) + .Run(_materializer), + cancellationToken); } } /// @@ -54,6 +73,7 @@ public async ValueTask DisposeAsync() //If we are disposing, let's shut down the stream //so that we don't have data hanging around. _killSwitch.Shutdown(); + _killSwitch = null; _sinkQueue = null; } From 9ad66e934ec0bd0fb91eb18dc18810330ab3868c Mon Sep 17 00:00:00 2001 From: Gregorius Soedharmo Date: Tue, 21 Dec 2021 00:51:59 +0700 Subject: [PATCH 3/4] Fix FluentAssertion API changes --- src/core/Akka.Streams.Tests/Dsl/AsyncEnumerableSpec.cs | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/src/core/Akka.Streams.Tests/Dsl/AsyncEnumerableSpec.cs b/src/core/Akka.Streams.Tests/Dsl/AsyncEnumerableSpec.cs index 7ee7de261d7..9776e259327 100644 --- a/src/core/Akka.Streams.Tests/Dsl/AsyncEnumerableSpec.cs +++ b/src/core/Akka.Streams.Tests/Dsl/AsyncEnumerableSpec.cs @@ -131,14 +131,16 @@ public async Task RunAsAsyncEnumerable_Throws_if_materializer_gone_before_Enumer var probe = this.CreatePublisherProbe(); var task = Source.FromPublisher(probe).RunAsAsyncEnumerable(materializer); materializer.Shutdown(); - Func a = async () => + + async Task ShouldThrow() { await foreach (var a in task) { } - }; - a.ShouldThrow(); + } + + await Assert.ThrowsAsync(ShouldThrow); } From 4220989d6ab0b7bee80e565a4f3773f2edf104a3 Mon Sep 17 00:00:00 2001 From: Gregorius Soedharmo Date: Tue, 21 Dec 2021 01:12:34 +0700 Subject: [PATCH 4/4] Update API Approval list --- .../CoreAPISpec.ApproveStreams.approved.txt | 19 +++++++++++++++++-- 1 file changed, 17 insertions(+), 2 deletions(-) diff --git a/src/core/Akka.API.Tests/CoreAPISpec.ApproveStreams.approved.txt b/src/core/Akka.API.Tests/CoreAPISpec.ApproveStreams.approved.txt index ee4d4949ae7..c471b93a465 100644 --- a/src/core/Akka.API.Tests/CoreAPISpec.ApproveStreams.approved.txt +++ b/src/core/Akka.API.Tests/CoreAPISpec.ApproveStreams.approved.txt @@ -1919,6 +1919,15 @@ namespace Akka.Streams.Dsl public static Akka.Streams.Dsl.Sink> Sum(System.Func reduce) { } public static Akka.Streams.Dsl.Sink Wrap(Akka.Streams.IGraph, TMat> graph) { } } + public sealed class SinkQueueAsyncEnumerator : System.Collections.Generic.IAsyncEnumerator, System.IAsyncDisposable + { + public SinkQueueAsyncEnumerator([System.Runtime.CompilerServices.TupleElementNamesAttribute(new string[] { + "killSwitch", + "sinkQueue"})] System.ValueTuple> queueAndSwitch, System.Threading.CancellationToken token) { } + public T Current { get; } + public System.Threading.Tasks.ValueTask DisposeAsync() { } + public System.Threading.Tasks.ValueTask MoveNextAsync() { } + } public sealed class Sink : Akka.Streams.IGraph>, Akka.Streams.IGraph, TMat> { public Sink(Akka.Streams.Implementation.IModule module) { } @@ -2108,10 +2117,10 @@ namespace Akka.Streams.Dsl public System.ValueTuple> PreMaterialize(Akka.Streams.IMaterializer materializer) { } public System.Threading.Tasks.Task RunAggregate(TOut2 zero, System.Func aggregate, Akka.Streams.IMaterializer materializer) { } public System.Threading.Tasks.Task RunAggregateAsync(TOut2 zero, System.Func> aggregate, Akka.Streams.IMaterializer materializer) { } + public System.Collections.Generic.IAsyncEnumerable RunAsAsyncEnumerable(Akka.Streams.IMaterializer materializer) { } + public System.Collections.Generic.IAsyncEnumerable RunAsAsyncEnumerableBuffer(Akka.Streams.IMaterializer materializer, int minBuffer = 4, int maxBuffer = 16) { } public System.Threading.Tasks.Task RunForeach(System.Action action, Akka.Streams.IMaterializer materializer) { } public System.Threading.Tasks.Task RunSum(System.Func reduce, Akka.Streams.IMaterializer materializer) { } - public System.Collections.Generic.IAsyncEnumerable RunAsAsyncEnumerable(Akka.Sterams.IMaterializer materializer) { } - public System.Collections.Generic.IAsyncEnumerable RunAsAsyncEnumerableBuffer(Akka.Sterams.IMaterializer materializer, int minBuffer = 4, int maxBuffer = 16) { } public TMat2 RunWith(Akka.Streams.IGraph, TMat2> sink, Akka.Streams.IMaterializer materializer) { } public Akka.Streams.Dsl.IRunnableGraph To(Akka.Streams.IGraph, TMat2> sink) { } public Akka.Streams.Dsl.IRunnableGraph ToMaterialized(Akka.Streams.IGraph, TMat2> sink, System.Func combine) { } @@ -2152,6 +2161,12 @@ namespace Akka.Streams.Dsl [Akka.Annotations.ApiMayChangeAttribute()] public static Akka.Streams.Dsl.Sink>> SourceRef() { } } + public sealed class StreamsAsyncEnumerableRerunnable : System.Collections.Generic.IAsyncEnumerable + { + public StreamsAsyncEnumerableRerunnable(Akka.Streams.Dsl.Source source, Akka.Streams.IMaterializer materializer) { } + public StreamsAsyncEnumerableRerunnable(Akka.Streams.Dsl.Source source, Akka.Streams.IMaterializer materializer, int minBuf, int maxBuf) { } + public System.Collections.Generic.IAsyncEnumerator GetAsyncEnumerator(System.Threading.CancellationToken cancellationToken) { } + } public class static SubFlowOperations { public static Akka.Streams.Dsl.SubFlow Aggregate(this Akka.Streams.Dsl.SubFlow flow, TOut2 zero, System.Func fold) { }