Skip to content

Commit

Permalink
Implement basic "allow" filtering mechanism based on channel identifi…
Browse files Browse the repository at this point in the history
…ers (#1107)

* temp change for ft-transfer to send one msg/Tx

* event monitor: Bulk events from all transactions included in a block

* Update changelog

* temp change for ft-transfer to send one msg/Tx

* Optimize spawning of workers - draft

* Add back check to start workers only if channel is open

* Cleanup

* Check connection state

* temp change for ft-transfer to send one msg/Tx

* Improve config loading message (#933)

* Improve config load message

* Raised log level to error. Log link to config example in the guide.

* Changelog

Co-authored-by: Adi Seredinschi <adi@informal.systems>

* Migration to tx sync

* Add Tx simulate

* Add adjustment to the tx simulate result, some cleanup

* Nitpick in CosmosSdkChain::key_and_bytes

* Small cleanup

* Remove duplicate send_msgs method

* Cleanup config file

* Fix typo after refactoring

* Fix `query packet tx` description

* Rework `wait_for_block_commits` to use the `retry` crate

* Compute tx fee based on gas from tx simulate and gas price

* Re-add missing error type

* Combine `fee_denom` and `gas_price` into `gas_price` config option

* Add tests for `mul_ceil`

* Fix config serialization

* Remove `fee_amount` config property

* Update changelog

* Avoid op data regeneration if retries exhausted.

* Increase the number of retries while checking Tx is included in a block.

* Move `query packet tx` to `query tx events`

* better error msgs

* Add Display instance for GasPrice

* Fix default gas price denomination

* Improve some debug messages

* Rename `gas_price.amount` to `gase_price.price`

* Add configurable fee adjustment to make it more likely gas estimate ends up being enough

* Add Tx hash to ChainErr

* Fix config files

* Masked tonic::Code::NotFound result for query_client_connections.

* Modified cfg option from gas to max_gas

* Consistent trust_threshold in default config.toml

* Revert guide updates

* Nit: Imports for query.rs

* Print info message when Hermes starts

* Implement basic filtering based on channel identifiers

* Add per chain filters, only channel based filtering support

* Fix gas adjustement to be percentage on top of computed gas

* Attempt to fix gas_limit

* Fix chain spawn unrwap for QueryUnreceivedPacketsCmd

* changelog

Co-authored-by: Anca Zamfir <zamfiranca@gmail.com>
Co-authored-by: Adi Seredinschi <adi@informal.systems>
  • Loading branch information
3 people committed Jul 1, 2021
1 parent d41e725 commit c509f88
Show file tree
Hide file tree
Showing 8 changed files with 132 additions and 12 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

- [ibc-relayer-cli]
- Added `config validate` CLI to Hermes ([#600])
- Added basic channel filter ([#1140])

### IMPROVEMENTS

Expand All @@ -15,6 +16,7 @@
[#600]: https://github.com/informalsystems/ibc-rs/issues/600
[#1125]: https://github.com/informalsystems/ibc-rs/issues/1125
[#1127]: https://github.com/informalsystems/ibc-rs/issues/1127
[#1140]: https://github.com/informalsystems/ibc-rs/issues/1140


## v0.5.0
Expand Down
8 changes: 8 additions & 0 deletions config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@
# - 'packets': Relay packets only.
strategy = 'packets'

# Enable or disable the filtering mechanism. Default: 'false'
# Valid options are 'true', 'false'.
filter = true

# Specify the verbosity for the relayer logging output. Default: 'info'
# Valid options are 'error', 'warn', 'info', 'debug', 'trace'.
log_level = 'info'
Expand Down Expand Up @@ -95,6 +99,8 @@ trusting_period = '14days'
# which have changed between two blocks.
# Default: { numerator = '1', denominator = '3' }, ie. 1/3.
trust_threshold = { numerator = '1', denominator = '3' }
[chains.filters]
channels = [['transfer', 'channel-0']]

[[chains]]
id = 'ibc-1'
Expand All @@ -113,3 +119,5 @@ max_tx_size = 2097152
clock_drift = '5s'
trusting_period = '14days'
trust_threshold = { numerator = '1', denominator = '3' }
[chains.filters]
channels = [['transfer', 'channel-0']]
25 changes: 21 additions & 4 deletions relayer-cli/src/commands/query/packet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,8 +173,16 @@ impl Runnable for QueryUnreceivedPacketsCmd {
};

let rt = Arc::new(TokioRuntime::new().unwrap());
let (chain, _) =
ChainRuntime::<CosmosSdkChain>::spawn(chain_config.clone(), rt.clone()).unwrap();
let chain = match ChainRuntime::<CosmosSdkChain>::spawn(chain_config.clone(), rt.clone()) {
Ok((chain, _)) => chain,
Err(e) => {
return Output::error(format!(
"error when spawning the chain runtime for {}: {}",
chain_config.id, e,
))
.exit();
}
};

let channel_connection_client =
match channel_connection_client(chain.as_ref(), &self.port_id, &self.channel_id) {
Expand Down Expand Up @@ -208,8 +216,17 @@ impl Runnable for QueryUnreceivedPacketsCmd {
Some(chain_config) => chain_config,
};

let (counterparty_chain, _) =
ChainRuntime::<CosmosSdkChain>::spawn(counterparty_chain_config.clone(), rt).unwrap();
let counterparty_chain =
match ChainRuntime::<CosmosSdkChain>::spawn(counterparty_chain_config.clone(), rt) {
Ok((chain, _)) => chain,
Err(e) => {
return Output::error(format!(
"error when spawning the chain runtime for {}: {}",
chain_config.id, e,
))
.exit();
}
};

// get the packet commitments on the counterparty/ source chain
let commitments_request = QueryPacketCommitmentsRequest {
Expand Down
2 changes: 0 additions & 2 deletions relayer/src/chain/cosmos.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,6 @@ pub struct CosmosSdkChain {
grpc_addr: Uri,
rt: Arc<TokioRuntime>,
keybase: KeyRing,

/// A cached copy of the account information
account: Option<BaseAccount>,
}
Expand Down Expand Up @@ -166,7 +165,6 @@ impl CosmosSdkChain {

fn send_tx(&mut self, proto_msgs: Vec<Any>) -> Result<Response, Error> {
crate::time!("send_tx");

let account_seq = self.account_sequence()?;

debug!(
Expand Down
6 changes: 5 additions & 1 deletion relayer/src/chain/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -385,12 +385,13 @@ impl Chain for MockChain {
// For integration tests with the modules
#[cfg(test)]
pub mod test_utils {
use std::collections::HashSet;
use std::str::FromStr;
use std::time::Duration;

use ibc::ics24_host::identifier::ChainId;

use crate::config::{ChainConfig, GasPrice};
use crate::config::{ChainConfig, ChainFilters, GasPrice};

/// Returns a very minimal chain configuration, to be used in initializing `MockChain`s.
pub fn get_basic_chain_config(id: &str) -> ChainConfig {
Expand All @@ -411,6 +412,9 @@ pub mod test_utils {
clock_drift: Duration::from_secs(5),
trusting_period: Duration::from_secs(14 * 24 * 60 * 60), // 14 days
trust_threshold: Default::default(),
filters: ChainFilters {
channels: HashSet::new(),
},
}
}
}
21 changes: 20 additions & 1 deletion relayer/src/config.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
//! Relayer configuration

use std::collections::HashSet;
use std::{fmt, fs, fs::File, io::Write, path::Path, time::Duration};

use serde_derive::{Deserialize, Serialize};
use tendermint_light_client::types::TrustThreshold;

use ibc::ics24_host::identifier::ChainId;
use ibc::ics24_host::identifier::{ChainId, ChannelId, PortId};
use ibc::timestamp::ZERO_DURATION;

use crate::error;
Expand All @@ -28,6 +29,19 @@ impl fmt::Display for GasPrice {
}
}

#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ChainFilters {
pub channels: HashSet<(PortId, ChannelId)>,
}

impl Default for ChainFilters {
fn default() -> Self {
Self {
channels: HashSet::new(),
}
}
}

/// Defaults for various fields
pub mod default {
use super::*;
Expand Down Expand Up @@ -120,13 +134,16 @@ impl fmt::Display for LogLevel {
#[serde(default, deny_unknown_fields)]
pub struct GlobalConfig {
pub strategy: Strategy,
#[serde(default)]
pub filter: bool,
pub log_level: LogLevel,
}

impl Default for GlobalConfig {
fn default() -> Self {
Self {
strategy: Strategy::default(),
filter: false,
log_level: LogLevel::default(),
}
}
Expand Down Expand Up @@ -175,6 +192,8 @@ pub struct ChainConfig {
#[serde(default)]
pub trust_threshold: TrustThreshold,
pub gas_price: GasPrice,
#[serde(default)]
pub filters: ChainFilters,
}

/// Attempt to load and parse the TOML config file as a `Config`.
Expand Down
12 changes: 12 additions & 0 deletions relayer/src/object.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,12 @@ impl Channel {
self.src_channel_id, self.src_port_id, self.src_chain_id, self.dst_chain_id,
)
}
pub fn src_port_id(&self) -> &PortId {
&self.src_port_id
}
pub fn src_channel_id(&self) -> &ChannelId {
&self.src_channel_id
}
}

/// A unidirectional path from a source chain, channel and port.
Expand All @@ -112,6 +118,12 @@ impl UnidirectionalChannelPath {
self.src_channel_id, self.src_port_id, self.src_chain_id, self.dst_chain_id,
)
}
pub fn src_port_id(&self) -> &PortId {
&self.src_port_id
}
pub fn src_channel_id(&self) -> &ChannelId {
&self.src_channel_id
}
}

/// An object determines the amount of parallelism that can
Expand Down
68 changes: 64 additions & 4 deletions relayer/src/supervisor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@ use std::{collections::HashMap, sync::Arc, time::Duration};
use anomaly::BoxError;
use crossbeam_channel::Receiver;
use itertools::Itertools;
use tracing::{debug, error, warn};
use tracing::{debug, error, info, warn};

use ibc::{
events::IbcEvent,
ics02_client::client_state::{ClientState, IdentifiedAnyClientState},
ics03_connection::connection::{IdentifiedConnectionEnd, State as ConnectionState},
ics04_channel::channel::IdentifiedChannelEnd,
ics24_host::identifier::ChainId,
ics24_host::identifier::{ChainId, ChannelId},
Height,
};
use ibc_proto::ibc::core::{
Expand All @@ -35,6 +35,7 @@ use crate::{

mod error;
pub use error::Error;
use ibc::ics24_host::identifier::PortId;

/// The supervisor listens for events on multiple pairs of chains,
/// and dispatches the events it receives to the appropriate
Expand Down Expand Up @@ -69,6 +70,41 @@ impl Supervisor {
self.config.global.strategy == Strategy::HandshakeAndPackets
}

fn relay_on_channel(
&self,
chain_id: &ChainId,
port_id: &PortId,
channel_id: &ChannelId,
) -> bool {
!self.config.global.filter
|| self.config.find_chain(chain_id).map_or_else(
|| false,
|chain_config| {
chain_config
.filters
.channels
.contains(&(port_id.clone(), channel_id.clone()))
},
)
}

fn relay_on_object(&self, chain_id: &ChainId, object: &Object) -> bool {
if !self.config.global.filter {
return true;
}

match object {
Object::Client(_) => true,
Object::Channel(c) => {
self.relay_on_channel(chain_id, c.src_port_id(), c.src_channel_id())
}
Object::UnidirectionalChannelPath(u) => {
self.relay_on_channel(chain_id, u.src_port_id(), &u.src_channel_id())
}
Object::Connection(_) => true,
}
}

/// Collect the events we are interested in from an [`EventBatch`],
/// and maps each [`IbcEvent`] to their corresponding [`Object`].
pub fn collect_events(
Expand Down Expand Up @@ -240,6 +276,7 @@ impl Supervisor {
continue;
}
};

for connection_id in client_connections {
let connection_end =
match chain.query_connection(&connection_id, Height::zero()) {
Expand Down Expand Up @@ -324,12 +361,25 @@ impl Supervisor {
IdentifiedConnectionEnd::new(connection_id.clone(), connection_end);

for channel in connection_channels {
match self.spawn_workers_for_channel(
if !self.relay_on_channel(&chain_id, &channel.port_id, &channel.channel_id)
{
info!(
"skipping workers for chain {} and channel {}. \
reason: filtering is enabled and channel does not match any enabled channels",
chain.id(), &channel.channel_id
);

continue;
}

let spawn_result = self.spawn_workers_for_channel(
chain.clone(),
client.clone(),
connection.clone(),
channel.clone(),
) {
);

match spawn_result {
Ok(()) => debug!(
"done spawning workers for chain {} and channel {}",
chain.id(),
Expand Down Expand Up @@ -546,6 +596,16 @@ impl Supervisor {
let mut collected = self.collect_events(src_chain.clone().as_ref(), batch);

for (object, events) in collected.per_object.drain() {
if !self.relay_on_object(&src_chain.id(), &object) {
info!(
"skipping events for '{}'. \
reason: filtering is enabled and channel does not match any enabled channels",
object.short_name()
);

continue;
}

if events.is_empty() {
continue;
}
Expand Down

0 comments on commit c509f88

Please sign in to comment.