From 13475daa86eade36f5b420dac8ff9d3a693848ae Mon Sep 17 00:00:00 2001 From: Jonas Finnemann Jensen Date: Mon, 29 Mar 2021 13:41:14 +0200 Subject: [PATCH 1/6] Added ChunkedStreamReader --- lib/async.dart | 1 + lib/src/chunked_stream_reader.dart | 176 ++++++++++++++ test/chunked_stream_reader.dart | 360 +++++++++++++++++++++++++++++ 3 files changed, 537 insertions(+) create mode 100644 lib/src/chunked_stream_reader.dart create mode 100644 test/chunked_stream_reader.dart diff --git a/lib/async.dart b/lib/async.dart index a97fc65..611d137 100644 --- a/lib/async.dart +++ b/lib/async.dart @@ -37,3 +37,4 @@ export 'src/stream_subscription_transformer.dart'; export 'src/stream_zip.dart'; export 'src/subscription_stream.dart'; export 'src/typed_stream_transformer.dart'; +export 'src/chunked_stream_reader.dart'; diff --git a/lib/src/chunked_stream_reader.dart b/lib/src/chunked_stream_reader.dart new file mode 100644 index 0000000..2576b68 --- /dev/null +++ b/lib/src/chunked_stream_reader.dart @@ -0,0 +1,176 @@ +// Copyright (c) 2021, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +import 'dart:async'; +import 'dart:typed_data'; + +import 'byte_collector.dart' show collectBytes; + +/// Utility class for reading elements from a _chunked stream_. +/// +/// A _chunked stream_ is a stream where each event is a chunk of elements. +/// Byte-streams with the type `Stream>` is common of example of this. +/// As illustrated in the example below, this utility class makes it easy to +/// read a _chunked stream_ using custom chunk sizes and sub-stream sizes, +/// without managing partially read chunks. +/// +/// ```dart +/// final r = ChunkedStreamReader(File('myfile.txt').openRead()); +/// try { +/// // Read the first 4 bytes +/// final firstBytes = await r.readChunk(4); +/// if (firstBytes.length < 4) { +/// throw Exception('myfile.txt has less than 4 bytes'); +/// } +/// +/// // Read next 8 kilobytes as a substream +/// Stream> substream = r.readStream(8 * 1024); +/// +/// ... +/// } finally { +/// // We always cancel the ChunkedStreamReader, this ensures the underlying +/// // stream is cancelled. +/// r.cancel(); +/// } +/// ``` +/// +/// The read-operations: [readChunk] and [readStream] cannot be invoked +/// concurrently. +class ChunkedStreamReader { + final StreamIterator> _input; + final List _emptyList = []; + List _buffer = []; + bool _reading = false; + + factory ChunkedStreamReader(Stream> stream) => + ChunkedStreamReader._(StreamIterator(stream)); + + ChunkedStreamReader._(this._input); + + /// Read next [size] elements from _chunked stream_, buffering to create a + /// chunk with [size] elements. + /// + /// This will read _chunks_ from the underlying _chunked stream_ until [size] + /// elements have been buffered, or end-of-stream, then it returns the first + /// [size] buffered elements. + /// + /// If end-of-stream is encountered before [size] bytes is read, this returns + /// a list less than [size] (indicating end-of-stream). + /// + /// If the underlying stream throws, the stream is cancelled, the exception is + /// propogated and further read operations will fail. + /// + /// Throws, if another read operation is on-going. + Future> readChunk(int size) async { + final result = []; + await for (final chunk in readStream(size)) { + result.addAll(chunk); + } + return result; + } + + /// Read next [size] elements from _chunked stream_ as a sub-stream. + /// + /// This will pass-through _chunks_ from the underlying _chunked stream_ until + /// [size] elements have been returned, or end-of-stream has been encountered. + /// + /// If end-of-stream is encountered before [size] bytes is read, this returns + /// a _chunked stream_ with less than [size] (indicating end-of-stream). + /// + /// If the underlying stream throws, the stream is cancelled, the exception is + /// propogated and further read operations will fail. + /// + /// If the sub-stream returned from [readStream] is cancelled the remaining + /// unread elements up-to [size] are drained, allowing subsequent + /// read-operations to proceed after cancellation. + /// + /// Throws, if another read-operation is on-going. + Stream> readStream(int size) { + if (size < 0) { + throw ArgumentError.value(size, 'size', 'must be non-negative'); + } + if (_reading) { + throw StateError('Concurrent read operations are not allowed!'); + } + _reading = true; + + final c = StreamController>(); + + c.addStream(() async* { + // While we have data to read + while (size > 0) { + // Read something into the buffer, if it's empty + if (_buffer.isEmpty) { + if (!(await _input.moveNext())) { + // Don't attempt to read more data, as there is no more data. + size = 0; + _reading = false; + break; + } + _buffer = _input.current; + } + + if (size < _buffer.length) { + final output = _buffer.sublist(0, size); + _buffer = _buffer.sublist(size); + size = 0; + yield output; + _reading = false; + break; + } + + if (_buffer.isNotEmpty) { + final output = _buffer; + size -= _buffer.length; + _buffer = _emptyList; + yield output; + } + } + }()).whenComplete(c.close); + + c.onCancel = () async { + while (size > 0) { + if (_buffer.isEmpty) { + if (!await _input.moveNext()) { + size = 0; // no more data + _reading = false; + break; + } + _buffer = _input.current; + } + + if (size < _buffer.length) { + _buffer = _buffer.sublist(size); + size = 0; + _reading = false; + break; + } + + size -= _buffer.length; + _buffer = _emptyList; + } + }; + + return c.stream; + } + + /// Cancel the underlying _chunked stream_. + /// + /// It is always safe to call [cancel], even if the undelying stream was read + /// to completion. + /// + /// It can be a good idea to call [cancel] in a `finally`-block when done + /// using the [ChunkedStreamReader], this mitigates risk of leaking resources. + Future cancel() async => await _input.cancel(); +} + +/// Extensions for using [ChunkedStreamReader] with byte-streams. +extension ChunkedStreamReaderByteStreamExt on ChunkedStreamReader { + /// Read bytes into a [Uint8List]. + /// + /// This does the same as [readChunk], except it uses [collectBytes] to create + /// a [Uint8List], which offers better performance. + Future readBytes(int size) async => + await collectBytes(readStream(size)); +} diff --git a/test/chunked_stream_reader.dart b/test/chunked_stream_reader.dart new file mode 100644 index 0000000..c7f9bec --- /dev/null +++ b/test/chunked_stream_reader.dart @@ -0,0 +1,360 @@ +// Copyright (c) 2021, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +import 'dart:async'; +import 'dart:typed_data'; +import 'package:test/test.dart'; + +import 'package:async/async.dart'; + +Stream> _chunkedStream(List> chunks) async* { + for (final chunk in chunks) { + yield chunk; + } +} + +Stream> _chunkedStreamWithError(List> chunks) async* { + for (final chunk in chunks) { + yield chunk; + } + + throw StateError('test generated error'); +} + +Future> _readChunkedStream(Stream> input) async { + final result = []; + await for (final chunk in input) { + result.addAll(chunk); + } + return result; +} + +void main() { + test('readChunk() -- chunk in given size', () async { + final s = ChunkedStreamReader(_chunkedStream([ + ['a', 'b', 'c'], + ['1', '2'], + ])); + expect(await s.readChunk(3), equals(['a', 'b', 'c'])); + expect(await s.readChunk(2), equals(['1', '2'])); + expect(await s.readChunk(1), equals([])); + }); + + test('readChunk() propagates stream error', () async { + final s = ChunkedStreamReader(_chunkedStreamWithError([ + ['a', 'b', 'c'], + ['1', '2'], + ])); + expect(await s.readChunk(3), equals(['a', 'b', 'c'])); + expect(() async => await s.readChunk(3), throwsStateError); + }); + + test('readChunk() -- chunk in given size', () async { + final s = ChunkedStreamReader(_chunkedStream([ + ['a', 'b', 'c'], + ['1', '2'], + ])); + expect(await s.readChunk(2), equals(['a', 'b'])); + expect(await s.readChunk(3), equals(['c', '1', '2'])); + expect(await s.readChunk(1), equals([])); + }); + + test('readChunk() -- chunks one item at the time', () async { + final s = ChunkedStreamReader(_chunkedStream([ + ['a', 'b', 'c'], + ['1', '2'], + ])); + expect(await s.readChunk(1), equals(['a'])); + expect(await s.readChunk(1), equals(['b'])); + expect(await s.readChunk(1), equals(['c'])); + expect(await s.readChunk(1), equals(['1'])); + expect(await s.readChunk(1), equals(['2'])); + expect(await s.readChunk(1), equals([])); + }); + + test('readChunk() -- one big chunk', () async { + final s = ChunkedStreamReader(_chunkedStream([ + ['a', 'b', 'c'], + ['1', '2'], + ])); + expect(await s.readChunk(6), equals(['a', 'b', 'c', '1', '2'])); + }); + + test('readStream() propagates stream error', () async { + final s = ChunkedStreamReader(_chunkedStreamWithError([ + ['a', 'b', 'c'], + ['1', '2'], + ])); + expect(await s.readChunk(3), equals(['a', 'b', 'c'])); + final substream = s.readStream(3); + final subChunkedStreamReader = ChunkedStreamReader(substream); + expect(() async => await subChunkedStreamReader.readChunk(3), + throwsStateError); + }); + + test('readStream() + _readChunkedStream()', () async { + final s = ChunkedStreamReader(_chunkedStream([ + ['a', 'b', 'c'], + ['1', '2'], + ])); + expect(await _readChunkedStream(s.readStream(5)), + equals(['a', 'b', 'c', '1', '2'])); + expect(await s.readChunk(1), equals([])); + }); + + test('(readStream() + _readChunkedStream()) x 2', () async { + final s = ChunkedStreamReader(_chunkedStream([ + ['a', 'b', 'c'], + ['1', '2'], + ])); + expect(await _readChunkedStream(s.readStream(2)), equals(['a', 'b'])); + expect(await _readChunkedStream(s.readStream(3)), equals(['c', '1', '2'])); + }); + + test('readStream() + _readChunkedStream() -- past end', () async { + final s = ChunkedStreamReader(_chunkedStream([ + ['a', 'b', 'c'], + ['1', '2'], + ])); + expect(await _readChunkedStream(s.readStream(6)), + equals(['a', 'b', 'c', '1', '2'])); + expect(await s.readChunk(1), equals([])); + }); + + test('readChunk() readStream() + _readChunkedStream() readChunk()', () async { + final s = ChunkedStreamReader(_chunkedStream([ + ['a', 'b', 'c'], + ['1', '2'], + ])); + expect(await s.readChunk(1), equals(['a'])); + expect(await _readChunkedStream(s.readStream(3)), equals(['b', 'c', '1'])); + expect(await s.readChunk(2), equals(['2'])); + }); + + test( + 'readChunk() StreamIterator(readStream()).cancel() readChunk() ' + '-- one item at the time', () async { + final s = ChunkedStreamReader(_chunkedStream([ + ['a', 'b', 'c'], + ['1', '2'], + ])); + expect(await s.readChunk(1), equals(['a'])); + final i = StreamIterator(s.readStream(3)); + expect(await i.moveNext(), isTrue); + await i.cancel(); + expect(await s.readChunk(1), equals(['2'])); + expect(await s.readChunk(1), equals([])); + }); + + test( + 'readChunk() StreamIterator(readStream()) readChunk() ' + '-- one item at the time', () async { + final s = ChunkedStreamReader(_chunkedStream([ + ['a', 'b', 'c'], + ['1', '2'], + ])); + expect(await s.readChunk(1), equals(['a'])); + final i = StreamIterator(s.readStream(3)); + expect(await i.moveNext(), isTrue); + expect(i.current, equals(['b', 'c'])); + expect(await i.moveNext(), isTrue); + expect(i.current, equals(['1'])); + expect(await i.moveNext(), isFalse); + expect(await s.readChunk(1), equals(['2'])); + expect(await s.readChunk(1), equals([])); + }); + + test('readStream() x 2', () async { + final s = ChunkedStreamReader(_chunkedStream([ + ['a', 'b', 'c'], + ['1', '2'], + ])); + expect( + await s.readStream(2).toList(), + equals([ + ['a', 'b'] + ])); + expect( + await s.readStream(3).toList(), + equals([ + ['c'], + ['1', '2'] + ])); + }); + + test( + 'readChunk() StreamIterator(readStream()).cancel() readChunk() -- ' + 'cancellation after reading', () async { + final s = ChunkedStreamReader(_chunkedStream([ + ['a', 'b', 'c'], + ['1', '2'], + ])); + expect(await s.readChunk(1), equals(['a'])); + final i = StreamIterator(s.readStream(3)); + expect(await i.moveNext(), isTrue); + await i.cancel(); + expect(await s.readChunk(1), equals(['2'])); + expect(await s.readChunk(1), equals([])); + }); + + test( + 'readChunk() StreamIterator(readStream()).cancel() readChunk() -- ' + 'cancellation after reading (2)', () async { + final s = ChunkedStreamReader(_chunkedStream([ + ['a', 'b', 'c'], + ['1', '2', '3'], + ['4', '5', '6'] + ])); + expect(await s.readChunk(1), equals(['a'])); + final i = StreamIterator(s.readStream(6)); + expect(await i.moveNext(), isTrue); + await i.cancel(); + expect(await s.readChunk(1), equals(['5'])); + expect(await s.readChunk(1), equals(['6'])); + }); + + test( + 'readChunk() StreamIterator(readStream()) readChunk() -- ' + 'not cancelling produces StateError', () async { + final s = ChunkedStreamReader(_chunkedStream([ + ['a', 'b', 'c'], + ['1', '2'], + ])); + expect(await s.readChunk(1), equals(['a'])); + final i = StreamIterator(s.readStream(3)); + expect(await i.moveNext(), isTrue); + expect(() async => await s.readChunk(1), throwsStateError); + }); + + test( + 'readChunk() StreamIterator(readStream()) readChunk() -- ' + 'not cancelling produces StateError (2)', () async { + final s = ChunkedStreamReader(_chunkedStream([ + ['a', 'b', 'c'], + ['1', '2'], + ])); + expect(await s.readChunk(1), equals(['a'])); + + /// ignore: unused_local_variable + final i = StreamIterator(s.readStream(3)); + expect(() async => await s.readChunk(1), throwsStateError); + }); + + test( + 'readChunk() readStream() that ends with first chunk + ' + '_readChunkedStream() readChunk()', () async { + final s = ChunkedStreamReader(_chunkedStream([ + ['a', 'b', 'c'], + ['1', '2'], + ])); + expect(await s.readChunk(1), equals(['a'])); + expect( + await s.readStream(2).toList(), + equals([ + ['b', 'c'] + ])); + expect(await s.readChunk(3), equals(['1', '2'])); + }); + + test( + 'readChunk() readStream() that ends with first chunk + drain() ' + 'readChunk()', () async { + final s = ChunkedStreamReader(_chunkedStream([ + ['a', 'b', 'c'], + ['1', '2'], + ])); + expect(await s.readChunk(1), equals(['a'])); + final sub = s.readStream(2); + await sub.drain(); + expect(await s.readChunk(3), equals(['1', '2'])); + }); + + test( + 'readChunk() readStream() that ends with second chunk + ' + '_readChunkedStream() readChunk()', () async { + final s = ChunkedStreamReader(_chunkedStream([ + ['a', 'b', 'c'], + ['1', '2'], + ['3', '4'] + ])); + expect(await s.readChunk(1), equals(['a'])); + expect( + await s.readStream(4).toList(), + equals([ + ['b', 'c'], + ['1', '2'] + ])); + expect(await s.readChunk(3), equals(['3', '4'])); + }); + + test( + 'readChunk() readStream() that ends with second chunk + ' + 'drain() readChunk()', () async { + final s = ChunkedStreamReader(_chunkedStream([ + ['a', 'b', 'c'], + ['1', '2'], + ['3', '4'], + ])); + expect(await s.readChunk(1), equals(['a'])); + final substream = s.readStream(4); + await substream.drain(); + expect(await s.readChunk(3), equals(['3', '4'])); + }); + + test( + 'readChunk() readStream() readChunk() before ' + 'draining substream produces StateError', () async { + final s = ChunkedStreamReader(_chunkedStream([ + ['a', 'b', 'c'], + ['1', '2'], + ['3', '4'], + ])); + expect(await s.readChunk(1), equals(['a'])); + // ignore: unused_local_variable + final substream = s.readStream(4); + expect(() async => await s.readChunk(3), throwsStateError); + }); + + test('creating two substreams simultaneously causes a StateError', () async { + final s = ChunkedStreamReader(_chunkedStream([ + ['a', 'b', 'c'], + ['1', '2'], + ['3', '4'], + ])); + expect(await s.readChunk(1), equals(['a'])); + // ignore: unused_local_variable + final substream = s.readStream(4); + expect(() async { + //ignore: unused_local_variable + final substream2 = s.readStream(3); + }, throwsStateError); + }); + + test('nested ChunkedStreamReader', () async { + final s = ChunkedStreamReader(_chunkedStream([ + ['a', 'b'], + ['1', '2'], + ['3', '4'], + ])); + expect(await s.readChunk(1), equals(['a'])); + final substream = s.readStream(4); + final nested = ChunkedStreamReader(substream); + expect(await nested.readChunk(2), equals(['b', '1'])); + expect(await nested.readChunk(3), equals(['2', '3'])); + expect(await nested.readChunk(2), equals([])); + expect(await s.readChunk(1), equals(['4'])); + }); + + test('ChunkedStreamReaderByteStreamExt', () async { + final s = ChunkedStreamReader(_chunkedStream([ + [1, 2, 3], + [4], + ])); + expect(await s.readBytes(1), equals([1])); + expect(await s.readBytes(1), isA()); + expect(await s.readBytes(1), equals([3])); + expect(await s.readBytes(1), equals([4])); + expect(await s.readBytes(1), equals([])); + }); +} From e86bf653063bb752608aa0ea0810ca624666eece Mon Sep 17 00:00:00 2001 From: Jonas Finnemann Jensen Date: Mon, 29 Mar 2021 14:59:00 +0200 Subject: [PATCH 2/6] Updated changelog --- CHANGELOG.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index c7256ac..fa8b22a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,8 @@ ## 2.5.1-dev +* Added `ChunkedStreamReader` for reading _chunked streams_ without managing + buffers. + ## 2.5.0 * Stable release for null safety. From 01fd7f77c6cc96d8308c09bf8af28b2893108e73 Mon Sep 17 00:00:00 2001 From: Jonas Finnemann Jensen Date: Tue, 30 Mar 2021 15:26:20 +0200 Subject: [PATCH 3/6] Prepare 2.6.0 release --- CHANGELOG.md | 2 +- pubspec.yaml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index fa8b22a..84938d0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,4 @@ -## 2.5.1-dev +## 2.6.0 * Added `ChunkedStreamReader` for reading _chunked streams_ without managing buffers. diff --git a/pubspec.yaml b/pubspec.yaml index e24947d..5bfe9b1 100644 --- a/pubspec.yaml +++ b/pubspec.yaml @@ -1,5 +1,5 @@ name: async -version: 2.5.1-dev +version: 2.6.0 description: Utility functions and classes related to the 'dart:async' library. repository: https://github.com/dart-lang/async From 7a9918f1fc79bbc52cb44aeb0f7f21f472004eb5 Mon Sep 17 00:00:00 2001 From: Jonas Finnemann Jensen Date: Sat, 10 Apr 2021 13:16:51 +0200 Subject: [PATCH 4/6] Address review comments, fix boundary issue --- lib/src/chunked_stream_reader.dart | 54 +++++++++++++++-------------- test/chunked_stream_reader.dart | 55 ++++++++++++++++++++++++++++++ 2 files changed, 83 insertions(+), 26 deletions(-) diff --git a/lib/src/chunked_stream_reader.dart b/lib/src/chunked_stream_reader.dart index 2576b68..e5bbfb2 100644 --- a/lib/src/chunked_stream_reader.dart +++ b/lib/src/chunked_stream_reader.dart @@ -3,6 +3,7 @@ // BSD-style license that can be found in the LICENSE file. import 'dart:async'; +import 'dart:collection'; import 'dart:typed_data'; import 'byte_collector.dart' show collectBytes; @@ -35,11 +36,11 @@ import 'byte_collector.dart' show collectBytes; /// } /// ``` /// -/// The read-operations: [readChunk] and [readStream] cannot be invoked -/// concurrently. +/// The read-operations [readChunk] and [readStream] must not be invoked until +/// the future from a previous call has completed. class ChunkedStreamReader { final StreamIterator> _input; - final List _emptyList = []; + final List _emptyList = UnmodifiableListView([]); List _buffer = []; bool _reading = false; @@ -55,8 +56,8 @@ class ChunkedStreamReader { /// elements have been buffered, or end-of-stream, then it returns the first /// [size] buffered elements. /// - /// If end-of-stream is encountered before [size] bytes is read, this returns - /// a list less than [size] (indicating end-of-stream). + /// If end-of-stream is encountered before [size] elements is read, this + /// returns a list with fewer than [size] elements (indicating end-of-stream). /// /// If the underlying stream throws, the stream is cancelled, the exception is /// propogated and further read operations will fail. @@ -75,8 +76,8 @@ class ChunkedStreamReader { /// This will pass-through _chunks_ from the underlying _chunked stream_ until /// [size] elements have been returned, or end-of-stream has been encountered. /// - /// If end-of-stream is encountered before [size] bytes is read, this returns - /// a _chunked stream_ with less than [size] (indicating end-of-stream). + /// If end-of-stream is encountered before [size] elements is read, this + /// returns a list with fewer than [size] elements (indicating end-of-stream). /// /// If the underlying stream throws, the stream is cancelled, the exception is /// propogated and further read operations will fail. @@ -87,17 +88,13 @@ class ChunkedStreamReader { /// /// Throws, if another read-operation is on-going. Stream> readStream(int size) { - if (size < 0) { - throw ArgumentError.value(size, 'size', 'must be non-negative'); - } + RangeError.checkNotNegative(size, 'size'); if (_reading) { throw StateError('Concurrent read operations are not allowed!'); } _reading = true; - final c = StreamController>(); - - c.addStream(() async* { + final substream = () async* { // While we have data to read while (size > 0) { // Read something into the buffer, if it's empty @@ -111,30 +108,31 @@ class ChunkedStreamReader { _buffer = _input.current; } - if (size < _buffer.length) { - final output = _buffer.sublist(0, size); - _buffer = _buffer.sublist(size); - size = 0; - yield output; - _reading = false; - break; - } - if (_buffer.isNotEmpty) { + if (size < _buffer.length) { + final output = _buffer.sublist(0, size); + _buffer = _buffer.sublist(size); + size = 0; + yield output; + _reading = false; + break; + } + final output = _buffer; size -= _buffer.length; _buffer = _emptyList; yield output; } } - }()).whenComplete(c.close); + }; + final c = StreamController>(); + c.onListen = () => c.addStream(substream()).whenComplete(c.close); c.onCancel = () async { while (size > 0) { if (_buffer.isEmpty) { if (!await _input.moveNext()) { size = 0; // no more data - _reading = false; break; } _buffer = _input.current; @@ -143,13 +141,13 @@ class ChunkedStreamReader { if (size < _buffer.length) { _buffer = _buffer.sublist(size); size = 0; - _reading = false; break; } size -= _buffer.length; _buffer = _emptyList; } + _reading = false; }; return c.stream; @@ -157,7 +155,11 @@ class ChunkedStreamReader { /// Cancel the underlying _chunked stream_. /// - /// It is always safe to call [cancel], even if the undelying stream was read + /// If a future from [readChunk] or [readStream] is still pending then + /// [cancel] behaves as if the underlying stream ended early. That is a future + /// from [readChunk] may return a partial chunk smaller than the request size. + /// + /// It is always safe to call [cancel], even if the underlying stream was read /// to completion. /// /// It can be a good idea to call [cancel] in a `finally`-block when done diff --git a/test/chunked_stream_reader.dart b/test/chunked_stream_reader.dart index c7f9bec..ceecdd2 100644 --- a/test/chunked_stream_reader.dart +++ b/test/chunked_stream_reader.dart @@ -73,6 +73,21 @@ void main() { expect(await s.readChunk(1), equals([])); }); + test('readChunk() -- until exact end of stream', () async { + final stream = Stream.fromIterable(Iterable.generate( + 10, + (_) => Uint8List(512), + )); + + final s = ChunkedStreamReader(stream); + while (true) { + final c = await s.readBytes(1024); + if (c.isEmpty) { + break; + } + } + }); + test('readChunk() -- one big chunk', () async { final s = ChunkedStreamReader(_chunkedStream([ ['a', 'b', 'c'], @@ -357,4 +372,44 @@ void main() { expect(await s.readBytes(1), equals([4])); expect(await s.readBytes(1), equals([])); }); + + test('cancel while readChunk() is pending', () async { + final s = ChunkedStreamReader(() async* { + yield [1, 2, 3]; + // This will hang forever, so we will call cancel() + await Completer().future; + yield [4]; // this should never be reachable + fail('unreachable!'); + }()); + + expect(await s.readBytes(2), equals([1, 2])); + + final future = s.readChunk(2); + + // Wait a tiny bit and cancel + await Future.microtask(() => null); + s.cancel(); + + expect(await future, hasLength(lessThan(2))); + }); + + test('cancel while readStream() is pending', () async { + final s = ChunkedStreamReader(() async* { + yield [1, 2, 3]; + // This will hang forever, so we will call cancel() + await Completer().future; + yield [4]; // this should never be reachable + fail('unreachable!'); + }()); + + expect(await collectBytes(s.readStream(2)), equals([1, 2])); + + final stream = s.readStream(2); + + // Wait a tiny bit and cancel + await Future.microtask(() => null); + s.cancel(); + + expect(await collectBytes(stream), hasLength(lessThan(2))); + }); } From 5baaa2422d604a9b2822067685efcd0291e17062 Mon Sep 17 00:00:00 2001 From: Jonas Finnemann Jensen Date: Tue, 13 Apr 2021 21:54:29 +0200 Subject: [PATCH 5/6] Cleanup tests --- test/chunked_stream_reader.dart | 557 +++++++++++++------------------- 1 file changed, 233 insertions(+), 324 deletions(-) diff --git a/test/chunked_stream_reader.dart b/test/chunked_stream_reader.dart index ceecdd2..9991807 100644 --- a/test/chunked_stream_reader.dart +++ b/test/chunked_stream_reader.dart @@ -4,377 +4,286 @@ import 'dart:async'; import 'dart:typed_data'; -import 'package:test/test.dart'; +import 'package:test/test.dart'; import 'package:async/async.dart'; -Stream> _chunkedStream(List> chunks) async* { - for (final chunk in chunks) { - yield chunk; - } -} - -Stream> _chunkedStreamWithError(List> chunks) async* { - for (final chunk in chunks) { - yield chunk; - } +void main() { + test('readChunk() chunk by chunk', () async { + final r = ChunkedStreamReader(() async* { + yield [1, 2]; + yield [3, 4, 5]; + yield [6, 7, 8, 9]; + yield [10]; + }()); - throw StateError('test generated error'); -} + expect(await r.readChunk(2), equals([1, 2])); + expect(await r.readChunk(3), equals([3, 4, 5])); + expect(await r.readChunk(4), equals([6, 7, 8, 9])); + expect(await r.readChunk(1), equals([10])); + expect(await r.readChunk(1), equals([])); + expect(await r.readChunk(1), equals([])); + await r.cancel(); // check this is okay! + expect(await r.readChunk(1), equals([])); + }); -Future> _readChunkedStream(Stream> input) async { - final result = []; - await for (final chunk in input) { - result.addAll(chunk); - } - return result; -} + test('readChunk() element by element', () async { + final r = ChunkedStreamReader(() async* { + yield [1, 2]; + yield [3, 4, 5]; + yield [6, 7, 8, 9]; + yield [10]; + }()); -void main() { - test('readChunk() -- chunk in given size', () async { - final s = ChunkedStreamReader(_chunkedStream([ - ['a', 'b', 'c'], - ['1', '2'], - ])); - expect(await s.readChunk(3), equals(['a', 'b', 'c'])); - expect(await s.readChunk(2), equals(['1', '2'])); - expect(await s.readChunk(1), equals([])); + for (var i = 0; i < 10; i++) { + expect(await r.readChunk(1), equals([i + 1])); + } + expect(await r.readChunk(1), equals([])); + expect(await r.readChunk(1), equals([])); + await r.cancel(); // check this is okay! + expect(await r.readChunk(1), equals([])); }); - test('readChunk() propagates stream error', () async { - final s = ChunkedStreamReader(_chunkedStreamWithError([ - ['a', 'b', 'c'], - ['1', '2'], - ])); - expect(await s.readChunk(3), equals(['a', 'b', 'c'])); - expect(() async => await s.readChunk(3), throwsStateError); - }); + test('readChunk() exact elements', () async { + final r = ChunkedStreamReader(() async* { + yield [1, 2]; + yield [3, 4, 5]; + yield [6, 7, 8, 9]; + yield [10]; + }()); - test('readChunk() -- chunk in given size', () async { - final s = ChunkedStreamReader(_chunkedStream([ - ['a', 'b', 'c'], - ['1', '2'], - ])); - expect(await s.readChunk(2), equals(['a', 'b'])); - expect(await s.readChunk(3), equals(['c', '1', '2'])); - expect(await s.readChunk(1), equals([])); + expect(await r.readChunk(10), equals([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])); + expect(await r.readChunk(1), equals([])); + expect(await r.readChunk(1), equals([])); + await r.cancel(); // check this is okay! + expect(await r.readChunk(1), equals([])); }); - test('readChunk() -- chunks one item at the time', () async { - final s = ChunkedStreamReader(_chunkedStream([ - ['a', 'b', 'c'], - ['1', '2'], - ])); - expect(await s.readChunk(1), equals(['a'])); - expect(await s.readChunk(1), equals(['b'])); - expect(await s.readChunk(1), equals(['c'])); - expect(await s.readChunk(1), equals(['1'])); - expect(await s.readChunk(1), equals(['2'])); - expect(await s.readChunk(1), equals([])); + test('readChunk() past end', () async { + final r = ChunkedStreamReader(() async* { + yield [1, 2]; + yield [3, 4, 5]; + yield [6, 7, 8, 9]; + yield [10]; + }()); + + expect(await r.readChunk(20), equals([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])); + expect(await r.readChunk(1), equals([])); + expect(await r.readChunk(1), equals([])); + await r.cancel(); // check this is okay! + expect(await r.readChunk(1), equals([])); }); - test('readChunk() -- until exact end of stream', () async { - final stream = Stream.fromIterable(Iterable.generate( - 10, - (_) => Uint8List(512), - )); + test('readChunk() chunks of 2 elements', () async { + final r = ChunkedStreamReader(() async* { + yield [1, 2]; + yield [3, 4, 5]; + yield [6, 7, 8, 9]; + yield [10]; + }()); - final s = ChunkedStreamReader(stream); - while (true) { - final c = await s.readBytes(1024); - if (c.isEmpty) { - break; - } - } + expect(await r.readChunk(2), equals([1, 2])); + expect(await r.readChunk(2), equals([3, 4])); + expect(await r.readChunk(2), equals([5, 6])); + expect(await r.readChunk(2), equals([7, 8])); + expect(await r.readChunk(2), equals([9, 10])); + expect(await r.readChunk(1), equals([])); + expect(await r.readChunk(1), equals([])); + await r.cancel(); // check this is okay! + expect(await r.readChunk(1), equals([])); }); - test('readChunk() -- one big chunk', () async { - final s = ChunkedStreamReader(_chunkedStream([ - ['a', 'b', 'c'], - ['1', '2'], - ])); - expect(await s.readChunk(6), equals(['a', 'b', 'c', '1', '2'])); - }); + test('readChunk() chunks of 3 elements', () async { + final r = ChunkedStreamReader(() async* { + yield [1, 2]; + yield [3, 4, 5]; + yield [6, 7, 8, 9]; + yield [10]; + }()); - test('readStream() propagates stream error', () async { - final s = ChunkedStreamReader(_chunkedStreamWithError([ - ['a', 'b', 'c'], - ['1', '2'], - ])); - expect(await s.readChunk(3), equals(['a', 'b', 'c'])); - final substream = s.readStream(3); - final subChunkedStreamReader = ChunkedStreamReader(substream); - expect(() async => await subChunkedStreamReader.readChunk(3), - throwsStateError); + expect(await r.readChunk(3), equals([1, 2, 3])); + expect(await r.readChunk(3), equals([4, 5, 6])); + expect(await r.readChunk(3), equals([7, 8, 9])); + expect(await r.readChunk(3), equals([10])); + expect(await r.readChunk(1), equals([])); + expect(await r.readChunk(1), equals([])); + await r.cancel(); // check this is okay! + expect(await r.readChunk(1), equals([])); }); - test('readStream() + _readChunkedStream()', () async { - final s = ChunkedStreamReader(_chunkedStream([ - ['a', 'b', 'c'], - ['1', '2'], - ])); - expect(await _readChunkedStream(s.readStream(5)), - equals(['a', 'b', 'c', '1', '2'])); - expect(await s.readChunk(1), equals([])); - }); + test('readChunk() cancel half way', () async { + final r = ChunkedStreamReader(() async* { + yield [1, 2]; + yield [3, 4, 5]; + yield [6, 7, 8, 9]; + yield [10]; + }()); - test('(readStream() + _readChunkedStream()) x 2', () async { - final s = ChunkedStreamReader(_chunkedStream([ - ['a', 'b', 'c'], - ['1', '2'], - ])); - expect(await _readChunkedStream(s.readStream(2)), equals(['a', 'b'])); - expect(await _readChunkedStream(s.readStream(3)), equals(['c', '1', '2'])); + expect(await r.readChunk(5), equals([1, 2, 3, 4, 5])); + await r.cancel(); // check this is okay! + expect(await r.readChunk(1), equals([])); }); - test('readStream() + _readChunkedStream() -- past end', () async { - final s = ChunkedStreamReader(_chunkedStream([ - ['a', 'b', 'c'], - ['1', '2'], - ])); - expect(await _readChunkedStream(s.readStream(6)), - equals(['a', 'b', 'c', '1', '2'])); - expect(await s.readChunk(1), equals([])); - }); + test('readChunk() propagates exception', () async { + final r = ChunkedStreamReader(() async* { + yield [1, 2]; + yield [3, 4, 5]; + throw Exception('stopping here'); + }()); - test('readChunk() readStream() + _readChunkedStream() readChunk()', () async { - final s = ChunkedStreamReader(_chunkedStream([ - ['a', 'b', 'c'], - ['1', '2'], - ])); - expect(await s.readChunk(1), equals(['a'])); - expect(await _readChunkedStream(s.readStream(3)), equals(['b', 'c', '1'])); - expect(await s.readChunk(2), equals(['2'])); + expect(await r.readChunk(3), equals([1, 2, 3])); + await expectLater(r.readChunk(3), throwsException); + + expect(await r.readChunk(1), equals([])); + await r.cancel(); // check this is okay! + expect(await r.readChunk(1), equals([])); }); - test( - 'readChunk() StreamIterator(readStream()).cancel() readChunk() ' - '-- one item at the time', () async { - final s = ChunkedStreamReader(_chunkedStream([ - ['a', 'b', 'c'], - ['1', '2'], - ])); - expect(await s.readChunk(1), equals(['a'])); - final i = StreamIterator(s.readStream(3)); + test('readStream() forwards chunks', () async { + final chunk2 = [3, 4, 5]; + final chunk3 = [6, 7, 8, 9]; + final r = ChunkedStreamReader(() async* { + yield [1, 2]; + yield chunk2; + yield chunk3; + yield [10]; + }()); + + expect(await r.readChunk(1), equals([1])); + final i = StreamIterator(r.readStream(9)); expect(await i.moveNext(), isTrue); - await i.cancel(); - expect(await s.readChunk(1), equals(['2'])); - expect(await s.readChunk(1), equals([])); - }); + expect(i.current, equals([2])); - test( - 'readChunk() StreamIterator(readStream()) readChunk() ' - '-- one item at the time', () async { - final s = ChunkedStreamReader(_chunkedStream([ - ['a', 'b', 'c'], - ['1', '2'], - ])); - expect(await s.readChunk(1), equals(['a'])); - final i = StreamIterator(s.readStream(3)); + // We must forward the exact chunks otherwise it's not efficient! + // Hence, we have a reference equality check here. expect(await i.moveNext(), isTrue); - expect(i.current, equals(['b', 'c'])); + expect(i.current, equals([3, 4, 5])); + expect(i.current == chunk2, isTrue); + expect(await i.moveNext(), isTrue); - expect(i.current, equals(['1'])); + expect(i.current, equals([6, 7, 8, 9])); + expect(i.current == chunk3, isTrue); + + expect(await i.moveNext(), isTrue); + expect(i.current, equals([10])); expect(await i.moveNext(), isFalse); - expect(await s.readChunk(1), equals(['2'])); - expect(await s.readChunk(1), equals([])); - }); - test('readStream() x 2', () async { - final s = ChunkedStreamReader(_chunkedStream([ - ['a', 'b', 'c'], - ['1', '2'], - ])); - expect( - await s.readStream(2).toList(), - equals([ - ['a', 'b'] - ])); - expect( - await s.readStream(3).toList(), - equals([ - ['c'], - ['1', '2'] - ])); + expect(await r.readChunk(1), equals([])); + await r.cancel(); // check this is okay! + expect(await r.readChunk(1), equals([])); }); - test( - 'readChunk() StreamIterator(readStream()).cancel() readChunk() -- ' - 'cancellation after reading', () async { - final s = ChunkedStreamReader(_chunkedStream([ - ['a', 'b', 'c'], - ['1', '2'], - ])); - expect(await s.readChunk(1), equals(['a'])); - final i = StreamIterator(s.readStream(3)); - expect(await i.moveNext(), isTrue); - await i.cancel(); - expect(await s.readChunk(1), equals(['2'])); - expect(await s.readChunk(1), equals([])); - }); + test('readStream() is drained when canceled', () async { + final r = ChunkedStreamReader(() async* { + yield [1, 2]; + yield [3, 4, 5]; + yield [6, 7, 8, 9]; + yield [10]; + }()); - test( - 'readChunk() StreamIterator(readStream()).cancel() readChunk() -- ' - 'cancellation after reading (2)', () async { - final s = ChunkedStreamReader(_chunkedStream([ - ['a', 'b', 'c'], - ['1', '2', '3'], - ['4', '5', '6'] - ])); - expect(await s.readChunk(1), equals(['a'])); - final i = StreamIterator(s.readStream(6)); + expect(await r.readChunk(1), equals([1])); + final i = StreamIterator(r.readStream(7)); expect(await i.moveNext(), isTrue); + expect(i.current, equals([2])); + // Cancelling here should skip the remainder of the substream + // and we continue to read 9 and 10 from r await i.cancel(); - expect(await s.readChunk(1), equals(['5'])); - expect(await s.readChunk(1), equals(['6'])); - }); - test( - 'readChunk() StreamIterator(readStream()) readChunk() -- ' - 'not cancelling produces StateError', () async { - final s = ChunkedStreamReader(_chunkedStream([ - ['a', 'b', 'c'], - ['1', '2'], - ])); - expect(await s.readChunk(1), equals(['a'])); - final i = StreamIterator(s.readStream(3)); - expect(await i.moveNext(), isTrue); - expect(() async => await s.readChunk(1), throwsStateError); - }); + expect(await r.readChunk(2), equals([9, 10])); - test( - 'readChunk() StreamIterator(readStream()) readChunk() -- ' - 'not cancelling produces StateError (2)', () async { - final s = ChunkedStreamReader(_chunkedStream([ - ['a', 'b', 'c'], - ['1', '2'], - ])); - expect(await s.readChunk(1), equals(['a'])); - - /// ignore: unused_local_variable - final i = StreamIterator(s.readStream(3)); - expect(() async => await s.readChunk(1), throwsStateError); + expect(await r.readChunk(1), equals([])); + await r.cancel(); // check this is okay! + expect(await r.readChunk(1), equals([])); }); - test( - 'readChunk() readStream() that ends with first chunk + ' - '_readChunkedStream() readChunk()', () async { - final s = ChunkedStreamReader(_chunkedStream([ - ['a', 'b', 'c'], - ['1', '2'], - ])); - expect(await s.readChunk(1), equals(['a'])); - expect( - await s.readStream(2).toList(), - equals([ - ['b', 'c'] - ])); - expect(await s.readChunk(3), equals(['1', '2'])); - }); + test('readStream() concurrent reads is forbidden', () async { + final r = ChunkedStreamReader(() async* { + yield [1, 2]; + yield [3, 4, 5]; + yield [6, 7, 8, 9]; + yield [10]; + }()); - test( - 'readChunk() readStream() that ends with first chunk + drain() ' - 'readChunk()', () async { - final s = ChunkedStreamReader(_chunkedStream([ - ['a', 'b', 'c'], - ['1', '2'], - ])); - expect(await s.readChunk(1), equals(['a'])); - final sub = s.readStream(2); - await sub.drain(); - expect(await s.readChunk(3), equals(['1', '2'])); - }); + expect(await r.readChunk(1), equals([1])); + // Notice we are not reading this substream: + r.readStream(7); - test( - 'readChunk() readStream() that ends with second chunk + ' - '_readChunkedStream() readChunk()', () async { - final s = ChunkedStreamReader(_chunkedStream([ - ['a', 'b', 'c'], - ['1', '2'], - ['3', '4'] - ])); - expect(await s.readChunk(1), equals(['a'])); - expect( - await s.readStream(4).toList(), - equals([ - ['b', 'c'], - ['1', '2'] - ])); - expect(await s.readChunk(3), equals(['3', '4'])); + expectLater(r.readChunk(2), throwsStateError); }); - test( - 'readChunk() readStream() that ends with second chunk + ' - 'drain() readChunk()', () async { - final s = ChunkedStreamReader(_chunkedStream([ - ['a', 'b', 'c'], - ['1', '2'], - ['3', '4'], - ])); - expect(await s.readChunk(1), equals(['a'])); - final substream = s.readStream(4); - await substream.drain(); - expect(await s.readChunk(3), equals(['3', '4'])); - }); + test('readStream() supports draining', () async { + final r = ChunkedStreamReader(() async* { + yield [1, 2]; + yield [3, 4, 5]; + yield [6, 7, 8, 9]; + yield [10]; + }()); - test( - 'readChunk() readStream() readChunk() before ' - 'draining substream produces StateError', () async { - final s = ChunkedStreamReader(_chunkedStream([ - ['a', 'b', 'c'], - ['1', '2'], - ['3', '4'], - ])); - expect(await s.readChunk(1), equals(['a'])); - // ignore: unused_local_variable - final substream = s.readStream(4); - expect(() async => await s.readChunk(3), throwsStateError); - }); + expect(await r.readChunk(1), equals([1])); + await r.readStream(7).drain(); + expect(await r.readChunk(2), equals([9, 10])); - test('creating two substreams simultaneously causes a StateError', () async { - final s = ChunkedStreamReader(_chunkedStream([ - ['a', 'b', 'c'], - ['1', '2'], - ['3', '4'], - ])); - expect(await s.readChunk(1), equals(['a'])); - // ignore: unused_local_variable - final substream = s.readStream(4); - expect(() async { - //ignore: unused_local_variable - final substream2 = s.readStream(3); - }, throwsStateError); + expect(await r.readChunk(1), equals([])); + await r.cancel(); // check this is okay! + expect(await r.readChunk(1), equals([])); }); test('nested ChunkedStreamReader', () async { - final s = ChunkedStreamReader(_chunkedStream([ - ['a', 'b'], - ['1', '2'], - ['3', '4'], - ])); - expect(await s.readChunk(1), equals(['a'])); - final substream = s.readStream(4); - final nested = ChunkedStreamReader(substream); - expect(await nested.readChunk(2), equals(['b', '1'])); - expect(await nested.readChunk(3), equals(['2', '3'])); - expect(await nested.readChunk(2), equals([])); - expect(await s.readChunk(1), equals(['4'])); + final r = ChunkedStreamReader(() async* { + yield [1, 2]; + yield [3, 4, 5]; + yield [6, 7, 8, 9]; + yield [10]; + }()); + + expect(await r.readChunk(1), equals([1])); + final r2 = ChunkedStreamReader(r.readStream(7)); + expect(await r2.readChunk(2), equals([2, 3])); + expect(await r2.readChunk(1), equals([4])); + await r2.cancel(); + + expect(await r.readChunk(2), equals([9, 10])); + + expect(await r.readChunk(1), equals([])); + await r.cancel(); // check this is okay! + expect(await r.readChunk(1), equals([])); }); - test('ChunkedStreamReaderByteStreamExt', () async { - final s = ChunkedStreamReader(_chunkedStream([ - [1, 2, 3], - [4], - ])); - expect(await s.readBytes(1), equals([1])); - expect(await s.readBytes(1), isA()); - expect(await s.readBytes(1), equals([3])); - expect(await s.readBytes(1), equals([4])); - expect(await s.readBytes(1), equals([])); + test('readBytes() chunks of 3 elements', () async { + final r = ChunkedStreamReader(() async* { + yield [1, 2]; + yield [3, 4, 5]; + yield [6, 7, 8, 9]; + yield [10]; + }()); + + expect(await r.readBytes(3), allOf(equals([1, 2, 3]), isA())); + expect(await r.readBytes(3), allOf(equals([4, 5, 6]), isA())); + expect(await r.readBytes(3), allOf(equals([7, 8, 9]), isA())); + expect(await r.readBytes(3), allOf(equals([10]), isA())); + expect(await r.readBytes(1), equals([])); + expect(await r.readBytes(1), equals([])); + await r.cancel(); // check this is okay! + expect(await r.readBytes(1), equals([])); + }); + + test('readChunk() until exact end of stream', () async { + final stream = Stream.fromIterable(Iterable.generate( + 10, + (_) => Uint8List(512), + )); + + final r = ChunkedStreamReader(stream); + while (true) { + final c = await r.readBytes(1024); + if (c.isEmpty) { + break; + } + } }); test('cancel while readChunk() is pending', () async { - final s = ChunkedStreamReader(() async* { + final r = ChunkedStreamReader(() async* { yield [1, 2, 3]; // This will hang forever, so we will call cancel() await Completer().future; @@ -382,19 +291,19 @@ void main() { fail('unreachable!'); }()); - expect(await s.readBytes(2), equals([1, 2])); + expect(await r.readBytes(2), equals([1, 2])); - final future = s.readChunk(2); + final future = r.readChunk(2); // Wait a tiny bit and cancel await Future.microtask(() => null); - s.cancel(); + r.cancel(); expect(await future, hasLength(lessThan(2))); }); test('cancel while readStream() is pending', () async { - final s = ChunkedStreamReader(() async* { + final r = ChunkedStreamReader(() async* { yield [1, 2, 3]; // This will hang forever, so we will call cancel() await Completer().future; @@ -402,13 +311,13 @@ void main() { fail('unreachable!'); }()); - expect(await collectBytes(s.readStream(2)), equals([1, 2])); + expect(await collectBytes(r.readStream(2)), equals([1, 2])); - final stream = s.readStream(2); + final stream = r.readStream(2); // Wait a tiny bit and cancel await Future.microtask(() => null); - s.cancel(); + r.cancel(); expect(await collectBytes(stream), hasLength(lessThan(2))); }); From 5b9db706c4711b34d40b9c1b9d01d5603bf95cc9 Mon Sep 17 00:00:00 2001 From: Jonas Finnemann Jensen Date: Tue, 13 Apr 2021 22:02:42 +0200 Subject: [PATCH 6/6] More tests and const empty list --- lib/src/chunked_stream_reader.dart | 3 +- test/chunked_stream_reader.dart | 56 ++++++++++++++++++++++++++++++ 2 files changed, 57 insertions(+), 2 deletions(-) diff --git a/lib/src/chunked_stream_reader.dart b/lib/src/chunked_stream_reader.dart index e5bbfb2..855a772 100644 --- a/lib/src/chunked_stream_reader.dart +++ b/lib/src/chunked_stream_reader.dart @@ -3,7 +3,6 @@ // BSD-style license that can be found in the LICENSE file. import 'dart:async'; -import 'dart:collection'; import 'dart:typed_data'; import 'byte_collector.dart' show collectBytes; @@ -40,7 +39,7 @@ import 'byte_collector.dart' show collectBytes; /// the future from a previous call has completed. class ChunkedStreamReader { final StreamIterator> _input; - final List _emptyList = UnmodifiableListView([]); + final List _emptyList = const []; List _buffer = []; bool _reading = false; diff --git a/test/chunked_stream_reader.dart b/test/chunked_stream_reader.dart index 9991807..7dcd408 100644 --- a/test/chunked_stream_reader.dart +++ b/test/chunked_stream_reader.dart @@ -173,6 +173,62 @@ void main() { expect(await r.readChunk(1), equals([])); }); + test('readStream() cancel at the exact end', () async { + final r = ChunkedStreamReader(() async* { + yield [1, 2]; + yield [3, 4, 5]; + yield [6, 7, 8, 9]; + yield [10]; + }()); + + expect(await r.readChunk(1), equals([1])); + final i = StreamIterator(r.readStream(7)); + expect(await i.moveNext(), isTrue); + expect(i.current, equals([2])); + + expect(await i.moveNext(), isTrue); + expect(i.current, equals([3, 4, 5])); + + expect(await i.moveNext(), isTrue); + expect(i.current, equals([6, 7, 8])); + + await i.cancel(); // cancel substream just as it's ending + + expect(await r.readChunk(2), equals([9, 10])); + + expect(await r.readChunk(1), equals([])); + await r.cancel(); // check this is okay! + expect(await r.readChunk(1), equals([])); + }); + + test('readStream() cancel at the exact end on chunk boundary', () async { + final r = ChunkedStreamReader(() async* { + yield [1, 2]; + yield [3, 4, 5]; + yield [6, 7, 8, 9]; + yield [10]; + }()); + + expect(await r.readChunk(1), equals([1])); + final i = StreamIterator(r.readStream(8)); + expect(await i.moveNext(), isTrue); + expect(i.current, equals([2])); + + expect(await i.moveNext(), isTrue); + expect(i.current, equals([3, 4, 5])); + + expect(await i.moveNext(), isTrue); + expect(i.current, equals([6, 7, 8, 9])); + + await i.cancel(); // cancel substream just as it's ending + + expect(await r.readChunk(2), equals([10])); + + expect(await r.readChunk(1), equals([])); + await r.cancel(); // check this is okay! + expect(await r.readChunk(1), equals([])); + }); + test('readStream() is drained when canceled', () async { final r = ChunkedStreamReader(() async* { yield [1, 2];