Skip to content

Commit

Permalink
Fix a StreamQueue bug (dart-archive/async#44)
Browse files Browse the repository at this point in the history
Previously, StreamQueueTransaction assumed that its request was the
oldest request in the queue at the time at which it was committed or
rejected. This assumption wasn't always correct, and this change
avoids making it.
  • Loading branch information
nex3 authored Jan 5, 2018
1 parent 48c0678 commit 4596b8f
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 9 deletions.
5 changes: 5 additions & 0 deletions pkgs/async/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
## 2.0.3

* Fix a bug in `StreamQueue.startTransaction()` and related methods when
rejecting a transaction that isn't the oldest request in the queue.

## 2.0.2

* Add support for Dart 2.0 library changes to class `Timer`.
Expand Down
16 changes: 9 additions & 7 deletions pkgs/async/lib/src/stream_queue.dart
Original file line number Diff line number Diff line change
Expand Up @@ -400,7 +400,7 @@ abstract class StreamQueue<T> {

// ------------------------------------------------------------------
// Methods that may be called from the request implementations to
// control the even stream.
// control the event stream.

/// Matches events with requests.
///
Expand Down Expand Up @@ -643,11 +643,13 @@ class StreamQueueTransaction<T> {
for (var queue in _queues) {
queue._cancel();
}

assert((_parent._requestQueue.first as _TransactionRequest).transaction ==
this);
_parent._requestQueue.removeFirst();
_parent._updateRequests();
// If this is the active request in the queue, mark it as finished.
var currentRequest = _parent._requestQueue.first;
if (currentRequest is _TransactionRequest &&
currentRequest.transaction == this) {
_parent._requestQueue.removeFirst();
_parent._updateRequests();
}
}

/// Throws a [StateError] if [accept] or [reject] has already been called.
Expand Down Expand Up @@ -975,6 +977,6 @@ class _TransactionRequest<T> implements _EventRequest<T> {
events[_eventsSent++].addTo(_controller);
}
if (isDone && !_controller.isClosed) _controller.close();
return false;
return transaction._committed || _transaction._rejected;
}
}
2 changes: 1 addition & 1 deletion pkgs/async/pubspec.yaml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
name: async
version: 2.0.2
version: 2.0.3
author: Dart Team <misc@dartlang.org>
description: Utility functions and classes related to the 'dart:async' library.
homepage: https://www.github.com/dart-lang/async
Expand Down
39 changes: 38 additions & 1 deletion pkgs/async/test/stream_queue_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -904,9 +904,27 @@ main() {
expect(transaction.reject, throwsStateError);
expect(() => transaction.commit(queue1), throwsStateError);
});

test("before the transaction emits any events, does nothing", () async {
var controller = new StreamController();
var events = new StreamQueue(controller.stream);

// Queue a request before the transaction, but don't let it complete
// until we're done with the transaction.
expect(events.next, completion(equals(1)));
events.startTransaction().reject();
expect(events.next, completion(equals(2)));

await flushMicrotasks();
controller.add(1);
await flushMicrotasks();
controller.add(2);
await flushMicrotasks();
controller.close();
});
});

group("when committed,", () {
group("when committed", () {
test("further original requests use the committed state", () async {
expect(await queue1.next, 2);
await flushMicrotasks();
Expand Down Expand Up @@ -963,6 +981,25 @@ main() {
expect(transaction.reject, throwsStateError);
expect(() => transaction.commit(queue1), throwsStateError);
});

test("before the transaction emits any events, does nothing", () async {
var controller = new StreamController();
var events = new StreamQueue(controller.stream);

// Queue a request before the transaction, but don't let it complete
// until we're done with the transaction.
expect(events.next, completion(equals(1)));
var transaction = events.startTransaction();
transaction.commit(transaction.newQueue());
expect(events.next, completion(equals(2)));

await flushMicrotasks();
controller.add(1);
await flushMicrotasks();
controller.add(2);
await flushMicrotasks();
controller.close();
});
});
});

Expand Down

0 comments on commit 4596b8f

Please sign in to comment.