This repository has been archived by the owner on Oct 17, 2024. It is now read-only.

Add ChunkedStreamReader #161

Merged: 6 commits, Apr 16, 2021

@jonasfj jonasfj commented Mar 29, 2021

This adds a ChunkedStreamReader as a replacement for ChunkedStreamIterator from package:chunked_stream.

Motivation: We would like to use ChunkedStreamIterator in the Dart SDK and avoid a dependency on package:chunked_stream. It has also proven very useful for consuming a chunked byte stream in pull fashion :)

I refactored the code a bit: mostly cleanup, better documentation comments, and renaming the class so that code importing both package:async and package:chunked_stream won't get a conflict. The test cases are the same to ensure easy migration.


Credits @walnutdust for the original ChunkedStreamIterator in package:chunked_stream.

@jonasfj jonasfj requested a review from lrhn March 29, 2021 12:02
@google-cla google-cla bot added the cla: yes label Mar 29, 2021
jonasfj commented Mar 29, 2021

Note: I opted not to have ChunkedStreamReader take a makeList in the constructor, and instead added ChunkedStreamReaderByteStreamExt. It's mostly useful for byte streams, and when using it you really want a Uint8List rather than a List<int> as the return value.

@jonasfj jonasfj requested a review from natebosch March 29, 2021 14:37
@jonasfj jonasfj mentioned this pull request Mar 30, 2021
5 tasks
jonasfj commented Mar 30, 2021

Now proposes release of 2.6.0

@natebosch

Can you share any of the real world use cases you have in mind for this?

jonasfj commented Mar 30, 2021

> Can you share any of the real world use cases you have in mind for this?

https://pub.dev/packages/tar

Uses this under the hood. It makes it relatively easy to read a tar stream, because you can do chunkedStreamReader.readChunk(512), then decode the header, and then do chunkedStreamReader.readStream(header.size).

So the implementation is greatly simplified, as you don't have to manage buffering. Especially, since a tar reader will want to expose the content of entries as substreams.

It often comes in handy if you want to consume a byte-stream without writing the entire stream to a buffer.
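
As a rough illustration of the header-then-payload pattern described above, here is a minimal sketch. It assumes a made-up record format (a 1-byte length header followed by that many payload bytes) rather than real tar, where headers are 512 bytes, and `readRecords` is a hypothetical helper name. It uses `readBytes` for brevity; `readStream` could be used instead to expose each payload as a substream.

```dart
import 'dart:typed_data';

import 'package:async/async.dart';

/// Reads records from a hypothetical format: a 1-byte length header
/// followed by that many payload bytes. This is not the tar format.
Future<List<Uint8List>> readRecords(Stream<List<int>> source) async {
  final reader = ChunkedStreamReader<int>(source);
  final records = <Uint8List>[];
  try {
    while (true) {
      // Ask for exactly one header byte, regardless of chunk boundaries.
      final header = await reader.readBytes(1);
      if (header.isEmpty) break; // End of stream.

      // Read the payload; fewer bytes come back only if the stream ends.
      final length = header[0];
      records.add(await reader.readBytes(length));
    }
  } finally {
    await reader.cancel();
  }
  return records;
}

Future<void> main() async {
  // Chunk boundaries deliberately do not line up with record boundaries.
  final source = Stream<List<int>>.fromIterable([
    [2, 10],
    [11, 3, 20, 21],
    [22],
  ]);
  final records = await readRecords(source);
  print(records); // Two records: [10, 11] and [20, 21, 22].
}
```

The caller never sees where one incoming chunk ends and the next begins, which is the buffering work the reader takes over.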

@lrhn lrhn left a comment

Cool stuff!

final c = StreamController<List<T>>();

c.addStream(() async* {

Contributor:

I'd love to try to optimize this.
It seems OK for size, but the async* function could probably just be an async function where yield e is replaced by c.add(e). Don't worry about it for now if it works.

Contributor Author:

Then we get no back-pressure. We'd also have to hook onPause / onResume.

Yes, it could probably be a tiny bit faster, but I'm generally very wary of using StreamController.add because it doesn't return a future I can await.
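
To make the back-pressure point concrete, here is a small sketch comparing the two styles. All names (`generated`, `pushed`, `producedWhilePaused`, `total`) are hypothetical and not part of the PR. It pauses the listener on the first event and counts how many values each producer generated anyway. How far the generator runs ahead is an implementation detail, so the sketch only relies on it stopping well short of the total.

```dart
import 'dart:async';

const total = 100;

/// Generator version: execution is suspended at `yield` while the
/// listener is paused, so values are produced on demand.
Stream<int> generated(List<int> log) async* {
  for (var i = 0; i < total; i++) {
    log.add(i);
    yield i;
  }
}

/// add() version without onPause/onResume hooks: every value is produced
/// up front and buffered inside the controller.
Stream<int> pushed(List<int> log) {
  final controller = StreamController<int>();
  controller.onListen = () {
    for (var i = 0; i < total; i++) {
      log.add(i);
      controller.add(i);
    }
    controller.close();
  };
  return controller.stream;
}

/// Pauses on the first event, waits briefly, and reports how many values
/// the producer generated in the meantime.
Future<int> producedWhilePaused(
    Stream<int> Function(List<int>) make) async {
  final log = <int>[];
  late final StreamSubscription<int> subscription;
  subscription = make(log).listen((_) => subscription.pause());
  await Future<void>.delayed(const Duration(milliseconds: 50));
  final produced = log.length;
  await subscription.cancel();
  return produced;
}

Future<void> main() async {
  print('async*: ${await producedWhilePaused(generated)} of $total');
  print('add():  ${await producedWhilePaused(pushed)} of $total');
}
```

The add() variant produces all 100 values despite the pause, while the generator stops shortly after the pause takes effect.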

/// Read bytes into a [Uint8List].
///
/// This does the same as [readChunk], except it uses [collectBytes] to create
/// a [Uint8List], which offers better performance.

Contributor:

And because you only use sublist internally, which preserves typed_data lists, it's going to be Uint8List all the way down. Nice!
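
A quick way to see the property mentioned here, using only dart:typed_data (the variable names are illustrative):

```dart
import 'dart:typed_data';

void main() {
  // The static type is List<int>, but the object is a Uint8List.
  final List<int> chunk = Uint8List.fromList([1, 2, 3, 4, 5]);

  // sublist() on a typed-data list returns the same kind of list.
  final head = chunk.sublist(0, 2);
  final tail = chunk.sublist(2);

  print(head is Uint8List); // true
  print(tail is Uint8List); // true
  print(tail); // [3, 4, 5]
}
```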

@simolus3

I tried to integrate this into package:tar and ran into some unexpected errors about concurrent reads:

import 'dart:typed_data';
import 'package:async/async.dart';

Future<void> main() async {
  final block = Uint8List(512);
  final stream = Stream.fromIterable(Iterable.generate(10, (_) => block));

  final reader = ChunkedStreamReader(stream);
  while (true) {
    final chunk = await reader.readBytes(1024);

    if (chunk.isEmpty) break;
  }
}

This fails with Bad state: Concurrent read operations are not allowed!, but it appears to me that there aren't any concurrent reads. Am I overlooking something?

@jonasfj jonasfj merged commit 376c0fe into dart-archive:master Apr 16, 2021
@jonasfj jonasfj deleted the chunk-stream-reader branch April 16, 2021 20:22
nex3 commented Apr 20, 2021

I know this already landed, but since it hasn't been released yet, something that may be worth considering: could this be re-imagined as a set of extension methods on StreamQueue<List<T>>? The StreamQueue class already provides the underlying semantics of pull-based access to a stream, with the added benefit of well-defined behavior for calling multiple methods synchronously and nice integration with the test package. Consider the following potential API:

extension ChunkedStreamQueueExtension<T> on StreamQueue<List<T>> {
  Future<List<T>> readChunk(int size);

  Stream<List<T>> readChunkedSubstream(int size);
}

I think withTransaction would make it possible to implement the behavior quite tersely as well.

jonasfj commented Apr 23, 2021

@nex3, hmm, an extension method would have been nice, but I think I'll need a place to cache a partially consumed chunk.

It might have been possible if reader.readChunk(5) were allowed to return more than 5 elements, but it is not. That guarantee is what makes it useful for decoding byte streams like tar, where an integer says how long the next file is.


If the stream has the type Stream<List<int>> then reader.readChunk(1) will return a List<int> with length one, even when the underlying Stream<List<int>> returns everything in chunks of 4kb or 8kb (as many file systems would).

Example

Stream<List<int>> makeStreamWithChunksOfBytes() async* {
  // Byte chunk 1:
  yield [1];

  // Byte chunk 2:
  yield [2, 3, 4, 5];

  // Byte chunk 3:
  yield [6];
}

final reader = ChunkedStreamReader(makeStreamWithChunksOfBytes());

// Read the first two bytes
expect(await reader.readChunk(2), equals([1,2]));

// At this point "Byte chunk 1" and "Byte chunk 2" have been read from the underlying stream,
// and all the elements from "Byte chunk 1" have been returned to the user.
// BUT: elements 3, 4, 5 from "Byte chunk 2" are now cached inside [reader].

// Read the next 100 bytes (or in this case until the end of the stream)
expect(await reader.readChunk(100), equals([3,4,5,6]));

It's mostly useful when reading byte streams.


I don't see how an extension on StreamQueue can cache the unconsumed parts of a chunk. I would need to be able to push something back on top of the queue. I could use StreamQueue.peek and modify the top element, but Uint8List doesn't allow that.
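
The need for a cache can be seen in a stripped-down sketch of such a reader. `TinyChunkReader` is a hypothetical class written for illustration and is not the implementation merged in this PR. It keeps the unconsumed tail of the current chunk in a field, which is exactly the state an extension method has nowhere to store.

```dart
import 'dart:async';
import 'dart:math';

/// Hypothetical minimal reader, for illustration only.
class TinyChunkReader<T> {
  final StreamIterator<List<T>> _input;

  /// Elements already pulled from the stream but not yet returned.
  List<T> _leftover = <T>[];

  TinyChunkReader(Stream<List<T>> stream) : _input = StreamIterator(stream);

  /// Returns exactly [size] elements, or fewer if the stream ends first.
  Future<List<T>> readChunk(int size) async {
    final result = <T>[];
    while (result.length < size) {
      if (_leftover.isEmpty) {
        if (!await _input.moveNext()) break; // Stream exhausted.
        _leftover = _input.current;
      }
      final take = min(size - result.length, _leftover.length);
      result.addAll(_leftover.sublist(0, take));
      // Keep whatever was not consumed for the next call.
      _leftover = _leftover.sublist(take);
    }
    return result;
  }
}

Future<void> main() async {
  final reader = TinyChunkReader<int>(Stream.fromIterable([
    [1],
    [2, 3, 4, 5],
    [6],
  ]));
  print(await reader.readChunk(2)); // [1, 2]
  print(await reader.readChunk(100)); // [3, 4, 5, 6]
}
```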

nex3 commented Apr 23, 2021

Yeah, I see what you mean—it's not implementable as an extension method on the class's public API. I think you could do it on the private API, though:

extension ChunkedStreamQueueExt<T> on StreamQueue<List<T>> {
  Stream<List<T>> readChunkedSubstream(int size) {
    if (size < 0) throw RangeError.range(size, 0, null, 'size');
    if (!_isClosed) {
      var request = _ReadChunkedSubstreamRequest<T>(size);
      _addRequest(request);
      return request.stream;
    }
    throw _failClosed();
  }
}

class _ReadChunkedSubstreamRequest<T> extends _EventRequest<List<T>> {
  Stream<List<T>> get stream => _controller.stream;
  final _controller = StreamController<List<T>>(sync: true);

  final int _size;

  int _consumed = 0;

  _ReadChunkedSubstreamRequest(this._size);

  @override
  bool update(QueueList<Result<List<T>>> events, bool isDone) {
    while (events.isNotEmpty) {
      var event = events.removeFirst();
      if (event.isError) {
        event.addTo(_controller);
        continue;
      }

      var data = event.asValue!.value;
      if (_consumed + data.length < _size) {
        _consumed += data.length;
        _controller.add(data);
      } else {
        if (_consumed + data.length > _size) {
          // Push the unconsumed tail back before truncating `data`.
          var remaining = _size - _consumed;
          events.addFirst(Result.value(data.sublist(remaining)));
          data = data.sublist(0, remaining);
        }
        _controller.add(data);
        _controller.close();
        return true;
      }
    }

    if (isDone) _controller.close();
    return isDone;
  }
}

(Warning: I didn't test this!)

lrhn commented Apr 26, 2021

The StreamQueue was not built for extensibility, so the internal API isn't particularly user friendly.
It might be worth refactoring it to provide an interface that commands can use to access the queue, and then you can add arbitrary externally defined commands using callbacks.
Or maybe just add something like void nextCommand(bool Function(Queue<T>, bool isDone) update);. The callback is called when it's its time to act, and then every time the underlying queue changes, until it calls close().
(The _EventRequest class is a single-method interface, so it should just be a function if exposed in a public API.)

jonasfj commented Apr 27, 2021

> I think you could do it on the private API,

I suppose this might be possible. But I'm not sure it's desirable.

Using the internal API is not ideal, nor the worst. Certainly we could, as @lrhn points out, make a nextCommand method in the public API. This might be a powerful tool for advanced use-cases.

But for most users who want to read a byte-stream block by block, you really don't want other methods available. Those are just footguns.

ChunkedStreamReader is intended for users who are reading a byte stream and want the chunk boundaries in the underlying byte stream abstracted away. All the methods on StreamQueue operate on chunk boundaries of the underlying byte-stream, this is a different abstraction level -- I don't think mixing them will do users any favors.

When using ChunkedStreamReader, the public methods on StreamQueue do not make sense; mixing the two would almost always be a footgun.


Example:

  • StreamQueue.lookAhead(int count) → Future<List<T>> is a good idea, lookAhead(5) means:
    • "read 5 events from underlying stream (without removing them from the queue)"
    • But if reading from a chunked stream (like a byte-stream) it means: "read 5 chunks of arbitrary size from underlying stream (without removing them from the queue)".
  • If we were to implement ChunkedStreamReader.lookAhead(int size) → Future<List<T>> the desired semantics would be:
    • "read 5 elements from underlying chunked stream regardless of how many chunks you have to consume, and do not remove these 5 elements from the stream".
    • In the case of a byte stream it would mean: show me the next 5 bytes from the stream without removing them from the stream.
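
The event-level semantics of StreamQueue.lookAhead can be checked with a short sketch (the chunk values are arbitrary):

```dart
import 'package:async/async.dart';

Future<void> main() async {
  final queue = StreamQueue<List<int>>(Stream.fromIterable([
    [1],
    [2, 3, 4, 5],
    [6],
  ]));

  // lookAhead counts stream events (chunks), not elements.
  final chunks = await queue.lookAhead(2);
  print(chunks); // [[1], [2, 3, 4, 5]]

  // Two chunks happened to contain five elements in total.
  final elements = chunks.expand((chunk) => chunk).length;
  print(elements); // 5

  await queue.cancel();
}
```

Asking for 2 here yields however many elements the first two chunks contain, which is why an element-based lookAhead would need to live at a different abstraction level.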

Building ChunkedStreamReader as an extension on StreamQueue would:

  • make it hard to add ChunkedStreamReader.lookAhead in the future, and,
  • expose methods like StreamQueue.lookAhead, StreamQueue.skip, StreamQueue.take, StreamQueue.eventsDispatched, etc. which consumers of a chunked stream have no business using.

Essentially, anyone using ChunkedStreamReader should never care about the individual events from the underlying stream. Any method that gives information about underlying stream events is likely an undesired footgun.

nex3 commented Apr 27, 2021

Yeah, I see what you mean. In that case, maybe it should be a wrapper rather than an extension, so you can get the nice request-sequencing behavior without having methods at two different abstraction levels. That's something that could probably be added to the existing API after a release, though.

mosuem pushed a commit to dart-lang/core that referenced this pull request Oct 14, 2024
* Added ChunkedStreamReader

* Updated changelog

* Prepare 2.6.0 release

* Address review comments, fix boundary issue

* Cleanup tests

* More tests and const empty list