diff --git a/libs/pika/synchronization/include/pika/synchronization/async_rw_mutex.hpp b/libs/pika/synchronization/include/pika/synchronization/async_rw_mutex.hpp index 1c09b412f..1812b41d2 100644 --- a/libs/pika/synchronization/include/pika/synchronization/async_rw_mutex.hpp +++ b/libs/pika/synchronization/include/pika/synchronization/async_rw_mutex.hpp @@ -143,12 +143,22 @@ namespace pika::execution::experimental { { if (next_state) { - // We pass the ownership of the intrusive_ptr of the next state to the next - // state itself, so that it can choose when to release it. If we can avoid it, - // we don't want this shared state to to hold on to the reference longer than - // necessary. + // We are also not accessing this shared state directly anymore, so we reset + // the next_state before calling done to avoid continuations being triggered by + // this reference being the last reference (if done after swapping the head of + // the queue that happens in done). When resetting before the swap of the head + // of the queue, we also know this can't be the last reference since senders + // that reference the shared state can't be used without adding a continuation + // to the queue (a continuation will hold another reference to the shared + // state). Continuations can run inline, but that can only happen after the head + // of the queue has been swapped. In summary, there must be at least two + // references to the shared state at this point, so we can safely reset it early. async_rw_mutex_shared_state_base* p = next_state.get(); - p->done(std::move(next_state)); + + PIKA_ASSERT(next_state.use_count() > 1); + next_state.reset(); + + p->done(); } } @@ -180,7 +190,7 @@ namespace pika::execution::experimental { current->continuation(); } - void done(shared_state_ptr_type p) noexcept + void done() noexcept { // `this` is not an async_rw_mutex_operation_state_base*, but is a known value to // signal that the queue has been processed @@ -190,10 +200,6 @@ namespace pika::execution::experimental { // We have now successfully acquired the head of the queue, and signaled to other // threads that they can't add any more items to the queue. We can now process the // queue without further synchronization. - // - // We are also not accessing this shared state directly anymore, so we can - // reset p early. - p.reset(); // Because of the way operation states are linked together, they will be accessed in // LIFO order (op_state_head points to the last operation state to be added, or @@ -446,7 +452,7 @@ namespace pika::execution::experimental { // value can be passed from the previous state to the next // state. if (PIKA_LIKELY(prev_state)) { prev_state->set_next_state(state); } - else { state->done(nullptr); } + else { state->done(); } } return {state}; @@ -462,7 +468,7 @@ namespace pika::execution::experimental { // a previous state we set the next state so that the value can be // passed from the previous state to the next state. if (PIKA_LIKELY(prev_state)) { prev_state->set_next_state(state); } - else { state->done(nullptr); } + else { state->done(); } return {state}; } @@ -618,7 +624,7 @@ namespace pika::execution::experimental { // Only the first access has no previous shared state. if (PIKA_LIKELY(prev_state)) { prev_state->set_next_state(state); } - else { state->done(nullptr); } + else { state->done(); } } return {state}; @@ -634,7 +640,7 @@ namespace pika::execution::experimental { // Only the first access has no previous shared state. if (PIKA_LIKELY(prev_state)) { prev_state->set_next_state(state); } - else { state->done(nullptr); } + else { state->done(); } return {state}; }