Skip to content

Commit

Permalink
fix(grpc-lock): serialized read/write lock for grpc request
Browse files Browse the repository at this point in the history
Signed-off-by: Hrudaya <hrudayaranjan.sahoo@datacore.com>
  • Loading branch information
hrudaya21 committed Mar 21, 2024
1 parent 1c36405 commit 3ef38b9
Show file tree
Hide file tree
Showing 11 changed files with 653 additions and 202 deletions.
4 changes: 3 additions & 1 deletion io-engine/src/bin/io-engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,9 @@ fn start_tokio_runtime(args: &MayastorCliArgs) {

// Initialize Lock manager.
let cfg = ResourceLockManagerConfig::default()
.with_subsystem(ProtectedSubsystems::NEXUS, 512);
.with_subsystem(ProtectedSubsystems::NEXUS, 512)
.with_subsystem(ProtectedSubsystems::REPLICA, 512)
.with_subsystem(ProtectedSubsystems::POOL, 512);
ResourceLockManager::initialize(cfg);

Mthread::spawn_unaffinitized(move || {
Expand Down
119 changes: 88 additions & 31 deletions io-engine/src/core/lock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,16 @@ use std::{
time::Duration,
};

use futures::lock::{Mutex, MutexGuard};
use once_cell::sync::OnceCell;
use std::sync::Arc;
use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};

/// Common IO engine resource subsystems.
pub struct ProtectedSubsystems;
impl ProtectedSubsystems {
pub const NEXUS: &'static str = "nexus";
pub const REPLICA: &'static str = "replica";
pub const POOL: &'static str = "pool";
}

/// Configuration parameters for initialization of the Lock manager.
Expand Down Expand Up @@ -41,48 +44,69 @@ impl ResourceLockManagerConfig {
}

/// Resource subsystem that holds locks for all resources withing this system.
#[derive(Clone)]
pub struct ResourceSubsystem {
id: String,
object_locks: Vec<Mutex<LockStats>>,
subsystem_lock: Mutex<LockStats>,
object_locks: Vec<Arc<RwLock<LockStats>>>,
subsystem_lock: Arc<RwLock<LockStats>>,
}

impl ResourceSubsystem {
/// Create a new resource subsystem with target id and maximum number of
/// objects.
fn new(id: String, num_objects: usize) -> Self {
let object_locks =
std::iter::repeat_with(|| Mutex::new(LockStats::default()))
std::iter::repeat_with(|| RwLock::new(LockStats::default()).into())
.take(num_objects)
.collect::<Vec<_>>();

Self {
id,
object_locks,
subsystem_lock: Mutex::new(LockStats::default()),
subsystem_lock: RwLock::new(LockStats::default()).into(),
}
}

/// Acquire the subsystem lock.
pub async fn lock(
/// Acquire the subsystem write lock.
pub async fn write_lock_subsystem(
&self,
wait_timeout: Option<Duration>,
) -> Option<ResourceLockGuard<'_>> {
acquire_lock(&self.subsystem_lock, wait_timeout).await
) -> Option<ResourceLockWriteGuard<'_>> {
acquire_write_lock(&self.subsystem_lock, wait_timeout).await
}

/// Lock subsystem resource by its ID and obtain a lock guard.
pub async fn lock_resource<T: AsRef<str>>(
/// Write lock subsystem resource by its ID and obtain a lock guard.
pub async fn write_lock_resource<T: AsRef<str>>(
&self,
id: T,
wait_timeout: Option<Duration>,
) -> Option<ResourceLockGuard<'_>> {
) -> Option<ResourceLockWriteGuard<'_>> {
// Calculate hash of the object to get the mutex index.
let mut hasher = DefaultHasher::new();
id.as_ref().hash(&mut hasher);
let mutex_id = hasher.finish() as usize % self.object_locks.len();
acquire_write_lock(&self.object_locks[mutex_id], wait_timeout).await
}

/// Acquire the subsystem read lock.
pub async fn read_lock_subsystem(
&self,
wait_timeout: Option<Duration>,
) -> Option<ResourceLockReadGuard<'_>> {
acquire_read_lock(&self.subsystem_lock, wait_timeout).await
}

acquire_lock(&self.object_locks[mutex_id], wait_timeout).await
/// Read lock subsystem resource by its ID and obtain a lock guard.
pub async fn read_lock_resource<T: AsRef<str>>(
&self,
id: T,
wait_timeout: Option<Duration>,
) -> Option<ResourceLockReadGuard<'_>> {
// Calculate hash of the object to get the mutex index.
let mut hasher = DefaultHasher::new();
id.as_ref().hash(&mut hasher);
let mutex_id = hasher.finish() as usize % self.object_locks.len();
acquire_read_lock(&self.object_locks[mutex_id], wait_timeout).await
}
}

Expand All @@ -107,35 +131,59 @@ pub struct ResourceLockManager {
/// All known resource subsystems with locks.
subsystems: Vec<ResourceSubsystem>,
/// Global resource lock,
mgr_lock: Mutex<LockStats>,
mgr_lock: Arc<RwLock<LockStats>>,
}

/// Automatically releases the lock once dropped.
pub struct ResourceLockGuard<'a> {
_lock_guard: MutexGuard<'a, LockStats>,
/// Automatically releases the read lock once dropped.
pub struct ResourceLockReadGuard<'a> {
_lock_guard: RwLockReadGuard<'a, LockStats>,
}

/// Automatically releases the write lock once dropped.
pub struct ResourceLockWriteGuard<'a> {
_lock_guard: RwLockWriteGuard<'a, LockStats>,
}

/// Global instance of the resource lock manager.
static LOCK_MANAGER: OnceCell<ResourceLockManager> = OnceCell::new();

/// Helper function to abstract common lock acquisition logic.
async fn acquire_lock(
lock: &Mutex<LockStats>,
/// Helper function to aquire read lock.
async fn acquire_read_lock(
lock: &Arc<RwLock<LockStats>>,
wait_timeout: Option<Duration>,
) -> Option<ResourceLockGuard<'_>> {
let mut lock_guard = if let Some(d) = wait_timeout {
match tokio::time::timeout(d, lock.lock()).await {
) -> Option<ResourceLockReadGuard<'_>> {
let lock_guard = if let Some(d) = wait_timeout {
match tokio::time::timeout(d, lock.read()).await {
Err(_) => return None,
Ok(g) => g,
}
} else {
// No timeout, wait for the lock indefinitely.
lock.lock().await
lock.read().await
};

Some(ResourceLockReadGuard {
_lock_guard: lock_guard,
})
}

/// Helper function to aquire write lock.
async fn acquire_write_lock(
lock: &RwLock<LockStats>,
wait_timeout: Option<Duration>,
) -> Option<ResourceLockWriteGuard<'_>> {
let mut lock_guard = if let Some(d) = wait_timeout {
match tokio::time::timeout(d, lock.write()).await {
Err(_) => return None,
Ok(g) => g,
}
} else {
// No timeout, wait for the lock indefinitely.
lock.write().await
};
lock_guard.num_acquires += 1;

Some(ResourceLockGuard {
Some(ResourceLockWriteGuard {
_lock_guard: lock_guard,
})
}
Expand All @@ -153,17 +201,25 @@ impl ResourceLockManager {

ResourceLockManager {
subsystems,
mgr_lock: Mutex::new(LockStats::default()),
mgr_lock: RwLock::new(LockStats::default()).into(),
}
});
}

/// Acquire the global Lock manager lock.
pub async fn lock(
/// Acquire the global Lock manager write lock.
pub async fn write_lock(
&self,
wait_timeout: Option<Duration>,
) -> Option<ResourceLockWriteGuard<'_>> {
acquire_write_lock(&self.mgr_lock, wait_timeout).await
}

/// Acquire the global Lock manager read lock.
pub async fn read_lock(
&self,
wait_timeout: Option<Duration>,
) -> Option<ResourceLockGuard<'_>> {
acquire_lock(&self.mgr_lock, wait_timeout).await
) -> Option<ResourceLockReadGuard<'_>> {
acquire_read_lock(&self.mgr_lock, wait_timeout).await
}

/// Get resource subsystem by its id.
Expand All @@ -186,4 +242,5 @@ impl ResourceLockManager {
}
}

