diff --git a/beacon_node/beacon_processor/src/lib.rs b/beacon_node/beacon_processor/src/lib.rs index 02c287b68e3..2a69b04c916 100644 --- a/beacon_node/beacon_processor/src/lib.rs +++ b/beacon_node/beacon_processor/src/lib.rs @@ -135,6 +135,7 @@ pub struct BeaconProcessorQueueLengths { lc_bootstrap_queue: usize, lc_optimistic_update_queue: usize, lc_finality_update_queue: usize, + lc_update_range_queue: usize, api_request_p0_queue: usize, api_request_p1_queue: usize, } @@ -202,6 +203,7 @@ impl BeaconProcessorQueueLengths { lc_bootstrap_queue: 1024, lc_optimistic_update_queue: 512, lc_finality_update_queue: 512, + lc_update_range_queue: 512, api_request_p0_queue: 1024, api_request_p1_queue: 1024, }) @@ -622,6 +624,7 @@ pub enum Work { LightClientBootstrapRequest(BlockingFn), LightClientOptimisticUpdateRequest(BlockingFn), LightClientFinalityUpdateRequest(BlockingFn), + LightClientUpdatesByRangeRequest(BlockingFn), ApiRequestP0(BlockingOrAsync), ApiRequestP1(BlockingOrAsync), } @@ -673,6 +676,7 @@ pub enum WorkType { LightClientBootstrapRequest, LightClientOptimisticUpdateRequest, LightClientFinalityUpdateRequest, + LightClientUpdatesByRangeRequest, ApiRequestP0, ApiRequestP1, } @@ -723,6 +727,7 @@ impl Work { WorkType::LightClientOptimisticUpdateRequest } Work::LightClientFinalityUpdateRequest(_) => WorkType::LightClientFinalityUpdateRequest, + Work::LightClientUpdatesByRangeRequest(_) => WorkType::LightClientUpdatesByRangeRequest, Work::UnknownBlockAttestation { .. } => WorkType::UnknownBlockAttestation, Work::UnknownBlockAggregate { .. } => WorkType::UnknownBlockAggregate, Work::UnknownBlockSamplingRequest { .. } => WorkType::UnknownBlockSamplingRequest, @@ -902,6 +907,7 @@ impl BeaconProcessor { let mut lc_optimistic_update_queue = FifoQueue::new(queue_lengths.lc_optimistic_update_queue); let mut lc_finality_update_queue = FifoQueue::new(queue_lengths.lc_finality_update_queue); + let mut lc_update_range_queue = FifoQueue::new(queue_lengths.lc_update_range_queue); let mut api_request_p0_queue = FifoQueue::new(queue_lengths.api_request_p0_queue); let mut api_request_p1_queue = FifoQueue::new(queue_lengths.api_request_p1_queue); @@ -1379,6 +1385,9 @@ impl BeaconProcessor { Work::LightClientFinalityUpdateRequest { .. } => { lc_finality_update_queue.push(work, work_id, &self.log) } + Work::LightClientUpdatesByRangeRequest { .. } => { + lc_update_range_queue.push(work, work_id, &self.log) + } Work::UnknownBlockAttestation { .. } => { unknown_block_attestation_queue.push(work) } @@ -1470,6 +1479,7 @@ impl BeaconProcessor { WorkType::LightClientFinalityUpdateRequest => { lc_finality_update_queue.len() } + WorkType::LightClientUpdatesByRangeRequest => lc_update_range_queue.len(), WorkType::ApiRequestP0 => api_request_p0_queue.len(), WorkType::ApiRequestP1 => api_request_p1_queue.len(), }; @@ -1622,7 +1632,8 @@ impl BeaconProcessor { | Work::GossipBlsToExecutionChange(process_fn) | Work::LightClientBootstrapRequest(process_fn) | Work::LightClientOptimisticUpdateRequest(process_fn) - | Work::LightClientFinalityUpdateRequest(process_fn) => { + | Work::LightClientFinalityUpdateRequest(process_fn) + | Work::LightClientUpdatesByRangeRequest(process_fn) => { task_spawner.spawn_blocking(process_fn) } }; diff --git a/beacon_node/lighthouse_network/src/peer_manager/mod.rs b/beacon_node/lighthouse_network/src/peer_manager/mod.rs index ec4d892c9b0..c1e72d250ff 100644 --- a/beacon_node/lighthouse_network/src/peer_manager/mod.rs +++ b/beacon_node/lighthouse_network/src/peer_manager/mod.rs @@ -558,6 +558,7 @@ impl PeerManager { Protocol::LightClientBootstrap => return, Protocol::LightClientOptimisticUpdate => return, Protocol::LightClientFinalityUpdate => return, + Protocol::LightClientUpdatesByRange => return, Protocol::BlobsByRoot => PeerAction::MidToleranceError, Protocol::DataColumnsByRoot => PeerAction::MidToleranceError, Protocol::DataColumnsByRange => PeerAction::MidToleranceError, @@ -585,6 +586,7 @@ impl PeerManager { Protocol::LightClientBootstrap => return, Protocol::LightClientOptimisticUpdate => return, Protocol::LightClientFinalityUpdate => return, + Protocol::LightClientUpdatesByRange => return, Protocol::MetaData => PeerAction::Fatal, Protocol::Status => PeerAction::Fatal, } @@ -606,6 +608,7 @@ impl PeerManager { Protocol::LightClientBootstrap => return, Protocol::LightClientOptimisticUpdate => return, Protocol::LightClientFinalityUpdate => return, + Protocol::LightClientUpdatesByRange => return, Protocol::Goodbye => return, Protocol::MetaData => return, Protocol::Status => return, diff --git a/beacon_node/lighthouse_network/src/rpc/codec.rs b/beacon_node/lighthouse_network/src/rpc/codec.rs index 13af04f9b8d..9bdecab70b1 100644 --- a/beacon_node/lighthouse_network/src/rpc/codec.rs +++ b/beacon_node/lighthouse_network/src/rpc/codec.rs @@ -18,9 +18,9 @@ use tokio_util::codec::{Decoder, Encoder}; use types::{ BlobSidecar, ChainSpec, DataColumnSidecar, EthSpec, ForkContext, ForkName, Hash256, LightClientBootstrap, LightClientFinalityUpdate, LightClientOptimisticUpdate, - RuntimeVariableList, SignedBeaconBlock, SignedBeaconBlockAltair, SignedBeaconBlockBase, - SignedBeaconBlockBellatrix, SignedBeaconBlockCapella, SignedBeaconBlockDeneb, - SignedBeaconBlockElectra, + LightClientUpdate, RuntimeVariableList, SignedBeaconBlock, SignedBeaconBlockAltair, + SignedBeaconBlockBase, SignedBeaconBlockBellatrix, SignedBeaconBlockCapella, + SignedBeaconBlockDeneb, SignedBeaconBlockElectra, }; use unsigned_varint::codec::Uvi; @@ -76,6 +76,7 @@ impl SSZSnappyInboundCodec { RpcSuccessResponse::LightClientBootstrap(res) => res.as_ssz_bytes(), RpcSuccessResponse::LightClientOptimisticUpdate(res) => res.as_ssz_bytes(), RpcSuccessResponse::LightClientFinalityUpdate(res) => res.as_ssz_bytes(), + RpcSuccessResponse::LightClientUpdatesByRange(res) => res.as_ssz_bytes(), RpcSuccessResponse::Pong(res) => res.data.as_ssz_bytes(), RpcSuccessResponse::MetaData(res) => // Encode the correct version of the MetaData response based on the negotiated version. @@ -342,6 +343,7 @@ impl Encoder> for SSZSnappyOutboundCodec { RequestType::DataColumnsByRoot(req) => req.data_column_ids.as_ssz_bytes(), RequestType::Ping(req) => req.as_ssz_bytes(), RequestType::LightClientBootstrap(req) => req.as_ssz_bytes(), + RequestType::LightClientUpdatesByRange(req) => req.as_ssz_bytes(), // no metadata to encode RequestType::MetaData(_) | RequestType::LightClientOptimisticUpdate @@ -503,6 +505,10 @@ fn context_bytes( return lc_finality_update .map_with_fork_name(|fork_name| fork_context.to_context_bytes(fork_name)); } + RpcSuccessResponse::LightClientUpdatesByRange(lc_update) => { + return lc_update + .map_with_fork_name(|fork_name| fork_context.to_context_bytes(fork_name)); + } // These will not pass the has_context_bytes() check RpcSuccessResponse::Status(_) | RpcSuccessResponse::Pong(_) @@ -613,6 +619,11 @@ fn handle_rpc_request( SupportedProtocol::LightClientFinalityUpdateV1 => { Ok(Some(RequestType::LightClientFinalityUpdate)) } + SupportedProtocol::LightClientUpdatesByRangeV1 => { + Ok(Some(RequestType::LightClientUpdatesByRange( + LightClientUpdatesByRangeRequest::from_ssz_bytes(decoded_buffer)?, + ))) + } // MetaData requests return early from InboundUpgrade and do not reach the decoder. // Handle this case just for completeness. SupportedProtocol::MetaDataV3 => { @@ -795,6 +806,21 @@ fn handle_rpc_response( ), )), }, + SupportedProtocol::LightClientUpdatesByRangeV1 => match fork_name { + Some(fork_name) => Ok(Some(RpcSuccessResponse::LightClientUpdatesByRange( + Arc::new(LightClientUpdate::from_ssz_bytes( + decoded_buffer, + &fork_name, + )?), + ))), + None => Err(RPCError::ErrorResponse( + RpcErrorResponse::InvalidRequest, + format!( + "No context bytes provided for {:?} response", + versioned_protocol + ), + )), + }, // MetaData V2/V3 responses have no context bytes, so behave similarly to V1 responses SupportedProtocol::MetaDataV3 => Ok(Some(RpcSuccessResponse::MetaData(MetaData::V3( MetaDataV3::from_ssz_bytes(decoded_buffer)?, @@ -1214,6 +1240,12 @@ mod tests { ) } RequestType::LightClientOptimisticUpdate | RequestType::LightClientFinalityUpdate => {} + RequestType::LightClientUpdatesByRange(light_client_updates_by_range) => { + assert_eq!( + decoded, + RequestType::LightClientUpdatesByRange(light_client_updates_by_range) + ) + } } } diff --git a/beacon_node/lighthouse_network/src/rpc/config.rs b/beacon_node/lighthouse_network/src/rpc/config.rs index fcb9c986048..42ece6dc4ff 100644 --- a/beacon_node/lighthouse_network/src/rpc/config.rs +++ b/beacon_node/lighthouse_network/src/rpc/config.rs @@ -96,6 +96,7 @@ pub struct RateLimiterConfig { pub(super) light_client_bootstrap_quota: Quota, pub(super) light_client_optimistic_update_quota: Quota, pub(super) light_client_finality_update_quota: Quota, + pub(super) light_client_updates_by_range_quota: Quota, } impl RateLimiterConfig { @@ -121,6 +122,7 @@ impl RateLimiterConfig { pub const DEFAULT_LIGHT_CLIENT_BOOTSTRAP_QUOTA: Quota = Quota::one_every(10); pub const DEFAULT_LIGHT_CLIENT_OPTIMISTIC_UPDATE_QUOTA: Quota = Quota::one_every(10); pub const DEFAULT_LIGHT_CLIENT_FINALITY_UPDATE_QUOTA: Quota = Quota::one_every(10); + pub const DEFAULT_LIGHT_CLIENT_UPDATES_BY_RANGE_QUOTA: Quota = Quota::one_every(10); } impl Default for RateLimiterConfig { @@ -140,6 +142,7 @@ impl Default for RateLimiterConfig { light_client_optimistic_update_quota: Self::DEFAULT_LIGHT_CLIENT_OPTIMISTIC_UPDATE_QUOTA, light_client_finality_update_quota: Self::DEFAULT_LIGHT_CLIENT_FINALITY_UPDATE_QUOTA, + light_client_updates_by_range_quota: Self::DEFAULT_LIGHT_CLIENT_UPDATES_BY_RANGE_QUOTA, } } } @@ -198,6 +201,7 @@ impl FromStr for RateLimiterConfig { let mut light_client_bootstrap_quota = None; let mut light_client_optimistic_update_quota = None; let mut light_client_finality_update_quota = None; + let mut light_client_updates_by_range_quota = None; for proto_def in s.split(';') { let ProtocolQuota { protocol, quota } = proto_def.parse()?; @@ -228,6 +232,10 @@ impl FromStr for RateLimiterConfig { light_client_finality_update_quota = light_client_finality_update_quota.or(quota) } + Protocol::LightClientUpdatesByRange => { + light_client_updates_by_range_quota = + light_client_updates_by_range_quota.or(quota) + } } } Ok(RateLimiterConfig { @@ -252,6 +260,8 @@ impl FromStr for RateLimiterConfig { .unwrap_or(Self::DEFAULT_LIGHT_CLIENT_OPTIMISTIC_UPDATE_QUOTA), light_client_finality_update_quota: light_client_finality_update_quota .unwrap_or(Self::DEFAULT_LIGHT_CLIENT_FINALITY_UPDATE_QUOTA), + light_client_updates_by_range_quota: light_client_updates_by_range_quota + .unwrap_or(Self::DEFAULT_LIGHT_CLIENT_UPDATES_BY_RANGE_QUOTA), }) } } diff --git a/beacon_node/lighthouse_network/src/rpc/methods.rs b/beacon_node/lighthouse_network/src/rpc/methods.rs index dc7d316fb0f..bb8bfb0e206 100644 --- a/beacon_node/lighthouse_network/src/rpc/methods.rs +++ b/beacon_node/lighthouse_network/src/rpc/methods.rs @@ -14,10 +14,11 @@ use std::sync::Arc; use strum::IntoStaticStr; use superstruct::superstruct; use types::blob_sidecar::BlobIdentifier; +use types::light_client_update::MAX_REQUEST_LIGHT_CLIENT_UPDATES; use types::{ blob_sidecar::BlobSidecar, ChainSpec, ColumnIndex, DataColumnIdentifier, DataColumnSidecar, Epoch, EthSpec, Hash256, LightClientBootstrap, LightClientFinalityUpdate, - LightClientOptimisticUpdate, RuntimeVariableList, SignedBeaconBlock, Slot, + LightClientOptimisticUpdate, LightClientUpdate, RuntimeVariableList, SignedBeaconBlock, Slot, }; /// Maximum length of error message. @@ -477,6 +478,34 @@ impl DataColumnsByRootRequest { } } +/// Request a number of beacon data columns from a peer. +#[derive(Encode, Decode, Clone, Debug, PartialEq)] +pub struct LightClientUpdatesByRangeRequest { + /// The starting period to request light client updates. + pub start_period: u64, + /// The number of periods from `start_period`. + pub count: u64, +} + +impl LightClientUpdatesByRangeRequest { + pub fn max_requested(&self) -> u64 { + MAX_REQUEST_LIGHT_CLIENT_UPDATES + } + + pub fn ssz_min_len() -> usize { + LightClientUpdatesByRangeRequest { + start_period: 0, + count: 0, + } + .as_ssz_bytes() + .len() + } + + pub fn ssz_max_len() -> usize { + Self::ssz_min_len() + } +} + /* RPC Handling and Grouping */ // Collection of enums and structs used by the Codecs to encode/decode RPC messages @@ -504,6 +533,9 @@ pub enum RpcSuccessResponse { /// A response to a get LIGHT_CLIENT_FINALITY_UPDATE request. LightClientFinalityUpdate(Arc>), + /// A response to a get LIGHT_CLIENT_UPDATES_BY_RANGE request. + LightClientUpdatesByRange(Arc>), + /// A response to a get BLOBS_BY_ROOT request. BlobsByRoot(Arc>), @@ -540,6 +572,9 @@ pub enum ResponseTermination { /// Data column sidecars by range stream termination. DataColumnsByRange, + + /// Light client updates by range stream termination. + LightClientUpdatesByRange, } /// The structured response containing a result/code indicating success or failure @@ -639,6 +674,7 @@ impl RpcSuccessResponse { Protocol::LightClientOptimisticUpdate } RpcSuccessResponse::LightClientFinalityUpdate(_) => Protocol::LightClientFinalityUpdate, + RpcSuccessResponse::LightClientUpdatesByRange(_) => Protocol::LightClientUpdatesByRange, } } } @@ -710,6 +746,13 @@ impl std::fmt::Display for RpcSuccessResponse { update.signature_slot() ) } + RpcSuccessResponse::LightClientUpdatesByRange(update) => { + write!( + f, + "LightClientUpdatesByRange Slot: {}", + update.signature_slot(), + ) + } } } } diff --git a/beacon_node/lighthouse_network/src/rpc/mod.rs b/beacon_node/lighthouse_network/src/rpc/mod.rs index e3b41ea1df7..7d091da7660 100644 --- a/beacon_node/lighthouse_network/src/rpc/mod.rs +++ b/beacon_node/lighthouse_network/src/rpc/mod.rs @@ -553,6 +553,9 @@ where ResponseTermination::BlobsByRoot => Protocol::BlobsByRoot, ResponseTermination::DataColumnsByRoot => Protocol::DataColumnsByRoot, ResponseTermination::DataColumnsByRange => Protocol::DataColumnsByRange, + ResponseTermination::LightClientUpdatesByRange => { + Protocol::LightClientUpdatesByRange + } }, ), }; diff --git a/beacon_node/lighthouse_network/src/rpc/protocol.rs b/beacon_node/lighthouse_network/src/rpc/protocol.rs index 67104fbc296..16c3a133912 100644 --- a/beacon_node/lighthouse_network/src/rpc/protocol.rs +++ b/beacon_node/lighthouse_network/src/rpc/protocol.rs @@ -21,7 +21,8 @@ use types::{ BlobSidecar, ChainSpec, DataColumnSidecar, EmptyBlock, EthSpec, ForkContext, ForkName, LightClientBootstrap, LightClientBootstrapAltair, LightClientFinalityUpdate, LightClientFinalityUpdateAltair, LightClientOptimisticUpdate, - LightClientOptimisticUpdateAltair, MainnetEthSpec, Signature, SignedBeaconBlock, + LightClientOptimisticUpdateAltair, LightClientUpdate, MainnetEthSpec, Signature, + SignedBeaconBlock, }; // Note: Hardcoding the `EthSpec` type for `SignedBeaconBlock` as min/max values is @@ -143,6 +144,13 @@ pub static LIGHT_CLIENT_BOOTSTRAP_ELECTRA_MAX: LazyLock = LazyLock::new(| LightClientBootstrap::::ssz_max_len_for_fork(ForkName::Electra) }); +pub static LIGHT_CLIENT_UPDATES_BY_RANGE_CAPELLA_MAX: LazyLock = + LazyLock::new(|| LightClientUpdate::::ssz_max_len_for_fork(ForkName::Capella)); +pub static LIGHT_CLIENT_UPDATES_BY_RANGE_DENEB_MAX: LazyLock = + LazyLock::new(|| LightClientUpdate::::ssz_max_len_for_fork(ForkName::Deneb)); +pub static LIGHT_CLIENT_UPDATES_BY_RANGE_ELECTRA_MAX: LazyLock = + LazyLock::new(|| LightClientUpdate::::ssz_max_len_for_fork(ForkName::Electra)); + /// The protocol prefix the RPC protocol id. const PROTOCOL_PREFIX: &str = "/eth2/beacon_chain/req"; /// The number of seconds to wait for the first bytes of a request once a protocol has been @@ -190,6 +198,26 @@ pub fn rpc_block_limits_by_fork(current_fork: ForkName) -> RpcLimits { } } +fn rpc_light_client_updates_by_range_limits_by_fork(current_fork: ForkName) -> RpcLimits { + let altair_fixed_len = LightClientFinalityUpdateAltair::::ssz_fixed_len(); + + match ¤t_fork { + ForkName::Base => RpcLimits::new(0, 0), + ForkName::Altair | ForkName::Bellatrix => { + RpcLimits::new(altair_fixed_len, altair_fixed_len) + } + ForkName::Capella => { + RpcLimits::new(altair_fixed_len, *LIGHT_CLIENT_UPDATES_BY_RANGE_CAPELLA_MAX) + } + ForkName::Deneb => { + RpcLimits::new(altair_fixed_len, *LIGHT_CLIENT_UPDATES_BY_RANGE_DENEB_MAX) + } + ForkName::Electra => { + RpcLimits::new(altair_fixed_len, *LIGHT_CLIENT_UPDATES_BY_RANGE_ELECTRA_MAX) + } + } +} + fn rpc_light_client_finality_update_limits_by_fork(current_fork: ForkName) -> RpcLimits { let altair_fixed_len = LightClientFinalityUpdateAltair::::ssz_fixed_len(); @@ -286,6 +314,9 @@ pub enum Protocol { /// The `LightClientFinalityUpdate` protocol name. #[strum(serialize = "light_client_finality_update")] LightClientFinalityUpdate, + /// The `LightClientUpdatesByRange` protocol name + #[strum(serialize = "light_client_updates_by_range")] + LightClientUpdatesByRange, } impl Protocol { @@ -304,6 +335,7 @@ impl Protocol { Protocol::LightClientBootstrap => None, Protocol::LightClientOptimisticUpdate => None, Protocol::LightClientFinalityUpdate => None, + Protocol::LightClientUpdatesByRange => None, } } } @@ -334,6 +366,7 @@ pub enum SupportedProtocol { LightClientBootstrapV1, LightClientOptimisticUpdateV1, LightClientFinalityUpdateV1, + LightClientUpdatesByRangeV1, } impl SupportedProtocol { @@ -356,6 +389,7 @@ impl SupportedProtocol { SupportedProtocol::LightClientBootstrapV1 => "1", SupportedProtocol::LightClientOptimisticUpdateV1 => "1", SupportedProtocol::LightClientFinalityUpdateV1 => "1", + SupportedProtocol::LightClientUpdatesByRangeV1 => "1", } } @@ -380,6 +414,7 @@ impl SupportedProtocol { Protocol::LightClientOptimisticUpdate } SupportedProtocol::LightClientFinalityUpdateV1 => Protocol::LightClientFinalityUpdate, + SupportedProtocol::LightClientUpdatesByRangeV1 => Protocol::LightClientUpdatesByRange, } } @@ -542,6 +577,10 @@ impl ProtocolId { ), Protocol::LightClientOptimisticUpdate => RpcLimits::new(0, 0), Protocol::LightClientFinalityUpdate => RpcLimits::new(0, 0), + Protocol::LightClientUpdatesByRange => RpcLimits::new( + LightClientUpdatesByRangeRequest::ssz_min_len(), + LightClientUpdatesByRangeRequest::ssz_max_len(), + ), Protocol::MetaData => RpcLimits::new(0, 0), // Metadata requests are empty } } @@ -577,6 +616,9 @@ impl ProtocolId { Protocol::LightClientFinalityUpdate => { rpc_light_client_finality_update_limits_by_fork(fork_context.current_fork()) } + Protocol::LightClientUpdatesByRange => { + rpc_light_client_updates_by_range_limits_by_fork(fork_context.current_fork()) + } } } @@ -592,7 +634,8 @@ impl ProtocolId { | SupportedProtocol::DataColumnsByRangeV1 | SupportedProtocol::LightClientBootstrapV1 | SupportedProtocol::LightClientOptimisticUpdateV1 - | SupportedProtocol::LightClientFinalityUpdateV1 => true, + | SupportedProtocol::LightClientFinalityUpdateV1 + | SupportedProtocol::LightClientUpdatesByRangeV1 => true, SupportedProtocol::StatusV1 | SupportedProtocol::BlocksByRootV1 | SupportedProtocol::BlocksByRangeV1 @@ -723,6 +766,7 @@ pub enum RequestType { LightClientBootstrap(LightClientBootstrapRequest), LightClientOptimisticUpdate, LightClientFinalityUpdate, + LightClientUpdatesByRange(LightClientUpdatesByRangeRequest), Ping(Ping), MetaData(MetadataRequest), } @@ -747,6 +791,7 @@ impl RequestType { RequestType::LightClientBootstrap(_) => 1, RequestType::LightClientOptimisticUpdate => 1, RequestType::LightClientFinalityUpdate => 1, + RequestType::LightClientUpdatesByRange(req) => req.max_requested(), } } @@ -780,6 +825,9 @@ impl RequestType { RequestType::LightClientFinalityUpdate => { SupportedProtocol::LightClientFinalityUpdateV1 } + RequestType::LightClientUpdatesByRange(_) => { + SupportedProtocol::LightClientUpdatesByRangeV1 + } } } @@ -802,6 +850,7 @@ impl RequestType { RequestType::LightClientBootstrap(_) => unreachable!(), RequestType::LightClientFinalityUpdate => unreachable!(), RequestType::LightClientOptimisticUpdate => unreachable!(), + RequestType::LightClientUpdatesByRange(_) => unreachable!(), } } @@ -861,6 +910,10 @@ impl RequestType { SupportedProtocol::LightClientFinalityUpdateV1, Encoding::SSZSnappy, )], + RequestType::LightClientUpdatesByRange(_) => vec![ProtocolId::new( + SupportedProtocol::LightClientUpdatesByRangeV1, + Encoding::SSZSnappy, + )], } } @@ -879,6 +932,7 @@ impl RequestType { RequestType::LightClientBootstrap(_) => true, RequestType::LightClientOptimisticUpdate => true, RequestType::LightClientFinalityUpdate => true, + RequestType::LightClientUpdatesByRange(_) => true, } } } @@ -997,6 +1051,9 @@ impl std::fmt::Display for RequestType { RequestType::LightClientFinalityUpdate => { write!(f, "Light client finality update request") } + RequestType::LightClientUpdatesByRange(_) => { + write!(f, "Light client updates by range request") + } } } } diff --git a/beacon_node/lighthouse_network/src/rpc/rate_limiter.rs b/beacon_node/lighthouse_network/src/rpc/rate_limiter.rs index a8e8f45b6fe..ecbacc8c112 100644 --- a/beacon_node/lighthouse_network/src/rpc/rate_limiter.rs +++ b/beacon_node/lighthouse_network/src/rpc/rate_limiter.rs @@ -107,6 +107,8 @@ pub struct RPCRateLimiter { lc_optimistic_update_rl: Limiter, /// LightClientFinalityUpdate rate limiter. lc_finality_update_rl: Limiter, + /// LightClientUpdatesByRange rate limiter. + lc_updates_by_range_rl: Limiter, } /// Error type for non conformant requests @@ -147,6 +149,8 @@ pub struct RPCRateLimiterBuilder { lc_optimistic_update_quota: Option, /// Quota for the LightClientOptimisticUpdate protocol. lc_finality_update_quota: Option, + /// Quota for the LightClientUpdatesByRange protocol. + lc_updates_by_range_quota: Option, } impl RPCRateLimiterBuilder { @@ -167,6 +171,7 @@ impl RPCRateLimiterBuilder { Protocol::LightClientBootstrap => self.lcbootstrap_quota = q, Protocol::LightClientOptimisticUpdate => self.lc_optimistic_update_quota = q, Protocol::LightClientFinalityUpdate => self.lc_finality_update_quota = q, + Protocol::LightClientUpdatesByRange => self.lc_updates_by_range_quota = q, } self } @@ -192,6 +197,9 @@ impl RPCRateLimiterBuilder { let lc_finality_update_quota = self .lc_finality_update_quota .ok_or("LightClientFinalityUpdate quota not specified")?; + let lc_updates_by_range_quota = self + .lc_updates_by_range_quota + .ok_or("LightClientUpdatesByRange quota not specified")?; let blbrange_quota = self .blbrange_quota @@ -222,6 +230,7 @@ impl RPCRateLimiterBuilder { let lc_bootstrap_rl = Limiter::from_quota(lc_bootstrap_quota)?; let lc_optimistic_update_rl = Limiter::from_quota(lc_optimistic_update_quota)?; let lc_finality_update_rl = Limiter::from_quota(lc_finality_update_quota)?; + let lc_updates_by_range_rl = Limiter::from_quota(lc_updates_by_range_quota)?; // check for peers to prune every 30 seconds, starting in 30 seconds let prune_every = tokio::time::Duration::from_secs(30); @@ -242,6 +251,7 @@ impl RPCRateLimiterBuilder { lc_bootstrap_rl, lc_optimistic_update_rl, lc_finality_update_rl, + lc_updates_by_range_rl, init_time: Instant::now(), }) } @@ -279,6 +289,7 @@ impl RPCRateLimiter { light_client_bootstrap_quota, light_client_optimistic_update_quota, light_client_finality_update_quota, + light_client_updates_by_range_quota, } = config; Self::builder() @@ -301,6 +312,10 @@ impl RPCRateLimiter { Protocol::LightClientFinalityUpdate, light_client_finality_update_quota, ) + .set_quota( + Protocol::LightClientUpdatesByRange, + light_client_updates_by_range_quota, + ) .build() } @@ -333,6 +348,7 @@ impl RPCRateLimiter { Protocol::LightClientBootstrap => &mut self.lc_bootstrap_rl, Protocol::LightClientOptimisticUpdate => &mut self.lc_optimistic_update_rl, Protocol::LightClientFinalityUpdate => &mut self.lc_finality_update_rl, + Protocol::LightClientUpdatesByRange => &mut self.lc_updates_by_range_rl, }; check(limiter) } diff --git a/beacon_node/lighthouse_network/src/service/api_types.rs b/beacon_node/lighthouse_network/src/service/api_types.rs index e57e846c336..0e06013ad6b 100644 --- a/beacon_node/lighthouse_network/src/service/api_types.rs +++ b/beacon_node/lighthouse_network/src/service/api_types.rs @@ -3,7 +3,7 @@ use std::sync::Arc; use libp2p::swarm::ConnectionId; use types::{ BlobSidecar, DataColumnSidecar, EthSpec, Hash256, LightClientBootstrap, - LightClientFinalityUpdate, LightClientOptimisticUpdate, SignedBeaconBlock, + LightClientFinalityUpdate, LightClientOptimisticUpdate, LightClientUpdate, SignedBeaconBlock, }; use crate::rpc::{ @@ -114,6 +114,8 @@ pub enum Response { LightClientOptimisticUpdate(Arc>), /// A response to a LightClientFinalityUpdate request. LightClientFinalityUpdate(Arc>), + /// A response to a LightClientUpdatesByRange request. + LightClientUpdatesByRange(Option>>), } impl std::convert::From> for RpcResponse { @@ -153,6 +155,12 @@ impl std::convert::From> for RpcResponse { Response::LightClientFinalityUpdate(f) => { RpcResponse::Success(RpcSuccessResponse::LightClientFinalityUpdate(f)) } + Response::LightClientUpdatesByRange(f) => match f { + Some(d) => RpcResponse::Success(RpcSuccessResponse::LightClientUpdatesByRange(d)), + None => { + RpcResponse::StreamTermination(ResponseTermination::LightClientUpdatesByRange) + } + }, } } } diff --git a/beacon_node/lighthouse_network/src/service/mod.rs b/beacon_node/lighthouse_network/src/service/mod.rs index ea4c3acb421..4038d12dd15 100644 --- a/beacon_node/lighthouse_network/src/service/mod.rs +++ b/beacon_node/lighthouse_network/src/service/mod.rs @@ -1578,6 +1578,17 @@ impl Network { request, }) } + RequestType::LightClientUpdatesByRange(_) => { + metrics::inc_counter_vec( + &metrics::TOTAL_RPC_REQUESTS, + &["light_client_updates_by_range"], + ); + Some(NetworkEvent::RequestReceived { + peer_id, + id: (connection_id, request.substream_id), + request, + }) + } } } Ok(RPCReceived::Response(id, resp)) => { @@ -1631,6 +1642,11 @@ impl Network { peer_id, Response::LightClientFinalityUpdate(update), ), + RpcSuccessResponse::LightClientUpdatesByRange(update) => self.build_response( + id, + peer_id, + Response::LightClientUpdatesByRange(Some(update)), + ), } } Ok(RPCReceived::EndOfStream(id, termination)) => { @@ -1641,6 +1657,9 @@ impl Network { ResponseTermination::BlobsByRoot => Response::BlobsByRoot(None), ResponseTermination::DataColumnsByRoot => Response::DataColumnsByRoot(None), ResponseTermination::DataColumnsByRange => Response::DataColumnsByRange(None), + ResponseTermination::LightClientUpdatesByRange => { + Response::LightClientUpdatesByRange(None) + } }; self.build_response(id, peer_id, response) } diff --git a/beacon_node/network/src/network_beacon_processor/mod.rs b/beacon_node/network/src/network_beacon_processor/mod.rs index 04571e181d7..ee412bece45 100644 --- a/beacon_node/network/src/network_beacon_processor/mod.rs +++ b/beacon_node/network/src/network_beacon_processor/mod.rs @@ -12,6 +12,7 @@ use beacon_processor::{ use lighthouse_network::discovery::ConnectionId; use lighthouse_network::rpc::methods::{ BlobsByRangeRequest, BlobsByRootRequest, DataColumnsByRangeRequest, DataColumnsByRootRequest, + LightClientUpdatesByRangeRequest, }; use lighthouse_network::rpc::{RequestId, SubstreamId}; use lighthouse_network::{ @@ -829,6 +830,32 @@ impl NetworkBeaconProcessor { }) } + /// Create a new work event to process a `LightClientUpdatesByRange` request from the RPC network. + pub fn send_light_client_updates_by_range_request( + self: &Arc, + peer_id: PeerId, + connection_id: ConnectionId, + substream_id: SubstreamId, + request_id: RequestId, + request: LightClientUpdatesByRangeRequest, + ) -> Result<(), Error> { + let processor = self.clone(); + let process_fn = move || { + processor.handle_light_client_updates_by_range( + peer_id, + connection_id, + substream_id, + request_id, + request, + ) + }; + + self.try_send(BeaconWorkEvent { + drop_during_sync: true, + work: Work::LightClientUpdatesByRangeRequest(Box::new(process_fn)), + }) + } + /// Send a message to `sync_tx`. /// /// Creates a log if there is an internal error. diff --git a/beacon_node/network/src/network_beacon_processor/rpc_methods.rs b/beacon_node/network/src/network_beacon_processor/rpc_methods.rs index 88a7616ec74..6d32806713d 100644 --- a/beacon_node/network/src/network_beacon_processor/rpc_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/rpc_methods.rs @@ -10,6 +10,7 @@ use lighthouse_network::rpc::methods::{ }; use lighthouse_network::rpc::*; use lighthouse_network::{PeerId, PeerRequestId, ReportSource, Response, SyncInfo}; +use methods::LightClientUpdatesByRangeRequest; use slog::{debug, error, warn}; use slot_clock::SlotClock; use std::collections::{hash_map::Entry, HashMap}; @@ -428,6 +429,105 @@ impl NetworkBeaconProcessor { Ok(()) } + pub fn handle_light_client_updates_by_range( + self: &Arc, + peer_id: PeerId, + connection_id: ConnectionId, + substream_id: SubstreamId, + request_id: RequestId, + request: LightClientUpdatesByRangeRequest, + ) { + self.terminate_response_stream( + peer_id, + connection_id, + substream_id, + request_id, + self.clone() + .handle_light_client_updates_by_range_request_inner( + peer_id, + connection_id, + substream_id, + request_id, + request, + ), + Response::LightClientUpdatesByRange, + ); + } + + /// Handle a `LightClientUpdatesByRange` request from the peer. + pub fn handle_light_client_updates_by_range_request_inner( + self: Arc, + peer_id: PeerId, + connection_id: ConnectionId, + substream_id: SubstreamId, + request_id: RequestId, + req: LightClientUpdatesByRangeRequest, + ) -> Result<(), (RpcErrorResponse, &'static str)> { + debug!(self.log, "Received LightClientUpdatesByRange Request"; + "peer_id" => %peer_id, + "count" => req.count, + "start_period" => req.start_period, + ); + + // Should not send more than max light client updates + let max_request_size: u64 = req.max_requested(); + if req.count > max_request_size { + return Err(( + RpcErrorResponse::InvalidRequest, + "Request exceeded max size", + )); + } + + let lc_updates = match self + .chain + .get_light_client_updates(req.start_period, req.count) + { + Ok(lc_updates) => lc_updates, + Err(e) => { + error!(self.log, "Unable to obtain light client updates"; + "request" => ?req, + "peer" => %peer_id, + "error" => ?e + ); + return Err((RpcErrorResponse::ServerError, "Database error")); + } + }; + + for lc_update in lc_updates.iter() { + self.send_network_message(NetworkMessage::SendResponse { + peer_id, + response: Response::LightClientUpdatesByRange(Some(Arc::new(lc_update.clone()))), + request_id, + id: (connection_id, substream_id), + }); + } + + let lc_updates_sent = lc_updates.len(); + + if lc_updates_sent < req.count as usize { + debug!( + self.log, + "LightClientUpdatesByRange outgoing response processed"; + "peer" => %peer_id, + "info" => "Failed to return all requested light client updates. The peer may have requested data ahead of whats currently available", + "start_period" => req.start_period, + "requested" => req.count, + "returned" => lc_updates_sent + ); + } else { + debug!( + self.log, + "LightClientUpdatesByRange outgoing response processed"; + "peer" => %peer_id, + "start_period" => req.start_period, + "requested" => req.count, + "returned" => lc_updates_sent + ); + } + + Ok(()) + } + /// Handle a `LightClientBootstrap` request from the peer. pub fn handle_light_client_bootstrap( self: &Arc, diff --git a/beacon_node/network/src/router.rs b/beacon_node/network/src/router.rs index f05cb01fa42..e1badfda9d5 100644 --- a/beacon_node/network/src/router.rs +++ b/beacon_node/network/src/router.rs @@ -311,6 +311,17 @@ impl Router { rpc_request.id, ), ), + RequestType::LightClientUpdatesByRange(request) => self + .handle_beacon_processor_send_result( + self.network_beacon_processor + .send_light_client_updates_by_range_request( + peer_id, + request_id.0, + request_id.1, + rpc_request.id, + request, + ), + ), _ => {} } } @@ -351,7 +362,8 @@ impl Router { // Light client responses should not be received Response::LightClientBootstrap(_) | Response::LightClientOptimisticUpdate(_) - | Response::LightClientFinalityUpdate(_) => unreachable!(), + | Response::LightClientFinalityUpdate(_) + | Response::LightClientUpdatesByRange(_) => unreachable!(), } } diff --git a/consensus/types/src/light_client_update.rs b/consensus/types/src/light_client_update.rs index 3b48a68df31..1f5592a929f 100644 --- a/consensus/types/src/light_client_update.rs +++ b/consensus/types/src/light_client_update.rs @@ -1,5 +1,6 @@ use super::{EthSpec, FixedVector, Hash256, Slot, SyncAggregate, SyncCommittee}; use crate::light_client_header::LightClientHeaderElectra; +use crate::LightClientHeader; use crate::{ beacon_state, test_utils::TestRandom, ChainSpec, Epoch, ForkName, ForkVersionDeserialize, LightClientHeaderAltair, LightClientHeaderCapella, LightClientHeaderDeneb, @@ -10,7 +11,7 @@ use safe_arith::ArithError; use safe_arith::SafeArith; use serde::{Deserialize, Deserializer, Serialize}; use serde_json::Value; -use ssz::Decode; +use ssz::{Decode, Encode}; use ssz_derive::Decode; use ssz_derive::Encode; use ssz_types::typenum::{U4, U5, U6}; @@ -35,6 +36,10 @@ pub const CURRENT_SYNC_COMMITTEE_PROOF_LEN: usize = 5; pub const NEXT_SYNC_COMMITTEE_PROOF_LEN: usize = 5; pub const EXECUTION_PAYLOAD_PROOF_LEN: usize = 4; +// Max light client updates by range request limits +// spec: https://github.com/ethereum/consensus-specs/blob/dev/specs/altair/light-client/p2p-interface.md#configuration +pub const MAX_REQUEST_LIGHT_CLIENT_UPDATES: u64 = 128; + type FinalityBranch = FixedVector; type NextSyncCommitteeBranch = FixedVector; @@ -403,6 +408,32 @@ impl LightClientUpdate { } true } + + // A `LightClientUpdate` has two `LightClientHeader`s + // Spec: https://github.com/ethereum/consensus-specs/blob/dev/specs/altair/light-client/sync-protocol.md#lightclientupdate + #[allow(clippy::arithmetic_side_effects)] + pub fn ssz_max_len_for_fork(fork_name: ForkName) -> usize { + let fixed_len = match fork_name { + ForkName::Base | ForkName::Bellatrix => 0, + ForkName::Altair => as Encode>::ssz_fixed_len(), + ForkName::Capella => as Encode>::ssz_fixed_len(), + ForkName::Deneb => as Encode>::ssz_fixed_len(), + ForkName::Electra => as Encode>::ssz_fixed_len(), + }; + fixed_len + 2 * LightClientHeader::::ssz_max_var_len_for_fork(fork_name) + } + + pub fn map_with_fork_name(&self, func: F) -> R + where + F: Fn(ForkName) -> R, + { + match self { + Self::Altair(_) => func(ForkName::Altair), + Self::Capella(_) => func(ForkName::Capella), + Self::Deneb(_) => func(ForkName::Deneb), + Self::Electra(_) => func(ForkName::Electra), + } + } } fn compute_sync_committee_period_at_slot( diff --git a/lighthouse/environment/src/lib.rs b/lighthouse/environment/src/lib.rs index 9ad40a6acd4..89d759d6629 100644 --- a/lighthouse/environment/src/lib.rs +++ b/lighthouse/environment/src/lib.rs @@ -333,7 +333,7 @@ impl EnvironmentBuilder { eth2_network_config: Eth2NetworkConfig, ) -> Result { // Create a new chain spec from the default configuration. - self.eth2_config.spec = Arc::new(eth2_network_config.chain_spec::()?); + self.eth2_config.spec = eth2_network_config.chain_spec::()?.into(); self.eth2_network_config = Some(eth2_network_config); Ok(self)