Skip to content

Commit

Permalink
fix(lock): make import pool, create replica and share replica operati…
Browse files Browse the repository at this point in the history
…ons mutually exclusive

Signed-off-by: Hrudaya <hrudayaranjan.sahoo@datacore.com>
  • Loading branch information
hrudaya21 committed Apr 8, 2024
1 parent f2f74e7 commit 640cfe1
Show file tree
Hide file tree
Showing 11 changed files with 177 additions and 35 deletions.
4 changes: 3 additions & 1 deletion io-engine/src/bin/io-engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,9 @@ fn start_tokio_runtime(args: &MayastorCliArgs) {

// Initialize Lock manager.
let cfg = ResourceLockManagerConfig::default()
.with_subsystem(ProtectedSubsystems::NEXUS, 512);
.with_subsystem(ProtectedSubsystems::POOL, 32)
.with_subsystem(ProtectedSubsystems::NEXUS, 512)
.with_subsystem(ProtectedSubsystems::REPLICA, 1024);
ResourceLockManager::initialize(cfg);

Mthread::spawn_unaffinitized(move || {
Expand Down
22 changes: 17 additions & 5 deletions io-engine/src/core/lock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ use once_cell::sync::OnceCell;
pub struct ProtectedSubsystems;
impl ProtectedSubsystems {
pub const NEXUS: &'static str = "nexus";
pub const POOL: &'static str = "pool";
pub const REPLICA: &'static str = "replica";
}

/// Configuration parameters for initialization of the Lock manager.
Expand Down Expand Up @@ -41,6 +43,7 @@ impl ResourceLockManagerConfig {
}

/// Resource subsystem that holds locks for all resources withing this system.
#[derive(Debug)]
pub struct ResourceSubsystem {
id: String,
object_locks: Vec<Mutex<LockStats>>,
Expand All @@ -67,22 +70,23 @@ impl ResourceSubsystem {
pub async fn lock(
&self,
wait_timeout: Option<Duration>,
try_lock: bool,
) -> Option<ResourceLockGuard<'_>> {
acquire_lock(&self.subsystem_lock, wait_timeout).await
acquire_lock(&self.subsystem_lock, wait_timeout, try_lock).await
}

/// Lock subsystem resource by its ID and obtain a lock guard.
pub async fn lock_resource<T: AsRef<str>>(
&self,
id: T,
wait_timeout: Option<Duration>,
try_lock: bool,
) -> Option<ResourceLockGuard<'_>> {
// Calculate hash of the object to get the mutex index.
let mut hasher = DefaultHasher::new();
id.as_ref().hash(&mut hasher);
let mutex_id = hasher.finish() as usize % self.object_locks.len();

acquire_lock(&self.object_locks[mutex_id], wait_timeout).await
acquire_lock(&self.object_locks[mutex_id], wait_timeout, try_lock).await
}
}

Expand Down Expand Up @@ -122,14 +126,21 @@ static LOCK_MANAGER: OnceCell<ResourceLockManager> = OnceCell::new();
async fn acquire_lock(
lock: &Mutex<LockStats>,
wait_timeout: Option<Duration>,
try_lock: bool,
) -> Option<ResourceLockGuard<'_>> {
let mut lock_guard = if let Some(d) = wait_timeout {
match tokio::time::timeout(d, lock.lock()).await {
Err(_) => return None,
Ok(g) => g,
}
} else if try_lock {
// No timeout, try for taking lock immediately.
match lock.try_lock() {
Some(l) => l,
None => return None,
}
} else {
// No timeout, wait for the lock indefinitely.
// No timeout, wait indefinitely.
lock.lock().await
};

Expand Down Expand Up @@ -162,8 +173,9 @@ impl ResourceLockManager {
pub async fn lock(
&self,
wait_timeout: Option<Duration>,
try_lock: bool,
) -> Option<ResourceLockGuard<'_>> {
acquire_lock(&self.mgr_lock, wait_timeout).await
acquire_lock(&self.mgr_lock, wait_timeout, try_lock).await
}

/// Get resource subsystem by its id.
Expand Down
38 changes: 33 additions & 5 deletions io-engine/src/grpc/mod.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,24 @@
use futures::channel::oneshot::Receiver;
use nix::errno::Errno;
pub use server::MayastorGrpcServer;
use std::{
error::Error,
fmt::{Debug, Display},
future::Future,
time::Duration,
};

use futures::channel::oneshot::Receiver;
use nix::errno::Errno;
pub use server::MayastorGrpcServer;
use tokio::sync::RwLock;
use tonic::{Request, Response, Status};

use crate::{
bdev_api::BdevError,
core::{CoreError, Reactor, VerboseError},
core::{
CoreError,
Reactor,
ResourceLockGuard,
ResourceSubsystem,
VerboseError,
},
};

impl From<BdevError> for tonic::Status {
Expand Down Expand Up @@ -168,6 +173,29 @@ where
.map_err(|_| Status::resource_exhausted("ENOMEM"))
}

/// Manage locks across multiple grpc services.
pub async fn acquire_subsystem_lock<'a>(
subsystem: &'a ResourceSubsystem,
resource: Option<&str>,
) -> Result<ResourceLockGuard<'a>, Status> {
if let Some(resource) = resource {
match subsystem.lock_resource(resource.to_string(), None, true).await {
Some(lock_guard) => Ok(lock_guard),
None => Err(Status::already_exists(format!(
"Failed to acquire lock for the resource: {resource}, lock already held"
))),
}
} else {
match subsystem.lock(None, true).await {
Some(lock_guard) => Ok(lock_guard),
None => Err(Status::already_exists(format!(
"Failed to acquire subsystem lock: {:?}, lock already held",
subsystem
))),
}
}
}

