Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow Streams to be consumed as IAsyncEnumerable #4742

Merged
merged 8 commits into from
Dec 20, 2021

Conversation

to11mtm
Copy link
Member

@to11mtm to11mtm commented Jan 23, 2021

This pull request adds two methods on top of Source<TOut,TMat>:

  • RunAsAsyncEnumerable
  • RunAsAsyncEnumerableBuffer <-- allows overriding default buffer settings

This is a semi-naive but very workable implementation, utilizing ISinkQueue<TOut> alongside a Killswitch for disposal semantics.

This approach has the advantage of being concise and safe for users; multiple iterations re-run the stream which is probably what most users would expect.


namespace Akka.Streams.Tests.Dsl
{
#if NETCOREAPP
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can't await foreach in the versions of Framework we are targeting with our test runners.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah got it


}
};
a.ShouldThrow<IllegalStateException>();
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wasn't sure if we should throw something different here.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably fine

@@ -63,6 +63,7 @@
</ItemGroup>
<ItemGroup>
<PackageReference Include="Google.Protobuf" Version="$(ProtobufVersion)" />
<PackageReference Include="Microsoft.Bcl.AsyncInterfaces" Version="5.0.0" />
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Aaronontheweb I'm so so sorry ;_;

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lol, probably nothing we can do about that

src/core/Akka.Streams/Dsl/Source.cs Outdated Show resolved Hide resolved
{
//If we are disposing, let's shut down the stream
//so that we don't have data hanging around.
_killSwitch.Shutdown();
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we are materializing every time we get enumerator, we can dispose neatly here by just hitting the killswitch.

@to11mtm to11mtm changed the title Allow Streams to be consumed as IAsyncEumerable Allow Streams to be consumed as IAsyncEnumerable Jan 23, 2021
@to11mtm
Copy link
Member Author

to11mtm commented Jan 23, 2021

Hmm, it looks like behavior of one of the unit tests is Racy; it looks like shutting down the stream could cause -either- an InvalidStateException or an AbruptTerminationException depending on some timing.

Do we want to try to normalize to AbruptTermination? Or wrap these altogether?

@Horusiath
Copy link
Contributor

Horusiath commented Jan 24, 2021

Sorry, but why like this? AFAIK I already implemented IAsyncEnumerable sink in the past, together with ChannelReader/ChannelWriter stages (I think it was in AkkaContrib repo, which is no longer present). The thing is that in akka streams junction points with external world (outside of akka streams graph) are provided by materialized values, so IAsyncEnumerable stage has shape Sink<T, IAsyncEnumerable<T>>. Why?

Akka.Streams is build to provide composability - sometimes you may want to return more then one materialized value (eg. input channel to a graph and async enumerable output from a graph). This is possible via combinator operators and Keep.Both option. With the approach you proposed it's no longer possible.

@to11mtm
Copy link
Member Author

to11mtm commented Jan 24, 2021

Sorry, but why like this? AFAIK I already implemented IAsyncEnumerable sink in the past, together with ChannelReader/ChannelWriter stages (I think it was in AkkaContrib repo, which is no longer present). The thing is that in akka streams junction points with external world (outside of akka streams graph) are provided by materialized values, so IAsyncEnumerable stage has shape Sink<T, IAsyncEnumerable<T>>. Why?

Is that approach still somewhere? I came up with this because I could not find such an implementation, because you are right about below, this is definitely not 'composable' in the way Akka Streams normally provides (thus the RunAs naming.)

Akka.Streams is build to provide composability - sometimes you may want to return more then one materialized value (eg. input channel to a graph and async enumerable output from a graph). This is possible via combinator operators and Keep.Both option. With the approach you proposed it's no longer possible.

@Horusiath
Copy link
Contributor

I think, this could be implemented in more general terms of Akka.Streams.Channels I once did in Alpakka project. From there after materializing stream as channel reader, all that's left to represent it as IAsyncEnumerable is to use natively supported extension method ReadAllAsync. Example:

using System.Threading.Channels;

var events =
    source
    .ToMaterialized(ChannelSink.AsReader<int>(bufferSize: 10), Keep.Right)
    .Run(materializer)
    .ReadAllAsync(cancellationToken);
    
await foreach (var e in events) {
  await Handle(e)
}

This is more generalized implementation: it supports channels, which AFAIK are part of std lib in .NET 5.0. I can make a PR if we're ready to include that into core akka.

@Aaronontheweb
Copy link
Member

This is more generalized implementation: it supports channels, which AFAIK are part of std lib in .NET 5.0. I can make a PR if we're ready to include that into core akka.

One thing we're going to explore in Akka.NET v1.5, per some of @Zetanova's legwork, is replacing our current dispatching system that uses separate threadpools with one that uses separate channels on top of the shared .NET threadpool, since we want the benefits (exclusive scheduling) without the overhead (auto-scaling and managing threadpools on-demand.) We're hopeful that System.Threading.Channels can accomplish that.

So we're cool to add that as a dependency to Akka.Streams now, IMHO. I don't see the harm.

@Aaronontheweb
Copy link
Member

I want to take another look at this again....

@Aaronontheweb
Copy link
Member

I think, this could be implemented in more general terms of Akka.Streams.Channels I once did in Alpakka project. From there after materializing stream as channel reader, all that's left to represent it as IAsyncEnumerable is to use natively supported extension method ReadAllAsync. Example:

using System.Threading.Channels;

var events =
    source
    .ToMaterialized(ChannelSink.AsReader<int>(bufferSize: 10), Keep.Right)
    .Run(materializer)
    .ReadAllAsync(cancellationToken);
    
await foreach (var e in events) {
  await Handle(e)
}

This is more generalized implementation: it supports channels, which AFAIK are part of std lib in .NET 5.0. I can make a PR if we're ready to include that into core akka.

I think we should consider doing this for Akka.NET 2.0 - since we're dropping .NET Standard support then.

Copy link
Member

@Aaronontheweb Aaronontheweb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM - might not be a bad idea to re-arrange the internals of how this is implemented at some point, per @Horusiath's suggestion, but I think this is good to go for v1.4 and v1.5.


}
};
a.ShouldThrow<IllegalStateException>();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably fine


namespace Akka.Streams.Tests.Dsl
{
#if NETCOREAPP
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah got it

@Aaronontheweb
Copy link
Member

Hmmm... looks like compilation failures?

@Aaronontheweb
Copy link
Member

Ah, looks like a change due to an upgrade we made onto a newer version of FluentAssertions @to11mtm

@Aaronontheweb Aaronontheweb modified the milestones: 1.4.25, 1.4.26 Sep 8, 2021
@Aaronontheweb Aaronontheweb modified the milestones: 1.4.26, 1.4.27 Sep 28, 2021
@Aaronontheweb Aaronontheweb modified the milestones: 1.4.27, 1.4.28 Oct 11, 2021
@Aaronontheweb Aaronontheweb modified the milestones: 1.4.28, 1.4.29 Nov 10, 2021
@Aaronontheweb
Copy link
Member

Looks like we have a compilation error - I suspect it's related to a FluentAssertions upgrade we merged in after this PR was made.

@Aaronontheweb Aaronontheweb modified the milestones: 1.4.29, 1.4.30 Dec 13, 2021
Copy link
Contributor

@Arkatufus Arkatufus left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants