From 25a7e2ec39c03622b86918cb9ce3e7d00dd283d1 Mon Sep 17 00:00:00 2001 From: Do Tuan Anh <dotuananh.dta@gmail.com> Date: Wed, 21 Jul 2021 02:27:55 +0700 Subject: [PATCH] Avoid missing streams when relistening to broadcast StreamGroup (#184) In a broadcast `StreamGroup` when all listeners have canceled the inner subscriptions are kept only for single subscriber streams. When a new listener is added and subscriptions need to be recreated for inner broadcast streams the presence of a single subscriber subscription in the collection of subscriptions would cause the listen to bail out early and fail to subscribe to later broadcast streams. Change `return` to `continue` to keep going with the rest of the streams. --- CHANGELOG.md | 3 +++ lib/src/stream_group.dart | 2 +- test/stream_group_test.dart | 18 ++++++++++++++++++ 3 files changed, 22 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 102ad59..2c5a3eb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,9 @@ easier to implement custom sinks. * Improve performance for `ChunkedStreamReader` by creating fewer internal sublists and specializing to create views for `Uint8List` chunks. +* Don't ignore broadcast streams added to a `StreamGroup` that doesn't have an + active listener but previously had listeners and contains a single + subscription inner stream. ## 2.7.0 diff --git a/lib/src/stream_group.dart b/lib/src/stream_group.dart index f7819bf..c8414f4 100644 --- a/lib/src/stream_group.dart +++ b/lib/src/stream_group.dart @@ -192,7 +192,7 @@ class StreamGroup<T> implements Sink<Stream<T>> { // If this is a broadcast group and this isn't the first time it's been // listened to, there may still be some subscriptions to // single-subscription streams. - if (entry.value != null) return; + if (entry.value != null) continue; var stream = entry.key; try { diff --git a/test/stream_group_test.dart b/test/stream_group_test.dart index 11c66c1..256056e 100644 --- a/test/stream_group_test.dart +++ b/test/stream_group_test.dart @@ -417,6 +417,24 @@ void main() { expect(controller.hasListener, isTrue); }); + test( + 'listens on streams that follow single-subscription streams when ' + 'relistening after a cancel', () async { + var controller1 = StreamController<String>(); + streamGroup.add(controller1.stream); + streamGroup.stream.listen(null).cancel(); + + var controller2 = StreamController<String>(); + streamGroup.add(controller2.stream); + + var emitted = <String>[]; + streamGroup.stream.listen(emitted.add); + controller1.add('one'); + controller2.add('two'); + await flushMicrotasks(); + expect(emitted, ['one', 'two']); + }); + test('never cancels single-subscription streams', () async { var subscription = streamGroup.stream.listen(null);