Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add option to specify which events to listen for in listen command #804

Merged
merged 1 commit into from
Apr 7, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
- Added `client-upgrade` CLI ([#357])
- Update gaia to version 4.1.0 for e2e tests on CI ([#702])
- Add `start-multi` command to relay on all paths defined in the configuration ([#748])
- Add option to specify which events to listen for in `listen` command ([#550])

### IMPROVEMENTS

Expand Down Expand Up @@ -71,6 +72,7 @@
[#357]: https://github.com/informalsystems/ibc-rs/issues/357
[#416]: https://github.com/informalsystems/ibc-rs/issues/416
[#561]: https://github.com/informalsystems/ibc-rs/issues/561
[#550]: https://github.com/informalsystems/ibc-rs/issues/550
[#599]: https://github.com/informalsystems/ibc-rs/issues/599
[#630]: https://github.com/informalsystems/ibc-rs/issues/630
[#672]: https://github.com/informalsystems/ibc-rs/issues/672
Expand Down
55 changes: 41 additions & 14 deletions relayer-cli/src/commands/listen.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,28 +2,42 @@ use std::{ops::Deref, sync::Arc, thread};

use abscissa_core::{application::fatal_error, error::BoxError, Command, Options, Runnable};
use crossbeam_channel as channel;
use itertools::Itertools;
use tokio::runtime::Runtime as TokioRuntime;

use tendermint_rpc::query::{EventType, Query};

use ibc::ics24_host::identifier::ChainId;
use ibc_relayer::{config::ChainConfig, event::monitor::*};

use crate::prelude::*;

#[derive(Command, Debug, Options)]
pub struct ListenCmd {
/// Identifier of the chain to listen for events from
#[options(free)]
chain_id: Option<ChainId>,
chain_id: ChainId,

/// Add an event type to listen for, can be repeated. Listen for all events by default (available: Tx, NewBlock)
#[options(short = "e", long = "event", meta = "EVENT")]
events: Vec<EventType>,
}

impl ListenCmd {
fn cmd(&self) -> Result<(), BoxError> {
let rt = Arc::new(TokioRuntime::new()?);
let config = app_config();

let chain_id = self.chain_id.clone().unwrap();
let chain_config = config.find_chain(&chain_id).unwrap();
let chain_config = config
.find_chain(&self.chain_id)
.ok_or_else(|| format!("chain '{}' not found in configuration", self.chain_id))?;

let events = if self.events.is_empty() {
&[EventType::Tx, EventType::NewBlock]
} else {
self.events.as_slice()
};

listen(rt, chain_config.clone())
listen(chain_config, events)
}
}

Expand All @@ -35,11 +49,18 @@ impl Runnable for ListenCmd {
}

/// Listen to events
pub fn listen(rt: Arc<TokioRuntime>, config: ChainConfig) -> Result<(), BoxError> {
info!(chain.id = %config.id, "spawning event monitor for");
pub fn listen(config: &ChainConfig, events: &[EventType]) -> Result<(), BoxError> {
println!(
"[info] Listening for events `{}` on '{}'...",
events.iter().format(", "),
config.id
);

let (event_monitor, rx) = subscribe(config, rt)?;
let _ = thread::spawn(|| event_monitor.run());
let rt = Arc::new(TokioRuntime::new()?);
let queries = events.iter().cloned().map(Query::from).collect();
let (event_monitor, rx) = subscribe(&config, queries, rt)?;

thread::spawn(|| event_monitor.run());

while let Ok(event_batch) = rx.recv() {
dbg!(event_batch);
Expand All @@ -49,16 +70,22 @@ pub fn listen(rt: Arc<TokioRuntime>, config: ChainConfig) -> Result<(), BoxError
}

fn subscribe(
chain_config: ChainConfig,
chain_config: &ChainConfig,
queries: Vec<Query>,
rt: Arc<TokioRuntime>,
) -> Result<(EventMonitor, channel::Receiver<EventBatch>), BoxError> {
let (mut event_monitor, rx) =
EventMonitor::new(chain_config.id, chain_config.websocket_addr, rt)
.map_err(|e| format!("couldn't initialize event monitor: {}", e))?;
let (mut event_monitor, rx) = EventMonitor::new(
chain_config.id.clone(),
chain_config.websocket_addr.clone(),
rt,
)
.map_err(|e| format!("could not initialize event monitor: {}", e))?;

event_monitor.set_queries(queries);

event_monitor
.subscribe()
.map_err(|e| format!("couldn't initialize subscriptions: {}", e))?;
.map_err(|e| format!("could not initialize subscriptions: {}", e))?;

Ok((event_monitor, rx))
}
26 changes: 25 additions & 1 deletion relayer/src/event/monitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,16 @@ impl EventBatch {
type SubscriptionResult = Result<tendermint_rpc::event::Event, tendermint_rpc::Error>;
type SubscriptionStream = dyn Stream<Item = SubscriptionResult> + Send + Sync + Unpin;

/// Connect to a TM node, receive push events over a websocket and filter them for the
/// Connect to a Tendermint node, subscribe to a set of queries,
/// receive push events over a websocket, and filter them for the
/// event handler.
///
/// The default events that are queried are:
/// - [`EventType::NewBlock`]
/// - [`EventType::Tx`]
///
/// Those can be extending or overriden using
/// [`EventMonitor::add_query`] and [`EventMonitor::set_queries`].
pub struct EventMonitor {
chain_id: ChainId,
/// WebSocket to collect events from
Expand Down Expand Up @@ -87,6 +95,22 @@ impl EventMonitor {
Ok((monitor, rx))
}

/// Set the queries to subscribe to.
///
/// ## Note
/// For this change to take effect, one has to [`subscribe`] again.
pub fn set_queries(&mut self, queries: Vec<Query>) {
self.event_queries = queries;
}

/// Add a new query to subscribe to.
///
/// ## Note
/// For this change to take effect, one has to [`subscribe`] again.
pub fn add_query(&mut self, query: Query) {
self.event_queries.push(query);
}

/// Clear the current subscriptions, and subscribe again to all queries.
pub fn subscribe(&mut self) -> Result<(), BoxError> {
let mut subscriptions = vec![];
Expand Down