diff --git a/io-engine/src/bin/io-engine.rs b/io-engine/src/bin/io-engine.rs index f1d3457458..e5cad95d15 100644 --- a/io-engine/src/bin/io-engine.rs +++ b/io-engine/src/bin/io-engine.rs @@ -100,7 +100,9 @@ fn start_tokio_runtime(args: &MayastorCliArgs) { // Initialize Lock manager. let cfg = ResourceLockManagerConfig::default() - .with_subsystem(ProtectedSubsystems::NEXUS, 512); + .with_subsystem(ProtectedSubsystems::POOL, 32) + .with_subsystem(ProtectedSubsystems::NEXUS, 512) + .with_subsystem(ProtectedSubsystems::REPLICA, 1024); ResourceLockManager::initialize(cfg); Mthread::spawn_unaffinitized(move || { diff --git a/io-engine/src/core/lock.rs b/io-engine/src/core/lock.rs index d3fef4a7c9..e138d5602c 100755 --- a/io-engine/src/core/lock.rs +++ b/io-engine/src/core/lock.rs @@ -11,6 +11,8 @@ use once_cell::sync::OnceCell; pub struct ProtectedSubsystems; impl ProtectedSubsystems { pub const NEXUS: &'static str = "nexus"; + pub const POOL: &'static str = "pool"; + pub const REPLICA: &'static str = "replica"; } /// Configuration parameters for initialization of the Lock manager. @@ -41,6 +43,7 @@ impl ResourceLockManagerConfig { } /// Resource subsystem that holds locks for all resources withing this system. +#[derive(Debug)] pub struct ResourceSubsystem { id: String, object_locks: Vec>, @@ -81,7 +84,6 @@ impl ResourceSubsystem { let mut hasher = DefaultHasher::new(); id.as_ref().hash(&mut hasher); let mutex_id = hasher.finish() as usize % self.object_locks.len(); - acquire_lock(&self.object_locks[mutex_id], wait_timeout).await } } @@ -129,8 +131,11 @@ async fn acquire_lock( Ok(g) => g, } } else { - // No timeout, wait for the lock indefinitely. - lock.lock().await + // No timeout; try to take the lock immediately.
+ match lock.try_lock() { + Some(l) => l, + None => return None, + } }; lock_guard.num_acquires += 1; diff --git a/io-engine/src/grpc/mod.rs b/io-engine/src/grpc/mod.rs index 6ef7eebece..46f911c3bc 100644 --- a/io-engine/src/grpc/mod.rs +++ b/io-engine/src/grpc/mod.rs @@ -1,19 +1,24 @@ +use futures::channel::oneshot::Receiver; +use nix::errno::Errno; +pub use server::MayastorGrpcServer; use std::{ error::Error, fmt::{Debug, Display}, future::Future, time::Duration, }; - -use futures::channel::oneshot::Receiver; -use nix::errno::Errno; -pub use server::MayastorGrpcServer; use tokio::sync::RwLock; use tonic::{Request, Response, Status}; use crate::{ bdev_api::BdevError, - core::{CoreError, Reactor, VerboseError}, + core::{ + CoreError, + Reactor, + ResourceLockGuard, + ResourceSubsystem, + VerboseError, + }, }; impl From for tonic::Status { @@ -168,6 +173,29 @@ where .map_err(|_| Status::resource_exhausted("ENOMEM")) } +/// Manage locks across multiple grpc services. +pub async fn acquire_subsystem_lock<'a>( + subsystem: &'a ResourceSubsystem, + resource: Option<&str>, +) -> Result, Status> { + if let Some(resource) = resource { + match subsystem.lock_resource(resource.to_string(), None).await { + Some(lock_guard) => Ok(lock_guard), + None => Err(Status::already_exists(format!( + "Failed to acquire lock for the resource: {resource}, lock already held" + ))), + } + } else { + match subsystem.lock(None).await { + Some(lock_guard) => Ok(lock_guard), + None => Err(Status::already_exists(format!( + "Failed to acquire subsystem lock: {:?}, lock already held", + subsystem + ))), + } + } +} + macro_rules! 
default_ip { () => { "0.0.0.0" diff --git a/io-engine/src/grpc/v0/mayastor_grpc.rs b/io-engine/src/grpc/v0/mayastor_grpc.rs index 5d83d6d337..00f4eadbc7 100644 --- a/io-engine/src/grpc/v0/mayastor_grpc.rs +++ b/io-engine/src/grpc/v0/mayastor_grpc.rs @@ -302,6 +302,9 @@ impl From for tonic::Status { LvsError::WipeFailed { source, } => source.into(), + LvsError::ResourceLockFailed { + .. + } => Status::aborted(e.to_string()), _ => Status::internal(e.verbose()), } } diff --git a/io-engine/src/grpc/v1/pool.rs b/io-engine/src/grpc/v1/pool.rs index a6ff292c55..8c141ffccf 100644 --- a/io-engine/src/grpc/v1/pool.rs +++ b/io-engine/src/grpc/v1/pool.rs @@ -1,6 +1,16 @@ use crate::{ - core::Share, - grpc::{rpc_submit, GrpcClientContext, GrpcResult, RWLock, RWSerializer}, + core::{ + lock::{ProtectedSubsystems, ResourceLockManager}, + Share, + }, + grpc::{ + acquire_subsystem_lock, + rpc_submit, + GrpcClientContext, + GrpcResult, + RWLock, + RWSerializer, + }, lvs::{Error as LvsError, Lvs}, pool_backend::{PoolArgs, PoolBackend}, }; @@ -314,6 +324,21 @@ impl PoolRpc for PoolService { let args = request.into_inner(); info!("{:?}", args); let rx = rpc_submit::<_, _, LvsError>(async move { + let pool_subsystem = ResourceLockManager::get_instance() + .get_subsystem(ProtectedSubsystems::POOL); + let _lock_guard = acquire_subsystem_lock( + pool_subsystem, + Some(&args.name), + ) + .await + .map_err(|_| { + LvsError::ResourceLockFailed { + msg: format!( + "resource {}, for disk pool {:?}", + &args.name, &args.disks, + ), + } + })?; let pool = Lvs::import_from_args(PoolArgs::try_from(args)?) 
.await?; Ok(Pool::from(pool)) diff --git a/io-engine/src/grpc/v1/replica.rs b/io-engine/src/grpc/v1/replica.rs index 72e6c65252..733f267797 100644 --- a/io-engine/src/grpc/v1/replica.rs +++ b/io-engine/src/grpc/v1/replica.rs @@ -2,6 +2,7 @@ use crate::{ bdev::PtplFileOps, bdev_api::BdevError, core::{ + lock::{ProtectedSubsystems, ResourceLockManager}, logical_volume::LogicalVolume, Bdev, CloneXattrs, @@ -11,7 +12,14 @@ use crate::{ UntypedBdev, UpdateProps, }, - grpc::{rpc_submit, GrpcClientContext, GrpcResult, RWLock, RWSerializer}, + grpc::{ + acquire_subsystem_lock, + rpc_submit, + GrpcClientContext, + GrpcResult, + RWLock, + RWSerializer, + }, lvs::{Error as LvsError, Lvol, LvolSpaceUsage, Lvs, LvsLvol, PropValue}, }; use ::function_name::named; @@ -219,6 +227,20 @@ impl ReplicaRpc for ReplicaService { } } }; + let pool_subsystem = ResourceLockManager::get_instance().get_subsystem(ProtectedSubsystems::POOL); + let _lock_guard = acquire_subsystem_lock( + pool_subsystem, Some(lvs.name()) + ) + .await + .map_err(|_| + LvsError::ResourceLockFailed { + msg: format!( + "resource {}, for pooluuid {}", + lvs.name(), + args.pooluuid + ) + } + )?; // if pooltype is not Lvs, the provided replica uuid need to be added as match lvs.create_lvol(&args.name, args.size, Some(&args.uuid), args.thin, args.entity_id).await { Ok(mut lvol) @@ -401,7 +423,19 @@ impl ReplicaRpc for ReplicaService { match Bdev::lookup_by_uuid_str(&args.uuid) { Some(bdev) => { let mut lvol = Lvol::try_from(bdev)?; - + let pool_subsystem = ResourceLockManager::get_instance().get_subsystem(ProtectedSubsystems::POOL); + let _lock_guard = acquire_subsystem_lock( + pool_subsystem, + Some(lvol.lvs().name()), + ) + .await + .map_err(|_| LvsError::ResourceLockFailed { + msg: format!( + "resource {}, for lvol {:?}", + lvol.lvs().name(), + lvol + ), + })?; // if we are already shared with the same protocol if lvol.shared() == Some(Protocol::try_from(args.share)?) 
diff --git a/io-engine/src/lvs/lvs_error.rs b/io-engine/src/lvs/lvs_error.rs index dc1ab9964e..0dca264c73 100644 --- a/io-engine/src/lvs/lvs_error.rs +++ b/io-engine/src/lvs/lvs_error.rs @@ -178,6 +178,10 @@ pub enum Error { WipeFailed { source: crate::core::wiper::Error, }, + #[snafu(display("Failed to acquire resource lock, {}", msg))] + ResourceLockFailed { + msg: String, + }, } /// Map CoreError to errno code. @@ -265,6 +269,9 @@ impl ToErrno for Error { Self::WipeFailed { .. } => Errno::EINVAL, + Self::ResourceLockFailed { + .. + } => Errno::EBUSY, } } }