Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
Fix code review issues
Browse files Browse the repository at this point in the history
  • Loading branch information
dmitry-markin committed Dec 26, 2022
1 parent 999b1a1 commit 80082f1
Show file tree
Hide file tree
Showing 5 changed files with 69 additions and 49 deletions.
64 changes: 45 additions & 19 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions client/network/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ targets = ["x86_64-unknown-linux-gnu"]
array-bytes = "4.1"
async-trait = "0.1"
asynchronous-codec = "0.6"
backtrace = "0.3.67"
bytes = "1"
codec = { package = "parity-scale-codec", version = "3.0.0", features = ["derive"] }
either = "1.5.3"
Expand Down
21 changes: 8 additions & 13 deletions client/network/src/service/out_events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,13 @@
//! - Send events by calling [`OutChannels::send`]. Events are cloned for each sender in the
//! collection.
use backtrace::Backtrace;
use futures::{channel::mpsc, prelude::*, ready, stream::FusedStream};
use log::error;
use parking_lot::Mutex;
use prometheus_endpoint::{register, CounterVec, GaugeVec, Opts, PrometheusError, Registry, U64};
use sc_network_common::protocol::event::Event;
use std::{
backtrace::{Backtrace, BacktraceStatus},
cell::RefCell,
fmt,
pin::Pin,
Expand All @@ -62,7 +62,7 @@ pub fn channel(name: &'static str, queue_size_warning: i64) -> (Sender, Receiver
queue_size: queue_size.clone(),
queue_size_warning,
warning_fired: false,
creation_backtrace: Backtrace::capture(),
creation_backtrace: Backtrace::new_unresolved(),
metrics: metrics.clone(),
};
let rx = Receiver { inner: rx, name, queue_size, metrics };
Expand Down Expand Up @@ -193,17 +193,12 @@ impl OutChannels {
let queue_size = sender.queue_size.fetch_add(1, Ordering::Relaxed);
if queue_size == sender.queue_size_warning && !sender.warning_fired {
sender.warning_fired = true;
match sender.creation_backtrace.status() {
BacktraceStatus::Captured => error!(
"The number of unprocessed events in channel `{}` reached {}.\n\
The channel was created at:\n{}",
sender.name, sender.queue_size_warning, sender.creation_backtrace,
),
_ => error!(
"The number of unprocessed events in channel `{}` reached {}.",
sender.name, sender.queue_size_warning,
),
}
sender.creation_backtrace.resolve();
error!(
"The number of unprocessed events in channel `{}` reached {}.\n\
The channel was created at:\n{:?}",
sender.name, sender.queue_size_warning, sender.creation_backtrace,
);
}
sender.inner.unbounded_send(event.clone()).is_ok()
});
Expand Down
1 change: 1 addition & 0 deletions client/utils/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ description = "I/O for Substrate runtimes"
readme = "README.md"

[dependencies]
backtrace = "0.3.67"
futures = "0.3.21"
futures-timer = "3.0.2"
lazy_static = "1.4.0"
Expand Down
31 changes: 14 additions & 17 deletions client/utils/src/mpsc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ mod inner {
mod inner {
// tracing implementation
use crate::metrics::UNBOUNDED_CHANNELS_COUNTER;
use backtrace::Backtrace;
use futures::{
channel::mpsc::{
self, SendError, TryRecvError, TrySendError, UnboundedReceiver, UnboundedSender,
Expand All @@ -47,11 +48,10 @@ mod inner {
};
use log::error;
use std::{
backtrace::{Backtrace, BacktraceStatus},
pin::Pin,
sync::{
atomic::{AtomicBool, AtomicI64, Ordering},
Arc,
Arc, Mutex,
},
};

Expand All @@ -67,7 +67,7 @@ mod inner {
queue_size: Arc<AtomicI64>,
queue_size_warning: i64,
warning_fired: Arc<AtomicBool>,
creation_backtrace: Arc<Backtrace>,
creation_backtrace: Arc<Mutex<Backtrace>>,
}

// Strangely, deriving `Clone` requires that `T` is also `Clone`.
Expand Down Expand Up @@ -108,7 +108,7 @@ mod inner {
queue_size: queue_size.clone(),
queue_size_warning,
warning_fired: Arc::new(AtomicBool::new(false)),
creation_backtrace: Arc::new(Backtrace::capture()),
creation_backtrace: Arc::new(Mutex::new(Backtrace::new_unresolved())),
};
let receiver = TracingUnboundedReceiver { inner: r, name, queue_size };
(sender, receiver)
Expand Down Expand Up @@ -149,23 +149,20 @@ mod inner {

let queue_size = self.queue_size.fetch_add(1, Ordering::Relaxed);
if queue_size == self.queue_size_warning &&
!self.warning_fired.load(Ordering::Relaxed)
self.warning_fired
.compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed)
.is_ok()
{
// `warning_fired` and `queue_size` are not synchronized, so it's possible
// that the warning is fired few times before the `warning_fired` is seen
// by all threads. This seems better than introducing a mutex guarding them.
self.warning_fired.store(true, Ordering::Relaxed);
match self.creation_backtrace.status() {
BacktraceStatus::Captured => error!(
"The number of unprocessed messages in channel `{}` reached {}.\n\
The channel was created at:\n{}",
self.name, self.queue_size_warning, self.creation_backtrace,
),
_ => error!(
"The number of unprocessed messages in channel `{}` reached {}.",
self.name, self.queue_size_warning,
),
}
let mut bt = self.creation_backtrace.lock().expect("another thread panicked.");
bt.resolve();
error!(
"The number of unprocessed messages in channel `{}` reached {}.\n\
The channel was created at:\n{:?}",
self.name, self.queue_size_warning, bt,
);
}

s
Expand Down

0 comments on commit 80082f1

Please sign in to comment.