diff --git a/rayon-demo/src/nbody/mod.rs b/rayon-demo/src/nbody/mod.rs index ab46bc9d2..c9b67f1f6 100644 --- a/rayon-demo/src/nbody/mod.rs +++ b/rayon-demo/src/nbody/mod.rs @@ -45,6 +45,8 @@ Ported from the RiverTrail demo found at: #[derive(Copy, Clone, PartialEq, Eq, serde::Deserialize)] pub enum ExecutionMode { Par, + ParSched, + ParBridge, ParReduce, Seq, } @@ -77,6 +79,8 @@ pub fn main(args: &[String]) { fn run_benchmarks(mode: Option, bodies: usize, ticks: usize) { let run_par = mode.map(|m| m == ExecutionMode::Par).unwrap_or(true); + let run_par_schedule = mode.map(|m| m == ExecutionMode::ParSched).unwrap_or(true); + let run_par_bridge = mode.map(|m| m == ExecutionMode::ParBridge).unwrap_or(true); let run_par_reduce = mode.map(|m| m == ExecutionMode::ParReduce).unwrap_or(true); let run_seq = mode.map(|m| m == ExecutionMode::Seq).unwrap_or(true); @@ -90,7 +94,41 @@ fn run_benchmarks(mode: Option, bodies: usize, ticks: usize) { } let par_time = par_start.elapsed().as_nanos(); - println!("Parallel time : {} ns", par_time); + println!("Parallel time\t: {} ns", par_time); + + Some(par_time) + } else { + None + }; + + let par_schedule_time = if run_par_schedule { + let mut rng = crate::seeded_rng(); + let mut benchmark = NBodyBenchmark::new(bodies, &mut rng); + let par_start = Instant::now(); + + for _ in 0..ticks { + benchmark.tick_par_schedule(); + } + + let par_time = par_start.elapsed().as_nanos(); + println!("ParSched time\t: {} ns", par_time); + + Some(par_time) + } else { + None + }; + + let par_bridge_time = if run_par_bridge { + let mut rng = crate::seeded_rng(); + let mut benchmark = NBodyBenchmark::new(bodies, &mut rng); + let par_start = Instant::now(); + + for _ in 0..ticks { + benchmark.tick_par_bridge(); + } + + let par_time = par_start.elapsed().as_nanos(); + println!("ParBridge time\t: {} ns", par_time); Some(par_time) } else { @@ -107,7 +145,7 @@ fn run_benchmarks(mode: Option, bodies: usize, ticks: usize) { } let par_time = par_start.elapsed().as_nanos(); - println!("ParReduce time : {} ns", par_time); + println!("ParReduce time\t: {} ns", par_time); Some(par_time) } else { @@ -124,7 +162,7 @@ fn run_benchmarks(mode: Option, bodies: usize, ticks: usize) { } let seq_time = seq_start.elapsed().as_nanos(); - println!("Sequential time : {} ns", seq_time); + println!("Sequential time\t: {} ns", seq_time); Some(seq_time) } else { @@ -132,10 +170,18 @@ fn run_benchmarks(mode: Option, bodies: usize, ticks: usize) { }; if let (Some(pt), Some(st)) = (par_time, seq_time) { - println!("Parallel speedup : {}", (st as f32) / (pt as f32)); + println!("Parallel speedup\t: {}", (st as f32) / (pt as f32)); + } + + if let (Some(pt), Some(st)) = (par_schedule_time, seq_time) { + println!("ParSched speedup\t: {}", (st as f32) / (pt as f32)); + } + + if let (Some(pt), Some(st)) = (par_bridge_time, seq_time) { + println!("ParBridge speedup\t: {}", (st as f32) / (pt as f32)); } if let (Some(pt), Some(st)) = (par_reduce_time, seq_time) { - println!("ParReduce speedup: {}", (st as f32) / (pt as f32)); + println!("ParReduce speedup\t: {}", (st as f32) / (pt as f32)); } } diff --git a/rayon-demo/src/nbody/nbody.rs b/rayon-demo/src/nbody/nbody.rs index 6bdcc2a91..80c912065 100644 --- a/rayon-demo/src/nbody/nbody.rs +++ b/rayon-demo/src/nbody/nbody.rs @@ -116,7 +116,33 @@ impl NBodyBenchmark { out_bodies } - #[cfg(test)] + pub fn tick_par_schedule(&mut self) -> &[Body] { + let (in_bodies, out_bodies) = if (self.time & 1) == 0 { + (&self.bodies.0, &mut self.bodies.1) + } else { + (&self.bodies.1, &mut self.bodies.0) + }; + + let time = self.time; + use rayon::iter::StaticScheduler; + out_bodies + .par_iter_mut() + .zip(&in_bodies[..]) + .with_scheduler(StaticScheduler::new()) + .for_each(|(out, prev)| { + let (vel, vel2) = next_velocity(time, prev, in_bodies); + out.velocity = vel; + out.velocity2 = vel2; + + let next_velocity = vel - vel2; + out.position = prev.position + next_velocity; + }); + + self.time += 1; + + out_bodies + } + pub fn tick_par_bridge(&mut self) -> &[Body] { let (in_bodies, out_bodies) = if (self.time & 1) == 0 { (&self.bodies.0, &mut self.bodies.1) diff --git a/rayon-demo/src/nbody/visualize.rs b/rayon-demo/src/nbody/visualize.rs index c1df8917b..3c43b4c56 100644 --- a/rayon-demo/src/nbody/visualize.rs +++ b/rayon-demo/src/nbody/visualize.rs @@ -141,6 +141,8 @@ pub fn visualize_benchmarks(num_bodies: usize, mut mode: ExecutionMode) { Event::MainEventsCleared => { let bodies = match mode { ExecutionMode::Par => benchmark.tick_par(), + ExecutionMode::ParSched => benchmark.tick_par_schedule(), + ExecutionMode::ParBridge => benchmark.tick_par_bridge(), ExecutionMode::ParReduce => benchmark.tick_par_reduce(), ExecutionMode::Seq => benchmark.tick_seq(), }; diff --git a/rayon-demo/src/vec_collect.rs b/rayon-demo/src/vec_collect.rs index ba7b1f469..95517d2f2 100644 --- a/rayon-demo/src/vec_collect.rs +++ b/rayon-demo/src/vec_collect.rs @@ -227,3 +227,40 @@ mod vec_i_filtered { make_bench!(generate, check); } + +/// Tests a big vector of i forall i in 0 to N, with static scheduler. +mod vec_i_schedule { + use rayon::prelude::*; + + const N: u32 = 4 * 1024 * 1024; + + fn generate() -> impl IndexedParallelIterator { + (0_u32..N) + .into_par_iter() + .with_scheduler(rayon::iter::StaticScheduler::with_chunk_size(4)) + } + + fn check(v: &[u32]) { + assert!(v.iter().cloned().eq(0..N)); + } + + #[bench] + fn with_collect_into_vec(b: &mut ::test::Bencher) { + let mut vec = None; + b.iter(|| { + let mut v = vec![]; + generate().collect_into_vec(&mut v); + vec = Some(v); + }); + check(&vec.unwrap()); + } + + #[bench] + fn with_collect_into_vec_reused(b: &mut ::test::Bencher) { + let mut vec = vec![]; + b.iter(|| generate().collect_into_vec(&mut vec)); + check(&vec); + } + + make_bench!(generate, check); +} diff --git a/src/iter/mod.rs b/src/iter/mod.rs index d571d3729..60885719f 100644 --- a/src/iter/mod.rs +++ b/src/iter/mod.rs @@ -100,6 +100,10 @@ use std::ops::Fn; mod par_bridge; pub use self::par_bridge::{IterBridge, ParallelBridge}; +pub mod scheduler; +pub use self::scheduler::{Scheduler, UnindexedScheduler, WithScheduler, WithUnindexedScheduler}; +pub use self::scheduler::misc::*; + mod chain; mod find; mod find_first_last; @@ -349,6 +353,11 @@ pub trait ParallelIterator: Sized + Send { /// [`for_each`]: #method.for_each type Item: Send; + /// Sets the "scheduler" that this thread-pool will use. + fn with_unindexed_scheduler(self, scheduler: S) -> WithUnindexedScheduler { + WithUnindexedScheduler::new(self, scheduler) + } + /// Executes `OP` on each item produced by the iterator, in parallel. /// /// # Examples @@ -2085,6 +2094,11 @@ impl IntoParallelIterator for T { /// /// **Note:** Not implemented for `u64`, `i64`, `u128`, or `i128` ranges pub trait IndexedParallelIterator: ParallelIterator { + /// Sets the "scheduler" that this thread-pool will use. + fn with_scheduler(self, scheduler: S) -> WithScheduler { + WithScheduler::new(self, scheduler) + } + /// Collects the results of the iterator into the specified /// vector. The vector is always truncated before execution /// begins. If possible, reusing the vector across calls can lead diff --git a/src/iter/scheduler/misc.rs b/src/iter/scheduler/misc.rs new file mode 100644 index 000000000..ecdbf4b7e --- /dev/null +++ b/src/iter/scheduler/misc.rs @@ -0,0 +1,164 @@ +//! This module contains useful schedulers. +use super::*; + +/// Default Scheduler. +/// When used as Indexed Scheduler, Thief-splitting will be used. +/// When used as Unindexed Scheduler, tasks will be divided to minimum piece. +#[derive(Debug, Clone, Default)] +pub struct DefaultScheduler; + +impl Scheduler for DefaultScheduler { + fn bridge(&mut self, len: usize, producer: P, consumer: C) -> C::Result + where + P: Producer, + C: Consumer, + { + bridge_producer_consumer(len, producer, consumer) + } +} + +impl UnindexedScheduler for DefaultScheduler { + fn bridge_unindexed(&mut self, producer: P, consumer: C) -> C::Result + where + P: UnindexedProducer, + C: UnindexedConsumer, + { + bridge_unindexed(producer, consumer) + } +} + +/// Dummy Sequential Scheduler. +/// No parallel is used at all. +#[derive(Debug, Clone, Default)] +pub struct SequentialScheduler; + +impl Scheduler for SequentialScheduler { + fn bridge(&mut self, _len: usize, producer: P, consumer: C) -> C::Result + where + P: Producer, + C: Consumer, + { + producer.fold_with(consumer.into_folder()).complete() + } +} + +impl UnindexedScheduler for SequentialScheduler { + fn bridge_unindexed(&mut self, producer: P, consumer: C) -> C::Result + where + P: UnindexedProducer, + C: UnindexedConsumer, + { + producer.fold_with(consumer.into_folder()).complete() + } +} + +fn static_partition_bridge(positions: &[usize], producer: P, consumer: C) -> C::Result +where + P: Producer, + C: Consumer, +{ + fn helper(positions: &[usize], bias: usize, producer: P, consumer: C) -> C::Result + where + P: Producer, + C: Consumer, + { + if consumer.full() { + consumer.into_folder().complete() + } else if positions.len() > 0 { + let mid_index = positions.len() / 2; + let position = positions[mid_index]; + + let (left_producer, right_producer) = producer.split_at(position - bias); + let (left_consumer, right_consumer, reducer) = consumer.split_at(position - bias); + + use crate::join; + let (left_result, right_result) = join( + || helper(&positions[0..mid_index], bias, left_producer, left_consumer), + || { + helper( + &positions[mid_index + 1..], + position, + right_producer, + right_consumer, + ) + }, + ); + reducer.reduce(left_result, right_result) + } else { + producer.fold_with(consumer.into_folder()).complete() + } + } + helper(positions, 0, producer, consumer) +} + +/// Fixed length scheduler. +/// Every tasks assigned to a thread will contain a fixed number of items, +/// except for the last task which will possibly contain less. +/// The parameter in `with_min_len` and `with_max_len` will be ignored. +#[derive(Debug, Clone, Default)] +pub struct FixedLengthScheduler { + fixed_length: usize, +} + +impl FixedLengthScheduler { + /// Create fixed length scheduler with assigned length. Length must be greater than or equal to 1. + pub fn new(fixed_length: usize) -> Self { + if fixed_length == 0 { + panic!("Length must be greater than or equal to 1.") + }; + Self { fixed_length } + } +} + +impl Scheduler for FixedLengthScheduler { + fn bridge(&mut self, len: usize, producer: P, consumer: C) -> C::Result + where + P: Producer, + C: Consumer, + { + let positions: Vec<_> = (0..len).step_by(self.fixed_length).skip(1).collect(); + static_partition_bridge(&positions, producer, consumer) + } +} + +/// Static split scheduler. +/// Given a chunk size, this scheduler will divide all items evenly based on their +/// length to create `current_num_threads()` number of tasks. +/// The length of each task should be multiple of the chunk size, except for the last task. +#[derive(Debug, Clone, Default)] +pub struct StaticScheduler { + chunk_size: usize, +} + +impl StaticScheduler { + /// Create static split scheduler with default chunk size 1. + pub fn new() -> Self { + Self { chunk_size: 1 } + } + /// Create static split scheduler with assigned chunk size. Chunk size must be greater than or equal to 1. + pub fn with_chunk_size(chunk_size: usize) -> Self { + if chunk_size == 0 { + panic!("Chunk size must be greater than or equal to 1.") + }; + Self { + chunk_size: chunk_size, + } + } +} + +impl Scheduler for StaticScheduler { + fn bridge(&mut self, len: usize, producer: P, consumer: C) -> C::Result + where + P: Producer, + C: Consumer, + { + use crate::current_num_threads; + let num_threads = current_num_threads(); + let full_chunks = len / self.chunk_size; + let positions: Vec<_> = (1..num_threads) + .map(|i| (i * full_chunks) / num_threads * self.chunk_size) + .collect(); + + static_partition_bridge(&positions, producer, consumer) + } +} diff --git a/src/iter/scheduler/mod.rs b/src/iter/scheduler/mod.rs new file mode 100644 index 000000000..3cdf83e90 --- /dev/null +++ b/src/iter/scheduler/mod.rs @@ -0,0 +1,138 @@ +//! This module is dedicated to custom scheduler API and useful schedulers. + +use super::*; + +pub mod misc; + +pub use misc::*; + +/// Scheduler for Indexed Parallel Iterator +pub trait Scheduler: Send { + /// Consume one Indexed Producer and one Indexed Consumer. Split the work acordingly. + fn bridge(&mut self, len: usize, producer: P, consumer: C) -> C::Result + where + P: Producer, + C: Consumer; +} + +/// Scheduler for Unindexed Parallel Iterator +pub trait UnindexedScheduler: Send { + /// Consume one Unindexed Producer and one Unindexed Consumer. Split the work acordingly. + fn bridge_unindexed(&mut self, producer: P, consumer: C) -> C::Result + where + P: UnindexedProducer, + C: UnindexedConsumer; +} + +/// `WithScheduler` is an iterator that enclose one Indexed Scheduler. +#[derive(Debug)] +pub struct WithScheduler { + base: I, + scheduler: S, +} + +/// `WithUnindexedScheduler` is an iterator that enclose one Unindexed Scheduler. +#[derive(Debug)] +pub struct WithUnindexedScheduler { + base: I, + scheduler: S, +} + +impl WithScheduler { + /// Create a new `WithUnindexedScheduler` iterator. + pub(super) fn new(base: I, scheduler: S) -> Self { + WithScheduler { base, scheduler } + } +} + +impl WithUnindexedScheduler { + /// Create a new `WithUnindexedScheduler` iterator. + pub(super) fn new(base: I, scheduler: S) -> Self { + WithUnindexedScheduler { base, scheduler } + } +} + +macro_rules! no_link { + ($l:literal) => { + { + extern "C" { + #[link_name = $l] + fn trigger() -> !; + } + unsafe { trigger() } + } + }; +} + +impl ParallelIterator for WithScheduler { + type Item = I::Item; + + fn drive_unindexed(self, consumer: C) -> C::Result + where + C: UnindexedConsumer, + { + IndexedParallelIterator::drive(self, consumer) + } + + fn opt_len(&self) -> Option { + self.base.opt_len() + } +} + +impl IndexedParallelIterator for WithScheduler { + fn drive(self, consumer: C) -> C::Result + where + C: Consumer, + { + let len = self.base.len(); + return self.base.with_producer(Callback { + len: len, + scheduler: self.scheduler, + consumer: consumer, + }); + + struct Callback { + len: usize, + scheduler: S, + consumer: C, + } + + impl ProducerCallback for Callback + where + S: Scheduler, + C: Consumer, + { + type Output = C::Result; + fn callback

