@@ -932,7 +932,7 @@ mod tests {
932932 use lightning:: chain:: channelmonitor:: ANTI_REORG_DELAY ;
933933 use lightning:: sign:: { InMemorySigner , KeysManager , ChangeDestinationSource } ;
934934 use lightning:: chain:: transaction:: OutPoint ;
935- use lightning:: events:: { Event , PathFailure , MessageSendEventsProvider , MessageSendEvent } ;
935+ use lightning:: events:: { Event , PathFailure , MessageSendEventsProvider , MessageSendEvent , ReplayEvent } ;
936936 use lightning:: { get_event_msg, get_event} ;
937937 use lightning:: ln:: types:: { PaymentHash , ChannelId } ;
938938 use lightning:: ln:: channelmanager;
@@ -954,6 +954,7 @@ mod tests {
954954 SCORER_PERSISTENCE_PRIMARY_NAMESPACE , SCORER_PERSISTENCE_SECONDARY_NAMESPACE , SCORER_PERSISTENCE_KEY } ;
955955 use lightning:: util:: sweep:: { OutputSweeper , OutputSpendStatus } ;
956956 use lightning_persister:: fs_store:: FilesystemStore ;
957+ use core:: sync:: atomic:: { AtomicBool , Ordering } ;
957958 use std:: collections:: VecDeque ;
958959 use std:: { fs, env} ;
959960 use std:: path:: PathBuf ;
@@ -1774,6 +1775,40 @@ mod tests {
17741775 }
17751776 }
17761777
1778+ #[ test]
1779+ fn test_event_handling_failures_are_replayed ( ) {
1780+ let ( _, nodes) = create_nodes ( 2 , "test_event_handling_failures_are_replayed" ) ;
1781+ let channel_value = 100000 ;
1782+ let data_dir = nodes[ 0 ] . kv_store . get_data_dir ( ) ;
1783+ let persister = Arc :: new ( Persister :: new ( data_dir. clone ( ) ) ) ;
1784+
1785+ let ( first_event_send, first_event_recv) = std:: sync:: mpsc:: sync_channel ( 1 ) ;
1786+ let ( second_event_send, second_event_recv) = std:: sync:: mpsc:: sync_channel ( 1 ) ;
1787+ let should_fail_event_handling = Arc :: new ( AtomicBool :: new ( true ) ) ;
1788+ let event_handler = move |event : Event | {
1789+ if let Ok ( true ) = should_fail_event_handling. compare_exchange ( true , false , Ordering :: Acquire , Ordering :: Relaxed ) {
1790+ first_event_send. send ( event) . unwrap ( ) ;
1791+ return Err ( ReplayEvent ( ) ) ;
1792+ }
1793+
1794+ second_event_send. send ( event) . unwrap ( ) ;
1795+ Ok ( ( ) )
1796+ } ;
1797+
1798+ let bg_processor = BackgroundProcessor :: start (
1799+ persister, event_handler, nodes[ 0 ] . chain_monitor . clone ( ) , nodes[ 0 ] . node . clone ( ) ,
1800+ Some ( nodes[ 0 ] . messenger . clone ( ) ) , nodes[ 0 ] . no_gossip_sync ( ) ,
1801+ nodes[ 0 ] . peer_manager . clone ( ) , nodes[ 0 ] . logger . clone ( ) , Some ( nodes[ 0 ] . scorer . clone ( ) )
1802+ ) ;
1803+
1804+ begin_open_channel ! ( nodes[ 0 ] , nodes[ 1 ] , channel_value) ;
1805+ assert_eq ! ( first_event_recv. recv_timeout( Duration :: from_secs( EVENT_DEADLINE ) ) , second_event_recv. recv_timeout( Duration :: from_secs( EVENT_DEADLINE ) ) ) ;
1806+
1807+ if !std:: thread:: panicking ( ) {
1808+ bg_processor. stop ( ) . unwrap ( ) ;
1809+ }
1810+ }
1811+
17771812 #[ test]
17781813 fn test_scorer_persistence ( ) {
17791814 let ( _, nodes) = create_nodes ( 2 , "test_scorer_persistence" ) ;
0 commit comments