Skip to content

Commit

Permalink
Add option to specify which events to listen for in listen command (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
romac committed Apr 7, 2021
1 parent cabefe3 commit 126ffe6
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 15 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
- Added `client-upgrade` CLI ([#357])
- Update gaia to version 4.1.0 for e2e tests on CI ([#702])
- Add `start-multi` command to relay on all paths defined in the configuration ([#748])
- Add option to specify which events to listen for in `listen` command ([#550])

### IMPROVEMENTS

Expand Down Expand Up @@ -74,6 +75,7 @@
[#357]: https://github.com/informalsystems/ibc-rs/issues/357
[#416]: https://github.com/informalsystems/ibc-rs/issues/416
[#561]: https://github.com/informalsystems/ibc-rs/issues/561
[#550]: https://github.com/informalsystems/ibc-rs/issues/550
[#599]: https://github.com/informalsystems/ibc-rs/issues/599
[#630]: https://github.com/informalsystems/ibc-rs/issues/630
[#672]: https://github.com/informalsystems/ibc-rs/issues/672
Expand Down
55 changes: 41 additions & 14 deletions relayer-cli/src/commands/listen.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,28 +2,42 @@ use std::{ops::Deref, sync::Arc, thread};

use abscissa_core::{application::fatal_error, error::BoxError, Command, Options, Runnable};
use crossbeam_channel as channel;
use itertools::Itertools;
use tokio::runtime::Runtime as TokioRuntime;

use tendermint_rpc::query::{EventType, Query};

use ibc::ics24_host::identifier::ChainId;
use ibc_relayer::{config::ChainConfig, event::monitor::*};

use crate::prelude::*;

#[derive(Command, Debug, Options)]
pub struct ListenCmd {
/// Identifier of the chain to listen for events from
#[options(free)]
chain_id: Option<ChainId>,
chain_id: ChainId,

/// Add an event type to listen for, can be repeated. Listen for all events by default (available: Tx, NewBlock)
#[options(short = "e", long = "event", meta = "EVENT")]
events: Vec<EventType>,
}

impl ListenCmd {
fn cmd(&self) -> Result<(), BoxError> {
let rt = Arc::new(TokioRuntime::new()?);
let config = app_config();

let chain_id = self.chain_id.clone().unwrap();
let chain_config = config.find_chain(&chain_id).unwrap();
let chain_config = config
.find_chain(&self.chain_id)
.ok_or_else(|| format!("chain '{}' not found in configuration", self.chain_id))?;

let events = if self.events.is_empty() {
&[EventType::Tx, EventType::NewBlock]
} else {
self.events.as_slice()
};

listen(rt, chain_config.clone())
listen(chain_config, events)
}
}

Expand All @@ -35,11 +49,18 @@ impl Runnable for ListenCmd {
}

/// Listen to events
pub fn listen(rt: Arc<TokioRuntime>, config: ChainConfig) -> Result<(), BoxError> {
info!(chain.id = %config.id, "spawning event monitor for");
pub fn listen(config: &ChainConfig, events: &[EventType]) -> Result<(), BoxError> {
println!(
"[info] Listening for events `{}` on '{}'...",
events.iter().format(", "),
config.id
);

let (event_monitor, rx) = subscribe(config, rt)?;
let _ = thread::spawn(|| event_monitor.run());
let rt = Arc::new(TokioRuntime::new()?);
let queries = events.iter().cloned().map(Query::from).collect();
let (event_monitor, rx) = subscribe(&config, queries, rt)?;

thread::spawn(|| event_monitor.run());

while let Ok(event_batch) = rx.recv() {
dbg!(event_batch);
Expand All @@ -49,16 +70,22 @@ pub fn listen(rt: Arc<TokioRuntime>, config: ChainConfig) -> Result<(), BoxError
}

fn subscribe(
chain_config: ChainConfig,
chain_config: &ChainConfig,
queries: Vec<Query>,
rt: Arc<TokioRuntime>,
) -> Result<(EventMonitor, channel::Receiver<EventBatch>), BoxError> {
let (mut event_monitor, rx) =
EventMonitor::new(chain_config.id, chain_config.websocket_addr, rt)
.map_err(|e| format!("couldn't initialize event monitor: {}", e))?;
let (mut event_monitor, rx) = EventMonitor::new(
chain_config.id.clone(),
chain_config.websocket_addr.clone(),
rt,
)
.map_err(|e| format!("could not initialize event monitor: {}", e))?;

event_monitor.set_queries(queries);

event_monitor
.subscribe()
.map_err(|e| format!("couldn't initialize subscriptions: {}", e))?;
.map_err(|e| format!("could not initialize subscriptions: {}", e))?;

Ok((event_monitor, rx))
}
26 changes: 25 additions & 1 deletion relayer/src/event/monitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,16 @@ impl EventBatch {
type SubscriptionResult = Result<tendermint_rpc::event::Event, tendermint_rpc::Error>;
type SubscriptionStream = dyn Stream<Item = SubscriptionResult> + Send + Sync + Unpin;

/// Connect to a TM node, receive push events over a websocket and filter them for the
/// Connect to a Tendermint node, subscribe to a set of queries,
/// receive push events over a websocket, and filter them for the
/// event handler.
///
/// The default events that are queried are:
/// - [`EventType::NewBlock`]
/// - [`EventType::Tx`]
///
/// Those can be extending or overriden using
/// [`EventMonitor::add_query`] and [`EventMonitor::set_queries`].
pub struct EventMonitor {
chain_id: ChainId,
/// WebSocket to collect events from
Expand Down Expand Up @@ -87,6 +95,22 @@ impl EventMonitor {
Ok((monitor, rx))
}

/// Set the queries to subscribe to.
///
/// ## Note
/// For this change to take effect, one has to [`subscribe`] again.
pub fn set_queries(&mut self, queries: Vec<Query>) {
self.event_queries = queries;
}

/// Add a new query to subscribe to.
///
/// ## Note
/// For this change to take effect, one has to [`subscribe`] again.
pub fn add_query(&mut self, query: Query) {
self.event_queries.push(query);
}

/// Clear the current subscriptions, and subscribe again to all queries.
pub fn subscribe(&mut self) -> Result<(), BoxError> {
let mut subscriptions = vec![];
Expand Down

0 comments on commit 126ffe6

Please sign in to comment.