Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add hidden configuration for sending batched messages sequentially #2543

Merged
merged 12 commits into from
Aug 23, 2022
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

34 changes: 24 additions & 10 deletions relayer/src/chain/cosmos.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ use crate::account::Balance;
use crate::chain::client::ClientSettings;
use crate::chain::cosmos::batch::{
send_batched_messages_and_wait_check_tx, send_batched_messages_and_wait_commit,
sequential_send_batched_messages_and_wait_commit,
};
use crate::chain::cosmos::encode::encode_to_bech32;
use crate::chain::cosmos::gas::{calculate_fee, mul_ceil};
Expand Down Expand Up @@ -417,16 +418,29 @@ impl CosmosSdkChain {
let account =
get_or_fetch_account(&self.grpc_addr, &key_entry.account, &mut self.account).await?;

send_batched_messages_and_wait_commit(
&self.tx_config,
self.config.max_msg_num,
self.config.max_tx_size,
&key_entry,
account,
&self.config.memo_prefix,
proto_msgs,
)
.await
if self.config.sequential_batch_tx {
sequential_send_batched_messages_and_wait_commit(
&self.tx_config,
self.config.max_msg_num,
self.config.max_tx_size,
&key_entry,
account,
&self.config.memo_prefix,
proto_msgs,
)
.await
} else {
send_batched_messages_and_wait_commit(
&self.tx_config,
self.config.max_msg_num,
self.config.max_tx_size,
&key_entry,
account,
&self.config.memo_prefix,
proto_msgs,
)
.await
}
}

