Skip to content

Commit eb06199

Browse files
Score payment paths from events in BackgroundProcessor
1 parent d2bf407 commit eb06199

File tree

1 file changed

+212
-14
lines changed
  • lightning-background-processor/src

1 file changed

+212
-14
lines changed

lightning-background-processor/src/lib.rs

+212-14
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,9 @@
1616
#[cfg(any(test, feature = "std"))]
1717
extern crate core;
1818

19+
#[cfg(not(feature = "std"))]
20+
extern crate alloc;
21+
1922
#[macro_use] extern crate lightning;
2023
extern crate lightning_rapid_gossip_sync;
2124

@@ -28,7 +31,7 @@ use lightning::ln::msgs::{ChannelMessageHandler, OnionMessageHandler, RoutingMes
2831
use lightning::ln::peer_handler::{CustomMessageHandler, PeerManager, SocketDescriptor};
2932
use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
3033
use lightning::routing::router::Router;
31-
use lightning::routing::scoring::WriteableScore;
34+
use lightning::routing::scoring::{Score, WriteableScore};
3235
use lightning::util::events::{Event, EventHandler, EventsProvider};
3336
use lightning::util::logger::Logger;
3437
use lightning::util::persist::Persister;
@@ -49,6 +52,8 @@ use std::time::Instant;
4952

5053
#[cfg(feature = "futures")]
5154
use futures_util::{select_biased, future::FutureExt, task};
55+
#[cfg(not(feature = "std"))]
56+
use alloc::vec::Vec;
5257

5358
/// `BackgroundProcessor` takes care of tasks that (1) need to happen periodically to keep
5459
/// Rust-Lightning running properly, and (2) either can or should be run in the background. Its
@@ -216,6 +221,31 @@ fn handle_network_graph_update<L: Deref>(
216221
}
217222
}
218223

