Replies: 3 comments
I think that's a nice example that shows a really good case where a token makes things less error-prone, which I think is awesome. However, I'd like to raise an issue that makes this less straightforward than it might seem.
Imagine, following the original example, that the user wants to subscribe to some changes for each value and emit those. Although it could be done in the same way as the example, let's put this in a custom operator so it doesn't look as contrived:

    const withLatestUserInput = () => source => new Observable((subscriber, signal) => {
      let lastMousePos = null;
      mouseMove$.subscribe(({ position }) => {
        lastMousePos = position;
      }, signal);
      let lastClickPos = null;
      mouseClick$.subscribe(({ position }) => {
        lastClickPos = position;
      }, signal);
      source.pipe(
        map(value => ({ value, lastMousePos, lastClickPos }))
      ).subscribe(subscriber, signal);
    });

    const ac = new AbortController();
    flatteningSyncOuter.pipe(
      take(3),
      withLatestUserInput()
    ).subscribe(undefined, ac.signal);

I'm sorry this is contrived, it's hard to find a good example - What this
On first sight, if we follow the logic that the signal is shared along the whole subscription chain, it looks like it would work, right? However, this assumption is wrong, unless we'd use a different token system from AbortController/AbortSignal: an AbortSignal has a property
This means that internally,
This means that
I think it would be great if we can find a solution where we can leverage the best of both worlds. Tokens have their use and in some cases are great to use (especially in synchronous operations), and Subscriptions provide a good abstraction that makes composition easier, although in some specific cases they are hard to use. Would it be possible to expose the token as proposed, while also keeping some sort of "cleanup" logic on subscription?
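One possible way to combine a shared token with per-subscription cleanup, as asked above, is for an operator to derive a child signal from the one it receives. The child aborts when the parent aborts, but the operator can also abort it on its own (for example when the source completes) without touching the consumer's controller. This is only a sketch using the standard `AbortController`/`AbortSignal`; `childController` is a hypothetical helper, not part of RxJS or of the proposal.

```javascript
// Hypothetical helper: returns a controller whose signal aborts when the
// parent signal aborts, but which can also be aborted independently.
function childController(parentSignal) {
  const child = new AbortController();
  if (parentSignal.aborted) {
    child.abort();
  } else {
    const onParentAbort = () => child.abort();
    parentSignal.addEventListener('abort', onParentAbort, { once: true });
    // Stop listening to the parent once the child itself is done.
    child.signal.addEventListener(
      'abort',
      () => parentSignal.removeEventListener('abort', onParentAbort),
      { once: true }
    );
  }
  return child;
}

// Stand-in for an inner subscription (e.g. a mouse listener): it only
// records whether its cleanup has run.
const parent = new AbortController();
const inner = childController(parent.signal);
let innerCleanedUp = false;
inner.signal.addEventListener('abort', () => { innerCleanedUp = true; }, { once: true });

// The source completes: the operator aborts only its own child signal.
inner.abort();
```

After `inner.abort()`, the inner cleanup has run while the consumer's own signal is still untouched, which is roughly the "cleanup on subscription" behavior the question is about.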
@voliva There are many ways to make sure this works. In all cases, an internal
@benlesh If I understand the "synchronous firehose" issue you are describing here, wouldn't adding a
Example (from your example):

    const subscription = new Subscription();
    firehose.subscribe({
      start: sub => subscription.add(sub),
      next: value => subscriber.next(value)
    });
    // Instead of this:
    // subscription.add(firehose.subscribe({
    //   next: value => subscriber.next(value)
    // }));

Also, the

    // Without support of `start` function, instance of MyObserver has no way of getting its own subscription to unsubscribe itself at some point in the future or immediately.
    const subscription = myObservable.subscribe(new MyObserver());

I also see the advantage and the simplicity of using

    interface Observer<T> {
      start(): void; // Note, here we don't get the subscription anymore since we are using signal.
      next(value: T): void;
      error(err: any): void;
      complete(): void;
      signal: AbortSignal;
    }

This way an Observer instance can also "unsubscribe" itself by creating its own termination
Example:

    class ConsoleLogObserver {
      constructor(time) {
        this._time = time;
        this._ac = new AbortController();
        this.signal = this._ac.signal;
      }
      start() {
        // The Observer can unsubscribe itself since it has access to its own `AbortController`
        timer(this._time).subscribe({
          next: () => this._ac.abort(),
          signal: this.signal,
        });
      }
      next(value) {
        console.log(value);
      }
      complete() {
        this._ac.abort();
      }
    }

    // console.log values for 30sec
    valueStream$.subscribe(new ConsoleLogObserver(30000));
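To make the `start` idea concrete, here is a minimal sketch with a toy `subscribeWithStart` function (hypothetical, and not how RxJS implements `subscribe`). The subscription handle is passed to `start` before the producer runs, so the observer can close it while a synchronous source is still emitting.

```javascript
// Toy subscribe: `producer` receives a subscriber that exposes `closed`.
// Illustration only; names and shape are assumptions.
function subscribeWithStart(producer, observer) {
  const subscription = {
    closed: false,
    unsubscribe() { this.closed = true; },
  };
  // Hand the subscription to the observer before any value is emitted.
  if (observer.start) observer.start(subscription);
  producer({
    next(value) { if (!subscription.closed) observer.next(value); },
    get closed() { return subscription.closed; },
  });
  return subscription;
}

// Synchronous firehose: emits until the consumer closes the subscription.
let produced = 0;
const firehose = (subscriber) => {
  for (let i = 0; i < 1000 && !subscriber.closed; i++) {
    produced++;
    subscriber.next(i);
  }
};

const received = [];
let handle;
subscribeWithStart(firehose, {
  start(sub) { handle = sub; },
  next(value) {
    received.push(value);
    // Unsubscribe from inside `next`, while the firehose loop is running.
    if (received.length === 3) handle.unsubscribe();
  },
});
```

Because `handle` is assigned in `start`, the observer does not have to wait for `subscribeWithStart` to return, which would only happen after the loop had finished.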
There are weird corners of RxJS involving teardown and "synchronous firehose" observables that are very hard to resolve with naive RxJS code. This is due primarily to the fact that we do not give the consumer a reference to the "closed" state of the underlying subscription, and the only way to do that, really, is with some sort of token.
Here is a contrived example that maybe only a seasoned RxJS veteran will understand, but it's here for discussion purposes:
The problem
If there is an additional subscription step between the `firehose` and the `take(3)`, there is no way to thread that `closed` state of the outer subscription (related to the `take(3)`) to the inner subscription in such a way that it would close it. If the `firehose` was infinite, or "very long", this would be catastrophic and hard to debug without a lot of expertise.
This is not a problem in the "99% use case" for RxJS, which should be async. This is why we haven't run across a lot of complaints about this. In the case where the inner subscription (to `firehose`) is async, the code has time to get a reference to the inner subscription and connect it to the outer subscription.
How a token system improves this
If we had a token system, the same token gets threaded through the entire subscription, even to the inner subscriptions. This means that the `firehose` would be checking `signal.aborted` or `token.cancelled` or whatever it ended up being called, instead of `subscriber.closed` (however, we could wire the subscriber up to make sure it is closed by the token). The big advantage is the shared state throughout the subscription chain, even to the inner subscriptions.
A token system will make the internal code lighter, and the implementation of naive operators by our users more readable and straightforward, IMO.
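The idea of threading one token through an intermediate subscription can be sketched with toy sources written as plain functions of the form `(observer, signal) => void`. Everything here is an assumption made for illustration: the `firehose`, `passThrough` and `take` names and shapes are hypothetical stand-ins, not the RxJS API, and `take` owning a controller linked to the consumer's signal is just one way the wiring could work.

```javascript
// Synchronous firehose: checks the signal on every iteration.
let produced = 0;
const firehose = (observer, signal) => {
  for (let i = 0; i < 1e6 && !signal.aborted; i++) {
    produced++;
    observer.next(i);
  }
  if (!signal.aborted) observer.complete();
};

// An extra subscription step between the consumer and the firehose. The
// same signal is forwarded, so nothing has to be connected after the fact.
const passThrough = (source) => (observer, signal) => {
  source({
    next: (value) => observer.next(value),
    complete: () => observer.complete(),
  }, signal);
};

// take(count): owns a controller linked to the consumer's signal and aborts
// it after `count` values, which stops everything upstream.
const take = (count) => (source) => (observer, signal) => {
  const controller = new AbortController();
  if (signal.aborted) controller.abort();
  else signal.addEventListener('abort', () => controller.abort(), { once: true });
  let seen = 0;
  source({
    next(value) {
      if (controller.signal.aborted) return;
      observer.next(value);
      if (++seen >= count) {
        controller.abort();
        observer.complete();
      }
    },
    complete() {
      if (!controller.signal.aborted) observer.complete();
    },
  }, controller.signal);
};

const received = [];
const consumer = new AbortController();
take(3)(passThrough(firehose))({
  next: (value) => received.push(value),
  complete: () => received.push('done'),
}, consumer.signal);
```

In this sketch the firehose loop exits after three values even though `passThrough` sits between it and `take(3)`, because both see the same signal while the loop is still running.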
Example with a token system (AbortSignal, let's say)
This is a make-believe example of the same problem, only using a system with `AbortSignal`-style tokens.
Summary
I think we should probably move away from `Subscriptions` in the long term. I have been critical of the ergonomics of tokens vs subscriptions in the past. However, given real-world implementations of things like we see above, I think that in those cases, subscriptions might actually be more complicated. (Think of other things, such as "Compound Subscriptions" aka "Parent Subscriptions" vs just using the same token for all subscriptions.)
I'll admit, tokens are still imperfect: there are rough edges around things like having more than one way to unsubscribe a subscription using a token (as you'd have to create a token by "racing" two tokens, or something), but I think that's workable.
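"Racing" two tokens could look roughly like the following sketch, built only on the standard `AbortController`/`AbortSignal`. The `raceSignals` helper and the `userCancel`/`timeout` names are hypothetical.

```javascript
// Hypothetical helper: returns a signal that aborts as soon as any of the
// given signals aborts.
function raceSignals(...signals) {
  const controller = new AbortController();
  const onAbort = () => {
    controller.abort();
    // Detach from every source once the race is decided.
    for (const s of signals) s.removeEventListener('abort', onAbort);
  };
  for (const s of signals) {
    if (s.aborted) { onAbort(); break; }
    s.addEventListener('abort', onAbort);
  }
  return controller.signal;
}

const userCancel = new AbortController();
const timeout = new AbortController();
const either = raceSignals(userCancel.signal, timeout.signal);

const before = either.aborted; // false: neither source has aborted yet
timeout.abort();
const after = either.aborted;  // true: one of the two sources aborted
```

Newer runtimes also ship `AbortSignal.any(signals)`, which serves the same purpose without a hand-written helper.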
Short term, I think we should try to get every `Subscription` to create a signal/token of its own to thread through as seen above. Since we create a master "subscription" (in the form of a Subscriber, internally) at the beginning of every subscription, there is an opportunity to provide this as an API, and we should start guiding people toward using these tokens when creating operators, particularly flattening operators as seen above.
Alternatives
It's also plausible that we could do something avant-garde and literally use `Subscription` like a combination `AbortController`/`AbortSignal` (it does provide both pieces of functionality there and is arguably more ergonomic). However, that might be confusing, as the goal would be to get people to stop relying on the return value of `subscribe` directly, and there would suddenly be subscriptions everywhere.
Then again, this migration might be confusing either way.
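As a rough sketch of that alternative, a subscription-like object can play both roles: `unsubscribe()` is the controller side, while `closed` and `add()` are the signal side. `TokenSubscription` and `countUp` are hypothetical names for illustration, not the RxJS `Subscription` class.

```javascript
// Hypothetical subscription that doubles as a token.
class TokenSubscription {
  constructor() {
    this.closed = false;
    this._teardowns = [];
  }
  add(teardown) {
    // Run immediately if already closed, so late registrations still clean up.
    if (this.closed) teardown();
    else this._teardowns.push(teardown);
  }
  unsubscribe() {
    if (this.closed) return;
    this.closed = true;
    for (const teardown of this._teardowns.splice(0)) teardown();
  }
}

// The producer receives the subscription up front and checks `closed`,
// the same way it would check `signal.aborted`.
const emitted = [];
const token = new TokenSubscription();
const countUp = (next, subscription) => {
  for (let i = 0; i < 100 && !subscription.closed; i++) next(i);
};
countUp((value) => {
  emitted.push(value);
  if (value === 1) token.unsubscribe();
}, token);
```

Here the synchronous loop stops after the second value because the same object is both handed down to the producer and closed by the consumer.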