From f2f6ed060625c92fa0581f475652999149d42ba5 Mon Sep 17 00:00:00 2001 From: tayfunelmas Date: Wed, 20 Mar 2024 14:29:43 -0700 Subject: [PATCH] feat: Implement measuring of network round-trip time for sending state witness and receiving the ack (#10824) The goal here is to measure the network time spent (see issue #10790) 1 when sending state-witness from chunk producers to validators and 2 when sending the endorsement message from chunk validators to block producers. But, for (2), we send an ack message (`ChunkStateWitness`) from the validator back to the chunk producer. This is not fully accurate but should give us some picture of the duration. We do not change the state-witness time to record the sent-time. Instead we keep the sent-time in the chunk producer (for which we add `ChunkStateWitnessTracker` to the `Client`) along with the hash of the chunk for which witness is created. Then upon receiving the ack message, we calculate the duration and store it in a metric in the chunk producer. Most of the logic is in `state_witness_tracker.rs`. Note that this implementation is for experimenting with network time estimates for now and, since we are not changing the witness messages, having ack messages is optional and we may remove it later when we have a stable implementation. We have not added a feature flag for it (though guarded by `statelessnet_protocol`) and may want to add a compile flag later before releasing `statelessnet_protocol` in mainnet. Note that we make the assumption that the messages are directly sent between nodes (no intermediary hop node). --------- Co-authored-by: Akhilesh Singhania Co-authored-by: Simonas Kazlauskas --- chain/client/src/adapter.rs | 1 + chain/client/src/client.rs | 4 + chain/client/src/client_actions.rs | 12 +- chain/client/src/metrics.rs | 12 + .../chunk_validator/mod.rs | 34 ++- chain/client/src/stateless_validation/mod.rs | 1 + .../state_witness_producer.rs | 25 ++- .../state_witness_tracker.rs | 205 ++++++++++++++++++ .../test_utils/client_actions_test_utils.rs | 3 + chain/client/src/test_utils/setup.rs | 3 + chain/network/src/client.rs | 9 +- chain/network/src/network_protocol/mod.rs | 5 + chain/network/src/peer/peer_actor.rs | 10 +- .../src/peer_manager/peer_manager_actor.rs | 8 + chain/network/src/types.rs | 6 +- core/primitives/src/stateless_validation.rs | 20 ++ .../features/multinode_test_loop_example.rs | 16 +- tools/chainsync-loadtest/src/network.rs | 1 + 18 files changed, 361 insertions(+), 14 deletions(-) create mode 100644 chain/client/src/stateless_validation/state_witness_tracker.rs diff --git a/chain/client/src/adapter.rs b/chain/client/src/adapter.rs index 3eaf714a4fa..fdc59b7c63f 100644 --- a/chain/client/src/adapter.rs +++ b/chain/client/src/adapter.rs @@ -50,6 +50,7 @@ pub fn client_sender_for_network( tx_status_response: view_client_addr.clone().into_sender(), announce_account: view_client_addr.into_sender(), chunk_state_witness: client_addr.clone().into_sender(), + chunk_state_witness_ack: client_addr.clone().into_sender(), chunk_endorsement: client_addr.into_sender(), } } diff --git a/chain/client/src/client.rs b/chain/client/src/client.rs index 5227f5e92fd..cc806da5929 100644 --- a/chain/client/src/client.rs +++ b/chain/client/src/client.rs @@ -7,6 +7,7 @@ use crate::debug::BlockProductionTracker; use crate::debug::PRODUCTION_TIMES_CACHE_SIZE; use crate::stateless_validation::chunk_endorsement_tracker::ChunkEndorsementTracker; use crate::stateless_validation::chunk_validator::ChunkValidator; +use crate::stateless_validation::state_witness_tracker::ChunkStateWitnessTracker; use crate::sync::adapter::SyncShardInfo; use crate::sync::block::BlockSync; use crate::sync::epoch::EpochSync; @@ -189,6 +190,8 @@ pub struct Client { pub chunk_inclusion_tracker: ChunkInclusionTracker, /// Tracks chunk endorsements received from chunk validators. Used to filter out chunks ready for inclusion pub chunk_endorsement_tracker: Arc, + /// Tracks a collection of state witnesses sent from chunk producers to chunk validators. + pub state_witness_tracker: ChunkStateWitnessTracker, // Optional value used for the Chunk Distribution Network Feature. chunk_distribution_network: Option, @@ -404,6 +407,7 @@ impl Client { chunk_validator, chunk_inclusion_tracker: ChunkInclusionTracker::new(), chunk_endorsement_tracker, + state_witness_tracker: ChunkStateWitnessTracker::new(clock), chunk_distribution_network, }) } diff --git a/chain/client/src/client_actions.rs b/chain/client/src/client_actions.rs index b9f0f9b78e4..91174c0bca9 100644 --- a/chain/client/src/client_actions.rs +++ b/chain/client/src/client_actions.rs @@ -40,8 +40,8 @@ use near_client_primitives::types::{ }; use near_network::client::{ BlockApproval, BlockHeadersResponse, BlockResponse, ChunkEndorsementMessage, - ChunkStateWitnessMessage, ProcessTxRequest, ProcessTxResponse, RecvChallenge, SetNetworkInfo, - StateResponse, + ChunkStateWitnessAckMessage, ChunkStateWitnessMessage, ProcessTxRequest, ProcessTxResponse, + RecvChallenge, SetNetworkInfo, StateResponse, }; use near_network::types::ReasonForBan; use near_network::types::{ @@ -1849,6 +1849,14 @@ impl ClientActionHandler for ClientActions { } } +impl ClientActionHandler for ClientActions { + type Result = (); + + fn handle(&mut self, msg: ChunkStateWitnessAckMessage) -> Self::Result { + self.client.process_chunk_state_witness_ack(msg.0); + } +} + impl ClientActionHandler for ClientActions { type Result = (); diff --git a/chain/client/src/metrics.rs b/chain/client/src/metrics.rs index a9bf2324448..395ec158e88 100644 --- a/chain/client/src/metrics.rs +++ b/chain/client/src/metrics.rs @@ -592,6 +592,18 @@ pub(crate) static ORPHAN_CHUNK_STATE_WITNESSES_TOTAL_COUNT: Lazy .unwrap() }); +pub(crate) static CHUNK_STATE_WITNESS_NETWORK_ROUNDTRIP_TIME: Lazy = Lazy::new( + || { + try_create_histogram_vec( + "near_chunk_state_witness_network_roundtrip_time", + "Time in seconds between sending state witness through the network to chunk producer and receiving the corresponding ack message", + &["witness_size_bucket"], + Some(exponential_buckets(0.001, 2.0, 20).unwrap()), + ) + .unwrap() + }, +); + pub(crate) static ORPHAN_CHUNK_STATE_WITNESS_POOL_SIZE: Lazy = Lazy::new(|| { try_create_int_gauge_vec( "near_orphan_chunk_state_witness_pool_size", diff --git a/chain/client/src/stateless_validation/chunk_validator/mod.rs b/chain/client/src/stateless_validation/chunk_validator/mod.rs index 476bd752e9d..6b50f94b5b8 100644 --- a/chain/client/src/stateless_validation/chunk_validator/mod.rs +++ b/chain/client/src/stateless_validation/chunk_validator/mod.rs @@ -6,7 +6,7 @@ use crate::stateless_validation::chunk_endorsement_tracker::ChunkEndorsementTrac use crate::{metrics, Client}; use itertools::Itertools; use near_async::futures::{AsyncComputationSpawner, AsyncComputationSpawnerExt}; -use near_async::messaging::Sender; +use near_async::messaging::{CanSend, Sender}; use near_chain::chain::{ apply_new_chunk, apply_old_chunk, NewChunkData, NewChunkResult, OldChunkData, OldChunkResult, ShardContext, StorageContext, @@ -27,7 +27,7 @@ use near_primitives::merkle::merklize; use near_primitives::receipt::Receipt; use near_primitives::sharding::{ChunkHash, ReceiptProof, ShardChunkHeader}; use near_primitives::stateless_validation::{ - ChunkEndorsement, ChunkStateWitness, ChunkStateWitnessInner, + ChunkEndorsement, ChunkStateWitness, ChunkStateWitnessAck, ChunkStateWitnessInner, }; use near_primitives::transaction::SignedTransaction; use near_primitives::types::chunk_extra::ChunkExtra; @@ -640,6 +640,14 @@ impl Client { witness: ChunkStateWitness, processing_done_tracker: Option, ) -> Result<(), Error> { + // Send the acknowledgement for the state witness back to the chunk producer. + // This is currently used for network roundtrip time measurement, so we do not need to + // wait for validation to finish. + if let Err(err) = self.send_state_witness_ack(&witness) { + tracing::warn!(target: "stateless_validation", error = &err as &dyn std::error::Error, + "Error sending chunk state witness acknowledgement"); + } + let prev_block_hash = witness.inner.chunk_header.prev_block_hash(); let prev_block = match self.chain.get_block(prev_block_hash) { Ok(block) => block, @@ -650,6 +658,7 @@ impl Client { } Err(err) => return Err(err), }; + self.process_chunk_state_witness_with_prev_block( witness, &prev_block, @@ -657,6 +666,27 @@ impl Client { ) } + fn send_state_witness_ack(&self, witness: &ChunkStateWitness) -> Result<(), Error> { + // First find the AccountId for the chunk producer and then send the ack to that account. + let chunk_header = &witness.inner.chunk_header; + let prev_block_hash = chunk_header.prev_block_hash(); + let epoch_id = self.epoch_manager.get_epoch_id_from_prev_block(prev_block_hash)?; + let chunk_producer = self.epoch_manager.get_chunk_producer( + &epoch_id, + chunk_header.height_created(), + chunk_header.shard_id(), + )?; + + self.network_adapter.send(PeerManagerMessageRequest::NetworkRequests( + NetworkRequests::ChunkStateWitnessAck( + chunk_producer, + ChunkStateWitnessAck::new(&witness), + ), + )); + + Ok(()) + } + pub fn process_chunk_state_witness_with_prev_block( &mut self, witness: ChunkStateWitness, diff --git a/chain/client/src/stateless_validation/mod.rs b/chain/client/src/stateless_validation/mod.rs index c92d97af73c..ed965f68e89 100644 --- a/chain/client/src/stateless_validation/mod.rs +++ b/chain/client/src/stateless_validation/mod.rs @@ -3,3 +3,4 @@ pub mod chunk_validator; pub mod processing_tracker; mod shadow_validate; mod state_witness_producer; +pub mod state_witness_tracker; diff --git a/chain/client/src/stateless_validation/state_witness_producer.rs b/chain/client/src/stateless_validation/state_witness_producer.rs index 6b61a83bcb4..87cc3785c5d 100644 --- a/chain/client/src/stateless_validation/state_witness_producer.rs +++ b/chain/client/src/stateless_validation/state_witness_producer.rs @@ -10,7 +10,8 @@ use near_primitives::hash::{hash, CryptoHash}; use near_primitives::receipt::Receipt; use near_primitives::sharding::{ChunkHash, ReceiptProof, ShardChunk, ShardChunkHeader}; use near_primitives::stateless_validation::{ - ChunkStateTransition, ChunkStateWitness, ChunkStateWitnessInner, StoredChunkStateTransitionData, + ChunkStateTransition, ChunkStateWitness, ChunkStateWitnessAck, ChunkStateWitnessInner, + StoredChunkStateTransitionData, }; use near_primitives::types::EpochId; use std::collections::HashMap; @@ -45,7 +46,7 @@ impl Client { .ordered_chunk_validators(); let my_signer = self.validator_signer.as_ref().ok_or(Error::NotAValidator)?.clone(); - let witness = { + let (witness, witness_size) = { let witness_inner = self.create_state_witness_inner( prev_block_header, prev_chunk_header, @@ -56,7 +57,7 @@ impl Client { metrics::CHUNK_STATE_WITNESS_TOTAL_SIZE .with_label_values(&[&chunk_header.shard_id().to_string()]) .observe(witness_size as f64); - ChunkStateWitness { inner: witness_inner, signature } + (ChunkStateWitness { inner: witness_inner, signature }, witness_size) }; if chunk_validators.contains(my_signer.validator_id()) { @@ -78,12 +79,30 @@ impl Client { chunk.chunk_hash(), chunk_validators, ); + + // Record the witness in order to match the incoming acks for measuring round-trip times. + // See process_chunk_state_witness_ack for the handling of the ack messages. + self.state_witness_tracker.record_witness_sent( + &witness, + witness_size, + chunk_validators.len(), + ); + self.network_adapter.send(PeerManagerMessageRequest::NetworkRequests( NetworkRequests::ChunkStateWitness(chunk_validators, witness), )); Ok(()) } + /// Handles the state witness ack message from the chunk validator. + /// It computes the round-trip time between sending the state witness and receiving + /// the ack message and updates the corresponding metric with it. + /// Currently we do not raise an error for handling of witness-ack messages, + /// as it is used only for tracking some networking metrics. + pub fn process_chunk_state_witness_ack(&mut self, witness_ack: ChunkStateWitnessAck) -> () { + self.state_witness_tracker.on_witness_ack_received(witness_ack); + } + pub(crate) fn create_state_witness_inner( &mut self, prev_block_header: &BlockHeader, diff --git a/chain/client/src/stateless_validation/state_witness_tracker.rs b/chain/client/src/stateless_validation/state_witness_tracker.rs new file mode 100644 index 00000000000..5be1af88505 --- /dev/null +++ b/chain/client/src/stateless_validation/state_witness_tracker.rs @@ -0,0 +1,205 @@ +use crate::metrics; +use borsh::{BorshDeserialize, BorshSerialize}; +use bytesize::ByteSize; +use lru::LruCache; +use near_async::time::Clock; +use near_primitives::sharding::ChunkHash; +use near_primitives::stateless_validation::{ChunkStateWitness, ChunkStateWitnessAck}; +use std::hash::Hash; + +/// Limit to the number of witnesses tracked. +/// +/// Other witnesses past this number are discarded (perhaps add a blurb on how.) +const CHUNK_STATE_WITNESS_MAX_RECORD_COUNT: usize = 50; + +/// Refers to a state witness sent from a chunk producer to a chunk validator. +/// +/// Used to map the incoming acknowledgement messages back to the timing information of +/// the originating witness record. +#[derive(Debug, Clone, Hash, PartialEq, Eq, BorshSerialize, BorshDeserialize)] +struct ChunkStateWitnessKey { + /// Hash of the chunk for which the state witness was generated. + chunk_hash: ChunkHash, +} + +impl ChunkStateWitnessKey { + pub fn new(witness: &ChunkStateWitness) -> Self { + Self { chunk_hash: witness.inner.chunk_header.chunk_hash() } + } +} + +struct ChunkStateWitnessRecord { + /// Size of the witness in bytes. + witness_size: usize, + /// Number of validators that the witness is sent to. + num_validators: usize, + /// Timestamp of when the chunk producer sends the state witness. + sent_timestamp: near_async::time::Instant, +} + +/// Tracks a collection of state witnesses sent from chunk producers to validators. +/// +/// This is currently used to calculate the round-trip time of sending the witness and +/// getting the ack message back, where the ack message is used as a proxy for the endorsement +/// message from validators to block producers to make an estimate of network time for sending +/// witness and receiving the endorsement. +pub struct ChunkStateWitnessTracker { + witnesses: LruCache, + clock: Clock, +} + +impl ChunkStateWitnessTracker { + pub fn new(clock: Clock) -> Self { + Self { witnesses: LruCache::new(CHUNK_STATE_WITNESS_MAX_RECORD_COUNT), clock } + } + + /// Adds a new witness message to track. + pub fn record_witness_sent( + &mut self, + witness: &ChunkStateWitness, + witness_size_in_bytes: usize, + num_validators: usize, + ) -> () { + let key = ChunkStateWitnessKey::new(witness); + tracing::trace!(target: "state_witness_tracker", witness_key=?key, + size=witness_size_in_bytes, "Recording state witness sent."); + self.witnesses.put( + key, + ChunkStateWitnessRecord { + num_validators, + witness_size: witness_size_in_bytes, + sent_timestamp: self.clock.now(), + }, + ); + } + + /// Handles an ack message for the witness. Calculates the round-trip duration and + /// records it in the corresponding metric. + pub fn on_witness_ack_received(&mut self, ack: ChunkStateWitnessAck) -> () { + let key = ChunkStateWitnessKey { chunk_hash: ack.chunk_hash }; + tracing::trace!(target: "state_witness_tracker", witness_key=?key, + "Received ack for state witness"); + if let Some(record) = self.witnesses.get_mut(&key) { + debug_assert!(record.num_validators > 0); + + Self::update_roundtrip_time_metric(record, &self.clock); + + // Cleanup the record if we received the acks from all the validators, otherwise update + // the number of validators from which we are expecting an ack message. + let remaining = record.num_validators.saturating_sub(1); + if remaining > 0 { + record.num_validators = remaining; + } else { + self.witnesses.pop(&key); + } + } + } + + /// Records the roundtrip time in metrics. + fn update_roundtrip_time_metric(record: &ChunkStateWitnessRecord, clock: &Clock) -> () { + let received_time = clock.now(); + if received_time > record.sent_timestamp { + metrics::CHUNK_STATE_WITNESS_NETWORK_ROUNDTRIP_TIME + .with_label_values(&[witness_size_bucket(record.witness_size)]) + .observe((received_time - record.sent_timestamp).as_seconds_f64()); + } + } + + #[cfg(test)] + fn get_record_for_witness( + &mut self, + witness: &ChunkStateWitness, + ) -> Option<&ChunkStateWitnessRecord> { + let key = ChunkStateWitnessKey::new(witness); + self.witnesses.get(&key) + } +} + +/// Buckets for state-witness size. +static SIZE_IN_BYTES_TO_BUCKET: &'static [(ByteSize, &str)] = &[ + (ByteSize::kb(1), "<1KB"), + (ByteSize::kb(10), "1-10KB"), + (ByteSize::kb(100), "10-100KB"), + (ByteSize::mb(1), "100KB-1MB"), + (ByteSize::mb(2), "1-2MB"), + (ByteSize::mb(3), "2-3MB"), + (ByteSize::mb(4), "3-4MB"), + (ByteSize::mb(5), "4-5MB"), + (ByteSize::mb(10), "5-10MB"), + (ByteSize::mb(20), "10-20MB"), +]; + +/// Returns the string representation of the size buckets for a given witness size in bytes. +fn witness_size_bucket(size_in_bytes: usize) -> &'static str { + for (upper_size, label) in SIZE_IN_BYTES_TO_BUCKET.iter() { + if size_in_bytes < upper_size.as_u64() as usize { + return *label; + } + } + ">20MB" +} + +#[cfg(test)] +mod state_witness_tracker_tests { + use super::*; + use near_async::time::{Duration, FakeClock, Utc}; + use near_primitives::hash::hash; + use near_primitives::types::ShardId; + + const NUM_VALIDATORS: usize = 3; + + #[test] + fn record_and_receive_ack_num_validators_decreased() { + let witness = dummy_witness(); + let clock = dummy_clock(); + let mut tracker = ChunkStateWitnessTracker::new(clock.clock()); + + tracker.record_witness_sent(&witness, 4321, NUM_VALIDATORS); + clock.advance(Duration::milliseconds(3444)); + + // Ack received from all "except for one". + for _ in 1..NUM_VALIDATORS { + tracker.on_witness_ack_received(ChunkStateWitnessAck::new(&witness)); + } + + let record = tracker.get_record_for_witness(&witness); + assert!(record.is_some()); + assert_eq!(record.unwrap().num_validators, 1); + } + + #[test] + fn record_and_receive_ack_record_deleted() { + let witness = dummy_witness(); + let clock = dummy_clock(); + let mut tracker = ChunkStateWitnessTracker::new(clock.clock()); + + tracker.record_witness_sent(&witness, 4321, NUM_VALIDATORS); + clock.advance(Duration::milliseconds(3444)); + + // Ack received from all. + for _ in 1..=NUM_VALIDATORS { + tracker.on_witness_ack_received(ChunkStateWitnessAck::new(&witness)); + } + + let record = tracker.get_record_for_witness(&witness); + assert!(record.is_none()); + } + + #[test] + fn choose_size_bucket() { + assert_eq!(witness_size_bucket(500), "<1KB"); + assert_eq!(witness_size_bucket(15_000), "10-100KB"); + assert_eq!(witness_size_bucket(250_000), "100KB-1MB"); + assert_eq!(witness_size_bucket(2_500_000), "2-3MB"); + assert_eq!(witness_size_bucket(7_500_000), "5-10MB"); + assert_eq!(witness_size_bucket(25_000_000), ">20MB"); + } + + fn dummy_witness() -> ChunkStateWitness { + ChunkStateWitness::new_dummy(100, 2 as ShardId, hash("fake hash".as_bytes())) + } + + fn dummy_clock() -> FakeClock { + FakeClock::new(Utc::from_unix_timestamp(1601510400).unwrap()) + } +} diff --git a/chain/client/src/test_utils/client_actions_test_utils.rs b/chain/client/src/test_utils/client_actions_test_utils.rs index 7f06bd79ad7..0b584c40648 100644 --- a/chain/client/src/test_utils/client_actions_test_utils.rs +++ b/chain/client/src/test_utils/client_actions_test_utils.rs @@ -32,6 +32,9 @@ pub fn forward_client_messages_from_network_to_client_actions( ClientSenderForNetworkMessage::_chunk_state_witness(msg) => { (msg.callback)(Ok(client_actions.handle(msg.message))); } + ClientSenderForNetworkMessage::_chunk_state_witness_ack(msg) => { + (msg.callback)(Ok(client_actions.handle(msg.message))); + } ClientSenderForNetworkMessage::_chunk_endorsement(msg) => { (msg.callback)(Ok(client_actions.handle(msg.message))); } diff --git a/chain/client/src/test_utils/setup.rs b/chain/client/src/test_utils/setup.rs index 8027bc7c2f9..acf979f67b2 100644 --- a/chain/client/src/test_utils/setup.rs +++ b/chain/client/src/test_utils/setup.rs @@ -857,6 +857,9 @@ pub fn setup_mock_all_validators( NetworkRequests::ChunkStateWitness(_, _) => { // TODO(#10265): Implement for integration tests. }, + NetworkRequests::ChunkStateWitnessAck(_, _) => { + // TODO(#10790): Implement for integration tests. + }, NetworkRequests::ChunkEndorsement(_, _) => { // TODO(#10265): Implement for integration tests. }, diff --git a/chain/network/src/client.rs b/chain/network/src/client.rs index fc2377aae91..064a31b89d8 100644 --- a/chain/network/src/client.rs +++ b/chain/network/src/client.rs @@ -6,7 +6,9 @@ use near_primitives::challenge::Challenge; use near_primitives::errors::InvalidTxError; use near_primitives::hash::CryptoHash; use near_primitives::network::{AnnounceAccount, PeerId}; -use near_primitives::stateless_validation::{ChunkEndorsement, ChunkStateWitness}; +use near_primitives::stateless_validation::{ + ChunkEndorsement, ChunkStateWitness, ChunkStateWitnessAck, +}; use near_primitives::transaction::SignedTransaction; use near_primitives::types::{AccountId, EpochId, ShardId}; use near_primitives::views::FinalExecutionOutcomeView; @@ -116,6 +118,10 @@ pub struct AnnounceAccountRequest(pub Vec<(AnnounceAccount, Option)>); #[rtype(result = "()")] pub struct ChunkStateWitnessMessage(pub ChunkStateWitness); +#[derive(actix::Message, Debug, Clone, PartialEq, Eq)] +#[rtype(result = "()")] +pub struct ChunkStateWitnessAckMessage(pub ChunkStateWitnessAck); + #[derive(actix::Message, Debug, Clone, PartialEq, Eq)] #[rtype(result = "()")] pub struct ChunkEndorsementMessage(pub ChunkEndorsement); @@ -142,5 +148,6 @@ pub struct ClientSenderForNetwork { pub announce_account: AsyncSender, ReasonForBan>>, pub chunk_state_witness: AsyncSender, + pub chunk_state_witness_ack: AsyncSender, pub chunk_endorsement: AsyncSender, } diff --git a/chain/network/src/network_protocol/mod.rs b/chain/network/src/network_protocol/mod.rs index fd436c0a2f6..26c88c7951d 100644 --- a/chain/network/src/network_protocol/mod.rs +++ b/chain/network/src/network_protocol/mod.rs @@ -9,6 +9,7 @@ mod state_sync; pub use edge::*; use near_primitives::stateless_validation::ChunkEndorsement; use near_primitives::stateless_validation::ChunkStateWitness; +use near_primitives::stateless_validation::ChunkStateWitnessAck; pub use peer::*; pub use state_sync::*; @@ -531,6 +532,7 @@ pub enum RoutedMessageBody { PartialEncodedChunkForward(PartialEncodedChunkForwardMsg), ChunkStateWitness(ChunkStateWitness), ChunkEndorsement(ChunkEndorsement), + ChunkStateWitnessAck(ChunkStateWitnessAck), } impl RoutedMessageBody { @@ -598,6 +600,9 @@ impl fmt::Debug for RoutedMessageBody { RoutedMessageBody::_UnusedVersionedStateResponse => write!(f, "VersionedStateResponse"), RoutedMessageBody::ChunkStateWitness(_) => write!(f, "ChunkStateWitness"), RoutedMessageBody::ChunkEndorsement(_) => write!(f, "ChunkEndorsement"), + RoutedMessageBody::ChunkStateWitnessAck(ack, ..) => { + f.debug_tuple("ChunkStateWitnessAck").field(&ack.chunk_hash).finish() + } } } } diff --git a/chain/network/src/peer/peer_actor.rs b/chain/network/src/peer/peer_actor.rs index 07ecdd281ac..dcc99931635 100644 --- a/chain/network/src/peer/peer_actor.rs +++ b/chain/network/src/peer/peer_actor.rs @@ -1,9 +1,9 @@ use crate::accounts_data::AccountDataError; use crate::client::{ AnnounceAccountRequest, BlockApproval, BlockHeadersRequest, BlockHeadersResponse, BlockRequest, - BlockResponse, ChunkEndorsementMessage, ChunkStateWitnessMessage, ProcessTxRequest, - RecvChallenge, StateRequestHeader, StateRequestPart, StateResponse, TxStatusRequest, - TxStatusResponse, + BlockResponse, ChunkEndorsementMessage, ChunkStateWitnessAckMessage, ChunkStateWitnessMessage, + ProcessTxRequest, RecvChallenge, StateRequestHeader, StateRequestPart, StateResponse, + TxStatusRequest, TxStatusResponse, }; use crate::concurrency::atomic_cell::AtomicCell; use crate::concurrency::demux; @@ -1016,6 +1016,10 @@ impl PeerActor { network_state.client.send_async(ChunkStateWitnessMessage(witness)).await.ok(); None } + RoutedMessageBody::ChunkStateWitnessAck(ack) => { + network_state.client.send_async(ChunkStateWitnessAckMessage(ack)).await.ok(); + None + } RoutedMessageBody::ChunkEndorsement(endorsement) => { network_state.client.send_async(ChunkEndorsementMessage(endorsement)).await.ok(); None diff --git a/chain/network/src/peer_manager/peer_manager_actor.rs b/chain/network/src/peer_manager/peer_manager_actor.rs index 773e5b33c9c..a7c5dc12992 100644 --- a/chain/network/src/peer_manager/peer_manager_actor.rs +++ b/chain/network/src/peer_manager/peer_manager_actor.rs @@ -976,6 +976,14 @@ impl PeerManagerActor { } NetworkResponses::NoResponse } + NetworkRequests::ChunkStateWitnessAck(target, ack) => { + self.state.send_message_to_account( + &self.clock, + &target, + RoutedMessageBody::ChunkStateWitnessAck(ack), + ); + NetworkResponses::NoResponse + } NetworkRequests::ChunkEndorsement(target, endorsement) => { self.state.send_message_to_account( &self.clock, diff --git a/chain/network/src/types.rs b/chain/network/src/types.rs index 31dee91f93c..6341cca3918 100644 --- a/chain/network/src/types.rs +++ b/chain/network/src/types.rs @@ -19,7 +19,9 @@ use near_primitives::challenge::Challenge; use near_primitives::hash::CryptoHash; use near_primitives::network::{AnnounceAccount, PeerId}; use near_primitives::sharding::PartialEncodedChunkWithArcReceipts; -use near_primitives::stateless_validation::{ChunkEndorsement, ChunkStateWitness}; +use near_primitives::stateless_validation::{ + ChunkEndorsement, ChunkStateWitness, ChunkStateWitnessAck, +}; use near_primitives::transaction::SignedTransaction; use near_primitives::types::{AccountId, BlockHeight, EpochHeight, ShardId}; use std::collections::{HashMap, HashSet}; @@ -258,6 +260,8 @@ pub enum NetworkRequests { Challenge(Challenge), /// A chunk's state witness. ChunkStateWitness(Vec, ChunkStateWitness), + /// Acknowledgement to a chunk's state witness, sent back to the originating chunk producer. + ChunkStateWitnessAck(AccountId, ChunkStateWitnessAck), /// Message for a chunk endorsement, sent by a chunk validator to the block producer. ChunkEndorsement(AccountId, ChunkEndorsement), } diff --git a/core/primitives/src/stateless_validation.rs b/core/primitives/src/stateless_validation.rs index afdcb31eff1..3dfc45cd944 100644 --- a/core/primitives/src/stateless_validation.rs +++ b/core/primitives/src/stateless_validation.rs @@ -24,6 +24,26 @@ pub struct ChunkStateWitness { pub signature: Signature, } +/// An acknowledgement sent from the chunk producer upon receiving the state witness to +/// the originator of the witness (chunk producer). +/// +/// This message is currently used for computing +/// the network round-trip time of sending the state witness to the chunk producer and receiving the +/// endorsement message. Note that the endorsement message is sent to the next block producer, +/// while this message is sent back to the originator of the state witness, though this allows +/// us to approximate the time for transmitting the state witness + transmitting the endorsement. +#[derive(Debug, Clone, PartialEq, Eq, BorshSerialize, BorshDeserialize)] +pub struct ChunkStateWitnessAck { + /// Hash of the chunk for which the state witness was generated. + pub chunk_hash: ChunkHash, +} + +impl ChunkStateWitnessAck { + pub fn new(witness_to_ack: &ChunkStateWitness) -> Self { + Self { chunk_hash: witness_to_ack.inner.chunk_header.chunk_hash() } + } +} + /// The state witness for a chunk; proves the state transition that the /// chunk attests to. #[derive(Debug, Clone, PartialEq, Eq, BorshSerialize, BorshDeserialize)] diff --git a/integration-tests/src/tests/client/features/multinode_test_loop_example.rs b/integration-tests/src/tests/client/features/multinode_test_loop_example.rs index a9549f77e76..e775219b427 100644 --- a/integration-tests/src/tests/client/features/multinode_test_loop_example.rs +++ b/integration-tests/src/tests/client/features/multinode_test_loop_example.rs @@ -37,8 +37,9 @@ use near_client::{Client, SyncAdapter, SyncMessage}; use near_epoch_manager::shard_tracker::{ShardTracker, TrackedConfig}; use near_epoch_manager::EpochManager; use near_network::client::{ - BlockApproval, BlockResponse, ChunkEndorsementMessage, ChunkStateWitnessMessage, - ClientSenderForNetwork, ClientSenderForNetworkMessage, ProcessTxRequest, + BlockApproval, BlockResponse, ChunkEndorsementMessage, ChunkStateWitnessAckMessage, + ChunkStateWitnessMessage, ClientSenderForNetwork, ClientSenderForNetworkMessage, + ProcessTxRequest, }; use near_network::shards_manager::ShardsManagerRequestFromNetwork; use near_network::test_loop::SupportsRoutingLookup; @@ -540,6 +541,17 @@ pub fn route_network_messages_to_client< } } } + NetworkRequests::ChunkStateWitnessAck(target, witness_ack) => { + let other_idx = data.index_for_account(&target); + if other_idx != idx { + drop( + client_senders[other_idx] + .send_async(ChunkStateWitnessAckMessage(witness_ack)), + ); + } else { + tracing::warn!("Dropping state-witness-ack message to self"); + } + } // TODO: Support more network message types as we expand the test. _ => return Err((idx, PeerManagerMessageRequest::NetworkRequests(request).into())), } diff --git a/tools/chainsync-loadtest/src/network.rs b/tools/chainsync-loadtest/src/network.rs index 5f3055bf2bf..ecd52f78f2f 100644 --- a/tools/chainsync-loadtest/src/network.rs +++ b/tools/chainsync-loadtest/src/network.rs @@ -264,6 +264,7 @@ impl Network { Ok(accounts.0.into_iter().map(|a| a.0).collect::>()) }), chunk_state_witness: noop().into_sender(), + chunk_state_witness_ack: noop().into_sender(), chunk_endorsement: noop().into_sender(), } }