Skip to content
This repository was archived by the owner on Oct 17, 2024. It is now read-only.

Import ChunkedStreamIterator #160

Closed
wants to merge 8 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions lib/async.dart
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ export 'src/async_cache.dart';
export 'src/async_memoizer.dart';
export 'src/byte_collector.dart';
export 'src/cancelable_operation.dart';
export 'src/chunked_stream_iterator.dart';
export 'src/delegate/event_sink.dart';
export 'src/delegate/future.dart';
export 'src/delegate/sink.dart';
Expand Down
233 changes: 233 additions & 0 deletions lib/src/chunked_stream_iterator.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,233 @@
// Copyright (c) 2021, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.

import 'dart:async';
import 'dart:typed_data';

import 'read_chunked_stream.dart';

/// Helper for consuming the items of a chunked stream a few at a time.
///
/// A _chunked stream_ is a stream whose events each carry a chunk of items
/// instead of a single item. A byte stream of type `Stream<List<int>>` is a
/// common example: every event delivers a `List<int>` holding several bytes.
///
/// Note: methods on this class must not be called concurrently.
abstract class ChunkedStreamIterator<T> {
  /// Creates an iterator over the items delivered by [stream].
  factory ChunkedStreamIterator(Stream<List<T>> stream) =>
      _ChunkedStreamIterator<T>(stream);

  /// Reads the next [size] elements and returns them as a list.
  ///
  /// The returned list holds fewer than [size] elements if the stream ends
  /// before [size] elements have been read.
  ///
  /// If the stream reports an error before [size] elements have been read,
  /// that error is thrown.
  ///
  /// Throws if [read] or [substream] is called before a previous
  /// read-operation has completed.
  Future<List<T>> read(int size);

  /// Returns a sub-[Stream] carrying the next [size] elements.
  ///
  /// The sub-[Stream] delivers the next [size] elements in the same order in
  /// which they occur in the stream this iterator was created from.
  ///
  /// Throws if [read] or [substream] is called before a previous
  /// read-operation has completed.
  ///
  /// ```dart
  /// final s = ChunkedStreamIterator(_chunkedStream([
  ///   ['a', 'b', 'c'],
  ///   ['1', '2'],
  /// ]));
  /// expect(await s.read(1), equals(['a']));
  ///
  /// // creates a substream from the chunks holding the
  /// // next three elements (['b', 'c'], ['1'])
  /// final i = StreamIterator(s.substream(3));
  /// expect(await i.moveNext(), isTrue);
  /// expect(await i.current, equals(['b', 'c']));
  /// expect(await i.moveNext(), isTrue);
  /// expect(await i.current, equals(['1']));
  ///
  /// // Since the substream has been read till the end, we can continue
  /// // reading from the initial stream.
  /// expect(await s.read(1), equals(['2']));
  /// ```
  ///
  /// The sub-[Stream] carries fewer than [size] elements if the underlying
  /// stream ends before [size] elements are available.
  ///
  /// When the sub-[Stream] is cancelled, its remaining elements are drained.
  Stream<List<T>> substream(int size);

  /// Cancels this iterator, and the underlying stream subscription, early.
  ///
  /// Call [cancel] when you stop reading before the end of the stream, so
  /// that the stream is properly closed.
  Future<void> cancel();
}

/// Default [ChunkedStreamIterator] implementation, usable for any item type.
class _ChunkedStreamIterator<T> implements ChunkedStreamIterator<T> {
  /// Shared empty list that [_buffered] is reset to whenever nothing is
  /// buffered.
  ///
  /// Never add elements to [_buffered] in place: doing so could end up
  /// modifying this list.
  final List<T> _emptyList = [];

  /// Iterator over the chunks of the stream passed to the constructor.
  final StreamIterator<List<T>> _iterator;

  /// Items already received from [_iterator] that the user has not read yet.
  late List<T> _buffered = _emptyList;

  /// Number of elements the current substream still has to deliver.
  int _toRead = 0;

  _ChunkedStreamIterator(Stream<List<T>> stream)
      : _iterator = StreamIterator(stream);

  /// Reads the next [size] elements and returns them as a list.
  ///
  /// The returned list holds fewer than [size] elements if the end of the
  /// stream is encountered before [size] elements have been read.
  ///
  /// If an error is encountered before [size] elements have been read, that
  /// error is thrown.
  @override
  Future<List<T>> read(int size) async {
    // Kept `async` so that errors thrown synchronously by [substream] are
    // reported through the returned future.
    final elements = await readChunkedStream(substream(size));
    return elements;
  }

