Skip to content
This repository was archived by the owner on Oct 17, 2024. It is now read-only.

Faster ChunkedStreamReader. #182

Merged
merged 3 commits into from
Jun 17, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
## 2.7.0-dev

* Improve performance for `ChunkedStreamReader` by creating fewer internal
sublists and specializing to create views for `Uint8List` chunks.

## 2.7.0

* Add a `Stream.slices()` extension method.
Expand Down
63 changes: 51 additions & 12 deletions lib/src/chunked_stream_reader.dart
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,34 @@ import 'byte_collector.dart' show collectBytes;
/// The read-operations [readChunk] and [readStream] must not be invoked until
/// the future from a previous call has completed.
class ChunkedStreamReader<T> {
/// Iterator over underlying stream.
///
/// The reader requests data from this input whenever requests on the
/// reader cannot be fulfilled with the already fetched data.
final StreamIterator<List<T>> _input;

/// Sentinel value used for [_buffer] when we have no value.
final List<T> _emptyList = const [];

/// Last partially consumed chunk received from [_input].
///
/// Elements up to [_offset] have already been consumed and should not be
/// consumed again.
List<T> _buffer = <T>[];

/// Offset into [_buffer] after data which have already been emitted.
///
/// The offset is between `0` and `_buffer.length`, both inclusive.
/// The data in [_buffer] from [_offset] and forward have not yet been
/// emitted by the chunked stream reader, the data before [_offset] has.
int _offset = 0;

/// Whether a read request is currently being processed.
///
/// Is `true` while a request is in progress.
/// While a read request, like [readChunk] or [readStream], is being processed,
/// no new requests can be made.
/// New read attempts will throw instead.
bool _reading = false;

factory ChunkedStreamReader(Stream<List<T>> stream) =>
Expand Down Expand Up @@ -96,30 +121,40 @@ class ChunkedStreamReader<T> {
final substream = () async* {
// While we have data to read
while (size > 0) {
// Read something into the buffer, if it's empty
if (_buffer.isEmpty) {
// Read something into the buffer, if buffer has been consumed.
assert(_offset <= _buffer.length);
if (_offset == _buffer.length) {
if (!(await _input.moveNext())) {
// Don't attempt to read more data, as there is no more data.
size = 0;
_reading = false;
break;
}
_buffer = _input.current;
_offset = 0;
}

if (_buffer.isNotEmpty) {
if (size < _buffer.length) {
final output = _buffer.sublist(0, size);
_buffer = _buffer.sublist(size);
final remainingBuffer = _buffer.length - _offset;
if (remainingBuffer > 0) {
if (remainingBuffer >= size) {
List<T> output;
if (_buffer is Uint8List) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use a local variable to avoid having to do both is and as on the same thing.

var buffer = _buffer;
if (buffer is Uint8List) {
  output = Uint8List.sublistView(buffer, _offset, _offset + size) as List<T>;
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd prefer a specialized ChunkedByteStreamReader with special code for Uint8List everywhere.
The readChunk(size) above is not particularly efficient when it dumps all these Uint8Lists into a plain List<int>.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's why we have ChunkedStreamReaderByteStreamExt, but yes, in hindsight... maybe we should only have made ChunkedByteStreamReader -- because in practice most other streams aren't chunked.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The ones that are ... are probably Stream<String>, not list based. So yes.

output = Uint8List.sublistView(
_buffer as Uint8List, _offset, _offset + size) as List<T>;
} else {
output = _buffer.sublist(_offset, _offset + size);
}
_offset += size;
size = 0;
yield output;
_reading = false;
break;
}

final output = _buffer;
size -= _buffer.length;
final output = _offset == 0 ? _buffer : _buffer.sublist(_offset);
size -= remainingBuffer;
_buffer = _emptyList;
_offset = 0;
yield output;
}
}
Expand All @@ -129,22 +164,26 @@ class ChunkedStreamReader<T> {
c.onListen = () => c.addStream(substream()).whenComplete(c.close);
c.onCancel = () async {
while (size > 0) {
if (_buffer.isEmpty) {
assert(_offset <= _buffer.length);
if (_buffer.length == _offset) {
if (!await _input.moveNext()) {
size = 0; // no more data
break;
}
_buffer = _input.current;
_offset = 0;
}

if (size < _buffer.length) {
_buffer = _buffer.sublist(size);
final remainingBuffer = _buffer.length - _offset;
if (remainingBuffer >= size) {
_offset += size;
size = 0;
break;
}

size -= _buffer.length;
size -= remainingBuffer;
_buffer = _emptyList;
_offset = 0;
}
_reading = false;
};
Expand Down
2 changes: 1 addition & 1 deletion pubspec.yaml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
name: async
version: 2.7.0
version: 2.7.1-dev

description: Utility functions and classes related to the 'dart:async' library.
repository: https://github.com/dart-lang/async
Expand Down
102 changes: 102 additions & 0 deletions test/chunked_stream_reader.dart
Original file line number Diff line number Diff line change
Expand Up @@ -377,4 +377,106 @@ void main() {

expect(await collectBytes(stream), hasLength(lessThan(2)));
});

test('readChunk() chunk by chunk (Uint8List)', () async {
final r = ChunkedStreamReader(() async* {
yield Uint8List.fromList([1, 2]);
yield Uint8List.fromList([3, 4, 5]);
yield Uint8List.fromList([6, 7, 8, 9]);
yield Uint8List.fromList([10]);
}());

expect(await r.readChunk(2), equals([1, 2]));
expect(await r.readChunk(3), equals([3, 4, 5]));
expect(await r.readChunk(4), equals([6, 7, 8, 9]));
expect(await r.readChunk(1), equals([10]));
expect(await r.readChunk(1), equals([]));
expect(await r.readChunk(1), equals([]));
await r.cancel(); // check this is okay!
expect(await r.readChunk(1), equals([]));
});

test('readChunk() element by element (Uint8List)', () async {
final r = ChunkedStreamReader(() async* {
yield Uint8List.fromList([1, 2]);
yield Uint8List.fromList([3, 4, 5]);
yield Uint8List.fromList([6, 7, 8, 9]);
yield Uint8List.fromList([10]);
}());

for (var i = 0; i < 10; i++) {
expect(await r.readChunk(1), equals([i + 1]));
}
expect(await r.readChunk(1), equals([]));
expect(await r.readChunk(1), equals([]));
await r.cancel(); // check this is okay!
expect(await r.readChunk(1), equals([]));
});

test('readChunk() exact elements (Uint8List)', () async {
final r = ChunkedStreamReader(() async* {
yield Uint8List.fromList([1, 2]);
yield Uint8List.fromList([3, 4, 5]);
yield Uint8List.fromList([6, 7, 8, 9]);
yield Uint8List.fromList([10]);
}());

expect(await r.readChunk(10), equals([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]));
expect(await r.readChunk(1), equals([]));
expect(await r.readChunk(1), equals([]));
await r.cancel(); // check this is okay!
expect(await r.readChunk(1), equals([]));
});

test('readChunk() past end (Uint8List)', () async {
final r = ChunkedStreamReader(() async* {
yield Uint8List.fromList([1, 2]);
yield Uint8List.fromList([3, 4, 5]);
yield Uint8List.fromList([6, 7, 8, 9]);
yield Uint8List.fromList([10]);
}());

expect(await r.readChunk(20), equals([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]));
expect(await r.readChunk(1), equals([]));
expect(await r.readChunk(1), equals([]));
await r.cancel(); // check this is okay!
expect(await r.readChunk(1), equals([]));
});

test('readChunk() chunks of 2 elements (Uint8List)', () async {
final r = ChunkedStreamReader(() async* {
yield Uint8List.fromList([1, 2]);
yield Uint8List.fromList([3, 4, 5]);
yield Uint8List.fromList([6, 7, 8, 9]);
yield Uint8List.fromList([10]);
}());

expect(await r.readChunk(2), equals([1, 2]));
expect(await r.readChunk(2), equals([3, 4]));
expect(await r.readChunk(2), equals([5, 6]));
expect(await r.readChunk(2), equals([7, 8]));
expect(await r.readChunk(2), equals([9, 10]));
expect(await r.readChunk(1), equals([]));
expect(await r.readChunk(1), equals([]));
await r.cancel(); // check this is okay!
expect(await r.readChunk(1), equals([]));
});

test('readChunk() chunks of 3 elements (Uint8List)', () async {
final r = ChunkedStreamReader(() async* {
yield Uint8List.fromList([1, 2]);
yield Uint8List.fromList([3, 4, 5]);
yield Uint8List.fromList([6, 7, 8, 9]);
yield Uint8List.fromList([10]);
}());

expect(await r.readChunk(3), equals([1, 2, 3]));
expect(await r.readChunk(3), equals([4, 5, 6]));
expect(await r.readChunk(3), equals([7, 8, 9]));
expect(await r.readChunk(3), equals([10]));
expect(await r.readChunk(1), equals([]));
expect(await r.readChunk(1), equals([]));
await r.cancel(); // check this is okay!
expect(await r.readChunk(1), equals([]));
});
}