Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow Streams to be consumed as IAsyncEnumerable #4742

Merged
merged 8 commits into from
Dec 20, 2021
17 changes: 17 additions & 0 deletions src/core/Akka.API.Tests/CoreAPISpec.ApproveStreams.approved.txt
Original file line number Diff line number Diff line change
Expand Up @@ -1919,6 +1919,15 @@ namespace Akka.Streams.Dsl
public static Akka.Streams.Dsl.Sink<TIn, System.Threading.Tasks.Task<TIn>> Sum<TIn>(System.Func<TIn, TIn, TIn> reduce) { }
public static Akka.Streams.Dsl.Sink<TIn, TMat> Wrap<TIn, TMat>(Akka.Streams.IGraph<Akka.Streams.SinkShape<TIn>, TMat> graph) { }
}
public sealed class SinkQueueAsyncEnumerator<T> : System.Collections.Generic.IAsyncEnumerator<T>, System.IAsyncDisposable
{
public SinkQueueAsyncEnumerator([System.Runtime.CompilerServices.TupleElementNamesAttribute(new string[] {
"killSwitch",
"sinkQueue"})] System.ValueTuple<Akka.Streams.UniqueKillSwitch, Akka.Streams.ISinkQueue<T>> queueAndSwitch, System.Threading.CancellationToken token) { }
public T Current { get; }
public System.Threading.Tasks.ValueTask DisposeAsync() { }
public System.Threading.Tasks.ValueTask<bool> MoveNextAsync() { }
}
public sealed class Sink<TIn, TMat> : Akka.Streams.IGraph<Akka.Streams.SinkShape<TIn>>, Akka.Streams.IGraph<Akka.Streams.SinkShape<TIn>, TMat>
{
public Sink(Akka.Streams.Implementation.IModule module) { }
Expand Down Expand Up @@ -2108,6 +2117,8 @@ namespace Akka.Streams.Dsl
public System.ValueTuple<TMat, Akka.Streams.Dsl.Source<TOut, Akka.NotUsed>> PreMaterialize(Akka.Streams.IMaterializer materializer) { }
public System.Threading.Tasks.Task<TOut2> RunAggregate<TOut2>(TOut2 zero, System.Func<TOut2, TOut, TOut2> aggregate, Akka.Streams.IMaterializer materializer) { }
public System.Threading.Tasks.Task<TOut2> RunAggregateAsync<TOut2>(TOut2 zero, System.Func<TOut2, TOut, System.Threading.Tasks.Task<TOut2>> aggregate, Akka.Streams.IMaterializer materializer) { }
public System.Collections.Generic.IAsyncEnumerable<TOut> RunAsAsyncEnumerable(Akka.Streams.IMaterializer materializer) { }
public System.Collections.Generic.IAsyncEnumerable<TOut> RunAsAsyncEnumerableBuffer(Akka.Streams.IMaterializer materializer, int minBuffer = 4, int maxBuffer = 16) { }
public System.Threading.Tasks.Task RunForeach(System.Action<TOut> action, Akka.Streams.IMaterializer materializer) { }
public System.Threading.Tasks.Task<TOut> RunSum(System.Func<TOut, TOut, TOut> reduce, Akka.Streams.IMaterializer materializer) { }
public TMat2 RunWith<TMat2>(Akka.Streams.IGraph<Akka.Streams.SinkShape<TOut>, TMat2> sink, Akka.Streams.IMaterializer materializer) { }
Expand Down Expand Up @@ -2150,6 +2161,12 @@ namespace Akka.Streams.Dsl
[Akka.Annotations.ApiMayChangeAttribute()]
public static Akka.Streams.Dsl.Sink<T, System.Threading.Tasks.Task<Akka.Streams.ISourceRef<T>>> SourceRef<T>() { }
}
public sealed class StreamsAsyncEnumerableRerunnable<T, TMat> : System.Collections.Generic.IAsyncEnumerable<T>
{
public StreamsAsyncEnumerableRerunnable(Akka.Streams.Dsl.Source<T, TMat> source, Akka.Streams.IMaterializer materializer) { }
public StreamsAsyncEnumerableRerunnable(Akka.Streams.Dsl.Source<T, TMat> source, Akka.Streams.IMaterializer materializer, int minBuf, int maxBuf) { }
public System.Collections.Generic.IAsyncEnumerator<T> GetAsyncEnumerator(System.Threading.CancellationToken cancellationToken) { }
}
public class static SubFlowOperations
{
public static Akka.Streams.Dsl.SubFlow<TOut2, TMat, TClosed> Aggregate<TOut1, TOut2, TMat, TClosed>(this Akka.Streams.Dsl.SubFlow<TOut1, TMat, TClosed> flow, TOut2 zero, System.Func<TOut2, TOut1, TOut2> fold) { }
Expand Down
151 changes: 151 additions & 0 deletions src/core/Akka.Streams.Tests/Dsl/AsyncEnumerableSpec.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
//-----------------------------------------------------------------------
// <copyright file="AsyncEnumerableSpec.cs" company="Akka.NET Project">
// Copyright (C) 2009-2021 Lightbend Inc. <http://www.lightbend.com>
// Copyright (C) 2013-2021 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
//-----------------------------------------------------------------------

using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Akka.Pattern;
using Akka.Routing;
using Akka.Streams.Dsl;
using Akka.Streams.TestKit;
using Akka.TestKit;
using FluentAssertions;
using Nito.AsyncEx.Synchronous;
using Xunit;
using Xunit.Abstractions;

namespace Akka.Streams.Tests.Dsl
{
#if NETCOREAPP
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can't await foreach in the versions of Framework we are targeting with our test runners.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah got it

public class AsyncEnumerableSpec : AkkaSpec
{
private ActorMaterializer Materializer { get; }

public AsyncEnumerableSpec(ITestOutputHelper helper) : base(helper)
{
var settings = ActorMaterializerSettings.Create(Sys).WithInputBuffer(2, 16);
Materializer = ActorMaterializer.Create(Sys, settings);
}

[Fact] public async Task RunAsAsyncEnumerable_Uses_CancellationToken()
{
var input = Enumerable.Range(1, 6).ToList();

var cts = new CancellationTokenSource();
var token = cts.Token;

var asyncEnumerable = Source.From(input).RunAsAsyncEnumerable(Materializer);
var output = input.ToArray();
bool caught = false;
try
{
await foreach (var a in asyncEnumerable.WithCancellation(token))
{
cts.Cancel();
}
}
catch (OperationCanceledException e)
{
caught = true;
}

caught.ShouldBeTrue();
}

[Fact]
public async Task RunAsAsyncEnumerable_must_return_an_IAsyncEnumerableT_from_a_Source()
{
var input = Enumerable.Range(1, 6).ToList();
var asyncEnumerable = Source.From(input).RunAsAsyncEnumerable(Materializer);
var output = input.ToArray();
await foreach (var a in asyncEnumerable)
{
(output[0] == a).ShouldBeTrue("Did not get elements in order!");
output = output.Skip(1).ToArray();
}
output.Length.ShouldBe(0,"Did not receive all elements!");
}

[Fact]
public async Task RunAsAsyncEnumerable_must_allow_multiple_enumerations()
{
var input = Enumerable.Range(1, 6).ToList();
var asyncEnumerable = Source.From(input).RunAsAsyncEnumerable(Materializer);
var output = input.ToArray();
await foreach (var a in asyncEnumerable)
{
(output[0] == a).ShouldBeTrue("Did not get elements in order!");
output = output.Skip(1).ToArray();
}
output.Length.ShouldBe(0,"Did not receive all elements!");

output = input.ToArray();
await foreach (var a in asyncEnumerable)
{
(output[0] == a).ShouldBeTrue("Did not get elements in order!");
output = output.Skip(1).ToArray();
}
output.Length.ShouldBe(0,"Did not receive all elements in second enumeration!!");
}


[Fact]
public async Task RunAsAsyncEnumerable_Throws_on_Abrupt_Stream_termination()
{
var materializer = ActorMaterializer.Create(Sys);
var probe = this.CreatePublisherProbe<int>();
var task = Source.FromPublisher(probe).RunAsAsyncEnumerable(materializer);

var a = Task.Run( async () =>
{
await foreach (var notused in task)
{
materializer.Shutdown();
}
});
//since we are collapsing the stream inside the read
//we want to send messages so we aren't just waiting forever.
probe.SendNext(1);
probe.SendNext(2);
bool thrown = false;
try
{
await a;
}
catch (AbruptTerminationException e)
{
thrown = true;
}
thrown.ShouldBeTrue();
}

[Fact]
public async Task RunAsAsyncEnumerable_Throws_if_materializer_gone_before_Enumeration()
{
var materializer = ActorMaterializer.Create(Sys);
var probe = this.CreatePublisherProbe<int>();
var task = Source.FromPublisher(probe).RunAsAsyncEnumerable(materializer);
materializer.Shutdown();

async Task ShouldThrow()
{
await foreach (var a in task)
{

}
}

await Assert.ThrowsAsync<IllegalStateException>(ShouldThrow);
}


}

#else
#endif
}
1 change: 1 addition & 0 deletions src/core/Akka.Streams/Akka.Streams.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
</ItemGroup>
<ItemGroup>
<PackageReference Include="Google.Protobuf" Version="$(ProtobufVersion)" />
<PackageReference Include="Microsoft.Bcl.AsyncInterfaces" Version="5.0.0" />
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Aaronontheweb I'm so so sorry ;_;

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lol, probably nothing we can do about that

<PackageReference Include="Reactive.Streams" Version="1.0.2" />
</ItemGroup>
<PropertyGroup Condition=" '$(Configuration)' == 'Release' ">
Expand Down
29 changes: 29 additions & 0 deletions src/core/Akka.Streams/Dsl/Source.cs
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,35 @@ public Task<TOut> RunSum(Func<TOut, TOut, TOut> reduce, IMaterializer materializ
public Task RunForeach(Action<TOut> action, IMaterializer materializer)
=> RunWith(Sink.ForEach(action), materializer);

/// <summary>
/// Shortcut for running this <see cref="Source{TOut,TMat}"/> as an <see cref="IAsyncEnumerable{TOut}"/>.
/// The given enumerable is re-runnable but will cause a re-materialization of the stream each time.
/// This is implemented using a SourceQueue and will buffer elements based on configured stream defaults.
/// For custom buffers Please use <see cref="RunAsAsyncEnumerableBuffer"/>
/// </summary>
/// <param name="materializer">The materializer to use for each enumeration</param>
/// <returns>A lazy <see cref="IAsyncEnumerable{T}"/> that will run each time it is enumerated.</returns>
public IAsyncEnumerable<TOut> RunAsAsyncEnumerable(
IMaterializer materializer) =>
new StreamsAsyncEnumerableRerunnable<TOut,TMat>(this, materializer);

/// <summary>
/// Shortcut for running this <see cref="Source{TOut,TMat}"/> as an <see cref="IAsyncEnumerable{TOut}"/>.
/// The given enumerable is re-runnable but will cause a re-materialization of the stream each time.
/// This is implemented using a SourceQueue and will buffer elements and/or backpressure,
/// based on the buffer values provided.
/// </summary>
/// <param name="materializer">The materializer to use for each enumeration</param>
/// <param name="minBuffer">The minimum input buffer size</param>
/// <param name="maxBuffer">The Max input buffer size.</param>
/// <returns>A lazy <see cref="IAsyncEnumerable{T}"/> that will run each time it is enumerated.</returns>
public IAsyncEnumerable<TOut> RunAsAsyncEnumerableBuffer(
IMaterializer materializer, int minBuffer = 4,
int maxBuffer = 16) =>
new StreamsAsyncEnumerableRerunnable<TOut,TMat>(
this, materializer,minBuffer,maxBuffer);


/// <summary>
/// Combines several sources with fun-in strategy like <see cref="Merge{TIn,TOut}"/> or <see cref="Concat{TIn,TOut}"/> and returns <see cref="Source{TOut,TMat}"/>.
/// </summary>
Expand Down
97 changes: 97 additions & 0 deletions src/core/Akka.Streams/Implementation/AsyncEnumerable.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
// //-----------------------------------------------------------------------
// // <copyright file="AsyncEnumerable.cs" company="Akka.NET Project">
// // Copyright (C) 2009-2021 Lightbend Inc. <http://www.lightbend.com>
// // Copyright (C) 2013-2021 .NET Foundation <https://github.com/akkadotnet/akka.net>
// // </copyright>
// //-----------------------------------------------------------------------

using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Akka.Configuration.Hocon;

namespace Akka.Streams.Dsl
{
/// <summary>
/// Used to treat an <see cref="IRunnableGraph{TMat}"/> of <see cref="ISinkQueue{T}"/>
/// as an <see cref="IAsyncEnumerable{T}"/>
/// </summary>
/// <typeparam name="T"></typeparam>
public sealed class StreamsAsyncEnumerableRerunnable<T,TMat> : IAsyncEnumerable<T>
{
private static readonly Sink<T, ISinkQueue<T>> defaultSinkqueue =
Sink.Queue<T>();
private readonly Source<T, TMat> _source;
private readonly IMaterializer _materializer;

private readonly Sink<T, ISinkQueue<T>> thisSinkQueue;
//private readonly IRunnableGraph<(UniqueKillSwitch, ISinkQueue<T>)> _graph;
public StreamsAsyncEnumerableRerunnable(Source<T,TMat> source, IMaterializer materializer)
{
_source = source;
_materializer = materializer;
thisSinkQueue = defaultSinkqueue;
}

public StreamsAsyncEnumerableRerunnable(Source<T, TMat> source,
IMaterializer materializer, int minBuf, int maxBuf):this(source, materializer)
{
thisSinkQueue =
defaultSinkqueue.WithAttributes(
Attributes.CreateInputBuffer(minBuf, maxBuf));
}

public IAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken cancellationToken)
{
cancellationToken.ThrowIfCancellationRequested();

return new SinkQueueAsyncEnumerator<T>(_source
.Via(cancellationToken.AsFlow<T>(cancelGracefully: true))
.ViaMaterialized(KillSwitches.Single<T>(), Keep.Right)
.ToMaterialized(thisSinkQueue, Keep.Both)
.Run(_materializer),
cancellationToken);
}
}
/// <summary>
/// Wraps a Sink Queue and Killswitch around <see cref="IAsyncEnumerator{T}"/>
/// </summary>
/// <typeparam name="T"></typeparam>
public sealed class SinkQueueAsyncEnumerator<T> : IAsyncEnumerator<T>
{
private ISinkQueue<T> _sinkQueue;
private IKillSwitch _killSwitch;
private CancellationToken _token;
public SinkQueueAsyncEnumerator((UniqueKillSwitch killSwitch,ISinkQueue<T> sinkQueue) queueAndSwitch, CancellationToken token)
{
_sinkQueue = queueAndSwitch.sinkQueue;
_killSwitch = queueAndSwitch.killSwitch;
_token = token;
to11mtm marked this conversation as resolved.
Show resolved Hide resolved
}
public async ValueTask DisposeAsync()
{
//If we are disposing, let's shut down the stream
//so that we don't have data hanging around.
_killSwitch.Shutdown();
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we are materializing every time we get enumerator, we can dispose neatly here by just hitting the killswitch.

_killSwitch = null;
_sinkQueue = null;
}

public async ValueTask<bool> MoveNextAsync()
{
_token.ThrowIfCancellationRequested();
var opt = await _sinkQueue.PullAsync();
if (opt.HasValue)
{
Current = opt.Value;
return true;
}
else
{
return false;
}
}

public T Current { get; private set; }
}
}