Should do async stop but did sync stop [edited title] #185

Open
wclr opened this issue Apr 18, 2017 · 9 comments

Comments

@wclr
Contributor

wclr commented Apr 18, 2017

This is an example of an HTTP-like Cycle.js driver where the request is resolved synchronously. I couldn't narrow it down to a simpler xstream pattern, but I wonder why, for one request in the sink, it actually performs two requests (the response stream is started twice).

import xs from 'xstream'
import delay from 'xstream/extra/delay'

let requestsCount = 0

const driverWithSyncResponse = (request$) => {
  const createResponse$ = (request) => {
    const stream$ = xs.create({
      start: (listener) => {      
        requestsCount++
        listener.next(request.num)
        listener.complete()
      },
      stop: () => { }
    }).remember()
    stream$.addListener({ next: () => { } })
    return stream$
  }
  const r$$ = request$.map(createResponse$)
  r$$.addListener({ next: () => { } })
  return r$$
}

const request$ = xs.of({ num: 1 }).compose(delay(0))

const r$$ = driverWithSyncResponse(request$)

r$$.flatten()
  .addListener({
    next: (res) => {
      console.log('Actual requests count should be 1, but it is', requestsCount)
    }
  })

https://www.webpackbin.com/bins/-Ki1D10ZxL0MfO5pVGQt

@xtianjohns
Contributor

Hey @whitecolor, I don't know exactly why you are experiencing this issue. Your bin is slightly different than what you have pasted in the description.

However, my guess would be that your completion notification is immediate. And this means that between each addListener statement, your stream is ending and the number of listeners keeps moving between 0 and 1. You can check this out by logging inside the stop callback.

Here is a snippet demonstrating the two scenarios, and a bin.

import xs from 'xstream';

const willRunStartTwice = xs.create({
  start: listener => {
    console.log( 'starting (probably twice)' );
    listener.next();
    listener.complete();
  },
  stop: () => {}
});

const willRunStartOnce = xs.create({
  start: listener => {
    console.log( 'starting (only once)' );
    listener.next();
  },
  stop: () => {}
});

willRunStartTwice.remember().addListener({ next: () => {} });
willRunStartTwice.remember().addListener({ next: () => {} });

willRunStartOnce.remember().addListener({ next: () => {} });
willRunStartOnce.remember().addListener({ next: () => {} });
willRunStartOnce.remember().addListener({ next: () => {} });
willRunStartOnce.remember().addListener({ next: () => {} });
// => "starting (probably twice)"
// => "starting (probably twice)"
// => "starting (only once)"

@wclr
Contributor Author

wclr commented Apr 22, 2017

Your bin is slightly different than what you have pasted in the description.

I probably modified it by chance after submitting the issue.

Thanks for picking this up. So yes I believe this is the case.

@staltz
Owner

staltz commented Apr 24, 2017

Hi :)
Thanks for letting us know. Yes, this looks weird because two synchronous/sequential addListener calls should not start two executions; this violates the assumption of sync start and async stop.

@staltz changed the title from "Flatten + sync http like cycle driver" to "Should do async stop but did sync stop [edited title]" on Apr 24, 2017

@xtianjohns
Contributor

xtianjohns commented Apr 24, 2017

This is what I thought, @staltz, but I'd like your guidance on whether or not this really is a violation. Consider our documentation on the question of asynchrony:

What matters for stopping the Producer is stream.removeListener. When the last Listener leaves (or in other words, when the number of Listeners suddenly changes from 1 to 0), the Stream schedules producer.stop() to happen on the next event loop. That is, asynchronously. If, however, a new Listener is added (number goes from 0 to 1) before that scheduled moment, the producer.stop() will be cancelled, and the Producer will continue generating events for its Stream normally.

What exactly is scheduled to happen asynchronously? The stopping of the producer. And when should that happen? When the last listener leaves.
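
To make the documented rule concrete, here is a minimal sketch of "sync start, async stop". It is a toy model written for this thread, not xstream's implementation: `ToyStream`, `counts`, `first`, and `second` are hypothetical names, and only the listener-count bookkeeping described in the quoted documentation is modeled.

```javascript
// Toy model of "sync start, async stop". NOT xstream's implementation.
class ToyStream {
  constructor(producer) {
    this.producer = producer;
    this.listeners = [];
    this.stopTimer = null; // pending asynchronous stop, if any
  }

  addListener(listener) {
    this.listeners.push(listener);
    if (this.listeners.length > 1) return; // already running
    if (this.stopTimer !== null) {
      // A stop was scheduled but has not fired yet: cancel it and keep running.
      clearTimeout(this.stopTimer);
      this.stopTimer = null;
    } else {
      // Count went 0 -> 1 with nothing pending: start synchronously.
      this.producer.start(this);
    }
  }

  removeListener(listener) {
    const i = this.listeners.indexOf(listener);
    if (i === -1) return;
    this.listeners.splice(i, 1);
    if (this.listeners.length === 0) {
      // Count went 1 -> 0: schedule the stop instead of stopping right away.
      this.stopTimer = setTimeout(() => {
        this.stopTimer = null;
        this.producer.stop();
      }, 0);
    }
  }
}

const counts = { start: 0, stop: 0 };
const stream = new ToyStream({
  start: () => { counts.start++; },
  stop: () => { counts.stop++; },
});

const first = { next: () => {} };
const second = { next: () => {} };

stream.addListener(first);    // 0 -> 1: producer starts
stream.removeListener(first); // 1 -> 0: stop is scheduled, not run
stream.addListener(second);   // 0 -> 1 before the timer fires: stop is cancelled

console.log(counts); // { start: 1, stop: 0 }
```

In this model the producer is started once and never stopped, because the second listener arrives before the scheduled stop runs. Nothing here involves completion, which is the path the rest of this comment looks at.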

But in the examples we are working with on this issue, I think I had it wrong. This isn't about the number of listeners, this is genuinely about the stream ending. And its completion isn't scheduled to take place in the future (in the examples), so why should the producer's stop callback be scheduled? Consider:

const stream = xs.create({
  start: listener => {
    listener.next();
    listener.complete(); // this line doesn't schedule an action, see below
  }
});

And the source for completion handling:

  _c(): void {
    const a = this._ils;
    const L = a.length;
    this._x(); // we definitely end up here
    if (this._d) this._dl._c();
    if (L == 1) a[0]._c(); else if (L == 0) return; else {
      const b = cp(a);
      for (let i = 0; i < L; i++) b[i]._c();
    }
  }

  _x(): void { // tear down logic, after error or complete
    if (this._ils.length === 0) return;
    if (this._prod !== NO) this._prod._stop(); // and we definitely invoke stop
    this._err = NO;
    this._ils = [];
  }

So I think the question is different: when a memory stream ends, and a new listener is added, what behavior is expected? Should start be called (again)? Or should the listener receive the last notification (if any) and then complete (essentially, this is on par with AsyncSubject from rxjs)? [Edit: Okay, it would be different from AsyncSubject in a bunch of ways, but still: emit last then end.]
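
The completion path can be sketched the same way. The following is a toy model under the assumption that teardown works as in the `_x()` excerpt above (stop the producer immediately and drop all listeners); `ToyStream`, `teardown`, and `log` are hypothetical names, and this is not xstream's code. It models only the first option in the question, where `start` is called again.

```javascript
// Toy model of the completion path quoted above. NOT xstream's implementation.
class ToyStream {
  constructor(producer) {
    this.producer = producer;
    this.listeners = [];
  }

  addListener(listener) {
    this.listeners.push(listener);
    // Count went 0 -> 1: start synchronously.
    if (this.listeners.length === 1) this.producer.start(this);
  }

  next(value) {
    this.listeners.slice().forEach((l) => l.next(value));
  }

  complete() {
    const current = this.listeners;
    this.teardown(); // runs before listeners are notified, as in _c()
    current.forEach((l) => l.complete && l.complete());
  }

  // Mirrors _x(): stop the producer right away and forget every listener.
  teardown() {
    if (this.listeners.length === 0) return;
    this.producer.stop();
    this.listeners = [];
  }
}

const log = [];
const stream = new ToyStream({
  start: (listener) => {
    log.push('start');
    listener.next(1);
    listener.complete(); // synchronous completion, as in the examples in this issue
  },
  stop: () => { log.push('stop'); },
});

stream.addListener({ next: () => {} });
stream.addListener({ next: () => {} });

console.log(log.join(' ')); // start stop start stop
```

Because completion resets the listener count to 0 with no scheduled stop to cancel, the second `addListener` looks like a fresh 0 to 1 transition and starts the producer again.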

@staltz
Owner

staltz commented Apr 25, 2017

Question: does this issue happen only with remember/MemoryStream?

@xtianjohns
Contributor

xtianjohns commented Apr 25, 2017

In short, no.

If you remove the .remember() calls from my snippet above, you get the same behavior. That is, completion ends the stream synchronously. It happens in both classes: the only completion-related method that MemoryStream overrides is _x(), which largely defers to Stream._x(). So I wouldn't expect a big divergence between the two classes.

I think the confusion (the idea that this behavior is buggy) comes from the expectation that MemoryStream should remember its emissions, and so should never call Producer.start() twice under any circumstances.

If that is the rule of law, we need to change some code. If it's fine for MemoryStream to behave like Stream with respect to completion, then I think we should close.

@staltz
Owner

staltz commented Apr 27, 2017

Yeah, so I think this is only about whether MemoryStream should be one-time use or be resettable. There is no correct answer. I've seen this exact issue in RxJS, and I recommend reading the discussion: ReactiveX/rxjs#453

Right now, I'm inclined to do what RxJS eventually did: allow a reusable MemoryStream. That means keeping the current behavior; if you really want a MemoryStream to keep its value, merge it with xs.never().

xs.merge(s, xs.never()) is becoming a common "hack"; we might consider making it idiomatic with e.g. x$.neverEnd()
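
Why merging with a never-ending stream prevents the restart can be shown with a small dependency-free sketch. Everything here is an assumption made for illustration: `toyStream` is a toy stand-in for a stream that tears down synchronously on completion, and `neverEnd` is a hypothetical helper in the spirit of the proposed `x$.neverEnd()`, not an existing xstream operator.

```javascript
// Toy stream: starts on the first listener, tears down synchronously on complete.
// NOT xstream's implementation.
function toyStream(producer) {
  let listeners = [];
  const stream = {
    addListener(l) {
      listeners.push(l);
      if (listeners.length === 1) producer.start(stream);
    },
    next(v) {
      listeners.slice().forEach((l) => l.next(v));
    },
    complete() {
      const current = listeners;
      listeners = []; // completion resets the listener count to 0
      producer.stop();
      current.forEach((l) => l.complete && l.complete());
    },
  };
  return stream;
}

// Hypothetical neverEnd: forwards values from `source` but swallows its
// completion, the role xs.never() plays inside xs.merge(s, xs.never()).
function neverEnd(source) {
  return toyStream({
    start: (out) =>
      source.addListener({ next: (v) => out.next(v), complete: () => {} }),
    stop: () => {},
  });
}

let starts = 0;
const source = toyStream({
  start: (l) => {
    starts++;
    l.next('response');
    l.complete();
  },
  stop: () => {},
});

const once$ = neverEnd(source);
once$.addListener({ next: () => {} });
once$.addListener({ next: () => {} });
once$.addListener({ next: () => {} });

console.log(starts); // 1
```

The outer stream never completes, so its listener count only grows and the source is subscribed to once. This toy does not replay the value to late listeners; it only models the start count.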

@xtianjohns
Contributor

I'm okay with reusable MemoryStream.

@whitecolor, is the pattern @staltz identifies acceptable for your use cases? Here is a snippet and bin.

import xs from 'xstream';

const example = xs.create({
  start: listener => {
    console.log( 'starting...' );
    listener.next();
    listener.complete();
  },
  stop: () => {}
});

const once$ = xs.merge( example, xs.never() );

once$.addListener({ next: () => {} });
once$.addListener({ next: () => {} });
once$.addListener({ next: () => {} });
once$.addListener({ next: () => {} });
// => "starting..."

@xtianjohns
Contributor

Hey @whitecolor, most (okay all) of the bins linked to this seem to have expired, but do the snippets make sense? Does the suggested pattern work for you?


3 participants