Skip to content

Commit

Permalink
feat: Implement measuring of network round-trip time for sending stat…
Browse files Browse the repository at this point in the history
…e witness and receiving the ack (#10824)

The goal here is to measure the network time spent (see issue #10790)

1 when sending state-witness from chunk producers to validators and
2 when sending the endorsement message from chunk validators to block
producers.

But, for (2), we send an ack message (`ChunkStateWitness`) from the
validator back to the chunk producer. This is not fully accurate but
should give us some picture of the duration. We do not change the
state-witness time to record the sent-time. Instead we keep the
sent-time in the chunk producer (for which we add
`ChunkStateWitnessTracker` to the `Client`) along with the hash of the
chunk for which witness is created. Then upon receiving the ack message,
we calculate the duration and store it in a metric in the chunk
producer.

Most of the logic is in `state_witness_tracker.rs`. Note that this
implementation is for experimenting with network time estimates for now
and, since we are not changing the witness messages, having ack messages
is optional and we may remove it later when we have a stable
implementation. We have not added a feature flag for it (though guarded
by `statelessnet_protocol`) and may want to add a compile flag later
before releasing `statelessnet_protocol` in mainnet.

Note that we make the assumption that the messages are directly sent
between nodes (no intermediary hop node).

---------

Co-authored-by: Akhilesh Singhania <akhi@near.org>
Co-authored-by: Simonas Kazlauskas <github@kazlauskas.me>
  • Loading branch information
3 people authored Mar 20, 2024
1 parent 21fba5b commit f2f6ed0
Show file tree
Hide file tree
Showing 18 changed files with 361 additions and 14 deletions.
1 change: 1 addition & 0 deletions chain/client/src/adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ pub fn client_sender_for_network(
tx_status_response: view_client_addr.clone().into_sender(),
announce_account: view_client_addr.into_sender(),
chunk_state_witness: client_addr.clone().into_sender(),
chunk_state_witness_ack: client_addr.clone().into_sender(),
chunk_endorsement: client_addr.into_sender(),
}
}
4 changes: 4 additions & 0 deletions chain/client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use crate::debug::BlockProductionTracker;
use crate::debug::PRODUCTION_TIMES_CACHE_SIZE;
use crate::stateless_validation::chunk_endorsement_tracker::ChunkEndorsementTracker;
use crate::stateless_validation::chunk_validator::ChunkValidator;
use crate::stateless_validation::state_witness_tracker::ChunkStateWitnessTracker;
use crate::sync::adapter::SyncShardInfo;
use crate::sync::block::BlockSync;
use crate::sync::epoch::EpochSync;
Expand Down Expand Up @@ -189,6 +190,8 @@ pub struct Client {
pub chunk_inclusion_tracker: ChunkInclusionTracker,
/// Tracks chunk endorsements received from chunk validators. Used to filter out chunks ready for inclusion
pub chunk_endorsement_tracker: Arc<ChunkEndorsementTracker>,
/// Tracks a collection of state witnesses sent from chunk producers to chunk validators.
pub state_witness_tracker: ChunkStateWitnessTracker,

// Optional value used for the Chunk Distribution Network Feature.
chunk_distribution_network: Option<ChunkDistributionNetwork>,
Expand Down Expand Up @@ -404,6 +407,7 @@ impl Client {
chunk_validator,
chunk_inclusion_tracker: ChunkInclusionTracker::new(),
chunk_endorsement_tracker,
state_witness_tracker: ChunkStateWitnessTracker::new(clock),
chunk_distribution_network,
})
}
Expand Down
12 changes: 10 additions & 2 deletions chain/client/src/client_actions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ use near_client_primitives::types::{
};
use near_network::client::{
BlockApproval, BlockHeadersResponse, BlockResponse, ChunkEndorsementMessage,
ChunkStateWitnessMessage, ProcessTxRequest, ProcessTxResponse, RecvChallenge, SetNetworkInfo,
StateResponse,
ChunkStateWitnessAckMessage, ChunkStateWitnessMessage, ProcessTxRequest, ProcessTxResponse,
RecvChallenge, SetNetworkInfo, StateResponse,
};
use near_network::types::ReasonForBan;
use near_network::types::{
Expand Down Expand Up @@ -1849,6 +1849,14 @@ impl ClientActionHandler<ChunkStateWitnessMessage> for ClientActions {
}
}

