Skip to content

Commit

Permalink
Style tweaks in stream_queue.dart (dart-archive/async#167)
Browse files Browse the repository at this point in the history
- Use a noun phrase to document the `hasNext` getter. The `next` and
  `peek` getters keep their verb phrase doc comments since the side
  effects are critical distinctions between these methods. If we were
  writing this today, `next()` would be a method.
- Change `_failClosed()` which returns an error, to `_checkNotClosed()`
  which checks the condition and optionally closes. Avoid nesting the
  majority of method behavior in a conditional.
- Use `RangeError.checkNotNegative` over a conditional.
- Change bare `Future` to `Future<void>` in code examples.
- Make `withTransaction` `async` since there is no longer a blocker now
  that async methods start running synchronously.
  • Loading branch information
natebosch authored Apr 21, 2021
1 parent 842b7e8 commit dc46ed0
Showing 1 changed file with 52 additions and 68 deletions.
120 changes: 52 additions & 68 deletions pkgs/async/lib/src/stream_queue.dart
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ class StreamQueue<T> {
}
}

/// Asks if the stream has any more events.
/// Whether the stream has any more events.
///
/// Returns a future that completes with `true` if the stream has any
/// more events, whether data or error.
Expand All @@ -138,12 +138,10 @@ class StreamQueue<T> {
/// Another alternative is to use `take(1)` which returns either zero or
/// one events.
Future<bool> get hasNext {
if (!_isClosed) {
var hasNextRequest = _HasNextRequest<T>();
_addRequest(hasNextRequest);
return hasNextRequest.future;
}
throw _failClosed();
_checkNotClosed();
var hasNextRequest = _HasNextRequest<T>();
_addRequest(hasNextRequest);
return hasNextRequest.future;
}

/// Look at the next [count] data events without consuming them.
Expand All @@ -152,13 +150,11 @@ class StreamQueue<T> {
/// If one of the next [count] events is an error, the returned future
/// completes with this error, and the error is still left in the queue.
Future<List<T>> lookAhead(int count) {
if (count < 0) throw RangeError.range(count, 0, null, 'count');
if (!_isClosed) {
var request = _LookAheadRequest<T>(count);
_addRequest(request);
return request.future;
}
throw _failClosed();
RangeError.checkNotNegative(count, 'count');
_checkNotClosed();
var request = _LookAheadRequest<T>(count);
_addRequest(request);
return request.future;
}

/// Requests the next (yet unrequested) event from the stream.
Expand All @@ -176,28 +172,24 @@ class StreamQueue<T> {
/// and they will be completed in the order they were requested, by the
/// first events that were not consumed by previous requeusts.
Future<T> get next {
if (!_isClosed) {
var nextRequest = _NextRequest<T>();
_addRequest(nextRequest);
return nextRequest.future;
}
throw _failClosed();
_checkNotClosed();
var nextRequest = _NextRequest<T>();
_addRequest(nextRequest);
return nextRequest.future;
}

/// Looks at the next (yet unrequested) event from the stream.
///
/// Like [next] except that the event is not consumed.
/// If the next event is an error event, it stays in the queue.
Future<T> get peek {
if (!_isClosed) {
var nextRequest = _PeekRequest<T>();
_addRequest(nextRequest);
return nextRequest.future;
}
throw _failClosed();
_checkNotClosed();
var nextRequest = _PeekRequest<T>();
_addRequest(nextRequest);
return nextRequest.future;
}

/// Returns a stream of all the remaning events of the source stream.
/// A stream of all the remaning events of the source stream.
///
/// All requested [next], [skip] or [take] operations are completed
/// first, and then any remaining events are provided as events of
Expand All @@ -207,9 +199,7 @@ class StreamQueue<T> {
/// `rest` the caller may no longer request other events, like
/// after calling [cancel].
Stream<T> get rest {
if (_isClosed) {
throw _failClosed();
}
_checkNotClosed();
var request = _RestRequest<T>(this);
_isClosed = true;
_addRequest(request);
Expand All @@ -232,13 +222,11 @@ class StreamQueue<T> {
/// then all events were succssfully skipped. If the value
/// is greater than zero then the stream ended early.
Future<int> skip(int count) {
if (count < 0) throw RangeError.range(count, 0, null, 'count');
if (!_isClosed) {
var request = _SkipRequest<T>(count);
_addRequest(request);
return request.future;
}
throw _failClosed();
RangeError.checkNotNegative(count, 'count');
_checkNotClosed();
var request = _SkipRequest<T>(count);
_addRequest(request);
return request.future;
}

/// Requests the next [count] data events as a list.
Expand All @@ -257,13 +245,11 @@ class StreamQueue<T> {
/// of data collected so far. That is, the returned
/// list may have fewer than [count] elements.
Future<List<T>> take(int count) {
if (count < 0) throw RangeError.range(count, 0, null, 'count');
if (!_isClosed) {
var request = _TakeRequest<T>(count);
_addRequest(request);
return request.future;
}
throw _failClosed();
RangeError.checkNotNegative(count, 'count');
_checkNotClosed();
var request = _TakeRequest<T>(count);
_addRequest(request);
return request.future;
}

/// Requests a transaction that can conditionally consume events.
Expand All @@ -285,7 +271,7 @@ class StreamQueue<T> {
///
/// ```dart
/// /// Consumes all empty lines from the beginning of [lines].
/// Future consumeEmptyLines(StreamQueue<String> lines) async {
/// Future<void> consumeEmptyLines(StreamQueue<String> lines) async {
/// while (await lines.hasNext) {
/// var transaction = lines.startTransaction();
/// var queue = transaction.newQueue();
Expand All @@ -299,7 +285,7 @@ class StreamQueue<T> {
/// }
/// ```
StreamQueueTransaction<T> startTransaction() {
if (_isClosed) throw _failClosed();
_checkNotClosed();

var request = _TransactionRequest(this);
_addRequest(request);
Expand All @@ -320,7 +306,7 @@ class StreamQueue<T> {
///
/// ```dart
/// /// Consumes all empty lines from the beginning of [lines].
/// Future consumeEmptyLines(StreamQueue<String> lines) async {
/// Future<void> consumeEmptyLines(StreamQueue<String> lines) async {
/// while (await lines.hasNext) {
/// // Consume a line if it's empty, otherwise return.
/// if (!await lines.withTransaction(
Expand All @@ -330,23 +316,24 @@ class StreamQueue<T> {
/// }
/// }
/// ```
Future<bool> withTransaction(Future<bool> Function(StreamQueue<T>) callback) {
Future<bool> withTransaction(
Future<bool> Function(StreamQueue<T>) callback) async {
var transaction = startTransaction();

/// Avoid async/await to ensure that [startTransaction] is called
/// synchronously and so ends up in the right place in the request queue.
var queue = transaction.newQueue();
return callback(queue).then((result) {
if (result) {
transaction.commit(queue);
} else {
transaction.reject();
}
return result;
}, onError: (Object error) {
bool result;
try {
result = await callback(queue);
} catch (_) {
transaction.commit(queue);
throw error;
});
rethrow;
}
if (result) {
transaction.commit(queue);
} else {
transaction.reject();
}
return result;
}

/// Passes a copy of this queue to [callback], and updates this queue to match
Expand Down Expand Up @@ -394,13 +381,13 @@ class StreamQueue<T> {
/// stream had closed.
///
/// The returned future completes with the result of calling
/// `cancel`.
/// `cancel` on the subscription to the source stream.
///
/// After calling `cancel`, no further events can be requested.
/// None of [lookAhead], [next], [peek], [rest], [skip], [take] or [cancel]
/// may be called again.
Future? cancel({bool immediate = false}) {
if (_isClosed) throw _failClosed();
_checkNotClosed();
_isClosed = true;

if (!immediate) {
Expand Down Expand Up @@ -529,12 +516,9 @@ class StreamQueue<T> {
// ------------------------------------------------------------------
// Internal helper methods.

/// Returns an error for when a request is made after cancel.
///
/// Returns a [StateError] with a message saying that either
/// [cancel] or [rest] have already been called.
Error _failClosed() {
return StateError('Already cancelled');
/// Throws an error if [cancel] or [rest] have already been called.
void _checkNotClosed() {
if (_isClosed) throw StateError('Already cancelled');
}

/// Adds a new request to the queue.
Expand Down

0 comments on commit dc46ed0

Please sign in to comment.