Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change: add Mutex to AsyncRuntime #1213

Merged
merged 1 commit into from
Jul 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions openraft/src/raft/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,11 @@ pub use message::InstallSnapshotResponse;
pub use message::SnapshotResponse;
pub use message::VoteRequest;
pub use message::VoteResponse;
use tokio::sync::Mutex;
use tracing::trace_span;
use tracing::Instrument;
use tracing::Level;

use crate::async_runtime::mutex::Mutex;
use crate::async_runtime::watch::WatchReceiver;
use crate::async_runtime::MpscUnboundedSender;
use crate::async_runtime::OneshotSender;
Expand Down Expand Up @@ -321,7 +321,7 @@ where C: RaftTypeConfig
tx_shutdown: std::sync::Mutex::new(Some(tx_shutdown)),
core_state: std::sync::Mutex::new(CoreState::Running(core_handle)),

snapshot: Mutex::new(None),
snapshot: C::mutex(None),
};

Ok(Self { inner: Arc::new(inner) })
Expand Down
4 changes: 2 additions & 2 deletions openraft/src/raft/raft_inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ use std::fmt::Debug;
use std::future::Future;
use std::sync::Arc;

use tokio::sync::Mutex;
use tracing::Level;

use crate::async_runtime::watch::WatchReceiver;
Expand All @@ -20,6 +19,7 @@ use crate::metrics::RaftServerMetrics;
use crate::raft::core_state::CoreState;
use crate::type_config::alias::AsyncRuntimeOf;
use crate::type_config::alias::MpscUnboundedSenderOf;
use crate::type_config::alias::MutexOf;
use crate::type_config::alias::OneshotReceiverOf;
use crate::type_config::alias::OneshotSenderOf;
use crate::type_config::alias::WatchReceiverOf;
Expand Down Expand Up @@ -48,7 +48,7 @@ where C: RaftTypeConfig
pub(in crate::raft) core_state: std::sync::Mutex<CoreState<C>>,

/// The ongoing snapshot transmission.
pub(in crate::raft) snapshot: Mutex<Option<crate::network::snapshot_transport::Streaming<C>>>,
pub(in crate::raft) snapshot: MutexOf<C, Option<crate::network::snapshot_transport::Streaming<C>>>,
}

impl<C> RaftInner<C>
Expand Down
9 changes: 5 additions & 4 deletions openraft/src/replication/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ use request::Replicate;
use response::ReplicationResult;
pub(crate) use response::Response;
use tokio::select;
use tokio::sync::Mutex;
use tracing_futures::Instrument;

use crate::async_runtime::MpscUnboundedReceiver;
Expand Down Expand Up @@ -56,8 +55,10 @@ use crate::type_config::alias::LogIdOf;
use crate::type_config::alias::MpscUnboundedReceiverOf;
use crate::type_config::alias::MpscUnboundedSenderOf;
use crate::type_config::alias::MpscUnboundedWeakSenderOf;
use crate::type_config::alias::MutexOf;
use crate::type_config::alias::OneshotReceiverOf;
use crate::type_config::alias::OneshotSenderOf;
use crate::type_config::async_runtime::mutex::Mutex;
use crate::type_config::TypeConfigExt;
use crate::LogId;
use crate::RaftLogId;
Expand Down Expand Up @@ -114,7 +115,7 @@ where
/// Another `RaftNetwork` specific for snapshot replication.
///
/// Snapshot transmitting is a long running task, and is processed in a separate task.
snapshot_network: Arc<Mutex<N::Network>>,
snapshot_network: Arc<MutexOf<C, N::Network>>,

/// The current snapshot replication state.
///
Expand Down Expand Up @@ -188,7 +189,7 @@ where
target,
session_id,
network,
snapshot_network: Arc::new(Mutex::new(snapshot_network)),
snapshot_network: Arc::new(C::mutex(snapshot_network)),
snapshot_state: None,
backoff: None,
log_reader,
Expand Down Expand Up @@ -754,7 +755,7 @@ where

