@@ -5,7 +5,7 @@ use alloy::{
55 primitives:: { B256 , U256 } ,
66 providers:: Provider ,
77} ;
8- use init4_bin_base:: deps:: tracing:: { self , Instrument , debug, error, info_span} ;
8+ use init4_bin_base:: deps:: tracing:: { Instrument , debug, error, info_span} ;
99use tokio:: { sync:: watch, task:: JoinHandle } ;
1010use tokio_stream:: StreamExt ;
1111use trevm:: revm:: { context:: BlockEnv , context_interface:: block:: BlobExcessGasAndPrice } ;
@@ -58,35 +58,25 @@ impl EnvTask {
5858 async fn task_fut ( self , sender : watch:: Sender < Option < SimEnv > > ) {
5959 let span = info_span ! ( "EnvTask::task_fut::init" ) ;
6060
61- let mut blocks = match self . ru_provider . subscribe_blocks ( ) . await {
61+ let mut headers = match self . ru_provider . subscribe_blocks ( ) . await {
6262 Ok ( poller) => poller,
6363 Err ( err) => {
64- let _span = span. enter ( ) ;
65- error ! ( %err, "Failed to subscribe to blocks" ) ;
64+ span. in_scope ( || {
65+ error ! ( %err, "Failed to subscribe to blocks" ) ;
66+ } ) ;
6667 return ;
6768 }
6869 }
6970 . into_stream ( ) ;
7071
71- while let Some ( block) =
72- blocks. next ( ) . instrument ( info_span ! ( "EnvTask::task_fut::stream" ) ) . await
72+ drop ( span) ;
73+
74+ while let Some ( rollup_header) =
75+ headers. next ( ) . instrument ( info_span ! ( "EnvTask::task_fut::stream" ) ) . await
7376 {
7477 let span =
75- info_span ! ( "EnvTask::task_fut::loop" , %block . hash, number = tracing :: field :: Empty ) ;
78+ info_span ! ( "EnvTask::task_fut::loop" , %rollup_header . hash, %rollup_header . number) ;
7679
77- // Get the rollup header for rollup block simulation environment configuration
78- let rollup_header = match self
79- . get_latest_rollup_header ( & sender, & block. hash , & span)
80- . await
81- {
82- Some ( value) => value,
83- None => {
84- // If we failed to get the rollup header, we skip this iteration.
85- debug ! ( %block. hash, "failed to get rollup header - continuing to next block" ) ;
86- continue ;
87- }
88- } ;
89- debug ! ( rollup_header. number, "pulled rollup block for simulation" ) ;
9080 span. record ( "rollup_block_number" , rollup_header. number ) ;
9181
9282 // Construct the block env using the previous block header
@@ -98,7 +88,7 @@ impl EnvTask {
9888 ) ;
9989
10090 if sender
101- . send ( Some ( SimEnv { block_env : signet_env, prev_header : rollup_header } ) )
91+ . send ( Some ( SimEnv { block_env : signet_env, prev_header : rollup_header. inner } ) )
10292 . is_err ( )
10393 {
10494 // The receiver has been dropped, so we can stop the task.
@@ -108,40 +98,6 @@ impl EnvTask {
10898 }
10999 }
110100
111- /// Get latest rollup [`Header`] for the given block hash.
112- async fn get_latest_rollup_header (
113- & self ,
114- sender : & watch:: Sender < Option < SimEnv > > ,
115- block : & alloy:: primitives:: FixedBytes < 32 > ,
116- span : & tracing:: Span ,
117- ) -> Option < Header > {
118- let previous = match self
119- . ru_provider
120- . get_block ( ( * block) . into ( ) )
121- . into_future ( )
122- . instrument ( span. clone ( ) )
123- . await
124- {
125- Ok ( Some ( block) ) => block. header . inner ,
126- Ok ( None ) => {
127- let _span = span. enter ( ) ;
128- let _ = sender. send ( None ) ;
129- debug ! ( "rollup block not found" ) ;
130- // This may mean the chain had a rollback, so the next poll
131- // should find something.
132- return None ;
133- }
134- Err ( err) => {
135- let _span = span. enter ( ) ;
136- let _ = sender. send ( None ) ;
137- error ! ( %err, "Failed to get latest block" ) ;
138- // Error may be transient, so we should not break the loop.
139- return None ;
140- }
141- } ;
142- Some ( previous)
143- }
144-
145101 /// Spawn the task and return a watch::Receiver for the BlockEnv.
146102 pub fn spawn ( self ) -> ( watch:: Receiver < Option < SimEnv > > , JoinHandle < ( ) > ) {
147103 let ( sender, receiver) = watch:: channel ( None ) ;
0 commit comments