Monitor most spawned futures
mystenmark committed Nov 1, 2022
1 parent d1d31a5 commit 0c36722
Showing 11 changed files with 83 additions and 57 deletions.
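
The pattern in every file below is the same: futures that are pushed into a `FuturesUnordered` collection, or boxed and awaited in a local select loop rather than handed directly to `tokio::spawn`, are now wrapped in `sui_metrics::monitored_future!` so they become visible to the metrics system alongside the tasks already covered by `spawn_monitored_task!`. The macro's definition is not part of this diff; the sketch below only illustrates the general idea, and every name in it (`monitor`, `InFlightGuard`, `FUTURES_IN_FLIGHT`, the metric name) is an assumption, not the actual `sui_metrics` implementation.

```rust
// Hypothetical sketch of a future-monitoring helper; illustrative only.
use std::future::Future;

use once_cell::sync::Lazy;
use prometheus::{register_int_gauge_vec, IntGauge, IntGaugeVec};

// One gauge per call site, labelled by file:line, counting futures in flight.
static FUTURES_IN_FLIGHT: Lazy<IntGaugeVec> = Lazy::new(|| {
    register_int_gauge_vec!(
        "monitored_futures_in_flight",
        "Number of monitored futures currently in flight",
        &["location"]
    )
    .unwrap()
});

// Drop guard: decrements the gauge even if the future is cancelled mid-flight.
struct InFlightGuard(IntGauge);

impl Drop for InFlightGuard {
    fn drop(&mut self) {
        self.0.dec();
    }
}

/// Wraps `fut` so the gauge reflects how many instances are currently running.
pub async fn monitor<F: Future>(location: &'static str, fut: F) -> F::Output {
    let gauge = FUTURES_IN_FLIGHT.with_label_values(&[location]);
    gauge.inc();
    let _guard = InFlightGuard(gauge);
    fut.await
}

/// Macro form matching the call sites in this commit:
/// `monitored_future!(async move { ... })` or `monitored_future!(some_async_fn(..))`.
#[macro_export]
macro_rules! monitored_future {
    ($fut:expr) => {
        $crate::monitor(concat!(file!(), ":", line!()), $fut)
    };
}
```

In a sketch like this, a drop guard matters because most of these futures live in `FuturesUnordered` collections that are routinely dropped or cancelled, and the gauge should not leak on cancellation.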
9 changes: 5 additions & 4 deletions crates/sui-core/src/authority_active/gossip/mod.rs
@@ -15,6 +15,7 @@ use prometheus::{
use std::future::Future;
use std::ops::Deref;
use std::{collections::HashSet, sync::Arc, time::Duration};
use sui_metrics::monitored_future;
use sui_types::committee::StakeUnit;
use sui_types::{
base_types::{AuthorityName, ExecutionDigests},
@@ -205,7 +206,7 @@ async fn follower_process<A, Handler: DigestHandler<A> + Clone>(
peer_names.insert(name);
let local_active_ref_copy = local_active.clone();
let handler_clone = handler.clone();
gossip_tasks.push(async move {
gossip_tasks.push(monitored_future!(async move {
let follower = Follower::new(name, &local_active_ref_copy);
// Add more duration if we make more than 1 to ensure overlap
debug!(peer = ?name, "Starting gossip from peer");
@@ -215,7 +216,7 @@ async fn follower_process<A, Handler: DigestHandler<A> + Clone>(
handler_clone,
)
.await
});
}));
k += 1;

// If we have already used all the good stake, then stop here and
@@ -445,10 +446,10 @@ where
metrics.total_tx_received.inc();

let fut = handler.handle_digest(self, digests).await?;
results.push_back(async move {
results.push_back(monitored_future!(async move {
fut.await?;
Ok::<(TxSequenceNumber, ExecutionDigests), SuiError>((seq, digests))
});
}));

self.state.metrics.gossip_queued_count.inc();
},
14 changes: 7 additions & 7 deletions crates/sui-core/src/authority_aggregator.rs
@@ -16,7 +16,7 @@ use move_core_types::value::MoveStructLayout;
use mysten_network::config::Config;
use sui_config::genesis::Genesis;
use sui_config::NetworkConfig;
use sui_metrics::spawn_monitored_task;
use sui_metrics::{monitored_future, spawn_monitored_task};
use sui_network::{
default_mysten_network_config, DEFAULT_CONNECT_TIMEOUT_SEC, DEFAULT_REQUEST_TIMEOUT_SEC,
};
@@ -744,14 +744,14 @@ where
.map(|name| {
let client = &self.authority_clients[name];
let execute = map_each_authority.clone();
async move {
monitored_future!(async move {
(
*name,
execute(*name, client)
.instrument(tracing::trace_span!("quorum_map_auth", authority =? name.concise()))
.await,
)
}
})
})
.collect();

@@ -818,19 +818,19 @@

let start_req = |name: AuthorityName, client: SafeClient<A>| {
let map_each_authority = map_each_authority.clone();
Box::pin(async move {
Box::pin(monitored_future!(async move {
trace!(?name, now = ?tokio::time::Instant::now() - start, "new request");
let map = map_each_authority(name, client);
Event::Request(name, timeout(timeout_each_authority, map).await)
})
}))
};

let schedule_next = || {
let delay = self.timeouts.serial_authority_request_interval;
Box::pin(async move {
Box::pin(monitored_future!(async move {
sleep(delay).await;
Event::StartNext
})
}))
};

// This process is intended to minimize latency in the face of unreliable authorities,
9 changes: 5 additions & 4 deletions crates/sui-core/src/consensus_adapter.rs
@@ -23,7 +23,7 @@ use std::{
collections::{hash_map::DefaultHasher, HashMap},
hash::{Hash, Hasher},
};
use sui_metrics::spawn_monitored_task;
use sui_metrics::{monitored_future, spawn_monitored_task};
use sui_types::messages_checkpoint::CheckpointSequenceNumber;
use sui_types::messages_checkpoint::SignedCheckpointFragmentMessage;
use sui_types::{
@@ -408,12 +408,12 @@ impl ConsensusListener {
list.push((id, replier));

// Register with the close notification.
closed_notifications.push(async move {
closed_notifications.push(monitored_future!(async move {
// Wait for the channel to close
_closer.closed().await;
// Return the digest concerned
(digest, id)
});
}));
},
ConsensusListenerMessage::Processed(serialized) => {
let digest = Self::hash_serialized_transaction(&serialized);
@@ -591,7 +591,8 @@ impl CheckpointConsensusAdapter {
let deliver = (serialized, sequence_number);
let timeout_delay =
Duration::from_millis(latency_estimate) + self.retry_delay;
let future = Self::waiter(waiter, timeout_delay, deliver);
let future =
monitored_future!(Self::waiter(waiter, timeout_delay, deliver));
waiting.push(future);

// Finally sent to consensus, after registering to avoid a race condition
19 changes: 11 additions & 8 deletions crates/sui-core/src/node_sync/node_follower.rs
@@ -5,6 +5,7 @@ use crate::{
authority_active::gossip::GossipMetrics, authority_aggregator::AuthorityAggregator,
authority_client::AuthorityAPI, safe_client::SafeClient,
};
use sui_metrics::monitored_future;
use sui_storage::node_sync_store::NodeSyncStore;
use sui_types::{
base_types::{AuthorityName, EpochId, ExecutionDigests},
@@ -75,7 +76,7 @@ async fn follower_process<A, Handler>(
let start_one_task = |name, handle, store| {
let start_time = Instant::now();
let client = aggregator.clone_client(name);
async move {
monitored_future!(async move {
let result = follow_one_peer(
handle,
store,
@@ -88,7 +89,7 @@ async fn follower_process<A, Handler>(
.await
.tap_err(|e| warn!(peer=?name, "follower task exited with error {}", e));
(result, start_time, name)
}
})
};

for (name, _) in aggregator.committee.members() {
@@ -128,10 +129,10 @@ async fn follower_process<A, Handler>(

info!(?peer, ?delay, "will restart task after delay");

reconnects.push(async move {
reconnects.push(monitored_future!(async move {
sleep(delay).await;
peer
});
}));
}

Some(reconnect) = reconnects.next() => {
@@ -196,10 +197,12 @@ where
// Global timeout, we do not exceed this time in this task.
let mut results = FuturesUnordered::new();

let result_block = |fut, seq, digests| async move {
fut.await?;
trace!(?peer, ?seq, ?digests, "digest handler finished");
Ok::<TxSequenceNumber, SuiError>(seq)
let result_block = |fut, seq, digests| {
monitored_future!(async move {
fut.await?;
trace!(?peer, ?seq, ?digests, "digest handler finished");
Ok::<TxSequenceNumber, SuiError>(seq)
})
};

// Using a macro to avoid duplicating code - was much too difficult to satisfy the borrow
10 changes: 5 additions & 5 deletions narwhal/executor/src/subscriber.rs
@@ -23,7 +23,7 @@ use async_trait::async_trait;
use fastcrypto::hash::Hash;
use rand::prelude::SliceRandom;
use rand::rngs::ThreadRng;
use sui_metrics::spawn_monitored_task;
use sui_metrics::{monitored_future, spawn_monitored_task};
use tokio::time::Instant;
use tokio::{
sync::{oneshot, watch},
@@ -199,10 +199,10 @@ impl<Network: SubscriberNetwork> Fetcher<Network> {
};
workers.shuffle(&mut ThreadRng::default());
debug!("Scheduling fetching batch {}", digest);
ret.push(
self.fetch_payload(*digest, *worker_id, workers)
.map(move |batch| (batch_index, batch)),
);
let fut = self
.fetch_payload(*digest, *worker_id, workers)
.map(move |batch| (batch_index, batch));
ret.push(monitored_future!(fut));
}

ret
40 changes: 25 additions & 15 deletions narwhal/primary/src/block_synchronizer/mod.rs
@@ -28,7 +28,7 @@ use std::{
};
use storage::CertificateStore;
use store::Store;
use sui_metrics::spawn_monitored_task;
use sui_metrics::{monitored_future, spawn_monitored_task};
use thiserror::Error;
use tokio::{
sync::{
@@ -423,15 +423,17 @@ impl BlockSynchronizer {
.collect();

// Now create the future that will send the requests.
let timeout = self.payload_availability_timeout;
let network = self.network.network();
Some(
Self::send_payload_availability_requests(
self.payload_availability_timeout,
monitored_future!(Self::send_payload_availability_requests(
timeout,
key,
certificates_to_sync,
request,
primaries,
self.network.network(),
)
network,
))
.boxed(),
)
}
@@ -490,16 +492,19 @@ impl BlockSynchronizer {
self.map_certificate_responses_senders.insert(key, sender);

// now create the future that will wait to gather the responses
let timeout = self.certificates_synchronize_timeout;
let committee = self.committee.clone();
let worker_cache = self.worker_cache.clone();
Some(
Self::wait_for_certificate_responses(
self.certificates_synchronize_timeout,
monitored_future!(Self::wait_for_certificate_responses(
timeout,
key,
self.committee.clone(),
self.worker_cache.clone(),
committee,
worker_cache,
to_sync,
primaries,
receiver,
)
))
.boxed(),
)
}
@@ -666,12 +671,14 @@ impl BlockSynchronizer {
.unique_values()
.into_iter()
.map(|certificate| {
Self::wait_for_block_payload(
self.payload_synchronize_timeout,
let timeout = self.payload_synchronize_timeout;
let payload_store = self.payload_store.clone();
monitored_future!(Self::wait_for_block_payload(
timeout,
request_id,
self.payload_store.clone(),
payload_store,
certificate,
)
))
.boxed()
})
.collect()
@@ -889,7 +896,10 @@ impl BlockSynchronizer {
let peer = network.waiting_peer(id);
let request =
anemo::Request::new(request.clone()).with_timeout(fetch_certificates_timeout);
get_payload_availability_fn(PrimaryToPrimaryClient::new(peer), request)
monitored_future!(get_payload_availability_fn(
PrimaryToPrimaryClient::new(peer),
request
))
})
.collect();
let mut peers = Peers::<Certificate>::new(SmallRng::from_entropy());
6 changes: 4 additions & 2 deletions narwhal/primary/src/certificate_waiter.rs
@@ -9,7 +9,7 @@ use network::{P2pNetwork, PrimaryToPrimaryRpc};
use rand::{rngs::ThreadRng, seq::SliceRandom};
use std::{collections::BTreeMap, future::pending, sync::Arc, time::Duration};
use storage::CertificateStore;
use sui_metrics::spawn_monitored_task;
use sui_metrics::{monitored_future, spawn_monitored_task};
use tokio::{
sync::{oneshot, watch},
task::{JoinError, JoinHandle},
@@ -382,7 +382,9 @@
for peer in peers.iter() {
let network = network.network();
let request = request.clone();
fut.push(async move { network.fetch_certificates(peer, request).await });
fut.push(monitored_future!(async move {
network.fetch_certificates(peer, request).await
}));
let mut interval = Box::pin(time::sleep(request_interval));
tokio::select! {
res = fut.next() => match res {
12 changes: 7 additions & 5 deletions narwhal/primary/src/header_waiter.rs
@@ -21,7 +21,7 @@ use std::{
};
use storage::CertificateStore;
use store::Store;
use sui_metrics::spawn_monitored_task;
use sui_metrics::{monitored_future, spawn_monitored_task};
use tokio::{
sync::{oneshot, watch},
task::JoinHandle,
@@ -216,12 +216,13 @@ impl HeaderWaiter {
// its parents are in the store.
let (tx_cancel, rx_cancel) = oneshot::channel();
self.pending.insert(header_id, (round, tx_cancel));
let fut = Self::wait_for_batches(
let payload_store = self.payload_store.clone();
let fut = monitored_future!(Self::wait_for_batches(
requires_sync,
synchronize_handles,
self.payload_store.clone(),
payload_store,
header,
rx_cancel);
rx_cancel));
// pointer-size allocation, bounded by the # of blocks
// (may eventually go away, see rust RFC #1909)
waiting.push(Box::pin(fut));
@@ -245,7 +246,8 @@
let wait_for = missing.clone();
let (tx_cancel, rx_cancel) = oneshot::channel();
self.pending.insert(header_id, (round, tx_cancel));
let fut = Self::wait_for_parents(wait_for, self.certificate_store.clone(), header, rx_cancel);
let certificate_store = self.certificate_store.clone();
let fut = monitored_future!(Self::wait_for_parents(wait_for, certificate_store, header, rx_cancel));
// pointer-size allocation, bounded by the # of blocks (may eventually go away, see rust RFC #1909)
waiting.push(Box::pin(fut));

6 changes: 4 additions & 2 deletions narwhal/worker/src/handlers.rs
@@ -25,6 +25,8 @@ use types::{
WorkerToWorkerClient,
};

use sui_metrics::monitored_future;

#[cfg(test)]
#[path = "tests/handlers_tests.rs"]
pub mod handlers_tests;
@@ -172,11 +174,11 @@ impl PrimaryToWorker for PrimaryReceiverHandler {
let request_batch_fn =
|mut client: WorkerToWorkerClient<anemo::Peer>, batch_request, timeout| {
// Wrapper function enables us to move `client` into the future.
async move {
monitored_future!(async move {
client
.request_batch(anemo::Request::new(batch_request).with_timeout(timeout))
.await
}
})
};
if first_attempt {
// Send first sync request to a single node.
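
The call-site change is mechanical throughout, as this minimal sketch shows; `do_work`, `main`, and the loop bound are placeholders, and the only assumption is that `monitored_future!` wraps a future expression without changing its output type, which is exactly how the call sites above use it.

```rust
use futures::stream::{FuturesUnordered, StreamExt};
use sui_metrics::monitored_future;

// Stand-in for the real async work (gossip follow, batch fetch, and so on).
async fn do_work(i: u64) -> u64 {
    i * 2
}

#[tokio::main]
async fn main() {
    let mut tasks = FuturesUnordered::new();
    for i in 0..4u64 {
        // Before this commit the future was pushed bare:
        //     tasks.push(async move { do_work(i).await });
        // Now it is wrapped so it shows up in the futures metrics:
        tasks.push(monitored_future!(async move { do_work(i).await }));
    }
    while let Some(doubled) = tasks.next().await {
        println!("completed: {doubled}");
    }
}
```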