Skip to content

Commit

Permalink
Be more consistent with task names
Browse files Browse the repository at this point in the history
Examples:
  - OkHttp TaskRunner
  - MockWebServer TaskRunner
  - OkHttp ConnectionPool
  - MockWebServer localhost applyAndAckSettings
  - OkHttp android.com applyAndAckSettings
  - OkHttp android.com onSettings
  - OkHttp awaitIdle
  - OkHttp localhost

I'm trying to use type names where appropriate, or method names otherwise.
Names include hostname and stream name if the task is working on behalf
of a specific stream or connection.
  • Loading branch information
squarejesse committed Oct 6, 2019
1 parent afd9db3 commit 38d1d04
Show file tree
Hide file tree
Showing 12 changed files with 244 additions and 297 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,9 @@ import javax.net.ssl.X509TrustManager
* in sequence.
*/
class MockWebServer : ExternalResource(), Closeable {
private val taskRunner = TaskRunner()
private val taskRunnerBackend = TaskRunner.RealBackend(
threadFactory("MockWebServer TaskRunner", true))
private val taskRunner = TaskRunner(taskRunnerBackend)
private val requestQueue = LinkedBlockingQueue<RecordedRequest>()
private val openClientSockets =
Collections.newSetFromMap(ConcurrentHashMap<Socket, Boolean>())
Expand Down Expand Up @@ -463,6 +465,7 @@ class MockWebServer : ExternalResource(), Closeable {
throw IOException("Gave up waiting for queue to shut down")
}
}
taskRunnerBackend.shutdown()
}

@Synchronized override fun after() {
Expand Down
2 changes: 1 addition & 1 deletion okhttp/src/main/java/okhttp3/internal/Util.kt
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,7 @@ fun Source.discard(timeout: Int, timeUnit: TimeUnit): Boolean = try {
false
}

fun Socket.connectionName(): String {
fun Socket.peerName(): String {
val address = remoteSocketAddress
return if (address is InetSocketAddress) address.hostName else address.toString()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ class DiskLruCache internal constructor(
private var nextSequenceNumber: Long = 0

private val cleanupQueue = taskRunner.newQueue()
private val cleanupTask = object : Task("OkHttp DiskLruCache", cancelable = false) {
private val cleanupTask = object : Task("OkHttp Cache", cancelable = false) {
override fun runOnce(): Long {
synchronized(this@DiskLruCache) {
if (!initialized || closed) {
Expand Down
4 changes: 3 additions & 1 deletion okhttp/src/main/java/okhttp3/internal/concurrent/Task.kt
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ package okhttp3.internal.concurrent
*/
abstract class Task(
val name: String,
val cancelable: Boolean = true
val cancelable: Boolean
) {
// Guarded by the TaskRunner.
internal var queue: TaskQueue? = null
Expand All @@ -65,4 +65,6 @@ abstract class Task(
check(this.queue === null) { "task is in multiple queues" }
this.queue = queue
}

override fun toString() = name
}
44 changes: 43 additions & 1 deletion okhttp/src/main/java/okhttp3/internal/concurrent/TaskQueue.kt
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,18 @@ class TaskQueue internal constructor(
}
}

/** Overload of [schedule] that uses a lambda for a repeating task. */
fun schedule(
name: String,
delayNanos: Long = 0L,
cancelable: Boolean = true,
block: () -> Long
) {
schedule(object : Task(name, cancelable) {
override fun runOnce() = block()
}, delayNanos)
}

/** Like [schedule], but this silently discard the task if the queue is shut down. */
fun trySchedule(task: Task, delayNanos: Long = 0L) {
synchronized(taskRunner) {
Expand All @@ -77,11 +89,41 @@ class TaskQueue internal constructor(
}
}

/** Executes [block] once on a task runner thread. */
fun execute(
name: String,
delayNanos: Long = 0L,
cancelable: Boolean = true,
block: () -> Unit
) {
schedule(object : Task(name, cancelable) {
override fun runOnce(): Long {
block()
return -1L
}
}, delayNanos)
}

/** Like [execute], but this silently discard the task if the queue is shut down. */
fun tryExecute(
name: String,
delayNanos: Long = 0L,
cancelable: Boolean = true,
block: () -> Unit
) {
trySchedule(object : Task(name, cancelable) {
override fun runOnce(): Long {
block()
return -1L
}
}, delayNanos)
}

/** Returns true if this queue became idle before the timeout elapsed. */
fun awaitIdle(delayNanos: Long): Boolean {
val latch = CountDownLatch(1)

val task = object : Task("awaitIdle", cancelable = false) {
val task = object : Task("OkHttp awaitIdle", cancelable = false) {
override fun runOnce(): Long {
latch.countDown()
return -1L
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import okhttp3.internal.addIfAbsent
import okhttp3.internal.notify
import okhttp3.internal.threadFactory
import java.util.concurrent.SynchronousQueue
import java.util.concurrent.ThreadFactory
import java.util.concurrent.ThreadPoolExecutor
import java.util.concurrent.TimeUnit

Expand All @@ -35,7 +36,7 @@ import java.util.concurrent.TimeUnit
* Most applications should share a process-wide [TaskRunner] and use queues for per-client work.
*/
class TaskRunner(
val backend: Backend = RealBackend()
val backend: Backend
) {
private var coordinatorWaiting = false
private var coordinatorWakeUpAt = 0L
Expand Down Expand Up @@ -243,13 +244,13 @@ class TaskRunner(
fun execute(runnable: Runnable)
}

internal class RealBackend : Backend {
class RealBackend(threadFactory: ThreadFactory) : Backend {
private val executor = ThreadPoolExecutor(
0, // corePoolSize.
Int.MAX_VALUE, // maximumPoolSize.
60L, TimeUnit.SECONDS, // keepAliveTime.
SynchronousQueue(),
threadFactory("OkHttp Task", true)
threadFactory
)

override fun beforeTask(taskRunner: TaskRunner) {
Expand Down Expand Up @@ -286,6 +287,6 @@ class TaskRunner(

companion object {
@JvmField
val INSTANCE = TaskRunner(RealBackend())
val INSTANCE = TaskRunner(RealBackend(threadFactory("OkHttp TaskRunner", true)))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ class RealConnectionPool(
private val keepAliveDurationNs: Long = timeUnit.toNanos(keepAliveDuration)

private val cleanupQueue: TaskQueue = taskRunner.newQueue()
private val cleanupTask = object : Task("OkHttp ConnectionPool") {
private val cleanupTask = object : Task("OkHttp ConnectionPool", cancelable = true) {
override fun runOnce() = cleanup(System.nanoTime())
}

Expand Down
Loading

0 comments on commit 38d1d04

Please sign in to comment.