From bd831992681966c1188aa16f2ca6111ffd8bbcc9 Mon Sep 17 00:00:00 2001 From: xiaoniu Date: Wed, 4 Mar 2020 18:27:28 -0500 Subject: [PATCH 1/7] add indexed scheduler api --- src/iter/mod.rs | 15 +++++ src/iter/scheduler.rs | 147 ++++++++++++++++++++++++++++++++++++++++++ src/iter/test.rs | 42 ++++++++++++ 3 files changed, 204 insertions(+) create mode 100644 src/iter/scheduler.rs diff --git a/src/iter/mod.rs b/src/iter/mod.rs index d571d3729..85df061b4 100644 --- a/src/iter/mod.rs +++ b/src/iter/mod.rs @@ -100,6 +100,11 @@ use std::ops::Fn; mod par_bridge; pub use self::par_bridge::{IterBridge, ParallelBridge}; +mod scheduler; +pub use self::scheduler::{ + DefaultScheduler, Scheduler, UnindexedScheduler, WithScheduler, WithUnindexedScheduler, +}; + mod chain; mod find; mod find_first_last; @@ -349,6 +354,11 @@ pub trait ParallelIterator: Sized + Send { /// [`for_each`]: #method.for_each type Item: Send; + /// Sets the "scheduler" that this thread-pool will use. + fn with_unindexed_scheduler(self, scheduler: S) -> WithUnindexedScheduler { + WithUnindexedScheduler::new(self, scheduler) + } + /// Executes `OP` on each item produced by the iterator, in parallel. /// /// # Examples @@ -2085,6 +2095,11 @@ impl IntoParallelIterator for T { /// /// **Note:** Not implemented for `u64`, `i64`, `u128`, or `i128` ranges pub trait IndexedParallelIterator: ParallelIterator { + /// Sets the "scheduler" that this thread-pool will use. + fn with_scheduler(self, scheduler: S) -> WithScheduler { + WithScheduler::new(self, scheduler) + } + /// Collects the results of the iterator into the specified /// vector. The vector is always truncated before execution /// begins. 
If possible, reusing the vector across calls can lead diff --git a/src/iter/scheduler.rs b/src/iter/scheduler.rs new file mode 100644 index 000000000..81e059d09 --- /dev/null +++ b/src/iter/scheduler.rs @@ -0,0 +1,147 @@ +use super::plumbing::*; +use super::*; + +/// Scheduler for Indexed Parallel Iterator +pub trait Scheduler: Send { + /// Consume one Indexed Producer and one Indexed Consumer. Split the work acordingly. + fn bridge(&mut self, len: usize, producer: P, consumer: C) -> C::Result + where + P: Producer, + C: Consumer; +} + +/// Scheduler for Unindexed Parallel Iterator +pub trait UnindexedScheduler: Send { + /// Consume one Unindexed Producer and one Unindexed Consumer. Split the work acordingly. + fn bridge_unindexed(&mut self, producer: P, consumer: C) -> C::Result + where + P: UnindexedProducer, + C: UnindexedConsumer; +} + +/// Default Scheduler. +/// When used as Indexed Scheduler, Thief-splitting will be used. +/// When used as Unindexed Scheduler, tasks will be divided to minimum piece. +#[derive(Debug, Clone, Default)] +pub struct DefaultScheduler; + +impl Scheduler for DefaultScheduler { + fn bridge(&mut self, len: usize, producer: P, consumer: C) -> C::Result + where + P: Producer, + C: Consumer, + { + bridge_producer_consumer(len, producer, consumer) + } +} + +impl UnindexedScheduler for DefaultScheduler { + fn bridge_unindexed(&mut self, producer: P, consumer: C) -> C::Result + where + P: UnindexedProducer, + C: UnindexedConsumer, + { + bridge_unindexed(producer, consumer) + } +} + +/// `WithScheduler` is an iterator that enclose one Indexed Scheduler. +#[derive(Debug)] +pub struct WithScheduler { + base: I, + scheduler: S, +} + +/// `WithUnindexedScheduler` is an iterator that enclose one Unindexed Scheduler. +#[derive(Debug)] +pub struct WithUnindexedScheduler { + base: I, + scheduler: S, +} + +impl WithScheduler { + /// Create a new `WithScheduler` iterator.
+ pub(super) fn new(base: I, scheduler: S) -> Self { + WithScheduler { base, scheduler } + } +} + +impl WithUnindexedScheduler { + /// Create a new `WithUnindexedScheduler` iterator. + pub(super) fn new(base: I, scheduler: S) -> Self { + WithUnindexedScheduler { base, scheduler } + } +} + +impl ParallelIterator for WithScheduler { + type Item = I::Item; + + fn drive_unindexed(self, consumer: C) -> C::Result + where + C: UnindexedConsumer, + { + IndexedParallelIterator::drive(self, consumer) + } + + fn opt_len(&self) -> Option { + self.base.opt_len() + } +} + +impl IndexedParallelIterator for WithScheduler { + fn drive(self, consumer: C) -> C::Result + where + C: Consumer, + { + let len = self.base.len(); + return self.base.with_producer(Callback { + len: len, + scheduler: self.scheduler, + consumer: consumer, + }); + + struct Callback { + len: usize, + scheduler: S, + consumer: C, + } + + impl ProducerCallback for Callback + where + S: Scheduler, + C: Consumer, + { + type Output = C::Result; + fn callback

(mut self, producer: P) -> C::Result + where + P: Producer, + { + self.scheduler.bridge(self.len, producer, self.consumer) + } + } + } + fn len(&self) -> usize { + self.base.len() + } + fn with_producer(self, _callback: CB) -> CB::Output + where + CB: ProducerCallback, + { + panic!("After scheduler, the iterator should not be used as producer."); + } +} + +impl ParallelIterator for WithUnindexedScheduler { + type Item = I::Item; + + fn drive_unindexed(self, _consumer: C) -> C::Result + where + C: UnindexedConsumer, + { + todo!() + } + + fn opt_len(&self) -> Option { + self.base.opt_len() + } +} diff --git a/src/iter/test.rs b/src/iter/test.rs index bc5106bec..75bfa7130 100644 --- a/src/iter/test.rs +++ b/src/iter/test.rs @@ -2186,3 +2186,45 @@ fn check_update() { assert_eq!(v, vec![vec![1, 0], vec![3, 2, 1, 0]]); } + +#[test] +fn check_schedule() { + let v = vec![1, 2, 3]; + + let r: i32 = v + .par_iter() + .with_scheduler(DefaultScheduler) + .map(|v| v * v) + .flat_map(|v| vec![1; v as usize]) + .sum(); + assert_eq!(r, 14); + + let r: i32 = v + .par_iter() + .map(|v| v * v) + .with_scheduler(DefaultScheduler) + .flat_map(|v| vec![1; v as usize]) + .sum(); + assert_eq!(r, 14); + + let r: i32 = v + .par_iter() + .zip(v.par_iter()) + .with_scheduler(DefaultScheduler) + .map(|(a, b)| a * b) + .sum(); + assert_eq!(r, 14); +} + +#[test] +#[should_panic(expected = "After scheduler, the iterator should not be used as producer.")] +fn check_schedule_fail_as_producer() { + let v = vec![1, 2, 3]; + let r: i32 = v + .par_iter() + .with_scheduler(DefaultScheduler) + .zip(v.par_iter()) + .map(|(a, b)| a * b) + .sum(); + assert_eq!(r, 14); +} From 7a9f2940b1b37d3ba2a33386a285e02a4d3baad0 Mon Sep 17 00:00:00 2001 From: xiaoniu Date: Wed, 4 Mar 2020 18:48:20 -0500 Subject: [PATCH 2/7] prevent `WithScheduler` being used as Producer --- src/iter/scheduler.rs | 14 +++++++++++++- src/iter/test.rs | 19 +++++++------------ 2 files changed, 20 insertions(+), 13 deletions(-) diff --git 
a/src/iter/scheduler.rs b/src/iter/scheduler.rs index 81e059d09..109c29b5d 100644 --- a/src/iter/scheduler.rs +++ b/src/iter/scheduler.rs @@ -73,6 +73,18 @@ impl WithUnindexedScheduler { } } +macro_rules! no_link { + ($l:literal) => { + { + extern "C" { + #[link_name = $l] + fn trigger() -> !; + } + unsafe { trigger() } + } + }; +} + impl ParallelIterator for WithScheduler { type Item = I::Item; @@ -127,7 +139,7 @@ impl IndexedParallelIterator for WithS where CB: ProducerCallback, { - panic!("After scheduler, the iterator should not be used as producer."); + no_link!( "\n\nERROR[rayon]: After `with_scheduler`, the iterator should not be used as producer.\nFor example, `Zip` works in producer mode, so `with_scheduler` should not happen before any `zip`.\nFor more information about producer and customer mode, please refer to https://github.com/rayon-rs/rayon/blob/master/src/iter/plumbing/README.md\n") } } diff --git a/src/iter/test.rs b/src/iter/test.rs index 75bfa7130..8628ae3f1 100644 --- a/src/iter/test.rs +++ b/src/iter/test.rs @@ -2214,17 +2214,12 @@ fn check_schedule() { .map(|(a, b)| a * b) .sum(); assert_eq!(r, 14); -} -#[test] -#[should_panic(expected = "After scheduler, the iterator should not be used as producer.")] -fn check_schedule_fail_as_producer() { - let v = vec![1, 2, 3]; - let r: i32 = v - .par_iter() - .with_scheduler(DefaultScheduler) - .zip(v.par_iter()) - .map(|(a, b)| a * b) - .sum(); - assert_eq!(r, 14); + // should not compile + // let r: i32 = v + // .par_iter() + // .zip(v.par_iter()) + // .with_scheduler(DefaultScheduler) + // .map(|(a, b)| a * b) + // .sum(); } From 35ce1298c2bce1a15421397f8c55b55914ca8c3a Mon Sep 17 00:00:00 2001 From: xiaoniu Date: Wed, 4 Mar 2020 19:29:02 -0500 Subject: [PATCH 3/7] refactor module structure --- src/iter/mod.rs | 2 +- src/iter/scheduler/misc.rs | 28 +++++++++++++++++ src/iter/{scheduler.rs => scheduler/mod.rs} | 33 ++++----------------- 3 files changed, 35 insertions(+), 28 deletions(-) create mode 
100644 src/iter/scheduler/misc.rs rename src/iter/{scheduler.rs => scheduler/mod.rs} (83%) diff --git a/src/iter/mod.rs b/src/iter/mod.rs index 85df061b4..0e8860424 100644 --- a/src/iter/mod.rs +++ b/src/iter/mod.rs @@ -100,7 +100,7 @@ use std::ops::Fn; mod par_bridge; pub use self::par_bridge::{IterBridge, ParallelBridge}; -mod scheduler; +pub mod scheduler; pub use self::scheduler::{ DefaultScheduler, Scheduler, UnindexedScheduler, WithScheduler, WithUnindexedScheduler, }; diff --git a/src/iter/scheduler/misc.rs b/src/iter/scheduler/misc.rs new file mode 100644 index 000000000..922116aad --- /dev/null +++ b/src/iter/scheduler/misc.rs @@ -0,0 +1,28 @@ +//! This module contains useful schedulers. +use super::*; + +/// Default Scheduler. +/// When used as Indexed Scheduler, Thief-splitting will be used. +/// When used as Unindexed Scheduler, tasks will be divided to minimum piece. +#[derive(Debug, Clone, Default)] +pub struct DefaultScheduler; + +impl Scheduler for DefaultScheduler { + fn bridge(&mut self, len: usize, producer: P, consumer: C) -> C::Result + where + P: Producer, + C: Consumer, + { + bridge_producer_consumer(len, producer, consumer) + } +} + +impl UnindexedScheduler for DefaultScheduler { + fn bridge_unindexed(&mut self, producer: P, consumer: C) -> C::Result + where + P: UnindexedProducer, + C: UnindexedConsumer, + { + bridge_unindexed(producer, consumer) + } +} diff --git a/src/iter/scheduler.rs b/src/iter/scheduler/mod.rs similarity index 83% rename from src/iter/scheduler.rs rename to src/iter/scheduler/mod.rs index 109c29b5d..3cdf83e90 100644 --- a/src/iter/scheduler.rs +++ b/src/iter/scheduler/mod.rs @@ -1,6 +1,11 @@ -use super::plumbing::*; +//! This module is dedicated to custom scheduler API and useful schedulers. + use super::*; +pub mod misc; + +pub use misc::*; + /// Scheduler for Indexed Parallel Iterator pub trait Scheduler: Send { /// Consume one Indexed Producer and one Indexed Consumer. Split the work acordingly. 
@@ -19,32 +24,6 @@ pub trait UnindexedScheduler: Send { C: UnindexedConsumer; } -/// Default Scheduler. -/// When used as Indexed Scheduler, Thief-splitting will be used. -/// When used as Unindexed Scheduler, tasks will be divided to minimum piece. -#[derive(Debug, Clone, Default)] -pub struct DefaultScheduler; - -impl Scheduler for DefaultScheduler { - fn bridge(&mut self, len: usize, producer: P, consumer: C) -> C::Result - where - P: Producer, - C: Consumer, - { - bridge_producer_consumer(len, producer, consumer) - } -} - -impl UnindexedScheduler for DefaultScheduler { - fn bridge_unindexed(&mut self, producer: P, consumer: C) -> C::Result - where - P: UnindexedProducer, - C: UnindexedConsumer, - { - bridge_unindexed(producer, consumer) - } -} - /// `WithScheduler` is an iterator that enclose one Indexed Scheduler. #[derive(Debug)] pub struct WithScheduler { From 64372ed80727dac114d52b78125b129bc0c0eaa7 Mon Sep 17 00:00:00 2001 From: xiaoniu Date: Wed, 4 Mar 2020 19:35:31 -0500 Subject: [PATCH 4/7] add sequential scheduler --- src/iter/mod.rs | 5 ++--- src/iter/scheduler/misc.rs | 25 +++++++++++++++++++++++++ src/iter/test.rs | 12 ++++++++++++ 3 files changed, 39 insertions(+), 3 deletions(-) diff --git a/src/iter/mod.rs b/src/iter/mod.rs index 0e8860424..60885719f 100644 --- a/src/iter/mod.rs +++ b/src/iter/mod.rs @@ -101,9 +101,8 @@ mod par_bridge; pub use self::par_bridge::{IterBridge, ParallelBridge}; pub mod scheduler; -pub use self::scheduler::{ - DefaultScheduler, Scheduler, UnindexedScheduler, WithScheduler, WithUnindexedScheduler, -}; +pub use self::scheduler::{Scheduler, UnindexedScheduler, WithScheduler, WithUnindexedScheduler}; +pub use self::scheduler::misc::*; mod chain; mod find; diff --git a/src/iter/scheduler/misc.rs b/src/iter/scheduler/misc.rs index 922116aad..9a51046b2 100644 --- a/src/iter/scheduler/misc.rs +++ b/src/iter/scheduler/misc.rs @@ -26,3 +26,28 @@ impl UnindexedScheduler for DefaultScheduler { bridge_unindexed(producer, consumer) 
} } + +/// Dummy Sequential Scheduler. +/// No parallel is used at all. +#[derive(Debug, Clone, Default)] +pub struct SequentialScheduler; + +impl Scheduler for SequentialScheduler { + fn bridge(&mut self, _len: usize, producer: P, consumer: C) -> C::Result + where + P: Producer, + C: Consumer, + { + producer.fold_with(consumer.into_folder()).complete() + } +} + +impl UnindexedScheduler for SequentialScheduler { + fn bridge_unindexed(&mut self, producer: P, consumer: C) -> C::Result + where + P: UnindexedProducer, + C: UnindexedConsumer, + { + producer.fold_with(consumer.into_folder()).complete() + } +} diff --git a/src/iter/test.rs b/src/iter/test.rs index 8628ae3f1..53f8e2e2c 100644 --- a/src/iter/test.rs +++ b/src/iter/test.rs @@ -2223,3 +2223,15 @@ fn check_schedule() { // .map(|(a, b)| a * b) // .sum(); } + +#[test] +fn check_schedule_sequential() { + let v = vec![1, 2, 3]; + let r: i32 = v + .par_iter() + .with_scheduler(SequentialScheduler) + .map(|v| v * v) + .flat_map(|v| vec![1; v as usize]) + .sum(); + assert_eq!(r, 14); +} From fb96d27b94a36f7388189ea9578a6b2592e4d09f Mon Sep 17 00:00:00 2001 From: xiaoniu Date: Wed, 4 Mar 2020 23:10:41 -0500 Subject: [PATCH 5/7] add fixed length scheduler and static scheduler --- src/iter/scheduler/misc.rs | 111 +++++++++++++++++++++++++++++++++++++ src/iter/test.rs | 28 +++++++++- 2 files changed, 137 insertions(+), 2 deletions(-) diff --git a/src/iter/scheduler/misc.rs b/src/iter/scheduler/misc.rs index 9a51046b2..ecdbf4b7e 100644 --- a/src/iter/scheduler/misc.rs +++ b/src/iter/scheduler/misc.rs @@ -51,3 +51,114 @@ impl UnindexedScheduler for SequentialScheduler { producer.fold_with(consumer.into_folder()).complete() } } + +fn static_partition_bridge(positions: &[usize], producer: P, consumer: C) -> C::Result +where + P: Producer, + C: Consumer, +{ + fn helper(positions: &[usize], bias: usize, producer: P, consumer: C) -> C::Result + where + P: Producer, + C: Consumer, + { + if consumer.full() { + 
consumer.into_folder().complete() + } else if positions.len() > 0 { + let mid_index = positions.len() / 2; + let position = positions[mid_index]; + + let (left_producer, right_producer) = producer.split_at(position - bias); + let (left_consumer, right_consumer, reducer) = consumer.split_at(position - bias); + + use crate::join; + let (left_result, right_result) = join( + || helper(&positions[0..mid_index], bias, left_producer, left_consumer), + || { + helper( + &positions[mid_index + 1..], + position, + right_producer, + right_consumer, + ) + }, + ); + reducer.reduce(left_result, right_result) + } else { + producer.fold_with(consumer.into_folder()).complete() + } + } + helper(positions, 0, producer, consumer) +} + +/// Fixed length scheduler. +/// Every task assigned to a thread will contain a fixed number of items, +/// except for the last task, which may contain fewer. +/// The parameter in `with_min_len` and `with_max_len` will be ignored. +#[derive(Debug, Clone, Default)] +pub struct FixedLengthScheduler { + fixed_length: usize, +} + +impl FixedLengthScheduler { + /// Create fixed length scheduler with assigned length. Length must be greater than or equal to 1. + pub fn new(fixed_length: usize) -> Self { + if fixed_length == 0 { + panic!("Length must be greater than or equal to 1.") + }; + Self { fixed_length } + } +} + +impl Scheduler for FixedLengthScheduler { + fn bridge(&mut self, len: usize, producer: P, consumer: C) -> C::Result + where + P: Producer, + C: Consumer, + { + let positions: Vec<_> = (0..len).step_by(self.fixed_length).skip(1).collect(); + static_partition_bridge(&positions, producer, consumer) + } +} + +/// Static split scheduler. +/// Given a chunk size, this scheduler will divide all items evenly based on their +/// length to create `current_num_threads()` number of tasks. +/// The length of each task should be a multiple of the chunk size, except for the last task.
+#[derive(Debug, Clone, Default)] +pub struct StaticScheduler { + chunk_size: usize, +} + +impl StaticScheduler { + /// Create static split scheduler with default chunk size 1. + pub fn new() -> Self { + Self { chunk_size: 1 } + } + /// Create static split scheduler with assigned chunk size. Chunk size must be greater than or equal to 1. + pub fn with_chunk_size(chunk_size: usize) -> Self { + if chunk_size == 0 { + panic!("Chunk size must be greater than or equal to 1.") + }; + Self { + chunk_size: chunk_size, + } + } +} + +impl Scheduler for StaticScheduler { + fn bridge(&mut self, len: usize, producer: P, consumer: C) -> C::Result + where + P: Producer, + C: Consumer, + { + use crate::current_num_threads; + let num_threads = current_num_threads(); + let full_chunks = len / self.chunk_size; + let positions: Vec<_> = (1..num_threads) + .map(|i| (i * full_chunks) / num_threads * self.chunk_size) + .collect(); + + static_partition_bridge(&positions, producer, consumer) + } +} diff --git a/src/iter/test.rs b/src/iter/test.rs index 53f8e2e2c..88ee25881 100644 --- a/src/iter/test.rs +++ b/src/iter/test.rs @@ -2226,12 +2226,36 @@ fn check_schedule() { #[test] fn check_schedule_sequential() { - let v = vec![1, 2, 3]; + let v: Vec<_> = (1..100).collect(); let r: i32 = v .par_iter() .with_scheduler(SequentialScheduler) .map(|v| v * v) .flat_map(|v| vec![1; v as usize]) .sum(); - assert_eq!(r, 14); + assert_eq!(r, 328350); +} + +#[test] +fn check_schedule_fixed_length() { + let v: Vec<_> = (1..100).collect(); + let r: i32 = v + .par_iter() + .with_scheduler(FixedLengthScheduler::new(1)) + .map(|v| v * v) + .flat_map(|v| vec![1; v as usize]) + .sum(); + assert_eq!(r, 328350); +} + +#[test] +fn check_schedule_static() { + let v: Vec<_> = (1..100).collect(); + let r: i32 = v + .par_iter() + .with_scheduler(StaticScheduler::with_chunk_size(1)) + .map(|v| v * v) + .flat_map(|v| vec![1; v as usize]) + .sum(); + assert_eq!(r, 328350); } From 94c107d718b61aecfbea53bfe6af13f2daf1feb9 
Mon Sep 17 00:00:00 2001 From: xiaoniu Date: Thu, 5 Mar 2020 23:33:58 -0500 Subject: [PATCH 6/7] add nbody benchmarks --- rayon-demo/src/nbody/mod.rs | 56 ++++++++++++++++++++++++++++--- rayon-demo/src/nbody/nbody.rs | 28 +++++++++++++++- rayon-demo/src/nbody/visualize.rs | 2 ++ 3 files changed, 80 insertions(+), 6 deletions(-) diff --git a/rayon-demo/src/nbody/mod.rs b/rayon-demo/src/nbody/mod.rs index ab46bc9d2..c9b67f1f6 100644 --- a/rayon-demo/src/nbody/mod.rs +++ b/rayon-demo/src/nbody/mod.rs @@ -45,6 +45,8 @@ Ported from the RiverTrail demo found at: #[derive(Copy, Clone, PartialEq, Eq, serde::Deserialize)] pub enum ExecutionMode { Par, + ParSched, + ParBridge, ParReduce, Seq, } @@ -77,6 +79,8 @@ pub fn main(args: &[String]) { fn run_benchmarks(mode: Option, bodies: usize, ticks: usize) { let run_par = mode.map(|m| m == ExecutionMode::Par).unwrap_or(true); + let run_par_schedule = mode.map(|m| m == ExecutionMode::ParSched).unwrap_or(true); + let run_par_bridge = mode.map(|m| m == ExecutionMode::ParBridge).unwrap_or(true); let run_par_reduce = mode.map(|m| m == ExecutionMode::ParReduce).unwrap_or(true); let run_seq = mode.map(|m| m == ExecutionMode::Seq).unwrap_or(true); @@ -90,7 +94,41 @@ fn run_benchmarks(mode: Option, bodies: usize, ticks: usize) { } let par_time = par_start.elapsed().as_nanos(); - println!("Parallel time : {} ns", par_time); + println!("Parallel time\t: {} ns", par_time); + + Some(par_time) + } else { + None + }; + + let par_schedule_time = if run_par_schedule { + let mut rng = crate::seeded_rng(); + let mut benchmark = NBodyBenchmark::new(bodies, &mut rng); + let par_start = Instant::now(); + + for _ in 0..ticks { + benchmark.tick_par_schedule(); + } + + let par_time = par_start.elapsed().as_nanos(); + println!("ParSched time\t: {} ns", par_time); + + Some(par_time) + } else { + None + }; + + let par_bridge_time = if run_par_bridge { + let mut rng = crate::seeded_rng(); + let mut benchmark = NBodyBenchmark::new(bodies, &mut rng); + let 
par_start = Instant::now(); + + for _ in 0..ticks { + benchmark.tick_par_bridge(); + } + + let par_time = par_start.elapsed().as_nanos(); + println!("ParBridge time\t: {} ns", par_time); Some(par_time) } else { @@ -107,7 +145,7 @@ fn run_benchmarks(mode: Option, bodies: usize, ticks: usize) { } let par_time = par_start.elapsed().as_nanos(); - println!("ParReduce time : {} ns", par_time); + println!("ParReduce time\t: {} ns", par_time); Some(par_time) } else { @@ -124,7 +162,7 @@ fn run_benchmarks(mode: Option, bodies: usize, ticks: usize) { } let seq_time = seq_start.elapsed().as_nanos(); - println!("Sequential time : {} ns", seq_time); + println!("Sequential time\t: {} ns", seq_time); Some(seq_time) } else { @@ -132,10 +170,18 @@ fn run_benchmarks(mode: Option, bodies: usize, ticks: usize) { }; if let (Some(pt), Some(st)) = (par_time, seq_time) { - println!("Parallel speedup : {}", (st as f32) / (pt as f32)); + println!("Parallel speedup\t: {}", (st as f32) / (pt as f32)); + } + + if let (Some(pt), Some(st)) = (par_schedule_time, seq_time) { + println!("ParSched speedup\t: {}", (st as f32) / (pt as f32)); + } + + if let (Some(pt), Some(st)) = (par_bridge_time, seq_time) { + println!("ParBridge speedup\t: {}", (st as f32) / (pt as f32)); } if let (Some(pt), Some(st)) = (par_reduce_time, seq_time) { - println!("ParReduce speedup: {}", (st as f32) / (pt as f32)); + println!("ParReduce speedup\t: {}", (st as f32) / (pt as f32)); } } diff --git a/rayon-demo/src/nbody/nbody.rs b/rayon-demo/src/nbody/nbody.rs index 6bdcc2a91..80c912065 100644 --- a/rayon-demo/src/nbody/nbody.rs +++ b/rayon-demo/src/nbody/nbody.rs @@ -116,7 +116,33 @@ impl NBodyBenchmark { out_bodies } - #[cfg(test)] + pub fn tick_par_schedule(&mut self) -> &[Body] { + let (in_bodies, out_bodies) = if (self.time & 1) == 0 { + (&self.bodies.0, &mut self.bodies.1) + } else { + (&self.bodies.1, &mut self.bodies.0) + }; + + let time = self.time; + use rayon::iter::StaticScheduler; + out_bodies + 
.par_iter_mut() + .zip(&in_bodies[..]) + .with_scheduler(StaticScheduler::new()) + .for_each(|(out, prev)| { + let (vel, vel2) = next_velocity(time, prev, in_bodies); + out.velocity = vel; + out.velocity2 = vel2; + + let next_velocity = vel - vel2; + out.position = prev.position + next_velocity; + }); + + self.time += 1; + + out_bodies + } + pub fn tick_par_bridge(&mut self) -> &[Body] { let (in_bodies, out_bodies) = if (self.time & 1) == 0 { (&self.bodies.0, &mut self.bodies.1) diff --git a/rayon-demo/src/nbody/visualize.rs b/rayon-demo/src/nbody/visualize.rs index c1df8917b..3c43b4c56 100644 --- a/rayon-demo/src/nbody/visualize.rs +++ b/rayon-demo/src/nbody/visualize.rs @@ -141,6 +141,8 @@ pub fn visualize_benchmarks(num_bodies: usize, mut mode: ExecutionMode) { Event::MainEventsCleared => { let bodies = match mode { ExecutionMode::Par => benchmark.tick_par(), + ExecutionMode::ParSched => benchmark.tick_par_schedule(), + ExecutionMode::ParBridge => benchmark.tick_par_bridge(), ExecutionMode::ParReduce => benchmark.tick_par_reduce(), ExecutionMode::Seq => benchmark.tick_seq(), }; From 824dd91c150e1868aa706c57c11498bede2ce9cf Mon Sep 17 00:00:00 2001 From: xiaoniu Date: Fri, 6 Mar 2020 16:38:56 -0500 Subject: [PATCH 7/7] add collect benchmarks --- rayon-demo/src/vec_collect.rs | 37 +++++++++++++++++++++++++++++++++++ 1 file changed, 37 insertions(+) diff --git a/rayon-demo/src/vec_collect.rs b/rayon-demo/src/vec_collect.rs index ba7b1f469..95517d2f2 100644 --- a/rayon-demo/src/vec_collect.rs +++ b/rayon-demo/src/vec_collect.rs @@ -227,3 +227,40 @@ mod vec_i_filtered { make_bench!(generate, check); } + +/// Tests a big vector of i forall i in 0 to N, with static scheduler. 
+mod vec_i_schedule { + use rayon::prelude::*; + + const N: u32 = 4 * 1024 * 1024; + + fn generate() -> impl IndexedParallelIterator { + (0_u32..N) + .into_par_iter() + .with_scheduler(rayon::iter::StaticScheduler::with_chunk_size(4)) + } + + fn check(v: &[u32]) { + assert!(v.iter().cloned().eq(0..N)); + } + + #[bench] + fn with_collect_into_vec(b: &mut ::test::Bencher) { + let mut vec = None; + b.iter(|| { + let mut v = vec![]; + generate().collect_into_vec(&mut v); + vec = Some(v); + }); + check(&vec.unwrap()); + } + + #[bench] + fn with_collect_into_vec_reused(b: &mut ::test::Bencher) { + let mut vec = vec![]; + b.iter(|| generate().collect_into_vec(&mut vec)); + check(&vec); + } + + make_bench!(generate, check); +}