fix: allow pool creation to complete asynchronously
When the initial create gRPC call times out, the data-plane may still be creating
the pool in the background, which can happen for very large pools.
Rather than assume failure, we allow the creation to complete in the background,
up to a large (but bounded) amount of time. If the pool creation completes within
that window, we retry the creation flow.
We don't simply use very large gRPC timeouts because the gRPC operations are
currently sequential, mostly for historical reasons.
Now that the data-plane allows concurrent calls, we should allow the same on the
control-plane.
TODO: allow concurrent node operations

Signed-off-by: Tiago Castro <tiagolobocastro@gmail.com>
tiagolobocastro committed Nov 21, 2024
1 parent 8c2b226 commit 2c9e066
Showing 16 changed files with 157 additions and 12 deletions.
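In rough terms, the new behaviour boils down to a small decision rule: while a pool spec is still `Creating`, the reconciler keeps retrying the create for up to a grace period (the new `pool_async_creat_tmo` setting) before giving up and letting garbage collection remove the pool. The sketch below is illustrative only; `RetryAction` and `after_create_timeout` are invented names for this example, not types from the repository.

```rust
use std::time::{Duration, SystemTime};

/// Illustrative outcome after a pool-create gRPC timeout (not a real repo type).
#[derive(Debug, PartialEq)]
enum RetryAction {
    /// Still within the grace window: leave the spec as `Creating` and retry the create.
    KeepRetrying,
    /// Grace window exceeded (or no creation timestamp recorded): mark the pool for deletion/GC.
    GiveUp,
}

/// Simplified grace-window check: `creat_tsc` is when the first create attempt started,
/// `grace` plays the role of the new `pool_async_creat_tmo` setting (10m by default).
fn after_create_timeout(creat_tsc: Option<SystemTime>, grace: Duration) -> RetryAction {
    match creat_tsc.and_then(|t| t.elapsed().ok()) {
        Some(elapsed) if elapsed <= grace => RetryAction::KeepRetrying,
        _ => RetryAction::GiveUp,
    }
}

fn main() {
    // A create that started one minute ago is still within the default 10-minute window.
    let started = SystemTime::now() - Duration::from_secs(60);
    assert_eq!(
        after_create_timeout(Some(started), Duration::from_secs(600)),
        RetryAction::KeepRetrying
    );
}
```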
@@ -53,6 +53,7 @@ impl TaskPoller for PoolReconciler {
results.push(Self::squash_results(vec![
pool.garbage_collect(context).await,
pool.recreate_state(context).await,
retry_create_pool_reconciler(&mut pool, context).await,
]))
}
capacity::remove_larger_replicas(context.registry()).await;
@@ -214,3 +215,12 @@ async fn deleting_pool_spec_reconciler(
))
.await
}

#[tracing::instrument(skip(pool, context), level = "trace", fields(pool.id = %pool.id(), request.reconcile = true))]
async fn retry_create_pool_reconciler(
pool: &mut OperationGuardArc<PoolSpec>,
context: &PollContext,
) -> PollResult {
pool.retry_creating(context.registry()).await?;
Ok(PollerState::Idle)
}
11 changes: 11 additions & 0 deletions control-plane/agents/src/bin/core/controller/registry.rs
@@ -86,6 +86,11 @@ pub(crate) struct RegistryInner<S: Store> {
/// The duration for which the reconciler waits for the replica to
/// to be healthy again before attempting to online the faulted child.
faulted_child_wait_period: Option<std::time::Duration>,
/// When the pool creation gRPC times out, the actual call in the io-engine
/// may still progress.
/// We wait up to this period before considering the operation a failure and
/// GC'ing the pool.
pool_async_creat_tmo: std::time::Duration,
/// Disable partial rebuild for volume targets.
disable_partial_rebuild: bool,
/// Disable nvmf target access control gates.
@@ -122,6 +127,7 @@ impl Registry {
reconcile_period: std::time::Duration,
reconcile_idle_period: std::time::Duration,
faulted_child_wait_period: Option<std::time::Duration>,
pool_async_creat_tmo: std::time::Duration,
disable_partial_rebuild: bool,
disable_target_acc: bool,
max_rebuilds: Option<NumRebuilds>,
@@ -165,6 +171,7 @@ impl Registry {
reconcile_period,
reconcile_idle_period,
faulted_child_wait_period,
pool_async_creat_tmo,
disable_partial_rebuild,
disable_target_acc,
reconciler: ReconcilerControl::new(),
@@ -298,6 +305,10 @@ impl Registry {
pub(crate) fn faulted_child_wait_period(&self) -> Option<std::time::Duration> {
self.faulted_child_wait_period
}
/// How long to wait before assuming the creation failed and allowing the pool to get deleted.
pub(crate) fn pool_async_creat_tmo(&self) -> std::time::Duration {
self.pool_async_creat_tmo
}
/// The maximum number of concurrent create volume requests.
pub(crate) fn create_volume_limit(&self) -> usize {
self.create_volume_limit
@@ -51,9 +51,9 @@ enum SpecError {
}

/// What to do when creation fails.
#[derive(Debug)]
pub(crate) enum OnCreateFail {
/// Leave object as `Creating`, could allow for frontend retries.
#[allow(unused)]
LeaveAsIs,
/// When frontend retries don't make sense, set it to deleting so we can clean up.
SetDeleting,
@@ -68,7 +68,28 @@ impl OnCreateFail {
pub(crate) fn eeinval_delete<O>(result: &Result<O, SvcError>) -> Self {
match result {
Err(error) if error.tonic_code() == tonic::Code::InvalidArgument => Self::Delete,
Err(error) if error.tonic_code() == tonic::Code::NotFound => Self::Delete,
_ => Self::SetDeleting,
}
}
/// Map errors into `Self` for pool creation requests, specifically.
pub(crate) fn on_pool_create_err<O>(result: &Result<O, SvcError>) -> Self {
let Err(ref error) = result else {
// nonsensical but that's how the api is today...
return Self::SetDeleting;
};
match error.tonic_code() {
// 1. the disk is open by another pool or bdev
// 2. the disk contains a pool with another name
tonic::Code::InvalidArgument => Self::Delete,
// 1. the pool disk is not available (ie not found or broken somehow)
tonic::Code::NotFound => Self::Delete,
// In this case, it's the pool operator's job to attempt re-creation of the pool.
// 1. pre-2.6 dataplane, contention on the pool service
// 2. pool disk is very slow or extremely large
// 3. dataplane core is shared with other processes
// TODO: use higher timeout on larger pool sizes or potentially make this
// an async operation.
tonic::Code::Cancelled => Self::LeaveAsIs,
_ => Self::SetDeleting,
}
}
8 changes: 8 additions & 0 deletions control-plane/agents/src/bin/core/main.rs
@@ -47,6 +47,13 @@ pub(crate) struct CliArgs {
#[clap(long)]
pub(crate) faulted_child_wait_period: Option<humantime::Duration>,

/// When the pool creation gRPC times out, the actual call in the io-engine
/// may still progress.
/// We wait up to this period before considering the operation a failure and
/// GC'ing the pool.
#[clap(long, default_value = "10m")]
pub(crate) pool_async_creat_tmo: humantime::Duration,

/// Disable partial rebuild for volume targets.
#[clap(long, env = "DISABLE_PARTIAL_REBUILD")]
pub(crate) disable_partial_rebuild: bool,
@@ -194,6 +201,7 @@ async fn server(cli_args: CliArgs) -> anyhow::Result<()> {
cli_args.reconcile_period.into(),
cli_args.reconcile_idle_period.into(),
cli_args.faulted_child_wait_period.map(|t| t.into()),
cli_args.pool_async_creat_tmo.into(),
cli_args.disable_partial_rebuild,
cli_args.disable_target_acc,
cli_args.max_rebuilds,
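For reference, the new `--pool-async-creat-tmo` flag is parsed as a `humantime::Duration` (default `10m`) and converted into the `std::time::Duration` handed to `Registry::new`. A minimal sketch of that conversion, assuming only the `humantime` crate the CLI already uses:

```rust
fn main() {
    // "10m" is the declared default for --pool-async-creat-tmo.
    let tmo: std::time::Duration =
        humantime::parse_duration("10m").expect("valid humantime duration");
    assert_eq!(tmo, std::time::Duration::from_secs(600));
}
```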
73 changes: 68 additions & 5 deletions control-plane/agents/src/bin/core/pool/operations_helper.rs
@@ -1,14 +1,77 @@
use crate::controller::{
registry::Registry,
resources::{operations::ResourceLifecycle, OperationGuardArc},
use crate::{
controller::{
io_engine::PoolApi,
registry::Registry,
resources::{
operations::ResourceLifecycle,
operations_helper::{GuardedOperationsHelper, OnCreateFail, SpecOperationsHelper},
OperationGuardArc,
},
},
node::wrapper::GetterOps,
};
use agents::{errors, errors::SvcError};
use snafu::OptionExt;
use std::ops::Deref;
use stor_port::types::v0::{
store::replica::{PoolRef, ReplicaSpec},
transport::{DestroyReplica, NodeId, ReplicaOwners},
store::{
pool::PoolSpec,
replica::{PoolRef, ReplicaSpec},
},
transport::{CreatePool, DestroyReplica, NodeId, ReplicaOwners},
};

impl OperationGuardArc<PoolSpec> {
/// Retries the creation of the pool which is being done in the background by the io-engine.
/// This may happen if the pool create gRPC times out, for very large pools.
/// We could increase the timeout but as things stand today that would block all gRPC
/// access to the node.
/// TODO: Since the data-plane now allows concurrent gRPC calls, we should also modify the
/// control-plane to allow this, which would let us set larger timeouts for some gRPCs.
pub(crate) async fn retry_creating(&mut self, registry: &Registry) -> Result<(), SvcError> {
let request = {
let spec = self.lock();
if on_create_fail(&spec, registry).is_some() {
return Ok(());
}
CreatePool::from(spec.deref())
};

let node = registry.node_wrapper(&request.node).await?;
if node.pool(&request.id).await.is_none() {
return Ok(());
}

let _ = self.start_create(registry, &request).await?;
let result = node.create_pool(&request).await;
let _state = self
.complete_create(result, registry, OnCreateFail::LeaveAsIs)
.await?;

Ok(())
}

/// Get the `OnCreateFail` policy.
/// For more information see [`Self::retry_creating`].
pub(crate) fn on_create_fail(&self, registry: &Registry) -> OnCreateFail {
let spec = self.lock();
on_create_fail(&spec, registry).unwrap_or(OnCreateFail::LeaveAsIs)
}
}

fn on_create_fail(pool: &PoolSpec, registry: &Registry) -> Option<OnCreateFail> {
if !pool.status().creating() {
return Some(OnCreateFail::LeaveAsIs);
}
let Some(last_mod_elapsed) = pool.creat_tsc.and_then(|t| t.elapsed().ok()) else {
return Some(OnCreateFail::SetDeleting);
};
if last_mod_elapsed > registry.pool_async_creat_tmo() {
return Some(OnCreateFail::SetDeleting);
}
None
}

impl OperationGuardArc<ReplicaSpec> {
/// Destroy the replica from its volume
pub(crate) async fn destroy_volume_replica(
3 changes: 1 addition & 2 deletions control-plane/agents/src/bin/core/pool/pool_operations.rs
@@ -65,8 +65,7 @@ impl ResourceLifecycle for OperationGuardArc<PoolSpec> {
let _ = pool.start_create(registry, request).await?;

let result = node.create_pool(request).await;
let on_fail = OnCreateFail::eeinval_delete(&result);

let on_fail = OnCreateFail::on_pool_create_err(&result);
let state = pool.complete_create(result, registry, on_fail).await?;
let spec = pool.lock().clone();
Ok(Pool::new(spec, Some(CtrlPoolState::new(state))))
4 changes: 3 additions & 1 deletion control-plane/agents/src/bin/core/pool/specs.rs
@@ -425,7 +425,9 @@ impl ResourceSpecsLocked {
let pools = self.pools_rsc();
for pool in pools {
if let Ok(mut guard) = pool.operation_guard() {
if !guard.handle_incomplete_ops(registry).await {
let on_fail = guard.on_create_fail(registry);

if !guard.handle_incomplete_ops_ext(registry, on_fail).await {
// Not all pending operations could be handled.
pending_ops = true;
}
1 change: 1 addition & 0 deletions control-plane/agents/src/bin/core/tests/deserializer.rs
@@ -152,6 +152,7 @@ fn test_deserialization_v1_to_v2() {
},
sequencer: Default::default(),
operation: None,
creat_tsc: None,
}),
},
TestEntry {
2 changes: 1 addition & 1 deletion control-plane/agents/src/common/errors.rs
@@ -1124,7 +1124,7 @@ fn grpc_to_reply_error(error: SvcError) -> ReplyError {
} => {
let kind = match source.code() {
Code::Ok => ReplyErrorKind::Internal,
Code::Cancelled => ReplyErrorKind::Internal,
Code::Cancelled => ReplyErrorKind::Cancelled,
Code::Unknown => ReplyErrorKind::Internal,
Code::InvalidArgument => ReplyErrorKind::InvalidArgument,
Code::DeadlineExceeded => ReplyErrorKind::DeadlineExceeded,
1 change: 1 addition & 0 deletions control-plane/grpc/proto/v1/misc/common.proto
@@ -69,6 +69,7 @@ enum ReplyErrorKind {
InUse = 29;
CapacityLimitExceeded = 30;
NotAcceptable = 31;
Cancelled = 32;
}

// ResourceKind for the resource which has undergone this error
2 changes: 2 additions & 0 deletions control-plane/grpc/src/misc/traits.rs
@@ -102,6 +102,7 @@ impl From<ReplyErrorKind> for common::ReplyErrorKind {
ReplyErrorKind::InUse => Self::InUse,
ReplyErrorKind::CapacityLimitExceeded => Self::CapacityLimitExceeded,
ReplyErrorKind::NotAcceptable => Self::NotAcceptable,
ReplyErrorKind::Cancelled => Self::Cancelled,
}
}
}
@@ -141,6 +142,7 @@ impl From<common::ReplyErrorKind> for ReplyErrorKind {
common::ReplyErrorKind::InUse => Self::InUse,
common::ReplyErrorKind::CapacityLimitExceeded => Self::CapacityLimitExceeded,
common::ReplyErrorKind::NotAcceptable => Self::NotAcceptable,
common::ReplyErrorKind::Cancelled => Self::Cancelled,
}
}
}
1 change: 1 addition & 0 deletions control-plane/grpc/src/operations/pool/traits.rs
@@ -97,6 +97,7 @@ impl TryFrom<pool::PoolDefinition> for PoolSpec {
},
sequencer: Default::default(),
operation: None,
creat_tsc: None,
})
}
}
1 change: 1 addition & 0 deletions control-plane/rest/openapi-specs/v0_api_spec.yaml
@@ -3268,6 +3268,7 @@ components:
- InUse
- CapacityLimitExceeded
- NotAcceptable
- Cancelled
required:
- details
- kind
5 changes: 4 additions & 1 deletion control-plane/stor-port/src/transport_api/mod.rs
@@ -444,11 +444,14 @@ pub enum ReplyErrorKind {
InUse,
CapacityLimitExceeded,
NotAcceptable,
Cancelled,
}

impl From<tonic::Code> for ReplyErrorKind {
fn from(code: tonic::Code) -> Self {
match code {
Code::Ok => Self::Internal,
Code::Unknown => Self::Internal,
Code::InvalidArgument => Self::InvalidArgument,
Code::DeadlineExceeded => Self::DeadlineExceeded,
Code::NotFound => Self::NotFound,
@@ -463,7 +466,7 @@ impl From<tonic::Code> for ReplyErrorKind {
Code::Unavailable => Self::Unavailable,
Code::DataLoss => Self::FailedPersist,
Code::Unauthenticated => Self::Unauthenticated,
_ => Self::Aborted,
Code::Cancelled => Self::Cancelled,
}
}
}
4 changes: 4 additions & 0 deletions control-plane/stor-port/src/types/mod.rs
@@ -142,6 +142,10 @@ impl From<ReplyError> for RestError<RestJsonError> {
let error = RestJsonError::new(details, message, Kind::NotAcceptable);
(StatusCode::NOT_ACCEPTABLE, error)
}
ReplyErrorKind::Cancelled => {
let error = RestJsonError::new(details, message, Kind::Cancelled);
(StatusCode::GATEWAY_TIMEOUT, error)
}
};

RestError::new(status, error)
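Taken together, the error-mapping changes mean a pool create that is cancelled at the gRPC layer now surfaces as `ReplyErrorKind::Cancelled` and, at the REST layer, as `504 Gateway Timeout` rather than a generic internal/aborted error. The snippet below is a simplified, self-contained illustration of that chain; the enum and helpers are stand-ins rather than the repository's real types, and it assumes the `tonic` crate for the status codes.

```rust
/// Simplified stand-in for stor-port's `ReplyErrorKind`.
#[derive(Debug, PartialEq)]
enum ReplyErrorKind {
    Internal,
    DeadlineExceeded,
    Cancelled,
}

fn kind_from_code(code: tonic::Code) -> ReplyErrorKind {
    match code {
        tonic::Code::DeadlineExceeded => ReplyErrorKind::DeadlineExceeded,
        // New in this commit: Cancelled is no longer folded into a generic kind.
        tonic::Code::Cancelled => ReplyErrorKind::Cancelled,
        _ => ReplyErrorKind::Internal,
    }
}

fn http_status(kind: &ReplyErrorKind) -> u16 {
    match kind {
        // Mirrors the new REST mapping: Cancelled -> 504 Gateway Timeout.
        ReplyErrorKind::Cancelled => 504,
        // Everything else collapsed to 500 purely for this illustration.
        _ => 500,
    }
}

fn main() {
    let kind = kind_from_code(tonic::Code::Cancelled);
    assert_eq!(kind, ReplyErrorKind::Cancelled);
    assert_eq!(http_status(&kind), 504);
}
```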
18 changes: 18 additions & 0 deletions control-plane/stor-port/src/types/v0/store/pool.rs
@@ -53,6 +53,17 @@ impl From<&CreatePool> for PoolSpec {
labels: request.labels.clone(),
sequencer: OperationSequence::new(),
operation: None,
creat_tsc: None,
}
}
}
impl From<&PoolSpec> for CreatePool {
fn from(pool: &PoolSpec) -> Self {
Self {
node: pool.node.clone(),
id: pool.id.clone(),
disks: pool.disks.clone(),
labels: pool.labels.clone(),
}
}
}
@@ -61,6 +72,7 @@ impl PartialEq<CreatePool> for PoolSpec {
let mut other = PoolSpec::from(other);
other.status = self.status.clone();
other.sequencer = self.sequencer.clone();
other.creat_tsc = self.creat_tsc;
&other == self
}
}
@@ -83,6 +95,9 @@ pub struct PoolSpec {
pub sequencer: OperationSequence,
/// Record of the operation in progress
pub operation: Option<PoolOperationState>,
/// Timestamp of when the pool creation was first attempted.
#[serde(skip)]
pub creat_tsc: Option<std::time::SystemTime>,
}

impl PoolSpec {
@@ -206,6 +221,9 @@ impl SpecTransaction<PoolOperation> for PoolSpec {
}

fn start_op(&mut self, operation: PoolOperation) {
if matches!(operation, PoolOperation::Create) && self.creat_tsc.is_none() {
self.creat_tsc = Some(std::time::SystemTime::now());
}
self.operation = Some(PoolOperationState {
operation,
result: None,
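One subtle point in the `PoolSpec` change above: `creat_tsc` is only recorded on the first `Create` operation, so later retries do not reset the grace window. A reduced sketch of that guard, with the spec type simplified down to the one field involved:

```rust
use std::time::SystemTime;

#[allow(dead_code)]
enum PoolOperation {
    Create,
    Destroy,
}

#[derive(Default)]
struct Spec {
    creat_tsc: Option<SystemTime>,
}

impl Spec {
    fn start_op(&mut self, operation: &PoolOperation) {
        // Record the timestamp only once, on the first create attempt,
        // so retrying the create does not extend the grace window.
        if matches!(operation, PoolOperation::Create) && self.creat_tsc.is_none() {
            self.creat_tsc = Some(SystemTime::now());
        }
    }
}

fn main() {
    let mut spec = Spec::default();
    spec.start_op(&PoolOperation::Create);
    let first = spec.creat_tsc;
    spec.start_op(&PoolOperation::Create); // a retried create
    assert_eq!(first, spec.creat_tsc); // unchanged: the grace window is not reset
}
```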
