Skip to content
This repository has been archived by the owner on Oct 17, 2024. It is now read-only.

Add StreamGroup.onIdle and StreamGroup.isIdle #164

Merged
merged 5 commits into from
Apr 27, 2021
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
* Added `ChunkedStreamReader` for reading _chunked streams_ without managing
buffers.

* Add `StreamGroup.isIdle` and `StreamGroup.onIdle`.

* Add `StreamGroup.isClosed` and `FutureGroup.isClosed` getters.

## 2.5.0
Expand Down
18 changes: 13 additions & 5 deletions lib/src/future_group.dart
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,22 @@ class FutureGroup<T> implements Sink<Future<T>> {
Future<List<T>> get future => _completer.future;
final _completer = Completer<List<T>>();

/// Whether this group has no pending futures.
/// Whether this group contains no futures.
///
/// A [FutureGroup] is idle when it contains no futures, which is the case for
/// a newly created group or one where all added futures have been removed or
/// completed.
bool get isIdle => _pending == 0;

/// A broadcast stream that emits a `null` event whenever the last pending
/// future in this group completes.
/// A broadcast stream that emits an event whenever this group becomes idle.
///
/// A [FutureGroup] is idle when it contains no futures, which is the case for
/// a newly created group or one where all added futures have been removed or
/// completed.
///
/// Once this group isn't waiting on any futures *and* [close] has been
/// called, this stream will close.
/// This stream will close when this group is idle *and* [close] has been
/// called. Note that events won't be emitted on this stream until [stream]
/// has been listened to.
Stream get onIdle =>
(_onIdleController ??= StreamController.broadcast(sync: true)).stream;

Expand Down
46 changes: 45 additions & 1 deletion lib/src/stream_group.dart
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,35 @@ class StreamGroup<T> implements Sink<Stream<T>> {
/// See [_StreamGroupState] for detailed descriptions of each state.
var _state = _StreamGroupState.dormant;

/// Whether this group contains no streams.
///
/// A [StreamGroup] is idle when it contains no streams, which is the case for
/// a newly created group or one where all added streams have been emitted
/// done events (or been [remove]d).
///
/// If this is a single-subscription group, then cancelling the subscription
/// to [stream] will also remove all streams.
bool get isIdle => _subscriptions.isEmpty;

/// A broadcast stream that emits an event whenever this group becomes idle.
///
/// A [StreamGroup] is idle when it contains no streams, which is the case for
/// a newly created group or one where all added streams have been emitted
/// done events (or been [remove]d).
///
/// This stream will close when either:
///
/// * This group is idle *and* [close] has been called, or
/// * [stream]'s subscription has been cancelled (if this is a
/// single-subscription group).
///
/// Note that events won't be emitted on this stream until [stream] has been
/// listened to.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it worth mentioning that the events are delivered asynchronously and the group can have become be non-idle when the event is delivered.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea, done.

Stream<void> get onIdle =>
(_onIdleController ??= StreamController.broadcast()).stream;

StreamController<Null>? _onIdleController;

/// Streams that have been added to the group, and their subscriptions if they
/// have been subscribed to.
///
Expand Down Expand Up @@ -135,7 +164,15 @@ class StreamGroup<T> implements Sink<Stream<T>> {
Future? remove(Stream<T> stream) {
var subscription = _subscriptions.remove(stream);
var future = subscription == null ? null : subscription.cancel();
if (_closed && _subscriptions.isEmpty) _controller.close();

if (_subscriptions.isEmpty) {
_onIdleController?.add(null);
if (_closed) {
_onIdleController?.close();
scheduleMicrotask(_controller.close);
}
}

return future;
}

Expand Down Expand Up @@ -180,6 +217,13 @@ class StreamGroup<T> implements Sink<Stream<T>> {
.toList();

_subscriptions.clear();

var onIdleController = _onIdleController;
if (onIdleController != null && !onIdleController.isClosed) {
onIdleController.add(null);
onIdleController.close();
}

return futures.isEmpty ? null : Future.wait(futures);
}

Expand Down
2 changes: 1 addition & 1 deletion pubspec.yaml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
name: async
version: 2.6.0-dev
version: 2.6.0

description: Utility functions and classes related to the 'dart:async' library.
repository: https://github.com/dart-lang/async
Expand Down
103 changes: 103 additions & 0 deletions test/stream_group_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -736,6 +736,109 @@ void regardlessOfType(StreamGroup<String> Function() newStreamGroup) {
expect(events, equals(['one', 'two', 'three', 'four', 'five', 'six']));
});
});

group('onIdle', () {
test('emits an event when the last pending stream emits done', () async {
streamGroup.stream.listen(null);

var idle = false;
streamGroup.onIdle.listen((_) => idle = true);

var controller1 = StreamController<String>();
var controller2 = StreamController<String>();
var controller3 = StreamController<String>();

streamGroup.add(controller1.stream);
streamGroup.add(controller2.stream);
streamGroup.add(controller3.stream);

await flushMicrotasks();
expect(idle, isFalse);
expect(streamGroup.isIdle, isFalse);

controller1.close();
await flushMicrotasks();
expect(idle, isFalse);
expect(streamGroup.isIdle, isFalse);

controller2.close();
await flushMicrotasks();
expect(idle, isFalse);
expect(streamGroup.isIdle, isFalse);

controller3.close();
await flushMicrotasks();
expect(idle, isTrue);
expect(streamGroup.isIdle, isTrue);
});

test('emits an event each time it becomes idle', () async {
streamGroup.stream.listen(null);

var idle = false;
streamGroup.onIdle.listen((_) => idle = true);

var controller = StreamController<String>();
streamGroup.add(controller.stream);

controller.close();
await flushMicrotasks();
expect(idle, isTrue);
expect(streamGroup.isIdle, isTrue);

idle = false;
controller = StreamController<String>();
streamGroup.add(controller.stream);

await flushMicrotasks();
expect(idle, isFalse);
expect(streamGroup.isIdle, isFalse);

controller.close();
await flushMicrotasks();
expect(idle, isTrue);
expect(streamGroup.isIdle, isTrue);
});

test('emits an event when the group closes', () async {
// It's important that the order of events here stays consistent over
// time, since code may rely on it in subtle ways. Note that this is *not*
// an official guarantee, so the authors of `async` are free to change
// this behavior if they need to.
var idle = false;
var onIdleDone = false;
var streamClosed = false;

streamGroup.onIdle.listen(expectAsync1((_) {
expect(streamClosed, isFalse);
idle = true;
}), onDone: expectAsync0(() {
expect(idle, isTrue);
expect(streamClosed, isTrue);
onIdleDone = true;
}));

streamGroup.stream.drain().then(expectAsync1((_) {
expect(idle, isTrue);
expect(onIdleDone, isFalse);
streamClosed = true;
}));

var controller = StreamController<String>();
streamGroup.add(controller.stream);
streamGroup.close();

await flushMicrotasks();
expect(idle, isFalse);
expect(streamGroup.isIdle, isFalse);

controller.close();
await flushMicrotasks();
expect(idle, isTrue);
expect(streamGroup.isIdle, isTrue);
expect(streamClosed, isTrue);
});
});
}

/// Wait for all microtasks to complete.
Expand Down