Skip to content

Commit

Permalink
refactor(rebuild): use new rebuild rangers
Browse files Browse the repository at this point in the history
Make use of the rebuild rangers to configure rebuild types.
This allows us to remove the setting of the rebuild map being done
after the rebuild job is created for the nexus and removing it from
the shared rebuild descriptor.

The nexus still uses the partial but sequential rebuild to reduce
the scope of changes.
Once the fully partial rebuild is validated we can switch the nexus
to it.

Signed-off-by: Tiago Castro <tiagolobocastro@gmail.com>
  • Loading branch information
tiagolobocastro committed Feb 29, 2024
1 parent 5c72e5f commit 22c704b
Show file tree
Hide file tree
Showing 9 changed files with 311 additions and 219 deletions.
14 changes: 8 additions & 6 deletions io-engine/src/bdev/nexus/nexus_bdev_rebuild.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use crate::{
rebuild::{
HistoryRecord,
NexusRebuildJob,
NexusRebuildJobStarter,
RebuildError,
RebuildJobOptions,
RebuildState,
Expand Down Expand Up @@ -119,7 +120,8 @@ impl<'n> Nexus<'n> {
}?;

// Create a rebuild job for the child.
self.create_rebuild_job(&src_child_uri, &dst_child_uri)
let starter = self
.create_rebuild_job(&src_child_uri, &dst_child_uri)
.await?;

self.event(
Expand All @@ -146,8 +148,8 @@ impl<'n> Nexus<'n> {
.lookup_child(&dst_child_uri)
.and_then(|c| c.stop_io_log());

self.rebuild_job_mut(&dst_child_uri)?
.start(map)
starter
.start(self.rebuild_job_mut(&dst_child_uri)?, map)
.await
.context(nexus_err::RebuildOperation {
job: child_uri.to_owned(),
Expand All @@ -160,7 +162,7 @@ impl<'n> Nexus<'n> {
&self,
src_child_uri: &str,
dst_child_uri: &str,
) -> Result<(), Error> {
) -> Result<NexusRebuildJobStarter, Error> {
let verify_mode = match std::env::var("NEXUS_REBUILD_VERIFY")
.unwrap_or_default()
.as_str()
Expand All @@ -186,7 +188,7 @@ impl<'n> Nexus<'n> {
verify_mode,
};

NexusRebuildJob::new(
NexusRebuildJob::new_starter(
&self.name,
src_child_uri,
dst_child_uri,
Expand All @@ -202,7 +204,7 @@ impl<'n> Nexus<'n> {
},
)
.await
.and_then(NexusRebuildJob::store)
.and_then(NexusRebuildJobStarter::store)
.context(nexus_err::CreateRebuild {
child: dst_child_uri.to_owned(),
name: self.name.clone(),
Expand Down
68 changes: 33 additions & 35 deletions io-engine/src/rebuild/bdev_rebuild.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,4 @@
use std::{
ops::{Deref, Range},
rc::Rc,
};
use std::ops::{Deref, Range};

use super::{
rebuild_descriptor::RebuildDescriptor,
Expand All @@ -13,7 +10,10 @@ use super::{
SEGMENT_TASKS,
};

use crate::gen_rebuild_instances;
use crate::{
gen_rebuild_instances,
rebuild::rebuilders::{FullRebuild, RangeRebuilder},
};

/// A Bdev rebuild job is responsible for managing a rebuild (copy) which reads
/// from source_hdl and writes into destination_hdl from specified start to end.
Expand Down Expand Up @@ -59,47 +59,50 @@ gen_rebuild_instances!(BdevRebuildJob);
/// A rebuild job which is responsible for rebuilding from
/// source to target of the `RebuildDescriptor`.
pub(super) struct BdevRebuildJobBackend {
/// The next block to be rebuilt.
next: u64,
/// A pool of tasks which perform the actual data rebuild.
task_pool: RebuildTasks,
/// A generic rebuild descriptor.
descriptor: Rc<RebuildDescriptor>,
copier: FullRebuild<RebuildDescriptor>,
/// Notification callback with src and dst uri's.
notify_fn: fn(&str, &str) -> (),
}

#[async_trait::async_trait(?Send)]
impl RebuildBackend for BdevRebuildJobBackend {
fn on_state_change(&mut self) {
(self.notify_fn)(&self.descriptor.src_uri, &self.descriptor.dst_uri);
let desc = self.common_desc();
(self.notify_fn)(&desc.src_uri, &desc.dst_uri);
}

fn common_desc(&self) -> &RebuildDescriptor {
&self.descriptor
self.copier.desc()
}

fn blocks_remaining(&self) -> u64 {
self.copier.blocks_remaining()
}

fn is_partial(&self) -> bool {
self.copier.is_partial()
}

fn task_pool(&self) -> &RebuildTasks {
&self.task_pool
}

fn schedule_task_by_id(&mut self, id: usize) -> bool {
if self.next >= self.descriptor.range.end {
false
} else {
let next = std::cmp::min(
self.next + self.descriptor.segment_size_blks,
self.descriptor.range.end,
);
self.task_pool.schedule_segment_rebuild(
id,
self.next,
self.descriptor.clone(),
);
self.task_pool.active += 1;
self.next = next;
true
}
self.copier
.next()
.map(|blk| {
self.task_pool.schedule_segment_rebuild(
id,
blk,
self.copier.copier(),
);
self.task_pool.active += 1;
true
})
.unwrap_or_default()
}

async fn await_one_task(&mut self) -> Option<TaskResult> {
Expand All @@ -110,7 +113,7 @@ impl RebuildBackend for BdevRebuildJobBackend {
impl std::fmt::Debug for BdevRebuildJobBackend {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("BdevRebuildJob")
.field("next", &self.next)
.field("next", &self.copier.peek_next())
.finish()
}
}
Expand All @@ -130,15 +133,10 @@ impl BdevRebuildJobBackend {
notify_fn: fn(&str, &str) -> (),
descriptor: RebuildDescriptor,
) -> Result<Self, RebuildError> {
let be = Self {
next: descriptor.range.start,
Ok(Self {
task_pool,
descriptor: Rc::new(descriptor),
copier: FullRebuild::new(descriptor),
notify_fn,
};

info!("{be}: backend created");

Ok(be)
})
}
}
2 changes: 1 addition & 1 deletion io-engine/src/rebuild/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ mod rebuild_task;
mod rebuilders;

pub use bdev_rebuild::BdevRebuildJob;
pub use nexus_rebuild::NexusRebuildJob;
pub use nexus_rebuild::{NexusRebuildJob, NexusRebuildJobStarter};
use rebuild_descriptor::RebuildDescriptor;
pub(crate) use rebuild_error::RebuildError;
use rebuild_job::RebuildOperation;
Expand Down
Loading

0 comments on commit 22c704b

Please sign in to comment.