Skip to content

Commit

Permalink
feat: WIP integration of sync and bytes
Browse files Browse the repository at this point in the history
* removes content support from iroh-sync
* adds a quick-and-dirty writable database to iroh-bytes (will be
  replaced with a better generic writable database soon)
* adds a `Downloader` to queue get requests for individual hashes from
  individual peers
* adds a `BlobStore` that combines the writable db with the downloader
* adds a `Doc` abstraction that combines an iroh-sync `Replica` with a
  `BlobStore` to download content from peers on-demand
* updates the sync repl example to plug it all together
* also adds very basic persistence to `Replica` (encode to byte string)
  and uses this in the repl example
  • Loading branch information
Frando committed Jul 12, 2023
1 parent f15583b commit 6a81447
Show file tree
Hide file tree
Showing 12 changed files with 810 additions and 93 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions iroh-bytes/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ pub mod protocol;
pub mod provider;
pub mod runtime;
pub mod util;
pub mod writable;

#[cfg(test)]
pub(crate) mod test_utils;
Expand Down
178 changes: 178 additions & 0 deletions iroh-bytes/src/writable.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
#![allow(missing_docs)]
//! Quick-and-dirty writable database
//!
//! I wrote this while diving into iroh-bytes, wildly copying code around. This will be solved much
//! nicer with the upcoming generic writable database branch by @rklaehn.

use std::{
    collections::HashMap,
    io,
    path::{Path, PathBuf},
    sync::Arc,
};

use anyhow::Context;
use bytes::Bytes;
use iroh_io::{AsyncSliceWriter, File};
use range_collections::RangeSet2;

use crate::{
get::fsm,
protocol::{GetRequest, RangeSpecSeq, Request},
provider::{create_collection, DataSource, Database, DbEntry, FNAME_PATHS},
Hash,
};

/// A blob database into which new blobs can be inserted.
///
/// Blobs can be inserted either from bytes or by downloading from open connections to peers.
/// New blobs will be saved as files with a filename based on their hash.
///
/// TODO: Replace with the generic writable database.
#[derive(Debug, Clone)]
pub struct WritableFileDatabase {
    /// Index of blob entries; loaded from and saved to `storage.db_path`.
    db: Database,
    /// On-disk layout: the blob, temp and db directories.
    storage: Arc<StoragePaths>,
}

impl WritableFileDatabase {
    /// Open (or create) a writable database rooted at `data_path`.
    ///
    /// Creates the `blobs`, `temp` and `db` subdirectories via
    /// [`StoragePaths::new`], then loads an existing on-disk database if the
    /// `FNAME_PATHS` marker file is present, or starts from an empty
    /// [`Database`] otherwise.
    pub async fn new(data_path: PathBuf) -> anyhow::Result<Self> {
        let storage = Arc::new(StoragePaths::new(data_path).await?);
        let db = if storage.db_path.join(FNAME_PATHS).exists() {
            Database::load(&storage.db_path).await.with_context(|| {
                format!(
                    "Failed to load iroh database from {}",
                    storage.db_path.display()
                )
            })?
        } else {
            Database::default()
        };
        Ok(Self { db, storage })
    }

    /// Borrow the underlying [`Database`].
    pub fn db(&self) -> &Database {
        &self.db
    }

    /// Persist the database index into the `db` directory.
    pub async fn save(&self) -> io::Result<()> {
        self.db.save(&self.storage.db_path).await
    }

    /// Insert a blob from in-memory bytes.
    ///
    /// The data is written to a temp file first and then moved into the blob
    /// store under a path derived from its hash. Returns the hash and size.
    pub async fn put_bytes(&self, data: Bytes) -> anyhow::Result<(Hash, u64)> {
        let (hash, size, entry) = self.storage.put_bytes(data).await?;
        self.db.union_with(HashMap::from_iter([(hash, entry)]));
        Ok((hash, size))
    }

    /// Insert a blob by moving an already-written temp file into the store.
    ///
    /// NOTE(review): the file at `temp_path` is consumed (renamed into the
    /// blob directory) — it is gone from its original location afterwards.
    pub async fn put_from_temp_file(&self, temp_path: &PathBuf) -> anyhow::Result<(Hash, u64)> {
        let (hash, size, entry) = self.storage.move_to_blobs(&temp_path).await?;
        self.db.union_with(HashMap::from_iter([(hash, entry)]));
        Ok((hash, size))
    }

