Skip to content

Commit

Permalink
Implement TaskRunner.
Browse files Browse the repository at this point in the history
This is attempting to balance simplicity, efficiency, and testability.
It doesn't use ScheduledExecutorService because that class wants a
permanent scheduler thread. Instead this uses a coordinator thread
that does its own wait and notify, similar to the mechanism in the
ConnectionPool that this is intended to replace.
  • Loading branch information
squarejesse committed Sep 22, 2019
1 parent e5f2eb3 commit 6beb688
Show file tree
Hide file tree
Showing 5 changed files with 827 additions and 15 deletions.
21 changes: 13 additions & 8 deletions okhttp/src/main/java/okhttp3/internal/Util.kt
Original file line number Diff line number Diff line change
Expand Up @@ -516,24 +516,25 @@ fun Int.toHexString(): String = Integer.toHexString(this)
*/
@Throws(InterruptedException::class)
fun Any.lockAndWaitNanos(nanos: Long) {
val ms = nanos / 1_000_000L
val ns = nanos - (ms * 1_000_000L)
synchronized(this) {
waitMillis(ms, ns.toInt())
objectWaitNanos(nanos)
}
}

@Suppress("PLATFORM_CLASS_MAPPED_TO_KOTLIN", "NOTHING_TO_INLINE")
inline fun Any.wait() = (this as Object).wait()

