Replies: 6 comments 10 replies
-
Maybe it's time to think about integration with Subscription will not happen: const signal = AbortSignal.abort();
range(0, Infinity).subscribe({
next: (value) => {
console.log('next', value)
},
signal,
}); Limit infinity const controller = new AbortController();
range(0, Infinity).subscribe({
next: (value) => {
if (value >= 5) {
controller.abort();
}
console.log('next', value);
},
signal: controller.signal,
}); Custom operator: export function myOperator<T>(...args: any[]) {
return (source: Observable<T>) => new Observable(
subscriber => {
source.subscribe({
next: (value) => {
subscriber.next(value + 1)
},
complete: () => {
subscriber.next('this is about to complete');
subscriber.complete();
},
error: (error) => {
subscriber.error(error);
},
signal: subscriber.signal
})
}
);
} Better integration with native APIs: // Polling 10 seconds, last fetch will be cancelled
defer((signal) => fetch('/foo', {signal}).then((response) => response.json()))
.pipe(repeat(), takeUntil(timer(10_000)))
.subscribe((result) => {
console.log(result);
}); source$
.pipe(
switchMap((body, signal) =>
// the previous fetch will be correctly canceled on each new call to the handler
fetch('/foo', {body, signal}).then((response) => response.json()),
),
)
.subscribe((result) => {
console.log(result);
});
function fromEvent<T>(target: EventTarget, eventName: string): Observable<T> {
return new Observable<T>((subscriber) => {
target.addEventListener(
eventName,
(event) => {
subscriber.next(event as T);
},
{signal: subscriber.signal},
);
});
} Integration with Node.js: import type {FileChangeInfo} from 'node:fs/promises';
import {watch as nodeWatch} from 'node:fs/promises';
import type {Observable} from 'rxjs';
import {defer, takeUntil, timer} from 'rxjs';
function watch(file: string): Observable<FileChangeInfo<Buffer>> {
return defer((signal) => nodeWatch(file, {signal}));
}
watch('foo.txt')
.pipe(takeUntil(timer(200)))
.subscribe((info) => {
console.log('info', info);
}); |
Beta Was this translation helpful? Give feedback.
-
I've been kinda naughty in some of my projects and I've been using I'm wondering how will the proposed API help avoid using const someReallyCustomShareThatDoesMoreThingsThanThisImplementation =
<T>() =>
(source$: Observable<T>) => {
let subject: Subject<T> | null = null;
let sourceSubscriber: Subscriber<T> | null = null;
let refCount = 0;
return new Observable<T>(subscriber => {
refCount++
let innerSub: Subscription
subscriber.add(() => {
refCount--;
innerSub.unsubscribe();
if (refCount === 0) {
sourceSubscriber?.unsubscribe();
subject?.complete();
subject = null;
sourceSubscriber = null;
}
})
if (!subject) {
subject = new Subject<T>();
innerSub = subject.subscribe(subscriber);
sourceSubscriber = new Subscriber<T>({
next: (value: T) => {
subject.next(value)
},
error: (err: any) => {
sourceSubscriber = null
subject.error(err)
subject = null
},
complete: () => {
sourceSubscriber = null
subject.complete()
subject = null
},
})
source$.subscribe(sourceSubscriber)
} else {
innerSub = subject.subscribe(subscriber);
}
})
}; The test that breaks if we try to use user-land observer + returned value of When using Without using Am I right thinking that this API will help solve this, by creating a Essentially like this: const someReallyCustomShareThatDoesMoreThingsThanThisImplementation =
<T>() =>
(source$: Observable<T>) => {
let subject: Subject<T> | null = null;
let ownerSubscription: Subscription | null = null;
let refCount = 0;
return new Observable<T>(subscriber => {
refCount++
let innerSub: Subscription
subscriber.add(() => {
refCount--;
innerSub.unsubscribe();
if (refCount === 0) {
ownerSubscription?.unsubscribe();
subject?.complete();
subject = null;
ownerSubscription = null;
}
})
if (!subject) {
subject = new Subject<T>();
innerSub = subject.subscribe(subscriber);
ownerSubscription = new Subscription();
source$.subscribe({
next: (value: T) => {
subject.next(value)
},
error: (err: any) => {
ownerSubscription = null
subject.error(err)
subject = null
},
complete: () => {
ownerSubscription = null
subject.complete()
subject = null
},
}, {
owner: ownerSubscription
})
} else {
innerSub = subject.subscribe(subscriber);
}
})
}; If so, I'm really happy of this change. It puts an end of having to use some RxJS internals to build some of these operators. |
Beta Was this translation helpful? Give feedback.
-
I really, really, really like the direction that this RFC is taking. However, I would like to go one step further and make interface Observer<T> {
next(value: T): void;
error(err: any): void;
complete(): void;
finalize(): void; // NEW!
owner: Subscription | Subscriber<any>; // NEW!
}
interface Observable<T> {
subscribe(
handler: Partial<Observer<T>>,
): Subscription
subscribe(
handler: (value: T) => void,
owner?: Subscription | Subscriber<any>
): Subscription
// (omitting other methods, etc)
}
I think that this would be a mistake. Having a "hotchpotch" options object "just in case" can be very problematic IMO. The reason being that it can easily introduce breaking changes as backwards compatible changes. For instance, lets say that in the future a |
Beta Was this translation helpful? Give feedback.
-
Overall I like it, I think we we may be able to make the DX slightly cleaner for custom operators by having some factory like so: const operator = <
TSubscriber = unknown,
TSource = unknown,
Args extends readonly unknown[] = unknown[]
>(
callback: (
subscriber: Subscriber<TSubscriber>,
...args: Args
) => Partial<Observer<TSource>>
): ((
...args: Args
) => (source: Observable<TSource>) => Observable<TSubscriber>) => {
return (...args: Args) => {
return (source: Observable<TSource>) => {
return new Observable<TSubscriber>((subscriber) => {
const observer = callback(subscriber, ...args);
// Soon: source.subscribe(observer, { owner: subscriber });
source.subscribe(observer);
});
};
};
}; Usage on what you had before would look something like like this: // Initial Proposed API
function myOperator<T>(...args: any[]) {
return (source: Observable<T>) =>
new Observable((subscriber) => {
source.subscribe(
{
next: (value) => {
// override next here
subscriber.next((value as number) + 1);
},
complete: () => {
// override complete here
subscriber.next('this is about to complete');
subscriber.complete();
},
},
{
owner: subscriber,
}
);
});
}
// Using the operator function
const myOtherOperator = operator((subscriber, ...args: any[]) => ({
next: (value) => {
// override next here
subscriber.next((value as number) + 1);
},
complete: () => {
// override complete here
subscriber.next('this is about to complete');
subscriber.complete();
},
})); We can even get this typed fairly well: const maxSizeValidateOperator = operator(
(subscriber: Subscriber<boolean>, maxSize: number) => {
return {
next: (value: string) => {
// override next here
subscriber.next(value.length <= maxSize);
},
complete: () => {
// override complete here
subscriber.next(false);
subscriber.complete();
},
};
}
);
of('cat', 'person', 'dinosaur', 'goat')
.pipe(maxSizeValidateOperator(5))
.subscribe((value) => console.log({ value })); // value is boolean
of(1, 2, 3)
.pipe(maxSizeValidateOperator(5)) // Error: 'number' is not assignable to 'string'
.subscribe((value) => console.log({ value }));
of()
.pipe(maxSizeValidateOperator('bad')) // Error: 'string' is not assignable to 'number'
.subscribe((value) => console.log({ value })); We can also have the factory be curried so you don't need to smoosh the subscriber with the args if that's something that we want to do. e.g. const maxSizeValidateOperator = operator(
(subscriber: Subscriber<boolean>) => (maxSize: number) => {
return {
next: (value: string) => {
// override next here
subscriber.next(value.length <= maxSize);
},
complete: () => {
// override complete here
subscriber.next(false);
subscriber.complete();
},
};
}
); But I'm leaning towards the non curried version. Link to stackblitz |
Beta Was this translation helpful? Give feedback.
-
After some thought I'm pulling back from this proposal. The primary reason is that it doesn't align with any other observable implementations available, and we just went through some efforts to try to make RxJS operators somewhat "observable agnostic". As in, if there's another simplified observable implementation or something lands in a platform, RxJS operators should "just work" with them. |
Beta Was this translation helpful? Give feedback.
-
Closed for new proposal here: #7253 |
Beta Was this translation helpful? Give feedback.
-
(NOTE TO READERS: I have some spikes already in-flight on this, just calling that out so no one wastes time trying to implement it)
Summary Of Proposal
finalize
handler toObserver<T>
so that all consumer subscriptions can utilize it.Observable#subscribe
with anowner
property that accepts:Subscription
that controls the resulting subscription (similar to a cancelation token)Subscriber
that not only controls the subscription, but also acts as a "destination" for creating operators.Problem space
Different approaches externally and internally
Externally we tell people do basically do this:
Internally, we're doing something like this:
We would never want to productize that for general consumption. It's nonsense for the most part. You can see we're using the argument order to "know" what is what. This is done for size optimization mostly. The arguments are in this order: parent subscriber (the observation and subscription we want to chain), a next handler override, a complete handler override, an error handler override, and a finalization function (behaves like the
finalize
operator, in a sense).The reason we needed to do things differently
The problem comes from a corner case where an operator that is subscribing to "inner observables" happens to subscribe to an inner observable that is synchronous, and then have another operator down the chain that wants to terminate the resulting subscriptoin.
Consider this scenario:
If
concatMap
was implemented naively, as we currently tell external users of RxJS to implement custom operators, they'd lock up their thread.This is because
range(0, Infinity)
is a "synchronous firehose", meaning it will synchronously emit a LOT of values (an infinite number, in fact). And it should be okay, if theconcatMap
implementation connected it's subscriptions in advance, because thetake(3)
wouldclose
the subscription and end the emission of values from that infinite range. However, if the subscriptions aren't chained in advance, it will terminate the final subscription, but therange(0, Infinity)
will continue to loop and try to emit values.This is because of an inherent disadvantage to
subscribe
returningSubscription
: It has to wait for thesubscribe
call to finish its work in order to return. Which means you're not able to cancel that child subscription until after you have a handle to it... A handle you will never get.RxJS gets around this problem by providing "destination" subscriber to a newly created operator subscriber. Subscribers are the marriage of a subscription and the observer. When you provide a destination that is a subscription, the subscriptions are automatically chained.
For a variety of reasons, this API has not really been exposed in a meaningful, documented way for public consumption. But we want to make sure people can develop their operators in a way that will always work properly. It's also a bonus if they can use OUR operators as a reference because they're implemented in the same way.
Proposal: Change RxJS to create operators the same way as everyone else
This work is already underway for version 8. Very soon, you'll start to see that reflected. Basically all operators will be made with something like:
However, there are a few things we'd need to make sure we were able to do with the public API cleanly before this could work.
1. We need to add a
finalize
handler to ourObserver<T>
thatObservable#subscribe()
takes.I think this will be a welcome addition. This is necessary to make it more ergonomic and terse to run clean up for operators. Looking through our code base will show how this is needed. Basically what this would do will be fire when the subscription is finalized.
Basically this:
2. Allow a second configuration argument to
subscribe
to pass a parent subscriptionThis will be great for us in a lot of ways. Since in version 8, we only have one argument to
subscribe
, which must be either a function (next handler), or an object (the observer), that frees us up to have a configuration/options object as the second argument.We want to do a configuration/options object so we can have named arguments, and non-breaking flexible API to add/change features in that area in the future.
This parent subscription would effectively be a cancellation token (and it's controller) in a way, but also, it would provide for us an API to chain Subscriptions and Subscribers.
If a Subscription is passed as the
owner
This means that Subscription is the "parent" and owns the created/returned subscription. When the owner is unsubscribed, the created subscription is also unsubscribed. In effect, this will work like a cancellation token, but will provide no other semantic difference to current subscription.
If a Subscriber is passed as the
owner
This means that the subscriber is the "parent subscription" now (
Subscriber
is both anObserver
and aSubscription
).It ALSO means that the observer passed to subscribe will act like an override or interceptor for passing values to that owner Subscriber. For an example of how this will work, have a look at OperatorSubscriber in the current code base.
Safety
For the time being, these things will be determined by
instanceof
checks, to ensure we're dealing with compatible instances ofSubscription
orSubscriber
. If theSubscriber
orSubscription
passed to theowner
property of the subscribe options is not aninstanceof Subscriber
orinstanceof Subscription
, aTypeError
will be thrown immediately. The only reason this could happen is if a developer is mixing instances from multiple bundled copies of RxJS, or if they're trying to literally pass the wrong object.Alternatively, we may look into using a
Symbol
to "get" a known Subscriber or Subscription from whatever is being passed as anowner
. This warrants investigation.Proposed API
General use as a cancellation token
And usage in a general, public sense could be like so to get an cancellation token effect (similar to an AbortSignal):
Use for chaining subscribers (in operators)
This is clearly a contrived example, but here
Use Cases
Just as an example here are a before and after of a simple custom "map" operator:
RxJS 5.5 - 7.x:
RxJS 8.x (proposed):
Alternatives Considered
Alternative 1: The
start
event withSubscription
The
start
orsubscribe
handler onObserver
that supplies theSubscription
as soon as the subscription starts. This one isn't too bad, because we already have some precedent with asubscribe
handler intap
. There's also some preceding design discussion in the TC39 observable proposal where this was considered, and I remember some of those conversations.It looks like this:
This solves the problem of needing to get access to the subscription before anything is emitted from the source... However it's gross and/or doesn't solve for everything:
subscribe
on the source is going to synchronously trigger that prior to anything.try/catch
anderror: err => subscriber.error(err)
type boilerplate would still need to be done.Beta Was this translation helpful? Give feedback.
All reactions