-
Notifications
You must be signed in to change notification settings - Fork 9.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Adopt TaskRunner in RealConnectionPool #5494
Conversation
/** Returns true if this queue became idle before the timeout elapsed. */ | ||
private fun TaskQueue.awaitIdle(timeout: Long, timeUnit: TimeUnit): Boolean { | ||
val latch = CountDownLatch(1) | ||
schedule(object : Task("awaitIdle") { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Making a Task companion factory would be nice here.
Task("awaitIdle") {
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I tried it and it doesn’t come out as satisfying as I was hoping because of the return value.
schedule(Task("awaitIdle") {
latch.countDown()
return@Task -1L
})
Lemme convert a few more of our thread pools over to use this new framework and then I’d like to take another attempt at tightening up the syntax.
/** Returns true if this queue became idle before the timeout elapsed. */ | ||
private fun TaskQueue.awaitIdle(timeout: Long, timeUnit: TimeUnit): Boolean { | ||
val latch = CountDownLatch(1) | ||
schedule(object : Task("awaitIdle") { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we try to put some unique prefix on task names? So an app with >1 okhttp client, threads can be disambiguated?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good idea. But unfortunately from this layer they’re all pretty symmetric.
We’d need to either name the OkHttpClient, or name its component parts, or something. Any recommendations on what the names should be?
keepAliveDuration: Long, | ||
timeUnit: TimeUnit | ||
) : this(RealConnectionPool( | ||
taskRunner = TaskRunner.INSTANCE, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How will we detect contention of coordinatorExecutor since its now a process singleton thread?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don’t anticipate contention on it because it doesn’t do anything other than kickoff tasks and sleep.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
... and it should not do anything else unless we address that.
That said, it’d be interesting to shrink the total number of threads for a RealConnectionPool use case from 2 to 1 by sharing threads between task execution and the coordinator. The implementation gets a little more complex because threads would need to switch roles.
true | ||
} else { | ||
// Awake the cleanup thread: we may have exceeded the idle connection limit. | ||
this.notifyAll() | ||
cleanupQueue.schedule(cleanupTask, 0L) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
worth making 0L a default?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good idea. Done.
I like where this is going. |
3a760fd
to
ca07580
Compare
ca07580
to
10d41b1
Compare
@@ -132,7 +132,7 @@ class TaskRunner( | |||
1, // maximumPoolSize. | |||
60L, TimeUnit.SECONDS, // keepAliveTime. | |||
SynchronousQueue(), | |||
threadFactory("OkHttp Task Coordinator", false) | |||
threadFactory("OkHttp Task Coordinator", true) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Dropping support for non-daemon threads for now.
@yschimke please take a look! I’m going to start rolling the TaskRunner out to the HTTP/2 and web sockets thread pools next! |
@@ -166,4 +166,8 @@ class TaskRunner( | |||
taskExecutor.shutdown() | |||
} | |||
} | |||
|
|||
companion object { | |||
val INSTANCE = TaskRunner(RealBackend()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If this INSTANCE gets shutdown via INSTANCE.backend.shutDown() are you happy with RejectedExecutionException being raised? Or do you want a clearer message to educate the developer? OR should this instance not allow shutdown?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This one shouldn’t ever be shut down. It’s intended as an internal API but I’ve learned that sometimes developers can’t help themselves!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Only a question around INSTANCE.backend.shutDown.
The D in shutdown should be lowercase, grep the project if you want to argue :)
Ahhh, yes I did spell shutdown wrong. Fixed! Also tightened visibility to |
10d41b1
to
f5e8ce1
Compare
This also configures tests to assert that the connection pool isn't doing any work after the test completes.
f5e8ce1
to
bc3ad11
Compare
private val coordinatorExecutor = ThreadPoolExecutor( | ||
0, // corePoolSize. | ||
1, // maximumPoolSize. | ||
60L, TimeUnit.SECONDS, // keepAliveTime. | ||
SynchronousQueue(), | ||
threadFactory("OkHttp Task Coordinator", false) | ||
LinkedBlockingQueue<Runnable>(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I had to switch this to prevent RejectedExecutionExceptions when a coordinator quickly exits and then starts again.
This also configures tests to assert that the connection pool
isn't doing any work after the test completes.