@@ -2,6 +2,7 @@ use std::{borrow::Cow, future::Future, mem::replace, panic, pin::Pin};
22
33use anyhow:: { anyhow, Result } ;
44use auto_hash_map:: AutoSet ;
5+ use futures:: { StreamExt , TryStreamExt } ;
56use parking_lot:: Mutex ;
67use rustc_hash:: FxHashSet ;
78use tracing:: { Instrument , Span } ;
@@ -17,6 +18,8 @@ use crate::{
1718 CollectiblesSource , NonLocalValue , ReadRef , ResolvedVc , TryJoinIterExt ,
1819} ;
1920
21+ const APPLY_EFFECTS_CONCURRENCY_LIMIT : usize = 1024 ;
22+
2023/// A trait to emit a task effect as collectible. This trait only has one
2124/// implementation, `EffectInstance` and no other implementation is allowed.
2225/// The trait is private to this module so that no other implementation can be
@@ -168,17 +171,16 @@ pub async fn apply_effects(source: impl CollectiblesSource) -> Result<()> {
168171 }
169172 let span = tracing:: info_span!( "apply effects" , count = effects. len( ) ) ;
170173 async move {
171- effects
172- . into_iter ( )
173- . map ( async |effect| {
174+ // Limit the concurrency of effects
175+ futures:: stream:: iter ( effects)
176+ . map ( Ok )
177+ . try_for_each_concurrent ( APPLY_EFFECTS_CONCURRENCY_LIMIT , async |effect| {
174178 let Some ( effect) = ResolvedVc :: try_downcast_type :: < EffectInstance > ( effect) else {
175179 panic ! ( "Effect must only be implemented by EffectInstance" ) ;
176180 } ;
177181 effect. await ?. apply ( ) . await
178182 } )
179- . try_join ( )
180- . await ?;
181- Ok ( ( ) )
183+ . await
182184 }
183185 . instrument ( span)
184186 . await
@@ -251,12 +253,13 @@ impl Effects {
251253 pub async fn apply ( & self ) -> Result < ( ) > {
252254 let span = tracing:: info_span!( "apply effects" , count = self . effects. len( ) ) ;
253255 async move {
254- self . effects
255- . iter ( )
256- . map ( async |effect| effect. apply ( ) . await )
257- . try_join ( )
258- . await ?;
259- Ok ( ( ) )
256+ // Limit the concurrency of effects
257+ futures:: stream:: iter ( self . effects . iter ( ) )
258+ . map ( Ok )
259+ . try_for_each_concurrent ( APPLY_EFFECTS_CONCURRENCY_LIMIT , async |effect| {
260+ effect. apply ( ) . await
261+ } )
262+ . await
260263 }
261264 . instrument ( span)
262265 . await
0 commit comments