Skip to content
This repository has been archived by the owner on Oct 17, 2024. It is now read-only.

Merge null_safety branch into master #125

Merged
merged 16 commits into from
Jul 8, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 23 additions & 8 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,18 +1,33 @@
language: dart

dart:
- dev
- 2.2.0
- dev

dart_task:
- test: --platform vm
- test: --platform chrome
- dartanalyzer
- dartfmt
jobs:
include:
- stage: analyze_and_format
name: "Analyzer"
dart: be/raw/latest
os: linux
script: dartanalyzer --enable-experiment=non-nullable --fatal-warnings --fatal-infos .
- stage: analyze_and_format
name: "Format"
dart: be/raw/latest
os: linux
script: dartfmt -n --set-exit-if-changed .
- stage: test
name: "Vm Tests"
dart: be/raw/latest
os: linux
script: pub run --enable-experiment=non-nullable test -p vm

stages:
- analyze_and_format
- test

# Only building master means that we don't run two builds for each pull request.
branches:
only: [master]
only: [master, null_safety]

cache:
directories:
Expand Down
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
## 2.5.0-nullsafety

* Migrate this package to null safety.

## 2.4.2

* `StreamQueue` starts listening immediately to broadcast strings.
Expand Down
2 changes: 2 additions & 0 deletions analysis_options.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ analyzer:
todo: ignore
# Lint provided by pkg:pedantic – should fix this!
unawaited_futures: ignore
enable-experiment:
- non-nullable

linter:
rules:
Expand Down
12 changes: 6 additions & 6 deletions lib/src/async_cache.dart
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,13 @@ class AsyncCache<T> {
final Duration _duration;

/// Cached results of a previous [fetchStream] call.
StreamSplitter<T> _cachedStreamSplitter;
StreamSplitter<T>? _cachedStreamSplitter;

/// Cached results of a previous [fetch] call.
Future<T> _cachedValueFuture;
Future<T>? _cachedValueFuture;

/// Fires when the cache should be considered stale.
Timer _stale;
Timer? _stale;

/// Creates a cache that invalidates its contents after [duration] has passed.
///
Expand Down Expand Up @@ -65,7 +65,7 @@ class AsyncCache<T> {
await _cachedValueFuture;
_startStaleTimer();
}
return _cachedValueFuture;
return _cachedValueFuture!;
}