    /// Size in bytes of the stored blob for `hash`, or `None` if unknown.
    pub async fn get_size(&self, hash: &Hash) -> Option<u64> {
        Some(self.db.get(&hash)?.size().await)
    }

    /// Whether a blob with `hash` is present in the database.
    pub fn has(&self, hash: &Hash) -> bool {
        self.db.to_inner().contains_key(hash)
    }
    /// Download a single blob over an open connection and store it.
    ///
    /// Requests all ranges of `hash`, streams the response into a temp file,
    /// syncs it, then moves it into the blob store and records the entry.
    /// Returns `Ok(None)` when the response does not begin with a root blob
    /// (i.e. the request yielded nothing to download).
    pub async fn download_single(
        &self,
        conn: quinn::Connection,
        hash: Hash,
    ) -> anyhow::Result<Option<(Hash, u64)>> {
        // 1. Download to temp file
        let temp_path = {
            let temp_path = self.storage.temp_path();
            let request =
                Request::Get(GetRequest::new(hash, RangeSpecSeq::new([RangeSet2::all()])));
            let response = fsm::start(conn, request);
            let connected = response.next().await?;

            // Anything other than a root-blob start means there is no data
            // for this request; bail out without error.
            let fsm::ConnectedNext::StartRoot(curr) = connected.next().await? else {
                return Ok(None)
            };
            let header = curr.next();

            // The closure is re-runnable by `iroh_io::File`; clone the path
            // so the outer `temp_path` stays usable below.
            let path = temp_path.clone();
            let mut data_file = File::create(move || {
                std::fs::OpenOptions::new()
                    .write(true)
                    .create(true)
                    .open(&path)
            })
            .await?;

            let (curr, _size) = header.next().await?;
            let _curr = curr.write_all(&mut data_file).await?;
            // Flush the data file first, it is the only thing that matters at this point
            data_file.sync().await?;
            temp_path
        };

        // 2. Insert into database
        let (hash, size, entry) = self.storage.move_to_blobs(&temp_path).await?;
        let entries = HashMap::from_iter([(hash, entry)]);
        self.db.union_with(entries);
        Ok(Some((hash, size)))
    }
}

#[derive(Debug)]
pub struct StoragePaths {
    /// Directory holding finished blobs, fanned out by hex hash prefix.
    blob_path: PathBuf,
    /// Directory for in-progress downloads/writes with random file names.
    temp_path: PathBuf,
    /// Directory the database index is loaded from and saved to.
    db_path: PathBuf,
}

impl StoragePaths {
    /// Set up the storage layout under `data_path`, ensuring that the
    /// `blobs`, `temp` and `db` directories all exist.
    pub async fn new(data_path: PathBuf) -> anyhow::Result<Self> {
        let paths = Self {
            blob_path: data_path.join("blobs"),
            temp_path: data_path.join("temp"),
            db_path: data_path.join("db"),
        };
        tokio::fs::create_dir_all(&paths.blob_path).await?;
        tokio::fs::create_dir_all(&paths.temp_path).await?;
        tokio::fs::create_dir_all(&paths.db_path).await?;
        Ok(paths)
    }

    /// Write `data` into a fresh temp file, then move it into the blob
    /// directory keyed by its hash. Returns hash, size and the db entry.
    pub async fn put_bytes(&self, data: Bytes) -> anyhow::Result<(Hash, u64, DbEntry)> {
        let tmp = self.temp_path();
        tokio::fs::write(&tmp, &data).await?;
        self.move_to_blobs(&tmp).await
    }

    /// Hash the file at `path` and rename it to its final, hash-derived
    /// location inside the blob directory.
    async fn move_to_blobs(&self, path: &PathBuf) -> anyhow::Result<(Hash, u64, DbEntry)> {
        let source = DataSource::new(path.clone());
        // TODO: this needlessly creates a collection, but that's what's pub atm in iroh-bytes
        let (db, _collection_hash) = create_collection(vec![source]).await?;
        // the actual blob is the first entry in the external entries in the created collection
        let (hash, _path, _len) = db.external().next().unwrap();
        let Some(DbEntry::External { outboard, size, .. }) = db.get(&hash) else {
            unreachable!("just inserted");
        };

        let final_path = prepare_hash_dir(&self.blob_path, &hash).await?;
        tokio::fs::rename(&path, &final_path).await?;
        let entry = DbEntry::External {
            outboard,
            path: final_path,
            size,
        };
        Ok((hash, size, entry))
    }

