Skip to content

Commit

Permalink
[mempool] Specify number of workers for mempool threads. Add promethe…
Browse files Browse the repository at this point in the history
…us metrics for networking. (#1113)

* function to revoke vouch

* vouch revoking apis

* add test

* set rotation can only expand by a certain number of nodes on every epoch. So that no more than 25% of the next validator set has unproven nodes. This is based on the amount of nodes that we know to keep consensus (case 1 and 2).

* updating tests and patch implementation

* patch reconfig case_2

* make sure jailed nodes are dropping

* increase threshold for voting

* update writeset for rescue to include recovert mode

* patch build

* remove recovery mode

* expand epoch set by 1/6th

* make setting recovery mode optional from CLI

* add debug prints

* debug prints

* debug prints and comments.

* add debug prints and comments

* notes

* comment the mempool config params

* find where we could create backpressure on mempool

* prints for debugging

* WIP experimental backpressure on shared mempool consensus requests. Does not build.

* change node.yaml default params for state_sync and mempool

* Fix build

* adding prometheus counters

* patch build

* State sync debug (#1117)

* Release v5.1.1 (#1114)

* function to revoke vouch

* vouch revoking apis

* add test

* set rotation can only expand by a certain number of nodes on every epoch. So that no more than 25% of the next validator set has unproven nodes. This is based on the amount of nodes that we know to keep consensus (case 1 and 2).

* updating tests and patch implementation

* patch reconfig case_2

* make sure jailed nodes are dropping

* increase threshold for voting

* update writeset for rescue to include recovert mode

* patch build

* remove recovery mode

* expand epoch set by 1/6th

* make setting recovery mode optional from CLI

* impove mock case 1 helper

* patch onboarding reconfig

* patch mock tests

* refactored tests that use mock_

* build stdlib for release

* update 0L default configs for mempool and state sync

* bump version

* changelog

* Update 5_1_1.md

* adds some more metrics

* adds more metrics

* [move] [Fast Track Proposal] Turn down the heat on Cost To Exist (#1119)

* defer for 90 days cost to inactives, and reduce the cost of burn by only implementing at steady state.

* burn should be the default if user has not set send to community explicitly

* exchanges some dbg! statements with debug! statements to be able to control log output

Co-authored-by: 0o-de-lally <1364012+0o-de-lally@users.noreply.github.com>

Co-authored-by: Gökhan Şimşek <g.simsek@paralect.com>
Co-authored-by: Sven Panko <info@intuitiveminds.de>
  • Loading branch information
3 people authored Jun 1, 2022
1 parent ad9c806 commit 2e9b257
Show file tree
Hide file tree
Showing 13 changed files with 403 additions and 33 deletions.
16 changes: 16 additions & 0 deletions config/src/config/mempool_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,34 @@ use serde::{Deserialize, Serialize};
#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
#[serde(default, deny_unknown_fields)]
pub struct MempoolConfig {
/// What is the total size of the mempool queue, including invalid txs.
pub capacity: usize,
/// How many txs can each user have in the mempool at a given time.
pub capacity_per_user: usize,
// a threshold for fullnodes to determine which peers to broadcast to.
// peers which are go over this threshold, will receive broadcasts.
// number of failovers to broadcast to when the primary network is alive
pub default_failovers: usize,
// number of times a mempool broadcast gets re-sent to a peer if the previous was unacknowledged.
pub max_broadcasts_per_peer: usize,
// how often to snapshot the mempool for analytics purposes.
pub mempool_snapshot_interval_secs: u64,
// how long to wait for a peer after a broadcast was submitted, before we mark it as unacknowledged.
pub shared_mempool_ack_timeout_ms: u64,
// if peer_manager is in backoff mode mempool/src/shared_mempool/peer_manager.rs
// this is the base interval for backing off.
pub shared_mempool_backoff_interval_ms: u64,

// size of batch from mempool timeline to broadcast to peers.
pub shared_mempool_batch_size: usize,
// Number of workers to be spawned to receive inbound shared mempool broadcasts.
pub shared_mempool_max_concurrent_inbound_syncs: usize,
// the default interval to execute shared mempool broadcasts to peers.
// this is overriden when peer is in backoff mode.
pub shared_mempool_tick_interval_ms: u64,
/// when a transaction gets automatically garbage collected by system. Different than user tx expiry which has separate GC
pub system_transaction_timeout_secs: u64,
/// tick interval for system GC.
pub system_transaction_gc_interval_ms: u64,
}

Expand Down
5 changes: 5 additions & 0 deletions config/src/config/state_sync_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,16 @@ pub struct StateSyncConfig {
// The timeout of the state sync client to process a commit notification (in milliseconds)
pub client_commit_timeout_ms: u64,
// default timeout used for long polling to remote peer
// this is only used by fullnodes
pub long_poll_timeout_ms: u64,
// valid maximum chunk limit for sanity check
pub max_chunk_limit: u64,
// valid maximum timeout limit for sanity check
// This timeout applies to the process_request_for_target_and_highest
// if the chunk cannot be applied now, then insert it in a subscription to appply. The subscription expires at max_timeout_ms
pub max_timeout_ms: u64,
// The timeout of the state sync coordinator to receive a commit ack from mempool (in milliseconds)
// Stops sending
pub mempool_commit_timeout_ms: u64,
// default timeout to make state sync progress by sending chunk requests to a certain number of networks
// if no progress is made by sending chunk requests to a number of networks,
Expand All @@ -26,6 +30,7 @@ pub struct StateSyncConfig {
// commits when processing a sync request).
pub sync_request_timeout_ms: u64,
// interval used for checking state synchronization progress
// IMPORTANT: the mempool peer ack timeout is determined by 2X this number.
pub tick_interval_ms: u64,
}

Expand Down
6 changes: 3 additions & 3 deletions execution/executor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -618,7 +618,7 @@ impl<V: VMExecutor> ChunkExecutor for Executor<V> {

let latency = start_time.elapsed();
metrics_timer_vl.observe_duration();
dbg!("verify_chunk latency", &latency);
debug!("verify_chunk latency: {:?}", &latency);

// 3. Execute transactions.

Expand All @@ -632,7 +632,7 @@ impl<V: VMExecutor> ChunkExecutor for Executor<V> {

let latency = start_time.elapsed();
metrics_timer_el.observe_duration();
dbg!("execute_chunk latency", &latency);
debug!("execute_chunk latency: {:?}", &latency);


// temp time the transaction execution.
Expand All @@ -657,7 +657,7 @@ impl<V: VMExecutor> ChunkExecutor for Executor<V> {

let latency = start_time.elapsed();
metrics_timer_stxl.observe_duration();
dbg!("save_transactions latency", &latency);
debug!("save_transactions latency: {:?}", &latency);

// 5. Cache maintenance.
let output_trees = output.executed_trees().clone();
Expand Down
Binary file modified language/diem-framework/staged/stdlib.mv
Binary file not shown.
14 changes: 3 additions & 11 deletions language/diem-vm/src/diem_transaction_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -512,8 +512,8 @@ impl DiemVM {
.map(|_return_vals| ())
.or_else(|e| {
println!("error here\n");
dbg!(&proposer);
dbg!(&previous_vote);
debug!("proposer: {:?}", &proposer);
debug!("previous vote: {:?}", &previous_vote);

expect_only_successful_execution(e, BLOCK_PROLOGUE.as_str(), log_context)
})?;
Expand Down Expand Up @@ -777,16 +777,8 @@ impl DiemVM {
let (vm_status, output, sender) =
self.execute_single_transaction(&txn, data_cache, &log_context)?;

// match &txn {
// PreprocessedTransaction::UserTransaction(t) => {
// dbg!(&t.sequence_number());
// },
// _ => {},
// };
// dbg!("tx sender", &sender);
// let latency = start_time.elapsed();
metric_single_tx_lat.observe_duration();
// dbg!("single tx latency", &latency);


if !output.status().is_discarded() {
data_cache.push_write_set(output.write_set());
Expand Down
73 changes: 73 additions & 0 deletions mempool/src/counters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,10 @@ pub const STATE_SYNC_EVENT_LABEL: &str = "state_sync";
pub const RECONFIG_EVENT_LABEL: &str = "reconfig";
pub const PEER_BROADCAST_EVENT_LABEL: &str = "peer_broadcast";

//////// 0L ////////
pub const CONSENSUS_REQUEST_LABEL: &str = "consensus_request";


// task spawn stage labels
pub const SPAWN_LABEL: &str = "spawn";
pub const START_LABEL: &str = "start";
Expand Down Expand Up @@ -449,3 +453,72 @@ pub static MAIN_LOOP: Lazy<DurationHistogram> = Lazy::new(|| {
.unwrap(),
)
});



//////// 0L ////////
/// Counter for my node
pub static SELF_REQUEST_BACKOFF: Lazy<IntCounter> = Lazy::new(|| {
register_int_counter!(
"diem_mempool_self_request_backoff",
"Number of times my node requested backoff"
)
.unwrap()
});

pub static COORDINATOR_HANDLE_CLIENT_EVENT: Lazy<IntCounter> = Lazy::new(|| {
register_int_counter!(
"diem_mempool_coordinator_handle_client_event",
"Number of times a client event was handled in mempool"
)
.unwrap()
});

pub static COORDINATOR_HANDLE_CONSENSUS_EVENT: Lazy<IntCounter> = Lazy::new(|| {
register_int_counter!(
"diem_mempool_coordinator_handle_consensus_event",
"Number of times a consensus event was handled in mempool"
)
.unwrap()
});

pub static COORDINATOR_HANDLE_STATE_SYNC_EVENT: Lazy<IntCounter> = Lazy::new(|| {
register_int_counter!(
"diem_mempool_coordinator_handle_state_sync_event",
"Number of times a state-sync event was handled in mempool"
)
.unwrap()
});

pub static COORDINATOR_HANDLE_MEMPOOL_RECONFIG_EVENT: Lazy<IntCounter> = Lazy::new(|| {
register_int_counter!(
"diem_mempool_coordinator_handle_mempool_reconfig_event",
"Number of times a mempool reconfiguration event was handled in mempool"
)
.unwrap()
});

pub static TASKS_PROCESS_TX_BROADCAST_EVENT: Lazy<IntCounter> = Lazy::new(|| {
register_int_counter!(
"diem_mempool_tasks_process_tx_broadcast_event",
"Number of times a transaction broadcast event was handled in mempool"
)
.unwrap()
});

pub static TASKS_PROCESS_CONSENSUS_REQUEST_EVENT: Lazy<IntCounter> = Lazy::new(|| {
register_int_counter!(
"diem_mempool_tasks_process_consensus_request_event",
"Number of times a consensus request was processed in mempool"
)
.unwrap()
});

pub static PEER_MANAGER_PEER_REQUESTED_BACKOFF: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"diem_mempool_peer_requested_backoff",
"Number of backoff requests from peers",
&["network", "peer"]
)
.unwrap()
});
38 changes: 37 additions & 1 deletion mempool/src/shared_mempool/coordinator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,21 +73,36 @@ pub(crate) async fn coordinator<V>(
let _timer = counters::MAIN_LOOP.start_timer();
::futures::select! {
(msg, callback) = client_events.select_next_some() => {
debug!("handle_client_event");
counters::COORDINATOR_HANDLE_CLIENT_EVENT.inc();
handle_client_event(&mut smp, &bounded_executor, msg, callback).await;
},
// 0L TODO: execute mempool tasks in a bounded execution with capacity.
msg = consensus_requests.select_next_some() => {
tasks::process_consensus_request(&smp.mempool, msg).await;
debug!("process_consensus_request");
counters::COORDINATOR_HANDLE_CONSENSUS_EVENT.inc();
//////// 0L ////////
// The goal here is to put consensus requests also in a Tokio Semaphore (diem BoundedExecutor) where we can control the amount of workers and put backpressure.

handle_consensus_request(&mut smp, &bounded_executor, msg).await;
// tasks::process_consensus_request(&smp.mempool, msg).await;
}
msg = state_sync_requests.select_next_some() => {
debug!("state_sync_requests");
counters::COORDINATOR_HANDLE_STATE_SYNC_EVENT.inc();
handle_state_sync_request(&mut smp, msg);
}
config_update = mempool_reconfig_events.select_next_some() => {
debug!("handle_mempool_reconfig_event");
counters::COORDINATOR_HANDLE_MEMPOOL_RECONFIG_EVENT.inc();
handle_mempool_reconfig_event(&mut smp, &bounded_executor, config_update).await;
},
(peer, backoff) = scheduled_broadcasts.select_next_some() => {
tasks::execute_broadcast(peer, backoff, &mut smp, &mut scheduled_broadcasts, executor.clone());
},
(network_id, event) = events.select_next_some() => {
// dbg!("handle_event", &event.);

handle_event(&executor, &bounded_executor, &mut scheduled_broadcasts, &mut smp, network_id, event).await;
},
complete => break,
Expand Down Expand Up @@ -124,6 +139,27 @@ async fn handle_client_event<V>(
.await;
}

//////// 0L ////////
async fn handle_consensus_request<V>(
smp: &mut SharedMempool<V>,
bounded_executor: &BoundedExecutor,
msg: ConsensusRequest,
) where
V: TransactionValidation,
{
// This timer measures how long it took for the bounded executor to *schedule* the
// task.
let _timer =
counters::task_spawn_latency_timer(counters::CONSENSUS_REQUEST_LABEL, counters::SPAWN_LABEL);
// This timer measures how long it took for the task to go from scheduled to started.
let _task_start_timer =
counters::task_spawn_latency_timer(counters::CONSENSUS_REQUEST_LABEL, counters::START_LABEL);

bounded_executor
.spawn(tasks::process_consensus_request(smp.clone(), msg))
.await;
}

fn handle_state_sync_request<V>(smp: &mut SharedMempool<V>, msg: CommitNotification)
where
V: TransactionValidation,
Expand Down
18 changes: 16 additions & 2 deletions mempool/src/shared_mempool/peer_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ pub(crate) type PeerSyncStates = HashMap<PeerNetworkId, PeerSyncState>;
/// State of last sync with peer:
/// `timeline_id` is position in log of ready transactions
/// `is_alive` - is connection healthy
#[derive(Clone)]
#[derive(Clone, Debug)]
pub(crate) struct PeerSyncState {
pub timeline_id: u64,
pub is_alive: bool,
Expand Down Expand Up @@ -81,7 +81,7 @@ impl Ord for BatchId {
}

/// Txn broadcast-related info for a given remote peer.
#[derive(Clone)]
#[derive(Clone, Debug)]
pub struct BroadcastInfo {
// Sent broadcasts that have not yet received an ack.
pub sent_batches: BTreeMap<BatchId, SystemTime>,
Expand Down Expand Up @@ -140,6 +140,7 @@ impl PeerManager {

/// Disables a peer if it can be restarted, otherwise removes it
pub fn disable_peer(&self, peer: PeerNetworkId) {
error!("shared mempool disable peer {:?}", &peer);
// Remove all state on the peer, and start over
self.peer_states.lock().remove(&peer);
counters::active_upstream_peers(&peer.raw_network_id()).dec();
Expand All @@ -150,6 +151,7 @@ impl PeerManager {

pub fn is_backoff_mode(&self, peer: &PeerNetworkId) -> bool {
if let Some(state) = self.peer_states.lock().get(peer) {
warn!("shared mempool is in backoff mode for peer: {:?} ", &peer);
state.broadcast_info.backoff_mode
} else {
// If we don't have sync state, we shouldn't backoff
Expand All @@ -165,6 +167,10 @@ impl PeerManager {
) where
V: TransactionValidation,
{

// dbg!("execute broadcast");
// dbg!(&self.peer_states);

// Start timer for tracking broadcast latency.
let start_time = Instant::now();

Expand All @@ -178,6 +184,7 @@ impl PeerManager {

// Only broadcast to peers that are alive.
if !state.is_alive {
error!("shared mempool peer is not alive: {:?}", &state.metadata);
return;
}

Expand Down Expand Up @@ -242,6 +249,7 @@ impl PeerManager {
// This helps rate-limit egress network bandwidth and not overload a remote peer or this
// node's Diem network sender.
if pending_broadcasts >= self.mempool_config.max_broadcasts_per_peer {
error!("will stop broadcasting shared mempool to peer: {:?}", &peer);
return;
}
}
Expand Down Expand Up @@ -370,6 +378,7 @@ impl PeerManager {
let _ = std::mem::replace(&mut *prioritized_peers, peers);
}

/// Node receives ack from peer.
pub fn process_broadcast_ack(
&self,
peer: PeerNetworkId,
Expand Down Expand Up @@ -433,6 +442,11 @@ impl PeerManager {
// as a backoff broadcast.
// This ensures backpressure request from remote peer is honored at least once.
if backoff {
counters::PEER_MANAGER_PEER_REQUESTED_BACKOFF.with_label_values(&[
&peer.raw_network_id().to_string(),
&peer.peer_id().to_string(),
]).inc();
error!("Peer requested backoff: {:?}", &peer);
sync_state.broadcast_info.backoff_mode = true;
}
}
Expand Down
Loading

0 comments on commit 2e9b257

Please sign in to comment.