@@ -5,7 +5,8 @@ use eyre::Error;
55use reqwest:: { Client , Url } ;
66use serde:: { Deserialize , Serialize } ;
77use serde_json:: from_slice;
8- use tokio:: { sync:: mpsc, task:: JoinHandle } ;
8+ use tokio:: { sync:: mpsc, task:: JoinHandle , time} ;
9+ use tracing:: { Instrument , debug, trace} ;
910
1011/// Models a response from the transaction pool.
1112#[ derive( Debug , Clone , Serialize , Deserialize ) ]
@@ -46,22 +47,48 @@ impl TxPoller {
4647 Ok ( response. transactions )
4748 }
4849
49- /// Spawns a task that continuously polls the cache for transactions and sends any it finds to its sender.
50- pub fn spawn ( mut self ) -> ( mpsc:: UnboundedReceiver < TxEnvelope > , JoinHandle < ( ) > ) {
51- let ( outbound, inbound) = mpsc:: unbounded_channel ( ) ;
52- let jh = tokio:: spawn ( async move {
53- loop {
54- if let Ok ( transactions) = self . check_tx_cache ( ) . await {
55- tracing:: debug!( count = ?transactions. len( ) , "found transactions" ) ;
50+ async fn task_future ( mut self , outbound : mpsc:: UnboundedSender < TxEnvelope > ) {
51+ loop {
52+ let span = tracing:: debug_span!( "TxPoller::loop" , url = %self . config. tx_pool_url) ;
53+
54+ // Enter the span for the next check.
55+ let _guard = span. enter ( ) ;
56+
57+ // Check this here to avoid making the web request if we know
58+ // we don't need the results.
59+ if outbound. is_closed ( ) {
60+ trace ! ( "No receivers left, shutting down" ) ;
61+ break ;
62+ }
63+ // exit the span after the check.
64+ drop ( _guard) ;
65+
66+ match self . check_tx_cache ( ) . instrument ( span. clone ( ) ) . await {
67+ Ok ( transactions) => {
68+ let _guard = span. entered ( ) ;
69+ debug ! ( count = ?transactions. len( ) , "found transactions" ) ;
5670 for tx in transactions. into_iter ( ) {
57- if let Err ( err) = outbound. send ( tx) {
58- tracing:: error!( err = ?err, "failed to send transaction - channel is dropped." ) ;
71+ if outbound. send ( tx) . is_err ( ) {
72+ // If there are no receivers, we can shut down
73+ trace ! ( "No receivers left, shutting down" ) ;
74+ break ;
5975 }
6076 }
6177 }
62- tokio:: time:: sleep ( tokio:: time:: Duration :: from_millis ( self . poll_interval_ms ) ) . await ;
78+ // If fetching was an error, we log and continue. We expect
79+ // these to be transient network issues.
80+ Err ( e) => {
81+ debug ! ( error = %e, "Error fetching transactions" ) ;
82+ }
6383 }
64- } ) ;
84+ time:: sleep ( time:: Duration :: from_millis ( self . poll_interval_ms ) ) . await ;
85+ }
86+ }
87+
88+ /// Spawns a task that continuously polls the cache for transactions and sends any it finds to its sender.
89+ pub fn spawn ( self ) -> ( mpsc:: UnboundedReceiver < TxEnvelope > , JoinHandle < ( ) > ) {
90+ let ( outbound, inbound) = mpsc:: unbounded_channel ( ) ;
91+ let jh = tokio:: spawn ( self . task_future ( outbound) ) ;
6592 ( inbound, jh)
6693 }
6794}
0 commit comments