    /// Produce a fresh, randomly named path inside the temp directory.
    fn temp_path(&self) -> PathBuf {
        let token: u64 = rand::random();
        self.temp_path.join(hex::encode(token.to_be_bytes()))
    }
}

async fn prepare_hash_dir(path: &PathBuf, hash: &Hash) -> anyhow::Result<PathBuf> {
let hash = hex::encode(hash.as_ref());
let path = path.join(&hash[0..2]).join(&hash[2..4]).join(&hash[4..]);
tokio::fs::create_dir_all(path.parent().unwrap()).await?;
Ok(path)
}
2 changes: 1 addition & 1 deletion iroh-gossip/src/net/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use tracing::{debug, warn};
use self::util::{read_message, write_message, Dialer, Timers};
use crate::proto::{self, PeerData, TopicId};

mod util;
pub mod util;

/// ALPN protocol name
pub const GOSSIP_ALPN: &[u8] = b"n0/iroh-gossip/0";
Expand Down
2 changes: 2 additions & 0 deletions iroh-gossip/src/net/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ pub type DialFuture = BoxFuture<'static, (PeerId, anyhow::Result<quinn::Connecti
///
/// This wraps a [MagicEndpoint], connects to peers through the endpoint, stores
/// the pending connect futures and emits finished connect results.
///
/// TODO: Move to iroh-net
pub struct Dialer {
endpoint: MagicEndpoint,
pending: FuturesUnordered<DialFuture>,
Expand Down
14 changes: 14 additions & 0 deletions iroh-net/src/tls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,20 @@ impl From<SecretKey> for Keypair {
#[derive(Clone, PartialEq, Eq, Copy, Serialize, Deserialize, Hash)]
pub struct PeerId(PublicKey);

impl PeerId {
/// Try to create a peer id from a byte array.
///
/// # Warning
///
/// The caller is responsible for ensuring that the bytes passed into this
/// method actually represent a `curve25519_dalek::curve::CompressedEdwardsY`
/// and that said compressed point is actually a point on the curve.
pub fn from_bytes(bytes: &[u8; 32]) -> anyhow::Result<Self> {
let key = PublicKey::from_bytes(bytes)?;
Ok(PeerId(key))
}
}

impl From<PublicKey> for PeerId {
fn from(key: PublicKey) -> Self {
PeerId(key)
Expand Down
1 change: 1 addition & 0 deletions iroh-sync/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ derive_more = { version = "0.99.17", git = "https://github.com/JelteF/derive_mor
ed25519-dalek = { version = "2.0.0-rc.2", features = ["serde", "rand_core"] }
iroh-bytes = { path = "../iroh-bytes", version = "0.4" }
once_cell = "1.18.0"
postcard = { version = "1", default-features = false, features = ["alloc", "use-std", "experimental-derive"] }
rand = "0.8.5"
rand_core = "0.6.4"
serde = { version = "1.0.164", features = ["derive"] }
Expand Down
9 changes: 5 additions & 4 deletions iroh-sync/src/ranger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -401,11 +401,12 @@ where

/// Processes an incoming message and produces a response.
/// If terminated, returns `None`
pub fn process_message(&mut self, message: Message<K, V>) -> Option<Message<K, V>> {
pub fn process_message(&mut self, message: Message<K, V>) -> (Vec<K>, Option<Message<K, V>>) {
let mut out = Vec::new();

// TODO: can these allocs be avoided?
let mut items = Vec::new();
let mut inserted = Vec::new();
let mut fingerprints = Vec::new();
for part in message.parts {
match part {
Expand All @@ -431,7 +432,6 @@ where
Some(
self.store
.get_range(range.clone(), self.limit.clone())
.into_iter()
.filter(|(k, _)| !values.iter().any(|(vk, _)| &vk == k))
.map(|(k, v)| (k.clone(), v.clone()))
.collect(),
Expand All @@ -440,6 +440,7 @@ where

// Store incoming values
for (k, v) in values {
inserted.push(k.clone());
self.store.put(k, v);
}

Expand Down Expand Up @@ -546,9 +547,9 @@ where

// If we have any parts, return a message
if !out.is_empty() {
Some(Message { parts: out })
(inserted, Some(Message { parts: out }))
} else {
None
(inserted, None)
}
}

Expand Down
Loading

0 comments on commit 6a81447

Please sign in to comment.