Skip to content

Commit

Permalink
Unify the work of runnable services (#860)
Browse files Browse the repository at this point in the history
  • Loading branch information
xgreenx authored Dec 23, 2022
1 parent d5cad93 commit e36389f
Show file tree
Hide file tree
Showing 45 changed files with 2,040 additions and 2,129 deletions.
21 changes: 19 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ members = [
"crates/database",
"crates/fuel-core",
"crates/metrics",
"crates/services",
"crates/services/consensus_module/bft",
"crates/services/consensus_module/poa",
"crates/services/executor",
Expand Down
2 changes: 2 additions & 0 deletions ci_checks.sh
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
#!/usr/bin/env bash

cargo +nightly fmt --all -- --check &&
cargo sort -w --check &&
source .github/workflows/scripts/verify_openssl.sh &&
Expand Down
3 changes: 0 additions & 3 deletions crates/chain-config/src/config/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,4 @@ pub enum BlockProduction {
#[serde(flatten)]
trigger: fuel_core_poa::Trigger,
},
// TODO:
// RoundRobin,
// ProofOfStake,
}
2 changes: 1 addition & 1 deletion crates/fuel-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ axum = { version = "0.5" }
bincode = "1.3"
derive_more = { version = "0.99" }
enum-iterator = "1.2"
fuel-core-bft = { path = "../services/consensus_module/bft", version = "0.15.1" }
fuel-core-chain-config = { path = "../chain-config", version = "0.15.1" }
fuel-core-database = { path = "../database", version = "0.15.1" }
fuel-core-executor = { path = "../services/executor", version = "0.15.1" }
Expand All @@ -30,6 +29,7 @@ fuel-core-p2p = { path = "../services/p2p", version = "0.15.1", optional = true
fuel-core-poa = { path = "../services/consensus_module/poa", version = "0.15.1" }
fuel-core-producer = { path = "../services/producer", version = "0.15.1" }
fuel-core-relayer = { path = "../services/relayer", version = "0.15.1", optional = true }
fuel-core-services = { path = "../services", version = "0.15.1" }
fuel-core-storage = { path = "../storage", version = "0.15.1" }
fuel-core-sync = { path = "../services/sync", version = "0.15.1" }
fuel-core-txpool = { path = "../services/txpool", version = "0.15.1" }
Expand Down
14 changes: 7 additions & 7 deletions crates/fuel-core/src/schema/tx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,9 @@ impl TxQuery {
) -> async_graphql::Result<Option<Transaction>> {
let db = ctx.data_unchecked::<Database>();
let id = id.0;
let txpool = ctx.data_unchecked::<Arc<TxPoolService>>();
let txpool = ctx.data_unchecked::<TxPoolService>();

if let Ok(Some(transaction)) = txpool.find_one(id).await {
if let Some(transaction) = txpool.shared.find_one(id) {
Ok(Some(Transaction(transaction.tx().clone().deref().into())))
} else {
Ok(db
Expand Down Expand Up @@ -236,12 +236,12 @@ impl TxMutation {
ctx: &Context<'_>,
tx: HexString,
) -> async_graphql::Result<Transaction> {
let txpool = ctx.data_unchecked::<Arc<TxPoolService>>();
let txpool = ctx.data_unchecked::<TxPoolService>();
let mut tx = FuelTx::from_bytes(&tx.0)?;
tx.precompute();
let _: Vec<_> = txpool
.shared
.insert(vec![Arc::new(tx.clone())])
.await?
.into_iter()
.try_collect()?;

Expand All @@ -254,7 +254,7 @@ impl TxMutation {
pub struct TxStatusSubscription;

struct StreamState {
txpool: Arc<TxPoolService>,
txpool: TxPoolService,
db: Database,
}

Expand All @@ -277,9 +277,9 @@ impl TxStatusSubscription {
ctx: &Context<'_>,
#[graphql(desc = "The ID of the transaction")] id: TransactionId,
) -> impl Stream<Item = async_graphql::Result<TransactionStatus>> {
let txpool = ctx.data_unchecked::<Arc<TxPoolService>>().clone();
let txpool = ctx.data_unchecked::<TxPoolService>().clone();
let db = ctx.data_unchecked::<Database>().clone();
let rx = BroadcastStream::new(txpool.tx_update_subscribe());
let rx = BroadcastStream::new(txpool.shared.tx_update_subscribe());
let state = Box::new(StreamState { txpool, db });

transaction_status_change(state, rx.boxed(), id.into())
Expand Down
7 changes: 3 additions & 4 deletions crates/fuel-core/src/schema/tx/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@ use fuel_core_types::{
services::txpool::TransactionStatus as TxStatus,
tai64::Tai64,
};
use std::sync::Arc;

pub struct ProgramState {
return_type: ReturnType,
Expand Down Expand Up @@ -406,7 +405,7 @@ impl Transaction {
) -> async_graphql::Result<Option<TransactionStatus>> {
let id = self.0.id();
let db = ctx.data_unchecked::<Database>();
let txpool = ctx.data_unchecked::<Arc<TxPoolService>>();
let txpool = ctx.data_unchecked::<TxPoolService>();
get_tx_status(id, db, txpool).await
}

Expand Down Expand Up @@ -503,8 +502,8 @@ pub(super) async fn get_tx_status(
) -> async_graphql::Result<Option<TransactionStatus>> {
match db.get_tx_status(&id)? {
Some(status) => Ok(Some(status.into())),
None => match txpool.find_one(id).await {
Ok(Some(transaction_in_pool)) => {
None => match txpool.shared.find_one(id) {
Some(transaction_in_pool) => {
let time = transaction_in_pool.submitted_time();
Ok(Some(TransactionStatus::Submitted(SubmittedStatus(time))))
}
Expand Down
2 changes: 1 addition & 1 deletion crates/fuel-core/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ impl FuelService {
.modules
.relayer
.as_ref()
.map(fuel_core_relayer::RelayerHandle::listen_synced);
.map(|relayer| relayer.shared.clone());

let handle = tokio::spawn(async move {
let run_fut = service.run();
Expand Down
53 changes: 23 additions & 30 deletions crates/fuel-core/src/service/adapters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,11 @@ use crate::{
use fuel_core_p2p::orchestrator::Service as P2PService;
#[cfg(feature = "relayer")]
use fuel_core_relayer::RelayerSynced;
use fuel_core_txpool::Service;
use fuel_core_types::{
blockchain::SealedBlock,
services::txpool::TxStatus,
};
use fuel_core_txpool::Service as TxPoolService;
use fuel_core_types::blockchain::SealedBlock;
use std::sync::Arc;
use tokio::{
sync::broadcast::Receiver,
sync::broadcast::Sender,
task::JoinHandle,
};

Expand All @@ -26,12 +23,17 @@ pub mod txpool;
pub struct BlockImportAdapter {
// TODO: We should use `fuel_core_poa::Service here but for that we need to fix
// the `start` of the process and store the task inside of the `Service`.
rx: Receiver<SealedBlock>,
tx: Sender<SealedBlock>,
}

pub struct TxPoolAdapter {
pub service: Arc<Service>,
pub tx_status_rx: Receiver<TxStatus>,
service: TxPoolService,
}

impl TxPoolAdapter {
pub fn new(service: TxPoolService) -> Self {
Self { service }
}
}

pub struct ExecutorAdapter {
Expand All @@ -45,48 +47,39 @@ pub struct MaybeRelayerAdapter {
pub relayer_synced: Option<RelayerSynced>,
}

pub struct PoACoordinatorAdapter {
pub struct BlockProducerAdapter {
pub block_producer: Arc<fuel_core_producer::Producer<Database>>,
}

#[cfg_attr(not(feature = "p2p"), derive(Clone))]
#[cfg(feature = "p2p")]
#[derive(Clone)]
pub struct P2PAdapter {
#[cfg(feature = "p2p")]
p2p_service: Arc<P2PService>,
#[cfg(feature = "p2p")]
tx_receiver: Receiver<fuel_core_types::services::p2p::TransactionGossipData>,
service: Arc<P2PService>,
}

#[cfg(feature = "p2p")]
impl Clone for P2PAdapter {
fn clone(&self) -> Self {
Self::new(self.p2p_service.clone())
}
}
#[cfg(not(feature = "p2p"))]
#[derive(Default, Clone)]
pub struct P2PAdapter;

#[cfg(feature = "p2p")]
impl P2PAdapter {
pub fn new(p2p_service: Arc<P2PService>) -> Self {
let tx_receiver = p2p_service.subscribe_tx();
Self {
p2p_service,
tx_receiver,
}
pub fn new(service: Arc<P2PService>) -> Self {
Self { service }
}

pub async fn stop(&self) -> Option<JoinHandle<()>> {
self.p2p_service.stop().await
self.service.stop().await
}

pub async fn start(&self) -> anyhow::Result<()> {
self.p2p_service.start().await
self.service.start().await
}
}

#[cfg(not(feature = "p2p"))]
impl P2PAdapter {
pub fn new() -> Self {
Self {}
Default::default()
}

pub async fn stop(&self) -> Option<JoinHandle<()>> {
Expand Down
34 changes: 18 additions & 16 deletions crates/fuel-core/src/service/adapters/poa.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
use crate::{
database::Database,
service::adapters::{
PoACoordinatorAdapter,
BlockProducerAdapter,
TxPoolAdapter,
},
};
use fuel_core_poa::ports::TransactionPool;
use fuel_core_services::stream::BoxStream;
use fuel_core_storage::transactional::StorageTransaction;
use fuel_core_types::{
blockchain::primitives::BlockHeight,
Expand All @@ -24,32 +25,33 @@ use fuel_core_types::{
},
};

#[async_trait::async_trait]
impl TransactionPool for TxPoolAdapter {
async fn pending_number(&self) -> anyhow::Result<usize> {
self.service.pending_number().await
fn pending_number(&self) -> usize {
self.service.shared.pending_number()
}

async fn total_consumable_gas(&self) -> anyhow::Result<u64> {
self.service.total_consumable_gas().await
fn total_consumable_gas(&self) -> u64 {
self.service.shared.total_consumable_gas()
}

async fn remove_txs(&self, ids: Vec<TxId>) -> anyhow::Result<Vec<ArcPoolTx>> {
self.service.remove_txs(ids).await
fn remove_txs(&self, ids: Vec<TxId>) -> Vec<ArcPoolTx> {
self.service.shared.remove_txs(ids)
}

async fn next_transaction_status_update(&mut self) -> TxStatus {
match self.tx_status_rx.recv().await {
Ok(status) => return status,
Err(err) => {
panic!("Tx Status Channel errored unexpectedly: {err:?}");
}
}
fn transaction_status_events(&self) -> BoxStream<TxStatus> {
use tokio_stream::{
wrappers::BroadcastStream,
StreamExt,
};
Box::pin(
BroadcastStream::new(self.service.shared.tx_status_subscribe())
.filter_map(|result| result.ok()),
)
}
}

#[async_trait::async_trait]
impl fuel_core_poa::ports::BlockProducer<Database> for PoACoordinatorAdapter {
impl fuel_core_poa::ports::BlockProducer<Database> for BlockProducerAdapter {
async fn produce_and_execute_block(
&self,
height: BlockHeight,
Expand Down
11 changes: 4 additions & 7 deletions crates/fuel-core/src/service/adapters/producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,21 +24,18 @@ use fuel_core_types::{
Result as ExecutorResult,
UncommittedResult,
},
txpool::{
ArcPoolTx,
Error as TxPoolError,
},
txpool::ArcPoolTx,
},
};

#[async_trait::async_trait]
impl TxPool for TxPoolAdapter {
async fn get_includable_txs(
fn get_includable_txs(
&self,
_block_height: BlockHeight,
max_gas: u64,
) -> Result<Vec<ArcPoolTx>, TxPoolError> {
self.service.select_transactions(max_gas).await
) -> Vec<ArcPoolTx> {
self.service.shared.select_transactions(max_gas)
}
}

Expand Down
Loading

0 comments on commit e36389f

Please sign in to comment.