diff --git a/CHANGELOG.md b/CHANGELOG.md index dd7ee92a5d..8f16c20a87 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -25,6 +25,7 @@ - Added `client-upgrade` CLI ([#357]) - Update gaia to version 4.1.0 for e2e tests on CI ([#702]) - Add `start-multi` command to relay on all paths defined in the configuration ([#748]) + - Add option to specify which events to listen for in `listen` command ([#550]) ### IMPROVEMENTS @@ -71,6 +72,7 @@ [#357]: https://github.com/informalsystems/ibc-rs/issues/357 [#416]: https://github.com/informalsystems/ibc-rs/issues/416 [#561]: https://github.com/informalsystems/ibc-rs/issues/561 +[#550]: https://github.com/informalsystems/ibc-rs/issues/550 [#599]: https://github.com/informalsystems/ibc-rs/issues/599 [#630]: https://github.com/informalsystems/ibc-rs/issues/630 [#672]: https://github.com/informalsystems/ibc-rs/issues/672 diff --git a/relayer-cli/src/commands/listen.rs b/relayer-cli/src/commands/listen.rs index 26070e55db..96400f0e8b 100644 --- a/relayer-cli/src/commands/listen.rs +++ b/relayer-cli/src/commands/listen.rs @@ -2,8 +2,11 @@ use std::{ops::Deref, sync::Arc, thread}; use abscissa_core::{application::fatal_error, error::BoxError, Command, Options, Runnable}; use crossbeam_channel as channel; +use itertools::Itertools; use tokio::runtime::Runtime as TokioRuntime; +use tendermint_rpc::query::{EventType, Query}; + use ibc::ics24_host::identifier::ChainId; use ibc_relayer::{config::ChainConfig, event::monitor::*}; @@ -11,19 +14,30 @@ use crate::prelude::*; #[derive(Command, Debug, Options)] pub struct ListenCmd { + /// Identifier of the chain to listen for events from #[options(free)] - chain_id: Option, + chain_id: ChainId, + + /// Add an event type to listen for, can be repeated. Listen for all events by default (available: Tx, NewBlock) + #[options(short = "e", long = "event", meta = "EVENT")] + events: Vec, } impl ListenCmd { fn cmd(&self) -> Result<(), BoxError> { - let rt = Arc::new(TokioRuntime::new()?); let config = app_config(); - let chain_id = self.chain_id.clone().unwrap(); - let chain_config = config.find_chain(&chain_id).unwrap(); + let chain_config = config + .find_chain(&self.chain_id) + .ok_or_else(|| format!("chain '{}' not found in configuration", self.chain_id))?; + + let events = if self.events.is_empty() { + &[EventType::Tx, EventType::NewBlock] + } else { + self.events.as_slice() + }; - listen(rt, chain_config.clone()) + listen(chain_config, events) } } @@ -35,11 +49,18 @@ impl Runnable for ListenCmd { } /// Listen to events -pub fn listen(rt: Arc, config: ChainConfig) -> Result<(), BoxError> { - info!(chain.id = %config.id, "spawning event monitor for"); +pub fn listen(config: &ChainConfig, events: &[EventType]) -> Result<(), BoxError> { + println!( + "[info] Listening for events `{}` on '{}'...", + events.iter().format(", "), + config.id + ); - let (event_monitor, rx) = subscribe(config, rt)?; - let _ = thread::spawn(|| event_monitor.run()); + let rt = Arc::new(TokioRuntime::new()?); + let queries = events.iter().cloned().map(Query::from).collect(); + let (event_monitor, rx) = subscribe(&config, queries, rt)?; + + thread::spawn(|| event_monitor.run()); while let Ok(event_batch) = rx.recv() { dbg!(event_batch); @@ -49,16 +70,22 @@ pub fn listen(rt: Arc, config: ChainConfig) -> Result<(), BoxError } fn subscribe( - chain_config: ChainConfig, + chain_config: &ChainConfig, + queries: Vec, rt: Arc, ) -> Result<(EventMonitor, channel::Receiver), BoxError> { - let (mut event_monitor, rx) = - EventMonitor::new(chain_config.id, chain_config.websocket_addr, rt) - .map_err(|e| format!("couldn't initialize event monitor: {}", e))?; + let (mut event_monitor, rx) = EventMonitor::new( + chain_config.id.clone(), + chain_config.websocket_addr.clone(), + rt, + ) + .map_err(|e| format!("could not initialize event monitor: {}", e))?; + + event_monitor.set_queries(queries); event_monitor .subscribe() - .map_err(|e| format!("couldn't initialize subscriptions: {}", e))?; + .map_err(|e| format!("could not initialize subscriptions: {}", e))?; Ok((event_monitor, rx)) } diff --git a/relayer/src/event/monitor.rs b/relayer/src/event/monitor.rs index 2f016b749c..1d2286e138 100644 --- a/relayer/src/event/monitor.rs +++ b/relayer/src/event/monitor.rs @@ -32,8 +32,16 @@ impl EventBatch { type SubscriptionResult = Result; type SubscriptionStream = dyn Stream + Send + Sync + Unpin; -/// Connect to a TM node, receive push events over a websocket and filter them for the +/// Connect to a Tendermint node, subscribe to a set of queries, +/// receive push events over a websocket, and filter them for the /// event handler. +/// +/// The default events that are queried are: +/// - [`EventType::NewBlock`] +/// - [`EventType::Tx`] +/// +/// Those can be extending or overriden using +/// [`EventMonitor::add_query`] and [`EventMonitor::set_queries`]. pub struct EventMonitor { chain_id: ChainId, /// WebSocket to collect events from @@ -87,6 +95,22 @@ impl EventMonitor { Ok((monitor, rx)) } + /// Set the queries to subscribe to. + /// + /// ## Note + /// For this change to take effect, one has to [`subscribe`] again. + pub fn set_queries(&mut self, queries: Vec) { + self.event_queries = queries; + } + + /// Add a new query to subscribe to. + /// + /// ## Note + /// For this change to take effect, one has to [`subscribe`] again. + pub fn add_query(&mut self, query: Query) { + self.event_queries.push(query); + } + /// Clear the current subscriptions, and subscribe again to all queries. pub fn subscribe(&mut self) -> Result<(), BoxError> { let mut subscriptions = vec![];