Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(csi-driver): make operations async #886

Draft
wants to merge 1 commit into
base: develop
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions control-plane/csi-driver/src/bin/node/block_vol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ pub(crate) async fn publish_block_volume(msg: &NodePublishVolumeRequest) -> Resu
}

if let Err(error) =
mount::blockdevice_mount(&device_path, target_path.as_str(), msg.readonly)
mount::blockdevice_mount(&device_path, target_path.as_str(), msg.readonly).await
{
return Err(failure!(
Code::Internal,
Expand All @@ -108,14 +108,14 @@ pub(crate) async fn publish_block_volume(msg: &NodePublishVolumeRequest) -> Resu
}
}

pub(crate) fn unpublish_block_volume(msg: &NodeUnpublishVolumeRequest) -> Result<(), Status> {
pub(crate) async fn unpublish_block_volume(msg: &NodeUnpublishVolumeRequest) -> Result<(), Status> {
let target_path = &msg.target_path;
let volume_id = &msg.volume_id;

// block volumes are mounted on block special file, which is not
// a regular file.
if mount::find_mount(None, Some(target_path)).is_some() {
match mount::blockdevice_unmount(target_path) {
match mount::blockdevice_unmount(target_path).await {
Ok(_) => {}
Err(err) => {
return Err(Status::new(
Expand Down
16 changes: 8 additions & 8 deletions control-plane/csi-driver/src/bin/node/filesystem_ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ pub(crate) trait FileSystemOps: Send + Sync {
/// Get the default mount options along with the user passed options for specific filesystems.
fn mount_flags(&self, mount_flags: Vec<String>) -> Vec<String>;
/// Unmount the filesystem if the filesystem uuid and the provided uuid differ.
fn unmount_on_fs_id_diff(
async fn unmount_on_fs_id_diff(
&self,
device_path: &str,
fs_staging_path: &str,
Expand Down Expand Up @@ -146,7 +146,7 @@ impl FileSystemOps for Ext4Fs {
mount_flags
}

fn unmount_on_fs_id_diff(
async fn unmount_on_fs_id_diff(
&self,
_device_path: &str,
_fs_staging_path: &str,
Expand Down Expand Up @@ -270,13 +270,13 @@ impl FileSystemOps for XFs {
mount_flags
}

fn unmount_on_fs_id_diff(
async fn unmount_on_fs_id_diff(
&self,
device_path: &str,
fs_staging_path: &str,
volume_uuid: &Uuid,
) -> Result<(), Error> {
mount::unmount_on_fs_id_diff(device_path, fs_staging_path, volume_uuid)
mount::unmount_on_fs_id_diff(device_path, fs_staging_path, volume_uuid).await
}

/// Xfs filesystem needs an unmount to clear the log, so that the parameters can be changed.
Expand All @@ -288,12 +288,12 @@ impl FileSystemOps for XFs {
options: &[String],
volume_uuid: &Uuid,
) -> Result<(), Error> {
mount::filesystem_mount(device, staging_path, &FileSystem(Fs::Xfs), options).map_err(|error| {
mount::filesystem_mount(device, staging_path, &FileSystem(Fs::Xfs), options).await.map_err(|error| {
format!(
"(xfs repairing) Failed to mount device {device} onto {staging_path} for {volume_uuid} : {error}",
)
})?;
mount::filesystem_unmount(staging_path).map_err(|error| {
mount::filesystem_unmount(staging_path).await.map_err(|error| {
format!(
"(xfs repairing) Failed to unmount device {device} from {staging_path} for {volume_uuid} : {error}",
)
Expand Down Expand Up @@ -358,13 +358,13 @@ impl FileSystemOps for BtrFs {
mount_flags
}

fn unmount_on_fs_id_diff(
async fn unmount_on_fs_id_diff(
&self,
device_path: &str,
fs_staging_path: &str,
volume_uuid: &Uuid,
) -> Result<(), Error> {
mount::unmount_on_fs_id_diff(device_path, fs_staging_path, volume_uuid)
mount::unmount_on_fs_id_diff(device_path, fs_staging_path, volume_uuid).await
}

/// `btrfs check --readonly` is a not a `DANGEROUS OPTION` as it only exists to calm potential
Expand Down
26 changes: 14 additions & 12 deletions control-plane/csi-driver/src/bin/node/filesystem_vol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ pub(crate) async fn stage_fs_volume(
// If clone's fs id change was requested and we were not able to change it in first attempt
// unmount and continue the stage again.
let continue_stage = if fs_id.is_some() {
continue_after_unmount_on_fs_id_diff(fstype ,device_path, fs_staging_path, &volume_uuid)
continue_after_unmount_on_fs_id_diff(fstype ,device_path, fs_staging_path, &volume_uuid).await
.map_err(|error| {
failure!(
Code::FailedPrecondition,
Expand All @@ -107,7 +107,7 @@ pub(crate) async fn stage_fs_volume(
if !continue_stage {
// todo: validate other flags?
if mnt.mount_flags.readonly() != existing.options.readonly() {
mount::remount(fs_staging_path, mnt.mount_flags.readonly())?;
mount::remount(fs_staging_path, mnt.mount_flags.readonly()).await?;
}

return Ok(());
Expand Down Expand Up @@ -161,7 +161,8 @@ pub(crate) async fn stage_fs_volume(

debug!("Mounting device {} onto {}", device_path, fs_staging_path);

if let Err(error) = mount::filesystem_mount(device_path, fs_staging_path, fstype, &mount_flags)
if let Err(error) =
mount::filesystem_mount(device_path, fs_staging_path, fstype, &mount_flags).await
{
return Err(failure!(
Code::Internal,
Expand Down Expand Up @@ -209,7 +210,7 @@ pub(crate) async fn unstage_fs_volume(msg: &NodeUnstageVolumeRequest) -> Result<
));
}

if let Err(error) = mount::filesystem_unmount(fs_staging_path) {
if let Err(error) = mount::filesystem_unmount(fs_staging_path).await {
return Err(failure!(
Code::Internal,
"Failed to unstage volume {}: failed to unmount device {:?} from {}: {}",
Expand All @@ -227,7 +228,7 @@ pub(crate) async fn unstage_fs_volume(msg: &NodeUnstageVolumeRequest) -> Result<
}

/// Publish a filesystem volume
pub(crate) fn publish_fs_volume(
pub(crate) async fn publish_fs_volume(
msg: &NodePublishVolumeRequest,
mnt: &MountVolume,
filesystems: &[FileSystem],
Expand Down Expand Up @@ -328,7 +329,7 @@ pub(crate) fn publish_fs_volume(

debug!("Mounting {} to {}", fs_staging_path, target_path);

if let Err(error) = mount::bind_mount(fs_staging_path, target_path, false) {
if let Err(error) = mount::bind_mount(fs_staging_path, target_path, false).await {
return Err(failure!(
Code::Internal,
"Failed to publish volume {}: failed to mount {} to {}: {}",
Expand All @@ -345,7 +346,7 @@ pub(crate) fn publish_fs_volume(

debug!("Remounting {} as readonly", target_path);

if let Err(error) = mount::bind_remount(target_path, &options) {
if let Err(error) = mount::bind_remount(target_path, &options).await {
let message = format!(
"Failed to publish volume {volume_id}: failed to mount {fs_staging_path} to {target_path} as readonly: {error}"
);
Expand All @@ -354,7 +355,7 @@ pub(crate) fn publish_fs_volume(

debug!("Unmounting {}", target_path);

if let Err(error) = mount::bind_unmount(target_path) {
if let Err(error) = mount::bind_unmount(target_path).await {
error!("Failed to unmount {}: {}", target_path, error);
}

Expand All @@ -367,7 +368,7 @@ pub(crate) fn publish_fs_volume(
Ok(())
}

pub(crate) fn unpublish_fs_volume(msg: &NodeUnpublishVolumeRequest) -> Result<(), Status> {
pub(crate) async fn unpublish_fs_volume(msg: &NodeUnpublishVolumeRequest) -> Result<(), Status> {
// filesystem mount
let target_path = &msg.target_path;
let volume_id = &msg.volume_id;
Expand Down Expand Up @@ -398,7 +399,7 @@ pub(crate) fn unpublish_fs_volume(msg: &NodeUnpublishVolumeRequest) -> Result<()

debug!("Unmounting {}", target_path);

if let Err(error) = mount::bind_unmount(target_path) {
if let Err(error) = mount::bind_unmount(target_path).await {
return Err(failure!(
Code::Internal,
"Failed to unpublish volume {}: failed to unmount {}: {}",
Expand Down Expand Up @@ -427,14 +428,15 @@ pub(crate) fn unpublish_fs_volume(msg: &NodeUnpublishVolumeRequest) -> Result<()

/// Check if we can continue the staging incase the change fs id failed mid way and we want to retry
/// the flow.
fn continue_after_unmount_on_fs_id_diff(
async fn continue_after_unmount_on_fs_id_diff(
fstype: &FileSystem,
device_path: &str,
fs_staging_path: &str,
volume_uuid: &Uuid,
) -> Result<bool, String> {
fstype
.fs_ops()?
.unmount_on_fs_id_diff(device_path, fs_staging_path, volume_uuid)?;
.unmount_on_fs_id_diff(device_path, fs_staging_path, volume_uuid)
.await?;
Ok(fstype == &Fs::Xfs.into())
}
1 change: 1 addition & 0 deletions control-plane/csi-driver/src/bin/node/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ mod nodeplugin_nvme;
#[cfg(target_os = "linux")]
mod nodeplugin_svc;
mod registration;
mod runtime;
/// Shutdown event which lets the plugin know it needs to stop processing new events and
/// complete any existing ones before shutting down.
#[cfg(target_os = "linux")]
Expand Down
1 change: 1 addition & 0 deletions control-plane/csi-driver/src/bin/node/main_.rs
Original file line number Diff line number Diff line change
Expand Up @@ -451,6 +451,7 @@ impl CsiServer {
let node = Node::new(node_name.into(), node_selector, probe_filesystems());
Ok(async move {
Server::builder()
.timeout(std::time::Duration::from_secs(10))
.add_service(NodeServer::new(node))
.add_service(IdentityServer::new(Identity {}))
.add_service(NvmeOperationsServer::new(NvmeOperationsSvc {}))
Expand Down
Loading