Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

If a hot observable's subscriber ignores errors, subsequent subscribers also ignore errors #2145

Closed
imgx64 opened this issue Nov 20, 2016 · 19 comments

Comments

@imgx64
Copy link

imgx64 commented Nov 20, 2016

If a subscriber subscribes to a hot observable with a next handler only (no error handler), then any subscribers that try to subscribe afterwards don't have their next handler called.

RxJS version:
rxjs@5.0.0-rc.4

Code to reproduce:
https://plnkr.co/edit/0CgdiUZ7iyIzzBKpoBTL?p=preview

var observable = Rx.Observable.throw('some error').share();

// This subscriber cares about errors and gets them
observable.subscribe(function(res) {
  console.log("first subscriber, next " + res);
}, function(err) {
  console.log("first subscriber, error " + err);
});

// This subscriber does not care about errors
observable.subscribe(function(res) {
  console.log("second subscriber, next " + res);
});

// This subscriber cares about errors, but never gets them
// because the second subscriber did not handle them
observable.subscribe(function(res) {
  console.log("third subscriber, next " + res);
}, function(err) {
  console.log("third subscriber, error " + err);
});

Expected behavior:
The first and third subscriber's error handler should be called, and this output should be logged:

first subscriber, error some error
third subscriber, error some error

Actual behavior:
Only the first subscriber's error handler is called, the third subscriber's error handler is not called. The below is logged instead:

first subscriber, error some error
  Uncaught some error (Rx.js:831)
    Observable.subscribe @ Rx.js:831
    (anonymous function) @ script.js:11

Additional information:
Possibly related: #1420

ocombe pushed a commit to ngx-translate/core that referenced this issue Nov 20, 2016
There's a bug in RxJS that breaks hot observables if one subscriber ignores errors (see ReactiveX/rxjs#2145 ), which is causing #308. This patch fixes it by always handling observable errors (handling errors is a good idea anyway).

Fixes #308
@jayphelps jayphelps added the bug Confirmed bug label Nov 20, 2016
@jayphelps
Copy link
Member

jayphelps commented Nov 20, 2016

Edit: this particular comment isn't applicable, skip to #2145 (comment)


Actually, I'm not sure this is a bug. What actually seems to be happening is that, if you don't provide an error handler, the error bubbles and the last subscriber doesn't even run

var observable = Rx.Observable.throw('some error').share();

// This subscriber cares about errors and gets them
observable.subscribe(function(res) {
  console.log("first subscriber, next " + res);
}, function(err) {
  console.log("first subscriber, error " + err);
});

// This subscriber does not care about errors
observable.subscribe(function(res) {
  console.log("second subscriber, next " + res);
});

alert('!!! THIS CODE IS NEVER REACHED !!!');

// This subscriber cares about errors, but never gets them
// because it never actually runs!
observable.subscribe(function(res) {
  console.log("third subscriber, next " + res);
}, function(err) {
  console.log("third subscriber, error " + err);
});

This seems as-designed. If you don't catch the error, they propagate as normal exceptions would.

@imgx64
Copy link
Author

imgx64 commented Nov 21, 2016

This example shows the bug better:

var observable = Rx.Observable.interval(200).do(function(i) {
  if (i === 2) {
    throw 'some error';
  }
}).share();

// This subscriber cares about errors and gets them
observable.subscribe(function(res) {
  console.log("first subscriber, next " + res);
}, function(err) {
  console.log("first subscriber, error " + err);
});

// This subscriber does not care about errors
observable.subscribe(function(res) {
  console.log("second subscriber, next " + res);
});

// This subscriber cares about errors, but never gets them
// because the second subscriber did not handle them
observable.subscribe(function(res) {
  console.log("third subscriber, next " + res);
}, function(err) {
  console.log("third subscriber, error " + err);
});

Output:

first subscriber, next 0
second subscriber, next 0
third subscriber, next 0
first subscriber, next 1
second subscriber, next 1
third subscriber, next 1
first subscriber, error some error
  Uncaught some error (Rx.js:3753)

Note that there is no third subscriber, error some error line.

@trxcllnt
Copy link
Member

@imgx64 Subscriber's default behavior is to throw error unhandled errors. The alternative is swallowing them, and nobody wants that. An Observable synchronously emitting an error to a Subscriber without an error handler behaves the same as synchronously calling a function that throws without wrapping in a try/catch.

The issue you linked to is a discussion about whether we should catch errors thrown in the end Subscriber's next handler and re-route them to the error handler, or if we should allow them to go uncaught. I was for it, because consumers could centralize all error handling logic in the error callback, instead of needing try/catches inside next and around the subscription point, to handle the case where in case the Observable synchronously emits.

Even if we did that though, unhandled errors would still end up interrupting the execution for Subscribers without an error handler. Rx doesn't stray from any other implementation of the Subject/Observer pattern; errors thrown during dispatch interrupts dispatch. Can't really be helped.

@imgx64
Copy link
Author

imgx64 commented Nov 22, 2016

@trxcllnt

