Skip to content

Commit

Permalink
refactor(subscriber): Make a derived Subscribers' internal methods …
Browse files Browse the repository at this point in the history
…protected
  • Loading branch information
tetsuharuohzeki authored and kwonoj committed Jan 11, 2016
1 parent e7b2eb3 commit 41d39e2
Show file tree
Hide file tree
Showing 62 changed files with 147 additions and 138 deletions.
6 changes: 3 additions & 3 deletions src/observable/ConnectableObservable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -83,16 +83,16 @@ class RefCountSubscriber<T> extends Subscriber<T> {
destination.add(this);
}

_next(value: T) {
protected _next(value: T) {
this.destination.next(value);
}

_error(err: any) {
protected _error(err: any) {
this._resetConnectable();
this.destination.error(err);
}

_complete() {
protected _complete() {
this._resetConnectable();
this.destination.complete();
}
Expand Down
4 changes: 2 additions & 2 deletions src/observable/forkJoin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,11 @@ class AllSubscriber<T> extends Subscriber<T> {
super(destination);
}

_next(value: T): void {
protected _next(value: T): void {
this._value = value;
}

_complete(): void {
protected _complete(): void {
const destination = this.destination;

if (this._value == null) {
Expand Down
2 changes: 1 addition & 1 deletion src/operator/buffer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ class BufferSubscriber<T, R> extends OuterSubscriber<T, R> {
this.add(subscribeToResult(this, closingNotifier));
}

_next(value: T) {
protected _next(value: T) {
this.buffer.push(value);
}

Expand Down
4 changes: 2 additions & 2 deletions src/operator/bufferCount.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ class BufferCountSubscriber<T> extends Subscriber<T> {
super(destination);
}

_next(value: T) {
protected _next(value: T) {
const count = (this.count += 1);
const destination = this.destination;
const bufferSize = this.bufferSize;
Expand All @@ -66,7 +66,7 @@ class BufferCountSubscriber<T> extends Subscriber<T> {
}
}

_complete() {
protected _complete() {
const destination = this.destination;
const buffers = this.buffers;
while (buffers.length > 0) {
Expand Down
6 changes: 3 additions & 3 deletions src/operator/bufferTime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,20 +59,20 @@ class BufferTimeSubscriber<T> extends Subscriber<T> {
}
}

_next(value: T) {
protected _next(value: T) {
const buffers = this.buffers;
const len = buffers.length;
for (let i = 0; i < len; i++) {
buffers[i].push(value);
}
}

_error(err: any) {
protected _error(err: any) {
this.buffers.length = 0;
super._error(err);
}

_complete() {
protected _complete() {
const { buffers, destination } = this;
while (buffers.length > 0) {
destination.next(buffers.shift());
Expand Down
18 changes: 9 additions & 9 deletions src/operator/bufferToggle.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,15 +50,15 @@ class BufferToggleSubscriber<T, O> extends Subscriber<T> {
this.add(this.openings.subscribe(new BufferToggleOpeningsSubscriber(this)));
}

_next(value: T) {
protected _next(value: T) {
const contexts = this.contexts;
const len = contexts.length;
for (let i = 0; i < len; i++) {
contexts[i].buffer.push(value);
}
}

_error(err: any) {
protected _error(err: any) {
const contexts = this.contexts;
while (contexts.length > 0) {
const context = contexts.shift();
Expand All @@ -70,7 +70,7 @@ class BufferToggleSubscriber<T, O> extends Subscriber<T> {
super._error(err);
}

_complete() {
protected _complete() {
const contexts = this.contexts;
while (contexts.length > 0) {
const context = contexts.shift();
Expand Down Expand Up @@ -121,15 +121,15 @@ class BufferToggleOpeningsSubscriber<T, O> extends Subscriber<O> {
super(null);
}

_next(value: O) {
protected _next(value: O) {
this.parent.openBuffer(value);
}

_error(err: any) {
protected _error(err: any) {
this.parent.error(err);
}

_complete() {
protected _complete() {
// noop
}
}
Expand All @@ -140,15 +140,15 @@ class BufferToggleClosingsSubscriber<T> extends Subscriber<any> {
super(null);
}

_next() {
protected _next() {
this.parent.closeBuffer(this.context);
}

_error(err: any) {
protected _error(err: any) {
this.parent.error(err);
}

_complete() {
protected _complete() {
this.parent.closeBuffer(this.context);
}
}
4 changes: 2 additions & 2 deletions src/operator/bufferWhen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,11 @@ class BufferWhenSubscriber<T, R> extends OuterSubscriber<T, R> {
this.openBuffer();
}

_next(value: T) {
protected _next(value: T) {
this.buffer.push(value);
}

_complete() {
protected _complete() {
const buffer = this.buffer;
if (buffer) {
this.destination.next(buffer);
Expand Down
4 changes: 2 additions & 2 deletions src/operator/combineLatest-support.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,13 @@ export class CombineLatestSubscriber<T, R> extends OuterSubscriber<T, R> {
super(destination);
}

_next(observable: any) {
protected _next(observable: any) {
const toRespond = this.toRespond;
toRespond.push(toRespond.length);
this.observables.push(observable);
}

_complete() {
protected _complete() {
const observables = this.observables;
const len = observables.length;
if (len === 0) {
Expand Down
4 changes: 2 additions & 2 deletions src/operator/count.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ class CountSubscriber<T> extends Subscriber<T> {
super(destination);
}

_next(value: T): void {
protected _next(value: T): void {
const predicate = this.predicate;
let passed: any = true;
if (predicate) {
Expand All @@ -58,7 +58,7 @@ class CountSubscriber<T> extends Subscriber<T> {
}
}

_complete(): void {
protected _complete(): void {
this.destination.next(this.count);
this.destination.complete();
}
Expand Down
4 changes: 2 additions & 2 deletions src/operator/debounce.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ class DebounceSubscriber<T, R> extends OuterSubscriber<T, R> {
super(destination);
}

_next(value: T) {
protected _next(value: T) {
let subscription = this.durationSubscription;
const duration = tryCatch(this.durationSelector)(value);

Expand All @@ -63,7 +63,7 @@ class DebounceSubscriber<T, R> extends OuterSubscriber<T, R> {
}
}

_complete() {
protected _complete() {
this.emitValue();
this.destination.complete();
}
Expand Down
4 changes: 2 additions & 2 deletions src/operator/debounceTime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,14 @@ class DebounceTimeSubscriber<T> extends Subscriber<T> {
super(destination);
}

_next(value: T) {
protected _next(value: T) {
this.clearDebounce();
this.lastValue = value;
this.hasValue = true;
this.add(this.debouncedSubscription = this.scheduler.schedule(dispatchNext, this.dueTime, this));
}

_complete() {
protected _complete() {
this.debouncedNext();
this.destination.complete();
}
Expand Down
4 changes: 2 additions & 2 deletions src/operator/defaultIfEmpty.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,12 @@ class DefaultIfEmptySubscriber<T, R> extends Subscriber<T> {
super(destination);
}

_next(value: T): void {
protected _next(value: T): void {
this.isEmpty = false;
this.destination.next(value);
}

_complete(): void {
protected _complete(): void {
if (this.isEmpty) {
this.destination.next(this.defaultValue);
}
Expand Down
6 changes: 3 additions & 3 deletions src/operator/delay.ts
Original file line number Diff line number Diff line change
Expand Up @@ -80,17 +80,17 @@ class DelaySubscriber<T> extends Subscriber<T> {
}
}

_next(value: T) {
protected _next(value: T) {
this.scheduleNotification(Notification.createNext(value));
}

_error(err: any) {
protected _error(err: any) {
this.errored = true;
this.queue = [];
this.destination.error(err);
}

_complete() {
protected _complete() {
this.scheduleNotification(Notification.createComplete());
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/operator/dematerialize.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ class DeMaterializeSubscriber<T extends Notification<any>> extends Subscriber<T>
super(destination);
}

_next(value: T) {
protected _next(value: T) {
value.observe(this.destination);
}
}
2 changes: 1 addition & 1 deletion src/operator/distinctUntilChanged.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ class DistinctUntilChangedSubscriber<T, K> extends Subscriber<T> {
return x === y;
}

_next(value: T): void {
protected _next(value: T): void {

const keySelector = this.keySelector;
let key: any = value;
Expand Down
6 changes: 3 additions & 3 deletions src/operator/do.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ class DoSubscriber<T> extends Subscriber<T> {
this.__complete = complete;
}

_next(x: T) {
protected _next(x: T) {
const result = tryCatch(this.__next)(x);
if (result === errorObject) {
this.destination.error(errorObject.e);
Expand All @@ -67,7 +67,7 @@ class DoSubscriber<T> extends Subscriber<T> {
}
}

_error(e: any) {
protected _error(e: any) {
const result = tryCatch(this.__error)(e);
if (result === errorObject) {
this.destination.error(errorObject.e);
Expand All @@ -76,7 +76,7 @@ class DoSubscriber<T> extends Subscriber<T> {
}
}

_complete() {
protected _complete() {
const result = tryCatch(this.__complete)();
if (result === errorObject) {
this.destination.error(errorObject.e);
Expand Down
4 changes: 2 additions & 2 deletions src/operator/elementAt.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,14 @@ class ElementAtSubscriber<T> extends Subscriber<T> {
super(destination);
}

_next(x: T) {
protected _next(x: T) {
if (this.index-- === 0) {
this.destination.next(x);
this.destination.complete();
}
}

_complete() {
protected _complete() {
const destination = this.destination;
if (this.index >= 0) {
if (typeof this.defaultValue !== 'undefined') {
Expand Down
4 changes: 2 additions & 2 deletions src/operator/every.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ class EverySubscriber<T, R> extends Subscriber<T> {
this.destination.complete();
}

_next(value: T): void {
protected _next(value: T): void {
const result = tryCatch(this.predicate).call(this.thisArg || this, value, this.index++, this.source);

if (result === errorObject) {
Expand All @@ -75,7 +75,7 @@ class EverySubscriber<T, R> extends Subscriber<T> {
}
}

_complete(): void {
protected _complete(): void {
this.notifyComplete(true);
}
}
4 changes: 2 additions & 2 deletions src/operator/exhaust.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,14 @@ class SwitchFirstSubscriber<T> extends OuterSubscriber<T, T> {
super(destination);
}

_next(value: T): void {
protected _next(value: T): void {
if (!this.hasSubscription) {
this.hasSubscription = true;
this.add(subscribeToResult(this, value));
}
}

_complete(): void {
protected _complete(): void {
this.hasCompleted = true;
if (!this.hasSubscription) {
this.destination.complete();
Expand Down
4 changes: 2 additions & 2 deletions src/operator/exhaustMap.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ class SwitchFirstMapSubscriber<T, R, R2> extends OuterSubscriber<T, R> {
super(destination);
}

_next(value: T): void {
protected _next(value: T): void {
if (!this.hasSubscription) {
const index = this.index++;
const destination = this.destination;
Expand All @@ -57,7 +57,7 @@ class SwitchFirstMapSubscriber<T, R, R2> extends OuterSubscriber<T, R> {
}
}

_complete(): void {
protected _complete(): void {
this.hasCompleted = true;
if (!this.hasSubscription) {
this.destination.complete();
Expand Down
Loading

0 comments on commit 41d39e2

Please sign in to comment.