Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: retrieval server & client setup #654

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
621 changes: 550 additions & 71 deletions Cargo.lock

Large diffs are not rendered by default.

7 changes: 7 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ members = [
"storage-provider/client",
"storage-provider/common",
"storage-provider/server",
"storage-retrieval",
"storage/polka-index",
"storagext/cli",
"storagext/lib",
Expand Down Expand Up @@ -51,8 +52,10 @@ async-stream = "0.3.6"
async-trait = "0.1.80"
axum = "0.7.5"
base64 = "0.22.1"
beetswap = "0.4.0"
bitflags = "2.5.0"
blake2b_simd = { version = "1.0.2", default-features = false }
blockstore = "0.7.1"
bls12_381 = "0.8"
bs58 = "0.5.1"
byteorder = "1.5.0"
Expand All @@ -78,6 +81,9 @@ ipld-core = "0.4.1"
ipld-dagpb = "0.2.1"
itertools = "0.13.0"
jsonrpsee = { version = "0.24.7" }
libp2p = "0.54.1"
libp2p-core = "0.42.0"
libp2p-swarm = "0.45.1"
log = { version = "0.4.21", default-features = false }
multihash-codetable = { version = "0.1.1", default-features = false }
num-bigint = { version = "0.4.5", default-features = false }
Expand Down Expand Up @@ -136,6 +142,7 @@ pallet-market = { path = "pallets/market", default-features = false }
pallet-proofs = { path = "pallets/proofs", default-features = false }
pallet-randomness = { path = "pallets/randomness", default-features = false }
pallet-storage-provider = { path = "pallets/storage-provider", default-features = false }
polka-index = { path = "storage/polka-index" }
# NOTE(review): document what the polka-index crate provides and why it is a
# workspace dependency — presumably the indexer backing storage-retrieval; confirm.

polka-storage-proofs = { path = "lib/polka-storage-proofs", default-features = false }
polka-storage-provider-common = { path = "storage-provider/common" }
polka-storage-runtime = { path = "runtime" }
Expand Down
32 changes: 32 additions & 0 deletions storage-retrieval/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
[package]
authors.workspace = true
edition.workspace = true
homepage.workspace = true
license-file.workspace = true
name = "storage-retrieval"
repository.workspace = true
version = "0.1.0"

[lints]
workspace = true

[dependencies]
anyhow = { workspace = true }
beetswap = { workspace = true }
blockstore = { workspace = true }
cid = { workspace = true }
futures = { workspace = true }
libp2p = { workspace = true, features = ["macros", "noise", "tcp", "tokio", "yamux"] }
libp2p-core = { workspace = true }
libp2p-swarm = { workspace = true }
mater = { workspace = true }
polka-index = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true, features = ["macros", "rt", "rt-multi-thread", "sync", "time"] }
tracing = { workspace = true }

[dev-dependencies]
multihash-codetable = { workspace = true, features = ["sha2"] }
tracing-appender = { workspace = true }
tracing-subscriber = { workspace = true, features = ["env-filter"] }
Empty file added storage-retrieval/README.md

NOTE(review): the README was added empty — fill in at least a one-paragraph
crate overview (what storage-retrieval does, how server and client relate), or
drop the file until there is content.
89 changes: 89 additions & 0 deletions storage-retrieval/examples/simple_retrieval.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
use std::{str::FromStr, sync::Arc, time::Duration};

use anyhow::Result;
use blockstore::{
    // NOTE(review): the `blockstore` crate (presumably Eiger's) provides the
    // `Block` and `Blockstore` traits used throughout this example — confirm.

block::{Block, CidError},
Blockstore, InMemoryBlockstore,
};
use cid::Cid;
use libp2p::Multiaddr;
use multihash_codetable::{Code, MultihashDigest};
use storage_retrieval::{client::Client, server::Server};
use tokio::time::sleep;

#[tokio::main]
async fn main() -> Result<()> {
// Init tracing
let _guard = init_tracing();

// Setup indexer
// let indexer_path = temp_dir();
// let indexer = Arc::new(RocksDBLid::new(RocksDBStateStoreConfig {
// path: indexer_path,
// })?);

// TODO: Blocks should not be hold in memory. Implement blockstore that can
// source blocks directly from sectors on disk with the help of an index.
let blockstore = Arc::new(InMemoryBlockstore::<64>::new());
blockstore.put(StringBlock("12345".to_string())).await?;

// Setup server
let server = Server::new(blockstore)?;
let listener: Multiaddr = format!("/ip4/127.0.0.1/tcp/8989").parse()?;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
let listener: Multiaddr = format!("/ip4/127.0.0.1/tcp/8989").parse()?;
let server_address: Multiaddr = format!("/ip4/127.0.0.1/tcp/8989").parse()?;


tokio::spawn({
let listener = listener.clone();
async move {
let _ = server.run(vec![listener]).await;
}
});

// TODO: Implement blockstore that persist blocks directly to disk as car file.
let blockstore = Arc::new(InMemoryBlockstore::<64>::new());
let client = Client::new(blockstore, vec![listener])?;

// Payload cid of the car file we want to fetch
// let payload_cid =
// Cid::from_str("bafkreiechz74drg7tg5zswmxf4g2dnwhemlwdv7e3l5ypehdqdwaoyz3dy").unwrap();
let payload_cid =
Cid::from_str("bafkreiczsrdrvoybcevpzqmblh3my5fu6ui3tgag3jm3hsxvvhaxhswpyu").unwrap();
client
.download(payload_cid, sleep(Duration::from_secs(10)))
.await?;

Ok(())
}

