Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions src/data_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,10 +172,11 @@ where
#[cfg(test)]
mod tests {
use lightning::impl_writeable_tlv_based;
use lightning::util::test_utils::{TestLogger, TestStore};
use lightning::util::test_utils::TestLogger;

use super::*;
use crate::hex_utils;
use crate::io::test_utils::InMemoryStore;

#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
struct TestObjectId {
Expand Down Expand Up @@ -234,7 +235,7 @@ mod tests {

#[test]
fn data_is_persisted() {
let store: Arc<DynStore> = Arc::new(TestStore::new(false));
let store: Arc<DynStore> = Arc::new(InMemoryStore::new());
let logger = Arc::new(TestLogger::new());
let primary_namespace = "datastore_test_primary".to_string();
let secondary_namespace = "datastore_test_secondary".to_string();
Expand Down
70 changes: 38 additions & 32 deletions src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use lightning::util::config::{
ChannelConfigOverrides, ChannelConfigUpdate, ChannelHandshakeConfigUpdate,
};
use lightning::util::errors::APIError;
use lightning::util::persist::KVStoreSync;
use lightning::util::persist::KVStore;
use lightning::util::ser::{Readable, ReadableArgs, Writeable, Writer};
use lightning_liquidity::lsps2::utils::compute_opening_fee;
use lightning_types::payment::{PaymentHash, PaymentPreimage};
Expand Down Expand Up @@ -301,12 +301,14 @@ where
Self { queue, waker, kv_store, logger }
}

pub(crate) fn add_event(&self, event: Event) -> Result<(), Error> {
{
pub(crate) async fn add_event(&self, event: Event) -> Result<(), Error> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was looking at what's left in EventQueue that is sync. Just next_event. Unrelated to this PR, but in every test where that method is used, the test is already async. So it could use the async version?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was looking at what's left in EventQueue that is sync. Just next_event. Unrelated to this PR, but in every test where that method is used, the test is already async. So it could use the async version?

But next_event doesn't do anything that would require async operation? It doesn't repersist or anything?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't we set the example and use the async api of eventqueue?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah maybe the naming isn't optimally clear. next_event_async waits, and next_event does not?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The former returns a future, the second one could be named try_next_event.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes that would be an improvement indeed.

let data = {
let mut locked_queue = self.queue.lock().unwrap();
locked_queue.push_back(event);
self.persist_queue(&locked_queue)?;
}
EventQueueSerWrapper(&locked_queue).encode()
};

self.persist_queue(data).await?;

if let Some(waker) = self.waker.lock().unwrap().take() {
waker.wake();
Expand All @@ -323,28 +325,30 @@ where
EventFuture { event_queue: Arc::clone(&self.queue), waker: Arc::clone(&self.waker) }.await
}

pub(crate) fn event_handled(&self) -> Result<(), Error> {
{
pub(crate) async fn event_handled(&self) -> Result<(), Error> {
let data = {
let mut locked_queue = self.queue.lock().unwrap();
locked_queue.pop_front();
self.persist_queue(&locked_queue)?;
}
EventQueueSerWrapper(&locked_queue).encode()
};

self.persist_queue(data).await?;

if let Some(waker) = self.waker.lock().unwrap().take() {
waker.wake();
}
Ok(())
}

fn persist_queue(&self, locked_queue: &VecDeque<Event>) -> Result<(), Error> {
let data = EventQueueSerWrapper(locked_queue).encode();
KVStoreSync::write(
async fn persist_queue(&self, encoded_queue: Vec<u8>) -> Result<(), Error> {
KVStore::write(
&*self.kv_store,
EVENT_QUEUE_PERSISTENCE_PRIMARY_NAMESPACE,
EVENT_QUEUE_PERSISTENCE_SECONDARY_NAMESPACE,
EVENT_QUEUE_PERSISTENCE_KEY,
data,
encoded_queue,
)
.await
.map_err(|e| {
log_error!(
self.logger,
Expand Down Expand Up @@ -694,7 +698,7 @@ where
claim_deadline,
custom_records,
};
match self.event_queue.add_event(event) {
match self.event_queue.add_event(event).await {
Ok(_) => return Ok(()),
Err(e) => {
log_error!(
Expand Down Expand Up @@ -928,7 +932,7 @@ where
.map(|cf| cf.custom_tlvs().into_iter().map(|tlv| tlv.into()).collect())
.unwrap_or_default(),
};
match self.event_queue.add_event(event) {
match self.event_queue.add_event(event).await {
Ok(_) => return Ok(()),
Err(e) => {
log_error!(self.logger, "Failed to push to event queue: {}", e);
Expand Down Expand Up @@ -988,7 +992,7 @@ where
fee_paid_msat,
};

match self.event_queue.add_event(event) {
match self.event_queue.add_event(event).await {
Ok(_) => return Ok(()),
Err(e) => {
log_error!(self.logger, "Failed to push to event queue: {}", e);
Expand Down Expand Up @@ -1019,7 +1023,7 @@ where

let event =
Event::PaymentFailed { payment_id: Some(payment_id), payment_hash, reason };
match self.event_queue.add_event(event) {
match self.event_queue.add_event(event).await {
Ok(_) => return Ok(()),
Err(e) => {
log_error!(self.logger, "Failed to push to event queue: {}", e);
Expand Down Expand Up @@ -1295,7 +1299,7 @@ where
claim_from_onchain_tx,
outbound_amount_forwarded_msat,
};
self.event_queue.add_event(event).map_err(|e| {
self.event_queue.add_event(event).await.map_err(|e| {
log_error!(self.logger, "Failed to push to event queue: {}", e);
ReplayEvent()
})?;
Expand All @@ -1322,7 +1326,7 @@ where
counterparty_node_id,
funding_txo,
};
match self.event_queue.add_event(event) {
match self.event_queue.add_event(event).await {
Ok(_) => {},
Err(e) => {
log_error!(self.logger, "Failed to push to event queue: {}", e);
Expand Down Expand Up @@ -1383,7 +1387,7 @@ where
user_channel_id: UserChannelId(user_channel_id),
counterparty_node_id: Some(counterparty_node_id),
};
match self.event_queue.add_event(event) {
match self.event_queue.add_event(event).await {
Ok(_) => {},
Err(e) => {
log_error!(self.logger, "Failed to push to event queue: {}", e);
Expand All @@ -1407,7 +1411,7 @@ where
reason: Some(reason),
};

match self.event_queue.add_event(event) {
match self.event_queue.add_event(event).await {
Ok(_) => {},
Err(e) => {
log_error!(self.logger, "Failed to push to event queue: {}", e);
Expand Down Expand Up @@ -1605,13 +1609,14 @@ mod tests {
use std::sync::atomic::{AtomicU16, Ordering};
use std::time::Duration;

use lightning::util::test_utils::{TestLogger, TestStore};
use lightning::util::test_utils::TestLogger;

use super::*;
use crate::io::test_utils::InMemoryStore;

#[tokio::test]
async fn event_queue_persistence() {
let store: Arc<DynStore> = Arc::new(TestStore::new(false));
let store: Arc<DynStore> = Arc::new(InMemoryStore::new());
let logger = Arc::new(TestLogger::new());
let event_queue = Arc::new(EventQueue::new(Arc::clone(&store), Arc::clone(&logger)));
assert_eq!(event_queue.next_event(), None);
Expand All @@ -1621,7 +1626,7 @@ mod tests {
user_channel_id: UserChannelId(2323),
counterparty_node_id: None,
};
event_queue.add_event(expected_event.clone()).unwrap();
event_queue.add_event(expected_event.clone()).await.unwrap();

// Check we get the expected event and that it is returned until we mark it handled.
for _ in 0..5 {
Expand All @@ -1630,24 +1635,25 @@ mod tests {
}

// Check we can read back what we persisted.
let persisted_bytes = KVStoreSync::read(
let persisted_bytes = KVStore::read(
&*store,
EVENT_QUEUE_PERSISTENCE_PRIMARY_NAMESPACE,
EVENT_QUEUE_PERSISTENCE_SECONDARY_NAMESPACE,
EVENT_QUEUE_PERSISTENCE_KEY,
)
.await
.unwrap();
let deser_event_queue =
EventQueue::read(&mut &persisted_bytes[..], (Arc::clone(&store), logger)).unwrap();
assert_eq!(deser_event_queue.next_event_async().await, expected_event);

event_queue.event_handled().unwrap();
event_queue.event_handled().await.unwrap();
assert_eq!(event_queue.next_event(), None);
}

#[tokio::test]
async fn event_queue_concurrency() {
let store: Arc<DynStore> = Arc::new(TestStore::new(false));
let store: Arc<DynStore> = Arc::new(InMemoryStore::new());
let logger = Arc::new(TestLogger::new());
let event_queue = Arc::new(EventQueue::new(Arc::clone(&store), Arc::clone(&logger)));
assert_eq!(event_queue.next_event(), None);
Expand Down Expand Up @@ -1675,28 +1681,28 @@ mod tests {
let mut delayed_enqueue = false;

for _ in 0..25 {
event_queue.add_event(expected_event.clone()).unwrap();
event_queue.add_event(expected_event.clone()).await.unwrap();
enqueued_events.fetch_add(1, Ordering::SeqCst);
}

loop {
tokio::select! {
_ = tokio::time::sleep(Duration::from_millis(10)), if !delayed_enqueue => {
event_queue.add_event(expected_event.clone()).unwrap();
event_queue.add_event(expected_event.clone()).await.unwrap();
enqueued_events.fetch_add(1, Ordering::SeqCst);
delayed_enqueue = true;
}
e = event_queue.next_event_async() => {
assert_eq!(e, expected_event);
event_queue.event_handled().unwrap();
event_queue.event_handled().await.unwrap();
received_events.fetch_add(1, Ordering::SeqCst);

event_queue.add_event(expected_event.clone()).unwrap();
event_queue.add_event(expected_event.clone()).await.unwrap();
enqueued_events.fetch_add(1, Ordering::SeqCst);
}
e = event_queue.next_event_async() => {
assert_eq!(e, expected_event);
event_queue.event_handled().unwrap();
event_queue.event_handled().await.unwrap();
received_events.fetch_add(1, Ordering::SeqCst);
}
}
Expand Down
Loading