Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add mip store bootstrap #3765

Merged
merged 4 commits into from
Apr 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions massa-bootstrap/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ massa_serialization = { path = "../massa-serialization" }
massa_signature = { path = "../massa-signature" }
massa_pos_exports = { path = "../massa-pos-exports" }
massa_time = { path = "../massa-time" }
massa_versioning_worker = { path = "../massa-versioning-worker" }

[dev-dependencies]
bitvec = { version = "1.0", features = ["serde"] }
Expand Down
22 changes: 22 additions & 0 deletions massa-bootstrap/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use massa_logging::massa_trace;
use massa_models::{node::NodeId, streaming_step::StreamingStep, version::Version};
use massa_signature::PublicKey;
use massa_time::MassaTime;
use massa_versioning_worker::versioning::{MipStore, MipStoreRaw};
use parking_lot::RwLock;
use rand::{
prelude::{SliceRandom, StdRng},
Expand Down Expand Up @@ -339,6 +340,27 @@ async fn bootstrap_from_server<D: Duplex>(
other => return Err(BootstrapError::UnexpectedServerMessage(other)),
};
global_bootstrap_state.peers = Some(peers);
*next_bootstrap_message = BootstrapClientMessage::AskBootstrapMipStore;
}
BootstrapClientMessage::AskBootstrapMipStore => {
let mip_store_raw: MipStoreRaw = match send_client_message(
next_bootstrap_message,
client,
write_timeout,
cfg.read_timeout.into(),
"ask bootstrap versioning store timed out",
)
.await?
{
BootstrapServerMessage::BootstrapMipStore { store: store_raw } => store_raw,
BootstrapServerMessage::BootstrapError { error } => {
return Err(BootstrapError::ReceivedError(error))
}
other => return Err(BootstrapError::UnexpectedServerMessage(other)),
};

global_bootstrap_state.mip_store =
Some(MipStore(Arc::new(RwLock::new(mip_store_raw))));
*next_bootstrap_message = BootstrapClientMessage::BootstrapSuccess;
}
BootstrapClientMessage::BootstrapSuccess => {
Expand Down
5 changes: 5 additions & 0 deletions massa-bootstrap/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ mod settings;
mod tools;
pub use client::get_state;
pub use establisher::{DefaultConnector, DefaultListener};
use massa_versioning_worker::versioning::MipStore;
pub use messages::{
BootstrapClientMessage, BootstrapClientMessageDeserializer, BootstrapClientMessageSerializer,
BootstrapServerMessage, BootstrapServerMessageDeserializer, BootstrapServerMessageSerializer,
Expand All @@ -51,6 +52,9 @@ pub struct GlobalBootstrapState {

/// list of network peers
pub peers: Option<BootstrapPeers>,

/// versioning info state
pub mip_store: Option<MipStore>,
}

impl GlobalBootstrapState {
Expand All @@ -59,6 +63,7 @@ impl GlobalBootstrapState {
final_state,
graph: None,
peers: None,
mip_store: None,
}
}
}
43 changes: 42 additions & 1 deletion massa-bootstrap/src/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ use massa_serialization::{
U64VarIntSerializer,
};
use massa_time::{MassaTime, MassaTimeDeserializer, MassaTimeSerializer};
use massa_versioning_worker::versioning::MipStoreRaw;
use massa_versioning_worker::versioning_ser_der::{MipStoreRawDeserializer, MipStoreRawSerializer};
use nom::error::context;
use nom::multi::{length_count, length_data};
use nom::sequence::tuple;
Expand Down Expand Up @@ -85,6 +87,11 @@ pub enum BootstrapServerMessage {
/// Last Start Period for network restart management
last_start_period: Option<u64>,
},
/// Bootstrap versioning store
BootstrapMipStore {
/// Server mip store
store: MipStoreRaw,
},
/// Message sent when the final state and consensus bootstrap are finished
BootstrapFinished,
/// Slot sent to get state changes is too old
Expand All @@ -105,6 +112,7 @@ enum MessageServerTypeId {
FinalStateFinished = 3u32,
SlotTooOld = 4u32,
BootstrapError = 5u32,
MipStore = 6u32,
}

/// Serializer for `BootstrapServerMessage`
Expand All @@ -124,6 +132,7 @@ pub struct BootstrapServerMessageSerializer {
pos_credits_serializer: DeferredCreditsSerializer,
exec_ops_serializer: ExecutedOpsSerializer,
opt_last_start_period_serializer: OptionSerializer<u64, U64VarIntSerializer>,
store_serializer: MipStoreRawSerializer,
}

impl Default for BootstrapServerMessageSerializer {
Expand Down Expand Up @@ -151,6 +160,7 @@ impl BootstrapServerMessageSerializer {
pos_credits_serializer: DeferredCreditsSerializer::new(),
exec_ops_serializer: ExecutedOpsSerializer::new(),
opt_last_start_period_serializer: OptionSerializer::new(U64VarIntSerializer::new()),
store_serializer: MipStoreRawSerializer::new(),
}
}
}
Expand Down Expand Up @@ -241,6 +251,11 @@ impl Serializer<BootstrapServerMessage> for BootstrapServerMessageSerializer {
self.opt_last_start_period_serializer
.serialize(last_start_period, buffer)?;
}
BootstrapServerMessage::BootstrapMipStore { store: store_raw } => {
self.u32_serializer
.serialize(&u32::from(MessageServerTypeId::MipStore), buffer)?;
self.store_serializer.serialize(store_raw, buffer)?;
}
BootstrapServerMessage::BootstrapFinished => {
self.u32_serializer
.serialize(&u32::from(MessageServerTypeId::FinalStateFinished), buffer)?;
Expand Down Expand Up @@ -283,6 +298,7 @@ pub struct BootstrapServerMessageDeserializer {
pos_credits_deserializer: DeferredCreditsDeserializer,
exec_ops_deserializer: ExecutedOpsDeserializer,
opt_last_start_period_deserializer: OptionDeserializer<u64, U64VarIntDeserializer>,
store_deserializer: MipStoreRawDeserializer,
}

impl BootstrapServerMessageDeserializer {
Expand Down Expand Up @@ -358,6 +374,10 @@ impl BootstrapServerMessageDeserializer {
opt_last_start_period_deserializer: OptionDeserializer::new(
U64VarIntDeserializer::new(Included(u64::MIN), Included(u64::MAX)),
),
store_deserializer: MipStoreRawDeserializer::new(
args.mip_store_stats_block_considered,
args.mip_store_stats_counters_max,
),
}
}
}
Expand All @@ -382,7 +402,9 @@ impl Deserializer<BootstrapServerMessage> for BootstrapServerMessageDeserializer
/// max_datastore_value_length: 1000,
/// max_datastore_entry_count: 1000, max_bootstrap_error_length: 1000, max_changes_slot_count: 1000,
/// max_rolls_length: 1000, max_production_stats_length: 1000, max_credits_length: 1000,
/// max_executed_ops_length: 1000, max_ops_changes_length: 1000};
/// max_executed_ops_length: 1000, max_ops_changes_length: 1000,
/// mip_store_stats_block_considered: 100, mip_store_stats_counters_max: 10
/// };
/// let message_deserializer = BootstrapServerMessageDeserializer::new(args);
/// let bootstrap_server_message = BootstrapServerMessage::BootstrapTime {
/// server_time: MassaTime::from(0),
Expand Down Expand Up @@ -441,6 +463,13 @@ impl Deserializer<BootstrapServerMessage> for BootstrapServerMessageDeserializer
})
.map(|peers| BootstrapServerMessage::BootstrapPeers { peers })
.parse(input),
MessageServerTypeId::MipStore => {
context("Failed MIP store deserialization", |input| {
self.store_deserializer.deserialize(input)
})
.map(|store| BootstrapServerMessage::BootstrapMipStore { store })
.parse(input)
}
MessageServerTypeId::FinalStatePart => tuple((
context("Failed slot deserialization", |input| {
self.slot_deserializer.deserialize(input)
Expand Down Expand Up @@ -555,6 +584,8 @@ pub enum BootstrapClientMessage {
/// Should be true only for the first part, false later
send_last_start_period: bool,
},
/// Ask for mip store
AskBootstrapMipStore,
/// Bootstrap error
BootstrapError {
/// Error message
Expand All @@ -571,6 +602,7 @@ enum MessageClientTypeId {
AskFinalStatePart = 1u32,
BootstrapError = 2u32,
BootstrapSuccess = 3u32,
AskBootstrapMipStore = 4u32,
}

/// Serializer for `BootstrapClientMessage`
Expand Down Expand Up @@ -680,6 +712,12 @@ impl Serializer<BootstrapClientMessage> for BootstrapClientMessageSerializer {
self.u32_serializer
.serialize(&u32::from(MessageClientTypeId::BootstrapSuccess), buffer)?;
}
BootstrapClientMessage::AskBootstrapMipStore => {
self.u32_serializer.serialize(
&u32::from(MessageClientTypeId::AskBootstrapMipStore),
buffer,
)?;
}
}
Ok(())
}
Expand Down Expand Up @@ -784,6 +822,9 @@ impl Deserializer<BootstrapClientMessage> for BootstrapClientMessageDeserializer
MessageClientTypeId::AskBootstrapPeers => {
Ok((input, BootstrapClientMessage::AskBootstrapPeers))
}
MessageClientTypeId::AskBootstrapMipStore => {
Ok((input, BootstrapClientMessage::AskBootstrapMipStore))
}
MessageClientTypeId::AskFinalStatePart => {
if input.is_empty() {
Ok((
Expand Down
29 changes: 29 additions & 0 deletions massa-bootstrap/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ use massa_models::{
use massa_network_exports::NetworkCommandSender;
use massa_signature::KeyPair;
use massa_time::MassaTime;
use massa_versioning_worker::versioning::MipStore;

use parking_lot::RwLock;
use std::{
collections::HashMap,
Expand Down Expand Up @@ -103,6 +105,7 @@ impl<D: Duplex> BootstrapManager<D> {
}

/// See module level documentation for details
#[allow(clippy::too_many_arguments)]
pub fn start_bootstrap_server<D: Duplex + 'static>(
consensus_controller: Box<dyn ConsensusController>,
network_command_sender: NetworkCommandSender,
Expand All @@ -111,6 +114,7 @@ pub fn start_bootstrap_server<D: Duplex + 'static>(
listener: impl BSListener + Send + 'static,
keypair: KeyPair,
version: Version,
mip_store: MipStore,
) -> Result<Option<BootstrapManager<TcpStream>>, Box<BootstrapError>> {
massa_trace!("bootstrap.lib.start_bootstrap_server", {});

Expand Down Expand Up @@ -185,6 +189,7 @@ pub fn start_bootstrap_server<D: Duplex + 'static>(
ip_hist_map: HashMap::with_capacity(config.ip_list_max_size),
bootstrap_config: config,
bs_server_runtime,
mip_store,
}
.run_loop(max_bootstraps)
})
Expand Down Expand Up @@ -212,6 +217,7 @@ struct BootstrapServer<'a, D: Duplex> {
version: Version,
ip_hist_map: HashMap<IpAddr, Instant>,
bs_server_runtime: Runtime,
mip_store: MipStore,
}

impl<D: Duplex + 'static> BootstrapServer<'_, D> {
Expand Down Expand Up @@ -364,6 +370,7 @@ impl<D: Duplex + 'static> BootstrapServer<'_, D> {

let bootstrap_count_token = bootstrap_sessions_counter.clone();
let session_handle = bs_loop_rt.handle().clone();
let mip_store = self.mip_store.clone();

let _ = thread::Builder::new()
.name(format!("bootstrap thread, peer: {}", remote_addr))
Expand All @@ -378,6 +385,7 @@ impl<D: Duplex + 'static> BootstrapServer<'_, D> {
consensus_command_sender,
network_command_sender,
session_handle,
mip_store,
)
});

Expand Down Expand Up @@ -491,6 +499,7 @@ fn run_bootstrap_session<D: Duplex + 'static>(
consensus_command_sender: Box<dyn ConsensusController>,
network_command_sender: NetworkCommandSender,
bs_loop_rt_handle: Handle,
mip_store: MipStore,
) {
debug!("running bootstrap for peer {}", remote_addr);
bs_loop_rt_handle.block_on(async move {
Expand All @@ -503,6 +512,7 @@ fn run_bootstrap_session<D: Duplex + 'static>(
version,
consensus_command_sender,
network_command_sender,
mip_store,
),
)
.await;
Expand Down Expand Up @@ -766,6 +776,7 @@ async fn manage_bootstrap<D: Duplex + 'static>(
version: Version,
consensus_controller: Box<dyn ConsensusController>,
network_command_sender: NetworkCommandSender,
mip_store: MipStore,
) -> Result<(), BootstrapError> {
massa_trace!("bootstrap.lib.manage_bootstrap", {});
let read_error_timeout: Duration = bootstrap_config.read_error_timeout.into();
Expand Down Expand Up @@ -872,6 +883,24 @@ async fn manage_bootstrap<D: Duplex + 'static>(
)
.await?;
}
BootstrapClientMessage::AskBootstrapMipStore => {
let vs = mip_store.0.read().to_owned();
match tokio::time::timeout(
write_timeout,
server
.send(BootstrapServerMessage::BootstrapMipStore { store: vs.clone() }),
)
.await
{
Err(_) => Err(std::io::Error::new(
std::io::ErrorKind::TimedOut,
"bootstrap peers send timed out",
)
.into()),
Ok(Err(e)) => Err(e),
Ok(Ok(_)) => Ok(()),
}?;
}
BootstrapClientMessage::BootstrapSuccess => break Ok(()),
BootstrapClientMessage::BootstrapError { error } => {
break Err(BootstrapError::ReceivedError(error));
Expand Down
8 changes: 8 additions & 0 deletions massa-bootstrap/src/settings.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,10 @@ pub struct BootstrapConfig {
pub consensus_bootstrap_part_size: u64,
/// max number of consensus block ids when sending a bootstrap cursor from the client
pub max_consensus_block_ids: u64,
/// block count to check / process for versioning stats
pub mip_store_stats_block_considered: usize,
/// max number of counters for versioning stats
pub mip_store_stats_counters_max: usize,
}

/// Bootstrap server binding
Expand Down Expand Up @@ -167,6 +171,8 @@ pub struct BootstrapClientConfig {
pub max_credits_length: u64,
pub max_executed_ops_length: u64,
pub max_ops_changes_length: u64,
pub mip_store_stats_block_considered: usize,
pub mip_store_stats_counters_max: usize,
}

/// Bootstrap Message der args
Expand Down Expand Up @@ -194,6 +200,8 @@ pub struct BootstrapServerMessageDeserializerArgs {
pub max_credits_length: u64,
pub max_executed_ops_length: u64,
pub max_ops_changes_length: u64,
pub mip_store_stats_block_considered: usize,
pub mip_store_stats_counters_max: usize,
}

// TODO: add a proc macro for this case
Expand Down
5 changes: 4 additions & 1 deletion massa-bootstrap/src/tests/binders.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ use massa_models::config::{
MAX_BOOTSTRAP_FINAL_STATE_PARTS_SIZE, MAX_BOOTSTRAP_MESSAGE_SIZE, MAX_DATASTORE_ENTRY_COUNT,
MAX_DATASTORE_KEY_LENGTH, MAX_DATASTORE_VALUE_LENGTH, MAX_DEFERRED_CREDITS_LENGTH,
MAX_EXECUTED_OPS_CHANGES_LENGTH, MAX_EXECUTED_OPS_LENGTH, MAX_LEDGER_CHANGES_COUNT,
MAX_OPERATIONS_PER_BLOCK, MAX_PRODUCTION_STATS_LENGTH, MAX_ROLLS_COUNT_LENGTH, THREAD_COUNT,
MAX_OPERATIONS_PER_BLOCK, MAX_PRODUCTION_STATS_LENGTH, MAX_ROLLS_COUNT_LENGTH,
MIP_STORE_STATS_BLOCK_CONSIDERED, MIP_STORE_STATS_COUNTERS_MAX, THREAD_COUNT,
};
use massa_models::node::NodeId;
use massa_models::version::Version;
Expand Down Expand Up @@ -55,6 +56,8 @@ impl<D: Duplex> BootstrapClientBinder<D> {
max_credits_length: MAX_DEFERRED_CREDITS_LENGTH,
max_executed_ops_length: MAX_EXECUTED_OPS_LENGTH,
max_ops_changes_length: MAX_EXECUTED_OPS_CHANGES_LENGTH,
mip_store_stats_block_considered: MIP_STORE_STATS_BLOCK_CONSIDERED,
mip_store_stats_counters_max: MIP_STORE_STATS_COUNTERS_MAX,
};
BootstrapClientBinder::new(client_duplex, remote_pubkey, cfg)
}
Expand Down
Loading