  /// Cancels this iterator, and the underlying stream subscription, early.
  ///
  /// Call [cancel] when you stop reading before the end of the stream, so
  /// that the stream is properly closed.
  @override
  Future<void> cancel() async {
    await _iterator.cancel();
  }

  /// Returns a sub-[Stream] carrying the next [size] elements.
  ///
  /// The sub-[Stream] delivers the next [size] elements in the same order in
  /// which they occur in the stream this iterator was created from.
  ///
  /// Throws an [ArgumentError] if [size] is negative, and a [StateError] if a
  /// previously requested sub-[Stream] still has elements left to deliver.
  ///
  /// ```dart
  /// final s = ChunkedStreamIterator(_chunkedStream([
  ///   ['a', 'b', 'c'],
  ///   ['1', '2'],
  /// ]));
  /// expect(await s.read(1), equals(['a']));
  ///
  /// // creates a substream from the chunks holding the
  /// // next three elements (['b', 'c'], ['1'])
  /// final i = StreamIterator(s.substream(3));
  /// expect(await i.moveNext(), isTrue);
  /// expect(await i.current, equals(['b', 'c']));
  /// expect(await i.moveNext(), isTrue);
  /// expect(await i.current, equals(['1']));
  ///
  /// // Since the substream has been read till the end, we can continue
  /// // reading from the initial stream.
  /// expect(await s.read(1), equals(['2']));
  /// ```
  ///
  /// The sub-[Stream] carries fewer than [size] elements if the underlying
  /// stream ends before [size] elements are available.
  ///
  /// When the sub-[Stream] is cancelled, its remaining elements are drained.
  @override
  Stream<List<T>> substream(int size) {
    if (size < 0) {
      throw ArgumentError.value(size, 'size', 'must be non-negative');
    }
    if (_toRead > 0) {
      throw StateError('Concurrent invocations are not supported!');
    }
    _toRead = size;

    // Whatever is left of the substream when its listener goes away is
    // drained by running the generator again over the remaining count.
    final controller = StreamController<List<T>>()
      ..onCancel = () async {
        await _substream().drain();
      };

    // Forward the generated chunks, then close the controller so that
    // await-for loops over the returned stream complete.
    controller.addStream(_substream()).whenComplete(() {
      controller.close();
    });

    return controller.stream;
  }

  /// Generates the chunks of the current substream until [_toRead] elements
  /// have been produced or [_iterator] is exhausted.
  Stream<List<T>> _substream() async* {
    while (_toRead > 0) {
      // Fetch the next chunk from the source once the buffer has run dry.
      if (_buffered.isEmpty) {
        final hasNext = await _iterator.moveNext();
        if (!hasNext) break;
        _buffered = _iterator.current;
      }

      // All bookkeeping is done before the `yield` below.
      final pending = _buffered;
      final List<T> chunk;
      if (pending.length <= _toRead) {
        // The whole buffered chunk belongs to this substream.
        chunk = pending;
        _toRead -= pending.length;
        _buffered = _emptyList;
      } else {
        // Only a prefix belongs to this substream; keep the rest buffered
        // for the next read.
        chunk = pending.sublist(0, _toRead);
        _buffered = pending.sublist(_toRead);
        _toRead = 0;
      }

      yield chunk;
    }

    // Needed when the requested size exceeds the number of elements that
    // [_iterator] had left.
    _toRead = 0;
  }
}

/// Byte-stream helpers for a [ChunkedStreamIterator] created over a
/// `Stream<List<int>>`.
extension ChunkedStreamIteratorByteStreamExt on ChunkedStreamIterator<int> {
  /// Reads the next [size] bytes into a [Uint8List].
  ///
  /// Behaves like [read], but collects the bytes with [readByteStream] to
  /// produce a [Uint8List], which offers better performance.
  Future<Uint8List> readBytes(int size) async {
    final bytes = await readByteStream(substream(size));
    return bytes;
  }
}
121 changes: 121 additions & 0 deletions lib/src/read_chunked_stream.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
// Copyright (c) 2021, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.

import 'dart:async' show Stream, Future;
import 'dart:typed_data';

