From 2aa54bc19975700ccf19af239c7fc7d0b2fa7d1f Mon Sep 17 00:00:00 2001 From: refcell Date: Mon, 30 Oct 2023 20:14:18 -0400 Subject: [PATCH] fix optimism network --- crates/net/network/src/config.rs | 52 +++++++++++++------------- crates/net/network/src/manager.rs | 7 ++-- crates/net/network/src/network.rs | 11 ++---- crates/net/network/src/transactions.rs | 50 +++++++++++-------------- 4 files changed, 56 insertions(+), 64 deletions(-) diff --git a/crates/net/network/src/config.rs b/crates/net/network/src/config.rs index b03c0e965e2a..02184fbb2557 100644 --- a/crates/net/network/src/config.rs +++ b/crates/net/network/src/config.rs @@ -69,20 +69,19 @@ pub struct NetworkConfig { pub status: Status, /// Sets the hello message for the p2p handshake in RLPx pub hello_message: HelloMessage, + /// Whether to disable transaction gossip + pub tx_gossip_disabled: bool, /// Optimism Network Config #[cfg(feature = "optimism")] pub optimism_network_config: OptimismNetworkConfig, } /// Optimmism Network Config +#[cfg(feature = "optimism")] #[derive(Debug)] pub struct OptimismNetworkConfig { /// The sequencer HTTP endpoint, if provided via CLI flag - #[cfg(feature = "optimism")] pub sequencer_endpoint: Option, - /// Whether to disable transaction gossip - #[cfg(feature = "optimism")] - pub tx_gossip_disabled: bool, } // === impl NetworkConfig === @@ -162,12 +161,20 @@ pub struct NetworkConfigBuilder { hello_message: Option, /// Head used to start set for the fork filter and status. head: Option, - /// The sequencer HTTP endpoint, if provided via CLI flag - #[cfg(feature = "optimism")] - sequencer_endpoint: Option, /// Whether tx gossip is disabled - #[cfg(feature = "optimism")] tx_gossip_disabled: bool, + /// Optimism Network Config Builder + #[cfg(feature = "optimism")] + optimism_network_config: OptimismNetworkConfigBuilder, +} + +/// Optimism Network Config Builder +#[cfg(feature = "optimism")] +#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] +#[derive(Debug)] +pub struct OptimismNetworkConfigBuilder { + /// The sequencer HTTP endpoint, if provided via CLI flag + sequencer_endpoint: Option, } // === impl NetworkConfigBuilder === @@ -189,10 +196,9 @@ impl NetworkConfigBuilder { executor: None, hello_message: None, head: None, - #[cfg(feature = "optimism")] - sequencer_endpoint: None, - #[cfg(feature = "optimism")] tx_gossip_disabled: false, + #[cfg(feature = "optimism")] + optimism_network_config: OptimismNetworkConfigBuilder { sequencer_endpoint: None }, } } @@ -373,17 +379,16 @@ impl NetworkConfigBuilder { } } - /// Sets the sequencer HTTP endpoint. - #[cfg(feature = "optimism")] - pub fn sequencer_endpoint(mut self, endpoint: Option) -> Self { - self.sequencer_endpoint = endpoint; + /// Sets whether tx gossip is disabled. + pub fn disable_tx_gossip(mut self, disable_tx_gossip: bool) -> Self { + self.tx_gossip_disabled = disable_tx_gossip; self } - /// Sets whether tx gossip is disabled. + /// Sets the sequencer HTTP endpoint. #[cfg(feature = "optimism")] - pub fn disable_tx_gossip(mut self, disable_tx_gossip: bool) -> Self { - self.tx_gossip_disabled = disable_tx_gossip; + pub fn sequencer_endpoint(mut self, endpoint: Option) -> Self { + self.optimism_network_config.sequencer_endpoint = endpoint; self } @@ -409,10 +414,9 @@ impl NetworkConfigBuilder { executor, hello_message, head, - #[cfg(feature = "optimism")] - sequencer_endpoint, - #[cfg(feature = "optimism")] tx_gossip_disabled, + #[cfg(feature = "optimism")] + optimism_network_config: OptimismNetworkConfigBuilder { sequencer_endpoint }, } = self; let listener_addr = listener_addr.unwrap_or(DEFAULT_DISCOVERY_ADDRESS); @@ -463,11 +467,9 @@ impl NetworkConfigBuilder { status, hello_message, fork_filter, + tx_gossip_disabled, #[cfg(feature = "optimism")] - optimism_network_config: OptimismNetworkConfig { - sequencer_endpoint, - tx_gossip_disabled, - }, + optimism_network_config: OptimismNetworkConfig { sequencer_endpoint }, } } } diff --git a/crates/net/network/src/manager.rs b/crates/net/network/src/manager.rs index 1504cbbe17d3..e5d30d2d8332 100644 --- a/crates/net/network/src/manager.rs +++ b/crates/net/network/src/manager.rs @@ -182,9 +182,9 @@ where status, fork_filter, dns_discovery_config, + tx_gossip_disabled, #[cfg(feature = "optimism")] - optimism_network_config: - crate::config::OptimismNetworkConfig { sequencer_endpoint, tx_gossip_disabled }, + optimism_network_config: crate::config::OptimismNetworkConfig { sequencer_endpoint }, } = config; let peers_manager = PeersManager::new(peers_config); @@ -242,10 +242,9 @@ where network_mode, bandwidth_meter, Arc::new(AtomicU64::new(chain_spec.chain.id())), + tx_gossip_disabled, #[cfg(feature = "optimism")] sequencer_endpoint, - #[cfg(feature = "optimism")] - tx_gossip_disabled, ); Ok(Self { diff --git a/crates/net/network/src/network.rs b/crates/net/network/src/network.rs index dbbd3e3c29be..086070a56d66 100644 --- a/crates/net/network/src/network.rs +++ b/crates/net/network/src/network.rs @@ -46,8 +46,8 @@ impl NetworkHandle { network_mode: NetworkMode, bandwidth_meter: BandwidthMeter, chain_id: Arc, + tx_gossip_disabled: bool, #[cfg(feature = "optimism")] sequencer_endpoint: Option, - #[cfg(feature = "optimism")] tx_gossip_disabled: bool, ) -> Self { let inner = NetworkInner { num_active_peers, @@ -60,10 +60,9 @@ impl NetworkHandle { is_syncing: Arc::new(AtomicBool::new(false)), initial_sync_done: Arc::new(AtomicBool::new(false)), chain_id, + tx_gossip_disabled, #[cfg(feature = "optimism")] sequencer_endpoint, - #[cfg(feature = "optimism")] - tx_gossip_disabled, }; Self { inner: Arc::new(inner) } } @@ -179,7 +178,6 @@ impl NetworkHandle { } /// Whether tx gossip is disabled - #[cfg(feature = "optimism")] pub fn tx_gossip_disabled(&self) -> bool { self.inner.tx_gossip_disabled } @@ -334,12 +332,11 @@ struct NetworkInner { initial_sync_done: Arc, /// The chain id chain_id: Arc, + /// Whether to disable transaction gossip + tx_gossip_disabled: bool, /// The sequencer HTTP Endpoint #[cfg(feature = "optimism")] sequencer_endpoint: Option, - /// Whether to disable transaction gossip - #[cfg(feature = "optimism")] - tx_gossip_disabled: bool, } /// Internal messages that can be passed to the [`NetworkManager`](crate::NetworkManager). diff --git a/crates/net/network/src/transactions.rs b/crates/net/network/src/transactions.rs index a9a09b7a8f6c..58a37477f1a2 100644 --- a/crates/net/network/src/transactions.rs +++ b/crates/net/network/src/transactions.rs @@ -247,10 +247,9 @@ where response: oneshot::Sender>, ) { if let Some(peer) = self.peers.get_mut(&peer_id) { - #[cfg(feature = "optimism")] if self.network.tx_gossip_disabled() { let _ = response.send(Ok(PooledTransactions::default())); - return + return; } let transactions = self .pool @@ -279,11 +278,10 @@ where fn on_new_transactions(&mut self, hashes: Vec) { // Nothing to propagate while initially syncing if self.network.is_initially_syncing() { - return + return; } - #[cfg(feature = "optimism")] if self.network.tx_gossip_disabled() { - return + return; } trace!(target: "net::tx", num_hashes=?hashes.len(), "Start propagating transactions"); @@ -309,9 +307,8 @@ where to_propagate: Vec, ) -> PropagatedTransactions { let mut propagated = PropagatedTransactions::default(); - #[cfg(feature = "optimism")] if self.network.tx_gossip_disabled() { - return propagated + return propagated; } // send full transactions to a fraction fo the connected peers (square root of the total @@ -419,7 +416,7 @@ where if full_transactions.transactions.is_empty() { // nothing to propagate - return None + return None; } let new_full_transactions = full_transactions.build(); @@ -446,7 +443,7 @@ where let propagated = { let Some(peer) = self.peers.get_mut(&peer_id) else { // no such peer - return + return; }; let to_propagate: Vec = @@ -467,7 +464,7 @@ where if new_pooled_hashes.is_empty() { // nothing to propagate - return + return; } for hash in new_pooled_hashes.iter_hashes().copied() { @@ -495,11 +492,10 @@ where ) { // If the node is initially syncing, ignore transactions if self.network.is_initially_syncing() { - return + return; } - #[cfg(feature = "optimism")] if self.network.tx_gossip_disabled() { - return + return; } let mut num_already_seen = 0; @@ -517,7 +513,7 @@ where if hashes.is_empty() { // nothing to request - return + return; } // enforce recommended soft limit, however the peer may enforce an arbitrary limit on @@ -529,7 +525,7 @@ where self.transaction_fetcher.request_transactions_from_peer(hashes, peer); if !request_sent { self.metrics.egress_peer_channel_full.increment(1); - return + return; } if num_already_seen > 0 { @@ -637,9 +633,8 @@ where // `NEW_POOLED_TRANSACTION_HASHES_SOFT_LIMIT` transactions in the // pool if !self.network.is_initially_syncing() { - #[cfg(feature = "optimism")] if self.network.tx_gossip_disabled() { - return + return; } let peer = self.peers.get_mut(&peer_id).expect("is present; qed"); @@ -649,7 +644,7 @@ where self.pool.pooled_transactions_max(NEW_POOLED_TRANSACTION_HASHES_SOFT_LIMIT); if pooled_txs.is_empty() { // do not send a message if there are no transactions in the pool - return + return; } for pooled_tx in pooled_txs.into_iter() { @@ -674,11 +669,10 @@ where ) { // If the node is pipeline syncing, ignore transactions if self.network.is_initially_syncing() { - return + return; } - #[cfg(feature = "optimism")] if self.network.tx_gossip_disabled() { - return + return; } // tracks the quality of the given transactions @@ -692,7 +686,7 @@ where tx } else { has_bad_transactions = true; - continue + continue; }; // track that the peer knows this transaction, but only if this is a new broadcast. @@ -747,7 +741,7 @@ where RequestError::Timeout => ReputationChangeKind::Timeout, RequestError::ChannelClosed | RequestError::ConnectionDropped => { // peer is already disconnected - return + return; } RequestError::BadResponse => ReputationChangeKind::BadTransactions, }; @@ -836,7 +830,7 @@ where if err.is_bad_transaction() && !this.network.is_syncing() { trace!(target: "net::tx", ?err, "Bad transaction import"); this.on_bad_import(err.hash); - continue + continue; } this.on_good_import(err.hash); } @@ -896,7 +890,7 @@ impl FullTransactionsBuilder { fn push(&mut self, transaction: &PropagateTransaction) { let new_size = self.total_size + transaction.size; if new_size > MAX_FULL_TRANSACTIONS_PACKET_SIZE { - return + return; } self.total_size = new_size; @@ -1111,7 +1105,7 @@ impl TransactionFetcher { error: RequestError::ChannelClosed, }) } - } + }; } Poll::Pending } @@ -1161,7 +1155,7 @@ impl TransactionFetcher { // 2. request all missing from peer if announced_hashes.is_empty() { // nothing to request - return false + return false; } let (response, rx) = oneshot::channel(); @@ -1184,7 +1178,7 @@ impl TransactionFetcher { } } } - return false + return false; } else { //create a new request for it, from that peer self.inflight_requests.push(GetPooledTxRequestFut::new(peer_id, rx))