Skip to content

Refactor sync checks #84

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Apr 16, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions client/src/client.rs
Original file line number Diff line number Diff line change
@@ -14,7 +14,7 @@ use spaces_protocol::{
validate::{TxChangeSet, UpdateKind, Validator},
Bytes, Covenant, FullSpaceOut, RevokeReason, SpaceOut,
};
use spaces_wallet::bitcoin::Transaction;
use spaces_wallet::bitcoin::{Network, Transaction};

use crate::{
source::BitcoinRpcError,
@@ -27,7 +27,7 @@ pub trait BlockSource {
fn get_median_time(&self) -> Result<u64, BitcoinRpcError>;
fn in_mempool(&self, txid: &Txid, height: u32) -> Result<bool, BitcoinRpcError>;
fn get_block_count(&self) -> Result<u64, BitcoinRpcError>;
fn get_best_chain(&self) -> Result<ChainAnchor, BitcoinRpcError>;
fn get_best_chain(&self, tip: Option<u32>, expected_chain: Network) -> Result<Option<ChainAnchor>, BitcoinRpcError>;
}

#[derive(Debug, Clone)]
4 changes: 4 additions & 0 deletions client/src/config.rs
Original file line number Diff line number Diff line change
@@ -79,6 +79,9 @@ pub struct Args {
/// Skip maintaining historical root anchors
#[arg(long, env = "SPACED_SKIP_ANCHORS", default_value = "false")]
skip_anchors: bool,
/// The specified Bitcoin RPC is a light client
#[arg(long, env = "SPACED_BITCOIN_RPC_LIGHT", default_value = "false")]
bitcoin_rpc_light: bool,
}

#[derive(Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Debug, ValueEnum, Serialize, Deserialize)]
@@ -164,6 +167,7 @@ impl Args {
let rpc = BitcoinRpc::new(
&args.bitcoin_rpc_url.expect("bitcoin rpc url"),
bitcoin_rpc_auth,
!args.bitcoin_rpc_light
);

let genesis = Spaced::genesis(&rpc, args.chain).await?;
7 changes: 4 additions & 3 deletions client/src/format.rs
Original file line number Diff line number Diff line change
@@ -152,9 +152,10 @@ pub fn print_list_unspent(utxos: Vec<WalletOutput>, format: Format) {
pub fn print_server_info(info: ServerInfo, format: Format) {
match format {
Format::Text => {
println!("CHAIN: {}", info.chain);
println!(" Height {}", info.tip.height);
println!(" Hash {}", info.tip.hash);
println!("Network: {}", info.network);
println!("Height {}", info.tip.height);
println!("Hash {}", info.tip.hash);
println!("Progress {:.2}%", info.progress * 100.0);
}
Format::Json => {
println!("{}", serde_json::to_string_pretty(&info).unwrap());
65 changes: 51 additions & 14 deletions client/src/rpc.rs
Original file line number Diff line number Diff line change
@@ -66,8 +66,16 @@ pub(crate) type Responder<T> = oneshot::Sender<T>;

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ServerInfo {
pub chain: ExtendedNetwork,
pub network: String,
pub tip: ChainAnchor,
pub chain: ChainInfo,
pub progress: f32,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ChainInfo {
blocks: u32,
headers: u32,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
@@ -99,8 +107,8 @@ pub enum ChainStateCommand {
txs: Vec<String>,
resp: Responder<anyhow::Result<Vec<Option<TxChangeSet>>>>,
},
GetTip {
resp: Responder<anyhow::Result<ChainAnchor>>,
GetServerInfo {
resp: Responder<anyhow::Result<ServerInfo>>,
},
GetSpace {
hash: SpaceKey,
@@ -723,13 +731,12 @@ impl RpcServerImpl {
#[async_trait]
impl RpcServer for RpcServerImpl {
async fn get_server_info(&self) -> Result<ServerInfo, ErrorObjectOwned> {
let chain = self.wallet_manager.network;
let tip = self
let info = self
.store
.get_tip()
.get_server_info()
.await
.map_err(|error| ErrorObjectOwned::owned(-1, error.to_string(), None::<String>))?;
Ok(ServerInfo { chain, tip })
Ok(info)
}

async fn get_space(
@@ -1083,7 +1090,7 @@ impl AsyncChainState {
rpc,
chain_state,
)
.await?;
.await?;

Ok(block
.block_meta
@@ -1092,6 +1099,7 @@ impl AsyncChainState {
.find(|tx| &tx.changeset.txid == txid))
}


async fn get_indexed_block(
index: &mut Option<LiveSnapshot>,
height_or_hash: HeightOrHash,
@@ -1173,9 +1181,9 @@ impl AsyncChainState {
let result = emulator.apply_package(tip.height + 1, txs);
let _ = resp.send(result);
}
ChainStateCommand::GetTip { resp } => {
ChainStateCommand::GetServerInfo { resp } => {
let tip = chain_state.tip.read().expect("read meta").clone();
_ = resp.send(Ok(tip))
_ = resp.send(get_server_info(client, rpc, tip).await)
}
ChainStateCommand::GetSpace { hash, resp } => {
let result = chain_state.get_space_info(&hash);
@@ -1204,7 +1212,7 @@ impl AsyncChainState {
rpc,
chain_state,
)
.await;
.await;
let _ = resp.send(res);
}
ChainStateCommand::GetTxMeta { txid, resp } => {
@@ -1266,7 +1274,7 @@ impl AsyncChainState {
File::open(anchors_path)
.or_else(|e| Err(anyhow!("Could not open anchors file: {}", e)))?,
)
.or_else(|e| Err(anyhow!("Could not read anchors file: {}", e)))?;
.or_else(|e| Err(anyhow!("Could not read anchors file: {}", e)))?;
return Ok(anchors);
}

@@ -1498,9 +1506,9 @@ impl AsyncChainState {
resp_rx.await?
}

pub async fn get_tip(&self) -> anyhow::Result<ChainAnchor> {
pub async fn get_server_info(&self) -> anyhow::Result<ServerInfo> {
let (resp, resp_rx) = oneshot::channel();
self.sender.send(ChainStateCommand::GetTip { resp }).await?;
self.sender.send(ChainStateCommand::GetServerInfo { resp }).await?;
resp_rx.await?
}

@@ -1561,3 +1569,32 @@ fn get_space_key(space_or_hash: &str) -> Result<SpaceKey, ErrorObjectOwned> {

Ok(SpaceKey::from(hash))
}


async fn get_server_info(client: &reqwest::Client, rpc: &BitcoinRpc, tip: ChainAnchor) -> anyhow::Result<ServerInfo> {
#[derive(Deserialize)]
struct Info {
pub chain: String,
pub headers: u32,
pub blocks: u32,
}

let info: Info = rpc
.send_json(client, &rpc.get_blockchain_info())
.await
.map_err(|e| anyhow!("Could not retrieve blockchain info ({})", e))?;

Ok(ServerInfo {
network: info.chain,
tip,
chain: ChainInfo {
blocks: info.blocks,
headers: info.headers,
},
progress: if info.headers != 0 && info.headers >= tip.height {
tip.height as f32 / info.headers as f32
} else {
0.0
},
})
}
72 changes: 57 additions & 15 deletions client/src/source.rs
Original file line number Diff line number Diff line change
@@ -12,15 +12,15 @@ use std::{
use base64::Engine;
use bitcoin::{Block, BlockHash, Txid};
use hex::FromHexError;
use log::error;
use log::{error, warn};
use reqwest::StatusCode;
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use serde_json::Value;
use spaces_protocol::constants::ChainAnchor;
use spaces_wallet::{bitcoin, bitcoin::Transaction};
use threadpool::ThreadPool;
use tokio::time::Instant;

use spaces_protocol::bitcoin::Network;
use crate::{client::BlockSource, std_wait};

const BITCOIN_RPC_IN_WARMUP: i32 = -28; // Client still warming up
@@ -34,9 +34,11 @@ pub struct BitcoinRpc {
id: Arc<AtomicU64>,
auth_token: Option<String>,
url: String,
legacy: bool
}

pub struct BlockFetcher {
chain: Network,
src: BitcoinBlockSource,
job_id: Arc<AtomicUsize>,
sender: std::sync::mpsc::SyncSender<BlockEvent>,
@@ -121,18 +123,23 @@ trait ErrorForRpcBlocking {
}

impl BitcoinRpc {
pub fn new(url: &str, auth: BitcoinRpcAuth) -> Self {
pub fn new(url: &str, auth: BitcoinRpcAuth, legacy: bool) -> Self {
Self {
id: Default::default(),
auth_token: auth.to_token(),
url: url.to_string(),
legacy,
}
}

pub fn make_request(&self, method: &str, params: serde_json::Value) -> BitcoinRpcRequest {
pub fn make_request(&self, method: &str, params: Value) -> BitcoinRpcRequest {
let id = self.id.fetch_add(1, Ordering::Relaxed);
let body = serde_json::json!({
"jsonrpc": "1.0",
"jsonrpc": if self.legacy {
"1.0"
} else {
"2.0"
},
"id": id.to_string(),
"method": method,
"params": params,
@@ -381,12 +388,14 @@ impl BitcoinRpcAuth {

impl BlockFetcher {
pub fn new(
chain: Network,
src: BitcoinBlockSource,
num_workers: usize,
) -> (Self, std::sync::mpsc::Receiver<BlockEvent>) {
let (tx, rx) = std::sync::mpsc::sync_channel(12);
(
Self {
chain,
src,
job_id: Arc::new(AtomicUsize::new(0)),
sender: tx,
@@ -401,10 +410,15 @@ impl BlockFetcher {
}

fn should_sync(
expected_chain: Network,
source: &BitcoinBlockSource,
start: ChainAnchor,
) -> Result<Option<ChainAnchor>, BlockFetchError> {
let tip = source.get_best_chain()?;
let tip = match source.get_best_chain(Some(start.height), expected_chain)? {
Some(tip) => tip,
None => return Ok(None),
};

if start.height > tip.height {
return Err(BlockFetchError::BlockMismatch);
}
@@ -437,6 +451,7 @@ impl BlockFetcher {
let current_task = self.job_id.clone();
let task_sender = self.sender.clone();
let num_workers = self.num_workers;
let chain = self.chain;

_ = std::thread::spawn(move || {
let mut last_check = Instant::now() - Duration::from_secs(2);
@@ -451,7 +466,7 @@ impl BlockFetcher {
}
last_check = Instant::now();

let tip = match BlockFetcher::should_sync(&task_src, checkpoint) {
let tip = match BlockFetcher::should_sync(chain, &task_src, checkpoint) {
Ok(t) => t,
Err(e) => {
_ = task_sender.send(BlockEvent::Error(e));
@@ -872,21 +887,48 @@ impl BlockSource for BitcoinBlockSource {
.send_json_blocking(&self.client, &self.rpc.get_block_count())?)
}

fn get_best_chain(&self) -> Result<ChainAnchor, BitcoinRpcError> {
fn get_best_chain(&self, tip: Option<u32>, expected_chain: Network) -> Result<Option<ChainAnchor>, BitcoinRpcError> {
#[derive(Deserialize)]
struct Info {
#[serde(rename = "blocks")]
height: u64,
pub chain: String,
pub blocks: u32,
pub headers: u32,
#[serde(rename = "bestblockhash")]
hash: BlockHash,
pub best_block_hash: BlockHash,
}
let info: Info = self
.rpc
.send_json_blocking(&self.client, &self.rpc.get_blockchain_info())?;

Ok(ChainAnchor {
hash: info.hash,
height: info.height as _,
})
let expected_chain = match expected_chain {
Network::Bitcoin => "main",
Network::Regtest => "regtest",
_ => "test"
};
if info.chain != expected_chain {
warn!("Invalid chain from connected rpc node - expected {}, got {}", expected_chain, info.chain);
return Ok(None);
}

let synced = info.headers == info.blocks;
let best_chain = if !synced {
let block_hash = self.get_block_hash(info.blocks)?;
ChainAnchor {
hash: block_hash,
height: info.blocks,
}
} else {
ChainAnchor {
hash: info.best_block_hash,
height: info.headers,
}
};

// If the source is still syncing, and we have a higher tip, wait.
if !synced && tip.is_some_and(|tip| tip > info.blocks) {
return Ok(None);
}

Ok(Some(best_chain))
}
}
6 changes: 5 additions & 1 deletion client/src/sync.rs
Original file line number Diff line number Diff line change
@@ -189,7 +189,11 @@ impl Spaced {
start_block.hash, start_block.height
);

let (fetcher, receiver) = BlockFetcher::new(source.clone(), self.num_workers);
let (fetcher, receiver) = BlockFetcher::new(
self.network.fallback_network(),
source.clone(),
self.num_workers,
);
fetcher.start(start_block);

let mut shutdown_signal = shutdown.subscribe();
37 changes: 30 additions & 7 deletions client/src/wallets.rs
Original file line number Diff line number Diff line change
@@ -357,7 +357,20 @@ impl RpcWallet {
synced: bool,
) -> anyhow::Result<()> {
match command {
WalletCommand::GetInfo { resp } => _ = resp.send(Ok(wallet.get_info())),
WalletCommand::GetInfo { resp } =>{
let mut wallet_info = wallet.get_info();
let best_chain = source
.get_best_chain(Some(wallet_info.tip), wallet.config.network);
if let Ok(Some(best_chain)) = best_chain {
wallet_info.progress = if best_chain.height >= wallet_info.tip {
wallet_info.tip as f32 / best_chain.height as f32
} else {
0.0
}
}

_ = resp.send(Ok(wallet_info))
},
WalletCommand::BatchTx { request, resp } => {
if !synced && !request.force {
_ = resp.send(Err(anyhow::anyhow!("Wallet is syncing")));
@@ -462,14 +475,17 @@ impl RpcWallet {
protocol: &mut LiveSnapshot,
wallet: &SpacesWallet,
) -> Option<ChainAnchor> {
let bitcoin_tip = match bitcoin.get_best_chain() {
Ok(tip) => tip,
let wallet_tip = wallet.local_chain().tip();

let bitcoin_tip = match bitcoin.get_best_chain(Some(wallet_tip.height()), wallet.config.network) {
Ok(Some(tip)) => tip,
Ok(None) => return None,
Err(e) => {
warn!("Sync check failed: {}", e);
return None;
}
};
let wallet_tip = wallet.local_chain().tip();

let protocol_tip = match protocol.tip.read() {
Ok(tip) => tip.clone(),
Err(e) => {
@@ -493,7 +509,9 @@ impl RpcWallet {
shutdown: broadcast::Sender<()>,
num_workers: usize,
) -> anyhow::Result<()> {
let (fetcher, receiver) = BlockFetcher::new(source.clone(), num_workers);
let (fetcher, receiver) = BlockFetcher::new(network.fallback_network(),
source.clone(),
num_workers);

let mut wallet_tip = {
let tip = wallet.local_chain().tip();
@@ -547,8 +565,13 @@ impl RpcWallet {
}
BlockEvent::Error(e) if matches!(e, BlockFetchError::BlockMismatch) => {
let mut checkpoint_in_chain = None;
let best_chain = match source.get_best_chain() {
Ok(best) => best,
let best_chain = match source.get_best_chain(Some(wallet_tip.height), network.fallback_network()) {
Ok(Some(best)) => best,
Ok(None) => {
warn!("Waiting for source to sync");
fetcher.restart(wallet_tip, &receiver);
continue;
}
Err(error) => {
warn!("Wallet error: {}", error);
fetcher.restart(wallet_tip, &receiver);
4 changes: 3 additions & 1 deletion client/tests/fetcher_tests.rs
Original file line number Diff line number Diff line change
@@ -8,6 +8,7 @@ use spaces_client::source::{
BitcoinBlockSource, BitcoinRpc, BitcoinRpcAuth, BlockEvent, BlockFetcher,
};
use spaces_protocol::{bitcoin::BlockHash, constants::ChainAnchor};
use spaces_protocol::bitcoin::Network;
use spaces_testutil::TestRig;

async fn setup(blocks: u64) -> Result<(TestRig, u64, BlockHash)> {
@@ -27,8 +28,9 @@ fn test_block_fetching_from_bitcoin_rpc() -> Result<()> {
let fetcher_rpc = BitcoinBlockSource::new(BitcoinRpc::new(
&rig.bitcoind.rpc_url(),
BitcoinRpcAuth::UserPass("user".to_string(), "password".to_string()),
true
));
let (fetcher, receiver) = BlockFetcher::new(fetcher_rpc.clone(), 8);
let (fetcher, receiver) = BlockFetcher::new(Network::Regtest, fetcher_rpc.clone(), 8);
fetcher.start(ChainAnchor { hash, height: 0 });

let timeout = Duration::from_secs(5);
2 changes: 2 additions & 0 deletions wallet/src/lib.rs
Original file line number Diff line number Diff line change
@@ -104,6 +104,7 @@ pub struct WalletInfo {
pub start_block: u32,
pub tip: u32,
pub descriptors: Vec<DescriptorInfo>,
pub progress: f32,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
@@ -589,6 +590,7 @@ impl SpacesWallet {
start_block: self.config.start_block,
tip: self.internal.local_chain().tip().height(),
descriptors,
progress: 0.0,
}
}