Skip to content

Commit

Permalink
Split massa db as worker and exports (#4045)
Browse files Browse the repository at this point in the history
* 1st step to have massa_db split between exports and worker

* Create MassaDBController trait

* Implement MassaDBController for MassaDB

* Added read function to MassaDBController

* Started cleaning up build errors - not finished

* Implemented more trait methods, cleaned a bit

Still has lifetime issues and some trait methods missing, e.g. for bootstrap

* Fixed build errors

* Fix after rebase on testnet24

* Fix clippy warnings

* Simplified lifetime definitions

* Added doc comments

* Updated doc comments for get_batch and get_versioning_batch

* In pos-exports, put the massa-db-worker as a dev-dependency instead of optional dependancy
  • Loading branch information
Leo-Besancon authored and AurelienFT committed Jun 15, 2023
1 parent 80a812d commit c1379f2
Show file tree
Hide file tree
Showing 46 changed files with 550 additions and 425 deletions.
3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ members = [
"massa-cipher",
"massa-consensus-exports",
"massa-consensus-worker",
"massa-db",
"massa-db-exports",
"massa-db-worker",
"massa-executed-ops",
"massa-execution-exports",
"massa-execution-worker",
Expand Down
3 changes: 1 addition & 2 deletions massa-async-pool/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ nom = "=7.1"
num = "0.4"
serde = { version = "1.0", features = ["derive"] }
rand = "0.8"
rocksdb = "0.20"
parking_lot = { version = "0.12", features = ["deadlock_detection"] }

# custom modules
Expand All @@ -18,7 +17,7 @@ massa_ledger_exports = { path = "../massa-ledger-exports" }
massa_models = { path = "../massa-models" }
massa_serialization = { path = "../massa-serialization" }
massa_signature = { path = "../massa-signature" }
massa_db = { path = "../massa-db" }
massa_db_exports = { path = "../massa-db-exports" }
massa_time = { path = "../massa-time" }
massa-proto-rs = { git = "https://github.com/massalabs/massa-proto-rs", branch = "main", features = ["tonic"] }

Expand Down
35 changes: 12 additions & 23 deletions massa-async-pool/src/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ use crate::{
AsyncMessageDeserializer, AsyncMessageIdDeserializer, AsyncMessageIdSerializer,
AsyncMessageSerializer,
};
use massa_db::{
DBBatch, MassaDB, ASYNC_POOL_PREFIX, CF_ERROR, MESSAGE_ID_DESER_ERROR, MESSAGE_ID_SER_ERROR,
MESSAGE_SER_ERROR, STATE_CF,
use massa_db_exports::{
DBBatch, MassaDirection, MassaIteratorMode, ShareableMassaDBController, ASYNC_POOL_PREFIX,
MESSAGE_ID_DESER_ERROR, MESSAGE_ID_SER_ERROR, MESSAGE_SER_ERROR, STATE_CF,
};
use massa_ledger_exports::{Applicable, SetOrKeep, SetUpdateOrDelete};
use massa_models::address::Address;
Expand All @@ -25,10 +25,8 @@ use nom::{
sequence::tuple,
IResult, Parser,
};
use parking_lot::RwLock;
use rocksdb::{Direction, IteratorMode};
use std::collections::BTreeMap;
use std::ops::Bound::Included;
use std::{collections::BTreeMap, sync::Arc};

const EMISSION_SLOT_IDENT: u8 = 0u8;
const EMISSION_INDEX_IDENT: u8 = 1u8;
Expand Down Expand Up @@ -193,7 +191,7 @@ macro_rules! message_id_prefix {
pub struct AsyncPool {
/// Asynchronous pool configuration
pub config: AsyncPoolConfig,
pub db: Arc<RwLock<MassaDB>>,
pub db: ShareableMassaDBController,
pub message_info_cache: BTreeMap<AsyncMessageId, AsyncMessageInfo>,
message_id_serializer: AsyncMessageIdSerializer,
message_serializer: AsyncMessageSerializer,
Expand All @@ -203,7 +201,7 @@ pub struct AsyncPool {

impl AsyncPool {
/// Creates an empty `AsyncPool`
pub fn new(config: AsyncPoolConfig, db: Arc<RwLock<MassaDB>>) -> AsyncPool {
pub fn new(config: AsyncPoolConfig, db: ShareableMassaDBController) -> AsyncPool {
AsyncPool {
config: config.clone(),
db,
Expand All @@ -225,25 +223,19 @@ impl AsyncPool {
self.message_info_cache.clear();

let db = self.db.read();
let handle = db.db.cf_handle(STATE_CF).expect(CF_ERROR);

// Iterates over the whole database
let mut last_id: Option<Vec<u8>> = None;

while let Some(Ok((serialized_message_id, _))) = match last_id {
while let Some((serialized_message_id, _)) = match last_id {
Some(id) => db
.db
.iterator_cf(
handle,
IteratorMode::From(&can_be_executed_key!(id), Direction::Forward),
STATE_CF,
MassaIteratorMode::From(&can_be_executed_key!(id), MassaDirection::Forward),
)
.nth(1),
None => db
.db
.iterator_cf(
handle,
IteratorMode::From(ASYNC_POOL_PREFIX.as_bytes(), Direction::Forward),
)
.prefix_iterator_cf(STATE_CF, ASYNC_POOL_PREFIX.as_bytes())
.next(),
} {
if !serialized_message_id.starts_with(ASYNC_POOL_PREFIX.as_bytes()) {
Expand Down Expand Up @@ -314,18 +306,15 @@ impl AsyncPool {
/// Otherwise, we should use the `message_info_cache`.
pub fn fetch_message(&self, message_id: &AsyncMessageId) -> Option<AsyncMessage> {
let db = self.db.read();
let handle = db.db.cf_handle(STATE_CF).expect(CF_ERROR);

let mut serialized_message_id = Vec::new();
self.message_id_serializer
.serialize(message_id, &mut serialized_message_id)
.expect(MESSAGE_ID_SER_ERROR);

let mut serialized_message: Vec<u8> = Vec::new();
for (serialized_key, serialized_value) in db
.db
.prefix_iterator_cf(handle, &message_id_prefix!(serialized_message_id))
.flatten()
for (serialized_key, serialized_value) in
db.prefix_iterator_cf(STATE_CF, &message_id_prefix!(serialized_message_id))
{
if !serialized_key.starts_with(&message_id_prefix!(serialized_message_id)) {
break;
Expand Down
33 changes: 5 additions & 28 deletions massa-async-pool/src/test_exports/bootstrap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
use crate::{
AsyncMessage, AsyncMessageDeserializer, AsyncMessageId, AsyncMessageIdDeserializer, AsyncPool,
};
use massa_db::{ASYNC_POOL_PREFIX, STATE_CF};
use massa_db_exports::{ASYNC_POOL_PREFIX, STATE_CF};
use massa_models::{
address::Address,
amount::Amount,
Expand All @@ -13,7 +13,6 @@ use massa_models::{
use massa_serialization::{DeserializeError, Deserializer};
use massa_signature::KeyPair;
use rand::Rng;
use rocksdb::{Direction, IteratorMode};
use std::str::FromStr;

/// This file defines tools to test the asynchronous pool bootstrap
Expand Down Expand Up @@ -74,24 +73,12 @@ pub fn assert_eq_async_pool_bootstrap_state(v1: &AsyncPool, v2: &AsyncPool) {
);
let db1 = v1.db.read();
let db2 = v2.db.read();
let handle1 = db1.db.cf_handle(STATE_CF).unwrap();
let handle2 = db2.db.cf_handle(STATE_CF).unwrap();

let iter_1 = db1
.db
.iterator_cf(
handle1,
IteratorMode::From(ASYNC_POOL_PREFIX.as_bytes(), Direction::Forward),
)
.flatten()
.prefix_iterator_cf(STATE_CF, ASYNC_POOL_PREFIX.as_bytes())
.take_while(|(k, _v)| k.starts_with(ASYNC_POOL_PREFIX.as_bytes()));
let iter_2 = db2
.db
.iterator_cf(
handle2,
IteratorMode::From(ASYNC_POOL_PREFIX.as_bytes(), Direction::Forward),
)
.flatten()
.prefix_iterator_cf(STATE_CF, ASYNC_POOL_PREFIX.as_bytes())
.take_while(|(k, _v)| k.starts_with(ASYNC_POOL_PREFIX.as_bytes()));

assert_eq!(
Expand All @@ -108,20 +95,10 @@ pub fn assert_eq_async_pool_bootstrap_state(v1: &AsyncPool, v2: &AsyncPool) {
const TOTAL_FIELDS_COUNT: u8 = 13;

let iter_1 = db1
.db
.iterator_cf(
handle1,
IteratorMode::From(ASYNC_POOL_PREFIX.as_bytes(), Direction::Forward),
)
.flatten()
.prefix_iterator_cf(STATE_CF, ASYNC_POOL_PREFIX.as_bytes())
.take_while(|(k, _v)| k.starts_with(ASYNC_POOL_PREFIX.as_bytes()));
let iter_2 = db2
.db
.iterator_cf(
handle2,
IteratorMode::From(ASYNC_POOL_PREFIX.as_bytes(), Direction::Forward),
)
.flatten()
.prefix_iterator_cf(STATE_CF, ASYNC_POOL_PREFIX.as_bytes())
.take_while(|(k, _v)| k.starts_with(ASYNC_POOL_PREFIX.as_bytes()));

for (val1, val2) in iter_1.zip(iter_2) {
Expand Down
3 changes: 2 additions & 1 deletion massa-bootstrap/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ massa_serialization = { path = "../massa-serialization" }
massa_signature = { path = "../massa-signature" }
massa_pos_exports = { path = "../massa-pos-exports" }
massa_time = { path = "../massa-time" }
massa_db = { path = "../massa-db" }
massa_db_exports = { path = "../massa-db-exports" }
massa_versioning = { path = "../massa-versioning" }

[dev-dependencies]
Expand All @@ -53,6 +53,7 @@ massa_pos_exports = { path = "../massa-pos-exports", features = ["testing"] }
massa_consensus_exports = { path = "../massa-consensus-exports", features = [
"testing",
] }
massa_db_worker = { path = "../massa-db-worker" }

# for more information on what are the following features used for, see the cargo.toml at workspace level
[features]
Expand Down
2 changes: 1 addition & 1 deletion massa-bootstrap/src/client.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use humantime::format_duration;
use massa_db::DBBatch;
use massa_db_exports::DBBatch;
use massa_final_state::{FinalState, FinalStateError};
use massa_logging::massa_trace;
use massa_models::{node::NodeId, slot::Slot, streaming_step::StreamingStep, version::Version};
Expand Down
2 changes: 1 addition & 1 deletion massa-bootstrap/src/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use crate::settings::BootstrapServerMessageDeserializerArgs;
use massa_consensus_exports::bootstrapable_graph::{
BootstrapableGraph, BootstrapableGraphDeserializer, BootstrapableGraphSerializer,
};
use massa_db::StreamBatch;
use massa_db_exports::StreamBatch;
use massa_models::block_id::{BlockId, BlockIdDeserializer, BlockIdSerializer};
use massa_models::prehash::PreHashSet;
use massa_models::serialization::{
Expand Down
2 changes: 1 addition & 1 deletion massa-bootstrap/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ mod white_black_list;
use crossbeam::channel::tick;
use humantime::format_duration;
use massa_consensus_exports::{bootstrapable_graph::BootstrapableGraph, ConsensusController};
use massa_db::CHANGE_ID_DESER_ERROR;
use massa_db_exports::CHANGE_ID_DESER_ERROR;
use massa_final_state::FinalState;
use massa_logging::massa_trace;
use massa_models::{
Expand Down
15 changes: 11 additions & 4 deletions massa-bootstrap/src/tests/scenarios.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ use massa_async_pool::AsyncPoolConfig;
use massa_consensus_exports::{
bootstrapable_graph::BootstrapableGraph, test_exports::MockConsensusControllerImpl,
};
use massa_db::{DBBatch, MassaDB, MassaDBConfig};
use massa_db_exports::{DBBatch, MassaDBConfig, MassaDBController};
use massa_db_worker::MassaDB;
use massa_executed_ops::{ExecutedDenunciationsConfig, ExecutedOpsConfig};
use massa_final_state::{
test_exports::{assert_eq_final_state, assert_eq_final_state_hash},
Expand Down Expand Up @@ -94,7 +95,9 @@ fn mock_bootstrap_manager(addr: SocketAddr, bootstrap_config: BootstrapConfig) -
max_new_elements: 100,
thread_count,
};
let db = Arc::new(RwLock::new(MassaDB::new(db_config)));
let db = Arc::new(RwLock::new(
Box::new(MassaDB::new(db_config)) as Box<(dyn MassaDBController + 'static)>
));
let final_state_local_config = FinalStateConfig {
ledger_config: LedgerConfig {
thread_count,
Expand Down Expand Up @@ -199,15 +202,19 @@ fn test_bootstrap_server() {
max_new_elements: 100,
thread_count,
};
let db_server = Arc::new(RwLock::new(MassaDB::new(db_server_config)));
let db_server = Arc::new(RwLock::new(
Box::new(MassaDB::new(db_server_config)) as Box<(dyn MassaDBController + 'static)>
));
let temp_dir_client = TempDir::new().unwrap();
let db_client_config = MassaDBConfig {
path: temp_dir_client.path().to_path_buf(),
max_history_length: 10,
max_new_elements: 100,
thread_count,
};
let db_client = Arc::new(RwLock::new(MassaDB::new(db_client_config)));
let db_client = Arc::new(RwLock::new(
Box::new(MassaDB::new(db_client_config)) as Box<(dyn MassaDBController + 'static)>
));
let final_state_local_config = FinalStateConfig {
ledger_config: LedgerConfig {
thread_count,
Expand Down
10 changes: 4 additions & 6 deletions massa-bootstrap/src/tests/tools.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use massa_consensus_exports::{
},
export_active_block::{ExportActiveBlock, ExportActiveBlockSerializer},
};
use massa_db::{DBBatch, MassaDB};
use massa_db_exports::{DBBatch, ShareableMassaDBController};
use massa_executed_ops::{
ExecutedDenunciations, ExecutedDenunciationsChanges, ExecutedDenunciationsConfig, ExecutedOps,
ExecutedOpsConfig,
Expand Down Expand Up @@ -58,11 +58,9 @@ use massa_serialization::{DeserializeError, Deserializer, Serializer};
use massa_signature::KeyPair;
use massa_time::MassaTime;
use massa_versioning::versioning::{MipStatsConfig, MipStore};
use parking_lot::RwLock;
use rand::Rng;
use std::collections::{HashMap, HashSet};
use std::str::FromStr;
use std::sync::Arc;
use std::{
collections::BTreeMap,
net::{IpAddr, Ipv4Addr, SocketAddr},
Expand Down Expand Up @@ -225,7 +223,7 @@ pub fn get_random_executed_ops(
_r_limit: u64,
slot: Slot,
config: ExecutedOpsConfig,
db: Arc<RwLock<MassaDB>>,
db: ShareableMassaDBController,
) -> ExecutedOps {
let mut executed_ops = ExecutedOps::new(config.clone(), db.clone());
let mut batch = DBBatch::new();
Expand Down Expand Up @@ -255,7 +253,7 @@ pub fn get_random_executed_de(
_r_limit: u64,
slot: Slot,
config: ExecutedDenunciationsConfig,
db: Arc<RwLock<MassaDB>>,
db: ShareableMassaDBController,
) -> ExecutedDenunciations {
let mut executed_de = ExecutedDenunciations::new(config, db);
let mut batch = DBBatch::new();
Expand Down Expand Up @@ -292,7 +290,7 @@ pub fn get_random_executed_de_changes(r_limit: u64) -> ExecutedDenunciationsChan
pub fn get_random_final_state_bootstrap(
pos: PoSFinalState,
config: FinalStateConfig,
db: Arc<RwLock<MassaDB>>,
db: ShareableMassaDBController,
) -> FinalState {
let r_limit: u64 = 50;

Expand Down
6 changes: 2 additions & 4 deletions massa-db/Cargo.toml → massa-db-exports/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,16 +1,14 @@
[package]
name = "massa_db"
name = "massa_db_exports"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
parking_lot = { version = "0.12", features = ["deadlock_detection"] }
rocksdb = "0.20"
displaydoc = "0.2"
thiserror = "1.0"
lsmtree = "=0.1.1"
parking_lot = { version = "0.12", features = ["deadlock_detection"] }

# Custom modules
massa_hash = { path = "../massa-hash" }
Expand Down
File renamed without changes.
Loading

0 comments on commit c1379f2

Please sign in to comment.