diff --git a/lightning-block-sync/src/init.rs b/lightning-block-sync/src/init.rs new file mode 100644 index 00000000000..da9895ae721 --- /dev/null +++ b/lightning-block-sync/src/init.rs @@ -0,0 +1,351 @@ +use crate::{BlockSource, BlockSourceResult, Cache, ChainNotifier}; +use crate::poll::{ChainPoller, Validate, ValidatedBlockHeader}; + +use bitcoin::blockdata::block::{Block, BlockHeader}; +use bitcoin::hash_types::BlockHash; +use bitcoin::network::constants::Network; + +use lightning::chain; + +/// Performs a one-time sync of chain listeners using a single *trusted* block source, bringing each +/// listener's view of the chain from its paired block hash to `block_source`'s best chain tip. +/// +/// Upon success, the returned header can be used to initialize [`SpvClient`]. In the case of +/// failure, each listener may be left at a different block hash than the one it was originally +/// paired with. +/// +/// Useful during startup to bring the [`ChannelManager`] and each [`ChannelMonitor`] in sync before +/// switching to [`SpvClient`]. For example: +/// +/// ``` +/// use bitcoin::hash_types::BlockHash; +/// use bitcoin::network::constants::Network; +/// +/// use lightning::chain; +/// use lightning::chain::Watch; +/// use lightning::chain::chainmonitor::ChainMonitor; +/// use lightning::chain::channelmonitor; +/// use lightning::chain::channelmonitor::ChannelMonitor; +/// use lightning::chain::chaininterface::BroadcasterInterface; +/// use lightning::chain::chaininterface::FeeEstimator; +/// use lightning::chain::keysinterface; +/// use lightning::chain::keysinterface::KeysInterface; +/// use lightning::ln::channelmanager::ChannelManager; +/// use lightning::ln::channelmanager::ChannelManagerReadArgs; +/// use lightning::util::config::UserConfig; +/// use lightning::util::logger::Logger; +/// use lightning::util::ser::ReadableArgs; +/// +/// use lightning_block_sync::*; +/// +/// use std::cell::RefCell; +/// use std::io::Cursor; +/// +/// async fn init_sync< +/// B: BlockSource, +/// K: KeysInterface, +/// S: keysinterface::Sign, +/// T: BroadcasterInterface, +/// F: FeeEstimator, +/// L: Logger, +/// C: chain::Filter, +/// P: channelmonitor::Persist, +/// >( +/// block_source: &mut B, +/// chain_monitor: &ChainMonitor, +/// config: UserConfig, +/// keys_manager: &K, +/// tx_broadcaster: &T, +/// fee_estimator: &F, +/// logger: &L, +/// persister: &P, +/// ) { +/// // Read a serialized channel monitor paired with the block hash when it was persisted. +/// let serialized_monitor = "..."; +/// let (monitor_block_hash, mut monitor) = <(BlockHash, ChannelMonitor)>::read( +/// &mut Cursor::new(&serialized_monitor), keys_manager).unwrap(); +/// +/// // Read the channel manager paired with the block hash when it was persisted. +/// let serialized_manager = "..."; +/// let (manager_block_hash, mut manager) = { +/// let read_args = ChannelManagerReadArgs::new( +/// keys_manager, +/// fee_estimator, +/// chain_monitor, +/// tx_broadcaster, +/// logger, +/// config, +/// vec![&mut monitor], +/// ); +/// <(BlockHash, ChannelManager, &T, &K, &F, &L>)>::read( +/// &mut Cursor::new(&serialized_manager), read_args).unwrap() +/// }; +/// +/// // Synchronize any channel monitors and the channel manager to be on the best block. +/// let mut cache = UnboundedCache::new(); +/// let mut monitor_listener = (RefCell::new(monitor), &*tx_broadcaster, &*fee_estimator, &*logger); +/// let listeners = vec![ +/// (monitor_block_hash, &mut monitor_listener as &mut dyn chain::Listen), +/// (manager_block_hash, &mut manager as &mut dyn chain::Listen), +/// ]; +/// let chain_tip = init::synchronize_listeners( +/// block_source, Network::Bitcoin, &mut cache, listeners).await.unwrap(); +/// +/// // Allow the chain monitor to watch any channels. +/// let monitor = monitor_listener.0.into_inner(); +/// chain_monitor.watch_channel(monitor.get_funding_txo().0, monitor); +/// +/// // Create an SPV client to notify the chain monitor and channel manager of block events. +/// let chain_poller = poll::ChainPoller::new(block_source, Network::Bitcoin); +/// let mut chain_listener = (chain_monitor, &manager); +/// let spv_client = SpvClient::new(chain_tip, chain_poller, &mut cache, &chain_listener); +/// } +/// ``` +/// +/// [`SpvClient`]: ../struct.SpvClient.html +/// [`ChannelManager`]: ../../lightning/ln/channelmanager/struct.ChannelManager.html +/// [`ChannelMonitor`]: ../../lightning/chain/channelmonitor/struct.ChannelMonitor.html +pub async fn synchronize_listeners( + block_source: &mut B, + network: Network, + header_cache: &mut C, + mut chain_listeners: Vec<(BlockHash, &mut dyn chain::Listen)>, +) -> BlockSourceResult { + let (best_block_hash, best_block_height) = block_source.get_best_block().await?; + let best_header = block_source + .get_header(&best_block_hash, best_block_height).await? + .validate(best_block_hash)?; + + // Fetch the header for the block hash paired with each listener. + let mut chain_listeners_with_old_headers = Vec::new(); + for (old_block_hash, chain_listener) in chain_listeners.drain(..) { + let old_header = match header_cache.look_up(&old_block_hash) { + Some(header) => *header, + None => block_source + .get_header(&old_block_hash, None).await? + .validate(old_block_hash)? + }; + chain_listeners_with_old_headers.push((old_header, chain_listener)) + } + + // Find differences and disconnect blocks for each listener individually. + let mut chain_poller = ChainPoller::new(block_source, network); + let mut chain_listeners_at_height = Vec::new(); + let mut most_common_ancestor = None; + let mut most_connected_blocks = Vec::new(); + for (old_header, chain_listener) in chain_listeners_with_old_headers.drain(..) { + // Disconnect any stale blocks, but keep them in the cache for the next iteration. + let header_cache = &mut ReadOnlyCache(header_cache); + let (common_ancestor, connected_blocks) = { + let chain_listener = &DynamicChainListener(chain_listener); + let mut chain_notifier = ChainNotifier { header_cache, chain_listener }; + let difference = + chain_notifier.find_difference(best_header, &old_header, &mut chain_poller).await?; + chain_notifier.disconnect_blocks(difference.disconnected_blocks); + (difference.common_ancestor, difference.connected_blocks) + }; + + // Keep track of the most common ancestor and all blocks connected across all listeners. + chain_listeners_at_height.push((common_ancestor.height, chain_listener)); + if connected_blocks.len() > most_connected_blocks.len() { + most_common_ancestor = Some(common_ancestor); + most_connected_blocks = connected_blocks; + } + } + + // Connect new blocks for all listeners at once to avoid re-fetching blocks. + if let Some(common_ancestor) = most_common_ancestor { + let chain_listener = &ChainListenerSet(chain_listeners_at_height); + let mut chain_notifier = ChainNotifier { header_cache, chain_listener }; + chain_notifier.connect_blocks(common_ancestor, most_connected_blocks, &mut chain_poller) + .await.or_else(|(e, _)| Err(e))?; + } + + Ok(best_header) +} + +/// A wrapper to make a cache read-only. +/// +/// Used to prevent losing headers that may be needed to disconnect blocks common to more than one +/// listener. +struct ReadOnlyCache<'a, C: Cache>(&'a mut C); + +impl<'a, C: Cache> Cache for ReadOnlyCache<'a, C> { + fn look_up(&self, block_hash: &BlockHash) -> Option<&ValidatedBlockHeader> { + self.0.look_up(block_hash) + } + + fn block_connected(&mut self, _block_hash: BlockHash, _block_header: ValidatedBlockHeader) { + unreachable!() + } + + fn block_disconnected(&mut self, _block_hash: &BlockHash) -> Option { + None + } +} + +/// Wrapper for supporting dynamically sized chain listeners. +struct DynamicChainListener<'a>(&'a mut dyn chain::Listen); + +impl<'a> chain::Listen for DynamicChainListener<'a> { + fn block_connected(&self, _block: &Block, _height: u32) { + unreachable!() + } + + fn block_disconnected(&self, header: &BlockHeader, height: u32) { + self.0.block_disconnected(header, height) + } +} + +/// A set of dynamically sized chain listeners, each paired with a starting block height. +struct ChainListenerSet<'a>(Vec<(u32, &'a mut dyn chain::Listen)>); + +impl<'a> chain::Listen for ChainListenerSet<'a> { + fn block_connected(&self, block: &Block, height: u32) { + for (starting_height, chain_listener) in self.0.iter() { + if height > *starting_height { + chain_listener.block_connected(block, height); + } + } + } + + fn block_disconnected(&self, _header: &BlockHeader, _height: u32) { + unreachable!() + } +} + +#[cfg(test)] +mod tests { + use crate::test_utils::{Blockchain, MockChainListener}; + use super::*; + + use bitcoin::network::constants::Network; + + #[tokio::test] + async fn sync_from_same_chain() { + let mut chain = Blockchain::default().with_height(4); + + let mut listener_1 = MockChainListener::new() + .expect_block_connected(*chain.at_height(2)) + .expect_block_connected(*chain.at_height(3)) + .expect_block_connected(*chain.at_height(4)); + let mut listener_2 = MockChainListener::new() + .expect_block_connected(*chain.at_height(3)) + .expect_block_connected(*chain.at_height(4)); + let mut listener_3 = MockChainListener::new() + .expect_block_connected(*chain.at_height(4)); + + let listeners = vec![ + (chain.at_height(1).block_hash, &mut listener_1 as &mut dyn chain::Listen), + (chain.at_height(2).block_hash, &mut listener_2 as &mut dyn chain::Listen), + (chain.at_height(3).block_hash, &mut listener_3 as &mut dyn chain::Listen), + ]; + let mut cache = chain.header_cache(0..=4); + match synchronize_listeners(&mut chain, Network::Bitcoin, &mut cache, listeners).await { + Ok(header) => assert_eq!(header, chain.tip()), + Err(e) => panic!("Unexpected error: {:?}", e), + } + } + + #[tokio::test] + async fn sync_from_different_chains() { + let mut main_chain = Blockchain::default().with_height(4); + let fork_chain_1 = main_chain.fork_at_height(1); + let fork_chain_2 = main_chain.fork_at_height(2); + let fork_chain_3 = main_chain.fork_at_height(3); + + let mut listener_1 = MockChainListener::new() + .expect_block_disconnected(*fork_chain_1.at_height(4)) + .expect_block_disconnected(*fork_chain_1.at_height(3)) + .expect_block_disconnected(*fork_chain_1.at_height(2)) + .expect_block_connected(*main_chain.at_height(2)) + .expect_block_connected(*main_chain.at_height(3)) + .expect_block_connected(*main_chain.at_height(4)); + let mut listener_2 = MockChainListener::new() + .expect_block_disconnected(*fork_chain_2.at_height(4)) + .expect_block_disconnected(*fork_chain_2.at_height(3)) + .expect_block_connected(*main_chain.at_height(3)) + .expect_block_connected(*main_chain.at_height(4)); + let mut listener_3 = MockChainListener::new() + .expect_block_disconnected(*fork_chain_3.at_height(4)) + .expect_block_connected(*main_chain.at_height(4)); + + let listeners = vec![ + (fork_chain_1.tip().block_hash, &mut listener_1 as &mut dyn chain::Listen), + (fork_chain_2.tip().block_hash, &mut listener_2 as &mut dyn chain::Listen), + (fork_chain_3.tip().block_hash, &mut listener_3 as &mut dyn chain::Listen), + ]; + let mut cache = fork_chain_1.header_cache(2..=4); + cache.extend(fork_chain_2.header_cache(3..=4)); + cache.extend(fork_chain_3.header_cache(4..=4)); + match synchronize_listeners(&mut main_chain, Network::Bitcoin, &mut cache, listeners).await { + Ok(header) => assert_eq!(header, main_chain.tip()), + Err(e) => panic!("Unexpected error: {:?}", e), + } + } + + #[tokio::test] + async fn sync_from_overlapping_chains() { + let mut main_chain = Blockchain::default().with_height(4); + let fork_chain_1 = main_chain.fork_at_height(1); + let fork_chain_2 = fork_chain_1.fork_at_height(2); + let fork_chain_3 = fork_chain_2.fork_at_height(3); + + let mut listener_1 = MockChainListener::new() + .expect_block_disconnected(*fork_chain_1.at_height(4)) + .expect_block_disconnected(*fork_chain_1.at_height(3)) + .expect_block_disconnected(*fork_chain_1.at_height(2)) + .expect_block_connected(*main_chain.at_height(2)) + .expect_block_connected(*main_chain.at_height(3)) + .expect_block_connected(*main_chain.at_height(4)); + let mut listener_2 = MockChainListener::new() + .expect_block_disconnected(*fork_chain_2.at_height(4)) + .expect_block_disconnected(*fork_chain_2.at_height(3)) + .expect_block_disconnected(*fork_chain_2.at_height(2)) + .expect_block_connected(*main_chain.at_height(2)) + .expect_block_connected(*main_chain.at_height(3)) + .expect_block_connected(*main_chain.at_height(4)); + let mut listener_3 = MockChainListener::new() + .expect_block_disconnected(*fork_chain_3.at_height(4)) + .expect_block_disconnected(*fork_chain_3.at_height(3)) + .expect_block_disconnected(*fork_chain_3.at_height(2)) + .expect_block_connected(*main_chain.at_height(2)) + .expect_block_connected(*main_chain.at_height(3)) + .expect_block_connected(*main_chain.at_height(4)); + + let listeners = vec![ + (fork_chain_1.tip().block_hash, &mut listener_1 as &mut dyn chain::Listen), + (fork_chain_2.tip().block_hash, &mut listener_2 as &mut dyn chain::Listen), + (fork_chain_3.tip().block_hash, &mut listener_3 as &mut dyn chain::Listen), + ]; + let mut cache = fork_chain_1.header_cache(2..=4); + cache.extend(fork_chain_2.header_cache(3..=4)); + cache.extend(fork_chain_3.header_cache(4..=4)); + match synchronize_listeners(&mut main_chain, Network::Bitcoin, &mut cache, listeners).await { + Ok(header) => assert_eq!(header, main_chain.tip()), + Err(e) => panic!("Unexpected error: {:?}", e), + } + } + + #[tokio::test] + async fn cache_connected_and_keep_disconnected_blocks() { + let mut main_chain = Blockchain::default().with_height(2); + let fork_chain = main_chain.fork_at_height(1); + let new_tip = main_chain.tip(); + let old_tip = fork_chain.tip(); + + let mut listener = MockChainListener::new() + .expect_block_disconnected(*old_tip) + .expect_block_connected(*new_tip); + + let listeners = vec![(old_tip.block_hash, &mut listener as &mut dyn chain::Listen)]; + let mut cache = fork_chain.header_cache(2..=2); + match synchronize_listeners(&mut main_chain, Network::Bitcoin, &mut cache, listeners).await { + Ok(_) => { + assert!(cache.contains_key(&new_tip.block_hash)); + assert!(cache.contains_key(&old_tip.block_hash)); + }, + Err(e) => panic!("Unexpected error: {:?}", e), + } + } +} diff --git a/lightning-block-sync/src/lib.rs b/lightning-block-sync/src/lib.rs index 58f77bdcaba..db536fe7d43 100644 --- a/lightning-block-sync/src/lib.rs +++ b/lightning-block-sync/src/lib.rs @@ -1,5 +1,9 @@ //! A lightweight client for keeping in sync with chain activity. //! +//! Defines an [`SpvClient`] utility for polling one or more block sources for the best chain tip. +//! It is used to notify listeners of blocks connected or disconnected since the last poll. Useful +//! for keeping a Lightning node in sync with the chain. +//! //! Defines a [`BlockSource`] trait, which is an asynchronous interface for retrieving block headers //! and data. //! @@ -9,11 +13,15 @@ //! Both features support either blocking I/O using `std::net::TcpStream` or, with feature `tokio`, //! non-blocking I/O using `tokio::net::TcpStream` from inside a Tokio runtime. //! +//! [`SpvClient`]: struct.SpvClient.html //! [`BlockSource`]: trait.BlockSource.html #[cfg(any(feature = "rest-client", feature = "rpc-client"))] pub mod http; +pub mod init; +pub mod poll; + #[cfg(feature = "rest-client")] pub mod rest; @@ -23,14 +31,23 @@ pub mod rpc; #[cfg(any(feature = "rest-client", feature = "rpc-client"))] mod convert; +#[cfg(test)] +mod test_utils; + #[cfg(any(feature = "rest-client", feature = "rpc-client"))] mod utils; +use crate::poll::{ChainTip, Poll, ValidatedBlockHeader}; + use bitcoin::blockdata::block::{Block, BlockHeader}; use bitcoin::hash_types::BlockHash; use bitcoin::util::uint::Uint256; +use lightning::chain; +use lightning::chain::Listen; + use std::future::Future; +use std::ops::Deref; use std::pin::Pin; /// Abstract type for retrieving block headers and data. @@ -47,9 +64,13 @@ pub trait BlockSource : Sync + Send { /// error. fn get_block<'a>(&'a mut self, header_hash: &'a BlockHash) -> AsyncBlockSourceResult<'a, Block>; - // TODO: Phrase in terms of `Poll` once added. - /// Returns the hash of the best block and, optionally, its height. When polling a block source, - /// the height is passed to `get_header` to allow for a more efficient lookup. + /// Returns the hash of the best block and, optionally, its height. + /// + /// When polling a block source, [`Poll`] implementations may pass the height to [`get_header`] + /// to allow for a more efficient lookup. + /// + /// [`Poll`]: poll/trait.Poll.html + /// [`get_header`]: #tymethod.get_header fn get_best_block<'a>(&'a mut self) -> AsyncBlockSourceResult<(BlockHash, Option)>; } @@ -65,13 +86,14 @@ type AsyncBlockSourceResult<'a, T> = Pin, } /// The kind of `BlockSourceError`, either persistent or transient. -#[derive(Clone, Copy)] +#[derive(Clone, Copy, Debug, PartialEq)] pub enum BlockSourceErrorKind { /// Indicates an error that won't resolve when retrying a request (e.g., invalid data). Persistent, @@ -124,3 +146,562 @@ pub struct BlockHeaderData { /// of equivalent weight. pub chainwork: Uint256, } + +/// A lightweight client for keeping a listener in sync with the chain, allowing for Simplified +/// Payment Verification (SPV). +/// +/// The client is parameterized by a chain poller which is responsible for polling one or more block +/// sources for the best chain tip. During this process it detects any chain forks, determines which +/// constitutes the best chain, and updates the listener accordingly with any blocks that were +/// connected or disconnected since the last poll. +/// +/// Block headers for the best chain are maintained in the parameterized cache, allowing for a +/// custom cache eviction policy. This offers flexibility to those sensitive to resource usage. +/// Hence, there is a trade-off between a lower memory footprint and potentially increased network +/// I/O as headers are re-fetched during fork detection. +pub struct SpvClient<'a, P: Poll, C: Cache, L: Deref> +where L::Target: chain::Listen { + chain_tip: ValidatedBlockHeader, + chain_poller: P, + chain_notifier: ChainNotifier<'a, C, L>, +} + +/// The `Cache` trait defines behavior for managing a block header cache, where block headers are +/// keyed by block hash. +/// +/// Used by [`ChainNotifier`] to store headers along the best chain, which is important for ensuring +/// that blocks can be disconnected if they are no longer accessible from a block source (e.g., if +/// the block source does not store stale forks indefinitely). +/// +/// Implementations may define how long to retain headers such that it's unlikely they will ever be +/// needed to disconnect a block. In cases where block sources provide access to headers on stale +/// forks reliably, caches may be entirely unnecessary. +/// +/// [`ChainNotifier`]: struct.ChainNotifier.html +pub trait Cache { + /// Retrieves the block header keyed by the given block hash. + fn look_up(&self, block_hash: &BlockHash) -> Option<&ValidatedBlockHeader>; + + /// Called when a block has been connected to the best chain to ensure it is available to be + /// disconnected later if needed. + fn block_connected(&mut self, block_hash: BlockHash, block_header: ValidatedBlockHeader); + + /// Called when a block has been disconnected from the best chain. Once disconnected, a block's + /// header is no longer needed and thus can be removed. + fn block_disconnected(&mut self, block_hash: &BlockHash) -> Option; +} + +/// Unbounded cache of block headers keyed by block hash. +pub type UnboundedCache = std::collections::HashMap; + +impl Cache for UnboundedCache { + fn look_up(&self, block_hash: &BlockHash) -> Option<&ValidatedBlockHeader> { + self.get(block_hash) + } + + fn block_connected(&mut self, block_hash: BlockHash, block_header: ValidatedBlockHeader) { + self.insert(block_hash, block_header); + } + + fn block_disconnected(&mut self, block_hash: &BlockHash) -> Option { + self.remove(block_hash) + } +} + +impl<'a, P: Poll, C: Cache, L: Deref> SpvClient<'a, P, C, L> where L::Target: chain::Listen { + /// Creates a new SPV client using `chain_tip` as the best known chain tip. + /// + /// Subsequent calls to [`poll_best_tip`] will poll for the best chain tip using the given chain + /// poller, which may be configured with one or more block sources to query. At least one block + /// source must provide headers back from the best chain tip to its common ancestor with + /// `chain_tip`. + /// * `header_cache` is used to look up and store headers on the best chain + /// * `chain_listener` is notified of any blocks connected or disconnected + /// + /// [`poll_best_tip`]: struct.SpvClient.html#method.poll_best_tip + pub fn new( + chain_tip: ValidatedBlockHeader, + chain_poller: P, + header_cache: &'a mut C, + chain_listener: L, + ) -> Self { + let chain_notifier = ChainNotifier { header_cache, chain_listener }; + Self { chain_tip, chain_poller, chain_notifier } + } + + /// Polls for the best tip and updates the chain listener with any connected or disconnected + /// blocks accordingly. + /// + /// Returns the best polled chain tip relative to the previous best known tip and whether any + /// blocks were indeed connected or disconnected. + pub async fn poll_best_tip(&mut self) -> BlockSourceResult<(ChainTip, bool)> { + let chain_tip = self.chain_poller.poll_chain_tip(self.chain_tip).await?; + let blocks_connected = match chain_tip { + ChainTip::Common => false, + ChainTip::Better(chain_tip) => { + debug_assert_ne!(chain_tip.block_hash, self.chain_tip.block_hash); + debug_assert!(chain_tip.chainwork > self.chain_tip.chainwork); + self.update_chain_tip(chain_tip).await + }, + ChainTip::Worse(chain_tip) => { + debug_assert_ne!(chain_tip.block_hash, self.chain_tip.block_hash); + debug_assert!(chain_tip.chainwork <= self.chain_tip.chainwork); + false + }, + }; + Ok((chain_tip, blocks_connected)) + } + + /// Updates the chain tip, syncing the chain listener with any connected or disconnected + /// blocks. Returns whether there were any such blocks. + async fn update_chain_tip(&mut self, best_chain_tip: ValidatedBlockHeader) -> bool { + match self.chain_notifier.synchronize_listener( + best_chain_tip, &self.chain_tip, &mut self.chain_poller).await + { + Ok(_) => { + self.chain_tip = best_chain_tip; + true + }, + Err((_, Some(chain_tip))) if chain_tip.block_hash != self.chain_tip.block_hash => { + self.chain_tip = chain_tip; + true + }, + Err(_) => false, + } + } +} + +/// Notifies [listeners] of blocks that have been connected or disconnected from the chain. +/// +/// [listeners]: ../../lightning/chain/trait.Listen.html +pub struct ChainNotifier<'a, C: Cache, L: Deref> where L::Target: chain::Listen { + /// Cache for looking up headers before fetching from a block source. + header_cache: &'a mut C, + + /// Listener that will be notified of connected or disconnected blocks. + chain_listener: L, +} + +/// Changes made to the chain between subsequent polls that transformed it from having one chain tip +/// to another. +/// +/// Blocks are given in height-descending order. Therefore, blocks are first disconnected in order +/// before new blocks are connected in reverse order. +struct ChainDifference { + /// The most recent ancestor common between the chain tips. + /// + /// If there are any disconnected blocks, this is where the chain forked. + common_ancestor: ValidatedBlockHeader, + + /// Blocks that were disconnected from the chain since the last poll. + disconnected_blocks: Vec, + + /// Blocks that were connected to the chain since the last poll. + connected_blocks: Vec, +} + +impl<'a, C: Cache, L: Deref> ChainNotifier<'a, C, L> where L::Target: chain::Listen { + /// Finds the first common ancestor between `new_header` and `old_header`, disconnecting blocks + /// from `old_header` to get to that point and then connecting blocks until `new_header`. + /// + /// Validates headers along the transition path, but doesn't fetch blocks until the chain is + /// disconnected to the fork point. Thus, this may return an `Err` that includes where the tip + /// ended up which may not be `new_header`. Note that the returned `Err` contains `Some` header + /// if and only if the transition from `old_header` to `new_header` is valid. + async fn synchronize_listener( + &mut self, + new_header: ValidatedBlockHeader, + old_header: &ValidatedBlockHeader, + chain_poller: &mut P, + ) -> Result<(), (BlockSourceError, Option)> { + let difference = self.find_difference(new_header, old_header, chain_poller).await + .map_err(|e| (e, None))?; + self.disconnect_blocks(difference.disconnected_blocks); + self.connect_blocks( + difference.common_ancestor, + difference.connected_blocks, + chain_poller, + ).await + } + + /// Returns the changes needed to produce the chain with `current_header` as its tip from the + /// chain with `prev_header` as its tip. + /// + /// Walks backwards from `current_header` and `prev_header`, finding the common ancestor. + async fn find_difference( + &self, + current_header: ValidatedBlockHeader, + prev_header: &ValidatedBlockHeader, + chain_poller: &mut P, + ) -> BlockSourceResult { + let mut disconnected_blocks = Vec::new(); + let mut connected_blocks = Vec::new(); + let mut current = current_header; + let mut previous = *prev_header; + loop { + // Found the common ancestor. + if current.block_hash == previous.block_hash { + break; + } + + // Walk back the chain, finding blocks needed to connect and disconnect. Only walk back + // the header with the greater height, or both if equal heights. + let current_height = current.height; + let previous_height = previous.height; + if current_height <= previous_height { + disconnected_blocks.push(previous); + previous = self.look_up_previous_header(chain_poller, &previous).await?; + } + if current_height >= previous_height { + connected_blocks.push(current); + current = self.look_up_previous_header(chain_poller, ¤t).await?; + } + } + + let common_ancestor = current; + Ok(ChainDifference { common_ancestor, disconnected_blocks, connected_blocks }) + } + + /// Returns the previous header for the given header, either by looking it up in the cache or + /// fetching it if not found. + async fn look_up_previous_header( + &self, + chain_poller: &mut P, + header: &ValidatedBlockHeader, + ) -> BlockSourceResult { + match self.header_cache.look_up(&header.header.prev_blockhash) { + Some(prev_header) => Ok(*prev_header), + None => chain_poller.look_up_previous_header(header).await, + } + } + + /// Notifies the chain listeners of disconnected blocks. + fn disconnect_blocks(&mut self, mut disconnected_blocks: Vec) { + for header in disconnected_blocks.drain(..) { + if let Some(cached_header) = self.header_cache.block_disconnected(&header.block_hash) { + assert_eq!(cached_header, header); + } + self.chain_listener.block_disconnected(&header.header, header.height); + } + } + + /// Notifies the chain listeners of connected blocks. + async fn connect_blocks( + &mut self, + mut new_tip: ValidatedBlockHeader, + mut connected_blocks: Vec, + chain_poller: &mut P, + ) -> Result<(), (BlockSourceError, Option)> { + for header in connected_blocks.drain(..).rev() { + let block = chain_poller + .fetch_block(&header).await + .or_else(|e| Err((e, Some(new_tip))))?; + debug_assert_eq!(block.block_hash, header.block_hash); + + self.header_cache.block_connected(header.block_hash, header); + self.chain_listener.block_connected(&block, header.height); + new_tip = header; + } + + Ok(()) + } +} + +#[cfg(test)] +mod spv_client_tests { + use crate::test_utils::{Blockchain, NullChainListener}; + use super::*; + + use bitcoin::network::constants::Network; + + #[tokio::test] + async fn poll_from_chain_without_headers() { + let mut chain = Blockchain::default().with_height(3).without_headers(); + let best_tip = chain.at_height(1); + + let poller = poll::ChainPoller::new(&mut chain, Network::Testnet); + let mut cache = UnboundedCache::new(); + let mut listener = NullChainListener {}; + let mut client = SpvClient::new(best_tip, poller, &mut cache, &mut listener); + match client.poll_best_tip().await { + Err(e) => { + assert_eq!(e.kind(), BlockSourceErrorKind::Persistent); + assert_eq!(e.into_inner().as_ref().to_string(), "header not found"); + }, + Ok(_) => panic!("Expected error"), + } + assert_eq!(client.chain_tip, best_tip); + } + + #[tokio::test] + async fn poll_from_chain_with_common_tip() { + let mut chain = Blockchain::default().with_height(3); + let common_tip = chain.tip(); + + let poller = poll::ChainPoller::new(&mut chain, Network::Testnet); + let mut cache = UnboundedCache::new(); + let mut listener = NullChainListener {}; + let mut client = SpvClient::new(common_tip, poller, &mut cache, &mut listener); + match client.poll_best_tip().await { + Err(e) => panic!("Unexpected error: {:?}", e), + Ok((chain_tip, blocks_connected)) => { + assert_eq!(chain_tip, ChainTip::Common); + assert!(!blocks_connected); + }, + } + assert_eq!(client.chain_tip, common_tip); + } + + #[tokio::test] + async fn poll_from_chain_with_better_tip() { + let mut chain = Blockchain::default().with_height(3); + let new_tip = chain.tip(); + let old_tip = chain.at_height(1); + + let poller = poll::ChainPoller::new(&mut chain, Network::Testnet); + let mut cache = UnboundedCache::new(); + let mut listener = NullChainListener {}; + let mut client = SpvClient::new(old_tip, poller, &mut cache, &mut listener); + match client.poll_best_tip().await { + Err(e) => panic!("Unexpected error: {:?}", e), + Ok((chain_tip, blocks_connected)) => { + assert_eq!(chain_tip, ChainTip::Better(new_tip)); + assert!(blocks_connected); + }, + } + assert_eq!(client.chain_tip, new_tip); + } + + #[tokio::test] + async fn poll_from_chain_with_better_tip_and_without_any_new_blocks() { + let mut chain = Blockchain::default().with_height(3).without_blocks(2..); + let new_tip = chain.tip(); + let old_tip = chain.at_height(1); + + let poller = poll::ChainPoller::new(&mut chain, Network::Testnet); + let mut cache = UnboundedCache::new(); + let mut listener = NullChainListener {}; + let mut client = SpvClient::new(old_tip, poller, &mut cache, &mut listener); + match client.poll_best_tip().await { + Err(e) => panic!("Unexpected error: {:?}", e), + Ok((chain_tip, blocks_connected)) => { + assert_eq!(chain_tip, ChainTip::Better(new_tip)); + assert!(!blocks_connected); + }, + } + assert_eq!(client.chain_tip, old_tip); + } + + #[tokio::test] + async fn poll_from_chain_with_better_tip_and_without_some_new_blocks() { + let mut chain = Blockchain::default().with_height(3).without_blocks(3..); + let new_tip = chain.tip(); + let old_tip = chain.at_height(1); + + let poller = poll::ChainPoller::new(&mut chain, Network::Testnet); + let mut cache = UnboundedCache::new(); + let mut listener = NullChainListener {}; + let mut client = SpvClient::new(old_tip, poller, &mut cache, &mut listener); + match client.poll_best_tip().await { + Err(e) => panic!("Unexpected error: {:?}", e), + Ok((chain_tip, blocks_connected)) => { + assert_eq!(chain_tip, ChainTip::Better(new_tip)); + assert!(blocks_connected); + }, + } + assert_eq!(client.chain_tip, chain.at_height(2)); + } + + #[tokio::test] + async fn poll_from_chain_with_worse_tip() { + let mut chain = Blockchain::default().with_height(3); + let best_tip = chain.tip(); + chain.disconnect_tip(); + let worse_tip = chain.tip(); + + let poller = poll::ChainPoller::new(&mut chain, Network::Testnet); + let mut cache = UnboundedCache::new(); + let mut listener = NullChainListener {}; + let mut client = SpvClient::new(best_tip, poller, &mut cache, &mut listener); + match client.poll_best_tip().await { + Err(e) => panic!("Unexpected error: {:?}", e), + Ok((chain_tip, blocks_connected)) => { + assert_eq!(chain_tip, ChainTip::Worse(worse_tip)); + assert!(!blocks_connected); + }, + } + assert_eq!(client.chain_tip, best_tip); + } +} + +#[cfg(test)] +mod chain_notifier_tests { + use crate::test_utils::{Blockchain, MockChainListener}; + use super::*; + + use bitcoin::network::constants::Network; + + #[tokio::test] + async fn sync_from_same_chain() { + let mut chain = Blockchain::default().with_height(3); + + let new_tip = chain.tip(); + let old_tip = chain.at_height(1); + let chain_listener = &MockChainListener::new() + .expect_block_connected(*chain.at_height(2)) + .expect_block_connected(*new_tip); + let mut notifier = ChainNotifier { + header_cache: &mut chain.header_cache(0..=1), + chain_listener, + }; + let mut poller = poll::ChainPoller::new(&mut chain, Network::Testnet); + match notifier.synchronize_listener(new_tip, &old_tip, &mut poller).await { + Err((e, _)) => panic!("Unexpected error: {:?}", e), + Ok(_) => {}, + } + } + + #[tokio::test] + async fn sync_from_different_chains() { + let mut test_chain = Blockchain::with_network(Network::Testnet).with_height(1); + let main_chain = Blockchain::with_network(Network::Bitcoin).with_height(1); + + let new_tip = test_chain.tip(); + let old_tip = main_chain.tip(); + let chain_listener = &MockChainListener::new(); + let mut notifier = ChainNotifier { + header_cache: &mut main_chain.header_cache(0..=1), + chain_listener, + }; + let mut poller = poll::ChainPoller::new(&mut test_chain, Network::Testnet); + match notifier.synchronize_listener(new_tip, &old_tip, &mut poller).await { + Err((e, _)) => { + assert_eq!(e.kind(), BlockSourceErrorKind::Persistent); + assert_eq!(e.into_inner().as_ref().to_string(), "genesis block reached"); + }, + Ok(_) => panic!("Expected error"), + } + } + + #[tokio::test] + async fn sync_from_equal_length_fork() { + let main_chain = Blockchain::default().with_height(2); + let mut fork_chain = main_chain.fork_at_height(1); + + let new_tip = fork_chain.tip(); + let old_tip = main_chain.tip(); + let chain_listener = &MockChainListener::new() + .expect_block_disconnected(*old_tip) + .expect_block_connected(*new_tip); + let mut notifier = ChainNotifier { + header_cache: &mut main_chain.header_cache(0..=2), + chain_listener, + }; + let mut poller = poll::ChainPoller::new(&mut fork_chain, Network::Testnet); + match notifier.synchronize_listener(new_tip, &old_tip, &mut poller).await { + Err((e, _)) => panic!("Unexpected error: {:?}", e), + Ok(_) => {}, + } + } + + #[tokio::test] + async fn sync_from_shorter_fork() { + let main_chain = Blockchain::default().with_height(3); + let mut fork_chain = main_chain.fork_at_height(1); + fork_chain.disconnect_tip(); + + let new_tip = fork_chain.tip(); + let old_tip = main_chain.tip(); + let chain_listener = &MockChainListener::new() + .expect_block_disconnected(*old_tip) + .expect_block_disconnected(*main_chain.at_height(2)) + .expect_block_connected(*new_tip); + let mut notifier = ChainNotifier { + header_cache: &mut main_chain.header_cache(0..=3), + chain_listener, + }; + let mut poller = poll::ChainPoller::new(&mut fork_chain, Network::Testnet); + match notifier.synchronize_listener(new_tip, &old_tip, &mut poller).await { + Err((e, _)) => panic!("Unexpected error: {:?}", e), + Ok(_) => {}, + } + } + + #[tokio::test] + async fn sync_from_longer_fork() { + let mut main_chain = Blockchain::default().with_height(3); + let mut fork_chain = main_chain.fork_at_height(1); + main_chain.disconnect_tip(); + + let new_tip = fork_chain.tip(); + let old_tip = main_chain.tip(); + let chain_listener = &MockChainListener::new() + .expect_block_disconnected(*old_tip) + .expect_block_connected(*fork_chain.at_height(2)) + .expect_block_connected(*new_tip); + let mut notifier = ChainNotifier { + header_cache: &mut main_chain.header_cache(0..=2), + chain_listener, + }; + let mut poller = poll::ChainPoller::new(&mut fork_chain, Network::Testnet); + match notifier.synchronize_listener(new_tip, &old_tip, &mut poller).await { + Err((e, _)) => panic!("Unexpected error: {:?}", e), + Ok(_) => {}, + } + } + + #[tokio::test] + async fn sync_from_chain_without_headers() { + let mut chain = Blockchain::default().with_height(3).without_headers(); + + let new_tip = chain.tip(); + let old_tip = chain.at_height(1); + let chain_listener = &MockChainListener::new(); + let mut notifier = ChainNotifier { + header_cache: &mut chain.header_cache(0..=1), + chain_listener, + }; + let mut poller = poll::ChainPoller::new(&mut chain, Network::Testnet); + match notifier.synchronize_listener(new_tip, &old_tip, &mut poller).await { + Err((_, tip)) => assert_eq!(tip, None), + Ok(_) => panic!("Expected error"), + } + } + + #[tokio::test] + async fn sync_from_chain_without_any_new_blocks() { + let mut chain = Blockchain::default().with_height(3).without_blocks(2..); + + let new_tip = chain.tip(); + let old_tip = chain.at_height(1); + let chain_listener = &MockChainListener::new(); + let mut notifier = ChainNotifier { + header_cache: &mut chain.header_cache(0..=3), + chain_listener, + }; + let mut poller = poll::ChainPoller::new(&mut chain, Network::Testnet); + match notifier.synchronize_listener(new_tip, &old_tip, &mut poller).await { + Err((_, tip)) => assert_eq!(tip, Some(old_tip)), + Ok(_) => panic!("Expected error"), + } + } + + #[tokio::test] + async fn sync_from_chain_without_some_new_blocks() { + let mut chain = Blockchain::default().with_height(3).without_blocks(3..); + + let new_tip = chain.tip(); + let old_tip = chain.at_height(1); + let chain_listener = &MockChainListener::new() + .expect_block_connected(*chain.at_height(2)); + let mut notifier = ChainNotifier { + header_cache: &mut chain.header_cache(0..=3), + chain_listener, + }; + let mut poller = poll::ChainPoller::new(&mut chain, Network::Testnet); + match notifier.synchronize_listener(new_tip, &old_tip, &mut poller).await { + Err((_, tip)) => assert_eq!(tip, Some(chain.at_height(2))), + Ok(_) => panic!("Expected error"), + } + } +} diff --git a/lightning-block-sync/src/poll.rs b/lightning-block-sync/src/poll.rs new file mode 100644 index 00000000000..34be2437c8e --- /dev/null +++ b/lightning-block-sync/src/poll.rs @@ -0,0 +1,356 @@ +use crate::{AsyncBlockSourceResult, BlockHeaderData, BlockSource, BlockSourceError, BlockSourceResult}; + +use bitcoin::blockdata::block::Block; +use bitcoin::hash_types::BlockHash; +use bitcoin::network::constants::Network; + +use std::ops::DerefMut; + +/// The `Poll` trait defines behavior for polling block sources for a chain tip and retrieving +/// related chain data. It serves as an adapter for `BlockSource`. +/// +/// [`ChainPoller`] adapts a single `BlockSource`, while any other implementations of `Poll` are +/// required to be built in terms of it to ensure chain data validity. +/// +/// [`ChainPoller`]: ../struct.ChainPoller.html +pub trait Poll { + /// Returns a chain tip in terms of its relationship to the provided chain tip. + fn poll_chain_tip<'a>(&'a mut self, best_known_chain_tip: ValidatedBlockHeader) -> + AsyncBlockSourceResult<'a, ChainTip>; + + /// Returns the header that preceded the given header in the chain. + fn look_up_previous_header<'a>(&'a mut self, header: &'a ValidatedBlockHeader) -> + AsyncBlockSourceResult<'a, ValidatedBlockHeader>; + + /// Returns the block associated with the given header. + fn fetch_block<'a>(&'a mut self, header: &'a ValidatedBlockHeader) -> + AsyncBlockSourceResult<'a, ValidatedBlock>; +} + +/// A chain tip relative to another chain tip in terms of block hash and chainwork. +#[derive(Clone, Debug, PartialEq)] +pub enum ChainTip { + /// A chain tip with the same hash as another chain's tip. + Common, + + /// A chain tip with more chainwork than another chain's tip. + Better(ValidatedBlockHeader), + + /// A chain tip with less or equal chainwork than another chain's tip. In either case, the + /// hashes of each tip will be different. + Worse(ValidatedBlockHeader), +} + +/// The `Validate` trait defines behavior for validating chain data. +pub(crate) trait Validate { + /// The validated data wrapper which can be dereferenced to obtain the validated data. + type T: std::ops::Deref; + + /// Validates the chain data against the given block hash and any criteria needed to ensure that + /// it is internally consistent. + fn validate(self, block_hash: BlockHash) -> BlockSourceResult; +} + +impl Validate for BlockHeaderData { + type T = ValidatedBlockHeader; + + fn validate(self, block_hash: BlockHash) -> BlockSourceResult { + self.header + .validate_pow(&self.header.target()) + .or_else(|e| Err(BlockSourceError::persistent(e)))?; + + // TODO: Use the result of validate_pow instead of recomputing the block hash once upstream. + if self.header.block_hash() != block_hash { + return Err(BlockSourceError::persistent("invalid block hash")); + } + + Ok(ValidatedBlockHeader { block_hash, inner: self }) + } +} + +impl Validate for Block { + type T = ValidatedBlock; + + fn validate(self, block_hash: BlockHash) -> BlockSourceResult { + self.header + .validate_pow(&self.header.target()) + .or_else(|e| Err(BlockSourceError::persistent(e)))?; + + // TODO: Use the result of validate_pow instead of recomputing the block hash once upstream. + if self.block_hash() != block_hash { + return Err(BlockSourceError::persistent("invalid block hash")); + } + + if !self.check_merkle_root() { + return Err(BlockSourceError::persistent("invalid merkle root")); + } + + if !self.check_witness_commitment() { + return Err(BlockSourceError::persistent("invalid witness commitment")); + } + + Ok(ValidatedBlock { block_hash, inner: self }) + } +} + +/// A block header with validated proof of work and corresponding block hash. +#[derive(Clone, Copy, Debug, PartialEq)] +pub struct ValidatedBlockHeader { + pub(crate) block_hash: BlockHash, + inner: BlockHeaderData, +} + +impl std::ops::Deref for ValidatedBlockHeader { + type Target = BlockHeaderData; + + fn deref(&self) -> &Self::Target { + &self.inner + } +} + +impl ValidatedBlockHeader { + /// Checks that the header correctly builds on previous_header: the claimed work differential + /// matches the actual PoW and the difficulty transition is possible, i.e., within 4x. + fn check_builds_on(&self, previous_header: &ValidatedBlockHeader, network: Network) -> BlockSourceResult<()> { + if self.header.prev_blockhash != previous_header.block_hash { + return Err(BlockSourceError::persistent("invalid previous block hash")); + } + + if self.height != previous_header.height + 1 { + return Err(BlockSourceError::persistent("invalid block height")); + } + + let work = self.header.work(); + if self.chainwork != previous_header.chainwork + work { + return Err(BlockSourceError::persistent("invalid chainwork")); + } + + if let Network::Bitcoin = network { + if self.height % 2016 == 0 { + let previous_work = previous_header.header.work(); + if work > (previous_work << 2) || work < (previous_work >> 2) { + return Err(BlockSourceError::persistent("invalid difficulty transition")) + } + } else if self.header.bits != previous_header.header.bits { + return Err(BlockSourceError::persistent("invalid difficulty")) + } + } + + Ok(()) + } +} + +/// A block with validated data against its transaction list and corresponding block hash. +pub struct ValidatedBlock { + pub(crate) block_hash: BlockHash, + inner: Block, +} + +impl std::ops::Deref for ValidatedBlock { + type Target = Block; + + fn deref(&self) -> &Self::Target { + &self.inner + } +} + +/// The canonical `Poll` implementation used for a single `BlockSource`. +/// +/// Other `Poll` implementations must be built using `ChainPoller` as it provides the only means of +/// validating chain data. +pub struct ChainPoller + Sized + Sync + Send, T: BlockSource> { + block_source: B, + network: Network, +} + +impl + Sized + Sync + Send, T: BlockSource> ChainPoller { + /// Creates a new poller for the given block source. + /// + /// If the `network` parameter is mainnet, then the difficulty between blocks is checked for + /// validity. + pub fn new(block_source: B, network: Network) -> Self { + Self { block_source, network } + } +} + +impl + Sized + Sync + Send, T: BlockSource> Poll for ChainPoller { + fn poll_chain_tip<'a>(&'a mut self, best_known_chain_tip: ValidatedBlockHeader) -> + AsyncBlockSourceResult<'a, ChainTip> + { + Box::pin(async move { + let (block_hash, height) = self.block_source.get_best_block().await?; + if block_hash == best_known_chain_tip.header.block_hash() { + return Ok(ChainTip::Common); + } + + let chain_tip = self.block_source + .get_header(&block_hash, height).await? + .validate(block_hash)?; + if chain_tip.chainwork > best_known_chain_tip.chainwork { + Ok(ChainTip::Better(chain_tip)) + } else { + Ok(ChainTip::Worse(chain_tip)) + } + }) + } + + fn look_up_previous_header<'a>(&'a mut self, header: &'a ValidatedBlockHeader) -> + AsyncBlockSourceResult<'a, ValidatedBlockHeader> + { + Box::pin(async move { + if header.height == 0 { + return Err(BlockSourceError::persistent("genesis block reached")); + } + + let previous_hash = &header.header.prev_blockhash; + let height = header.height - 1; + let previous_header = self.block_source + .get_header(previous_hash, Some(height)).await? + .validate(*previous_hash)?; + header.check_builds_on(&previous_header, self.network)?; + + Ok(previous_header) + }) + } + + fn fetch_block<'a>(&'a mut self, header: &'a ValidatedBlockHeader) -> + AsyncBlockSourceResult<'a, ValidatedBlock> + { + Box::pin(async move { + self.block_source + .get_block(&header.block_hash).await? + .validate(header.block_hash) + }) + } +} + +#[cfg(test)] +mod tests { + use crate::*; + use crate::test_utils::Blockchain; + use super::*; + use bitcoin::util::uint::Uint256; + + #[tokio::test] + async fn poll_empty_chain() { + let mut chain = Blockchain::default().with_height(0); + let best_known_chain_tip = chain.tip(); + chain.disconnect_tip(); + + let mut poller = ChainPoller::new(&mut chain, Network::Bitcoin); + match poller.poll_chain_tip(best_known_chain_tip).await { + Err(e) => { + assert_eq!(e.kind(), BlockSourceErrorKind::Transient); + assert_eq!(e.into_inner().as_ref().to_string(), "empty chain"); + }, + Ok(_) => panic!("Expected error"), + } + } + + #[tokio::test] + async fn poll_chain_without_headers() { + let mut chain = Blockchain::default().with_height(1).without_headers(); + let best_known_chain_tip = chain.at_height(0); + + let mut poller = ChainPoller::new(&mut chain, Network::Bitcoin); + match poller.poll_chain_tip(best_known_chain_tip).await { + Err(e) => { + assert_eq!(e.kind(), BlockSourceErrorKind::Persistent); + assert_eq!(e.into_inner().as_ref().to_string(), "header not found"); + }, + Ok(_) => panic!("Expected error"), + } + } + + #[tokio::test] + async fn poll_chain_with_invalid_pow() { + let mut chain = Blockchain::default().with_height(1); + let best_known_chain_tip = chain.at_height(0); + + // Invalidate the tip by changing its target. + chain.blocks.last_mut().unwrap().header.bits = + BlockHeader::compact_target_from_u256(&Uint256::from_be_bytes([0; 32])); + + let mut poller = ChainPoller::new(&mut chain, Network::Bitcoin); + match poller.poll_chain_tip(best_known_chain_tip).await { + Err(e) => { + assert_eq!(e.kind(), BlockSourceErrorKind::Persistent); + assert_eq!(e.into_inner().as_ref().to_string(), "block target correct but not attained"); + }, + Ok(_) => panic!("Expected error"), + } + } + + #[tokio::test] + async fn poll_chain_with_malformed_headers() { + let mut chain = Blockchain::default().with_height(1).malformed_headers(); + let best_known_chain_tip = chain.at_height(0); + + let mut poller = ChainPoller::new(&mut chain, Network::Bitcoin); + match poller.poll_chain_tip(best_known_chain_tip).await { + Err(e) => { + assert_eq!(e.kind(), BlockSourceErrorKind::Persistent); + assert_eq!(e.into_inner().as_ref().to_string(), "invalid block hash"); + }, + Ok(_) => panic!("Expected error"), + } + } + + #[tokio::test] + async fn poll_chain_with_common_tip() { + let mut chain = Blockchain::default().with_height(0); + let best_known_chain_tip = chain.tip(); + + let mut poller = ChainPoller::new(&mut chain, Network::Bitcoin); + match poller.poll_chain_tip(best_known_chain_tip).await { + Err(e) => panic!("Unexpected error: {:?}", e), + Ok(tip) => assert_eq!(tip, ChainTip::Common), + } + } + + #[tokio::test] + async fn poll_chain_with_uncommon_tip_but_equal_chainwork() { + let mut chain = Blockchain::default().with_height(1); + let best_known_chain_tip = chain.tip(); + + // Change the nonce to get a different block hash with the same chainwork. + chain.blocks.last_mut().unwrap().header.nonce += 1; + let worse_chain_tip = chain.tip(); + assert_eq!(best_known_chain_tip.chainwork, worse_chain_tip.chainwork); + + let mut poller = ChainPoller::new(&mut chain, Network::Bitcoin); + match poller.poll_chain_tip(best_known_chain_tip).await { + Err(e) => panic!("Unexpected error: {:?}", e), + Ok(tip) => assert_eq!(tip, ChainTip::Worse(worse_chain_tip)), + } + } + + #[tokio::test] + async fn poll_chain_with_worse_tip() { + let mut chain = Blockchain::default().with_height(1); + let best_known_chain_tip = chain.tip(); + + chain.disconnect_tip(); + let worse_chain_tip = chain.tip(); + + let mut poller = ChainPoller::new(&mut chain, Network::Bitcoin); + match poller.poll_chain_tip(best_known_chain_tip).await { + Err(e) => panic!("Unexpected error: {:?}", e), + Ok(tip) => assert_eq!(tip, ChainTip::Worse(worse_chain_tip)), + } + } + + #[tokio::test] + async fn poll_chain_with_better_tip() { + let mut chain = Blockchain::default().with_height(1); + let best_known_chain_tip = chain.at_height(0); + + let better_chain_tip = chain.tip(); + + let mut poller = ChainPoller::new(&mut chain, Network::Bitcoin); + match poller.poll_chain_tip(best_known_chain_tip).await { + Err(e) => panic!("Unexpected error: {:?}", e), + Ok(tip) => assert_eq!(tip, ChainTip::Better(better_chain_tip)), + } + } +} diff --git a/lightning-block-sync/src/test_utils.rs b/lightning-block-sync/src/test_utils.rs new file mode 100644 index 00000000000..8c37c94d8ec --- /dev/null +++ b/lightning-block-sync/src/test_utils.rs @@ -0,0 +1,239 @@ +use crate::{AsyncBlockSourceResult, BlockHeaderData, BlockSource, BlockSourceError, UnboundedCache}; +use crate::poll::{Validate, ValidatedBlockHeader}; + +use bitcoin::blockdata::block::{Block, BlockHeader}; +use bitcoin::blockdata::constants::genesis_block; +use bitcoin::hash_types::BlockHash; +use bitcoin::network::constants::Network; +use bitcoin::util::uint::Uint256; + +use lightning::chain; + +use std::cell::RefCell; +use std::collections::VecDeque; + +#[derive(Default)] +pub struct Blockchain { + pub blocks: Vec, + without_blocks: Option>, + without_headers: bool, + malformed_headers: bool, +} + +impl Blockchain { + pub fn default() -> Self { + Blockchain::with_network(Network::Bitcoin) + } + + pub fn with_network(network: Network) -> Self { + let blocks = vec![genesis_block(network)]; + Self { blocks, ..Default::default() } + } + + pub fn with_height(mut self, height: usize) -> Self { + self.blocks.reserve_exact(height); + let bits = BlockHeader::compact_target_from_u256(&Uint256::from_be_bytes([0xff; 32])); + for i in 1..=height { + let prev_block = &self.blocks[i - 1]; + let prev_blockhash = prev_block.block_hash(); + let time = prev_block.header.time + height as u32; + self.blocks.push(Block { + header: BlockHeader { + version: 0, + prev_blockhash, + merkle_root: Default::default(), + time, + bits, + nonce: 0, + }, + txdata: vec![], + }); + } + self + } + + pub fn without_blocks(self, range: std::ops::RangeFrom) -> Self { + Self { without_blocks: Some(range), ..self } + } + + pub fn without_headers(self) -> Self { + Self { without_headers: true, ..self } + } + + pub fn malformed_headers(self) -> Self { + Self { malformed_headers: true, ..self } + } + + pub fn fork_at_height(&self, height: usize) -> Self { + assert!(height + 1 < self.blocks.len()); + let mut blocks = self.blocks.clone(); + let mut prev_blockhash = blocks[height].block_hash(); + for block in blocks.iter_mut().skip(height + 1) { + block.header.prev_blockhash = prev_blockhash; + block.header.nonce += 1; + prev_blockhash = block.block_hash(); + } + Self { blocks, without_blocks: None, ..*self } + } + + pub fn at_height(&self, height: usize) -> ValidatedBlockHeader { + let block_header = self.at_height_unvalidated(height); + let block_hash = self.blocks[height].block_hash(); + block_header.validate(block_hash).unwrap() + } + + fn at_height_unvalidated(&self, height: usize) -> BlockHeaderData { + assert!(!self.blocks.is_empty()); + assert!(height < self.blocks.len()); + BlockHeaderData { + chainwork: self.blocks[0].header.work() + Uint256::from_u64(height as u64).unwrap(), + height: height as u32, + header: self.blocks[height].header.clone(), + } + } + + pub fn tip(&self) -> ValidatedBlockHeader { + assert!(!self.blocks.is_empty()); + self.at_height(self.blocks.len() - 1) + } + + pub fn disconnect_tip(&mut self) -> Option { + self.blocks.pop() + } + + pub fn header_cache(&self, heights: std::ops::RangeInclusive) -> UnboundedCache { + let mut cache = UnboundedCache::new(); + for i in heights { + let value = self.at_height(i); + let key = value.header.block_hash(); + assert!(cache.insert(key, value).is_none()); + } + cache + } +} + +impl BlockSource for Blockchain { + fn get_header<'a>(&'a mut self, header_hash: &'a BlockHash, _height_hint: Option) -> AsyncBlockSourceResult<'a, BlockHeaderData> { + Box::pin(async move { + if self.without_headers { + return Err(BlockSourceError::persistent("header not found")); + } + + for (height, block) in self.blocks.iter().enumerate() { + if block.header.block_hash() == *header_hash { + let mut header_data = self.at_height_unvalidated(height); + if self.malformed_headers { + header_data.header.time += 1; + } + + return Ok(header_data); + } + } + Err(BlockSourceError::transient("header not found")) + }) + } + + fn get_block<'a>(&'a mut self, header_hash: &'a BlockHash) -> AsyncBlockSourceResult<'a, Block> { + Box::pin(async move { + for (height, block) in self.blocks.iter().enumerate() { + if block.header.block_hash() == *header_hash { + if let Some(without_blocks) = &self.without_blocks { + if without_blocks.contains(&height) { + return Err(BlockSourceError::persistent("block not found")); + } + } + + return Ok(block.clone()); + } + } + Err(BlockSourceError::transient("block not found")) + }) + } + + fn get_best_block<'a>(&'a mut self) -> AsyncBlockSourceResult<'a, (BlockHash, Option)> { + Box::pin(async move { + match self.blocks.last() { + None => Err(BlockSourceError::transient("empty chain")), + Some(block) => { + let height = (self.blocks.len() - 1) as u32; + Ok((block.block_hash(), Some(height))) + }, + } + }) + } +} + +pub struct NullChainListener; + +impl chain::Listen for NullChainListener { + fn block_connected(&self, _block: &Block, _height: u32) {} + fn block_disconnected(&self, _header: &BlockHeader, _height: u32) {} +} + +pub struct MockChainListener { + expected_blocks_connected: RefCell>, + expected_blocks_disconnected: RefCell>, +} + +impl MockChainListener { + pub fn new() -> Self { + Self { + expected_blocks_connected: RefCell::new(VecDeque::new()), + expected_blocks_disconnected: RefCell::new(VecDeque::new()), + } + } + + pub fn expect_block_connected(self, block: BlockHeaderData) -> Self { + self.expected_blocks_connected.borrow_mut().push_back(block); + self + } + + pub fn expect_block_disconnected(self, block: BlockHeaderData) -> Self { + self.expected_blocks_disconnected.borrow_mut().push_back(block); + self + } +} + +impl chain::Listen for MockChainListener { + fn block_connected(&self, block: &Block, height: u32) { + match self.expected_blocks_connected.borrow_mut().pop_front() { + None => { + panic!("Unexpected block connected: {:?}", block.block_hash()); + }, + Some(expected_block) => { + assert_eq!(block.block_hash(), expected_block.header.block_hash()); + assert_eq!(height, expected_block.height); + }, + } + } + + fn block_disconnected(&self, header: &BlockHeader, height: u32) { + match self.expected_blocks_disconnected.borrow_mut().pop_front() { + None => { + panic!("Unexpected block disconnected: {:?}", header.block_hash()); + }, + Some(expected_block) => { + assert_eq!(header.block_hash(), expected_block.header.block_hash()); + assert_eq!(height, expected_block.height); + }, + } + } +} + +impl Drop for MockChainListener { + fn drop(&mut self) { + if std::thread::panicking() { + return; + } + + let expected_blocks_connected = self.expected_blocks_connected.borrow(); + if !expected_blocks_connected.is_empty() { + panic!("Expected blocks connected: {:?}", expected_blocks_connected); + } + + let expected_blocks_disconnected = self.expected_blocks_disconnected.borrow(); + if !expected_blocks_disconnected.is_empty() { + panic!("Expected blocks disconnected: {:?}", expected_blocks_disconnected); + } + } +} diff --git a/lightning/src/chain/chainmonitor.rs b/lightning/src/chain/chainmonitor.rs index 8483f5ca829..4cb9128d92f 100644 --- a/lightning/src/chain/chainmonitor.rs +++ b/lightning/src/chain/chainmonitor.rs @@ -29,7 +29,7 @@ //! [`ChannelMonitor`]: ../channelmonitor/struct.ChannelMonitor.html //! [`MonitorEvent`]: ../channelmonitor/enum.MonitorEvent.html -use bitcoin::blockdata::block::BlockHeader; +use bitcoin::blockdata::block::{Block, BlockHeader}; use chain; use chain::Filter; @@ -140,6 +140,26 @@ where C::Target: chain::Filter, } } +impl +chain::Listen for ChainMonitor +where + ChannelSigner: Sign, + C::Target: chain::Filter, + T::Target: BroadcasterInterface, + F::Target: FeeEstimator, + L::Target: Logger, + P::Target: channelmonitor::Persist, +{ + fn block_connected(&self, block: &Block, height: u32) { + let txdata: Vec<_> = block.txdata.iter().enumerate().collect(); + ChainMonitor::block_connected(self, &block.header, &txdata, height); + } + + fn block_disconnected(&self, header: &BlockHeader, height: u32) { + ChainMonitor::block_disconnected(self, header, height); + } +} + impl chain::Watch for ChainMonitor where C::Target: chain::Filter, diff --git a/lightning/src/chain/channelmonitor.rs b/lightning/src/chain/channelmonitor.rs index 0266f31e204..71490dd6e0e 100644 --- a/lightning/src/chain/channelmonitor.rs +++ b/lightning/src/chain/channelmonitor.rs @@ -22,7 +22,7 @@ //! //! [`chain::Watch`]: ../trait.Watch.html -use bitcoin::blockdata::block::BlockHeader; +use bitcoin::blockdata::block::{Block, BlockHeader}; use bitcoin::blockdata::transaction::{TxOut,Transaction}; use bitcoin::blockdata::transaction::OutPoint as BitcoinOutPoint; use bitcoin::blockdata::script::{Script, Builder}; @@ -41,6 +41,7 @@ use ln::chan_utils; use ln::chan_utils::{CounterpartyCommitmentSecrets, HTLCOutputInCommitment, HTLCType, ChannelTransactionParameters, HolderCommitmentTransaction}; use ln::channelmanager::{HTLCSource, PaymentPreimage, PaymentHash}; use ln::onchaintx::{OnchainTxHandler, InputDescriptors}; +use chain; use chain::chaininterface::{BroadcasterInterface, FeeEstimator}; use chain::transaction::{OutPoint, TransactionData}; use chain::keysinterface::{SpendableOutputDescriptor, StaticPaymentOutputDescriptor, DelayedPaymentOutputDescriptor, Sign, KeysInterface}; @@ -49,6 +50,7 @@ use util::ser::{Readable, ReadableArgs, MaybeReadable, Writer, Writeable, U48}; use util::byte_utils; use util::events::Event; +use std::cell::RefCell; use std::collections::{HashMap, HashSet, hash_map}; use std::{cmp, mem}; use std::ops::Deref; @@ -2297,6 +2299,22 @@ pub trait Persist: Send + Sync { fn update_persisted_channel(&self, id: OutPoint, update: &ChannelMonitorUpdate, data: &ChannelMonitor) -> Result<(), ChannelMonitorUpdateErr>; } +impl chain::Listen for (RefCell>, T, F, L) +where + T::Target: BroadcasterInterface, + F::Target: FeeEstimator, + L::Target: Logger, +{ + fn block_connected(&self, block: &Block, height: u32) { + let txdata: Vec<_> = block.txdata.iter().enumerate().collect(); + self.0.borrow_mut().block_connected(&block.header, &txdata, height, &*self.1, &*self.2, &*self.3); + } + + fn block_disconnected(&self, header: &BlockHeader, height: u32) { + self.0.borrow_mut().block_disconnected(header, height, &*self.1, &*self.2, &*self.3); + } +} + const MAX_ALLOC_SIZE: usize = 64*1024; impl<'a, Signer: Sign, K: KeysInterface> ReadableArgs<&'a K> diff --git a/lightning/src/chain/mod.rs b/lightning/src/chain/mod.rs index 1d51f262216..c1fceec8c47 100644 --- a/lightning/src/chain/mod.rs +++ b/lightning/src/chain/mod.rs @@ -9,6 +9,7 @@ //! Structs and traits which allow other parts of rust-lightning to interact with the blockchain. +use bitcoin::blockdata::block::{Block, BlockHeader}; use bitcoin::blockdata::script::Script; use bitcoin::blockdata::transaction::TxOut; use bitcoin::hash_types::{BlockHash, Txid}; @@ -46,6 +47,18 @@ pub trait Access: Send + Sync { fn get_utxo(&self, genesis_hash: &BlockHash, short_channel_id: u64) -> Result; } +/// The `Listen` trait is used to be notified of when blocks have been connected or disconnected +/// from the chain. +/// +/// Useful when needing to replay chain data upon startup or as new chain events occur. +pub trait Listen { + /// Notifies the listener that a block was added at the given height. + fn block_connected(&self, block: &Block, height: u32); + + /// Notifies the listener that a block was removed at the given height. + fn block_disconnected(&self, header: &BlockHeader, height: u32); +} + /// The `Watch` trait defines behavior for watching on-chain activity pertaining to channels as /// blocks are connected and disconnected. /// @@ -123,3 +136,29 @@ pub trait Filter: Send + Sync { /// `script_pubkey` as the spending condition. fn register_output(&self, outpoint: &OutPoint, script_pubkey: &Script); } + +impl Listen for std::ops::Deref { + fn block_connected(&self, block: &Block, height: u32) { + (**self).block_connected(block, height); + } + + fn block_disconnected(&self, header: &BlockHeader, height: u32) { + (**self).block_disconnected(header, height); + } +} + +impl Listen for (T, U) +where + T::Target: Listen, + U::Target: Listen, +{ + fn block_connected(&self, block: &Block, height: u32) { + self.0.block_connected(block, height); + self.1.block_connected(block, height); + } + + fn block_disconnected(&self, header: &BlockHeader, height: u32) { + self.0.block_disconnected(header, height); + self.1.block_disconnected(header, height); + } +} diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs index a759443d86e..8f7fbde5f9c 100644 --- a/lightning/src/ln/channelmanager.rs +++ b/lightning/src/ln/channelmanager.rs @@ -18,7 +18,7 @@ //! imply it needs to fail HTLCs/payments/channels it manages). //! -use bitcoin::blockdata::block::BlockHeader; +use bitcoin::blockdata::block::{Block, BlockHeader}; use bitcoin::blockdata::constants::genesis_block; use bitcoin::network::constants::Network; @@ -3139,6 +3139,24 @@ impl EventsProvi } } +impl chain::Listen for ChannelManager +where + M::Target: chain::Watch, + T::Target: BroadcasterInterface, + K::Target: KeysInterface, + F::Target: FeeEstimator, + L::Target: Logger, +{ + fn block_connected(&self, block: &Block, height: u32) { + let txdata: Vec<_> = block.txdata.iter().enumerate().collect(); + ChannelManager::block_connected(self, &block.header, &txdata, height); + } + + fn block_disconnected(&self, header: &BlockHeader, _height: u32) { + ChannelManager::block_disconnected(self, header); + } +} + impl ChannelManager where M::Target: chain::Watch, T::Target: BroadcasterInterface,