Skip to content

Commit

Permalink
fix(grpc/snapshot): snapshot grpc should use replica svc
Browse files Browse the repository at this point in the history
We should not allow concurrent snapshot and replica operations as they might clash.
todo: allow per-resource locking
Furthermore it was observed that holding a bdev ptr whilst it's being deleted might
cause a crash, we need to determine if this was to do with it being held over an
async point, or a more general problem.

Signed-off-by: Tiago Castro <tiagolobocastro@gmail.com>
  • Loading branch information
tiagolobocastro committed Nov 29, 2023
1 parent 93bbeea commit a7cf92a
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 44 deletions.
8 changes: 6 additions & 2 deletions io-engine/src/grpc/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,10 +93,14 @@ impl MayastorGrpcServer {
v1::replica::ReplicaRpcServer::new(replica_v1.clone())
}))
.add_optional_service(enable_v1.map(|_| {
v1::test::TestRpcServer::new(TestService::new(replica_v1))
v1::test::TestRpcServer::new(TestService::new(
replica_v1.clone(),
))
}))
.add_optional_service(enable_v1.map(|_| {
v1::snapshot::SnapshotRpcServer::new(SnapshotService::new())
v1::snapshot::SnapshotRpcServer::new(SnapshotService::new(
replica_v1,
))
}))
.add_optional_service(enable_v1.map(|_| {
v1::host::HostRpcServer::new(HostService::new(
Expand Down
53 changes: 11 additions & 42 deletions io-engine/src/grpc/v1/snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use crate::{
v1::nexus::nexus_lookup,
GrpcClientContext,
GrpcResult,
Serializer,
RWSerializer,
},
lvs::{Error as LvsError, Lvol, Lvs, LvsLvol},
spdk_rs::ffihelper::IntoCString,
Expand All @@ -45,7 +45,7 @@ const SNAPSHOT_READY_AS_SOURCE: bool = false;
#[allow(dead_code)]
pub struct SnapshotService {
name: String,
client_context: tokio::sync::Mutex<Option<GrpcClientContext>>,
replica_svc: super::replica::ReplicaService,
}

#[derive(Debug)]
Expand Down Expand Up @@ -201,55 +201,24 @@ impl From<VolumeSnapshotDescriptor> for SnapshotInfo {
}
}
#[async_trait::async_trait]
impl<F, T> Serializer<F, T> for SnapshotService
impl<F, T> RWSerializer<F, T> for SnapshotService
where
T: Send + 'static,
F: core::future::Future<Output = Result<T, Status>> + Send + 'static,
{
async fn locked(&self, ctx: GrpcClientContext, f: F) -> Result<T, Status> {
let mut context_guard = self.client_context.lock().await;

// Store context as a marker of to detect abnormal termination of the
// request. Even though AssertUnwindSafe() allows us to
// intercept asserts in underlying method strategies, such a
// situation can still happen when the high-level future that
// represents gRPC call at the highest level (i.e. the one created
// by gRPC server) gets cancelled (due to timeout or somehow else).
// This can't be properly intercepted by 'locked' function itself in the
// first place, so the state needs to be cleaned up properly
// upon subsequent gRPC calls.
if let Some(c) = context_guard.replace(ctx) {
warn!("{}: gRPC method timed out, args: {}", c.id, c.args);
}

let fut = AssertUnwindSafe(f).catch_unwind();
let r = fut.await;

// Request completed, remove the marker.
let ctx = context_guard.take().expect("gRPC context disappeared");

match r {
Ok(r) => r,
Err(_e) => {
warn!("{}: gRPC method panicked, args: {}", ctx.id, ctx.args);
Err(Status::cancelled(format!(
"{}: gRPC method panicked",
ctx.id
)))
}
}
self.replica_svc.locked(ctx, f).await
}
}
impl Default for SnapshotService {
fn default() -> Self {
Self::new()
async fn shared(&self, ctx: GrpcClientContext, f: F) -> Result<T, Status> {
self.replica_svc.shared(ctx, f).await
}
}

impl SnapshotService {
pub fn new() -> Self {
pub fn new(replica_svc: super::replica::ReplicaService) -> Self {
Self {
name: String::from("SnapshotSvc"),
client_context: tokio::sync::Mutex::new(None),
replica_svc,
}
}
async fn serialized<T, F>(
Expand Down Expand Up @@ -490,7 +459,7 @@ impl SnapshotRpc for SnapshotService {
&self,
request: Request<ListSnapshotsRequest>,
) -> GrpcResult<ListSnapshotsResponse> {
self.locked(
self.shared(
GrpcClientContext::new(&request, function_name!()),
async move {
let args = request.into_inner();
Expand Down Expand Up @@ -739,7 +708,7 @@ impl SnapshotRpc for SnapshotService {
&self,
request: Request<ListSnapshotCloneRequest>,
) -> GrpcResult<ListSnapshotCloneResponse> {
self.locked(
self.shared(
GrpcClientContext::new(&request, function_name!()),
async move {
let args = request.into_inner();
Expand Down

0 comments on commit a7cf92a

Please sign in to comment.