Skip to content

Commit

Permalink
Refactored P2P Service to implement RunnableService trait and use `…
Browse files Browse the repository at this point in the history
…ServiceRunner` (#872)

* Refactored P2P Service to implement `RunnableService` trait and use `ServiceRunner`.

* Apply PR comments

* update event matching and rename GetPeersIds to GetPeerIds

* Removed `async_trait` from `PeerToPeer`

* remove panic from service

* Moved `Database` from `fuel_core_txpool::SharedState` into `TxPool` because it is only consumer of it.

* add todo, fix error

Co-authored-by: Voxelot <brandonkite92@gmail.com>
  • Loading branch information
xgreenx and Voxelot authored Dec 23, 2022
1 parent bd3f691 commit 09e9c25
Show file tree
Hide file tree
Showing 23 changed files with 483 additions and 580 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions bin/fuel-core/src/cli/run/p2p.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ use fuel_core::{
p2p::{
config::{
convert_to_libp2p_keypair,
Config,
NotInitialized,
P2PConfig,
},
gossipsub_config::default_gossipsub_builder,
Multiaddr,
Expand Down Expand Up @@ -136,7 +136,7 @@ pub struct P2PArgs {
}

impl P2PArgs {
pub fn into_config(self, metrics: bool) -> anyhow::Result<P2PConfig<NotInitialized>> {
pub fn into_config(self, metrics: bool) -> anyhow::Result<Config<NotInitialized>> {
let local_keypair = {
match self.keypair {
Some(path) => {
Expand Down Expand Up @@ -179,7 +179,7 @@ impl P2PArgs {
Some(Duration::from_secs(self.random_walk))
};

Ok(P2PConfig {
Ok(Config {
keypair: local_keypair,
network_name: self.network,
checksum: Default::default(),
Expand Down
43 changes: 8 additions & 35 deletions crates/fuel-core/src/service/adapters.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,11 @@
use crate::{
database::Database,
service::{
modules::TxPoolService,
Config,
},
service::Config,
};
#[cfg(feature = "p2p")]
use fuel_core_p2p::service::Service as P2PService;
#[cfg(feature = "relayer")]
use fuel_core_relayer::RelayerSynced;
use fuel_core_txpool::service::SharedState as TxPoolSharedState;
use fuel_core_types::blockchain::SealedBlock;
use std::sync::Arc;
use tokio::{
sync::broadcast::Sender,
task::JoinHandle,
};
use tokio::sync::broadcast::Sender;

pub mod poa;
pub mod producer;
Expand All @@ -30,11 +21,11 @@ pub struct BlockImportAdapter {
}

pub struct TxPoolAdapter {
service: TxPoolService,
service: TxPoolSharedState<P2PAdapter, Database>,
}

impl TxPoolAdapter {
pub fn new(service: TxPoolService) -> Self {
pub fn new(service: TxPoolSharedState<P2PAdapter, Database>) -> Self {
Self { service }
}
}
Expand All @@ -47,7 +38,7 @@ pub struct ExecutorAdapter {
pub struct MaybeRelayerAdapter {
pub database: Database,
#[cfg(feature = "relayer")]
pub relayer_synced: Option<RelayerSynced>,
pub relayer_synced: Option<fuel_core_relayer::RelayerSynced>,
}

pub struct BlockProducerAdapter {
Expand All @@ -57,7 +48,7 @@ pub struct BlockProducerAdapter {
#[cfg(feature = "p2p")]
#[derive(Clone)]
pub struct P2PAdapter {
service: Arc<P2PService>,
service: fuel_core_p2p::service::SharedState,
}

#[cfg(not(feature = "p2p"))]
Expand All @@ -66,32 +57,14 @@ pub struct P2PAdapter;

#[cfg(feature = "p2p")]
impl P2PAdapter {
pub fn new(service: Arc<P2PService>) -> Self {
pub fn new(service: fuel_core_p2p::service::SharedState) -> Self {
Self { service }
}

pub async fn stop(&self) -> Option<JoinHandle<()>> {
self.service.stop().await
}

pub async fn start(&self) -> anyhow::Result<()> {
self.service.start().await
}
}

#[cfg(not(feature = "p2p"))]
impl P2PAdapter {
pub fn new() -> Self {
Default::default()
}

pub async fn stop(&self) -> Option<JoinHandle<()>> {
None
}

pub async fn start(&self) -> anyhow::Result<()> {
Ok(())
}
}

// TODO: Create generic `Service` type that support `start` and `stop`.
8 changes: 4 additions & 4 deletions crates/fuel-core/src/service/adapters/poa.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,15 @@ use fuel_core_types::{

impl TransactionPool for TxPoolAdapter {
fn pending_number(&self) -> usize {
self.service.shared.pending_number()
self.service.pending_number()
}

fn total_consumable_gas(&self) -> u64 {
self.service.shared.total_consumable_gas()
self.service.total_consumable_gas()
}

fn remove_txs(&self, ids: Vec<TxId>) -> Vec<ArcPoolTx> {
self.service.shared.remove_txs(ids)
self.service.remove_txs(ids)
}

fn transaction_status_events(&self) -> BoxStream<TxStatus> {
Expand All @@ -44,7 +44,7 @@ impl TransactionPool for TxPoolAdapter {
StreamExt,
};
Box::pin(
BroadcastStream::new(self.service.shared.tx_status_subscribe())
BroadcastStream::new(self.service.tx_status_subscribe())
.filter_map(|result| result.ok()),
)
}
Expand Down
2 changes: 1 addition & 1 deletion crates/fuel-core/src/service/adapters/producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ impl TxPool for TxPoolAdapter {
_block_height: BlockHeight,
max_gas: u64,
) -> Vec<ArcPoolTx> {
self.service.shared.select_transactions(max_gas)
self.service.select_transactions(max_gas)
}
}

Expand Down
14 changes: 5 additions & 9 deletions crates/fuel-core/src/service/adapters/txpool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ use crate::service::adapters::{
BlockImportAdapter,
P2PAdapter,
};
use async_trait::async_trait;
use fuel_core_services::stream::BoxStream;
use fuel_core_txpool::ports::BlockImport;
use fuel_core_types::{
Expand Down Expand Up @@ -35,7 +34,6 @@ impl BlockImport for BlockImportAdapter {
}

#[cfg(feature = "p2p")]
#[async_trait]
impl fuel_core_txpool::ports::PeerToPeer for P2PAdapter {
type GossipedTransaction = TransactionGossipData;

Expand All @@ -54,19 +52,17 @@ impl fuel_core_txpool::ports::PeerToPeer for P2PAdapter {
)
}

async fn notify_gossip_transaction_validity(
fn notify_gossip_transaction_validity(
&self,
message: &Self::GossipedTransaction,
validity: GossipsubMessageAcceptance,
) {
) -> anyhow::Result<()> {
self.service
.notify_gossip_transaction_validity(message, validity)
.await;
}
}

#[cfg(not(feature = "p2p"))]
#[async_trait]
impl fuel_core_txpool::ports::PeerToPeer for P2PAdapter {
type GossipedTransaction = TransactionGossipData;

Expand All @@ -81,11 +77,11 @@ impl fuel_core_txpool::ports::PeerToPeer for P2PAdapter {
Box::pin(fuel_core_services::stream::pending())
}

async fn notify_gossip_transaction_validity(
fn notify_gossip_transaction_validity(
&self,
_message: &Self::GossipedTransaction,
_validity: GossipsubMessageAcceptance,
) {
// no-op
) -> anyhow::Result<()> {
Ok(())
}
}
2 changes: 1 addition & 1 deletion crates/fuel-core/src/service/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ use strum_macros::{

#[cfg(feature = "p2p")]
use fuel_core_p2p::config::{
Config as P2PConfig,
NotInitialized,
P2PConfig,
};

#[derive(Clone, Debug)]
Expand Down
23 changes: 13 additions & 10 deletions crates/fuel-core/src/service/modules.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@ pub type PoAService =
fuel_core_poa::Service<Database, TxPoolAdapter, BlockProducerAdapter>;
#[cfg(feature = "relayer")]
pub type RelayerService = fuel_core_relayer::Service<Database>;
pub type TxPoolService = fuel_core_txpool::Service<P2PAdapter>;
#[cfg(feature = "p2p")]
pub type P2PService = fuel_core_p2p::service::Service<Database>;
pub type TxPoolService = fuel_core_txpool::Service<P2PAdapter, Database>;

pub struct Modules {
pub txpool: TxPoolService,
Expand All @@ -36,15 +38,15 @@ pub struct Modules {
#[cfg(feature = "relayer")]
pub relayer: Option<RelayerService>,
#[cfg(feature = "p2p")]
pub network_service: P2PAdapter,
pub network_service: P2PService,
}

impl Modules {
pub async fn stop(&self) {
self.consensus_module.stop_and_await().await.unwrap();
self.txpool.stop_and_await().await.unwrap();
#[cfg(feature = "p2p")]
self.network_service.stop().await;
self.network_service.stop_and_await().await.unwrap();
}
}

Expand All @@ -71,16 +73,15 @@ pub async fn start_modules(
let genesis = p2p_db.get_genesis()?;
let p2p_config = config.p2p.clone().init(genesis)?;

Arc::new(fuel_core_p2p::service::Service::new(p2p_config, p2p_db))
fuel_core_p2p::service::new_service(p2p_config, p2p_db)
};

#[cfg(feature = "p2p")]
let p2p_adapter = P2PAdapter::new(network_service);
let p2p_adapter = P2PAdapter::new(network_service.shared.clone());
#[cfg(not(feature = "p2p"))]
let p2p_adapter = P2PAdapter::new();

let p2p_adapter = p2p_adapter;
p2p_adapter.start().await?;

let importer_adapter = BlockImportAdapter::new(block_import_tx);

Expand All @@ -89,7 +90,7 @@ pub async fn start_modules(
database.clone(),
TxStatusChange::new(100),
importer_adapter.clone(),
p2p_adapter.clone(),
p2p_adapter,
);

// restrict the max number of concurrent dry runs to the number of CPUs
Expand All @@ -98,7 +99,7 @@ pub async fn start_modules(
let block_producer = Arc::new(fuel_core_producer::Producer {
config: config.block_producer.clone(),
db: database.clone(),
txpool: Box::new(TxPoolAdapter::new(txpool_service.clone())),
txpool: Box::new(TxPoolAdapter::new(txpool_service.shared.clone())),
executor: Arc::new(ExecutorAdapter {
database: database.clone(),
config: config.clone(),
Expand All @@ -122,7 +123,7 @@ pub async fn start_modules(
signing_key: config.consensus_key.clone(),
metrics: false,
},
TxPoolAdapter::new(txpool_service.clone()),
TxPoolAdapter::new(txpool_service.shared.clone()),
// TODO: Pass Importer
importer_adapter.tx,
BlockProducerAdapter {
Expand All @@ -138,6 +139,8 @@ pub async fn start_modules(
relayer.start().expect("Should start relayer")
}
txpool_service.start()?;
#[cfg(feature = "p2p")]
network_service.start()?;

Ok(Modules {
txpool: txpool_service,
Expand All @@ -146,6 +149,6 @@ pub async fn start_modules(
#[cfg(feature = "relayer")]
relayer,
#[cfg(feature = "p2p")]
network_service: p2p_adapter,
network_service,
})
}
1 change: 1 addition & 0 deletions crates/services/p2p/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ async-trait = "0.1"
bincode = "1.3"
fuel-core-chain-config = { path = "../../chain-config", version = "0.15.1" }
fuel-core-metrics = { path = "../../metrics", version = "0.15.1" } # TODO make this a feature
fuel-core-services = { path = "../../services", version = "0.15.1" }
fuel-core-storage = { path = "../../storage", version = "0.15.1" }
fuel-core-types = { path = "../../types", features = [
"serde",
Expand Down
4 changes: 2 additions & 2 deletions crates/services/p2p/src/behavior.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::{
codecs::NetworkCodec,
config::P2PConfig,
config::Config,
discovery::{
DiscoveryBehaviour,
DiscoveryConfig,
Expand Down Expand Up @@ -71,7 +71,7 @@ pub struct FuelBehaviour<Codec: NetworkCodec> {
}

impl<Codec: NetworkCodec> FuelBehaviour<Codec> {
pub fn new(p2p_config: &P2PConfig, codec: Codec) -> Self {
pub fn new(p2p_config: &Config, codec: Codec) -> Self {
let local_public_key = p2p_config.keypair.public();
let local_peer_id = PeerId::from_public_key(&local_public_key);

Expand Down
16 changes: 8 additions & 8 deletions crates/services/p2p/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ impl From<[u8; 32]> for Checksum {
}

#[derive(Clone, Debug)]
pub struct P2PConfig<State = Initialized> {
pub struct Config<State = Initialized> {
/// The keypair used for for handshake during communication with other p2p nodes.
pub keypair: Keypair,

Expand Down Expand Up @@ -155,12 +155,12 @@ pub struct Initialized(());
#[derive(Clone, Debug)]
pub struct NotInitialized;

impl P2PConfig<NotInitialized> {
impl Config<NotInitialized> {
/// Inits the `P2PConfig` with some lazily loaded data.
pub fn init(self, mut genesis: Genesis) -> anyhow::Result<P2PConfig<Initialized>> {
pub fn init(self, mut genesis: Genesis) -> anyhow::Result<Config<Initialized>> {
use fuel_core_chain_config::GenesisCommitment;

Ok(P2PConfig {
Ok(Config {
keypair: self.keypair,
network_name: self.network_name,
checksum: genesis.root()?.into(),
Expand Down Expand Up @@ -198,7 +198,7 @@ pub fn convert_to_libp2p_keypair(
Ok(Keypair::Secp256k1(secret_key.into()))
}

impl P2PConfig<NotInitialized> {
impl Config<NotInitialized> {
pub fn default(network_name: &str) -> Self {
let keypair = Keypair::generate_secp256k1();

Expand Down Expand Up @@ -235,9 +235,9 @@ impl P2PConfig<NotInitialized> {
}

#[cfg(any(feature = "test-helpers", test))]
impl P2PConfig<Initialized> {
impl Config<Initialized> {
pub fn default_initialized(network_name: &str) -> Self {
P2PConfig::<NotInitialized>::default(network_name)
Config::<NotInitialized>::default(network_name)
.init(Default::default())
.expect("Expected correct initialization of config")
}
Expand All @@ -247,7 +247,7 @@ impl P2PConfig<Initialized> {
/// TCP/IP, Websocket
/// Noise as encryption layer
/// mplex or yamux for multiplexing
pub(crate) fn build_transport(p2p_config: &P2PConfig) -> Boxed<(PeerId, StreamMuxerBox)> {
pub(crate) fn build_transport(p2p_config: &Config) -> Boxed<(PeerId, StreamMuxerBox)> {
let transport = {
let generate_tcp_transport =
|| TokioTcpTransport::new(TcpConfig::new().port_reuse(true).nodelay(true));
Expand Down
Loading

0 comments on commit 09e9c25

Please sign in to comment.