Fixes before 0.6.4 tag (#64)
* A jumble of performance-improving fixes

* Revising test plan a bit

* clippy+cleanup

* Bump changelog
plauche authored May 15, 2023 · 1 parent 724e999 · commit 67011ec
Showing 12 changed files with 367 additions and 162 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
@@ -16,6 +16,10 @@
- Updated default block size to 256 bytes and generally improved handling of larger files in storage layer.
- Archived unused `car-utility` and `cpp-transmit-example` projects.
- Converted all crate dependencies to workspace dependencies, tightened version specificity, narrowed features when possible.
- Increased default MTU to 512 bytes to accommodate more realistic systems.
- Moved functions for fetching all DAG CIDs and blocks into the storage layer.
- Increased default file block size to 100 KiB for better performance when importing larger files.
- Made a small revision to the testing plan.

## [0.6.3] - 2023-05-04

2 changes: 1 addition & 1 deletion controller/src/main.rs
@@ -18,7 +18,7 @@ pub struct Cli {

impl Cli {
    pub async fn run(&self) -> Result<()> {
-       let transport = UdpTransport::new(&self.bind_address, 60)?;
+       let transport = UdpTransport::new(&self.bind_address, 512)?;

        let command = Message::ApplicationAPI(self.command.clone());
        let cmd_str = serde_json::to_string(&command)?;
143 changes: 143 additions & 0 deletions local-storage/src/provider.rs
@@ -14,6 +14,15 @@ pub trait StorageProvider {
    fn get_links_by_cid(&self, cid: &str) -> Result<Vec<String>>;
    fn list_available_dags(&self) -> Result<Vec<String>>;
    fn get_missing_cid_blocks(&self, cid: &str) -> Result<Vec<String>>;
    fn get_dag_blocks_by_window(
        &self,
        cid: &str,
        offset: u32,
        window_size: u32,
    ) -> Result<Vec<StoredBlock>>;
    fn get_all_dag_cids(&self, cid: &str) -> Result<Vec<String>>;
    fn get_all_dag_blocks(&self, cid: &str) -> Result<Vec<StoredBlock>>;
    fn get_all_blocks_under_cid(&self, cid: &str) -> Result<Vec<StoredBlock>>;
}

pub struct SqliteStorageProvider {
@@ -201,6 +210,140 @@ impl StorageProvider for SqliteStorageProvider {
            .collect();
        Ok(cids)
    }

    fn get_dag_blocks_by_window(
        &self,
        cid: &str,
        offset: u32,
        window_size: u32,
    ) -> Result<Vec<StoredBlock>> {
        let blocks: Vec<StoredBlock> = self
            .conn
            .prepare(
                "
                WITH RECURSIVE cids(x,y) AS (
                    SELECT cid,data FROM blocks WHERE cid = (?1)
                    UNION
                    SELECT cid,data FROM blocks b
                    INNER JOIN links l ON b.cid==l.block_cid
                    INNER JOIN cids ON (root_cid=x)
                )
                SELECT x,y FROM cids
                LIMIT (?2) OFFSET (?3);
                ",
            )?
            .query_map(
                [cid, &format!("{window_size}"), &format!("{offset}")],
                |row| {
                    let cid_str: String = row.get(0)?;
                    let data: Vec<u8> = row.get(1)?;
                    let links = match self.get_links_by_cid(&cid_str) {
                        Ok(links) => links,
                        Err(_) => vec![],
                    };
                    Ok(StoredBlock {
                        cid: cid_str,
                        data,
                        links,
                    })
                },
            )?
            .filter_map(|b| b.ok())
            .collect();

        Ok(blocks)
    }

    fn get_all_dag_cids(&self, cid: &str) -> Result<Vec<String>> {
        let cids: Vec<String> = self
            .conn
            .prepare(
                "
                WITH RECURSIVE cids(x) AS (
                    VALUES(?1)
                    UNION
                    SELECT block_cid FROM links JOIN cids ON root_cid=x
                )
                SELECT x FROM cids;
                ",
            )?
            .query_map([cid], |row| {
                let cid_str: String = row.get(0)?;
                Ok(cid_str)
            })?
            .filter_map(|b| b.ok())
            .collect();

        Ok(cids)
    }

    fn get_all_dag_blocks(&self, cid: &str) -> Result<Vec<StoredBlock>> {
        let blocks: Vec<StoredBlock> = self
            .conn
            .prepare(
                "
                WITH RECURSIVE cids(x,y) AS (
                    SELECT cid,data FROM blocks WHERE cid = (?1)
                    UNION
                    SELECT cid,data FROM blocks b
                    INNER JOIN links l ON b.cid==l.block_cid
                    INNER JOIN cids ON (root_cid=x)
                )
                SELECT x,y FROM cids
                ",
            )?
            .query_map([cid], |row| {
                let cid_str: String = row.get(0)?;
                let data: Vec<u8> = row.get(1)?;
                let links = match self.get_links_by_cid(&cid_str) {
                    Ok(links) => links,
                    Err(_) => vec![],
                };
                Ok(StoredBlock {
                    cid: cid_str,
                    data,
                    links,
                })
            })?
            .filter_map(|b| b.ok())
            .collect();

        Ok(blocks)
    }

    fn get_all_blocks_under_cid(&self, cid: &str) -> Result<Vec<StoredBlock>> {
        let blocks: Vec<StoredBlock> = self
            .conn
            .prepare(
                "
                WITH RECURSIVE cids(x,y) AS (
                    SELECT cid,data FROM blocks WHERE cid = (?1)
                    UNION
                    SELECT cid,data FROM blocks b
                    INNER JOIN links l ON b.cid==l.block_cid
                    INNER JOIN cids ON (root_cid=x)
                )
                SELECT x,y FROM cids
                ",
            )?
            .query_map([cid], |row| {
                let cid_str: String = row.get(0)?;
                let data: Vec<u8> = row.get(1)?;
                let links = match self.get_links_by_cid(&cid_str) {
                    Ok(links) => links,
                    Err(_) => vec![],
                };
                Ok(StoredBlock {
                    cid: cid_str,
                    data,
                    links,
                })
            })?
            .filter_map(|b| b.ok())
            .collect();

        Ok(blocks)
    }
}
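All three recursive queries share the same CTE shape; the windowed variant simply appends `LIMIT`/`OFFSET` to the traversal's output, which is what makes the pagination cheap. A self-contained toy illustration of that behavior (assuming the `rusqlite` crate this provider already uses; the two-table schema is inferred from the queries above, and the inserted CIDs are made up):

```rust
use rusqlite::Connection;

fn main() -> rusqlite::Result<()> {
    // In-memory stand-in for the provider's blocks/links tables.
    let conn = Connection::open_in_memory()?;
    conn.execute_batch(
        "CREATE TABLE blocks (cid TEXT PRIMARY KEY, data BLOB);
         CREATE TABLE links (root_cid TEXT, block_cid TEXT);
         INSERT INTO blocks VALUES ('root', x'00'), ('a', x'01'), ('b', x'02');
         INSERT INTO links VALUES ('root', 'a'), ('root', 'b');",
    )?;
    // Same traversal as get_dag_blocks_by_window: walk from the root,
    // then slice the result set with LIMIT/OFFSET.
    let mut stmt = conn.prepare(
        "WITH RECURSIVE cids(x,y) AS (
             SELECT cid,data FROM blocks WHERE cid = (?1)
             UNION
             SELECT cid,data FROM blocks b
             INNER JOIN links l ON b.cid==l.block_cid
             INNER JOIN cids ON (root_cid=x)
         )
         SELECT x FROM cids LIMIT (?2) OFFSET (?3);",
    )?;
    let window: Vec<String> = stmt
        .query_map(rusqlite::params!["root", 2, 1], |row| row.get::<_, String>(0))?
        .filter_map(|r| r.ok())
        .collect();
    // A window of size 2 at offset 1 skips the root row, e.g. ["a", "b"].
    println!("{window:?}");
    Ok(())
}
```

One caveat worth noting: `LIMIT`/`OFFSET` pagination is only stable if the CTE enumerates blocks in a deterministic order, which SQLite's queue-based recursive evaluation provides for a fixed database state.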

#[cfg(test)]
111 changes: 83 additions & 28 deletions local-storage/src/storage.rs
@@ -18,7 +18,8 @@ pub struct Storage {
}

// TODO: Make this configurable
-const BLOCK_SIZE: usize = 256;
+// Changed to 100 KiB (1024 * 100 bytes) to optimize for larger files
+const BLOCK_SIZE: usize = 1024 * 100;

impl Storage {
    pub fn new(provider: Box<dyn StorageProvider>) -> Self {
@@ -39,6 +40,8 @@ impl Storage {
        let blocks = blocks?;
        let mut root_cid: Option<String> = None;

+       let mut stored_blocks = vec![];
+
        blocks.iter().for_each(|b| {
            let links = b
                .links()
@@ -50,13 +53,22 @@
                data: b.data().to_vec(),
                links,
            };
+           // First validate each block
+           if let Err(e) = stored.validate() {
+               error!("Failed to validate {}, {e}", b.cid());
+           }
            if let Err(e) = self.provider.import_block(&stored) {
                error!("Failed to import block {e}");
            }
            if !stored.links.is_empty() {
-               root_cid = Some(stored.cid);
+               root_cid = Some(stored.cid.clone());
            }
+           stored_blocks.push(stored);
        });
+       info!("Validating imported blocks {}", blocks.len());
+       if let Err(e) = crate::block::validate_dag(&stored_blocks) {
+           error!("Failed to validate dag on import: {e}");
+       }
        if blocks.len() == 1 {
            if let Some(first) = blocks.first() {
                root_cid = Some(first.cid().to_string());
@@ -77,7 +89,7 @@
            bail!(StorageError::DagIncomplete(cid.to_string()))
        }
        // Fetch all blocks tied to links under given cid
-       let child_blocks = self.get_all_blocks_under_cid(cid)?;
+       let child_blocks = self.get_all_dag_blocks(cid)?;
        // Open up file path for writing
        let mut output_file = FsFile::create(path)?;
        // Walk the StoredBlocks and write out to path
@@ -102,31 +114,12 @@
        self.provider.get_block_by_cid(cid)
    }

-   pub fn get_all_blocks_under_cid(&self, cid: &str) -> Result<Vec<StoredBlock>> {
-       // Get StoredBlock by cid and check for links
-       let root_block = self.provider.get_block_by_cid(cid)?;
-       // If links, grab all appropriate StoredBlocks
-       let mut child_blocks = vec![];
-       for link in root_block.links {
-           let block = self.provider.get_block_by_cid(&link)?;
-           if !block.links.is_empty() {
-               child_blocks.append(&mut self.get_all_blocks_under_cid(&block.cid)?);
-           }
-           child_blocks.push(block);
-       }
-       Ok(child_blocks)
+   pub fn get_all_dag_cids(&self, cid: &str) -> Result<Vec<String>> {
+       self.provider.get_all_dag_cids(cid)
    }

-   pub fn get_dag_blocks(&self, cid: &str) -> Result<Vec<StoredBlock>> {
-       // Get StoredBlock by cid and check for links
-       let root_block = self.provider.get_block_by_cid(cid)?;
-       // If links, grab all appropriate StoredBlocks
-       let mut blocks = vec![];
-       for link in &root_block.links {
-           blocks.push(self.provider.get_block_by_cid(link)?);
-       }
-       blocks.push(root_block);
-       Ok(blocks)
+   pub fn get_all_dag_blocks(&self, cid: &str) -> Result<Vec<StoredBlock>> {
+       self.provider.get_all_dag_blocks(cid)
    }

    pub fn import_block(&self, block: &StoredBlock) -> Result<()> {
@@ -141,6 +134,19 @@
    pub fn list_available_dags(&self) -> Result<Vec<String>> {
        self.provider.list_available_dags()
    }
+
+   pub fn get_dag_blocks_by_window(
+       &self,
+       cid: &str,
+       window_size: u32,
+       window_num: u32,
+   ) -> Result<Vec<StoredBlock>> {
+       println!("offset = {} * {}", window_size, window_num);
+       let offset = window_size * window_num;
+
+       self.provider
+           .get_dag_blocks_by_window(cid, offset, window_size)
+   }
}
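At the storage layer, callers think in window numbers rather than raw offsets: `offset = window_size * window_num`, so windows 0, 1, 2 of size 10 cover blocks 0..10, 10..20, 20..30 of the traversal. A short hedged sketch of that mapping (assuming a `storage: Storage` already holding the DAG and its root `cid`, as in the tests below):

```rust
// Hypothetical: walk the first three windows of a stored DAG.
let window_size: u32 = 10;
for window_num in 0..3u32 {
    let blocks = storage.get_dag_blocks_by_window(&cid, window_size, window_num)?;
    // window 0 starts at block 0, window 1 at block 10, window 2 at block 20
    println!("window {window_num}: {} blocks", blocks.len());
}
```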

#[cfg(test)]
@@ -198,7 +204,7 @@ pub mod tests {
        test_file
            .write_binary(
                "654684646847616846846876168468416874616846416846846186468464684684648684684"
-                   .repeat(10)
+                   .repeat(500)
                    .as_bytes(),
            )
            .unwrap();
@@ -217,7 +223,7 @@

    #[test]
    pub fn export_from_storage_various_file_sizes_binary_data() {
-       for size in [100, 200, 300, 500, 1000] {
+       for size in [100, 200, 300, 500, 1_000] {
            let harness = TestHarness::new();
            let temp_dir = assert_fs::TempDir::new().unwrap();
            let test_file = temp_dir.child("data.txt");
@@ -243,6 +249,55 @@
        }
    }

+   #[test]
+   pub fn test_get_dag_blocks_by_window() {
+       let harness = TestHarness::new();
+       let temp_dir = assert_fs::TempDir::new().unwrap();
+       let test_file = temp_dir.child("data.txt");
+
+       let data_size = BLOCK_SIZE * 50;
+       let mut data = Vec::<u8>::new();
+       data.resize(data_size, 1);
+       thread_rng().fill_bytes(&mut data);
+
+       test_file.write_binary(&data).unwrap();
+       let cid = harness.storage.import_path(test_file.path()).unwrap();
+
+       let window_size: u32 = 10;
+       let mut window_num = 0;
+
+       let all_dag_blocks = harness.storage.get_all_dag_blocks(&cid).unwrap();
+
+       for chunk in all_dag_blocks.chunks(window_size as usize).into_iter() {
+           let window_blocks = harness
+               .storage
+               .get_dag_blocks_by_window(&cid, window_size, window_num)
+               .unwrap();
+           assert_eq!(chunk, &window_blocks);
+           window_num += 1;
+       }
+   }
+
+   #[test]
+   pub fn compare_get_blocks_to_get_cids() {
+       let harness = TestHarness::new();
+       let temp_dir = assert_fs::TempDir::new().unwrap();
+       let test_file = temp_dir.child("data.txt");
+
+       let data_size = BLOCK_SIZE * 50;
+       let mut data = Vec::<u8>::new();
+       data.resize(data_size, 1);
+       thread_rng().fill_bytes(&mut data);
+
+       test_file.write_binary(&data).unwrap();
+       let cid = harness.storage.import_path(test_file.path()).unwrap();
+
+       let blocks = harness.storage.get_all_dag_blocks(&cid).unwrap();
+       let cids = harness.storage.get_all_dag_cids(&cid).unwrap();
+
+       assert_eq!(blocks.len(), cids.len());
+   }
+
    // TODO: duplicated data is not being handled correctly right now, need to fix this
    // #[test]
    // pub fn export_from_storage_various_file_sizes_duplicated_data() {