Skip to content

Commit

Permalink
Emit warnings when locks are held for too long
Browse files Browse the repository at this point in the history
- Add read_with_warn() and write_with_warn() methods to RwLock
  - They emit warnings if lock acqusition takes over 100ms
  - They emit warnings if locks are held for over 100ms

- Use read_with_warn() and write_with_warn() in most places RwLock's are accessed

Using these methods I was able to immediately identify the cause of a
deadlock during boot.
  • Loading branch information
lithp committed Oct 22, 2021
1 parent 07d5813 commit 140d2d1
Show file tree
Hide file tree
Showing 12 changed files with 218 additions and 43 deletions.
5 changes: 3 additions & 2 deletions ethportal-peertest/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use ethportal_peertest::events::PortalnetEvents;
use ethportal_peertest::jsonrpc::{
test_jsonrpc_endpoints_over_http, test_jsonrpc_endpoints_over_ipc,
};
use trin_core::locks::RwLoggingExt;
use trin_core::portalnet::{
discovery::Discovery,
overlay::{OverlayConfig, OverlayProtocol},
Expand All @@ -30,10 +31,10 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
};

let discovery = Arc::new(RwLock::new(Discovery::new(portal_config).unwrap()));
discovery.write().await.start().await.unwrap();
discovery.write_with_warn().await.start().await.unwrap();

let db = Arc::new(setup_overlay_db(
discovery.read().await.local_enr().node_id(),
discovery.read_with_warn().await.local_enr().node_id(),
));

