Skip to content

Commit

Permalink
wrap thread_local in Cell instead RefCell (tokio-rs#4764)
Browse files Browse the repository at this point in the history
  • Loading branch information
gftea committed Jun 28, 2022
1 parent ea414c3 commit 720a8de
Showing 1 changed file with 39 additions and 27 deletions.
66 changes: 39 additions & 27 deletions tokio/src/task/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use crate::runtime::task::{self, JoinHandle, LocalOwnedTasks, Task};
use crate::sync::AtomicWaker;
use crate::util::VecDequeCell;

use std::cell::{Cell, RefCell};
use std::cell::Cell;
use std::collections::VecDeque;
use std::fmt;
use std::future::Future;
Expand Down Expand Up @@ -253,7 +253,7 @@ pin_project! {
}
}

thread_local!(static CURRENT: RefCell<Option<Rc<Context>>> = RefCell::new(None));
thread_local!(static CURRENT: Cell<Option<Rc<Context>>> = Cell::new(None));

cfg_rt! {
/// Spawns a `!Send` future on the local task set.
Expand Down Expand Up @@ -303,7 +303,8 @@ cfg_rt! {
F::Output: 'static
{
CURRENT.with(|maybe_cx| {
match maybe_cx.borrow().as_ref() {
let ctx = clone_rc(maybe_cx);
match ctx {
None => panic!("`spawn_local` called from outside of a `task::LocalSet`"),
Some(cx) => cx.spawn(future, name)
}
Expand All @@ -328,7 +329,6 @@ pub struct LocalEnterGuard(Option<Rc<Context>>);
impl Drop for LocalEnterGuard {
fn drop(&mut self) {
CURRENT.with(|ctx| {
// *ctx.borrow_mut() = self.0.take();
ctx.replace(self.0.take());
})
}
Expand All @@ -354,7 +354,7 @@ impl LocalSet {
/// Enter current LocalSet context
pub fn enter(&self) -> LocalEnterGuard {
CURRENT.with(|ctx| {
let old = ctx.borrow_mut().replace(self.context.clone());
let old = ctx.replace(Some(self.context.clone()));
LocalEnterGuard(old)
})
}
Expand Down Expand Up @@ -586,18 +586,17 @@ impl LocalSet {
}

fn with<T>(&self, f: impl FnOnce() -> T) -> T {
// CURRENT.set(&self.context, f)
CURRENT.with(|ctx| {
struct Reset<'a> {
ctx_ref: &'a RefCell<Option<Rc<Context>>>,
ctx_ref: &'a Cell<Option<Rc<Context>>>,
val: Option<Rc<Context>>,
}
impl<'a> Drop for Reset<'a> {
fn drop(&mut self) {
self.ctx_ref.replace(self.val.take());
}
}
let old = ctx.borrow_mut().replace(self.context.clone());
let old = ctx.replace(Some(self.context.clone()));

let _reset = Reset {
ctx_ref: ctx,
Expand Down Expand Up @@ -725,23 +724,33 @@ impl<T: Future> Future for RunUntil<'_, T> {
}
}

fn clone_rc<T>(rc: &Cell<Option<Rc<T>>>) -> Option<Rc<T>> {
let value = rc.take();
let cloned = value.clone();
rc.set(value);
cloned
}

impl Shared {
/// Schedule the provided task on the scheduler.
fn schedule(&self, task: task::Notified<Arc<Self>>) {
CURRENT.with(|maybe_cx| match maybe_cx.borrow().as_ref() {
Some(cx) if cx.shared.ptr_eq(self) => {
cx.queue.push_back(task);
}
_ => {
// First check whether the queue is still there (if not, the
// LocalSet is dropped). Then push to it if so, and if not,
// do nothing.
let mut lock = self.queue.lock();

if let Some(queue) = lock.as_mut() {
queue.push_back(task);
drop(lock);
self.waker.wake();
CURRENT.with(|maybe_cx| {
let ctx = clone_rc(maybe_cx);
match ctx {
Some(cx) if cx.shared.ptr_eq(self) => {
cx.queue.push_back(task);
}
_ => {
// First check whether the queue is still there (if not, the
// LocalSet is dropped). Then push to it if so, and if not,
// do nothing.
let mut lock = self.queue.lock();

if let Some(queue) = lock.as_mut() {
queue.push_back(task);
drop(lock);
self.waker.wake();
}
}
}
});
Expand All @@ -754,11 +763,14 @@ impl Shared {

impl task::Schedule for Arc<Shared> {
fn release(&self, task: &Task<Self>) -> Option<Task<Self>> {
CURRENT.with(|maybe_cx| match maybe_cx.borrow().as_ref() {
None => panic!("scheduler context missing"),
Some(cx) => {
assert!(cx.shared.ptr_eq(self));
cx.owned.remove(task)
CURRENT.with(|maybe_cx| {
let ctx = clone_rc(maybe_cx);
match ctx {
None => panic!("scheduler context missing"),
Some(cx) => {
assert!(cx.shared.ptr_eq(self));
cx.owned.remove(task)
}
}
})
}
Expand Down

0 comments on commit 720a8de

Please sign in to comment.