Skip to content
This repository has been archived by the owner on Oct 17, 2024. It is now read-only.

Style tweaks in stream_queue.dart #167

Merged
merged 3 commits into from
Apr 21, 2021
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
120 changes: 52 additions & 68 deletions lib/src/stream_queue.dart
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ class StreamQueue<T> {
}
}

/// Asks if the stream has any more events.
/// Whether the stream has any more events.
///
/// Returns a future that completes with `true` if the stream has any
/// more events, whether data or error.
Expand All @@ -138,12 +138,10 @@ class StreamQueue<T> {
/// Another alternative is to use `take(1)` which returns either zero or
/// one events.
Future<bool> get hasNext {
if (!_isClosed) {
var hasNextRequest = _HasNextRequest<T>();
_addRequest(hasNextRequest);
return hasNextRequest.future;
}
throw _failClosed();
_checkClosed();
var hasNextRequest = _HasNextRequest<T>();
_addRequest(hasNextRequest);
return hasNextRequest.future;
}

/// Look at the next [count] data events without consuming them.
Expand All @@ -152,13 +150,11 @@ class StreamQueue<T> {
/// If one of the next [count] events is an error, the returned future
/// completes with this error, and the error is still left in the queue.
Future<List<T>> lookAhead(int count) {
if (count < 0) throw RangeError.range(count, 0, null, 'count');
if (!_isClosed) {
var request = _LookAheadRequest<T>(count);
_addRequest(request);
return request.future;
}
throw _failClosed();
RangeError.checkNotNegative(count, 'count');
_checkClosed();
var request = _LookAheadRequest<T>(count);
_addRequest(request);
return request.future;
}

/// Requests the next (yet unrequested) event from the stream.
Expand All @@ -176,28 +172,24 @@ class StreamQueue<T> {
/// and they will be completed in the order they were requested, by the
/// first events that were not consumed by previous requeusts.
Future<T> get next {
if (!_isClosed) {
var nextRequest = _NextRequest<T>();
_addRequest(nextRequest);
return nextRequest.future;
}
throw _failClosed();
_checkClosed();
var nextRequest = _NextRequest<T>();
_addRequest(nextRequest);
return nextRequest.future;
}

/// Looks at the next (yet unrequested) event from the stream.
///
/// Like [next] except that the event is not consumed.
/// If the next event is an error event, it stays in the queue.
Future<T> get peek {
if (!_isClosed) {
var nextRequest = _PeekRequest<T>();
_addRequest(nextRequest);
return nextRequest.future;
}
throw _failClosed();
_checkClosed();
var nextRequest = _PeekRequest<T>();
_addRequest(nextRequest);
return nextRequest.future;
}

/// Returns a stream of all the remaning events of the source stream.
/// A stream of all the remaning events of the source stream.
///
/// All requested [next], [skip] or [take] operations are completed
/// first, and then any remaining events are provided as events of
Expand All @@ -207,9 +199,7 @@ class StreamQueue<T> {
/// `rest` the caller may no longer request other events, like
/// after calling [cancel].
Stream<T> get rest {
if (_isClosed) {
throw _failClosed();
}
_checkClosed();
var request = _RestRequest<T>(this);
_isClosed = true;
_addRequest(request);
Expand All @@ -232,13 +222,11 @@ class StreamQueue<T> {
/// then all events were succssfully skipped. If the value
/// is greater than zero then the stream ended early.
Future<int> skip(int count) {
if (count < 0) throw RangeError.range(count, 0, null, 'count');
if (!_isClosed) {
var request = _SkipRequest<T>(count);
_addRequest(request);
return request.future;
}
throw _failClosed();
RangeError.checkNotNegative(count, 'count');
_checkClosed();
var request = _SkipRequest<T>(count);
_addRequest(request);
return request.future;
}

/// Requests the next [count] data events as a list.
Expand All @@ -257,13 +245,11 @@ class StreamQueue<T> {
/// of data collected so far. That is, the returned
/// list may have fewer than [count] elements.
Future<List<T>> take(int count) {
if (count < 0) throw RangeError.range(count, 0, null, 'count');
if (!_isClosed) {
var request = _TakeRequest<T>(count);
_addRequest(request);
return request.future;
}
throw _failClosed();
RangeError.checkNotNegative(count, 'count');
_checkClosed();
var request = _TakeRequest<T>(count);
_addRequest(request);
return request.future;
}

/// Requests a transaction that can conditionally consume events.
Expand All @@ -285,7 +271,7 @@ class StreamQueue<T> {
///
/// ```dart
/// /// Consumes all empty lines from the beginning of [lines].
/// Future consumeEmptyLines(StreamQueue<String> lines) async {
/// Future<void> consumeEmptyLines(StreamQueue<String> lines) async {
/// while (await lines.hasNext) {
/// var transaction = lines.startTransaction();
/// var queue = transaction.newQueue();
Expand All @@ -299,7 +285,7 @@ class StreamQueue<T> {
/// }
/// ```
StreamQueueTransaction<T> startTransaction() {
if (_isClosed) throw _failClosed();
_checkClosed();

var request = _TransactionRequest(this);
_addRequest(request);
Expand All @@ -320,7 +306,7 @@ class StreamQueue<T> {
///
/// ```dart
/// /// Consumes all empty lines from the beginning of [lines].
/// Future consumeEmptyLines(StreamQueue<String> lines) async {
/// Future<void> consumeEmptyLines(StreamQueue<String> lines) async {
/// while (await lines.hasNext) {
/// // Consume a line if it's empty, otherwise return.
/// if (!await lines.withTransaction(
Expand All @@ -330,23 +316,24 @@ class StreamQueue<T> {
/// }
/// }
/// ```
Future<bool> withTransaction(Future<bool> Function(StreamQueue<T>) callback) {
Future<bool> withTransaction(
Future<bool> Function(StreamQueue<T>) callback) async {
var transaction = startTransaction();

/// Avoid async/await to ensure that [startTransaction] is called
/// synchronously and so ends up in the right place in the request queue.
var queue = transaction.newQueue();
return callback(queue).then((result) {
if (result) {
transaction.commit(queue);
} else {
transaction.reject();
}
return result;
}, onError: (Object error) {
bool result;
try {
result = await callback(queue);
} on Object {
natebosch marked this conversation as resolved.
Show resolved Hide resolved
transaction.commit(queue);
throw error;
});
rethrow;
}
if (result) {
transaction.commit(queue);
} else {
transaction.reject();
}
return result;
}

/// Passes a copy of this queue to [callback], and updates this queue to match
Expand Down Expand Up @@ -394,13 +381,13 @@ class StreamQueue<T> {
/// stream had closed.
///
/// The returned future completes with the result of calling
/// `cancel`.
/// `cancel` on the subscription to the source stream.
///
/// After calling `cancel`, no further events can be requested.
/// None of [lookAhead], [next], [peek], [rest], [skip], [take] or [cancel]
/// may be called again.
Future? cancel({bool immediate = false}) {
if (_isClosed) throw _failClosed();
_checkClosed();
_isClosed = true;

if (!immediate) {
Expand Down Expand Up @@ -529,12 +516,9 @@ class StreamQueue<T> {
// ------------------------------------------------------------------
// Internal helper methods.

/// Returns an error for when a request is made after cancel.
///
/// Returns a [StateError] with a message saying that either
/// [cancel] or [rest] have already been called.
Error _failClosed() {
return StateError('Already cancelled');
/// Throws an error if [cancel] or [rest] have already been called.
void _checkClosed() {
natebosch marked this conversation as resolved.
Show resolved Hide resolved
if (_isClosed) throw StateError('Already cancelled');
}

/// Adds a new request to the queue.
Expand Down