File tree Expand file tree Collapse file tree 2 files changed +31
-1
lines changed
main/java/rx/internal/operators
test/java/rx/internal/operators Expand file tree Collapse file tree 2 files changed +31
-1
lines changed Original file line number Diff line number Diff line change @@ -73,7 +73,7 @@ public void request(long n) {
7373 }
7474 } else if (n > 0 ) {
7575 // backpressure is requested
76- long _c = REQUESTED_UPDATER . getAndAdd ( this , n );
76+ long _c = BackpressureUtils . getAndAddRequest ( REQUESTED_UPDATER , this , n );
7777 if (_c == 0 ) {
7878 while (true ) {
7979 /*
Original file line number Diff line number Diff line change 3232
3333import rx .Observable ;
3434import rx .Observer ;
35+ import rx .Subscriber ;
3536import rx .functions .Action1 ;
3637import rx .internal .util .RxRingBuffer ;
3738import rx .observers .TestSubscriber ;
@@ -190,4 +191,33 @@ public void testWithBackpressureRequestWayMore() {
190191 ts .assertReceivedOnNext (list );
191192 ts .assertTerminalEvent ();
192193 }
194+
195+ @ Test
196+ public void testRequestOverflow () {
197+ final AtomicInteger count = new AtomicInteger ();
198+ int n = 10 ;
199+ Observable .range (1 , n ).subscribe (new Subscriber <Integer >() {
200+
201+ @ Override
202+ public void onStart () {
203+ request (2 );
204+ }
205+
206+ @ Override
207+ public void onCompleted () {
208+ //do nothing
209+ }
210+
211+ @ Override
212+ public void onError (Throwable e ) {
213+ throw new RuntimeException (e );
214+ }
215+
216+ @ Override
217+ public void onNext (Integer t ) {
218+ count .incrementAndGet ();
219+ request (Long .MAX_VALUE - 1 );
220+ }});
221+ assertEquals (n , count .get ());
222+ }
193223}
You can’t perform that action at this time.
0 commit comments