Skip to content

Commit

Permalink
Merge pull request #5672 from square/jwilson.1230.await_idle
Browse files Browse the repository at this point in the history
Fix crash on repeated MockWebServer shutdown
  • Loading branch information
swankjesse authored Dec 31, 2019
2 parents 975be25 + edb5865 commit b9e0422
Show file tree
Hide file tree
Showing 6 changed files with 69 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -448,7 +448,7 @@ class MockWebServer : ExternalResource(), Closeable {

// Await shutdown.
for (queue in taskRunner.activeQueues()) {
if (!queue.awaitIdle(TimeUnit.SECONDS.toNanos(5))) {
if (!queue.idleLatch().await(5, TimeUnit.SECONDS)) {
throw IOException("Gave up waiting for queue to shut down")
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ class OkHttpClientTestRule : TestRule {

private fun ensureAllTaskQueuesIdle() {
for (queue in TaskRunner.INSTANCE.activeQueues()) {
assertThat(queue.awaitIdle(TimeUnit.MILLISECONDS.toNanos(1000L)))
assertThat(queue.idleLatch().await(1_000L, TimeUnit.MILLISECONDS))
.withFailMessage("Queue still active after 1000 ms")
.isTrue()
}
Expand Down
42 changes: 29 additions & 13 deletions okhttp/src/main/java/okhttp3/internal/concurrent/TaskQueue.kt
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package okhttp3.internal.concurrent
import okhttp3.internal.assertThreadDoesntHoldLock
import java.util.concurrent.CountDownLatch
import java.util.concurrent.RejectedExecutionException
import java.util.concurrent.TimeUnit

/**
* A set of tasks that are executed in sequential order.
Expand Down Expand Up @@ -101,25 +100,42 @@ class TaskQueue internal constructor(
}, delayNanos)
}

/** Returns true if this queue became idle before the timeout elapsed. */
fun awaitIdle(delayNanos: Long): Boolean {
val latch = CountDownLatch(1)
/** Returns a latch that reaches 0 when the queue is next idle. */
fun idleLatch(): CountDownLatch {
synchronized(taskRunner) {
// If the queue is already idle, that's easy.
if (activeTask == null && futureTasks.isEmpty()) {
return CountDownLatch(0)
}

val task = object : Task("OkHttp awaitIdle", cancelable = false) {
override fun runOnce(): Long {
latch.countDown()
return -1L
// If there's an existing AwaitIdleTask, use it. This is necessary when the executor is
// shutdown but still busy as we can't enqueue in that case.
val existingTask = activeTask
if (existingTask is AwaitIdleTask) {
return existingTask.latch
}
for (futureTask in futureTasks) {
if (futureTask is AwaitIdleTask) {
return futureTask.latch
}
}
}

// Don't delegate to schedule because that has to honor shutdown rules.
synchronized(taskRunner) {
if (scheduleAndDecide(task, 0L)) {
// Don't delegate to schedule() because that enforces shutdown rules.
val newTask = AwaitIdleTask()
if (scheduleAndDecide(newTask, 0L)) {
taskRunner.kickCoordinator(this)
}
return newTask.latch
}
}

return latch.await(delayNanos, TimeUnit.NANOSECONDS)
private class AwaitIdleTask : Task("OkHttp awaitIdle", cancelable = false) {
val latch = CountDownLatch(1)

override fun runOnce(): Long {
latch.countDown()
return -1L
}
}

/** Adds [task] to run in [delayNanos]. Returns true if the coordinator is impacted. */
Expand Down
4 changes: 2 additions & 2 deletions okhttp/src/main/java/okhttp3/internal/ws/RealWebSocket.kt
Original file line number Diff line number Diff line change
Expand Up @@ -259,14 +259,14 @@ class RealWebSocket(
/** For testing: wait until the web socket's executor has terminated. */
@Throws(InterruptedException::class)
fun awaitTermination(timeout: Long, timeUnit: TimeUnit) {
taskQueue.awaitIdle(timeUnit.toNanos(timeout))
taskQueue.idleLatch().await(timeout, timeUnit)
}

/** For testing: force this web socket to release its threads. */
@Throws(InterruptedException::class)
fun tearDown() {
taskQueue.shutdown()
taskQueue.awaitIdle(TimeUnit.SECONDS.toNanos(10L))
taskQueue.idleLatch().await(10, TimeUnit.SECONDS)
}

@Synchronized fun sentPingCount(): Int = sentPingCount
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,11 +82,22 @@ class TaskRunnerRealBackendTest {
return@schedule -1L
}

queue.awaitIdle(TimeUnit.MILLISECONDS.toNanos(500))
queue.idleLatch().await(500, TimeUnit.MILLISECONDS)

assertThat(log.take()).isEqualTo("failing task running")
assertThat(log.take()).isEqualTo("uncaught exception: java.lang.RuntimeException: boom!")
assertThat(log.take()).isEqualTo("normal task running")
assertThat(log).isEmpty()
}

@Test fun idleLatchAfterShutdown() {
queue.schedule("task") {
Thread.sleep(250)
backend.shutdown()
return@schedule -1L
}

assertThat(queue.idleLatch().await(500L, TimeUnit.MILLISECONDS)).isTrue()
assertThat(queue.idleLatch().count).isEqualTo(0)
}
}
24 changes: 24 additions & 0 deletions okhttp/src/test/java/okhttp3/internal/concurrent/TaskRunnerTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -623,6 +623,30 @@ class TaskRunnerTest {
)
}

@Test fun idleLatch() {
redQueue.execute("task") {
log += "run@${taskFaker.nanoTime}"
}

val idleLatch = redQueue.idleLatch()
assertThat(idleLatch.count).isEqualTo(1)

taskFaker.advanceUntil(0.µs)
assertThat(log).containsExactly("run@0")

assertThat(idleLatch.count).isEqualTo(0)
}

@Test fun multipleCallsToIdleLatchReturnSameInstance() {
redQueue.execute("task") {
log += "run@${taskFaker.nanoTime}"
}

val idleLatch1 = redQueue.idleLatch()
val idleLatch2 = redQueue.idleLatch()
assertThat(idleLatch2).isSameAs(idleLatch1)
}

private val Int.µs: Long
get() = this * 1_000L
}

0 comments on commit b9e0422

Please sign in to comment.