Skip to content

Commit

Permalink
Avoid missing streams when relistening to broadcast StreamGroup (dart…
Browse files Browse the repository at this point in the history
…-archive/async#184)

In a broadcast `StreamGroup` when all listeners have canceled the inner
subscriptions are kept only for single subscriber streams. When a new
listener is added and subscriptions need to be recreated for inner
broadcast streams the presence of a single subscriber subscription in
the collection of subscriptions would cause the listen to bail out early
and fail to subscribe to later broadcast streams. Change `return` to
`continue` to keep going with the rest of the streams.
  • Loading branch information
anhtuan23 authored Jul 20, 2021
1 parent 5220785 commit 0aa87e7
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 1 deletion.
3 changes: 3 additions & 0 deletions pkgs/async/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@
easier to implement custom sinks.
* Improve performance for `ChunkedStreamReader` by creating fewer internal
sublists and specializing to create views for `Uint8List` chunks.
* Don't ignore broadcast streams added to a `StreamGroup` that doesn't have an
active listener but previously had listeners and contains a single
subscription inner stream.

## 2.7.0

Expand Down
2 changes: 1 addition & 1 deletion pkgs/async/lib/src/stream_group.dart
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ class StreamGroup<T> implements Sink<Stream<T>> {
// If this is a broadcast group and this isn't the first time it's been
// listened to, there may still be some subscriptions to
// single-subscription streams.
if (entry.value != null) return;
if (entry.value != null) continue;

var stream = entry.key;
try {
Expand Down
18 changes: 18 additions & 0 deletions pkgs/async/test/stream_group_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,24 @@ void main() {
expect(controller.hasListener, isTrue);
});

test(
'listens on streams that follow single-subscription streams when '
'relistening after a cancel', () async {
var controller1 = StreamController<String>();
streamGroup.add(controller1.stream);
streamGroup.stream.listen(null).cancel();

var controller2 = StreamController<String>();
streamGroup.add(controller2.stream);

var emitted = <String>[];
streamGroup.stream.listen(emitted.add);
controller1.add('one');
controller2.add('two');
await flushMicrotasks();
expect(emitted, ['one', 'two']);
});

test('never cancels single-subscription streams', () async {
var subscription = streamGroup.stream.listen(null);

Expand Down

0 comments on commit 0aa87e7

Please sign in to comment.