Skip to content
This repository has been archived by the owner on Oct 17, 2024. It is now read-only.

Add additional CancelableOperation utilities #194

Merged
merged 6 commits into from
Sep 2, 2021
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
## 2.9.0

* Add a `CancelableOperation.fromSubscription()` static factory.

* Add a `CancelableOperation.race()` static method.

## 2.8.2

* Deprecate `EventSinkBase`, `StreamSinkBase`, `IOSinkBase`.
Expand Down
51 changes: 51 additions & 0 deletions lib/src/cancelable_operation.dart
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,57 @@ class CancelableOperation<T> {
return completer.operation;
}

/// Creates a [CancelableOperation] wrapping [subscription].
///
/// This overrides [subscription.onDone] and [subscription.onError] so that
/// the returned operation will complete when the subscription completes or
/// emits an error. When this operation is canceled or when it emits an error,
/// the subscription will be canceled.
nex3 marked this conversation as resolved.
Show resolved Hide resolved
static CancelableOperation<void> fromSubscription(
StreamSubscription<void> subscription) {
var completer = CancelableCompleter<void>(onCancel: subscription.cancel);
subscription.onDone(completer.complete);
subscription.onError((Object error, StackTrace stackTrace) {
subscription.cancel();
nex3 marked this conversation as resolved.
Show resolved Hide resolved
completer.completeError(error, stackTrace);
});
return completer.operation;
}

/// Returns a [CancelableOperation] that completes with the value of the first
/// of [operations] to complete.
///
/// Once any of [operations] completes, its result is forwarded to the
/// returned [CancelableOperation] and the rest are cancelled. When the
/// returned operation is cancelled, all the [operations] are cancelled as
/// well.
static CancelableOperation<T> race<T>(
Iterable<CancelableOperation<T>> operations) {
operations = operations.toList();

nex3 marked this conversation as resolved.
Show resolved Hide resolved
var done = false;
// Note: if one of the completers has already completed, it's not actually
// cancelled by this.
Future<void> _cancelAll() {
done = true;
return Future.wait(operations.map((operation) => operation.cancel()));
}

var completer = CancelableCompleter<T>(onCancel: _cancelAll);
for (var operation in operations) {
operation.then((value) {
if (!done) completer.complete(_cancelAll().then((_) => value));
lrhn marked this conversation as resolved.
Show resolved Hide resolved
}, onError: (error, stackTrace) {
if (!done) {
completer.complete(
_cancelAll().then((_) => Future.error(error, stackTrace)));
lrhn marked this conversation as resolved.
Show resolved Hide resolved
}
});
}

return completer.operation;
}

/// The value returned by the operation.
Future<T> get value => _completer._inner.future;

Expand Down
2 changes: 1 addition & 1 deletion pubspec.yaml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
name: async
version: 2.8.3
version: 2.9.0

description: Utility functions and classes related to the 'dart:async' library.
repository: https://github.com/dart-lang/async
Expand Down
101 changes: 101 additions & 0 deletions test/cancelable_operation_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,30 @@ void main() {
expect(operation.value, throwsA('error'));
});
});

group('CancelableOperation.fromSubscription', () {
test('forwards a done event once it completes', () async {
var controller = StreamController<void>();
var operationCompleted = false;
CancelableOperation.fromSubscription(controller.stream.listen(null))
.then((_) {
operationCompleted = true;
});

await flushMicrotasks();
expect(operationCompleted, isFalse);

controller.close();
await flushMicrotasks();
expect(operationCompleted, isTrue);
});

test('forwards errors', () {
var operation = CancelableOperation.fromSubscription(
Stream.error('error').listen(null));
expect(operation.value, throwsA('error'));
});
});
});

group('when canceled', () {
Expand Down Expand Up @@ -237,6 +261,22 @@ void main() {
await flushMicrotasks();
expect(fired, isTrue);
});

test('Stream.fromSubscription() cancels the subscription', () async {
var canceled = false;
var controller = StreamController<void>(onCancel: () {
canceled = true;
});
var operation =
CancelableOperation.fromSubscription(controller.stream.listen(null));

await flushMicrotasks();
expect(canceled, isFalse);

expect(operation.cancel(), completes);
await flushMicrotasks();
expect(canceled, isTrue);
});
});

group('asStream()', () {
Expand Down Expand Up @@ -440,4 +480,65 @@ void main() {
});
});
});

group('race()', () {
late bool canceled1;
late CancelableCompleter<int> completer1;
late bool canceled2;
late CancelableCompleter<int> completer2;
late bool canceled3;
late CancelableCompleter<int> completer3;
late CancelableOperation<int> operation;
setUp(() {
canceled1 = false;
completer1 = CancelableCompleter<int>(onCancel: () {
canceled1 = true;
});

canceled2 = false;
completer2 = CancelableCompleter<int>(onCancel: () {
canceled2 = true;
});

canceled3 = false;
completer3 = CancelableCompleter<int>(onCancel: () {
canceled3 = true;
});

operation = CancelableOperation.race(
[completer1.operation, completer2.operation, completer3.operation]);
});

test('returns the first value to complete', () {
completer1.complete(1);
completer2.complete(2);
completer3.complete(3);

expect(operation.value, completion(equals(1)));
});

test('throws the first error to complete', () {
completer1.completeError("error 1");
completer2.completeError("error 2");
completer3.completeError("error 3");

expect(operation.value, throwsA("error 1"));
});

test('cancels any completers that haven\'t completed', () async {
completer1.complete(1);
await expectLater(operation.value, completion(equals(1)));
expect(canceled1, isFalse);
expect(canceled2, isTrue);
expect(canceled3, isTrue);
});

test('cancels all completers when the operation is completed', () async {
await operation.cancel();

expect(canceled1, isTrue);
expect(canceled2, isTrue);
expect(canceled3, isTrue);
});
});
}