Skip to content

Commit

Permalink
Remove assumptions about knowing a peer in protocol and fix memory leak (#3904)
Browse files Browse the repository at this point in the history

* Add more logs to debug

* Remove debug

* Fix clippy warning

* Remove locks from loop

* Display all len of all vectors to spot memory leak

* Add print debug protocol

* Update massa-sc-runtime

* Clean cache from stored_operations

* Add more debug on peernet and massa.

* Remove debug prints

* Update peernet version in massa-node

* Update starting timestamp
  • Loading branch information
AurelienFT authored May 5, 2023
1 parent ba55fcd commit 93333bf
Show file tree
Hide file tree
Showing 13 changed files with 105 additions and 110 deletions.
25 changes: 13 additions & 12 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 7 additions & 8 deletions massa-async-pool/src/changes.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
///! Copyright (c) 2022 MASSA LABS <info@massa.net>
///! This file provides structures representing changes to the asynchronous message pool
use std::ops::Bound::Included;

use crate::{
message::{AsyncMessage, AsyncMessageId, AsyncMessageIdDeserializer, AsyncMessageIdSerializer},
AsyncMessageDeserializer, AsyncMessageSerializer,
};
use massa_serialization::{
Deserializer, SerializeError, Serializer, U64VarIntDeserializer, U64VarIntSerializer,
};
Expand All @@ -11,14 +18,6 @@ use nom::{
};
use serde::{Deserialize, Serialize};

///! Copyright (c) 2022 MASSA LABS <info@massa.net>
///! This file provides structures representing changes to the asynchronous message pool
use crate::{
message::{AsyncMessage, AsyncMessageId, AsyncMessageIdDeserializer, AsyncMessageIdSerializer},
AsyncMessageDeserializer, AsyncMessageSerializer,
};

/// Enum representing a value U with identifier T being added or deleted
#[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize)]
pub enum Change<T, U> {
Expand Down
2 changes: 1 addition & 1 deletion massa-models/src/config/constants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ lazy_static::lazy_static! {
)
)
} else {
1683205200000.into() // Thursday, May 4, 2023 01:00:00 PM UTC
1683291600000.into() // Friday, May 5, 2023 01:00:00 PM UTC
};

/// TESTNET: time when the blockclique is ended.
Expand Down
2 changes: 1 addition & 1 deletion massa-node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ tracing = { version = "0.1", features = [
"max_level_debug",
"release_max_level_debug",
] }
peernet = { git = "https://github.com/massalabs/PeerNet", rev = "959a7a651ff9c346b6fc5a123a5a44f0bd0cc8d5" }
peernet = { git = "https://github.com/massalabs/PeerNet", rev = "9300cb35b09347f3848c9a232f9aaf3f3f100ece" }
tracing-subscriber = "0.3"
paw = "1.0"
structopt = { version = "0.3", features = ["paw"] }
Expand Down
1 change: 0 additions & 1 deletion massa-pool-worker/src/denunciation_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,6 @@ impl DenunciationPool {
if let Some(denunciation) = denunciation_ {
info!("Created a new denunciation : {:?}", denunciation);
}

self.cleanup_caches();
}

Expand Down
2 changes: 1 addition & 1 deletion massa-protocol-exports/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ thiserror = "1.0"
nom = "7.1"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
peernet = { git = "https://github.com/massalabs/PeerNet", rev = "959a7a651ff9c346b6fc5a123a5a44f0bd0cc8d5"}
peernet = { git = "https://github.com/massalabs/PeerNet", rev = "9300cb35b09347f3848c9a232f9aaf3f3f100ece" }
tempfile = { version = "3.3", optional = true } # use with testing feature
mockall = "0.11.4"

Expand Down
2 changes: 1 addition & 1 deletion massa-protocol-worker/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ crossbeam = "0.8"
serde_json = "1.0"
nom = "7.1"
num_enum = "0.5"
peernet = { git = "https://github.com/massalabs/PeerNet", rev = "959a7a651ff9c346b6fc5a123a5a44f0bd0cc8d5"}
peernet = { git = "https://github.com/massalabs/PeerNet", rev = "9300cb35b09347f3848c9a232f9aaf3f3f100ece" }
tempfile = { version = "3.3", optional = true } # use with testing feature
rayon = "1.7.0"
lru = "0.10.0"
Expand Down
6 changes: 2 additions & 4 deletions massa-protocol-worker/src/connectivity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use std::collections::HashMap;
use std::net::SocketAddr;
use std::{num::NonZeroUsize, sync::Arc};
use std::{thread::JoinHandle, time::Duration};
use tracing::warn;
use tracing::{info, warn};

