WIP: parallelisation, first version that compiles.
TODO: improvements and simplifications; getting rid of RwLocks no one needs.
DMDcoin#76
SurfingNerd committed Mar 23, 2022
1 parent dced45b commit 4eb3593
Showing 2 changed files with 85 additions and 19 deletions.
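
The TODO about dropping RwLocks that no one needs lines up with the commented-out sender/receiver fields further down in hbbft_message_memorium.rs: once messages travel over a channel, the dispatcher no longer has to share a locked VecDeque with its worker thread, and the worker can block on the channel instead of polling every 250 ms. A minimal, self-contained sketch of that direction (a hypothetical follow-up, not part of this commit; HbMessage here is a stand-in for honey_badger::Message<NodeId>):

    use std::sync::mpsc::{channel, Sender};
    use std::thread::JoinHandle;

    // Stand-in for honey_badger::Message<NodeId>; only Clone + Debug matter here.
    #[derive(Clone, Debug)]
    struct HbMessage(String);

    // Hands received messages to a background worker over a channel:
    // callers never take a lock, and the worker never busy-polls.
    struct HbbftMessageDispatcher {
        sender: Sender<HbMessage>,
        worker: JoinHandle<()>,
    }

    impl HbbftMessageDispatcher {
        fn new() -> Self {
            let (sender, receiver) = channel::<HbMessage>();
            // The worker owns its state outright, so no Arc<RwLock<..>> is needed.
            let worker = std::thread::spawn(move || {
                for message in receiver {
                    // Heavy bookkeeping (serialisation, writing to disk) would go here.
                    println!("archiving {:?}", message);
                }
                // The loop ends once every Sender has been dropped.
            });
            HbbftMessageDispatcher { sender, worker }
        }

        // Called from the consensus engine: clone the message and enqueue it.
        fn on_message_received(&self, message: &HbMessage) {
            // send() only fails if the worker has already shut down.
            let _ = self.sender.send(message.clone());
        }

        // Close the channel and wait for the worker to drain the queue.
        fn shutdown(self) {
            drop(self.sender);
            let _ = self.worker.join();
        }
    }

    fn main() {
        let dispatcher = HbbftMessageDispatcher::new();
        dispatcher.on_message_received(&HbMessage("agreement, epoch 1".into()));
        dispatcher.shutdown();
    }
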
19 changes: 10 additions & 9 deletions crates/ethcore/src/engines/hbbft/hbbft_engine.rs
@@ -51,6 +51,7 @@ use engines::hbbft::{
hbbft_message_memorium::HbbftMessageMemorium,
};
use std::{ops::Deref, sync::atomic::Ordering};
use engines::hbbft::hbbft_message_memorium::HbbftMessageDispatcher;

type TargetedMessage = hbbft::TargetedMessage<Message, NodeId>;

@@ -70,7 +71,7 @@ pub struct HoneyBadgerBFT {
signer: Arc<RwLock<Option<Box<dyn EngineSigner>>>>,
machine: EthereumMachine,
hbbft_state: RwLock<HbbftState>,
- hbbft_message_memorial: RwLock<HbbftMessageMemorium>,
+ hbbft_message_dispatcher: RwLock<HbbftMessageDispatcher>,
sealing: RwLock<BTreeMap<BlockNumber, Sealing>>,
params: HbbftParams,
message_counter: RwLock<usize>,
@@ -210,7 +211,7 @@ impl HoneyBadgerBFT {
signer: Arc::new(RwLock::new(None)),
machine,
hbbft_state: RwLock::new(HbbftState::new()),
- hbbft_message_memorial: RwLock::new(HbbftMessageMemorium::new()),
+ hbbft_message_dispatcher: RwLock::new(HbbftMessageDispatcher::new()),
sealing: RwLock::new(BTreeMap::new()),
params,
message_counter: RwLock::new(0),
@@ -332,8 +333,8 @@ impl HoneyBadgerBFT {
trace!(target: "consensus", "Received message of idx {} {:?} from {}", msg_idx, message, sender_id);

// store received messages here.
- self.hbbft_message_memorial
- .write()
+ self.hbbft_message_dispatcher
+ .write()
.on_message_received(&message);

let step = self.hbbft_state.write().process_message(
@@ -357,9 +358,9 @@
block_num: BlockNumber,
) -> Result<(), EngineError> {
// store received messages here.
- self.hbbft_message_memorial
- .write()
- .on_sealing_message_received(&message, block_num);
+ // self.hbbft_message_memorial
+ // .write()
+ // .on_sealing_message_received(&message, block_num);

let client = self.client_arc().ok_or(EngineError::RequiresClient)?;
trace!(target: "consensus", "Received sealing message for block {} from {} : {:?} ",block_num, sender_id, message);
@@ -966,8 +967,8 @@ impl Engine<EthereumMachine> for HoneyBadgerBFT {
}
}

- self.hbbft_message_memorial
- .write()
+ self.hbbft_message_dispatcher
+ .write()
.free_memory(block.header.number());

Ok(())
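Note that the sealing path above only comments out the old memorial call instead of routing it through the new dispatcher. If the memorium keeps its on_sealing_message_received bookkeeping, a forwarding method on the new HbbftMessageDispatcher (defined in the next file) could restore that recording later. A sketch under that assumption, with SealingMessage standing in for the engine's actual sealing message type:

    // Hypothetical follow-up, not part of this commit.
    pub fn on_sealing_message_received(&mut self, message: &SealingMessage, block_num: BlockNumber) {
        // Forward to the shared memorium, mirroring on_message_received.
        self.memorial
            .write()
            .on_sealing_message_received(message, block_num);
    }
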
85 changes: 75 additions & 10 deletions crates/ethcore/src/engines/hbbft/hbbft_message_memorium.rs
@@ -14,6 +14,7 @@ use std::{
io::Write,
path::PathBuf,
};
use std::thread::Thread;

pub type HbMessage = honey_badger::Message<NodeId>;

@@ -49,12 +50,72 @@ pub(crate) struct HbbftMessageMemorium {

last_block_deleted_from_disk: u64,

//mutex: Mutex<u64>
dispatched_messages: RwLock<VecDeque<HbMessage>>,
//thread: Option<std::thread::JoinHandle<Self>>,

//sender: std::sync::mpsc::Sender<HbMessage>,
//receiver: std::sync::mpsc::Receiver<HbMessage>,

//memorial: std::sync::Arc<RwLock<HbbftMessageMemorium>>

}

pub(crate) struct HbbftMessageDispatcher {
// dispatched_messages: RwLock<VecDeque<HbMessage>>,
thread: Option<std::thread::JoinHandle<Self>>,
memorial: std::sync::Arc<RwLock<HbbftMessageMemorium>>
}


impl HbbftMessageDispatcher {
pub fn new() -> Self {
HbbftMessageDispatcher {

thread: None,
memorial: std::sync::Arc::new(RwLock::new(HbbftMessageMemorium::new()))
}
}

pub fn on_message_received(&mut self, message: &HbMessage) {
//performance: dispatcher pattern + multithreading could improve performance a lot.

let mut memorial = self.memorial.write();

let mut lock = memorial.dispatched_messages.write();
lock.push_back(message.clone());
//self.sender.send(message.clone());

if self.thread.is_none() {
// let mut memo = self;
// let mut arc = std::sync::Arc::new(&self);
let arc_clone = self.memorial.clone();
self.thread = Some(std::thread::spawn(move || {
loop {

let mut work_result = false;
{
let mut memorial = arc_clone.write();
work_result = memorial.work_message();
}

if !work_result {
std::thread::sleep(std::time::Duration::from_millis(250));
}
}
}));
}
}

pub fn free_memory(&mut self, _current_block: u64) {
// TODO: have the memorium free the memory of ancient blocks.
}
}



impl HbbftMessageMemorium {
pub fn new() -> Self {

HbbftMessageMemorium {
// signature_shares: BTreeMap::new(),
// decryption_shares: BTreeMap::new(),
@@ -63,6 +124,9 @@ impl HbbftMessageMemorium {
config_blocks_to_keep_on_disk: 200,
last_block_deleted_from_disk: 0,
dispatched_messages: RwLock::new(VecDeque::new()),
// thread: None,
// sender,
// receiver
}
}

@@ -136,21 +200,16 @@ impl HbbftMessageMemorium {
}
}

- pub fn on_message_received(&mut self, message: &HbMessage) {
- //performance: dispatcher pattern + multithreading could improve performance a lot.
-
- let mut lock = self.dispatched_messages.write();
- lock.push_back(message.clone());
- }
+ fn work_message(&mut self) -> bool {

- pub fn work_message(&mut self) {
let mut message_option: Option<HbMessage> = None;

{
// scope the lock so it is dropped quickly.
let mut lock = self.dispatched_messages.write();
message_option = lock.pop_front();
- lock.len();
+ //lock.len();
}

if let Some(message) = message_option {
@@ -161,11 +220,17 @@
self.on_message_string_received(json_string, epoch);
}
Err(e) => {
// Being unable to interpret a message can have consequences:
// misbehavior might go unreported,
// or misbehavior might be reported where there was none.
error!(target: "consensus", "could not create json: {:?}", e);
}
}
return true;
}

return false;

// let content = message.content();

//match content {
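
Two of the commit's TODOs can be read straight from this file: the worker thread is spawned lazily inside on_message_received, which is the only reason that method needs &mut self (and therefore a write lock on the engine side), and free_memory does not yet forward anything to the memorium. A rough sketch of one possible simplification, spawning the worker eagerly in new() and forwarding free_memory; this is hypothetical and assumes HbbftMessageMemorium keeps a free_memory(block_number) like the one the engine called before this change:

    impl HbbftMessageDispatcher {
        pub fn new() -> Self {
            let memorial = std::sync::Arc::new(RwLock::new(HbbftMessageMemorium::new()));
            let worker_memorial = memorial.clone();
            // Eager spawn: the worker lives as long as the dispatcher, so the
            // message handlers below never need &mut self.
            // Self only because the existing field is Option<JoinHandle<Self>>;
            // JoinHandle<()> would be simpler.
            let thread: Option<std::thread::JoinHandle<Self>> =
                Some(std::thread::spawn(move || loop {
                    let did_work = worker_memorial.write().work_message();
                    if !did_work {
                        std::thread::sleep(std::time::Duration::from_millis(250));
                    }
                }));
            HbbftMessageDispatcher { thread, memorial }
        }

        pub fn on_message_received(&self, message: &HbMessage) {
            // Only the queue inside the memorium is mutated; &self suffices.
            self.memorial
                .write()
                .dispatched_messages
                .write()
                .push_back(message.clone());
        }

        pub fn free_memory(&self, current_block: u64) {
            // Let the memorium drop bookkeeping for blocks that are now ancient.
            self.memorial.write().free_memory(current_block);
        }
    }
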
