@@ -177,92 +177,84 @@ impl<T: KeyValueDatabase + Send + Sync + 'static> BackingStorage
177177 {
178178 let _span = tracing:: trace_span!( "save snapshot" , session_id = ?session_id, operations = operations. len( ) ) ;
179179 let mut batch = self . database . write_batch ( ) ?;
180- let mut task_items_result = Ok ( Vec :: new ( ) ) ;
181180
182181 // Start organizing the updates in parallel
183182 match & mut batch {
184183 WriteBatch :: Concurrent ( ref batch, _) => {
185- turbo_tasks:: scope ( |s| {
186- s. spawn ( |_| {
187- let _span = tracing:: trace_span!( "update task meta" ) . entered ( ) ;
188- task_items_result = process_task_data ( snapshots, Some ( batch) ) ;
189- } ) ;
184+ {
185+ let _span = tracing:: trace_span!( "update task data" ) . entered ( ) ;
186+ process_task_data ( snapshots, Some ( batch) ) ?;
187+ }
190188
191- let mut next_task_id =
192- get_next_free_task_id :: <
193- T :: SerialWriteBatch < ' _ > ,
194- T :: ConcurrentWriteBatch < ' _ > ,
195- > ( & mut WriteBatchRef :: concurrent ( batch) ) ?;
189+ let mut next_task_id = get_next_free_task_id :: <
190+ T :: SerialWriteBatch < ' _ > ,
191+ T :: ConcurrentWriteBatch < ' _ > ,
192+ > ( & mut WriteBatchRef :: concurrent ( batch) ) ?;
196193
197- {
198- let _span = tracing:: trace_span!(
199- "update task cache" ,
200- items = task_cache_updates. iter( ) . map( |m| m. len( ) ) . sum:: <usize >( )
201- )
202- . entered ( ) ;
203- let result = task_cache_updates
204- . into_par_iter ( )
205- . with_max_len ( 1 )
206- . map ( |updates| {
207- let mut max_task_id = 0 ;
208-
209- let mut task_type_bytes = Vec :: new ( ) ;
210- for ( task_type, task_id) in updates {
211- let task_id: u32 = * task_id;
212- serialize_task_type ( & task_type, & mut task_type_bytes, task_id) ?;
213-
214- batch
215- . put (
216- KeySpace :: ForwardTaskCache ,
217- WriteBuffer :: Borrowed ( & task_type_bytes) ,
218- WriteBuffer :: Borrowed ( & task_id. to_le_bytes ( ) ) ,
194+ {
195+ let _span = tracing:: trace_span!(
196+ "update task cache" ,
197+ items = task_cache_updates. iter( ) . map( |m| m. len( ) ) . sum:: <usize >( )
198+ )
199+ . entered ( ) ;
200+ let result = task_cache_updates
201+ . into_par_iter ( )
202+ . with_max_len ( 1 )
203+ . map ( |updates| {
204+ let mut max_task_id = 0 ;
205+
206+ let mut task_type_bytes = Vec :: new ( ) ;
207+ for ( task_type, task_id) in updates {
208+ let task_id: u32 = * task_id;
209+ serialize_task_type ( & task_type, & mut task_type_bytes, task_id) ?;
210+
211+ batch
212+ . put (
213+ KeySpace :: ForwardTaskCache ,
214+ WriteBuffer :: Borrowed ( & task_type_bytes) ,
215+ WriteBuffer :: Borrowed ( & task_id. to_le_bytes ( ) ) ,
216+ )
217+ . with_context ( || {
218+ anyhow ! (
219+ "Unable to write task cache {task_type:?} => {task_id}"
219220 )
220- . with_context ( || {
221- anyhow ! (
222- "Unable to write task cache {task_type:?} => \
223- {task_id}"
224- )
225- } ) ?;
226- batch
227- . put (
228- KeySpace :: ReverseTaskCache ,
229- WriteBuffer :: Borrowed ( IntKey :: new ( task_id) . as_ref ( ) ) ,
230- WriteBuffer :: Borrowed ( & task_type_bytes) ,
221+ } ) ?;
222+ batch
223+ . put (
224+ KeySpace :: ReverseTaskCache ,
225+ WriteBuffer :: Borrowed ( IntKey :: new ( task_id) . as_ref ( ) ) ,
226+ WriteBuffer :: Borrowed ( & task_type_bytes) ,
227+ )
228+ . with_context ( || {
229+ anyhow ! (
230+ "Unable to write task cache {task_id} => {task_type:?}"
231231 )
232- . with_context ( || {
233- anyhow ! (
234- "Unable to write task cache {task_id} => \
235- {task_type:?}"
236- )
237- } ) ?;
238- max_task_id = max_task_id. max ( task_id + 1 ) ;
239- }
240-
241- Ok ( max_task_id)
242- } )
243- . reduce (
244- || Ok ( 0 ) ,
245- |a, b| -> anyhow:: Result < _ > {
246- let a_max = a?;
247- let b_max = b?;
248- Ok ( max ( a_max, b_max) )
249- } ,
250- ) ?;
251- next_task_id = next_task_id. max ( result) ;
252- }
253-
254- save_infra :: < T :: SerialWriteBatch < ' _ > , T :: ConcurrentWriteBatch < ' _ > > (
255- & mut WriteBatchRef :: concurrent ( batch) ,
256- next_task_id,
257- session_id,
258- operations,
259- ) ?;
260- anyhow:: Ok ( ( ) )
261- } ) ?;
232+ } ) ?;
233+ max_task_id = max_task_id. max ( task_id + 1 ) ;
234+ }
235+
236+ Ok ( max_task_id)
237+ } )
238+ . reduce (
239+ || Ok ( 0 ) ,
240+ |a, b| -> anyhow:: Result < _ > {
241+ let a_max = a?;
242+ let b_max = b?;
243+ Ok ( max ( a_max, b_max) )
244+ } ,
245+ ) ?;
246+ next_task_id = next_task_id. max ( result) ;
247+ }
262248
263- task_items_result?;
249+ save_infra :: < T :: SerialWriteBatch < ' _ > , T :: ConcurrentWriteBatch < ' _ > > (
250+ & mut WriteBatchRef :: concurrent ( batch) ,
251+ next_task_id,
252+ session_id,
253+ operations,
254+ ) ?;
264255 }
265256 WriteBatch :: Serial ( batch) => {
257+ let mut task_items_result = Ok ( Vec :: new ( ) ) ;
266258 turbo_tasks:: scope ( |s| {
267259 s. spawn ( |_| {
268260 task_items_result =
0 commit comments