Subscriber's default behavior is to throw error unhandled errors. The alternative is swallowing them, and nobody wants that. An Observable synchronously emitting an error to a Subscriber without an error handler behaves the same as synchronously calling a function that throws without wrapping in a try/catch.

I understand about synchronous errors, and I've posted a second example that shows it applies for asynchronous errors.

The issue you linked to is a discussion about whether we should catch errors thrown in the end Subscriber's next handler and re-route them to the error handler, or if we should allow them to go uncaught. I was for it, because consumers could centralize all error handling logic in the error callback, instead of needing try/catches inside next and around the subscription point, to handle the case where in case the Observable synchronously emits.

The reason I mentioned that bug is this comment, which says that an unhandled error in one subscription terminates all other subscriptions to the same observable. This is what I'm seeing here which I think is unexpected behavior.

Even if we did that though, unhandled errors would still end up interrupting the execution for Subscribers without an error handler. Rx doesn't stray from any other implementation of the Subject/Observer pattern; errors thrown during dispatch interrupts dispatch. Can't really be helped.

But this is interrupting the execution for subscribers with an error handler if a subscriber without an error handler happened to subscribe to the same observable.

I tried the same thing with RxJava and got what I expected:

Code:

Observable<Long> observable = Observable.interval(200, TimeUnit.MILLISECONDS).doOnNext((i) -> {
	if (i == 2) {
		throw new RuntimeException("some error");
	}
}).share();

observable.subscribe((res) -> {
	System.out.println("first subscriber, next " + res);
}, (err) -> {
	System.out.println("first subscriber, error " + err);
});

// This subscriber does not care about errors
observable.subscribe((res) -> {
	System.out.println("second subscriber, next " + res);
});

// This subscriber cares about errors, and gets them, as expected
observable.subscribe((res) -> {
	System.out.println("third subscriber, next " + res);
}, (err) -> {
	System.out.println("third subscriber, error " + err);
});

Output:

first subscriber, next 0
second subscriber, next 0
third subscriber, next 0
first subscriber, next 1
second subscriber, next 1
third subscriber, next 1
first subscriber, error java.lang.RuntimeException: some error
java.lang.RuntimeException: some error
	at com.example.Demo.lambda$0(Demo.java:53)
	at io.reactivex.internal.operators.observable.ObservableDoOnEach$DoOnEachObserver.onNext(ObservableDoOnEach.java:95)
	at io.reactivex.internal.operators.observable.ObservableInterval$IntervalObserver.run(ObservableInterval.java:74)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
Exception in thread "RxComputationThreadPool-1" java.lang.RuntimeException: some error
	at com.example.Demo.lambda$0(Demo.java:53)
	at io.reactivex.internal.operators.observable.ObservableDoOnEach$DoOnEachObserver.onNext(ObservableDoOnEach.java:95)
	at io.reactivex.internal.operators.observable.ObservableInterval$IntervalObserver.run(ObservableInterval.java:74)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
third subscriber, error java.lang.RuntimeException: some error

@jayphelps
Copy link
Member

Hey, we discussed this today at our meeting, notes here. We're going to keep the existing behavior for now (v4 had the same behavior), but TC39 is considering this exact problem in the context of DOM event streams, which are multicast as well. So we're going to wait and see what direction they want to go in.

@benlesh
Copy link
Member

benlesh commented Dec 20, 2016

The TC39 discussion on this is here: tc39/proposal-observable#119

@benlesh benlesh added type: discussion and removed bug Confirmed bug labels Dec 20, 2016
@ccrowhurstram
Copy link

Worth noting - @Blesh added a good blog post that discusses this issue

@ccrowhurstram
Copy link

Has a final consensus been reached on this issue? If so what is it?

Thanks

@jayphelps
Copy link
Member

jayphelps commented Feb 5, 2017

@ccrowhurstram nope. any updates would be posted here.

The tc39 thread (link above) the discussion was left at:

Errors thrown from observer notifications will be caught and reported to HostReportErrors, and will not close the subscription. This matches the behavior of Event Target and Iterable.iterate.

If the developer desires that the subscription be closed when there is an error in consumer code they can use forEach.

But this isn't the final word on it yet. It's too early in the spec process to have anything solidified and we don't want to change things in RxJS prematurely just to have to change them back a few months later.

@christianacca
Copy link

OK thanks for the information

@christianacca
Copy link

BTW, slightly off topic here but worth the cross reference...

The work around to the problem that uses observeOn(Scheduler.asap) results in another problem in an angular 2 app:

The error thrown by an operator like map will not being sent to the usual angular 2 ErrorHandler service.

I've reported the issue with angular here: angular/angular#14316

@Nava2
Copy link

Nava2 commented Jul 21, 2017

For working around this issue with errors, if I wanted to "percolate" errors to all the subscribers of a subject, would I be best to create a "safe" subject as @benlesh's article pointed out was the issue? The sample workaround provided to use the async scheduler is not useful int his context as the flows in use require "synchronized" execution on a "clock" (a la VirtualTime like the TestScheduler).

