Skip to content

Commit

Permalink
feat(csi node): add mount utils that frontends and backends the sys m…
Browse files Browse the repository at this point in the history
…ount calls by spawning process

Signed-off-by: Abhinandan Purkait <purkaitabhinandan@gmail.com>
  • Loading branch information
Abhinandan-Purkait committed Dec 3, 2024
1 parent 00190e9 commit 59331c1
Show file tree
Hide file tree
Showing 17 changed files with 669 additions and 253 deletions.
3 changes: 2 additions & 1 deletion control-plane/agents/src/bin/core/tests/controller/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use deployer_cluster::{etcd_client::Client, *};
use stor_port::transport_api::TimeoutOptions;
use stor_port::{
pstor::{etcd::Etcd, key_prefix_obj, ApiVersion, StorableObjectType, StoreKv, StoreObj},
types::v0::{
Expand Down Expand Up @@ -290,7 +291,7 @@ async fn etcd_pagination() {

cluster.restart_core().await;
cluster
.volume_service_liveness(None)
.volume_service_liveness(Some(TimeoutOptions::new().with_max_retries(10)))
.await
.expect("Should have restarted by now");

Expand Down
75 changes: 5 additions & 70 deletions control-plane/csi-driver/src/bin/controller/server.rs
Original file line number Diff line number Diff line change
@@ -1,72 +1,11 @@
use crate::{controller::CsiControllerSvc, identity::CsiIdentitySvc};
use rpc::csi::{controller_server::ControllerServer, identity_server::IdentityServer};

use futures::TryFutureExt;
use std::{
fs,
io::ErrorKind,
ops::Add,
pin::Pin,
sync::Arc,
task::{Context, Poll},
};
use tokio::{
io::{AsyncRead, AsyncWrite, ReadBuf},
net::UnixListener,
};
use tonic::transport::{server::Connected, Server};
use std::{fs, io::ErrorKind, ops::Add};
use tokio::net::UnixListener;
use tonic::{codegen::tokio_stream::wrappers::UnixListenerStream, transport::Server};
use tracing::{debug, error, info};

/// Newtype around [`tokio::net::UnixStream`], so that tonic's `Connected` trait and tokio's
/// `AsyncRead`/`AsyncWrite` traits can be implemented for it in this file.
#[derive(Debug)]
struct UnixStream(tokio::net::UnixStream);

impl Connected for UnixStream {
    type ConnectInfo = UdsConnectInfo;

    /// Collects the peer address and peer credentials of the wrapped socket.
    ///
    /// A failed lookup is not an error here: the corresponding field is simply left as `None`.
    fn connect_info(&self) -> Self::ConnectInfo {
        let socket = &self.0;
        let peer_addr = socket.peer_addr().ok().map(Arc::new);
        let peer_cred = socket.peer_cred().ok();
        UdsConnectInfo {
            peer_addr,
            peer_cred,
        }
    }
}

/// Connection details of an accepted Unix domain socket stream; this is the `ConnectInfo`
/// type produced by `UnixStream::connect_info`.
// Not sure why we need the inner fields, probably worth checking if we can remove them.
// NOTE(review): the `#[allow(unused)]` below presumably exists because nothing reads the
// fields - confirm before removing either.
#[derive(Clone, Debug)]
#[allow(unused)]
struct UdsConnectInfo {
/// Address of the peer socket, or `None` when `peer_addr()` on the stream failed.
peer_addr: Option<Arc<tokio::net::unix::SocketAddr>>,
/// Credentials of the peer, or `None` when `peer_cred()` on the stream failed.
peer_cred: Option<tokio::net::unix::UCred>,
}

impl AsyncRead for UnixStream {
    /// Delegates the read to the wrapped tokio stream.
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<std::io::Result<()>> {
        // The wrapper is `Unpin`, so the pin can be projected onto the inner stream safely.
        let inner = &mut self.get_mut().0;
        Pin::new(inner).poll_read(cx, buf)
    }
}

impl AsyncWrite for UnixStream {
    /// Delegates the write to the wrapped tokio stream.
    fn poll_write(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<std::io::Result<usize>> {
        // The wrapper is `Unpin`, so the pin can be projected onto the inner stream safely.
        let inner = &mut self.get_mut().0;
        Pin::new(inner).poll_write(cx, buf)
    }

    /// Delegates the flush to the wrapped tokio stream.
    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
        let inner = &mut self.get_mut().0;
        Pin::new(inner).poll_flush(cx)
    }

    /// Delegates the shutdown to the wrapped tokio stream.
    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
        let inner = &mut self.get_mut().0;
        Pin::new(inner).poll_shutdown(cx)
    }
}

pub(super) struct CsiServer {}
impl CsiServer {
/// Runs the CSI Server identity and controller services.
Expand All @@ -91,6 +30,7 @@ impl CsiServer {

let incoming = {
let uds = UnixListener::bind(csi_socket)?;
info!("CSI plugin bound to {}", csi_socket);

// Change permissions on CSI socket to allow non-privileged clients to access it
// to simplify testing.
Expand All @@ -103,12 +43,7 @@ impl CsiServer {
debug!("Successfully changed file permissions for CSI socket");
}

async_stream::stream! {
loop {
let item = uds.accept().map_ok(|(st, _)| UnixStream(st)).await;
yield item;
}
}
UnixListenerStream::new(uds)
};

let cfg = crate::CsiControllerConfig::get_config();
Expand Down
6 changes: 3 additions & 3 deletions control-plane/csi-driver/src/bin/node/block_vol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ pub(crate) async fn publish_block_volume(msg: &NodePublishVolumeRequest) -> Resu
}

if let Err(error) =
mount::blockdevice_mount(&device_path, target_path.as_str(), msg.readonly)
mount::blockdevice_mount(&device_path, target_path.as_str(), msg.readonly).await
{
return Err(failure!(
Code::Internal,
Expand All @@ -108,14 +108,14 @@ pub(crate) async fn publish_block_volume(msg: &NodePublishVolumeRequest) -> Resu
}
}

pub(crate) fn unpublish_block_volume(msg: &NodeUnpublishVolumeRequest) -> Result<(), Status> {
pub(crate) async fn unpublish_block_volume(msg: &NodeUnpublishVolumeRequest) -> Result<(), Status> {
let target_path = &msg.target_path;
let volume_id = &msg.volume_id;

// block volumes are mounted on block special file, which is not
// a regular file.
if mount::find_mount(None, Some(target_path)).is_some() {
match mount::blockdevice_unmount(target_path) {
match mount::blockdevice_unmount(target_path).await {
Ok(_) => {}
Err(err) => {
return Err(Status::new(
Expand Down
35 changes: 22 additions & 13 deletions control-plane/csi-driver/src/bin/node/dev/nvmf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use csi_driver::PublishParams;
use glob::glob;
use nvmeadm::nvmf_subsystem::Subsystem;
use regex::Regex;
use tokio::task::spawn_blocking;
use tracing::warn;
use udev::{Device, Enumerator};
use url::Url;
Expand Down Expand Up @@ -192,12 +193,17 @@ impl Attach for NvmfAttach {
.hostnqn(self.hostnqn.clone())
.keep_alive_tmo(self.keep_alive_tmo)
.build()?;
match ca.connect() {
// Should we remove this arm?
Err(NvmeError::ConnectInProgress) => Ok(()),
Err(err) => Err(err.into()),
Ok(_) => Ok(()),
}

spawn_blocking(move || {
match ca.connect() {
// Should we remove this arm?
Err(NvmeError::ConnectInProgress) => Ok(()),
Err(err) => Err(err.into()),
Ok(_) => Ok(()),
}
})
.await
.map_err(|error| DeviceError::new(&error.to_string()))?
}
Err(err) => Err(err.into()),
}
Expand Down Expand Up @@ -283,14 +289,17 @@ impl NvmfDetach {
#[tonic::async_trait]
impl Detach for NvmfDetach {
async fn detach(&self) -> Result<(), DeviceError> {
if disconnect(&self.nqn)? == 0 {
return Err(DeviceError::from(format!(
let nqn = self.nqn.clone();
spawn_blocking(move || match disconnect(&nqn) {
Ok(0) => Err(DeviceError::from(format!(
"nvmf disconnect {} failed: no device found",
self.nqn
)));
}

Ok(())
nqn
))),
Err(error) => Err(error.into()),
Ok(_) => Ok(()),
})
.await
.map_err(|error| DeviceError::from(error.to_string()))?
}

fn devname(&self) -> DeviceName {
Expand Down
60 changes: 57 additions & 3 deletions control-plane/csi-driver/src/bin/node/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use nix::errno::Errno;
use nvmeadm::{error::NvmeError, nvmf_discovery};
use snafu::Snafu;
use std::{process::ExitCode, string::FromUtf8Error};
use tokio::task::JoinError;

/// A Device Attach/Detach error.
/// todo: should this be an enum?
Expand Down Expand Up @@ -171,8 +172,6 @@ pub(crate) enum FsfreezeError {
},
#[snafu(display("Not a filesystem mount: volume ID: {volume_id}"))]
BlockDeviceMount { volume_id: String },
#[snafu(display("Not a valid fsfreeze command"))]
InvalidFreezeCommand,
}

impl From<FsfreezeError> for ExitCode {
Expand All @@ -184,7 +183,62 @@ impl From<FsfreezeError> for ExitCode {
FsfreezeError::FsfreezeFailed { errno, .. } => ExitCode::from(errno as u8),
FsfreezeError::InternalFailure { .. } => ExitCode::from(Errno::ELIBACC as u8),
FsfreezeError::BlockDeviceMount { .. } => ExitCode::from(Errno::EMEDIUMTYPE as u8),
FsfreezeError::InvalidFreezeCommand => ExitCode::from(Errno::EINVAL as u8),
}
}
}

/// Error for the mount operations.
///
/// Every variant embeds the underlying failure in its `Display` message.
// NOTE(review): `context(suffix(false))` presumably drops the `Snafu` suffix from the generated
// context selectors, so they share the variant names - confirm against the snafu docs.
#[derive(Debug, Snafu)]
#[snafu(visibility(pub(crate)), context(suffix(false)))]
pub(crate) enum MountError {
/// A mount attempt failed with the given I/O error.
#[snafu(display("Failed mount, error: {error}"))]
MountFailed { error: std::io::Error },
/// An unmount attempt failed with the given I/O error.
#[snafu(display("Failed unmount, error: {error}"))]
UnmountFailed { error: std::io::Error },
/// Waiting for a spawned task to terminate failed with the given `JoinError`.
#[snafu(display("Failed to wait for thread termination, error: {join_error}"))]
FailedWaitForThread { join_error: JoinError },
}

impl From<MountError> for ExitCode {
    /// Translates a mount error into the errno-valued exit code reported by the process.
    ///
    /// Mount and unmount failures both report `ELIBACC`; a failed wait reports `ESRCH`.
    fn from(value: MountError) -> Self {
        let errno = match value {
            MountError::MountFailed { .. } | MountError::UnmountFailed { .. } => Errno::ELIBACC,
            MountError::FailedWaitForThread { .. } => Errno::ESRCH,
        };
        ExitCode::from(errno as u8)
    }
}

/// Error for the csi driver operations.
///
/// Groups the fsfreeze and mount errors, plus the invalid-command case, under one type.
#[derive(Debug, Snafu)]
#[snafu(visibility(pub(crate)), context(suffix(false)))]
pub(crate) enum CsiDriverError {
/// An fsfreeze failure; displayed using the wrapped error's own message.
#[snafu(display("{source}"))]
Fsfreeze { source: FsfreezeError },
/// A mount/unmount failure; displayed using the wrapped error's own message.
#[snafu(display("{source}"))]
Mount { source: MountError },
/// The requested command is not a valid csi driver command.
#[snafu(display("Not a valid mayastor csi driver command"))]
InvalidCsiDriverCommand,
}

impl From<FsfreezeError> for CsiDriverError {
fn from(value: FsfreezeError) -> Self {
Self::Fsfreeze { source: value }
}
}

impl From<MountError> for CsiDriverError {
fn from(value: MountError) -> Self {
Self::Mount { source: value }
}
}

impl From<CsiDriverError> for ExitCode {
fn from(value: CsiDriverError) -> Self {
match value {
CsiDriverError::Fsfreeze { source } => source.into(),
CsiDriverError::Mount { source } => source.into(),
CsiDriverError::InvalidCsiDriverCommand => ExitCode::from(Errno::EINVAL as u8),
}
}
}
18 changes: 10 additions & 8 deletions control-plane/csi-driver/src/bin/node/filesystem_ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@

use crate::{findmnt::get_devicepath, mount};
use csi_driver::filesystem::FileSystem as Fs;
use devinfo::{blkid::probe::Probe, mountinfo::MountInfo, DevInfoError};

use anyhow::anyhow;
use devinfo::{blkid::probe::Probe, mountinfo::MountInfo, DevInfoError};
use std::{process::Output, str, str::FromStr};
use tokio::process::Command;
use tonic::async_trait;
Expand Down Expand Up @@ -95,7 +95,7 @@ pub(crate) trait FileSystemOps: Send + Sync {
/// Get the default mount options along with the user passed options for specific filesystems.
fn mount_flags(&self, mount_flags: Vec<String>) -> Vec<String>;
/// Unmount the filesystem if the filesystem uuid and the provided uuid differ.
fn unmount_on_fs_id_diff(
async fn unmount_on_fs_id_diff(
&self,
device_path: &str,
fs_staging_path: &str,
Expand Down Expand Up @@ -146,7 +146,7 @@ impl FileSystemOps for Ext4Fs {
mount_flags
}

fn unmount_on_fs_id_diff(
async fn unmount_on_fs_id_diff(
&self,
_device_path: &str,
_fs_staging_path: &str,
Expand Down Expand Up @@ -270,13 +270,15 @@ impl FileSystemOps for XFs {
mount_flags
}

fn unmount_on_fs_id_diff(
async fn unmount_on_fs_id_diff(
&self,
device_path: &str,
fs_staging_path: &str,
volume_uuid: &Uuid,
) -> Result<(), Error> {
mount::unmount_on_fs_id_diff(device_path, fs_staging_path, volume_uuid)
.await
.map_err(|error| error.to_string())
}

/// Xfs filesystem needs an unmount to clear the log, so that the parameters can be changed.
Expand All @@ -288,12 +290,12 @@ impl FileSystemOps for XFs {
options: &[String],
volume_uuid: &Uuid,
) -> Result<(), Error> {
mount::filesystem_mount(device, staging_path, &FileSystem(Fs::Xfs), options).map_err(|error| {
mount::filesystem_mount(device, staging_path, &FileSystem(Fs::Xfs), options).await.map_err(|error| {
format!(
"(xfs repairing) Failed to mount device {device} onto {staging_path} for {volume_uuid} : {error}",
)
})?;
mount::filesystem_unmount(staging_path).map_err(|error| {
mount::filesystem_unmount(staging_path).await.map_err(|error| {
format!(
"(xfs repairing) Failed to unmount device {device} from {staging_path} for {volume_uuid} : {error}",
)
Expand Down Expand Up @@ -358,13 +360,13 @@ impl FileSystemOps for BtrFs {
mount_flags
}

fn unmount_on_fs_id_diff(
async fn unmount_on_fs_id_diff(
&self,
device_path: &str,
fs_staging_path: &str,
volume_uuid: &Uuid,
) -> Result<(), Error> {
mount::unmount_on_fs_id_diff(device_path, fs_staging_path, volume_uuid)
mount::unmount_on_fs_id_diff(device_path, fs_staging_path, volume_uuid).await
}

/// `btrfs check --readonly` is not a `DANGEROUS OPTION` as it only exists to calm potential
Expand Down
Loading

0 comments on commit 59331c1

Please sign in to comment.