Skip to content

Commit

Permalink
Try #1775:
Browse files Browse the repository at this point in the history
  • Loading branch information
mayastor-bors committed Nov 26, 2024
2 parents 40572c9 + a5dd255 commit 05f585a
Show file tree
Hide file tree
Showing 18 changed files with 321 additions and 79 deletions.
2 changes: 1 addition & 1 deletion .gitmodules
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[submodule "spdk-rs"]
path = spdk-rs
url = https://github.com/openebs/spdk-rs
url = ../spdk-rs.git
branch = release/2.7
[submodule "utils/dependencies"]
path = utils/dependencies
Expand Down
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

22 changes: 22 additions & 0 deletions io-engine-tests/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,28 @@ pub fn truncate_file_bytes(path: &str, size: u64) {
assert!(output.status.success());
}

/// Attaches the file at `path` to the first free loop device and returns the
/// device name that `losetup --show` printed (whitespace-trimmed).
///
/// `sector_size` is passed to `losetup -b` as the logical sector size;
/// `None` falls back to 512.
///
/// # Panics
/// Panics if `losetup` cannot be executed, exits unsuccessfully, or prints
/// output that is not valid UTF-8.
pub fn setup_loopdev_file(path: &str, sector_size: Option<u64>) -> String {
    let log_sec = sector_size.unwrap_or(512).to_string();

    let output = Command::new("losetup")
        .args(["-f", "--show", "-b", log_sec.as_str(), path])
        .output()
        .expect("failed exec losetup");
    // Surface losetup's own diagnostics; a bare assert makes failures opaque.
    assert!(
        output.status.success(),
        "losetup failed to attach '{}': {}",
        path,
        String::from_utf8_lossy(&output.stderr)
    );
    // return the assigned loop device
    String::from_utf8(output.stdout)
        .expect("losetup printed non UTF-8 output")
        .trim()
        .to_string()
}

/// Detaches the loop device `dev` by running `losetup -d`.
///
/// # Panics
/// Panics if `losetup` cannot be executed or exits unsuccessfully.
pub fn detach_loopdev(dev: &str) {
    let output = Command::new("losetup")
        .args(["-d", dev])
        .output()
        .expect("failed exec losetup");
    // Report which device failed and why, instead of a bare assertion.
    assert!(
        output.status.success(),
        "losetup failed to detach '{}': {}",
        dev,
        String::from_utf8_lossy(&output.stderr)
    );
}

pub fn fscheck(device: &str) {
let output = Command::new("fsck")
.args([device, "-n"])
Expand Down
5 changes: 3 additions & 2 deletions io-engine/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ libc = "0.2.149"
log = "0.4.20"
md5 = "0.7.0"
merge = "0.1.0"
nix = { version = "0.27.1", default-features = false, features = [ "hostname", "net", "socket", "ioctl" ] }
nix = { version = "0.27.1", default-features = false, features = ["hostname", "net", "socket", "ioctl"] }
once_cell = "1.18.0"
parking_lot = "0.12.1"
pin-utils = "0.1.0"
Expand Down Expand Up @@ -98,9 +98,10 @@ async-process = { version = "1.8.1" }
rstack = { version = "0.3.3" }
tokio-stream = "0.1.14"
rustls = "0.21.12"
either = "1.9.0"

devinfo = { path = "../utils/dependencies/devinfo" }
jsonrpc = { path = "../jsonrpc"}
jsonrpc = { path = "../jsonrpc" }
io-engine-api = { path = "../utils/dependencies/apis/io-engine" }
spdk-rs = { path = "../spdk-rs" }
sysfs = { path = "../sysfs" }
Expand Down
16 changes: 13 additions & 3 deletions io-engine/src/bdev/aio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::{
convert::TryFrom,
ffi::CString,
fmt::{Debug, Formatter},
os::unix::fs::FileTypeExt,
};

use async_trait::async_trait;
Expand All @@ -29,7 +30,7 @@ pub(super) struct Aio {

impl Debug for Aio {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "Aio '{}'", self.name)
write!(f, "Aio '{}', 'blk_size: {}'", self.name, self.blk_size)
}
}

