Skip to content

Commit

Permalink
fix(groupBy): fix groupBy to use lift(), supports composability
Browse files Browse the repository at this point in the history
Fix bug #1085 in which groupBy was not using lift(). Using lift() is critical to support the
ubiquitous lift-based architecture in RxJS Next

Resolves #1085.
  • Loading branch information
staltz committed Jan 18, 2016
1 parent b726118 commit f0ad32a
Show file tree
Hide file tree
Showing 5 changed files with 70 additions and 83 deletions.
4 changes: 2 additions & 2 deletions spec/operators/groupBy-spec.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/* globals describe, it, expect, hot, cold, expectObservable, expectSubscriptions */
var Rx = require('../../dist/cjs/Rx.KitchenSink');
var Observable = Rx.Observable;
var GroupedObservable = require('../../dist/cjs/operator/groupBy-support').GroupedObservable;
var GroupedObservable = require('../../dist/cjs/operator/groupBy').GroupedObservable;

describe('Observable.prototype.groupBy()', function () {
it.asDiagram('groupBy(i => i % 2)')('should group numbers by odd/even', function () {
Expand Down Expand Up @@ -1387,4 +1387,4 @@ describe('Observable.prototype.groupBy()', function () {
});
}, done.fail, done);
});
});
});
2 changes: 1 addition & 1 deletion src/CoreOperators.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import {Observable} from './Observable';
import {Scheduler} from './Scheduler';
import {ConnectableObservable} from './observable/ConnectableObservable';
import {Subject} from './Subject';
import {GroupedObservable} from './operator/groupBy-support';
import {GroupedObservable} from './operator/groupBy';
import {Notification} from './Notification';