/// Reads all chunks from [input] and returns a single list holding the items
/// of every chunk, in order.
///
/// If [maxSize] is given and the number of items read exceeds it, reading
/// stops and [MaximumSizeExceeded] is thrown.
///
/// Throws an [ArgumentError] if [maxSize] is negative.
///
/// **Example**
/// ```dart
/// import 'dart:io';
///
/// Future<List<int>> readFile(String filePath) async {
///   Stream<List<int>> fileStream = File(filePath).openRead();
///   List<int> contents = await readChunkedStream(fileStream);
///   return contents;
/// }
/// ```
///
/// If reading a byte stream of type [Stream<List<int>>] consider using
/// [readByteStream] instead.
Future<List<T>> readChunkedStream<T>(
  Stream<List<T>> input, {
  int? maxSize,
}) async {
  if (maxSize != null && maxSize < 0) {
    // The second positional argument of ArgumentError.value is the parameter
    // name; the explanation goes in the third. Zero is accepted, so the
    // constraint is "non-negative".
    throw ArgumentError.value(maxSize, 'maxSize', 'must be non-negative');
  }

  final result = <T>[];
  await for (final chunk in input) {
    result.addAll(chunk);
    // Checked after adding, so the limit is only reported once it has
    // actually been exceeded.
    if (maxSize != null && result.length > maxSize) {
      throw MaximumSizeExceeded(maxSize);
    }
  }
  return result;
}

/// Reads all bytes from [input] and returns a [Uint8List] holding every byte
/// from [input], in order.
///
/// If [maxSize] is given and the number of bytes read exceeds it, reading
/// stops and [MaximumSizeExceeded] is thrown.
///
/// Throws an [ArgumentError] if [maxSize] is negative.
///
/// **Example**
/// ```dart
/// import 'dart:io';
///
/// Future<Uint8List> readFile(String filePath) async {
///   Stream<List<int>> fileStream = File(filePath).openRead();
///   Uint8List contents = await readByteStream(fileStream);
///   return contents;
/// }
/// ```
///
/// This method does the same as [readChunkedStream], except it returns a
/// [Uint8List] which can be faster when working with bytes.
///
/// **Remark** The returned [Uint8List] might be a view on a
/// larger [ByteBuffer]. Do not use [Uint8List.buffer] without taking into
/// account [Uint8List.lengthInBytes] and [Uint8List.offsetInBytes].
/// Doing so is never correct, but in many common cases an instance of
/// [Uint8List] will not be a view on a larger buffer, so such mistakes can go
/// undetected. Consider using [Uint8List.sublistView], to create subviews if
/// necessary.
Future<Uint8List> readByteStream(
  Stream<List<int>> input, {
  int? maxSize,
}) async {
  if (maxSize != null && maxSize < 0) {
    // The second positional argument of ArgumentError.value is the parameter
    // name; the explanation goes in the third. Zero is accepted, so the
    // constraint is "non-negative".
    throw ArgumentError.value(maxSize, 'maxSize', 'must be non-negative');
  }

  final result = BytesBuilder();
  await for (final chunk in input) {
    result.add(chunk);
    // Checked after adding, so the limit is only reported once it has
    // actually been exceeded.
    if (maxSize != null && result.length > maxSize) {
      throw MaximumSizeExceeded(maxSize);
    }
  }
  return result.takeBytes();
}

/// Creates a _chunked stream_ limited to the first [maxSize] items from
/// [input].
///
/// If [maxSize] is omitted, every chunk of [input] is forwarded unchanged.
///
/// If [input] contains more than [maxSize] items, the returned stream first
/// emits the items that still fit and then emits a [MaximumSizeExceeded]
/// error.
///
/// Because this is an `async*` function, the [ArgumentError] for a negative
/// [maxSize] is also delivered as an error on the returned stream rather than
/// thrown synchronously by the call.
Stream<List<T>> limitChunkedStream<T>(
  Stream<List<T>> input, {
  int? maxSize,
}) async* {
  if (maxSize != null && maxSize < 0) {
    // The second positional argument of ArgumentError.value is the parameter
    // name; the explanation goes in the third. Zero is accepted, so the
    // constraint is "non-negative".
    throw ArgumentError.value(maxSize, 'maxSize', 'must be non-negative');
  }

  var count = 0;
  await for (final chunk in input) {
    if (maxSize != null && maxSize - count < chunk.length) {
      // Emit the part of the chunk that still fits (possibly an empty list
      // when the limit has already been reached), then report the overflow.
      yield chunk.sublist(0, maxSize - count);
      throw MaximumSizeExceeded(maxSize);
    }
    count += chunk.length;
    yield chunk;
  }
}

/// Signals that a _chunked stream_ delivered more items than the `maxSize`
/// limit it was read with.
///
/// This is typically thrown by [readChunkedStream] or [readByteStream].
class MaximumSizeExceeded implements Exception {
  /// Creates an exception reporting that the limit [maxSize] was exceeded.
  const MaximumSizeExceeded(this.maxSize);

  /// The limit that was exceeded.
  final int maxSize;

  @override
  String toString() {
    return 'Input stream exceeded the maxSize: $maxSize';
  }
}
Loading