Skip to content

Commit

Permalink
Add more internal enforcement of static/scope lifetimes
Browse files Browse the repository at this point in the history
  • Loading branch information
cuviper committed Jun 11, 2022
1 parent b8e54d5 commit 6ec01f6
Show file tree
Hide file tree
Showing 4 changed files with 83 additions and 34 deletions.
2 changes: 1 addition & 1 deletion rayon-core/src/broadcast/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ where
// on each thread. This ref is decremented at the (*) above.
registry.increment_terminate_count();

ArcJob::as_job_ref(&job)
ArcJob::as_static_job_ref(&job)
});

registry.inject_broadcast(job_refs);
Expand Down
16 changes: 16 additions & 0 deletions rayon-core/src/job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,14 @@ where
pub(super) unsafe fn into_job_ref(self: Box<Self>) -> JobRef {
JobRef::new(Box::into_raw(self))
}

/// Creates a static `JobRef` from this job.
pub(super) fn into_static_job_ref(self: Box<Self>) -> JobRef
where
BODY: 'static,
{
unsafe { self.into_job_ref() }
}
}

impl<BODY> Job for HeapJob<BODY>
Expand Down Expand Up @@ -179,6 +187,14 @@ where
pub(super) unsafe fn as_job_ref(this: &Arc<Self>) -> JobRef {
JobRef::new(Arc::into_raw(Arc::clone(this)))
}

/// Creates a static `JobRef` from this job.
pub(super) fn as_static_job_ref(this: &Arc<Self>) -> JobRef
where
BODY: 'static,
{
unsafe { Self::as_job_ref(this) }
}
}

