Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rocks db window service #1888

Merged
merged 11 commits into from
Nov 25, 2018
76 changes: 51 additions & 25 deletions src/db_ledger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,15 @@
use bincode::{deserialize, serialize};
use byteorder::{BigEndian, ByteOrder, ReadBytesExt};
use entry::Entry;
use ledger::Block;
use packet::{Blob, SharedBlob, BLOB_HEADER_SIZE};
use result::{Error, Result};
use rocksdb::{ColumnFamily, Options, WriteBatch, DB};
use serde::de::DeserializeOwned;
use serde::Serialize;
use solana_sdk::pubkey::Pubkey;
use std::borrow::Borrow;
use std::io;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};

pub const DB_LEDGER_DIRECTORY: &str = "db_ledger";

Expand Down Expand Up @@ -232,6 +234,8 @@ pub const ERASURE_CF: &str = "erasure";
impl DbLedger {
// Opens a Ledger in directory, provides "infinite" window of blobs
pub fn open(ledger_path: &str) -> Result<Self> {
let ledger_path = format!("{}/{}", ledger_path, DB_LEDGER_DIRECTORY);

// Use default database options
let mut options = Options::default();
options.create_if_missing(true);
Expand Down Expand Up @@ -260,10 +264,25 @@ impl DbLedger {
})
}

