diff --git a/Cargo.lock b/Cargo.lock index 95268a3392..c1bb2cf52e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1416,6 +1416,7 @@ dependencies = [ "futures", "hdpath", "hex", + "http", "humantime-serde", "ibc", "ibc-proto", diff --git a/relayer/Cargo.toml b/relayer/Cargo.toml index 8ad3e9d320..c7e4c1c8a4 100644 --- a/relayer/Cargo.toml +++ b/relayer/Cargo.toml @@ -21,6 +21,7 @@ ibc = { version = "0.4.0", path = "../modules" } ibc-proto = { version = "0.8.0", path = "../proto" } ibc-telemetry = { version = "0.4.0", path = "../telemetry", optional = true } +http = "0.2.4" subtle-encoding = "0.5" anomaly = "0.2.0" async-trait = "0.1.50" diff --git a/relayer/src/chain/cosmos.rs b/relayer/src/chain/cosmos.rs index a58145e830..2e5e5e35a7 100644 --- a/relayer/src/chain/cosmos.rs +++ b/relayer/src/chain/cosmos.rs @@ -64,7 +64,11 @@ use ibc_proto::ibc::core::connection::v1::{ use crate::chain::QueryResponse; use crate::config::ChainConfig; -use crate::error::{Error, Kind}; +use crate::error::{ + abci_query_error, check_tx_error, deliver_tx_error, grpc_response_param_error, + grpc_status_error, grpc_transport_error, invalid_raw_client_state_error, invalid_uri_error, + keyring_error, protobuf_decode_error, tendermint_rpc_error, Error, Kind, +}; use crate::event::monitor::{EventMonitor, EventReceiver}; use crate::keyring::{KeyEntry, KeyRing, Store}; use crate::light_client::tendermint::LightClient as TmLightClient; @@ -98,21 +102,21 @@ impl CosmosSdkChain { self.grpc_addr.clone(), ), ) - .map_err(|e| Kind::Grpc.context(e))?; + .map_err(grpc_transport_error)?; let request = tonic::Request::new(ibc_proto::cosmos::staking::v1beta1::QueryParamsRequest {}); let response = self .block_on(client.params(request)) - .map_err(|e| Kind::Grpc.context(e))?; + .map_err(grpc_status_error)?; let res = response .into_inner() .params - .ok_or_else(|| Kind::Grpc.context("none staking params".to_string()))? + .ok_or_else(|| grpc_response_param_error("none staking params"))? .unbonding_time - .ok_or_else(|| Kind::Grpc.context("none unbonding time".to_string()))?; + .ok_or_else(|| grpc_response_param_error("none unbonding time"))?; Ok(Duration::new(res.seconds as u64, res.nanos as u32)) } @@ -132,7 +136,7 @@ impl CosmosSdkChain { Ok(self .block_on(self.rpc_client().genesis()) - .map_err(|e| Kind::Rpc(self.config.rpc_addr.clone()).context(e))? + .map_err(|e| tendermint_rpc_error(&self.config.rpc_addr, e))? .consensus_params) } @@ -148,7 +152,7 @@ impl CosmosSdkChain { let key = self .keybase() .get_key(&self.config.key_name) - .map_err(|e| Kind::KeyBase.context(e))?; + .map_err(keyring_error)?; // Create TxBody let body = TxBody { @@ -174,9 +178,7 @@ impl CosmosSdkChain { value: pk_buf, }; - let acct_response = self - .block_on(query_account(self, key.account)) - .map_err(|e| Kind::Grpc.context(e))?; + let acct_response = self.block_on(query_account(self, key.account))?; let single = Single { mode: 1 }; let sum_single = Some(Sum::Single(single)); @@ -218,7 +220,7 @@ impl CosmosSdkChain { let signed = self .keybase .sign_msg(&self.config.key_name, signdoc_buf) - .map_err(|e| Kind::KeyBase.context(e))?; + .map_err(keyring_error)?; let tx_raw = TxRaw { body_bytes: body_buf, @@ -231,9 +233,7 @@ impl CosmosSdkChain { crate::time!("TxRAW {:?}", hex::encode(txraw_buf.clone())); - let response = self - .block_on(broadcast_tx_commit(self, txraw_buf)) - .map_err(|e| Kind::Rpc(self.config.rpc_addr.clone()).context(e))?; + let response = self.block_on(broadcast_tx_commit(self, txraw_buf))?; let res = tx_result_to_event(&self.config.id, response)?; @@ -324,14 +324,16 @@ impl Chain for CosmosSdkChain { fn bootstrap(config: ChainConfig, rt: Arc) -> Result { let rpc_client = HttpClient::new(config.rpc_addr.clone()) - .map_err(|e| Kind::Rpc(config.rpc_addr.clone()).context(e))?; + .map_err(|e| tendermint_rpc_error(&config.rpc_addr, e))?; // Initialize key store and load key - let keybase = KeyRing::new(Store::Test, &config.account_prefix, &config.id) - .map_err(|e| Kind::KeyBase.context(e))?; + let keybase = + KeyRing::new(Store::Test, &config.account_prefix, &config.id).map_err(keyring_error)?; + + let grpc_raw_addr = config.grpc_addr.to_string(); let grpc_addr = - Uri::from_str(&config.grpc_addr.to_string()).map_err(|e| Kind::Grpc.context(e))?; + Uri::from_str(&grpc_raw_addr).map_err(|e| invalid_uri_error(grpc_raw_addr, e))?; Ok(Self { config, @@ -351,7 +353,7 @@ impl Chain for CosmosSdkChain { .rt .block_on(self.rpc_client.status()) .map(|s| s.node_info.id) - .map_err(|e| Kind::Rpc(self.config.rpc_addr.clone()).context(e))?; + .map_err(|e| tendermint_rpc_error(&self.config.rpc_addr, e))?; let light_client = TmLightClient::from_config(&self.config, peer_id)?; @@ -432,7 +434,7 @@ impl Chain for CosmosSdkChain { let key = self .keybase() .get_key(&self.config.key_name) - .map_err(|e| Kind::KeyBase.context(e))?; + .map_err(keyring_error)?; let bech32 = encode_to_bech32(&key.address.to_hex(), &self.config.account_prefix)?; Ok(Signer::new(bech32)) @@ -446,7 +448,7 @@ impl Chain for CosmosSdkChain { let key = self .keybase() .get_key(&self.config.key_name) - .map_err(|e| Kind::KeyBase.context(e))?; + .map_err(keyring_error)?; Ok(key) } @@ -466,7 +468,7 @@ impl Chain for CosmosSdkChain { let status = self .block_on(self.rpc_client().status()) - .map_err(|e| Kind::Rpc(self.config.rpc_addr.clone()).context(e))?; + .map_err(|e| tendermint_rpc_error(&self.config.rpc_addr, e))?; if status.sync_info.catching_up { fail!( @@ -495,12 +497,12 @@ impl Chain for CosmosSdkChain { self.grpc_addr.clone(), ), ) - .map_err(|e| Kind::Grpc.context(e))?; + .map_err(grpc_transport_error)?; let request = tonic::Request::new(request); let response = self .block_on(client.client_states(request)) - .map_err(|e| Kind::Grpc.context(e))? + .map_err(grpc_status_error)? .into_inner(); // Deserialize into domain type @@ -553,12 +555,12 @@ impl Chain for CosmosSdkChain { self.grpc_addr.clone(), ), ) - .map_err(|e| Kind::Grpc.context(e))?; + .map_err(grpc_transport_error)?; let req = tonic::Request::new(QueryCurrentPlanRequest {}); let response = self .block_on(client.current_plan(req)) - .map_err(|e| Kind::Grpc.context(e))?; + .map_err(grpc_status_error)?; let upgraded_client_state_raw = response .into_inner() @@ -567,7 +569,7 @@ impl Chain for CosmosSdkChain { .upgraded_client_state .ok_or(Kind::EmptyUpgradedClientState)?; let client_state = AnyClientState::try_from(upgraded_client_state_raw) - .map_err(|e| Kind::Grpc.context(e))?; + .map_err(invalid_raw_client_state_error)?; // TODO: Better error kinds here. let tm_client_state = @@ -601,14 +603,14 @@ impl Chain for CosmosSdkChain { self.grpc_addr.clone(), ), ) - .map_err(|e| Kind::Grpc.context(e))?; + .map_err(grpc_transport_error)?; let req = tonic::Request::new(QueryUpgradedConsensusStateRequest { last_height: tm_height.into(), }); let response = self .block_on(client.upgraded_consensus_state(req)) - .map_err(|e| Kind::Grpc.context(e))?; + .map_err(grpc_status_error)?; let upgraded_consensus_state_raw = response .into_inner() @@ -617,7 +619,7 @@ impl Chain for CosmosSdkChain { // TODO: More explicit error kinds (should not reuse Grpc all over the place) let consensus_state = AnyConsensusState::try_from(upgraded_consensus_state_raw) - .map_err(|e| Kind::Grpc.context(e))?; + .map_err(invalid_raw_client_state_error)?; let tm_consensus_state = downcast!(consensus_state => AnyConsensusState::Tendermint) .ok_or_else(|| { @@ -647,12 +649,12 @@ impl Chain for CosmosSdkChain { self.grpc_addr.clone(), ), ) - .map_err(|e| Kind::Grpc.context(e))?; + .map_err(grpc_transport_error)?; let request = tonic::Request::new(request); let response = self .block_on(client.consensus_states(request)) - .map_err(|e| Kind::Grpc.context(e))? + .map_err(grpc_status_error)? .into_inner(); let mut consensus_states: Vec = response @@ -691,13 +693,13 @@ impl Chain for CosmosSdkChain { self.grpc_addr.clone(), ), ) - .map_err(|e| Kind::Grpc.context(e))?; + .map_err(grpc_transport_error)?; let request = tonic::Request::new(request); let response = self .block_on(client.client_connections(request)) - .map_err(|e| Kind::Grpc.context(e))? + .map_err(grpc_status_error)? .into_inner(); // TODO: add warnings for any identifiers that fail to parse (below). @@ -724,13 +726,13 @@ impl Chain for CosmosSdkChain { self.grpc_addr.clone(), ), ) - .map_err(|e| Kind::Grpc.context(e))?; + .map_err(grpc_transport_error)?; let request = tonic::Request::new(request); let response = self .block_on(client.connections(request)) - .map_err(|e| Kind::Grpc.context(e))? + .map_err(grpc_status_error)? .into_inner(); // TODO: add warnings for any identifiers that fail to parse (below). @@ -769,13 +771,13 @@ impl Chain for CosmosSdkChain { self.grpc_addr.clone(), ), ) - .map_err(|e| Kind::Grpc.context(e))?; + .map_err(grpc_transport_error)?; let request = tonic::Request::new(request); let response = self .block_on(client.connection_channels(request)) - .map_err(|e| Kind::Grpc.context(e))? + .map_err(grpc_status_error)? .into_inner(); // TODO: add warnings for any identifiers that fail to parse (below). @@ -801,13 +803,13 @@ impl Chain for CosmosSdkChain { self.grpc_addr.clone(), ), ) - .map_err(|e| Kind::Grpc.context(e))?; + .map_err(grpc_transport_error)?; let request = tonic::Request::new(request); let response = self .block_on(client.channels(request)) - .map_err(|e| Kind::Grpc.context(e))? + .map_err(grpc_status_error)? .into_inner(); let channels = response @@ -877,22 +879,22 @@ impl Chain for CosmosSdkChain { self.grpc_addr.clone(), ), ) - .map_err(|e| Kind::Grpc.context(e))?; + .map_err(grpc_transport_error)?; let request = tonic::Request::new(request); let response = self .block_on(client.packet_commitments(request)) - .map_err(|e| Kind::Grpc.context(e))? + .map_err(grpc_status_error)? .into_inner(); let pc = response.commitments; let height = response .height - .ok_or_else(|| Kind::Grpc.context("missing height in response"))? + .ok_or_else(|| grpc_response_param_error("missing height in response"))? .try_into() - .map_err(|_| Kind::Grpc.context("invalid height in response"))?; + .map_err(|_| grpc_response_param_error("invalid height in response"))?; Ok((pc, height)) } @@ -910,13 +912,13 @@ impl Chain for CosmosSdkChain { self.grpc_addr.clone(), ), ) - .map_err(|e| Kind::Grpc.context(e))?; + .map_err(grpc_transport_error)?; let request = tonic::Request::new(request); let mut response = self .block_on(client.unreceived_packets(request)) - .map_err(|e| Kind::Grpc.context(e))? + .map_err(grpc_status_error)? .into_inner(); response.sequences.sort_unstable(); @@ -936,22 +938,22 @@ impl Chain for CosmosSdkChain { self.grpc_addr.clone(), ), ) - .map_err(|e| Kind::Grpc.context(e))?; + .map_err(grpc_transport_error)?; let request = tonic::Request::new(request); let response = self .block_on(client.packet_acknowledgements(request)) - .map_err(|e| Kind::Grpc.context(e))? + .map_err(grpc_status_error)? .into_inner(); let pc = response.acknowledgements; let height = response .height - .ok_or_else(|| Kind::Grpc.context("missing height in response"))? + .ok_or_else(|| grpc_response_param_error("missing height in response"))? .try_into() - .map_err(|_| Kind::Grpc.context("invalid height in response"))?; + .map_err(|_| grpc_response_param_error("invalid height in response"))?; Ok((pc, height)) } @@ -969,13 +971,13 @@ impl Chain for CosmosSdkChain { self.grpc_addr.clone(), ), ) - .map_err(|e| Kind::Grpc.context(e))?; + .map_err(grpc_transport_error)?; let request = tonic::Request::new(request); let mut response = self .block_on(client.unreceived_acks(request)) - .map_err(|e| Kind::Grpc.context(e))? + .map_err(grpc_status_error)? .into_inner(); response.sequences.sort_unstable(); @@ -994,13 +996,13 @@ impl Chain for CosmosSdkChain { self.grpc_addr.clone(), ), ) - .map_err(|e| Kind::Grpc.context(e))?; + .map_err(grpc_transport_error)?; let request = tonic::Request::new(request); let response = self .block_on(client.next_sequence_receive(request)) - .map_err(|e| Kind::Grpc.context(e))? + .map_err(grpc_status_error)? .into_inner(); Ok(Sequence::from(response.next_sequence_receive)) @@ -1036,7 +1038,7 @@ impl Chain for CosmosSdkChain { 1, // get only the first Tx matching the query Order::Ascending, )) - .map_err(|e| Kind::Grpc.context(e))?; + .map_err(|e| tendermint_rpc_error(&self.config.rpc_addr, e))?; assert!( response.txs.len() <= 1, @@ -1076,7 +1078,7 @@ impl Chain for CosmosSdkChain { 1, // get only the first Tx matching the query Order::Ascending, )) - .map_err(|e| Kind::Grpc.context(e))?; + .map_err(|e| tendermint_rpc_error(&self.config.rpc_addr, e))?; if response.txs.is_empty() { return Ok(vec![]); @@ -1426,13 +1428,10 @@ async fn abci_query( .rpc_client() .abci_query(Some(path), data.into_bytes(), height, prove) .await - .map_err(|e| Kind::Rpc(chain.config.rpc_addr.clone()).context(e))?; + .map_err(|e| tendermint_rpc_error(&chain.config.rpc_addr, e))?; - if !response.code.is_ok() { - // Fail with response log. - return Err(Kind::Rpc(chain.config.rpc_addr.clone()) - .context(response.log.to_string()) - .into()); + if response.code.is_err() { + return Err(abci_query_error(response)); } if prove && response.proof.is_none() { @@ -1464,7 +1463,7 @@ async fn broadcast_tx_commit( .rpc_client() .broadcast_tx_commit(data.into()) .await - .map_err(|e| Kind::Rpc(chain.config.rpc_addr.clone()).context(e))?; + .map_err(|e| tendermint_rpc_error(&chain.config.rpc_addr, e))?; Ok(response) } @@ -1475,7 +1474,7 @@ async fn query_account(chain: &CosmosSdkChain, address: String) -> Result Result)?; Ok(base_account) } @@ -1503,16 +1502,11 @@ pub fn tx_result_to_event( // Verify the return codes from check_tx and deliver_tx if response.check_tx.code.is_err() { - return Ok(vec![IbcEvent::ChainError(format!( - "check_tx reports error: log={:?}", - response.check_tx.log - ))]); + return Err(check_tx_error(response.check_tx)); } + if response.deliver_tx.code.is_err() { - return Ok(vec![IbcEvent::ChainError(format!( - "deliver_tx reports error: log={:?}", - response.deliver_tx.log - ))]); + return Err(deliver_tx_error(response.deliver_tx)); } let height = ICSHeight::new(chain_id.version(), u64::from(response.height)); diff --git a/relayer/src/chain/handle/prod.rs b/relayer/src/chain/handle/prod.rs index 6f7a4bb551..e65d4efd01 100644 --- a/relayer/src/chain/handle/prod.rs +++ b/relayer/src/chain/handle/prod.rs @@ -35,7 +35,7 @@ use ibc_proto::ibc::core::connection::v1::QueryClientConnectionsRequest; use crate::{ connection::ConnectionMsgType, - error::{Error, Kind}, + error::{recv_error, send_error, Error}, keyring::KeyEntry, }; @@ -66,11 +66,9 @@ impl ProdChainHandle { let (sender, receiver) = reply_channel(); let input = f(sender); - self.runtime_sender - .send(input) - .map_err(|e| Kind::Channel.context(e))?; + self.runtime_sender.send(input).map_err(send_error)?; - receiver.recv().map_err(|e| Kind::Channel.context(e))? + receiver.recv().map_err(recv_error)? } } diff --git a/relayer/src/chain/mock.rs b/relayer/src/chain/mock.rs index 3a8fcf661a..22c1437837 100644 --- a/relayer/src/chain/mock.rs +++ b/relayer/src/chain/mock.rs @@ -41,7 +41,7 @@ use ibc_proto::ibc::core::connection::v1::{ use crate::chain::Chain; use crate::config::ChainConfig; -use crate::error::{Error, Kind}; +use crate::error::{ics18_relayer_error, Error, Kind}; use crate::event::monitor::{EventReceiver, EventSender}; use crate::keyring::{KeyEntry, KeyRing}; use crate::light_client::{mock::LightClient as MockLightClient, LightClient}; @@ -105,10 +105,7 @@ impl Chain for MockChain { fn send_msgs(&mut self, proto_msgs: Vec) -> Result, Error> { // Use the ICS18Context interface to submit the set of messages. - let events = self - .context - .send(proto_msgs) - .map_err(|e| Kind::Rpc(self.config.rpc_addr.clone()).context(e))?; + let events = self.context.send(proto_msgs).map_err(ics18_relayer_error)?; Ok(events) } diff --git a/relayer/src/chain/runtime.rs b/relayer/src/chain/runtime.rs index 33a5bcc05f..3a308c2998 100644 --- a/relayer/src/chain/runtime.rs +++ b/relayer/src/chain/runtime.rs @@ -40,7 +40,7 @@ use ibc_proto::ibc::core::{ use crate::{ config::ChainConfig, connection::ConnectionMsgType, - error::{Error, Kind}, + error::{recv_error, send_error, Error}, event::{ bus::EventBus, monitor::{EventBatch, EventReceiver, Result as MonitorResult}, @@ -168,18 +168,18 @@ impl ChainRuntime { Ok(event_batch) => { self.event_bus .broadcast(Arc::new(event_batch)) - .map_err(Kind::channel)?; + .map_err(send_error)?; }, Err(e) => { error!("received error via event bus: {}", e); - return Err(Kind::Channel.into()); + return Err(recv_error(e)); }, } }, recv(self.request_receiver) -> event => { match event { Ok(ChainRequest::Terminate { reply_to }) => { - reply_to.send(Ok(())).map_err(Kind::channel)?; + reply_to.send(Ok(())).map_err(send_error)?; break; } @@ -339,7 +339,7 @@ impl ChainRuntime { fn subscribe(&mut self, reply_to: ReplyTo) -> Result<(), Error> { let subscription = self.event_bus.subscribe(); - reply_to.send(Ok(subscription)).map_err(Kind::channel)?; + reply_to.send(Ok(subscription)).map_err(send_error)?; Ok(()) } @@ -351,7 +351,7 @@ impl ChainRuntime { ) -> Result<(), Error> { let result = self.chain.send_msgs(proto_msgs); - reply_to.send(result).map_err(Kind::channel)?; + reply_to.send(result).map_err(send_error)?; Ok(()) } @@ -359,7 +359,7 @@ impl ChainRuntime { fn query_latest_height(&self, reply_to: ReplyTo) -> Result<(), Error> { let latest_height = self.chain.query_latest_height(); - reply_to.send(latest_height).map_err(Kind::channel)?; + reply_to.send(latest_height).map_err(send_error)?; Ok(()) } @@ -367,7 +367,7 @@ impl ChainRuntime { fn get_signer(&mut self, reply_to: ReplyTo) -> Result<(), Error> { let result = self.chain.get_signer(); - reply_to.send(result).map_err(Kind::channel)?; + reply_to.send(result).map_err(send_error)?; Ok(()) } @@ -375,7 +375,7 @@ impl ChainRuntime { fn get_key(&mut self, reply_to: ReplyTo) -> Result<(), Error> { let result = self.chain.get_key(); - reply_to.send(result).map_err(Kind::channel)?; + reply_to.send(result).map_err(send_error)?; Ok(()) } @@ -383,7 +383,7 @@ impl ChainRuntime { fn module_version(&self, port_id: PortId, reply_to: ReplyTo) -> Result<(), Error> { let result = self.chain.query_module_version(&port_id); - reply_to.send(Ok(result)).map_err(Kind::channel)?; + reply_to.send(Ok(result)).map_err(send_error)?; Ok(()) } @@ -418,7 +418,7 @@ impl ChainRuntime { .map_or_else(Err, |header| Ok(header.wrap_any())), }; - reply_to.send(header).map_err(Kind::channel)?; + reply_to.send(header).map_err(send_error)?; Ok(()) } @@ -434,7 +434,7 @@ impl ChainRuntime { .build_client_state(height) .map(|cs| cs.wrap_any()); - reply_to.send(client_state).map_err(Kind::channel)?; + reply_to.send(client_state).map_err(send_error)?; Ok(()) } @@ -454,7 +454,7 @@ impl ChainRuntime { .build_consensus_state(light_block) .map(|cs| cs.wrap_any()); - reply_to.send(consensus_state).map_err(Kind::channel)?; + reply_to.send(consensus_state).map_err(send_error)?; Ok(()) } @@ -470,7 +470,7 @@ impl ChainRuntime { .light_client .check_misbehaviour(update_event, &client_state); - reply_to.send(misbehaviour).map_err(Kind::channel)?; + reply_to.send(misbehaviour).map_err(send_error)?; Ok(()) } @@ -493,7 +493,7 @@ impl ChainRuntime { let result = result .map(|(opt_client_state, proofs)| (opt_client_state.map(|cs| cs.wrap_any()), proofs)); - reply_to.send(result).map_err(Kind::channel)?; + reply_to.send(result).map_err(send_error)?; Ok(()) } @@ -505,7 +505,7 @@ impl ChainRuntime { ) -> Result<(), Error> { let result = self.chain.query_clients(request); - reply_to.send(result).map_err(Kind::channel)?; + reply_to.send(result).map_err(send_error)?; Ok(()) } @@ -517,7 +517,7 @@ impl ChainRuntime { ) -> Result<(), Error> { let result = self.chain.query_client_connections(request); - reply_to.send(result).map_err(Kind::channel)?; + reply_to.send(result).map_err(send_error)?; Ok(()) } @@ -533,7 +533,7 @@ impl ChainRuntime { .query_client_state(&client_id, height) .map(|cs| cs.wrap_any()); - reply_to.send(client_state).map_err(Kind::channel)?; + reply_to.send(client_state).map_err(send_error)?; Ok(()) } @@ -548,7 +548,7 @@ impl ChainRuntime { .query_upgraded_client_state(height) .map(|(cl, proof)| (cl.wrap_any(), proof)); - reply_to.send(result).map_err(Kind::channel)?; + reply_to.send(result).map_err(send_error)?; Ok(()) } @@ -560,7 +560,7 @@ impl ChainRuntime { ) -> Result<(), Error> { let consensus_states = self.chain.query_consensus_states(request); - reply_to.send(consensus_states).map_err(Kind::channel)?; + reply_to.send(consensus_states).map_err(send_error)?; Ok(()) } @@ -576,7 +576,7 @@ impl ChainRuntime { self.chain .query_consensus_state(client_id, consensus_height, query_height); - reply_to.send(consensus_state).map_err(Kind::channel)?; + reply_to.send(consensus_state).map_err(send_error)?; Ok(()) } @@ -591,7 +591,7 @@ impl ChainRuntime { .query_upgraded_consensus_state(height) .map(|(cs, proof)| (cs.wrap_any(), proof)); - reply_to.send(result).map_err(Kind::channel)?; + reply_to.send(result).map_err(send_error)?; Ok(()) } @@ -599,7 +599,7 @@ impl ChainRuntime { fn query_commitment_prefix(&self, reply_to: ReplyTo) -> Result<(), Error> { let prefix = self.chain.query_commitment_prefix(); - reply_to.send(prefix).map_err(Kind::channel)?; + reply_to.send(prefix).map_err(send_error)?; Ok(()) } @@ -607,7 +607,7 @@ impl ChainRuntime { fn query_compatible_versions(&self, reply_to: ReplyTo>) -> Result<(), Error> { let versions = self.chain.query_compatible_versions(); - reply_to.send(versions).map_err(Kind::channel)?; + reply_to.send(versions).map_err(send_error)?; Ok(()) } @@ -620,7 +620,7 @@ impl ChainRuntime { ) -> Result<(), Error> { let connection_end = self.chain.query_connection(&connection_id, height); - reply_to.send(connection_end).map_err(Kind::channel)?; + reply_to.send(connection_end).map_err(send_error)?; Ok(()) } @@ -632,9 +632,7 @@ impl ChainRuntime { ) -> Result<(), Error> { let result = self.chain.query_connection_channels(request); - reply_to - .send(result) - .map_err(|e| Kind::Channel.context(e))?; + reply_to.send(result).map_err(send_error)?; Ok(()) } @@ -646,7 +644,7 @@ impl ChainRuntime { ) -> Result<(), Error> { let result = self.chain.query_channels(request); - reply_to.send(result).map_err(Kind::channel)?; + reply_to.send(result).map_err(send_error)?; Ok(()) } @@ -660,7 +658,7 @@ impl ChainRuntime { ) -> Result<(), Error> { let result = self.chain.query_channel(&port_id, &channel_id, height); - reply_to.send(result).map_err(Kind::channel)?; + reply_to.send(result).map_err(send_error)?; Ok(()) } @@ -672,7 +670,7 @@ impl ChainRuntime { ) -> Result<(), Error> { let result = self.chain.query_channel_client_state(request); - reply_to.send(result).map_err(Kind::channel)?; + reply_to.send(result).map_err(send_error)?; Ok(()) } @@ -688,7 +686,7 @@ impl ChainRuntime { .proven_client_state(&client_id, height) .map(|(cs, mp)| (cs.wrap_any(), mp)); - reply_to.send(result).map_err(Kind::channel)?; + reply_to.send(result).map_err(send_error)?; Ok(()) } @@ -701,7 +699,7 @@ impl ChainRuntime { ) -> Result<(), Error> { let result = self.chain.proven_connection(&connection_id, height); - reply_to.send(result).map_err(Kind::channel)?; + reply_to.send(result).map_err(send_error)?; Ok(()) } @@ -718,7 +716,7 @@ impl ChainRuntime { .proven_client_consensus(&client_id, consensus_height, height) .map(|(cs, mp)| (cs.wrap_any(), mp)); - reply_to.send(result).map_err(Kind::channel)?; + reply_to.send(result).map_err(send_error)?; Ok(()) } @@ -734,7 +732,7 @@ impl ChainRuntime { .chain .build_channel_proofs(&port_id, &channel_id, height); - reply_to.send(result).map_err(Kind::channel)?; + reply_to.send(result).map_err(send_error)?; Ok(()) } @@ -752,7 +750,7 @@ impl ChainRuntime { self.chain .build_packet_proofs(packet_type, port_id, channel_id, sequence, height); - reply_to.send(result).map_err(Kind::channel)?; + reply_to.send(result).map_err(send_error)?; Ok(()) } @@ -764,7 +762,7 @@ impl ChainRuntime { ) -> Result<(), Error> { let result = self.chain.query_packet_commitments(request); - reply_to.send(result).map_err(Kind::channel)?; + reply_to.send(result).map_err(send_error)?; Ok(()) } @@ -776,7 +774,7 @@ impl ChainRuntime { ) -> Result<(), Error> { let result = self.chain.query_unreceived_packets(request); - reply_to.send(result).map_err(Kind::channel)?; + reply_to.send(result).map_err(send_error)?; Ok(()) } @@ -788,7 +786,7 @@ impl ChainRuntime { ) -> Result<(), Error> { let result = self.chain.query_packet_acknowledgements(request); - reply_to.send(result).map_err(Kind::channel)?; + reply_to.send(result).map_err(send_error)?; Ok(()) } @@ -800,7 +798,7 @@ impl ChainRuntime { ) -> Result<(), Error> { let result = self.chain.query_unreceived_acknowledgements(request); - reply_to.send(result).map_err(Kind::channel)?; + reply_to.send(result).map_err(send_error)?; Ok(()) } @@ -812,7 +810,7 @@ impl ChainRuntime { ) -> Result<(), Error> { let result = self.chain.query_next_sequence_receive(request); - reply_to.send(result).map_err(Kind::channel)?; + reply_to.send(result).map_err(send_error)?; Ok(()) } @@ -824,7 +822,7 @@ impl ChainRuntime { ) -> Result<(), Error> { let result = self.chain.query_txs(request); - reply_to.send(result).map_err(Kind::channel)?; + reply_to.send(result).map_err(send_error)?; Ok(()) } diff --git a/relayer/src/error.rs b/relayer/src/error.rs index cef2d190f3..d06aa2decd 100644 --- a/relayer/src/error.rs +++ b/relayer/src/error.rs @@ -1,12 +1,22 @@ //! This module defines the various errors that be raised in the relayer. +use crate::sdk_error::{sdk_error_from_tx_result, SdkError}; +use crate::util::retry::RetryableError; use anomaly::{BoxError, Context}; -use thiserror::Error; - +use crossbeam_channel::{RecvError, SendError}; +use http::uri::InvalidUri; use ibc::{ - ics02_client::client_type::ClientType, + ics02_client::{client_type::ClientType, error::Error as ClientError}, ics24_host::identifier::{ChannelId, ConnectionId}, }; +use prost::DecodeError; +use std::any::type_name; +use std::fmt::Debug; +use tendermint_rpc::endpoint::abci_query::AbciQuery; +use tendermint_rpc::endpoint::broadcast::tx_commit::TxResult; +use tendermint_rpc::Error as TendermintRpcError; +use thiserror::Error; +use tonic::{transport::Error as TransportError, Status}; /// An error that can be raised by the relayer. pub type Error = anomaly::Error; @@ -27,8 +37,17 @@ pub enum Kind { Config, /// RPC error (typically raised by the RPC client or the RPC requester) - #[error("RPC error to endpoint {0}")] - Rpc(tendermint_rpc::Url), + #[error("call to Tendermint RPC endpoint {0} returned error")] + TendermintRpc(tendermint_rpc::Url), + + #[error("ABCI query returns error: {0:?}")] + AbciQuery(AbciQuery), + + #[error("CheckTX Commit returns error: {0}. RawResult: {1:?}")] + CheckTx(SdkError, TxResult), + + #[error("DeliverTX Commit returns error: {0}. RawResult: {1:?}")] + DeliverTx(SdkError, TxResult), /// Websocket error (typically raised by the Websocket client) #[error("Websocket error to endpoint {0}")] @@ -42,6 +61,15 @@ pub enum Kind { #[error("GRPC error")] Grpc, + #[error("GRPC call return error status {0}")] + GrpcStatus(Status), + + #[error("error in underlying transport when making GRPC call")] + GrpcTransport, + + #[error("missing parameter in GRPC response: {0}")] + GrpcResponseParam(String), + /// Light client instance error, typically raised by a `Client` #[error("Light client error for RPC address {0}")] LightClient(String), @@ -78,6 +106,9 @@ pub enum Kind { #[error("Failed to create client state")] BuildClientStateFailure, + #[error("Error converting from raw client state")] + InvalidRawClientState, + /// Create client failure #[error("Failed to create client {0}")] CreateClient(String), @@ -142,13 +173,16 @@ pub enum Kind { #[error("Message transaction failure: {0}")] MessageTransaction(String), + #[error("Error decoding protocol buffer for {0}")] + ProtobufDecode(String), + /// Failed query #[error("Query error occurred (failed to query for {0})")] Query(String), /// Keybase related error - #[error("Keybase error")] - KeyBase, + #[error("Key ring error: {0}")] + KeyRing(crate::keyring::errors::Kind), /// ICS 007 error #[error("ICS 007 error")] @@ -156,7 +190,13 @@ pub enum Kind { /// ICS 023 error #[error("ICS 023 error")] - Ics023(#[from] ibc::ics23_commitment::error::Error), + Ics023(ibc::ics23_commitment::error::Error), + + #[error("ICS 018 error: {0}")] + Ics18(ibc::ics18_relayer::error::Kind), + + #[error("error parsing URI {0} - {0}")] + InvalidUri(String, String), /// Invalid chain identifier #[error("invalid chain identifier format: {0}")] @@ -166,7 +206,10 @@ pub enum Kind { NonProvableData, #[error("failed to send or receive through channel")] - Channel, + ChannelSend, + + #[error("failed to send or receive through channel")] + ChannelReceive, #[error("the input header is not recognized as a header for this chain")] InvalidInputHeader, @@ -187,6 +230,80 @@ pub enum Kind { }, } +impl RetryableError for Kind { + fn is_retryable(&self) -> bool { + // TODO: actually classify whether an error kind is retryable + true + } +} + +fn error_with_source(err: Kind, source: impl Into) -> Error { + Context::new(err, Some(source.into())).into() +} + +fn error(err: Kind) -> Error { + Context::new(err, None).into() +} + +pub fn send_error(err: SendError) -> Error +where + T: Send + Sync + 'static, +{ + error_with_source(Kind::ChannelSend, err) +} + +pub fn recv_error(err: RecvError) -> Error { + error_with_source(Kind::ChannelReceive, err) +} + +pub fn grpc_status_error(status: Status) -> Error { + error(Kind::GrpcStatus(status)) +} + +pub fn grpc_transport_error(err: TransportError) -> Error { + error_with_source(Kind::GrpcTransport, err) +} + +pub fn grpc_response_param_error(msg: &str) -> Error { + error(Kind::GrpcResponseParam(msg.to_string())) +} + +pub fn tendermint_rpc_error(url: &tendermint_rpc::Url, err: TendermintRpcError) -> Error { + error_with_source(Kind::TendermintRpc(url.clone()), err) +} + +pub fn abci_query_error(query: AbciQuery) -> Error { + error(Kind::AbciQuery(query)) +} + +pub fn invalid_uri_error(uri: String, err: InvalidUri) -> Error { + error(Kind::InvalidUri(uri, format!("{}", err))) +} + +pub fn protobuf_decode_error(err: DecodeError) -> Error { + error_with_source(Kind::ProtobufDecode(type_name::().to_string()), err) +} + +pub fn invalid_raw_client_state_error(err: ClientError) -> Error { + error_with_source(Kind::InvalidRawClientState, err) +} + +pub fn ics18_relayer_error(err: ibc::ics18_relayer::error::Error) -> Error { + error_with_source(Kind::Ics18(err.kind().clone()), err) +} + +pub fn keyring_error(err: crate::keyring::errors::Error) -> Error { + error_with_source(Kind::KeyRing(err.kind().clone()), err) +} + +pub fn check_tx_error(result: TxResult) -> Error { + error(Kind::CheckTx(sdk_error_from_tx_result(&result), result)) +} + +pub fn deliver_tx_error(result: TxResult) -> Error { + error(Kind::DeliverTx(sdk_error_from_tx_result(&result), result)) +} + impl Kind { /// Add a given source error as context for this error kind /// @@ -199,8 +316,4 @@ impl Kind { pub fn context(self, source: impl Into) -> Context { Context::new(self, Some(source.into())) } - - pub fn channel(err: impl Into) -> Context { - Self::Channel.context(err) - } } diff --git a/relayer/src/event/monitor.rs b/relayer/src/event/monitor.rs index 77a5126ceb..74760dd4da 100644 --- a/relayer/src/event/monitor.rs +++ b/relayer/src/event/monitor.rs @@ -21,7 +21,7 @@ use tendermint_rpc::{ use ibc::{events::IbcEvent, ics02_client::height::Height, ics24_host::identifier::ChainId}; use crate::util::{ - retry::{retry_with_index, RetryResult}, + retry::{retry_with_index, RetryResult, RetryableError}, stream::group_while, }; @@ -67,6 +67,13 @@ pub enum Error { ChannelSendFailed, } +impl RetryableError for Error { + fn is_retryable(&self) -> bool { + // TODO: actually classify whether an error kind is retryable + true + } +} + /// A batch of events from a chain at a specific height #[derive(Clone, Debug)] pub struct EventBatch { @@ -263,13 +270,23 @@ impl EventMonitor { // Try to reconnect if let Err(e) = self.try_reconnect() { trace!(chain.id = %self.chain_id, "error when reconnecting: {}", e); - return RetryResult::Retry(index); + + if e.is_retryable() { + return RetryResult::Retry(index); + } else { + return RetryResult::Err(index); + } } // Try to resubscribe if let Err(e) = self.try_resubscribe() { trace!(chain.id = %self.chain_id, "error when reconnecting: {}", e); - return RetryResult::Retry(index); + + if e.is_retryable() { + return RetryResult::Retry(index); + } else { + return RetryResult::Err(index); + } } RetryResult::Ok(()) diff --git a/relayer/src/lib.rs b/relayer/src/lib.rs index a6be011169..ea56af7282 100644 --- a/relayer/src/lib.rs +++ b/relayer/src/lib.rs @@ -29,6 +29,7 @@ pub mod link; pub mod macros; pub mod object; pub mod registry; +pub mod sdk_error; pub mod supervisor; pub mod telemetry; pub mod transfer; diff --git a/relayer/src/sdk_error.rs b/relayer/src/sdk_error.rs new file mode 100644 index 0000000000..55b7d7e38c --- /dev/null +++ b/relayer/src/sdk_error.rs @@ -0,0 +1,162 @@ +use tendermint::abci::Code; +use tendermint_rpc::endpoint::broadcast::tx_commit::TxResult; +use thiserror::Error; + +// Provides mapping for errors returned from ibc-go and cosmos-sdk +#[derive(Clone, Debug, Error)] +pub enum SdkError { + #[error("ICS02 Client Error: {0}")] + Client(ClientError), + + #[error("Unknown SDK Error")] + Unknown, +} + +#[derive(Clone, Debug, Error)] +pub enum ClientError { + #[error("light client already exists")] + LightClientAlreadyExists, + + #[error("light client is invalid")] + InvalidLightClient, + + #[error("light client not found")] + LightClientNotFound, + + #[error("light client is frozen due to misbehaviour")] + FrozenLightClient, + + #[error("invalid client metadata")] + InvalidClientMetadata, + + #[error("consensus state not found")] + ConsensusStateNotFound, + + #[error("invalid consensus state")] + InvalidConsensusState, + + #[error("client type not found")] + ClientTypeNotFound, + + #[error("invalid client type")] + InvalidClientType, + + #[error("commitment root not found")] + CommitmentRootNotFound, + + #[error("invalid client header")] + InvalidClientHeader, + + #[error("invalid light client misbehaviour")] + InvalidLightClientMisbehavior, + + #[error("client state verification failed")] + ClientStateVerificationFailed, + + #[error("client consensus state verification failed")] + ClientConsensusStateVerificationFailed, + + #[error("connection state verification failed")] + ConnectionStateVerificationFailed, + + #[error("channel state verification failed")] + ChannelStateVerificationFailed, + + #[error("packet commitment verification failed")] + PacketCommitmentVerificationFailed, + + #[error("packet acknowledgement verification failed")] + PacketAcknowledgementVerificationFailed, + + #[error("packet receipt verification failed")] + PacketReceiptVerificationFailed, + + #[error("next sequence receive verification failed")] + NextSequenceReceiveVerificationFailed, + + #[error("self consensus state not found")] + SelfConsensusStateNotFound, + + #[error("unable to update light client")] + UpdateLightClientFailed, + + #[error("invalid update client proposal")] + InvalidUpdateClientProposal, + + #[error("invalid client upgrade")] + InvalidClientUpgrade, + + #[error("invalid height")] + InvalidHeight, + + #[error("invalid client state substitute")] + InvalidClientStateSubstitute, + + #[error("invalid upgrade proposal")] + InvalidUpgradeProposal, + + #[error("client is not active")] + InactiveClient, + + #[error("Unknown client error")] + Unknown, +} + +impl ClientError { + // The error code mapping follows the Go code at + // ibc-go/modules/core/02-client/types/errors.go + fn from_code(code: u32) -> ClientError { + match code { + 2 => ClientError::LightClientAlreadyExists, + 3 => ClientError::InvalidLightClient, + 4 => ClientError::LightClientNotFound, + 5 => ClientError::FrozenLightClient, + 6 => ClientError::InvalidClientMetadata, + 7 => ClientError::ConsensusStateNotFound, + 8 => ClientError::InvalidConsensusState, + 9 => ClientError::ClientTypeNotFound, + 10 => ClientError::InvalidClientType, + 11 => ClientError::CommitmentRootNotFound, + 12 => ClientError::InvalidClientHeader, + 13 => ClientError::InvalidLightClientMisbehavior, + 14 => ClientError::ClientStateVerificationFailed, + 15 => ClientError::ClientConsensusStateVerificationFailed, + 16 => ClientError::ConnectionStateVerificationFailed, + 17 => ClientError::ChannelStateVerificationFailed, + 18 => ClientError::PacketCommitmentVerificationFailed, + 19 => ClientError::PacketAcknowledgementVerificationFailed, + 20 => ClientError::PacketReceiptVerificationFailed, + 21 => ClientError::NextSequenceReceiveVerificationFailed, + 22 => ClientError::SelfConsensusStateNotFound, + 23 => ClientError::UpdateLightClientFailed, + 24 => ClientError::InvalidUpdateClientProposal, + 25 => ClientError::InvalidClientUpgrade, + 26 => ClientError::InvalidHeight, + 27 => ClientError::InvalidClientStateSubstitute, + 28 => ClientError::InvalidUpgradeProposal, + 29 => ClientError::InactiveClient, + _ => ClientError::Unknown, + } + } +} + +// Converts the error in a TxResult into SdkError with the same +// mapping as defined in ibc-go and cosmos-sdk. This assumes the +// target chain we are interacting with are using cosmos-sdk and ibc-go. +// +// TODO: investigate ways to automatically generate the mapping by parsing +// the errors.go source code directly +pub fn sdk_error_from_tx_result(result: &TxResult) -> SdkError { + match result.code { + Code::Ok => SdkError::Unknown, + Code::Err(code) => { + let codespace = result.codespace.to_string(); + if codespace == "client" { + SdkError::Client(ClientError::from_code(code)) + } else { + // TODO: Implement mapping for other codespaces in ibc-go + SdkError::Unknown + } + } + } +} diff --git a/relayer/src/util/retry.rs b/relayer/src/util/retry.rs index f63e09e8fd..8103b66d52 100644 --- a/relayer/src/util/retry.rs +++ b/relayer/src/util/retry.rs @@ -1,3 +1,5 @@ +use anomaly::BoxError; +use std::fmt::{Debug, Display}; use std::time::Duration; pub use retry::{ @@ -5,6 +7,21 @@ pub use retry::{ retry_with_index, OperationResult as RetryResult, }; +/// When encountering an error, indicates whether the relayer should +/// perform retry on the same operation. +pub trait RetryableError { + fn is_retryable(&self) -> bool; +} + +impl RetryableError for anomaly::Error +where + Kind: RetryableError + Clone + Debug + Display + Into, +{ + fn is_retryable(&self) -> bool { + self.kind().is_retryable() + } +} + #[derive(Copy, Clone, Debug)] pub struct ConstantGrowth { delay: Duration,