From 037b70a294bb08573cd4c3838c2950a901c88150 Mon Sep 17 00:00:00 2001 From: QuietMisdreavus Date: Wed, 7 Mar 2018 16:31:01 -0600 Subject: [PATCH 01/12] add bridge from Iterator to ParallelIterator --- Cargo.toml | 1 + src/iter/as_parallel.rs | 160 ++++++++++++++++++++++++++++++++++++++++ src/iter/mod.rs | 3 + src/lib.rs | 1 + 4 files changed, 165 insertions(+) create mode 100644 src/iter/as_parallel.rs diff --git a/Cargo.toml b/Cargo.toml index 589272afe..e03d819c9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,6 +18,7 @@ exclude = ["ci"] [dependencies] rayon-core = { version = "1.4", path = "rayon-core" } +crossbeam-deque = "0.2.0" # This is a public dependency! [dependencies.either] diff --git a/src/iter/as_parallel.rs b/src/iter/as_parallel.rs new file mode 100644 index 000000000..57d9fc685 --- /dev/null +++ b/src/iter/as_parallel.rs @@ -0,0 +1,160 @@ +use crossbeam_deque::{Deque, Stealer, Steal}; + +use std::thread::yield_now; +use std::sync::{Mutex, TryLockError}; +use std::sync::atomic::{AtomicUsize, AtomicBool, Ordering}; + +use iter::*; +use current_num_threads; + +/// Conversion trait to convert an `Iterator` to a `ParallelIterator`. +/// +/// This needs to be distinct from `IntoParallelIterator` because that trait is already implemented +/// on a few `Iterator`s, like `std::ops::Range`. +pub trait AsParallel { + /// What is the type of the output `ParallelIterator`? + type Iter: ParallelIterator; + + /// What is the `Item` of the output `ParallelIterator`? + type Item: Send; + + /// Convert this type to a `ParallelIterator`. + fn as_parallel(self) -> Self::Iter; +} + +impl AsParallel for T + where T::Item: Send +{ + type Iter = IterParallel; + type Item = T::Item; + + fn as_parallel(self) -> Self::Iter { + IterParallel { + iter: self, + } + } +} + +/// `IterParallel` is a parallel iterator that wraps a sequential iterator. +#[derive(Debug)] +pub struct IterParallel { + iter: Iter, +} + +impl ParallelIterator for IterParallel + where Iter::Item: Send +{ + type Item = Iter::Item; + + fn drive_unindexed(self, consumer: C) -> C::Result + where C: UnindexedConsumer + { + let split_count = AtomicUsize::new(current_num_threads()); + let deque = Deque::new(); + let stealer = deque.stealer(); + let done = AtomicBool::new(false); + let iter = Mutex::new((self.iter, deque)); + + bridge_unindexed(IterParallelProducer { + split_count: &split_count, + done: &done, + iter: &iter, + items: stealer, + }, consumer) + } +} + +struct IterParallelProducer<'a, Iter: Iterator + 'a> { + split_count: &'a AtomicUsize, + done: &'a AtomicBool, + iter: &'a Mutex<(Iter, Deque)>, + items: Stealer, +} + +// manual clone because T doesn't need to be Clone, but the derive assumes it should be +impl<'a, Iter: Iterator + 'a> Clone for IterParallelProducer<'a, Iter> { + fn clone(&self) -> Self { + IterParallelProducer { + split_count: self.split_count, + done: self.done, + iter: self.iter, + items: self.items.clone(), + } + } +} + +impl<'a, Iter: Iterator + Send + 'a> UnindexedProducer for IterParallelProducer<'a, Iter> + where Iter::Item: Send +{ + type Item = Iter::Item; + + fn split(self) -> (Self, Option) { + let mut count = self.split_count.load(Ordering::SeqCst); + + loop { + let done = self.done.load(Ordering::SeqCst); + match count.checked_sub(1) { + Some(new_count) if !done => { + let last_count = self.split_count.compare_and_swap(count, new_count, Ordering::SeqCst); + if last_count == count { + return (self.clone(), Some(self)); + } else { + count = last_count; + } + } + _ => { + return (self, None); + } + } + } + } + + fn fold_with(self, mut folder: F) -> F + where F: Folder + { + loop { + match self.items.steal() { + Steal::Data(it) => { + folder = folder.consume(it); + if folder.full() { + return folder; + } + } + Steal::Empty => { + if self.done.load(Ordering::SeqCst) { + // the iterator is out of items, no use in continuing + return folder; + } else { + // our cache is out of items, time to load more from the iterator + match self.iter.try_lock() { + Ok(mut guard) => { + let count = current_num_threads(); + let count = (count * count) * 2; + + let (ref mut iter, ref deque) = *guard; + + for _ in 0..count { + if let Some(it) = iter.next() { + deque.push(it); + } else { + self.done.store(true, Ordering::SeqCst); + break; + } + } + } + Err(TryLockError::WouldBlock) => { + // someone else has the mutex, just sit tight until it's ready + yield_now(); //TODO: use a thread=pool-aware yield? (#548) + } + Err(TryLockError::Poisoned(_)) => { + // TODO: how to handle poison? + return folder; + } + } + } + } + Steal::Retry => (), + } + } + } +} diff --git a/src/iter/mod.rs b/src/iter/mod.rs index 7180033b2..4712ad7ff 100644 --- a/src/iter/mod.rs +++ b/src/iter/mod.rs @@ -84,6 +84,9 @@ use self::plumbing::*; // e.g. `find::find()`, are always used **prefixed**, so that they // can be readily distinguished. +mod as_parallel; +pub use self::as_parallel::{AsParallel, IterParallel}; + mod find; mod find_first_last; mod chain; diff --git a/src/lib.rs b/src/lib.rs index 535bda1f4..ae3ee29a7 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -81,6 +81,7 @@ extern crate rayon_core; extern crate either; +extern crate crossbeam_deque; #[cfg(test)] extern crate rand; From 58957d07353331713fffc5725c767f2efdc646da Mon Sep 17 00:00:00 2001 From: QuietMisdreavus Date: Wed, 7 Mar 2018 17:07:06 -0600 Subject: [PATCH 02/12] fix imports --- src/iter/as_parallel.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/iter/as_parallel.rs b/src/iter/as_parallel.rs index 57d9fc685..9f3d5bd6c 100644 --- a/src/iter/as_parallel.rs +++ b/src/iter/as_parallel.rs @@ -4,7 +4,8 @@ use std::thread::yield_now; use std::sync::{Mutex, TryLockError}; use std::sync::atomic::{AtomicUsize, AtomicBool, Ordering}; -use iter::*; +use iter::ParallelIterator; +use iter::plumbing::{UnindexedConsumer, UnindexedProducer, bridge_unindexed, Folder}; use current_num_threads; /// Conversion trait to convert an `Iterator` to a `ParallelIterator`. From c352b2782e8e474ee22fb830503a16d8ea73863d Mon Sep 17 00:00:00 2001 From: QuietMisdreavus Date: Thu, 8 Mar 2018 10:24:31 -0600 Subject: [PATCH 03/12] change note around poisoned mutex --- src/iter/as_parallel.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/iter/as_parallel.rs b/src/iter/as_parallel.rs index 9f3d5bd6c..89140c1e8 100644 --- a/src/iter/as_parallel.rs +++ b/src/iter/as_parallel.rs @@ -148,7 +148,8 @@ impl<'a, Iter: Iterator + Send + 'a> UnindexedProducer for IterParallelProducer< yield_now(); //TODO: use a thread=pool-aware yield? (#548) } Err(TryLockError::Poisoned(_)) => { - // TODO: how to handle poison? + // any panics from other threads will have been caught by the pool, + // and will be re-thrown when joined - just exit return folder; } } From 01f6ecb0ca55f9f903f726e751376e6a7bb00e47 Mon Sep 17 00:00:00 2001 From: QuietMisdreavus Date: Thu, 8 Mar 2018 10:35:26 -0600 Subject: [PATCH 04/12] tweak IterParallelProducer reader behavior --- src/iter/as_parallel.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/iter/as_parallel.rs b/src/iter/as_parallel.rs index 89140c1e8..b765d6563 100644 --- a/src/iter/as_parallel.rs +++ b/src/iter/as_parallel.rs @@ -134,7 +134,7 @@ impl<'a, Iter: Iterator + Send + 'a> UnindexedProducer for IterParallelProducer< let (ref mut iter, ref deque) = *guard; - for _ in 0..count { + while deque.len() < count { if let Some(it) = iter.next() { deque.push(it); } else { From 3e2facc120f1c1caf46af9c2dc7e38688c7aa4ad Mon Sep 17 00:00:00 2001 From: Niko Matsakis Date: Tue, 27 Mar 2018 20:09:16 +0200 Subject: [PATCH 05/12] rustfmt nbody.rs --- rayon-demo/src/nbody/nbody.rs | 168 ++++++++++++++++++---------------- 1 file changed, 89 insertions(+), 79 deletions(-) diff --git a/rayon-demo/src/nbody/nbody.rs b/rayon-demo/src/nbody/nbody.rs index ef4a4a9f0..5d917d299 100644 --- a/rayon-demo/src/nbody/nbody.rs +++ b/rayon-demo/src/nbody/nbody.rs @@ -30,8 +30,8 @@ // [1]: https://github.com/IntelLabs/RiverTrail/blob/master/examples/nbody-webgl/NBody.js use cgmath::{InnerSpace, Point3, Vector3, Zero}; -use rayon::prelude::*; use rand::{Rand, Rng}; +use rayon::prelude::*; use std::f64::consts::PI; const INITIAL_VELOCITY: f64 = 8.0; // set to 0.0 to turn off. @@ -50,8 +50,7 @@ pub struct Body { impl NBodyBenchmark { pub fn new(num_bodies: usize, rng: &mut R) -> NBodyBenchmark { - let bodies0: Vec<_> = - (0..num_bodies) + let bodies0: Vec<_> = (0..num_bodies) .map(|_| { let position = Point3 { x: f64::rand(rng).floor() * 40_000.0, @@ -71,7 +70,11 @@ impl NBodyBenchmark { z: f64::rand(rng) * INITIAL_VELOCITY, }; - Body { position: position, velocity: velocity, velocity2: velocity2 } + Body { + position: position, + velocity: velocity, + velocity2: velocity2, + } }) .collect(); @@ -91,16 +94,17 @@ impl NBodyBenchmark { }; let time = self.time; - out_bodies.par_iter_mut() - .zip(&in_bodies[..]) - .for_each(|(out, prev)| { - let (vel, vel2) = next_velocity(time, prev, in_bodies); - out.velocity = vel; - out.velocity2 = vel2; - - let next_velocity = vel - vel2; - out.position = prev.position + next_velocity; - }); + out_bodies + .par_iter_mut() + .zip(&in_bodies[..]) + .for_each(|(out, prev)| { + let (vel, vel2) = next_velocity(time, prev, in_bodies); + out.velocity = vel; + out.velocity2 = vel2; + + let next_velocity = vel - vel2; + out.position = prev.position + next_velocity; + }); self.time += 1; @@ -115,16 +119,17 @@ impl NBodyBenchmark { }; let time = self.time; - out_bodies.par_iter_mut() - .zip(&in_bodies[..]) - .for_each(|(out, prev)| { - let (vel, vel2) = next_velocity_par(time, prev, in_bodies); - out.velocity = vel; - out.velocity2 = vel2; - - let next_velocity = vel - vel2; - out.position = prev.position + next_velocity; - }); + out_bodies + .par_iter_mut() + .zip(&in_bodies[..]) + .for_each(|(out, prev)| { + let (vel, vel2) = next_velocity_par(time, prev, in_bodies); + out.velocity = vel; + out.velocity2 = vel2; + + let next_velocity = vel - vel2; + out.position = prev.position + next_velocity; + }); self.time += 1; @@ -207,57 +212,57 @@ fn next_velocity(time: usize, prev: &Body, bodies: &[Body]) -> (Vector3, Ve let zero: Vector3 = Vector3::zero(); let (diff, diff2) = bodies .iter() - .fold( - (zero, zero), - |(mut diff, mut diff2), body| { - let r = body.position - prev.position; - - // make sure we are not testing the particle against its own position - let are_same = r == Vector3::zero(); - - let dist_sqrd = r.magnitude2(); - - if dist_sqrd < zone_sqrd && !are_same { - let length = dist_sqrd.sqrt(); - let percent = dist_sqrd / zone_sqrd; - - if dist_sqrd < repel { - let f = (repel / percent - 1.0) * 0.025; - let normal = (r / length) * f; - diff += normal; - diff2 += normal; - } else if dist_sqrd < align { - let thresh_delta = align - repel; - let adjusted_percent = (percent - repel) / thresh_delta; - let q = (0.5 - (adjusted_percent * PI * 2.0).cos() * 0.5 + 0.5) * 100.9; - - // normalize vel2 and multiply by factor - let vel2_length = body.velocity2.magnitude(); - let vel2 = (body.velocity2 / vel2_length) * q; - - // normalize own velocity - let vel_length = prev.velocity.magnitude(); - let vel = (prev.velocity / vel_length) * q; - - diff += vel2; - diff2 += vel; - } + .fold((zero, zero), |(mut diff, mut diff2), body| { + let r = body.position - prev.position; + + // make sure we are not testing the particle against its own position + let are_same = r == Vector3::zero(); + + let dist_sqrd = r.magnitude2(); + + if dist_sqrd < zone_sqrd && !are_same { + let length = dist_sqrd.sqrt(); + let percent = dist_sqrd / zone_sqrd; + + if dist_sqrd < repel { + let f = (repel / percent - 1.0) * 0.025; + let normal = (r / length) * f; + diff += normal; + diff2 += normal; + } else if dist_sqrd < align { + let thresh_delta = align - repel; + let adjusted_percent = (percent - repel) / thresh_delta; + let q = (0.5 - (adjusted_percent * PI * 2.0).cos() * 0.5 + 0.5) * 100.9; + + // normalize vel2 and multiply by factor + let vel2_length = body.velocity2.magnitude(); + let vel2 = (body.velocity2 / vel2_length) * q; + + // normalize own velocity + let vel_length = prev.velocity.magnitude(); + let vel = (prev.velocity / vel_length) * q; + + diff += vel2; + diff2 += vel; + } - if dist_sqrd > attract { // attract - let thresh_delta2 = 1.0 - attract; - let adjusted_percent2 = (percent - attract) / thresh_delta2; - let c = (1.0 - ((adjusted_percent2 * PI * 2.0).cos() * 0.5 + 0.5)) * attract_power; + if dist_sqrd > attract { + // attract + let thresh_delta2 = 1.0 - attract; + let adjusted_percent2 = (percent - attract) / thresh_delta2; + let c = + (1.0 - ((adjusted_percent2 * PI * 2.0).cos() * 0.5 + 0.5)) * attract_power; - // normalize the distance vector - let d = (r / length) * c; + // normalize the distance vector + let d = (r / length) * c; - diff += d; - diff2 -= d; - } + diff += d; + diff2 -= d; } + } - (diff, diff2) - }); + (diff, diff2) + }); acc += diff; acc2 += diff2; @@ -294,8 +299,7 @@ fn next_velocity(time: usize, prev: &Body, bodies: &[Body]) -> (Vector3, Ve } /// Compute next velocity of `prev` -fn next_velocity_par(time: usize, prev: &Body, bodies: &[Body]) - -> (Vector3, Vector3) { +fn next_velocity_par(time: usize, prev: &Body, bodies: &[Body]) -> (Vector3, Vector3) { let time = time as f64; let center = Point3 { x: (time / 22.0).cos() * -4200.0, @@ -344,8 +348,9 @@ fn next_velocity_par(time: usize, prev: &Body, bodies: &[Body]) let (diff, diff2) = bodies .par_iter() - .fold(|| (Vector3::zero(), Vector3::zero()), - |(mut diff, mut diff2), body| { + .fold( + || (Vector3::zero(), Vector3::zero()), + |(mut diff, mut diff2), body| { let r = body.position - prev.position; // make sure we are not testing the particle against its own position @@ -379,10 +384,12 @@ fn next_velocity_par(time: usize, prev: &Body, bodies: &[Body]) diff2 += vel; } - if dist_sqrd > attract { // attract + if dist_sqrd > attract { + // attract let thresh_delta2 = 1.0 - attract; let adjusted_percent2 = (percent - attract) / thresh_delta2; - let c = (1.0 - ((adjusted_percent2 * PI * 2.0).cos() * 0.5 + 0.5)) * attract_power; + let c = (1.0 - ((adjusted_percent2 * PI * 2.0).cos() * 0.5 + 0.5)) + * attract_power; // normalize the distance vector let d = (r / length) * c; @@ -393,9 +400,12 @@ fn next_velocity_par(time: usize, prev: &Body, bodies: &[Body]) } (diff, diff2) - }) - .reduce(|| (Vector3::zero(), Vector3::zero()), - |(diffa, diff2a), (diffb, diff2b)| (diffa + diffb, diff2a + diff2b)); + }, + ) + .reduce( + || (Vector3::zero(), Vector3::zero()), + |(diffa, diff2a), (diffb, diff2b)| (diffa + diffb, diff2a + diff2b), + ); acc += diff; acc2 += diff2; From 3b1fa384accc9e62603b5d5b75fc6d02b3ca287b Mon Sep 17 00:00:00 2001 From: Niko Matsakis Date: Tue, 27 Mar 2018 20:09:51 +0200 Subject: [PATCH 06/12] use as_parallel --- rayon-demo/src/nbody/nbody.rs | 26 ++++++++++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/rayon-demo/src/nbody/nbody.rs b/rayon-demo/src/nbody/nbody.rs index 5d917d299..c3193c883 100644 --- a/rayon-demo/src/nbody/nbody.rs +++ b/rayon-demo/src/nbody/nbody.rs @@ -111,6 +111,32 @@ impl NBodyBenchmark { out_bodies } + pub fn tick_par_as_parallel(&mut self) -> &[Body] { + let (in_bodies, out_bodies) = if (self.time & 1) == 0 { + (&self.bodies.0, &mut self.bodies.1) + } else { + (&self.bodies.1, &mut self.bodies.0) + }; + + let time = self.time; + out_bodies + .iter_mut() + .as_parallel() + .zip(&in_bodies[..]) + .for_each(|(out, prev)| { + let (vel, vel2) = next_velocity(time, prev, in_bodies); + out.velocity = vel; + out.velocity2 = vel2; + + let next_velocity = vel - vel2; + out.position = prev.position + next_velocity; + }); + + self.time += 1; + + out_bodies + } + pub fn tick_par_reduce(&mut self) -> &[Body] { let (in_bodies, out_bodies) = if (self.time & 1) == 0 { (&self.bodies.0, &mut self.bodies.1) From 1758509e71ec221e3e54fb577dec16647d52874b Mon Sep 17 00:00:00 2001 From: Niko Matsakis Date: Tue, 27 Mar 2018 20:16:27 +0200 Subject: [PATCH 07/12] make a version of nbody that uses `as_parallel` --- rayon-demo/src/nbody/bench.rs | 5 +++++ rayon-demo/src/nbody/nbody.rs | 5 ++++- 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/rayon-demo/src/nbody/bench.rs b/rayon-demo/src/nbody/bench.rs index 3b2fbfb1a..dad713b84 100644 --- a/rayon-demo/src/nbody/bench.rs +++ b/rayon-demo/src/nbody/bench.rs @@ -30,6 +30,11 @@ fn nbody_par(b: &mut ::test::Bencher) { nbody_bench(b, |n| { n.tick_par(); }); } +#[bench] +fn nbody_par_as_parallel(b: &mut ::test::Bencher) { + nbody_bench(b, |n| { n.tick_par_as_parallel(); }); +} + #[bench] fn nbody_parreduce(b: &mut ::test::Bencher) { nbody_bench(b, |n| { n.tick_par_reduce(); }); diff --git a/rayon-demo/src/nbody/nbody.rs b/rayon-demo/src/nbody/nbody.rs index c3193c883..41855f5d0 100644 --- a/rayon-demo/src/nbody/nbody.rs +++ b/rayon-demo/src/nbody/nbody.rs @@ -32,6 +32,8 @@ use cgmath::{InnerSpace, Point3, Vector3, Zero}; use rand::{Rand, Rng}; use rayon::prelude::*; +#[cfg(test)] +use rayon::iter::AsParallel; use std::f64::consts::PI; const INITIAL_VELOCITY: f64 = 8.0; // set to 0.0 to turn off. @@ -111,6 +113,7 @@ impl NBodyBenchmark { out_bodies } + #[cfg(test)] pub fn tick_par_as_parallel(&mut self) -> &[Body] { let (in_bodies, out_bodies) = if (self.time & 1) == 0 { (&self.bodies.0, &mut self.bodies.1) @@ -121,8 +124,8 @@ impl NBodyBenchmark { let time = self.time; out_bodies .iter_mut() - .as_parallel() .zip(&in_bodies[..]) + .as_parallel() .for_each(|(out, prev)| { let (vel, vel2) = next_velocity(time, prev, in_bodies); out.velocity = vel; From 5ea74c10d8a994d01b2607366aea72796e00df99 Mon Sep 17 00:00:00 2001 From: Niko Matsakis Date: Tue, 27 Mar 2018 20:27:37 +0200 Subject: [PATCH 08/12] add game of life benchmark --- rayon-demo/src/life/bench.rs | 5 +++++ rayon-demo/src/life/mod.rs | 19 +++++++++++++++++++ 2 files changed, 24 insertions(+) diff --git a/rayon-demo/src/life/bench.rs b/rayon-demo/src/life/bench.rs index aa6ff53b7..b81717065 100644 --- a/rayon-demo/src/life/bench.rs +++ b/rayon-demo/src/life/bench.rs @@ -9,3 +9,8 @@ fn generations(b: &mut ::test::Bencher) { fn parallel_generations(b: &mut ::test::Bencher) { b.iter(|| super::parallel_generations(Board::new(200, 200).random(), 100)); } + +#[bench] +fn as_parallel_generations(b: &mut ::test::Bencher) { + b.iter(|| super::as_parallel_generations(Board::new(200, 200).random(), 100)); +} diff --git a/rayon-demo/src/life/mod.rs b/rayon-demo/src/life/mod.rs index 4868f1c65..f65a09f9d 100644 --- a/rayon-demo/src/life/mod.rs +++ b/rayon-demo/src/life/mod.rs @@ -20,6 +20,7 @@ use time; use docopt::Docopt; use rayon::prelude::*; +use rayon::iter::AsParallel; #[cfg(test)] mod bench; @@ -93,6 +94,15 @@ impl Board { self.next_board(new_brd) } + pub fn as_parallel_next_generation(&self) -> Board { + let new_brd = (0..self.len()) + .as_parallel() + .map(|cell| self.successor_cell(cell)) + .collect(); + + self.next_board(new_brd) + } + fn cell_live(&self, x: usize, y: usize) -> bool { !(x >= self.cols || y >= self.rows) && self.board[y * self.cols + x] } @@ -145,6 +155,11 @@ fn parallel_generations(board: Board, gens: usize) { for _ in 0..gens { brd = brd.parallel_next_generation(); } } +fn as_parallel_generations(board: Board, gens: usize) { + let mut brd = board; + for _ in 0..gens { brd = brd.as_parallel_next_generation(); } +} + fn measure(f: fn(Board, usize) -> (), args: &Args) -> u64 { let (n, gens) = (args.flag_size, args.flag_gens); let brd = Board::new(n, n).random(); @@ -168,5 +183,9 @@ pub fn main(args: &[String]) { let parallel = measure(parallel_generations, &args); println!("parallel: {:10} ns -> {:.2}x speedup", parallel, serial as f64 / parallel as f64); + + let as_parallel = measure(as_parallel_generations, &args); + println!("as_parallel: {:10} ns -> {:.2}x speedup", as_parallel, + serial as f64 / as_parallel as f64); } } From 3f7a6584874053d2db68ef57b47a12a56408a9ad Mon Sep 17 00:00:00 2001 From: QuietMisdreavus Date: Wed, 25 Apr 2018 16:49:35 -0500 Subject: [PATCH 09/12] rename AsParallel to ParallelBridge --- rayon-demo/src/life/bench.rs | 2 +- rayon-demo/src/life/mod.rs | 16 ++++++++-------- rayon-demo/src/nbody/bench.rs | 4 ++-- rayon-demo/src/nbody/nbody.rs | 6 +++--- src/iter/mod.rs | 4 ++-- src/iter/{as_parallel.rs => par_bridge.rs} | 8 ++++---- 6 files changed, 20 insertions(+), 20 deletions(-) rename src/iter/{as_parallel.rs => par_bridge.rs} (97%) diff --git a/rayon-demo/src/life/bench.rs b/rayon-demo/src/life/bench.rs index b81717065..c589b1c79 100644 --- a/rayon-demo/src/life/bench.rs +++ b/rayon-demo/src/life/bench.rs @@ -12,5 +12,5 @@ fn parallel_generations(b: &mut ::test::Bencher) { #[bench] fn as_parallel_generations(b: &mut ::test::Bencher) { - b.iter(|| super::as_parallel_generations(Board::new(200, 200).random(), 100)); + b.iter(|| super::par_bridge_generations(Board::new(200, 200).random(), 100)); } diff --git a/rayon-demo/src/life/mod.rs b/rayon-demo/src/life/mod.rs index f65a09f9d..2d7b99ea7 100644 --- a/rayon-demo/src/life/mod.rs +++ b/rayon-demo/src/life/mod.rs @@ -20,7 +20,7 @@ use time; use docopt::Docopt; use rayon::prelude::*; -use rayon::iter::AsParallel; +use rayon::iter::ParallelBridge; #[cfg(test)] mod bench; @@ -94,9 +94,9 @@ impl Board { self.next_board(new_brd) } - pub fn as_parallel_next_generation(&self) -> Board { + pub fn par_bridge_next_generation(&self) -> Board { let new_brd = (0..self.len()) - .as_parallel() + .par_bridge() .map(|cell| self.successor_cell(cell)) .collect(); @@ -155,9 +155,9 @@ fn parallel_generations(board: Board, gens: usize) { for _ in 0..gens { brd = brd.parallel_next_generation(); } } -fn as_parallel_generations(board: Board, gens: usize) { +fn par_bridge_generations(board: Board, gens: usize) { let mut brd = board; - for _ in 0..gens { brd = brd.as_parallel_next_generation(); } + for _ in 0..gens { brd = brd.par_bridge_next_generation(); } } fn measure(f: fn(Board, usize) -> (), args: &Args) -> u64 { @@ -184,8 +184,8 @@ pub fn main(args: &[String]) { println!("parallel: {:10} ns -> {:.2}x speedup", parallel, serial as f64 / parallel as f64); - let as_parallel = measure(as_parallel_generations, &args); - println!("as_parallel: {:10} ns -> {:.2}x speedup", as_parallel, - serial as f64 / as_parallel as f64); + let par_bridge = measure(par_bridge_generations, &args); + println!("par_bridge: {:10} ns -> {:.2}x speedup", par_bridge, + serial as f64 / par_bridge as f64); } } diff --git a/rayon-demo/src/nbody/bench.rs b/rayon-demo/src/nbody/bench.rs index dad713b84..d772a8581 100644 --- a/rayon-demo/src/nbody/bench.rs +++ b/rayon-demo/src/nbody/bench.rs @@ -31,8 +31,8 @@ fn nbody_par(b: &mut ::test::Bencher) { } #[bench] -fn nbody_par_as_parallel(b: &mut ::test::Bencher) { - nbody_bench(b, |n| { n.tick_par_as_parallel(); }); +fn nbody_par_bridge(b: &mut ::test::Bencher) { + nbody_bench(b, |n| { n.tick_par_bridge(); }); } #[bench] diff --git a/rayon-demo/src/nbody/nbody.rs b/rayon-demo/src/nbody/nbody.rs index 41855f5d0..43c93d16b 100644 --- a/rayon-demo/src/nbody/nbody.rs +++ b/rayon-demo/src/nbody/nbody.rs @@ -33,7 +33,7 @@ use cgmath::{InnerSpace, Point3, Vector3, Zero}; use rand::{Rand, Rng}; use rayon::prelude::*; #[cfg(test)] -use rayon::iter::AsParallel; +use rayon::iter::ParallelBridge; use std::f64::consts::PI; const INITIAL_VELOCITY: f64 = 8.0; // set to 0.0 to turn off. @@ -114,7 +114,7 @@ impl NBodyBenchmark { } #[cfg(test)] - pub fn tick_par_as_parallel(&mut self) -> &[Body] { + pub fn tick_par_bridge(&mut self) -> &[Body] { let (in_bodies, out_bodies) = if (self.time & 1) == 0 { (&self.bodies.0, &mut self.bodies.1) } else { @@ -125,7 +125,7 @@ impl NBodyBenchmark { out_bodies .iter_mut() .zip(&in_bodies[..]) - .as_parallel() + .par_bridge() .for_each(|(out, prev)| { let (vel, vel2) = next_velocity(time, prev, in_bodies); out.velocity = vel; diff --git a/src/iter/mod.rs b/src/iter/mod.rs index 4712ad7ff..1ddceddbf 100644 --- a/src/iter/mod.rs +++ b/src/iter/mod.rs @@ -84,8 +84,8 @@ use self::plumbing::*; // e.g. `find::find()`, are always used **prefixed**, so that they // can be readily distinguished. -mod as_parallel; -pub use self::as_parallel::{AsParallel, IterParallel}; +mod par_bridge; +pub use self::par_bridge::{ParallelBridge, IterParallel}; mod find; mod find_first_last; diff --git a/src/iter/as_parallel.rs b/src/iter/par_bridge.rs similarity index 97% rename from src/iter/as_parallel.rs rename to src/iter/par_bridge.rs index b765d6563..1f167beca 100644 --- a/src/iter/as_parallel.rs +++ b/src/iter/par_bridge.rs @@ -12,7 +12,7 @@ use current_num_threads; /// /// This needs to be distinct from `IntoParallelIterator` because that trait is already implemented /// on a few `Iterator`s, like `std::ops::Range`. -pub trait AsParallel { +pub trait ParallelBridge { /// What is the type of the output `ParallelIterator`? type Iter: ParallelIterator; @@ -20,16 +20,16 @@ pub trait AsParallel { type Item: Send; /// Convert this type to a `ParallelIterator`. - fn as_parallel(self) -> Self::Iter; + fn par_bridge(self) -> Self::Iter; } -impl AsParallel for T +impl ParallelBridge for T where T::Item: Send { type Iter = IterParallel; type Item = T::Item; - fn as_parallel(self) -> Self::Iter { + fn par_bridge(self) -> Self::Iter { IterParallel { iter: self, } From 3d54ea21e1d643cac131e8c18a522d87c24d5947 Mon Sep 17 00:00:00 2001 From: QuietMisdreavus Date: Wed, 30 May 2018 14:17:01 -0500 Subject: [PATCH 10/12] update docs for ParallelBridge and IterParallel --- src/iter/par_bridge.rs | 40 ++++++++++++++++++++++++++++++++++++++-- 1 file changed, 38 insertions(+), 2 deletions(-) diff --git a/src/iter/par_bridge.rs b/src/iter/par_bridge.rs index 1f167beca..8d45e26bb 100644 --- a/src/iter/par_bridge.rs +++ b/src/iter/par_bridge.rs @@ -10,8 +10,39 @@ use current_num_threads; /// Conversion trait to convert an `Iterator` to a `ParallelIterator`. /// -/// This needs to be distinct from `IntoParallelIterator` because that trait is already implemented -/// on a few `Iterator`s, like `std::ops::Range`. +/// This creates a "bridge" from a sequential iterator to a parallel one, by distributing its items +/// across the Rayon thread pool. This has the advantage of being able to parallelize just about +/// anything, but the resulting `ParallelIterator` can be less efficient than if you started with +/// `par_iter` instead. However, it can still be useful for iterators that are difficult to +/// parallelize by other means, like channels or file or network I/O. +/// +/// The resulting iterator is not guaranteed to keep the order of the original iterator. +/// +/// # Examples +/// +/// To use this trait, take an existing `Iterator` and call `par_bridge` on it. After that, you can +/// use any of the `ParallelIterator` methods: +/// +/// ``` +/// use rayon::iter::ParallelBridge; +/// use rayon::prelude::ParallelIterator; +/// use std::sync::mpsc::channel; +/// +/// let rx = { +/// let (tx, rx) = channel(); +/// +/// tx.send("one!"); +/// tx.send("two!"); +/// tx.send("three!"); +/// +/// rx +/// }; +/// +/// let mut output: Vec<&'static str> = rx.into_iter().par_bridge().collect(); +/// output.sort_unstable(); +/// +/// assert_eq!(&*output, &["one!", "three!", "two!"]); +/// ``` pub trait ParallelBridge { /// What is the type of the output `ParallelIterator`? type Iter: ParallelIterator; @@ -37,6 +68,11 @@ impl ParallelBridge for T } /// `IterParallel` is a parallel iterator that wraps a sequential iterator. +/// +/// This type is created when using the `par_bridge` method on `ParallelBridge`. See the +/// [`ParallelBridge`] documentation for details. +/// +/// [`ParallelBridge`]: trait.ParallelBridge.html #[derive(Debug)] pub struct IterParallel { iter: Iter, From 8bf18d76968bb9c780909efd182e35e9dccfb9e7 Mon Sep 17 00:00:00 2001 From: QuietMisdreavus Date: Wed, 6 Jun 2018 13:19:24 -0500 Subject: [PATCH 11/12] tweak ParallelBridge and IterBridge --- src/iter/mod.rs | 2 +- src/iter/par_bridge.rs | 25 ++++++++----------------- 2 files changed, 9 insertions(+), 18 deletions(-) diff --git a/src/iter/mod.rs b/src/iter/mod.rs index 1ddceddbf..49cbb7f82 100644 --- a/src/iter/mod.rs +++ b/src/iter/mod.rs @@ -85,7 +85,7 @@ use self::plumbing::*; // can be readily distinguished. mod par_bridge; -pub use self::par_bridge::{ParallelBridge, IterParallel}; +pub use self::par_bridge::{ParallelBridge, IterBridge}; mod find; mod find_first_last; diff --git a/src/iter/par_bridge.rs b/src/iter/par_bridge.rs index 8d45e26bb..69dae0d52 100644 --- a/src/iter/par_bridge.rs +++ b/src/iter/par_bridge.rs @@ -43,42 +43,33 @@ use current_num_threads; /// /// assert_eq!(&*output, &["one!", "three!", "two!"]); /// ``` -pub trait ParallelBridge { - /// What is the type of the output `ParallelIterator`? - type Iter: ParallelIterator; - - /// What is the `Item` of the output `ParallelIterator`? - type Item: Send; - - /// Convert this type to a `ParallelIterator`. - fn par_bridge(self) -> Self::Iter; +pub trait ParallelBridge: Sized { + /// Create a bridge from this type to a `ParallelIterator`. + fn par_bridge(self) -> IterBridge; } impl ParallelBridge for T where T::Item: Send { - type Iter = IterParallel; - type Item = T::Item; - - fn par_bridge(self) -> Self::Iter { - IterParallel { + fn par_bridge(self) -> IterBridge { + IterBridge { iter: self, } } } -/// `IterParallel` is a parallel iterator that wraps a sequential iterator. +/// `IterBridge` is a parallel iterator that wraps a sequential iterator. /// /// This type is created when using the `par_bridge` method on `ParallelBridge`. See the /// [`ParallelBridge`] documentation for details. /// /// [`ParallelBridge`]: trait.ParallelBridge.html #[derive(Debug)] -pub struct IterParallel { +pub struct IterBridge { iter: Iter, } -impl ParallelIterator for IterParallel +impl ParallelIterator for IterBridge where Iter::Item: Send { type Item = Iter::Item; From d488733c5ac381bbf283e6e881bdf280268a3254 Mon Sep 17 00:00:00 2001 From: QuietMisdreavus Date: Wed, 6 Jun 2018 14:21:37 -0500 Subject: [PATCH 12/12] updates to IterBridge + add ParallelBridge to prelude --- src/iter/par_bridge.rs | 2 +- src/prelude.rs | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/src/iter/par_bridge.rs b/src/iter/par_bridge.rs index 69dae0d52..698ac099b 100644 --- a/src/iter/par_bridge.rs +++ b/src/iter/par_bridge.rs @@ -64,7 +64,7 @@ impl ParallelBridge for T /// [`ParallelBridge`] documentation for details. /// /// [`ParallelBridge`]: trait.ParallelBridge.html -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct IterBridge { iter: Iter, } diff --git a/src/prelude.rs b/src/prelude.rs index a13d6a378..54b4e8696 100644 --- a/src/prelude.rs +++ b/src/prelude.rs @@ -9,6 +9,7 @@ pub use iter::IntoParallelRefMutIterator; pub use iter::IndexedParallelIterator; pub use iter::ParallelExtend; pub use iter::ParallelIterator; +pub use iter::ParallelBridge; pub use slice::ParallelSlice; pub use slice::ParallelSliceMut; pub use str::ParallelString;