diff --git a/src/core/Akka.API.Tests/CoreAPISpec.ApproveStreams.approved.txt b/src/core/Akka.API.Tests/CoreAPISpec.ApproveStreams.approved.txt index 6b60552ac21..c471b93a465 100644 --- a/src/core/Akka.API.Tests/CoreAPISpec.ApproveStreams.approved.txt +++ b/src/core/Akka.API.Tests/CoreAPISpec.ApproveStreams.approved.txt @@ -1919,6 +1919,15 @@ namespace Akka.Streams.Dsl public static Akka.Streams.Dsl.Sink> Sum(System.Func reduce) { } public static Akka.Streams.Dsl.Sink Wrap(Akka.Streams.IGraph, TMat> graph) { } } + public sealed class SinkQueueAsyncEnumerator : System.Collections.Generic.IAsyncEnumerator, System.IAsyncDisposable + { + public SinkQueueAsyncEnumerator([System.Runtime.CompilerServices.TupleElementNamesAttribute(new string[] { + "killSwitch", + "sinkQueue"})] System.ValueTuple> queueAndSwitch, System.Threading.CancellationToken token) { } + public T Current { get; } + public System.Threading.Tasks.ValueTask DisposeAsync() { } + public System.Threading.Tasks.ValueTask MoveNextAsync() { } + } public sealed class Sink : Akka.Streams.IGraph>, Akka.Streams.IGraph, TMat> { public Sink(Akka.Streams.Implementation.IModule module) { } @@ -2108,6 +2117,8 @@ namespace Akka.Streams.Dsl public System.ValueTuple> PreMaterialize(Akka.Streams.IMaterializer materializer) { } public System.Threading.Tasks.Task RunAggregate(TOut2 zero, System.Func aggregate, Akka.Streams.IMaterializer materializer) { } public System.Threading.Tasks.Task RunAggregateAsync(TOut2 zero, System.Func> aggregate, Akka.Streams.IMaterializer materializer) { } + public System.Collections.Generic.IAsyncEnumerable RunAsAsyncEnumerable(Akka.Streams.IMaterializer materializer) { } + public System.Collections.Generic.IAsyncEnumerable RunAsAsyncEnumerableBuffer(Akka.Streams.IMaterializer materializer, int minBuffer = 4, int maxBuffer = 16) { } public System.Threading.Tasks.Task RunForeach(System.Action action, Akka.Streams.IMaterializer materializer) { } public System.Threading.Tasks.Task RunSum(System.Func reduce, Akka.Streams.IMaterializer materializer) { } public TMat2 RunWith(Akka.Streams.IGraph, TMat2> sink, Akka.Streams.IMaterializer materializer) { } @@ -2150,6 +2161,12 @@ namespace Akka.Streams.Dsl [Akka.Annotations.ApiMayChangeAttribute()] public static Akka.Streams.Dsl.Sink>> SourceRef() { } } + public sealed class StreamsAsyncEnumerableRerunnable : System.Collections.Generic.IAsyncEnumerable + { + public StreamsAsyncEnumerableRerunnable(Akka.Streams.Dsl.Source source, Akka.Streams.IMaterializer materializer) { } + public StreamsAsyncEnumerableRerunnable(Akka.Streams.Dsl.Source source, Akka.Streams.IMaterializer materializer, int minBuf, int maxBuf) { } + public System.Collections.Generic.IAsyncEnumerator GetAsyncEnumerator(System.Threading.CancellationToken cancellationToken) { } + } public class static SubFlowOperations { public static Akka.Streams.Dsl.SubFlow Aggregate(this Akka.Streams.Dsl.SubFlow flow, TOut2 zero, System.Func fold) { } diff --git a/src/core/Akka.Streams.Tests/Dsl/AsyncEnumerableSpec.cs b/src/core/Akka.Streams.Tests/Dsl/AsyncEnumerableSpec.cs new file mode 100644 index 00000000000..9776e259327 --- /dev/null +++ b/src/core/Akka.Streams.Tests/Dsl/AsyncEnumerableSpec.cs @@ -0,0 +1,151 @@ +//----------------------------------------------------------------------- +// +// Copyright (C) 2009-2021 Lightbend Inc. +// Copyright (C) 2013-2021 .NET Foundation +// +//----------------------------------------------------------------------- + +using System; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using Akka.Pattern; +using Akka.Routing; +using Akka.Streams.Dsl; +using Akka.Streams.TestKit; +using Akka.TestKit; +using FluentAssertions; +using Nito.AsyncEx.Synchronous; +using Xunit; +using Xunit.Abstractions; + +namespace Akka.Streams.Tests.Dsl +{ +#if NETCOREAPP + public class AsyncEnumerableSpec : AkkaSpec + { + private ActorMaterializer Materializer { get; } + + public AsyncEnumerableSpec(ITestOutputHelper helper) : base(helper) + { + var settings = ActorMaterializerSettings.Create(Sys).WithInputBuffer(2, 16); + Materializer = ActorMaterializer.Create(Sys, settings); + } + + [Fact] public async Task RunAsAsyncEnumerable_Uses_CancellationToken() + { + var input = Enumerable.Range(1, 6).ToList(); + + var cts = new CancellationTokenSource(); + var token = cts.Token; + + var asyncEnumerable = Source.From(input).RunAsAsyncEnumerable(Materializer); + var output = input.ToArray(); + bool caught = false; + try + { + await foreach (var a in asyncEnumerable.WithCancellation(token)) + { + cts.Cancel(); + } + } + catch (OperationCanceledException e) + { + caught = true; + } + + caught.ShouldBeTrue(); + } + + [Fact] + public async Task RunAsAsyncEnumerable_must_return_an_IAsyncEnumerableT_from_a_Source() + { + var input = Enumerable.Range(1, 6).ToList(); + var asyncEnumerable = Source.From(input).RunAsAsyncEnumerable(Materializer); + var output = input.ToArray(); + await foreach (var a in asyncEnumerable) + { + (output[0] == a).ShouldBeTrue("Did not get elements in order!"); + output = output.Skip(1).ToArray(); + } + output.Length.ShouldBe(0,"Did not receive all elements!"); + } + + [Fact] + public async Task RunAsAsyncEnumerable_must_allow_multiple_enumerations() + { + var input = Enumerable.Range(1, 6).ToList(); + var asyncEnumerable = Source.From(input).RunAsAsyncEnumerable(Materializer); + var output = input.ToArray(); + await foreach (var a in asyncEnumerable) + { + (output[0] == a).ShouldBeTrue("Did not get elements in order!"); + output = output.Skip(1).ToArray(); + } + output.Length.ShouldBe(0,"Did not receive all elements!"); + + output = input.ToArray(); + await foreach (var a in asyncEnumerable) + { + (output[0] == a).ShouldBeTrue("Did not get elements in order!"); + output = output.Skip(1).ToArray(); + } + output.Length.ShouldBe(0,"Did not receive all elements in second enumeration!!"); + } + + + [Fact] + public async Task RunAsAsyncEnumerable_Throws_on_Abrupt_Stream_termination() + { + var materializer = ActorMaterializer.Create(Sys); + var probe = this.CreatePublisherProbe(); + var task = Source.FromPublisher(probe).RunAsAsyncEnumerable(materializer); + + var a = Task.Run( async () => + { + await foreach (var notused in task) + { + materializer.Shutdown(); + } + }); + //since we are collapsing the stream inside the read + //we want to send messages so we aren't just waiting forever. + probe.SendNext(1); + probe.SendNext(2); + bool thrown = false; + try + { + await a; + } + catch (AbruptTerminationException e) + { + thrown = true; + } + thrown.ShouldBeTrue(); + } + + [Fact] + public async Task RunAsAsyncEnumerable_Throws_if_materializer_gone_before_Enumeration() + { + var materializer = ActorMaterializer.Create(Sys); + var probe = this.CreatePublisherProbe(); + var task = Source.FromPublisher(probe).RunAsAsyncEnumerable(materializer); + materializer.Shutdown(); + + async Task ShouldThrow() + { + await foreach (var a in task) + { + + } + } + + await Assert.ThrowsAsync(ShouldThrow); + } + + + } + +#else +#endif +} diff --git a/src/core/Akka.Streams/Akka.Streams.csproj b/src/core/Akka.Streams/Akka.Streams.csproj index e3205c5a705..78be8ba16db 100644 --- a/src/core/Akka.Streams/Akka.Streams.csproj +++ b/src/core/Akka.Streams/Akka.Streams.csproj @@ -63,6 +63,7 @@ + diff --git a/src/core/Akka.Streams/Dsl/Source.cs b/src/core/Akka.Streams/Dsl/Source.cs index ca4d8222445..2433f985c69 100644 --- a/src/core/Akka.Streams/Dsl/Source.cs +++ b/src/core/Akka.Streams/Dsl/Source.cs @@ -362,6 +362,35 @@ public Task RunSum(Func reduce, IMaterializer materializ public Task RunForeach(Action action, IMaterializer materializer) => RunWith(Sink.ForEach(action), materializer); + /// + /// Shortcut for running this as an . + /// The given enumerable is re-runnable but will cause a re-materialization of the stream each time. + /// This is implemented using a SourceQueue and will buffer elements based on configured stream defaults. + /// For custom buffers Please use + /// + /// The materializer to use for each enumeration + /// A lazy that will run each time it is enumerated. + public IAsyncEnumerable RunAsAsyncEnumerable( + IMaterializer materializer) => + new StreamsAsyncEnumerableRerunnable(this, materializer); + + /// + /// Shortcut for running this as an . + /// The given enumerable is re-runnable but will cause a re-materialization of the stream each time. + /// This is implemented using a SourceQueue and will buffer elements and/or backpressure, + /// based on the buffer values provided. + /// + /// The materializer to use for each enumeration + /// The minimum input buffer size + /// The Max input buffer size. + /// A lazy that will run each time it is enumerated. + public IAsyncEnumerable RunAsAsyncEnumerableBuffer( + IMaterializer materializer, int minBuffer = 4, + int maxBuffer = 16) => + new StreamsAsyncEnumerableRerunnable( + this, materializer,minBuffer,maxBuffer); + + /// /// Combines several sources with fun-in strategy like or and returns . /// diff --git a/src/core/Akka.Streams/Implementation/AsyncEnumerable.cs b/src/core/Akka.Streams/Implementation/AsyncEnumerable.cs new file mode 100644 index 00000000000..b1fbb600f82 --- /dev/null +++ b/src/core/Akka.Streams/Implementation/AsyncEnumerable.cs @@ -0,0 +1,97 @@ +// //----------------------------------------------------------------------- +// // +// // Copyright (C) 2009-2021 Lightbend Inc. +// // Copyright (C) 2013-2021 .NET Foundation +// // +// //----------------------------------------------------------------------- + +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; +using Akka.Configuration.Hocon; + +namespace Akka.Streams.Dsl +{ + /// + /// Used to treat an of + /// as an + /// + /// + public sealed class StreamsAsyncEnumerableRerunnable : IAsyncEnumerable + { + private static readonly Sink> defaultSinkqueue = + Sink.Queue(); + private readonly Source _source; + private readonly IMaterializer _materializer; + + private readonly Sink> thisSinkQueue; + //private readonly IRunnableGraph<(UniqueKillSwitch, ISinkQueue)> _graph; + public StreamsAsyncEnumerableRerunnable(Source source, IMaterializer materializer) + { + _source = source; + _materializer = materializer; + thisSinkQueue = defaultSinkqueue; + } + + public StreamsAsyncEnumerableRerunnable(Source source, + IMaterializer materializer, int minBuf, int maxBuf):this(source, materializer) + { + thisSinkQueue = + defaultSinkqueue.WithAttributes( + Attributes.CreateInputBuffer(minBuf, maxBuf)); + } + + public IAsyncEnumerator GetAsyncEnumerator(CancellationToken cancellationToken) + { + cancellationToken.ThrowIfCancellationRequested(); + + return new SinkQueueAsyncEnumerator(_source + .Via(cancellationToken.AsFlow(cancelGracefully: true)) + .ViaMaterialized(KillSwitches.Single(), Keep.Right) + .ToMaterialized(thisSinkQueue, Keep.Both) + .Run(_materializer), + cancellationToken); + } + } + /// + /// Wraps a Sink Queue and Killswitch around + /// + /// + public sealed class SinkQueueAsyncEnumerator : IAsyncEnumerator + { + private ISinkQueue _sinkQueue; + private IKillSwitch _killSwitch; + private CancellationToken _token; + public SinkQueueAsyncEnumerator((UniqueKillSwitch killSwitch,ISinkQueue sinkQueue) queueAndSwitch, CancellationToken token) + { + _sinkQueue = queueAndSwitch.sinkQueue; + _killSwitch = queueAndSwitch.killSwitch; + _token = token; + } + public async ValueTask DisposeAsync() + { + //If we are disposing, let's shut down the stream + //so that we don't have data hanging around. + _killSwitch.Shutdown(); + _killSwitch = null; + _sinkQueue = null; + } + + public async ValueTask MoveNextAsync() + { + _token.ThrowIfCancellationRequested(); + var opt = await _sinkQueue.PullAsync(); + if (opt.HasValue) + { + Current = opt.Value; + return true; + } + else + { + return false; + } + } + + public T Current { get; private set; } + } +} \ No newline at end of file