Skip to content

Commit

Permalink
Add additional CancelableOperation utilities (dart-archive/async#194)
Browse files Browse the repository at this point in the history
* Add CancelableOperation.fromSubscription

* Add CancelableOperation.race()
  • Loading branch information
nex3 authored Sep 2, 2021
1 parent f785214 commit cbce1bc
Show file tree
Hide file tree
Showing 3 changed files with 176 additions and 0 deletions.
4 changes: 4 additions & 0 deletions pkgs/async/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

* Add `StreamExtensions.firstOrNull`.

* Add a `CancelableOperation.fromSubscription()` static factory.

* Add a `CancelableOperation.race()` static method.

## 2.8.2

* Deprecate `EventSinkBase`, `StreamSinkBase`, `IOSinkBase`.
Expand Down
56 changes: 56 additions & 0 deletions pkgs/async/lib/src/cancelable_operation.dart
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,62 @@ class CancelableOperation<T> {
return completer.operation;
}

/// Creates a [CancelableOperation] wrapping [subscription].
///
/// This overrides [subscription.onDone] and [subscription.onError] so that
/// the returned operation will complete when the subscription completes or
/// emits an error. When this operation is canceled or when it emits an error,
/// the subscription will be canceled (unlike
/// `CancelableOperation.fromFuture(subscription.asFuture())`).
static CancelableOperation<void> fromSubscription(
StreamSubscription<void> subscription) {
var completer = CancelableCompleter<void>(onCancel: subscription.cancel);
subscription.onDone(completer.complete);
subscription.onError((Object error, StackTrace stackTrace) {
subscription.cancel().whenComplete(() {
completer.completeError(error, stackTrace);
});
});
return completer.operation;
}

/// Returns a [CancelableOperation] that completes with the value of the first
/// of [operations] to complete.
///
/// Once any of [operations] completes, its result is forwarded to the
/// returned [CancelableOperation] and the rest are cancelled. When the
/// returned operation is cancelled, all the [operations] are cancelled as
/// well.
static CancelableOperation<T> race<T>(
Iterable<CancelableOperation<T>> operations) {
operations = operations.toList();
if (operations.isEmpty) {
throw ArgumentError.value("May not be empty", "operations");
}

var done = false;
// Note: if one of the completers has already completed, it's not actually
// cancelled by this.
Future<void> _cancelAll() {
done = true;
return Future.wait(operations.map((operation) => operation.cancel()));
}

var completer = CancelableCompleter<T>(onCancel: _cancelAll);
for (var operation in operations) {
operation.then((value) {
if (!done) completer.complete(_cancelAll().then((_) => value));
}, onError: (error, stackTrace) {
if (!done) {
completer.complete(
_cancelAll().then((_) => Future.error(error, stackTrace)));
}
});
}

return completer.operation;
}

/// The value returned by the operation.
Future<T> get value => _completer._inner.future;

Expand Down
116 changes: 116 additions & 0 deletions pkgs/async/test/cancelable_operation_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,30 @@ void main() {
expect(operation.value, throwsA('error'));
});
});

group('CancelableOperation.fromSubscription', () {
test('forwards a done event once it completes', () async {
var controller = StreamController<void>();
var operationCompleted = false;
CancelableOperation.fromSubscription(controller.stream.listen(null))
.then((_) {
operationCompleted = true;
});

await flushMicrotasks();
expect(operationCompleted, isFalse);

controller.close();
await flushMicrotasks();
expect(operationCompleted, isTrue);
});

test('forwards errors', () {
var operation = CancelableOperation.fromSubscription(
Stream.error('error').listen(null));
expect(operation.value, throwsA('error'));
});
});
});

group('when canceled', () {
Expand Down Expand Up @@ -237,6 +261,37 @@ void main() {
await flushMicrotasks();
expect(fired, isTrue);
});

test('CancelableOperation.fromSubscription() cancels the subscription',
() async {
var cancelCompleter = Completer<void>();
var canceled = false;
var controller = StreamController<void>(onCancel: () {
canceled = true;
return cancelCompleter.future;
});
var operation =
CancelableOperation.fromSubscription(controller.stream.listen(null));

await flushMicrotasks();
expect(canceled, isFalse);

// The `cancel()` call shouldn't complete until
// `StreamSubscription.cancel` completes.
var cancelCompleted = false;
expect(
operation.cancel().then((_) {
cancelCompleted = true;
}),
completes);
await flushMicrotasks();
expect(canceled, isTrue);
expect(cancelCompleted, isFalse);

cancelCompleter.complete();
await flushMicrotasks();
expect(cancelCompleted, isTrue);
});
});

group('asStream()', () {
Expand Down Expand Up @@ -440,4 +495,65 @@ void main() {
});
});
});

group('race()', () {
late bool canceled1;
late CancelableCompleter<int> completer1;
late bool canceled2;
late CancelableCompleter<int> completer2;
late bool canceled3;
late CancelableCompleter<int> completer3;
late CancelableOperation<int> operation;
setUp(() {
canceled1 = false;
completer1 = CancelableCompleter<int>(onCancel: () {
canceled1 = true;
});

canceled2 = false;
completer2 = CancelableCompleter<int>(onCancel: () {
canceled2 = true;
});

canceled3 = false;
completer3 = CancelableCompleter<int>(onCancel: () {
canceled3 = true;
});

operation = CancelableOperation.race(
[completer1.operation, completer2.operation, completer3.operation]);
});

test('returns the first value to complete', () {
completer1.complete(1);
completer2.complete(2);
completer3.complete(3);

expect(operation.value, completion(equals(1)));
});

test('throws the first error to complete', () {
completer1.completeError("error 1");
completer2.completeError("error 2");
completer3.completeError("error 3");

expect(operation.value, throwsA("error 1"));
});

test('cancels any completers that haven\'t completed', () async {
completer1.complete(1);
await expectLater(operation.value, completion(equals(1)));
expect(canceled1, isFalse);
expect(canceled2, isTrue);
expect(canceled3, isTrue);
});

test('cancels all completers when the operation is completed', () async {
await operation.cancel();

expect(canceled1, isTrue);
expect(canceled2, isTrue);
expect(canceled3, isTrue);
});
});
}

0 comments on commit cbce1bc

Please sign in to comment.