From 871cb5abeca195f3428ed3aa8d416df4e9bb6931 Mon Sep 17 00:00:00 2001 From: Soares Chen Date: Tue, 9 Aug 2022 20:52:35 +0200 Subject: [PATCH 1/8] Add sequential version of send_batched_messages --- relayer/src/chain/cosmos/batch.rs | 123 ++++++++++++++++++++++++------ relayer/src/chain/cosmos/wait.rs | 4 + 2 files changed, 105 insertions(+), 22 deletions(-) diff --git a/relayer/src/chain/cosmos/batch.rs b/relayer/src/chain/cosmos/batch.rs index bac7fba586..7eee1084a4 100644 --- a/relayer/src/chain/cosmos/batch.rs +++ b/relayer/src/chain/cosmos/batch.rs @@ -1,5 +1,6 @@ use core::mem; +use ibc::core::ics24_host::identifier::ChainId; use ibc::events::IbcEvent; use ibc_proto::google::protobuf::Any; use prost::Message; @@ -55,6 +56,38 @@ pub async fn send_batched_messages_and_wait_commit( Ok(events) } +pub async fn sequential_send_batched_messages_and_wait_commit( + config: &TxConfig, + max_msg_num: MaxMsgNum, + max_tx_size: MaxTxSize, + key_entry: &KeyEntry, + account: &mut Account, + tx_memo: &Memo, + messages: Vec, +) -> Result, Error> { + if messages.is_empty() { + return Ok(Vec::new()); + } + + let tx_sync_results = sequential_send_messages_as_batches( + config, + max_msg_num, + max_tx_size, + key_entry, + account, + tx_memo, + messages, + ) + .await?; + + let events = tx_sync_results + .into_iter() + .flat_map(|el| el.events) + .collect(); + + Ok(events) +} + pub async fn send_batched_messages_and_wait_check_tx( config: &TxConfig, max_msg_num: MaxMsgNum, @@ -105,33 +138,79 @@ async fn send_messages_as_batches( let response = send_tx_with_account_sequence_retry(config, key_entry, account, tx_memo, batch).await?; - if response.code.is_err() { - let events_per_tx = vec![IbcEvent::ChainError(format!( - "check_tx (broadcast_tx_sync) on chain {} for Tx hash {} reports error: code={:?}, log={:?}", - config.chain_id, response.hash, response.code, response.log - )); message_count]; - - let tx_sync_result = TxSyncResult { - response, - events: events_per_tx, - status: TxStatus::ReceivedResponse, - }; - - tx_sync_results.push(tx_sync_result); - } else { - let tx_sync_result = TxSyncResult { - response, - events: Vec::new(), - status: TxStatus::Pending { message_count }, - }; - - tx_sync_results.push(tx_sync_result); - } + let tx_sync_result = response_to_tx_sync_result(&config.chain_id, message_count, response); + + tx_sync_results.push(tx_sync_result); } Ok(tx_sync_results) } +async fn sequential_send_messages_as_batches( + config: &TxConfig, + max_msg_num: MaxMsgNum, + max_tx_size: MaxTxSize, + key_entry: &KeyEntry, + account: &mut Account, + tx_memo: &Memo, + messages: Vec, +) -> Result, Error> { + if messages.is_empty() { + return Ok(Vec::new()); + } + + let batches = batch_messages(max_msg_num, max_tx_size, messages)?; + + let mut tx_sync_results = Vec::new(); + + for batch in batches { + let message_count = batch.len(); + + let response = + send_tx_with_account_sequence_retry(config, key_entry, account, tx_memo, batch).await?; + + let tx_sync_result = response_to_tx_sync_result(&config.chain_id, message_count, response); + + tx_sync_results.push(tx_sync_result); + + wait_for_block_commits( + &config.chain_id, + &config.rpc_client, + &config.rpc_address, + &config.rpc_timeout, + &mut tx_sync_results, + ) + .await?; + } + + Ok(tx_sync_results) +} + +fn response_to_tx_sync_result( + chain_id: &ChainId, + message_count: usize, + response: Response, +) -> TxSyncResult { + if response.code.is_err() { + let events_per_tx = vec![IbcEvent::ChainError(format!( + "check_tx (broadcast_tx_sync) on chain {} for Tx hash {} reports error: code={:?}, log={:?}", + chain_id, response.hash, response.code, response.log + )); message_count]; + + TxSyncResult { + response, + events: events_per_tx, + status: TxStatus::ReceivedResponse, + } + } else { + TxSyncResult { + response, + events: Vec::new(), + status: TxStatus::Pending { message_count }, + } + } +} + fn batch_messages( max_msg_num: MaxMsgNum, max_tx_size: MaxTxSize, diff --git a/relayer/src/chain/cosmos/wait.rs b/relayer/src/chain/cosmos/wait.rs index c39dfe72af..f8e4479b58 100644 --- a/relayer/src/chain/cosmos/wait.rs +++ b/relayer/src/chain/cosmos/wait.rs @@ -25,6 +25,10 @@ pub async fn wait_for_block_commits( rpc_timeout: &Duration, tx_sync_results: &mut [TxSyncResult], ) -> Result<(), Error> { + if all_tx_results_found(tx_sync_results) { + return Ok(()); + } + let start_time = Instant::now(); let hashes = tx_sync_results From 0dcec0a086ca3ce34928b19860db5ce32764e91f Mon Sep 17 00:00:00 2001 From: Soares Chen Date: Tue, 9 Aug 2022 21:00:29 +0200 Subject: [PATCH 2/8] Add sequential_batch_tx chain config --- relayer/src/chain/cosmos.rs | 34 +++++++++++++------ relayer/src/chain/mock.rs | 1 + relayer/src/config.rs | 8 +++++ tools/test-framework/src/types/single/node.rs | 1 + 4 files changed, 34 insertions(+), 10 deletions(-) diff --git a/relayer/src/chain/cosmos.rs b/relayer/src/chain/cosmos.rs index c43513fff8..44908145d9 100644 --- a/relayer/src/chain/cosmos.rs +++ b/relayer/src/chain/cosmos.rs @@ -53,6 +53,7 @@ use crate::account::Balance; use crate::chain::client::ClientSettings; use crate::chain::cosmos::batch::{ send_batched_messages_and_wait_check_tx, send_batched_messages_and_wait_commit, + sequential_send_batched_messages_and_wait_commit, }; use crate::chain::cosmos::encode::encode_to_bech32; use crate::chain::cosmos::gas::{calculate_fee, mul_ceil}; @@ -417,16 +418,29 @@ impl CosmosSdkChain { let account = get_or_fetch_account(&self.grpc_addr, &key_entry.account, &mut self.account).await?; - send_batched_messages_and_wait_commit( - &self.tx_config, - self.config.max_msg_num, - self.config.max_tx_size, - &key_entry, - account, - &self.config.memo_prefix, - proto_msgs, - ) - .await + if self.config.sequential_batch_tx { + sequential_send_batched_messages_and_wait_commit( + &self.tx_config, + self.config.max_msg_num, + self.config.max_tx_size, + &key_entry, + account, + &self.config.memo_prefix, + proto_msgs, + ) + .await + } else { + send_batched_messages_and_wait_commit( + &self.tx_config, + self.config.max_msg_num, + self.config.max_tx_size, + &key_entry, + account, + &self.config.memo_prefix, + proto_msgs, + ) + .await + } } async fn do_send_messages_and_wait_check_tx( diff --git a/relayer/src/chain/mock.rs b/relayer/src/chain/mock.rs index fe47ba9841..6867b8040c 100644 --- a/relayer/src/chain/mock.rs +++ b/relayer/src/chain/mock.rs @@ -484,6 +484,7 @@ pub mod test_utils { address_type: AddressType::default(), memo_prefix: Default::default(), proof_specs: Default::default(), + sequential_batch_tx: false, } } } diff --git a/relayer/src/config.rs b/relayer/src/config.rs index c7839acc49..4c2fcd2edb 100644 --- a/relayer/src/config.rs +++ b/relayer/src/config.rs @@ -361,6 +361,14 @@ pub struct ChainConfig { #[serde(default, with = "self::proof_specs")] pub proof_specs: ProofSpecs, + // This is an undocumented and hidden config to make the relayer wait for + // DeliverTX before sending the next transaction when sending messages in + // multiple batches. We will instruct relayer operators to turn this on + // in case relaying failed in a chain with priority mempool enabled. + // Warning: turning this on may cause degradation in performance. + #[serde(default)] + pub sequential_batch_tx: bool, + // these two need to be last otherwise we run into `ValueAfterTable` error when serializing to TOML /// The trust threshold defines what fraction of the total voting power of a known /// and trusted validator set is sufficient for a commit to be accepted going forward. diff --git a/tools/test-framework/src/types/single/node.rs b/tools/test-framework/src/types/single/node.rs index 1d2a1a27d7..bc76275df0 100644 --- a/tools/test-framework/src/types/single/node.rs +++ b/tools/test-framework/src/types/single/node.rs @@ -151,6 +151,7 @@ impl FullNode { address_type: Default::default(), memo_prefix: Default::default(), proof_specs: Default::default(), + sequential_batch_tx: false, }) } From d62be79f2911aaa90d39e35514de00a2b256e5ce Mon Sep 17 00:00:00 2001 From: Soares Chen Date: Tue, 9 Aug 2022 21:02:15 +0200 Subject: [PATCH 3/8] Manual test behavior of integration tests with sequential_batch_tx --- tools/test-framework/src/types/single/node.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tools/test-framework/src/types/single/node.rs b/tools/test-framework/src/types/single/node.rs index bc76275df0..bba4c3f6fe 100644 --- a/tools/test-framework/src/types/single/node.rs +++ b/tools/test-framework/src/types/single/node.rs @@ -151,7 +151,7 @@ impl FullNode { address_type: Default::default(), memo_prefix: Default::default(), proof_specs: Default::default(), - sequential_batch_tx: false, + sequential_batch_tx: true, }) } From 13d0b05ce89123e5283eeab99574bdfd9cec7475 Mon Sep 17 00:00:00 2001 From: Soares Chen Date: Wed, 10 Aug 2022 19:27:49 +0200 Subject: [PATCH 4/8] Add draft test for sequential batching --- Cargo.lock | 1 + relayer/src/chain/cosmos/batch.rs | 5 + tools/integration-test/Cargo.toml | 1 + tools/integration-test/src/tests/mod.rs | 1 + .../src/tests/tendermint/mod.rs | 1 + .../src/tests/tendermint/sequential.rs | 101 ++++++++++++++++++ tools/test-framework/src/types/single/node.rs | 2 +- 7 files changed, 111 insertions(+), 1 deletion(-) create mode 100644 tools/integration-test/src/tests/tendermint/mod.rs create mode 100644 tools/integration-test/src/tests/tendermint/sequential.rs diff --git a/Cargo.lock b/Cargo.lock index d635595eb5..1c5908419a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1443,6 +1443,7 @@ dependencies = [ "tendermint", "tendermint-rpc", "time", + "toml", ] [[package]] diff --git a/relayer/src/chain/cosmos/batch.rs b/relayer/src/chain/cosmos/batch.rs index 7eee1084a4..c56b8223a7 100644 --- a/relayer/src/chain/cosmos/batch.rs +++ b/relayer/src/chain/cosmos/batch.rs @@ -5,6 +5,7 @@ use ibc::events::IbcEvent; use ibc_proto::google::protobuf::Any; use prost::Message; use tendermint_rpc::endpoint::broadcast::tx_sync::Response; +use tracing::info; use crate::chain::cosmos::retry::send_tx_with_account_sequence_retry; use crate::chain::cosmos::types::account::Account; @@ -130,6 +131,8 @@ async fn send_messages_as_batches( let batches = batch_messages(max_msg_num, max_tx_size, messages)?; + info!("sending {} batches of messages in parallel", batches.len()); + let mut tx_sync_results = Vec::new(); for batch in batches { @@ -161,6 +164,8 @@ async fn sequential_send_messages_as_batches( let batches = batch_messages(max_msg_num, max_tx_size, messages)?; + info!("sending {} batches of messages in serial", batches.len()); + let mut tx_sync_results = Vec::new(); for batch in batches { diff --git a/tools/integration-test/Cargo.toml b/tools/integration-test/Cargo.toml index cb2be7d955..74d912f087 100644 --- a/tools/integration-test/Cargo.toml +++ b/tools/integration-test/Cargo.toml @@ -24,6 +24,7 @@ tendermint-rpc = { version = "=0.23.9", features = ["http-client", "websocket-c serde_json = "1" time = "0.3" +toml = "0.5" serde = "1.0.141" [features] diff --git a/tools/integration-test/src/tests/mod.rs b/tools/integration-test/src/tests/mod.rs index 3d11025cd8..b6c7f65c7f 100644 --- a/tools/integration-test/src/tests/mod.rs +++ b/tools/integration-test/src/tests/mod.rs @@ -16,6 +16,7 @@ pub mod memo; pub mod python; mod query_packet; pub mod supervisor; +pub mod tendermint; pub mod ternary_transfer; pub mod transfer; diff --git a/tools/integration-test/src/tests/tendermint/mod.rs b/tools/integration-test/src/tests/tendermint/mod.rs new file mode 100644 index 0000000000..7b0ee374b1 --- /dev/null +++ b/tools/integration-test/src/tests/tendermint/mod.rs @@ -0,0 +1 @@ +pub mod sequential; diff --git a/tools/integration-test/src/tests/tendermint/sequential.rs b/tools/integration-test/src/tests/tendermint/sequential.rs new file mode 100644 index 0000000000..f4c0ad068d --- /dev/null +++ b/tools/integration-test/src/tests/tendermint/sequential.rs @@ -0,0 +1,101 @@ +use std::time::Instant; + +use ibc_relayer::chain::tracking::TrackedMsgs; +use ibc_relayer::config::types::max_msg_num::MaxMsgNum; +use ibc_test_framework::chain::config; +use ibc_test_framework::prelude::*; +use ibc_test_framework::relayer::transfer::build_transfer_message; + +#[test] +fn test_sequential_commit() -> Result<(), Error> { + run_binary_channel_test(&SequentialCommitTest) +} + +pub struct SequentialCommitTest; + +impl TestOverrides for SequentialCommitTest { + fn modify_node_config(&self, config: &mut toml::Value) -> Result<(), Error> { + config::set_timeout_commit(config, Duration::from_millis(500))?; + config::set_timeout_propose(config, Duration::from_millis(500))?; + Ok(()) + } + + fn modify_relayer_config(&self, config: &mut Config) { + let chain_config_a = &mut config.chains[0]; + chain_config_a.max_msg_num = MaxMsgNum::new(3).unwrap(); + chain_config_a.sequential_batch_tx = true; + + let chain_config_b = &mut config.chains[1]; + chain_config_b.max_msg_num = MaxMsgNum::new(3).unwrap(); + chain_config_b.sequential_batch_tx = false; + } + + fn should_spawn_supervisor(&self) -> bool { + false + } +} + +impl BinaryChannelTest for SequentialCommitTest { + fn run( + &self, + _config: &TestConfig, + _relayer: RelayerDriver, + chains: ConnectedChains, + channel: ConnectedChannel, + ) -> Result<(), Error> { + let wallet_a = chains.node_a.wallets().relayer().cloned(); + let wallet_b = chains.node_b.wallets().relayer().cloned(); + + { + let denom_a = chains.node_a.denom(); + + let transfer_message = build_transfer_message( + &channel.port_a.as_ref(), + &channel.channel_id_a.as_ref(), + &wallet_a.as_ref(), + &wallet_b.address(), + &denom_a, + 100, + )?; + + let messages = TrackedMsgs::new_static(vec![transfer_message; 15], "test_error_events"); + + let start = Instant::now(); + + chains.handle_a().send_messages_and_wait_commit(messages)?; + + let end = Instant::now(); + + let duration = end.duration_since(start); + + info!("time taken to send 15 messages on chain A: {:?}", duration); + } + + { + let denom_b = chains.node_b.denom(); + + let transfer_message = build_transfer_message( + &channel.port_b.as_ref(), + &channel.channel_id_b.as_ref(), + &wallet_b.as_ref(), + &wallet_a.address(), + &denom_b, + 100, + )?; + + let messages = TrackedMsgs::new_static(vec![transfer_message; 15], "test_error_events"); + + let start = Instant::now(); + + chains.handle_b().send_messages_and_wait_commit(messages)?; + + let end = Instant::now(); + + let duration = end.duration_since(start); + + info!("time taken to send 15 messages on chain B: {:?}", duration); + } + + Ok(()) + } +} diff --git a/tools/test-framework/src/types/single/node.rs b/tools/test-framework/src/types/single/node.rs index bba4c3f6fe..bc76275df0 100644 --- a/tools/test-framework/src/types/single/node.rs +++ b/tools/test-framework/src/types/single/node.rs @@ -151,7 +151,7 @@ impl FullNode { address_type: Default::default(), memo_prefix: Default::default(), proof_specs: Default::default(), - sequential_batch_tx: true, + sequential_batch_tx: false, }) } From 9860fe3e9a77a61148477273b3285acda9142ba2 Mon Sep 17 00:00:00 2001 From: Soares Chen Date: Wed, 10 Aug 2022 19:32:03 +0200 Subject: [PATCH 5/8] Add set_mempool_version to enable priority mempool --- .../src/tests/tendermint/sequential.rs | 2 ++ tools/test-framework/src/chain/config.rs | 14 ++++++++++++++ 2 files changed, 16 insertions(+) diff --git a/tools/integration-test/src/tests/tendermint/sequential.rs b/tools/integration-test/src/tests/tendermint/sequential.rs index f4c0ad068d..b67308c8bb 100644 --- a/tools/integration-test/src/tests/tendermint/sequential.rs +++ b/tools/integration-test/src/tests/tendermint/sequential.rs @@ -17,6 +17,8 @@ impl TestOverrides for SequentialCommitTest { fn modify_node_config(&self, config: &mut toml::Value) -> Result<(), Error> { config::set_timeout_commit(config, Duration::from_millis(500))?; config::set_timeout_propose(config, Duration::from_millis(500))?; + config::set_mempool_version(config, "v1")?; + Ok(()) } diff --git a/tools/test-framework/src/chain/config.rs b/tools/test-framework/src/chain/config.rs index afa65fb630..e402f4fa9b 100644 --- a/tools/test-framework/src/chain/config.rs +++ b/tools/test-framework/src/chain/config.rs @@ -73,6 +73,20 @@ pub fn set_p2p_port(config: &mut Value, port: u16) -> Result<(), Error> { Ok(()) } +pub fn set_mempool_version(config: &mut Value, version: &str) -> Result<(), Error> { + config + .get_mut("mempool") + .ok_or_else(|| eyre!("expect mempool section"))? + .as_table_mut() + .ok_or_else(|| eyre!("expect object"))? + .insert( + "mempool_version".to_string(), + version.into(), + ); + + Ok(()) +} + /// Set the `consensus.timeout_commit` field in the full node config. pub fn set_timeout_commit(config: &mut Value, duration: Duration) -> Result<(), Error> { config From 16e1887260be5f641b619c0b427c19f425971e45 Mon Sep 17 00:00:00 2001 From: Soares Chen Date: Wed, 10 Aug 2022 20:16:36 +0200 Subject: [PATCH 6/8] Add integration test for sequential batching --- relayer/src/chain/cosmos/batch.rs | 20 ++++++- relayer/src/chain/cosmos/wait.rs | 4 +- .../src/tests/tendermint/sequential.rs | 55 ++++++++++++++++--- tools/test-framework/src/chain/config.rs | 5 +- 4 files changed, 67 insertions(+), 17 deletions(-) diff --git a/relayer/src/chain/cosmos/batch.rs b/relayer/src/chain/cosmos/batch.rs index c56b8223a7..6d5effa10e 100644 --- a/relayer/src/chain/cosmos/batch.rs +++ b/relayer/src/chain/cosmos/batch.rs @@ -5,7 +5,7 @@ use ibc::events::IbcEvent; use ibc_proto::google::protobuf::Any; use prost::Message; use tendermint_rpc::endpoint::broadcast::tx_sync::Response; -use tracing::info; +use tracing::debug; use crate::chain::cosmos::retry::send_tx_with_account_sequence_retry; use crate::chain::cosmos::types::account::Account; @@ -129,9 +129,16 @@ async fn send_messages_as_batches( return Ok(Vec::new()); } + let message_count = messages.len(); + let batches = batch_messages(max_msg_num, max_tx_size, messages)?; - info!("sending {} batches of messages in parallel", batches.len()); + debug!( + "sending {} messages as {} batches to chain {} in parallel", + message_count, + batches.len(), + config.chain_id + ); let mut tx_sync_results = Vec::new(); @@ -162,9 +169,16 @@ async fn sequential_send_messages_as_batches( return Ok(Vec::new()); } + let message_count = messages.len(); + let batches = batch_messages(max_msg_num, max_tx_size, messages)?; - info!("sending {} batches of messages in serial", batches.len()); + debug!( + "sending {} messages as {} batches to chain {} in serial", + message_count, + batches.len(), + config.chain_id + ); let mut tx_sync_results = Vec::new(); diff --git a/relayer/src/chain/cosmos/wait.rs b/relayer/src/chain/cosmos/wait.rs index f8e4479b58..57bfdc366c 100644 --- a/relayer/src/chain/cosmos/wait.rs +++ b/relayer/src/chain/cosmos/wait.rs @@ -6,7 +6,7 @@ use itertools::Itertools; use std::thread; use std::time::Instant; use tendermint_rpc::{HttpClient, Url}; -use tracing::{info, trace}; +use tracing::{debug, trace}; use crate::chain::cosmos::query::tx::query_tx_response; use crate::chain::cosmos::types::events::from_tx_response_event; @@ -36,7 +36,7 @@ pub async fn wait_for_block_commits( .map(|res| res.response.hash.to_string()) .join(", "); - info!( + debug!( id = %chain_id, "wait_for_block_commits: waiting for commit of tx hashes(s) {}", hashes diff --git a/tools/integration-test/src/tests/tendermint/sequential.rs b/tools/integration-test/src/tests/tendermint/sequential.rs index b67308c8bb..77f34fcb36 100644 --- a/tools/integration-test/src/tests/tendermint/sequential.rs +++ b/tools/integration-test/src/tests/tendermint/sequential.rs @@ -6,6 +6,12 @@ use ibc_test_framework::chain::config; use ibc_test_framework::prelude::*; use ibc_test_framework::relayer::transfer::build_transfer_message; +const MESSAGES_PER_BATCH: usize = 5; +const TOTAL_TRANSACTIONS: usize = 5; +const TOTAL_MESSAGES: usize = MESSAGES_PER_BATCH * TOTAL_TRANSACTIONS; +const BLOCK_TIME_MILLIS: u64 = 1000; +const BLOCK_TIME: Duration = Duration::from_millis(BLOCK_TIME_MILLIS); + #[test] fn test_sequential_commit() -> Result<(), Error> { run_binary_channel_test(&SequentialCommitTest) @@ -15,20 +21,24 @@ pub struct SequentialCommitTest; impl TestOverrides for SequentialCommitTest { fn modify_node_config(&self, config: &mut toml::Value) -> Result<(), Error> { - config::set_timeout_commit(config, Duration::from_millis(500))?; - config::set_timeout_propose(config, Duration::from_millis(500))?; + config::set_timeout_commit(config, BLOCK_TIME)?; + config::set_timeout_propose(config, BLOCK_TIME)?; + + // Enable priority mempool. Note that this is not working currently config::set_mempool_version(config, "v1")?; Ok(()) } fn modify_relayer_config(&self, config: &mut Config) { + // Use sequential batching for chain A, and default parallel batching for chain B + let chain_config_a = &mut config.chains[0]; - chain_config_a.max_msg_num = MaxMsgNum::new(3).unwrap(); + chain_config_a.max_msg_num = MaxMsgNum::new(MESSAGES_PER_BATCH).unwrap(); chain_config_a.sequential_batch_tx = true; let chain_config_b = &mut config.chains[1]; - chain_config_b.max_msg_num = MaxMsgNum::new(3).unwrap(); + chain_config_b.max_msg_num = MaxMsgNum::new(MESSAGES_PER_BATCH).unwrap(); chain_config_b.sequential_batch_tx = false; } @@ -60,7 +70,10 @@ impl BinaryChannelTest for SequentialCommitTest { 100, )?; - let messages = TrackedMsgs::new_static(vec![transfer_message; 15], "test_error_events"); + let messages = TrackedMsgs::new_static( + vec![transfer_message; TOTAL_MESSAGES], + "test_error_events", + ); let start = Instant::now(); @@ -70,7 +83,21 @@ impl BinaryChannelTest for SequentialCommitTest { let duration = end.duration_since(start); - info!("time taken to send 15 messages on chain A: {:?}", duration); + info!( + "time taken to send {} messages on chain A: {:?}", + TOTAL_MESSAGES, duration + ); + + // Time taken for submitting sequential batches should be around number of transactions * block time + + assert!( + duration + > Duration::from_millis((BLOCK_TIME_MILLIS * TOTAL_TRANSACTIONS as u64) - 1000) + ); + assert!( + duration + < Duration::from_millis((BLOCK_TIME_MILLIS * TOTAL_TRANSACTIONS as u64) + 1000) + ); } { @@ -85,7 +112,10 @@ impl BinaryChannelTest for SequentialCommitTest { 100, )?; - let messages = TrackedMsgs::new_static(vec![transfer_message; 15], "test_error_events"); + let messages = TrackedMsgs::new_static( + vec![transfer_message; TOTAL_MESSAGES], + "test_error_events", + ); let start = Instant::now(); @@ -95,7 +125,16 @@ impl BinaryChannelTest for SequentialCommitTest { let duration = end.duration_since(start); - info!("time taken to send 15 messages on chain B: {:?}", duration); + // Time taken for submitting sequential batches should be around a single block time, + // since the number of transactions are small enough to fit in a single block. + + info!( + "time taken to send {} messages on chain B: {:?}", + TOTAL_MESSAGES, duration + ); + + assert!(duration > BLOCK_TIME); + assert!(duration < Duration::from_millis(BLOCK_TIME_MILLIS * 2)); } Ok(()) diff --git a/tools/test-framework/src/chain/config.rs b/tools/test-framework/src/chain/config.rs index e402f4fa9b..e66c746d26 100644 --- a/tools/test-framework/src/chain/config.rs +++ b/tools/test-framework/src/chain/config.rs @@ -79,10 +79,7 @@ pub fn set_mempool_version(config: &mut Value, version: &str) -> Result<(), Erro .ok_or_else(|| eyre!("expect mempool section"))? .as_table_mut() .ok_or_else(|| eyre!("expect object"))? - .insert( - "mempool_version".to_string(), - version.into(), - ); + .insert("mempool_version".to_string(), version.into()); Ok(()) } From a609c0566d728202269dab1a6497281a9457b82d Mon Sep 17 00:00:00 2001 From: Soares Chen Date: Wed, 10 Aug 2022 21:44:54 +0200 Subject: [PATCH 7/8] Relax time assertion for parallel batch --- tools/integration-test/src/tests/tendermint/sequential.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/tools/integration-test/src/tests/tendermint/sequential.rs b/tools/integration-test/src/tests/tendermint/sequential.rs index 77f34fcb36..d25910f156 100644 --- a/tools/integration-test/src/tests/tendermint/sequential.rs +++ b/tools/integration-test/src/tests/tendermint/sequential.rs @@ -133,7 +133,6 @@ impl BinaryChannelTest for SequentialCommitTest { TOTAL_MESSAGES, duration ); - assert!(duration > BLOCK_TIME); assert!(duration < Duration::from_millis(BLOCK_TIME_MILLIS * 2)); } From 6b0817ba9354c7fef9ade05d272697ddc10db85b Mon Sep 17 00:00:00 2001 From: Soares Chen Date: Fri, 12 Aug 2022 14:01:08 +0200 Subject: [PATCH 8/8] Add some documentation to send_messages functions --- relayer/src/chain/cosmos/batch.rs | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/relayer/src/chain/cosmos/batch.rs b/relayer/src/chain/cosmos/batch.rs index 6d5effa10e..d4c96840ce 100644 --- a/relayer/src/chain/cosmos/batch.rs +++ b/relayer/src/chain/cosmos/batch.rs @@ -16,6 +16,13 @@ use crate::config::types::{MaxMsgNum, MaxTxSize, Memo}; use crate::error::Error; use crate::keyring::KeyEntry; +/** + Broadcast messages as multiple batched transactions to the chain all at once, + and then wait for all transactions to be committed. + This may improve performance in case when multiple transactions are + committed into the same block. However this approach may not work if + priority mempool is enabled. +*/ pub async fn send_batched_messages_and_wait_commit( config: &TxConfig, max_msg_num: MaxMsgNum, @@ -57,6 +64,11 @@ pub async fn send_batched_messages_and_wait_commit( Ok(events) } +/** + Send batched messages one after another, only after the previous one + has been committed. This is only used in case if parallel transactions + are committed in the wrong order due to interference from priority mempool. +*/ pub async fn sequential_send_batched_messages_and_wait_commit( config: &TxConfig, max_msg_num: MaxMsgNum,