diff --git a/CHANGELOG.md b/CHANGELOG.md index c7256ac..84938d0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,7 @@ -## 2.5.1-dev +## 2.6.0 + +* Added `ChunkedStreamReader` for reading _chunked streams_ without managing + buffers. ## 2.5.0 diff --git a/lib/async.dart b/lib/async.dart index a97fc65..611d137 100644 --- a/lib/async.dart +++ b/lib/async.dart @@ -37,3 +37,4 @@ export 'src/stream_subscription_transformer.dart'; export 'src/stream_zip.dart'; export 'src/subscription_stream.dart'; export 'src/typed_stream_transformer.dart'; +export 'src/chunked_stream_reader.dart'; diff --git a/lib/src/chunked_stream_reader.dart b/lib/src/chunked_stream_reader.dart new file mode 100644 index 0000000..855a772 --- /dev/null +++ b/lib/src/chunked_stream_reader.dart @@ -0,0 +1,177 @@ +// Copyright (c) 2021, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +import 'dart:async'; +import 'dart:typed_data'; + +import 'byte_collector.dart' show collectBytes; + +/// Utility class for reading elements from a _chunked stream_. +/// +/// A _chunked stream_ is a stream where each event is a chunk of elements. +/// Byte-streams with the type `Stream<List<int>>` is a common example of this. +/// As illustrated in the example below, this utility class makes it easy to +/// read a _chunked stream_ using custom chunk sizes and sub-stream sizes, +/// without managing partially read chunks. +/// +/// ```dart +/// final r = ChunkedStreamReader(File('myfile.txt').openRead()); +/// try { +/// // Read the first 4 bytes +/// final firstBytes = await r.readChunk(4); +/// if (firstBytes.length < 4) { +/// throw Exception('myfile.txt has less than 4 bytes'); +/// } +/// +/// // Read next 8 kilobytes as a substream +/// Stream<List<int>> substream = r.readStream(8 * 1024); +/// +/// ...
+/// } finally { +/// // We always cancel the ChunkedStreamReader, this ensures the underlying +/// // stream is cancelled. +/// r.cancel(); +/// } +/// ``` +/// +/// The read-operations [readChunk] and [readStream] must not be invoked until +/// the future from a previous call has completed. +class ChunkedStreamReader<T> { + final StreamIterator<List<T>> _input; + final List<T> _emptyList = const []; + List<T> _buffer = <T>[]; + bool _reading = false; + + factory ChunkedStreamReader(Stream<List<T>> stream) => + ChunkedStreamReader._(StreamIterator(stream)); + + ChunkedStreamReader._(this._input); + + /// Read next [size] elements from _chunked stream_, buffering to create a + /// chunk with [size] elements. + /// + /// This will read _chunks_ from the underlying _chunked stream_ until [size] + /// elements have been buffered, or end-of-stream, then it returns the first + /// [size] buffered elements. + /// + /// If end-of-stream is encountered before [size] elements is read, this + /// returns a list with fewer than [size] elements (indicating end-of-stream). + /// + /// If the underlying stream throws, the stream is cancelled, the exception is + /// propagated and further read operations will fail. + /// + /// Throws, if another read operation is on-going. + Future<List<T>> readChunk(int size) async { + final result = <T>[]; + await for (final chunk in readStream(size)) { + result.addAll(chunk); + } + return result; + } + + /// Read next [size] elements from _chunked stream_ as a sub-stream. + /// + /// This will pass-through _chunks_ from the underlying _chunked stream_ until + /// [size] elements have been returned, or end-of-stream has been encountered. + /// + /// If end-of-stream is encountered before [size] elements is read, this + /// returns a list with fewer than [size] elements (indicating end-of-stream). + /// + /// If the underlying stream throws, the stream is cancelled, the exception is + /// propagated and further read operations will fail.
+ /// + /// If the sub-stream returned from [readStream] is cancelled the remaining + /// unread elements up-to [size] are drained, allowing subsequent + /// read-operations to proceed after cancellation. + /// + /// Throws, if another read-operation is on-going. + Stream<List<T>> readStream(int size) { + RangeError.checkNotNegative(size, 'size'); + if (_reading) { + throw StateError('Concurrent read operations are not allowed!'); + } + _reading = true; + + final substream = () async* { + // While we have data to read + while (size > 0) { + // Read something into the buffer, if it's empty + if (_buffer.isEmpty) { + if (!(await _input.moveNext())) { + // Don't attempt to read more data, as there is no more data. + size = 0; + _reading = false; + break; + } + _buffer = _input.current; + } + + if (_buffer.isNotEmpty) { + if (size < _buffer.length) { + final output = _buffer.sublist(0, size); + _buffer = _buffer.sublist(size); + size = 0; + yield output; + _reading = false; + break; + } + + final output = _buffer; + size -= _buffer.length; + _buffer = _emptyList; + yield output; + } + } + }; + + final c = StreamController<List<T>>(); + c.onListen = () => c.addStream(substream()).whenComplete(c.close); + c.onCancel = () async { + while (size > 0) { + if (_buffer.isEmpty) { + if (!await _input.moveNext()) { + size = 0; // no more data + break; + } + _buffer = _input.current; + } + + if (size < _buffer.length) { + _buffer = _buffer.sublist(size); + size = 0; + break; + } + + size -= _buffer.length; + _buffer = _emptyList; + } + _reading = false; + }; + + return c.stream; + } + + /// Cancel the underlying _chunked stream_. + /// + /// If a future from [readChunk] or [readStream] is still pending then + /// [cancel] behaves as if the underlying stream ended early. That is a future + /// from [readChunk] may return a partial chunk smaller than the request size. + /// + /// It is always safe to call [cancel], even if the underlying stream was read + /// to completion.
+ /// + /// It can be a good idea to call [cancel] in a `finally`-block when done + /// using the [ChunkedStreamReader], this mitigates risk of leaking resources. + Future<void> cancel() async => await _input.cancel(); +} + +/// Extensions for using [ChunkedStreamReader] with byte-streams. +extension ChunkedStreamReaderByteStreamExt on ChunkedStreamReader<int> { + /// Read bytes into a [Uint8List]. + /// + /// This does the same as [readChunk], except it uses [collectBytes] to create + /// a [Uint8List], which offers better performance. + Future<Uint8List> readBytes(int size) async => + await collectBytes(readStream(size)); +} diff --git a/pubspec.yaml b/pubspec.yaml index e24947d..5bfe9b1 100644 --- a/pubspec.yaml +++ b/pubspec.yaml @@ -1,5 +1,5 @@ name: async -version: 2.5.1-dev +version: 2.6.0 description: Utility functions and classes related to the 'dart:async' library. repository: https://github.com/dart-lang/async diff --git a/test/chunked_stream_reader.dart b/test/chunked_stream_reader.dart new file mode 100644 index 0000000..7dcd408 --- /dev/null +++ b/test/chunked_stream_reader.dart @@ -0,0 +1,380 @@ +// Copyright (c) 2021, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +import 'dart:async'; +import 'dart:typed_data'; + +import 'package:test/test.dart'; +import 'package:async/async.dart'; + +void main() { + test('readChunk() chunk by chunk', () async { + final r = ChunkedStreamReader(() async* { + yield [1, 2]; + yield [3, 4, 5]; + yield [6, 7, 8, 9]; + yield [10]; + }()); + + expect(await r.readChunk(2), equals([1, 2])); + expect(await r.readChunk(3), equals([3, 4, 5])); + expect(await r.readChunk(4), equals([6, 7, 8, 9])); + expect(await r.readChunk(1), equals([10])); + expect(await r.readChunk(1), equals([])); + expect(await r.readChunk(1), equals([])); + await r.cancel(); // check this is okay!
+ expect(await r.readChunk(1), equals([])); + }); + + test('readChunk() element by element', () async { + final r = ChunkedStreamReader(() async* { + yield [1, 2]; + yield [3, 4, 5]; + yield [6, 7, 8, 9]; + yield [10]; + }()); + + for (var i = 0; i < 10; i++) { + expect(await r.readChunk(1), equals([i + 1])); + } + expect(await r.readChunk(1), equals([])); + expect(await r.readChunk(1), equals([])); + await r.cancel(); // check this is okay! + expect(await r.readChunk(1), equals([])); + }); + + test('readChunk() exact elements', () async { + final r = ChunkedStreamReader(() async* { + yield [1, 2]; + yield [3, 4, 5]; + yield [6, 7, 8, 9]; + yield [10]; + }()); + + expect(await r.readChunk(10), equals([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])); + expect(await r.readChunk(1), equals([])); + expect(await r.readChunk(1), equals([])); + await r.cancel(); // check this is okay! + expect(await r.readChunk(1), equals([])); + }); + + test('readChunk() past end', () async { + final r = ChunkedStreamReader(() async* { + yield [1, 2]; + yield [3, 4, 5]; + yield [6, 7, 8, 9]; + yield [10]; + }()); + + expect(await r.readChunk(20), equals([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])); + expect(await r.readChunk(1), equals([])); + expect(await r.readChunk(1), equals([])); + await r.cancel(); // check this is okay! + expect(await r.readChunk(1), equals([])); + }); + + test('readChunk() chunks of 2 elements', () async { + final r = ChunkedStreamReader(() async* { + yield [1, 2]; + yield [3, 4, 5]; + yield [6, 7, 8, 9]; + yield [10]; + }()); + + expect(await r.readChunk(2), equals([1, 2])); + expect(await r.readChunk(2), equals([3, 4])); + expect(await r.readChunk(2), equals([5, 6])); + expect(await r.readChunk(2), equals([7, 8])); + expect(await r.readChunk(2), equals([9, 10])); + expect(await r.readChunk(1), equals([])); + expect(await r.readChunk(1), equals([])); + await r.cancel(); // check this is okay! 
+ expect(await r.readChunk(1), equals([])); + }); + + test('readChunk() chunks of 3 elements', () async { + final r = ChunkedStreamReader(() async* { + yield [1, 2]; + yield [3, 4, 5]; + yield [6, 7, 8, 9]; + yield [10]; + }()); + + expect(await r.readChunk(3), equals([1, 2, 3])); + expect(await r.readChunk(3), equals([4, 5, 6])); + expect(await r.readChunk(3), equals([7, 8, 9])); + expect(await r.readChunk(3), equals([10])); + expect(await r.readChunk(1), equals([])); + expect(await r.readChunk(1), equals([])); + await r.cancel(); // check this is okay! + expect(await r.readChunk(1), equals([])); + }); + + test('readChunk() cancel half way', () async { + final r = ChunkedStreamReader(() async* { + yield [1, 2]; + yield [3, 4, 5]; + yield [6, 7, 8, 9]; + yield [10]; + }()); + + expect(await r.readChunk(5), equals([1, 2, 3, 4, 5])); + await r.cancel(); // check this is okay! + expect(await r.readChunk(1), equals([])); + }); + + test('readChunk() propagates exception', () async { + final r = ChunkedStreamReader(() async* { + yield [1, 2]; + yield [3, 4, 5]; + throw Exception('stopping here'); + }()); + + expect(await r.readChunk(3), equals([1, 2, 3])); + await expectLater(r.readChunk(3), throwsException); + + expect(await r.readChunk(1), equals([])); + await r.cancel(); // check this is okay! + expect(await r.readChunk(1), equals([])); + }); + + test('readStream() forwards chunks', () async { + final chunk2 = [3, 4, 5]; + final chunk3 = [6, 7, 8, 9]; + final r = ChunkedStreamReader(() async* { + yield [1, 2]; + yield chunk2; + yield chunk3; + yield [10]; + }()); + + expect(await r.readChunk(1), equals([1])); + final i = StreamIterator(r.readStream(9)); + expect(await i.moveNext(), isTrue); + expect(i.current, equals([2])); + + // We must forward the exact chunks otherwise it's not efficient! + // Hence, we have a reference equality check here. 
+ expect(await i.moveNext(), isTrue); + expect(i.current, equals([3, 4, 5])); + expect(i.current == chunk2, isTrue); + + expect(await i.moveNext(), isTrue); + expect(i.current, equals([6, 7, 8, 9])); + expect(i.current == chunk3, isTrue); + + expect(await i.moveNext(), isTrue); + expect(i.current, equals([10])); + expect(await i.moveNext(), isFalse); + + expect(await r.readChunk(1), equals([])); + await r.cancel(); // check this is okay! + expect(await r.readChunk(1), equals([])); + }); + + test('readStream() cancel at the exact end', () async { + final r = ChunkedStreamReader(() async* { + yield [1, 2]; + yield [3, 4, 5]; + yield [6, 7, 8, 9]; + yield [10]; + }()); + + expect(await r.readChunk(1), equals([1])); + final i = StreamIterator(r.readStream(7)); + expect(await i.moveNext(), isTrue); + expect(i.current, equals([2])); + + expect(await i.moveNext(), isTrue); + expect(i.current, equals([3, 4, 5])); + + expect(await i.moveNext(), isTrue); + expect(i.current, equals([6, 7, 8])); + + await i.cancel(); // cancel substream just as it's ending + + expect(await r.readChunk(2), equals([9, 10])); + + expect(await r.readChunk(1), equals([])); + await r.cancel(); // check this is okay! + expect(await r.readChunk(1), equals([])); + }); + + test('readStream() cancel at the exact end on chunk boundary', () async { + final r = ChunkedStreamReader(() async* { + yield [1, 2]; + yield [3, 4, 5]; + yield [6, 7, 8, 9]; + yield [10]; + }()); + + expect(await r.readChunk(1), equals([1])); + final i = StreamIterator(r.readStream(8)); + expect(await i.moveNext(), isTrue); + expect(i.current, equals([2])); + + expect(await i.moveNext(), isTrue); + expect(i.current, equals([3, 4, 5])); + + expect(await i.moveNext(), isTrue); + expect(i.current, equals([6, 7, 8, 9])); + + await i.cancel(); // cancel substream just as it's ending + + expect(await r.readChunk(2), equals([10])); + + expect(await r.readChunk(1), equals([])); + await r.cancel(); // check this is okay! 
+ expect(await r.readChunk(1), equals([])); + }); + + test('readStream() is drained when canceled', () async { + final r = ChunkedStreamReader(() async* { + yield [1, 2]; + yield [3, 4, 5]; + yield [6, 7, 8, 9]; + yield [10]; + }()); + + expect(await r.readChunk(1), equals([1])); + final i = StreamIterator(r.readStream(7)); + expect(await i.moveNext(), isTrue); + expect(i.current, equals([2])); + // Cancelling here should skip the remainder of the substream + // and we continue to read 9 and 10 from r + await i.cancel(); + + expect(await r.readChunk(2), equals([9, 10])); + + expect(await r.readChunk(1), equals([])); + await r.cancel(); // check this is okay! + expect(await r.readChunk(1), equals([])); + }); + + test('readStream() concurrent reads is forbidden', () async { + final r = ChunkedStreamReader(() async* { + yield [1, 2]; + yield [3, 4, 5]; + yield [6, 7, 8, 9]; + yield [10]; + }()); + + expect(await r.readChunk(1), equals([1])); + // Notice we are not reading this substream: + r.readStream(7); + + expectLater(r.readChunk(2), throwsStateError); + }); + + test('readStream() supports draining', () async { + final r = ChunkedStreamReader(() async* { + yield [1, 2]; + yield [3, 4, 5]; + yield [6, 7, 8, 9]; + yield [10]; + }()); + + expect(await r.readChunk(1), equals([1])); + await r.readStream(7).drain(); + expect(await r.readChunk(2), equals([9, 10])); + + expect(await r.readChunk(1), equals([])); + await r.cancel(); // check this is okay! 
+ expect(await r.readChunk(1), equals([])); + }); + + test('nested ChunkedStreamReader', () async { + final r = ChunkedStreamReader(() async* { + yield [1, 2]; + yield [3, 4, 5]; + yield [6, 7, 8, 9]; + yield [10]; + }()); + + expect(await r.readChunk(1), equals([1])); + final r2 = ChunkedStreamReader(r.readStream(7)); + expect(await r2.readChunk(2), equals([2, 3])); + expect(await r2.readChunk(1), equals([4])); + await r2.cancel(); + + expect(await r.readChunk(2), equals([9, 10])); + + expect(await r.readChunk(1), equals([])); + await r.cancel(); // check this is okay! + expect(await r.readChunk(1), equals([])); + }); + + test('readBytes() chunks of 3 elements', () async { + final r = ChunkedStreamReader(() async* { + yield [1, 2]; + yield [3, 4, 5]; + yield [6, 7, 8, 9]; + yield [10]; + }()); + + expect(await r.readBytes(3), allOf(equals([1, 2, 3]), isA<Uint8List>())); + expect(await r.readBytes(3), allOf(equals([4, 5, 6]), isA<Uint8List>())); + expect(await r.readBytes(3), allOf(equals([7, 8, 9]), isA<Uint8List>())); + expect(await r.readBytes(3), allOf(equals([10]), isA<Uint8List>())); + expect(await r.readBytes(1), equals([])); + expect(await r.readBytes(1), equals([])); + await r.cancel(); // check this is okay!
+ expect(await r.readBytes(1), equals([])); + }); + + test('readChunk() until exact end of stream', () async { + final stream = Stream.fromIterable(Iterable.generate( + 10, + (_) => Uint8List(512), + )); + + final r = ChunkedStreamReader(stream); + while (true) { + final c = await r.readBytes(1024); + if (c.isEmpty) { + break; + } + } + }); + + test('cancel while readChunk() is pending', () async { + final r = ChunkedStreamReader(() async* { + yield [1, 2, 3]; + // This will hang forever, so we will call cancel() + await Completer().future; + yield [4]; // this should never be reachable + fail('unreachable!'); + }()); + + expect(await r.readBytes(2), equals([1, 2])); + + final future = r.readChunk(2); + + // Wait a tiny bit and cancel + await Future.microtask(() => null); + r.cancel(); + + expect(await future, hasLength(lessThan(2))); + }); + + test('cancel while readStream() is pending', () async { + final r = ChunkedStreamReader(() async* { + yield [1, 2, 3]; + // This will hang forever, so we will call cancel() + await Completer().future; + yield [4]; // this should never be reachable + fail('unreachable!'); + }()); + + expect(await collectBytes(r.readStream(2)), equals([1, 2])); + + final stream = r.readStream(2); + + // Wait a tiny bit and cancel + await Future.microtask(() => null); + r.cancel(); + + expect(await collectBytes(stream), hasLength(lessThan(2))); + }); +}