Skip to content

Commit

Permalink
Use a state machine to implement AsyncTimeout (#1397)
Browse files Browse the repository at this point in the history
Previously we had 3 boolean state variables to track 4 possible
states. That was more complex than necessary.
  • Loading branch information
squarejesse authored Dec 16, 2023
1 parent 45d6834 commit ac9cd96
Show file tree
Hide file tree
Showing 2 changed files with 100 additions and 77 deletions.
6 changes: 0 additions & 6 deletions okio/api/okio.api
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ public final class okio/-InflaterSourceExtensions {
}

public class okio/AsyncTimeout : okio/Timeout {
public static final field Companion Lokio/AsyncTimeout$Companion;
public fun <init> ()V
public final fun access$newTimeoutException (Ljava/io/IOException;)Ljava/io/IOException;
public fun cancel ()V
Expand All @@ -57,11 +56,6 @@ public class okio/AsyncTimeout : okio/Timeout {
public final fun withTimeout (Lkotlin/jvm/functions/Function0;)Ljava/lang/Object;
}

public final class okio/AsyncTimeout$Companion {
public final fun getCondition ()Ljava/util/concurrent/locks/Condition;
public final fun getLock ()Ljava/util/concurrent/locks/ReentrantLock;
}

public final class okio/Buffer : java/lang/Cloneable, java/nio/channels/ByteChannel, okio/BufferedSink, okio/BufferedSource {
public final fun -deprecated_getByte (J)B
public final fun -deprecated_size ()J
Expand Down
171 changes: 100 additions & 71 deletions okio/src/jvmMain/kotlin/okio/AsyncTimeout.kt
Original file line number Diff line number Diff line change
Expand Up @@ -28,61 +28,62 @@ import kotlin.concurrent.withLock
* writing.
*
* Subclasses should override [timedOut] to take action when a timeout occurs. This method will be
* invoked by the shared watchdog thread so it should not do any long-running operations. Otherwise
* invoked by the shared watchdog thread so it should not do any long-running operations. Otherwise,
* we risk starving other timeouts from being triggered.
*
* Use [sink] and [source] to apply this timeout to a stream. The returned value will apply the
* timeout to each operation on the wrapped stream.
*
* Callers should call [enter] before doing work that is subject to timeouts, and [exit] afterwards.
* Callers should call [enter] before doing work that is subject to timeouts, and [exit] afterward.
* The return value of [exit] indicates whether a timeout was triggered. Note that the call to
* [timedOut] is asynchronous, and may be called after [exit].
*/
open class AsyncTimeout : Timeout() {
/** True if this node is currently in the queue. */
private var inQueue = false
private var state = STATE_IDLE

/** The next node in the linked list. */
private var next: AsyncTimeout? = null

/** If scheduled, this is the time that the watchdog should time this out. */
private var timeoutAt = 0L

private var isCanceled = false
private var hadTimeoutWhenCanceled = false

fun enter() {
val timeoutNanos = timeoutNanos()
val hasDeadline = hasDeadline()
if (timeoutNanos == 0L && !hasDeadline) {
return // No timeout and no deadline? Don't bother with the queue.
}
scheduleTimeout(this, timeoutNanos, hasDeadline)

lock.withLock {
check(state == STATE_IDLE) { "Unbalanced enter/exit" }
state = STATE_IN_QUEUE
insertIntoQueue(this, timeoutNanos, hasDeadline)
}
}

/** Returns true if the timeout occurred. */
fun exit(): Boolean {
lock.withLock {
if (isCanceled) {
return hadTimeoutWhenCanceled
.also {
isCanceled = false
hadTimeoutWhenCanceled = false
}
val oldState = this.state
state = STATE_IDLE

if (oldState == STATE_IN_QUEUE) {
removeFromQueue(this)
return false
} else {
return oldState == STATE_TIMED_OUT
}

return cancelScheduledTimeout(this)
}
}

override fun cancel() {
super.cancel()

lock.withLock {
if (isCanceled) return
if (!inQueue) return
isCanceled = true
hadTimeoutWhenCanceled = cancelScheduledTimeout(this)
if (state == STATE_IN_QUEUE) {
removeFromQueue(this)
state = STATE_CANCELED
}
}
}

Expand Down Expand Up @@ -197,16 +198,16 @@ open class AsyncTimeout : Timeout() {
return e
}

private class Watchdog internal constructor() : Thread("Okio Watchdog") {
private class Watchdog : Thread("Okio Watchdog") {
init {
isDaemon = true
}

override fun run() {
while (true) {
try {
var timedOut: AsyncTimeout? = null
AsyncTimeout.lock.withLock {
var timedOut: AsyncTimeout?
lock.withLock {
timedOut = awaitTimeout()

// The queue is completely empty. Let this thread exit and let another watchdog thread
Expand All @@ -225,7 +226,7 @@ open class AsyncTimeout : Timeout() {
}
}

companion object {
private companion object {
val lock: ReentrantLock = ReentrantLock()
val condition: Condition = lock.newCondition()

Expand All @@ -240,6 +241,43 @@ open class AsyncTimeout : Timeout() {
private val IDLE_TIMEOUT_MILLIS = TimeUnit.SECONDS.toMillis(60)
private val IDLE_TIMEOUT_NANOS = TimeUnit.MILLISECONDS.toNanos(IDLE_TIMEOUT_MILLIS)

/*
* .-------------.
* | |
* .------------ exit() ------| CANCELED |
* | | |
* | '-------------'
* | ^
* | | cancel()
* v |
* .-------------. .-------------.
* | |---- enter() ----->| |
* | IDLE | | IN QUEUE |
* | |<---- exit() ------| |
* '-------------' '-------------'
* ^ |
* | | time out
* | v
* | .-------------.
* | | |
* '------------ exit() ------| TIMED OUT |
* | |
* '-------------'
*
* Notes:
* * enter() crashes if called from a state other than IDLE.
* * If there's no timeout (ie. wait forever), then enter() is a no-op. There's no state to
* track entered but not in the queue.
* * exit() is a no-op from IDLE. This is probably too lenient, but it made it simpler for
* early implementations to support cases where enter() as a no-op.
* * cancel() is a no-op from every state but IN QUEUE.
*/

private const val STATE_IDLE = 0
private const val STATE_IN_QUEUE = 1
private const val STATE_TIMED_OUT = 2
private const val STATE_CANCELED = 3

/**
* The watchdog thread processes a linked list of pending timeouts, sorted in the order to be
* triggered. This class synchronizes on AsyncTimeout.class. This lock guards the queue.
Expand All @@ -250,77 +288,67 @@ open class AsyncTimeout : Timeout() {
*/
private var head: AsyncTimeout? = null

private fun scheduleTimeout(node: AsyncTimeout, timeoutNanos: Long, hasDeadline: Boolean) {
AsyncTimeout.lock.withLock {
check(!node.inQueue) { "Unbalanced enter/exit" }
node.inQueue = true

// Start the watchdog thread and create the head node when the first timeout is scheduled.
if (head == null) {
head = AsyncTimeout()
Watchdog().start()
}
private fun insertIntoQueue(node: AsyncTimeout, timeoutNanos: Long, hasDeadline: Boolean) {
// Start the watchdog thread and create the head node when the first timeout is scheduled.
if (head == null) {
head = AsyncTimeout()
Watchdog().start()
}

val now = System.nanoTime()
if (timeoutNanos != 0L && hasDeadline) {
// Compute the earliest event; either timeout or deadline. Because nanoTime can wrap
// around, minOf() is undefined for absolute values, but meaningful for relative ones.
node.timeoutAt = now + minOf(timeoutNanos, node.deadlineNanoTime() - now)
} else if (timeoutNanos != 0L) {
node.timeoutAt = now + timeoutNanos
} else if (hasDeadline) {
node.timeoutAt = node.deadlineNanoTime()
} else {
throw AssertionError()
}
val now = System.nanoTime()
if (timeoutNanos != 0L && hasDeadline) {
// Compute the earliest event; either timeout or deadline. Because nanoTime can wrap
// around, minOf() is undefined for absolute values, but meaningful for relative ones.
node.timeoutAt = now + minOf(timeoutNanos, node.deadlineNanoTime() - now)
} else if (timeoutNanos != 0L) {
node.timeoutAt = now + timeoutNanos
} else if (hasDeadline) {
node.timeoutAt = node.deadlineNanoTime()
} else {
throw AssertionError()
}

// Insert the node in sorted order.
val remainingNanos = node.remainingNanos(now)
var prev = head!!
while (true) {
if (prev.next == null || remainingNanos < prev.next!!.remainingNanos(now)) {
node.next = prev.next
prev.next = node
if (prev === head) {
// Wake up the watchdog when inserting at the front.
condition.signal()
}
break
// Insert the node in sorted order.
val remainingNanos = node.remainingNanos(now)
var prev = head!!
while (true) {
if (prev.next == null || remainingNanos < prev.next!!.remainingNanos(now)) {
node.next = prev.next
prev.next = node
if (prev === head) {
// Wake up the watchdog when inserting at the front.
condition.signal()
}
prev = prev.next!!
break
}
prev = prev.next!!
}
}

/** Returns true if the timeout occurred. */
private fun cancelScheduledTimeout(node: AsyncTimeout): Boolean {
if (!node.inQueue) return false
node.inQueue = false

// Remove the node from the linked list.
private fun removeFromQueue(node: AsyncTimeout) {
var prev = head
while (prev != null) {
if (prev.next === node) {
prev.next = node.next
node.next = null
return false
return
}
prev = prev.next
}

// The node wasn't found in the linked list: it must have timed out!
return true
error("node was not found in the queue")
}

/**
* Removes and returns the node at the head of the list, waiting for it to time out if
* necessary. This returns [head] if there was no node at the head of the list when starting,
* and there continues to be no node after waiting [IDLE_TIMEOUT_NANOS]. It returns null if a
* new node was inserted while waiting. Otherwise this returns the node being waited on that has
* been removed.
* new node was inserted while waiting. Otherwise, this returns the node being waited on that
* has been removed.
*/
@Throws(InterruptedException::class)
internal fun awaitTimeout(): AsyncTimeout? {
fun awaitTimeout(): AsyncTimeout? {
// Get the next eligible node.
val node = head!!.next

Expand All @@ -335,7 +363,7 @@ open class AsyncTimeout : Timeout() {
}
}

var waitNanos = node.remainingNanos(System.nanoTime())
val waitNanos = node.remainingNanos(System.nanoTime())

// The head of the queue hasn't timed out yet. Await that.
if (waitNanos > 0) {
Expand All @@ -346,6 +374,7 @@ open class AsyncTimeout : Timeout() {
// The head of the queue has timed out. Remove it.
head!!.next = node.next
node.next = null
node.state = STATE_TIMED_OUT
return node
}
}
Expand Down

0 comments on commit ac9cd96

Please sign in to comment.