From e31a9cf70f6ec838f99b38c67a83bd9a16ecedad Mon Sep 17 00:00:00 2001 From: Nate Bosch Date: Tue, 20 Apr 2021 17:31:08 -0700 Subject: [PATCH 1/3] Style tweaks in stream_queue.dart - Use a noun phrase to document the `hasNext` getter. The `next` and `peek` getters keep their verb phrase doc comments since the side effects are critical distinctions between these methods. If we were writing this today, `next()` would be a method. - Change `_failClosed()` which returns an error, to `_checkClosed()` which checks the condition and optionally closes. Avoid nesting the majority of method behavior in a conditional. - Use `RangeError.checkNotNegative` over a conditional. - Change bare `Future` to `Future` in code examples. - Make `withTransaction` `async` since there is no longer a blocker now that async methods start running synchronously. --- lib/src/stream_queue.dart | 120 +++++++++++++++++--------------------- 1 file changed, 52 insertions(+), 68 deletions(-) diff --git a/lib/src/stream_queue.dart b/lib/src/stream_queue.dart index 5aa6054..b139b3f 100644 --- a/lib/src/stream_queue.dart +++ b/lib/src/stream_queue.dart @@ -126,7 +126,7 @@ class StreamQueue { } } - /// Asks if the stream has any more events. + /// Whether the stream has any more events. /// /// Returns a future that completes with `true` if the stream has any /// more events, whether data or error. @@ -138,12 +138,10 @@ class StreamQueue { /// Another alternative is to use `take(1)` which returns either zero or /// one events. Future get hasNext { - if (!_isClosed) { - var hasNextRequest = _HasNextRequest(); - _addRequest(hasNextRequest); - return hasNextRequest.future; - } - throw _failClosed(); + _checkClosed(); + var hasNextRequest = _HasNextRequest(); + _addRequest(hasNextRequest); + return hasNextRequest.future; } /// Look at the next [count] data events without consuming them. @@ -152,13 +150,11 @@ class StreamQueue { /// If one of the next [count] events is an error, the returned future /// completes with this error, and the error is still left in the queue. Future> lookAhead(int count) { - if (count < 0) throw RangeError.range(count, 0, null, 'count'); - if (!_isClosed) { - var request = _LookAheadRequest(count); - _addRequest(request); - return request.future; - } - throw _failClosed(); + RangeError.checkNotNegative(count, 'count'); + _checkClosed(); + var request = _LookAheadRequest(count); + _addRequest(request); + return request.future; } /// Requests the next (yet unrequested) event from the stream. @@ -176,12 +172,10 @@ class StreamQueue { /// and they will be completed in the order they were requested, by the /// first events that were not consumed by previous requeusts. Future get next { - if (!_isClosed) { - var nextRequest = _NextRequest(); - _addRequest(nextRequest); - return nextRequest.future; - } - throw _failClosed(); + _checkClosed(); + var nextRequest = _NextRequest(); + _addRequest(nextRequest); + return nextRequest.future; } /// Looks at the next (yet unrequested) event from the stream. @@ -189,15 +183,13 @@ class StreamQueue { /// Like [next] except that the event is not consumed. /// If the next event is an error event, it stays in the queue. Future get peek { - if (!_isClosed) { - var nextRequest = _PeekRequest(); - _addRequest(nextRequest); - return nextRequest.future; - } - throw _failClosed(); + _checkClosed(); + var nextRequest = _PeekRequest(); + _addRequest(nextRequest); + return nextRequest.future; } - /// Returns a stream of all the remaning events of the source stream. + /// A stream of all the remaning events of the source stream. /// /// All requested [next], [skip] or [take] operations are completed /// first, and then any remaining events are provided as events of @@ -207,9 +199,7 @@ class StreamQueue { /// `rest` the caller may no longer request other events, like /// after calling [cancel]. Stream get rest { - if (_isClosed) { - throw _failClosed(); - } + _checkClosed(); var request = _RestRequest(this); _isClosed = true; _addRequest(request); @@ -232,13 +222,11 @@ class StreamQueue { /// then all events were succssfully skipped. If the value /// is greater than zero then the stream ended early. Future skip(int count) { - if (count < 0) throw RangeError.range(count, 0, null, 'count'); - if (!_isClosed) { - var request = _SkipRequest(count); - _addRequest(request); - return request.future; - } - throw _failClosed(); + RangeError.checkNotNegative(count, 'count'); + _checkClosed(); + var request = _SkipRequest(count); + _addRequest(request); + return request.future; } /// Requests the next [count] data events as a list. @@ -257,13 +245,11 @@ class StreamQueue { /// of data collected so far. That is, the returned /// list may have fewer than [count] elements. Future> take(int count) { - if (count < 0) throw RangeError.range(count, 0, null, 'count'); - if (!_isClosed) { - var request = _TakeRequest(count); - _addRequest(request); - return request.future; - } - throw _failClosed(); + RangeError.checkNotNegative(count, 'count'); + _checkClosed(); + var request = _TakeRequest(count); + _addRequest(request); + return request.future; } /// Requests a transaction that can conditionally consume events. @@ -285,7 +271,7 @@ class StreamQueue { /// /// ```dart /// /// Consumes all empty lines from the beginning of [lines]. - /// Future consumeEmptyLines(StreamQueue lines) async { + /// Future consumeEmptyLines(StreamQueue lines) async { /// while (await lines.hasNext) { /// var transaction = lines.startTransaction(); /// var queue = transaction.newQueue(); @@ -299,7 +285,7 @@ class StreamQueue { /// } /// ``` StreamQueueTransaction startTransaction() { - if (_isClosed) throw _failClosed(); + _checkClosed(); var request = _TransactionRequest(this); _addRequest(request); @@ -320,7 +306,7 @@ class StreamQueue { /// /// ```dart /// /// Consumes all empty lines from the beginning of [lines]. - /// Future consumeEmptyLines(StreamQueue lines) async { + /// Future consumeEmptyLines(StreamQueue lines) async { /// while (await lines.hasNext) { /// // Consume a line if it's empty, otherwise return. /// if (!await lines.withTransaction( @@ -330,23 +316,24 @@ class StreamQueue { /// } /// } /// ``` - Future withTransaction(Future Function(StreamQueue) callback) { + Future withTransaction( + Future Function(StreamQueue) callback) async { var transaction = startTransaction(); - /// Avoid async/await to ensure that [startTransaction] is called - /// synchronously and so ends up in the right place in the request queue. var queue = transaction.newQueue(); - return callback(queue).then((result) { - if (result) { - transaction.commit(queue); - } else { - transaction.reject(); - } - return result; - }, onError: (Object error) { + bool result; + try { + result = await callback(queue); + } on Object { transaction.commit(queue); - throw error; - }); + rethrow; + } + if (result) { + transaction.commit(queue); + } else { + transaction.reject(); + } + return result; } /// Passes a copy of this queue to [callback], and updates this queue to match @@ -394,13 +381,13 @@ class StreamQueue { /// stream had closed. /// /// The returned future completes with the result of calling - /// `cancel`. + /// `cancel` on the subscription to the source stream. /// /// After calling `cancel`, no further events can be requested. /// None of [lookAhead], [next], [peek], [rest], [skip], [take] or [cancel] /// may be called again. Future? cancel({bool immediate = false}) { - if (_isClosed) throw _failClosed(); + _checkClosed(); _isClosed = true; if (!immediate) { @@ -529,12 +516,9 @@ class StreamQueue { // ------------------------------------------------------------------ // Internal helper methods. - /// Returns an error for when a request is made after cancel. - /// - /// Returns a [StateError] with a message saying that either - /// [cancel] or [rest] have already been called. - Error _failClosed() { - return StateError('Already cancelled'); + /// Throws an error if [cancel] or [rest] have already been called. + void _checkClosed() { + if (_isClosed) throw StateError('Already cancelled'); } /// Adds a new request to the queue. From f06d84b818a5c87161e8bd40fbee68b6f62686f1 Mon Sep 17 00:00:00 2001 From: Nate Bosch Date: Wed, 21 Apr 2021 07:41:54 -0700 Subject: [PATCH 2/3] rename to _checkNotClosed, use catch(_) --- lib/src/stream_queue.dart | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/lib/src/stream_queue.dart b/lib/src/stream_queue.dart index b139b3f..9c7f5eb 100644 --- a/lib/src/stream_queue.dart +++ b/lib/src/stream_queue.dart @@ -138,7 +138,7 @@ class StreamQueue { /// Another alternative is to use `take(1)` which returns either zero or /// one events. Future get hasNext { - _checkClosed(); + _checkNotClosed(); var hasNextRequest = _HasNextRequest(); _addRequest(hasNextRequest); return hasNextRequest.future; @@ -151,7 +151,7 @@ class StreamQueue { /// completes with this error, and the error is still left in the queue. Future> lookAhead(int count) { RangeError.checkNotNegative(count, 'count'); - _checkClosed(); + _checkNotClosed(); var request = _LookAheadRequest(count); _addRequest(request); return request.future; @@ -172,7 +172,7 @@ class StreamQueue { /// and they will be completed in the order they were requested, by the /// first events that were not consumed by previous requeusts. Future get next { - _checkClosed(); + _checkNotClosed(); var nextRequest = _NextRequest(); _addRequest(nextRequest); return nextRequest.future; @@ -183,7 +183,7 @@ class StreamQueue { /// Like [next] except that the event is not consumed. /// If the next event is an error event, it stays in the queue. Future get peek { - _checkClosed(); + _checkNotClosed(); var nextRequest = _PeekRequest(); _addRequest(nextRequest); return nextRequest.future; @@ -199,7 +199,7 @@ class StreamQueue { /// `rest` the caller may no longer request other events, like /// after calling [cancel]. Stream get rest { - _checkClosed(); + _checkNotClosed(); var request = _RestRequest(this); _isClosed = true; _addRequest(request); @@ -223,7 +223,7 @@ class StreamQueue { /// is greater than zero then the stream ended early. Future skip(int count) { RangeError.checkNotNegative(count, 'count'); - _checkClosed(); + _checkNotClosed(); var request = _SkipRequest(count); _addRequest(request); return request.future; @@ -246,7 +246,7 @@ class StreamQueue { /// list may have fewer than [count] elements. Future> take(int count) { RangeError.checkNotNegative(count, 'count'); - _checkClosed(); + _checkNotClosed(); var request = _TakeRequest(count); _addRequest(request); return request.future; @@ -285,7 +285,7 @@ class StreamQueue { /// } /// ``` StreamQueueTransaction startTransaction() { - _checkClosed(); + _checkNotClosed(); var request = _TransactionRequest(this); _addRequest(request); @@ -324,7 +324,7 @@ class StreamQueue { bool result; try { result = await callback(queue); - } on Object { + } catch(_) { transaction.commit(queue); rethrow; } @@ -387,7 +387,7 @@ class StreamQueue { /// None of [lookAhead], [next], [peek], [rest], [skip], [take] or [cancel] /// may be called again. Future? cancel({bool immediate = false}) { - _checkClosed(); + _checkNotClosed(); _isClosed = true; if (!immediate) { @@ -517,7 +517,7 @@ class StreamQueue { // Internal helper methods. /// Throws an error if [cancel] or [rest] have already been called. - void _checkClosed() { + void _checkNotClosed() { if (_isClosed) throw StateError('Already cancelled'); } From 8e1c11fcfd7560e1856f59013f21dd4dea869b82 Mon Sep 17 00:00:00 2001 From: Nate Bosch Date: Wed, 21 Apr 2021 07:43:25 -0700 Subject: [PATCH 3/3] dartfmt --- lib/src/stream_queue.dart | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/src/stream_queue.dart b/lib/src/stream_queue.dart index 9c7f5eb..f9902b9 100644 --- a/lib/src/stream_queue.dart +++ b/lib/src/stream_queue.dart @@ -324,7 +324,7 @@ class StreamQueue { bool result; try { result = await callback(queue); - } catch(_) { + } catch (_) { transaction.commit(queue); rethrow; }