Skip to content
This repository has been archived by the owner on Nov 5, 2023. It is now read-only.

fix: Optimism Network #141

Merged
merged 1 commit into from
Oct 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 27 additions & 25 deletions crates/net/network/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,20 +69,19 @@ pub struct NetworkConfig<C> {
pub status: Status,
/// Sets the hello message for the p2p handshake in RLPx
pub hello_message: HelloMessage,
/// Whether to disable transaction gossip
pub tx_gossip_disabled: bool,
/// Optimism Network Config
#[cfg(feature = "optimism")]
pub optimism_network_config: OptimismNetworkConfig,
}

/// Optimmism Network Config
#[cfg(feature = "optimism")]
#[derive(Debug)]
pub struct OptimismNetworkConfig {
/// The sequencer HTTP endpoint, if provided via CLI flag
#[cfg(feature = "optimism")]
pub sequencer_endpoint: Option<String>,
/// Whether to disable transaction gossip
#[cfg(feature = "optimism")]
pub tx_gossip_disabled: bool,
}

// === impl NetworkConfig ===
Expand Down Expand Up @@ -162,12 +161,20 @@ pub struct NetworkConfigBuilder {
hello_message: Option<HelloMessage>,
/// Head used to start set for the fork filter and status.
head: Option<Head>,
/// The sequencer HTTP endpoint, if provided via CLI flag
#[cfg(feature = "optimism")]
sequencer_endpoint: Option<String>,
/// Whether tx gossip is disabled
#[cfg(feature = "optimism")]
tx_gossip_disabled: bool,
/// Optimism Network Config Builder
#[cfg(feature = "optimism")]
optimism_network_config: OptimismNetworkConfigBuilder,
}

/// Optimism Network Config Builder
#[cfg(feature = "optimism")]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[derive(Debug)]
pub struct OptimismNetworkConfigBuilder {
/// The sequencer HTTP endpoint, if provided via CLI flag
sequencer_endpoint: Option<String>,
}

// === impl NetworkConfigBuilder ===
Expand All @@ -189,10 +196,9 @@ impl NetworkConfigBuilder {
executor: None,
hello_message: None,
head: None,
#[cfg(feature = "optimism")]
sequencer_endpoint: None,
#[cfg(feature = "optimism")]
tx_gossip_disabled: false,
#[cfg(feature = "optimism")]
optimism_network_config: OptimismNetworkConfigBuilder { sequencer_endpoint: None },
}
}

Expand Down Expand Up @@ -373,17 +379,16 @@ impl NetworkConfigBuilder {
}
}

/// Sets the sequencer HTTP endpoint.
#[cfg(feature = "optimism")]
pub fn sequencer_endpoint(mut self, endpoint: Option<String>) -> Self {
self.sequencer_endpoint = endpoint;
/// Sets whether tx gossip is disabled.
pub fn disable_tx_gossip(mut self, disable_tx_gossip: bool) -> Self {
self.tx_gossip_disabled = disable_tx_gossip;
self
}

/// Sets whether tx gossip is disabled.
/// Sets the sequencer HTTP endpoint.
#[cfg(feature = "optimism")]
pub fn disable_tx_gossip(mut self, disable_tx_gossip: bool) -> Self {
self.tx_gossip_disabled = disable_tx_gossip;
pub fn sequencer_endpoint(mut self, endpoint: Option<String>) -> Self {
self.optimism_network_config.sequencer_endpoint = endpoint;
self
}

Expand All @@ -409,10 +414,9 @@ impl NetworkConfigBuilder {
executor,
hello_message,
head,
#[cfg(feature = "optimism")]
sequencer_endpoint,
#[cfg(feature = "optimism")]
tx_gossip_disabled,
#[cfg(feature = "optimism")]
optimism_network_config: OptimismNetworkConfigBuilder { sequencer_endpoint },
} = self;

let listener_addr = listener_addr.unwrap_or(DEFAULT_DISCOVERY_ADDRESS);
Expand Down Expand Up @@ -463,11 +467,9 @@ impl NetworkConfigBuilder {
status,
hello_message,
fork_filter,
tx_gossip_disabled,
#[cfg(feature = "optimism")]
optimism_network_config: OptimismNetworkConfig {
sequencer_endpoint,
tx_gossip_disabled,
},
optimism_network_config: OptimismNetworkConfig { sequencer_endpoint },
}
}
}
Expand Down
7 changes: 3 additions & 4 deletions crates/net/network/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,9 +182,9 @@ where
status,
fork_filter,
dns_discovery_config,
tx_gossip_disabled,
#[cfg(feature = "optimism")]
optimism_network_config:
crate::config::OptimismNetworkConfig { sequencer_endpoint, tx_gossip_disabled },
optimism_network_config: crate::config::OptimismNetworkConfig { sequencer_endpoint },
} = config;