export interface CoreOperators<T> {
Expand Down
4 changes: 2 additions & 2 deletions src/Observable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import {Subscription} from './Subscription';
import {root} from './util/root';
import {CoreOperators} from './CoreOperators';
import {SymbolShim} from './util/SymbolShim';
import {GroupedObservable} from './operator/groupBy-support';
import {GroupedObservable} from './operator/groupBy';
import {ConnectableObservable} from './observable/ConnectableObservable';
import {Subject} from './Subject';
import {Notification} from './Notification';
Expand Down Expand Up @@ -211,7 +211,7 @@ export class Observable<T> implements CoreOperators<T> {
projectResult?: (x: T, y: any, ix: number, iy: number) => R,
concurrent?: number) => Observable<R>;
flatMapTo: <R>(observable: Observable<any>, projectResult?: (x: T, y: any, ix: number, iy: number) => R, concurrent?: number) => Observable<R>;
groupBy: <K, R>(keySelector: (value: T) => string,
groupBy: <T, K, R>(keySelector: (value: T) => K,
elementSelector?: (value: T) => R,
durationSelector?: (group: GroupedObservable<K, R>) => Observable<any>) => Observable<GroupedObservable<K, R>>;
ignoreElements: () => Observable<T>;
Expand Down
63 changes: 0 additions & 63 deletions src/operator/groupBy-support.ts

This file was deleted.

80 changes: 65 additions & 15 deletions src/operator/groupBy.ts
Original file line number Diff line number Diff line change
@@ -1,42 +1,47 @@
import {Subscriber} from '../Subscriber';
import {Subscription} from '../Subscription';
import {Observable} from '../Observable';
import {Operator} from '../Operator';
import {Subject} from '../Subject';
import {Map} from '../util/Map';
import {FastMap} from '../util/FastMap';
import {RefCountSubscription, GroupedObservable} from './groupBy-support';
import {tryCatch} from '../util/tryCatch';
import {errorObject} from '../util/errorObject';

export function groupBy<T, K, R>(keySelector: (value: T) => K,
elementSelector?: (value: T) => R,
durationSelector?: (grouped: GroupedObservable<K, R>) => Observable<any>): GroupByObservable<T, K, R> {
return new GroupByObservable(this, keySelector, elementSelector, durationSelector);
durationSelector?: (grouped: GroupedObservable<K, R>) => Observable<any>): Observable<GroupedObservable<K, R>> {
return this.lift(new GroupByOperator(this, keySelector, elementSelector, durationSelector));
}

export class GroupByObservable<T, K, R> extends Observable<GroupedObservable<K, R>> {
export interface RefCountSubscription {
count: number;
unsubscribe: () => void;
isUnsubscribed: boolean;
attemptedToUnsubscribe: boolean;
}

class GroupByOperator<T, K, R> extends Operator<T, R> {
constructor(public source: Observable<T>,
private keySelector: (value: T) => K,
private elementSelector?: (value: T) => R,
private durationSelector?: (grouped: GroupedObservable<K, R>) => Observable<any>) {
super();
}

protected _subscribe(subscriber: Subscriber<any>): Subscription {
const refCountSubscription = new RefCountSubscription();
const groupBySubscriber = new GroupBySubscriber(
subscriber, refCountSubscription, this.keySelector, this.elementSelector, this.durationSelector
call(subscriber: Subscriber<GroupedObservable<K, R>>): Subscriber<T> {
return new GroupBySubscriber(
subscriber, this.keySelector, this.elementSelector, this.durationSelector
);
refCountSubscription.setPrimary(this.source.subscribe(groupBySubscriber));
return refCountSubscription;
}
}

class GroupBySubscriber<T, K, R> extends Subscriber<T> {
class GroupBySubscriber<T, K, R> extends Subscriber<T> implements RefCountSubscription {
private groups: Map<K, Subject<T|R>> = null;
public attemptedToUnsubscribe: boolean = false;
public count: number = 0;

constructor(destination: Subscriber<R>,
private refCountSubscription: RefCountSubscription,
constructor(destination: Subscriber<GroupedObservable<K, R>>,
private keySelector: (value: T) => K,
private elementSelector?: (value: T) => R,
private durationSelector?: (grouped: GroupedObservable<K, R>) => Observable<any>) {
Expand All @@ -61,8 +66,8 @@ class GroupBySubscriber<T, K, R> extends Subscriber<T> {
let group = groups.get(key);

if (!group) {
groups.set(key, group = new Subject<T|R>());
let groupedObservable = new GroupedObservable(key, group, this.refCountSubscription);
groups.set(key, group = new Subject<R>());
let groupedObservable = new GroupedObservable(key, group, this);

if (durationSelector) {
let duration = tryCatch(durationSelector)(new GroupedObservable<K, R>(key, <any>group));
Expand Down Expand Up @@ -114,6 +119,15 @@ class GroupBySubscriber<T, K, R> extends Subscriber<T> {
removeGroup(key: K): void {
this.groups.delete(key);
}

unsubscribe() {
if (!this.isUnsubscribed && !this.attemptedToUnsubscribe) {
this.attemptedToUnsubscribe = true;
if (this.count === 0) {
super.unsubscribe();
}
}
}
}

class GroupDurationSubscriber<K, T> extends Subscriber<T> {
Expand All @@ -138,3 +152,39 @@ class GroupDurationSubscriber<K, T> extends Subscriber<T> {
this.parent.removeGroup(this.key);
}
}

export class GroupedObservable<K, T> extends Observable<T> {
constructor(public key: K,
private groupSubject: Subject<T>,
private refCountSubscription?: RefCountSubscription) {
super();
}

protected _subscribe(subscriber: Subscriber<T>) {
const subscription = new Subscription();
const {refCountSubscription, groupSubject} = this;
if (refCountSubscription && !refCountSubscription.isUnsubscribed) {
subscription.add(new InnerRefCountSubscription(refCountSubscription));
}
subscription.add(groupSubject.subscribe(subscriber));
return subscription;
}
}

class InnerRefCountSubscription extends Subscription {
constructor(private parent: RefCountSubscription) {
super();
parent.count++;
}

unsubscribe() {
const parent = this.parent;
if (!parent.isUnsubscribed && !this.isUnsubscribed) {
super.unsubscribe();
parent.count -= 1;
if (parent.count === 0 && parent.attemptedToUnsubscribe) {
parent.unsubscribe();
}
}
}
}

0 comments on commit f0ad32a

Please sign in to comment.