From 0153f128d832797cbf3b6fdbeeb86504564dc674 Mon Sep 17 00:00:00 2001 From: Carl Date: Sat, 24 Nov 2018 16:37:20 -0800 Subject: [PATCH] Move db_ledger open from bin/replicator to src/replicator --- src/bin/replicator.rs | 9 +-- src/replicator.rs | 130 ++++++++++++++++++++++-------------------- 2 files changed, 69 insertions(+), 70 deletions(-) diff --git a/src/bin/replicator.rs b/src/bin/replicator.rs index 6a8ba9450106c1..165fa1c89b8227 100644 --- a/src/bin/replicator.rs +++ b/src/bin/replicator.rs @@ -10,7 +10,6 @@ use clap::{App, Arg}; use solana::chacha::{chacha_cbc_encrypt_file, CHACHA_BLOCK_SIZE}; use solana::client::mk_client; use solana::cluster_info::Node; -use solana::db_ledger::DbLedger; use solana::fullnode::Config; use solana::ledger::LEDGER_DATA_FILE; use solana::logger; @@ -24,7 +23,7 @@ use std::net::{Ipv4Addr, SocketAddr}; use std::path::Path; use std::process::exit; use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::{Arc, RwLock}; +use std::sync::Arc; use std::thread::sleep; use std::time::Duration; @@ -95,13 +94,7 @@ fn main() { // TODO: ask network what slice we should store let entry_height = 0; - // Create the RocksDb ledger, eventually will simply repurpose the input - // ledger path as the RocksDb ledger path - let db_ledger = Arc::new(RwLock::new( - DbLedger::open(&ledger_path.unwrap()).expect("Expected to be able to open database ledger"), - )); let (replicator, leader_info) = Replicator::new( - db_ledger, entry_height, 5, &exit, diff --git a/src/replicator.rs b/src/replicator.rs index ad0338fb4c6489..e8c525532cf815 100644 --- a/src/replicator.rs +++ b/src/replicator.rs @@ -72,7 +72,6 @@ pub fn sample_file(in_path: &Path, sample_offsets: &[u64]) -> io::Result { impl Replicator { pub fn new( - db_ledger: Arc>, entry_height: u64, max_entry_height: u64, exit: &Arc, @@ -106,6 +105,17 @@ impl Replicator { let (entry_window_sender, entry_window_receiver) = channel(); // todo: pull blobs off the retransmit_receiver and recycle them? let (retransmit_sender, retransmit_receiver) = channel(); + + // Create the RocksDb ledger, eventually will simply repurpose the input + // ledger path as the RocksDb ledger path once we replace the ledger with + // RocksDb. Note for now, this ledger will not contain any of the existing entries + // in the ledger located at ledger_path, and will only append on newly received + // entries after being passed to window_service + let db_ledger = Arc::new(RwLock::new( + DbLedger::open(&ledger_path.unwrap()) + .expect("Expected to be able to open database ledger"), + )); + let t_window = window_service( db_ledger, cluster_info.clone(), @@ -174,7 +184,6 @@ mod tests { use logger; use replicator::sample_file; use replicator::Replicator; - use rocksdb::{Options, DB}; use signature::{Keypair, KeypairUtil}; use solana_sdk::hash::Hash; use std::fs::File; @@ -183,7 +192,7 @@ mod tests { use std::mem::size_of; use std::path::PathBuf; use std::sync::atomic::{AtomicBool, Ordering}; - use std::sync::{Arc, RwLock}; + use std::sync::Arc; use std::thread::sleep; use std::time::Duration; @@ -208,76 +217,73 @@ mod tests { let (mint, leader_ledger_path) = create_tmp_genesis(leader_ledger_path, 100, leader_info.id, 1); - let leader = Fullnode::new( - leader_node, - &leader_ledger_path, - leader_keypair, - vote_account_keypair, - None, - false, - LeaderScheduler::from_bootstrap_leader(leader_info.id), - None, - ); - - let mut leader_client = mk_client(&leader_info); - - let bob = Keypair::new(); + { + let leader = Fullnode::new( + leader_node, + &leader_ledger_path, + leader_keypair, + vote_account_keypair, + None, + false, + LeaderScheduler::from_bootstrap_leader(leader_info.id), + None, + ); - let last_id = leader_client.get_last_id(); - leader_client - .transfer(1, &mint.keypair(), bob.pubkey(), &last_id) - .unwrap(); + let mut leader_client = mk_client(&leader_info); - let replicator_keypair = Keypair::new(); + let bob = Keypair::new(); - info!("starting replicator node"); - let replicator_node = Node::new_localhost_with_pubkey(replicator_keypair.pubkey()); - let db_ledger_path = get_tmp_ledger_path("test_replicator_startup"); - let db_ledger = Arc::new(RwLock::new( - DbLedger::open(&db_ledger_path).expect("Expected to be able to open database ledger"), - )); - let (replicator, _leader_info) = Replicator::new( - db_ledger, - entry_height, - 1, - &exit, - Some(replicator_ledger_path), - replicator_node, - Some(network_addr), - done.clone(), - ); + let last_id = leader_client.get_last_id(); + leader_client + .transfer(1, &mint.keypair(), bob.pubkey(), &last_id) + .unwrap(); - let mut num_entries = 0; - for _ in 0..60 { - match read_ledger(replicator_ledger_path, true) { - Ok(entries) => { - for _ in entries { - num_entries += 1; + let replicator_keypair = Keypair::new(); + + info!("starting replicator node"); + let replicator_node = Node::new_localhost_with_pubkey(replicator_keypair.pubkey()); + let (replicator, _leader_info) = Replicator::new( + entry_height, + 1, + &exit, + Some(replicator_ledger_path), + replicator_node, + Some(network_addr), + done.clone(), + ); + + let mut num_entries = 0; + for _ in 0..60 { + match read_ledger(replicator_ledger_path, true) { + Ok(entries) => { + for _ in entries { + num_entries += 1; + } + info!("{} entries", num_entries); + if num_entries > 0 { + break; + } } - info!("{} entries", num_entries); - if num_entries > 0 { - break; + Err(e) => { + info!("error reading ledger: {:?}", e); } } - Err(e) => { - info!("error reading ledger: {:?}", e); - } + sleep(Duration::from_millis(300)); + let last_id = leader_client.get_last_id(); + leader_client + .transfer(1, &mint.keypair(), bob.pubkey(), &last_id) + .unwrap(); } - sleep(Duration::from_millis(300)); - let last_id = leader_client.get_last_id(); - leader_client - .transfer(1, &mint.keypair(), bob.pubkey(), &last_id) - .unwrap(); + assert_eq!(done.load(Ordering::Relaxed), true); + assert!(num_entries > 0); + exit.store(true, Ordering::Relaxed); + replicator.join(); + leader.exit(); } - assert_eq!(done.load(Ordering::Relaxed), true); - assert!(num_entries > 0); - exit.store(true, Ordering::Relaxed); - replicator.join(); - leader.exit(); - DB::destroy(&Options::default(), &db_ledger_path) + DbLedger::destroy(&leader_ledger_path).expect("Expected successful database destuction"); + DbLedger::destroy(&replicator_ledger_path) .expect("Expected successful database destuction"); - let _ignored = remove_dir_all(&db_ledger_path); let _ignored = remove_dir_all(&leader_ledger_path); let _ignored = remove_dir_all(&replicator_ledger_path); }