@@ -7,7 +7,6 @@ use std::any::Any;
77use std:: panic:: { AssertUnwindSafe , catch_unwind, resume_unwind} ;
88
99use parking_lot:: Mutex ;
10- use rayon:: iter:: { FromParallelIterator , IntoParallelIterator , ParallelIterator } ;
1110
1211use crate :: FatalErrorMarker ;
1312use crate :: sync:: { DynSend , DynSync , FromDyn , IntoDynSyncSend , mode} ;
@@ -97,11 +96,11 @@ macro_rules! parallel {
9796// This function only works when `mode::is_dyn_thread_safe()`.
9897pub fn scope < ' scope , OP , R > ( op : OP ) -> R
9998where
100- OP : FnOnce ( & rayon :: Scope < ' scope > ) -> R + DynSend ,
99+ OP : FnOnce ( & rayon_core :: Scope < ' scope > ) -> R + DynSend ,
101100 R : DynSend ,
102101{
103102 let op = FromDyn :: from ( op) ;
104- rayon :: scope ( |s| FromDyn :: from ( op. into_inner ( ) ( s) ) ) . into_inner ( )
103+ rayon_core :: scope ( |s| FromDyn :: from ( op. into_inner ( ) ( s) ) ) . into_inner ( )
105104}
106105
107106#[ inline]
@@ -114,7 +113,7 @@ where
114113 let oper_a = FromDyn :: from ( oper_a) ;
115114 let oper_b = FromDyn :: from ( oper_b) ;
116115 let ( a, b) = parallel_guard ( |guard| {
117- rayon :: join (
116+ rayon_core :: join (
118117 move || guard. run ( move || FromDyn :: from ( oper_a. into_inner ( ) ( ) ) ) ,
119118 move || guard. run ( move || FromDyn :: from ( oper_b. into_inner ( ) ( ) ) ) ,
120119 )
@@ -125,56 +124,103 @@ where
125124 }
126125}
127126
128- pub fn par_for_each_in < I , T : IntoIterator < Item = I > + IntoParallelIterator < Item = I > > (
127+ fn par_slice < I : DynSend > (
128+ items : & mut [ I ] ,
129+ guard : & ParallelGuard ,
130+ for_each : impl Fn ( & mut I ) + DynSync + DynSend ,
131+ ) {
132+ struct State < ' a , F > {
133+ for_each : FromDyn < F > ,
134+ guard : & ' a ParallelGuard ,
135+ group : usize ,
136+ }
137+
138+ fn par_rec < I : DynSend , F : Fn ( & mut I ) + DynSync + DynSend > (
139+ items : & mut [ I ] ,
140+ state : & State < ' _ , F > ,
141+ ) {
142+ if items. len ( ) <= state. group {
143+ for item in items {
144+ state. guard . run ( || ( state. for_each ) ( item) ) ;
145+ }
146+ } else {
147+ let ( left, right) = items. split_at_mut ( items. len ( ) / 2 ) ;
148+ let mut left = state. for_each . derive ( left) ;
149+ let mut right = state. for_each . derive ( right) ;
150+ rayon_core:: join ( move || par_rec ( * left, state) , move || par_rec ( * right, state) ) ;
151+ }
152+ }
153+
154+ let state = State {
155+ for_each : FromDyn :: from ( for_each) ,
156+ guard,
157+ group : std:: cmp:: max ( items. len ( ) / 128 , 1 ) ,
158+ } ;
159+ par_rec ( items, & state)
160+ }
161+
162+ pub fn par_for_each_in < I : DynSend , T : IntoIterator < Item = I > > (
129163 t : T ,
130- for_each : impl Fn ( I ) + DynSync + DynSend ,
164+ for_each : impl Fn ( & I ) + DynSync + DynSend ,
131165) {
132166 parallel_guard ( |guard| {
133167 if mode:: is_dyn_thread_safe ( ) {
134- let for_each = FromDyn :: from ( for_each) ;
135- t. into_par_iter ( ) . for_each ( |i| {
136- guard. run ( || for_each ( i) ) ;
137- } ) ;
168+ let mut items: Vec < _ > = t. into_iter ( ) . collect ( ) ;
169+ par_slice ( & mut items, guard, |i| for_each ( & * i) )
138170 } else {
139171 t. into_iter ( ) . for_each ( |i| {
140- guard. run ( || for_each ( i) ) ;
172+ guard. run ( || for_each ( & i) ) ;
141173 } ) ;
142174 }
143175 } ) ;
144176}
145177
146- pub fn try_par_for_each_in <
147- T : IntoIterator + IntoParallelIterator < Item = <T as IntoIterator >:: Item > ,
148- E : Send ,
149- > (
178+ /// This runs `for_each` in parallel for each iterator item. If one or more of the
179+ /// `for_each` calls returns `Err`, the function will also return `Err`. The error returned
180+ /// will be non-deterministic, but this is expected to be used with `ErrorGuaranteed` which
181+ /// are all equivalent.
182+ pub fn try_par_for_each_in < T : IntoIterator , E : DynSend > (
150183 t : T ,
151- for_each : impl Fn ( <T as IntoIterator >:: Item ) -> Result < ( ) , E > + DynSync + DynSend ,
152- ) -> Result < ( ) , E > {
184+ for_each : impl Fn ( & <T as IntoIterator >:: Item ) -> Result < ( ) , E > + DynSync + DynSend ,
185+ ) -> Result < ( ) , E >
186+ where
187+ <T as IntoIterator >:: Item : DynSend ,
188+ {
153189 parallel_guard ( |guard| {
154190 if mode:: is_dyn_thread_safe ( ) {
155- let for_each = FromDyn :: from ( for_each) ;
156- t. into_par_iter ( )
157- . filter_map ( |i| guard. run ( || for_each ( i) ) )
158- . reduce ( || Ok ( ( ) ) , Result :: and)
191+ let mut items: Vec < _ > = t. into_iter ( ) . collect ( ) ;
192+
193+ let error = Mutex :: new ( None ) ;
194+
195+ par_slice ( & mut items, guard, |i| {
196+ if let Err ( err) = for_each ( & * i) {
197+ * error. lock ( ) = Some ( err) ;
198+ }
199+ } ) ;
200+
201+ if let Some ( err) = error. into_inner ( ) { Err ( err) } else { Ok ( ( ) ) }
159202 } else {
160- t. into_iter ( ) . filter_map ( |i| guard. run ( || for_each ( i) ) ) . fold ( Ok ( ( ) ) , Result :: and)
203+ t. into_iter ( ) . filter_map ( |i| guard. run ( || for_each ( & i) ) ) . fold ( Ok ( ( ) ) , Result :: and)
161204 }
162205 } )
163206}
164207
165- pub fn par_map <
166- I ,
167- T : IntoIterator < Item = I > + IntoParallelIterator < Item = I > ,
168- R : std:: marker:: Send ,
169- C : FromIterator < R > + FromParallelIterator < R > ,
170- > (
208+ pub fn par_map < I : DynSend , T : IntoIterator < Item = I > , R : DynSend , C : FromIterator < R > > (
171209 t : T ,
172210 map : impl Fn ( I ) -> R + DynSync + DynSend ,
173211) -> C {
174212 parallel_guard ( |guard| {
175213 if mode:: is_dyn_thread_safe ( ) {
176214 let map = FromDyn :: from ( map) ;
177- t. into_par_iter ( ) . filter_map ( |i| guard. run ( || map ( i) ) ) . collect ( )
215+
216+ let mut items: Vec < ( Option < I > , Option < R > ) > =
217+ t. into_iter ( ) . map ( |i| ( Some ( i) , None ) ) . collect ( ) ;
218+
219+ par_slice ( & mut items, guard, |i| {
220+ i. 1 = Some ( map ( i. 0 . take ( ) . unwrap ( ) ) ) ;
221+ } ) ;
222+
223+ items. into_iter ( ) . filter_map ( |i| i. 1 ) . collect ( )
178224 } else {
179225 t. into_iter ( ) . filter_map ( |i| guard. run ( || map ( i) ) ) . collect ( )
180226 }
0 commit comments