-
Notifications
You must be signed in to change notification settings - Fork 1.1k
Description
Version Information
Version of Akka.NET? 1.5.55
Which Akka.NET Modules? Akka.Streams
Describe the bug
Its not failing 100% of the time; but seems to be when its Completing the writer while the stream is waiting for next element (when channel is empty)
The exception I'm getting is:
System.NullReferenceException
Object reference not set to an instance of an object.
at Akka.Streams.Stage.GraphStageLogic.ConcurrentAsyncCallback`1.<>c__DisplayClass3_0.<.ctor>b__0(Object obj)
at Akka.Streams.Implementation.Fusing.GraphInterpreter.RunAsyncInput(GraphStageLogic logic, Object evt, TaskCompletionSource`1 promise, Action`1 handler)
--- End of stack trace from previous location ---
at OmniPulse.Tests.Utils.ChannelStreamTermination.Test() in /Users/jrb/omnigraph/omni-pulse/test/Utils/ChannelStreamTermination.cs:line 28
at Xunit.Sdk.TestInvoker`1.<>c__DisplayClass47_0.<<InvokeTestMethodAsync>b__1>d.MoveNext() in /_/src/xunit.execution/Sdk/Frameworks/Runners/TestInvoker.cs:line 259
--- End of stack trace from previous location ---
at Xunit.Sdk.ExecutionTimer.AggregateAsync(Func`1 asyncAction) in /_/src/xunit.execution/Sdk/Frameworks/ExecutionTimer.cs:line 48
at Xunit.Sdk.ExceptionAggregator.RunAsync(Func`1 code) in /_/src/xunit.core/Sdk/ExceptionAggregator.cs:line 90To Reproduce
public class ChannelStreamTermination
{
[Fact]
public async Task Test()
{
// Arrange
var testInput = Enumerable.Range(start: 1, count: 5).Select(i => i.ToString()).ToList();
var service = new TestService(materializer: ActorSystem.Create("Local").Materializer());
// Act
await service.StartAsync();
foreach (var item in testInput)
await service.Enqueue(data: item);
await Task.Delay(millisecondsDelay: 1000); // Error seems to be when stream is stopped on empty channel
await service.StopAsync();
// Assert
Assert.Equal(expected: testInput.Order(), actual: service.Processed.Order());
}
}
internal sealed class TestService(IMaterializer materializer)
{
private readonly Channel<string> channel = Channel.CreateUnbounded<string>();
public readonly ConcurrentBag<string> Processed = [];
private Task streamTask;
public Task Enqueue(string data)
{
return channel.Writer.WriteAsync(item: data).AsTask();
}
public Task StartAsync()
{
streamTask = CreateStream().Run(materializer: materializer);
return Task.CompletedTask;
}
public Task StopAsync()
{
channel.Writer.Complete();
return streamTask ?? Task.CompletedTask;
}
private IRunnableGraph<Task<Done>> CreateStream()
{
return ChannelSource.FromReader(channel.Reader)
.Select(ImmutableArray.Create)
.Select(s =>
{
foreach (var item in s) Processed.Add(item);
return Done.Instance;
})
.ToMaterialized(sink: Sink.Ignore<Done>(), combine: Keep.Right);
}
}Expected behavior
Should shut down cleanly without any errors.
Actual behavior
NREs.
Additional context
Possibly related to #7381
I'm guessing its a race condition between
OnReaderComplete setting the CompleteStage() on
akka.net/src/core/Akka.Streams/Implementation/ChannelSources.cs
Lines 45 to 51 in 2dc6be9
| private void OnReaderComplete(Exception reason) | |
| { | |
| if (reason is null) | |
| CompleteStage(); | |
| else | |
| FailStage(reason); | |
| } |
versus the continuation on _reader.WaitToReadAsync() if the reader gets cancelled
akka.net/src/core/Akka.Streams/Implementation/ChannelSources.cs
Lines 71 to 93 in 2dc6be9
| var continuation = _reader.WaitToReadAsync(); | |
| if (continuation.IsCompletedSuccessfully) | |
| { | |
| var dataAvailable = continuation.GetAwaiter().GetResult(); | |
| if (dataAvailable && _reader.TryRead(out element)) | |
| Push(_outlet, element); | |
| else | |
| CompleteStage(); | |
| } | |
| else | |
| continuation.AsTask().ContinueWith(_onReadReady); | |
| } | |
| } | |
| private void ContinueAsyncRead(Task<bool> t) | |
| { | |
| if (t.IsFaulted) | |
| _onValueReadFailure(t.Exception); | |
| else if (t.IsCanceled) | |
| _onValueReadFailure(new TaskCanceledException(t)); | |
| else | |
| _onValueRead(t.Result); | |
| } |
Can't quite see why, but if i change
...csharp
else if (!continuation.IsCompleted)
continuation.AsTask().ContinueWith(t =>
{
if (t.IsFaulted) _onValueReadFailure(t.Exception);
else if (t.IsCanceled) _onValueReadFailure(new TaskCanceledException(t));
else _onValueRead(t.Result);
});
...
to
...csharp
else if (!continuation.IsCompleted)
continuation.AsTask().ContinueWith(t =>
{
if (t.IsFaulted) _onValueReadFailure(t.Exception);
else if (t.IsCanceled) _onValueReadFailure(new TaskCanceledException(t));
else if(t.Result) _onValueRead(t.Result);
else CompleteStage();
});
...
I no longer get the issue, but the _onValueRead callback should do just that..?
Hmm... still getting occational errors, though a lot less