Skip to content

Commit

Permalink
Add cleanup in DenunciationPool && DenunciationFactory
Browse files Browse the repository at this point in the history
  • Loading branch information
sydhds committed Mar 30, 2023
1 parent c7cf0ab commit ac0f1c6
Show file tree
Hide file tree
Showing 19 changed files with 345 additions and 66 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions massa-factory-exports/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,10 @@ pub struct FactoryConfig {

/// maximum number of operation ids in block
pub max_operations_per_block: u32,

/// cycle duration in periods
pub periods_per_cycle: u64,

/// denunciation expiration as cycle delta
pub denunciation_expire_cycle_delta: u64,
}
2 changes: 2 additions & 0 deletions massa-factory-exports/src/test_exports/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ impl Default for FactoryConfig {
max_block_size: MAX_BLOCK_SIZE as u64,
max_block_gas: MAX_GAS_PER_BLOCK,
max_operations_per_block: MAX_OPERATIONS_PER_BLOCK,
periods_per_cycle: PERIODS_PER_CYCLE,
denunciation_expire_cycle_delta: DENUNCIATION_EXPIRE_CYCLE_DELTA,
}
}
}
78 changes: 50 additions & 28 deletions massa-factory-worker/src/denunciation_factory.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};
use std::collections::HashMap;
use std::thread;

use crossbeam_channel::{select, Receiver};
Expand All @@ -10,6 +10,8 @@ use massa_models::block_header::SecuredHeader;
use massa_models::denunciation::Denunciation;
use massa_models::endorsement::SecureShareEndorsement;
use massa_models::slot::Slot;
use massa_models::timeslots::get_closest_slot_to_timestamp;
use massa_time::MassaTime;

// TODO: rework these values
const DENUNCIATION_FACTORY_ENDORSEMENT_CACHE_MAX_LEN: usize = 4096;
Expand All @@ -29,14 +31,12 @@ pub(crate) struct DenunciationFactoryWorker {
/// Internal storage for endorsement -
/// store at max 1 endorsement per entry, as soon as we have 2 we produce a Denunciation
endorsements_by_slot_index: HashMap<(Slot, u32), Vec<SecureShareEndorsement>>,
/// Cache to avoid processing several time the same endorsement denunciation
seen_endorsement_denunciation: HashSet<(Slot, u32)>,

// internal for block header (or secured header)
/// Internal storage for endorsement -
/// store at max 1 endorsement per entry, as soon as we have 2 we produce a Denunciation
// FIXME: Store per 'Slot' then per 'PubKey'
block_header_by_slot: HashMap<Slot, Vec<SecuredHeader>>,
/// Cache to avoid processing several time the same endorsement denunciation
seen_block_header_denunciation: HashSet<Slot>,
}

