Skip to content

Commit

Permalink
From IbcEvent to IbcEventWithHash
Browse files Browse the repository at this point in the history
  • Loading branch information
adizere committed Jul 17, 2021
1 parent e6d2336 commit b63335b
Show file tree
Hide file tree
Showing 14 changed files with 132 additions and 52 deletions.
2 changes: 1 addition & 1 deletion relayer-cli/src/commands/listen.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ pub fn listen(config: &ChainConfig, filters: &[EventFilter]) -> Result<(), BoxEr
let matching_events = batch
.events
.into_iter()
.filter(|e| event_match(e, filters))
.filter(|e| event_match(&e.event, filters))
.collect_vec();

if matching_events.is_empty() {
Expand Down
4 changes: 2 additions & 2 deletions relayer-cli/src/commands/misbehaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ pub fn monitor_misbehaviour(
match event_batch {
Ok(event_batch) => {
for event in event_batch.events {
match event {
match event.event {
IbcEvent::UpdateClient(update) => {
debug!("{:?}", update);
misbehaviour_handling(
Expand All @@ -78,7 +78,7 @@ pub fn monitor_misbehaviour(

IbcEvent::ClientMisbehaviour(ref _misbehaviour) => {
// TODO - submit misbehaviour to the witnesses (our full node)
return Ok(Some(event));
return Ok(Some(event.event));
}

_ => {}
Expand Down
2 changes: 1 addition & 1 deletion relayer/src/chain/cosmos.rs
Original file line number Diff line number Diff line change
Expand Up @@ -599,7 +599,7 @@ impl CosmosSdkChain {
.map(|res| res.response.hash.to_string())
.join(", ");

debug!("[{}] waiting for commit of block(s) {}", self.id(), hashes);
warn!("[{}] waiting for commit of tx hashes(s) {}", self.id(), hashes);

// Wait a little bit initially
thread::sleep(Duration::from_millis(200));
Expand Down
22 changes: 18 additions & 4 deletions relayer/src/event/monitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ use crate::util::{
retry::{retry_count, retry_with_index, RetryResult},
stream::group_while,
};
use itertools::Itertools;
use crate::event::rpc::IbcEventWithHash;

mod retry_strategy {
use crate::util::retry::clamp_total;
Expand Down Expand Up @@ -72,7 +74,7 @@ pub enum Error {
pub struct EventBatch {
pub chain_id: ChainId,
pub height: Height,
pub events: Vec<IbcEvent>,
pub events: Vec<IbcEventWithHash>,
}

pub trait UnwrapOrClone {
Expand Down Expand Up @@ -375,8 +377,16 @@ impl EventMonitor {
}

/// Collect the IBC events from an RPC event
fn collect_events(chain_id: &ChainId, event: RpcEvent) -> impl Stream<Item = (Height, IbcEvent)> {
fn collect_events(chain_id: &ChainId, event: RpcEvent) -> impl Stream<Item = (Height, IbcEventWithHash)> {
let q = event.query.clone();
let events = crate::event::rpc::get_all_events(chain_id, event).unwrap_or_default();

if chain_id.as_str() == "ibc-1" {
let s = events.iter().map(|(_, e)| e).join(",");
let hs = events.iter().map(|(h, _)| h).join(",");
info!("\t [0--monitor@{}] event.query {} -> vals length: {}, summary: {} <> {}", chain_id, q, events.len(), s, hs);
}

stream::iter(events)
}

Expand Down Expand Up @@ -406,6 +416,10 @@ fn stream_batches(
let mut events = events.into_iter().map(|(_, e)| e).collect();
sort_events(&mut events);

if chain_id.as_str() == "ibc-1" {
info!("\t [1--monitor@{}] event batch: {:?}", chain_id, events);
}

EventBatch {
height,
events,
Expand All @@ -416,8 +430,8 @@ fn stream_batches(

/// Sort the given events by putting the NewBlock event first,
/// and leaving the other events as is.
fn sort_events(events: &mut Vec<IbcEvent>) {
events.sort_by(|a, b| match (a, b) {
fn sort_events(events: &mut Vec<IbcEventWithHash>) {
events.sort_by(|a, b| match (&a.event, &b.event) {
(IbcEvent::NewBlock(_), _) => Ordering::Less,
_ => Ordering::Equal,
})
Expand Down
44 changes: 41 additions & 3 deletions relayer/src/event/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ use std::{collections::HashMap, convert::TryFrom};

use anomaly::BoxError;
use tendermint_rpc::event::{Event as RpcEvent, EventData as RpcEventData};
use tracing::info;
use itertools::Itertools;

use ibc::ics02_client::events::NewBlock;
use ibc::ics02_client::height::Height;
Expand All @@ -13,14 +15,37 @@ use ibc::{
ics04_channel::events as ChannelEvents,
};

#[derive(Debug, Clone)]
pub struct IbcEventWithHash {
pub event: IbcEvent,
pub tx_hash: Option<String>
}

impl From<NewBlock> for IbcEventWithHash {
fn from(nb: NewBlock) -> Self {
Self {
event: nb.into(), // from `NewBlock` into `IbcEvent::NewBlock`
tx_hash: None // no hash associated here
}
}
}

impl std::fmt::Display for IbcEventWithHash {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}%{:?}", self.event, self.tx_hash)
}
}

pub fn get_all_events(
chain_id: &ChainId,
result: RpcEvent,
) -> Result<Vec<(Height, IbcEvent)>, String> {
let mut vals: Vec<(Height, IbcEvent)> = vec![];
) -> Result<Vec<(Height, IbcEventWithHash)>, String> {
let mut vals: Vec<(Height, IbcEventWithHash)> = vec![];

match &result.data {
RpcEventData::NewBlock { block, .. } => {
// info!("\t newblock looks like this: {:?}", result);

let height = Height::new(
ChainId::chain_version(chain_id.to_string().as_str()),
u64::from(block.as_ref().ok_or("tx.height")?.header.height),
Expand All @@ -38,16 +63,29 @@ pub fn get_all_events(
ChainId::chain_version(chain_id.to_string().as_str()),
height_raw,
);
let pre_tx_hash = events.get("tx.hash");
if let Some(hs) = pre_tx_hash {
let d = hs.iter().join(",");
info!("\t\t\t *** found tx hash(es) {}", d);
info!("DUMP: {:#?}", events);
}
let tx_hash_raw = pre_tx_hash.map(|hv| hv.first()).flatten().cloned();

let actions_and_indices = extract_helper(events)?;
// info!("\t actions & indices: {:?}", actions_and_indices);
// info!("\t events: {:?}", events);
for action in actions_and_indices {
if let Ok(event) = build_event(RawObject::new(
height,
action.0,
action.1 as usize,
events.clone(),
)) {
vals.push((height, event));

vals.push((height, IbcEventWithHash {
event,
tx_hash: tx_hash_raw.clone(),
}));
}
}
}
Expand Down
3 changes: 3 additions & 0 deletions relayer/src/link.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ mod error;
mod operational_data;
mod relay_path;
mod relay_summary;
mod relay_sender;

// Re-export the telemetries summary
pub use relay_summary::RelaySummary;
Expand Down Expand Up @@ -169,6 +170,7 @@ impl Link {
Ok(Link::new(channel))
}

/// Implements the `packet-recv` CLI
pub fn build_and_send_recv_packet_messages(&mut self) -> Result<Vec<IbcEvent>, LinkError> {
self.a_to_b.build_recv_packet_and_timeout_msgs(None)?;

Expand All @@ -183,6 +185,7 @@ impl Link {
Ok(results)
}

/// Implements the `packet-ack` CLI
pub fn build_and_send_ack_packet_messages(&mut self) -> Result<Vec<IbcEvent>, LinkError> {
self.a_to_b.build_packet_ack_msgs(None)?;

Expand Down
10 changes: 8 additions & 2 deletions relayer/src/link/operational_data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use ibc::Height;

use crate::link::error::LinkError;
use crate::link::RelayPath;
// use crate::link::relay_sender;

#[derive(Clone, Copy, PartialEq)]
pub enum OperationalDataTarget {
Expand Down Expand Up @@ -64,8 +65,9 @@ impl OperationalData {
self.batch.iter().map(|gm| gm.event.clone()).collect()
}

/// Returns all the messages in this operational data, plus prepending the client update message
/// if necessary.
/// Returns all the messages in this operational data,
/// also prepending the client update message if
/// necessary.
pub fn assemble_msgs(&self, relay_path: &RelayPath) -> Result<Vec<Any>, LinkError> {
if self.batch.is_empty() {
warn!("assemble_msgs() method call on an empty OperationalData!");
Expand Down Expand Up @@ -107,6 +109,10 @@ impl OperationalData {

Ok(msgs)
}

// pub fn into_sender(self, _mode: relay_sender::Mode) -> Box<dyn relay_sender::Sender> {
// todo!()
// }
}

impl fmt::Display for OperationalData {
Expand Down
27 changes: 15 additions & 12 deletions relayer/src/link/relay_path.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ use crate::foreign_client::{ForeignClient, ForeignClientError};
use crate::link::error::LinkError;
use crate::link::operational_data::{OperationalData, OperationalDataTarget, TransitMessage};
use crate::link::relay_summary::RelaySummary;
use crate::event::rpc::IbcEventWithHash;

const MAX_RETRIES: usize = 5;

Expand Down Expand Up @@ -203,7 +204,7 @@ impl RelayPath {

// Determines if the events received are relevant and should be processed.
// Only events for a port/channel matching one of the channel ends should be processed.
fn filter_events(&self, events: &[IbcEvent]) -> Vec<IbcEvent> {
fn filter_events(&self, events: &[IbcEventWithHash]) -> Vec<IbcEvent> {
let mut result = vec![];

let src_channel_id = if let Ok(some_id) = self.src_channel_id() {
Expand All @@ -212,34 +213,34 @@ impl RelayPath {
return vec![];
};

for event in events.iter() {
match event {
for ewh in events.iter() {
match &ewh.event {
IbcEvent::SendPacket(send_packet_ev) => {
if src_channel_id == send_packet_ev.src_channel_id()
&& self.src_port_id() == send_packet_ev.src_port_id()
{
result.push(event.clone());
result.push(ewh.event.clone());
}
}
IbcEvent::WriteAcknowledgement(write_ack_ev) => {
if src_channel_id == write_ack_ev.dst_channel_id()
&& self.src_port_id() == write_ack_ev.dst_port_id()
{
result.push(event.clone());
result.push(ewh.event.clone());
}
}
IbcEvent::CloseInitChannel(chan_close_ev) => {
if src_channel_id == chan_close_ev.channel_id()
&& self.src_port_id() == chan_close_ev.port_id()
{
result.push(event.clone());
result.push(ewh.event.clone());
}
}
IbcEvent::TimeoutPacket(timeout_ev) => {
if src_channel_id == timeout_ev.src_channel_id()
&& self.channel.src_port_id() == timeout_ev.src_port_id()
{
result.push(event.clone());
result.push(ewh.event.clone());
}
}
_ => {}
Expand Down Expand Up @@ -453,6 +454,9 @@ impl RelayPath {
}

/// Returns the events generated by the target chain
/// TODO: Add a sender strategy as a parameter here -- either sync or async.
/// Based on the sender, the result of this method will differ.
/// call it relay_sender?
pub(crate) fn relay_from_operational_data(
&mut self,
initial_od: OperationalData,
Expand Down Expand Up @@ -585,12 +589,11 @@ impl RelayPath {
None
}

/// Sends a transaction to the chain targeted by the operational data `odata`.
/// If the transaction generates an error, returns the error as well as `LinkError::SendError`
/// if input events if a sending failure occurs.
/// Returns the events generated by the target chain upon success.
/// Sends a transaction to the target chain of [`OperationalData`].
/// Returns the tx hashes generated by the target chain upon success.
/// Propagates any encountered errors.
fn send_from_operational_data(
&mut self,
&self,
odata: OperationalData,
) -> Result<RelaySummary, LinkError> {
if odata.batch.is_empty() {
Expand Down
Loading

0 comments on commit b63335b

Please sign in to comment.