[Consensus] subscriber counter to atomically set node status (#19313)
## Description 

Looking at the connectivity metrics for subscribed peers, I believe race conditions are possible when nodes connect and disconnect in quick succession. Since the metric is a gauge, the last writer "wins": a peer can open a new connection while its earlier connection has not yet been dropped, and when that earlier connection is finally torn down it overwrites the gauge, making the peer appear in the metrics as disconnected even though it is still subscribed. This PR refactors that part so a peer is only marked as disconnected when it has no other pending connection.
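To illustrate the race, here is a minimal, self-contained sketch (the `Gauge` type below is a stand-in for the Prometheus gauge behind `subscribed_peers`, not the actual metrics code):

```rust
use std::sync::atomic::{AtomicI64, Ordering};

// Stand-in for a Prometheus gauge: every `set` overwrites the
// previous value, so the last writer wins.
struct Gauge(AtomicI64);

impl Gauge {
    fn set(&self, v: i64) {
        self.0.store(v, Ordering::SeqCst);
    }
    fn get(&self) -> i64 {
        self.0.load(Ordering::SeqCst)
    }
}

fn main() {
    let subscribed = Gauge(AtomicI64::new(0));

    // Connection A subscribes, then the peer reconnects (connection B)
    // before A has been torn down.
    subscribed.set(1); // A subscribes
    subscribed.set(1); // B subscribes
    subscribed.set(0); // A is dropped last and overwrites B's value

    // B is still alive, but the metric now reports the peer as disconnected.
    assert_eq!(subscribed.get(), 0);
}
```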

## Test plan 

CI

---

## Release notes

Check each box that your changes affect. If none of the boxes relate to
your changes, release notes aren't required.

For each box you select, include information after the relevant heading
that describes the impact of your changes that a user might notice and
any actions they must take to implement updates.

- [ ] Protocol: 
- [ ] Nodes (Validators and Full nodes): 
- [ ] Indexer: 
- [ ] JSON-RPC: 
- [ ] GraphQL: 
- [ ] CLI: 
- [ ] Rust SDK:
- [ ] REST API:
akichidis authored Sep 13, 2024
1 parent e5703e3 commit 71ac187
Showing 1 changed file with 67 additions and 42 deletions.

consensus/core/src/authority_service.rs
@@ -55,17 +55,10 @@ impl<C: CoreThreadDispatcher> AuthorityService<C> {
         dag_state: Arc<RwLock<DagState>>,
         store: Arc<dyn Store>,
     ) -> Self {
-        // Set the subscribed peers by default to 0
-        for (_, authority) in context.committee.authorities() {
-            context
-                .metrics
-                .node_metrics
-                .subscribed_peers
-                .with_label_values(&[authority.hostname.as_str()])
-                .set(0);
-        }
-
-        let subscription_counter = Arc::new(SubscriptionCounter::new(core_dispatcher.clone()));
+        let subscription_counter = Arc::new(SubscriptionCounter::new(
+            context.clone(),
+            core_dispatcher.clone(),
+        ));
         Self {
             context,
             block_verifier,
@@ -254,7 +247,6 @@ impl<C: CoreThreadDispatcher> NetworkService for AuthorityService<C> {
         );

         let broadcasted_blocks = BroadcastedBlockStream::new(
-            self.context.clone(),
             peer,
             self.rx_block_broadcaster.resubscribe(),
             self.subscription_counter.clone(),
@@ -418,36 +410,78 @@ impl<C: CoreThreadDispatcher> NetworkService for AuthorityService<C> {
     }
 }

+struct Counter {
+    count: usize,
+    subscriptions_by_authority: Vec<usize>,
+}
+
 /// Atomically counts the number of active subscriptions to the block broadcast stream,
 /// and dispatch commands to core based on the changes.
 struct SubscriptionCounter {
-    counter: parking_lot::Mutex<usize>,
+    context: Arc<Context>,
+    counter: parking_lot::Mutex<Counter>,
     dispatcher: Arc<dyn CoreThreadDispatcher>,
 }

 impl SubscriptionCounter {
-    fn new(dispatcher: Arc<dyn CoreThreadDispatcher>) -> Self {
+    fn new(context: Arc<Context>, dispatcher: Arc<dyn CoreThreadDispatcher>) -> Self {
+        // Set the subscribed peers by default to 0
+        for (_, authority) in context.committee.authorities() {
+            context
+                .metrics
+                .node_metrics
+                .subscribed_peers
+                .with_label_values(&[authority.hostname.as_str()])
+                .set(0);
+        }
+
         Self {
-            counter: parking_lot::Mutex::new(0),
+            counter: parking_lot::Mutex::new(Counter {
+                count: 0,
+                subscriptions_by_authority: vec![0; context.committee.size()],
+            }),
             dispatcher,
+            context,
         }
     }

-    fn increment(&self) -> Result<(), ConsensusError> {
+    fn increment(&self, peer: AuthorityIndex) -> Result<(), ConsensusError> {
         let mut counter = self.counter.lock();
-        *counter += 1;
-        if *counter == 1 {
+        counter.count += 1;
+        counter.subscriptions_by_authority[peer] += 1;
+
+        let peer_hostname = &self.context.committee.authority(peer).hostname;
+        self.context
+            .metrics
+            .node_metrics
+            .subscribed_peers
+            .with_label_values(&[peer_hostname])
+            .set(1);
+
+        if counter.count == 1 {
             self.dispatcher
                 .set_consumer_availability(true)
                 .map_err(|_| ConsensusError::Shutdown)?;
         }
         Ok(())
     }

-    fn decrement(&self) -> Result<(), ConsensusError> {
+    fn decrement(&self, peer: AuthorityIndex) -> Result<(), ConsensusError> {
         let mut counter = self.counter.lock();
-        *counter -= 1;
-        if *counter == 0 {
+        counter.count -= 1;
+        counter.subscriptions_by_authority[peer] -= 1;
+
+        if counter.subscriptions_by_authority[peer] == 0 {
+            let peer_hostname = &self.context.committee.authority(peer).hostname;
+            self.context
+                .metrics
+                .node_metrics
+                .subscribed_peers
+                .with_label_values(&[peer_hostname])
+                .set(0);
+        }
+
+        if counter.count == 0 {
             self.dispatcher
                 .set_consumer_availability(false)
                 .map_err(|_| ConsensusError::Shutdown)?;
@@ -463,7 +497,6 @@ type BroadcastedBlockStream = BroadcastStream<VerifiedBlock>;
 /// Adapted from `tokio_stream::wrappers::BroadcastStream`. The main difference is that
 /// this tolerates lags with only logging, without yielding errors.
 struct BroadcastStream<T> {
-    context: Arc<Context>,
     peer: AuthorityIndex,
     // Stores the receiver across poll_next() calls.
     inner: ReusableBoxFuture<
@@ -479,22 +512,17 @@ struct BroadcastStream<T> {

 impl<T: 'static + Clone + Send> BroadcastStream<T> {
     pub fn new(
-        context: Arc<Context>,
         peer: AuthorityIndex,
         rx: broadcast::Receiver<T>,
         subscription_counter: Arc<SubscriptionCounter>,
     ) -> Self {
-        let peer_hostname = &context.committee.authority(peer).hostname;
-        context
-            .metrics
-            .node_metrics
-            .subscribed_peers
-            .with_label_values(&[peer_hostname])
-            .set(1);
-        // Failure can only be due to core shutdown.
-        let _ = subscription_counter.increment();
+        if let Err(err) = subscription_counter.increment(peer) {
+            match err {
+                ConsensusError::Shutdown => {}
+                _ => panic!("Unexpected error: {err}"),
+            }
+        }
         Self {
-            context,
             peer,
             inner: ReusableBoxFuture::new(make_recv_future(rx)),
             subscription_counter,
@@ -535,15 +563,12 @@ impl<T: 'static + Clone + Send> Stream for BroadcastStream<T> {

 impl<T> Drop for BroadcastStream<T> {
     fn drop(&mut self) {
-        let peer_hostname = &self.context.committee.authority(self.peer).hostname;
-        self.context
-            .metrics
-            .node_metrics
-            .subscribed_peers
-            .with_label_values(&[peer_hostname])
-            .set(0);
-        // Failure can only be due to core shutdown.
-        let _ = self.subscription_counter.decrement();
+        if let Err(err) = self.subscription_counter.decrement(self.peer) {
+            match err {
+                ConsensusError::Shutdown => {}
+                _ => panic!("Unexpected error: {err}"),
+            }
+        }
     }
 }

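In outline, the fix keeps a per-peer subscription count behind a mutex and only writes 0 to the gauge once the count for that peer drops to zero. A condensed, hypothetical sketch of the pattern (simplified names, with the metric writes reduced to comments — not the actual `SubscriptionCounter` above):

```rust
use std::collections::HashMap;
use std::sync::Mutex;

/// Tracks live subscriptions per peer; the gauge for a peer is only
/// reset to 0 once its last subscription has gone away.
struct SubscriptionCounter {
    by_peer: Mutex<HashMap<usize, usize>>,
}

impl SubscriptionCounter {
    fn increment(&self, peer: usize) {
        let mut by_peer = self.by_peer.lock().unwrap();
        *by_peer.entry(peer).or_insert(0) += 1;
        // gauge.set(peer, 1): the peer has at least one live subscription.
    }

    fn decrement(&self, peer: usize) {
        let mut by_peer = self.by_peer.lock().unwrap();
        let count = by_peer.get_mut(&peer).expect("decrement without increment");
        *count -= 1;
        if *count == 0 {
            // gauge.set(peer, 0): no pending connection remains, so the
            // peer can safely be reported as disconnected.
        }
    }
}

fn main() {
    let counter = SubscriptionCounter {
        by_peer: Mutex::new(HashMap::new()),
    };
    counter.increment(0); // connection A subscribes
    counter.increment(0); // peer reconnects (connection B) before A is dropped
    counter.decrement(0); // A dropped: count is still 1, gauge stays at 1
    counter.decrement(0); // B dropped: count hits 0, gauge finally goes to 0
}
```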