I apologize if I over simplified anything, I'm not overly familiar with rxjs at this point.

@georgique
Copy link

I had to write an interim function which checks if a subscriber has error callback missing, and adding an empty one in that case. Quite an ugly approach.
When I subscribe to an observable, I expect either subscribe or error handler to work. Subscriber can't be responsible for previous subscribers for not having error handler defined.
Hope discussion goes into a way of fixing error propagation and let it flow through all the subscribers, which (for me) seems the only reasonable solution.

@jayphelps
Copy link
Member

@georgique the behavior in v5 is as intended, so this ticket could prolly be closed. In v6 errors from subscribers will not be rethrown, but they will still be reported to the console/window.onerror for debugging. That's very similar to the behavior of Promises. #3062

@georgique
Copy link

@jayphelps I might misunderstand things a bit, but initially issue was not about errors in subscribers, wasn't it? I have pretty simple scenario - an service sending login request to a server with wrong credentials. When a response is received, I have an Rx Subject broadcasting results to subscribers, so different system parts can react on login attempt. In my particular case I have three subscribers, two of them do not handle errors, it's just not needed there. The third though has to handle error, because it is the one showing error to the end user. But error never gets to the third subscriber just because first two don't have error handlers. If that's intended behavior, then maybe at least make error handlers compulsory?
Rxjs version is 5.5.3

@jayphelps
Copy link
Member

jayphelps commented Dec 6, 2017

@georgique That's okay. Indeed it was about errors in subscribers. In v5 if you don't provide an error callback a default one is provided for you which just rethrows the error. In this context I mean "subscriber" to be the person who is subscribing with an observer that does not include an error callback. Here's a demo https://jsbin.com/ruhosup/edit?js,console,output

If that's intended behavior, then maybe at least make error handlers compulsory?
Rxjs version is 5.5.3

It's intended behavior for v1-v5, but won't be in v6. It won't be possible to change this or make error handlers required in v5 because it's a breaking change.

If you want to mitigate it you could schedule the subscription to happen asynconronously using the .subscribeOn() operator. The AsapScheduler schedules on a microtask (e.g. like Promises) and the AsyncScheduler schedules on a macrotask (e.g. like setTimeout(0)) learn more about micro/macro tasks

I would use the AsapScheduler in this case IMO as the microtask will happen relatively faster.

https://jsbin.com/lifehu/edit?js,console,output

import { asap } from 'rxjs/schedulers/asap';

// when anyone subscribes it won't happen synchronously, instead
// each will be scheduled on their own microtask
const items = (new Subject()).subscribeOn(asap);

items.subscribe({
  next: d => console.log('first ' + d),
  error: e => console.error('FIRST SUBSCRIBER SAW IT')
});

items.subscribe({
  next: d => console.log('second ' + d),
  // no error handler so one is provided for you that
  // will rethrow the error synchronously but the fact that we scheduled
  // using the AsapScheduler means that rethrown error will not affect anyone else
  // because it happens with its own fresh callstack.
});

items.subscribe({
  next: d => console.log('third ' + d),
  error: e => console.error('THIS IS ALSO CALLED, CORRECTLY')
});

items.next(1);
items.next(2);
items.error('bad stuff');

Keep in mind that this has potential negatives too, since your subscription doesn't happen synchronously it becomes harder to reason so could introduce bugs. Technically it doesn't perform as well relatively speaking but the difference is negligible in most cases since you're not typically subscribing thousands of times per second

@georgique
Copy link

Now clear. Thanks @jayphelps.

@benlesh
Copy link
Member

benlesh commented Dec 15, 2017

Apologies I'm late to this party.

This is an issue called "Producer interference". It's been dealt with in the TC39 proposal, and it's also actually dealt with in master for RxJS 6. Unfortunately, it's not something we can fix in RxJS 5.x because it's a breaking change (albeit a small one)

Basically to fix this we have to break the following:

try {
  of(1).map(() => { throw new Error('lol') }).subscribe();
} catch (err) {
  console.log('this code should definitely be hit in RxJS 5.5, but won't in Rx 6');
}

The problem exists, as I'm sure some of you have figured out, because unhandled errors are immediately rethrown in RxJS. This means that they'll unwind the stack back to a loop that is notifying for a multicast, and break the loop.

The solution is quite simple, and makes sense for consistencies sake: Just schedule the throw on a timeout.

The truth is that putting a try/catch around a subscription is silly, and accommodating it doesn't make any sense. So we've moved away from that potentially buggy behavior, and it actually cleaned up a lot of code for us.

FYI: The workaround for this is to use observeOn(asap) after any multicast if you're worried about this behavior happening to you.

@benlesh
Copy link
Member

benlesh commented Jun 11, 2019

This was resolved with the release of v6 almost a year ago. Closing.

@benlesh benlesh closed this as completed Jun 11, 2019
@lock lock bot locked as resolved and limited conversation to collaborators Jul 11, 2019
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
None yet
Projects
None yet
Development

No branches or pull requests

8 participants