Skip to content

Commit

Permalink
Merge pull request #2826 from benlesh/lettables2
Browse files Browse the repository at this point in the history
More lettables
  • Loading branch information
benlesh authored Sep 7, 2017
2 parents 76668a9 + 2958917 commit 7bb8280
Show file tree
Hide file tree
Showing 12 changed files with 340 additions and 187 deletions.
2 changes: 1 addition & 1 deletion src/observable/combineLatest.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { IScheduler } from '../Scheduler';
import { isScheduler } from '../util/isScheduler';
import { isArray } from '../util/isArray';
import { ArrayObservable } from './ArrayObservable';
import { CombineLatestOperator } from '../operator/combineLatest';
import { CombineLatestOperator } from '../operators/combineLatest';

/* tslint:disable:max-line-length */
export function combineLatest<T, T2>(v1: ObservableInput<T>, v2: ObservableInput<T2>, scheduler?: IScheduler): Observable<[T, T2]>;
Expand Down
5 changes: 3 additions & 2 deletions src/operator/combineAll.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { CombineLatestOperator } from './combineLatest';

import { Observable } from '../Observable';
import { combineAll as higherOrder } from '../operators';

/**
* Converts a higher-order Observable into a first-order Observable by waiting
Expand Down Expand Up @@ -42,5 +43,5 @@ import { Observable } from '../Observable';
* @owner Observable
*/
export function combineAll<T, R>(this: Observable<T>, project?: (...values: Array<any>) => R): Observable<R> {
return this.lift(new CombineLatestOperator(project));
return higherOrder(project)(this);
}
107 changes: 3 additions & 104 deletions src/operator/combineLatest.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,5 @@
import { Observable, ObservableInput } from '../Observable';
import { ArrayObservable } from '../observable/ArrayObservable';
import { isArray } from '../util/isArray';
import { Operator } from '../Operator';
import { Subscriber } from '../Subscriber';
import { OuterSubscriber } from '../OuterSubscriber';
import { InnerSubscriber } from '../InnerSubscriber';
import { subscribeToResult } from '../util/subscribeToResult';
const none = {};
import { combineLatest as higherOrder } from '../operators';

/* tslint:disable:max-line-length */
export function combineLatest<T, R>(this: Observable<T>, project: (v1: T) => R): Observable<R>;
Expand Down Expand Up @@ -71,99 +64,5 @@ export function combineLatest<T, TOther, R>(this: Observable<T>, array: Observab
export function combineLatest<T, R>(this: Observable<T>, ...observables: Array<ObservableInput<any> |
Array<ObservableInput<any>> |
((...values: Array<any>) => R)>): Observable<R> {
let project: (...values: Array<any>) => R = null;
if (typeof observables[observables.length - 1] === 'function') {
project = <(...values: Array<any>) => R>observables.pop();
}

// if the first and only other argument besides the resultSelector is an array
// assume it's been called with `combineLatest([obs1, obs2, obs3], project)`
if (observables.length === 1 && isArray(observables[0])) {
observables = (<any>observables[0]).slice();
}

observables.unshift(this);

return this.lift.call(new ArrayObservable(observables), new CombineLatestOperator(project));
}

export class CombineLatestOperator<T, R> implements Operator<T, R> {
constructor(private project?: (...values: Array<any>) => R) {
}

call(subscriber: Subscriber<R>, source: any): any {
return source.subscribe(new CombineLatestSubscriber(subscriber, this.project));
}
}

/**
* We need this JSDoc comment for affecting ESDoc.
* @ignore
* @extends {Ignored}
*/
export class CombineLatestSubscriber<T, R> extends OuterSubscriber<T, R> {
private active: number = 0;
private values: any[] = [];
private observables: any[] = [];
private toRespond: number;

constructor(destination: Subscriber<R>, private project?: (...values: Array<any>) => R) {
super(destination);
}

protected _next(observable: any) {
this.values.push(none);
this.observables.push(observable);
}

protected _complete() {
const observables = this.observables;
const len = observables.length;
if (len === 0) {
this.destination.complete();
} else {
this.active = len;
this.toRespond = len;
for (let i = 0; i < len; i++) {
const observable = observables[i];
this.add(subscribeToResult(this, observable, observable, i));
}
}
}

notifyComplete(unused: Subscriber<R>): void {
if ((this.active -= 1) === 0) {
this.destination.complete();
}
}

notifyNext(outerValue: T, innerValue: R,
outerIndex: number, innerIndex: number,
innerSub: InnerSubscriber<T, R>): void {
const values = this.values;
const oldVal = values[outerIndex];
const toRespond = !this.toRespond
? 0
: oldVal === none ? --this.toRespond : this.toRespond;
values[outerIndex] = innerValue;

if (toRespond === 0) {
if (this.project) {
this._tryProject(values);
} else {
this.destination.next(values.slice());
}
}
}

private _tryProject(values: any[]) {
let result: any;
try {
result = this.project.apply(this, values);
} catch (err) {
this.destination.error(err);
return;
}
this.destination.next(result);
}
}
return higherOrder(...observables)(this);
}
75 changes: 2 additions & 73 deletions src/operator/distinct.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,5 @@
import { Observable } from '../Observable';
import { Operator } from '../Operator';
import { Subscriber } from '../Subscriber';
import { TeardownLogic } from '../Subscription';
import { OuterSubscriber } from '../OuterSubscriber';
import { InnerSubscriber } from '../InnerSubscriber';
import { subscribeToResult } from '../util/subscribeToResult';
import { ISet, Set } from '../util/Set';
import { distinct as higherOrder } from '../operators';

