diff --git a/CHANGELOG.md b/CHANGELOG.md index e32ec99..9acccf9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,7 @@ +## 2.9.0 + +* Add `StreamExtensions.firstOrNull`. + ## 2.8.2 * Deprecate `EventSinkBase`, `StreamSinkBase`, `IOSinkBase`. diff --git a/lib/src/stream_extensions.dart b/lib/src/stream_extensions.dart index 3801a02..129ce26 100644 --- a/lib/src/stream_extensions.dart +++ b/lib/src/stream_extensions.dart @@ -33,4 +33,29 @@ extension StreamExtensions on Stream { sink.close(); })); } + + /// A future which completes with the first event of this stream, or with + /// `null`. + /// + /// This stream is listened to, and if it emits any event, whether a data + /// event or an error event, the future completes with the same data value or + /// error. If the stream ends without emitting any events, the future is + /// completed with `null`. + Future get firstOrNull { + var completer = Completer.sync(); + final subscription = listen(null); + subscription + ..onData((event) { + subscription.cancel(); + completer.complete(event); + }) + ..onError((Object error, StackTrace stackTrace) { + subscription.cancel(); + completer.completeError(error, stackTrace); + }) + ..onDone(() { + completer.complete(null); + }); + return completer.future; + } } diff --git a/pubspec.yaml b/pubspec.yaml index c636de6..ec3613a 100644 --- a/pubspec.yaml +++ b/pubspec.yaml @@ -1,5 +1,5 @@ name: async -version: 2.8.3 +version: 2.9.0-dev description: Utility functions and classes related to the 'dart:async' library. repository: https://github.com/dart-lang/async diff --git a/test/stream_extensions_test.dart b/test/stream_extensions_test.dart index 85a3cee..2118ae7 100644 --- a/test/stream_extensions_test.dart +++ b/test/stream_extensions_test.dart @@ -55,4 +55,41 @@ void main() { expect(() => Stream.fromIterable([1]).slices(0), throwsRangeError); }); }); + + group('.firstOrNull', () { + test('returns the first data event', () { + expect( + Stream.fromIterable([1, 2, 3, 4]).firstOrNull, completion(equals(1))); + }); + + test('returns the first error event', () { + expect(Stream.error('oh no').firstOrNull, throwsA('oh no')); + }); + + test('returns null for an empty stream', () { + expect(Stream.empty().firstOrNull, completion(isNull)); + }); + + test('cancels the subscription after an event', () async { + var isCancelled = false; + var controller = StreamController(onCancel: () { + isCancelled = true; + }); + controller.add(1); + + await expectLater(controller.stream.firstOrNull, completion(equals(1))); + expect(isCancelled, isTrue); + }); + + test('cancels the subscription after an error', () async { + var isCancelled = false; + var controller = StreamController(onCancel: () { + isCancelled = true; + }); + controller.addError('oh no'); + + await expectLater(controller.stream.firstOrNull, throwsA('oh no')); + expect(isCancelled, isTrue); + }); + }); }