Skip to content

Commit

Permalink
fix(nexus): fixing missing I/Os during nexus rebuild
Browse files Browse the repository at this point in the history
If a nexus was published after a rebuild started, the child being rebuilt
was not added to writers. This caused I/Os missing the child, leading to
a corrupted replica.

Signed-off-by: Dmitry Savitskiy <dmitry.savitskiy@datacore.com>
  • Loading branch information
dsavitskiy committed Feb 1, 2024
1 parent 3dcb080 commit f8d5eda
Show file tree
Hide file tree
Showing 5 changed files with 495 additions and 35 deletions.
3 changes: 3 additions & 0 deletions io-engine/src/bdev/nexus/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,9 @@ pub static ENABLE_PARTIAL_REBUILD: AtomicBool = AtomicBool::new(true);
/// Enables/disables nexus reset logic.
pub static ENABLE_NEXUS_RESET: AtomicBool = AtomicBool::new(false);

/// Enables/disables additional nexus I/O channel debugging.
pub static ENABLE_NEXUS_CHANNEL_DEBUG: AtomicBool = AtomicBool::new(false);

/// Whether the nexus channel should have readers/writers configured.
/// This must be set true ONLY from tests.
pub static ENABLE_IO_ALL_THRD_NX_CHAN: AtomicBool = AtomicBool::new(false);
134 changes: 102 additions & 32 deletions io-engine/src/bdev/nexus/nexus_channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ impl<'n> Debug for NexusChannel<'n> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"I/O chan '{nex}' core:{core}({cur}) [R:{r} W:{w} L:{l} C:{c}]",
"{io} chan '{nex}' core:{core}({cur}) [R:{r} W:{w} L:{l} C:{c}]",
io = if self.is_io_chan { "I/O" } else { "Aux" },
nex = self.nexus.nexus_name(),
core = self.core,
cur = Cores::current(),
Expand Down Expand Up @@ -76,37 +77,23 @@ impl Display for DrEvent {
}
}

#[inline(always)]
fn is_channel_debug_enabled() -> bool {
super::ENABLE_NEXUS_CHANNEL_DEBUG.load(Ordering::SeqCst)
}

