Skip to content
This repository was archived by the owner on Oct 17, 2024. It is now read-only.

Add StreamQueue.startTransactions(). #9

Merged
merged 3 commits into from
Jan 9, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
* Add an `AsyncCache` class that caches asynchronous operations for a period of
time.

* Add `StreamQueue.startTransaction()` and `StreamQueue.startTransactions()`.
These allow users to conditionally consume events based on their values.

## 1.11.3

* Fix strong-mode warning against the signature of Future.then
Expand Down
210 changes: 199 additions & 11 deletions lib/src/stream_queue.dart
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,12 @@
import 'dart:async';
import 'dart:collection';

import 'package:collection/collection.dart';

import "result.dart";
import "subscription_stream.dart";
import "stream_completer.dart";
import "stream_splitter.dart";

/// An asynchronous pull-based interface for accessing stream events.
///
Expand Down Expand Up @@ -86,7 +89,7 @@ abstract class StreamQueue<T> {
bool _isClosed = false;

/// Queue of events not used by a request yet.
final Queue<Result> _eventQueue = new Queue();
final QueueList<Result> _eventQueue = new QueueList();

/// Queue of pending requests.
///
Expand Down Expand Up @@ -210,6 +213,44 @@ abstract class StreamQueue<T> {
throw _failClosed();
}

/// Requests a transaction that can conditionally consume events.
///
/// The transaction can create copies of this queue at the current position
/// using [StreamQueueTransaction.newQueue]. Each of these queues is
/// independent of one another and of the parent queue. The transaction
/// finishes when one of two methods is called:
///
/// * [StreamQueueTransaction.commit] updates the parent queue's position to
/// match that of one of the copies.
///
/// * [StreamQueueTransaction.reject] causes the parent queue to continue as
/// though [startTransaction] hadn't been called.
///
/// Until the transaction finishes, this queue won't emit any events.
///
/// ```dart
/// /// Consumes all empty lines from the beginning of [lines].
/// Future consumeEmptyLines(StreamQueue<String> lines) {
/// while (await lines.hasNext) {
/// var transaction = lines.startTransaction();
/// var queue = transaction.newQueue();
/// if ((await queue.next).isNotEmpty) {
/// transaction.reject();
/// return;
/// } else {
/// transaction.accept(queue);
/// }
/// }
/// }
/// ```
StreamQueueTransaction<T> startTransaction() {
if (_isClosed) throw _failClosed();

var request = new _TransactionRequest(this);
_addRequest(request);
return request.transaction;
}

/// Cancels the underlying event source.
///
/// If [immediate] is `false` (the default), the cancel operation waits until
Expand Down Expand Up @@ -361,7 +402,7 @@ class _StreamQueue<T> extends StreamQueue<T> {
}

void _ensureListening() {
assert(!_isDone);
if (_isDone) return;
if (_subscription == null) {
_subscription =
_sourceStream.listen(
Expand Down Expand Up @@ -407,6 +448,125 @@ class _StreamQueue<T> extends StreamQueue<T> {
}
}

/// A transaction on a [StreamQueue], created by [StreamQueue.startTransaction].
///
/// Copies of the parent queue may be created using [newQueue]. Calling [commit]
/// moves the parent queue to a copy's position, and calling [reject] causes it
/// to continue as though [StreamQueue.startTransaction] was never called.
class StreamQueueTransaction<T> {
/// The parent queue on which this transaction is active.
final StreamQueue<T> _parent;

/// The splitter that produces copies of the parent queue's stream.
final StreamSplitter<T> _splitter;

/// Queues created using [newQueue].
final _queues = new Set<_TransactionStreamQueue>();

/// Whether [commit] has been called.
var _committed = false;

/// Whether [reject] has been called.
var _rejected = false;

StreamQueueTransaction._(this._parent, Stream<T> source)
: _splitter = new StreamSplitter(source);

/// Creates a new copy of the parent queue.
///
/// This copy starts at the parent queue's position when
/// [StreamQueue.startTransaction] was called. Its position can be committed
/// to the parent queue using [commit].
StreamQueue<T> newQueue() {
var queue = new _TransactionStreamQueue(_splitter.split());
_queues.add(queue);
return queue;
}

/// Commits a queue created using [newQueue].
///
/// The parent queue's position is updated to be the same as [queue]'s.
/// Further requests on all queues created by this transaction, including
/// [queue], will complete as though [cancel] were called with `immediate:
/// true`.
///
/// Throws a [StateError] if [commit] or [reject] have already been called, or
/// if there are pending requests on [queue].
void commit(StreamQueue<T> queue) {
_assertActive();
if (!_queues.contains(queue)) {
throw new ArgumentError("Queue doesn't belong to this transaction.");
} else if (queue._requestQueue.isNotEmpty) {
throw new StateError("A queue with pending requests can't be committed.");
}
_committed = true;

// Remove all events from the parent queue that were consumed by the
// child queue.
var eventsConsumed = (queue as _TransactionStreamQueue)._eventsReceived -
queue._eventQueue.length;
for (var j = 0; j < eventsConsumed; j++) {
_parent._eventQueue.removeFirst();
}

_done();
}

/// Rejects this transaction without updating the parent queue.
///
/// The parent will continue as though [StreamQueue.startTransaction] hadn't
/// been called. Further requests on all queues created by this transaction
/// will complete as though [cancel] were called with `immediate: true`.
///
/// Throws a [StateError] if [commit] or [reject] have already been called.
void reject() {
_assertActive();
_rejected = true;
_done();
}

// Cancels all [_queues], removes the [_TransactionRequest] from [_parent]'s
// request queue, and runs the next request.
void _done() {
_splitter.close();
for (var queue in _queues) {
queue._cancel();
}

assert((_parent._requestQueue.first as _TransactionRequest)
.transaction == this);
_parent._requestQueue.removeFirst();
_parent._updateRequests();
}

/// Throws a [StateError] if [accept] or [reject] has already been called.
void _assertActive() {
if (_committed) {
throw new StateError("This transaction has already been accepted.");
} else if (_rejected) {
throw new StateError("This transaction has already been rejected.");
}
}
}

/// A [StreamQueue] that belongs to a [StreamQueueTransaction].
class _TransactionStreamQueue<T> extends _StreamQueue<T> {
/// The total number of events received by this queue, including events that
/// haven't yet been consumed by requests.
///
/// This is used to fast-forward the parent queue if this transaction is
/// accepted.
var _eventsReceived = 0;

_TransactionStreamQueue(Stream<T> sourceStream) : super(sourceStream);

/// Modifies [StreamQueue._addResult] to count the total number of events that
/// have been passed to this transaction.
void _addResult(Result result) {
_eventsReceived++;
super._addResult(result);
}
}

/// Request object that receives events when they arrive, until fulfilled.
///
Expand Down Expand Up @@ -443,7 +603,7 @@ abstract class _EventRequest<T> {
/// If the function returns `false` when the stream has already closed
/// ([isDone] is true), then the request must call
/// [StreamQueue._updateRequests] itself when it's ready to continue.
bool update(Queue<Result<T>> events, bool isDone);
bool update(QueueList<Result<T>> events, bool isDone);
}

/// Request for a [StreamQueue.next] call.
Expand All @@ -458,7 +618,7 @@ class _NextRequest<T> implements _EventRequest<T> {

Future<T> get future => _completer.future;

bool update(Queue<Result<T>> events, bool isDone) {
bool update(QueueList<Result<T>> events, bool isDone) {
if (events.isNotEmpty) {
events.removeFirst().complete(_completer);
return true;
Expand Down Expand Up @@ -491,7 +651,7 @@ class _SkipRequest<T> implements _EventRequest<T> {
/// The future completed when the correct number of events have been skipped.
Future<int> get future => _completer.future;

bool update(Queue<Result<T>> events, bool isDone) {
bool update(QueueList<Result<T>> events, bool isDone) {
while (_eventsToSkip > 0) {
if (events.isEmpty) {
if (isDone) break;
Expand Down Expand Up @@ -529,7 +689,7 @@ class _TakeRequest<T> implements _EventRequest<T> {
/// The future completed when the correct number of events have been captured.
Future<List<T>> get future => _completer.future;

bool update(Queue<Result<T>> events, bool isDone) {
bool update(QueueList<Result<T>> events, bool isDone) {
while (_list.length < _eventsToTake) {
if (events.isEmpty) {
if (isDone) break;
Expand All @@ -556,8 +716,6 @@ class _TakeRequest<T> implements _EventRequest<T> {
class _CancelRequest<T> implements _EventRequest<T> {
/// Completer for the future returned by the `cancel` call.
final _completer = new Completer();

/// The [StreamQueue] object that has this request queued.
///
/// When the event is completed, it needs to cancel the active subscription
/// of the `StreamQueue` object, if any.
Expand All @@ -568,7 +726,7 @@ class _CancelRequest<T> implements _EventRequest<T> {
/// The future completed when the cancel request is completed.
Future get future => _completer.future;

bool update(Queue<Result<T>> events, bool isDone) {
bool update(QueueList<Result<T>> events, bool isDone) {
if (_streamQueue._isDone) {
_completer.complete();
} else {
Expand Down Expand Up @@ -599,7 +757,7 @@ class _RestRequest<T> implements _EventRequest<T> {
/// The stream which will contain the remaining events of [_streamQueue].
Stream<T> get stream => _completer.stream;

bool update(Queue<Result<T>> events, bool isDone) {
bool update(QueueList<Result<T>> events, bool isDone) {
if (events.isEmpty) {
if (_streamQueue._isDone) {
_completer.setEmpty();
Expand Down Expand Up @@ -632,7 +790,7 @@ class _HasNextRequest<T> implements _EventRequest<T> {

Future<bool> get future => _completer.future;

bool update(Queue<Result<T>> events, bool isDone) {
bool update(QueueList<Result<T>> events, bool isDone) {
if (events.isNotEmpty) {
_completer.complete(true);
return true;
Expand All @@ -644,3 +802,33 @@ class _HasNextRequest<T> implements _EventRequest<T> {
return false;
}
}

/// Request for a [StreamQueue.startTransaction] call.
///
/// This request isn't complete until the user calls
/// [StreamQueueTransaction.commit] or [StreamQueue.rejectTransaction], at which
/// point it manually removes itself from the request queue and calls
/// [StreamQueue._updateRequests].
class _TransactionRequest<T> implements _EventRequest<T> {
/// The transaction created by this request.
StreamQueueTransaction<T> get transaction => _transaction;
StreamQueueTransaction<T> _transaction;

/// The controller that passes events to [transaction].
final _controller = new StreamController<T>(sync: true);

/// The number of events passed to [_controller] so far.
var _eventsSent = 0;

_TransactionRequest(StreamQueue<T> parent) {
_transaction = new StreamQueueTransaction._(parent, _controller.stream);
}

bool update(QueueList<Result<T>> events, bool isDone) {
while (_eventsSent < events.length) {
events[_eventsSent++].addTo(_controller);
}
if (isDone && !_controller.isClosed) _controller.close();
return false;
}
}
Loading