diff --git a/Cargo.lock b/Cargo.lock index 863f0440a38..a47f79a0cc2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2373,6 +2373,7 @@ dependencies = [ "ctor", "fuel-core-chain-config", "fuel-core-metrics", + "fuel-core-services", "fuel-core-storage", "fuel-core-types", "futures", diff --git a/bin/fuel-core/src/cli/run/p2p.rs b/bin/fuel-core/src/cli/run/p2p.rs index 833e34549d2..d23e030292c 100644 --- a/bin/fuel-core/src/cli/run/p2p.rs +++ b/bin/fuel-core/src/cli/run/p2p.rs @@ -3,8 +3,8 @@ use fuel_core::{ p2p::{ config::{ convert_to_libp2p_keypair, + Config, NotInitialized, - P2PConfig, }, gossipsub_config::default_gossipsub_builder, Multiaddr, @@ -136,7 +136,7 @@ pub struct P2PArgs { } impl P2PArgs { - pub fn into_config(self, metrics: bool) -> anyhow::Result> { + pub fn into_config(self, metrics: bool) -> anyhow::Result> { let local_keypair = { match self.keypair { Some(path) => { @@ -179,7 +179,7 @@ impl P2PArgs { Some(Duration::from_secs(self.random_walk)) }; - Ok(P2PConfig { + Ok(Config { keypair: local_keypair, network_name: self.network, checksum: Default::default(), diff --git a/crates/fuel-core/src/service/adapters.rs b/crates/fuel-core/src/service/adapters.rs index 2013d5c89ca..4017bc0c48e 100644 --- a/crates/fuel-core/src/service/adapters.rs +++ b/crates/fuel-core/src/service/adapters.rs @@ -1,20 +1,11 @@ use crate::{ database::Database, - service::{ - modules::TxPoolService, - Config, - }, + service::Config, }; -#[cfg(feature = "p2p")] -use fuel_core_p2p::service::Service as P2PService; -#[cfg(feature = "relayer")] -use fuel_core_relayer::RelayerSynced; +use fuel_core_txpool::service::SharedState as TxPoolSharedState; use fuel_core_types::blockchain::SealedBlock; use std::sync::Arc; -use tokio::{ - sync::broadcast::Sender, - task::JoinHandle, -}; +use tokio::sync::broadcast::Sender; pub mod poa; pub mod producer; @@ -30,11 +21,11 @@ pub struct BlockImportAdapter { } pub struct TxPoolAdapter { - service: TxPoolService, + service: TxPoolSharedState, } impl TxPoolAdapter { - pub fn new(service: TxPoolService) -> Self { + pub fn new(service: TxPoolSharedState) -> Self { Self { service } } } @@ -47,7 +38,7 @@ pub struct ExecutorAdapter { pub struct MaybeRelayerAdapter { pub database: Database, #[cfg(feature = "relayer")] - pub relayer_synced: Option, + pub relayer_synced: Option, } pub struct BlockProducerAdapter { @@ -57,7 +48,7 @@ pub struct BlockProducerAdapter { #[cfg(feature = "p2p")] #[derive(Clone)] pub struct P2PAdapter { - service: Arc, + service: fuel_core_p2p::service::SharedState, } #[cfg(not(feature = "p2p"))] @@ -66,17 +57,9 @@ pub struct P2PAdapter; #[cfg(feature = "p2p")] impl P2PAdapter { - pub fn new(service: Arc) -> Self { + pub fn new(service: fuel_core_p2p::service::SharedState) -> Self { Self { service } } - - pub async fn stop(&self) -> Option> { - self.service.stop().await - } - - pub async fn start(&self) -> anyhow::Result<()> { - self.service.start().await - } } #[cfg(not(feature = "p2p"))] @@ -84,14 +67,4 @@ impl P2PAdapter { pub fn new() -> Self { Default::default() } - - pub async fn stop(&self) -> Option> { - None - } - - pub async fn start(&self) -> anyhow::Result<()> { - Ok(()) - } } - -// TODO: Create generic `Service` type that support `start` and `stop`. diff --git a/crates/fuel-core/src/service/adapters/poa.rs b/crates/fuel-core/src/service/adapters/poa.rs index 36811d62028..e4a2886cefe 100644 --- a/crates/fuel-core/src/service/adapters/poa.rs +++ b/crates/fuel-core/src/service/adapters/poa.rs @@ -27,15 +27,15 @@ use fuel_core_types::{ impl TransactionPool for TxPoolAdapter { fn pending_number(&self) -> usize { - self.service.shared.pending_number() + self.service.pending_number() } fn total_consumable_gas(&self) -> u64 { - self.service.shared.total_consumable_gas() + self.service.total_consumable_gas() } fn remove_txs(&self, ids: Vec) -> Vec { - self.service.shared.remove_txs(ids) + self.service.remove_txs(ids) } fn transaction_status_events(&self) -> BoxStream { @@ -44,7 +44,7 @@ impl TransactionPool for TxPoolAdapter { StreamExt, }; Box::pin( - BroadcastStream::new(self.service.shared.tx_status_subscribe()) + BroadcastStream::new(self.service.tx_status_subscribe()) .filter_map(|result| result.ok()), ) } diff --git a/crates/fuel-core/src/service/adapters/producer.rs b/crates/fuel-core/src/service/adapters/producer.rs index 56d1736d73a..6fb81cbf8b9 100644 --- a/crates/fuel-core/src/service/adapters/producer.rs +++ b/crates/fuel-core/src/service/adapters/producer.rs @@ -35,7 +35,7 @@ impl TxPool for TxPoolAdapter { _block_height: BlockHeight, max_gas: u64, ) -> Vec { - self.service.shared.select_transactions(max_gas) + self.service.select_transactions(max_gas) } } diff --git a/crates/fuel-core/src/service/adapters/txpool.rs b/crates/fuel-core/src/service/adapters/txpool.rs index 46ecf884f73..d79577366b4 100644 --- a/crates/fuel-core/src/service/adapters/txpool.rs +++ b/crates/fuel-core/src/service/adapters/txpool.rs @@ -2,7 +2,6 @@ use crate::service::adapters::{ BlockImportAdapter, P2PAdapter, }; -use async_trait::async_trait; use fuel_core_services::stream::BoxStream; use fuel_core_txpool::ports::BlockImport; use fuel_core_types::{ @@ -35,7 +34,6 @@ impl BlockImport for BlockImportAdapter { } #[cfg(feature = "p2p")] -#[async_trait] impl fuel_core_txpool::ports::PeerToPeer for P2PAdapter { type GossipedTransaction = TransactionGossipData; @@ -54,19 +52,17 @@ impl fuel_core_txpool::ports::PeerToPeer for P2PAdapter { ) } - async fn notify_gossip_transaction_validity( + fn notify_gossip_transaction_validity( &self, message: &Self::GossipedTransaction, validity: GossipsubMessageAcceptance, - ) { + ) -> anyhow::Result<()> { self.service .notify_gossip_transaction_validity(message, validity) - .await; } } #[cfg(not(feature = "p2p"))] -#[async_trait] impl fuel_core_txpool::ports::PeerToPeer for P2PAdapter { type GossipedTransaction = TransactionGossipData; @@ -81,11 +77,11 @@ impl fuel_core_txpool::ports::PeerToPeer for P2PAdapter { Box::pin(fuel_core_services::stream::pending()) } - async fn notify_gossip_transaction_validity( + fn notify_gossip_transaction_validity( &self, _message: &Self::GossipedTransaction, _validity: GossipsubMessageAcceptance, - ) { - // no-op + ) -> anyhow::Result<()> { + Ok(()) } } diff --git a/crates/fuel-core/src/service/config.rs b/crates/fuel-core/src/service/config.rs index b08096ba619..96997bc3e8b 100644 --- a/crates/fuel-core/src/service/config.rs +++ b/crates/fuel-core/src/service/config.rs @@ -19,8 +19,8 @@ use strum_macros::{ #[cfg(feature = "p2p")] use fuel_core_p2p::config::{ + Config as P2PConfig, NotInitialized, - P2PConfig, }; #[derive(Clone, Debug)] diff --git a/crates/fuel-core/src/service/modules.rs b/crates/fuel-core/src/service/modules.rs index 41a75673750..e86718e1ec8 100644 --- a/crates/fuel-core/src/service/modules.rs +++ b/crates/fuel-core/src/service/modules.rs @@ -27,7 +27,9 @@ pub type PoAService = fuel_core_poa::Service; #[cfg(feature = "relayer")] pub type RelayerService = fuel_core_relayer::Service; -pub type TxPoolService = fuel_core_txpool::Service; +#[cfg(feature = "p2p")] +pub type P2PService = fuel_core_p2p::service::Service; +pub type TxPoolService = fuel_core_txpool::Service; pub struct Modules { pub txpool: TxPoolService, @@ -36,7 +38,7 @@ pub struct Modules { #[cfg(feature = "relayer")] pub relayer: Option, #[cfg(feature = "p2p")] - pub network_service: P2PAdapter, + pub network_service: P2PService, } impl Modules { @@ -44,7 +46,7 @@ impl Modules { self.consensus_module.stop_and_await().await.unwrap(); self.txpool.stop_and_await().await.unwrap(); #[cfg(feature = "p2p")] - self.network_service.stop().await; + self.network_service.stop_and_await().await.unwrap(); } } @@ -71,16 +73,15 @@ pub async fn start_modules( let genesis = p2p_db.get_genesis()?; let p2p_config = config.p2p.clone().init(genesis)?; - Arc::new(fuel_core_p2p::service::Service::new(p2p_config, p2p_db)) + fuel_core_p2p::service::new_service(p2p_config, p2p_db) }; #[cfg(feature = "p2p")] - let p2p_adapter = P2PAdapter::new(network_service); + let p2p_adapter = P2PAdapter::new(network_service.shared.clone()); #[cfg(not(feature = "p2p"))] let p2p_adapter = P2PAdapter::new(); let p2p_adapter = p2p_adapter; - p2p_adapter.start().await?; let importer_adapter = BlockImportAdapter::new(block_import_tx); @@ -89,7 +90,7 @@ pub async fn start_modules( database.clone(), TxStatusChange::new(100), importer_adapter.clone(), - p2p_adapter.clone(), + p2p_adapter, ); // restrict the max number of concurrent dry runs to the number of CPUs @@ -98,7 +99,7 @@ pub async fn start_modules( let block_producer = Arc::new(fuel_core_producer::Producer { config: config.block_producer.clone(), db: database.clone(), - txpool: Box::new(TxPoolAdapter::new(txpool_service.clone())), + txpool: Box::new(TxPoolAdapter::new(txpool_service.shared.clone())), executor: Arc::new(ExecutorAdapter { database: database.clone(), config: config.clone(), @@ -122,7 +123,7 @@ pub async fn start_modules( signing_key: config.consensus_key.clone(), metrics: false, }, - TxPoolAdapter::new(txpool_service.clone()), + TxPoolAdapter::new(txpool_service.shared.clone()), // TODO: Pass Importer importer_adapter.tx, BlockProducerAdapter { @@ -138,6 +139,8 @@ pub async fn start_modules( relayer.start().expect("Should start relayer") } txpool_service.start()?; + #[cfg(feature = "p2p")] + network_service.start()?; Ok(Modules { txpool: txpool_service, @@ -146,6 +149,6 @@ pub async fn start_modules( #[cfg(feature = "relayer")] relayer, #[cfg(feature = "p2p")] - network_service: p2p_adapter, + network_service, }) } diff --git a/crates/services/p2p/Cargo.toml b/crates/services/p2p/Cargo.toml index 28ce04e69f7..f9b03b9e394 100644 --- a/crates/services/p2p/Cargo.toml +++ b/crates/services/p2p/Cargo.toml @@ -16,6 +16,7 @@ async-trait = "0.1" bincode = "1.3" fuel-core-chain-config = { path = "../../chain-config", version = "0.15.1" } fuel-core-metrics = { path = "../../metrics", version = "0.15.1" } # TODO make this a feature +fuel-core-services = { path = "../../services", version = "0.15.1" } fuel-core-storage = { path = "../../storage", version = "0.15.1" } fuel-core-types = { path = "../../types", features = [ "serde", diff --git a/crates/services/p2p/src/behavior.rs b/crates/services/p2p/src/behavior.rs index b0e9a67fa53..1becf42d4f9 100644 --- a/crates/services/p2p/src/behavior.rs +++ b/crates/services/p2p/src/behavior.rs @@ -1,6 +1,6 @@ use crate::{ codecs::NetworkCodec, - config::P2PConfig, + config::Config, discovery::{ DiscoveryBehaviour, DiscoveryConfig, @@ -71,7 +71,7 @@ pub struct FuelBehaviour { } impl FuelBehaviour { - pub fn new(p2p_config: &P2PConfig, codec: Codec) -> Self { + pub fn new(p2p_config: &Config, codec: Codec) -> Self { let local_public_key = p2p_config.keypair.public(); let local_peer_id = PeerId::from_public_key(&local_public_key); diff --git a/crates/services/p2p/src/config.rs b/crates/services/p2p/src/config.rs index 201f881223e..8268633eb3a 100644 --- a/crates/services/p2p/src/config.rs +++ b/crates/services/p2p/src/config.rs @@ -85,7 +85,7 @@ impl From<[u8; 32]> for Checksum { } #[derive(Clone, Debug)] -pub struct P2PConfig { +pub struct Config { /// The keypair used for for handshake during communication with other p2p nodes. pub keypair: Keypair, @@ -155,12 +155,12 @@ pub struct Initialized(()); #[derive(Clone, Debug)] pub struct NotInitialized; -impl P2PConfig { +impl Config { /// Inits the `P2PConfig` with some lazily loaded data. - pub fn init(self, mut genesis: Genesis) -> anyhow::Result> { + pub fn init(self, mut genesis: Genesis) -> anyhow::Result> { use fuel_core_chain_config::GenesisCommitment; - Ok(P2PConfig { + Ok(Config { keypair: self.keypair, network_name: self.network_name, checksum: genesis.root()?.into(), @@ -198,7 +198,7 @@ pub fn convert_to_libp2p_keypair( Ok(Keypair::Secp256k1(secret_key.into())) } -impl P2PConfig { +impl Config { pub fn default(network_name: &str) -> Self { let keypair = Keypair::generate_secp256k1(); @@ -235,9 +235,9 @@ impl P2PConfig { } #[cfg(any(feature = "test-helpers", test))] -impl P2PConfig { +impl Config { pub fn default_initialized(network_name: &str) -> Self { - P2PConfig::::default(network_name) + Config::::default(network_name) .init(Default::default()) .expect("Expected correct initialization of config") } @@ -247,7 +247,7 @@ impl P2PConfig { /// TCP/IP, Websocket /// Noise as encryption layer /// mplex or yamux for multiplexing -pub(crate) fn build_transport(p2p_config: &P2PConfig) -> Boxed<(PeerId, StreamMuxerBox)> { +pub(crate) fn build_transport(p2p_config: &Config) -> Boxed<(PeerId, StreamMuxerBox)> { let transport = { let generate_tcp_transport = || TokioTcpTransport::new(TcpConfig::new().port_reuse(true).nodelay(true)); diff --git a/crates/services/p2p/src/gossipsub/config.rs b/crates/services/p2p/src/gossipsub/config.rs index 6b4c1273bf7..db43b5ffeae 100644 --- a/crates/services/p2p/src/gossipsub/config.rs +++ b/crates/services/p2p/src/gossipsub/config.rs @@ -1,4 +1,4 @@ -use crate::config::P2PConfig; +use crate::config::Config; use fuel_core_metrics::p2p_metrics::P2P_METRICS; use libp2p::gossipsub::{ metrics::Config as MetricsConfig, @@ -52,7 +52,7 @@ pub(crate) fn default_gossipsub_config() -> GossipsubConfig { } /// Given a `P2pConfig` containing `GossipsubConfig` creates a Gossipsub Behaviour -pub(crate) fn build_gossipsub_behaviour(p2p_config: &P2PConfig) -> Gossipsub { +pub(crate) fn build_gossipsub_behaviour(p2p_config: &Config) -> Gossipsub { if p2p_config.metrics { // Move to Metrics related feature flag let mut p2p_registry = Registry::default(); diff --git a/crates/services/p2p/src/p2p_service.rs b/crates/services/p2p/src/p2p_service.rs index 35d323ac89c..8b2d3ede36a 100644 --- a/crates/services/p2p/src/p2p_service.rs +++ b/crates/services/p2p/src/p2p_service.rs @@ -6,7 +6,7 @@ use crate::{ codecs::NetworkCodec, config::{ build_transport, - P2PConfig, + Config, }, discovery::DiscoveryEvent, gossipsub::{ @@ -70,6 +70,13 @@ use tracing::{ pub struct FuelP2PService { /// Store the local peer id pub local_peer_id: PeerId, + + /// IP address for Swarm to listen on + local_address: std::net::IpAddr, + + /// The TCP port that Swarm listens on + tcp_port: u16, + /// Swarm handler for FuelBehaviour swarm: Swarm>, @@ -118,7 +125,7 @@ pub enum FuelP2PEvent { } impl FuelP2PService { - pub fn new(config: P2PConfig, codec: Codec) -> anyhow::Result { + pub fn new(config: Config, codec: Codec) -> Self { let local_peer_id = PeerId::from(config.keypair.public()); // configure and build P2P Service @@ -128,22 +135,12 @@ impl FuelP2PService { SwarmBuilder::with_tokio_executor(transport, behaviour, local_peer_id) .build(); - // set up node's address to listen on - let listen_multiaddr = { - let mut m = Multiaddr::from(config.address); - m.push(Protocol::Tcp(config.tcp_port)); - m - }; - // subscribe to gossipsub topics with the network name suffix for topic in config.topics { let t = Topic::new(format!("{}/{}", topic, config.network_name)); swarm.behaviour_mut().subscribe_to_topic(&t).unwrap(); } - // start listening at the given address - swarm.listen_on(listen_multiaddr)?; - let gossipsub_topics = GossipsubTopics::new(&config.network_name); let network_metadata = NetworkMetadata { gossipsub_topics }; @@ -153,15 +150,30 @@ impl FuelP2PService { let _ = swarm.add_external_address(public_address, AddressScore::Infinite); } - Ok(Self { + Self { local_peer_id, + local_address: config.address, + tcp_port: config.tcp_port, swarm, network_codec: codec, outbound_requests_table: HashMap::default(), inbound_requests_table: HashMap::default(), network_metadata, metrics, - }) + } + } + + pub fn start(&mut self) -> anyhow::Result<()> { + // set up node's address to listen on + let listen_multiaddr = { + let mut m = Multiaddr::from(self.local_address); + m.push(Protocol::Tcp(self.tcp_port)); + m + }; + + // start listening at the given address + self.swarm.listen_on(listen_multiaddr)?; + Ok(()) } pub fn get_peers_info(&self) -> &HashMap { @@ -279,6 +291,8 @@ impl FuelP2PService { /// Handles P2P Events. /// Returns only events that are of interest to the Network Orchestrator. pub async fn next_event(&mut self) -> Option { + // TODO: add handling for when the stream closes and return None only when there are no + // more events to consume if let SwarmEvent::Behaviour(fuel_behaviour) = self.swarm.select_next_some().await { self.handle_behaviour_event(fuel_behaviour) @@ -438,7 +452,7 @@ mod tests { use super::FuelP2PService; use crate::{ codecs::bincode::BincodeCodec, - config::P2PConfig, + config::Config, gossipsub::{ messages::{ GossipsubBroadcastRequest, @@ -532,13 +546,14 @@ mod tests { } /// helper function for building FuelP2PService - fn build_service_from_config( - mut p2p_config: P2PConfig, - ) -> FuelP2PService { + fn build_service_from_config(mut p2p_config: Config) -> FuelP2PService { p2p_config.keypair = Keypair::generate_secp256k1(); // change keypair for each Node let max_block_size = p2p_config.max_block_size; - FuelP2PService::new(p2p_config, BincodeCodec::new(max_block_size)).unwrap() + let mut service = + FuelP2PService::new(p2p_config, BincodeCodec::new(max_block_size)); + service.start().unwrap(); + service } /// returns a free tcp port number for a node to listen on @@ -583,24 +598,23 @@ mod tests { } /// Combines `NodeData` with `P2pConfig` to create a `FuelP2PService` - fn create_service( - &self, - mut p2p_config: P2PConfig, - ) -> FuelP2PService { + fn create_service(&self, mut p2p_config: Config) -> FuelP2PService { let max_block_size = p2p_config.max_block_size; p2p_config.tcp_port = self.tcp_port; p2p_config.keypair = self.keypair.clone(); - FuelP2PService::new(p2p_config, BincodeCodec::new(max_block_size)).unwrap() + let mut service = + FuelP2PService::new(p2p_config, BincodeCodec::new(max_block_size)); + service.start().unwrap(); + service } } #[tokio::test] #[instrument] async fn p2p_service_works() { - let mut fuel_p2p_service = build_service_from_config( - P2PConfig::default_initialized("p2p_service_works"), - ); + let mut fuel_p2p_service = + build_service_from_config(Config::default_initialized("p2p_service_works")); loop { match fuel_p2p_service.swarm.select_next_some().await { @@ -625,7 +639,7 @@ mod tests { let reserved_nodes_size = 4; let double_reserved_nodes_size = reserved_nodes_size * 2; - let mut p2p_config = P2PConfig::default_initialized("sentry_nodes_working"); + let mut p2p_config = Config::default_initialized("sentry_nodes_working"); // enable mdns for faster discovery of nodes p2p_config.enable_mdns = true; @@ -742,7 +756,7 @@ mod tests { #[instrument] async fn nodes_connected_via_mdns() { // Node A - let mut p2p_config = P2PConfig::default_initialized("nodes_connected_via_mdns"); + let mut p2p_config = Config::default_initialized("nodes_connected_via_mdns"); p2p_config.enable_mdns = true; let mut node_a = build_service_from_config(p2p_config.clone()); @@ -776,9 +790,8 @@ mod tests { TransportError, }; // Node A - let mut p2p_config = P2PConfig::default_initialized( - "nodes_cannot_connect_due_to_different_checksum", - ); + let mut p2p_config = + Config::default_initialized("nodes_cannot_connect_due_to_different_checksum"); p2p_config.enable_mdns = true; let mut node_a = build_service_from_config(p2p_config.clone()); @@ -815,8 +828,7 @@ mod tests { #[instrument] async fn nodes_connected_via_identify() { // Node A - let mut p2p_config = - P2PConfig::default_initialized("nodes_connected_via_identify"); + let mut p2p_config = Config::default_initialized("nodes_connected_via_identify"); let node_a_data = NodeData::random(); let mut node_a = node_a_data.create_service(p2p_config.clone()); @@ -855,7 +867,7 @@ mod tests { #[tokio::test] #[instrument] async fn peer_info_updates_work() { - let mut p2p_config = P2PConfig::default_initialized("peer_info_updates_work"); + let mut p2p_config = Config::default_initialized("peer_info_updates_work"); // Node A let node_a_data = NodeData::random(); @@ -918,8 +930,7 @@ mod tests { /// Reusable helper function for Broadcasting Gossipsub requests async fn gossipsub_broadcast(broadcast_request: GossipsubBroadcastRequest) { - let mut p2p_config = - P2PConfig::default_initialized("gossipsub_exchanges_messages"); + let mut p2p_config = Config::default_initialized("gossipsub_exchanges_messages"); let selected_topic: GossipTopic = { let topic = match broadcast_request { @@ -1006,7 +1017,7 @@ mod tests { async fn request_response_works() { use fuel_core_types::fuel_tx::Transaction; - let mut p2p_config = P2PConfig::default_initialized("request_response_works"); + let mut p2p_config = Config::default_initialized("request_response_works"); // Node A let node_a_data = NodeData::random(); @@ -1082,7 +1093,7 @@ mod tests { #[instrument] async fn req_res_outbound_timeout_works() { let mut p2p_config = - P2PConfig::default_initialized("req_res_outbound_timeout_works"); + Config::default_initialized("req_res_outbound_timeout_works"); // Node A // setup request timeout to 0 in order for the Request to fail diff --git a/crates/services/p2p/src/peer_info.rs b/crates/services/p2p/src/peer_info.rs index a6bd1306e71..25a77bfb984 100644 --- a/crates/services/p2p/src/peer_info.rs +++ b/crates/services/p2p/src/peer_info.rs @@ -1,4 +1,4 @@ -use crate::config::P2PConfig; +use crate::config::Config; use libp2p::{ core::{ connection::ConnectionId, @@ -91,7 +91,7 @@ pub struct PeerInfoBehaviour { } impl PeerInfoBehaviour { - pub fn new(local_public_key: PublicKey, config: &P2PConfig) -> Self { + pub fn new(local_public_key: PublicKey, config: &Config) -> Self { let identify = { let identify_config = IdentifyConfig::new("/fuel/1.0".to_string(), local_public_key); diff --git a/crates/services/p2p/src/service.rs b/crates/services/p2p/src/service.rs index 7bcc7f4b376..0a7bb5ec97b 100644 --- a/crates/services/p2p/src/service.rs +++ b/crates/services/p2p/src/service.rs @@ -1,4 +1,29 @@ +use crate::{ + codecs::{ + bincode::BincodeCodec, + NetworkCodec, + }, + config::Config, + gossipsub::messages::{ + GossipsubBroadcastRequest, + GossipsubMessage, + }, + p2p_service::{ + FuelP2PEvent, + FuelP2PService, + }, + ports::P2pDb, + request_response::messages::{ + OutboundResponse, + RequestMessage, + ResponseChannelItem, + }, +}; use anyhow::anyhow; +use fuel_core_services::{ + RunnableService, + ServiceRunner, +}; use fuel_core_types::{ blockchain::{ block::Block, @@ -18,25 +43,14 @@ use libp2p::{ request_response::RequestId, PeerId, }; -use libp2p_gossipsub::{ - error::PublishError, - MessageId, -}; use std::{ fmt::Debug, sync::Arc, }; -use tokio::{ - sync::{ - broadcast, - mpsc::{ - Receiver, - Sender, - }, - oneshot, - Mutex, - }, - task::JoinHandle, +use tokio::sync::{ + broadcast, + mpsc, + oneshot, }; use tracing::{ debug, @@ -44,330 +58,253 @@ use tracing::{ warn, }; -use crate::{ - codecs::{ - bincode::BincodeCodec, - NetworkCodec, - }, - config::P2PConfig, - gossipsub::messages::{ - GossipsubBroadcastRequest, - GossipsubMessage, - }, - p2p_service::{ - FuelP2PEvent, - FuelP2PService, - }, - ports::P2pDb, - request_response::messages::{ - OutboundResponse, - RequestMessage, - ResponseChannelItem, - }, -}; - -/// Orchestrates various p2p-related events between the inner `P2pService` -/// and the top level `NetworkService`. -struct Task { - p2p_config: P2PConfig, - db: Arc, - /// Receive internal Orchestrator Requests - rx_orchestrator_request: Receiver, - /// Generate internal Orchestrator Requests - tx_orchestrator_request: Sender, - tx_broadcast: Arc>, -} +pub type Service = ServiceRunner>; -type BroadcastResponse = Result; - -enum OrchestratorRequest { +enum TaskRequest { // Broadcast requests to p2p network BroadcastTransaction(Arc), - BroadcastBlock((Arc, oneshot::Sender)), - BroadcastVote((Arc, oneshot::Sender)), + BroadcastBlock(Arc), + BroadcastVote(Arc), // Request to get one-off data from p2p network - GetPeersIds(oneshot::Sender>), + GetPeerIds(oneshot::Sender>), GetBlock((BlockHeight, oneshot::Sender)), // Responds back to the p2p network RespondWithGossipsubMessageReport((GossipsubMessageInfo, GossipsubMessageAcceptance)), RespondWithRequestedBlock((Option>, RequestId)), - // Request to Stop the Network Orchestrator / Service - Stop, } -impl Debug for OrchestratorRequest { +impl Debug for TaskRequest { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "OrchestratorRequest") + write!(f, "TaskRequest") } } -impl Task { - fn new( - p2p_config: P2PConfig, - db: Arc, - orchestrator_request_channels: ( - Receiver, - Sender, - ), - ) -> Self { - let (rx_orchestrator_request, tx_orchestrator_request) = - orchestrator_request_channels; +/// Orchestrates various p2p-related events between the inner `P2pService` +/// and the top level `NetworkService`. +pub struct Task { + p2p_service: FuelP2PService, + db: Arc, + /// Receive internal Task Requests + request_receiver: mpsc::Receiver, + shared: SharedState, +} + +impl Task { + pub fn new(config: Config, db: Arc) -> Self { + let (request_sender, request_receiver) = mpsc::channel(100); let (tx_broadcast, _) = broadcast::channel(100); + let max_block_size = config.max_block_size; + let p2p_service = FuelP2PService::new(config, BincodeCodec::new(max_block_size)); Self { - p2p_config, + p2p_service, db, - rx_orchestrator_request, - tx_orchestrator_request, - tx_broadcast: Arc::new(tx_broadcast), + request_receiver, + shared: SharedState { + request_sender, + tx_broadcast, + }, } } +} - pub async fn run(mut self) -> anyhow::Result { - let mut p2p_service = FuelP2PService::new( - self.p2p_config.clone(), - BincodeCodec::new(self.p2p_config.max_block_size), - )?; - - loop { - tokio::select! { - next_service_request = self.rx_orchestrator_request.recv() => { - match next_service_request { - Some(OrchestratorRequest::BroadcastTransaction(transaction)) => { - let broadcast = GossipsubBroadcastRequest::NewTx(transaction); - let result = p2p_service.publish_message(broadcast); - if let Err(e) = result { - tracing::error!("Got an error during transaction broadcasting {}", e); - } - } - Some(OrchestratorRequest::BroadcastBlock((block, sender))) => { - let broadcast = GossipsubBroadcastRequest::NewBlock(block); - let _ = sender.send(p2p_service.publish_message(broadcast)); - } - Some(OrchestratorRequest::BroadcastVote((vote, sender))) => { - let broadcast = GossipsubBroadcastRequest::ConsensusVote(vote); - let _ = sender.send(p2p_service.publish_message(broadcast)); - } - Some(OrchestratorRequest::GetPeersIds(channel)) => { - let _ = channel.send(p2p_service.get_peers_ids()); - } - Some(OrchestratorRequest::GetBlock((height, response))) => { - let request_msg = RequestMessage::RequestBlock(height); - let channel_item = ResponseChannelItem::ResponseBlock(response); - let _ = p2p_service.send_request_msg(None, request_msg, channel_item); +#[async_trait::async_trait] +impl RunnableService for Task +where + D: P2pDb + 'static, +{ + const NAME: &'static str = "P2P"; + + type SharedData = SharedState; + + fn shared_data(&self) -> Self::SharedData { + self.shared.clone() + } + + async fn initialize(&mut self) -> anyhow::Result<()> { + self.p2p_service.start() + } + + async fn run(&mut self) -> anyhow::Result { + tokio::select! { + // TODO: Maybe we want to use `biased;` to first process requests asked by us. + next_service_request = self.request_receiver.recv() => { + match next_service_request { + Some(TaskRequest::BroadcastTransaction(transaction)) => { + let broadcast = GossipsubBroadcastRequest::NewTx(transaction); + let result = self.p2p_service.publish_message(broadcast); + if let Err(e) = result { + tracing::error!("Got an error during transaction broadcasting {}", e); } - Some(OrchestratorRequest::RespondWithGossipsubMessageReport((message, acceptance))) => { - report_message(message, acceptance, &mut p2p_service); + } + Some(TaskRequest::BroadcastBlock(block)) => { + let broadcast = GossipsubBroadcastRequest::NewBlock(block); + let result = self.p2p_service.publish_message(broadcast); + if let Err(e) = result { + tracing::error!("Got an error during block broadcasting {}", e); } - Some(OrchestratorRequest::RespondWithRequestedBlock((response, request_id))) => { - let _ = p2p_service.send_response_msg(request_id, response.map(OutboundResponse::ResponseBlock)); + } + Some(TaskRequest::BroadcastVote(vote)) => { + let broadcast = GossipsubBroadcastRequest::ConsensusVote(vote); + let result = self.p2p_service.publish_message(broadcast); + if let Err(e) = result { + tracing::error!("Got an error during vote broadcasting {}", e); } - Some(OrchestratorRequest::Stop) => break, - None => {} } - } - p2p_event = p2p_service.next_event() => { - match p2p_event { - Some(FuelP2PEvent::GossipsubMessage { message, message_id, peer_id,.. }) => { - let message_id = message_id.0; - - match message { - GossipsubMessage::NewTx(transaction) => { - let next_transaction = GossipData::new(transaction, peer_id, message_id); - let _ = self.tx_broadcast.send(next_transaction); - }, - GossipsubMessage::NewBlock(block) => { - // todo: add logic to gossip newly received blocks - let _new_block = GossipData::new(block, peer_id, message_id); - }, - GossipsubMessage::ConsensusVote(vote) => { - // todo: add logic to gossip newly received votes - let _new_vote = GossipData::new(vote, peer_id, message_id); - }, - } - }, - Some(FuelP2PEvent::RequestMessage { request_message, request_id }) => { - match request_message { - RequestMessage::RequestBlock(block_height) => { - let db = self.db.clone(); - let tx_orchestrator_request = self.tx_orchestrator_request.clone(); - - tokio::spawn(async move { - // TODO: Process `StorageError` somehow. - let block_response = db.get_sealed_block(block_height) - .await - .expect("Didn't expect error from database") - .map(Arc::new); - let _ = tx_orchestrator_request.send(OrchestratorRequest::RespondWithRequestedBlock((block_response, request_id))); - }); - } - } - }, - _ => {} + Some(TaskRequest::GetPeerIds(channel)) => { + let _ = channel.send(self.p2p_service.get_peers_ids()); + } + Some(TaskRequest::GetBlock((height, response))) => { + let request_msg = RequestMessage::RequestBlock(height); + let channel_item = ResponseChannelItem::ResponseBlock(response); + let _ = self.p2p_service.send_request_msg(None, request_msg, channel_item); + } + Some(TaskRequest::RespondWithGossipsubMessageReport((message, acceptance))) => { + report_message(&mut self.p2p_service, message, acceptance); + } + Some(TaskRequest::RespondWithRequestedBlock((response, request_id))) => { + let _ = self.p2p_service.send_response_msg(request_id, response.map(OutboundResponse::ResponseBlock)); } - }, + None => { + unreachable!("The `Task` is holder of the `Sender`, so it should not be possible"); + } + } } + p2p_event = self.p2p_service.next_event() => { + match p2p_event { + Some(FuelP2PEvent::GossipsubMessage { message, message_id, peer_id,.. }) => { + let message_id = message_id.0; + + match message { + GossipsubMessage::NewTx(transaction) => { + let next_transaction = GossipData::new(transaction, peer_id, message_id); + let _ = self.shared.tx_broadcast.send(next_transaction); + }, + GossipsubMessage::NewBlock(block) => { + // todo: add logic to gossip newly received blocks + let _new_block = GossipData::new(block, peer_id, message_id); + }, + GossipsubMessage::ConsensusVote(vote) => { + // todo: add logic to gossip newly received votes + let _new_vote = GossipData::new(vote, peer_id, message_id); + }, + } + }, + Some(FuelP2PEvent::RequestMessage { request_message, request_id }) => { + match request_message { + RequestMessage::RequestBlock(block_height) => { + let db = self.db.clone(); + let request_sender = self.shared.request_sender.clone(); + + tokio::spawn(async move { + // TODO: Process `StorageError` somehow. + let block_response = db.get_sealed_block(block_height) + .await + .expect("Didn't expect error from database") + .map(Arc::new); + let _ = request_sender.send( + TaskRequest::RespondWithRequestedBlock( + (block_response, request_id) + ) + ); + }); + } + } + }, + _ => {} + } + }, } - Ok(self) - } - - pub fn sender(&self) -> Arc> { - self.tx_broadcast.clone() + Ok(true /* should_continue */) } } -pub struct Service { - /// Network Orchestrator that handles p2p network and inter-module communication - network_orchestrator: Arc>>, - /// Holds the spawned task when Netowrk Orchestrator is started - join: Mutex>>>, - /// Used for communicating with the Orchestrator - tx_orchestrator_request: Sender, +#[derive(Clone)] +pub struct SharedState { /// Sender of p2p transaction used for subscribing. - tx_broadcast: Arc>, + tx_broadcast: broadcast::Sender, + /// Used for communicating with the `Task`. + request_sender: mpsc::Sender, } -impl Service { - pub fn new(p2p_config: P2PConfig, db: D) -> Self - where - D: P2pDb + 'static, - { - let (tx_orchestrator_request, rx_orchestrator_request) = - tokio::sync::mpsc::channel(100); - - let network_orchestrator = Task::new( - p2p_config, - Arc::new(db), - (rx_orchestrator_request, tx_orchestrator_request.clone()), - ); - let tx_broadcast = network_orchestrator.sender(); - - Self { - join: Mutex::new(None), - network_orchestrator: Arc::new(Mutex::new(Some(network_orchestrator))), - tx_orchestrator_request, - tx_broadcast, - } - } - - pub async fn notify_gossip_transaction_validity<'a, T>( +impl SharedState { + pub fn notify_gossip_transaction_validity<'a, T>( &self, message: &'a T, acceptance: GossipsubMessageAcceptance, - ) where + ) -> anyhow::Result<()> + where GossipsubMessageInfo: From<&'a T>, { let msg_info = message.into(); - let _ = self - .tx_orchestrator_request - .send(OrchestratorRequest::RespondWithGossipsubMessageReport(( + self.request_sender + .try_send(TaskRequest::RespondWithGossipsubMessageReport(( msg_info, acceptance, - ))) - .await; + )))?; + Ok(()) } pub async fn get_block(&self, height: BlockHeight) -> anyhow::Result { let (sender, receiver) = oneshot::channel(); - self.tx_orchestrator_request - .send(OrchestratorRequest::GetBlock((height, sender))) + self.request_sender + .send(TaskRequest::GetBlock((height, sender))) .await?; receiver.await.map_err(|e| anyhow!("{}", e)) } - pub async fn broadcast_vote(&self, vote: Arc) -> anyhow::Result<()> { - let (sender, receiver) = oneshot::channel(); - - self.tx_orchestrator_request - .send(OrchestratorRequest::BroadcastVote((vote, sender))) - .await?; + pub fn broadcast_vote(&self, vote: Arc) -> anyhow::Result<()> { + self.request_sender + .try_send(TaskRequest::BroadcastVote(vote))?; - receiver.await?.map(|_| ()).map_err(|e| anyhow!("{}", e)) + Ok(()) } - pub async fn broadcast_block(&self, block: Arc) -> anyhow::Result<()> { - let (sender, receiver) = oneshot::channel(); - - self.tx_orchestrator_request - .send(OrchestratorRequest::BroadcastBlock((block, sender))) - .await?; + pub fn broadcast_block(&self, block: Arc) -> anyhow::Result<()> { + self.request_sender + .try_send(TaskRequest::BroadcastBlock(block))?; - receiver.await?.map(|_| ()).map_err(|e| anyhow!("{}", e)) + Ok(()) } pub fn broadcast_transaction( &self, transaction: Arc, ) -> anyhow::Result<()> { - let request = self.tx_orchestrator_request.clone(); - // TODO: Fix me - tokio::spawn(async move { - let _ = request - .send(OrchestratorRequest::BroadcastTransaction(transaction)) - .await; - }); + self.request_sender + .try_send(TaskRequest::BroadcastTransaction(transaction))?; Ok(()) } - pub async fn get_peers_ids(&self) -> anyhow::Result> { + pub async fn get_peer_ids(&self) -> anyhow::Result> { let (sender, receiver) = oneshot::channel(); - self.tx_orchestrator_request - .send(OrchestratorRequest::GetPeersIds(sender)) + self.request_sender + .send(TaskRequest::GetPeerIds(sender)) .await?; receiver.await.map_err(|e| anyhow!("{}", e)) } - pub async fn start(&self) -> anyhow::Result<()> { - let mut join = self.join.lock().await; - - if join.is_none() { - if let Some(network_orchestrator) = - self.network_orchestrator.lock().await.take() - { - *join = Some(tokio::spawn(async { network_orchestrator.run().await })); - - Ok(()) - } else { - Err(anyhow!("Starting Network Orchestrator that is stopping")) - } - } else { - Err(anyhow!("Network Orchestrator already started")) - } - } - - pub async fn stop(&self) -> Option> { - let join_handle = self.join.lock().await.take(); - - if let Some(join_handle) = join_handle { - let network_orchestrator = self.network_orchestrator.clone(); - let _ = self - .tx_orchestrator_request - .send(OrchestratorRequest::Stop) - .await; - Some(tokio::spawn(async move { - if let Ok(res) = join_handle.await { - *network_orchestrator.lock().await = res.ok(); - } - })) - } else { - None - } - } - pub fn subscribe_tx(&self) -> broadcast::Receiver { self.tx_broadcast.subscribe() } } +pub fn new_service(p2p_config: Config, db: D) -> Service +where + D: P2pDb + 'static, +{ + Service::new(Task::new(p2p_config, Arc::new(db))) +} + fn report_message( + p2p_service: &mut FuelP2PService, message: GossipsubMessageInfo, acceptance: GossipsubMessageAcceptance, - p2p_service: &mut FuelP2PService, ) { let GossipsubMessageInfo { peer_id, @@ -426,6 +363,7 @@ pub mod tests { use super::*; use async_trait::async_trait; + use fuel_core_services::Service; use fuel_core_storage::Result as StorageResult; use fuel_core_types::blockchain::{ block::Block, @@ -460,17 +398,13 @@ pub mod tests { #[tokio::test] async fn start_stop_works() { - let p2p_config = P2PConfig::default_initialized("start_stop_works"); - let service = Service::new(p2p_config, FakeDb); + let p2p_config = Config::default_initialized("start_stop_works"); + let service = new_service(p2p_config, FakeDb); // Node with p2p service started - assert!(service.start().await.is_ok()); + assert!(service.start().is_ok()); sleep(Duration::from_secs(1)).await; // Node with p2p service stopped - assert!(service.stop().await.is_some()); - sleep(Duration::from_secs(1)).await; - - // Node with p2p service successfully restarted - assert!(service.start().await.is_ok()); + assert!(service.stop_and_await().await.unwrap().stopped()); } } diff --git a/crates/services/txpool/src/containers/dependency.rs b/crates/services/txpool/src/containers/dependency.rs index 3498aad8692..3414465104d 100644 --- a/crates/services/txpool/src/containers/dependency.rs +++ b/crates/services/txpool/src/containers/dependency.rs @@ -517,12 +517,15 @@ impl Dependency { /// insert tx inside dependency /// return list of transactions that are removed from txpool - pub(crate) fn insert<'a>( + pub(crate) fn insert<'a, DB>( &'a mut self, txs: &'a HashMap, - db: &dyn TxPoolDb, + db: &DB, tx: &'a ArcPoolTx, - ) -> anyhow::Result> { + ) -> anyhow::Result> + where + DB: TxPoolDb, + { let (max_depth, db_coins, db_contracts, db_messages, collided) = self.check_for_collision(txs, db, tx)?; diff --git a/crates/services/txpool/src/mock_db.rs b/crates/services/txpool/src/mock_db.rs index 13c2c326bac..a1961a13fb3 100644 --- a/crates/services/txpool/src/mock_db.rs +++ b/crates/services/txpool/src/mock_db.rs @@ -34,11 +34,11 @@ pub struct MockDb { } impl MockDb { - pub fn insert_coin(&mut self, id: UtxoId, coin: Coin) { + pub fn insert_coin(&self, id: UtxoId, coin: Coin) { self.data.lock().unwrap().coins.insert(id, coin); } - pub fn insert_message(&mut self, message: Message) { + pub fn insert_message(&self, message: Message) { self.data .lock() .unwrap() diff --git a/crates/services/txpool/src/ports.rs b/crates/services/txpool/src/ports.rs index 2e144916f7b..0a6c8a222ac 100644 --- a/crates/services/txpool/src/ports.rs +++ b/crates/services/txpool/src/ports.rs @@ -24,7 +24,6 @@ use fuel_core_types::{ }; use std::sync::Arc; -#[async_trait::async_trait] pub trait PeerToPeer: Send + Sync { type GossipedTransaction: NetworkData; @@ -35,11 +34,11 @@ pub trait PeerToPeer: Send + Sync { fn gossiped_transaction_events(&self) -> BoxStream; // Report the validity of a transaction received from the network. - async fn notify_gossip_transaction_validity( + fn notify_gossip_transaction_validity( &self, message: &Self::GossipedTransaction, validity: GossipsubMessageAcceptance, - ); + ) -> anyhow::Result<()>; } pub trait BlockImport: Send + Sync { diff --git a/crates/services/txpool/src/service.rs b/crates/services/txpool/src/service.rs index 8556af20fd4..222859bd8a9 100644 --- a/crates/services/txpool/src/service.rs +++ b/crates/services/txpool/src/service.rs @@ -39,7 +39,7 @@ use std::sync::Arc; use tokio::sync::broadcast; use tokio_stream::StreamExt; -pub type Service = ServiceRunner>; +pub type Service = ServiceRunner>; #[derive(Clone)] pub struct TxStatusChange { @@ -79,17 +79,15 @@ impl TxStatusChange { } } -pub struct SharedState { - db: Arc, +pub struct SharedState { tx_status_sender: TxStatusChange, - txpool: Arc>, + txpool: Arc>>, p2p: Arc, } -impl Clone for SharedState { +impl Clone for SharedState { fn clone(&self) -> Self { Self { - db: self.db.clone(), tx_status_sender: self.tx_status_sender.clone(), txpool: self.txpool.clone(), p2p: self.p2p.clone(), @@ -97,20 +95,21 @@ impl Clone for SharedState { } } -pub struct Task { +pub struct Task { gossiped_tx_stream: BoxStream, committed_block_stream: BoxStream, - shared: SharedState, + shared: SharedState, } #[async_trait::async_trait] -impl RunnableService for Task +impl RunnableService for Task where P2P: Send + Sync, + DB: TxPoolDb, { const NAME: &'static str = "TxPool"; - type SharedData = SharedState; + type SharedData = SharedState; fn shared_data(&self) -> Self::SharedData { self.shared.clone() @@ -126,7 +125,6 @@ where if let Some(GossipData { data: Some(tx), .. }) = new_transaction { let txs = vec!(Arc::new(tx)); self.shared.txpool.lock().insert( - self.shared.db.as_ref(), &self.shared.tx_status_sender, &txs ); @@ -153,7 +151,10 @@ where // Instead, `fuel-core` can create a `DatabaseWithTxPool` that aggregates `TxPool` and // storage `Database` together. GraphQL will retrieve data from this `DatabaseWithTxPool` via // `StorageInspect` trait. -impl SharedState { +impl SharedState +where + DB: TxPoolDb, +{ pub fn pending_number(&self) -> usize { self.txpool.lock().pending_number() } @@ -202,24 +203,28 @@ impl SharedState { } } -impl SharedState +impl SharedState where P2P: PeerToPeer, + DB: TxPoolDb, { pub fn insert( &self, txs: Vec>, ) -> Vec> { - let insert = { - self.txpool - .lock() - .insert(self.db.as_ref(), &self.tx_status_sender, &txs) - }; + let insert = { self.txpool.lock().insert(&self.tx_status_sender, &txs) }; for (ret, tx) in insert.iter().zip(txs.into_iter()) { match ret { Ok(_) => { - let _ = self.p2p.broadcast_transaction(tx.clone()); + let result = self.p2p.broadcast_transaction(tx.clone()); + if let Err(e) = result { + // It can be only in the case of p2p being down or requests overloading it. + tracing::error!( + "Unable to broadcast transaction, got an {} error", + e + ); + } } Err(_) => {} } @@ -268,7 +273,7 @@ pub fn new_service( tx_status_sender: TxStatusChange, importer: Importer, p2p: P2P, -) -> Service +) -> Service where Importer: BlockImport, P2P: PeerToPeer + 'static, @@ -277,13 +282,11 @@ where let p2p = Arc::new(p2p); let gossiped_tx_stream = p2p.gossiped_transaction_events(); let committed_block_stream = importer.block_events(); - let txpool = Arc::new(ParkingMutex::new(TxPool::new(config))); - let db: Arc = Arc::new(db); + let txpool = Arc::new(ParkingMutex::new(TxPool::new(config, db))); let task = Task { gossiped_tx_stream, committed_block_stream, shared: SharedState { - db, tx_status_sender, txpool, p2p, diff --git a/crates/services/txpool/src/service/test_helpers.rs b/crates/services/txpool/src/service/test_helpers.rs index 60fe8c364fb..bb14da3d2cf 100644 --- a/crates/services/txpool/src/service/test_helpers.rs +++ b/crates/services/txpool/src/service/test_helpers.rs @@ -27,7 +27,7 @@ use std::cell::RefCell; type GossipedTransaction = GossipData; pub struct TestContext { - pub(crate) service: Service, + pub(crate) service: Service, mock_db: MockDb, rng: RefCell, } @@ -37,7 +37,7 @@ impl TestContext { TestContextBuilder::new().build().await } - pub fn service(&self) -> &Service { + pub fn service(&self) -> &Service { &self.service } @@ -57,7 +57,6 @@ impl TestContext { mockall::mock! { pub P2P {} - #[async_trait::async_trait] impl PeerToPeer for P2P { type GossipedTransaction = GossipedTransaction; @@ -65,11 +64,11 @@ mockall::mock! { fn gossiped_transaction_events(&self) -> BoxStream; - async fn notify_gossip_transaction_validity( + fn notify_gossip_transaction_validity( &self, message: &GossipedTransaction, validity: GossipsubMessageAcceptance, - ); + ) -> anyhow::Result<()>; } } diff --git a/crates/services/txpool/src/txpool.rs b/crates/services/txpool/src/txpool.rs index 0e9370e75d8..51e3d5e8a77 100644 --- a/crates/services/txpool/src/txpool.rs +++ b/crates/services/txpool/src/txpool.rs @@ -34,15 +34,19 @@ use std::{ }; #[derive(Debug, Clone)] -pub struct TxPool { +pub struct TxPool { by_hash: HashMap, by_gas_price: PriceSort, by_dependency: Dependency, config: Config, + database: DB, } -impl TxPool { - pub fn new(config: Config) -> Self { +impl TxPool +where + DB: TxPoolDb, +{ + pub fn new(config: Config, database: DB) -> Self { let max_depth = config.max_depth; Self { @@ -50,6 +54,7 @@ impl TxPool { by_gas_price: PriceSort::default(), by_dependency: Dependency::new(max_depth, config.utxo_validation), config, + database, } } @@ -66,9 +71,8 @@ impl TxPool { &mut self, // TODO: Pass `&Transaction` tx: Arc, - db: &dyn TxPoolDb, ) -> anyhow::Result { - let current_height = db.current_block_height()?; + let current_height = self.database.current_block_height()?; if tx.is_mint() { return Err(Error::NotSupportedTransactionType.into()) @@ -143,7 +147,9 @@ impl TxPool { .observe(tx.metered_bytes_size() as f64); } // check and insert dependency - let rem = self.by_dependency.insert(&self.by_hash, db, &tx)?; + let rem = self + .by_dependency + .insert(&self.by_hash, &self.database, &tx)?; self.by_hash.insert(tx.id(), TxInfo::new(tx.clone())); self.by_gas_price.insert(&tx); @@ -234,7 +240,6 @@ impl TxPool { /// Import a set of transactions from network gossip or GraphQL endpoints. pub fn insert( &mut self, - db: &dyn TxPoolDb, tx_status_sender: &TxStatusChange, txs: &[Arc], ) -> Vec> { @@ -242,7 +247,7 @@ impl TxPool { // should be done before transaction comes to txpool, or before it enters RwLocked region. let mut res = Vec::new(); for tx in txs.iter() { - res.push(self.insert_inner(tx.clone(), db)) + res.push(self.insert_inner(tx.clone())) } // announce to subscribers for ret in res.iter() { diff --git a/crates/services/txpool/src/txpool/tests.rs b/crates/services/txpool/src/txpool/tests.rs index 842d713951e..e8a8b559b7c 100644 --- a/crates/services/txpool/src/txpool/tests.rs +++ b/crates/services/txpool/src/txpool/tests.rs @@ -42,10 +42,10 @@ use std::{ #[tokio::test] async fn insert_simple_tx_succeeds() { let mut rng = StdRng::seed_from_u64(0); - let mut txpool = TxPool::new(Default::default()); let db = MockDb::default(); + let mut txpool = TxPool::new(Default::default(), db); - let (_, gas_coin) = setup_coin(&mut rng, Some(&db)); + let (_, gas_coin) = setup_coin(&mut rng, Some(&txpool.database)); let tx = Arc::new( TransactionBuilder::script(vec![], vec![]) .add_input(gas_coin) @@ -53,17 +53,17 @@ async fn insert_simple_tx_succeeds() { ); txpool - .insert_inner(tx, &db) + .insert_inner(tx) .expect("Transaction should be OK, got Err"); } #[tokio::test] async fn insert_simple_tx_dependency_chain_succeeds() { let mut rng = StdRng::seed_from_u64(0); - let mut txpool = TxPool::new(Default::default()); let db = MockDb::default(); + let mut txpool = TxPool::new(Default::default(), db); - let (_, gas_coin) = setup_coin(&mut rng, Some(&db)); + let (_, gas_coin) = setup_coin(&mut rng, Some(&txpool.database)); let (output, unset_input) = create_output_and_input(&mut rng, 1); let tx1 = Arc::new( TransactionBuilder::script(vec![], vec![]) @@ -73,7 +73,7 @@ async fn insert_simple_tx_dependency_chain_succeeds() { .finalize_as_transaction(), ); - let (_, gas_coin) = setup_coin(&mut rng, Some(&db)); + let (_, gas_coin) = setup_coin(&mut rng, Some(&txpool.database)); let input = unset_input.into_input(UtxoId::new(tx1.id(), 0)); let tx2 = Arc::new( TransactionBuilder::script(vec![], vec![]) @@ -83,19 +83,17 @@ async fn insert_simple_tx_dependency_chain_succeeds() { .finalize_as_transaction(), ); + txpool.insert_inner(tx1).expect("Tx1 should be OK, got Err"); txpool - .insert_inner(tx1, &db) - .expect("Tx1 should be OK, got Err"); - txpool - .insert_inner(tx2, &db) + .insert_inner(tx2) .expect("Tx2 dependent should be OK, got Err"); } #[tokio::test] async fn faulty_t2_collided_on_contract_id_from_tx1() { let mut rng = StdRng::seed_from_u64(0); - let mut txpool = TxPool::new(Default::default()); let db = MockDb::default(); + let mut txpool = TxPool::new(Default::default(), db); let contract_id = ContractId::from_str( "0x0000000000000000000000000000000000000000000000000000000000000100", @@ -103,7 +101,7 @@ async fn faulty_t2_collided_on_contract_id_from_tx1() { .unwrap(); // contract creation tx - let (_, gas_coin) = setup_coin(&mut rng, Some(&db)); + let (_, gas_coin) = setup_coin(&mut rng, Some(&txpool.database)); let (output, unset_input) = create_output_and_input(&mut rng, 10); let tx = Arc::new( TransactionBuilder::create( @@ -118,7 +116,7 @@ async fn faulty_t2_collided_on_contract_id_from_tx1() { .finalize_as_transaction(), ); - let (_, gas_coin) = setup_coin(&mut rng, Some(&db)); + let (_, gas_coin) = setup_coin(&mut rng, Some(&txpool.database)); let input = unset_input.into_input(UtxoId::new(tx.id(), 1)); // attempt to insert a different creation tx with a valid dependency on the first tx, @@ -137,12 +135,10 @@ async fn faulty_t2_collided_on_contract_id_from_tx1() { .finalize_as_transaction(), ); - txpool - .insert_inner(tx, &db) - .expect("Tx1 should be Ok, got Err"); + txpool.insert_inner(tx).expect("Tx1 should be Ok, got Err"); let err = txpool - .insert_inner(tx_faulty, &db) + .insert_inner(tx_faulty) .expect_err("Tx2 should be Err, got Ok"); assert!(matches!( err.downcast_ref::(), @@ -153,14 +149,14 @@ async fn faulty_t2_collided_on_contract_id_from_tx1() { #[tokio::test] async fn fail_to_insert_tx_with_dependency_on_invalid_utxo_type() { let mut rng = StdRng::seed_from_u64(0); - let mut txpool = TxPool::new(Default::default()); let db = MockDb::default(); + let mut txpool = TxPool::new(Default::default(), db); let contract_id = ContractId::from_str( "0x0000000000000000000000000000000000000000000000000000000000000100", ) .unwrap(); - let (_, gas_coin) = setup_coin(&mut rng, Some(&db)); + let (_, gas_coin) = setup_coin(&mut rng, Some(&txpool.database)); let tx_faulty = Arc::new( TransactionBuilder::create( Default::default(), @@ -187,11 +183,11 @@ async fn fail_to_insert_tx_with_dependency_on_invalid_utxo_type() { ); txpool - .insert_inner(tx_faulty.clone(), &db) + .insert_inner(tx_faulty.clone()) .expect("Tx1 should be Ok, got Err"); let err = txpool - .insert_inner(tx, &db) + .insert_inner(tx) .expect_err("Tx2 should be Err, got Ok"); assert!(matches!( err.downcast_ref::(), @@ -201,18 +197,17 @@ async fn fail_to_insert_tx_with_dependency_on_invalid_utxo_type() { #[tokio::test] async fn not_inserted_known_tx() { - let mut txpool = TxPool::new(Default::default()); - let db = MockDb::default(); + let mut txpool = TxPool::new(Default::default(), MockDb::default()); let tx = Arc::new(TransactionBuilder::script(vec![], vec![]).finalize_as_transaction()); txpool - .insert_inner(tx.clone(), &db) + .insert_inner(tx.clone()) .expect("Tx1 should be Ok, got Err"); let err = txpool - .insert_inner(tx, &db) + .insert_inner(tx) .expect_err("Second insertion of Tx1 should be Err, got Ok"); assert!(matches!( err.downcast_ref::(), @@ -223,8 +218,7 @@ async fn not_inserted_known_tx() { #[tokio::test] async fn try_to_insert_tx2_missing_utxo() { let mut rng = StdRng::seed_from_u64(0); - let mut txpool = TxPool::new(Default::default()); - let db = MockDb::default(); + let mut txpool = TxPool::new(Default::default(), MockDb::default()); let (_, input) = setup_coin(&mut rng, None); let tx = Arc::new( @@ -235,7 +229,7 @@ async fn try_to_insert_tx2_missing_utxo() { ); let err = txpool - .insert_inner(tx, &db) + .insert_inner(tx) .expect_err("Tx should be Err, got Ok"); assert!(matches!( err.downcast_ref::(), @@ -246,14 +240,14 @@ async fn try_to_insert_tx2_missing_utxo() { #[tokio::test] async fn tx_try_to_use_spent_coin() { let mut rng = StdRng::seed_from_u64(0); - let mut txpool = TxPool::new(Default::default()); - let mut db = MockDb::default(); + let db = MockDb::default(); + let mut txpool = TxPool::new(Default::default(), db); // put a spent coin into the database let (mut coin, input) = setup_coin(&mut rng, None); let utxo_id = *input.utxo_id().unwrap(); coin.status = CoinStatus::Spent; - db.insert_coin(utxo_id, coin); + txpool.database.insert_coin(utxo_id, coin); let tx = Arc::new( TransactionBuilder::script(vec![], vec![]) @@ -264,7 +258,7 @@ async fn tx_try_to_use_spent_coin() { // attempt to insert the tx with an already spent coin let err = txpool - .insert_inner(tx, &db) + .insert_inner(tx) .expect_err("Tx should be Err, got Ok"); assert!(matches!( err.downcast_ref::(), @@ -275,10 +269,10 @@ async fn tx_try_to_use_spent_coin() { #[tokio::test] async fn higher_priced_tx_removes_lower_priced_tx() { let mut rng = StdRng::seed_from_u64(0); - let mut txpool = TxPool::new(Default::default()); let db = MockDb::default(); + let mut txpool = TxPool::new(Default::default(), db); - let (_, coin_input) = setup_coin(&mut rng, Some(&db)); + let (_, coin_input) = setup_coin(&mut rng, Some(&txpool.database)); let tx1 = Arc::new( TransactionBuilder::script(vec![], vec![]) @@ -294,22 +288,20 @@ async fn higher_priced_tx_removes_lower_priced_tx() { ); txpool - .insert_inner(tx1.clone(), &db) + .insert_inner(tx1.clone()) .expect("Tx1 should be Ok, got Err"); - let vec = txpool - .insert_inner(tx2, &db) - .expect("Tx2 should be Ok, got Err"); + let vec = txpool.insert_inner(tx2).expect("Tx2 should be Ok, got Err"); assert_eq!(vec.removed[0].id(), tx1.id(), "Tx1 id should be removed"); } #[tokio::test] async fn underpriced_tx1_not_included_coin_collision() { let mut rng = StdRng::seed_from_u64(0); - let mut txpool = TxPool::new(Default::default()); let db = MockDb::default(); + let mut txpool = TxPool::new(Default::default(), db); - let (_, gas_coin) = setup_coin(&mut rng, Some(&db)); + let (_, gas_coin) = setup_coin(&mut rng, Some(&txpool.database)); let (output, unset_input) = create_output_and_input(&mut rng, 10); let tx1 = Arc::new( TransactionBuilder::script(vec![], vec![]) @@ -334,14 +326,14 @@ async fn underpriced_tx1_not_included_coin_collision() { ); txpool - .insert_inner(tx1.clone(), &db) + .insert_inner(tx1.clone()) .expect("Tx1 should be Ok, got Err"); txpool - .insert_inner(tx2.clone(), &db) + .insert_inner(tx2.clone()) .expect("Tx2 should be Ok, got Err"); let err = txpool - .insert_inner(tx3, &db) + .insert_inner(tx3) .expect_err("Tx3 should be Err, got Ok"); assert!(matches!( err.downcast_ref::(), @@ -352,10 +344,10 @@ async fn underpriced_tx1_not_included_coin_collision() { #[tokio::test] async fn overpriced_tx_contract_input_not_inserted() { let mut rng = StdRng::seed_from_u64(0); - let mut txpool = TxPool::new(Default::default()); let db = MockDb::default(); + let mut txpool = TxPool::new(Default::default(), db); - let (_, gas_funds) = setup_coin(&mut rng, Some(&db)); + let (_, gas_funds) = setup_coin(&mut rng, Some(&txpool.database)); let contract_id = ContractId::default(); let tx1 = Arc::new( TransactionBuilder::create( @@ -369,7 +361,7 @@ async fn overpriced_tx_contract_input_not_inserted() { .finalize_as_transaction(), ); - let (_, gas_funds) = setup_coin(&mut rng, Some(&db)); + let (_, gas_funds) = setup_coin(&mut rng, Some(&txpool.database)); let tx2 = Arc::new( TransactionBuilder::script(vec![], vec![]) .gas_price(11) @@ -382,12 +374,10 @@ async fn overpriced_tx_contract_input_not_inserted() { .finalize_as_transaction(), ); - txpool - .insert_inner(tx1, &db) - .expect("Tx1 should be Ok, got err"); + txpool.insert_inner(tx1).expect("Tx1 should be Ok, got err"); let err = txpool - .insert_inner(tx2, &db) + .insert_inner(tx2) .expect_err("Tx2 should be Err, got Ok"); assert!( matches!( @@ -402,11 +392,11 @@ async fn overpriced_tx_contract_input_not_inserted() { #[tokio::test] async fn dependent_contract_input_inserted() { let mut rng = StdRng::seed_from_u64(0); - let mut txpool = TxPool::new(Default::default()); let db = MockDb::default(); + let mut txpool = TxPool::new(Default::default(), db); let contract_id = ContractId::default(); - let (_, gas_funds) = setup_coin(&mut rng, Some(&db)); + let (_, gas_funds) = setup_coin(&mut rng, Some(&txpool.database)); let tx1 = Arc::new( TransactionBuilder::create( Default::default(), @@ -419,7 +409,7 @@ async fn dependent_contract_input_inserted() { .finalize_as_transaction(), ); - let (_, gas_funds) = setup_coin(&mut rng, Some(&db)); + let (_, gas_funds) = setup_coin(&mut rng, Some(&txpool.database)); let tx2 = Arc::new( TransactionBuilder::script(vec![], vec![]) .gas_price(10) @@ -432,21 +422,17 @@ async fn dependent_contract_input_inserted() { .finalize_as_transaction(), ); - txpool - .insert_inner(tx1, &db) - .expect("Tx1 should be Ok, got Err"); - txpool - .insert_inner(tx2, &db) - .expect("Tx2 should be Ok, got Err"); + txpool.insert_inner(tx1).expect("Tx1 should be Ok, got Err"); + txpool.insert_inner(tx2).expect("Tx2 should be Ok, got Err"); } #[tokio::test] async fn more_priced_tx3_removes_tx1_and_dependent_tx2() { let mut rng = StdRng::seed_from_u64(0); - let mut txpool = TxPool::new(Default::default()); let db = MockDb::default(); + let mut txpool = TxPool::new(Default::default(), db); - let (_, gas_coin) = setup_coin(&mut rng, Some(&db)); + let (_, gas_coin) = setup_coin(&mut rng, Some(&txpool.database)); let (output, unset_input) = create_output_and_input(&mut rng, 10); let tx1 = Arc::new( @@ -472,14 +458,12 @@ async fn more_priced_tx3_removes_tx1_and_dependent_tx2() { ); txpool - .insert_inner(tx1.clone(), &db) + .insert_inner(tx1.clone()) .expect("Tx1 should be OK, got Err"); txpool - .insert_inner(tx2.clone(), &db) + .insert_inner(tx2.clone()) .expect("Tx2 should be OK, got Err"); - let vec = txpool - .insert_inner(tx3, &db) - .expect("Tx3 should be OK, got Err"); + let vec = txpool.insert_inner(tx3).expect("Tx3 should be OK, got Err"); assert_eq!( vec.removed.len(), 2, @@ -493,10 +477,10 @@ async fn more_priced_tx3_removes_tx1_and_dependent_tx2() { #[tokio::test] async fn more_priced_tx2_removes_tx1_and_more_priced_tx3_removes_tx2() { let mut rng = StdRng::seed_from_u64(0); - let mut txpool = TxPool::new(Default::default()); let db = MockDb::default(); + let mut txpool = TxPool::new(Default::default(), db); - let (_, gas_coin) = setup_coin(&mut rng, Some(&db)); + let (_, gas_coin) = setup_coin(&mut rng, Some(&txpool.database)); let tx1 = Arc::new( TransactionBuilder::script(vec![], vec![]) @@ -517,16 +501,10 @@ async fn more_priced_tx2_removes_tx1_and_more_priced_tx3_removes_tx2() { .finalize_as_transaction(), ); - txpool - .insert_inner(tx1, &db) - .expect("Tx1 should be OK, got Err"); - let squeezed = txpool - .insert_inner(tx2, &db) - .expect("Tx2 should be OK, got Err"); + txpool.insert_inner(tx1).expect("Tx1 should be OK, got Err"); + let squeezed = txpool.insert_inner(tx2).expect("Tx2 should be OK, got Err"); assert_eq!(squeezed.removed.len(), 1); - let squeezed = txpool - .insert_inner(tx3, &db) - .expect("Tx3 should be OK, got Err"); + let squeezed = txpool.insert_inner(tx3).expect("Tx3 should be OK, got Err"); assert_eq!( squeezed.removed.len(), 1, @@ -538,32 +516,33 @@ async fn more_priced_tx2_removes_tx1_and_more_priced_tx3_removes_tx2() { #[tokio::test] async fn tx_limit_hit() { let mut rng = StdRng::seed_from_u64(0); - let mut txpool = TxPool::new(Config { - max_tx: 1, - ..Default::default() - }); let db = MockDb::default(); + let mut txpool = TxPool::new( + Config { + max_tx: 1, + ..Default::default() + }, + db, + ); - let (_, gas_coin) = setup_coin(&mut rng, Some(&db)); + let (_, gas_coin) = setup_coin(&mut rng, Some(&txpool.database)); let tx1 = Arc::new( TransactionBuilder::script(vec![], vec![]) .add_input(gas_coin) .add_output(create_coin_output()) .finalize_as_transaction(), ); - let (_, gas_coin) = setup_coin(&mut rng, Some(&db)); + let (_, gas_coin) = setup_coin(&mut rng, Some(&txpool.database)); let tx2 = Arc::new( TransactionBuilder::script(vec![], vec![]) .add_input(gas_coin) .finalize_as_transaction(), ); - txpool - .insert_inner(tx1, &db) - .expect("Tx1 should be Ok, got Err"); + txpool.insert_inner(tx1).expect("Tx1 should be Ok, got Err"); let err = txpool - .insert_inner(tx2, &db) + .insert_inner(tx2) .expect_err("Tx2 should be Err, got Ok"); assert!(matches!( err.downcast_ref::(), @@ -574,13 +553,16 @@ async fn tx_limit_hit() { #[tokio::test] async fn tx_depth_hit() { let mut rng = StdRng::seed_from_u64(0); - let mut txpool = TxPool::new(Config { - max_depth: 2, - ..Default::default() - }); let db = MockDb::default(); + let mut txpool = TxPool::new( + Config { + max_depth: 2, + ..Default::default() + }, + db, + ); - let (_, gas_coin) = setup_coin(&mut rng, Some(&db)); + let (_, gas_coin) = setup_coin(&mut rng, Some(&txpool.database)); let (output, unset_input) = create_output_and_input(&mut rng, 10_000); let tx1 = Arc::new( TransactionBuilder::script(vec![], vec![]) @@ -605,15 +587,11 @@ async fn tx_depth_hit() { .finalize_as_transaction(), ); - txpool - .insert_inner(tx1, &db) - .expect("Tx1 should be OK, got Err"); - txpool - .insert_inner(tx2, &db) - .expect("Tx2 should be OK, got Err"); + txpool.insert_inner(tx1).expect("Tx1 should be OK, got Err"); + txpool.insert_inner(tx2).expect("Tx2 should be OK, got Err"); let err = txpool - .insert_inner(tx3, &db) + .insert_inner(tx3) .expect_err("Tx3 should be Err, got Ok"); assert!(matches!( err.downcast_ref::(), @@ -624,10 +602,10 @@ async fn tx_depth_hit() { #[tokio::test] async fn sorted_out_tx1_2_4() { let mut rng = StdRng::seed_from_u64(0); - let mut txpool = TxPool::new(Default::default()); let db = MockDb::default(); + let mut txpool = TxPool::new(Default::default(), db); - let (_, gas_coin) = setup_coin(&mut rng, Some(&db)); + let (_, gas_coin) = setup_coin(&mut rng, Some(&txpool.database)); let tx1 = Arc::new( TransactionBuilder::script(vec![], vec![]) .gas_price(10) @@ -635,7 +613,7 @@ async fn sorted_out_tx1_2_4() { .finalize_as_transaction(), ); - let (_, gas_coin) = setup_coin(&mut rng, Some(&db)); + let (_, gas_coin) = setup_coin(&mut rng, Some(&txpool.database)); let tx2 = Arc::new( TransactionBuilder::script(vec![], vec![]) .gas_price(9) @@ -643,7 +621,7 @@ async fn sorted_out_tx1_2_4() { .finalize_as_transaction(), ); - let (_, gas_coin) = setup_coin(&mut rng, Some(&db)); + let (_, gas_coin) = setup_coin(&mut rng, Some(&txpool.database)); let tx3 = Arc::new( TransactionBuilder::script(vec![], vec![]) .gas_price(20) @@ -652,13 +630,13 @@ async fn sorted_out_tx1_2_4() { ); txpool - .insert_inner(tx1.clone(), &db) + .insert_inner(tx1.clone()) .expect("Tx1 should be Ok, got Err"); txpool - .insert_inner(tx2.clone(), &db) + .insert_inner(tx2.clone()) .expect("Tx2 should be Ok, got Err"); txpool - .insert_inner(tx3.clone(), &db) + .insert_inner(tx3.clone()) .expect("Tx4 should be Ok, got Err"); let txs = txpool.sorted_includable(); @@ -672,10 +650,10 @@ async fn sorted_out_tx1_2_4() { #[tokio::test] async fn find_dependent_tx1_tx2() { let mut rng = StdRng::seed_from_u64(0); - let mut txpool = TxPool::new(Default::default()); let db = MockDb::default(); + let mut txpool = TxPool::new(Default::default(), db); - let (_, gas_coin) = setup_coin(&mut rng, Some(&db)); + let (_, gas_coin) = setup_coin(&mut rng, Some(&txpool.database)); let (output, unset_input) = create_output_and_input(&mut rng, 10_000); let tx1 = Arc::new( TransactionBuilder::script(vec![], vec![]) @@ -704,13 +682,13 @@ async fn find_dependent_tx1_tx2() { ); txpool - .insert_inner(tx1.clone(), &db) + .insert_inner(tx1.clone()) .expect("Tx0 should be Ok, got Err"); txpool - .insert_inner(tx2.clone(), &db) + .insert_inner(tx2.clone()) .expect("Tx1 should be Ok, got Err"); let tx3_result = txpool - .insert_inner(tx3.clone(), &db) + .insert_inner(tx3.clone()) .expect("Tx2 should be Ok, got Err"); let mut seen = HashMap::new(); @@ -730,13 +708,16 @@ async fn find_dependent_tx1_tx2() { #[tokio::test] async fn tx_at_least_min_gas_price_is_insertable() { let mut rng = StdRng::seed_from_u64(0); - let mut txpool = TxPool::new(Config { - min_gas_price: 10, - ..Default::default() - }); let db = MockDb::default(); + let mut txpool = TxPool::new( + Config { + min_gas_price: 10, + ..Default::default() + }, + db, + ); - let (_, gas_coin) = setup_coin(&mut rng, Some(&db)); + let (_, gas_coin) = setup_coin(&mut rng, Some(&txpool.database)); let tx = Arc::new( TransactionBuilder::script(vec![], vec![]) .gas_price(10) @@ -744,21 +725,22 @@ async fn tx_at_least_min_gas_price_is_insertable() { .finalize_as_transaction(), ); - txpool - .insert_inner(tx, &db) - .expect("Tx should be Ok, got Err"); + txpool.insert_inner(tx).expect("Tx should be Ok, got Err"); } #[tokio::test] async fn tx_below_min_gas_price_is_not_insertable() { let mut rng = StdRng::seed_from_u64(0); - let mut txpool = TxPool::new(Config { - min_gas_price: 11, - ..Default::default() - }); let db = MockDb::default(); + let mut txpool = TxPool::new( + Config { + min_gas_price: 11, + ..Default::default() + }, + db, + ); - let (_, gas_coin) = setup_coin(&mut rng, Some(&db)); + let (_, gas_coin) = setup_coin(&mut rng, Some(&txpool.database)); let tx = Arc::new( TransactionBuilder::script(vec![], vec![]) .gas_price(10) @@ -767,7 +749,7 @@ async fn tx_below_min_gas_price_is_not_insertable() { ); let err = txpool - .insert_inner(tx, &db) + .insert_inner(tx) .expect_err("expected insertion failure"); assert!(matches!( err.root_cause().downcast_ref::().unwrap(), @@ -785,13 +767,11 @@ async fn tx_inserted_into_pool_when_input_message_id_exists_in_db() { .finalize_as_transaction(), ); - let mut db = MockDb::default(); + let db = MockDb::default(); db.insert_message(message); - let mut txpool = TxPool::new(Default::default()); + let mut txpool = TxPool::new(Default::default(), db); - txpool - .insert_inner(tx.clone(), &db) - .expect("should succeed"); + txpool.insert_inner(tx.clone()).expect("should succeed"); let tx_info = txpool.find_one(&tx.id()).unwrap(); assert_eq!(tx_info.tx().id(), tx.id()); @@ -808,11 +788,11 @@ async fn tx_rejected_when_input_message_id_is_spent() { .finalize_as_transaction(), ); - let mut db = MockDb::default(); + let db = MockDb::default(); db.insert_message(message.clone()); - let mut txpool = TxPool::new(Default::default()); + let mut txpool = TxPool::new(Default::default(), db); - let err = txpool.insert_inner(tx, &db).expect_err("should fail"); + let err = txpool.insert_inner(tx).expect_err("should fail"); // check error assert!(matches!( @@ -833,10 +813,9 @@ async fn tx_rejected_from_pool_when_input_message_id_does_not_exist_in_db() { let db = MockDb::default(); // Do not insert any messages into the DB to ensure there is no matching message for the // tx. + let mut txpool = TxPool::new(Default::default(), db); - let mut txpool = TxPool::new(Default::default()); - - let err = txpool.insert_inner(tx, &db).expect_err("should fail"); + let err = txpool.insert_inner(tx).expect_err("should fail"); // check error assert!(matches!( @@ -868,23 +847,21 @@ async fn tx_rejected_from_pool_when_gas_price_is_lower_than_another_tx_with_same .finalize_as_transaction(), ); - let mut db = MockDb::default(); + let db = MockDb::default(); db.insert_message(message.clone()); - let mut txpool = TxPool::new(Default::default()); + let mut txpool = TxPool::new(Default::default(), db); // Insert a tx for the message id with a high gas amount txpool - .insert_inner(tx_high.clone(), &db) + .insert_inner(tx_high.clone()) .expect("expected successful insertion"); // Insert a tx for the message id with a low gas amount // Because the new transaction's id matches an existing transaction, we compare the gas // prices of both the new and existing transactions. Since the existing transaction's gas // price is higher, we must now reject the new transaction. - let err = txpool - .insert_inner(tx_low, &db) - .expect_err("expected failure"); + let err = txpool.insert_inner(tx_low).expect_err("expected failure"); // check error assert!(matches!( @@ -909,14 +886,12 @@ async fn higher_priced_tx_squeezes_out_lower_priced_tx_with_same_message_id() { .finalize_as_transaction(), ); - let mut db = MockDb::default(); + let db = MockDb::default(); db.insert_message(message); - let mut txpool = TxPool::new(Default::default()); + let mut txpool = TxPool::new(Default::default(), db); - txpool - .insert_inner(tx_low.clone(), &db) - .expect("should succeed"); + txpool.insert_inner(tx_low.clone()).expect("should succeed"); // Insert a tx for the message id with a high gas amount // Because the new transaction's id matches an existing transaction, we compare the gas @@ -929,7 +904,7 @@ async fn higher_priced_tx_squeezes_out_lower_priced_tx_with_same_message_id() { .finalize_as_transaction(), ); - let squeezed_out_txs = txpool.insert_inner(tx_high, &db).expect("should succeed"); + let squeezed_out_txs = txpool.insert_inner(tx_high).expect("should succeed"); assert_eq!(squeezed_out_txs.removed.len(), 1); assert_eq!(squeezed_out_txs.removed[0].id(), tx_low.id()); @@ -971,14 +946,14 @@ async fn message_of_squeezed_out_tx_can_be_resubmitted_at_lower_gas_price() { .finalize_as_transaction(), ); - let mut db = MockDb::default(); + let db = MockDb::default(); db.insert_message(message_1); db.insert_message(message_2); - let mut txpool = TxPool::new(Default::default()); + let mut txpool = TxPool::new(Default::default(), db); - txpool.insert_inner(tx_1, &db).expect("should succeed"); + txpool.insert_inner(tx_1).expect("should succeed"); - txpool.insert_inner(tx_2, &db).expect("should succeed"); + txpool.insert_inner(tx_2).expect("should succeed"); - txpool.insert_inner(tx_3, &db).expect("should succeed"); + txpool.insert_inner(tx_3).expect("should succeed"); } diff --git a/docs/architecture.md b/docs/architecture.md index 72b5a1f3a9e..4a85b36f19d 100644 --- a/docs/architecture.md +++ b/docs/architecture.md @@ -317,7 +317,7 @@ trait PeerToPeer { fn gossiped_transaction_events(&self) -> BoxStream; // Report the validity of a transaction received from the network. - async fn notify_gossip_transaction_validity(message: &Self::GossipedTransaction, validity: GossipValidity); + fn notify_gossip_transaction_validity(message: &Self::GossipedTransaction, validity: GossipValidity) -> anyhow::Result<()>; } trait BlockImporter {