From 7d6fce78025e7e55a2256a6c16ad4a4dddfcdceb Mon Sep 17 00:00:00 2001 From: Jeffrey Czyz Date: Sun, 31 Jan 2021 23:42:27 -0800 Subject: [PATCH 1/5] Define a Poll trait as an adaptor on BlockSource SPV clients need to poll one or more block sources for the best chain tip and to retrieve related chain data. The Poll trait serves as an adaptor interface for BlockSource. Implementations may define an appropriate polling strategy. --- lightning-block-sync/src/lib.rs | 2 + lightning-block-sync/src/poll.rs | 148 +++++++++++++++++++++++++++++++ 2 files changed, 150 insertions(+) create mode 100644 lightning-block-sync/src/poll.rs diff --git a/lightning-block-sync/src/lib.rs b/lightning-block-sync/src/lib.rs index 58f77bdcaba..8a2f817cec1 100644 --- a/lightning-block-sync/src/lib.rs +++ b/lightning-block-sync/src/lib.rs @@ -14,6 +14,8 @@ #[cfg(any(feature = "rest-client", feature = "rpc-client"))] pub mod http; +pub mod poll; + #[cfg(feature = "rest-client")] pub mod rest; diff --git a/lightning-block-sync/src/poll.rs b/lightning-block-sync/src/poll.rs new file mode 100644 index 00000000000..82398a4b39c --- /dev/null +++ b/lightning-block-sync/src/poll.rs @@ -0,0 +1,148 @@ +use crate::{AsyncBlockSourceResult, BlockHeaderData, BlockSourceError, BlockSourceResult}; + +use bitcoin::blockdata::block::Block; +use bitcoin::hash_types::BlockHash; +use bitcoin::network::constants::Network; + +/// The `Poll` trait defines behavior for polling block sources for a chain tip and retrieving +/// related chain data. It serves as an adapter for `BlockSource`. +pub trait Poll { + /// Returns a chain tip in terms of its relationship to the provided chain tip. + fn poll_chain_tip<'a>(&'a mut self, best_known_chain_tip: ValidatedBlockHeader) -> + AsyncBlockSourceResult<'a, ChainTip>; + + /// Returns the header that preceded the given header in the chain. + fn look_up_previous_header<'a>(&'a mut self, header: &'a ValidatedBlockHeader) -> + AsyncBlockSourceResult<'a, ValidatedBlockHeader>; + + /// Returns the block associated with the given header. + fn fetch_block<'a>(&'a mut self, header: &'a ValidatedBlockHeader) -> + AsyncBlockSourceResult<'a, ValidatedBlock>; +} + +/// A chain tip relative to another chain tip in terms of block hash and chainwork. +#[derive(Clone, Debug, PartialEq)] +pub enum ChainTip { + /// A chain tip with the same hash as another chain's tip. + Common, + + /// A chain tip with more chainwork than another chain's tip. + Better(ValidatedBlockHeader), + + /// A chain tip with less or equal chainwork than another chain's tip. In either case, the + /// hashes of each tip will be different. + Worse(ValidatedBlockHeader), +} + +/// The `Validate` trait defines behavior for validating chain data. +pub(crate) trait Validate { + /// The validated data wrapper which can be dereferenced to obtain the validated data. + type T: std::ops::Deref; + + /// Validates the chain data against the given block hash and any criteria needed to ensure that + /// it is internally consistent. + fn validate(self, block_hash: BlockHash) -> BlockSourceResult; +} + +impl Validate for BlockHeaderData { + type T = ValidatedBlockHeader; + + fn validate(self, block_hash: BlockHash) -> BlockSourceResult { + self.header + .validate_pow(&self.header.target()) + .or_else(|e| Err(BlockSourceError::persistent(e)))?; + + // TODO: Use the result of validate_pow instead of recomputing the block hash once upstream. + if self.header.block_hash() != block_hash { + return Err(BlockSourceError::persistent("invalid block hash")); + } + + Ok(ValidatedBlockHeader { block_hash, inner: self }) + } +} + +impl Validate for Block { + type T = ValidatedBlock; + + fn validate(self, block_hash: BlockHash) -> BlockSourceResult { + self.header + .validate_pow(&self.header.target()) + .or_else(|e| Err(BlockSourceError::persistent(e)))?; + + // TODO: Use the result of validate_pow instead of recomputing the block hash once upstream. + if self.block_hash() != block_hash { + return Err(BlockSourceError::persistent("invalid block hash")); + } + + if !self.check_merkle_root() { + return Err(BlockSourceError::persistent("invalid merkle root")); + } + + if !self.check_witness_commitment() { + return Err(BlockSourceError::persistent("invalid witness commitment")); + } + + Ok(ValidatedBlock { block_hash, inner: self }) + } +} + +/// A block header with validated proof of work and corresponding block hash. +#[derive(Clone, Copy, Debug, PartialEq)] +pub struct ValidatedBlockHeader { + block_hash: BlockHash, + inner: BlockHeaderData, +} + +impl std::ops::Deref for ValidatedBlockHeader { + type Target = BlockHeaderData; + + fn deref(&self) -> &Self::Target { + &self.inner + } +} + +impl ValidatedBlockHeader { + /// Checks that the header correctly builds on previous_header: the claimed work differential + /// matches the actual PoW and the difficulty transition is possible, i.e., within 4x. + fn check_builds_on(&self, previous_header: &ValidatedBlockHeader, network: Network) -> BlockSourceResult<()> { + if self.header.prev_blockhash != previous_header.block_hash { + return Err(BlockSourceError::persistent("invalid previous block hash")); + } + + if self.height != previous_header.height + 1 { + return Err(BlockSourceError::persistent("invalid block height")); + } + + let work = self.header.work(); + if self.chainwork != previous_header.chainwork + work { + return Err(BlockSourceError::persistent("invalid chainwork")); + } + + if let Network::Bitcoin = network { + if self.height % 2016 == 0 { + let previous_work = previous_header.header.work(); + if work > (previous_work << 2) || work < (previous_work >> 2) { + return Err(BlockSourceError::persistent("invalid difficulty transition")) + } + } else if self.header.bits != previous_header.header.bits { + return Err(BlockSourceError::persistent("invalid difficulty")) + } + } + + Ok(()) + } +} + +/// A block with validated data against its transaction list and corresponding block hash. +pub struct ValidatedBlock { + block_hash: BlockHash, + inner: Block, +} + +impl std::ops::Deref for ValidatedBlock { + type Target = Block; + + fn deref(&self) -> &Self::Target { + &self.inner + } +} From 05ec06129c00655523780b2ea6ce57a538456164 Mon Sep 17 00:00:00 2001 From: Jeffrey Czyz Date: Sun, 31 Jan 2021 23:43:43 -0800 Subject: [PATCH 2/5] Add ChainPoller implementation of Poll trait ChainPoller defines a strategy for polling a single BlockSource. It handles validating chain data returned from the BlockSource. Thus, other implementations of Poll must be defined in terms of ChainPoller. --- lightning-block-sync/src/lib.rs | 6 +- lightning-block-sync/src/poll.rs | 210 ++++++++++++++++++++++++- lightning-block-sync/src/test_utils.rs | 126 +++++++++++++++ 3 files changed, 340 insertions(+), 2 deletions(-) create mode 100644 lightning-block-sync/src/test_utils.rs diff --git a/lightning-block-sync/src/lib.rs b/lightning-block-sync/src/lib.rs index 8a2f817cec1..f2716ccc437 100644 --- a/lightning-block-sync/src/lib.rs +++ b/lightning-block-sync/src/lib.rs @@ -25,6 +25,9 @@ pub mod rpc; #[cfg(any(feature = "rest-client", feature = "rpc-client"))] mod convert; +#[cfg(test)] +mod test_utils; + #[cfg(any(feature = "rest-client", feature = "rpc-client"))] mod utils; @@ -67,13 +70,14 @@ type AsyncBlockSourceResult<'a, T> = Pin, } /// The kind of `BlockSourceError`, either persistent or transient. -#[derive(Clone, Copy)] +#[derive(Clone, Copy, Debug, PartialEq)] pub enum BlockSourceErrorKind { /// Indicates an error that won't resolve when retrying a request (e.g., invalid data). Persistent, diff --git a/lightning-block-sync/src/poll.rs b/lightning-block-sync/src/poll.rs index 82398a4b39c..3ff7606b8d9 100644 --- a/lightning-block-sync/src/poll.rs +++ b/lightning-block-sync/src/poll.rs @@ -1,11 +1,18 @@ -use crate::{AsyncBlockSourceResult, BlockHeaderData, BlockSourceError, BlockSourceResult}; +use crate::{AsyncBlockSourceResult, BlockHeaderData, BlockSource, BlockSourceError, BlockSourceResult}; use bitcoin::blockdata::block::Block; use bitcoin::hash_types::BlockHash; use bitcoin::network::constants::Network; +use std::ops::DerefMut; + /// The `Poll` trait defines behavior for polling block sources for a chain tip and retrieving /// related chain data. It serves as an adapter for `BlockSource`. +/// +/// [`ChainPoller`] adapts a single `BlockSource`, while any other implementations of `Poll` are +/// required to be built in terms of it to ensure chain data validity. +/// +/// [`ChainPoller`]: ../struct.ChainPoller.html pub trait Poll { /// Returns a chain tip in terms of its relationship to the provided chain tip. fn poll_chain_tip<'a>(&'a mut self, best_known_chain_tip: ValidatedBlockHeader) -> @@ -146,3 +153,204 @@ impl std::ops::Deref for ValidatedBlock { &self.inner } } + +/// The canonical `Poll` implementation used for a single `BlockSource`. +/// +/// Other `Poll` implementations must be built using `ChainPoller` as it provides the only means of +/// validating chain data. +pub struct ChainPoller + Sized + Sync + Send, T: BlockSource> { + block_source: B, + network: Network, +} + +impl + Sized + Sync + Send, T: BlockSource> ChainPoller { + /// Creates a new poller for the given block source. + /// + /// If the `network` parameter is mainnet, then the difficulty between blocks is checked for + /// validity. + pub fn new(block_source: B, network: Network) -> Self { + Self { block_source, network } + } +} + +impl + Sized + Sync + Send, T: BlockSource> Poll for ChainPoller { + fn poll_chain_tip<'a>(&'a mut self, best_known_chain_tip: ValidatedBlockHeader) -> + AsyncBlockSourceResult<'a, ChainTip> + { + Box::pin(async move { + let (block_hash, height) = self.block_source.get_best_block().await?; + if block_hash == best_known_chain_tip.header.block_hash() { + return Ok(ChainTip::Common); + } + + let chain_tip = self.block_source + .get_header(&block_hash, height).await? + .validate(block_hash)?; + if chain_tip.chainwork > best_known_chain_tip.chainwork { + Ok(ChainTip::Better(chain_tip)) + } else { + Ok(ChainTip::Worse(chain_tip)) + } + }) + } + + fn look_up_previous_header<'a>(&'a mut self, header: &'a ValidatedBlockHeader) -> + AsyncBlockSourceResult<'a, ValidatedBlockHeader> + { + Box::pin(async move { + if header.height == 0 { + return Err(BlockSourceError::persistent("genesis block reached")); + } + + let previous_hash = &header.header.prev_blockhash; + let height = header.height - 1; + let previous_header = self.block_source + .get_header(previous_hash, Some(height)).await? + .validate(*previous_hash)?; + header.check_builds_on(&previous_header, self.network)?; + + Ok(previous_header) + }) + } + + fn fetch_block<'a>(&'a mut self, header: &'a ValidatedBlockHeader) -> + AsyncBlockSourceResult<'a, ValidatedBlock> + { + Box::pin(async move { + self.block_source + .get_block(&header.block_hash).await? + .validate(header.block_hash) + }) + } +} + +#[cfg(test)] +mod tests { + use crate::*; + use crate::test_utils::Blockchain; + use super::*; + use bitcoin::util::uint::Uint256; + + #[tokio::test] + async fn poll_empty_chain() { + let mut chain = Blockchain::default().with_height(0); + let best_known_chain_tip = chain.tip(); + chain.disconnect_tip(); + + let mut poller = ChainPoller::new(&mut chain, Network::Bitcoin); + match poller.poll_chain_tip(best_known_chain_tip).await { + Err(e) => { + assert_eq!(e.kind(), BlockSourceErrorKind::Transient); + assert_eq!(e.into_inner().as_ref().to_string(), "empty chain"); + }, + Ok(_) => panic!("Expected error"), + } + } + + #[tokio::test] + async fn poll_chain_without_headers() { + let mut chain = Blockchain::default().with_height(1).without_headers(); + let best_known_chain_tip = chain.at_height(0); + + let mut poller = ChainPoller::new(&mut chain, Network::Bitcoin); + match poller.poll_chain_tip(best_known_chain_tip).await { + Err(e) => { + assert_eq!(e.kind(), BlockSourceErrorKind::Persistent); + assert_eq!(e.into_inner().as_ref().to_string(), "header not found"); + }, + Ok(_) => panic!("Expected error"), + } + } + + #[tokio::test] + async fn poll_chain_with_invalid_pow() { + let mut chain = Blockchain::default().with_height(1); + let best_known_chain_tip = chain.at_height(0); + + // Invalidate the tip by changing its target. + chain.blocks.last_mut().unwrap().header.bits = + BlockHeader::compact_target_from_u256(&Uint256::from_be_bytes([0; 32])); + + let mut poller = ChainPoller::new(&mut chain, Network::Bitcoin); + match poller.poll_chain_tip(best_known_chain_tip).await { + Err(e) => { + assert_eq!(e.kind(), BlockSourceErrorKind::Persistent); + assert_eq!(e.into_inner().as_ref().to_string(), "block target correct but not attained"); + }, + Ok(_) => panic!("Expected error"), + } + } + + #[tokio::test] + async fn poll_chain_with_malformed_headers() { + let mut chain = Blockchain::default().with_height(1).malformed_headers(); + let best_known_chain_tip = chain.at_height(0); + + let mut poller = ChainPoller::new(&mut chain, Network::Bitcoin); + match poller.poll_chain_tip(best_known_chain_tip).await { + Err(e) => { + assert_eq!(e.kind(), BlockSourceErrorKind::Persistent); + assert_eq!(e.into_inner().as_ref().to_string(), "invalid block hash"); + }, + Ok(_) => panic!("Expected error"), + } + } + + #[tokio::test] + async fn poll_chain_with_common_tip() { + let mut chain = Blockchain::default().with_height(0); + let best_known_chain_tip = chain.tip(); + + let mut poller = ChainPoller::new(&mut chain, Network::Bitcoin); + match poller.poll_chain_tip(best_known_chain_tip).await { + Err(e) => panic!("Unexpected error: {:?}", e), + Ok(tip) => assert_eq!(tip, ChainTip::Common), + } + } + + #[tokio::test] + async fn poll_chain_with_uncommon_tip_but_equal_chainwork() { + let mut chain = Blockchain::default().with_height(1); + let best_known_chain_tip = chain.tip(); + + // Change the nonce to get a different block hash with the same chainwork. + chain.blocks.last_mut().unwrap().header.nonce += 1; + let worse_chain_tip = chain.tip(); + assert_eq!(best_known_chain_tip.chainwork, worse_chain_tip.chainwork); + + let mut poller = ChainPoller::new(&mut chain, Network::Bitcoin); + match poller.poll_chain_tip(best_known_chain_tip).await { + Err(e) => panic!("Unexpected error: {:?}", e), + Ok(tip) => assert_eq!(tip, ChainTip::Worse(worse_chain_tip)), + } + } + + #[tokio::test] + async fn poll_chain_with_worse_tip() { + let mut chain = Blockchain::default().with_height(1); + let best_known_chain_tip = chain.tip(); + + chain.disconnect_tip(); + let worse_chain_tip = chain.tip(); + + let mut poller = ChainPoller::new(&mut chain, Network::Bitcoin); + match poller.poll_chain_tip(best_known_chain_tip).await { + Err(e) => panic!("Unexpected error: {:?}", e), + Ok(tip) => assert_eq!(tip, ChainTip::Worse(worse_chain_tip)), + } + } + + #[tokio::test] + async fn poll_chain_with_better_tip() { + let mut chain = Blockchain::default().with_height(1); + let best_known_chain_tip = chain.at_height(0); + + let better_chain_tip = chain.tip(); + + let mut poller = ChainPoller::new(&mut chain, Network::Bitcoin); + match poller.poll_chain_tip(best_known_chain_tip).await { + Err(e) => panic!("Unexpected error: {:?}", e), + Ok(tip) => assert_eq!(tip, ChainTip::Better(better_chain_tip)), + } + } +} diff --git a/lightning-block-sync/src/test_utils.rs b/lightning-block-sync/src/test_utils.rs new file mode 100644 index 00000000000..efd34f74ee5 --- /dev/null +++ b/lightning-block-sync/src/test_utils.rs @@ -0,0 +1,126 @@ +use crate::{AsyncBlockSourceResult, BlockHeaderData, BlockSource, BlockSourceError}; +use crate::poll::{Validate, ValidatedBlockHeader}; + +use bitcoin::blockdata::block::{Block, BlockHeader}; +use bitcoin::blockdata::constants::genesis_block; +use bitcoin::hash_types::BlockHash; +use bitcoin::network::constants::Network; +use bitcoin::util::uint::Uint256; + +#[derive(Default)] +pub struct Blockchain { + pub blocks: Vec, + without_headers: bool, + malformed_headers: bool, +} + +impl Blockchain { + pub fn default() -> Self { + Blockchain::with_network(Network::Bitcoin) + } + + pub fn with_network(network: Network) -> Self { + let blocks = vec![genesis_block(network)]; + Self { blocks, ..Default::default() } + } + + pub fn with_height(mut self, height: usize) -> Self { + self.blocks.reserve_exact(height); + let bits = BlockHeader::compact_target_from_u256(&Uint256::from_be_bytes([0xff; 32])); + for i in 1..=height { + let prev_block = &self.blocks[i - 1]; + let prev_blockhash = prev_block.block_hash(); + let time = prev_block.header.time + height as u32; + self.blocks.push(Block { + header: BlockHeader { + version: 0, + prev_blockhash, + merkle_root: Default::default(), + time, + bits, + nonce: 0, + }, + txdata: vec![], + }); + } + self + } + + pub fn without_headers(self) -> Self { + Self { without_headers: true, ..self } + } + + pub fn malformed_headers(self) -> Self { + Self { malformed_headers: true, ..self } + } + + pub fn at_height(&self, height: usize) -> ValidatedBlockHeader { + let block_header = self.at_height_unvalidated(height); + let block_hash = self.blocks[height].block_hash(); + block_header.validate(block_hash).unwrap() + } + + fn at_height_unvalidated(&self, height: usize) -> BlockHeaderData { + assert!(!self.blocks.is_empty()); + assert!(height < self.blocks.len()); + BlockHeaderData { + chainwork: self.blocks[0].header.work() + Uint256::from_u64(height as u64).unwrap(), + height: height as u32, + header: self.blocks[height].header.clone(), + } + } + + pub fn tip(&self) -> ValidatedBlockHeader { + assert!(!self.blocks.is_empty()); + self.at_height(self.blocks.len() - 1) + } + + pub fn disconnect_tip(&mut self) -> Option { + self.blocks.pop() + } +} + +impl BlockSource for Blockchain { + fn get_header<'a>(&'a mut self, header_hash: &'a BlockHash, _height_hint: Option) -> AsyncBlockSourceResult<'a, BlockHeaderData> { + Box::pin(async move { + if self.without_headers { + return Err(BlockSourceError::persistent("header not found")); + } + + for (height, block) in self.blocks.iter().enumerate() { + if block.header.block_hash() == *header_hash { + let mut header_data = self.at_height_unvalidated(height); + if self.malformed_headers { + header_data.header.time += 1; + } + + return Ok(header_data); + } + } + Err(BlockSourceError::transient("header not found")) + }) + } + + fn get_block<'a>(&'a mut self, header_hash: &'a BlockHash) -> AsyncBlockSourceResult<'a, Block> { + Box::pin(async move { + for block in self.blocks.iter() { + if block.header.block_hash() == *header_hash { + return Ok(block.clone()); + } + } + Err(BlockSourceError::transient("block not found")) + }) + } + + fn get_best_block<'a>(&'a mut self) -> AsyncBlockSourceResult<'a, (BlockHash, Option)> { + Box::pin(async move { + match self.blocks.last() { + None => Err(BlockSourceError::transient("empty chain")), + Some(block) => { + let height = (self.blocks.len() - 1) as u32; + Ok((block.block_hash(), Some(height))) + }, + } + }) + } +} From b1ecfe705d77b1338c7df5e208d76eb6743f270c Mon Sep 17 00:00:00 2001 From: Jeffrey Czyz Date: Mon, 1 Feb 2021 13:17:20 -0800 Subject: [PATCH 3/5] Add ChainNotifier and define ChainListener trait Add an interface for being notified of block connected and disconnected events, along with a notifier for generating such events. Used while polling block sources for a new tip in order to feed these events into ChannelManager and ChainMonitor. --- lightning-block-sync/src/lib.rs | 316 +++++++++++++++++++++++++ lightning-block-sync/src/poll.rs | 4 +- lightning-block-sync/src/test_utils.rs | 103 +++++++- 3 files changed, 419 insertions(+), 4 deletions(-) diff --git a/lightning-block-sync/src/lib.rs b/lightning-block-sync/src/lib.rs index f2716ccc437..9228e11f68e 100644 --- a/lightning-block-sync/src/lib.rs +++ b/lightning-block-sync/src/lib.rs @@ -31,6 +31,8 @@ mod test_utils; #[cfg(any(feature = "rest-client", feature = "rpc-client"))] mod utils; +use crate::poll::{Poll, ValidatedBlockHeader}; + use bitcoin::blockdata::block::{Block, BlockHeader}; use bitcoin::hash_types::BlockHash; use bitcoin::util::uint::Uint256; @@ -130,3 +132,317 @@ pub struct BlockHeaderData { /// of equivalent weight. pub chainwork: Uint256, } + +/// Adaptor used for notifying when blocks have been connected or disconnected from the chain. +/// +/// Used when needing to replay chain data upon startup or as new chain events occur. +pub trait ChainListener { + /// Notifies the listener that a block was added at the given height. + fn block_connected(&mut self, block: &Block, height: u32); + + /// Notifies the listener that a block was removed at the given height. + fn block_disconnected(&mut self, header: &BlockHeader, height: u32); +} + +/// The `Cache` trait defines behavior for managing a block header cache, where block headers are +/// keyed by block hash. +/// +/// Used by [`ChainNotifier`] to store headers along the best chain, which is important for ensuring +/// that blocks can be disconnected if they are no longer accessible from a block source (e.g., if +/// the block source does not store stale forks indefinitely). +/// +/// Implementations may define how long to retain headers such that it's unlikely they will ever be +/// needed to disconnect a block. In cases where block sources provide access to headers on stale +/// forks reliably, caches may be entirely unnecessary. +/// +/// [`ChainNotifier`]: struct.ChainNotifier.html +pub trait Cache { + /// Retrieves the block header keyed by the given block hash. + fn look_up(&self, block_hash: &BlockHash) -> Option<&ValidatedBlockHeader>; + + /// Called when a block has been connected to the best chain to ensure it is available to be + /// disconnected later if needed. + fn block_connected(&mut self, block_hash: BlockHash, block_header: ValidatedBlockHeader); + + /// Called when a block has been disconnected from the best chain. Once disconnected, a block's + /// header is no longer needed and thus can be removed. + fn block_disconnected(&mut self, block_hash: &BlockHash) -> Option; +} + +/// Unbounded cache of block headers keyed by block hash. +pub type UnboundedCache = std::collections::HashMap; + +impl Cache for UnboundedCache { + fn look_up(&self, block_hash: &BlockHash) -> Option<&ValidatedBlockHeader> { + self.get(block_hash) + } + + fn block_connected(&mut self, block_hash: BlockHash, block_header: ValidatedBlockHeader) { + self.insert(block_hash, block_header); + } + + fn block_disconnected(&mut self, block_hash: &BlockHash) -> Option { + self.remove(block_hash) + } +} + +/// Notifies [listeners] of blocks that have been connected or disconnected from the chain. +/// +/// [listeners]: trait.ChainListener.html +struct ChainNotifier { + /// Cache for looking up headers before fetching from a block source. + header_cache: C, +} + +/// Changes made to the chain between subsequent polls that transformed it from having one chain tip +/// to another. +/// +/// Blocks are given in height-descending order. Therefore, blocks are first disconnected in order +/// before new blocks are connected in reverse order. +struct ChainDifference { + /// Blocks that were disconnected from the chain since the last poll. + disconnected_blocks: Vec, + + /// Blocks that were connected to the chain since the last poll. + connected_blocks: Vec, +} + +impl ChainNotifier { + /// Finds the fork point between `new_header` and `old_header`, disconnecting blocks from + /// `old_header` to get to that point and then connecting blocks until `new_header`. + /// + /// Validates headers along the transition path, but doesn't fetch blocks until the chain is + /// disconnected to the fork point. Thus, this may return an `Err` that includes where the tip + /// ended up which may not be `new_header`. Note that the returned `Err` contains `Some` header + /// if and only if the transition from `old_header` to `new_header` is valid. + async fn synchronize_listener( + &mut self, + new_header: ValidatedBlockHeader, + old_header: &ValidatedBlockHeader, + chain_poller: &mut P, + chain_listener: &mut L, + ) -> Result<(), (BlockSourceError, Option)> { + let mut difference = self.find_difference(new_header, old_header, chain_poller).await + .map_err(|e| (e, None))?; + + let mut new_tip = *old_header; + for header in difference.disconnected_blocks.drain(..) { + if let Some(cached_header) = self.header_cache.block_disconnected(&header.block_hash) { + assert_eq!(cached_header, header); + } + chain_listener.block_disconnected(&header.header, header.height); + new_tip = header; + } + + for header in difference.connected_blocks.drain(..).rev() { + let block = chain_poller + .fetch_block(&header).await + .or_else(|e| Err((e, Some(new_tip))))?; + debug_assert_eq!(block.block_hash, header.block_hash); + + self.header_cache.block_connected(header.block_hash, header); + chain_listener.block_connected(&block, header.height); + new_tip = header; + } + + Ok(()) + } + + /// Returns the changes needed to produce the chain with `current_header` as its tip from the + /// chain with `prev_header` as its tip. + /// + /// Walks backwards from `current_header` and `prev_header`, finding the common ancestor. + async fn find_difference( + &self, + current_header: ValidatedBlockHeader, + prev_header: &ValidatedBlockHeader, + chain_poller: &mut P, + ) -> BlockSourceResult { + let mut disconnected_blocks = Vec::new(); + let mut connected_blocks = Vec::new(); + let mut current = current_header; + let mut previous = *prev_header; + loop { + // Found the common ancestor. + if current.block_hash == previous.block_hash { + break; + } + + // Walk back the chain, finding blocks needed to connect and disconnect. Only walk back + // the header with the greater height, or both if equal heights. + let current_height = current.height; + let previous_height = previous.height; + if current_height <= previous_height { + disconnected_blocks.push(previous); + previous = self.look_up_previous_header(chain_poller, &previous).await?; + } + if current_height >= previous_height { + connected_blocks.push(current); + current = self.look_up_previous_header(chain_poller, ¤t).await?; + } + } + + Ok(ChainDifference { disconnected_blocks, connected_blocks }) + } + + /// Returns the previous header for the given header, either by looking it up in the cache or + /// fetching it if not found. + async fn look_up_previous_header( + &self, + chain_poller: &mut P, + header: &ValidatedBlockHeader, + ) -> BlockSourceResult { + match self.header_cache.look_up(&header.header.prev_blockhash) { + Some(prev_header) => Ok(*prev_header), + None => chain_poller.look_up_previous_header(header).await, + } + } +} + +#[cfg(test)] +mod chain_notifier_tests { + use crate::test_utils::{Blockchain, MockChainListener}; + use super::*; + + use bitcoin::network::constants::Network; + + #[tokio::test] + async fn sync_from_same_chain() { + let mut chain = Blockchain::default().with_height(3); + + let new_tip = chain.tip(); + let old_tip = chain.at_height(1); + let mut listener = MockChainListener::new() + .expect_block_connected(*chain.at_height(2)) + .expect_block_connected(*new_tip); + let mut notifier = ChainNotifier { header_cache: chain.header_cache(0..=1) }; + let mut poller = poll::ChainPoller::new(&mut chain, Network::Testnet); + match notifier.synchronize_listener(new_tip, &old_tip, &mut poller, &mut listener).await { + Err((e, _)) => panic!("Unexpected error: {:?}", e), + Ok(_) => {}, + } + } + + #[tokio::test] + async fn sync_from_different_chains() { + let mut test_chain = Blockchain::with_network(Network::Testnet).with_height(1); + let main_chain = Blockchain::with_network(Network::Bitcoin).with_height(1); + + let new_tip = test_chain.tip(); + let old_tip = main_chain.tip(); + let mut listener = MockChainListener::new(); + let mut notifier = ChainNotifier { header_cache: main_chain.header_cache(0..=1) }; + let mut poller = poll::ChainPoller::new(&mut test_chain, Network::Testnet); + match notifier.synchronize_listener(new_tip, &old_tip, &mut poller, &mut listener).await { + Err((e, _)) => { + assert_eq!(e.kind(), BlockSourceErrorKind::Persistent); + assert_eq!(e.into_inner().as_ref().to_string(), "genesis block reached"); + }, + Ok(_) => panic!("Expected error"), + } + } + + #[tokio::test] + async fn sync_from_equal_length_fork() { + let main_chain = Blockchain::default().with_height(2); + let mut fork_chain = main_chain.fork_at_height(1); + + let new_tip = fork_chain.tip(); + let old_tip = main_chain.tip(); + let mut listener = MockChainListener::new() + .expect_block_disconnected(*old_tip) + .expect_block_connected(*new_tip); + let mut notifier = ChainNotifier { header_cache: main_chain.header_cache(0..=2) }; + let mut poller = poll::ChainPoller::new(&mut fork_chain, Network::Testnet); + match notifier.synchronize_listener(new_tip, &old_tip, &mut poller, &mut listener).await { + Err((e, _)) => panic!("Unexpected error: {:?}", e), + Ok(_) => {}, + } + } + + #[tokio::test] + async fn sync_from_shorter_fork() { + let main_chain = Blockchain::default().with_height(3); + let mut fork_chain = main_chain.fork_at_height(1); + fork_chain.disconnect_tip(); + + let new_tip = fork_chain.tip(); + let old_tip = main_chain.tip(); + let mut listener = MockChainListener::new() + .expect_block_disconnected(*old_tip) + .expect_block_disconnected(*main_chain.at_height(2)) + .expect_block_connected(*new_tip); + let mut notifier = ChainNotifier { header_cache: main_chain.header_cache(0..=3) }; + let mut poller = poll::ChainPoller::new(&mut fork_chain, Network::Testnet); + match notifier.synchronize_listener(new_tip, &old_tip, &mut poller, &mut listener).await { + Err((e, _)) => panic!("Unexpected error: {:?}", e), + Ok(_) => {}, + } + } + + #[tokio::test] + async fn sync_from_longer_fork() { + let mut main_chain = Blockchain::default().with_height(3); + let mut fork_chain = main_chain.fork_at_height(1); + main_chain.disconnect_tip(); + + let new_tip = fork_chain.tip(); + let old_tip = main_chain.tip(); + let mut listener = MockChainListener::new() + .expect_block_disconnected(*old_tip) + .expect_block_connected(*fork_chain.at_height(2)) + .expect_block_connected(*new_tip); + let mut notifier = ChainNotifier { header_cache: main_chain.header_cache(0..=2) }; + let mut poller = poll::ChainPoller::new(&mut fork_chain, Network::Testnet); + match notifier.synchronize_listener(new_tip, &old_tip, &mut poller, &mut listener).await { + Err((e, _)) => panic!("Unexpected error: {:?}", e), + Ok(_) => {}, + } + } + + #[tokio::test] + async fn sync_from_chain_without_headers() { + let mut chain = Blockchain::default().with_height(3).without_headers(); + + let new_tip = chain.tip(); + let old_tip = chain.at_height(1); + let mut listener = MockChainListener::new(); + let mut notifier = ChainNotifier { header_cache: chain.header_cache(0..=1) }; + let mut poller = poll::ChainPoller::new(&mut chain, Network::Testnet); + match notifier.synchronize_listener(new_tip, &old_tip, &mut poller, &mut listener).await { + Err((_, tip)) => assert_eq!(tip, None), + Ok(_) => panic!("Expected error"), + } + } + + #[tokio::test] + async fn sync_from_chain_without_any_new_blocks() { + let mut chain = Blockchain::default().with_height(3).without_blocks(2..); + + let new_tip = chain.tip(); + let old_tip = chain.at_height(1); + let mut listener = MockChainListener::new(); + let mut notifier = ChainNotifier { header_cache: chain.header_cache(0..=3) }; + let mut poller = poll::ChainPoller::new(&mut chain, Network::Testnet); + match notifier.synchronize_listener(new_tip, &old_tip, &mut poller, &mut listener).await { + Err((_, tip)) => assert_eq!(tip, Some(old_tip)), + Ok(_) => panic!("Expected error"), + } + } + + #[tokio::test] + async fn sync_from_chain_without_some_new_blocks() { + let mut chain = Blockchain::default().with_height(3).without_blocks(3..); + + let new_tip = chain.tip(); + let old_tip = chain.at_height(1); + let mut listener = MockChainListener::new() + .expect_block_connected(*chain.at_height(2)); + let mut notifier = ChainNotifier { header_cache: chain.header_cache(0..=3) }; + let mut poller = poll::ChainPoller::new(&mut chain, Network::Testnet); + match notifier.synchronize_listener(new_tip, &old_tip, &mut poller, &mut listener).await { + Err((_, tip)) => assert_eq!(tip, Some(chain.at_height(2))), + Ok(_) => panic!("Expected error"), + } + } +} diff --git a/lightning-block-sync/src/poll.rs b/lightning-block-sync/src/poll.rs index 3ff7606b8d9..34be2437c8e 100644 --- a/lightning-block-sync/src/poll.rs +++ b/lightning-block-sync/src/poll.rs @@ -96,7 +96,7 @@ impl Validate for Block { /// A block header with validated proof of work and corresponding block hash. #[derive(Clone, Copy, Debug, PartialEq)] pub struct ValidatedBlockHeader { - block_hash: BlockHash, + pub(crate) block_hash: BlockHash, inner: BlockHeaderData, } @@ -142,7 +142,7 @@ impl ValidatedBlockHeader { /// A block with validated data against its transaction list and corresponding block hash. pub struct ValidatedBlock { - block_hash: BlockHash, + pub(crate) block_hash: BlockHash, inner: Block, } diff --git a/lightning-block-sync/src/test_utils.rs b/lightning-block-sync/src/test_utils.rs index efd34f74ee5..70a8982a0ef 100644 --- a/lightning-block-sync/src/test_utils.rs +++ b/lightning-block-sync/src/test_utils.rs @@ -1,4 +1,4 @@ -use crate::{AsyncBlockSourceResult, BlockHeaderData, BlockSource, BlockSourceError}; +use crate::{AsyncBlockSourceResult, BlockHeaderData, BlockSource, BlockSourceError, ChainListener, UnboundedCache}; use crate::poll::{Validate, ValidatedBlockHeader}; use bitcoin::blockdata::block::{Block, BlockHeader}; @@ -7,9 +7,12 @@ use bitcoin::hash_types::BlockHash; use bitcoin::network::constants::Network; use bitcoin::util::uint::Uint256; +use std::collections::VecDeque; + #[derive(Default)] pub struct Blockchain { pub blocks: Vec, + without_blocks: Option>, without_headers: bool, malformed_headers: bool, } @@ -46,6 +49,10 @@ impl Blockchain { self } + pub fn without_blocks(self, range: std::ops::RangeFrom) -> Self { + Self { without_blocks: Some(range), ..self } + } + pub fn without_headers(self) -> Self { Self { without_headers: true, ..self } } @@ -54,6 +61,18 @@ impl Blockchain { Self { malformed_headers: true, ..self } } + pub fn fork_at_height(&self, height: usize) -> Self { + assert!(height + 1 < self.blocks.len()); + let mut blocks = self.blocks.clone(); + let mut prev_blockhash = blocks[height].block_hash(); + for block in blocks.iter_mut().skip(height + 1) { + block.header.prev_blockhash = prev_blockhash; + block.header.nonce += 1; + prev_blockhash = block.block_hash(); + } + Self { blocks, without_blocks: None, ..*self } + } + pub fn at_height(&self, height: usize) -> ValidatedBlockHeader { let block_header = self.at_height_unvalidated(height); let block_hash = self.blocks[height].block_hash(); @@ -78,6 +97,16 @@ impl Blockchain { pub fn disconnect_tip(&mut self) -> Option { self.blocks.pop() } + + pub fn header_cache(&self, heights: std::ops::RangeInclusive) -> UnboundedCache { + let mut cache = UnboundedCache::new(); + for i in heights { + let value = self.at_height(i); + let key = value.header.block_hash(); + assert!(cache.insert(key, value).is_none()); + } + cache + } } impl BlockSource for Blockchain { @@ -103,8 +132,14 @@ impl BlockSource for Blockchain { fn get_block<'a>(&'a mut self, header_hash: &'a BlockHash) -> AsyncBlockSourceResult<'a, Block> { Box::pin(async move { - for block in self.blocks.iter() { + for (height, block) in self.blocks.iter().enumerate() { if block.header.block_hash() == *header_hash { + if let Some(without_blocks) = &self.without_blocks { + if without_blocks.contains(&height) { + return Err(BlockSourceError::persistent("block not found")); + } + } + return Ok(block.clone()); } } @@ -124,3 +159,67 @@ impl BlockSource for Blockchain { }) } } + +pub struct MockChainListener { + expected_blocks_connected: VecDeque, + expected_blocks_disconnected: VecDeque, +} + +impl MockChainListener { + pub fn new() -> Self { + Self { + expected_blocks_connected: VecDeque::new(), + expected_blocks_disconnected: VecDeque::new(), + } + } + + pub fn expect_block_connected(mut self, block: BlockHeaderData) -> Self { + self.expected_blocks_connected.push_back(block); + self + } + + pub fn expect_block_disconnected(mut self, block: BlockHeaderData) -> Self { + self.expected_blocks_disconnected.push_back(block); + self + } +} + +impl ChainListener for MockChainListener { + fn block_connected(&mut self, block: &Block, height: u32) { + match self.expected_blocks_connected.pop_front() { + None => { + panic!("Unexpected block connected: {:?}", block.block_hash()); + }, + Some(expected_block) => { + assert_eq!(block.block_hash(), expected_block.header.block_hash()); + assert_eq!(height, expected_block.height); + }, + } + } + + fn block_disconnected(&mut self, header: &BlockHeader, height: u32) { + match self.expected_blocks_disconnected.pop_front() { + None => { + panic!("Unexpected block disconnected: {:?}", header.block_hash()); + }, + Some(expected_block) => { + assert_eq!(header.block_hash(), expected_block.header.block_hash()); + assert_eq!(height, expected_block.height); + }, + } + } +} + +impl Drop for MockChainListener { + fn drop(&mut self) { + if std::thread::panicking() { + return; + } + if !self.expected_blocks_connected.is_empty() { + panic!("Expected blocks connected: {:?}", self.expected_blocks_connected); + } + if !self.expected_blocks_disconnected.is_empty() { + panic!("Expected blocks disconnected: {:?}", self.expected_blocks_disconnected); + } + } +} From 8505382b197ee9469028b2ea6062fe2489aee6c1 Mon Sep 17 00:00:00 2001 From: Jeffrey Czyz Date: Wed, 3 Feb 2021 17:41:03 -0800 Subject: [PATCH 4/5] Add SpvClient used to poll for the best chain tip Adds a lightweight client for polling one or more block sources for the best chain tip. Notifies listeners of blocks connected or disconnected since the last poll. Useful for keeping a Lightning node in sync with the chain. --- lightning-block-sync/src/lib.rs | 220 ++++++++++++++++++++++++- lightning-block-sync/src/test_utils.rs | 7 + 2 files changed, 223 insertions(+), 4 deletions(-) diff --git a/lightning-block-sync/src/lib.rs b/lightning-block-sync/src/lib.rs index 9228e11f68e..5ed8689fb96 100644 --- a/lightning-block-sync/src/lib.rs +++ b/lightning-block-sync/src/lib.rs @@ -1,5 +1,9 @@ //! A lightweight client for keeping in sync with chain activity. //! +//! Defines an [`SpvClient`] utility for polling one or more block sources for the best chain tip. +//! It is used to notify listeners of blocks connected or disconnected since the last poll. Useful +//! for keeping a Lightning node in sync with the chain. +//! //! Defines a [`BlockSource`] trait, which is an asynchronous interface for retrieving block headers //! and data. //! @@ -9,6 +13,7 @@ //! Both features support either blocking I/O using `std::net::TcpStream` or, with feature `tokio`, //! non-blocking I/O using `tokio::net::TcpStream` from inside a Tokio runtime. //! +//! [`SpvClient`]: struct.SpvClient.html //! [`BlockSource`]: trait.BlockSource.html #[cfg(any(feature = "rest-client", feature = "rpc-client"))] @@ -31,7 +36,7 @@ mod test_utils; #[cfg(any(feature = "rest-client", feature = "rpc-client"))] mod utils; -use crate::poll::{Poll, ValidatedBlockHeader}; +use crate::poll::{ChainTip, Poll, ValidatedBlockHeader}; use bitcoin::blockdata::block::{Block, BlockHeader}; use bitcoin::hash_types::BlockHash; @@ -54,9 +59,13 @@ pub trait BlockSource : Sync + Send { /// error. fn get_block<'a>(&'a mut self, header_hash: &'a BlockHash) -> AsyncBlockSourceResult<'a, Block>; - // TODO: Phrase in terms of `Poll` once added. - /// Returns the hash of the best block and, optionally, its height. When polling a block source, - /// the height is passed to `get_header` to allow for a more efficient lookup. + /// Returns the hash of the best block and, optionally, its height. + /// + /// When polling a block source, [`Poll`] implementations may pass the height to [`get_header`] + /// to allow for a more efficient lookup. + /// + /// [`Poll`]: poll/trait.Poll.html + /// [`get_header`]: #tymethod.get_header fn get_best_block<'a>(&'a mut self) -> AsyncBlockSourceResult<(BlockHash, Option)>; } @@ -133,6 +142,25 @@ pub struct BlockHeaderData { pub chainwork: Uint256, } +/// A lightweight client for keeping a listener in sync with the chain, allowing for Simplified +/// Payment Verification (SPV). +/// +/// The client is parameterized by a chain poller which is responsible for polling one or more block +/// sources for the best chain tip. During this process it detects any chain forks, determines which +/// constitutes the best chain, and updates the listener accordingly with any blocks that were +/// connected or disconnected since the last poll. +/// +/// Block headers for the best chain are maintained in the parameterized cache, allowing for a +/// custom cache eviction policy. This offers flexibility to those sensitive to resource usage. +/// Hence, there is a trade-off between a lower memory footprint and potentially increased network +/// I/O as headers are re-fetched during fork detection. +pub struct SpvClient { + chain_tip: ValidatedBlockHeader, + chain_poller: P, + chain_notifier: ChainNotifier, + chain_listener: L, +} + /// Adaptor used for notifying when blocks have been connected or disconnected from the chain. /// /// Used when needing to replay chain data upon startup or as new chain events occur. @@ -186,6 +214,69 @@ impl Cache for UnboundedCache { } } +impl SpvClient { + /// Creates a new SPV client using `chain_tip` as the best known chain tip. + /// + /// Subsequent calls to [`poll_best_tip`] will poll for the best chain tip using the given chain + /// poller, which may be configured with one or more block sources to query. At least one block + /// source must provide headers back from the best chain tip to its common ancestor with + /// `chain_tip`. + /// * `header_cache` is used to look up and store headers on the best chain + /// * `chain_listener` is notified of any blocks connected or disconnected + /// + /// [`poll_best_tip`]: struct.SpvClient.html#method.poll_best_tip + pub fn new( + chain_tip: ValidatedBlockHeader, + chain_poller: P, + header_cache: C, + chain_listener: L, + ) -> Self { + let chain_notifier = ChainNotifier { header_cache }; + Self { chain_tip, chain_poller, chain_notifier, chain_listener } + } + + /// Polls for the best tip and updates the chain listener with any connected or disconnected + /// blocks accordingly. + /// + /// Returns the best polled chain tip relative to the previous best known tip and whether any + /// blocks were indeed connected or disconnected. + pub async fn poll_best_tip(&mut self) -> BlockSourceResult<(ChainTip, bool)> { + let chain_tip = self.chain_poller.poll_chain_tip(self.chain_tip).await?; + let blocks_connected = match chain_tip { + ChainTip::Common => false, + ChainTip::Better(chain_tip) => { + debug_assert_ne!(chain_tip.block_hash, self.chain_tip.block_hash); + debug_assert!(chain_tip.chainwork > self.chain_tip.chainwork); + self.update_chain_tip(chain_tip).await + }, + ChainTip::Worse(chain_tip) => { + debug_assert_ne!(chain_tip.block_hash, self.chain_tip.block_hash); + debug_assert!(chain_tip.chainwork <= self.chain_tip.chainwork); + false + }, + }; + Ok((chain_tip, blocks_connected)) + } + + /// Updates the chain tip, syncing the chain listener with any connected or disconnected + /// blocks. Returns whether there were any such blocks. + async fn update_chain_tip(&mut self, best_chain_tip: ValidatedBlockHeader) -> bool { + match self.chain_notifier.synchronize_listener( + best_chain_tip, &self.chain_tip, &mut self.chain_poller, &mut self.chain_listener).await + { + Ok(_) => { + self.chain_tip = best_chain_tip; + true + }, + Err((_, Some(chain_tip))) if chain_tip.block_hash != self.chain_tip.block_hash => { + self.chain_tip = chain_tip; + true + }, + Err(_) => false, + } + } +} + /// Notifies [listeners] of blocks that have been connected or disconnected from the chain. /// /// [listeners]: trait.ChainListener.html @@ -299,6 +390,127 @@ impl ChainNotifier { } } +#[cfg(test)] +mod spv_client_tests { + use crate::test_utils::{Blockchain, NullChainListener}; + use super::*; + + use bitcoin::network::constants::Network; + + #[tokio::test] + async fn poll_from_chain_without_headers() { + let mut chain = Blockchain::default().with_height(3).without_headers(); + let best_tip = chain.at_height(1); + + let poller = poll::ChainPoller::new(&mut chain, Network::Testnet); + let cache = UnboundedCache::new(); + let mut client = SpvClient::new(best_tip, poller, cache, NullChainListener {}); + match client.poll_best_tip().await { + Err(e) => { + assert_eq!(e.kind(), BlockSourceErrorKind::Persistent); + assert_eq!(e.into_inner().as_ref().to_string(), "header not found"); + }, + Ok(_) => panic!("Expected error"), + } + assert_eq!(client.chain_tip, best_tip); + } + + #[tokio::test] + async fn poll_from_chain_with_common_tip() { + let mut chain = Blockchain::default().with_height(3); + let common_tip = chain.tip(); + + let poller = poll::ChainPoller::new(&mut chain, Network::Testnet); + let cache = UnboundedCache::new(); + let mut client = SpvClient::new(common_tip, poller, cache, NullChainListener {}); + match client.poll_best_tip().await { + Err(e) => panic!("Unexpected error: {:?}", e), + Ok((chain_tip, blocks_connected)) => { + assert_eq!(chain_tip, ChainTip::Common); + assert!(!blocks_connected); + }, + } + assert_eq!(client.chain_tip, common_tip); + } + + #[tokio::test] + async fn poll_from_chain_with_better_tip() { + let mut chain = Blockchain::default().with_height(3); + let new_tip = chain.tip(); + let old_tip = chain.at_height(1); + + let poller = poll::ChainPoller::new(&mut chain, Network::Testnet); + let cache = UnboundedCache::new(); + let mut client = SpvClient::new(old_tip, poller, cache, NullChainListener {}); + match client.poll_best_tip().await { + Err(e) => panic!("Unexpected error: {:?}", e), + Ok((chain_tip, blocks_connected)) => { + assert_eq!(chain_tip, ChainTip::Better(new_tip)); + assert!(blocks_connected); + }, + } + assert_eq!(client.chain_tip, new_tip); + } + + #[tokio::test] + async fn poll_from_chain_with_better_tip_and_without_any_new_blocks() { + let mut chain = Blockchain::default().with_height(3).without_blocks(2..); + let new_tip = chain.tip(); + let old_tip = chain.at_height(1); + + let poller = poll::ChainPoller::new(&mut chain, Network::Testnet); + let cache = UnboundedCache::new(); + let mut client = SpvClient::new(old_tip, poller, cache, NullChainListener {}); + match client.poll_best_tip().await { + Err(e) => panic!("Unexpected error: {:?}", e), + Ok((chain_tip, blocks_connected)) => { + assert_eq!(chain_tip, ChainTip::Better(new_tip)); + assert!(!blocks_connected); + }, + } + assert_eq!(client.chain_tip, old_tip); + } + + #[tokio::test] + async fn poll_from_chain_with_better_tip_and_without_some_new_blocks() { + let mut chain = Blockchain::default().with_height(3).without_blocks(3..); + let new_tip = chain.tip(); + let old_tip = chain.at_height(1); + + let poller = poll::ChainPoller::new(&mut chain, Network::Testnet); + let cache = UnboundedCache::new(); + let mut client = SpvClient::new(old_tip, poller, cache, NullChainListener {}); + match client.poll_best_tip().await { + Err(e) => panic!("Unexpected error: {:?}", e), + Ok((chain_tip, blocks_connected)) => { + assert_eq!(chain_tip, ChainTip::Better(new_tip)); + assert!(blocks_connected); + }, + } + assert_eq!(client.chain_tip, chain.at_height(2)); + } + + #[tokio::test] + async fn poll_from_chain_with_worse_tip() { + let mut chain = Blockchain::default().with_height(3); + let best_tip = chain.tip(); + chain.disconnect_tip(); + let worse_tip = chain.tip(); + + let poller = poll::ChainPoller::new(&mut chain, Network::Testnet); + let cache = UnboundedCache::new(); + let mut client = SpvClient::new(best_tip, poller, cache, NullChainListener {}); + match client.poll_best_tip().await { + Err(e) => panic!("Unexpected error: {:?}", e), + Ok((chain_tip, blocks_connected)) => { + assert_eq!(chain_tip, ChainTip::Worse(worse_tip)); + assert!(!blocks_connected); + }, + } + assert_eq!(client.chain_tip, best_tip); + } +} + #[cfg(test)] mod chain_notifier_tests { use crate::test_utils::{Blockchain, MockChainListener}; diff --git a/lightning-block-sync/src/test_utils.rs b/lightning-block-sync/src/test_utils.rs index 70a8982a0ef..807a33a69de 100644 --- a/lightning-block-sync/src/test_utils.rs +++ b/lightning-block-sync/src/test_utils.rs @@ -160,6 +160,13 @@ impl BlockSource for Blockchain { } } +pub struct NullChainListener; + +impl ChainListener for NullChainListener { + fn block_connected(&mut self, _block: &Block, _height: u32) {} + fn block_disconnected(&mut self, _header: &BlockHeader, _height: u32) {} +} + pub struct MockChainListener { expected_blocks_connected: VecDeque, expected_blocks_disconnected: VecDeque, From 8bfdfdc9e4b40390e825eaf7539170b9373b1d31 Mon Sep 17 00:00:00 2001 From: Jeffrey Czyz Date: Thu, 4 Feb 2021 19:20:03 -0800 Subject: [PATCH 5/5] Utility for syncing a set of chain listeners Add a utility for syncing a set of chain listeners to a common chain tip. Required to use before creating an SpvClient when the chain listener used with the client is actually a set of listeners each of which may have had left off at a different block. This would occur when the listeners had been persisted individually at different frequencies (e.g., a ChainMonitor's individual ChannelMonitors). --- lightning-block-sync/src/init.rs | 351 +++++++++++++++++++++++++ lightning-block-sync/src/lib.rs | 221 ++++++++++------ lightning-block-sync/src/test_utils.rs | 49 ++-- lightning/src/chain/chainmonitor.rs | 22 +- lightning/src/chain/channelmonitor.rs | 20 +- lightning/src/chain/mod.rs | 39 +++ lightning/src/ln/channelmanager.rs | 20 +- 7 files changed, 611 insertions(+), 111 deletions(-) create mode 100644 lightning-block-sync/src/init.rs diff --git a/lightning-block-sync/src/init.rs b/lightning-block-sync/src/init.rs new file mode 100644 index 00000000000..da9895ae721 --- /dev/null +++ b/lightning-block-sync/src/init.rs @@ -0,0 +1,351 @@ +use crate::{BlockSource, BlockSourceResult, Cache, ChainNotifier}; +use crate::poll::{ChainPoller, Validate, ValidatedBlockHeader}; + +use bitcoin::blockdata::block::{Block, BlockHeader}; +use bitcoin::hash_types::BlockHash; +use bitcoin::network::constants::Network; + +use lightning::chain; + +/// Performs a one-time sync of chain listeners using a single *trusted* block source, bringing each +/// listener's view of the chain from its paired block hash to `block_source`'s best chain tip. +/// +/// Upon success, the returned header can be used to initialize [`SpvClient`]. In the case of +/// failure, each listener may be left at a different block hash than the one it was originally +/// paired with. +/// +/// Useful during startup to bring the [`ChannelManager`] and each [`ChannelMonitor`] in sync before +/// switching to [`SpvClient`]. For example: +/// +/// ``` +/// use bitcoin::hash_types::BlockHash; +/// use bitcoin::network::constants::Network; +/// +/// use lightning::chain; +/// use lightning::chain::Watch; +/// use lightning::chain::chainmonitor::ChainMonitor; +/// use lightning::chain::channelmonitor; +/// use lightning::chain::channelmonitor::ChannelMonitor; +/// use lightning::chain::chaininterface::BroadcasterInterface; +/// use lightning::chain::chaininterface::FeeEstimator; +/// use lightning::chain::keysinterface; +/// use lightning::chain::keysinterface::KeysInterface; +/// use lightning::ln::channelmanager::ChannelManager; +/// use lightning::ln::channelmanager::ChannelManagerReadArgs; +/// use lightning::util::config::UserConfig; +/// use lightning::util::logger::Logger; +/// use lightning::util::ser::ReadableArgs; +/// +/// use lightning_block_sync::*; +/// +/// use std::cell::RefCell; +/// use std::io::Cursor; +/// +/// async fn init_sync< +/// B: BlockSource, +/// K: KeysInterface, +/// S: keysinterface::Sign, +/// T: BroadcasterInterface, +/// F: FeeEstimator, +/// L: Logger, +/// C: chain::Filter, +/// P: channelmonitor::Persist, +/// >( +/// block_source: &mut B, +/// chain_monitor: &ChainMonitor, +/// config: UserConfig, +/// keys_manager: &K, +/// tx_broadcaster: &T, +/// fee_estimator: &F, +/// logger: &L, +/// persister: &P, +/// ) { +/// // Read a serialized channel monitor paired with the block hash when it was persisted. +/// let serialized_monitor = "..."; +/// let (monitor_block_hash, mut monitor) = <(BlockHash, ChannelMonitor)>::read( +/// &mut Cursor::new(&serialized_monitor), keys_manager).unwrap(); +/// +/// // Read the channel manager paired with the block hash when it was persisted. +/// let serialized_manager = "..."; +/// let (manager_block_hash, mut manager) = { +/// let read_args = ChannelManagerReadArgs::new( +/// keys_manager, +/// fee_estimator, +/// chain_monitor, +/// tx_broadcaster, +/// logger, +/// config, +/// vec![&mut monitor], +/// ); +/// <(BlockHash, ChannelManager, &T, &K, &F, &L>)>::read( +/// &mut Cursor::new(&serialized_manager), read_args).unwrap() +/// }; +/// +/// // Synchronize any channel monitors and the channel manager to be on the best block. +/// let mut cache = UnboundedCache::new(); +/// let mut monitor_listener = (RefCell::new(monitor), &*tx_broadcaster, &*fee_estimator, &*logger); +/// let listeners = vec![ +/// (monitor_block_hash, &mut monitor_listener as &mut dyn chain::Listen), +/// (manager_block_hash, &mut manager as &mut dyn chain::Listen), +/// ]; +/// let chain_tip = init::synchronize_listeners( +/// block_source, Network::Bitcoin, &mut cache, listeners).await.unwrap(); +/// +/// // Allow the chain monitor to watch any channels. +/// let monitor = monitor_listener.0.into_inner(); +/// chain_monitor.watch_channel(monitor.get_funding_txo().0, monitor); +/// +/// // Create an SPV client to notify the chain monitor and channel manager of block events. +/// let chain_poller = poll::ChainPoller::new(block_source, Network::Bitcoin); +/// let mut chain_listener = (chain_monitor, &manager); +/// let spv_client = SpvClient::new(chain_tip, chain_poller, &mut cache, &chain_listener); +/// } +/// ``` +/// +/// [`SpvClient`]: ../struct.SpvClient.html +/// [`ChannelManager`]: ../../lightning/ln/channelmanager/struct.ChannelManager.html +/// [`ChannelMonitor`]: ../../lightning/chain/channelmonitor/struct.ChannelMonitor.html +pub async fn synchronize_listeners( + block_source: &mut B, + network: Network, + header_cache: &mut C, + mut chain_listeners: Vec<(BlockHash, &mut dyn chain::Listen)>, +) -> BlockSourceResult { + let (best_block_hash, best_block_height) = block_source.get_best_block().await?; + let best_header = block_source + .get_header(&best_block_hash, best_block_height).await? + .validate(best_block_hash)?; + + // Fetch the header for the block hash paired with each listener. + let mut chain_listeners_with_old_headers = Vec::new(); + for (old_block_hash, chain_listener) in chain_listeners.drain(..) { + let old_header = match header_cache.look_up(&old_block_hash) { + Some(header) => *header, + None => block_source + .get_header(&old_block_hash, None).await? + .validate(old_block_hash)? + }; + chain_listeners_with_old_headers.push((old_header, chain_listener)) + } + + // Find differences and disconnect blocks for each listener individually. + let mut chain_poller = ChainPoller::new(block_source, network); + let mut chain_listeners_at_height = Vec::new(); + let mut most_common_ancestor = None; + let mut most_connected_blocks = Vec::new(); + for (old_header, chain_listener) in chain_listeners_with_old_headers.drain(..) { + // Disconnect any stale blocks, but keep them in the cache for the next iteration. + let header_cache = &mut ReadOnlyCache(header_cache); + let (common_ancestor, connected_blocks) = { + let chain_listener = &DynamicChainListener(chain_listener); + let mut chain_notifier = ChainNotifier { header_cache, chain_listener }; + let difference = + chain_notifier.find_difference(best_header, &old_header, &mut chain_poller).await?; + chain_notifier.disconnect_blocks(difference.disconnected_blocks); + (difference.common_ancestor, difference.connected_blocks) + }; + + // Keep track of the most common ancestor and all blocks connected across all listeners. + chain_listeners_at_height.push((common_ancestor.height, chain_listener)); + if connected_blocks.len() > most_connected_blocks.len() { + most_common_ancestor = Some(common_ancestor); + most_connected_blocks = connected_blocks; + } + } + + // Connect new blocks for all listeners at once to avoid re-fetching blocks. + if let Some(common_ancestor) = most_common_ancestor { + let chain_listener = &ChainListenerSet(chain_listeners_at_height); + let mut chain_notifier = ChainNotifier { header_cache, chain_listener }; + chain_notifier.connect_blocks(common_ancestor, most_connected_blocks, &mut chain_poller) + .await.or_else(|(e, _)| Err(e))?; + } + + Ok(best_header) +} + +/// A wrapper to make a cache read-only. +/// +/// Used to prevent losing headers that may be needed to disconnect blocks common to more than one +/// listener. +struct ReadOnlyCache<'a, C: Cache>(&'a mut C); + +impl<'a, C: Cache> Cache for ReadOnlyCache<'a, C> { + fn look_up(&self, block_hash: &BlockHash) -> Option<&ValidatedBlockHeader> { + self.0.look_up(block_hash) + } + + fn block_connected(&mut self, _block_hash: BlockHash, _block_header: ValidatedBlockHeader) { + unreachable!() + } + + fn block_disconnected(&mut self, _block_hash: &BlockHash) -> Option { + None + } +} + +/// Wrapper for supporting dynamically sized chain listeners. +struct DynamicChainListener<'a>(&'a mut dyn chain::Listen); + +impl<'a> chain::Listen for DynamicChainListener<'a> { + fn block_connected(&self, _block: &Block, _height: u32) { + unreachable!() + } + + fn block_disconnected(&self, header: &BlockHeader, height: u32) { + self.0.block_disconnected(header, height) + } +} + +/// A set of dynamically sized chain listeners, each paired with a starting block height. +struct ChainListenerSet<'a>(Vec<(u32, &'a mut dyn chain::Listen)>); + +impl<'a> chain::Listen for ChainListenerSet<'a> { + fn block_connected(&self, block: &Block, height: u32) { + for (starting_height, chain_listener) in self.0.iter() { + if height > *starting_height { + chain_listener.block_connected(block, height); + } + } + } + + fn block_disconnected(&self, _header: &BlockHeader, _height: u32) { + unreachable!() + } +} + +#[cfg(test)] +mod tests { + use crate::test_utils::{Blockchain, MockChainListener}; + use super::*; + + use bitcoin::network::constants::Network; + + #[tokio::test] + async fn sync_from_same_chain() { + let mut chain = Blockchain::default().with_height(4); + + let mut listener_1 = MockChainListener::new() + .expect_block_connected(*chain.at_height(2)) + .expect_block_connected(*chain.at_height(3)) + .expect_block_connected(*chain.at_height(4)); + let mut listener_2 = MockChainListener::new() + .expect_block_connected(*chain.at_height(3)) + .expect_block_connected(*chain.at_height(4)); + let mut listener_3 = MockChainListener::new() + .expect_block_connected(*chain.at_height(4)); + + let listeners = vec![ + (chain.at_height(1).block_hash, &mut listener_1 as &mut dyn chain::Listen), + (chain.at_height(2).block_hash, &mut listener_2 as &mut dyn chain::Listen), + (chain.at_height(3).block_hash, &mut listener_3 as &mut dyn chain::Listen), + ]; + let mut cache = chain.header_cache(0..=4); + match synchronize_listeners(&mut chain, Network::Bitcoin, &mut cache, listeners).await { + Ok(header) => assert_eq!(header, chain.tip()), + Err(e) => panic!("Unexpected error: {:?}", e), + } + } + + #[tokio::test] + async fn sync_from_different_chains() { + let mut main_chain = Blockchain::default().with_height(4); + let fork_chain_1 = main_chain.fork_at_height(1); + let fork_chain_2 = main_chain.fork_at_height(2); + let fork_chain_3 = main_chain.fork_at_height(3); + + let mut listener_1 = MockChainListener::new() + .expect_block_disconnected(*fork_chain_1.at_height(4)) + .expect_block_disconnected(*fork_chain_1.at_height(3)) + .expect_block_disconnected(*fork_chain_1.at_height(2)) + .expect_block_connected(*main_chain.at_height(2)) + .expect_block_connected(*main_chain.at_height(3)) + .expect_block_connected(*main_chain.at_height(4)); + let mut listener_2 = MockChainListener::new() + .expect_block_disconnected(*fork_chain_2.at_height(4)) + .expect_block_disconnected(*fork_chain_2.at_height(3)) + .expect_block_connected(*main_chain.at_height(3)) + .expect_block_connected(*main_chain.at_height(4)); + let mut listener_3 = MockChainListener::new() + .expect_block_disconnected(*fork_chain_3.at_height(4)) + .expect_block_connected(*main_chain.at_height(4)); + + let listeners = vec![ + (fork_chain_1.tip().block_hash, &mut listener_1 as &mut dyn chain::Listen), + (fork_chain_2.tip().block_hash, &mut listener_2 as &mut dyn chain::Listen), + (fork_chain_3.tip().block_hash, &mut listener_3 as &mut dyn chain::Listen), + ]; + let mut cache = fork_chain_1.header_cache(2..=4); + cache.extend(fork_chain_2.header_cache(3..=4)); + cache.extend(fork_chain_3.header_cache(4..=4)); + match synchronize_listeners(&mut main_chain, Network::Bitcoin, &mut cache, listeners).await { + Ok(header) => assert_eq!(header, main_chain.tip()), + Err(e) => panic!("Unexpected error: {:?}", e), + } + } + + #[tokio::test] + async fn sync_from_overlapping_chains() { + let mut main_chain = Blockchain::default().with_height(4); + let fork_chain_1 = main_chain.fork_at_height(1); + let fork_chain_2 = fork_chain_1.fork_at_height(2); + let fork_chain_3 = fork_chain_2.fork_at_height(3); + + let mut listener_1 = MockChainListener::new() + .expect_block_disconnected(*fork_chain_1.at_height(4)) + .expect_block_disconnected(*fork_chain_1.at_height(3)) + .expect_block_disconnected(*fork_chain_1.at_height(2)) + .expect_block_connected(*main_chain.at_height(2)) + .expect_block_connected(*main_chain.at_height(3)) + .expect_block_connected(*main_chain.at_height(4)); + let mut listener_2 = MockChainListener::new() + .expect_block_disconnected(*fork_chain_2.at_height(4)) + .expect_block_disconnected(*fork_chain_2.at_height(3)) + .expect_block_disconnected(*fork_chain_2.at_height(2)) + .expect_block_connected(*main_chain.at_height(2)) + .expect_block_connected(*main_chain.at_height(3)) + .expect_block_connected(*main_chain.at_height(4)); + let mut listener_3 = MockChainListener::new() + .expect_block_disconnected(*fork_chain_3.at_height(4)) + .expect_block_disconnected(*fork_chain_3.at_height(3)) + .expect_block_disconnected(*fork_chain_3.at_height(2)) + .expect_block_connected(*main_chain.at_height(2)) + .expect_block_connected(*main_chain.at_height(3)) + .expect_block_connected(*main_chain.at_height(4)); + + let listeners = vec![ + (fork_chain_1.tip().block_hash, &mut listener_1 as &mut dyn chain::Listen), + (fork_chain_2.tip().block_hash, &mut listener_2 as &mut dyn chain::Listen), + (fork_chain_3.tip().block_hash, &mut listener_3 as &mut dyn chain::Listen), + ]; + let mut cache = fork_chain_1.header_cache(2..=4); + cache.extend(fork_chain_2.header_cache(3..=4)); + cache.extend(fork_chain_3.header_cache(4..=4)); + match synchronize_listeners(&mut main_chain, Network::Bitcoin, &mut cache, listeners).await { + Ok(header) => assert_eq!(header, main_chain.tip()), + Err(e) => panic!("Unexpected error: {:?}", e), + } + } + + #[tokio::test] + async fn cache_connected_and_keep_disconnected_blocks() { + let mut main_chain = Blockchain::default().with_height(2); + let fork_chain = main_chain.fork_at_height(1); + let new_tip = main_chain.tip(); + let old_tip = fork_chain.tip(); + + let mut listener = MockChainListener::new() + .expect_block_disconnected(*old_tip) + .expect_block_connected(*new_tip); + + let listeners = vec![(old_tip.block_hash, &mut listener as &mut dyn chain::Listen)]; + let mut cache = fork_chain.header_cache(2..=2); + match synchronize_listeners(&mut main_chain, Network::Bitcoin, &mut cache, listeners).await { + Ok(_) => { + assert!(cache.contains_key(&new_tip.block_hash)); + assert!(cache.contains_key(&old_tip.block_hash)); + }, + Err(e) => panic!("Unexpected error: {:?}", e), + } + } +} diff --git a/lightning-block-sync/src/lib.rs b/lightning-block-sync/src/lib.rs index 5ed8689fb96..db536fe7d43 100644 --- a/lightning-block-sync/src/lib.rs +++ b/lightning-block-sync/src/lib.rs @@ -19,6 +19,7 @@ #[cfg(any(feature = "rest-client", feature = "rpc-client"))] pub mod http; +pub mod init; pub mod poll; #[cfg(feature = "rest-client")] @@ -42,7 +43,11 @@ use bitcoin::blockdata::block::{Block, BlockHeader}; use bitcoin::hash_types::BlockHash; use bitcoin::util::uint::Uint256; +use lightning::chain; +use lightning::chain::Listen; + use std::future::Future; +use std::ops::Deref; use std::pin::Pin; /// Abstract type for retrieving block headers and data. @@ -154,22 +159,11 @@ pub struct BlockHeaderData { /// custom cache eviction policy. This offers flexibility to those sensitive to resource usage. /// Hence, there is a trade-off between a lower memory footprint and potentially increased network /// I/O as headers are re-fetched during fork detection. -pub struct SpvClient { +pub struct SpvClient<'a, P: Poll, C: Cache, L: Deref> +where L::Target: chain::Listen { chain_tip: ValidatedBlockHeader, chain_poller: P, - chain_notifier: ChainNotifier, - chain_listener: L, -} - -/// Adaptor used for notifying when blocks have been connected or disconnected from the chain. -/// -/// Used when needing to replay chain data upon startup or as new chain events occur. -pub trait ChainListener { - /// Notifies the listener that a block was added at the given height. - fn block_connected(&mut self, block: &Block, height: u32); - - /// Notifies the listener that a block was removed at the given height. - fn block_disconnected(&mut self, header: &BlockHeader, height: u32); + chain_notifier: ChainNotifier<'a, C, L>, } /// The `Cache` trait defines behavior for managing a block header cache, where block headers are @@ -214,7 +208,7 @@ impl Cache for UnboundedCache { } } -impl SpvClient { +impl<'a, P: Poll, C: Cache, L: Deref> SpvClient<'a, P, C, L> where L::Target: chain::Listen { /// Creates a new SPV client using `chain_tip` as the best known chain tip. /// /// Subsequent calls to [`poll_best_tip`] will poll for the best chain tip using the given chain @@ -228,11 +222,11 @@ impl SpvClient { pub fn new( chain_tip: ValidatedBlockHeader, chain_poller: P, - header_cache: C, + header_cache: &'a mut C, chain_listener: L, ) -> Self { - let chain_notifier = ChainNotifier { header_cache }; - Self { chain_tip, chain_poller, chain_notifier, chain_listener } + let chain_notifier = ChainNotifier { header_cache, chain_listener }; + Self { chain_tip, chain_poller, chain_notifier } } /// Polls for the best tip and updates the chain listener with any connected or disconnected @@ -262,7 +256,7 @@ impl SpvClient { /// blocks. Returns whether there were any such blocks. async fn update_chain_tip(&mut self, best_chain_tip: ValidatedBlockHeader) -> bool { match self.chain_notifier.synchronize_listener( - best_chain_tip, &self.chain_tip, &mut self.chain_poller, &mut self.chain_listener).await + best_chain_tip, &self.chain_tip, &mut self.chain_poller).await { Ok(_) => { self.chain_tip = best_chain_tip; @@ -279,10 +273,13 @@ impl SpvClient { /// Notifies [listeners] of blocks that have been connected or disconnected from the chain. /// -/// [listeners]: trait.ChainListener.html -struct ChainNotifier { +/// [listeners]: ../../lightning/chain/trait.Listen.html +pub struct ChainNotifier<'a, C: Cache, L: Deref> where L::Target: chain::Listen { /// Cache for looking up headers before fetching from a block source. - header_cache: C, + header_cache: &'a mut C, + + /// Listener that will be notified of connected or disconnected blocks. + chain_listener: L, } /// Changes made to the chain between subsequent polls that transformed it from having one chain tip @@ -291,6 +288,11 @@ struct ChainNotifier { /// Blocks are given in height-descending order. Therefore, blocks are first disconnected in order /// before new blocks are connected in reverse order. struct ChainDifference { + /// The most recent ancestor common between the chain tips. + /// + /// If there are any disconnected blocks, this is where the chain forked. + common_ancestor: ValidatedBlockHeader, + /// Blocks that were disconnected from the chain since the last poll. disconnected_blocks: Vec, @@ -298,45 +300,28 @@ struct ChainDifference { connected_blocks: Vec, } -impl ChainNotifier { - /// Finds the fork point between `new_header` and `old_header`, disconnecting blocks from - /// `old_header` to get to that point and then connecting blocks until `new_header`. +impl<'a, C: Cache, L: Deref> ChainNotifier<'a, C, L> where L::Target: chain::Listen { + /// Finds the first common ancestor between `new_header` and `old_header`, disconnecting blocks + /// from `old_header` to get to that point and then connecting blocks until `new_header`. /// /// Validates headers along the transition path, but doesn't fetch blocks until the chain is /// disconnected to the fork point. Thus, this may return an `Err` that includes where the tip /// ended up which may not be `new_header`. Note that the returned `Err` contains `Some` header /// if and only if the transition from `old_header` to `new_header` is valid. - async fn synchronize_listener( + async fn synchronize_listener( &mut self, new_header: ValidatedBlockHeader, old_header: &ValidatedBlockHeader, chain_poller: &mut P, - chain_listener: &mut L, ) -> Result<(), (BlockSourceError, Option)> { - let mut difference = self.find_difference(new_header, old_header, chain_poller).await + let difference = self.find_difference(new_header, old_header, chain_poller).await .map_err(|e| (e, None))?; - - let mut new_tip = *old_header; - for header in difference.disconnected_blocks.drain(..) { - if let Some(cached_header) = self.header_cache.block_disconnected(&header.block_hash) { - assert_eq!(cached_header, header); - } - chain_listener.block_disconnected(&header.header, header.height); - new_tip = header; - } - - for header in difference.connected_blocks.drain(..).rev() { - let block = chain_poller - .fetch_block(&header).await - .or_else(|e| Err((e, Some(new_tip))))?; - debug_assert_eq!(block.block_hash, header.block_hash); - - self.header_cache.block_connected(header.block_hash, header); - chain_listener.block_connected(&block, header.height); - new_tip = header; - } - - Ok(()) + self.disconnect_blocks(difference.disconnected_blocks); + self.connect_blocks( + difference.common_ancestor, + difference.connected_blocks, + chain_poller, + ).await } /// Returns the changes needed to produce the chain with `current_header` as its tip from the @@ -373,7 +358,8 @@ impl ChainNotifier { } } - Ok(ChainDifference { disconnected_blocks, connected_blocks }) + let common_ancestor = current; + Ok(ChainDifference { common_ancestor, disconnected_blocks, connected_blocks }) } /// Returns the previous header for the given header, either by looking it up in the cache or @@ -388,6 +374,37 @@ impl ChainNotifier { None => chain_poller.look_up_previous_header(header).await, } } + + /// Notifies the chain listeners of disconnected blocks. + fn disconnect_blocks(&mut self, mut disconnected_blocks: Vec) { + for header in disconnected_blocks.drain(..) { + if let Some(cached_header) = self.header_cache.block_disconnected(&header.block_hash) { + assert_eq!(cached_header, header); + } + self.chain_listener.block_disconnected(&header.header, header.height); + } + } + + /// Notifies the chain listeners of connected blocks. + async fn connect_blocks( + &mut self, + mut new_tip: ValidatedBlockHeader, + mut connected_blocks: Vec, + chain_poller: &mut P, + ) -> Result<(), (BlockSourceError, Option)> { + for header in connected_blocks.drain(..).rev() { + let block = chain_poller + .fetch_block(&header).await + .or_else(|e| Err((e, Some(new_tip))))?; + debug_assert_eq!(block.block_hash, header.block_hash); + + self.header_cache.block_connected(header.block_hash, header); + self.chain_listener.block_connected(&block, header.height); + new_tip = header; + } + + Ok(()) + } } #[cfg(test)] @@ -403,8 +420,9 @@ mod spv_client_tests { let best_tip = chain.at_height(1); let poller = poll::ChainPoller::new(&mut chain, Network::Testnet); - let cache = UnboundedCache::new(); - let mut client = SpvClient::new(best_tip, poller, cache, NullChainListener {}); + let mut cache = UnboundedCache::new(); + let mut listener = NullChainListener {}; + let mut client = SpvClient::new(best_tip, poller, &mut cache, &mut listener); match client.poll_best_tip().await { Err(e) => { assert_eq!(e.kind(), BlockSourceErrorKind::Persistent); @@ -421,8 +439,9 @@ mod spv_client_tests { let common_tip = chain.tip(); let poller = poll::ChainPoller::new(&mut chain, Network::Testnet); - let cache = UnboundedCache::new(); - let mut client = SpvClient::new(common_tip, poller, cache, NullChainListener {}); + let mut cache = UnboundedCache::new(); + let mut listener = NullChainListener {}; + let mut client = SpvClient::new(common_tip, poller, &mut cache, &mut listener); match client.poll_best_tip().await { Err(e) => panic!("Unexpected error: {:?}", e), Ok((chain_tip, blocks_connected)) => { @@ -440,8 +459,9 @@ mod spv_client_tests { let old_tip = chain.at_height(1); let poller = poll::ChainPoller::new(&mut chain, Network::Testnet); - let cache = UnboundedCache::new(); - let mut client = SpvClient::new(old_tip, poller, cache, NullChainListener {}); + let mut cache = UnboundedCache::new(); + let mut listener = NullChainListener {}; + let mut client = SpvClient::new(old_tip, poller, &mut cache, &mut listener); match client.poll_best_tip().await { Err(e) => panic!("Unexpected error: {:?}", e), Ok((chain_tip, blocks_connected)) => { @@ -459,8 +479,9 @@ mod spv_client_tests { let old_tip = chain.at_height(1); let poller = poll::ChainPoller::new(&mut chain, Network::Testnet); - let cache = UnboundedCache::new(); - let mut client = SpvClient::new(old_tip, poller, cache, NullChainListener {}); + let mut cache = UnboundedCache::new(); + let mut listener = NullChainListener {}; + let mut client = SpvClient::new(old_tip, poller, &mut cache, &mut listener); match client.poll_best_tip().await { Err(e) => panic!("Unexpected error: {:?}", e), Ok((chain_tip, blocks_connected)) => { @@ -478,8 +499,9 @@ mod spv_client_tests { let old_tip = chain.at_height(1); let poller = poll::ChainPoller::new(&mut chain, Network::Testnet); - let cache = UnboundedCache::new(); - let mut client = SpvClient::new(old_tip, poller, cache, NullChainListener {}); + let mut cache = UnboundedCache::new(); + let mut listener = NullChainListener {}; + let mut client = SpvClient::new(old_tip, poller, &mut cache, &mut listener); match client.poll_best_tip().await { Err(e) => panic!("Unexpected error: {:?}", e), Ok((chain_tip, blocks_connected)) => { @@ -498,8 +520,9 @@ mod spv_client_tests { let worse_tip = chain.tip(); let poller = poll::ChainPoller::new(&mut chain, Network::Testnet); - let cache = UnboundedCache::new(); - let mut client = SpvClient::new(best_tip, poller, cache, NullChainListener {}); + let mut cache = UnboundedCache::new(); + let mut listener = NullChainListener {}; + let mut client = SpvClient::new(best_tip, poller, &mut cache, &mut listener); match client.poll_best_tip().await { Err(e) => panic!("Unexpected error: {:?}", e), Ok((chain_tip, blocks_connected)) => { @@ -524,12 +547,15 @@ mod chain_notifier_tests { let new_tip = chain.tip(); let old_tip = chain.at_height(1); - let mut listener = MockChainListener::new() + let chain_listener = &MockChainListener::new() .expect_block_connected(*chain.at_height(2)) .expect_block_connected(*new_tip); - let mut notifier = ChainNotifier { header_cache: chain.header_cache(0..=1) }; + let mut notifier = ChainNotifier { + header_cache: &mut chain.header_cache(0..=1), + chain_listener, + }; let mut poller = poll::ChainPoller::new(&mut chain, Network::Testnet); - match notifier.synchronize_listener(new_tip, &old_tip, &mut poller, &mut listener).await { + match notifier.synchronize_listener(new_tip, &old_tip, &mut poller).await { Err((e, _)) => panic!("Unexpected error: {:?}", e), Ok(_) => {}, } @@ -542,10 +568,13 @@ mod chain_notifier_tests { let new_tip = test_chain.tip(); let old_tip = main_chain.tip(); - let mut listener = MockChainListener::new(); - let mut notifier = ChainNotifier { header_cache: main_chain.header_cache(0..=1) }; + let chain_listener = &MockChainListener::new(); + let mut notifier = ChainNotifier { + header_cache: &mut main_chain.header_cache(0..=1), + chain_listener, + }; let mut poller = poll::ChainPoller::new(&mut test_chain, Network::Testnet); - match notifier.synchronize_listener(new_tip, &old_tip, &mut poller, &mut listener).await { + match notifier.synchronize_listener(new_tip, &old_tip, &mut poller).await { Err((e, _)) => { assert_eq!(e.kind(), BlockSourceErrorKind::Persistent); assert_eq!(e.into_inner().as_ref().to_string(), "genesis block reached"); @@ -561,12 +590,15 @@ mod chain_notifier_tests { let new_tip = fork_chain.tip(); let old_tip = main_chain.tip(); - let mut listener = MockChainListener::new() + let chain_listener = &MockChainListener::new() .expect_block_disconnected(*old_tip) .expect_block_connected(*new_tip); - let mut notifier = ChainNotifier { header_cache: main_chain.header_cache(0..=2) }; + let mut notifier = ChainNotifier { + header_cache: &mut main_chain.header_cache(0..=2), + chain_listener, + }; let mut poller = poll::ChainPoller::new(&mut fork_chain, Network::Testnet); - match notifier.synchronize_listener(new_tip, &old_tip, &mut poller, &mut listener).await { + match notifier.synchronize_listener(new_tip, &old_tip, &mut poller).await { Err((e, _)) => panic!("Unexpected error: {:?}", e), Ok(_) => {}, } @@ -580,13 +612,16 @@ mod chain_notifier_tests { let new_tip = fork_chain.tip(); let old_tip = main_chain.tip(); - let mut listener = MockChainListener::new() + let chain_listener = &MockChainListener::new() .expect_block_disconnected(*old_tip) .expect_block_disconnected(*main_chain.at_height(2)) .expect_block_connected(*new_tip); - let mut notifier = ChainNotifier { header_cache: main_chain.header_cache(0..=3) }; + let mut notifier = ChainNotifier { + header_cache: &mut main_chain.header_cache(0..=3), + chain_listener, + }; let mut poller = poll::ChainPoller::new(&mut fork_chain, Network::Testnet); - match notifier.synchronize_listener(new_tip, &old_tip, &mut poller, &mut listener).await { + match notifier.synchronize_listener(new_tip, &old_tip, &mut poller).await { Err((e, _)) => panic!("Unexpected error: {:?}", e), Ok(_) => {}, } @@ -600,13 +635,16 @@ mod chain_notifier_tests { let new_tip = fork_chain.tip(); let old_tip = main_chain.tip(); - let mut listener = MockChainListener::new() + let chain_listener = &MockChainListener::new() .expect_block_disconnected(*old_tip) .expect_block_connected(*fork_chain.at_height(2)) .expect_block_connected(*new_tip); - let mut notifier = ChainNotifier { header_cache: main_chain.header_cache(0..=2) }; + let mut notifier = ChainNotifier { + header_cache: &mut main_chain.header_cache(0..=2), + chain_listener, + }; let mut poller = poll::ChainPoller::new(&mut fork_chain, Network::Testnet); - match notifier.synchronize_listener(new_tip, &old_tip, &mut poller, &mut listener).await { + match notifier.synchronize_listener(new_tip, &old_tip, &mut poller).await { Err((e, _)) => panic!("Unexpected error: {:?}", e), Ok(_) => {}, } @@ -618,10 +656,13 @@ mod chain_notifier_tests { let new_tip = chain.tip(); let old_tip = chain.at_height(1); - let mut listener = MockChainListener::new(); - let mut notifier = ChainNotifier { header_cache: chain.header_cache(0..=1) }; + let chain_listener = &MockChainListener::new(); + let mut notifier = ChainNotifier { + header_cache: &mut chain.header_cache(0..=1), + chain_listener, + }; let mut poller = poll::ChainPoller::new(&mut chain, Network::Testnet); - match notifier.synchronize_listener(new_tip, &old_tip, &mut poller, &mut listener).await { + match notifier.synchronize_listener(new_tip, &old_tip, &mut poller).await { Err((_, tip)) => assert_eq!(tip, None), Ok(_) => panic!("Expected error"), } @@ -633,10 +674,13 @@ mod chain_notifier_tests { let new_tip = chain.tip(); let old_tip = chain.at_height(1); - let mut listener = MockChainListener::new(); - let mut notifier = ChainNotifier { header_cache: chain.header_cache(0..=3) }; + let chain_listener = &MockChainListener::new(); + let mut notifier = ChainNotifier { + header_cache: &mut chain.header_cache(0..=3), + chain_listener, + }; let mut poller = poll::ChainPoller::new(&mut chain, Network::Testnet); - match notifier.synchronize_listener(new_tip, &old_tip, &mut poller, &mut listener).await { + match notifier.synchronize_listener(new_tip, &old_tip, &mut poller).await { Err((_, tip)) => assert_eq!(tip, Some(old_tip)), Ok(_) => panic!("Expected error"), } @@ -648,11 +692,14 @@ mod chain_notifier_tests { let new_tip = chain.tip(); let old_tip = chain.at_height(1); - let mut listener = MockChainListener::new() + let chain_listener = &MockChainListener::new() .expect_block_connected(*chain.at_height(2)); - let mut notifier = ChainNotifier { header_cache: chain.header_cache(0..=3) }; + let mut notifier = ChainNotifier { + header_cache: &mut chain.header_cache(0..=3), + chain_listener, + }; let mut poller = poll::ChainPoller::new(&mut chain, Network::Testnet); - match notifier.synchronize_listener(new_tip, &old_tip, &mut poller, &mut listener).await { + match notifier.synchronize_listener(new_tip, &old_tip, &mut poller).await { Err((_, tip)) => assert_eq!(tip, Some(chain.at_height(2))), Ok(_) => panic!("Expected error"), } diff --git a/lightning-block-sync/src/test_utils.rs b/lightning-block-sync/src/test_utils.rs index 807a33a69de..8c37c94d8ec 100644 --- a/lightning-block-sync/src/test_utils.rs +++ b/lightning-block-sync/src/test_utils.rs @@ -1,4 +1,4 @@ -use crate::{AsyncBlockSourceResult, BlockHeaderData, BlockSource, BlockSourceError, ChainListener, UnboundedCache}; +use crate::{AsyncBlockSourceResult, BlockHeaderData, BlockSource, BlockSourceError, UnboundedCache}; use crate::poll::{Validate, ValidatedBlockHeader}; use bitcoin::blockdata::block::{Block, BlockHeader}; @@ -7,6 +7,9 @@ use bitcoin::hash_types::BlockHash; use bitcoin::network::constants::Network; use bitcoin::util::uint::Uint256; +use lightning::chain; + +use std::cell::RefCell; use std::collections::VecDeque; #[derive(Default)] @@ -162,38 +165,38 @@ impl BlockSource for Blockchain { pub struct NullChainListener; -impl ChainListener for NullChainListener { - fn block_connected(&mut self, _block: &Block, _height: u32) {} - fn block_disconnected(&mut self, _header: &BlockHeader, _height: u32) {} +impl chain::Listen for NullChainListener { + fn block_connected(&self, _block: &Block, _height: u32) {} + fn block_disconnected(&self, _header: &BlockHeader, _height: u32) {} } pub struct MockChainListener { - expected_blocks_connected: VecDeque, - expected_blocks_disconnected: VecDeque, + expected_blocks_connected: RefCell>, + expected_blocks_disconnected: RefCell>, } impl MockChainListener { pub fn new() -> Self { Self { - expected_blocks_connected: VecDeque::new(), - expected_blocks_disconnected: VecDeque::new(), + expected_blocks_connected: RefCell::new(VecDeque::new()), + expected_blocks_disconnected: RefCell::new(VecDeque::new()), } } - pub fn expect_block_connected(mut self, block: BlockHeaderData) -> Self { - self.expected_blocks_connected.push_back(block); + pub fn expect_block_connected(self, block: BlockHeaderData) -> Self { + self.expected_blocks_connected.borrow_mut().push_back(block); self } - pub fn expect_block_disconnected(mut self, block: BlockHeaderData) -> Self { - self.expected_blocks_disconnected.push_back(block); + pub fn expect_block_disconnected(self, block: BlockHeaderData) -> Self { + self.expected_blocks_disconnected.borrow_mut().push_back(block); self } } -impl ChainListener for MockChainListener { - fn block_connected(&mut self, block: &Block, height: u32) { - match self.expected_blocks_connected.pop_front() { +impl chain::Listen for MockChainListener { + fn block_connected(&self, block: &Block, height: u32) { + match self.expected_blocks_connected.borrow_mut().pop_front() { None => { panic!("Unexpected block connected: {:?}", block.block_hash()); }, @@ -204,8 +207,8 @@ impl ChainListener for MockChainListener { } } - fn block_disconnected(&mut self, header: &BlockHeader, height: u32) { - match self.expected_blocks_disconnected.pop_front() { + fn block_disconnected(&self, header: &BlockHeader, height: u32) { + match self.expected_blocks_disconnected.borrow_mut().pop_front() { None => { panic!("Unexpected block disconnected: {:?}", header.block_hash()); }, @@ -222,11 +225,15 @@ impl Drop for MockChainListener { if std::thread::panicking() { return; } - if !self.expected_blocks_connected.is_empty() { - panic!("Expected blocks connected: {:?}", self.expected_blocks_connected); + + let expected_blocks_connected = self.expected_blocks_connected.borrow(); + if !expected_blocks_connected.is_empty() { + panic!("Expected blocks connected: {:?}", expected_blocks_connected); } - if !self.expected_blocks_disconnected.is_empty() { - panic!("Expected blocks disconnected: {:?}", self.expected_blocks_disconnected); + + let expected_blocks_disconnected = self.expected_blocks_disconnected.borrow(); + if !expected_blocks_disconnected.is_empty() { + panic!("Expected blocks disconnected: {:?}", expected_blocks_disconnected); } } } diff --git a/lightning/src/chain/chainmonitor.rs b/lightning/src/chain/chainmonitor.rs index 8483f5ca829..4cb9128d92f 100644 --- a/lightning/src/chain/chainmonitor.rs +++ b/lightning/src/chain/chainmonitor.rs @@ -29,7 +29,7 @@ //! [`ChannelMonitor`]: ../channelmonitor/struct.ChannelMonitor.html //! [`MonitorEvent`]: ../channelmonitor/enum.MonitorEvent.html -use bitcoin::blockdata::block::BlockHeader; +use bitcoin::blockdata::block::{Block, BlockHeader}; use chain; use chain::Filter; @@ -140,6 +140,26 @@ where C::Target: chain::Filter, } } +impl +chain::Listen for ChainMonitor +where + ChannelSigner: Sign, + C::Target: chain::Filter, + T::Target: BroadcasterInterface, + F::Target: FeeEstimator, + L::Target: Logger, + P::Target: channelmonitor::Persist, +{ + fn block_connected(&self, block: &Block, height: u32) { + let txdata: Vec<_> = block.txdata.iter().enumerate().collect(); + ChainMonitor::block_connected(self, &block.header, &txdata, height); + } + + fn block_disconnected(&self, header: &BlockHeader, height: u32) { + ChainMonitor::block_disconnected(self, header, height); + } +} + impl chain::Watch for ChainMonitor where C::Target: chain::Filter, diff --git a/lightning/src/chain/channelmonitor.rs b/lightning/src/chain/channelmonitor.rs index 0266f31e204..71490dd6e0e 100644 --- a/lightning/src/chain/channelmonitor.rs +++ b/lightning/src/chain/channelmonitor.rs @@ -22,7 +22,7 @@ //! //! [`chain::Watch`]: ../trait.Watch.html -use bitcoin::blockdata::block::BlockHeader; +use bitcoin::blockdata::block::{Block, BlockHeader}; use bitcoin::blockdata::transaction::{TxOut,Transaction}; use bitcoin::blockdata::transaction::OutPoint as BitcoinOutPoint; use bitcoin::blockdata::script::{Script, Builder}; @@ -41,6 +41,7 @@ use ln::chan_utils; use ln::chan_utils::{CounterpartyCommitmentSecrets, HTLCOutputInCommitment, HTLCType, ChannelTransactionParameters, HolderCommitmentTransaction}; use ln::channelmanager::{HTLCSource, PaymentPreimage, PaymentHash}; use ln::onchaintx::{OnchainTxHandler, InputDescriptors}; +use chain; use chain::chaininterface::{BroadcasterInterface, FeeEstimator}; use chain::transaction::{OutPoint, TransactionData}; use chain::keysinterface::{SpendableOutputDescriptor, StaticPaymentOutputDescriptor, DelayedPaymentOutputDescriptor, Sign, KeysInterface}; @@ -49,6 +50,7 @@ use util::ser::{Readable, ReadableArgs, MaybeReadable, Writer, Writeable, U48}; use util::byte_utils; use util::events::Event; +use std::cell::RefCell; use std::collections::{HashMap, HashSet, hash_map}; use std::{cmp, mem}; use std::ops::Deref; @@ -2297,6 +2299,22 @@ pub trait Persist: Send + Sync { fn update_persisted_channel(&self, id: OutPoint, update: &ChannelMonitorUpdate, data: &ChannelMonitor) -> Result<(), ChannelMonitorUpdateErr>; } +impl chain::Listen for (RefCell>, T, F, L) +where + T::Target: BroadcasterInterface, + F::Target: FeeEstimator, + L::Target: Logger, +{ + fn block_connected(&self, block: &Block, height: u32) { + let txdata: Vec<_> = block.txdata.iter().enumerate().collect(); + self.0.borrow_mut().block_connected(&block.header, &txdata, height, &*self.1, &*self.2, &*self.3); + } + + fn block_disconnected(&self, header: &BlockHeader, height: u32) { + self.0.borrow_mut().block_disconnected(header, height, &*self.1, &*self.2, &*self.3); + } +} + const MAX_ALLOC_SIZE: usize = 64*1024; impl<'a, Signer: Sign, K: KeysInterface> ReadableArgs<&'a K> diff --git a/lightning/src/chain/mod.rs b/lightning/src/chain/mod.rs index 1d51f262216..c1fceec8c47 100644 --- a/lightning/src/chain/mod.rs +++ b/lightning/src/chain/mod.rs @@ -9,6 +9,7 @@ //! Structs and traits which allow other parts of rust-lightning to interact with the blockchain. +use bitcoin::blockdata::block::{Block, BlockHeader}; use bitcoin::blockdata::script::Script; use bitcoin::blockdata::transaction::TxOut; use bitcoin::hash_types::{BlockHash, Txid}; @@ -46,6 +47,18 @@ pub trait Access: Send + Sync { fn get_utxo(&self, genesis_hash: &BlockHash, short_channel_id: u64) -> Result; } +/// The `Listen` trait is used to be notified of when blocks have been connected or disconnected +/// from the chain. +/// +/// Useful when needing to replay chain data upon startup or as new chain events occur. +pub trait Listen { + /// Notifies the listener that a block was added at the given height. + fn block_connected(&self, block: &Block, height: u32); + + /// Notifies the listener that a block was removed at the given height. + fn block_disconnected(&self, header: &BlockHeader, height: u32); +} + /// The `Watch` trait defines behavior for watching on-chain activity pertaining to channels as /// blocks are connected and disconnected. /// @@ -123,3 +136,29 @@ pub trait Filter: Send + Sync { /// `script_pubkey` as the spending condition. fn register_output(&self, outpoint: &OutPoint, script_pubkey: &Script); } + +impl Listen for std::ops::Deref { + fn block_connected(&self, block: &Block, height: u32) { + (**self).block_connected(block, height); + } + + fn block_disconnected(&self, header: &BlockHeader, height: u32) { + (**self).block_disconnected(header, height); + } +} + +impl Listen for (T, U) +where + T::Target: Listen, + U::Target: Listen, +{ + fn block_connected(&self, block: &Block, height: u32) { + self.0.block_connected(block, height); + self.1.block_connected(block, height); + } + + fn block_disconnected(&self, header: &BlockHeader, height: u32) { + self.0.block_disconnected(header, height); + self.1.block_disconnected(header, height); + } +} diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs index a759443d86e..8f7fbde5f9c 100644 --- a/lightning/src/ln/channelmanager.rs +++ b/lightning/src/ln/channelmanager.rs @@ -18,7 +18,7 @@ //! imply it needs to fail HTLCs/payments/channels it manages). //! -use bitcoin::blockdata::block::BlockHeader; +use bitcoin::blockdata::block::{Block, BlockHeader}; use bitcoin::blockdata::constants::genesis_block; use bitcoin::network::constants::Network; @@ -3139,6 +3139,24 @@ impl EventsProvi } } +impl chain::Listen for ChannelManager +where + M::Target: chain::Watch, + T::Target: BroadcasterInterface, + K::Target: KeysInterface, + F::Target: FeeEstimator, + L::Target: Logger, +{ + fn block_connected(&self, block: &Block, height: u32) { + let txdata: Vec<_> = block.txdata.iter().enumerate().collect(); + ChannelManager::block_connected(self, &block.header, &txdata, height); + } + + fn block_disconnected(&self, header: &BlockHeader, _height: u32) { + ChannelManager::block_disconnected(self, header); + } +} + impl ChannelManager where M::Target: chain::Watch, T::Target: BroadcasterInterface,