Skip to content

Commit

Permalink
cumulus/minimal-node: added prometheus metrics for the RPC client (#5572
Browse files Browse the repository at this point in the history
)

# Description

When we start a node with connections to external RPC servers (as a
minimal node), we lack metrics around how many individual calls we're
doing to the remote RPC servers and their duration. This PR adds metrics
that measure durations of each RPC call made by the minimal nodes, and
implicitly how many calls there are.

Closes #5409 
Closes #5689

## Integration

Node operators should be able to track minimal node metrics and decide
on appropriate actions based on how they interpret those metrics.
The added metrics can be observed by curl'ing the prometheus metrics
endpoint for the ~relaychain~ parachain (it was changed based on the
review). The metrics are represented by
~`polkadot_parachain_relay_chain_rpc_interface`~
`relay_chain_rpc_interface` namespace (I realized lining up
`parachain_relay_chain` in the same metric might be confusing :).
Excerpt from the curl:

```
relay_chain_rpc_interface_bucket{method="chain_getBlockHash",chain="rococo_local_testnet",le="0.001"} 15
relay_chain_rpc_interface_bucket{method="chain_getBlockHash",chain="rococo_local_testnet",le="0.004"} 23
relay_chain_rpc_interface_bucket{method="chain_getBlockHash",chain="rococo_local_testnet",le="0.016"} 23
relay_chain_rpc_interface_bucket{method="chain_getBlockHash",chain="rococo_local_testnet",le="0.064"} 23
relay_chain_rpc_interface_bucket{method="chain_getBlockHash",chain="rococo_local_testnet",le="0.256"} 24
relay_chain_rpc_interface_bucket{method="chain_getBlockHash",chain="rococo_local_testnet",le="1.024"} 24
relay_chain_rpc_interface_bucket{method="chain_getBlockHash",chain="rococo_local_testnet",le="4.096"} 24
relay_chain_rpc_interface_bucket{method="chain_getBlockHash",chain="rococo_local_testnet",le="16.384"} 24
relay_chain_rpc_interface_bucket{method="chain_getBlockHash",chain="rococo_local_testnet",le="65.536"} 24
relay_chain_rpc_interface_bucket{method="chain_getBlockHash",chain="rococo_local_testnet",le="+Inf"} 24
relay_chain_rpc_interface_sum{method="chain_getBlockHash",chain="rococo_local_testnet"} 0.11719075
relay_chain_rpc_interface_count{method="chain_getBlockHash",chain="rococo_local_testnet"} 24
```

## Review Notes

The way we measure durations/hits is based on the `HistogramVec` struct,
which allows us to collect timings for each RPC client method called
from the minimal node. It can be extended to measure the RPCs against
other dimensions too (status codes, response sizes, etc). The timing
measurement is done at the level of the `relay-chain-rpc-interface`, in
the `RelayChainRpcClient` struct's `request_tracing` method, which is a
single entry point for all RPC requests done through the
relay-chain-rpc-interface. The request durations will fall under
exponential buckets described by start `0.001`, factor `4` and count
`9`.

---------

Signed-off-by: Iulian Barbu <iulian.barbu@parity.io>
  • Loading branch information
