@@ -31,6 +31,8 @@ mod test_utils;
3131#[ cfg( any( feature = "rest-client" , feature = "rpc-client" ) ) ]
3232mod utils;
3333
34+ use crate :: poll:: { Poll , ValidatedBlockHeader } ;
35+
3436use bitcoin:: blockdata:: block:: { Block , BlockHeader } ;
3537use bitcoin:: hash_types:: BlockHash ;
3638use bitcoin:: util:: uint:: Uint256 ;
@@ -130,3 +132,319 @@ pub struct BlockHeaderData {
130132 /// of equivalent weight.
131133 pub chainwork : Uint256 ,
132134}
135+
136+ /// Adaptor used for notifying when blocks have been connected or disconnected from the chain.
137+ ///
138+ /// Used when needing to replay chain data upon startup or as new chain events occur.
139+ pub trait ChainListener {
140+ /// Notifies the listener that a block was added at the given height.
141+ fn block_connected ( & mut self , block : & Block , height : u32 ) ;
142+
143+ /// Notifies the listener that a block was removed at the given height.
144+ fn block_disconnected ( & mut self , header : & BlockHeader , height : u32 ) ;
145+ }
146+
147+ /// The `Cache` trait defines behavior for managing a block header cache, where block headers are
148+ /// keyed by block hash.
149+ ///
150+ /// Used by [`ChainNotifier`] to store headers along the best chain, which is important for ensuring
151+ /// that blocks can be disconnected if they are no longer accessible from a block source (e.g., if
152+ /// the block source does not store stale forks indefinitely).
153+ ///
154+ /// Implementations may define how long to retain headers such that it's unlikely they will ever be
155+ /// needed to disconnect a block. In cases where block sources provide access to headers on stale
156+ /// forks reliably, caches may be entirely unnecessary.
157+ ///
158+ /// [`ChainNotifier`]: struct.ChainNotifier.html
159+ pub trait Cache {
160+ /// Retrieves the block header keyed by the given block hash.
161+ fn look_up ( & self , block_hash : & BlockHash ) -> Option < & ValidatedBlockHeader > ;
162+
163+ /// Called when a block has been connected to the best chain to ensure it is available to be
164+ /// disconnected later if needed.
165+ fn block_connected ( & mut self , block_hash : BlockHash , block_header : ValidatedBlockHeader ) ;
166+
167+ /// Called when a block has been disconnected from the best chain. Once disconnected, a block's
168+ /// header is no longer needed and thus can be removed.
169+ fn block_disconnected ( & mut self , block_hash : & BlockHash ) -> Option < ValidatedBlockHeader > ;
170+ }
171+
172+ /// Unbounded cache of block headers keyed by block hash.
173+ pub type UnboundedCache = std:: collections:: HashMap < BlockHash , ValidatedBlockHeader > ;
174+
175+ impl Cache for UnboundedCache {
176+ fn look_up ( & self , block_hash : & BlockHash ) -> Option < & ValidatedBlockHeader > {
177+ self . get ( block_hash)
178+ }
179+
180+ fn block_connected ( & mut self , block_hash : BlockHash , block_header : ValidatedBlockHeader ) {
181+ self . insert ( block_hash, block_header) ;
182+ }
183+
184+ fn block_disconnected ( & mut self , block_hash : & BlockHash ) -> Option < ValidatedBlockHeader > {
185+ self . remove ( block_hash)
186+ }
187+ }
188+
189+ /// Notifies [listeners] of blocks that have been connected or disconnected from the chain.
190+ ///
191+ /// [listeners]: trait.ChainListener.html
192+ struct ChainNotifier < C : Cache > {
193+ /// Cache for looking up headers before fetching from a block source.
194+ header_cache : C ,
195+ }
196+
197+ /// Changes made to the chain between subsequent polls that transformed it from having one chain tip
198+ /// to another.
199+ ///
200+ /// Blocks are given in height-descending order. Therefore, blocks are first disconnected in order
201+ /// before new blocks are connected in reverse order.
202+ struct ChainDifference {
203+ /// Blocks that were disconnected from the chain since the last poll.
204+ disconnected_blocks : Vec < ValidatedBlockHeader > ,
205+
206+ /// Blocks that were connected to the chain since the last poll.
207+ connected_blocks : Vec < ValidatedBlockHeader > ,
208+ }
209+
210+ impl < C : Cache > ChainNotifier < C > {
211+ /// Finds the fork point between `new_header` and `old_header`, disconnecting blocks from
212+ /// `old_header` to get to that point and then connecting blocks until `new_header`.
213+ ///
214+ /// Validates headers along the transition path, but doesn't fetch blocks until the chain is
215+ /// disconnected to the fork point. Thus, this may return an `Err` that includes where the tip
216+ /// ended up which may not be `new_header`. Note that iff the returned `Err` contains `Some`
217+ /// header then the transition from `old_header` to `new_header` is valid.
218+ async fn sync_listener < L : ChainListener , P : Poll > (
219+ & mut self ,
220+ new_header : ValidatedBlockHeader ,
221+ old_header : & ValidatedBlockHeader ,
222+ chain_poller : & mut P ,
223+ chain_listener : & mut L ,
224+ ) -> Result < ( ) , ( BlockSourceError , Option < ValidatedBlockHeader > ) > {
225+ let mut difference = self . find_difference ( new_header, old_header, chain_poller) . await
226+ . map_err ( |e| ( e, None ) ) ?;
227+
228+ let mut new_tip = * old_header;
229+ for header in difference. disconnected_blocks . drain ( ..) {
230+ println ! ( "Disconnecting block {}" , header. block_hash) ;
231+ if let Some ( cached_header) = self . header_cache . block_disconnected ( & header. block_hash ) {
232+ assert_eq ! ( cached_header, header) ;
233+ }
234+ chain_listener. block_disconnected ( & header. header , header. height ) ;
235+ new_tip = header;
236+ }
237+
238+ for header in difference. connected_blocks . drain ( ..) . rev ( ) {
239+ let block = chain_poller
240+ . fetch_block ( & header) . await
241+ . or_else ( |e| Err ( ( e, Some ( new_tip) ) ) ) ?;
242+ debug_assert_eq ! ( block. block_hash, header. block_hash) ;
243+
244+ println ! ( "Connecting block {}" , header. block_hash) ;
245+ self . header_cache . block_connected ( header. block_hash , header) ;
246+ chain_listener. block_connected ( & block, header. height ) ;
247+ new_tip = header;
248+ }
249+
250+ Ok ( ( ) )
251+ }
252+
253+ /// Returns the changes needed to produce the chain with `current_header` as its tip from the
254+ /// chain with `prev_header` as its tip.
255+ ///
256+ /// Walks backwards from `current_header` and `prev_header`, finding the common ancestor.
257+ async fn find_difference < P : Poll > (
258+ & self ,
259+ current_header : ValidatedBlockHeader ,
260+ prev_header : & ValidatedBlockHeader ,
261+ chain_poller : & mut P ,
262+ ) -> BlockSourceResult < ChainDifference > {
263+ let mut disconnected_blocks = Vec :: new ( ) ;
264+ let mut connected_blocks = Vec :: new ( ) ;
265+ let mut current = current_header;
266+ let mut previous = * prev_header;
267+ loop {
268+ // Found the common ancestor.
269+ if current. block_hash == previous. block_hash {
270+ break ;
271+ }
272+
273+ // Walk back the chain, finding blocks needed to connect and disconnect. Only walk back
274+ // the header with the greater height, or both if equal heights.
275+ let current_height = current. height ;
276+ let previous_height = previous. height ;
277+ if current_height <= previous_height {
278+ disconnected_blocks. push ( previous) ;
279+ previous = self . look_up_previous_header ( chain_poller, & previous) . await ?;
280+ }
281+ if current_height >= previous_height {
282+ connected_blocks. push ( current) ;
283+ current = self . look_up_previous_header ( chain_poller, & current) . await ?;
284+ }
285+ }
286+
287+ Ok ( ChainDifference { disconnected_blocks, connected_blocks } )
288+ }
289+
290+ /// Returns the previous header for the given header, either by looking it up in the cache or
291+ /// fetching it if not found.
292+ async fn look_up_previous_header < P : Poll > (
293+ & self ,
294+ chain_poller : & mut P ,
295+ header : & ValidatedBlockHeader ,
296+ ) -> BlockSourceResult < ValidatedBlockHeader > {
297+ match self . header_cache . look_up ( & header. header . prev_blockhash ) {
298+ Some ( prev_header) => Ok ( * prev_header) ,
299+ None => chain_poller. look_up_previous_header ( header) . await ,
300+ }
301+ }
302+ }
303+
304+ #[ cfg( test) ]
305+ mod chain_notifier_tests {
306+ use crate :: test_utils:: { Blockchain , MockChainListener } ;
307+ use super :: * ;
308+
309+ use bitcoin:: network:: constants:: Network ;
310+
311+ #[ tokio:: test]
312+ async fn sync_from_same_chain ( ) {
313+ let mut chain = Blockchain :: default ( ) . with_height ( 3 ) ;
314+
315+ let new_tip = chain. tip ( ) ;
316+ let old_tip = chain. at_height ( 1 ) ;
317+ let mut listener = MockChainListener :: new ( )
318+ . expect_block_connected ( * chain. at_height ( 2 ) )
319+ . expect_block_connected ( * new_tip) ;
320+ let mut notifier = ChainNotifier { header_cache : chain. header_cache ( 0 ..=1 ) } ;
321+ let mut poller = poll:: ChainPoller :: new ( & mut chain, Network :: Testnet ) ;
322+ match notifier. sync_listener ( new_tip, & old_tip, & mut poller, & mut listener) . await {
323+ Err ( ( e, _) ) => panic ! ( "Unexpected error: {:?}" , e) ,
324+ Ok ( _) => { } ,
325+ }
326+ }
327+
328+ #[ tokio:: test]
329+ async fn sync_from_different_chains ( ) {
330+ let mut test_chain = Blockchain :: with_network ( Network :: Testnet ) . with_height ( 1 ) ;
331+ let main_chain = Blockchain :: with_network ( Network :: Bitcoin ) . with_height ( 1 ) ;
332+
333+ let new_tip = test_chain. tip ( ) ;
334+ let old_tip = main_chain. tip ( ) ;
335+ let mut listener = MockChainListener :: new ( ) ;
336+ let mut notifier = ChainNotifier { header_cache : main_chain. header_cache ( 0 ..=1 ) } ;
337+ let mut poller = poll:: ChainPoller :: new ( & mut test_chain, Network :: Testnet ) ;
338+ match notifier. sync_listener ( new_tip, & old_tip, & mut poller, & mut listener) . await {
339+ Err ( ( e, _) ) => {
340+ assert_eq ! ( e. kind( ) , BlockSourceErrorKind :: Persistent ) ;
341+ assert_eq ! ( e. into_inner( ) . as_ref( ) . to_string( ) , "genesis block reached" ) ;
342+ } ,
343+ Ok ( _) => panic ! ( "Expected error" ) ,
344+ }
345+ }
346+
347+ #[ tokio:: test]
348+ async fn sync_from_equal_length_fork ( ) {
349+ let main_chain = Blockchain :: default ( ) . with_height ( 2 ) ;
350+ let mut fork_chain = main_chain. fork_at_height ( 1 ) ;
351+
352+ let new_tip = fork_chain. tip ( ) ;
353+ let old_tip = main_chain. tip ( ) ;
354+ let mut listener = MockChainListener :: new ( )
355+ . expect_block_disconnected ( * old_tip)
356+ . expect_block_connected ( * new_tip) ;
357+ let mut notifier = ChainNotifier { header_cache : main_chain. header_cache ( 0 ..=2 ) } ;
358+ let mut poller = poll:: ChainPoller :: new ( & mut fork_chain, Network :: Testnet ) ;
359+ match notifier. sync_listener ( new_tip, & old_tip, & mut poller, & mut listener) . await {
360+ Err ( ( e, _) ) => panic ! ( "Unexpected error: {:?}" , e) ,
361+ Ok ( _) => { } ,
362+ }
363+ }
364+
365+ #[ tokio:: test]
366+ async fn sync_from_shorter_fork ( ) {
367+ let main_chain = Blockchain :: default ( ) . with_height ( 3 ) ;
368+ let mut fork_chain = main_chain. fork_at_height ( 1 ) ;
369+ fork_chain. disconnect_tip ( ) ;
370+
371+ let new_tip = fork_chain. tip ( ) ;
372+ let old_tip = main_chain. tip ( ) ;
373+ let mut listener = MockChainListener :: new ( )
374+ . expect_block_disconnected ( * old_tip)
375+ . expect_block_disconnected ( * main_chain. at_height ( 2 ) )
376+ . expect_block_connected ( * new_tip) ;
377+ let mut notifier = ChainNotifier { header_cache : main_chain. header_cache ( 0 ..=3 ) } ;
378+ let mut poller = poll:: ChainPoller :: new ( & mut fork_chain, Network :: Testnet ) ;
379+ match notifier. sync_listener ( new_tip, & old_tip, & mut poller, & mut listener) . await {
380+ Err ( ( e, _) ) => panic ! ( "Unexpected error: {:?}" , e) ,
381+ Ok ( _) => { } ,
382+ }
383+ }
384+
385+ #[ tokio:: test]
386+ async fn sync_from_longer_fork ( ) {
387+ let mut main_chain = Blockchain :: default ( ) . with_height ( 3 ) ;
388+ let mut fork_chain = main_chain. fork_at_height ( 1 ) ;
389+ main_chain. disconnect_tip ( ) ;
390+
391+ let new_tip = fork_chain. tip ( ) ;
392+ let old_tip = main_chain. tip ( ) ;
393+ let mut listener = MockChainListener :: new ( )
394+ . expect_block_disconnected ( * old_tip)
395+ . expect_block_connected ( * fork_chain. at_height ( 2 ) )
396+ . expect_block_connected ( * new_tip) ;
397+ let mut notifier = ChainNotifier { header_cache : main_chain. header_cache ( 0 ..=2 ) } ;
398+ let mut poller = poll:: ChainPoller :: new ( & mut fork_chain, Network :: Testnet ) ;
399+ match notifier. sync_listener ( new_tip, & old_tip, & mut poller, & mut listener) . await {
400+ Err ( ( e, _) ) => panic ! ( "Unexpected error: {:?}" , e) ,
401+ Ok ( _) => { } ,
402+ }
403+ }
404+
405+ #[ tokio:: test]
406+ async fn sync_from_chain_without_headers ( ) {
407+ let mut chain = Blockchain :: default ( ) . with_height ( 3 ) . without_headers ( ) ;
408+
409+ let new_tip = chain. tip ( ) ;
410+ let old_tip = chain. at_height ( 1 ) ;
411+ let mut listener = MockChainListener :: new ( ) ;
412+ let mut notifier = ChainNotifier { header_cache : chain. header_cache ( 0 ..=1 ) } ;
413+ let mut poller = poll:: ChainPoller :: new ( & mut chain, Network :: Testnet ) ;
414+ match notifier. sync_listener ( new_tip, & old_tip, & mut poller, & mut listener) . await {
415+ Err ( ( _, tip) ) => assert_eq ! ( tip, None ) ,
416+ Ok ( _) => panic ! ( "Expected error" ) ,
417+ }
418+ }
419+
420+ #[ tokio:: test]
421+ async fn sync_from_chain_without_any_new_blocks ( ) {
422+ let mut chain = Blockchain :: default ( ) . with_height ( 3 ) . without_blocks ( 2 ..) ;
423+
424+ let new_tip = chain. tip ( ) ;
425+ let old_tip = chain. at_height ( 1 ) ;
426+ let mut listener = MockChainListener :: new ( ) ;
427+ let mut notifier = ChainNotifier { header_cache : chain. header_cache ( 0 ..=3 ) } ;
428+ let mut poller = poll:: ChainPoller :: new ( & mut chain, Network :: Testnet ) ;
429+ match notifier. sync_listener ( new_tip, & old_tip, & mut poller, & mut listener) . await {
430+ Err ( ( _, tip) ) => assert_eq ! ( tip, Some ( old_tip) ) ,
431+ Ok ( _) => panic ! ( "Expected error" ) ,
432+ }
433+ }
434+
435+ #[ tokio:: test]
436+ async fn sync_from_chain_without_some_new_blocks ( ) {
437+ let mut chain = Blockchain :: default ( ) . with_height ( 3 ) . without_blocks ( 3 ..) ;
438+
439+ let new_tip = chain. tip ( ) ;
440+ let old_tip = chain. at_height ( 1 ) ;
441+ let mut listener = MockChainListener :: new ( )
442+ . expect_block_connected ( * chain. at_height ( 2 ) ) ;
443+ let mut notifier = ChainNotifier { header_cache : chain. header_cache ( 0 ..=3 ) } ;
444+ let mut poller = poll:: ChainPoller :: new ( & mut chain, Network :: Testnet ) ;
445+ match notifier. sync_listener ( new_tip, & old_tip, & mut poller, & mut listener) . await {
446+ Err ( ( _, tip) ) => assert_eq ! ( tip, Some ( chain. at_height( 2 ) ) ) ,
447+ Ok ( _) => panic ! ( "Expected error" ) ,
448+ }
449+ }
450+ }
0 commit comments