impl ClientActionHandler<ChunkStateWitnessAckMessage> for ClientActions {
type Result = ();

fn handle(&mut self, msg: ChunkStateWitnessAckMessage) -> Self::Result {
self.client.process_chunk_state_witness_ack(msg.0);
}
}

impl ClientActionHandler<ChunkEndorsementMessage> for ClientActions {
type Result = ();

Expand Down
12 changes: 12 additions & 0 deletions chain/client/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -592,6 +592,18 @@ pub(crate) static ORPHAN_CHUNK_STATE_WITNESSES_TOTAL_COUNT: Lazy<IntCounterVec>
.unwrap()
});

pub(crate) static CHUNK_STATE_WITNESS_NETWORK_ROUNDTRIP_TIME: Lazy<HistogramVec> = Lazy::new(
|| {
try_create_histogram_vec(
"near_chunk_state_witness_network_roundtrip_time",
"Time in seconds between sending state witness through the network to chunk producer and receiving the corresponding ack message",
&["witness_size_bucket"],
Some(exponential_buckets(0.001, 2.0, 20).unwrap()),
)
.unwrap()
},
);

pub(crate) static ORPHAN_CHUNK_STATE_WITNESS_POOL_SIZE: Lazy<IntGaugeVec> = Lazy::new(|| {
try_create_int_gauge_vec(
"near_orphan_chunk_state_witness_pool_size",
Expand Down
34 changes: 32 additions & 2 deletions chain/client/src/stateless_validation/chunk_validator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use crate::stateless_validation::chunk_endorsement_tracker::ChunkEndorsementTrac
use crate::{metrics, Client};
use itertools::Itertools;
use near_async::futures::{AsyncComputationSpawner, AsyncComputationSpawnerExt};
use near_async::messaging::Sender;
use near_async::messaging::{CanSend, Sender};
use near_chain::chain::{
apply_new_chunk, apply_old_chunk, NewChunkData, NewChunkResult, OldChunkData, OldChunkResult,
ShardContext, StorageContext,
Expand All @@ -27,7 +27,7 @@ use near_primitives::merkle::merklize;
use near_primitives::receipt::Receipt;
use near_primitives::sharding::{ChunkHash, ReceiptProof, ShardChunkHeader};
use near_primitives::stateless_validation::{
ChunkEndorsement, ChunkStateWitness, ChunkStateWitnessInner,
ChunkEndorsement, ChunkStateWitness, ChunkStateWitnessAck, ChunkStateWitnessInner,
};
use near_primitives::transaction::SignedTransaction;
use near_primitives::types::chunk_extra::ChunkExtra;
Expand Down Expand Up @@ -640,6 +640,14 @@ impl Client {
witness: ChunkStateWitness,
processing_done_tracker: Option<ProcessingDoneTracker>,
) -> Result<(), Error> {
// Send the acknowledgement for the state witness back to the chunk producer.
// This is currently used for network roundtrip time measurement, so we do not need to
// wait for validation to finish.
if let Err(err) = self.send_state_witness_ack(&witness) {
tracing::warn!(target: "stateless_validation", error = &err as &dyn std::error::Error,
"Error sending chunk state witness acknowledgement");
}

let prev_block_hash = witness.inner.chunk_header.prev_block_hash();
let prev_block = match self.chain.get_block(prev_block_hash) {
Ok(block) => block,
Expand All @@ -650,13 +658,35 @@ impl Client {
}
Err(err) => return Err(err),
};

self.process_chunk_state_witness_with_prev_block(
witness,
&prev_block,
processing_done_tracker,
)
}

fn send_state_witness_ack(&self, witness: &ChunkStateWitness) -> Result<(), Error> {
// First find the AccountId for the chunk producer and then send the ack to that account.
let chunk_header = &witness.inner.chunk_header;
let prev_block_hash = chunk_header.prev_block_hash();
let epoch_id = self.epoch_manager.get_epoch_id_from_prev_block(prev_block_hash)?;
let chunk_producer = self.epoch_manager.get_chunk_producer(
&epoch_id,
chunk_header.height_created(),
chunk_header.shard_id(),
)?;

self.network_adapter.send(PeerManagerMessageRequest::NetworkRequests(
NetworkRequests::ChunkStateWitnessAck(
chunk_producer,
ChunkStateWitnessAck::new(&witness),
),
));

Ok(())
}

pub fn process_chunk_state_witness_with_prev_block(
&mut self,
witness: ChunkStateWitness,
Expand Down
1 change: 1 addition & 0 deletions chain/client/src/stateless_validation/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@ pub mod chunk_validator;
pub mod processing_tracker;
mod shadow_validate;
mod state_witness_producer;
pub mod state_witness_tracker;
25 changes: 22 additions & 3 deletions chain/client/src/stateless_validation/state_witness_producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ use near_primitives::hash::{hash, CryptoHash};
use near_primitives::receipt::Receipt;
use near_primitives::sharding::{ChunkHash, ReceiptProof, ShardChunk, ShardChunkHeader};
use near_primitives::stateless_validation::{
ChunkStateTransition, ChunkStateWitness, ChunkStateWitnessInner, StoredChunkStateTransitionData,
ChunkStateTransition, ChunkStateWitness, ChunkStateWitnessAck, ChunkStateWitnessInner,
StoredChunkStateTransitionData,
};
use near_primitives::types::EpochId;
use std::collections::HashMap;
Expand Down Expand Up @@ -45,7 +46,7 @@ impl Client {
.ordered_chunk_validators();

let my_signer = self.validator_signer.as_ref().ok_or(Error::NotAValidator)?.clone();
let witness = {
let (witness, witness_size) = {
let witness_inner = self.create_state_witness_inner(
prev_block_header,
prev_chunk_header,
Expand All @@ -56,7 +57,7 @@ impl Client {
metrics::CHUNK_STATE_WITNESS_TOTAL_SIZE
.with_label_values(&[&chunk_header.shard_id().to_string()])
.observe(witness_size as f64);
ChunkStateWitness { inner: witness_inner, signature }
(ChunkStateWitness { inner: witness_inner, signature }, witness_size)
};

if chunk_validators.contains(my_signer.validator_id()) {
Expand All @@ -78,12 +79,30 @@ impl Client {
chunk.chunk_hash(),
chunk_validators,
);

// Record the witness in order to match the incoming acks for measuring round-trip times.
// See process_chunk_state_witness_ack for the handling of the ack messages.
self.state_witness_tracker.record_witness_sent(
&witness,
witness_size,
chunk_validators.len(),
);

self.network_adapter.send(PeerManagerMessageRequest::NetworkRequests(
NetworkRequests::ChunkStateWitness(chunk_validators, witness),
));
Ok(())
}

/// Handles the state witness ack message from the chunk validator.
/// It computes the round-trip time between sending the state witness and receiving
/// the ack message and updates the corresponding metric with it.
/// Currently we do not raise an error for handling of witness-ack messages,
/// as it is used only for tracking some networking metrics.
pub fn process_chunk_state_witness_ack(&mut self, witness_ack: ChunkStateWitnessAck) -> () {
self.state_witness_tracker.on_witness_ack_received(witness_ack);
}

pub(crate) fn create_state_witness_inner(
&mut self,
prev_block_header: &BlockHeader,
Expand Down
Loading

0 comments on commit f2f6ed0

Please sign in to comment.