Skip to content

Commit

Permalink
Implemented store module for near-network (#7010)
Browse files Browse the repository at this point in the history
It consists of 2 layers:
1. schema definition layer - enforcing type-safe access to the DB columns
2. atomic access layer - defining all DB transactions, which are the high level operations that business logic can execute.
  • Loading branch information
pompon0 authored and nikurt committed Jun 13, 2022
1 parent e13fbee commit e49ae5a
Show file tree
Hide file tree
Showing 7 changed files with 523 additions and 4 deletions.
9 changes: 7 additions & 2 deletions chain/network-primitives/src/network_protocol/edge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ impl PartialEdgeInfo {
}

#[cfg_attr(feature = "deepsize_feature", derive(deepsize::DeepSizeOf))]
#[derive(BorshSerialize, BorshDeserialize, Clone, Debug, PartialEq, Eq)]
#[derive(BorshSerialize, BorshDeserialize, Clone, Debug, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "test_features", derive(serde::Serialize, serde::Deserialize))]
pub struct Edge(pub Arc<EdgeInner>);

Expand All @@ -43,6 +43,11 @@ impl Edge {
Edge(Arc::new(EdgeInner::new(peer0, peer1, nonce, signature0, signature1)))
}

pub fn with_removal_info(mut self, ri: Option<(bool, Signature)>) -> Edge {
Arc::make_mut(&mut self.0).removal_info = ri;
self
}

pub fn key(&self) -> &(PeerId, PeerId) {
&self.0.key
}
Expand Down Expand Up @@ -223,7 +228,7 @@ impl Edge {
/// We need to keep explicitly `Removed` edges, in order to be able to proof, that given `Edge`
/// isn't `Active` anymore. In case, someone delivers a proof that the edge existed.
#[cfg_attr(feature = "deepsize_feature", derive(deepsize::DeepSizeOf))]
#[derive(BorshSerialize, BorshDeserialize, Clone, Debug, PartialEq, Eq)]
#[derive(BorshSerialize, BorshDeserialize, Clone, Debug, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "test_features", derive(serde::Serialize, serde::Deserialize))]
pub struct EdgeInner {
/// Each edge consists of unordered pair of public keys of both peers.
Expand Down
1 change: 1 addition & 0 deletions chain/network/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ pub mod private_actix;
pub(crate) mod private_actix;
pub mod routing;
pub(crate) mod stats;
pub(crate) mod store;
pub mod test_utils;
#[cfg(test)]
mod tests;
Expand Down
149 changes: 149 additions & 0 deletions chain/network/src/store/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
#![allow(dead_code)] //TODO: remove, it will be in use in the next PR.
/// Store module defines atomic DB operations on top of schema module.
/// All transactions should be implemented within this module,
/// in particular schema::StoreUpdate is not exported.
use near_network_primitives::types::{Edge, KnownPeerState};
use near_primitives::network::{AnnounceAccount, PeerId};
use near_primitives::types::AccountId;
use std::collections::HashSet;
use tracing::debug;

mod schema;
#[cfg(test)]
pub mod testonly;

/// Opaque error type representing storage errors.
///
/// Invariant: any store error is a critical operational operational error
/// which signals about data corruption. It wouldn't be wrong to replace all places /// where the error originates with outright panics.
///
/// If you have an error condition which needs to be handled somehow, it should be
/// some *other* error type.
#[derive(thiserror::Error, Debug)]
#[error("{0}")]
pub(crate) struct Error(schema::Error);

/// Store allows for performing synchronous atomic operations on the DB.
/// In particular it doesn't implement Clone and requires &mut self for
/// methods writing to the DB.
pub(crate) struct Store(schema::Store);

impl Store {
pub fn new(s: near_store::Store) -> Self {
Self(schema::Store::new(s))
}
}

/// Everytime a group of peers becomes unreachable at the same time; We store edges belonging to
/// them in components. We remove all of those edges from memory, and save them to database,
/// If any of them become reachable again, we re-add whole component.
///
/// To store components, we have following column in the DB.
/// DBCol::LastComponentNonce -> stores component_nonce: u64, which is the lowest nonce that
/// hasn't been used yet. If new component gets created it will use
/// this nonce.
/// DBCol::ComponentEdges -> Mapping from `component_nonce` to list of edges
/// DBCol::PeerComponent -> Mapping from `peer_id` to last component nonce if there
/// exists one it belongs to.
impl Store {
/// Inserts (account_id,aa) to the AccountAnnouncements column.
pub fn set_account_announcement(
&mut self,
account_id: &AccountId,
aa: &AnnounceAccount,
) -> Result<(), Error> {
let mut update = self.0.new_update();
update.set::<schema::AccountAnnouncements>(account_id, aa);
update.commit().map_err(Error)
}

/// Fetches row with key account_id from the AccountAnnouncements column.
pub fn get_account_announcement(
&self,
account_id: &AccountId,
) -> Result<Option<AnnounceAccount>, Error> {
self.0.get::<schema::AccountAnnouncements>(account_id).map_err(Error)
}

/// Atomically stores a graph component consisting of <peers> and <edges>
/// to the DB. On completion, all peers are considered members of the new component
/// (even if they were members of a different component so far).
/// The name (even though technically correct) is misleading, because the <edges> do
/// NOT have to constitute a CONNECTED component. I'm not fixing that because
/// the whole routing table in the current form is scheduled for deprecation.
pub fn push_component(
&mut self,
peers: &HashSet<PeerId>,
edges: &Vec<Edge>,
) -> Result<(), Error> {
debug!(target: "network", "push_component: moving {} peers from memory to DB", peers.len());
let component =
self.0.get::<schema::LastComponentNonce>(&()).map_err(Error)?.unwrap_or(0) + 1;
let mut update = self.0.new_update();
update.set::<schema::LastComponentNonce>(&(), &component);
update.set::<schema::ComponentEdges>(&component, &edges);
for peer_id in peers {
update.set::<schema::PeerComponent>(peer_id, &component);
}
update.commit().map_err(Error)
}

/// Reads and deletes from DB the component that <peer_id> is a member of.
/// Returns Ok(vec![]) if peer_id is not a member of any component.
pub fn pop_component(&mut self, peer_id: &PeerId) -> Result<Vec<Edge>, Error> {
// Fetch the component assigned to the peer.
let component = match self.0.get::<schema::PeerComponent>(peer_id).map_err(Error)? {
Some(c) => c,
None => return Ok(vec![]),
};
let edges =
self.0.get::<schema::ComponentEdges>(&component).map_err(Error)?.unwrap_or(vec![]);
let mut update = self.0.new_update();
update.delete::<schema::ComponentEdges>(&component);
let mut peers_checked = HashSet::new();
for edge in &edges {
let key = edge.key();
for peer_id in [&key.0, &key.1] {
if !peers_checked.insert(peer_id.clone()) {
// Store doesn't accept 2 mutations modifying the same row in a single
// transaction, even if they are identical. Therefore tracking peers_checked
// is critical for correctness, rather than just an optimization minimizing
// the number of lookups.
continue;
}
match self.0.get::<schema::PeerComponent>(&peer_id).map_err(Error)? {
Some(c) if c == component => update.delete::<schema::PeerComponent>(&peer_id),
_ => {}
}
}
}
update.commit().map_err(Error)?;
Ok(edges)
}
}

// PeerStore storage.
impl Store {
/// Inserts (peer_id,peer_state) to Peers column.
pub fn set_peer_state(
&mut self,
peer_id: &PeerId,
peer_state: &KnownPeerState,
) -> Result<(), Error> {
let mut update = self.0.new_update();
update.set::<schema::Peers>(peer_id, peer_state);
update.commit().map_err(Error)
}

/// Deletes rows with keys in <peers> from Peers column.
pub fn delete_peer_states(&mut self, peers: &[PeerId]) -> Result<(), Error> {
let mut update = self.0.new_update();
peers.iter().for_each(|p| update.delete::<schema::Peers>(p));
update.commit().map_err(Error)
}

/// Reads the whole Peers column.
pub fn list_peer_states(&self) -> Result<Vec<(PeerId, KnownPeerState)>, Error> {
self.0.iter::<schema::Peers>().collect::<Result<_, _>>().map_err(Error)
}
}
Loading

0 comments on commit e49ae5a

Please sign in to comment.