CI fixes and disable partial rebuild with cli arg #804

Merged 3 commits on Apr 10, 2024
@@ -141,7 +141,7 @@ async fn nexus_reconciler(

/// Checks if the nexus is Degraded and any child is Faulted. If so, performs a rebuild
/// operation on the child according to its rebuild policy.
-#[tracing::instrument(skip(nexus, context), level = "trace", fields(nexus.uuid = %nexus.uuid(), nexus.node = %nexus.as_ref().node, request.reconcile = true))]
+#[tracing::instrument(skip(nexus, volume, context), level = "trace", fields(volume = ?volume.as_ref().map(|v| v.as_ref()), nexus.uuid = %nexus.uuid(), nexus.node = %nexus.as_ref().node, request.reconcile = true))]
pub(super) async fn handle_faulted_children(
nexus: &mut OperationGuardArc<NexusSpec>,
volume: &mut Option<&mut OperationGuardArc<VolumeSpec>>,
@@ -179,6 +179,12 @@ async fn handle_faulted_child(
tracing::warn!(%child.uri, "Unknown Child found, a full rebuild is required");
return faulted_children_remover(nexus_spec, volume, child, context).await;
};

if context.registry().partial_rebuild_disabled() {
tracing::warn!(%child.uri, child.uuid=%child_uuid, ?child.state_reason, "Partial rebuild disabled, a full rebuild is required");
return faulted_children_remover(nexus_spec, volume, child, context).await;
}

let Some(faulted_at) = child.faulted_at else {
tracing::warn!(%child.uri, child.uuid=%child_uuid, "Child faulted without a timestamp set, a full rebuild required");
return faulted_children_remover(nexus_spec, volume, child, context).await;
@@ -97,7 +97,7 @@ async fn hot_spare_nexus_reconcile(
squash_results(results)
}

-#[tracing::instrument(skip(context, nexus), fields(nexus.uuid = %nexus.uuid(), nexus.node = %nexus.as_ref().node, volume.uuid = tracing::field::Empty, request.reconcile = true))]
+#[tracing::instrument(skip(context, volume, nexus), fields(volume = ?volume.as_ref(), nexus.uuid = %nexus.uuid(), nexus.node = %nexus.as_ref().node, volume.uuid = tracing::field::Empty, request.reconcile = true))]
async fn generic_nexus_reconciler(
volume: &mut OperationGuardArc<VolumeSpec>,
nexus: &mut OperationGuardArc<NexusSpec>,
11 changes: 10 additions & 1 deletion control-plane/agents/src/bin/core/controller/registry.rs
@@ -86,6 +86,8 @@ pub(crate) struct RegistryInner<S: Store> {
/// The duration for which the reconciler waits for the replica to
/// be healthy again before attempting to online the faulted child.
faulted_child_wait_period: Option<std::time::Duration>,
/// Disable partial rebuild for volume targets.
disable_partial_rebuild: bool,
reconciler: ReconcilerControl,
config: parking_lot::RwLock<CoreRegistryConfig>,
/// system-wide maximum number of concurrent rebuilds allowed.
@@ -118,6 +120,7 @@ impl Registry {
reconcile_period: std::time::Duration,
reconcile_idle_period: std::time::Duration,
faulted_child_wait_period: Option<std::time::Duration>,
disable_partial_rebuild: bool,
max_rebuilds: Option<NumRebuilds>,
create_volume_limit: usize,
host_acl: Vec<HostAccessControl>,
@@ -159,6 +162,7 @@ impl Registry {
reconcile_period,
reconcile_idle_period,
faulted_child_wait_period,
disable_partial_rebuild,
reconciler: ReconcilerControl::new(),
config: parking_lot::RwLock::new(
Self::get_config(&mut store, legacy_prefix_present)
@@ -201,11 +205,16 @@ impl Registry {
Ok(registry)
}

-/// Check if the HA feature is enabled.
+/// Check if the HA feature is disabled.
pub(crate) fn ha_disabled(&self) -> bool {
self.ha_disabled
}

/// Check if partial rebuilds are disabled.
pub(crate) fn partial_rebuild_disabled(&self) -> bool {
self.disable_partial_rebuild
}

/// Formats the store endpoint with a default port if one isn't supplied.
fn format_store_endpoint(endpoint: &str) -> String {
match endpoint.contains(':') {
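Taken together, the plumbing is simple: the CLI flag is stored on the registry at construction time and queried by the reconciler when a child faults. The sketch below is a simplified illustration only (the `on_child_fault` helper and the reduced types are hypothetical stand-ins for `handle_faulted_child` above):

    // Simplified, hypothetical sketch of the flag's path through the system.
    struct Registry {
        disable_partial_rebuild: bool,
    }

    impl Registry {
        fn partial_rebuild_disabled(&self) -> bool {
            self.disable_partial_rebuild
        }
    }

    fn on_child_fault(registry: &Registry) {
        if registry.partial_rebuild_disabled() {
            // Remove the faulted child so a replacement replica is created
            // and rebuilt from scratch (the full-rebuild path).
        } else {
            // Otherwise the child may be onlined again and only the IO
            // ranges missed while it was faulted get rebuilt (partial path).
        }
    }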
5 changes: 5 additions & 0 deletions control-plane/agents/src/bin/core/main.rs
@@ -47,6 +47,10 @@ pub(crate) struct CliArgs {
#[clap(long)]
pub(crate) faulted_child_wait_period: Option<humantime::Duration>,

/// Disable partial rebuild for volume targets.
#[clap(long, env = "DISABLE_PARTIAL_REBUILD")]
pub(crate) disable_partial_rebuild: bool,

/// Deadline for the io-engine instance keep alive registration.
#[clap(long, short, default_value = "10s")]
pub(crate) deadline: humantime::Duration,
@@ -185,6 +189,7 @@ async fn server(cli_args: CliArgs) -> anyhow::Result<()> {
cli_args.reconcile_period.into(),
cli_args.reconcile_idle_period.into(),
cli_args.faulted_child_wait_period.map(|t| t.into()),
cli_args.disable_partial_rebuild,
cli_args.max_rebuilds,
cli_args.create_volume_limit,
if cli_args.hosts_acl.contains(&HostAccessControl::None) {
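Note that clap derives the long option name from the field, so the new behaviour can be enabled either with `--disable-partial-rebuild` on the core agent's command line or through the `DISABLE_PARTIAL_REBUILD` environment variable named in the attribute; a bare bool flag defaults to false, so partial rebuilds stay enabled unless explicitly opted out.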
132 changes: 124 additions & 8 deletions control-plane/agents/src/bin/core/tests/rebuild/mod.rs
@@ -17,7 +17,7 @@ use stor_port::types::v0::{
const REBUILD_WAIT_TIME: u64 = 12;
const CHILD_WAIT: u64 = 5;

-async fn build_cluster(num_ioe: u32, pool_size: u64) -> Cluster {
+async fn build_cluster(num_ioe: u32, pool_size: u64, partial_disable: bool) -> Cluster {
let reconcile_period = Duration::from_millis(300);
let child_twait = Duration::from_secs(CHILD_WAIT);
ClusterBuilder::builder()
@@ -28,6 +28,9 @@ async fn build_cluster(num_ioe: u32, pool_size: u64) -> Cluster {
.with_cache_period("250ms")
.with_tmpfs_pool(pool_size)
.with_options(|b| b.with_isolated_io_engine(true))
.with_options_en(partial_disable, |b| {
b.with_agents_env("DISABLE_PARTIAL_REBUILD", "true")
})
.with_eventing(true)
.build()
.await
@@ -44,14 +47,14 @@
// gets and validates rebuild history response from rest api.
#[tokio::test]
async fn rebuild_history_for_full_rebuild() {
-let cluster = build_cluster(4, 52428800).await;
+let cluster = build_cluster(3, 52428800, false).await;

let vol_target = cluster.node(0).to_string();
let api_client = cluster.rest_v00();
let volume_api = api_client.volumes_api();
let nexus_client = cluster.grpc_client().nexus();

-let body = CreateVolumeBody::new(VolumePolicy::new(true), 3, 10485760u64, false);
+let body = CreateVolumeBody::new(VolumePolicy::new(true), 2, 10485760u64, false);
let volid = VolumeId::new();
let volume = volume_api.put_volume(&volid, body).await.unwrap();
let volume = volume_api
@@ -92,7 +95,7 @@ async fn rebuild_history_for_full_rebuild() {
let volume_state = volume.state.clone();
assert_eq!(volume_state.status, VolumeStatus::Online);
let nexus = volume_state.target.unwrap();
-assert_eq!(nexus.children.len(), 3);
+assert_eq!(nexus.children.len(), 2);
nexus
.children
.iter()
@@ -173,16 +176,15 @@
// validates that rebuild type is Partial rebuild,
// validates that previously faulted child is not removed from nexus,
// gets and validates rebuild history response from rest api.

#[tokio::test]
async fn rebuild_history_for_partial_rebuild() {
-let cluster = build_cluster(4, 52428800).await;
+let cluster = build_cluster(3, 52428800, false).await;

let vol_target = cluster.node(0).to_string();
let api_client = cluster.rest_v00();
let volume_api = api_client.volumes_api();
let nexus_client = cluster.grpc_client().nexus();
-let body = CreateVolumeBody::new(VolumePolicy::new(true), 3, 10485760u64, false);
+let body = CreateVolumeBody::new(VolumePolicy::new(true), 2, 10485760u64, false);
let volid = VolumeId::new();
let volume = volume_api.put_volume(&volid, body).await.unwrap();
let volume = volume_api
@@ -220,7 +222,7 @@
history.records.len(),
"Number of rebuild record should be 0"
);
-assert_eq!(nexus.children.len(), 3);
+assert_eq!(nexus.children.len(), 2);
nexus
.children
.iter()
@@ -336,6 +338,120 @@
assert_eq!(2, history.records.len());
}

#[tokio::test]
async fn rebuild_partial_disabled() {
let cluster = build_cluster(2, 52428800, true).await;

let vol_target = cluster.node(0).to_string();
let api_client = cluster.rest_v00();
let volume_api = api_client.volumes_api();
let nexus_client = cluster.grpc_client().nexus();
let body = CreateVolumeBody::new(VolumePolicy::new(true), 2, 10485760u64, false);
let volid = VolumeId::new();
let volume = volume_api.put_volume(&volid, body).await.unwrap();
let volume = volume_api
.put_volume_target(
&volume.spec.uuid,
PublishVolumeBody::new_all(
HashMap::new(),
None,
vol_target.clone().to_string(),
models::VolumeShareProtocol::Nvmf,
None,
cluster.csi_node(0),
),
)
.await
.unwrap();

let volume_state = volume.state.clone();
assert!(volume_state.status == VolumeStatus::Online);
let nexus = volume_state.target.unwrap();
let nexus_id = NexusId::from(nexus.uuid);
sleep(Duration::from_millis(500));
// Before triggering rebuild, we expect rebuild history record to be available for the nexus
// without any history of rebuild.
let history = nexus_client
.get_rebuild_history(&GetRebuildRecord::new(nexus_id.clone()), None)
.await
.expect("Failed to get rebuild record");
assert_eq!(
nexus_id, history.uuid,
"Can't match nexus id in rebuild history"
);
assert!(
history.records.is_empty(),
"Number of rebuild record should be 0"
);
assert_eq!(nexus.children.len(), 2);
nexus
.children
.iter()
.for_each(|c| assert_eq!(c.state, ChildState::Online));

// Check where the replicas are, apart from vol target node.
let replicas = api_client.replicas_api().get_replicas().await.unwrap();
let testrep = replicas.iter().find(|r| r.node != vol_target).unwrap();
tracing::info!(
"Restarting node {} having replica {}",
testrep.node,
testrep.uri.clone()
);

cluster
.composer()
.restart(&testrep.node)
.await
.expect("container stop failure");

let nexus_client = cluster.grpc_client().nexus();
let history = poll_rebuild_history(&nexus_client, nexus_id.clone())
.await
.expect("Failed to get rebuild record");
assert_eq!(
nexus_id, history.uuid,
"Cant match nexus id in rebuild history"
);
assert_eq!(
1,
history.records.len(),
"Number of rebuild history is not equal to 1"
);

assert!(
!history.records.get(0).unwrap().is_partial,
"Rebuild type is not Full rebuild"
);

// Get the volume again for validations.
let vol = volume_api.get_volume(volid.uuid()).await.unwrap();
let volume_state = vol.state;
let nexus = volume_state.target.unwrap();
// The child must not be in the nexus anymore.
let is_removed = !nexus.children.iter().any(|c| c.uri == testrep.uri);
assert!(is_removed);
let history = api_client
.volumes_api()
.get_rebuild_history(&vol.spec.uuid)
.await
.expect("could not find rebuild history in rest");
assert_eq!(
nexus_id.to_string(),
history.target_uuid.to_string(),
"Cant match nexus id in rebuild history in rest"
);
assert_eq!(
1,
history.records.len(),
"Number of rebuild history is not equal to 1 in rest"
);

assert!(
!history.records.get(0).unwrap().is_partial,
"Rebuild type is not Full rebuild in rest"
);
}

/// Waits for the nexus to come online, returning Ok(()) if it does.
async fn wait_nexus_online(nexus_client: &impl NexusOperations, nexus: NexusId) -> Result<(), ()> {
let timeout = Duration::from_secs(REBUILD_WAIT_TIME);
6 changes: 3 additions & 3 deletions control-plane/agents/src/bin/core/tests/volume/snapshot.rs
@@ -332,8 +332,7 @@ async fn snapshot_timeout() {
cluster.composer().pause("core").await.unwrap();
cluster.composer().thaw(&cluster.node(1)).await.unwrap();
tokio::time::sleep(req_timeout - grpc_timeout).await;
cluster.composer().restart("core").await.unwrap();
cluster.volume_service_liveness(None).await.unwrap();
cluster.restart_core_with_liveness(None).await.unwrap();

let snapshots = vol_cli
.get_snapshots(Filter::Snapshot(snapshot_id.clone()), false, None, None)
@@ -355,7 +354,7 @@
.create_snapshot(&CreateVolumeSnapshot::new(volume.uuid(), snapshot_id), None)
.await
.unwrap();
tracing::info!("Replica Snapshot: {replica_snapshot:?}");
tracing::info!("Volume Snapshot: {snapshot:?}");

assert_eq!(snapshot.meta().txn_id().as_str(), "2");
assert_eq!(snapshot.meta().transactions().len(), 2);
@@ -402,6 +401,7 @@
.await
.unwrap();
let snapshot = &snapshots.entries[0];
tracing::info!("Snapshot: {snapshot:?}");

assert_eq!(snapshot.meta().txn_id().as_str(), "1");
assert!(snapshot.meta().transactions().is_empty());
7 changes: 7 additions & 0 deletions deployer/src/lib.rs
@@ -445,6 +445,13 @@ impl StartOptions {
self
}
#[must_use]
pub fn with_agents_env(mut self, key: &str, val: &str) -> Self {
let mut env = self.agents_env.unwrap_or_default();
env.push(KeyValue::new(key.to_string(), val.to_string()));
self.agents_env = Some(env);
self
}
#[must_use]
pub fn with_isolated_io_engine(mut self, isolate: bool) -> Self {
self.io_engine_isolate = isolate;
self
11 changes: 11 additions & 0 deletions utils/deployer-cluster/src/lib.rs
@@ -646,6 +646,17 @@ impl ClusterBuilder {
self.opts = set(self.opts);
self
}
/// Update the start options, if enabled.
#[must_use]
pub fn with_options_en<F>(mut self, enabled: bool, set: F) -> Self
where
F: Fn(StartOptions) -> StartOptions,
{
if enabled {
self.opts = set(self.opts);
}
self
}
/// Enable/Disable the default tokio tracing setup.
#[must_use]
pub fn with_default_tracing(self) -> Self {
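For context, the two new helpers compose as in the rebuild test above; a minimal usage sketch, with the other builder calls elided:

    // Inject an env var into the agents container only when the test
    // variant asks for it; otherwise the options pass through unchanged.
    let cluster = ClusterBuilder::builder()
        .with_options_en(partial_disable, |b| {
            b.with_agents_env("DISABLE_PARTIAL_REBUILD", "true")
        })
        .build()
        .await;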