Skip to content

Commit

Permalink
light client optimistic update reprocessing (sigp#3799)
Browse files Browse the repository at this point in the history
Currently there is a race between receiving blocks and receiving light client optimistic updates (in unstable), which results in processing errors. This is a continuation of PR sigp#3693 and seeks to progress on issue sigp#3651

Add the parent_root to ReprocessQueueMessage::BlockImported so we can remove blocks from queue when a block arrives that has the same parent root. We use the parent root as opposed to the block_root because the LightClientOptimisticUpdate does not contain the block_root.

If light_client_optimistic_update.attested_header.canonical_root() != head_block.message().parent_root() then we queue the update. Otherwise we process immediately.
michaelsproul came up with this idea.
The code was heavily based off of the attestation reprocessing.
I have not properly tested this to see if it works as intended.
  • Loading branch information
GeemoCandama authored and realbigsean committed Jan 25, 2023
1 parent 9b5c2ee commit f857811
Show file tree
Hide file tree
Showing 6 changed files with 370 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use crate::{
beacon_chain::MAXIMUM_GOSSIP_CLOCK_DISPARITY, BeaconChain, BeaconChainError, BeaconChainTypes,
};
use derivative::Derivative;
use eth2::types::Hash256;
use slot_clock::SlotClock;
use std::time::Duration;
use strum::AsRefStr;
Expand Down Expand Up @@ -36,6 +37,8 @@ pub enum Error {
SigSlotStartIsNone,
/// Failed to construct a LightClientOptimisticUpdate from state.
FailedConstructingUpdate,
/// Unknown block with parent root.
UnknownBlockParentRoot(Hash256),
/// Beacon chain error occured.
BeaconChainError(BeaconChainError),
LightClientUpdateError(LightClientUpdateError),
Expand All @@ -58,6 +61,7 @@ impl From<LightClientUpdateError> for Error {
#[derivative(Clone(bound = "T: BeaconChainTypes"))]
pub struct VerifiedLightClientOptimisticUpdate<T: BeaconChainTypes> {
light_client_optimistic_update: LightClientOptimisticUpdate<T::EthSpec>,
pub parent_root: Hash256,
seen_timestamp: Duration,
}

Expand Down Expand Up @@ -107,6 +111,16 @@ impl<T: BeaconChainTypes> VerifiedLightClientOptimisticUpdate<T> {
None => return Err(Error::SigSlotStartIsNone),
}

// check if we can process the optimistic update immediately
// otherwise queue
let canonical_root = light_client_optimistic_update
.attested_header
.canonical_root();

if canonical_root != head_block.message().parent_root() {
return Err(Error::UnknownBlockParentRoot(canonical_root));
}

let optimistic_update =
LightClientOptimisticUpdate::new(&chain.spec, head_block, &attested_state)?;

Expand All @@ -119,6 +133,7 @@ impl<T: BeaconChainTypes> VerifiedLightClientOptimisticUpdate<T> {

Ok(Self {
light_client_optimistic_update,
parent_root: canonical_root,
seen_timestamp,
})
}
Expand Down
50 changes: 49 additions & 1 deletion beacon_node/network/src/beacon_processor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,8 @@ use types::{
SyncCommitteeMessage, SyncSubnetId,
};
use work_reprocessing_queue::{
spawn_reprocess_scheduler, QueuedAggregate, QueuedRpcBlock, QueuedUnaggregate, ReadyWork,
spawn_reprocess_scheduler, QueuedAggregate, QueuedLightClientUpdate, QueuedRpcBlock,
QueuedUnaggregate, ReadyWork,
};

use worker::{Toolbox, Worker};
Expand Down Expand Up @@ -144,6 +145,10 @@ const MAX_GOSSIP_FINALITY_UPDATE_QUEUE_LEN: usize = 1_024;
/// before we start dropping them.
const MAX_GOSSIP_OPTIMISTIC_UPDATE_QUEUE_LEN: usize = 1_024;

/// The maximum number of queued `LightClientOptimisticUpdate` objects received on gossip that will be stored
/// for reprocessing before we start dropping them.
const MAX_GOSSIP_OPTIMISTIC_UPDATE_REPROCESS_QUEUE_LEN: usize = 128;

/// The maximum number of queued `SyncCommitteeMessage` objects that will be stored before we start dropping
/// them.
const MAX_SYNC_MESSAGE_QUEUE_LEN: usize = 2048;
Expand Down Expand Up @@ -233,6 +238,7 @@ pub const BLOBS_BY_ROOTS_REQUEST: &str = "blobs_by_roots_request";
pub const LIGHT_CLIENT_BOOTSTRAP_REQUEST: &str = "light_client_bootstrap";
pub const UNKNOWN_BLOCK_ATTESTATION: &str = "unknown_block_attestation";
pub const UNKNOWN_BLOCK_AGGREGATE: &str = "unknown_block_aggregate";
pub const UNKNOWN_LIGHT_CLIENT_UPDATE: &str = "unknown_light_client_update";
pub const GOSSIP_BLS_TO_EXECUTION_CHANGE: &str = "gossip_bls_to_execution_change";

/// A simple first-in-first-out queue with a maximum length.
Expand Down Expand Up @@ -781,6 +787,21 @@ impl<T: BeaconChainTypes> std::convert::From<ReadyWork<T>> for WorkEvent<T> {
seen_timestamp,
},
},
ReadyWork::LightClientUpdate(QueuedLightClientUpdate {
peer_id,
message_id,
light_client_optimistic_update,
seen_timestamp,
..
}) => Self {
drop_during_sync: true,
work: Work::UnknownLightClientOptimisticUpdate {
message_id,
peer_id,
light_client_optimistic_update,
seen_timestamp,
},
},
}
}
}
Expand Down Expand Up @@ -820,6 +841,12 @@ pub enum Work<T: BeaconChainTypes> {
aggregate: Box<SignedAggregateAndProof<T::EthSpec>>,
seen_timestamp: Duration,
},
UnknownLightClientOptimisticUpdate {
message_id: MessageId,
peer_id: PeerId,
light_client_optimistic_update: Box<LightClientOptimisticUpdate<T::EthSpec>>,
seen_timestamp: Duration,
},
GossipAggregateBatch {
packages: Vec<GossipAggregatePackage<T::EthSpec>>,
},
Expand Down Expand Up @@ -957,6 +984,7 @@ impl<T: BeaconChainTypes> Work<T> {
Work::LightClientBootstrapRequest { .. } => LIGHT_CLIENT_BOOTSTRAP_REQUEST,
Work::UnknownBlockAttestation { .. } => UNKNOWN_BLOCK_ATTESTATION,
Work::UnknownBlockAggregate { .. } => UNKNOWN_BLOCK_AGGREGATE,
Work::UnknownLightClientOptimisticUpdate { .. } => UNKNOWN_LIGHT_CLIENT_UPDATE,
Work::GossipBlsToExecutionChange { .. } => GOSSIP_BLS_TO_EXECUTION_CHANGE,
}
}
Expand Down Expand Up @@ -1092,6 +1120,8 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
// Using a FIFO queue for light client updates to maintain sequence order.
let mut finality_update_queue = FifoQueue::new(MAX_GOSSIP_FINALITY_UPDATE_QUEUE_LEN);
let mut optimistic_update_queue = FifoQueue::new(MAX_GOSSIP_OPTIMISTIC_UPDATE_QUEUE_LEN);
let mut unknown_light_client_update_queue =
FifoQueue::new(MAX_GOSSIP_OPTIMISTIC_UPDATE_REPROCESS_QUEUE_LEN);