Expand All @@ -47,6 +48,10 @@ impl TryFrom<&Url> for Aio {
});
}

let path_is_blockdev = std::fs::metadata(url.path())
.ok()
.map_or(false, |meta| meta.file_type().is_block_device());

let mut parameters: HashMap<String, String> =
url.query_pairs().into_owned().collect();

Expand All @@ -58,9 +63,14 @@ impl TryFrom<&Url> for Aio {
value: value.clone(),
})?
}
None => 512,
None => {
if path_is_blockdev {
0
} else {
512
}
}
};

let uuid = uri::uuid(parameters.remove("uuid")).context(
bdev_api::UuidParamParseFailed {
uri: url.to_string(),
Expand Down
9 changes: 5 additions & 4 deletions io-engine/src/bdev/nexus/nexus_bdev_children.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1152,10 +1152,11 @@ impl<'n> Nexus<'n> {
// Cancel rebuild job for this child, if any.
if let Some(job) = child.rebuild_job() {
debug!("{self:?}: retire: stopping rebuild job...");
let terminated = job.force_fail();
Reactors::master().send_future(async move {
terminated.await.ok();
});
if let either::Either::Left(terminated) = job.force_fail() {
Reactors::master().send_future(async move {
terminated.await.ok();
});
}
}

debug!("{child:?}: retire: enqueuing device '{dev}' to retire");
Expand Down
22 changes: 14 additions & 8 deletions io-engine/src/bdev/nexus/nexus_bdev_rebuild.rs
Original file line number Diff line number Diff line change
Expand Up @@ -247,15 +247,18 @@ impl<'n> Nexus<'n> {
async fn terminate_rebuild(&self, child_uri: &str) {
// If a rebuild job is not found that's ok
// as we were just going to remove it anyway.
if let Ok(rj) = self.rebuild_job_mut(child_uri) {
let ch = rj.force_stop();
if let Err(e) = ch.await {
error!(
"Failed to wait on rebuild job for child {child_uri} \
let Ok(rj) = self.rebuild_job_mut(child_uri) else {
return;
};
let either::Either::Left(ch) = rj.force_stop() else {
return;
};
if let Err(e) = ch.await {
error!(
"Failed to wait on rebuild job for child {child_uri} \
to terminate with error {}",
e.verbose()
);
}
e.verbose()
);
}
}

Expand Down Expand Up @@ -355,6 +358,9 @@ impl<'n> Nexus<'n> {

// wait for the jobs to complete terminating
for job in terminated_jobs {
let either::Either::Left(job) = job else {
continue;
};
if let Err(e) = job.await {
error!(
"{:?}: error when waiting for the rebuild job \
Expand Down
11 changes: 6 additions & 5 deletions io-engine/src/bdev/nvmx/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,9 @@ pub enum NvmeAerInfoNvmCommandSet {

/// Check if the Completion Queue Entry indicates abnormal termination of
/// request due to any of the following conditions:
/// - Any media specific errors that occur in the NVM or data integrity type
/// errors.
/// - A Status Code Type (SCT) of media-specific errors that occur in the NVM
/// or data integrity type errors, AND a Status Code (SC) value pertaining to
/// one of the below:
/// - The command was aborted due to an end-to-end guard check failure.
/// - The command was aborted due to an end-to-end application tag check
/// failure.
Expand All @@ -59,9 +60,9 @@ pub(crate) fn nvme_cpl_is_pi_error(cpl: *const spdk_nvme_cpl) -> bool {
}

sct == NvmeStatusCodeType::MediaError as u16
|| sc == NvmeMediaErrorStatusCode::Guard as u16
|| sc == NvmeMediaErrorStatusCode::ApplicationTag as u16
|| sc == NvmeMediaErrorStatusCode::ReferenceTag as u16
&& (sc == NvmeMediaErrorStatusCode::Guard as u16
|| sc == NvmeMediaErrorStatusCode::ApplicationTag as u16
|| sc == NvmeMediaErrorStatusCode::ReferenceTag as u16)
}

#[inline]
Expand Down
19 changes: 17 additions & 2 deletions io-engine/src/bdev/uring.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
use std::{collections::HashMap, convert::TryFrom, ffi::CString};
use std::{
collections::HashMap,
convert::TryFrom,
ffi::CString,
os::unix::fs::FileTypeExt,
};

use async_trait::async_trait;
use futures::channel::oneshot;
Expand Down Expand Up @@ -36,6 +41,10 @@ impl TryFrom<&Url> for Uring {
});
}

let path_is_blockdev = std::fs::metadata(url.path())
.ok()
.map_or(false, |meta| meta.file_type().is_block_device());

let mut parameters: HashMap<String, String> =
url.query_pairs().into_owned().collect();

Expand All @@ -47,7 +56,13 @@ impl TryFrom<&Url> for Uring {
value: value.clone(),
})?
}
None => 512,
None => {
if path_is_blockdev {
0
} else {
512
}
}
};

let uuid = uri::uuid(parameters.remove("uuid")).context(
Expand Down
5 changes: 4 additions & 1 deletion io-engine/src/grpc/v1/snapshot_rebuild.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,10 @@ impl SnapshotRebuildRpc for SnapshotRebuildService {
let Ok(job) = SnapshotRebuildJob::lookup(&args.uuid) else {
return Err(tonic::Status::not_found(""));
};
let rx = job.force_stop().await.ok();
let rx = match job.force_stop() {
either::Either::Left(chan) => chan.await,
either::Either::Right(stopped) => Ok(stopped),
};
info!("Snapshot Rebuild stopped: {rx:?}");
job.destroy();
Ok(())
Expand Down
5 changes: 4 additions & 1 deletion io-engine/src/rebuild/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,10 @@ impl WithinRange<u64> for std::ops::Range<u64> {
/// Shutdown all pending snapshot rebuilds.
pub(crate) async fn shutdown_snapshot_rebuilds() {
let jobs = SnapshotRebuildJob::list().into_iter();
for recv in jobs.map(|job| job.force_stop()).collect::<Vec<_>>() {
for recv in jobs
.flat_map(|job| job.force_stop().left())
.collect::<Vec<_>>()
{
recv.await.ok();
}
}
Expand Down
29 changes: 12 additions & 17 deletions io-engine/src/rebuild/rebuild_descriptor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,11 @@ pub(super) struct RebuildDescriptor {
/// Pre-opened descriptor for the source block device.
#[allow(clippy::non_send_fields_in_send_ty)]
pub(super) src_descriptor: Box<dyn BlockDeviceDescriptor>,
pub(super) src_handle: Box<dyn BlockDeviceHandle>,
/// Pre-opened descriptor for destination block device.
#[allow(clippy::non_send_fields_in_send_ty)]
pub(super) dst_descriptor: Box<dyn BlockDeviceDescriptor>,
pub(super) dst_handle: Box<dyn BlockDeviceHandle>,
/// Start time of this rebuild.
pub(super) start_time: DateTime<Utc>,
}
Expand Down Expand Up @@ -90,9 +92,8 @@ impl RebuildDescriptor {
});
}

let source_hdl = RebuildDescriptor::io_handle(&*src_descriptor).await?;
let destination_hdl =
RebuildDescriptor::io_handle(&*dst_descriptor).await?;
let src_handle = RebuildDescriptor::io_handle(&*src_descriptor).await?;
let dst_handle = RebuildDescriptor::io_handle(&*dst_descriptor).await?;

let range = match range {
None => {
Expand All @@ -105,8 +106,8 @@ impl RebuildDescriptor {
};

if !Self::validate(
source_hdl.get_device(),
destination_hdl.get_device(),
src_handle.get_device(),
dst_handle.get_device(),
&range,
) {
return Err(RebuildError::InvalidSrcDstRange {});
Expand All @@ -123,7 +124,9 @@ impl RebuildDescriptor {
block_size,
segment_size_blks,
src_descriptor,
src_handle,
dst_descriptor,
dst_handle,
start_time: Utc::now(),
})
}
Expand Down Expand Up @@ -173,18 +176,14 @@ impl RebuildDescriptor {

/// Get a `BlockDeviceHandle` for the source.
#[inline(always)]
pub(super) async fn src_io_handle(
&self,
) -> Result<Box<dyn BlockDeviceHandle>, RebuildError> {
Self::io_handle(&*self.src_descriptor).await
pub(super) fn src_io_handle(&self) -> &dyn BlockDeviceHandle {
self.src_handle.as_ref()
}

/// Get a `BlockDeviceHandle` for the destination.
#[inline(always)]
pub(super) async fn dst_io_handle(
&self,
) -> Result<Box<dyn BlockDeviceHandle>, RebuildError> {
Self::io_handle(&*self.dst_descriptor).await
pub(super) fn dst_io_handle(&self) -> &dyn BlockDeviceHandle {
self.dst_handle.as_ref()
}

/// Get a `BlockDeviceHandle` for the given block device descriptor.
Expand Down Expand Up @@ -231,7 +230,6 @@ impl RebuildDescriptor {
) -> Result<bool, RebuildError> {
match self
.src_io_handle()
.await?
.readv_blocks_async(
iovs,
offset_blk,
Expand Down Expand Up @@ -269,7 +267,6 @@ impl RebuildDescriptor {
iovs: &[IoVec],
) -> Result<(), RebuildError> {
self.dst_io_handle()
.await?
.writev_blocks_async(
iovs,
offset_blk,
Expand All @@ -291,7 +288,6 @@ impl RebuildDescriptor {
) -> Result<(), RebuildError> {
// Read the source again.
self.src_io_handle()
.await?
.readv_blocks_async(
iovs,
offset_blk,
Expand All @@ -306,7 +302,6 @@ impl RebuildDescriptor {

match self
.dst_io_handle()
.await?
.comparev_blocks_async(
iovs,
offset_blk,
Expand Down
17 changes: 12 additions & 5 deletions io-engine/src/rebuild/rebuild_job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,13 +163,17 @@ impl RebuildJob {

/// Forcefully stops the job, overriding any pending client operation.
/// Returns either an async channel which can be used to await termination,
/// or the job's current state if no completion listener could be added.
pub(crate) fn force_stop(&self) -> oneshot::Receiver<RebuildState> {
pub(crate) fn force_stop(
&self,
) -> either::Either<oneshot::Receiver<RebuildState>, RebuildState> {
self.force_terminate(RebuildOperation::Stop)
}

/// Forcefully fails the job, overriding any pending client operation.
/// Returns either an async channel which can be used to await termination,
/// or the job's current state if no completion listener could be added.
pub(crate) fn force_fail(&self) -> oneshot::Receiver<RebuildState> {
pub(crate) fn force_fail(
&self,
) -> either::Either<oneshot::Receiver<RebuildState>, RebuildState> {
self.force_terminate(RebuildOperation::Fail)
}

Expand All @@ -179,10 +183,13 @@ impl RebuildJob {
fn force_terminate(
&self,
op: RebuildOperation,
) -> oneshot::Receiver<RebuildState> {
) -> either::Either<oneshot::Receiver<RebuildState>, RebuildState> {
self.exec_internal_op(op).ok();
self.add_completion_listener()
.unwrap_or_else(|_| oneshot::channel().1)

match self.add_completion_listener() {
Ok(chan) => either::Either::Left(chan),
Err(_) => either::Either::Right(self.state()),
}
}

/// Get the rebuild stats.
Expand Down
Loading

0 comments on commit 05f585a

Please sign in to comment.