impl DenunciationFactoryWorker {
Expand All @@ -59,9 +59,7 @@ impl DenunciationFactoryWorker {
consensus_receiver,
endorsement_pool_receiver,
endorsements_by_slot_index: Default::default(),
seen_endorsement_denunciation: Default::default(),
block_header_by_slot: Default::default(),
seen_block_header_denunciation: Default::default(),
};
factory.run();
})
Expand All @@ -71,22 +69,10 @@ impl DenunciationFactoryWorker {
/// Process new secured header (~ block header)
fn process_new_secured_header(&mut self, secured_header: SecuredHeader) {
let key = secured_header.content.slot;
if self.seen_block_header_denunciation.contains(&key) {
warn!(
"Denunciation factory process a block header that have already been denounced: {}",
secured_header
);
return;
}

// TODO: need 2 separate constants here?
if self.seen_block_header_denunciation.len()
> DENUNCIATION_FACTORY_BLOCK_HEADER_CACHE_MAX_LEN
|| self.block_header_by_slot.len() > DENUNCIATION_FACTORY_BLOCK_HEADER_CACHE_MAX_LEN
{
if self.block_header_by_slot.len() > DENUNCIATION_FACTORY_BLOCK_HEADER_CACHE_MAX_LEN {
warn!(
"Denunciation factory cannot process - cache full: {}, {}",
self.seen_block_header_denunciation.len(),
"Denunciation factory cannot process - cache full: {}",
self.block_header_by_slot.len()
);
return;
Expand Down Expand Up @@ -130,6 +116,8 @@ impl DenunciationFactoryWorker {
de_storage.store_denunciation(denunciation);
self.channels.pool.add_denunciations(de_storage);
}

self.cleanup_cache();
}

/// Process new secure share endorsement (~ endorsement)
Expand All @@ -141,21 +129,17 @@ impl DenunciationFactoryWorker {
secure_share_endorsement.content.slot,
secure_share_endorsement.content.index,
);
if self.seen_endorsement_denunciation.contains(&key) {
if self.endorsements_by_slot_index.contains_key(&key) {
warn!(
"Denunciation factory process an endorsement that have already been denounced: {}",
secure_share_endorsement
);
return;
}

if self.seen_endorsement_denunciation.len() > DENUNCIATION_FACTORY_ENDORSEMENT_CACHE_MAX_LEN
|| self.endorsements_by_slot_index.len()
> DENUNCIATION_FACTORY_ENDORSEMENT_CACHE_MAX_LEN
{
if self.endorsements_by_slot_index.len() > DENUNCIATION_FACTORY_ENDORSEMENT_CACHE_MAX_LEN {
warn!(
"Denunciation factory cannot process - cache full: {}, {}",
self.seen_endorsement_denunciation.len(),
"Denunciation factory cannot process - cache full: {}",
self.endorsements_by_slot_index.len()
);
return;
Expand Down Expand Up @@ -194,7 +178,45 @@ impl DenunciationFactoryWorker {
"Created a new endorsement denunciation : {:?}",
denunciation
);

let mut de_storage = self.channels.storage.clone_without_refs();
de_storage.store_denunciation(denunciation);
self.channels.pool.add_denunciations(de_storage);
}

self.cleanup_cache();
}

fn cleanup_cache(&mut self) {
let now = MassaTime::now().expect("could not get current time");

// get closest slot according to the current absolute time
let slot_now = get_closest_slot_to_timestamp(
self.cfg.thread_count,
self.cfg.t0,
self.cfg.genesis_timestamp,
now,
);

self.endorsements_by_slot_index.retain(|(slot, _index), _| {
Denunciation::is_expired(
slot,
&slot_now,
self.channels.pool.get_final_cs_periods(),
self.cfg.periods_per_cycle,
self.cfg.denunciation_expire_cycle_delta,
)
});

self.block_header_by_slot.retain(|slot, _| {
Denunciation::is_expired(
slot,
&slot_now,
self.channels.pool.get_final_cs_periods(),
self.cfg.periods_per_cycle,
self.cfg.denunciation_expire_cycle_delta,
)
});
}

/// main run loop of the denunciation creator thread
Expand Down
15 changes: 10 additions & 5 deletions massa-factory-worker/src/tests/scenarios.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,18 @@ use super::TestFactory;
use massa_hash::Hash;
use massa_models::block_header::{BlockHeader, BlockHeaderSerializer, SecuredHeader};
use massa_models::block_id::BlockId;
use massa_models::config::THREAD_COUNT;
use massa_models::config::{T0, THREAD_COUNT};
use massa_models::denunciation::{Denunciation, DenunciationId};
use massa_models::endorsement::{Endorsement, EndorsementSerializerLW};
use massa_models::slot::Slot;
use massa_models::timeslots::get_closest_slot_to_timestamp;
use massa_models::{
amount::Amount,
operation::{Operation, OperationSerializer, OperationType},
secure_share::SecureShareContent,
};
use massa_signature::KeyPair;
use massa_time::MassaTime;
use std::str::FromStr;
use std::time::Duration;

Expand Down Expand Up @@ -74,11 +76,13 @@ fn basic_creation_with_multiple_operations() {

/// Send 2 block headers and check if a Denunciation op is in storage
#[test]
#[ignore]
fn test_denunciation_factory_block_header_denunciation() {
let keypair = KeyPair::generate();

let slot = Slot::new(2, 1);
let now = MassaTime::now().expect("could not get current time");
// get closest slot according to the current absolute time
let slot = get_closest_slot_to_timestamp(THREAD_COUNT, T0, now, now);

let parents: Vec<BlockId> = (0..THREAD_COUNT)
.map(|i| BlockId(Hash::compute_from(&[i])))
.collect();
Expand Down Expand Up @@ -146,7 +150,8 @@ fn test_denunciation_factory_block_header_denunciation() {
let de_indexes = test_factory.storage.read_denunciations();
assert_eq!(de_indexes.get(&denunciation_id), Some(&denunciation));

drop(de_indexes); // release RwLockReadGuard
// stop everything
// release RwLockReadGuard
drop(de_indexes);
// stop everything
drop(test_factory);
}
17 changes: 15 additions & 2 deletions massa-factory-worker/src/tests/tools.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,11 @@ use massa_wallet::test_exports::create_test_wallet;
/// The factory will ask that to the the pool, consensus and factory and then will send the block to the consensus.
/// You can use the method `new` to build all the mocks and make the connections
/// Then you can use the method `get_next_created_block` that will manage the answers from the mock to the factory depending on the parameters you gave.
#[allow(dead_code)]
pub struct TestFactory {
consensus_event_receiver: ConsensusEventReceiver,
pool_receiver: PoolEventReceiver,
selector_receiver: Receiver<MockSelectorControllerMessage>,
selector_receiver: Option<Receiver<MockSelectorControllerMessage>>,
factory_config: FactoryConfig,
factory_manager: Box<dyn FactoryManager>,
genesis_blocks: Vec<(BlockId, u64)>,
Expand Down Expand Up @@ -103,7 +104,7 @@ impl TestFactory {
TestFactory {
consensus_event_receiver,
pool_receiver,
selector_receiver,
selector_receiver: Some(selector_receiver),
factory_config,
factory_manager,
genesis_blocks,
Expand Down Expand Up @@ -136,6 +137,8 @@ impl TestFactory {
loop {
match self
.selector_receiver
.as_ref()
.unwrap()
.recv_timeout(Duration::from_millis(100))
{
Ok(MockSelectorControllerMessage::GetProducer {
Expand Down Expand Up @@ -235,6 +238,16 @@ impl TestFactory {

impl Drop for TestFactory {
fn drop(&mut self) {
// Need this otherwise factory_manager is stuck while waiting for block & endorsement factory
// to join
// For instance, block factory is waiting for selector.get_producer(...)
// endorsement factory is waiting for selector.get_selection(...)
// Note: that this will make the 2 threads panic
// TODO: find a better way to resolve this
if let Some(selector_receiver) = self.selector_receiver.take() {
drop(selector_receiver);
}

self.factory_manager.stop();
}
}
7 changes: 7 additions & 0 deletions massa-models/src/config/constants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,13 @@ pub const NETWORK_NODE_EVENT_CHANNEL_SIZE: usize = 10_000;
/// Threshold to accept a new versioning
pub const VERSIONING_THRESHOLD_TRANSITION_ACCEPTED: Amount = Amount::from_mantissa_scale(75, 0);

//
// Constants for denunciation factory
//

/// denunciation expiration delta (in cycle count)
pub const DENUNCIATION_EXPIRE_CYCLE_DELTA: u64 = 3;

// Some checks at compile time that should not be ignored!
#[allow(clippy::assertions_on_constants)]
const _: () = {
Expand Down
34 changes: 34 additions & 0 deletions massa-models/src/denunciation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,40 @@ impl Denunciation {
&& public_key.verify_signature(&hash_1, &signature_1).is_ok()
&& public_key.verify_signature(&hash_2, &signature_2).is_ok())
}

/// Get Denunciation slot ref
pub fn get_slot(&self) -> &Slot {
match self {
Denunciation::Endorsement(endo_de) => &endo_de.slot,
Denunciation::BlockHeader(blkh_de) => &blkh_de.slot,
}
}

/// For a given slot (and given the slot at now()), check if it can be denounced
/// Can be used to check if block header | endorsement is not too old (at reception or too cleanup cache)
pub fn is_expired(
slot: &Slot,
slot_at_now: &Slot,
last_cs_final_periods: &[u64],
periods_per_cycle: u64,
denunciation_expire_cycle_delta: u64,
) -> bool {
// Slot is final -> cannot be Denounced anymore, it's too late!
if slot.period <= last_cs_final_periods[slot.thread as usize] {
return true;
}

// As we need to ensure that the Denounced has some 'Deferred credits'
// we will reject Denunciation older than 3 cycle compared to the current slot
let cycle = slot.get_cycle(periods_per_cycle);
let next_cycle = slot_at_now.get_cycle(periods_per_cycle);

if (next_cycle - cycle) > denunciation_expire_cycle_delta {
return true;
}

false
}
}

/// Create a new Denunciation from 2 SecureShareEndorsement
Expand Down
8 changes: 7 additions & 1 deletion massa-node/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ use massa_models::config::constants::{
POS_SAVED_CYCLES, PROTOCOL_CONTROLLER_CHANNEL_SIZE, PROTOCOL_EVENT_CHANNEL_SIZE, ROLL_PRICE,
T0, THREAD_COUNT, VERSION,
};
use massa_models::config::CONSENSUS_BOOTSTRAP_PART_SIZE;
use massa_models::config::{CONSENSUS_BOOTSTRAP_PART_SIZE, DENUNCIATION_EXPIRE_CYCLE_DELTA};
use massa_models::endorsement::SecureShareEndorsement;
use massa_network_exports::{Establisher, NetworkConfig, NetworkManager};
use massa_network_worker::start_network_controller;
Expand Down Expand Up @@ -370,6 +370,10 @@ async fn launch(
channels_size: POOL_CONTROLLER_CHANNEL_SIZE,
broadcast_enabled: SETTINGS.api.enable_ws,
broadcast_operations_capacity: SETTINGS.pool.broadcast_operations_capacity,
genesis_timestamp: *GENESIS_TIMESTAMP,
t0: T0,
periods_per_cycle: PERIODS_PER_CYCLE,
denunciation_expire_cycle_delta: DENUNCIATION_EXPIRE_CYCLE_DELTA,
};

let pool_channels = PoolChannels {
Expand Down Expand Up @@ -504,6 +508,8 @@ async fn launch(
max_block_size: MAX_BLOCK_SIZE as u64,
max_block_gas: MAX_GAS_PER_BLOCK,
max_operations_per_block: MAX_OPERATIONS_PER_BLOCK,
periods_per_cycle: PERIODS_PER_CYCLE,
denunciation_expire_cycle_delta: DENUNCIATION_EXPIRE_CYCLE_DELTA,
};
let factory_channels = FactoryChannels {
selector: selector_controller.clone(),
Expand Down
4 changes: 2 additions & 2 deletions massa-pool-exports/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@ tokio = { version = "1.23", features = ["sync"] }
# custom modules
massa_models = { path = "../massa-models" }
massa_storage = { path = "../massa-storage" }
massa_time = { path = "../massa-time", optional = true }
massa_time = { path = "../massa-time"}

[dev-dependencies]

# for more information on what are the following features used for, see the cargo.toml at workspace level
[features]
testing = [ "dep:massa_time" ]
testing = []
9 changes: 9 additions & 0 deletions massa-pool-exports/src/config.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
//! Copyright (c) 2022 MASSA LABS <info@massa.net>
use massa_models::amount::Amount;
use massa_time::MassaTime;
use serde::{Deserialize, Serialize};

/// Pool configuration
Expand Down Expand Up @@ -30,4 +31,12 @@ pub struct PoolConfig {
pub broadcast_enabled: bool,
/// operations sender(channel) capacity
pub broadcast_operations_capacity: usize,
/// genesis timestamp
pub genesis_timestamp: MassaTime,
/// period duration
pub t0: MassaTime,
/// cycle duration in periods
pub periods_per_cycle: u64,
/// denunciation expiration as cycle delta
pub denunciation_expire_cycle_delta: u64,
}
3 changes: 3 additions & 0 deletions massa-pool-exports/src/controller_traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ pub trait PoolController: Send + Sync {
/// Returns a boxed clone of self.
/// Useful to allow cloning `Box<dyn PoolController>`.
fn clone_box(&self) -> Box<dyn PoolController>;

/// Get final cs periods (updated regularly from consensus)
fn get_final_cs_periods(&self) -> &Vec<u64>;
}

/// Allow cloning `Box<dyn PoolController>`
Expand Down
Loading

0 comments on commit ac0f1c6

Please sign in to comment.