Skip to content

Commit

Permalink
Try #1594:
Browse files Browse the repository at this point in the history
  • Loading branch information
mayastor-bors committed Mar 1, 2024
2 parents 091e7da + 1158bc8 commit 4b69109
Show file tree
Hide file tree
Showing 12 changed files with 600 additions and 231 deletions.
14 changes: 8 additions & 6 deletions io-engine/src/bdev/nexus/nexus_bdev_rebuild.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use crate::{
rebuild::{
HistoryRecord,
NexusRebuildJob,
NexusRebuildJobStarter,
RebuildError,
RebuildJobOptions,
RebuildState,
Expand Down Expand Up @@ -119,7 +120,8 @@ impl<'n> Nexus<'n> {
}?;

// Create a rebuild job for the child.
self.create_rebuild_job(&src_child_uri, &dst_child_uri)
let starter = self
.create_rebuild_job(&src_child_uri, &dst_child_uri)
.await?;

self.event(
Expand All @@ -146,8 +148,8 @@ impl<'n> Nexus<'n> {
.lookup_child(&dst_child_uri)
.and_then(|c| c.stop_io_log());

self.rebuild_job_mut(&dst_child_uri)?
.start(map)
starter
.start(self.rebuild_job_mut(&dst_child_uri)?, map)
.await
.context(nexus_err::RebuildOperation {
job: child_uri.to_owned(),
Expand All @@ -160,7 +162,7 @@ impl<'n> Nexus<'n> {
&self,
src_child_uri: &str,
dst_child_uri: &str,
) -> Result<(), Error> {
) -> Result<NexusRebuildJobStarter, Error> {
let verify_mode = match std::env::var("NEXUS_REBUILD_VERIFY")
.unwrap_or_default()
.as_str()
Expand All @@ -186,7 +188,7 @@ impl<'n> Nexus<'n> {
verify_mode,
};

NexusRebuildJob::new(
NexusRebuildJob::new_starter(
&self.name,
src_child_uri,
dst_child_uri,
Expand All @@ -202,7 +204,7 @@ impl<'n> Nexus<'n> {
},
)
.await
.and_then(NexusRebuildJob::store)
.and_then(NexusRebuildJobStarter::store)
.context(nexus_err::CreateRebuild {
child: dst_child_uri.to_owned(),
name: self.name.clone(),
Expand Down
11 changes: 11 additions & 0 deletions io-engine/src/core/segment_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,4 +96,15 @@ impl SegmentMap {
pub(crate) fn count_dirty_blks(&self) -> u64 {
self.count_ones() * self.segment_size / self.block_len
}

/// Get the segment size.
pub(crate) fn segment_size_blks(&self) -> u64 {
self.segment_size / self.block_len
}
}

impl From<SegmentMap> for BitVec {
fn from(value: SegmentMap) -> Self {
value.segments
}
}
68 changes: 33 additions & 35 deletions io-engine/src/rebuild/bdev_rebuild.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,4 @@
use std::{
ops::{Deref, Range},
rc::Rc,
};
use std::ops::{Deref, Range};

use super::{
rebuild_descriptor::RebuildDescriptor,
Expand All @@ -13,7 +10,10 @@ use super::{
SEGMENT_TASKS,
};

use crate::gen_rebuild_instances;
use crate::{
gen_rebuild_instances,
rebuild::rebuilders::{FullRebuild, RangeRebuilder},
};

/// A Bdev rebuild job is responsible for managing a rebuild (copy) which reads
/// from source_hdl and writes into destination_hdl from specified start to end.
Expand Down Expand Up @@ -59,47 +59,50 @@ gen_rebuild_instances!(BdevRebuildJob);
/// A rebuild job which is responsible for rebuilding from
/// source to target of the `RebuildDescriptor`.
pub(super) struct BdevRebuildJobBackend {
/// The next block to be rebuilt.
next: u64,
/// A pool of tasks which perform the actual data rebuild.
task_pool: RebuildTasks,
/// A generic rebuild descriptor.
descriptor: Rc<RebuildDescriptor>,
copier: FullRebuild<RebuildDescriptor>,
/// Notification callback with src and dst uri's.
notify_fn: fn(&str, &str) -> (),
}

#[async_trait::async_trait(?Send)]
impl RebuildBackend for BdevRebuildJobBackend {
fn on_state_change(&mut self) {
(self.notify_fn)(&self.descriptor.src_uri, &self.descriptor.dst_uri);
let desc = self.common_desc();
(self.notify_fn)(&desc.src_uri, &desc.dst_uri);
}

fn common_desc(&self) -> &RebuildDescriptor {
&self.descriptor
self.copier.desc()
}

fn blocks_remaining(&self) -> u64 {
self.copier.blocks_remaining()
}

fn is_partial(&self) -> bool {
self.copier.is_partial()
}

fn task_pool(&self) -> &RebuildTasks {
&self.task_pool
}

fn schedule_task_by_id(&mut self, id: usize) -> bool {
if self.next >= self.descriptor.range.end {
false
} else {
let next = std::cmp::min(
self.next + self.descriptor.segment_size_blks,
self.descriptor.range.end,
);
self.task_pool.schedule_segment_rebuild(
id,
self.next,
self.descriptor.clone(),
);
self.task_pool.active += 1;
self.next = next;
true
}
self.copier
.next()
.map(|blk| {
self.task_pool.schedule_segment_rebuild(
id,
blk,
self.copier.copier(),
);
self.task_pool.active += 1;
true
})
.unwrap_or_default()
}

async fn await_one_task(&mut self) -> Option<TaskResult> {
Expand All @@ -110,7 +113,7 @@ impl RebuildBackend for BdevRebuildJobBackend {
impl std::fmt::Debug for BdevRebuildJobBackend {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("BdevRebuildJob")
.field("next", &self.next)
.field("next", &self.copier.peek_next())
.finish()
}
}
Expand All @@ -130,15 +133,10 @@ impl BdevRebuildJobBackend {
notify_fn: fn(&str, &str) -> (),
descriptor: RebuildDescriptor,
) -> Result<Self, RebuildError> {
let be = Self {
next: descriptor.range.start,
Ok(Self {
task_pool,
descriptor: Rc::new(descriptor),
copier: FullRebuild::new(descriptor),
notify_fn,
};

info!("{be}: backend created");

Ok(be)
})
}
}
3 changes: 2 additions & 1 deletion io-engine/src/rebuild/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,10 @@ mod rebuild_map;
mod rebuild_state;
mod rebuild_stats;
mod rebuild_task;
mod rebuilders;

pub use bdev_rebuild::BdevRebuildJob;
pub use nexus_rebuild::NexusRebuildJob;
pub use nexus_rebuild::{NexusRebuildJob, NexusRebuildJobStarter};
use rebuild_descriptor::RebuildDescriptor;
pub(crate) use rebuild_error::RebuildError;
use rebuild_job::RebuildOperation;
Expand Down
Loading

0 comments on commit 4b69109

Please sign in to comment.