Should producer error close/cancel subscription? #127
Why? |
@benjamingr because Rx guarantees deterministic memory management. Without teardown onError, unresolvable memory leaks can occur. |
The correct behavior is for the Observer to tear down when it receives an onError message. The consumer can choose to |
@benjamingr I was thinking more that if an Observable could emit multiple errors, how would that work for the promise returned by |
@trxcllnt - Do you mind expanding a bit on that and/or providing some examples? I read something similar in the RxJS docs but had trouble imagining when that would occur, or how it's safer or more stateless. |
Per the Observable contract, an Observer's
Sure, no problem. This example might seem trivial, but it's succinct and easy to follow. Let's say I'm consuming the values from computing the Fibonacci sequence over time and something goes wrong:
function fibs(period = 0) {
  return Observable
    .interval(period)
    .scan(([i, n]) => [n, i + n], [1, 0])
    .pluck(0);
}
fibs(100 /*ms*/).do((val) => {
  if (val === 13) {
    throw new Error('bad luck!');
  }
})
.subscribe({
  next: console.log.bind(console, 'value'),
  error: console.log.bind(console, 'error'),
  complete: console.log.bind(console, 'done')
});
/*
value 0
value 1
value 1
value 2
value 3
value 5
value 8
error Error: bad luck!
*/
In this example, teardown on error ensures the interval Observable is cleaned up. Without it, we can't call
We could recover from the error with
fibs(100 /*ms*/).do((val) => {
  if (val === 13) {
    throw new Error('bad luck!');
  }
})
.retry(1)
.subscribe({
  next: console.log.bind(console, 'value'),
  error: console.log.bind(console, 'error'),
  complete: console.log.bind(console, 'done')
});
/*
value 0
value 1
value 1
value 2
value 3
value 5
value 8
value 0
value 1
value 1
value 2
value 3
value 5
value 8
error Error: bad luck!
*/
This is correct in functional programming. Declaring an Observable is like declaring a function, so subscribing to an Observable is like calling a function. And so re-subscribing to an Observable is like re-calling a function; it just re-executes its logic, isolated from any other callers. But it might be critical to preserve the current state of the Fibonacci calculation. If that's the case, we can multicast the values through a Subject (Subjects are like reactive variables) and preserve the original subscription:
fibs(100 /*ms*/)
  .multicast(
    () => new ReplaySubject(1),
    (xs) => xs
      .do((val) => {
        if (val === 13) {
          throw new Error('bad luck!');
        }
      })
      .catch(() => xs)
  )
  .take(10)
  .subscribe({
    next: console.log.bind(console, 'value'),
    error: console.log.bind(console, 'error'),
    complete: console.log.bind(console, 'done')
  });
/*
value 0
value 1
value 1
value 2
value 3
value 5
value 8
value 13
value 21
value 34
done undefined
*/ |
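To make the "re-subscribing re-executes the function" point concrete without pulling in RxJS, here is a minimal sketch under stated assumptions: an observable is modeled as a plain function that accepts an observer, teardown is omitted, and unluckyFibs and retry are hypothetical helpers (not RxJS's retry operator) written only for illustration.

```javascript
// Sketch only, not RxJS: an "observable" is modeled as a plain function that
// accepts an observer { next, error, complete }. Teardown is omitted for brevity.

let producerRuns = 0; // how many times the producer logic has executed

// Hypothetical synchronous stand-in for fibs(...).do(...) above:
// emits Fibonacci numbers and errors when it reaches 13.
function unluckyFibs(observer) {
  producerRuns++;
  let [a, b] = [0, 1];
  while (true) {
    if (a === 13) {
      observer.error(new Error('bad luck!'));
      return;
    }
    observer.next(a);
    [a, b] = [b, a + b];
  }
}

// Hypothetical retry: on error, re-subscribe up to `times` more times.
// Every re-subscription runs the producer again from scratch.
function retry(source, times) {
  return (observer) => {
    let attemptsLeft = times;
    const attempt = () =>
      source({
        next: (x) => observer.next(x),
        error: (e) => (attemptsLeft-- > 0 ? attempt() : observer.error(e)),
        complete: () => observer.complete(),
      });
    attempt();
  };
}

const values = [];
let failure = null;
retry(unluckyFibs, 1)({
  next: (x) => values.push(x),
  error: (e) => { failure = e; },
  complete: () => {},
});

console.log(values.join(' ')); // 0 1 1 2 3 5 8 0 1 1 2 3 5 8
console.log(failure.message, producerRuns); // bad luck! 2
```

The producer runs twice and each run starts from 0, which is the same shape as the retry(1) output above; nothing is shared between the two executions.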
@trxcllnt Thanks much! Just a few comments :)
I do admit I feel a little silly trying to change a type that has been around for so long, but if you'll indulge me... In your example:
function fibs(period = 0) {
  return Observable
    .interval(period)
    .scan(([i, n]) => [n, i + n], [1, 0])
    .pluck(0);
}
fibs(100 /*ms*/).do((val) => {
  if (val === 13) {
    throw new Error('bad luck!');
  }
})
.subscribe({
  next: console.log.bind(console, 'value'),
  error: console.log.bind(console, 'error'),
  complete: console.log.bind(console, 'done')
});
No disagreement with you there, but I'm still a bit lost about why treating the error path similarly to the next path violates functional programming. I'm thinking of the Either or Task/Future types, which seem to treat errors as just a value rather than a state. Again, please forgive me if I'm missing something. |
No problem at all. The Observable is the push dual of Enumerable (aka Haskell's Data.List monad). The duality of the type is only maintained if the push semantics of Observability mirror the pull semantics of Enumerability. There are four key sections to enumerating a list of values:
try {
  while (iterator.hasNext()) { // <-- can throw or return false
    // 1. handle next value
    const x = iterator.getCurrent();
    // 4. break early
    if (x === 5) { break; }
  }
  // 3. may have iterated some values, then finished successfully
} catch (e) {
  // 2. may have iterated some values, then got an error
}
And their Observable (push) equivalents:
const subscription = observable.subscribe({
  next(x) { /* 1. handle next value */ },
  error(e) { /* 2. may have observed some values, then got an error */ },
  complete() { /* 3. may have observed some values, then finished successfully */ }
});
// 4. break early
subscription.unsubscribe();
Enumerables and Observables as they're defined here are referentially transparent; invoking |
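The section-by-section correspondence can be sketched in plain JavaScript (no RxJS) by driving an observer from a pull-based iterator. pushFromIterator is an illustrative helper, not part of any library. Section 4 (breaking early) is left out, because a synchronous push loop has no chance to hand back an unsubscribe handle before it finishes.

```javascript
// Sketch (not a library API): drive an observer from a pull-based iterator.
// Loop body -> next, catch -> error, leaving the loop normally -> complete.
// Only the pull is wrapped in try/catch; exceptions thrown by the observer
// itself are not rerouted to observer.error.
function pushFromIterator(iterator, observer) {
  while (true) {
    let step;
    try {
      step = iterator.next(); // pulling can throw...
    } catch (e) {
      observer.error(e); // 2. ...then push the error and stop
      return;
    }
    if (step.done) {
      observer.complete(); // 3. finished successfully
      return;
    }
    observer.next(step.value); // 1. push the next value
  }
}

// A generator that yields two values and then fails.
function* failsAfterTwo() {
  yield 1;
  yield 2;
  throw new Error('boom');
}

const log = [];
const recorder = {
  next: (x) => log.push(`next ${x}`),
  error: (e) => log.push(`error ${e.message}`),
  complete: () => log.push('complete'),
};

pushFromIterator([1, 2, 3][Symbol.iterator](), recorder);
pushFromIterator(failsAfterTwo(), recorder);
console.log(log.join(', '));
// next 1, next 2, next 3, complete, next 1, next 2, error boom
```

Each iteration ends in exactly one of the terminal outcomes, complete or error, never both, which is the same grammar the Observer follows.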
I really appreciate both of you taking the time to explain your points of view. Thank you. |
@trxcllnt - Thanks so much, that is great info. I had seen an Erik Meijer talk on duality but hadn't read that paper. The code example you provided really illustrates your point well. I do have a few more questions, if you'll allow me to push on.
1a. So an observer's
1b. Also, if |
@jordalgo I'm going to over-simplify this, probably, but basically
Comparisons to EventTarget are a little off here because EventTargets only push values out of themselves. They don't push errors (or completions, for that matter). Generally, errors involving an EventTarget originate in the handler registered to it. There is a discussion regarding handling errors thrown in the Observable equivalent of those handlers, the Observer, here: #119
Observables can be used to wrap or model EventTargets because they're a push-based async primitive. Observables can also be used to wrap or model Promises. They're a very low-level async primitive. An Observable is really just a function that accepts your Observer, wraps it in some guarantees, then passes it to the body of the function. It also adds some sugar around teardown and tying the Observer to it. It doesn't multicast on its own. It doesn't have the concept of removing listeners on its own. It's a very primitive type. So trying to optimize Observable itself for a particular behavior around removing listeners, when the Observable is wrapping some externally created producer that multicasts, is going to be awkward. |
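A minimal sketch of "a function that accepts your Observer, wraps it in some guarantees, then passes it to the body", assuming only the guarantees discussed in this thread: nothing is delivered after error or complete, and the producer's teardown runs exactly once, whether the subscription ends by error, completion, or unsubscribe. MiniObservable is a made-up name; this is not RxJS's implementation or the proposal's spec text, and it simply ignores errors when the observer has no error handler.

```javascript
// Illustrative sketch, not RxJS or the spec: subscribe() wraps the observer
// so that the grammar next* (error | complete)? is enforced and teardown
// runs at most once.
class MiniObservable {
  constructor(subscriber) {
    this._subscriber = subscriber; // the "body of the function"
  }

  subscribe(observer) {
    let closed = false;
    let teardown = null;

    // Run the producer's teardown at most once.
    const cleanup = () => {
      const fn = teardown;
      teardown = null;
      if (fn) fn();
    };

    const safeObserver = {
      next(value) {
        if (!closed && observer.next) observer.next(value);
      },
      error(err) {
        if (closed) return;
        closed = true; // no further notifications after an error
        try {
          if (observer.error) observer.error(err);
        } finally {
          cleanup();
        }
      },
      complete() {
        if (closed) return;
        closed = true;
        try {
          if (observer.complete) observer.complete();
        } finally {
          cleanup();
        }
      },
    };

    const result = this._subscriber(safeObserver);
    teardown = typeof result === 'function' ? result : null;
    // The producer may have errored or completed synchronously, before it
    // returned its teardown; in that case clean up right away.
    if (closed) cleanup();

    return {
      unsubscribe() {
        closed = true;
        cleanup();
      },
    };
  }
}

const events = [];
const source = new MiniObservable((observer) => {
  observer.next(1);
  observer.error(new Error('oops'));
  observer.next(2); // dropped: the subscription is already closed
  return () => events.push('teardown');
});

source.subscribe({
  next: (x) => events.push(`next ${x}`),
  error: (e) => events.push(`error ${e.message}`),
});
console.log(events.join(', ')); // next 1, error oops, teardown
```

Note that nothing here multicasts or tracks a list of listeners; each subscribe call just runs the body once for one observer.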
@jordalgo |
Thanks much, gents -- really appreciate the info here. I'm going to close this issue as it seems like the Observable type (and the way it treats the
It seems to me that maybe a strict Observable is not what I want to use to poll for data, or for cases where I want to be able to pass operational errors downstream. I could do this with Observables by passing an object like |
@jordalgo as @Blesh mentioned, since Observables are an async primitive, they're meant to be composed into more complex types with combinators, multicasting, etc. You can build a lot of cool stuff as long as the underlying primitives follow all the rules :). From what you've said, it sounds like multicasting a single source into two streams might be the way to go. One perhaps for composing computations on the values, and the other that emits operational errors. Then you can combine them in whichever way solves your problem (e.g. |
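A rough sketch of the "multicast one source into two streams" suggestion, assuming a hand-rolled MiniSubject rather than RxJS's Subject, and a hypothetical produce function whose results are tagged as ok or failed. The point it illustrates is that one producer run feeds both the value stream and the operational-error stream.

```javascript
// Sketch, not RxJS: a tiny Subject that forwards each value to every
// currently subscribed observer, so one producer run can feed several streams.
class MiniSubject {
  constructor() {
    this.observers = new Set();
  }
  subscribe(observer) {
    this.observers.add(observer);
    return { unsubscribe: () => this.observers.delete(observer) };
  }
  next(value) {
    // Copy first so an observer may unsubscribe while being notified.
    for (const observer of [...this.observers]) observer.next(value);
  }
}

// Hypothetical producer whose results are tagged as ok or failed.
let producerRuns = 0;
function produce(subject) {
  producerRuns++;
  subject.next({ ok: true, data: 1 });
  subject.next({ ok: false, error: '404' });
  subject.next({ ok: true, data: 2 });
}

const shared = new MiniSubject();
const values = [];
const operationalErrors = [];

// Stream 1: successful results only.
shared.subscribe({ next: (r) => { if (r.ok) values.push(r.data); } });
// Stream 2: operational errors only.
shared.subscribe({ next: (r) => { if (!r.ok) operationalErrors.push(r.error); } });

produce(shared); // a single execution feeds both streams
console.log(values, operationalErrors, producerRuns);
```

The two streams can then be combined however the problem requires, while the producer itself never signals a terminal error for an operational failure.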
@trxcllnt - That's a good suggestion; I'll tinker with that some 👍 |
I don't think I'm going to re-open this issue, but I was wondering if some generous folks wouldn't mind providing some additional input and knowledge. I fully understand that the Observable type has been around for a while (and there is a contract) and has always had the behavior of either pushing an
Now that the Observable type is not going to be routing caught errors to observer |
FWIW, our team also found the termination of streams on https://github.com/material-motion/indefinite-observable-js/ |
@appsforartists this is interesting, but I see a few possible weak points:
I can understand how an error causing teardown of the producer can be confusing. But surely it can be understood that the producer will be torn down when the stream is
I think you could get the same ergonomics without the abstraction overhead by using simple callbacks and explicit teardowns if you're more comfortable with that. |
@Blesh, I don't know if this is the right venue for this conversation, but the values we're modeling never complete; only come to rest. We have a
|
@jordalgo one key guarantee of maintaining functional purity (at the level of the async primitive) is deterministic memory management; auto-disposal on termination guarantees you won't have memory leaks (e.g. from forgetting to remove an event listener) in your async code. Since the Observer grammar specifies you won't be notified after termination, not tearing down the subscription would only leak memory. Observables are ultimately just referentially transparent functions (as opposed to stateful lists of Observers to notify). It may be helpful to think of auto-disposal as the push-dual of deallocating the memory in a stack frame after a function returns. @appsforartists |
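As an illustration of why auto-disposal matters, here is a sketch that adapts a Node.js EventEmitter (used only as a convenient stand-in for any listener-based producer) into observer calls. observeEmitter is a hypothetical helper; once it has delivered an error it removes its own listeners, since the observer can never be notified again and keeping them attached would only leak.

```javascript
const { EventEmitter } = require('events');

// Hypothetical adapter: forward 'data' events to observer.next and the first
// 'error' event to observer.error. Since nothing may be delivered after an
// error, the adapter removes its own listeners at that point; leaving them
// attached would keep the handlers (and whatever they close over) alive.
function observeEmitter(emitter, observer) {
  const onData = (value) => observer.next(value);
  const onError = (err) => {
    teardown(); // auto-dispose on termination
    observer.error(err);
  };
  const teardown = () => {
    emitter.removeListener('data', onData);
    emitter.removeListener('error', onError);
  };
  emitter.on('data', onData);
  emitter.on('error', onError);
  return { unsubscribe: teardown };
}

const emitter = new EventEmitter();
const seen = [];
observeEmitter(emitter, {
  next: (x) => seen.push(x),
  error: (e) => seen.push(`error: ${e.message}`),
});

emitter.emit('data', 1);
emitter.emit('error', new Error('disk full'));
emitter.emit('data', 2); // no listener left, so nothing is delivered
console.log(JSON.stringify(seen), emitter.listenerCount('data')); // [1,"error: disk full"] 0
```

Disposing before notifying mirrors the stack-frame analogy: by the time the error handler runs, the subscription's resources are already released.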
@trxcllnt We can always add them back if we need them. Our working hypothesis is that for interactive systems, an author ought to be able to declare the system once and never worry about it being torn down because something in the chain called If we ever find we need to build something like |
@trxcllnt I feel like we may be talking past each other, as I'm quite confused as to how not closing the subscription when, specifically, the producer calls observer
const backendPoll = new Observable(observer => {
  const poller = setInterval(() => {
    requestDataFromBackend(res => {
      if (res.status !== 200) {
        observer.error(new Error('Uh oh. Non 200 received.'));
      } else {
        observer.next(res);
      }
    });
  }, 2000);
  return () => { clearInterval(poller); };
});
backendPoll
  .map(data => {
    // do something cool
  })
  .subscribe({
    next: mappedData => {},
    error: error => {
      // send user a message but keep listening
    }
  });
AND if the producer wanted to add extra protection, it could do this:
const backendPoll = new Observable(observer => {
  const poller = setInterval(() => {
    try {
      requestDataFromBackend(res => {
        if (res.status !== 200) {
          observer.error(new Error('Uh oh. Non 200 received.'));
        } else {
          observer.next(res);
        }
      });
    } catch (e) {
      observer.error(e);
      observer.complete();
    }
  }, 2000);
  return () => { clearInterval(poller); };
});
I don't see how this makes the primitive not functionally pure (perhaps I'm missing something). |
@jordalgo perhaps the easiest way to understand this is to write the equivalent synchronous function. Does this function throw an error, or return a result? It can't do both:
function backendPoll() {
  let result;
  try {
    result = requestDataFromBackend();
  } catch (e) {
    throw e;
  }
  return result;
}
While I don't wish to discount your use-case, the problem you're describing is a level or two higher than what the Observer grammar was designed to express. I would encourage skimming the Rx Design Guidelines [pdf] for a more thorough explanation of the design, but I'll try to hit the high notes here. The Observer design is very low-level. Like, programming-language-design level. From this perspective, the design must strive to define the minimal grammar required to express as many different ideas as possible. Since Observable is the push dual of functional enumerable sequences, we already have a set of rules we need to follow. In fact, it makes life easier, since we're not trying to do anything that hasn't already been done before. Observables are a programming language primitive, which is why they make sense to build into JavaScript, like they already are in C#, Dart, and maybe others at this point.
If you take
If we agree on that, then we get back to the original topic: tearing down subscriptions after onError is called. Again, it's helpful to look at the behavior of
But since push semantics aren't built into the VM, we have to help the VM out a little bit if we're going to fully implement the push-dual of
As to your use-case, from a PLT perspective, the error-handling primitive isn't just another value type; it's a separate branch of control flow. If you want to work with errors as values, you can materialize errors as values through another branch of the value path, and compose them like any other value. Your proposed design for
As an exercise, here's one way to implement a solution that materializes non-200s into values without terminating the stream:
const backendPoll = new Observable(observer => {
  const values = new Subject();
  const errors = new Subject();
  let resource, interval = setInterval(() => {
    resource = requestDataFromBackend(res => {
      resource = null;
      if (res.status !== 200) {
        errors.next(new Error('Uh oh. Non 200 received.'));
      } else {
        values.next(res);
      }
    });
  }, 2000);
  observer.next({ values, errors });
  // No observer.complete() here: completing synchronously would run the
  // teardown below right away and clear the interval before the first poll.
  return () => {
    clearInterval(interval);
    if (resource) {
      resource.unsubscribe();
    }
  };
});
backendPoll
  .mergeMap(({ values, errors }) => Observable.merge(
    values.map(data => {
      // do something cool
    }),
    errors.do(err => {
      // print the error, or combine with values, takeUntil, etc.
    })
  ))
  .subscribe((mappedData) => { /* etc. */ }); |
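For comparison, a stripped-down, synchronous sketch of the same idea without Subjects: each poll outcome is tagged and pushed through next, so a non-200 response never terminates the stream, while genuinely unexpected failures would still be free to use error. The responses array and the { kind, ... } shape are assumptions made for illustration; RxJS ships a materialize operator with its own notification objects, and this sketch does not use it.

```javascript
// Sketch: tag each poll result so that a non-200 response travels through
// `next` as data instead of terminating the stream via `error`.
// `responses` is a hypothetical stand-in for successive backend replies.
function pollResults(responses) {
  return (observer) => {
    for (const res of responses) {
      if (res.status !== 200) {
        observer.next({ kind: 'error', error: new Error(`Non 200 received: ${res.status}`) });
      } else {
        observer.next({ kind: 'value', value: res.body });
      }
    }
    observer.complete();
  };
}

const values = [];
const errors = [];
let completed = false;

pollResults([
  { status: 200, body: 'a' },
  { status: 404 },
  { status: 200, body: 'b' },
])({
  next: (result) => {
    // The consumer decides what an operational error means to it.
    if (result.kind === 'error') errors.push(result.error.message);
    else values.push(result.value);
  },
  error: (e) => { throw e; }, // not reached: non-200s never use this path
  complete: () => { completed = true; },
});

console.log(values, errors, completed);
```

The design choice is the same as in the Subject version: operational failures live on the value path, and the error channel keeps its meaning as terminal control flow.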
@trxcllnt - Thanks for taking the time to discuss this. Unfortunately, either I'm missing something very obvious or I'm doing a terrible job of explaining my POV 😄 So first off, I agree with pretty much everything you said, e.g.
As you've illustrated in your examples, it's definitely possible to accomplish my use-case using current Observable semantics. However, I'm still not convinced that an alternative push primitive (probably very similar to an Observable) that doesn't close and clean up resources
To address some of your points:
I'm not debating the merits of try/catch in the same way that node wasn't when it used the CPS design pattern. To steal a quote from this article: "When a function passes its errors to a callback it no longer has to make assumptions on how that error should be handled. readFile() itself has no idea how severe a file read error is to your specific application. It could be expected, or it could be catastrophic. Instead of having to decide itself, readFile() propagates it back for you to handle." I feel the same should exist for the future JS push primitive. In addition (and sorry if I'm repeating myself) since this current Observable spec does not catch any thrown errors and pipe them to |
I honestly can't answer that for anyone besides me, as this is more about personal taste than programming language design. I can say that over the years I, and many others, have found Observables to be a fantastic solution to a broad range of problems. Asynchrony cross-cuts problem domains; doing it well is a headache in every system I've ever built. For the few problems where one-way push sequences aren't adequate on their own (like back-pressure), they can be combined with their Enumerable dual to build Async Iterables, and the meticulous adherence to duality between the two makes them a delight to work with :-) Observables have held up after years of battle-testing in some of the biggest software systems in the world (Netflix Edge, Microsoft Azure, and soon Facebook's infrastructure), which is exactly the quality of proof I'd personally feel comfortable building into a language that millions of people work with every day.
You're absolutely right, which is why I've argued so strongly against this in #119 and #47. Unfortunately the current state of this project does reflect a fundamental departure from the Rx grammar, a change which I believe should require the same level of proof as we have for the existing Observable semantics. Unfortunately I can't berate @jhusain in person about this anymore since I'm no longer at netflix. |
Definitely, but given this might become part of the language, I'm advocating for my own taste 😄 and attempting to back it up with arguments.
This is a great argument for keeping
No disagreement here. There are many other stream/observable-like libs out there but definitely none as popular as Rx's.
I'm just riding this slippery slope because if this spec violates the Observable contract by not catching thrown errors to close subscriptions and de-allocate resources, is it still an Observable? Perhaps that level of specific error handling is not in the contract. |
After a lot of consideration I’ve decided to re-open this thread for the following reasons:
I know this is a departure from how RxJs currently implements Observable, but that doesn’t mean it shouldn’t be considered and discussed further, as other libraries (mentioned in the above comments) have this behavior. It’s mostly been @trxcllnt and me discussing this, which has been incredibly valuable, but I would also really appreciate input from @jhusain, @headinthebox and other experienced devs in this arena. (Additionally, and I hesitate to mention it because I think what I said above is the best option, perhaps a case could be made for only closing/cleaning up when these two events occur: 1, 2)
This was partially discussed here and here, but I wanted to have a more focused conversation around explicit error calls as opposed to exceptions. My apologies, and please be gentle if I'm missing something blatantly obvious. In the current implementation, calling observer.error cancels the subscription and calls the cleanup method, but I wanted to discuss the pros (and cons) of leaving it open.

Pros for Not Closing

- Allow the producer to decide which errors are bad enough to close/cancel the subscription (by firing error and then complete). When dealing with asynchronous sequences of data (like reading a file), it's easy to see why it's necessary to close the subscription if an error occurs, but it seems easy enough to have the publisher do this manually, subclass the type with this functionality, or create an operator like closeOnError.
- At the point that a producer is calling an observer's error, I feel like the actual "error", whatever it is, has been "caught" or at least anticipated; that is, it's not an unexpected/programmer error. So why is it assumed that this error will cause issues (memory leaks or unexpected behaviors) in the Observable if it doesn't end? This is in addition to the fact that the current implementation doesn't automatically try/catch around next calls.
- EventTargets don't have this behavior of automatically removing listeners on error. Take the video node, for example; if it emits an error, the listener isn't automatically removed. (Also mentioned here.)
- There is more freedom and power for both the producer and the observer. If errors didn't auto-cancel, observers would have the option of unsubscribing on error, and they wouldn't have to re-subscribe if they wanted to react to multiple producer errors. Producers wouldn't have to wrap errors (like server errors) in an object passed to next just to make them available to the observer, e.g. observer.next({ error: '404', data: null }).

Counters:

- What to do if the observer didn't implement an error handler? If these are "operational" errors, should they be thrown if there is no error handler? Maybe?
- This contradicts general promise and observable composition (which I'm assuming is probably a non-starter for most folks).

Hopefully long-time RxJs contributors and Observable advocates have more counterarguments, but I just wanted to throw this out there. Again, please be gentle.
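To make the proposal concrete, here is a sketch of the alternative semantics described above, under explicitly hypothetical names (subscribeLenient, closeOnError): error is delivered like any other notification and does not close the subscription; only complete or unsubscribe ends it and runs teardown; and a closeOnError wrapper restores terminate-on-error for producers that opt in. This deliberately deviates from the Observable contract discussed in this thread and is not how RxJS or the proposal behave.

```javascript
// HYPOTHETICAL semantics from this proposal, NOT the Observable contract:
// `error` does not close the subscription; only `complete` or `unsubscribe` does.
function subscribeLenient(producer, observer) {
  let closed = false;
  let teardown = null;

  // Run the producer's teardown at most once.
  const runTeardown = () => {
    const fn = teardown;
    teardown = null;
    if (fn) fn();
  };
  const end = () => {
    if (closed) return;
    closed = true;
    runTeardown();
  };

  const sink = {
    next: (x) => { if (!closed) observer.next(x); },
    error: (e) => { if (!closed) observer.error(e); }, // stays open
    complete: () => {
      if (closed) return;
      closed = true;
      observer.complete();
      runTeardown();
    },
  };

  const result = producer(sink);
  teardown = typeof result === 'function' ? result : null;
  if (closed) runTeardown(); // producer completed before returning its teardown
  return { unsubscribe: end };
}

// Hypothetical operator: forward the first error, then complete, which
// restores terminate-on-error for producers that opt in.
function closeOnError(producer) {
  return (sink) =>
    producer({
      next: sink.next,
      error: (e) => { sink.error(e); sink.complete(); },
      complete: sink.complete,
    });
}

// A producer that reports two operational errors and keeps emitting.
function makeFlaky(log) {
  return (sink) => {
    sink.next('a');
    sink.error(new Error('404'));
    sink.next('b');
    sink.error(new Error('500'));
    sink.next('c');
    return () => log.push('teardown');
  };
}

function recorder(log) {
  return {
    next: (x) => log.push(`next ${x}`),
    error: (e) => log.push(`error ${e.message}`),
    complete: () => log.push('complete'),
  };
}

const lenientLog = [];
const sub = subscribeLenient(makeFlaky(lenientLog), recorder(lenientLog));
sub.unsubscribe(); // the consumer decides when to stop
console.log(lenientLog.join(', '));
// next a, error 404, next b, error 500, next c, teardown

const strictLog = [];
subscribeLenient(closeOnError(makeFlaky(strictLog)), recorder(strictLog));
console.log(strictLog.join(', '));
// next a, error 404, complete, teardown
```

Under these semantics, resource cleanup depends on someone eventually calling complete or unsubscribe, which is exactly the leak risk raised against the proposal in the comments.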