Skip to content

Commit

Permalink
Encapsulate RocksDb destroy, move opening of RocksDb ledger into full…
Browse files Browse the repository at this point in the history
…node, clean up tests
  • Loading branch information
carllin committed Nov 24, 2018
1 parent 29a9ef4 commit 6116063
Show file tree
Hide file tree
Showing 8 changed files with 128 additions and 157 deletions.
16 changes: 0 additions & 16 deletions src/bin/fullnode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,13 @@ extern crate log;
extern crate serde_json;
#[macro_use]
extern crate solana;
extern crate rocksdb;
extern crate solana_metrics;

use clap::{App, Arg};
use rocksdb::{Options, DB};
use solana::client::mk_client;
use solana::cluster_info::{Node, FULLNODE_PORT_RANGE};
use solana::db_ledger::{write_entries_to_ledger, DB_LEDGER_DIRECTORY};
use solana::fullnode::{Config, Fullnode, FullnodeReturnType};
use solana::leader_scheduler::LeaderScheduler;
use solana::ledger::read_ledger;
use solana::logger;
use solana::netutil::find_available_port_in_range;
use solana::signature::{Keypair, KeypairUtil};
Expand Down Expand Up @@ -96,18 +92,6 @@ fn main() {

let ledger_path = matches.value_of("ledger").unwrap();

// Create the RocksDb ledger, eventually will simply repurpose the input
// ledger path as the RocksDb ledger path
let db_ledger_path = format!("{}/{}", ledger_path, DB_LEDGER_DIRECTORY);
// Destroy old ledger
DB::destroy(&Options::default(), &db_ledger_path)
.expect("Expected successful database destruction");
let ledger_entries: Vec<_> = read_ledger(ledger_path, true)
.expect("opening ledger")
.map(|entry| entry.unwrap())
.collect();
write_entries_to_ledger(&[db_ledger_path], &ledger_entries[..]);

// socketaddr that is initial pointer into the network's gossip (ncp)
let network = matches
.value_of("network")
Expand Down
5 changes: 2 additions & 3 deletions src/bin/replicator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use clap::{App, Arg};
use solana::chacha::{chacha_cbc_encrypt_file, CHACHA_BLOCK_SIZE};
use solana::client::mk_client;
use solana::cluster_info::Node;
use solana::db_ledger::{DbLedger, DB_LEDGER_DIRECTORY};
use solana::db_ledger::DbLedger;
use solana::fullnode::Config;
use solana::ledger::LEDGER_DATA_FILE;
use solana::logger;
Expand Down Expand Up @@ -97,9 +97,8 @@ fn main() {

// Create the RocksDb ledger, eventually will simply repurpose the input
// ledger path as the RocksDb ledger path
let db_ledger_path = format!("{}/{}", ledger_path.unwrap(), DB_LEDGER_DIRECTORY);
let db_ledger = Arc::new(RwLock::new(
DbLedger::open(&db_ledger_path).expect("Expected to be able to open database ledger"),
DbLedger::open(&ledger_path.unwrap()).expect("Expected to be able to open database ledger"),
));
let (replicator, leader_info) = Replicator::new(
db_ledger,
Expand Down
39 changes: 22 additions & 17 deletions src/db_ledger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,8 @@ pub const ERASURE_CF: &str = "erasure";
impl DbLedger {
// Opens a Ledger in directory, provides "infinite" window of blobs
pub fn open(ledger_path: &str) -> Result<Self> {
let ledger_path = format!("{}/{}", ledger_path, DB_LEDGER_DIRECTORY);

// Use default database options
let mut options = Options::default();
options.create_if_missing(true);
Expand Down Expand Up @@ -262,6 +264,12 @@ impl DbLedger {
})
}

pub fn destroy(ledger_path: &str) -> Result<()> {
let ledger_path = format!("{}/{}", ledger_path, DB_LEDGER_DIRECTORY);
DB::destroy(&Options::default(), &ledger_path)?;
Ok(())
}

pub fn write_shared_blobs<I>(&mut self, slot: u64, shared_blobs: I) -> Result<()>
where
I: IntoIterator,
Expand Down Expand Up @@ -289,13 +297,14 @@ impl DbLedger {
Ok(())
}

pub fn write_entries<'a, I>(&mut self, slot: u64, entries: I) -> Result<()>
pub fn write_entries<I>(&mut self, slot: u64, entries: I) -> Result<()>
where
I: IntoIterator<Item = &'a Entry>,
I: IntoIterator,
I::Item: Borrow<Entry>,
{
let default_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0);
let shared_blobs = entries.into_iter().enumerate().map(|(idx, entry)| {
entry.to_blob(
entry.borrow().to_blob(
Some(idx as u64),
Some(Pubkey::default()),
Some(&default_addr),
Expand Down Expand Up @@ -439,15 +448,17 @@ impl DbLedger {
}
}

pub fn write_entries_to_ledger<'a, I>(ledger_paths: &[String], entries: I)
pub fn write_entries_to_ledger<I>(ledger_paths: &[&str], entries: I)
where
I: IntoIterator<Item = &'a Entry> + Copy,
I: IntoIterator,
I::Item: Borrow<Entry>,
{
let mut entries = entries.into_iter();
for ledger_path in ledger_paths {
let mut db_ledger =
DbLedger::open(ledger_path).expect("Expected to be able to open database ledger");
db_ledger
.write_entries(DEFAULT_SLOT_HEIGHT, entries)
.write_entries(DEFAULT_SLOT_HEIGHT, entries.by_ref())
.expect("Expected successful write of genesis entries");
}
}
Expand All @@ -456,7 +467,6 @@ where
mod tests {
use super::*;
use ledger::{get_tmp_ledger_path, make_tiny_test_entries, Block};
use rocksdb::{Options, DB};

#[test]
fn test_put_get_simple() {
Expand Down Expand Up @@ -506,8 +516,7 @@ mod tests {

// Destroying database without closing it first is undefined behavior
drop(ledger);
DB::destroy(&Options::default(), &ledger_path)
.expect("Expected successful database destruction");
DbLedger::destroy(&ledger_path).expect("Expected successful database destruction");
}

#[test]
Expand Down Expand Up @@ -569,8 +578,7 @@ mod tests {

// Destroying database without closing it first is undefined behavior
drop(ledger);
DB::destroy(&Options::default(), &ledger_path)
.expect("Expected successful database destruction");
DbLedger::destroy(&ledger_path).expect("Expected successful database destruction");
}

#[test]
Expand Down Expand Up @@ -612,8 +620,7 @@ mod tests {

// Destroying database without closing it first is undefined behavior
drop(ledger);
DB::destroy(&Options::default(), &ledger_path)
.expect("Expected successful database destruction");
DbLedger::destroy(&ledger_path).expect("Expected successful database destruction");
}

#[test]
Expand Down Expand Up @@ -649,8 +656,7 @@ mod tests {

// Destroying database without closing it first is undefined behavior
drop(ledger);
DB::destroy(&Options::default(), &ledger_path)
.expect("Expected successful database destruction");
DbLedger::destroy(&ledger_path).expect("Expected successful database destruction");
}

#[test]
Expand Down Expand Up @@ -689,7 +695,6 @@ mod tests {
db_iterator.next();
}
}
DB::destroy(&Options::default(), &db_ledger_path)
.expect("Expected successful database destruction");
DbLedger::destroy(&db_ledger_path).expect("Expected successful database destruction");
}
}
1 change: 1 addition & 0 deletions src/db_window.rs
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,7 @@ pub fn process_blob(
// Check if the blob is in the range of our known leaders. If not, we return.
// TODO: Need to update slot in broadcast, otherwise this check will fail with
// leader rotation enabled
// Github issue: https://github.com/solana-labs/solana/issues/1899.
let slot = blob.read().unwrap().slot()?;
let leader = leader_scheduler.get_leader_for_slot(slot);

Expand Down
118 changes: 73 additions & 45 deletions src/fullnode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
use bank::Bank;
use broadcast_stage::BroadcastStage;
use cluster_info::{ClusterInfo, Node, NodeInfo};
use db_ledger::DB_LEDGER_DIRECTORY;
use db_ledger::{write_entries_to_ledger, DbLedger};
use leader_scheduler::LeaderScheduler;
use ledger::read_ledger;
use ncp::Ncp;
Expand Down Expand Up @@ -107,6 +107,7 @@ pub struct Fullnode {
broadcast_socket: UdpSocket,
rpc_addr: SocketAddr,
rpc_pubsub_addr: SocketAddr,
db_ledger: Arc<RwLock<DbLedger>>,
}

#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
Expand Down Expand Up @@ -259,6 +260,10 @@ impl Fullnode {
.expect("Leader not known after processing bank");

cluster_info.write().unwrap().set_leader(scheduled_leader);

// Create the RocksDb ledger
let db_ledger = Self::make_db_ledger(ledger_path);

let node_role = if scheduled_leader != keypair.pubkey() {
// Start in validator mode.
let tvu = Tvu::new(
Expand All @@ -282,7 +287,7 @@ impl Fullnode {
.try_clone()
.expect("Failed to clone retransmit socket"),
Some(ledger_path),
&format!("{}/{}", ledger_path, DB_LEDGER_DIRECTORY),
db_ledger.clone(),
);
let tpu_forwarder = TpuForwarder::new(
node.sockets
Expand Down Expand Up @@ -353,6 +358,7 @@ impl Fullnode {
broadcast_socket: node.sockets.broadcast,
rpc_addr,
rpc_pubsub_addr,
db_ledger,
}
}

Expand Down Expand Up @@ -435,7 +441,7 @@ impl Fullnode {
.try_clone()
.expect("Failed to clone retransmit socket"),
Some(&self.ledger_path),
&format!("{}/{}", self.ledger_path, DB_LEDGER_DIRECTORY),
self.db_ledger.clone(),
);
let tpu_forwarder = TpuForwarder::new(
self.transaction_sockets
Expand Down Expand Up @@ -590,6 +596,19 @@ impl Fullnode {
),
)
}

fn make_db_ledger(ledger_path: &str) -> Arc<RwLock<DbLedger>> {
// Destroy any existing instances of the RocksDb ledger
DbLedger::destroy(&ledger_path).expect("Expected successful database destruction");
let ledger_entries = read_ledger(ledger_path, true)
.expect("opening ledger")
.map(|entry| entry.unwrap());

write_entries_to_ledger(&[ledger_path], ledger_entries);
let db =
DbLedger::open(ledger_path).expect("Expected to successfully open database ledger");
Arc::new(RwLock::new(db))
}
}

impl Service for Fullnode {
Expand Down Expand Up @@ -630,9 +649,8 @@ mod tests {
use db_ledger::*;
use fullnode::{Fullnode, FullnodeReturnType, NodeRole, TvuReturnType};
use leader_scheduler::{make_active_set_entries, LeaderScheduler, LeaderSchedulerConfig};
use ledger::{create_tmp_genesis, create_tmp_sample_ledger, LedgerWriter};
use ledger::{create_tmp_genesis, create_tmp_sample_ledger, tmp_copy_ledger, LedgerWriter};
use packet::make_consecutive_blobs;
use rocksdb::{Options, DB};
use service::Service;
use signature::{Keypair, KeypairUtil};
use std::cmp;
Expand Down Expand Up @@ -842,6 +860,13 @@ mod tests {
+ num_ending_ticks as u64;
ledger_writer.write_entries(&active_set_entries).unwrap();

let validator_ledger_path =
tmp_copy_ledger(&bootstrap_leader_ledger_path, "test_wrong_role_transition");
let ledger_paths = vec![
bootstrap_leader_ledger_path.clone(),
validator_ledger_path.clone(),
];

// Create the common leader scheduling configuration
let num_slots_per_epoch = 3;
let leader_rotation_interval = 5;
Expand All @@ -858,45 +883,53 @@ mod tests {
Some(genesis_tick_height),
);

// Test that a node knows to transition to a validator based on parsing the ledger
let leader_vote_account_keypair = Arc::new(Keypair::new());
let bootstrap_leader = Fullnode::new(
bootstrap_leader_node,
&bootstrap_leader_ledger_path,
bootstrap_leader_keypair,
leader_vote_account_keypair,
Some(bootstrap_leader_info.ncp),
false,
LeaderScheduler::new(&leader_scheduler_config),
None,
);
{
// Test that a node knows to transition to a validator based on parsing the ledger
let leader_vote_account_keypair = Arc::new(Keypair::new());
let bootstrap_leader = Fullnode::new(
bootstrap_leader_node,
&bootstrap_leader_ledger_path,
bootstrap_leader_keypair,
leader_vote_account_keypair,
Some(bootstrap_leader_info.ncp),
false,
LeaderScheduler::new(&leader_scheduler_config),
None,
);

match bootstrap_leader.node_role {
Some(NodeRole::Validator(_)) => (),
_ => {
panic!("Expected bootstrap leader to be a validator");
match bootstrap_leader.node_role {
Some(NodeRole::Validator(_)) => (),
_ => {
panic!("Expected bootstrap leader to be a validator");
}
}
}

// Test that a node knows to transition to a leader based on parsing the ledger
let validator = Fullnode::new(
validator_node,
&bootstrap_leader_ledger_path,
Arc::new(validator_keypair),
Arc::new(validator_vote_account_keypair),
Some(bootstrap_leader_info.ncp),
false,
LeaderScheduler::new(&leader_scheduler_config),
None,
);
// Test that a node knows to transition to a leader based on parsing the ledger
let validator = Fullnode::new(
validator_node,
&validator_ledger_path,
Arc::new(validator_keypair),
Arc::new(validator_vote_account_keypair),
Some(bootstrap_leader_info.ncp),
false,
LeaderScheduler::new(&leader_scheduler_config),
None,
);

match validator.node_role {
Some(NodeRole::Leader(_)) => (),
_ => {
panic!("Expected node to be the leader");
match validator.node_role {
Some(NodeRole::Leader(_)) => (),
_ => {
panic!("Expected node to be the leader");
}
}

validator.close().expect("Expected node to close");
bootstrap_leader.close().expect("Expected node to close");
}
for path in ledger_paths {
DbLedger::destroy(&path).expect("Expected successful database destruction");
let _ignored = remove_dir_all(&path);
}
let _ignored = remove_dir_all(&bootstrap_leader_ledger_path);
}

#[test]
Expand All @@ -909,7 +942,7 @@ mod tests {

// Create validator identity
let num_ending_ticks = 1;
let (mint, validator_ledger_path, mut genesis_entries) = create_tmp_sample_ledger(
let (mint, validator_ledger_path, genesis_entries) = create_tmp_sample_ledger(
"test_validator_to_leader_transition",
10_000,
num_ending_ticks,
Expand Down Expand Up @@ -946,11 +979,6 @@ mod tests {
ledger_writer.write_entries(&active_set_entries).unwrap();
let ledger_initial_len = genesis_entries.len() as u64 + active_set_entries_len;

// Create RocksDb ledger, write genesis entries to it
let db_ledger_path = format!("{}/{}", validator_ledger_path, DB_LEDGER_DIRECTORY);
genesis_entries.extend(active_set_entries);
write_entries_to_ledger(&vec![db_ledger_path.clone()], &genesis_entries);

// Set the leader scheduler for the validator
let leader_rotation_interval = 10;
let num_bootstrap_slots = 2;
Expand Down Expand Up @@ -1043,7 +1071,7 @@ mod tests {
// Shut down
t_responder.join().expect("responder thread join");
validator.close().unwrap();
DB::destroy(&Options::default(), &db_ledger_path)
DbLedger::destroy(&validator_ledger_path)
.expect("Expected successful database destruction");
let _ignored = remove_dir_all(&validator_ledger_path).unwrap();
}
Expand Down
Loading

0 comments on commit 6116063

Please sign in to comment.