/// A block whose payload is a UTF-8 string, content-addressed with a
/// raw-codec CIDv1 over the SHA2-256 digest of its bytes.
struct StringBlock(pub String);

/// Multicodec code for raw binary data.
const RAW_CODEC: u64 = 0x55;

impl Block<64> for StringBlock {
    /// CIDv1 (raw codec, SHA2-256) of the string's bytes.
    fn cid(&self) -> Result<Cid, CidError> {
        let digest = Code::Sha2_256.digest(self.0.as_bytes());
        Ok(Cid::new_v1(RAW_CODEC, digest))
    }

    /// Raw bytes of the underlying string.
    fn data(&self) -> &[u8] {
        self.0.as_bytes()
    }
}

/// Install the global tracing subscriber: INFO level by default (overridable
/// via the environment filter), file and line numbers in each event, writing
/// to stdout through a non-blocking worker.
///
/// The returned guard must be kept alive so buffered log lines are flushed;
/// dropping it stops the background writer.
fn init_tracing() -> tracing_appender::non_blocking::WorkerGuard {
    let (writer, guard) = tracing_appender::non_blocking(std::io::stdout());

    let env_filter = tracing_subscriber::EnvFilter::builder()
        .with_default_directive(tracing_subscriber::filter::LevelFilter::INFO.into())
        .from_env_lossy();

    let event_format = tracing_subscriber::fmt::format()
        .with_file(true)
        .with_line_number(true);

    tracing_subscriber::fmt()
        .event_format(event_format)
        .with_env_filter(env_filter)
        .with_writer(writer)
        .init();

    guard
}
189 changes: 189 additions & 0 deletions storage-retrieval/src/client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
use std::{collections::HashMap, sync::Arc};

use beetswap::QueryId;
use blockstore::Blockstore;
use cid::Cid;
use futures::{pin_mut, Future, StreamExt};
use libp2p::{Multiaddr, PeerId, Swarm};
use libp2p_core::ConnectedPoint;
use libp2p_swarm::{ConnectionId, DialError, SwarmEvent};
use thiserror::Error;
use tracing::{debug, info, instrument, trace};

use crate::{new_swarm, Behaviour, BehaviourEvent, InitSwarmError};

