Skip to content
This repository has been archived by the owner on Oct 17, 2024. It is now read-only.

Commit

Permalink
Faster ChunkedStreamReader. (#182)
Browse files Browse the repository at this point in the history
* Faster ChunkedStreamReader.

 * Add an internal `_offset` to track offset in `_buffer`, reducing the
   number of times we need to create a sublist internally.
 * Specialize to handle cases where `_buffer` is a `Uint8List` by
   creating a `Uint8List.sublistView` when we need to split a chunk.
  • Loading branch information
jonasfj authored Jun 17, 2021
1 parent f1c8882 commit 27e3a92
Show file tree
Hide file tree
Showing 4 changed files with 159 additions and 13 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
## 2.7.0-dev

* Improve performance for `ChunkedStreamReader` by creating fewer internal
sublists and specializing to create views for `Uint8List` chunks.

## 2.7.0

* Add a `Stream.slices()` extension method.
Expand Down
63 changes: 51 additions & 12 deletions lib/src/chunked_stream_reader.dart
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,34 @@ import 'byte_collector.dart' show collectBytes;
/// The read-operations [readChunk] and [readStream] must not be invoked until
/// the future from a previous call has completed.
class ChunkedStreamReader<T> {
/// Iterator over underlying stream.
///
/// The reader requests data from this input whenever requests on the
/// reader cannot be fulfilled with the already fetched data.
final StreamIterator<List<T>> _input;

/// Sentinel value used for [_buffer] when we have no value.
final List<T> _emptyList = const [];

/// Last partially consumed chunk received from [_input].
///
/// Elements up to [_offset] have already been consumed and should not be
/// consumed again.
List<T> _buffer = <T>[];

/// Offset into [_buffer] after data which have already been emitted.
///
/// The offset is between `0` and `_buffer.length`, both inclusive.
/// The data in [_buffer] from [_offset] and forward have not yet been
/// emitted by the chunked stream reader, the data before [_offset] has.
int _offset = 0;

/// Whether a read request is currently being processed.
///
/// Is `true` while a request is in progress.
/// While a read request, like [readChunk] or [readStream], is being processed,
/// no new requests can be made.
/// New read attempts will throw instead.
bool _reading = false;

factory ChunkedStreamReader(Stream<List<T>> stream) =>
Expand Down Expand Up @@ -96,30 +121,40 @@ class ChunkedStreamReader<T> {
final substream = () async* {
// While we have data to read
while (size > 0) {
// Read something into the buffer, if it's empty
if (_buffer.isEmpty) {
// Read something into the buffer, if buffer has been consumed.
assert(_offset <= _buffer.length);
if (_offset == _buffer.length) {
if (!(await _input.moveNext())) {
// Don't attempt to read more data, as there is no more data.
size = 0;
_reading = false;
break;
}
_buffer = _input.current;
_offset = 0;
}

if (_buffer.isNotEmpty) {
if (size < _buffer.length) {
final output = _buffer.sublist(0, size);
_buffer = _buffer.sublist(size);
final remainingBuffer = _buffer.length - _offset;
if (remainingBuffer > 0) {
if (remainingBuffer >= size) {
List<T> output;
if (_buffer is Uint8List) {
output = Uint8List.sublistView(
_buffer as Uint8List, _offset, _offset + size) as List<T>;
} else {
output = _buffer.sublist(_offset, _offset + size);
}
_offset += size;
size = 0;
yield output;
_reading = false;
break;
}

final output = _buffer;
size -= _buffer.length;
final output = _offset == 0 ? _buffer : _buffer.sublist(_offset);
size -= remainingBuffer;
_buffer = _emptyList;
_offset = 0;
yield output;
}
}
Expand All @@ -129,22 +164,26 @@ class ChunkedStreamReader<T> {
c.onListen = () => c.addStream(substream()).whenComplete(c.close);
c.onCancel = () async {
while (size > 0) {
if (_buffer.isEmpty) {
assert(_offset <= _buffer.length);
if (_buffer.length == _offset) {
if (!await _input.moveNext()) {
size = 0; // no more data
break;
}
_buffer = _input.current;
_offset = 0;
}

if (size < _buffer.length) {
_buffer = _buffer.sublist(size);
final remainingBuffer = _buffer.length - _offset;
if (remainingBuffer >= size) {
_offset += size;
size = 0;
break;
}

size -= _buffer.length;
size -= remainingBuffer;
_buffer = _emptyList;
_offset = 0;
}
_reading = false;
};
Expand Down
2 changes: 1 addition & 1 deletion pubspec.yaml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
name: async
version: 2.7.0
version: 2.7.1-dev

description: Utility functions and classes related to the 'dart:async' library.
repository: https://github.com/dart-lang/async
Expand Down
102 changes: 102 additions & 0 deletions test/chunked_stream_reader.dart
Original file line number Diff line number Diff line change
Expand Up @@ -377,4 +377,106 @@ void main() {

expect(await collectBytes(stream), hasLength(lessThan(2)));
});

test('readChunk() chunk by chunk (Uint8List)', () async {
final r = ChunkedStreamReader(() async* {
yield Uint8List.fromList([1, 2]);
yield Uint8List.fromList([3, 4, 5]);
yield Uint8List.fromList([6, 7, 8, 9]);
yield Uint8List.fromList([10]);
}());

expect(await r.readChunk(2), equals([1, 2]));
expect(await r.readChunk(3), equals([3, 4, 5]));
expect(await r.readChunk(4), equals([6, 7, 8, 9]));
expect(await r.readChunk(1), equals([10]));
expect(await r.readChunk(1), equals([]));
expect(await r.readChunk(1), equals([]));
await r.cancel(); // check this is okay!
expect(await r.readChunk(1), equals([]));
});

test('readChunk() element by element (Uint8List)', () async {
final r = ChunkedStreamReader(() async* {
yield Uint8List.fromList([1, 2]);
yield Uint8List.fromList([3, 4, 5]);
yield Uint8List.fromList([6, 7, 8, 9]);
yield Uint8List.fromList([10]);
}());

for (var i = 0; i < 10; i++) {
expect(await r.readChunk(1), equals([i + 1]));
}
expect(await r.readChunk(1), equals([]));
expect(await r.readChunk(1), equals([]));
await r.cancel(); // check this is okay!
expect(await r.readChunk(1), equals([]));
});

test('readChunk() exact elements (Uint8List)', () async {
final r = ChunkedStreamReader(() async* {
yield Uint8List.fromList([1, 2]);
yield Uint8List.fromList([3, 4, 5]);
yield Uint8List.fromList([6, 7, 8, 9]);
yield Uint8List.fromList([10]);
}());

expect(await r.readChunk(10), equals([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]));
expect(await r.readChunk(1), equals([]));
expect(await r.readChunk(1), equals([]));
await r.cancel(); // check this is okay!
expect(await r.readChunk(1), equals([]));
});

test('readChunk() past end (Uint8List)', () async {
final r = ChunkedStreamReader(() async* {
yield Uint8List.fromList([1, 2]);
yield Uint8List.fromList([3, 4, 5]);
yield Uint8List.fromList([6, 7, 8, 9]);
yield Uint8List.fromList([10]);
}());

expect(await r.readChunk(20), equals([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]));
expect(await r.readChunk(1), equals([]));
expect(await r.readChunk(1), equals([]));
await r.cancel(); // check this is okay!
expect(await r.readChunk(1), equals([]));
});

test('readChunk() chunks of 2 elements (Uint8List)', () async {
final r = ChunkedStreamReader(() async* {
yield Uint8List.fromList([1, 2]);
yield Uint8List.fromList([3, 4, 5]);
yield Uint8List.fromList([6, 7, 8, 9]);
yield Uint8List.fromList([10]);
}());

expect(await r.readChunk(2), equals([1, 2]));
expect(await r.readChunk(2), equals([3, 4]));
expect(await r.readChunk(2), equals([5, 6]));
expect(await r.readChunk(2), equals([7, 8]));
expect(await r.readChunk(2), equals([9, 10]));
expect(await r.readChunk(1), equals([]));
expect(await r.readChunk(1), equals([]));
await r.cancel(); // check this is okay!
expect(await r.readChunk(1), equals([]));
});

test('readChunk() chunks of 3 elements (Uint8List)', () async {
final r = ChunkedStreamReader(() async* {
yield Uint8List.fromList([1, 2]);
yield Uint8List.fromList([3, 4, 5]);
yield Uint8List.fromList([6, 7, 8, 9]);
yield Uint8List.fromList([10]);
}());

expect(await r.readChunk(3), equals([1, 2, 3]));
expect(await r.readChunk(3), equals([4, 5, 6]));
expect(await r.readChunk(3), equals([7, 8, 9]));
expect(await r.readChunk(3), equals([10]));
expect(await r.readChunk(1), equals([]));
expect(await r.readChunk(1), equals([]));
await r.cancel(); // check this is okay!
expect(await r.readChunk(1), equals([]));
});
}

0 comments on commit 27e3a92

Please sign in to comment.