use crate::{handlers::peer_handler::models::SharedPeerDB, worker::ProtocolChannels};
use crate::{handlers::peer_handler::PeerManagementHandler, messages::MessagesHandler};
Expand Down Expand Up @@ -235,9 +235,7 @@ pub(crate) fn start_connectivity_thread(
continue;
}
}
if config.debug {
println!("Trying to connect to peer {:?}", addr);
}
info!("Trying to connect to addr {} of peer {}", addr, peer_id);
// We only manage TCP for now
if let Err(err) = network_controller.try_connect(*addr, Duration::from_millis(200), &OutConnectionConfig::Tcp(Box::new(TcpOutConnectionConfig::new(config.read_write_limit_bytes_per_second / 10, Duration::from_millis(100))))) {
warn!("Failed to connect to peer {:?}: {:?}", addr, err);
Expand Down
128 changes: 64 additions & 64 deletions massa-protocol-worker/src/handlers/block_handler/retrieval.rs
Original file line number Diff line number Diff line change
Expand Up @@ -691,7 +691,6 @@ impl RetrievalThread {
known_ops.put(op_id.prefix(), ());
}
}

let info = if let Some(info) = self.block_wishlist.get_mut(&block_id) {
info
} else {
Expand All @@ -708,7 +707,6 @@ impl RetrievalThread {
}
return Ok(());
};

let header = if let Some(header) = &info.header {
header
} else {
Expand All @@ -722,7 +720,6 @@ impl RetrievalThread {
}
return Ok(());
};

