diff --git a/chain/network-primitives/src/network_protocol/edge.rs b/chain/network-primitives/src/network_protocol/edge.rs
index 03f7eb70ef9..0ae9f1f0188 100644
--- a/chain/network-primitives/src/network_protocol/edge.rs
+++ b/chain/network-primitives/src/network_protocol/edge.rs
@@ -43,6 +43,11 @@ impl Edge {
         Edge(Arc::new(EdgeInner::new(peer0, peer1, nonce, signature0, signature1)))
     }
 
+    pub fn with_removal_info(mut self, ri: Option<(bool, Signature)>) -> Edge {
+        Arc::make_mut(&mut self.0).removal_info = ri;
+        self
+    }
+
     pub fn key(&self) -> &(PeerId, PeerId) {
         &self.0.key
     }
diff --git a/chain/network/src/lib.rs b/chain/network/src/lib.rs
index d1c687ffa8c..a110f022949 100644
--- a/chain/network/src/lib.rs
+++ b/chain/network/src/lib.rs
@@ -16,6 +16,7 @@ pub mod private_actix;
 pub(crate) mod private_actix;
 pub mod routing;
 pub(crate) mod stats;
+pub(crate) mod store;
 pub mod test_utils;
 #[cfg(test)]
 mod tests;
diff --git a/chain/network/src/store/mod.rs b/chain/network/src/store/mod.rs
new file mode 100644
index 00000000000..50ea3499df2
--- /dev/null
+++ b/chain/network/src/store/mod.rs
@@ -0,0 +1,149 @@
+#![allow(dead_code)] // TODO: remove, it will be in use in the next PR.
+/// The store module defines atomic DB operations on top of the schema module.
+/// All transactions should be implemented within this module;
+/// in particular, schema::StoreUpdate is not exported.
+use near_network_primitives::types::{Edge, KnownPeerState};
+use near_primitives::network::{AnnounceAccount, PeerId};
+use near_primitives::types::AccountId;
+use std::collections::HashSet;
+use tracing::debug;
+
+mod schema;
+#[cfg(test)]
+pub mod testonly;
+
+/// Opaque error type representing storage errors.
+///
+/// Invariant: any store error is a critical operational error
+/// which signals data corruption. It wouldn't be wrong to replace all places
+/// where the error originates with outright panics.
+///
+/// If you have an error condition which needs to be handled somehow, it should be
+/// some *other* error type.
+#[derive(thiserror::Error, Debug)]
+#[error("{0}")]
+pub(crate) struct Error(schema::Error);
+
+/// Store allows for performing synchronous atomic operations on the DB.
+/// In particular it doesn't implement Clone and requires &mut self for
+/// methods writing to the DB.
+pub(crate) struct Store(schema::Store);
+
+impl Store {
+    pub fn new(s: near_store::Store) -> Self {
+        Self(schema::Store::new(s))
+    }
+}
+
+/// Every time a group of peers becomes unreachable at the same time, we store the edges
+/// belonging to them in components. We remove all of those edges from memory and save them
+/// to the database. If any of those peers become reachable again, we re-add the whole component.
+///
+/// To store components, we have the following columns in the DB:
+/// DBCol::LastComponentNonce -> stores component_nonce: u64, which is the lowest nonce that
+///                              hasn't been used yet. If a new component gets created, it will
+///                              use this nonce.
+/// DBCol::ComponentEdges     -> mapping from `component_nonce` to the list of edges.
+/// DBCol::PeerComponent      -> mapping from `peer_id` to the last component nonce it belongs
+///                              to, if there is one.
+impl Store {
+    /// Inserts (account_id, aa) into the AccountAnnouncements column.
+    pub fn set_account_announcement(
+        &mut self,
+        account_id: &AccountId,
+        aa: &AnnounceAccount,
+    ) -> Result<(), Error> {
+        let mut update = self.0.new_update();
+        update.set::<schema::AccountAnnouncements>(account_id, aa);
+        update.commit().map_err(Error)
+    }
+
+    /// Fetches the row with key account_id from the AccountAnnouncements column.
+    pub fn get_account_announcement(
+        &self,
+        account_id: &AccountId,
+    ) -> Result<Option<AnnounceAccount>, Error> {
+        self.0.get::<schema::AccountAnnouncements>(account_id).map_err(Error)
+    }
+
+    /// Atomically stores a graph component consisting of `peers` and
+    /// `edges` to the DB. On completion, all peers are considered members of the new component
+    /// (even if they were members of a different component so far).
+    /// The name (even though technically correct) is misleading, because the `edges` do
+    /// NOT have to constitute a CONNECTED component. I'm not fixing that because
+    /// the whole routing table in the current form is scheduled for deprecation.
+    pub fn push_component(
+        &mut self,
+        peers: &HashSet<PeerId>,
+        edges: &Vec<Edge>,
+    ) -> Result<(), Error> {
+        debug!(target: "network", "push_component: moving {} peers from memory to DB", peers.len());
+        let component =
+            self.0.get::<schema::LastComponentNonce>(&()).map_err(Error)?.unwrap_or(0) + 1;
+        let mut update = self.0.new_update();
+        update.set::<schema::LastComponentNonce>(&(), &component);
+        update.set::<schema::ComponentEdges>(&component, &edges);
+        for peer_id in peers {
+            update.set::<schema::PeerComponent>(peer_id, &component);
+        }
+        update.commit().map_err(Error)
+    }
+
+    /// Reads and deletes from the DB the component that `peer_id` is a member of.
+    /// Returns Ok(vec![]) if peer_id is not a member of any component.
+    pub fn pop_component(&mut self, peer_id: &PeerId) -> Result<Vec<Edge>, Error> {
+        // Fetch the component assigned to the peer.
+        let component = match self.0.get::<schema::PeerComponent>(peer_id).map_err(Error)? {
+            Some(c) => c,
+            None => return Ok(vec![]),
+        };
+        let edges =
+            self.0.get::<schema::ComponentEdges>(&component).map_err(Error)?.unwrap_or(vec![]);
+        let mut update = self.0.new_update();
+        update.delete::<schema::ComponentEdges>(&component);
+        let mut peers_checked = HashSet::new();
+        for edge in &edges {
+            let key = edge.key();
+            for peer_id in [&key.0, &key.1] {
+                if !peers_checked.insert(peer_id.clone()) {
+                    // Store doesn't accept 2 mutations modifying the same row in a single
+                    // transaction, even if they are identical. Therefore tracking peers_checked
+                    // is critical for correctness, rather than just an optimization minimizing
+                    // the number of lookups.
+                    continue;
+                }
+                match self.0.get::<schema::PeerComponent>(peer_id).map_err(Error)? {
+                    Some(c) if c == component => update.delete::<schema::PeerComponent>(peer_id),
+                    _ => {}
+                }
+            }
+        }
+        update.commit().map_err(Error)?;
+        Ok(edges)
+    }
+}
+
+// PeerStore storage.
+impl Store {
+    /// Inserts (peer_id, peer_state) into the Peers column.
+    pub fn set_peer_state(
+        &mut self,
+        peer_id: &PeerId,
+        peer_state: &KnownPeerState,
+    ) -> Result<(), Error> {
+        let mut update = self.0.new_update();
+        update.set::<schema::Peers>(peer_id, peer_state);
+        update.commit().map_err(Error)
+    }
+
+    /// Deletes rows with keys in `peers` from the Peers column.
+    pub fn delete_peer_states(&mut self, peers: &[PeerId]) -> Result<(), Error> {
+        let mut update = self.0.new_update();
+        peers.iter().for_each(|p| update.delete::<schema::Peers>(p));
+        update.commit().map_err(Error)
+    }
+
+    /// Reads the whole Peers column.
+    pub fn list_peer_states(&self) -> Result<Vec<(PeerId, KnownPeerState)>, Error> {
+        self.0.iter::<schema::Peers>().collect::<Result<_, _>>().map_err(Error)
+    }
+}
diff --git a/chain/network/src/store/schema/mod.rs b/chain/network/src/store/schema/mod.rs
new file mode 100644
index 00000000000..1c3ecadd8c7
--- /dev/null
+++ b/chain/network/src/store/schema/mod.rs
@@ -0,0 +1,284 @@
+/// The schema module defines type-safe access to the DB.
+/// It is a concise definition of the key and value types
+/// of the DB columns. For high-level access see store.rs.
+use borsh::{BorshDeserialize, BorshSerialize};
+use near_crypto::Signature;
+use near_network_primitives::types as primitives;
+use near_primitives::account::id::AccountId;
+use near_primitives::network::{AnnounceAccount, PeerId};
+use near_store::DBCol;
+
+#[cfg(test)]
+mod tests;
+
+pub struct AccountIdFormat;
+impl Format for AccountIdFormat {
+    type T = AccountId;
+    fn to_vec(a: &AccountId) -> Vec<u8> {
+        a.as_ref().as_bytes().into()
+    }
+    fn from_slice(a: &[u8]) -> Result<AccountId, Error> {
+        std::str::from_utf8(a).map_err(invalid_data)?.parse().map_err(invalid_data)
+    }
+}
+
+#[derive(BorshSerialize, BorshDeserialize)]
+enum KnownPeerStatus {
+    Unknown,
+    NotConnected,
+    Connected,
+    /// UNIX timestamp in nanos.
+    Banned(primitives::ReasonForBan, u64),
+}
+
+impl From<primitives::KnownPeerStatus> for KnownPeerStatus {
+    fn from(s: primitives::KnownPeerStatus) -> Self {
+        match s {
+            primitives::KnownPeerStatus::Unknown => Self::Unknown,
+            primitives::KnownPeerStatus::NotConnected => Self::NotConnected,
+            primitives::KnownPeerStatus::Connected => Self::Connected,
+            primitives::KnownPeerStatus::Banned(r, t) => Self::Banned(r, t),
+        }
+    }
+}
+
+impl From<KnownPeerStatus> for primitives::KnownPeerStatus {
+    fn from(s: KnownPeerStatus) -> primitives::KnownPeerStatus {
+        match s {
+            KnownPeerStatus::Unknown => primitives::KnownPeerStatus::Unknown,
+            KnownPeerStatus::NotConnected => primitives::KnownPeerStatus::NotConnected,
+            KnownPeerStatus::Connected => primitives::KnownPeerStatus::Connected,
+            KnownPeerStatus::Banned(r, t) => primitives::KnownPeerStatus::Banned(r, t),
+        }
+    }
+}
+
+/// A Borsh representation of primitives::KnownPeerState.
+/// TODO: Currently primitives::KnownPeerState implements Borsh serialization
+/// directly, but eventually direct serialization should be removed
+/// so that the storage format doesn't leak into the business logic.
+/// TODO: Currently primitives::KnownPeerState is identical
+/// to KnownPeerStateRepr, but in the following PR the
+/// timestamp type (currently u64) will be replaced with time::Utc.
+#[derive(BorshSerialize, BorshDeserialize)]
+pub struct KnownPeerStateRepr {
+    peer_info: primitives::PeerInfo,
+    status: KnownPeerStatus,
+    /// UNIX timestamps in nanos.
+    first_seen: u64,
+    last_seen: u64,
+}
+
+impl BorshRepr for KnownPeerStateRepr {
+    type T = primitives::KnownPeerState;
+    fn to_repr(s: &primitives::KnownPeerState) -> Self {
+        Self {
+            peer_info: s.peer_info.clone(),
+            status: s.status.clone().into(),
+            first_seen: s.first_seen,
+            last_seen: s.last_seen,
+        }
+    }
+
+    fn from_repr(s: Self) -> Result<primitives::KnownPeerState, Error> {
+        Ok(primitives::KnownPeerState {
+            peer_info: s.peer_info,
+            status: s.status.into(),
+            first_seen: s.first_seen,
+            last_seen: s.last_seen,
+        })
+    }
+}
+
+#[derive(BorshSerialize, BorshDeserialize)]
+pub struct EdgeRepr {
+    key: (PeerId, PeerId),
+    nonce: u64,
+    signature0: Signature,
+    signature1: Signature,
+    removal_info: Option<(bool, Signature)>,
+}
+
+impl BorshRepr for EdgeRepr {
+    type T = primitives::Edge;
+
+    fn to_repr(e: &Self::T) -> Self {
+        Self {
+            key: e.key().clone(),
+            nonce: e.nonce(),
+            signature0: e.signature0().clone(),
+            signature1: e.signature1().clone(),
+            removal_info: e.removal_info().cloned(),
+        }
+    }
+    fn from_repr(e: Self) -> Result<Self::T, Error> {
+        Ok(primitives::Edge::new(e.key.0, e.key.1, e.nonce, e.signature0, e.signature1)
+            .with_removal_info(e.removal_info))
+    }
+}
+
+/////////////////////////////////////////////
+// Columns
+
+pub struct AccountAnnouncements;
+impl Column for AccountAnnouncements {
+    const COL: DBCol = DBCol::AccountAnnouncements;
+    type Key = AccountIdFormat;
+    type Value = Borsh<AnnounceAccount>;
+}
+
+pub struct Peers;
+impl Column for Peers {
+    const COL: DBCol = DBCol::Peers;
+    type Key = Borsh<PeerId>;
+    type Value = KnownPeerStateRepr;
+}
+
+pub struct PeerComponent;
+impl Column for PeerComponent {
+    const COL: DBCol = DBCol::PeerComponent;
+    type Key = Borsh<PeerId>;
+    type Value = Borsh<u64>;
+}
+
+pub struct ComponentEdges;
+impl Column for ComponentEdges {
+    const COL: DBCol = DBCol::ComponentEdges;
+    type Key = U64LE;
+    type Value = Vec<EdgeRepr>;
+}
+
+pub struct LastComponentNonce;
+impl Column for LastComponentNonce {
+    const COL: DBCol = DBCol::LastComponentNonce;
+    type Key = Borsh<()>;
+    type Value = Borsh<u64>;
+}
+
+////////////////////////////////////////////////////
+// Storage
+
+pub type Error = std::io::Error;
+fn invalid_data(e: impl std::error::Error + Send + Sync + 'static) -> Error {
+    Error::new(std::io::ErrorKind::InvalidData, e)
+}
+
+pub trait Format {
+    type T;
+    fn to_vec(a: &Self::T) -> Vec<u8>;
+    fn from_slice(a: &[u8]) -> Result<Self::T, Error>;
+}
+
+/// BorshRepr defines an isomorphism between T and Self,
+/// where Self implements serialization to Borsh.
+/// The Format trait is automatically derived for BorshRepr instances
+/// by first converting T to Self and then serializing to Borsh
+/// (and analogously for Format::from_slice).
+pub trait BorshRepr: BorshSerialize + BorshDeserialize {
+    type T;
+    fn to_repr(a: &Self::T) -> Self;
+    fn from_repr(s: Self) -> Result<Self::T, Error>;
+}
+
+impl<R: BorshRepr> Format for R {
+    type T = R::T;
+    fn to_vec(a: &Self::T) -> Vec<u8> {
+        R::to_repr(a).try_to_vec().unwrap()
+    }
+    fn from_slice(a: &[u8]) -> Result<Self::T, Error> {
+        R::from_repr(R::try_from_slice(a)?)
+    }
+}
+
+/// This is a wrapper which doesn't change the Borsh encoding.
+/// It automatically derives BorshRepr by using the trivial embedding
+/// as the isomorphism (therefore the derived Format of Borsh<T> is
+/// just the Borsh serialization of T).
+#[derive(BorshSerialize, BorshDeserialize)]
+pub struct Borsh<T: BorshSerialize + BorshDeserialize>(T);
+
+impl<T: BorshSerialize + BorshDeserialize + Clone> BorshRepr for Borsh<T> {
+    type T = T;
+    fn to_repr(a: &T) -> Self {
+        Self(a.clone())
+    }
+    fn from_repr(a: Self) -> Result<T, Error> {
+        Ok(a.0)
+    }
+}
+
+/// Combinator which derives BorshRepr for Vec<R>, given
+/// BorshRepr for R.
+impl<R: BorshRepr> BorshRepr for Vec<R> {
+    type T = Vec<R::T>;
+    fn to_repr(a: &Self::T) -> Vec<R> {
+        a.iter().map(R::to_repr).collect()
+    }
+    fn from_repr(a: Vec<R>) -> Result<Self::T, Error> {
+        a.into_iter().map(R::from_repr).collect()
+    }
+}
+
+// Little-endian representation of u64.
+pub struct U64LE;
+impl Format for U64LE {
+    type T = u64;
+    fn to_vec(a: &u64) -> Vec<u8> {
+        a.to_le_bytes().into()
+    }
+    fn from_slice(a: &[u8]) -> Result<u64, Error> {
+        a.try_into().map(u64::from_le_bytes).map_err(invalid_data)
+    }
+}
+
+/// Column is a type-safe specification of a DB column.
+/// It defines how to encode/decode keys and values stored in the column.
+pub trait Column {
+    const COL: DBCol;
+    type Key: Format;
+    type Value: Format;
+}
+
+/// A type-safe wrapper of near_store::Store.
+#[derive(Clone)]
+pub struct Store(near_store::Store);
+
+/// A type-safe wrapper of near_store::StoreUpdate.
+pub struct StoreUpdate(near_store::StoreUpdate);
+
+impl Store {
+    pub fn new(s: near_store::Store) -> Store {
+        Store(s)
+    }
+    pub fn new_update(&mut self) -> StoreUpdate {
+        StoreUpdate(self.0.store_update())
+    }
+    pub fn iter<C: Column>(
+        &self,
+    ) -> impl Iterator<Item = Result<(<C::Key as Format>::T, <C::Value as Format>::T), Error>> + '_
+    {
+        self.0.iter(C::COL).map(|(k, v)| Ok((C::Key::from_slice(&k)?, C::Value::from_slice(&v)?)))
+    }
+    pub fn get<C: Column>(
+        &self,
+        k: &<C::Key as Format>::T,
+    ) -> Result<Option<<C::Value as Format>::T>, Error> {
+        let v = self.0.get(C::COL, C::Key::to_vec(k).as_ref())?;
+        Ok(match v {
+            Some(v) => Some(C::Value::from_slice(&v)?),
+            None => None,
+        })
+    }
+}
+
+impl StoreUpdate {
+    pub fn set<C: Column>(&mut self, k: &<C::Key as Format>::T, v: &<C::Value as Format>::T) {
+        self.0.set(C::COL, C::Key::to_vec(k).as_ref(), C::Value::to_vec(v).as_ref())
+    }
+    pub fn delete<C: Column>(&mut self, k: &<C::Key as Format>::T) {
+        self.0.delete(C::COL, C::Key::to_vec(k).as_ref())
+    }
+    pub fn commit(self) -> Result<(), Error> {
+        self.0.commit()
+    }
+}
diff --git a/chain/network/src/store/schema/tests.rs b/chain/network/src/store/schema/tests.rs
new file mode 100644
index 00000000000..38b97c06b05
--- /dev/null
+++ b/chain/network/src/store/schema/tests.rs
@@ -0,0 +1,13 @@
+use super::*;
+use crate::tests::data;
+use crate::tests::util;
+
+#[test]
+fn borsh_wrapper_is_transparent() {
+    let mut rng = util::make_rng(423423);
+    let rng = &mut rng;
+    let s1 = data::make_signer(rng);
+    let s2 = data::make_signer(rng);
+    let e = data::make_edge(rng, &s1, &s2);
+    assert_eq!(Borsh(e.clone()).try_to_vec().unwrap(), e.try_to_vec().unwrap());
+}
diff --git a/chain/network/src/store/testonly.rs b/chain/network/src/store/testonly.rs
new file mode 100644
index 00000000000..1202d971641
--- /dev/null
+++ b/chain/network/src/store/testonly.rs
@@ -0,0 +1,58 @@
+use super::*;
+use std::collections::HashMap;
+
+#[derive(Debug, Clone, Hash, PartialEq, Eq)]
+pub struct Component {
+    pub peers: Vec<PeerId>,
+    pub edges: Vec<Edge>,
+}
+
+impl Default for Component {
+    fn default() -> Self {
+        Self { peers: vec![], edges: vec![] }
+    }
+}
+
+impl Component {
+    pub fn normal(mut self) -> Self {
+        self.peers.sort();
+        self.edges.sort_by(|a, b| a.key().cmp(b.key()));
+        self
+    }
+}
+
+impl Store {
+    /// Reads all the components from the database.
+    /// Panics if any of the invariants has been violated.
+    pub fn list_components(&self) -> Vec<Component> {
+        let edges: HashMap<_, _> =
+            self.0.iter::<schema::ComponentEdges>().map(|x| x.unwrap()).collect();
+        let peers: HashMap<_, _> =
+            self.0.iter::<schema::PeerComponent>().map(|x| x.unwrap()).collect();
+        let lcn: HashMap<(), _> =
+            self.0.iter::<schema::LastComponentNonce>().map(|x| x.unwrap()).collect();
+        // All component nonces should be <= LastComponentNonce.
+        let lcn = lcn.get(&()).unwrap_or(&0);
+        for (c, _) in &edges {
+            assert!(c <= lcn);
+        }
+        for (_, c) in &peers {
+            assert!(c <= lcn);
+        }
+        // Each edge has to be incident to at least one peer in the same component.
+        for (c, es) in &edges {
+            for e in es {
+                let key = e.key();
+                assert!(peers.get(&key.0) == Some(c) || peers.get(&key.1) == Some(c));
+            }
+        }
+        let mut cs = HashMap::<u64, Component>::new();
+        for (c, es) in edges {
+            cs.entry(c).or_default().edges = es;
+        }
+        for (p, c) in peers {
+            cs.entry(c).or_default().peers.push(p);
+        }
+        cs.into_iter().map(|(_, v)| v).collect()
+    }
+}
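
For context (not part of the patch): a minimal sketch of how the new component storage API could be exercised from an in-crate test. It assumes near_store::test_utils::create_test_store() returns a near_store::Store, and that the crate::tests::{data, util} helpers referenced in schema/tests.rs exist with the signatures used there; names below are illustrative only.

// Hypothetical in-crate test, not part of this PR.
// Assumes near_store::test_utils::create_test_store() -> near_store::Store
// and the crate::tests::{data, util} helpers used in schema/tests.rs.
use crate::store::Store;
use crate::tests::{data, util};
use std::collections::HashSet;

#[test]
fn push_then_pop_component() {
    let mut rng = util::make_rng(385305732);
    let rng = &mut rng;
    let mut store = Store::new(near_store::test_utils::create_test_store());

    // Build a single signed edge and store it as a component together with its two endpoints.
    let s1 = data::make_signer(rng);
    let s2 = data::make_signer(rng);
    let e = data::make_edge(rng, &s1, &s2);
    let peers: HashSet<_> = [e.key().0.clone(), e.key().1.clone()].into_iter().collect();
    store.push_component(&peers, &vec![e.clone()]).unwrap();

    // Popping by either member returns the whole component and deletes it from the DB,
    // so a subsequent pop by the other endpoint finds nothing.
    assert_eq!(vec![e.clone()], store.pop_component(&e.key().0).unwrap());
    assert!(store.pop_component(&e.key().1).unwrap().is_empty());
}

push_component bumps LastComponentNonce and writes ComponentEdges plus one PeerComponent row per peer in a single transaction; pop_component reverses that, which is exactly the invariant set that testonly::list_components asserts over.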