@@ -31,6 +31,8 @@ mod test_utils;
3131#[ cfg( any( feature = "rest-client" , feature = "rpc-client" ) ) ]
3232mod utils;
3333
34+ use crate :: poll:: { Poll , ValidatedBlockHeader } ;
35+
3436use bitcoin:: blockdata:: block:: { Block , BlockHeader } ;
3537use bitcoin:: hash_types:: BlockHash ;
3638use bitcoin:: util:: uint:: Uint256 ;
@@ -130,3 +132,317 @@ pub struct BlockHeaderData {
130132 /// of equivalent weight.
131133 pub chainwork : Uint256 ,
132134}
135+
136+ /// Adaptor used for notifying when blocks have been connected or disconnected from the chain.
137+ ///
138+ /// Used when needing to replay chain data upon startup or as new chain events occur.
139+ pub trait ChainListener {
140+ /// Notifies the listener that a block was added at the given height.
141+ fn block_connected ( & mut self , block : & Block , height : u32 ) ;
142+
143+ /// Notifies the listener that a block was removed at the given height.
144+ fn block_disconnected ( & mut self , header : & BlockHeader , height : u32 ) ;
145+ }
146+
147+ /// The `Cache` trait defines behavior for managing a block header cache, where block headers are
148+ /// keyed by block hash.
149+ ///
150+ /// Used by [`ChainNotifier`] to store headers along the best chain, which is important for ensuring
151+ /// that blocks can be disconnected if they are no longer accessible from a block source (e.g., if
152+ /// the block source does not store stale forks indefinitely).
153+ ///
154+ /// Implementations may define how long to retain headers such that it's unlikely they will ever be
155+ /// needed to disconnect a block. In cases where block sources provide access to headers on stale
156+ /// forks reliably, caches may be entirely unnecessary.
157+ ///
158+ /// [`ChainNotifier`]: struct.ChainNotifier.html
159+ pub trait Cache {
160+ /// Retrieves the block header keyed by the given block hash.
161+ fn look_up ( & self , block_hash : & BlockHash ) -> Option < & ValidatedBlockHeader > ;
162+
163+ /// Called when a block has been connected to the best chain to ensure it is available to be
164+ /// disconnected later if needed.
165+ fn block_connected ( & mut self , block_hash : BlockHash , block_header : ValidatedBlockHeader ) ;
166+
167+ /// Called when a block has been disconnected from the best chain. Once disconnected, a block's
168+ /// header is no longer needed and thus can be removed.
169+ fn block_disconnected ( & mut self , block_hash : & BlockHash ) -> Option < ValidatedBlockHeader > ;
170+ }
171+
172+ /// Unbounded cache of block headers keyed by block hash.
173+ pub type UnboundedCache = std:: collections:: HashMap < BlockHash , ValidatedBlockHeader > ;
174+
175+ impl Cache for UnboundedCache {
176+ fn look_up ( & self , block_hash : & BlockHash ) -> Option < & ValidatedBlockHeader > {
177+ self . get ( block_hash)
178+ }
179+
180+ fn block_connected ( & mut self , block_hash : BlockHash , block_header : ValidatedBlockHeader ) {
181+ self . insert ( block_hash, block_header) ;
182+ }
183+
184+ fn block_disconnected ( & mut self , block_hash : & BlockHash ) -> Option < ValidatedBlockHeader > {
185+ self . remove ( block_hash)
186+ }
187+ }
188+
189+ /// Notifies [listeners] of blocks that have been connected or disconnected from the chain.
190+ ///
191+ /// [listeners]: trait.ChainListener.html
192+ struct ChainNotifier < C : Cache > {
193+ /// Cache for looking up headers before fetching from a block source.
194+ header_cache : C ,
195+ }
196+
197+ /// Changes made to the chain between subsequent polls that transformed it from having one chain tip
198+ /// to another.
199+ ///
200+ /// Blocks are given in height-descending order. Therefore, blocks are first disconnected in order
201+ /// before new blocks are connected in reverse order.
202+ struct ChainDifference {
203+ /// Blocks that were disconnected from the chain since the last poll.
204+ disconnected_blocks : Vec < ValidatedBlockHeader > ,
205+
206+ /// Blocks that were connected to the chain since the last poll.
207+ connected_blocks : Vec < ValidatedBlockHeader > ,
208+ }
209+
210+ impl < C : Cache > ChainNotifier < C > {
211+ /// Finds the fork point between `new_header` and `old_header`, disconnecting blocks from
212+ /// `old_header` to get to that point and then connecting blocks until `new_header`.
213+ ///
214+ /// Validates headers along the transition path, but doesn't fetch blocks until the chain is
215+ /// disconnected to the fork point. Thus, this may return an `Err` that includes where the tip
216+ /// ended up which may not be `new_header`. Note that the returned `Err` contains `Some` header
217+ /// if and only if the transition from `old_header` to `new_header` is valid.
218+ async fn synchronize_listener < L : ChainListener , P : Poll > (
219+ & mut self ,
220+ new_header : ValidatedBlockHeader ,
221+ old_header : & ValidatedBlockHeader ,
222+ chain_poller : & mut P ,
223+ chain_listener : & mut L ,
224+ ) -> Result < ( ) , ( BlockSourceError , Option < ValidatedBlockHeader > ) > {
225+ let mut difference = self . find_difference ( new_header, old_header, chain_poller) . await
226+ . map_err ( |e| ( e, None ) ) ?;
227+
228+ let mut new_tip = * old_header;
229+ for header in difference. disconnected_blocks . drain ( ..) {
230+ if let Some ( cached_header) = self . header_cache . block_disconnected ( & header. block_hash ) {
231+ assert_eq ! ( cached_header, header) ;
232+ }
233+ chain_listener. block_disconnected ( & header. header , header. height ) ;
234+ new_tip = header;
235+ }
236+
237+ for header in difference. connected_blocks . drain ( ..) . rev ( ) {
238+ let block = chain_poller
239+ . fetch_block ( & header) . await
240+ . or_else ( |e| Err ( ( e, Some ( new_tip) ) ) ) ?;
241+ debug_assert_eq ! ( block. block_hash, header. block_hash) ;
242+
243+ self . header_cache . block_connected ( header. block_hash , header) ;
244+ chain_listener. block_connected ( & block, header. height ) ;
245+ new_tip = header;
246+ }
247+
248+ Ok ( ( ) )
249+ }
250+
251+ /// Returns the changes needed to produce the chain with `current_header` as its tip from the
252+ /// chain with `prev_header` as its tip.
253+ ///
254+ /// Walks backwards from `current_header` and `prev_header`, finding the common ancestor.
255+ async fn find_difference < P : Poll > (
256+ & self ,
257+ current_header : ValidatedBlockHeader ,
258+ prev_header : & ValidatedBlockHeader ,
259+ chain_poller : & mut P ,
260+ ) -> BlockSourceResult < ChainDifference > {
261+ let mut disconnected_blocks = Vec :: new ( ) ;
262+ let mut connected_blocks = Vec :: new ( ) ;
263+ let mut current = current_header;
264+ let mut previous = * prev_header;
265+ loop {
266+ // Found the common ancestor.
267+ if current. block_hash == previous. block_hash {
268+ break ;
269+ }
270+
271+ // Walk back the chain, finding blocks needed to connect and disconnect. Only walk back
272+ // the header with the greater height, or both if equal heights.
273+ let current_height = current. height ;
274+ let previous_height = previous. height ;
275+ if current_height <= previous_height {
276+ disconnected_blocks. push ( previous) ;
277+ previous = self . look_up_previous_header ( chain_poller, & previous) . await ?;
278+ }
279+ if current_height >= previous_height {
280+ connected_blocks. push ( current) ;
281+ current = self . look_up_previous_header ( chain_poller, & current) . await ?;
282+ }
283+ }
284+
285+ Ok ( ChainDifference { disconnected_blocks, connected_blocks } )
286+ }
287+
288+ /// Returns the previous header for the given header, either by looking it up in the cache or
289+ /// fetching it if not found.
290+ async fn look_up_previous_header < P : Poll > (
291+ & self ,
292+ chain_poller : & mut P ,
293+ header : & ValidatedBlockHeader ,
294+ ) -> BlockSourceResult < ValidatedBlockHeader > {
295+ match self . header_cache . look_up ( & header. header . prev_blockhash ) {
296+ Some ( prev_header) => Ok ( * prev_header) ,
297+ None => chain_poller. look_up_previous_header ( header) . await ,
298+ }
299+ }
300+ }
301+
302+ #[ cfg( test) ]
303+ mod chain_notifier_tests {
304+ use crate :: test_utils:: { Blockchain , MockChainListener } ;
305+ use super :: * ;
306+
307+ use bitcoin:: network:: constants:: Network ;
308+
309+ #[ tokio:: test]
310+ async fn sync_from_same_chain ( ) {
311+ let mut chain = Blockchain :: default ( ) . with_height ( 3 ) ;
312+
313+ let new_tip = chain. tip ( ) ;
314+ let old_tip = chain. at_height ( 1 ) ;
315+ let mut listener = MockChainListener :: new ( )
316+ . expect_block_connected ( * chain. at_height ( 2 ) )
317+ . expect_block_connected ( * new_tip) ;
318+ let mut notifier = ChainNotifier { header_cache : chain. header_cache ( 0 ..=1 ) } ;
319+ let mut poller = poll:: ChainPoller :: new ( & mut chain, Network :: Testnet ) ;
320+ match notifier. synchronize_listener ( new_tip, & old_tip, & mut poller, & mut listener) . await {
321+ Err ( ( e, _) ) => panic ! ( "Unexpected error: {:?}" , e) ,
322+ Ok ( _) => { } ,
323+ }
324+ }
325+
326+ #[ tokio:: test]
327+ async fn sync_from_different_chains ( ) {
328+ let mut test_chain = Blockchain :: with_network ( Network :: Testnet ) . with_height ( 1 ) ;
329+ let main_chain = Blockchain :: with_network ( Network :: Bitcoin ) . with_height ( 1 ) ;
330+
331+ let new_tip = test_chain. tip ( ) ;
332+ let old_tip = main_chain. tip ( ) ;
333+ let mut listener = MockChainListener :: new ( ) ;
334+ let mut notifier = ChainNotifier { header_cache : main_chain. header_cache ( 0 ..=1 ) } ;
335+ let mut poller = poll:: ChainPoller :: new ( & mut test_chain, Network :: Testnet ) ;
336+ match notifier. synchronize_listener ( new_tip, & old_tip, & mut poller, & mut listener) . await {
337+ Err ( ( e, _) ) => {
338+ assert_eq ! ( e. kind( ) , BlockSourceErrorKind :: Persistent ) ;
339+ assert_eq ! ( e. into_inner( ) . as_ref( ) . to_string( ) , "genesis block reached" ) ;
340+ } ,
341+ Ok ( _) => panic ! ( "Expected error" ) ,
342+ }
343+ }
344+
345+ #[ tokio:: test]
346+ async fn sync_from_equal_length_fork ( ) {
347+ let main_chain = Blockchain :: default ( ) . with_height ( 2 ) ;
348+ let mut fork_chain = main_chain. fork_at_height ( 1 ) ;
349+
350+ let new_tip = fork_chain. tip ( ) ;
351+ let old_tip = main_chain. tip ( ) ;
352+ let mut listener = MockChainListener :: new ( )
353+ . expect_block_disconnected ( * old_tip)
354+ . expect_block_connected ( * new_tip) ;
355+ let mut notifier = ChainNotifier { header_cache : main_chain. header_cache ( 0 ..=2 ) } ;
356+ let mut poller = poll:: ChainPoller :: new ( & mut fork_chain, Network :: Testnet ) ;
357+ match notifier. synchronize_listener ( new_tip, & old_tip, & mut poller, & mut listener) . await {
358+ Err ( ( e, _) ) => panic ! ( "Unexpected error: {:?}" , e) ,
359+ Ok ( _) => { } ,
360+ }
361+ }
362+
363+ #[ tokio:: test]
364+ async fn sync_from_shorter_fork ( ) {
365+ let main_chain = Blockchain :: default ( ) . with_height ( 3 ) ;
366+ let mut fork_chain = main_chain. fork_at_height ( 1 ) ;
367+ fork_chain. disconnect_tip ( ) ;
368+
369+ let new_tip = fork_chain. tip ( ) ;
370+ let old_tip = main_chain. tip ( ) ;
371+ let mut listener = MockChainListener :: new ( )
372+ . expect_block_disconnected ( * old_tip)
373+ . expect_block_disconnected ( * main_chain. at_height ( 2 ) )
374+ . expect_block_connected ( * new_tip) ;
375+ let mut notifier = ChainNotifier { header_cache : main_chain. header_cache ( 0 ..=3 ) } ;
376+ let mut poller = poll:: ChainPoller :: new ( & mut fork_chain, Network :: Testnet ) ;
377+ match notifier. synchronize_listener ( new_tip, & old_tip, & mut poller, & mut listener) . await {
378+ Err ( ( e, _) ) => panic ! ( "Unexpected error: {:?}" , e) ,
379+ Ok ( _) => { } ,
380+ }
381+ }
382+
383+ #[ tokio:: test]
384+ async fn sync_from_longer_fork ( ) {
385+ let mut main_chain = Blockchain :: default ( ) . with_height ( 3 ) ;
386+ let mut fork_chain = main_chain. fork_at_height ( 1 ) ;
387+ main_chain. disconnect_tip ( ) ;
388+
389+ let new_tip = fork_chain. tip ( ) ;
390+ let old_tip = main_chain. tip ( ) ;
391+ let mut listener = MockChainListener :: new ( )
392+ . expect_block_disconnected ( * old_tip)
393+ . expect_block_connected ( * fork_chain. at_height ( 2 ) )
394+ . expect_block_connected ( * new_tip) ;
395+ let mut notifier = ChainNotifier { header_cache : main_chain. header_cache ( 0 ..=2 ) } ;
396+ let mut poller = poll:: ChainPoller :: new ( & mut fork_chain, Network :: Testnet ) ;
397+ match notifier. synchronize_listener ( new_tip, & old_tip, & mut poller, & mut listener) . await {
398+ Err ( ( e, _) ) => panic ! ( "Unexpected error: {:?}" , e) ,
399+ Ok ( _) => { } ,
400+ }
401+ }
402+
403+ #[ tokio:: test]
404+ async fn sync_from_chain_without_headers ( ) {
405+ let mut chain = Blockchain :: default ( ) . with_height ( 3 ) . without_headers ( ) ;
406+
407+ let new_tip = chain. tip ( ) ;
408+ let old_tip = chain. at_height ( 1 ) ;
409+ let mut listener = MockChainListener :: new ( ) ;
410+ let mut notifier = ChainNotifier { header_cache : chain. header_cache ( 0 ..=1 ) } ;
411+ let mut poller = poll:: ChainPoller :: new ( & mut chain, Network :: Testnet ) ;
412+ match notifier. synchronize_listener ( new_tip, & old_tip, & mut poller, & mut listener) . await {
413+ Err ( ( _, tip) ) => assert_eq ! ( tip, None ) ,
414+ Ok ( _) => panic ! ( "Expected error" ) ,
415+ }
416+ }
417+
418+ #[ tokio:: test]
419+ async fn sync_from_chain_without_any_new_blocks ( ) {
420+ let mut chain = Blockchain :: default ( ) . with_height ( 3 ) . without_blocks ( 2 ..) ;
421+
422+ let new_tip = chain. tip ( ) ;
423+ let old_tip = chain. at_height ( 1 ) ;
424+ let mut listener = MockChainListener :: new ( ) ;
425+ let mut notifier = ChainNotifier { header_cache : chain. header_cache ( 0 ..=3 ) } ;
426+ let mut poller = poll:: ChainPoller :: new ( & mut chain, Network :: Testnet ) ;
427+ match notifier. synchronize_listener ( new_tip, & old_tip, & mut poller, & mut listener) . await {
428+ Err ( ( _, tip) ) => assert_eq ! ( tip, Some ( old_tip) ) ,
429+ Ok ( _) => panic ! ( "Expected error" ) ,
430+ }
431+ }
432+
433+ #[ tokio:: test]
434+ async fn sync_from_chain_without_some_new_blocks ( ) {
435+ let mut chain = Blockchain :: default ( ) . with_height ( 3 ) . without_blocks ( 3 ..) ;
436+
437+ let new_tip = chain. tip ( ) ;
438+ let old_tip = chain. at_height ( 1 ) ;
439+ let mut listener = MockChainListener :: new ( )
440+ . expect_block_connected ( * chain. at_height ( 2 ) ) ;
441+ let mut notifier = ChainNotifier { header_cache : chain. header_cache ( 0 ..=3 ) } ;
442+ let mut poller = poll:: ChainPoller :: new ( & mut chain, Network :: Testnet ) ;
443+ match notifier. synchronize_listener ( new_tip, & old_tip, & mut poller, & mut listener) . await {
444+ Err ( ( _, tip) ) => assert_eq ! ( tip, Some ( chain. at_height( 2 ) ) ) ,
445+ Ok ( _) => panic ! ( "Expected error" ) ,
446+ }
447+ }
448+ }
0 commit comments