Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(csi driver): make mount and unmounts non-blocking #888

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 5 additions & 70 deletions control-plane/csi-driver/src/bin/controller/server.rs
Original file line number Diff line number Diff line change
@@ -1,72 +1,11 @@
use crate::{controller::CsiControllerSvc, identity::CsiIdentitySvc};
use rpc::csi::{controller_server::ControllerServer, identity_server::IdentityServer};

use futures::TryFutureExt;
use std::{
fs,
io::ErrorKind,
ops::Add,
pin::Pin,
sync::Arc,
task::{Context, Poll},
};
use tokio::{
io::{AsyncRead, AsyncWrite, ReadBuf},
net::UnixListener,
};
use tonic::transport::{server::Connected, Server};
use std::{fs, io::ErrorKind, ops::Add};
use tokio::net::UnixListener;
use tonic::{codegen::tokio_stream::wrappers::UnixListenerStream, transport::Server};
use tracing::{debug, error, info};

#[derive(Debug)]
struct UnixStream(tokio::net::UnixStream);

impl Connected for UnixStream {
type ConnectInfo = UdsConnectInfo;

fn connect_info(&self) -> Self::ConnectInfo {
UdsConnectInfo {
peer_addr: self.0.peer_addr().ok().map(Arc::new),
peer_cred: self.0.peer_cred().ok(),
}
}
}

// Not sure why we need the inner fields, probably worth checking if we can remove them.
#[derive(Clone, Debug)]
#[allow(unused)]
struct UdsConnectInfo {
peer_addr: Option<Arc<tokio::net::unix::SocketAddr>>,
peer_cred: Option<tokio::net::unix::UCred>,
}

impl AsyncRead for UnixStream {
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<std::io::Result<()>> {
Pin::new(&mut self.0).poll_read(cx, buf)
}
}

impl AsyncWrite for UnixStream {
fn poll_write(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<std::io::Result<usize>> {
Pin::new(&mut self.0).poll_write(cx, buf)
}

fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
Pin::new(&mut self.0).poll_flush(cx)
}

fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
Pin::new(&mut self.0).poll_shutdown(cx)
}
}

