From 9fb879970f6a0fc480a7dbe6975e482bddaf3722 Mon Sep 17 00:00:00 2001 From: Jacob MacDonald Date: Tue, 17 Mar 2020 08:43:18 -0700 Subject: [PATCH 01/15] update pubspec/analysis_options --- analysis_options.yaml | 2 ++ pubspec.yaml | 46 ++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 47 insertions(+), 1 deletion(-) diff --git a/analysis_options.yaml b/analysis_options.yaml index 18c26e0..b58ef99 100644 --- a/analysis_options.yaml +++ b/analysis_options.yaml @@ -7,6 +7,8 @@ analyzer: todo: ignore # Lint provided by pkg:pedantic – should fix this! unawaited_futures: ignore + enable-experiment: + - non-nullable linter: rules: diff --git a/pubspec.yaml b/pubspec.yaml index 599ccdd..7172800 100644 --- a/pubspec.yaml +++ b/pubspec.yaml @@ -5,7 +5,7 @@ description: Utility functions and classes related to the 'dart:async' library. homepage: https://www.github.com/dart-lang/async environment: - sdk: '>=2.2.0 <3.0.0' + sdk: '>=2.8.0-dev.10 <3.0.0' dependencies: collection: ^1.5.0 @@ -15,3 +15,47 @@ dev_dependencies: stack_trace: ^1.0.0 test: ^1.0.0 pedantic: ^1.0.0 + +dependency_overrides: + boolean_selector: + git: + url: git://github.com/dart-lang/boolean_selector.git + ref: null_safety + charcode: + git: + url: git://github.com/dart-lang/charcode.git + ref: null_safety + collection: + git: + url: git://github.com/dart-lang/collection.git + ref: null-safety-migration + matcher: + git: + url: git://github.com/dart-lang/matcher.git + ref: null_safety + path: + git: + url: git://github.com/dart-lang/path.git + ref: null_safety_insert_workaround + source_span: + git: + url: git://github.com/dart-lang/source_span.git + ref: null_safety + stack_trace: + git: + url: git://github.com/dart-lang/stack_trace.git + ref: null_safety + string_scanner: + git: + url: git://github.com/dart-lang/string_scanner.git + ref: null_safety + term_glyph: + git: + url: git://github.com/dart-lang/term_glyph.git + ref: null_safety + test_api: + git: + url: git://github.com/dart-lang/test.git + ref: null_safety + path: pkgs/test_api + \ No newline at end of file From 4710946b1cc3f6d399489d295f5664bf99807f13 Mon Sep 17 00:00:00 2001 From: Jacob MacDonald Date: Tue, 17 Mar 2020 08:43:26 -0700 Subject: [PATCH 02/15] update delegates --- lib/src/delegate/event_sink.dart | 2 +- lib/src/delegate/future.dart | 6 +++--- lib/src/delegate/stream_sink.dart | 2 +- lib/src/delegate/stream_subscription.dart | 10 +++++----- 4 files changed, 10 insertions(+), 10 deletions(-) diff --git a/lib/src/delegate/event_sink.dart b/lib/src/delegate/event_sink.dart index bc33b19..33d88e9 100644 --- a/lib/src/delegate/event_sink.dart +++ b/lib/src/delegate/event_sink.dart @@ -33,7 +33,7 @@ class DelegatingEventSink implements EventSink { } @override - void addError(error, [StackTrace stackTrace]) { + void addError(error, [StackTrace? stackTrace]) { _sink.addError(error, stackTrace); } diff --git a/lib/src/delegate/future.dart b/lib/src/delegate/future.dart index 984caf6..1155452 100644 --- a/lib/src/delegate/future.dart +++ b/lib/src/delegate/future.dart @@ -25,11 +25,11 @@ class DelegatingFuture implements Future { Stream asStream() => _future.asStream(); @override - Future catchError(Function onError, {bool Function(Object error) test}) => + Future catchError(Function onError, {bool Function(Object error)? test}) => _future.catchError(onError, test: test); @override - Future then(FutureOr Function(T) onValue, {Function onError}) => + Future then(FutureOr Function(T) onValue, {Function? onError}) => _future.then(onValue, onError: onError); @override @@ -37,6 +37,6 @@ class DelegatingFuture implements Future { _future.whenComplete(action); @override - Future timeout(Duration timeLimit, {FutureOr Function() onTimeout}) => + Future timeout(Duration timeLimit, {FutureOr Function()? onTimeout}) => _future.timeout(timeLimit, onTimeout: onTimeout); } diff --git a/lib/src/delegate/stream_sink.dart b/lib/src/delegate/stream_sink.dart index 9005d1d..ad76e90 100644 --- a/lib/src/delegate/stream_sink.dart +++ b/lib/src/delegate/stream_sink.dart @@ -36,7 +36,7 @@ class DelegatingStreamSink implements StreamSink { } @override - void addError(error, [StackTrace stackTrace]) { + void addError(error, [StackTrace? stackTrace]) { _sink.addError(error, stackTrace); } diff --git a/lib/src/delegate/stream_subscription.dart b/lib/src/delegate/stream_subscription.dart index 392e27b..45b1134 100644 --- a/lib/src/delegate/stream_subscription.dart +++ b/lib/src/delegate/stream_subscription.dart @@ -31,22 +31,22 @@ class DelegatingStreamSubscription implements StreamSubscription { : TypeSafeStreamSubscription(subscription); @override - void onData(void Function(T) handleData) { + void onData(void Function(T)? handleData) { _source.onData(handleData); } @override - void onError(Function handleError) { + void onError(Function? handleError) { _source.onError(handleError); } @override - void onDone(void Function() handleDone) { + void onDone(void Function()? handleDone) { _source.onDone(handleDone); } @override - void pause([Future resumeFuture]) { + void pause([Future? resumeFuture]) { _source.pause(resumeFuture); } @@ -59,7 +59,7 @@ class DelegatingStreamSubscription implements StreamSubscription { Future cancel() => _source.cancel(); @override - Future asFuture([E futureValue]) => _source.asFuture(futureValue); + Future asFuture([E? futureValue]) => _source.asFuture(futureValue); @override bool get isPaused => _source.isPaused; From dc177a1900eb416c46a1a9924f23a11d7789a264 Mon Sep 17 00:00:00 2001 From: Jacob MacDonald Date: Tue, 17 Mar 2020 09:05:59 -0700 Subject: [PATCH 03/15] migrate result classes --- lib/src/result/capture_sink.dart | 2 +- lib/src/result/error.dart | 8 ++++---- lib/src/result/future.dart | 4 ++-- lib/src/result/release_sink.dart | 2 +- lib/src/result/result.dart | 26 +++++++++++++------------- lib/src/result/value.dart | 2 +- 6 files changed, 22 insertions(+), 22 deletions(-) diff --git a/lib/src/result/capture_sink.dart b/lib/src/result/capture_sink.dart index a7c3a59..562f5f9 100644 --- a/lib/src/result/capture_sink.dart +++ b/lib/src/result/capture_sink.dart @@ -18,7 +18,7 @@ class CaptureSink implements EventSink { } @override - void addError(Object error, [StackTrace stackTrace]) { + void addError(Object error, [StackTrace? stackTrace]) { _sink.add(Result.error(error, stackTrace)); } diff --git a/lib/src/result/error.dart b/lib/src/result/error.dart index 76e0275..cc26537 100644 --- a/lib/src/result/error.dart +++ b/lib/src/result/error.dart @@ -8,19 +8,19 @@ import 'result.dart'; import 'value.dart'; /// A result representing a thrown error. -class ErrorResult implements Result { +class ErrorResult implements Result { /// The error object that was thrown. final Object error; /// The stack trace corresponding to where [error] was thrown. - final StackTrace stackTrace; + final StackTrace? stackTrace; @override bool get isValue => false; @override bool get isError => true; @override - ValueResult get asValue => null; + ValueResult? get asValue => null; @override ErrorResult get asError => this; @@ -37,7 +37,7 @@ class ErrorResult implements Result { } @override - Future get asFuture => Future.error(error, stackTrace); + Future get asFuture => Future.error(error, stackTrace); /// Calls an error handler with the error and stacktrace. /// diff --git a/lib/src/result/future.dart b/lib/src/result/future.dart index ff30546..20a5ebf 100644 --- a/lib/src/result/future.dart +++ b/lib/src/result/future.dart @@ -16,8 +16,8 @@ class ResultFuture extends DelegatingFuture { /// The result of the wrapped [Future], if it's completed. /// /// If it hasn't completed yet, this will be `null`. - Result get result => _result; - Result _result; + Result? get result => _result; + Result? _result; ResultFuture(Future future) : super(future) { Result.capture(future).then((result) { diff --git a/lib/src/result/release_sink.dart b/lib/src/result/release_sink.dart index 5d8267a..bf6dd50 100644 --- a/lib/src/result/release_sink.dart +++ b/lib/src/result/release_sink.dart @@ -18,7 +18,7 @@ class ReleaseSink implements EventSink> { } @override - void addError(Object error, [StackTrace stackTrace]) { + void addError(Object error, [StackTrace? stackTrace]) { // Errors may be added by intermediate processing, even if it is never // added by CaptureSink. _sink.addError(error, stackTrace); diff --git a/lib/src/result/result.dart b/lib/src/result/result.dart index e604782..c296614 100644 --- a/lib/src/result/result.dart +++ b/lib/src/result/result.dart @@ -64,7 +64,7 @@ abstract class Result { try { return ValueResult(computation()); } catch (e, s) { - return ErrorResult(e, s); + return ErrorResult(e as Object, s); } } @@ -76,7 +76,7 @@ abstract class Result { /// Creates a `Result` holding an error. /// /// Alias for [ErrorResult.ErrorResult]. - factory Result.error(Object error, [StackTrace stackTrace]) => + factory Result.error(Object error, [StackTrace? stackTrace]) => ErrorResult(error, stackTrace); /// Captures the result of a future into a `Result` future. @@ -85,7 +85,7 @@ abstract class Result { /// Errors have been converted to an [ErrorResult] value. static Future> capture(Future future) { return future.then((value) => ValueResult(value), - onError: (error, StackTrace stackTrace) => + onError: (Object error, StackTrace stackTrace) => ErrorResult(error, stackTrace)); } @@ -97,9 +97,9 @@ abstract class Result { /// wrapped as a [Result.value]. /// The returned future will never have an error. static Future>> captureAll(Iterable> elements) { - var results = >[]; + var results = ?>[]; var pending = 0; - Completer>> completer; + late Completer>> completer; for (var element in elements) { if (element is Future) { var i = results.length; @@ -108,7 +108,7 @@ abstract class Result { Result.capture(element).then((result) { results[i] = result; if (--pending == 0) { - completer.complete(results); + completer.complete(results.cast>().toList()); } }); } else { @@ -116,7 +116,7 @@ abstract class Result { } } if (pending == 0) { - return Future>>.value(results); + return Future.value(results.cast>().toList()); } completer = Completer>>(); return completer.future; @@ -172,8 +172,8 @@ abstract class Result { /// Otherwise both levels of results are value results, and a single /// result with the value is returned. static Result flatten(Result> result) { - if (result.isValue) return result.asValue.value; - return result.asError; + if (result.isValue) return result.asValue!.value; + return result.asError!; } /// Converts a sequence of results to a result of a list. @@ -184,9 +184,9 @@ abstract class Result { var values = []; for (var result in results) { if (result.isValue) { - values.add(result.asValue.value); + values.add(result.asValue!.value); } else { - return result.asError; + return result.asError!; } } return Result>.value(values); @@ -205,12 +205,12 @@ abstract class Result { /// If this is a value result, returns itself. /// /// Otherwise returns `null`. - ValueResult get asValue; + ValueResult? get asValue; /// If this is an error result, returns itself. /// /// Otherwise returns `null`. - ErrorResult get asError; + ErrorResult? get asError; /// Completes a completer with this result. void complete(Completer completer); diff --git a/lib/src/result/value.dart b/lib/src/result/value.dart index 5c1a60f..7cfc474 100644 --- a/lib/src/result/value.dart +++ b/lib/src/result/value.dart @@ -19,7 +19,7 @@ class ValueResult implements Result { @override ValueResult get asValue => this; @override - ErrorResult get asError => null; + ErrorResult? get asError => null; ValueResult(this.value); From 20799c2aeb5d50f35b861b25d0d5f93f7b300ab0 Mon Sep 17 00:00:00 2001 From: Jacob MacDonald Date: Tue, 17 Mar 2020 09:25:17 -0700 Subject: [PATCH 04/15] update stream_sink_transformer, will need further update once non-nullable stack traces land --- lib/src/stream_sink_transformer.dart | 6 ++-- .../handler_transformer.dart | 30 ++++++++++++------- .../stream_transformer_wrapper.dart | 2 +- 3 files changed, 23 insertions(+), 15 deletions(-) diff --git a/lib/src/stream_sink_transformer.dart b/lib/src/stream_sink_transformer.dart index 1dc27ed..1ac5073 100644 --- a/lib/src/stream_sink_transformer.dart +++ b/lib/src/stream_sink_transformer.dart @@ -34,9 +34,9 @@ abstract class StreamSinkTransformer { /// they're passed are forwarded to the inner sink. If a handler is omitted, /// the event is passed through unaltered. factory StreamSinkTransformer.fromHandlers( - {void Function(S, EventSink) handleData, - void Function(Object, StackTrace, EventSink) handleError, - void Function(EventSink) handleDone}) { + {void Function(S, EventSink)? handleData, + void Function(Object, StackTrace?, EventSink)? handleError, + void Function(EventSink)? handleDone}) { return HandlerTransformer(handleData, handleError, handleDone); } diff --git a/lib/src/stream_sink_transformer/handler_transformer.dart b/lib/src/stream_sink_transformer/handler_transformer.dart index 8d37ce3..73a4717 100644 --- a/lib/src/stream_sink_transformer/handler_transformer.dart +++ b/lib/src/stream_sink_transformer/handler_transformer.dart @@ -11,8 +11,11 @@ import '../delegate/stream_sink.dart'; typedef HandleData = void Function(S data, EventSink sink); /// The type of the callback for handling error events. +/// +/// TODO: Update to take a non-nullable StackTrace once that change lands in +/// the sdk. typedef HandleError = void Function( - Object error, StackTrace stackTrace, EventSink sink); + Object error, StackTrace? stackTrace, EventSink sink); /// The type of the callback for handling done events. typedef HandleDone = void Function(EventSink sink); @@ -20,13 +23,13 @@ typedef HandleDone = void Function(EventSink sink); /// A [StreamSinkTransformer] that delegates events to the given handlers. class HandlerTransformer implements StreamSinkTransformer { /// The handler for data events. - final HandleData _handleData; + final HandleData? _handleData; /// The handler for error events. - final HandleError _handleError; + final HandleError? _handleError; /// The handler for done events. - final HandleDone _handleDone; + final HandleDone? _handleDone; HandlerTransformer(this._handleData, this._handleError, this._handleDone); @@ -55,19 +58,23 @@ class _HandlerSink implements StreamSink { @override void add(S event) { - if (_transformer._handleData == null) { + var handleData = _transformer._handleData; + if (handleData == null) { _inner.add(event as T); } else { - _transformer._handleData(event, _safeCloseInner); + handleData(event, _safeCloseInner); } } @override - void addError(error, [StackTrace stackTrace]) { - if (_transformer._handleError == null) { + void addError(error, [StackTrace? stackTrace]) { + var handleError = _transformer._handleError; + if (handleError == null) { _inner.addError(error, stackTrace); } else { - _transformer._handleError(error, stackTrace, _safeCloseInner); + /// TODO: Update to pass AsyncError.defaultStackTrace(error) once that + /// lands in the sdk. + handleError(error, stackTrace, _safeCloseInner); } } @@ -82,9 +89,10 @@ class _HandlerSink implements StreamSink { @override Future close() { - if (_transformer._handleDone == null) return _inner.close(); + var handleDone = _transformer._handleDone; + if (handleDone == null) return _inner.close(); - _transformer._handleDone(_safeCloseInner); + handleDone(_safeCloseInner); return _inner.done; } } diff --git a/lib/src/stream_sink_transformer/stream_transformer_wrapper.dart b/lib/src/stream_sink_transformer/stream_transformer_wrapper.dart index 1df7e5a..2afcbff 100644 --- a/lib/src/stream_sink_transformer/stream_transformer_wrapper.dart +++ b/lib/src/stream_sink_transformer/stream_transformer_wrapper.dart @@ -50,7 +50,7 @@ class _StreamTransformerWrapperSink implements StreamSink { } @override - void addError(error, [StackTrace stackTrace]) { + void addError(error, [StackTrace? stackTrace]) { _controller.addError(error, stackTrace); } From de4410745b4acbd4fe29db130c66f9bc70f65de5 Mon Sep 17 00:00:00 2001 From: Jacob MacDonald Date: Tue, 17 Mar 2020 09:27:34 -0700 Subject: [PATCH 05/15] migrate typed_stream_subscription --- lib/src/typed/stream_subscription.dart | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/lib/src/typed/stream_subscription.dart b/lib/src/typed/stream_subscription.dart index d85b776..fe91656 100644 --- a/lib/src/typed/stream_subscription.dart +++ b/lib/src/typed/stream_subscription.dart @@ -13,22 +13,23 @@ class TypeSafeStreamSubscription implements StreamSubscription { TypeSafeStreamSubscription(this._subscription); @override - void onData(void Function(T) handleData) { + void onData(void Function(T)? handleData) { + if (handleData == null) return _subscription.onData(null); _subscription.onData((data) => handleData(data as T)); } @override - void onError(Function handleError) { + void onError(Function? handleError) { _subscription.onError(handleError); } @override - void onDone(void Function() handleDone) { + void onDone(void Function()? handleDone) { _subscription.onDone(handleDone); } @override - void pause([Future resumeFuture]) { + void pause([Future? resumeFuture]) { _subscription.pause(resumeFuture); } @@ -41,5 +42,6 @@ class TypeSafeStreamSubscription implements StreamSubscription { Future cancel() => _subscription.cancel(); @override - Future asFuture([E futureValue]) => _subscription.asFuture(futureValue); + Future asFuture([E? futureValue]) => + _subscription.asFuture(futureValue); } From 3d3d54f8aa9225d6b90986a6f461e4ce4ef9c67a Mon Sep 17 00:00:00 2001 From: Jacob MacDonald Date: Tue, 17 Mar 2020 09:29:36 -0700 Subject: [PATCH 06/15] migrate AsyncCache --- lib/src/async_cache.dart | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/lib/src/async_cache.dart b/lib/src/async_cache.dart index e0a6f49..655825d 100644 --- a/lib/src/async_cache.dart +++ b/lib/src/async_cache.dart @@ -29,13 +29,13 @@ class AsyncCache { final Duration _duration; /// Cached results of a previous [fetchStream] call. - StreamSplitter _cachedStreamSplitter; + StreamSplitter? _cachedStreamSplitter; /// Cached results of a previous [fetch] call. - Future _cachedValueFuture; + Future? _cachedValueFuture; /// Fires when the cache should be considered stale. - Timer _stale; + Timer? _stale; /// Creates a cache that invalidates its contents after [duration] has passed. /// @@ -78,12 +78,12 @@ class AsyncCache { if (_cachedValueFuture != null) { throw StateError('Previously used to cache via `fetch`'); } - _cachedStreamSplitter ??= StreamSplitter( + var splitter = _cachedStreamSplitter ??= StreamSplitter( callback().transform(StreamTransformer.fromHandlers(handleDone: (sink) { _startStaleTimer(); sink.close(); }))); - return _cachedStreamSplitter.split(); + return splitter.split(); } /// Removes any cached value. From eae7efea407114a7b75f54f13e8ddc171c6c89c7 Mon Sep 17 00:00:00 2001 From: Jacob MacDonald Date: Tue, 17 Mar 2020 09:37:21 -0700 Subject: [PATCH 07/15] migrate cancelable_operation --- lib/src/cancelable_operation.dart | 33 +++++++++++++++++-------------- 1 file changed, 18 insertions(+), 15 deletions(-) diff --git a/lib/src/cancelable_operation.dart b/lib/src/cancelable_operation.dart index 9fd0534..0fc7f5d 100644 --- a/lib/src/cancelable_operation.dart +++ b/lib/src/cancelable_operation.dart @@ -35,7 +35,7 @@ class CancelableOperation { /// moment this [CancelableOperation] is created, regardless of whether /// [inner] has completed yet or not. factory CancelableOperation.fromFuture(Future inner, - {FutureOr Function() onCancel}) { + {FutureOr Function()? onCancel}) { var completer = CancelableCompleter(onCancel: onCancel); completer.complete(inner); return completer.operation; @@ -55,7 +55,7 @@ class CancelableOperation { value.then((value) { controller.add(value); controller.close(); - }, onError: (error, StackTrace stackTrace) { + }, onError: (Object error, StackTrace stackTrace) { controller.addError(error, stackTrace); controller.close(); }); @@ -68,7 +68,10 @@ class CancelableOperation { /// If this operation completes, this completes to the same result as [value]. /// If this operation is cancelled, the returned future waits for the future /// returned by [cancel], then completes to [cancellationValue]. - Future valueOrCancellation([T cancellationValue]) { + /// + /// If [T] is not nullable then [cancellationValue] must provide a valid + /// value, otherwise it will throw on cancellation. + Future valueOrCancellation([T? cancellationValue]) { var completer = Completer.sync(); value.then((result) => completer.complete(result), onError: completer.completeError); @@ -93,8 +96,8 @@ class CancelableOperation { /// If [propagateCancel] is `true` and the returned operation is canceled then /// this operation is canceled. The default is `false`. CancelableOperation then(FutureOr Function(T) onValue, - {FutureOr Function(Object, StackTrace) onError, - FutureOr Function() onCancel, + {FutureOr Function(Object, StackTrace)? onError, + FutureOr Function()? onCancel, bool propagateCancel = false}) { final completer = CancelableCompleter(onCancel: propagateCancel ? cancel : null); @@ -109,7 +112,7 @@ class CancelableOperation { completer._cancel(); } } - }, onError: (error, StackTrace stackTrace) { + }, onError: (Object error, StackTrace stackTrace) { if (!completer.isCanceled) { if (onError != null) { completer.complete(Future.sync(() => onError(error, stackTrace))); @@ -145,7 +148,7 @@ class CancelableCompleter { final Completer _inner; /// The callback to call if the future is canceled. - final FutureOrCallback _onCancel; + final FutureOrCallback? _onCancel; /// Creates a new completer for a [CancelableOperation]. /// @@ -155,15 +158,14 @@ class CancelableCompleter { /// /// [onCancel] will be called synchronously when the operation is canceled. /// It's guaranteed to only be called once. - CancelableCompleter({FutureOr Function() onCancel}) + CancelableCompleter({FutureOr Function()? onCancel}) : _onCancel = onCancel, _inner = Completer() { - _operation = CancelableOperation._(this); + operation = CancelableOperation._(this); } /// The operation controlled by this completer. - CancelableOperation get operation => _operation; - CancelableOperation _operation; + late final CancelableOperation operation; /// Whether the completer has completed. bool get isCompleted => _isCompleted; @@ -180,7 +182,7 @@ class CancelableCompleter { /// /// If [value] is a [Future], this will complete to the result of that /// [Future] once it completes. - void complete([FutureOr value]) { + void complete([FutureOr? value]) { if (_isCompleted) throw StateError('Operation already completed'); _isCompleted = true; @@ -200,14 +202,14 @@ class CancelableCompleter { future.then((result) { if (_isCanceled) return; _inner.complete(result); - }, onError: (error, StackTrace stackTrace) { + }, onError: (Object error, StackTrace stackTrace) { if (_isCanceled) return; _inner.completeError(error, stackTrace); }); } /// Completes [operation] to [error]. - void completeError(Object error, [StackTrace stackTrace]) { + void completeError(Object error, [StackTrace? stackTrace]) { if (_isCompleted) throw StateError('Operation already completed'); _isCompleted = true; @@ -221,7 +223,8 @@ class CancelableCompleter { return _cancelMemo.runOnce(() { _isCanceled = true; - if (_onCancel != null) return _onCancel(); + var onCancel = _onCancel; + if (onCancel != null) return onCancel(); }); } } From d54f65ba548acc552183e56d53770fab8b72e68b Mon Sep 17 00:00:00 2001 From: Jacob MacDonald Date: Tue, 17 Mar 2020 09:45:54 -0700 Subject: [PATCH 08/15] migrate FutureGroup --- lib/src/future_group.dart | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/lib/src/future_group.dart b/lib/src/future_group.dart index 402ae46..3a6291f 100644 --- a/lib/src/future_group.dart +++ b/lib/src/future_group.dart @@ -44,13 +44,13 @@ class FutureGroup implements Sink> { Stream get onIdle => (_onIdleController ??= StreamController.broadcast(sync: true)).stream; - StreamController _onIdleController; + StreamController? _onIdleController; /// The values emitted by the futures that have been added to the group, in /// the order they were added. /// /// The slots for futures that haven't completed yet are `null`. - final _values = []; + final _values = []; /// Wait for [task] to complete. @override @@ -71,12 +71,13 @@ class FutureGroup implements Sink> { _values[index] = value; if (_pending != 0) return null; - if (_onIdleController != null) _onIdleController.add(null); + var onIdleController = _onIdleController; + if (onIdleController != null) onIdleController.add(null); if (!_closed) return null; - if (_onIdleController != null) _onIdleController.close(); - _completer.complete(_values); - }).catchError((error, StackTrace stackTrace) { + if (onIdleController != null) onIdleController.close(); + _completer.complete(_values.whereType().toList()); + }).catchError((Object error, StackTrace stackTrace) { if (_completer.isCompleted) return null; _completer.completeError(error, stackTrace); }); @@ -89,6 +90,6 @@ class FutureGroup implements Sink> { _closed = true; if (_pending != 0) return; if (_completer.isCompleted) return; - _completer.complete(_values); + _completer.complete(_values.whereType().toList()); } } From 129b73c05f45e498fd445951b4f2e1a79842c833 Mon Sep 17 00:00:00 2001 From: Jacob MacDonald Date: Tue, 17 Mar 2020 09:57:38 -0700 Subject: [PATCH 09/15] migrate LazyStream/NullStreamSink --- lib/src/lazy_stream.dart | 10 +++++----- lib/src/null_stream_sink.dart | 8 ++++---- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/lib/src/lazy_stream.dart b/lib/src/lazy_stream.dart index f5e65d1..a253ba8 100644 --- a/lib/src/lazy_stream.dart +++ b/lib/src/lazy_stream.dart @@ -15,7 +15,7 @@ import 'utils.dart'; /// produce a `Stream`. class LazyStream extends Stream { /// The callback that's called to create the inner stream. - FutureOrCallback> _callback; + FutureOrCallback>? _callback; /// Creates a single-subscription `Stream` that calls [callback] when it gets /// a listener and forwards to the returned stream. @@ -25,15 +25,15 @@ class LazyStream extends Stream { } @override - StreamSubscription listen(void Function(T) onData, - {Function onError, void Function() onDone, bool cancelOnError}) { - if (_callback == null) { + StreamSubscription listen(void Function(T)? onData, + {Function? onError, void Function()? onDone, bool? cancelOnError}) { + var callback = _callback; + if (callback == null) { throw StateError('Stream has already been listened to.'); } // Null out the callback before we invoke it to ensure that even while // running it, this can't be called twice. - var callback = _callback; _callback = null; var result = callback(); diff --git a/lib/src/null_stream_sink.dart b/lib/src/null_stream_sink.dart index b28df2a..34b100b 100644 --- a/lib/src/null_stream_sink.dart +++ b/lib/src/null_stream_sink.dart @@ -43,12 +43,12 @@ class NullStreamSink implements StreamSink { /// /// If [done] is passed, it's used as the [Sink.done] future. Otherwise, a /// completed future is used. - NullStreamSink({Future done}) : done = done ?? Future.value(); + NullStreamSink({Future? done}) : done = done ?? Future.value(); /// Creates a null sink whose [done] future emits [error]. /// /// Note that this error will not be considered uncaught. - NullStreamSink.error(error, [StackTrace stackTrace]) + NullStreamSink.error(Object error, [StackTrace? stackTrace]) : done = Future.error(error, stackTrace) // Don't top-level the error. This gives the user a change to call // [close] or [done], and matches the behavior of a remote endpoint @@ -61,7 +61,7 @@ class NullStreamSink implements StreamSink { } @override - void addError(error, [StackTrace stackTrace]) { + void addError(Object error, [StackTrace? stackTrace]) { _checkEventAllowed(); } @@ -70,7 +70,7 @@ class NullStreamSink implements StreamSink { _checkEventAllowed(); _addingStream = true; - var future = stream.listen(null).cancel() ?? Future.value(); + var future = stream.listen(null).cancel(); return future.whenComplete(() { _addingStream = false; }); From 96a911cd6bd523914b5abb02175141d885ab3e2c Mon Sep 17 00:00:00 2001 From: Jacob MacDonald Date: Tue, 17 Mar 2020 10:15:13 -0700 Subject: [PATCH 10/15] migrate StreamCompleter, StreamGroup, StreamQueue, and SingleSubscriptionTransformer --- lib/src/single_subscription_transformer.dart | 2 +- lib/src/stream_completer.dart | 31 ++++++++-------- lib/src/stream_group.dart | 19 +++++----- lib/src/stream_queue.dart | 37 ++++++++++---------- 4 files changed, 44 insertions(+), 45 deletions(-) diff --git a/lib/src/single_subscription_transformer.dart b/lib/src/single_subscription_transformer.dart index fe939fc..e9bdb32 100644 --- a/lib/src/single_subscription_transformer.dart +++ b/lib/src/single_subscription_transformer.dart @@ -18,7 +18,7 @@ class SingleSubscriptionTransformer extends StreamTransformerBase { @override Stream bind(Stream stream) { - StreamSubscription subscription; + late StreamSubscription subscription; var controller = StreamController(sync: true, onCancel: () => subscription.cancel()); subscription = stream.listen((value) { diff --git a/lib/src/stream_completer.dart b/lib/src/stream_completer.dart index 5128d57..5b06250 100644 --- a/lib/src/stream_completer.dart +++ b/lib/src/stream_completer.dart @@ -97,7 +97,7 @@ class StreamCompleter { /// /// Any one of [setSourceStream], [setEmpty], and [setError] may be called at /// most once. Trying to call any of them again will fail. - void setError(error, [StackTrace stackTrace]) { + void setError(Object error, [StackTrace? stackTrace]) { setSourceStream(Stream.fromFuture(Future.error(error, stackTrace))); } } @@ -108,22 +108,23 @@ class _CompleterStream extends Stream { /// /// Created if the user listens on this stream before the source stream /// is set, or if using [_setEmpty] so there is no source stream. - StreamController _controller; + StreamController? _controller; /// Source stream for the events provided by this stream. /// /// Set when the completer sets the source stream using [_setSourceStream] /// or [_setEmpty]. - Stream _sourceStream; + Stream? _sourceStream; @override - StreamSubscription listen(void Function(T) onData, - {Function onError, void Function() onDone, bool cancelOnError}) { + StreamSubscription listen(void Function(T)? onData, + {Function? onError, void Function()? onDone, bool? cancelOnError}) { if (_controller == null) { - if (_sourceStream != null && !_sourceStream.isBroadcast) { + var sourceStream = _sourceStream; + if (sourceStream != null && !sourceStream.isBroadcast) { // If the source stream is itself single subscription, // just listen to it directly instead of creating a controller. - return _sourceStream.listen(onData, + return sourceStream.listen(onData, onError: onError, onDone: onDone, cancelOnError: cancelOnError); } _createController(); @@ -131,7 +132,7 @@ class _CompleterStream extends Stream { _linkStreamToController(); } } - return _controller.stream.listen(onData, + return _controller!.stream.listen(onData, onError: onError, onDone: onDone, cancelOnError: cancelOnError); } @@ -157,11 +158,10 @@ class _CompleterStream extends Stream { /// Links source stream to controller when both are available. void _linkStreamToController() { - assert(_controller != null); - assert(_sourceStream != null); - _controller - .addStream(_sourceStream, cancelOnError: false) - .whenComplete(_controller.close); + var controller = _controller!; + controller + .addStream(_sourceStream!, cancelOnError: false) + .whenComplete(controller.close); } /// Sets an empty source stream. @@ -173,8 +173,9 @@ class _CompleterStream extends Stream { if (_controller == null) { _createController(); } - _sourceStream = _controller.stream; // Mark stream as set. - _controller.close(); + var controller = _controller!; + _sourceStream = controller.stream; // Mark stream as set. + controller.close(); } // Creates the [_controller]. diff --git a/lib/src/stream_group.dart b/lib/src/stream_group.dart index e9a4afc..8c17ec2 100644 --- a/lib/src/stream_group.dart +++ b/lib/src/stream_group.dart @@ -29,7 +29,7 @@ import 'dart:async'; class StreamGroup implements Sink> { /// The stream through which all events from streams in the group are emitted. Stream get stream => _controller.stream; - StreamController _controller; + late StreamController _controller; /// Whether the group is closed, meaning that no more streams may be added. var _closed = false; @@ -47,7 +47,7 @@ class StreamGroup implements Sink> { /// subscriptions will be canceled and set to null again. Single-subscriber /// stream subscriptions will be left intact, since they can't be /// re-subscribed. - final _subscriptions = , StreamSubscription>{}; + final _subscriptions = , StreamSubscription?>{}; /// Merges the events from [streams] into a single single-subscription stream. /// @@ -100,7 +100,7 @@ class StreamGroup implements Sink> { /// /// Throws a [StateError] if this group is closed. @override - Future add(Stream stream) { + Future? add(Stream stream) { if (_closed) { throw StateError("Can't add a Stream to a closed StreamGroup."); } @@ -130,7 +130,7 @@ class StreamGroup implements Sink> { /// /// If [stream]'s subscription is canceled, this returns /// [StreamSubscription.cancel]'s return value. Otherwise, it returns `null`. - Future remove(Stream stream) { + Future? remove(Stream stream) { var subscription = _subscriptions.remove(stream); var future = subscription == null ? null : subscription.cancel(); if (_closed && _subscriptions.isEmpty) _controller.close(); @@ -155,7 +155,7 @@ class StreamGroup implements Sink> { void _onPause() { _state = _StreamGroupState.paused; for (var subscription in _subscriptions.values) { - subscription.pause(); + subscription!.pause(); } } @@ -163,19 +163,18 @@ class StreamGroup implements Sink> { void _onResume() { _state = _StreamGroupState.listening; for (var subscription in _subscriptions.values) { - subscription.resume(); + subscription!.resume(); } } /// A callback called when [stream] is canceled. /// /// This is only called for single-subscription groups. - Future _onCancel() { + Future? _onCancel() { _state = _StreamGroupState.canceled; var futures = _subscriptions.values - .map((subscription) => subscription.cancel()) - .where((future) => future != null) + .map((subscription) => subscription!.cancel()) .toList(); _subscriptions.clear(); @@ -194,7 +193,7 @@ class StreamGroup implements Sink> { // will still be added to [_controller], but then they'll be dropped since // it has no listeners. if (!stream.isBroadcast) return; - subscription.cancel(); + subscription!.cancel(); _subscriptions[stream] = null; }); } diff --git a/lib/src/stream_queue.dart b/lib/src/stream_queue.dart index 97d74c7..676e9cf 100644 --- a/lib/src/stream_queue.dart +++ b/lib/src/stream_queue.dart @@ -87,7 +87,7 @@ class StreamQueue { /// /// Set to subscription when listening, and set to `null` when the /// subscription is done (and [_isDone] is set to true). - StreamSubscription _subscription; + StreamSubscription? _subscription; /// Whether the event source is done. bool _isDone = false; @@ -393,7 +393,7 @@ class StreamQueue { /// After calling `cancel`, no further events can be requested. /// None of [lookAhead], [next], [peek], [rest], [skip], [take] or [cancel] /// may be called again. - Future cancel({bool immediate = false}) { + Future? cancel({bool immediate = false}) { if (_isClosed) throw _failClosed(); _isClosed = true; @@ -448,11 +448,10 @@ class StreamQueue { } _isDone = true; - if (_subscription == null) { + var subscription = _subscription; + if (subscription == null) { return _source; } - - var subscription = _subscription; _subscription = null; var wasPaused = subscription.isPaused; @@ -469,7 +468,7 @@ class StreamQueue { /// /// The event source is restarted by the next call to [_ensureListening]. void _pause() { - _subscription.pause(); + _subscription!.pause(); } /// Ensures that we are listening on events from the event source. @@ -482,22 +481,22 @@ class StreamQueue { if (_subscription == null) { _subscription = _source.listen((data) { _addResult(Result.value(data)); - }, onError: (error, StackTrace stackTrace) { + }, onError: (Object error, StackTrace stackTrace) { _addResult(Result.error(error, stackTrace)); }, onDone: () { _subscription = null; _close(); }); } else { - _subscription.resume(); + _subscription!.resume(); } } /// Cancels the underlying event source. - Future _cancel() { + Future? _cancel() { if (_isDone) return null; _subscription ??= _source.listen(null); - var future = _subscription.cancel(); + var future = _subscription!.cancel(); _close(); return future; } @@ -765,7 +764,8 @@ class _SkipRequest implements _EventRequest { var event = events.removeFirst(); if (event.isError) { - _completer.completeError(event.asError.error, event.asError.stackTrace); + _completer.completeError( + event.asError!.error, event.asError!.stackTrace); return true; } } @@ -808,10 +808,10 @@ class _TakeRequest extends _ListRequest { var event = events.removeFirst(); if (event.isError) { - event.asError.complete(_completer); + event.asError!.complete(_completer); return true; } - _list.add(event.asValue.value); + _list.add(event.asValue!.value); } _completer.complete(_list); return true; @@ -831,10 +831,10 @@ class _LookAheadRequest extends _ListRequest { } var event = events.elementAt(_list.length); if (event.isError) { - event.asError.complete(_completer); + event.asError!.complete(_completer); return true; } - _list.add(event.asValue.value); + _list.add(event.asValue!.value); } _completer.complete(_list); return true; @@ -948,8 +948,7 @@ class _HasNextRequest implements _EventRequest { /// [StreamQueue._updateRequests]. class _TransactionRequest implements _EventRequest { /// The transaction created by this request. - StreamQueueTransaction get transaction => _transaction; - StreamQueueTransaction _transaction; + late final StreamQueueTransaction transaction; /// The controller that passes events to [transaction]. final _controller = StreamController(sync: true); @@ -958,7 +957,7 @@ class _TransactionRequest implements _EventRequest { var _eventsSent = 0; _TransactionRequest(StreamQueue parent) { - _transaction = StreamQueueTransaction._(parent, _controller.stream); + transaction = StreamQueueTransaction._(parent, _controller.stream); } @override @@ -967,6 +966,6 @@ class _TransactionRequest implements _EventRequest { events[_eventsSent++].addTo(_controller); } if (isDone && !_controller.isClosed) _controller.close(); - return transaction._committed || _transaction._rejected; + return transaction._committed || transaction._rejected; } } From 51341f28ba6d7ea548937e81c842b5dae3595b7c Mon Sep 17 00:00:00 2001 From: Jacob MacDonald Date: Tue, 17 Mar 2020 10:34:03 -0700 Subject: [PATCH 11/15] migrate the rest of the package --- lib/src/stream_sink_completer.dart | 36 +++++++++--------- lib/src/stream_splitter.dart | 16 ++++---- lib/src/stream_subscription_transformer.dart | 30 +++++++-------- lib/src/stream_zip.dart | 14 +++---- lib/src/subscription_stream.dart | 40 ++++++++------------ 5 files changed, 64 insertions(+), 72 deletions(-) diff --git a/lib/src/stream_sink_completer.dart b/lib/src/stream_sink_completer.dart index ebfd717..7cdcd20 100644 --- a/lib/src/stream_sink_completer.dart +++ b/lib/src/stream_sink_completer.dart @@ -70,7 +70,7 @@ class StreamSinkCompleter { /// /// Either of [setDestinationSink] or [setError] may be called at most once. /// Trying to call either of them again will fail. - void setError(error, [StackTrace stackTrace]) { + void setError(Object error, [StackTrace? stackTrace]) { setDestinationSink(NullStreamSink.error(error, stackTrace)); } } @@ -81,18 +81,18 @@ class _CompleterSink implements StreamSink { /// /// Created if the user adds events to this sink before the destination sink /// is set. - StreamController _controller; + StreamController? _controller; /// Completer for [done]. /// /// Created if the user requests the [done] future before the destination sink /// is set. - Completer _doneCompleter; + Completer? _doneCompleter; /// Destination sink for the events added to this sink. /// /// Set when [StreamSinkCompleter.setDestinationSink] is called. - StreamSink _destinationSink; + StreamSink? _destinationSink; /// Whether events should be sent directly to [_destinationSink], as opposed /// to going through [_controller]. @@ -100,49 +100,49 @@ class _CompleterSink implements StreamSink { @override Future get done { - if (_doneCompleter != null) return _doneCompleter.future; + if (_doneCompleter != null) return _doneCompleter!.future; if (_destinationSink == null) { _doneCompleter = Completer.sync(); - return _doneCompleter.future; + return _doneCompleter!.future; } - return _destinationSink.done; + return _destinationSink!.done; } @override void add(T event) { if (_canSendDirectly) { - _destinationSink.add(event); + _destinationSink!.add(event); } else { _ensureController(); - _controller.add(event); + _controller!.add(event); } } @override - void addError(error, [StackTrace stackTrace]) { + void addError(error, [StackTrace? stackTrace]) { if (_canSendDirectly) { - _destinationSink.addError(error, stackTrace); + _destinationSink!.addError(error, stackTrace); } else { _ensureController(); - _controller.addError(error, stackTrace); + _controller!.addError(error, stackTrace); } } @override Future addStream(Stream stream) { - if (_canSendDirectly) return _destinationSink.addStream(stream); + if (_canSendDirectly) return _destinationSink!.addStream(stream); _ensureController(); - return _controller.addStream(stream, cancelOnError: false); + return _controller!.addStream(stream, cancelOnError: false); } @override Future close() { if (_canSendDirectly) { - _destinationSink.close(); + _destinationSink!.close(); } else { _ensureController(); - _controller.close(); + _controller!.close(); } return done; } @@ -168,7 +168,7 @@ class _CompleterSink implements StreamSink { // Catch any error that may come from [addStream] or [sink.close]. They'll // be reported through [done] anyway. sink - .addStream(_controller.stream) + .addStream(_controller!.stream) .whenComplete(sink.close) .catchError((_) {}); } @@ -176,7 +176,7 @@ class _CompleterSink implements StreamSink { // If the user has already asked when the sink is done, connect the sink's // done callback to that completer. if (_doneCompleter != null) { - _doneCompleter.complete(sink.done); + _doneCompleter!.complete(sink.done); } } } diff --git a/lib/src/stream_splitter.dart b/lib/src/stream_splitter.dart index e9f326e..f7377d6 100644 --- a/lib/src/stream_splitter.dart +++ b/lib/src/stream_splitter.dart @@ -28,7 +28,7 @@ class StreamSplitter { /// The subscription to [_stream]. /// /// This will be `null` until a branch has a listener. - StreamSubscription _subscription; + StreamSubscription? _subscription; /// The buffer of events or errors that have already been emitted by /// [_stream]. @@ -57,7 +57,7 @@ class StreamSplitter { /// /// [count] defaults to 2. This is the same as creating [count] branches and /// then closing the [StreamSplitter]. - static List> splitFrom(Stream stream, [int count]) { + static List> splitFrom(Stream stream, [int? count]) { count ??= 2; var splitter = StreamSplitter(stream); var streams = List>.generate(count, (_) => splitter.split()); @@ -125,8 +125,8 @@ class StreamSplitter { assert(_controllers.isEmpty); assert(_isClosed); - Future future; - if (_subscription != null) future = _subscription.cancel(); + Future? future; + if (_subscription != null) future = _subscription!.cancel(); if (future != null) _closeGroup.add(future); _closeGroup.close(); } @@ -142,7 +142,7 @@ class StreamSplitter { // Resume the subscription in case it was paused, either because all the // controllers were paused or because the last one was canceled. If it // wasn't paused, this will be a no-op. - _subscription.resume(); + _subscription!.resume(); } else { _subscription = _stream.listen(_onData, onError: _onError, onDone: _onDone); @@ -152,14 +152,14 @@ class StreamSplitter { /// Pauses [_subscription] if every controller is paused. void _onPause() { if (!_controllers.every((controller) => controller.isPaused)) return; - _subscription.pause(); + _subscription!.pause(); } /// Resumes [_subscription]. /// /// If [_subscription] wasn't paused, this is a no-op. void _onResume() { - _subscription.resume(); + _subscription!.resume(); } /// Removes [controller] from [_controllers] and cancels or pauses @@ -175,7 +175,7 @@ class StreamSplitter { if (_isClosed) { _cancelSubscription(); } else { - _subscription.pause(); + _subscription!.pause(); } } diff --git a/lib/src/stream_subscription_transformer.dart b/lib/src/stream_subscription_transformer.dart index 0e28972..d03ea70 100644 --- a/lib/src/stream_subscription_transformer.dart +++ b/lib/src/stream_subscription_transformer.dart @@ -28,9 +28,9 @@ typedef _VoidHandler = void Function(StreamSubscription inner); /// [StreamSubscription]: [handleCancel] must call `cancel()`, [handlePause] /// must call `pause()`, and [handleResume] must call `resume()`. StreamTransformer subscriptionTransformer( - {Future Function(StreamSubscription) handleCancel, - void Function(StreamSubscription) handlePause, - void Function(StreamSubscription) handleResume}) { + {Future Function(StreamSubscription)? handleCancel, + void Function(StreamSubscription)? handlePause, + void Function(StreamSubscription)? handleResume}) { return StreamTransformer((stream, cancelOnError) { return _TransformedSubscription( stream.listen(null, cancelOnError: cancelOnError), @@ -50,7 +50,7 @@ StreamTransformer subscriptionTransformer( /// methods. class _TransformedSubscription implements StreamSubscription { /// The wrapped subscription. - StreamSubscription _inner; + StreamSubscription? _inner; /// The callback to run when [cancel] is called. final _AsyncHandler _handleCancel; @@ -68,47 +68,47 @@ class _TransformedSubscription implements StreamSubscription { this._inner, this._handleCancel, this._handlePause, this._handleResume); @override - void onData(void Function(T) handleData) { + void onData(void Function(T)? handleData) { _inner?.onData(handleData); } @override - void onError(Function handleError) { + void onError(Function? handleError) { _inner?.onError(handleError); } @override - void onDone(void Function() handleDone) { + void onDone(void Function()? handleDone) { _inner?.onDone(handleDone); } @override Future cancel() => _cancelMemoizer.runOnce(() { - var inner = _inner; - _inner.onData(null); - _inner.onDone(null); + var inner = _inner!; + inner.onData(null); + inner.onDone(null); // Setting onError to null will cause errors to be top-leveled. - _inner.onError((_, __) {}); + inner.onError((_, __) {}); _inner = null; return _handleCancel(inner); }); final _cancelMemoizer = AsyncMemoizer(); @override - void pause([Future resumeFuture]) { + void pause([Future? resumeFuture]) { if (_cancelMemoizer.hasRun) return; if (resumeFuture != null) resumeFuture.whenComplete(resume); - _handlePause(_inner); + _handlePause(_inner!); } @override void resume() { if (_cancelMemoizer.hasRun) return; - _handleResume(_inner); + _handleResume(_inner!); } @override - Future asFuture([E futureValue]) => + Future asFuture([E? futureValue]) => _inner?.asFuture(futureValue) ?? Completer().future; } diff --git a/lib/src/stream_zip.dart b/lib/src/stream_zip.dart index e319746..7ee5bde 100644 --- a/lib/src/stream_zip.dart +++ b/lib/src/stream_zip.dart @@ -18,12 +18,12 @@ class StreamZip extends Stream> { StreamZip(Iterable> streams) : _streams = streams; @override - StreamSubscription> listen(void Function(List) onData, - {Function onError, void Function() onDone, bool cancelOnError}) { + StreamSubscription> listen(void Function(List)? onData, + {Function? onError, void Function()? onDone, bool? cancelOnError}) { cancelOnError = identical(true, cancelOnError); var subscriptions = >[]; - StreamController> controller; - List current; + late StreamController> controller; + late List current; var dataCount = 0; /// Called for each data from a subscription in [subscriptions]. @@ -32,7 +32,7 @@ class StreamZip extends Stream> { dataCount++; if (dataCount == subscriptions.length) { var data = current; - current = List(subscriptions.length); + current = []; dataCount = 0; for (var i = 0; i < subscriptions.length; i++) { if (i != index) subscriptions[i].resume(); @@ -74,7 +74,7 @@ class StreamZip extends Stream> { subscriptions.add(stream.listen((data) { handleData(index, data); }, - onError: cancelOnError ? handleError : handleErrorCancel, + onError: cancelOnError! ? handleError : handleErrorCancel, onDone: handleDone, cancelOnError: cancelOnError)); } @@ -85,7 +85,7 @@ class StreamZip extends Stream> { rethrow; } - current = List(subscriptions.length); + current = []; controller = StreamController>(onPause: () { for (var i = 0; i < subscriptions.length; i++) { diff --git a/lib/src/subscription_stream.dart b/lib/src/subscription_stream.dart index b428619..84e7d06 100644 --- a/lib/src/subscription_stream.dart +++ b/lib/src/subscription_stream.dart @@ -18,7 +18,7 @@ import 'delegate/stream_subscription.dart'; /// If other code is accessing the subscription, results may be unpredictable. class SubscriptionStream extends Stream { /// The subscription providing the events for this stream. - StreamSubscription _source; + StreamSubscription? _source; /// Create a single-subscription `Stream` from [subscription]. /// @@ -31,24 +31,25 @@ class SubscriptionStream extends Stream { /// an error. SubscriptionStream(StreamSubscription subscription) : _source = subscription { - _source.pause(); + var source = _source!; + source.pause(); // Clear callbacks to avoid keeping them alive unnecessarily. - _source.onData(null); - _source.onError(null); - _source.onDone(null); + source.onData(null); + source.onError(null); + source.onDone(null); } @override - StreamSubscription listen(void Function(T) onData, - {Function onError, void Function() onDone, bool cancelOnError}) { - if (_source == null) { + StreamSubscription listen(void Function(T)? onData, + {Function? onError, void Function()? onDone, bool? cancelOnError}) { + var subscription = _source; + if (subscription == null) { throw StateError('Stream has already been listened to.'); } cancelOnError = (true == cancelOnError); - var subscription = _source; _source = null; - var result = cancelOnError + var result = cancelOnError! ? _CancelOnErrorSubscriptionWrapper(subscription) : subscription; result.onData(onData); @@ -71,26 +72,17 @@ class _CancelOnErrorSubscriptionWrapper : super(subscription); @override - void onError(Function handleError) { + void onError(Function? handleError) { // Cancel when receiving an error. super.onError((error, StackTrace stackTrace) { - var cancelFuture = super.cancel(); - if (cancelFuture != null) { - // Wait for the cancel to complete before sending the error event. - cancelFuture.whenComplete(() { - if (handleError is ZoneBinaryCallback) { - handleError(error, stackTrace); - } else { - handleError(error); - } - }); - } else { + // Wait for the cancel to complete before sending the error event. + super.cancel().whenComplete(() { if (handleError is ZoneBinaryCallback) { handleError(error, stackTrace); - } else { + } else if (handleError != null) { handleError(error); } - } + }); }); } } From d191c4e3433ee08c6d4c414114ce8d0ec3c077d3 Mon Sep 17 00:00:00 2001 From: Jacob MacDonald Date: Tue, 17 Mar 2020 10:43:33 -0700 Subject: [PATCH 12/15] update the error handler to the future api from the SDK, pass an empty stack trace --- .../stream_sink_transformer/handler_transformer.dart | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/lib/src/stream_sink_transformer/handler_transformer.dart b/lib/src/stream_sink_transformer/handler_transformer.dart index 73a4717..aa1f770 100644 --- a/lib/src/stream_sink_transformer/handler_transformer.dart +++ b/lib/src/stream_sink_transformer/handler_transformer.dart @@ -15,7 +15,7 @@ typedef HandleData = void Function(S data, EventSink sink); /// TODO: Update to take a non-nullable StackTrace once that change lands in /// the sdk. typedef HandleError = void Function( - Object error, StackTrace? stackTrace, EventSink sink); + Object error, StackTrace stackTrace, EventSink sink); /// The type of the callback for handling done events. typedef HandleDone = void Function(EventSink sink); @@ -74,7 +74,8 @@ class _HandlerSink implements StreamSink { } else { /// TODO: Update to pass AsyncError.defaultStackTrace(error) once that /// lands in the sdk. - handleError(error, stackTrace, _safeCloseInner); + handleError( + error, stackTrace ?? StackTrace.fromString(''), _safeCloseInner); } } @@ -83,7 +84,12 @@ class _HandlerSink implements StreamSink { return _inner.addStream(stream.transform( StreamTransformer.fromHandlers( handleData: _transformer._handleData, - handleError: _transformer._handleError, + // TODO: remove extra wrapping once the sdk changes to make the + // stack trace arg non-nullable. + handleError: _transformer._handleError == null + ? null + : (error, stack, sink) => _transformer._handleError!( + error, stack ?? StackTrace.fromString(''), sink), handleDone: _closeSink))); } From 7786fd3c9368e79d9f57424dda500094584e6cca Mon Sep 17 00:00:00 2001 From: Jacob MacDonald Date: Tue, 17 Mar 2020 11:01:19 -0700 Subject: [PATCH 13/15] point at merged collection branch --- pubspec.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pubspec.yaml b/pubspec.yaml index 7172800..b0b6ddf 100644 --- a/pubspec.yaml +++ b/pubspec.yaml @@ -28,7 +28,7 @@ dependency_overrides: collection: git: url: git://github.com/dart-lang/collection.git - ref: null-safety-migration + ref: null_safety matcher: git: url: git://github.com/dart-lang/matcher.git From 2495ddbf72cbadfcd93750f33aacc0967db5a2f7 Mon Sep 17 00:00:00 2001 From: Jacob MacDonald Date: Tue, 17 Mar 2020 11:35:51 -0700 Subject: [PATCH 14/15] migrate tests --- test/async_cache_test.dart | 14 ++++-- test/async_memoizer_test.dart | 2 +- test/cancelable_operation_test.dart | 22 ++++----- test/future_group_test.dart | 2 +- test/lazy_stream_test.dart | 6 +-- test/result/result_captureAll_test.dart | 2 +- test/result/result_flattenAll_test.dart | 4 +- test/result/result_future_test.dart | 8 ++-- test/result/result_test.dart | 28 +++++------ test/stream_completer_test.dart | 20 ++++---- test/stream_group_test.dart | 10 ++-- test/stream_queue_test.dart | 18 +++---- test/stream_sink_completer_test.dart | 48 +++++++++---------- test/stream_sink_transformer_test.dart | 6 +-- test/stream_splitter_test.dart | 4 +- test/stream_zip_test.dart | 2 +- test/stream_zip_zone_test.dart | 2 +- test/subscription_stream_test.dart | 9 ++-- test/subscription_transformer_test.dart | 7 +-- .../stream_subscription_test.dart | 14 +++--- test/utils.dart | 8 ++-- 21 files changed, 120 insertions(+), 116 deletions(-) diff --git a/test/async_cache_test.dart b/test/async_cache_test.dart index 4efbb69..81e43e1 100644 --- a/test/async_cache_test.dart +++ b/test/async_cache_test.dart @@ -9,7 +9,7 @@ import 'package:fake_async/fake_async.dart'; import 'package:test/test.dart'; void main() { - AsyncCache cache; + late AsyncCache cache; setUp(() { // Create a cache that is fresh for an hour. @@ -22,7 +22,8 @@ void main() { test('should not fetch via callback when a cache exists', () async { await cache.fetch(() async => 'Expensive'); - expect(await cache.fetch(expectAsync0(() => null, count: 0)), 'Expensive'); + expect(await cache.fetch(expectAsync0(() async => 'fake', count: 0)), + 'Expensive'); }); test('should not fetch via callback when a future is in-flight', () async { @@ -31,7 +32,7 @@ void main() { var completer = Completer(); expect(cache.fetch(() => completer.future), completion('Expensive')); - expect(cache.fetch(expectAsync0(() => null, count: 0)), + expect(cache.fetch(expectAsync0(() async => 'fake', count: 0)), completion('Expensive')); completer.complete('Expensive'); }); @@ -79,7 +80,10 @@ void main() { yield '2'; yield '3'; }).toList(); - expect(await cache.fetchStream(expectAsync0(() => null, count: 0)).toList(), + expect( + await cache + .fetchStream(expectAsync0(() => Stream.empty(), count: 0)) + .toList(), ['1', '2', '3']); }); @@ -148,7 +152,7 @@ void main() { test('should pause a cached stream without affecting others', () async { Stream call() => Stream.fromIterable(['1', '2', '3']); - StreamSubscription sub; + late StreamSubscription sub; sub = cache.fetchStream(call).listen(expectAsync1((event) { if (event == '1') sub.pause(); })); diff --git a/test/async_memoizer_test.dart b/test/async_memoizer_test.dart index 982f7c9..490b389 100644 --- a/test/async_memoizer_test.dart +++ b/test/async_memoizer_test.dart @@ -6,7 +6,7 @@ import 'package:async/async.dart'; import 'package:test/test.dart'; void main() { - AsyncMemoizer cache; + late AsyncMemoizer cache; setUp(() => cache = AsyncMemoizer()); test('runs the function only the first time runOnce() is called', () async { diff --git a/test/cancelable_operation_test.dart b/test/cancelable_operation_test.dart index b38c0b9..c87e43a 100644 --- a/test/cancelable_operation_test.dart +++ b/test/cancelable_operation_test.dart @@ -11,7 +11,7 @@ import 'utils.dart'; void main() { group('without being canceled', () { - CancelableCompleter completer; + late CancelableCompleter completer; setUp(() { completer = CancelableCompleter(onCancel: expectAsync0(() {}, count: 0)); }); @@ -116,7 +116,7 @@ void main() { test('fires onCancel', () { var canceled = false; - CancelableCompleter completer; + late CancelableCompleter completer; completer = CancelableCompleter(onCancel: expectAsync0(() { expect(completer.isCanceled, isTrue); canceled = true; @@ -243,23 +243,23 @@ void main() { }); group('then', () { - FutureOr Function(int) onValue; - FutureOr Function(Object, StackTrace) onError; - FutureOr Function() onCancel; - bool propagateCancel; - CancelableCompleter originalCompleter; + FutureOr Function(int)? onValue; + FutureOr Function(Object, StackTrace)? onError; + FutureOr Function()? onCancel; + late bool propagateCancel; + late CancelableCompleter originalCompleter; setUp(() { // Initialize all functions to ones that expect to not be called. - onValue = expectAsync1((_) => null, count: 0, id: 'onValue'); - onError = expectAsync2((e, s) => null, count: 0, id: 'onError'); - onCancel = expectAsync0(() => null, count: 0, id: 'onCancel'); + onValue = expectAsync1((_) => 'Fake', count: 0, id: 'onValue'); + onError = expectAsync2((e, s) => 'Fake', count: 0, id: 'onError'); + onCancel = expectAsync0(() => 'Fake', count: 0, id: 'onCancel'); propagateCancel = false; }); CancelableOperation runThen() { originalCompleter = CancelableCompleter(); - return originalCompleter.operation.then(onValue, + return originalCompleter.operation.then(onValue!, onError: onError, onCancel: onCancel, propagateCancel: propagateCancel); diff --git a/test/future_group_test.dart b/test/future_group_test.dart index f99d06d..22e90f8 100644 --- a/test/future_group_test.dart +++ b/test/future_group_test.dart @@ -10,7 +10,7 @@ import 'package:test/test.dart'; import 'utils.dart'; void main() { - FutureGroup futureGroup; + late FutureGroup futureGroup; setUp(() { futureGroup = FutureGroup(); }); diff --git a/test/lazy_stream_test.dart b/test/lazy_stream_test.dart index 2affe5b..32c6b14 100644 --- a/test/lazy_stream_test.dart +++ b/test/lazy_stream_test.dart @@ -10,10 +10,6 @@ import 'package:test/test.dart'; import 'utils.dart'; void main() { - test('disallows a null callback', () { - expect(() => LazyStream(null), throwsArgumentError); - }); - test('calls the callback when the stream is listened', () async { var callbackCalled = false; var stream = LazyStream(expectAsync0(() { @@ -96,7 +92,7 @@ void main() { }); test("a lazy stream can't be listened to from within its callback", () { - LazyStream stream; + late LazyStream stream; stream = LazyStream(expectAsync0(() { expect(() => stream.listen(null), throwsStateError); return Stream.empty(); diff --git a/test/result/result_captureAll_test.dart b/test/result/result_captureAll_test.dart index 8e79872..b992e1f 100644 --- a/test/result/result_captureAll_test.dart +++ b/test/result/result_captureAll_test.dart @@ -14,7 +14,7 @@ Result err(n) => ErrorResult('$n', someStack); /// Helper function creating an iterable of futures. Iterable> futures(int count, - {bool Function(int index) throwWhen}) sync* { + {bool Function(int index)? throwWhen}) sync* { for (var i = 0; i < count; i++) { if (throwWhen != null && throwWhen(i)) { yield Future.error('$i', someStack); diff --git a/test/result/result_flattenAll_test.dart b/test/result/result_flattenAll_test.dart index c0a8603..b87fec4 100644 --- a/test/result/result_flattenAll_test.dart +++ b/test/result/result_flattenAll_test.dart @@ -11,7 +11,7 @@ Result err(n) => ErrorResult('$n', someStack); /// Helper function creating an iterable of results. Iterable> results(int count, - {bool Function(int index) throwWhen}) sync* { + {bool Function(int index)? throwWhen}) sync* { for (var i = 0; i < count; i++) { if (throwWhen != null && throwWhen(i)) { yield err(i); @@ -27,7 +27,7 @@ void main() { expect(result, expectation); } else { expect(result.isValue, true); - expect(result.asValue.value, expectation.asValue.value); + expect(result.asValue!.value, expectation.asValue!.value); } } diff --git a/test/result/result_future_test.dart b/test/result/result_future_test.dart index 4218db2..7171005 100644 --- a/test/result/result_future_test.dart +++ b/test/result/result_future_test.dart @@ -9,8 +9,8 @@ import 'package:stack_trace/stack_trace.dart'; import 'package:test/test.dart'; void main() { - Completer completer; - ResultFuture future; + late Completer completer; + late ResultFuture future; setUp(() { completer = Completer(); future = ResultFuture(completer.future); @@ -25,7 +25,7 @@ void main() { // The completer calls its listeners asynchronously. We have to wait // before we can access the result. - expect(future.then((_) => future.result.asValue.value), + expect(future.then((_) => future.result!.asValue!.value), completion(equals(12))); }); @@ -36,7 +36,7 @@ void main() { // The completer calls its listeners asynchronously. We have to wait // before we can access the result. return future.catchError((_) {}).then((_) { - var error = future.result.asError; + var error = future.result!.asError!; expect(error.error, equals('error')); expect(error.stackTrace, equals(trace)); }); diff --git a/test/result/result_test.dart b/test/result/result_test.dart index adbc445..008cb3d 100644 --- a/test/result/result_test.dart +++ b/test/result/result_test.dart @@ -16,7 +16,7 @@ void main() { var result = Result.value(42); expect(result.isValue, isTrue); expect(result.isError, isFalse); - ValueResult value = result.asValue; + ValueResult value = result.asValue!; expect(value.value, equals(42)); }); @@ -24,7 +24,7 @@ void main() { Result result = ValueResult(42); expect(result.isValue, isTrue); expect(result.isError, isFalse); - var value = result.asValue; + var value = result.asValue!; expect(value.value, equals(42)); }); @@ -32,7 +32,7 @@ void main() { var result = Result.error('BAD', stack); expect(result.isValue, isFalse); expect(result.isError, isTrue); - var error = result.asError; + var error = result.asError!; expect(error.error, equals('BAD')); expect(error.stackTrace, same(stack)); }); @@ -50,7 +50,7 @@ void main() { var result = Result.error('BAD'); expect(result.isValue, isFalse); expect(result.isError, isTrue); - var error = result.asError; + var error = result.asError!; expect(error.error, equals('BAD')); expect(error.stackTrace, isNull); }); @@ -119,7 +119,7 @@ void main() { Result.capture(value).then(expectAsync1((Result result) { expect(result.isValue, isTrue); expect(result.isError, isFalse); - var value = result.asValue; + var value = result.asValue!; expect(value.value, equals(42)); }), onError: (e, s) { fail('Unexpected error: $e'); @@ -131,7 +131,7 @@ void main() { Result.capture(value).then(expectAsync1((Result result) { expect(result.isValue, isFalse); expect(result.isError, isTrue); - var error = result.asError; + var error = result.asError!; expect(error.error, equals('BAD')); expect(error.stackTrace, same(stack)); }), onError: (e, s) { @@ -203,15 +203,15 @@ void main() { expect(expectedList.isEmpty, isFalse); Result expected = expectedList.removeFirst(); expect(expected.isValue, isTrue); - expect(v, equals(expected.asValue.value)); + expect(v, equals(expected.asValue!.value)); } void errorListener(error, StackTrace stackTrace) { expect(expectedList.isEmpty, isFalse); Result expected = expectedList.removeFirst(); expect(expected.isError, isTrue); - expect(error, equals(expected.asError.error)); - expect(stackTrace, same(expected.asError.stackTrace)); + expect(error, equals(expected.asError!.error)); + expect(stackTrace, same(expected.asError!.stackTrace)); } stream.listen(expectAsync1(dataListener, count: 2), @@ -311,10 +311,10 @@ void expectResult(Result actual, Result expected) { expect(actual.isValue, equals(expected.isValue)); expect(actual.isError, equals(expected.isError)); if (actual.isValue) { - expect(actual.asValue.value, equals(expected.asValue.value)); + expect(actual.asValue!.value, equals(expected.asValue!.value)); } else { - expect(actual.asError.error, equals(expected.asError.error)); - expect(actual.asError.stackTrace, same(expected.asError.stackTrace)); + expect(actual.asError!.error, equals(expected.asError!.error)); + expect(actual.asError!.stackTrace, same(expected.asError!.stackTrace)); } } @@ -334,8 +334,8 @@ class TestSink implements EventSink { } @override - void addError(error, [StackTrace stack]) { - onError(error, stack); + void addError(error, [StackTrace? stack]) { + onError(error, stack ?? StackTrace.fromString('')); } @override diff --git a/test/stream_completer_test.dart b/test/stream_completer_test.dart index 0cd210a..ecb5621 100644 --- a/test/stream_completer_test.dart +++ b/test/stream_completer_test.dart @@ -77,7 +77,7 @@ void main() { var completer = StreamCompleter(); var lastEvent = -1; var controller = StreamController(); - StreamSubscription subscription; + late StreamSubscription subscription; subscription = completer.stream.listen((value) { expect(value, lessThan(3)); lastEvent = value; @@ -125,7 +125,7 @@ void main() { test("source stream isn't listened to until completer stream is", () async { var completer = StreamCompleter(); - StreamController controller; + late StreamController controller; controller = StreamController(onListen: () { scheduleMicrotask(controller.close); }); @@ -139,13 +139,13 @@ void main() { }); test('cancelOnError true when listening before linking stream', () async { - var completer = StreamCompleter(); + var completer = StreamCompleter(); Object lastEvent = -1; - var controller = StreamController(); + var controller = StreamController(); completer.stream.listen((value) { expect(value, lessThan(3)); lastEvent = value; - }, onError: (value) { + }, onError: (Object value) { expect(value, '3'); lastEvent = value; }, onDone: unreachable('done'), cancelOnError: true); @@ -172,9 +172,9 @@ void main() { }); test('cancelOnError true when listening after linking stream', () async { - var completer = StreamCompleter(); + var completer = StreamCompleter(); Object lastEvent = -1; - var controller = StreamController(); + var controller = StreamController(); completer.setSourceStream(controller.stream); controller.add(1); expect(controller.hasListener, isFalse); @@ -182,7 +182,7 @@ void main() { completer.stream.listen((value) { expect(value, lessThan(3)); lastEvent = value; - }, onError: (value) { + }, onError: (Object value) { expect(value, '3'); lastEvent = value; }, onDone: unreachable('done'), cancelOnError: true); @@ -265,8 +265,8 @@ void main() { }); test('setting onData etc. before and after setting stream', () async { - var completer = StreamCompleter(); - var controller = StreamController(); + var completer = StreamCompleter(); + var controller = StreamController(); var subscription = completer.stream.listen(null); Object lastEvent = 0; subscription.onData((value) => lastEvent = value); diff --git a/test/stream_group_test.dart b/test/stream_group_test.dart index 690fbc1..eadae19 100644 --- a/test/stream_group_test.dart +++ b/test/stream_group_test.dart @@ -9,7 +9,7 @@ import 'package:test/test.dart'; void main() { group('single-subscription', () { - StreamGroup streamGroup; + late StreamGroup streamGroup; setUp(() { streamGroup = StreamGroup(); }); @@ -236,7 +236,7 @@ void main() { StreamController(onCancel: () => completer.future); var fired = false; - streamGroup.add(controller.stream).then((_) => fired = true); + streamGroup.add(controller.stream)!.then((_) => fired = true); await flushMicrotasks(); expect(fired, isFalse); @@ -249,7 +249,7 @@ void main() { }); group('broadcast', () { - StreamGroup streamGroup; + late StreamGroup streamGroup; setUp(() { streamGroup = StreamGroup.broadcast(); }); @@ -455,7 +455,7 @@ void main() { } void regardlessOfType(StreamGroup Function() newStreamGroup) { - StreamGroup streamGroup; + late StreamGroup streamGroup; setUp(() { streamGroup = newStreamGroup(); }); @@ -608,7 +608,7 @@ void regardlessOfType(StreamGroup Function() newStreamGroup) { await flushMicrotasks(); var fired = false; - streamGroup.remove(controller.stream).then((_) => fired = true); + streamGroup.remove(controller.stream)!.then((_) => fired = true); await flushMicrotasks(); expect(fired, isFalse); diff --git a/test/stream_queue_test.dart b/test/stream_queue_test.dart index fa286a8..74c91e6 100644 --- a/test/stream_queue_test.dart +++ b/test/stream_queue_test.dart @@ -250,7 +250,7 @@ void main() { var skip4 = events.skip(1); var index = 0; // Check that futures complete in order. - Func1Required sequence(expectedValue, sequenceIndex) => (value) { + Func1Required sequence(expectedValue, sequenceIndex) => (value) { expect(value, expectedValue); expect(index, sequenceIndex); index++; @@ -610,7 +610,7 @@ void main() { var controller = StreamController(); var events = StreamQueue(controller.stream); - bool hasNext; + bool? hasNext; events.hasNext.then((result) { hasNext = result; }); @@ -626,7 +626,7 @@ void main() { var controller = StreamController(); var events = StreamQueue(controller.stream); - bool hasNext; + bool? hasNext; events.hasNext.then((result) { hasNext = result; }); @@ -772,10 +772,10 @@ void main() { }); group('startTransaction operation produces a transaction that', () { - StreamQueue events; - StreamQueueTransaction transaction; - StreamQueue queue1; - StreamQueue queue2; + late StreamQueue events; + late StreamQueueTransaction transaction; + late StreamQueue queue1; + late StreamQueue queue2; setUp(() async { events = StreamQueue(createStream()); expect(await events.next, 1); @@ -1004,7 +1004,7 @@ void main() { }); group('withTransaction operation', () { - StreamQueue events; + late StreamQueue events; setUp(() async { events = StreamQueue(createStream()); expect(await events.next, 1); @@ -1058,7 +1058,7 @@ void main() { }); group('cancelable operation', () { - StreamQueue events; + late StreamQueue events; setUp(() async { events = StreamQueue(createStream()); expect(await events.next, 1); diff --git a/test/stream_sink_completer_test.dart b/test/stream_sink_completer_test.dart index 69d1d8a..591f32f 100644 --- a/test/stream_sink_completer_test.dart +++ b/test/stream_sink_completer_test.dart @@ -10,7 +10,7 @@ import 'package:test/test.dart'; import 'utils.dart'; void main() { - StreamSinkCompleter completer; + late StreamSinkCompleter completer; setUp(() { completer = StreamSinkCompleter(); }); @@ -21,10 +21,10 @@ void main() { completer.setDestinationSink(sink); completer.sink..add(1)..add(2)..add(3)..add(4); - expect(sink.results[0].asValue.value, equals(1)); - expect(sink.results[1].asValue.value, equals(2)); - expect(sink.results[2].asValue.value, equals(3)); - expect(sink.results[3].asValue.value, equals(4)); + expect(sink.results[0].asValue!.value, equals(1)); + expect(sink.results[1].asValue!.value, equals(2)); + expect(sink.results[2].asValue!.value, equals(3)); + expect(sink.results[3].asValue!.value, equals(4)); }); test('error events are forwarded', () { @@ -32,8 +32,8 @@ void main() { completer.setDestinationSink(sink); completer.sink..addError('oh no')..addError("that's bad"); - expect(sink.results[0].asError.error, equals('oh no')); - expect(sink.results[1].asError.error, equals("that's bad")); + expect(sink.results[0].asError!.error, equals('oh no')); + expect(sink.results[1].asError!.error, equals("that's bad")); }); test('addStream is forwarded', () async { @@ -49,10 +49,10 @@ void main() { controller.addError("that's bad"); await flushMicrotasks(); - expect(sink.results[0].asValue.value, equals(1)); - expect(sink.results[1].asError.error, equals('oh no')); - expect(sink.results[2].asValue.value, equals(2)); - expect(sink.results[3].asError.error, equals("that's bad")); + expect(sink.results[0].asValue!.value, equals(1)); + expect(sink.results[1].asError!.error, equals('oh no')); + expect(sink.results[2].asValue!.value, equals(2)); + expect(sink.results[3].asError!.error, equals("that's bad")); expect(sink.isClosed, isFalse); controller.close(); @@ -121,10 +121,10 @@ void main() { completer.setDestinationSink(sink); await flushMicrotasks(); - expect(sink.results[0].asValue.value, equals(1)); - expect(sink.results[1].asValue.value, equals(2)); - expect(sink.results[2].asValue.value, equals(3)); - expect(sink.results[3].asValue.value, equals(4)); + expect(sink.results[0].asValue!.value, equals(1)); + expect(sink.results[1].asValue!.value, equals(2)); + expect(sink.results[2].asValue!.value, equals(3)); + expect(sink.results[3].asValue!.value, equals(4)); }); test('error events are forwarded', () async { @@ -135,8 +135,8 @@ void main() { completer.setDestinationSink(sink); await flushMicrotasks(); - expect(sink.results[0].asError.error, equals('oh no')); - expect(sink.results[1].asError.error, equals("that's bad")); + expect(sink.results[0].asError!.error, equals('oh no')); + expect(sink.results[1].asError!.error, equals("that's bad")); }); test('addStream is forwarded', () async { @@ -154,10 +154,10 @@ void main() { completer.setDestinationSink(sink); await flushMicrotasks(); - expect(sink.results[0].asValue.value, equals(1)); - expect(sink.results[1].asError.error, equals('oh no')); - expect(sink.results[2].asValue.value, equals(2)); - expect(sink.results[3].asError.error, equals("that's bad")); + expect(sink.results[0].asValue!.value, equals(1)); + expect(sink.results[1].asError!.error, equals('oh no')); + expect(sink.results[2].asValue!.value, equals(2)); + expect(sink.results[3].asError!.error, equals("that's bad")); expect(sink.isClosed, isFalse); }); @@ -258,9 +258,9 @@ void main() { futureCompleter.complete(testSink); await testSink.done; - expect(testSink.results[0].asValue.value, equals(1)); - expect(testSink.results[1].asValue.value, equals(2)); - expect(testSink.results[2].asValue.value, equals(3)); + expect(testSink.results[0].asValue!.value, equals(1)); + expect(testSink.results[1].asValue!.value, equals(2)); + expect(testSink.results[2].asValue!.value, equals(3)); }); test('with an error', () async { diff --git a/test/stream_sink_transformer_test.dart b/test/stream_sink_transformer_test.dart index 8493971..e5f6baa 100644 --- a/test/stream_sink_transformer_test.dart +++ b/test/stream_sink_transformer_test.dart @@ -10,7 +10,7 @@ import 'package:test/test.dart'; import 'utils.dart'; void main() { - StreamController controller; + late StreamController controller; setUp(() { controller = StreamController(); }); @@ -18,7 +18,7 @@ void main() { group('fromStreamTransformer', () { test('transforms data events', () { var transformer = StreamSinkTransformer.fromStreamTransformer( - StreamTransformer.fromHandlers(handleData: (i, sink) { + StreamTransformer.fromHandlers(handleData: (int i, sink) { sink.add(i * 2); })); var sink = transformer.bind(controller.sink); @@ -117,7 +117,7 @@ void main() { group('fromHandlers', () { test('transforms data events', () { var transformer = - StreamSinkTransformer.fromHandlers(handleData: (i, sink) { + StreamSinkTransformer.fromHandlers(handleData: (int i, sink) { sink.add(i * 2); }); var sink = transformer.bind(controller.sink); diff --git a/test/stream_splitter_test.dart b/test/stream_splitter_test.dart index 3748c5c..3493193 100644 --- a/test/stream_splitter_test.dart +++ b/test/stream_splitter_test.dart @@ -8,8 +8,8 @@ import 'package:async/async.dart'; import 'package:test/test.dart'; void main() { - StreamController controller; - StreamSplitter splitter; + late StreamController controller; + late StreamSplitter splitter; setUp(() { controller = StreamController(); splitter = StreamSplitter(controller.stream); diff --git a/test/stream_zip_test.dart b/test/stream_zip_test.dart index 19c3dec..d374608 100644 --- a/test/stream_zip_test.dart +++ b/test/stream_zip_test.dart @@ -306,7 +306,7 @@ void main() { var s2 = Stream.fromIterable([1, 3, 5, 7]); var sz = StreamZip([s1, s2]); var ctr = 0; - StreamSubscription sub; + late StreamSubscription sub; sub = sz.listen(expectAsync1((v) { expect(v, equals([ctr * 2, ctr * 2 + 1])); if (ctr == 1) { diff --git a/test/stream_zip_zone_test.dart b/test/stream_zip_zone_test.dart index 68cd723..50af6b6 100644 --- a/test/stream_zip_zone_test.dart +++ b/test/stream_zip_zone_test.dart @@ -33,7 +33,7 @@ void testStream(String name, StreamController controller, Stream stream) { var outer = Zone.current; runZoned(() { var newZone1 = Zone.current; - StreamSubscription sub; + late StreamSubscription sub; sub = stream.listen(expectAsync1((v) { expect(v, 42); expect(Zone.current, newZone1); diff --git a/test/subscription_stream_test.dart b/test/subscription_stream_test.dart index 98a809f..8be0572 100644 --- a/test/subscription_stream_test.dart +++ b/test/subscription_stream_test.dart @@ -22,7 +22,7 @@ void main() { var stream = createStream(); var skips = 0; var completer = Completer(); - StreamSubscription subscription; + late StreamSubscription subscription; subscription = stream.listen((value) { ++skips; expect(value, skips); @@ -72,8 +72,9 @@ void main() { group('cancelOnError source:', () { for (var sourceCancels in [false, true]) { group('${sourceCancels ? "yes" : "no"}:', () { - SubscriptionStream subscriptionStream; - Future onCancel; // Completes if source stream is canceled before done. + late SubscriptionStream subscriptionStream; + late Future + onCancel; // Completes if source stream is canceled before done. setUp(() { var cancelCompleter = Completer(); var source = createErrorStream(cancelCompleter); @@ -159,7 +160,7 @@ Stream createStream() async* { yield 4; } -Stream createErrorStream([Completer onCancel]) async* { +Stream createErrorStream([Completer? onCancel]) async* { var canceled = true; try { yield 1; diff --git a/test/subscription_transformer_test.dart b/test/subscription_transformer_test.dart index f0e3a70..8278ea0 100644 --- a/test/subscription_transformer_test.dart +++ b/test/subscription_transformer_test.dart @@ -90,7 +90,7 @@ void main() { subscriptionTransformer(handleCancel: expectAsync1((inner) { callbackInvoked = true; inner.cancel(); - return null; + return Future.value(); }))).listen(expectAsync1((_) {}, count: 0)); await flushMicrotasks(); @@ -248,11 +248,12 @@ void main() { }); group('when the outer subscription is canceled but the inner is not', () { - StreamSubscription subscription; + late StreamSubscription subscription; setUp(() { var controller = StreamController(); subscription = controller.stream - .transform(subscriptionTransformer(handleCancel: (_) => null)) + .transform( + subscriptionTransformer(handleCancel: (_) => Future.value())) .listen(expectAsync1((_) {}, count: 0), onError: expectAsync2((_, __) {}, count: 0), onDone: expectAsync0(() {}, count: 0)); diff --git a/test/typed_wrapper/stream_subscription_test.dart b/test/typed_wrapper/stream_subscription_test.dart index 2a11546..33d28a8 100644 --- a/test/typed_wrapper/stream_subscription_test.dart +++ b/test/typed_wrapper/stream_subscription_test.dart @@ -11,10 +11,11 @@ import '../utils.dart'; void main() { group('with valid types, forwards', () { - StreamController controller; - StreamSubscription wrapper; - bool isCanceled; + late StreamController controller; + late StreamSubscription wrapper; + late bool isCanceled; setUp(() { + isCanceled = false; controller = StreamController(onCancel: () { isCanceled = true; }); @@ -67,10 +68,11 @@ void main() { }); group('with invalid types,', () { - StreamController controller; - StreamSubscription wrapper; - bool isCanceled; + late StreamController controller; + late StreamSubscription wrapper; + late bool isCanceled; setUp(() { + isCanceled = false; controller = StreamController(onCancel: () { isCanceled = true; }); diff --git a/test/utils.dart b/test/utils.dart index a99fe9b..0085ce2 100644 --- a/test/utils.dart +++ b/test/utils.dart @@ -25,7 +25,7 @@ OptionalArgAction unreachable(String name) => Matcher throwsZoned(matcher) => predicate((void Function() callback) { var firstError = true; runZoned(callback, - onError: expectAsync2((error, StackTrace stackTrace) { + onError: expectAsync2((Object error, StackTrace stackTrace) { if (firstError) { expect(error, matcher); firstError = false; @@ -67,7 +67,7 @@ class CompleterStreamSink implements StreamSink { @override void add(T event) {} @override - void addError(error, [StackTrace stackTrace]) {} + void addError(error, [StackTrace? stackTrace]) {} @override Future addStream(Stream stream) async {} @override @@ -95,7 +95,7 @@ class TestSink implements StreamSink { /// /// If [onDone] is passed, it's called when the user calls [close]. Its result /// is piped to the [done] future. - TestSink({void Function() onDone}) : _onDone = onDone ?? (() {}); + TestSink({void Function()? onDone}) : _onDone = onDone ?? (() {}); @override void add(T event) { @@ -103,7 +103,7 @@ class TestSink implements StreamSink { } @override - void addError(error, [StackTrace stackTrace]) { + void addError(error, [StackTrace? stackTrace]) { results.add(Result.error(error, stackTrace)); } From 1295bbd2d67698db1276ea2dda382d614bba86c0 Mon Sep 17 00:00:00 2001 From: Jacob MacDonald Date: Tue, 17 Mar 2020 12:17:28 -0700 Subject: [PATCH 15/15] code review updates --- lib/src/result/error.dart | 6 ++++-- lib/src/result/result.dart | 8 ++++---- lib/src/stream_completer.dart | 12 ++++-------- lib/src/stream_sink_completer.dart | 16 ++++++---------- .../handler_transformer.dart | 10 +++++----- 5 files changed, 23 insertions(+), 29 deletions(-) diff --git a/lib/src/result/error.dart b/lib/src/result/error.dart index cc26537..27ce155 100644 --- a/lib/src/result/error.dart +++ b/lib/src/result/error.dart @@ -13,7 +13,7 @@ class ErrorResult implements Result { final Object error; /// The stack trace corresponding to where [error] was thrown. - final StackTrace? stackTrace; + final StackTrace stackTrace; @override bool get isValue => false; @@ -24,7 +24,9 @@ class ErrorResult implements Result { @override ErrorResult get asError => this; - ErrorResult(this.error, this.stackTrace); + ErrorResult(this.error, [StackTrace? stackTrace]) + // TODO: Use AsyncError.defaultStackTrace(error) once available + : stackTrace = stackTrace ?? StackTrace.fromString(''); @override void complete(Completer completer) { diff --git a/lib/src/result/result.dart b/lib/src/result/result.dart index c296614..4a43dbd 100644 --- a/lib/src/result/result.dart +++ b/lib/src/result/result.dart @@ -63,8 +63,8 @@ abstract class Result { factory Result(T Function() computation) { try { return ValueResult(computation()); - } catch (e, s) { - return ErrorResult(e as Object, s); + } on Object catch (e, s) { + return ErrorResult(e, s); } } @@ -108,7 +108,7 @@ abstract class Result { Result.capture(element).then((result) { results[i] = result; if (--pending == 0) { - completer.complete(results.cast>().toList()); + completer.complete(List.from(results)); } }); } else { @@ -116,7 +116,7 @@ abstract class Result { } } if (pending == 0) { - return Future.value(results.cast>().toList()); + return Future.value(List.from(results)); } completer = Completer>>(); return completer.future; diff --git a/lib/src/stream_completer.dart b/lib/src/stream_completer.dart index 5b06250..27034c2 100644 --- a/lib/src/stream_completer.dart +++ b/lib/src/stream_completer.dart @@ -127,7 +127,7 @@ class _CompleterStream extends Stream { return sourceStream.listen(onData, onError: onError, onDone: onDone, cancelOnError: cancelOnError); } - _createController(); + _ensureController(); if (_sourceStream != null) { _linkStreamToController(); } @@ -170,17 +170,13 @@ class _CompleterStream extends Stream { /// immediately. void _setEmpty() { assert(_sourceStream == null); - if (_controller == null) { - _createController(); - } - var controller = _controller!; + var controller = _ensureController(); _sourceStream = controller.stream; // Mark stream as set. controller.close(); } // Creates the [_controller]. - void _createController() { - assert(_controller == null); - _controller = StreamController(sync: true); + StreamController _ensureController() { + return _controller ??= StreamController(sync: true); } } diff --git a/lib/src/stream_sink_completer.dart b/lib/src/stream_sink_completer.dart index 7cdcd20..10e549d 100644 --- a/lib/src/stream_sink_completer.dart +++ b/lib/src/stream_sink_completer.dart @@ -113,8 +113,7 @@ class _CompleterSink implements StreamSink { if (_canSendDirectly) { _destinationSink!.add(event); } else { - _ensureController(); - _controller!.add(event); + _ensureController().add(event); } } @@ -123,8 +122,7 @@ class _CompleterSink implements StreamSink { if (_canSendDirectly) { _destinationSink!.addError(error, stackTrace); } else { - _ensureController(); - _controller!.addError(error, stackTrace); + _ensureController().addError(error, stackTrace); } } @@ -132,8 +130,7 @@ class _CompleterSink implements StreamSink { Future addStream(Stream stream) { if (_canSendDirectly) return _destinationSink!.addStream(stream); - _ensureController(); - return _controller!.addStream(stream, cancelOnError: false); + return _ensureController().addStream(stream, cancelOnError: false); } @override @@ -141,15 +138,14 @@ class _CompleterSink implements StreamSink { if (_canSendDirectly) { _destinationSink!.close(); } else { - _ensureController(); - _controller!.close(); + _ensureController().close(); } return done; } /// Create [_controller] if it doesn't yet exist. - void _ensureController() { - _controller ??= StreamController(sync: true); + StreamController _ensureController() { + return _controller ??= StreamController(sync: true); } /// Sets the destination sink to which events from this sink will be provided. diff --git a/lib/src/stream_sink_transformer/handler_transformer.dart b/lib/src/stream_sink_transformer/handler_transformer.dart index aa1f770..f652954 100644 --- a/lib/src/stream_sink_transformer/handler_transformer.dart +++ b/lib/src/stream_sink_transformer/handler_transformer.dart @@ -11,9 +11,9 @@ import '../delegate/stream_sink.dart'; typedef HandleData = void Function(S data, EventSink sink); /// The type of the callback for handling error events. -/// -/// TODO: Update to take a non-nullable StackTrace once that change lands in -/// the sdk. +// +// TODO: Update to take a non-nullable StackTrace once that change lands in +// the sdk. typedef HandleError = void Function( Object error, StackTrace stackTrace, EventSink sink); @@ -72,8 +72,8 @@ class _HandlerSink implements StreamSink { if (handleError == null) { _inner.addError(error, stackTrace); } else { - /// TODO: Update to pass AsyncError.defaultStackTrace(error) once that - /// lands in the sdk. + // TODO: Update to pass AsyncError.defaultStackTrace(error) once that + // lands in the sdk. handleError( error, stackTrace ?? StackTrace.fromString(''), _safeCloseInner); }