diff --git a/.github/workflows/assertoor.yml b/.github/workflows/assertoor.yml new file mode 100644 index 000000000000..2bab23be2991 --- /dev/null +++ b/.github/workflows/assertoor.yml @@ -0,0 +1,230 @@ +name: Assertoor Tests + +on: + workflow_dispatch: + schedule: + - cron: '0 0 * * *' + +jobs: + get_tests: + name: "Run assertoor tests on reth pairs" + runs-on: ubuntu-latest + outputs: + test_result: ${{ steps.test_result.outputs.test_result }} + test_status: ${{ steps.test_result.outputs.test_status }} + failed_test_status: ${{ steps.test_result.outputs.failed_test_status }} + steps: + - name: Checkout Repository + uses: actions/checkout@v4 + - name: Setup Kurtosis + shell: bash + run: | + echo "deb [trusted=yes] https://apt.fury.io/kurtosis-tech/ /" | sudo tee /etc/apt/sources.list.d/kurtosis.list + sudo apt update + sudo apt install kurtosis-cli + kurtosis analytics disable + + - name: Run Kurtosis + shell: bash + id: services + run: | + export github_sha=${{ github.sha }} + export github_repository=${{ github.repository }} + + cat etc/assertoor/assertoor-template.yaml | envsubst > etc/assertoor/assertoor.yaml + + kurtosis run github.com/kurtosis-tech/ethereum-package --enclave assertoor-${{ github.run_id }} --args-file etc/assertoor/assertoor.yaml + + enclave_dump=$(kurtosis enclave inspect assertoor-${{ github.run_id }}) + + assertoor_url=$(echo "$enclave_dump" | grep assertoor | grep http | sed 's/.*\(http:\/\/[0-9.:]\+\).*/\1/') + echo "assertoor_url: ${assertoor_url}" + echo "assertoor_url=${assertoor_url}" >> $GITHUB_OUTPUT + + - name: Await test completion + shell: bash + id: test_result + run: | + assertoor_url="${{ steps.services.outputs.assertoor_url }}" + + YELLOW='\033[1;33m' + GRAY='\033[0;37m' + GREEN='\033[0;32m' + RED='\033[0;31m' + NC='\033[0m' + + # print assertor logs + assertoor_container=$(docker container list | grep assertoor | sed 's/^\([^ ]\+\) .*$/\1/') + docker logs -f $assertoor_container & + + # helper to fetch task status for specific test id + get_tasks_status() { + tasks=$(curl -s ${assertoor_url}/api/v1/test_run/$1 | jq -c ".data.tasks[] | {index, parent_index, name, title, status, result}") + declare -A task_graph_map + task_graph_map[0]="" + + while read task; do + task_id=$(echo "$task" | jq -r ".index") + task_parent=$(echo "$task" | jq -r ".parent_index") + task_name=$(echo "$task" | jq -r ".name") + task_title=$(echo "$task" | jq -r ".title") + task_status=$(echo "$task" | jq -r ".status") + task_result=$(echo "$task" | jq -r ".result") + + task_graph="${task_graph_map[$task_parent]}" + task_graph_map[$task_id]="$task_graph |" + if [ ! 
-z "$task_graph" ]; then + task_graph="${task_graph}- " + fi + + if [ "$task_status" == "pending" ]; then + task_status="${GRAY}pending ${NC}" + elif [ "$task_status" == "running" ]; then + task_status="${YELLOW}running ${NC}" + elif [ "$task_status" == "complete" ]; then + task_status="${GREEN}complete${NC}" + fi + + if [ "$task_result" == "none" ]; then + task_result="${GRAY}none ${NC}" + elif [ "$task_result" == "success" ]; then + task_result="${GREEN}success${NC}" + elif [ "$task_result" == "failure" ]; then + task_result="${RED}failure${NC}" + fi + + echo -e " $(printf '%-4s' "$task_id")\t$task_status\t$task_result\t$(printf '%-50s' "$task_graph$task_name") \t$task_title" + done <<< $(echo "$tasks") + } + + # poll & check test status + final_test_result="" + failed_test_id="" + while true + do + pending_tests=0 + failed_tests=0 + total_tests=0 + running_test="" + + status_lines=() + task_lines="" + status_lines+=("$(date +'%Y-%m-%d %H:%M:%S') Test Status:") + + tests=$(curl -s ${assertoor_url}/api/v1/test_runs | jq -c ".data[] | {run_id, test_id, name, status}") + while read test; do + if [ -z "$test" ]; then + continue + fi + run_id=$(echo "$test" | jq -r ".run_id") + test_id=$(echo "$test" | jq -r ".test_id") + test_name=$(echo "$test" | jq -r ".name") + test_status=$(echo "$test" | jq -r ".status") + + if [ "$test_status" == "pending" ]; then + pending_tests=$(expr $pending_tests + 1) + status_name="${GRAY}pending${NC}" + elif [ "$test_status" == "running" ]; then + pending_tests=$(expr $pending_tests + 1) + running_test="$run_id" + status_name="${YELLOW}running${NC}" + + elif [ "$test_status" == "success" ]; then + status_name="${GREEN}success${NC}" + elif [ "$test_status" == "failure" ]; then + failed_tests=$(expr $failed_tests + 1) + failed_test_id="$run_id" + status_name="${RED}failure${NC}" + else + status_name="$test_status" + fi + status_lines+=(" $(printf '%-3s' "$test_id") $status_name \t$test_name") + total_tests=$(expr $total_tests + 1) + done <<< $(echo "$tests") + + for status_line in "${status_lines[@]}" + do + echo -e "$status_line" + done + + if ! [ -z "$running_test" ]; then + task_lines=$(get_tasks_status "$running_test") + echo "Active Test Task Status:" + echo "$task_lines" + fi + + if [ $failed_tests -gt 0 ]; then + final_test_result="failure" + break + fi + if [ $total_tests -gt 0 ] && [ $pending_tests -le 0 ]; then + final_test_result="success" + break + fi + + sleep 60 + done + + # save test results & status to github output + echo "test_result=$(echo "$final_test_result")" >> $GITHUB_OUTPUT + echo "test_status<> $GITHUB_OUTPUT + for status_line in "${status_lines[@]}" + do + echo -e "$status_line" >> $GITHUB_OUTPUT + done + echo "EOF" >> $GITHUB_OUTPUT + + if ! 
[ -z "$failed_test_id" ]; then + echo "failed_test_status<> $GITHUB_OUTPUT + get_tasks_status "$failed_test_id" >> $GITHUB_OUTPUT + echo "EOF" >> $GITHUB_OUTPUT + else + echo "failed_test_status=" >> $GITHUB_OUTPUT + fi + + - name: Generate dump and remove kurtosis enclave + shell: bash + run: | + mkdir -p ./temp/dump + cd ./temp/dump + cp ../../etc/assertoor/assertoor.yaml ./kurtosis-params.yaml + + kurtosis enclave dump assertoor-${{ github.run_id }} + kurtosis enclave rm -f assertoor-${{ github.run_id }} + + - name: Upload dump artifact + uses: actions/upload-artifact@v3 + with: + name: "kurtosis-enclave-dump-${{ github.run_id }}" + path: ./temp/dump + + - name: Return test result + shell: bash + run: | + test_result="${{ steps.test_result.outputs.test_result }}" + test_status=$( + cat <<"EOF" + ${{ steps.test_result.outputs.test_status }} + EOF + ) + failed_test_status=$( + cat <<"EOF" + ${{ steps.test_result.outputs.failed_test_status }} + EOF + ) + + echo "Test Result: $test_result" + echo "$test_status" + + if ! [ "$test_result" == "success" ]; then + echo "" + echo "Failed Test Task Status:" + echo "$failed_test_status" + + echo "" + echo "See 'Await test completion' task for detailed logs about this failure!" + echo "" + + exit 1 # fail action + fi + diff --git a/bin/reth/src/commands/test_vectors/tables.rs b/bin/reth/src/commands/test_vectors/tables.rs index 5679317a9a06..6399c81ac235 100644 --- a/bin/reth/src/commands/test_vectors/tables.rs +++ b/bin/reth/src/commands/test_vectors/tables.rs @@ -73,10 +73,11 @@ pub(crate) fn generate_vectors(mut tables: Vec) -> Result<()> { } /// Generates test-vectors for normal tables. Keys are sorted and not repeated. -fn generate_table_vector(runner: &mut TestRunner, per_table: usize) -> Result<()> +fn generate_table_vector(runner: &mut TestRunner, per_table: usize) -> Result<()> where T::Key: Arbitrary + serde::Serialize + Ord + std::hash::Hash, T::Value: Arbitrary + serde::Serialize, + T: Table, { let mut rows = vec![]; let mut seen_keys = HashSet::new(); @@ -109,9 +110,9 @@ where /// Generates test-vectors for DUPSORT tables. Each key has multiple (subkey, value). Keys and /// subkeys are sorted. -fn generate_dupsort_vector(runner: &mut TestRunner, per_table: usize) -> Result<()> +fn generate_dupsort_vector(runner: &mut TestRunner, per_table: usize) -> Result<()> where - T: DupSort, + T: Table + DupSort, T::Key: Arbitrary + serde::Serialize + Ord + std::hash::Hash, T::Value: Arbitrary + serde::Serialize + Ord, { diff --git a/crates/blockchain-tree/src/block_buffer.rs b/crates/blockchain-tree/src/block_buffer.rs index cbdc692b588b..23c6ca6815e0 100644 --- a/crates/blockchain-tree/src/block_buffer.rs +++ b/crates/blockchain-tree/src/block_buffer.rs @@ -215,7 +215,7 @@ mod tests { /// Assert that the block was removed from all buffer collections. fn assert_block_removal(buffer: &BlockBuffer, block: &SealedBlockWithSenders) { - assert!(buffer.blocks.get(&block.hash()).is_none()); + assert!(!buffer.blocks.contains_key(&block.hash())); assert!(buffer .parent_to_child .get(&block.parent_hash) diff --git a/crates/consensus/auto-seal/src/lib.rs b/crates/consensus/auto-seal/src/lib.rs index 2f05b29c7088..55aef9911684 100644 --- a/crates/consensus/auto-seal/src/lib.rs +++ b/crates/consensus/auto-seal/src/lib.rs @@ -347,17 +347,13 @@ impl StorageInner { /// Fills in the post-execution header fields based on the given BundleState and gas used. /// In doing this, the state root is calculated and the final header is returned. 
- /// - /// This is optimism-specific and contains the `ChainSpec` so the proper state root can be - /// calculated. - #[cfg(feature = "optimism")] pub(crate) fn complete_header( &self, mut header: Header, bundle_state: &BundleStateWithReceipts, client: &S, gas_used: u64, - chain_spec: &ChainSpec, + #[cfg(feature = "optimism")] chain_spec: &ChainSpec, ) -> Result { let receipts = bundle_state.receipts_by_block(header.number); header.receipts_root = if receipts.is_empty() { @@ -369,46 +365,18 @@ impl StorageInner { .collect::>(); header.logs_bloom = receipts_with_bloom.iter().fold(Bloom::ZERO, |bloom, r| bloom | r.bloom); - proofs::calculate_receipt_root_optimism( - &receipts_with_bloom, - chain_spec, - header.timestamp, - ) - }; - - header.gas_used = gas_used; - - // calculate the state root - let state_root = client - .latest() - .map_err(|_| BlockExecutionError::ProviderError)? - .state_root(bundle_state) - .unwrap(); - header.state_root = state_root; - Ok(header) - } - - /// Fills in the post-execution header fields based on the given BundleState and gas used. - /// In doing this, the state root is calculated and the final header is returned. - #[cfg(not(feature = "optimism"))] - pub(crate) fn complete_header( - &self, - mut header: Header, - bundle_state: &BundleStateWithReceipts, - client: &S, - gas_used: u64, - ) -> Result { - let receipts = bundle_state.receipts_by_block(header.number); - header.receipts_root = if receipts.is_empty() { - EMPTY_RECEIPTS - } else { - let receipts_with_bloom = receipts - .iter() - .map(|r| (*r).clone().expect("receipts have not been pruned").into()) - .collect::>(); - header.logs_bloom = - receipts_with_bloom.iter().fold(Bloom::ZERO, |bloom, r| bloom | r.bloom); - proofs::calculate_receipt_root(&receipts_with_bloom) + #[cfg(feature = "optimism")] + { + proofs::calculate_receipt_root_optimism( + &receipts_with_bloom, + chain_spec, + header.timestamp, + ) + } + #[cfg(not(feature = "optimism"))] + { + proofs::calculate_receipt_root(&receipts_with_bloom) + } }; header.gas_used = gas_used; diff --git a/crates/ethereum-forks/src/hardfork.rs b/crates/ethereum-forks/src/hardfork.rs index e7aacd9564aa..6a7dbdb05034 100644 --- a/crates/ethereum-forks/src/hardfork.rs +++ b/crates/ethereum-forks/src/hardfork.rs @@ -69,11 +69,36 @@ pub enum Hardfork { } impl Hardfork { - /// Retrieves the activation block for the specified hardfork on the Ethereum mainnet. - pub fn mainnet_activation_block(&self, chain: Chain) -> Option { - if chain != Chain::mainnet() { - return None + /// Retrieves the consensus type for the specified hardfork. + pub fn consensus_type(&self) -> ConsensusType { + if *self >= Hardfork::Paris { + ConsensusType::ProofOfStake + } else { + ConsensusType::ProofOfWork + } + } + + /// Checks if the hardfork uses Proof of Stake consensus. + pub fn is_proof_of_stake(&self) -> bool { + matches!(self.consensus_type(), ConsensusType::ProofOfStake) + } + + /// Checks if the hardfork uses Proof of Work consensus. + pub fn is_proof_of_work(&self) -> bool { + matches!(self.consensus_type(), ConsensusType::ProofOfWork) + } + + /// Retrieves the activation block for the specified hardfork on the given chain. + pub fn activation_block(&self, chain: Chain) -> Option { + if chain == Chain::mainnet() { + return self.mainnet_activation_block() } + None + } + + /// Retrieves the activation block for the specified hardfork on the Ethereum mainnet. 
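// A minimal sketch of the catch-all refactor applied in the function that follows:
// the feature-gated optimism variants no longer need per-variant `#[cfg]` match
// arms, because a single `_ => None` arm covers them, and
// `#[allow(unreachable_patterns)]` silences the lint in builds where the wildcard
// can never be reached. The trimmed enum is a stand-in, not the full `Hardfork` type.
enum Fork {
    Frontier,
    Paris,
    #[cfg(feature = "optimism")]
    Bedrock,
}

fn activation_block(fork: &Fork) -> Option<u64> {
    #[allow(unreachable_patterns)]
    match fork {
        Fork::Frontier => Some(0),
        // mainnet merge block
        Fork::Paris => Some(15_537_394),
        // optimism variants (and any future additions) fall through here
        _ => None,
    }
}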
+ pub fn mainnet_activation_block(&self) -> Option { + #[allow(unreachable_patterns)] match self { Hardfork::Frontier => Some(0), Hardfork::Homestead => Some(1150000), @@ -95,47 +120,21 @@ impl Hardfork { // upcoming hardforks Hardfork::Cancun => None, - // optimism hardforks - #[cfg(feature = "optimism")] - Hardfork::Bedrock => None, - #[cfg(feature = "optimism")] - Hardfork::Regolith => None, - #[cfg(feature = "optimism")] - Hardfork::Canyon => None, - #[cfg(feature = "optimism")] - Hardfork::Ecotone => None, - } - } - - /// Retrieves the consensus type for the specified hardfork. - pub fn consensus_type(&self) -> ConsensusType { - if *self >= Hardfork::Paris { - ConsensusType::ProofOfStake - } else { - ConsensusType::ProofOfWork + _ => None, } } - /// Checks if the hardfork uses Proof of Stake consensus. - pub fn is_proof_of_stake(&self) -> bool { - matches!(self.consensus_type(), ConsensusType::ProofOfStake) - } - - /// Checks if the hardfork uses Proof of Work consensus. - pub fn is_proof_of_work(&self) -> bool { - matches!(self.consensus_type(), ConsensusType::ProofOfWork) - } - /// Retrieves the activation timestamp for the specified hardfork on the given chain. pub fn activation_timestamp(&self, chain: Chain) -> Option { - if chain != Chain::mainnet() { - return None + if chain == Chain::mainnet() { + return self.mainnet_activation_timestamp() } - self.mainnet_activation_timestamp() + None } /// Retrieves the activation timestamp for the specified hardfork on the Ethereum mainnet. pub fn mainnet_activation_timestamp(&self) -> Option { + #[allow(unreachable_patterns)] match self { Hardfork::Frontier => Some(1438226773), Hardfork::Homestead => Some(1457938193), @@ -153,19 +152,10 @@ impl Hardfork { Hardfork::GrayGlacier => Some(1656586444), Hardfork::Paris => Some(1663224162), Hardfork::Shanghai => Some(1681338455), + Hardfork::Cancun => Some(1710338135), // upcoming hardforks - Hardfork::Cancun => None, - - // optimism hardforks - #[cfg(feature = "optimism")] - Hardfork::Bedrock => None, - #[cfg(feature = "optimism")] - Hardfork::Regolith => None, - #[cfg(feature = "optimism")] - Hardfork::Canyon => None, - #[cfg(feature = "optimism")] - Hardfork::Ecotone => None, + _ => None, } } } diff --git a/crates/net/discv4/src/lib.rs b/crates/net/discv4/src/lib.rs index af93537c1477..8aef5337de78 100644 --- a/crates/net/discv4/src/lib.rs +++ b/crates/net/discv4/src/lib.rs @@ -1378,9 +1378,12 @@ impl Discv4Service { fn respond_closest(&mut self, target: PeerId, to: SocketAddr) { let key = kad_key(target); let expire = self.send_neighbours_expiration(); - let all_nodes = self.kbuckets.closest_values(&key).collect::>(); - for nodes in all_nodes.chunks(SAFE_MAX_DATAGRAM_NEIGHBOUR_RECORDS) { + // get the MAX_NODES_PER_BUCKET closest nodes to the target + let closest_nodes = + self.kbuckets.closest_values(&key).take(MAX_NODES_PER_BUCKET).collect::>(); + + for nodes in closest_nodes.chunks(SAFE_MAX_DATAGRAM_NEIGHBOUR_RECORDS) { let nodes = nodes.iter().map(|node| node.value.record).collect::>(); trace!(target: "discv4", len = nodes.len(), to=?to,"Sent neighbours packet"); let msg = Message::Neighbours(Neighbours { nodes, expire }); diff --git a/crates/net/downloaders/src/file_client.rs b/crates/net/downloaders/src/file_client.rs index 073640427bf6..ebc5fe40895a 100644 --- a/crates/net/downloaders/src/file_client.rs +++ b/crates/net/downloaders/src/file_client.rs @@ -314,7 +314,6 @@ mod tests { async fn test_download_headers_from_file() { // Generate some random blocks let (file, headers, _) = 
generate_bodies_file(0..=19).await;
-
        // now try to read them back
        let client = Arc::new(FileClient::from_file(file).await.unwrap());
diff --git a/crates/net/eth-wire/src/types/blocks.rs b/crates/net/eth-wire/src/types/blocks.rs
index f0eee6b9cff3..b27c028cf581 100644
--- a/crates/net/eth-wire/src/types/blocks.rs
+++ b/crates/net/eth-wire/src/types/blocks.rs
@@ -2,9 +2,14 @@
 //! types.

 use alloy_rlp::{RlpDecodable, RlpDecodableWrapper, RlpEncodable, RlpEncodableWrapper};
-use reth_codecs::derive_arbitrary;
+use reth_codecs::{add_arbitrary_tests, derive_arbitrary};
 use reth_primitives::{BlockBody, BlockHashOrNumber, Header, HeadersDirection, B256};

+#[cfg(any(test, feature = "arbitrary"))]
+use proptest::{collection::vec, prelude::*};
+#[cfg(any(test, feature = "arbitrary"))]
+use reth_primitives::{generate_valid_header, valid_header_strategy};
+
 #[cfg(feature = "serde")]
 use serde::{Deserialize, Serialize};

@@ -38,14 +43,46 @@ pub struct GetBlockHeaders {
 }

 /// The response to [`GetBlockHeaders`], containing headers if any headers were found.
-#[derive_arbitrary(rlp)]
 #[derive(Clone, Debug, PartialEq, Eq, RlpEncodableWrapper, RlpDecodableWrapper, Default)]
 #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
+#[add_arbitrary_tests(rlp, 10)]
 pub struct BlockHeaders(
     /// The requested headers.
     pub Vec<Header>,
 );

+#[cfg(any(test, feature = "arbitrary"))]
+impl proptest::arbitrary::Arbitrary for BlockHeaders {
+    type Parameters = ();
+    type Strategy = proptest::prelude::BoxedStrategy<Self>;
+
+    fn arbitrary_with(_: Self::Parameters) -> Self::Strategy {
+        let headers_strategy = vec(valid_header_strategy(), 0..10); // Adjust the range as needed
+
+        headers_strategy.prop_map(BlockHeaders).boxed()
+    }
+}
+
+#[cfg(any(test, feature = "arbitrary"))]
+impl<'a> arbitrary::Arbitrary<'a> for BlockHeaders {
+    fn arbitrary(u: &mut arbitrary::Unstructured<'a>) -> arbitrary::Result<Self> {
+        let headers_count: usize = u.int_in_range(0..=10)?;
+        let mut headers = Vec::with_capacity(headers_count);
+
+        for _ in 0..headers_count {
+            headers.push(generate_valid_header(
+                u.arbitrary()?,
+                u.arbitrary()?,
+                u.arbitrary()?,
+                u.arbitrary()?,
+                u.arbitrary()?,
+            ))
+        }
+
+        Ok(BlockHeaders(headers))
+    }
+}
+
 impl From<Vec<Header>> for BlockHeaders {
     fn from(headers: Vec<Header>
) -> Self { BlockHeaders(headers) diff --git a/crates/net/eth-wire/src/types/broadcast.rs b/crates/net/eth-wire/src/types/broadcast.rs index 5c856c43ba69..a789b81362fa 100644 --- a/crates/net/eth-wire/src/types/broadcast.rs +++ b/crates/net/eth-wire/src/types/broadcast.rs @@ -68,9 +68,9 @@ impl From for Vec { /// A new block with the current total difficulty, which includes the difficulty of the returned /// block. -#[derive_arbitrary(rlp, 25)] #[derive(Clone, Debug, PartialEq, Eq, RlpEncodable, RlpDecodable, Default)] #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] +#[derive_arbitrary(rlp, 25)] pub struct NewBlock { /// A new block. pub block: Block, diff --git a/crates/net/network/src/cache.rs b/crates/net/network/src/cache.rs index 3a18d67450c5..c7fa271f1cb6 100644 --- a/crates/net/network/src/cache.rs +++ b/crates/net/network/src/cache.rs @@ -62,10 +62,10 @@ impl LruCache { } /// Returns `true` if the set contains a value. - pub fn contains(&self, value: &Q) -> bool + pub fn contains(&self, value: &Q) -> bool where T: Borrow, - Q: Hash + Eq, + Q: Hash + Eq + ?Sized, { self.inner.contains(value) } diff --git a/crates/net/network/src/manager.rs b/crates/net/network/src/manager.rs index 4d31876ce4af..9dbf058e858e 100644 --- a/crates/net/network/src/manager.rs +++ b/crates/net/network/src/manager.rs @@ -56,6 +56,7 @@ use std::{ Arc, }, task::{Context, Poll}, + time::{Duration, Instant}, }; use tokio::sync::mpsc::{self, error::TrySendError}; use tokio_stream::wrappers::UnboundedReceiverStream; @@ -147,6 +148,19 @@ impl NetworkManager { pub fn secret_key(&self) -> SecretKey { self.swarm.sessions().secret_key() } + + #[inline] + fn update_poll_metrics(&self, start: Instant, poll_durations: NetworkManagerPollDurations) { + let metrics = &self.metrics; + + let NetworkManagerPollDurations { acc_network_handle, acc_swarm } = poll_durations; + + // update metrics for whole poll function + metrics.duration_poll_network_manager.set(start.elapsed().as_secs_f64()); + // update poll metrics for nested items + metrics.duration_poll_network_handle.set(acc_network_handle.as_secs_f64()); + metrics.duration_poll_swarm.set(acc_swarm.as_secs_f64()); + } } impl NetworkManager @@ -602,6 +616,249 @@ where } } } + + fn on_swarm_event(&mut self, event: SwarmEvent) { + // handle event + match event { + SwarmEvent::ValidMessage { peer_id, message } => self.on_peer_message(peer_id, message), + SwarmEvent::InvalidCapabilityMessage { peer_id, capabilities, message } => { + self.on_invalid_message(peer_id, capabilities, message); + self.metrics.invalid_messages_received.increment(1); + } + SwarmEvent::TcpListenerClosed { remote_addr } => { + trace!(target: "net", ?remote_addr, "TCP listener closed."); + } + SwarmEvent::TcpListenerError(err) => { + trace!(target: "net", %err, "TCP connection error."); + } + SwarmEvent::IncomingTcpConnection { remote_addr, session_id } => { + trace!(target: "net", ?session_id, ?remote_addr, "Incoming connection"); + self.metrics.total_incoming_connections.increment(1); + self.metrics + .incoming_connections + .set(self.swarm.state().peers().num_inbound_connections() as f64); + } + SwarmEvent::OutgoingTcpConnection { remote_addr, peer_id } => { + trace!(target: "net", ?remote_addr, ?peer_id, "Starting outbound connection."); + self.metrics.total_outgoing_connections.increment(1); + self.metrics + .outgoing_connections + .set(self.swarm.state().peers().num_outbound_connections() as f64); + } + SwarmEvent::SessionEstablished { + peer_id, + remote_addr, + client_version, + 
capabilities, + version, + messages, + status, + direction, + } => { + let total_active = self.num_active_peers.fetch_add(1, Ordering::Relaxed) + 1; + self.metrics.connected_peers.set(total_active as f64); + trace!( + target: "net", + ?remote_addr, + %client_version, + ?peer_id, + ?total_active, + kind=%direction, + peer_enode=%NodeRecord::new(remote_addr, peer_id), + "Session established" + ); + + if direction.is_incoming() { + self.swarm + .state_mut() + .peers_mut() + .on_incoming_session_established(peer_id, remote_addr); + } + self.event_listeners.notify(NetworkEvent::SessionEstablished { + peer_id, + remote_addr, + client_version, + capabilities, + version, + status, + messages, + }); + } + SwarmEvent::PeerAdded(peer_id) => { + trace!(target: "net", ?peer_id, "Peer added"); + self.event_listeners.notify(NetworkEvent::PeerAdded(peer_id)); + self.metrics.tracked_peers.set(self.swarm.state().peers().num_known_peers() as f64); + } + SwarmEvent::PeerRemoved(peer_id) => { + trace!(target: "net", ?peer_id, "Peer dropped"); + self.event_listeners.notify(NetworkEvent::PeerRemoved(peer_id)); + self.metrics.tracked_peers.set(self.swarm.state().peers().num_known_peers() as f64); + } + SwarmEvent::SessionClosed { peer_id, remote_addr, error } => { + let total_active = self.num_active_peers.fetch_sub(1, Ordering::Relaxed) - 1; + self.metrics.connected_peers.set(total_active as f64); + trace!( + target: "net", + ?remote_addr, + ?peer_id, + ?total_active, + ?error, + "Session disconnected" + ); + + let mut reason = None; + if let Some(ref err) = error { + // If the connection was closed due to an error, we report + // the peer + self.swarm.state_mut().peers_mut().on_active_session_dropped( + &remote_addr, + &peer_id, + err, + ); + reason = err.as_disconnected(); + } else { + // Gracefully disconnected + self.swarm.state_mut().peers_mut().on_active_session_gracefully_closed(peer_id); + } + self.metrics.closed_sessions.increment(1); + // This can either be an incoming or outgoing connection which + // was closed. 
So we update + // both metrics + self.metrics + .incoming_connections + .set(self.swarm.state().peers().num_inbound_connections() as f64); + self.metrics + .outgoing_connections + .set(self.swarm.state().peers().num_outbound_connections() as f64); + if let Some(reason) = reason { + self.disconnect_metrics.increment(reason); + } + self.metrics.backed_off_peers.set( + self.swarm + .state() + .peers() + .num_backed_off_peers() + .saturating_sub(1) + as f64, + ); + self.event_listeners.notify(NetworkEvent::SessionClosed { peer_id, reason }); + } + SwarmEvent::IncomingPendingSessionClosed { remote_addr, error } => { + trace!( + target: "net", + ?remote_addr, + ?error, + "Incoming pending session failed" + ); + + if let Some(ref err) = error { + self.swarm + .state_mut() + .peers_mut() + .on_incoming_pending_session_dropped(remote_addr, err); + self.metrics.pending_session_failures.increment(1); + if let Some(reason) = err.as_disconnected() { + self.disconnect_metrics.increment(reason); + } + } else { + self.swarm + .state_mut() + .peers_mut() + .on_incoming_pending_session_gracefully_closed(); + } + self.metrics.closed_sessions.increment(1); + self.metrics + .incoming_connections + .set(self.swarm.state().peers().num_inbound_connections() as f64); + self.metrics.backed_off_peers.set( + self.swarm + .state() + .peers() + .num_backed_off_peers() + .saturating_sub(1) + as f64, + ); + } + SwarmEvent::OutgoingPendingSessionClosed { remote_addr, peer_id, error } => { + trace!( + target: "net", + ?remote_addr, + ?peer_id, + ?error, + "Outgoing pending session failed" + ); + + if let Some(ref err) = error { + self.swarm.state_mut().peers_mut().on_pending_session_dropped( + &remote_addr, + &peer_id, + err, + ); + self.metrics.pending_session_failures.increment(1); + if let Some(reason) = err.as_disconnected() { + self.disconnect_metrics.increment(reason); + } + } else { + self.swarm + .state_mut() + .peers_mut() + .on_pending_session_gracefully_closed(&peer_id); + } + self.metrics.closed_sessions.increment(1); + self.metrics + .outgoing_connections + .set(self.swarm.state().peers().num_outbound_connections() as f64); + self.metrics.backed_off_peers.set( + self.swarm + .state() + .peers() + .num_backed_off_peers() + .saturating_sub(1) + as f64, + ); + } + SwarmEvent::OutgoingConnectionError { remote_addr, peer_id, error } => { + trace!( + target: "net", + ?remote_addr, + ?peer_id, + %error, + "Outgoing connection error" + ); + + self.swarm.state_mut().peers_mut().on_outgoing_connection_failure( + &remote_addr, + &peer_id, + &error, + ); + + self.metrics + .outgoing_connections + .set(self.swarm.state().peers().num_outbound_connections() as f64); + self.metrics.backed_off_peers.set( + self.swarm + .state() + .peers() + .num_backed_off_peers() + .saturating_sub(1) + as f64, + ); + } + SwarmEvent::BadMessage { peer_id } => { + self.swarm + .state_mut() + .peers_mut() + .apply_reputation_change(&peer_id, ReputationChangeKind::BadMessage); + self.metrics.invalid_messages_received.increment(1); + } + SwarmEvent::ProtocolBreach { peer_id } => { + self.swarm + .state_mut() + .peers_mut() + .apply_reputation_change(&peer_id, ReputationChangeKind::BadProtocol); + } + } + } } impl NetworkManager @@ -639,20 +896,25 @@ where type Output = (); fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let start = Instant::now(); + let mut poll_durations = NetworkManagerPollDurations::default(); + let this = self.get_mut(); - // poll new block imports + // poll new block imports (expected to be a noop for POS) 
while let Poll::Ready(outcome) = this.block_import.poll(cx) { this.on_block_import_result(outcome); } // process incoming messages from a handle + let start_network_handle = Instant::now(); loop { match this.from_handle_rx.poll_next_unpin(cx) { Poll::Pending => break, Poll::Ready(None) => { - // This is only possible if the channel was deliberately closed since we always - // have an instance of `NetworkHandle` + // This is only possible if the channel was deliberately closed since we + // always have an instance of + // `NetworkHandle` error!("Network message channel closed."); return Poll::Ready(()) } @@ -660,22 +922,25 @@ where }; } - // This loop drives the entire state of network and does a lot of work. - // Under heavy load (many messages/events), data may arrive faster than it can be processed - // (incoming messages/requests -> events), and it is possible that more data has already - // arrived by the time an internal event is processed. Which could turn this loop into a - // busy loop. Without yielding back to the executor, it can starve other tasks waiting on - // that executor to execute them, or drive underlying resources To prevent this, we - // preemptively return control when the `budget` is exhausted. The value itself is - // chosen somewhat arbitrarily, it is high enough so the swarm can make meaningful progress - // but low enough that this loop does not starve other tasks for too long. - // If the budget is exhausted we manually yield back control to the (coop) scheduler. This - // manual yield point should prevent situations where polling appears to be frozen. See also - // And tokio's docs on cooperative scheduling + poll_durations.acc_network_handle = start_network_handle.elapsed(); + + // This loop drives the entire state of network and does a lot of work. Under heavy load + // (many messages/events), data may arrive faster than it can be processed (incoming + // messages/requests -> events), and it is possible that more data has already arrived by + // the time an internal event is processed. Which could turn this loop into a busy loop. + // Without yielding back to the executor, it can starve other tasks waiting on that + // executor to execute them, or drive underlying resources To prevent this, we + // preemptively return control when the `budget` is exhausted. The value itself is chosen + // somewhat arbitrarily, it is high enough so the swarm can make meaningful progress but + // low enough that this loop does not starve other tasks for too long. If the budget is + // exhausted we manually yield back control to the (coop) scheduler. This manual yield + // point should prevent situations where polling appears to be frozen. See also + // And tokio's docs on cooperative scheduling + // // // Testing has shown that this loop naturally reaches the pending state within 1-5 - // iterations in << 100µs in most cases. On average it requires ~50µs, which is inside - // the range of what's recommended as rule of thumb. + // iterations in << 100µs in most cases. On average it requires ~50µs, which is inside the + // range of what's recommended as rule of thumb. 
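// The comment above describes the budgeted-polling idiom; the loop that follows
// implements it. A compact, self-contained sketch of the same pattern, with a
// generic stream standing in for the actual `Swarm`:
use futures::stream::{Stream, StreamExt};
use std::task::{Context, Poll};

fn drive_stream<S: Stream + Unpin>(swarm: &mut S, cx: &mut Context<'_>) -> Poll<()> {
    let mut budget = 10;
    loop {
        match swarm.poll_next_unpin(cx) {
            Poll::Pending | Poll::Ready(None) => break,
            Poll::Ready(Some(_event)) => { /* handle the event */ }
        }
        budget -= 1;
        if budget == 0 {
            // Out of budget: schedule an immediate re-poll, then yield so
            // other tasks on the executor can make progress.
            cx.waker().wake_by_ref();
            break;
        }
    }
    Poll::Pending
}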
// let mut budget = 10; @@ -683,246 +948,7 @@ where // advance the swarm match this.swarm.poll_next_unpin(cx) { Poll::Pending | Poll::Ready(None) => break, - Poll::Ready(Some(event)) => { - // handle event - match event { - SwarmEvent::ValidMessage { peer_id, message } => { - this.on_peer_message(peer_id, message) - } - SwarmEvent::InvalidCapabilityMessage { peer_id, capabilities, message } => { - this.on_invalid_message(peer_id, capabilities, message); - this.metrics.invalid_messages_received.increment(1); - } - SwarmEvent::TcpListenerClosed { remote_addr } => { - trace!(target: "net", ?remote_addr, "TCP listener closed."); - } - SwarmEvent::TcpListenerError(err) => { - trace!(target: "net", %err, "TCP connection error."); - } - SwarmEvent::IncomingTcpConnection { remote_addr, session_id } => { - trace!(target: "net", ?session_id, ?remote_addr, "Incoming connection"); - this.metrics.total_incoming_connections.increment(1); - this.metrics - .incoming_connections - .set(this.swarm.state().peers().num_inbound_connections() as f64); - } - SwarmEvent::OutgoingTcpConnection { remote_addr, peer_id } => { - trace!(target: "net", ?remote_addr, ?peer_id, "Starting outbound connection."); - this.metrics.total_outgoing_connections.increment(1); - this.metrics - .outgoing_connections - .set(this.swarm.state().peers().num_outbound_connections() as f64); - } - SwarmEvent::SessionEstablished { - peer_id, - remote_addr, - client_version, - capabilities, - version, - messages, - status, - direction, - } => { - let total_active = - this.num_active_peers.fetch_add(1, Ordering::Relaxed) + 1; - this.metrics.connected_peers.set(total_active as f64); - trace!( - target: "net", - ?remote_addr, - %client_version, - ?peer_id, - ?total_active, - kind=%direction, - peer_enode=%NodeRecord::new(remote_addr, peer_id), - "Session established" - ); - - if direction.is_incoming() { - this.swarm - .state_mut() - .peers_mut() - .on_incoming_session_established(peer_id, remote_addr); - } - this.event_listeners.notify(NetworkEvent::SessionEstablished { - peer_id, - remote_addr, - client_version, - capabilities, - version, - status, - messages, - }); - } - SwarmEvent::PeerAdded(peer_id) => { - trace!(target: "net", ?peer_id, "Peer added"); - this.event_listeners.notify(NetworkEvent::PeerAdded(peer_id)); - this.metrics - .tracked_peers - .set(this.swarm.state().peers().num_known_peers() as f64); - } - SwarmEvent::PeerRemoved(peer_id) => { - trace!(target: "net", ?peer_id, "Peer dropped"); - this.event_listeners.notify(NetworkEvent::PeerRemoved(peer_id)); - this.metrics - .tracked_peers - .set(this.swarm.state().peers().num_known_peers() as f64); - } - SwarmEvent::SessionClosed { peer_id, remote_addr, error } => { - let total_active = - this.num_active_peers.fetch_sub(1, Ordering::Relaxed) - 1; - this.metrics.connected_peers.set(total_active as f64); - trace!( - target: "net", - ?remote_addr, - ?peer_id, - ?total_active, - ?error, - "Session disconnected" - ); - - let mut reason = None; - if let Some(ref err) = error { - // If the connection was closed due to an error, we report the peer - this.swarm.state_mut().peers_mut().on_active_session_dropped( - &remote_addr, - &peer_id, - err, - ); - reason = err.as_disconnected(); - } else { - // Gracefully disconnected - this.swarm - .state_mut() - .peers_mut() - .on_active_session_gracefully_closed(peer_id); - } - this.metrics.closed_sessions.increment(1); - // This can either be an incoming or outgoing connection which was - // closed. 
So we update both metrics - this.metrics - .incoming_connections - .set(this.swarm.state().peers().num_inbound_connections() as f64); - this.metrics - .outgoing_connections - .set(this.swarm.state().peers().num_outbound_connections() as f64); - if let Some(reason) = reason { - this.disconnect_metrics.increment(reason); - } - this.metrics.backed_off_peers.set( - this.swarm.state().peers().num_backed_off_peers().saturating_sub(1) - as f64, - ); - this.event_listeners - .notify(NetworkEvent::SessionClosed { peer_id, reason }); - } - SwarmEvent::IncomingPendingSessionClosed { remote_addr, error } => { - trace!( - target: "net", - ?remote_addr, - ?error, - "Incoming pending session failed" - ); - - if let Some(ref err) = error { - this.swarm - .state_mut() - .peers_mut() - .on_incoming_pending_session_dropped(remote_addr, err); - this.metrics.pending_session_failures.increment(1); - if let Some(reason) = err.as_disconnected() { - this.disconnect_metrics.increment(reason); - } - } else { - this.swarm - .state_mut() - .peers_mut() - .on_incoming_pending_session_gracefully_closed(); - } - this.metrics.closed_sessions.increment(1); - this.metrics - .incoming_connections - .set(this.swarm.state().peers().num_inbound_connections() as f64); - this.metrics.backed_off_peers.set( - this.swarm.state().peers().num_backed_off_peers().saturating_sub(1) - as f64, - ); - } - SwarmEvent::OutgoingPendingSessionClosed { - remote_addr, - peer_id, - error, - } => { - trace!( - target: "net", - ?remote_addr, - ?peer_id, - ?error, - "Outgoing pending session failed" - ); - - if let Some(ref err) = error { - this.swarm.state_mut().peers_mut().on_pending_session_dropped( - &remote_addr, - &peer_id, - err, - ); - this.metrics.pending_session_failures.increment(1); - if let Some(reason) = err.as_disconnected() { - this.disconnect_metrics.increment(reason); - } - } else { - this.swarm - .state_mut() - .peers_mut() - .on_pending_session_gracefully_closed(&peer_id); - } - this.metrics.closed_sessions.increment(1); - this.metrics - .outgoing_connections - .set(this.swarm.state().peers().num_outbound_connections() as f64); - this.metrics.backed_off_peers.set( - this.swarm.state().peers().num_backed_off_peers().saturating_sub(1) - as f64, - ); - } - SwarmEvent::OutgoingConnectionError { remote_addr, peer_id, error } => { - trace!( - target: "net", - ?remote_addr, - ?peer_id, - %error, - "Outgoing connection error" - ); - - this.swarm.state_mut().peers_mut().on_outgoing_connection_failure( - &remote_addr, - &peer_id, - &error, - ); - - this.metrics - .outgoing_connections - .set(this.swarm.state().peers().num_outbound_connections() as f64); - this.metrics.backed_off_peers.set( - this.swarm.state().peers().num_backed_off_peers().saturating_sub(1) - as f64, - ); - } - SwarmEvent::BadMessage { peer_id } => { - this.swarm.state_mut().peers_mut().apply_reputation_change( - &peer_id, - ReputationChangeKind::BadMessage, - ); - this.metrics.invalid_messages_received.increment(1); - } - SwarmEvent::ProtocolBreach { peer_id } => { - this.swarm.state_mut().peers_mut().apply_reputation_change( - &peer_id, - ReputationChangeKind::BadProtocol, - ); - } - } - } + Poll::Ready(Some(event)) => this.on_swarm_event(event), } // ensure we still have enough budget for another iteration @@ -935,6 +961,11 @@ where } } + poll_durations.acc_swarm = + start_network_handle.elapsed() - poll_durations.acc_network_handle; + + this.update_poll_metrics(start, poll_durations); + Poll::Pending } } @@ -979,3 +1010,9 @@ pub enum NetworkEvent { pub enum 
DiscoveredEvent { EventQueued { peer_id: PeerId, socket_addr: SocketAddr, fork_id: Option }, } + +#[derive(Debug, Default)] +struct NetworkManagerPollDurations { + acc_network_handle: Duration, + acc_swarm: Duration, +} diff --git a/crates/net/network/src/metrics.rs b/crates/net/network/src/metrics.rs index fe3936d894fd..6a85d47c6f65 100644 --- a/crates/net/network/src/metrics.rs +++ b/crates/net/network/src/metrics.rs @@ -1,8 +1,10 @@ +use metrics::Histogram; use reth_eth_wire::DisconnectReason; use reth_metrics::{ metrics::{Counter, Gauge}, Metrics, }; +use reth_primitives::TxType; /// Scope for monitoring transactions sent from the manager to the tx manager pub(crate) const NETWORK_POOL_TRANSACTIONS_SCOPE: &str = "network.pool.transactions"; @@ -43,6 +45,30 @@ pub struct NetworkMetrics { /// Number of Eth Requests dropped due to channel being at full capacity pub(crate) total_dropped_eth_requests_at_full_capacity: Counter, + + /* ================ POLL DURATION ================ */ + + /* -- Total poll duration of `NetworksManager` future -- */ + /// Duration in seconds of call to + /// [`NetworkManager`](crate::NetworkManager)'s poll function. + /// + /// True duration of this call, should be sum of the accumulated durations of calling nested + // items. + pub(crate) duration_poll_network_manager: Gauge, + + /* -- Poll duration of items nested in `NetworkManager` future -- */ + /// Time spent streaming messages sent over the [`NetworkHandle`](crate::NetworkHandle), which + /// can be cloned and shared via [`NetworkManager::handle`](crate::NetworkManager::handle), in + /// one call to poll the [`NetworkManager`](crate::NetworkManager) future. + /// + /// Duration in seconds. + // todo: find out how many components hold the network handle. + pub(crate) duration_poll_network_handle: Gauge, + /// Time spent polling [`Swarm`](crate::swarm::Swarm), in one call to poll the + /// [`NetworkManager`](crate::NetworkManager) future. + /// + /// Duration in seconds. + pub(crate) duration_poll_swarm: Gauge, } /// Metrics for SessionManager @@ -117,7 +143,7 @@ pub struct TransactionsManagerMetrics { /// [`TransactionsManager`](crate::transactions::TransactionsManager)'s poll function. /// /// Updating metrics could take time, so the true duration of this call could - /// be longer than the sum of the accumulated durations of polling nested streams. + /// be longer than the sum of the accumulated durations of polling nested items. pub(crate) duration_poll_tx_manager: Gauge, /* -- Poll duration of items nested in `TransactionsManager` future -- */ @@ -165,6 +191,19 @@ pub struct TransactionsManagerMetrics { pub(crate) acc_duration_poll_commands: Gauge, } +/// Measures the duration of executing the given code block. The duration is added to the given +/// accumulator value passed as a mutable reference. +#[macro_export] +macro_rules! 
duration_metered_exec { + ($code:block, $acc:ident) => { + let start = Instant::now(); + + $code; + + *$acc += start.elapsed(); + }; +} + /// Metrics for Disconnection types /// /// These are just counters, and ideally we would implement these metrics on a peer-by-peer basis, @@ -244,3 +283,60 @@ pub struct EthRequestHandlerMetrics { /// Number of received bodies requests pub(crate) received_bodies_requests: Counter, } + +/// Eth67 announcement metrics, track entries by TxType +#[derive(Metrics)] +#[metrics(scope = "network.transaction_fetcher")] +pub struct AnnouncedTxTypesMetrics { + /// Histogram for tracking frequency of legacy transaction type + pub(crate) legacy: Histogram, + + /// Histogram for tracking frequency of EIP-2930 transaction type + pub(crate) eip2930: Histogram, + + /// Histogram for tracking frequency of EIP-1559 transaction type + pub(crate) eip1559: Histogram, + + /// Histogram for tracking frequency of EIP-4844 transaction type + pub(crate) eip4844: Histogram, +} + +#[derive(Debug, Default)] +pub struct TxTypesCounter { + pub(crate) legacy: usize, + pub(crate) eip2930: usize, + pub(crate) eip1559: usize, + pub(crate) eip4844: usize, +} + +impl TxTypesCounter { + pub(crate) fn increase_by_tx_type(&mut self, tx_type: TxType) { + match tx_type { + TxType::Legacy => { + self.legacy += 1; + } + TxType::EIP2930 => { + self.eip2930 += 1; + } + TxType::EIP1559 => { + self.eip1559 += 1; + } + TxType::EIP4844 => { + self.eip4844 += 1; + } + #[cfg(feature = "optimism")] + TxType::DEPOSIT => {} + } + } +} + +impl AnnouncedTxTypesMetrics { + /// Update metrics during announcement validation, by examining each announcement entry based on + /// TxType + pub(crate) fn update_eth68_announcement_metrics(&self, tx_types_counter: TxTypesCounter) { + self.legacy.record(tx_types_counter.legacy as f64); + self.eip2930.record(tx_types_counter.eip2930 as f64); + self.eip1559.record(tx_types_counter.eip1559 as f64); + self.eip4844.record(tx_types_counter.eip4844 as f64); + } +} diff --git a/crates/net/network/src/peers/manager.rs b/crates/net/network/src/peers/manager.rs index 4210cb029ecd..35c39c5c9cd4 100644 --- a/crates/net/network/src/peers/manager.rs +++ b/crates/net/network/src/peers/manager.rs @@ -1715,7 +1715,7 @@ mod tests { }) .await; - assert!(peers.peers.get(&peer).is_none()); + assert!(!peers.peers.contains_key(&peer)); } #[tokio::test] @@ -1770,7 +1770,7 @@ mod tests { }) .await; - assert!(peers.peers.get(&peer).is_none()); + assert!(!peers.peers.contains_key(&peer)); } #[tokio::test] @@ -1826,7 +1826,7 @@ mod tests { }) .await; - assert!(peers.peers.get(&peer).is_none()); + assert!(!peers.peers.contains_key(&peer)); } #[tokio::test] @@ -1991,7 +1991,7 @@ mod tests { assert_eq!(p.state, PeerConnectionState::DisconnectingOut); peers.on_active_session_gracefully_closed(peer); - assert!(peers.peers.get(&peer).is_none()); + assert!(!peers.peers.contains_key(&peer)); } #[tokio::test] @@ -2240,7 +2240,7 @@ mod tests { assert!(peer.remove_after_disconnect); peers.on_active_session_gracefully_closed(peer_id); - assert!(peers.peers.get(&peer_id).is_none()) + assert!(!peers.peers.contains_key(&peer_id)) } #[tokio::test] diff --git a/crates/net/network/src/transactions/fetcher.rs b/crates/net/network/src/transactions/fetcher.rs index 81f7880c7ce9..4c27b745d842 100644 --- a/crates/net/network/src/transactions/fetcher.rs +++ b/crates/net/network/src/transactions/fetcher.rs @@ -17,6 +17,7 @@ use reth_metrics::common::mpsc::{ }; use reth_primitives::{PeerId, PooledTransactionsElement, TxHash}; 
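// Stepping back to the `duration_metered_exec!` macro that metrics.rs now exports
// above: a standalone sketch of how such an accumulator macro is driven. The
// sleeping block is a stand-in for polling a nested stream; the real call sites
// in the managers pass their own `Duration` accumulators.
use std::time::{Duration, Instant};

macro_rules! duration_metered_exec {
    ($code:block, $acc:ident) => {
        let start = Instant::now();
        $code;
        *$acc += start.elapsed();
    };
}

fn main() {
    let mut poll_duration = Duration::ZERO;
    let acc = &mut poll_duration;
    duration_metered_exec!(
        {
            std::thread::sleep(Duration::from_millis(5));
        },
        acc
    );
    println!("accumulated: {poll_duration:?}");
}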
use schnellru::{ByLength, Unlimited}; +#[cfg(debug_assertions)] use smallvec::{smallvec, SmallVec}; use std::{ collections::HashMap, @@ -477,7 +478,7 @@ impl TransactionFetcher { } // hash has been seen and is in flight. store peer as fallback peer. // - // remove any ended sessions, so that in case of a full cache, alive peers aren't + // remove any ended sessions, so that in case of a full cache, alive peers aren't // removed in favour of lru dead peers let mut ended_sessions = vec!(); for &peer_id in fallback_peers.iter() { @@ -605,7 +606,7 @@ impl TransactionFetcher { true }(), "`%new_announced_hashes` should been taken out of buffer before packing in a request, breaks invariant `@buffered_hashes` and `@inflight_requests`, -`%new_announced_hashes`: {:?}, +`%new_announced_hashes`: {:?}, `@self`: {:?}", new_announced_hashes, self ); @@ -725,7 +726,7 @@ impl TransactionFetcher { } } - /// Returns `true` if [`TransactionFetcher`] has capacity to request pending hashes. Returns + /// Returns `true` if [`TransactionFetcher`] has capacity to request pending hashes. Returns /// `false` if [`TransactionFetcher`] is operating close to full capacity. pub fn has_capacity_for_fetching_pending_hashes(&self) -> bool { let info = &self.info; @@ -1111,7 +1112,7 @@ impl VerifyPooledTransactionsResponse for UnverifiedPooledTransactions { fn verify( self, requested_hashes: &[TxHash], - peer_id: &PeerId, + _peer_id: &PeerId, ) -> (VerificationOutcome, VerifiedPooledTransactions) { let mut verification_outcome = VerificationOutcome::Ok; @@ -1134,7 +1135,7 @@ impl VerifyPooledTransactionsResponse for UnverifiedPooledTransactions { #[cfg(debug_assertions)] trace!(target: "net::tx", - peer_id=format!("{peer_id:#}"), + peer_id=format!("{_peer_id:#}"), tx_hashes_not_requested=?tx_hashes_not_requested, "transactions in `PooledTransactions` response from peer were not requested" ); diff --git a/crates/net/network/src/transactions/mod.rs b/crates/net/network/src/transactions/mod.rs index 4b0bc01558a9..700da8f2bdd7 100644 --- a/crates/net/network/src/transactions/mod.rs +++ b/crates/net/network/src/transactions/mod.rs @@ -29,6 +29,7 @@ use crate::{ cache::LruCache, + duration_metered_exec, manager::NetworkEvent, message::{PeerRequest, PeerRequestSender}, metrics::{TransactionsManagerMetrics, NETWORK_POOL_TRANSACTIONS_SCOPE}, @@ -369,7 +370,7 @@ where // update metrics for whole poll function metrics.duration_poll_tx_manager.set(start.elapsed().as_secs_f64()); - // update poll metrics for nested streams + // update metrics for nested expressions metrics.acc_duration_poll_network_events.set(acc_network_events.as_secs_f64()); metrics.acc_duration_poll_pending_pool_imports.set(acc_pending_imports.as_secs_f64()); metrics.acc_duration_poll_transaction_events.set(acc_tx_events.as_secs_f64()); @@ -1163,18 +1164,6 @@ where } } -/// Measures the duration of executing the given code block. The duration is added to the given -/// accumulator value passed as a mutable reference. -macro_rules! duration_metered_exec { - ($code:block, $acc:ident) => { - let start = Instant::now(); - - $code; - - *$acc += start.elapsed(); - }; -} - #[derive(Debug, Default)] struct TxManagerPollDurations { acc_network_events: Duration, diff --git a/crates/net/network/src/transactions/validation.rs b/crates/net/network/src/transactions/validation.rs index 585bb3b73e0f..80a40f455f9a 100644 --- a/crates/net/network/src/transactions/validation.rs +++ b/crates/net/network/src/transactions/validation.rs @@ -2,15 +2,16 @@ //! 
and [`NewPooledTransactionHashes68`](reth_eth_wire::NewPooledTransactionHashes68) //! announcements. Validation and filtering of announcements is network dependent. -use std::{fmt, mem}; +use std::{fmt, fmt::Display, mem}; -use derive_more::{Deref, DerefMut, Display}; +use crate::metrics::{AnnouncedTxTypesMetrics, TxTypesCounter}; +use derive_more::{Deref, DerefMut}; use reth_eth_wire::{ DedupPayload, Eth68TxMetadata, HandleMempoolData, PartiallyValidData, ValidAnnouncementData, MAX_MESSAGE_SIZE, }; use reth_primitives::{Signature, TxHash, TxType}; -use tracing::{debug, trace}; +use tracing::trace; /// The size of a decoded signature in bytes. pub const SIGNATURE_DECODED_SIZE_BYTES: usize = mem::size_of::(); @@ -22,7 +23,13 @@ pub trait ValidateTx68 { /// entry. Returns [`ValidationOutcome`] which signals to the caller wether to fetch the /// transaction or wether to drop it, and wether the sender of the announcement should be /// penalized. - fn should_fetch(&self, ty: u8, hash: &TxHash, size: usize) -> ValidationOutcome; + fn should_fetch( + &self, + ty: u8, + hash: &TxHash, + size: usize, + tx_types_counter: &mut TxTypesCounter, + ) -> ValidationOutcome; /// Returns the reasonable maximum encoded transaction length configured for this network, if /// any. This property is not spec'ed out but can be inferred by looking how much data can be @@ -69,7 +76,7 @@ pub trait PartiallyFilterMessage { ) -> (FilterOutcome, PartiallyValidData) { // 1. checks if the announcement is empty if msg.is_empty() { - debug!(target: "net::tx", + trace!(target: "net::tx", msg=?msg, "empty payload" ); @@ -137,30 +144,45 @@ pub enum FilterOutcome { pub struct MessageFilter(N); /// Filter for announcements containing EIP [`TxType`]s. -#[derive(Debug, Display, Default)] -pub struct EthMessageFilter; +#[derive(Debug, Default)] +pub struct EthMessageFilter { + announced_tx_types_metrics: AnnouncedTxTypesMetrics, +} + +impl Display for EthMessageFilter { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "EthMessageFilter") + } +} impl PartiallyFilterMessage for EthMessageFilter {} impl ValidateTx68 for EthMessageFilter { - fn should_fetch(&self, ty: u8, hash: &TxHash, size: usize) -> ValidationOutcome { + fn should_fetch( + &self, + ty: u8, + hash: &TxHash, + size: usize, + tx_types_counter: &mut TxTypesCounter, + ) -> ValidationOutcome { // // 1. checks if tx type is valid value for this network // let tx_type = match TxType::try_from(ty) { Ok(ty) => ty, Err(_) => { - debug!(target: "net::eth-wire", + trace!(target: "net::eth-wire", ty=ty, size=size, hash=%hash, - network=%Self, + network=%self, "invalid tx type in eth68 announcement" ); return ValidationOutcome::ReportPeer } }; + tx_types_counter.increase_by_tx_type(tx_type); // // 2. 
checks if tx's encoded length is within limits for this network @@ -170,12 +192,12 @@ impl ValidateTx68 for EthMessageFilter { // if let Some(strict_min_encoded_tx_length) = self.strict_min_encoded_tx_length(tx_type) { if size < strict_min_encoded_tx_length { - debug!(target: "net::eth-wire", + trace!(target: "net::eth-wire", ty=ty, size=size, hash=%hash, strict_min_encoded_tx_length=strict_min_encoded_tx_length, - network=%Self, + network=%self, "invalid tx size in eth68 announcement" ); @@ -184,13 +206,13 @@ impl ValidateTx68 for EthMessageFilter { } if let Some(reasonable_min_encoded_tx_length) = self.min_encoded_tx_length(tx_type) { if size < reasonable_min_encoded_tx_length { - debug!(target: "net::eth-wire", + trace!(target: "net::eth-wire", ty=ty, size=size, hash=%hash, reasonable_min_encoded_tx_length=reasonable_min_encoded_tx_length, strict_min_encoded_tx_length=self.strict_min_encoded_tx_length(tx_type), - network=%Self, + network=%self, "tx size in eth68 announcement, is unreasonably small" ); @@ -201,13 +223,13 @@ impl ValidateTx68 for EthMessageFilter { // this network has no strict max encoded tx length for any tx type if let Some(reasonable_max_encoded_tx_length) = self.max_encoded_tx_length(tx_type) { if size > reasonable_max_encoded_tx_length { - debug!(target: "net::eth-wire", + trace!(target: "net::eth-wire", ty=ty, size=size, hash=%hash, reasonable_max_encoded_tx_length=reasonable_max_encoded_tx_length, strict_max_encoded_tx_length=self.strict_max_encoded_tx_length(tx_type), - network=%Self, + network=%self, "tx size in eth68 announcement, is unreasonably large" ); @@ -256,11 +278,12 @@ impl FilterAnnouncement for EthMessageFilter { { trace!(target: "net::tx::validation", msg=?*msg, - network=%Self, + network=%self, "validating eth68 announcement data.." ); let mut should_report_peer = false; + let mut tx_types_counter = TxTypesCounter::default(); // checks if eth68 announcement metadata is valid // @@ -278,7 +301,7 @@ impl FilterAnnouncement for EthMessageFilter { return false }; - match self.should_fetch(*ty, hash, *size) { + match self.should_fetch(*ty, hash, *size, &mut tx_types_counter) { ValidationOutcome::Fetch => true, ValidationOutcome::Ignore => false, ValidationOutcome::ReportPeer => { @@ -287,6 +310,7 @@ impl FilterAnnouncement for EthMessageFilter { } } }); + self.announced_tx_types_metrics.update_eth68_announcement_metrics(tx_types_counter); ( if should_report_peer { FilterOutcome::ReportPeer } else { FilterOutcome::Ok }, ValidAnnouncementData::from_partially_valid_data(msg), @@ -299,7 +323,7 @@ impl FilterAnnouncement for EthMessageFilter { ) -> (FilterOutcome, ValidAnnouncementData) { trace!(target: "net::tx::validation", hashes=?*partially_valid_data, - network=%Self, + network=%self, "validating eth66 announcement data.." 
); @@ -323,7 +347,7 @@ mod test { let announcement = NewPooledTransactionHashes68 { types, sizes, hashes }; - let filter = EthMessageFilter; + let filter = EthMessageFilter::default(); let (outcome, _partially_valid_data) = filter.partially_filter_valid_entries(announcement); @@ -350,7 +374,7 @@ mod test { hashes: hashes.clone(), }; - let filter = EthMessageFilter; + let filter = EthMessageFilter::default(); let (outcome, partially_valid_data) = filter.partially_filter_valid_entries(announcement); @@ -390,7 +414,7 @@ mod test { hashes: hashes.clone(), }; - let filter = EthMessageFilter; + let filter = EthMessageFilter::default(); let (outcome, partially_valid_data) = filter.partially_filter_valid_entries(announcement); @@ -433,7 +457,7 @@ mod test { hashes: hashes.clone(), }; - let filter = EthMessageFilter; + let filter = EthMessageFilter::default(); let (outcome, partially_valid_data) = filter.partially_filter_valid_entries(announcement); @@ -491,8 +515,8 @@ mod test { } #[test] - fn test_derive_more_display_for_zst() { - let filter = EthMessageFilter; + fn test_display_for_zst() { + let filter = EthMessageFilter::default(); assert_eq!("EthMessageFilter", &filter.to_string()); } } diff --git a/crates/node-api/src/evm/traits.rs b/crates/node-api/src/evm/traits.rs index d0a8f3c418a8..bc0b900719a3 100644 --- a/crates/node-api/src/evm/traits.rs +++ b/crates/node-api/src/evm/traits.rs @@ -5,11 +5,19 @@ use revm_primitives::{BlockEnv, CfgEnvWithHandlerCfg, SpecId, TxEnv}; /// Trait for configuring the EVM for executing full blocks. pub trait ConfigureEvm: ConfigureEvmEnv { /// Returns new EVM with the given database + /// + /// This does not automatically configure the EVM with [ConfigureEvmEnv] methods. It is up to + /// the caller to call an appropriate method to fill the transaction and block environment + /// before executing any transactions using the provided EVM. fn evm<'a, DB: Database + 'a>(&self, db: DB) -> Evm<'a, (), DB> { EvmBuilder::default().with_db(db).build() } - /// Returns a new EVM with the given inspector + /// Returns a new EVM with the given inspector. + /// + /// This does not automatically configure the EVM with [ConfigureEvmEnv] methods. It is up to + /// the caller to call an appropriate method to fill the transaction and block environment + /// before executing any transactions using the provided EVM. fn evm_with_inspector<'a, DB: Database + 'a, I>(&self, db: DB, inspector: I) -> Evm<'a, I, DB> { EvmBuilder::default().with_db(db).with_external_context(inspector).build() } diff --git a/crates/node-core/src/cli/components.rs b/crates/node-core/src/cli/components.rs index 342067bc873d..b4a0f168fd7f 100644 --- a/crates/node-core/src/cli/components.rs +++ b/crates/node-core/src/cli/components.rs @@ -57,7 +57,7 @@ pub trait RethNodeComponents: Clone + Send + Sync + 'static { /// The transaction pool type type Pool: TransactionPool + Clone + Unpin + 'static; /// The network type used to communicate with p2p. - type Network: NetworkInfo + Peers + NetworkProtocols + NetworkEvents + Clone + 'static; + type Network: NetworkInfo + Peers + NetworkProtocols + NetworkEvents + Clone + Unpin + 'static; /// The events type used to create subscriptions. type Events: CanonStateSubscriptions + Clone + 'static; /// The type that is used to spawn tasks. 
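// The two components.rs hunks around this point add `Unpin` to the `Network`
// bound. A sketch of why such a bound is commonly required: code that stores the
// network and polls an event stream through `&mut self` needs the concrete type
// to be `Unpin` for `StreamExt::poll_next_unpin`. `Events` here is a stand-in
// trait, not reth's actual `NetworkEvents`.
use futures::stream::{Stream, StreamExt};
use std::task::{Context, Poll};

trait Events: Stream + Unpin {}

struct EventDriver<N> {
    network: N,
}

impl<N: Events> EventDriver<N> {
    fn poll_events(&mut self, cx: &mut Context<'_>) {
        // `poll_next_unpin` requires `N: Unpin`; without the bound this
        // would only compile for `Pin<&mut N>`.
        while let Poll::Ready(Some(_event)) = self.network.poll_next_unpin(cx) {
            // react to the event
        }
    }
}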
@@ -171,7 +171,7 @@ where Provider: FullProvider + Clone + 'static, Tasks: TaskSpawner + Clone + Unpin + 'static, Pool: TransactionPool + Clone + Unpin + 'static, - Network: NetworkInfo + Peers + NetworkProtocols + NetworkEvents + Clone + 'static, + Network: NetworkInfo + Peers + NetworkProtocols + NetworkEvents + Clone + Unpin + 'static, Events: CanonStateSubscriptions + Clone + 'static, EvmConfig: ConfigureEvmEnv + 'static, { diff --git a/crates/primitives/Cargo.toml b/crates/primitives/Cargo.toml index dd2bb80d0755..1d7468560206 100644 --- a/crates/primitives/Cargo.toml +++ b/crates/primitives/Cargo.toml @@ -122,6 +122,7 @@ harness = false [[bench]] name = "validate_blob_tx" +required-features = ["arbitrary"] harness = false [[bench]] diff --git a/crates/primitives/src/block.rs b/crates/primitives/src/block.rs index 0342b4404e26..5ccba67dda83 100644 --- a/crates/primitives/src/block.rs +++ b/crates/primitives/src/block.rs @@ -3,6 +3,8 @@ use crate::{ TransactionSignedEcRecovered, Withdrawals, B256, }; use alloy_rlp::{RlpDecodable, RlpEncodable}; +#[cfg(any(test, feature = "arbitrary"))] +use proptest::prelude::{any, prop_compose}; use reth_codecs::derive_arbitrary; use serde::{Deserialize, Serialize}; use std::ops::Deref; @@ -14,19 +16,34 @@ pub use reth_rpc_types::{ /// Ethereum full block. /// /// Withdrawals can be optionally included at the end of the RLP encoded message. +#[derive_arbitrary(rlp, 25)] #[derive( Debug, Clone, PartialEq, Eq, Default, Serialize, Deserialize, RlpEncodable, RlpDecodable, )] -#[derive_arbitrary(rlp, 25)] #[rlp(trailing)] pub struct Block { /// Block header. + #[cfg_attr(any(test, feature = "arbitrary"), proptest(strategy = "valid_header_strategy()"))] pub header: Header, /// Transactions in this block. + #[cfg_attr( + any(test, feature = "arbitrary"), + proptest( + strategy = "proptest::collection::vec(proptest::arbitrary::any::(), 0..=100)" + ) + )] pub body: Vec, /// Ommers/uncles header. + #[cfg_attr( + any(test, feature = "arbitrary"), + proptest(strategy = "proptest::collection::vec(valid_header_strategy(), 0..=2)") + )] pub ommers: Vec
<Header>
, /// Block withdrawals. + #[cfg_attr( + any(test, feature = "arbitrary"), + proptest(strategy = "proptest::option::of(proptest::arbitrary::any::<Withdrawals>())") + )] pub withdrawals: Option<Withdrawals>, } @@ -206,7 +223,7 @@ impl std::ops::DerefMut for BlockWithSenders { /// Sealed Ethereum full block. /// /// Withdrawals can be optionally included at the end of the RLP encoded message. -#[derive_arbitrary(rlp, 10)] +#[derive_arbitrary(rlp)] #[derive( Debug, Clone, PartialEq, Eq, Default, Serialize, Deserialize, RlpEncodable, RlpDecodable, )] @@ -215,13 +232,87 @@ pub struct SealedBlock { /// Locked block header. pub header: SealedHeader, /// Transactions with signatures. + #[cfg_attr( + any(test, feature = "arbitrary"), + proptest( + strategy = "proptest::collection::vec(proptest::arbitrary::any::<TransactionSigned>(), 0..=100)" + ) + )] pub body: Vec<TransactionSigned>, /// Ommer/uncle headers + #[cfg_attr( + any(test, feature = "arbitrary"), + proptest(strategy = "proptest::collection::vec(valid_header_strategy(), 0..=2)") + )] pub ommers: Vec<Header>
, /// Block withdrawals. + #[cfg_attr( + any(test, feature = "arbitrary"), + proptest(strategy = "proptest::option::of(proptest::arbitrary::any::<Withdrawals>())") + )] pub withdrawals: Option<Withdrawals>, } +/// Generates a header which is valid __with respect to past and future forks__. This means, for +/// example, that if the withdrawals root is present, the base fee per gas is also present. +/// +/// If blob gas used were present, then the excess blob gas and parent beacon block root are also +/// present. In this example, the withdrawals root would also be present. +/// +/// This __does not, and should not guarantee__ that the header is valid with respect to __anything +/// else__. +#[cfg(any(test, feature = "arbitrary"))] +pub fn generate_valid_header( + mut header: Header, + eip_4844_active: bool, + blob_gas_used: u64, + excess_blob_gas: u64, + parent_beacon_block_root: B256, +) -> Header { + // EIP-1559 logic + if header.base_fee_per_gas.is_none() { + // If EIP-1559 is not active, clear related fields + header.withdrawals_root = None; + header.blob_gas_used = None; + header.excess_blob_gas = None; + header.parent_beacon_block_root = None; + } else if header.withdrawals_root.is_none() { + // If EIP-4895 is not active, clear related fields + header.blob_gas_used = None; + header.excess_blob_gas = None; + header.parent_beacon_block_root = None; + } else if eip_4844_active { + // Set fields based on EIP-4844 being active + header.blob_gas_used = Some(blob_gas_used); + header.excess_blob_gas = Some(excess_blob_gas); + header.parent_beacon_block_root = Some(parent_beacon_block_root); + } else { + // If EIP-4844 is not active, clear related fields + header.blob_gas_used = None; + header.excess_blob_gas = None; + header.parent_beacon_block_root = None; + } + + header +} + +#[cfg(any(test, feature = "arbitrary"))] +prop_compose! { + /// Generates a proptest strategy for constructing an instance of a header which is valid __with + /// respect to past and future forks__. + /// + /// See docs for [generate_valid_header] for more information. + pub fn valid_header_strategy()( + header in any::<Header>
(), + eip_4844_active in any::<bool>(), + blob_gas_used in any::<u64>(), + excess_blob_gas in any::<u64>(), + parent_beacon_block_root in any::<B256>() + ) -> Header { + generate_valid_header(header, eip_4844_active, blob_gas_used, excess_blob_gas, parent_beacon_block_root) + } +} + impl SealedBlock { /// Create a new sealed block instance using the sealed header and block body. #[inline] @@ -466,16 +557,10 @@ pub struct BlockBody { /// Uncle headers for the given block #[cfg_attr( any(test, feature = "arbitrary"), - proptest( - strategy = "proptest::collection::vec(proptest::arbitrary::any::<Header>
(), 0..=2)" - ) + proptest(strategy = "proptest::collection::vec(valid_header_strategy(), 0..=2)") )] pub ommers: Vec<Header>
, /// Withdrawals in the block. - #[cfg_attr( - any(test, feature = "arbitrary"), - proptest(strategy = "proptest::option::of(proptest::arbitrary::any::<Withdrawals>())") - )] pub withdrawals: Option<Withdrawals>, } diff --git a/crates/primitives/src/header.rs b/crates/primitives/src/header.rs index 5777a96b7114..5f7c937bcd49 100644 --- a/crates/primitives/src/header.rs +++ b/crates/primitives/src/header.rs @@ -1,3 +1,5 @@ +#[cfg(any(test, feature = "arbitrary"))] +use crate::block::{generate_valid_header, valid_header_strategy}; use crate::{ basefee::calculate_next_block_base_fee, constants, @@ -9,8 +11,10 @@ use crate::{ keccak256, Address, BaseFeeParams, BlockHash, BlockNumHash, BlockNumber, Bloom, Bytes, ChainSpec, GotExpected, GotExpectedBoxed, Hardfork, B256, B64, U256, }; -use alloy_rlp::{length_of_length, Decodable, Encodable, EMPTY_LIST_CODE, EMPTY_STRING_CODE}; -use bytes::{Buf, BufMut, BytesMut}; +use alloy_rlp::{length_of_length, Decodable, Encodable}; +use bytes::{BufMut, BytesMut}; +#[cfg(any(test, feature = "arbitrary"))] +use proptest::prelude::*; use reth_codecs::{add_arbitrary_tests, derive_arbitrary, main_codec, Compact}; use serde::{Deserialize, Serialize}; use std::{mem, ops::Deref}; @@ -299,34 +303,6 @@ impl Header { self.extra_data.len() // extra data } - /// Checks if `blob_gas_used` is present in the header. - /// - /// Returns `true` if `blob_gas_used` is `Some`, otherwise `false`. - fn has_blob_gas_used(&self) -> bool { - self.blob_gas_used.is_some() - } - - /// Checks if `excess_blob_gas` is present in the header. - /// - /// Returns `true` if `excess_blob_gas` is `Some`, otherwise `false`. - fn has_excess_blob_gas(&self) -> bool { - self.excess_blob_gas.is_some() - } - - // Checks if `withdrawals_root` is present in the header. - /// - /// Returns `true` if `withdrawals_root` is `Some`, otherwise `false`. - fn has_withdrawals_root(&self) -> bool { - self.withdrawals_root.is_some() - } - - /// Checks if `parent_beacon_block_root` is present in the header. - /// - /// Returns `true` if `parent_beacon_block_root` is `Some`, otherwise `false`. - fn has_parent_beacon_block_root(&self) -> bool { - self.parent_beacon_block_root.is_some() - } - fn header_payload_length(&self) -> usize { let mut length = 0; length += self.parent_hash.length(); // Hash of the previous block. @@ -348,49 +324,23 @@ impl Header { if let Some(base_fee) = self.base_fee_per_gas { // Adding base fee length if it exists. length += U256::from(base_fee).length(); - } else if self.has_withdrawals_root() || - self.has_blob_gas_used() || - self.has_excess_blob_gas() || - self.has_parent_beacon_block_root() - { - // Placeholder code for empty lists. - length += 1; } if let Some(root) = self.withdrawals_root { // Adding withdrawals_root length if it exists. length += root.length(); - } else if self.has_blob_gas_used() || - self.has_excess_blob_gas() || - self.has_parent_beacon_block_root() - { - // Placeholder code for a missing string value. - length += 1; } if let Some(blob_gas_used) = self.blob_gas_used { // Adding blob_gas_used length if it exists. length += U256::from(blob_gas_used).length(); - } else if self.has_excess_blob_gas() || self.has_parent_beacon_block_root() { - // Placeholder code for empty lists. - length += 1; } if let Some(excess_blob_gas) = self.excess_blob_gas { // Adding excess_blob_gas length if it exists. length += U256::from(excess_blob_gas).length(); - } else if self.has_parent_beacon_block_root() { - // Placeholder code for empty lists. - length += 1; } - // Encode parent beacon block root length. 
If new fields are added, the above pattern will - // need to be repeated and placeholder length added. Otherwise, it's impossible to - // tell _which_ fields are missing. This is mainly relevant for contrived cases - // where a header is created at random, for example: - // * A header is created with a withdrawals root, but no base fee. Shanghai blocks are - // post-London, so this is technically not valid. However, a tool like proptest would - // generate a block like this. if let Some(parent_beacon_block_root) = self.parent_beacon_block_root { length += parent_beacon_block_root.length(); } @@ -424,49 +374,28 @@ impl Encodable for Header { self.mix_hash.encode(out); // Encode mix hash. B64::new(self.nonce.to_be_bytes()).encode(out); // Encode nonce. - // The following code is needed only to handle proptest-generated headers that are - // technically invalid. - // - // TODO: make proptest generate more valid headers, ie if there is no base fee, there - // should be no withdrawals root or any future fork field. - // Encode base fee. Put empty list if base fee is missing, // but withdrawals root is present. if let Some(ref base_fee) = self.base_fee_per_gas { U256::from(*base_fee).encode(out); - } else if self.has_withdrawals_root() || - self.has_blob_gas_used() || - self.has_excess_blob_gas() || - self.has_parent_beacon_block_root() - { - out.put_u8(EMPTY_LIST_CODE); } // Encode withdrawals root. Put empty string if withdrawals root is missing, // but blob gas used is present. if let Some(ref root) = self.withdrawals_root { root.encode(out); - } else if self.has_blob_gas_used() || - self.has_excess_blob_gas() || - self.has_parent_beacon_block_root() - { - out.put_u8(EMPTY_STRING_CODE); } // Encode blob gas used. Put empty list if blob gas used is missing, // but excess blob gas is present. if let Some(ref blob_gas_used) = self.blob_gas_used { U256::from(*blob_gas_used).encode(out); - } else if self.has_excess_blob_gas() || self.has_parent_beacon_block_root() { - out.put_u8(EMPTY_LIST_CODE); } // Encode excess blob gas. Put empty list if excess blob gas is missing, // but parent beacon block root is present. if let Some(ref excess_blob_gas) = self.excess_blob_gas { U256::from(*excess_blob_gas).encode(out); - } else if self.has_parent_beacon_block_root() { - out.put_u8(EMPTY_LIST_CODE); } // Encode parent beacon block root. 
If new fields are added, the above pattern will need to @@ -518,39 +447,22 @@ impl Decodable for Header { excess_blob_gas: None, parent_beacon_block_root: None, }; - if started_len - buf.len() < rlp_head.payload_length { - if buf.first().map(|b| *b == EMPTY_LIST_CODE).unwrap_or_default() { - buf.advance(1) - } else { - this.base_fee_per_gas = Some(u64::decode(buf)?); - } + this.base_fee_per_gas = Some(u64::decode(buf)?); } // Withdrawals root for post-shanghai headers if started_len - buf.len() < rlp_head.payload_length { - if buf.first().map(|b| *b == EMPTY_STRING_CODE).unwrap_or_default() { - buf.advance(1) - } else { - this.withdrawals_root = Some(Decodable::decode(buf)?); - } + this.withdrawals_root = Some(Decodable::decode(buf)?); } // Blob gas used and excess blob gas for post-cancun headers if started_len - buf.len() < rlp_head.payload_length { - if buf.first().map(|b| *b == EMPTY_LIST_CODE).unwrap_or_default() { - buf.advance(1) - } else { - this.blob_gas_used = Some(u64::decode(buf)?); - } + this.blob_gas_used = Some(u64::decode(buf)?); } if started_len - buf.len() < rlp_head.payload_length { - if buf.first().map(|b| *b == EMPTY_LIST_CODE).unwrap_or_default() { - buf.advance(1) - } else { - this.excess_blob_gas = Some(u64::decode(buf)?); - } + this.excess_blob_gas = Some(u64::decode(buf)?); } // Decode parent beacon block root. If new fields are added, the above pattern will need to @@ -932,18 +844,24 @@ impl SealedHeader { impl proptest::arbitrary::Arbitrary for SealedHeader { type Parameters = (); fn arbitrary_with(_: Self::Parameters) -> Self::Strategy { - use proptest::prelude::{any, Strategy}; - - any::<(Header, BlockHash)>().prop_map(move |(header, _)| header.seal_slow()).boxed() + // map valid header strategy by sealing + valid_header_strategy().prop_map(|header| header.seal_slow()).boxed() } - type Strategy = proptest::strategy::BoxedStrategy; } #[cfg(any(test, feature = "arbitrary"))] impl<'a> arbitrary::Arbitrary<'a> for SealedHeader { fn arbitrary(u: &mut arbitrary::Unstructured<'a>) -> arbitrary::Result { - Ok(Header::arbitrary(u)?.seal_slow()) + let sealed_header = generate_valid_header( + u.arbitrary()?, + u.arbitrary()?, + u.arbitrary()?, + u.arbitrary()?, + u.arbitrary()?, + ) + .seal_slow(); + Ok(sealed_header) } } diff --git a/crates/primitives/src/lib.rs b/crates/primitives/src/lib.rs index e304b773b4f1..1e94e0e41e66 100644 --- a/crates/primitives/src/lib.rs +++ b/crates/primitives/src/lib.rs @@ -46,6 +46,8 @@ pub mod trie; mod withdrawal; pub use account::{Account, Bytecode}; +#[cfg(any(test, feature = "arbitrary"))] +pub use block::{generate_valid_header, valid_header_strategy}; pub use block::{ Block, BlockBody, BlockHashOrNumber, BlockId, BlockNumHash, BlockNumberOrTag, BlockWithSenders, ForkBlock, RpcBlockHash, SealedBlock, SealedBlockWithSenders, diff --git a/crates/primitives/src/transaction/mod.rs b/crates/primitives/src/transaction/mod.rs index f54b9d25cef8..7bd67a570c7f 100644 --- a/crates/primitives/src/transaction/mod.rs +++ b/crates/primitives/src/transaction/mod.rs @@ -1236,7 +1236,6 @@ impl TransactionSigned { ) -> alloy_rlp::Result { // keep this around so we can use it to calculate the hash let original_encoding = *data; - let tx_type = *data.first().ok_or(RlpError::InputTooShort)?; data.advance(1); @@ -1460,7 +1459,7 @@ impl<'a> arbitrary::Arbitrary<'a> for TransactionSigned { #[cfg(feature = "optimism")] let signature = if transaction.is_deposit() { - Signature { r: crate::U256::ZERO, s: crate::U256::ZERO, odd_y_parity: false } + 
Signature::optimism_deposit_tx_signature() } else { signature }; diff --git a/crates/primitives/src/transaction/pooled.rs b/crates/primitives/src/transaction/pooled.rs index dae995765ad9..0840299353f9 100644 --- a/crates/primitives/src/transaction/pooled.rs +++ b/crates/primitives/src/transaction/pooled.rs @@ -10,7 +10,7 @@ use crate::{ EIP4844_TX_TYPE_ID, }; use alloy_rlp::{Decodable, Encodable, Error as RlpError, Header, EMPTY_LIST_CODE}; -use bytes::Buf; +use bytes::{Buf, BytesMut}; use derive_more::{AsRef, Deref}; use reth_codecs::add_arbitrary_tests; use serde::{Deserialize, Serialize}; @@ -317,6 +317,52 @@ impl PooledTransactionsElement { Self::Deposit { transaction, .. } => transaction.payload_len_without_header(), } } + + /// Returns the enveloped encoded transaction. + /// + /// See also [TransactionSigned::encode_enveloped] + pub fn envelope_encoded(&self) -> Bytes { + let mut buf = BytesMut::new(); + self.encode_enveloped(&mut buf); + buf.freeze().into() + } + + /// Encodes the transaction into the "raw" format (e.g. `eth_sendRawTransaction`). + /// This format is also referred to as "binary" encoding. + /// + /// For legacy transactions, it encodes the RLP of the transaction into the buffer: + /// `rlp(tx-data)` + /// For EIP-2718 typed transactions, it encodes the type of the transaction followed by the rlp + /// of the transaction: `tx-type || rlp(tx-data)` + pub fn encode_enveloped(&self, out: &mut dyn bytes::BufMut) { + // The encoding of `tx-data` depends on the transaction type. Refer to these docs for more + // information on the exact format: + // - Legacy: TxLegacy::encode_with_signature + // - EIP-2930: TxEip2930::encode_with_signature + // - EIP-1559: TxEip1559::encode_with_signature + // - EIP-4844: BlobTransaction::encode_with_type_inner + match self { + Self::Legacy { transaction, signature, .. } => { + transaction.encode_with_signature(signature, out) + } + Self::Eip2930 { transaction, signature, .. } => { + transaction.encode_with_signature(signature, out, false) + } + Self::Eip1559 { transaction, signature, .. } => { + transaction.encode_with_signature(signature, out, false) + } + Self::BlobTransaction(blob_tx) => { + // The inner encoding is used with `with_header` set to false, making the final + // encoding: + // `tx_type || rlp([transaction_payload_body, blobs, commitments, proofs])` + blob_tx.encode_with_type_inner(out, false); + } + #[cfg(feature = "optimism")] + Self::Deposit { transaction, .. } => { + transaction.encode(out, false); + } + } + } } impl Encodable for PooledTransactionsElement { @@ -324,7 +370,8 @@ impl Encodable for PooledTransactionsElement { /// /// For legacy transactions, this encodes the transaction as `rlp(tx-data)`. /// - /// For EIP-2718 transactions, this encodes the transaction as `rlp(tx_type || rlp(tx-data)))`. + /// For EIP-2718 transactions, this encodes the transaction as `rlp(tx_type || rlp(tx-data))`, + /// __including__ the RLP-header for the entire transaction. fn encode(&self, out: &mut dyn bytes::BufMut) { // The encoding of `tx-data` depends on the transaction type. 
Refer to these docs for more // information on the exact format: @@ -646,7 +693,7 @@ impl From for PooledTransactionsElementEcRecovered #[cfg(test)] mod tests { use super::*; - use alloy_primitives::hex; + use alloy_primitives::{address, hex}; use assert_matches::assert_matches; #[test] @@ -690,6 +737,19 @@ mod tests { } } + // + #[test] + fn decode_eip1559_enveloped() { + let data = hex!("02f903d382426882ba09832dc6c0848674742682ed9694714b6a4ea9b94a8a7d9fd362ed72630688c8898c80b90364492d24749189822d8512430d3f3ff7a2ede675ac08265c08e2c56ff6fdaa66dae1cdbe4a5d1d7809f3e99272d067364e597542ac0c369d69e22a6399c3e9bee5da4b07e3f3fdc34c32c3d88aa2268785f3e3f8086df0934b10ef92cfffc2e7f3d90f5e83302e31382e302d64657600000000000000000000000000000000000000000000569e75fc77c1a856f6daaf9e69d8a9566ca34aa47f9133711ce065a571af0cfd000000000000000000000000e1e210594771824dad216568b91c9cb4ceed361c00000000000000000000000000000000000000000000000000000000000546e00000000000000000000000000000000000000000000000000000000000e4e1c00000000000000000000000000000000000000000000000000000000065d6750c00000000000000000000000000000000000000000000000000000000000f288000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000002cf600000000000000000000000000000000000000000000000000000000000000640000000000000000000000000000000000000000000000000000000000000000f1628e56fa6d8c50e5b984a58c0df14de31c7b857ce7ba499945b99252976a93d06dcda6776fc42167fbe71cb59f978f5ef5b12577a90b132d14d9c6efa528076f0161d7bf03643cfc5490ec5084f4a041db7f06c50bd97efa08907ba79ddcac8b890f24d12d8db31abbaaf18985d54f400449ee0559a4452afe53de5853ce090000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000028000000000000000000000000000000000000000000000000000000000000003e800000000000000000000000000000000000000000000000000000000000000400000000000000000000000000000000000000000000000000000000000000064ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff00000000000000000000000000000000000000000000000000000000c080a01428023fc54a27544abc421d5d017b9a7c5936ad501cbdecd0d9d12d04c1a033a0753104bbf1c87634d6ff3f0ffa0982710612306003eb022363b57994bdef445a" +); + + let res = PooledTransactionsElement::decode_enveloped(data.into()).unwrap(); + assert_eq!( + res.into_transaction().to(), + Some(address!("714b6a4ea9b94a8a7d9fd362ed72630688c8898c")) + ); + } + #[test] fn legacy_valid_pooled_decoding() { // d3 <- payload length, d3 - c0 = 0x13 = 19 diff --git a/crates/primitives/src/transaction/signature.rs b/crates/primitives/src/transaction/signature.rs index 3cbfa1c746c8..4ab620a2996c 100644 --- a/crates/primitives/src/transaction/signature.rs +++ b/crates/primitives/src/transaction/signature.rs @@ -78,11 +78,6 @@ impl Signature { /// Output the `v` of the signature depends on chain_id #[inline] pub fn v(&self, chain_id: Option) -> u64 { - #[cfg(feature = "optimism")] - if self.r.is_zero() && self.s.is_zero() { - return 0 - } - if let Some(chain_id) = chain_id { // EIP-155: v = {0, 1} + CHAIN_ID * 2 + 35 self.odd_y_parity as u64 + chain_id * 2 + 35 @@ -206,16 +201,6 @@ mod tests { assert_eq!(4, signature.payload_len_with_eip155_chain_id(Some(47))); } - #[cfg(feature = "optimism")] - #[test] - fn test_zero_signature_payload_len_with_eip155_chain_id() { - let zero_signature = Signature { r: U256::ZERO, s: U256::ZERO, odd_y_parity: 
false }; - - assert_eq!(3, zero_signature.payload_len_with_eip155_chain_id(None)); - assert_eq!(3, zero_signature.payload_len_with_eip155_chain_id(Some(1))); - assert_eq!(3, zero_signature.payload_len_with_eip155_chain_id(Some(47))); - } - #[test] fn test_v() { // Select 1 as an arbitrary nonzero value for R and S, as v() always returns 0 for (0, 0). @@ -228,16 +213,6 @@ mod tests { assert_eq!(38, signature.v(Some(1))); } - #[cfg(feature = "optimism")] - #[test] - fn test_zero_signature_v() { - let signature = Signature { r: U256::ZERO, s: U256::ZERO, odd_y_parity: false }; - - assert_eq!(0, signature.v(None)); - assert_eq!(0, signature.v(Some(1))); - assert_eq!(0, signature.v(Some(47))); - } - #[test] fn test_encode_and_decode_with_eip155_chain_id() { // Select 1 as an arbitrary nonzero value for R and S, as v() always returns 0 for (0, 0). diff --git a/crates/rpc/rpc-api/src/debug.rs b/crates/rpc/rpc-api/src/debug.rs index 46c94ccecd5f..ccee09cc2b44 100644 --- a/crates/rpc/rpc-api/src/debug.rs +++ b/crates/rpc/rpc-api/src/debug.rs @@ -21,8 +21,10 @@ pub trait DebugApi { async fn raw_block(&self, block_id: BlockId) -> RpcResult<Bytes>; /// Returns a EIP-2718 binary-encoded transaction. + /// + /// If this is a pooled EIP-4844 transaction, the blob sidecar is included. #[method(name = "getRawTransaction")] - async fn raw_transaction(&self, hash: B256) -> RpcResult<Bytes>; + async fn raw_transaction(&self, hash: B256) -> RpcResult<Option<Bytes>>; /// Returns an array of EIP-2718 binary-encoded transactions for the given [BlockId]. #[method(name = "getRawTransactions")] diff --git a/crates/rpc/rpc-api/src/eth.rs b/crates/rpc/rpc-api/src/eth.rs index 88923df602f3..db03e13fb11a 100644 --- a/crates/rpc/rpc-api/src/eth.rs +++ b/crates/rpc/rpc-api/src/eth.rs @@ -92,6 +92,12 @@ pub trait EthApi { index: Index, ) -> RpcResult<Option<RichBlock>>; + /// Returns the EIP-2718 encoded transaction if it exists. + /// + /// If this is an EIP-4844 transaction that is in the pool, it will include the sidecar. + #[method(name = "getRawTransactionByHash")] + async fn raw_transaction_by_hash(&self, hash: B256) -> RpcResult<Option<Bytes>>; + /// Returns the information about a transaction requested by transaction hash. #[method(name = "getTransactionByHash")] async fn transaction_by_hash(&self, hash: B256) -> RpcResult<Option<Transaction>>; diff --git a/crates/rpc/rpc/src/debug.rs b/crates/rpc/rpc/src/debug.rs index ccd9c823b7d4..ada59386d6ac 100644 --- a/crates/rpc/rpc/src/debug.rs +++ b/crates/rpc/rpc/src/debug.rs @@ -5,7 +5,7 @@ use crate::{ inspect, inspect_and_return_db, prepare_call_env, replay_transactions_until, transact, EvmOverrides, }, - EthTransactions, TransactionSource, + EthTransactions, }, result::{internal_rpc_err, ToRpcResult}, BlockingTaskGuard, EthApiSpec, @@ -635,13 +635,12 @@ where } /// Handler for `debug_getRawTransaction` + /// + /// If this is a pooled EIP-4844 transaction, the blob sidecar is included. + /// /// Returns the bytes of the transaction for the given hash. - async fn raw_transaction(&self, hash: B256) -> RpcResult<Bytes> { - let tx = self.inner.eth_api.transaction_by_hash(hash).await?; - Ok(tx - .map(TransactionSource::into_recovered) - .map(|tx| tx.envelope_encoded()) - .unwrap_or_default()) + async fn raw_transaction(&self, hash: B256) -> RpcResult<Option<Bytes>> { + Ok(self.inner.eth_api.raw_transaction_by_hash(hash).await?) 
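        // Editor's note: this handler now delegates to
        // `EthTransactions::raw_transaction_by_hash` (see
        // crates/rpc/rpc/src/eth/api/transactions.rs below), which consults the
        // transaction pool before the database, so a pending EIP-4844 transaction
        // comes back in its pooled, sidecar-carrying network encoding.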
} /// Handler for `debug_getRawTransactions` diff --git a/crates/rpc/rpc/src/eth/api/server.rs b/crates/rpc/rpc/src/eth/api/server.rs index 08b14ded5b50..e5c78809e682 100644 --- a/crates/rpc/rpc/src/eth/api/server.rs +++ b/crates/rpc/rpc/src/eth/api/server.rs @@ -153,6 +153,12 @@ where Ok(EthApi::ommer_by_block_and_index(self, number, index).await?) } + /// Handler for: `eth_getRawTransactionByHash` + async fn raw_transaction_by_hash(&self, hash: B256) -> Result<Option<Bytes>> { + trace!(target: "rpc::eth", ?hash, "Serving eth_getRawTransactionByHash"); + Ok(EthTransactions::raw_transaction_by_hash(self, hash).await?) + } + /// Handler for: `eth_getTransactionByHash` async fn transaction_by_hash(&self, hash: B256) -> Result<Option<Transaction>> { trace!(target: "rpc::eth", ?hash, "Serving eth_getTransactionByHash"); diff --git a/crates/rpc/rpc/src/eth/api/transactions.rs b/crates/rpc/rpc/src/eth/api/transactions.rs index afe7856927bf..c96d2ee5d4b6 100644 --- a/crates/rpc/rpc/src/eth/api/transactions.rs +++ b/crates/rpc/rpc/src/eth/api/transactions.rs @@ -146,6 +146,15 @@ pub trait EthTransactions: Send + Sync { block: BlockId, ) -> EthResult<Option<Vec<TransactionSigned>>>; + /// Returns the EIP-2718 encoded transaction by hash. + /// + /// If this is a pooled EIP-4844 transaction, the blob sidecar is included. + /// + /// Checks the pool and state. + /// + /// Returns `Ok(None)` if no matching transaction was found. + async fn raw_transaction_by_hash(&self, hash: B256) -> EthResult<Option<Bytes>>; + /// Returns the transaction by hash. /// /// Checks the pool and state. @@ -428,6 +437,20 @@ where self.block_by_id(block).await.map(|block| block.map(|block| block.body)) } + async fn raw_transaction_by_hash(&self, hash: B256) -> EthResult<Option<Bytes>> { + // Note: this is mostly used to fetch pooled transactions so we check the pool first + if let Some(tx) = + self.pool().get_pooled_transaction_element(hash).map(|tx| tx.envelope_encoded()) + { + return Ok(Some(tx)); + } + + self.on_blocking_task(|this| async move { + Ok(this.provider().transaction_by_hash(hash)?.map(|tx| tx.envelope_encoded())) + }) + .await + } + async fn transaction_by_hash(&self, hash: B256) -> EthResult<Option<TransactionSource>> { // Try to find the transaction on disk let mut resp = self diff --git a/crates/storage/db/src/implementation/mdbx/mod.rs b/crates/storage/db/src/implementation/mdbx/mod.rs index ee93e5e48389..5a3e01a3e55b 100644 --- a/crates/storage/db/src/implementation/mdbx/mod.rs +++ b/crates/storage/db/src/implementation/mdbx/mod.rs @@ -63,6 +63,27 @@ pub struct DatabaseArguments { log_level: Option<LogLevel>, /// Maximum duration of a read transaction. If [None], the default value is used. max_read_transaction_duration: Option<MaxReadTransactionDuration>, + /// Open environment in exclusive/monopolistic mode. If [None], the default value is used. + /// + /// This can be used as a replacement for `MDB_NOLOCK`, which isn't supported by MDBX. In this + /// way, you can get the minimal overhead, but with the correct multi-process and multi-thread + /// locking. + /// + /// If `true`, the environment is opened in exclusive/monopolistic mode, or `MDBX_BUSY` is + /// returned if it is already used by another process. The main feature of the exclusive mode + /// is the ability to open an environment placed on a network share. + /// + /// If `false`, the environment is opened in cooperative mode, i.e. for multi-process + /// access/interaction/cooperation. The main requirements of the cooperative mode are: + /// - Data files MUST be placed in the LOCAL file system, but NOT on a network share. + /// - Environment MUST be opened only by LOCAL processes, but NOT over a network. 
+ /// - OS kernel (i.e. file system and memory mapping implementation) and all processes that + /// open the given environment MUST be running in physically single RAM with + /// cache-coherency. The only exception to the cache-consistency requirement is Linux on the + /// MIPS architecture, but this case has not been tested for a long time. + /// + /// This flag only takes effect when the environment is opened and can't be changed afterwards. + exclusive: Option<bool>, } impl DatabaseArguments { @@ -80,6 +101,12 @@ impl DatabaseArguments { self.max_read_transaction_duration = max_read_transaction_duration; self } + + /// Set the mdbx exclusive flag. + pub fn exclusive(mut self, exclusive: Option<bool>) -> Self { + self.exclusive = exclusive; + self + } } /// Wrapper for the libmdbx environment: [Environment] @@ -250,6 +277,7 @@ impl DatabaseEnv { // worsens it for random access (which is our access pattern outside of sync) no_rdahead: true, coalesce: true, + exclusive: args.exclusive.unwrap_or_default(), ..Default::default() }); // Configure more readers diff --git a/crates/storage/libmdbx-rs/src/error.rs b/crates/storage/libmdbx-rs/src/error.rs index aff584619509..22e440426247 100644 --- a/crates/storage/libmdbx-rs/src/error.rs +++ b/crates/storage/libmdbx-rs/src/error.rs @@ -224,8 +224,8 @@ pub(crate) fn mdbx_result_with_tx_kind<K: TransactionKind>( txn_manager: &TxnManager, ) -> Result<bool> { if K::IS_READ_ONLY && - err_code == ffi::MDBX_EBADSIGN && - txn_manager.remove_aborted_read_transaction(txn).is_some() + txn_manager.remove_aborted_read_transaction(txn).is_some() && + err_code == ffi::MDBX_EBADSIGN { return Err(Error::ReadTransactionAborted) } diff --git a/crates/storage/libmdbx-rs/src/txn_manager.rs b/crates/storage/libmdbx-rs/src/txn_manager.rs index cc00b7111f11..bf46680b81e5 100644 --- a/crates/storage/libmdbx-rs/src/txn_manager.rs +++ b/crates/storage/libmdbx-rs/src/txn_manager.rs @@ -63,31 +63,29 @@ impl TxnManager { Ok(msg) => match msg { TxnManagerMessage::Begin { parent, flags, sender } => { let mut txn: *mut ffi::MDBX_txn = ptr::null_mut(); - sender - .send( - mdbx_result(unsafe { - ffi::mdbx_txn_begin_ex( - env.0, - parent.0, - flags, - &mut txn, - ptr::null_mut(), - ) - }) - .map(|_| TxnPtr(txn)), + let res = mdbx_result(unsafe { + ffi::mdbx_txn_begin_ex( + env.0, + parent.0, + flags, + &mut txn, + ptr::null_mut(), ) - .unwrap(); + }) + .map(|_| TxnPtr(txn)); #[cfg(feature = "read-tx-timeouts")] { use crate::transaction::TransactionKind; - if flags == crate::transaction::RO::OPEN_FLAGS { + if res.is_ok() && flags == crate::transaction::RO::OPEN_FLAGS { if let Some(read_transactions) = &read_transactions { read_transactions.add_active(txn); } } } + + sender.send(res).unwrap(); } TxnManagerMessage::Abort { tx, sender } => { #[cfg(feature = "read-tx-timeouts")] @@ -420,5 +418,39 @@ mod read_transactions { assert!(read_transactions.active.contains_key(&(txn_ptr.0 as usize))); } + + #[test] + fn txn_manager_reassign_transaction_removes_from_aborted_transactions() { + const MAX_DURATION: Duration = Duration::from_secs(1); + + let dir = tempdir().unwrap(); + let env = Environment::builder() + .set_max_read_transaction_duration(MaxReadTransactionDuration::Set(MAX_DURATION)) + .open(dir.path()) + .unwrap(); + + let read_transactions = env.txn_manager().read_transactions.as_ref().unwrap(); + + // Create a read-only transaction, wait until `MAX_DURATION` has elapsed so the + // manager kills it, then use it and observe the `Error::ReadTransactionAborted` error. 
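        // (Editor's note: the sleep below adds `READ_TRANSACTIONS_CHECK_INTERVAL` on
        // top of `MAX_DURATION` so that it outlasts both the transaction's maximum
        // lifetime and one iteration of the manager's monitoring loop.)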
+ { + let tx = env.begin_ro_txn().unwrap(); + let tx_ptr = tx.txn() as usize; + assert!(read_transactions.active.contains_key(&tx_ptr)); + + sleep(MAX_DURATION + READ_TRANSACTIONS_CHECK_INTERVAL); + + assert!(!read_transactions.active.contains_key(&tx_ptr)); + assert!(read_transactions.aborted.contains(&tx_ptr)); + } + + // Create a read-only transaction, ensure this removes it from the aborted set if mdbx + // reassigns the same recently aborted transaction pointer. + { + let tx = env.begin_ro_txn().unwrap(); + let tx_ptr = tx.txn() as usize; + assert!(!read_transactions.aborted.contains(&tx_ptr)); + } + } } } diff --git a/crates/storage/provider/src/chain.rs b/crates/storage/provider/src/chain.rs index b5289fc6dd48..d7cb4cbac2b7 100644 --- a/crates/storage/provider/src/chain.rs +++ b/crates/storage/provider/src/chain.rs @@ -110,7 +110,7 @@ impl Chain { return Some(self.state.clone()) } - if self.blocks.get(&block_number).is_some() { + if self.blocks.contains_key(&block_number) { let mut state = self.state.clone(); state.revert_to(block_number); return Some(state) } diff --git a/crates/storage/provider/src/test_utils/blocks.rs b/crates/storage/provider/src/test_utils/blocks.rs index 0d0aef625e2d..05c98c787660 100644 --- a/crates/storage/provider/src/test_utils/blocks.rs +++ b/crates/storage/provider/src/test_utils/blocks.rs @@ -230,6 +230,7 @@ fn block2( ); let mut block = SealedBlock::decode(&mut BLOCK_RLP.as_slice()).unwrap(); + block.withdrawals = Some(Withdrawals::new(vec![Withdrawal::default()])); let mut header = block.header.clone().unseal(); header.number = number; diff --git a/crates/transaction-pool/src/pool/mod.rs b/crates/transaction-pool/src/pool/mod.rs index 90e0c6a3c1e0..017e0e594a34 100644 --- a/crates/transaction-pool/src/pool/mod.rs +++ b/crates/transaction-pool/src/pool/mod.rs @@ -103,7 +103,6 @@ use crate::{ traits::{GetPooledTransactionLimit, NewBlobSidecar, TransactionListenerKind}, validate::ValidTransaction, }; -use alloy_rlp::Encodable; pub use best::BestTransactionFilter; pub use blob::{blob_tx_priority, fee_delta}; pub use events::{FullTransactionEvent, TransactionEvent}; @@ -319,6 +318,7 @@ where let mut elements = Vec::with_capacity(transactions.len()); let mut size = 0; for transaction in transactions { + let encoded_len = transaction.encoded_length(); let tx = transaction.to_recovered_transaction().into_signed(); let pooled = if tx.is_eip4844() { if let Some(blob) = self.get_blob_transaction(tx) { @@ -330,7 +330,7 @@ where PooledTransactionsElement::from(tx) }; - size += pooled.length(); + size += encoded_len; elements.push(pooled); if limit.exceeds(size) { @@ -685,7 +685,7 @@ where } /// Removes and returns all transactions that are present in the pool. - pub(crate) fn retain_unknown(&self, announcement: &mut A) + pub(crate) fn retain_unknown(&self, announcement: &mut A) where A: HandleMempoolData, { diff --git a/crates/trie/src/prefix_set/mod.rs b/crates/trie/src/prefix_set/mod.rs index 948b75b2e080..a6bdfa9d8b0c 100644 --- a/crates/trie/src/prefix_set/mod.rs +++ b/crates/trie/src/prefix_set/mod.rs @@ -1,7 +1,7 @@ use reth_primitives::{trie::Nibbles, B256}; use std::{ collections::{HashMap, HashSet}, - rc::Rc, + sync::Arc, }; mod loader; @@ -121,7 +121,7 @@ impl PrefixSetMut { self.keys.dedup(); } - PrefixSet { keys: Rc::new(self.keys), index: self.index } + PrefixSet { keys: Arc::new(self.keys), index: self.index } } } @@ -130,7 +130,7 @@ impl PrefixSetMut { /// See also [PrefixSetMut::freeze]. 
#[derive(Debug, Default, Clone)] pub struct PrefixSet { - keys: Rc<Vec<Nibbles>>, + keys: Arc<Vec<Nibbles>>, index: usize, } diff --git a/crates/trie/src/trie_cursor/database_cursors.rs b/crates/trie/src/trie_cursor/database_cursors.rs index 71b48c7b844f..c30cdb428901 100644 --- a/crates/trie/src/trie_cursor/database_cursors.rs +++ b/crates/trie/src/trie_cursor/database_cursors.rs @@ -41,7 +41,7 @@ impl DatabaseAccountTrieCursor { impl<C> TrieCursor for DatabaseAccountTrieCursor<C> where - C: DbCursorRO<tables::AccountsTrie>, + C: DbCursorRO<tables::AccountsTrie> + Send + Sync, { /// Seeks an exact match for the provided key in the account trie. fn seek_exact( @@ -83,7 +83,7 @@ impl DatabaseStorageTrieCursor { impl<C> TrieCursor for DatabaseStorageTrieCursor<C> where - C: DbDupCursorRO<tables::StoragesTrie> + DbCursorRO<tables::StoragesTrie>, + C: DbDupCursorRO<tables::StoragesTrie> + DbCursorRO<tables::StoragesTrie> + Send + Sync, { /// Seeks an exact match for the given key in the storage trie. fn seek_exact( diff --git a/crates/trie/src/trie_cursor/mod.rs b/crates/trie/src/trie_cursor/mod.rs index 66ce57e546cf..3914924a719f 100644 --- a/crates/trie/src/trie_cursor/mod.rs +++ b/crates/trie/src/trie_cursor/mod.rs @@ -30,7 +30,7 @@ pub trait TrieCursorFactory { /// A cursor for navigating a trie that works with both Tables and DupSort tables. #[auto_impl::auto_impl(&mut, Box)] -pub trait TrieCursor { +pub trait TrieCursor: Send + Sync { /// Move the cursor to the key and return if it is an exact match. fn seek_exact( &mut self, diff --git a/etc/assertoor/assertoor-template.yaml b/etc/assertoor/assertoor-template.yaml new file mode 100644 index 000000000000..bf3e903cc1d0 --- /dev/null +++ b/etc/assertoor/assertoor-template.yaml @@ -0,0 +1,45 @@ +participants: +- el_client_type: reth + el_client_image: ghcr.io/paradigmxyz/reth + cl_client_type: lighthouse + cl_client_image: sigp/lighthouse:latest + count: 1 +- el_client_type: reth + el_client_image: ghcr.io/paradigmxyz/reth + cl_client_type: teku + cl_client_image: consensys/teku:latest + count: 1 +- el_client_type: reth + el_client_image: ghcr.io/paradigmxyz/reth + cl_client_type: prysm + cl_client_image: gcr.io/prysmaticlabs/prysm/beacon-chain:stable + validator_client_type: prysm + validator_client_image: gcr.io/prysmaticlabs/prysm/validator:stable + count: 1 +- el_client_type: reth + el_client_image: ghcr.io/paradigmxyz/reth + cl_client_type: nimbus + cl_client_image: statusim/nimbus-eth2:amd64-latest + count: 1 +- el_client_type: reth + el_client_image: ghcr.io/paradigmxyz/reth + cl_client_type: lodestar + cl_client_image: chainsafe/lodestar:latest + count: 1 +network_params: + genesis_delay: 120 + min_validator_withdrawability_delay: 1 + shard_committee_period: 1 + num_validator_keys_per_node: 250 +launch_additional_services: true +additional_services: +- assertoor +snooper_enabled: true +disable_peer_scoring: true +assertoor_params: + image: "ethpandaops/assertoor:master" + run_stability_check: true + run_block_proposal_check: true + run_transaction_test: true + run_blob_transaction_test: false + run_opcodes_transaction_test: true diff --git a/etc/grafana/dashboards/overview.json b/etc/grafana/dashboards/overview.json index 19e9b7e26e8e..85ebb52c4be1 100644 --- a/etc/grafana/dashboards/overview.json +++ b/etc/grafana/dashboards/overview.json @@ -1,5 +1,4 @@ { - "__inputs": [ { "name": "DS_PROMETHEUS", "label": "Prometheus", "description": "", "type": "datasource", "pluginId": "prometheus", "pluginName": "Prometheus" + }, + { + "name": "DS_EXPRESSION", + "label": "Expression", + "description": "", + "type": "datasource", + "pluginId": "__expr__" } ], "__elements": {}, "__requires": [ + { + "type": "datasource", + "id": 
"__expr__", + "version": "1.0.0" + }, { "type": "panel", "id": "bargauge", @@ -48,6 +59,12 @@ "name": "Prometheus", "version": "1.0.0" }, + { + "type": "panel", + "id": "stat", + "name": "Stat", + "version": "" + }, { "type": "panel", "id": "table", @@ -86,7 +103,7 @@ "editable": true, "fiscalYearStartMonth": 0, "graphTooltip": 0, - "id": 1, + "id": null, "links": [], "liveNow": false, "panels": [ @@ -4442,7 +4459,8 @@ "mode": "absolute", "steps": [ { - "color": "green" + "color": "green", + "value": null }, { "color": "red", @@ -4508,9 +4526,8 @@ }, { "datasource": { - "name": "Expression", "type": "__expr__", - "uid": "__expr__" + "uid": "${DS_EXPRESSION}" }, "expression": "$A + $B", "hide": false, @@ -4568,7 +4585,8 @@ "mode": "absolute", "steps": [ { - "color": "green" + "color": "green", + "value": null }, { "color": "red", @@ -4960,13 +4978,326 @@ "title": "Blob store", "type": "timeseries" }, + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "description": "Accumulated durations of nested function calls, in one call to poll `TransactionsManager` future (`TransactionsManager` future has a loop, hence accumulated).\n\nNetwork Events - stream peer session updates from `NetworkManager`\nTransaction Events - stream txns gossip fom `NetworkManager`\nImported Transactions - stream hashes of successfully inserted txns from `TransactionPool`\nPending pool imports - flush txns to pool from `TransactionsManager`\nFetch Events - stream fetch txn events (success case wraps a tx) from `TransactionFetcher`\nCommands - stream commands from (?) to fetch/serve/propagate txns\n ", + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + }, + "unit": "s" + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 0, + "y": 170 + }, + "id": 200, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "single", + "sort": "none" + } + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "disableTextWrap": false, + "editorMode": "builder", + "expr": "reth_network_acc_duration_poll_network_events{instance=\"$instance\"}", + "fullMetaSearch": false, + "hide": false, + "includeNullMetadata": true, + "instant": false, + "legendFormat": "Network Events", + "range": true, + "refId": "B", + "useBackend": false + }, + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "disableTextWrap": false, + "editorMode": "builder", + "expr": "reth_network_acc_duration_poll_transaction_events{instance=\"$instance\"}", + "fullMetaSearch": false, + "hide": false, + "includeNullMetadata": true, + "instant": false, + "legendFormat": "Transaction Events", + "range": true, + "refId": "C", 
+ "useBackend": false + }, + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "disableTextWrap": false, + "editorMode": "builder", + "exemplar": false, + "expr": "reth_network_acc_duration_poll_imported_transactions{instance=\"$instance\"}", + "fullMetaSearch": false, + "hide": false, + "includeNullMetadata": true, + "instant": false, + "legendFormat": "Imported Transactions", + "range": true, + "refId": "D", + "useBackend": false + }, + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "disableTextWrap": false, + "editorMode": "builder", + "expr": "reth_network_acc_duration_poll_pending_pool_imports{instance=\"$instance\"}", + "fullMetaSearch": false, + "hide": false, + "includeNullMetadata": true, + "instant": false, + "legendFormat": "Pending Pool Imports", + "range": true, + "refId": "E", + "useBackend": false + }, + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "disableTextWrap": false, + "editorMode": "builder", + "expr": "reth_network_acc_duration_poll_fetch_events{instance=\"$instance\"}", + "fullMetaSearch": false, + "hide": false, + "includeNullMetadata": true, + "instant": false, + "legendFormat": "Fetch Events", + "range": true, + "refId": "F", + "useBackend": false + }, + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "disableTextWrap": false, + "editorMode": "builder", + "expr": "reth_network_acc_duration_poll_commands{instance=\"$instance\"}", + "fullMetaSearch": false, + "hide": false, + "includeNullMetadata": true, + "instant": false, + "legendFormat": "Commands", + "range": true, + "refId": "G", + "useBackend": false + } + ], + "title": "Transactions Manager Poll Duration Nested Function Calls", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "description": "Duration spent insdie one call to poll the `TransactionsManager` future", + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + }, + "unit": "s" + }, + "overrides": [ + { + "__systemRef": "hideSeriesFrom", + "matcher": { + "id": "byNames", + "options": { + "mode": "exclude", + "names": [ + "Transactions Manager Future" + ], + "prefix": "All except:", + "readOnly": true + } + }, + "properties": [ + { + "id": "custom.hideFrom", + "value": { + "legend": false, + "tooltip": false, + "viz": true + } + } + ] + } + ] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 12, + "y": 170 + }, + "id": 201, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "single", + "sort": "none" + } + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "disableTextWrap": false, + 
"editorMode": "builder", + "expr": "reth_network_duration_poll_tx_manager{instance=\"$instance\"}", + "fullMetaSearch": false, + "hide": false, + "includeNullMetadata": true, + "instant": false, + "legendFormat": "Transactions Manager Future", + "range": true, + "refId": "A", + "useBackend": false + } + ], + "title": "Transactions Manager Future Poll Duration", + "type": "timeseries" + }, { "collapsed": false, "gridPos": { "h": 1, "w": 24, "x": 0, - "y": 170 + "y": 178 }, "id": 79, "panels": [], @@ -5038,7 +5369,7 @@ "h": 8, "w": 12, "x": 0, - "y": 171 + "y": 179 }, "id": 74, "options": { @@ -5133,7 +5464,7 @@ "h": 8, "w": 12, "x": 12, - "y": 171 + "y": 179 }, "id": 80, "options": { @@ -5228,7 +5559,7 @@ "h": 8, "w": 12, "x": 0, - "y": 179 + "y": 187 }, "id": 81, "options": { @@ -5323,7 +5654,7 @@ "h": 8, "w": 12, "x": 12, - "y": 179 + "y": 187 }, "id": 114, "options": { @@ -5419,7 +5750,7 @@ "h": 8, "w": 12, "x": 0, - "y": 187 + "y": 195 }, "id": 158, "options": { @@ -5541,7 +5872,7 @@ "h": 8, "w": 12, "x": 12, - "y": 187 + "y": 195 }, "id": 190, "options": { @@ -5579,7 +5910,7 @@ "h": 1, "w": 24, "x": 0, - "y": 195 + "y": 203 }, "id": 87, "panels": [], @@ -5651,7 +5982,7 @@ "h": 8, "w": 12, "x": 0, - "y": 196 + "y": 204 }, "id": 83, "options": { @@ -5745,7 +6076,7 @@ "h": 8, "w": 12, "x": 12, - "y": 196 + "y": 204 }, "id": 84, "options": { @@ -5851,7 +6182,7 @@ "h": 8, "w": 12, "x": 0, - "y": 204 + "y": 212 }, "id": 85, "options": { @@ -5888,7 +6219,7 @@ "h": 1, "w": 24, "x": 0, - "y": 212 + "y": 220 }, "id": 68, "panels": [], @@ -5960,7 +6291,7 @@ "h": 8, "w": 12, "x": 0, - "y": 213 + "y": 221 }, "id": 60, "options": { @@ -6054,7 +6385,7 @@ "h": 8, "w": 12, "x": 12, - "y": 213 + "y": 221 }, "id": 62, "options": { @@ -6148,7 +6479,7 @@ "h": 8, "w": 12, "x": 0, - "y": 221 + "y": 229 }, "id": 64, "options": { @@ -6185,7 +6516,7 @@ "h": 1, "w": 24, "x": 0, - "y": 229 + "y": 237 }, "id": 97, "panels": [], @@ -6255,7 +6586,7 @@ "h": 8, "w": 12, "x": 0, - "y": 230 + "y": 238 }, "id": 98, "options": { @@ -6416,7 +6747,7 @@ "h": 8, "w": 12, "x": 12, - "y": 230 + "y": 238 }, "id": 101, "options": { @@ -6512,7 +6843,7 @@ "h": 8, "w": 12, "x": 0, - "y": 238 + "y": 246 }, "id": 99, "options": { @@ -6608,7 +6939,7 @@ "h": 8, "w": 12, "x": 12, - "y": 238 + "y": 246 }, "id": 100, "options": { @@ -6646,7 +6977,7 @@ "h": 1, "w": 24, "x": 0, - "y": 246 + "y": 254 }, "id": 105, "panels": [], @@ -6717,7 +7048,7 @@ "h": 8, "w": 12, "x": 0, - "y": 247 + "y": 255 }, "id": 106, "options": { @@ -6813,7 +7144,7 @@ "h": 8, "w": 12, "x": 12, - "y": 247 + "y": 255 }, "id": 107, "options": { @@ -6851,7 +7182,7 @@ "h": 1, "w": 24, "x": 0, - "y": 255 + "y": 263 }, "id": 108, "panels": [], @@ -6947,7 +7278,7 @@ "h": 8, "w": 12, "x": 0, - "y": 256 + "y": 264 }, "id": 109, "options": { @@ -7009,7 +7340,7 @@ "h": 8, "w": 12, "x": 12, - "y": 256 + "y": 264 }, "id": 111, "maxDataPoints": 25, @@ -7136,7 +7467,7 @@ "h": 8, "w": 12, "x": 0, - "y": 264 + "y": 272 }, "id": 120, "options": { @@ -7194,7 +7525,7 @@ "h": 8, "w": 12, "x": 12, - "y": 264 + "y": 272 }, "id": 112, "maxDataPoints": 25, @@ -7345,7 +7676,7 @@ "h": 8, "w": 12, "x": 0, - "y": 272 + "y": 280 }, "id": 198, "options": { @@ -7552,6 +7883,6 @@ "timezone": "", "title": "reth", "uid": "2k8BXz24x", - "version": 15, + "version": 4, "weekStart": "" } \ No newline at end of file