Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Akka.Streams: System.NotSupportedException when disposing stage with materialized IAsyncEnumerable #6280

Closed
Aaronontheweb opened this issue Nov 30, 2022 · 7 comments · Fixed by #6296

Comments

@Aaronontheweb
Copy link
Member

Version Information
Version of Akka.NET? v1.4.46, v1.5.0-alpha3
Which Akka.NET Modules? Akka.Streams

Describe the bug

When running a Source<T> built from an IAsyncEnumerable<T>, any attempt to dispose the IAsyncEnumerable<T> via its DisposeAsync() method results in the following exception being thrown:

[WARNING][11/30/2022 4:36:06 PM][Thread 0008][LogSource (akka://OpsCenter)] Failed to dispose IAsyncEnumerator asynchronously
Cause: System.NotSupportedException: Specified method is not supported.
   at OpsCenter.ClusterState.Subscriptions.ClusterStreamManager.GetEnvironmentEventStream(String environmentId, ClusterStreamConfig config, CancellationToken cancellationToken)+System.IAsyncDisposable.DisposeAsync()
   at Akka.Streams.Implementation.Fusing.AsyncEnumerable`1.Logic.OnDownstreamFinish()

To Reproduce

I don't have a clean reproduction of this issue yet, but using the following in LinqPad with Akka.NET v1.4.46 installed:

async Task Main()
{
		var actorSystem = ActorSystem.Create("MySys");

		using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(3));

		var (actorRef, src) = Source.ActorRef<int>(1000, OverflowStrategy.DropHead)
		.PreMaterialize(actorSystem.Materializer());
		
		Task.Run(() => {
			foreach(var i in Enumerable.Range(0, 100)){
				actorRef.Tell(i);
			}	
			//  shut down the stream
			//actorRef.Tell(PoisonPill.Instance);
		});


		await Source.From(() => src.RunAsAsyncEnumerable(actorSystem.Materializer())).RunForeach(i => Console.WriteLine($"{DateTime.UtcNow}: {i}"), actorSystem.Materializer());
}

I was able to trigger another bug with this stage:

[ERROR][11/30/2022 9:02:21 PM][Thread 0082][[akka://MySys/user/StreamSupervisor-10/Flow-0-0-unknown-operation#943292100]] Error in stage [EnumerableSource]: Should never reach this code
Cause: System.InvalidOperationException: Should never reach this code
   at Akka.Streams.Implementation.Fusing.AsyncEnumerable`1.Logic.OnPull()
   at Akka.Streams.Implementation.Fusing.GraphInterpreter.Execute(Int32 eventLimit)
--- End of stack trace from previous location ---
   at UserQuery.Main() in C:\Users\aaron\AppData

As for the error with this code - I'll try another reproduction but I think it happens when IActorContext used to materialize the IAsyncEnumerable source is terminated.

Expected behavior

IAsyncEnumerable stages should be able to shut down correctly.

Actual behavior

The IAsyncEnumerable stage errs in various ways, which aren't properly supported right now.

@to11mtm
Copy link
Member

to11mtm commented Dec 2, 2022

I know I brought it up on Discord but to put it on record..

Is it worth making this change in #6268 ?

++        public IAsyncEnumerable<TOut> RunAsAsyncEnumerableBuffer(
++            IMaterializer materializer,
++            int maxBuffer = 16)
++        {
++            return this.RunWith(
++                Sink.ChannelReader<TOut>(maxBuffer, true)
++                    .MapMaterializedValue(r => r.ReadAllAsync()), materializer);
++        }

        //Existing method, add obsolete
        /// <summary>
        /// Shortcut for running this <see cref="Source{TOut,TMat}"/> as an <see cref="IAsyncEnumerable{TOut}"/>.
--        /// The given enumerable is re-runnable but will cause a re-materialization of the stream each time.
++      /// The given enumerable is not re-runnable.
        /// This is implemented using a SourceQueue and will buffer elements and/or backpressure,
        /// based on the buffer values provided.
        /// </summary>
        /// <param name="materializer">The materializer to use for each enumeration</param>
        /// <param name="minBuffer">The minimum input buffer size</param>
        /// <param name="maxBuffer">The Max input buffer size.</param>
++        [Obsolete("minBuffer is No longer respected, Please use new overload",false)]
        public IAsyncEnumerable<TOut> RunAsAsyncEnumerableBuffer(
            IMaterializer materializer, int minBuffer = 4,
            int maxBuffer = 16) =>
--            new StreamsAsyncEnumerableRerunnable<TOut,TMat>(
--                this, materializer,minBuffer,maxBuffer);
++          return RunAsAsyncEnumerableBuffer(materializer,maxBuffer);

The main difference is that the IAsyncEnumerable is not 're-runnable` but TBH I'm not sure that's worth keeping.

@Aaronontheweb
Copy link
Member Author

I think so - having more consistent / clean termination behavior is probably best. If users want re-runnable IAsyncEnumerables they'd be better off restarting the graph IMHO.

@Aaronontheweb
Copy link
Member Author

Looks like we have a test case that should cover this already

[Fact]
public async Task AsyncEnumerableSource_Must_Cancel_Running_Source_If_Downstream_Completes()
{
await this.AssertAllStagesStoppedAsync(async () =>
{
var latch = new AtomicBoolean();
IAsyncEnumerable<int> Range() => ProbeableRangeAsync(0, 100, latch);
var subscriber = this.CreateManualSubscriberProbe<int>();
Source.From(Range)
.RunWith(Sink.FromSubscriber(subscriber), Materializer);
var subscription = await subscriber.ExpectSubscriptionAsync();
subscription.Request(50);
await subscriber.ExpectNextNAsync(Enumerable.Range(0, 50));
subscription.Cancel();
// The cancellation token inside the IAsyncEnumerable should be cancelled
await WithinAsync(3.Seconds(), async () => latch.Value);
}, Materializer);
}

Stepping through with the debugger, I don't see it throw in this test. It does consistently in my reproduction application though.

@Aaronontheweb
Copy link
Member Author

Related: dotnet/runtime#51176

@Aaronontheweb
Copy link
Member Author

Yep, this is the reason why I suspect:

dotnet/runtime#51176 (comment)

Aaronontheweb added a commit to Aaronontheweb/akka.net that referenced this issue Dec 5, 2022
@Aaronontheweb
Copy link
Member Author

Yeah what we're doing is illegal, per Stephen Toub https://learn.microsoft.com/en-us/archive/msdn-magazine/2019/november/csharp-iterating-with-async-enumerables-in-csharp-8#much-ado-about-threading

It should be evident that it’s fine for one MoveNextAsync call to occur on a different thread from a previous or subsequent MoveNext­Async call; after all, the implementation may await a task and continue execution somewhere else. However, that doesn’t mean MoveNext­Async is “thread-safe”—far from it. On a given async enumerator, MoveNextAsync must never be invoked concurrently, meaning MoveNextAsync shouldn’t be called again on a given enumerator until the previous call to it has completed. Similarly, DisposeAsync on an iterator shouldn’t be invoked while either MoveNextAsync or DisposeAsync on that same enumerator is still in flight.

Right call here is to not attempt to dispose the IAsyncEnumerator<T> at all upon completion, since we have no guarantee what state the upstream IAsyncEnumerable<T> is in. The CancellationToken should already take care of this. Filling up the log with warnings about a problem we can't fix is just noise.

Aaronontheweb added a commit to Aaronontheweb/akka.net that referenced this issue Dec 5, 2022
Aaronontheweb added a commit that referenced this issue Dec 6, 2022
* Upgraded Akka.Streams and Akka.Streams.Tests to C# 9

* added reproduction for #6280

* close #6280

* added compiler directive back to fix compilation issues on Linux

* added comment

* bump CI
@Aaronontheweb
Copy link
Member Author

closed via #6290

Arkatufus pushed a commit to Arkatufus/akka.net that referenced this issue Dec 7, 2022
Fixes akkadotnet#6280

We no longer attempt to dispose IAsyncEnumerators inside Source stages due to the reasons outlined here: akkadotnet#6280 (comment)

This prevents the log from being filled with NotSupportedException warnings from failed DisposeAsync operations.

(cherry-picked from 3156272)
Aaronontheweb added a commit that referenced this issue Dec 7, 2022
…` bug (#6296)

* Backports #6290

Fixes #6280

We no longer attempt to dispose IAsyncEnumerators inside Source stages due to the reasons outlined here: #6280 (comment)

This prevents the log from being filled with NotSupportedException warnings from failed DisposeAsync operations.

(cherry-picked from 3156272)

* post-merge fix

Co-authored-by: Aaron Stannard <aaron@petabridge.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging a pull request may close this issue.

2 participants