Skip to content
This repository has been archived by the owner on Oct 17, 2024. It is now read-only.

Clean up CancelableOperation. #204

Merged
merged 3 commits into from
Mar 9, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
198 changes: 108 additions & 90 deletions lib/src/cancelable_operation.dart
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,6 @@

import 'dart:async';

import 'package:async/async.dart';

import 'utils.dart';

/// An asynchronous operation that can be cancelled.
///
/// The value of this operation is exposed as [value]. When this operation is
Expand All @@ -16,28 +12,25 @@ import 'utils.dart';
class CancelableOperation<T> {
/// The completer that produced this operation.
///
/// This is canceled when [cancel] is called.
/// That completer is canceled when [cancel] is called.
final CancelableCompleter<T> _completer;

CancelableOperation._(this._completer);

/// Creates a [CancelableOperation] wrapping [inner].
/// Creates a [CancelableOperation] with the same result as the [result] future.
///
/// When this operation is canceled, [onCancel] will be called and any value
/// or error produced by [inner] will be discarded. If [onCancel] returns a
/// [Future], it will be forwarded to [cancel].
/// or error later produced by [result] will be discarded.
/// If [onCancel] returns a [Future], it will be returned by [cancel].
///
/// [onCancel] will be called synchronously when the operation is canceled.
/// It's guaranteed to only be called once.
/// The [onCancel] funcion will be called synchronously
/// when the new operation is canceled, and will be called at most once.\
///
/// Calling this constructor is equivalent to creating a [CancelableCompleter]
/// and completing it with [inner].
factory CancelableOperation.fromFuture(Future<T> inner,
{FutureOr Function()? onCancel}) {
var completer = CancelableCompleter<T>(onCancel: onCancel);
completer.complete(inner);
return completer.operation;
}
/// Calling this constructor is equivalent to creating a
/// [CancelableCompleter] and completing it with [result].
factory CancelableOperation.fromFuture(Future<T> result,
{FutureOr Function()? onCancel}) =>
(CancelableCompleter<T>(onCancel: onCancel)..complete(result)).operation;

/// Creates a [CancelableOperation] wrapping [subscription].
///
Expand All @@ -58,12 +51,12 @@ class CancelableOperation<T> {
return completer.operation;
}

/// Returns a [CancelableOperation] that completes with the value of the first
/// Creates a [CancelableOperation] that completes with the value of the first
/// of [operations] to complete.
///
/// Once any of [operations] completes, its result is forwarded to the
/// returned [CancelableOperation] and the rest are cancelled. When the
/// returned operation is cancelled, all the [operations] are cancelled as
/// new [CancelableOperation] and the rest are cancelled. If the
/// bew operation is cancelled, all the [operations] are cancelled as
/// well.
static CancelableOperation<T> race<T>(
Iterable<CancelableOperation<T>> operations) {
Expand All @@ -73,8 +66,8 @@ class CancelableOperation<T> {
}

var done = false;
// Note: if one of the completers has already completed, it's not actually
// cancelled by this.
// Note: if one or more of the completers have already completed,
// they're not actually cancelled by this.
Future<void> _cancelAll() {
done = true;
return Future.wait(operations.map((operation) => operation.cancel()));
Expand All @@ -83,25 +76,29 @@ class CancelableOperation<T> {
var completer = CancelableCompleter<T>(onCancel: _cancelAll);
for (var operation in operations) {
operation.then((value) {
if (!done) completer.complete(_cancelAll().then((_) => value));
if (!done) _cancelAll().whenComplete(() => completer.complete(value));
}, onError: (error, stackTrace) {
if (!done) {
completer.complete(
_cancelAll().then((_) => Future.error(error, stackTrace)));
_cancelAll()
.whenComplete(() => completer.completeError(error, stackTrace));
}
});
}

return completer.operation;
}

/// The value returned by the operation.
/// The result of this operation, if not cancelled.
///
/// This future will not complete if the operation is cancelled.
/// Use [valueOrCancellation] for a future which completes
/// both if the operation is cancelled and if it isn't.
Future<T> get value => _completer._inner.future;

/// Creates a [Stream] containing the result of this operation.
///
/// This is like `value.asStream()`, but if a subscription to the stream is
/// canceled, this is as well.
/// canceled, this operation is as well.
Stream<T> asStream() {
var controller =
StreamController<T>(sync: true, onCancel: _completer._cancel);
Expand All @@ -124,55 +121,51 @@ class CancelableOperation<T> {
/// returned by [cancel], then completes to [cancellationValue].
Future<T?> valueOrCancellation([T? cancellationValue]) {
var completer = Completer<T?>.sync();
value.then((result) => completer.complete(result),
onError: completer.completeError);
value.then(completer.complete, onError: completer.completeError);

_completer._cancelMemo.future.then((_) {
_completer._cancelCompleter.future.then((_) {
completer.complete(cancellationValue);
}, onError: completer.completeError);

return completer.future;
}

/// Registers callbacks to be called when this operation completes.
/// Creates a new cancelable operation to be completed
/// when this operation completes or is cancelled.
///
/// [onValue] and [onError] behave in the same way as [Future.then].
/// The [onValue] and [onError] callbacks behave in the same way as
/// for [Future.then], and the result of those callbacks is used to complete
/// the returned cancelable operation.
///
/// If [onCancel] is provided, and this operation is canceled, the [onCancel]
/// callback is called and the returned operation completes with the result.
/// If [onCancel] is provided, and the this operation is canceled,
/// the [onCancel] callback is called and the returned operation completes
/// with the result returned by that call.
///
/// If [onCancel] is not given, and this operation is canceled, then the
/// returned operation is canceled.
/// If [onCancel] is not provided, and this operation is canceled, then the
/// returned operation is also canceled.
///
/// If [propagateCancel] is `true` and the returned operation is canceled then
/// this operation is canceled. The default is `false`.
/// If [propagateCancel] is `true` and the returned operation is canceled
/// then this operation is canceled. The default is `false`.
CancelableOperation<R> then<R>(FutureOr<R> Function(T) onValue,
{FutureOr<R> Function(Object, StackTrace)? onError,
FutureOr<R> Function()? onCancel,
bool propagateCancel = false}) {
final completer =
CancelableCompleter<R>(onCancel: propagateCancel ? cancel : null);

valueOrCancellation().then((T? result) {
if (!completer.isCanceled) {
if (isCompleted && !isCanceled) {
assert(result is T);
completer.complete(Future.sync(() => onValue(result as T)));
} else if (onCancel != null) {
completer.complete(Future.sync(onCancel));
} else {
completer._cancel();
}
}
}, onError: (Object error, StackTrace stackTrace) {
if (!completer.isCanceled) {
if (onError != null) {
completer.complete(Future.sync(() => onError(error, stackTrace)));
} else {
completer.completeError(error, stackTrace);
}
if (!isCanceled) {
value
.then(onValue, onError: onError)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is what is causing a few tests to fail. Before the onValue would not be called if the returned operation is canceled, but now it only matters whether this operation is canceled.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. I'll fix that.

.then(completer.complete, onError: completer.completeError);
}
_completer._cancelCompleter.future.then((_) {
if (onCancel != null) {
completer.complete(Future.sync(onCancel));
} else {
completer._cancel();
}
});

return completer.operation;
}

Expand All @@ -196,24 +189,51 @@ class CancelableOperation<T> {
/// A completer for a [CancelableOperation].
class CancelableCompleter<T> {
/// The completer for the wrapped future.
///
/// At most one of `_inner.future` and `_cancelCompleter.future` will
/// ever complete.
final _inner = Completer<T>();

/// The callback to call if the future is canceled.
final FutureOrCallback? _onCancel;
/// Completed when `cancel` is called.
///
/// At most one of `_inner.future` and `_cancelCompleter.future` will
/// ever complete.
final _cancelCompleter = Completer<void>();

/// The callback to call if the operation is canceled.
final FutureOr<void> Function()? _onCancel;

/// The operation controlled by this completer.
late final operation = CancelableOperation<T>._(this);

/// Set when [complete] or [completeError] is called.
///
/// Completing twice is not allowed.
///
/// If [complete] is called with a future, it's still possible to
/// cancel the operation until that future completes,
/// so this value and [_isCanceled] are not mutually exclusive.
bool _isCompleted = false;

/// Set when [cancel] is called.
///
/// Cancelling twice does nothing, nor does completing after cancelling.
bool _isCanceled = false;

/// Creates a new completer for a [CancelableOperation].
///
/// When the future operation canceled, as long as the completer hasn't yet
/// completed, [onCancel] is called. If [onCancel] returns a [Future], it's
/// forwarded to [CancelableOperation.cancel].
/// The cancelable [operation] can be completed using
/// [complete] or [completeError].
///
/// The [onCancel] function is called if the [operation] is canceled,
/// by calling [CancelableOperation.cancel]
/// before the operation has completed.
/// If [onCancel] returns a [Future],
/// that future is also returned by [CancelableOperation.cancel].
///
/// [onCancel] will be called synchronously when the operation is canceled.
/// It's guaranteed to only be called once.
/// The [onCancel] function will be called at most once.
CancelableCompleter({FutureOr Function()? onCancel}) : _onCancel = onCancel;

/// The operation controlled by this completer.
late final operation = CancelableOperation<T>._(this);

/// Whether the [complete] or [completeError] have been called.
///
/// Once this completer has been completed with either a result or error,
Expand All @@ -223,20 +243,16 @@ class CancelableCompleter<T> {
/// completed before it's [operation] is completed. In that case the
/// [operation] may still be canceled before the result is available.
bool get isCompleted => _isCompleted;
bool _isCompleted = false;

/// Whether the completer was canceled before the result was ready.
bool get isCanceled => _isCanceled;
bool _isCanceled = false;

/// The memoizer for [_cancel].
final _cancelMemo = AsyncMemoizer();

/// Completes [operation] to [value].
/// Completes [operation] with [value].
///
/// If [value] is a [Future] the [operation] will complete with the result of
/// that `Future` once it is available.
/// In that case [isComplete] will be true before the [operation] is complete.
/// If [value] is a [Future] the [operation] will complete
/// with the result of that `Future` once it is available.
/// In that case [isComplete] will be `true` before the [operation]
/// is complete.
///
/// If the type [T] is not nullable [value] may be not be omitted or `null`.
///
Expand All @@ -247,20 +263,19 @@ class CancelableCompleter<T> {
if (_isCompleted) throw StateError('Operation already completed');
_isCompleted = true;

if (value is! Future) {
if (value is! Future<T>) {
if (_isCanceled) return;
_inner.complete(value);
return;
}

final future = value as Future<T>;
if (_isCanceled) {
// Make sure errors from [value] aren't top-leveled.
future.catchError((_) {});
value.ignore();
return;
}

future.then((result) {
value.then((result) {
if (_isCanceled) return;
_inner.complete(result);
}, onError: (Object error, StackTrace stackTrace) {
Expand All @@ -282,24 +297,27 @@ class CancelableCompleter<T> {
_inner.completeError(error, stackTrace);
}

/// Cancel the operation.
/// Cancels the operation.
///
/// If the operation has already completed, prior to being cancelled,
/// this method does nothing.
/// If the operation has already been cancelled, this method returns
/// the same result as the first call to `_cancel`.
///
/// This call is be ignored if the result of the operation is already
/// available.
/// The result of the operation may only be available some time after
/// the completer has been completed (using [complete] or [completeError],
/// which sets [isCompleted] to true) if completed with a [Future].
/// The completer can be cancelled until the result becomes available,
/// even if [isCompleted] is true.
///
/// This call is ignored if this completer has already been canceled.
Future _cancel() {
if (_inner.isCompleted) return Future.value();
Future<void> _cancel() {
if (_inner.isCompleted) return Future.value(null);

return _cancelMemo.runOnce(() {
if (!_isCanceled) {
_isCanceled = true;
var onCancel = _onCancel;
if (onCancel != null) return onCancel();
});
_cancelCompleter
.complete(onCancel == null ? null : Future.sync(onCancel));
}
return _cancelCompleter.future;
}
}
3 changes: 1 addition & 2 deletions lib/src/lazy_stream.dart
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import 'dart:async';

import 'stream_completer.dart';
import 'utils.dart';

/// A [Stream] wrapper that forwards to another [Stream] that's initialized
/// lazily.
Expand All @@ -15,7 +14,7 @@ import 'utils.dart';
/// produce a `Stream`.
class LazyStream<T> extends Stream<T> {
/// The callback that's called to create the inner stream.
FutureOrCallback<Stream<T>>? _callback;
FutureOr<Stream<T>> Function()? _callback;

/// Creates a single-subscription `Stream` that calls [callback] when it gets
/// a listener and forwards to the returned stream.
Expand Down
12 changes: 0 additions & 12 deletions lib/src/utils.dart

This file was deleted.