@@ -503,3 +503,313 @@ impl PendingChecks {
503503 }
504504 }
505505}
506+
507+ #[ cfg( test) ]
508+ mod tests {
509+ use super :: * ;
510+ use crate :: routing:: gossip:: tests:: * ;
511+ use crate :: util:: test_utils:: { TestChainSource , TestLogger } ;
512+ use crate :: ln:: msgs;
513+
514+ use bitcoin:: blockdata:: constants:: genesis_block;
515+ use bitcoin:: secp256k1:: { Secp256k1 , SecretKey } ;
516+
517+ use core:: sync:: atomic:: Ordering ;
518+
519+ fn get_network ( ) -> ( TestChainSource , NetworkGraph < Box < TestLogger > > ) {
520+ let logger = Box :: new ( TestLogger :: new ( ) ) ;
521+ let genesis_hash = genesis_block ( bitcoin:: Network :: Testnet ) . header . block_hash ( ) ;
522+ let chain_source = TestChainSource :: new ( bitcoin:: Network :: Testnet ) ;
523+ let network_graph = NetworkGraph :: new ( genesis_hash, logger) ;
524+
525+ ( chain_source, network_graph)
526+ }
527+
528+ fn get_test_objects ( ) -> ( msgs:: ChannelAnnouncement , TestChainSource ,
529+ NetworkGraph < Box < TestLogger > > , bitcoin:: Script , msgs:: NodeAnnouncement ,
530+ msgs:: NodeAnnouncement , msgs:: ChannelUpdate , msgs:: ChannelUpdate , msgs:: ChannelUpdate )
531+ {
532+ let secp_ctx = Secp256k1 :: new ( ) ;
533+
534+ let ( chain_source, network_graph) = get_network ( ) ;
535+
536+ let good_script = get_channel_script ( & secp_ctx) ;
537+ let node_1_privkey = & SecretKey :: from_slice ( & [ 42 ; 32 ] ) . unwrap ( ) ;
538+ let node_2_privkey = & SecretKey :: from_slice ( & [ 41 ; 32 ] ) . unwrap ( ) ;
539+ let valid_announcement = get_signed_channel_announcement ( |_| { } , node_1_privkey, node_2_privkey, & secp_ctx) ;
540+
541+ let node_a_announce = get_signed_node_announcement ( |_| { } , node_1_privkey, & secp_ctx) ;
542+ let node_b_announce = get_signed_node_announcement ( |_| { } , node_2_privkey, & secp_ctx) ;
543+
544+ // Note that we have to set the "direction" flag correctly on both messages
545+ let chan_update_a = get_signed_channel_update ( |msg| msg. flags = 0 , node_1_privkey, & secp_ctx) ;
546+ let chan_update_b = get_signed_channel_update ( |msg| msg. flags = 1 , node_2_privkey, & secp_ctx) ;
547+ let chan_update_c = get_signed_channel_update ( |msg| {
548+ msg. flags = 1 ; msg. timestamp += 1 ; } , node_2_privkey, & secp_ctx) ;
549+
550+ ( valid_announcement, chain_source, network_graph, good_script, node_a_announce,
551+ node_b_announce, chan_update_a, chan_update_b, chan_update_c)
552+ }
553+
554+ #[ test]
555+ fn test_fast_async_lookup ( ) {
556+ // Check that async lookups which resolve quicker than the future is returned to the
557+ // `get_utxo` call can read it still resolve properly.
558+ let ( valid_announcement, chain_source, network_graph, good_script, ..) = get_test_objects ( ) ;
559+
560+ let future = AccessFuture :: new ( ) ;
561+ future. resolve_without_forwarding ( & network_graph,
562+ Ok ( TxOut { value : 1_000_000 , script_pubkey : good_script } ) ) ;
563+ * chain_source. utxo_ret . lock ( ) . unwrap ( ) = ChainAccessResult :: Async ( future. clone ( ) ) ;
564+
565+ network_graph. update_channel_from_announcement ( & valid_announcement, & Some ( & chain_source) ) . unwrap ( ) ;
566+ assert ! ( network_graph. read_only( ) . channels( ) . get( & valid_announcement. contents. short_channel_id) . is_some( ) ) ;
567+ }
568+
569+ #[ test]
570+ fn test_async_lookup ( ) {
571+ // Test a simple async lookup
572+ let ( valid_announcement, chain_source, network_graph, good_script,
573+ node_a_announce, node_b_announce, ..) = get_test_objects ( ) ;
574+
575+ let future = AccessFuture :: new ( ) ;
576+ * chain_source. utxo_ret . lock ( ) . unwrap ( ) = ChainAccessResult :: Async ( future. clone ( ) ) ;
577+
578+ assert_eq ! (
579+ network_graph. update_channel_from_announcement( & valid_announcement, & Some ( & chain_source) ) . unwrap_err( ) . err,
580+ "Channel being checked async" ) ;
581+ assert ! ( network_graph. read_only( ) . channels( ) . get( & valid_announcement. contents. short_channel_id) . is_none( ) ) ;
582+
583+ future. resolve_without_forwarding ( & network_graph,
584+ Ok ( TxOut { value : 0 , script_pubkey : good_script } ) ) ;
585+ network_graph. read_only ( ) . channels ( ) . get ( & valid_announcement. contents . short_channel_id ) . unwrap ( ) ;
586+ network_graph. read_only ( ) . channels ( ) . get ( & valid_announcement. contents . short_channel_id ) . unwrap ( ) ;
587+
588+ assert ! ( network_graph. read_only( ) . nodes( )
589+ . get( & NodeId :: from_pubkey( & valid_announcement. contents. node_id_1) ) . unwrap( )
590+ . announcement_info. is_none( ) ) ;
591+
592+ network_graph. update_node_from_announcement ( & node_a_announce) . unwrap ( ) ;
593+ network_graph. update_node_from_announcement ( & node_b_announce) . unwrap ( ) ;
594+
595+ assert ! ( network_graph. read_only( ) . nodes( )
596+ . get( & NodeId :: from_pubkey( & valid_announcement. contents. node_id_1) ) . unwrap( )
597+ . announcement_info. is_some( ) ) ;
598+ }
599+
600+ #[ test]
601+ fn test_invalid_async_lookup ( ) {
602+ // Test an async lookup which returns an incorrect script
603+ let ( valid_announcement, chain_source, network_graph, ..) = get_test_objects ( ) ;
604+
605+ let future = AccessFuture :: new ( ) ;
606+ * chain_source. utxo_ret . lock ( ) . unwrap ( ) = ChainAccessResult :: Async ( future. clone ( ) ) ;
607+
608+ assert_eq ! (
609+ network_graph. update_channel_from_announcement( & valid_announcement, & Some ( & chain_source) ) . unwrap_err( ) . err,
610+ "Channel being checked async" ) ;
611+ assert ! ( network_graph. read_only( ) . channels( ) . get( & valid_announcement. contents. short_channel_id) . is_none( ) ) ;
612+
613+ future. resolve_without_forwarding ( & network_graph,
614+ Ok ( TxOut { value : 1_000_000 , script_pubkey : bitcoin:: Script :: new ( ) } ) ) ;
615+ assert ! ( network_graph. read_only( ) . channels( ) . get( & valid_announcement. contents. short_channel_id) . is_none( ) ) ;
616+ }
617+
618+ #[ test]
619+ fn test_failing_async_lookup ( ) {
620+ // Test an async lookup which returns an incorrect script
621+ let ( valid_announcement, chain_source, network_graph, ..) = get_test_objects ( ) ;
622+
623+ let future = AccessFuture :: new ( ) ;
624+ * chain_source. utxo_ret . lock ( ) . unwrap ( ) = ChainAccessResult :: Async ( future. clone ( ) ) ;
625+
626+ assert_eq ! (
627+ network_graph. update_channel_from_announcement( & valid_announcement, & Some ( & chain_source) ) . unwrap_err( ) . err,
628+ "Channel being checked async" ) ;
629+ assert ! ( network_graph. read_only( ) . channels( ) . get( & valid_announcement. contents. short_channel_id) . is_none( ) ) ;
630+
631+ future. resolve_without_forwarding ( & network_graph, Err ( ChainAccessError :: UnknownTx ) ) ;
632+ assert ! ( network_graph. read_only( ) . channels( ) . get( & valid_announcement. contents. short_channel_id) . is_none( ) ) ;
633+ }
634+
635+ #[ test]
636+ fn test_updates_async_lookup ( ) {
637+ // Test async lookups will process pending channel_update/node_announcements once they
638+ // complete.
639+ let ( valid_announcement, chain_source, network_graph, good_script, node_a_announce,
640+ node_b_announce, chan_update_a, chan_update_b, ..) = get_test_objects ( ) ;
641+
642+ let future = AccessFuture :: new ( ) ;
643+ * chain_source. utxo_ret . lock ( ) . unwrap ( ) = ChainAccessResult :: Async ( future. clone ( ) ) ;
644+
645+ assert_eq ! (
646+ network_graph. update_channel_from_announcement( & valid_announcement, & Some ( & chain_source) ) . unwrap_err( ) . err,
647+ "Channel being checked async" ) ;
648+ assert ! ( network_graph. read_only( ) . channels( ) . get( & valid_announcement. contents. short_channel_id) . is_none( ) ) ;
649+
650+ assert_eq ! (
651+ network_graph. update_node_from_announcement( & node_a_announce) . unwrap_err( ) . err,
652+ "Awaiting channel_announcement validation to accept node_announcement" ) ;
653+ assert_eq ! (
654+ network_graph. update_node_from_announcement( & node_b_announce) . unwrap_err( ) . err,
655+ "Awaiting channel_announcement validation to accept node_announcement" ) ;
656+
657+ assert_eq ! ( network_graph. update_channel( & chan_update_a) . unwrap_err( ) . err,
658+ "Awaiting channel_announcement validation to accept channel_update" ) ;
659+ assert_eq ! ( network_graph. update_channel( & chan_update_b) . unwrap_err( ) . err,
660+ "Awaiting channel_announcement validation to accept channel_update" ) ;
661+
662+ future. resolve_without_forwarding ( & network_graph,
663+ Ok ( TxOut { value : 1_000_000 , script_pubkey : good_script } ) ) ;
664+
665+ assert ! ( network_graph. read_only( ) . channels( )
666+ . get( & valid_announcement. contents. short_channel_id) . unwrap( ) . one_to_two. is_some( ) ) ;
667+ assert ! ( network_graph. read_only( ) . channels( )
668+ . get( & valid_announcement. contents. short_channel_id) . unwrap( ) . two_to_one. is_some( ) ) ;
669+
670+ assert ! ( network_graph. read_only( ) . nodes( )
671+ . get( & NodeId :: from_pubkey( & valid_announcement. contents. node_id_1) ) . unwrap( )
672+ . announcement_info. is_some( ) ) ;
673+ assert ! ( network_graph. read_only( ) . nodes( )
674+ . get( & NodeId :: from_pubkey( & valid_announcement. contents. node_id_2) ) . unwrap( )
675+ . announcement_info. is_some( ) ) ;
676+ }
677+
678+ #[ test]
679+ fn test_latest_update_async_lookup ( ) {
680+ // Test async lookups will process the latest channel_update if two are received while
681+ // awaiting an async UTXO lookup.
682+ let ( valid_announcement, chain_source, network_graph, good_script, _,
683+ _, chan_update_a, chan_update_b, chan_update_c, ..) = get_test_objects ( ) ;
684+
685+ let future = AccessFuture :: new ( ) ;
686+ * chain_source. utxo_ret . lock ( ) . unwrap ( ) = ChainAccessResult :: Async ( future. clone ( ) ) ;
687+
688+ assert_eq ! (
689+ network_graph. update_channel_from_announcement( & valid_announcement, & Some ( & chain_source) ) . unwrap_err( ) . err,
690+ "Channel being checked async" ) ;
691+ assert ! ( network_graph. read_only( ) . channels( ) . get( & valid_announcement. contents. short_channel_id) . is_none( ) ) ;
692+
693+ assert_eq ! ( network_graph. update_channel( & chan_update_a) . unwrap_err( ) . err,
694+ "Awaiting channel_announcement validation to accept channel_update" ) ;
695+ assert_eq ! ( network_graph. update_channel( & chan_update_b) . unwrap_err( ) . err,
696+ "Awaiting channel_announcement validation to accept channel_update" ) ;
697+ assert_eq ! ( network_graph. update_channel( & chan_update_c) . unwrap_err( ) . err,
698+ "Awaiting channel_announcement validation to accept channel_update" ) ;
699+
700+ future. resolve_without_forwarding ( & network_graph,
701+ Ok ( TxOut { value : 1_000_000 , script_pubkey : good_script } ) ) ;
702+
703+ assert_eq ! ( chan_update_a. contents. timestamp, chan_update_b. contents. timestamp) ;
704+ assert ! ( network_graph. read_only( ) . channels( )
705+ . get( & valid_announcement. contents. short_channel_id) . as_ref( ) . unwrap( )
706+ . one_to_two. as_ref( ) . unwrap( ) . last_update !=
707+ network_graph. read_only( ) . channels( )
708+ . get( & valid_announcement. contents. short_channel_id) . as_ref( ) . unwrap( )
709+ . two_to_one. as_ref( ) . unwrap( ) . last_update) ;
710+ }
711+
712+ #[ test]
713+ fn test_no_double_lookups ( ) {
714+ // Test that a pending async lookup will prevent a second async lookup from flying, but
715+ // only if the channel_announcement message is identical.
716+ let ( valid_announcement, chain_source, network_graph, good_script, ..) = get_test_objects ( ) ;
717+
718+ let future = AccessFuture :: new ( ) ;
719+ * chain_source. utxo_ret . lock ( ) . unwrap ( ) = ChainAccessResult :: Async ( future. clone ( ) ) ;
720+
721+ assert_eq ! (
722+ network_graph. update_channel_from_announcement( & valid_announcement, & Some ( & chain_source) ) . unwrap_err( ) . err,
723+ "Channel being checked async" ) ;
724+ assert_eq ! ( chain_source. get_utxo_call_count. load( Ordering :: Relaxed ) , 1 ) ;
725+
726+ // If we make a second request with the same message, the call count doesn't increase...
727+ let future_b = AccessFuture :: new ( ) ;
728+ * chain_source. utxo_ret . lock ( ) . unwrap ( ) = ChainAccessResult :: Async ( future_b. clone ( ) ) ;
729+ assert_eq ! (
730+ network_graph. update_channel_from_announcement( & valid_announcement, & Some ( & chain_source) ) . unwrap_err( ) . err,
731+ "Channel announcement is already being checked" ) ;
732+ assert_eq ! ( chain_source. get_utxo_call_count. load( Ordering :: Relaxed ) , 1 ) ;
733+
734+ // But if we make a third request with a tweaked message, we should get a second call
735+ // against our new future...
736+ let secp_ctx = Secp256k1 :: new ( ) ;
737+ let replacement_pk_1 = & SecretKey :: from_slice ( & [ 99 ; 32 ] ) . unwrap ( ) ;
738+ let replacement_pk_2 = & SecretKey :: from_slice ( & [ 98 ; 32 ] ) . unwrap ( ) ;
739+ let invalid_announcement = get_signed_channel_announcement ( |_| { } , replacement_pk_1, replacement_pk_2, & secp_ctx) ;
740+ assert_eq ! (
741+ network_graph. update_channel_from_announcement( & invalid_announcement, & Some ( & chain_source) ) . unwrap_err( ) . err,
742+ "Channel being checked async" ) ;
743+ assert_eq ! ( chain_source. get_utxo_call_count. load( Ordering :: Relaxed ) , 2 ) ;
744+
745+ // Still, if we resolve the original future, the original channel will be accepted.
746+ future. resolve_without_forwarding ( & network_graph,
747+ Ok ( TxOut { value : 1_000_000 , script_pubkey : good_script } ) ) ;
748+ assert ! ( !network_graph. read_only( ) . channels( )
749+ . get( & valid_announcement. contents. short_channel_id) . unwrap( )
750+ . announcement_message. as_ref( ) . unwrap( )
751+ . contents. features. supports_unknown_test_feature( ) ) ;
752+ }
753+
754+ #[ test]
755+ fn test_checks_backpressure ( ) {
756+ // Test that too_many_checks_pending returns true when there are many checks pending, and
757+ // returns false once they complete.
758+ let secp_ctx = Secp256k1 :: new ( ) ;
759+ let ( chain_source, network_graph) = get_network ( ) ;
760+
761+ // We cheat and use a single future for all the lookups to complete them all at once.
762+ let future = AccessFuture :: new ( ) ;
763+ * chain_source. utxo_ret . lock ( ) . unwrap ( ) = ChainAccessResult :: Async ( future. clone ( ) ) ;
764+
765+ let node_1_privkey = & SecretKey :: from_slice ( & [ 42 ; 32 ] ) . unwrap ( ) ;
766+ let node_2_privkey = & SecretKey :: from_slice ( & [ 41 ; 32 ] ) . unwrap ( ) ;
767+
768+ for i in 0 ..PendingChecks :: MAX_PENDING_LOOKUPS {
769+ let valid_announcement = get_signed_channel_announcement (
770+ |msg| msg. short_channel_id += 1 + i as u64 , node_1_privkey, node_2_privkey, & secp_ctx) ;
771+ network_graph. update_channel_from_announcement ( & valid_announcement, & Some ( & chain_source) ) . unwrap_err ( ) ;
772+ assert ! ( !network_graph. pending_checks. too_many_checks_pending( ) ) ;
773+ }
774+
775+ let valid_announcement = get_signed_channel_announcement (
776+ |_| { } , node_1_privkey, node_2_privkey, & secp_ctx) ;
777+ network_graph. update_channel_from_announcement ( & valid_announcement, & Some ( & chain_source) ) . unwrap_err ( ) ;
778+ assert ! ( network_graph. pending_checks. too_many_checks_pending( ) ) ;
779+
780+ // Once the future completes the "too many checks" flag should reset.
781+ future. resolve_without_forwarding ( & network_graph, Err ( ChainAccessError :: UnknownTx ) ) ;
782+ assert ! ( !network_graph. pending_checks. too_many_checks_pending( ) ) ;
783+ }
784+
785+ #[ test]
786+ fn test_checks_backpressure_drop ( ) {
787+ // Test that too_many_checks_pending returns true when there are many checks pending, and
788+ // returns false if we drop some of the the futures without completion.
789+ let secp_ctx = Secp256k1 :: new ( ) ;
790+ let ( chain_source, network_graph) = get_network ( ) ;
791+
792+ // We cheat and use a single future for all the lookups to complete them all at once.
793+ * chain_source. utxo_ret . lock ( ) . unwrap ( ) = ChainAccessResult :: Async ( AccessFuture :: new ( ) ) ;
794+
795+ let node_1_privkey = & SecretKey :: from_slice ( & [ 42 ; 32 ] ) . unwrap ( ) ;
796+ let node_2_privkey = & SecretKey :: from_slice ( & [ 41 ; 32 ] ) . unwrap ( ) ;
797+
798+ for i in 0 ..PendingChecks :: MAX_PENDING_LOOKUPS {
799+ let valid_announcement = get_signed_channel_announcement (
800+ |msg| msg. short_channel_id += 1 + i as u64 , node_1_privkey, node_2_privkey, & secp_ctx) ;
801+ network_graph. update_channel_from_announcement ( & valid_announcement, & Some ( & chain_source) ) . unwrap_err ( ) ;
802+ assert ! ( !network_graph. pending_checks. too_many_checks_pending( ) ) ;
803+ }
804+
805+ let valid_announcement = get_signed_channel_announcement (
806+ |_| { } , node_1_privkey, node_2_privkey, & secp_ctx) ;
807+ network_graph. update_channel_from_announcement ( & valid_announcement, & Some ( & chain_source) ) . unwrap_err ( ) ;
808+ assert ! ( network_graph. pending_checks. too_many_checks_pending( ) ) ;
809+
810+ // Once the future is drop'd (by resetting the `utxo_ret` value) the "too many checks" flag
811+ // should reset to false.
812+ * chain_source. utxo_ret . lock ( ) . unwrap ( ) = ChainAccessResult :: Sync ( Err ( ChainAccessError :: UnknownTx ) ) ;
813+ assert ! ( !network_graph. pending_checks. too_many_checks_pending( ) ) ;
814+ }
815+ }
0 commit comments