Skip to content

Commit c3a7ec8

Browse files
authored
GraphStageLogic.ConcurrentAsyncCallback throws NRE when used with ChannelSource (#7808) (#7811)
(cherry picked from commit 4168945)
1 parent 9dab980 commit c3a7ec8

File tree

2 files changed

+67
-5
lines changed

2 files changed

+67
-5
lines changed
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
// -----------------------------------------------------------------------
2+
// <copyright file="Issue7794Spec.cs" company="Akka.NET Project">
3+
// Copyright (C) 2009-2022 Lightbend Inc. <http://www.lightbend.com>
4+
// Copyright (C) 2013-2025 .NET Foundation <https://github.com/akkadotnet/akka.net>
5+
// </copyright>
6+
// -----------------------------------------------------------------------
7+
8+
using System.Threading.Channels;
9+
using System.Threading.Tasks;
10+
using Akka.Streams.Dsl;
11+
using Akka.TestKit;
12+
using Xunit;
13+
using Xunit.Abstractions;
14+
15+
namespace Akka.Streams.Tests.Implementation;
16+
17+
public class Issue7794Spec: AkkaSpec
18+
{
19+
private ActorMaterializer Materializer { get; }
20+
21+
public Issue7794Spec(ITestOutputHelper helper) : base(helper)
22+
{
23+
Materializer = Sys.Materializer();
24+
}
25+
26+
[Fact(DisplayName = "ChannelSource should not throw NRE when Channel completes")]
27+
public async Task Issue_7794_ChannelSource_NRE()
28+
{
29+
var channel = Channel.CreateBounded<Message<string, string>>(new BoundedChannelOptions(capacity: 100)
30+
{
31+
FullMode = BoundedChannelFullMode.Wait,
32+
SingleReader = true,
33+
SingleWriter = true,
34+
AllowSynchronousContinuations = false
35+
});
36+
37+
var streamRes = ChannelSource.FromReader(channel.Reader)
38+
.Select(e => e)
39+
.RunWith(Sink.Ignore<Message<string, string>>(), Materializer);
40+
41+
_ = Task.Run(async () =>
42+
{
43+
await Task.Delay(100);
44+
channel.Writer.Complete();
45+
});
46+
47+
await streamRes;
48+
}
49+
50+
private class Message<TKey, TValue>
51+
{
52+
public TKey Key { get; set; }
53+
public TValue Value { get; set; }
54+
}
55+
}

src/core/Akka.Streams/Stage/GraphStage.cs

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -885,11 +885,18 @@ public ConcurrentAsyncCallback(Action<T> handler, GraphStageLogic ownedStage)
885885
_ownedStage = ownedStage;
886886
_wrappedHandler = obj =>
887887
{
888-
if (obj is T e)
889-
handler1(e);
890-
else
891-
throw new ArgumentException(
892-
$"Expected {nameof(obj)} to be of type {typeof(T)}, but was {obj.GetType()}");
888+
switch (obj)
889+
{
890+
// Always assume that T can be null and the handler will handle null values
891+
case null:
892+
handler1(default);
893+
break;
894+
case T e:
895+
handler1(e);
896+
break;
897+
default:
898+
throw new ArgumentException($"Expected {nameof(obj)} to be of type {typeof(T)}, but was {obj.GetType()}");
899+
}
893900
};
894901
}
895902

0 commit comments

Comments
 (0)