chore(bors): merge pull request #804
804: CI fixes and disable partial rebuild with cli arg r=tiagolobocastro a=tiagolobocastro


    feat(rebuild): add cli arg to disable partial rebuild
    
    Signed-off-by: Tiago Castro <tiagolobocastro@gmail.com>
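    For reference, the flag is registered with clap in main.rs below, so it can be supplied
    either as the `--disable-partial-rebuild` switch or through the `DISABLE_PARTIAL_REBUILD`
    environment variable. A minimal sketch of how a test cluster opts in, assuming the deployer
    helpers added in this PR are in scope (mirroring the rebuild-test hunks further down):

        // Sketch only: forward the env var to the agents of a test cluster.
        let builder = ClusterBuilder::builder()
            .with_options_en(true, |b| {
                // Read by the core agent via `#[clap(long, env = "DISABLE_PARTIAL_REBUILD")]`.
                b.with_agents_env("DISABLE_PARTIAL_REBUILD", "true")
            });
        // The tests below then call .build().await on the builder to start the cluster.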

---

    test(rebuild): use only 3 io engines to reduce hugepages
    
    Signed-off-by: Tiago Castro <tiagolobocastro@gmail.com>

---

    test(snapshot): fix race condition
    
    Kill the agent-core and only restart it afterwards. Otherwise, during the stop,
    the core keeps running for a short while, which the test code does not account for.
    
    Signed-off-by: Tiago Castro <tiagolobocastro@gmail.com>
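    Concretely, the snapshot-test hunk below folds the two-step restart (restart the core
    container, then wait for volume-service liveness) into a single helper, presumably killing
    the core first so that the liveness wait can no longer race a core that is still shutting
    down. A rough before/after taken from that hunk:

        // Before (racy): restart() may return while the old core is still stopping,
        // so the liveness wait can succeed against the wrong instance.
        cluster.composer().restart("core").await.unwrap();
        cluster.volume_service_liveness(None).await.unwrap();

        // After: one combined helper; its body is not part of this diff, but per the
        // commit message it kills the core, restarts it, and waits for liveness.
        cluster.restart_core_with_liveness(None).await.unwrap();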


Co-authored-by: Tiago Castro <tiagolobocastro@gmail.com>
mayastor-bors and tiagolobocastro committed Apr 10, 2024
2 parents b235ea9 + 489c995 commit 85562ed
Showing 8 changed files with 168 additions and 14 deletions.
@@ -141,7 +141,7 @@ async fn nexus_reconciler(

/// Checks if the nexus is Degraded and any child is Faulted. If so, depending on the rebuild
/// policy for the child, it performs a rebuild operation.
#[tracing::instrument(skip(nexus, context), level = "trace", fields(nexus.uuid = %nexus.uuid(), nexus.node = %nexus.as_ref().node, request.reconcile = true))]
#[tracing::instrument(skip(nexus, volume, context), level = "trace", fields(volume = ?volume.as_ref().map(|v| v.as_ref()), nexus.uuid = %nexus.uuid(), nexus.node = %nexus.as_ref().node, request.reconcile = true))]
pub(super) async fn handle_faulted_children(
nexus: &mut OperationGuardArc<NexusSpec>,
volume: &mut Option<&mut OperationGuardArc<VolumeSpec>>,
@@ -179,6 +179,12 @@ async fn handle_faulted_child(
tracing::warn!(%child.uri, "Unknown Child found, a full rebuild is required");
return faulted_children_remover(nexus_spec, volume, child, context).await;
};

if context.registry().partial_rebuild_disabled() {
tracing::warn!(%child.uri, child.uuid=%child_uuid, ?child.state_reason, "Partial rebuild disabled, a full rebuild is required");
return faulted_children_remover(nexus_spec, volume, child, context).await;
}

let Some(faulted_at) = child.faulted_at else {
tracing::warn!(%child.uri, child.uuid=%child_uuid, "Child faulted without a timestamp set, a full rebuild required");
return faulted_children_remover(nexus_spec, volume, child, context).await;
@@ -97,7 +97,7 @@ async fn hot_spare_nexus_reconcile(
squash_results(results)
}

#[tracing::instrument(skip(context, nexus), fields(nexus.uuid = %nexus.uuid(), nexus.node = %nexus.as_ref().node, volume.uuid = tracing::field::Empty, request.reconcile = true))]
#[tracing::instrument(skip(context, volume, nexus), fields(volume = ?volume.as_ref(), nexus.uuid = %nexus.uuid(), nexus.node = %nexus.as_ref().node, volume.uuid = tracing::field::Empty, request.reconcile = true))]
async fn generic_nexus_reconciler(
volume: &mut OperationGuardArc<VolumeSpec>,
nexus: &mut OperationGuardArc<NexusSpec>,
11 changes: 10 additions & 1 deletion control-plane/agents/src/bin/core/controller/registry.rs
@@ -86,6 +86,8 @@ pub(crate) struct RegistryInner<S: Store> {
/// The duration for which the reconciler waits for the replica to
/// be healthy again before attempting to online the faulted child.
faulted_child_wait_period: Option<std::time::Duration>,
/// Disable partial rebuild for volume targets.
disable_partial_rebuild: bool,
reconciler: ReconcilerControl,
config: parking_lot::RwLock<CoreRegistryConfig>,
/// system-wide maximum number of concurrent rebuilds allowed.
@@ -118,6 +120,7 @@ impl Registry {
reconcile_period: std::time::Duration,
reconcile_idle_period: std::time::Duration,
faulted_child_wait_period: Option<std::time::Duration>,
disable_partial_rebuild: bool,
max_rebuilds: Option<NumRebuilds>,
create_volume_limit: usize,
host_acl: Vec<HostAccessControl>,
@@ -159,6 +162,7 @@ impl Registry {
reconcile_period,
reconcile_idle_period,
faulted_child_wait_period,
disable_partial_rebuild,
reconciler: ReconcilerControl::new(),
config: parking_lot::RwLock::new(
Self::get_config(&mut store, legacy_prefix_present)
@@ -201,11 +205,16 @@ impl Registry {
Ok(registry)
}

/// Check if the HA feature is enabled.
/// Check if the HA feature is disabled.
pub(crate) fn ha_disabled(&self) -> bool {
self.ha_disabled
}

/// Check if the partial rebuilds are disabled.
pub(crate) fn partial_rebuild_disabled(&self) -> bool {
self.disable_partial_rebuild
}

/// Formats the store endpoint with a default port if one isn't supplied.
fn format_store_endpoint(endpoint: &str) -> String {
match endpoint.contains(':') {
5 changes: 5 additions & 0 deletions control-plane/agents/src/bin/core/main.rs
@@ -47,6 +47,10 @@ pub(crate) struct CliArgs {
#[clap(long)]
pub(crate) faulted_child_wait_period: Option<humantime::Duration>,

/// Disable partial rebuild for volume targets.
#[clap(long, env = "DISABLE_PARTIAL_REBUILD")]
pub(crate) disable_partial_rebuild: bool,

/// Deadline for the io-engine instance keep alive registration.
#[clap(long, short, default_value = "10s")]
pub(crate) deadline: humantime::Duration,
@@ -185,6 +189,7 @@ async fn server(cli_args: CliArgs) -> anyhow::Result<()> {
cli_args.reconcile_period.into(),
cli_args.reconcile_idle_period.into(),
cli_args.faulted_child_wait_period.map(|t| t.into()),
cli_args.disable_partial_rebuild,
cli_args.max_rebuilds,
cli_args.create_volume_limit,
if cli_args.hosts_acl.contains(&HostAccessControl::None) {
132 changes: 124 additions & 8 deletions control-plane/agents/src/bin/core/tests/rebuild/mod.rs
@@ -17,7 +17,7 @@ use stor_port::types::v0::{
const REBUILD_WAIT_TIME: u64 = 12;
const CHILD_WAIT: u64 = 5;

async fn build_cluster(num_ioe: u32, pool_size: u64) -> Cluster {
async fn build_cluster(num_ioe: u32, pool_size: u64, partial_disable: bool) -> Cluster {
let reconcile_period = Duration::from_millis(300);
let child_twait = Duration::from_secs(CHILD_WAIT);
ClusterBuilder::builder()
@@ -28,6 +28,9 @@ async fn build_cluster(num_ioe: u32, pool_size: u64) -> Cluster {
.with_cache_period("250ms")
.with_tmpfs_pool(pool_size)
.with_options(|b| b.with_isolated_io_engine(true))
.with_options_en(partial_disable, |b| {
b.with_agents_env("DISABLE_PARTIAL_REBUILD", "true")
})
.with_eventing(true)
.build()
.await
@@ -44,14 +47,14 @@ async fn build_cluster(num_ioe: u32, pool_size: u64) -> Cluster {
// gets and validates rebuild history response from rest api.
#[tokio::test]
async fn rebuild_history_for_full_rebuild() {
let cluster = build_cluster(4, 52428800).await;
let cluster = build_cluster(3, 52428800, false).await;

let vol_target = cluster.node(0).to_string();
let api_client = cluster.rest_v00();
let volume_api = api_client.volumes_api();
let nexus_client = cluster.grpc_client().nexus();

let body = CreateVolumeBody::new(VolumePolicy::new(true), 3, 10485760u64, false);
let body = CreateVolumeBody::new(VolumePolicy::new(true), 2, 10485760u64, false);
let volid = VolumeId::new();
let volume = volume_api.put_volume(&volid, body).await.unwrap();
let volume = volume_api
@@ -92,7 +95,7 @@ async fn rebuild_history_for_full_rebuild() {
let volume_state = volume.state.clone();
assert_eq!(volume_state.status, VolumeStatus::Online);
let nexus = volume_state.target.unwrap();
assert_eq!(nexus.children.len(), 3);
assert_eq!(nexus.children.len(), 2);
nexus
.children
.iter()
@@ -173,16 +176,15 @@ async fn rebuild_history_for_full_rebuild() {
// validates that rebuild type is Partial rebuild,
// validates that previously faulted child is not removed from nexus,
// gets and validates rebuild history response from rest api.

#[tokio::test]
async fn rebuild_history_for_partial_rebuild() {
let cluster = build_cluster(4, 52428800).await;
let cluster = build_cluster(3, 52428800, false).await;

let vol_target = cluster.node(0).to_string();
let api_client = cluster.rest_v00();
let volume_api = api_client.volumes_api();
let nexus_client = cluster.grpc_client().nexus();
let body = CreateVolumeBody::new(VolumePolicy::new(true), 3, 10485760u64, false);
let body = CreateVolumeBody::new(VolumePolicy::new(true), 2, 10485760u64, false);
let volid = VolumeId::new();
let volume = volume_api.put_volume(&volid, body).await.unwrap();
let volume = volume_api
@@ -220,7 +222,7 @@ async fn rebuild_history_for_partial_rebuild() {
history.records.len(),
"Number of rebuild record should be 0"
);
assert_eq!(nexus.children.len(), 3);
assert_eq!(nexus.children.len(), 2);
nexus
.children
.iter()
@@ -336,6 +338,120 @@ async fn rebuild_history_for_partial_rebuild() {
assert_eq!(2, history.records.len());
}

#[tokio::test]
async fn rebuild_partial_disabled() {
let cluster = build_cluster(2, 52428800, true).await;

let vol_target = cluster.node(0).to_string();
let api_client = cluster.rest_v00();
let volume_api = api_client.volumes_api();
let nexus_client = cluster.grpc_client().nexus();
let body = CreateVolumeBody::new(VolumePolicy::new(true), 2, 10485760u64, false);
let volid = VolumeId::new();
let volume = volume_api.put_volume(&volid, body).await.unwrap();
let volume = volume_api
.put_volume_target(
&volume.spec.uuid,
PublishVolumeBody::new_all(
HashMap::new(),
None,
vol_target.clone().to_string(),
models::VolumeShareProtocol::Nvmf,
None,
cluster.csi_node(0),
),
)
.await
.unwrap();

let volume_state = volume.state.clone();
assert!(volume_state.status == VolumeStatus::Online);
let nexus = volume_state.target.unwrap();
let nexus_id = NexusId::from(nexus.uuid);
sleep(Duration::from_millis(500));
// Before triggering rebuild, we expect rebuild history record to be available for the nexus
// without any history of rebuild.
let history = nexus_client
.get_rebuild_history(&GetRebuildRecord::new(nexus_id.clone()), None)
.await
.expect("Failed to get rebuild record");
assert_eq!(
nexus_id, history.uuid,
"Can't match nexus id in rebuild history"
);
assert!(
history.records.is_empty(),
"Number of rebuild record should be 0"
);
assert_eq!(nexus.children.len(), 2);
nexus
.children
.iter()
.for_each(|c| assert_eq!(c.state, ChildState::Online));

// Check where the replicas are, apart from vol target node.
let replicas = api_client.replicas_api().get_replicas().await.unwrap();
let testrep = replicas.iter().find(|r| r.node != vol_target).unwrap();
tracing::info!(
"Restarting node {} having replica {}",
testrep.node,
testrep.uri.clone()
);

cluster
.composer()
.restart(&testrep.node)
.await
.expect("container stop failure");

let nexus_client = cluster.grpc_client().nexus();
let history = poll_rebuild_history(&nexus_client, nexus_id.clone())
.await
.expect("Failed to get rebuild record");
assert_eq!(
nexus_id, history.uuid,
"Cant match nexus id in rebuild history"
);
assert_eq!(
1,
history.records.len(),
"Number of rebuild history is not equal to 1"
);

assert!(
!history.records.get(0).unwrap().is_partial,
"Rebuild type is not Full rebuild"
);

// Get the volume again for validations.
let vol = volume_api.get_volume(volid.uuid()).await.unwrap();
let volume_state = vol.state;
let nexus = volume_state.target.unwrap();
// The child must not be in the nexus anymore.
let is_removed = !nexus.children.iter().any(|c| c.uri == testrep.uri);
assert!(is_removed);
let history = api_client
.volumes_api()
.get_rebuild_history(&vol.spec.uuid)
.await
.expect("could not find rebuild history in rest");
assert_eq!(
nexus_id.to_string(),
history.target_uuid.to_string(),
"Cant match nexus id in rebuild history in rest"
);
assert_eq!(
1,
history.records.len(),
"Number of rebuild history is not equal to 1 in rest"
);

assert!(
!history.records.get(0).unwrap().is_partial,
"Rebuild type is not Full rebuild in rest"
);
}

/// Checks if node is online, returns true if yes.
async fn wait_nexus_online(nexus_client: &impl NexusOperations, nexus: NexusId) -> Result<(), ()> {
let timeout = Duration::from_secs(REBUILD_WAIT_TIME);
6 changes: 3 additions & 3 deletions control-plane/agents/src/bin/core/tests/volume/snapshot.rs
@@ -332,8 +332,7 @@ async fn snapshot_timeout() {
cluster.composer().pause("core").await.unwrap();
cluster.composer().thaw(&cluster.node(1)).await.unwrap();
tokio::time::sleep(req_timeout - grpc_timeout).await;
cluster.composer().restart("core").await.unwrap();
cluster.volume_service_liveness(None).await.unwrap();
cluster.restart_core_with_liveness(None).await.unwrap();

let snapshots = vol_cli
.get_snapshots(Filter::Snapshot(snapshot_id.clone()), false, None, None)
@@ -355,7 +354,7 @@ async fn snapshot_timeout() {
.create_snapshot(&CreateVolumeSnapshot::new(volume.uuid(), snapshot_id), None)
.await
.unwrap();
tracing::info!("Replica Snapshot: {replica_snapshot:?}");
tracing::info!("Volume Snapshot: {snapshot:?}");

assert_eq!(snapshot.meta().txn_id().as_str(), "2");
assert_eq!(snapshot.meta().transactions().len(), 2);
@@ -402,6 +401,7 @@ async fn snapshot_timeout() {
.await
.unwrap();
let snapshot = &snapshots.entries[0];
tracing::info!("Snapshot: {snapshot:?}");

assert_eq!(snapshot.meta().txn_id().as_str(), "1");
assert!(snapshot.meta().transactions().is_empty());
7 changes: 7 additions & 0 deletions deployer/src/lib.rs
@@ -445,6 +445,13 @@ impl StartOptions {
self
}
#[must_use]
pub fn with_agents_env(mut self, key: &str, val: &str) -> Self {
let mut env = self.agents_env.unwrap_or_default();
env.push(KeyValue::new(key.to_string(), val.to_string()));
self.agents_env = Some(env);
self
}
#[must_use]
pub fn with_isolated_io_engine(mut self, isolate: bool) -> Self {
self.io_engine_isolate = isolate;
self
11 changes: 11 additions & 0 deletions utils/deployer-cluster/src/lib.rs
@@ -646,6 +646,17 @@ impl ClusterBuilder {
self.opts = set(self.opts);
self
}
/// Update the start options, if enabled.
#[must_use]
pub fn with_options_en<F>(mut self, enabled: bool, set: F) -> Self
where
F: Fn(StartOptions) -> StartOptions,
{
if enabled {
self.opts = set(self.opts);
}
self
}
/// Enable/Disable the default tokio tracing setup.
#[must_use]
pub fn with_default_tracing(self) -> Self {
