From 815cfae35e47f08abc7131a2f6b1af989eee186d Mon Sep 17 00:00:00 2001 From: Andre Staltz Date: Mon, 18 Jan 2016 15:15:40 +0200 Subject: [PATCH] fix(groupBy): fix groupBy to use lift(), supports composability Fix bug #1085 in which groupBy was not using lift(). Using lift() is critical to support the ubiquitous lift-based architecture in RxJS Next Resolves #1085. --- spec/Observable-spec.js | 40 ++++---- spec/operators/groupBy-spec.js | 2 +- src/Observable.ts | 2 +- src/operator/groupBy.ts | 171 ++++++++++++++------------------- 4 files changed, 95 insertions(+), 120 deletions(-) diff --git a/spec/Observable-spec.js b/spec/Observable-spec.js index cf33b492a1..3627f81132 100644 --- a/spec/Observable-spec.js +++ b/spec/Observable-spec.js @@ -331,17 +331,16 @@ describe('Observable.lift', function () { it('should allow injecting behaviors into all subscribers in an operator ' + 'chain when overriden', function (done) { - // The custom Observable - function LogObservable() { - Observable.apply(this, arguments); + // The custom Subscriber + var log = []; + function LogSubscriber() { + Subscriber.apply(this, arguments); } - LogObservable.prototype = Object.create(Observable.prototype); - LogObservable.prototype.constructor = LogObservable; - LogObservable.prototype.lift = function (operator) { - var obs = new LogObservable(); - obs.source = this; - obs.operator = new LogOperator(operator); - return obs; + LogSubscriber.prototype = Object.create(Subscriber.prototype); + LogSubscriber.prototype.constructor = LogSubscriber; + LogSubscriber.prototype.next = function (x) { + log.push('next ' + x); + this.destination.next(x); }; // The custom Operator @@ -352,16 +351,17 @@ describe('Observable.lift', function () { return this.childOperator.call(new LogSubscriber(subscriber)); }; - // The custom Subscriber - var log = []; - function LogSubscriber() { - Subscriber.apply(this, arguments); + // The custom Observable + function LogObservable() { + Observable.apply(this, arguments); } - LogSubscriber.prototype = Object.create(Subscriber.prototype); - LogSubscriber.prototype.constructor = LogSubscriber; - LogSubscriber.prototype.next = function (x) { - log.push('next ' + x); - this.destination.next(x); + LogObservable.prototype = Object.create(Observable.prototype); + LogObservable.prototype.constructor = LogObservable; + LogObservable.prototype.lift = function (operator) { + var obs = new LogObservable(); + obs.source = this; + obs.operator = new LogOperator(operator); + return obs; }; // Use the LogObservable @@ -372,7 +372,7 @@ describe('Observable.lift', function () { observer.complete(); }) .map(function (x) { return 10 * x; }) - .filter(function (x) { return x > 15 }) + .filter(function (x) { return x > 15; }) .count(); expect(result instanceof LogObservable).toBe(true); diff --git a/spec/operators/groupBy-spec.js b/spec/operators/groupBy-spec.js index 2a0642a158..87cddb74f5 100644 --- a/spec/operators/groupBy-spec.js +++ b/spec/operators/groupBy-spec.js @@ -1387,4 +1387,4 @@ describe('Observable.prototype.groupBy()', function () { }); }, done.fail, done); }); -}); \ No newline at end of file +}); diff --git a/src/Observable.ts b/src/Observable.ts index f08c6f8a52..d7220b2bc2 100644 --- a/src/Observable.ts +++ b/src/Observable.ts @@ -215,7 +215,7 @@ export class Observable implements CoreOperators { projectResult?: (x: T, y: any, ix: number, iy: number) => R, concurrent?: number) => Observable; flatMapTo: (observable: Observable, projectResult?: (x: T, y: any, ix: number, iy: number) => R, concurrent?: number) => Observable; - groupBy: (keySelector: (value: T) => string, + groupBy: (keySelector: (value: T) => K, elementSelector?: (value: T) => R, durationSelector?: (group: GroupedObservable) => Observable) => Observable>; ignoreElements: () => Observable; diff --git a/src/operator/groupBy.ts b/src/operator/groupBy.ts index dc07660f96..c92dcd2e25 100644 --- a/src/operator/groupBy.ts +++ b/src/operator/groupBy.ts @@ -1,9 +1,12 @@ import {Subscriber} from '../Subscriber'; import {Subscription} from '../Subscription'; import {Observable} from '../Observable'; +import {Operator} from '../Operator'; import {Subject} from '../Subject'; import {Map} from '../util/Map'; import {FastMap} from '../util/FastMap'; +import {tryCatch} from '../util/tryCatch'; +import {errorObject} from '../util/errorObject'; /** * Groups the items emitted by an Observable according to a specified criterion, @@ -19,11 +22,18 @@ import {FastMap} from '../util/FastMap'; */ export function groupBy(keySelector: (value: T) => K, elementSelector?: (value: T) => R, - durationSelector?: (grouped: GroupedObservable) => Observable): GroupByObservable { - return new GroupByObservable(this, keySelector, elementSelector, durationSelector); + durationSelector?: (grouped: GroupedObservable) => Observable): Observable> { + return this.lift(new GroupByOperator(this, keySelector, elementSelector, durationSelector)); } -export class GroupByObservable extends Observable> { +export interface RefCountSubscription { + count: number; + unsubscribe: () => void; + isUnsubscribed: boolean; + attemptedToUnsubscribe: boolean; +} + +class GroupByOperator extends Operator { constructor(public source: Observable, private keySelector: (value: T) => K, private elementSelector?: (value: T) => R, @@ -31,21 +41,19 @@ export class GroupByObservable extends Observable): Subscription { - const refCountSubscription = new RefCountSubscription(); - const groupBySubscriber = new GroupBySubscriber( - subscriber, refCountSubscription, this.keySelector, this.elementSelector, this.durationSelector + call(subscriber: Subscriber>): Subscriber { + return new GroupBySubscriber( + subscriber, this.keySelector, this.elementSelector, this.durationSelector ); - refCountSubscription.setPrimary(this.source.subscribe(groupBySubscriber)); - return refCountSubscription; } } -class GroupBySubscriber extends Subscriber { +class GroupBySubscriber extends Subscriber implements RefCountSubscription { private groups: Map> = null; + public attemptedToUnsubscribe: boolean = false; + public count: number = 0; - constructor(destination: Subscriber, - private refCountSubscription: RefCountSubscription, + constructor(destination: Subscriber>, private keySelector: (value: T) => K, private elementSelector?: (value: T) => R, private durationSelector?: (grouped: GroupedObservable) => Observable) { @@ -54,67 +62,48 @@ class GroupBySubscriber extends Subscriber { this.add(destination); } - protected _next(value: T): void { - let key: any; - try { - key = this.keySelector(value); - } catch (err) { - this.error(err); - return; - } - this._group(value, key); - } - - private _group(value: T, key: K) { - let groups = this.groups; + protected _next(x: T): void { + let key = tryCatch(this.keySelector)(x); + if (key === errorObject) { + this.error(errorObject.e); + } else { + let groups = this.groups; + const elementSelector = this.elementSelector; + const durationSelector = this.durationSelector; - if (!groups) { - groups = this.groups = typeof key === 'string' ? new FastMap() : new Map(); - } + if (!groups) { + groups = this.groups = typeof key === 'string' ? new FastMap() : new Map(); + } - let group = groups.get(key); + let group = groups.get(key); - if (!group) { - groups.set(key, group = new Subject()); - let groupedObservable = new GroupedObservable(key, group, this.refCountSubscription); + if (!group) { + groups.set(key, group = new Subject()); + let groupedObservable = new GroupedObservable(key, group, this); - if (this.durationSelector) { - if (!this._tryDuration(key, group)) { - return; + if (durationSelector) { + let duration = tryCatch(durationSelector)(new GroupedObservable(key, group)); + if (duration === errorObject) { + this.error(errorObject.e); + } else { + this.add(duration.subscribe(new GroupDurationSubscriber(key, group, this))); + } } - } - this.destination.next(groupedObservable); - } - - if (this.elementSelector) { - this._tryElementSelector(value, group); - } else { - group.next(value); - } - } - - private _tryElementSelector(value: T, group: Subject) { - let result: any; - try { - result = this.elementSelector(value); - } catch (err) { - this.error(err); - return; - } - group.next(result); - } + this.destination.next(groupedObservable); + } - private _tryDuration(key: K, group: any): boolean { - let duration: any; - try { - duration = this.durationSelector(new GroupedObservable(key, group)); - } catch (err) { - this.error(err); - return false; + if (elementSelector) { + let value = tryCatch(elementSelector)(x); + if (value === errorObject) { + this.error(errorObject.e); + } else { + group.next(value); + } + } else { + group.next(x); + } } - this.add(duration.subscribe(new GroupDurationSubscriber(key, group, this))); - return true; } protected _error(err: any): void { @@ -142,6 +131,15 @@ class GroupBySubscriber extends Subscriber { removeGroup(key: K): void { this.groups.delete(key); } + + unsubscribe() { + if (!this.isUnsubscribed && !this.attemptedToUnsubscribe) { + this.attemptedToUnsubscribe = true; + if (this.count === 0) { + super.unsubscribe(); + } + } + } } class GroupDurationSubscriber extends Subscriber { @@ -167,30 +165,6 @@ class GroupDurationSubscriber extends Subscriber { } } -export class RefCountSubscription extends Subscription { - primary: Subscription; - attemptedToUnsubscribePrimary: boolean = false; - count: number = 0; - - constructor() { - super(); - } - - setPrimary(subscription: Subscription) { - this.primary = subscription; - } - - unsubscribe() { - if (!this.isUnsubscribed && !this.attemptedToUnsubscribePrimary) { - this.attemptedToUnsubscribePrimary = true; - if (this.count === 0) { - super.unsubscribe(); - this.primary.unsubscribe(); - } - } - } -} - export class GroupedObservable extends Observable { constructor(public key: K, private groupSubject: Subject, @@ -200,27 +174,28 @@ export class GroupedObservable extends Observable { protected _subscribe(subscriber: Subscriber) { const subscription = new Subscription(); - if (this.refCountSubscription && !this.refCountSubscription.isUnsubscribed) { - subscription.add(new InnerRefCountSubscription(this.refCountSubscription)); + const {refCountSubscription, groupSubject} = this; + if (refCountSubscription && !refCountSubscription.isUnsubscribed) { + subscription.add(new InnerRefCountSubscription(refCountSubscription)); } - subscription.add(this.groupSubject.subscribe(subscriber)); + subscription.add(groupSubject.subscribe(subscriber)); return subscription; } } -export class InnerRefCountSubscription extends Subscription { +class InnerRefCountSubscription extends Subscription { constructor(private parent: RefCountSubscription) { super(); parent.count++; } unsubscribe() { - if (!this.parent.isUnsubscribed && !this.isUnsubscribed) { + const parent = this.parent; + if (!parent.isUnsubscribed && !this.isUnsubscribed) { super.unsubscribe(); - this.parent.count--; - if (this.parent.count === 0 && this.parent.attemptedToUnsubscribePrimary) { - this.parent.unsubscribe(); - this.parent.primary.unsubscribe(); + parent.count -= 1; + if (parent.count === 0 && parent.attemptedToUnsubscribe) { + parent.unsubscribe(); } } }