Skip to content

Commit

Permalink
f Use return non-Arc'ed Event afterall
Browse files Browse the repository at this point in the history
  • Loading branch information
tnull committed Dec 13, 2022
1 parent 417eb55 commit 3b6ded0
Showing 1 changed file with 11 additions and 11 deletions.
22 changes: 11 additions & 11 deletions src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ pub(crate) struct EventQueue<K: Deref>
where
K::Target: KVStorePersister,
{
queue: Mutex<VecDeque<Arc<Event>>>,
queue: Mutex<VecDeque<Event>>,
notifier: Condvar,
persister: K,
}
Expand All @@ -146,26 +146,26 @@ where
K::Target: KVStorePersister,
{
pub(crate) fn new(persister: K) -> Self {
let queue: Mutex<VecDeque<Arc<Event>>> = Mutex::new(VecDeque::new());
let queue: Mutex<VecDeque<Event>> = Mutex::new(VecDeque::new());
let notifier = Condvar::new();
Self { queue, notifier, persister }
}

pub(crate) fn add_event(&self, event: Event) -> Result<(), Error> {
{
let mut locked_queue = self.queue.lock().unwrap();
locked_queue.push_back(Arc::new(event));
locked_queue.push_back(event);
self.persist_queue(&*locked_queue)?;
}

self.notifier.notify_one();
Ok(())
}

pub(crate) fn next_event(&self) -> Arc<Event> {
pub(crate) fn next_event(&self) -> Event {
let locked_queue =
self.notifier.wait_while(self.queue.lock().unwrap(), |queue| queue.is_empty()).unwrap();
Arc::clone(&locked_queue.front().unwrap())
locked_queue.front().unwrap().clone()
}

pub(crate) fn event_handled(&self) -> Result<(), Error> {
Expand All @@ -178,7 +178,7 @@ where
Ok(())
}

fn persist_queue(&self, locked_queue: &VecDeque<Arc<Event>>) -> Result<(), Error> {
fn persist_queue(&self, locked_queue: &VecDeque<Event>) -> Result<(), Error> {
self.persister
.persist(EVENTS_PERSISTENCE_KEY, &EventQueueSerWrapper(locked_queue))
.map_err(|_| Error::PersistenceFailed)?;
Expand All @@ -195,13 +195,13 @@ where
reader: &mut R, persister: K,
) -> Result<Self, lightning::ln::msgs::DecodeError> {
let read_queue: EventQueueDeserWrapper = Readable::read(reader)?;
let queue: Mutex<VecDeque<Arc<Event>>> = Mutex::new(read_queue.0);
let queue: Mutex<VecDeque<Event>> = Mutex::new(read_queue.0);
let notifier = Condvar::new();
Ok(Self { queue, notifier, persister })
}
}

struct EventQueueDeserWrapper(VecDeque<Arc<Event>>);
struct EventQueueDeserWrapper(VecDeque<Event>);

impl Readable for EventQueueDeserWrapper {
fn read<R: lightning::io::Read>(
Expand All @@ -210,13 +210,13 @@ impl Readable for EventQueueDeserWrapper {
let len: u16 = Readable::read(reader)?;
let mut queue = VecDeque::with_capacity(len as usize);
for _ in 0..len {
queue.push_back(Arc::new(Readable::read(reader)?));
queue.push_back(Readable::read(reader)?);
}
Ok(Self(queue))
}
}

struct EventQueueSerWrapper<'a>(&'a VecDeque<Arc<Event>>);
struct EventQueueSerWrapper<'a>(&'a VecDeque<Event>);

impl Writeable for EventQueueSerWrapper<'_> {
fn write<W: Writer>(&self, writer: &mut W) -> Result<(), lightning::io::Error> {
Expand Down Expand Up @@ -594,7 +594,7 @@ mod tests {

// Check we get the expected event and that it is returned until we mark it handled.
for _ in 0..5 {
assert_eq!(*event_queue.next_event(), expected_event);
assert_eq!(event_queue.next_event(), expected_event);
assert_eq!(false, test_persister.get_and_clear_pending_persist());
}

Expand Down

0 comments on commit 3b6ded0

Please sign in to comment.