1818import io .reactivex .*;
1919import io .reactivex .disposables .Disposable ;
2020import io .reactivex .internal .disposables .DisposableHelper ;
21- import io .reactivex .internal .observers .QueueDrainObserver ;
2221import io .reactivex .internal .queue .MpscLinkedQueue ;
23- import io .reactivex .internal .util .NotificationLite ;
24- import io .reactivex .observers .* ;
22+ import io .reactivex .internal .util .AtomicThrowable ;
23+ import io .reactivex .observers .DisposableObserver ;
2524import io .reactivex .plugins .RxJavaPlugins ;
2625import io .reactivex .subjects .UnicastSubject ;
2726
2827public final class ObservableWindowBoundary <T , B > extends AbstractObservableWithUpstream <T , Observable <T >> {
2928 final ObservableSource <B > other ;
30- final int bufferSize ;
29+ final int capacityHint ;
3130
32- public ObservableWindowBoundary (ObservableSource <T > source , ObservableSource <B > other , int bufferSize ) {
31+ public ObservableWindowBoundary (ObservableSource <T > source , ObservableSource <B > other , int capacityHint ) {
3332 super (source );
3433 this .other = other ;
35- this .bufferSize = bufferSize ;
34+ this .capacityHint = capacityHint ;
3635 }
3736
3837 @ Override
39- public void subscribeActual (Observer <? super Observable <T >> t ) {
40- source .subscribe (new WindowBoundaryMainObserver <T , B >(new SerializedObserver <Observable <T >>(t ), other , bufferSize ));
38+ public void subscribeActual (Observer <? super Observable <T >> observer ) {
39+ WindowBoundaryMainObserver <T , B > parent = new WindowBoundaryMainObserver <T , B >(observer , capacityHint );
40+
41+ observer .onSubscribe (parent );
42+ other .subscribe (parent .boundaryObserver );
43+
44+ source .subscribe (parent );
4145 }
4246
4347 static final class WindowBoundaryMainObserver <T , B >
44- extends QueueDrainObserver < T , Object , Observable < T >>
45- implements Disposable {
48+ extends AtomicInteger
49+ implements Observer < T >, Disposable , Runnable {
4650
47- final ObservableSource <B > other ;
48- final int bufferSize ;
51+ private static final long serialVersionUID = 2233020065421370272L ;
4952
50- Disposable s ;
53+ final Observer <? super Observable < T >> downstream ;
5154
52- final AtomicReference < Disposable > boundary = new AtomicReference < Disposable >() ;
55+ final int capacityHint ;
5356
54- UnicastSubject < T > window ;
57+ final WindowBoundaryInnerObserver < T , B > boundaryObserver ;
5558
56- static final Object NEXT = new Object () ;
59+ final AtomicReference < Disposable > upstream ;
5760
58- final AtomicLong windows = new AtomicLong () ;
61+ final AtomicInteger windows ;
5962
60- WindowBoundaryMainObserver (Observer <? super Observable <T >> actual , ObservableSource <B > other ,
61- int bufferSize ) {
62- super (actual , new MpscLinkedQueue <Object >());
63- this .other = other ;
64- this .bufferSize = bufferSize ;
65- windows .lazySet (1 );
66- }
63+ final MpscLinkedQueue <Object > queue ;
6764
68- @ Override
69- public void onSubscribe (Disposable s ) {
70- if (DisposableHelper .validate (this .s , s )) {
71- this .s = s ;
65+ final AtomicThrowable errors ;
7266
73- Observer <? super Observable <T >> a = actual ;
74- a .onSubscribe (this );
67+ final AtomicBoolean stopWindows ;
7568
76- if (cancelled ) {
77- return ;
78- }
69+ static final Object NEXT_WINDOW = new Object ();
7970
80- UnicastSubject < T > w = UnicastSubject . create ( bufferSize ) ;
71+ volatile boolean done ;
8172
82- window = w ;
73+ UnicastSubject < T > window ;
8374
84- a .onNext (w );
75+ WindowBoundaryMainObserver (Observer <? super Observable <T >> downstream , int capacityHint ) {
76+ this .downstream = downstream ;
77+ this .capacityHint = capacityHint ;
78+ this .boundaryObserver = new WindowBoundaryInnerObserver <T , B >(this );
79+ this .upstream = new AtomicReference <Disposable >();
80+ this .windows = new AtomicInteger (1 );
81+ this .queue = new MpscLinkedQueue <Object >();
82+ this .errors = new AtomicThrowable ();
83+ this .stopWindows = new AtomicBoolean ();
84+ }
8585
86- WindowBoundaryInnerObserver <T , B > inner = new WindowBoundaryInnerObserver <T , B >(this );
86+ @ Override
87+ public void onSubscribe (Disposable d ) {
88+ if (DisposableHelper .setOnce (upstream , d )) {
8789
88- if (boundary .compareAndSet (null , inner )) {
89- windows .getAndIncrement ();
90- other .subscribe (inner );
91- }
90+ innerNext ();
9291 }
9392 }
9493
9594 @ Override
9695 public void onNext (T t ) {
97- if (fastEnter ()) {
98- UnicastSubject <T > w = window ;
99-
100- w .onNext (t );
96+ queue .offer (t );
97+ drain ();
98+ }
10199
102- if (leave (-1 ) == 0 ) {
103- return ;
104- }
100+ @ Override
101+ public void onError (Throwable e ) {
102+ boundaryObserver .dispose ();
103+ if (errors .addThrowable (e )) {
104+ done = true ;
105+ drain ();
105106 } else {
106- queue .offer (NotificationLite .next (t ));
107- if (!enter ()) {
108- return ;
109- }
107+ RxJavaPlugins .onError (e );
110108 }
111- drainLoop ();
112109 }
113110
114111 @ Override
115- public void onError (Throwable t ) {
116- if (done ) {
117- RxJavaPlugins .onError (t );
118- return ;
119- }
120- error = t ;
112+ public void onComplete () {
113+ boundaryObserver .dispose ();
121114 done = true ;
122- if (enter ()) {
123- drainLoop ();
124- }
115+ drain ();
116+ }
125117
126- if (windows .decrementAndGet () == 0 ) {
127- DisposableHelper .dispose (boundary );
118+ @ Override
119+ public void dispose () {
120+ if (stopWindows .compareAndSet (false , true )) {
121+ boundaryObserver .dispose ();
122+ if (windows .decrementAndGet () == 0 ) {
123+ DisposableHelper .dispose (upstream );
124+ }
128125 }
129-
130- actual .onError (t );
131126 }
132127
133128 @ Override
134- public void onComplete () {
135- if (done ) {
136- return ;
137- }
138- done = true ;
139- if (enter ()) {
140- drainLoop ();
141- }
129+ public boolean isDisposed () {
130+ return stopWindows .get ();
131+ }
142132
133+ @ Override
134+ public void run () {
143135 if (windows .decrementAndGet () == 0 ) {
144- DisposableHelper .dispose (boundary );
136+ DisposableHelper .dispose (upstream );
145137 }
138+ }
146139
147- actual .onComplete ();
148-
140+ void innerNext () {
141+ queue .offer (NEXT_WINDOW );
142+ drain ();
149143 }
150144
151- @ Override
152- public void dispose () {
153- cancelled = true ;
145+ void innerError (Throwable e ) {
146+ DisposableHelper .dispose (upstream );
147+ if (errors .addThrowable (e )) {
148+ done = true ;
149+ drain ();
150+ } else {
151+ RxJavaPlugins .onError (e );
152+ }
154153 }
155154
156- @ Override
157- public boolean isDisposed () {
158- return cancelled ;
155+ void innerComplete () {
156+ DisposableHelper .dispose (upstream );
157+ done = true ;
158+ drain ();
159159 }
160160
161- void drainLoop () {
162- final MpscLinkedQueue <Object > q = (MpscLinkedQueue <Object >)queue ;
163- final Observer <? super Observable <T >> a = actual ;
161+ @ SuppressWarnings ("unchecked" )
162+ void drain () {
163+ if (getAndIncrement () != 0 ) {
164+ return ;
165+ }
166+
164167 int missed = 1 ;
165- UnicastSubject <T > w = window ;
168+ Observer <? super Observable <T >> downstream = this .downstream ;
169+ MpscLinkedQueue <Object > queue = this .queue ;
170+ AtomicThrowable errors = this .errors ;
171+
166172 for (;;) {
167173
168174 for (;;) {
175+ if (windows .get () == 0 ) {
176+ queue .clear ();
177+ window = null ;
178+ return ;
179+ }
180+
181+ UnicastSubject <T > w = window ;
182+
169183 boolean d = done ;
170184
171- Object o = q .poll ();
185+ if (d && errors .get () != null ) {
186+ queue .clear ();
187+ Throwable ex = errors .terminate ();
188+ if (w != null ) {
189+ window = null ;
190+ w .onError (ex );
191+ }
192+ downstream .onError (ex );
193+ return ;
194+ }
195+
196+ Object v = queue .poll ();
172197
173- boolean empty = o == null ;
198+ boolean empty = v == null ;
174199
175200 if (d && empty ) {
176- DisposableHelper .dispose (boundary );
177- Throwable e = error ;
178- if (e != null ) {
179- w .onError (e );
201+ Throwable ex = errors .terminate ();
202+ if (ex == null ) {
203+ if (w != null ) {
204+ window = null ;
205+ w .onComplete ();
206+ }
207+ downstream .onComplete ();
180208 } else {
181- w .onComplete ();
209+ if (w != null ) {
210+ window = null ;
211+ w .onError (ex );
212+ }
213+ downstream .onError (ex );
182214 }
183215 return ;
184216 }
@@ -187,48 +219,35 @@ void drainLoop() {
187219 break ;
188220 }
189221
190- if (o == NEXT ) {
191- w .onComplete ();
192-
193- if (windows .decrementAndGet () == 0 ) {
194- DisposableHelper .dispose (boundary );
195- return ;
196- }
197-
198- if (cancelled ) {
199- continue ;
200- }
201-
202- w = UnicastSubject .create (bufferSize );
222+ if (v != NEXT_WINDOW ) {
223+ w .onNext ((T )v );
224+ continue ;
225+ }
203226
204- windows .getAndIncrement ();
227+ if (w != null ) {
228+ window = null ;
229+ w .onComplete ();
230+ }
205231
232+ if (!stopWindows .get ()) {
233+ w = UnicastSubject .create (capacityHint , this );
206234 window = w ;
235+ windows .getAndIncrement ();
207236
208- a .onNext (w );
209-
210- continue ;
237+ downstream .onNext (w );
211238 }
212-
213- w .onNext (NotificationLite .<T >getValue (o ));
214239 }
215240
216- missed = leave (-missed );
241+ missed = addAndGet (-missed );
217242 if (missed == 0 ) {
218- return ;
243+ break ;
219244 }
220245 }
221246 }
222-
223- void next () {
224- queue .offer (NEXT );
225- if (enter ()) {
226- drainLoop ();
227- }
228- }
229247 }
230248
231249 static final class WindowBoundaryInnerObserver <T , B > extends DisposableObserver <B > {
250+
232251 final WindowBoundaryMainObserver <T , B > parent ;
233252
234253 boolean done ;
@@ -242,7 +261,7 @@ public void onNext(B t) {
242261 if (done ) {
243262 return ;
244263 }
245- parent .next ();
264+ parent .innerNext ();
246265 }
247266
248267 @ Override
@@ -252,7 +271,7 @@ public void onError(Throwable t) {
252271 return ;
253272 }
254273 done = true ;
255- parent .onError (t );
274+ parent .innerError (t );
256275 }
257276
258277 @ Override
@@ -261,7 +280,7 @@ public void onComplete() {
261280 return ;
262281 }
263282 done = true ;
264- parent .onComplete ();
283+ parent .innerComplete ();
265284 }
266285 }
267286}
0 commit comments