From 33e9e4ea9041bf7645cd3cfb48ea2650cbe03b65 Mon Sep 17 00:00:00 2001 From: Jose Storopoli Date: Mon, 26 Aug 2024 18:20:55 -0300 Subject: [PATCH] feat(btcio)!: BitcoinClient using jsonrpsee Replaced the reqwest::Client --- Cargo.lock | 1 - Cargo.toml | 1 - crates/btcio/Cargo.toml | 1 - crates/btcio/src/reader/query.rs | 2 +- crates/btcio/src/rpc/client.rs | 396 ++++++++++++------------------- 5 files changed, 157 insertions(+), 244 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index b43a6cd99..2e7113dd0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -686,7 +686,6 @@ dependencies = [ "jsonrpsee-types", "mockall", "rand 0.8.5", - "reqwest 0.12.5", "serde", "serde_json", "sha2 0.10.8", diff --git a/Cargo.toml b/Cargo.toml index c365e6dac..e386e35b5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -90,7 +90,6 @@ num_enum = "0.7" parking_lot = "0.12.3" paste = "1.0" rand = "0.8.5" -reqwest = { version = "0.12.4", features = ["json"] } reth = { git = "https://github.com/alpenlabs/reth.git", rev = "dde184f56591d0af" } reth-chainspec = { git = "https://github.com/alpenlabs/reth.git", rev = "dde184f56591d0af" } reth-cli-commands = { git = "https://github.com/alpenlabs/reth.git", rev = "dde184f56591d0af" } diff --git a/crates/btcio/Cargo.toml b/crates/btcio/Cargo.toml index 860a65da9..801909fc4 100644 --- a/crates/btcio/Cargo.toml +++ b/crates/btcio/Cargo.toml @@ -21,7 +21,6 @@ hex = { workspace = true } jsonrpsee = { workspace = true, features = ["client"] } jsonrpsee-types = { workspace = true } rand = { workspace = true } -reqwest = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } sha2 = { workspace = true } diff --git a/crates/btcio/src/reader/query.rs b/crates/btcio/src/reader/query.rs index 3ae2f2f10..6229437a9 100644 --- a/crates/btcio/src/reader/query.rs +++ b/crates/btcio/src/reader/query.rs @@ -193,7 +193,7 @@ async fn do_reader_task( warn!(%cur_best_height, err = %err, "failed to poll Bitcoin client"); status_updates.push(StatusUpdate::RpcError(err.to_string())); - if let Some(err) = err.downcast_ref::() { + if let Some(err) = err.downcast_ref::() { // recoverable errors if err.is_connect() { status_updates.push(StatusUpdate::RpcConnected(false)); diff --git a/crates/btcio/src/rpc/client.rs b/crates/btcio/src/rpc/client.rs index ebabd7f85..f58f4c416 100644 --- a/crates/btcio/src/rpc/client.rs +++ b/crates/btcio/src/rpc/client.rs @@ -1,128 +1,117 @@ -use std::str::FromStr; -use std::sync::atomic::AtomicU64; -use std::time::Duration; +use std::{cmp::Ordering, str::FromStr, sync::atomic::AtomicU64, time::Duration}; use async_trait::async_trait; use base64::{engine::general_purpose, Engine}; use bitcoin::{ + absolute::Height, block::{Header, Version}, - consensus::deserialize, - consensus::encode::{deserialize_hex, serialize_hex}, + consensus::{ + deserialize, + encode::{deserialize_hex, serialize_hex}, + }, hash_types::TxMerkleNode, hashes::Hash as _, - Address, Block, BlockHash, CompactTarget, Network, Transaction, Txid, + Address, Amount, Block, BlockHash, CompactTarget, OutPoint, Transaction, Txid, }; -use reqwest::header::HeaderMap; -use serde::{Deserialize, Serialize}; +use bitcoind_json_rpc_types::model::{GetBlockchainInfo, GetTransaction}; +use jsonrpsee::{ + core::client::ClientT, + http_client::{HeaderMap, HttpClient}, + rpc_params, +}; +use serde::{de, Deserialize, Serialize}; use serde_json::{ - json, to_value, + to_value, value::{RawValue, Value}, }; -use thiserror::Error; use tracing::*; -use crate::rpc::traits::{BitcoinBroadcaster, BitcoinBroadcaster, BitcoinReader}; -use crate::rpc::error::Error as ClientError; +use crate::rpc::{ + error::Error as ClientError, + traits::{BitcoinBroadcaster, BitcoinReader, BitcoinSigner}, +}; -const MAX_RETRIES: u32 = 3; +/// Maximum number of retries for a request. +const MAX_RETRIES: u8 = 3; -/// This is an alias for the result type returned by the [`Client`]. -pub type ClientResult<'a, T> = Result>; +/// This is an alias for the result type returned by the [`BitcoinClient`]. +pub type ClientResult<'a, T> = Result; +/// Converts a returned JSON value to a [`Value`]. pub fn to_val<'a, T>(value: T) -> ClientResult<'a, Value> where T: Serialize, { - to_value(value).map_err(|e| ClientError::Param(format!("Error creating value: {}", e))) + to_value(value).map_err(|e| ClientError::UnexpectedStructure(e.to_string())) } -// Response is a struct that represents a response returned by the Bitcoin RPC -// It is generic over the type of the result field, which is usually a String in Bitcoin Core +/// Represents a response returned by the [`BitcoinClient`]s underlying RPC. #[derive(Debug, Clone, PartialEq, Deserialize, Serialize)] struct Response { pub result: Option, - pub error: Option, + pub error: Option, pub id: u64, } -// BitcoinClient is a struct that represents a connection to a Bitcoin RPC node +/// An `async` client for interacting with a `bitcoind` instance. #[derive(Debug)] pub struct BitcoinClient { + /// The URL of the `bitcoind` instance. url: String, - client: reqwest::Client, - network: Network, + /// The underlying `async` HTTP client. + client: HttpClient, + /// Next ID for the request. next_id: AtomicU64, } impl BitcoinClient { - pub fn new(url: String, username: String, password: String, network: Network) -> Self { + /// Creates a new [`BitcoinClient`] with the given URL, username, and password. + pub fn new<'a>(url: String, username: String, password: String) -> ClientResult<'a, Self> { + if username.is_empty() || password.is_empty() { + return Err(ClientError::MissingUserPassword); + } + let mut headers = HeaderMap::new(); let mut user_pw = String::new(); + general_purpose::STANDARD.encode_string(format!("{}:{}", username, password), &mut user_pw); - headers.insert( - "Authorization", - format!("Basic {}", user_pw) - .parse() - .expect("Failed to parse auth header!"), - ); - headers.insert( - "Content-Type", - "application/json" - .parse() - .expect("Failed to parse content type header!"), - ); - - let client = reqwest::Client::builder() - .default_headers(headers) - .build() - .expect("Failed to build client!"); - - Self { + headers.insert("Authorization", format!("Basic {}", user_pw).parse()?); + headers.insert("Content-Type", "application/json".parse()?); + + let client = HttpClient::default().set_headers(headers).build()?; + + Ok(Self { url, client, - network, next_id: AtomicU64::new(0), - } - } - - pub fn network(&self) -> Network { - self.network + }) } + /// Increments the next ID and returns the previous value. fn next_id(&self) -> u64 { - self.next_id - .fetch_add(1, std::sync::atomic::Ordering::AcqRel) + self.next_id.fetch_add(1, Ordering::AcqRel) } - async fn call( + async fn call( &self, method: &str, - params: &[serde_json::Value], + params: &[Value], ) -> ClientResult { let mut retries = 0; loop { let id = self.next_id(); - let response = self - .client - .post(&self.url) - .json(&json!({ - "jsonrpc": "1.0", - "id": id, - "method": method, - "params": params - })) - .send() - .await; + let params = rpc_params!(params); + let response: T = self.client.request(method, params).await?; match response { - Ok(resp) => { + Ok(_) => { let data = resp .json::>() .await - .map_err(|e| ClientError::Parse(e.to_string()))?; + .map_err(|e| ClientError::Returned(e.to_string()))?; if let Some(err) = data.error { - return Err(ClientError::Server(err.code, err.message)); + return Err(ClientError::JsonRpc(err)); } return data .result @@ -131,45 +120,9 @@ impl BitcoinClient { Err(err) => { warn!(err = %err, "Error calling bitcoin client"); - if err.is_body() { - // Body error, unlikely to be recoverable by retrying - return Err(ClientError::Body(err.to_string())); - } else if err.is_status() { - // HTTP status error, not retryable - let e = match err.status() { - Some(code) => ClientError::Status(code, err.to_string()), - _ => ClientError::Other(err.to_string()), - }; - return Err(e); - } else if err.is_decode() { - // Error decoding the response, retry might not help - return Err(ClientError::MalformedResponse(err.to_string())); - } else if err.is_connect() { - // Connection error, retry might help - let e = ClientError::Connection(err.to_string()); - warn!(%e, "connection error, retrying..."); - } else if err.is_timeout() { - let e = ClientError::Timeout; - // Timeout error, retry might help - warn!(%e, "timeout error, retrying..."); - } else if err.is_request() { - // General request error, retry might help - let e = ClientError::Request(err.to_string()); - warn!(%e, "request error, retrying..."); - } else if err.is_builder() { - // Error building the request, unlikely to be recoverable - return Err(ClientError::ReqBuilder(err.to_string())); - } else if err.is_redirect() { - // Redirect error, not retryable - return Err(ClientError::HttpRedirect(err.to_string())); - } else { - // Unknown error, unlikely to be recoverable - return Err(ClientError::Other("Unknown error".to_string())); - } - retries += 1; if retries >= MAX_RETRIES { - return Err(ClientError::MaxRetriesExceeded(MAX_RETRIES)); + return Err(ClientError::Other(err.to_string())); } tokio::time::sleep(Duration::from_millis(1000)).await; } @@ -177,46 +130,30 @@ impl BitcoinClient { } } - // get_block_count returns the current block height - pub async fn get_block_count(&self) -> ClientResult { - self.call::("getblockcount", &[]).await - } - - // This returns [(txid, timestamp)] - pub async fn list_transactions(&self, confirmations: u32) -> ClientResult> { - let res = self - .call::("listtransactions", &[to_value(confirmations)?]) - .await?; - if let serde_json::Value::Array(array) = res { - Ok(array - .iter() - .map(|el| { - ( - serde_json::from_value::(el.get("txid").unwrap().clone()) - .unwrap() - .clone(), - serde_json::from_value::(el.get("time").unwrap().clone()).unwrap(), - ) - }) - .collect()) - } else { - Err(ClientError::MalformedResponse(res.to_string())) - } - } + // get_block returns the block at the given hash +} - // get_mempool_txids returns a list of txids in the current mempool - pub async fn get_mempool_txids(&self) -> ClientResult> { +#[async_trait] +impl BitcoinReader for BitcoinClient { + async fn estimate_smart_fee(&self, conf_target: u16) -> ClientResult { let result = self - .call::>("getrawmempool", &[]) + .call::>("estimatesmartfee", &[to_value(1)?]) .await? .to_string(); - serde_json::from_str::>(&result) - .map_err(|e| ClientError::MalformedResponse(e.to_string())) + let result_map: serde_json::Value = serde_json::from_str(&result)?; + + let btc_vkb = result_map + .get("feerate") + .unwrap_or(&serde_json::Value::from_str("0.00001").unwrap()) + .as_f64() + .unwrap(); + + // convert to sat/vB and round up + Ok((btc_vkb * 100_000_000.0 / 1000.0).ceil() as u64) } - // get_block returns the block at the given hash - pub async fn get_block(&self, hash: BlockHash) -> ClientResult { + async fn get_block(&self, hash: &BlockHash) -> ClientResult { let result = self .call::>("getblock", &[to_value(hash.to_string())?, to_value(3)?]) .await? @@ -258,7 +195,45 @@ impl BitcoinClient { }) } - pub async fn list_since_block(&self, blockhash: String) -> ClientResult> { + async fn get_block_at(&self, height: Height) -> ClientResult { + let hash = self.get_block_hash(height).await?; + let block = self.get_block(&hash).await?; + Ok(block) + } + + async fn get_block_count(&self) -> ClientResult { + self.call::("getblockcount", &[]).await + } + + async fn get_block_hash(&self, height: u64) -> ClientResult { + let hash = self + .call::("getblockhash", &[to_value(height)?]) + .await?; + Ok( + BlockHash::from_str(&hash) + .map_err(|e| ClientError::MalformedResponse(e.to_string()))?, + ) + } + + async fn get_blockchain_info(&self) -> ClientResult { + let res = self + .call::("getblockchaininfo", &[]) + .await?; + Ok(res) + } + + // get_mempool_txids returns a list of txids in the current mempool + async fn get_raw_mempool(&self) -> ClientResult> { + let result = self + .call::>("getrawmempool", &[]) + .await? + .to_string(); + + serde_json::from_str::>(&result) + .map_err(|e| ClientError::MalformedResponse(e.to_string())) + } + + async fn list_since_block(&self, blockhash: &BlockHash) -> ClientResult> { let result = self .call::>("listsinceblock", &[to_value(blockhash)?]) .await? @@ -273,85 +248,11 @@ impl BitcoinClient { .collect(); Ok(txids) } - - // get_change_address returns a change address for the wallet of bitcoind - async fn get_change_address(&self) -> ClientResult
{ - let address_string = self.call::("getrawchangeaddress", &[]).await?; - let addr = Address::from_str(&address_string).and_then(|x| x.require_network(self.network)); - addr.map_err(|_| ClientError::WrongNetworkAddress(self.network)) - } - - pub async fn get_change_addresses(&self) -> ClientResult<[Address; 2]> { - let change_address = self.get_change_address().await?; - let change_address_2 = self.get_change_address().await?; - - Ok([change_address, change_address_2]) - } - - #[cfg(test)] - pub async fn send_to_address(&self, address: String, amt: u32) -> anyhow::Result { - if self.network == Network::Regtest { - let result = self - .call::>( - "sendtoaddress", - &[ - to_value(address)?, - to_value(amt)?, - // All the following items are needed to pass the fee-rate and fee-rate - // needs to be passed just in case the regtest - // chain cannot estimate fee rate due to - // insufficient blocks - to_value("")?, - to_value("")?, - to_value(true)?, - to_value(true)?, - to_value(>::None)?, - to_value("unset")?, - to_value(>::None)?, - to_value(1.1)?, // fee rate - ], - ) - .await; - Ok(result.unwrap().to_string()) - } else { - Err(anyhow::anyhow!( - "Cannot send_to_address on non-regtest network" - )) - } - } - pub async fn list_wallets(&self) -> ClientResult> { - self.call::>("listwallets", &[]).await - } } #[async_trait] -impl L1Client for BitcoinClient { - async fn get_blockchain_info(&self) -> ClientResult { - let res = self - .call::("getblockchaininfo", &[]) - .await?; - Ok(res) - } - - // get_block_hash returns the block hash of the block at the given height - async fn get_block_hash(&self, height: u64) -> ClientResult { - let hash = self - .call::("getblockhash", &[to_value(height)?]) - .await?; - Ok( - BlockHash::from_str(&hash) - .map_err(|e| ClientError::MalformedResponse(e.to_string()))?, - ) - } - - async fn get_block_at(&self, height: u64) -> ClientResult { - let hash = self.get_block_hash(height).await?; - let block = self.get_block(hash).await?; - Ok(block) - } - - // send_raw_transaction sends a raw transaction to the network - async fn send_raw_transaction + Send>(&self, tx: T) -> ClientResult { +impl BitcoinBroadcaster for BitcoinClient { + async fn send_raw_transaction(&self, tx: &Transaction) -> ClientResult { let txstr = hex::encode(tx); let resp = self .call::("sendrawtransaction", &[to_value(txstr)?]) @@ -371,20 +272,23 @@ impl L1Client for BitcoinClient { Err(e) => Err(ClientError::MalformedResponse(e.to_string())), } } +} - async fn get_transaction_info(&self, txid: Txid) -> ClientResult { +#[async_trait] +impl BitcoinSigner for BitcoinClient { + async fn get_new_address(&self) -> ClientResult
{ + todo!() + } + async fn get_transaction(&self, txid: &Txid) -> ClientResult { Ok(self - .call::("gettransaction", &[to_val(txid.to_string())?]) + .call::("gettransaction", &[to_val(txid.to_string())?]) .await?) } -} -#[async_trait] -impl SeqL1Client for BitcoinClient { // get_utxos returns all unspent transaction outputs for the wallets of bitcoind - async fn get_utxos(&self) -> ClientResult> { + async fn get_utxos(&self) -> ClientResult> { let utxos = self - .call::>("listunspent", &[to_value(0)?, to_value(9999999)?]) + .call::>("listunspent", &[to_value(0)?, to_value(9999999)?]) .await?; if utxos.is_empty() { @@ -394,27 +298,43 @@ impl SeqL1Client for BitcoinClient { Ok(utxos) } - // estimate_smart_fee estimates the fee to confirm a transaction in the next block - async fn estimate_smart_fee(&self) -> ClientResult { - let result = self - .call::>("estimatesmartfee", &[to_value(1)?]) - .await? - .to_string(); - - let result_map: serde_json::Value = serde_json::from_str(&result)?; + async fn list_transactions( + &self, + confirmations: Option, + ) -> ClientResult> { + let res = self + .call::("listtransactions", &[to_value(confirmations)?]) + .await?; + if let serde_json::Value::Array(array) = res { + Ok(array + .iter() + .map(|el| { + ( + serde_json::from_value::(el.get("txid").unwrap().clone()) + .unwrap() + .clone(), + serde_json::from_value::(el.get("time").unwrap().clone()).unwrap(), + ) + }) + .collect()) + } else { + Err(ClientError::MalformedResponse(res.to_string())) + } + } - let btc_vkb = result_map - .get("feerate") - .unwrap_or(&serde_json::Value::from_str("0.00001").unwrap()) - .as_f64() - .unwrap(); + async fn list_wallets(&self) -> ClientResult> { + self.call::>("listwallets", &[]).await + } - // convert to sat/vB and round up - Ok((btc_vkb * 100_000_000.0 / 1000.0).ceil() as u64) + async fn send_to_address(&self, address: &Address, amount: Amount) -> ClientResult { + todo!() } // sign_raw_transaction_with_wallet signs a raw transaction with the wallet of bitcoind - async fn sign_raw_transaction_with_wallet(&self, tx: Transaction) -> ClientResult { + async fn sign_raw_transaction_with_wallet( + &self, + tx: &Transaction, + ) -> ClientResult { #[derive(Serialize, Deserialize, Debug)] struct SignError { txid: String, @@ -460,8 +380,4 @@ impl SeqL1Client for BitcoinClient { } } } - - fn network(&self) -> Network { - self.network - } }