Skip to content

Commit

Permalink
Fast start for chains configured with an allow list (informalsystems#…
Browse files Browse the repository at this point in the history
…1705)

- Split the worker spawning code into two parts:
  - a) scan the chains for the clients, connections and channels
  - b) spawn the appropriate workers based on the resulting scan
- When scanning a chain with filtering enabled and an allow list, skip scanning all the clients and query the allowed channels directly, resulting in much fewer queries and a faster start.
- Add a `--full-scan` option to `hermes start` to opt out of the fast start mechanism and do a full scan.

---

* Introduce a `ChainScanner` to scan the chains for clients, connections and channels

* Use `ChainScanner` for spawning workers

* Formatting

* Add `--full-scan` option to `start` command to force a full scan even when some chains are using allow lists

* Remove debug statements and print scanned chains on startup

* Changelog entry

* Fix duplicate info message

* Quote identifiers in log messages

* Better error when port/channel does not exists

* Add metrics for queries

* Small log improvements

* Rename queries metric

* Use `chain` key for recording chain identifier in tracing logs

* Use more structured logging in chain scanner

* Fix changelog entry

* Improve logs when no workers were spawned

* Improve logs when spawning connection and channel workers

* Remove spaces in objects names

* Add changelog entry

* Revert part of logs changes

* Use INFO level for spawning logs

* Remove redundant changelog entry
  • Loading branch information
romac authored and michaelfig committed Feb 4, 2022
1 parent d5a3e19 commit a9ae161
Show file tree
Hide file tree
Showing 17 changed files with 1,119 additions and 438 deletions.
3 changes: 3 additions & 0 deletions .changelog/unreleased/improvements/1536-fast-start.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
- Improve startup time of the relayer
- When scanning a chain with filtering enabled and an allow list, skip scanning all the clients and query the allowed channels directly. This results in much fewer queries and a faster start.
- Add a `--full-scan` option to `hermes start` to opt out of the fast start mechanism and do a full scan.
13 changes: 10 additions & 3 deletions modules/src/core/ics03_connection/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::prelude::*;

use core::str::FromStr;
use core::time::Duration;
use core::u64;
use core::{fmt, u64};

use serde::{Deserialize, Serialize};
use tendermint_proto::Protobuf;
Expand Down Expand Up @@ -325,15 +325,16 @@ pub enum State {

impl State {
/// Yields the State as a string.
pub fn as_string(&self) -> &'static str {
pub fn as_str(&self) -> &'static str {
match self {
Self::Uninitialized => "UNINITIALIZED",
Self::Init => "INIT",
Self::TryOpen => "TRYOPEN",
Self::Open => "OPEN",
}
}
// Parses the State out from a i32.

/// Parses the State out from a i32.
pub fn from_i32(s: i32) -> Result<Self, Error> {
match s {
0 => Ok(Self::Uninitialized),
Expand Down Expand Up @@ -363,6 +364,12 @@ impl State {
}
}

impl fmt::Display for State {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}", self.as_str())
}
}

impl TryFrom<i32> for State {
type Error = Error;
fn try_from(value: i32) -> Result<Self, Self::Error> {
Expand Down
25 changes: 21 additions & 4 deletions relayer-cli/src/commands/start.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use alloc::sync::Arc;
use ibc_relayer::supervisor::SupervisorOptions;
use std::error::Error;
use std::io;
use std::sync::RwLock;
Expand All @@ -19,15 +20,22 @@ use crate::conclude::Output;
use crate::prelude::*;

#[derive(Clone, Command, Debug, Parser)]
pub struct StartCmd {}
pub struct StartCmd {
#[clap(
short = 'f',
long = "full-scan",
help = "Force a full scan of the chains for clients, connections and channels"
)]
full_scan: bool,
}

