-
Notifications
You must be signed in to change notification settings - Fork 1.1k
Closed
Milestone
Description
Version Information
Version of Akka.NET? 1.5.40
Which Akka.NET Modules? Streams
Describe the bug
Cancelled sinks are blocking other BroadcastHub consumers. This is the same exact BroadcastHub problem as described here: akka/akka-core#23205 (comment)
To Reproduce
[Fact]
public async Task BroadcastHub_must_handle_cancelled_Sink()
{
await this.AssertAllStagesStoppedAsync(async () =>
{
var upstream = this.CreatePublisherProbe<int>();
var hubSource = Source.FromPublisher(upstream).RunWith(BroadcastHub.Sink<int>(4), Materializer);
var downstream = this.CreateSubscriberProbe<int>();
hubSource.RunWith(Sink.Cancelled<int>(), Materializer);
hubSource.RunWith(Sink.FromSubscriber(downstream), Materializer);
await downstream.EnsureSubscriptionAsync();
await downstream.RequestAsync(10);
await upstream.ExpectRequestAsync();
await upstream.SendNextAsync(1);
await downstream.ExpectNextAsync(1);
await upstream.SendNextAsync(2);
await downstream.ExpectNextAsync(2);
await upstream.SendNextAsync(3);
await downstream.ExpectNextAsync(3);
await upstream.SendNextAsync(4);
await downstream.ExpectNextAsync(4);
await upstream.SendNextAsync(5);
await downstream.ExpectNextAsync(5);
await upstream.SendCompleteAsync();
await downstream.ExpectCompleteAsync();
}, Materializer);
}Expected behavior
downstream should receive the 5th element
Actual behavior
Timeout while waiting for message exception on the await downstream.ExpectNextAsync(5); line.