Skip to content
This repository has been archived by the owner on Oct 17, 2024. It is now read-only.

Add ChunkedStreamReader #161

Merged
merged 6 commits into from
Apr 16, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
## 2.5.1-dev
## 2.6.0

* Added `ChunkedStreamReader` for reading _chunked streams_ without managing
buffers.

## 2.5.0

Expand Down
1 change: 1 addition & 0 deletions lib/async.dart
Original file line number Diff line number Diff line change
Expand Up @@ -37,3 +37,4 @@ export 'src/stream_subscription_transformer.dart';
export 'src/stream_zip.dart';
export 'src/subscription_stream.dart';
export 'src/typed_stream_transformer.dart';
export 'src/chunked_stream_reader.dart';
177 changes: 177 additions & 0 deletions lib/src/chunked_stream_reader.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
// Copyright (c) 2021, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.

import 'dart:async';
import 'dart:typed_data';

import 'byte_collector.dart' show collectBytes;

/// Utility class for reading elements from a _chunked stream_.
///
/// A _chunked stream_ is a stream where each event is a chunk of elements.
/// Byte-streams with the type `Stream<List<int>>` is common of example of this.
/// As illustrated in the example below, this utility class makes it easy to
/// read a _chunked stream_ using custom chunk sizes and sub-stream sizes,
/// without managing partially read chunks.
///
/// ```dart
/// final r = ChunkedStreamReader(File('myfile.txt').openRead());
/// try {
/// // Read the first 4 bytes
/// final firstBytes = await r.readChunk(4);
/// if (firstBytes.length < 4) {
/// throw Exception('myfile.txt has less than 4 bytes');
/// }
///
/// // Read next 8 kilobytes as a substream
/// Stream<List<int>> substream = r.readStream(8 * 1024);
///
/// ...
/// } finally {
/// // We always cancel the ChunkedStreamReader, this ensures the underlying
/// // stream is cancelled.
/// r.cancel();
/// }
/// ```
///
/// The read-operations [readChunk] and [readStream] must not be invoked until
/// the future from a previous call has completed.
class ChunkedStreamReader<T> {
final StreamIterator<List<T>> _input;
final List<T> _emptyList = const [];
List<T> _buffer = <T>[];
bool _reading = false;

factory ChunkedStreamReader(Stream<List<T>> stream) =>
ChunkedStreamReader._(StreamIterator(stream));

ChunkedStreamReader._(this._input);

/// Read next [size] elements from _chunked stream_, buffering to create a
/// chunk with [size] elements.
///
/// This will read _chunks_ from the underlying _chunked stream_ until [size]
/// elements have been buffered, or end-of-stream, then it returns the first
/// [size] buffered elements.
///
/// If end-of-stream is encountered before [size] elements is read, this
/// returns a list with fewer than [size] elements (indicating end-of-stream).
///
/// If the underlying stream throws, the stream is cancelled, the exception is
/// propogated and further read operations will fail.
///
/// Throws, if another read operation is on-going.
Future<List<T>> readChunk(int size) async {
final result = <T>[];
await for (final chunk in readStream(size)) {
result.addAll(chunk);
}
return result;
}

/// Read next [size] elements from _chunked stream_ as a sub-stream.
///
/// This will pass-through _chunks_ from the underlying _chunked stream_ until
/// [size] elements have been returned, or end-of-stream has been encountered.
///
/// If end-of-stream is encountered before [size] elements is read, this
/// returns a list with fewer than [size] elements (indicating end-of-stream).
///
/// If the underlying stream throws, the stream is cancelled, the exception is
/// propogated and further read operations will fail.
///
/// If the sub-stream returned from [readStream] is cancelled the remaining
/// unread elements up-to [size] are drained, allowing subsequent
/// read-operations to proceed after cancellation.
///
/// Throws, if another read-operation is on-going.
Stream<List<T>> readStream(int size) {
RangeError.checkNotNegative(size, 'size');
if (_reading) {
throw StateError('Concurrent read operations are not allowed!');
}
_reading = true;

final substream = () async* {
// While we have data to read
while (size > 0) {
// Read something into the buffer, if it's empty
if (_buffer.isEmpty) {
if (!(await _input.moveNext())) {
// Don't attempt to read more data, as there is no more data.
size = 0;
_reading = false;
break;
}
_buffer = _input.current;
}

if (_buffer.isNotEmpty) {
jonasfj marked this conversation as resolved.
Show resolved Hide resolved
if (size < _buffer.length) {
final output = _buffer.sublist(0, size);
_buffer = _buffer.sublist(size);
size = 0;
yield output;
_reading = false;
break;
}

final output = _buffer;
size -= _buffer.length;
_buffer = _emptyList;
yield output;
}
}
};

final c = StreamController<List<T>>();
c.onListen = () => c.addStream(substream()).whenComplete(c.close);
c.onCancel = () async {
while (size > 0) {
jonasfj marked this conversation as resolved.
Show resolved Hide resolved
if (_buffer.isEmpty) {
if (!await _input.moveNext()) {
size = 0; // no more data
break;
}
_buffer = _input.current;
}

if (size < _buffer.length) {
_buffer = _buffer.sublist(size);
size = 0;
break;
}

size -= _buffer.length;
_buffer = _emptyList;
}
_reading = false;
};

return c.stream;
}

/// Cancel the underlying _chunked stream_.
///
/// If a future from [readChunk] or [readStream] is still pending then
/// [cancel] behaves as if the underlying stream ended early. That is a future
/// from [readChunk] may return a partial chunk smaller than the request size.
///
/// It is always safe to call [cancel], even if the underlying stream was read
/// to completion.
///
/// It can be a good idea to call [cancel] in a `finally`-block when done
/// using the [ChunkedStreamReader], this mitigates risk of leaking resources.
Future<void> cancel() async => await _input.cancel();
jonasfj marked this conversation as resolved.
Show resolved Hide resolved
}

/// Extensions for using [ChunkedStreamReader] with byte-streams.
extension ChunkedStreamReaderByteStreamExt on ChunkedStreamReader<int> {
/// Read bytes into a [Uint8List].
///
/// This does the same as [readChunk], except it uses [collectBytes] to create
/// a [Uint8List], which offers better performance.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And because you only use sublist internally, which preserves typed_data lists, it's going to be Uint8List all the way down. Nice!

Future<Uint8List> readBytes(int size) async =>
await collectBytes(readStream(size));
}
2 changes: 1 addition & 1 deletion pubspec.yaml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
name: async
version: 2.5.1-dev
version: 2.6.0

description: Utility functions and classes related to the 'dart:async' library.
repository: https://github.com/dart-lang/async
Expand Down
Loading