Skip to content

Commit

Permalink
refactor: add createOperatorSubscriber abstraction (ReactiveX#6804)
Browse files Browse the repository at this point in the history
Simply hides the creation of `new OperatorSubscriber` in a function called `createOperatorSubscriber`.

This is part of a larger plan to:

1. Productize our internal means of creating operators so other folks can use it. (this is just the first step, I do not plan on exposing `createOperatorSubscriber` at this time). The idea is to move away from using classes all over our operators for both minification reasons and because then later, when we productize things, we don't have to export classes that people will try to subclass.
2. Make it easier to refactor the class underpinning our operators by abstracting it away

NOTE: This does not attempt to change anything about the special case in `groupBy`, which will require some thought.
  • Loading branch information
benlesh authored Feb 8, 2022
1 parent 123a0f2 commit 211e6ff
Show file tree
Hide file tree
Showing 62 changed files with 174 additions and 150 deletions.
4 changes: 2 additions & 2 deletions src/internal/observable/ConnectableObservable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { Observable } from '../Observable';
import { Subscriber } from '../Subscriber';
import { Subscription } from '../Subscription';
import { refCount as higherOrderRefCount } from '../operators/refCount';
import { OperatorSubscriber } from '../operators/OperatorSubscriber';
import { createOperatorSubscriber } from '../operators/OperatorSubscriber';
import { hasLift } from '../util/lift';

/**
Expand Down Expand Up @@ -70,7 +70,7 @@ export class ConnectableObservable<T> extends Observable<T> {
const subject = this.getSubject();
connection.add(
this.source.subscribe(
new OperatorSubscriber(
createOperatorSubscriber(
subject as any,
undefined,
() => {
Expand Down
4 changes: 2 additions & 2 deletions src/internal/observable/combineLatest.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import { Subscription } from '../Subscription';
import { mapOneOrManyArgs } from '../util/mapOneOrManyArgs';
import { popResultSelector, popScheduler } from '../util/args';
import { createObject } from '../util/createObject';
import { OperatorSubscriber } from '../operators/OperatorSubscriber';
import { createOperatorSubscriber } from '../operators/OperatorSubscriber';
import { AnyCatcher } from '../AnyCatcher';
import { executeSchedule } from '../util/executeSchedule';

Expand Down Expand Up @@ -256,7 +256,7 @@ export function combineLatestInit(
const source = from(observables[i], scheduler as any);
let hasFirstValue = false;
source.subscribe(
new OperatorSubscriber(
createOperatorSubscriber(
subscriber,
(value) => {
// When we get a value, record it in our set of values.
Expand Down
4 changes: 2 additions & 2 deletions src/internal/observable/dom/fetch.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { OperatorSubscriber } from '../../operators/OperatorSubscriber';
import { createOperatorSubscriber } from '../../operators/OperatorSubscriber';
import { Observable } from '../../Observable';
import { innerFrom } from '../../observable/innerFrom';
import { ObservableInput } from '../../types';
Expand Down Expand Up @@ -151,7 +151,7 @@ export function fromFetch<T>(
// Note that any error that comes from our selector will be
// sent to the promise `catch` below and handled.
innerFrom(selector(response)).subscribe(
new OperatorSubscriber(
createOperatorSubscriber(
subscriber,
// Values are passed through to the subscriber
undefined,
Expand Down
4 changes: 2 additions & 2 deletions src/internal/observable/forkJoin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { ObservedValueOf, ObservableInputTuple, ObservableInput } from '../types
import { argsArgArrayOrObject } from '../util/argsArgArrayOrObject';
import { innerFrom } from './innerFrom';
import { popResultSelector } from '../util/args';
import { OperatorSubscriber } from '../operators/OperatorSubscriber';
import { createOperatorSubscriber } from '../operators/OperatorSubscriber';
import { mapOneOrManyArgs } from '../util/mapOneOrManyArgs';
import { createObject } from '../util/createObject';
import { AnyCatcher } from '../AnyCatcher';
Expand Down Expand Up @@ -159,7 +159,7 @@ export function forkJoin(...args: any[]): Observable<any> {
for (let sourceIndex = 0; sourceIndex < length; sourceIndex++) {
let hasValue = false;
innerFrom(sources[sourceIndex]).subscribe(
new OperatorSubscriber(
createOperatorSubscriber(
subscriber,
(value) => {
if (!hasValue) {
Expand Down
4 changes: 2 additions & 2 deletions src/internal/observable/race.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { innerFrom } from './innerFrom';
import { Subscription } from '../Subscription';
import { ObservableInput, ObservableInputTuple } from '../types';
import { argsOrArgArray } from '../util/argsOrArgArray';
import { OperatorSubscriber } from '../operators/OperatorSubscriber';
import { createOperatorSubscriber } from '../operators/OperatorSubscriber';
import { Subscriber } from '../Subscriber';

export function race<T extends readonly unknown[]>(inputs: [...ObservableInputTuple<T>]): Observable<T[number]>;
Expand Down Expand Up @@ -70,7 +70,7 @@ export function raceInit<T>(sources: ObservableInput<T>[]) {
for (let i = 0; subscriptions && !subscriber.closed && i < sources.length; i++) {
subscriptions.push(
innerFrom(sources[i] as ObservableInput<T>).subscribe(
new OperatorSubscriber(subscriber, (value) => {
createOperatorSubscriber(subscriber, (value) => {
if (subscriptions) {
// We're still racing, but we won! So unsubscribe
// all other subscriptions that we have, except this one.
Expand Down
4 changes: 2 additions & 2 deletions src/internal/observable/zip.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { ObservableInputTuple } from '../types';
import { innerFrom } from './innerFrom';
import { argsOrArgArray } from '../util/argsOrArgArray';
import { EMPTY } from './empty';
import { OperatorSubscriber } from '../operators/OperatorSubscriber';
import { createOperatorSubscriber } from '../operators/OperatorSubscriber';
import { popResultSelector } from '../util/args';

export function zip<A extends readonly unknown[]>(sources: [...ObservableInputTuple<A>]): Observable<A>;
Expand Down Expand Up @@ -74,7 +74,7 @@ export function zip(...args: unknown[]): Observable<unknown> {
// access the related buffers and completion properties
for (let sourceIndex = 0; !subscriber.closed && sourceIndex < sources.length; sourceIndex++) {
innerFrom(sources[sourceIndex]).subscribe(
new OperatorSubscriber(
createOperatorSubscriber(
subscriber,
(value) => {
buffers[sourceIndex].push(value);
Expand Down
22 changes: 22 additions & 0 deletions src/internal/operators/OperatorSubscriber.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,27 @@
import { Subscriber } from '../Subscriber';

/**
* Creates an instance of an `OperatorSubscriber`.
* @param destination The downstream subscriber.
* @param onNext Handles next values, only called if this subscriber is not stopped or closed. Any
* error that occurs in this function is caught and sent to the `error` method of this subscriber.
* @param onError Handles errors from the subscription, any errors that occur in this handler are caught
* and send to the `destination` error handler.
* @param onComplete Handles completion notification from the subscription. Any errors that occur in
* this handler are sent to the `destination` error handler.
* @param onFinalize Additional teardown logic here. This will only be called on teardown if the
* subscriber itself is not already closed. This is called after all other teardown logic is executed.
*/
export function createOperatorSubscriber<T>(
destination: Subscriber<any>,
onNext?: (value: T) => void,
onComplete?: () => void,
onError?: (err: any) => void,
onFinalize?: () => void
): Subscriber<T> {
return new OperatorSubscriber(destination, onNext, onComplete, onError, onFinalize);
}

/**
* A generic helper for allowing operators to be created with a Subscriber and
* use closures to capture necessary state from the operator function itself.
Expand Down
6 changes: 3 additions & 3 deletions src/internal/operators/audit.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { MonoTypeOperatorFunction, ObservableInput } from '../types';

import { operate } from '../util/lift';
import { innerFrom } from '../observable/innerFrom';
import { OperatorSubscriber } from './OperatorSubscriber';
import { createOperatorSubscriber } from './OperatorSubscriber';

/**
* Ignores source values for a duration determined by another Observable, then
Expand Down Expand Up @@ -75,14 +75,14 @@ export function audit<T>(durationSelector: (value: T) => ObservableInput<any>):
};

source.subscribe(
new OperatorSubscriber(
createOperatorSubscriber(
subscriber,
(value) => {
hasValue = true;
lastValue = value;
if (!durationSubscriber) {
innerFrom(durationSelector(value)).subscribe(
(durationSubscriber = new OperatorSubscriber(subscriber, endDuration, cleanupDuration))
(durationSubscriber = createOperatorSubscriber(subscriber, endDuration, cleanupDuration))
);
}
},
Expand Down
6 changes: 3 additions & 3 deletions src/internal/operators/buffer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { Observable } from '../Observable';
import { OperatorFunction } from '../types';
import { operate } from '../util/lift';
import { noop } from '../util/noop';
import { OperatorSubscriber } from './OperatorSubscriber';
import { createOperatorSubscriber } from './OperatorSubscriber';

/**
* Buffers the source Observable values until `closingNotifier` emits.
Expand Down Expand Up @@ -48,7 +48,7 @@ export function buffer<T>(closingNotifier: Observable<any>): OperatorFunction<T,

// Subscribe to our source.
source.subscribe(
new OperatorSubscriber(
createOperatorSubscriber(
subscriber,
(value) => currentBuffer.push(value),
() => {
Expand All @@ -60,7 +60,7 @@ export function buffer<T>(closingNotifier: Observable<any>): OperatorFunction<T,

// Subscribe to the closing notifier.
closingNotifier.subscribe(
new OperatorSubscriber(
createOperatorSubscriber(
subscriber,
() => {
// Start a new buffer and emit the previous one.
Expand Down
4 changes: 2 additions & 2 deletions src/internal/operators/bufferCount.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { OperatorFunction } from '../types';
import { operate } from '../util/lift';
import { OperatorSubscriber } from './OperatorSubscriber';
import { createOperatorSubscriber } from './OperatorSubscriber';
import { arrRemove } from '../util/arrRemove';

/**
Expand Down Expand Up @@ -64,7 +64,7 @@ export function bufferCount<T>(bufferSize: number, startBufferEvery: number | nu
let count = 0;

source.subscribe(
new OperatorSubscriber(
createOperatorSubscriber(
subscriber,
(value) => {
let toEmit: T[][] | null = null;
Expand Down
4 changes: 2 additions & 2 deletions src/internal/operators/bufferTime.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { Subscription } from '../Subscription';
import { OperatorFunction, SchedulerLike } from '../types';
import { operate } from '../util/lift';
import { OperatorSubscriber } from './OperatorSubscriber';
import { createOperatorSubscriber } from './OperatorSubscriber';
import { arrRemove } from '../util/arrRemove';
import { asyncScheduler } from '../scheduler/async';
import { popScheduler } from '../util/args';
Expand Down Expand Up @@ -131,7 +131,7 @@ export function bufferTime<T>(bufferTimeSpan: number, ...otherArgs: any[]): Oper

startBuffer();

const bufferTimeSubscriber = new OperatorSubscriber(
const bufferTimeSubscriber = createOperatorSubscriber(
subscriber,
(value: T) => {
// Copy the records, so if we need to remove one we
Expand Down
8 changes: 4 additions & 4 deletions src/internal/operators/bufferToggle.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { Subscription } from '../Subscription';
import { OperatorFunction, ObservableInput } from '../types';
import { operate } from '../util/lift';
import { innerFrom } from '../observable/innerFrom';
import { OperatorSubscriber } from './OperatorSubscriber';
import { createOperatorSubscriber } from './OperatorSubscriber';
import { noop } from '../util/noop';
import { arrRemove } from '../util/arrRemove';

Expand Down Expand Up @@ -58,7 +58,7 @@ export function bufferToggle<T, O>(

// Subscribe to the openings notifier first
innerFrom(openings).subscribe(
new OperatorSubscriber(
createOperatorSubscriber(
subscriber,
(openValue) => {
const buffer: T[] = [];
Expand All @@ -74,14 +74,14 @@ export function bufferToggle<T, O>(
};

// The line below will add the subscription to the parent subscriber *and* the closing subscription.
closingSubscription.add(innerFrom(closingSelector(openValue)).subscribe(new OperatorSubscriber(subscriber, emitBuffer, noop)));
closingSubscription.add(innerFrom(closingSelector(openValue)).subscribe(createOperatorSubscriber(subscriber, emitBuffer, noop)));
},
noop
)
);

source.subscribe(
new OperatorSubscriber(
createOperatorSubscriber(
subscriber,
(value) => {
// Value from our source. Add it to all pending buffers.
Expand Down
6 changes: 3 additions & 3 deletions src/internal/operators/bufferWhen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { Subscriber } from '../Subscriber';
import { ObservableInput, OperatorFunction } from '../types';
import { operate } from '../util/lift';
import { noop } from '../util/noop';
import { OperatorSubscriber } from './OperatorSubscriber';
import { createOperatorSubscriber } from './OperatorSubscriber';
import { innerFrom } from '../observable/innerFrom';

/**
Expand Down Expand Up @@ -66,15 +66,15 @@ export function bufferWhen<T>(closingSelector: () => ObservableInput<any>): Oper
b && subscriber.next(b);

// Get a new closing notifier and subscribe to it.
innerFrom(closingSelector()).subscribe((closingSubscriber = new OperatorSubscriber(subscriber, openBuffer, noop)));
innerFrom(closingSelector()).subscribe((closingSubscriber = createOperatorSubscriber(subscriber, openBuffer, noop)));
};

// Start the first buffer.
openBuffer();

// Subscribe to our source.
source.subscribe(
new OperatorSubscriber(
createOperatorSubscriber(
subscriber,
// Add every new value to the current buffer.
(value) => buffer?.push(value),
Expand Down
4 changes: 2 additions & 2 deletions src/internal/operators/catchError.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { Observable } from '../Observable';
import { ObservableInput, OperatorFunction, ObservedValueOf } from '../types';
import { Subscription } from '../Subscription';
import { innerFrom } from '../observable/innerFrom';
import { OperatorSubscriber } from './OperatorSubscriber';
import { createOperatorSubscriber } from './OperatorSubscriber';
import { operate } from '../util/lift';

/* tslint:disable:max-line-length */
Expand Down Expand Up @@ -113,7 +113,7 @@ export function catchError<T, O extends ObservableInput<any>>(
let handledResult: Observable<ObservedValueOf<O>>;

innerSub = source.subscribe(
new OperatorSubscriber(subscriber, undefined, undefined, (err) => {
createOperatorSubscriber(subscriber, undefined, undefined, (err) => {
handledResult = innerFrom(selector(err, catchError(selector)(source)));
if (innerSub) {
innerSub.unsubscribe();
Expand Down
6 changes: 3 additions & 3 deletions src/internal/operators/debounce.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { Subscriber } from '../Subscriber';
import { MonoTypeOperatorFunction, ObservableInput } from '../types';
import { operate } from '../util/lift';
import { noop } from '../util/noop';
import { OperatorSubscriber } from './OperatorSubscriber';
import { createOperatorSubscriber } from './OperatorSubscriber';
import { innerFrom } from '../observable/innerFrom';

/**
Expand Down Expand Up @@ -86,7 +86,7 @@ export function debounce<T>(durationSelector: (value: T) => ObservableInput<any>
};

source.subscribe(
new OperatorSubscriber(
createOperatorSubscriber(
subscriber,
(value: T) => {
// Cancel any pending debounce duration. We don't
Expand All @@ -97,7 +97,7 @@ export function debounce<T>(durationSelector: (value: T) => ObservableInput<any>
lastValue = value;
// Capture our duration subscriber, so we can unsubscribe it when we're notified
// and we're going to emit the value.
durationSubscriber = new OperatorSubscriber(subscriber, emit, noop);
durationSubscriber = createOperatorSubscriber(subscriber, emit, noop);
// Subscribe to the duration.
innerFrom(durationSelector(value)).subscribe(durationSubscriber);
},
Expand Down
4 changes: 2 additions & 2 deletions src/internal/operators/debounceTime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { asyncScheduler } from '../scheduler/async';
import { Subscription } from '../Subscription';
import { MonoTypeOperatorFunction, SchedulerAction, SchedulerLike } from '../types';
import { operate } from '../util/lift';
import { OperatorSubscriber } from './OperatorSubscriber';
import { createOperatorSubscriber } from './OperatorSubscriber';

/**
* Emits a notification from the source Observable only after a particular time span
Expand Down Expand Up @@ -94,7 +94,7 @@ export function debounceTime<T>(dueTime: number, scheduler: SchedulerLike = asyn
}

source.subscribe(
new OperatorSubscriber(
createOperatorSubscriber(
subscriber,
(value: T) => {
lastValue = value;
Expand Down
4 changes: 2 additions & 2 deletions src/internal/operators/defaultIfEmpty.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { OperatorFunction } from '../types';
import { operate } from '../util/lift';
import { OperatorSubscriber } from './OperatorSubscriber';
import { createOperatorSubscriber } from './OperatorSubscriber';

/**
* Emits a given value if the source Observable completes without emitting any
Expand Down Expand Up @@ -41,7 +41,7 @@ export function defaultIfEmpty<T, R>(defaultValue: R): OperatorFunction<T, T | R
return operate((source, subscriber) => {
let hasValue = false;
source.subscribe(
new OperatorSubscriber(
createOperatorSubscriber(
subscriber,
(value) => {
hasValue = true;
Expand Down
4 changes: 2 additions & 2 deletions src/internal/operators/dematerialize.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { observeNotification } from '../Notification';
import { OperatorFunction, ObservableNotification, ValueFromNotification } from '../types';
import { operate } from '../util/lift';
import { OperatorSubscriber } from './OperatorSubscriber';
import { createOperatorSubscriber } from './OperatorSubscriber';

/**
* Converts an Observable of {@link ObservableNotification} objects into the emissions
Expand Down Expand Up @@ -53,6 +53,6 @@ import { OperatorSubscriber } from './OperatorSubscriber';
*/
export function dematerialize<N extends ObservableNotification<any>>(): OperatorFunction<N, ValueFromNotification<N>> {
return operate((source, subscriber) => {
source.subscribe(new OperatorSubscriber(subscriber, (notification) => observeNotification(notification, subscriber)));
source.subscribe(createOperatorSubscriber(subscriber, (notification) => observeNotification(notification, subscriber)));
});
}
Loading

0 comments on commit 211e6ff

Please sign in to comment.