Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TaskRunner support for shutting down queues #5502

Merged
merged 1 commit into from
Sep 27, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import okhttp3.Request
import okhttp3.Response
import okhttp3.internal.addHeaderLenient
import okhttp3.internal.closeQuietly
import okhttp3.internal.concurrent.TaskRunner
import okhttp3.internal.duplex.MwsDuplexAccess
import okhttp3.internal.execute
import okhttp3.internal.http.HttpMethod
Expand Down Expand Up @@ -99,6 +100,7 @@ import javax.net.ssl.X509TrustManager
* in sequence.
*/
class MockWebServer : ExternalResource(), Closeable {
private val taskRunner = TaskRunner()
private val requestQueue = LinkedBlockingQueue<RecordedRequest>()
private val openClientSockets =
Collections.newSetFromMap(ConcurrentHashMap<Socket, Boolean>())
Expand Down Expand Up @@ -454,6 +456,12 @@ class MockWebServer : ExternalResource(), Closeable {
} catch (e: InterruptedException) {
throw AssertionError()
}

for (queue in taskRunner.activeQueues()) {
if (!queue.awaitIdle(TimeUnit.MILLISECONDS.toNanos(500L))) {
throw IOException("Gave up waiting for ${queue.owner} to shut down")
}
}
}

@Synchronized override fun after() {
Expand Down Expand Up @@ -533,7 +541,7 @@ class MockWebServer : ExternalResource(), Closeable {

if (protocol === Protocol.HTTP_2 || protocol === Protocol.H2_PRIOR_KNOWLEDGE) {
val http2SocketHandler = Http2SocketHandler(socket, protocol)
val connection = Http2Connection.Builder(false)
val connection = Http2Connection.Builder(false, taskRunner)
.socket(socket)
.listener(http2SocketHandler)
.build()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import javax.net.ssl.SSLSocketFactory;
import okhttp3.Headers;
import okhttp3.Protocol;
import okhttp3.internal.concurrent.TaskRunner;
import okhttp3.internal.http2.Header;
import okhttp3.internal.http2.Http2Connection;
import okhttp3.internal.http2.Http2Stream;
Expand Down Expand Up @@ -69,7 +70,7 @@ private void run() throws Exception {
if (protocol != Protocol.HTTP_2) {
throw new ProtocolException("Protocol " + protocol + " unsupported");
}
Http2Connection connection = new Http2Connection.Builder(false)
Http2Connection connection = new Http2Connection.Builder(false, TaskRunner.INSTANCE)
.socket(sslSocket)
.listener(this)
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@
*/
package okhttp3

import okhttp3.internal.concurrent.Task
import okhttp3.internal.concurrent.TaskQueue
import okhttp3.internal.concurrent.TaskRunner
import okhttp3.testing.Flaky
import org.assertj.core.api.Assertions.assertThat
Expand All @@ -25,7 +23,6 @@ import org.junit.runner.Description
import org.junit.runners.model.Statement
import java.net.InetAddress
import java.util.concurrent.ConcurrentLinkedDeque
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit

/** Apply this rule to tests that need an OkHttpClient instance. */
Expand Down Expand Up @@ -66,7 +63,7 @@ class OkHttpClientTestRule : TestRule {

private fun ensureAllTaskQueuesIdle() {
for (queue in TaskRunner.INSTANCE.activeQueues()) {
assertThat(queue.awaitIdle(500L, TimeUnit.MILLISECONDS))
assertThat(queue.awaitIdle(TimeUnit.MILLISECONDS.toNanos(500L)))
.withFailMessage("Queue ${queue.owner} still active after 500ms")
.isTrue()
}
Expand Down Expand Up @@ -133,19 +130,6 @@ class OkHttpClientTestRule : TestRule {
}
}

/** Returns true if this queue became idle before the timeout elapsed. */
private fun TaskQueue.awaitIdle(timeout: Long, timeUnit: TimeUnit): Boolean {
val latch = CountDownLatch(1)
schedule(object : Task("awaitIdle") {
override fun runOnce(): Long {
latch.countDown()
return -1L
}
})

return latch.await(timeout, timeUnit)
}

companion object {
/**
* Quick and dirty pool of OkHttpClient instances. Each has its own independent dispatcher and
Expand Down
34 changes: 0 additions & 34 deletions okhttp/src/main/java/okhttp3/internal/Util.kt
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ import java.util.LinkedHashMap
import java.util.Locale
import java.util.TimeZone
import java.util.concurrent.Executor
import java.util.concurrent.RejectedExecutionException
import java.util.concurrent.ThreadFactory
import java.util.concurrent.TimeUnit
import kotlin.text.Charsets.UTF_32BE
Expand Down Expand Up @@ -393,14 +392,6 @@ inline fun Executor.execute(name: String, crossinline block: () -> Unit) {
}
}

/** Executes [block] unless this executor has been shutdown, in which case this does nothing. */
inline fun Executor.tryExecute(name: String, crossinline block: () -> Unit) {
try {
execute(name, block)
} catch (_: RejectedExecutionException) {
}
}

fun Buffer.skipAll(b: Byte): Int {
var count = 0
while (!exhausted() && this[0] == b) {
Expand Down Expand Up @@ -510,34 +501,9 @@ fun Long.toHexString(): String = java.lang.Long.toHexString(this)

fun Int.toHexString(): String = Integer.toHexString(this)

/**
* Lock and wait a duration in nanoseconds. Unlike [java.lang.Object.wait] this interprets 0 as
* "don't wait" instead of "wait forever".
*/
@Throws(InterruptedException::class)
fun Any.lockAndWaitNanos(nanos: Long) {
synchronized(this) {
objectWaitNanos(nanos)
}
}

@Suppress("PLATFORM_CLASS_MAPPED_TO_KOTLIN", "NOTHING_TO_INLINE")
inline fun Any.wait() = (this as Object).wait()

/**
* Wait a duration in nanoseconds. Unlike [java.lang.Object.wait] this interprets 0 as "don't wait"
* instead of "wait forever".
*/
@Throws(InterruptedException::class)
@Suppress("PLATFORM_CLASS_MAPPED_TO_KOTLIN")
fun Any.objectWaitNanos(nanos: Long) {
val ms = nanos / 1_000_000L
val ns = nanos - (ms * 1_000_000L)
if (ms > 0L || nanos > 0) {
(this as Object).wait(ms, ns.toInt())
}
}

@Suppress("PLATFORM_CLASS_MAPPED_TO_KOTLIN", "NOTHING_TO_INLINE")
inline fun Any.notify() = (this as Object).notify()

Expand Down
58 changes: 56 additions & 2 deletions okhttp/src/main/java/okhttp3/internal/concurrent/TaskQueue.kt
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@
package okhttp3.internal.concurrent

import okhttp3.internal.addIfAbsent
import java.util.concurrent.CountDownLatch
import java.util.concurrent.RejectedExecutionException
import java.util.concurrent.TimeUnit

/**
* A set of tasks that are executed in sequential order.
Expand All @@ -32,6 +35,8 @@ class TaskQueue internal constructor(
*/
val owner: Any
) {
private var shutdown = false

/** This queue's currently-executing task, or null if none is currently executing. */
private var activeTask: Task? = null

Expand Down Expand Up @@ -61,19 +66,55 @@ class TaskQueue internal constructor(
* The target execution time is implemented on a best-effort basis. If another task in this queue
* is running when that time is reached, that task is allowed to complete before this task is
* started. Similarly the task will be delayed if the host lacks compute resources.
*
* @throws RejectedExecutionException if the queue is shut down.
*/
fun schedule(task: Task, delayNanos: Long = 0L) {
task.initQueue(this)
synchronized(taskRunner) {
if (shutdown) throw RejectedExecutionException()

if (scheduleAndDecide(task, delayNanos)) {
taskRunner.kickCoordinator(this)
}
}
}

/** Like [schedule], but this silently discard the task if the queue is shut down. */
fun trySchedule(task: Task, delayNanos: Long = 0L) {
synchronized(taskRunner) {
if (shutdown) return

if (scheduleAndDecide(task, delayNanos)) {
taskRunner.kickCoordinator(this)
}
}
}

/** Returns true if this queue became idle before the timeout elapsed. */
fun awaitIdle(delayNanos: Long): Boolean {
val latch = CountDownLatch(1)

val task = object : Task("awaitIdle") {
override fun runOnce(): Long {
latch.countDown()
return -1L
}
}

// Don't delegate to schedule because that has to honor shutdown rules.
synchronized(taskRunner) {
if (scheduleAndDecide(task, 0L)) {
taskRunner.kickCoordinator(this)
}
}

return latch.await(delayNanos, TimeUnit.NANOSECONDS)
}

/** Adds [task] to run in [delayNanos]. Returns true if the coordinator should run. */
private fun scheduleAndDecide(task: Task, delayNanos: Long): Boolean {
task.initQueue(this)

val now = taskRunner.backend.nanoTime()
val executeNanoTime = now + delayNanos

Expand All @@ -100,7 +141,20 @@ class TaskQueue internal constructor(
* be removed from the execution schedule.
*/
fun cancelAll() {
check(!Thread.holdsLock(this))

synchronized(taskRunner) {
if (cancelAllAndDecide()) {
taskRunner.kickCoordinator(this)
}
}
}

fun shutdown() {
check(!Thread.holdsLock(this))

synchronized(taskRunner) {
shutdown = true
if (cancelAllAndDecide()) {
taskRunner.kickCoordinator(this)
}
Expand Down Expand Up @@ -160,7 +214,7 @@ class TaskQueue internal constructor(
synchronized(taskRunner) {
check(activeTask === task)

if (delayNanos != -1L) {
if (delayNanos != -1L && !shutdown) {
scheduleAndDecide(task, delayNanos)
} else if (!futureTasks.contains(task)) {
cancelTasks.remove(task) // We don't need to cancel it because it isn't scheduled.
Expand Down
13 changes: 11 additions & 2 deletions okhttp/src/main/java/okhttp3/internal/concurrent/TaskRunner.kt
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package okhttp3.internal.concurrent

import okhttp3.internal.addIfAbsent
import okhttp3.internal.notify
import okhttp3.internal.objectWaitNanos
import okhttp3.internal.threadFactory
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.SynchronousQueue
Expand Down Expand Up @@ -158,8 +157,18 @@ class TaskRunner(
taskRunner.notify()
}

/**
* Wait a duration in nanoseconds. Unlike [java.lang.Object.wait] this interprets 0 as
* "don't wait" instead of "wait forever".
*/
@Throws(InterruptedException::class)
@Suppress("PLATFORM_CLASS_MAPPED_TO_KOTLIN")
override fun coordinatorWait(taskRunner: TaskRunner, nanos: Long) {
taskRunner.objectWaitNanos(nanos)
val ms = nanos / 1_000_000L
val ns = nanos - (ms * 1_000_000L)
if (ms > 0L || nanos > 0) {
(taskRunner as Object).wait(ms, ns.toInt())
}
}

fun shutdown() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import okhttp3.Response
import okhttp3.Route
import okhttp3.internal.EMPTY_RESPONSE
import okhttp3.internal.closeQuietly
import okhttp3.internal.concurrent.TaskRunner
import okhttp3.internal.http.ExchangeCodec
import okhttp3.internal.http1.Http1ExchangeCodec
import okhttp3.internal.http2.ConnectionShutdownException
Expand Down Expand Up @@ -321,7 +322,7 @@ class RealConnection(
val source = this.source!!
val sink = this.sink!!
socket.soTimeout = 0 // HTTP/2 connection timeouts are set per-stream.
val http2Connection = Http2Connection.Builder(true)
val http2Connection = Http2Connection.Builder(client = true, taskRunner = TaskRunner.INSTANCE)
.socket(socket, route.address.url.host, source, sink)
.listener(this)
.pingIntervalMillis(pingIntervalMillis)
Expand Down
Loading