Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add pagination and timeout to commitments and ack queries #4110

Merged
merged 8 commits into from
Aug 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
- Paginate results of `query_packet_commitments` and `query_packet_acknowledgements`
queries to speed up the scanning phase.
([\#4101](https://github.com/informalsystems/hermes/issues/4101))
3 changes: 2 additions & 1 deletion crates/relayer-cli/src/commands/query/packet/commitments.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use abscissa_core::clap::Parser;

use ibc_relayer::chain::counterparty::commitments_on_chain;
use ibc_relayer::chain::requests::Paginate;
use ibc_relayer_types::core::ics24_host::identifier::{ChainId, ChannelId, PortId};

use crate::cli_utils::spawn_chain_runtime;
Expand Down Expand Up @@ -47,7 +48,7 @@ impl QueryPacketCommitmentsCmd {

let chain = spawn_chain_runtime(&config, &self.chain_id)?;

commitments_on_chain(&chain, &self.port_id, &self.channel_id)
commitments_on_chain(&chain, &self.port_id, &self.channel_id, Paginate::All)
.map_err(Error::supervisor)
.map(|(seqs_vec, height)| PacketSeqs {
height,
Expand Down
19 changes: 15 additions & 4 deletions crates/relayer-cli/src/commands/query/packet/pending.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use ibc_relayer::chain::counterparty::{
channel_on_destination, pending_packet_summary, PendingPackets,
};
use ibc_relayer::chain::handle::{BaseChainHandle, ChainHandle};
use ibc_relayer::chain::requests::Paginate;
use ibc_relayer_types::core::ics24_host::identifier::{ChainId, ChannelId, PortId};

use crate::cli_utils::spawn_chain_counterparty;
Expand Down Expand Up @@ -130,8 +131,13 @@ impl QueryPendingPacketsCmd {
self.chain_id, chan_conn_cli.channel
);

let src_summary = pending_packet_summary(&chains.src, &chains.dst, &chan_conn_cli.channel)
.map_err(Error::supervisor)?;
let src_summary = pending_packet_summary(
&chains.src,
&chains.dst,
&chan_conn_cli.channel,
Paginate::All,
)
.map_err(Error::supervisor)?;

let counterparty_channel = channel_on_destination(
&chan_conn_cli.channel,
Expand All @@ -141,8 +147,13 @@ impl QueryPendingPacketsCmd {
.map_err(Error::supervisor)?
.ok_or_else(|| Error::missing_counterparty_channel_id(chan_conn_cli.channel))?;

let dst_summary = pending_packet_summary(&chains.dst, &chains.src, &counterparty_channel)
.map_err(Error::supervisor)?;
let dst_summary = pending_packet_summary(
&chains.dst,
&chains.src,
&counterparty_channel,
Paginate::All,
)
.map_err(Error::supervisor)?;

Ok(Summary {
src_chain: chains.src.id(),
Expand Down
6 changes: 4 additions & 2 deletions crates/relayer-cli/src/commands/query/packet/pending_acks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use abscissa_core::clap::Parser;

use ibc_relayer::chain::counterparty::unreceived_acknowledgements;
use ibc_relayer::chain::handle::BaseChainHandle;
use ibc_relayer::chain::requests::Paginate;
use ibc_relayer::path::PathIdentifiers;
use ibc_relayer::util::collate::CollatedIterExt;
use ibc_relayer_types::core::ics04_channel::packet::Sequence;
Expand Down Expand Up @@ -68,8 +69,9 @@ impl QueryPendingAcksCmd {
let path_identifiers = PathIdentifiers::from_channel_end(channel.clone())
.ok_or_else(|| Error::missing_counterparty_channel_id(channel))?;

let acks = unreceived_acknowledgements(&chains.src, &chains.dst, &path_identifiers)
.map_err(Error::supervisor)?;
let acks =
unreceived_acknowledgements(&chains.src, &chains.dst, &path_identifiers, Paginate::All)
.map_err(Error::supervisor)?;

Ok(acks.map_or(vec![], |(sns, _)| sns))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use abscissa_core::clap::Parser;

use ibc_relayer::chain::counterparty::unreceived_packets;
use ibc_relayer::chain::handle::BaseChainHandle;
use ibc_relayer::chain::requests::Paginate;
use ibc_relayer::path::PathIdentifiers;
use ibc_relayer::util::collate::CollatedIterExt;
use ibc_relayer_types::core::ics04_channel::packet::Sequence;
Expand Down Expand Up @@ -68,7 +69,7 @@ impl QueryPendingSendsCmd {
let path_identifiers = PathIdentifiers::from_channel_end(channel.clone())
.ok_or_else(|| Error::missing_counterparty_channel_id(channel))?;

unreceived_packets(&chains.src, &chains.dst, &path_identifiers)
unreceived_packets(&chains.src, &chains.dst, &path_identifiers, Paginate::All)
.map_err(Error::supervisor)
.map(|(seq, _)| seq)
}
Expand Down
230 changes: 195 additions & 35 deletions crates/relayer/src/chain/cosmos.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1933,31 +1933,116 @@ impl ChainEndpoint for CosmosSdkChain {
self.grpc_addr.clone(),
),
)
.map(|client| {
client.max_decoding_message_size(
self.config().max_grpc_decoding_size.get_bytes() as usize
)
})
.map_err(Error::grpc_transport)?;

client = client
.max_decoding_message_size(self.config().max_grpc_decoding_size.get_bytes() as usize);
if request.pagination.is_enabled() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we are dealing with pagination in two queries, and are duplicating a bunch of fairly complex code, how about extracting the pagination logic in a standalone function we can use in both places?

async fn paginate<C, Req, Res, F>(
    mut client: C,
    request: Req,
    mut do_query: impl FnMut(&mut C, Req, Vec<u8>) -> F,
    get_next_key: impl Fn(&Res) -> Option<Vec<u8>>,
) -> Vec<Result<Res, Error>>
where
    F: Future<Output = Result<Res, Error>> + 'static,
    Req: Clone,
{
    let mut results = vec![];
    let mut page_key = vec![];

    loop {
        let response =
            do_query(&mut client, request.clone(), std::mem::take(&mut page_key)).await;

        match response {
            Ok(response) => {
                let next_key = get_next_key(&response);

                results.push(Ok(response));

                match next_key {
                    Some(next_key) if !next_key.is_empty() => {
                        page_key = next_key;
                    }
                    _ => break,
                }
            }
            Err(e) => {
                results.push(Err(e));
                break;
            }
        }
    }

    results
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can leave this for a follow-up PR, or when we add pagination to yet another query.

let mut results = Vec::new();
let mut page_key = Vec::new();

let request = tonic::Request::new(request.into());
let pagination_information = request.pagination.get_values();
let mut current_results = 0;

let response = self
.block_on(client.packet_commitments(request))
.map_err(|e| Error::grpc_status(e, "query_packet_commitments".to_owned()))?
.into_inner();
loop {
crate::time!(
"query_packet_commitments_loop_iteration",
{
"src_chain": self.config().id.to_string(),
}
);
let mut raw_request =
ibc_proto::ibc::core::channel::v1::QueryPacketCommitmentsRequest::from(
request.clone(),
);

let mut commitment_sequences: Vec<Sequence> = response
.commitments
.into_iter()
.map(|v| v.sequence.into())
.collect();
commitment_sequences.sort_unstable();
if let Some(pagination) = raw_request.pagination.as_mut() {
pagination.key = page_key;
}

let height = response
.height
.and_then(|raw_height| raw_height.try_into().ok())
.ok_or_else(|| Error::grpc_response_param("height".to_string()))?;
let mut tonic_request = tonic::Request::new(raw_request);
// TODO: This should either be configurable or inferred from the pagination
tonic_request.set_timeout(Duration::from_secs(10));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just a note for us that we should eventually make this either:
a) Configurable (eg. grpc_timeout setting)
b) Inferred from the pagination, ie. use larger timeout for Paginate::All and smaller one otherwise


let response = self.rt.block_on(async {
client
.packet_commitments(tonic_request)
.await
.map_err(|e| Error::grpc_status(e, "query_packet_commitments".to_owned()))
});

match response {
Ok(response) => {
let inner_response = response.into_inner().clone();
let next_key = inner_response
.pagination
.as_ref()
.map(|p| p.next_key.clone());

results.push(Ok(inner_response));
current_results += pagination_information.0;

match next_key {
Some(next_key) if !next_key.is_empty() => {
page_key = next_key;
}
_ => break,
}
}
Err(e) => {
results.push(Err(e));
break;
}
}
if current_results >= pagination_information.1 {
break;
}
}

let responses = results.into_iter().collect::<Result<Vec<_>, _>>()?;

Ok((commitment_sequences, height))
let mut commitment_sequences = Vec::new();

for response in &responses {
commitment_sequences.extend(
response
.commitments
.iter()
.map(|commit| Sequence::from(commit.sequence)),
);
}

let height = responses
.first()
.and_then(|res| res.height)
.and_then(|raw_height| raw_height.try_into().ok())
.ok_or_else(|| Error::grpc_response_param("height".to_string()))?;

Ok((commitment_sequences, height))
} else {
let request = tonic::Request::new(request.into());
let response = self
.block_on(client.packet_commitments(request))
.map_err(|e| Error::grpc_status(e, "query_packet_commitments".to_owned()))?
.into_inner();

let mut commitment_sequences: Vec<Sequence> = response
.commitments
.into_iter()
.map(|v| v.sequence.into())
.collect();
commitment_sequences.sort_unstable();

let height = response
.height
.and_then(|raw_height| raw_height.try_into().ok())
.ok_or_else(|| Error::grpc_response_param("height".to_string()))?;

Ok((commitment_sequences, height))
}
}

fn query_packet_receipt(
Expand Down Expand Up @@ -2074,30 +2159,105 @@ impl ChainEndpoint for CosmosSdkChain {
self.grpc_addr.clone(),
),
)
.map(|client| {
client.max_decoding_message_size(
self.config().max_grpc_decoding_size.get_bytes() as usize
)
})
.map_err(Error::grpc_transport)?;

client = client
.max_decoding_message_size(self.config().max_grpc_decoding_size.get_bytes() as usize);
if request.pagination.is_enabled() {
let mut results = Vec::new();
let mut page_key = Vec::new();

let request = tonic::Request::new(request.into());
loop {
let mut raw_request =
ibc_proto::ibc::core::channel::v1::QueryPacketAcknowledgementsRequest::from(
request.clone(),
);

let response = self
.block_on(client.packet_acknowledgements(request))
.map_err(|e| Error::grpc_status(e, "query_packet_acknowledgements".to_owned()))?
.into_inner();
if let Some(pagination) = raw_request.pagination.as_mut() {
pagination.key = page_key;
}

let acks_sequences = response
.acknowledgements
.into_iter()
.map(|v| v.sequence.into())
.collect();
let mut tonic_request = tonic::Request::new(raw_request);
// TODO: This should either be configurable or inferred from the pagination
tonic_request.set_timeout(Duration::from_secs(10));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here


let response = self.rt.block_on(async {
client
.packet_acknowledgements(tonic_request)
.await
.map_err(|e| {
Error::grpc_status(e, "query_packet_acknowledgements".to_owned())
})
});

match response {
Ok(response) => {
let inner_response = response.into_inner().clone();
let next_key = inner_response
.pagination
.as_ref()
.map(|p| p.next_key.clone());

results.push(Ok(inner_response));

match next_key {
Some(next_key) if !next_key.is_empty() => {
page_key = next_key;
}
_ => break,
}
}
Err(e) => {
results.push(Err(e));
break;
}
}
}

let responses = results.into_iter().collect::<Result<Vec<_>, _>>()?;

let height = response
.height
.and_then(|raw_height| raw_height.try_into().ok())
.ok_or_else(|| Error::grpc_response_param("height".to_string()))?;
let mut acks_sequences = Vec::new();

Ok((acks_sequences, height))
for response in &responses {
acks_sequences.extend(
response
.acknowledgements
.iter()
.map(|commit| Sequence::from(commit.sequence)),
);
}

let height = responses
.first()
.and_then(|res| res.height)
.and_then(|raw_height| raw_height.try_into().ok())
.ok_or_else(|| Error::grpc_response_param("height".to_string()))?;

Ok((acks_sequences, height))
} else {
let request = tonic::Request::new(request.into());
let response = self
.block_on(client.packet_acknowledgements(request))
.map_err(|e| Error::grpc_status(e, "query_packet_commitments".to_owned()))?
.into_inner();

let mut acks_sequences: Vec<Sequence> = response
.acknowledgements
.into_iter()
.map(|v| v.sequence.into())
.collect();
acks_sequences.sort_unstable();

let height = response
.height
.and_then(|raw_height| raw_height.try_into().ok())
.ok_or_else(|| Error::grpc_response_param("height".to_string()))?;

Ok((acks_sequences, height))
}
}

/// Performs a `QueryUnreceivedAcksRequest` gRPC query to fetch the unreceived acknowledgements
Expand Down
Loading
Loading