Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix cosmos and near chain send_message_and_wait_commit_*, #469

Open
wants to merge 2 commits into
base: v1.7.3-octopus
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
112 changes: 11 additions & 101 deletions crates/relayer/src/chain/cosmos.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ use core::{
};
use futures::future::join_all;
use num_bigint::BigInt;
use prost::Message;
use std::{cmp::Ordering, thread};

use tokio::runtime::Runtime as TokioRuntime;
Expand Down Expand Up @@ -685,8 +684,11 @@ impl CosmosSdkChain {
let account =
get_or_fetch_account(&self.grpc_addr, &key_account, &mut self.account).await?;

let canister_id = self.config.canister_id.id.as_str();
if self.config.sequential_batch_tx {
sequential_send_batched_messages_and_wait_commit(
&self.vp_client,
canister_id,
&self.rpc_client,
&self.tx_config,
&key_pair,
Expand Down Expand Up @@ -736,7 +738,10 @@ impl CosmosSdkChain {
let account =
get_or_fetch_account(&self.grpc_addr, &key_account, &mut self.account).await?;

let canister_id = self.config.canister_id.id.as_str();
send_batched_messages_and_wait_check_tx(
&self.vp_client,
canister_id,
&self.rpc_client,
&self.tx_config,
&key_pair,
Expand Down Expand Up @@ -1048,113 +1053,18 @@ impl ChainEndpoint for CosmosSdkChain {
&mut self,
tracked_msgs: TrackedMsgs,
) -> Result<Vec<IbcEventWithHeight>, Error> {
info!(
"tracked_msgs: {:?}, tracking_id: {:?}, \n{}",
tracked_msgs
.msgs
.iter()
.map(|msg| msg.type_url.clone())
.collect::<Vec<_>>(),
tracked_msgs.tracking_id,
std::panic::Location::caller()
);
use ibc_proto::google::protobuf::Any;

let mut tracked_msgs = tracked_msgs.clone();
if tracked_msgs.tracking_id().to_string() != "ft-transfer" {
let canister_id = self.config.canister_id.id.as_str();
let mut msgs: Vec<Any> = Vec::new();
for msg in tracked_msgs.messages() {
let res = self
.block_on(self.vp_client.deliver(canister_id, msg.encode_to_vec()))
.map_err(|e| {
let position = std::panic::Location::caller();
Error::report_error(format!(
"call vp deliver failed Error({}) \n{}",
e, position
))
})?;
if !res.is_empty() {
msgs.push(Any::decode(&res[..]).map_err(|e| {
let position = std::panic::Location::caller();
Error::report_error(format!(
"decode call vp deliver result failed Error({}) \n{}",
e, position
))
})?);
}
}
tracked_msgs.msgs = msgs;
info!(
"got proto_msgs from ic: {:?} \n{}",
tracked_msgs
.msgs
.iter()
.map(|msg| msg.type_url.clone())
.collect::<Vec<_>>(),
std::panic::Location::caller()
);
}
let runtime = self.rt.clone();

let rt = self.rt.clone();
rt.block_on(self.do_send_messages_and_wait_commit(tracked_msgs))
runtime.block_on(self.do_send_messages_and_wait_commit(tracked_msgs))
}

fn send_messages_and_wait_check_tx(
&mut self,
tracked_msgs: TrackedMsgs,
) -> Result<Vec<Response>, Error> {
info!(
"tracked_msgs: {:?}, tracking_id: {:?} \n{}",
tracked_msgs
.msgs
.iter()
.map(|msg| msg.type_url.clone())
.collect::<Vec<_>>(),
tracked_msgs.tracking_id,
std::panic::Location::caller()
);
use ibc_proto::google::protobuf::Any;

let mut tracked_msgs = tracked_msgs.clone();
if tracked_msgs.tracking_id().to_string() != "ft-transfer" {
let canister_id = self.config.canister_id.id.as_str();

let mut msgs: Vec<Any> = Vec::new();
for msg in tracked_msgs.messages() {
let res = self
.block_on(self.vp_client.deliver(canister_id, msg.encode_to_vec()))
.map_err(|e| {
let position = std::panic::Location::caller();
Error::report_error(format!(
"call icp deliver failed Error({}) \n{}",
e, position
))
})?;
if !res.is_empty() {
msgs.push(Any::decode(&res[..]).map_err(|e| {
let position = std::panic::Location::caller();
Error::report_error(format!(
"deliver result decode failed Error({}) \n{}",
e, position
))
})?);
}
}
tracked_msgs.msgs = msgs;
info!(
"got proto_msgs from ic: {:?} \n{}",
tracked_msgs
.msgs
.iter()
.map(|msg| msg.type_url.clone())
.collect::<Vec<_>>(),
std::panic::Location::caller()
);
}
let runtime = self.rt.clone();

let rt = self.rt.clone();
rt.block_on(self.do_send_messages_and_wait_check_tx(tracked_msgs))
runtime.block_on(self.do_send_messages_and_wait_check_tx(tracked_msgs))
}

/// Get the account for the signer
Expand Down Expand Up @@ -2693,6 +2603,6 @@ mod tests {
};

let res = inner_query_consensus_state_heights(rt, vp_client, cansiter_id, request);
dbg!(res);
assert!(res.is_ok());
}
}
75 changes: 72 additions & 3 deletions crates/relayer/src/chain/cosmos/batch.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use core::mem;

use crate::chain::ic::VpClient;
use ibc_proto::google::protobuf::Any;
use ibc_relayer_types::core::ics24_host::identifier::ChainId;
use ibc_relayer_types::events::IbcEvent;
Expand Down Expand Up @@ -66,6 +67,8 @@ pub async fn send_batched_messages_and_wait_commit(
are committed in the wrong order due to interference from priority mempool.
*/
pub async fn sequential_send_batched_messages_and_wait_commit(
vp_client: &VpClient,
canister_id: &str,
rpc_client: &HttpClient,
config: &TxConfig,
key_pair: &Secp256k1KeyPair,
Expand All @@ -78,7 +81,14 @@ pub async fn sequential_send_batched_messages_and_wait_commit(
}

let tx_sync_results = sequential_send_messages_as_batches(
rpc_client, config, key_pair, account, tx_memo, messages,
vp_client,
canister_id,
rpc_client,
config,
key_pair,
account,
tx_memo,
messages,
)
.await?;

Expand All @@ -91,6 +101,8 @@ pub async fn sequential_send_batched_messages_and_wait_commit(
}

pub async fn send_batched_messages_and_wait_check_tx(
vp_client: &VpClient,
canister_id: &str,
rpc_client: &HttpClient,
config: &TxConfig,
key_pair: &Secp256k1KeyPair,
Expand All @@ -107,8 +119,36 @@ pub async fn send_batched_messages_and_wait_check_tx(
let mut responses = Vec::new();

for batch in batches {
let mut inner_batch = vec![];
for msg in batch.iter() {
let res = vp_client
.deliver(canister_id, msg.encode_to_vec())
.await
.map_err(|e| {
let position = std::panic::Location::caller();
Error::report_error(format!(
"call vp deliver failed Error({}) \n{}",
e, position
))
})?;
if !res.is_empty() {
inner_batch.push(Any::decode(&res[..]).map_err(|e| {
let position = std::panic::Location::caller();
Error::report_error(format!(
"decode call vp deliver result failed Error({}) \n{}",
e, position
))
})?);
}
}

let response = send_tx_with_account_sequence_retry(
rpc_client, config, key_pair, account, tx_memo, &batch,
rpc_client,
config,
key_pair,
account,
tx_memo,
&inner_batch,
)
.await?;

Expand Down Expand Up @@ -160,6 +200,8 @@ async fn send_messages_as_batches(
}

async fn sequential_send_messages_as_batches(
vp_client: &VpClient,
canister_id: &str,
rpc_client: &HttpClient,
config: &TxConfig,
key_pair: &Secp256k1KeyPair,
Expand All @@ -186,9 +228,36 @@ async fn sequential_send_messages_as_batches(

for batch in batches {
let message_count = batch.len();
let mut inner_batch = vec![];
for msg in batch.iter() {
let res = vp_client
.deliver(canister_id, msg.encode_to_vec())
.await
.map_err(|e| {
let position = std::panic::Location::caller();
Error::report_error(format!(
"call vp deliver failed Error({}) \n{}",
e, position
))
})?;
if !res.is_empty() {
inner_batch.push(Any::decode(&res[..]).map_err(|e| {
let position = std::panic::Location::caller();
Error::report_error(format!(
"decode call vp deliver result failed Error({}) \n{}",
e, position
))
})?);
}
}

let response = send_tx_with_account_sequence_retry(
rpc_client, config, key_pair, account, tx_memo, &batch,
rpc_client,
config,
key_pair,
account,
tx_memo,
&inner_batch,
)
.await?;

Expand Down
14 changes: 6 additions & 8 deletions crates/relayer/src/chain/near/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -560,15 +560,13 @@ impl ChainEndpoint for NearChain {
std::panic::Location::caller()
);

for message in proto_msgs.messages().iter() {
let result = self.deliver(vec![message.clone()])?;
let result = self.deliver(proto_msgs.messages().to_vec())?;

debug!(
"deliver result: {:?} \n{}",
result,
std::panic::Location::caller()
);
}
debug!(
"deliver result: {:?} \n{}",
result,
std::panic::Location::caller()
);

Ok(vec![])
}
Expand Down
23 changes: 23 additions & 0 deletions tools/test-framework/src/bootstrap/init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ use eyre::Report as Error;
use ibc_relayer_cli::components::enable_ansi;
use std::env;
use std::fs;
use std::path::PathBuf;
use std::str::FromStr;
use std::sync::Once;
use tracing_subscriber::{
self as ts,
Expand Down Expand Up @@ -65,6 +67,22 @@ pub fn init_test() -> Result<TestConfig, Error> {
.map(|val| val == "1")
.unwrap_or(false);

let ic_endpoint =
env::var("IC_ENDPOINT").unwrap_or_else(|_| "http://localhost:4943".to_string());

let home_dir = std::env::var("HOME").unwrap();

let canister_pem = PathBuf::from_str(&format!(
"{}/.config/dfx/identity/default/identity.pem",
home_dir
))
.unwrap();

let near_ibc_address = "v5.nearibc.testnet".to_string();
let canister_id = "bkyz2-fmaaa-aaaaa-qaaaq-cai".to_string();
let near_rpc_endpoint =
"https://near-testnet.infura.io/v3/272532ecf0b64d7782a03db0cbcf3c30".to_string();

Ok(TestConfig {
chain_command_paths,
chain_store_dir,
Expand All @@ -73,6 +91,11 @@ pub fn init_test() -> Result<TestConfig, Error> {
bootstrap_with_random_ids: false,
native_tokens,
compat_modes,
ic_endpoint,
canister_pem,
near_ibc_address,
canister_id,
near_rpc_endpoint,
})
}

Expand Down
14 changes: 14 additions & 0 deletions tools/test-framework/src/types/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,4 +62,18 @@ pub struct TestConfig {
pub hang_on_fail: bool,

pub bootstrap_with_random_ids: bool,

// config for icp rpc endpoint url
pub ic_endpoint: String,

// config for icp canister pem file path
pub canister_pem: PathBuf,

// near ibc smart contract address
pub near_ibc_address: String,

// canister id for near ibc smart contract
pub canister_id: String,

pub near_rpc_endpoint: String,
}
7 changes: 7 additions & 0 deletions tools/test-framework/src/types/single/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ use ibc_relayer::chain::ChainType;
use ibc_relayer::config;
use ibc_relayer::config::compat_mode::CompatMode;
use ibc_relayer::config::gas_multiplier::GasMultiplier;
use ibc_relayer::config::CanisterIdConfig;
use ibc_relayer::config::NearIbcContractAddress;
use ibc_relayer::keyring::Store;
use ibc_relayer_types::core::ics24_host::identifier::ChainId;
use std::sync::{Arc, RwLock};
Expand Down Expand Up @@ -145,6 +147,11 @@ impl FullNode {
Ok(config::ChainConfig {
id: self.chain_driver.chain_id.clone(),
r#type: ChainType::CosmosSdk,
ic_endpoint: test_config.ic_endpoint.clone(),
canister_pem: test_config.canister_pem.clone(),
near_ibc_address: NearIbcContractAddress::from_str(&test_config.near_ibc_address)
.unwrap(),
canister_id: CanisterIdConfig::from_str(&test_config.canister_id).unwrap(),
rpc_addr: Url::from_str(&self.chain_driver.rpc_address())?,
grpc_addr: Url::from_str(&self.chain_driver.grpc_address())?,
event_source: config::EventSourceMode::Push {
Expand Down