if info.operation_ids.is_some() {
warn!(
"Peer {} sent us an operation list for block id {} but we already received it.",
Expand All @@ -737,7 +734,6 @@ impl RetrievalThread {
}
return Ok(());
}

let mut total_hash: Vec<u8> =
Vec::with_capacity(operation_ids.len().saturating_mul(HASH_SIZE_BYTES));
operation_ids.iter().for_each(|op_id| {
Expand Down Expand Up @@ -844,7 +840,6 @@ impl RetrievalThread {
}
return Ok(());
}

match self.block_wishlist.entry(block_id) {
Entry::Occupied(mut entry) => {
let info = entry.get_mut();
Expand Down Expand Up @@ -1038,28 +1033,28 @@ impl RetrievalThread {
Default::default();

// list blocks to re-ask and from whom
for (hash, block_info) in self.block_wishlist.iter() {
let required_info = if block_info.header.is_none() {
AskForBlocksInfo::Header
} else if block_info.operation_ids.is_none() {
AskForBlocksInfo::Info
} else {
let already_stored_operations = block_info.storage.get_op_refs();
// Unwrap safety: Check if `operation_ids` is none just above
AskForBlocksInfo::Operations(
block_info
.operation_ids
.as_ref()
.unwrap()
.iter()
.filter(|id| !already_stored_operations.contains(id))
.copied()
.collect(),
)
};
let mut needs_ask = true;
{
let mut cache_write = self.cache.write();
{
let mut cache_write = self.cache.write();
for (hash, block_info) in self.block_wishlist.iter() {
let required_info = if block_info.header.is_none() {
AskForBlocksInfo::Header
} else if block_info.operation_ids.is_none() {
AskForBlocksInfo::Info
} else {
let already_stored_operations = block_info.storage.get_op_refs();
// Unwrap safety: Check if `operation_ids` is none just above
AskForBlocksInfo::Operations(
block_info
.operation_ids
.as_ref()
.unwrap()
.iter()
.filter(|id| !already_stored_operations.contains(id))
.copied()
.collect(),
)
};
let mut needs_ask = true;
// Clean old peers that aren't active anymore
let peers_connected: HashSet<PeerId> =
self.active_connections.get_peer_ids_connected();
Expand Down Expand Up @@ -1202,45 +1197,50 @@ impl RetrievalThread {
)
})
.collect();
{
let cache_read = self.cache.read();
for (hash, criteria) in candidate_nodes.into_iter() {
// find the best node
if let Some((_knowledge, best_node, required_info, _)) = criteria
.into_iter()
.filter_map(|(knowledge, peer_id, required_info)| {
// filter out nodes with too many active block requests
if *active_block_req_count.get(&peer_id).unwrap_or(&0)
<= self.config.max_simultaneous_ask_blocks_per_node
{
cache_read
.blocks_known_by_peer
.peek(&peer_id)
.map(|peer_data| (knowledge, peer_id, required_info, peer_data.1))
} else {
None
}
})
.min_by_key(|(knowledge, peer_id, _, instant)| {
(
*knowledge, // block knowledge
*active_block_req_count.get(peer_id).unwrap_or(&0), // active requests
*instant, // node age
peer_id.clone(), // node ID
)
})
{
let asked_blocks = self.asked_blocks.get_mut(&best_node).unwrap(); // will not panic, already checked
asked_blocks.insert(hash, now);
if let Some(cnt) = active_block_req_count.get_mut(&best_node) {
*cnt += 1; // increase the number of actively asked blocks
}

for (hash, criteria) in candidate_nodes.into_iter() {
// find the best node
if let Some((_knowledge, best_node, required_info)) = criteria
.into_iter()
.filter(|(_knowledge, peer_id, _)| {
// filter out nodes with too many active block requests
*active_block_req_count.get(peer_id).unwrap_or(&0)
<= self.config.max_simultaneous_ask_blocks_per_node
})
.min_by_key(|(knowledge, peer_id, _)| {
(
*knowledge, // block knowledge
*active_block_req_count.get(peer_id).unwrap_or(&0), // active requests
self.cache
.read()
.blocks_known_by_peer
.peek(peer_id)
.unwrap()
.1, // node age (will not panic, already checked)
peer_id.clone(), // node ID
)
})
{
let asked_blocks = self.asked_blocks.get_mut(&best_node).unwrap(); // will not panic, already checked
asked_blocks.insert(hash, now);
if let Some(cnt) = active_block_req_count.get_mut(&best_node) {
*cnt += 1; // increase the number of actively asked blocks
}

ask_block_list
.entry(best_node.clone())
.or_insert_with(Vec::new)
.push((hash, required_info.clone()));
ask_block_list
.entry(best_node.clone())
.or_insert_with(Vec::new)
.push((hash, required_info.clone()));

let timeout_at = now
.checked_add(self.config.ask_block_timeout.into())
.ok_or(TimeError::TimeOverflowError)?;
next_tick = std::cmp::min(next_tick, timeout_at);
let timeout_at = now
.checked_add(self.config.ask_block_timeout.into())
.ok_or(TimeError::TimeOverflowError)?;
next_tick = std::cmp::min(next_tick, timeout_at);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,13 +162,14 @@ impl RetrievalThread {
}

fn clear_storage(&mut self) {
for (instant, operations) in self.stored_operations.iter() {
self.stored_operations.retain(|instant, operations| {
if instant.elapsed() > self.config.asked_operations_pruning_period.to_duration() {
self.storage.drop_operation_refs(operations);
false
} else {
break;
true
}
}
});
}

fn note_operations_from_peer(
Expand Down
6 changes: 3 additions & 3 deletions massa-protocol-worker/src/handlers/peer_handler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -444,10 +444,10 @@ impl InitConnectionHandler for MassaHandshake {
if !announcement.listeners.is_empty() {
peer_db_write
.index_by_newest
.retain(|_, peer_id_stored| peer_id_stored != peer_id);
.retain(|(_, peer_id_stored)| peer_id_stored != peer_id);
peer_db_write
.index_by_newest
.insert(Reverse(announcement.timestamp), peer_id.clone());
.insert((Reverse(announcement.timestamp), peer_id.clone()));
}
peer_db_write
.peers
Expand Down Expand Up @@ -504,7 +504,7 @@ impl InitConnectionHandler for MassaHandshake {
serializer.serialize_id(&msg, &mut buf).unwrap();
serializer.serialize(&msg, &mut buf).unwrap();
endpoint.send(buf.as_slice()).unwrap();
std::thread::sleep(Duration::from_millis(200));
std::thread::sleep(Duration::from_millis(500));
endpoint.shutdown();
});
Ok(())
Expand Down
Loading

0 comments on commit 93333bf

Please sign in to comment.