Skip to content

Commit

Permalink
Fix take swallowing exception if thrown by the exactly the nth onNext
Browse files Browse the repository at this point in the history
call to it.
  • Loading branch information
akarnokd committed Jul 23, 2015
1 parent f036cd0 commit 2395720
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 32 deletions.
26 changes: 17 additions & 9 deletions src/main/java/rx/internal/operators/OperatorTake.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,33 +43,41 @@ public OperatorTake(int limit) {
public Subscriber<? super T> call(final Subscriber<? super T> child) {
final Subscriber<T> parent = new Subscriber<T>() {

int count = 0;
boolean completed = false;
int count;
boolean completed;

@Override
public void onCompleted() {
if (!completed) {
completed = true;
child.onCompleted();
}
}

@Override
public void onError(Throwable e) {
if (!completed) {
child.onError(e);
completed = true;
try {
child.onError(e);
} finally {
unsubscribe();
}
}
}

@Override
public void onNext(T i) {
if (!isUnsubscribed()) {
if (++count >= limit) {
completed = true;
}
boolean c = ++count >= limit;
child.onNext(i);
if (completed) {
child.onCompleted();
unsubscribe();
if (c && !completed) {
completed = true;
try {
child.onCompleted();
} finally {
unsubscribe();
}
}
}
}
Expand Down
49 changes: 26 additions & 23 deletions src/test/java/rx/internal/operators/OperatorTakeTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,36 +16,21 @@
package rx.internal.operators;

import static org.junit.Assert.*;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Matchers.*;
import static org.mockito.Mockito.*;

import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;

import org.junit.Test;
import org.mockito.InOrder;

import rx.Observable;
import rx.*;
import rx.Observable.OnSubscribe;
import rx.Observer;
import rx.Producer;
import rx.Subscriber;
import rx.Subscription;
import rx.functions.Action1;
import rx.functions.Func1;
import rx.observers.Subscribers;
import rx.observers.TestSubscriber;
import rx.exceptions.TestException;
import rx.functions.*;
import rx.observers.*;
import rx.schedulers.Schedulers;

public class OperatorTakeTest {
Expand Down Expand Up @@ -414,4 +399,22 @@ public void call(Long n) {
ts.assertNoErrors();
assertEquals(2,requests.get());
}

@Test
public void takeFinalValueThrows() {
Observable<Integer> source = Observable.just(1).take(1);

TestSubscriber<Integer> ts = new TestSubscriber<Integer>() {
@Override
public void onNext(Integer t) {
throw new TestException();
}
};

source.subscribe(ts);

ts.assertNoValues();
ts.assertError(TestException.class);
ts.assertNotCompleted();
}
}

0 comments on commit 2395720

Please sign in to comment.