From ac0f1c61035aecef4b49ecb4c6b7c4e8d2fbae2c Mon Sep 17 00:00:00 2001 From: sydhds Date: Wed, 29 Mar 2023 19:24:45 +0200 Subject: [PATCH] Add cleanup in DenunciationPool && DenunciationFactory --- Cargo.lock | 1 + massa-factory-exports/src/config.rs | 6 ++ .../src/test_exports/config.rs | 2 + .../src/denunciation_factory.rs | 78 +++++++++++------- massa-factory-worker/src/tests/scenarios.rs | 15 ++-- massa-factory-worker/src/tests/tools.rs | 17 +++- massa-models/src/config/constants.rs | 7 ++ massa-models/src/denunciation.rs | 34 ++++++++ massa-node/src/main.rs | 8 +- massa-pool-exports/Cargo.toml | 4 +- massa-pool-exports/src/config.rs | 9 +++ massa-pool-exports/src/controller_traits.rs | 3 + massa-pool-exports/src/test_exports/config.rs | 9 ++- massa-pool-exports/src/test_exports/mock.rs | 37 ++++++--- massa-pool-worker/Cargo.toml | 1 + massa-pool-worker/src/controller_impl.rs | 46 ++++++++++- massa-pool-worker/src/denunciation_pool.rs | 79 ++++++++++++++++--- massa-pool-worker/src/worker.rs | 54 ++++++++++++- massa-storage/src/denunciation_indexes.rs | 1 + 19 files changed, 345 insertions(+), 66 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 3864af05c07..6be01072e85 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2828,6 +2828,7 @@ dependencies = [ "massa_pool_exports", "massa_signature", "massa_storage", + "massa_time", "num", "parking_lot", "tokio", diff --git a/massa-factory-exports/src/config.rs b/massa-factory-exports/src/config.rs index d1a0ecb769b..c3aff957212 100644 --- a/massa-factory-exports/src/config.rs +++ b/massa-factory-exports/src/config.rs @@ -27,4 +27,10 @@ pub struct FactoryConfig { /// maximum number of operation ids in block pub max_operations_per_block: u32, + + /// cycle duration in periods + pub periods_per_cycle: u64, + + /// denunciation expiration as cycle delta + pub denunciation_expire_cycle_delta: u64, } diff --git a/massa-factory-exports/src/test_exports/config.rs b/massa-factory-exports/src/test_exports/config.rs index a1d9ff34549..1ad558f10e8 100644 --- a/massa-factory-exports/src/test_exports/config.rs +++ b/massa-factory-exports/src/test_exports/config.rs @@ -14,6 +14,8 @@ impl Default for FactoryConfig { max_block_size: MAX_BLOCK_SIZE as u64, max_block_gas: MAX_GAS_PER_BLOCK, max_operations_per_block: MAX_OPERATIONS_PER_BLOCK, + periods_per_cycle: PERIODS_PER_CYCLE, + denunciation_expire_cycle_delta: DENUNCIATION_EXPIRE_CYCLE_DELTA, } } } diff --git a/massa-factory-worker/src/denunciation_factory.rs b/massa-factory-worker/src/denunciation_factory.rs index 6b7d8ff4053..573ad36d472 100644 --- a/massa-factory-worker/src/denunciation_factory.rs +++ b/massa-factory-worker/src/denunciation_factory.rs @@ -1,5 +1,5 @@ use std::collections::hash_map::Entry; -use std::collections::{HashMap, HashSet}; +use std::collections::HashMap; use std::thread; use crossbeam_channel::{select, Receiver}; @@ -10,6 +10,8 @@ use massa_models::block_header::SecuredHeader; use massa_models::denunciation::Denunciation; use massa_models::endorsement::SecureShareEndorsement; use massa_models::slot::Slot; +use massa_models::timeslots::get_closest_slot_to_timestamp; +use massa_time::MassaTime; // TODO: rework these values const DENUNCIATION_FACTORY_ENDORSEMENT_CACHE_MAX_LEN: usize = 4096; @@ -29,14 +31,12 @@ pub(crate) struct DenunciationFactoryWorker { /// Internal storage for endorsement - /// store at max 1 endorsement per entry, as soon as we have 2 we produce a Denunciation endorsements_by_slot_index: HashMap<(Slot, u32), Vec>, - /// Cache to avoid processing several time the same endorsement denunciation - seen_endorsement_denunciation: HashSet<(Slot, u32)>, + // internal for block header (or secured header) /// Internal storage for endorsement - /// store at max 1 endorsement per entry, as soon as we have 2 we produce a Denunciation + // FIXME: Store per 'Slot' then per 'PubKey' block_header_by_slot: HashMap>, - /// Cache to avoid processing several time the same endorsement denunciation - seen_block_header_denunciation: HashSet, } impl DenunciationFactoryWorker { @@ -59,9 +59,7 @@ impl DenunciationFactoryWorker { consensus_receiver, endorsement_pool_receiver, endorsements_by_slot_index: Default::default(), - seen_endorsement_denunciation: Default::default(), block_header_by_slot: Default::default(), - seen_block_header_denunciation: Default::default(), }; factory.run(); }) @@ -71,22 +69,10 @@ impl DenunciationFactoryWorker { /// Process new secured header (~ block header) fn process_new_secured_header(&mut self, secured_header: SecuredHeader) { let key = secured_header.content.slot; - if self.seen_block_header_denunciation.contains(&key) { - warn!( - "Denunciation factory process a block header that have already been denounced: {}", - secured_header - ); - return; - } - // TODO: need 2 separate constants here? - if self.seen_block_header_denunciation.len() - > DENUNCIATION_FACTORY_BLOCK_HEADER_CACHE_MAX_LEN - || self.block_header_by_slot.len() > DENUNCIATION_FACTORY_BLOCK_HEADER_CACHE_MAX_LEN - { + if self.block_header_by_slot.len() > DENUNCIATION_FACTORY_BLOCK_HEADER_CACHE_MAX_LEN { warn!( - "Denunciation factory cannot process - cache full: {}, {}", - self.seen_block_header_denunciation.len(), + "Denunciation factory cannot process - cache full: {}", self.block_header_by_slot.len() ); return; @@ -130,6 +116,8 @@ impl DenunciationFactoryWorker { de_storage.store_denunciation(denunciation); self.channels.pool.add_denunciations(de_storage); } + + self.cleanup_cache(); } /// Process new secure share endorsement (~ endorsement) @@ -141,7 +129,7 @@ impl DenunciationFactoryWorker { secure_share_endorsement.content.slot, secure_share_endorsement.content.index, ); - if self.seen_endorsement_denunciation.contains(&key) { + if self.endorsements_by_slot_index.contains_key(&key) { warn!( "Denunciation factory process an endorsement that have already been denounced: {}", secure_share_endorsement @@ -149,13 +137,9 @@ impl DenunciationFactoryWorker { return; } - if self.seen_endorsement_denunciation.len() > DENUNCIATION_FACTORY_ENDORSEMENT_CACHE_MAX_LEN - || self.endorsements_by_slot_index.len() - > DENUNCIATION_FACTORY_ENDORSEMENT_CACHE_MAX_LEN - { + if self.endorsements_by_slot_index.len() > DENUNCIATION_FACTORY_ENDORSEMENT_CACHE_MAX_LEN { warn!( - "Denunciation factory cannot process - cache full: {}, {}", - self.seen_endorsement_denunciation.len(), + "Denunciation factory cannot process - cache full: {}", self.endorsements_by_slot_index.len() ); return; @@ -194,7 +178,45 @@ impl DenunciationFactoryWorker { "Created a new endorsement denunciation : {:?}", denunciation ); + + let mut de_storage = self.channels.storage.clone_without_refs(); + de_storage.store_denunciation(denunciation); + self.channels.pool.add_denunciations(de_storage); } + + self.cleanup_cache(); + } + + fn cleanup_cache(&mut self) { + let now = MassaTime::now().expect("could not get current time"); + + // get closest slot according to the current absolute time + let slot_now = get_closest_slot_to_timestamp( + self.cfg.thread_count, + self.cfg.t0, + self.cfg.genesis_timestamp, + now, + ); + + self.endorsements_by_slot_index.retain(|(slot, _index), _| { + Denunciation::is_expired( + slot, + &slot_now, + self.channels.pool.get_final_cs_periods(), + self.cfg.periods_per_cycle, + self.cfg.denunciation_expire_cycle_delta, + ) + }); + + self.block_header_by_slot.retain(|slot, _| { + Denunciation::is_expired( + slot, + &slot_now, + self.channels.pool.get_final_cs_periods(), + self.cfg.periods_per_cycle, + self.cfg.denunciation_expire_cycle_delta, + ) + }); } /// main run loop of the denunciation creator thread diff --git a/massa-factory-worker/src/tests/scenarios.rs b/massa-factory-worker/src/tests/scenarios.rs index 9135a0fe6ad..c5f0dd721ff 100644 --- a/massa-factory-worker/src/tests/scenarios.rs +++ b/massa-factory-worker/src/tests/scenarios.rs @@ -2,16 +2,18 @@ use super::TestFactory; use massa_hash::Hash; use massa_models::block_header::{BlockHeader, BlockHeaderSerializer, SecuredHeader}; use massa_models::block_id::BlockId; -use massa_models::config::THREAD_COUNT; +use massa_models::config::{T0, THREAD_COUNT}; use massa_models::denunciation::{Denunciation, DenunciationId}; use massa_models::endorsement::{Endorsement, EndorsementSerializerLW}; use massa_models::slot::Slot; +use massa_models::timeslots::get_closest_slot_to_timestamp; use massa_models::{ amount::Amount, operation::{Operation, OperationSerializer, OperationType}, secure_share::SecureShareContent, }; use massa_signature::KeyPair; +use massa_time::MassaTime; use std::str::FromStr; use std::time::Duration; @@ -74,11 +76,13 @@ fn basic_creation_with_multiple_operations() { /// Send 2 block headers and check if a Denunciation op is in storage #[test] -#[ignore] fn test_denunciation_factory_block_header_denunciation() { let keypair = KeyPair::generate(); - let slot = Slot::new(2, 1); + let now = MassaTime::now().expect("could not get current time"); + // get closest slot according to the current absolute time + let slot = get_closest_slot_to_timestamp(THREAD_COUNT, T0, now, now); + let parents: Vec = (0..THREAD_COUNT) .map(|i| BlockId(Hash::compute_from(&[i]))) .collect(); @@ -146,7 +150,8 @@ fn test_denunciation_factory_block_header_denunciation() { let de_indexes = test_factory.storage.read_denunciations(); assert_eq!(de_indexes.get(&denunciation_id), Some(&denunciation)); - drop(de_indexes); // release RwLockReadGuard - // stop everything + // release RwLockReadGuard + drop(de_indexes); + // stop everything drop(test_factory); } diff --git a/massa-factory-worker/src/tests/tools.rs b/massa-factory-worker/src/tests/tools.rs index 87fab3ed49b..9234d286c1c 100644 --- a/massa-factory-worker/src/tests/tools.rs +++ b/massa-factory-worker/src/tests/tools.rs @@ -37,10 +37,11 @@ use massa_wallet::test_exports::create_test_wallet; /// The factory will ask that to the the pool, consensus and factory and then will send the block to the consensus. /// You can use the method `new` to build all the mocks and make the connections /// Then you can use the method `get_next_created_block` that will manage the answers from the mock to the factory depending on the parameters you gave. +#[allow(dead_code)] pub struct TestFactory { consensus_event_receiver: ConsensusEventReceiver, pool_receiver: PoolEventReceiver, - selector_receiver: Receiver, + selector_receiver: Option>, factory_config: FactoryConfig, factory_manager: Box, genesis_blocks: Vec<(BlockId, u64)>, @@ -103,7 +104,7 @@ impl TestFactory { TestFactory { consensus_event_receiver, pool_receiver, - selector_receiver, + selector_receiver: Some(selector_receiver), factory_config, factory_manager, genesis_blocks, @@ -136,6 +137,8 @@ impl TestFactory { loop { match self .selector_receiver + .as_ref() + .unwrap() .recv_timeout(Duration::from_millis(100)) { Ok(MockSelectorControllerMessage::GetProducer { @@ -235,6 +238,16 @@ impl TestFactory { impl Drop for TestFactory { fn drop(&mut self) { + // Need this otherwise factory_manager is stuck while waiting for block & endorsement factory + // to join + // For instance, block factory is waiting for selector.get_producer(...) + // endorsement factory is waiting for selector.get_selection(...) + // Note: that this will make the 2 threads panic + // TODO: find a better way to resolve this + if let Some(selector_receiver) = self.selector_receiver.take() { + drop(selector_receiver); + } + self.factory_manager.stop(); } } diff --git a/massa-models/src/config/constants.rs b/massa-models/src/config/constants.rs index 905e6e47a25..fbd0ded622f 100644 --- a/massa-models/src/config/constants.rs +++ b/massa-models/src/config/constants.rs @@ -226,6 +226,13 @@ pub const NETWORK_NODE_EVENT_CHANNEL_SIZE: usize = 10_000; /// Threshold to accept a new versioning pub const VERSIONING_THRESHOLD_TRANSITION_ACCEPTED: Amount = Amount::from_mantissa_scale(75, 0); +// +// Constants for denunciation factory +// + +/// denunciation expiration delta (in cycle count) +pub const DENUNCIATION_EXPIRE_CYCLE_DELTA: u64 = 3; + // Some checks at compile time that should not be ignored! #[allow(clippy::assertions_on_constants)] const _: () = { diff --git a/massa-models/src/denunciation.rs b/massa-models/src/denunciation.rs index bee336ef663..8a7cf499f15 100644 --- a/massa-models/src/denunciation.rs +++ b/massa-models/src/denunciation.rs @@ -265,6 +265,40 @@ impl Denunciation { && public_key.verify_signature(&hash_1, &signature_1).is_ok() && public_key.verify_signature(&hash_2, &signature_2).is_ok()) } + + /// Get Denunciation slot ref + pub fn get_slot(&self) -> &Slot { + match self { + Denunciation::Endorsement(endo_de) => &endo_de.slot, + Denunciation::BlockHeader(blkh_de) => &blkh_de.slot, + } + } + + /// For a given slot (and given the slot at now()), check if it can be denounced + /// Can be used to check if block header | endorsement is not too old (at reception or too cleanup cache) + pub fn is_expired( + slot: &Slot, + slot_at_now: &Slot, + last_cs_final_periods: &[u64], + periods_per_cycle: u64, + denunciation_expire_cycle_delta: u64, + ) -> bool { + // Slot is final -> cannot be Denounced anymore, it's too late! + if slot.period <= last_cs_final_periods[slot.thread as usize] { + return true; + } + + // As we need to ensure that the Denounced has some 'Deferred credits' + // we will reject Denunciation older than 3 cycle compared to the current slot + let cycle = slot.get_cycle(periods_per_cycle); + let next_cycle = slot_at_now.get_cycle(periods_per_cycle); + + if (next_cycle - cycle) > denunciation_expire_cycle_delta { + return true; + } + + false + } } /// Create a new Denunciation from 2 SecureShareEndorsement diff --git a/massa-node/src/main.rs b/massa-node/src/main.rs index c42624773a9..d859a83f7d2 100644 --- a/massa-node/src/main.rs +++ b/massa-node/src/main.rs @@ -46,7 +46,7 @@ use massa_models::config::constants::{ POS_SAVED_CYCLES, PROTOCOL_CONTROLLER_CHANNEL_SIZE, PROTOCOL_EVENT_CHANNEL_SIZE, ROLL_PRICE, T0, THREAD_COUNT, VERSION, }; -use massa_models::config::CONSENSUS_BOOTSTRAP_PART_SIZE; +use massa_models::config::{CONSENSUS_BOOTSTRAP_PART_SIZE, DENUNCIATION_EXPIRE_CYCLE_DELTA}; use massa_models::endorsement::SecureShareEndorsement; use massa_network_exports::{Establisher, NetworkConfig, NetworkManager}; use massa_network_worker::start_network_controller; @@ -370,6 +370,10 @@ async fn launch( channels_size: POOL_CONTROLLER_CHANNEL_SIZE, broadcast_enabled: SETTINGS.api.enable_ws, broadcast_operations_capacity: SETTINGS.pool.broadcast_operations_capacity, + genesis_timestamp: *GENESIS_TIMESTAMP, + t0: T0, + periods_per_cycle: PERIODS_PER_CYCLE, + denunciation_expire_cycle_delta: DENUNCIATION_EXPIRE_CYCLE_DELTA, }; let pool_channels = PoolChannels { @@ -504,6 +508,8 @@ async fn launch( max_block_size: MAX_BLOCK_SIZE as u64, max_block_gas: MAX_GAS_PER_BLOCK, max_operations_per_block: MAX_OPERATIONS_PER_BLOCK, + periods_per_cycle: PERIODS_PER_CYCLE, + denunciation_expire_cycle_delta: DENUNCIATION_EXPIRE_CYCLE_DELTA, }; let factory_channels = FactoryChannels { selector: selector_controller.clone(), diff --git a/massa-pool-exports/Cargo.toml b/massa-pool-exports/Cargo.toml index a30134f2a5e..9fac2de0182 100644 --- a/massa-pool-exports/Cargo.toml +++ b/massa-pool-exports/Cargo.toml @@ -10,10 +10,10 @@ tokio = { version = "1.23", features = ["sync"] } # custom modules massa_models = { path = "../massa-models" } massa_storage = { path = "../massa-storage" } -massa_time = { path = "../massa-time", optional = true } +massa_time = { path = "../massa-time"} [dev-dependencies] # for more information on what are the following features used for, see the cargo.toml at workspace level [features] -testing = [ "dep:massa_time" ] +testing = [] diff --git a/massa-pool-exports/src/config.rs b/massa-pool-exports/src/config.rs index ff67911e9cb..1405f193773 100644 --- a/massa-pool-exports/src/config.rs +++ b/massa-pool-exports/src/config.rs @@ -1,6 +1,7 @@ //! Copyright (c) 2022 MASSA LABS use massa_models::amount::Amount; +use massa_time::MassaTime; use serde::{Deserialize, Serialize}; /// Pool configuration @@ -30,4 +31,12 @@ pub struct PoolConfig { pub broadcast_enabled: bool, /// operations sender(channel) capacity pub broadcast_operations_capacity: usize, + /// genesis timestamp + pub genesis_timestamp: MassaTime, + /// period duration + pub t0: MassaTime, + /// cycle duration in periods + pub periods_per_cycle: u64, + /// denunciation expiration as cycle delta + pub denunciation_expire_cycle_delta: u64, } diff --git a/massa-pool-exports/src/controller_traits.rs b/massa-pool-exports/src/controller_traits.rs index 85c03963464..de01df45fd1 100644 --- a/massa-pool-exports/src/controller_traits.rs +++ b/massa-pool-exports/src/controller_traits.rs @@ -44,6 +44,9 @@ pub trait PoolController: Send + Sync { /// Returns a boxed clone of self. /// Useful to allow cloning `Box`. fn clone_box(&self) -> Box; + + /// Get final cs periods (updated regularly from consensus) + fn get_final_cs_periods(&self) -> &Vec; } /// Allow cloning `Box` diff --git a/massa-pool-exports/src/test_exports/config.rs b/massa-pool-exports/src/test_exports/config.rs index 65e13b29b78..6f5bc524af6 100644 --- a/massa-pool-exports/src/test_exports/config.rs +++ b/massa-pool-exports/src/test_exports/config.rs @@ -1,8 +1,9 @@ // Copyright (c) 2022 MASSA LABS use massa_models::config::{ - ENDORSEMENT_COUNT, MAX_BLOCK_SIZE, MAX_GAS_PER_BLOCK, MAX_OPERATIONS_PER_BLOCK, - OPERATION_VALIDITY_PERIODS, ROLL_PRICE, THREAD_COUNT, + DENUNCIATION_EXPIRE_CYCLE_DELTA, ENDORSEMENT_COUNT, GENESIS_TIMESTAMP, MAX_BLOCK_SIZE, + MAX_GAS_PER_BLOCK, MAX_OPERATIONS_PER_BLOCK, OPERATION_VALIDITY_PERIODS, PERIODS_PER_CYCLE, + ROLL_PRICE, T0, THREAD_COUNT, }; use crate::PoolConfig; @@ -22,6 +23,10 @@ impl Default for PoolConfig { channels_size: 1024, broadcast_enabled: false, broadcast_operations_capacity: 5000, + genesis_timestamp: *GENESIS_TIMESTAMP, + t0: T0, + periods_per_cycle: PERIODS_PER_CYCLE, + denunciation_expire_cycle_delta: DENUNCIATION_EXPIRE_CYCLE_DELTA, } } } diff --git a/massa-pool-exports/src/test_exports/mock.rs b/massa-pool-exports/src/test_exports/mock.rs index c58d9716423..fdce7357a3e 100644 --- a/massa-pool-exports/src/test_exports/mock.rs +++ b/massa-pool-exports/src/test_exports/mock.rs @@ -5,6 +5,7 @@ use std::sync::{ Arc, Mutex, }; +use massa_models::config::THREAD_COUNT; use massa_models::{ block_id::BlockId, endorsement::EndorsementId, operation::OperationId, slot::Slot, }; @@ -95,7 +96,11 @@ pub enum MockPoolControllerMessage { /// For messages with a `response_tx` field, the mock will await a response through their `response_tx` channel /// in order to simulate returning this value at the end of the call. #[derive(Clone)] -pub struct MockPoolController(Arc>>); +// pub struct MockPoolController(Arc>>); +pub struct MockPoolController { + q: Arc>>, + last_final_cs_periods: Vec, +} impl MockPoolController { /// Create a new pair (mock execution controller, mpsc receiver for emitted messages) @@ -103,7 +108,10 @@ impl MockPoolController { pub fn new_with_receiver() -> (Box, PoolEventReceiver) { let (tx, rx) = mpsc::channel(); ( - Box::new(MockPoolController(Arc::new(Mutex::new(tx)))), + Box::new(MockPoolController { + q: Arc::new(Mutex::new(tx)), + last_final_cs_periods: vec![0u64; THREAD_COUNT as usize], + }), PoolEventReceiver(rx), ) } @@ -129,7 +137,7 @@ impl PoolEventReceiver { /// See the documentation of `PoolController` for details on each function. impl PoolController for MockPoolController { fn add_endorsements(&mut self, endorsements: Storage) { - self.0 + self.q .lock() .unwrap() .send(MockPoolControllerMessage::AddEndorsements { endorsements }) @@ -137,7 +145,7 @@ impl PoolController for MockPoolController { } fn add_operations(&mut self, operations: Storage) { - self.0 + self.q .lock() .unwrap() .send(MockPoolControllerMessage::AddOperations { operations }) @@ -150,7 +158,7 @@ impl PoolController for MockPoolController { target_slot: &Slot, ) -> (Vec>, Storage) { let (response_tx, response_rx) = mpsc::channel(); - self.0 + self.q .lock() .unwrap() .send(MockPoolControllerMessage::GetBlockEndorsements { @@ -164,7 +172,7 @@ impl PoolController for MockPoolController { fn get_block_operations(&self, slot: &Slot) -> (Vec, Storage) { let (response_tx, response_rx) = mpsc::channel(); - self.0 + self.q .lock() .unwrap() .send(MockPoolControllerMessage::GetBlockOperations { @@ -177,7 +185,7 @@ impl PoolController for MockPoolController { fn get_endorsement_count(&self) -> usize { let (response_tx, response_rx) = mpsc::channel(); - self.0 + self.q .lock() .unwrap() .send(MockPoolControllerMessage::GetEndorsementCount { response_tx }) @@ -187,7 +195,7 @@ impl PoolController for MockPoolController { fn get_operation_count(&self) -> usize { let (response_tx, response_rx) = mpsc::channel(); - self.0 + self.q .lock() .unwrap() .send(MockPoolControllerMessage::GetOperationCount { response_tx }) @@ -197,7 +205,7 @@ impl PoolController for MockPoolController { fn contains_endorsements(&self, endorsements: &[EndorsementId]) -> Vec { let (response_tx, response_rx) = mpsc::channel(); - self.0 + self.q .lock() .unwrap() .send(MockPoolControllerMessage::ContainsEndorsements { @@ -210,7 +218,7 @@ impl PoolController for MockPoolController { fn contains_operations(&self, operations: &[OperationId]) -> Vec { let (response_tx, response_rx) = mpsc::channel(); - self.0 + self.q .lock() .unwrap() .send(MockPoolControllerMessage::ContainsOperations { @@ -222,7 +230,8 @@ impl PoolController for MockPoolController { } fn notify_final_cs_periods(&mut self, final_cs_periods: &[u64]) { - self.0 + self.last_final_cs_periods = final_cs_periods.to_vec(); + self.q .lock() .unwrap() .send(MockPoolControllerMessage::NotifyFinalCsPeriods { @@ -236,10 +245,14 @@ impl PoolController for MockPoolController { } fn add_denunciations(&mut self, denunciations: Storage) { - self.0 + self.q .lock() .unwrap() .send(MockPoolControllerMessage::AddDenunciations { denunciations }) .unwrap(); } + + fn get_final_cs_periods(&self) -> &Vec { + &self.last_final_cs_periods + } } diff --git a/massa-pool-worker/Cargo.toml b/massa-pool-worker/Cargo.toml index 5aae0435bd3..cc4df605e86 100644 --- a/massa-pool-worker/Cargo.toml +++ b/massa-pool-worker/Cargo.toml @@ -14,6 +14,7 @@ massa_models = { path = "../massa-models" } massa_storage = { path = "../massa-storage" } massa_pool_exports = { path = "../massa-pool-exports" } massa_execution_exports = { path = "../massa-execution-exports" } +massa_time = { path = "../massa-time" } [dev-dependencies] tokio = { version = "1.23", features = ["sync"] } diff --git a/massa-pool-worker/src/controller_impl.rs b/massa-pool-worker/src/controller_impl.rs index 32151d324da..fdc68bc3902 100644 --- a/massa-pool-worker/src/controller_impl.rs +++ b/massa-pool-worker/src/controller_impl.rs @@ -42,6 +42,10 @@ pub struct PoolControllerImpl { pub(crate) operations_input_sender: SyncSender, /// Endorsement write worker command sender pub(crate) endorsements_input_sender: SyncSender, + /// Denunciation write worker command sender + pub(crate) denunciations_input_sender: SyncSender, + /// Last final periods from Consensus + pub last_cs_final_periods: Vec, } impl PoolController for PoolControllerImpl { @@ -79,6 +83,8 @@ impl PoolController for PoolControllerImpl { /// Asynchronously notify of new final consensus periods. Simply print a warning on failure. fn notify_final_cs_periods(&mut self, final_cs_periods: &[u64]) { + self.last_cs_final_periods = final_cs_periods.to_vec().clone(); + match self .operations_input_sender .try_send(Command::NotifyFinalCsPeriods(final_cs_periods.to_vec())) @@ -110,6 +116,23 @@ impl PoolController for PoolControllerImpl { } Ok(_) => {} } + + match self + .denunciations_input_sender + .try_send(Command::NotifyFinalCsPeriods(final_cs_periods.to_vec())) + { + Err(TrySendError::Disconnected(_)) => { + warn!( + "Could not notify endorsement pool of new final slots: worker is unreachable." + ); + } + Err(TrySendError::Full(_)) => { + warn!( + "Could not notify endorsement pool of new final slots: worker channel is full." + ); + } + Ok(_) => {} + } } /// get operations for block creation @@ -158,8 +181,23 @@ impl PoolController for PoolControllerImpl { /// Add denunciation to pool fn add_denunciations(&mut self, denunciations: Storage) { - let mut lck = self.denunciation_pool.write(); - lck.add_denunciation(denunciations); + match self + .denunciations_input_sender + .try_send(Command::AddItems(denunciations)) + { + Err(TrySendError::Disconnected(_)) => { + warn!("Could not add denunciations to pool: worker is unreachable."); + } + Err(TrySendError::Full(_)) => { + warn!("Could not add denunciations to pool: worker channel is full."); + } + Ok(_) => {} + } + } + + /// Get final consensus periods + fn get_final_cs_periods(&self) -> &Vec { + &self.last_cs_final_periods } } @@ -171,10 +209,14 @@ pub struct PoolManagerImpl { pub(crate) operations_thread_handle: Option>, /// Handle used to join the endorsement thread pub(crate) endorsements_thread_handle: Option>, + /// Handle used to join the denunciation thread + pub(crate) denunciations_thread_handle: Option>, /// Operations input data mpsc (used to stop the pool thread) pub(crate) operations_input_sender: SyncSender, /// Endorsements input data mpsc (used to stop the pool thread) pub(crate) endorsements_input_sender: SyncSender, + /// Denunciations input data mpsc (used to stop the pool thread) + pub(crate) denunciations_input_sender: SyncSender, } impl PoolManager for PoolManagerImpl { diff --git a/massa-pool-worker/src/denunciation_pool.rs b/massa-pool-worker/src/denunciation_pool.rs index 0d2c23984bf..203cad22856 100644 --- a/massa-pool-worker/src/denunciation_pool.rs +++ b/massa-pool-worker/src/denunciation_pool.rs @@ -1,13 +1,21 @@ -use massa_models::prehash::{CapacityAllocator, PreHashSet}; +use massa_models::denunciation::{Denunciation, DenunciationId}; +use massa_models::prehash::{CapacityAllocator, PreHashMap, PreHashSet}; +use massa_models::slot::Slot; +use massa_models::timeslots::get_closest_slot_to_timestamp; use massa_pool_exports::PoolConfig; use massa_storage::Storage; +use massa_time::MassaTime; pub struct DenunciationPool { /// configuration config: PoolConfig, /// storage storage: Storage, + /// last consensus final periods, per thread + last_cs_final_periods: Vec, + /// internal cache + denunciations_cache: PreHashMap, } impl DenunciationPool { @@ -15,6 +23,8 @@ impl DenunciationPool { Self { config, storage: storage.clone_without_refs(), + last_cs_final_periods: vec![0u64; config.thread_count as usize], + denunciations_cache: Default::default(), } } @@ -40,26 +50,39 @@ impl DenunciationPool { .collect::>(); let mut added = PreHashSet::with_capacity(denunciation_ids.len()); - let mut removed = PreHashSet::with_capacity(denunciation_ids.len()); // populate added { let de_indexes = denunciation_storage.read_denunciations(); for de_id in denunciation_ids { - let _de = de_indexes.get(&de_id).expect( + let de = de_indexes.get(&de_id).expect( "Attempting to add denunciation to pool but it is absent from given storage", ); - // TODO: do not add denunciation if expired + let now = MassaTime::now().expect("could not get current time"); + // get closest slot according to the current absolute time + let slot_now = get_closest_slot_to_timestamp( + self.config.thread_count, + self.config.t0, + self.config.genesis_timestamp, + now, + ); + if Denunciation::is_expired( + de.get_slot(), + &slot_now, + &self.last_cs_final_periods, + self.config.periods_per_cycle, + self.config.denunciation_expire_cycle_delta, + ) { + continue; + } - // TODO: check that we do not add "duplicate" + self.denunciations_cache + .insert(de_id, de.get_slot().clone()); added.insert(de_id); } } - // populate removed - // TODO: remove from self storage - // take ownership on added denunciations self.storage.extend(denunciation_storage.split_off( &Default::default(), @@ -67,9 +90,6 @@ impl DenunciationPool { &Default::default(), &added, )); - - // drop removed endorsements from storage - self.storage.drop_denunciation_refs(&removed); } // In next PR @@ -82,4 +102,41 @@ impl DenunciationPool { todo!() } */ + + pub(crate) fn notify_final_cs_periods(&mut self, final_cs_periods: &[u64]) { + // update internal final CS period counter + self.last_cs_final_periods = final_cs_periods.to_vec(); + + // remove all denunciations that are expired + let mut removed: PreHashSet = Default::default(); + + let now = MassaTime::now().expect("could not get current time"); + // get closest slot according to the current absolute time + let slot_now = get_closest_slot_to_timestamp( + self.config.thread_count, + self.config.t0, + self.config.genesis_timestamp, + now, + ); + + for (de_id, de_slot) in self.denunciations_cache.iter() { + if Denunciation::is_expired( + de_slot, + &slot_now, + &self.last_cs_final_periods, + self.config.periods_per_cycle, + self.config.denunciation_expire_cycle_delta, + ) { + removed.insert(de_id.clone()); + } + } + + // Remove from internal cache + for de_id in removed.iter() { + self.denunciations_cache.remove(de_id); + } + + // Remove from storage + self.storage.drop_denunciation_refs(&removed); + } } diff --git a/massa-pool-worker/src/worker.rs b/massa-pool-worker/src/worker.rs index 46f3d44423a..1a0c5c4ce02 100644 --- a/massa-pool-worker/src/worker.rs +++ b/massa-pool-worker/src/worker.rs @@ -108,6 +108,50 @@ impl OperationPoolThread { } } +/// Denunciation pool writer thread. +pub(crate) struct DenunciationPoolThread { + /// Command reception channel + receiver: Receiver, + /// Shared reference to the denunciation pool + denunciation_pool: Arc>, +} + +impl DenunciationPoolThread { + /// Spawns a pool writer thread, returning a join handle. + pub(crate) fn spawn( + receiver: Receiver, + denunciation_pool: Arc>, + ) -> JoinHandle<()> { + let thread_builder = thread::Builder::new().name("denunciation-pool".into()); + thread_builder + .spawn(|| { + let this = Self { + receiver, + denunciation_pool, + }; + this.run() + }) + .expect("failed to spawn thread : operation-pool") + } + + /// Run the thread. + fn run(self) { + loop { + match self.receiver.recv() { + Err(RecvError) => break, + Ok(Command::Stop) => break, + Ok(Command::AddItems(operations)) => { + self.denunciation_pool.write().add_denunciation(operations) + } + Ok(Command::NotifyFinalCsPeriods(final_cs_periods)) => self + .denunciation_pool + .write() + .notify_final_cs_periods(&final_cs_periods), + }; + } + } +} + /// Start pool manager and controller #[allow(clippy::type_complexity)] pub fn start_pool_controller( @@ -120,6 +164,8 @@ pub fn start_pool_controller( let (operations_input_sender, operations_input_receiver) = sync_channel(config.channels_size); let (endorsements_input_sender, endorsements_input_receiver) = sync_channel(config.channels_size); + let (denunciations_input_sender, denunciations_input_receiver) = + sync_channel(config.channels_size); let operation_pool = Arc::new(RwLock::new(OperationPool::init( config, storage, @@ -136,21 +182,27 @@ pub fn start_pool_controller( _config: config, operation_pool: operation_pool.clone(), endorsement_pool: endorsement_pool.clone(), - denunciation_pool: denunciation_pool, + denunciation_pool: denunciation_pool.clone(), operations_input_sender: operations_input_sender.clone(), endorsements_input_sender: endorsements_input_sender.clone(), + denunciations_input_sender: denunciations_input_sender.clone(), + last_cs_final_periods: vec![0u64; usize::from(config.thread_count)], }; let operations_thread_handle = OperationPoolThread::spawn(operations_input_receiver, operation_pool); let endorsements_thread_handle = EndorsementPoolThread::spawn(endorsements_input_receiver, endorsement_pool); + let denunciations_thread_handle = + DenunciationPoolThread::spawn(denunciations_input_receiver, denunciation_pool); let manager = PoolManagerImpl { operations_thread_handle: Some(operations_thread_handle), endorsements_thread_handle: Some(endorsements_thread_handle), + denunciations_thread_handle: Some(denunciations_thread_handle), operations_input_sender, endorsements_input_sender, + denunciations_input_sender, }; (Box::new(manager), Box::new(controller)) } diff --git a/massa-storage/src/denunciation_indexes.rs b/massa-storage/src/denunciation_indexes.rs index c67cad18a62..660d9843f04 100644 --- a/massa-storage/src/denunciation_indexes.rs +++ b/massa-storage/src/denunciation_indexes.rs @@ -13,6 +13,7 @@ impl DenunciationIndexes { /// Insert a denunciation /// Arguments: /// - denunciation: the denunciation to insert + #[allow(unused_must_use)] pub(crate) fn insert(&mut self, denunciation: Denunciation) { let denunciation_id = DenunciationId::from(&denunciation); self.denunciations.try_insert(denunciation_id, denunciation);