From 9b90fb0f60ac2763780570cde7dcbe43f34f5c3e Mon Sep 17 00:00:00 2001
From: Hrudaya
Date: Fri, 29 Mar 2024 10:35:45 +0000
Subject: [PATCH] fix(lock): make import pool, create replica and share
 replica operations mutually exclusive

Signed-off-by: Hrudaya
---
 io-engine/src/bin/io-engine.rs         |  4 +-
 io-engine/src/core/lock.rs             | 22 ++++++++---
 io-engine/src/grpc/mod.rs              | 38 +++++++++++++++---
 io-engine/src/grpc/v0/mayastor_grpc.rs |  7 +++-
 io-engine/src/grpc/v1/nexus.rs         |  4 +-
 io-engine/src/grpc/v1/pool.rs          | 29 +++++++++++++-
 io-engine/src/grpc/v1/replica.rs       | 38 +++++++++++++++++-
 io-engine/src/grpc/v1/snapshot.rs      |  4 +-
 io-engine/src/grpc/v1/stats.rs         |  4 +-
 io-engine/src/lvs/lvs_error.rs         |  7 ++++
 io-engine/tests/lock.rs                | 55 ++++++++++++++++++++------
 11 files changed, 177 insertions(+), 35 deletions(-)

diff --git a/io-engine/src/bin/io-engine.rs b/io-engine/src/bin/io-engine.rs
index f1d3457458..e5cad95d15 100644
--- a/io-engine/src/bin/io-engine.rs
+++ b/io-engine/src/bin/io-engine.rs
@@ -100,7 +100,9 @@ fn start_tokio_runtime(args: &MayastorCliArgs) {
 
     // Initialize Lock manager.
     let cfg = ResourceLockManagerConfig::default()
-        .with_subsystem(ProtectedSubsystems::NEXUS, 512);
+        .with_subsystem(ProtectedSubsystems::POOL, 32)
+        .with_subsystem(ProtectedSubsystems::NEXUS, 512)
+        .with_subsystem(ProtectedSubsystems::REPLICA, 1024);
     ResourceLockManager::initialize(cfg);
 
     Mthread::spawn_unaffinitized(move || {
diff --git a/io-engine/src/core/lock.rs b/io-engine/src/core/lock.rs
index d3fef4a7c9..9490ba9a6b 100755
--- a/io-engine/src/core/lock.rs
+++ b/io-engine/src/core/lock.rs
@@ -11,6 +11,8 @@ use once_cell::sync::OnceCell;
 pub struct ProtectedSubsystems;
 impl ProtectedSubsystems {
     pub const NEXUS: &'static str = "nexus";
+    pub const POOL: &'static str = "pool";
+    pub const REPLICA: &'static str = "replica";
 }
 
 /// Configuration parameters for initialization of the Lock manager.
@@ -41,6 +43,7 @@ impl ResourceLockManagerConfig {
 }
 
 /// Resource subsystem that holds locks for all resources within this system.
+#[derive(Debug)]
 pub struct ResourceSubsystem {
     id: String,
     object_locks: Vec<Mutex<ResourceLockInfo>>,
@@ -67,8 +70,9 @@ impl ResourceSubsystem {
     pub async fn lock(
         &self,
         wait_timeout: Option<Duration>,
+        try_lock: bool,
     ) -> Option<ResourceLockGuard<'_>> {
-        acquire_lock(&self.subsystem_lock, wait_timeout).await
+        acquire_lock(&self.subsystem_lock, wait_timeout, try_lock).await
     }
 
     /// Lock subsystem resource by its ID and obtain a lock guard.
@@ -76,13 +80,13 @@
         &self,
         id: T,
         wait_timeout: Option<Duration>,
+        try_lock: bool,
     ) -> Option<ResourceLockGuard<'_>> {
         // Calculate hash of the object to get the mutex index.
         let mut hasher = DefaultHasher::new();
         id.as_ref().hash(&mut hasher);
         let mutex_id = hasher.finish() as usize % self.object_locks.len();
-
-        acquire_lock(&self.object_locks[mutex_id], wait_timeout).await
+        acquire_lock(&self.object_locks[mutex_id], wait_timeout, try_lock).await
     }
 }
 
@@ -122,14 +126,21 @@ static LOCK_MANAGER: OnceCell<ResourceLockManager> = OnceCell::new();
 
 async fn acquire_lock(
     lock: &Mutex<ResourceLockInfo>,
     wait_timeout: Option<Duration>,
+    try_lock: bool,
 ) -> Option<ResourceLockGuard<'_>> {
     let mut lock_guard = if let Some(d) = wait_timeout {
         match tokio::time::timeout(d, lock.lock()).await {
             Err(_) => return None,
             Ok(g) => g,
         }
+    } else if try_lock {
+        // No timeout; try to take the lock immediately.
+        match lock.try_lock() {
+            Some(l) => l,
+            None => return None,
+        }
     } else {
-        // No timeout, wait for the lock indefinitely.
+        // No timeout, wait indefinitely.
         lock.lock().await
     };
 
@@ -162,8 +173,9 @@ impl ResourceLockManager {
     pub async fn lock(
         &self,
         wait_timeout: Option<Duration>,
+        try_lock: bool,
     ) -> Option<ResourceLockGuard<'_>> {
-        acquire_lock(&self.mgr_lock, wait_timeout).await
+        acquire_lock(&self.mgr_lock, wait_timeout, try_lock).await
     }
 
     /// Get resource subsystem by its id.
diff --git a/io-engine/src/grpc/mod.rs b/io-engine/src/grpc/mod.rs
index 6ef7eebece..817643fd5e 100644
--- a/io-engine/src/grpc/mod.rs
+++ b/io-engine/src/grpc/mod.rs
@@ -1,19 +1,24 @@
+use futures::channel::oneshot::Receiver;
+use nix::errno::Errno;
+pub use server::MayastorGrpcServer;
 use std::{
     error::Error,
     fmt::{Debug, Display},
     future::Future,
     time::Duration,
 };
-
-use futures::channel::oneshot::Receiver;
-use nix::errno::Errno;
-pub use server::MayastorGrpcServer;
 use tokio::sync::RwLock;
 use tonic::{Request, Response, Status};
 
 use crate::{
     bdev_api::BdevError,
-    core::{CoreError, Reactor, VerboseError},
+    core::{
+        CoreError,
+        Reactor,
+        ResourceLockGuard,
+        ResourceSubsystem,
+        VerboseError,
+    },
 };
 
 impl From for tonic::Status {
@@ -168,6 +173,29 @@ where
         .map_err(|_| Status::resource_exhausted("ENOMEM"))
 }
 
+/// Acquire a lock shared across gRPC services; fails fast if already held.
+pub async fn acquire_subsystem_lock<'a>(
+    subsystem: &'a ResourceSubsystem,
+    resource: Option<&str>,
+) -> Result<ResourceLockGuard<'a>, Status> {
+    if let Some(resource) = resource {
+        match subsystem.lock_resource(resource.to_string(), None, true).await {
+            Some(lock_guard) => Ok(lock_guard),
+            None => Err(Status::already_exists(format!(
+                "Failed to acquire lock for the resource: {resource}, lock already held"
+            ))),
+        }
+    } else {
+        match subsystem.lock(None, true).await {
+            Some(lock_guard) => Ok(lock_guard),
+            None => Err(Status::already_exists(format!(
+                "Failed to acquire subsystem lock: {:?}, lock already held",
+                subsystem
+            ))),
+        }
+    }
+}
+
 macro_rules! default_ip {
     () => {
         "0.0.0.0"
diff --git a/io-engine/src/grpc/v0/mayastor_grpc.rs b/io-engine/src/grpc/v0/mayastor_grpc.rs
index 5d83d6d337..65039e8b4b 100644
--- a/io-engine/src/grpc/v0/mayastor_grpc.rs
+++ b/io-engine/src/grpc/v0/mayastor_grpc.rs
@@ -155,7 +155,7 @@ impl MayastorSvc {
             match tokio::spawn(async move {
                 // Grab global operation lock, if requested.
                 let _global_guard = if global_operation {
-                    match lock_manager.lock(Some(ctx.timeout)).await {
+                    match lock_manager.lock(Some(ctx.timeout), false).await {
                         Some(g) => Some(g),
                         None => return Err(Status::deadline_exceeded(
                             "Failed to acquire access to object within given timeout"
@@ -169,7 +169,7 @@ impl MayastorSvc {
                 // Grab per-object lock before executing the future.
                 let _resource_guard = match lock_manager
                     .get_subsystem(ProtectedSubsystems::NEXUS)
-                    .lock_resource(nexus_uuid, Some(ctx.timeout))
+                    .lock_resource(nexus_uuid, Some(ctx.timeout), false)
                     .await
                 {
                     Some(g) => g,
                     None => return Err(Status::deadline_exceeded(
@@ -302,6 +302,9 @@ impl From<LvsError> for tonic::Status {
             LvsError::WipeFailed {
                 source,
             } => source.into(),
+            LvsError::ResourceLockFailed {
+                ..
+            } => Status::aborted(e.to_string()),
             _ => Status::internal(e.verbose()),
         }
     }
diff --git a/io-engine/src/grpc/v1/nexus.rs b/io-engine/src/grpc/v1/nexus.rs
index fadb31c7e5..cfeef3867c 100644
--- a/io-engine/src/grpc/v1/nexus.rs
+++ b/io-engine/src/grpc/v1/nexus.rs
@@ -81,7 +81,7 @@ impl NexusService {
         match tokio::spawn(async move {
             // Grab global operation lock, if requested.
             let _global_guard = if global_operation {
-                match lock_manager.lock(Some(ctx.timeout)).await {
+                match lock_manager.lock(Some(ctx.timeout), false).await {
                     Some(g) => Some(g),
                     None => return Err(Status::deadline_exceeded(
                         "Failed to acquire access to object within given timeout"
@@ -95,7 +95,7 @@ impl NexusService {
             // Grab per-object lock before executing the future.
             let _resource_guard = match lock_manager
                 .get_subsystem(ProtectedSubsystems::NEXUS)
-                .lock_resource(nexus_uuid, Some(ctx.timeout))
+                .lock_resource(nexus_uuid, Some(ctx.timeout), false)
                 .await
             {
                 Some(g) => g,
                 None => return Err(Status::deadline_exceeded(
diff --git a/io-engine/src/grpc/v1/pool.rs b/io-engine/src/grpc/v1/pool.rs
index a6ff292c55..8c141ffccf 100644
--- a/io-engine/src/grpc/v1/pool.rs
+++ b/io-engine/src/grpc/v1/pool.rs
@@ -1,6 +1,16 @@
 use crate::{
-    core::Share,
-    grpc::{rpc_submit, GrpcClientContext, GrpcResult, RWLock, RWSerializer},
+    core::{
+        lock::{ProtectedSubsystems, ResourceLockManager},
+        Share,
+    },
+    grpc::{
+        acquire_subsystem_lock,
+        rpc_submit,
+        GrpcClientContext,
+        GrpcResult,
+        RWLock,
+        RWSerializer,
+    },
     lvs::{Error as LvsError, Lvs},
     pool_backend::{PoolArgs, PoolBackend},
 };
@@ -314,6 +324,21 @@ impl PoolRpc for PoolService {
         let args = request.into_inner();
         info!("{:?}", args);
         let rx = rpc_submit::<_, _, LvsError>(async move {
+            let pool_subsystem = ResourceLockManager::get_instance()
+                .get_subsystem(ProtectedSubsystems::POOL);
+            let _lock_guard = acquire_subsystem_lock(
+                pool_subsystem,
+                Some(&args.name),
+            )
+            .await
+            .map_err(|_| {
+                LvsError::ResourceLockFailed {
+                    msg: format!(
+                        "resource {}, for disk pool {:?}",
+                        &args.name, &args.disks,
+                    ),
+                }
+            })?;
             let pool = Lvs::import_from_args(PoolArgs::try_from(args)?)
                 .await?;
             Ok(Pool::from(pool))
diff --git a/io-engine/src/grpc/v1/replica.rs b/io-engine/src/grpc/v1/replica.rs
index 72e6c65252..733f267797 100644
--- a/io-engine/src/grpc/v1/replica.rs
+++ b/io-engine/src/grpc/v1/replica.rs
@@ -2,6 +2,7 @@ use crate::{
     bdev::PtplFileOps,
     bdev_api::BdevError,
     core::{
+        lock::{ProtectedSubsystems, ResourceLockManager},
         logical_volume::LogicalVolume,
         Bdev,
         CloneXattrs,
@@ -11,7 +12,14 @@ use crate::{
         UntypedBdev,
         UpdateProps,
     },
-    grpc::{rpc_submit, GrpcClientContext, GrpcResult, RWLock, RWSerializer},
+    grpc::{
+        acquire_subsystem_lock,
+        rpc_submit,
+        GrpcClientContext,
+        GrpcResult,
+        RWLock,
+        RWSerializer,
+    },
     lvs::{Error as LvsError, Lvol, LvolSpaceUsage, Lvs, LvsLvol, PropValue},
 };
 use ::function_name::named;
@@ -219,6 +227,20 @@ impl ReplicaRpc for ReplicaService {
                     }
                 }
             };
+            let pool_subsystem = ResourceLockManager::get_instance().get_subsystem(ProtectedSubsystems::POOL);
+            let _lock_guard = acquire_subsystem_lock(
+                pool_subsystem, Some(lvs.name())
+            )
+            .await
+            .map_err(|_|
+                LvsError::ResourceLockFailed {
+                    msg: format!(
+                        "resource {}, for pooluuid {}",
+                        lvs.name(),
+                        args.pooluuid
+                    )
+                }
+            )?;
             // if pooltype is not Lvs, the provided replica uuid need to be added as
             match lvs.create_lvol(&args.name, args.size, Some(&args.uuid), args.thin, args.entity_id).await {
                 Ok(mut lvol)
@@ -401,7 +423,19 @@ impl ReplicaRpc for ReplicaService {
                 match Bdev::lookup_by_uuid_str(&args.uuid) {
                     Some(bdev) => {
                         let mut lvol = Lvol::try_from(bdev)?;
-
+                        let pool_subsystem = ResourceLockManager::get_instance().get_subsystem(ProtectedSubsystems::POOL);
+                        let _lock_guard = acquire_subsystem_lock(
+                            pool_subsystem,
+                            Some(lvol.lvs().name()),
+                        )
+                        .await
+                        .map_err(|_| LvsError::ResourceLockFailed {
+                            msg: format!(
+                                "resource {}, for lvol {:?}",
+                                lvol.lvs().name(),
+                                lvol
+                            ),
+                        })?;
                         // if we are already shared with the same protocol
                         if lvol.shared()
                             == Some(Protocol::try_from(args.share)?)
diff --git a/io-engine/src/grpc/v1/snapshot.rs b/io-engine/src/grpc/v1/snapshot.rs
index 33fadc31ad..5be81a48c1 100644
--- a/io-engine/src/grpc/v1/snapshot.rs
+++ b/io-engine/src/grpc/v1/snapshot.rs
@@ -241,7 +241,7 @@ impl SnapshotService {
         match tokio::spawn(async move {
             // Grab global operation lock, if requested.
             let _global_guard = if global_operation {
-                match lock_manager.lock(Some(ctx.timeout)).await {
+                match lock_manager.lock(Some(ctx.timeout), false).await {
                     Some(g) => Some(g),
                     None => return Err(Status::deadline_exceeded(
                         "Failed to acquire access to object within given timeout"
@@ -255,7 +255,7 @@ impl SnapshotService {
             // Grab per-object lock before executing the future.
             let _resource_guard = match lock_manager
                 .get_subsystem(ProtectedSubsystems::NEXUS)
-                .lock_resource(nexus_uuid, Some(ctx.timeout))
+                .lock_resource(nexus_uuid, Some(ctx.timeout), false)
                 .await
             {
                 Some(g) => g,
                 None => return Err(Status::deadline_exceeded(
diff --git a/io-engine/src/grpc/v1/stats.rs b/io-engine/src/grpc/v1/stats.rs
index 6261474727..db46d12aba 100644
--- a/io-engine/src/grpc/v1/stats.rs
+++ b/io-engine/src/grpc/v1/stats.rs
@@ -50,7 +50,7 @@ where
         let lock_manager = ResourceLockManager::get_instance();
         // For nexus global lock.
         let _global_guard =
-            match lock_manager.lock(Some(ctx.timeout)).await {
+            match lock_manager.lock(Some(ctx.timeout), false).await {
                 Some(g) => Some(g),
                 None => return Err(Status::deadline_exceeded(
                     "Failed to acquire access to object within given timeout",
@@ -96,7 +96,7 @@ impl StatsService {
         let lock_manager = ResourceLockManager::get_instance();
         // For nexus global lock.
         let _global_guard =
-            match lock_manager.lock(Some(ctx.timeout)).await {
+            match lock_manager.lock(Some(ctx.timeout), false).await {
                 Some(g) => Some(g),
                 None => return Err(Status::deadline_exceeded(
                     "Failed to acquire access to object within given timeout",
diff --git a/io-engine/src/lvs/lvs_error.rs b/io-engine/src/lvs/lvs_error.rs
index dc1ab9964e..0dca264c73 100644
--- a/io-engine/src/lvs/lvs_error.rs
+++ b/io-engine/src/lvs/lvs_error.rs
@@ -178,6 +178,10 @@ pub enum Error {
     WipeFailed {
         source: crate::core::wiper::Error,
     },
+    #[snafu(display("Failed to acquire resource lock, {}", msg))]
+    ResourceLockFailed {
+        msg: String,
+    },
 }
 
 /// Map CoreError to errno code.
@@ -265,6 +269,9 @@ impl ToErrno for Error {
             Self::WipeFailed {
                 ..
            } => Errno::EINVAL,
+            Self::ResourceLockFailed {
+                ..
+            } => Errno::EBUSY,
         }
     }
 }
diff --git a/io-engine/tests/lock.rs b/io-engine/tests/lock.rs
index 0ce44dc333..b07d1ddb0b 100755
--- a/io-engine/tests/lock.rs
+++ b/io-engine/tests/lock.rs
@@ -38,19 +38,41 @@ async fn test_lock_level(level: LockLevel) {
         // Step 1: acquire lock.
         let guard = match level {
-            LockLevel::Global => lock_mgr.lock(None).await,
+            LockLevel::Global => lock_mgr.lock(None, false).await,
             LockLevel::Subsystem => {
-                lock_mgr.get_subsystem(TEST_SUBSYSTEM).lock(None).await
+                lock_mgr
+                    .get_subsystem(TEST_SUBSYSTEM)
+                    .lock(None, false)
+                    .await
             }
             LockLevel::Resource => {
                 lock_mgr
                     .get_subsystem(TEST_SUBSYSTEM)
-                    .lock_resource(TEST_RESOURCE, None)
+                    .lock_resource(TEST_RESOURCE, None, false)
                     .await
             }
         };
 
         assert!(guard.is_some(), "Failed to acquire the lock");
+        if guard.is_some() {
+            let try_lock_guard = match level {
+                LockLevel::Global => lock_mgr.lock(None, true).await,
+                LockLevel::Subsystem => {
+                    lock_mgr
+                        .get_subsystem(TEST_SUBSYSTEM)
+                        .lock(None, true)
+                        .await
+                }
+                LockLevel::Resource => {
+                    lock_mgr
+                        .get_subsystem(TEST_SUBSYSTEM)
+                        .lock_resource(TEST_RESOURCE, None, true)
+                        .await
+                }
+            };
+            assert!(try_lock_guard.is_none(), "Acquired the lock twice");
+        }
+
         // Notify that the lock is acquired.
         tx.send(0).expect("Failed to notify the peer ");
 
@@ -81,14 +103,17 @@ async fn test_lock_level(level: LockLevel) {
         // Try to grab the lock - we must wait, since the lock is already
         // acquired.
         let guard = match level {
-            LockLevel::Global => lock_mgr.lock(None).await,
+            LockLevel::Global => lock_mgr.lock(None, false).await,
             LockLevel::Subsystem => {
-                lock_mgr.get_subsystem(TEST_SUBSYSTEM).lock(None).await
+                lock_mgr
+                    .get_subsystem(TEST_SUBSYSTEM)
+                    .lock(None, false)
+                    .await
             }
             LockLevel::Resource => {
                 lock_mgr
                     .get_subsystem(TEST_SUBSYSTEM)
-                    .lock_resource(TEST_RESOURCE, None)
+                    .lock_resource(TEST_RESOURCE, None, false)
                     .await
             }
         };
@@ -113,14 +138,17 @@ async fn test_lock_timed_level(level: LockLevel) {
 
         // Step 1: acquire lock.
         let guard = match level {
-            LockLevel::Global => lock_mgr.lock(None).await,
+            LockLevel::Global => lock_mgr.lock(None, false).await,
             LockLevel::Subsystem => {
-                lock_mgr.get_subsystem(TEST_SUBSYSTEM).lock(None).await
+                lock_mgr
+                    .get_subsystem(TEST_SUBSYSTEM)
+                    .lock(None, false)
+                    .await
             }
             LockLevel::Resource => {
                 lock_mgr
                     .get_subsystem(TEST_SUBSYSTEM)
-                    .lock_resource(TEST_RESOURCE, None)
+                    .lock_resource(TEST_RESOURCE, None, false)
                     .await
             }
         };
@@ -147,14 +175,17 @@ async fn test_lock_timed_level(level: LockLevel) {
         // Try to grab the lock - we must wait, since the lock is already
         // acquired.
         let guard = match level {
-            LockLevel::Global => lock_mgr.lock(duration).await,
+            LockLevel::Global => lock_mgr.lock(duration, false).await,
             LockLevel::Subsystem => {
-                lock_mgr.get_subsystem(TEST_SUBSYSTEM).lock(duration).await
+                lock_mgr
+                    .get_subsystem(TEST_SUBSYSTEM)
+                    .lock(duration, false)
+                    .await
             }
             LockLevel::Resource => {
                 lock_mgr
                     .get_subsystem(TEST_SUBSYSTEM)
-                    .lock_resource(TEST_RESOURCE, duration)
+                    .lock_resource(TEST_RESOURCE, duration, false)
                     .await
             }
         };
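
Note (illustrative sketch, not part of the patch): the fail-fast behaviour this
change adds to acquire_lock() can be shown standalone. The snippet below is a
minimal sketch assuming tokio's Mutex and a hypothetical helper named
`acquire`; the engine's real acquire_lock() wraps its own lock-info type and,
for per-resource locks, hashes the resource ID onto a per-subsystem lock table,
but the control flow is the same: bounded wait, fail-fast try-lock, or
indefinite wait.

use std::time::Duration;
use tokio::sync::{Mutex, MutexGuard};

// Hypothetical stand-in for acquire_lock() in core/lock.rs.
async fn acquire<'a>(
    lock: &'a Mutex<()>,
    wait_timeout: Option<Duration>,
    try_lock: bool,
) -> Option<MutexGuard<'a, ()>> {
    if let Some(d) = wait_timeout {
        // Bounded wait: give up if the lock is not free within `d`.
        tokio::time::timeout(d, lock.lock()).await.ok()
    } else if try_lock {
        // Fail-fast path now used by import pool, create replica and
        // share replica: bail out at once if a peer operation holds it.
        lock.try_lock().ok()
    } else {
        // Pre-existing behaviour: wait indefinitely.
        Some(lock.lock().await)
    }
}

#[tokio::main]
async fn main() {
    let pool_lock = Mutex::new(());

    // First caller, e.g. an in-flight pool import, takes the lock.
    let _guard = acquire(&pool_lock, None, false).await.expect("lock is free");

    // A concurrent create-replica or share-replica call on the same pool
    // now fails fast instead of queueing; the gRPC layer surfaces this as
    // LvsError::ResourceLockFailed (EBUSY / Status::aborted).
    assert!(acquire(&pool_lock, None, true).await.is_none());
}

In the patch itself these semantics are reached through
acquire_subsystem_lock(pool_subsystem, Some(pool_name)): the pool name is
hashed onto one of the POOL subsystem's 32 lock slots, so any two of the three
operations targeting the same pool contend on the same lock and the loser
returns immediately instead of blocking.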