From 2344e1bc4be8a7b7a8585eb311a226f34f6d68e0 Mon Sep 17 00:00:00 2001
From: zerbina <100542850+zerbina@users.noreply.github.com>
Date: Sat, 13 Apr 2024 17:07:26 +0000
Subject: [PATCH] fix logic error and race condition

There were both a race condition and a logic error in the
implementation, which could lead to spurious signals, crashes, and other
undefined behaviour.

Race Condition
--------------

Thread 1:
...
1. `suspend`: acquires lock for actor `A` and pool
2. changes the state to `Suspended`
3. `suspend`: releases the locks
4. pauses

Thread 2:
...
1. `resume`: acquires lock for actor `A` and pool
2. actor state is `Suspended`, so the actor is queued
3. `resume`: releases the locks
4. ...
5. actor `A` is popped from the work queue and resumes execution

Threads 1 and 2 now both have unguarded concurrent ownership of actor
`A`, which constitutes a race condition.

Logic Error
-----------

Note: this is only *one* example of what could happen.

Thread 1:
...
1. `suspend`: acquires lock for actor `A` and pool
2. changes the state to `Suspended`
3. releases the lock for actor `A` and pool
4. pauses

Thread 2:
...
1. `resume`: acquires lock for actor `A` and pool
2. actor state is `Suspended`, so the actor is queued
3. ...
4. actor `A` resumes execution
5. `jield` is called, and the state of actor `A` changes to `Jielding`
6. ...
7. `workerThread`: fetches the state of actor `A` (`Jielding`)
8. pauses

Thread 1:
1. `workerThread`: fetches the state of actor `A` (`Jielding`)
2. queues actor `A` for execution and changes the state to `Queued`
3. pauses

Thread 2:
1. the local `state` variable still contains `Jielding`, so the actor is
   queued again

The same actor is now in the work queue two times!

The Solution
------------

In both cases, the problem is that another thread can assume ownership
of an actor while the worker thread responsible for the actor hasn't
finished suspending it, due to the actor state being eagerly set to
`Suspended` by `suspend`.
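
To make the ownership problem concrete, here is a minimal
single-threaded model of it. This is a sketch only, not the code in
`actors/pool.nim`: `ModelActor`, `ownedByWorker`, `modelResume` and
`handoffTooEarly` are hypothetical names invented for illustration. The
model assumes that `resume` queues an actor only when it observes
`Suspended`, as in the step lists above, and compares the eager
`Suspended` write with an intermediate state (called `Suspending` here)
that `resume` does not act on.

```nim
import std/deques

type
  State = enum
    Suspended, Queued, Running, Suspending

  # Hypothetical stand-in for an actor; not the ActorObject from pool.nim.
  ModelActor = ref object
    state: State
    ownedByWorker: bool  # true while the worker has not finished suspending

proc modelResume(a: ModelActor, workQueue: var Deque[ModelActor]): bool =
  ## Models `resume`: the actor is queued only when it looks `Suspended`.
  ## Returns true if the actor was handed to the work queue.
  if a.state == Suspended:
    a.state = Queued
    workQueue.addLast(a)
    result = true

proc handoffTooEarly(eager: bool): bool =
  ## Replays "thread 1 suspends, thread 2 resumes" in a fixed order.
  ## Returns true if the actor was queued while the worker still owned it.
  var workQueue = initDeque[ModelActor]()
  let a = ModelActor(state: Running, ownedByWorker: true)
  # Thread 1: `suspend` publishes a state, then the worker pauses
  # before it has let go of the actor.
  a.state = if eager: Suspended else: Suspending
  # Thread 2: `resume` runs in that window.
  let queued = modelResume(a, workQueue)
  result = queued and a.ownedByWorker

echo handoffTooEarly(eager = true)   # true: two owners at once
echo handoffTooEarly(eager = false)  # false: resume leaves the actor alone
```

The model has no real threads or locks; it only fixes the interleaving
from the step lists so that the outcome is deterministic.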

`suspend` now sets the state to the new `Suspending` state first. When
handling the `Suspending` state, if new signals were received in the
meantime (`sendSig` cannot resume a *suspending* actor), the actor is
queued again right away. Otherwise, it's suspended.
---
 actors/pool.nim | 15 +++++++++++++--
 doc/flow.dot    |  6 +++++-
 2 files changed, 18 insertions(+), 3 deletions(-)

diff --git a/actors/pool.nim b/actors/pool.nim
index 4c1ca10..d86bb95 100644
--- a/actors/pool.nim
+++ b/actors/pool.nim
@@ -35,7 +35,7 @@ type
     pool: ptr Pool
 
   State = enum
-    Suspended, Queued, Running, Jielding, Killed, Dead
+    Suspended, Queued, Running, Suspending, Jielding, Killed, Dead
 
   ActorObject* = object
     rc*: Atomic[int]
@@ -191,7 +191,7 @@ proc suspend*(actor: Actor, c: sink Continuation): ActorCont =
     if actor[].sigQueue.len == 0:
       actor[].c = move c
       if actor[].state == Running:
-        actor[].state = Suspended
+        actor[].state = Suspending
       return nil
 
   actor.handleSignals()
@@ -388,6 +388,17 @@ proc workerThread(worker: ptr Worker) {.thread.} =
       if c.finished:
         {.emit: ["/* actor finished */"].}
         pool.exit(actor, Normal)
+      elif state == Suspending:
+        pool.withLock:
+          actor.withLock:
+            # if there are already new signals in the queue, the actor needs
+            # to be queued again right away
+            if actor[].sigQueue.len > 0:
+              actor[].state = Queued
+              pool.workQueue.addLast(actor)
+              pool.cond.signal()
+            else:
+              actor[].state = Suspended
       elif state == Jielding:
         pool.withLock:
           actor.withLock:
diff --git a/doc/flow.dot b/doc/flow.dot
index 4c7fad6..c6b2a20 100644
--- a/doc/flow.dot
+++ b/doc/flow.dot
@@ -5,11 +5,15 @@ digraph dot {
 
   Suspended -> Running [ label = "recvCps()" ];
 
+  Suspending -> Suspended;
+
+  Suspending -> Queued;
+
   Queued -> Running;
 
   Running -> Queued [ label = "jield()" ];
 
-  Running -> Suspended [ label = "recv() ||\nsendCps()" ];
+  Running -> Suspending [ label = "recv() ||\nsendCps()" ];
 
   Running -> Killed [ label = "rx SigKill" ];
 