Skip to content

Commit

Permalink
Add mip store bootstrap
Browse files Browse the repository at this point in the history
  • Loading branch information
sydhds committed Apr 4, 2023
1 parent ffb6bfe commit 281b649
Show file tree
Hide file tree
Showing 11 changed files with 139 additions and 2 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions massa-bootstrap/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ massa_serialization = { path = "../massa-serialization" }
massa_signature = { path = "../massa-signature" }
massa_pos_exports = { path = "../massa-pos-exports" }
massa_time = { path = "../massa-time" }
massa_versioning_worker = { path = "../massa-versioning-worker" }

[dev-dependencies]
bitvec = { version = "1.0", features = ["serde"] }
Expand Down
22 changes: 22 additions & 0 deletions massa-bootstrap/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use massa_logging::massa_trace;
use massa_models::{node::NodeId, streaming_step::StreamingStep, version::Version};
use massa_signature::PublicKey;
use massa_time::MassaTime;
use massa_versioning_worker::versioning::{MipStore, MipStoreRaw};
use parking_lot::RwLock;
use rand::{
prelude::{SliceRandom, StdRng},
Expand Down Expand Up @@ -328,6 +329,27 @@ async fn bootstrap_from_server(
other => return Err(BootstrapError::UnexpectedServerMessage(other)),
};
global_bootstrap_state.peers = Some(peers);
*next_bootstrap_message = BootstrapClientMessage::AskBootstrapMipStore;
}
BootstrapClientMessage::AskBootstrapMipStore => {
let mip_store_raw: MipStoreRaw = match send_client_message(
next_bootstrap_message,
client,
write_timeout,
cfg.read_timeout.into(),
"ask bootstrap versioning store timed out",
)
.await?
{
BootstrapServerMessage::BootstrapMipStore { store: store_raw } => store_raw,
BootstrapServerMessage::BootstrapError { error } => {
return Err(BootstrapError::ReceivedError(error))
}
other => return Err(BootstrapError::UnexpectedServerMessage(other)),
};

global_bootstrap_state.mip_store =
Some(MipStore(Arc::new(RwLock::new(mip_store_raw))));
*next_bootstrap_message = BootstrapClientMessage::BootstrapSuccess;
}
BootstrapClientMessage::BootstrapSuccess => {
Expand Down
4 changes: 4 additions & 0 deletions massa-bootstrap/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ mod settings;
mod tools;
pub use client::get_state;
pub use establisher::DefaultEstablisher;
use massa_versioning_worker::versioning::MipStore;
pub use messages::{
BootstrapClientMessage, BootstrapClientMessageDeserializer, BootstrapClientMessageSerializer,
BootstrapServerMessage, BootstrapServerMessageDeserializer, BootstrapServerMessageSerializer,
Expand All @@ -51,6 +52,8 @@ pub struct GlobalBootstrapState {

/// list of network peers
pub peers: Option<BootstrapPeers>,

pub mip_store: Option<MipStore>,
}

impl GlobalBootstrapState {
Expand All @@ -59,6 +62,7 @@ impl GlobalBootstrapState {
final_state,
graph: None,
peers: None,
mip_store: None,
}
}
}
39 changes: 39 additions & 0 deletions massa-bootstrap/src/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ use massa_serialization::{
U32VarIntDeserializer, U32VarIntSerializer, U64VarIntDeserializer, U64VarIntSerializer,
};
use massa_time::{MassaTime, MassaTimeDeserializer, MassaTimeSerializer};
use massa_versioning_worker::versioning::MipStoreRaw;
use massa_versioning_worker::versioning_ser_der::{MipStoreRawDeserializer, MipStoreRawSerializer};
use nom::error::context;
use nom::multi::{length_count, length_data};
use nom::sequence::tuple;
Expand Down Expand Up @@ -82,6 +84,11 @@ pub enum BootstrapServerMessage {
/// Outdated block ids in the current consensus graph bootstrap
consensus_outdated_ids: PreHashSet<BlockId>,
},
/// Bootstrap versioning store
BootstrapMipStore {
/// Server mip store
store: MipStoreRaw,
},
/// Message sent when the final state and consensus bootstrap are finished
BootstrapFinished,
/// Slot sent to get state changes is too old
Expand All @@ -102,6 +109,7 @@ enum MessageServerTypeId {
FinalStateFinished = 3u32,
SlotTooOld = 4u32,
BootstrapError = 5u32,
MipStore = 6u32,
}

/// Serializer for `BootstrapServerMessage`
Expand All @@ -120,6 +128,7 @@ pub struct BootstrapServerMessageSerializer {
opt_pos_cycle_serializer: OptionSerializer<CycleInfo, CycleInfoSerializer>,
pos_credits_serializer: DeferredCreditsSerializer,
exec_ops_serializer: ExecutedOpsSerializer,
store_serializer: MipStoreRawSerializer,
}

impl Default for BootstrapServerMessageSerializer {
Expand All @@ -146,6 +155,7 @@ impl BootstrapServerMessageSerializer {
opt_pos_cycle_serializer: OptionSerializer::new(CycleInfoSerializer::new()),
pos_credits_serializer: DeferredCreditsSerializer::new(),
exec_ops_serializer: ExecutedOpsSerializer::new(),
store_serializer: MipStoreRawSerializer::new(),
}
}
}
Expand Down Expand Up @@ -232,6 +242,11 @@ impl Serializer<BootstrapServerMessage> for BootstrapServerMessageSerializer {
self.block_id_set_serializer
.serialize(consensus_outdated_ids, buffer)?;
}
BootstrapServerMessage::BootstrapMipStore { store: store_raw } => {
self.u32_serializer
.serialize(&u32::from(MessageServerTypeId::MipStore), buffer)?;
self.store_serializer.serialize(store_raw, buffer)?;
}
BootstrapServerMessage::BootstrapFinished => {
self.u32_serializer
.serialize(&u32::from(MessageServerTypeId::FinalStateFinished), buffer)?;
Expand Down Expand Up @@ -273,6 +288,7 @@ pub struct BootstrapServerMessageDeserializer {
opt_pos_cycle_deserializer: OptionDeserializer<CycleInfo, CycleInfoDeserializer>,
pos_credits_deserializer: DeferredCreditsDeserializer,
exec_ops_deserializer: ExecutedOpsDeserializer,
store_deserializer: MipStoreRawDeserializer,
}

impl BootstrapServerMessageDeserializer {
Expand Down Expand Up @@ -344,6 +360,10 @@ impl BootstrapServerMessageDeserializer {
args.max_executed_ops_length,
args.max_operations_per_block as u64,
),
store_deserializer: MipStoreRawDeserializer::new(
args.mip_store_stats_block_considered,
args.mip_store_stats_counters_max,
),
}
}
}
Expand Down Expand Up @@ -427,6 +447,13 @@ impl Deserializer<BootstrapServerMessage> for BootstrapServerMessageDeserializer
})
.map(|peers| BootstrapServerMessage::BootstrapPeers { peers })
.parse(input),
MessageServerTypeId::MipStore => {
context("Failed MIP store deserialization", |input| {
self.store_deserializer.deserialize(input)
})
.map(|store| BootstrapServerMessage::BootstrapMipStore { store })
.parse(input)
}
MessageServerTypeId::FinalStatePart => tuple((
context("Failed slot deserialization", |input| {
self.slot_deserializer.deserialize(input)
Expand Down Expand Up @@ -534,6 +561,8 @@ pub enum BootstrapClientMessage {
/// Last received consensus block slot
last_consensus_step: StreamingStep<PreHashSet<BlockId>>,
},
/// Ask for mip store
AskBootstrapMipStore,
/// Bootstrap error
BootstrapError {
/// Error message
Expand All @@ -550,6 +579,7 @@ enum MessageClientTypeId {
AskFinalStatePart = 1u32,
BootstrapError = 2u32,
BootstrapSuccess = 3u32,
AskBootstrapMipStore = 4u32,
}

/// Serializer for `BootstrapClientMessage`
Expand Down Expand Up @@ -654,6 +684,12 @@ impl Serializer<BootstrapClientMessage> for BootstrapClientMessageSerializer {
self.u32_serializer
.serialize(&u32::from(MessageClientTypeId::BootstrapSuccess), buffer)?;
}
BootstrapClientMessage::AskBootstrapMipStore => {
self.u32_serializer.serialize(
&u32::from(MessageClientTypeId::AskBootstrapMipStore),
buffer,
)?;
}
}
Ok(())
}
Expand Down Expand Up @@ -756,6 +792,9 @@ impl Deserializer<BootstrapClientMessage> for BootstrapClientMessageDeserializer
MessageClientTypeId::AskBootstrapPeers => {
Ok((input, BootstrapClientMessage::AskBootstrapPeers))
}
MessageClientTypeId::AskBootstrapMipStore => {
Ok((input, BootstrapClientMessage::AskBootstrapMipStore))
}
MessageClientTypeId::AskFinalStatePart => {
if input.is_empty() {
Ok((
Expand Down
28 changes: 28 additions & 0 deletions massa-bootstrap/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ use massa_models::{
use massa_network_exports::NetworkCommandSender;
use massa_signature::KeyPair;
use massa_time::MassaTime;
use massa_versioning_worker::versioning::MipStore;

use parking_lot::RwLock;
use std::{
collections::HashMap,
Expand Down Expand Up @@ -108,6 +110,7 @@ pub async fn start_bootstrap_server(
mut establisher: impl BSEstablisher,
keypair: KeyPair,
version: Version,
mip_store: MipStore,
) -> Result<Option<BootstrapManager>, Box<BootstrapError>> {
massa_trace!("bootstrap.lib.start_bootstrap_server", {});
let Some(listen_addr) = config.listen_addr else {
Expand Down Expand Up @@ -188,6 +191,7 @@ pub async fn start_bootstrap_server(
ip_hist_map: HashMap::with_capacity(config.ip_list_max_size),
bootstrap_config: config,
bs_server_runtime,
mip_store,
}
.run_loop(max_bootstraps)
})
Expand Down Expand Up @@ -215,6 +219,7 @@ struct BootstrapServer<'a> {
version: Version,
ip_hist_map: HashMap<IpAddr, Instant>,
bs_server_runtime: Runtime,
mip_store: MipStore,
}

impl BootstrapServer<'_> {
Expand Down Expand Up @@ -365,6 +370,7 @@ impl BootstrapServer<'_> {

let bootstrap_count_token = bootstrap_sessions_counter.clone();
let session_handle = bs_loop_rt.handle().clone();
let mip_store = self.mip_store.clone();
let _ = thread::Builder::new()
.name(format!("bootstrap thread, peer: {}", remote_addr))
.spawn(move || {
Expand All @@ -378,6 +384,7 @@ impl BootstrapServer<'_> {
consensus_command_sender,
network_command_sender,
session_handle,
mip_store,
)
});

Expand Down Expand Up @@ -491,6 +498,7 @@ fn run_bootstrap_session(
consensus_command_sender: Box<dyn ConsensusController>,
network_command_sender: NetworkCommandSender,
bs_loop_rt_handle: Handle,
mip_store: MipStore,
) {
debug!("running bootstrap for peer {}", remote_addr);
bs_loop_rt_handle.block_on(async move {
Expand All @@ -503,6 +511,7 @@ fn run_bootstrap_session(
version,
consensus_command_sender,
network_command_sender,
mip_store,
),
)
.await;
Expand Down Expand Up @@ -755,6 +764,7 @@ async fn manage_bootstrap(
version: Version,
consensus_controller: Box<dyn ConsensusController>,
network_command_sender: NetworkCommandSender,
mip_store: MipStore,
) -> Result<(), BootstrapError> {
massa_trace!("bootstrap.lib.manage_bootstrap", {});
let read_error_timeout: Duration = bootstrap_config.read_error_timeout.into();
Expand Down Expand Up @@ -857,6 +867,24 @@ async fn manage_bootstrap(
)
.await?;
}
BootstrapClientMessage::AskBootstrapMipStore => {
let vs = mip_store.0.read().to_owned();
match tokio::time::timeout(
write_timeout,
server
.send(BootstrapServerMessage::BootstrapMipStore { store: vs.clone() }),
)
.await
{
Err(_) => Err(std::io::Error::new(
std::io::ErrorKind::TimedOut,
"bootstrap peers send timed out",
)
.into()),
Ok(Err(e)) => Err(e),
Ok(Ok(_)) => Ok(()),
}?;
}
BootstrapClientMessage::BootstrapSuccess => break Ok(()),
BootstrapClientMessage::BootstrapError { error } => {
break Err(BootstrapError::ReceivedError(error));
Expand Down
8 changes: 8 additions & 0 deletions massa-bootstrap/src/settings.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,10 @@ pub struct BootstrapConfig {
pub consensus_bootstrap_part_size: u64,
/// max number of consensus block ids when sending a bootstrap cursor from the client
pub max_consensus_block_ids: u64,
/// block count to check / process for versioning stats
pub mip_store_stats_block_considered: usize,
/// max number of counters for versioning stats
pub mip_store_stats_counters_max: usize,
}

/// Bootstrap server binding
Expand Down Expand Up @@ -167,6 +171,8 @@ pub struct BootstrapClientConfig {
pub max_credits_length: u64,
pub max_executed_ops_length: u64,
pub max_ops_changes_length: u64,
pub mip_store_stats_block_considered: usize,
pub mip_store_stats_counters_max: usize,
}

/// Bootstrap Message der args
Expand Down Expand Up @@ -194,6 +200,8 @@ pub struct BootstrapServerMessageDeserializerArgs {
pub max_credits_length: u64,
pub max_executed_ops_length: u64,
pub max_ops_changes_length: u64,
pub mip_store_stats_block_considered: usize,
pub mip_store_stats_counters_max: usize,
}

// TODO: add a proc macro for this case
Expand Down
5 changes: 4 additions & 1 deletion massa-bootstrap/src/tests/binders.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ use massa_models::config::{
MAX_BOOTSTRAP_FINAL_STATE_PARTS_SIZE, MAX_BOOTSTRAP_MESSAGE_SIZE, MAX_DATASTORE_ENTRY_COUNT,
MAX_DATASTORE_KEY_LENGTH, MAX_DATASTORE_VALUE_LENGTH, MAX_DEFERRED_CREDITS_LENGTH,
MAX_EXECUTED_OPS_CHANGES_LENGTH, MAX_EXECUTED_OPS_LENGTH, MAX_LEDGER_CHANGES_COUNT,
MAX_OPERATIONS_PER_BLOCK, MAX_PRODUCTION_STATS_LENGTH, MAX_ROLLS_COUNT_LENGTH, THREAD_COUNT,
MAX_OPERATIONS_PER_BLOCK, MAX_PRODUCTION_STATS_LENGTH, MAX_ROLLS_COUNT_LENGTH,
MIP_STORE_STATS_BLOCK_CONSIDERED, MIP_STORE_STATS_COUNTERS_MAX, THREAD_COUNT,
};
use massa_models::node::NodeId;
use massa_models::version::Version;
Expand Down Expand Up @@ -55,6 +56,8 @@ impl BootstrapClientBinder {
max_credits_length: MAX_DEFERRED_CREDITS_LENGTH,
max_executed_ops_length: MAX_EXECUTED_OPS_LENGTH,
max_ops_changes_length: MAX_EXECUTED_OPS_CHANGES_LENGTH,
mip_store_stats_block_considered: MIP_STORE_STATS_BLOCK_CONSIDERED,
mip_store_stats_counters_max: MIP_STORE_STATS_COUNTERS_MAX,
};
BootstrapClientBinder::new(client_duplex, remote_pubkey, cfg)
}
Expand Down
Loading

0 comments on commit 281b649

Please sign in to comment.