Skip to content

Commit 5a5abab

Browse files
committed
fix(nexus): fixing missing I/Os during nexus rebuild
If a nexus was published after a rebuild started, the child being rebuilt was not added to writers. This caused I/Os missing the child, leading to a corrupted replica. Signed-off-by: Dmitry Savitskiy <dmitry.savitskiy@datacore.com>
1 parent dd329b3 commit 5a5abab

File tree

5 files changed

+495
-35
lines changed

5 files changed

+495
-35
lines changed

io-engine/src/bdev/nexus/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -191,6 +191,9 @@ pub static ENABLE_PARTIAL_REBUILD: AtomicBool = AtomicBool::new(true);
191191
/// Enables/disables nexus reset logic.
192192
pub static ENABLE_NEXUS_RESET: AtomicBool = AtomicBool::new(false);
193193

194+
/// Enables/disables additional nexus I/O channel debugging.
195+
pub static ENABLE_NEXUS_CHANNEL_DEBUG: AtomicBool = AtomicBool::new(false);
196+
194197
/// Whether the nexus channel should have readers/writers configured.
195198
/// This must be set true ONLY from tests.
196199
pub static ENABLE_IO_ALL_THRD_NX_CHAN: AtomicBool = AtomicBool::new(false);

io-engine/src/bdev/nexus/nexus_channel.rs

