From f94ceb9b3eed06d46ab9f10c4060a99d9b7026e2 Mon Sep 17 00:00:00 2001 From: Paul Taylor Date: Tue, 25 Oct 2016 04:32:03 +0800 Subject: [PATCH] feat(groupBy): Adds subjectSelector argument to groupBy (#2023) Adds subjectSelector argument to groupBy to allow consumers to customize the behavior of the hot GroupedObservable. --- spec/operators/groupBy-spec.ts | 22 ++++++++++++++++++++++ src/operator/groupBy.ts | 20 ++++++++++++-------- 2 files changed, 34 insertions(+), 8 deletions(-) diff --git a/spec/operators/groupBy-spec.ts b/spec/operators/groupBy-spec.ts index e9298c4c7c..8dbdab6aae 100644 --- a/spec/operators/groupBy-spec.ts +++ b/spec/operators/groupBy-spec.ts @@ -5,6 +5,7 @@ declare const {hot, cold, asDiagram, expectObservable, expectSubscriptions}; declare const rxTestScheduler: Rx.TestScheduler; const Observable = Rx.Observable; +const ReplaySubject = Rx.ReplaySubject; /** @test {groupBy} */ describe('Observable.prototype.groupBy', () => { @@ -98,6 +99,27 @@ describe('Observable.prototype.groupBy', () => { expect(resultingGroups).to.deep.equal(expectedGroups); }); + it('should group values with a subject selector', (done: MochaDone) => { + const expectedGroups = [ + { key: 1, values: [3] }, + { key: 0, values: [2] } + ]; + + Observable.of(1, 2, 3) + .groupBy((x: number) => x % 2, null, null, () => new ReplaySubject(1)) + // Ensure each inner group reaches the destination after the first event + // has been next'd to the group + .delay(5) + .subscribe((g: any) => { + const expectedGroup = expectedGroups.shift(); + expect(g.key).to.equal(expectedGroup.key); + + g.subscribe((x: any) => { + expect(x).to.deep.equal(expectedGroup.values.shift()); + }); + }, null, done); + }); + it('should handle an empty Observable', () => { const e1 = cold('|'); const e1subs = '(^!)'; diff --git a/src/operator/groupBy.ts b/src/operator/groupBy.ts index c0feeaa207..a23d462cc4 100644 --- a/src/operator/groupBy.ts +++ b/src/operator/groupBy.ts @@ -31,11 +31,13 @@ import { FastMap } from '../util/FastMap'; export function groupBy(this: Observable, keySelector: (value: T) => K): Observable>; export function groupBy(this: Observable, keySelector: (value: T) => K, elementSelector: void, durationSelector: (grouped: GroupedObservable) => Observable): Observable>; export function groupBy(this: Observable, keySelector: (value: T) => K, elementSelector?: (value: T) => R, durationSelector?: (grouped: GroupedObservable) => Observable): Observable>; +export function groupBy(this: Observable, keySelector: (value: T) => K, elementSelector?: (value: T) => R, durationSelector?: (grouped: GroupedObservable) => Observable, subjectSelector?: () => Subject): Observable>; /* tslint:disable:max-line-length */ export function groupBy(this: Observable, keySelector: (value: T) => K, elementSelector?: ((value: T) => R) | void, - durationSelector?: (grouped: GroupedObservable) => Observable): Observable> { - return this.lift(new GroupByOperator(this, keySelector, elementSelector, durationSelector)); + durationSelector?: (grouped: GroupedObservable) => Observable, + subjectSelector?: () => Subject): Observable> { + return this.lift(new GroupByOperator(keySelector, elementSelector, durationSelector, subjectSelector)); } export interface RefCountSubscription { @@ -46,15 +48,15 @@ export interface RefCountSubscription { } class GroupByOperator implements Operator> { - constructor(public source: Observable, - private keySelector: (value: T) => K, + constructor(private keySelector: (value: T) => K, private elementSelector?: ((value: T) => R) | void, - private durationSelector?: (grouped: GroupedObservable) => Observable) { + private durationSelector?: (grouped: GroupedObservable) => Observable, + private subjectSelector?: () => Subject) { } call(subscriber: Subscriber>, source: any): any { return source._subscribe(new GroupBySubscriber( - subscriber, this.keySelector, this.elementSelector, this.durationSelector + subscriber, this.keySelector, this.elementSelector, this.durationSelector, this.subjectSelector )); } } @@ -72,7 +74,8 @@ class GroupBySubscriber extends Subscriber implements RefCountSubscr constructor(destination: Subscriber>, private keySelector: (value: T) => K, private elementSelector?: ((value: T) => R) | void, - private durationSelector?: (grouped: GroupedObservable) => Observable) { + private durationSelector?: (grouped: GroupedObservable) => Observable, + private subjectSelector?: () => Subject) { super(destination); } @@ -109,7 +112,8 @@ class GroupBySubscriber extends Subscriber implements RefCountSubscr } if (!group) { - groups.set(key, group = new Subject()); + group = this.subjectSelector ? this.subjectSelector() : new Subject(); + groups.set(key, group); const groupedObservable = new GroupedObservable(key, group, this); this.destination.next(groupedObservable); if (this.durationSelector) {