diff --git a/consensus/core/src/authority_service.rs b/consensus/core/src/authority_service.rs
index 94fa311e01a6b..e08d69c0c2f2a 100644
--- a/consensus/core/src/authority_service.rs
+++ b/consensus/core/src/authority_service.rs
@@ -55,17 +55,10 @@ impl<C: CoreThreadDispatcher> AuthorityService<C> {
         dag_state: Arc<RwLock<DagState>>,
         store: Arc<dyn Store>,
     ) -> Self {
-        // Set the subscribed peers by default to 0
-        for (_, authority) in context.committee.authorities() {
-            context
-                .metrics
-                .node_metrics
-                .subscribed_peers
-                .with_label_values(&[authority.hostname.as_str()])
-                .set(0);
-        }
-
-        let subscription_counter = Arc::new(SubscriptionCounter::new(core_dispatcher.clone()));
+        let subscription_counter = Arc::new(SubscriptionCounter::new(
+            context.clone(),
+            core_dispatcher.clone(),
+        ));
         Self {
             context,
             block_verifier,
@@ -254,7 +247,6 @@ impl<C: CoreThreadDispatcher> NetworkService for AuthorityService<C> {
         );
 
         let broadcasted_blocks = BroadcastedBlockStream::new(
-            self.context.clone(),
             peer,
             self.rx_block_broadcaster.resubscribe(),
             self.subscription_counter.clone(),
@@ -418,25 +410,55 @@ impl<C: CoreThreadDispatcher> NetworkService for AuthorityService<C> {
     }
 }
 
+struct Counter {
+    count: usize,
+    subscriptions_by_authority: Vec<usize>,
+}
+
 /// Atomically counts the number of active subscriptions to the block broadcast stream,
 /// and dispatch commands to core based on the changes.
 struct SubscriptionCounter {
-    counter: parking_lot::Mutex<usize>,
+    context: Arc<Context>,
+    counter: parking_lot::Mutex<Counter>,
     dispatcher: Arc<dyn CoreThreadDispatcher>,
 }
 
 impl SubscriptionCounter {
-    fn new(dispatcher: Arc<dyn CoreThreadDispatcher>) -> Self {
+    fn new(context: Arc<Context>, dispatcher: Arc<dyn CoreThreadDispatcher>) -> Self {
+        // Set the subscribed peers by default to 0
+        for (_, authority) in context.committee.authorities() {
+            context
+                .metrics
+                .node_metrics
+                .subscribed_peers
+                .with_label_values(&[authority.hostname.as_str()])
+                .set(0);
+        }
+
         Self {
-            counter: parking_lot::Mutex::new(0),
+            counter: parking_lot::Mutex::new(Counter {
+                count: 0,
+                subscriptions_by_authority: vec![0; context.committee.size()],
+            }),
             dispatcher,
+            context,
         }
     }
 
-    fn increment(&self) -> Result<(), ConsensusError> {
+    fn increment(&self, peer: AuthorityIndex) -> Result<(), ConsensusError> {
         let mut counter = self.counter.lock();
-        *counter += 1;
-        if *counter == 1 {
+        counter.count += 1;
+        counter.subscriptions_by_authority[peer] += 1;
+
+        let peer_hostname = &self.context.committee.authority(peer).hostname;
+        self.context
+            .metrics
+            .node_metrics
+            .subscribed_peers
+            .with_label_values(&[peer_hostname])
+            .set(1);
+
+        if counter.count == 1 {
             self.dispatcher
                 .set_consumer_availability(true)
                 .map_err(|_| ConsensusError::Shutdown)?;
@@ -444,10 +466,22 @@ impl SubscriptionCounter {
         Ok(())
     }
 
-    fn decrement(&self) -> Result<(), ConsensusError> {
+    fn decrement(&self, peer: AuthorityIndex) -> Result<(), ConsensusError> {
         let mut counter = self.counter.lock();
-        *counter -= 1;
-        if *counter == 0 {
+        counter.count -= 1;
+        counter.subscriptions_by_authority[peer] -= 1;
+
+        if counter.subscriptions_by_authority[peer] == 0 {
+            let peer_hostname = &self.context.committee.authority(peer).hostname;
+            self.context
+                .metrics
+                .node_metrics
+                .subscribed_peers
+                .with_label_values(&[peer_hostname])
+                .set(0);
+        }
+
+        if counter.count == 0 {
             self.dispatcher
                 .set_consumer_availability(false)
                 .map_err(|_| ConsensusError::Shutdown)?;
@@ -463,7 +497,6 @@ type BroadcastedBlockStream = BroadcastStream<VerifiedBlock>;
 /// Adapted from `tokio_stream::wrappers::BroadcastStream`. The main difference is that
 /// this tolerates lags with only logging, without yielding errors.
 struct BroadcastStream<T> {
-    context: Arc<Context>,
     peer: AuthorityIndex,
     // Stores the receiver across poll_next() calls.
     inner: ReusableBoxFuture<
@@ -479,22 +512,17 @@ struct BroadcastStream<T> {
 
 impl<T: 'static + Clone + Send> BroadcastStream<T> {
     pub fn new(
-        context: Arc<Context>,
         peer: AuthorityIndex,
         rx: broadcast::Receiver<T>,
         subscription_counter: Arc<SubscriptionCounter>,
     ) -> Self {
-        let peer_hostname = &context.committee.authority(peer).hostname;
-        context
-            .metrics
-            .node_metrics
-            .subscribed_peers
-            .with_label_values(&[peer_hostname])
-            .set(1);
-        // Failure can only be due to core shutdown.
-        let _ = subscription_counter.increment();
+        if let Err(err) = subscription_counter.increment(peer) {
+            match err {
+                ConsensusError::Shutdown => {}
+                _ => panic!("Unexpected error: {err}"),
+            }
+        }
         Self {
-            context,
             peer,
             inner: ReusableBoxFuture::new(make_recv_future(rx)),
             subscription_counter,
@@ -535,15 +563,12 @@ impl<T: 'static + Clone + Send> Stream for BroadcastStream<T> {
 
 impl<T> Drop for BroadcastStream<T> {
     fn drop(&mut self) {
-        let peer_hostname = &self.context.committee.authority(self.peer).hostname;
-        self.context
-            .metrics
-            .node_metrics
-            .subscribed_peers
-            .with_label_values(&[peer_hostname])
-            .set(0);
-        // Failure can only be due to core shutdown.
-        let _ = self.subscription_counter.decrement();
+        if let Err(err) = self.subscription_counter.decrement(self.peer) {
+            match err {
+                ConsensusError::Shutdown => {}
+                _ => panic!("Unexpected error: {err}"),
+            }
+        }
     }
 }
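To illustrate the pattern this diff moves to, here is a minimal, self-contained sketch of the per-peer subscription counting, with the Prometheus gauge updates and `Result`/`ConsensusError` plumbing stripped out. The `Dispatcher` trait, `MockDispatcher`, `committee_size` parameter, and plain `usize` peer indices are hypothetical stand-ins for the real `CoreThreadDispatcher`, `Context::committee`, and `AuthorityIndex`; only the `parking_lot` crate is assumed.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

use parking_lot::Mutex;

// Hypothetical stand-in for the consensus CoreThreadDispatcher.
trait Dispatcher: Send + Sync {
    fn set_consumer_availability(&self, available: bool);
}

struct Counter {
    count: usize,
    subscriptions_by_authority: Vec<usize>,
}

struct SubscriptionCounter {
    counter: Mutex<Counter>,
    dispatcher: Arc<dyn Dispatcher>,
}

impl SubscriptionCounter {
    // `committee_size` stands in for `context.committee.size()`.
    fn new(committee_size: usize, dispatcher: Arc<dyn Dispatcher>) -> Self {
        Self {
            counter: Mutex::new(Counter {
                count: 0,
                subscriptions_by_authority: vec![0; committee_size],
            }),
            dispatcher,
        }
    }

    fn increment(&self, peer: usize) {
        let mut counter = self.counter.lock();
        counter.count += 1;
        counter.subscriptions_by_authority[peer] += 1;
        // The real code also sets the per-peer `subscribed_peers` gauge to 1 here.
        if counter.count == 1 {
            // First active subscription overall: tell the core it has a consumer.
            self.dispatcher.set_consumer_availability(true);
        }
    }

    fn decrement(&self, peer: usize) {
        let mut counter = self.counter.lock();
        counter.count -= 1;
        counter.subscriptions_by_authority[peer] -= 1;
        // The real code resets the per-peer gauge to 0 only when this peer's
        // last subscription ends, which is what the per-authority Vec enables.
        if counter.count == 0 {
            // Last active subscription overall: no consumer is left.
            self.dispatcher.set_consumer_availability(false);
        }
    }
}

struct MockDispatcher {
    available: AtomicBool,
}

impl Dispatcher for MockDispatcher {
    fn set_consumer_availability(&self, available: bool) {
        self.available.store(available, Ordering::SeqCst);
    }
}

fn main() {
    let dispatcher = Arc::new(MockDispatcher {
        available: AtomicBool::new(false),
    });
    let counter = SubscriptionCounter::new(4, dispatcher.clone());

    counter.increment(0); // first subscription: availability flips on
    assert!(dispatcher.available.load(Ordering::SeqCst));
    counter.increment(0); // second subscription from the same peer: no edge
    counter.decrement(0); // peer 0 still has one subscription left
    counter.decrement(0); // last subscription gone: availability flips off
    assert!(!dispatcher.available.load(Ordering::SeqCst));
}
```

The design point the sketch isolates: a single shared counter drives the `set_consumer_availability` edge transitions, while the per-authority vector lets the gauge (and its reset on disconnect) track each peer independently, which is why the metric updates could move out of `BroadcastStream::new`/`Drop` and into `increment`/`decrement`.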