Skip to content

Commit

Permalink
feat(node): Implement exchange client (#63)
Browse files Browse the repository at this point in the history
* feat(node): Implement exchange client

* Implement Exchange client in Behaviour

* remove `todo!`

* Exchange can return multiple HeaderResponse

* add get_single_header test

* rename PeerBook to PeerTracker

* add more logs

* fix wasm compilation

* add more logs

* implement get_verified_header_range_by_height

* polish public API and add more tests

* Address PR comments

* fix clippy issues

* redefine HASH_SIZE from tendermint

* add HeaderRequest helper functions under HeaderRequestExt

* report connection errors

* test that get_header_by_height(0) works
  • Loading branch information
oblique authored Sep 14, 2023
1 parent 0a71b16 commit 3d32c54
Show file tree
Hide file tree
Showing 14 changed files with 826 additions and 85 deletions.
12 changes: 8 additions & 4 deletions celestia/src/bin/celestia-node.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
use anyhow::Result;

fn main() -> Result<()> {
celestia::run()
#[cfg(not(target_arch = "wasm32"))]
#[tokio::main]
async fn main() -> anyhow::Result<()> {
celestia::run().await
}

// Placeholder to allow compilation
#[cfg(target_arch = "wasm32")]
fn main() {}
9 changes: 7 additions & 2 deletions celestia/src/native.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@ use std::env;

use anyhow::{Context, Result};
use celestia_node::node::{Node, NodeConfig};
use celestia_node::p2p::P2pService;
use celestia_rpc::prelude::*;
use libp2p::{core::upgrade::Version, identity, noise, tcp, yamux, Multiaddr, Transport};
use tracing::info;

const WS_URL: &str = "ws://localhost:26658";

#[tokio::main]
pub async fn run() -> Result<()> {
let _ = dotenvy::dotenv();
let _guard = init_tracing();
Expand All @@ -22,7 +22,7 @@ pub async fn run() -> Result<()> {
.multiplex(yamux::Config::default())
.boxed();

let _node = Node::new(NodeConfig {
let node = Node::new(NodeConfig {
network_id: "private".to_string(),
p2p_transport,
p2p_local_keypair,
Expand All @@ -32,6 +32,11 @@ pub async fn run() -> Result<()> {
.await
.unwrap();

node.p2p().wait_connected().await?;

let header = node.p2p().get_header_by_height(1).await?;
info!("{header:?}");

Ok(())
}

Expand Down
7 changes: 7 additions & 0 deletions node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,10 @@ wasm-bindgen-futures = "0.4.37"
bytes = "1.4.0"
celestia-rpc = { workspace = true }
dotenvy = "0.15.7"
libp2p = { version = "0.52.3", features = [
"ed25519",
"noise",
"tcp",
"tokio",
"yamux",
] }
228 changes: 209 additions & 19 deletions node/src/exchange.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,32 @@
use std::io;
use std::sync::Arc;
use std::task::{Context, Poll};

use async_trait::async_trait;
use celestia_proto::p2p::pb::{HeaderRequest, HeaderResponse};
use celestia_types::ExtendedHeader;
use futures::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
use libp2p::request_response::{self, Codec, ProtocolSupport};
use libp2p::StreamProtocol;
use prost::length_delimiter_len;
use prost::Message;

use crate::utils::stream_protocol_id;
use libp2p::{
core::Endpoint,
request_response::{self, Codec, InboundFailure, OutboundFailure, ProtocolSupport},
swarm::{
ConnectionDenied, ConnectionId, FromSwarm, NetworkBehaviour, PollParameters,
THandlerInEvent, THandlerOutEvent, ToSwarm,
},
Multiaddr, PeerId, StreamProtocol,
};
use prost::{length_delimiter_len, Message};
use tracing::instrument;

mod client;
mod server;
mod utils;

use crate::exchange::client::ExchangeClientHandler;
use crate::exchange::server::ExchangeServerHandler;
use crate::p2p::P2pError;
use crate::peer_tracker::PeerTracker;
use crate::utils::{stream_protocol_id, OneshotResultSender};

/// Max request size in bytes
const REQUEST_SIZE_MAXIMUM: usize = 1024;
Expand All @@ -17,22 +35,194 @@ const RESPONSE_SIZE_MAXIMUM: usize = 10 * 1024 * 1024;
/// Maximum length of the protobuf length delimiter in bytes
const PROTOBUF_MAX_LENGTH_DELIMITER_LEN: usize = 10;

pub type Behaviour = request_response::Behaviour<HeaderCodec>;
pub type Event = request_response::Event<HeaderRequest, Vec<HeaderResponse>>;

/// Create a new [`Behaviour`]
pub fn new_behaviour(network: &str) -> Behaviour {
Behaviour::new(
[(
stream_protocol_id(network, "/header-ex/v0.0.3"),
ProtocolSupport::Full,
)],
request_response::Config::default(),
)
type ReqRespBehaviour = request_response::Behaviour<HeaderCodec>;
type ReqRespEvent = request_response::Event<HeaderRequest, Vec<HeaderResponse>>;
type ReqRespMessage = request_response::Message<HeaderRequest, Vec<HeaderResponse>>;

pub(crate) struct ExchangeBehaviour {
req_resp: ReqRespBehaviour,
client_handler: ExchangeClientHandler,
server_handler: ExchangeServerHandler,
}

pub(crate) struct ExchangeConfig<'a> {
pub network_id: &'a str,
pub peer_tracker: Arc<PeerTracker>,
}

#[derive(Debug, thiserror::Error)]
pub enum ExchangeError {
#[error("Header not found")]
HeaderNotFound,

#[error("Invalid response")]
InvalidResponse,

#[error("Invalid request")]
InvalidRequest,

#[error("Inbound failure: {0}")]
InboundFailure(InboundFailure),

#[error("Outbound failure: {0}")]
OutboundFailure(OutboundFailure),
}

impl ExchangeBehaviour {
pub(crate) fn new(config: ExchangeConfig<'_>) -> Self {
ExchangeBehaviour {
req_resp: ReqRespBehaviour::new(
[(
stream_protocol_id(config.network_id, "/header-ex/v0.0.3"),
ProtocolSupport::Full,
)],
request_response::Config::default(),
),
client_handler: ExchangeClientHandler::new(config.peer_tracker),
server_handler: ExchangeServerHandler::new(),
}
}

#[instrument(level = "trace", skip(self, respond_to))]
pub(crate) fn send_request(
&mut self,
request: HeaderRequest,
respond_to: OneshotResultSender<Vec<ExtendedHeader>, P2pError>,
) {
self.client_handler
.on_send_request(&mut self.req_resp, request, respond_to);
}

fn on_to_swarm(
&mut self,
ev: ToSwarm<ReqRespEvent, THandlerInEvent<ReqRespBehaviour>>,
) -> Option<ToSwarm<(), THandlerInEvent<Self>>> {
match ev {
ToSwarm::GenerateEvent(ev) => {
self.on_req_resp_event(ev);
None
}
_ => Some(ev.map_out(|_| ())),
}
}

#[instrument(level = "trace", skip_all)]
fn on_req_resp_event(&mut self, ev: ReqRespEvent) {
match ev {
// Received a response for an ongoing outbound request
ReqRespEvent::Message {
message:
ReqRespMessage::Response {
request_id,
response,
},
..
} => {
self.client_handler
.on_response_received(request_id, response);
}

// Failure while client requests
ReqRespEvent::OutboundFailure {
request_id, error, ..
} => {
self.client_handler.on_failure(request_id, error);
}

// Received new inbound request
ReqRespEvent::Message {
message:
ReqRespMessage::Request {
request_id,
request,
channel,
},
..
} => {
self.server_handler
.on_request_received(request_id, request, channel);
}

// Response to inbound request was sent
ReqRespEvent::ResponseSent { request_id, .. } => {
self.server_handler.on_response_sent(request_id);
}

// Failure while server responds
ReqRespEvent::InboundFailure {
request_id, error, ..
} => {
self.server_handler.on_failure(request_id, error);
}
}
}
}

impl NetworkBehaviour for ExchangeBehaviour {
type ConnectionHandler = <ReqRespBehaviour as NetworkBehaviour>::ConnectionHandler;
type ToSwarm = ();

fn handle_established_inbound_connection(
&mut self,
connection_id: ConnectionId,
peer: PeerId,
local_addr: &Multiaddr,
remote_addr: &Multiaddr,
) -> Result<Self::ConnectionHandler, ConnectionDenied> {
self.req_resp.handle_established_inbound_connection(
connection_id,
peer,
local_addr,
remote_addr,
)
}

fn handle_established_outbound_connection(
&mut self,
connection_id: ConnectionId,
peer: PeerId,
addr: &Multiaddr,
role_override: Endpoint,
) -> Result<Self::ConnectionHandler, ConnectionDenied> {
self.req_resp.handle_established_outbound_connection(
connection_id,
peer,
addr,
role_override,
)
}

fn on_swarm_event(&mut self, event: FromSwarm<'_, Self::ConnectionHandler>) {
self.req_resp.on_swarm_event(event)
}

fn on_connection_handler_event(
&mut self,
peer_id: PeerId,
connection_id: ConnectionId,
event: THandlerOutEvent<Self>,
) {
self.req_resp
.on_connection_handler_event(peer_id, connection_id, event)
}

fn poll(
&mut self,
cx: &mut Context<'_>,
params: &mut impl PollParameters,
) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
while let Poll::Ready(ev) = self.req_resp.poll(cx, params) {
if let Some(ev) = self.on_to_swarm(ev) {
return Poll::Ready(ev);
}
}

Poll::Pending
}
}

#[derive(Clone, Copy, Debug, Default)]
pub struct HeaderCodec;
pub(crate) struct HeaderCodec;

#[derive(Debug, thiserror::Error, PartialEq)]
pub enum ReadHeaderError {
Expand Down
Loading

0 comments on commit 3d32c54

Please sign in to comment.