pub(super) struct CsiServer {}
impl CsiServer {
/// Runs the CSI Server identity and controller services.
Expand All @@ -91,6 +30,7 @@ impl CsiServer {

let incoming = {
let uds = UnixListener::bind(csi_socket)?;
info!("CSI plugin bound to {}", csi_socket);

// Change permissions on CSI socket to allow non-privileged clients to access it
// to simplify testing.
Expand All @@ -103,12 +43,7 @@ impl CsiServer {
debug!("Successfully changed file permissions for CSI socket");
}

async_stream::stream! {
loop {
let item = uds.accept().map_ok(|(st, _)| UnixStream(st)).await;
yield item;
}
}
UnixListenerStream::new(uds)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

};

let cfg = crate::CsiControllerConfig::get_config();
Expand Down
6 changes: 3 additions & 3 deletions control-plane/csi-driver/src/bin/node/block_vol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ pub(crate) async fn publish_block_volume(msg: &NodePublishVolumeRequest) -> Resu
}

if let Err(error) =
mount::blockdevice_mount(&device_path, target_path.as_str(), msg.readonly)
mount::blockdevice_mount(&device_path, target_path.as_str(), msg.readonly).await
{
return Err(failure!(
Code::Internal,
Expand All @@ -108,14 +108,14 @@ pub(crate) async fn publish_block_volume(msg: &NodePublishVolumeRequest) -> Resu
}
}

pub(crate) fn unpublish_block_volume(msg: &NodeUnpublishVolumeRequest) -> Result<(), Status> {
pub(crate) async fn unpublish_block_volume(msg: &NodeUnpublishVolumeRequest) -> Result<(), Status> {
let target_path = &msg.target_path;
let volume_id = &msg.volume_id;

// block volumes are mounted on block special file, which is not
// a regular file.
if mount::find_mount(None, Some(target_path)).is_some() {
match mount::blockdevice_unmount(target_path) {
match mount::blockdevice_unmount(target_path).await {
Ok(_) => {}
Err(err) => {
return Err(Status::new(
Expand Down
35 changes: 22 additions & 13 deletions control-plane/csi-driver/src/bin/node/dev/nvmf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use csi_driver::PublishParams;
use glob::glob;
use nvmeadm::nvmf_subsystem::Subsystem;
use regex::Regex;
use tokio::task::spawn_blocking;
use tracing::warn;
use udev::{Device, Enumerator};
use url::Url;
Expand Down Expand Up @@ -206,12 +207,17 @@ impl Attach for NvmfAttach {
.hostnqn(self.hostnqn.clone())
.keep_alive_tmo(self.keep_alive_tmo)
.build()?;
match ca.connect() {
// Should we remove this arm?
Err(NvmeError::ConnectInProgress) => Ok(()),
Err(err) => Err(err.into()),
Ok(_) => Ok(()),
}

spawn_blocking(move || {
match ca.connect() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now that this is not blocking, what happens if the future is dropped but the connect/disconnect ioctls are still in progress.
Maybe we should issue these by non-cancellable futures (worker-threads)? Otherwise each retry will stack them up (probably can be another PR)

Copy link
Contributor

@dsharma-dc dsharma-dc Dec 4, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As I understood, tasks spawned by spawn_blocking are already non-cancellable if they have started. We can choose to wait on JoinHandle as we are doing here.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens if the join handle is dropped?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As I understood, tasks spawned by spawn_blocking are already non-cancellable if they have started. We can choose to wait on JoinHandle as we are doing here.

The ca.connect() will be non-cancellable because it's blocking and stuck in ioctl anyway but the code which is calling this is cancellable and will potentially re-run again, and schedule another cannot, or potentially a disconnect.

Ah, we could ref-count the volume guard in the sawn_blocking?

// Should we remove this arm?
Err(NvmeError::ConnectInProgress) => Ok(()),
Err(err) => Err(err.into()),
Ok(_) => Ok(()),
}
})
.await
.map_err(|error| DeviceError::new(&error.to_string()))?
}
Err(err) => Err(err.into()),
}
Expand Down Expand Up @@ -295,14 +301,17 @@ impl NvmfDetach {
#[tonic::async_trait]
impl Detach for NvmfDetach {
async fn detach(&self) -> Result<(), DeviceError> {
if disconnect(&self.nqn)? == 0 {
return Err(DeviceError::from(format!(
let nqn = self.nqn.clone();
spawn_blocking(move || match disconnect(&nqn) {
Ok(0) => Err(DeviceError::from(format!(
"nvmf disconnect {} failed: no device found",
self.nqn
)));
}

Ok(())
nqn
))),
Err(error) => Err(error.into()),
Ok(_) => Ok(()),
})
.await
.map_err(|error| DeviceError::from(error.to_string()))?
}

fn devname(&self) -> DeviceName {
Expand Down
60 changes: 57 additions & 3 deletions control-plane/csi-driver/src/bin/node/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use nix::errno::Errno;
use nvmeadm::{error::NvmeError, nvmf_discovery};
use snafu::Snafu;
use std::{process::ExitCode, string::FromUtf8Error};
use tokio::task::JoinError;

/// A Device Attach/Detach error.
/// todo: should this be an enum?
Expand Down Expand Up @@ -171,8 +172,6 @@ pub(crate) enum FsfreezeError {
},
#[snafu(display("Not a filesystem mount: volume ID: {volume_id}"))]
BlockDeviceMount { volume_id: String },
#[snafu(display("Not a valid fsfreeze command"))]
InvalidFreezeCommand,
}

impl From<FsfreezeError> for ExitCode {
Expand All @@ -184,7 +183,62 @@ impl From<FsfreezeError> for ExitCode {
FsfreezeError::FsfreezeFailed { errno, .. } => ExitCode::from(errno as u8),
FsfreezeError::InternalFailure { .. } => ExitCode::from(Errno::ELIBACC as u8),
FsfreezeError::BlockDeviceMount { .. } => ExitCode::from(Errno::EMEDIUMTYPE as u8),
FsfreezeError::InvalidFreezeCommand => ExitCode::from(Errno::EINVAL as u8),
}
}
}

/// Error for the mount operations.
#[derive(Debug, Snafu)]
#[snafu(visibility(pub(crate)), context(suffix(false)))]
pub(crate) enum MountError {
#[snafu(display("Failed mount, error: {error}"))]
MountFailed { error: std::io::Error },
#[snafu(display("Failed unmount, error: {error}"))]
UnmountFailed { error: std::io::Error },
#[snafu(display("Failed to wait for thread termination, error: {join_error}"))]
FailedWaitForThread { join_error: JoinError },
}

impl From<MountError> for ExitCode {
fn from(value: MountError) -> Self {
match value {
MountError::MountFailed { .. } => ExitCode::from(Errno::ELIBACC as u8),
MountError::UnmountFailed { .. } => ExitCode::from(Errno::ELIBACC as u8),
MountError::FailedWaitForThread { .. } => ExitCode::from(Errno::ESRCH as u8),
}
}
}

/// Error for the csi driver operations.
#[derive(Debug, Snafu)]
#[snafu(visibility(pub(crate)), context(suffix(false)))]
pub(crate) enum CsiDriverError {
#[snafu(display("{source}"))]
Fsfreeze { source: FsfreezeError },
#[snafu(display("{source}"))]
Mount { source: MountError },
#[snafu(display("Not a valid mayastor csi driver command"))]
InvalidCsiDriverCommand,
}

impl From<FsfreezeError> for CsiDriverError {
fn from(value: FsfreezeError) -> Self {
Self::Fsfreeze { source: value }
}
}

impl From<MountError> for CsiDriverError {
fn from(value: MountError) -> Self {
Self::Mount { source: value }
}
}

impl From<CsiDriverError> for ExitCode {
fn from(value: CsiDriverError) -> Self {
match value {
CsiDriverError::Fsfreeze { source } => source.into(),
CsiDriverError::Mount { source } => source.into(),
CsiDriverError::InvalidCsiDriverCommand => ExitCode::from(Errno::EINVAL as u8),
}
}
}
18 changes: 10 additions & 8 deletions control-plane/csi-driver/src/bin/node/filesystem_ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@

use crate::{findmnt::get_devicepath, mount};
use csi_driver::filesystem::FileSystem as Fs;
use devinfo::{blkid::probe::Probe, mountinfo::MountInfo, DevInfoError};

use anyhow::anyhow;
use devinfo::{blkid::probe::Probe, mountinfo::MountInfo, DevInfoError};
use std::{process::Output, str, str::FromStr};
use tokio::process::Command;
use tonic::async_trait;
Expand Down Expand Up @@ -95,7 +95,7 @@ pub(crate) trait FileSystemOps: Send + Sync {
/// Get the default mount options along with the user passed options for specific filesystems.
fn mount_flags(&self, mount_flags: Vec<String>) -> Vec<String>;
/// Unmount the filesystem if the filesystem uuid and the provided uuid differ.
fn unmount_on_fs_id_diff(
async fn unmount_on_fs_id_diff(
&self,
device_path: &str,
fs_staging_path: &str,
Expand Down Expand Up @@ -146,7 +146,7 @@ impl FileSystemOps for Ext4Fs {
mount_flags
}

fn unmount_on_fs_id_diff(
async fn unmount_on_fs_id_diff(
&self,
_device_path: &str,
_fs_staging_path: &str,
Expand Down Expand Up @@ -270,13 +270,15 @@ impl FileSystemOps for XFs {
mount_flags
}

fn unmount_on_fs_id_diff(
async fn unmount_on_fs_id_diff(
&self,
device_path: &str,
fs_staging_path: &str,
volume_uuid: &Uuid,
) -> Result<(), Error> {
mount::unmount_on_fs_id_diff(device_path, fs_staging_path, volume_uuid)
.await
.map_err(|error| error.to_string())
}

/// Xfs filesystem needs an unmount to clear the log, so that the parameters can be changed.
Expand All @@ -288,12 +290,12 @@ impl FileSystemOps for XFs {
options: &[String],
volume_uuid: &Uuid,
) -> Result<(), Error> {
mount::filesystem_mount(device, staging_path, &FileSystem(Fs::Xfs), options).map_err(|error| {
mount::filesystem_mount(device, staging_path, &FileSystem(Fs::Xfs), options).await.map_err(|error| {
format!(
"(xfs repairing) Failed to mount device {device} onto {staging_path} for {volume_uuid} : {error}",
)
})?;
mount::filesystem_unmount(staging_path).map_err(|error| {
mount::filesystem_unmount(staging_path).await.map_err(|error| {
format!(
"(xfs repairing) Failed to unmount device {device} from {staging_path} for {volume_uuid} : {error}",
)
Expand Down Expand Up @@ -358,13 +360,13 @@ impl FileSystemOps for BtrFs {
mount_flags
}

fn unmount_on_fs_id_diff(
async fn unmount_on_fs_id_diff(
&self,
device_path: &str,
fs_staging_path: &str,
volume_uuid: &Uuid,
) -> Result<(), Error> {
mount::unmount_on_fs_id_diff(device_path, fs_staging_path, volume_uuid)
mount::unmount_on_fs_id_diff(device_path, fs_staging_path, volume_uuid).await
}

/// `btrfs check --readonly` is a not a `DANGEROUS OPTION` as it only exists to calm potential
Expand Down
Loading
Loading