Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

First operator is not completing the observable. #1759

Closed
onlyann opened this issue Jun 10, 2016 · 9 comments
Closed

First operator is not completing the observable. #1759

onlyann opened this issue Jun 10, 2016 · 9 comments

Comments

@onlyann
Copy link

onlyann commented Jun 10, 2016

**RxJS version: 5.0.0-beta.6

**Code to reproduce:
http://codepen.io/anon/pen/zBrPzj?editors=1111

Expected behavior:
Subcribe should only be called once when using the firstoperator.

Actual behavior:
It doesn't complete the subscription.

Additional information

Replacing first with take(1) works as expected.

@babeal
Copy link

babeal commented Jun 10, 2016

To add some detail:

The call to the BehaviorSubject.next inside of the subscribe method seems to be the issue. Since first() calls complete after emitting the value in the _emitValue function, calling next on the behavior subject synchronously will cause an infinite loop. This doesn't occur with take() since the recursive call will complete when the number of emitted items is greater than the count. If you make the "next" call async, then the first operator behaves correctly.

  .subscribe((x) => {
      console.log("I have been called " + x);
      Rx.Observable.timer(1).map(_ => {
        console.log("hello");
        store$.next({a: x[0] + 1, b: x[1] + 1});
      }).subscribe();

    }

@Dorus
Copy link

Dorus commented Jun 13, 2016

It's worth nothing that you are not allowed to call store$.next concurrent. This is one of the serialization properties of Rx, callers need to ensure they implement a proper locking mechanism, so that downstream operators do not have to run extensive overhead to protect them self against multiple concurrent calls.

@benlesh
Copy link
Member

benlesh commented Jun 13, 2016

@onlyann yup, that's a bug.

It appears to be an issue with the synchronous nexting inside of the observation. Here's a more minimal example showing the bug:

var store$ = new Rx.Subject();

store$.first().subscribe((x) => {
  console.log("I have been called " + x);
 store$.next({a: x[0] + 1, b: x[1] + 1});
});

store$.next({ a: 1, b: 2 })

But if you use a setTimeout it works fine:

var store$ = new Rx.Subject();

store$.first().subscribe((x) => {
  console.log("I have been called " + x);
  setTimeout(() => store$.next({a: x[0] + 1, b: x[1] + 1}));
});

store$.next({ a: 1, b: 2 })

@benlesh benlesh added bug Confirmed bug help wanted Issues we wouldn't mind assistance with. labels Jun 13, 2016
@kwonoj
Copy link
Member

kwonoj commented Jun 20, 2016

Is it expected to allow to source emitting value in subscription in case of synchronous observable instead of should be scheduled differently?

(just fyi - this does occur same on v4)

@jkyle
Copy link

jkyle commented Jun 21, 2016

.take(1) also does not complete when called with synchronous nexting. It behaves as expected in the example, but doesn't complete:
http://codepen.io/anon/pen/GqNjra?editors=0011

Since it doesn't complete, it holds on to the observers and can cause memory leaks:
http://codepen.io/anon/pen/EyNyGW?editors=0011

In both cases, wrapping the next call in a setTimeout fixes the completion issue, but you lose the synchronicity that might have been the original intent. Also, calling .unsubscribe() explicitly fixes the problem of holding on to the observers, but they still don't "complete".

var store$ = new Rx.Subject();

store$.take(1).subscribe((x) => {
  console.log("I have been called " + x.a);
 store$.next({a: x[0] + 1, b: x[1] + 1});
}, () => {}, () => console.log("completed"));

store$.next({ a: 1, b: 2 })
// "I have been called 1"
var store$ = new Rx.Subject();

store$.take(1).subscribe((x) => {
  console.log("I have been called " + x.a);
 setTimeout(() => store$.next({a: x[0] + 1, b: x[1] + 1}));
}, () => {}, () => console.log("completed"));

store$.next({ a: 1, b: 2 })
// "I have been called 1"
// "completed"

@trxcllnt
Copy link
Member

trxcllnt commented Jun 21, 2016

Protecting against re-entrency without throwing errors or dropping values would necessitate introducing unbounded buffers, which was a conscious design trade-off we made in the early days.

In this case, the take operator forwards on to its Subscriber, which immediately feeds another event into the head, and the stack never unwinds to allow the take operator to send the completion message (like a snake eating it's own tail).

This isn't a bug in take, since modifying it to complete before next'ing would mean the subscriber never hears the next event, and modifying it to unsubscribe before next'ing would dispose subscriptions in the wrong order.

As demonstrated, introducing an asynchronous boundary between emission and consumption (or vice-versa) is the best alternative here, though it doesn't have to be async across tick boundaries; async on the asap scheduler (which still blocks the current event loop) does the trick:

console.clear();

setTimeout(() => console.log('setTimeout flushed'));

var store$ = new Rx.Subject();

store$.take(1)
  .observeOn(Rx.Scheduler.asap)
  .subscribe((x) => {
    console.log("I have been called " + x.a);
    store$.next({a: x[0] + 1, b: x[1] + 1});
  }, () => {}, () => console.log("completed"));

store$.next({ a: 1, b: 2 });
// "I have been called 1"
// "completed"

Rx.Scheduler.asap.schedule(() => console.log('asap flushed'));
// "asap flushed"
// "setTimeout flushed"

@benlesh benlesh removed help wanted Issues we wouldn't mind assistance with. bug Confirmed bug labels Jul 6, 2016
@benlesh
Copy link
Member

benlesh commented Jul 6, 2016

@trxcllnt is correct. It's the reentrancy protection that is causing this effect. I think we can close this issue for now, however, it might be worth discussing the merits of the reentrancy protections in Subjects, as I know that we've discussed it in the past, but it can't hurt to revisit.

@vekexasia
Copy link

@Blesh & @trxcllnt can you take a look at #1993 ?

@lock
Copy link

lock bot commented Jun 7, 2018

This thread has been automatically locked since there has not been any recent activity after it was closed. Please open a new issue for related bugs.

@lock lock bot locked as resolved and limited conversation to collaborators Jun 7, 2018
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
None yet
Projects
None yet
Development

No branches or pull requests

8 participants