Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
Better logs and metrics on PoV fetching. (#4593)
Browse files Browse the repository at this point in the history
  • Loading branch information
eskimor authored and drahnr committed Jan 4, 2022
1 parent 156b6bb commit 4435da4
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 16 deletions.
1 change: 1 addition & 0 deletions node/network/availability-distribution/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ impl AvailabilityDistributionSubsystem {
candidate_hash,
pov_hash,
tx,
metrics.clone(),
)
.await,
"pov_requester::fetch_pov",
Expand Down
26 changes: 20 additions & 6 deletions node/network/availability-distribution/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,15 +45,12 @@ struct MetricsInner {
fetched_chunks: CounterVec<U64>,

/// Number of chunks served.
///
/// Note: Right now, `Succeeded` gets incremented whenever we were able to successfully respond
/// to a chunk request. This includes `NoSuchChunk` responses.
served_chunks: CounterVec<U64>,

/// Number of received fetch PoV responses.
fetched_povs: CounterVec<U64>,

/// Number of PoVs served.
///
/// Note: Right now, `Succeeded` gets incremented whenever we were able to successfully respond
/// to a PoV request. This includes `NoSuchPoV` responses.
served_povs: CounterVec<U64>,

/// Number of times our first set of validators did not provide the needed chunk and we had to
Expand Down Expand Up @@ -81,6 +78,13 @@ impl Metrics {
}
}

/// Increment counter on fetched PoVs.
pub fn on_fetched_pov(&self, label: &'static str) {
if let Some(metrics) = &self.0 {
metrics.fetched_povs.with_label_values(&[label]).inc()
}
}

/// Increment counter on served PoVs.
pub fn on_served_pov(&self, label: &'static str) {
if let Some(metrics) = &self.0 {
Expand Down Expand Up @@ -119,6 +123,16 @@ impl metrics::Metrics for Metrics {
)?,
registry,
)?,
fetched_povs: prometheus::register(
CounterVec::new(
Opts::new(
"polkadot_parachain_fetched_povs_total",
"Total number of povs fetches by this backer.",
),
&["success"]
)?,
registry,
)?,
served_povs: prometheus::register(
CounterVec::new(
Opts::new(
Expand Down
38 changes: 28 additions & 10 deletions node/network/availability-distribution/src/pov_requester/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use polkadot_node_network_protocol::request_response::{
};
use polkadot_node_primitives::PoV;
use polkadot_node_subsystem_util::runtime::RuntimeInfo;
use polkadot_primitives::v1::{CandidateHash, Hash, ValidatorIndex};
use polkadot_primitives::v1::{AuthorityDiscoveryId, CandidateHash, Hash, ValidatorIndex};
use polkadot_subsystem::{
jaeger,
messages::{IfDisconnected, NetworkBridgeMessage},
Expand All @@ -34,7 +34,8 @@ use polkadot_subsystem::{

use crate::{
error::{Fatal, NonFatal},
LOG_TARGET,
metrics::{FAILED, NOT_FOUND, SUCCEEDED},
Metrics, LOG_TARGET,
};

/// Start background worker for taking care of fetching the requested `PoV` from the network.
Expand All @@ -46,6 +47,7 @@ pub async fn fetch_pov<Context>(
candidate_hash: CandidateHash,
pov_hash: Hash,
tx: oneshot::Sender<PoV>,
metrics: Metrics,
) -> super::Result<()>
where
Context: SubsystemContext,
Expand All @@ -57,7 +59,7 @@ where
.ok_or(NonFatal::InvalidValidatorIndex)?
.clone();
let (req, pending_response) = OutgoingRequest::new(
Recipient::Authority(authority_id),
Recipient::Authority(authority_id.clone()),
PoVFetchingRequest { candidate_hash },
);
let full_req = Requests::PoVFetching(req);
Expand All @@ -71,20 +73,25 @@ where
let span = jaeger::Span::new(candidate_hash, "fetch-pov")
.with_validator_index(from_validator)
.with_relay_parent(parent);
ctx.spawn("pov-fetcher", fetch_pov_job(pov_hash, pending_response.boxed(), span, tx).boxed())
.map_err(|e| Fatal::SpawnTask(e))?;
ctx.spawn(
"pov-fetcher",
fetch_pov_job(pov_hash, authority_id, pending_response.boxed(), span, tx, metrics).boxed(),
)
.map_err(|e| Fatal::SpawnTask(e))?;
Ok(())
}

/// Future to be spawned for taking care of handling reception and sending of PoV.
async fn fetch_pov_job(
pov_hash: Hash,
authority_id: AuthorityDiscoveryId,
pending_response: BoxFuture<'static, Result<PoVFetchingResponse, RequestError>>,
span: jaeger::Span,
tx: oneshot::Sender<PoV>,
metrics: Metrics,
) {
if let Err(err) = do_fetch_pov(pov_hash, pending_response, span, tx).await {
tracing::warn!(target: LOG_TARGET, ?err, "fetch_pov_job");
if let Err(err) = do_fetch_pov(pov_hash, pending_response, span, tx, metrics).await {
tracing::warn!(target: LOG_TARGET, ?err, ?pov_hash, ?authority_id, "fetch_pov_job");
}
}

Expand All @@ -94,15 +101,25 @@ async fn do_fetch_pov(
pending_response: BoxFuture<'static, Result<PoVFetchingResponse, RequestError>>,
_span: jaeger::Span,
tx: oneshot::Sender<PoV>,
metrics: Metrics,
) -> std::result::Result<(), NonFatal> {
let response = pending_response.await.map_err(NonFatal::FetchPoV)?;
let response = pending_response.await.map_err(NonFatal::FetchPoV);
let pov = match response {
PoVFetchingResponse::PoV(pov) => pov,
PoVFetchingResponse::NoSuchPoV => return Err(NonFatal::NoSuchPoV),
Ok(PoVFetchingResponse::PoV(pov)) => pov,
Ok(PoVFetchingResponse::NoSuchPoV) => {
metrics.on_fetched_pov(NOT_FOUND);
return Err(NonFatal::NoSuchPoV)
},
Err(err) => {
metrics.on_fetched_pov(FAILED);
return Err(err)
},
};
if pov.hash() == pov_hash {
metrics.on_fetched_pov(SUCCEEDED);
tx.send(pov).map_err(|_| NonFatal::SendResponse)
} else {
metrics.on_fetched_pov(FAILED);
Err(NonFatal::UnexpectedPoV)
}
}
Expand Down Expand Up @@ -159,6 +176,7 @@ mod tests {
CandidateHash::default(),
pov_hash,
tx,
Metrics::new_dummy(),
)
.await
.expect("Should succeed");
Expand Down

0 comments on commit 4435da4

Please sign in to comment.