// Using a FIFO queue since blocks need to be imported sequentially.
let mut rpc_block_queue = FifoQueue::new(MAX_RPC_BLOCK_QUEUE_LEN);
Expand Down Expand Up @@ -1483,6 +1513,9 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
Work::UnknownBlockAggregate { .. } => {
unknown_block_aggregate_queue.push(work)
}
Work::UnknownLightClientOptimisticUpdate { .. } => {
unknown_light_client_update_queue.push(work, work_id, &self.log)
}
Work::GossipBlsToExecutionChange { .. } => {
gossip_bls_to_execution_change_queue.push(work, work_id, &self.log)
}
Expand Down Expand Up @@ -1848,6 +1881,7 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
message_id,
peer_id,
*light_client_optimistic_update,
Some(work_reprocessing_tx),
seen_timestamp,
)
}),
Expand Down Expand Up @@ -1998,6 +2032,20 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
seen_timestamp,
)
}),
Work::UnknownLightClientOptimisticUpdate {
message_id,
peer_id,
light_client_optimistic_update,
seen_timestamp,
} => task_spawner.spawn_blocking(move || {
worker.process_gossip_optimistic_update(
message_id,
peer_id,
*light_client_optimistic_update,
None,
seen_timestamp,
)
}),
};
}
}
Expand Down
Loading

0 comments on commit f857811

Please sign in to comment.