Skip to content

Commit

Permalink
fix(lock): make import pool, create replica and share replica operati…
Browse files Browse the repository at this point in the history
…ons mutually exclusive

Signed-off-by: Hrudaya <hrudayaranjan.sahoo@datacore.com>
  • Loading branch information
hrudaya21 committed Apr 5, 2024
1 parent 6dd25e6 commit 0af3c40
Show file tree
Hide file tree
Showing 7 changed files with 120 additions and 13 deletions.
4 changes: 3 additions & 1 deletion io-engine/src/bin/io-engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,9 @@ fn start_tokio_runtime(args: &MayastorCliArgs) {

// Initialize Lock manager.
let cfg = ResourceLockManagerConfig::default()
.with_subsystem(ProtectedSubsystems::NEXUS, 512);
.with_subsystem(ProtectedSubsystems::POOL, 32)
.with_subsystem(ProtectedSubsystems::NEXUS, 512)
.with_subsystem(ProtectedSubsystems::REPLICA, 1024);
ResourceLockManager::initialize(cfg);

Mthread::spawn_unaffinitized(move || {
Expand Down
10 changes: 7 additions & 3 deletions io-engine/src/core/lock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ use once_cell::sync::OnceCell;
pub struct ProtectedSubsystems;
impl ProtectedSubsystems {
pub const NEXUS: &'static str = "nexus";
pub const POOL: &'static str = "pool";
pub const REPLICA: &'static str = "replica";
}

/// Configuration parameters for initialization of the Lock manager.
Expand Down Expand Up @@ -81,7 +83,6 @@ impl ResourceSubsystem {
let mut hasher = DefaultHasher::new();
id.as_ref().hash(&mut hasher);
let mutex_id = hasher.finish() as usize % self.object_locks.len();

acquire_lock(&self.object_locks[mutex_id], wait_timeout).await
}
}
Expand Down Expand Up @@ -129,8 +130,11 @@ async fn acquire_lock(
Ok(g) => g,
}
} else {
// No timeout, wait for the lock indefinitely.
lock.lock().await
// No timeout, try for taking lock immediately.
match lock.try_lock() {
Some(l) => l,
None => return None,
}
};

lock_guard.num_acquires += 1;
Expand Down
42 changes: 37 additions & 5 deletions io-engine/src/grpc/mod.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,24 @@
use futures::channel::oneshot::Receiver;
use nix::errno::Errno;
pub use server::MayastorGrpcServer;
use std::{
error::Error,
fmt::{Debug, Display},
future::Future,
time::Duration,
};

use futures::channel::oneshot::Receiver;
use nix::errno::Errno;
pub use server::MayastorGrpcServer;
use tokio::sync::RwLock;
use tonic::{Request, Response, Status};

use crate::{
bdev_api::BdevError,
core::{CoreError, Reactor, VerboseError},
core::{
CoreError,
Reactor,
ResourceLockGuard,
ResourceSubsystem,
VerboseError,
},
};

impl From<BdevError> for tonic::Status {
Expand Down Expand Up @@ -168,6 +173,33 @@ where
.map_err(|_| Status::resource_exhausted("ENOMEM"))
}

/// Manage locks accross multiple grpc services.
pub async fn acquire_subsystem_lock(
subsystem: &ResourceSubsystem,
resource: Option<String>,
) -> Result<ResourceLockGuard, Status> {
let lock_guard = if let Some(resource) = resource {
match subsystem.lock_resource(resource, None).await {
Some(l) => l,
None => return Err(Status::already_exists(
"Failed to acquire lock for the resource, lock already held"
.to_string(),
)),
}
} else {
match subsystem.lock(None).await {
Some(l) => l,
None => {
return Err(Status::already_exists(
"Failed to acquire subsystem lock, lock already held"
.to_string(),
))
}
}
};
Ok(lock_guard)
}

macro_rules! default_ip {
() => {
"0.0.0.0"
Expand Down
3 changes: 3 additions & 0 deletions io-engine/src/grpc/v0/mayastor_grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,9 @@ impl From<LvsError> for tonic::Status {
LvsError::WipeFailed {
source,
} => source.into(),
LvsError::ResourceLockFailed {
..
} => Status::aborted(e.to_string()),
_ => Status::internal(e.verbose()),
}
}
Expand Down
29 changes: 27 additions & 2 deletions io-engine/src/grpc/v1/pool.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,16 @@
use crate::{
core::Share,
grpc::{rpc_submit, GrpcClientContext, GrpcResult, RWLock, RWSerializer},
core::{
lock::{ProtectedSubsystems, ResourceLockManager},
Share,
},
grpc::{
acquire_subsystem_lock,
rpc_submit,
GrpcClientContext,
GrpcResult,
RWLock,
RWSerializer,
},
lvs::{Error as LvsError, Lvs},
pool_backend::{PoolArgs, PoolBackend},
};
Expand Down Expand Up @@ -314,6 +324,21 @@ impl PoolRpc for PoolService {
let args = request.into_inner();
info!("{:?}", args);
let rx = rpc_submit::<_, _, LvsError>(async move {
let pool_subsystem = ResourceLockManager::get_instance()
.get_subsystem(ProtectedSubsystems::POOL);
let _lock_guard = acquire_subsystem_lock(
pool_subsystem,
Some(args.name.clone()),
)
.await
.map_err(|_| {
LvsError::ResourceLockFailed {
msg: format!(
"resource {}, for disk pool {:?}",
&args.name, &args.disks,
),
}
})?;
let pool = Lvs::import_from_args(PoolArgs::try_from(args)?)
.await?;
Ok(Pool::from(pool))
Expand Down
38 changes: 36 additions & 2 deletions io-engine/src/grpc/v1/replica.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use crate::{
bdev::PtplFileOps,
bdev_api::BdevError,
core::{
lock::{ProtectedSubsystems, ResourceLockManager},
logical_volume::LogicalVolume,
Bdev,
CloneXattrs,
Expand All @@ -11,7 +12,14 @@ use crate::{
UntypedBdev,
UpdateProps,
},
grpc::{rpc_submit, GrpcClientContext, GrpcResult, RWLock, RWSerializer},
grpc::{
acquire_subsystem_lock,
rpc_submit,
GrpcClientContext,
GrpcResult,
RWLock,
RWSerializer,
},
lvs::{Error as LvsError, Lvol, LvolSpaceUsage, Lvs, LvsLvol, PropValue},
};
use ::function_name::named;
Expand Down Expand Up @@ -219,6 +227,20 @@ impl ReplicaRpc for ReplicaService {
}
}
};
let pool_subsystem = ResourceLockManager::get_instance().get_subsystem(ProtectedSubsystems::POOL);
let _lock_guard = acquire_subsystem_lock(
pool_subsystem, Some(lvs.name().to_string())
)
.await
.map_err(|_|
LvsError::ResourceLockFailed {
msg: format!(
"resource {}, for pooluuid {}",
lvs.name(),
args.pooluuid
)
}
)?;
// if pooltype is not Lvs, the provided replica uuid need to be added as
match lvs.create_lvol(&args.name, args.size, Some(&args.uuid), args.thin, args.entity_id).await {
Ok(mut lvol)
Expand Down Expand Up @@ -401,7 +423,19 @@ impl ReplicaRpc for ReplicaService {
match Bdev::lookup_by_uuid_str(&args.uuid) {
Some(bdev) => {
let mut lvol = Lvol::try_from(bdev)?;

let pool_subsystem = ResourceLockManager::get_instance().get_subsystem(ProtectedSubsystems::POOL);
let _lock_guard = acquire_subsystem_lock(
pool_subsystem,
Some(lvol.lvs().name().to_string()),
)
.await
.map_err(|_| LvsError::ResourceLockFailed {
msg: format!(
"resource {}, for lvol {:?}",
lvol.lvs().name(),
lvol
),
})?;
// if we are already shared with the same protocol
if lvol.shared()
== Some(Protocol::try_from(args.share)?)
Expand Down
7 changes: 7 additions & 0 deletions io-engine/src/lvs/lvs_error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,10 @@ pub enum Error {
WipeFailed {
source: crate::core::wiper::Error,
},
#[snafu(display("Failed to acquire resource lock, {}", msg))]
ResourceLockFailed {
msg: String,
},
}

/// Map CoreError to errno code.
Expand Down Expand Up @@ -265,6 +269,9 @@ impl ToErrno for Error {
Self::WipeFailed {
..
} => Errno::EINVAL,
Self::ResourceLockFailed {
..
} => Errno::EBUSY,
}
}
}
Expand Down

0 comments on commit 0af3c40

Please sign in to comment.