@@ -524,3 +524,313 @@ impl PendingChecks {
524524 }
525525 }
526526}
527+
528+ #[ cfg( test) ]
529+ mod tests {
530+ use super :: * ;
531+ use crate :: routing:: gossip:: tests:: * ;
532+ use crate :: util:: test_utils:: { TestChainSource , TestLogger } ;
533+ use crate :: ln:: msgs;
534+
535+ use bitcoin:: blockdata:: constants:: genesis_block;
536+ use bitcoin:: secp256k1:: { Secp256k1 , SecretKey } ;
537+
538+ use core:: sync:: atomic:: Ordering ;
539+
540+ fn get_network ( ) -> ( TestChainSource , NetworkGraph < Box < TestLogger > > ) {
541+ let logger = Box :: new ( TestLogger :: new ( ) ) ;
542+ let genesis_hash = genesis_block ( bitcoin:: Network :: Testnet ) . header . block_hash ( ) ;
543+ let chain_source = TestChainSource :: new ( bitcoin:: Network :: Testnet ) ;
544+ let network_graph = NetworkGraph :: new ( genesis_hash, logger) ;
545+
546+ ( chain_source, network_graph)
547+ }
548+
549+ fn get_test_objects ( ) -> ( msgs:: ChannelAnnouncement , TestChainSource ,
550+ NetworkGraph < Box < TestLogger > > , bitcoin:: Script , msgs:: NodeAnnouncement ,
551+ msgs:: NodeAnnouncement , msgs:: ChannelUpdate , msgs:: ChannelUpdate , msgs:: ChannelUpdate )
552+ {
553+ let secp_ctx = Secp256k1 :: new ( ) ;
554+
555+ let ( chain_source, network_graph) = get_network ( ) ;
556+
557+ let good_script = get_channel_script ( & secp_ctx) ;
558+ let node_1_privkey = & SecretKey :: from_slice ( & [ 42 ; 32 ] ) . unwrap ( ) ;
559+ let node_2_privkey = & SecretKey :: from_slice ( & [ 41 ; 32 ] ) . unwrap ( ) ;
560+ let valid_announcement = get_signed_channel_announcement ( |_| { } , node_1_privkey, node_2_privkey, & secp_ctx) ;
561+
562+ let node_a_announce = get_signed_node_announcement ( |_| { } , node_1_privkey, & secp_ctx) ;
563+ let node_b_announce = get_signed_node_announcement ( |_| { } , node_2_privkey, & secp_ctx) ;
564+
565+ // Note that we have to set the "direction" flag correctly on both messages
566+ let chan_update_a = get_signed_channel_update ( |msg| msg. flags = 0 , node_1_privkey, & secp_ctx) ;
567+ let chan_update_b = get_signed_channel_update ( |msg| msg. flags = 1 , node_2_privkey, & secp_ctx) ;
568+ let chan_update_c = get_signed_channel_update ( |msg| {
569+ msg. flags = 1 ; msg. timestamp += 1 ; } , node_2_privkey, & secp_ctx) ;
570+
571+ ( valid_announcement, chain_source, network_graph, good_script, node_a_announce,
572+ node_b_announce, chan_update_a, chan_update_b, chan_update_c)
573+ }
574+
575+ #[ test]
576+ fn test_fast_async_lookup ( ) {
577+ // Check that async lookups which resolve quicker than the future is returned to the
578+ // `get_utxo` call can read it still resolve properly.
579+ let ( valid_announcement, chain_source, network_graph, good_script, ..) = get_test_objects ( ) ;
580+
581+ let future = AccessFuture :: new ( ) ;
582+ future. resolve_without_forwarding ( & network_graph,
583+ Ok ( TxOut { value : 1_000_000 , script_pubkey : good_script } ) ) ;
584+ * chain_source. utxo_ret . lock ( ) . unwrap ( ) = ChainAccessResult :: Async ( future. clone ( ) ) ;
585+
586+ network_graph. update_channel_from_announcement ( & valid_announcement, & Some ( & chain_source) ) . unwrap ( ) ;
587+ assert ! ( network_graph. read_only( ) . channels( ) . get( & valid_announcement. contents. short_channel_id) . is_some( ) ) ;
588+ }
589+
590+ #[ test]
591+ fn test_async_lookup ( ) {
592+ // Test a simple async lookup
593+ let ( valid_announcement, chain_source, network_graph, good_script,
594+ node_a_announce, node_b_announce, ..) = get_test_objects ( ) ;
595+
596+ let future = AccessFuture :: new ( ) ;
597+ * chain_source. utxo_ret . lock ( ) . unwrap ( ) = ChainAccessResult :: Async ( future. clone ( ) ) ;
598+
599+ assert_eq ! (
600+ network_graph. update_channel_from_announcement( & valid_announcement, & Some ( & chain_source) ) . unwrap_err( ) . err,
601+ "Channel being checked async" ) ;
602+ assert ! ( network_graph. read_only( ) . channels( ) . get( & valid_announcement. contents. short_channel_id) . is_none( ) ) ;
603+
604+ future. resolve_without_forwarding ( & network_graph,
605+ Ok ( TxOut { value : 0 , script_pubkey : good_script } ) ) ;
606+ network_graph. read_only ( ) . channels ( ) . get ( & valid_announcement. contents . short_channel_id ) . unwrap ( ) ;
607+ network_graph. read_only ( ) . channels ( ) . get ( & valid_announcement. contents . short_channel_id ) . unwrap ( ) ;
608+
609+ assert ! ( network_graph. read_only( ) . nodes( )
610+ . get( & NodeId :: from_pubkey( & valid_announcement. contents. node_id_1) ) . unwrap( )
611+ . announcement_info. is_none( ) ) ;
612+
613+ network_graph. update_node_from_announcement ( & node_a_announce) . unwrap ( ) ;
614+ network_graph. update_node_from_announcement ( & node_b_announce) . unwrap ( ) ;
615+
616+ assert ! ( network_graph. read_only( ) . nodes( )
617+ . get( & NodeId :: from_pubkey( & valid_announcement. contents. node_id_1) ) . unwrap( )
618+ . announcement_info. is_some( ) ) ;
619+ }
620+
621+ #[ test]
622+ fn test_invalid_async_lookup ( ) {
623+ // Test an async lookup which returns an incorrect script
624+ let ( valid_announcement, chain_source, network_graph, ..) = get_test_objects ( ) ;
625+
626+ let future = AccessFuture :: new ( ) ;
627+ * chain_source. utxo_ret . lock ( ) . unwrap ( ) = ChainAccessResult :: Async ( future. clone ( ) ) ;
628+
629+ assert_eq ! (
630+ network_graph. update_channel_from_announcement( & valid_announcement, & Some ( & chain_source) ) . unwrap_err( ) . err,
631+ "Channel being checked async" ) ;
632+ assert ! ( network_graph. read_only( ) . channels( ) . get( & valid_announcement. contents. short_channel_id) . is_none( ) ) ;
633+
634+ future. resolve_without_forwarding ( & network_graph,
635+ Ok ( TxOut { value : 1_000_000 , script_pubkey : bitcoin:: Script :: new ( ) } ) ) ;
636+ assert ! ( network_graph. read_only( ) . channels( ) . get( & valid_announcement. contents. short_channel_id) . is_none( ) ) ;
637+ }
638+
639+ #[ test]
640+ fn test_failing_async_lookup ( ) {
641+ // Test an async lookup which returns an incorrect script
642+ let ( valid_announcement, chain_source, network_graph, ..) = get_test_objects ( ) ;
643+
644+ let future = AccessFuture :: new ( ) ;
645+ * chain_source. utxo_ret . lock ( ) . unwrap ( ) = ChainAccessResult :: Async ( future. clone ( ) ) ;
646+
647+ assert_eq ! (
648+ network_graph. update_channel_from_announcement( & valid_announcement, & Some ( & chain_source) ) . unwrap_err( ) . err,
649+ "Channel being checked async" ) ;
650+ assert ! ( network_graph. read_only( ) . channels( ) . get( & valid_announcement. contents. short_channel_id) . is_none( ) ) ;
651+
652+ future. resolve_without_forwarding ( & network_graph, Err ( ChainAccessError :: UnknownTx ) ) ;
653+ assert ! ( network_graph. read_only( ) . channels( ) . get( & valid_announcement. contents. short_channel_id) . is_none( ) ) ;
654+ }
655+
656+ #[ test]
657+ fn test_updates_async_lookup ( ) {
658+ // Test async lookups will process pending channel_update/node_announcements once they
659+ // complete.
660+ let ( valid_announcement, chain_source, network_graph, good_script, node_a_announce,
661+ node_b_announce, chan_update_a, chan_update_b, ..) = get_test_objects ( ) ;
662+
663+ let future = AccessFuture :: new ( ) ;
664+ * chain_source. utxo_ret . lock ( ) . unwrap ( ) = ChainAccessResult :: Async ( future. clone ( ) ) ;
665+
666+ assert_eq ! (
667+ network_graph. update_channel_from_announcement( & valid_announcement, & Some ( & chain_source) ) . unwrap_err( ) . err,
668+ "Channel being checked async" ) ;
669+ assert ! ( network_graph. read_only( ) . channels( ) . get( & valid_announcement. contents. short_channel_id) . is_none( ) ) ;
670+
671+ assert_eq ! (
672+ network_graph. update_node_from_announcement( & node_a_announce) . unwrap_err( ) . err,
673+ "Awaiting channel_announcement validation to accept node_announcement" ) ;
674+ assert_eq ! (
675+ network_graph. update_node_from_announcement( & node_b_announce) . unwrap_err( ) . err,
676+ "Awaiting channel_announcement validation to accept node_announcement" ) ;
677+
678+ assert_eq ! ( network_graph. update_channel( & chan_update_a) . unwrap_err( ) . err,
679+ "Awaiting channel_announcement validation to accept channel_update" ) ;
680+ assert_eq ! ( network_graph. update_channel( & chan_update_b) . unwrap_err( ) . err,
681+ "Awaiting channel_announcement validation to accept channel_update" ) ;
682+
683+ future. resolve_without_forwarding ( & network_graph,
684+ Ok ( TxOut { value : 1_000_000 , script_pubkey : good_script } ) ) ;
685+
686+ assert ! ( network_graph. read_only( ) . channels( )
687+ . get( & valid_announcement. contents. short_channel_id) . unwrap( ) . one_to_two. is_some( ) ) ;
688+ assert ! ( network_graph. read_only( ) . channels( )
689+ . get( & valid_announcement. contents. short_channel_id) . unwrap( ) . two_to_one. is_some( ) ) ;
690+
691+ assert ! ( network_graph. read_only( ) . nodes( )
692+ . get( & NodeId :: from_pubkey( & valid_announcement. contents. node_id_1) ) . unwrap( )
693+ . announcement_info. is_some( ) ) ;
694+ assert ! ( network_graph. read_only( ) . nodes( )
695+ . get( & NodeId :: from_pubkey( & valid_announcement. contents. node_id_2) ) . unwrap( )
696+ . announcement_info. is_some( ) ) ;
697+ }
698+
699+ #[ test]
700+ fn test_latest_update_async_lookup ( ) {
701+ // Test async lookups will process the latest channel_update if two are received while
702+ // awaiting an async UTXO lookup.
703+ let ( valid_announcement, chain_source, network_graph, good_script, _,
704+ _, chan_update_a, chan_update_b, chan_update_c, ..) = get_test_objects ( ) ;
705+
706+ let future = AccessFuture :: new ( ) ;
707+ * chain_source. utxo_ret . lock ( ) . unwrap ( ) = ChainAccessResult :: Async ( future. clone ( ) ) ;
708+
709+ assert_eq ! (
710+ network_graph. update_channel_from_announcement( & valid_announcement, & Some ( & chain_source) ) . unwrap_err( ) . err,
711+ "Channel being checked async" ) ;
712+ assert ! ( network_graph. read_only( ) . channels( ) . get( & valid_announcement. contents. short_channel_id) . is_none( ) ) ;
713+
714+ assert_eq ! ( network_graph. update_channel( & chan_update_a) . unwrap_err( ) . err,
715+ "Awaiting channel_announcement validation to accept channel_update" ) ;
716+ assert_eq ! ( network_graph. update_channel( & chan_update_b) . unwrap_err( ) . err,
717+ "Awaiting channel_announcement validation to accept channel_update" ) ;
718+ assert_eq ! ( network_graph. update_channel( & chan_update_c) . unwrap_err( ) . err,
719+ "Awaiting channel_announcement validation to accept channel_update" ) ;
720+
721+ future. resolve_without_forwarding ( & network_graph,
722+ Ok ( TxOut { value : 1_000_000 , script_pubkey : good_script } ) ) ;
723+
724+ assert_eq ! ( chan_update_a. contents. timestamp, chan_update_b. contents. timestamp) ;
725+ assert ! ( network_graph. read_only( ) . channels( )
726+ . get( & valid_announcement. contents. short_channel_id) . as_ref( ) . unwrap( )
727+ . one_to_two. as_ref( ) . unwrap( ) . last_update !=
728+ network_graph. read_only( ) . channels( )
729+ . get( & valid_announcement. contents. short_channel_id) . as_ref( ) . unwrap( )
730+ . two_to_one. as_ref( ) . unwrap( ) . last_update) ;
731+ }
732+
733+ #[ test]
734+ fn test_no_double_lookups ( ) {
735+ // Test that a pending async lookup will prevent a second async lookup from flying, but
736+ // only if the channel_announcement message is identical.
737+ let ( valid_announcement, chain_source, network_graph, good_script, ..) = get_test_objects ( ) ;
738+
739+ let future = AccessFuture :: new ( ) ;
740+ * chain_source. utxo_ret . lock ( ) . unwrap ( ) = ChainAccessResult :: Async ( future. clone ( ) ) ;
741+
742+ assert_eq ! (
743+ network_graph. update_channel_from_announcement( & valid_announcement, & Some ( & chain_source) ) . unwrap_err( ) . err,
744+ "Channel being checked async" ) ;
745+ assert_eq ! ( chain_source. get_utxo_call_count. load( Ordering :: Relaxed ) , 1 ) ;
746+
747+ // If we make a second request with the same message, the call count doesn't increase...
748+ let future_b = AccessFuture :: new ( ) ;
749+ * chain_source. utxo_ret . lock ( ) . unwrap ( ) = ChainAccessResult :: Async ( future_b. clone ( ) ) ;
750+ assert_eq ! (
751+ network_graph. update_channel_from_announcement( & valid_announcement, & Some ( & chain_source) ) . unwrap_err( ) . err,
752+ "Channel announcement is already being checked" ) ;
753+ assert_eq ! ( chain_source. get_utxo_call_count. load( Ordering :: Relaxed ) , 1 ) ;
754+
755+ // But if we make a third request with a tweaked message, we should get a second call
756+ // against our new future...
757+ let secp_ctx = Secp256k1 :: new ( ) ;
758+ let replacement_pk_1 = & SecretKey :: from_slice ( & [ 99 ; 32 ] ) . unwrap ( ) ;
759+ let replacement_pk_2 = & SecretKey :: from_slice ( & [ 98 ; 32 ] ) . unwrap ( ) ;
760+ let invalid_announcement = get_signed_channel_announcement ( |_| { } , replacement_pk_1, replacement_pk_2, & secp_ctx) ;
761+ assert_eq ! (
762+ network_graph. update_channel_from_announcement( & invalid_announcement, & Some ( & chain_source) ) . unwrap_err( ) . err,
763+ "Channel being checked async" ) ;
764+ assert_eq ! ( chain_source. get_utxo_call_count. load( Ordering :: Relaxed ) , 2 ) ;
765+
766+ // Still, if we resolve the original future, the original channel will be accepted.
767+ future. resolve_without_forwarding ( & network_graph,
768+ Ok ( TxOut { value : 1_000_000 , script_pubkey : good_script } ) ) ;
769+ assert ! ( !network_graph. read_only( ) . channels( )
770+ . get( & valid_announcement. contents. short_channel_id) . unwrap( )
771+ . announcement_message. as_ref( ) . unwrap( )
772+ . contents. features. supports_unknown_test_feature( ) ) ;
773+ }
774+
775+ #[ test]
776+ fn test_checks_backpressure ( ) {
777+ // Test that too_many_checks_pending returns true when there are many checks pending, and
778+ // returns false once they complete.
779+ let secp_ctx = Secp256k1 :: new ( ) ;
780+ let ( chain_source, network_graph) = get_network ( ) ;
781+
782+ // We cheat and use a single future for all the lookups to complete them all at once.
783+ let future = AccessFuture :: new ( ) ;
784+ * chain_source. utxo_ret . lock ( ) . unwrap ( ) = ChainAccessResult :: Async ( future. clone ( ) ) ;
785+
786+ let node_1_privkey = & SecretKey :: from_slice ( & [ 42 ; 32 ] ) . unwrap ( ) ;
787+ let node_2_privkey = & SecretKey :: from_slice ( & [ 41 ; 32 ] ) . unwrap ( ) ;
788+
789+ for i in 0 ..PendingChecks :: MAX_PENDING_LOOKUPS {
790+ let valid_announcement = get_signed_channel_announcement (
791+ |msg| msg. short_channel_id += 1 + i as u64 , node_1_privkey, node_2_privkey, & secp_ctx) ;
792+ network_graph. update_channel_from_announcement ( & valid_announcement, & Some ( & chain_source) ) . unwrap_err ( ) ;
793+ assert ! ( !network_graph. pending_checks. too_many_checks_pending( ) ) ;
794+ }
795+
796+ let valid_announcement = get_signed_channel_announcement (
797+ |_| { } , node_1_privkey, node_2_privkey, & secp_ctx) ;
798+ network_graph. update_channel_from_announcement ( & valid_announcement, & Some ( & chain_source) ) . unwrap_err ( ) ;
799+ assert ! ( network_graph. pending_checks. too_many_checks_pending( ) ) ;
800+
801+ // Once the future completes the "too many checks" flag should reset.
802+ future. resolve_without_forwarding ( & network_graph, Err ( ChainAccessError :: UnknownTx ) ) ;
803+ assert ! ( !network_graph. pending_checks. too_many_checks_pending( ) ) ;
804+ }
805+
806+ #[ test]
807+ fn test_checks_backpressure_drop ( ) {
808+ // Test that too_many_checks_pending returns true when there are many checks pending, and
809+ // returns false if we drop some of the the futures without completion.
810+ let secp_ctx = Secp256k1 :: new ( ) ;
811+ let ( chain_source, network_graph) = get_network ( ) ;
812+
813+ // We cheat and use a single future for all the lookups to complete them all at once.
814+ * chain_source. utxo_ret . lock ( ) . unwrap ( ) = ChainAccessResult :: Async ( AccessFuture :: new ( ) ) ;
815+
816+ let node_1_privkey = & SecretKey :: from_slice ( & [ 42 ; 32 ] ) . unwrap ( ) ;
817+ let node_2_privkey = & SecretKey :: from_slice ( & [ 41 ; 32 ] ) . unwrap ( ) ;
818+
819+ for i in 0 ..PendingChecks :: MAX_PENDING_LOOKUPS {
820+ let valid_announcement = get_signed_channel_announcement (
821+ |msg| msg. short_channel_id += 1 + i as u64 , node_1_privkey, node_2_privkey, & secp_ctx) ;
822+ network_graph. update_channel_from_announcement ( & valid_announcement, & Some ( & chain_source) ) . unwrap_err ( ) ;
823+ assert ! ( !network_graph. pending_checks. too_many_checks_pending( ) ) ;
824+ }
825+
826+ let valid_announcement = get_signed_channel_announcement (
827+ |_| { } , node_1_privkey, node_2_privkey, & secp_ctx) ;
828+ network_graph. update_channel_from_announcement ( & valid_announcement, & Some ( & chain_source) ) . unwrap_err ( ) ;
829+ assert ! ( network_graph. pending_checks. too_many_checks_pending( ) ) ;
830+
831+ // Once the future is drop'd (by resetting the `utxo_ret` value) the "too many checks" flag
832+ // should reset to false.
833+ * chain_source. utxo_ret . lock ( ) . unwrap ( ) = ChainAccessResult :: Sync ( Err ( ChainAccessError :: UnknownTx ) ) ;
834+ assert ! ( !network_graph. pending_checks. too_many_checks_pending( ) ) ;
835+ }
836+ }
0 commit comments