Skip to content

Commit

Permalink
feat(OuterSubscriber): notifyNext passes innersubscriber when next emits
Browse files Browse the repository at this point in the history
- notifyNext() passes inner subscriber as same as notifyError(), notifyComplete() does, supports subscription management

closes ReactiveX#1250
  • Loading branch information
kwonoj committed Jan 27, 2016
1 parent 7004c26 commit 2d33f32
Show file tree
Hide file tree
Showing 23 changed files with 96 additions and 31 deletions.
8 changes: 4 additions & 4 deletions src/InnerSubscriber.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,16 @@ export class InnerSubscriber<T, R> extends Subscriber<R> {
super();
}

protected _next(value: R) {
this.parent.notifyNext(this.outerValue, value, this.outerIndex, this.index++);
protected _next(value: R): void {
this.parent.notifyNext(this.outerValue, value, this.outerIndex, this.index++, this);
}

protected _error(error: any) {
protected _error(error: any): void {
this.parent.notifyError(error, this);
this.unsubscribe();
}

protected _complete() {
protected _complete(): void {
this.parent.notifyComplete(this);
this.unsubscribe();
}
Expand Down
4 changes: 3 additions & 1 deletion src/OuterSubscriber.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ import {Subscriber} from './Subscriber';
import {InnerSubscriber} from './InnerSubscriber';

export class OuterSubscriber<T, R> extends Subscriber<T> {
notifyNext(outerValue: T, innerValue: R, outerIndex: number, innerIndex: number): void {
notifyNext(outerValue: T, innerValue: R,
outerIndex: number, innerIndex: number,
innerSub: InnerSubscriber<T, R>): void {
this.destination.next(innerValue);
}

Expand Down
5 changes: 4 additions & 1 deletion src/operator/buffer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import {Subscriber} from '../Subscriber';
import {Observable} from '../Observable';

import {OuterSubscriber} from '../OuterSubscriber';
import {InnerSubscriber} from '../InnerSubscriber';
import {subscribeToResult} from '../util/subscribeToResult';

/**
Expand Down Expand Up @@ -44,7 +45,9 @@ class BufferSubscriber<T, R> extends OuterSubscriber<T, R> {
this.buffer.push(value);
}

notifyNext(outerValue: T, innerValue: R, outerIndex: number, innerIndex: number): void {
notifyNext(outerValue: T, innerValue: R,
outerIndex: number, innerIndex: number,
innerSub: InnerSubscriber<T, R>): void {
const buffer = this.buffer;
this.buffer = [];
this.destination.next(buffer);
Expand Down
5 changes: 4 additions & 1 deletion src/operator/bufferWhen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import {tryCatch} from '../util/tryCatch';
import {errorObject} from '../util/errorObject';

import {OuterSubscriber} from '../OuterSubscriber';
import {InnerSubscriber} from '../InnerSubscriber';
import {subscribeToResult} from '../util/subscribeToResult';

/**
Expand Down Expand Up @@ -60,7 +61,9 @@ class BufferWhenSubscriber<T, R> extends OuterSubscriber<T, R> {
this.subscribing = false;
}

notifyNext(outerValue: T, innerValue: R, outerIndex: number, innerIndex: number): void {
notifyNext(outerValue: T, innerValue: R,
outerIndex: number, innerIndex: number,
innerSub: InnerSubscriber<T, R>): void {
this.openBuffer();
}

Expand Down
7 changes: 5 additions & 2 deletions src/operator/combineLatest.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import {isScheduler} from '../util/isScheduler';
import {Operator} from '../Operator';
import {Subscriber} from '../Subscriber';
import {OuterSubscriber} from '../OuterSubscriber';
import {InnerSubscriber} from '../InnerSubscriber';
import {subscribeToResult} from '../util/subscribeToResult';

/**
Expand Down Expand Up @@ -116,9 +117,11 @@ export class CombineLatestSubscriber<T, R> extends OuterSubscriber<T, R> {
}
}

notifyNext(observable: any, value: R, outerIndex: number, innerIndex: number) {
notifyNext(outerValue: T, innerValue: R,
outerIndex: number, innerIndex: number,
innerSub: InnerSubscriber<T, R>): void {
const values = this.values;
values[outerIndex] = value;
values[outerIndex] = innerValue;
const toRespond = this.toRespond;

if (toRespond.length > 0) {
Expand Down
5 changes: 4 additions & 1 deletion src/operator/debounce.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import {tryCatch} from '../util/tryCatch';
import {errorObject} from '../util/errorObject';

import {OuterSubscriber} from '../OuterSubscriber';
import {InnerSubscriber} from '../InnerSubscriber';
import {subscribeToResult} from '../util/subscribeToResult';

/**
Expand Down Expand Up @@ -68,7 +69,9 @@ class DebounceSubscriber<T, R> extends OuterSubscriber<T, R> {
this.destination.complete();
}

notifyNext(outerValue: T, innerValue: R, outerIndex: number, innerIndex: number): void {
notifyNext(outerValue: T, innerValue: R,
outerIndex: number, innerIndex: number,
innerSub: InnerSubscriber<T, R>): void {
this.emitValue();
}

Expand Down
5 changes: 4 additions & 1 deletion src/operator/exhaustMap.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import {Subscriber} from '../Subscriber';
import {tryCatch} from '../util/tryCatch';
import {errorObject} from '../util/errorObject';
import {OuterSubscriber} from '../OuterSubscriber';
import {InnerSubscriber} from '../InnerSubscriber';
import {subscribeToResult} from '../util/subscribeToResult';

/**
Expand Down Expand Up @@ -64,7 +65,9 @@ class SwitchFirstMapSubscriber<T, R, R2> extends OuterSubscriber<T, R> {
}
}

notifyNext(outerValue: T, innerValue: R, outerIndex: number, innerIndex: number): void {
notifyNext(outerValue: T, innerValue: R,
outerIndex: number, innerIndex: number,
innerSub: InnerSubscriber<T, R>): void {
const { resultSelector, destination } = this;
if (resultSelector) {
const result = tryCatch(resultSelector)(outerValue, innerValue, outerIndex, innerIndex);
Expand Down
5 changes: 4 additions & 1 deletion src/operator/expand.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import {tryCatch} from '../util/tryCatch';
import {errorObject} from '../util/errorObject';
import {Subscription} from '../Subscription';
import {OuterSubscriber} from '../OuterSubscriber';
import {InnerSubscriber} from '../InnerSubscriber';
import {subscribeToResult} from '../util/subscribeToResult';

/**
Expand Down Expand Up @@ -96,7 +97,9 @@ export class ExpandSubscriber<T, R> extends OuterSubscriber<T, R> {
}
}

notifyNext(outerValue: T, innerValue: R, outerIndex: number, innerIndex: number): void {
notifyNext(outerValue: T, innerValue: R,
outerIndex: number, innerIndex: number,
innerSub: InnerSubscriber<T, R>): void {
this._next(innerValue);
}

Expand Down
7 changes: 5 additions & 2 deletions src/operator/mergeMap.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import {Subscriber} from '../Subscriber';
import {Subscription} from '../Subscription';
import {subscribeToResult} from '../util/subscribeToResult';
import {OuterSubscriber} from '../OuterSubscriber';
import {InnerSubscriber} from '../InnerSubscriber';

/**
* Returns an Observable that emits items based on applying a function that you supply to each item emitted by the
Expand Down Expand Up @@ -80,7 +81,9 @@ export class MergeMapSubscriber<T, R, R2> extends OuterSubscriber<T, R> {
}
}

notifyNext(outerValue: T, innerValue: R, outerIndex: number, innerIndex: number): void {
notifyNext(outerValue: T, innerValue: R,
outerIndex: number, innerIndex: number,
innerSub: InnerSubscriber<T, R>): void {
if (this.resultSelector) {
this._notifyResultSelector(outerValue, innerValue, outerIndex, innerIndex);
} else {
Expand Down Expand Up @@ -109,4 +112,4 @@ export class MergeMapSubscriber<T, R, R2> extends OuterSubscriber<T, R> {
this.destination.complete();
}
}
}
}
5 changes: 4 additions & 1 deletion src/operator/mergeMapTo.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import {tryCatch} from '../util/tryCatch';
import {errorObject} from '../util/errorObject';
import {Subscription} from '../Subscription';
import {OuterSubscriber} from '../OuterSubscriber';
import {InnerSubscriber} from '../InnerSubscriber';
import {subscribeToResult} from '../util/subscribeToResult';

export function mergeMapTo<T, R, R2>(observable: Observable<R>,
Expand Down Expand Up @@ -67,7 +68,9 @@ export class MergeMapToSubscriber<T, R, R2> extends OuterSubscriber<T, R> {
}
}

notifyNext(outerValue: T, innerValue: R, outerIndex: number, innerIndex: number): void {
notifyNext(outerValue: T, innerValue: R,
outerIndex: number, innerIndex: number,
innerSub: InnerSubscriber<T, R>): void {
const { resultSelector, destination } = this;
if (resultSelector) {
const result = tryCatch(resultSelector)(outerValue, innerValue, outerIndex, innerIndex);
Expand Down
5 changes: 4 additions & 1 deletion src/operator/mergeScan.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import {tryCatch} from '../util/tryCatch';
import {errorObject} from '../util/errorObject';
import {subscribeToResult} from '../util/subscribeToResult';
import {OuterSubscriber} from '../OuterSubscriber';
import {InnerSubscriber} from '../InnerSubscriber';

export function mergeScan<T, R>(project: (acc: R, value: T) => Observable<R>,
seed: R,
Expand Down Expand Up @@ -70,7 +71,9 @@ export class MergeScanSubscriber<T, R> extends OuterSubscriber<T, R> {
}
}

notifyNext(outerValue: T, innerValue: R, outerIndex: number, innerIndex: number): void {
notifyNext(outerValue: T, innerValue: R,
outerIndex: number, innerIndex: number,
innerSub: InnerSubscriber<T, R>): void {
const { destination } = this;
this.acc = innerValue;
this.hasValue = true;
Expand Down
7 changes: 5 additions & 2 deletions src/operator/race.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import {Operator} from '../Operator';
import {Subscriber} from '../Subscriber';
import {Subscription} from '../Subscription';
import {OuterSubscriber} from '../OuterSubscriber';
import {InnerSubscriber} from '../InnerSubscriber';
import {subscribeToResult} from '../util/subscribeToResult';

/**
Expand Down Expand Up @@ -79,7 +80,9 @@ export class RaceSubscriber<T, R> extends OuterSubscriber<T, R> {
}
}

notifyNext(observable: any, value: R, outerIndex: number): void {
notifyNext(outerValue: T, innerValue: R,
outerIndex: number, innerIndex: number,
innerSub: InnerSubscriber<T, R>): void {
if (!this.hasFirst) {
this.hasFirst = true;

Expand All @@ -95,6 +98,6 @@ export class RaceSubscriber<T, R> extends OuterSubscriber<T, R> {
this.subscriptions = null;
}

this.destination.next(value);
this.destination.next(innerValue);
}
}
5 changes: 4 additions & 1 deletion src/operator/retryWhen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import {tryCatch} from '../util/tryCatch';
import {errorObject} from '../util/errorObject';

import {OuterSubscriber} from '../OuterSubscriber';
import {InnerSubscriber} from '../InnerSubscriber';
import {subscribeToResult} from '../util/subscribeToResult';

/**
Expand Down Expand Up @@ -92,7 +93,9 @@ class RetryWhenSubscriber<T, R> extends OuterSubscriber<T, R> {
this.retries = null;
}

notifyNext(outerValue: T, innerValue: R, outerIndex: number, innerIndex: number): void {
notifyNext(outerValue: T, innerValue: R,
outerIndex: number, innerIndex: number,
innerSub: InnerSubscriber<T, R>): void {

const { errors, retries, retriesSubscription } = this;
this.errors = null;
Expand Down
5 changes: 4 additions & 1 deletion src/operator/sample.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import {Observable} from '../Observable';
import {Subscriber} from '../Subscriber';

import {OuterSubscriber} from '../OuterSubscriber';
import {InnerSubscriber} from '../InnerSubscriber';
import {subscribeToResult} from '../util/subscribeToResult';

/**
Expand Down Expand Up @@ -43,7 +44,9 @@ class SampleSubscriber<T, R> extends OuterSubscriber<T, R> {
this.hasValue = true;
}

notifyNext(outerValue: T, innerValue: R, outerIndex: number, innerIndex: number): void {
notifyNext(outerValue: T, innerValue: R,
outerIndex: number, innerIndex: number,
innerSub: InnerSubscriber<T, R>): void {
this.emitValue();
}

Expand Down
5 changes: 4 additions & 1 deletion src/operator/skipUntil.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import {Subscriber} from '../Subscriber';
import {Observable} from '../Observable';

import {OuterSubscriber} from '../OuterSubscriber';
import {InnerSubscriber} from '../InnerSubscriber';
import {subscribeToResult} from '../util/subscribeToResult';

/**
Expand Down Expand Up @@ -53,7 +54,9 @@ class SkipUntilSubscriber<T, R> extends OuterSubscriber<T, R> {
}
}

notifyNext(): void {
notifyNext(outerValue: T, innerValue: R,
outerIndex: number, innerIndex: number,
innerSub: InnerSubscriber<T, R>): void {
this.hasValue = true;
}

Expand Down
5 changes: 4 additions & 1 deletion src/operator/switch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import {Operator} from '../Operator';
import {Subscriber} from '../Subscriber';
import {Subscription} from '../Subscription';
import {OuterSubscriber} from '../OuterSubscriber';
import {InnerSubscriber} from '../InnerSubscriber';
import {subscribeToResult} from '../util/subscribeToResult';

/**
Expand Down Expand Up @@ -60,7 +61,9 @@ class SwitchSubscriber<T, R> extends OuterSubscriber<T, R> {
}
}

notifyNext(outerValue: T, innerValue: any): void {
notifyNext(outerValue: T, innerValue: R,
outerIndex: number, innerIndex: number,
innerSub: InnerSubscriber<T, R>): void {
this.destination.next(innerValue);
}

Expand Down
5 changes: 4 additions & 1 deletion src/operator/switchMap.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import {Observable} from '../Observable';
import {Subscriber} from '../Subscriber';
import {Subscription} from '../Subscription';
import {OuterSubscriber} from '../OuterSubscriber';
import {InnerSubscriber} from '../InnerSubscriber';
import {subscribeToResult} from '../util/subscribeToResult';

/**
Expand Down Expand Up @@ -83,7 +84,9 @@ class SwitchMapSubscriber<T, R, R2> extends OuterSubscriber<T, R> {
}
}

notifyNext(outerValue: T, innerValue: R, outerIndex: number, innerIndex: number): void {
notifyNext(outerValue: T, innerValue: R,
outerIndex: number, innerIndex: number,
innerSub: InnerSubscriber<T, R>): void {
if (this.resultSelector) {
this._tryNotifyNext(outerValue, innerValue, outerIndex, innerIndex);
} else {
Expand Down
5 changes: 4 additions & 1 deletion src/operator/switchMapTo.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import {Subscription} from '../Subscription';
import {tryCatch} from '../util/tryCatch';
import {errorObject} from '../util/errorObject';
import {OuterSubscriber} from '../OuterSubscriber';
import {InnerSubscriber} from '../InnerSubscriber';
import {subscribeToResult} from '../util/subscribeToResult';

export function switchMapTo<T, R, R2>(observable: Observable<R>,
Expand Down Expand Up @@ -63,7 +64,9 @@ class SwitchMapToSubscriber<T, R, R2> extends OuterSubscriber<T, R> {
}
}

notifyNext(outerValue: T, innerValue: R, outerIndex: number, innerIndex: number) {
notifyNext(outerValue: T, innerValue: R,
outerIndex: number, innerIndex: number,
innerSub: InnerSubscriber<T, R>): void {
const { resultSelector, destination } = this;
if (resultSelector) {
const result = tryCatch(resultSelector)(outerValue, innerValue, outerIndex, innerIndex);
Expand Down
5 changes: 4 additions & 1 deletion src/operator/takeUntil.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import {Observable} from '../Observable';
import {Subscriber} from '../Subscriber';

import {OuterSubscriber} from '../OuterSubscriber';
import {InnerSubscriber} from '../InnerSubscriber';
import {subscribeToResult} from '../util/subscribeToResult';

export function takeUntil<T>(notifier: Observable<any>): Observable<T> {
Expand All @@ -26,7 +27,9 @@ class TakeUntilSubscriber<T, R> extends OuterSubscriber<T, R> {
this.add(subscribeToResult(this, notifier));
}

notifyNext(outerValue: T, innerValue: R, outerIndex: number, innerIndex: number): void {
notifyNext(outerValue: T, innerValue: R,
outerIndex: number, innerIndex: number,
innerSub: InnerSubscriber<T, R>): void {
this.complete();
}

Expand Down
5 changes: 4 additions & 1 deletion src/operator/throttle.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import {Subscription} from '../Subscription';
import {tryCatch} from '../util/tryCatch';
import {errorObject} from '../util/errorObject';
import {OuterSubscriber} from '../OuterSubscriber';
import {InnerSubscriber} from '../InnerSubscriber';
import {subscribeToResult} from '../util/subscribeToResult';

export function throttle<T>(durationSelector: (value: T) => Observable<number> | Promise<number>): Observable<T> {
Expand Down Expand Up @@ -50,7 +51,9 @@ class ThrottleSubscriber<T, R> extends OuterSubscriber<T, R> {
}
}

notifyNext(outerValue: T, innerValue: R, outerIndex: number, innerIndex: number): void {
notifyNext(outerValue: T, innerValue: R,
outerIndex: number, innerIndex: number,
innerSub: InnerSubscriber<T, R>): void {
this._unsubscribe();
}

Expand Down
Loading

0 comments on commit 2d33f32

Please sign in to comment.