(mut self, producer: P) -> C::Result + where + P: Producer, + { + self.scheduler.bridge(self.len, producer, self.consumer) + } + } + } + fn len(&self) -> usize { + self.base.len() + } + fn with_producer(self, _callback: CB) -> CB::Output + where + CB: ProducerCallback, + { + no_link!( "\n\nERROR[rayon]: After `with_scheduler`, the iterator should not be used as producer.\nFor example, `Zip` works in producer mode, so `with_scheduler` should not happen before any `zip`.\nFor more information about producer and customer mode, please refer to https://github.com/rayon-rs/rayon/blob/master/src/iter/plumbing/README.md\n") + } +} + +impl ParallelIterator for WithUnindexedScheduler { + type Item = I::Item; + + fn drive_unindexed(self, _consumer: C) -> C::Result + where + C: UnindexedConsumer, + { + todo!() + } + + fn opt_len(&self) -> Option { + self.base.opt_len() + } +} diff --git a/src/iter/test.rs b/src/iter/test.rs index bc5106bec..88ee25881 100644 --- a/src/iter/test.rs +++ b/src/iter/test.rs @@ -2186,3 +2186,76 @@ fn check_update() { assert_eq!(v, vec![vec![1, 0], vec![3, 2, 1, 0]]); } + +#[test] +fn check_schedule() { + let v = vec![1, 2, 3]; + + let r: i32 = v + .par_iter() + .with_scheduler(DefaultScheduler) + .map(|v| v * v) + .flat_map(|v| vec![1; v as usize]) + .sum(); + assert_eq!(r, 14); + + let r: i32 = v + .par_iter() + .map(|v| v * v) + .with_scheduler(DefaultScheduler) + .flat_map(|v| vec![1; v as usize]) + .sum(); + assert_eq!(r, 14); + + let r: i32 = v + .par_iter() + .zip(v.par_iter()) + .with_scheduler(DefaultScheduler) + .map(|(a, b)| a * b) + .sum(); + assert_eq!(r, 14); + + // should not compile + // let r: i32 = v + // .par_iter() + // .zip(v.par_iter()) + // .with_scheduler(DefaultScheduler) + // .map(|(a, b)| a * b) + // .sum(); +} + +#[test] +fn check_schedule_sequential() { + let v: Vec<_> = (1..100).collect(); + let r: i32 = v + .par_iter() + .with_scheduler(SequentialScheduler) + .map(|v| v * v) + .flat_map(|v| vec![1; v as usize]) + .sum(); + assert_eq!(r, 328350); +} + +#[test] +fn check_schedule_fixed_length() { + let v: Vec<_> = (1..100).collect(); + let r: i32 = v + .par_iter() + .with_scheduler(FixedLengthScheduler::new(1)) + .map(|v| v * v) + .flat_map(|v| vec![1; v as usize]) + .sum(); + assert_eq!(r, 328350); +} + +#[test] +fn check_schedule_static() { + let v: Vec<_> = (1..100).collect(); + let r: i32 = v + .par_iter() + .with_scheduler(StaticScheduler::with_chunk_size(1)) + .map(|v| v * v) + .flat_map(|v| vec![1; v as usize]) + .sum(); + assert_eq!(r, 328350); +}