Skip to content

Commit

Permalink
3.x: Fix Single.timeout race condition (#7515)
Browse files Browse the repository at this point in the history
  • Loading branch information
akarnokd authored Jan 17, 2023
1 parent e1b6cb4 commit baad331
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -113,11 +113,7 @@ public void onError(Throwable e) {

@Override
public void run() {
Disposable d = get();
if (d != DisposableHelper.DISPOSED && compareAndSet(d, DisposableHelper.DISPOSED)) {
if (d != null) {
d.dispose();
}
if (DisposableHelper.dispose(this)) {
SingleSource<? extends T> other = this.other;
if (other == null) {
downstream.onError(new TimeoutException(timeoutMessage(timeout, unit)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import io.reactivex.rxjava3.functions.Action;
import io.reactivex.rxjava3.observers.TestObserver;
import io.reactivex.rxjava3.plugins.RxJavaPlugins;
import io.reactivex.rxjava3.schedulers.TestScheduler;
import io.reactivex.rxjava3.schedulers.*;
import io.reactivex.rxjava3.subjects.*;
import io.reactivex.rxjava3.testsupport.TestHelper;

Expand Down Expand Up @@ -255,4 +255,24 @@ protected void subscribeActual(@NonNull SingleObserver<? super @NonNull Integer>

assertTrue(d.isDisposed());
}

@Test
public void timeoutWithZero() throws InterruptedException {
int n = 10_000;
Scheduler sch = Schedulers.single();
for (int i = 0; i < n; i++) {
final int y = i;
final CountDownLatch latch = new CountDownLatch(1);
Disposable d = Single.never()
.timeout(0, TimeUnit.NANOSECONDS, sch)
.subscribe(v -> {}, e -> {
//System.out.println("timeout " + y);
latch.countDown();
});
if (!latch.await(2, TimeUnit.SECONDS)) {
System.out.println(d + " " + sch);
throw new IllegalStateException("Timeout did not work at y = " + y);
}
}
}
}

0 comments on commit baad331

Please sign in to comment.