Skip to content

Commit

Permalink
Add CancelableCompleter.completeOperation (dart-archive/async#215)
Browse files Browse the repository at this point in the history
Towards dart-lang/async#210

Combined with `CancelableOperation.thenOperation` this allows chaining
cancelable work that can be canceled at multiple points.
  • Loading branch information
natebosch authored Apr 19, 2022
1 parent c3fe316 commit c923caa
Show file tree
Hide file tree
Showing 3 changed files with 120 additions and 0 deletions.
1 change: 1 addition & 0 deletions pkgs/async/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

* Add `CancelableOperation.thenOperation` which gives more flexibility to
complete the resulting operation.
* Add `CancelableCompleter.completeOperation`.

## 2.9.0

Expand Down
28 changes: 28 additions & 0 deletions pkgs/async/lib/src/cancelable_operation.dart
Original file line number Diff line number Diff line change
Expand Up @@ -418,6 +418,34 @@ class CancelableCompleter<T> {
});
}

/// Makes this [CancelableCompleter.operation] complete with the same result
/// as [result].
///
/// If [propagateCancel] is `true` (the default), and the [operation] of this
/// completer is canceled before [result] completes, then [result] is also
/// canceled.
void completeOperation(CancelableOperation<T> result,
{bool propagateCancel = true}) {
if (!_mayComplete) throw StateError("Already completed");
_mayComplete = false;
if (isCanceled) {
if (propagateCancel) result.cancel();
result.value.ignore();
return;
}
result.then<void>((value) {
_inner?.complete(
value); // _inner is set to null if this.operation is cancelled.
}, onError: (error, stack) {
_inner?.completeError(error, stack);
}, onCancel: () {
operation.cancel();
});
if (propagateCancel) {
_cancelCompleter?.future.whenComplete(result.cancel);
}
}

/// Completer to use for completing with a result.
///
/// Returns `null` if it's not possible to complete any more.
Expand Down
91 changes: 91 additions & 0 deletions pkgs/async/test/cancelable_operation_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,39 @@ void main() {
expect(completer.operation.isCompleted, isTrue);
});

test('sends values from a cancelable operation to the future', () {
expect(completer.operation.value, completion(equals(1)));
completer
.completeOperation(CancelableOperation.fromFuture(Future.value(1)));
});

test('sends values from a completed cancelable operation to the future',
() async {
final operation = CancelableOperation.fromFuture(Future.value(1));
await operation.value;
expect(completer.operation.value, completion(equals(1)));
completer.completeOperation(operation);
});

test('sends errors from a cancelable operation to the future', () {
expect(completer.operation.value, throwsA('error'));
completer.completeOperation(
CancelableOperation.fromFuture(Future.error('error')..ignore()));
});

test('sends errors from a completed cancelable operation to the future',
() async {
final operation =
CancelableOperation.fromFuture(Future.error('error')..ignore());
try {
await operation.value;
} on Object {
// ignore
}
expect(completer.operation.value, throwsA('error'));
completer.completeOperation(operation);
});

test('sends values to valueOrCancellation', () {
expect(completer.operation.valueOrCancellation(), completion(equals(1)));
completer.complete(1);
Expand Down Expand Up @@ -292,6 +325,64 @@ void main() {
await flushMicrotasks();
expect(cancelCompleted, isTrue);
});

group('completeOperation', () {
test('sends cancellation from a cancelable operation', () async {
final completer = CancelableCompleter<void>();
completer.operation.value.whenComplete(expectAsync0(() {}, count: 0));
completer
.completeOperation(CancelableCompleter<void>().operation..cancel());
await completer.operation.valueOrCancellation();
expect(completer.operation.isCanceled, true);
});

test('sends errors from a completed cancelable operation to the future',
() async {
final operation = CancelableCompleter<void>().operation..cancel();
await operation.valueOrCancellation();
final completer = CancelableCompleter<void>();
completer.operation.value.whenComplete(expectAsync0(() {}, count: 0));
completer.completeOperation(operation);
await completer.operation.valueOrCancellation();
expect(completer.operation.isCanceled, true);
});

test('propagates cancellation', () {
final completer = CancelableCompleter<void>();
final operation =
CancelableCompleter<void>(onCancel: expectAsync0(() {}, count: 1))
.operation;
completer.completeOperation(operation);
completer.operation.cancel();
});

test('propagates cancellation from already canceld completer', () async {
final completer = CancelableCompleter<void>()..operation.cancel();
await completer.operation.valueOrCancellation();
final operation =
CancelableCompleter<void>(onCancel: expectAsync0(() {}, count: 1))
.operation;
completer.completeOperation(operation);
});
test('cancel propagation can be disabled', () {
final completer = CancelableCompleter<void>();
final operation =
CancelableCompleter<void>(onCancel: expectAsync0(() {}, count: 0))
.operation;
completer.completeOperation(operation, propagateCancel: false);
completer.operation.cancel();
});

test('cancel propagation can be disabled from already canceled completed',
() async {
final completer = CancelableCompleter<void>()..operation.cancel();
await completer.operation.valueOrCancellation();
final operation =
CancelableCompleter<void>(onCancel: expectAsync0(() {}, count: 0))
.operation;
completer.completeOperation(operation, propagateCancel: false);
});
});
});

group('asStream()', () {
Expand Down

0 comments on commit c923caa

Please sign in to comment.