/// Errors that can occur on the retrieval client.
#[derive(Debug, Error)]
pub enum ClientError {
    /// The underlying libp2p swarm could not be initialized.
    #[error("Swarm initialization error: {0}")]
    InitSwarm(#[from] InitSwarmError),
    /// Establishing or upgrading an outbound connection to a provider failed.
    #[error("Dial error: {0}")]
    Dial(#[from] DialError),
    /// The download was canceled before all requested data arrived.
    #[error("Download canceled")]
    DownloadCanceled,
}

/// A client is used to download blocks from the storage provider. A single
/// client supports getting a single payload.
pub struct Client<B>
where
    B: Blockstore + 'static,
{
    /// Multiaddresses of the data providers; each is dialed when a download
    /// starts.
    providers: Vec<Multiaddr>,
    /// libp2p swarm running the bitswap behaviour, backed by the blockstore
    /// the client was constructed with.
    swarm: Swarm<Behaviour<B>>,
    /// The in flight block queries, keyed by bitswap query id. If empty we
    /// know that the client received all requested data.
    queries: HashMap<QueryId, Cid>,
}

impl<B> Client<B>
where
B: Blockstore,
{
pub fn new(blockstore: Arc<B>, providers: Vec<Multiaddr>) -> Result<Self, ClientError> {
let swarm = new_swarm(blockstore)?;

Ok(Self {
providers,
swarm,
queries: HashMap::new(),
})
}

/// Start download of some content with a payload cid.
pub async fn download(
mut self,
payload_cid: Cid,
cancellation: impl Future<Output = ()>,
) -> Result<(), ClientError> {
// Dial all providers
for provider in self.providers.clone() {
self.swarm.dial(provider)?;
}

// Request the root node of the car file
let query_id = self.swarm.behaviour_mut().bitswap.get(&payload_cid);
self.queries.insert(query_id, payload_cid);

// Pin cancellation future
pin_mut!(cancellation);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why? What about CancellationTokens?


loop {
tokio::select! {
// Data download was canceled
_ = &mut cancellation => {
// Return an error as indication that the download was cancelled
return Err(ClientError::DownloadCanceled);
}
// Handle events received when we get some blocks back
event = self.swarm.select_next_some() => {
// Handle event received from the providers
self.on_swarm_event(event).await?;

// if no inflight queries, that means we received
// everything requested.
if self.queries.is_empty() {
info!("Download of payload {payload_cid} finished");
break;
}
}
}
}

Ok(())
}

async fn on_swarm_event(
&mut self,
event: SwarmEvent<BehaviourEvent<B>>,
) -> Result<(), ClientError> {
trace!(?event, "Received swarm event");

match event {
SwarmEvent::ConnectionEstablished {
peer_id,
connection_id,
endpoint,
..
} => {
self.on_peer_connected(peer_id, connection_id, endpoint);
}
SwarmEvent::ConnectionClosed {
peer_id,
connection_id,
..
} => {
self.on_peer_disconnected(peer_id, connection_id)?;
}
SwarmEvent::Behaviour(BehaviourEvent::Bitswap(event)) => {
self.on_bitswap_event(event)?;
}
_ => {
// Nothing to do here
}
}

Ok(())
}

#[instrument(skip_all, fields(peer_id = %peer_id))]
fn on_peer_connected(
&mut self,
peer_id: PeerId,
_connection_id: ConnectionId,
_endpoint: ConnectedPoint,
) {
debug!("Peer connected");

// TODO: Track connections to the storage providers. We need statuses so
// that we know if there is still some peer viable to download data
// from.
}

#[instrument(skip_all, fields(peer_id = %peer_id))]
fn on_peer_disconnected(
&mut self,
peer_id: PeerId,
_connection_id: ConnectionId,
) -> Result<(), ClientError> {
debug!("Peer disconnected");

// TODO: Remove connection from tracked. If there are no established
// connections return an error. The download can never finish.

Ok(())
}

fn on_bitswap_event(&mut self, event: beetswap::Event) -> Result<(), ClientError> {
match event {
beetswap::Event::GetQueryResponse { query_id, data } => {
if let Some(cid) = self.queries.remove(&query_id) {
info!("received response for {cid:?}: {data:?}");
}

// TODO: Extract linked blocks from the cid. Then request those
// new unknown blocks from the providers. Received blocks are
// added automatically to the blockstore used by the client.

// TODO: Figure out how the sequence of blocks is guaranteed. Do
// we request each of them in sequence and wait for each of them
// before requesting for a new one? Is there a better way?
}
beetswap::Event::GetQueryError { query_id, error } => {
if let Some(cid) = self.queries.remove(&query_id) {
info!("received error for {cid:?}: {error}");
}

// TODO: Track errors for blocks. There is a case when no
// providers can have a requested block. In that case we
// should return an error and cancel download.
}
}

Ok(())
}
}
49 changes: 49 additions & 0 deletions storage-retrieval/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
pub mod client;
pub mod server;

use std::{sync::Arc, time::Duration};

use ::blockstore::Blockstore;
use libp2p::{noise, swarm::NetworkBehaviour, tcp, yamux, Swarm, SwarmBuilder};
use thiserror::Error;

/// Maximum multihash length (in bytes) supported by the bitswap behaviour.
// NOTE(review): "LENGHT" is a typo for "LENGTH"; rename once any uses in the
// sibling `client`/`server` modules are confirmed.
const MAX_MULTIHASH_LENGHT: usize = 64;

/// Custom Behaviour used by the server and client.
///
/// Currently only wraps a bitswap behaviour for block exchange.
#[derive(NetworkBehaviour)]
struct Behaviour<B>
where
    B: Blockstore + 'static,
{
    /// Bitswap protocol behaviour, backed by the supplied blockstore.
    bitswap: beetswap::Behaviour<MAX_MULTIHASH_LENGHT, B>,
}

/// Error that can occur while initializing a swarm.
#[derive(Debug, Error)]
pub enum InitSwarmError {
    /// Failed to initialize noise protocol (authentication/encryption layer);
    /// surfaced by the TCP transport setup in [`new_swarm`].
    #[error("Failed to initialize noise: {0}")]
    Noise(#[from] noise::Error),
}

/// Initialize a new swarm with our custom Behaviour.
fn new_swarm<B>(blockstore: Arc<B>) -> Result<Swarm<Behaviour<B>>, InitSwarmError>
where
B: Blockstore + 'static,
{
let swarm = SwarmBuilder::with_new_identity()
.with_tokio()
.with_tcp(
tcp::Config::default(),
noise::Config::new,
yamux::Config::default,
)?
.with_behaviour(|_| Behaviour {
bitswap: beetswap::Behaviour::new(blockstore),
})
.expect("infallible")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

because?

.with_swarm_config(|c| c.with_idle_connection_timeout(Duration::from_secs(60)))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

magic number

.build();

Ok(swarm)
}
Loading
Loading