impl ResourceLockGuard<'_> {}
impl ResourceLockReadGuard<'_> {}
impl ResourceLockWriteGuard<'_> {}
3 changes: 2 additions & 1 deletion io-engine/src/core/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,10 @@ pub use reactor::{

pub use lock::{
ProtectedSubsystems,
ResourceLockGuard,
ResourceLockManager,
ResourceLockManagerConfig,
ResourceLockReadGuard,
ResourceLockWriteGuard,
ResourceSubsystem,
};

Expand Down
138 changes: 133 additions & 5 deletions io-engine/src/grpc/mod.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,28 @@
use futures::{channel::oneshot::Receiver, FutureExt};
use nix::errno::Errno;
pub use server::MayastorGrpcServer;
use std::{
error::Error,
fmt::{Debug, Display},
future::Future,
time::Duration,
};

use futures::channel::oneshot::Receiver;
use nix::errno::Errno;
pub use server::MayastorGrpcServer;
use tokio::sync::RwLock;
use tonic::{Request, Response, Status};

use crate::{
bdev_api::BdevError,
core::{CoreError, Reactor, VerboseError},
core::{
lock::ResourceLockManager,
CoreError,
Reactor,
ResourceSubsystem,
VerboseError,
},
};

use std::panic::AssertUnwindSafe;

impl From<BdevError> for tonic::Status {
fn from(e: BdevError) -> Self {
match e {
Expand Down Expand Up @@ -131,6 +138,127 @@ pub(crate) trait RWSerializer<F, T> {
async fn shared(&self, ctx: GrpcClientContext, f: F) -> Result<T, Status>;
}

/// Trait give the default implementation of the gRPC serialization.
/// Each service can implement its own serialization logic.
#[async_trait::async_trait]
pub(crate) trait GrpcRWSerializer<F, T> {
async fn serialize_grpc(
&self,
ctx: GrpcClientContext,
resource: Option<String>,
subsystem: ResourceSubsystem,
global_operation: bool,
write_lock: bool,
f: F,
) -> Result<T, Status>
where
T: Send + 'static,
F: core::future::Future<Output = Result<T, Status>> + Send + 'static,
{
async fn acquire_lock(
ctx: &GrpcClientContext,
resource: Option<String>,
subsystem: ResourceSubsystem,
write_lock: bool,
global_operation: bool,
) -> Result<(), Status> {
let lock_manager = ResourceLockManager::get_instance();
if global_operation {
if write_lock {
match lock_manager.write_lock(Some(ctx.timeout)).await {
Some(_) => (),
None => return Err(Status::deadline_exceeded(
"Failed to acquire global write lock within given timeout"
.to_string()
)),
}
} else {
match lock_manager.read_lock(Some(ctx.timeout)).await {
Some(_) => (),
None => return Err(Status::deadline_exceeded(
"Failed to acquire global read lock within given timeout"
.to_string()
)),
}
}
}
if let Some(resource) = resource {
if write_lock {
match subsystem.write_lock_resource(resource, Some(ctx.timeout))
.await {
Some(_) => (),
None => return Err(Status::deadline_exceeded(
"Failed to acquire write lock for the resource within given timeout"
.to_string()
)),
};
} else {
match subsystem.read_lock_resource(resource, Some(ctx.timeout))
.await {
Some(_) => (),
None => return Err(Status::deadline_exceeded(
"Failed to acquire read lock for the resource within given timeout"
.to_string()
)),
};
}
} else if write_lock {
match subsystem.write_lock_subsystem(Some(ctx.timeout)).await {
Some(_) => (),
None => return Err(Status::deadline_exceeded(
"Failed to acquire subsystem write lock within given timeout"
.to_string()
)),
};
} else {
match subsystem.read_lock_subsystem(Some(ctx.timeout)).await {
Some(_) => (),
None => return Err(Status::deadline_exceeded(
"Failed to acquire subsystem read lock within given timeout"
.to_string()
)),
};
}
Ok(())
}

let fut = AssertUnwindSafe(f).catch_unwind();
// Schedule a Tokio task to detach it from the high-level gRPC future
// and avoid task cancellation when the top-level gRPC future is
// cancelled.
match tokio::spawn(async move {
acquire_lock(
&ctx,
resource,
subsystem,
write_lock,
global_operation,
)
.await?;

let r = fut.await;

match r {
Ok(r) => r,
Err(_e) => {
warn!(
"{}: gRPC method panicked, args: {}",
ctx.id, ctx.args
);
Err(Status::cancelled(format!(
"{}: gRPC method panicked",
ctx.id
)))
}
}
})
.await
{
Ok(r) => r,
Err(_) => Err(Status::cancelled("gRPC call cancelled")),
}
}
}
/// Trait allows service implementing to return RWLock of itself to the
/// caller.
#[async_trait::async_trait]
Expand Down
Loading

0 comments on commit 3ef38b9

Please sign in to comment.