@@ -525,3 +525,313 @@ impl PendingChecks {
525525 }
526526 }
527527}
528+
529+ #[ cfg( test) ]
530+ mod tests {
531+ use super :: * ;
532+ use crate :: routing:: gossip:: tests:: * ;
533+ use crate :: util:: test_utils:: { TestChainSource , TestLogger } ;
534+ use crate :: ln:: msgs;
535+
536+ use bitcoin:: blockdata:: constants:: genesis_block;
537+ use bitcoin:: secp256k1:: { Secp256k1 , SecretKey } ;
538+
539+ use core:: sync:: atomic:: Ordering ;
540+
541+ fn get_network ( ) -> ( TestChainSource , NetworkGraph < Box < TestLogger > > ) {
542+ let logger = Box :: new ( TestLogger :: new ( ) ) ;
543+ let genesis_hash = genesis_block ( bitcoin:: Network :: Testnet ) . header . block_hash ( ) ;
544+ let chain_source = TestChainSource :: new ( bitcoin:: Network :: Testnet ) ;
545+ let network_graph = NetworkGraph :: new ( genesis_hash, logger) ;
546+
547+ ( chain_source, network_graph)
548+ }
549+
550+ fn get_test_objects ( ) -> ( msgs:: ChannelAnnouncement , TestChainSource ,
551+ NetworkGraph < Box < TestLogger > > , bitcoin:: Script , msgs:: NodeAnnouncement ,
552+ msgs:: NodeAnnouncement , msgs:: ChannelUpdate , msgs:: ChannelUpdate , msgs:: ChannelUpdate )
553+ {
554+ let secp_ctx = Secp256k1 :: new ( ) ;
555+
556+ let ( chain_source, network_graph) = get_network ( ) ;
557+
558+ let good_script = get_channel_script ( & secp_ctx) ;
559+ let node_1_privkey = & SecretKey :: from_slice ( & [ 42 ; 32 ] ) . unwrap ( ) ;
560+ let node_2_privkey = & SecretKey :: from_slice ( & [ 41 ; 32 ] ) . unwrap ( ) ;
561+ let valid_announcement = get_signed_channel_announcement ( |_| { } , node_1_privkey, node_2_privkey, & secp_ctx) ;
562+
563+ let node_a_announce = get_signed_node_announcement ( |_| { } , node_1_privkey, & secp_ctx) ;
564+ let node_b_announce = get_signed_node_announcement ( |_| { } , node_2_privkey, & secp_ctx) ;
565+
566+ // Note that we have to set the "direction" flag correctly on both messages
567+ let chan_update_a = get_signed_channel_update ( |msg| msg. flags = 0 , node_1_privkey, & secp_ctx) ;
568+ let chan_update_b = get_signed_channel_update ( |msg| msg. flags = 1 , node_2_privkey, & secp_ctx) ;
569+ let chan_update_c = get_signed_channel_update ( |msg| {
570+ msg. flags = 1 ; msg. timestamp += 1 ; } , node_2_privkey, & secp_ctx) ;
571+
572+ ( valid_announcement, chain_source, network_graph, good_script, node_a_announce,
573+ node_b_announce, chan_update_a, chan_update_b, chan_update_c)
574+ }
575+
576+ #[ test]
577+ fn test_fast_async_lookup ( ) {
578+ // Check that async lookups which resolve quicker than the future is returned to the
579+ // `get_utxo` call can read it still resolve properly.
580+ let ( valid_announcement, chain_source, network_graph, good_script, ..) = get_test_objects ( ) ;
581+
582+ let future = AccessFuture :: new ( ) ;
583+ future. resolve_without_forwarding ( & network_graph,
584+ Ok ( TxOut { value : 1_000_000 , script_pubkey : good_script } ) ) ;
585+ * chain_source. utxo_ret . lock ( ) . unwrap ( ) = ChainAccessResult :: Async ( future. clone ( ) ) ;
586+
587+ network_graph. update_channel_from_announcement ( & valid_announcement, & Some ( & chain_source) ) . unwrap ( ) ;
588+ assert ! ( network_graph. read_only( ) . channels( ) . get( & valid_announcement. contents. short_channel_id) . is_some( ) ) ;
589+ }
590+
591+ #[ test]
592+ fn test_async_lookup ( ) {
593+ // Test a simple async lookup
594+ let ( valid_announcement, chain_source, network_graph, good_script,
595+ node_a_announce, node_b_announce, ..) = get_test_objects ( ) ;
596+
597+ let future = AccessFuture :: new ( ) ;
598+ * chain_source. utxo_ret . lock ( ) . unwrap ( ) = ChainAccessResult :: Async ( future. clone ( ) ) ;
599+
600+ assert_eq ! (
601+ network_graph. update_channel_from_announcement( & valid_announcement, & Some ( & chain_source) ) . unwrap_err( ) . err,
602+ "Channel being checked async" ) ;
603+ assert ! ( network_graph. read_only( ) . channels( ) . get( & valid_announcement. contents. short_channel_id) . is_none( ) ) ;
604+
605+ future. resolve_without_forwarding ( & network_graph,
606+ Ok ( TxOut { value : 0 , script_pubkey : good_script } ) ) ;
607+ network_graph. read_only ( ) . channels ( ) . get ( & valid_announcement. contents . short_channel_id ) . unwrap ( ) ;
608+ network_graph. read_only ( ) . channels ( ) . get ( & valid_announcement. contents . short_channel_id ) . unwrap ( ) ;
609+
610+ assert ! ( network_graph. read_only( ) . nodes( )
611+ . get( & NodeId :: from_pubkey( & valid_announcement. contents. node_id_1) ) . unwrap( )
612+ . announcement_info. is_none( ) ) ;
613+
614+ network_graph. update_node_from_announcement ( & node_a_announce) . unwrap ( ) ;
615+ network_graph. update_node_from_announcement ( & node_b_announce) . unwrap ( ) ;
616+
617+ assert ! ( network_graph. read_only( ) . nodes( )
618+ . get( & NodeId :: from_pubkey( & valid_announcement. contents. node_id_1) ) . unwrap( )
619+ . announcement_info. is_some( ) ) ;
620+ }
621+
622+ #[ test]
623+ fn test_invalid_async_lookup ( ) {
624+ // Test an async lookup which returns an incorrect script
625+ let ( valid_announcement, chain_source, network_graph, ..) = get_test_objects ( ) ;
626+
627+ let future = AccessFuture :: new ( ) ;
628+ * chain_source. utxo_ret . lock ( ) . unwrap ( ) = ChainAccessResult :: Async ( future. clone ( ) ) ;
629+
630+ assert_eq ! (
631+ network_graph. update_channel_from_announcement( & valid_announcement, & Some ( & chain_source) ) . unwrap_err( ) . err,
632+ "Channel being checked async" ) ;
633+ assert ! ( network_graph. read_only( ) . channels( ) . get( & valid_announcement. contents. short_channel_id) . is_none( ) ) ;
634+
635+ future. resolve_without_forwarding ( & network_graph,
636+ Ok ( TxOut { value : 1_000_000 , script_pubkey : bitcoin:: Script :: new ( ) } ) ) ;
637+ assert ! ( network_graph. read_only( ) . channels( ) . get( & valid_announcement. contents. short_channel_id) . is_none( ) ) ;
638+ }
639+
640+ #[ test]
641+ fn test_failing_async_lookup ( ) {
642+ // Test an async lookup which returns an incorrect script
643+ let ( valid_announcement, chain_source, network_graph, ..) = get_test_objects ( ) ;
644+
645+ let future = AccessFuture :: new ( ) ;
646+ * chain_source. utxo_ret . lock ( ) . unwrap ( ) = ChainAccessResult :: Async ( future. clone ( ) ) ;
647+
648+ assert_eq ! (
649+ network_graph. update_channel_from_announcement( & valid_announcement, & Some ( & chain_source) ) . unwrap_err( ) . err,
650+ "Channel being checked async" ) ;
651+ assert ! ( network_graph. read_only( ) . channels( ) . get( & valid_announcement. contents. short_channel_id) . is_none( ) ) ;
652+
653+ future. resolve_without_forwarding ( & network_graph, Err ( ChainAccessError :: UnknownTx ) ) ;
654+ assert ! ( network_graph. read_only( ) . channels( ) . get( & valid_announcement. contents. short_channel_id) . is_none( ) ) ;
655+ }
656+
657+ #[ test]
658+ fn test_updates_async_lookup ( ) {
659+ // Test async lookups will process pending channel_update/node_announcements once they
660+ // complete.
661+ let ( valid_announcement, chain_source, network_graph, good_script, node_a_announce,
662+ node_b_announce, chan_update_a, chan_update_b, ..) = get_test_objects ( ) ;
663+
664+ let future = AccessFuture :: new ( ) ;
665+ * chain_source. utxo_ret . lock ( ) . unwrap ( ) = ChainAccessResult :: Async ( future. clone ( ) ) ;
666+
667+ assert_eq ! (
668+ network_graph. update_channel_from_announcement( & valid_announcement, & Some ( & chain_source) ) . unwrap_err( ) . err,
669+ "Channel being checked async" ) ;
670+ assert ! ( network_graph. read_only( ) . channels( ) . get( & valid_announcement. contents. short_channel_id) . is_none( ) ) ;
671+
672+ assert_eq ! (
673+ network_graph. update_node_from_announcement( & node_a_announce) . unwrap_err( ) . err,
674+ "Awaiting channel_announcement validation to accept node_announcement" ) ;
675+ assert_eq ! (
676+ network_graph. update_node_from_announcement( & node_b_announce) . unwrap_err( ) . err,
677+ "Awaiting channel_announcement validation to accept node_announcement" ) ;
678+
679+ assert_eq ! ( network_graph. update_channel( & chan_update_a) . unwrap_err( ) . err,
680+ "Awaiting channel_announcement validation to accept channel_update" ) ;
681+ assert_eq ! ( network_graph. update_channel( & chan_update_b) . unwrap_err( ) . err,
682+ "Awaiting channel_announcement validation to accept channel_update" ) ;
683+
684+ future. resolve_without_forwarding ( & network_graph,
685+ Ok ( TxOut { value : 1_000_000 , script_pubkey : good_script } ) ) ;
686+
687+ assert ! ( network_graph. read_only( ) . channels( )
688+ . get( & valid_announcement. contents. short_channel_id) . unwrap( ) . one_to_two. is_some( ) ) ;
689+ assert ! ( network_graph. read_only( ) . channels( )
690+ . get( & valid_announcement. contents. short_channel_id) . unwrap( ) . two_to_one. is_some( ) ) ;
691+
692+ assert ! ( network_graph. read_only( ) . nodes( )
693+ . get( & NodeId :: from_pubkey( & valid_announcement. contents. node_id_1) ) . unwrap( )
694+ . announcement_info. is_some( ) ) ;
695+ assert ! ( network_graph. read_only( ) . nodes( )
696+ . get( & NodeId :: from_pubkey( & valid_announcement. contents. node_id_2) ) . unwrap( )
697+ . announcement_info. is_some( ) ) ;
698+ }
699+
700+ #[ test]
701+ fn test_latest_update_async_lookup ( ) {
702+ // Test async lookups will process the latest channel_update if two are received while
703+ // awaiting an async UTXO lookup.
704+ let ( valid_announcement, chain_source, network_graph, good_script, _,
705+ _, chan_update_a, chan_update_b, chan_update_c, ..) = get_test_objects ( ) ;
706+
707+ let future = AccessFuture :: new ( ) ;
708+ * chain_source. utxo_ret . lock ( ) . unwrap ( ) = ChainAccessResult :: Async ( future. clone ( ) ) ;
709+
710+ assert_eq ! (
711+ network_graph. update_channel_from_announcement( & valid_announcement, & Some ( & chain_source) ) . unwrap_err( ) . err,
712+ "Channel being checked async" ) ;
713+ assert ! ( network_graph. read_only( ) . channels( ) . get( & valid_announcement. contents. short_channel_id) . is_none( ) ) ;
714+
715+ assert_eq ! ( network_graph. update_channel( & chan_update_a) . unwrap_err( ) . err,
716+ "Awaiting channel_announcement validation to accept channel_update" ) ;
717+ assert_eq ! ( network_graph. update_channel( & chan_update_b) . unwrap_err( ) . err,
718+ "Awaiting channel_announcement validation to accept channel_update" ) ;
719+ assert_eq ! ( network_graph. update_channel( & chan_update_c) . unwrap_err( ) . err,
720+ "Awaiting channel_announcement validation to accept channel_update" ) ;
721+
722+ future. resolve_without_forwarding ( & network_graph,
723+ Ok ( TxOut { value : 1_000_000 , script_pubkey : good_script } ) ) ;
724+
725+ assert_eq ! ( chan_update_a. contents. timestamp, chan_update_b. contents. timestamp) ;
726+ assert ! ( network_graph. read_only( ) . channels( )
727+ . get( & valid_announcement. contents. short_channel_id) . as_ref( ) . unwrap( )
728+ . one_to_two. as_ref( ) . unwrap( ) . last_update !=
729+ network_graph. read_only( ) . channels( )
730+ . get( & valid_announcement. contents. short_channel_id) . as_ref( ) . unwrap( )
731+ . two_to_one. as_ref( ) . unwrap( ) . last_update) ;
732+ }
733+
734+ #[ test]
735+ fn test_no_double_lookups ( ) {
736+ // Test that a pending async lookup will prevent a second async lookup from flying, but
737+ // only if the channel_announcement message is identical.
738+ let ( valid_announcement, chain_source, network_graph, good_script, ..) = get_test_objects ( ) ;
739+
740+ let future = AccessFuture :: new ( ) ;
741+ * chain_source. utxo_ret . lock ( ) . unwrap ( ) = ChainAccessResult :: Async ( future. clone ( ) ) ;
742+
743+ assert_eq ! (
744+ network_graph. update_channel_from_announcement( & valid_announcement, & Some ( & chain_source) ) . unwrap_err( ) . err,
745+ "Channel being checked async" ) ;
746+ assert_eq ! ( chain_source. get_utxo_call_count. load( Ordering :: Relaxed ) , 1 ) ;
747+
748+ // If we make a second request with the same message, the call count doesn't increase...
749+ let future_b = AccessFuture :: new ( ) ;
750+ * chain_source. utxo_ret . lock ( ) . unwrap ( ) = ChainAccessResult :: Async ( future_b. clone ( ) ) ;
751+ assert_eq ! (
752+ network_graph. update_channel_from_announcement( & valid_announcement, & Some ( & chain_source) ) . unwrap_err( ) . err,
753+ "Channel announcement is already being checked" ) ;
754+ assert_eq ! ( chain_source. get_utxo_call_count. load( Ordering :: Relaxed ) , 1 ) ;
755+
756+ // But if we make a third request with a tweaked message, we should get a second call
757+ // against our new future...
758+ let secp_ctx = Secp256k1 :: new ( ) ;
759+ let replacement_pk_1 = & SecretKey :: from_slice ( & [ 99 ; 32 ] ) . unwrap ( ) ;
760+ let replacement_pk_2 = & SecretKey :: from_slice ( & [ 98 ; 32 ] ) . unwrap ( ) ;
761+ let invalid_announcement = get_signed_channel_announcement ( |_| { } , replacement_pk_1, replacement_pk_2, & secp_ctx) ;
762+ assert_eq ! (
763+ network_graph. update_channel_from_announcement( & invalid_announcement, & Some ( & chain_source) ) . unwrap_err( ) . err,
764+ "Channel being checked async" ) ;
765+ assert_eq ! ( chain_source. get_utxo_call_count. load( Ordering :: Relaxed ) , 2 ) ;
766+
767+ // Still, if we resolve the original future, the original channel will be accepted.
768+ future. resolve_without_forwarding ( & network_graph,
769+ Ok ( TxOut { value : 1_000_000 , script_pubkey : good_script } ) ) ;
770+ assert ! ( !network_graph. read_only( ) . channels( )
771+ . get( & valid_announcement. contents. short_channel_id) . unwrap( )
772+ . announcement_message. as_ref( ) . unwrap( )
773+ . contents. features. supports_unknown_test_feature( ) ) ;
774+ }
775+
776+ #[ test]
777+ fn test_checks_backpressure ( ) {
778+ // Test that too_many_checks_pending returns true when there are many checks pending, and
779+ // returns false once they complete.
780+ let secp_ctx = Secp256k1 :: new ( ) ;
781+ let ( chain_source, network_graph) = get_network ( ) ;
782+
783+ // We cheat and use a single future for all the lookups to complete them all at once.
784+ let future = AccessFuture :: new ( ) ;
785+ * chain_source. utxo_ret . lock ( ) . unwrap ( ) = ChainAccessResult :: Async ( future. clone ( ) ) ;
786+
787+ let node_1_privkey = & SecretKey :: from_slice ( & [ 42 ; 32 ] ) . unwrap ( ) ;
788+ let node_2_privkey = & SecretKey :: from_slice ( & [ 41 ; 32 ] ) . unwrap ( ) ;
789+
790+ for i in 0 ..PendingChecks :: MAX_PENDING_LOOKUPS {
791+ let valid_announcement = get_signed_channel_announcement (
792+ |msg| msg. short_channel_id += 1 + i as u64 , node_1_privkey, node_2_privkey, & secp_ctx) ;
793+ network_graph. update_channel_from_announcement ( & valid_announcement, & Some ( & chain_source) ) . unwrap_err ( ) ;
794+ assert ! ( !network_graph. pending_checks. too_many_checks_pending( ) ) ;
795+ }
796+
797+ let valid_announcement = get_signed_channel_announcement (
798+ |_| { } , node_1_privkey, node_2_privkey, & secp_ctx) ;
799+ network_graph. update_channel_from_announcement ( & valid_announcement, & Some ( & chain_source) ) . unwrap_err ( ) ;
800+ assert ! ( network_graph. pending_checks. too_many_checks_pending( ) ) ;
801+
802+ // Once the future completes the "too many checks" flag should reset.
803+ future. resolve_without_forwarding ( & network_graph, Err ( ChainAccessError :: UnknownTx ) ) ;
804+ assert ! ( !network_graph. pending_checks. too_many_checks_pending( ) ) ;
805+ }
806+
807+ #[ test]
808+ fn test_checks_backpressure_drop ( ) {
809+ // Test that too_many_checks_pending returns true when there are many checks pending, and
810+ // returns false if we drop some of the the futures without completion.
811+ let secp_ctx = Secp256k1 :: new ( ) ;
812+ let ( chain_source, network_graph) = get_network ( ) ;
813+
814+ // We cheat and use a single future for all the lookups to complete them all at once.
815+ * chain_source. utxo_ret . lock ( ) . unwrap ( ) = ChainAccessResult :: Async ( AccessFuture :: new ( ) ) ;
816+
817+ let node_1_privkey = & SecretKey :: from_slice ( & [ 42 ; 32 ] ) . unwrap ( ) ;
818+ let node_2_privkey = & SecretKey :: from_slice ( & [ 41 ; 32 ] ) . unwrap ( ) ;
819+
820+ for i in 0 ..PendingChecks :: MAX_PENDING_LOOKUPS {
821+ let valid_announcement = get_signed_channel_announcement (
822+ |msg| msg. short_channel_id += 1 + i as u64 , node_1_privkey, node_2_privkey, & secp_ctx) ;
823+ network_graph. update_channel_from_announcement ( & valid_announcement, & Some ( & chain_source) ) . unwrap_err ( ) ;
824+ assert ! ( !network_graph. pending_checks. too_many_checks_pending( ) ) ;
825+ }
826+
827+ let valid_announcement = get_signed_channel_announcement (
828+ |_| { } , node_1_privkey, node_2_privkey, & secp_ctx) ;
829+ network_graph. update_channel_from_announcement ( & valid_announcement, & Some ( & chain_source) ) . unwrap_err ( ) ;
830+ assert ! ( network_graph. pending_checks. too_many_checks_pending( ) ) ;
831+
832+ // Once the future is drop'd (by resetting the `utxo_ret` value) the "too many checks" flag
833+ // should reset to false.
834+ * chain_source. utxo_ret . lock ( ) . unwrap ( ) = ChainAccessResult :: Sync ( Err ( ChainAccessError :: UnknownTx ) ) ;
835+ assert ! ( !network_graph. pending_checks. too_many_checks_pending( ) ) ;
836+ }
837+ }
0 commit comments