let peers_manager = PeersManager::new(peers_config);
Expand Down Expand Up @@ -242,10 +242,9 @@ where
network_mode,
bandwidth_meter,
Arc::new(AtomicU64::new(chain_spec.chain.id())),
tx_gossip_disabled,
#[cfg(feature = "optimism")]
sequencer_endpoint,
#[cfg(feature = "optimism")]
tx_gossip_disabled,
);

Ok(Self {
Expand Down
11 changes: 4 additions & 7 deletions crates/net/network/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ impl NetworkHandle {
network_mode: NetworkMode,
bandwidth_meter: BandwidthMeter,
chain_id: Arc<AtomicU64>,
tx_gossip_disabled: bool,
#[cfg(feature = "optimism")] sequencer_endpoint: Option<String>,
#[cfg(feature = "optimism")] tx_gossip_disabled: bool,
) -> Self {
let inner = NetworkInner {
num_active_peers,
Expand All @@ -60,10 +60,9 @@ impl NetworkHandle {
is_syncing: Arc::new(AtomicBool::new(false)),
initial_sync_done: Arc::new(AtomicBool::new(false)),
chain_id,
tx_gossip_disabled,
#[cfg(feature = "optimism")]
sequencer_endpoint,
#[cfg(feature = "optimism")]
tx_gossip_disabled,
};
Self { inner: Arc::new(inner) }
}
Expand Down Expand Up @@ -179,7 +178,6 @@ impl NetworkHandle {
}

/// Whether tx gossip is disabled
#[cfg(feature = "optimism")]
pub fn tx_gossip_disabled(&self) -> bool {
self.inner.tx_gossip_disabled
}
Expand Down Expand Up @@ -334,12 +332,11 @@ struct NetworkInner {
initial_sync_done: Arc<AtomicBool>,
/// The chain id
chain_id: Arc<AtomicU64>,
/// Whether to disable transaction gossip
tx_gossip_disabled: bool,
/// The sequencer HTTP Endpoint
#[cfg(feature = "optimism")]
sequencer_endpoint: Option<String>,
/// Whether to disable transaction gossip
#[cfg(feature = "optimism")]
tx_gossip_disabled: bool,
}

/// Internal messages that can be passed to the [`NetworkManager`](crate::NetworkManager).
Expand Down
50 changes: 22 additions & 28 deletions crates/net/network/src/transactions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -247,10 +247,9 @@ where
response: oneshot::Sender<RequestResult<PooledTransactions>>,
) {
if let Some(peer) = self.peers.get_mut(&peer_id) {
#[cfg(feature = "optimism")]
if self.network.tx_gossip_disabled() {
let _ = response.send(Ok(PooledTransactions::default()));
return
return;
}
let transactions = self
.pool
Expand Down Expand Up @@ -279,11 +278,10 @@ where
fn on_new_transactions(&mut self, hashes: Vec<TxHash>) {
// Nothing to propagate while initially syncing
if self.network.is_initially_syncing() {
return
return;
}
#[cfg(feature = "optimism")]
if self.network.tx_gossip_disabled() {
return
return;
}

trace!(target: "net::tx", num_hashes=?hashes.len(), "Start propagating transactions");
Expand All @@ -309,9 +307,8 @@ where
to_propagate: Vec<PropagateTransaction>,
) -> PropagatedTransactions {
let mut propagated = PropagatedTransactions::default();
#[cfg(feature = "optimism")]
if self.network.tx_gossip_disabled() {
return propagated
return propagated;
}

// send full transactions to a fraction fo the connected peers (square root of the total
Expand Down Expand Up @@ -419,7 +416,7 @@ where

if full_transactions.transactions.is_empty() {
// nothing to propagate
return None
return None;
}

let new_full_transactions = full_transactions.build();
Expand All @@ -446,7 +443,7 @@ where
let propagated = {
let Some(peer) = self.peers.get_mut(&peer_id) else {
// no such peer
return
return;
};

let to_propagate: Vec<PropagateTransaction> =
Expand All @@ -467,7 +464,7 @@ where

if new_pooled_hashes.is_empty() {
// nothing to propagate
return
return;
}

for hash in new_pooled_hashes.iter_hashes().copied() {
Expand Down Expand Up @@ -495,11 +492,10 @@ where
) {
// If the node is initially syncing, ignore transactions
if self.network.is_initially_syncing() {
return
return;
}
#[cfg(feature = "optimism")]
if self.network.tx_gossip_disabled() {
return
return;
}

let mut num_already_seen = 0;
Expand All @@ -517,7 +513,7 @@ where

if hashes.is_empty() {
// nothing to request
return
return;
}

// enforce recommended soft limit, however the peer may enforce an arbitrary limit on
Expand All @@ -529,7 +525,7 @@ where
self.transaction_fetcher.request_transactions_from_peer(hashes, peer);
if !request_sent {
self.metrics.egress_peer_channel_full.increment(1);
return
return;
}

if num_already_seen > 0 {
Expand Down Expand Up @@ -637,9 +633,8 @@ where
// `NEW_POOLED_TRANSACTION_HASHES_SOFT_LIMIT` transactions in the
// pool
if !self.network.is_initially_syncing() {
#[cfg(feature = "optimism")]
if self.network.tx_gossip_disabled() {
return
return;
}
let peer = self.peers.get_mut(&peer_id).expect("is present; qed");

Expand All @@ -649,7 +644,7 @@ where
self.pool.pooled_transactions_max(NEW_POOLED_TRANSACTION_HASHES_SOFT_LIMIT);
if pooled_txs.is_empty() {
// do not send a message if there are no transactions in the pool
return
return;
}

for pooled_tx in pooled_txs.into_iter() {
Expand All @@ -674,11 +669,10 @@ where
) {
// If the node is pipeline syncing, ignore transactions
if self.network.is_initially_syncing() {
return
return;
}
#[cfg(feature = "optimism")]
if self.network.tx_gossip_disabled() {
return
return;
}

// tracks the quality of the given transactions
Expand All @@ -692,7 +686,7 @@ where
tx
} else {
has_bad_transactions = true;
continue
continue;
};

// track that the peer knows this transaction, but only if this is a new broadcast.
Expand Down Expand Up @@ -747,7 +741,7 @@ where
RequestError::Timeout => ReputationChangeKind::Timeout,
RequestError::ChannelClosed | RequestError::ConnectionDropped => {
// peer is already disconnected
return
return;
}
RequestError::BadResponse => ReputationChangeKind::BadTransactions,
};
Expand Down Expand Up @@ -836,7 +830,7 @@ where
if err.is_bad_transaction() && !this.network.is_syncing() {
trace!(target: "net::tx", ?err, "Bad transaction import");
this.on_bad_import(err.hash);
continue
continue;
}
this.on_good_import(err.hash);
}
Expand Down Expand Up @@ -896,7 +890,7 @@ impl FullTransactionsBuilder {
fn push(&mut self, transaction: &PropagateTransaction) {
let new_size = self.total_size + transaction.size;
if new_size > MAX_FULL_TRANSACTIONS_PACKET_SIZE {
return
return;
}

self.total_size = new_size;
Expand Down Expand Up @@ -1111,7 +1105,7 @@ impl TransactionFetcher {
error: RequestError::ChannelClosed,
})
}
}
};
}
Poll::Pending
}
Expand Down Expand Up @@ -1161,7 +1155,7 @@ impl TransactionFetcher {
// 2. request all missing from peer
if announced_hashes.is_empty() {
// nothing to request
return false
return false;
}

let (response, rx) = oneshot::channel();
Expand All @@ -1184,7 +1178,7 @@ impl TransactionFetcher {
}
}
}
return false
return false;
} else {
//create a new request for it, from that peer
self.inflight_requests.push(GetPooledTxRequestFut::new(peer_id, rx))
Expand Down
Loading