Handle fallible events in OnionMessenger
Previously, we would just fire-and-forget in `OnionMessenger`'s event
handling. Since we have now introduced the possibility of event
handling failures, we adapt the event handling logic to retain any
events we failed to handle and have them replayed upon the next
invocation of `process_pending_events`/`process_pending_events_async`.
tnull committed Jul 3, 2024
1 parent 82d533a commit 7dc55e5
118 changes: 84 additions & 34 deletions lightning/src/onion_message/messenger.rs
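The commit message describes the new contract: event handlers now return a `Result`, and any event whose handler errors stays queued so it can be replayed later. For illustration, a minimal sketch of a downstream async handler compatible with that contract, assuming `persist_intercepted_msg` as a hypothetical application-side helper; import paths, field names, and the unit-struct construction of `ReplayEvent` follow the `lightning` crate around this commit, and the messenger additionally requires the returned future to be `Unpin`, so in practice it may need to be boxed and pinned:

use bitcoin::secp256k1::PublicKey;
use lightning::events::{Event, ReplayEvent};
use lightning::ln::msgs::OnionMessage;

// Assumed application-side persistence helper, stubbed out for the sketch.
fn persist_intercepted_msg(_peer: &PublicKey, _msg: &OnionMessage) -> Result<(), ()> {
    Ok(())
}

// The shape of a handler passed to `process_pending_events_async`: returning
// `Err(ReplayEvent())` keeps the event in the messenger's queue, and it is
// handed to the handler again on the next processing call.
async fn handle_event(event: Event) -> Result<(), ReplayEvent> {
    match event {
        Event::OnionMessageIntercepted { peer_node_id, message } => {
            // A transient failure here leaves the event queued for replay.
            persist_intercepted_msg(&peer_node_id, &message).map_err(|_| ReplayEvent())
        },
        // Events we do not care about are simply acknowledged.
        _ => Ok(()),
    }
}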
@@ -36,6 +36,7 @@ use crate::util::ser::Writeable;

use core::fmt;
use core::ops::Deref;
use core::sync::atomic::{AtomicBool, Ordering};
use crate::io;
use crate::sync::Mutex;
use crate::prelude::*;
@@ -262,6 +263,7 @@ pub struct OnionMessenger<
intercept_messages_for_offline_peers: bool,
pending_intercepted_msgs_events: Mutex<Vec<Event>>,
pending_peer_connected_events: Mutex<Vec<Event>>,
pending_events_processor: AtomicBool,
}

/// [`OnionMessage`]s buffered to be sent.
@@ -1004,6 +1006,28 @@ where
}
}

macro_rules! drop_handled_events_and_abort { ($self: expr, $res: expr, $offset: expr, $event_queue: expr) => {
// We want to make sure to cleanly abort upon event handling failure. To this end, we drop all
// successfully handled events from the given queue, reset the events processing flag, and
// return, to have the events eventually replayed upon next invocation.
{
let mut queue_lock = $event_queue.lock().unwrap();

// We skip `$offset` result entries to reach the ones relevant for the given `$event_queue`.
let mut res_iter = $res.iter().skip($offset);

// Keep all events which previously errored *or* any that have been added since we last
// dropped the Mutex.
queue_lock.retain(|_| res_iter.next().map_or(true, |r| r.is_err()));

if $res.iter().any(|r| r.is_err()) {
// We failed handling some events. Return to have them eventually replayed.
$self.pending_events_processor.store(false, Ordering::Release);
return;
}
}
}}
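The retention rule above is the heart of the bookkeeping: `Vec::retain` visits elements front to back, so pairing the queue against the result iterator drops every successfully handled event, keeps every failed one, and keeps anything pushed onto the queue after the results snapshot was taken, because the iterator is simply exhausted at that point. A self-contained sketch of just that rule, with strings standing in for events:

fn main() {
    // Handling results for the three events snapshotted earlier; the second failed.
    let res: Vec<Result<(), ()>> = vec![Ok(()), Err(()), Ok(())];

    // The live queue: the snapshotted events plus one appended while handlers ran.
    let mut queue = vec!["ev0", "ev1", "ev2", "ev3_added_later"];

    // Same retention rule as the macro (offset 0): keep an event if its result
    // errored, or if no result exists for it yet.
    let mut res_iter = res.iter();
    queue.retain(|_| res_iter.next().map_or(true, |r| r.is_err()));

    assert_eq!(queue, vec!["ev1", "ev3_added_later"]);
}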

impl<ES: Deref, NS: Deref, L: Deref, NL: Deref, MR: Deref, OMH: Deref, APH: Deref, CMH: Deref>
OnionMessenger<ES, NS, L, NL, MR, OMH, APH, CMH>
where
@@ -1080,6 +1104,7 @@ where
intercept_messages_for_offline_peers,
pending_intercepted_msgs_events: Mutex::new(Vec::new()),
pending_peer_connected_events: Mutex::new(Vec::new()),
pending_events_processor: AtomicBool::new(false),
}
}

@@ -1320,42 +1345,57 @@ where
pub async fn process_pending_events_async<Future: core::future::Future<Output = Result<(), ReplayEvent>> + core::marker::Unpin, H: Fn(Event) -> Future>(
&self, handler: H
) {
let mut intercepted_msgs = Vec::new();
let mut peer_connecteds = Vec::new();
{
let mut pending_intercepted_msgs_events =
self.pending_intercepted_msgs_events.lock().unwrap();
let mut pending_peer_connected_events =
self.pending_peer_connected_events.lock().unwrap();
core::mem::swap(&mut *pending_intercepted_msgs_events, &mut intercepted_msgs);
core::mem::swap(&mut *pending_peer_connected_events, &mut peer_connecteds);
if self.pending_events_processor.compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed).is_err() {
return;
}

let mut futures = Vec::with_capacity(intercepted_msgs.len());
for (node_id, recipient) in self.message_recipients.lock().unwrap().iter_mut() {
if let OnionMessageRecipient::PendingConnection(_, addresses, _) = recipient {
if let Some(addresses) = addresses.take() {
futures.push(handler(Event::ConnectionNeeded { node_id: *node_id, addresses }));
{
let intercepted_msgs = self.pending_intercepted_msgs_events.lock().unwrap().clone();
let mut futures = Vec::with_capacity(intercepted_msgs.len());
for (node_id, recipient) in self.message_recipients.lock().unwrap().iter_mut() {
if let OnionMessageRecipient::PendingConnection(_, addresses, _) = recipient {
if let Some(addresses) = addresses.take() {
futures.push(handler(Event::ConnectionNeeded { node_id: *node_id, addresses }));
}
}
}
}

for ev in intercepted_msgs {
if let Event::OnionMessageIntercepted { .. } = ev {} else { debug_assert!(false); }
futures.push(handler(ev));
// The offset in the `futures` vec at which `intercepted_msgs` start. We don't bother
// replaying `ConnectionNeeded` events.
let intercepted_msgs_offset = futures.len();

for ev in intercepted_msgs {
if let Event::OnionMessageIntercepted { .. } = ev {} else { debug_assert!(false); }
futures.push(handler(ev));
}
// Let the `OnionMessageIntercepted` events finish before moving on to peer_connecteds
let res = crate::util::async_poll::MultiEventFuturePoller::new(futures).await;
drop_handled_events_and_abort!(self, res, intercepted_msgs_offset, self.pending_intercepted_msgs_events);
}
// Let the `OnionMessageIntercepted` events finish before moving on to peer_connecteds
crate::util::async_poll::MultiEventFuturePoller::new(futures).await;

if peer_connecteds.len() <= 1 {
for event in peer_connecteds { handler(event).await; }
} else {
let mut futures = Vec::new();
for event in peer_connecteds {
futures.push(handler(event));
{
let peer_connecteds = self.pending_peer_connected_events.lock().unwrap().clone();
let num_peer_connecteds = peer_connecteds.len();
if num_peer_connecteds <= 1 {
for event in peer_connecteds {
if handler(event).await.is_ok() {
self.pending_peer_connected_events.lock().unwrap().drain(..num_peer_connecteds);
} else {
// We failed handling the event. Return to have it eventually replayed.
self.pending_events_processor.store(false, Ordering::Release);
return;
}
}
} else {
let mut futures = Vec::new();
for event in peer_connecteds {
futures.push(handler(event));
}
let res = crate::util::async_poll::MultiEventFuturePoller::new(futures).await;
drop_handled_events_and_abort!(self, res, 0, self.pending_peer_connected_events);
}
crate::util::async_poll::MultiEventFuturePoller::new(futures).await;
}
self.pending_events_processor.store(false, Ordering::Release);
}
}
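Both entry points now take the `pending_events_processor` flag before touching the queues: a successful `compare_exchange` marks this caller as the single active processor, concurrent callers return immediately instead of blocking, and every exit path, including the early aborts in `drop_handled_events_and_abort!`, releases the flag so a later call can replay whatever was retained. A reduced, standalone sketch of that guard pattern, detached from the messenger itself:

use std::sync::atomic::{AtomicBool, Ordering};

struct Processor {
    processing: AtomicBool,
}

impl Processor {
    fn process_pending(&self, fail: bool) -> &'static str {
        // Claim the flag; a concurrent caller returns immediately instead of
        // blocking on a lock.
        if self.processing
            .compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed)
            .is_err()
        {
            return "busy";
        }
        if fail {
            // Mirror the early-return paths: release the flag before bailing
            // so the next call can pick up the retained events.
            self.processing.store(false, Ordering::Release);
            return "failed, will replay";
        }
        self.processing.store(false, Ordering::Release);
        "done"
    }
}

fn main() {
    let p = Processor { processing: AtomicBool::new(false) };
    assert_eq!(p.process_pending(true), "failed, will replay");
    assert_eq!(p.process_pending(false), "done");
}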

@@ -1395,17 +1435,24 @@ where
CMH::Target: CustomOnionMessageHandler,
{
fn process_pending_events<H: Deref>(&self, handler: H) where H::Target: EventHandler {
if self.pending_events_processor.compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed).is_err() {
return;
}

for (node_id, recipient) in self.message_recipients.lock().unwrap().iter_mut() {
if let OnionMessageRecipient::PendingConnection(_, addresses, _) = recipient {
if let Some(addresses) = addresses.take() {
let _ = handler.handle_event(Event::ConnectionNeeded { node_id: *node_id, addresses });
}
}
}
let mut events = Vec::new();
let intercepted_msgs;
let peer_connecteds;
{
let mut pending_intercepted_msgs_events = self.pending_intercepted_msgs_events.lock().unwrap();
let pending_intercepted_msgs_events = self.pending_intercepted_msgs_events.lock().unwrap();
intercepted_msgs = pending_intercepted_msgs_events.clone();
let mut pending_peer_connected_events = self.pending_peer_connected_events.lock().unwrap();
peer_connecteds = pending_peer_connected_events.clone();
#[cfg(debug_assertions)] {
for ev in pending_intercepted_msgs_events.iter() {
if let Event::OnionMessageIntercepted { .. } = ev {} else { panic!(); }
@@ -1414,13 +1461,16 @@ where
if let Event::OnionMessagePeerConnected { .. } = ev {} else { panic!(); }
}
}
core::mem::swap(&mut *pending_intercepted_msgs_events, &mut events);
events.append(&mut pending_peer_connected_events);
pending_peer_connected_events.shrink_to(10); // Limit total heap usage
}
for ev in events {
handler.handle_event(ev);
}

let res = intercepted_msgs.into_iter().map(|ev| handler.handle_event(ev)).collect::<Vec<_>>();
drop_handled_events_and_abort!(self, res, 0, self.pending_intercepted_msgs_events);

let res = peer_connecteds.into_iter().map(|ev| handler.handle_event(ev)).collect::<Vec<_>>();
drop_handled_events_and_abort!(self, res, 0, self.pending_peer_connected_events);

self.pending_events_processor.store(false, Ordering::Release);
}
}
