Blocking mechanism with cached threads #2687
Conversation
Some more benchmarks, run locally on my machine:

```
Benchmark                        (size)  Mode  Cnt    Score    Error  Units
BlockingBenchmark.blockThenCede   10000 thrpt   20   12.032 ±  0.250  ops/s
BlockingBenchmark.coarse          10000 thrpt   20 9337.834 ± 67.816  ops/s
BlockingBenchmark.fine            10000 thrpt   20   12.635 ±  0.055  ops/s
BlockingBenchmark.nested          10000 thrpt   20   12.254 ±  0.356  ops/s
```

```
Benchmark                        (size)  Mode  Cnt     Score    Error  Units
BlockingBenchmark.blockThenCede   10000 thrpt   20    23.029 ±   0.249 ops/s
BlockingBenchmark.coarse          10000 thrpt   20 10085.972 ± 159.529 ops/s
BlockingBenchmark.fine            10000 thrpt   20  1734.625 ±  30.182 ops/s
BlockingBenchmark.nested          10000 thrpt   20  1624.172 ±  90.719 ops/s
```
This PR does not affect …
If anyone wants to take it for a spin, this snapshot works with …
After the latest round of changes:

```
Benchmark                        (size)  Mode  Cnt    Score    Error  Units
BlockingBenchmark.blockThenCede   10000 thrpt   20    6.229 ±   0.299 ops/s
BlockingBenchmark.coarse          10000 thrpt   20 3565.953 ± 105.111 ops/s
BlockingBenchmark.fine            10000 thrpt   20  758.839 ±  19.670 ops/s
BlockingBenchmark.nested          10000 thrpt   20  729.449 ±  12.753 ops/s
```

Local:

```
Benchmark                        (size)  Mode  Cnt     Score   Error  Units
BlockingBenchmark.blockThenCede   10000 thrpt   20    21.879 ±  0.202 ops/s
BlockingBenchmark.coarse          10000 thrpt   20 10181.248 ± 18.676 ops/s
BlockingBenchmark.fine            10000 thrpt   20  1795.412 ± 16.884 ops/s
BlockingBenchmark.nested          10000 thrpt   20  1673.573 ±  6.557 ops/s
```
Latest snapshot: `"io.vasilev" %% "cats-effect" % "3.3-33-d14798d"`. Edit: works with …
This is supremely amazing. I'm going to take a closer look over the next few days, but I really want to get this out soon!
This is honestly extremely clever. I'm a little concerned about the `index` bit, but aside from that it's amazing.
```scala
@@ -66,6 +64,9 @@ private final class WorkerThread(
  import TracingConstants._
  import WorkStealingThreadPoolConstants._

  // Index assigned by the `WorkStealingThreadPool` for identification purposes.
  private[this] var _index: Int = idx
```
Doesn't this mean that threads can get "lost" in the `cachedThreads` set? Since the index could change after the thread is inserted, the comparator would fail to find the old value.
```scala
private[unsafe] val cachedThreads: ConcurrentSkipListSet[WorkerThread] =
  new ConcurrentSkipListSet(Comparator.comparingInt[WorkerThread](_.nameIndex))

val nameIndex: Int = pool.blockedWorkerThreadNamingIndex.incrementAndGet()
```
The threads in the `cachedThreads` skip list are ordered by this immutable `val`. The value of `nameIndex` does not really matter; it could also be `System.identityHashCode`. But we need a stable and unique value for debugging (think inspecting threads in JDK Mission Control or in a debugger), so I just reused it for this purpose too. The ordering doesn't really matter either; I chose the skip list because it offers an O(log n) remove operation instead of a linear one (the remove is necessary when a worker thread is woken up). The only thing left to do is maybe rename `nameIndex` to something else.
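The reviewer's concern above can be made concrete with a small, self-contained sketch. This is plain Java rather than the actual cats-effect code, and it uses `TreeSet` instead of `ConcurrentSkipListSet` so the outcome is deterministic (both honor the same `Comparator` contract); the `Worker` class here is a hypothetical stand-in for `WorkerThread`. Ordering by a mutable field can strand an element in the set, while ordering by an immutable `nameIndex`-style field keeps removal reliable:

```java
import java.util.Comparator;
import java.util.TreeSet;

public class MutableKeyDemo {
    // Hypothetical stand-in for a worker thread: a mutable pool index
    // plus a stable, immutable naming index.
    static final class Worker {
        int index;           // mutable, like _index in the PR diff
        final int nameIndex; // immutable, like nameIndex in the PR

        Worker(int index, int nameIndex) {
            this.index = index;
            this.nameIndex = nameIndex;
        }
    }

    public static void main(String[] args) {
        // Ordered by the MUTABLE field: mutating a key after insertion
        // strands the element at the position of its old key.
        TreeSet<Worker> byMutableKey =
            new TreeSet<>(Comparator.comparingInt((Worker w) -> w.index));
        Worker a = new Worker(0, 100);
        Worker b = new Worker(5, 101);
        Worker c = new Worker(10, 102);
        byMutableKey.add(a);
        byMutableKey.add(b);
        byMutableKey.add(c);
        a.index = 7; // key changes while 'a' sits at the slot for key 0
        System.out.println(byMutableKey.remove(a)); // search for key 7 misses it
        System.out.println(byMutableKey.size());    // the element is "lost" in the set

        // Ordered by the IMMUTABLE field: removal always finds the element.
        TreeSet<Worker> byStableKey =
            new TreeSet<>(Comparator.comparingInt((Worker w) -> w.nameIndex));
        Worker d = new Worker(0, 200);
        byStableKey.add(d);
        d.index = 7; // unrelated to the comparator key
        System.out.println(byStableKey.remove(d));
    }
}
```

This is exactly why keying the skip list on the immutable `nameIndex` avoids the "lost thread" problem: the comparator always sees the same value the element was inserted under.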
`index` is mutable and only has a meaning inside our WSTP. It is an index for the threads in the core pool (only the N threads, where N = `Runtime#availableProcessors()`) and is used to keep track of their data structures (local queues and synchronization primitives), while `nameIndex` is just used for keeping track of threads in debuggers (to avoid multiple physical threads sharing the same name, which would be confusing to users).
I only force pushed to rebase the PR; I did not make any changes.
Wait a second, I messed up signing my commits. I will force push again.
Done, sorry for the inconvenience.
Up to now, every thread that was spawned in order to respond to `scala.concurrent.blocking` simply replaced the blocked worker thread. The worker thread that got blocked died after running the blocking code.

This PR adds a mechanism for caching threads, similar to `Executors.newCachedThreadPool`, where the threads are kept around for one minute after they become inactive, in case more blocking work arrives. This saves the expensive process of spawning a new thread. For now, the retention period is fixed and non-configurable, but I am open to making it user-configurable in `3.4.0`.

Furthermore, this PR makes a change in the `IOFiber` blocking section: fibers can now determine whether they are running on the WSTP and, instead of making an expensive shift to the blocking executor and another expensive shift back to the compute pool, the blocking actions are wrapped in `scala.concurrent.blocking` and executed in place, taking advantage of cached worker threads or spawning new ones if necessary.

I also added benchmarks and the improvements are pretty sweet.
`series/3.3.x`:

This PR:
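The caching mechanism described in this PR can be sketched with a tiny plain-Java model. This is an illustration of the general technique, not the actual cats-effect implementation; the handoff queue, the 200 ms keep-alive, and all names here are invented for the example. A cached worker parks on a queue for the retention period and only terminates if no new blocking task arrives in time, which is also the idea behind `Executors.newCachedThreadPool`:

```java
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

public class CachedThreadSketch {
    // One cached worker: waits up to keepAliveMillis for a new task before
    // letting the thread terminate, so a burst of blocking work reuses the
    // thread instead of paying the thread-startup cost each time.
    static Thread startCachedWorker(SynchronousQueue<Runnable> handoff, long keepAliveMillis) {
        Thread t = new Thread(() -> {
            try {
                while (true) {
                    // Park for the retention period (the PR uses a fixed one minute).
                    Runnable task = handoff.poll(keepAliveMillis, TimeUnit.MILLISECONDS);
                    if (task == null) return; // idle for too long: let the thread die
                    task.run();
                }
            } catch (InterruptedException e) {
                // exit on interrupt
            }
        });
        t.start();
        return t;
    }

    public static void main(String[] args) throws Exception {
        SynchronousQueue<Runnable> handoff = new SynchronousQueue<>();
        Thread worker = startCachedWorker(handoff, 200);

        // Two "blocking" tasks handed to the same cached thread.
        handoff.put(() -> System.out.println("task 1 on " + Thread.currentThread().getName()));
        handoff.put(() -> System.out.println("task 2 on " + Thread.currentThread().getName()));

        // No more work arrives: the worker times out and terminates on its own.
        worker.join(2000);
        System.out.println("worker alive: " + worker.isAlive());
    }
}
```

Both tasks print the same thread name, showing the reuse; once the keep-alive elapses with no work, the thread dies, mirroring the one-minute retention described above.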