diff --git a/CHANGELOG.md b/CHANGELOG.md index 4680638..397ec1c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,6 @@ -## 2.6.2-dev +## 2.7.0 + +* Add a `Stream.slices()` extension method. * Fix a bug where `CancelableOperation.then` may invoke the `onValue` callback, even if it had been canceled before `CancelableOperation.value` completes. diff --git a/lib/async.dart b/lib/async.dart index 2d5876a..2170442 100644 --- a/lib/async.dart +++ b/lib/async.dart @@ -29,6 +29,7 @@ export 'src/result/value.dart'; export 'src/single_subscription_transformer.dart'; export 'src/stream_closer.dart'; export 'src/stream_completer.dart'; +export 'src/stream_extensions.dart'; export 'src/stream_group.dart'; export 'src/stream_queue.dart'; export 'src/stream_sink_completer.dart'; diff --git a/lib/src/stream_extensions.dart b/lib/src/stream_extensions.dart new file mode 100644 index 0000000..8bd4b01 --- /dev/null +++ b/lib/src/stream_extensions.dart @@ -0,0 +1,24 @@ +// Copyright (c) 2021, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +import 'dart:async'; + +/// Utility extensions on [Stream]. +extension StreamExtensions on Stream { + Stream> slices(int length) { + if (length < 1) throw RangeError.range(length, 1, null, 'length'); + + var slice = []; + return transform(StreamTransformer.fromHandlers(handleData: (data, sink) { + slice.add(data); + if (slice.length == length) { + sink.add(slice); + slice = []; + } + }, handleDone: (sink) { + if (slice.isNotEmpty) sink.add(slice); + sink.close(); + })); + } +} diff --git a/pubspec.yaml b/pubspec.yaml index 4fc5b92..b6b6bca 100644 --- a/pubspec.yaml +++ b/pubspec.yaml @@ -1,5 +1,5 @@ name: async -version: 2.6.2-dev +version: 2.7.0 description: Utility functions and classes related to the 'dart:async' library. repository: https://github.com/dart-lang/async diff --git a/test/stream_extensions_test.dart b/test/stream_extensions_test.dart new file mode 100644 index 0000000..85a3cee --- /dev/null +++ b/test/stream_extensions_test.dart @@ -0,0 +1,58 @@ +// Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE filevents. + +import 'dart:async'; + +import 'package:async/async.dart'; +import 'package:test/test.dart'; + +void main() { + group('.slices', () { + test('empty', () { + expect(Stream.empty().slices(1).toList(), completion(equals([]))); + }); + + test('with the same length as the iterable', () { + expect( + Stream.fromIterable([1, 2, 3]).slices(3).toList(), + completion(equals([ + [1, 2, 3] + ]))); + }); + + test('with a longer length than the iterable', () { + expect( + Stream.fromIterable([1, 2, 3]).slices(5).toList(), + completion(equals([ + [1, 2, 3] + ]))); + }); + + test('with a shorter length than the iterable', () { + expect( + Stream.fromIterable([1, 2, 3]).slices(2).toList(), + completion(equals([ + [1, 2], + [3] + ]))); + }); + + test('with length divisible by the iterable\'s', () { + expect( + Stream.fromIterable([1, 2, 3, 4]).slices(2).toList(), + completion(equals([ + [1, 2], + [3, 4] + ]))); + }); + + test('refuses negative length', () { + expect(() => Stream.fromIterable([1]).slices(-1), throwsRangeError); + }); + + test('refuses length 0', () { + expect(() => Stream.fromIterable([1]).slices(0), throwsRangeError); + }); + }); +}