@@ -15,10 +15,10 @@ use alloy::{
1515 sol_types:: { SolCall , SolError } ,
1616 transports:: TransportError ,
1717} ;
18- use eyre:: { bail, eyre} ;
18+ use eyre:: { Context , bail, eyre} ;
1919use init4_bin_base:: deps:: {
2020 metrics:: { counter, histogram} ,
21- tracing:: { self , debug, error, info, instrument, trace, warn} ,
21+ tracing:: { self , Instrument , debug, debug_span , error, info, instrument, trace, warn} ,
2222} ;
2323use oauth2:: TokenResponse ;
2424use signet_sim:: BuiltBlock ;
@@ -178,7 +178,10 @@ impl SubmitTask {
178178 "error in transaction submission"
179179 ) ;
180180
181- if e. as_revert_data ( ) == Some ( IncorrectHostBlock :: SELECTOR . into ( ) ) {
181+ if e. as_revert_data ( )
182+ . map ( |data| data. starts_with ( & IncorrectHostBlock :: SELECTOR ) )
183+ . unwrap_or_default ( )
184+ {
182185 return Ok ( ControlFlow :: Retry ) ;
183186 }
184187
@@ -234,15 +237,33 @@ impl SubmitTask {
234237 Ok ( ControlFlow :: Done )
235238 }
236239
237- #[ instrument( skip_all, err) ]
240+ /// Sign with a local signer if available, otherwise ask quincey
241+ /// for a signature (politely).
242+ #[ instrument( skip_all, fields( is_local = self . sequencer_signer. is_some( ) ) ) ]
243+ async fn get_signature ( & self , req : SignRequest ) -> eyre:: Result < SignResponse > {
244+ let sig = if let Some ( signer) = & self . sequencer_signer {
245+ signer. sign_hash ( & req. signing_hash ( ) ) . await ?
246+ } else {
247+ self . sup_quincey ( & req)
248+ . await
249+ . wrap_err ( "failed to get signature from quincey" )
250+ . inspect ( |_| {
251+ counter ! ( "builder.quincey_signature_acquired" ) . increment ( 1 ) ;
252+ } ) ?
253+ . sig
254+ } ;
255+
256+ debug ! ( sig = hex:: encode( sig. as_bytes( ) ) , "acquired signature" ) ;
257+ Ok ( SignResponse { req, sig } )
258+ }
259+
260+ #[ instrument( skip_all) ]
238261 async fn handle_inbound ( & self , block : & BuiltBlock ) -> eyre:: Result < ControlFlow > {
239262 info ! ( txns = block. tx_count( ) , "handling inbound block" ) ;
240- let sig_request = match self . construct_sig_request ( block) . await {
241- Ok ( sig_request) => sig_request,
242- Err ( e) => {
243- error ! ( error = %e, "error constructing signature request" ) ;
244- return Ok ( ControlFlow :: Skip ) ;
245- }
263+ let Ok ( sig_request) = self . construct_sig_request ( block) . await . inspect_err ( |e| {
264+ error ! ( error = %e, "error constructing signature request" ) ;
265+ } ) else {
266+ return Ok ( ControlFlow :: Skip ) ;
246267 } ;
247268
248269 debug ! (
@@ -251,76 +272,77 @@ impl SubmitTask {
251272 "constructed signature request for host block"
252273 ) ;
253274
254- // If configured with a local signer, we use it. Otherwise, we ask
255- // quincey (politely)
256- let signed = if let Some ( signer) = & self . sequencer_signer {
257- let sig = signer. sign_hash ( & sig_request. signing_hash ( ) ) . await ?;
258- debug ! ( sig = hex:: encode( sig. as_bytes( ) ) , "acquired signature from local signer" ) ;
259- SignResponse { req : sig_request, sig }
260- } else {
261- let resp: SignResponse = match self . sup_quincey ( & sig_request) . await {
262- Ok ( resp) => resp,
263- Err ( e) => {
264- error ! ( error = %e, "error acquiring signature from quincey" ) ;
265- return Ok ( ControlFlow :: Retry ) ;
266- }
267- } ;
268- debug ! ( sig = hex:: encode( resp. sig. as_bytes( ) ) , "acquired signature from quincey" ) ;
269- counter ! ( "builder.quincey_signature_acquired" ) . increment ( 1 ) ;
270- resp
271- } ;
275+ let signed = self . get_signature ( sig_request) . await ?;
272276
273277 self . submit_transaction ( & signed, block) . await
274278 }
275279
276- /// Spawns the in progress block building task
277- pub fn spawn ( self ) -> ( mpsc:: UnboundedSender < BuiltBlock > , JoinHandle < ( ) > ) {
278- let ( sender, mut inbound) = mpsc:: unbounded_channel ( ) ;
279- let handle = tokio:: spawn ( async move {
280- loop {
281- if let Some ( in_progress) = inbound. recv ( ) . await {
282- let building_start_time = Instant :: now ( ) ;
283- let mut retries = 0 ;
284- loop {
285- match self . handle_inbound ( & in_progress) . await {
286- Ok ( ControlFlow :: Retry ) => {
287- retries += 1 ;
288- if retries > 3 {
289- counter ! ( "builder.building_too_many_retries" ) . increment ( 1 ) ;
290- histogram ! ( "builder.block_build_time" )
291- . record ( building_start_time. elapsed ( ) . as_millis ( ) as f64 ) ;
292- error ! ( "error handling inbound block: too many retries" ) ;
293- break ;
294- }
295- error ! ( "error handling inbound block: retrying" ) ;
296- tokio:: time:: sleep ( tokio:: time:: Duration :: from_secs ( 2 ) ) . await ;
297- }
298- Ok ( ControlFlow :: Skip ) => {
299- histogram ! ( "builder.block_build_time" )
300- . record ( building_start_time. elapsed ( ) . as_millis ( ) as f64 ) ;
301- counter ! ( "builder.skipped_blocks" ) . increment ( 1 ) ;
302- info ! ( "skipping block" ) ;
303- break ;
304- }
305- Ok ( ControlFlow :: Done ) => {
306- histogram ! ( "builder.block_build_time" )
307- . record ( building_start_time. elapsed ( ) . as_millis ( ) as f64 ) ;
308- counter ! ( "builder.submitted_successful_blocks" ) . increment ( 1 ) ;
309- info ! ( "block landed successfully" ) ;
310- break ;
311- }
312- Err ( e) => {
313- error ! ( error = %e, "error handling inbound block" ) ;
314- break ;
315- }
316- }
280+ async fn retrying_handle_inbound (
281+ & self ,
282+ block : & BuiltBlock ,
283+ retry_limit : usize ,
284+ ) -> eyre:: Result < ControlFlow > {
285+ let mut retries = 0 ;
286+ let building_start_time = Instant :: now ( ) ;
287+
288+ let result = loop {
289+ let span = debug_span ! ( "SubmitTask::retrying_handle_inbound" , retries) ;
290+
291+ let result =
292+ self . handle_inbound ( block) . instrument ( span. clone ( ) ) . await . inspect_err ( |e| {
293+ error ! ( error = %e, "error handling inbound block" ) ;
294+ } ) ?;
295+
296+ let guard = span. entered ( ) ;
297+
298+ match result {
299+ ControlFlow :: Retry => {
300+ retries += 1 ;
301+ if retries > retry_limit {
302+ counter ! ( "builder.building_too_many_retries" ) . increment ( 1 ) ;
303+ return Ok ( ControlFlow :: Skip ) ;
317304 }
318- } else {
319- debug ! ( "upstream task gone" ) ;
320- break ;
305+ error ! ( "error handling inbound block: retrying" ) ;
306+ drop ( guard) ;
307+ tokio:: time:: sleep ( tokio:: time:: Duration :: from_secs ( 2 ) ) . await ;
308+
309+ continue ;
310+ }
311+ ControlFlow :: Skip => {
312+ counter ! ( "builder.skipped_blocks" ) . increment ( 1 ) ;
313+ break result;
314+ }
315+ ControlFlow :: Done => {
316+ counter ! ( "builder.submitted_successful_blocks" ) . increment ( 1 ) ;
317+ break result;
321318 }
322319 }
323- } ) ;
320+ } ;
321+
322+ // This is reached when `Done` or `Skip` is returned
323+ histogram ! ( "builder.block_build_time" )
324+ . record ( building_start_time. elapsed ( ) . as_millis ( ) as f64 ) ;
325+ info ! ( ?result, "finished block building" ) ;
326+ Ok ( result)
327+ }
328+
329+ async fn task_future ( self , mut inbound : mpsc:: UnboundedReceiver < BuiltBlock > ) {
330+ loop {
331+ let Some ( block) = inbound. recv ( ) . await else {
332+ debug ! ( "upstream task gone" ) ;
333+ break ;
334+ } ;
335+
336+ if self . retrying_handle_inbound ( & block, 3 ) . await . is_err ( ) {
337+ break ;
338+ }
339+ }
340+ }
341+
342+ /// Spawns the in progress block building task
343+ pub fn spawn ( self ) -> ( mpsc:: UnboundedSender < BuiltBlock > , JoinHandle < ( ) > ) {
344+ let ( sender, inbound) = mpsc:: unbounded_channel ( ) ;
345+ let handle = tokio:: spawn ( self . task_future ( inbound) ) ;
324346
325347 ( sender, handle)
326348 }
0 commit comments