Skip to content

Commit

Permalink
Box -> generics
Browse files Browse the repository at this point in the history
  • Loading branch information
fl0rek committed Sep 20, 2023
1 parent 00f886a commit af28a41
Show file tree
Hide file tree
Showing 7 changed files with 114 additions and 59 deletions.
4 changes: 4 additions & 0 deletions celestia/src/native.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::env;
use anyhow::{Context, Result};
use celestia_node::node::{Node, NodeConfig};
use celestia_node::p2p::P2pService;
use celestia_node::store::InMemoryStore;
use celestia_rpc::prelude::*;
use libp2p::{core::upgrade::Version, identity, noise, tcp, yamux, Multiaddr, Transport};
use tracing::info;
Expand All @@ -13,6 +14,8 @@ pub async fn run() -> Result<()> {
let _ = dotenvy::dotenv();
let _guard = init_tracing();

let store = InMemoryStore::new();

let bridge_ma = fetch_bridge_multiaddr().await?;
let p2p_local_keypair = identity::Keypair::generate_ed25519();

Expand All @@ -28,6 +31,7 @@ pub async fn run() -> Result<()> {
p2p_local_keypair,
p2p_bootstrap_peers: vec![bridge_ma],
p2p_listen_on: vec![],
store,
})
.await
.unwrap();
Expand Down
16 changes: 8 additions & 8 deletions node/src/exchange.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use crate::exchange::client::ExchangeClientHandler;
use crate::exchange::server::ExchangeServerHandler;
use crate::p2p::P2pError;
use crate::peer_tracker::PeerTracker;
use crate::store::BoxedStore;
use crate::store::Store;
use crate::utils::{stream_protocol_id, OneshotResultSender};

/// Max request size in bytes
Expand All @@ -40,16 +40,16 @@ type ReqRespBehaviour = request_response::Behaviour<HeaderCodec>;
type ReqRespEvent = request_response::Event<HeaderRequest, Vec<HeaderResponse>>;
type ReqRespMessage = request_response::Message<HeaderRequest, Vec<HeaderResponse>>;

pub(crate) struct ExchangeBehaviour {
pub(crate) struct ExchangeBehaviour<S: Store + 'static> {
req_resp: ReqRespBehaviour,
client_handler: ExchangeClientHandler,
server_handler: ExchangeServerHandler,
server_handler: ExchangeServerHandler<S>,
}

pub(crate) struct ExchangeConfig<'a> {
pub(crate) struct ExchangeConfig<'a, S: Store> {
pub network_id: &'a str,
pub peer_tracker: Arc<PeerTracker>,
pub header_store: Arc<BoxedStore>,
pub header_store: Arc<S>,
}

#[derive(Debug, thiserror::Error)]
Expand All @@ -70,8 +70,8 @@ pub enum ExchangeError {
OutboundFailure(OutboundFailure),
}

