diff --git a/spec/operators/groupBy-spec.js b/spec/operators/groupBy-spec.js index 3f49e1a1ac1..87cddb74f52 100644 --- a/spec/operators/groupBy-spec.js +++ b/spec/operators/groupBy-spec.js @@ -1,7 +1,7 @@ /* globals describe, it, expect, hot, cold, expectObservable, expectSubscriptions */ var Rx = require('../../dist/cjs/Rx.KitchenSink'); var Observable = Rx.Observable; -var GroupedObservable = require('../../dist/cjs/operator/groupBy-support').GroupedObservable; +var GroupedObservable = require('../../dist/cjs/operator/groupBy').GroupedObservable; describe('Observable.prototype.groupBy()', function () { it.asDiagram('groupBy(i => i % 2)')('should group numbers by odd/even', function () { @@ -1387,4 +1387,4 @@ describe('Observable.prototype.groupBy()', function () { }); }, done.fail, done); }); -}); \ No newline at end of file +}); diff --git a/src/CoreOperators.ts b/src/CoreOperators.ts index 48fa45fa556..58f13509f9e 100644 --- a/src/CoreOperators.ts +++ b/src/CoreOperators.ts @@ -2,7 +2,7 @@ import {Observable} from './Observable'; import {Scheduler} from './Scheduler'; import {ConnectableObservable} from './observable/ConnectableObservable'; import {Subject} from './Subject'; -import {GroupedObservable} from './operator/groupBy-support'; +import {GroupedObservable} from './operator/groupBy'; import {Notification} from './Notification'; export interface CoreOperators { diff --git a/src/Observable.ts b/src/Observable.ts index 8183b848d8c..af0f49e769b 100644 --- a/src/Observable.ts +++ b/src/Observable.ts @@ -6,7 +6,7 @@ import {Subscription} from './Subscription'; import {root} from './util/root'; import {CoreOperators} from './CoreOperators'; import {SymbolShim} from './util/SymbolShim'; -import {GroupedObservable} from './operator/groupBy-support'; +import {GroupedObservable} from './operator/groupBy'; import {ConnectableObservable} from './observable/ConnectableObservable'; import {Subject} from './Subject'; import {Notification} from './Notification'; @@ -211,7 +211,7 @@ export class Observable implements CoreOperators { projectResult?: (x: T, y: any, ix: number, iy: number) => R, concurrent?: number) => Observable; flatMapTo: (observable: Observable, projectResult?: (x: T, y: any, ix: number, iy: number) => R, concurrent?: number) => Observable; - groupBy: (keySelector: (value: T) => string, + groupBy: (keySelector: (value: T) => K, elementSelector?: (value: T) => R, durationSelector?: (group: GroupedObservable) => Observable) => Observable>; ignoreElements: () => Observable; diff --git a/src/operator/groupBy-support.ts b/src/operator/groupBy-support.ts deleted file mode 100644 index 1f52fe7045c..00000000000 --- a/src/operator/groupBy-support.ts +++ /dev/null @@ -1,63 +0,0 @@ -import {Subscription} from '../Subscription'; -import {Subject} from '../Subject'; -import {Subscriber} from '../Subscriber'; -import {Observable} from '../Observable'; - -export class RefCountSubscription extends Subscription { - primary: Subscription; - attemptedToUnsubscribePrimary: boolean = false; - count: number = 0; - - constructor() { - super(); - } - - setPrimary(subscription: Subscription) { - this.primary = subscription; - } - - unsubscribe() { - if (!this.isUnsubscribed && !this.attemptedToUnsubscribePrimary) { - this.attemptedToUnsubscribePrimary = true; - if (this.count === 0) { - super.unsubscribe(); - this.primary.unsubscribe(); - } - } - } -} - -export class GroupedObservable extends Observable { - constructor(public key: K, - private groupSubject: Subject, - private refCountSubscription?: RefCountSubscription) { - super(); - } - - protected _subscribe(subscriber: Subscriber) { - const subscription = new Subscription(); - if (this.refCountSubscription && !this.refCountSubscription.isUnsubscribed) { - subscription.add(new InnerRefCountSubscription(this.refCountSubscription)); - } - subscription.add(this.groupSubject.subscribe(subscriber)); - return subscription; - } -} - -export class InnerRefCountSubscription extends Subscription { - constructor(private parent: RefCountSubscription) { - super(); - parent.count++; - } - - unsubscribe() { - if (!this.parent.isUnsubscribed && !this.isUnsubscribed) { - super.unsubscribe(); - this.parent.count--; - if (this.parent.count === 0 && this.parent.attemptedToUnsubscribePrimary) { - this.parent.unsubscribe(); - this.parent.primary.unsubscribe(); - } - } - } -} diff --git a/src/operator/groupBy.ts b/src/operator/groupBy.ts index 22761f43eb2..2fdc5dcc512 100644 --- a/src/operator/groupBy.ts +++ b/src/operator/groupBy.ts @@ -1,20 +1,27 @@ import {Subscriber} from '../Subscriber'; import {Subscription} from '../Subscription'; import {Observable} from '../Observable'; +import {Operator} from '../Operator'; import {Subject} from '../Subject'; import {Map} from '../util/Map'; import {FastMap} from '../util/FastMap'; -import {RefCountSubscription, GroupedObservable} from './groupBy-support'; import {tryCatch} from '../util/tryCatch'; import {errorObject} from '../util/errorObject'; export function groupBy(keySelector: (value: T) => K, elementSelector?: (value: T) => R, - durationSelector?: (grouped: GroupedObservable) => Observable): GroupByObservable { - return new GroupByObservable(this, keySelector, elementSelector, durationSelector); + durationSelector?: (grouped: GroupedObservable) => Observable): Observable> { + return this.lift(new GroupByOperator(this, keySelector, elementSelector, durationSelector)); } -export class GroupByObservable extends Observable> { +export interface RefCountSubscription { + count: number; + unsubscribe: () => void; + isUnsubscribed: boolean; + attemptedToUnsubscribe: boolean; +} + +class GroupByOperator extends Operator { constructor(public source: Observable, private keySelector: (value: T) => K, private elementSelector?: (value: T) => R, @@ -22,21 +29,19 @@ export class GroupByObservable extends Observable): Subscription { - const refCountSubscription = new RefCountSubscription(); - const groupBySubscriber = new GroupBySubscriber( - subscriber, refCountSubscription, this.keySelector, this.elementSelector, this.durationSelector + call(subscriber: Subscriber>): Subscriber { + return new GroupBySubscriber( + subscriber, this.keySelector, this.elementSelector, this.durationSelector ); - refCountSubscription.setPrimary(this.source.subscribe(groupBySubscriber)); - return refCountSubscription; } } -class GroupBySubscriber extends Subscriber { +class GroupBySubscriber extends Subscriber implements RefCountSubscription { private groups: Map> = null; + public attemptedToUnsubscribe: boolean = false; + public count: number = 0; - constructor(destination: Subscriber, - private refCountSubscription: RefCountSubscription, + constructor(destination: Subscriber>, private keySelector: (value: T) => K, private elementSelector?: (value: T) => R, private durationSelector?: (grouped: GroupedObservable) => Observable) { @@ -61,8 +66,8 @@ class GroupBySubscriber extends Subscriber { let group = groups.get(key); if (!group) { - groups.set(key, group = new Subject()); - let groupedObservable = new GroupedObservable(key, group, this.refCountSubscription); + groups.set(key, group = new Subject()); + let groupedObservable = new GroupedObservable(key, group, this); if (durationSelector) { let duration = tryCatch(durationSelector)(new GroupedObservable(key, group)); @@ -114,6 +119,15 @@ class GroupBySubscriber extends Subscriber { removeGroup(key: K): void { this.groups.delete(key); } + + unsubscribe() { + if (!this.isUnsubscribed && !this.attemptedToUnsubscribe) { + this.attemptedToUnsubscribe = true; + if (this.count === 0) { + super.unsubscribe(); + } + } + } } class GroupDurationSubscriber extends Subscriber { @@ -138,3 +152,39 @@ class GroupDurationSubscriber extends Subscriber { this.parent.removeGroup(this.key); } } + +export class GroupedObservable extends Observable { + constructor(public key: K, + private groupSubject: Subject, + private refCountSubscription?: RefCountSubscription) { + super(); + } + + protected _subscribe(subscriber: Subscriber) { + const subscription = new Subscription(); + const {refCountSubscription, groupSubject} = this; + if (refCountSubscription && !refCountSubscription.isUnsubscribed) { + subscription.add(new InnerRefCountSubscription(refCountSubscription)); + } + subscription.add(groupSubject.subscribe(subscriber)); + return subscription; + } +} + +class InnerRefCountSubscription extends Subscription { + constructor(private parent: RefCountSubscription) { + super(); + parent.count++; + } + + unsubscribe() { + const parent = this.parent; + if (!parent.isUnsubscribed && !this.isUnsubscribed) { + super.unsubscribe(); + parent.count -= 1; + if (parent.count === 0 && parent.attemptedToUnsubscribe) { + parent.unsubscribe(); + } + } + } +}