/**
* Returns an Observable that emits all items emitted by the source Observable that are distinct by comparison from previous items.
Expand Down Expand Up @@ -55,70 +49,5 @@ import { ISet, Set } from '../util/Set';
export function distinct<T, K>(this: Observable<T>,
keySelector?: (value: T) => K,
flushes?: Observable<any>): Observable<T> {
return this.lift(new DistinctOperator(keySelector, flushes));
}

class DistinctOperator<T, K> implements Operator<T, T> {
constructor(private keySelector: (value: T) => K, private flushes: Observable<any>) {
}

call(subscriber: Subscriber<T>, source: any): TeardownLogic {
return source.subscribe(new DistinctSubscriber(subscriber, this.keySelector, this.flushes));
}
}

/**
* We need this JSDoc comment for affecting ESDoc.
* @ignore
* @extends {Ignored}
*/
export class DistinctSubscriber<T, K> extends OuterSubscriber<T, T> {
private values: ISet<K> = new Set<K>();

constructor(destination: Subscriber<T>, private keySelector: (value: T) => K, flushes: Observable<any>) {
super(destination);

if (flushes) {
this.add(subscribeToResult(this, flushes));
}
}

notifyNext(outerValue: T, innerValue: T,
outerIndex: number, innerIndex: number,
innerSub: InnerSubscriber<T, T>): void {
this.values.clear();
}

notifyError(error: any, innerSub: InnerSubscriber<T, T>): void {
this._error(error);
}

protected _next(value: T): void {
if (this.keySelector) {
this._useKeySelector(value);
} else {
this._finalizeNext(value, value);
}
}

private _useKeySelector(value: T): void {
let key: K;
const { destination } = this;
try {
key = this.keySelector(value);
} catch (err) {
destination.error(err);
return;
}
this._finalizeNext(key, value);
}

private _finalizeNext(key: K|T, value: T) {
const { values } = this;
if (!values.has(<K>key)) {
values.add(<K>key);
this.destination.next(value);
}
}

return higherOrder(keySelector, flushes)(this);
}
7 changes: 3 additions & 4 deletions src/operator/publishLast.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
import { Observable } from '../Observable';
import { AsyncSubject } from '../AsyncSubject';
import { multicast } from './multicast';
import { ConnectableObservable } from '../observable/ConnectableObservable';

import { publishLast as higherOrder } from '../operators';
/**
* @return {ConnectableObservable<T>}
* @method publishLast
* @owner Observable
*/
export function publishLast<T>(this: Observable<T>): ConnectableObservable<T> {
return multicast.call(this, new AsyncSubject<T>());
//TODO(benlesh): correct type-flow through here.
return higherOrder()(this) as ConnectableObservable<T>;
}
5 changes: 2 additions & 3 deletions src/operator/publishReplay.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
import { Observable } from '../Observable';
import { ReplaySubject } from '../ReplaySubject';
import { IScheduler } from '../Scheduler';
import { multicast } from './multicast';
import { ConnectableObservable } from '../observable/ConnectableObservable';
import { publishReplay as higherOrder } from '../operators';

/**
* @param bufferSize
Expand All @@ -15,5 +14,5 @@ import { ConnectableObservable } from '../observable/ConnectableObservable';
export function publishReplay<T>(this: Observable<T>, bufferSize: number = Number.POSITIVE_INFINITY,
windowTime: number = Number.POSITIVE_INFINITY,
scheduler?: IScheduler): ConnectableObservable<T> {
return multicast.call(this, new ReplaySubject<T>(bufferSize, windowTime, scheduler));
return higherOrder(bufferSize, windowTime, scheduler)(this);
}
7 changes: 7 additions & 0 deletions src/operators/combineAll.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
import { CombineLatestOperator } from '../operators/combineLatest';
import { Observable } from '../Observable';
import { OperatorFunction } from '../interfaces';

export function combineAll<T, R>(project?: (...values: Array<any>) => R): OperatorFunction<T, R> {
return (source: Observable<T>) => source.lift(new CombineLatestOperator(project));
}
Loading

0 comments on commit 7bb8280

Please sign in to comment.