let overlay = Arc::new(
Expand Down
5 changes: 3 additions & 2 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use tokio::sync::RwLock;

use trin_core::jsonrpc::handlers::JsonRpcHandler;
use trin_core::jsonrpc::types::PortalJsonRpcRequest;
use trin_core::locks::RwLoggingExt;
use trin_core::portalnet::events::PortalnetEvents;
use trin_core::{
cli::{TrinConfig, HISTORY_NETWORK, STATE_NETWORK},
Expand Down Expand Up @@ -52,11 +53,11 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let discovery = Arc::new(RwLock::new(
Discovery::new(portalnet_config.clone()).unwrap(),
));
discovery.write().await.start().await.unwrap();
discovery.write_with_warn().await.start().await.unwrap();

// Setup Overlay database
let db = Arc::new(setup_overlay_db(
discovery.read().await.local_enr().node_id(),
discovery.read_with_warn().await.local_enr().node_id(),
));

debug!("Selected networks to spawn: {:?}", trin_config.networks);
Expand Down
5 changes: 3 additions & 2 deletions trin-core/src/jsonrpc/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use tokio::sync::RwLock;

use crate::jsonrpc::endpoints::{Discv5Endpoint, HistoryEndpoint, StateEndpoint, TrinEndpoint};
use crate::jsonrpc::types::{HistoryJsonRpcRequest, PortalJsonRpcRequest, StateJsonRpcRequest};
use crate::locks::RwLoggingExt;
use crate::portalnet::discovery::Discovery;

type Responder<T, E> = mpsc::UnboundedSender<Result<T, E>>;
Expand All @@ -25,9 +26,9 @@ impl JsonRpcHandler {
while let Some(request) = self.portal_jsonrpc_rx.recv().await {
let response: Value = match request.endpoint {
TrinEndpoint::Discv5Endpoint(endpoint) => match endpoint {
Discv5Endpoint::NodeInfo => self.discovery.read().await.node_info(),
Discv5Endpoint::NodeInfo => self.discovery.read_with_warn().await.node_info(),
Discv5Endpoint::RoutingTableInfo => {
self.discovery.write().await.routing_table_info()
self.discovery.write_with_warn().await.routing_table_info()
}
},
TrinEndpoint::HistoryEndpoint(endpoint) => {
Expand Down
1 change: 1 addition & 0 deletions trin-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ extern crate lazy_static;

pub mod cli;
pub mod jsonrpc;
pub mod locks;
pub mod portalnet;
pub mod socket;
pub mod utils;
153 changes: 153 additions & 0 deletions trin-core/src/locks.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
use futures::future::FutureExt;
use std::future::Future;
use std::marker::Sync;
use std::ops::Deref;
use std::ops::DerefMut;
use std::panic::Location;
use std::pin::Pin;
use std::time::Duration;
use std::time::Instant;
use tokio::sync::RwLock;
use tokio::sync::RwLockReadGuard;
use tokio::sync::RwLockWriteGuard;
use tokio::task::JoinHandle;

const ACQUIRE_TIMEOUT_MS: u64 = 100;
const HOLD_TIMEOUT_MS: u64 = 100;

/// Tries to look exactly like a T, by implementing Deref and DerefMut, but emits
/// a warning if drop() is not called soon enough.
pub struct TimedGuard<T> {
inner: T,
acquisition_line: u32,
acquisition_file: &'static str,
acquisition_time: Instant,
sleep_task: JoinHandle<()>,
}

impl<T> TimedGuard<T> {
fn new(inner: T, acquisition_line: u32, acquisition_file: &'static str) -> TimedGuard<T> {
let now = Instant::now();
let move_line = acquisition_line;
let move_file = acquisition_file;
let handle = tokio::spawn(async move {
sleep_then_log(move_file, move_line).await;
});

TimedGuard {
inner,
acquisition_line,
acquisition_file,
acquisition_time: now,
sleep_task: handle,
}
}
}

impl<T> Deref for TimedGuard<T> {
type Target = T;

fn deref(&self) -> &Self::Target {
&self.inner
}
}

impl<T> DerefMut for TimedGuard<T> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.inner
}
}

impl<T> Drop for TimedGuard<T> {
fn drop(&mut self) {
self.sleep_task.abort();
let held_for = self.acquisition_time.elapsed().as_millis();
if held_for > HOLD_TIMEOUT_MS.into() {
log::warn!(
"[{}:{}] lock held for too long: {}ms",
self.acquisition_file,
self.acquisition_line,
held_for,
)
}
}
}

async fn sleep_then_log(file: &'static str, line: u32) {
tokio::time::sleep(Duration::from_millis(HOLD_TIMEOUT_MS)).await;
log::warn!(
"[{}:{}] lock held for over {}ms, not yet released",
file,
line,
HOLD_TIMEOUT_MS.to_string()
);
}

async fn try_lock<T, Fut>(fut: Fut, file: &'static str, line: u32) -> TimedGuard<T>
where
Fut: Future<Output = T>,
{
let acquire_timeout = Duration::from_millis(ACQUIRE_TIMEOUT_MS);
let sleep = tokio::time::sleep(acquire_timeout).fuse();
let fused = fut.fuse();

futures::pin_mut!(sleep, fused);

let now = Instant::now();

futures::select! {
_ = sleep => {
log::warn!(
"[{}:{}] waiting more than {}ms to acquire lock, still waiting",
file, line, ACQUIRE_TIMEOUT_MS,
);
},
guard = fused => {
return TimedGuard::new(guard, line, file);
}
}

let guard = fused.await;
let wait_time = now.elapsed().as_millis();
log::warn!("[{}:{}] waited {}ms to acquire lock", file, line, wait_time);

TimedGuard::new(guard, line, file)
}

// this is a workaround:
// - Rust does not support async in traits
// https://rust-lang.github.io/async-book/07_workarounds/05_async_in_traits.html
// - async_trait does not give us enough flexibility to implement #[track_caller]
//
// So we manually desugar the async functions and have them return futures
type Async<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;

/// These methods should be used in favor of the stock read() and write() methods.
///
/// These methods emit warnings when the lock takes too long to acquire (meaning it's
/// likely some other user is holding onto the lock for too long).
///
/// They also emit warnings when the returned TimedGuard is kept alive for too long.
/// (The lock is held until the returned TimedGuard is dropped, so it should be dropped
/// as soon as possible!)
pub trait RwLoggingExt<T> {
#[track_caller]
fn read_with_warn(&self) -> Async<TimedGuard<RwLockReadGuard<T>>>;

#[track_caller]
fn write_with_warn(&self) -> Async<TimedGuard<RwLockWriteGuard<T>>>;
}

impl<T: Send + Sync> RwLoggingExt<T> for RwLock<T> {
#[track_caller]
fn read_with_warn(&self) -> Async<TimedGuard<RwLockReadGuard<T>>> {
let loc = Location::caller();
Box::pin(try_lock(self.read(), loc.file(), loc.line()))
}

#[track_caller]
fn write_with_warn(&self) -> Async<TimedGuard<RwLockWriteGuard<T>>> {
let loc = Location::caller();
Box::pin(try_lock(self.write(), loc.file(), loc.line()))
}
}
3 changes: 2 additions & 1 deletion trin-core/src/portalnet/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use super::{
utp::{UtpListener, UTP_PROTOCOL},
};
use crate::cli::{HISTORY_NETWORK, STATE_NETWORK};
use crate::locks::RwLoggingExt;
use std::collections::HashMap;
use std::convert::TryInto;

Expand All @@ -28,7 +29,7 @@ impl PortalnetEvents {
state_sender: Option<mpsc::UnboundedSender<TalkRequest>>,
) -> Self {
let protocol_receiver = discovery
.write()
.write_with_warn()
.await
.discv5
.event_stream()
Expand Down
29 changes: 18 additions & 11 deletions trin-core/src/portalnet/overlay.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::locks::RwLoggingExt;
use crate::utils::xor_two_values;

use super::{
Expand Down Expand Up @@ -112,7 +113,12 @@ impl OverlayProtocol {
data_radius: U256,
) -> Self {
let kbuckets = Arc::new(RwLock::new(KBucketsTable::new(
discovery.read().await.local_enr().node_id().into(),
discovery
.read_with_warn()
.await
.local_enr()
.node_id()
.into(),
config.bucket_pending_timeout,
config.max_incoming_per_bucket,
config.table_filter,
Expand Down Expand Up @@ -140,7 +146,7 @@ impl OverlayProtocol {
let response = match request {
Request::Ping(Ping { .. }) => {
debug!("Got overlay ping request {:?}", request);
let enr_seq = self.discovery.read().await.local_enr().seq();
let enr_seq = self.discovery.read_with_warn().await.local_enr().seq();
let payload = CustomPayload::new(self.data_radius().await, None);
Response::Pong(Pong {
enr_seq,
Expand Down Expand Up @@ -181,12 +187,12 @@ impl OverlayProtocol {

/// Returns the local ENR of the node.
pub async fn local_enr(&self) -> Enr {
self.discovery.read().await.discv5.local_enr()
self.discovery.read_with_warn().await.discv5.local_enr()
}

// Returns the data radius of the node.
pub async fn data_radius(&self) -> U256 {
self.data_radius.read().await.clone()
self.data_radius.read_with_warn().await.clone()
}

/// Returns a vector of the ENRs of the closest nodes by the given log2 distances.
Expand All @@ -203,7 +209,7 @@ impl OverlayProtocol {
}

if !log2_distances.is_empty() {
let mut kbuckets = self.kbuckets.write().await;
let mut kbuckets = self.kbuckets.write_with_warn().await;
for node in kbuckets
.nodes_by_distances(&log2_distances, FIND_NODES_MAX_NODES)
.into_iter()
Expand Down Expand Up @@ -247,7 +253,7 @@ impl OverlayProtocol {
/// Returns a vector of all ENR node IDs of nodes currently contained in the routing table.
pub async fn table_entries_id(&self) -> Vec<NodeId> {
self.kbuckets
.write()
.write_with_warn()
.await
.iter()
.map(|entry| *entry.node.key.preimage())
Expand All @@ -257,7 +263,7 @@ impl OverlayProtocol {
/// Returns a vector of all the ENRs of nodes currently contained in the routing table.
pub async fn table_entries_enr(&self) -> Vec<Enr> {
self.kbuckets
.write()
.write_with_warn()
.await
.iter()
.map(|entry| entry.node.value.enr().clone())
Expand All @@ -271,15 +277,16 @@ impl OverlayProtocol {
protocol: ProtocolKind,
payload: Option<Vec<u8>>,
) -> Result<Vec<u8>, SendPingError> {
let enr_seq = self.discovery.read().await.local_enr().seq();
let enr_seq = self.discovery.read_with_warn().await.local_enr().seq();

let payload = CustomPayload::new(data_radius, payload);
let msg = Ping {
enr_seq,
payload: Some(payload),
};
Ok(self
.discovery
.read()
.read_with_warn()
.await
.send_talkreq(
enr,
Expand All @@ -297,7 +304,7 @@ impl OverlayProtocol {
) -> Result<Vec<u8>, RequestError> {
let msg = FindNodes { distances };
self.discovery
.read()
.read_with_warn()
.await
.send_talkreq(
enr,
Expand All @@ -315,7 +322,7 @@ impl OverlayProtocol {
) -> Result<Vec<u8>, RequestError> {
let msg = FindContent { content_key };
self.discovery
.read()
.read_with_warn()
.await
.send_talkreq(
enr,
Expand Down
20 changes: 17 additions & 3 deletions trin-core/src/portalnet/utp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ use std::sync::Arc;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use tokio::sync::RwLock;

use crate::locks::RwLoggingExt;

pub const UTP_PROTOCOL: &str = "utp";
pub const HEADER_SIZE: usize = 20;
pub const MAX_DISCV5_PACKET_SIZE: usize = 1280;
Expand Down Expand Up @@ -433,7 +435,13 @@ impl UtpListener {
}
}
Type::StSyn => {
if let Some(enr) = self.discovery.read().await.discv5.find_enr(&node_id) {
if let Some(enr) = self
.discovery
.read_with_warn()
.await
.discv5
.find_enr(&node_id)
{
// If neither of those cases happened handle this is a new request
let mut conn = UtpStream::init(Arc::clone(&self.discovery), enr);
conn.handle_packet(packet).await;
Expand Down Expand Up @@ -466,7 +474,13 @@ impl UtpListener {

// I am honestly not sure if I should init this with Enr or NodeId since we could use both
async fn connect(&mut self, connection_id: u16, node_id: NodeId) {
if let Some(enr) = self.discovery.read().await.discv5.find_enr(&node_id) {
if let Some(enr) = self
.discovery
.read_with_warn()
.await
.discv5
.find_enr(&node_id)
{
let mut conn = UtpStream::init(Arc::clone(&self.discovery), enr);
conn.make_connection(connection_id).await;
self.utp_connections.insert(
Expand Down Expand Up @@ -587,7 +601,7 @@ impl UtpStream {
}
let talk_request_result = self
.discovery
.read()
.read_with_warn()
.await
.send_talkreq(self.enr.clone(), UTP_PROTOCOL.to_string(), packet.0.clone())
.await;
Expand Down
Loading

0 comments on commit 140d2d1

Please sign in to comment.