Skip to content

Commit

Permalink
fix(groupBy): fix groupBy to use lift(), supports composability
Browse files Browse the repository at this point in the history
Fix bug #1085 in which groupBy was not using lift(). Using lift() is critical to support the
ubiquitous lift-based architecture in RxJS Next

Resolves #1085.
  • Loading branch information
staltz authored and benlesh committed Jan 27, 2016
1 parent 9eb25ba commit 815cfae
Show file tree
Hide file tree
Showing 4 changed files with 95 additions and 120 deletions.
40 changes: 20 additions & 20 deletions spec/Observable-spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -331,17 +331,16 @@ describe('Observable.lift', function () {

it('should allow injecting behaviors into all subscribers in an operator ' +
'chain when overriden', function (done) {
// The custom Observable
function LogObservable() {
Observable.apply(this, arguments);
// The custom Subscriber
var log = [];
function LogSubscriber() {
Subscriber.apply(this, arguments);
}
LogObservable.prototype = Object.create(Observable.prototype);
LogObservable.prototype.constructor = LogObservable;
LogObservable.prototype.lift = function (operator) {
var obs = new LogObservable();
obs.source = this;
obs.operator = new LogOperator(operator);
return obs;
LogSubscriber.prototype = Object.create(Subscriber.prototype);
LogSubscriber.prototype.constructor = LogSubscriber;
LogSubscriber.prototype.next = function (x) {
log.push('next ' + x);
this.destination.next(x);
};

// The custom Operator
Expand All @@ -352,16 +351,17 @@ describe('Observable.lift', function () {
return this.childOperator.call(new LogSubscriber(subscriber));
};

// The custom Subscriber
var log = [];
function LogSubscriber() {
Subscriber.apply(this, arguments);
// The custom Observable
function LogObservable() {
Observable.apply(this, arguments);
}
LogSubscriber.prototype = Object.create(Subscriber.prototype);
LogSubscriber.prototype.constructor = LogSubscriber;
LogSubscriber.prototype.next = function (x) {
log.push('next ' + x);
this.destination.next(x);
LogObservable.prototype = Object.create(Observable.prototype);
LogObservable.prototype.constructor = LogObservable;
LogObservable.prototype.lift = function (operator) {
var obs = new LogObservable();
obs.source = this;
obs.operator = new LogOperator(operator);
return obs;
};

// Use the LogObservable
Expand All @@ -372,7 +372,7 @@ describe('Observable.lift', function () {
observer.complete();
})
.map(function (x) { return 10 * x; })
.filter(function (x) { return x > 15 })
.filter(function (x) { return x > 15; })
.count();

expect(result instanceof LogObservable).toBe(true);
Expand Down
2 changes: 1 addition & 1 deletion spec/operators/groupBy-spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -1387,4 +1387,4 @@ describe('Observable.prototype.groupBy()', function () {
});
}, done.fail, done);
});
});
});
2 changes: 1 addition & 1 deletion src/Observable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ export class Observable<T> implements CoreOperators<T> {
projectResult?: (x: T, y: any, ix: number, iy: number) => R,
concurrent?: number) => Observable<R>;
flatMapTo: <R>(observable: Observable<any>, projectResult?: (x: T, y: any, ix: number, iy: number) => R, concurrent?: number) => Observable<R>;
groupBy: <K, R>(keySelector: (value: T) => string,
groupBy: <K, R>(keySelector: (value: T) => K,
elementSelector?: (value: T) => R,
durationSelector?: (group: GroupedObservable<K, R>) => Observable<any>) => Observable<GroupedObservable<K, R>>;
ignoreElements: () => Observable<T>;
Expand Down
171 changes: 73 additions & 98 deletions src/operator/groupBy.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
import {Subscriber} from '../Subscriber';
import {Subscription} from '../Subscription';
import {Observable} from '../Observable';
import {Operator} from '../Operator';
import {Subject} from '../Subject';
import {Map} from '../util/Map';
import {FastMap} from '../util/FastMap';
import {tryCatch} from '../util/tryCatch';
import {errorObject} from '../util/errorObject';

/**
* Groups the items emitted by an Observable according to a specified criterion,
Expand All @@ -19,33 +22,38 @@ import {FastMap} from '../util/FastMap';
*/
export function groupBy<T, K, R>(keySelector: (value: T) => K,
elementSelector?: (value: T) => R,
durationSelector?: (grouped: GroupedObservable<K, R>) => Observable<any>): GroupByObservable<T, K, R> {
return new GroupByObservable(this, keySelector, elementSelector, durationSelector);
durationSelector?: (grouped: GroupedObservable<K, R>) => Observable<any>): Observable<GroupedObservable<K, R>> {
return this.lift(new GroupByOperator(this, keySelector, elementSelector, durationSelector));
}

export class GroupByObservable<T, K, R> extends Observable<GroupedObservable<K, R>> {
export interface RefCountSubscription {
count: number;
unsubscribe: () => void;
isUnsubscribed: boolean;
attemptedToUnsubscribe: boolean;
}

class GroupByOperator<T, K, R> extends Operator<T, R> {
constructor(public source: Observable<T>,
private keySelector: (value: T) => K,
private elementSelector?: (value: T) => R,
private durationSelector?: (grouped: GroupedObservable<K, R>) => Observable<any>) {
super();
}

protected _subscribe(subscriber: Subscriber<any>): Subscription {
const refCountSubscription = new RefCountSubscription();
const groupBySubscriber = new GroupBySubscriber(
subscriber, refCountSubscription, this.keySelector, this.elementSelector, this.durationSelector
call(subscriber: Subscriber<GroupedObservable<K, R>>): Subscriber<T> {
return new GroupBySubscriber(
subscriber, this.keySelector, this.elementSelector, this.durationSelector
);
refCountSubscription.setPrimary(this.source.subscribe(groupBySubscriber));
return refCountSubscription;
}
}

class GroupBySubscriber<T, K, R> extends Subscriber<T> {
class GroupBySubscriber<T, K, R> extends Subscriber<T> implements RefCountSubscription {
private groups: Map<K, Subject<T|R>> = null;
public attemptedToUnsubscribe: boolean = false;
public count: number = 0;

constructor(destination: Subscriber<R>,
private refCountSubscription: RefCountSubscription,
constructor(destination: Subscriber<GroupedObservable<K, R>>,
private keySelector: (value: T) => K,
private elementSelector?: (value: T) => R,
private durationSelector?: (grouped: GroupedObservable<K, R>) => Observable<any>) {
Expand All @@ -54,67 +62,48 @@ class GroupBySubscriber<T, K, R> extends Subscriber<T> {
this.add(destination);
}

protected _next(value: T): void {
let key: any;
try {
key = this.keySelector(value);
} catch (err) {
this.error(err);
return;
}
this._group(value, key);
}

private _group(value: T, key: K) {
let groups = this.groups;
protected _next(x: T): void {
let key = tryCatch(this.keySelector)(x);
if (key === errorObject) {
this.error(errorObject.e);
} else {
let groups = this.groups;
const elementSelector = this.elementSelector;
const durationSelector = this.durationSelector;

if (!groups) {
groups = this.groups = typeof key === 'string' ? new FastMap() : new Map();
}
if (!groups) {
groups = this.groups = typeof key === 'string' ? new FastMap() : new Map();
}

let group = groups.get(key);
let group = groups.get(key);

if (!group) {
groups.set(key, group = new Subject<T|R>());
let groupedObservable = new GroupedObservable(key, group, this.refCountSubscription);
if (!group) {
groups.set(key, group = new Subject<R>());
let groupedObservable = new GroupedObservable(key, group, this);

if (this.durationSelector) {
if (!this._tryDuration(key, group)) {
return;
if (durationSelector) {
let duration = tryCatch(durationSelector)(new GroupedObservable<K, R>(key, <any>group));
if (duration === errorObject) {
this.error(errorObject.e);
} else {
this.add(duration.subscribe(new GroupDurationSubscriber(key, group, this)));
}
}
}

this.destination.next(groupedObservable);
}

if (this.elementSelector) {
this._tryElementSelector(value, group);
} else {
group.next(value);
}
}

private _tryElementSelector(value: T, group: Subject<T | R>) {
let result: any;
try {
result = this.elementSelector(value);
} catch (err) {
this.error(err);
return;
}
group.next(result);
}
this.destination.next(groupedObservable);
}

private _tryDuration(key: K, group: any): boolean {
let duration: any;
try {
duration = this.durationSelector(new GroupedObservable<K, R>(key, group));
} catch (err) {
this.error(err);
return false;
if (elementSelector) {
let value = tryCatch(elementSelector)(x);
if (value === errorObject) {
this.error(errorObject.e);
} else {
group.next(value);
}
} else {
group.next(x);
}
}
this.add(duration.subscribe(new GroupDurationSubscriber(key, group, this)));
return true;
}

protected _error(err: any): void {
Expand Down Expand Up @@ -142,6 +131,15 @@ class GroupBySubscriber<T, K, R> extends Subscriber<T> {
removeGroup(key: K): void {
this.groups.delete(key);
}

unsubscribe() {
if (!this.isUnsubscribed && !this.attemptedToUnsubscribe) {
this.attemptedToUnsubscribe = true;
if (this.count === 0) {
super.unsubscribe();
}
}
}
}

class GroupDurationSubscriber<K, T> extends Subscriber<T> {
Expand All @@ -167,30 +165,6 @@ class GroupDurationSubscriber<K, T> extends Subscriber<T> {
}
}

export class RefCountSubscription extends Subscription {
primary: Subscription;
attemptedToUnsubscribePrimary: boolean = false;
count: number = 0;

constructor() {
super();
}

setPrimary(subscription: Subscription) {
this.primary = subscription;
}

unsubscribe() {
if (!this.isUnsubscribed && !this.attemptedToUnsubscribePrimary) {
this.attemptedToUnsubscribePrimary = true;
if (this.count === 0) {
super.unsubscribe();
this.primary.unsubscribe();
}
}
}
}

export class GroupedObservable<K, T> extends Observable<T> {
constructor(public key: K,
private groupSubject: Subject<T>,
Expand All @@ -200,27 +174,28 @@ export class GroupedObservable<K, T> extends Observable<T> {

protected _subscribe(subscriber: Subscriber<T>) {
const subscription = new Subscription();
if (this.refCountSubscription && !this.refCountSubscription.isUnsubscribed) {
subscription.add(new InnerRefCountSubscription(this.refCountSubscription));
const {refCountSubscription, groupSubject} = this;
if (refCountSubscription && !refCountSubscription.isUnsubscribed) {
subscription.add(new InnerRefCountSubscription(refCountSubscription));
}
subscription.add(this.groupSubject.subscribe(subscriber));
subscription.add(groupSubject.subscribe(subscriber));
return subscription;
}
}

export class InnerRefCountSubscription extends Subscription {
class InnerRefCountSubscription extends Subscription {
constructor(private parent: RefCountSubscription) {
super();
parent.count++;
}

unsubscribe() {
if (!this.parent.isUnsubscribed && !this.isUnsubscribed) {
const parent = this.parent;
if (!parent.isUnsubscribed && !this.isUnsubscribed) {
super.unsubscribe();
this.parent.count--;
if (this.parent.count === 0 && this.parent.attemptedToUnsubscribePrimary) {
this.parent.unsubscribe();
this.parent.primary.unsubscribe();
parent.count -= 1;
if (parent.count === 0 && parent.attemptedToUnsubscribe) {
parent.unsubscribe();
}
}
}
Expand Down

0 comments on commit 815cfae

Please sign in to comment.