async fn send_snapshot(
request_id: RequestId,
network: Arc<Mutex<N::Network>>,
network: Arc<MutexOf<C, N::Network>>,
vote: Vote<C::NodeId>,
snapshot: Snapshot<C>,
option: RPCOption,
Expand Down
2 changes: 2 additions & 0 deletions openraft/src/type_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,8 @@ pub mod alias {
pub type WatchSenderOf<C, T> = <WatchOf<C> as watch::Watch>::Sender<T>;
pub type WatchReceiverOf<C, T> = <WatchOf<C> as watch::Watch>::Receiver<T>;

pub type MutexOf<C, T> = <Rt<C> as AsyncRuntime>::Mutex<T>;

// Usually used types
pub type LogIdOf<C> = crate::LogId<NodeIdOf<C>>;
pub type VoteOf<C> = crate::Vote<NodeIdOf<C>>;
Expand Down
18 changes: 18 additions & 0 deletions openraft/src/type_config/async_runtime/impls/tokio_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use tokio::sync::watch as tokio_watch;

use crate::async_runtime::mpsc_unbounded;
use crate::async_runtime::mpsc_unbounded::MpscUnbounded;
use crate::async_runtime::mutex;
use crate::async_runtime::oneshot;
use crate::async_runtime::watch;
use crate::type_config::OneshotSender;
Expand Down Expand Up @@ -76,6 +77,7 @@ impl AsyncRuntime for TokioRuntime {
type MpscUnbounded = TokioMpscUnbounded;
type Watch = TokioWatch;
type Oneshot = TokioOneshot;
type Mutex<T: OptionalSend + 'static> = TokioMutex<T>;
}

pub struct TokioMpscUnbounded;
Expand Down Expand Up @@ -197,3 +199,19 @@ where T: OptionalSend
self.send(t)
}
}

type TokioMutex<T> = tokio::sync::Mutex<T>;

impl<T> mutex::Mutex<T> for TokioMutex<T>
where T: OptionalSend + 'static
{
type Guard<'a> = tokio::sync::MutexGuard<'a, T>;

fn new(value: T) -> Self {
TokioMutex::new(value)
}

fn lock(&self) -> impl Future<Output = Self::Guard<'_>> + OptionalSend {
self.lock()
}
}
4 changes: 4 additions & 0 deletions openraft/src/type_config/async_runtime/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ pub(crate) mod impls {
pub use tokio_runtime::TokioRuntime;
}
pub mod mpsc_unbounded;
pub mod mutex;
pub mod oneshot;
pub mod watch;

Expand All @@ -23,6 +24,7 @@ pub use mpsc_unbounded::MpscUnboundedSender;
pub use mpsc_unbounded::MpscUnboundedWeakSender;
pub use mpsc_unbounded::SendError;
pub use mpsc_unbounded::TryRecvError;
pub use mutex::Mutex;
pub use oneshot::Oneshot;
pub use oneshot::OneshotSender;
pub use watch::Watch;
Expand Down Expand Up @@ -101,4 +103,6 @@ pub trait AsyncRuntime: Debug + Default + PartialEq + Eq + OptionalSend + Option
type Watch: Watch;

type Oneshot: Oneshot;

type Mutex<T: OptionalSend + 'static>: Mutex<T>;
}
18 changes: 18 additions & 0 deletions openraft/src/type_config/async_runtime/mutex.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
use std::future::Future;
use std::ops::DerefMut;

use crate::OptionalSend;
use crate::OptionalSync;

/// Represents an implementation of an asynchronous Mutex.
pub trait Mutex<T: OptionalSend + 'static>: OptionalSend + OptionalSync {
/// Handle to an acquired lock, should release it when dropped.
type Guard<'a>: DerefMut<Target = T> + OptionalSend
where Self: 'a;

/// Creates a new lock.
fn new(value: T) -> Self;

/// Locks this Mutex.
fn lock(&self) -> impl Future<Output = Self::Guard<'_>> + OptionalSend;
}
11 changes: 11 additions & 0 deletions openraft/src/type_config/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::time::Duration;

use openraft_macros::since;

use crate::async_runtime::mutex::Mutex;
use crate::async_runtime::watch::Watch;
use crate::async_runtime::MpscUnbounded;
use crate::async_runtime::Oneshot;
Expand All @@ -12,6 +13,7 @@ use crate::type_config::alias::JoinHandleOf;
use crate::type_config::alias::MpscUnboundedOf;
use crate::type_config::alias::MpscUnboundedReceiverOf;
use crate::type_config::alias::MpscUnboundedSenderOf;
use crate::type_config::alias::MutexOf;
use crate::type_config::alias::OneshotOf;
use crate::type_config::alias::OneshotReceiverOf;
use crate::type_config::alias::OneshotSenderOf;
Expand Down Expand Up @@ -90,6 +92,15 @@ pub trait TypeConfigExt: RaftTypeConfig {
WatchOf::<Self>::channel(init)
}

/// Creates a Mutex lock.
///
/// This is just a wrapper of
/// [`AsyncRuntime::Mutex::new()`](`crate::async_runtime::Mutex::new`).
fn mutex<T>(value: T) -> MutexOf<Self, T>
where T: OptionalSend {
MutexOf::<Self, T>::new(value)
}

// Task methods

/// Spawn a new task.
Expand Down
Loading