async fn do_send_messages_and_wait_check_tx(
Expand Down
154 changes: 132 additions & 22 deletions relayer/src/chain/cosmos/batch.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
use core::mem;

use ibc::core::ics24_host::identifier::ChainId;
use ibc::events::IbcEvent;
use ibc_proto::google::protobuf::Any;
use prost::Message;
use tendermint_rpc::endpoint::broadcast::tx_sync::Response;
use tracing::debug;

use crate::chain::cosmos::retry::send_tx_with_account_sequence_retry;
use crate::chain::cosmos::types::account::Account;
Expand All @@ -14,6 +16,13 @@ use crate::config::types::{MaxMsgNum, MaxTxSize, Memo};
use crate::error::Error;
use crate::keyring::KeyEntry;

/**
Broadcast messages as multiple batched transactions to the chain all at once,
and then wait for all transactions to be committed.
This may improve performance in cases where multiple transactions are
committed into the same block. However, this approach may not work if
the priority mempool is enabled.
*/
pub async fn send_batched_messages_and_wait_commit(
config: &TxConfig,
max_msg_num: MaxMsgNum,
Expand Down Expand Up @@ -55,6 +64,43 @@ pub async fn send_batched_messages_and_wait_commit(
Ok(events)
}

/**
   Send the batched messages one after another, submitting the next batch
   only after the previous one has been committed. This is only used in
   case parallel transactions get committed in the wrong order due to
   interference from a priority mempool.
*/
pub async fn sequential_send_batched_messages_and_wait_commit(
    config: &TxConfig,
    max_msg_num: MaxMsgNum,
    max_tx_size: MaxTxSize,
    key_entry: &KeyEntry,
    account: &mut Account,
    tx_memo: &Memo,
    messages: Vec<Any>,
) -> Result<Vec<IbcEvent>, Error> {
    // Nothing to send; avoid invoking the batching machinery at all.
    if messages.is_empty() {
        return Ok(Vec::new());
    }

    let results = sequential_send_messages_as_batches(
        config,
        max_msg_num,
        max_tx_size,
        key_entry,
        account,
        tx_memo,
        messages,
    )
    .await?;

    // Flatten the per-transaction event lists into a single event stream.
    let events: Vec<IbcEvent> = results.into_iter().flat_map(|result| result.events).collect();

    Ok(events)
}

pub async fn send_batched_messages_and_wait_check_tx(
config: &TxConfig,
max_msg_num: MaxMsgNum,
Expand Down Expand Up @@ -95,8 +141,17 @@ async fn send_messages_as_batches(
return Ok(Vec::new());
}

let message_count = messages.len();

let batches = batch_messages(max_msg_num, max_tx_size, messages)?;

debug!(
"sending {} messages as {} batches to chain {} in parallel",
message_count,
batches.len(),
config.chain_id
);

let mut tx_sync_results = Vec::new();

for batch in batches {
Expand All @@ -105,33 +160,88 @@ async fn send_messages_as_batches(
let response =
send_tx_with_account_sequence_retry(config, key_entry, account, tx_memo, batch).await?;

if response.code.is_err() {
let events_per_tx = vec![IbcEvent::ChainError(format!(
"check_tx (broadcast_tx_sync) on chain {} for Tx hash {} reports error: code={:?}, log={:?}",
config.chain_id, response.hash, response.code, response.log
)); message_count];

let tx_sync_result = TxSyncResult {
response,
events: events_per_tx,
status: TxStatus::ReceivedResponse,
};

tx_sync_results.push(tx_sync_result);
} else {
let tx_sync_result = TxSyncResult {
response,
events: Vec::new(),
status: TxStatus::Pending { message_count },
};

tx_sync_results.push(tx_sync_result);
}
let tx_sync_result = response_to_tx_sync_result(&config.chain_id, message_count, response);

tx_sync_results.push(tx_sync_result);
}

Ok(tx_sync_results)
}

async fn sequential_send_messages_as_batches(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm concerned about more code duplication here.
When contemplating refactoring for batching fixes respecting actual tx size, already there are two functions to care about, this adds a third one.

Could this be incorporated in the regular send_messages_as_batches under a dynamic flag?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The core logic has already been abstracted into helper functions like batch_messages, send_tx_with_account_sequence_retry, and wait_for_block_commits. So functions like sequential_send_messages_as_batches only act as higher level functions that wire up the lower level functions. Such low level functions are already there to serve the code deduplication purpose.

Using a dynamic flag can obscure the logic that differentiates send_messages_as_batches from sequential_send_messages_as_batches. With them being separate functions, it is much more clear of how the behavior are different from one another.

config: &TxConfig,
max_msg_num: MaxMsgNum,
max_tx_size: MaxTxSize,
key_entry: &KeyEntry,
account: &mut Account,
tx_memo: &Memo,
messages: Vec<Any>,
) -> Result<Vec<TxSyncResult>, Error> {
if messages.is_empty() {
return Ok(Vec::new());
}

let message_count = messages.len();

let batches = batch_messages(max_msg_num, max_tx_size, messages)?;

debug!(
"sending {} messages as {} batches to chain {} in serial",
message_count,
batches.len(),
config.chain_id
);

let mut tx_sync_results = Vec::new();

for batch in batches {
let message_count = batch.len();

let response =
send_tx_with_account_sequence_retry(config, key_entry, account, tx_memo, batch).await?;

let tx_sync_result = response_to_tx_sync_result(&config.chain_id, message_count, response);

tx_sync_results.push(tx_sync_result);

wait_for_block_commits(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With sequential sending, we get the DeliverTX result before sending the next transaction. However it is a bit unclear here whether we should continue submitting the remaining batches to the chain, if DeliverTX or SendTX returns error.

Supposingly, we should probably short circuit and return on the first encounter of error. However I keep the behavior the same as the parallel version, so that we do not observe unexpected difference in behavior when sequential batching is enabled.

The anti-pattern behavior of converting errors to duplicate IBC error events also significantly complicates the design space here. So I'd rather leave the behavior untouched until the chain errors are handled in better ways.

&config.chain_id,
&config.rpc_client,
&config.rpc_address,
&config.rpc_timeout,
&mut tx_sync_results,
)
.await?;
}

Ok(tx_sync_results)
}

/// Convert a `broadcast_tx_sync` response into a [`TxSyncResult`].
///
/// On a CheckTx error, the result is marked as having already received its
/// response, carrying one synthetic `ChainError` event per message in the
/// batch. Otherwise the result starts empty and is marked pending with the
/// batch's message count, to be resolved once the transaction is committed.
fn response_to_tx_sync_result(
    chain_id: &ChainId,
    message_count: usize,
    response: Response,
) -> TxSyncResult {
    if response.code.is_err() {
        // Duplicate the error event once per message, so that downstream
        // consumers still observe one event per submitted message.
        let error_event = IbcEvent::ChainError(format!(
            "check_tx (broadcast_tx_sync) on chain {} for Tx hash {} reports error: code={:?}, log={:?}",
            chain_id, response.hash, response.code, response.log
        ));

        return TxSyncResult {
            response,
            events: vec![error_event; message_count],
            status: TxStatus::ReceivedResponse,
        };
    }

    TxSyncResult {
        response,
        events: Vec::new(),
        status: TxStatus::Pending { message_count },
    }
}

fn batch_messages(
max_msg_num: MaxMsgNum,
max_tx_size: MaxTxSize,
Expand Down
8 changes: 6 additions & 2 deletions relayer/src/chain/cosmos/wait.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use itertools::Itertools;
use std::thread;
use std::time::Instant;
use tendermint_rpc::{HttpClient, Url};
use tracing::{info, trace};
use tracing::{debug, trace};

use crate::chain::cosmos::query::tx::query_tx_response;
use crate::chain::cosmos::types::events::from_tx_response_event;
Expand All @@ -25,14 +25,18 @@ pub async fn wait_for_block_commits(
rpc_timeout: &Duration,
tx_sync_results: &mut [TxSyncResult],
) -> Result<(), Error> {
if all_tx_results_found(tx_sync_results) {
return Ok(());
}

let start_time = Instant::now();

let hashes = tx_sync_results
.iter()
.map(|res| res.response.hash.to_string())
.join(", ");

info!(
debug!(
id = %chain_id,
"wait_for_block_commits: waiting for commit of tx hashes(s) {}",
hashes
Expand Down
1 change: 1 addition & 0 deletions relayer/src/chain/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -484,6 +484,7 @@ pub mod test_utils {
address_type: AddressType::default(),
memo_prefix: Default::default(),
proof_specs: Default::default(),
sequential_batch_tx: false,
}
}
}
8 changes: 8 additions & 0 deletions relayer/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,14 @@ pub struct ChainConfig {
#[serde(default, with = "self::proof_specs")]
pub proof_specs: ProofSpecs,

// This is an undocumented and hidden config that makes the relayer wait for
// the DeliverTx result before sending the next transaction when sending
// messages in multiple batches. Relayer operators will be instructed to
// turn this on in case relaying fails on a chain with the priority mempool
// enabled.
// Warning: turning this on may degrade performance.
#[serde(default)]
pub sequential_batch_tx: bool,

// these two need to be last otherwise we run into `ValueAfterTable` error when serializing to TOML
/// The trust threshold defines what fraction of the total voting power of a known
/// and trusted validator set is sufficient for a commit to be accepted going forward.
Expand Down
1 change: 1 addition & 0 deletions tools/integration-test/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ tendermint-rpc = { version = "=0.23.9", features = ["http-client", "websocket-c

serde_json = "1"
time = "0.3"
toml = "0.5"
serde = "1.0.141"

[features]
Expand Down
1 change: 1 addition & 0 deletions tools/integration-test/src/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ pub mod memo;
pub mod python;
mod query_packet;
pub mod supervisor;
pub mod tendermint;
pub mod ternary_transfer;
pub mod transfer;

Expand Down
1 change: 1 addition & 0 deletions tools/integration-test/src/tests/tendermint/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
pub mod sequential;
Loading