Skip to content

Commit

Permalink
feat(groupBy): Adds subjectSelector argument to groupBy (#2023)
Browse files Browse the repository at this point in the history
Adds subjectSelector argument to groupBy to allow consumers to customize the behavior of the hot

GroupedObservable.
  • Loading branch information
trxcllnt authored and jayphelps committed Oct 24, 2016
1 parent 44fbc14 commit f94ceb9
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 8 deletions.
22 changes: 22 additions & 0 deletions spec/operators/groupBy-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ declare const {hot, cold, asDiagram, expectObservable, expectSubscriptions};

declare const rxTestScheduler: Rx.TestScheduler;
const Observable = Rx.Observable;
const ReplaySubject = Rx.ReplaySubject;

/** @test {groupBy} */
describe('Observable.prototype.groupBy', () => {
Expand Down Expand Up @@ -98,6 +99,27 @@ describe('Observable.prototype.groupBy', () => {
expect(resultingGroups).to.deep.equal(expectedGroups);
});

it('should group values with a subject selector', (done: MochaDone) => {
const expectedGroups = [
{ key: 1, values: [3] },
{ key: 0, values: [2] }
];

Observable.of(1, 2, 3)
.groupBy((x: number) => x % 2, null, null, () => new ReplaySubject(1))
// Ensure each inner group reaches the destination after the first event
// has been next'd to the group
.delay(5)
.subscribe((g: any) => {
const expectedGroup = expectedGroups.shift();
expect(g.key).to.equal(expectedGroup.key);

g.subscribe((x: any) => {
expect(x).to.deep.equal(expectedGroup.values.shift());
});
}, null, done);
});

it('should handle an empty Observable', () => {
const e1 = cold('|');
const e1subs = '(^!)';
Expand Down
20 changes: 12 additions & 8 deletions src/operator/groupBy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,13 @@ import { FastMap } from '../util/FastMap';
export function groupBy<T, K>(this: Observable<T>, keySelector: (value: T) => K): Observable<GroupedObservable<K, T>>;
export function groupBy<T, K>(this: Observable<T>, keySelector: (value: T) => K, elementSelector: void, durationSelector: (grouped: GroupedObservable<K, T>) => Observable<any>): Observable<GroupedObservable<K, T>>;
export function groupBy<T, K, R>(this: Observable<T>, keySelector: (value: T) => K, elementSelector?: (value: T) => R, durationSelector?: (grouped: GroupedObservable<K, R>) => Observable<any>): Observable<GroupedObservable<K, R>>;
export function groupBy<T, K, R>(this: Observable<T>, keySelector: (value: T) => K, elementSelector?: (value: T) => R, durationSelector?: (grouped: GroupedObservable<K, R>) => Observable<any>, subjectSelector?: () => Subject<R>): Observable<GroupedObservable<K, R>>;
/* tslint:disable:max-line-length */
export function groupBy<T, K, R>(this: Observable<T>, keySelector: (value: T) => K,
elementSelector?: ((value: T) => R) | void,
durationSelector?: (grouped: GroupedObservable<K, R>) => Observable<any>): Observable<GroupedObservable<K, R>> {
return this.lift(new GroupByOperator(this, keySelector, elementSelector, durationSelector));
durationSelector?: (grouped: GroupedObservable<K, R>) => Observable<any>,
subjectSelector?: () => Subject<R>): Observable<GroupedObservable<K, R>> {
return this.lift(new GroupByOperator(keySelector, elementSelector, durationSelector, subjectSelector));
}

export interface RefCountSubscription {
Expand All @@ -46,15 +48,15 @@ export interface RefCountSubscription {
}

class GroupByOperator<T, K, R> implements Operator<T, GroupedObservable<K, R>> {
constructor(public source: Observable<T>,
private keySelector: (value: T) => K,
constructor(private keySelector: (value: T) => K,
private elementSelector?: ((value: T) => R) | void,
private durationSelector?: (grouped: GroupedObservable<K, R>) => Observable<any>) {
private durationSelector?: (grouped: GroupedObservable<K, R>) => Observable<any>,
private subjectSelector?: () => Subject<R>) {
}

call(subscriber: Subscriber<GroupedObservable<K, R>>, source: any): any {
return source._subscribe(new GroupBySubscriber(
subscriber, this.keySelector, this.elementSelector, this.durationSelector
subscriber, this.keySelector, this.elementSelector, this.durationSelector, this.subjectSelector
));
}
}
Expand All @@ -72,7 +74,8 @@ class GroupBySubscriber<T, K, R> extends Subscriber<T> implements RefCountSubscr
constructor(destination: Subscriber<GroupedObservable<K, R>>,
private keySelector: (value: T) => K,
private elementSelector?: ((value: T) => R) | void,
private durationSelector?: (grouped: GroupedObservable<K, R>) => Observable<any>) {
private durationSelector?: (grouped: GroupedObservable<K, R>) => Observable<any>,
private subjectSelector?: () => Subject<R>) {
super(destination);
}

Expand Down Expand Up @@ -109,7 +112,8 @@ class GroupBySubscriber<T, K, R> extends Subscriber<T> implements RefCountSubscr
}

if (!group) {
groups.set(key, group = new Subject<R>());
group = this.subjectSelector ? this.subjectSelector() : new Subject<R>();
groups.set(key, group);
const groupedObservable = new GroupedObservable(key, group, this);
this.destination.next(groupedObservable);
if (this.durationSelector) {
Expand Down

0 comments on commit f94ceb9

Please sign in to comment.