Skip to content

Commit

Permalink
[metrics] Rework on sync metrics and peer score.
Browse files Browse the repository at this point in the history
  • Loading branch information
jolestar committed Oct 15, 2021
1 parent e8d6ee7 commit 938d6f3
Show file tree
Hide file tree
Showing 19 changed files with 258 additions and 278 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion block-relayer/src/block_relayer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ impl BlockRelayer {
.get_peer(peer_id.clone())
.await?
.ok_or_else(|| format_err!("CompatBlockMessage's peer {} is not connected"))?;
let peer_selector = PeerSelector::new(vec![peer], PeerStrategy::default());
let peer_selector = PeerSelector::new(vec![peer], PeerStrategy::default(), None);
let rpc_client = VerifiedRpcClient::new(peer_selector, network);
let timer = BLOCK_RELAYER_METRICS.txns_filled_time.start_timer();
let block = BlockRelayer::fill_compact_block(
Expand Down
9 changes: 7 additions & 2 deletions config/src/metrics_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use crate::{
};
use anyhow::Result;
use serde::{Deserialize, Serialize};
use starcoin_metrics::Registry;
use starcoin_metrics::{default_registry, Registry};
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::sync::Arc;
use structopt::StructOpt;
Expand Down Expand Up @@ -144,7 +144,12 @@ impl MetricsConfig {
impl ConfigModule for MetricsConfig {
fn merge_with_opt(&mut self, opt: &StarcoinOpt, base: Arc<BaseConfig>) -> Result<()> {
self.base = Some(base);
self.registry = Some(Registry::new_custom(Some("starcoin".to_string()), None)?);

//TODO change to new_custom registry after refactor all metrics.
let registry = default_registry().clone();
//let registry = Registry::new_custom(Some("starcoin".to_string()), None)?;

self.registry = Some(registry);

if opt.metrics.disable_metrics.is_some() {
self.disable_metrics = opt.metrics.disable_metrics;
Expand Down
1 change: 1 addition & 0 deletions network/api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,4 @@ bcs-ext = { package = "bcs-ext", path = "../../commons/bcs_ext" }
starcoin-service-registry = { path = "../../commons/service-registry" }
network-p2p-types = {path = "../../network-p2p/types"}
schemars = {git = "https://github.com/starcoinorg/schemars", rev="67e185dd17fa24ad349bf2ad0828068cdae77505"}
starcoin-metrics = {path = "../../commons/metrics"}
33 changes: 25 additions & 8 deletions network/api/src/peer_provider.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright (c) The Starcoin Core Contributors
// SPDX-License-Identifier: Apache-2.0

use crate::peer_score::ScoreCounter;
use crate::peer_score::{PeerScoreMetrics, ScoreCounter};
use crate::PeerId;
use crate::PeerInfo;
use anyhow::Result;
Expand Down Expand Up @@ -41,7 +41,6 @@ pub trait PeerProvider: Send + Sync + std::marker::Unpin {
fn ban_peer(&self, peer_id: PeerId, ban: bool);
}

#[derive(Clone)]
pub struct PeerDetail {
peer_info: PeerInfo,
score_counter: ScoreCounter,
Expand Down Expand Up @@ -134,6 +133,7 @@ pub struct PeerSelector {
details: Arc<Mutex<Vec<PeerDetail>>>,
total_score: Arc<AtomicU64>,
strategy: PeerStrategy,
peer_score_metrics: Option<PeerScoreMetrics>,
}

impl Debug for PeerSelector {
Expand All @@ -149,14 +149,19 @@ impl Debug for PeerSelector {
}

impl PeerSelector {
pub fn new(peers: Vec<PeerInfo>, strategy: PeerStrategy) -> Self {
Self::new_with_reputation(Vec::new(), peers, strategy)
pub fn new(
peers: Vec<PeerInfo>,
strategy: PeerStrategy,
peer_score_metrics: Option<PeerScoreMetrics>,
) -> Self {
Self::new_with_reputation(Vec::new(), peers, strategy, peer_score_metrics)
}

pub fn new_with_reputation(
reputations: Vec<(PeerId, u64)>,
peers: Vec<PeerInfo>,
strategy: PeerStrategy,
peer_score_metrics: Option<PeerScoreMetrics>,
) -> Self {
let reputations = reputations.into_iter().collect::<HashMap<PeerId, u64>>();
let mut total_score = 0u64;
Expand All @@ -176,6 +181,7 @@ impl PeerSelector {
details: Arc::new(Mutex::new(peer_details)),
total_score: Arc::new(AtomicU64::new(total_score)),
strategy,
peer_score_metrics,
}
}

Expand Down Expand Up @@ -210,13 +216,24 @@ impl PeerSelector {
.collect()
}

pub fn peer_score(&self, peer_id: &PeerId, score: i64) {
//TODO performance review
pub fn peer_score(&self, peer_id: &PeerId, score: u64) {
self.details
.lock()
.iter()
.filter(|peer| &peer.peer_id() == peer_id)
.for_each(|peer| peer.score_counter.inc_by(score));
self.total_score.fetch_add(score as u64, Ordering::SeqCst);
.find(|peer| &peer.peer_id() == peer_id)
.map(|peer| {
peer.score_counter.inc_by(score);
Some(peer)
});
let total_score = self.total_score.fetch_add(score, Ordering::SeqCst);
if let Some(peer_score_metrics) = self.peer_score_metrics.as_ref() {
peer_score_metrics
.peer_score
.with_label_values(&[format!("{}", peer_id).as_str()])
.set(score);
peer_score_metrics.total_score.set(total_score);
}
}

pub fn peer_exist(&self, peer_id: &PeerId) -> bool {
Expand Down
71 changes: 49 additions & 22 deletions network/api/src/peer_score.rs
Original file line number Diff line number Diff line change
@@ -1,24 +1,25 @@
use std::sync::{
atomic::{AtomicU64, Ordering},
Arc,
};
// Copyright (c) The Starcoin Core Contributors
// SPDX-License-Identifier: Apache-2.0

use anyhow::Result;
use starcoin_metrics::{register, Opts, Registry, UIntGauge, UIntGaugeVec};
use std::sync::atomic::{AtomicU64, Ordering};

#[derive(Clone)]
pub struct ScoreCounter {
score: Arc<AtomicU64>,
count: Arc<AtomicU64>,
score: AtomicU64,
count: AtomicU64,
}

impl ScoreCounter {
pub fn new(score: u64) -> Self {
Self {
score: Arc::new(AtomicU64::new(if score == 0 { 1 } else { score })),
count: Arc::new(AtomicU64::new(0)),
score: AtomicU64::new(if score == 0 { 1 } else { score }),
count: AtomicU64::new(0),
}
}

pub fn inc_by(&self, score: i64) {
self.score.fetch_add(score as u64, Ordering::SeqCst);
pub fn inc_by(&self, score: u64) {
self.score.fetch_add(score, Ordering::SeqCst);
self.count.fetch_add(1, Ordering::SeqCst);
}

Expand All @@ -40,29 +41,31 @@ impl Default for ScoreCounter {
}

pub trait Score<Entry>: Sync + Send {
fn execute(&self, entry: Entry) -> i64;
fn execute(&self, entry: Entry) -> u64;
}

#[derive(Clone)]
pub struct InverseScore {
k: i64,
k: u64,
}

impl InverseScore {
pub fn new(x: u32, y: u32) -> Self {
Self {
k: x.saturating_mul(y) as i64,
k: x.saturating_mul(y) as u64,
}
}
}

impl Score<u32> for InverseScore {
fn execute(&self, time: u32) -> i64 {
fn execute(&self, time: u32) -> u64 {
//if time is 0, treat as 1
self.k.checked_div(time as i64).unwrap_or(self.k)
self.k.checked_div(time as u64).unwrap_or(self.k)
}
}

//TODO rework on peer score.
//the future,and fail state do not used.
pub enum HandleState {
Future,
Succ,
Expand Down Expand Up @@ -90,20 +93,20 @@ impl LinearScore {
Self { base }
}

pub fn linear(&self) -> i64 {
self.base as i64
pub fn linear(&self) -> u64 {
self.base
}

pub fn percentage(&self, percent: usize) -> i64 {
pub fn percentage(&self, percent: usize) -> u64 {
self.base
.saturating_mul(percent as u64)
.checked_div(100)
.unwrap_or_default() as i64
.unwrap_or_default()
}
}

impl Score<BlockBroadcastEntry> for LinearScore {
fn execute(&self, entry: BlockBroadcastEntry) -> i64 {
fn execute(&self, entry: BlockBroadcastEntry) -> u64 {
match entry.state {
HandleState::Future => {
if entry.new {
Expand All @@ -119,7 +122,31 @@ impl Score<BlockBroadcastEntry> for LinearScore {
self.percentage(10)
}
}
HandleState::Fail => 0i64.saturating_sub(self.base as i64),
HandleState::Fail => 0u64,
}
}
}

#[derive(Clone)]
pub struct PeerScoreMetrics {
pub peer_score: UIntGaugeVec,
pub total_score: UIntGauge,
}

impl PeerScoreMetrics {
pub fn register(registry: &Registry) -> Result<PeerScoreMetrics> {
let peer_score = UIntGaugeVec::new(
Opts::new("peer_score", "peer sync score".to_string()).namespace("starcoin"),
&["peer"],
)?;
let total_score = UIntGauge::with_opts(
Opts::new("total_score", "total peer score".to_string()).namespace("starcoin"),
)?;
let peer_score = register(peer_score, registry)?;
let total_score = register(total_score, registry)?;
Ok(Self {
peer_score,
total_score,
})
}
}
7 changes: 5 additions & 2 deletions network/api/src/tests.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
// Copyright (c) The Starcoin Core Contributors
// SPDX-License-Identifier: Apache-2.0

use crate::peer_provider::{PeerSelector, PeerStrategy};
use crate::peer_score::{InverseScore, Score};
use starcoin_crypto::HashValue;
Expand Down Expand Up @@ -54,7 +57,7 @@ fn test_peer_selector() {
),
];

let peer_selector = PeerSelector::new(peers, PeerStrategy::default());
let peer_selector = PeerSelector::new(peers, PeerStrategy::default(), None);
let best_selector = peer_selector.bests(0.into()).unwrap();
assert_eq!(2, best_selector.len());

Expand All @@ -71,7 +74,7 @@ fn test_better_peer() {

let first_peer = peers.get(0).cloned().expect("first peer must exist.");

let peer_selector = PeerSelector::new(peers, PeerStrategy::default());
let peer_selector = PeerSelector::new(peers, PeerStrategy::default(), None);
let better_selector = peer_selector.betters(first_peer.total_difficulty(), 10);
if let Some(better_selector) = better_selector {
assert!(!better_selector.contains(&first_peer));
Expand Down
22 changes: 11 additions & 11 deletions network/src/broadcast_score_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ impl BroadcastScoreMetrics {
pub fn register() -> Result<Self, PrometheusError> {
let peer_broadcast_score = register_int_counter_vec!(
Opts::new("peer_broadcast_score", "peer broadcast score".to_string()).namespace(SC_NS),
&["broadcast_score"]
&["peer"]
)?;

let peer_broadcast_total_new_count = UIntCounterVec::new(
Expand All @@ -30,7 +30,7 @@ impl BroadcastScoreMetrics {
"total new count".to_string(),
)
.namespace(SC_NS),
&["total_new_count"],
&["peer"],
)?;

let peer_broadcast_total_old_count = UIntCounterVec::new(
Expand All @@ -39,7 +39,7 @@ impl BroadcastScoreMetrics {
"total old count".to_string(),
)
.namespace(SC_NS),
&["total_old_count"],
&["peer"],
)?;

default_registry().register(Box::new(peer_broadcast_total_new_count.clone()))?;
Expand All @@ -52,21 +52,21 @@ impl BroadcastScoreMetrics {
})
}

pub fn report_new(&self, peer: PeerId, score: i64) {
pub fn report_new(&self, peer: PeerId, score: u64) {
self.peer_broadcast_score
.with_label_values(&[&format!("peer-{:?}", peer)])
.inc_by(score as u64);
.with_label_values(&[&format!("{}", peer)])
.inc_by(score);
self.peer_broadcast_total_new_count
.with_label_values(&[&format!("peer-{:?}", peer)])
.with_label_values(&[&format!("{}", peer)])
.inc();
}

pub fn report_expire(&self, peer: PeerId, score: i64) {
pub fn report_expire(&self, peer: PeerId, score: u64) {
self.peer_broadcast_score
.with_label_values(&[&format!("peer-{:?}", peer)])
.inc_by(score as u64);
.with_label_values(&[&format!("{}", peer)])
.inc_by(score);
self.peer_broadcast_total_old_count
.with_label_values(&[&format!("peer-{:?}", peer)])
.with_label_values(&[&format!("{}", peer)])
.inc();
}
}
3 changes: 2 additions & 1 deletion node/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,8 @@ impl ServiceHandler<Self, NodeRequest> for NodeService {
info!("Peers is empty.");
None
} else {
let peer_selector = PeerSelector::new(peer_set, PeerStrategy::Best);
let peer_selector =
PeerSelector::new(peer_set, PeerStrategy::Best, None);
peer_selector.retain_rpc_peers();
let rpc_client = VerifiedRpcClient::new(peer_selector, network);
let mut blocks = rpc_client.get_blocks(vec![block_hash]).await?;
Expand Down
2 changes: 1 addition & 1 deletion sync/src/announcement/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ impl EventHandler<Self, PeerAnnouncementMessage> for AnnouncementService {

if !fresh_ids.is_empty() {
let peer_selector =
PeerSelector::new(Vec::new(), PeerStrategy::default());
PeerSelector::new(Vec::new(), PeerStrategy::default(), None);
let rpc_client = VerifiedRpcClient::new(
peer_selector,
network.clone(),
Expand Down
Loading

0 comments on commit 938d6f3

Please sign in to comment.