Skip to content

Commit

Permalink
Try #887:
Browse files Browse the repository at this point in the history
  • Loading branch information
mayastor-bors committed Nov 25, 2024
2 parents dd21e4e + 83ac4b4 commit 1471be1
Show file tree
Hide file tree
Showing 29 changed files with 420 additions and 37 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,4 @@ __pycache__
/rpc/mayastor-api
/local-fio-0-verify.state
/report.xml
/.tmp
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ impl TaskPoller for PoolReconciler {
results.push(Self::squash_results(vec![
pool.garbage_collect(context).await,
pool.recreate_state(context).await,
retry_create_pool_reconciler(&mut pool, context).await,
]))
}
capacity::remove_larger_replicas(context.registry()).await;
Expand Down Expand Up @@ -214,3 +215,12 @@ async fn deleting_pool_spec_reconciler(
))
.await
}

#[tracing::instrument(skip(pool, context), level = "trace", fields(pool.id = %pool.id(), request.reconcile = true))]
async fn retry_create_pool_reconciler(
pool: &mut OperationGuardArc<PoolSpec>,
context: &PollContext,
) -> PollResult {
pool.retry_creating(context.registry()).await?;
Ok(PollerState::Idle)
}
11 changes: 11 additions & 0 deletions control-plane/agents/src/bin/core/controller/registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,11 @@ pub(crate) struct RegistryInner<S: Store> {
/// The duration for which the reconciler waits for the replica to
/// to be healthy again before attempting to online the faulted child.
faulted_child_wait_period: Option<std::time::Duration>,
/// When the pool creation gRPC times out, the actual call in the io-engine
/// may still progress.
/// We wait up to this period before considering the operation a failure and
/// GC'ing the pool.
pool_async_creat_tmo: std::time::Duration,
/// Disable partial rebuild for volume targets.
disable_partial_rebuild: bool,
/// Disable nvmf target access control gates.
Expand Down Expand Up @@ -122,6 +127,7 @@ impl Registry {
reconcile_period: std::time::Duration,
reconcile_idle_period: std::time::Duration,
faulted_child_wait_period: Option<std::time::Duration>,
pool_async_creat_tmo: std::time::Duration,
disable_partial_rebuild: bool,
disable_target_acc: bool,
max_rebuilds: Option<NumRebuilds>,
Expand Down Expand Up @@ -165,6 +171,7 @@ impl Registry {
reconcile_period,
reconcile_idle_period,
faulted_child_wait_period,
pool_async_creat_tmo,
disable_partial_rebuild,
disable_target_acc,
reconciler: ReconcilerControl::new(),
Expand Down Expand Up @@ -298,6 +305,10 @@ impl Registry {
pub(crate) fn faulted_child_wait_period(&self) -> Option<std::time::Duration> {
self.faulted_child_wait_period
}
/// Allow for this given time before assuming failure and allowing the pool to get deleted.
pub(crate) fn pool_async_creat_tmo(&self) -> std::time::Duration {
self.pool_async_creat_tmo
}
/// The maximum number of concurrent create volume requests.
pub(crate) fn create_volume_limit(&self) -> usize {
self.create_volume_limit
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,9 @@ enum SpecError {
}

/// What to do when creation fails.
#[derive(Debug)]
pub(crate) enum OnCreateFail {
/// Leave object as `Creating`, could allow for frontend retries.
#[allow(unused)]
LeaveAsIs,
/// When frontend retries don't make sense, set it to deleting so we can clean-up.
SetDeleting,
Expand All @@ -68,7 +68,28 @@ impl OnCreateFail {
pub(crate) fn eeinval_delete<O>(result: &Result<O, SvcError>) -> Self {
match result {
Err(error) if error.tonic_code() == tonic::Code::InvalidArgument => Self::Delete,
Err(error) if error.tonic_code() == tonic::Code::NotFound => Self::Delete,
_ => Self::SetDeleting,
}
}
/// Map errors into `Self` for pool creation requests, specifically.
pub(crate) fn on_pool_create_err<O>(result: &Result<O, SvcError>) -> Self {
let Err(ref error) = result else {
// nonsensical but that's how the api is today...
return Self::SetDeleting;
};
match error.tonic_code() {
// 1. the disk is open by another pool or bdev
// 2. the disk contains a pool with another name
tonic::Code::InvalidArgument => Self::Delete,
// 1. the pool disk is not available (ie not found or broken somehow)
tonic::Code::NotFound => Self::Delete,
// In this case, it's the pool operator's job to attempt re-creation of the pool.
// 1. pre-2.6 dataplane, contention on the pool service
// 2. pool disk is very slow or extremely large
// 3. dataplane core is shared with other processes
// TODO: use higher timeout on larger pool sizes or potentially make this
// an async operation.
tonic::Code::Cancelled => Self::LeaveAsIs,
_ => Self::SetDeleting,
}
}
Expand Down
8 changes: 8 additions & 0 deletions control-plane/agents/src/bin/core/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,13 @@ pub(crate) struct CliArgs {
#[clap(long)]
pub(crate) faulted_child_wait_period: Option<humantime::Duration>,

/// When the pool creation gRPC times out, the actual call in the io-engine
/// may still progress.
/// We wait up to this period before considering the operation a failure and
/// GC'ing the pool.
#[clap(long, default_value = "15m")]
pub(crate) pool_async_creat_tmo: humantime::Duration,

/// Disable partial rebuild for volume targets.
#[clap(long, env = "DISABLE_PARTIAL_REBUILD")]
pub(crate) disable_partial_rebuild: bool,
Expand Down Expand Up @@ -194,6 +201,7 @@ async fn server(cli_args: CliArgs) -> anyhow::Result<()> {
cli_args.reconcile_period.into(),
cli_args.reconcile_idle_period.into(),
cli_args.faulted_child_wait_period.map(|t| t.into()),
cli_args.pool_async_creat_tmo.into(),
cli_args.disable_partial_rebuild,
cli_args.disable_target_acc,
cli_args.max_rebuilds,
Expand Down
73 changes: 68 additions & 5 deletions control-plane/agents/src/bin/core/pool/operations_helper.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,77 @@
use crate::controller::{
registry::Registry,
resources::{operations::ResourceLifecycle, OperationGuardArc},
use crate::{
controller::{
io_engine::PoolApi,
registry::Registry,
resources::{
operations::ResourceLifecycle,
operations_helper::{GuardedOperationsHelper, OnCreateFail, SpecOperationsHelper},
OperationGuardArc,
},
},
node::wrapper::GetterOps,
};
use agents::{errors, errors::SvcError};
use snafu::OptionExt;
use std::ops::Deref;
use stor_port::types::v0::{
store::replica::{PoolRef, ReplicaSpec},
transport::{DestroyReplica, NodeId, ReplicaOwners},
store::{
pool::PoolSpec,
replica::{PoolRef, ReplicaSpec},
},
transport::{CreatePool, DestroyReplica, NodeId, ReplicaOwners},
};

impl OperationGuardArc<PoolSpec> {
/// Retries the creation of the pool which is being done in the background by the io-engine.
/// This may happen if the pool create gRPC times out, for very large pools.
/// We could increase the timeout but as things stand today that would block all gRPC
/// access to the node.
/// TODO: Since the data-plane now allows concurrent gRPC we should also modify the
/// control-plane to allow this, which would allows to set large timeouts for some gRPCs.
pub(crate) async fn retry_creating(&mut self, registry: &Registry) -> Result<(), SvcError> {
let request = {
let spec = self.lock();
if on_create_fail(&spec, registry).is_some() {
return Ok(());
}
CreatePool::from(spec.deref())
};

let node = registry.node_wrapper(&request.node).await?;
if node.pool(&request.id).await.is_none() {
return Ok(());
}

let _ = self.start_create(registry, &request).await?;
let result = node.create_pool(&request).await;
let _state = self
.complete_create(result, registry, OnCreateFail::LeaveAsIs)
.await?;

Ok(())
}

/// Ge the `OnCreateFail` policy.
/// For more information see [`Self::retry_creating`].
pub(crate) fn on_create_fail(&self, registry: &Registry) -> OnCreateFail {
let spec = self.lock();
on_create_fail(&spec, registry).unwrap_or(OnCreateFail::LeaveAsIs)
}
}

fn on_create_fail(pool: &PoolSpec, registry: &Registry) -> Option<OnCreateFail> {
if !pool.status().creating() {
return Some(OnCreateFail::LeaveAsIs);
}
let Some(last_mod_elapsed) = pool.creat_tsc.and_then(|t| t.elapsed().ok()) else {
return Some(OnCreateFail::SetDeleting);
};
if last_mod_elapsed > registry.pool_async_creat_tmo() {
return Some(OnCreateFail::SetDeleting);
}
None
}

impl OperationGuardArc<ReplicaSpec> {
/// Destroy the replica from its volume
pub(crate) async fn destroy_volume_replica(
Expand Down
16 changes: 6 additions & 10 deletions control-plane/agents/src/bin/core/pool/pool_operations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,7 @@ impl ResourceLifecycle for OperationGuardArc<PoolSpec> {
let _ = pool.start_create(registry, request).await?;

let result = node.create_pool(request).await;
let on_fail = OnCreateFail::eeinval_delete(&result);

let on_fail = OnCreateFail::on_pool_create_err(&result);
let state = pool.complete_create(result, registry, on_fail).await?;
let spec = pool.lock().clone();
Ok(Pool::new(spec, Some(CtrlPoolState::new(state))))
Expand All @@ -93,14 +92,11 @@ impl ResourceLifecycle for OperationGuardArc<PoolSpec> {
Err(SvcError::PoolNotFound { .. }) => {
match node.import_pool(&self.as_ref().into()).await {
Ok(_) => node.destroy_pool(request).await,
Err(error)
if allow_not_found
&& error.tonic_code() == tonic::Code::InvalidArgument =>
{
Ok(())
}
Err(error) if error.tonic_code() == tonic::Code::InvalidArgument => Ok(()),
Err(error) => Err(error),
Err(error) => match error.tonic_code() {
tonic::Code::NotFound if allow_not_found => Ok(()),
tonic::Code::InvalidArgument => Ok(()),
_other => Err(error),
},
}
}
Err(error) => Err(error),
Expand Down
4 changes: 3 additions & 1 deletion control-plane/agents/src/bin/core/pool/specs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -425,7 +425,9 @@ impl ResourceSpecsLocked {
let pools = self.pools_rsc();
for pool in pools {
if let Ok(mut guard) = pool.operation_guard() {
if !guard.handle_incomplete_ops(registry).await {
let on_fail = guard.on_create_fail(registry);

if !guard.handle_incomplete_ops_ext(registry, on_fail).await {
// Not all pending operations could be handled.
pending_ops = true;
}
Expand Down
1 change: 1 addition & 0 deletions control-plane/agents/src/bin/core/tests/deserializer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ fn test_deserialization_v1_to_v2() {
},
sequencer: Default::default(),
operation: None,
creat_tsc: None,
}),
},
TestEntry {
Expand Down
86 changes: 85 additions & 1 deletion control-plane/agents/src/bin/core/tests/pool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,10 @@ use stor_port::{
VolumePolicy,
},
},
store::replica::{ReplicaSpec, ReplicaSpecKey},
store::{
pool::PoolLabel,
replica::{ReplicaSpec, ReplicaSpecKey},
},
transport::{
CreatePool, CreateReplica, DestroyPool, DestroyReplica, Filter, GetSpecs, NexusId,
NodeId, Protocol, Replica, ReplicaId, ReplicaName, ReplicaOwners, ReplicaShareProtocol,
Expand Down Expand Up @@ -1027,3 +1030,84 @@ async fn destroy_after_restart() {

assert_eq!(pool.state().unwrap().id, create.id);
}

#[tokio::test]
async fn slow_create() {
const POOL_SIZE_BYTES: u64 = 128 * 1024 * 1024;

let vg = deployer_cluster::lvm::VolGroup::new("slow-pool", POOL_SIZE_BYTES).unwrap();
let lvol = vg.create_lvol("lvol0", POOL_SIZE_BYTES / 2).unwrap();
lvol.suspend().unwrap();
{
let cluster = ClusterBuilder::builder()
.with_io_engines(1)
.with_reconcile_period(Duration::from_millis(250), Duration::from_millis(250))
.with_cache_period("200ms")
.with_options(|o| o.with_io_engine_devices(vec![lvol.path()]))
.with_req_timeouts(Duration::from_millis(500), Duration::from_millis(500))
.compose_build(|b| b.with_clean(true))
.await
.unwrap();

let client = cluster.grpc_client();

let create = CreatePool {
node: cluster.node(0),
id: "bob".into(),
disks: vec![lvol.path().into()],
labels: Some(PoolLabel::from([("a".into(), "b".into())])),
};

let error = client
.pool()
.create(&create, None)
.await
.expect_err("device suspended");
assert_eq!(error.kind, ReplyErrorKind::Cancelled);

lvol.resume().unwrap();

let start = std::time::Instant::now();
let timeout = Duration::from_secs(30);
loop {
if std::time::Instant::now() > (start + timeout) {
panic!("Timeout waiting for the pool");
}
tokio::time::sleep(Duration::from_millis(100)).await;

let pools = client
.pool()
.get(Filter::Pool(create.id.clone()), None)
.await
.unwrap();

let Some(pool) = pools.0.first() else {
continue;
};
let Some(pool_spec) = pool.spec() else {
continue;
};
if !pool_spec.status.created() {
continue;
}
break;
}
let destroy = DestroyPool::from(create.clone());
client.pool().destroy(&destroy, None).await.unwrap();

// Now we try to recreate using an API call, rather than using the reconciler
lvol.suspend().unwrap();

let error = client
.pool()
.create(&create, None)
.await
.expect_err("device suspended");
assert_eq!(error.kind, ReplyErrorKind::Cancelled);

lvol.resume().unwrap();

let pool = client.pool().create(&create, None).await.unwrap();
assert!(pool.spec().unwrap().status.created());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ struct DeviceDisconnect(nvmeadm::NvmeTarget);
impl Drop for DeviceDisconnect {
fn drop(&mut self) {
if self.0.disconnect().is_err() {
std::process::Command::new("sudo")
std::process::Command::new(env!("SUDO"))
.args(["nvme", "disconnect-all"])
.status()
.unwrap();
Expand Down
2 changes: 1 addition & 1 deletion control-plane/agents/src/bin/core/tests/volume/capacity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ struct DeviceDisconnect(nvmeadm::NvmeTarget);
impl Drop for DeviceDisconnect {
fn drop(&mut self) {
if self.0.disconnect().is_err() {
std::process::Command::new("sudo")
std::process::Command::new(env!("SUDO"))
.args(["nvme", "disconnect-all"])
.status()
.unwrap();
Expand Down
2 changes: 1 addition & 1 deletion control-plane/agents/src/common/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1124,7 +1124,7 @@ fn grpc_to_reply_error(error: SvcError) -> ReplyError {
} => {
let kind = match source.code() {
Code::Ok => ReplyErrorKind::Internal,
Code::Cancelled => ReplyErrorKind::Internal,
Code::Cancelled => ReplyErrorKind::Cancelled,
Code::Unknown => ReplyErrorKind::Internal,
Code::InvalidArgument => ReplyErrorKind::InvalidArgument,
Code::DeadlineExceeded => ReplyErrorKind::DeadlineExceeded,
Expand Down
1 change: 1 addition & 0 deletions control-plane/grpc/proto/v1/misc/common.proto
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ enum ReplyErrorKind {
InUse = 29;
CapacityLimitExceeded = 30;
NotAcceptable = 31;
Cancelled = 32;
}

// ResourceKind for the resource which has undergone this error
Expand Down
Loading

0 comments on commit 1471be1

Please sign in to comment.