Skip to content

Commit

Permalink
Enable listen command with pull event source (#3502)
Browse files Browse the repository at this point in the history
* Enable `listen` command with `pull` event source

* Add changelog entry

* Apply suggestions from code review

Co-authored-by: Luca Joss <43531661+ljoss17@users.noreply.github.com>
Signed-off-by: Romain Ruetschi <github@romac.me>

---------

Signed-off-by: Romain Ruetschi <github@romac.me>
Co-authored-by: Luca Joss <43531661+ljoss17@users.noreply.github.com>
  • Loading branch information
romac and ljoss17 authored Aug 17, 2023
1 parent 90b270d commit d9813d1
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 24 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
- The `listen` command now works with both `push` and `pull` event sources
([\#3501](https://github.com/informalsystems/hermes/issues/3501))
44 changes: 20 additions & 24 deletions crates/relayer-cli/src/commands/listen.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use tracing::{error, info, instrument};
use ibc_relayer::{
chain::handle::Subscription,
config::{ChainConfig, EventSourceMode},
event::source::websocket::EventSource,
event::source::EventSource,
};
use ibc_relayer_types::{core::ics24_host::identifier::ChainId, events::IbcEvent};

Expand Down Expand Up @@ -144,29 +144,25 @@ fn subscribe(
compat_mode: CompatMode,
rt: Arc<TokioRuntime>,
) -> eyre::Result<Subscription> {
let EventSourceMode::Push { url, batch_delay } = &chain_config.event_source else {
return Err(eyre!("unsupported event source mode, only 'push' is supported for listening to events"));
};

let (mut event_source, tx_cmd) = EventSource::new(
chain_config.id.clone(),
url.clone(),
compat_mode,
*batch_delay,
rt,
)
.map_err(|e| eyre!("could not initialize event source: {}", e))?;

event_source
.init_subscriptions()
.map_err(|e| eyre!("could not initialize subscriptions: {}", e))?;

let queries = event_source.queries();
info!("listening for queries: {}", queries.iter().format(", "),);

thread::spawn(|| event_source.run());

let subscription = tx_cmd.subscribe()?;
let (event_source, monitor_tx) = match &chain_config.event_source {
EventSourceMode::Push { url, batch_delay } => EventSource::websocket(
chain_config.id.clone(),
url.clone(),
compat_mode,
*batch_delay,
rt,
),
EventSourceMode::Pull { interval } => EventSource::rpc(
chain_config.id.clone(),
HttpClient::new(chain_config.rpc_addr.clone())?,
*interval,
rt,
),
}?;

thread::spawn(move || event_source.run());

let subscription = monitor_tx.subscribe()?;
Ok(subscription)
}

Expand Down

0 comments on commit d9813d1

Please sign in to comment.