impl Runnable for StartCmd {
fn run(&self) {
let config = (*app_config()).clone();
let config = Arc::new(RwLock::new(config));

let supervisor_handle =
make_supervisor::<ProdChainHandle>(config.clone()).unwrap_or_else(|e| {
let supervisor_handle = make_supervisor::<ProdChainHandle>(config.clone(), self.full_scan)
.unwrap_or_else(|e| {
Output::error(format!("Hermes failed to start, last error: {}", e)).exit();
unreachable!()
});
Expand Down Expand Up @@ -176,11 +184,20 @@ fn spawn_telemetry_server(

fn make_supervisor<Chain: ChainHandle>(
config: Arc<RwLock<Config>>,
force_full_scan: bool,
) -> Result<SupervisorHandle, Box<dyn Error + Send + Sync>> {
let registry = SharedRegistry::<Chain>::new(config.clone());
spawn_telemetry_server(&config)?;

let rest = spawn_rest_server(&config);

Ok(spawn_supervisor(config, registry, rest, true)?)
Ok(spawn_supervisor(
config,
registry,
rest,
SupervisorOptions {
health_check: true,
force_full_scan,
},
)?)
}
56 changes: 48 additions & 8 deletions relayer/src/chain/cosmos.rs
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,7 @@ impl CosmosSdkChain {
/// Specific to the SDK and used only for Tendermint client create
pub fn query_consensus_params(&self) -> Result<Params, Error> {
crate::time!("query_consensus_params");
crate::telemetry!(query, self.id(), "query_consensus_params");

Ok(self
.block_on(self.rpc_client().genesis())
Expand Down Expand Up @@ -329,7 +330,10 @@ impl CosmosSdkChain {
let estimated_gas = self.estimate_gas(simulate_tx)?;

if estimated_gas > self.max_gas() {
debug!(estimated = ?estimated_gas, max = ?self.max_gas(), "[{}] send_tx: estimated gas is higher than max gas", self.id());
debug!(
id = %self.id(), estimated = ?estimated_gas, max = ?self.max_gas(),
"send_tx: estimated gas is higher than max gas"
);

return Err(Error::tx_simulate_gas_estimate_exceeded(
self.id().clone(),
Expand All @@ -341,7 +345,8 @@ impl CosmosSdkChain {
let adjusted_fee = self.fee_with_gas(estimated_gas);

debug!(
"using {} gas, fee {}",
id = %self.id(),
"send_tx: using {} gas, fee {}",
estimated_gas,
PrettyFee(&adjusted_fee)
);
Expand Down Expand Up @@ -817,8 +822,8 @@ impl CosmosSdkChain {
.join(", ");

info!(
"[{}] waiting for commit of tx hashes(s) {}",
self.id(),
id = %self.id(),
"wait_for_block_commits: waiting for commit of tx hashes(s) {}",
hashes
);

Expand All @@ -831,8 +836,8 @@ impl CosmosSdkChain {
|index| {
if all_tx_results_found(&tx_sync_results) {
trace!(
"[{}] wait_for_block_commits: retrieved {} tx results after {} tries ({}ms)",
self.id(),
id = %self.id(),
"wait_for_block_commits: retrieved {} tx results after {} tries ({}ms)",
tx_sync_results.len(),
index,
start.elapsed().as_millis()
Expand Down Expand Up @@ -908,6 +913,9 @@ impl CosmosSdkChain {

/// Query the chain's latest height
pub fn query_latest_height(&self) -> Result<ICSHeight, Error> {
crate::time!("query_latest_height");
crate::telemetry!(query, self.id(), "query_latest_height");

let status = self.status()?;
Ok(ICSHeight {
revision_number: ChainId::chain_version(status.node_info.network.as_str()),
Expand Down Expand Up @@ -1190,6 +1198,7 @@ impl ChainEndpoint for CosmosSdkChain {

fn query_commitment_prefix(&self) -> Result<CommitmentPrefix, Error> {
crate::time!("query_commitment_prefix");
crate::telemetry!(query, self.id(), "query_commitment_prefix");

// TODO - do a real chain query
CommitmentPrefix::try_from(self.config().store_prefix.as_bytes().to_vec())
Expand All @@ -1199,6 +1208,8 @@ impl ChainEndpoint for CosmosSdkChain {
/// Query the chain status
fn query_status(&self) -> Result<StatusResponse, Error> {
crate::time!("query_status");
crate::telemetry!(query, self.id(), "query_status");

let status = self.status()?;

let time = status.sync_info.latest_block_time;
Expand All @@ -1218,6 +1229,7 @@ impl ChainEndpoint for CosmosSdkChain {
request: QueryClientStatesRequest,
) -> Result<Vec<IdentifiedAnyClientState>, Error> {
crate::time!("query_clients");
crate::telemetry!(query, self.id(), "query_clients");

let mut client = self
.block_on(
Expand Down Expand Up @@ -1252,6 +1264,7 @@ impl ChainEndpoint for CosmosSdkChain {
height: ICSHeight,
) -> Result<AnyClientState, Error> {
crate::time!("query_client_state");
crate::telemetry!(query, self.id(), "query_client_state");

let client_state = self
.query(ClientStatePath(client_id.clone()), height, false)
Expand All @@ -1265,6 +1278,7 @@ impl ChainEndpoint for CosmosSdkChain {
height: ICSHeight,
) -> Result<(AnyClientState, MerkleProof), Error> {
crate::time!("query_upgraded_client_state");
crate::telemetry!(query, self.id(), "query_upgraded_client_state");

// Query for the value and the proof.
let tm_height = Height::try_from(height.revision_height).map_err(Error::invalid_height)?;
Expand All @@ -1284,6 +1298,7 @@ impl ChainEndpoint for CosmosSdkChain {
height: ICSHeight,
) -> Result<(AnyConsensusState, MerkleProof), Error> {
crate::time!("query_upgraded_consensus_state");
crate::telemetry!(query, self.id(), "query_upgraded_consensus_state");

let tm_height = Height::try_from(height.revision_height).map_err(Error::invalid_height)?;

Expand All @@ -1305,6 +1320,7 @@ impl ChainEndpoint for CosmosSdkChain {
request: QueryConsensusStatesRequest,
) -> Result<Vec<AnyConsensusStateWithHeight>, Error> {
crate::time!("query_consensus_states");
crate::telemetry!(query, self.id(), "query_consensus_states");

let mut client = self
.block_on(
Expand Down Expand Up @@ -1337,6 +1353,7 @@ impl ChainEndpoint for CosmosSdkChain {
query_height: ICSHeight,
) -> Result<AnyConsensusState, Error> {
crate::time!("query_consensus_state");
crate::telemetry!(query, self.id(), "query_consensus_state");

let (consensus_state, _proof) =
self.proven_client_consensus(&client_id, consensus_height, query_height)?;
Expand All @@ -1348,7 +1365,8 @@ impl ChainEndpoint for CosmosSdkChain {
&self,
request: QueryClientConnectionsRequest,
) -> Result<Vec<ConnectionId>, Error> {
crate::time!("query_connections");
crate::time!("query_client_connections");
crate::telemetry!(query, self.id(), "query_client_connections");

let mut client = self
.block_on(
Expand Down Expand Up @@ -1383,6 +1401,7 @@ impl ChainEndpoint for CosmosSdkChain {
request: QueryConnectionsRequest,
) -> Result<Vec<IdentifiedConnectionEnd>, Error> {
crate::time!("query_connections");
crate::telemetry!(query, self.id(), "query_connections");

let mut client = self
.block_on(
Expand Down Expand Up @@ -1416,6 +1435,9 @@ impl ChainEndpoint for CosmosSdkChain {
connection_id: &ConnectionId,
height: ICSHeight,
) -> Result<ConnectionEnd, Error> {
crate::time!("query_connection");
crate::telemetry!(query, self.id(), "query_connection");

async fn do_query_connection(
chain: &CosmosSdkChain,
connection_id: &ConnectionId,
Expand Down Expand Up @@ -1473,6 +1495,7 @@ impl ChainEndpoint for CosmosSdkChain {
request: QueryConnectionChannelsRequest,
) -> Result<Vec<IdentifiedChannelEnd>, Error> {
crate::time!("query_connection_channels");
crate::telemetry!(query, self.id(), "query_connection_channels");

let mut client = self
.block_on(
Expand Down Expand Up @@ -1504,7 +1527,8 @@ impl ChainEndpoint for CosmosSdkChain {
&self,
request: QueryChannelsRequest,
) -> Result<Vec<IdentifiedChannelEnd>, Error> {
crate::time!("query_connections");
crate::time!("query_channels");
crate::telemetry!(query, self.id(), "query_channels");

let mut client = self
.block_on(
Expand Down Expand Up @@ -1535,6 +1559,9 @@ impl ChainEndpoint for CosmosSdkChain {
channel_id: &ChannelId,
height: ICSHeight,
) -> Result<ChannelEnd, Error> {
crate::time!("query_channel");
crate::telemetry!(query, self.id(), "query_channel");

let res = self.query(
ChannelEndsPath(port_id.clone(), channel_id.clone()),
height,
Expand All @@ -1550,6 +1577,7 @@ impl ChainEndpoint for CosmosSdkChain {
request: QueryChannelClientStateRequest,
) -> Result<Option<IdentifiedAnyClientState>, Error> {
crate::time!("query_channel_client_state");
crate::telemetry!(query, self.id(), "query_channel_client_state");

let mut client = self
.block_on(
Expand Down Expand Up @@ -1579,6 +1607,7 @@ impl ChainEndpoint for CosmosSdkChain {
request: QueryPacketCommitmentsRequest,
) -> Result<(Vec<PacketState>, ICSHeight), Error> {
crate::time!("query_packet_commitments");
crate::telemetry!(query, self.id(), "query_packet_commitments");

let mut client = self
.block_on(
Expand Down Expand Up @@ -1612,6 +1641,7 @@ impl ChainEndpoint for CosmosSdkChain {
request: QueryUnreceivedPacketsRequest,
) -> Result<Vec<u64>, Error> {
crate::time!("query_unreceived_packets");
crate::telemetry!(query, self.id(), "query_unreceived_packets");

let mut client = self
.block_on(
Expand All @@ -1638,6 +1668,7 @@ impl ChainEndpoint for CosmosSdkChain {
request: QueryPacketAcknowledgementsRequest,
) -> Result<(Vec<PacketState>, ICSHeight), Error> {
crate::time!("query_packet_acknowledgements");
crate::telemetry!(query, self.id(), "query_packet_acknowledgements");

let mut client = self
.block_on(
Expand Down Expand Up @@ -1670,6 +1701,7 @@ impl ChainEndpoint for CosmosSdkChain {
request: QueryUnreceivedAcksRequest,
) -> Result<Vec<u64>, Error> {
crate::time!("query_unreceived_acknowledgements");
crate::telemetry!(query, self.id(), "query_unreceived_acknowledgements");

let mut client = self
.block_on(
Expand All @@ -1695,6 +1727,7 @@ impl ChainEndpoint for CosmosSdkChain {
request: QueryNextSequenceReceiveRequest,
) -> Result<Sequence, Error> {
crate::time!("query_next_sequence_receive");
crate::telemetry!(query, self.id(), "query_next_sequence_receive");

let mut client = self
.block_on(
Expand Down Expand Up @@ -1727,6 +1760,7 @@ impl ChainEndpoint for CosmosSdkChain {
/// packets ever sent.
fn query_txs(&self, request: QueryTxRequest) -> Result<Vec<IbcEvent>, Error> {
crate::time!("query_txs");
crate::telemetry!(query, self.id(), "query_txs");

match request {
QueryTxRequest::Packet(request) => {
Expand Down Expand Up @@ -1828,6 +1862,7 @@ impl ChainEndpoint for CosmosSdkChain {
request: QueryBlockRequest,
) -> Result<(Vec<IbcEvent>, Vec<IbcEvent>), Error> {
crate::time!("query_blocks");
crate::telemetry!(query, self.id(), "query_blocks");

match request {
QueryBlockRequest::Packet(request) => {
Expand Down Expand Up @@ -2066,6 +2101,9 @@ impl ChainEndpoint for CosmosSdkChain {
}

fn query_app_version(&self, request: AppVersion) -> Result<ics04_channel::Version, Error> {
crate::time!("query_app_version");
crate::telemetry!(query, self.id(), "query_app_version");

use ibc_proto::ibc::core::port::v1::query_client::QueryClient;

let mut client = self
Expand Down Expand Up @@ -2304,6 +2342,8 @@ pub async fn broadcast_tx_sync(

/// Uses the GRPC client to retrieve the account sequence
async fn query_account(chain: &CosmosSdkChain, address: String) -> Result<BaseAccount, Error> {
crate::telemetry!(query, chain.id(), "query_account");

let mut client = ibc_proto::cosmos::auth::v1beta1::query_client::QueryClient::connect(
chain.grpc_addr.clone(),
)
Expand Down
2 changes: 1 addition & 1 deletion relayer/src/chain/counterparty.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ fn connection_on_destination(
}

pub fn connection_state_on_destination(
connection: IdentifiedConnectionEnd,
connection: &IdentifiedConnectionEnd,
counterparty_chain: &impl ChainHandle,
) -> Result<ConnectionState, Error> {
if let Some(remote_connection_id) = connection.connection_end.counterparty().connection_id() {
Expand Down
Loading

0 comments on commit a9ae161

Please sign in to comment.