diff --git a/Cargo.lock b/Cargo.lock index 5ea289bc4ca..50f91b43474 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2760,6 +2760,7 @@ dependencies = [ "futures", "log", "near-chain", + "near-chunks-primitives", "near-crypto", "near-logger-utils", "near-network", @@ -2771,6 +2772,13 @@ dependencies = [ "serde", ] +[[package]] +name = "near-chunks-primitives" +version = "0.1.0" +dependencies = [ + "near-chain-primitives", +] + [[package]] name = "near-client" version = "0.1.0" @@ -2821,9 +2829,9 @@ dependencies = [ "chrono", "near-chain-configs", "near-chain-primitives", - "near-chunks", + "near-chunks-primitives", "near-crypto", - "near-network", + "near-network-primitives", "near-primitives", "serde", "strum", @@ -3018,14 +3026,13 @@ dependencies = [ "near-crypto", "near-logger-utils", "near-metrics", + "near-network-primitives", "near-performance-metrics", "near-performance-metrics-macros", "near-primitives", "near-rust-allocator-proxy", "near-store", "rand 0.7.3", - "serde", - "serde_json", "strum", "tempfile", "tokio", @@ -3034,6 +3041,21 @@ dependencies = [ "tracing", ] +[[package]] +name = "near-network-primitives" +version = "0.1.0" +dependencies = [ + "actix", + "borsh 0.8.1", + "chrono", + "near-crypto", + "near-primitives", + "serde", + "strum", + "tokio", + "tracing", +] + [[package]] name = "near-performance-metrics" version = "0.1.0" diff --git a/chain/chunks-primitives/Cargo.toml b/chain/chunks-primitives/Cargo.toml new file mode 100644 index 00000000000..c0d723d61c7 --- /dev/null +++ b/chain/chunks-primitives/Cargo.toml @@ -0,0 +1,8 @@ +[package] +name = "near-chunks-primitives" +version = "0.1.0" +authors = ["Near Inc "] +edition = "2018" + +[dependencies] +near-chain-primitives = { path = "../chain-primitives" } diff --git a/chain/chunks/src/types.rs b/chain/chunks-primitives/src/error.rs similarity index 64% rename from chain/chunks/src/types.rs rename to chain/chunks-primitives/src/error.rs index 696eb1caa28..70845b6d658 100644 --- a/chain/chunks/src/types.rs +++ b/chain/chunks-primitives/src/error.rs @@ -1,3 +1,5 @@ +use std::fmt; + #[derive(Debug)] pub enum Error { InvalidPartMessage, @@ -10,12 +12,12 @@ pub enum Error { DuplicateChunkHeight, UnknownChunk, KnownPart, - ChainError(near_chain::Error), + ChainError(near_chain_primitives::Error), IOError(std::io::Error), } -impl std::fmt::Display for Error { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> { +impl fmt::Display for Error { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> { write!(f, "{:?}", self) } } @@ -26,8 +28,8 @@ impl From for Error { } } -impl From for Error { - fn from(err: near_chain::Error) -> Self { +impl From for Error { + fn from(err: near_chain_primitives::Error) -> Self { Error::ChainError(err) } } diff --git a/chain/chunks-primitives/src/lib.rs b/chain/chunks-primitives/src/lib.rs new file mode 100644 index 00000000000..8894df6a2c7 --- /dev/null +++ b/chain/chunks-primitives/src/lib.rs @@ -0,0 +1,3 @@ +mod error; + +pub use error::Error; diff --git a/chain/chunks/Cargo.toml b/chain/chunks/Cargo.toml index c902a8d9b38..f9c93e0a7e9 100644 --- a/chain/chunks/Cargo.toml +++ b/chain/chunks/Cargo.toml @@ -17,6 +17,7 @@ reed-solomon-erasure = "4" near-crypto = { path = "../../core/crypto" } near-primitives = { path = "../../core/primitives" } +near-chunks-primitives = { path = "../chunks-primitives" } near-store = { path = "../../core/store" } near-network = { path = "../network" } near-chain = { path = "../chain" } diff --git a/chain/chunks/src/lib.rs 
b/chain/chunks/src/lib.rs index cf99586a15f..a6b461ca00f 100644 --- a/chain/chunks/src/lib.rs +++ b/chain/chunks/src/lib.rs @@ -40,12 +40,11 @@ use near_primitives::version::ProtocolVersion; use near_primitives::{checked_feature, unwrap_or_return}; use crate::chunk_cache::{EncodedChunksCache, EncodedChunksCacheEntry}; -pub use crate::types::Error; +pub use near_chunks_primitives::Error; use rand::Rng; mod chunk_cache; pub mod test_utils; -mod types; const CHUNK_PRODUCER_BLACKLIST_SIZE: usize = 100; pub const CHUNK_REQUEST_RETRY_MS: u64 = 100; diff --git a/chain/client-primitives/Cargo.toml b/chain/client-primitives/Cargo.toml index d8d008a9fbb..8dba947b769 100644 --- a/chain/client-primitives/Cargo.toml +++ b/chain/client-primitives/Cargo.toml @@ -16,10 +16,7 @@ thiserror = "1.0" near-chain-primitives = { path = "../chain-primitives" } near-chain-configs = { path = "../../core/chain-configs" } -near-chunks = { path = "../chunks" } +near-chunks-primitives = { path = "../chunks-primitives" } near-crypto = { path = "../../core/crypto" } -near-network = { path = "../network" } +near-network-primitives = { path = "../network-primitives" } near-primitives = { path = "../../core/primitives" } - -[features] -metric_recorder = [] diff --git a/chain/client-primitives/src/types.rs b/chain/client-primitives/src/types.rs index 0e7cff968af..e1971099ebc 100644 --- a/chain/client-primitives/src/types.rs +++ b/chain/client-primitives/src/types.rs @@ -1,5 +1,3 @@ -#[cfg(feature = "metric_recorder")] -use near_network::recorder::MetricRecorder; use std::collections::HashMap; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; @@ -9,8 +7,7 @@ use chrono::{DateTime, Utc}; use serde::{Deserialize, Serialize}; use near_chain_configs::ProtocolConfigView; -use near_network::types::{AccountOrPeerIdOrHash, KnownProducer}; -use near_network::PeerInfo; +use near_network_primitives::types::{AccountOrPeerIdOrHash, KnownProducer, PeerInfo}; use near_primitives::errors::InvalidTxError; use near_primitives::hash::CryptoHash; use near_primitives::merkle::{MerklePath, PartialMerkleTree}; @@ -33,7 +30,7 @@ pub use near_primitives::views::{StatusResponse, StatusSyncInfo}; #[derive(Debug)] pub enum Error { Chain(near_chain_primitives::Error), - Chunk(near_chunks::Error), + Chunk(near_chunks_primitives::Error), BlockProducer(String), ChunkProducer(String), Other(String), @@ -66,8 +63,8 @@ impl From for Error { } } -impl From for Error { - fn from(err: near_chunks::Error) -> Self { +impl From for Error { + fn from(err: near_chunks_primitives::Error) -> Self { Error::Chunk(err) } } @@ -469,8 +466,6 @@ pub struct NetworkInfoResponse { pub received_bytes_per_sec: u64, /// Accounts of known block and chunk producers from routing table. pub known_producers: Vec, - #[cfg(feature = "metric_recorder")] - pub metric_recorder: MetricRecorder, } /// Status of given transaction including all the subsequent receipts. 
diff --git a/chain/client/Cargo.toml b/chain/client/Cargo.toml index a9bbf063596..6d75ce6f651 100644 --- a/chain/client/Cargo.toml +++ b/chain/client/Cargo.toml @@ -52,7 +52,6 @@ near-test-contracts = { path = "../../runtime/near-test-contracts" } byzantine_asserts = ["near-chain/byzantine_asserts"] expensive_tests = [] adversarial = ["near-network/adversarial", "near-chain/adversarial"] -metric_recorder = ["near-client-primitives/metric_recorder"] delay_detector = ["near-chain/delay_detector", "near-network/delay_detector", "delay-detector"] protocol_feature_block_header_v3 = ["near-primitives/protocol_feature_block_header_v3", "near-chain/protocol_feature_block_header_v3", "near-store/protocol_feature_block_header_v3"] nightly_protocol = [] diff --git a/chain/client/src/client_actor.rs b/chain/client/src/client_actor.rs index 71b42eea548..4f4a9dd5d88 100644 --- a/chain/client/src/client_actor.rs +++ b/chain/client/src/client_actor.rs @@ -25,8 +25,6 @@ use near_chain_configs::ClientConfig; #[cfg(feature = "adversarial")] use near_chain_configs::GenesisConfig; use near_crypto::Signature; -#[cfg(feature = "metric_recorder")] -use near_network::recorder::MetricRecorder; #[cfg(feature = "adversarial")] use near_network::types::NetworkAdversarialMessage; use near_network::types::{NetworkInfo, ReasonForBan}; @@ -154,8 +152,6 @@ impl ClientActor { received_bytes_per_sec: 0, sent_bytes_per_sec: 0, known_producers: vec![], - #[cfg(feature = "metric_recorder")] - metric_recorder: MetricRecorder::default(), peer_counter: 0, }, last_validator_announce_time: None, @@ -652,8 +648,6 @@ impl Handler for ClientActor { sent_bytes_per_sec: self.network_info.sent_bytes_per_sec, received_bytes_per_sec: self.network_info.received_bytes_per_sec, known_producers: self.network_info.known_producers.clone(), - #[cfg(feature = "metric_recorder")] - metric_recorder: self.network_info.metric_recorder.clone(), }) } } diff --git a/chain/client/src/test_utils.rs b/chain/client/src/test_utils.rs index afa3202a64c..ea8f2901e57 100644 --- a/chain/client/src/test_utils.rs +++ b/chain/client/src/test_utils.rs @@ -18,8 +18,6 @@ use near_chain::{ }; use near_chain_configs::ClientConfig; use near_crypto::{InMemorySigner, KeyType, PublicKey}; -#[cfg(feature = "metric_recorder")] -use near_network::recorder::MetricRecorder; use near_network::routing::EdgeInfo; use near_network::types::{ AccountOrPeerIdOrHash, NetworkInfo, NetworkViewClientMessages, NetworkViewClientResponses, @@ -487,8 +485,6 @@ pub fn setup_mock_all_validators( sent_bytes_per_sec: 0, received_bytes_per_sec: 0, known_producers: vec![], - #[cfg(feature = "metric_recorder")] - metric_recorder: MetricRecorder::default(), peer_counter: 0, }; client_addr.do_send(NetworkClientMessages::NetworkInfo(info)); diff --git a/chain/network-primitives/Cargo.toml b/chain/network-primitives/Cargo.toml new file mode 100644 index 00000000000..46d8028bad2 --- /dev/null +++ b/chain/network-primitives/Cargo.toml @@ -0,0 +1,21 @@ +[package] +name = "near-network-primitives" +version = "0.1.0" +authors = ["Near Inc "] +edition = "2018" + +[dependencies] +actix = "=0.11.0-beta.2" +tokio = { version = "1.1", features = ["full"] } +chrono = { version = "0.4.4", features = ["serde"] } +borsh = "0.8.1" +serde = { version = "1", features = [ "derive" ] } +strum = { version = "0.20", features = ["derive"] } +tracing = "0.1.13" + +near-crypto = { path = "../../core/crypto" } +near-primitives = { path = "../../core/primitives" } + +[features] +adversarial = [] +sandbox = [] diff --git 
a/chain/network-primitives/src/lib.rs b/chain/network-primitives/src/lib.rs new file mode 100644 index 00000000000..cd408564ea0 --- /dev/null +++ b/chain/network-primitives/src/lib.rs @@ -0,0 +1 @@ +pub mod types; diff --git a/chain/network-primitives/src/types.rs b/chain/network-primitives/src/types.rs new file mode 100644 index 00000000000..e6a27401f57 --- /dev/null +++ b/chain/network-primitives/src/types.rs @@ -0,0 +1,1109 @@ +use std::collections::{HashMap, HashSet}; +use std::convert::{Into, TryFrom}; +use std::fmt; +use std::net::{AddrParseError, IpAddr, SocketAddr}; +use std::str::FromStr; +use std::time::Duration; + +use actix::dev::{MessageResponse, ResponseChannel}; +use actix::{Actor, Message}; +use borsh::{BorshDeserialize, BorshSerialize}; +use chrono::{DateTime, Utc}; +use serde::{Deserialize, Serialize}; +use strum::AsStaticStr; +use tokio::net::TcpStream; +use tracing::{error, warn}; + +use near_crypto::{KeyType, PublicKey, SecretKey, Signature}; +use near_primitives::block::{Approval, Block, BlockHeader, GenesisId}; +use near_primitives::hash::{hash, CryptoHash}; +use near_primitives::network::{AnnounceAccount, PeerId}; +use near_primitives::sharding::{ + ChunkHash, PartialEncodedChunk, PartialEncodedChunkPart, PartialEncodedChunkV1, + PartialEncodedChunkWithArcReceipts, ReceiptProof, ShardChunkHeader, +}; +#[cfg(feature = "sandbox")] +use near_primitives::state_record::StateRecord; +use near_primitives::syncing::{ + EpochSyncFinalizationResponse, EpochSyncResponse, ShardStateSyncResponse, + ShardStateSyncResponseV1, +}; +use near_primitives::transaction::{ExecutionOutcomeWithIdAndProof, SignedTransaction}; +use near_primitives::types::{AccountId, BlockHeight, BlockReference, EpochId, ShardId}; +use near_primitives::utils::{from_timestamp, to_timestamp}; +use near_primitives::views::{FinalExecutionOutcomeView, QueryRequest, QueryResponse}; + +use std::fmt::{Debug, Error, Formatter}; + +use near_primitives::merkle::combine_hash; + +/// Number of hops a message is allowed to travel before being dropped. +/// This is used to avoid infinite loop because of inconsistent view of the network +/// by different nodes. +pub const ROUTED_MESSAGE_TTL: u8 = 100; +/// On every message from peer don't update `last_time_received_message` +/// but wait some "small" timeout between updates to avoid a lot of messages between +/// Peer and PeerManager. +pub const UPDATE_INTERVAL_LAST_TIME_RECEIVED_MESSAGE: Duration = Duration::from_secs(60); + +/// Peer information. +#[derive(BorshSerialize, BorshDeserialize, Clone, Debug, Eq, PartialEq, Serialize, Deserialize)] +pub struct PeerInfo { + pub id: PeerId, + pub addr: Option, + pub account_id: Option, +} + +impl PeerInfo { + /// Creates random peer info. + pub fn new(id: PeerId, addr: SocketAddr) -> Self { + PeerInfo { id, addr: Some(addr), account_id: None } + } + + pub fn random() -> Self { + PeerInfo { id: PeerId::random(), addr: None, account_id: None } + } + + pub fn addr_port(&self) -> Option { + self.addr.map(|addr| addr.port()) + } +} + +// Note, `Display` automatically implements `ToString` which must be reciprocal to `FromStr`. 
+impl fmt::Display for PeerInfo { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}", self.id)?; + if let Some(addr) = &self.addr { + write!(f, "@{}", addr)?; + } + if let Some(account_id) = &self.account_id { + write!(f, "@{}", account_id)?; + } + Ok(()) + } +} + +impl FromStr for PeerInfo { + type Err = Box; + + fn from_str(s: &str) -> Result { + let chunks: Vec<&str> = s.split('@').collect(); + let addr; + let account_id; + if chunks.len() == 1 { + addr = None; + account_id = None; + } else if chunks.len() == 2 { + if let Ok(x) = chunks[1].parse::() { + addr = Some(x); + account_id = None; + } else { + addr = None; + account_id = Some(chunks[1].parse().unwrap()); + } + } else if chunks.len() == 3 { + addr = Some(chunks[1].parse::()?); + account_id = Some(chunks[2].parse().unwrap()); + } else { + return Err(Box::new(std::io::Error::new( + std::io::ErrorKind::InvalidInput, + format!("Invalid PeerInfo format: {:?}", chunks), + ))); + } + Ok(PeerInfo { id: PeerId(chunks[0].parse()?), addr, account_id }) + } +} + +impl TryFrom<&str> for PeerInfo { + type Error = Box; + + fn try_from(s: &str) -> Result { + Self::from_str(s) + } +} + +/// Peer chain information. +/// TODO: Remove in next version +#[derive(BorshSerialize, BorshDeserialize, Clone, Debug, Eq, PartialEq, Default)] +pub struct PeerChainInfo { + /// Chain Id and hash of genesis block. + pub genesis_id: GenesisId, + /// Last known chain height of the peer. + pub height: BlockHeight, + /// Shards that the peer is tracking. + pub tracked_shards: Vec, +} + +/// Peer chain information. +#[derive(BorshSerialize, BorshDeserialize, Clone, Debug, Eq, PartialEq, Default)] +pub struct PeerChainInfoV2 { + /// Chain Id and hash of genesis block. + pub genesis_id: GenesisId, + /// Last known chain height of the peer. + pub height: BlockHeight, + /// Shards that the peer is tracking. + pub tracked_shards: Vec, + /// Denote if a node is running in archival mode or not. + pub archival: bool, +} + +impl From for PeerChainInfoV2 { + fn from(peer_chain_info: PeerChainInfo) -> Self { + Self { + genesis_id: peer_chain_info.genesis_id, + height: peer_chain_info.height, + tracked_shards: peer_chain_info.tracked_shards, + archival: false, + } + } +} + +/// Peer type. +#[derive(Copy, Clone, Debug, Eq, PartialEq)] +pub enum PeerType { + /// Inbound session + Inbound, + /// Outbound session + Outbound, +} + +/// Peer status. +#[derive(Copy, Clone, Debug, Eq, PartialEq)] +pub enum PeerStatus { + /// Waiting for handshake. + Connecting, + /// Ready to go. + Ready, + /// Banned, should shutdown this peer. 
+ Banned(ReasonForBan), +} + +/// Account route description +#[derive(BorshSerialize, BorshDeserialize, PartialEq, Eq, Clone, Debug)] +pub struct AnnounceAccountRoute { + pub peer_id: PeerId, + pub hash: CryptoHash, + pub signature: Signature, +} + +#[derive(BorshSerialize, BorshDeserialize, PartialEq, Eq, Clone, Debug)] +pub struct Ping { + pub nonce: u64, + pub source: PeerId, +} + +#[derive(BorshSerialize, BorshDeserialize, PartialEq, Eq, Clone, Debug)] +pub struct Pong { + pub nonce: u64, + pub source: PeerId, +} + +// TODO(#1313): Use Box +#[derive( + BorshSerialize, + BorshDeserialize, + PartialEq, + Eq, + Clone, + strum::AsStaticStr, + strum::EnumVariantNames, +)] +#[allow(clippy::large_enum_variant)] +pub enum RoutedMessageBody { + BlockApproval(Approval), + ForwardTx(SignedTransaction), + + TxStatusRequest(AccountId, CryptoHash), + TxStatusResponse(FinalExecutionOutcomeView), + QueryRequest { + query_id: String, + block_reference: BlockReference, + request: QueryRequest, + }, + QueryResponse { + query_id: String, + response: Result, + }, + ReceiptOutcomeRequest(CryptoHash), + /// Not used, but needed to preserve backward compatibility. + Unused, + StateRequestHeader(ShardId, CryptoHash), + StateRequestPart(ShardId, CryptoHash, u64), + StateResponse(StateResponseInfoV1), + PartialEncodedChunkRequest(PartialEncodedChunkRequestMsg), + PartialEncodedChunkResponse(PartialEncodedChunkResponseMsg), + PartialEncodedChunk(PartialEncodedChunkV1), + /// Ping/Pong used for testing networking and routing. + Ping(Ping), + Pong(Pong), + VersionedPartialEncodedChunk(PartialEncodedChunk), + VersionedStateResponse(StateResponseInfo), + PartialEncodedChunkForward(PartialEncodedChunkForwardMsg), +} + +impl From for RoutedMessageBody { + fn from(pec: PartialEncodedChunkWithArcReceipts) -> Self { + if let ShardChunkHeader::V1(legacy_header) = pec.header { + Self::PartialEncodedChunk(PartialEncodedChunkV1 { + header: legacy_header, + parts: pec.parts, + receipts: pec.receipts.into_iter().map(|r| ReceiptProof::clone(&r)).collect(), + }) + } else { + Self::VersionedPartialEncodedChunk(pec.into()) + } + } +} + +impl Debug for RoutedMessageBody { + fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), Error> { + match self { + RoutedMessageBody::BlockApproval(approval) => write!( + f, + "Approval({}, {}, {:?})", + approval.target_height, approval.account_id, approval.inner + ), + RoutedMessageBody::ForwardTx(tx) => write!(f, "tx {}", tx.get_hash()), + RoutedMessageBody::TxStatusRequest(account_id, hash) => { + write!(f, "TxStatusRequest({}, {})", account_id, hash) + } + RoutedMessageBody::TxStatusResponse(response) => { + write!(f, "TxStatusResponse({})", response.transaction.hash) + } + RoutedMessageBody::QueryRequest { .. } => write!(f, "QueryRequest"), + RoutedMessageBody::QueryResponse { .. 
} => write!(f, "QueryResponse"), + RoutedMessageBody::ReceiptOutcomeRequest(hash) => write!(f, "ReceiptRequest({})", hash), + RoutedMessageBody::StateRequestHeader(shard_id, sync_hash) => { + write!(f, "StateRequestHeader({}, {})", shard_id, sync_hash) + } + RoutedMessageBody::StateRequestPart(shard_id, sync_hash, part_id) => { + write!(f, "StateRequestPart({}, {}, {})", shard_id, sync_hash, part_id) + } + RoutedMessageBody::StateResponse(response) => { + write!(f, "StateResponse({}, {})", response.shard_id, response.sync_hash) + } + RoutedMessageBody::PartialEncodedChunkRequest(request) => { + write!(f, "PartialChunkRequest({:?}, {:?})", request.chunk_hash, request.part_ords) + } + RoutedMessageBody::PartialEncodedChunkResponse(response) => write!( + f, + "PartialChunkResponse({:?}, {:?})", + response.chunk_hash, + response.parts.iter().map(|p| p.part_ord).collect::>() + ), + RoutedMessageBody::PartialEncodedChunk(chunk) => { + write!(f, "PartialChunk({:?})", chunk.header.hash) + } + RoutedMessageBody::VersionedPartialEncodedChunk(_) => { + write!(f, "VersionedPartialChunk(?)") + } + RoutedMessageBody::VersionedStateResponse(response) => write!( + f, + "VersionedStateResponse({}, {})", + response.shard_id(), + response.sync_hash() + ), + RoutedMessageBody::PartialEncodedChunkForward(forward) => write!( + f, + "PartialChunkForward({:?}, {:?})", + forward.chunk_hash, + forward.parts.iter().map(|p| p.part_ord).collect::>(), + ), + RoutedMessageBody::Ping(_) => write!(f, "Ping"), + RoutedMessageBody::Pong(_) => write!(f, "Pong"), + RoutedMessageBody::Unused => write!(f, "Unused"), + } + } +} + +#[derive(BorshSerialize, BorshDeserialize, PartialEq, Eq, Clone, Debug)] +pub enum PeerIdOrHash { + PeerId(PeerId), + Hash(CryptoHash), +} + +#[derive(Debug, Clone, PartialEq, Eq, BorshSerialize, BorshDeserialize, Hash)] +// Defines the destination for a network request. +// The request should be sent either to the `account_id` as a routed message, or directly to +// any peer that tracks the shard. +// If `prefer_peer` is `true`, should be sent to the peer, unless no peer tracks the shard, in which +// case fall back to sending to the account. +// Otherwise, send to the account, unless we do not know the route, in which case send to the peer. +pub struct AccountIdOrPeerTrackingShard { + pub shard_id: ShardId, + pub only_archival: bool, + pub account_id: Option, + pub prefer_peer: bool, +} + +impl AccountIdOrPeerTrackingShard { + pub fn from_account(shard_id: ShardId, account_id: AccountId) -> Self { + Self { shard_id, only_archival: false, account_id: Some(account_id), prefer_peer: false } + } +} + +#[derive(Debug, Clone, PartialEq, Eq, BorshSerialize, BorshDeserialize, Hash)] +pub enum AccountOrPeerIdOrHash { + AccountId(AccountId), + PeerId(PeerId), + Hash(CryptoHash), +} + +impl AccountOrPeerIdOrHash { + fn peer_id_or_hash(&self) -> Option { + match self { + AccountOrPeerIdOrHash::AccountId(_) => None, + AccountOrPeerIdOrHash::PeerId(peer_id) => Some(PeerIdOrHash::PeerId(peer_id.clone())), + AccountOrPeerIdOrHash::Hash(hash) => Some(PeerIdOrHash::Hash(hash.clone())), + } + } +} + +#[derive(Message)] +#[rtype(result = "()")] +pub struct RawRoutedMessage { + pub target: AccountOrPeerIdOrHash, + pub body: RoutedMessageBody, +} + +impl RawRoutedMessage { + /// Add signature to the message. + /// Panics if the target is an AccountId instead of a PeerId. 
+ pub fn sign( + self, + author: PeerId, + secret_key: &SecretKey, + routed_message_ttl: u8, + ) -> RoutedMessage { + let target = self.target.peer_id_or_hash().unwrap(); + let hash = RoutedMessage::build_hash(&target, &author, &self.body); + let signature = secret_key.sign(hash.as_ref()); + RoutedMessage { target, author, signature, ttl: routed_message_ttl, body: self.body } + } +} + +#[derive(BorshSerialize, PartialEq, Eq, Clone, Debug)] +pub struct RoutedMessageNoSignature<'a> { + target: &'a PeerIdOrHash, + author: &'a PeerId, + body: &'a RoutedMessageBody, +} + +/// RoutedMessage represent a package that will travel the network towards a specific peer id. +/// It contains the peer_id and signature from the original sender. Every intermediate peer in the +/// route must verify that this signature is valid otherwise previous sender of this package should +/// be banned. If the final receiver of this package finds that the body is invalid the original +/// sender of the package should be banned instead. +/// If target is hash, it is a message that should be routed back using the same path used to route +/// the request in first place. It is the hash of the request message. +#[derive(BorshSerialize, BorshDeserialize, PartialEq, Eq, Clone, Debug)] +pub struct RoutedMessage { + /// Peer id which is directed this message. + /// If `target` is hash, this a message should be routed back. + pub target: PeerIdOrHash, + /// Original sender of this message + pub author: PeerId, + /// Signature from the author of the message. If this signature is invalid we should ban + /// last sender of this message. If the message is invalid we should ben author of the message. + pub signature: Signature, + /// Time to live for this message. After passing through some hop this number should be + /// decreased by 1. If this number is 0, drop this message. + pub ttl: u8, + /// Message + pub body: RoutedMessageBody, +} + +impl RoutedMessage { + pub fn build_hash( + target: &PeerIdOrHash, + source: &PeerId, + body: &RoutedMessageBody, + ) -> CryptoHash { + hash( + &RoutedMessageNoSignature { target, author: source, body } + .try_to_vec() + .expect("Failed to serialize"), + ) + } + + pub fn hash(&self) -> CryptoHash { + RoutedMessage::build_hash(&self.target, &self.author, &self.body) + } + + pub fn verify(&self) -> bool { + self.signature.verify(self.hash().as_ref(), &self.author.public_key()) + } + + pub fn expect_response(&self) -> bool { + match self.body { + RoutedMessageBody::Ping(_) + | RoutedMessageBody::TxStatusRequest(_, _) + | RoutedMessageBody::StateRequestHeader(_, _) + | RoutedMessageBody::StateRequestPart(_, _, _) + | RoutedMessageBody::PartialEncodedChunkRequest(_) + | RoutedMessageBody::QueryRequest { .. } + | RoutedMessageBody::ReceiptOutcomeRequest(_) => true, + _ => false, + } + } + + /// Return true if ttl is positive after decreasing ttl by one, false otherwise. + pub fn decrease_ttl(&mut self) -> bool { + self.ttl = self.ttl.saturating_sub(1); + self.ttl > 0 + } +} + +/// Routed Message wrapped with previous sender of the message. +pub struct RoutedMessageFrom { + /// Routed messages. + pub msg: RoutedMessage, + /// Previous hop in the route. Used for messages that needs routing back. + pub from: PeerId, +} + +impl Message for RoutedMessageFrom { + type Result = bool; +} + +#[derive(Debug, Clone)] +pub enum BlockedPorts { + All, + Some(HashSet), +} + +/// Configuration for the peer-to-peer manager. 
+#[derive(Clone)] +pub struct NetworkConfig { + pub public_key: PublicKey, + pub secret_key: SecretKey, + pub account_id: Option, + pub addr: Option, + pub boot_nodes: Vec, + pub handshake_timeout: Duration, + pub reconnect_delay: Duration, + pub bootstrap_peers_period: Duration, + /// Maximum number of active peers. Hard limit. + pub max_num_peers: u32, + /// Minimum outbound connections a peer should have to avoid eclipse attacks. + pub minimum_outbound_peers: u32, + /// Lower bound of the ideal number of connections. + pub ideal_connections_lo: u32, + /// Upper bound of the ideal number of connections. + pub ideal_connections_hi: u32, + /// Peers which last message is was within this period of time are considered active recent peers. + pub peer_recent_time_window: Duration, + /// Number of peers to keep while removing a connection. + /// Used to avoid disconnecting from peers we have been connected since long time. + pub safe_set_size: u32, + /// Lower bound of the number of connections to archival peers to keep + /// if we are an archival node. + pub archival_peer_connections_lower_bound: u32, + /// Duration of the ban for misbehaving peers. + pub ban_window: Duration, + /// Remove expired peers. + pub peer_expiration_duration: Duration, + /// Maximum number of peer addresses we should ever send on PeersRequest. + pub max_send_peers: u32, + /// Duration for checking on stats from the peers. + pub peer_stats_period: Duration, + /// Time to persist Accounts Id in the router without removing them. + pub ttl_account_id_router: Duration, + /// Number of hops a message is allowed to travel before being dropped. + /// This is used to avoid infinite loop because of inconsistent view of the network + /// by different nodes. + pub routed_message_ttl: u8, + /// Maximum number of routes that we should keep track for each Account id in the Routing Table. + pub max_routes_to_store: usize, + /// Height horizon for highest height peers + /// For example if one peer is 1 height away from max height peer, + /// we still want to use the rest to query for state/headers/blocks. + pub highest_peer_horizon: u64, + /// Period between pushing network info to client + pub push_info_period: Duration, + /// Peers on blacklist by IP:Port. + /// Nodes will not accept or try to establish connection to such peers. + pub blacklist: HashMap, + /// Flag to disable outbound connections. When this flag is active, nodes will not try to + /// establish connection with other nodes, but will accept incoming connection if other requirements + /// are satisfied. + /// This flag should be ALWAYS FALSE. Only set to true for testing purposes. + pub outbound_disabled: bool, + /// Not clear old data, set `true` for archive nodes. + pub archive: bool, +} + +impl NetworkConfig { + /// Returns network config with given seed used for peer id. 
+ pub fn from_seed(seed: &str, port: u16) -> Self { + let secret_key = SecretKey::from_seed(KeyType::ED25519, seed); + let public_key = secret_key.public_key(); + NetworkConfig { + public_key, + secret_key, + account_id: Some(seed.parse().unwrap()), + addr: Some(format!("0.0.0.0:{}", port).parse().unwrap()), + boot_nodes: vec![], + handshake_timeout: Duration::from_secs(60), + reconnect_delay: Duration::from_secs(60), + bootstrap_peers_period: Duration::from_millis(100), + max_num_peers: 10, + minimum_outbound_peers: 5, + ideal_connections_lo: 30, + ideal_connections_hi: 35, + peer_recent_time_window: Duration::from_secs(600), + safe_set_size: 20, + archival_peer_connections_lower_bound: 10, + ban_window: Duration::from_secs(1), + peer_expiration_duration: Duration::from_secs(60 * 60), + max_send_peers: 512, + peer_stats_period: Duration::from_secs(5), + ttl_account_id_router: Duration::from_secs(60 * 60), + routed_message_ttl: ROUTED_MESSAGE_TTL, + max_routes_to_store: 1, + highest_peer_horizon: 5, + push_info_period: Duration::from_millis(100), + blacklist: HashMap::new(), + outbound_disabled: false, + archive: false, + } + } + + pub fn verify(&self) { + if self.ideal_connections_lo + 1 >= self.ideal_connections_hi { + error!(target: "network", + "Invalid ideal_connections values. lo({}) > hi({}).", + self.ideal_connections_lo, self.ideal_connections_hi); + } + + if self.ideal_connections_hi >= self.max_num_peers { + error!(target: "network", + "max_num_peers({}) is below ideal_connections_hi({}) which may lead to connection saturation and declining new connections.", + self.max_num_peers, self.ideal_connections_hi + ); + } + + if self.outbound_disabled { + warn!(target: "network", "Outbound connections are disabled."); + } + + if self.safe_set_size <= self.minimum_outbound_peers { + error!(target: "network", + "safe_set_size({}) must be larger than minimum_outbound_peers({}).", + self.safe_set_size, + self.minimum_outbound_peers + ); + } + + if UPDATE_INTERVAL_LAST_TIME_RECEIVED_MESSAGE * 2 > self.peer_recent_time_window { + error!( + target: "network", + "Very short peer_recent_time_window({}). it should be at least twice update_interval_last_time_received_message({}).", + self.peer_recent_time_window.as_secs(), UPDATE_INTERVAL_LAST_TIME_RECEIVED_MESSAGE.as_secs() + ); + } + } +} + +/// Used to match a socket addr by IP:Port or only by IP +#[derive(Clone, Debug)] +pub enum PatternAddr { + Ip(IpAddr), + IpPort(SocketAddr), +} + +impl PatternAddr { + pub fn contains(&self, addr: &SocketAddr) -> bool { + match self { + PatternAddr::Ip(pattern) => &addr.ip() == pattern, + PatternAddr::IpPort(pattern) => addr == pattern, + } + } +} + +impl FromStr for PatternAddr { + type Err = AddrParseError; + + fn from_str(s: &str) -> Result { + if let Ok(pattern) = s.parse::() { + return Ok(PatternAddr::Ip(pattern)); + } + + s.parse::().map(PatternAddr::IpPort) + } +} + +/// Status of the known peers. +#[derive(BorshSerialize, BorshDeserialize, Serialize, Eq, PartialEq, Debug, Clone)] +pub enum KnownPeerStatus { + Unknown, + NotConnected, + Connected, + Banned(ReasonForBan, u64), +} + +impl KnownPeerStatus { + pub fn is_banned(&self) -> bool { + match self { + KnownPeerStatus::Banned(_, _) => true, + _ => false, + } + } +} + +/// Information node stores about known peers. 
+#[derive(BorshSerialize, BorshDeserialize, Debug, Clone)] +pub struct KnownPeerState { + pub peer_info: PeerInfo, + pub status: KnownPeerStatus, + pub first_seen: u64, + pub last_seen: u64, +} + +impl KnownPeerState { + pub fn new(peer_info: PeerInfo) -> Self { + KnownPeerState { + peer_info, + status: KnownPeerStatus::Unknown, + first_seen: to_timestamp(Utc::now()), + last_seen: to_timestamp(Utc::now()), + } + } + + pub fn first_seen(&self) -> DateTime { + from_timestamp(self.first_seen) + } + + pub fn last_seen(&self) -> DateTime { + from_timestamp(self.last_seen) + } +} + +impl TryFrom> for KnownPeerState { + type Error = Box; + + fn try_from(bytes: Vec) -> Result { + KnownPeerState::try_from_slice(&bytes).map_err(|err| err.into()) + } +} + +/// Actor message that holds the TCP stream from an inbound TCP connection +#[derive(Message)] +#[rtype(result = "()")] +pub struct InboundTcpConnect { + /// Tcp stream of the inbound connections + pub stream: TcpStream, +} + +impl InboundTcpConnect { + /// Method to create a new InboundTcpConnect message from a TCP stream + pub fn new(stream: TcpStream) -> InboundTcpConnect { + InboundTcpConnect { stream } + } +} + +/// Actor message to request the creation of an outbound TCP connection to a peer. +#[derive(Message)] +#[rtype(result = "()")] +pub struct OutboundTcpConnect { + /// Peer information of the outbound connection + pub peer_info: PeerInfo, +} + +/// Unregister message from Peer to PeerManager. +#[derive(Message)] +#[rtype(result = "()")] +pub struct Unregister { + pub peer_id: PeerId, + pub peer_type: PeerType, + pub remove_from_peer_store: bool, +} + +pub struct PeerList { + pub peers: Vec, +} + +/// Requesting peers from peer manager to communicate to a peer. +pub struct PeersRequest {} + +impl Message for PeersRequest { + type Result = PeerList; +} + +/// Received new peers from another peer. +#[derive(Message)] +#[rtype(result = "()")] +pub struct PeersResponse { + pub peers: Vec, +} + +impl MessageResponse for PeerList +where + A: Actor, + M: Message, +{ + fn handle>(self, _: &mut A::Context, tx: Option) { + if let Some(tx) = tx { + tx.send(self) + } + } +} + +/// Ban reason. +#[derive(BorshSerialize, BorshDeserialize, Serialize, Debug, Clone, PartialEq, Eq, Copy)] +pub enum ReasonForBan { + None = 0, + BadBlock = 1, + BadBlockHeader = 2, + HeightFraud = 3, + BadHandshake = 4, + BadBlockApproval = 5, + Abusive = 6, + InvalidSignature = 7, + InvalidPeerId = 8, + InvalidHash = 9, + InvalidEdge = 10, + EpochSyncNoResponse = 11, + EpochSyncInvalidResponse = 12, + EpochSyncInvalidFinalizationResponse = 13, +} + +/// Banning signal sent from Peer instance to PeerManager +/// just before Peer instance is stopped. 
+#[derive(Message)] +#[rtype(result = "()")] +pub struct Ban { + pub peer_id: PeerId, + pub ban_reason: ReasonForBan, +} + +/// Messages from PeerManager to Peer +#[derive(Message, Debug)] +#[rtype(result = "()")] +pub enum PeerManagerRequest { + BanPeer(ReasonForBan), + UnregisterPeer, +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct KnownProducer { + pub account_id: AccountId, + pub addr: Option, + pub peer_id: PeerId, +} + +#[derive(PartialEq, Eq, Clone, Debug, BorshSerialize, BorshDeserialize)] +pub struct StateResponseInfoV1 { + pub shard_id: ShardId, + pub sync_hash: CryptoHash, + pub state_response: ShardStateSyncResponseV1, +} + +#[derive(PartialEq, Eq, Clone, Debug, BorshSerialize, BorshDeserialize)] +pub struct StateResponseInfoV2 { + pub shard_id: ShardId, + pub sync_hash: CryptoHash, + pub state_response: ShardStateSyncResponse, +} + +#[derive(PartialEq, Eq, Clone, Debug, BorshSerialize, BorshDeserialize)] +pub enum StateResponseInfo { + V1(StateResponseInfoV1), + V2(StateResponseInfoV2), +} + +impl StateResponseInfo { + pub fn shard_id(&self) -> ShardId { + match self { + Self::V1(info) => info.shard_id, + Self::V2(info) => info.shard_id, + } + } + + pub fn sync_hash(&self) -> CryptoHash { + match self { + Self::V1(info) => info.sync_hash, + Self::V2(info) => info.sync_hash, + } + } + + pub fn take_state_response(self) -> ShardStateSyncResponse { + match self { + Self::V1(info) => ShardStateSyncResponse::V1(info.state_response), + Self::V2(info) => info.state_response, + } + } +} + +#[cfg(feature = "adversarial")] +#[derive(Debug)] +pub enum NetworkAdversarialMessage { + AdvProduceBlocks(u64, bool), + AdvSwitchToHeight(u64), + AdvDisableHeaderSync, + AdvDisableDoomslug, + AdvGetSavedBlocks, + AdvCheckStorageConsistency, + AdvSetSyncInfo(u64), +} + +#[cfg(feature = "sandbox")] +#[derive(Debug)] +pub enum NetworkSandboxMessage { + SandboxPatchState(Vec), + SandboxPatchStateStatus, +} + +#[derive(AsStaticStr)] +pub enum NetworkViewClientMessages { + #[cfg(feature = "adversarial")] + Adversarial(NetworkAdversarialMessage), + + /// Transaction status query + TxStatus { tx_hash: CryptoHash, signer_account_id: AccountId }, + /// Transaction status response + TxStatusResponse(Box), + /// Request for receipt outcome + ReceiptOutcomeRequest(CryptoHash), + /// Receipt outcome response + ReceiptOutcomeResponse(Box), + /// Request a block. + BlockRequest(CryptoHash), + /// Request headers. + BlockHeadersRequest(Vec), + /// State request header. + StateRequestHeader { shard_id: ShardId, sync_hash: CryptoHash }, + /// State request part. + StateRequestPart { shard_id: ShardId, sync_hash: CryptoHash, part_id: u64 }, + /// A request for a light client info during Epoch Sync + EpochSyncRequest { epoch_id: EpochId }, + /// A request for headers and proofs during Epoch Sync + EpochSyncFinalizationRequest { epoch_id: EpochId }, + /// Get Chain information from Client. + GetChainInfo, + /// Account announcements that needs to be validated before being processed. + /// They are paired with last epoch id known to this announcement, in order to accept only + /// newer announcements. + AnnounceAccount(Vec<(AnnounceAccount, Option)>), +} + +#[derive(Debug)] +pub enum NetworkViewClientResponses { + /// Transaction execution outcome + TxStatus(Box), + /// Response to general queries + QueryResponse { query_id: String, response: Result }, + /// Receipt outcome response + ReceiptOutcomeResponse(Box), + /// Block response. + Block(Box), + /// Headers response. 
+ BlockHeaders(Vec), + /// Chain information. + ChainInfo { + genesis_id: GenesisId, + height: BlockHeight, + tracked_shards: Vec, + archival: bool, + }, + /// Response to state request. + StateResponse(Box), + /// Valid announce accounts. + AnnounceAccount(Vec), + /// A response to a request for a light client block during Epoch Sync + EpochSyncResponse(EpochSyncResponse), + /// A response to a request for headers and proofs during Epoch Sync + EpochSyncFinalizationResponse(EpochSyncFinalizationResponse), + /// Ban peer for malicious behavior. + Ban { ban_reason: ReasonForBan }, + /// Response not needed + NoResponse, +} + +impl MessageResponse for NetworkViewClientResponses +where + A: Actor, + M: Message, +{ + fn handle>(self, _: &mut A::Context, tx: Option) { + if let Some(tx) = tx { + tx.send(self) + } + } +} + +impl Message for NetworkViewClientMessages { + type Result = NetworkViewClientResponses; +} + +/// Peer stats query. +pub struct QueryPeerStats {} + +/// Peer stats result +#[derive(Debug)] +pub struct PeerStatsResult { + /// Chain info. + pub chain_info: PeerChainInfoV2, + /// Number of bytes we've received from the peer. + pub received_bytes_per_sec: u64, + /// Number of bytes we've sent to the peer. + pub sent_bytes_per_sec: u64, + /// Returns if this peer is abusive and should be banned. + pub is_abusive: bool, + /// Counts of incoming/outgoing messages from given peer. + pub message_counts: (u64, u64), +} + +impl MessageResponse for PeerStatsResult +where + A: Actor, + M: Message, +{ + fn handle>(self, _: &mut A::Context, tx: Option) { + if let Some(tx) = tx { + tx.send(self) + } + } +} + +impl Message for QueryPeerStats { + type Result = PeerStatsResult; +} + +#[derive(Clone, Debug, Eq, PartialEq, BorshSerialize, BorshDeserialize)] +pub struct PartialEncodedChunkRequestMsg { + pub chunk_hash: ChunkHash, + pub part_ords: Vec, + pub tracking_shards: HashSet, +} + +#[derive(Clone, Debug, Eq, PartialEq, BorshSerialize, BorshDeserialize)] +pub struct PartialEncodedChunkResponseMsg { + pub chunk_hash: ChunkHash, + pub parts: Vec, + pub receipts: Vec, +} + +/// Message for chunk part owners to forward their parts to validators tracking that shard. +/// This reduces the number of requests a node tracking a shard needs to send to obtain enough +/// parts to reconstruct the message (in the best case no such requests are needed). 
+#[derive(Clone, Debug, Eq, PartialEq, BorshSerialize, BorshDeserialize)] +pub struct PartialEncodedChunkForwardMsg { + pub chunk_hash: ChunkHash, + pub inner_header_hash: CryptoHash, + pub merkle_root: CryptoHash, + pub signature: Signature, + pub prev_block_hash: CryptoHash, + pub height_created: BlockHeight, + pub shard_id: ShardId, + pub parts: Vec, +} + +impl PartialEncodedChunkForwardMsg { + pub fn from_header_and_parts( + header: &ShardChunkHeader, + parts: Vec, + ) -> Self { + Self { + chunk_hash: header.chunk_hash(), + inner_header_hash: header.inner_header_hash(), + merkle_root: header.encoded_merkle_root(), + signature: header.signature().clone(), + prev_block_hash: header.prev_block_hash(), + height_created: header.height_created(), + shard_id: header.shard_id(), + parts, + } + } + + pub fn is_valid_hash(&self) -> bool { + let correct_hash = combine_hash(self.inner_header_hash, self.merkle_root); + ChunkHash(correct_hash) == self.chunk_hash + } +} + +#[cfg(test)] +mod tests { + use super::*; + + // NOTE: this has it's counterpart in `near_network::types::tests` + const ALLOWED_SIZE: usize = 1 << 20; + const NOTIFY_SIZE: usize = 1024; + + macro_rules! assert_size { + ($type:ident) => { + let struct_size = std::mem::size_of::<$type>(); + if struct_size >= NOTIFY_SIZE { + println!("The size of {} is {}", stringify!($type), struct_size); + } + assert!(struct_size <= ALLOWED_SIZE); + }; + } + + #[test] + fn test_enum_size() { + assert_size!(PeerType); + assert_size!(PeerStatus); + assert_size!(RoutedMessageBody); + assert_size!(PeerIdOrHash); + assert_size!(KnownPeerStatus); + assert_size!(ReasonForBan); + assert_size!(PeerManagerRequest); + } + + #[test] + fn test_struct_size() { + assert_size!(PeerInfo); + assert_size!(PeerChainInfoV2); + assert_size!(AnnounceAccountRoute); + assert_size!(AnnounceAccount); + assert_size!(Ping); + assert_size!(Pong); + assert_size!(RawRoutedMessage); + assert_size!(RoutedMessageNoSignature); + assert_size!(RoutedMessage); + assert_size!(RoutedMessageFrom); + assert_size!(NetworkConfig); + assert_size!(KnownPeerState); + assert_size!(InboundTcpConnect); + assert_size!(OutboundTcpConnect); + assert_size!(Unregister); + assert_size!(PeerList); + assert_size!(PeersRequest); + assert_size!(PeersResponse); + assert_size!(Ban); + assert_size!(StateResponseInfoV1); + assert_size!(QueryPeerStats); + assert_size!(PartialEncodedChunkRequestMsg); + } + + #[test] + fn routed_message_body_compatibility_smoke_test() { + #[track_caller] + fn check(msg: RoutedMessageBody, expected: &[u8]) { + let actual = msg.try_to_vec().unwrap(); + assert_eq!(actual.as_slice(), expected); + } + + check( + RoutedMessageBody::TxStatusRequest("test_x".parse().unwrap(), CryptoHash([42; 32])), + &[ + 2, 6, 0, 0, 0, 116, 101, 115, 116, 95, 120, 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, + 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, + 42, + ], + ); + + check( + RoutedMessageBody::VersionedStateResponse(StateResponseInfo::V1(StateResponseInfoV1 { + shard_id: 62, + sync_hash: CryptoHash([92; 32]), + state_response: ShardStateSyncResponseV1 { header: None, part: None }, + })), + &[ + 17, 0, 62, 0, 0, 0, 0, 0, 0, 0, 92, 92, 92, 92, 92, 92, 92, 92, 92, 92, 92, 92, 92, + 92, 92, 92, 92, 92, 92, 92, 92, 92, 92, 92, 92, 92, 92, 92, 92, 92, 92, 92, 0, 0, + ], + ); + } +} diff --git a/chain/network/Cargo.toml b/chain/network/Cargo.toml index 527f592d307..12047365376 100644 --- a/chain/network/Cargo.toml +++ b/chain/network/Cargo.toml @@ -12,8 +12,6 @@ 
tokio-util = { version = "0.6", features = ["codec"] } tokio-stream = { version = "0.1.2", features = ["net"] } futures = "0.3" chrono = { version = "0.4.4", features = ["serde"] } -serde = { version = "1", features = [ "derive" ] } -serde_json = "1" rand = "0.7" byteorder = "1.2" lazy_static = "1.4" @@ -26,29 +24,29 @@ conqueue = "0.4.0" borsh = "0.8.1" cached = "0.23" +near-network-primitives = { path = "../network-primitives" } near-chain-configs = { path = "../../core/chain-configs" } near-crypto = { path = "../../core/crypto" } near-primitives = { path = "../../core/primitives" } near-store = { path = "../../core/store" } near-metrics = { path = "../../core/metrics" } -near-chain = { path = "../chain" } near-performance-metrics = { path = "../../utils/near-performance-metrics" } near-performance-metrics-macros = { path = "../../utils/near-performance-metrics-macros" } delay-detector = { path = "../../tools/delay_detector", optional = true} [dev-dependencies] +near-chain = { path = "../chain" } near-logger-utils = {path = "../../test-utils/logger"} near-actix-test-utils = { path = "../../test-utils/actix-test-utils" } tempfile = "3" bencher = "0.1.5" [features] -adversarial = [] -metric_recorder = [] +adversarial = ["near-network-primitives/adversarial"] delay_detector = ["delay-detector"] performance_stats = ["near-performance-metrics/performance_stats"] -sandbox = [] +sandbox = ["near-network-primitives/sandbox"] [[bench]] name = "graph" diff --git a/chain/network/src/lib.rs b/chain/network/src/lib.rs index a1bf076c4a2..4cb8909451d 100644 --- a/chain/network/src/lib.rs +++ b/chain/network/src/lib.rs @@ -1,6 +1,3 @@ -#[macro_use] -extern crate lazy_static; - pub use peer::{EPOCH_SYNC_PEER_TIMEOUT_MS, EPOCH_SYNC_REQUEST_TIMEOUT_MS}; pub use peer_manager::PeerManagerActor; pub use types::{ @@ -15,8 +12,6 @@ mod peer; mod peer_manager; pub mod peer_store; mod rate_counter; -#[cfg(feature = "metric_recorder")] -pub mod recorder; pub mod routing; pub mod types; pub mod utils; diff --git a/chain/network/src/metrics.rs b/chain/network/src/metrics.rs index 992c83c877e..3ccb59516b3 100644 --- a/chain/network/src/metrics.rs +++ b/chain/network/src/metrics.rs @@ -6,6 +6,8 @@ use near_metrics::{ use std::collections::HashMap; use strum::VariantNames; +use lazy_static::lazy_static; + lazy_static! 
{ pub static ref PEER_CONNECTIONS_TOTAL: near_metrics::Result = try_create_int_gauge("near_peer_connections_total", "Number of connected peers"); diff --git a/chain/network/src/peer.rs b/chain/network/src/peer.rs index f1487494a0a..a6145eed793 100644 --- a/chain/network/src/peer.rs +++ b/chain/network/src/peer.rs @@ -28,8 +28,6 @@ use near_primitives::version::{ use crate::codec::{self, bytes_to_peer_message, peer_message_to_bytes, Codec}; use crate::rate_counter::RateCounter; -#[cfg(feature = "metric_recorder")] -use crate::recorder::{PeerMessageMetadata, Status}; use crate::routing::{Edge, EdgeInfo}; use crate::types::{ Ban, Consolidate, ConsolidateResponse, Handshake, HandshakeFailureReason, HandshakeV2, @@ -41,10 +39,12 @@ use crate::types::{ UPDATE_INTERVAL_LAST_TIME_RECEIVED_MESSAGE, }; use crate::PeerManagerActor; -use crate::{metrics, NetworkResponses}; +use crate::{ + metrics::{self, NetworkMetrics}, + NetworkResponses, +}; #[cfg(feature = "delay_detector")] use delay_detector::DelayDetector; -use metrics::NetworkMetrics; use near_performance_metrics_macros::perf; use near_primitives::sharding::PartialEncodedChunk; use near_rust_allocator_proxy::allocator::get_tid; @@ -253,20 +253,9 @@ impl Peer { PeerMessage::BlockRequest(h) => self.tracker.push_request(*h), _ => (), }; - #[cfg(feature = "metric_recorder")] - let metadata = { - let mut metadata: PeerMessageMetadata = msg.into(); - metadata = metadata.set_source(self.node_id()).set_status(Status::Sent); - if let Some(target) = self.peer_id() { - metadata = metadata.set_target(target); - } - metadata - }; match peer_message_to_bytes(msg) { Ok(bytes) => { - #[cfg(feature = "metric_recorder")] - self.peer_manager_addr.do_send(metadata.set_size(bytes.len())); self.tracker.increment_sent(bytes.len() as u64); let bytes_len = bytes.len(); if !self.framed.write(bytes) { @@ -705,9 +694,6 @@ impl StreamHandler, ReasonForBan>> for Peer { near_metrics::inc_counter_by(&metrics::PEER_DATA_RECEIVED_BYTES, msg.len() as u64); near_metrics::inc_counter(&metrics::PEER_MESSAGE_RECEIVED_TOTAL); - #[cfg(feature = "metric_recorder")] - let msg_size = msg.len(); - self.tracker.increment_received(msg.len() as u64); if codec::is_forward_tx(&msg).unwrap_or(false) { let r = self.txns_since_last_block.load(Ordering::Acquire); @@ -758,19 +744,6 @@ impl StreamHandler, ReasonForBan>> for Peer { self.on_receive_message(); - #[cfg(feature = "metric_recorder")] - { - let mut metadata: PeerMessageMetadata = (&peer_msg).into(); - metadata = - metadata.set_size(msg_size).set_target(self.node_id()).set_status(Status::Received); - - if let Some(peer_id) = self.peer_id() { - metadata = metadata.set_source(peer_id); - } - - self.peer_manager_addr.do_send(metadata); - } - self.network_metrics .inc(NetworkMetrics::peer_message_total_rx(&peer_msg.msg_variant()).as_ref()); diff --git a/chain/network/src/peer_manager.rs b/chain/network/src/peer_manager.rs index 2f7721bdf6a..7d9d3696869 100644 --- a/chain/network/src/peer_manager.rs +++ b/chain/network/src/peer_manager.rs @@ -18,18 +18,16 @@ use tokio::net::{TcpListener, TcpStream}; use tokio_util::codec::FramedRead; use tracing::{debug, error, info, trace, warn}; +use crate::codec::Codec; +use crate::peer::Peer; use near_primitives::hash::CryptoHash; use near_primitives::network::{AnnounceAccount, PeerId}; use near_primitives::types::AccountId; use near_primitives::utils::from_timestamp; use near_store::Store; -use crate::codec::Codec; use crate::metrics; -use crate::peer::Peer; use crate::peer_store::{PeerStore, TrustLevel}; 
-#[cfg(feature = "metric_recorder")] -use crate::recorder::{MetricRecorder, PeerMessageMetadata}; use crate::routing::{ Edge, EdgeInfo, EdgeType, EdgeVerifierHelper, ProcessEdgeResult, RoutingTable, MAX_NUM_PEERS, }; @@ -74,9 +72,6 @@ const EXPONENTIAL_BACKOFF_RATIO: f64 = 1.1; /// /// EXPONENTIAL_BACKOFF_LIMIT = math.log(60000 / 10, EXPONENTIAL_BACKOFF_RATIO) const EXPONENTIAL_BACKOFF_LIMIT: u64 = 91; -/// Time to wait before sending ping to all reachable peers. -#[cfg(feature = "metric_recorder")] -const WAIT_BEFORE_PING: u64 = 20_000; /// Limit number of pending Peer actors to avoid OOM. const LIMIT_PENDING_PEERS: usize = 60; /// How ofter should we broadcast edges. @@ -172,9 +167,6 @@ pub struct PeerManagerActor { pending_update_nonce_request: HashMap, /// Dynamic Prometheus metrics network_metrics: NetworkMetrics, - /// Store all collected metrics from a node. - #[cfg(feature = "metric_recorder")] - metric_recorder: MetricRecorder, edge_verifier_pool: Addr, txns_since_last_block: Arc, pending_incoming_connections_counter: Arc, @@ -203,9 +195,6 @@ impl PeerManagerActor { let me: PeerId = config.public_key.clone().into(); let routing_table = RoutingTable::new(me.clone(), store); - #[cfg(feature = "metric_recorder")] - let metric_recorder = MetricRecorder::default().set_me(me.clone()); - let txns_since_last_block = Arc::new(AtomicUsize::new(0)); Ok(PeerManagerActor { @@ -223,8 +212,6 @@ impl PeerManagerActor { pending_update_nonce_request: HashMap::new(), network_metrics: NetworkMetrics::new(), edge_verifier_pool, - #[cfg(feature = "metric_recorder")] - metric_recorder, txns_since_last_block, pending_incoming_connections_counter: Arc::new(AtomicUsize::new(0)), peer_counter: Arc::new(AtomicUsize::new(0)), @@ -700,8 +687,6 @@ impl PeerManagerActor { act.scheduled_routing_table_update = false; // We only want to save prune edges if there are no pending requests to EdgeVerifier act.routing_table.update(act.edge_verifier_requests_in_progress == 0); - #[cfg(feature = "metric_recorder")] - act.metric_recorder.set_graph(act.routing_table.get_raw_graph()) }, ); } @@ -776,24 +761,6 @@ impl PeerManagerActor { ); } - #[cfg(feature = "metric_recorder")] - fn ping_all_peers(&mut self, ctx: &mut Context) { - for peer_id in self.routing_table.reachable_peers().cloned().collect::>() { - let nonce = self.routing_table.get_ping(peer_id.clone()); - self.send_ping(ctx, nonce, peer_id); - } - - near_performance_metrics::actix::run_later( - ctx, - file!(), - line!(), - Duration::from_millis(WAIT_BEFORE_PING), - move |act, ctx| { - act.ping_all_peers(ctx); - }, - ); - } - /// Periodically query peer actors for latest weight and traffic info. fn monitor_peer_stats(&mut self, ctx: &mut Context) { for (peer_id, active_peer) in self.active_peers.iter() { @@ -1209,17 +1176,9 @@ impl PeerManagerActor { } /// Handle pong messages. Add pong temporary to the routing table, mostly used for testing. - /// If `metric_recorder` feature flag is enabled, save how much time passed since we sent ping. 
fn handle_pong(&mut self, _ctx: &mut Context, pong: Pong) { - #[cfg(feature = "metric_recorder")] - let source = pong.source.clone(); #[allow(unused_variables)] let latency = self.routing_table.add_pong(pong); - #[cfg(feature = "metric_recorder")] - latency.and_then::<(), _>(|latency| { - self.metric_recorder.add_latency(source, latency); - None - }); } pub(crate) fn get_network_info(&mut self) -> NetworkInfo { @@ -1246,8 +1205,6 @@ impl PeerManagerActor { addr: None, }) .collect(), - #[cfg(feature = "metric_recorder")] - metric_recorder: self.metric_recorder.clone(), peer_counter: self.peer_counter.load(Ordering::SeqCst), } } @@ -1333,10 +1290,6 @@ impl Actor for PeerManagerActor { // Start active peer stats querying. self.monitor_peer_stats(ctx); - // Periodically ping all peers to determine latencies between pair of peers. - #[cfg(feature = "metric_recorder")] - self.ping_all_peers(ctx); - self.broadcast_edges(ctx); } @@ -2003,14 +1956,3 @@ impl Handler for PeerManagerActor { } } } - -#[cfg(feature = "metric_recorder")] -impl Handler for PeerManagerActor { - type Result = (); - #[perf] - fn handle(&mut self, msg: PeerMessageMetadata, _ctx: &mut Self::Context) -> Self::Result { - #[cfg(feature = "delay_detector")] - let _d = DelayDetector::new("peer message metadata".into()); - self.metric_recorder.handle_peer_message(msg); - } -} diff --git a/chain/network/src/recorder.rs b/chain/network/src/recorder.rs deleted file mode 100644 index ca19976d039..00000000000 --- a/chain/network/src/recorder.rs +++ /dev/null @@ -1,221 +0,0 @@ -use crate::types::PeerMessage; -use actix::Message; -use near_primitives::{hash::CryptoHash, network::PeerId}; -use serde::ser::SerializeMap; -use serde::{Deserialize, Serialize}; -use std::collections::{HashMap, HashSet}; -use tracing::info; - -const WEIGHTED_LATENCY_DECAY: f64 = 0.8; - -#[derive(Clone, Copy)] -pub enum Status { - Sent, - Received, -} -#[derive(Default, Serialize, Deserialize, Debug, Clone)] -struct CountSize { - count: usize, - bytes: usize, -} - -impl CountSize { - fn update(&mut self, bytes: usize) { - self.count += 1; - self.bytes += bytes; - } -} - -#[derive(Default, Serialize, Deserialize, Debug, Clone)] -struct SentReceived { - sent: CountSize, - received: CountSize, -} - -impl SentReceived { - fn get(&mut self, status: Status) -> &mut CountSize { - match status { - Status::Sent => &mut self.sent, - Status::Received => &mut self.received, - } - } -} - -#[derive(Default, Debug, Deserialize, Clone)] -struct HashAggregator { - total: usize, - all: HashSet, -} - -impl HashAggregator { - fn add(&mut self, hash: CryptoHash) { - self.total += 1; - self.all.insert(hash); - } - - /// Number of different hashes added to the aggregator so far. 
- fn num_different_hashes(&self) -> usize { - self.all.len() - } -} - -impl Serialize for HashAggregator { - fn serialize(&self, serializer: S) -> Result - where - S: serde::Serializer, - { - let mut dic = serializer.serialize_map(Some(2))?; - dic.serialize_entry("total", &self.total)?; - dic.serialize_entry("different", &self.num_different_hashes())?; - dic.end() - } -} -#[derive(Default, Debug, Clone, Serialize, Deserialize)] -struct Latency { - received: usize, - mean_latency: f64, - weighted_latency: f64, -} - -impl Latency { - fn add(&mut self, latency: f64) { - if self.received == 0 { - self.weighted_latency = latency; - } else { - self.weighted_latency = WEIGHTED_LATENCY_DECAY * self.weighted_latency - + (1f64 - WEIGHTED_LATENCY_DECAY) * latency; - } - self.mean_latency = - (self.mean_latency * (self.received as f64) + latency) / ((self.received + 1) as f64); - self.received += 1; - } -} - -#[derive(Default, Serialize, Deserialize, Debug, Clone)] -pub struct MetricRecorder { - me: Option, - overall: SentReceived, - per_type: HashMap, - per_peer: HashMap, - graph: Vec<(PeerId, PeerId)>, - challenge_hashes: HashAggregator, - block_hashes: HashAggregator, - latencies: HashMap, -} - -impl MetricRecorder { - pub fn set_me(mut self, me: PeerId) -> Self { - self.me = Some(me); - self - } - - pub fn set_graph(&mut self, graph: HashMap>) { - self.graph.clear(); - for (u, u_adj) in graph { - for v in u_adj { - if u < v { - self.graph.push((u.clone(), v)); - } - } - } - } - - pub fn add_latency(&mut self, peer_id: PeerId, latency: f64) { - self.latencies.entry(peer_id).or_default().add(latency); - } - - pub fn handle_peer_message(&mut self, peer_message_metadata: PeerMessageMetadata) { - self.overall - .get(peer_message_metadata.status.unwrap()) - .update(peer_message_metadata.size.unwrap()); - - self.per_type - .entry(peer_message_metadata.message_type.clone()) - .or_insert(SentReceived::default()) - .get(peer_message_metadata.status.unwrap()) - .update(peer_message_metadata.size.unwrap()); - - if let Some(peer) = peer_message_metadata.other_peer() { - self.per_peer - .entry(peer) - .or_insert(SentReceived::default()) - .get(peer_message_metadata.status.unwrap()) - .update(peer_message_metadata.size.unwrap()); - } - - match peer_message_metadata.message_type.as_str() { - "Challenge" => self.challenge_hashes.add(peer_message_metadata.hash.unwrap()), - "Block" => self.block_hashes.add(peer_message_metadata.hash.unwrap()), - _ => {} - } - } - - #[allow(dead_code)] - pub fn report(&self) { - info!(target: "stats", "{:?}", serde_json::to_string(&self)); - } - - pub fn get_report(&self) -> Result { - serde_json::to_value(&self) - } -} - -#[derive(Message)] -#[rtype(result = "()")] -pub struct PeerMessageMetadata { - source: Option, - target: Option, - status: Option, - message_type: String, - size: Option, - hash: Option, -} - -impl PeerMessageMetadata { - pub fn set_source(mut self, peer_id: PeerId) -> Self { - self.source = Some(peer_id); - self - } - - pub fn set_target(mut self, peer_id: PeerId) -> Self { - self.target = Some(peer_id); - self - } - - pub fn set_status(mut self, status: Status) -> Self { - self.status = Some(status); - self - } - - pub fn set_size(mut self, size: usize) -> Self { - self.size = Some(size); - self - } - - fn other_peer(&self) -> Option { - match self.status { - Some(Status::Received) => self.source.clone(), - Some(Status::Sent) => self.target.clone(), - _ => None, - } - } -} - -impl From<&PeerMessage> for PeerMessageMetadata { - fn from(msg: &PeerMessage) -> Self 
{ - let hash = match msg { - PeerMessage::Challenge(challenge) => Some(challenge.hash), - PeerMessage::Block(block) => Some(*block.hash()), - _ => None, - }; - - Self { - source: None, - target: None, - status: None, - message_type: msg.to_string(), - size: None, - hash, - } - } -} diff --git a/chain/network/src/routing.rs b/chain/network/src/routing.rs index 0285823b375..169b311f2cd 100644 --- a/chain/network/src/routing.rs +++ b/chain/network/src/routing.rs @@ -734,22 +734,6 @@ impl RoutingTable { }) } } - - #[cfg(feature = "metric_recorder")] - pub fn get_raw_graph(&self) -> HashMap> { - let mut res = HashMap::with_capacity(self.raw_graph.adjacency.len()); - for (key, neighbors) in self.raw_graph.adjacency.iter().enumerate() { - if self.raw_graph.used[key] { - let key = self.raw_graph.id2p[key].clone(); - let neighbors = neighbors - .iter() - .map(|&node| self.raw_graph.id2p[node as usize].clone()) - .collect::>(); - res.insert(key, neighbors); - } - } - res - } } pub struct ProcessEdgeResult { diff --git a/chain/network/src/test_utils.rs b/chain/network/src/test_utils.rs index 7836a34e9d3..fc778f091fe 100644 --- a/chain/network/src/test_utils.rs +++ b/chain/network/src/test_utils.rs @@ -13,11 +13,13 @@ use near_primitives::network::PeerId; use near_primitives::types::EpochId; use near_primitives::utils::index_to_bytes; -use crate::types::{NetworkConfig, NetworkInfo, PeerInfo, ReasonForBan, ROUTED_MESSAGE_TTL}; +use crate::types::{NetworkInfo, PeerInfo, ReasonForBan}; use crate::{NetworkAdapter, NetworkRequests, NetworkResponses, PeerManagerActor}; use futures::future::BoxFuture; use std::sync::{Arc, Mutex, RwLock}; +use lazy_static::lazy_static; + lazy_static! { static ref OPENED_PORTS: Mutex> = Mutex::new(HashSet::new()); } @@ -46,43 +48,6 @@ pub fn open_port() -> u16 { panic!("Failed to find an open port after {} attempts.", max_attempts); } -impl NetworkConfig { - /// Returns network config with given seed used for peer id. - pub fn from_seed(seed: &str, port: u16) -> Self { - let secret_key = SecretKey::from_seed(KeyType::ED25519, seed); - let public_key = secret_key.public_key(); - NetworkConfig { - public_key, - secret_key, - account_id: Some(seed.parse().unwrap()), - addr: Some(format!("0.0.0.0:{}", port).parse().unwrap()), - boot_nodes: vec![], - handshake_timeout: Duration::from_secs(60), - reconnect_delay: Duration::from_secs(60), - bootstrap_peers_period: Duration::from_millis(100), - max_num_peers: 10, - minimum_outbound_peers: 5, - ideal_connections_lo: 30, - ideal_connections_hi: 35, - peer_recent_time_window: Duration::from_secs(600), - safe_set_size: 20, - archival_peer_connections_lower_bound: 10, - ban_window: Duration::from_secs(1), - peer_expiration_duration: Duration::from_secs(60 * 60), - max_send_peers: 512, - peer_stats_period: Duration::from_secs(5), - ttl_account_id_router: Duration::from_secs(60 * 60), - routed_message_ttl: ROUTED_MESSAGE_TTL, - max_routes_to_store: 1, - highest_peer_horizon: 5, - push_info_period: Duration::from_millis(100), - blacklist: HashMap::new(), - outbound_disabled: false, - archive: false, - } - } -} - pub fn peer_id_from_seed(seed: &str) -> PeerId { SecretKey::from_seed(KeyType::ED25519, seed).public_key().into() } @@ -96,13 +61,6 @@ pub fn convert_boot_nodes(boot_nodes: Vec<(&str, u16)>) -> Vec { result } -impl PeerInfo { - /// Creates random peer info. - pub fn random() -> Self { - PeerInfo { id: PeerId::random(), addr: None, account_id: None } - } -} - /// Timeouts by stopping system without any condition and raises panic. 
/// Useful in tests to prevent them from running forever. #[allow(unreachable_code)] diff --git a/chain/network/src/types.rs b/chain/network/src/types.rs index 7d6c7dad12f..57a990dedb1 100644 --- a/chain/network/src/types.rs +++ b/chain/network/src/types.rs @@ -1,194 +1,39 @@ -use std::collections::{HashMap, HashSet}; -use std::convert::{Into, TryFrom, TryInto}; +use std::collections::HashMap; +use std::convert::{Into, TryInto}; use std::fmt; -use std::net::{AddrParseError, IpAddr, SocketAddr}; -use std::str::FromStr; use std::sync::{Arc, Mutex, RwLock}; -use std::time::{Duration, Instant}; +use std::time::Instant; use actix::dev::{MessageResponse, ResponseChannel}; use actix::{Actor, Addr, MailboxError, Message, Recipient}; use borsh::{BorshDeserialize, BorshSerialize}; -use chrono::{DateTime, Utc}; use futures::{future::BoxFuture, FutureExt}; -use serde::{Deserialize, Serialize}; use strum::AsStaticStr; -use tokio::net::TcpStream; -use tracing::{error, warn}; -use near_chain::{Block, BlockHeader}; -use near_crypto::{PublicKey, SecretKey, Signature}; -use near_primitives::block::{Approval, ApprovalMessage, GenesisId}; +pub use near_network_primitives::types::*; + +use near_primitives::block::{Approval, ApprovalMessage, Block, BlockHeader, GenesisId}; use near_primitives::challenge::Challenge; use near_primitives::errors::InvalidTxError; -use near_primitives::hash::{hash, CryptoHash}; +use near_primitives::hash::CryptoHash; use near_primitives::network::{AnnounceAccount, PeerId}; -use near_primitives::sharding::{ - ChunkHash, PartialEncodedChunk, PartialEncodedChunkPart, PartialEncodedChunkV1, - PartialEncodedChunkWithArcReceipts, ReceiptProof, ShardChunkHeader, -}; -#[cfg(feature = "sandbox")] -use near_primitives::state_record::StateRecord; -use near_primitives::syncing::{ - EpochSyncFinalizationResponse, EpochSyncResponse, ShardStateSyncResponse, - ShardStateSyncResponseV1, -}; -use near_primitives::transaction::{ExecutionOutcomeWithIdAndProof, SignedTransaction}; -use near_primitives::types::{AccountId, BlockHeight, BlockReference, EpochId, ShardId}; -use near_primitives::utils::{from_timestamp, to_timestamp}; +use near_primitives::sharding::{PartialEncodedChunk, PartialEncodedChunkWithArcReceipts}; +use near_primitives::syncing::{EpochSyncFinalizationResponse, EpochSyncResponse}; +use near_primitives::transaction::SignedTransaction; +use near_primitives::types::{AccountId, BlockReference, EpochId, ShardId}; use near_primitives::version::{ ProtocolVersion, OLDEST_BACKWARD_COMPATIBLE_PROTOCOL_VERSION, PROTOCOL_VERSION, }; -use near_primitives::views::{FinalExecutionOutcomeView, QueryRequest, QueryResponse}; +use near_primitives::views::QueryRequest; use crate::peer::Peer; -#[cfg(feature = "metric_recorder")] -use crate::recorder::MetricRecorder; use crate::routing::{Edge, EdgeInfo, RoutingTableInfo}; -use std::fmt::{Debug, Error, Formatter}; +use std::fmt::{Debug, Formatter}; use std::io; use conqueue::QueueSender; -use near_primitives::merkle::combine_hash; const ERROR_UNEXPECTED_LENGTH_OF_INPUT: &str = "Unexpected length of input"; -/// Number of hops a message is allowed to travel before being dropped. -/// This is used to avoid infinite loop because of inconsistent view of the network -/// by different nodes. -pub const ROUTED_MESSAGE_TTL: u8 = 100; -/// On every message from peer don't update `last_time_received_message` -/// but wait some "small" timeout between updates to avoid a lot of messages between -/// Peer and PeerManager. 
-pub const UPDATE_INTERVAL_LAST_TIME_RECEIVED_MESSAGE: Duration = Duration::from_secs(60); - -/// Peer information. -#[derive(BorshSerialize, BorshDeserialize, Clone, Debug, Eq, PartialEq, Serialize, Deserialize)] -pub struct PeerInfo { - pub id: PeerId, - pub addr: Option, - pub account_id: Option, -} - -impl PeerInfo { - pub fn addr_port(&self) -> Option { - self.addr.map(|addr| addr.port()) - } -} - -impl PeerInfo { - pub fn new(id: PeerId, addr: SocketAddr) -> Self { - PeerInfo { id, addr: Some(addr), account_id: None } - } -} - -// Note, `Display` automatically implements `ToString` which must be reciprocal to `FromStr`. -impl fmt::Display for PeerInfo { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "{}", self.id)?; - if let Some(addr) = &self.addr { - write!(f, "@{}", addr)?; - } - if let Some(account_id) = &self.account_id { - write!(f, "@{}", account_id)?; - } - Ok(()) - } -} - -impl FromStr for PeerInfo { - type Err = Box; - - fn from_str(s: &str) -> Result { - let chunks: Vec<&str> = s.split('@').collect(); - let addr; - let account_id; - if chunks.len() == 1 { - addr = None; - account_id = None; - } else if chunks.len() == 2 { - if let Ok(x) = chunks[1].parse::() { - addr = Some(x); - account_id = None; - } else { - addr = None; - account_id = Some(chunks[1].parse().unwrap()); - } - } else if chunks.len() == 3 { - addr = Some(chunks[1].parse::()?); - account_id = Some(chunks[2].parse().unwrap()); - } else { - return Err(Box::new(std::io::Error::new( - std::io::ErrorKind::InvalidInput, - format!("Invalid PeerInfo format: {:?}", chunks), - ))); - } - Ok(PeerInfo { id: PeerId(chunks[0].parse()?), addr, account_id }) - } -} - -impl TryFrom<&str> for PeerInfo { - type Error = Box; - - fn try_from(s: &str) -> Result { - Self::from_str(s) - } -} - -/// Peer chain information. -/// TODO: Remove in next version -#[derive(BorshSerialize, BorshDeserialize, Clone, Debug, Eq, PartialEq, Default)] -pub struct PeerChainInfo { - /// Chain Id and hash of genesis block. - pub genesis_id: GenesisId, - /// Last known chain height of the peer. - pub height: BlockHeight, - /// Shards that the peer is tracking. - pub tracked_shards: Vec, -} - -/// Peer chain information. -#[derive(BorshSerialize, BorshDeserialize, Clone, Debug, Eq, PartialEq, Default)] -pub struct PeerChainInfoV2 { - /// Chain Id and hash of genesis block. - pub genesis_id: GenesisId, - /// Last known chain height of the peer. - pub height: BlockHeight, - /// Shards that the peer is tracking. - pub tracked_shards: Vec, - /// Denote if a node is running in archival mode or not. - pub archival: bool, -} - -impl From for PeerChainInfoV2 { - fn from(peer_chain_info: PeerChainInfo) -> Self { - Self { - genesis_id: peer_chain_info.genesis_id, - height: peer_chain_info.height, - tracked_shards: peer_chain_info.tracked_shards, - archival: false, - } - } -} - -/// Peer type. -#[derive(Copy, Clone, Debug, Eq, PartialEq)] -pub enum PeerType { - /// Inbound session - Inbound, - /// Outbound session - Outbound, -} - -/// Peer status. -#[derive(Copy, Clone, Debug, Eq, PartialEq)] -pub enum PeerStatus { - /// Waiting for handshake. - Connecting, - /// Ready to go. - Ready, - /// Banned, should shutdown this peer. 
- Banned(ReasonForBan), -} #[derive(BorshSerialize, BorshDeserialize, PartialEq, Eq, Clone, Debug)] pub enum HandshakeFailureReason { @@ -414,294 +259,6 @@ impl From for Handshake { } } -/// Account route description -#[derive(BorshSerialize, BorshDeserialize, PartialEq, Eq, Clone, Debug)] -pub struct AnnounceAccountRoute { - pub peer_id: PeerId, - pub hash: CryptoHash, - pub signature: Signature, -} - -#[derive(BorshSerialize, BorshDeserialize, PartialEq, Eq, Clone, Debug)] -pub struct Ping { - pub nonce: u64, - pub source: PeerId, -} - -#[derive(BorshSerialize, BorshDeserialize, PartialEq, Eq, Clone, Debug)] -pub struct Pong { - pub nonce: u64, - pub source: PeerId, -} - -// TODO(#1313): Use Box -#[derive( - BorshSerialize, - BorshDeserialize, - PartialEq, - Eq, - Clone, - strum::AsStaticStr, - strum::EnumVariantNames, -)] -#[allow(clippy::large_enum_variant)] -pub enum RoutedMessageBody { - BlockApproval(Approval), - ForwardTx(SignedTransaction), - - TxStatusRequest(AccountId, CryptoHash), - TxStatusResponse(FinalExecutionOutcomeView), - QueryRequest { - query_id: String, - block_reference: BlockReference, - request: QueryRequest, - }, - QueryResponse { - query_id: String, - response: Result, - }, - ReceiptOutcomeRequest(CryptoHash), - /// Not used, but needed to preserve backward compatibility. - Unused, - StateRequestHeader(ShardId, CryptoHash), - StateRequestPart(ShardId, CryptoHash, u64), - StateResponse(StateResponseInfoV1), - PartialEncodedChunkRequest(PartialEncodedChunkRequestMsg), - PartialEncodedChunkResponse(PartialEncodedChunkResponseMsg), - PartialEncodedChunk(PartialEncodedChunkV1), - /// Ping/Pong used for testing networking and routing. - Ping(Ping), - Pong(Pong), - VersionedPartialEncodedChunk(PartialEncodedChunk), - VersionedStateResponse(StateResponseInfo), - PartialEncodedChunkForward(PartialEncodedChunkForwardMsg), -} - -impl From for RoutedMessageBody { - fn from(pec: PartialEncodedChunkWithArcReceipts) -> Self { - if let ShardChunkHeader::V1(legacy_header) = pec.header { - Self::PartialEncodedChunk(PartialEncodedChunkV1 { - header: legacy_header, - parts: pec.parts, - receipts: pec.receipts.into_iter().map(|r| ReceiptProof::clone(&r)).collect(), - }) - } else { - Self::VersionedPartialEncodedChunk(pec.into()) - } - } -} - -impl Debug for RoutedMessageBody { - fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), Error> { - match self { - RoutedMessageBody::BlockApproval(approval) => write!( - f, - "Approval({}, {}, {:?})", - approval.target_height, approval.account_id, approval.inner - ), - RoutedMessageBody::ForwardTx(tx) => write!(f, "tx {}", tx.get_hash()), - RoutedMessageBody::TxStatusRequest(account_id, hash) => { - write!(f, "TxStatusRequest({}, {})", account_id, hash) - } - RoutedMessageBody::TxStatusResponse(response) => { - write!(f, "TxStatusResponse({})", response.transaction.hash) - } - RoutedMessageBody::QueryRequest { .. } => write!(f, "QueryRequest"), - RoutedMessageBody::QueryResponse { .. 
} => write!(f, "QueryResponse"), - RoutedMessageBody::ReceiptOutcomeRequest(hash) => write!(f, "ReceiptRequest({})", hash), - RoutedMessageBody::StateRequestHeader(shard_id, sync_hash) => { - write!(f, "StateRequestHeader({}, {})", shard_id, sync_hash) - } - RoutedMessageBody::StateRequestPart(shard_id, sync_hash, part_id) => { - write!(f, "StateRequestPart({}, {}, {})", shard_id, sync_hash, part_id) - } - RoutedMessageBody::StateResponse(response) => { - write!(f, "StateResponse({}, {})", response.shard_id, response.sync_hash) - } - RoutedMessageBody::PartialEncodedChunkRequest(request) => { - write!(f, "PartialChunkRequest({:?}, {:?})", request.chunk_hash, request.part_ords) - } - RoutedMessageBody::PartialEncodedChunkResponse(response) => write!( - f, - "PartialChunkResponse({:?}, {:?})", - response.chunk_hash, - response.parts.iter().map(|p| p.part_ord).collect::>() - ), - RoutedMessageBody::PartialEncodedChunk(chunk) => { - write!(f, "PartialChunk({:?})", chunk.header.hash) - } - RoutedMessageBody::VersionedPartialEncodedChunk(_) => { - write!(f, "VersionedPartialChunk(?)") - } - RoutedMessageBody::VersionedStateResponse(response) => write!( - f, - "VersionedStateResponse({}, {})", - response.shard_id(), - response.sync_hash() - ), - RoutedMessageBody::PartialEncodedChunkForward(forward) => write!( - f, - "PartialChunkForward({:?}, {:?})", - forward.chunk_hash, - forward.parts.iter().map(|p| p.part_ord).collect::>(), - ), - RoutedMessageBody::Ping(_) => write!(f, "Ping"), - RoutedMessageBody::Pong(_) => write!(f, "Pong"), - RoutedMessageBody::Unused => write!(f, "Unused"), - } - } -} - -#[derive(BorshSerialize, BorshDeserialize, PartialEq, Eq, Clone, Debug)] -pub enum PeerIdOrHash { - PeerId(PeerId), - Hash(CryptoHash), -} - -#[derive(Debug, Clone, PartialEq, Eq, BorshSerialize, BorshDeserialize, Hash)] -// Defines the destination for a network request. -// The request should be sent either to the `account_id` as a routed message, or directly to -// any peer that tracks the shard. -// If `prefer_peer` is `true`, should be sent to the peer, unless no peer tracks the shard, in which -// case fall back to sending to the account. -// Otherwise, send to the account, unless we do not know the route, in which case send to the peer. -pub struct AccountIdOrPeerTrackingShard { - pub shard_id: ShardId, - pub only_archival: bool, - pub account_id: Option, - pub prefer_peer: bool, -} - -impl AccountIdOrPeerTrackingShard { - pub fn from_account(shard_id: ShardId, account_id: AccountId) -> Self { - Self { shard_id, only_archival: false, account_id: Some(account_id), prefer_peer: false } - } -} - -#[derive(Debug, Clone, PartialEq, Eq, BorshSerialize, BorshDeserialize, Hash)] -pub enum AccountOrPeerIdOrHash { - AccountId(AccountId), - PeerId(PeerId), - Hash(CryptoHash), -} - -impl AccountOrPeerIdOrHash { - fn peer_id_or_hash(&self) -> Option { - match self { - AccountOrPeerIdOrHash::AccountId(_) => None, - AccountOrPeerIdOrHash::PeerId(peer_id) => Some(PeerIdOrHash::PeerId(peer_id.clone())), - AccountOrPeerIdOrHash::Hash(hash) => Some(PeerIdOrHash::Hash(hash.clone())), - } - } -} - -#[derive(Message)] -#[rtype(result = "()")] -pub struct RawRoutedMessage { - pub target: AccountOrPeerIdOrHash, - pub body: RoutedMessageBody, -} - -impl RawRoutedMessage { - /// Add signature to the message. - /// Panics if the target is an AccountId instead of a PeerId. 
- pub fn sign( - self, - author: PeerId, - secret_key: &SecretKey, - routed_message_ttl: u8, - ) -> RoutedMessage { - let target = self.target.peer_id_or_hash().unwrap(); - let hash = RoutedMessage::build_hash(&target, &author, &self.body); - let signature = secret_key.sign(hash.as_ref()); - RoutedMessage { target, author, signature, ttl: routed_message_ttl, body: self.body } - } -} - -#[derive(BorshSerialize, PartialEq, Eq, Clone, Debug)] -pub struct RoutedMessageNoSignature<'a> { - target: &'a PeerIdOrHash, - author: &'a PeerId, - body: &'a RoutedMessageBody, -} - -/// RoutedMessage represent a package that will travel the network towards a specific peer id. -/// It contains the peer_id and signature from the original sender. Every intermediate peer in the -/// route must verify that this signature is valid otherwise previous sender of this package should -/// be banned. If the final receiver of this package finds that the body is invalid the original -/// sender of the package should be banned instead. -/// If target is hash, it is a message that should be routed back using the same path used to route -/// the request in first place. It is the hash of the request message. -#[derive(BorshSerialize, BorshDeserialize, PartialEq, Eq, Clone, Debug)] -pub struct RoutedMessage { - /// Peer id which is directed this message. - /// If `target` is hash, this a message should be routed back. - pub target: PeerIdOrHash, - /// Original sender of this message - pub author: PeerId, - /// Signature from the author of the message. If this signature is invalid we should ban - /// last sender of this message. If the message is invalid we should ben author of the message. - pub signature: Signature, - /// Time to live for this message. After passing through some hop this number should be - /// decreased by 1. If this number is 0, drop this message. - pub ttl: u8, - /// Message - pub body: RoutedMessageBody, -} - -impl RoutedMessage { - pub fn build_hash( - target: &PeerIdOrHash, - source: &PeerId, - body: &RoutedMessageBody, - ) -> CryptoHash { - hash( - &RoutedMessageNoSignature { target, author: source, body } - .try_to_vec() - .expect("Failed to serialize"), - ) - } - - pub fn hash(&self) -> CryptoHash { - RoutedMessage::build_hash(&self.target, &self.author, &self.body) - } - - pub fn verify(&self) -> bool { - self.signature.verify(self.hash().as_ref(), &self.author.public_key()) - } - - pub fn expect_response(&self) -> bool { - match self.body { - RoutedMessageBody::Ping(_) - | RoutedMessageBody::TxStatusRequest(_, _) - | RoutedMessageBody::StateRequestHeader(_, _) - | RoutedMessageBody::StateRequestPart(_, _, _) - | RoutedMessageBody::PartialEncodedChunkRequest(_) - | RoutedMessageBody::QueryRequest { .. } - | RoutedMessageBody::ReceiptOutcomeRequest(_) => true, - _ => false, - } - } - - /// Return true if ttl is positive after decreasing ttl by one, false otherwise. - pub fn decrease_ttl(&mut self) -> bool { - self.ttl = self.ttl.saturating_sub(1); - self.ttl > 0 - } -} - -/// Routed Message wrapped with previous sender of the message. -pub struct RoutedMessageFrom { - /// Routed messages. - pub msg: RoutedMessage, - /// Previous hop in the route. Used for messages that needs routing back. 
- pub from: PeerId, -} - -impl Message for RoutedMessageFrom { - type Result = bool; -} - #[derive(BorshSerialize, BorshDeserialize, PartialEq, Eq, Clone, Debug)] pub struct SyncData { pub edges: Vec, @@ -831,215 +388,6 @@ impl PeerMessage { } } -#[derive(Debug, Clone)] -pub enum BlockedPorts { - All, - Some(HashSet), -} - -/// Configuration for the peer-to-peer manager. -#[derive(Clone)] -pub struct NetworkConfig { - pub public_key: PublicKey, - pub secret_key: SecretKey, - pub account_id: Option, - pub addr: Option, - pub boot_nodes: Vec, - pub handshake_timeout: Duration, - pub reconnect_delay: Duration, - pub bootstrap_peers_period: Duration, - /// Maximum number of active peers. Hard limit. - pub max_num_peers: u32, - /// Minimum outbound connections a peer should have to avoid eclipse attacks. - pub minimum_outbound_peers: u32, - /// Lower bound of the ideal number of connections. - pub ideal_connections_lo: u32, - /// Upper bound of the ideal number of connections. - pub ideal_connections_hi: u32, - /// Peers which last message is was within this period of time are considered active recent peers. - pub peer_recent_time_window: Duration, - /// Number of peers to keep while removing a connection. - /// Used to avoid disconnecting from peers we have been connected since long time. - pub safe_set_size: u32, - /// Lower bound of the number of connections to archival peers to keep - /// if we are an archival node. - pub archival_peer_connections_lower_bound: u32, - /// Duration of the ban for misbehaving peers. - pub ban_window: Duration, - /// Remove expired peers. - pub peer_expiration_duration: Duration, - /// Maximum number of peer addresses we should ever send on PeersRequest. - pub max_send_peers: u32, - /// Duration for checking on stats from the peers. - pub peer_stats_period: Duration, - /// Time to persist Accounts Id in the router without removing them. - pub ttl_account_id_router: Duration, - /// Number of hops a message is allowed to travel before being dropped. - /// This is used to avoid infinite loop because of inconsistent view of the network - /// by different nodes. - pub routed_message_ttl: u8, - /// Maximum number of routes that we should keep track for each Account id in the Routing Table. - pub max_routes_to_store: usize, - /// Height horizon for highest height peers - /// For example if one peer is 1 height away from max height peer, - /// we still want to use the rest to query for state/headers/blocks. - pub highest_peer_horizon: u64, - /// Period between pushing network info to client - pub push_info_period: Duration, - /// Peers on blacklist by IP:Port. - /// Nodes will not accept or try to establish connection to such peers. - pub blacklist: HashMap, - /// Flag to disable outbound connections. When this flag is active, nodes will not try to - /// establish connection with other nodes, but will accept incoming connection if other requirements - /// are satisfied. - /// This flag should be ALWAYS FALSE. Only set to true for testing purposes. - pub outbound_disabled: bool, - /// Not clear old data, set `true` for archive nodes. - pub archive: bool, -} - -impl NetworkConfig { - pub fn verify(&self) { - if self.ideal_connections_lo + 1 >= self.ideal_connections_hi { - error!(target: "network", - "Invalid ideal_connections values. 
lo({}) > hi({}).", - self.ideal_connections_lo, self.ideal_connections_hi); - } - - if self.ideal_connections_hi >= self.max_num_peers { - error!(target: "network", - "max_num_peers({}) is below ideal_connections_hi({}) which may lead to connection saturation and declining new connections.", - self.max_num_peers, self.ideal_connections_hi - ); - } - - if self.outbound_disabled { - warn!(target: "network", "Outbound connections are disabled."); - } - - if self.safe_set_size <= self.minimum_outbound_peers { - error!(target: "network", - "safe_set_size({}) must be larger than minimum_outbound_peers({}).", - self.safe_set_size, - self.minimum_outbound_peers - ); - } - - if UPDATE_INTERVAL_LAST_TIME_RECEIVED_MESSAGE * 2 > self.peer_recent_time_window { - error!( - target: "network", - "Very short peer_recent_time_window({}). it should be at least twice update_interval_last_time_received_message({}).", - self.peer_recent_time_window.as_secs(), UPDATE_INTERVAL_LAST_TIME_RECEIVED_MESSAGE.as_secs() - ); - } - } -} - -/// Used to match a socket addr by IP:Port or only by IP -#[derive(Clone, Debug)] -pub enum PatternAddr { - Ip(IpAddr), - IpPort(SocketAddr), -} - -impl PatternAddr { - pub fn contains(&self, addr: &SocketAddr) -> bool { - match self { - PatternAddr::Ip(pattern) => &addr.ip() == pattern, - PatternAddr::IpPort(pattern) => addr == pattern, - } - } -} - -impl FromStr for PatternAddr { - type Err = AddrParseError; - - fn from_str(s: &str) -> Result { - if let Ok(pattern) = s.parse::() { - return Ok(PatternAddr::Ip(pattern)); - } - - s.parse::().map(PatternAddr::IpPort) - } -} - -/// Status of the known peers. -#[derive(BorshSerialize, BorshDeserialize, Serialize, Eq, PartialEq, Debug, Clone)] -pub enum KnownPeerStatus { - Unknown, - NotConnected, - Connected, - Banned(ReasonForBan, u64), -} - -impl KnownPeerStatus { - pub fn is_banned(&self) -> bool { - match self { - KnownPeerStatus::Banned(_, _) => true, - _ => false, - } - } -} - -/// Information node stores about known peers. -#[derive(BorshSerialize, BorshDeserialize, Debug, Clone)] -pub struct KnownPeerState { - pub peer_info: PeerInfo, - pub status: KnownPeerStatus, - pub first_seen: u64, - pub last_seen: u64, -} - -impl KnownPeerState { - pub fn new(peer_info: PeerInfo) -> Self { - KnownPeerState { - peer_info, - status: KnownPeerStatus::Unknown, - first_seen: to_timestamp(Utc::now()), - last_seen: to_timestamp(Utc::now()), - } - } - - pub fn first_seen(&self) -> DateTime { - from_timestamp(self.first_seen) - } - - pub fn last_seen(&self) -> DateTime { - from_timestamp(self.last_seen) - } -} - -impl TryFrom> for KnownPeerState { - type Error = Box; - - fn try_from(bytes: Vec) -> Result { - KnownPeerState::try_from_slice(&bytes).map_err(|err| err.into()) - } -} - -/// Actor message that holds the TCP stream from an inbound TCP connection -#[derive(Message)] -#[rtype(result = "()")] -pub struct InboundTcpConnect { - /// Tcp stream of the inbound connections - pub stream: TcpStream, -} - -impl InboundTcpConnect { - /// Method to create a new InboundTcpConnect message from a TCP stream - pub fn new(stream: TcpStream) -> InboundTcpConnect { - InboundTcpConnect { stream } - } -} - -/// Actor message to request the creation of an outbound TCP connection to a peer. 
-#[derive(Message)] -#[rtype(result = "()")] -pub struct OutboundTcpConnect { - /// Peer information of the outbound connection - pub peer_info: PeerInfo, -} - #[derive(Message, Clone, Debug)] #[rtype(result = "()")] pub struct SendMessage { @@ -1072,19 +420,6 @@ pub enum ConsolidateResponse { Reject, } -/// Unregister message from Peer to PeerManager. -#[derive(Message)] -#[rtype(result = "()")] -pub struct Unregister { - pub peer_id: PeerId, - pub peer_type: PeerType, - pub remove_from_peer_store: bool, -} - -pub struct PeerList { - pub peers: Vec, -} - /// Message from peer to peer manager #[derive(strum::AsRefStr)] pub enum PeerRequest { @@ -1104,60 +439,6 @@ pub enum PeerResponse { UpdatedEdge(EdgeInfo), } -/// Requesting peers from peer manager to communicate to a peer. -pub struct PeersRequest {} - -impl Message for PeersRequest { - type Result = PeerList; -} - -/// Received new peers from another peer. -#[derive(Message)] -#[rtype(result = "()")] -pub struct PeersResponse { - pub peers: Vec, -} - -impl MessageResponse for PeerList -where - A: Actor, - M: Message, -{ - fn handle>(self, _: &mut A::Context, tx: Option) { - if let Some(tx) = tx { - tx.send(self) - } - } -} - -/// Ban reason. -#[derive(BorshSerialize, BorshDeserialize, Serialize, Debug, Clone, PartialEq, Eq, Copy)] -pub enum ReasonForBan { - None = 0, - BadBlock = 1, - BadBlockHeader = 2, - HeightFraud = 3, - BadHandshake = 4, - BadBlockApproval = 5, - Abusive = 6, - InvalidSignature = 7, - InvalidPeerId = 8, - InvalidHash = 9, - InvalidEdge = 10, - EpochSyncNoResponse = 11, - EpochSyncInvalidResponse = 12, - EpochSyncInvalidFinalizationResponse = 13, -} - -/// Banning signal sent from Peer instance to PeerManager -/// just before Peer instance is stopped. -#[derive(Message)] -#[rtype(result = "()")] -pub struct Ban { - pub peer_id: PeerId, - pub ban_reason: ReasonForBan, -} - // TODO(#1313): Use Box #[derive(Debug, Clone, PartialEq, strum::AsRefStr)] #[allow(clippy::large_enum_variant)] @@ -1270,14 +551,6 @@ pub enum NetworkRequests { Challenge(Challenge), } -/// Messages from PeerManager to Peer -#[derive(Message, Debug)] -#[rtype(result = "()")] -pub enum PeerManagerRequest { - BanPeer(ReasonForBan), - UnregisterPeer, -} - pub struct EdgeList { pub edges: Vec, pub edges_info_shared: Arc>>, @@ -1296,13 +569,6 @@ pub struct FullPeerInfo { pub edge_info: EdgeInfo, } -#[derive(Serialize, Deserialize, Debug, Clone)] -pub struct KnownProducer { - pub account_id: AccountId, - pub addr: Option, - pub peer_id: PeerId, -} - #[derive(Debug)] pub struct NetworkInfo { pub active_peers: Vec, @@ -1313,8 +579,6 @@ pub struct NetworkInfo { pub received_bytes_per_sec: u64, /// Accounts of known block and chunk producers from routing table. 
pub known_producers: Vec, - #[cfg(feature = "metric_recorder")] - pub metric_recorder: MetricRecorder, pub peer_counter: usize, } @@ -1356,68 +620,6 @@ impl Message for NetworkRequests { type Result = NetworkResponses; } -#[derive(PartialEq, Eq, Clone, Debug, BorshSerialize, BorshDeserialize)] -pub struct StateResponseInfoV1 { - pub shard_id: ShardId, - pub sync_hash: CryptoHash, - pub state_response: ShardStateSyncResponseV1, -} - -#[derive(PartialEq, Eq, Clone, Debug, BorshSerialize, BorshDeserialize)] -pub struct StateResponseInfoV2 { - pub shard_id: ShardId, - pub sync_hash: CryptoHash, - pub state_response: ShardStateSyncResponse, -} - -#[derive(PartialEq, Eq, Clone, Debug, BorshSerialize, BorshDeserialize)] -pub enum StateResponseInfo { - V1(StateResponseInfoV1), - V2(StateResponseInfoV2), -} - -impl StateResponseInfo { - pub fn shard_id(&self) -> ShardId { - match self { - Self::V1(info) => info.shard_id, - Self::V2(info) => info.shard_id, - } - } - - pub fn sync_hash(&self) -> CryptoHash { - match self { - Self::V1(info) => info.sync_hash, - Self::V2(info) => info.sync_hash, - } - } - - pub fn take_state_response(self) -> ShardStateSyncResponse { - match self { - Self::V1(info) => ShardStateSyncResponse::V1(info.state_response), - Self::V2(info) => info.state_response, - } - } -} - -#[cfg(feature = "adversarial")] -#[derive(Debug)] -pub enum NetworkAdversarialMessage { - AdvProduceBlocks(u64, bool), - AdvSwitchToHeight(u64), - AdvDisableHeaderSync, - AdvDisableDoomslug, - AdvGetSavedBlocks, - AdvCheckStorageConsistency, - AdvSetSyncInfo(u64), -} - -#[cfg(feature = "sandbox")] -#[derive(Debug)] -pub enum NetworkSandboxMessage { - SandboxPatchState(Vec), - SandboxPatchStateStatus, -} - #[derive(Debug, strum::AsRefStr, AsStaticStr)] // TODO(#1313): Use Box #[allow(clippy::large_enum_variant)] @@ -1513,174 +715,6 @@ impl Message for NetworkClientMessages { type Result = NetworkClientResponses; } -#[derive(AsStaticStr)] -pub enum NetworkViewClientMessages { - #[cfg(feature = "adversarial")] - Adversarial(NetworkAdversarialMessage), - - /// Transaction status query - TxStatus { tx_hash: CryptoHash, signer_account_id: AccountId }, - /// Transaction status response - TxStatusResponse(Box), - /// Request for receipt outcome - ReceiptOutcomeRequest(CryptoHash), - /// Receipt outcome response - ReceiptOutcomeResponse(Box), - /// Request a block. - BlockRequest(CryptoHash), - /// Request headers. - BlockHeadersRequest(Vec), - /// State request header. - StateRequestHeader { shard_id: ShardId, sync_hash: CryptoHash }, - /// State request part. - StateRequestPart { shard_id: ShardId, sync_hash: CryptoHash, part_id: u64 }, - /// A request for a light client info during Epoch Sync - EpochSyncRequest { epoch_id: EpochId }, - /// A request for headers and proofs during Epoch Sync - EpochSyncFinalizationRequest { epoch_id: EpochId }, - /// Get Chain information from Client. - GetChainInfo, - /// Account announcements that needs to be validated before being processed. - /// They are paired with last epoch id known to this announcement, in order to accept only - /// newer announcements. - AnnounceAccount(Vec<(AnnounceAccount, Option)>), -} - -#[derive(Debug)] -pub enum NetworkViewClientResponses { - /// Transaction execution outcome - TxStatus(Box), - /// Response to general queries - QueryResponse { query_id: String, response: Result }, - /// Receipt outcome response - ReceiptOutcomeResponse(Box), - /// Block response. - Block(Box), - /// Headers response. - BlockHeaders(Vec), - /// Chain information. 
- ChainInfo { - genesis_id: GenesisId, - height: BlockHeight, - tracked_shards: Vec, - archival: bool, - }, - /// Response to state request. - StateResponse(Box), - /// Valid announce accounts. - AnnounceAccount(Vec), - /// A response to a request for a light client block during Epoch Sync - EpochSyncResponse(EpochSyncResponse), - /// A response to a request for headers and proofs during Epoch Sync - EpochSyncFinalizationResponse(EpochSyncFinalizationResponse), - /// Ban peer for malicious behavior. - Ban { ban_reason: ReasonForBan }, - /// Response not needed - NoResponse, -} - -impl MessageResponse for NetworkViewClientResponses -where - A: Actor, - M: Message, -{ - fn handle>(self, _: &mut A::Context, tx: Option) { - if let Some(tx) = tx { - tx.send(self) - } - } -} - -impl Message for NetworkViewClientMessages { - type Result = NetworkViewClientResponses; -} - -/// Peer stats query. -pub struct QueryPeerStats {} - -/// Peer stats result -#[derive(Debug)] -pub struct PeerStatsResult { - /// Chain info. - pub chain_info: PeerChainInfoV2, - /// Number of bytes we've received from the peer. - pub received_bytes_per_sec: u64, - /// Number of bytes we've sent to the peer. - pub sent_bytes_per_sec: u64, - /// Returns if this peer is abusive and should be banned. - pub is_abusive: bool, - /// Counts of incoming/outgoing messages from given peer. - pub message_counts: (u64, u64), -} - -impl MessageResponse for PeerStatsResult -where - A: Actor, - M: Message, -{ - fn handle>(self, _: &mut A::Context, tx: Option) { - if let Some(tx) = tx { - tx.send(self) - } - } -} - -impl Message for QueryPeerStats { - type Result = PeerStatsResult; -} - -#[derive(Clone, Debug, Eq, PartialEq, BorshSerialize, BorshDeserialize)] -pub struct PartialEncodedChunkRequestMsg { - pub chunk_hash: ChunkHash, - pub part_ords: Vec, - pub tracking_shards: HashSet, -} - -#[derive(Clone, Debug, Eq, PartialEq, BorshSerialize, BorshDeserialize)] -pub struct PartialEncodedChunkResponseMsg { - pub chunk_hash: ChunkHash, - pub parts: Vec, - pub receipts: Vec, -} - -/// Message for chunk part owners to forward their parts to validators tracking that shard. -/// This reduces the number of requests a node tracking a shard needs to send to obtain enough -/// parts to reconstruct the message (in the best case no such requests are needed). -#[derive(Clone, Debug, Eq, PartialEq, BorshSerialize, BorshDeserialize)] -pub struct PartialEncodedChunkForwardMsg { - pub chunk_hash: ChunkHash, - pub inner_header_hash: CryptoHash, - pub merkle_root: CryptoHash, - pub signature: Signature, - pub prev_block_hash: CryptoHash, - pub height_created: BlockHeight, - pub shard_id: ShardId, - pub parts: Vec, -} - -impl PartialEncodedChunkForwardMsg { - pub fn from_header_and_parts( - header: &ShardChunkHeader, - parts: Vec, - ) -> Self { - Self { - chunk_hash: header.chunk_hash(), - inner_header_hash: header.inner_header_hash(), - merkle_root: header.encoded_merkle_root(), - signature: header.signature().clone(), - prev_block_hash: header.prev_block_hash(), - height_created: header.height_created(), - shard_id: header.shard_id(), - parts, - } - } - - pub fn is_valid_hash(&self) -> bool { - let correct_hash = combine_hash(self.inner_header_hash, self.merkle_root); - ChunkHash(correct_hash) == self.chunk_hash - } -} - /// Adapter to break dependency of sub-components on the network requests. /// For tests use MockNetworkAdapter that accumulates the requests to network. 
pub trait NetworkAdapter: Sync + Send { @@ -1735,16 +769,15 @@ impl NetworkAdapter for NetworkRecipient { #[cfg(test)] mod tests { - use std::mem::size_of; - use super::*; + // NOTE: this has its counterpart in `near_network_primitives::types::tests` const ALLOWED_SIZE: usize = 1 << 20; const NOTIFY_SIZE: usize = 1024; macro_rules! assert_size { ($type:ident) => { - let struct_size = size_of::<$type>(); + let struct_size = std::mem::size_of::<$type>(); if struct_size >= NOTIFY_SIZE { println!("The size of {} is {}", stringify!($type), struct_size); } @@ -1754,18 +787,11 @@ mod tests { #[test] fn test_enum_size() { - assert_size!(PeerType); - assert_size!(PeerStatus); assert_size!(HandshakeFailureReason); - assert_size!(RoutedMessageBody); - assert_size!(PeerIdOrHash); - assert_size!(KnownPeerStatus); assert_size!(ConsolidateResponse); assert_size!(PeerRequest); assert_size!(PeerResponse); - assert_size!(ReasonForBan); assert_size!(NetworkRequests); - assert_size!(PeerManagerRequest); assert_size!(NetworkResponses); assert_size!(NetworkClientMessages); assert_size!(NetworkClientResponses); @@ -1773,63 +799,13 @@ mod tests { #[test] fn test_struct_size() { - assert_size!(PeerInfo); - assert_size!(PeerChainInfoV2); assert_size!(Handshake); - assert_size!(AnnounceAccountRoute); - assert_size!(AnnounceAccount); assert_size!(Ping); assert_size!(Pong); - assert_size!(RawRoutedMessage); - assert_size!(RoutedMessageNoSignature); - assert_size!(RoutedMessage); - assert_size!(RoutedMessageFrom); assert_size!(SyncData); - assert_size!(NetworkConfig); - assert_size!(KnownPeerState); - assert_size!(InboundTcpConnect); - assert_size!(OutboundTcpConnect); assert_size!(SendMessage); assert_size!(Consolidate); - assert_size!(Unregister); - assert_size!(PeerList); - assert_size!(PeersRequest); - assert_size!(PeersResponse); - assert_size!(Ban); assert_size!(FullPeerInfo); assert_size!(NetworkInfo); - assert_size!(StateResponseInfoV1); - assert_size!(QueryPeerStats); - assert_size!(PartialEncodedChunkRequestMsg); - } - - #[test] - fn routed_message_body_compatibility_smoke_test() { - #[track_caller] - fn check(msg: RoutedMessageBody, expected: &[u8]) { - let actual = msg.try_to_vec().unwrap(); - assert_eq!(actual.as_slice(), expected); - } - - check( - RoutedMessageBody::TxStatusRequest("test_x".parse().unwrap(), CryptoHash([42; 32])), - &[ - 2, 6, 0, 0, 0, 116, 101, 115, 116, 95, 120, 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, - 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, - 42, - ], - ); - - check( - RoutedMessageBody::VersionedStateResponse(StateResponseInfo::V1(StateResponseInfoV1 { - shard_id: 62, - sync_hash: CryptoHash([92; 32]), - state_response: ShardStateSyncResponseV1 { header: None, part: None }, - })), - &[ - 17, 0, 62, 0, 0, 0, 0, 0, 0, 0, 92, 92, 92, 92, 92, 92, 92, 92, 92, 92, 92, 92, 92, - 92, 92, 92, 92, 92, 92, 92, 92, 92, 92, 92, 92, 92, 92, 92, 92, 92, 92, 92, 0, 0, - ], - ); } } diff --git a/integration-tests/Cargo.toml b/integration-tests/Cargo.toml index a28e03913b6..b73d3c2d5e7 100644 --- a/integration-tests/Cargo.toml +++ b/integration-tests/Cargo.toml @@ -51,7 +51,6 @@ performance_stats = ["nearcore/performance_stats", "near-network/performance_sta regression_tests = [] expensive_tests = [] adversarial = ["nearcore/adversarial"] -metric_recorder = ["near-client-primitives/metric_recorder"] protocol_feature_alt_bn128 = [ "near-primitives/protocol_feature_alt_bn128", "node-runtime/protocol_feature_alt_bn128", diff --git
a/integration-tests/tests/client/process_blocks.rs b/integration-tests/tests/client/process_blocks.rs index 17ff012159a..a958e47177c 100644 --- a/integration-tests/tests/client/process_blocks.rs +++ b/integration-tests/tests/client/process_blocks.rs @@ -24,8 +24,6 @@ use near_client::test_utils::{setup_client, setup_mock, TestEnv}; use near_client::{Client, GetBlock, GetBlockWithMerkleTree}; use near_crypto::{InMemorySigner, KeyType, PublicKey, Signature, Signer}; use near_logger_utils::init_test_logger; -#[cfg(feature = "metric_recorder")] -use near_network::recorder::MetricRecorder; use near_network::routing::EdgeInfo; use near_network::test_utils::{wait_or_panic, MockNetworkAdapter}; use near_network::types::{NetworkInfo, PeerChainInfoV2, ReasonForBan}; @@ -916,8 +914,6 @@ fn client_sync_headers() { sent_bytes_per_sec: 0, received_bytes_per_sec: 0, known_producers: vec![], - #[cfg(feature = "metric_recorder")] - metric_recorder: MetricRecorder::default(), peer_counter: 0, })); wait_or_panic(2000); diff --git a/nearcore/Cargo.toml b/nearcore/Cargo.toml index 5a119d5c3ed..c311eff3e42 100644 --- a/nearcore/Cargo.toml +++ b/nearcore/Cargo.toml @@ -63,7 +63,6 @@ memory_stats = ["near-performance-metrics/memory_stats"] c_memory_stats = ["near-performance-metrics/c_memory_stats"] adversarial = ["near-client/adversarial", "near-network/adversarial", "near-store/adversarial", "near-jsonrpc/adversarial"] expensive_tests = ["near-client/expensive_tests", "near-epoch-manager/expensive_tests", "near-chain/expensive_tests"] -metric_recorder = ["near-network/metric_recorder", "near-client/metric_recorder"] no_cache = ["node-runtime/no_cache", "near-store/no_cache", "near-chain/no_cache"] delay_detector = ["near-client/delay_detector"] rosetta_rpc = ["near-rosetta-rpc"] diff --git a/neard/Cargo.toml b/neard/Cargo.toml index 585126317c5..c8db07259ea 100644 --- a/neard/Cargo.toml +++ b/neard/Cargo.toml @@ -37,7 +37,6 @@ memory_stats = ["nearcore/memory_stats", "near-rust-allocator-proxy"] c_memory_stats = ["nearcore/c_memory_stats"] adversarial = ["nearcore/adversarial"] expensive_tests = ["nearcore/expensive_tests"] -metric_recorder = ["nearcore/metric_recorder"] no_cache = ["nearcore/no_cache"] delay_detector = ["nearcore/delay_detector"] rosetta_rpc = ["nearcore/rosetta_rpc"]
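
A minimal sketch of what the re-export added in chain/network/src/types.rs (`pub use near_network_primitives::types::*;`) means for downstream code. It assumes `PeerInfo` moved into `near_network_primitives::types` with the same public fields it had here (`id`, `addr`, `account_id`); the `peer_info` helper below is illustrative only and not part of this patch.

use std::net::SocketAddr;

use near_primitives::network::PeerId;
// Existing imports keep compiling because near-network re-exports the moved types...
use near_network::types::PeerInfo;
// ...while crates that only need the plain data types (and not the actix actors in
// near-network) can instead depend on the new primitives crate directly:
// use near_network_primitives::types::PeerInfo;

/// Illustrative helper: builds a `PeerInfo` from its (assumed unchanged) public fields,
/// much like the test helpers that were removed from chain/network/src/test_utils.rs.
fn peer_info(id: PeerId, addr: Option<SocketAddr>) -> PeerInfo {
    PeerInfo { id, addr, account_id: None }
}

Keeping the glob re-export makes the split transparent to current callers of `near_network::types`, so dependent crates can move their Cargo dependency over to `near-network-primitives` incrementally rather than in this patch.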