Lines changed: 102 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,8 @@ impl<'n> Debug for NexusChannel<'n> {
3232
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
3333
write!(
3434
f,
35-
"I/O chan '{nex}' core:{core}({cur}) [R:{r} W:{w} L:{l} C:{c}]",
35+
"{io} chan '{nex}' core:{core}({cur}) [R:{r} W:{w} L:{l} C:{c}]",
36+
io = if self.is_io_chan { "I/O" } else { "Aux" },
3637
nex = self.nexus.nexus_name(),
3738
core = self.core,
3839
cur = Cores::current(),
@@ -76,37 +77,23 @@ impl Display for DrEvent {
7677
}
7778
}
7879

80+
#[inline(always)]
81+
fn is_channel_debug_enabled() -> bool {
82+
super::ENABLE_NEXUS_CHANNEL_DEBUG.load(Ordering::SeqCst)
83+
}
84+
7985
impl<'n> NexusChannel<'n> {
80-
/// TODO
86+
/// Creates a new nexus I/O channel.
8187
pub(crate) fn new(nexus: Pin<&mut Nexus<'n>>) -> Self {
8288
debug!("{nexus:?}: new channel on core {c}", c = Cores::current());
8389

8490
let b_init_thrd_hdls =
8591
super::ENABLE_IO_ALL_THRD_NX_CHAN.load(Ordering::SeqCst);
92+
8693
let is_io_chan =
8794
Thread::current().unwrap() != Thread::primary() || b_init_thrd_hdls;
8895

89-
let mut writers = Vec::new();
90-
let mut readers = Vec::new();
91-
92-
if is_io_chan {
93-
nexus.children_iter().filter(|c| c.is_healthy()).for_each(
94-
|c| match (c.get_io_handle(), c.get_io_handle()) {
95-
(Ok(w), Ok(r)) => {
96-
writers.push(w);
97-
readers.push(r);
98-
}
99-
_ => {
100-
c.set_faulted_state(FaultReason::CantOpen);
101-
error!(
102-
"Failed to get I/O handle for {c}, \
103-
skipping block device",
104-
c = c.uri()
105-
)
106-
}
107-
},
108-
);
109-
} else {
96+
if !is_io_chan {
11097
// If we are here, this means the nexus channel being created is not
11198
// the one to be used for normal IOs. Such a channel is
11299
// created in rebuild path today, and it's on the init
@@ -118,12 +105,16 @@ impl<'n> NexusChannel<'n> {
118105
// And the rebuild IOs are dispatched by
119106
// directly calling write API without going via writers abstraction.
120107
// Refer GTM-1075 for the race condition details.
121-
debug!("{nexus:?}: skip nexus channel setup({t:?}). is_io_channel: {is_io_chan}", t = Thread::current().unwrap());
108+
debug!(
109+
"{nexus:?}: skipping nexus channel setup on init thread \
110+
({t:?}): not I/O channel",
111+
t = Thread::current().unwrap()
112+
);
122113
}
123114

124-
Self {
125-
writers,
126-
readers,
115+
let mut res = Self {
116+
writers: Vec::new(),
117+
readers: Vec::new(),
127118
detached: Vec::new(),
128119
io_logs: nexus.io_log_channels(),
129120
previous_reader: UnsafeCell::new(0),
@@ -133,10 +124,19 @@ impl<'n> NexusChannel<'n> {
133124
frozen_ios: Vec::new(),
134125
core: Cores::current(),
135126
is_io_chan,
127+
};
128+
129+
res.connect_children();
130+
131+
if is_channel_debug_enabled() {
132+
debug!("{res:?}: after new channel creation:");
133+
res.dump_dbg();
136134
}
135+
136+
res
137137
}
138138

139-
/// TODO
139+
/// Destroys a nexus I/O channel.
140140
pub(crate) fn destroy(mut self) {
141141
debug!(
142142
"{nex:?}: destroying I/O channel on core {core}",
@@ -145,6 +145,7 @@ impl<'n> NexusChannel<'n> {
145145
);
146146
self.writers.clear();
147147
self.readers.clear();
148+
self.detached.clear();
148149
self.io_logs.clear();
149150
}
150151

@@ -238,6 +239,11 @@ impl<'n> NexusChannel<'n> {
238239
}
239240

240241
debug!("{self:?}: device '{device_name}' detached");
242+
243+
if is_channel_debug_enabled() {
244+
debug!("{self:?}: after detach:");
245+
self.dump_dbg();
246+
}
241247
}
242248

243249
/// Disconnects previously detached device handles by dropping them.
@@ -269,11 +275,34 @@ impl<'n> NexusChannel<'n> {
269275
/// we simply put back all the channels, and reopen the bdevs that are in
270276
/// the online state.
271277
pub(crate) fn reconnect_all(&mut self) {
278+
debug!("{self:?}: child devices reconnecting...");
279+
280+
if is_channel_debug_enabled() {
281+
debug!("{self:?}: before reconnection:");
282+
self.dump_dbg();
283+
}
284+
272285
// clear the vector of channels and reset other internal values,
273286
// clearing the values will drop any existing handles in the
274287
// channel
275288
self.previous_reader = UnsafeCell::new(0);
276289

290+
if self.is_io_channel() {
291+
self.connect_children();
292+
}
293+
294+
self.reconnect_io_logs();
295+
296+
if is_channel_debug_enabled() {
297+
debug!("{self:?}: after reconnection:");
298+
self.dump_dbg();
299+
}
300+
301+
debug!("{self:?}: child devices reconnected");
302+
}
303+
304+
/// (Re)connects readers and writes.
305+
fn connect_children(&mut self) {
277306
// nvmx will drop the I/O qpairs which is different from all other
278307
// bdevs we might be dealing with. So instead of clearing and refreshing
279308
// which had no side effects before, we create a new vector and
@@ -290,6 +319,8 @@ impl<'n> NexusChannel<'n> {
290319
(Ok(w), Ok(r)) => {
291320
writers.push(w);
292321
readers.push(r);
322+
323+
debug!("{self:?}: connecting child device : {c:?}");
293324
}
294325
_ => {
295326
c.set_faulted_state(FaultReason::CantOpen);
@@ -322,10 +353,6 @@ impl<'n> NexusChannel<'n> {
322353

323354
self.writers = writers;
324355
self.readers = readers;
325-
326-
self.reconnect_io_logs();
327-
328-
debug!("{self:?}: child devices reconnected");
329356
}
330357

331358
/// Reconnects all active I/O logs.
@@ -394,4 +421,47 @@ impl<'n> NexusChannel<'n> {
394421
trace!("{io:?}: freezing I/O");
395422
self.frozen_ios.push(io)
396423
}
424+
425+
/// Prints elaborate debug info to the logs.
426+
fn dump_dbg(&self) {
427+
let me = format!(
428+
"{self:p} [{io} {c}]",
429+
io = if self.is_io_chan { "I/O" } else { "aux" },
430+
c = self.core,
431+
);
432+
433+
debug!("{me}: debug info: {self:?}");
434+
435+
debug!("{me}: {n} children:", n = self.nexus().child_count());
436+
self.nexus().children_iter().for_each(|c| {
437+
debug!(
438+
"{me}: {dev}: {c:?}",
439+
dev = c.get_device_name().unwrap_or("-".to_string()),
440+
)
441+
});
442+
443+
fn dbg_devs(
444+
prefix: &str,
445+
name: &str,
446+
devs: &Vec<Box<dyn BlockDeviceHandle>>,
447+
) {
448+
if devs.is_empty() {
449+
debug!("{prefix}: no {name}");
450+
} else {
451+
debug!("{prefix}: {n} {name}:", n = devs.len());
452+
devs.iter().for_each(|dev| {
453+
debug!(
454+
"{prefix}: {d}",
455+
d = dev.get_device().device_name()
456+
);
457+
});
458+
}
459+
}
460+
461+
dbg_devs(&me, "readers", &self.readers);
462+
dbg_devs(&me, "writers", &self.writers);
463+
dbg_devs(&me, "detached", &self.detached);
464+
465+
debug!("{me}: (end)");
466+
}
397467
}

io-engine/src/bdev/nvmx/device.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -303,15 +303,15 @@ impl DeviceIoController for NvmeDeviceIoController {
303303
* Lookup target NVMeOF device by its name (starts with nvmf://).
304304
*/
305305
pub fn lookup_by_name(name: &str) -> Option<Box<dyn BlockDevice>> {
306-
debug!("Searching NVMe devices for '{}'...", name);
306+
trace!("Searching NVMe devices for '{}'...", name);
307307
if let Some(c) = NVME_CONTROLLERS.lookup_by_name(name) {
308308
let controller = c.lock();
309309
// Make sure controller is available.
310310
if controller.get_state() == NvmeControllerState::Running {
311311
let ns = controller
312312
.namespace()
313313
.expect("no namespaces for this controller");
314-
debug!("NVMe device found: '{}'", name);
314+
trace!("NVMe device found: '{}'", name);
315315
return Some(Box::new(NvmeBlockDevice::from_ns(name, ns)));
316316
}
317317
}

io-engine/src/bin/io-engine.rs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,11 @@ use futures::future::FutureExt;
77

88
use io_engine::{
99
bdev::{
10-
nexus::{ENABLE_NEXUS_RESET, ENABLE_PARTIAL_REBUILD},
10+
nexus::{
11+
ENABLE_NEXUS_CHANNEL_DEBUG,
12+
ENABLE_NEXUS_RESET,
13+
ENABLE_PARTIAL_REBUILD,
14+
},
1115
util::uring,
1216
},
1317
core::{
@@ -83,6 +87,11 @@ fn start_tokio_runtime(args: &MayastorCliArgs) {
8387
warn!("Nexus reset is disabled");
8488
}
8589

90+
if args.enable_nexus_channel_debug {
91+
ENABLE_NEXUS_CHANNEL_DEBUG.store(true, Ordering::SeqCst);
92+
warn!("Nexus channel debug is enabled");
93+
}
94+
8695
print_feature!("Async QPair connection", "spdk-async-qpair-connect");
8796
print_feature!("Fault injection", "fault-injection");
8897

0 commit comments

Comments
 (0)