macro_rules! default_ip {
() => {
"0.0.0.0"
Expand Down
7 changes: 5 additions & 2 deletions io-engine/src/grpc/v0/mayastor_grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ impl MayastorSvc {
match tokio::spawn(async move {
// Grab global operation lock, if requested.
let _global_guard = if global_operation {
match lock_manager.lock(Some(ctx.timeout)).await {
match lock_manager.lock(Some(ctx.timeout), false).await {
Some(g) => Some(g),
None => return Err(Status::deadline_exceeded(
"Failed to acquire access to object within given timeout"
Expand All @@ -169,7 +169,7 @@ impl MayastorSvc {
// Grab per-object lock before executing the future.
let _resource_guard = match lock_manager
.get_subsystem(ProtectedSubsystems::NEXUS)
.lock_resource(nexus_uuid, Some(ctx.timeout))
.lock_resource(nexus_uuid, Some(ctx.timeout), false)
.await {
Some(g) => g,
None => return Err(Status::deadline_exceeded(
Expand Down Expand Up @@ -302,6 +302,9 @@ impl From<LvsError> for tonic::Status {
LvsError::WipeFailed {
source,
} => source.into(),
LvsError::ResourceLockFailed {
..
} => Status::aborted(e.to_string()),
_ => Status::internal(e.verbose()),
}
}
Expand Down
4 changes: 2 additions & 2 deletions io-engine/src/grpc/v1/nexus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ impl NexusService {
match tokio::spawn(async move {
// Grab global operation lock, if requested.
let _global_guard = if global_operation {
match lock_manager.lock(Some(ctx.timeout)).await {
match lock_manager.lock(Some(ctx.timeout), false).await {
Some(g) => Some(g),
None => return Err(Status::deadline_exceeded(
"Failed to acquire access to object within given timeout"
Expand All @@ -95,7 +95,7 @@ impl NexusService {
// Grab per-object lock before executing the future.
let _resource_guard = match lock_manager
.get_subsystem(ProtectedSubsystems::NEXUS)
.lock_resource(nexus_uuid, Some(ctx.timeout))
.lock_resource(nexus_uuid, Some(ctx.timeout), false)
.await {
Some(g) => g,
None => return Err(Status::deadline_exceeded(
Expand Down
29 changes: 27 additions & 2 deletions io-engine/src/grpc/v1/pool.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,16 @@
use crate::{
core::Share,
grpc::{rpc_submit, GrpcClientContext, GrpcResult, RWLock, RWSerializer},
core::{
lock::{ProtectedSubsystems, ResourceLockManager},
Share,
},
grpc::{
acquire_subsystem_lock,
rpc_submit,
GrpcClientContext,
GrpcResult,
RWLock,
RWSerializer,
},
lvs::{Error as LvsError, Lvs},
pool_backend::{PoolArgs, PoolBackend},
};
Expand Down Expand Up @@ -314,6 +324,21 @@ impl PoolRpc for PoolService {
let args = request.into_inner();
info!("{:?}", args);
let rx = rpc_submit::<_, _, LvsError>(async move {
let pool_subsystem = ResourceLockManager::get_instance()
.get_subsystem(ProtectedSubsystems::POOL);
let _lock_guard = acquire_subsystem_lock(
pool_subsystem,
Some(&args.name),
)
.await
.map_err(|_| {
LvsError::ResourceLockFailed {
msg: format!(
"resource {}, for disk pool {:?}",
&args.name, &args.disks,
),
}
})?;
let pool = Lvs::import_from_args(PoolArgs::try_from(args)?)
.await?;
Ok(Pool::from(pool))
Expand Down
38 changes: 36 additions & 2 deletions io-engine/src/grpc/v1/replica.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use crate::{
bdev::PtplFileOps,
bdev_api::BdevError,
core::{
lock::{ProtectedSubsystems, ResourceLockManager},
logical_volume::LogicalVolume,
Bdev,
CloneXattrs,
Expand All @@ -11,7 +12,14 @@ use crate::{
UntypedBdev,
UpdateProps,
},
grpc::{rpc_submit, GrpcClientContext, GrpcResult, RWLock, RWSerializer},
grpc::{
acquire_subsystem_lock,
rpc_submit,
GrpcClientContext,
GrpcResult,
RWLock,
RWSerializer,
},
lvs::{Error as LvsError, Lvol, LvolSpaceUsage, Lvs, LvsLvol, PropValue},
};
use ::function_name::named;
Expand Down Expand Up @@ -219,6 +227,20 @@ impl ReplicaRpc for ReplicaService {
}
}
};
let pool_subsystem = ResourceLockManager::get_instance().get_subsystem(ProtectedSubsystems::POOL);
let _lock_guard = acquire_subsystem_lock(
pool_subsystem, Some(lvs.name())
)
.await
.map_err(|_|
LvsError::ResourceLockFailed {
msg: format!(
"resource {}, for pooluuid {}",
lvs.name(),
args.pooluuid
)
}
)?;
// if pooltype is not Lvs, the provided replica uuid need to be added as
match lvs.create_lvol(&args.name, args.size, Some(&args.uuid), args.thin, args.entity_id).await {
Ok(mut lvol)
Expand Down Expand Up @@ -401,7 +423,19 @@ impl ReplicaRpc for ReplicaService {
match Bdev::lookup_by_uuid_str(&args.uuid) {
Some(bdev) => {
let mut lvol = Lvol::try_from(bdev)?;

let pool_subsystem = ResourceLockManager::get_instance().get_subsystem(ProtectedSubsystems::POOL);
let _lock_guard = acquire_subsystem_lock(
pool_subsystem,
Some(lvol.lvs().name()),
)
.await
.map_err(|_| LvsError::ResourceLockFailed {
msg: format!(
"resource {}, for lvol {:?}",
lvol.lvs().name(),
lvol
),
})?;
// if we are already shared with the same protocol
if lvol.shared()
== Some(Protocol::try_from(args.share)?)
Expand Down
4 changes: 2 additions & 2 deletions io-engine/src/grpc/v1/snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ impl SnapshotService {
match tokio::spawn(async move {
// Grab global operation lock, if requested.
let _global_guard = if global_operation {
match lock_manager.lock(Some(ctx.timeout)).await {
match lock_manager.lock(Some(ctx.timeout), false).await {
Some(g) => Some(g),
None => return Err(Status::deadline_exceeded(
"Failed to acquire access to object within given timeout"
Expand All @@ -255,7 +255,7 @@ impl SnapshotService {
// Grab per-object lock before executing the future.
let _resource_guard = match lock_manager
.get_subsystem(ProtectedSubsystems::NEXUS)
.lock_resource(nexus_uuid, Some(ctx.timeout))
.lock_resource(nexus_uuid, Some(ctx.timeout), false)
.await {
Some(g) => g,
None => return Err(Status::deadline_exceeded(
Expand Down
4 changes: 2 additions & 2 deletions io-engine/src/grpc/v1/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ where
let lock_manager = ResourceLockManager::get_instance();
// For nexus global lock.
let _global_guard =
match lock_manager.lock(Some(ctx.timeout)).await {
match lock_manager.lock(Some(ctx.timeout), false).await {
Some(g) => Some(g),
None => return Err(Status::deadline_exceeded(
"Failed to acquire access to object within given timeout",
Expand Down Expand Up @@ -96,7 +96,7 @@ impl StatsService {
let lock_manager = ResourceLockManager::get_instance();
// For nexus global lock.
let _global_guard =
match lock_manager.lock(Some(ctx.timeout)).await {
match lock_manager.lock(Some(ctx.timeout), false).await {
Some(g) => Some(g),
None => return Err(Status::deadline_exceeded(
"Failed to acquire access to object within given timeout",
Expand Down
7 changes: 7 additions & 0 deletions io-engine/src/lvs/lvs_error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,10 @@ pub enum Error {
WipeFailed {
source: crate::core::wiper::Error,
},
#[snafu(display("Failed to acquire resource lock, {}", msg))]
ResourceLockFailed {
msg: String,
},
}

/// Map CoreError to errno code.
Expand Down Expand Up @@ -265,6 +269,9 @@ impl ToErrno for Error {
Self::WipeFailed {
..
} => Errno::EINVAL,
Self::ResourceLockFailed {
..
} => Errno::EBUSY,
}
}
}
Expand Down
Loading

0 comments on commit 640cfe1

Please sign in to comment.