diff --git a/Cargo.toml b/Cargo.toml index 589272afe..e03d819c9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,6 +18,7 @@ exclude = ["ci"] [dependencies] rayon-core = { version = "1.4", path = "rayon-core" } +crossbeam-deque = "0.2.0" # This is a public dependency! [dependencies.either] diff --git a/rayon-demo/src/life/bench.rs b/rayon-demo/src/life/bench.rs index aa6ff53b7..c589b1c79 100644 --- a/rayon-demo/src/life/bench.rs +++ b/rayon-demo/src/life/bench.rs @@ -9,3 +9,8 @@ fn generations(b: &mut ::test::Bencher) { fn parallel_generations(b: &mut ::test::Bencher) { b.iter(|| super::parallel_generations(Board::new(200, 200).random(), 100)); } + +#[bench] +fn as_parallel_generations(b: &mut ::test::Bencher) { + b.iter(|| super::par_bridge_generations(Board::new(200, 200).random(), 100)); +} diff --git a/rayon-demo/src/life/mod.rs b/rayon-demo/src/life/mod.rs index 4868f1c65..2d7b99ea7 100644 --- a/rayon-demo/src/life/mod.rs +++ b/rayon-demo/src/life/mod.rs @@ -20,6 +20,7 @@ use time; use docopt::Docopt; use rayon::prelude::*; +use rayon::iter::ParallelBridge; #[cfg(test)] mod bench; @@ -93,6 +94,15 @@ impl Board { self.next_board(new_brd) } + pub fn par_bridge_next_generation(&self) -> Board { + let new_brd = (0..self.len()) + .par_bridge() + .map(|cell| self.successor_cell(cell)) + .collect(); + + self.next_board(new_brd) + } + fn cell_live(&self, x: usize, y: usize) -> bool { !(x >= self.cols || y >= self.rows) && self.board[y * self.cols + x] } @@ -145,6 +155,11 @@ fn parallel_generations(board: Board, gens: usize) { for _ in 0..gens { brd = brd.parallel_next_generation(); } } +fn par_bridge_generations(board: Board, gens: usize) { + let mut brd = board; + for _ in 0..gens { brd = brd.par_bridge_next_generation(); } +} + fn measure(f: fn(Board, usize) -> (), args: &Args) -> u64 { let (n, gens) = (args.flag_size, args.flag_gens); let brd = Board::new(n, n).random(); @@ -168,5 +183,9 @@ pub fn main(args: &[String]) { let parallel = measure(parallel_generations, 
&args); println!("parallel: {:10} ns -> {:.2}x speedup", parallel, serial as f64 / parallel as f64); + + let par_bridge = measure(par_bridge_generations, &args); + println!("par_bridge: {:10} ns -> {:.2}x speedup", par_bridge, + serial as f64 / par_bridge as f64); } } diff --git a/rayon-demo/src/nbody/bench.rs b/rayon-demo/src/nbody/bench.rs index 3b2fbfb1a..d772a8581 100644 --- a/rayon-demo/src/nbody/bench.rs +++ b/rayon-demo/src/nbody/bench.rs @@ -30,6 +30,11 @@ fn nbody_par(b: &mut ::test::Bencher) { nbody_bench(b, |n| { n.tick_par(); }); } +#[bench] +fn nbody_par_bridge(b: &mut ::test::Bencher) { + nbody_bench(b, |n| { n.tick_par_bridge(); }); +} + #[bench] fn nbody_parreduce(b: &mut ::test::Bencher) { nbody_bench(b, |n| { n.tick_par_reduce(); }); diff --git a/rayon-demo/src/nbody/nbody.rs b/rayon-demo/src/nbody/nbody.rs index ef4a4a9f0..43c93d16b 100644 --- a/rayon-demo/src/nbody/nbody.rs +++ b/rayon-demo/src/nbody/nbody.rs @@ -30,8 +30,10 @@ // [1]: https://github.com/IntelLabs/RiverTrail/blob/master/examples/nbody-webgl/NBody.js use cgmath::{InnerSpace, Point3, Vector3, Zero}; -use rayon::prelude::*; use rand::{Rand, Rng}; +use rayon::prelude::*; +#[cfg(test)] +use rayon::iter::ParallelBridge; use std::f64::consts::PI; const INITIAL_VELOCITY: f64 = 8.0; // set to 0.0 to turn off. 
@@ -50,8 +52,7 @@ pub struct Body { impl NBodyBenchmark { pub fn new(num_bodies: usize, rng: &mut R) -> NBodyBenchmark { - let bodies0: Vec<_> = - (0..num_bodies) + let bodies0: Vec<_> = (0..num_bodies) .map(|_| { let position = Point3 { x: f64::rand(rng).floor() * 40_000.0, @@ -71,7 +72,11 @@ impl NBodyBenchmark { z: f64::rand(rng) * INITIAL_VELOCITY, }; - Body { position: position, velocity: velocity, velocity2: velocity2 } + Body { + position: position, + velocity: velocity, + velocity2: velocity2, + } }) .collect(); @@ -91,16 +96,44 @@ impl NBodyBenchmark { }; let time = self.time; - out_bodies.par_iter_mut() - .zip(&in_bodies[..]) - .for_each(|(out, prev)| { - let (vel, vel2) = next_velocity(time, prev, in_bodies); - out.velocity = vel; - out.velocity2 = vel2; + out_bodies + .par_iter_mut() + .zip(&in_bodies[..]) + .for_each(|(out, prev)| { + let (vel, vel2) = next_velocity(time, prev, in_bodies); + out.velocity = vel; + out.velocity2 = vel2; + + let next_velocity = vel - vel2; + out.position = prev.position + next_velocity; + }); - let next_velocity = vel - vel2; - out.position = prev.position + next_velocity; - }); + self.time += 1; + + out_bodies + } + + #[cfg(test)] + pub fn tick_par_bridge(&mut self) -> &[Body] { + let (in_bodies, out_bodies) = if (self.time & 1) == 0 { + (&self.bodies.0, &mut self.bodies.1) + } else { + (&self.bodies.1, &mut self.bodies.0) + }; + + let time = self.time; + out_bodies + .iter_mut() + .zip(&in_bodies[..]) + .par_bridge() + .for_each(|(out, prev)| { + let (vel, vel2) = next_velocity(time, prev, in_bodies); + out.velocity = vel; + out.velocity2 = vel2; + + let next_velocity = vel - vel2; + out.position = prev.position + next_velocity; + }); self.time += 1; @@ -115,16 +148,17 @@ impl NBodyBenchmark { }; let time = self.time; - out_bodies.par_iter_mut() - .zip(&in_bodies[..]) - .for_each(|(out, prev)| { - let (vel, vel2) = next_velocity_par(time, prev, in_bodies); - out.velocity = vel; - out.velocity2 = vel2; - - let 
next_velocity = vel - vel2; - out.position = prev.position + next_velocity; - }); + out_bodies + .par_iter_mut() + .zip(&in_bodies[..]) + .for_each(|(out, prev)| { + let (vel, vel2) = next_velocity_par(time, prev, in_bodies); + out.velocity = vel; + out.velocity2 = vel2; + + let next_velocity = vel - vel2; + out.position = prev.position + next_velocity; + }); self.time += 1; @@ -207,57 +241,57 @@ fn next_velocity(time: usize, prev: &Body, bodies: &[Body]) -> (Vector3, Ve let zero: Vector3 = Vector3::zero(); let (diff, diff2) = bodies .iter() - .fold( - (zero, zero), - |(mut diff, mut diff2), body| { - let r = body.position - prev.position; - - // make sure we are not testing the particle against its own position - let are_same = r == Vector3::zero(); - - let dist_sqrd = r.magnitude2(); - - if dist_sqrd < zone_sqrd && !are_same { - let length = dist_sqrd.sqrt(); - let percent = dist_sqrd / zone_sqrd; - - if dist_sqrd < repel { - let f = (repel / percent - 1.0) * 0.025; - let normal = (r / length) * f; - diff += normal; - diff2 += normal; - } else if dist_sqrd < align { - let thresh_delta = align - repel; - let adjusted_percent = (percent - repel) / thresh_delta; - let q = (0.5 - (adjusted_percent * PI * 2.0).cos() * 0.5 + 0.5) * 100.9; - - // normalize vel2 and multiply by factor - let vel2_length = body.velocity2.magnitude(); - let vel2 = (body.velocity2 / vel2_length) * q; - - // normalize own velocity - let vel_length = prev.velocity.magnitude(); - let vel = (prev.velocity / vel_length) * q; + .fold((zero, zero), |(mut diff, mut diff2), body| { + let r = body.position - prev.position; + + // make sure we are not testing the particle against its own position + let are_same = r == Vector3::zero(); + + let dist_sqrd = r.magnitude2(); + + if dist_sqrd < zone_sqrd && !are_same { + let length = dist_sqrd.sqrt(); + let percent = dist_sqrd / zone_sqrd; + + if dist_sqrd < repel { + let f = (repel / percent - 1.0) * 0.025; + let normal = (r / length) * f; + diff += normal; 
+ diff2 += normal; + } else if dist_sqrd < align { + let thresh_delta = align - repel; + let adjusted_percent = (percent - repel) / thresh_delta; + let q = (0.5 - (adjusted_percent * PI * 2.0).cos() * 0.5 + 0.5) * 100.9; + + // normalize vel2 and multiply by factor + let vel2_length = body.velocity2.magnitude(); + let vel2 = (body.velocity2 / vel2_length) * q; + + // normalize own velocity + let vel_length = prev.velocity.magnitude(); + let vel = (prev.velocity / vel_length) * q; + + diff += vel2; + diff2 += vel; + } - diff += vel2; - diff2 += vel; - } + if dist_sqrd > attract { + // attract + let thresh_delta2 = 1.0 - attract; + let adjusted_percent2 = (percent - attract) / thresh_delta2; + let c = + (1.0 - ((adjusted_percent2 * PI * 2.0).cos() * 0.5 + 0.5)) * attract_power; - if dist_sqrd > attract { // attract - let thresh_delta2 = 1.0 - attract; - let adjusted_percent2 = (percent - attract) / thresh_delta2; - let c = (1.0 - ((adjusted_percent2 * PI * 2.0).cos() * 0.5 + 0.5)) * attract_power; + // normalize the distance vector + let d = (r / length) * c; - // normalize the distance vector - let d = (r / length) * c; - - diff += d; - diff2 -= d; - } + diff += d; + diff2 -= d; } + } - (diff, diff2) - }); + (diff, diff2) + }); acc += diff; acc2 += diff2; @@ -294,8 +328,7 @@ fn next_velocity(time: usize, prev: &Body, bodies: &[Body]) -> (Vector3, Ve } /// Compute next velocity of `prev` -fn next_velocity_par(time: usize, prev: &Body, bodies: &[Body]) - -> (Vector3, Vector3) { +fn next_velocity_par(time: usize, prev: &Body, bodies: &[Body]) -> (Vector3, Vector3) { let time = time as f64; let center = Point3 { x: (time / 22.0).cos() * -4200.0, @@ -344,8 +377,9 @@ fn next_velocity_par(time: usize, prev: &Body, bodies: &[Body]) let (diff, diff2) = bodies .par_iter() - .fold(|| (Vector3::zero(), Vector3::zero()), - |(mut diff, mut diff2), body| { + .fold( + || (Vector3::zero(), Vector3::zero()), + |(mut diff, mut diff2), body| { let r = body.position - prev.position; // 
make sure we are not testing the particle against its own position @@ -379,10 +413,12 @@ fn next_velocity_par(time: usize, prev: &Body, bodies: &[Body]) diff2 += vel; } - if dist_sqrd > attract { // attract + if dist_sqrd > attract { + // attract let thresh_delta2 = 1.0 - attract; let adjusted_percent2 = (percent - attract) / thresh_delta2; - let c = (1.0 - ((adjusted_percent2 * PI * 2.0).cos() * 0.5 + 0.5)) * attract_power; + let c = (1.0 - ((adjusted_percent2 * PI * 2.0).cos() * 0.5 + 0.5)) + * attract_power; // normalize the distance vector let d = (r / length) * c; @@ -393,9 +429,12 @@ fn next_velocity_par(time: usize, prev: &Body, bodies: &[Body]) } (diff, diff2) - }) - .reduce(|| (Vector3::zero(), Vector3::zero()), - |(diffa, diff2a), (diffb, diff2b)| (diffa + diffb, diff2a + diff2b)); + }, + ) + .reduce( + || (Vector3::zero(), Vector3::zero()), + |(diffa, diff2a), (diffb, diff2b)| (diffa + diffb, diff2a + diff2b), + ); acc += diff; acc2 += diff2; diff --git a/src/iter/mod.rs b/src/iter/mod.rs index 7180033b2..49cbb7f82 100644 --- a/src/iter/mod.rs +++ b/src/iter/mod.rs @@ -84,6 +84,9 @@ use self::plumbing::*; // e.g. `find::find()`, are always used **prefixed**, so that they // can be readily distinguished. +mod par_bridge; +pub use self::par_bridge::{ParallelBridge, IterBridge}; + mod find; mod find_first_last; mod chain; diff --git a/src/iter/par_bridge.rs b/src/iter/par_bridge.rs new file mode 100644 index 000000000..698ac099b --- /dev/null +++ b/src/iter/par_bridge.rs @@ -0,0 +1,189 @@ +use crossbeam_deque::{Deque, Stealer, Steal}; + +use std::thread::yield_now; +use std::sync::{Mutex, TryLockError}; +use std::sync::atomic::{AtomicUsize, AtomicBool, Ordering}; + +use iter::ParallelIterator; +use iter::plumbing::{UnindexedConsumer, UnindexedProducer, bridge_unindexed, Folder}; +use current_num_threads; + +/// Conversion trait to convert an `Iterator` to a `ParallelIterator`. 
+/// +/// This creates a "bridge" from a sequential iterator to a parallel one, by distributing its items +/// across the Rayon thread pool. This has the advantage of being able to parallelize just about +/// anything, but the resulting `ParallelIterator` can be less efficient than if you started with +/// `par_iter` instead. However, it can still be useful for iterators that are difficult to +/// parallelize by other means, like channels or file or network I/O. +/// +/// The resulting iterator is not guaranteed to keep the order of the original iterator. +/// +/// # Examples +/// +/// To use this trait, take an existing `Iterator` and call `par_bridge` on it. After that, you can +/// use any of the `ParallelIterator` methods: +/// +/// ``` +/// use rayon::iter::ParallelBridge; +/// use rayon::prelude::ParallelIterator; +/// use std::sync::mpsc::channel; +/// +/// let rx = { +/// let (tx, rx) = channel(); +/// +/// tx.send("one!"); +/// tx.send("two!"); +/// tx.send("three!"); +/// +/// rx +/// }; +/// +/// let mut output: Vec<&'static str> = rx.into_iter().par_bridge().collect(); +/// output.sort_unstable(); +/// +/// assert_eq!(&*output, &["one!", "three!", "two!"]); +/// ``` +pub trait ParallelBridge: Sized { + /// Create a bridge from this type to a `ParallelIterator`. + fn par_bridge(self) -> IterBridge; +} + +impl ParallelBridge for T + where T::Item: Send +{ + fn par_bridge(self) -> IterBridge { + IterBridge { + iter: self, + } + } +} + +/// `IterBridge` is a parallel iterator that wraps a sequential iterator. +/// +/// This type is created when using the `par_bridge` method on `ParallelBridge`. See the +/// [`ParallelBridge`] documentation for details. 
+/// +/// [`ParallelBridge`]: trait.ParallelBridge.html +#[derive(Debug, Clone)] +pub struct IterBridge { + iter: Iter, +} + +impl ParallelIterator for IterBridge + where Iter::Item: Send +{ + type Item = Iter::Item; + + fn drive_unindexed(self, consumer: C) -> C::Result + where C: UnindexedConsumer + { + let split_count = AtomicUsize::new(current_num_threads()); + let deque = Deque::new(); + let stealer = deque.stealer(); + let done = AtomicBool::new(false); + let iter = Mutex::new((self.iter, deque)); + + bridge_unindexed(IterParallelProducer { + split_count: &split_count, + done: &done, + iter: &iter, + items: stealer, + }, consumer) + } +} + +struct IterParallelProducer<'a, Iter: Iterator + 'a> { + split_count: &'a AtomicUsize, + done: &'a AtomicBool, + iter: &'a Mutex<(Iter, Deque)>, + items: Stealer, +} + +// manual `Clone` impl because `Iter` doesn't need to be `Clone`, but `#[derive(Clone)]` would require it +impl<'a, Iter: Iterator + 'a> Clone for IterParallelProducer<'a, Iter> { + fn clone(&self) -> Self { + IterParallelProducer { + split_count: self.split_count, + done: self.done, + iter: self.iter, + items: self.items.clone(), + } + } +} + +impl<'a, Iter: Iterator + Send + 'a> UnindexedProducer for IterParallelProducer<'a, Iter> + where Iter::Item: Send +{ + type Item = Iter::Item; + + fn split(self) -> (Self, Option) { + let mut count = self.split_count.load(Ordering::SeqCst); + + loop { + let done = self.done.load(Ordering::SeqCst); + match count.checked_sub(1) { + Some(new_count) if !done => { + let last_count = self.split_count.compare_and_swap(count, new_count, Ordering::SeqCst); + if last_count == count { + return (self.clone(), Some(self)); + } else { + count = last_count; + } + } + _ => { + return (self, None); + } + } + } + } + + fn fold_with(self, mut folder: F) -> F + where F: Folder + { + loop { + match self.items.steal() { + Steal::Data(it) => { + folder = folder.consume(it); + if folder.full() { + return folder; + } + } + Steal::Empty => { + if 
self.done.load(Ordering::SeqCst) { + // the iterator is out of items, no use in continuing + return folder; + } else { + // our cache is out of items, time to load more from the iterator + match self.iter.try_lock() { + Ok(mut guard) => { + let count = current_num_threads(); + let count = (count * count) * 2; + + let (ref mut iter, ref deque) = *guard; + + while deque.len() < count { + if let Some(it) = iter.next() { + deque.push(it); + } else { + self.done.store(true, Ordering::SeqCst); + break; + } + } + } + Err(TryLockError::WouldBlock) => { + // someone else has the mutex, just sit tight until it's ready + yield_now(); //TODO: use a thread-pool-aware yield? (#548) + } + Err(TryLockError::Poisoned(_)) => { + // any panics from other threads will have been caught by the pool, + // and will be re-thrown when joined - just exit + return folder; + } + } + } + } + Steal::Retry => (), + } + } + } +} diff --git a/src/lib.rs b/src/lib.rs index 535bda1f4..ae3ee29a7 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -81,6 +81,7 @@ extern crate rayon_core; extern crate either; +extern crate crossbeam_deque; #[cfg(test)] extern crate rand; diff --git a/src/prelude.rs b/src/prelude.rs index a13d6a378..54b4e8696 100644 --- a/src/prelude.rs +++ b/src/prelude.rs @@ -9,6 +9,7 @@ pub use iter::IntoParallelRefMutIterator; pub use iter::IndexedParallelIterator; pub use iter::ParallelExtend; pub use iter::ParallelIterator; +pub use iter::ParallelBridge; pub use slice::ParallelSlice; pub use slice::ParallelSliceMut; pub use str::ParallelString;