impl<'n> NexusChannel<'n> {
/// TODO
/// Creates a new nexus I/O channel.
pub(crate) fn new(nexus: Pin<&mut Nexus<'n>>) -> Self {
debug!("{nexus:?}: new channel on core {c}", c = Cores::current());

let b_init_thrd_hdls =
super::ENABLE_IO_ALL_THRD_NX_CHAN.load(Ordering::SeqCst);

let is_io_chan =
Thread::current().unwrap() != Thread::primary() || b_init_thrd_hdls;

let mut writers = Vec::new();
let mut readers = Vec::new();

if is_io_chan {
nexus.children_iter().filter(|c| c.is_healthy()).for_each(
|c| match (c.get_io_handle(), c.get_io_handle()) {
(Ok(w), Ok(r)) => {
writers.push(w);
readers.push(r);
}
_ => {
c.set_faulted_state(FaultReason::CantOpen);
error!(
"Failed to get I/O handle for {c}, \
skipping block device",
c = c.uri()
)
}
},
);
} else {
if !is_io_chan {
// If we are here, this means the nexus channel being created is not
// the one to be used for normal IOs. Such a channel is
// created in rebuild path today, and it's on the init
Expand All @@ -118,12 +105,16 @@ impl<'n> NexusChannel<'n> {
// And the rebuild IOs are dispatched by
// directly calling write API without going via writers abstraction.
// Refer GTM-1075 for the race condition details.
debug!("{nexus:?}: skip nexus channel setup({t:?}). is_io_channel: {is_io_chan}", t = Thread::current().unwrap());
debug!(
"{nexus:?}: skipping nexus channel setup on init thread \
({t:?}): not I/O channel",
t = Thread::current().unwrap()
);
}

Self {
writers,
readers,
let mut res = Self {
writers: Vec::new(),
readers: Vec::new(),
detached: Vec::new(),
io_logs: nexus.io_log_channels(),
previous_reader: UnsafeCell::new(0),
Expand All @@ -133,10 +124,19 @@ impl<'n> NexusChannel<'n> {
frozen_ios: Vec::new(),
core: Cores::current(),
is_io_chan,
};

res.connect_children();

if is_channel_debug_enabled() {
debug!("{res:?}: after new channel creation:");
res.dump_dbg();
}

res
}

/// TODO
/// Destroys a nexus I/O channel.
pub(crate) fn destroy(mut self) {
debug!(
"{nex:?}: destroying I/O channel on core {core}",
Expand All @@ -145,6 +145,7 @@ impl<'n> NexusChannel<'n> {
);
self.writers.clear();
self.readers.clear();
self.detached.clear();
self.io_logs.clear();
}

Expand Down Expand Up @@ -238,6 +239,11 @@ impl<'n> NexusChannel<'n> {
}

debug!("{self:?}: device '{device_name}' detached");

if is_channel_debug_enabled() {
debug!("{self:?}: after detach:");
self.dump_dbg();
}
}

/// Disconnects previously detached device handles by dropping them.
Expand Down Expand Up @@ -269,11 +275,34 @@ impl<'n> NexusChannel<'n> {
/// we simply put back all the channels, and reopen the bdevs that are in
/// the online state.
pub(crate) fn reconnect_all(&mut self) {
debug!("{self:?}: child devices reconnecting...");

if is_channel_debug_enabled() {
debug!("{self:?}: before reconnection:");
self.dump_dbg();
}

// clear the vector of channels and reset other internal values,
// clearing the values will drop any existing handles in the
// channel
self.previous_reader = UnsafeCell::new(0);

if self.is_io_channel() {
self.connect_children();
}

self.reconnect_io_logs();

if is_channel_debug_enabled() {
debug!("{self:?}: after reconnection:");
self.dump_dbg();
}

debug!("{self:?}: child devices reconnected");
}

/// (Re)connects readers and writes.
fn connect_children(&mut self) {
// nvmx will drop the I/O qpairs which is different from all other
// bdevs we might be dealing with. So instead of clearing and refreshing
// which had no side effects before, we create a new vector and
Expand All @@ -290,6 +319,8 @@ impl<'n> NexusChannel<'n> {
(Ok(w), Ok(r)) => {
writers.push(w);
readers.push(r);

debug!("{self:?}: connecting child device : {c:?}");
}
_ => {
c.set_faulted_state(FaultReason::CantOpen);
Expand Down Expand Up @@ -322,10 +353,6 @@ impl<'n> NexusChannel<'n> {

self.writers = writers;
self.readers = readers;

self.reconnect_io_logs();

debug!("{self:?}: child devices reconnected");
}

/// Reconnects all active I/O logs.
Expand Down Expand Up @@ -394,4 +421,47 @@ impl<'n> NexusChannel<'n> {
trace!("{io:?}: freezing I/O");
self.frozen_ios.push(io)
}

/// Prints elaborate debug info to the logs.
fn dump_dbg(&self) {
let me = format!(
"{self:p} [{io} {c}]",
io = if self.is_io_chan { "I/O" } else { "aux" },
c = self.core,
);

debug!("{me}: debug info: {self:?}");

debug!("{me}: {n} children:", n = self.nexus().child_count());
self.nexus().children_iter().for_each(|c| {
debug!(
"{me}: {dev}: {c:?}",
dev = c.get_device_name().unwrap_or("-".to_string()),
)
});

fn dbg_devs(
prefix: &str,
name: &str,
devs: &Vec<Box<dyn BlockDeviceHandle>>,
) {
if devs.is_empty() {
debug!("{prefix}: no {name}");
} else {
debug!("{prefix}: {n} {name}:", n = devs.len());
devs.iter().for_each(|dev| {
debug!(
"{prefix}: {d}",
d = dev.get_device().device_name()
);
});
}
}

dbg_devs(&me, "readers", &self.readers);
dbg_devs(&me, "writers", &self.writers);
dbg_devs(&me, "detached", &self.detached);

debug!("{me}: (end)");
}
}
4 changes: 2 additions & 2 deletions io-engine/src/bdev/nvmx/device.rs
Original file line number Diff line number Diff line change
Expand Up @@ -303,15 +303,15 @@ impl DeviceIoController for NvmeDeviceIoController {
* Lookup target NVMeOF device by its name (starts with nvmf://).
*/
pub fn lookup_by_name(name: &str) -> Option<Box<dyn BlockDevice>> {
debug!("Searching NVMe devices for '{}'...", name);
trace!("Searching NVMe devices for '{}'...", name);
if let Some(c) = NVME_CONTROLLERS.lookup_by_name(name) {
let controller = c.lock();
// Make sure controller is available.
if controller.get_state() == NvmeControllerState::Running {
let ns = controller
.namespace()
.expect("no namespaces for this controller");
debug!("NVMe device found: '{}'", name);
trace!("NVMe device found: '{}'", name);
return Some(Box::new(NvmeBlockDevice::from_ns(name, ns)));
}
}
Expand Down
11 changes: 10 additions & 1 deletion io-engine/src/bin/io-engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,11 @@ use futures::future::FutureExt;

use io_engine::{
bdev::{
nexus::{ENABLE_NEXUS_RESET, ENABLE_PARTIAL_REBUILD},
nexus::{
ENABLE_NEXUS_CHANNEL_DEBUG,
ENABLE_NEXUS_RESET,
ENABLE_PARTIAL_REBUILD,
},
util::uring,
},
core::{
Expand Down Expand Up @@ -83,6 +87,11 @@ fn start_tokio_runtime(args: &MayastorCliArgs) {
warn!("Nexus reset is disabled");
}

if args.enable_nexus_channel_debug {
ENABLE_NEXUS_CHANNEL_DEBUG.store(true, Ordering::SeqCst);
warn!("Nexus channel debug is enabled");
}

print_feature!("Async QPair connection", "spdk-async-qpair-connect");
print_feature!("Fault injection", "fault-injection");

Expand Down
Loading

0 comments on commit f8d5eda

Please sign in to comment.