Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/main' into feat/tracing
Browse files Browse the repository at this point in the history
  • Loading branch information
oblique committed Sep 11, 2023
2 parents d7f551d + bd6394b commit 2ece4ea
Show file tree
Hide file tree
Showing 7 changed files with 230 additions and 159 deletions.
18 changes: 10 additions & 8 deletions celestia/src/native.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,21 +14,23 @@ pub async fn run() -> Result<()> {
let _guard = init_tracing();

let bridge_ma = fetch_bridge_multiaddr().await?;
let local_keypair = identity::Keypair::generate_ed25519();
let p2p_local_keypair = identity::Keypair::generate_ed25519();

let transport = tcp::tokio::Transport::default()
let p2p_transport = tcp::tokio::Transport::default()
.upgrade(Version::V1Lazy)
.authenticate(noise::Config::new(&local_keypair)?)
.authenticate(noise::Config::new(&p2p_local_keypair)?)
.multiplex(yamux::Config::default())
.boxed();

let _node = Node::new(NodeConfig {
transport,
network_id: "private".to_string(),
local_keypair,
bootstrap_peers: vec![bridge_ma],
listen_on: vec!["/ip4/0.0.0.0/tcp/0".parse()?],
});
p2p_transport,
p2p_local_keypair,
p2p_bootstrap_peers: vec![bridge_ma],
p2p_listen_on: vec![],
})
.await
.unwrap();

Ok(())
}
Expand Down
17 changes: 17 additions & 0 deletions node/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,24 @@
use async_trait::async_trait;

mod exchange;
mod executor;
pub mod node;
pub mod p2p;
pub mod store;
pub mod syncer;
mod utils;

#[async_trait]
pub trait Service: Send + Sync {
type Args;
type Command;
type Error;

async fn start(args: Self::Args) -> Result<Self, Self::Error>
where
Self: Sized;

async fn stop(&self) -> Result<(), Self::Error>;

async fn send_command(&self, cmd: Self::Command) -> Result<(), Self::Error>;
}
114 changes: 57 additions & 57 deletions node/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,81 +10,81 @@ use libp2p::core::muxing::StreamMuxerBox;
use libp2p::core::transport::Boxed;
use libp2p::identity::Keypair;
use libp2p::{Multiaddr, PeerId};
use tokio::select;
use tokio::sync::RwLock;

use crate::executor::spawn;
use crate::p2p::{P2p, P2pConfig, P2pError};
use crate::p2p::{P2p, P2pArgs, P2pService};
use crate::store::Store;
use crate::syncer::Syncer;
use crate::syncer::{Syncer, SyncerArgs, SyncerService};

pub struct Node {
pub p2p: Arc<P2p>,
#[derive(Debug, thiserror::Error)]
pub enum NodeError<P2pSrv, SyncerSrv>
where
P2pSrv: P2pService,
SyncerSrv: SyncerService<P2pSrv>,
{
#[error(transparent)]
P2pService(P2pSrv::Error),

#[error(transparent)]
SyncerService(SyncerSrv::Error),
}

pub struct NodeConfig {
pub transport: Boxed<(PeerId, StreamMuxerBox)>,
pub network_id: String,
pub local_keypair: Keypair,
pub bootstrap_peers: Vec<Multiaddr>,
pub listen_on: Vec<Multiaddr>,
}

#[derive(Debug, thiserror::Error)]
pub enum NodeError {
#[error("P2p: {0}")]
P2p(#[from] P2pError),
pub p2p_transport: Boxed<(PeerId, StreamMuxerBox)>,
pub p2p_local_keypair: Keypair,
pub p2p_bootstrap_peers: Vec<Multiaddr>,
pub p2p_listen_on: Vec<Multiaddr>,
}

type Result<T, E = NodeError> = std::result::Result<T, E>;
pub type Node = GenericNode<P2p, Syncer<P2p>>;

#[allow(unused)]
struct Worker {
store: Arc<RwLock<Store>>,
syncer: Syncer,
p2p: Arc<P2p>,
pub struct GenericNode<P2pSrv, SyncerSrv>
where
P2pSrv: P2pService,
SyncerSrv: SyncerService<P2pSrv>,
{
p2p: Arc<P2pSrv>,
syncer: Arc<SyncerSrv>,
}

#[allow(unused)]
enum NodeCmd {}

#[allow(unused)]
enum NodeEvent {}

impl Node {
pub fn new(config: NodeConfig) -> Result<Self> {
impl<P2pSrv, SyncerSrv> GenericNode<P2pSrv, SyncerSrv>
where
P2pSrv: P2pService,
SyncerSrv: SyncerService<P2pSrv>,
{
pub async fn new(config: NodeConfig) -> Result<Self, NodeError<P2pSrv, SyncerSrv>> {
let store = Arc::new(RwLock::new(Store::new()));
let syncer = Syncer::new(store.clone());

let p2p = Arc::new(P2p::new(P2pConfig {
transport: config.transport,
store: store.clone(),
network_id: config.network_id,
local_keypair: config.local_keypair,
bootstrap_peers: config.bootstrap_peers,
listen_on: config.listen_on,
})?);
let p2p = Arc::new(
P2pSrv::start(P2pArgs {
network_id: config.network_id,
transport: config.p2p_transport,
local_keypair: config.p2p_local_keypair,
bootstrap_peers: config.p2p_bootstrap_peers,
listen_on: config.p2p_listen_on,
})
.await
.map_err(NodeError::P2pService)?,
);

spawn({
let p2p = p2p.clone();
async move {
Worker { store, syncer, p2p }.run().await;
}
});
let syncer = Arc::new(
SyncerSrv::start(SyncerArgs {
store: store.clone(),
p2p: p2p.clone(),
})
.await
.map_err(NodeError::SyncerService)?,
);

Ok(Node { p2p })
Ok(GenericNode { p2p, syncer })
}

pub fn p2p(&self) -> &impl P2pService {
&*self.p2p
}
}

impl Worker {
async fn run(&mut self) {
loop {
select! {
Some(_ev) = self.p2p.next_event() => {
// TODO: feed it to syncer
}
// TODO: receive command from `Node` and handle it
}
}
pub fn syncer(&self) -> &impl SyncerService<P2pSrv> {
&*self.syncer
}
}
Loading

0 comments on commit 2ece4ea

Please sign in to comment.