@@ -4,8 +4,6 @@ import {Observable} from '../Observable';
44import { Subject } from '../Subject' ;
55import { Map } from '../util/Map' ;
66import { FastMap } from '../util/FastMap' ;
7- import { tryCatch } from '../util/tryCatch' ;
8- import { errorObject } from '../util/errorObject' ;
97
108/**
119 * Groups the items emitted by an Observable according to a specified criterion,
@@ -56,48 +54,67 @@ class GroupBySubscriber<T, K, R> extends Subscriber<T> {
5654 this . add ( destination ) ;
5755 }
5856
59- protected _next ( x : T ) : void {
60- let key = tryCatch ( this . keySelector ) ( x ) ;
61- if ( key === errorObject ) {
62- this . error ( errorObject . e ) ;
63- } else {
64- let groups = this . groups ;
65- const elementSelector = this . elementSelector ;
66- const durationSelector = this . durationSelector ;
67-
68- if ( ! groups ) {
69- groups = this . groups = typeof key === 'string' ? new FastMap ( ) : new Map ( ) ;
70- }
57+ protected _next ( value : T ) : void {
58+ let key : any ;
59+ try {
60+ key = this . keySelector ( value ) ;
61+ } catch ( err ) {
62+ this . error ( err ) ;
63+ return ;
64+ }
65+ this . _group ( value , key ) ;
66+ }
7167
72- let group = groups . get ( key ) ;
68+ private _group ( value : T , key : K ) {
69+ let groups = this . groups ;
7370
74- if ( ! group ) {
75- groups . set ( key , group = new Subject < T | R > ( ) ) ;
76- let groupedObservable = new GroupedObservable ( key , group , this . refCountSubscription ) ;
71+ if ( ! groups ) {
72+ groups = this . groups = typeof key === 'string' ? new FastMap ( ) : new Map ( ) ;
73+ }
7774
78- if ( durationSelector ) {
79- let duration = tryCatch ( durationSelector ) ( new GroupedObservable < K , R > ( key , < any > group ) ) ;
80- if ( duration === errorObject ) {
81- this . error ( errorObject . e ) ;
82- } else {
83- this . add ( duration . subscribe ( new GroupDurationSubscriber ( key , group , this ) ) ) ;
84- }
85- }
75+ let group = groups . get ( key ) ;
8676
87- this . destination . next ( groupedObservable ) ;
88- }
77+ if ( ! group ) {
78+ groups . set ( key , group = new Subject < T | R > ( ) ) ;
79+ let groupedObservable = new GroupedObservable ( key , group , this . refCountSubscription ) ;
8980
90- if ( elementSelector ) {
91- let value = tryCatch ( elementSelector ) ( x ) ;
92- if ( value === errorObject ) {
93- this . error ( errorObject . e ) ;
94- } else {
95- group . next ( value ) ;
81+ if ( this . durationSelector ) {
82+ if ( ! this . _tryDuration ( key , group ) ) {
83+ return ;
9684 }
97- } else {
98- group . next ( x ) ;
9985 }
86+
87+ this . destination . next ( groupedObservable ) ;
88+ }
89+
90+ if ( this . elementSelector ) {
91+ this . _tryElementSelector ( value , group ) ;
92+ } else {
93+ group . next ( value ) ;
94+ }
95+ }
96+
97+ private _tryElementSelector ( value : T , group : Subject < T | R > ) {
98+ let result : any ;
99+ try {
100+ result = this . elementSelector ( value ) ;
101+ } catch ( err ) {
102+ this . error ( err ) ;
103+ return ;
104+ }
105+ group . next ( result ) ;
106+ }
107+
108+ private _tryDuration ( key : K , group : any ) : boolean {
109+ let duration : any ;
110+ try {
111+ duration = this . durationSelector ( new GroupedObservable < K , R > ( key , group ) ) ;
112+ } catch ( err ) {
113+ this . error ( err ) ;
114+ return false ;
100115 }
116+ this . add ( duration . subscribe ( new GroupDurationSubscriber ( key , group , this ) ) ) ;
117+ return true ;
101118 }
102119
103120 protected _error ( err : any ) : void {
0 commit comments