Implement TaskRunner. #5485
Conversation
1412604 to 6beb688
/** Returns the delay in nanoseconds until the next execution, or -1L to not reschedule. */
abstract fun runOnce(): Long

/** Return true to skip the scheduled execution. */
open fun tryCancel(): Boolean = false

internal fun initQueue(queue: TaskQueue) {
  if (this.queue == queue) return
Should this be ===?
Yes! Good catch.
(this as Object).wait(timeout, nanos)
fun Any.objectWaitNanos(nanos: Long) {
  val ms = nanos / 1_000_000L
  val ns = nanos - (ms * 1_000_000L)
Suggested change:
- val ns = nanos - (ms * 1_000_000L)
+ val ns = nanos % 1_000_000L
I'm assuming subtract and multiply are faster than mod. Good assumption to validate.
https://www.agner.org/optimize/instruction_tables.pdf
For an i7, multiply + subtract takes 4 operations and causes a delay of 4. Division takes 60 with a delay of 40-100.
To me this is the kind of thing I would expect the compiler to do for me. Does it happen sometimes but not always? Do we just wanna guarantee that?
Really tough for the compiler to anticipate this case.
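The equivalence the thread is relying on is easy to sanity-check. A minimal standalone sketch (not OkHttp's code; `splitNanos` is a hypothetical helper name) showing that the subtract form and the modulo form split a nanosecond duration identically:

```kotlin
// The subtract form reuses the already-computed quotient 'ms', avoiding a
// second division; the result is identical to the modulo form.
fun splitNanos(nanos: Long): Pair<Long, Int> {
  val ms = nanos / 1_000_000L          // whole milliseconds
  val ns = nanos - (ms * 1_000_000L)   // remainder; same as nanos % 1_000_000L
  return Pair(ms, ns.toInt())
}

fun main() {
  for (n in longArrayOf(0L, 999_999L, 1_000_000L, 1_234_567_890L)) {
    val (ms, ns) = splitNanos(n)
    check(ms * 1_000_000L + ns == n)           // lossless split
    check(ns.toLong() == n % 1_000_000L)       // agrees with the mod form
  }
  println("ok")
}
```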
task.nextExecuteNanoTime = executeNanoTime

// Insert in chronological order.
var insertAt = futureTasks.indexOfFirst { it.nextExecuteNanoTime - now > delayNanos }
nit: simpler to compare to task.nextExecuteNanoTime? without backing out now?
Unfortunately System.nanoTime() is documented to permit wrapping around. For example, now could be Long.MAX_VALUE - 100, the existing task could be Long.MAX_VALUE, and the task being scheduled could be Long.MAX_VALUE + 100, which is negative. We want to subtract off now to compare 100 < 200 instead of comparing the values as-is, which yields a different result.
I'll add a comment!
Watch out for
"Differences in successive calls that span greater than approximately 292 years (2^63 nanoseconds) will not correctly compute elapsed time due to numerical overflow."
:)
I’m not too worried about scheduling things 292 years into the future. I am worried about what the nanoTime initial value is. Is it 0? Long.MIN_VALUE?
Yep, wasn't arguing against your valid justification. Just going back to basics and re-reading nanoTime, where I found that gem of a warning.
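The wraparound argument above can be demonstrated concretely. A standalone sketch (illustrative values, not OkHttp's code) showing that absolute comparison of nanoTime-style values misorders tasks near Long.MAX_VALUE, while comparing differences from `now` does not, because two's-complement subtraction wraps consistently:

```kotlin
fun main() {
  val now = Long.MAX_VALUE - 100L
  val existing = Long.MAX_VALUE        // due 100 ns from now
  val scheduled = Long.MAX_VALUE + 100L // overflows to a negative value; due 200 ns from now

  // Comparing the values as-is gives the wrong order: the later task looks earlier.
  check(scheduled < existing)

  // Subtracting off 'now' first gives the right order: 100 < 200.
  check(existing - now < scheduled - now)
  println("ok")
}
```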
val executeNanoTime = now + delayNanos

// If the task is already scheduled, take the earlier of the two times.
val existingIndex = futureTasks.indexOf(task)
If you trusted task.nextExecuteTime then presumably you could simplify this by using a java PriorityQueue?
null to x = insert
x to <x = remove and insert
x to >x = don't change nextExecuteTime or modify queue
Great idea. Will do.
... upon actually trying it, it's unsatisfying because of the above now thing. I'm going to leave it as lists for now, unsatisfying as that is.
Thanks for humoring me.
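The list-based scheduling being discussed can be sketched in isolation. A simplified standalone version (FakeTask and `schedule` are illustrative names, not OkHttp's API) of inserting a task into a list kept in chronological order, using delta-from-`now` comparisons for the wraparound safety described above:

```kotlin
data class FakeTask(var nextExecuteNanoTime: Long)

fun schedule(futureTasks: MutableList<FakeTask>, task: FakeTask, now: Long, delayNanos: Long) {
  task.nextExecuteNanoTime = now + delayNanos

  // Insert in chronological order: find the first task due later than this one.
  var insertAt = futureTasks.indexOfFirst { it.nextExecuteNanoTime - now > delayNanos }
  if (insertAt == -1) insertAt = futureTasks.size // due last: append
  futureTasks.add(insertAt, task)
}

fun main() {
  val now = 0L
  val tasks = mutableListOf<FakeTask>()
  schedule(tasks, FakeTask(0), now, 300)
  schedule(tasks, FakeTask(0), now, 100)
  schedule(tasks, FakeTask(0), now, 200)
  check(tasks.map { it.nextExecuteNanoTime } == listOf(100L, 200L, 300L))
  println("ok")
}
```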
1, // maximumPoolSize.
60L, TimeUnit.SECONDS, // keepAliveTime.
SynchronousQueue(),
threadFactory("OkHttp Task Coordinator", false)
This seems like a good opportunity to throw the container users a bone and allow creating a backend with smaller timeouts or all daemon threads? Can we make this simple e.g. a RealBackend factory method to create for different uses?
My aspiration is to make container users happy by making the defaults work for them! Threads should promptly exit when there's no work to do.
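The "threads promptly exit" default works because a ThreadPoolExecutor with corePoolSize 0 reclaims its thread once the keep-alive elapses. A standalone sketch of that behavior (illustrative pool sizes and a short keep-alive for demonstration, not OkHttp's actual configuration):

```kotlin
import java.util.concurrent.SynchronousQueue
import java.util.concurrent.ThreadPoolExecutor
import java.util.concurrent.TimeUnit

fun main() {
  val executor = ThreadPoolExecutor(
      0, // corePoolSize: no threads retained when idle.
      1, // maximumPoolSize: a single coordinator-style thread.
      250L, TimeUnit.MILLISECONDS, // short keepAliveTime, just for this demo.
      SynchronousQueue()
  )
  executor.execute { /* no-op task */ }
  Thread.sleep(1000) // let the keep-alive expire
  check(executor.poolSize == 0) // the idle thread has exited on its own
  executor.shutdown()
  println("ok")
}
```

This is why containers see no permanently parked thread: when there is no work, there is no thread.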
private val taskExecutor: Executor = ThreadPoolExecutor(
    0, // corePoolSize.
    Int.MAX_VALUE, // maximumPoolSize.
Do you want to support constrained environments? e.g. at most X threads or a BlockingQueue etc.? Later?
Once we allow this to be customised in any way like Dispatcher we presumably need to handle some weird edge cases.
val d = Dispatcher(
ThreadPoolExecutor(1, 10, 1, TimeUnit.MINUTES, ArrayBlockingQueue<Runnable>(5),
ThreadFactoryBuilder().apply {
setDaemon(true)
setNameFormat("XXX")
}.build()))
I'd like to keep this class and its backend as implementation details. I think our resource limiting needs to be based on limiting the number of TaskQueues, which indirectly limits the number of threads.
Nothing blocking in the review.
Thanks for the quick turnaround on this.
This is attempting to balance simplicity, efficiency, and testability. It doesn't use ScheduledExecutorService because that class wants a permanent scheduler thread. Instead this uses a coordinator thread that does its own wait and notify, similar to the mechanism in the ConnectionPool that this is intended to replace.
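The wait-and-notify coordinator described here can be sketched in miniature. A hedged, standalone illustration of the pattern (not OkHttp's actual TaskRunner; `Coordinator`, `schedule`, and `awaitNext` are hypothetical names): a single thread waits with a timeout until the next task is due, rather than keeping a ScheduledExecutorService's permanent scheduler thread alive.

```kotlin
class Coordinator {
  private val lock = Object()
  private var wakeUpNanos = -1L

  /** Producers record when the next task is due and wake the coordinator. */
  fun schedule(delayNanos: Long) {
    synchronized(lock) {
      wakeUpNanos = System.nanoTime() + delayNanos
      lock.notify() // let the coordinator re-evaluate how long to wait
    }
  }

  /** The coordinator thread waits until the next task is due; false if none. */
  fun awaitNext(): Boolean {
    synchronized(lock) {
      if (wakeUpNanos == -1L) return false
      val waitNanos = wakeUpNanos - System.nanoTime()
      if (waitNanos > 0L) {
        // Object.wait() takes (millis, nanos); split as in objectWaitNanos above.
        // (A production version would loop to guard against spurious wakeups.)
        val ms = waitNanos / 1_000_000L
        val ns = (waitNanos - ms * 1_000_000L).toInt()
        lock.wait(ms, ns)
      }
      wakeUpNanos = -1L
      return true
    }
  }
}

fun main() {
  val c = Coordinator()
  c.schedule(50_000_000L) // a task due in 50 ms
  check(c.awaitNext())    // waits roughly 50 ms, then fires
  check(!c.awaitNext())   // nothing else scheduled
  println("ok")
}
```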
6beb688 to 3a3a9e0