Consider postponing the result of a second streamQueue.hasNext invocation until the first q.next invocation, unless the stream is closed. #356

Closed
Cat-sushi opened this issue Oct 18, 2022 · 6 comments
Labels
closed-not-planned Closed as we don't intend to take action on the reported issue package:async

Comments

@Cat-sushi

Cat-sushi commented Oct 18, 2022

First of all, it is very unintuitive that await q.next immediately after await q.hasNext returns true might fail.

Second, there is no way to safely consume events from a StreamQueue one at a time without the extra, useless List from await q.take(1).

Third, with sc = StreamController(); q = StreamQueue(sc.stream), sc.add(e) wakes only one caller of await q.hasNext, and the subsequent await q.next always succeeds. This is inconsistent with q = StreamQueue(Stream.fromIterable(list)).

Finally, I guess this is a breaking change unintentionally introduced by the reimplementation of async in SDK 2.18.

A possible solution, I think, is for the second q.hasNext to proactively enter the event loop, even if the stream already has an event.

See dart-lang/sdk#50217 and #355, as well.

@lrhn
Member

lrhn commented Oct 19, 2022

It's a queue. If you interleave operations between unrelated asynchronous consumers, then it's not safe. Neither is the normal Queue.

import "dart:collection";
var queue = Queue.from([1, 2, 3]);

Future<int?> fetch() async {
  await something();
  if (queue.isNotEmpty) {
    await something();
    var result = queue.removeFirst();
    await something();
    return result;
  }
  return null;
}

Future<void> something() async {}

void main() async {
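  // Fetch up to four values. All four calls pass the isNotEmpty check before
  // any of them removes an element, so the fourth removeFirst throws.
  var values = await Future.wait([fetch(), fetch(), fetch(), fetch()]);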
}

If you separate the isNotEmpty check from the removeFirst call, and allow other operations to happen between them, then it's not safe.
The StreamQueue.hasNext is itself an asynchronous operation.

We can probably change the behavior so that hasNext provides the result immediately after the check, and not just in a later microtask, but it won't change anything. Any already scheduled hasNext requests will still happen before anything done in response to the first hasNext request completing.
It's a queue, you can't skip ahead and do a next before the already scheduled hasNexts.

It's not true that the stream controller only wakes one consumer.

import "dart:async";
import "package:async/async.dart";

void main() async {
  var c = StreamController();
  var q = StreamQueue(c.stream);
  q.hasNext.then((n) {
    print("1: $n");
  });
  q.hasNext.then((n) {
    print("2: $n");
  });
  q.next.then((v) {
    print("3: $v");
  });
  q.hasNext.then((n) {
    print("4: $n");
  });
  c.add("event");
  c.close();
}

This prints 1: true, 2: true, 3: event, and 4: false. It completes both hasNext requests before the next request. It has to, because of the queue nature of the requests.

If anything has changed, it's probably the timing of Stream.fromIterable, not anything related to StreamQueue. If that breaks your code, then it's already fragile and depending on timing. The Stream.fromIterable was changed, because it allowed us to simplify the event scheduling in stream subscriptions.

The suggestion here is to postpone a second hasNext request, after another hasNext has already completed, until another non-hasNext request has completed. (Maybe only if it consumes an event?)

It doesn't solve the actual problem. If someone does hasNext, someone else does take(1) on the same queue, and the first hasNext handler responds by doing next, then a stream of length 1 will still cause an error.
The idea only works if the only operations on the queue are hasNext followed by next.

The request queue is still a queue. What you really need is an atomic hasNext/next operation.
We could add tryNext. It would work like take(1), only slightly worse if the event type is nullable.
I'd rather document that take(1) is the recommended way to atomically get an event if one exists.
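As a rough illustration of the take(1) approach, here is a minimal sketch. The helper name takeOne is hypothetical and not part of package:async; it only wraps the existing StreamQueue.take call. Because each consumer enqueues a single request, no other request can slip in between a check and a read.

```dart
import "dart:async";
import "package:async/async.dart";

/// Hypothetical helper (not part of package:async).
/// Completes with a one-element list holding the next event,
/// or with an empty list if the stream closes first.
Future<List<T>> takeOne<T>(StreamQueue<T> queue) => queue.take(1);

Future<void> main() async {
  var queue = StreamQueue(Stream.fromIterable([1, 2, 3]));

  // Four concurrent consumers compete for three events.
  // Each one enqueues exactly one request, so none of them can fail.
  var results = await Future.wait([
    for (var i = 0; i < 4; i++) takeOne(queue),
  ]);

  print(results); // [[1], [2], [3], []]
}
```

A caller checks isEmpty on the returned list instead of calling hasNext first, which also sidesteps the ambiguity a nullable return value would have when the event type itself is nullable.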

We try not to change the timing of asynchronous operations, because it always breaks some fragile code, but we have never actually promised the current scheduling and ordering of asynchronous events. Every part of the specification of asynchrony allows some leeway by using phrases like "in a later microtask".

@Cat-sushi
Author

  q.hasNext.then((n) {
    print("4: $n");
  });

4: false
Who can understand such insane behavior!

@lrhn
Member

lrhn commented Oct 19, 2022

That hasNext evaluates to false after the previously enqueued next request has consumed the only event of the source stream, so the hasNext sees the stream close? Seems sane to me.

The class may need better documentation, but the underlying model is actually simple and predictable.

@Cat-sushi
Author

I myself understand the behavior, but I think almost nobody can understand such behavior.

@Cat-sushi
Author

Cat-sushi commented Oct 19, 2022

Sorry, I mistakenly said 4: false is insane.
I know hasNext usually returns false.
My point was that await q.next immediately after await q.hasNext returns true always succeeds when sc = StreamController(); q = StreamQueue(sc.stream), even when the same code is called multiple times (without .then()).
It doesn't have a timing issue.

@lrhn lrhn added the closed-not-planned Closed as we don't intend to take action on the reported issue label Apr 19, 2023
@lrhn
Member

lrhn commented Apr 19, 2023

Not planning any change here.
Requests are enqueued. The only safe way to change the current behavior is to disallow queuing new requests while an existing request is being processed. Basically, it'd stop being a queue.

That's a different class, which can also exist (or can be implemented as a wrapper around the normal StreamQueue), but it's not something we'll change the existing class to.
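For anyone who wants that different class, one possible shape for such a wrapper is sketched below. The class name SerializedQueue and its withQueue method are hypothetical, not anything provided by package:async. The sketch assumes every consumer goes through withQueue, so that one consumer's hasNext/next sequence finishes before the next consumer starts issuing requests.

```dart
import "dart:async";
import "package:async/async.dart";

/// Hypothetical wrapper (not part of package:async) that gives each
/// caller exclusive access to the underlying StreamQueue in turn.
class SerializedQueue<T> {
  final StreamQueue<T> _queue;

  // Completes when the most recently scheduled action has finished.
  Future<void> _tail = Future.value();

  SerializedQueue(Stream<T> source) : _queue = StreamQueue<T>(source);

  /// Runs [action] only after all previously scheduled actions are done.
  Future<R> withQueue<R>(Future<R> Function(StreamQueue<T> queue) action) {
    var result = _tail.then<R>((_) => action(_queue));
    // Keep the chain alive even if this action fails; the caller still
    // sees the error through the returned future.
    _tail = result.then<void>((_) {}, onError: (_) {});
    return result;
  }
}

Future<void> main() async {
  var queue = SerializedQueue<int>(Stream.fromIterable([1, 2, 3]));

  // hasNext followed by next is safe here, because no other consumer
  // can enqueue a request between the two.
  Future<int?> fetch() => queue.withQueue(
      (q) async => (await q.hasNext) ? await q.next : null);

  var values = await Future.wait([fetch(), fetch(), fetch(), fetch()]);
  print(values); // [1, 2, 3, null]
}
```

The trade-off is the one described above: requests from different consumers are no longer interleaved, so this stops behaving like a plain request queue.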

@lrhn lrhn closed this as completed Apr 19, 2023
@mosuem mosuem transferred this issue from dart-archive/async Oct 16, 2024