impl ExchangeBehaviour {
pub(crate) fn new(config: ExchangeConfig<'_>) -> Self {
impl<S: Store + 'static> ExchangeBehaviour<S> {
pub(crate) fn new(config: ExchangeConfig<'_, S>) -> Self {
ExchangeBehaviour {
req_resp: ReqRespBehaviour::new(
[(
Expand Down Expand Up @@ -164,7 +164,7 @@ impl ExchangeBehaviour {
}
}

impl NetworkBehaviour for ExchangeBehaviour {
impl<S: Store + 'static> NetworkBehaviour for ExchangeBehaviour<S> {
type ConnectionHandler = <ReqRespBehaviour as NetworkBehaviour>::ConnectionHandler;
type ToSwarm = ();

Expand Down
10 changes: 5 additions & 5 deletions node/src/exchange/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,14 @@ use libp2p::{
};
use tracing::instrument;

use crate::store::BoxedStore;
use crate::store::Store;

pub(super) struct ExchangeServerHandler {
_store: Arc<BoxedStore>,
pub(super) struct ExchangeServerHandler<S: Store> {
_store: Arc<S>,
}

impl ExchangeServerHandler {
pub(super) fn new(store: Arc<BoxedStore>) -> Self {
impl<S: Store> ExchangeServerHandler<S> {
pub(super) fn new(store: Arc<S>) -> Self {
ExchangeServerHandler { _store: store }
}

Expand Down
48 changes: 31 additions & 17 deletions node/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
//! [`Store`]: crate::store::Store
//! [`Syncer`]: crate::syncer::Syncer

use std::convert::Infallible;
use std::marker::PhantomData;
use std::sync::Arc;

use libp2p::core::muxing::StreamMuxerBox;
Expand All @@ -12,48 +14,56 @@ use libp2p::identity::Keypair;
use libp2p::{Multiaddr, PeerId};

use crate::p2p::{P2p, P2pArgs, P2pService};
use crate::store::{BoxedStore, InMemoryStore};
use crate::store::Store;
use crate::syncer::{Syncer, SyncerArgs, SyncerService};

#[derive(Debug, thiserror::Error)]
pub enum NodeError<P2pSrv, SyncerSrv>
pub enum NodeError<P2pSrv, SyncerSrv, S>
where
P2pSrv: P2pService,
SyncerSrv: SyncerService<P2pSrv>,
S: Store,
P2pSrv: P2pService<S>,
SyncerSrv: SyncerService<P2pSrv, S>,
{
#[error(transparent)]
P2pService(P2pSrv::Error),

#[error(transparent)]
SyncerService(SyncerSrv::Error),

#[error("should not happen")]
_Unreachable(Infallible, PhantomData<S>),
}

pub struct NodeConfig {
pub struct NodeConfig<S: Store> {
pub network_id: String,
pub p2p_transport: Boxed<(PeerId, StreamMuxerBox)>,
pub p2p_local_keypair: Keypair,
pub p2p_bootstrap_peers: Vec<Multiaddr>,
pub p2p_listen_on: Vec<Multiaddr>,
pub store: S,
}

pub type Node = GenericNode<P2p, Syncer<P2p>>;
pub type Node<S> = GenericNode<P2p<S>, Syncer<P2p<S>, S>, S>;

pub struct GenericNode<P2pSrv, SyncerSrv>
pub struct GenericNode<P2pSrv, SyncerSrv, S>
where
P2pSrv: P2pService,
SyncerSrv: SyncerService<P2pSrv>,
S: Store,
P2pSrv: P2pService<S>,
SyncerSrv: SyncerService<P2pSrv, S>,
{
p2p: Arc<P2pSrv>,
syncer: Arc<SyncerSrv>,
_store: PhantomData<S>,
}

impl<P2pSrv, SyncerSrv> GenericNode<P2pSrv, SyncerSrv>
impl<P2pSrv, SyncerSrv, S> GenericNode<P2pSrv, SyncerSrv, S>
where
P2pSrv: P2pService,
SyncerSrv: SyncerService<P2pSrv>,
S: Store,
P2pSrv: P2pService<S>,
SyncerSrv: SyncerService<P2pSrv, S>,
{
pub async fn new(config: NodeConfig) -> Result<Self, NodeError<P2pSrv, SyncerSrv>> {
let store: Arc<BoxedStore> = Arc::new(Box::new(InMemoryStore::new()));
pub async fn new(config: NodeConfig<S>) -> Result<Self, NodeError<P2pSrv, SyncerSrv, S>> {
let store = Arc::new(config.store);

let p2p = Arc::new(
P2pSrv::start(P2pArgs {
Expand All @@ -77,14 +87,18 @@ where
.map_err(NodeError::SyncerService)?,
);

Ok(GenericNode { p2p, syncer })
Ok(GenericNode {
p2p,
syncer,
_store: PhantomData,
})
}

pub fn p2p(&self) -> &impl P2pService {
pub fn p2p(&self) -> &impl P2pService<S> {
&*self.p2p
}

pub fn syncer(&self) -> &impl SyncerService<P2pSrv> {
pub fn syncer(&self) -> &impl SyncerService<P2pSrv, S> {
&*self.syncer
}
}
45 changes: 28 additions & 17 deletions node/src/p2p.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::io;
use std::marker::PhantomData;
use std::sync::Arc;

use async_trait::async_trait;
Expand All @@ -22,7 +23,7 @@ use tracing::{debug, info, instrument, warn};
use crate::exchange::{ExchangeBehaviour, ExchangeConfig};
use crate::executor::{spawn, Executor};
use crate::peer_tracker::PeerTracker;
use crate::store::BoxedStore;
use crate::store::Store;
use crate::utils::{gossipsub_ident_topic, OneshotResultSender, OneshotSenderExt};
use crate::Service;

Expand Down Expand Up @@ -64,17 +65,18 @@ impl From<oneshot::error::RecvError> for P2pError {
}

#[derive(Debug)]
pub struct P2p {
pub struct P2p<S> {
cmd_tx: flume::Sender<P2pCmd>,
_store: PhantomData<S>,
}

pub struct P2pArgs {
pub struct P2pArgs<S> {
pub transport: Boxed<(PeerId, StreamMuxerBox)>,
pub network_id: String,
pub local_keypair: Keypair,
pub bootstrap_peers: Vec<Multiaddr>,
pub listen_on: Vec<Multiaddr>,
pub store: Arc<BoxedStore>,
pub store: Arc<S>,
}

#[doc(hidden)]
Expand All @@ -93,20 +95,23 @@ pub enum P2pCmd {
}

#[async_trait]
impl Service for P2p {
impl<S: Store + Sync + Send + 'static> Service for P2p<S> {
type Command = P2pCmd;
type Args = P2pArgs;
type Args = P2pArgs<S>;
type Error = P2pError;

async fn start(args: P2pArgs) -> Result<Self, P2pError> {
async fn start(args: P2pArgs<S>) -> Result<Self, P2pError> {
let (cmd_tx, cmd_rx) = flume::bounded(16);
let mut worker = Worker::new(args, cmd_rx)?;

spawn(async move {
worker.run().await;
});

Ok(P2p { cmd_tx })
Ok(P2p {
cmd_tx,
_store: PhantomData,
})
}

async fn stop(&self) -> Result<()> {
Expand All @@ -123,7 +128,7 @@ impl Service for P2p {
}

#[async_trait]
pub trait P2pService: Service<Args = P2pArgs, Command = P2pCmd, Error = P2pError> {
pub trait P2pService<S>: Service<Args = P2pArgs<S>, Command = P2pCmd, Error = P2pError> {
async fn wait_connected(&self) -> Result<()> {
let (tx, rx) = oneshot::channel();

Expand Down Expand Up @@ -213,27 +218,33 @@ pub trait P2pService: Service<Args = P2pArgs, Command = P2pCmd, Error = P2pError
}

#[async_trait]
impl P2pService for P2p {}
impl<S: Store + Sync + Send + 'static> P2pService<S> for P2p<S> {}

/// Our network behaviour.
#[derive(NetworkBehaviour)]
struct Behaviour {
struct Behaviour<S>
where
S: Store + 'static,
{
identify: identify::Behaviour,
header_ex: ExchangeBehaviour,
header_ex: ExchangeBehaviour<S>,
keep_alive: keep_alive::Behaviour,
gossipsub: gossipsub::Behaviour,
}

struct Worker {
swarm: Swarm<Behaviour>,
struct Worker<S: Store + 'static> {
swarm: Swarm<Behaviour<S>>,
header_sub_topic_hash: TopicHash,
cmd_rx: flume::Receiver<P2pCmd>,
peer_tracker: Arc<PeerTracker>,
wait_connected_tx: Option<Vec<oneshot::Sender<()>>>,
}

impl Worker {
fn new(args: P2pArgs, cmd_rx: flume::Receiver<P2pCmd>) -> Result<Self, P2pError> {
impl<S> Worker<S>
where
S: Store + 'static,
{
fn new(args: P2pArgs<S>, cmd_rx: flume::Receiver<P2pCmd>) -> Result<Self, P2pError> {
let peer_tracker = Arc::new(PeerTracker::new());
let local_peer_id = PeerId::from(args.local_keypair.public());

Expand Down Expand Up @@ -312,7 +323,7 @@ impl Worker {
#[instrument(level = "trace", skip(self))]
async fn on_swarm_event(
&mut self,
ev: SwarmEvent<BehaviourEvent, THandlerErr<Behaviour>>,
ev: SwarmEvent<BehaviourEvent<S>, THandlerErr<Behaviour<S>>>,
) -> Result<()> {
match ev {
SwarmEvent::Behaviour(ev) => match ev {
Expand Down
Loading

0 comments on commit af28a41

Please sign in to comment.