forked from sigp/lighthouse
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Impl
oneshot_broadcast
for committee promises (sigp#3595)
## Issue Addressed NA ## Proposed Changes Fixes an issue introduced in sigp#3574 where I erroneously assumed that a `crossbeam_channel` multiple receiver queue was a *broadcast* queue. This is incorrect, each message will be received by *only one* receiver. The effect of this mistake is these logs: ``` Sep 20 06:56:17.001 INFO Synced slot: 4736079, block: 0xaa8a…180d, epoch: 148002, finalized_epoch: 148000, finalized_root: 0x2775…47f2, exec_hash: 0x2ca5…ffde (verified), peers: 6, service: slot_notifier Sep 20 06:56:23.237 ERRO Unable to validate attestation error: CommitteeCacheWait(RecvError), peer_id: 16Uiu2HAm2Jnnj8868tb7hCta1rmkXUf5YjqUH1YPj35DCwNyeEzs, type: "aggregated", slot: Slot(4736047), beacon_block_root: 0x88d318534b1010e0ebd79aed60b6b6da1d70357d72b271c01adf55c2b46206c1 ``` ## Additional Info NA
- Loading branch information
1 parent
788e230
commit 44228c4
Showing
8 changed files
with
218 additions
and
19 deletions.
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,9 @@ | ||
[package] | ||
name = "oneshot_broadcast" | ||
version = "0.1.0" | ||
edition = "2021" | ||
|
||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html | ||
|
||
[dependencies] | ||
parking_lot = "0.12.0" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,188 @@ | ||
//! Provides a single-sender, multiple receiver one-shot channel where any message sent will be | ||
//! received by all senders. | ||
//! | ||
//! This implementation may not be blazingly fast but it should be simple enough to be reliable. | ||
use parking_lot::{Condvar, Mutex}; | ||
use std::sync::{Arc, Weak}; | ||
|
||
#[derive(Copy, Clone, Debug, PartialEq)] | ||
pub enum Error { | ||
SenderDropped, | ||
} | ||
|
||
enum Future<T> { | ||
/// The future is ready and the item may be consumed. | ||
Ready(T), | ||
/// Future is not ready. The contained `Weak` is a reference to the `Sender` that may be used to | ||
/// detect when the channel is disconnected. | ||
NotReady(Weak<()>), | ||
} | ||
|
||
struct MutexCondvar<T> { | ||
mutex: Mutex<Future<T>>, | ||
condvar: Condvar, | ||
} | ||
|
||
/// The sending pair of the `oneshot` channel. | ||
pub struct Sender<T>(Arc<MutexCondvar<T>>, Option<Arc<()>>); | ||
|
||
impl<T> Sender<T> { | ||
/// Send a message, consuming `self` and delivering the message to *all* receivers. | ||
pub fn send(self, item: T) { | ||
*self.0.mutex.lock() = Future::Ready(item); | ||
// Condvar notification will be handled by the `Drop` implementation. | ||
} | ||
} | ||
|
||
impl<T> Drop for Sender<T> { | ||
/// Drop the `Arc` and notify all receivers so they can't upgrade their `Weak`s and know that | ||
/// the sender has been dropped. | ||
fn drop(&mut self) { | ||
self.1 = None; | ||
self.0.condvar.notify_all(); | ||
} | ||
} | ||
|
||
/// The receiving pair of the `oneshot` channel. Always receives the message sent by the `Sender` | ||
/// (if any). | ||
#[derive(Clone)] | ||
pub struct Receiver<T: Clone>(Arc<MutexCondvar<T>>); | ||
|
||
impl<T: Clone> Receiver<T> { | ||
/// Check to see if there is a message to be read *without* blocking/waiting. | ||
/// | ||
/// ## Note | ||
/// | ||
/// This method will technically perform *some* blocking to access a `Mutex`. It is non-blocking | ||
/// in the sense that it won't block until a message is received (i.e., it may return `Ok(None)` | ||
/// if no message has been sent yet). | ||
pub fn try_recv(&self) -> Result<Option<T>, Error> { | ||
match &*self.0.mutex.lock() { | ||
Future::Ready(item) => Ok(Some(item.clone())), | ||
Future::NotReady(weak) if weak.upgrade().is_some() => Ok(None), | ||
Future::NotReady(_) => Err(Error::SenderDropped), | ||
} | ||
} | ||
|
||
/// Check to see if there is a message to be read whilst blocking/waiting until a message is | ||
/// sent or the `Sender` is dropped. | ||
pub fn recv(self) -> Result<T, Error> { | ||
let mut lock = self.0.mutex.lock(); | ||
loop { | ||
match &*lock { | ||
Future::Ready(item) => return Ok(item.clone()), | ||
Future::NotReady(weak) if weak.upgrade().is_some() => { | ||
self.0.condvar.wait(&mut lock) | ||
} | ||
Future::NotReady(_) => return Err(Error::SenderDropped), | ||
} | ||
} | ||
} | ||
} | ||
|
||
/// A single-sender, multiple-receiver broadcast channel. | ||
/// | ||
/// The sender may send *only one* message which will be received by *all* receivers. | ||
pub fn oneshot<T: Clone>() -> (Sender<T>, Receiver<T>) { | ||
let sender_ref = Arc::new(()); | ||
let mutex_condvar = Arc::new(MutexCondvar { | ||
mutex: Mutex::new(Future::NotReady(Arc::downgrade(&sender_ref))), | ||
condvar: Condvar::new(), | ||
}); | ||
let receiver = Receiver(mutex_condvar.clone()); | ||
let sender = Sender(mutex_condvar, Some(sender_ref)); | ||
(sender, receiver) | ||
} | ||
|
||
#[cfg(test)] | ||
mod tests { | ||
use super::*; | ||
use std::thread; | ||
use std::time::Duration; | ||
|
||
#[test] | ||
fn single_thread_try_recv() { | ||
let (sender, receiver) = oneshot(); | ||
assert_eq!(receiver.try_recv(), Ok(None)); | ||
sender.send(42); | ||
assert_eq!(receiver.try_recv(), Ok(Some(42))); | ||
} | ||
|
||
#[test] | ||
fn single_thread_try_recv_no_message() { | ||
let (sender, receiver) = oneshot::<u8>(); | ||
assert_eq!(receiver.try_recv(), Ok(None)); | ||
drop(sender); | ||
assert_eq!(receiver.try_recv(), Err(Error::SenderDropped)); | ||
} | ||
|
||
#[test] | ||
fn single_thread_recv() { | ||
let (sender, receiver) = oneshot(); | ||
assert_eq!(receiver.try_recv(), Ok(None)); | ||
sender.send(42); | ||
assert_eq!(receiver.recv(), Ok(42)); | ||
} | ||
|
||
#[test] | ||
fn single_thread_recv_no_message() { | ||
let (sender, receiver) = oneshot::<u8>(); | ||
assert_eq!(receiver.try_recv(), Ok(None)); | ||
drop(sender); | ||
assert_eq!(receiver.recv(), Err(Error::SenderDropped)); | ||
} | ||
|
||
#[test] | ||
fn two_threads_message_sent() { | ||
let (sender, receiver) = oneshot(); | ||
|
||
let handle = thread::spawn(|| receiver.recv().unwrap()); | ||
|
||
sender.send(42); | ||
assert_eq!(handle.join().unwrap(), 42); | ||
} | ||
|
||
#[test] | ||
fn three_threads_message_set() { | ||
let (sender, receiver) = oneshot(); | ||
|
||
let receiver_a = receiver.clone(); | ||
let handle_a = thread::spawn(|| receiver_a.recv().unwrap()); | ||
let handle_b = thread::spawn(|| receiver.recv().unwrap()); | ||
|
||
sender.send(42); | ||
assert_eq!(handle_a.join().unwrap(), 42); | ||
assert_eq!(handle_b.join().unwrap(), 42); | ||
} | ||
|
||
#[test] | ||
fn three_threads_sender_dropped() { | ||
let (sender, receiver) = oneshot::<u8>(); | ||
|
||
let receiver_a = receiver.clone(); | ||
let handle_a = thread::spawn(|| receiver_a.recv()); | ||
let handle_b = thread::spawn(|| receiver.recv()); | ||
|
||
drop(sender); | ||
assert_eq!(handle_a.join().unwrap(), Err(Error::SenderDropped)); | ||
assert_eq!(handle_b.join().unwrap(), Err(Error::SenderDropped)); | ||
} | ||
|
||
#[test] | ||
fn sender_dropped_after_recv() { | ||
let (sender_a, receiver_a) = oneshot(); | ||
let (sender_b, receiver_b) = oneshot::<u8>(); | ||
|
||
let handle_0 = thread::spawn(|| { | ||
sender_a.send(1); | ||
receiver_b.recv() | ||
}); | ||
|
||
assert_eq!(receiver_a.recv(), Ok(1)); | ||
// This is a slightly hacky sleep that assumes that the thread has had enough time after | ||
// sending down `sender_a` to start listening to `receiver_b`. | ||
thread::sleep(Duration::from_secs(1)); | ||
drop(sender_b); | ||
assert_eq!(handle_0.join().unwrap(), Err(Error::SenderDropped)) | ||
} | ||
} |