pub fn write_shared_blobs(&mut self, slot: u64, shared_blobs: &[SharedBlob]) -> Result<()> {
let blob_locks: Vec<_> = shared_blobs.iter().map(|b| b.read().unwrap()).collect();
let blobs: Vec<&Blob> = blob_locks.iter().map(|b| &**b).collect();
self.write_blobs(slot, &blobs)
pub fn destroy(ledger_path: &str) -> Result<()> {
let ledger_path = format!("{}/{}", ledger_path, DB_LEDGER_DIRECTORY);
DB::destroy(&Options::default(), &ledger_path)?;
Ok(())
}

pub fn write_shared_blobs<I>(&mut self, slot: u64, shared_blobs: I) -> Result<()>
where
I: IntoIterator,
I::Item: Borrow<SharedBlob>,
{
for b in shared_blobs {
let bl = b.borrow().read().unwrap();
let index = bl.index()?;
let key = DataCf::key(slot, index);
self.insert_data_blob(&key, &*bl)?;
}

Ok(())
}

pub fn write_blobs<'a, I>(&mut self, slot: u64, blobs: I) -> Result<()>
Expand All @@ -278,12 +297,20 @@ impl DbLedger {
Ok(())
}

pub fn write_entries(&mut self, slot: u64, entries: &[Entry]) -> Result<()> {
let shared_blobs = entries.to_blobs();
let blob_locks: Vec<_> = shared_blobs.iter().map(|b| b.read().unwrap()).collect();
let blobs: Vec<&Blob> = blob_locks.iter().map(|b| &**b).collect();
self.write_blobs(slot, &blobs)?;
Ok(())
pub fn write_entries<I>(&mut self, slot: u64, entries: I) -> Result<()>
where
I: IntoIterator,
I::Item: Borrow<Entry>,
{
let default_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0);
let shared_blobs = entries.into_iter().enumerate().map(|(idx, entry)| {
entry.borrow().to_blob(
Some(idx as u64),
Some(Pubkey::default()),
Some(&default_addr),
)
});
self.write_shared_blobs(slot, shared_blobs)
}

pub fn insert_data_blob(&self, key: &[u8], new_blob: &Blob) -> Result<Vec<Entry>> {
Expand Down Expand Up @@ -421,12 +448,17 @@ impl DbLedger {
}
}

pub fn write_entries_to_ledger(ledger_paths: &[String], entries: &[Entry]) {
pub fn write_entries_to_ledger<I>(ledger_paths: &[&str], entries: I)
where
I: IntoIterator,
I::Item: Borrow<Entry>,
{
let mut entries = entries.into_iter();
for ledger_path in ledger_paths {
let mut db_ledger =
DbLedger::open(ledger_path).expect("Expected to be able to open database ledger");
db_ledger
.write_entries(DEFAULT_SLOT_HEIGHT, &entries)
.write_entries(DEFAULT_SLOT_HEIGHT, entries.by_ref())
.expect("Expected successful write of genesis entries");
}
}
Expand All @@ -435,7 +467,6 @@ pub fn write_entries_to_ledger(ledger_paths: &[String], entries: &[Entry]) {
mod tests {
use super::*;
use ledger::{get_tmp_ledger_path, make_tiny_test_entries, Block};
use rocksdb::{Options, DB};

#[test]
fn test_put_get_simple() {
Expand Down Expand Up @@ -485,8 +516,7 @@ mod tests {

// Destroying database without closing it first is undefined behavior
drop(ledger);
DB::destroy(&Options::default(), &ledger_path)
.expect("Expected successful database destruction");
DbLedger::destroy(&ledger_path).expect("Expected successful database destruction");
}

#[test]
Expand Down Expand Up @@ -548,8 +578,7 @@ mod tests {

// Destroying database without closing it first is undefined behavior
drop(ledger);
DB::destroy(&Options::default(), &ledger_path)
.expect("Expected successful database destruction");
DbLedger::destroy(&ledger_path).expect("Expected successful database destruction");
}

#[test]
Expand Down Expand Up @@ -591,8 +620,7 @@ mod tests {

// Destroying database without closing it first is undefined behavior
drop(ledger);
DB::destroy(&Options::default(), &ledger_path)
.expect("Expected successful database destruction");
DbLedger::destroy(&ledger_path).expect("Expected successful database destruction");
}

#[test]
Expand Down Expand Up @@ -628,8 +656,7 @@ mod tests {

// Destroying database without closing it first is undefined behavior
drop(ledger);
DB::destroy(&Options::default(), &ledger_path)
.expect("Expected successful database destruction");
DbLedger::destroy(&ledger_path).expect("Expected successful database destruction");
}

#[test]
Expand All @@ -644,7 +671,7 @@ mod tests {
let num_entries = 8;
let shared_blobs = make_tiny_test_entries(num_entries).to_blobs();

for (b, i) in shared_blobs.iter().zip(0..num_entries) {
for (i, b) in shared_blobs.iter().enumerate() {
b.write().unwrap().set_index(1 << (i * 8)).unwrap();
}

Expand All @@ -668,7 +695,6 @@ mod tests {
db_iterator.next();
}
}
DB::destroy(&Options::default(), &db_ledger_path)
.expect("Expected successful database destruction");
DbLedger::destroy(&db_ledger_path).expect("Expected successful database destruction");
}
}
17 changes: 10 additions & 7 deletions src/db_window.rs
Original file line number Diff line number Diff line change
Expand Up @@ -218,9 +218,10 @@ pub fn retransmit_all_leader_blocks(
for b in dq {
// Check if the blob is from the scheduled leader for its slot. If so,
// add to the retransmit_queue
let slot = b.read().unwrap().slot()?;
if let Some(leader_id) = leader_scheduler.get_leader_for_slot(slot) {
add_blob_to_retransmit_queue(b, leader_id, &mut retransmit_queue);
if let Ok(slot) = b.read().unwrap().slot() {
if let Some(leader_id) = leader_scheduler.get_leader_for_slot(slot) {
add_blob_to_retransmit_queue(b, leader_id, &mut retransmit_queue);
}
}
}

Expand Down Expand Up @@ -273,6 +274,9 @@ pub fn process_blob(
let is_coding = blob.read().unwrap().is_coding();

// Check if the blob is in the range of our known leaders. If not, we return.
// TODO: Need to update slot in broadcast, otherwise this check will fail with
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how about a github issue instead?

// leader rotation enabled
// Github issue: https://github.com/solana-labs/solana/issues/1899.
let slot = blob.read().unwrap().slot()?;
let leader = leader_scheduler.get_leader_for_slot(slot);

Expand All @@ -292,12 +296,11 @@ pub fn process_blob(
)?;
vec![]
} else {
let data_key = ErasureCf::key(slot, pix);
let data_key = DataCf::key(slot, pix);
db_ledger.insert_data_blob(&data_key, &blob.read().unwrap())?
};

// TODO: Once erasure is fixed, readd that logic here

for entry in &consumed_entries {
*tick_height += entry.is_tick() as u64;
}
Expand Down Expand Up @@ -529,8 +532,8 @@ mod test {
assert!(gap > 3);
let num_entries = 10;
let shared_blobs = make_tiny_test_entries(num_entries).to_blobs();
for (b, i) in shared_blobs.iter().zip(0..shared_blobs.len() as u64) {
b.write().unwrap().set_index(i * gap).unwrap();
for (i, b) in shared_blobs.iter().enumerate() {
b.write().unwrap().set_index(i as u64 * gap).unwrap();
}
let blob_locks: Vec<_> = shared_blobs.iter().map(|b| b.read().unwrap()).collect();
let blobs: Vec<&Blob> = blob_locks.iter().map(|b| &**b).collect();
Expand Down
Loading