impl<BODY> Job for ArcJob<BODY>
Expand Down
97 changes: 65 additions & 32 deletions rayon-core/src/scope/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
//! [`join()`]: ../join/join.fn.html
use crate::broadcast::BroadcastContext;
use crate::job::{ArcJob, HeapJob, JobFifo};
use crate::job::{ArcJob, HeapJob, JobFifo, JobRef};
use crate::latch::{CountLatch, CountLockLatch, Latch};
use crate::registry::{global_registry, in_worker, Registry, WorkerThread};
use crate::unwind;
Expand Down Expand Up @@ -539,16 +539,18 @@ impl<'scope> Scope<'scope> {
where
BODY: FnOnce(&Scope<'scope>) + Send + 'scope,
{
self.base.increment();
unsafe {
let job_ref =
HeapJob::new(move || self.base.execute_job(move || body(self))).into_job_ref();
let scope_ptr = ScopePtr(self);
let job = HeapJob::new(move || {
// SAFETY: this job will execute before the scope ends.
let scope = unsafe { &*scope_ptr.0 };
scope.base.execute_job(move || body(scope))
});
let job_ref = self.base.heap_job_ref(job);

// Since `Scope` implements `Sync`, we can't be sure that we're still in a
// thread of this pool, so we can't just push to the local worker thread.
// Also, this might be an in-place scope.
self.base.registry.inject_or_push(job_ref);
}
// Since `Scope` implements `Sync`, we can't be sure that we're still in a
// thread of this pool, so we can't just push to the local worker thread.
// Also, this might be an in-place scope.
self.base.registry.inject_or_push(job_ref);
}

/// Spawns a job into every thread of the fork-join scope `self`. This job will
Expand All @@ -559,12 +561,15 @@ impl<'scope> Scope<'scope> {
where
BODY: Fn(&Scope<'scope>, BroadcastContext<'_>) + Send + Sync + 'scope,
{
let scope_ptr = ScopePtr(self);
let job = ArcJob::new(move || {
// SAFETY: this job will execute before the scope ends.
let scope = unsafe { &*scope_ptr.0 };
let body = &body;
self.base
.execute_job(move || BroadcastContext::with(move |ctx| body(self, ctx)))
let func = move || BroadcastContext::with(move |ctx| body(scope, ctx));
scope.base.execute_job(func);
});
unsafe { self.base.inject_broadcast(job) }
self.base.inject_broadcast(job)
}
}

Expand Down Expand Up @@ -594,20 +599,23 @@ impl<'scope> ScopeFifo<'scope> {
where
BODY: FnOnce(&ScopeFifo<'scope>) + Send + 'scope,
{
self.base.increment();
unsafe {
let job_ref =
HeapJob::new(move || self.base.execute_job(move || body(self))).into_job_ref();

// If we're in the pool, use our scope's private fifo for this thread to execute
// in a locally-FIFO order. Otherwise, just use the pool's global injector.
match self.base.registry.current_thread() {
Some(worker) => {
let fifo = &self.fifos[worker.index()];
worker.push(fifo.push(job_ref));
}
None => self.base.registry.inject(&[job_ref]),
let scope_ptr = ScopePtr(self);
let job = HeapJob::new(move || {
// SAFETY: this job will execute before the scope ends.
let scope = unsafe { &*scope_ptr.0 };
scope.base.execute_job(move || body(scope))
});
let job_ref = self.base.heap_job_ref(job);

// If we're in the pool, use our scope's private fifo for this thread to execute
// in a locally-FIFO order. Otherwise, just use the pool's global injector.
match self.base.registry.current_thread() {
Some(worker) => {
let fifo = &self.fifos[worker.index()];
// SAFETY: this job will execute before the scope ends.
unsafe { worker.push(fifo.push(job_ref)) };
}
None => self.base.registry.inject(&[job_ref]),
}
}

Expand All @@ -619,12 +627,15 @@ impl<'scope> ScopeFifo<'scope> {
where
BODY: Fn(&ScopeFifo<'scope>, BroadcastContext<'_>) + Send + Sync + 'scope,
{
let scope_ptr = ScopePtr(self);
let job = ArcJob::new(move || {
// SAFETY: this job will execute before the scope ends.
let scope = unsafe { &*scope_ptr.0 };
let body = &body;
self.base
.execute_job(move || BroadcastContext::with(move |ctx| body(self, ctx)))
let func = move || BroadcastContext::with(move |ctx| body(scope, ctx));
scope.base.execute_job(func);
});
unsafe { self.base.inject_broadcast(job) }
self.base.inject_broadcast(job)
}
}

Expand All @@ -648,12 +659,22 @@ impl<'scope> ScopeBase<'scope> {
self.job_completed_latch.increment();
}

unsafe fn inject_broadcast<FUNC>(&self, job: Arc<ArcJob<FUNC>>)
fn heap_job_ref<FUNC>(&self, job: Box<HeapJob<FUNC>>) -> JobRef
where
FUNC: FnOnce() + Send + 'scope,
{
unsafe {
self.increment();
job.into_job_ref()
}
}

fn inject_broadcast<FUNC>(&self, job: Arc<ArcJob<FUNC>>)
where
FUNC: Fn() + Send + Sync,
FUNC: Fn() + Send + Sync + 'scope,
{
let n_threads = self.registry.num_threads();
let job_refs = (0..n_threads).map(|_| {
let job_refs = (0..n_threads).map(|_| unsafe {
self.increment();
ArcJob::as_job_ref(&job)
});
Expand Down Expand Up @@ -817,3 +838,15 @@ impl fmt::Debug for ScopeLatch {
}
}
}

/// Used to capture a scope `&Self` pointer in jobs, without faking a lifetime.
///
/// Unsafe code is still required to dereference the pointer, but that's fine in
/// scope jobs that are guaranteed to execute before the scope ends.
struct ScopePtr<T>(*const T);

// SAFETY: !Send for raw pointers is not for safety, just as a lint
unsafe impl<T: Sync> Send for ScopePtr<T> {}

// SAFETY: !Sync for raw pointers is not for safety, just as a lint
unsafe impl<T: Sync> Sync for ScopePtr<T> {}
2 changes: 1 addition & 1 deletion rayon-core/src/spawn/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ where
registry.terminate(); // (*) permit registry to terminate now
}
})
.into_job_ref()
.into_static_job_ref()
}

/// Fires off a task into the Rayon threadpool in the "static" or
Expand Down

0 comments on commit 6ec01f6

Please sign in to comment.