Skip to content
This repository has been archived by the owner on Oct 17, 2024. It is now read-only.

Add StreamQueue.startTransactions(). #9

Merged
merged 3 commits into from
Jan 9, 2017
Merged

Add StreamQueue.startTransactions(). #9

merged 3 commits into from
Jan 9, 2017

Conversation

nex3
Copy link
Contributor

@nex3 nex3 commented Dec 28, 2016

This is important for advanced pull-based stream manipulation. It allows
users to express logic of the form "consume the next events if they
match this predicate".

@nex3 nex3 requested a review from lrhn December 28, 2016 23:14
This is important for advanced pull-based stream manipulation. It allows
users to express logic of the form "consume the next events if they
match this predicate".
Copy link
Contributor

@lrhn lrhn left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's OK, the comments are just suggestions.

///
/// Throws a [StateError] if [accept] or [reject] has already been called, or
/// if there are pending requests on this queue.
void accept() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A TransactionStreamQueue is itself a StreamQueue, so you can start another transaction on it. What happens if you call accept while a transaction is open?
That is:

var sq = ... some stream queue ...;
var t1 = sq.startTransaction();
await t1.next();
var t2 = t1.startTransaction();
await t2.next();
t1.accept();
// I guess this moves sq past the first event, then makes it stop at the 
// start transaction request, and t2 is still active. 
await t2.next();
t2.accept();
// Moves sq past two more events.

(Or is this prevented because the startTransaction request would make the request-queue non-empty?)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(Or is this prevented because the startTransaction request would make the request-queue non-empty?)

Yep. An open transaction counts as a pending request.

/// }
/// }
/// ```
List<TransactionStreamQueue> startTransactions(int count) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not particularly happy about the TransactionStreamQueue. I was never happy about my own designs for this feature either :)

There is no object representing the transaction itself, only a number of streams that each represent part of it.

Would it make sense to have a StreamQueueTransaction object returned by startTransaction():

class StreamQueueTransaction {
  /// Create a new queue running under the transaction.
  ///
  /// The queue gets the events of the original stream, and must either be 
  /// [reject]ed or [commit]ted.
  /// Cannot be called after [cancel] or [commit].
  StreamQueue newQueue();
  /// Forwards the original queue to where [queue] is.
  ///
  /// The [queue] must have been returned by [newQueue] and not 
  /// have been rejected.
  /// If [queue] has an open transaction, that transaction effectively becomes
  /// a transaction of the original stream, and can continue being used.
  /// Rejects all other open queues of the transaction.
  /// Cannot be called after [cancel] has been called.
  void commit(StreamQueue queue);
  /// Cancels the transaction.
  ///
  /// Continues the original stream as if the transaction had consumed no events.
  /// Cannot be called after [commit] has been called.
  /// Rejects all open queues of the transaction.
  void cancel();
  /// The queue, which must have been created by [newQueue], is rejected.
  ///
  /// The queue will not receive any further events.
  /// If [cancelIfLast] is true and there are no further open streams in the 
  /// transaction, the transaction is [cancel]'ed.
  void reject(StreamQueue queue, {cancelIfLast: false});
}

That way you don't need to extend StreamQueue, and all the transaction logic is kept in a single object.
Also, you don't need to know the number of streams when creating the transaction (that might be the only real advantage).

Since there is a strong connection between the stream state and whether you can call accept, keeping them together might make sense.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like that. It'll be somewhat more verbose, but transactions are a pretty low-level construct and hopefully people will mostly use them through utility APIs.

With a dedicated transaction object, I don't think you even need the ability to reject individual queues. The user can just ignore them once they're no longer relevant—or call queue.cancel().

/// }
/// }
/// ```
List<TransactionStreamQueue> startTransactions(int count) {
Copy link
Contributor

@lrhn lrhn Jan 2, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The plural name suggests that each stream is an individual transaction, but they are all part of the same transaction (since you can only commit one of them). So maybe startParallelTransaction(int streamCount) (because startTransaction is taken below), or combine the two and make [int count = 1] optional.


if (_requestQueue.isNotEmpty) {
throw new StateError(
"A transaction with pending requests can't be accepted.");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it make sense to make commit a request as well? So, you can do:

var sq = ...;
var s1 = sq.startTransaction();
int count = s1.next();
if (count < 0) {
  s1.reject();
} else {
  s1.skip(count);
  s1.commit();  // Handled after the skips.
}
print(await sq.next());

It's not a big difference - I could just add an await before the skip call, but as a request it avoids having this test and error message.


_TransactionRequest(this._parent, int count)
: transactions = new List(count) {
var splitter = new StreamSplitter(_controller.stream);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of using a StreamSplitter, could you just have made _controller.stream a broadcast stream?
(Maybe not, if StreamSplitter pauses when all the splits are paused - we probably want that).

bool update(QueueList<Result<T>> events, bool isDone) {
while (_eventsSent < events.length) {
var result = events[_eventsSent++];
if (result.isValue) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just: result.addTo(_controller);

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.


if (_requestQueue.isNotEmpty) {
throw new StateError(
"A transaction with pending requests can't be accepted.");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Alternatively (because who doesn't like alternatives), would it make sense to commit with requests still pending, even if commit is not a request?
Something like the example above, where you ask to skip some events, but you don't really need to wait for them to arrive, you just want to move the requests into the outer queue's requests, so it'll wait for them there.

That would make sense, but obviously the requests must be built to withstand being moved (they shouldn't see a difference in events, but if they have a reference to the queue itself, then they are not movable, and I think that's the case for at least some of them. Maybe as a later update, if it turns out to be needed).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I went with the "just throw" solution because I didn't think it was really clear which of the several possible behaviors was the most obvious or most useful, and I couldn't come up with any really compelling use-cases that weren't supportable in other ways. If we throw for now it leaves us open to choose a different behavior in the future if it becomes clear that one is correct.

@lrhn
Copy link
Contributor

lrhn commented Jan 9, 2017

LGTM!

@nex3 nex3 merged commit 2056226 into master Jan 9, 2017
@nex3 nex3 deleted the transaction branch January 9, 2017 22:39
mosuem pushed a commit to dart-lang/core that referenced this pull request Oct 14, 2024
This is important for advanced pull-based stream manipulation. It allows
users to express logic of the form "consume the next events if they
match this predicate".
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
Development

Successfully merging this pull request may close these issues.

3 participants