ReentrantLock: wakeup a single task on unlock and add a short spin (JuliaLang#56814)

I propose a change in the implementation of the `ReentrantLock` to
improve its overall throughput for short critical sections and fix the
quadratic wake-up behavior where each unlock schedules **all** waiting
tasks on the lock's wait queue.

This implementation follows the same principles as the `Mutex` in the
[parking_lot](https://github.com/Amanieu/parking_lot/tree/master) Rust
crate, which is in turn based on the WebKit
[WTF::ParkingLot](https://webkit.org/blog/6161/locking-in-webkit/)
class. Only the basic working principle is implemented here; further
improvements, such as eventual fairness, will be proposed separately.

The gist of the change is that we add one extra state to the lock,
essentially going from:
```
0x0 => The lock is not locked
0x1 => The lock is locked by exactly one task. No other task is waiting for it.
0x2 => The lock is locked and some other task tried to lock but failed (conflict)
```
To:
```
0b00 => The lock is not locked, nor is any task waiting for it.
0b01 => The lock is locked by exactly one task. No other task is waiting for it.
0b10 => The lock is not locked. One or more tasks are parked.
0b11 => The lock is locked by exactly one task. One or more tasks are parked,
        waiting for the lock to become available.
```

In the current implementation we must schedule **all** waiting tasks so
that one of them can re-establish the conflict state (0x2), because on
unlock we only notify tasks when the lock is in the conflict state. With
high contention and a short critical section, this means the waiting
tasks are effectively spinning in the scheduler queue.

With the extra state the proposed implementation has enough information
to know whether there are other tasks to be notified, which means we
can always notify one task at a time while preserving the optimized path
of not notifying when no tasks are waiting. To improve throughput
for short critical sections we also introduce a bounded amount of
spinning before attempting to park.
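
As a rough illustration of the resulting state machine, here is a minimal sketch (names like `SketchLock`, `try_acquire!`, and `release!` are illustrative, not part of the patch; the real implementation in `base/lock.jl` below additionally handles reentrancy, finalizers, spinning, and the race against parking tasks):

```julia
# A distilled sketch of the four-state scheme, using two bits of state.
const LOCKED_BIT = 0b01   # set while some task holds the lock
const PARKED_BIT = 0b10   # set while one or more tasks are parked

mutable struct SketchLock
    @atomic state::UInt8
    SketchLock() = new(0x00)
end

function try_acquire!(l::SketchLock)
    # Preserve PARKED_BIT while setting LOCKED_BIT, so parked tasks are
    # never forgotten when the lock changes hands.
    state = (@atomic :monotonic l.state) & PARKED_BIT
    return (@atomicreplace :acquire l.state state => (state | LOCKED_BIT)).success
end

function release!(l::SketchLock)
    # Fast path: no task is parked, so clearing LOCKED_BIT is all we need.
    (@atomicreplace :release l.state LOCKED_BIT => 0x00).success && return
    # Slow path (elided here): PARKED_BIT is set, so wake exactly one parked
    # task and publish PARKED_BIT again only if more tasks remain parked.
end
```

The point of `PARKED_BIT` is that the unlock path can distinguish "nobody is waiting" from "someone is parked" without scheduling every waiter.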

Not spinning in the scheduler queue greatly reduces the CPU utilization
of the following example:

```julia
function example()
    lock = ReentrantLock()
    @sync begin
        for i in 1:10000
            Threads.@spawn begin
                @lock lock begin
                    sleep(0.001)
                end
            end
        end
    end
end

@time example()
```

Current:
```
28.890623 seconds (101.65 k allocations: 7.646 MiB, 0.25% compilation time)
```

![image](https://github.com/user-attachments/assets/dbd6ce57-c760-4f5a-b68a-27df6a97a46e)

Proposed:
```
22.806669 seconds (101.65 k allocations: 7.814 MiB, 0.35% compilation time)
```

![image](https://github.com/user-attachments/assets/b0254180-658d-4493-86d3-dea4c500b5ac)

In a micro-benchmark where 8 threads contend for a single lock with a
very short critical section we see a ~2x improvement.
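
The benchmark harness is not part of this commit; the following is a sketch of the kind of loop that produces the per-task counts below (the function name, the task count, and the fixed time window are assumptions):

```julia
# Hypothetical reconstruction of the micro-benchmark: N tasks hammer one
# shared ReentrantLock, each incrementing its own counter inside a very
# short critical section until the deadline expires.
function contend(ntasks::Int = 8; seconds::Float64 = 5.0)
    lk = ReentrantLock()
    counts = zeros(Int, ntasks)
    deadline = time() + seconds
    @sync for i in 1:ntasks
        Threads.@spawn while time() < deadline
            @lock lk counts[i] += 1
        end
    end
    return counts
end

counts = contend()
display(counts)
println("Total iterations: ", sum(counts))
```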

Current:
```
8-element Vector{Int64}:
 6258688
 5373952
 6651904
 6389760
 6586368
 3899392
 5177344
 5505024
Total iterations: 45842432
```

Proposed:
```
8-element Vector{Int64}:
 12320768
 12976128
 10354688
 12845056
  7503872
 13598720
 13860864
 11993088
Total iterations: 95453184
```

~~In the uncontended scenario the extra bookkeeping causes a 10%
throughput reduction:~~
EDIT: I reverted `_trylock` to the simple case to recover the uncontended
throughput, and now both implementations are in the same ballpark
(without hurting the numbers above).

In the uncontended scenario:

Current:
```
Total iterations: 236748800
```

Proposed:
```
Total iterations: 237699072
```

Closes JuliaLang#56182
andrebsguedes authored and kpamnany committed Dec 30, 2024
1 parent ce667bc commit 627cdee
Showing 2 changed files with 106 additions and 23 deletions.
base/lock.jl: 126 changes (104 additions & 22 deletions)
```diff
@@ -2,6 +2,14 @@
 
 const ThreadSynchronizer = GenericCondition{Threads.SpinLock}
 
+# This bit is set in the `havelock` of a `ReentrantLock` when that lock is locked by some task.
+const LOCKED_BIT = 0b01
+# This bit is set in the `havelock` of a `ReentrantLock` just before parking a task. A task is being
+# parked if it wants to lock the lock, but it is currently being held by some other task.
+const PARKED_BIT = 0b10
+
+const MAX_SPIN_ITERS = 40
+
 # Advisory reentrant lock
 """
     ReentrantLock()
```
```diff
@@ -36,7 +44,28 @@ mutable struct ReentrantLock <: AbstractLock
     # offset32 = 20, offset64 = 24
     reentrancy_cnt::UInt32
     # offset32 = 24, offset64 = 28
-    @atomic havelock::UInt8 # 0x0 = none, 0x1 = lock, 0x2 = conflict
+    #
+    # This atomic integer holds the current state of the lock instance. Only the two lowest bits
+    # are used. See `LOCKED_BIT` and `PARKED_BIT` for the bitmask for these bits.
+    #
+    # # State table:
+    #
+    # PARKED_BIT | LOCKED_BIT | Description
+    #     0      |     0      | The lock is not locked, nor is anyone waiting for it.
+    # -----------+------------+------------------------------------------------------------------
+    #     0      |     1      | The lock is locked by exactly one task. No other task is
+    #            |            | waiting for it.
+    # -----------+------------+------------------------------------------------------------------
+    #     1      |     0      | The lock is not locked. One or more tasks are parked.
+    # -----------+------------+------------------------------------------------------------------
+    #     1      |     1      | The lock is locked by exactly one task. One or more tasks are
+    #            |            | parked waiting for the lock to become available.
+    #            |            | In this state, PARKED_BIT is only ever cleared when the cond_wait lock
+    #            |            | is held (i.e. on unlock). This ensures that
+    #            |            | we never end up in a situation where there are parked tasks but
+    #            |            | PARKED_BIT is not set (which would result in those tasks
+    #            |            | potentially never getting woken up).
+    @atomic havelock::UInt8
     # offset32 = 28, offset64 = 32
     cond_wait::ThreadSynchronizer # 2 words
     # offset32 = 36, offset64 = 48
```
```diff
@@ -91,7 +120,7 @@ function islocked end
 # `ReentrantLock`.
 
 function islocked(rl::ReentrantLock)
-    return (@atomic :monotonic rl.havelock) != 0
+    return (@atomic :monotonic rl.havelock) & LOCKED_BIT != 0
 end
 
 """
```
```diff
@@ -115,17 +144,15 @@ function trylock end
 @inline function trylock(rl::ReentrantLock)
     ct = current_task()
     if rl.locked_by === ct
-        #@assert rl.havelock !== 0x00
         rl.reentrancy_cnt += 0x0000_0001
         return true
     end
     return _trylock(rl, ct)
 end
 @noinline function _trylock(rl::ReentrantLock, ct::Task)
     GC.disable_finalizers()
-    if (@atomicreplace :acquire rl.havelock 0x00 => 0x01).success
-        #@assert rl.locked_by === nothing
-        #@assert rl.reentrancy_cnt === 0
+    state = (@atomic :monotonic rl.havelock) & PARKED_BIT
+    if (@atomicreplace :acquire rl.havelock state => (state | LOCKED_BIT)).success
         rl.reentrancy_cnt = 0x0000_0001
         @atomic :release rl.locked_by = ct
         return true
```
```diff
@@ -146,23 +173,69 @@ Each `lock` must be matched by an [`unlock`](@ref).
 """
 @inline function lock(rl::ReentrantLock)
     trylock(rl) || (@noinline function slowlock(rl::ReentrantLock)
         c = rl.cond_wait
-        lock(c.lock)
-        try
-            while true
-                if (@atomicreplace rl.havelock 0x01 => 0x02).old == 0x00 # :sequentially_consistent ? # now either 0x00 or 0x02
-                    # it was unlocked, so try to lock it ourself
-                    _trylock(rl, current_task()) && break
-                else # it was locked, so now wait for the release to notify us
-                    wait(c)
-                end
-            end
-        finally
-            unlock(c.lock)
-        end
+        ct = current_task()
+        iteration = 1
+        while true
+            state = @atomic :monotonic rl.havelock
+            # Grab the lock if it isn't locked, even if there is a queue on it
+            if state & LOCKED_BIT == 0
+                GC.disable_finalizers()
+                result = (@atomicreplace :acquire :monotonic rl.havelock state => (state | LOCKED_BIT))
+                if result.success
+                    rl.reentrancy_cnt = 0x0000_0001
+                    @atomic :release rl.locked_by = ct
+                    return
+                end
+                GC.enable_finalizers()
+                continue
+            end
+
+            if state & PARKED_BIT == 0
+                # If there is no queue, try spinning a few times
+                if iteration <= MAX_SPIN_ITERS
+                    Base.yield()
+                    iteration += 1
+                    continue
+                end
+
+                # If still not locked, try setting the parked bit
+                @atomicreplace :monotonic :monotonic rl.havelock state => (state | PARKED_BIT)
+            end
+
+            # lock the `cond_wait`
+            lock(c.lock)
+
+            # Last check before we wait to make sure `unlock` did not win the race
+            # to the `cond_wait` lock and cleared the parked bit
+            state = @atomic :acquire rl.havelock
+            if state != LOCKED_BIT | PARKED_BIT
+                unlock(c.lock)
+                continue
+            end
+
+            # It was locked, so now wait for the unlock to notify us
+            wait_no_relock(c)
+
+            # Loop back and try locking again
+            iteration = 1
+        end
     end)(rl)
     return
 end
+
+function wait_no_relock(c::GenericCondition)
+    ct = current_task()
+    _wait2(c, ct)
+    token = unlockall(c.lock)
+    try
+        return wait()
+    catch
+        ct.queue === nothing || list_deletefirst!(ct.queue, ct)
+        rethrow()
+    end
+end
+
 
 """
     unlock(lock)
```
```diff
@@ -179,18 +252,27 @@ internal counter and return immediately.
         rl.reentrancy_cnt = n
         if n == 0x0000_00000
             @atomic :monotonic rl.locked_by = nothing
-            if (@atomicswap :release rl.havelock = 0x00) == 0x02
+            result = (@atomicreplace :release :monotonic rl.havelock LOCKED_BIT => 0x00)
+            if result.success
+                return true
+            else
                 (@noinline function notifywaiters(rl)
                     cond_wait = rl.cond_wait
                     lock(cond_wait)
-                    try
-                        notify(cond_wait)
-                    finally
-                        unlock(cond_wait)
+
+                    notify(cond_wait, all=false)
+                    if !isempty(cond_wait.waitq)
+                        @atomic :release rl.havelock = PARKED_BIT
+                    else
+                        # We may have won the race to the `cond_wait` lock as a task was about to park
+                        # but we unlock anyway as any parking task will retry
+                        @atomic :release rl.havelock = 0x00
                     end
+
+                    unlock(cond_wait)
                 end)(rl)
-            end
-            return true
+                return true
+            end
         end
         return false
     end)(rl) && GC.enable_finalizers()
```
test/threads.jl: 3 changes (2 additions & 1 deletion)
```diff
@@ -16,7 +16,8 @@ let lk = ReentrantLock()
     t2 = @async (notify(c2); trylock(lk))
     wait(c1)
     wait(c2)
-    @test t1.queue === lk.cond_wait.waitq
+    # wait for the task to park in the queue (it may be spinning)
+    @test timedwait(() -> t1.queue === lk.cond_wait.waitq, 1.0) == :ok
     @test t2.queue !== lk.cond_wait.waitq
     @test istaskdone(t2)
     @test !fetch(t2)
```
