From 2b812b1eea4fa1f895aa39ad6ab9b32350d3c04e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arma=C4=9Fan=20Y=C4=B1ld=C4=B1rak?= <41453366+armaganyildirak@users.noreply.github.com> Date: Thu, 3 Aug 2023 00:42:03 +0000 Subject: [PATCH] Shift networking configuration (#4426) ## Issue Addressed Addresses [#4401](https://github.com/sigp/lighthouse/issues/4401) ## Proposed Changes Shift some constants into ```ChainSpec``` and remove the constant values from code space. ## Additional Info I mostly used ```MainnetEthSpec::default_spec()``` for getting ```ChainSpec```. I wonder Did I make a mistake about that. Co-authored-by: armaganyildirak Co-authored-by: Paul Hauner Co-authored-by: Age Manning Co-authored-by: Diva M --- Cargo.lock | 1 + .../src/attestation_verification.rs | 20 +- beacon_node/beacon_chain/src/beacon_chain.rs | 5 - .../beacon_chain/src/block_verification.rs | 4 +- beacon_node/beacon_chain/src/lib.rs | 2 +- ...ght_client_finality_update_verification.rs | 7 +- ...t_client_optimistic_update_verification.rs | 7 +- .../src/sync_committee_verification.rs | 17 +- beacon_node/beacon_processor/src/lib.rs | 8 +- .../src/work_reprocessing_queue.rs | 8 +- beacon_node/client/src/builder.rs | 3 +- beacon_node/http_api/src/attester_duties.rs | 6 +- beacon_node/http_api/src/proposer_duties.rs | 4 +- beacon_node/http_api/src/sync_committees.rs | 4 +- beacon_node/http_api/tests/tests.rs | 12 +- beacon_node/lighthouse_network/src/config.rs | 36 ++-- .../lighthouse_network/src/rpc/codec/base.rs | 15 +- .../src/rpc/codec/ssz_snappy.rs | 122 +++++++++--- .../lighthouse_network/src/rpc/handler.rs | 36 ++-- beacon_node/lighthouse_network/src/rpc/mod.rs | 31 ++- .../lighthouse_network/src/rpc/protocol.rs | 20 +- .../lighthouse_network/src/service/mod.rs | 18 +- .../lighthouse_network/tests/common.rs | 11 +- .../lighthouse_network/tests/rpc_tests.rs | 52 +++-- .../gossip_methods.rs | 3 + .../src/network_beacon_processor/mod.rs | 11 -- .../src/network_beacon_processor/tests.rs | 15 +- beacon_node/src/config.rs | 6 +- .../gnosis/config.yaml | 12 +- .../mainnet/config.yaml | 12 +- .../prater/config.yaml | 12 +- .../sepolia/config.yaml | 12 +- consensus/types/Cargo.toml | 1 + consensus/types/src/chain_spec.rs | 185 +++++++++++++++--- consensus/types/src/subnet_id.rs | 4 +- .../environment/tests/testnet_dir/config.yaml | 14 ++ 36 files changed, 523 insertions(+), 213 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index b2e8188eec4..13f2b7cd43c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8391,6 +8391,7 @@ dependencies = [ "merkle_proof", "metastruct", "parking_lot 0.12.1", + "paste", "rand 0.8.5", "rand_xorshift", "rayon", diff --git a/beacon_node/beacon_chain/src/attestation_verification.rs b/beacon_node/beacon_chain/src/attestation_verification.rs index 6df0758b2e6..5535fec37c4 100644 --- a/beacon_node/beacon_chain/src/attestation_verification.rs +++ b/beacon_node/beacon_chain/src/attestation_verification.rs @@ -35,10 +35,8 @@ mod batch; use crate::{ - beacon_chain::{MAXIMUM_GOSSIP_CLOCK_DISPARITY, VALIDATOR_PUBKEY_CACHE_LOCK_TIMEOUT}, - metrics, - observed_aggregates::ObserveOutcome, - observed_attesters::Error as ObservedAttestersError, + beacon_chain::VALIDATOR_PUBKEY_CACHE_LOCK_TIMEOUT, metrics, + observed_aggregates::ObserveOutcome, observed_attesters::Error as ObservedAttestersError, BeaconChain, BeaconChainError, BeaconChainTypes, }; use bls::verify_signature_sets; @@ -57,8 +55,8 @@ use std::borrow::Cow; use strum::AsRefStr; use tree_hash::TreeHash; use types::{ - Attestation, BeaconCommittee, CommitteeIndex, Epoch, EthSpec, Hash256, IndexedAttestation, - SelectionProof, SignedAggregateAndProof, Slot, SubnetId, + Attestation, BeaconCommittee, ChainSpec, CommitteeIndex, Epoch, EthSpec, Hash256, + IndexedAttestation, SelectionProof, SignedAggregateAndProof, Slot, SubnetId, }; pub use batch::{batch_verify_aggregated_attestations, batch_verify_unaggregated_attestations}; @@ -454,7 +452,7 @@ impl<'a, T: BeaconChainTypes> IndexedAggregatedAttestation<'a, T> { // MAXIMUM_GOSSIP_CLOCK_DISPARITY allowance). // // We do not queue future attestations for later processing. - verify_propagation_slot_range(&chain.slot_clock, attestation)?; + verify_propagation_slot_range(&chain.slot_clock, attestation, &chain.spec)?; // Check the attestation's epoch matches its target. if attestation.data.slot.epoch(T::EthSpec::slots_per_epoch()) @@ -722,7 +720,7 @@ impl<'a, T: BeaconChainTypes> IndexedUnaggregatedAttestation<'a, T> { // MAXIMUM_GOSSIP_CLOCK_DISPARITY allowance). // // We do not queue future attestations for later processing. - verify_propagation_slot_range(&chain.slot_clock, attestation)?; + verify_propagation_slot_range(&chain.slot_clock, attestation, &chain.spec)?; // Check to ensure that the attestation is "unaggregated". I.e., it has exactly one // aggregation bit set. @@ -1037,11 +1035,11 @@ fn verify_head_block_is_known( pub fn verify_propagation_slot_range( slot_clock: &S, attestation: &Attestation, + spec: &ChainSpec, ) -> Result<(), Error> { let attestation_slot = attestation.data.slot; - let latest_permissible_slot = slot_clock - .now_with_future_tolerance(MAXIMUM_GOSSIP_CLOCK_DISPARITY) + .now_with_future_tolerance(spec.maximum_gossip_clock_disparity()) .ok_or(BeaconChainError::UnableToReadSlot)?; if attestation_slot > latest_permissible_slot { return Err(Error::FutureSlot { @@ -1052,7 +1050,7 @@ pub fn verify_propagation_slot_range( // Taking advantage of saturating subtraction on `Slot`. let earliest_permissible_slot = slot_clock - .now_with_past_tolerance(MAXIMUM_GOSSIP_CLOCK_DISPARITY) + .now_with_past_tolerance(spec.maximum_gossip_clock_disparity()) .ok_or(BeaconChainError::UnableToReadSlot)? - E::slots_per_epoch(); if attestation_slot < earliest_permissible_slot { diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 78f2c3f03b4..25964ed2165 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -217,11 +217,6 @@ pub enum OverrideForkchoiceUpdate { AlreadyApplied, } -/// The accepted clock drift for nodes gossiping blocks and attestations. See: -/// -/// https://github.com/ethereum/eth2.0-specs/blob/v0.12.1/specs/phase0/p2p-interface.md#configuration -pub const MAXIMUM_GOSSIP_CLOCK_DISPARITY: Duration = Duration::from_millis(500); - #[derive(Debug, PartialEq)] pub enum AttestationProcessingOutcome { Processed, diff --git a/beacon_node/beacon_chain/src/block_verification.rs b/beacon_node/beacon_chain/src/block_verification.rs index 492f492521e..0a82eae3711 100644 --- a/beacon_node/beacon_chain/src/block_verification.rs +++ b/beacon_node/beacon_chain/src/block_verification.rs @@ -59,7 +59,7 @@ use crate::validator_pubkey_cache::ValidatorPubkeyCache; use crate::{ beacon_chain::{ BeaconForkChoice, ForkChoiceError, BLOCK_PROCESSING_CACHE_LOCK_TIMEOUT, - MAXIMUM_GOSSIP_CLOCK_DISPARITY, VALIDATOR_PUBKEY_CACHE_LOCK_TIMEOUT, + VALIDATOR_PUBKEY_CACHE_LOCK_TIMEOUT, }, metrics, BeaconChain, BeaconChainError, BeaconChainTypes, }; @@ -730,7 +730,7 @@ impl GossipVerifiedBlock { // Do not gossip or process blocks from future slots. let present_slot_with_tolerance = chain .slot_clock - .now_with_future_tolerance(MAXIMUM_GOSSIP_CLOCK_DISPARITY) + .now_with_future_tolerance(chain.spec.maximum_gossip_clock_disparity()) .ok_or(BeaconChainError::UnableToReadSlot)?; if block.slot() > present_slot_with_tolerance { return Err(BlockError::FutureSlot { diff --git a/beacon_node/beacon_chain/src/lib.rs b/beacon_node/beacon_chain/src/lib.rs index 85ff0f20a01..4ea1eeee011 100644 --- a/beacon_node/beacon_chain/src/lib.rs +++ b/beacon_node/beacon_chain/src/lib.rs @@ -54,7 +54,7 @@ pub use self::beacon_chain::{ AttestationProcessingOutcome, BeaconChain, BeaconChainTypes, BeaconStore, ChainSegmentResult, ForkChoiceError, OverrideForkchoiceUpdate, ProduceBlockVerification, StateSkipConfig, WhenSlotSkipped, INVALID_FINALIZED_MERGE_TRANSITION_BLOCK_SHUTDOWN_REASON, - INVALID_JUSTIFIED_PAYLOAD_SHUTDOWN_REASON, MAXIMUM_GOSSIP_CLOCK_DISPARITY, + INVALID_JUSTIFIED_PAYLOAD_SHUTDOWN_REASON, }; pub use self::beacon_snapshot::BeaconSnapshot; pub use self::chain_config::ChainConfig; diff --git a/beacon_node/beacon_chain/src/light_client_finality_update_verification.rs b/beacon_node/beacon_chain/src/light_client_finality_update_verification.rs index 7c431ebccca..638d2b4012e 100644 --- a/beacon_node/beacon_chain/src/light_client_finality_update_verification.rs +++ b/beacon_node/beacon_chain/src/light_client_finality_update_verification.rs @@ -1,6 +1,4 @@ -use crate::{ - beacon_chain::MAXIMUM_GOSSIP_CLOCK_DISPARITY, BeaconChain, BeaconChainError, BeaconChainTypes, -}; +use crate::{BeaconChain, BeaconChainError, BeaconChainTypes}; use derivative::Derivative; use slot_clock::SlotClock; use std::time::Duration; @@ -103,7 +101,8 @@ impl VerifiedLightClientFinalityUpdate { // verify that enough time has passed for the block to have been propagated match start_time { Some(time) => { - if seen_timestamp + MAXIMUM_GOSSIP_CLOCK_DISPARITY < time + one_third_slot_duration + if seen_timestamp + chain.spec.maximum_gossip_clock_disparity() + < time + one_third_slot_duration { return Err(Error::TooEarly); } diff --git a/beacon_node/beacon_chain/src/light_client_optimistic_update_verification.rs b/beacon_node/beacon_chain/src/light_client_optimistic_update_verification.rs index 20d7181808a..2d1a5cf97cf 100644 --- a/beacon_node/beacon_chain/src/light_client_optimistic_update_verification.rs +++ b/beacon_node/beacon_chain/src/light_client_optimistic_update_verification.rs @@ -1,6 +1,4 @@ -use crate::{ - beacon_chain::MAXIMUM_GOSSIP_CLOCK_DISPARITY, BeaconChain, BeaconChainError, BeaconChainTypes, -}; +use crate::{BeaconChain, BeaconChainError, BeaconChainTypes}; use derivative::Derivative; use eth2::types::Hash256; use slot_clock::SlotClock; @@ -103,7 +101,8 @@ impl VerifiedLightClientOptimisticUpdate { // verify that enough time has passed for the block to have been propagated match start_time { Some(time) => { - if seen_timestamp + MAXIMUM_GOSSIP_CLOCK_DISPARITY < time + one_third_slot_duration + if seen_timestamp + chain.spec.maximum_gossip_clock_disparity() + < time + one_third_slot_duration { return Err(Error::TooEarly); } diff --git a/beacon_node/beacon_chain/src/sync_committee_verification.rs b/beacon_node/beacon_chain/src/sync_committee_verification.rs index 246bb12cc0e..5c6710bfd6c 100644 --- a/beacon_node/beacon_chain/src/sync_committee_verification.rs +++ b/beacon_node/beacon_chain/src/sync_committee_verification.rs @@ -28,10 +28,8 @@ use crate::observed_attesters::SlotSubcommitteeIndex; use crate::{ - beacon_chain::{MAXIMUM_GOSSIP_CLOCK_DISPARITY, VALIDATOR_PUBKEY_CACHE_LOCK_TIMEOUT}, - metrics, - observed_aggregates::ObserveOutcome, - BeaconChain, BeaconChainError, BeaconChainTypes, + beacon_chain::VALIDATOR_PUBKEY_CACHE_LOCK_TIMEOUT, metrics, + observed_aggregates::ObserveOutcome, BeaconChain, BeaconChainError, BeaconChainTypes, }; use bls::{verify_signature_sets, PublicKeyBytes}; use derivative::Derivative; @@ -52,6 +50,7 @@ use tree_hash_derive::TreeHash; use types::consts::altair::SYNC_COMMITTEE_SUBNET_COUNT; use types::slot_data::SlotData; use types::sync_committee::Error as SyncCommitteeError; +use types::ChainSpec; use types::{ sync_committee_contribution::Error as ContributionError, AggregateSignature, BeaconStateError, EthSpec, Hash256, SignedContributionAndProof, Slot, SyncCommitteeContribution, @@ -297,7 +296,7 @@ impl VerifiedSyncContribution { let subcommittee_index = contribution.subcommittee_index as usize; // Ensure sync committee contribution is within the MAXIMUM_GOSSIP_CLOCK_DISPARITY allowance. - verify_propagation_slot_range(&chain.slot_clock, contribution)?; + verify_propagation_slot_range(&chain.slot_clock, contribution, &chain.spec)?; // Validate subcommittee index. if contribution.subcommittee_index >= SYNC_COMMITTEE_SUBNET_COUNT { @@ -460,7 +459,7 @@ impl VerifiedSyncCommitteeMessage { // MAXIMUM_GOSSIP_CLOCK_DISPARITY allowance). // // We do not queue future sync committee messages for later processing. - verify_propagation_slot_range(&chain.slot_clock, &sync_message)?; + verify_propagation_slot_range(&chain.slot_clock, &sync_message, &chain.spec)?; // Ensure the `subnet_id` is valid for the given validator. let pubkey = chain @@ -576,11 +575,11 @@ impl VerifiedSyncCommitteeMessage { pub fn verify_propagation_slot_range( slot_clock: &S, sync_contribution: &U, + spec: &ChainSpec, ) -> Result<(), Error> { let message_slot = sync_contribution.get_slot(); - let latest_permissible_slot = slot_clock - .now_with_future_tolerance(MAXIMUM_GOSSIP_CLOCK_DISPARITY) + .now_with_future_tolerance(spec.maximum_gossip_clock_disparity()) .ok_or(BeaconChainError::UnableToReadSlot)?; if message_slot > latest_permissible_slot { return Err(Error::FutureSlot { @@ -590,7 +589,7 @@ pub fn verify_propagation_slot_range( } let earliest_permissible_slot = slot_clock - .now_with_past_tolerance(MAXIMUM_GOSSIP_CLOCK_DISPARITY) + .now_with_past_tolerance(spec.maximum_gossip_clock_disparity()) .ok_or(BeaconChainError::UnableToReadSlot)?; if message_slot < earliest_permissible_slot { diff --git a/beacon_node/beacon_processor/src/lib.rs b/beacon_node/beacon_processor/src/lib.rs index 88066f2a305..297c4868db7 100644 --- a/beacon_node/beacon_processor/src/lib.rs +++ b/beacon_node/beacon_processor/src/lib.rs @@ -661,7 +661,8 @@ impl BeaconProcessor { work_reprocessing_rx: mpsc::Receiver, work_journal_tx: Option>, slot_clock: S, - ) { + maximum_gossip_clock_disparity: Duration, + ) -> Result<(), String> { // Used by workers to communicate that they are finished a task. let (idle_tx, idle_rx) = mpsc::channel::<()>(MAX_IDLE_QUEUE_LEN); @@ -717,13 +718,15 @@ impl BeaconProcessor { // receive them back once they are ready (`ready_work_rx`). let (ready_work_tx, ready_work_rx) = mpsc::channel::(MAX_SCHEDULED_WORK_QUEUE_LEN); + spawn_reprocess_scheduler( ready_work_tx, work_reprocessing_rx, &self.executor, slot_clock, self.log.clone(), - ); + maximum_gossip_clock_disparity, + )?; let executor = self.executor.clone(); @@ -1203,6 +1206,7 @@ impl BeaconProcessor { // Spawn on the core executor. executor.spawn(manager_future, MANAGER_TASK_NAME); + Ok(()) } /// Spawns a blocking worker thread to process some `Work`. diff --git a/beacon_node/beacon_processor/src/work_reprocessing_queue.rs b/beacon_node/beacon_processor/src/work_reprocessing_queue.rs index 608f634d537..9191509d39f 100644 --- a/beacon_node/beacon_processor/src/work_reprocessing_queue.rs +++ b/beacon_node/beacon_processor/src/work_reprocessing_queue.rs @@ -361,7 +361,12 @@ pub fn spawn_reprocess_scheduler( executor: &TaskExecutor, slot_clock: S, log: Logger, -) { + maximum_gossip_clock_disparity: Duration, +) -> Result<(), String> { + // Sanity check + if ADDITIONAL_QUEUED_BLOCK_DELAY >= maximum_gossip_clock_disparity { + return Err("The block delay and gossip disparity don't match.".to_string()); + } let mut queue = ReprocessQueue { work_reprocessing_rx, ready_work_tx, @@ -400,6 +405,7 @@ pub fn spawn_reprocess_scheduler( }, TASK_NAME, ); + Ok(()) } impl ReprocessQueue { diff --git a/beacon_node/client/src/builder.rs b/beacon_node/client/src/builder.rs index 14edbb97309..71a9b28fb05 100644 --- a/beacon_node/client/src/builder.rs +++ b/beacon_node/client/src/builder.rs @@ -795,7 +795,8 @@ where self.work_reprocessing_rx, None, beacon_chain.slot_clock.clone(), - ); + beacon_chain.spec.maximum_gossip_clock_disparity(), + )?; } let state_advance_context = runtime_context.service_context("state_advance".into()); diff --git a/beacon_node/http_api/src/attester_duties.rs b/beacon_node/http_api/src/attester_duties.rs index 5c3e420839d..aad405d56ba 100644 --- a/beacon_node/http_api/src/attester_duties.rs +++ b/beacon_node/http_api/src/attester_duties.rs @@ -1,9 +1,7 @@ //! Contains the handler for the `GET validator/duties/attester/{epoch}` endpoint. use crate::state_id::StateId; -use beacon_chain::{ - BeaconChain, BeaconChainError, BeaconChainTypes, MAXIMUM_GOSSIP_CLOCK_DISPARITY, -}; +use beacon_chain::{BeaconChain, BeaconChainError, BeaconChainTypes}; use eth2::types::{self as api_types}; use slot_clock::SlotClock; use state_processing::state_advance::partial_state_advance; @@ -32,7 +30,7 @@ pub fn attester_duties( // will equal `current_epoch + 1` let tolerant_current_epoch = chain .slot_clock - .now_with_future_tolerance(MAXIMUM_GOSSIP_CLOCK_DISPARITY) + .now_with_future_tolerance(chain.spec.maximum_gossip_clock_disparity()) .ok_or_else(|| warp_utils::reject::custom_server_error("unable to read slot clock".into()))? .epoch(T::EthSpec::slots_per_epoch()); diff --git a/beacon_node/http_api/src/proposer_duties.rs b/beacon_node/http_api/src/proposer_duties.rs index 7e946b89e72..708df39b4d6 100644 --- a/beacon_node/http_api/src/proposer_duties.rs +++ b/beacon_node/http_api/src/proposer_duties.rs @@ -3,7 +3,7 @@ use crate::state_id::StateId; use beacon_chain::{ beacon_proposer_cache::{compute_proposer_duties_from_head, ensure_state_is_in_epoch}, - BeaconChain, BeaconChainError, BeaconChainTypes, MAXIMUM_GOSSIP_CLOCK_DISPARITY, + BeaconChain, BeaconChainError, BeaconChainTypes, }; use eth2::types::{self as api_types}; use safe_arith::SafeArith; @@ -33,7 +33,7 @@ pub fn proposer_duties( // will equal `current_epoch + 1` let tolerant_current_epoch = chain .slot_clock - .now_with_future_tolerance(MAXIMUM_GOSSIP_CLOCK_DISPARITY) + .now_with_future_tolerance(chain.spec.maximum_gossip_clock_disparity()) .ok_or_else(|| warp_utils::reject::custom_server_error("unable to read slot clock".into()))? .epoch(T::EthSpec::slots_per_epoch()); diff --git a/beacon_node/http_api/src/sync_committees.rs b/beacon_node/http_api/src/sync_committees.rs index 07dfb5c988f..dcf41429f6d 100644 --- a/beacon_node/http_api/src/sync_committees.rs +++ b/beacon_node/http_api/src/sync_committees.rs @@ -6,7 +6,7 @@ use beacon_chain::sync_committee_verification::{ }; use beacon_chain::{ validator_monitor::timestamp_now, BeaconChain, BeaconChainError, BeaconChainTypes, - StateSkipConfig, MAXIMUM_GOSSIP_CLOCK_DISPARITY, + StateSkipConfig, }; use eth2::types::{self as api_types}; use lighthouse_network::PubsubMessage; @@ -85,7 +85,7 @@ fn duties_from_state_load( let current_epoch = chain.epoch()?; let tolerant_current_epoch = chain .slot_clock - .now_with_future_tolerance(MAXIMUM_GOSSIP_CLOCK_DISPARITY) + .now_with_future_tolerance(chain.spec.maximum_gossip_clock_disparity()) .ok_or(BeaconChainError::UnableToReadSlot)? .epoch(T::EthSpec::slots_per_epoch()); diff --git a/beacon_node/http_api/tests/tests.rs b/beacon_node/http_api/tests/tests.rs index 7c3872925a3..28eb106e8df 100644 --- a/beacon_node/http_api/tests/tests.rs +++ b/beacon_node/http_api/tests/tests.rs @@ -1,7 +1,7 @@ use beacon_chain::test_utils::RelativeSyncCommittee; use beacon_chain::{ test_utils::{AttestationStrategy, BeaconChainHarness, BlockStrategy, EphemeralHarnessType}, - BeaconChain, StateSkipConfig, WhenSlotSkipped, MAXIMUM_GOSSIP_CLOCK_DISPARITY, + BeaconChain, StateSkipConfig, WhenSlotSkipped, }; use environment::null_logger; use eth2::{ @@ -2313,7 +2313,9 @@ impl ApiTester { .unwrap(); self.chain.slot_clock.set_current_time( - current_epoch_start - MAXIMUM_GOSSIP_CLOCK_DISPARITY - Duration::from_millis(1), + current_epoch_start + - self.chain.spec.maximum_gossip_clock_disparity() + - Duration::from_millis(1), ); let dependent_root = self @@ -2350,9 +2352,9 @@ impl ApiTester { "should not get attester duties outside of tolerance" ); - self.chain - .slot_clock - .set_current_time(current_epoch_start - MAXIMUM_GOSSIP_CLOCK_DISPARITY); + self.chain.slot_clock.set_current_time( + current_epoch_start - self.chain.spec.maximum_gossip_clock_disparity(), + ); self.client .get_validator_duties_proposer(current_epoch) diff --git a/beacon_node/lighthouse_network/src/config.rs b/beacon_node/lighthouse_network/src/config.rs index 6c8f20a24b9..0ab7c03e7f6 100644 --- a/beacon_node/lighthouse_network/src/config.rs +++ b/beacon_node/lighthouse_network/src/config.rs @@ -16,11 +16,6 @@ use std::sync::Arc; use std::time::Duration; use types::{ForkContext, ForkName}; -/// The maximum transmit size of gossip messages in bytes pre-merge. -const GOSSIP_MAX_SIZE: usize = 1_048_576; // 1M -/// The maximum transmit size of gossip messages in bytes post-merge. -const GOSSIP_MAX_SIZE_POST_MERGE: usize = 10 * 1_048_576; // 10M - /// The cache time is set to accommodate the circulation time of an attestation. /// /// The p2p spec declares that we accept attestations within the following range: @@ -35,20 +30,20 @@ const GOSSIP_MAX_SIZE_POST_MERGE: usize = 10 * 1_048_576; // 10M /// another 500ms for "fudge factor". pub const DUPLICATE_CACHE_TIME: Duration = Duration::from_secs(33 * 12 + 1); -// We treat uncompressed messages as invalid and never use the INVALID_SNAPPY_DOMAIN as in the -// specification. We leave it here for posterity. -// const MESSAGE_DOMAIN_INVALID_SNAPPY: [u8; 4] = [0, 0, 0, 0]; -const MESSAGE_DOMAIN_VALID_SNAPPY: [u8; 4] = [1, 0, 0, 0]; - /// The maximum size of gossip messages. -pub fn gossip_max_size(is_merge_enabled: bool) -> usize { +pub fn gossip_max_size(is_merge_enabled: bool, gossip_max_size: usize) -> usize { if is_merge_enabled { - GOSSIP_MAX_SIZE_POST_MERGE + gossip_max_size } else { - GOSSIP_MAX_SIZE + gossip_max_size / 10 } } +pub struct GossipsubConfigParams { + pub message_domain_valid_snappy: [u8; 4], + pub gossip_max_size: usize, +} + #[derive(Clone, Debug, Serialize, Deserialize)] #[serde(default)] /// Network configuration for lighthouse. @@ -413,7 +408,11 @@ impl From for NetworkLoad { } /// Return a Lighthouse specific `GossipsubConfig` where the `message_id_fn` depends on the current fork. -pub fn gossipsub_config(network_load: u8, fork_context: Arc) -> gossipsub::Config { +pub fn gossipsub_config( + network_load: u8, + fork_context: Arc, + gossipsub_config_params: GossipsubConfigParams, +) -> gossipsub::Config { // The function used to generate a gossipsub message id // We use the first 8 bytes of SHA256(topic, data) for content addressing let fast_gossip_message_id = |message: &gossipsub::RawMessage| { @@ -446,12 +445,12 @@ pub fn gossipsub_config(network_load: u8, fork_context: Arc) -> gos } } } - + let message_domain_valid_snappy = gossipsub_config_params.message_domain_valid_snappy; let is_merge_enabled = fork_context.fork_exists(ForkName::Merge); let gossip_message_id = move |message: &gossipsub::Message| { gossipsub::MessageId::from( &Sha256::digest( - prefix(MESSAGE_DOMAIN_VALID_SNAPPY, message, fork_context.clone()).as_slice(), + prefix(message_domain_valid_snappy, message, fork_context.clone()).as_slice(), )[..20], ) }; @@ -459,7 +458,10 @@ pub fn gossipsub_config(network_load: u8, fork_context: Arc) -> gos let load = NetworkLoad::from(network_load); gossipsub::ConfigBuilder::default() - .max_transmit_size(gossip_max_size(is_merge_enabled)) + .max_transmit_size(gossip_max_size( + is_merge_enabled, + gossipsub_config_params.gossip_max_size, + )) .heartbeat_interval(load.heartbeat_interval) .mesh_n(load.mesh_n) .mesh_n_low(load.mesh_n_low) diff --git a/beacon_node/lighthouse_network/src/rpc/codec/base.rs b/beacon_node/lighthouse_network/src/rpc/codec/base.rs index d568f27897e..943d4a3bce2 100644 --- a/beacon_node/lighthouse_network/src/rpc/codec/base.rs +++ b/beacon_node/lighthouse_network/src/rpc/codec/base.rs @@ -217,9 +217,12 @@ mod tests { let snappy_protocol_id = ProtocolId::new(SupportedProtocol::StatusV1, Encoding::SSZSnappy); let fork_context = Arc::new(fork_context(ForkName::Base)); + + let chain_spec = Spec::default_spec(); + let mut snappy_outbound_codec = SSZSnappyOutboundCodec::::new( snappy_protocol_id, - max_rpc_size(&fork_context), + max_rpc_size(&fork_context, chain_spec.max_chunk_size as usize), fork_context, ); @@ -251,9 +254,12 @@ mod tests { let snappy_protocol_id = ProtocolId::new(SupportedProtocol::StatusV1, Encoding::SSZSnappy); let fork_context = Arc::new(fork_context(ForkName::Base)); + + let chain_spec = Spec::default_spec(); + let mut snappy_outbound_codec = SSZSnappyOutboundCodec::::new( snappy_protocol_id, - max_rpc_size(&fork_context), + max_rpc_size(&fork_context, chain_spec.max_chunk_size as usize), fork_context, ); @@ -279,7 +285,10 @@ mod tests { // Response limits let fork_context = Arc::new(fork_context(ForkName::Base)); - let max_rpc_size = max_rpc_size(&fork_context); + + let chain_spec = Spec::default_spec(); + + let max_rpc_size = max_rpc_size(&fork_context, chain_spec.max_chunk_size as usize); let limit = protocol_id.rpc_response_limits::(&fork_context); let mut max = encode_len(limit.max + 1); let mut codec = SSZSnappyOutboundCodec::::new( diff --git a/beacon_node/lighthouse_network/src/rpc/codec/ssz_snappy.rs b/beacon_node/lighthouse_network/src/rpc/codec/ssz_snappy.rs index 39cf8b3eb29..f1d94da7ece 100644 --- a/beacon_node/lighthouse_network/src/rpc/codec/ssz_snappy.rs +++ b/beacon_node/lighthouse_network/src/rpc/codec/ssz_snappy.rs @@ -615,8 +615,8 @@ mod tests { }; use std::sync::Arc; use types::{ - BeaconBlock, BeaconBlockAltair, BeaconBlockBase, BeaconBlockMerge, EmptyBlock, Epoch, - ForkContext, FullPayload, Hash256, Signature, SignedBeaconBlock, Slot, + BeaconBlock, BeaconBlockAltair, BeaconBlockBase, BeaconBlockMerge, ChainSpec, EmptyBlock, + Epoch, ForkContext, FullPayload, Hash256, Signature, SignedBeaconBlock, Slot, }; use snap::write::FrameEncoder; @@ -658,7 +658,7 @@ mod tests { } /// Merge block with length < max_rpc_size. - fn merge_block_small(fork_context: &ForkContext) -> SignedBeaconBlock { + fn merge_block_small(fork_context: &ForkContext, spec: &ChainSpec) -> SignedBeaconBlock { let mut block: BeaconBlockMerge<_, FullPayload> = BeaconBlockMerge::empty(&Spec::default_spec()); let tx = VariableList::from(vec![0; 1024]); @@ -667,14 +667,14 @@ mod tests { block.body.execution_payload.execution_payload.transactions = txs; let block = BeaconBlock::Merge(block); - assert!(block.ssz_bytes_len() <= max_rpc_size(fork_context)); + assert!(block.ssz_bytes_len() <= max_rpc_size(fork_context, spec.max_chunk_size as usize)); SignedBeaconBlock::from_block(block, Signature::empty()) } /// Merge block with length > MAX_RPC_SIZE. /// The max limit for a merge block is in the order of ~16GiB which wouldn't fit in memory. /// Hence, we generate a merge block just greater than `MAX_RPC_SIZE` to test rejection on the rpc layer. - fn merge_block_large(fork_context: &ForkContext) -> SignedBeaconBlock { + fn merge_block_large(fork_context: &ForkContext, spec: &ChainSpec) -> SignedBeaconBlock { let mut block: BeaconBlockMerge<_, FullPayload> = BeaconBlockMerge::empty(&Spec::default_spec()); let tx = VariableList::from(vec![0; 1024]); @@ -683,7 +683,7 @@ mod tests { block.body.execution_payload.execution_payload.transactions = txs; let block = BeaconBlock::Merge(block); - assert!(block.ssz_bytes_len() > max_rpc_size(fork_context)); + assert!(block.ssz_bytes_len() > max_rpc_size(fork_context, spec.max_chunk_size as usize)); SignedBeaconBlock::from_block(block, Signature::empty()) } @@ -737,10 +737,11 @@ mod tests { protocol: SupportedProtocol, message: RPCCodedResponse, fork_name: ForkName, + spec: &ChainSpec, ) -> Result { let snappy_protocol_id = ProtocolId::new(protocol, Encoding::SSZSnappy); let fork_context = Arc::new(fork_context(fork_name)); - let max_packet_size = max_rpc_size(&fork_context); + let max_packet_size = max_rpc_size(&fork_context, spec.max_chunk_size as usize); let mut buf = BytesMut::new(); let mut snappy_inbound_codec = @@ -783,10 +784,11 @@ mod tests { protocol: SupportedProtocol, message: &mut BytesMut, fork_name: ForkName, + spec: &ChainSpec, ) -> Result>, RPCError> { let snappy_protocol_id = ProtocolId::new(protocol, Encoding::SSZSnappy); let fork_context = Arc::new(fork_context(fork_name)); - let max_packet_size = max_rpc_size(&fork_context); + let max_packet_size = max_rpc_size(&fork_context, spec.max_chunk_size as usize); let mut snappy_outbound_codec = SSZSnappyOutboundCodec::::new(snappy_protocol_id, max_packet_size, fork_context); // decode message just as snappy message @@ -798,15 +800,20 @@ mod tests { protocol: SupportedProtocol, message: RPCCodedResponse, fork_name: ForkName, + spec: &ChainSpec, ) -> Result>, RPCError> { - let mut encoded = encode_response(protocol, message, fork_name)?; - decode_response(protocol, &mut encoded, fork_name) + let mut encoded = encode_response(protocol, message, fork_name, spec)?; + decode_response(protocol, &mut encoded, fork_name, spec) } /// Verifies that requests we send are encoded in a way that we would correctly decode too. - fn encode_then_decode_request(req: OutboundRequest, fork_name: ForkName) { + fn encode_then_decode_request( + req: OutboundRequest, + fork_name: ForkName, + spec: &ChainSpec, + ) { let fork_context = Arc::new(fork_context(fork_name)); - let max_packet_size = max_rpc_size(&fork_context); + let max_packet_size = max_rpc_size(&fork_context, spec.max_chunk_size as usize); let protocol = ProtocolId::new(req.versioned_protocol(), Encoding::SSZSnappy); // Encode a request we send let mut buf = BytesMut::new(); @@ -851,11 +858,14 @@ mod tests { // Test RPCResponse encoding/decoding for V1 messages #[test] fn test_encode_then_decode_v1() { + let chain_spec = Spec::default_spec(); + assert_eq!( encode_then_decode_response( SupportedProtocol::StatusV1, RPCCodedResponse::Success(RPCResponse::Status(status_message())), ForkName::Base, + &chain_spec, ), Ok(Some(RPCResponse::Status(status_message()))) ); @@ -865,6 +875,7 @@ mod tests { SupportedProtocol::PingV1, RPCCodedResponse::Success(RPCResponse::Pong(ping_message())), ForkName::Base, + &chain_spec, ), Ok(Some(RPCResponse::Pong(ping_message()))) ); @@ -874,6 +885,7 @@ mod tests { SupportedProtocol::BlocksByRangeV1, RPCCodedResponse::Success(RPCResponse::BlocksByRange(Arc::new(empty_base_block()))), ForkName::Base, + &chain_spec, ), Ok(Some(RPCResponse::BlocksByRange(Arc::new( empty_base_block() @@ -886,6 +898,7 @@ mod tests { SupportedProtocol::BlocksByRangeV1, RPCCodedResponse::Success(RPCResponse::BlocksByRange(Arc::new(altair_block()))), ForkName::Altair, + &chain_spec, ) .unwrap_err(), RPCError::SSZDecodeError(_) @@ -898,6 +911,7 @@ mod tests { SupportedProtocol::BlocksByRootV1, RPCCodedResponse::Success(RPCResponse::BlocksByRoot(Arc::new(empty_base_block()))), ForkName::Base, + &chain_spec, ), Ok(Some(RPCResponse::BlocksByRoot( Arc::new(empty_base_block()) @@ -910,6 +924,7 @@ mod tests { SupportedProtocol::BlocksByRootV1, RPCCodedResponse::Success(RPCResponse::BlocksByRoot(Arc::new(altair_block()))), ForkName::Altair, + &chain_spec, ) .unwrap_err(), RPCError::SSZDecodeError(_) @@ -922,6 +937,7 @@ mod tests { SupportedProtocol::MetaDataV1, RPCCodedResponse::Success(RPCResponse::MetaData(metadata())), ForkName::Base, + &chain_spec, ), Ok(Some(RPCResponse::MetaData(metadata()))), ); @@ -932,6 +948,7 @@ mod tests { SupportedProtocol::MetaDataV1, RPCCodedResponse::Success(RPCResponse::MetaData(metadata_v2())), ForkName::Base, + &chain_spec, ), Ok(Some(RPCResponse::MetaData(metadata()))), ); @@ -940,11 +957,14 @@ mod tests { // Test RPCResponse encoding/decoding for V1 messages #[test] fn test_encode_then_decode_v2() { + let chain_spec = Spec::default_spec(); + assert_eq!( encode_then_decode_response( SupportedProtocol::BlocksByRangeV2, RPCCodedResponse::Success(RPCResponse::BlocksByRange(Arc::new(empty_base_block()))), ForkName::Base, + &chain_spec, ), Ok(Some(RPCResponse::BlocksByRange(Arc::new( empty_base_block() @@ -959,6 +979,7 @@ mod tests { SupportedProtocol::BlocksByRangeV2, RPCCodedResponse::Success(RPCResponse::BlocksByRange(Arc::new(empty_base_block()))), ForkName::Altair, + &chain_spec, ), Ok(Some(RPCResponse::BlocksByRange(Arc::new( empty_base_block() @@ -970,12 +991,13 @@ mod tests { SupportedProtocol::BlocksByRangeV2, RPCCodedResponse::Success(RPCResponse::BlocksByRange(Arc::new(altair_block()))), ForkName::Altair, + &chain_spec, ), Ok(Some(RPCResponse::BlocksByRange(Arc::new(altair_block())))) ); - let merge_block_small = merge_block_small(&fork_context(ForkName::Merge)); - let merge_block_large = merge_block_large(&fork_context(ForkName::Merge)); + let merge_block_small = merge_block_small(&fork_context(ForkName::Merge), &chain_spec); + let merge_block_large = merge_block_large(&fork_context(ForkName::Merge), &chain_spec); assert_eq!( encode_then_decode_response( @@ -984,6 +1006,7 @@ mod tests { merge_block_small.clone() ))), ForkName::Merge, + &chain_spec, ), Ok(Some(RPCResponse::BlocksByRange(Arc::new( merge_block_small.clone() @@ -1000,6 +1023,7 @@ mod tests { SupportedProtocol::BlocksByRangeV2, &mut encoded, ForkName::Merge, + &chain_spec, ) .unwrap_err(), RPCError::InvalidData(_) @@ -1012,6 +1036,7 @@ mod tests { SupportedProtocol::BlocksByRootV2, RPCCodedResponse::Success(RPCResponse::BlocksByRoot(Arc::new(empty_base_block()))), ForkName::Base, + &chain_spec, ), Ok(Some(RPCResponse::BlocksByRoot( Arc::new(empty_base_block()) @@ -1026,6 +1051,7 @@ mod tests { SupportedProtocol::BlocksByRootV2, RPCCodedResponse::Success(RPCResponse::BlocksByRoot(Arc::new(empty_base_block()))), ForkName::Altair, + &chain_spec, ), Ok(Some(RPCResponse::BlocksByRoot( Arc::new(empty_base_block()) @@ -1037,6 +1063,7 @@ mod tests { SupportedProtocol::BlocksByRootV2, RPCCodedResponse::Success(RPCResponse::BlocksByRoot(Arc::new(altair_block()))), ForkName::Altair, + &chain_spec, ), Ok(Some(RPCResponse::BlocksByRoot(Arc::new(altair_block())))) ); @@ -1048,6 +1075,7 @@ mod tests { merge_block_small.clone() ))), ForkName::Merge, + &chain_spec, ), Ok(Some(RPCResponse::BlocksByRoot(Arc::new(merge_block_small)))) ); @@ -1062,6 +1090,7 @@ mod tests { SupportedProtocol::BlocksByRootV2, &mut encoded, ForkName::Merge, + &chain_spec, ) .unwrap_err(), RPCError::InvalidData(_) @@ -1075,6 +1104,7 @@ mod tests { SupportedProtocol::MetaDataV2, RPCCodedResponse::Success(RPCResponse::MetaData(metadata())), ForkName::Base, + &chain_spec, ), Ok(Some(RPCResponse::MetaData(metadata_v2()))) ); @@ -1084,6 +1114,7 @@ mod tests { SupportedProtocol::MetaDataV2, RPCCodedResponse::Success(RPCResponse::MetaData(metadata_v2())), ForkName::Altair, + &chain_spec, ), Ok(Some(RPCResponse::MetaData(metadata_v2()))) ); @@ -1094,11 +1125,14 @@ mod tests { fn test_context_bytes_v2() { let fork_context = fork_context(ForkName::Altair); + let chain_spec = Spec::default_spec(); + // Removing context bytes for v2 messages should error let mut encoded_bytes = encode_response( SupportedProtocol::BlocksByRangeV2, RPCCodedResponse::Success(RPCResponse::BlocksByRange(Arc::new(empty_base_block()))), ForkName::Base, + &chain_spec, ) .unwrap(); @@ -1108,7 +1142,8 @@ mod tests { decode_response( SupportedProtocol::BlocksByRangeV2, &mut encoded_bytes, - ForkName::Base + ForkName::Base, + &chain_spec, ) .unwrap_err(), RPCError::ErrorResponse(RPCResponseErrorCode::InvalidRequest, _), @@ -1118,6 +1153,7 @@ mod tests { SupportedProtocol::BlocksByRootV2, RPCCodedResponse::Success(RPCResponse::BlocksByRoot(Arc::new(empty_base_block()))), ForkName::Base, + &chain_spec, ) .unwrap(); @@ -1127,7 +1163,8 @@ mod tests { decode_response( SupportedProtocol::BlocksByRangeV2, &mut encoded_bytes, - ForkName::Base + ForkName::Base, + &chain_spec, ) .unwrap_err(), RPCError::ErrorResponse(RPCResponseErrorCode::InvalidRequest, _), @@ -1138,6 +1175,7 @@ mod tests { SupportedProtocol::BlocksByRangeV2, RPCCodedResponse::Success(RPCResponse::BlocksByRange(Arc::new(empty_base_block()))), ForkName::Altair, + &chain_spec, ) .unwrap(); @@ -1150,7 +1188,8 @@ mod tests { decode_response( SupportedProtocol::BlocksByRangeV2, &mut wrong_fork_bytes, - ForkName::Altair + ForkName::Altair, + &chain_spec, ) .unwrap_err(), RPCError::SSZDecodeError(_), @@ -1161,6 +1200,7 @@ mod tests { SupportedProtocol::BlocksByRootV2, RPCCodedResponse::Success(RPCResponse::BlocksByRoot(Arc::new(altair_block()))), ForkName::Altair, + &chain_spec, ) .unwrap(); @@ -1172,7 +1212,8 @@ mod tests { decode_response( SupportedProtocol::BlocksByRangeV2, &mut wrong_fork_bytes, - ForkName::Altair + ForkName::Altair, + &chain_spec, ) .unwrap_err(), RPCError::SSZDecodeError(_), @@ -1186,6 +1227,7 @@ mod tests { SupportedProtocol::MetaDataV2, RPCCodedResponse::Success(RPCResponse::MetaData(metadata())), ForkName::Altair, + &chain_spec, ) .unwrap(), ); @@ -1193,7 +1235,8 @@ mod tests { assert!(decode_response( SupportedProtocol::MetaDataV2, &mut encoded_bytes, - ForkName::Altair + ForkName::Altair, + &chain_spec, ) .is_err()); @@ -1202,6 +1245,7 @@ mod tests { SupportedProtocol::BlocksByRootV2, RPCCodedResponse::Success(RPCResponse::BlocksByRoot(Arc::new(empty_base_block()))), ForkName::Altair, + &chain_spec, ) .unwrap(); @@ -1213,7 +1257,8 @@ mod tests { decode_response( SupportedProtocol::BlocksByRangeV2, &mut wrong_fork_bytes, - ForkName::Altair + ForkName::Altair, + &chain_spec, ) .unwrap_err(), RPCError::ErrorResponse(RPCResponseErrorCode::InvalidRequest, _), @@ -1224,6 +1269,7 @@ mod tests { SupportedProtocol::BlocksByRootV2, RPCCodedResponse::Success(RPCResponse::BlocksByRoot(Arc::new(empty_base_block()))), ForkName::Altair, + &chain_spec, ) .unwrap(); @@ -1233,7 +1279,8 @@ mod tests { decode_response( SupportedProtocol::BlocksByRangeV2, &mut part, - ForkName::Altair + ForkName::Altair, + &chain_spec, ), Ok(None) ) @@ -1252,9 +1299,12 @@ mod tests { OutboundRequest::MetaData(MetadataRequest::new_v1()), OutboundRequest::MetaData(MetadataRequest::new_v2()), ]; + + let chain_spec = Spec::default_spec(); + for req in requests.iter() { for fork_name in ForkName::list_all() { - encode_then_decode_request(req.clone(), fork_name); + encode_then_decode_request(req.clone(), fork_name, &chain_spec); } } } @@ -1308,9 +1358,16 @@ mod tests { assert_eq!(writer.get_ref().len(), 42); dst.extend_from_slice(writer.get_ref()); + let chain_spec = Spec::default_spec(); // 10 (for stream identifier) + 80 + 42 = 132 > `max_compressed_len`. Hence, decoding should fail with `InvalidData`. assert!(matches!( - decode_response(SupportedProtocol::StatusV1, &mut dst, ForkName::Base).unwrap_err(), + decode_response( + SupportedProtocol::StatusV1, + &mut dst, + ForkName::Base, + &chain_spec + ) + .unwrap_err(), RPCError::InvalidData(_) )); } @@ -1365,12 +1422,15 @@ mod tests { assert_eq!(writer.get_ref().len(), 8103); dst.extend_from_slice(writer.get_ref()); + let chain_spec = Spec::default_spec(); + // 10 (for stream identifier) + 176156 + 8103 = 184269 > `max_compressed_len`. Hence, decoding should fail with `InvalidData`. assert!(matches!( decode_response( SupportedProtocol::BlocksByRangeV2, &mut dst, - ForkName::Altair + ForkName::Altair, + &chain_spec, ) .unwrap_err(), RPCError::InvalidData(_) @@ -1398,8 +1458,12 @@ mod tests { let mut uvi_codec: Uvi = Uvi::default(); let mut dst = BytesMut::with_capacity(1024); + let chain_spec = Spec::default_spec(); + // Insert length-prefix - uvi_codec.encode(MAX_RPC_SIZE + 1, &mut dst).unwrap(); + uvi_codec + .encode(chain_spec.max_chunk_size as usize + 1, &mut dst) + .unwrap(); // Insert snappy stream identifier dst.extend_from_slice(stream_identifier); @@ -1411,7 +1475,13 @@ mod tests { dst.extend_from_slice(writer.get_ref()); assert!(matches!( - decode_response(SupportedProtocol::StatusV1, &mut dst, ForkName::Base).unwrap_err(), + decode_response( + SupportedProtocol::StatusV1, + &mut dst, + ForkName::Base, + &chain_spec + ) + .unwrap_err(), RPCError::InvalidData(_) )); } diff --git a/beacon_node/lighthouse_network/src/rpc/handler.rs b/beacon_node/lighthouse_network/src/rpc/handler.rs index d42248ad5f6..36a5abc0863 100644 --- a/beacon_node/lighthouse_network/src/rpc/handler.rs +++ b/beacon_node/lighthouse_network/src/rpc/handler.rs @@ -3,9 +3,7 @@ use super::methods::{GoodbyeReason, RPCCodedResponse, RPCResponseErrorCode, ResponseTermination}; use super::outbound::OutboundRequestContainer; -use super::protocol::{ - max_rpc_size, InboundOutput, InboundRequest, Protocol, RPCError, RPCProtocol, -}; +use super::protocol::{InboundOutput, InboundRequest, Protocol, RPCError, RPCProtocol}; use super::{RPCReceived, RPCSend, ReqId}; use crate::rpc::outbound::{OutboundFramed, OutboundRequest}; use crate::rpc::protocol::InboundFramed; @@ -31,9 +29,6 @@ use tokio::time::{sleep_until, Instant as TInstant, Sleep}; use tokio_util::time::{delay_queue, DelayQueue}; use types::{EthSpec, ForkContext}; -/// The time (in seconds) before a substream that is awaiting a response from the user times out. -pub const RESPONSE_TIMEOUT: u64 = 10; - /// The number of times to retry an outbound upgrade in the case of IO errors. const IO_ERROR_RETRIES: u8 = 3; @@ -131,6 +126,9 @@ where /// Logger for handling RPC streams log: slog::Logger, + + /// Timeout that will me used for inbound and outbound responses. + resp_timeout: Duration, } enum HandlerState { @@ -212,7 +210,8 @@ where pub fn new( listen_protocol: SubstreamProtocol, ()>, fork_context: Arc, - log: slog::Logger, + log: &slog::Logger, + resp_timeout: Duration, ) -> Self { RPCHandler { listen_protocol, @@ -230,7 +229,8 @@ where outbound_io_error_retries: 0, fork_context, waker: None, - log, + log: log.clone(), + resp_timeout, } } @@ -554,7 +554,7 @@ where // Each chunk is allowed RESPONSE_TIMEOUT to be sent. if let Some(ref delay_key) = info.delay_key { self.inbound_substreams_delay - .reset(delay_key, Duration::from_secs(RESPONSE_TIMEOUT)); + .reset(delay_key, self.resp_timeout); } // The stream may be currently idle. Attempt to process more @@ -688,7 +688,7 @@ where }; substream_entry.remaining_chunks = Some(remaining_chunks); self.outbound_substreams_delay - .reset(delay_key, Duration::from_secs(RESPONSE_TIMEOUT)); + .reset(delay_key, self.resp_timeout); } } else { // either this is a single response request or this response closes the @@ -811,7 +811,7 @@ where OutboundRequestContainer { req: req.clone(), fork_context: self.fork_context.clone(), - max_rpc_size: max_rpc_size(&self.fork_context), + max_rpc_size: self.listen_protocol().upgrade().max_rpc_size, }, (), ) @@ -896,10 +896,9 @@ where if expected_responses > 0 { if self.inbound_substreams.len() < MAX_INBOUND_SUBSTREAMS { // Store the stream and tag the output. - let delay_key = self.inbound_substreams_delay.insert( - self.current_inbound_substream_id, - Duration::from_secs(RESPONSE_TIMEOUT), - ); + let delay_key = self + .inbound_substreams_delay + .insert(self.current_inbound_substream_id, self.resp_timeout); let awaiting_stream = InboundState::Idle(substream); self.inbound_substreams.insert( self.current_inbound_substream_id, @@ -961,10 +960,9 @@ where let expected_responses = request.expected_responses(); if expected_responses > 0 { // new outbound request. Store the stream and tag the output. - let delay_key = self.outbound_substreams_delay.insert( - self.current_outbound_substream_id, - Duration::from_secs(RESPONSE_TIMEOUT), - ); + let delay_key = self + .outbound_substreams_delay + .insert(self.current_outbound_substream_id, self.resp_timeout); let awaiting_stream = OutboundSubstreamState::RequestPendingResponse { substream: Box::new(substream), request, diff --git a/beacon_node/lighthouse_network/src/rpc/mod.rs b/beacon_node/lighthouse_network/src/rpc/mod.rs index 4fd9b516d4c..14f77e4ba23 100644 --- a/beacon_node/lighthouse_network/src/rpc/mod.rs +++ b/beacon_node/lighthouse_network/src/rpc/mod.rs @@ -17,6 +17,7 @@ use slog::{crit, debug, o}; use std::marker::PhantomData; use std::sync::Arc; use std::task::{Context, Poll}; +use std::time::Duration; use types::{EthSpec, ForkContext}; pub(crate) use handler::HandlerErr; @@ -107,6 +108,12 @@ pub struct RPCMessage { type BehaviourAction = ToSwarm, RPCSend>; +pub struct NetworkParams { + pub max_chunk_size: usize, + pub ttfb_timeout: Duration, + pub resp_timeout: Duration, +} + /// Implements the libp2p `NetworkBehaviour` trait and therefore manages network-level /// logic. pub struct RPC { @@ -120,6 +127,8 @@ pub struct RPC { enable_light_client_server: bool, /// Slog logger for RPC behaviour. log: slog::Logger, + /// Networking constant values + network_params: NetworkParams, } impl RPC { @@ -129,6 +138,7 @@ impl RPC { inbound_rate_limiter_config: Option, outbound_rate_limiter_config: Option, log: slog::Logger, + network_params: NetworkParams, ) -> Self { let log = log.new(o!("service" => "libp2p_rpc")); @@ -149,6 +159,7 @@ impl RPC { fork_context, enable_light_client_server, log, + network_params, } } @@ -220,16 +231,22 @@ where let protocol = SubstreamProtocol::new( RPCProtocol { fork_context: self.fork_context.clone(), - max_rpc_size: max_rpc_size(&self.fork_context), + max_rpc_size: max_rpc_size(&self.fork_context, self.network_params.max_chunk_size), enable_light_client_server: self.enable_light_client_server, phantom: PhantomData, + ttfb_timeout: self.network_params.ttfb_timeout, }, (), ); // NOTE: this is needed because PeerIds have interior mutability. let peer_repr = peer_id.to_string(); let log = self.log.new(slog::o!("peer_id" => peer_repr)); - let handler = RPCHandler::new(protocol, self.fork_context.clone(), log); + let handler = RPCHandler::new( + protocol, + self.fork_context.clone(), + &log, + self.network_params.resp_timeout, + ); Ok(handler) } @@ -244,9 +261,10 @@ where let protocol = SubstreamProtocol::new( RPCProtocol { fork_context: self.fork_context.clone(), - max_rpc_size: max_rpc_size(&self.fork_context), + max_rpc_size: max_rpc_size(&self.fork_context, self.network_params.max_chunk_size), enable_light_client_server: self.enable_light_client_server, phantom: PhantomData, + ttfb_timeout: self.network_params.ttfb_timeout, }, (), ); @@ -254,7 +272,12 @@ where // NOTE: this is needed because PeerIds have interior mutability. let peer_repr = peer_id.to_string(); let log = self.log.new(slog::o!("peer_id" => peer_repr)); - let handler = RPCHandler::new(protocol, self.fork_context.clone(), log); + let handler = RPCHandler::new( + protocol, + self.fork_context.clone(), + &log, + self.network_params.resp_timeout, + ); Ok(handler) } diff --git a/beacon_node/lighthouse_network/src/rpc/protocol.rs b/beacon_node/lighthouse_network/src/rpc/protocol.rs index 22f9f19d680..f2a39470b94 100644 --- a/beacon_node/lighthouse_network/src/rpc/protocol.rs +++ b/beacon_node/lighthouse_network/src/rpc/protocol.rs @@ -72,7 +72,7 @@ lazy_static! { /// The `BeaconBlockMerge` block has an `ExecutionPayload` field which has a max size ~16 GiB for future proofing. /// We calculate the value from its fields instead of constructing the block and checking the length. /// Note: This is only the theoretical upper bound. We further bound the max size we receive over the network - /// with `MAX_RPC_SIZE_POST_MERGE`. + /// with `max_chunk_size`. pub static ref SIGNED_BEACON_BLOCK_MERGE_MAX: usize = // Size of a full altair block *SIGNED_BEACON_BLOCK_ALTAIR_MAX @@ -109,25 +109,18 @@ lazy_static! { .len(); } -/// The maximum bytes that can be sent across the RPC pre-merge. -pub(crate) const MAX_RPC_SIZE: usize = 1_048_576; // 1M -/// The maximum bytes that can be sent across the RPC post-merge. -pub(crate) const MAX_RPC_SIZE_POST_MERGE: usize = 10 * 1_048_576; // 10M -pub(crate) const MAX_RPC_SIZE_POST_CAPELLA: usize = 10 * 1_048_576; // 10M /// The protocol prefix the RPC protocol id. const PROTOCOL_PREFIX: &str = "/eth2/beacon_chain/req"; -/// Time allowed for the first byte of a request to arrive before we time out (Time To First Byte). -const TTFB_TIMEOUT: u64 = 5; /// The number of seconds to wait for the first bytes of a request once a protocol has been /// established before the stream is terminated. const REQUEST_TIMEOUT: u64 = 15; /// Returns the maximum bytes that can be sent across the RPC. -pub fn max_rpc_size(fork_context: &ForkContext) -> usize { +pub fn max_rpc_size(fork_context: &ForkContext, max_chunk_size: usize) -> usize { match fork_context.current_fork() { - ForkName::Altair | ForkName::Base => MAX_RPC_SIZE, - ForkName::Merge => MAX_RPC_SIZE_POST_MERGE, - ForkName::Capella => MAX_RPC_SIZE_POST_CAPELLA, + ForkName::Altair | ForkName::Base => max_chunk_size / 10, + ForkName::Merge => max_chunk_size, + ForkName::Capella => max_chunk_size, } } @@ -262,6 +255,7 @@ pub struct RPCProtocol { pub max_rpc_size: usize, pub enable_light_client_server: bool, pub phantom: PhantomData, + pub ttfb_timeout: Duration, } impl UpgradeInfo for RPCProtocol { @@ -447,7 +441,7 @@ where } }; let mut timed_socket = TimeoutStream::new(socket); - timed_socket.set_read_timeout(Some(Duration::from_secs(TTFB_TIMEOUT))); + timed_socket.set_read_timeout(Some(self.ttfb_timeout)); let socket = Framed::new(Box::pin(timed_socket), codec); diff --git a/beacon_node/lighthouse_network/src/service/mod.rs b/beacon_node/lighthouse_network/src/service/mod.rs index 1a25beee0a7..63e5bcbff66 100644 --- a/beacon_node/lighthouse_network/src/service/mod.rs +++ b/beacon_node/lighthouse_network/src/service/mod.rs @@ -1,6 +1,6 @@ use self::behaviour::Behaviour; use self::gossip_cache::GossipCache; -use crate::config::{gossipsub_config, NetworkLoad}; +use crate::config::{gossipsub_config, GossipsubConfigParams, NetworkLoad}; use crate::discovery::{ subnet_predicate, DiscoveredPeers, Discovery, FIND_NODE_QUERY_CLOSEST_PEERS, }; @@ -232,7 +232,15 @@ impl Network { max_subscriptions_per_request: 150, // 148 in theory = (64 attestation + 4 sync committee + 6 core topics) * 2 }; - config.gs_config = gossipsub_config(config.network_load, ctx.fork_context.clone()); + let gossipsub_config_params = GossipsubConfigParams { + message_domain_valid_snappy: ctx.chain_spec.message_domain_valid_snappy, + gossip_max_size: ctx.chain_spec.gossip_max_size as usize, + }; + config.gs_config = gossipsub_config( + config.network_load, + ctx.fork_context.clone(), + gossipsub_config_params, + ); // If metrics are enabled for gossipsub build the configuration let gossipsub_metrics = ctx @@ -256,12 +264,18 @@ impl Network { (gossipsub, update_gossipsub_scores) }; + let network_params = NetworkParams { + max_chunk_size: ctx.chain_spec.max_chunk_size as usize, + ttfb_timeout: ctx.chain_spec.ttfb_timeout(), + resp_timeout: ctx.chain_spec.resp_timeout(), + }; let eth2_rpc = RPC::new( ctx.fork_context.clone(), config.enable_light_client_server, config.inbound_rate_limiter_config.clone(), config.outbound_rate_limiter_config.clone(), log.clone(), + network_params, ); let discovery = { diff --git a/beacon_node/lighthouse_network/tests/common.rs b/beacon_node/lighthouse_network/tests/common.rs index b48891335cc..36a2e523855 100644 --- a/beacon_node/lighthouse_network/tests/common.rs +++ b/beacon_node/lighthouse_network/tests/common.rs @@ -94,6 +94,7 @@ pub async fn build_libp2p_instance( boot_nodes: Vec, log: slog::Logger, fork_name: ForkName, + spec: &ChainSpec, ) -> Libp2pInstance { let port = unused_tcp4_port().unwrap(); let config = build_config(port, boot_nodes); @@ -106,7 +107,7 @@ pub async fn build_libp2p_instance( config: &config, enr_fork_id: EnrForkId::default(), fork_context: Arc::new(fork_context(fork_name)), - chain_spec: &ChainSpec::minimal(), + chain_spec: spec, gossipsub_registry: None, }; Libp2pInstance( @@ -130,12 +131,13 @@ pub async fn build_node_pair( rt: Weak, log: &slog::Logger, fork_name: ForkName, + spec: &ChainSpec, ) -> (Libp2pInstance, Libp2pInstance) { let sender_log = log.new(o!("who" => "sender")); let receiver_log = log.new(o!("who" => "receiver")); - let mut sender = build_libp2p_instance(rt.clone(), vec![], sender_log, fork_name).await; - let mut receiver = build_libp2p_instance(rt, vec![], receiver_log, fork_name).await; + let mut sender = build_libp2p_instance(rt.clone(), vec![], sender_log, fork_name, spec).await; + let mut receiver = build_libp2p_instance(rt, vec![], receiver_log, fork_name, spec).await; let receiver_multiaddr = receiver.local_enr().multiaddr()[1].clone(); @@ -180,10 +182,11 @@ pub async fn build_linear( log: slog::Logger, n: usize, fork_name: ForkName, + spec: &ChainSpec, ) -> Vec { let mut nodes = Vec::with_capacity(n); for _ in 0..n { - nodes.push(build_libp2p_instance(rt.clone(), vec![], log.clone(), fork_name).await); + nodes.push(build_libp2p_instance(rt.clone(), vec![], log.clone(), fork_name, spec).await); } let multiaddrs: Vec = nodes diff --git a/beacon_node/lighthouse_network/tests/rpc_tests.rs b/beacon_node/lighthouse_network/tests/rpc_tests.rs index 656df0c4a16..05fa5ab8542 100644 --- a/beacon_node/lighthouse_network/tests/rpc_tests.rs +++ b/beacon_node/lighthouse_network/tests/rpc_tests.rs @@ -9,8 +9,9 @@ use std::time::Duration; use tokio::runtime::Runtime; use tokio::time::sleep; use types::{ - BeaconBlock, BeaconBlockAltair, BeaconBlockBase, BeaconBlockMerge, EmptyBlock, Epoch, EthSpec, - ForkContext, ForkName, Hash256, MinimalEthSpec, Signature, SignedBeaconBlock, Slot, + BeaconBlock, BeaconBlockAltair, BeaconBlockBase, BeaconBlockMerge, ChainSpec, EmptyBlock, + Epoch, EthSpec, ForkContext, ForkName, Hash256, MinimalEthSpec, Signature, SignedBeaconBlock, + Slot, }; mod common; @@ -18,30 +19,30 @@ mod common; type E = MinimalEthSpec; /// Merge block with length < max_rpc_size. -fn merge_block_small(fork_context: &ForkContext) -> BeaconBlock { - let mut block = BeaconBlockMerge::::empty(&E::default_spec()); +fn merge_block_small(fork_context: &ForkContext, spec: &ChainSpec) -> BeaconBlock { + let mut block = BeaconBlockMerge::::empty(spec); let tx = VariableList::from(vec![0; 1024]); let txs = VariableList::from(std::iter::repeat(tx).take(5000).collect::>()); block.body.execution_payload.execution_payload.transactions = txs; let block = BeaconBlock::Merge(block); - assert!(block.ssz_bytes_len() <= max_rpc_size(fork_context)); + assert!(block.ssz_bytes_len() <= max_rpc_size(fork_context, spec.max_chunk_size as usize)); block } /// Merge block with length > MAX_RPC_SIZE. /// The max limit for a merge block is in the order of ~16GiB which wouldn't fit in memory. /// Hence, we generate a merge block just greater than `MAX_RPC_SIZE` to test rejection on the rpc layer. -fn merge_block_large(fork_context: &ForkContext) -> BeaconBlock { - let mut block = BeaconBlockMerge::::empty(&E::default_spec()); +fn merge_block_large(fork_context: &ForkContext, spec: &ChainSpec) -> BeaconBlock { + let mut block = BeaconBlockMerge::::empty(spec); let tx = VariableList::from(vec![0; 1024]); let txs = VariableList::from(std::iter::repeat(tx).take(100000).collect::>()); block.body.execution_payload.execution_payload.transactions = txs; let block = BeaconBlock::Merge(block); - assert!(block.ssz_bytes_len() > max_rpc_size(fork_context)); + assert!(block.ssz_bytes_len() > max_rpc_size(fork_context, spec.max_chunk_size as usize)); block } @@ -57,10 +58,12 @@ fn test_status_rpc() { let log = common::build_log(log_level, enable_logging); + let spec = E::default_spec(); + rt.block_on(async { // get sender/receiver let (mut sender, mut receiver) = - common::build_node_pair(Arc::downgrade(&rt), &log, ForkName::Base).await; + common::build_node_pair(Arc::downgrade(&rt), &log, ForkName::Base, &spec).await; // Dummy STATUS RPC message let rpc_request = Request::Status(StatusMessage { @@ -149,10 +152,12 @@ fn test_blocks_by_range_chunked_rpc() { let rt = Arc::new(Runtime::new().unwrap()); + let spec = E::default_spec(); + rt.block_on(async { // get sender/receiver let (mut sender, mut receiver) = - common::build_node_pair(Arc::downgrade(&rt), &log, ForkName::Merge).await; + common::build_node_pair(Arc::downgrade(&rt), &log, ForkName::Merge, &spec).await; // BlocksByRange Request let rpc_request = Request::BlocksByRange(BlocksByRangeRequest::new(0, messages_to_send)); @@ -168,7 +173,7 @@ fn test_blocks_by_range_chunked_rpc() { let signed_full_block = SignedBeaconBlock::from_block(full_block, Signature::empty()); let rpc_response_altair = Response::BlocksByRange(Some(Arc::new(signed_full_block))); - let full_block = merge_block_small(&common::fork_context(ForkName::Merge)); + let full_block = merge_block_small(&common::fork_context(ForkName::Merge), &spec); let signed_full_block = SignedBeaconBlock::from_block(full_block, Signature::empty()); let rpc_response_merge_small = Response::BlocksByRange(Some(Arc::new(signed_full_block))); @@ -273,16 +278,18 @@ fn test_blocks_by_range_over_limit() { let rt = Arc::new(Runtime::new().unwrap()); + let spec = E::default_spec(); + rt.block_on(async { // get sender/receiver let (mut sender, mut receiver) = - common::build_node_pair(Arc::downgrade(&rt), &log, ForkName::Merge).await; + common::build_node_pair(Arc::downgrade(&rt), &log, ForkName::Merge, &spec).await; // BlocksByRange Request let rpc_request = Request::BlocksByRange(BlocksByRangeRequest::new(0, messages_to_send)); // BlocksByRange Response - let full_block = merge_block_large(&common::fork_context(ForkName::Merge)); + let full_block = merge_block_large(&common::fork_context(ForkName::Merge), &spec); let signed_full_block = SignedBeaconBlock::from_block(full_block, Signature::empty()); let rpc_response_merge_large = Response::BlocksByRange(Some(Arc::new(signed_full_block))); @@ -355,10 +362,12 @@ fn test_blocks_by_range_chunked_rpc_terminates_correctly() { let rt = Arc::new(Runtime::new().unwrap()); + let spec = E::default_spec(); + rt.block_on(async { // get sender/receiver let (mut sender, mut receiver) = - common::build_node_pair(Arc::downgrade(&rt), &log, ForkName::Base).await; + common::build_node_pair(Arc::downgrade(&rt), &log, ForkName::Base, &spec).await; // BlocksByRange Request let rpc_request = Request::BlocksByRange(BlocksByRangeRequest::new(0, messages_to_send)); @@ -475,10 +484,12 @@ fn test_blocks_by_range_single_empty_rpc() { let log = common::build_log(log_level, enable_logging); let rt = Arc::new(Runtime::new().unwrap()); + let spec = E::default_spec(); + rt.block_on(async { // get sender/receiver let (mut sender, mut receiver) = - common::build_node_pair(Arc::downgrade(&rt), &log, ForkName::Base).await; + common::build_node_pair(Arc::downgrade(&rt), &log, ForkName::Base, &spec).await; // BlocksByRange Request let rpc_request = Request::BlocksByRange(BlocksByRangeRequest::new(0, 10)); @@ -579,7 +590,7 @@ fn test_blocks_by_root_chunked_rpc() { // get sender/receiver rt.block_on(async { let (mut sender, mut receiver) = - common::build_node_pair(Arc::downgrade(&rt), &log, ForkName::Merge).await; + common::build_node_pair(Arc::downgrade(&rt), &log, ForkName::Merge, &spec).await; // BlocksByRoot Request let rpc_request = @@ -601,7 +612,7 @@ fn test_blocks_by_root_chunked_rpc() { let signed_full_block = SignedBeaconBlock::from_block(full_block, Signature::empty()); let rpc_response_altair = Response::BlocksByRoot(Some(Arc::new(signed_full_block))); - let full_block = merge_block_small(&common::fork_context(ForkName::Merge)); + let full_block = merge_block_small(&common::fork_context(ForkName::Merge), &spec); let signed_full_block = SignedBeaconBlock::from_block(full_block, Signature::empty()); let rpc_response_merge_small = Response::BlocksByRoot(Some(Arc::new(signed_full_block))); @@ -706,7 +717,7 @@ fn test_blocks_by_root_chunked_rpc_terminates_correctly() { // get sender/receiver rt.block_on(async { let (mut sender, mut receiver) = - common::build_node_pair(Arc::downgrade(&rt), &log, ForkName::Base).await; + common::build_node_pair(Arc::downgrade(&rt), &log, ForkName::Base, &spec).await; // BlocksByRoot Request let rpc_request = @@ -833,10 +844,13 @@ fn test_goodbye_rpc() { let log = common::build_log(log_level, enable_logging); let rt = Arc::new(Runtime::new().unwrap()); + + let spec = E::default_spec(); + // get sender/receiver rt.block_on(async { let (mut sender, mut receiver) = - common::build_node_pair(Arc::downgrade(&rt), &log, ForkName::Base).await; + common::build_node_pair(Arc::downgrade(&rt), &log, ForkName::Base, &spec).await; // build the sender future let sender_future = async { diff --git a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs index cde4da9ffcc..cb4d6f9c2bd 100644 --- a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs @@ -1634,6 +1634,7 @@ impl NetworkBeaconProcessor { attestation_verification::verify_propagation_slot_range( seen_clock, failed_att.attestation(), + &self.chain.spec, ); // Only penalize the peer if it would have been invalid at the moment we received @@ -2182,6 +2183,7 @@ impl NetworkBeaconProcessor { sync_committee_verification::verify_propagation_slot_range( seen_clock, &sync_committee_message_slot, + &self.chain.spec, ); hindsight_verification.is_err() }; @@ -2494,6 +2496,7 @@ impl NetworkBeaconProcessor { let is_timely = attestation_verification::verify_propagation_slot_range( &self.chain.slot_clock, attestation, + &self.chain.spec, ) .is_ok(); diff --git a/beacon_node/network/src/network_beacon_processor/mod.rs b/beacon_node/network/src/network_beacon_processor/mod.rs index 7f0ef1fb817..db83bfc1645 100644 --- a/beacon_node/network/src/network_beacon_processor/mod.rs +++ b/beacon_node/network/src/network_beacon_processor/mod.rs @@ -577,14 +577,3 @@ impl NetworkBeaconProcessor> { (network_beacon_processor, beacon_processor_receive) } } - -#[cfg(test)] -mod test { - #[test] - fn queued_block_delay_is_sane() { - assert!( - beacon_processor::work_reprocessing_queue::ADDITIONAL_QUEUED_BLOCK_DELAY - < beacon_chain::MAXIMUM_GOSSIP_CLOCK_DISPARITY - ); - } -} diff --git a/beacon_node/network/src/network_beacon_processor/tests.rs b/beacon_node/network/src/network_beacon_processor/tests.rs index b8d5db568ec..dbe93de1ea9 100644 --- a/beacon_node/network/src/network_beacon_processor/tests.rs +++ b/beacon_node/network/src/network_beacon_processor/tests.rs @@ -11,7 +11,7 @@ use crate::{ use beacon_chain::test_utils::{ AttestationStrategy, BeaconChainHarness, BlockStrategy, EphemeralHarnessType, }; -use beacon_chain::{BeaconChain, ChainConfig, MAXIMUM_GOSSIP_CLOCK_DISPARITY}; +use beacon_chain::{BeaconChain, ChainConfig}; use beacon_processor::{work_reprocessing_queue::*, *}; use lighthouse_network::{ discv5::enr::{CombinedKey, EnrBuilder}, @@ -215,7 +215,7 @@ impl TestRig { }; let network_beacon_processor = Arc::new(network_beacon_processor); - BeaconProcessor { + let beacon_processor = BeaconProcessor { network_globals, executor, max_workers: cmp::max(1, num_cpus::get()), @@ -229,8 +229,11 @@ impl TestRig { work_reprocessing_rx, Some(work_journal_tx), harness.chain.slot_clock.clone(), + chain.spec.maximum_gossip_clock_disparity(), ); + assert!(!beacon_processor.is_err()); + Self { chain, next_block: Arc::new(next_block), @@ -505,7 +508,7 @@ async fn import_gossip_block_acceptably_early() { rig.chain .slot_clock - .set_current_time(slot_start - MAXIMUM_GOSSIP_CLOCK_DISPARITY); + .set_current_time(slot_start - rig.chain.spec.maximum_gossip_clock_disparity()); assert_eq!( rig.chain.slot().unwrap(), @@ -552,9 +555,9 @@ async fn import_gossip_block_unacceptably_early() { .start_of(rig.next_block.slot()) .unwrap(); - rig.chain - .slot_clock - .set_current_time(slot_start - MAXIMUM_GOSSIP_CLOCK_DISPARITY - Duration::from_millis(1)); + rig.chain.slot_clock.set_current_time( + slot_start - rig.chain.spec.maximum_gossip_clock_disparity() - Duration::from_millis(1), + ); assert_eq!( rig.chain.slot().unwrap(), diff --git a/beacon_node/src/config.rs b/beacon_node/src/config.rs index 4abf649bfbb..c16b1675a9f 100644 --- a/beacon_node/src/config.rs +++ b/beacon_node/src/config.rs @@ -581,8 +581,10 @@ pub fn get_config( }; } - client_config.chain.max_network_size = - lighthouse_network::gossip_max_size(spec.bellatrix_fork_epoch.is_some()); + client_config.chain.max_network_size = lighthouse_network::gossip_max_size( + spec.bellatrix_fork_epoch.is_some(), + spec.gossip_max_size as usize, + ); if cli_args.is_present("slasher") { let slasher_dir = if let Some(slasher_dir) = cli_args.value_of("slasher-dir") { diff --git a/common/eth2_network_config/built_in_network_configs/gnosis/config.yaml b/common/eth2_network_config/built_in_network_configs/gnosis/config.yaml index 0fdc159ec2b..8e7a9dd07a4 100644 --- a/common/eth2_network_config/built_in_network_configs/gnosis/config.yaml +++ b/common/eth2_network_config/built_in_network_configs/gnosis/config.yaml @@ -89,4 +89,14 @@ DEPOSIT_CONTRACT_ADDRESS: 0x0B98057eA310F4d31F2a452B414647007d1645d9 # Network # --------------------------------------------------------------- -SUBNETS_PER_NODE: 4 \ No newline at end of file +SUBNETS_PER_NODE: 4 +GOSSIP_MAX_SIZE: 10485760 +MIN_EPOCHS_FOR_BLOCK_REQUESTS: 33024 +MAX_CHUNK_SIZE: 10485760 +TTFB_TIMEOUT: 5 +RESP_TIMEOUT: 10 +MESSAGE_DOMAIN_INVALID_SNAPPY: 0x00000000 +MESSAGE_DOMAIN_VALID_SNAPPY: 0x01000000 +ATTESTATION_SUBNET_COUNT: 64 +ATTESTATION_SUBNET_EXTRA_BITS: 0 +ATTESTATION_SUBNET_PREFIX_BITS: 6 diff --git a/common/eth2_network_config/built_in_network_configs/mainnet/config.yaml b/common/eth2_network_config/built_in_network_configs/mainnet/config.yaml index 7b26b30a6ce..98984f3b7db 100644 --- a/common/eth2_network_config/built_in_network_configs/mainnet/config.yaml +++ b/common/eth2_network_config/built_in_network_configs/mainnet/config.yaml @@ -89,4 +89,14 @@ DEPOSIT_CONTRACT_ADDRESS: 0x00000000219ab540356cBB839Cbe05303d7705Fa # Network # --------------------------------------------------------------- -SUBNETS_PER_NODE: 2 \ No newline at end of file +SUBNETS_PER_NODE: 2 +GOSSIP_MAX_SIZE: 10485760 +MIN_EPOCHS_FOR_BLOCK_REQUESTS: 33024 +MAX_CHUNK_SIZE: 10485760 +TTFB_TIMEOUT: 5 +RESP_TIMEOUT: 10 +MESSAGE_DOMAIN_INVALID_SNAPPY: 0x00000000 +MESSAGE_DOMAIN_VALID_SNAPPY: 0x01000000 +ATTESTATION_SUBNET_COUNT: 64 +ATTESTATION_SUBNET_EXTRA_BITS: 0 +ATTESTATION_SUBNET_PREFIX_BITS: 6 diff --git a/common/eth2_network_config/built_in_network_configs/prater/config.yaml b/common/eth2_network_config/built_in_network_configs/prater/config.yaml index 63b3d45db9a..a0dd85fec07 100644 --- a/common/eth2_network_config/built_in_network_configs/prater/config.yaml +++ b/common/eth2_network_config/built_in_network_configs/prater/config.yaml @@ -89,4 +89,14 @@ DEPOSIT_CONTRACT_ADDRESS: 0xff50ed3d0ec03aC01D4C79aAd74928BFF48a7b2b # Network # --------------------------------------------------------------- -SUBNETS_PER_NODE: 2 \ No newline at end of file +SUBNETS_PER_NODE: 2 +GOSSIP_MAX_SIZE: 10485760 +MIN_EPOCHS_FOR_BLOCK_REQUESTS: 33024 +MAX_CHUNK_SIZE: 10485760 +TTFB_TIMEOUT: 5 +RESP_TIMEOUT: 10 +MESSAGE_DOMAIN_INVALID_SNAPPY: 0x00000000 +MESSAGE_DOMAIN_VALID_SNAPPY: 0x01000000 +ATTESTATION_SUBNET_COUNT: 64 +ATTESTATION_SUBNET_EXTRA_BITS: 0 +ATTESTATION_SUBNET_PREFIX_BITS: 6 diff --git a/common/eth2_network_config/built_in_network_configs/sepolia/config.yaml b/common/eth2_network_config/built_in_network_configs/sepolia/config.yaml index 8489f085f4c..e3674cf7df5 100644 --- a/common/eth2_network_config/built_in_network_configs/sepolia/config.yaml +++ b/common/eth2_network_config/built_in_network_configs/sepolia/config.yaml @@ -77,4 +77,14 @@ DEPOSIT_CONTRACT_ADDRESS: 0x7f02C3E3c98b133055B8B348B2Ac625669Ed295D # Network # --------------------------------------------------------------- -SUBNETS_PER_NODE: 2 \ No newline at end of file +SUBNETS_PER_NODE: 2 +GOSSIP_MAX_SIZE: 10485760 +MIN_EPOCHS_FOR_BLOCK_REQUESTS: 33024 +MAX_CHUNK_SIZE: 10485760 +TTFB_TIMEOUT: 5 +RESP_TIMEOUT: 10 +MESSAGE_DOMAIN_INVALID_SNAPPY: 0x00000000 +MESSAGE_DOMAIN_VALID_SNAPPY: 0x01000000 +ATTESTATION_SUBNET_COUNT: 64 +ATTESTATION_SUBNET_EXTRA_BITS: 0 +ATTESTATION_SUBNET_PREFIX_BITS: 6 diff --git a/consensus/types/Cargo.toml b/consensus/types/Cargo.toml index ba15f6d4885..f030f2e97a6 100644 --- a/consensus/types/Cargo.toml +++ b/consensus/types/Cargo.toml @@ -60,6 +60,7 @@ beacon_chain = { path = "../../beacon_node/beacon_chain" } eth2_interop_keypairs = { path = "../../common/eth2_interop_keypairs" } state_processing = { path = "../state_processing" } tokio = "1.14.0" +paste = "1.0.14" [features] default = ["sqlite", "legacy-arith"] diff --git a/consensus/types/src/chain_spec.rs b/consensus/types/src/chain_spec.rs index fbb6a3d8573..a13d3116d8b 100644 --- a/consensus/types/src/chain_spec.rs +++ b/consensus/types/src/chain_spec.rs @@ -6,6 +6,7 @@ use serde_derive::Deserialize; use serde_utils::quoted_u64::MaybeQuoted; use std::fs::File; use std::path::Path; +use std::time::Duration; use tree_hash::TreeHash; /// Each of the BLS signature domains. @@ -170,7 +171,15 @@ pub struct ChainSpec { pub attestation_subnet_count: u64, pub subnets_per_node: u8, pub epochs_per_subnet_subscription: u64, + pub gossip_max_size: u64, + pub min_epochs_for_block_requests: u64, + pub max_chunk_size: u64, + pub ttfb_timeout: u64, + pub resp_timeout: u64, + pub message_domain_invalid_snappy: [u8; 4], + pub message_domain_valid_snappy: [u8; 4], pub attestation_subnet_extra_bits: u8, + pub attestation_subnet_prefix_bits: u8, /* * Application params @@ -451,10 +460,16 @@ impl ChainSpec { Hash256::from(domain) } - #[allow(clippy::arithmetic_side_effects)] - pub const fn attestation_subnet_prefix_bits(&self) -> u32 { - let attestation_subnet_count_bits = self.attestation_subnet_count.ilog2(); - self.attestation_subnet_extra_bits as u32 + attestation_subnet_count_bits + pub fn maximum_gossip_clock_disparity(&self) -> Duration { + Duration::from_millis(self.maximum_gossip_clock_disparity_millis) + } + + pub fn ttfb_timeout(&self) -> Duration { + Duration::from_secs(self.ttfb_timeout) + } + + pub fn resp_timeout(&self) -> Duration { + Duration::from_secs(self.resp_timeout) } /// Returns a `ChainSpec` compatible with the Ethereum Foundation specification. @@ -617,8 +632,15 @@ impl ChainSpec { maximum_gossip_clock_disparity_millis: 500, target_aggregators_per_committee: 16, epochs_per_subnet_subscription: 256, - attestation_subnet_extra_bits: 0, - + gossip_max_size: default_gossip_max_size(), + min_epochs_for_block_requests: default_min_epochs_for_block_requests(), + max_chunk_size: default_max_chunk_size(), + ttfb_timeout: default_ttfb_timeout(), + resp_timeout: default_resp_timeout(), + message_domain_invalid_snappy: default_message_domain_invalid_snappy(), + message_domain_valid_snappy: default_message_domain_valid_snappy(), + attestation_subnet_extra_bits: default_attestation_subnet_extra_bits(), + attestation_subnet_prefix_bits: default_attestation_subnet_prefix_bits(), /* * Application specific */ @@ -842,7 +864,15 @@ impl ChainSpec { maximum_gossip_clock_disparity_millis: 500, target_aggregators_per_committee: 16, epochs_per_subnet_subscription: 256, - attestation_subnet_extra_bits: 0, + gossip_max_size: default_gossip_max_size(), + min_epochs_for_block_requests: default_min_epochs_for_block_requests(), + max_chunk_size: default_max_chunk_size(), + ttfb_timeout: default_ttfb_timeout(), + resp_timeout: default_resp_timeout(), + message_domain_invalid_snappy: default_message_domain_invalid_snappy(), + message_domain_valid_snappy: default_message_domain_valid_snappy(), + attestation_subnet_extra_bits: default_attestation_subnet_extra_bits(), + attestation_subnet_prefix_bits: default_attestation_subnet_prefix_bits(), /* * Application specific @@ -953,6 +983,34 @@ pub struct Config { #[serde(with = "serde_utils::quoted_u64")] deposit_network_id: u64, deposit_contract_address: Address, + + #[serde(default = "default_gossip_max_size")] + #[serde(with = "serde_utils::quoted_u64")] + gossip_max_size: u64, + #[serde(default = "default_min_epochs_for_block_requests")] + #[serde(with = "serde_utils::quoted_u64")] + min_epochs_for_block_requests: u64, + #[serde(default = "default_max_chunk_size")] + #[serde(with = "serde_utils::quoted_u64")] + max_chunk_size: u64, + #[serde(default = "default_ttfb_timeout")] + #[serde(with = "serde_utils::quoted_u64")] + ttfb_timeout: u64, + #[serde(default = "default_resp_timeout")] + #[serde(with = "serde_utils::quoted_u64")] + resp_timeout: u64, + #[serde(default = "default_message_domain_invalid_snappy")] + #[serde(with = "serde_utils::bytes_4_hex")] + message_domain_invalid_snappy: [u8; 4], + #[serde(default = "default_message_domain_valid_snappy")] + #[serde(with = "serde_utils::bytes_4_hex")] + message_domain_valid_snappy: [u8; 4], + #[serde(default = "default_attestation_subnet_extra_bits")] + #[serde(with = "serde_utils::quoted_u8")] + attestation_subnet_extra_bits: u8, + #[serde(default = "default_attestation_subnet_prefix_bits")] + #[serde(with = "serde_utils::quoted_u8")] + attestation_subnet_prefix_bits: u8, } fn default_bellatrix_fork_version() -> [u8; 4] { @@ -993,6 +1051,42 @@ fn default_subnets_per_node() -> u8 { 2u8 } +const fn default_gossip_max_size() -> u64 { + 10485760 +} + +const fn default_min_epochs_for_block_requests() -> u64 { + 33024 +} + +const fn default_max_chunk_size() -> u64 { + 10485760 +} + +const fn default_ttfb_timeout() -> u64 { + 5 +} + +const fn default_resp_timeout() -> u64 { + 10 +} + +const fn default_message_domain_invalid_snappy() -> [u8; 4] { + [0, 0, 0, 0] +} + +const fn default_message_domain_valid_snappy() -> [u8; 4] { + [1, 0, 0, 0] +} + +const fn default_attestation_subnet_extra_bits() -> u8 { + 0 +} + +const fn default_attestation_subnet_prefix_bits() -> u8 { + 6 +} + impl Default for Config { fn default() -> Self { let chain_spec = MainnetEthSpec::default_spec(); @@ -1088,6 +1182,16 @@ impl Config { deposit_chain_id: spec.deposit_chain_id, deposit_network_id: spec.deposit_network_id, deposit_contract_address: spec.deposit_contract_address, + + gossip_max_size: spec.gossip_max_size, + min_epochs_for_block_requests: spec.min_epochs_for_block_requests, + max_chunk_size: spec.max_chunk_size, + ttfb_timeout: spec.ttfb_timeout, + resp_timeout: spec.resp_timeout, + message_domain_invalid_snappy: spec.message_domain_invalid_snappy, + message_domain_valid_snappy: spec.message_domain_valid_snappy, + attestation_subnet_extra_bits: spec.attestation_subnet_extra_bits, + attestation_subnet_prefix_bits: spec.attestation_subnet_prefix_bits, } } @@ -1132,6 +1236,15 @@ impl Config { deposit_chain_id, deposit_network_id, deposit_contract_address, + gossip_max_size, + min_epochs_for_block_requests, + max_chunk_size, + ttfb_timeout, + resp_timeout, + message_domain_invalid_snappy, + message_domain_valid_snappy, + attestation_subnet_extra_bits, + attestation_subnet_prefix_bits, } = self; if preset_base != T::spec_name().to_string().as_str() { @@ -1169,6 +1282,15 @@ impl Config { terminal_block_hash, terminal_block_hash_activation_epoch, safe_slots_to_import_optimistically, + gossip_max_size, + min_epochs_for_block_requests, + max_chunk_size, + ttfb_timeout, + resp_timeout, + message_domain_invalid_snappy, + message_domain_valid_snappy, + attestation_subnet_extra_bits, + attestation_subnet_prefix_bits, ..chain_spec.clone() }) } @@ -1306,6 +1428,7 @@ mod tests { #[cfg(test)] mod yaml_tests { use super::*; + use paste::paste; use tempfile::NamedTempFile; #[test] @@ -1410,29 +1533,35 @@ mod yaml_tests { "#; let chain_spec: Config = serde_yaml::from_str(spec).unwrap(); - assert_eq!( - chain_spec.terminal_total_difficulty, - default_terminal_total_difficulty() - ); - assert_eq!( - chain_spec.terminal_block_hash, - default_terminal_block_hash() - ); - assert_eq!( - chain_spec.terminal_block_hash_activation_epoch, - default_terminal_block_hash_activation_epoch() - ); - assert_eq!( - chain_spec.safe_slots_to_import_optimistically, - default_safe_slots_to_import_optimistically() - ); - assert_eq!(chain_spec.bellatrix_fork_epoch, None); + // Asserts that `chain_spec.$name` and `default_$name()` are equal. + macro_rules! check_default { + ($name: ident) => { + paste! { + assert_eq!( + chain_spec.$name, + [](), + "{} does not match default", stringify!($name)); + } + }; + } - assert_eq!( - chain_spec.bellatrix_fork_version, - default_bellatrix_fork_version() - ); + check_default!(terminal_total_difficulty); + check_default!(terminal_block_hash); + check_default!(terminal_block_hash_activation_epoch); + check_default!(safe_slots_to_import_optimistically); + check_default!(bellatrix_fork_version); + check_default!(gossip_max_size); + check_default!(min_epochs_for_block_requests); + check_default!(max_chunk_size); + check_default!(ttfb_timeout); + check_default!(resp_timeout); + check_default!(message_domain_invalid_snappy); + check_default!(message_domain_valid_snappy); + check_default!(attestation_subnet_extra_bits); + check_default!(attestation_subnet_prefix_bits); + + assert_eq!(chain_spec.bellatrix_fork_epoch, None); } #[test] diff --git a/consensus/types/src/subnet_id.rs b/consensus/types/src/subnet_id.rs index eb25b57b0d7..415d6a14040 100644 --- a/consensus/types/src/subnet_id.rs +++ b/consensus/types/src/subnet_id.rs @@ -84,7 +84,7 @@ impl SubnetId { let subscription_duration = spec.epochs_per_subnet_subscription; let node_id_prefix = - (node_id >> (256 - spec.attestation_subnet_prefix_bits() as usize)).as_usize(); + (node_id >> (256 - spec.attestation_subnet_prefix_bits as usize)).as_usize(); // NOTE: The as_u64() panics if the number is larger than u64::max_value(). This cannot be // true as spec.epochs_per_subnet_subscription is a u64. @@ -99,7 +99,7 @@ impl SubnetId { let permutation_seed = ethereum_hashing::hash(&int_to_bytes::int_to_bytes8(subscription_event_idx)); - let num_subnets = 1 << spec.attestation_subnet_prefix_bits(); + let num_subnets = 1 << spec.attestation_subnet_prefix_bits; let permutated_prefix = compute_shuffled_index( node_id_prefix, num_subnets, diff --git a/lighthouse/environment/tests/testnet_dir/config.yaml b/lighthouse/environment/tests/testnet_dir/config.yaml index 33aa8ad165d..b98145163c4 100644 --- a/lighthouse/environment/tests/testnet_dir/config.yaml +++ b/lighthouse/environment/tests/testnet_dir/config.yaml @@ -81,3 +81,17 @@ PROPOSER_SCORE_BOOST: 40 DEPOSIT_CHAIN_ID: 1 DEPOSIT_NETWORK_ID: 1 DEPOSIT_CONTRACT_ADDRESS: 0x00000000219ab540356cBB839Cbe05303d7705Fa + +# Network +# --------------------------------------------------------------- +SUBNETS_PER_NODE: 2 +GOSSIP_MAX_SIZE: 10485760 +MIN_EPOCHS_FOR_BLOCK_REQUESTS: 33024 +MAX_CHUNK_SIZE: 10485760 +TTFB_TIMEOUT: 5 +RESP_TIMEOUT: 10 +MESSAGE_DOMAIN_INVALID_SNAPPY: 0x00000000 +MESSAGE_DOMAIN_VALID_SNAPPY: 0x01000000 +ATTESTATION_SUBNET_COUNT: 64 +ATTESTATION_SUBNET_EXTRA_BITS: 0 +ATTESTATION_SUBNET_PREFIX_BITS: 6