From cbce1bc273e3499d5c5d05076579cdce84a372c2 Mon Sep 17 00:00:00 2001 From: Natalie Weizenbaum Date: Wed, 1 Sep 2021 17:27:55 -0700 Subject: [PATCH] Add additional CancelableOperation utilities (dart-lang/async#194) * Add CancelableOperation.fromSubscription * Add CancelableOperation.race() --- pkgs/async/CHANGELOG.md | 4 + pkgs/async/lib/src/cancelable_operation.dart | 56 +++++++++ .../async/test/cancelable_operation_test.dart | 116 ++++++++++++++++++ 3 files changed, 176 insertions(+) diff --git a/pkgs/async/CHANGELOG.md b/pkgs/async/CHANGELOG.md index 9acccf91..815c523e 100644 --- a/pkgs/async/CHANGELOG.md +++ b/pkgs/async/CHANGELOG.md @@ -2,6 +2,10 @@ * Add `StreamExtensions.firstOrNull`. +* Add a `CancelableOperation.fromSubscription()` static factory. + +* Add a `CancelableOperation.race()` static method. + ## 2.8.2 * Deprecate `EventSinkBase`, `StreamSinkBase`, `IOSinkBase`. diff --git a/pkgs/async/lib/src/cancelable_operation.dart b/pkgs/async/lib/src/cancelable_operation.dart index e04af905..f03f6729 100644 --- a/pkgs/async/lib/src/cancelable_operation.dart +++ b/pkgs/async/lib/src/cancelable_operation.dart @@ -39,6 +39,62 @@ class CancelableOperation { return completer.operation; } + /// Creates a [CancelableOperation] wrapping [subscription]. + /// + /// This overrides [subscription.onDone] and [subscription.onError] so that + /// the returned operation will complete when the subscription completes or + /// emits an error. When this operation is canceled or when it emits an error, + /// the subscription will be canceled (unlike + /// `CancelableOperation.fromFuture(subscription.asFuture())`). + static CancelableOperation fromSubscription( + StreamSubscription subscription) { + var completer = CancelableCompleter(onCancel: subscription.cancel); + subscription.onDone(completer.complete); + subscription.onError((Object error, StackTrace stackTrace) { + subscription.cancel().whenComplete(() { + completer.completeError(error, stackTrace); + }); + }); + return completer.operation; + } + + /// Returns a [CancelableOperation] that completes with the value of the first + /// of [operations] to complete. + /// + /// Once any of [operations] completes, its result is forwarded to the + /// returned [CancelableOperation] and the rest are cancelled. When the + /// returned operation is cancelled, all the [operations] are cancelled as + /// well. + static CancelableOperation race( + Iterable> operations) { + operations = operations.toList(); + if (operations.isEmpty) { + throw ArgumentError.value("May not be empty", "operations"); + } + + var done = false; + // Note: if one of the completers has already completed, it's not actually + // cancelled by this. + Future _cancelAll() { + done = true; + return Future.wait(operations.map((operation) => operation.cancel())); + } + + var completer = CancelableCompleter(onCancel: _cancelAll); + for (var operation in operations) { + operation.then((value) { + if (!done) completer.complete(_cancelAll().then((_) => value)); + }, onError: (error, stackTrace) { + if (!done) { + completer.complete( + _cancelAll().then((_) => Future.error(error, stackTrace))); + } + }); + } + + return completer.operation; + } + /// The value returned by the operation. Future get value => _completer._inner.future; diff --git a/pkgs/async/test/cancelable_operation_test.dart b/pkgs/async/test/cancelable_operation_test.dart index a46dfe2b..fa41001d 100644 --- a/pkgs/async/test/cancelable_operation_test.dart +++ b/pkgs/async/test/cancelable_operation_test.dart @@ -122,6 +122,30 @@ void main() { expect(operation.value, throwsA('error')); }); }); + + group('CancelableOperation.fromSubscription', () { + test('forwards a done event once it completes', () async { + var controller = StreamController(); + var operationCompleted = false; + CancelableOperation.fromSubscription(controller.stream.listen(null)) + .then((_) { + operationCompleted = true; + }); + + await flushMicrotasks(); + expect(operationCompleted, isFalse); + + controller.close(); + await flushMicrotasks(); + expect(operationCompleted, isTrue); + }); + + test('forwards errors', () { + var operation = CancelableOperation.fromSubscription( + Stream.error('error').listen(null)); + expect(operation.value, throwsA('error')); + }); + }); }); group('when canceled', () { @@ -237,6 +261,37 @@ void main() { await flushMicrotasks(); expect(fired, isTrue); }); + + test('CancelableOperation.fromSubscription() cancels the subscription', + () async { + var cancelCompleter = Completer(); + var canceled = false; + var controller = StreamController(onCancel: () { + canceled = true; + return cancelCompleter.future; + }); + var operation = + CancelableOperation.fromSubscription(controller.stream.listen(null)); + + await flushMicrotasks(); + expect(canceled, isFalse); + + // The `cancel()` call shouldn't complete until + // `StreamSubscription.cancel` completes. + var cancelCompleted = false; + expect( + operation.cancel().then((_) { + cancelCompleted = true; + }), + completes); + await flushMicrotasks(); + expect(canceled, isTrue); + expect(cancelCompleted, isFalse); + + cancelCompleter.complete(); + await flushMicrotasks(); + expect(cancelCompleted, isTrue); + }); }); group('asStream()', () { @@ -440,4 +495,65 @@ void main() { }); }); }); + + group('race()', () { + late bool canceled1; + late CancelableCompleter completer1; + late bool canceled2; + late CancelableCompleter completer2; + late bool canceled3; + late CancelableCompleter completer3; + late CancelableOperation operation; + setUp(() { + canceled1 = false; + completer1 = CancelableCompleter(onCancel: () { + canceled1 = true; + }); + + canceled2 = false; + completer2 = CancelableCompleter(onCancel: () { + canceled2 = true; + }); + + canceled3 = false; + completer3 = CancelableCompleter(onCancel: () { + canceled3 = true; + }); + + operation = CancelableOperation.race( + [completer1.operation, completer2.operation, completer3.operation]); + }); + + test('returns the first value to complete', () { + completer1.complete(1); + completer2.complete(2); + completer3.complete(3); + + expect(operation.value, completion(equals(1))); + }); + + test('throws the first error to complete', () { + completer1.completeError("error 1"); + completer2.completeError("error 2"); + completer3.completeError("error 3"); + + expect(operation.value, throwsA("error 1")); + }); + + test('cancels any completers that haven\'t completed', () async { + completer1.complete(1); + await expectLater(operation.value, completion(equals(1))); + expect(canceled1, isFalse); + expect(canceled2, isTrue); + expect(canceled3, isTrue); + }); + + test('cancels all completers when the operation is completed', () async { + await operation.cancel(); + + expect(canceled1, isTrue); + expect(canceled2, isTrue); + expect(canceled3, isTrue); + }); + }); }