Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix new config from peernet. #3979

Merged
merged 2 commits into from
May 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion massa-node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ tracing = { version = "0.1", features = [
"max_level_debug",
"release_max_level_debug",
] }
peernet = { git = "https://github.com/massalabs/PeerNet", branch = "generic_peer_id" }
peernet = { git = "https://github.com/massalabs/PeerNet", branch = "remove_old_method" }
tracing-subscriber = "0.3"
paw = "1.0"
structopt = { version = "0.3", features = ["paw"] }
Expand Down
2 changes: 2 additions & 0 deletions massa-node/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ use massa_models::config::constants::{
ROLL_COUNT_TO_SLASH_ON_DENUNCIATION, ROLL_PRICE, SELECTOR_DRAW_CACHE_SIZE, T0, THREAD_COUNT,
VERSION,
};
use massa_models::config::MAX_MESSAGE_SIZE;
use massa_pool_exports::{PoolChannels, PoolConfig, PoolManager};
use massa_pool_worker::start_pool_controller;
use massa_pos_exports::{PoSConfig, SelectorConfig, SelectorManager};
Expand Down Expand Up @@ -557,6 +558,7 @@ async fn launch(
genesis_timestamp: *GENESIS_TIMESTAMP,
t0: T0,
endorsement_count: ENDORSEMENT_COUNT,
max_message_size: MAX_MESSAGE_SIZE as usize,
max_operations_propagation_time: SETTINGS.protocol.max_operations_propagation_time,
max_endorsements_propagation_time: SETTINGS.protocol.max_endorsements_propagation_time,
last_start_period: final_state.read().last_start_period,
Expand Down
2 changes: 1 addition & 1 deletion massa-protocol-exports/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ thiserror = "1.0"
nom = "7.1"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
peernet = { git = "https://github.com/massalabs/PeerNet", branch = "generic_peer_id" }
peernet = { git = "https://github.com/massalabs/PeerNet", branch = "remove_old_method" }
tempfile = { version = "3.3", optional = true } # use with testing feature
mockall = "0.11.4"

Expand Down
2 changes: 2 additions & 0 deletions massa-protocol-exports/src/settings.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ pub struct ProtocolConfig {
pub max_operations_propagation_time: MassaTime,
/// max time we propagate endorsements
pub max_endorsements_propagation_time: MassaTime,
/// Max message size
pub max_message_size: usize,
/// number of thread tester
pub thread_tester_count: u8,
/// Max size of the channel for command to the connectivity thread
Expand Down
3 changes: 2 additions & 1 deletion massa-protocol-exports/src/test_exports/config.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::collections::HashMap;

use crate::{settings::PeerCategoryInfo, ProtocolConfig};
use massa_models::config::ENDORSEMENT_COUNT;
use massa_models::config::{ENDORSEMENT_COUNT, MAX_MESSAGE_SIZE};
use massa_time::MassaTime;
use tempfile::NamedTempFile;

Expand Down Expand Up @@ -59,6 +59,7 @@ impl Default for ProtocolConfig {
max_size_channel_network_to_peer_handler: 1000,
max_size_channel_commands_peer_testers: 10000,
max_size_channel_commands_peers: 300,
max_message_size: MAX_MESSAGE_SIZE as usize,
endorsement_count: ENDORSEMENT_COUNT,
max_size_block_infos: 200,
max_size_value_datastore: 1_000_000,
Expand Down
2 changes: 1 addition & 1 deletion massa-protocol-worker/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ crossbeam = "0.8"
serde_json = "1.0"
nom = "7.1"
num_enum = "0.5"
peernet = { git = "https://github.com/massalabs/PeerNet", branch = "generic_peer_id" }
peernet = { git = "https://github.com/massalabs/PeerNet", branch = "remove_old_method" }
tempfile = { version = "3.3", optional = true } # use with testing feature
rayon = "1.7.0"
lru = "0.10.0"
Expand Down
5 changes: 2 additions & 3 deletions massa-protocol-worker/src/connectivity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,7 @@ use massa_protocol_exports::{PeerCategoryInfo, PeerId, ProtocolConfig, ProtocolE
use massa_storage::Storage;
use massa_versioning::versioning::MipStore;
use parking_lot::RwLock;
use peernet::transports::TcpOutConnectionConfig;
use peernet::{peer::PeerConnectionType, transports::OutConnectionConfig};
use peernet::peer::PeerConnectionType;
use std::net::SocketAddr;
use std::{collections::HashMap, net::IpAddr};
use std::{num::NonZeroUsize, sync::Arc};
Expand Down Expand Up @@ -272,7 +271,7 @@ pub(crate) fn start_connectivity_thread(
for addr in addresses_to_connect {
info!("Trying to connect to addr {}", addr);
// We only manage TCP for now
if let Err(err) = network_controller.try_connect(addr, config.timeout_connection.to_duration(), &OutConnectionConfig::Tcp(Box::new(TcpOutConnectionConfig::new(config.read_write_limit_bytes_per_second / 10, Duration::from_millis(100))))) {
if let Err(err) = network_controller.try_connect(addr, config.timeout_connection.to_duration()) {
warn!("Failed to connect to peer {:?}: {:?}", addr, err);
}
}
Expand Down
44 changes: 21 additions & 23 deletions massa-protocol-worker/src/handlers/block_handler/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@ use massa_models::{
},
secure_share::{SecureShareDeserializer, SecureShareSerializer},
};
use massa_serialization::{Deserializer, Serializer, U64VarIntDeserializer, U64VarIntSerializer};
use massa_serialization::{
Deserializer, SerializeError, Serializer, U64VarIntDeserializer, U64VarIntSerializer,
};
use nom::{
error::{context, ContextError, ParseError},
multi::length_count,
Expand Down Expand Up @@ -55,21 +57,6 @@ pub enum BlockMessage {
ReplyForBlocks(Vec<(BlockId, BlockInfoReply)>),
}

impl BlockMessage {
pub fn get_id(&self) -> MessageTypeId {
match self {
BlockMessage::BlockHeader(_) => MessageTypeId::BlockHeader,
BlockMessage::AskForBlocks(_) => MessageTypeId::AskForBlocks,
BlockMessage::ReplyForBlocks(_) => MessageTypeId::ReplyForBlocks,
}
}

pub fn max_id() -> u64 {
<MessageTypeId as Into<u64>>::into(MessageTypeId::ReplyForBlocks) + 1
}
}

// DO NOT FORGET TO UPDATE MAX ID IF YOU UPDATE THERE
#[derive(IntoPrimitive, Debug, Eq, PartialEq, TryFromPrimitive)]
#[repr(u64)]
pub enum MessageTypeId {
Expand All @@ -78,6 +65,16 @@ pub enum MessageTypeId {
ReplyForBlocks,
}

impl From<&BlockMessage> for MessageTypeId {
fn from(value: &BlockMessage) -> Self {
match value {
BlockMessage::BlockHeader(_) => MessageTypeId::BlockHeader,
BlockMessage::AskForBlocks(_) => MessageTypeId::AskForBlocks,
BlockMessage::ReplyForBlocks(_) => MessageTypeId::ReplyForBlocks,
}
}
}

#[derive(IntoPrimitive, Debug, Eq, PartialEq, TryFromPrimitive)]
#[repr(u64)]
pub enum BlockInfoType {
Expand Down Expand Up @@ -114,6 +111,12 @@ impl Serializer<BlockMessage> for BlockMessageSerializer {
value: &BlockMessage,
buffer: &mut Vec<u8>,
) -> Result<(), massa_serialization::SerializeError> {
self.id_serializer.serialize(
&MessageTypeId::from(value).try_into().map_err(|_| {
SerializeError::GeneralError(String::from("Failed to serialize id"))
})?,
buffer,
)?;
match value {
BlockMessage::BlockHeader(endorsements) => {
self.secure_share_serializer
Expand Down Expand Up @@ -189,7 +192,6 @@ impl Serializer<BlockMessage> for BlockMessageSerializer {
}

pub struct BlockMessageDeserializer {
message_id: u64,
id_deserializer: U64VarIntDeserializer,
block_header_deserializer: SecureShareDeserializer<BlockHeader, BlockHeaderDeserializer>,
block_infos_length_deserializer: U64VarIntDeserializer,
Expand All @@ -216,7 +218,6 @@ pub struct BlockMessageDeserializerArgs {
impl BlockMessageDeserializer {
pub fn new(args: BlockMessageDeserializerArgs) -> Self {
Self {
message_id: 0,
id_deserializer: U64VarIntDeserializer::new(Included(0), Included(u64::MAX)),
block_header_deserializer: SecureShareDeserializer::new(BlockHeaderDeserializer::new(
args.thread_count,
Expand All @@ -243,10 +244,6 @@ impl BlockMessageDeserializer {
),
}
}

pub fn set_message_id(&mut self, message_id: u64) {
self.message_id = message_id;
}
}

impl Deserializer<BlockMessage> for BlockMessageDeserializer {
Expand All @@ -255,7 +252,8 @@ impl Deserializer<BlockMessage> for BlockMessageDeserializer {
buffer: &'a [u8],
) -> IResult<&'a [u8], BlockMessage, E> {
context("Failed BlockMessage deserialization", |buffer| {
let id = MessageTypeId::try_from(self.message_id).map_err(|_| {
let (buffer, raw_id) = self.id_deserializer.deserialize(buffer)?;
let id = MessageTypeId::try_from(raw_id).map_err(|_| {
nom::Err::Error(ParseError::from_error_kind(
buffer,
nom::error::ErrorKind::Eof,
Expand Down
5 changes: 2 additions & 3 deletions massa-protocol-worker/src/handlers/block_handler/retrieval.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ pub struct RetrievalThread {

impl RetrievalThread {
fn run(&mut self) {
let mut block_message_deserializer =
let block_message_deserializer =
BlockMessageDeserializer::new(BlockMessageDeserializerArgs {
thread_count: self.config.thread_count,
endorsement_count: self.config.endorsement_count,
Expand All @@ -124,8 +124,7 @@ impl RetrievalThread {
select! {
recv(self.receiver_network) -> msg => {
match msg {
Ok((peer_id, message_id, message)) => {
block_message_deserializer.set_message_id(message_id);
Ok((peer_id, message)) => {
let (rest, message) = match block_message_deserializer
.deserialize::<DeserializeError>(&message) {
Ok((rest, message)) => (rest, message),
Expand Down
44 changes: 23 additions & 21 deletions massa-protocol-worker/src/handlers/endorsement_handler/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ use massa_models::{
endorsement::{Endorsement, EndorsementDeserializer, SecureShareEndorsement},
secure_share::{SecureShareDeserializer, SecureShareSerializer},
};
use massa_serialization::{Deserializer, Serializer, U64VarIntDeserializer, U64VarIntSerializer};
use massa_serialization::{
Deserializer, SerializeError, Serializer, U64VarIntDeserializer, U64VarIntSerializer,
};
use nom::{
error::{context, ContextError, ParseError},
multi::length_count,
Expand All @@ -17,34 +19,31 @@ pub enum EndorsementMessage {
Endorsements(Vec<SecureShareEndorsement>),
}

impl EndorsementMessage {
pub fn get_id(&self) -> MessageTypeId {
match self {
EndorsementMessage::Endorsements(_) => MessageTypeId::Endorsements,
}
}

pub fn max_id() -> u64 {
<MessageTypeId as Into<u64>>::into(MessageTypeId::Endorsements) + 1
}
}

// DO NOT FORGET TO UPDATE MAX ID IF YOU UPDATE THERE
#[derive(IntoPrimitive, Debug, Eq, PartialEq, TryFromPrimitive)]
#[repr(u64)]
pub enum MessageTypeId {
Endorsements,
}

impl From<&EndorsementMessage> for MessageTypeId {
fn from(message: &EndorsementMessage) -> Self {
match message {
EndorsementMessage::Endorsements(_) => MessageTypeId::Endorsements,
}
}
}

#[derive(Default, Clone)]
pub struct EndorsementMessageSerializer {
id_serializer: U64VarIntSerializer,
length_endorsements_serializer: U64VarIntSerializer,
secure_share_serializer: SecureShareSerializer,
}

impl EndorsementMessageSerializer {
pub fn new() -> Self {
Self {
id_serializer: U64VarIntSerializer::new(),
length_endorsements_serializer: U64VarIntSerializer::new(),
secure_share_serializer: SecureShareSerializer::new(),
}
Expand All @@ -57,6 +56,12 @@ impl Serializer<EndorsementMessage> for EndorsementMessageSerializer {
value: &EndorsementMessage,
buffer: &mut Vec<u8>,
) -> Result<(), massa_serialization::SerializeError> {
self.id_serializer.serialize(
&MessageTypeId::from(value).try_into().map_err(|_| {
SerializeError::GeneralError(String::from("Failed to serialize id"))
})?,
buffer,
)?;
match value {
EndorsementMessage::Endorsements(endorsements) => {
self.length_endorsements_serializer
Expand All @@ -78,15 +83,15 @@ pub struct EndorsementMessageDeserializerArgs {
}

pub struct EndorsementMessageDeserializer {
message_id: u64,
id_deserializer: U64VarIntDeserializer,
length_endorsements_deserializer: U64VarIntDeserializer,
secure_share_deserializer: SecureShareDeserializer<Endorsement, EndorsementDeserializer>,
}

impl EndorsementMessageDeserializer {
pub fn new(args: EndorsementMessageDeserializerArgs) -> Self {
Self {
message_id: 0,
id_deserializer: U64VarIntDeserializer::new(Included(0), Included(u64::MAX)),
length_endorsements_deserializer: U64VarIntDeserializer::new(
Included(0),
Included(args.max_length_endorsements),
Expand All @@ -97,10 +102,6 @@ impl EndorsementMessageDeserializer {
)),
}
}

pub fn set_message_id(&mut self, message_id: u64) {
self.message_id = message_id;
}
}

impl Deserializer<EndorsementMessage> for EndorsementMessageDeserializer {
Expand All @@ -109,7 +110,8 @@ impl Deserializer<EndorsementMessage> for EndorsementMessageDeserializer {
buffer: &'a [u8],
) -> IResult<&'a [u8], EndorsementMessage, E> {
context("Failed EndorsementMessage deserialization", |buffer| {
let id = MessageTypeId::try_from(self.message_id).map_err(|_| {
let (buffer, raw_id) = self.id_deserializer.deserialize(buffer)?;
let id = MessageTypeId::try_from(raw_id).map_err(|_| {
nom::Err::Error(ParseError::from_error_kind(
buffer,
nom::error::ErrorKind::Eof,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ pub struct RetrievalThread {

impl RetrievalThread {
fn run(&mut self) {
let mut endorsement_message_deserializer =
let endorsement_message_deserializer =
EndorsementMessageDeserializer::new(EndorsementMessageDeserializerArgs {
thread_count: self.config.thread_count,
max_length_endorsements: self.config.max_endorsements_per_message,
Expand All @@ -57,8 +57,7 @@ impl RetrievalThread {
select! {
recv(self.receiver) -> msg => {
match msg {
Ok((peer_id, message_id, message)) => {
endorsement_message_deserializer.set_message_id(message_id);
Ok((peer_id, message)) => {
let (rest, message) = match endorsement_message_deserializer
.deserialize::<DeserializeError>(&message) {
Ok((rest, message)) => (rest, message),
Expand Down
Loading