Skip to content

Commit

Permalink
Use tokio mutex
Browse files Browse the repository at this point in the history
  • Loading branch information
rakita committed Jun 14, 2022
1 parent 5685038 commit 887f944
Showing 1 changed file with 4 additions and 5 deletions.
9 changes: 4 additions & 5 deletions fuel-txpool/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ use crate::{interface::Interface, types::*, Config};
use fuel_core_interfaces::block_importer::ImportBlockBroadcast;
use fuel_core_interfaces::model::TxInfo;
use fuel_core_interfaces::txpool::{TxPoolDb, TxPoolMpsc, TxStatusBroadcast};
use parking_lot::Mutex as BlockingMutex;
use std::sync::Arc;
use tokio::sync::{broadcast, mpsc, oneshot, Mutex};
use tokio::task::JoinHandle;
Expand All @@ -12,7 +11,7 @@ pub struct Service {
interface: Arc<Interface>,
sender: mpsc::Sender<TxPoolMpsc>,
broadcast: broadcast::Sender<TxStatusBroadcast>,
join: BlockingMutex<Option<JoinHandle<mpsc::Receiver<TxPoolMpsc>>>>,
join: Mutex<Option<JoinHandle<mpsc::Receiver<TxPoolMpsc>>>>,
receiver: Arc<Mutex<Option<mpsc::Receiver<TxPoolMpsc>>>>,
}

Expand All @@ -24,13 +23,13 @@ impl Service {
interface: Arc::new(Interface::new(db, broadcast.clone(), config)),
sender,
broadcast,
join: BlockingMutex::new(None),
join: Mutex::new(None),
receiver: Arc::new(Mutex::new(Some(receiver))),
})
}

pub async fn start(&self, new_block: broadcast::Receiver<ImportBlockBroadcast>) -> bool {
let mut join = self.join.lock();
let mut join = self.join.lock().await;
if join.is_none() {
if let Some(receiver) = self.receiver.lock().await.take() {
let interface = self.interface.clone();
Expand All @@ -48,7 +47,7 @@ impl Service {
}

pub async fn stop(&self) -> Option<JoinHandle<()>> {
let mut join = self.join.lock();
let mut join = self.join.lock().await;
let join_handle = join.take();
if let Some(join_handle) = join_handle {
let _ = self.sender.send(TxPoolMpsc::Stop).await;
Expand Down

0 comments on commit 887f944

Please sign in to comment.