From 883ee7d0ad8558d4ac68f10ef8c873eb5fb4a524 Mon Sep 17 00:00:00 2001 From: Romain Ruetschi Date: Fri, 3 Dec 2021 10:46:06 +0100 Subject: [PATCH 01/22] Introduce a `ChainScanner` to scan the chains for clients, connections and channels --- .../src/core/ics03_connection/connection.rs | 13 +- relayer/src/chain/counterparty.rs | 2 +- relayer/src/connection.rs | 2 +- relayer/src/supervisor.rs | 28 +- relayer/src/supervisor/error.rs | 5 + relayer/src/supervisor/scan.rs | 508 ++++++++++++++++++ relayer/src/supervisor/spawn.rs | 25 +- 7 files changed, 564 insertions(+), 19 deletions(-) create mode 100644 relayer/src/supervisor/scan.rs diff --git a/modules/src/core/ics03_connection/connection.rs b/modules/src/core/ics03_connection/connection.rs index 5c379fe3f0..b71f48d187 100644 --- a/modules/src/core/ics03_connection/connection.rs +++ b/modules/src/core/ics03_connection/connection.rs @@ -2,7 +2,7 @@ use crate::prelude::*; use core::str::FromStr; use core::time::Duration; -use core::u64; +use core::{fmt, u64}; use serde::{Deserialize, Serialize}; use tendermint_proto::Protobuf; @@ -323,7 +323,7 @@ pub enum State { impl State { /// Yields the State as a string. - pub fn as_string(&self) -> &'static str { + pub fn as_str(&self) -> &'static str { match self { Self::Uninitialized => "UNINITIALIZED", Self::Init => "INIT", @@ -331,7 +331,8 @@ impl State { Self::Open => "OPEN", } } - // Parses the State out from a i32. + + /// Parses the State out from a i32. pub fn from_i32(s: i32) -> Result { match s { 0 => Ok(Self::Uninitialized), @@ -361,6 +362,12 @@ impl State { } } +impl fmt::Display for State { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}", self.as_str()) + } +} + impl TryFrom for State { type Error = Error; fn try_from(value: i32) -> Result { diff --git a/relayer/src/chain/counterparty.rs b/relayer/src/chain/counterparty.rs index 49a5efec5b..3a3de610aa 100644 --- a/relayer/src/chain/counterparty.rs +++ b/relayer/src/chain/counterparty.rs @@ -73,7 +73,7 @@ fn connection_on_destination( } pub fn connection_state_on_destination( - connection: IdentifiedConnectionEnd, + connection: &IdentifiedConnectionEnd, counterparty_chain: &impl ChainHandle, ) -> Result { if let Some(remote_connection_id) = connection.connection_end.counterparty().connection_id() { diff --git a/relayer/src/connection.rs b/relayer/src/connection.rs index 733f710a8c..b3a8c82c72 100644 --- a/relayer/src/connection.rs +++ b/relayer/src/connection.rs @@ -602,7 +602,7 @@ impl Connection { connection_id: connection_id.clone(), }; - connection_state_on_destination(connection, &self.dst_chain()) + connection_state_on_destination(&connection, &self.dst_chain()) .map_err(ConnectionError::supervisor) } diff --git a/relayer/src/supervisor.rs b/relayer/src/supervisor.rs index 05d6ab2aff..ad3868dc46 100644 --- a/relayer/src/supervisor.rs +++ b/relayer/src/supervisor.rs @@ -36,12 +36,14 @@ pub mod dump_state; use dump_state::SupervisorState; pub mod spawn; -use spawn::SpawnContext; +use spawn::{SpawnContext, SpawnMode}; + +pub mod scan; pub mod cmd; use cmd::{CmdEffect, ConfigUpdate, SupervisorCmd}; -use self::spawn::SpawnMode; +use self::scan::ChainsScan; type ArcBatch = Arc>; type Subscription = Receiver; @@ -332,8 +334,22 @@ impl Supervisor { /// Spawn all the workers necessary for the relayer to connect /// and relay between all the chains in the configurations. - fn spawn_workers(&mut self, mode: SpawnMode) { - self.spawn_context(mode).spawn_workers(); + fn spawn_workers(&mut self, scan: ChainsScan, mode: SpawnMode) { + self.spawn_context(mode).spawn_workers(scan); + } + + fn scan(&mut self) -> ChainsScan { + use self::scan::ChainScanner; + + let mut scanner = ChainScanner::new( + self.config.read().unwrap().clone(), + self.registry.clone(), + &mut self.client_state_filter, + ); + + let scan = scanner.scan_chains(); + println!("{}", scan); + scan } /// Perform a health check on all connected chains @@ -413,7 +429,9 @@ impl Supervisor { } pub fn run_without_health_check(&mut self) -> Result<(), Error> { - self.spawn_workers(SpawnMode::Startup); + let scan = self.scan(); + + self.spawn_workers(scan, SpawnMode::Startup); let mut subscriptions = self.init_subscriptions()?; diff --git a/relayer/src/supervisor/error.rs b/relayer/src/supervisor/error.rs index dc5dc7ca04..cc8a511bf4 100644 --- a/relayer/src/supervisor/error.rs +++ b/relayer/src/supervisor/error.rs @@ -5,6 +5,7 @@ use ibc::core::ics24_host::identifier::{ChainId, ChannelId, ConnectionId, PortId use crate::error::Error as RelayerError; use crate::registry::SpawnError; +use crate::supervisor::scan::Error as ScanError; use crate::worker::WorkerError; define_error! { @@ -66,6 +67,10 @@ define_error! { [ SpawnError ] |_| { "supervisor was not able to connect to any chains" }, + Scan + [ ScanError ] + |_| { "supervisor encountered an error when scanning chains" }, + Worker [ WorkerError ] |_| { "worker error" }, diff --git a/relayer/src/supervisor/scan.rs b/relayer/src/supervisor/scan.rs new file mode 100644 index 0000000000..0a18c8e3ae --- /dev/null +++ b/relayer/src/supervisor/scan.rs @@ -0,0 +1,508 @@ +#![allow(unused_imports)] + +use core::fmt; +use std::collections::BTreeMap; + +use itertools::Itertools; +use tracing::{debug, error, info_span, warn}; + +use ibc::{ + core::{ + ics02_client::client_state::{ClientState, IdentifiedAnyClientState}, + ics03_connection::connection::{IdentifiedConnectionEnd, State as ConnectionState}, + ics04_channel::channel::{IdentifiedChannelEnd, State as ChannelState}, + ics24_host::identifier::{ChainId, ChannelId, ClientId, ConnectionId}, + }, + Height, +}; + +use ibc_proto::ibc::core::{ + channel::v1::QueryConnectionChannelsRequest, client::v1::QueryClientStatesRequest, + connection::v1::QueryClientConnectionsRequest, +}; + +use crate::{ + chain::{ + counterparty::{channel_on_destination, connection_state_on_destination}, + handle::ChainHandle, + }, + config::{ChainConfig, Config, ModeConfig, PacketFilter}, + object::{Channel, Client, Connection, Object, Packet}, + registry::SharedRegistry, + supervisor::client_state_filter::{FilterPolicy, Permission}, + supervisor::error::Error as SupervisorError, + worker::WorkerMap, +}; + +use crate::chain::counterparty::{unreceived_acknowledgements, unreceived_packets}; + +use crate::error::Error as RelayerError; +use crate::registry::SpawnError; + +flex_error::define_error! { + Error { + Spawn + [ SpawnError ] + |_| { "spawn" }, + + Query + [ RelayerError ] + |_| { "query" }, + } +} + +#[derive(Debug)] +pub struct ChainsScan { + pub chains: Vec>, +} + +impl ChainsScan {} + +impl fmt::Display for ChainsScan { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + for scan in self.chains.iter().flatten() { + writeln!(f, "# Chain:{}", scan.chain_id)?; + + for client in &scan.clients { + writeln!(f, " - Client: {}", client.client.client_id)?; + + for conn in &client.connections { + let counterparty = conn.counterparty_state(); + writeln!(f, " * Connection: {}", conn.connection.connection_id)?; + writeln!(f, " | State: {}", conn.state())?; + writeln!(f, " | Counterparty state: {}", counterparty)?; + + for chan in &conn.channels { + let counterparty = chan + .counterparty + .as_ref() + .map(|c| c.channel_id.to_string()) + .unwrap_or_else(|| "".to_string()); + + writeln!(f, " + Channel: {}", chan.channel.channel_id)?; + writeln!(f, " | Port: {}", chan.channel.port_id)?; + writeln!(f, " | State: {}", chan.channel.channel_end.state())?; + writeln!(f, " | Counterparty state: {}", counterparty)?; + } + } + } + } + + Ok(()) + } +} + +#[derive(Debug)] +pub struct ChainScan { + pub chain_id: ChainId, + pub clients: Vec, +} + +#[derive(Debug)] +pub struct ClientScan { + pub client: IdentifiedAnyClientState, + pub connections: Vec, +} + +#[derive(Debug)] +pub struct ConnectionScan { + pub connection: IdentifiedConnectionEnd, + pub counterparty_state: ConnectionState, + pub channels: Vec, +} + +impl ConnectionScan { + pub fn state(&self) -> ConnectionState { + self.connection.connection_end.state + } + pub fn counterparty_state(&self) -> ConnectionState { + self.counterparty_state + } +} + +#[derive(Debug)] +pub struct ChannelScan { + pub channel: IdentifiedChannelEnd, + pub counterparty: Option, +} + +impl ChannelScan { + pub fn unreceived_packets_on_counterparty( + &self, + chain: &impl ChainHandle, + counterparty_chain: &impl ChainHandle, + ) -> Option> { + self.counterparty.as_ref().map(|counterparty| { + unreceived_packets(counterparty_chain, chain, counterparty).unwrap_or_default() + }) + } + + pub fn unreceived_acknowledgements_on_counterparty( + &self, + chain: &impl ChainHandle, + counterparty_chain: &impl ChainHandle, + ) -> Option> { + self.counterparty.as_ref().map(|counterparty| { + unreceived_acknowledgements(counterparty_chain, chain, counterparty).unwrap_or_default() + }) + } +} + +pub struct ChainScanner<'a, Chain: ChainHandle> { + config: Config, + registry: SharedRegistry, + client_state_filter: &'a mut FilterPolicy, +} + +impl<'a, Chain: ChainHandle> ChainScanner<'a, Chain> { + pub fn new( + config: Config, + registry: SharedRegistry, + client_state_filter: &'a mut FilterPolicy, + ) -> Self { + Self { + config, + registry, + client_state_filter, + } + } + + pub fn scan_chains(&mut self) -> ChainsScan { + let mut scans = ChainsScan { + chains: Vec::with_capacity(self.config.chains.len()), + }; + + for chain in self.config.chains.clone() { + scans.chains.push(self.scan_chain(&chain)); + } + + scans + } + + pub fn scan_chain(&mut self, chain_config: &ChainConfig) -> Result { + let span = info_span!("scan.chain", chain_id = %chain_config.id, ); + let _guard = span.enter(); + + let chain = match self.registry.get_or_spawn(&chain_config.id) { + Ok(chain_handle) => chain_handle, + Err(e) => { + error!( + "aborting scan, reason: failed to spawn chain runtime with error: {}", + e + ); + + return Err(Error::spawn(e)); + } + }; + + let mut scan = ChainScan { + chain_id: chain_config.id.clone(), + clients: Vec::new(), + }; + + self.scan_all_clients(&chain, &mut scan)?; + + Ok(scan) + } + + pub fn scan_all_clients(&mut self, chain: &Chain, scan: &mut ChainScan) -> Result<(), Error> { + let clients = query_all_clients(chain)?; + + for client in clients { + if let Some(client_scan) = self.scan_client(chain, client)? { + scan.clients.push(client_scan); + } + } + + Ok(()) + } + + fn scan_client( + &mut self, + chain: &Chain, + client: IdentifiedAnyClientState, + ) -> Result, Error> { + let span = info_span!("scan.client", client_id = %client.client_id); + let _guard = span.enter(); + + if !self.client_allowed(chain, &client) { + warn!( + trust_threshold = ?client.client_state.trust_threshold(), + "skipping client, reason: client is not allowed", + ); + + return Ok(None); + } + + let counterparty_chain_id = client.client_state.chain_id(); + let has_counterparty = self.config.has_chain(&counterparty_chain_id); + + if !has_counterparty { + debug!( + chain_id = %chain.id(), + counterparty_chain_id = %counterparty_chain_id, + "skipping client because its counterparty is not present in the config", + ); + + return Ok(None); + } + + let client_connections_ids = query_client_connections(chain, &client.client_id)?; + + let mut scan = ClientScan { + client, + connections: Vec::new(), + }; + + for connection_id in client_connections_ids { + if let Some(connection_scan) = + self.scan_connection(chain, &scan.client, connection_id)? + { + scan.connections.push(connection_scan); + } + } + + Ok(Some(scan)) + } + + fn scan_connection( + &mut self, + chain: &Chain, + client: &IdentifiedAnyClientState, + connection_end: IdentifiedConnectionEnd, + ) -> Result, Error> { + let span = info_span!("scan.connection", connection = %connection_end.connection_id); + let _guard = span.enter(); + + if !self.connection_allowed(chain, client, &connection_end) { + warn!("skipping connection, reason: connection is not allowed",); + return Ok(None); + } + + let connection = &connection_end.connection_end; + let connection_id = &connection_end.connection_id; + + if !connection.is_open() { + debug!("connection is not open, skipping scan of channels over this connection"); + return Ok(None); + } + + let counterparty_state = match self.counterparty_connection_state(client, &connection_end) { + Err(e) => { + error!("error fetching counterparty connection state: {}", e); + return Ok(None); + } + Ok(state) if !state.eq(&ConnectionState::Open) => { + warn!("counterparty connection is not open, skipping scan of channels over this connection"); + return Ok(None); + } + Ok(state) => state, + }; + + let channels = match query_connection_channels(chain, connection_id) { + Ok(channels) => channels, + Err(e) => { + error!("failed to fetch connection channels: {}", e); + Vec::new() + } + }; + + let counterparty_chain = self + .registry + .get_or_spawn(&client.client_state.chain_id()) + .map_err(Error::spawn)?; + + let channels = channels + .into_iter() + .filter(|channel| self.channel_allowed(chain, channel)) + .map(|channel| { + let counterparty = + channel_on_destination(&channel, &connection_end, &counterparty_chain) + .unwrap_or_default(); + + ChannelScan { + channel, + counterparty, + } + }) + .collect(); + + Ok(Some(ConnectionScan { + connection: connection_end, + counterparty_state, + channels, + })) + } + + fn counterparty_connection_state( + &mut self, + client: &IdentifiedAnyClientState, + connection: &IdentifiedConnectionEnd, + ) -> Result { + let counterparty_chain = self + .registry + .get_or_spawn(&client.client_state.chain_id()) + .map_err(Error::spawn)?; + + // FIXME + Ok(connection_state_on_destination(connection, &counterparty_chain).unwrap()) + } + + fn client_filter_enabled(&self) -> bool { + self.config.mode.packets.filter + } + + fn client_allowed(&mut self, chain: &Chain, client: &IdentifiedAnyClientState) -> bool { + if !self.client_filter_enabled() { + return true; + }; + + let permission = self.client_state_filter.control_client( + &chain.id(), + &client.client_id, + &client.client_state, + ); + + permission == Permission::Allow + } + + fn connection_allowed( + &mut self, + chain: &Chain, + client: &IdentifiedAnyClientState, + connection: &IdentifiedConnectionEnd, + ) -> bool { + if !self.client_filter_enabled() { + return true; + } + + let permission = self.client_state_filter.control_connection_end_and_client( + &mut self.registry.write(), + &chain.id(), + &client.client_state, + &connection.connection_end, + &connection.connection_id, + ); + + match permission { + Ok(Permission::Deny) => { + warn!( + "skipping workers for chain {}, client {} & conn {}. \ + reason: client or counterparty client is not allowed", + chain.id(), + client.client_id, + connection.connection_id + ); + + false + } + Err(e) => { + error!( + "skipping workers for chain {}, client {} & conn {}. reason: {}", + chain.id(), + client.client_id, + connection.connection_id, + e + ); + + false + } + _ => true, + } + } + + fn channel_allowed(&mut self, chain: &Chain, channel: &IdentifiedChannelEnd) -> bool { + self.config + .packets_on_channel_allowed(&chain.id(), &channel.port_id, &channel.channel_id) + } +} + +// fn query_clients_for_channels( +// _chain: &C, +// _channels: Vec, +// ) -> Result, Error> { +// todo!() +// } + +// fn query_client_for_channel( +// _chain: &C, +// _channel: ChannelId, +// ) -> Result { +// todo!() +// } + +// fn query_channel( +// _chain: &C, +// _channel_id: ChannelId, +// ) -> Result, Error> { +// todo!() +// } + +// fn query_connection_for_channel( +// _chain: &C, +// _channel: IdentifiedChannelEnd, +// ) -> Result { +// todo!() +// } + +fn query_all_clients(chain: &C) -> Result, Error> { + let clients_req = QueryClientStatesRequest { + pagination: ibc_proto::cosmos::base::query::pagination::all(), + }; + + chain.query_clients(clients_req).map_err(Error::query) +} + +fn query_client_connections( + chain: &C, + client_id: &ClientId, +) -> Result, Error> { + let conns_req = QueryClientConnectionsRequest { + client_id: client_id.to_string(), + }; + + let ids = chain + .query_client_connections(conns_req) + .map_err(Error::query)?; + + let connections = ids + .into_iter() + .filter_map(|id| match query_connection(chain, id) { + Ok(connection) => Some(connection), + Err(e) => { + error!("failed to query connection: {}", e); + None + } + }) + .collect_vec(); + + Ok(connections) +} + +fn query_connection( + chain: &C, + connection_id: ConnectionId, +) -> Result { + let connection_end = chain + .query_connection(&connection_id, Height::zero()) + .map_err(Error::query)?; + + Ok(IdentifiedConnectionEnd { + connection_id, + connection_end, + }) +} + +fn query_connection_channels( + chain: &C, + connection_id: &ConnectionId, +) -> Result, Error> { + let chans_req = QueryConnectionChannelsRequest { + connection: connection_id.to_string(), + pagination: ibc_proto::cosmos::base::query::pagination::all(), + }; + + chain + .query_connection_channels(chans_req) + .map_err(Error::query) +} diff --git a/relayer/src/supervisor/spawn.rs b/relayer/src/supervisor/spawn.rs index 7936e3d5f8..10d244e5c8 100644 --- a/relayer/src/supervisor/spawn.rs +++ b/relayer/src/supervisor/spawn.rs @@ -29,7 +29,7 @@ use crate::{ worker::WorkerMap, }; -use super::{Error, RwArc}; +use super::{scan::ChainsScan, Error, RwArc}; use crate::chain::counterparty::{unreceived_acknowledgements, unreceived_packets}; #[derive(Copy, Clone, Debug, PartialEq, Eq)] @@ -74,7 +74,7 @@ impl<'a, Chain: ChainHandle + 'static> SpawnContext<'a, Chain> { .filter } - pub fn spawn_workers(&mut self) { + pub fn spawn_workers(&mut self, scan: ChainsScan) { let chain_ids = self .config .read() @@ -159,6 +159,8 @@ impl<'a, Chain: ChainHandle + 'static> SpawnContext<'a, Chain> { } }; + println!("W: Chain: {}", chain.id()); + for client in clients { self.spawn_workers_for_client(chain.clone(), client); } @@ -321,7 +323,7 @@ impl<'a, Chain: ChainHandle + 'static> SpawnContext<'a, Chain> { return; } - match self.counterparty_connection_state(client.clone(), connection.clone()) { + match self.counterparty_connection_state(client, &connection) { Err(e) => { debug!("error with counterparty: reason {}", e); return; @@ -381,8 +383,8 @@ impl<'a, Chain: ChainHandle + 'static> SpawnContext<'a, Chain> { fn counterparty_connection_state( &mut self, - client: IdentifiedAnyClientState, - connection: IdentifiedConnectionEnd, + client: &IdentifiedAnyClientState, + connection: &IdentifiedConnectionEnd, ) -> Result { let counterparty_chain = self .registry @@ -412,8 +414,7 @@ impl<'a, Chain: ChainHandle + 'static> SpawnContext<'a, Chain> { .map_err(Error::spawn)?; let conn_state_src = connection.connection_end.state; - let conn_state_dst = - connection_state_on_destination(connection.clone(), &counterparty_chain)?; + let conn_state_dst = connection_state_on_destination(&connection, &counterparty_chain)?; debug!( "connection {} on chain {} is: {:?}, state on dest. chain ({}) is: {:?}", @@ -424,6 +425,8 @@ impl<'a, Chain: ChainHandle + 'static> SpawnContext<'a, Chain> { conn_state_dst ); + println!("W: Connection: {}", connection.connection_id); + if conn_state_src.is_open() && conn_state_dst.is_open() { debug!( "connection {} on chain {} is already open, not spawning Connection worker", @@ -505,6 +508,8 @@ impl<'a, Chain: ChainHandle + 'static> SpawnContext<'a, Chain> { src_chain_id: client.client_state.chain_id(), }); + println!("W: Client: {}", client.client_id); + self.workers .spawn( counterparty_chain.clone(), @@ -533,15 +538,17 @@ impl<'a, Chain: ChainHandle + 'static> SpawnContext<'a, Chain> { }; // If there are any outstanding packets or acks to send, spawn the worker - if has_packets() || has_acks() { + if true || has_packets() || has_acks() { // Create the Packet object and spawn worker let path_object = Object::Packet(Packet { dst_chain_id: counterparty_chain.id(), src_chain_id: chain.id(), - src_channel_id: channel.channel_id, + src_channel_id: channel.channel_id.clone(), src_port_id: channel.port_id, }); + println!("W: Channel: {}", channel.channel_id); + self.workers .spawn( chain.clone(), From 70f5e1f668827d4f001912ce24f238eb1793dd16 Mon Sep 17 00:00:00 2001 From: Romain Ruetschi Date: Mon, 13 Dec 2021 21:14:50 +0100 Subject: [PATCH 02/22] Use `ChainScanner` for spawning workers --- relayer/src/config.rs | 25 +- relayer/src/supervisor.rs | 84 ++++--- relayer/src/supervisor/scan.rs | 422 ++++++++++++++++++++++++-------- relayer/src/supervisor/spawn.rs | 390 ++++------------------------- 4 files changed, 447 insertions(+), 474 deletions(-) diff --git a/relayer/src/config.rs b/relayer/src/config.rs index b3fe144650..ec55838ded 100644 --- a/relayer/src/config.rs +++ b/relayer/src/config.rs @@ -5,9 +5,10 @@ mod proof_specs; pub mod reload; pub mod types; -use alloc::collections::BTreeMap as HashMap; -use alloc::collections::BTreeSet as HashSet; +use alloc::collections::BTreeMap; +use alloc::collections::BTreeSet; use core::{fmt, time::Duration}; +use itertools::Itertools; use std::sync::{Arc, RwLock}; use std::{fs, fs::File, io::Write, path::Path}; @@ -75,12 +76,28 @@ impl PacketFilter { #[derive(Clone, Debug, Default, Serialize, Deserialize)] #[serde(deny_unknown_fields)] -pub struct ChannelsSpec(HashSet<(PortId, ChannelId)>); +pub struct ChannelsSpec(BTreeSet<(PortId, ChannelId)>); impl ChannelsSpec { pub fn contains(&self, channel_port: &(PortId, ChannelId)) -> bool { self.0.contains(channel_port) } + + pub fn iter(&self) -> impl Iterator { + self.0.iter() + } +} + +impl fmt::Display for ChannelsSpec { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!( + f, + "{}", + self.iter() + .map(|(pid, cid)| format!("{}/{}", pid, cid)) + .join(", ") + ) + } } /// Defaults for various fields @@ -165,7 +182,7 @@ impl Config { } } - pub fn chains_map(&self) -> HashMap<&ChainId, &ChainConfig> { + pub fn chains_map(&self) -> BTreeMap<&ChainId, &ChainConfig> { self.chains.iter().map(|c| (&c.id, c)).collect() } } diff --git a/relayer/src/supervisor.rs b/relayer/src/supervisor.rs index ad3868dc46..7786f8f447 100644 --- a/relayer/src/supervisor.rs +++ b/relayer/src/supervisor.rs @@ -36,14 +36,14 @@ pub mod dump_state; use dump_state::SupervisorState; pub mod spawn; -use spawn::{SpawnContext, SpawnMode}; +use spawn::SpawnContext; pub mod scan; pub mod cmd; use cmd::{CmdEffect, ConfigUpdate, SupervisorCmd}; -use self::scan::ChainsScan; +use self::scan::{ChainScanner, ChainsScan}; type ArcBatch = Arc>; type Subscription = Receiver; @@ -322,34 +322,31 @@ impl Supervisor { } /// Create a new `SpawnContext` for spawning workers. - fn spawn_context(&mut self, mode: SpawnMode) -> SpawnContext<'_, Chain> { + fn spawn_context(&mut self) -> SpawnContext<'_, Chain> { SpawnContext::new( self.config.clone(), self.registry.clone(), - &mut self.client_state_filter, &mut self.workers, - mode, ) } /// Spawn all the workers necessary for the relayer to connect /// and relay between all the chains in the configurations. - fn spawn_workers(&mut self, scan: ChainsScan, mode: SpawnMode) { - self.spawn_context(mode).spawn_workers(scan); + fn spawn_workers(&mut self, scan: ChainsScan) { + self.spawn_context().spawn_workers(scan); } + /// Scan the chains for clients, connections and channels to relay on. fn scan(&mut self) -> ChainsScan { - use self::scan::ChainScanner; + let config = self.config.read().expect("poisoned lock"); - let mut scanner = ChainScanner::new( - self.config.read().unwrap().clone(), + let scanner = ChainScanner::new( + &config, self.registry.clone(), &mut self.client_state_filter, ); - let scan = scanner.scan_chains(); - println!("{}", scan); - scan + scanner.scan_chains() } /// Perform a health check on all connected chains @@ -430,8 +427,10 @@ impl Supervisor { pub fn run_without_health_check(&mut self) -> Result<(), Error> { let scan = self.scan(); + info!("Scan: {}", scan); + trace!("Scan: {}", scan); - self.spawn_workers(scan, SpawnMode::Startup); + self.spawn_workers(scan); let mut subscriptions = self.init_subscriptions()?; @@ -515,43 +514,62 @@ impl Supervisor { /// /// If the addition had any effect, returns [`CmdEffect::ConfigChanged`] as /// subscriptions need to be reset to take into account the newly added chain. - fn add_chain(&mut self, config: ChainConfig) -> CmdEffect { - let id = config.id.clone(); + fn add_chain(&mut self, chain_config: ChainConfig) -> CmdEffect { + let id = chain_config.id.clone(); + + let mut config = self.config.write().expect("poisoned lock"); - if self.config.read().expect("poisoned lock").has_chain(&id) { + if config.has_chain(&id) { info!(chain.id=%id, "skipping addition of already existing chain"); return CmdEffect::Nothing; } info!(chain.id=%id, "adding new chain"); - - self.config - .write() - .expect("poisoned lock") - .chains - .push(config); + config.chains.push(chain_config.clone()); debug!(chain.id=%id, "spawning chain runtime"); if let Err(e) = self.registry.spawn(&id) { error!( - "failed to add chain {} because of failure to spawn the chain runtime: {}", - id, e + chain.id=%id, + "failed to add chain because of failure to spawn the chain runtime: {}", e ); // Remove the newly added config - self.config - .write() - .expect("poisoned lock") - .chains - .retain(|c| c.id != id); + config.chains.retain(|c| c.id != id); return CmdEffect::Nothing; } + debug!(chain.id=%id, "scanning chain"); + + let scan = { + let mut scanner = ChainScanner::new( + &config, + self.registry.clone(), + &mut self.client_state_filter, + ); + + match scanner.scan_chain(&chain_config) { + Ok(scan) => scan, + Err(e) => { + error!(chain.id=%id, "failed to scan chain, not adding chain to config, reason: {}", e); + + // Remove the newly added config + config.chains.retain(|c| c.id != id); + + return CmdEffect::Nothing; + } + } + }; + + // Release the write lock + drop(config); + debug!(chain.id=%id, "spawning workers"); - let mut ctx = self.spawn_context(SpawnMode::Reload); - ctx.spawn_workers_for_chain(&id); + + let mut ctx = self.spawn_context(); + ctx.spawn_workers_for_chain(scan); CmdEffect::ConfigChanged } @@ -576,7 +594,7 @@ impl Supervisor { .retain(|c| &c.id != id); debug!(chain.id=%id, "shutting down workers"); - let mut ctx = self.spawn_context(SpawnMode::Reload); + let mut ctx = self.spawn_context(); ctx.shutdown_workers_for_chain(id); debug!(chain.id=%id, "shutting down chain runtime"); diff --git a/relayer/src/supervisor/scan.rs b/relayer/src/supervisor/scan.rs index 0a18c8e3ae..d278a1d061 100644 --- a/relayer/src/supervisor/scan.rs +++ b/relayer/src/supervisor/scan.rs @@ -4,14 +4,14 @@ use core::fmt; use std::collections::BTreeMap; use itertools::Itertools; -use tracing::{debug, error, info_span, warn}; +use tracing::{debug, error, info, info_span, warn}; use ibc::{ core::{ ics02_client::client_state::{ClientState, IdentifiedAnyClientState}, ics03_connection::connection::{IdentifiedConnectionEnd, State as ConnectionState}, ics04_channel::channel::{IdentifiedChannelEnd, State as ChannelState}, - ics24_host::identifier::{ChainId, ChannelId, ClientId, ConnectionId}, + ics24_host::identifier::{ChainId, ChannelId, ClientId, ConnectionId, PortId}, }, Height, }; @@ -26,7 +26,7 @@ use crate::{ counterparty::{channel_on_destination, connection_state_on_destination}, handle::ChainHandle, }, - config::{ChainConfig, Config, ModeConfig, PacketFilter}, + config::{ChainConfig, ChannelsSpec, Config, ModeConfig, PacketFilter}, object::{Channel, Client, Connection, Object, Packet}, registry::SharedRegistry, supervisor::client_state_filter::{FilterPolicy, Permission}, @@ -48,6 +48,19 @@ flex_error::define_error! { Query [ RelayerError ] |_| { "query" }, + + MissingConnectionHop + { + port_id: PortId, + channel_id: ChannelId, + chain_id: ChainId, + } + |e| { + format_args!( + "could not retrieve the connection hop underlying port/channel {}/{} on chain '{}'", + e.port_id, e.channel_id, e.chain_id + ) + } } } @@ -56,23 +69,26 @@ pub struct ChainsScan { pub chains: Vec>, } -impl ChainsScan {} - impl fmt::Display for ChainsScan { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { for scan in self.chains.iter().flatten() { writeln!(f, "# Chain:{}", scan.chain_id)?; - for client in &scan.clients { + for client in scan.clients.values() { writeln!(f, " - Client: {}", client.client.client_id)?; - for conn in &client.connections { - let counterparty = conn.counterparty_state(); + for conn in client.connections.values() { + let counterparty = conn + .counterparty_state + .as_ref() + .map(|s| s.to_string()) + .unwrap_or_else(|| "".to_string()); + writeln!(f, " * Connection: {}", conn.connection.connection_id)?; writeln!(f, " | State: {}", conn.state())?; writeln!(f, " | Counterparty state: {}", counterparty)?; - for chan in &conn.channels { + for chan in conn.channels.values() { let counterparty = chan .counterparty .as_ref() @@ -82,7 +98,7 @@ impl fmt::Display for ChainsScan { writeln!(f, " + Channel: {}", chan.channel.channel_id)?; writeln!(f, " | Port: {}", chan.channel.port_id)?; writeln!(f, " | State: {}", chan.channel.channel_end.state())?; - writeln!(f, " | Counterparty state: {}", counterparty)?; + writeln!(f, " | Counterparty: {}", counterparty)?; } } } @@ -92,41 +108,94 @@ impl fmt::Display for ChainsScan { } } -#[derive(Debug)] +#[derive(Clone, Debug)] pub struct ChainScan { pub chain_id: ChainId, - pub clients: Vec, + pub clients: BTreeMap, } -#[derive(Debug)] +impl ChainScan { + fn new(chain_id: ChainId) -> ChainScan { + Self { + chain_id, + clients: BTreeMap::new(), + } + } +} + +#[derive(Clone, Debug)] pub struct ClientScan { pub client: IdentifiedAnyClientState, - pub connections: Vec, + pub connections: BTreeMap, } -#[derive(Debug)] +impl ClientScan { + fn new(client: IdentifiedAnyClientState) -> ClientScan { + Self { + client, + connections: BTreeMap::new(), + } + } + + pub fn id(&self) -> &ClientId { + &self.client.client_id + } + + pub fn counterparty_chain_id(&self) -> ChainId { + self.client.client_state.chain_id() + } +} + +#[derive(Clone, Debug)] pub struct ConnectionScan { pub connection: IdentifiedConnectionEnd, - pub counterparty_state: ConnectionState, - pub channels: Vec, + pub counterparty_state: Option, + pub channels: BTreeMap, } impl ConnectionScan { + pub fn new( + connection: IdentifiedConnectionEnd, + counterparty_state: Option, + ) -> Self { + Self { + connection, + counterparty_state, + channels: BTreeMap::new(), + } + } + + pub fn id(&self) -> &ConnectionId { + &self.connection.connection_id + } + pub fn state(&self) -> ConnectionState { self.connection.connection_end.state } - pub fn counterparty_state(&self) -> ConnectionState { - self.counterparty_state + + pub fn is_open(&self) -> bool { + self.connection.connection_end.is_open() } } -#[derive(Debug)] +#[derive(Clone, Debug)] pub struct ChannelScan { pub channel: IdentifiedChannelEnd, pub counterparty: Option, } impl ChannelScan { + pub fn new(channel: IdentifiedChannelEnd, counterparty: Option) -> Self { + Self { + channel, + counterparty, + } + } + + pub fn id(&self) -> &ChannelId { + &self.channel.channel_id + } + pub fn unreceived_packets_on_counterparty( &self, chain: &impl ChainHandle, @@ -149,14 +218,14 @@ impl ChannelScan { } pub struct ChainScanner<'a, Chain: ChainHandle> { - config: Config, + config: &'a Config, registry: SharedRegistry, client_state_filter: &'a mut FilterPolicy, } impl<'a, Chain: ChainHandle> ChainScanner<'a, Chain> { pub fn new( - config: Config, + config: &'a Config, registry: SharedRegistry, client_state_filter: &'a mut FilterPolicy, ) -> Self { @@ -167,7 +236,7 @@ impl<'a, Chain: ChainHandle> ChainScanner<'a, Chain> { } } - pub fn scan_chains(&mut self) -> ChainsScan { + pub fn scan_chains(mut self) -> ChainsScan { let mut scans = ChainsScan { chains: Vec::with_capacity(self.config.chains.len()), }; @@ -180,9 +249,11 @@ impl<'a, Chain: ChainHandle> ChainScanner<'a, Chain> { } pub fn scan_chain(&mut self, chain_config: &ChainConfig) -> Result { - let span = info_span!("scan.chain", chain_id = %chain_config.id, ); + let span = info_span!("scan.chain", chain = %chain_config.id, ); let _guard = span.enter(); + info!("scanning chain..."); + let chain = match self.registry.get_or_spawn(&chain_config.id) { Ok(chain_handle) => chain_handle, Err(e) => { @@ -195,22 +266,73 @@ impl<'a, Chain: ChainHandle> ChainScanner<'a, Chain> { } }; - let mut scan = ChainScan { - chain_id: chain_config.id.clone(), - clients: Vec::new(), - }; + let mut scan = ChainScan::new(chain_config.id.clone()); - self.scan_all_clients(&chain, &mut scan)?; + if let Some(spec) = self.use_allow_list(chain_config) { + info!("chain uses an allow list, skipping scan for fast startup"); + info!("allowed ports/channels: {}", spec); + + self.query_allowed_channels(&chain, spec, &mut scan)?; + } else { + info!("scanning chain for all clients, connections and channels"); + + self.scan_all_clients(&chain, &mut scan)?; + } Ok(scan) } + pub fn query_allowed_channels( + &mut self, + chain: &Chain, + spec: &ChannelsSpec, + scan: &mut ChainScan, + ) -> Result<(), Error> { + info!("querying allowed channels..."); + + for (port_id, channel_id) in spec.iter() { + let result = scan_allowed_channel(&mut self.registry, chain, port_id, channel_id); + + match result { + Ok(ScannedChannel { + channel, + counterparty_channel, + connection, + counterparty_connection_state, + client, + }) => { + let client_scan = scan + .clients + .entry(client.client_id.clone()) + .or_insert_with(|| ClientScan::new(client)); + + let connection_scan = client_scan + .connections + .entry(connection.connection_id.clone()) + .or_insert_with(|| { + ConnectionScan::new(connection, counterparty_connection_state) + }); + + connection_scan + .channels + .entry(channel.channel_id.clone()) + .or_insert_with(|| ChannelScan::new(channel, counterparty_channel)); + } + Err(e) => error!("failed to scan channel {}, reason: {}", channel_id, e), + } + } + + Ok(()) + } + pub fn scan_all_clients(&mut self, chain: &Chain, scan: &mut ChainScan) -> Result<(), Error> { + info!("scanning all clients..."); + let clients = query_all_clients(chain)?; for client in clients { if let Some(client_scan) = self.scan_client(chain, client)? { - scan.clients.push(client_scan); + scan.clients.insert(client_scan.id().clone(), client_scan); } } @@ -222,9 +344,11 @@ impl<'a, Chain: ChainHandle> ChainScanner<'a, Chain> { chain: &Chain, client: IdentifiedAnyClientState, ) -> Result, Error> { - let span = info_span!("scan.client", client_id = %client.client_id); + let span = info_span!("scan.client", client = %client.client_id); let _guard = span.enter(); + info!("scanning client..."); + if !self.client_allowed(chain, &client) { warn!( trust_threshold = ?client.client_state.trust_threshold(), @@ -249,16 +373,14 @@ impl<'a, Chain: ChainHandle> ChainScanner<'a, Chain> { let client_connections_ids = query_client_connections(chain, &client.client_id)?; - let mut scan = ClientScan { - client, - connections: Vec::new(), - }; + let mut scan = ClientScan::new(client); - for connection_id in client_connections_ids { + for connection_end in client_connections_ids { if let Some(connection_scan) = - self.scan_connection(chain, &scan.client, connection_id)? + self.scan_connection(chain, &scan.client, connection_end)? { - scan.connections.push(connection_scan); + scan.connections + .insert(connection_scan.id().clone(), connection_scan); } } @@ -269,37 +391,41 @@ impl<'a, Chain: ChainHandle> ChainScanner<'a, Chain> { &mut self, chain: &Chain, client: &IdentifiedAnyClientState, - connection_end: IdentifiedConnectionEnd, + connection: IdentifiedConnectionEnd, ) -> Result, Error> { - let span = info_span!("scan.connection", connection = %connection_end.connection_id); + let span = info_span!("scan.connection", connection = %connection.connection_id); let _guard = span.enter(); - if !self.connection_allowed(chain, client, &connection_end) { + info!("scanning connection..."); + + if !self.connection_allowed(chain, client, &connection) { warn!("skipping connection, reason: connection is not allowed",); return Ok(None); } - let connection = &connection_end.connection_end; - let connection_id = &connection_end.connection_id; + let mut scan = ConnectionScan::new(connection, None); - if !connection.is_open() { - debug!("connection is not open, skipping scan of channels over this connection"); - return Ok(None); + if !scan.is_open() { + warn!("connection is not open, skipping scan of channels over this connection"); + return Ok(Some(scan)); } - let counterparty_state = match self.counterparty_connection_state(client, &connection_end) { - Err(e) => { - error!("error fetching counterparty connection state: {}", e); - return Ok(None); - } + let counterparty_state = match self.counterparty_connection_state(client, &scan.connection) + { Ok(state) if !state.eq(&ConnectionState::Open) => { warn!("counterparty connection is not open, skipping scan of channels over this connection"); + return Ok(Some(scan)); + } + Err(e) => { + error!("error fetching counterparty connection state: {}", e); return Ok(None); } Ok(state) => state, }; - let channels = match query_connection_channels(chain, connection_id) { + scan.counterparty_state = Some(counterparty_state); + + let channels = match query_connection_channels(chain, scan.connection.id()) { Ok(channels) => channels, Err(e) => { error!("failed to fetch connection channels: {}", e); @@ -317,21 +443,21 @@ impl<'a, Chain: ChainHandle> ChainScanner<'a, Chain> { .filter(|channel| self.channel_allowed(chain, channel)) .map(|channel| { let counterparty = - channel_on_destination(&channel, &connection_end, &counterparty_chain) + channel_on_destination(&channel, &scan.connection, &counterparty_chain) .unwrap_or_default(); - ChannelScan { + let scan = ChannelScan { channel, counterparty, - } + }; + + (scan.id().clone(), scan) }) .collect(); - Ok(Some(ConnectionScan { - connection: connection_end, - counterparty_state, - channels, - })) + scan.channels = channels; + + Ok(Some(scan)) } fn counterparty_connection_state( @@ -348,12 +474,23 @@ impl<'a, Chain: ChainHandle> ChainScanner<'a, Chain> { Ok(connection_state_on_destination(connection, &counterparty_chain).unwrap()) } - fn client_filter_enabled(&self) -> bool { + fn filtering_enabled(&self) -> bool { self.config.mode.packets.filter } + fn use_allow_list<'b>(&self, chain_config: &'b ChainConfig) -> Option<&'b ChannelsSpec> { + if !self.filtering_enabled() { + return None; + } + + match chain_config.packet_filter { + PacketFilter::Allow(ref spec) => Some(spec), + _ => None, + } + } + fn client_allowed(&mut self, chain: &Chain, client: &IdentifiedAnyClientState) -> bool { - if !self.client_filter_enabled() { + if !self.filtering_enabled() { return true; }; @@ -372,7 +509,7 @@ impl<'a, Chain: ChainHandle> ChainScanner<'a, Chain> { client: &IdentifiedAnyClientState, connection: &IdentifiedConnectionEnd, ) -> bool { - if !self.client_filter_enabled() { + if !self.filtering_enabled() { return true; } @@ -387,8 +524,8 @@ impl<'a, Chain: ChainHandle> ChainScanner<'a, Chain> { match permission { Ok(Permission::Deny) => { warn!( - "skipping workers for chain {}, client {} & conn {}. \ - reason: client or counterparty client is not allowed", + "skipping workers for chain {}, client {} & conn {}, \ + reason: client or counterparty client is not allowed", chain.id(), client.client_id, connection.connection_id @@ -398,7 +535,7 @@ impl<'a, Chain: ChainHandle> ChainScanner<'a, Chain> { } Err(e) => { error!( - "skipping workers for chain {}, client {} & conn {}. reason: {}", + "skipping workers for chain {}, client {} & conn {}, reason: {}", chain.id(), client.client_id, connection.connection_id, @@ -417,33 +554,124 @@ impl<'a, Chain: ChainHandle> ChainScanner<'a, Chain> { } } -// fn query_clients_for_channels( -// _chain: &C, -// _channels: Vec, -// ) -> Result, Error> { -// todo!() -// } - -// fn query_client_for_channel( -// _chain: &C, -// _channel: ChannelId, -// ) -> Result { -// todo!() -// } - -// fn query_channel( -// _chain: &C, -// _channel_id: ChannelId, -// ) -> Result, Error> { -// todo!() -// } - -// fn query_connection_for_channel( -// _chain: &C, -// _channel: IdentifiedChannelEnd, -// ) -> Result { -// todo!() -// } +struct ScannedChannel { + channel: IdentifiedChannelEnd, + counterparty_channel: Option, + connection: IdentifiedConnectionEnd, + counterparty_connection_state: Option, + client: IdentifiedAnyClientState, +} + +fn scan_allowed_channel( + registry: &mut SharedRegistry, + chain: &C, + port_id: &PortId, + channel_id: &ChannelId, +) -> Result { + let span = info_span!("scan.channel", port_id = %port_id, channel_id = %channel_id); + let _guard = span.enter(); + + info!("querying channel {}/{}...", port_id, channel_id); + let channel = query_channel(chain, port_id, channel_id)?; + let connection = query_connection_for_channel(chain, &channel)?; + let client_id = connection.connection_end.client_id(); + + info!( + "found connection {} and client {}", + connection.connection_id, client_id + ); + + info!("querying client {}...", client_id); + let client = query_client(chain, client_id)?; + + info!( + "client lives on counterparty chain {}", + client.client_state.chain_id() + ); + + let counterparty_chain = registry + .get_or_spawn(&client.client_state.chain_id()) + .map_err(Error::spawn)?; + + let counterparty_channel = + channel_on_destination(&channel, &connection, &counterparty_chain).unwrap_or_default(); + + info!( + "found counterparty channel {}", + counterparty_channel + .as_ref() + .map(|c| c.channel_id.to_string()) + .unwrap_or_else(|| "".to_string()) + ); + + let counterparty_connection_state = + connection_state_on_destination(&connection, &counterparty_chain) + .map(Some) + .unwrap_or_default(); + + info!( + "found counterparty channel {}", + counterparty_channel + .as_ref() + .map(|c| c.channel_id.to_string()) + .unwrap_or_else(|| "".to_string()) + ); + + Ok(ScannedChannel { + channel, + counterparty_channel, + connection, + counterparty_connection_state, + client, + }) +} + +fn query_client( + chain: &C, + client_id: &ClientId, +) -> Result { + let client = chain + .query_client_state(client_id, Height::zero()) + .map_err(Error::query)?; + + Ok(IdentifiedAnyClientState::new(client_id.clone(), client)) +} + +fn query_channel( + chain: &C, + port_id: &PortId, + channel_id: &ChannelId, +) -> Result { + let channel_end = chain + .query_channel(port_id, channel_id, Height::zero()) + .map_err(Error::query)?; + + Ok(IdentifiedChannelEnd::new( + port_id.clone(), + channel_id.clone(), + channel_end, + )) +} + +fn query_connection_for_channel( + chain: &C, + channel: &IdentifiedChannelEnd, +) -> Result { + let connection_id = channel + .channel_end + .connection_hops() + .first() + .cloned() + .ok_or_else(|| { + Error::missing_connection_hop( + channel.port_id.clone(), + channel.channel_id.clone(), + chain.id(), + ) + })?; + + query_connection(chain, &connection_id) +} fn query_all_clients(chain: &C) -> Result, Error> { let clients_req = QueryClientStatesRequest { @@ -467,7 +695,7 @@ fn query_client_connections( let connections = ids .into_iter() - .filter_map(|id| match query_connection(chain, id) { + .filter_map(|id| match query_connection(chain, &id) { Ok(connection) => Some(connection), Err(e) => { error!("failed to query connection: {}", e); @@ -481,14 +709,14 @@ fn query_client_connections( fn query_connection( chain: &C, - connection_id: ConnectionId, + connection_id: &ConnectionId, ) -> Result { let connection_end = chain - .query_connection(&connection_id, Height::zero()) + .query_connection(connection_id, Height::zero()) .map_err(Error::query)?; Ok(IdentifiedConnectionEnd { - connection_id, + connection_id: connection_id.clone(), connection_end, }) } diff --git a/relayer/src/supervisor/spawn.rs b/relayer/src/supervisor/spawn.rs index 10d244e5c8..c4e6d74e76 100644 --- a/relayer/src/supervisor/spawn.rs +++ b/relayer/src/supervisor/spawn.rs @@ -1,158 +1,62 @@ -use itertools::Itertools; -use tracing::{debug, error, warn}; - -use ibc::{ - core::{ - ics02_client::client_state::{ClientState, IdentifiedAnyClientState}, - ics03_connection::connection::{IdentifiedConnectionEnd, State as ConnectionState}, - ics04_channel::channel::{IdentifiedChannelEnd, State as ChannelState}, - ics24_host::identifier::{ChainId, ConnectionId}, - }, - Height, -}; +use tracing::{debug, error}; -use ibc_proto::ibc::core::{ - channel::v1::QueryConnectionChannelsRequest, client::v1::QueryClientStatesRequest, - connection::v1::QueryClientConnectionsRequest, +use ibc::core::{ + ics02_client::client_state::{ClientState, IdentifiedAnyClientState}, + ics03_connection::connection::IdentifiedConnectionEnd, + ics04_channel::channel::State as ChannelState, + ics24_host::identifier::ChainId, }; use crate::{ - chain::{ - counterparty::{channel_on_destination, connection_state_on_destination}, - handle::ChainHandle, - }, + chain::{counterparty::connection_state_on_destination, handle::ChainHandle}, config::Config, object::{Channel, Client, Connection, Object, Packet}, registry::SharedRegistry, - supervisor::client_state_filter::{FilterPolicy, Permission}, supervisor::error::Error as SupervisorError, worker::WorkerMap, }; -use super::{scan::ChainsScan, Error, RwArc}; -use crate::chain::counterparty::{unreceived_acknowledgements, unreceived_packets}; - -#[derive(Copy, Clone, Debug, PartialEq, Eq)] -pub enum SpawnMode { - Startup, - Reload, -} +use super::{ + scan::{ChainScan, ChainsScan, ChannelScan, ClientScan, ConnectionScan}, + Error, RwArc, +}; /// A context for spawning workers within the [`crate::supervisor::Supervisor`]. pub struct SpawnContext<'a, Chain: ChainHandle> { config: RwArc, registry: SharedRegistry, workers: &'a mut WorkerMap, - client_state_filter: &'a mut FilterPolicy, - mode: SpawnMode, } impl<'a, Chain: ChainHandle + 'static> SpawnContext<'a, Chain> { pub fn new( config: RwArc, registry: SharedRegistry, - client_state_filter: &'a mut FilterPolicy, workers: &'a mut WorkerMap, - mode: SpawnMode, ) -> Self { Self { config, registry, workers, - client_state_filter, - mode, } } - fn client_filter_enabled(&self) -> bool { - // Currently just a wrapper over the global filter. - self.config - .read() - .expect("poisoned lock") - .mode - .packets - .filter - } - pub fn spawn_workers(&mut self, scan: ChainsScan) { - let chain_ids = self - .config - .read() - .expect("poisoned lock") - .chains - .iter() - .map(|c| &c.id) - .cloned() - .collect_vec(); - - for chain_id in chain_ids { - self.spawn_workers_for_chain(&chain_id); - } - } - - pub fn spawn_workers_from_chain_to_chain( - &mut self, - from_chain_id: &ChainId, - to_chain_id: &ChainId, - ) { - let clients_req = QueryClientStatesRequest { - pagination: ibc_proto::cosmos::base::query::pagination::all(), - }; - - let chain = match self.registry.get_or_spawn(from_chain_id) { - Ok(chain_handle) => chain_handle, - Err(e) => { - error!( - "skipping workers for chain {}, reason: failed to spawn chain runtime with error: {}", - from_chain_id, e - ); - - return; - } - }; - - let clients = match chain.query_clients(clients_req) { - Ok(clients) => clients, - Err(e) => { - error!( - "skipping workers for chain {}, reason: failed to query clients with error: {}", - from_chain_id, e - ); - - return; - } - }; - - for client in clients { - if &client.client_state.chain_id() == to_chain_id { - self.spawn_workers_for_client(chain.clone(), client); + for chain_scan in scan.chains { + match chain_scan { + Ok(chain_scan) => self.spawn_workers_for_chain(chain_scan), + Err(e) => error!("failed to spawn worker for a chain, reason: {}", e), // TODO: Show chain id } } } - pub fn spawn_workers_for_chain(&mut self, chain_id: &ChainId) { - let clients_req = QueryClientStatesRequest { - pagination: ibc_proto::cosmos::base::query::pagination::all(), - }; - - let chain = match self.registry.get_or_spawn(chain_id) { + pub fn spawn_workers_for_chain(&mut self, scan: ChainScan) { + let chain = match self.registry.get_or_spawn(&scan.chain_id) { Ok(chain_handle) => chain_handle, Err(e) => { error!( "skipping workers for chain {}, reason: failed to spawn chain runtime with error: {}", - chain_id, e - ); - - return; - } - }; - - let clients = match chain.query_clients(clients_req) { - Ok(clients) => clients, - Err(e) => { - error!( - "skipping workers for chain {}, reason: failed to query clients with error: {}", - chain_id, e + scan.chain_id, e ); return; @@ -161,91 +65,14 @@ impl<'a, Chain: ChainHandle + 'static> SpawnContext<'a, Chain> { println!("W: Chain: {}", chain.id()); - for client in clients { - self.spawn_workers_for_client(chain.clone(), client); - } - - if self.mode != SpawnMode::Reload { - return; - } - - let chain_ids = self - .config - .read() - .expect("poisoned lock") - .chains - .iter() - .map(|c| &c.id) - .cloned() - .collect_vec(); - - for id in chain_ids { - if chain_id == &id { - continue; - } - self.spawn_workers_from_chain_to_chain(&id, chain_id); + for (_, client_scan) in scan.clients { + self.spawn_workers_for_client(chain.clone(), client_scan); } } - pub fn spawn_workers_for_client(&mut self, chain: Chain, client: IdentifiedAnyClientState) { - // Potentially ignore the client - if self.client_filter_enabled() - && matches!( - self.client_state_filter.control_client( - &chain.id(), - &client.client_id, - &client.client_state - ), - Permission::Deny - ) - { - warn!( - "skipping workers for chain {}, client {}. \ - reason: client is not allowed (client trust level={:?})", - chain.id(), - client.client_id, - client.client_state.trust_threshold() - ); - - return; - } - - let counterparty_chain_id = client.client_state.chain_id(); - let has_counterparty = self - .config - .read() - .expect("poisoned lock") - .has_chain(&counterparty_chain_id); - - if !has_counterparty { - debug!( - "skipping client worker for client {} on chain {} has its counterparty ({}) is not present in config", - client.client_id, chain.id(), counterparty_chain_id - ); - - return; - } - - let chain_id = chain.id(); - - let conns_req = QueryClientConnectionsRequest { - client_id: client.client_id.to_string(), - }; - - let client_connections = match chain.query_client_connections(conns_req) { - Ok(connections) => connections, - Err(e) => { - error!( - "skipping workers for chain {}, reason: failed to query client connections for client {}: {}", - chain_id, client.client_id, e - ); - - return; - } - }; - - for connection_id in client_connections { - self.spawn_workers_for_connection(chain.clone(), &client, connection_id); + pub fn spawn_workers_for_client(&mut self, chain: Chain, client_scan: ClientScan) { + for (_, connection_scan) in client_scan.connections { + self.spawn_workers_for_connection(chain.clone(), &client_scan.client, connection_scan); } } @@ -253,119 +80,30 @@ impl<'a, Chain: ChainHandle + 'static> SpawnContext<'a, Chain> { &mut self, chain: Chain, client: &IdentifiedAnyClientState, - connection_id: ConnectionId, + connection_scan: ConnectionScan, ) { - let chain_id = chain.id(); - - let connection_end = match chain.query_connection(&connection_id, Height::zero()) { - Ok(connection_end) => connection_end, - Err(e) => { - error!( - "skipping workers for chain {} and connection {}, reason: failed to query connection end: {}", - chain_id, connection_id, e - ); - return; - } - }; + let connection_id = connection_scan.id().clone(); - let connection = IdentifiedConnectionEnd { - connection_id: connection_id.clone(), - connection_end: connection_end.clone(), - }; - - // Apply the client state filter - if self.client_filter_enabled() { - match self.client_state_filter.control_connection_end_and_client( - &mut self.registry.write(), - &chain_id, - &client.client_state, - &connection_end, - &connection_id, - ) { - Ok(Permission::Deny) => { - warn!( - "skipping workers for chain {}, client {} & conn {}. \ - reason: client or counterparty client is not allowed", - chain_id, client.client_id, connection_id - ); - return; - } - Err(e) => { - error!( - "skipping workers for chain {}, client {} & conn {}. reason: {}", - chain_id, client.client_id, connection_id, e - ); - return; - } - _ => {} // allowed - } - } - - match self.spawn_connection_workers(chain.clone(), client.clone(), connection.clone()) { + match self.spawn_connection_workers( + chain.clone(), + client.clone(), + connection_scan.connection, + ) { Ok(()) => debug!( "done spawning workers for connection {} on chain {}", - connection.connection_id, + connection_id, chain.id(), ), Err(e) => error!( "skipped workers for connection {} on chain {}, reason: {}", - connection.connection_id, + connection_id, chain.id(), e ), } - if !connection_end.is_open() { - debug!( - "connection {} not open, skip workers for channels over this connection", - connection.connection_id - ); - return; - } - - match self.counterparty_connection_state(client, &connection) { - Err(e) => { - debug!("error with counterparty: reason {}", e); - return; - } - Ok(state) => { - if !state.eq(&ConnectionState::Open) { - debug!( - "connection {} not open, skip workers for channels over this connection", - connection.connection_id - ); - - debug!( - "drop connection {} because its counterparty is not open", - connection_id - ); - - return; - } - } - }; - - let chans_req = QueryConnectionChannelsRequest { - connection: connection_id.to_string(), - pagination: ibc_proto::cosmos::base::query::pagination::all(), - }; - - let connection_channels = match chain.query_connection_channels(chans_req) { - Ok(channels) => channels, - Err(e) => { - error!( - "skipping workers for chain {} and connection {}, reason: failed to query its channels: {}", - chain.id(), connection_id, e - ); - - return; - } - }; - - for channel in connection_channels { - let channel_id = channel.channel_id.clone(); - - match self.spawn_workers_for_channel(chain.clone(), client, &connection, channel) { + for (channel_id, channel_scan) in connection_scan.channels { + match self.spawn_workers_for_channel(chain.clone(), client, channel_scan) { Ok(()) => debug!( "done spawning workers for chain {} and channel {}", chain.id(), @@ -381,19 +119,6 @@ impl<'a, Chain: ChainHandle + 'static> SpawnContext<'a, Chain> { } } - fn counterparty_connection_state( - &mut self, - client: &IdentifiedAnyClientState, - connection: &IdentifiedConnectionEnd, - ) -> Result { - let counterparty_chain = self - .registry - .get_or_spawn(&client.client_state.chain_id()) - .map_err(Error::spawn)?; - - connection_state_on_destination(connection, &counterparty_chain) - } - fn spawn_connection_workers( &mut self, chain: Chain, @@ -468,8 +193,7 @@ impl<'a, Chain: ChainHandle + 'static> SpawnContext<'a, Chain> { &mut self, chain: Chain, client: &IdentifiedAnyClientState, - connection: &IdentifiedConnectionEnd, - channel: IdentifiedChannelEnd, + channel_scan: ChannelScan, ) -> Result<(), Error> { let mode = &self.config.read().expect("poisoned lock").mode; @@ -478,17 +202,15 @@ impl<'a, Chain: ChainHandle + 'static> SpawnContext<'a, Chain> { .get_or_spawn(&client.client_state.chain_id()) .map_err(SupervisorError::spawn)?; - let counterparty_channel = - channel_on_destination(&channel, connection, &counterparty_chain)?; - - let chan_state_src = channel.channel_end.state; - let chan_state_dst = counterparty_channel + let chan_state_src = channel_scan.channel.channel_end.state; + let chan_state_dst = channel_scan + .counterparty .as_ref() .map_or(ChannelState::Uninitialized, |c| c.channel_end.state); debug!( "channel {} on chain {} is: {}; state on dest. chain ({}) is: {}", - channel.channel_id, + channel_scan.id(), chain.id(), chan_state_src, counterparty_chain.id(), @@ -498,7 +220,6 @@ impl<'a, Chain: ChainHandle + 'static> SpawnContext<'a, Chain> { if (mode.clients.enabled || mode.packets.enabled) && chan_state_src.is_open() && chan_state_dst.is_open() - && self.relay_packets_on_channel(&chain, &channel) { if mode.clients.enabled { // Spawn the client worker @@ -521,18 +242,16 @@ impl<'a, Chain: ChainHandle + 'static> SpawnContext<'a, Chain> { } if mode.packets.enabled { - // SAFETY: Safe to unwrap because the inner channel end has state open - let counterparty_channel = - counterparty_channel.expect("inner channel end is in state OPEN"); - - let has_packets = || -> bool { - !unreceived_packets(&counterparty_chain, &chain, &counterparty_channel) + let has_packets = || { + !channel_scan + .unreceived_packets_on_counterparty(&chain, &counterparty_chain) .unwrap_or_default() .is_empty() }; - let has_acks = || -> bool { - !unreceived_acknowledgements(&counterparty_chain, &chain, &counterparty_channel) + let has_acks = || { + !channel_scan + .unreceived_acknowledgements_on_counterparty(&chain, &counterparty_chain) .unwrap_or_default() .is_empty() }; @@ -543,11 +262,11 @@ impl<'a, Chain: ChainHandle + 'static> SpawnContext<'a, Chain> { let path_object = Object::Packet(Packet { dst_chain_id: counterparty_chain.id(), src_chain_id: chain.id(), - src_channel_id: channel.channel_id.clone(), - src_port_id: channel.port_id, + src_channel_id: channel_scan.id().clone(), + src_port_id: channel_scan.channel.port_id.clone(), }); - println!("W: Channel: {}", channel.channel_id); + println!("W: Channel: {}", channel_scan.id()); self.workers .spawn( @@ -567,8 +286,8 @@ impl<'a, Chain: ChainHandle + 'static> SpawnContext<'a, Chain> { let channel_object = Object::Channel(Channel { dst_chain_id: counterparty_chain.id(), src_chain_id: chain.id(), - src_channel_id: channel.channel_id, - src_port_id: channel.port_id, + src_channel_id: channel_scan.id().clone(), + src_port_id: channel_scan.channel.port_id, }); self.workers @@ -584,15 +303,6 @@ impl<'a, Chain: ChainHandle + 'static> SpawnContext<'a, Chain> { Ok(()) } - fn relay_packets_on_channel( - &self, - chain: &impl ChainHandle, - channel: &IdentifiedChannelEnd, - ) -> bool { - let config = self.config.read().expect("poisoned lock"); - config.packets_on_channel_allowed(&chain.id(), &channel.port_id, &channel.channel_id) - } - pub fn shutdown_workers_for_chain(&mut self, chain_id: &ChainId) { let affected_workers = self.workers.objects_for_chain(chain_id); for object in affected_workers { From 4ffffb7269c842f37337c3a0938280f4f1e5bf72 Mon Sep 17 00:00:00 2001 From: Romain Ruetschi Date: Thu, 6 Jan 2022 16:08:02 +0100 Subject: [PATCH 03/22] Formatting --- relayer/src/supervisor.rs | 11 ++--------- 1 file changed, 2 insertions(+), 9 deletions(-) diff --git a/relayer/src/supervisor.rs b/relayer/src/supervisor.rs index 2da1f4c954..9fd25efb1c 100644 --- a/relayer/src/supervisor.rs +++ b/relayer/src/supervisor.rs @@ -786,12 +786,7 @@ fn update_chain( ) -> CmdEffect { info!(chain.id=%chain_config.id, "updating existing chain"); - let removed = remove_chain( - config, - registry, - workers, - &chain_config.id, - ); + let removed = remove_chain(config, registry, workers, &chain_config.id); let added = add_chain(config, registry, workers, client_state_filter, chain_config); @@ -813,9 +808,7 @@ fn update_config( ConfigUpdate::Add(chain_config) => { add_chain(config, registry, workers, client_state_filter, chain_config) } - ConfigUpdate::Remove(id) => { - remove_chain(config, registry, workers, &id) - } + ConfigUpdate::Remove(id) => remove_chain(config, registry, workers, &id), ConfigUpdate::Update(chain_config) => { update_chain(config, registry, workers, client_state_filter, chain_config) } From 8f1a62c27df832e9a623097040a1861c2c9975d5 Mon Sep 17 00:00:00 2001 From: Romain Ruetschi Date: Thu, 6 Jan 2022 16:55:36 +0100 Subject: [PATCH 04/22] Add `--full-scan` option to `start` command to force a full scan even when some chains are using allow lists --- relayer-cli/src/commands/start.rs | 25 ++++++++-- relayer/src/supervisor.rs | 47 ++++++++++++++----- relayer/src/supervisor/scan.rs | 29 ++++++++---- .../src/framework/overrides.rs | 12 ++++- .../src/tests/client_expiration.rs | 24 ++++++++-- 5 files changed, 106 insertions(+), 31 deletions(-) diff --git a/relayer-cli/src/commands/start.rs b/relayer-cli/src/commands/start.rs index 34ff8a29f6..0d930545d4 100644 --- a/relayer-cli/src/commands/start.rs +++ b/relayer-cli/src/commands/start.rs @@ -1,4 +1,5 @@ use alloc::sync::Arc; +use ibc_relayer::supervisor::SupervisorOptions; use std::error::Error; use std::io; use std::sync::RwLock; @@ -18,15 +19,22 @@ use crate::conclude::Output; use crate::prelude::*; #[derive(Clone, Command, Debug, Clap)] -pub struct StartCmd {} +pub struct StartCmd { + #[clap( + short = 'f', + long = "full-scan", + about = "Force a full scan of the chains for clients, connections and channels" + )] + full_scan: bool, +} impl Runnable for StartCmd { fn run(&self) { let config = (*app_config()).clone(); let config = Arc::new(RwLock::new(config)); - let supervisor_handle = - make_supervisor::(config.clone()).unwrap_or_else(|e| { + let supervisor_handle = make_supervisor::(config.clone(), self.full_scan) + .unwrap_or_else(|e| { Output::error(format!("Hermes failed to start, last error: {}", e)).exit(); unreachable!() }); @@ -175,11 +183,20 @@ fn spawn_telemetry_server( fn make_supervisor( config: Arc>, + force_full_scan: bool, ) -> Result> { let registry = SharedRegistry::::new(config.clone()); spawn_telemetry_server(&config)?; let rest = spawn_rest_server(&config); - Ok(spawn_supervisor(config, registry, rest, true)?) + Ok(spawn_supervisor( + config, + registry, + rest, + SupervisorOptions { + health_check: true, + force_full_scan, + }, + )?) } diff --git a/relayer/src/supervisor.rs b/relayer/src/supervisor.rs index 9fd25efb1c..ce0bf4e784 100644 --- a/relayer/src/supervisor.rs +++ b/relayer/src/supervisor.rs @@ -15,17 +15,22 @@ use ibc::{ Height, }; -use crate::util::lock::LockExt; -use crate::util::task::{spawn_background_task, Next, TaskError, TaskHandle}; use crate::{ chain::{handle::ChainHandle, HealthCheck}, config::{ChainConfig, Config}, - event, - event::monitor::{Error as EventError, ErrorDetail as EventErrorDetail, EventBatch}, + event::{ + self, + monitor::{Error as EventError, ErrorDetail as EventErrorDetail, EventBatch}, + }, object::Object, registry::{Registry, SharedRegistry}, rest, - util::try_recv_multiple, + supervisor::scan::ScanMode, + util::{ + lock::LockExt, + task::{spawn_background_task, Next, TaskError, TaskHandle}, + try_recv_multiple, + }, worker::WorkerMap, }; @@ -60,6 +65,18 @@ pub struct SupervisorHandle { tasks: Vec, } +/// Options for the supervisor +#[derive(Debug)] +pub struct SupervisorOptions { + /// Perform a health check of all chains we connect to + pub health_check: bool, + + /// Force a full scan of the chains for clients, connections, and channels, + /// even when an allow list is configured for a chain and the full scan could + /// be omitted. + pub force_full_scan: bool, +} + /** Spawn a supervisor for testing purpose using the provided [`SharedConfig`] and [`SharedRegistry`]. Returns a @@ -70,11 +87,11 @@ pub fn spawn_supervisor( config: Arc>, registry: SharedRegistry, rest_rx: Option, - do_health_check: bool, + options: SupervisorOptions, ) -> Result { let (sender, receiver) = unbounded(); - let tasks = spawn_supervisor_tasks(config, registry, rest_rx, receiver, do_health_check)?; + let tasks = spawn_supervisor_tasks(config, registry, rest_rx, receiver, options)?; Ok(SupervisorHandle { sender, tasks }) } @@ -108,9 +125,9 @@ pub fn spawn_supervisor_tasks( registry: SharedRegistry, rest_rx: Option, cmd_rx: Receiver, - do_health_check: bool, + options: SupervisorOptions, ) -> Result, Error> { - if do_health_check { + if options.health_check { health_check(&config.acquire_read(), &mut registry.write()); } @@ -121,6 +138,11 @@ pub fn spawn_supervisor_tasks( &config.acquire_read(), &mut registry.write(), &mut client_state_filter.acquire_write(), + if options.force_full_scan { + ScanMode::Full + } else { + ScanMode::Auto + }, ) .scan_chains(); @@ -464,8 +486,9 @@ fn chain_scanner<'a, Chain: ChainHandle>( config: &'a Config, registry: &'a mut Registry, client_state_filter: &'a mut FilterPolicy, + full_scan: ScanMode, ) -> ChainScanner<'a, Chain> { - ChainScanner::new(config, registry, client_state_filter) + ChainScanner::new(config, registry, client_state_filter, full_scan) } /// Perform a health check on all connected chains @@ -748,8 +771,8 @@ fn add_chain( debug!(chain.id=%id, "scanning chain"); - let scan_result = - chain_scanner(config, registry, client_state_filter).scan_chain(&chain_config); + let scan_result = chain_scanner(config, registry, client_state_filter, ScanMode::Auto) + .scan_chain(&chain_config); let scan = match scan_result { Ok(scan) => scan, diff --git a/relayer/src/supervisor/scan.rs b/relayer/src/supervisor/scan.rs index 8af8cf6aab..0d3e16e76c 100644 --- a/relayer/src/supervisor/scan.rs +++ b/relayer/src/supervisor/scan.rs @@ -217,10 +217,17 @@ impl ChannelScan { } } +#[derive(Copy, Clone, Debug, PartialEq, Eq)] +pub enum ScanMode { + Auto, + Full, +} + pub struct ChainScanner<'a, Chain: ChainHandle> { config: &'a Config, registry: &'a mut Registry, client_state_filter: &'a mut FilterPolicy, + scan_mode: ScanMode, } impl<'a, Chain: ChainHandle> ChainScanner<'a, Chain> { @@ -228,11 +235,13 @@ impl<'a, Chain: ChainHandle> ChainScanner<'a, Chain> { config: &'a Config, registry: &'a mut Registry, client_state_filter: &'a mut FilterPolicy, + scan_mode: ScanMode, ) -> Self { Self { config, registry, client_state_filter, + scan_mode, } } @@ -268,16 +277,18 @@ impl<'a, Chain: ChainHandle> ChainScanner<'a, Chain> { let mut scan = ChainScan::new(chain_config.id.clone()); - if let Some(spec) = self.use_allow_list(chain_config) { - info!("chain uses an allow list, skipping scan for fast startup"); - info!("allowed ports/channels: {}", spec); + match self.use_allow_list(chain_config) { + Some(spec) if self.scan_mode == ScanMode::Auto => { + info!("chain uses an allow list, skipping scan for fast startup"); + info!("allowed ports/channels: {}", spec); - self.query_allowed_channels(&chain, spec, &mut scan)?; - } else { - info!("scanning chain for all clients, connections and channels"); - - self.scan_all_clients(&chain, &mut scan)?; - } + self.query_allowed_channels(&chain, spec, &mut scan)?; + } + _ => { + info!("scanning chain for all clients, connections and channels"); + self.scan_all_clients(&chain, &mut scan)?; + } + }; Ok(scan) } diff --git a/tools/integration-test/src/framework/overrides.rs b/tools/integration-test/src/framework/overrides.rs index d98ec46dc8..21d006e1e2 100644 --- a/tools/integration-test/src/framework/overrides.rs +++ b/tools/integration-test/src/framework/overrides.rs @@ -7,6 +7,7 @@ use ibc_relayer::chain::handle::ChainHandle; use ibc_relayer::config::Config; use ibc_relayer::config::SharedConfig; use ibc_relayer::registry::SharedRegistry; +use ibc_relayer::supervisor::SupervisorOptions; use ibc_relayer::supervisor::{spawn_supervisor, SupervisorHandle}; use crate::error::Error; @@ -75,7 +76,16 @@ pub trait TestOverrides { config: &SharedConfig, registry: &SharedRegistry, ) -> Result, Error> { - let handle = spawn_supervisor(config.clone(), registry.clone(), None, false)?; + let handle = spawn_supervisor( + config.clone(), + registry.clone(), + None, + SupervisorOptions { + health_check: false, + force_full_scan: false, + }, + )?; + Ok(Some(handle)) } diff --git a/tools/integration-test/src/tests/client_expiration.rs b/tools/integration-test/src/tests/client_expiration.rs index 89fbe98791..10ae9b3d8e 100644 --- a/tools/integration-test/src/tests/client_expiration.rs +++ b/tools/integration-test/src/tests/client_expiration.rs @@ -2,7 +2,7 @@ use core::time::Duration; use ibc::core::ics03_connection::connection::State as ConnectionState; use ibc::core::ics04_channel::channel::State as ChannelState; use ibc_relayer::config::{self, Config, ModeConfig}; -use ibc_relayer::supervisor::{spawn_supervisor, SupervisorHandle}; +use ibc_relayer::supervisor::{spawn_supervisor, SupervisorHandle, SupervisorOptions}; use ibc_relayer::worker::client::spawn_refresh_client; use std::thread::sleep; @@ -151,8 +151,15 @@ impl BinaryChainTest for ChannelExpirationTest { wait_for_client_expiry(); - let _supervisor = - spawn_supervisor(chains.config.clone(), chains.registry.clone(), None, false)?; + let _supervisor = spawn_supervisor( + chains.config.clone(), + chains.registry.clone(), + None, + SupervisorOptions { + health_check: false, + force_full_scan: false, + }, + )?; let port_a = tagged_transfer_port(); let port_b = tagged_transfer_port(); @@ -297,8 +304,15 @@ impl BinaryChainTest for PacketExpirationTest { wait_for_client_expiry(); - let _supervisor = - spawn_supervisor(chains.config.clone(), chains.registry.clone(), None, false)?; + let _supervisor = spawn_supervisor( + chains.config.clone(), + chains.registry.clone(), + None, + SupervisorOptions { + health_check: false, + force_full_scan: false, + }, + )?; let denom_a = chains.node_a.denom(); let balance_a = chains From 25e57de6ee9845dde8c4743f05cedd28e347aec4 Mon Sep 17 00:00:00 2001 From: Romain Ruetschi Date: Fri, 7 Jan 2022 15:57:53 +0100 Subject: [PATCH 05/22] Remove debug statements and print scanned chains on startup --- relayer/src/supervisor.rs | 3 +++ relayer/src/supervisor/scan.rs | 2 +- relayer/src/supervisor/spawn.rs | 8 -------- 3 files changed, 4 insertions(+), 9 deletions(-) diff --git a/relayer/src/supervisor.rs b/relayer/src/supervisor.rs index ce0bf4e784..b20d8dc6f1 100644 --- a/relayer/src/supervisor.rs +++ b/relayer/src/supervisor.rs @@ -146,6 +146,9 @@ pub fn spawn_supervisor_tasks( ) .scan_chains(); + info!("Scanned chains:"); + info!("{}", scan); + spawn_context( &config.acquire_read(), &mut registry.write(), diff --git a/relayer/src/supervisor/scan.rs b/relayer/src/supervisor/scan.rs index 0d3e16e76c..6937f4d9e9 100644 --- a/relayer/src/supervisor/scan.rs +++ b/relayer/src/supervisor/scan.rs @@ -72,7 +72,7 @@ pub struct ChainsScan { impl fmt::Display for ChainsScan { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { for scan in self.chains.iter().flatten() { - writeln!(f, "# Chain:{}", scan.chain_id)?; + writeln!(f, "# Chain: {}", scan.chain_id)?; for client in scan.clients.values() { writeln!(f, " - Client: {}", client.client.client_id)?; diff --git a/relayer/src/supervisor/spawn.rs b/relayer/src/supervisor/spawn.rs index 0a0887c4de..ec231e5a42 100644 --- a/relayer/src/supervisor/spawn.rs +++ b/relayer/src/supervisor/spawn.rs @@ -63,8 +63,6 @@ impl<'a, Chain: ChainHandle> SpawnContext<'a, Chain> { } }; - println!("W: Chain: {}", chain.id()); - for (_, client_scan) in scan.clients { self.spawn_workers_for_client(chain.clone(), client_scan); } @@ -144,8 +142,6 @@ impl<'a, Chain: ChainHandle> SpawnContext<'a, Chain> { conn_state_dst ); - println!("W: Connection: {}", connection.connection_id); - if conn_state_src.is_open() && conn_state_dst.is_open() { debug!( "connection {} on chain {} is already open, not spawning Connection worker", @@ -218,8 +214,6 @@ impl<'a, Chain: ChainHandle> SpawnContext<'a, Chain> { src_chain_id: client.client_state.chain_id(), }); - println!("W: Client: {}", client.client_id); - self.workers .spawn( counterparty_chain.clone(), @@ -255,8 +249,6 @@ impl<'a, Chain: ChainHandle> SpawnContext<'a, Chain> { src_port_id: channel_scan.channel.port_id.clone(), }); - println!("W: Channel: {}", channel_scan.id()); - self.workers .spawn( chain.clone(), From 600a0c6f3736f20adad38adb308b62d026509bd1 Mon Sep 17 00:00:00 2001 From: Romain Ruetschi Date: Mon, 10 Jan 2022 16:15:13 +0100 Subject: [PATCH 06/22] Changelog entry --- .../unreleased/improvements/ibc-relayer/1536-fast-start.md | 4 ++++ 1 file changed, 4 insertions(+) create mode 100644 .changelog/unreleased/improvements/ibc-relayer/1536-fast-start.md diff --git a/.changelog/unreleased/improvements/ibc-relayer/1536-fast-start.md b/.changelog/unreleased/improvements/ibc-relayer/1536-fast-start.md new file mode 100644 index 0000000000..5a3639e116 --- /dev/null +++ b/.changelog/unreleased/improvements/ibc-relayer/1536-fast-start.md @@ -0,0 +1,4 @@ +- Reduce the startup time of the relayer by querying the necessary + information directly from the chain when one is configured with an + allowlist, rather than scan for all clients, connections and channels + ([#1536](https://github.com/informalsystems/ibc-rs/issues/1536)) \ No newline at end of file From 25d5119f82a220d9c438d77a5ad99dd698684bf3 Mon Sep 17 00:00:00 2001 From: Romain Ruetschi Date: Wed, 19 Jan 2022 14:35:21 +0100 Subject: [PATCH 07/22] Fix duplicate info message --- relayer/src/supervisor/scan.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/relayer/src/supervisor/scan.rs b/relayer/src/supervisor/scan.rs index 6937f4d9e9..41adf72f6e 100644 --- a/relayer/src/supervisor/scan.rs +++ b/relayer/src/supervisor/scan.rs @@ -608,7 +608,7 @@ fn scan_allowed_channel( channel_on_destination(&channel, &connection, &counterparty_chain).unwrap_or_default(); info!( - "found counterparty channel {}", + "found counterparty channel: {}", counterparty_channel .as_ref() .map(|c| c.channel_id.to_string()) @@ -621,10 +621,10 @@ fn scan_allowed_channel( .unwrap_or_default(); info!( - "found counterparty channel {}", - counterparty_channel + "found counterparty connection state: {}", + counterparty_connection_state .as_ref() - .map(|c| c.channel_id.to_string()) + .map(|s| s.to_string()) .unwrap_or_else(|| "".to_string()) ); From 59d744f2c04815fe0ccc32121d542bdccdae795a Mon Sep 17 00:00:00 2001 From: Romain Ruetschi Date: Wed, 19 Jan 2022 14:43:11 +0100 Subject: [PATCH 08/22] Quote identifiers in log messages --- relayer/src/supervisor/scan.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/relayer/src/supervisor/scan.rs b/relayer/src/supervisor/scan.rs index 41adf72f6e..d761b75b36 100644 --- a/relayer/src/supervisor/scan.rs +++ b/relayer/src/supervisor/scan.rs @@ -329,7 +329,7 @@ impl<'a, Chain: ChainHandle> ChainScanner<'a, Chain> { .entry(channel.channel_id.clone()) .or_insert_with(|| ChannelScan::new(channel, counterparty_channel)); } - Err(e) => error!("failed to scan channel {}, reason: {}", channel_id, e), + Err(e) => error!("failed to scan channel '{}', reason: {}", channel_id, e), } } From 23924e99b5a148ca50a2923bf107f5544b712dca Mon Sep 17 00:00:00 2001 From: Romain Ruetschi Date: Wed, 19 Jan 2022 14:52:11 +0100 Subject: [PATCH 09/22] Better error when port/channel does not exists --- relayer/src/supervisor/scan.rs | 27 ++++++++++++++++++++++++++- 1 file changed, 26 insertions(+), 1 deletion(-) diff --git a/relayer/src/supervisor/scan.rs b/relayer/src/supervisor/scan.rs index d761b75b36..0c8b680633 100644 --- a/relayer/src/supervisor/scan.rs +++ b/relayer/src/supervisor/scan.rs @@ -60,7 +60,20 @@ flex_error::define_error! { "could not retrieve the connection hop underlying port/channel {}/{} on chain '{}'", e.port_id, e.channel_id, e.chain_id ) + }, + + UninitializedChannel + { + port_id: PortId, + channel_id: ChannelId, + chain_id: ChainId, } + |e| { + format_args!( + "channel '{}/{}' on chain '{}' is uninitialized", + e.port_id, e.channel_id, e.chain_id + ) + }, } } @@ -582,8 +595,20 @@ fn scan_allowed_channel( let span = info_span!("scan.channel", port_id = %port_id, channel_id = %channel_id); let _guard = span.enter(); - info!("querying channel {}/{}...", port_id, channel_id); + info!("querying channel '{}/{}'...", port_id, channel_id); let channel = query_channel(chain, port_id, channel_id)?; + + if channel + .channel_end + .state_matches(&ChannelState::Uninitialized) + { + return Err(Error::uninitialized_channel( + port_id.clone(), + channel_id.clone(), + chain.id(), + )); + } + let connection = query_connection_for_channel(chain, &channel)?; let client_id = connection.connection_end.client_id(); From 56ff22349b018402bd58d0aaea47590ff12a3bdb Mon Sep 17 00:00:00 2001 From: Romain Ruetschi Date: Thu, 20 Jan 2022 10:03:06 +0100 Subject: [PATCH 10/22] Add metrics for queries --- relayer/src/chain/cosmos.rs | 40 +++++++++++++++++++++++++++++++++++-- telemetry/src/state.rs | 19 ++++++++++++++++++ 2 files changed, 57 insertions(+), 2 deletions(-) diff --git a/relayer/src/chain/cosmos.rs b/relayer/src/chain/cosmos.rs index 5161b5e438..1bcce73546 100644 --- a/relayer/src/chain/cosmos.rs +++ b/relayer/src/chain/cosmos.rs @@ -285,6 +285,7 @@ impl CosmosSdkChain { /// Specific to the SDK and used only for Tendermint client create pub fn query_consensus_params(&self) -> Result { crate::time!("query_consensus_params"); + crate::telemetry!(query, self.id(), "query_consensus_params"); Ok(self .block_on(self.rpc_client().genesis()) @@ -908,6 +909,9 @@ impl CosmosSdkChain { /// Query the chain's latest height pub fn query_latest_height(&self) -> Result { + crate::time!("query_latest_height"); + crate::telemetry!(query, self.id(), "query_latest_height"); + let status = self.status()?; Ok(ICSHeight { revision_number: ChainId::chain_version(status.node_info.network.as_str()), @@ -1190,6 +1194,7 @@ impl ChainEndpoint for CosmosSdkChain { fn query_commitment_prefix(&self) -> Result { crate::time!("query_commitment_prefix"); + crate::telemetry!(query, self.id(), "query_commitment_prefix"); // TODO - do a real chain query CommitmentPrefix::try_from(self.config().store_prefix.as_bytes().to_vec()) @@ -1199,6 +1204,8 @@ impl ChainEndpoint for CosmosSdkChain { /// Query the chain status fn query_status(&self) -> Result { crate::time!("query_status"); + crate::telemetry!(query, self.id(), "query_status"); + let status = self.status()?; let time = status.sync_info.latest_block_time; @@ -1218,6 +1225,7 @@ impl ChainEndpoint for CosmosSdkChain { request: QueryClientStatesRequest, ) -> Result, Error> { crate::time!("query_clients"); + crate::telemetry!(query, self.id(), "query_clients"); let mut client = self .block_on( @@ -1252,6 +1260,7 @@ impl ChainEndpoint for CosmosSdkChain { height: ICSHeight, ) -> Result { crate::time!("query_client_state"); + crate::telemetry!(query, self.id(), "query_client_state"); let client_state = self .query(ClientStatePath(client_id.clone()), height, false) @@ -1265,6 +1274,7 @@ impl ChainEndpoint for CosmosSdkChain { height: ICSHeight, ) -> Result<(AnyClientState, MerkleProof), Error> { crate::time!("query_upgraded_client_state"); + crate::telemetry!(query, self.id(), "query_upgraded_client_state"); // Query for the value and the proof. let tm_height = Height::try_from(height.revision_height).map_err(Error::invalid_height)?; @@ -1284,6 +1294,7 @@ impl ChainEndpoint for CosmosSdkChain { height: ICSHeight, ) -> Result<(AnyConsensusState, MerkleProof), Error> { crate::time!("query_upgraded_consensus_state"); + crate::telemetry!(query, self.id(), "query_upgraded_consensus_state"); let tm_height = Height::try_from(height.revision_height).map_err(Error::invalid_height)?; @@ -1305,6 +1316,7 @@ impl ChainEndpoint for CosmosSdkChain { request: QueryConsensusStatesRequest, ) -> Result, Error> { crate::time!("query_consensus_states"); + crate::telemetry!(query, self.id(), "query_consensus_states"); let mut client = self .block_on( @@ -1337,6 +1349,7 @@ impl ChainEndpoint for CosmosSdkChain { query_height: ICSHeight, ) -> Result { crate::time!("query_consensus_state"); + crate::telemetry!(query, self.id(), "query_consensus_state"); let (consensus_state, _proof) = self.proven_client_consensus(&client_id, consensus_height, query_height)?; @@ -1348,7 +1361,8 @@ impl ChainEndpoint for CosmosSdkChain { &self, request: QueryClientConnectionsRequest, ) -> Result, Error> { - crate::time!("query_connections"); + crate::time!("query_client_connections"); + crate::telemetry!(query, self.id(), "query_client_connections"); let mut client = self .block_on( @@ -1383,6 +1397,7 @@ impl ChainEndpoint for CosmosSdkChain { request: QueryConnectionsRequest, ) -> Result, Error> { crate::time!("query_connections"); + crate::telemetry!(query, self.id(), "query_connections"); let mut client = self .block_on( @@ -1416,6 +1431,9 @@ impl ChainEndpoint for CosmosSdkChain { connection_id: &ConnectionId, height: ICSHeight, ) -> Result { + crate::time!("query_connection"); + crate::telemetry!(query, self.id(), "query_connection"); + async fn do_query_connection( chain: &CosmosSdkChain, connection_id: &ConnectionId, @@ -1473,6 +1491,7 @@ impl ChainEndpoint for CosmosSdkChain { request: QueryConnectionChannelsRequest, ) -> Result, Error> { crate::time!("query_connection_channels"); + crate::telemetry!(query, self.id(), "query_connection_channels"); let mut client = self .block_on( @@ -1504,7 +1523,8 @@ impl ChainEndpoint for CosmosSdkChain { &self, request: QueryChannelsRequest, ) -> Result, Error> { - crate::time!("query_connections"); + crate::time!("query_channels"); + crate::telemetry!(query, self.id(), "query_channels"); let mut client = self .block_on( @@ -1535,6 +1555,9 @@ impl ChainEndpoint for CosmosSdkChain { channel_id: &ChannelId, height: ICSHeight, ) -> Result { + crate::time!("query_channel"); + crate::telemetry!(query, self.id(), "query_channel"); + let res = self.query( ChannelEndsPath(port_id.clone(), channel_id.clone()), height, @@ -1550,6 +1573,7 @@ impl ChainEndpoint for CosmosSdkChain { request: QueryChannelClientStateRequest, ) -> Result, Error> { crate::time!("query_channel_client_state"); + crate::telemetry!(query, self.id(), "query_channel_client_state"); let mut client = self .block_on( @@ -1579,6 +1603,7 @@ impl ChainEndpoint for CosmosSdkChain { request: QueryPacketCommitmentsRequest, ) -> Result<(Vec, ICSHeight), Error> { crate::time!("query_packet_commitments"); + crate::telemetry!(query, self.id(), "query_packet_commitments"); let mut client = self .block_on( @@ -1612,6 +1637,7 @@ impl ChainEndpoint for CosmosSdkChain { request: QueryUnreceivedPacketsRequest, ) -> Result, Error> { crate::time!("query_unreceived_packets"); + crate::telemetry!(query, self.id(), "query_unreceived_packets"); let mut client = self .block_on( @@ -1638,6 +1664,7 @@ impl ChainEndpoint for CosmosSdkChain { request: QueryPacketAcknowledgementsRequest, ) -> Result<(Vec, ICSHeight), Error> { crate::time!("query_packet_acknowledgements"); + crate::telemetry!(query, self.id(), "query_packet_acknowledgements"); let mut client = self .block_on( @@ -1670,6 +1697,7 @@ impl ChainEndpoint for CosmosSdkChain { request: QueryUnreceivedAcksRequest, ) -> Result, Error> { crate::time!("query_unreceived_acknowledgements"); + crate::telemetry!(query, self.id(), "query_unreceived_acknowledgements"); let mut client = self .block_on( @@ -1695,6 +1723,7 @@ impl ChainEndpoint for CosmosSdkChain { request: QueryNextSequenceReceiveRequest, ) -> Result { crate::time!("query_next_sequence_receive"); + crate::telemetry!(query, self.id(), "query_next_sequence_receive"); let mut client = self .block_on( @@ -1727,6 +1756,7 @@ impl ChainEndpoint for CosmosSdkChain { /// packets ever sent. fn query_txs(&self, request: QueryTxRequest) -> Result, Error> { crate::time!("query_txs"); + crate::telemetry!(query, self.id(), "query_txs"); match request { QueryTxRequest::Packet(request) => { @@ -1828,6 +1858,7 @@ impl ChainEndpoint for CosmosSdkChain { request: QueryBlockRequest, ) -> Result<(Vec, Vec), Error> { crate::time!("query_blocks"); + crate::telemetry!(query, self.id(), "query_blocks"); match request { QueryBlockRequest::Packet(request) => { @@ -2066,6 +2097,9 @@ impl ChainEndpoint for CosmosSdkChain { } fn query_app_version(&self, request: AppVersion) -> Result { + crate::time!("query_app_version"); + crate::telemetry!(query, self.id(), "query_app_version"); + use ibc_proto::ibc::core::port::v1::query_client::QueryClient; let mut client = self @@ -2304,6 +2338,8 @@ pub async fn broadcast_tx_sync( /// Uses the GRPC client to retrieve the account sequence async fn query_account(chain: &CosmosSdkChain, address: String) -> Result { + crate::telemetry!(query, chain.id(), "query_account"); + let mut client = ibc_proto::cosmos::auth::v1beta1::query_client::QueryClient::connect( chain.grpc_addr.clone(), ) diff --git a/telemetry/src/state.rs b/telemetry/src/state.rs index b0214d7b0c..88715b2230 100644 --- a/telemetry/src/state.rs +++ b/telemetry/src/state.rs @@ -50,6 +50,9 @@ pub struct TelemetryState { /// Number of timeout packets relayed, per channel timeout_packets: Counter, + + /// Number of queries emitted by the relayer, per chain and query type + queries: Counter, } impl TelemetryState { @@ -132,6 +135,15 @@ impl TelemetryState { self.timeout_packets.add(count, labels); } + + pub fn query(&self, chain_id: &ChainId, query_type: &'static str) { + let labels = &[ + KeyValue::new("chain", chain_id.to_string()), + KeyValue::new("query_type", query_type), + ]; + + self.queries.add(1, labels); + } } impl Default for TelemetryState { @@ -171,6 +183,13 @@ impl Default for TelemetryState { .u64_counter("ibc_timeout_packets") .with_description("Number of timeout packets relayed per channel") .init(), + + queries: meter + .u64_counter("relayer_queries") + .with_description( + "Number of queries emitted by the relayer, per chain and query type", + ) + .init(), } } } From 3d450d99374c6298733afba06694a3820c1d8594 Mon Sep 17 00:00:00 2001 From: Romain Ruetschi Date: Thu, 20 Jan 2022 10:19:09 +0100 Subject: [PATCH 11/22] Small log improvements --- relayer/src/chain/cosmos.rs | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/relayer/src/chain/cosmos.rs b/relayer/src/chain/cosmos.rs index 1bcce73546..82851f857c 100644 --- a/relayer/src/chain/cosmos.rs +++ b/relayer/src/chain/cosmos.rs @@ -330,7 +330,10 @@ impl CosmosSdkChain { let estimated_gas = self.estimate_gas(simulate_tx)?; if estimated_gas > self.max_gas() { - debug!(estimated = ?estimated_gas, max = ?self.max_gas(), "[{}] send_tx: estimated gas is higher than max gas", self.id()); + debug!( + id = %self.id(), estimated = ?estimated_gas, max = ?self.max_gas(), + "send_tx: estimated gas is higher than max gas" + ); return Err(Error::tx_simulate_gas_estimate_exceeded( self.id().clone(), @@ -342,7 +345,8 @@ impl CosmosSdkChain { let adjusted_fee = self.fee_with_gas(estimated_gas); debug!( - "using {} gas, fee {}", + id = %self.id(), + "send_tx: using {} gas, fee {}", estimated_gas, PrettyFee(&adjusted_fee) ); @@ -818,8 +822,8 @@ impl CosmosSdkChain { .join(", "); info!( - "[{}] waiting for commit of tx hashes(s) {}", - self.id(), + id = %self.id(), + "wait_for_block_commits: waiting for commit of tx hashes(s) {}", hashes ); @@ -832,8 +836,8 @@ impl CosmosSdkChain { |index| { if all_tx_results_found(&tx_sync_results) { trace!( - "[{}] wait_for_block_commits: retrieved {} tx results after {} tries ({}ms)", - self.id(), + id = %self.id(), + "wait_for_block_commits: retrieved {} tx results after {} tries ({}ms)", tx_sync_results.len(), index, start.elapsed().as_millis() From 5ea4f27bf98f00ba73b92522f69c508f455416f5 Mon Sep 17 00:00:00 2001 From: Romain Ruetschi Date: Thu, 20 Jan 2022 10:31:25 +0100 Subject: [PATCH 12/22] Rename queries metric --- telemetry/src/state.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/telemetry/src/state.rs b/telemetry/src/state.rs index 88715b2230..25ddbfc240 100644 --- a/telemetry/src/state.rs +++ b/telemetry/src/state.rs @@ -185,7 +185,7 @@ impl Default for TelemetryState { .init(), queries: meter - .u64_counter("relayer_queries") + .u64_counter("queries") .with_description( "Number of queries emitted by the relayer, per chain and query type", ) From a414523df8e9b49e5e9d071ed219fef671d4aa59 Mon Sep 17 00:00:00 2001 From: Romain Ruetschi Date: Thu, 20 Jan 2022 10:44:59 +0100 Subject: [PATCH 13/22] Use `chain` key for recording chain identifier in tracing logs --- relayer/src/supervisor.rs | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/relayer/src/supervisor.rs b/relayer/src/supervisor.rs index 7fcfb065a1..1ecd3e61b0 100644 --- a/relayer/src/supervisor.rs +++ b/relayer/src/supervisor.rs @@ -716,18 +716,18 @@ fn remove_chain( id: &ChainId, ) -> CmdEffect { if !config.has_chain(id) { - info!(chain.id=%id, "skipping removal of non-existing chain"); + info!(chain = %id, "skipping removal of non-existing chain"); return CmdEffect::Nothing; } - info!(chain.id=%id, "removing existing chain"); + info!(chain = %id, "removing existing chain"); config.chains.retain(|c| &c.id != id); - debug!(chain.id=%id, "shutting down workers"); + debug!(chain = %id, "shutting down workers"); let mut ctx = spawn_context(config, registry, workers); ctx.shutdown_workers_for_chain(id); - debug!(chain.id=%id, "shutting down chain runtime"); + debug!(chain = %id, "shutting down chain runtime"); registry.shutdown(id); CmdEffect::ConfigChanged @@ -748,15 +748,15 @@ fn add_chain( let id = chain_config.id.clone(); if config.has_chain(&id) { - info!(chain.id=%id, "skipping addition of already existing chain"); + info!(chain = %id, "skipping addition of already existing chain"); return CmdEffect::Nothing; } - info!(chain.id=%id, "adding new chain"); + info!(chain = %id, "adding new chain"); config.chains.push(chain_config.clone()); - debug!(chain.id=%id, "spawning chain runtime"); + debug!(chain = %id, "spawning chain runtime"); if let Err(e) = registry.spawn(&id) { error!( @@ -770,7 +770,7 @@ fn add_chain( return CmdEffect::Nothing; } - debug!(chain.id=%id, "scanning chain"); + debug!(chain = %id, "scanning chain"); let scan_result = chain_scanner(config, registry, client_state_filter, ScanMode::Auto) .scan_chain(&chain_config); @@ -787,7 +787,7 @@ fn add_chain( } }; - debug!(chain.id=%id, "spawning workers"); + debug!(chain = %id, "spawning workers"); let mut ctx = spawn_context(config, registry, workers); ctx.spawn_workers_for_chain(scan); @@ -808,7 +808,7 @@ fn update_chain( client_state_filter: &mut FilterPolicy, chain_config: ChainConfig, ) -> CmdEffect { - info!(chain.id=%chain_config.id, "updating existing chain"); + info!(chain = %chain_config.id, "updating existing chain"); let removed = remove_chain(config, registry, workers, &chain_config.id); From 496d73524d87b16851600bcb75ba3020e6bd68aa Mon Sep 17 00:00:00 2001 From: Romain Ruetschi Date: Thu, 20 Jan 2022 10:45:21 +0100 Subject: [PATCH 14/22] Use more structured logging in chain scanner --- relayer/src/supervisor/scan.rs | 47 +++++++++++++++++++--------------- 1 file changed, 26 insertions(+), 21 deletions(-) diff --git a/relayer/src/supervisor/scan.rs b/relayer/src/supervisor/scan.rs index 0c8b680633..9bf0f9f3e5 100644 --- a/relayer/src/supervisor/scan.rs +++ b/relayer/src/supervisor/scan.rs @@ -271,7 +271,7 @@ impl<'a, Chain: ChainHandle> ChainScanner<'a, Chain> { } pub fn scan_chain(&mut self, chain_config: &ChainConfig) -> Result { - let span = info_span!("scan.chain", chain = %chain_config.id, ); + let span = info_span!("scan.chain", chain = %chain_config.id); let _guard = span.enter(); info!("scanning chain..."); @@ -342,7 +342,7 @@ impl<'a, Chain: ChainHandle> ChainScanner<'a, Chain> { .entry(channel.channel_id.clone()) .or_insert_with(|| ChannelScan::new(channel, counterparty_channel)); } - Err(e) => error!("failed to scan channel '{}', reason: {}", channel_id, e), + Err(e) => error!(channel = %channel_id, "failed to scan channel, reason: {}", e), } } @@ -387,8 +387,8 @@ impl<'a, Chain: ChainHandle> ChainScanner<'a, Chain> { if !has_counterparty { debug!( - chain_id = %chain.id(), - counterparty_chain_id = %counterparty_chain_id, + chain = %chain.id(), + counterparty_chain = %counterparty_chain_id, "skipping client because its counterparty is not present in the config", ); @@ -592,10 +592,10 @@ fn scan_allowed_channel( port_id: &PortId, channel_id: &ChannelId, ) -> Result { - let span = info_span!("scan.channel", port_id = %port_id, channel_id = %channel_id); + let span = info_span!("scan.channel", port = %port_id, channel = %channel_id); let _guard = span.enter(); - info!("querying channel '{}/{}'...", port_id, channel_id); + info!("querying channel..."); let channel = query_channel(chain, port_id, channel_id)?; if channel @@ -613,16 +613,17 @@ fn scan_allowed_channel( let client_id = connection.connection_end.client_id(); info!( - "found connection {} and client {}", - connection.connection_id, client_id + connection = %connection.connection_id, client = %client_id, + "found connection and client", ); - info!("querying client {}...", client_id); + info!(client = %client_id, "querying client..."); let client = query_client(chain, client_id)?; info!( - "client lives on counterparty chain {}", - client.client_state.chain_id() + client = %client_id, + counterparty_chain = %client.client_state.chain_id(), + "found counterparty chain for client", ); let counterparty_chain = registry @@ -632,12 +633,14 @@ fn scan_allowed_channel( let counterparty_channel = channel_on_destination(&channel, &connection, &counterparty_chain).unwrap_or_default(); + let counterparty_channel_name = counterparty_channel + .as_ref() + .map(|c| c.channel_id.to_string()) + .unwrap_or_else(|| "".to_string()); + info!( - "found counterparty channel: {}", - counterparty_channel - .as_ref() - .map(|c| c.channel_id.to_string()) - .unwrap_or_else(|| "".to_string()) + counterparty_channel = %counterparty_channel_name, + "found counterparty channel" ); let counterparty_connection_state = @@ -645,12 +648,14 @@ fn scan_allowed_channel( .map(Some) .unwrap_or_default(); + let counterparty_connection_name = counterparty_connection_state + .as_ref() + .map(|s| s.to_string()) + .unwrap_or_else(|| "".to_string()); + info!( - "found counterparty connection state: {}", - counterparty_connection_state - .as_ref() - .map(|s| s.to_string()) - .unwrap_or_else(|| "".to_string()) + counterparty_connection_state = %counterparty_connection_name, + "found counterparty connection state" ); Ok(ScannedChannel { From d763b4d728cf76408f4ec1cc1d0978db913fe07b Mon Sep 17 00:00:00 2001 From: Romain Ruetschi Date: Thu, 20 Jan 2022 10:52:58 +0100 Subject: [PATCH 15/22] Fix changelog entry --- .../ibc-relayer/1481-chainendpoint-any-consensus-state.md | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/.changelog/unreleased/improvements/ibc-relayer/1481-chainendpoint-any-consensus-state.md b/.changelog/unreleased/improvements/ibc-relayer/1481-chainendpoint-any-consensus-state.md index bf933e02b5..88b2edcc3f 100644 --- a/.changelog/unreleased/improvements/ibc-relayer/1481-chainendpoint-any-consensus-state.md +++ b/.changelog/unreleased/improvements/ibc-relayer/1481-chainendpoint-any-consensus-state.md @@ -1,3 +1,2 @@ - Allow `ChainEndpoint` implementations to fetch any types of clients - and consensus states ([#1481](https://github.com/informalsystems/ibc- - rs/issues/1481)) \ No newline at end of file + and consensus states ([#1481](https://github.com/informalsystems/ibc-rs/issues/1481)) From 8a36fff3cabf3c21007d674dbbe62e5ad1af46d2 Mon Sep 17 00:00:00 2001 From: Romain Ruetschi Date: Thu, 20 Jan 2022 15:59:23 +0100 Subject: [PATCH 16/22] Improve logs when no workers were spawned --- relayer/src/supervisor/spawn.rs | 34 +++++++++++++++++++++++++-------- 1 file changed, 26 insertions(+), 8 deletions(-) diff --git a/relayer/src/supervisor/spawn.rs b/relayer/src/supervisor/spawn.rs index 1ff460c7ab..daa26ed743 100644 --- a/relayer/src/supervisor/spawn.rs +++ b/relayer/src/supervisor/spawn.rs @@ -87,11 +87,16 @@ impl<'a, Chain: ChainHandle> SpawnContext<'a, Chain> { client.clone(), connection_scan.connection, ) { - Ok(()) => debug!( + Ok(true) => debug!( "done spawning workers for connection {} on chain {}", connection_id, chain.id(), ), + Ok(false) => debug!( + "no workers were spawn for connection {} on chain {}", + connection_id, + chain.id(), + ), Err(e) => error!( "skipped workers for connection {} on chain {}, reason: {}", connection_id, @@ -102,11 +107,16 @@ impl<'a, Chain: ChainHandle> SpawnContext<'a, Chain> { for (channel_id, channel_scan) in connection_scan.channels { match self.spawn_workers_for_channel(chain.clone(), client, channel_scan) { - Ok(()) => debug!( + Ok(true) => debug!( "done spawning workers for chain {} and channel {}", chain.id(), channel_id, ), + Ok(false) => debug!( + "no workers spawn for chain {} and channel {}", + chain.id(), + channel_id, + ), Err(e) => error!( "skipped workers for chain {} and channel {} due to error {}", chain.id(), @@ -122,7 +132,7 @@ impl<'a, Chain: ChainHandle> SpawnContext<'a, Chain> { chain: Chain, client: IdentifiedAnyClientState, connection: IdentifiedConnectionEnd, - ) -> Result<(), Error> { + ) -> Result { let config_conn_enabled = self.config.mode.connections.enabled; let counterparty_chain = self @@ -148,6 +158,8 @@ impl<'a, Chain: ChainHandle> SpawnContext<'a, Chain> { connection.connection_id, chain.id() ); + + Ok(false) } else if config_conn_enabled && !conn_state_dst.is_open() && conn_state_dst.less_or_equal_progress(conn_state_src) @@ -167,9 +179,11 @@ impl<'a, Chain: ChainHandle> SpawnContext<'a, Chain> { connection_object.short_name() ); }); - } - Ok(()) + Ok(true) + } else { + Ok(false) + } } /// Spawns all the [`Worker`](crate::worker::Worker)s that will @@ -179,7 +193,7 @@ impl<'a, Chain: ChainHandle> SpawnContext<'a, Chain> { chain: Chain, client: &IdentifiedAnyClientState, channel_scan: ChannelScan, - ) -> Result<(), Error> { + ) -> Result { let mode = &self.config.mode; let counterparty_chain = self @@ -259,6 +273,8 @@ impl<'a, Chain: ChainHandle> SpawnContext<'a, Chain> { .then(|| debug!("spawned Packet worker: {}", path_object.short_name())); } } + + Ok(mode.clients.enabled) } else if mode.channels.enabled && !chan_state_dst.is_open() && chan_state_dst.less_or_equal_progress(chan_state_src) @@ -274,9 +290,11 @@ impl<'a, Chain: ChainHandle> SpawnContext<'a, Chain> { self.workers .spawn(chain, counterparty_chain, &channel_object, self.config) .then(|| debug!("spawned Channel worker: {}", channel_object.short_name())); - } - Ok(()) + Ok(true) + } else { + Ok(false) + } } pub fn shutdown_workers_for_chain(&mut self, chain_id: &ChainId) { From 8f49559fd59c7bc76593757c8f99ca387b710802 Mon Sep 17 00:00:00 2001 From: Romain Ruetschi Date: Thu, 20 Jan 2022 16:02:35 +0100 Subject: [PATCH 17/22] Improve logs when spawning connection and channel workers --- relayer/src/util/task.rs | 13 +++++++------ relayer/src/worker/channel.rs | 2 +- 2 files changed, 8 insertions(+), 7 deletions(-) diff --git a/relayer/src/util/task.rs b/relayer/src/util/task.rs index 919e4e3212..55c573e769 100644 --- a/relayer/src/util/task.rs +++ b/relayer/src/util/task.rs @@ -4,7 +4,7 @@ use core::time::Duration; use crossbeam_channel::{bounded, Sender}; use std::sync::{Arc, RwLock}; use std::thread; -use tracing::{error, info, warn}; +use tracing::{debug, error, warn}; use crate::util::lock::LockExt; @@ -90,7 +90,7 @@ pub fn spawn_background_task( interval_pause: Option, mut step_runner: impl FnMut() -> Result> + Send + Sync + 'static, ) -> TaskHandle { - info!(parent: &span, "spawning"); + debug!(parent: &span, "spawning task"); let stopped = Arc::new(RwLock::new(false)); let write_stopped = stopped.clone(); @@ -107,14 +107,14 @@ pub fn spawn_background_task( _ => match step_runner() { Ok(Next::Continue) => {} Ok(Next::Abort) => { - info!("aborting"); + debug!("aborting task"); break; } Err(TaskError::Ignore(e)) => { - warn!("encountered ignorable error: {}", e); + warn!("task encountered ignorable error: {}", e); } Err(TaskError::Fatal(e)) => { - error!("aborting after encountering fatal error: {}", e); + error!("task aborting after encountering fatal error: {}", e); break; } }, @@ -125,7 +125,8 @@ pub fn spawn_background_task( } *write_stopped.acquire_write() = true; - info!("terminated"); + + debug!("task terminated"); }); TaskHandle { diff --git a/relayer/src/worker/channel.rs b/relayer/src/worker/channel.rs index c0b9571f38..44ee9e3433 100644 --- a/relayer/src/worker/channel.rs +++ b/relayer/src/worker/channel.rs @@ -20,7 +20,7 @@ pub fn spawn_channel_worker( cmd_rx: Receiver, ) -> TaskHandle { spawn_background_task( - error_span!("channel", channel = %channel.short_name()), + error_span!("worker.channel", object = %channel.short_name()), Some(Duration::from_millis(200)), move || { if let Ok(cmd) = cmd_rx.try_recv() { From 763aaa0faefeafae2c4a556290be8154cc800ced Mon Sep 17 00:00:00 2001 From: Romain Ruetschi Date: Thu, 20 Jan 2022 16:20:25 +0100 Subject: [PATCH 18/22] Remove spaces in objects names --- relayer/src/object.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/relayer/src/object.rs b/relayer/src/object.rs index a240bab421..c830379402 100644 --- a/relayer/src/object.rs +++ b/relayer/src/object.rs @@ -63,7 +63,7 @@ pub struct Connection { impl Connection { pub fn short_name(&self) -> String { format!( - "connection::{}:{} -> {}", + "connection::{}:{}->{}", self.src_connection_id, self.src_chain_id, self.dst_chain_id, ) } @@ -88,7 +88,7 @@ pub struct Channel { impl Channel { pub fn short_name(&self) -> String { format!( - "channel::{}/{}:{} -> {}", + "channel::{}/{}:{}->{}", self.src_channel_id, self.src_port_id, self.src_chain_id, self.dst_chain_id, ) } From 111150c084948d5eb71df8467b93e66e85f5bee6 Mon Sep 17 00:00:00 2001 From: Romain Ruetschi Date: Thu, 20 Jan 2022 16:27:27 +0100 Subject: [PATCH 19/22] Add changelog entry --- .changelog/unreleased/improvements/1536-fast-start.md | 3 +++ 1 file changed, 3 insertions(+) create mode 100644 .changelog/unreleased/improvements/1536-fast-start.md diff --git a/.changelog/unreleased/improvements/1536-fast-start.md b/.changelog/unreleased/improvements/1536-fast-start.md new file mode 100644 index 0000000000..4141495364 --- /dev/null +++ b/.changelog/unreleased/improvements/1536-fast-start.md @@ -0,0 +1,3 @@ +- Improve startup time of the relayer + - When scanning a chain with filtering enabled and an allow list, skip scanning all the clients and query the allowed channels directly. This results in much fewer queries and a faster start. + - Add a `--full-scan` option to `hermes start` to opt out of the fast start mechanism and do a full scan. From 48616b8d5950c582bb45c87308c60f106ef7d757 Mon Sep 17 00:00:00 2001 From: Romain Ruetschi Date: Mon, 24 Jan 2022 11:38:20 +0100 Subject: [PATCH 20/22] Revert part of logs changes --- relayer/src/worker/channel.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/relayer/src/worker/channel.rs b/relayer/src/worker/channel.rs index 44ee9e3433..aed739e1aa 100644 --- a/relayer/src/worker/channel.rs +++ b/relayer/src/worker/channel.rs @@ -20,7 +20,7 @@ pub fn spawn_channel_worker( cmd_rx: Receiver, ) -> TaskHandle { spawn_background_task( - error_span!("worker.channel", object = %channel.short_name()), + error_span!("worker.channel", channel = %channel.short_name()), Some(Duration::from_millis(200)), move || { if let Ok(cmd) = cmd_rx.try_recv() { From c43ce1a64002420f1a5b5df1b6b828f34eb55ab0 Mon Sep 17 00:00:00 2001 From: Romain Ruetschi Date: Tue, 25 Jan 2022 17:12:59 +0100 Subject: [PATCH 21/22] Use INFO level for spawning logs --- relayer/src/supervisor/spawn.rs | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/relayer/src/supervisor/spawn.rs b/relayer/src/supervisor/spawn.rs index daa26ed743..d58469d719 100644 --- a/relayer/src/supervisor/spawn.rs +++ b/relayer/src/supervisor/spawn.rs @@ -1,4 +1,4 @@ -use tracing::{debug, error, trace}; +use tracing::{error, info}; use ibc::core::{ ics02_client::client_state::{ClientState, IdentifiedAnyClientState}, @@ -87,12 +87,12 @@ impl<'a, Chain: ChainHandle> SpawnContext<'a, Chain> { client.clone(), connection_scan.connection, ) { - Ok(true) => debug!( + Ok(true) => info!( "done spawning workers for connection {} on chain {}", connection_id, chain.id(), ), - Ok(false) => debug!( + Ok(false) => info!( "no workers were spawn for connection {} on chain {}", connection_id, chain.id(), @@ -107,12 +107,12 @@ impl<'a, Chain: ChainHandle> SpawnContext<'a, Chain> { for (channel_id, channel_scan) in connection_scan.channels { match self.spawn_workers_for_channel(chain.clone(), client, channel_scan) { - Ok(true) => debug!( + Ok(true) => info!( "done spawning workers for chain {} and channel {}", chain.id(), channel_id, ), - Ok(false) => debug!( + Ok(false) => info!( "no workers spawn for chain {} and channel {}", chain.id(), channel_id, @@ -143,7 +143,7 @@ impl<'a, Chain: ChainHandle> SpawnContext<'a, Chain> { let conn_state_src = connection.connection_end.state; let conn_state_dst = connection_state_on_destination(&connection, &counterparty_chain)?; - debug!( + info!( "connection {} on chain {} is: {:?}, state on dest. chain ({}) is: {:?}", connection.connection_id, chain.id(), @@ -153,7 +153,7 @@ impl<'a, Chain: ChainHandle> SpawnContext<'a, Chain> { ); if conn_state_src.is_open() && conn_state_dst.is_open() { - trace!( + info!( "connection {} on chain {} is already open, not spawning Connection worker", connection.connection_id, chain.id() @@ -174,7 +174,7 @@ impl<'a, Chain: ChainHandle> SpawnContext<'a, Chain> { self.workers .spawn(chain, counterparty_chain, &connection_object, self.config) .then(|| { - debug!( + info!( "spawning Connection worker: {}", connection_object.short_name() ); @@ -207,7 +207,7 @@ impl<'a, Chain: ChainHandle> SpawnContext<'a, Chain> { .as_ref() .map_or(ChannelState::Uninitialized, |c| c.channel_end.state); - debug!( + info!( "channel {} on chain {} is: {}; state on dest. chain ({}) is: {}", channel_scan.id(), chain.id(), @@ -235,7 +235,7 @@ impl<'a, Chain: ChainHandle> SpawnContext<'a, Chain> { &client_object, self.config, ) - .then(|| debug!("spawned Client worker: {}", client_object.short_name())); + .then(|| info!("spawned Client worker: {}", client_object.short_name())); } if mode.packets.enabled { @@ -270,7 +270,7 @@ impl<'a, Chain: ChainHandle> SpawnContext<'a, Chain> { &path_object, self.config, ) - .then(|| debug!("spawned Packet worker: {}", path_object.short_name())); + .then(|| info!("spawned Packet worker: {}", path_object.short_name())); } } @@ -289,7 +289,7 @@ impl<'a, Chain: ChainHandle> SpawnContext<'a, Chain> { self.workers .spawn(chain, counterparty_chain, &channel_object, self.config) - .then(|| debug!("spawned Channel worker: {}", channel_object.short_name())); + .then(|| info!("spawned Channel worker: {}", channel_object.short_name())); Ok(true) } else { From b0eafcf7c010ea6644b58e5302842c44ff63b058 Mon Sep 17 00:00:00 2001 From: Romain Ruetschi Date: Tue, 25 Jan 2022 17:48:22 +0100 Subject: [PATCH 22/22] Remove redundant changelog entry --- .../v0.10.0/improvements/ibc-relayer/1536-fast-start.md | 4 ---- 1 file changed, 4 deletions(-) delete mode 100644 .changelog/v0.10.0/improvements/ibc-relayer/1536-fast-start.md diff --git a/.changelog/v0.10.0/improvements/ibc-relayer/1536-fast-start.md b/.changelog/v0.10.0/improvements/ibc-relayer/1536-fast-start.md deleted file mode 100644 index 5a3639e116..0000000000 --- a/.changelog/v0.10.0/improvements/ibc-relayer/1536-fast-start.md +++ /dev/null @@ -1,4 +0,0 @@ -- Reduce the startup time of the relayer by querying the necessary - information directly from the chain when one is configured with an - allowlist, rather than scan for all clients, connections and channels - ([#1536](https://github.com/informalsystems/ibc-rs/issues/1536)) \ No newline at end of file