diff --git a/CHANGELOG.md b/CHANGELOG.md index 5809285..1984b28 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,9 @@ * Add a `collectBytes` function which collects list-of-byte events into a single byte list. +* Fix a bug where rejecting a `StreamQueueTransaction` would throw a + `StateError` if `StreamQueue.rest` had been called on one of its child queues. + ## 1.12.0 * Add an `AsyncCache` class that caches asynchronous operations for a period of diff --git a/lib/src/stream_queue.dart b/lib/src/stream_queue.dart index 14eb7a0..575cc8a 100644 --- a/lib/src/stream_queue.dart +++ b/lib/src/stream_queue.dart @@ -548,6 +548,7 @@ class _StreamQueue extends StreamQueue { if (_isDone) { return new Stream.empty(); } + _isDone = true; if (_subscription == null) { return _sourceStream; @@ -555,7 +556,6 @@ class _StreamQueue extends StreamQueue { var subscription = _subscription; _subscription = null; - _isDone = true; var wasPaused = subscription.isPaused; var result = new SubscriptionStream(subscription); @@ -960,7 +960,7 @@ class _HasNextRequest implements _EventRequest { /// Request for a [StreamQueue.startTransaction] call. /// /// This request isn't complete until the user calls -/// [StreamQueueTransaction.commit] or [StreamQueue.rejectTransaction], at which +/// [StreamQueueTransaction.commit] or [StreamQueueTransaction.reject], at which /// point it manually removes itself from the request queue and calls /// [StreamQueue._updateRequests]. class _TransactionRequest implements _EventRequest { diff --git a/pubspec.yaml b/pubspec.yaml index a3e0c90..0e703f7 100644 --- a/pubspec.yaml +++ b/pubspec.yaml @@ -1,5 +1,5 @@ name: async -version: 1.13.0 +version: 1.13.0-dev author: Dart Team description: Utility functions and classes related to the 'dart:async' library. homepage: https://www.github.com/dart-lang/async diff --git a/test/stream_queue_test.dart b/test/stream_queue_test.dart index 9668f0a..7d2fdd4 100644 --- a/test/stream_queue_test.dart +++ b/test/stream_queue_test.dart @@ -863,6 +863,30 @@ main() { transaction.reject(); }); + // Regression test. + test("pending child rest requests emit no more events", () async { + var controller = new StreamController(); + var events = new StreamQueue(controller.stream); + var transaction = events.startTransaction(); + var queue = transaction.newQueue(); + + // This should emit no more events after the transaction is rejected. + queue.rest.listen(expectAsync1((_) {}, count: 3), + onDone: expectAsync0(() {}, count: 0)); + + controller.add(1); + controller.add(2); + controller.add(3); + await flushMicrotasks(); + + transaction.reject(); + await flushMicrotasks(); + + // These shouldn't affect the result of `queue.rest.toList()`. + controller.add(4); + controller.add(5); + }); + test("child requests' cancel() may still be called explicitly", () async { transaction.reject(); await queue1.cancel();