From 59331c100c3f833476a8529b53f131808566d5a3 Mon Sep 17 00:00:00 2001 From: Abhinandan Purkait Date: Sat, 23 Nov 2024 18:01:04 +0000 Subject: [PATCH] feat(csi node): add mount utils that frontends and backends the sys mount calls by spwaning process Signed-off-by: Abhinandan Purkait --- .../src/bin/core/tests/controller/mod.rs | 3 +- .../csi-driver/src/bin/controller/server.rs | 75 +------- .../csi-driver/src/bin/node/block_vol.rs | 6 +- .../csi-driver/src/bin/node/dev/nvmf.rs | 35 ++-- .../csi-driver/src/bin/node/error.rs | 60 +++++- .../csi-driver/src/bin/node/filesystem_ops.rs | 18 +- .../csi-driver/src/bin/node/filesystem_vol.rs | 26 +-- control-plane/csi-driver/src/bin/node/main.rs | 6 +- .../csi-driver/src/bin/node/main_.rs | 172 ++++++++++------- .../csi-driver/src/bin/node/mount.rs | 180 +++++++++++------- .../src/bin/node/mount_utils/bin/flag.rs | 122 ++++++++++++ .../src/bin/node/mount_utils/bin/mod.rs | 4 + .../bin/node/mount_utils/bin/mount_utils.rs | 50 +++++ .../src/bin/node/mount_utils/mod.rs | 150 +++++++++++++++ control-plane/csi-driver/src/bin/node/node.rs | 6 +- .../src/bin/node/nodeplugin_grpc.rs | 1 + tests/bdd/features/csi/node/node.feature | 8 + 17 files changed, 669 insertions(+), 253 deletions(-) create mode 100644 control-plane/csi-driver/src/bin/node/mount_utils/bin/flag.rs create mode 100644 control-plane/csi-driver/src/bin/node/mount_utils/bin/mod.rs create mode 100644 control-plane/csi-driver/src/bin/node/mount_utils/bin/mount_utils.rs create mode 100644 control-plane/csi-driver/src/bin/node/mount_utils/mod.rs diff --git a/control-plane/agents/src/bin/core/tests/controller/mod.rs b/control-plane/agents/src/bin/core/tests/controller/mod.rs index 9c825aec0..9ba199e72 100644 --- a/control-plane/agents/src/bin/core/tests/controller/mod.rs +++ b/control-plane/agents/src/bin/core/tests/controller/mod.rs @@ -1,4 +1,5 @@ use deployer_cluster::{etcd_client::Client, *}; +use stor_port::transport_api::TimeoutOptions; use stor_port::{ pstor::{etcd::Etcd, key_prefix_obj, ApiVersion, StorableObjectType, StoreKv, StoreObj}, types::v0::{ @@ -290,7 +291,7 @@ async fn etcd_pagination() { cluster.restart_core().await; cluster - .volume_service_liveness(None) + .volume_service_liveness(Some(TimeoutOptions::new().with_max_retries(10))) .await .expect("Should have restarted by now"); diff --git a/control-plane/csi-driver/src/bin/controller/server.rs b/control-plane/csi-driver/src/bin/controller/server.rs index 6fff9c8d2..957b9411b 100644 --- a/control-plane/csi-driver/src/bin/controller/server.rs +++ b/control-plane/csi-driver/src/bin/controller/server.rs @@ -1,72 +1,11 @@ use crate::{controller::CsiControllerSvc, identity::CsiIdentitySvc}; use rpc::csi::{controller_server::ControllerServer, identity_server::IdentityServer}; -use futures::TryFutureExt; -use std::{ - fs, - io::ErrorKind, - ops::Add, - pin::Pin, - sync::Arc, - task::{Context, Poll}, -}; -use tokio::{ - io::{AsyncRead, AsyncWrite, ReadBuf}, - net::UnixListener, -}; -use tonic::transport::{server::Connected, Server}; +use std::{fs, io::ErrorKind, ops::Add}; +use tokio::net::UnixListener; +use tonic::{codegen::tokio_stream::wrappers::UnixListenerStream, transport::Server}; use tracing::{debug, error, info}; -#[derive(Debug)] -struct UnixStream(tokio::net::UnixStream); - -impl Connected for UnixStream { - type ConnectInfo = UdsConnectInfo; - - fn connect_info(&self) -> Self::ConnectInfo { - UdsConnectInfo { - peer_addr: self.0.peer_addr().ok().map(Arc::new), - peer_cred: self.0.peer_cred().ok(), - } - } -} - -// Not sure why we need the inner fields, probably worth checking if we can remove them. -#[derive(Clone, Debug)] -#[allow(unused)] -struct UdsConnectInfo { - peer_addr: Option>, - peer_cred: Option, -} - -impl AsyncRead for UnixStream { - fn poll_read( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &mut ReadBuf<'_>, - ) -> Poll> { - Pin::new(&mut self.0).poll_read(cx, buf) - } -} - -impl AsyncWrite for UnixStream { - fn poll_write( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &[u8], - ) -> Poll> { - Pin::new(&mut self.0).poll_write(cx, buf) - } - - fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - Pin::new(&mut self.0).poll_flush(cx) - } - - fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - Pin::new(&mut self.0).poll_shutdown(cx) - } -} - pub(super) struct CsiServer {} impl CsiServer { /// Runs the CSI Server identity and controller services. @@ -91,6 +30,7 @@ impl CsiServer { let incoming = { let uds = UnixListener::bind(csi_socket)?; + info!("CSI plugin bound to {}", csi_socket); // Change permissions on CSI socket to allow non-privileged clients to access it // to simplify testing. @@ -103,12 +43,7 @@ impl CsiServer { debug!("Successfully changed file permissions for CSI socket"); } - async_stream::stream! { - loop { - let item = uds.accept().map_ok(|(st, _)| UnixStream(st)).await; - yield item; - } - } + UnixListenerStream::new(uds) }; let cfg = crate::CsiControllerConfig::get_config(); diff --git a/control-plane/csi-driver/src/bin/node/block_vol.rs b/control-plane/csi-driver/src/bin/node/block_vol.rs index ce94e3b95..ca8d4491a 100644 --- a/control-plane/csi-driver/src/bin/node/block_vol.rs +++ b/control-plane/csi-driver/src/bin/node/block_vol.rs @@ -88,7 +88,7 @@ pub(crate) async fn publish_block_volume(msg: &NodePublishVolumeRequest) -> Resu } if let Err(error) = - mount::blockdevice_mount(&device_path, target_path.as_str(), msg.readonly) + mount::blockdevice_mount(&device_path, target_path.as_str(), msg.readonly).await { return Err(failure!( Code::Internal, @@ -108,14 +108,14 @@ pub(crate) async fn publish_block_volume(msg: &NodePublishVolumeRequest) -> Resu } } -pub(crate) fn unpublish_block_volume(msg: &NodeUnpublishVolumeRequest) -> Result<(), Status> { +pub(crate) async fn unpublish_block_volume(msg: &NodeUnpublishVolumeRequest) -> Result<(), Status> { let target_path = &msg.target_path; let volume_id = &msg.volume_id; // block volumes are mounted on block special file, which is not // a regular file. if mount::find_mount(None, Some(target_path)).is_some() { - match mount::blockdevice_unmount(target_path) { + match mount::blockdevice_unmount(target_path).await { Ok(_) => {} Err(err) => { return Err(Status::new( diff --git a/control-plane/csi-driver/src/bin/node/dev/nvmf.rs b/control-plane/csi-driver/src/bin/node/dev/nvmf.rs index c9549f980..8c76ffb0b 100644 --- a/control-plane/csi-driver/src/bin/node/dev/nvmf.rs +++ b/control-plane/csi-driver/src/bin/node/dev/nvmf.rs @@ -14,6 +14,7 @@ use csi_driver::PublishParams; use glob::glob; use nvmeadm::nvmf_subsystem::Subsystem; use regex::Regex; +use tokio::task::spawn_blocking; use tracing::warn; use udev::{Device, Enumerator}; use url::Url; @@ -192,12 +193,17 @@ impl Attach for NvmfAttach { .hostnqn(self.hostnqn.clone()) .keep_alive_tmo(self.keep_alive_tmo) .build()?; - match ca.connect() { - // Should we remove this arm? - Err(NvmeError::ConnectInProgress) => Ok(()), - Err(err) => Err(err.into()), - Ok(_) => Ok(()), - } + + spawn_blocking(move || { + match ca.connect() { + // Should we remove this arm? + Err(NvmeError::ConnectInProgress) => Ok(()), + Err(err) => Err(err.into()), + Ok(_) => Ok(()), + } + }) + .await + .map_err(|error| DeviceError::new(&error.to_string()))? } Err(err) => Err(err.into()), } @@ -283,14 +289,17 @@ impl NvmfDetach { #[tonic::async_trait] impl Detach for NvmfDetach { async fn detach(&self) -> Result<(), DeviceError> { - if disconnect(&self.nqn)? == 0 { - return Err(DeviceError::from(format!( + let nqn = self.nqn.clone(); + spawn_blocking(move || match disconnect(&nqn) { + Ok(0) => Err(DeviceError::from(format!( "nvmf disconnect {} failed: no device found", - self.nqn - ))); - } - - Ok(()) + nqn + ))), + Err(error) => Err(error.into()), + Ok(_) => Ok(()), + }) + .await + .map_err(|error| DeviceError::from(error.to_string()))? } fn devname(&self) -> DeviceName { diff --git a/control-plane/csi-driver/src/bin/node/error.rs b/control-plane/csi-driver/src/bin/node/error.rs index ec463133e..74e043cce 100644 --- a/control-plane/csi-driver/src/bin/node/error.rs +++ b/control-plane/csi-driver/src/bin/node/error.rs @@ -3,6 +3,7 @@ use nix::errno::Errno; use nvmeadm::{error::NvmeError, nvmf_discovery}; use snafu::Snafu; use std::{process::ExitCode, string::FromUtf8Error}; +use tokio::task::JoinError; /// A Device Attach/Detach error. /// todo: should this be an enum? @@ -171,8 +172,6 @@ pub(crate) enum FsfreezeError { }, #[snafu(display("Not a filesystem mount: volume ID: {volume_id}"))] BlockDeviceMount { volume_id: String }, - #[snafu(display("Not a valid fsfreeze command"))] - InvalidFreezeCommand, } impl From for ExitCode { @@ -184,7 +183,62 @@ impl From for ExitCode { FsfreezeError::FsfreezeFailed { errno, .. } => ExitCode::from(errno as u8), FsfreezeError::InternalFailure { .. } => ExitCode::from(Errno::ELIBACC as u8), FsfreezeError::BlockDeviceMount { .. } => ExitCode::from(Errno::EMEDIUMTYPE as u8), - FsfreezeError::InvalidFreezeCommand => ExitCode::from(Errno::EINVAL as u8), + } + } +} + +/// Error for the mount operations. +#[derive(Debug, Snafu)] +#[snafu(visibility(pub(crate)), context(suffix(false)))] +pub(crate) enum MountError { + #[snafu(display("Failed mount, error: {error}"))] + MountFailed { error: std::io::Error }, + #[snafu(display("Failed unmount, error: {error}"))] + UnmountFailed { error: std::io::Error }, + #[snafu(display("Failed to wait for thread termination, error: {join_error}"))] + FailedWaitForThread { join_error: JoinError }, +} + +impl From for ExitCode { + fn from(value: MountError) -> Self { + match value { + MountError::MountFailed { .. } => ExitCode::from(Errno::ELIBACC as u8), + MountError::UnmountFailed { .. } => ExitCode::from(Errno::ELIBACC as u8), + MountError::FailedWaitForThread { .. } => ExitCode::from(Errno::ESRCH as u8), + } + } +} + +/// Error for the csi driver operations. +#[derive(Debug, Snafu)] +#[snafu(visibility(pub(crate)), context(suffix(false)))] +pub(crate) enum CsiDriverError { + #[snafu(display("{source}"))] + Fsfreeze { source: FsfreezeError }, + #[snafu(display("{source}"))] + Mount { source: MountError }, + #[snafu(display("Not a valid mayastor csi driver command"))] + InvalidCsiDriverCommand, +} + +impl From for CsiDriverError { + fn from(value: FsfreezeError) -> Self { + Self::Fsfreeze { source: value } + } +} + +impl From for CsiDriverError { + fn from(value: MountError) -> Self { + Self::Mount { source: value } + } +} + +impl From for ExitCode { + fn from(value: CsiDriverError) -> Self { + match value { + CsiDriverError::Fsfreeze { source } => source.into(), + CsiDriverError::Mount { source } => source.into(), + CsiDriverError::InvalidCsiDriverCommand => ExitCode::from(Errno::EINVAL as u8), } } } diff --git a/control-plane/csi-driver/src/bin/node/filesystem_ops.rs b/control-plane/csi-driver/src/bin/node/filesystem_ops.rs index 2866e593f..395900e3f 100644 --- a/control-plane/csi-driver/src/bin/node/filesystem_ops.rs +++ b/control-plane/csi-driver/src/bin/node/filesystem_ops.rs @@ -5,9 +5,9 @@ use crate::{findmnt::get_devicepath, mount}; use csi_driver::filesystem::FileSystem as Fs; +use devinfo::{blkid::probe::Probe, mountinfo::MountInfo, DevInfoError}; use anyhow::anyhow; -use devinfo::{blkid::probe::Probe, mountinfo::MountInfo, DevInfoError}; use std::{process::Output, str, str::FromStr}; use tokio::process::Command; use tonic::async_trait; @@ -95,7 +95,7 @@ pub(crate) trait FileSystemOps: Send + Sync { /// Get the default mount options along with the user passed options for specific filesystems. fn mount_flags(&self, mount_flags: Vec) -> Vec; /// Unmount the filesystem if the filesystem uuid and the provided uuid differ. - fn unmount_on_fs_id_diff( + async fn unmount_on_fs_id_diff( &self, device_path: &str, fs_staging_path: &str, @@ -146,7 +146,7 @@ impl FileSystemOps for Ext4Fs { mount_flags } - fn unmount_on_fs_id_diff( + async fn unmount_on_fs_id_diff( &self, _device_path: &str, _fs_staging_path: &str, @@ -270,13 +270,15 @@ impl FileSystemOps for XFs { mount_flags } - fn unmount_on_fs_id_diff( + async fn unmount_on_fs_id_diff( &self, device_path: &str, fs_staging_path: &str, volume_uuid: &Uuid, ) -> Result<(), Error> { mount::unmount_on_fs_id_diff(device_path, fs_staging_path, volume_uuid) + .await + .map_err(|error| error.to_string()) } /// Xfs filesystem needs an unmount to clear the log, so that the parameters can be changed. @@ -288,12 +290,12 @@ impl FileSystemOps for XFs { options: &[String], volume_uuid: &Uuid, ) -> Result<(), Error> { - mount::filesystem_mount(device, staging_path, &FileSystem(Fs::Xfs), options).map_err(|error| { + mount::filesystem_mount(device, staging_path, &FileSystem(Fs::Xfs), options).await.map_err(|error| { format!( "(xfs repairing) Failed to mount device {device} onto {staging_path} for {volume_uuid} : {error}", ) })?; - mount::filesystem_unmount(staging_path).map_err(|error| { + mount::filesystem_unmount(staging_path).await.map_err(|error| { format!( "(xfs repairing) Failed to unmount device {device} from {staging_path} for {volume_uuid} : {error}", ) @@ -358,13 +360,13 @@ impl FileSystemOps for BtrFs { mount_flags } - fn unmount_on_fs_id_diff( + async fn unmount_on_fs_id_diff( &self, device_path: &str, fs_staging_path: &str, volume_uuid: &Uuid, ) -> Result<(), Error> { - mount::unmount_on_fs_id_diff(device_path, fs_staging_path, volume_uuid) + mount::unmount_on_fs_id_diff(device_path, fs_staging_path, volume_uuid).await } /// `btrfs check --readonly` is a not a `DANGEROUS OPTION` as it only exists to calm potential diff --git a/control-plane/csi-driver/src/bin/node/filesystem_vol.rs b/control-plane/csi-driver/src/bin/node/filesystem_vol.rs index dfe4d7649..c77e1845e 100644 --- a/control-plane/csi-driver/src/bin/node/filesystem_vol.rs +++ b/control-plane/csi-driver/src/bin/node/filesystem_vol.rs @@ -91,7 +91,7 @@ pub(crate) async fn stage_fs_volume( // If clone's fs id change was requested and we were not able to change it in first attempt // unmount and continue the stage again. let continue_stage = if fs_id.is_some() { - continue_after_unmount_on_fs_id_diff(fstype ,device_path, fs_staging_path, &volume_uuid) + continue_after_unmount_on_fs_id_diff(fstype ,device_path, fs_staging_path, &volume_uuid).await .map_err(|error| { failure!( Code::FailedPrecondition, @@ -107,7 +107,7 @@ pub(crate) async fn stage_fs_volume( if !continue_stage { // todo: validate other flags? if mnt.mount_flags.readonly() != existing.options.readonly() { - mount::remount(fs_staging_path, mnt.mount_flags.readonly())?; + mount::remount(fs_staging_path, mnt.mount_flags.readonly()).await?; } return Ok(()); @@ -161,7 +161,8 @@ pub(crate) async fn stage_fs_volume( debug!("Mounting device {} onto {}", device_path, fs_staging_path); - if let Err(error) = mount::filesystem_mount(device_path, fs_staging_path, fstype, &mount_flags) + if let Err(error) = + mount::filesystem_mount(device_path, fs_staging_path, fstype, &mount_flags).await { return Err(failure!( Code::Internal, @@ -209,7 +210,7 @@ pub(crate) async fn unstage_fs_volume(msg: &NodeUnstageVolumeRequest) -> Result< )); } - if let Err(error) = mount::filesystem_unmount(fs_staging_path) { + if let Err(error) = mount::filesystem_unmount(fs_staging_path).await { return Err(failure!( Code::Internal, "Failed to unstage volume {}: failed to unmount device {:?} from {}: {}", @@ -227,7 +228,7 @@ pub(crate) async fn unstage_fs_volume(msg: &NodeUnstageVolumeRequest) -> Result< } /// Publish a filesystem volume -pub(crate) fn publish_fs_volume( +pub(crate) async fn publish_fs_volume( msg: &NodePublishVolumeRequest, mnt: &MountVolume, filesystems: &[FileSystem], @@ -328,7 +329,7 @@ pub(crate) fn publish_fs_volume( debug!("Mounting {} to {}", fs_staging_path, target_path); - if let Err(error) = mount::bind_mount(fs_staging_path, target_path, false) { + if let Err(error) = mount::bind_mount(fs_staging_path, target_path, false).await { return Err(failure!( Code::Internal, "Failed to publish volume {}: failed to mount {} to {}: {}", @@ -345,7 +346,7 @@ pub(crate) fn publish_fs_volume( debug!("Remounting {} as readonly", target_path); - if let Err(error) = mount::bind_remount(target_path, &options) { + if let Err(error) = mount::bind_remount(target_path, &options).await { let message = format!( "Failed to publish volume {volume_id}: failed to mount {fs_staging_path} to {target_path} as readonly: {error}" ); @@ -354,7 +355,7 @@ pub(crate) fn publish_fs_volume( debug!("Unmounting {}", target_path); - if let Err(error) = mount::bind_unmount(target_path) { + if let Err(error) = mount::bind_unmount(target_path).await { error!("Failed to unmount {}: {}", target_path, error); } @@ -367,7 +368,7 @@ pub(crate) fn publish_fs_volume( Ok(()) } -pub(crate) fn unpublish_fs_volume(msg: &NodeUnpublishVolumeRequest) -> Result<(), Status> { +pub(crate) async fn unpublish_fs_volume(msg: &NodeUnpublishVolumeRequest) -> Result<(), Status> { // filesystem mount let target_path = &msg.target_path; let volume_id = &msg.volume_id; @@ -398,7 +399,7 @@ pub(crate) fn unpublish_fs_volume(msg: &NodeUnpublishVolumeRequest) -> Result<() debug!("Unmounting {}", target_path); - if let Err(error) = mount::bind_unmount(target_path) { + if let Err(error) = mount::bind_unmount(target_path).await { return Err(failure!( Code::Internal, "Failed to unpublish volume {}: failed to unmount {}: {}", @@ -427,7 +428,7 @@ pub(crate) fn unpublish_fs_volume(msg: &NodeUnpublishVolumeRequest) -> Result<() /// Check if we can continue the staging incase the change fs id failed mid way and we want to retry /// the flow. -fn continue_after_unmount_on_fs_id_diff( +async fn continue_after_unmount_on_fs_id_diff( fstype: &FileSystem, device_path: &str, fs_staging_path: &str, @@ -435,6 +436,7 @@ fn continue_after_unmount_on_fs_id_diff( ) -> Result { fstype .fs_ops()? - .unmount_on_fs_id_diff(device_path, fs_staging_path, volume_uuid)?; + .unmount_on_fs_id_diff(device_path, fs_staging_path, volume_uuid) + .await?; Ok(fstype == &Fs::Xfs.into()) } diff --git a/control-plane/csi-driver/src/bin/node/main.rs b/control-plane/csi-driver/src/bin/node/main.rs index 8dca319e6..c6ade51c2 100644 --- a/control-plane/csi-driver/src/bin/node/main.rs +++ b/control-plane/csi-driver/src/bin/node/main.rs @@ -1,4 +1,4 @@ -use crate::error::FsfreezeError; +use crate::error::CsiDriverError; use std::process::ExitCode; /// todo: cleanup this module cfg repetition.. @@ -31,6 +31,8 @@ mod match_dev; #[cfg(target_os = "linux")] mod mount; #[cfg(target_os = "linux")] +mod mount_utils; +#[cfg(target_os = "linux")] mod node; #[cfg(target_os = "linux")] mod nodeplugin_grpc; @@ -52,7 +54,7 @@ async fn main() -> anyhow::Result { error }) { Ok(_) => Ok(ExitCode::SUCCESS), - Err(error) => match error.downcast::() { + Err(error) => match error.downcast::() { Ok(error) => Ok(error.into()), Err(error) => Err(error), }, diff --git a/control-plane/csi-driver/src/bin/node/main_.rs b/control-plane/csi-driver/src/bin/node/main_.rs index 4f0b4d6ae..86b389e28 100644 --- a/control-plane/csi-driver/src/bin/node/main_.rs +++ b/control-plane/csi-driver/src/bin/node/main_.rs @@ -4,11 +4,16 @@ //! of volumes using iscsi/nvmf protocols on the node. use crate::{ - error::FsfreezeError, + client::AppNodesClientWrapper, + error::CsiDriverError, fsfreeze::{bin::fsfreeze, FsFreezeOpt}, identity::Identity, k8s::patch_k8s_node, mount::probe_filesystems, + mount_utils::bin::{ + flag::{parse_mount_flags, parse_unmount_flags}, + mount_utils::{mount, unmount}, + }, node::{Node, RDMA_CONNECT_CHECK}, nodeplugin_grpc::NodePluginGrpcServer, nodeplugin_nvme::NvmeOperationsSvc, @@ -20,9 +25,7 @@ use grpc::csi_node_nvme::nvme_operations_server::NvmeOperationsServer; use stor_port::platform; use utils::tracing_telemetry::{FmtLayer, FmtStyle}; -use crate::client::AppNodesClientWrapper; use clap::Arg; -use futures::TryFutureExt; use serde_json::json; use std::{ collections::HashMap, @@ -30,68 +33,12 @@ use std::{ future::Future, io::ErrorKind, net::{IpAddr, SocketAddr}, - pin::Pin, str::FromStr, - sync::Arc, - task::{Context, Poll}, }; -use tokio::{ - io::{AsyncRead, AsyncWrite, ReadBuf}, - net::UnixListener, -}; -use tonic::transport::{server::Connected, Server}; +use tokio::net::UnixListener; +use tonic::{codegen::tokio_stream::wrappers::UnixListenerStream, transport::Server}; use tracing::{debug, error, info}; -#[derive(Clone, Debug)] -pub struct UdsConnectInfo { - #[allow(dead_code)] - pub peer_addr: Option>, - #[allow(dead_code)] - pub peer_cred: Option, -} - -#[derive(Debug)] -struct UnixStream(tokio::net::UnixStream); - -impl Connected for UnixStream { - type ConnectInfo = UdsConnectInfo; - - fn connect_info(&self) -> Self::ConnectInfo { - UdsConnectInfo { - peer_addr: self.0.peer_addr().ok().map(Arc::new), - peer_cred: self.0.peer_cred().ok(), - } - } -} - -impl AsyncRead for UnixStream { - fn poll_read( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &mut ReadBuf<'_>, - ) -> Poll> { - Pin::new(&mut self.0).poll_read(cx, buf) - } -} - -impl AsyncWrite for UnixStream { - fn poll_write( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &[u8], - ) -> Poll> { - Pin::new(&mut self.0).poll_write(cx, buf) - } - - fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - Pin::new(&mut self.0).poll_flush(cx) - } - - fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - Pin::new(&mut self.0).poll_shutdown(cx) - } -} - const GRPC_PORT: u16 = 50051; pub(super) async fn main() -> anyhow::Result<()> { @@ -255,6 +202,61 @@ pub(super) async fn main() -> anyhow::Result<()> { .help("Uuid of the volume to unfreeze") ) ) + .subcommand( + clap::Command::new("mount") + .arg( + Arg::new("source") + .long("source") + .value_name("PATH") + .required(true) + .help("Mount source path") + ) + .arg( + Arg::new("target") + .long("target") + .value_name("PATH") + .required(true) + .help("Mount target path") + ) + .arg( + Arg::new("fstype") + .long("fstype") + .value_name("STRING") + .help("Filesystem type for filesystem volume") + ) + .arg( + Arg::new("data") + .long("data") + .value_name("STRING") + .help("Options to apply for the filesystem on mount") + ) + .arg( + Arg::new("mount-flags") + .long("mount-flags") + .value_name("STRING") + .value_parser(parse_mount_flags) + .required(true) + .help("Mount flags") + ) + ) + .subcommand( + clap::Command::new("unmount") + .arg( + Arg::new("target") + .long("target") + .value_name("PATH") + .required(true) + .help("Unmount target path") + ) + .arg( + Arg::new("unmount-flags") + .long("unmount-flags") + .value_name("STRING") + .value_parser(parse_unmount_flags) + .required(true) + .help("Unmount flags") + ) + ) .get_matches(); let tags = utils::tracing_telemetry::default_tracing_tags( utils::raw_version_str(), @@ -272,13 +274,44 @@ pub(super) async fn main() -> anyhow::Result<()> { match cmd { ("fs-freeze", arg_matches) => { let volume_id = arg_matches.get_one::("volume-id").unwrap(); - fsfreeze(volume_id, FsFreezeOpt::Freeze).await + fsfreeze(volume_id, FsFreezeOpt::Freeze) + .await + .map_err(|error| CsiDriverError::Fsfreeze { source: error }) } ("fs-unfreeze", arg_matches) => { let volume_id = arg_matches.get_one::("volume-id").unwrap(); - fsfreeze(volume_id, FsFreezeOpt::Unfreeze).await + fsfreeze(volume_id, FsFreezeOpt::Unfreeze) + .await + .map_err(|error| CsiDriverError::Fsfreeze { source: error }) } - _ => Err(FsfreezeError::InvalidFreezeCommand), + ("mount", arg_matches) => { + let src_path = arg_matches.get_one::("source").unwrap(); + let dsc_path = arg_matches.get_one::("target").unwrap(); + let fstype = arg_matches.get_one::("fstype"); + let data = arg_matches.get_one::("data"); + let mnt_flags = arg_matches + .get_one::("mount-flags") + .unwrap(); + mount( + src_path.to_string(), + dsc_path.to_string(), + data.cloned(), + *mnt_flags, + fstype.cloned(), + ) + .await + .map_err(|error| CsiDriverError::Mount { source: error }) + } + ("unmount", arg_matches) => { + let target_path = arg_matches.get_one::("target").unwrap(); + let unmnt_flags = arg_matches + .get_one::("unmount-flags") + .unwrap(); + unmount(target_path.to_string(), *unmnt_flags) + .await + .map_err(|error| CsiDriverError::Mount { source: error }) + } + _ => Err(CsiDriverError::InvalidCsiDriverCommand), }?; return Ok(()); } @@ -408,6 +441,7 @@ pub(super) async fn main() -> anyhow::Result<()> { registration_enabled ) ); + info!("Main process terminated!"); vec![csi, grpc, registration].into_iter().collect() } @@ -426,7 +460,7 @@ impl CsiServer { )?; let incoming = { - let uds = UnixListener::bind(csi_socket).unwrap(); + let uds = UnixListener::bind(csi_socket)?; info!("CSI plugin bound to {}", csi_socket); // Change permissions on CSI socket to allow non-privileged clients to access it @@ -440,17 +474,13 @@ impl CsiServer { debug!("Successfully changed file permissions for CSI socket"); } - async_stream::stream! { - loop { - let item = uds.accept().map_ok(|(st, _)| UnixStream(st)).await; - yield item; - } - } + UnixListenerStream::new(uds) }; let node = Node::new(node_name.into(), node_selector, probe_filesystems()); Ok(async move { Server::builder() + .timeout(std::time::Duration::from_secs(30)) .add_service(NodeServer::new(node)) .add_service(IdentityServer::new(Identity {})) .add_service(NvmeOperationsServer::new(NvmeOperationsSvc {})) diff --git a/control-plane/csi-driver/src/bin/node/mount.rs b/control-plane/csi-driver/src/bin/node/mount.rs index bc339ab44..7b969691a 100644 --- a/control-plane/csi-driver/src/bin/node/mount.rs +++ b/control-plane/csi-driver/src/bin/node/mount.rs @@ -1,10 +1,13 @@ //! Utility functions for mounting and unmounting filesystems. -use crate::filesystem_ops::FileSystem; +use crate::{ + filesystem_ops::FileSystem, + mount_utils::{bin::flag::MountFlag, MayastorMount}, +}; use csi_driver::filesystem::FileSystem as Fs; use devinfo::mountinfo::{MountInfo, SafeMountIter}; use std::{collections::HashSet, io::Error}; -use sys_mount::{unmount, FilesystemType, Mount, MountFlags, UnmountFlags}; +use tonic::Status; use tracing::{debug, info}; use uuid::Uuid; @@ -131,29 +134,32 @@ fn show(options: &[String]) -> String { } /// Mount a device to a directory (mountpoint) -pub(crate) fn filesystem_mount( +pub(crate) async fn filesystem_mount( device: &str, target: &str, fstype: &FileSystem, options: &[String], -) -> Result { - let mut flags = MountFlags::empty(); +) -> Result { + let mut flags = Vec::new(); let (readonly, value) = parse(options); if readonly { - flags.insert(MountFlags::RDONLY); + flags.push(MountFlag::RDONLY.to_string()); } // I'm not certain if it's fine to pass "" so keep existing behaviour let mount = if value.is_empty() { - Mount::builder() + MayastorMount::builder() } else { - Mount::builder().data(&value) + MayastorMount::builder().data(&value) } - .fstype(FilesystemType::Manual(fstype.as_ref())) - .flags(flags) - .mount(device, target)?; + .fstype(fstype.as_ref()) + .flags(flags.join(",").as_str()) + .source(device) + .target(target) + .mount() + .await?; debug!( "Filesystem ({}) on device {} mounted onto target {} (options: {})", @@ -168,10 +174,14 @@ pub(crate) fn filesystem_mount( /// Unmount a device from a directory (mountpoint) /// Should not be used for removing bind mounts. -pub(crate) fn filesystem_unmount(target: &str) -> Result<(), Error> { - let flags = UnmountFlags::empty(); - // read more about the umount system call and it's flags at `man 2 umount` - unmount(target, flags)?; +pub(crate) async fn filesystem_unmount(target: &str) -> Result<(), Status> { + let flags: Vec = Vec::new(); + + MayastorMount::builder() + .flags(flags.join(",").as_str()) + .target(target) + .unmount() + .await?; debug!("Target {} unmounted", target); @@ -180,19 +190,26 @@ pub(crate) fn filesystem_unmount(target: &str) -> Result<(), Error> { /// Bind mount a source path to a target path. /// Supports both directories and files. -pub(crate) fn bind_mount(source: &str, target: &str, file: bool) -> Result { - let mut flags = MountFlags::empty(); +pub(crate) async fn bind_mount( + source: &str, + target: &str, + file: bool, +) -> Result { + let mut flags: Vec = Vec::new(); - flags.insert(MountFlags::BIND); + flags.push(MountFlag::BIND.to_string()); if file { - flags.insert(MountFlags::RDONLY); + flags.push(MountFlag::RDONLY.to_string()); } - let mount = Mount::builder() - .fstype(FilesystemType::Manual("none")) - .flags(flags) - .mount(source, target)?; + let mount = MayastorMount::builder() + .fstype("none") + .flags(flags.join(",").as_str()) + .source(source) + .target(target) + .mount() + .await?; debug!("Source {} bind mounted onto target {}", source, target); @@ -201,27 +218,33 @@ pub(crate) fn bind_mount(source: &str, target: &str, file: bool) -> Result Result { - let mut flags = MountFlags::empty(); +pub(crate) async fn bind_remount( + target: &str, + options: &[String], +) -> Result { + let mut flags: Vec = Vec::new(); let (readonly, value) = parse(options); - flags.insert(MountFlags::BIND); + flags.push(MountFlag::BIND.to_string()); if readonly { - flags.insert(MountFlags::RDONLY); + flags.push(MountFlag::RDONLY.to_string()); } - flags.insert(MountFlags::REMOUNT); + flags.push(MountFlag::REMOUNT.to_string()); let mount = if value.is_empty() { - Mount::builder() + MayastorMount::builder() } else { - Mount::builder().data(&value) + MayastorMount::builder().data(&value) } - .fstype(FilesystemType::Manual("none")) - .flags(flags) - .mount("none", target)?; + .fstype("none") + .flags(flags.join(",").as_str()) + .source("none") + .target(target) + .mount() + .await?; debug!( "Target {} bind remounted (options: {})", @@ -234,10 +257,14 @@ pub(crate) fn bind_remount(target: &str, options: &[String]) -> Result Result<(), Error> { - let flags = UnmountFlags::empty(); +pub(crate) async fn bind_unmount(target: &str) -> Result<(), Status> { + let flags: Vec = Vec::new(); - unmount(target, flags)?; + MayastorMount::builder() + .flags(flags.join(",").as_str()) + .target(target) + .unmount() + .await?; debug!("Target {} bind unmounted", target); @@ -245,49 +272,60 @@ pub(crate) fn bind_unmount(target: &str) -> Result<(), Error> { } /// Remount existing mount as read only or read write. -pub(crate) fn remount(target: &str, ro: bool) -> Result { - let mut flags = MountFlags::empty(); - flags.insert(MountFlags::REMOUNT); +pub(crate) async fn remount(target: &str, ro: bool) -> Result { + let mut flags: Vec = Vec::new(); + flags.push(MountFlag::REMOUNT.to_string()); if ro { - flags.insert(MountFlags::RDONLY); + flags.push(MountFlag::RDONLY.to_string()); } - let mount = Mount::builder() - .fstype(FilesystemType::Manual("none")) - .flags(flags) - .mount("", target)?; + let mount = MayastorMount::builder() + .fstype("none") + .flags(flags.join(",").as_str()) + .source("") + .target(target) + .mount() + .await?; - debug!("Target {} remounted with {}", target, flags.bits()); + debug!("Target {} remounted with {}", target, flags.join(",")); Ok(mount) } /// Mount a block device -pub(crate) fn blockdevice_mount( +pub(crate) async fn blockdevice_mount( source: &str, target: &str, readonly: bool, -) -> Result { +) -> Result { debug!("Mounting {} ...", source); - let mut flags = MountFlags::empty(); - flags.insert(MountFlags::BIND); + let mut flags: Vec = Vec::new(); + flags.push(MountFlag::BIND.to_string()); + + let mount = MayastorMount::builder() + .fstype("none") + .flags(flags.join(",").as_str()) + .source(source) + .target(target) + .mount() + .await?; - let mount = Mount::builder() - .fstype(FilesystemType::Manual("none")) - .flags(flags) - .mount(source, target)?; - info!("Block device {} mounted to {}", source, target,); + info!("Block device {} mounted to {}", source, target); if readonly { - flags.insert(MountFlags::REMOUNT); - flags.insert(MountFlags::RDONLY); + flags.push(MountFlag::REMOUNT.to_string()); + flags.push(MountFlag::RDONLY.to_string()); + + let mount = MayastorMount::builder() + .fstype("") + .flags(flags.join(",").as_str()) + .source("") + .target(target) + .mount() + .await?; - let mount = Mount::builder() - .fstype(FilesystemType::Manual("")) - .flags(flags) - .mount("", target)?; info!("Remounted block device {} (readonly) to {}", source, target); return Ok(mount); } @@ -296,16 +334,20 @@ pub(crate) fn blockdevice_mount( } /// Unmount a block device. -pub(crate) fn blockdevice_unmount(target: &str) -> Result<(), Error> { - let flags = UnmountFlags::empty(); +pub(crate) async fn blockdevice_unmount(target: &str) -> Result<(), Status> { + let flags: Vec = Vec::new(); debug!( "Unmounting block device {} (flags={}) ...", target, - flags.bits() + flags.join(",") ); - unmount(target, flags)?; + MayastorMount::builder() + .flags(flags.join(",").as_str()) + .target(target) + .unmount() + .await?; info!("block device at {} has been unmounted", target); Ok(()) } @@ -349,10 +391,14 @@ async fn wait_file_removal( let check_interval = std::time::Duration::from_millis(200); let proc_str = proc.to_string_lossy().to_string(); let mut exists = proc.exists(); + let mut logged = false; while start.elapsed() < timeout && exists { - tracing::error!(proc = proc_str, "proc entry still exists"); + if !logged { + tracing::error!(proc = proc_str, "proc entry still exists"); + } tokio::time::sleep(check_interval).await; exists = proc.exists(); + logged = true; } match exists { false => Ok(()), @@ -364,7 +410,7 @@ async fn wait_file_removal( } /// If the filesystem uuid doesn't match with the provided uuid, unmount the device. -pub(crate) fn unmount_on_fs_id_diff( +pub(crate) async fn unmount_on_fs_id_diff( device_path: &str, fs_staging_path: &str, volume_uuid: &Uuid, @@ -374,7 +420,7 @@ pub(crate) fn unmount_on_fs_id_diff( return Ok(()); } } - filesystem_unmount(fs_staging_path).map_err(|error| { + filesystem_unmount(fs_staging_path).await.map_err(|error| { format!( "Failed to unmount on fs id difference, device {device_path} from {fs_staging_path} for {volume_uuid}, {error}", ) diff --git a/control-plane/csi-driver/src/bin/node/mount_utils/bin/flag.rs b/control-plane/csi-driver/src/bin/node/mount_utils/bin/flag.rs new file mode 100644 index 000000000..8b43cc08b --- /dev/null +++ b/control-plane/csi-driver/src/bin/node/mount_utils/bin/flag.rs @@ -0,0 +1,122 @@ +use std::str::FromStr; +use strum_macros::{AsRefStr, Display, EnumString}; + +/// Sys mount's MountFlags representation for conversion. +#[derive(EnumString, Clone, Debug, Eq, PartialEq, Display, AsRefStr)] +#[strum(serialize_all = "lowercase")] +#[allow(clippy::upper_case_acronyms)] +pub(crate) enum MountFlag { + BIND, + DIRSYNC, + MANDLOCK, + MOVE, + NOATIME, + NODEV, + NODIRATIME, + NOEXEC, + NOSUID, + RDONLY, + REC, + RELATIME, + REMOUNT, + SILENT, + STRICTATIME, + SYNCHRONOUS, +} + +struct MountFlags(Vec); + +/// Sys mount's UnmountFlags representation for conversion. +#[derive(EnumString, Clone, Debug, Eq, PartialEq, Display, AsRefStr)] +#[strum(serialize_all = "lowercase")] +#[allow(clippy::upper_case_acronyms)] +pub(crate) enum UnmountFlag { + FORCE, + DETACH, + EXPIRE, + NOFOLLOW, +} + +struct UnmountFlags(Vec); + +impl FromStr for MountFlags { + type Err = String; + + fn from_str(s: &str) -> Result { + let flags: Vec<&str> = s.split(',').collect(); + let mut mount_flags: Vec = Vec::with_capacity(flags.len()); + + for flag in flags { + if !flag.trim().is_empty() { + match flag.parse::() { + Ok(parsed_flag) => mount_flags.push(parsed_flag), + Err(_) => return Err(format!("Invalid mount flag: {}", flag)), + } + } + } + + Ok(MountFlags(mount_flags)) + } +} + +impl FromStr for UnmountFlags { + type Err = String; + + fn from_str(s: &str) -> Result { + let flags: Vec<&str> = s.split(',').collect(); + let mut unmount_flags: Vec = Vec::with_capacity(flags.len()); + + for flag in flags { + if !flag.trim().is_empty() { + match flag.parse::() { + Ok(parsed_flag) => unmount_flags.push(parsed_flag), + Err(_) => return Err(format!("Invalid unmount flag: {}", flag)), + } + } + } + + Ok(UnmountFlags(unmount_flags)) + } +} + +/// Converts comma seperated mount flag from cli input to sys mount's Mountflags +pub(crate) fn parse_mount_flags(flags_str: &str) -> Result { + let mount_flags = MountFlags::from_str(flags_str)?; + let mut _mount_flags = sys_mount::MountFlags::empty(); + for flag in mount_flags.0 { + match flag { + MountFlag::BIND => _mount_flags.insert(sys_mount::MountFlags::BIND), + MountFlag::DIRSYNC => _mount_flags.insert(sys_mount::MountFlags::DIRSYNC), + MountFlag::MANDLOCK => _mount_flags.insert(sys_mount::MountFlags::MANDLOCK), + MountFlag::MOVE => _mount_flags.insert(sys_mount::MountFlags::MOVE), + MountFlag::NOATIME => _mount_flags.insert(sys_mount::MountFlags::NOATIME), + MountFlag::NODEV => _mount_flags.insert(sys_mount::MountFlags::NODEV), + MountFlag::NODIRATIME => _mount_flags.insert(sys_mount::MountFlags::NODIRATIME), + MountFlag::NOEXEC => _mount_flags.insert(sys_mount::MountFlags::NOEXEC), + MountFlag::NOSUID => _mount_flags.insert(sys_mount::MountFlags::NOSUID), + MountFlag::RDONLY => _mount_flags.insert(sys_mount::MountFlags::RDONLY), + MountFlag::REC => _mount_flags.insert(sys_mount::MountFlags::REC), + MountFlag::RELATIME => _mount_flags.insert(sys_mount::MountFlags::RELATIME), + MountFlag::REMOUNT => _mount_flags.insert(sys_mount::MountFlags::REMOUNT), + MountFlag::SILENT => _mount_flags.insert(sys_mount::MountFlags::SILENT), + MountFlag::STRICTATIME => _mount_flags.insert(sys_mount::MountFlags::STRICTATIME), + MountFlag::SYNCHRONOUS => _mount_flags.insert(sys_mount::MountFlags::SYNCHRONOUS), + } + } + Ok(_mount_flags) +} + +/// Converts comma seperated mount flag from cli input to sys mount's Unmountflags +pub(crate) fn parse_unmount_flags(flags_str: &str) -> Result { + let unmount_flags = UnmountFlags::from_str(flags_str)?; + let mut _unmount_flags = sys_mount::UnmountFlags::empty(); + for flag in unmount_flags.0 { + match flag { + UnmountFlag::FORCE => _unmount_flags.insert(sys_mount::UnmountFlags::FORCE), + UnmountFlag::DETACH => _unmount_flags.insert(sys_mount::UnmountFlags::DETACH), + UnmountFlag::EXPIRE => _unmount_flags.insert(sys_mount::UnmountFlags::EXPIRE), + UnmountFlag::NOFOLLOW => _unmount_flags.insert(sys_mount::UnmountFlags::NOFOLLOW), + } + } + Ok(_unmount_flags) +} diff --git a/control-plane/csi-driver/src/bin/node/mount_utils/bin/mod.rs b/control-plane/csi-driver/src/bin/node/mount_utils/bin/mod.rs new file mode 100644 index 000000000..324016e14 --- /dev/null +++ b/control-plane/csi-driver/src/bin/node/mount_utils/bin/mod.rs @@ -0,0 +1,4 @@ +/// Sys mount's Mount and Unmount flag conversions. +pub(crate) mod flag; +/// Backend calling to sys mount's mount and unmount. +pub(crate) mod mount_utils; diff --git a/control-plane/csi-driver/src/bin/node/mount_utils/bin/mount_utils.rs b/control-plane/csi-driver/src/bin/node/mount_utils/bin/mount_utils.rs new file mode 100644 index 000000000..8c8a26a84 --- /dev/null +++ b/control-plane/csi-driver/src/bin/node/mount_utils/bin/mount_utils.rs @@ -0,0 +1,50 @@ +use crate::error::MountError; + +use sys_mount::{FilesystemType, Mount, MountFlags, UnmountFlags}; + +/// Calls sys mount's mount in a spawn blocking. +pub(crate) async fn mount( + device_path: String, + target: String, + data: Option, + mount_flags: MountFlags, + fstype: Option, +) -> Result<(), MountError> { + let blocking_task = tokio::task::spawn_blocking(move || { + let mut mount = Mount::builder(); + + if let Some(data_str) = data.as_deref().filter(|d| !d.is_empty()) { + mount = mount.data(data_str); + } + + mount + .flags(mount_flags) + .fstype(FilesystemType::Manual(fstype.unwrap_or_default().as_ref())) + .mount(device_path, target) + .map_err(|error| MountError::MountFailed { error }) + }); + + match blocking_task.await { + Ok(result) => match result { + Ok(_mount) => Ok(()), + Err(error) => Err(error), + }, + Err(error) => Err(MountError::FailedWaitForThread { join_error: error }), + } +} + +/// Calls sys mount's unmount in a spawn blocking. +pub(crate) async fn unmount(target: String, unmount_flags: UnmountFlags) -> Result<(), MountError> { + let blocking_task = tokio::task::spawn_blocking(move || { + sys_mount::unmount(target, unmount_flags) + .map_err(|error| MountError::UnmountFailed { error }) + }); + + match blocking_task.await { + Ok(result) => match result { + Ok(_) => Ok(()), + Err(error) => Err(error), + }, + Err(error) => Err(MountError::FailedWaitForThread { join_error: error }), + } +} diff --git a/control-plane/csi-driver/src/bin/node/mount_utils/mod.rs b/control-plane/csi-driver/src/bin/node/mount_utils/mod.rs new file mode 100644 index 000000000..c03f6c023 --- /dev/null +++ b/control-plane/csi-driver/src/bin/node/mount_utils/mod.rs @@ -0,0 +1,150 @@ +pub(crate) mod bin; + +use tokio::process::Command; +use tonic::Status; +use tracing::debug; + +const CSI_NODE_BINARY: &str = "csi-node"; +const MOUNT: &str = "mount"; +const UNMOUNT: &str = "unmount"; +const SOURCE: &str = "--source"; +const DATA: &str = "--data"; +const FSTYPE: &str = "--fstype"; +const TARGET: &str = "--target"; +const MOUNT_FLAGS: &str = "--mount-flags"; +const UNMOUNT_FLAGS: &str = "--unmount-flags"; + +/// Builder for mounting and unmounting mayastor devices by spawning a process. +#[derive(Debug)] +pub(crate) struct MayastorMount { + binary_name: String, + operation: String, + source: String, + target: String, + data: Option, + fstype: Option, + flags: String, +} + +impl std::fmt::Display for MayastorMount { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "operation: {}, args: source={}, target={}, data={}, fstype={}, flags={}", + self.operation, + self.source, + self.target, + self.data.as_deref().unwrap_or("None"), + self.fstype.as_deref().unwrap_or("None"), + self.flags + ) + } +} + +impl MayastorMount { + /// Get an initialized builder for MayastorMount. + pub(crate) fn builder() -> Self { + Self { + binary_name: CSI_NODE_BINARY.to_string(), + operation: String::new(), + source: String::new(), + target: String::new(), + data: None, + fstype: None, + flags: String::new(), + } + } + + /// Set mount source. + pub(crate) fn source(mut self, path: &str) -> Self { + self.source = path.into(); + self + } + + /// Set mount/umount target. + pub(crate) fn target(mut self, path: &str) -> Self { + self.target = path.into(); + self + } + + /// Options to apply for the file system on mount. + pub(crate) fn data(mut self, data: &str) -> Self { + self.data = Some(data.into()); + self + } + + /// The file system that is to be mounted. + pub(crate) fn fstype(mut self, fstype: &str) -> Self { + self.fstype = Some(fstype.into()); + self + } + + /// Mount flags for the mount syscall. + pub(crate) fn flags(mut self, flags: &str) -> Self { + self.flags = flags.into(); + self + } + + /// Mounts a filesystem/block at `source` to a `target` path in the system. + pub(crate) async fn mount(mut self) -> Result { + self.operation = MOUNT.into(); + let mut command = Command::new(&self.binary_name); + command + .arg(&self.operation) + .arg(SOURCE) + .arg(&self.source) + .arg(TARGET) + .arg(&self.target) + .arg(MOUNT_FLAGS) + .arg(&self.flags); + + if let Some(data) = &self.data { + command.arg(DATA).arg(data); + } + if let Some(fstype) = &self.fstype { + command.arg(FSTYPE).arg(fstype); + } + + debug!("Issuing {}", self); + spawn_and_wait_for_output(command).await.map(|_| self) + } + + /// Unmounts a filesystem/block from a `target` path in the system. + pub(crate) async fn unmount(mut self) -> Result<(), Status> { + self.operation = UNMOUNT.into(); + + let mut command = Command::new(&self.binary_name); + command + .arg(&self.operation) + .arg(TARGET) + .arg(&self.target) + .arg(UNMOUNT_FLAGS) + .arg(&self.flags); + + debug!("Issuing {}", self); + spawn_and_wait_for_output(command).await + } +} + +async fn spawn_and_wait_for_output(mut command: Command) -> Result<(), Status> { + let child = command + .spawn() + .map_err(|_| Status::aborted("Failed to spawn"))?; + + match child.wait_with_output().await { + Ok(output) => { + if output.status.success() { + return Ok(()); + } + + Err(Status::aborted(format!( + "Command failed: {}", + String::from_utf8(output.stderr).unwrap() + ))) + } + Err(error) => Err(Status::aborted(format!( + "Failed to execute command: {}", + error + ))), + } +} diff --git a/control-plane/csi-driver/src/bin/node/node.rs b/control-plane/csi-driver/src/bin/node/node.rs index 0f5b25590..029cc11db 100644 --- a/control-plane/csi-driver/src/bin/node/node.rs +++ b/control-plane/csi-driver/src/bin/node/node.rs @@ -305,7 +305,7 @@ impl node_server::Node for Node { ) })? { AccessType::Mount(mnt) => { - publish_fs_volume(&msg, mnt, &self.filesystems)?; + publish_fs_volume(&msg, mnt, &self.filesystems).await?; } AccessType::Block(_) => { publish_block_volume(&msg).await?; @@ -356,7 +356,7 @@ impl node_server::Node for Node { let target_path = Path::new(&msg.target_path); if target_path.exists() { if target_path.is_dir() { - unpublish_fs_volume(&msg)?; + unpublish_fs_volume(&msg).await?; } else { if target_path.is_file() { return Err(Status::new( @@ -368,7 +368,7 @@ impl node_server::Node for Node { )); } - unpublish_block_volume(&msg)?; + unpublish_block_volume(&msg).await?; } } Ok(Response::new(NodeUnpublishVolumeResponse {})) diff --git a/control-plane/csi-driver/src/bin/node/nodeplugin_grpc.rs b/control-plane/csi-driver/src/bin/node/nodeplugin_grpc.rs index eeb894409..b4dac068a 100644 --- a/control-plane/csi-driver/src/bin/node/nodeplugin_grpc.rs +++ b/control-plane/csi-driver/src/bin/node/nodeplugin_grpc.rs @@ -76,6 +76,7 @@ impl NodePluginGrpcServer { endpoint ); Server::builder() + .timeout(std::time::Duration::from_secs(30)) .add_service(NodePluginServer::new(NodePluginSvc {})) .serve_with_shutdown(endpoint, Shutdown::wait()) .await diff --git a/tests/bdd/features/csi/node/node.feature b/tests/bdd/features/csi/node/node.feature index 1fa97bf91..f59373479 100644 --- a/tests/bdd/features/csi/node/node.feature +++ b/tests/bdd/features/csi/node/node.feature @@ -106,3 +106,11 @@ Feature: CSI node plugin Scenario: publishing a reader only block volume as rw Given a block volume staged as "MULTI_NODE_READER_ONLY" When publishing the block volume as "rw" should fail + + Scenario: unstaging a unavailable target volume + Given a staged volume + And the target for the volume is removed + When the unstage call is made + When a new volume is created and staged + Then the newer stage should succeed + And the unstage should eventually exit with timeout