224+
fn update_scorer<'a, S: 'static + Deref<Target = SC> + Send + Sync, SC: 'a + WriteableScore<'a>>(
225+
scorer: &'a S, event: &Event
226+
) {
227+
let mut score = scorer.lock();
228+
match event {
229+
Event::PaymentPathFailed { ref path, short_channel_id: Some(scid), .. } => {
230+
let path = path.iter().collect::<Vec<_>>();
231+
score.payment_path_failed(&path, *scid);
232+
},
233+
Event::PaymentPathSuccessful { path, .. } => {
234+
let path = path.iter().collect::<Vec<_>>();
235+
score.payment_path_successful(&path);
236+
},
237+
Event::ProbeSuccessful { path, .. } => {
238+
let path = path.iter().collect::<Vec<_>>();
239+
score.probe_successful(&path);
240+
},
241+
Event::ProbeFailed { path, short_channel_id: Some(scid), .. } => {
242+
let path = path.iter().collect::<Vec<_>>();
243+
score.probe_failed(&path, *scid);
244+
},
245+
_ => {},
246+
}
247+
}
248+
219249
macro_rules! define_run_body {
220250
($persister: ident, $chain_monitor: ident, $process_chain_monitor_events: expr,
221251
$channel_manager: ident, $process_channel_manager_events: expr,
@@ -387,7 +417,7 @@ pub async fn process_events_async<
387417
UMH: 'static + Deref + Send + Sync,
388418
PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, OMH, L, UMH, NS>> + Send + Sync,
389419
S: 'static + Deref<Target = SC> + Send + Sync,
390-
SC: WriteableScore<'a>,
420+
SC: for<'b> WriteableScore<'b>,
391421
SleepFuture: core::future::Future<Output = bool> + core::marker::Unpin,
392422
Sleeper: Fn(Duration) -> SleepFuture
393423
>(
@@ -417,10 +447,14 @@ where
417447
let async_event_handler = |event| {
418448
let network_graph = gossip_sync.network_graph();
419449
let event_handler = &event_handler;
450+
let scorer = &scorer;
420451
async move {
421452
if let Some(network_graph) = network_graph {
422453
handle_network_graph_update(network_graph, &event)
423454
}
455+
if let Some(ref scorer) = scorer {
456+
update_scorer(scorer, &event);
457+
}
424458
event_handler(event).await;
425459
}
426460
};
@@ -516,7 +550,7 @@ impl BackgroundProcessor {
516550
UMH: 'static + Deref + Send + Sync,
517551
PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, OMH, L, UMH, NS>> + Send + Sync,
518552
S: 'static + Deref<Target = SC> + Send + Sync,
519-
SC: WriteableScore<'a>,
553+
SC: for <'b> WriteableScore<'b>,
520554
>(
521555
persister: PS, event_handler: EH, chain_monitor: M, channel_manager: CM,
522556
gossip_sync: GossipSync<PGS, RGS, G, CA, L>, peer_manager: PM, logger: L, scorer: Option<S>,
@@ -547,6 +581,9 @@ impl BackgroundProcessor {
547581
if let Some(network_graph) = network_graph {
548582
handle_network_graph_update(network_graph, &event)
549583
}
584+
if let Some(ref scorer) = scorer {
585+
update_scorer(scorer, &event);
586+
}
550587
event_handler.handle_event(event);
551588
};
552589
define_run_body!(persister, chain_monitor, chain_monitor.process_pending_events(&event_handler),
@@ -618,15 +655,16 @@ mod tests {
618655
use lightning::chain::keysinterface::{InMemorySigner, EntropySource, KeysManager};
619656
use lightning::chain::transaction::OutPoint;
620657
use lightning::get_event_msg;
621-
use lightning::ln::channelmanager::{BREAKDOWN_TIMEOUT, ChainParameters, ChannelManager, SimpleArcChannelManager};
658+
use lightning::ln::channelmanager::{BREAKDOWN_TIMEOUT, ChainParameters, ChannelManager, PaymentId, Retry as ChannelManagerRetry, SimpleArcChannelManager};
659+
use lightning::ln::functional_test_utils::*;
622660
use lightning::ln::features::ChannelFeatures;
623-
use lightning::ln::msgs::{ChannelMessageHandler, Init};
661+
use lightning::ln::msgs::{ChannelMessageHandler, Init, RoutingMessageHandler};
624662
use lightning::ln::peer_handler::{PeerManager, MessageHandler, SocketDescriptor, IgnoringMessageHandler};
625-
use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
626-
use lightning::routing::router::DefaultRouter;
663+
use lightning::routing::gossip::{NetworkGraph, NodeId, P2PGossipSync};
664+
use lightning::routing::router::{DefaultRouter, PaymentParameters, RouteParameters};
627665
use lightning::routing::scoring::{ProbabilisticScoringParameters, ProbabilisticScorer};
628666
use lightning::util::config::UserConfig;
629-
use lightning::util::events::{Event, MessageSendEventsProvider, MessageSendEvent};
667+
use lightning::util::events::{Event, MessageSendEventsProvider, MessageSendEvent, PaymentPurpose};
630668
use lightning::util::ser::Writeable;
631669
use lightning::util::test_utils;
632670
use lightning::util::persist::KVStorePersister;
@@ -782,8 +820,8 @@ mod tests {
782820
let chain_monitor = Arc::new(chainmonitor::ChainMonitor::new(Some(chain_source.clone()), tx_broadcaster.clone(), logger.clone(), fee_estimator.clone(), persister.clone()));
783821
let best_block = BestBlock::from_genesis(network);
784822
let params = ChainParameters { network, best_block };
785-
let manager = Arc::new(ChannelManager::new(fee_estimator.clone(), chain_monitor.clone(), tx_broadcaster.clone(), router.clone(), logger.clone(), keys_manager.clone(), keys_manager.clone(), keys_manager.clone(), UserConfig::default(), params));
786-
let p2p_gossip_sync = Arc::new(P2PGossipSync::new(network_graph.clone(), Some(chain_source.clone()), logger.clone()));
823+
let manager = Arc::new(ChannelManager::new(fee_estimator.clone(), chain_monitor.clone(), tx_broadcaster.clone(), router.clone(), logger.clone(), keys_manager.clone(), keys_manager.clone(), keys_manager.clone(), test_default_channel_config(), params));
824+
let p2p_gossip_sync = Arc::new(P2PGossipSync::new(network_graph.clone(), None, logger.clone()));
787825
let rapid_gossip_sync = Arc::new(RapidGossipSync::new(network_graph.clone()));
788826
let msg_handler = MessageHandler { chan_handler: Arc::new(test_utils::TestChannelMessageHandler::new()), route_handler: Arc::new(test_utils::TestRoutingMessageHandler::new()), onion_message_handler: IgnoringMessageHandler{}};
789827
let peer_manager = Arc::new(PeerManager::new(msg_handler, 0, &seed, logger.clone(), IgnoringMessageHandler{}, keys_manager.clone()));
@@ -803,7 +841,7 @@ mod tests {
803841

804842
macro_rules! open_channel {
805843
($node_a: expr, $node_b: expr, $channel_value: expr) => {{
806-
begin_open_channel!($node_a, $node_b, $channel_value);
844+
begin_open_channel!($node_a, $node_b, $channel_value, None);
807845
let events = $node_a.node.get_and_clear_pending_events();
808846
assert_eq!(events.len(), 1);
809847
let (temporary_channel_id, tx) = handle_funding_generation_ready!(events[0], $channel_value);
@@ -813,8 +851,8 @@ mod tests {
813851
}
814852

815853
macro_rules! begin_open_channel {
816-
($node_a: expr, $node_b: expr, $channel_value: expr) => {{
817-
$node_a.node.create_channel($node_b.node.get_our_node_id(), $channel_value, 100, 42, None).unwrap();
854+
($node_a: expr, $node_b: expr, $channel_value: expr, $override_cfg: expr) => {{
855+
$node_a.node.create_channel($node_b.node.get_our_node_id(), $channel_value, 100, 42, $override_cfg).unwrap();
818856
$node_b.node.handle_open_channel(&$node_a.node.get_our_node_id(), &get_event_msg!($node_a, MessageSendEvent::SendOpenChannel, $node_b.node.get_our_node_id()));
819857
$node_a.node.handle_accept_channel(&$node_b.node.get_our_node_id(), &get_event_msg!($node_b, MessageSendEvent::SendAcceptChannel, $node_a.node.get_our_node_id()));
820858
}}
@@ -1034,7 +1072,7 @@ mod tests {
10341072
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
10351073

10361074
// Open a channel and check that the FundingGenerationReady event was handled.
1037-
begin_open_channel!(nodes[0], nodes[1], channel_value);
1075+
begin_open_channel!(nodes[0], nodes[1], channel_value, Some(UserConfig::default()));
10381076
let (temporary_channel_id, funding_tx) = receiver
10391077
.recv_timeout(Duration::from_secs(EVENT_DEADLINE))
10401078
.expect("FundingGenerationReady not handled within deadline");
@@ -1171,4 +1209,164 @@ mod tests {
11711209
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
11721210
assert!(bg_processor.stop().is_ok());
11731211
}
1212+
1213+
#[test]
1214+
fn test_payment_path_scoring() {
1215+
// Ensure that we update the scorer when relevant events are processed. In this case, we ensure
1216+
// that we update the scorer upon a payment path succeeding (note that the channel must be
1217+
// public or else we won't score it).
1218+
let mut nodes = create_nodes(2, "test_payment_path_scoring".to_string());
1219+
let channel_value = 100000;
1220+
let data_dir_0 = nodes[0].persister.get_data_dir();
1221+
let persister_0 = Arc::new(Persister::new(data_dir_0.clone()));
1222+
1223+
// Set up a background event handler for FundingGenerationReady events.
1224+
let (sender, receiver) = std::sync::mpsc::sync_channel(1);
1225+
let event_handler_0 = move |event: Event| match event {
1226+
Event::FundingGenerationReady { .. } => sender.send(handle_funding_generation_ready!(event, channel_value)).unwrap(),
1227+
Event::ChannelReady { .. } => {},
1228+
_ => panic!("Unexpected event: {:?}", event),
1229+
};
1230+
1231+
let bg_processor_0 = BackgroundProcessor::start(persister_0, event_handler_0, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1232+
1233+
// Open a channel and check that the FundingGenerationReady event was handled.
1234+
begin_open_channel!(nodes[0], nodes[1], channel_value, None);
1235+
let (temporary_channel_id, funding_tx) = receiver
1236+
.recv_timeout(Duration::from_secs(EVENT_DEADLINE))
1237+
.expect("FundingGenerationReady not handled within deadline");
1238+
end_open_channel!(nodes[0], nodes[1], temporary_channel_id, funding_tx);
1239+
1240+
// Confirm the funding transaction.
1241+
confirm_transaction(&mut nodes[0], &funding_tx);
1242+
let as_funding = get_event_msg!(nodes[0], MessageSendEvent::SendChannelReady, nodes[1].node.get_our_node_id());
1243+
confirm_transaction(&mut nodes[1], &funding_tx);
1244+
let bs_funding = get_event_msg!(nodes[1], MessageSendEvent::SendChannelReady, nodes[0].node.get_our_node_id());
1245+
nodes[0].node.handle_channel_ready(&nodes[1].node.get_our_node_id(), &bs_funding);
1246+
let as_ann_sigs = get_event_msg!(nodes[0], MessageSendEvent::SendAnnouncementSignatures, nodes[1].node.get_our_node_id());
1247+
nodes[1].node.handle_channel_ready(&nodes[0].node.get_our_node_id(), &as_funding);
1248+
let bs_ann_sigs = get_event_msg!(nodes[1], MessageSendEvent::SendAnnouncementSignatures, nodes[0].node.get_our_node_id());
1249+
nodes[1].node.handle_announcement_signatures(&nodes[0].node.get_our_node_id(), &as_ann_sigs);
1250+
let events = nodes[1].node.get_and_clear_pending_msg_events();
1251+
assert_eq!(events.len(), 1);
1252+
let (ann, bs_update) = match events[0] {
1253+
MessageSendEvent::BroadcastChannelAnnouncement { ref msg, ref update_msg } => {
1254+
(msg, update_msg)
1255+
},
1256+
_ => panic!("Unexpected event"),
1257+
};
1258+
let scid = bs_update.contents.short_channel_id;
1259+
nodes[0].node.handle_announcement_signatures(&nodes[1].node.get_our_node_id(), &bs_ann_sigs);
1260+
let events = nodes[0].node.get_and_clear_pending_msg_events();
1261+
assert_eq!(events.len(), 1);
1262+
let as_update = match events[0] {
1263+
MessageSendEvent::BroadcastChannelAnnouncement { ref msg, ref update_msg } => {
1264+
assert!(*ann == *msg);
1265+
assert_eq!(update_msg.contents.short_channel_id, ann.contents.short_channel_id);
1266+
assert_eq!(update_msg.contents.short_channel_id, bs_update.contents.short_channel_id);
1267+
update_msg
1268+
},
1269+
_ => panic!("Unexpected event"),
1270+
};
1271+
1272+
for node in nodes.iter() {
1273+
assert!(node.p2p_gossip_sync.handle_channel_announcement(ann).unwrap());
1274+
node.p2p_gossip_sync.handle_channel_update(as_update).unwrap();
1275+
node.p2p_gossip_sync.handle_channel_update(bs_update).unwrap();
1276+
1277+
node.node.handle_channel_update(&nodes[0].node.get_our_node_id(), &as_update);
1278+
node.node.handle_channel_update(&nodes[1].node.get_our_node_id(), &bs_update);
1279+
}
1280+
1281+
assert!(bg_processor_0.stop().is_ok());
1282+
1283+
let (sender_0, receiver_0) = std::sync::mpsc::sync_channel(1);
1284+
let event_handler_0 = move |event: Event| match event {
1285+
Event::ChannelReady { .. } => {},
1286+
Event::PendingHTLCsForwardable { .. } => {},
1287+
Event::PaymentSent { .. } => {},
1288+
Event::PaymentPathSuccessful { .. } => sender_0.send(event).unwrap(),
1289+
_ => panic!("Unexpected event: {:?}", event),
1290+
};
1291+
let persister_0 = Arc::new(Persister::new(data_dir_0));
1292+
let _bg_processor_0 = BackgroundProcessor::start(persister_0, event_handler_0, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1293+
1294+
let amt_msat = 40000;
1295+
let payment_params = PaymentParameters::from_node_id(nodes[1].node.get_our_node_id())
1296+
.with_features(nodes[1].node.invoice_features());
1297+
let route_params = RouteParameters {
1298+
payment_params,
1299+
final_value_msat: amt_msat,
1300+
final_cltv_expiry_delta: 70,
1301+
};
1302+
let (payment_hash, payment_secret) = nodes[1].node.create_inbound_payment(Some(amt_msat), 60*60, None).unwrap();
1303+
1304+
let (sender_1, receiver_1) = std::sync::mpsc::sync_channel(1);
1305+
let event_handler_1 = move |event: Event| match event {
1306+
Event::ChannelReady { .. } => {},
1307+
Event::PendingHTLCsForwardable { .. } => {},
1308+
Event::PaymentClaimable { .. } => sender_1.send(event).unwrap(),
1309+
Event::PaymentClaimed { .. } => {},
1310+
_ => panic!("Unexpected event: {:?}", event),
1311+
};
1312+
let data_dir_1 = nodes[1].persister.get_data_dir();
1313+
let persister_1 = Arc::new(Persister::new(data_dir_1.clone()));
1314+
let _bg_processor_1 = BackgroundProcessor::start(persister_1, event_handler_1, nodes[1].chain_monitor.clone(), nodes[1].node.clone(), nodes[1].no_gossip_sync(), nodes[1].peer_manager.clone(), nodes[1].logger.clone(), Some(nodes[1].scorer.clone()));
1315+
1316+
nodes[0].node.send_payment_with_retry(payment_hash, &Some(payment_secret), PaymentId(payment_hash.0), route_params, ChannelManagerRetry::Attempts(0)).unwrap();
1317+
1318+
let mut msg_events = nodes[0].node.get_and_clear_pending_msg_events();
1319+
1320+
assert_eq!(msg_events.len(), 1);
1321+
let payment_event = SendEvent::from_event(msg_events.remove(0));
1322+
1323+
nodes[1].node.handle_update_add_htlc(&nodes[0].node.get_our_node_id(), &payment_event.msgs[0]);
1324+
nodes[1].node.handle_commitment_signed(&nodes[0].node.get_our_node_id(), &payment_event.commitment_msg);
1325+
let (bs_first_raa, bs_first_cs) = get_revoke_commit_msgs!(nodes[1], nodes[0].node.get_our_node_id());
1326+
1327+
nodes[0].node.handle_revoke_and_ack(&nodes[1].node.get_our_node_id(), &bs_first_raa);
1328+
nodes[0].node.handle_commitment_signed(&nodes[1].node.get_our_node_id(), &bs_first_cs);
1329+
let as_first_raa = get_event_msg!(nodes[0], MessageSendEvent::SendRevokeAndACK, nodes[1].node.get_our_node_id());
1330+
1331+
nodes[1].node.handle_revoke_and_ack(&nodes[0].node.get_our_node_id(), &as_first_raa);
1332+
nodes[1].node.process_pending_htlc_forwards();
1333+
1334+
let event = receiver_1
1335+
.recv_timeout(Duration::from_secs(EVENT_DEADLINE))
1336+
.expect("Events not handled within deadline");
1337+
match event {
1338+
Event::PaymentClaimable { purpose: PaymentPurpose::InvoicePayment { payment_preimage: Some(payment_preimage), .. }, .. } => nodes[1].node.claim_funds(payment_preimage),
1339+
Event::ChannelReady { .. } => {},
1340+
_ => panic!("Unexpected event: {:?}", event),
1341+
}
1342+
1343+
nodes[1].node.process_pending_htlc_forwards();
1344+
let update_1 = get_htlc_update_msgs!(nodes[1], nodes[0].node.get_our_node_id());
1345+
assert!(update_1.update_fulfill_htlcs.len() == 1);
1346+
let fulfill_msg = update_1.update_fulfill_htlcs[0].clone();
1347+
1348+
nodes[0].node.handle_update_fulfill_htlc(&nodes[1].node.get_our_node_id(), &fulfill_msg);
1349+
nodes[0].node.handle_commitment_signed(&nodes[1].node.get_our_node_id(), &update_1.commitment_signed);
1350+
let (as_first_raa, as_first_cs) = get_revoke_commit_msgs!(nodes[0], nodes[1].node.get_our_node_id());
1351+
1352+
nodes[1].node.handle_revoke_and_ack(&nodes[0].node.get_our_node_id(), &as_first_raa);
1353+
nodes[1].node.handle_commitment_signed(&nodes[0].node.get_our_node_id(), &as_first_cs);
1354+
let bs_first_raa = get_event_msg!(nodes[1], MessageSendEvent::SendRevokeAndACK, nodes[0].node.get_our_node_id());
1355+
1356+
// Ensure that nodes[0] has no capacity data for its channel before the PaymentPathSuccess event
1357+
// is processed.
1358+
assert_eq!(nodes[0].scorer.lock().unwrap().estimated_channel_liquidity_range(scid, &NodeId::from_pubkey(&nodes[0].node.get_our_node_id())), None);
1359+
1360+
nodes[0].node.handle_revoke_and_ack(&nodes[1].node.get_our_node_id(), &bs_first_raa);
1361+
1362+
let event = receiver_0
1363+
.recv_timeout(Duration::from_secs(EVENT_DEADLINE))
1364+
.expect("Events not handled within deadline");
1365+
match event {
1366+
Event::PaymentPathSuccessful { .. } => {
1367+
assert_eq!(nodes[0].scorer.lock().unwrap().estimated_channel_liquidity_range(scid, &NodeId::from_pubkey(&nodes[0].node.get_our_node_id())), Some((40000, 10000000)));
1368+
},
1369+
_ => panic!("Unexpected event: {:?}", event),
1370+
}
1371+
}
11741372
}

0 commit comments

Comments
 (0)