@@ -31,6 +31,8 @@ mod test_utils;
3131#[ cfg( any( feature = "rest-client" , feature = "rpc-client" ) ) ]
3232mod utils;
3333
34+ use crate :: poll:: { Poll , ValidatedBlockHeader } ;
35+
3436use bitcoin:: blockdata:: block:: { Block , BlockHeader } ;
3537use bitcoin:: hash_types:: BlockHash ;
3638use bitcoin:: util:: uint:: Uint256 ;
@@ -130,3 +132,336 @@ pub struct BlockHeaderData {
130132 /// of equivalent weight.
131133 pub chainwork : Uint256 ,
132134}
135+
136+ /// Adaptor used for notifying when blocks have been connected or disconnected from the chain.
137+ ///
138+ /// Used when needing to replay chain data upon startup or as new chain events occur.
139+ pub trait ChainListener {
140+ /// Notifies the listener that a block was added at the given height.
141+ fn block_connected ( & mut self , block : & Block , height : u32 ) ;
142+
143+ /// Notifies the listener that a block was removed at the given height.
144+ fn block_disconnected ( & mut self , header : & BlockHeader , height : u32 ) ;
145+ }
146+
147+ /// The `Cache` trait defines behavior for managing a block header cache, where block headers are
148+ /// keyed by block hash.
149+ ///
150+ /// Used by [`ChainNotifier`] to store headers along the best chain. Implementations may define
151+ /// their own cache eviction policy.
152+ ///
153+ /// [`ChainNotifier`]: struct.ChainNotifier.html
154+ pub trait Cache {
155+ /// Retrieves the block header keyed by the given block hash.
156+ fn get ( & self , block_hash : & BlockHash ) -> Option < & ValidatedBlockHeader > ;
157+
158+ /// Inserts a block header keyed by the given block hash.
159+ fn insert ( & mut self , block_hash : BlockHash , block_header : ValidatedBlockHeader ) ;
160+
161+ /// Removes the block header keyed by the given block hash.
162+ fn remove ( & mut self , block_hash : & BlockHash ) -> Option < ValidatedBlockHeader > ;
163+ }
164+
165+ /// Unbounded cache of block headers keyed by block hash.
166+ pub type UnboundedCache = std:: collections:: HashMap < BlockHash , ValidatedBlockHeader > ;
167+
168+ impl Cache for UnboundedCache {
169+ fn get ( & self , block_hash : & BlockHash ) -> Option < & ValidatedBlockHeader > {
170+ self . get ( block_hash)
171+ }
172+
173+ fn insert ( & mut self , block_hash : BlockHash , block_header : ValidatedBlockHeader ) {
174+ self . insert ( block_hash, block_header) ;
175+ }
176+
177+ fn remove ( & mut self , block_hash : & BlockHash ) -> Option < ValidatedBlockHeader > {
178+ self . remove ( block_hash)
179+ }
180+ }
181+
182+ /// Notifies [listeners] of blocks that have been connected or disconnected from the chain.
183+ ///
184+ /// [listeners]: trait.ChainListener.html
185+ struct ChainNotifier < C : Cache > {
186+ /// Cache for looking up headers before fetching from a block source.
187+ header_cache : C ,
188+ }
189+
190+ /// Steps outlining changes needed to be made to the chain in order to transform it from having one
191+ /// chain tip to another.
192+ enum ForkStep {
193+ ForkPoint ( ValidatedBlockHeader ) ,
194+ DisconnectBlock ( ValidatedBlockHeader ) ,
195+ ConnectBlock ( ValidatedBlockHeader ) ,
196+ }
197+
198+ impl < C : Cache > ChainNotifier < C > {
199+ /// Finds the fork point between `new_header` and `old_header`, disconnecting blocks from
200+ /// `old_header` to get to that point and then connecting blocks until `new_header`.
201+ ///
202+ /// Validates headers along the transition path, but doesn't fetch blocks until the chain is
203+ /// disconnected to the fork point. Thus, this may return an `Err` that includes where the tip
204+ /// ended up which may not be `new_header`. Note that iff the returned `Err` contains `Some`
205+ /// header then the transition from `old_header` to `new_header` is valid.
206+ async fn sync_listener < L : ChainListener , P : Poll > (
207+ & mut self ,
208+ new_header : ValidatedBlockHeader ,
209+ old_header : & ValidatedBlockHeader ,
210+ chain_poller : & mut P ,
211+ chain_listener : & mut L ,
212+ ) -> Result < ( ) , ( BlockSourceError , Option < ValidatedBlockHeader > ) > {
213+ let mut events = self . find_fork ( new_header, old_header, chain_poller) . await . map_err ( |e| ( e, None ) ) ?;
214+
215+ let mut last_disconnect_tip = None ;
216+ let mut new_tip = None ;
217+ for event in events. iter ( ) {
218+ match & event {
219+ & ForkStep :: DisconnectBlock ( ref header) => {
220+ println ! ( "Disconnecting block {}" , header. block_hash) ;
221+ if let Some ( cached_header) = self . header_cache . remove ( & header. block_hash ) {
222+ assert_eq ! ( cached_header, * header) ;
223+ }
224+ chain_listener. block_disconnected ( & header. header , header. height ) ;
225+ last_disconnect_tip = Some ( header. header . prev_blockhash ) ;
226+ } ,
227+ & ForkStep :: ForkPoint ( ref header) => {
228+ new_tip = Some ( * header) ;
229+ } ,
230+ _ => { } ,
231+ }
232+ }
233+
234+ // If blocks were disconnected, new blocks will connect starting from the fork point.
235+ // Otherwise, there was no fork, so new blocks connect starting from the old tip.
236+ assert_eq ! ( last_disconnect_tip. is_some( ) , new_tip. is_some( ) ) ;
237+ if let & Some ( ref tip_header) = & new_tip {
238+ debug_assert_eq ! ( tip_header. header. block_hash( ) , * last_disconnect_tip. as_ref( ) . unwrap( ) ) ;
239+ } else {
240+ new_tip = Some ( * old_header) ;
241+ }
242+
243+ for event in events. drain ( ..) . rev ( ) {
244+ if let ForkStep :: ConnectBlock ( header) = event {
245+ let block = chain_poller
246+ . fetch_block ( & header) . await
247+ . or_else ( |e| Err ( ( e, new_tip) ) ) ?;
248+ debug_assert_eq ! ( block. block_hash, header. block_hash) ;
249+
250+ println ! ( "Connecting block {}" , header. block_hash) ;
251+ self . header_cache . insert ( header. block_hash , header) ;
252+ chain_listener. block_connected ( & block, header. height ) ;
253+ new_tip = Some ( header) ;
254+ }
255+ }
256+ Ok ( ( ) )
257+ }
258+
259+ /// Walks backwards from `current_header` and `prev_header`, finding the common ancestor.
260+ /// Returns the steps needed to produce the chain with `current_header` as its tip from the
261+ /// chain with `prev_header` as its tip. There is no ordering guarantee between different
262+ /// `ForkStep` types, but `DisconnectBlock` and `ConnectBlock` are each returned in
263+ /// height-descending order.
264+ async fn find_fork < P : Poll > (
265+ & self ,
266+ current_header : ValidatedBlockHeader ,
267+ prev_header : & ValidatedBlockHeader ,
268+ chain_poller : & mut P ,
269+ ) -> BlockSourceResult < Vec < ForkStep > > {
270+ let mut steps = Vec :: new ( ) ;
271+ let mut current = current_header;
272+ let mut previous = * prev_header;
273+ loop {
274+ // Found the parent block.
275+ if current. height == previous. height + 1 &&
276+ current. header . prev_blockhash == previous. block_hash {
277+ steps. push ( ForkStep :: ConnectBlock ( current) ) ;
278+ break ;
279+ }
280+
281+ // Found a chain fork.
282+ if current. header . prev_blockhash == previous. header . prev_blockhash {
283+ let fork_point = self . look_up_previous_header ( chain_poller, & previous) . await ?;
284+ steps. push ( ForkStep :: DisconnectBlock ( previous) ) ;
285+ steps. push ( ForkStep :: ConnectBlock ( current) ) ;
286+ steps. push ( ForkStep :: ForkPoint ( fork_point) ) ;
287+ break ;
288+ }
289+
290+ // Walk back the chain, finding blocks needed to connect and disconnect. Only walk back
291+ // the header with the greater height, or both if equal heights.
292+ let current_height = current. height ;
293+ let previous_height = previous. height ;
294+ if current_height <= previous_height {
295+ steps. push ( ForkStep :: DisconnectBlock ( previous) ) ;
296+ previous = self . look_up_previous_header ( chain_poller, & previous) . await ?;
297+ }
298+ if current_height >= previous_height {
299+ steps. push ( ForkStep :: ConnectBlock ( current) ) ;
300+ current = self . look_up_previous_header ( chain_poller, & current) . await ?;
301+ }
302+ }
303+
304+ Ok ( steps)
305+ }
306+
307+ /// Returns the previous header for the given header, either by looking it up in the cache or
308+ /// fetching it if not found.
309+ async fn look_up_previous_header < P : Poll > (
310+ & self ,
311+ chain_poller : & mut P ,
312+ header : & ValidatedBlockHeader ,
313+ ) -> BlockSourceResult < ValidatedBlockHeader > {
314+ match self . header_cache . get ( & header. header . prev_blockhash ) {
315+ Some ( prev_header) => Ok ( * prev_header) ,
316+ None => chain_poller. look_up_previous_header ( header) . await ,
317+ }
318+ }
319+ }
320+
321+ #[ cfg( test) ]
322+ mod chain_notifier_tests {
323+ use crate :: test_utils:: { Blockchain , MockChainListener } ;
324+ use super :: * ;
325+
326+ use bitcoin:: network:: constants:: Network ;
327+
328+ #[ tokio:: test]
329+ async fn sync_from_same_chain ( ) {
330+ let mut chain = Blockchain :: default ( ) . with_height ( 3 ) ;
331+
332+ let new_tip = chain. tip ( ) ;
333+ let old_tip = chain. at_height ( 1 ) ;
334+ let mut listener = MockChainListener :: new ( )
335+ . expect_block_connected ( * chain. at_height ( 2 ) )
336+ . expect_block_connected ( * new_tip) ;
337+ let mut notifier = ChainNotifier { header_cache : chain. header_cache ( 0 ..=1 ) } ;
338+ let mut poller = poll:: ChainPoller :: new ( & mut chain as & mut dyn BlockSource , Network :: Testnet ) ;
339+ match notifier. sync_listener ( new_tip, & old_tip, & mut poller, & mut listener) . await {
340+ Err ( ( e, _) ) => panic ! ( "Unexpected error: {:?}" , e) ,
341+ Ok ( _) => { } ,
342+ }
343+ }
344+
345+ #[ tokio:: test]
346+ async fn sync_from_different_chains ( ) {
347+ let mut test_chain = Blockchain :: with_network ( Network :: Testnet ) . with_height ( 1 ) ;
348+ let main_chain = Blockchain :: with_network ( Network :: Bitcoin ) . with_height ( 1 ) ;
349+
350+ let new_tip = test_chain. tip ( ) ;
351+ let old_tip = main_chain. tip ( ) ;
352+ let mut listener = MockChainListener :: new ( ) ;
353+ let mut notifier = ChainNotifier { header_cache : main_chain. header_cache ( 0 ..=1 ) } ;
354+ let mut poller = poll:: ChainPoller :: new ( & mut test_chain as & mut dyn BlockSource , Network :: Testnet ) ;
355+ match notifier. sync_listener ( new_tip, & old_tip, & mut poller, & mut listener) . await {
356+ Err ( ( e, _) ) => {
357+ assert_eq ! ( e. kind( ) , BlockSourceErrorKind :: Persistent ) ;
358+ assert_eq ! ( e. into_inner( ) . as_ref( ) . to_string( ) , "genesis block reached" ) ;
359+ } ,
360+ Ok ( _) => panic ! ( "Expected error" ) ,
361+ }
362+ }
363+
364+ #[ tokio:: test]
365+ async fn sync_from_equal_length_fork ( ) {
366+ let main_chain = Blockchain :: default ( ) . with_height ( 2 ) ;
367+ let mut fork_chain = main_chain. fork_at_height ( 1 ) ;
368+
369+ let new_tip = fork_chain. tip ( ) ;
370+ let old_tip = main_chain. tip ( ) ;
371+ let mut listener = MockChainListener :: new ( )
372+ . expect_block_disconnected ( * old_tip)
373+ . expect_block_connected ( * new_tip) ;
374+ let mut notifier = ChainNotifier { header_cache : main_chain. header_cache ( 0 ..=2 ) } ;
375+ let mut poller = poll:: ChainPoller :: new ( & mut fork_chain as & mut dyn BlockSource , Network :: Testnet ) ;
376+ match notifier. sync_listener ( new_tip, & old_tip, & mut poller, & mut listener) . await {
377+ Err ( ( e, _) ) => panic ! ( "Unexpected error: {:?}" , e) ,
378+ Ok ( _) => { } ,
379+ }
380+ }
381+
382+ #[ tokio:: test]
383+ async fn sync_from_shorter_fork ( ) {
384+ let main_chain = Blockchain :: default ( ) . with_height ( 3 ) ;
385+ let mut fork_chain = main_chain. fork_at_height ( 1 ) ;
386+ fork_chain. disconnect_tip ( ) ;
387+
388+ let new_tip = fork_chain. tip ( ) ;
389+ let old_tip = main_chain. tip ( ) ;
390+ let mut listener = MockChainListener :: new ( )
391+ . expect_block_disconnected ( * old_tip)
392+ . expect_block_disconnected ( * main_chain. at_height ( 2 ) )
393+ . expect_block_connected ( * new_tip) ;
394+ let mut notifier = ChainNotifier { header_cache : main_chain. header_cache ( 0 ..=3 ) } ;
395+ let mut poller = poll:: ChainPoller :: new ( & mut fork_chain as & mut dyn BlockSource , Network :: Testnet ) ;
396+ match notifier. sync_listener ( new_tip, & old_tip, & mut poller, & mut listener) . await {
397+ Err ( ( e, _) ) => panic ! ( "Unexpected error: {:?}" , e) ,
398+ Ok ( _) => { } ,
399+ }
400+ }
401+
402+ #[ tokio:: test]
403+ async fn sync_from_longer_fork ( ) {
404+ let mut main_chain = Blockchain :: default ( ) . with_height ( 3 ) ;
405+ let mut fork_chain = main_chain. fork_at_height ( 1 ) ;
406+ main_chain. disconnect_tip ( ) ;
407+
408+ let new_tip = fork_chain. tip ( ) ;
409+ let old_tip = main_chain. tip ( ) ;
410+ let mut listener = MockChainListener :: new ( )
411+ . expect_block_disconnected ( * old_tip)
412+ . expect_block_connected ( * fork_chain. at_height ( 2 ) )
413+ . expect_block_connected ( * new_tip) ;
414+ let mut notifier = ChainNotifier { header_cache : main_chain. header_cache ( 0 ..=2 ) } ;
415+ let mut poller = poll:: ChainPoller :: new ( & mut fork_chain as & mut dyn BlockSource , Network :: Testnet ) ;
416+ match notifier. sync_listener ( new_tip, & old_tip, & mut poller, & mut listener) . await {
417+ Err ( ( e, _) ) => panic ! ( "Unexpected error: {:?}" , e) ,
418+ Ok ( _) => { } ,
419+ }
420+ }
421+
422+ #[ tokio:: test]
423+ async fn sync_from_chain_without_headers ( ) {
424+ let mut chain = Blockchain :: default ( ) . with_height ( 3 ) . without_headers ( ) ;
425+
426+ let new_tip = chain. tip ( ) ;
427+ let old_tip = chain. at_height ( 1 ) ;
428+ let mut listener = MockChainListener :: new ( ) ;
429+ let mut notifier = ChainNotifier { header_cache : chain. header_cache ( 0 ..=1 ) } ;
430+ let mut poller = poll:: ChainPoller :: new ( & mut chain as & mut dyn BlockSource , Network :: Testnet ) ;
431+ match notifier. sync_listener ( new_tip, & old_tip, & mut poller, & mut listener) . await {
432+ Err ( ( _, tip) ) => assert_eq ! ( tip, None ) ,
433+ Ok ( _) => panic ! ( "Expected error" ) ,
434+ }
435+ }
436+
437+ #[ tokio:: test]
438+ async fn sync_from_chain_without_any_new_blocks ( ) {
439+ let mut chain = Blockchain :: default ( ) . with_height ( 3 ) . without_blocks ( 2 ..) ;
440+
441+ let new_tip = chain. tip ( ) ;
442+ let old_tip = chain. at_height ( 1 ) ;
443+ let mut listener = MockChainListener :: new ( ) ;
444+ let mut notifier = ChainNotifier { header_cache : chain. header_cache ( 0 ..=3 ) } ;
445+ let mut poller = poll:: ChainPoller :: new ( & mut chain as & mut dyn BlockSource , Network :: Testnet ) ;
446+ match notifier. sync_listener ( new_tip, & old_tip, & mut poller, & mut listener) . await {
447+ Err ( ( _, tip) ) => assert_eq ! ( tip, Some ( old_tip) ) ,
448+ Ok ( _) => panic ! ( "Expected error" ) ,
449+ }
450+ }
451+
452+ #[ tokio:: test]
453+ async fn sync_from_chain_without_some_new_blocks ( ) {
454+ let mut chain = Blockchain :: default ( ) . with_height ( 3 ) . without_blocks ( 3 ..) ;
455+
456+ let new_tip = chain. tip ( ) ;
457+ let old_tip = chain. at_height ( 1 ) ;
458+ let mut listener = MockChainListener :: new ( )
459+ . expect_block_connected ( * chain. at_height ( 2 ) ) ;
460+ let mut notifier = ChainNotifier { header_cache : chain. header_cache ( 0 ..=3 ) } ;
461+ let mut poller = poll:: ChainPoller :: new ( & mut chain as & mut dyn BlockSource , Network :: Testnet ) ;
462+ match notifier. sync_listener ( new_tip, & old_tip, & mut poller, & mut listener) . await {
463+ Err ( ( _, tip) ) => assert_eq ! ( tip, Some ( chain. at_height( 2 ) ) ) ,
464+ Ok ( _) => panic ! ( "Expected error" ) ,
465+ }
466+ }
467+ }
0 commit comments