diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 340b7f898d9..f729cecc012 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -12,52 +12,68 @@ jobs: beta, # 1.41.1 is MSRV for Rust-Lightning, lightning-invoice, and lightning-persister 1.41.1, - # 1.45.2 is MSRV for lightning-net-tokio, lightning-block-sync, lightning-background-processor, and coverage generation + # 1.45.2 is MSRV for lightning-net-tokio, lightning-block-sync, lightning-background-processor 1.45.2, # 1.47.0 will be the MSRV for no-std builds using hashbrown once core2 is updated - 1.47.0] + 1.47.0, + # 1.59.0 is the MSRV for lightning-transaction-sync + 1.59.0] include: - toolchain: stable build-net-tokio: true build-no-std: true build-futures: true + build-tx-sync: true + coverage: true - toolchain: stable platform: macos-latest build-net-tokio: true build-no-std: true build-futures: true + build-tx-sync: true - toolchain: beta platform: macos-latest build-net-tokio: true build-no-std: true build-futures: true + build-tx-sync: true - toolchain: stable platform: windows-latest build-net-tokio: true build-no-std: true build-futures: true + build-tx-sync: false - toolchain: beta platform: windows-latest build-net-tokio: true build-no-std: true build-futures: true + build-tx-sync: false - toolchain: beta build-net-tokio: true build-no-std: true build-futures: true + build-tx-sync: true - toolchain: 1.41.1 build-no-std: false test-log-variants: true build-futures: false + build-tx-sync: false - toolchain: 1.45.2 build-net-old-tokio: true build-net-tokio: true build-no-std: false build-futures: true - coverage: true + build-tx-sync: false - toolchain: 1.47.0 build-futures: true build-no-std: true + build-tx-sync: false + - toolchain: 1.59.0 + build-net-tokio: false + build-no-std: false + build-futures: false + build-tx-sync: true runs-on: ${{ matrix.platform }} steps: - name: Checkout source code @@ -73,10 +89,10 @@ jobs: run: cargo update -p tokio --precise "1.14.0" --verbose env: CARGO_NET_GIT_FETCH_WITH_CLI: "true" - - name: Build on Rust ${{ matrix.toolchain }} with net-tokio - if: "matrix.build-net-tokio && !matrix.coverage" + - name: Build on Rust ${{ matrix.toolchain }} with net-tokio and tx-sync + if: "matrix.build-net-tokio && !matrix.coverage && matrix.build-tx-sync" run: cargo build --verbose --color always - - name: Build on Rust ${{ matrix.toolchain }} with net-tokio and full code-linking for coverage generation + - name: Build on Rust ${{ matrix.toolchain }} with net-tokio, tx-sync, and full code-linking for coverage generation if: matrix.coverage run: RUSTFLAGS="-C link-dead-code" cargo build --verbose --color always - name: Build on Rust ${{ matrix.toolchain }} @@ -108,14 +124,32 @@ jobs: RUSTFLAGS="-C link-dead-code" cargo build --verbose --color always --features rpc-client RUSTFLAGS="-C link-dead-code" cargo build --verbose --color always --features rpc-client,rest-client RUSTFLAGS="-C link-dead-code" cargo build --verbose --color always --features rpc-client,rest-client,tokio + - name: Build Transaction Sync Clients on Rust ${{ matrix.toolchain }} with features + if: "matrix.build-tx-sync && !matrix.coverage" + run: | + cd lightning-transaction-sync + cargo build --verbose --color always --features esplora-blocking + cargo build --verbose --color always --features esplora-async + - name: Build transaction sync clients on Rust ${{ matrix.toolchain }} with features and full code-linking for coverage generation + if: "matrix.build-tx-sync && matrix.coverage" + run: | + cd lightning-transaction-sync + RUSTFLAGS="-C link-dead-code" cargo build --verbose --color always --features esplora-blocking + RUSTFLAGS="-C link-dead-code" cargo build --verbose --color always --features esplora-async + - name: Test transaction sync clients on Rust ${{ matrix.toolchain }} with features + if: "matrix.build-tx-sync" + run: | + cd lightning-transaction-sync + cargo test --verbose --color always --features esplora-blocking + cargo test --verbose --color always --features esplora-async - name: Test backtrace-debug builds on Rust ${{ matrix.toolchain }} if: "matrix.toolchain == 'stable'" run: | cd lightning && cargo test --verbose --color always --features backtrace - name: Test on Rust ${{ matrix.toolchain }} with net-tokio - if: "matrix.build-net-tokio && !matrix.coverage" + if: "matrix.build-net-tokio && !matrix.coverage && matrix.build-tx-sync" run: cargo test --verbose --color always - - name: Test on Rust ${{ matrix.toolchain }} with net-tokio and full code-linking for coverage generation + - name: Test on Rust ${{ matrix.toolchain }} with net-tokio, tx-sync, and full code-linking for coverage generation if: matrix.coverage run: RUSTFLAGS="-C link-dead-code" cargo test --verbose --color always - name: Test no-std builds on Rust ${{ matrix.toolchain }} @@ -349,7 +383,7 @@ jobs: linting: runs-on: ubuntu-latest env: - TOOLCHAIN: 1.47.0 + TOOLCHAIN: stable steps: - name: Checkout source code uses: actions/checkout@v3 diff --git a/Cargo.toml b/Cargo.toml index 89b92a8c6e4..e8565e7ac09 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,6 +3,7 @@ members = [ "lightning", "lightning-block-sync", + "lightning-transaction-sync", "lightning-invoice", "lightning-net-tokio", "lightning-persister", diff --git a/lightning-transaction-sync/Cargo.toml b/lightning-transaction-sync/Cargo.toml new file mode 100644 index 00000000000..ae29753ecc4 --- /dev/null +++ b/lightning-transaction-sync/Cargo.toml @@ -0,0 +1,33 @@ +[package] +name = "lightning-transaction-sync" +version = "0.0.113" +authors = ["Elias Rohrer"] +license = "MIT OR Apache-2.0" +repository = "http://github.com/lightningdevkit/rust-lightning" +description = """ +Utilities for syncing LDK via the transaction-based `Confirm` interface. +""" +edition = "2018" + +[package.metadata.docs.rs] +all-features = true +rustdoc-args = ["--cfg", "docsrs"] + +[features] +default = [] +esplora-async = ["async-interface", "esplora-client/async", "futures"] +esplora-blocking = ["esplora-client/blocking"] +async-interface = [] + +[dependencies] +lightning = { version = "0.0.113", path = "../lightning" } +bitcoin = "0.29.0" +bdk-macros = "0.6" +futures = { version = "0.3", optional = true } +esplora-client = { version = "0.3.0", default-features = false, optional = true } + +[dev-dependencies] +electrsd = { version = "0.22.0", features = ["legacy", "esplora_a33e97e1", "bitcoind_23_0"] } +electrum-client = "0.12.0" +once_cell = "1.16.0" +tokio = { version = "1.14.0", features = ["full"] } diff --git a/lightning-transaction-sync/src/common.rs b/lightning-transaction-sync/src/common.rs new file mode 100644 index 00000000000..a6ee61e90f2 --- /dev/null +++ b/lightning-transaction-sync/src/common.rs @@ -0,0 +1,75 @@ +use lightning::chain::WatchedOutput; +use bitcoin::{Txid, BlockHash, Transaction, BlockHeader, OutPoint}; + +use std::collections::{HashSet, HashMap}; + + +// Represents the current state. +pub(crate) struct SyncState { + // Transactions that were previously processed, but must not be forgotten + // yet since they still need to be monitored for confirmation on-chain. + pub watched_transactions: HashSet, + // Outputs that were previously processed, but must not be forgotten yet as + // as we still need to monitor any spends on-chain. + pub watched_outputs: HashMap, + // The tip hash observed during our last sync. + pub last_sync_hash: Option, + // Indicates whether we need to resync, e.g., after encountering an error. + pub pending_sync: bool, +} + +impl SyncState { + pub fn new() -> Self { + Self { + watched_transactions: HashSet::new(), + watched_outputs: HashMap::new(), + last_sync_hash: None, + pending_sync: false, + } + } +} + + +// A queue that is to be filled by `Filter` and drained during the next syncing round. +pub(crate) struct FilterQueue { + // Transactions that were registered via the `Filter` interface and have to be processed. + pub transactions: HashSet, + // Outputs that were registered via the `Filter` interface and have to be processed. + pub outputs: HashMap, +} + +impl FilterQueue { + pub fn new() -> Self { + Self { + transactions: HashSet::new(), + outputs: HashMap::new(), + } + } + + // Processes the transaction and output queues and adds them to the given [`SyncState`]. + // + // Returns `true` if new items had been registered. + pub fn process_queues(&mut self, sync_state: &mut SyncState) -> bool { + let mut pending_registrations = false; + + if !self.transactions.is_empty() { + pending_registrations = true; + + sync_state.watched_transactions.extend(self.transactions.drain()); + } + + if !self.outputs.is_empty() { + pending_registrations = true; + + sync_state.watched_outputs.extend(self.outputs.drain()); + } + pending_registrations + } +} + +pub(crate) struct ConfirmedTx { + pub tx: Transaction, + pub block_header: BlockHeader, + pub block_height: u32, + pub pos: usize, +} diff --git a/lightning-transaction-sync/src/error.rs b/lightning-transaction-sync/src/error.rs new file mode 100644 index 00000000000..0a529d063ec --- /dev/null +++ b/lightning-transaction-sync/src/error.rs @@ -0,0 +1,63 @@ +use std::fmt; + +#[derive(Debug)] +/// An error that possibly needs to be handled by the user. +pub enum TxSyncError { + /// A transaction sync failed and needs to be retried eventually. + Failed, +} + +impl std::error::Error for TxSyncError {} + +impl fmt::Display for TxSyncError { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match *self { + Self::Failed => write!(f, "Failed to conduct transaction sync."), + } + } +} + +#[derive(Debug)] +#[cfg(any(feature = "esplora-blocking", feature = "esplora-async"))] +pub(crate) enum InternalError { + /// A transaction sync failed and needs to be retried eventually. + Failed, + /// An inconsisteny was encounterd during transaction sync. + Inconsistency, +} + +#[cfg(any(feature = "esplora-blocking", feature = "esplora-async"))] +impl fmt::Display for InternalError { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match *self { + Self::Failed => write!(f, "Failed to conduct transaction sync."), + Self::Inconsistency => { + write!(f, "Encountered an inconsisteny during transaction sync.") + } + } + } +} + +#[cfg(any(feature = "esplora-blocking", feature = "esplora-async"))] +impl std::error::Error for InternalError {} + +#[cfg(any(feature = "esplora-blocking", feature = "esplora-async"))] +impl From for TxSyncError { + fn from(_e: esplora_client::Error) -> Self { + Self::Failed + } +} + +#[cfg(any(feature = "esplora-blocking", feature = "esplora-async"))] +impl From for InternalError { + fn from(_e: esplora_client::Error) -> Self { + Self::Failed + } +} + +#[cfg(any(feature = "esplora-blocking", feature = "esplora-async"))] +impl From for TxSyncError { + fn from(_e: InternalError) -> Self { + Self::Failed + } +} diff --git a/lightning-transaction-sync/src/esplora.rs b/lightning-transaction-sync/src/esplora.rs new file mode 100644 index 00000000000..807ef807500 --- /dev/null +++ b/lightning-transaction-sync/src/esplora.rs @@ -0,0 +1,383 @@ +use crate::error::{TxSyncError, InternalError}; +use crate::common::{SyncState, FilterQueue, ConfirmedTx}; + +use lightning::util::logger::Logger; +use lightning::{log_error, log_given_level, log_info, log_internal, log_debug, log_trace}; +use lightning::chain::WatchedOutput; +use lightning::chain::{Confirm, Filter}; + +use bitcoin::{BlockHash, Script, Txid}; + +use esplora_client::Builder; +#[cfg(feature = "async-interface")] +use esplora_client::r#async::AsyncClient; +#[cfg(not(feature = "async-interface"))] +use esplora_client::blocking::BlockingClient; + +use std::collections::HashSet; +use core::ops::Deref; + +/// Synchronizes LDK with a given [`Esplora`] server. +/// +/// Needs to be registered with a [`ChainMonitor`] via the [`Filter`] interface to be informed of +/// transactions and outputs to monitor for on-chain confirmation, unconfirmation, and +/// reconfirmation. +/// +/// Note that registration via [`Filter`] needs to happen before any calls to +/// [`Watch::watch_channel`] to ensure we get notified of the items to monitor. +/// +/// This uses and exposes either a blocking or async client variant dependent on whether the +/// `esplora-blocking` or the `esplora-async` feature is enabled. +/// +/// [`Esplora`]: https://github.com/Blockstream/electrs +/// [`ChainMonitor`]: lightning::chain::chainmonitor::ChainMonitor +/// [`Watch::watch_channel`]: lightning::chain::Watch::watch_channel +/// [`Filter`]: lightning::chain::Filter +pub struct EsploraSyncClient +where + L::Target: Logger, +{ + sync_state: MutexType, + queue: std::sync::Mutex, + client: EsploraClientType, + logger: L, +} + +impl EsploraSyncClient +where + L::Target: Logger, +{ + /// Returns a new [`EsploraSyncClient`] object. + pub fn new(server_url: String, logger: L) -> Self { + let builder = Builder::new(&server_url); + #[cfg(not(feature = "async-interface"))] + let client = builder.build_blocking().unwrap(); + #[cfg(feature = "async-interface")] + let client = builder.build_async().unwrap(); + + EsploraSyncClient::from_client(client, logger) + } + + /// Returns a new [`EsploraSyncClient`] object using the given Esplora client. + pub fn from_client(client: EsploraClientType, logger: L) -> Self { + let sync_state = MutexType::new(SyncState::new()); + let queue = std::sync::Mutex::new(FilterQueue::new()); + Self { + sync_state, + queue, + client, + logger, + } + } + + /// Synchronizes the given `confirmables` via their [`Confirm`] interface implementations. This + /// method should be called regularly to keep LDK up-to-date with current chain data. + /// + /// For example, instances of [`ChannelManager`] and [`ChainMonitor`] can be informed about the + /// newest on-chain activity related to the items previously registered via the [`Filter`] + /// interface. + /// + /// [`Confirm`]: lightning::chain::Confirm + /// [`ChainMonitor`]: lightning::chain::chainmonitor::ChainMonitor + /// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager + /// [`Filter`]: lightning::chain::Filter + #[maybe_async] + pub fn sync(&self, confirmables: Vec<&(dyn Confirm + Sync + Send)>) -> Result<(), TxSyncError> { + // This lock makes sure we're syncing once at a time. + #[cfg(not(feature = "async-interface"))] + let mut sync_state = self.sync_state.lock().unwrap(); + #[cfg(feature = "async-interface")] + let mut sync_state = self.sync_state.lock().await; + + log_info!(self.logger, "Starting transaction sync."); + + let mut tip_hash = maybe_await!(self.client.get_tip_hash())?; + + loop { + let pending_registrations = self.queue.lock().unwrap().process_queues(&mut sync_state); + let tip_is_new = Some(tip_hash) != sync_state.last_sync_hash; + + // We loop until any registered transactions have been processed at least once, or the + // tip hasn't been updated during the last iteration. + if !sync_state.pending_sync && !pending_registrations && !tip_is_new { + // Nothing to do. + break; + } else { + // Update the known tip to the newest one. + if tip_is_new { + // First check for any unconfirmed transactions and act on it immediately. + match maybe_await!(self.get_unconfirmed_transactions(&confirmables)) { + Ok(unconfirmed_txs) => { + // Double-check the tip hash. If it changed, a reorg happened since + // we started syncing and we need to restart last-minute. + let check_tip_hash = maybe_await!(self.client.get_tip_hash())?; + if check_tip_hash != tip_hash { + tip_hash = check_tip_hash; + continue; + } + + self.sync_unconfirmed_transactions(&mut sync_state, &confirmables, unconfirmed_txs); + }, + Err(err) => { + // (Semi-)permanent failure, retry later. + log_error!(self.logger, "Failed during transaction sync, aborting."); + sync_state.pending_sync = true; + return Err(TxSyncError::from(err)); + } + } + + match maybe_await!(self.sync_best_block_updated(&confirmables, &tip_hash)) { + Ok(()) => {} + Err(InternalError::Inconsistency) => { + // Immediately restart syncing when we encounter any inconsistencies. + log_debug!(self.logger, "Encountered inconsistency during transaction sync, restarting."); + sync_state.pending_sync = true; + continue; + } + Err(err) => { + // (Semi-)permanent failure, retry later. + sync_state.pending_sync = true; + return Err(TxSyncError::from(err)); + } + } + } + + match maybe_await!(self.get_confirmed_transactions(&sync_state)) { + Ok(confirmed_txs) => { + // Double-check the tip hash. If it changed, a reorg happened since + // we started syncing and we need to restart last-minute. + let check_tip_hash = maybe_await!(self.client.get_tip_hash())?; + if check_tip_hash != tip_hash { + tip_hash = check_tip_hash; + continue; + } + + self.sync_confirmed_transactions( + &mut sync_state, + &confirmables, + confirmed_txs, + ); + } + Err(InternalError::Inconsistency) => { + // Immediately restart syncing when we encounter any inconsistencies. + log_debug!(self.logger, "Encountered inconsistency during transaction sync, restarting."); + sync_state.pending_sync = true; + continue; + } + Err(err) => { + // (Semi-)permanent failure, retry later. + log_error!(self.logger, "Failed during transaction sync, aborting."); + sync_state.pending_sync = true; + return Err(TxSyncError::from(err)); + } + } + sync_state.last_sync_hash = Some(tip_hash); + sync_state.pending_sync = false; + } + } + log_info!(self.logger, "Finished transaction sync."); + Ok(()) + } + + #[maybe_async] + fn sync_best_block_updated( + &self, confirmables: &Vec<&(dyn Confirm + Sync + Send)>, tip_hash: &BlockHash, + ) -> Result<(), InternalError> { + + // Inform the interface of the new block. + let tip_header = maybe_await!(self.client.get_header_by_hash(tip_hash))?; + let tip_status = maybe_await!(self.client.get_block_status(&tip_hash))?; + if tip_status.in_best_chain { + if let Some(tip_height) = tip_status.height { + for c in confirmables { + c.best_block_updated(&tip_header, tip_height); + } + } + } else { + return Err(InternalError::Inconsistency); + } + Ok(()) + } + + fn sync_confirmed_transactions( + &self, sync_state: &mut SyncState, confirmables: &Vec<&(dyn Confirm + Sync + Send)>, confirmed_txs: Vec, + ) { + for ctx in confirmed_txs { + for c in confirmables { + c.transactions_confirmed( + &ctx.block_header, + &[(ctx.pos, &ctx.tx)], + ctx.block_height, + ); + } + + sync_state.watched_transactions.remove(&ctx.tx.txid()); + + for input in &ctx.tx.input { + sync_state.watched_outputs.remove(&input.previous_output); + } + } + } + + #[maybe_async] + fn get_confirmed_transactions( + &self, sync_state: &SyncState, + ) -> Result, InternalError> { + + // First, check the confirmation status of registered transactions as well as the + // status of dependent transactions of registered outputs. + + let mut confirmed_txs = Vec::new(); + + for txid in &sync_state.watched_transactions { + if let Some(confirmed_tx) = maybe_await!(self.get_confirmed_tx(&txid, None, None))? { + confirmed_txs.push(confirmed_tx); + } + } + + for (_, output) in &sync_state.watched_outputs { + if let Some(output_status) = maybe_await!(self.client + .get_output_status(&output.outpoint.txid, output.outpoint.index as u64))? + { + if let Some(spending_txid) = output_status.txid { + if let Some(spending_tx_status) = output_status.status { + if let Some(confirmed_tx) = maybe_await!(self + .get_confirmed_tx( + &spending_txid, + spending_tx_status.block_hash, + spending_tx_status.block_height, + ))? + { + confirmed_txs.push(confirmed_tx); + } + } + } + } + } + + // Sort all confirmed transactions first by block height, then by in-block + // position, and finally feed them to the interface in order. + confirmed_txs.sort_unstable_by(|tx1, tx2| { + tx1.block_height.cmp(&tx2.block_height).then_with(|| tx1.pos.cmp(&tx2.pos)) + }); + + Ok(confirmed_txs) + } + + #[maybe_async] + fn get_confirmed_tx( + &self, txid: &Txid, expected_block_hash: Option, known_block_height: Option, + ) -> Result, InternalError> { + if let Some(merkle_block) = maybe_await!(self.client.get_merkle_block(&txid))? { + let block_header = merkle_block.header; + let block_hash = block_header.block_hash(); + if let Some(expected_block_hash) = expected_block_hash { + if expected_block_hash != block_hash { + log_trace!(self.logger, "Inconsistency: Tx {} expected in block {}, but is confirmed in {}", txid, expected_block_hash, block_hash); + return Err(InternalError::Inconsistency); + } + } + + let mut matches = Vec::new(); + let mut indexes = Vec::new(); + let _ = merkle_block.txn.extract_matches(&mut matches, &mut indexes); + if indexes.len() != 1 || matches.len() != 1 || matches[0] != *txid { + log_error!(self.logger, "Retrieved Merkle block for txid {} doesn't match expectations. This should not happen. Please verify server integrity.", txid); + return Err(InternalError::Failed); + } + + let pos = *indexes.get(0).ok_or(InternalError::Failed)? as usize; + if let Some(tx) = maybe_await!(self.client.get_tx(&txid))? { + if let Some(block_height) = known_block_height { + // We can take a shortcut here if a previous call already gave us the height. + return Ok(Some(ConfirmedTx { tx, block_header, pos, block_height })); + } + + let block_status = maybe_await!(self.client.get_block_status(&block_hash))?; + if let Some(block_height) = block_status.height { + return Ok(Some(ConfirmedTx { tx, block_header, pos, block_height })); + } else { + // If any previously-confirmed block suddenly is no longer confirmed, we found + // an inconsistency and should start over. + log_trace!(self.logger, "Inconsistency: Tx {} was unconfirmed during syncing.", txid); + return Err(InternalError::Inconsistency); + } + } + } + Ok(None) + } + + #[maybe_async] + fn get_unconfirmed_transactions( + &self, confirmables: &Vec<&(dyn Confirm + Sync + Send)>, + ) -> Result, InternalError> { + // Query the interface for relevant txids and check whether the relevant blocks are still + // in the best chain, mark them unconfirmed otherwise + let relevant_txids = confirmables + .iter() + .flat_map(|c| c.get_relevant_txids()) + .collect::)>>(); + + let mut unconfirmed_txs = Vec::new(); + + for (txid, block_hash_opt) in relevant_txids { + if let Some(block_hash) = block_hash_opt { + let block_status = maybe_await!(self.client.get_block_status(&block_hash))?; + if block_status.in_best_chain { + // Skip if the block in question is still confirmed. + continue; + } + + unconfirmed_txs.push(txid); + } else { + log_error!(self.logger, "Untracked confirmation of funding transaction. Please ensure none of your channels had been created with LDK prior to version 0.0.113!"); + panic!("Untracked confirmation of funding transaction. Please ensure none of your channels had been created with LDK prior to version 0.0.113!"); + } + } + Ok(unconfirmed_txs) + } + + fn sync_unconfirmed_transactions( + &self, sync_state: &mut SyncState, confirmables: &Vec<&(dyn Confirm + Sync + Send)>, unconfirmed_txs: Vec, + ) { + for txid in unconfirmed_txs { + for c in confirmables { + c.transaction_unconfirmed(&txid); + } + + sync_state.watched_transactions.insert(txid); + } + } + + /// Returns a reference to the underlying esplora client. + pub fn client(&self) -> &EsploraClientType { + &self.client + } +} + +#[cfg(feature = "async-interface")] +type MutexType = futures::lock::Mutex; +#[cfg(not(feature = "async-interface"))] +type MutexType = std::sync::Mutex; + +// The underlying client type. +#[cfg(feature = "async-interface")] +type EsploraClientType = AsyncClient; +#[cfg(not(feature = "async-interface"))] +type EsploraClientType = BlockingClient; + + +impl Filter for EsploraSyncClient +where + L::Target: Logger, +{ + fn register_tx(&self, txid: &Txid, _script_pubkey: &Script) { + let mut locked_queue = self.queue.lock().unwrap(); + locked_queue.transactions.insert(*txid); + } + + fn register_output(&self, output: WatchedOutput) { + let mut locked_queue = self.queue.lock().unwrap(); + locked_queue.outputs.insert(output.outpoint.into_bitcoin_outpoint(), output); + } +} diff --git a/lightning-transaction-sync/src/lib.rs b/lightning-transaction-sync/src/lib.rs new file mode 100644 index 00000000000..791490d9dfc --- /dev/null +++ b/lightning-transaction-sync/src/lib.rs @@ -0,0 +1,82 @@ +//! Provides utilities for syncing LDK via the transaction-based [`Confirm`] interface. +//! +//! The provided synchronization clients need to be registered with a [`ChainMonitor`] via the +//! [`Filter`] interface. Then, the respective `fn sync` needs to be called with the [`Confirm`] +//! implementations to be synchronized, i.e., usually instances of [`ChannelManager`] and +//! [`ChainMonitor`]. +//! +//! ## Features and Backend Support +//! +//!- `esplora_blocking` enables syncing against an Esplora backend based on a blocking client. +//!- `esplora_async` enables syncing against an Esplora backend based on an async client. +//! +//! ## Version Compatibility +//! +//! Currently this crate is compatible with nodes that were created with LDK version 0.0.113 and above. +//! +//! ## Usage Example: +//! +//! ```ignore +//! let tx_sync = Arc::new(EsploraSyncClient::new( +//! esplora_server_url, +//! Arc::clone(&some_logger), +//! )); +//! +//! let chain_monitor = Arc::new(ChainMonitor::new( +//! Some(Arc::clone(&tx_sync)), +//! Arc::clone(&some_broadcaster), +//! Arc::clone(&some_logger), +//! Arc::clone(&some_fee_estimator), +//! Arc::clone(&some_persister), +//! )); +//! +//! let channel_manager = Arc::new(ChannelManager::new( +//! Arc::clone(&some_fee_estimator), +//! Arc::clone(&chain_monitor), +//! Arc::clone(&some_broadcaster), +//! Arc::clone(&some_router), +//! Arc::clone(&some_logger), +//! Arc::clone(&some_entropy_source), +//! Arc::clone(&some_node_signer), +//! Arc::clone(&some_signer_provider), +//! user_config, +//! chain_params, +//! )); +//! +//! let confirmables = vec![ +//! &*channel_manager as &(dyn Confirm + Sync + Send), +//! &*chain_monitor as &(dyn Confirm + Sync + Send), +//! ]; +//! +//! tx_sync.sync(confirmables).unwrap(); +//! ``` +//! +//! [`Confirm`]: lightning::chain::Confirm +//! [`Filter`]: lightning::chain::Filter +//! [`ChainMonitor`]: lightning::chain::chainmonitor::ChainMonitor +//! [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager + +// Prefix these with `rustdoc::` when we update our MSRV to be >= 1.52 to remove warnings. +#![deny(broken_intra_doc_links)] +#![deny(private_intra_doc_links)] + +#![deny(missing_docs)] +#![deny(unsafe_code)] + +#![cfg_attr(docsrs, feature(doc_auto_cfg))] + +#[cfg(any(feature = "esplora-blocking", feature = "esplora-async"))] +#[macro_use] +extern crate bdk_macros; + +#[cfg(any(feature = "esplora-blocking", feature = "esplora-async"))] +mod esplora; + +#[cfg(any(feature = "esplora-blocking", feature = "esplora-async"))] +mod common; + +mod error; +pub use error::TxSyncError; + +#[cfg(any(feature = "esplora-blocking", feature = "esplora-async"))] +pub use esplora::EsploraSyncClient; diff --git a/lightning-transaction-sync/tests/integration_tests.rs b/lightning-transaction-sync/tests/integration_tests.rs new file mode 100644 index 00000000000..5c5ee038936 --- /dev/null +++ b/lightning-transaction-sync/tests/integration_tests.rs @@ -0,0 +1,335 @@ +#![cfg(any(feature = "esplora-blocking", feature = "esplora-async"))] +use lightning_transaction_sync::EsploraSyncClient; +use lightning::chain::{Confirm, Filter}; +use lightning::chain::transaction::TransactionData; +use lightning::util::logger::{Logger, Record}; + +use electrsd::{bitcoind, bitcoind::BitcoinD, ElectrsD}; +use bitcoin::{Amount, Txid, BlockHash, BlockHeader}; +use bitcoin::blockdata::constants::genesis_block; +use bitcoin::network::constants::Network; +use electrsd::bitcoind::bitcoincore_rpc::bitcoincore_rpc_json::AddressType; +use bitcoind::bitcoincore_rpc::RpcApi; +use electrum_client::ElectrumApi; + +use once_cell::sync::OnceCell; + +use std::env; +use std::sync::Mutex; +use std::time::Duration; +use std::collections::{HashMap, HashSet}; + +static BITCOIND: OnceCell = OnceCell::new(); +static ELECTRSD: OnceCell = OnceCell::new(); +static PREMINE: OnceCell<()> = OnceCell::new(); +static MINER_LOCK: OnceCell> = OnceCell::new(); + +fn get_bitcoind() -> &'static BitcoinD { + BITCOIND.get_or_init(|| { + let bitcoind_exe = + env::var("BITCOIND_EXE").ok().or_else(|| bitcoind::downloaded_exe_path().ok()).expect( + "you need to provide an env var BITCOIND_EXE or specify a bitcoind version feature", + ); + let mut conf = bitcoind::Conf::default(); + conf.network = "regtest"; + BitcoinD::with_conf(bitcoind_exe, &conf).unwrap() + }) +} + +fn get_electrsd() -> &'static ElectrsD { + ELECTRSD.get_or_init(|| { + let bitcoind = get_bitcoind(); + let electrs_exe = + env::var("ELECTRS_EXE").ok().or_else(electrsd::downloaded_exe_path).expect( + "you need to provide env var ELECTRS_EXE or specify an electrsd version feature", + ); + let mut conf = electrsd::Conf::default(); + conf.http_enabled = true; + conf.network = "regtest"; + ElectrsD::with_conf(electrs_exe, &bitcoind, &conf).unwrap() + }) +} + +fn generate_blocks_and_wait(num: usize) { + let miner_lock = MINER_LOCK.get_or_init(|| Mutex::new(())); + let _miner = miner_lock.lock().unwrap(); + let cur_height = get_bitcoind().client.get_block_count().unwrap(); + let address = get_bitcoind().client.get_new_address(Some("test"), Some(AddressType::Legacy)).unwrap(); + let _block_hashes = get_bitcoind().client.generate_to_address(num as u64, &address).unwrap(); + wait_for_block(cur_height as usize + num); +} + +fn wait_for_block(min_height: usize) { + let mut header = get_electrsd().client.block_headers_subscribe().unwrap(); + loop { + if header.height >= min_height { + break; + } + header = exponential_backoff_poll(|| { + get_electrsd().trigger().unwrap(); + get_electrsd().client.ping().unwrap(); + get_electrsd().client.block_headers_pop().unwrap() + }); + } +} + +fn exponential_backoff_poll(mut poll: F) -> T +where + F: FnMut() -> Option, +{ + let mut delay = Duration::from_millis(64); + let mut tries = 0; + loop { + match poll() { + Some(data) => break data, + None if delay.as_millis() < 512 => { + delay = delay.mul_f32(2.0); + tries += 1; + } + None if tries == 10 => panic!("Exceeded our maximum wait time."), + None => tries += 1, + } + + std::thread::sleep(delay); + } +} + +fn premine() { + PREMINE.get_or_init(|| { + generate_blocks_and_wait(101); + }); +} + +#[derive(Debug)] +enum TestConfirmableEvent { + Confirmed(Txid, BlockHash, u32), + Unconfirmed(Txid), + BestBlockUpdated(BlockHash, u32), +} + +struct TestConfirmable { + pub confirmed_txs: Mutex>, + pub unconfirmed_txs: Mutex>, + pub best_block: Mutex<(BlockHash, u32)>, + pub events: Mutex>, +} + +impl TestConfirmable { + pub fn new() -> Self { + let genesis_hash = genesis_block(Network::Regtest).block_hash(); + Self { + confirmed_txs: Mutex::new(HashMap::new()), + unconfirmed_txs: Mutex::new(HashSet::new()), + best_block: Mutex::new((genesis_hash, 0)), + events: Mutex::new(Vec::new()), + } + } +} + +impl Confirm for TestConfirmable { + fn transactions_confirmed(&self, header: &BlockHeader, txdata: &TransactionData<'_>, height: u32) { + for (_, tx) in txdata { + let txid = tx.txid(); + let block_hash = header.block_hash(); + self.confirmed_txs.lock().unwrap().insert(txid, (block_hash, height)); + self.unconfirmed_txs.lock().unwrap().remove(&txid); + self.events.lock().unwrap().push(TestConfirmableEvent::Confirmed(txid, block_hash, height)); + } + } + + fn transaction_unconfirmed(&self, txid: &Txid) { + self.unconfirmed_txs.lock().unwrap().insert(*txid); + self.confirmed_txs.lock().unwrap().remove(txid); + self.events.lock().unwrap().push(TestConfirmableEvent::Unconfirmed(*txid)); + } + + fn best_block_updated(&self, header: &BlockHeader, height: u32) { + let block_hash = header.block_hash(); + *self.best_block.lock().unwrap() = (block_hash, height); + self.events.lock().unwrap().push(TestConfirmableEvent::BestBlockUpdated(block_hash, height)); + } + + fn get_relevant_txids(&self) -> Vec<(Txid, Option)> { + self.confirmed_txs.lock().unwrap().iter().map(|(&txid, (hash, _))| (txid, Some(*hash))).collect::>() + } +} + +pub struct TestLogger {} + +impl Logger for TestLogger { + fn log(&self, record: &Record) { + println!("{} -- {}", + record.level, + record.args); + } +} + +#[test] +#[cfg(feature = "esplora-blocking")] +fn test_esplora_syncs() { + premine(); + let mut logger = TestLogger {}; + let esplora_url = format!("http://{}", get_electrsd().esplora_url.as_ref().unwrap()); + let tx_sync = EsploraSyncClient::new(esplora_url, &mut logger); + let confirmable = TestConfirmable::new(); + + // Check we pick up on new best blocks + let expected_height = 0u32; + assert_eq!(confirmable.best_block.lock().unwrap().1, expected_height); + + tx_sync.sync(vec![&confirmable]).unwrap(); + + let expected_height = get_bitcoind().client.get_block_count().unwrap() as u32; + assert_eq!(confirmable.best_block.lock().unwrap().1, expected_height); + + let events = std::mem::take(&mut *confirmable.events.lock().unwrap()); + assert_eq!(events.len(), 1); + + // Check registered confirmed transactions are marked confirmed + let new_address = get_bitcoind().client.get_new_address(Some("test"), Some(AddressType::Legacy)).unwrap(); + let txid = get_bitcoind().client.send_to_address(&new_address, Amount::from_sat(5000), None, None, None, None, None, None).unwrap(); + tx_sync.register_tx(&txid, &new_address.script_pubkey()); + + tx_sync.sync(vec![&confirmable]).unwrap(); + + let events = std::mem::take(&mut *confirmable.events.lock().unwrap()); + assert_eq!(events.len(), 0); + assert!(confirmable.confirmed_txs.lock().unwrap().is_empty()); + assert!(confirmable.unconfirmed_txs.lock().unwrap().is_empty()); + + generate_blocks_and_wait(1); + tx_sync.sync(vec![&confirmable]).unwrap(); + + let events = std::mem::take(&mut *confirmable.events.lock().unwrap()); + assert_eq!(events.len(), 2); + assert!(confirmable.confirmed_txs.lock().unwrap().contains_key(&txid)); + assert!(confirmable.unconfirmed_txs.lock().unwrap().is_empty()); + + // Check previously confirmed transactions are marked unconfirmed when they are reorged. + let best_block_hash = get_bitcoind().client.get_best_block_hash().unwrap(); + get_bitcoind().client.invalidate_block(&best_block_hash).unwrap(); + + // We're getting back to the previous height with a new tip, but best block shouldn't change. + generate_blocks_and_wait(1); + assert_ne!(get_bitcoind().client.get_best_block_hash().unwrap(), best_block_hash); + tx_sync.sync(vec![&confirmable]).unwrap(); + let events = std::mem::take(&mut *confirmable.events.lock().unwrap()); + assert_eq!(events.len(), 0); + + // Now we're surpassing previous height, getting new tip. + generate_blocks_and_wait(1); + assert_ne!(get_bitcoind().client.get_best_block_hash().unwrap(), best_block_hash); + tx_sync.sync(vec![&confirmable]).unwrap(); + + // Transaction still confirmed but under new tip. + assert!(confirmable.confirmed_txs.lock().unwrap().contains_key(&txid)); + assert!(confirmable.unconfirmed_txs.lock().unwrap().is_empty()); + + // Check we got unconfirmed, then reconfirmed in the meantime. + let events = std::mem::take(&mut *confirmable.events.lock().unwrap()); + assert_eq!(events.len(), 3); + + match events[0] { + TestConfirmableEvent::Unconfirmed(t) => { + assert_eq!(t, txid); + }, + _ => panic!("Unexpected event"), + } + + match events[1] { + TestConfirmableEvent::BestBlockUpdated(..) => {}, + _ => panic!("Unexpected event"), + } + + match events[2] { + TestConfirmableEvent::Confirmed(t, _, _) => { + assert_eq!(t, txid); + }, + _ => panic!("Unexpected event"), + } +} + +#[tokio::test] +#[cfg(feature = "esplora-async")] +async fn test_esplora_syncs() { + premine(); + let mut logger = TestLogger {}; + let esplora_url = format!("http://{}", get_electrsd().esplora_url.as_ref().unwrap()); + let tx_sync = EsploraSyncClient::new(esplora_url, &mut logger); + let confirmable = TestConfirmable::new(); + + // Check we pick up on new best blocks + let expected_height = 0u32; + assert_eq!(confirmable.best_block.lock().unwrap().1, expected_height); + + tx_sync.sync(vec![&confirmable]).await.unwrap(); + + let expected_height = get_bitcoind().client.get_block_count().unwrap() as u32; + assert_eq!(confirmable.best_block.lock().unwrap().1, expected_height); + + let events = std::mem::take(&mut *confirmable.events.lock().unwrap()); + assert_eq!(events.len(), 1); + + // Check registered confirmed transactions are marked confirmed + let new_address = get_bitcoind().client.get_new_address(Some("test"), Some(AddressType::Legacy)).unwrap(); + let txid = get_bitcoind().client.send_to_address(&new_address, Amount::from_sat(5000), None, None, None, None, None, None).unwrap(); + tx_sync.register_tx(&txid, &new_address.script_pubkey()); + + tx_sync.sync(vec![&confirmable]).await.unwrap(); + + let events = std::mem::take(&mut *confirmable.events.lock().unwrap()); + assert_eq!(events.len(), 0); + assert!(confirmable.confirmed_txs.lock().unwrap().is_empty()); + assert!(confirmable.unconfirmed_txs.lock().unwrap().is_empty()); + + generate_blocks_and_wait(1); + tx_sync.sync(vec![&confirmable]).await.unwrap(); + + let events = std::mem::take(&mut *confirmable.events.lock().unwrap()); + assert_eq!(events.len(), 2); + assert!(confirmable.confirmed_txs.lock().unwrap().contains_key(&txid)); + assert!(confirmable.unconfirmed_txs.lock().unwrap().is_empty()); + + // Check previously confirmed transactions are marked unconfirmed when they are reorged. + let best_block_hash = get_bitcoind().client.get_best_block_hash().unwrap(); + get_bitcoind().client.invalidate_block(&best_block_hash).unwrap(); + + // We're getting back to the previous height with a new tip, but best block shouldn't change. + generate_blocks_and_wait(1); + assert_ne!(get_bitcoind().client.get_best_block_hash().unwrap(), best_block_hash); + tx_sync.sync(vec![&confirmable]).await.unwrap(); + let events = std::mem::take(&mut *confirmable.events.lock().unwrap()); + assert_eq!(events.len(), 0); + + // Now we're surpassing previous height, getting new tip. + generate_blocks_and_wait(1); + assert_ne!(get_bitcoind().client.get_best_block_hash().unwrap(), best_block_hash); + tx_sync.sync(vec![&confirmable]).await.unwrap(); + + // Transaction still confirmed but under new tip. + assert!(confirmable.confirmed_txs.lock().unwrap().contains_key(&txid)); + assert!(confirmable.unconfirmed_txs.lock().unwrap().is_empty()); + + // Check we got unconfirmed, then reconfirmed in the meantime. + let events = std::mem::take(&mut *confirmable.events.lock().unwrap()); + assert_eq!(events.len(), 3); + + match events[0] { + TestConfirmableEvent::Unconfirmed(t) => { + assert_eq!(t, txid); + }, + _ => panic!("Unexpected event"), + } + + match events[1] { + TestConfirmableEvent::BestBlockUpdated(..) => {}, + _ => panic!("Unexpected event"), + } + + match events[2] { + TestConfirmableEvent::Confirmed(t, _, _) => { + assert_eq!(t, txid); + }, + _ => panic!("Unexpected event"), + } +}