/// Returns a cached stream from a previous call to [fetchStream], or runs
Expand All @@ -78,12 +78,12 @@ class AsyncCache<T> {
if (_cachedValueFuture != null) {
throw StateError('Previously used to cache via `fetch`');
}
_cachedStreamSplitter ??= StreamSplitter(
var splitter = _cachedStreamSplitter ??= StreamSplitter(
callback().transform(StreamTransformer.fromHandlers(handleDone: (sink) {
_startStaleTimer();
sink.close();
})));
return _cachedStreamSplitter.split();
return splitter.split();
}

/// Removes any cached value.
Expand Down
37 changes: 19 additions & 18 deletions lib/src/cancelable_operation.dart
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ class CancelableOperation<T> {
/// moment this [CancelableOperation] is created, regardless of whether
/// [inner] has completed yet or not.
factory CancelableOperation.fromFuture(Future<T> inner,
{FutureOr Function() onCancel}) {
{FutureOr Function()? onCancel}) {
var completer = CancelableCompleter<T>(onCancel: onCancel);
completer.complete(inner);
return completer.operation;
Expand All @@ -55,7 +55,7 @@ class CancelableOperation<T> {
value.then((value) {
controller.add(value);
controller.close();
}, onError: (error, StackTrace stackTrace) {
}, onError: (Object error, StackTrace stackTrace) {
controller.addError(error, stackTrace);
controller.close();
});
Expand All @@ -68,8 +68,8 @@ class CancelableOperation<T> {
/// If this operation completes, this completes to the same result as [value].
/// If this operation is cancelled, the returned future waits for the future
/// returned by [cancel], then completes to [cancellationValue].
Future<T> valueOrCancellation([T cancellationValue]) {
var completer = Completer<T>.sync();
Future<T?> valueOrCancellation([T? cancellationValue]) {
var completer = Completer<T?>.sync();
value.then((result) => completer.complete(result),
onError: completer.completeError);

Expand All @@ -93,23 +93,24 @@ class CancelableOperation<T> {
/// If [propagateCancel] is `true` and the returned operation is canceled then
/// this operation is canceled. The default is `false`.
CancelableOperation<R> then<R>(FutureOr<R> Function(T) onValue,
{FutureOr<R> Function(Object, StackTrace) onError,
FutureOr<R> Function() onCancel,
{FutureOr<R> Function(Object, StackTrace)? onError,
FutureOr<R> Function()? onCancel,
bool propagateCancel = false}) {
final completer =
CancelableCompleter<R>(onCancel: propagateCancel ? cancel : null);

valueOrCancellation().then((T result) {
valueOrCancellation().then((T? result) {
if (!completer.isCanceled) {
if (isCompleted) {
completer.complete(Future.sync(() => onValue(result)));
assert(result is T);
completer.complete(Future.sync(() => onValue(result!)));
} else if (onCancel != null) {
completer.complete(Future.sync(onCancel));
} else {
completer._cancel();
}
}
}, onError: (error, StackTrace stackTrace) {
}, onError: (Object error, StackTrace stackTrace) {
if (!completer.isCanceled) {
if (onError != null) {
completer.complete(Future.sync(() => onError(error, stackTrace)));
Expand Down Expand Up @@ -145,7 +146,7 @@ class CancelableCompleter<T> {
final Completer<T> _inner;

/// The callback to call if the future is canceled.
final FutureOrCallback _onCancel;
final FutureOrCallback? _onCancel;

/// Creates a new completer for a [CancelableOperation].
///
Expand All @@ -155,15 +156,14 @@ class CancelableCompleter<T> {
///
/// [onCancel] will be called synchronously when the operation is canceled.
/// It's guaranteed to only be called once.
CancelableCompleter({FutureOr Function() onCancel})
CancelableCompleter({FutureOr Function()? onCancel})
: _onCancel = onCancel,
_inner = Completer<T>() {
_operation = CancelableOperation<T>._(this);
operation = CancelableOperation<T>._(this);
}

/// The operation controlled by this completer.
CancelableOperation<T> get operation => _operation;
CancelableOperation<T> _operation;
late final CancelableOperation<T> operation;

/// Whether the completer has completed.
bool get isCompleted => _isCompleted;
Expand All @@ -180,7 +180,7 @@ class CancelableCompleter<T> {
///
/// If [value] is a [Future], this will complete to the result of that
/// [Future] once it completes.
void complete([FutureOr<T> value]) {
void complete([FutureOr<T>? value]) {
if (_isCompleted) throw StateError('Operation already completed');
_isCompleted = true;

Expand All @@ -200,14 +200,14 @@ class CancelableCompleter<T> {
future.then((result) {
if (_isCanceled) return;
_inner.complete(result);
}, onError: (error, StackTrace stackTrace) {
}, onError: (Object error, StackTrace stackTrace) {
if (_isCanceled) return;
_inner.completeError(error, stackTrace);
});
}

/// Completes [operation] to [error].
void completeError(Object error, [StackTrace stackTrace]) {
void completeError(Object error, [StackTrace? stackTrace]) {
if (_isCompleted) throw StateError('Operation already completed');
_isCompleted = true;

Expand All @@ -221,7 +221,8 @@ class CancelableCompleter<T> {

return _cancelMemo.runOnce(() {
_isCanceled = true;
if (_onCancel != null) return _onCancel();
var onCancel = _onCancel;
if (onCancel != null) return onCancel();
});
}
}
2 changes: 1 addition & 1 deletion lib/src/delegate/event_sink.dart
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ class DelegatingEventSink<T> implements EventSink<T> {
}

@override
void addError(error, [StackTrace stackTrace]) {
void addError(error, [StackTrace? stackTrace]) {
_sink.addError(error, stackTrace);
}

Expand Down
6 changes: 3 additions & 3 deletions lib/src/delegate/future.dart
Original file line number Diff line number Diff line change
Expand Up @@ -25,18 +25,18 @@ class DelegatingFuture<T> implements Future<T> {
Stream<T> asStream() => _future.asStream();

@override
Future<T> catchError(Function onError, {bool Function(Object error) test}) =>
Future<T> catchError(Function onError, {bool Function(Object error)? test}) =>
_future.catchError(onError, test: test);

@override
Future<S> then<S>(FutureOr<S> Function(T) onValue, {Function onError}) =>
Future<S> then<S>(FutureOr<S> Function(T) onValue, {Function? onError}) =>
_future.then(onValue, onError: onError);

@override
Future<T> whenComplete(FutureOr Function() action) =>
_future.whenComplete(action);

@override
Future<T> timeout(Duration timeLimit, {FutureOr<T> Function() onTimeout}) =>
Future<T> timeout(Duration timeLimit, {FutureOr<T> Function()? onTimeout}) =>
_future.timeout(timeLimit, onTimeout: onTimeout);
}
2 changes: 1 addition & 1 deletion lib/src/delegate/stream_sink.dart
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ class DelegatingStreamSink<T> implements StreamSink<T> {
}

@override
void addError(error, [StackTrace stackTrace]) {
void addError(error, [StackTrace? stackTrace]) {
_sink.addError(error, stackTrace);
}

Expand Down
10 changes: 5 additions & 5 deletions lib/src/delegate/stream_subscription.dart
Original file line number Diff line number Diff line change
Expand Up @@ -31,22 +31,22 @@ class DelegatingStreamSubscription<T> implements StreamSubscription<T> {
: TypeSafeStreamSubscription<T>(subscription);

@override
void onData(void Function(T) handleData) {
void onData(void Function(T)? handleData) {
_source.onData(handleData);
}

@override
void onError(Function handleError) {
void onError(Function? handleError) {
_source.onError(handleError);
}

@override
void onDone(void Function() handleDone) {
void onDone(void Function()? handleDone) {
_source.onDone(handleDone);
}

@override
void pause([Future resumeFuture]) {
void pause([Future? resumeFuture]) {
_source.pause(resumeFuture);
}

Expand All @@ -59,7 +59,7 @@ class DelegatingStreamSubscription<T> implements StreamSubscription<T> {
Future cancel() => _source.cancel();

@override
Future<E> asFuture<E>([E futureValue]) => _source.asFuture(futureValue);
Future<E> asFuture<E>([E? futureValue]) => _source.asFuture(futureValue);

@override
bool get isPaused => _source.isPaused;
Expand Down
15 changes: 8 additions & 7 deletions lib/src/future_group.dart
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,13 @@ class FutureGroup<T> implements Sink<Future<T>> {
Stream get onIdle =>
(_onIdleController ??= StreamController.broadcast(sync: true)).stream;

StreamController _onIdleController;
StreamController? _onIdleController;

/// The values emitted by the futures that have been added to the group, in
/// the order they were added.
///
/// The slots for futures that haven't completed yet are `null`.
final _values = <T>[];
final _values = <T?>[];

/// Wait for [task] to complete.
@override
Expand All @@ -71,12 +71,13 @@ class FutureGroup<T> implements Sink<Future<T>> {
_values[index] = value;

if (_pending != 0) return null;
if (_onIdleController != null) _onIdleController.add(null);
var onIdleController = _onIdleController;
if (onIdleController != null) onIdleController.add(null);

if (!_closed) return null;
if (_onIdleController != null) _onIdleController.close();
_completer.complete(_values);
}).catchError((error, StackTrace stackTrace) {
if (onIdleController != null) onIdleController.close();
_completer.complete(_values.whereType<T>().toList());
}).catchError((Object error, StackTrace stackTrace) {
if (_completer.isCompleted) return null;
_completer.completeError(error, stackTrace);
});
Expand All @@ -89,6 +90,6 @@ class FutureGroup<T> implements Sink<Future<T>> {
_closed = true;
if (_pending != 0) return;
if (_completer.isCompleted) return;
_completer.complete(_values);
_completer.complete(_values.whereType<T>().toList());
}
}
12 changes: 6 additions & 6 deletions lib/src/lazy_stream.dart
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import 'utils.dart';
/// produce a `Stream`.
class LazyStream<T> extends Stream<T> {
/// The callback that's called to create the inner stream.
FutureOrCallback<Stream<T>> _callback;
FutureOrCallback<Stream<T>>? _callback;

/// Creates a single-subscription `Stream` that calls [callback] when it gets
/// a listener and forwards to the returned stream.
Expand All @@ -25,23 +25,23 @@ class LazyStream<T> extends Stream<T> {
}

@override
StreamSubscription<T> listen(void Function(T) onData,
{Function onError, void Function() onDone, bool cancelOnError}) {
if (_callback == null) {
StreamSubscription<T> listen(void Function(T)? onData,
{Function? onError, void Function()? onDone, bool? cancelOnError}) {
var callback = _callback;
if (callback == null) {
throw StateError('Stream has already been listened to.');
}

// Null out the callback before we invoke it to ensure that even while
// running it, this can't be called twice.
var callback = _callback;
_callback = null;
var result = callback();

Stream<T> stream;
if (result is Future<Stream<T>>) {
stream = StreamCompleter.fromFuture(result);
} else {
stream = result as Stream<T>;
stream = result;
}

return stream.listen(onData,
Expand Down
Loading