iulianbarbu committed Sep 19, 2024
1 parent 0c9d8fe commit c8d5e5a
Show file tree
Hide file tree
Showing 12 changed files with 145 additions and 39 deletions.
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 7 additions & 4 deletions cumulus/client/relay-chain-minimal-node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,19 +96,20 @@ async fn build_interface(
client: RelayChainRpcClient,
) -> RelayChainResult<(Arc<(dyn RelayChainInterface + 'static)>, Option<CollatorPair>)> {
let collator_pair = CollatorPair::generate().0;
let blockchain_rpc_client = Arc::new(BlockChainRpcClient::new(client.clone()));
let collator_node = match polkadot_config.network.network_backend {
sc_network::config::NetworkBackendType::Libp2p =>
new_minimal_relay_chain::<RelayBlock, sc_network::NetworkWorker<RelayBlock, RelayHash>>(
polkadot_config,
collator_pair.clone(),
Arc::new(BlockChainRpcClient::new(client.clone())),
blockchain_rpc_client,
)
.await?,
sc_network::config::NetworkBackendType::Litep2p =>
new_minimal_relay_chain::<RelayBlock, sc_network::Litep2pNetworkBackend>(
polkadot_config,
collator_pair.clone(),
Arc::new(BlockChainRpcClient::new(client.clone())),
blockchain_rpc_client,
)
.await?,
};
Expand All @@ -120,17 +121,19 @@ async fn build_interface(
}

pub async fn build_minimal_relay_chain_node_with_rpc(
polkadot_config: Configuration,
relay_chain_config: Configuration,
parachain_prometheus_registry: Option<&Registry>,
task_manager: &mut TaskManager,
relay_chain_url: Vec<Url>,
) -> RelayChainResult<(Arc<(dyn RelayChainInterface + 'static)>, Option<CollatorPair>)> {
let client = cumulus_relay_chain_rpc_interface::create_client_and_start_worker(
relay_chain_url,
task_manager,
parachain_prometheus_registry,
)
.await?;

build_interface(polkadot_config, task_manager, client).await
build_interface(relay_chain_config, task_manager, client).await
}

pub async fn build_minimal_relay_chain_node_light_client(
Expand Down
2 changes: 2 additions & 0 deletions cumulus/client/relay-chain-rpc-interface/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ sp-version = { workspace = true, default-features = true }
sc-client-api = { workspace = true, default-features = true }
sc-rpc-api = { workspace = true, default-features = true }
sc-service = { workspace = true, default-features = true }
prometheus-endpoint = { workspace = true, default-features = true }

tokio = { features = ["sync"], workspace = true, default-features = true }
tokio-util = { features = ["compat"], workspace = true }
Expand All @@ -49,3 +50,4 @@ either = { workspace = true, default-features = true }
thiserror = { workspace = true }
rand = { workspace = true, default-features = true }
pin-project = { workspace = true }
prometheus = { workspace = true }
6 changes: 4 additions & 2 deletions cumulus/client/relay-chain-rpc-interface/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ use cumulus_primitives_core::relay_chain::BlockId;
pub use url::Url;

mod light_client_worker;
mod metrics;
mod reconnecting_ws_client;
mod rpc_client;
mod tokio_platform;
Expand Down Expand Up @@ -87,12 +88,13 @@ impl RelayChainInterface for RelayChainRpcInterface {
async fn header(&self, block_id: BlockId) -> RelayChainResult<Option<PHeader>> {
let hash = match block_id {
BlockId::Hash(hash) => hash,
BlockId::Number(num) =>
BlockId::Number(num) => {
if let Some(hash) = self.rpc_client.chain_get_block_hash(Some(num)).await? {
hash
} else {
return Ok(None)
},
}
},
};
let header = self.rpc_client.chain_get_header(Some(hash)).await?;

Expand Down
49 changes: 49 additions & 0 deletions cumulus/client/relay-chain-rpc-interface/src/metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
// Copyright (C) Parity Technologies (UK) Ltd.
// This file is part of Cumulus.

// Cumulus is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Cumulus is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Cumulus. If not, see <http://www.gnu.org/licenses/>.

use prometheus::{Error as PrometheusError, HistogramTimer, Registry};
use prometheus_endpoint::{HistogramOpts, HistogramVec, Opts};

/// Gathers metrics about the blockchain RPC client.
///
/// Currently holds a single histogram family that times RPC requests,
/// with one series per RPC method name.
#[derive(Clone)]
pub(crate) struct RelaychainRpcMetrics {
/// Request-duration histogram, labelled by `method`; created and registered
/// under the `relay_chain_rpc_interface` metric name in `register`.
rpc_request: HistogramVec,
}

impl RelaychainRpcMetrics {
    /// Builds the RPC request histogram and registers it with `registry`.
    ///
    /// The histogram is named `relay_chain_rpc_interface`, carries a single
    /// `method` label, and uses exponential buckets (start `0.001`, factor `4`,
    /// `9` buckets).
    ///
    /// # Errors
    ///
    /// Returns a [`PrometheusError`] if the histogram cannot be created or if
    /// registering it with `registry` fails.
    pub(crate) fn register(registry: &Registry) -> Result<Self, PrometheusError> {
        let common_opts = Opts::new(
            "relay_chain_rpc_interface",
            "Tracks stats about cumulus relay chain RPC interface",
        );
        // The bucket parameters are compile-time constants, so this cannot fail at runtime.
        let buckets = prometheus::exponential_buckets(0.001, 4.0, 9)
            .expect("function parameters are constant and always valid; qed");

        let histogram = HistogramVec::new(HistogramOpts { common_opts, buckets }, &["method"])?;
        let rpc_request = prometheus_endpoint::register(histogram, registry)?;

        Ok(Self { rpc_request })
    }

    /// Starts a timer for a single call of the RPC `method`.
    ///
    /// The returned [`HistogramTimer`] belongs to the series labelled with
    /// `method`; per the `prometheus` crate it records the elapsed time when it
    /// is observed or dropped, so the caller should keep it alive for the
    /// duration of the request.
    pub(crate) fn start_request_timer(&self, method: &str) -> HistogramTimer {
        let labels = [method];
        let histogram = self.rpc_request.with_label_values(&labels);
        histogram.start_timer()
    }
}
25 changes: 21 additions & 4 deletions cumulus/client/relay-chain-rpc-interface/src/rpc_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use jsonrpsee::{
core::{params::ArrayParams, ClientError as JsonRpseeError},
rpc_params,
};
use prometheus::Registry;
use serde::de::DeserializeOwned;
use serde_json::Value as JsonValue;
use std::collections::{btree_map::BTreeMap, VecDeque};
Expand Down Expand Up @@ -52,6 +53,7 @@ use sp_version::RuntimeVersion;

use crate::{
light_client_worker::{build_smoldot_client, LightClientRpcWorker},
metrics::RelaychainRpcMetrics,
reconnecting_ws_client::ReconnectingWebsocketWorker,
};
pub use url::Url;
Expand Down Expand Up @@ -87,14 +89,15 @@ pub enum RpcDispatcherMessage {
pub async fn create_client_and_start_worker(
urls: Vec<Url>,
task_manager: &mut TaskManager,
prometheus_registry: Option<&Registry>,
) -> RelayChainResult<RelayChainRpcClient> {
let (worker, sender) = ReconnectingWebsocketWorker::new(urls).await;

task_manager
.spawn_essential_handle()
.spawn("relay-chain-rpc-worker", None, worker.run());

let client = RelayChainRpcClient::new(sender);
let client = RelayChainRpcClient::new(sender, prometheus_registry);

Ok(client)
}
Expand All @@ -113,7 +116,8 @@ pub async fn create_client_and_start_light_client_worker(
.spawn_essential_handle()
.spawn("relay-light-client-worker", None, worker.run());

let client = RelayChainRpcClient::new(sender);
// We'll not setup prometheus exporter metrics for the light client worker.
let client = RelayChainRpcClient::new(sender, None);

Ok(client)
}
Expand All @@ -123,15 +127,25 @@ pub async fn create_client_and_start_light_client_worker(
pub struct RelayChainRpcClient {
/// Sender to send messages to the worker.
worker_channel: TokioSender<RpcDispatcherMessage>,
metrics: Option<RelaychainRpcMetrics>,
}

impl RelayChainRpcClient {
/// Initialize new RPC Client.
///
/// This client expects a channel connected to a worker that processes
/// requests sent via this channel.
pub(crate) fn new(worker_channel: TokioSender<RpcDispatcherMessage>) -> Self {
RelayChainRpcClient { worker_channel }
pub(crate) fn new(
worker_channel: TokioSender<RpcDispatcherMessage>,
prometheus_registry: Option<&Registry>,
) -> Self {
RelayChainRpcClient {
worker_channel,
metrics: prometheus_registry
.and_then(|inner| RelaychainRpcMetrics::register(inner).map_err(|err| {
tracing::warn!(target: LOG_TARGET, error = %err, "Unable to instantiate the RPC client metrics, continuing w/o metrics setup.");
}).ok()),
}
}

/// Call a call to `state_call` rpc method.
Expand All @@ -148,6 +162,7 @@ impl RelayChainRpcClient {
payload_bytes,
hash
};

let res = self
.request_tracing::<sp_core::Bytes, _>("state_call", params, |err| {
tracing::trace!(
Expand Down Expand Up @@ -190,6 +205,8 @@ impl RelayChainRpcClient {
R: DeserializeOwned + std::fmt::Debug,
OR: Fn(&RelayChainError),
{
let _timer = self.metrics.as_ref().map(|inner| inner.start_request_timer(method));

let (tx, rx) = futures::channel::oneshot::channel();

let message = RpcDispatcherMessage::Request(method.into(), params, tx);
Expand Down
1 change: 1 addition & 0 deletions cumulus/client/service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,7 @@ pub async fn build_relay_chain_interface(
cumulus_client_cli::RelayChainMode::ExternalRpc(rpc_target_urls) =>
build_minimal_relay_chain_node_with_rpc(
relay_chain_config,
parachain_config.prometheus_registry(),
task_manager,
rpc_target_urls,
)
Expand Down
1 change: 1 addition & 0 deletions cumulus/test/service/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ clap = { features = ["derive"], workspace = true }
codec = { workspace = true, default-features = true }
criterion = { features = ["async_tokio"], workspace = true, default-features = true }
jsonrpsee = { features = ["server"], workspace = true }
prometheus = { workspace = true }
rand = { workspace = true, default-features = true }
serde = { features = ["derive"], workspace = true, default-features = true }
serde_json = { workspace = true, default-features = true }
Expand Down
21 changes: 12 additions & 9 deletions cumulus/test/service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ use cumulus_client_consensus_aura::{
ImportQueueParams,
};
use cumulus_client_consensus_proposer::Proposer;
use prometheus::Registry;
use runtime::AccountId;
use sc_executor::{HeapAllocStrategy, WasmExecutor, DEFAULT_HEAP_ALLOC_STRATEGY};
use sp_consensus_aura::sr25519::AuthorityPair;
Expand Down Expand Up @@ -264,11 +265,12 @@ pub fn new_partial(

async fn build_relay_chain_interface(
relay_chain_config: Configuration,
parachain_prometheus_registry: Option<&Registry>,
collator_key: Option<CollatorPair>,
collator_options: CollatorOptions,
task_manager: &mut TaskManager,
) -> RelayChainResult<Arc<dyn RelayChainInterface + 'static>> {
let relay_chain_full_node = match collator_options.relay_chain_mode {
let relay_chain_node = match collator_options.relay_chain_mode {
cumulus_client_cli::RelayChainMode::Embedded => polkadot_test_service::new_full(
relay_chain_config,
if let Some(ref key) = collator_key {
Expand All @@ -283,6 +285,7 @@ async fn build_relay_chain_interface(
cumulus_client_cli::RelayChainMode::ExternalRpc(rpc_target_urls) =>
return build_minimal_relay_chain_node_with_rpc(
relay_chain_config,
parachain_prometheus_registry,
task_manager,
rpc_target_urls,
)
Expand All @@ -294,13 +297,13 @@ async fn build_relay_chain_interface(
.map(|r| r.0),
};

task_manager.add_child(relay_chain_full_node.task_manager);
task_manager.add_child(relay_chain_node.task_manager);
tracing::info!("Using inprocess node.");
Ok(Arc::new(RelayChainInProcessInterface::new(
relay_chain_full_node.client.clone(),
relay_chain_full_node.backend.clone(),
relay_chain_full_node.sync_service.clone(),
relay_chain_full_node.overseer_handle.ok_or(RelayChainError::GenericError(
relay_chain_node.client.clone(),
relay_chain_node.backend.clone(),
relay_chain_node.sync_service.clone(),
relay_chain_node.overseer_handle.ok_or(RelayChainError::GenericError(
"Overseer should be running in full node.".to_string(),
))?,
)))
Expand Down Expand Up @@ -344,9 +347,9 @@ where
let backend = params.backend.clone();

let block_import = params.other;

let relay_chain_interface = build_relay_chain_interface(
relay_chain_config,
parachain_config.prometheus_registry(),
collator_key.clone(),
collator_options.clone(),
&mut task_manager,
Expand Down Expand Up @@ -494,7 +497,7 @@ where
slot_drift: Duration::from_secs(1),
};

let (collation_future, block_builer_future) =
let (collation_future, block_builder_future) =
slot_based::run::<Block, AuthorityPair, _, _, _, _, _, _, _, _>(params);
task_manager.spawn_essential_handle().spawn(
"collation-task",
Expand All @@ -504,7 +507,7 @@ where
task_manager.spawn_essential_handle().spawn(
"block-builder-task",
None,
block_builer_future,
block_builder_future,
);
} else {
tracing::info!(target: LOG_TARGET, "Starting block authoring with lookahead collator.");
Expand Down
Loading

0 comments on commit c8d5e5a

Please sign in to comment.