Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

.first and .firstWhere Futures don't return until two items are added to a Stream by yield in an async* function #35063

Closed
idealopamp opened this issue Nov 6, 2018 · 2 comments
Labels
area-vm Use area-vm for VM related issues, including code coverage, and the AOT and JIT backends. closed-duplicate Closed in favor of an existing report type-bug Incorrect behavior (everything from a crash to more subtle misbehavior)

Comments

@idealopamp
Copy link

idealopamp commented Nov 6, 2018

  • Dart SDK Version (dart --version): 2.0.0 and 2.1.0-dev.5.0.flutter-a2eb050044
  • OS: Linux

I have a Stream (or technically a _ControllerStream) that is constructed using async*, await for, and yield. It works the way I would expect until I try to use .first or .firstWhere to get the first item from that Stream, at which point I see surprising behavior -- the first item is eventually returned by the .first Future, but only after a second item has been yielded by the async* function that generates the Stream. Even more surprisingly, when using .firstWhere, the boolean test function is evaluated after the first item has been yielded, but the Future still doesn't return until the second item is yielded.

Below is example code and it's output, where you can see an async listener in getFirstOverAndOver that does not receive the first value until after two items are added to the source stream and two items yielded by the async* streamFromStreamViaAsyncStar, even though the second item is not yielded until a full second after the first event was yielded. The final value of 4 is only received ~ 15 seconds after it is yielded, when the source StreamController is closed.

import 'dart:core';


void generateEverySecond(StreamController<int> streamIntController) async {
  int i = 0;
  while(i < 5) {
    await Future.delayed(Duration(seconds:1));
//    print("Generator: Awoke from sleep; adding $i to stream.");
    streamIntController.add(i);
    print("GENERATOR: ADDED $i");
    i+=1;
  }
}

void getFirstOverAndOver(Stream<int> stream) async {
  int retrievedValue;
  print("FYI, ${streamFromStreamViaAsyncStar(stream)} is runtimeType ${streamFromStreamViaAsyncStar(stream).runtimeType} "
      "and ${streamFromStreamViaAsyncStar(stream).asBroadcastStream()} is ${streamFromStreamViaAsyncStar(stream).asBroadcastStream().runtimeType}");

  while(retrievedValue == null || retrievedValue<4) {
    print("GETTER: Waiting for next value (previous value was $retrievedValue)");

    // retrievedValue = await streamFromStreamViaAsyncStar(stream).asBroadcastStream().firstWhere((int i) {
    // Broadcast works fine, but not the non-broadcast; i.e., replace line below with line above and works as expected
    retrievedValue = await streamFromStreamViaAsyncStar(stream).firstWhere((int i) {
      print("Getter: firstWhere evaluating for $i, and I will now immediately return true");
      return true;
    });
    print("GETTER: GOT VALUE $retrievedValue");
  }
}

Stream<int> streamFromStreamViaAsyncStar(Stream<int> originalStream) async* {
  // A simple for loop + yield also produces same behavior, 
  // but output is cleaner with the StreamController generator.  
  await for (int i in originalStream) {
    print("Async *: Yielding $i");
    yield i;
  }

}

Future<Null> main() async {
  StreamController<int> streamIntController = StreamController<int>.broadcast();

  try {
    getFirstOverAndOver(streamIntController.stream);
    await Future.delayed(Duration(seconds: 1)); // Overkill since generator sleeps at the start too, but lets just be suuuper sure that it's listening before the first event is fired.
    generateEverySecond(streamIntController);
    await Future.delayed(Duration(seconds: 20));
  }
  finally {
    print("20 seconds have elapsed; closing the source StreamController");
    streamIntController.close();
  }
}

Output of the code:

GETTER: Waiting for next value (previous value was null)
GENERATOR: ADDED 0
Async *: Yielding 0
Getter: firstWhere evaluating for 0, and I will now immediately return true
GENERATOR: ADDED 1
Async *: Yielding 1
GETTER: GOT VALUE 0
GETTER: Waiting for next value (previous value was 0)
GENERATOR: ADDED 2
Async *: Yielding 2
Getter: firstWhere evaluating for 2, and I will now immediately return true
GENERATOR: ADDED 3
Async *: Yielding 3
GETTER: GOT VALUE 2
GETTER: Waiting for next value (previous value was 2)
GENERATOR: ADDED 4
Async *: Yielding 4
Getter: firstWhere evaluating for 4, and I will now immediately return true
20 seconds have elapsed; closing the source StreamController
GETTER: GOT VALUE 4
@lrhn
Copy link
Member

lrhn commented Nov 6, 2018

It's not that first doesn't stop, but tha the code for the async* functions continues running before the first data event is delivered.

This is a bug. The specification states that the async* function must wait at the yield until the event has been delivered (the onData callback on the stream listener corresponding to the current invocation must be called) before continuing. That gives the listener time to cancel or pause before the async* function can continue. (If the function delivers the event using a synchronous stream controller, then no actual waiting is needed, it just has to check for pause/cancel after sending the event).

The specification clarified the behavior fairly recently (it was somewhat vague before), and implementations have not been changed to match yet, but it is the intent that the callback should be able to cancel in time.

@lrhn lrhn added area-vm Use area-vm for VM related issues, including code coverage, and the AOT and JIT backends. type-bug Incorrect behavior (everything from a crash to more subtle misbehavior) labels Nov 6, 2018
@natebosch
Copy link
Member

See further discussion in #34685 and #34775

I'm going to close in favor of #34775

@natebosch natebosch added the closed-duplicate Closed in favor of an existing report label Nov 7, 2018
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
area-vm Use area-vm for VM related issues, including code coverage, and the AOT and JIT backends. closed-duplicate Closed in favor of an existing report type-bug Incorrect behavior (everything from a crash to more subtle misbehavior)
Projects
None yet
Development

No branches or pull requests

3 participants