Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement TaskRunner. #5485

Merged
merged 1 commit into from
Sep 23, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 13 additions & 8 deletions okhttp/src/main/java/okhttp3/internal/Util.kt
Original file line number Diff line number Diff line change
Expand Up @@ -516,24 +516,25 @@ fun Int.toHexString(): String = Integer.toHexString(this)
*/
@Throws(InterruptedException::class)
fun Any.lockAndWaitNanos(nanos: Long) {
val ms = nanos / 1_000_000L
val ns = nanos - (ms * 1_000_000L)
synchronized(this) {
waitMillis(ms, ns.toInt())
objectWaitNanos(nanos)
}
}

@Suppress("PLATFORM_CLASS_MAPPED_TO_KOTLIN", "NOTHING_TO_INLINE")
inline fun Any.wait() = (this as Object).wait()

/**
* Lock and wait a duration in milliseconds and nanos.
* Unlike [java.lang.Object.wait] this interprets 0 as "don't wait" instead of "wait forever".
* Wait a duration in nanoseconds. Unlike [java.lang.Object.wait] this interprets 0 as "don't wait"
* instead of "wait forever".
*/
@Throws(InterruptedException::class)
@Suppress("PLATFORM_CLASS_MAPPED_TO_KOTLIN")
fun Any.waitMillis(timeout: Long, nanos: Int = 0) {
if (timeout > 0L || nanos > 0) {
(this as Object).wait(timeout, nanos)
fun Any.objectWaitNanos(nanos: Long) {
val ms = nanos / 1_000_000L
val ns = nanos - (ms * 1_000_000L)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
val ns = nanos - (ms * 1_000_000L)
val ns = nanos % 1_000_000L

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm assuming subtract and multiply is faster than mod. Good assumption to validate.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

https://www.agner.org/optimize/instruction_tables.pdf

For an i7, multiply + subtract takes 4 operations and causes a delay of 4. Division takes 60 with a delay of 40-100.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To me this is the kind of things I would expect the compiler to do for me. Does it happen sometimes but not always? do we just wanna guarantee that?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Really tough for the compiler to anticipate this case.

if (ms > 0L || nanos > 0) {
(this as Object).wait(ms, ns.toInt())
}
}

Expand Down Expand Up @@ -566,3 +567,7 @@ fun <T> readFieldOrNull(instance: Any, fieldType: Class<T>, fieldName: String):

return null
}

internal fun <E> MutableList<E>.addIfAbsent(element: E) {
if (!contains(element)) add(element)
}
34 changes: 34 additions & 0 deletions okhttp/src/main/java/okhttp3/internal/concurrent/Task.kt
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,43 @@ abstract class Task(
val name: String,
val daemon: Boolean = true
) {
// Guarded by the TaskRunner.
internal var queue: TaskQueue? = null

/** Undefined unless this is in [TaskQueue.futureTasks]. */
internal var nextExecuteNanoTime = -1L

internal var runRunnable: Runnable? = null
internal var cancelRunnable: Runnable? = null

/** Returns the delay in nanoseconds until the next execution, or -1L to not reschedule. */
abstract fun runOnce(): Long

/** Return true to skip the scheduled execution. */
open fun tryCancel(): Boolean = false

internal fun initQueue(queue: TaskQueue) {
if (this.queue === queue) return

check(this.queue === null) { "task is in multiple queues" }
this.queue = queue

this.runRunnable = Runnable {
var delayNanos = -1L
try {
delayNanos = runOnce()
} finally {
queue.runCompleted(this, delayNanos)
}
}

this.cancelRunnable = Runnable {
var skipExecution = false
try {
skipExecution = tryCancel()
} finally {
queue.tryCancelCompleted(this, skipExecution)
}
}
}
}
145 changes: 141 additions & 4 deletions okhttp/src/main/java/okhttp3/internal/concurrent/TaskQueue.kt
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,44 @@
*/
package okhttp3.internal.concurrent

import okhttp3.internal.addIfAbsent

/**
* A set of tasks that are executed in sequential order.
*
* Work within queues is not concurrent. This is equivalent to each queue having a dedicated thread
* for its work; in practice a set of queues may share a set of threads to save resources.
*/
interface TaskQueue {
class TaskQueue internal constructor(
private val taskRunner: TaskRunner,

/**
* An application-level object like a connection pool or HTTP call that this queue works on behalf
* of. This is intended to be useful for testing and debugging only.
*/
val owner: Any
) {
/** This queue's currently-executing task, or null if none is currently executing. */
private var activeTask: Task? = null

/** Scheduled tasks ordered by [Task.nextExecuteNanoTime]. */
private val futureTasks = mutableListOf<Task>()

/** Tasks to cancel. Always either [activeTask] or a member of [futureTasks]. */
private val cancelTasks = mutableListOf<Task>()

/** Returns a snapshot of tasks currently scheduled for execution. */
internal fun isActive(): Boolean {
check(Thread.holdsLock(taskRunner))

return activeTask != null || futureTasks.isNotEmpty()
}

/**
* Returns a snapshot of tasks currently scheduled for execution. Does not include the
* currently-executing task unless it is also scheduled for future execution.
*/
val scheduledTasks: List<Task>
get() = synchronized(taskRunner) { futureTasks.toList() }

/**
* Schedules [task] for execution in [delayNanos]. A task may only have one future execution
Expand All @@ -39,12 +62,126 @@ interface TaskQueue {
* is running when that time is reached, that task is allowed to complete before this task is
* started. Similarly the task will be delayed if the host lacks compute resources.
*/
fun schedule(task: Task, delayNanos: Long = 0L)
fun schedule(task: Task, delayNanos: Long) {
task.initQueue(this)

synchronized(taskRunner) {
if (scheduleAndDecide(task, delayNanos)) {
taskRunner.kickCoordinator(this)
}
}
}

/** Adds [task] to run in [delayNanos]. Returns true if the coordinator should run. */
private fun scheduleAndDecide(task: Task, delayNanos: Long): Boolean {
val now = taskRunner.backend.nanoTime()
val executeNanoTime = now + delayNanos

// If the task is already scheduled, take the earlier of the two times.
val existingIndex = futureTasks.indexOf(task)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you trusted task.nextExecuteTime then presumably you could simplify this by using a java PriorityQueue?

null to x = insert
x to <x = remove and insert
x to >x = don't change nextExecuteTime or modify queue

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great idea. Will do.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

... upon actually trying it, it’s unsatisfying because of the above now thing. I’m going to leave it as lists for now, unsatisfying as that is.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for humoring me.

if (existingIndex != -1) {
if (task.nextExecuteNanoTime <= executeNanoTime) return false // Already scheduled earlier.
futureTasks.removeAt(existingIndex) // Already scheduled later: reschedule below!
}
task.nextExecuteNanoTime = executeNanoTime

// Insert in chronological order. Always compare deltas because nanoTime() is permitted to wrap.
var insertAt = futureTasks.indexOfFirst { it.nextExecuteNanoTime - now > delayNanos }
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: simpler to compare to task.nextExecuteNanoTime? without backing out now?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unfortunately System.nanoTime() is documented to permit wrapping around.

For example, now could be Long.MAX_VALUE - 100, the existing task could be Long.MAX_VALUE and the task-being scheduled could be Long.MAX_VALUE + 100 which is negative. We want to subtract off now to compare 100 < 200 instead of comparing the values-as-is which yields a different result.

I’ll add a comment!

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Watch out for

"Differences in successive calls that span greater than approximately 292 years (263 nanoseconds) will not correctly compute elapsed time due to numerical overflow. "

:)

Copy link
Collaborator Author

@swankjesse swankjesse Sep 23, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I’m not too worried about scheduling things 292 years into the future. I am worried about what the nanoTime initial value is. Is it 0? Long.MIN_VALUE?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, wasn't arguing against your valid justification. Just going back to basics and re-reading nanoTime, where I found that gem of a warning.

if (insertAt == -1) insertAt = futureTasks.size
futureTasks.add(insertAt, task)

// Run the coordinator if we inserted at the front.
return insertAt == 0
}

/**
* Schedules immediate execution of [Task.tryCancel] on all currently-enqueued tasks. These calls
* will not be made until any currently-executing task has completed. Tasks that return true will
* be removed from the execution schedule.
*/
fun cancelAll()
fun cancelAll() {
synchronized(taskRunner) {
if (cancelAllAndDecide()) {
taskRunner.kickCoordinator(this)
}
}
}

/** Returns true if the coordinator should run. */
private fun cancelAllAndDecide(): Boolean {
val runningTask = activeTask
if (runningTask != null) {
cancelTasks.addIfAbsent(runningTask)
}

for (task in futureTasks) {
cancelTasks.addIfAbsent(task)
}

// Run the coordinator if tasks were canceled.
return cancelTasks.isNotEmpty()
}

/**
* Posts the next available task to an executor for immediate execution.
*
* Returns the delay until the next call to this method, -1L for no further calls, or
* [Long.MAX_VALUE] to wait indefinitely.
*/
internal fun executeReadyTask(now: Long): Long {
check(Thread.holdsLock(taskRunner))

if (activeTask != null) return Long.MAX_VALUE // This queue is busy.

// Find a task to cancel.
val cancelTask = cancelTasks.firstOrNull()
if (cancelTask != null) {
activeTask = cancelTask
cancelTasks.removeAt(0)
taskRunner.backend.executeTask(cancelTask.cancelRunnable!!)
return Long.MAX_VALUE // This queue is busy until the cancel completes.
}

// Check if a task is immediately ready.
val runTask = futureTasks.firstOrNull() ?: return -1L
val delayNanos = runTask.nextExecuteNanoTime - now
if (delayNanos <= 0) {
activeTask = runTask
futureTasks.removeAt(0)
taskRunner.backend.executeTask(runTask.runRunnable!!)
return Long.MAX_VALUE // This queue is busy until the run completes.
}

// Wait until the next task is ready.
return delayNanos
}

internal fun runCompleted(task: Task, delayNanos: Long) {
synchronized(taskRunner) {
check(activeTask === task)

if (delayNanos != -1L) {
scheduleAndDecide(task, delayNanos)
} else if (!futureTasks.contains(task)) {
cancelTasks.remove(task) // We don't need to cancel it because it isn't scheduled.
}

activeTask = null
taskRunner.kickCoordinator(this)
}
}

internal fun tryCancelCompleted(task: Task, skipExecution: Boolean) {
synchronized(taskRunner) {
check(activeTask === task)

if (skipExecution) {
futureTasks.remove(task)
cancelTasks.remove(task)
}

activeTask = null
taskRunner.kickCoordinator(this)
}
}
}
Loading