Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change TaskRunner to limit context switches. #5532

Merged
merged 1 commit into from
Oct 6, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -969,6 +969,9 @@ class MockWebServer : ExternalResource(), Closeable {
val request = readRequest(stream)
atomicRequestCount.incrementAndGet()
requestQueue.add(request)
if (request.failure != null) {
return // Nothing to respond to.
}

val response: MockResponse = dispatcher.dispatch(request)

Expand Down
16 changes: 0 additions & 16 deletions okhttp/src/main/java/okhttp3/internal/concurrent/Task.kt
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,6 @@ abstract class Task(
/** Undefined unless this is in [TaskQueue.futureTasks]. */
internal var nextExecuteNanoTime = -1L

internal var runRunnable: Runnable? = null

/** Returns the delay in nanoseconds until the next execution, or -1L to not reschedule. */
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Negative values < -1 seem to be effectively a priority, is that ok? Or should be an error?

Not a big concern given this is internal.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Intended contract is for implementations to never return such values! If ever we wanted to make this more broadly usable we’d need to enforce that requirement.

abstract fun runOnce(): Long

Expand All @@ -66,19 +64,5 @@ abstract class Task(

check(this.queue === null) { "task is in multiple queues" }
this.queue = queue

this.runRunnable = Runnable {
val currentThread = Thread.currentThread()
val oldName = currentThread.name
currentThread.name = name

var delayNanos = -1L
try {
delayNanos = runOnce()
} finally {
queue.runCompleted(this, delayNanos)
currentThread.name = oldName
}
}
}
}
67 changes: 11 additions & 56 deletions okhttp/src/main/java/okhttp3/internal/concurrent/TaskQueue.kt
Original file line number Diff line number Diff line change
Expand Up @@ -28,22 +28,16 @@ import java.util.concurrent.TimeUnit
class TaskQueue internal constructor(
private val taskRunner: TaskRunner
) {
private var shutdown = false
internal var shutdown = false

/** This queue's currently-executing task, or null if none is currently executing. */
private var activeTask: Task? = null

/** True if the [activeTask] should not recur when it completes. */
private var cancelActiveTask = false
internal var activeTask: Task? = null

/** Scheduled tasks ordered by [Task.nextExecuteNanoTime]. */
private val futureTasks = mutableListOf<Task>()

internal fun isActive(): Boolean {
check(Thread.holdsLock(taskRunner))
internal val futureTasks = mutableListOf<Task>()

return activeTask != null || futureTasks.isNotEmpty()
}
/** True if the [activeTask] should be canceled when it completes. */
internal var cancelActiveTask = false

/**
* Returns a snapshot of tasks currently scheduled for execution. Does not include the
Expand Down Expand Up @@ -87,7 +81,7 @@ class TaskQueue internal constructor(
fun awaitIdle(delayNanos: Long): Boolean {
val latch = CountDownLatch(1)

val task = object : Task("awaitIdle") {
val task = object : Task("awaitIdle", cancelable = false) {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this was the problem with flaky tests!

override fun runOnce(): Long {
latch.countDown()
return -1L
Expand All @@ -104,8 +98,8 @@ class TaskQueue internal constructor(
return latch.await(delayNanos, TimeUnit.NANOSECONDS)
}

/** Adds [task] to run in [delayNanos]. Returns true if the coordinator should run. */
private fun scheduleAndDecide(task: Task, delayNanos: Long): Boolean {
/** Adds [task] to run in [delayNanos]. Returns true if the coordinator is impacted. */
internal fun scheduleAndDecide(task: Task, delayNanos: Long): Boolean {
task.initQueue(this)

val now = taskRunner.backend.nanoTime()
Expand All @@ -124,7 +118,7 @@ class TaskQueue internal constructor(
if (insertAt == -1) insertAt = futureTasks.size
futureTasks.add(insertAt, task)

// Run the coordinator if we inserted at the front.
// Impact the coordinator if we inserted at the front.
return insertAt == 0
}

Expand Down Expand Up @@ -154,8 +148,8 @@ class TaskQueue internal constructor(
}
}

/** Returns true if the coordinator should run. */
private fun cancelAllAndDecide(): Boolean {
/** Returns true if the coordinator is impacted. */
internal fun cancelAllAndDecide(): Boolean {
if (activeTask != null && activeTask!!.cancelable) {
cancelActiveTask = true
}
Expand All @@ -169,43 +163,4 @@ class TaskQueue internal constructor(
}
return tasksCanceled
}

/**
* Posts the next available task to an executor for immediate execution.
*
* Returns the delay until the next call to this method, -1L for no further calls, or
* [Long.MAX_VALUE] to wait indefinitely.
*/
internal fun executeReadyTask(now: Long): Long {
check(Thread.holdsLock(taskRunner))

if (activeTask != null) return Long.MAX_VALUE // This queue is busy.

// Check if a task is immediately ready.
val runTask = futureTasks.firstOrNull() ?: return -1L
val delayNanos = runTask.nextExecuteNanoTime - now
if (delayNanos <= 0) {
activeTask = runTask
futureTasks.removeAt(0)
taskRunner.backend.executeTask(runTask.runRunnable!!)
return Long.MAX_VALUE // This queue is busy until the run completes.
}

// Wait until the next task is ready.
return delayNanos
}

internal fun runCompleted(task: Task, delayNanos: Long) {
synchronized(taskRunner) {
check(activeTask === task)

if (delayNanos != -1L && !cancelActiveTask && !shutdown) {
scheduleAndDecide(task, delayNanos)
}

activeTask = null
cancelActiveTask = false
taskRunner.kickCoordinator(this)
}
}
}
Loading