Skip to content

Commit 26f3ce3

Browse files
committed
Read persisted event queue state in LiquidityManager::new
We read any previously-persisted state upon construction of `LiquidityManager`.
1 parent a6ac8d3 commit 26f3ce3

File tree

6 files changed

+49
-8
lines changed

6 files changed

+49
-8
lines changed

lightning-liquidity/src/events/event_queue.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,8 @@ pub(crate) struct EventQueue {
3030
}
3131

3232
impl EventQueue {
33-
pub fn new(kv_store: Arc<dyn KVStore + Send + Sync>) -> Self {
34-
let queue = Arc::new(Mutex::new(VecDeque::new()));
33+
pub fn new(queue: VecDeque<LiquidityEvent>, kv_store: Arc<dyn KVStore + Send + Sync>) -> Self {
34+
let queue = Arc::new(Mutex::new(queue));
3535
let waker = Arc::new(Mutex::new(None));
3636
Self {
3737
queue,
@@ -193,7 +193,7 @@ mod tests {
193193
use std::time::Duration;
194194

195195
let kv_store = Arc::new(KVStoreSyncWrapper(Arc::new(TestStore::new(false))));
196-
let event_queue = Arc::new(EventQueue::new(kv_store));
196+
let event_queue = Arc::new(EventQueue::new(VecDeque::new(), kv_store));
197197
assert_eq!(event_queue.next_event(), None);
198198

199199
let secp_ctx = Secp256k1::new();

lightning-liquidity/src/events/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,8 @@
1717
1818
mod event_queue;
1919

20-
pub(crate) use event_queue::EventQueue;
2120
pub use event_queue::MAX_EVENT_QUEUE_SIZE;
21+
pub(crate) use event_queue::{EventQueue, EventQueueDeserWrapper};
2222

2323
use crate::lsps0;
2424
use crate::lsps1;

lightning-liquidity/src/lsps0/client.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,7 @@ where
113113

114114
#[cfg(test)]
115115
mod tests {
116+
use alloc::collections::VecDeque;
116117
use alloc::string::ToString;
117118
use alloc::sync::Arc;
118119

@@ -129,7 +130,7 @@ mod tests {
129130
let pending_messages = Arc::new(MessageQueue::new());
130131
let entropy_source = Arc::new(TestEntropy {});
131132
let kv_store = Arc::new(KVStoreSyncWrapper(Arc::new(TestStore::new(false))));
132-
let event_queue = Arc::new(EventQueue::new(kv_store));
133+
let event_queue = Arc::new(EventQueue::new(VecDeque::new(), kv_store));
133134

134135
let lsps0_handler = Arc::new(LSPS0ClientHandler::new(
135136
entropy_source,

lightning-liquidity/src/lsps5/client.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -471,7 +471,7 @@ mod tests {
471471
let message_queue = Arc::new(MessageQueue::new());
472472

473473
let kv_store = Arc::new(KVStoreSyncWrapper(Arc::new(TestStore::new(false))));
474-
let event_queue = Arc::new(EventQueue::new(kv_store));
474+
let event_queue = Arc::new(EventQueue::new(VecDeque::new(), kv_store));
475475
let client = LSPS5ClientHandler::new(
476476
test_entropy_source,
477477
Arc::clone(&message_queue),

lightning-liquidity/src/manager.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,9 @@ use crate::lsps5::client::{LSPS5ClientConfig, LSPS5ClientHandler};
2424
use crate::lsps5::msgs::LSPS5Message;
2525
use crate::lsps5::service::{LSPS5ServiceConfig, LSPS5ServiceHandler};
2626
use crate::message_queue::MessageQueue;
27-
use crate::persist::{read_lsps2_service_peer_states, read_lsps5_service_peer_states};
27+
use crate::persist::{
28+
read_event_queue, read_lsps2_service_peer_states, read_lsps5_service_peer_states,
29+
};
2830

2931
use crate::lsps1::client::{LSPS1ClientConfig, LSPS1ClientHandler};
3032
use crate::lsps1::msgs::LSPS1Message;
@@ -337,7 +339,9 @@ where
337339
client_config: Option<LiquidityClientConfig>, time_provider: TP,
338340
) -> Result<Self, lightning::io::Error> {
339341
let pending_messages = Arc::new(MessageQueue::new());
340-
let pending_events = Arc::new(EventQueue::new(Arc::clone(&kv_store)));
342+
343+
let persisted_queue = read_event_queue(Arc::clone(&kv_store)).await?.unwrap_or_default();
344+
let pending_events = Arc::new(EventQueue::new(persisted_queue, Arc::clone(&kv_store)));
341345
let ignored_peers = RwLock::new(new_hash_set());
342346

343347
let mut supported_protocols = Vec::new();

lightning-liquidity/src/persist.rs

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
//! Types and utils for persistence.
1111
12+
use crate::events::{EventQueueDeserWrapper, LiquidityEvent};
1213
use crate::lsps2::service::PeerState as LSPS2ServicePeerState;
1314
use crate::lsps5::service::PeerState as LSPS5ServicePeerState;
1415

@@ -20,6 +21,7 @@ use bitcoin::secp256k1::PublicKey;
2021

2122
use core::ops::Deref;
2223
use core::str::FromStr;
24+
use std::collections::VecDeque;
2325

2426
/// The primary namespace under which the [`LiquidityManager`] will be persisted.
2527
///
@@ -46,6 +48,40 @@ pub const LSPS2_SERVICE_PERSISTENCE_SECONDARY_NAMESPACE: &str = "lsps2_service";
4648
/// [`LSPS5ServiceHandler`]: crate::lsps5::service::LSPS5ServiceHandler
4749
pub const LSPS5_SERVICE_PERSISTENCE_SECONDARY_NAMESPACE: &str = "lsps5_service";
4850

51+
pub(crate) async fn read_event_queue<K: Deref>(
52+
kv_store: K,
53+
) -> Result<Option<VecDeque<LiquidityEvent>>, lightning::io::Error>
54+
where
55+
K::Target: KVStore,
56+
{
57+
let read_fut = kv_store.read(
58+
LIQUIDITY_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
59+
LIQUIDITY_MANAGER_EVENT_QUEUE_PERSISTENCE_SECONDARY_NAMESPACE,
60+
LIQUIDITY_MANAGER_EVENT_QUEUE_PERSISTENCE_KEY,
61+
);
62+
63+
let mut reader = match read_fut.await {
64+
Ok(r) => Cursor::new(r),
65+
Err(e) => {
66+
if e.kind() == lightning::io::ErrorKind::NotFound {
67+
// Key wasn't found, no error but first time running.
68+
return Ok(None);
69+
} else {
70+
return Err(e);
71+
}
72+
},
73+
};
74+
75+
let queue: EventQueueDeserWrapper = Readable::read(&mut reader).map_err(|_| {
76+
lightning::io::Error::new(
77+
lightning::io::ErrorKind::InvalidData,
78+
"Failed to deserialize liquidity event queue",
79+
)
80+
})?;
81+
82+
Ok(Some(queue.0))
83+
}
84+
4985
pub(crate) async fn read_lsps2_service_peer_states<K: Deref>(
5086
kv_store: K,
5187
) -> Result<Vec<(PublicKey, LSPS2ServicePeerState)>, lightning::io::Error>

0 commit comments

Comments
 (0)