Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[r2r] Tx wait for confirmation timeout fix #1446

Merged
merged 32 commits into from
Sep 13, 2022
Merged
Show file tree
Hide file tree
Changes from 31 commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
d5715a1
fixed unneccessary wait timeout
borngraced Aug 17, 2022
ad150e8
pr review fixes — part 1
borngraced Aug 23, 2022
5965a59
added unit test for electrum and udeps fix
borngraced Aug 23, 2022
70ec4a2
fix wasm
borngraced Aug 23, 2022
163dddf
PR review fixes — error handling improvements
borngraced Aug 26, 2022
fab04e4
removed unnecessary #[allow(clippy::large_enum_variant)]
borngraced Aug 26, 2022
33154e9
PR review fix — fixed unit test for electrum
borngraced Aug 29, 2022
4596373
pr review fixes — added and used SlurpError for JsonRpcError
borngraced Aug 30, 2022
8ea0218
pr review fixes — electrum_request and wasm fix
borngraced Aug 30, 2022
4d43e61
electrum_request error refactor
borngraced Aug 30, 2022
ca0023e
PR review fixes
borngraced Sep 5, 2022
62e4bf8
review fixes — moved InvalidRequest Error from electrum_request fn
borngraced Sep 6, 2022
eb68d42
pr review fixes, fixed wasm warnings
borngraced Sep 6, 2022
9c2dfae
pr review fixes
borngraced Sep 6, 2022
8c1a840
review fixes
borngraced Sep 7, 2022
cfee27b
review fixes
borngraced Sep 7, 2022
3d8028c
fixed last review note
borngraced Sep 9, 2022
dcb88be
fix electrum and nattive tests
borngraced Sep 9, 2022
f31c4d6
comments
borngraced Sep 9, 2022
5150c97
improved unit tests
borngraced Sep 9, 2022
511af7d
fix — test_for_non_existent_tx_hex_utxo
borngraced Sep 11, 2022
83b3f5b
test debugging on ci build
borngraced Sep 11, 2022
dae8d7d
test debugging for ci build
borngraced Sep 11, 2022
1bcbdd9
fmt
borngraced Sep 11, 2022
7dc8040
ci build debug
borngraced Sep 11, 2022
bd55559
fixed unit teat error
borngraced Sep 11, 2022
abc2d3a
Merge remote-tracking branch 'origin/dev' into tx_wait_confirmation_e…
borngraced Sep 12, 2022
6bfa3e3
Merge remote-tracking branch 'origin/dev' into tx_wait_confirmation_e…
borngraced Sep 12, 2022
d3ad8c9
cargo fmt
borngraced Sep 12, 2022
79eaf39
cargo fmt
borngraced Sep 12, 2022
1d43ca8
fix mm_ctx imports
borngraced Sep 12, 2022
1293884
Merge remote-tracking branch 'origin/dev' into tx_wait_confirmation_e…
borngraced Sep 13, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion mm2src/coins/qrc20/history.rs
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,8 @@ impl Qrc20Coin {
},
JsonRpcErrorType::InvalidRequest(err)
| JsonRpcErrorType::Transport(err)
| JsonRpcErrorType::Parse(_, err) => {
| JsonRpcErrorType::Parse(_, err)
| JsonRpcErrorType::Internal(err) => {
return RequestTxHistoryResult::Retry {
error: ERRL!("Error {} on blockchain_contract_event_get_history", err),
};
Expand Down
153 changes: 85 additions & 68 deletions mm2src/coins/utxo/rpc_clients.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,10 @@ impl UtxoRpcClientEnum {
}
},
Err(e) => {
if e.get_inner().is_tx_not_found_error() {
return ERR!("Tx {} is not on chain anymore", tx_hash);
};

if expiry_height > 0 {
let block = match selfi.get_block_count().compat().await {
Ok(b) => b,
Expand Down Expand Up @@ -265,7 +269,9 @@ pub enum UtxoRpcError {
impl From<JsonRpcError> for UtxoRpcError {
fn from(e: JsonRpcError) -> Self {
match e.error {
JsonRpcErrorType::InvalidRequest(_) => UtxoRpcError::Internal(e.to_string()),
JsonRpcErrorType::InvalidRequest(_) | JsonRpcErrorType::Internal(_) => {
UtxoRpcError::Internal(e.to_string())
},
JsonRpcErrorType::Transport(_) => UtxoRpcError::Transport(e),
JsonRpcErrorType::Parse(_, _) | JsonRpcErrorType::Response(_, _) => UtxoRpcError::ResponseParseError(e),
}
Expand All @@ -280,6 +286,20 @@ impl From<NumConversError> for UtxoRpcError {
fn from(e: NumConversError) -> Self { UtxoRpcError::Internal(e.to_string()) }
}

impl UtxoRpcError {
pub fn is_tx_not_found_error(&self) -> bool {
if let UtxoRpcError::ResponseParseError(ref json_err) = self {
if let JsonRpcErrorType::Response(_, json) = &json_err.error {
return json["error"]["code"] == -5 // native compatible
|| json["message"].as_str().unwrap_or_default().contains(NO_TX_ERROR_CODE);
// electrum compatible;
}
};

false
}
}

/// Common operations that both types of UTXO clients have but implement them differently
#[async_trait]
pub trait UtxoRpcClientOps: fmt::Debug + Send + Sync + 'static {
Expand Down Expand Up @@ -356,12 +376,8 @@ pub trait UtxoRpcClientOps: fmt::Debug + Send + Sync + 'static {
{
Ok(bytes) => Ok(Some(deserialize(bytes.as_slice())?)),
Err(err) => {
if let UtxoRpcError::ResponseParseError(ref json_err) = err {
if let JsonRpcErrorType::Response(_, json) = &json_err.error {
if json["message"].as_str().unwrap_or_default().contains(NO_TX_ERROR_CODE) {
return Ok(None);
}
}
if err.is_tx_not_found_error() {
return Ok(None);
}
Err(err.into())
},
Expand Down Expand Up @@ -632,46 +648,46 @@ impl JsonRpcClient for NativeClientImpl {

#[cfg(target_arch = "wasm32")]
fn transport(&self, _request: JsonRpcRequestEnum) -> JsonRpcResponseFut {
Box::new(futures01::future::err(ERRL!(
"'NativeClientImpl' must be used in native mode only"
Box::new(futures01::future::err(JsonRpcErrorType::Internal(
"'NativeClientImpl' must be used in native mode only".to_string(),
)))
}

#[cfg(not(target_arch = "wasm32"))]
fn transport(&self, request: JsonRpcRequestEnum) -> JsonRpcResponseFut {
use mm2_net::transport::slurp_req;

let request_body = try_fus!(json::to_string(&request));
let request_body =
try_f!(json::to_string(&request).map_err(|e| JsonRpcErrorType::InvalidRequest(e.to_string())));
// measure now only body length, because the `hyper` crate doesn't allow to get total HTTP packet length
self.event_handlers.on_outgoing_request(request_body.as_bytes());

let uri = self.uri.clone();

let http_request = try_fus!(Request::builder()
let http_request = try_f!(Request::builder()
.method("POST")
.header(AUTHORIZATION, self.auth.clone())
.uri(uri.clone())
.body(Vec::from(request_body)));
.body(Vec::from(request_body))
.map_err(|e| JsonRpcErrorType::InvalidRequest(e.to_string())));

let event_handles = self.event_handlers.clone();
Box::new(slurp_req(http_request).boxed().compat().then(
move |result| -> Result<(JsonRpcRemoteAddr, JsonRpcResponseEnum), String> {
let res = try_s!(result);
move |result| -> Result<(JsonRpcRemoteAddr, JsonRpcResponseEnum), JsonRpcErrorType> {
let res = result.map_err(|e| e.into_inner())?;
// measure now only body length, because the `hyper` crate doesn't allow to get total HTTP packet length
event_handles.on_incoming_response(&res.2);

let body = try_s!(std::str::from_utf8(&res.2));
let body =
std::str::from_utf8(&res.2).map_err(|e| JsonRpcErrorType::parse_error(&uri, e.to_string()))?;

if res.0 != StatusCode::OK {
return ERR!(
"Rpc request {:?} failed with HTTP status code {}, response body: {}",
request,
res.0,
body
);
let res_value = serde_json::from_slice(&res.2)
.map_err(|e| JsonRpcErrorType::parse_error(&uri, e.to_string()))?;
return Err(JsonRpcErrorType::Response(uri.into(), res_value));
}

let response = try_s!(json::from_str(body));
let response = json::from_str(body).map_err(|e| JsonRpcErrorType::parse_error(&uri, e.to_string()))?;
Ok((uri.into(), response))
},
))
Expand Down Expand Up @@ -1582,76 +1598,77 @@ pub struct ElectrumClientImpl {
async fn electrum_request_multi(
client: ElectrumClient,
request: JsonRpcRequestEnum,
) -> Result<(JsonRpcRemoteAddr, JsonRpcResponseEnum), String> {
) -> Result<(JsonRpcRemoteAddr, JsonRpcResponseEnum), JsonRpcErrorType> {
let mut futures = vec![];
let connections = client.connections.lock().await;
for (i, connection) in connections.iter().enumerate() {
let connection_addr = connection.addr.clone();
match &*connection.tx.lock().await {
Some(tx) => {
let fut = electrum_request(
request.clone(),
tx.clone(),
connection.responses.clone(),
ELECTRUM_TIMEOUT / (connections.len() - i) as u64,
)
.map(|response| (JsonRpcRemoteAddr(connection_addr), response));
futures.push(fut)
},
None => (),
let json = json::to_string(&request).map_err(|e| JsonRpcErrorType::InvalidRequest(e.to_string()))?;
sergeyboyko0791 marked this conversation as resolved.
Show resolved Hide resolved
if let Some(tx) = &*connection.tx.lock().await {
let fut = electrum_request(
json,
request.rpc_id(),
tx.clone(),
connection.responses.clone(),
ELECTRUM_TIMEOUT / (connections.len() - i) as u64,
)
.map(|response| (JsonRpcRemoteAddr(connection_addr), response));
futures.push(fut)
}
}
drop(connections);

if futures.is_empty() {
return ERR!("All electrums are currently disconnected");
return Err(JsonRpcErrorType::Transport(
"All electrums are currently disconnected".to_string(),
));
}

match request {
JsonRpcRequestEnum::Single(single) if single.method == "server.ping" => {
if let JsonRpcRequestEnum::Single(single) = &request {
if single.method == "server.ping" {
// server.ping must be sent to all servers to keep all connections alive
return select_ok(futures)
.map(|(result, _)| result)
.map_err(|e| ERRL!("{:?}", e))
.compat()
.await;
},
_ => (),
return select_ok(futures).map(|(result, _)| result).compat().await;
}
}

let (res, no_of_failed_requests) = select_ok_sequential(futures)
.compat()
.await
.map_err(|e| ERRL!("{:?}", e))?;
.map_err(|e| JsonRpcErrorType::Transport(format!("{:?}", e)))?;
client.rotate_servers(no_of_failed_requests).await;

Ok(res)
}

async fn electrum_request_to(
client: ElectrumClient,
request: JsonRpcRequestEnum,
to_addr: String,
) -> Result<(JsonRpcRemoteAddr, JsonRpcResponseEnum), String> {
) -> Result<(JsonRpcRemoteAddr, JsonRpcResponseEnum), JsonRpcErrorType> {
let (tx, responses) = {
let connections = client.connections.lock().await;
let connection = connections
.iter()
.find(|c| c.addr == to_addr)
.ok_or(ERRL!("Unknown destination address {}", to_addr))?;
.ok_or_else(|| JsonRpcErrorType::Internal(format!("Unknown destination address {}", to_addr)))?;
let responses = connection.responses.clone();
let tx = {
match &*connection.tx.lock().await {
Some(tx) => tx.clone(),
None => return ERR!("Connection {} is not established yet", to_addr),
None => {
return Err(JsonRpcErrorType::Transport(format!(
"Connection {} is not established yet",
to_addr
)))
},
}
};
(tx, responses)
};

let response = try_s!(
electrum_request(request.clone(), tx, responses, ELECTRUM_TIMEOUT)
.compat()
.await
);
let json = json::to_string(&request).map_err(|err| JsonRpcErrorType::InvalidRequest(err.to_string()))?;
let response = electrum_request(json, request.rpc_id(), tx, responses, ELECTRUM_TIMEOUT)
.compat()
.await?;
Ok((JsonRpcRemoteAddr(to_addr.to_owned()), response))
}

Expand Down Expand Up @@ -2716,36 +2733,36 @@ fn electrum_connect(
}
}

/// # Important
/// `electrum_request` should always return [`JsonRpcErrorType::Transport`] error.
fn electrum_request(
request: JsonRpcRequestEnum,
mut req_json: String,
rpc_id: JsonRpcId,
tx: mpsc::Sender<Vec<u8>>,
responses: JsonRpcPendingRequestsShared,
timeout: u64,
) -> Box<dyn Future<Item = JsonRpcResponseEnum, Error = String> + Send + 'static> {
) -> Box<dyn Future<Item = JsonRpcResponseEnum, Error = JsonRpcErrorType> + Send + 'static> {
let send_fut = async move {
let mut json = try_s!(json::to_string(&request));
#[cfg(not(target_arch = "wasm"))]
{
// Electrum request and responses must end with \n
// https://electrumx.readthedocs.io/en/latest/protocol-basics.html#message-stream
json.push('\n');
req_json.push('\n');
}

let (req_tx, resp_rx) = async_oneshot::channel();
responses.lock().await.insert(request.rpc_id(), req_tx);
try_s!(tx.send(json.into_bytes()).compat().await);
let resps = try_s!(resp_rx.await);
responses.lock().await.insert(rpc_id, req_tx);
tx.send(req_json.into_bytes())
.compat()
.await
.map_err(|err| JsonRpcErrorType::Transport(err.to_string()))?;
let resps = resp_rx.await.map_err(|e| JsonRpcErrorType::Transport(e.to_string()))?;
Ok(resps)
};
let send_fut = send_fut
.boxed()
.timeout(Duration::from_secs(timeout))
.compat()
.then(|res| match res {
Ok(response) => response,
Err(timeout_error) => ERR!("{}", timeout_error),
})
.map_err(|e| ERRL!("{}", e));
.then(move |res| res.map_err(|err| JsonRpcErrorType::Transport(err.to_string()))?);
Box::new(send_fut)
}

Expand Down
3 changes: 2 additions & 1 deletion mm2src/coins/utxo/utxo_common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2327,8 +2327,9 @@ where
Ok(value) => value,
Err(e) => match &e.error {
JsonRpcErrorType::InvalidRequest(e)
| JsonRpcErrorType::Parse(_, e)
| JsonRpcErrorType::Transport(e)
| JsonRpcErrorType::Parse(_, e) => {
| JsonRpcErrorType::Internal(e) => {
sergeyboyko0791 marked this conversation as resolved.
Show resolved Hide resolved
return RequestTxHistoryResult::Retry {
error: ERRL!("Error {} on scripthash_get_history", e),
};
Expand Down
24 changes: 24 additions & 0 deletions mm2src/coins/utxo/utxo_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3899,6 +3899,30 @@ fn test_electrum_display_balances() {
block_on(utxo_common_tests::test_electrum_display_balances(&rpc_client));
}

#[test]
fn test_for_non_existent_tx_hex_utxo_electrum() {
// This test shouldn't wait till timeout!
let timeout = (now_ms() / 1000) + 120;
let client = electrum_client_for_test(RICK_ELECTRUM_ADDRS);
let coin = utxo_coin_for_test(
client.into(),
Some("spice describe gravity federal blast come thank unfair canal monkey style afraid"),
false,
);
// bad transaction hex
let tx = hex::decode("0400008085202f8902bf17bf7d1daace52e08f732a6b8771743ca4b1cb765a187e72fd091a0aabfd52000000006a47304402203eaaa3c4da101240f80f9c5e9de716a22b1ec6d66080de6a0cca32011cd77223022040d9082b6242d6acf9a1a8e658779e1c655d708379862f235e8ba7b8ca4e69c6012102031d4256c4bc9f99ac88bf3dba21773132281f65f9bf23a59928bce08961e2f3ffffffffff023ca13c0e9e085dd13f481f193e8a3e8fd609020936e98b5587342d994f4d020000006b483045022100c0ba56adb8de923975052312467347d83238bd8d480ce66e8b709a7997373994022048507bcac921fdb2302fa5224ce86e41b7efc1a2e20ae63aa738dfa99b7be826012102031d4256c4bc9f99ac88bf3dba21773132281f65f9bf23a59928bce08961e2f3ffffffff0300e1f5050000000017a9141ee6d4c38a3c078eab87ad1a5e4b00f21259b10d87000000000000000016611400000000000000000000000000000000000000001b94d736000000001976a91405aab5342166f8594baf17a7d9bef5d56744332788ac2d08e35e000000000000000000000000000000").unwrap();
let expected_timeout = (now_ms() / 1000) + 10;
let actual = coin
.wait_for_confirmations(&tx, 1, false, timeout, 1)
.wait()
.err()
.unwrap();
assert!(
actual.contains("Tx d342ff9da528a2e262bddf2b6f9a27d1beb7aeb03f0fc8d9eac2987266447e44 is not on chain anymore")
);
assert!((now_ms() / 1000) < expected_timeout);
}

#[test]
fn test_native_display_balances() {
let unspents = vec![
Expand Down
Loading