/**
* Lock and wait a duration in milliseconds and nanos.
* Unlike [java.lang.Object.wait] this interprets 0 as "don't wait" instead of "wait forever".
* Wait a duration in nanoseconds. Unlike [java.lang.Object.wait] this interprets 0 as "don't wait"
* instead of "wait forever".
*/
@Throws(InterruptedException::class)
@Suppress("PLATFORM_CLASS_MAPPED_TO_KOTLIN")
fun Any.waitMillis(timeout: Long, nanos: Int = 0) {
if (timeout > 0L || nanos > 0) {
(this as Object).wait(timeout, nanos)
fun Any.objectWaitNanos(nanos: Long) {
val ms = nanos / 1_000_000L
val ns = nanos - (ms * 1_000_000L)
if (ms > 0L || nanos > 0) {
(this as Object).wait(ms, ns.toInt())
}
}

Expand Down Expand Up @@ -566,3 +567,7 @@ fun <T> readFieldOrNull(instance: Any, fieldType: Class<T>, fieldName: String):

return null
}

internal fun <E> MutableList<E>.addIfAbsent(element: E) {
if (!contains(element)) add(element)
}
34 changes: 34 additions & 0 deletions okhttp/src/main/java/okhttp3/internal/concurrent/Task.kt
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,43 @@ abstract class Task(
val name: String,
val daemon: Boolean = true
) {
// Guarded by the TaskRunner.
internal var queue: TaskQueue? = null

/** Undefined unless this is in [TaskQueue.futureTasks]. */
internal var nextExecuteNanoTime = -1L

internal var runRunnable: Runnable? = null
internal var cancelRunnable: Runnable? = null

/** Returns the delay in nanoseconds until the next execution, or -1L to not reschedule. */
abstract fun runOnce(): Long

/** Return true to skip the scheduled execution. */
open fun tryCancel(): Boolean = false

internal fun initQueue(queue: TaskQueue) {
if (this.queue == queue) return

check(this.queue == null) { "task is in multiple queues" }
this.queue = queue

this.runRunnable = Runnable {
var delayNanos = -1L
try {
delayNanos = runOnce()
} finally {
queue.runCompleted(this, delayNanos)
}
}

this.cancelRunnable = Runnable {
var skipExecution = false
try {
skipExecution = tryCancel()
} finally {
queue.tryCancelCompleted(this, skipExecution)
}
}
}
}
145 changes: 141 additions & 4 deletions okhttp/src/main/java/okhttp3/internal/concurrent/TaskQueue.kt
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,44 @@
*/
package okhttp3.internal.concurrent

import okhttp3.internal.addIfAbsent

/**
* A set of tasks that are executed in sequential order.
*
* Work within queues is not concurrent. This is equivalent to each queue having a dedicated thread
* for its work; in practice a set of queues may share a set of threads to save resources.
*/
interface TaskQueue {
class TaskQueue internal constructor(
private val taskRunner: TaskRunner,

/**
* An application-level object like a connection pool or HTTP call that this queue works on behalf
* of. This is intended to be useful for testing and debugging only.
*/
val owner: Any
) {
/** This queue's currently-executing task, or null if none is currently executing. */
private var activeTask: Task? = null

/** Scheduled tasks ordered by [Task.nextExecuteNanoTime]. */
private val futureTasks = mutableListOf<Task>()

/** Tasks to cancel. Always either [activeTask] or a member of [futureTasks]. */
private val cancelTasks = mutableListOf<Task>()

/** Returns a snapshot of tasks currently scheduled for execution. */
internal fun isActive(): Boolean {
check(Thread.holdsLock(taskRunner))

return activeTask != null || futureTasks.isNotEmpty()
}

/**
* Returns a snapshot of tasks currently scheduled for execution. Does not include the
* currently-executing task unless it is also scheduled for future execution.
*/
val scheduledTasks: List<Task>
get() = synchronized(taskRunner) { futureTasks.toList() }

/**
* Schedules [task] for execution in [delayNanos]. A task may only have one future execution
Expand All @@ -39,12 +62,126 @@ interface TaskQueue {
* is running when that time is reached, that task is allowed to complete before this task is
* started. Similarly the task will be delayed if the host lacks compute resources.
*/
fun schedule(task: Task, delayNanos: Long = 0L)
fun schedule(task: Task, delayNanos: Long) {
task.initQueue(this)

synchronized(taskRunner) {
if (scheduleAndDecide(task, delayNanos)) {
taskRunner.kickCoordinator(this)
}
}
}

/** Adds [task] to run in [delayNanos]. Returns true if the coordinator should run. */
private fun scheduleAndDecide(task: Task, delayNanos: Long): Boolean {
val now = taskRunner.backend.nanoTime()
val executeNanoTime = now + delayNanos

// If the task is already scheduled, take the earlier of the two times.
val existingIndex = futureTasks.indexOf(task)
if (existingIndex != -1) {
if (task.nextExecuteNanoTime <= executeNanoTime) return false // Already scheduled earlier.
futureTasks.removeAt(existingIndex) // Already scheduled later: reschedule below!
}
task.nextExecuteNanoTime = executeNanoTime

// Insert in chronological order.
var insertAt = futureTasks.indexOfFirst { it.nextExecuteNanoTime - now > delayNanos }
if (insertAt == -1) insertAt = futureTasks.size
futureTasks.add(insertAt, task)

// Run the coordinator if we inserted at the front.
return insertAt == 0
}

/**
* Schedules immediate execution of [Task.tryCancel] on all currently-enqueued tasks. These calls
* will not be made until any currently-executing task has completed. Tasks that return true will
* be removed from the execution schedule.
*/
fun cancelAll()
fun cancelAll() {
synchronized(taskRunner) {
if (cancelAllAndDecide()) {
taskRunner.kickCoordinator(this)
}
}
}

/** Returns true if the coordinator should run. */
private fun cancelAllAndDecide(): Boolean {
val runningTask = activeTask
if (runningTask != null) {
cancelTasks.addIfAbsent(runningTask)
}

for (task in futureTasks) {
cancelTasks.addIfAbsent(task)
}

// Run the coordinator if tasks were canceled.
return cancelTasks.isNotEmpty()
}

/**
* Posts the next available task to an executor for immediate execution.
*
* Returns the delay until the next call to this method, -1L for no further calls, or
* [Long.MAX_VALUE] to wait indefinitely.
*/
internal fun executeReadyTask(now: Long): Long {
check(Thread.holdsLock(taskRunner))

if (activeTask != null) return Long.MAX_VALUE // This queue is busy.

// Find a task to cancel.
val cancelTask = cancelTasks.firstOrNull()
if (cancelTask != null) {
activeTask = cancelTask
cancelTasks.removeAt(0)
taskRunner.backend.executeTask(cancelTask.cancelRunnable!!)
return Long.MAX_VALUE // This queue is busy until the cancel completes.
}

// Check if a task is immediately ready.
val runTask = futureTasks.firstOrNull() ?: return -1L
val delayNanos = runTask.nextExecuteNanoTime - now
if (delayNanos <= 0) {
activeTask = runTask
futureTasks.removeAt(0)
taskRunner.backend.executeTask(runTask.runRunnable!!)
return Long.MAX_VALUE // This queue is busy until the run completes.
}

// Wait until the next task is ready.
return delayNanos
}

internal fun runCompleted(task: Task, delayNanos: Long) {
synchronized(taskRunner) {
check(activeTask == task)

if (delayNanos != -1L) {
scheduleAndDecide(task, delayNanos)
} else if (!futureTasks.contains(task)) {
cancelTasks.remove(task) // We don't need to cancel it because it isn't scheduled.
}

activeTask = null
taskRunner.kickCoordinator(this)
}
}

internal fun tryCancelCompleted(task: Task, skipExecution: Boolean) {
synchronized(taskRunner) {
check(activeTask == task)

if (skipExecution) {
futureTasks.remove(task)
cancelTasks.remove(task)
}

activeTask = null
taskRunner.kickCoordinator(this)
}
}
}
Loading

0 comments on commit 6beb688

Please sign in to comment.