Skip to content

Commit

Permalink
Merge pull request #9 from cuviper/rustc-crossbeam
Browse files Browse the repository at this point in the history
Update crossbeam and release 0.3.2
  • Loading branch information
cuviper authored Jan 10, 2022
2 parents c8ec88d + d79ec98 commit 9fcf0ff
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 20 deletions.
4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
name = "rustc-rayon"
# Reminder to update html_rool_url in lib.rs when updating version
version = "0.3.1"
version = "0.3.2"
authors = ["Niko Matsakis <niko@alum.mit.edu>",
"Josh Stone <cuviper@gmail.com>"]
description = "Simple work-stealing parallelism for Rust - fork for rustc"
Expand All @@ -20,7 +20,7 @@ exclude = ["ci"]

[dependencies]
rayon-core = { version = "0.3", path = "rayon-core", package = "rustc-rayon-core" }
crossbeam-deque = "0.7.2"
crossbeam-deque = "0.8.0"

# This is a public dependency!
[dependencies.either]
Expand Down
7 changes: 3 additions & 4 deletions rayon-core/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "rustc-rayon-core"
version = "0.3.1" # reminder to update html_root_url attribute
version = "0.3.2" # reminder to update html_root_url attribute
authors = ["Niko Matsakis <niko@alum.mit.edu>",
"Josh Stone <cuviper@gmail.com>"]
description = "Core APIs for Rayon - fork for rustc"
Expand All @@ -17,9 +17,8 @@ categories = ["concurrency"]
[dependencies]
num_cpus = "1.2"
lazy_static = "1"
crossbeam-deque = "0.7.2"
crossbeam-queue = "0.2"
crossbeam-utils = "0.7"
crossbeam-deque = "0.8.0"
crossbeam-utils = "0.8.0"

[dev-dependencies]
rand = "0.7"
Expand Down
14 changes: 10 additions & 4 deletions rayon-core/src/job.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::latch::Latch;
use crate::tlv;
use crate::unwind;
use crossbeam_queue::SegQueue;
use crossbeam_deque::{Injector, Steal};
use std::any::Any;
use std::cell::UnsafeCell;
use std::mem;
Expand Down Expand Up @@ -191,13 +191,13 @@ impl<T> JobResult<T> {

/// Indirect queue to provide FIFO job priority.
pub(super) struct JobFifo {
inner: SegQueue<JobRef>,
inner: Injector<JobRef>,
}

impl JobFifo {
pub(super) fn new() -> Self {
JobFifo {
inner: SegQueue::new(),
inner: Injector::new(),
}
}

Expand All @@ -213,6 +213,12 @@ impl JobFifo {
impl Job for JobFifo {
unsafe fn execute(this: *const Self) {
// We "execute" a queue by executing its first job, FIFO.
(*this).inner.pop().expect("job in fifo queue").execute()
loop {
match (*this).inner.steal() {
Steal::Success(job_ref) => break job_ref.execute(),
Steal::Empty => panic!("FIFO is empty"),
Steal::Retry => {}
}
}
}
}
24 changes: 14 additions & 10 deletions rayon-core/src/registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,7 @@ use crate::{
AcquireThreadHandler, DeadlockHandler, ErrorKind, ExitHandler, PanicHandler,
ReleaseThreadHandler, StartHandler, ThreadPoolBuildError, ThreadPoolBuilder,
};
use crossbeam_deque::{Steal, Stealer, Worker};
use crossbeam_queue::SegQueue;
use crossbeam_deque::{Injector, Steal, Stealer, Worker};
use std::any::Any;
use std::cell::Cell;
use std::collections::hash_map::DefaultHasher;
Expand Down Expand Up @@ -135,7 +134,7 @@ where
pub struct Registry {
thread_infos: Vec<ThreadInfo>,
sleep: Sleep,
injected_jobs: SegQueue<JobRef>,
injected_jobs: Injector<JobRef>,
panic_handler: Option<Box<PanicHandler>>,
pub(crate) deadlock_handler: Option<Box<DeadlockHandler>>,
start_handler: Option<Box<StartHandler>>,
Expand Down Expand Up @@ -240,7 +239,7 @@ impl Registry {
let registry = Arc::new(Registry {
thread_infos: stealers.into_iter().map(ThreadInfo::new).collect(),
sleep: Sleep::new(n_threads),
injected_jobs: SegQueue::new(),
injected_jobs: Injector::new(),
terminate_latch: CountLatch::new(),
panic_handler: builder.take_panic_handler(),
deadlock_handler: builder.take_deadlock_handler(),
Expand Down Expand Up @@ -415,13 +414,18 @@ impl Registry {
}

fn pop_injected_job(&self, worker_index: usize) -> Option<JobRef> {
let job = self.injected_jobs.pop().ok();
if job.is_some() {
log!(UninjectedWork {
worker: worker_index
});
loop {
match self.injected_jobs.steal() {
Steal::Success(job) => {
log!(UninjectedWork {
worker: worker_index
});
return Some(job);
}
Steal::Empty => return None,
Steal::Retry => {}
}
}
job
}

/// If already in a worker-thread of this registry, just execute `op`.
Expand Down

0 comments on commit 9fcf0ff

Please sign in to comment.