Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

StreamQueue throws 'Bad state: No elements' unexpectedly. #50217

Closed
Cat-sushi opened this issue Oct 16, 2022 · 13 comments
Closed

StreamQueue throws 'Bad state: No elements' unexpectedly. #50217

Cat-sushi opened this issue Oct 16, 2022 · 13 comments
Labels
closed-as-intended Closed as the reported issue is expected behavior needs-info We need additional information from the issue author (auto-closed after 14 days if no response)

Comments

@Cat-sushi
Copy link

Cat-sushi commented Oct 16, 2022

Dart SDK version: 2.18.2 (stable) (Unknown timestamp) on "linux_x64"
AOT compiled.

There are 6 sendReceve invocations, and 3 of them throws "Bad state: No elements" at invocation await queries.next.
There is no await between await queries.hasNext and await queries.next.

VM mode crashes for another or same reason of #50082.

  var queryList = ['A', 'B', 'C'];
  var queries = StreamQueue<String>(Stream.fromIterable(queryList));

  ...

  Future<void> sendReceve() async {
    while (await queries.hasNext) {
      var ix = ixS;
      ixS++;
      var query = ''; // TODO
      try {
        query = await queries.next; // TODO sometimes unexpectedly throws 'Bad state: No elements'
      } catch (e, s) {
        print(e);
        print(s);
      }
      var client = await serverPool.next;
      var result = await client.fmatch(query);
      serverPoolController.add(client);
      results[ix] = result;
      printResultsInOrder();
    }
  }
Bad state: No elements
#0      _NextRequest.update (package:async/src/stream_queue.dart:695)
#1      StreamQueue._updateRequests (package:async/src/stream_queue.dart:419)
#2      StreamQueue._close (package:async/src/stream_queue.dart:513)
#3      StreamQueue._ensureListening.<anonymous closure> (package:async/src/stream_queue.dart:481)
#4      _RootZone.runGuarded (dart:async/zone.dart:1574)
#5      _BufferingStreamSubscription._sendDone.sendDone (dart:async/stream_impl.dart:392)
#6      _BufferingStreamSubscription._sendDone (dart:async/stream_impl.dart:402)
#7      _BufferingStreamSubscription._close (dart:async/stream_impl.dart:291)
#8      _MultiStreamController.closeSync (dart:async/stream_impl.dart:1058)
#9      new Stream.fromIterable.<anonymous closure>.next (dart:async/stream.dart:383)
#10     _microtaskLoop (dart:async/schedule_microtask.dart:40)
#11     _startMicrotaskLoop (dart:async/schedule_microtask.dart:49)
#12     _runPendingImmediateCallback (dart:isolate-patch/isolate_patch.dart:122)
#13     _RawReceivePortImpl._handleMessage (dart:isolate-patch/isolate_patch.dart:193)
@Cat-sushi
Copy link
Author

It looks await queries.hasNext has false positive.

@Cat-sushi
Copy link
Author

I haven't reproduce it with SDK 2.17 or earlier, but I suspect that it is related with the following change.
sdk/CHANGELOG.md at main · dart-lang/sdk

Haven't package:async followed this change?
Is it a issue which should be posted the package repo?

@mraleph
Copy link
Member

mraleph commented Oct 17, 2022

Do you have multiple loops

while (await queries.hasNext) {
  await queries.next;
}

going on concurrently on the same queries object?

@mraleph mraleph added the needs-info We need additional information from the issue author (auto-closed after 14 days if no response) label Oct 17, 2022
@Cat-sushi
Copy link
Author

Yes, it does.
sendReceve() is called 6 times at the same time.

@mraleph
Copy link
Member

mraleph commented Oct 17, 2022

Then I think this is expected behaviour. The code you have written is inherently racy. There is no atomicity guarantees: between hasNext future completing and resuming some other code might race and consume events from the queue.

/cc @lrhn to confirm

@Cat-sushi
Copy link
Author

I can't believe so, because the API reference doesn't sa so, and there is no await between await queries.hasNext and await queries.next.
I want the API reference to be updated, at least.

@Cat-sushi
Copy link
Author

For your information, in case that the queue length = 6 * n + α (n > 1 and 6 > α > 0), it works well.

@lrhn
Copy link
Member

lrhn commented Oct 17, 2022

What @mraleph says. There being no await between the await queries.hasNext and await queries.next is not important because having multiple await queries.hasNext means that all of them get triggered by the same element.

It's not even a race, the stream queue is race-safe. It's also very much single-threaded by design.

The requests of a StreamQueue form a queue, and they are handled in the order they are added.
If you add six hasNext requests, all of those will trigger on the first event (because the event is not consimed). Each will enqueue another next event at the end of the request queue, after the other hasNext requests.
Then the next requests are fulfilled if possible, one after another. If there is less than six events left in the queue, some of those will fail.

There is no way around this, that's how the stream queue is designed, the requests are handled as a queue (that's where the StreamQueue name comes from).

Instead of having six independent listeners on the queue, you should have one listener which multiplexes the events to the six workers.
Alternatively, just call next and catch the StateError, and take that to mean that the stream is done. (Which is wrong if the stream actually omits a StateError.)

We could add a Future<T?> tryNext() request which returns null if the stream ends without a value, instead of throwing. Not really necessary, because take(1) would do the same, just wrapped in a list (which also better distinguishes a null value from the stream ending.)
So that's another rewrite you can try.

do {
  var list = await streamQueue.take(1);
  if (list.isEmpty) return;  
  var event = list[0];
  ...
}

@lrhn lrhn closed this as completed Oct 17, 2022
@lrhn lrhn added the closed-as-intended Closed as the reported issue is expected behavior label Oct 17, 2022
@Cat-sushi
Copy link
Author

Cat-sushi commented Oct 17, 2022

do {
  var list = await streamQueue.take(1);
  if (list.isEmpty) return;  
  var event = list[0];
  ...
}

Yes, current workaround does so.

@Cat-sushi
Copy link
Author

Cat-sushi commented Oct 17, 2022

As I said, in case that the queue length = 6 * n + α (n > 1 and 6 > α > 0), it works well.
How cab I understand it?

@Cat-sushi
Copy link
Author

For your information, when use StreamController.add(), it works well.

@lrhn
Copy link
Member

lrhn commented Oct 18, 2022

The reason it works for more than six events is probably that your event handling is asynchronous and slower then the event generation.

The stream/request queue initially looks like:

Stream: []
Requests: [hasNext, hasNext, hasNext, hasNext, hasNext, hasNext]

Then you emit an element to the stream, which makes it:

Stream: [e]
Requests: [hasNext, hasNext, hasNext, hasNext, hasNext, hasNext]

Then the requests start completing. Every time they, do, they add a next request, so you end up with

Stream: [e]
Requests: [next, next, next, next, next, next]

Then the first next request completes, which starts an asynchronous computation:

stream: []
Requests: [next, next, next, next, next]

If another five events come in before that async computation is done, then you end up with an empty queue again.
(And if there are not five more events, you get the state error.)

stream: [] (paused)
Requests: []

Then one computation completes, adds

stream: [] 
Requests: [hasNext]

If the next stream event arrives before any other computation completes, then everything will work fine, the event is read, and the stream and queue go back to sleeping.

So, if your event generation is snappy, and the event handling is slow, then your strategy will (almost always) work, because there is only one request in the queue at a time. Except at the beginning, where they all start at the same time.

@Cat-sushi
Copy link
Author

Yes, I think so.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
closed-as-intended Closed as the reported issue is expected behavior needs-info We need additional information from the issue author (auto-closed after 14 days if no response)
Projects
None yet
Development

No branches or pull requests

3 participants