Skip to content

Commit

Permalink
Move db_ledger open from bin/replicator to src/replicator
Browse files Browse the repository at this point in the history
  • Loading branch information
carllin committed Nov 25, 2018
1 parent bf35019 commit 0153f12
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 70 deletions.
9 changes: 1 addition & 8 deletions src/bin/replicator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ use clap::{App, Arg};
use solana::chacha::{chacha_cbc_encrypt_file, CHACHA_BLOCK_SIZE};
use solana::client::mk_client;
use solana::cluster_info::Node;
use solana::db_ledger::DbLedger;
use solana::fullnode::Config;
use solana::ledger::LEDGER_DATA_FILE;
use solana::logger;
Expand All @@ -24,7 +23,7 @@ use std::net::{Ipv4Addr, SocketAddr};
use std::path::Path;
use std::process::exit;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, RwLock};
use std::sync::Arc;
use std::thread::sleep;
use std::time::Duration;

Expand Down Expand Up @@ -95,13 +94,7 @@ fn main() {
// TODO: ask network what slice we should store
let entry_height = 0;

// Create the RocksDb ledger, eventually will simply repurpose the input
// ledger path as the RocksDb ledger path
let db_ledger = Arc::new(RwLock::new(
DbLedger::open(&ledger_path.unwrap()).expect("Expected to be able to open database ledger"),
));
let (replicator, leader_info) = Replicator::new(
db_ledger,
entry_height,
5,
&exit,
Expand Down
130 changes: 68 additions & 62 deletions src/replicator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,6 @@ pub fn sample_file(in_path: &Path, sample_offsets: &[u64]) -> io::Result<Hash> {

impl Replicator {
pub fn new(
db_ledger: Arc<RwLock<DbLedger>>,
entry_height: u64,
max_entry_height: u64,
exit: &Arc<AtomicBool>,
Expand Down Expand Up @@ -106,6 +105,17 @@ impl Replicator {
let (entry_window_sender, entry_window_receiver) = channel();
// todo: pull blobs off the retransmit_receiver and recycle them?
let (retransmit_sender, retransmit_receiver) = channel();

// Create the RocksDb ledger, eventually will simply repurpose the input
// ledger path as the RocksDb ledger path once we replace the ledger with
// RocksDb. Note for now, this ledger will not contain any of the existing entries
// in the ledger located at ledger_path, and will only append on newly received
// entries after being passed to window_service
let db_ledger = Arc::new(RwLock::new(
DbLedger::open(&ledger_path.unwrap())
.expect("Expected to be able to open database ledger"),
));

let t_window = window_service(
db_ledger,
cluster_info.clone(),
Expand Down Expand Up @@ -174,7 +184,6 @@ mod tests {
use logger;
use replicator::sample_file;
use replicator::Replicator;
use rocksdb::{Options, DB};
use signature::{Keypair, KeypairUtil};
use solana_sdk::hash::Hash;
use std::fs::File;
Expand All @@ -183,7 +192,7 @@ mod tests {
use std::mem::size_of;
use std::path::PathBuf;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, RwLock};
use std::sync::Arc;
use std::thread::sleep;
use std::time::Duration;

Expand All @@ -208,76 +217,73 @@ mod tests {
let (mint, leader_ledger_path) =
create_tmp_genesis(leader_ledger_path, 100, leader_info.id, 1);

let leader = Fullnode::new(
leader_node,
&leader_ledger_path,
leader_keypair,
vote_account_keypair,
None,
false,
LeaderScheduler::from_bootstrap_leader(leader_info.id),
None,
);

let mut leader_client = mk_client(&leader_info);

let bob = Keypair::new();
{
let leader = Fullnode::new(
leader_node,
&leader_ledger_path,
leader_keypair,
vote_account_keypair,
None,
false,
LeaderScheduler::from_bootstrap_leader(leader_info.id),
None,
);

let last_id = leader_client.get_last_id();
leader_client
.transfer(1, &mint.keypair(), bob.pubkey(), &last_id)
.unwrap();
let mut leader_client = mk_client(&leader_info);

let replicator_keypair = Keypair::new();
let bob = Keypair::new();

info!("starting replicator node");
let replicator_node = Node::new_localhost_with_pubkey(replicator_keypair.pubkey());
let db_ledger_path = get_tmp_ledger_path("test_replicator_startup");
let db_ledger = Arc::new(RwLock::new(
DbLedger::open(&db_ledger_path).expect("Expected to be able to open database ledger"),
));
let (replicator, _leader_info) = Replicator::new(
db_ledger,
entry_height,
1,
&exit,
Some(replicator_ledger_path),
replicator_node,
Some(network_addr),
done.clone(),
);
let last_id = leader_client.get_last_id();
leader_client
.transfer(1, &mint.keypair(), bob.pubkey(), &last_id)
.unwrap();

let mut num_entries = 0;
for _ in 0..60 {
match read_ledger(replicator_ledger_path, true) {
Ok(entries) => {
for _ in entries {
num_entries += 1;
let replicator_keypair = Keypair::new();

info!("starting replicator node");
let replicator_node = Node::new_localhost_with_pubkey(replicator_keypair.pubkey());
let (replicator, _leader_info) = Replicator::new(
entry_height,
1,
&exit,
Some(replicator_ledger_path),
replicator_node,
Some(network_addr),
done.clone(),
);

let mut num_entries = 0;
for _ in 0..60 {
match read_ledger(replicator_ledger_path, true) {
Ok(entries) => {
for _ in entries {
num_entries += 1;
}
info!("{} entries", num_entries);
if num_entries > 0 {
break;
}
}
info!("{} entries", num_entries);
if num_entries > 0 {
break;
Err(e) => {
info!("error reading ledger: {:?}", e);
}
}
Err(e) => {
info!("error reading ledger: {:?}", e);
}
sleep(Duration::from_millis(300));
let last_id = leader_client.get_last_id();
leader_client
.transfer(1, &mint.keypair(), bob.pubkey(), &last_id)
.unwrap();
}
sleep(Duration::from_millis(300));
let last_id = leader_client.get_last_id();
leader_client
.transfer(1, &mint.keypair(), bob.pubkey(), &last_id)
.unwrap();
assert_eq!(done.load(Ordering::Relaxed), true);
assert!(num_entries > 0);
exit.store(true, Ordering::Relaxed);
replicator.join();
leader.exit();
}
assert_eq!(done.load(Ordering::Relaxed), true);
assert!(num_entries > 0);
exit.store(true, Ordering::Relaxed);
replicator.join();
leader.exit();

DB::destroy(&Options::default(), &db_ledger_path)
DbLedger::destroy(&leader_ledger_path).expect("Expected successful database destuction");
DbLedger::destroy(&replicator_ledger_path)
.expect("Expected successful database destuction");
let _ignored = remove_dir_all(&db_ledger_path);
let _ignored = remove_dir_all(&leader_ledger_path);
let _ignored = remove_dir_all(&replicator_ledger_path);
}
Expand Down

0 comments on commit 0153f12

Please sign in to comment.