diff --git a/.gitignore b/.gitignore index 84f75f546..5bf5967f4 100644 --- a/.gitignore +++ b/.gitignore @@ -24,3 +24,4 @@ __pycache__ /.rust-toolchain /rpc/mayastor-api /local-fio-0-verify.state +/.tmp diff --git a/control-plane/agents/src/bin/core/controller/reconciler/pool/mod.rs b/control-plane/agents/src/bin/core/controller/reconciler/pool/mod.rs index 73a6bbfe2..b2572e3d4 100644 --- a/control-plane/agents/src/bin/core/controller/reconciler/pool/mod.rs +++ b/control-plane/agents/src/bin/core/controller/reconciler/pool/mod.rs @@ -53,6 +53,7 @@ impl TaskPoller for PoolReconciler { results.push(Self::squash_results(vec![ pool.garbage_collect(context).await, pool.recreate_state(context).await, + retry_create_pool_reconciler(&mut pool, context).await, ])) } capacity::remove_larger_replicas(context.registry()).await; @@ -214,3 +215,12 @@ async fn deleting_pool_spec_reconciler( )) .await } + +#[tracing::instrument(skip(pool, context), level = "trace", fields(pool.id = %pool.id(), request.reconcile = true))] +async fn retry_create_pool_reconciler( + pool: &mut OperationGuardArc, + context: &PollContext, +) -> PollResult { + pool.retry_creating(context.registry()).await?; + Ok(PollerState::Idle) +} diff --git a/control-plane/agents/src/bin/core/controller/registry.rs b/control-plane/agents/src/bin/core/controller/registry.rs index 83dea07ea..41e3db591 100644 --- a/control-plane/agents/src/bin/core/controller/registry.rs +++ b/control-plane/agents/src/bin/core/controller/registry.rs @@ -86,6 +86,11 @@ pub(crate) struct RegistryInner { /// The duration for which the reconciler waits for the replica to /// to be healthy again before attempting to online the faulted child. faulted_child_wait_period: Option, + /// When the pool creation gRPC times out, the actual call in the io-engine + /// may still progress. + /// We wait up to this period before considering the operation a failure and + /// GC'ing the pool. + pool_async_creat_tmo: std::time::Duration, /// Disable partial rebuild for volume targets. disable_partial_rebuild: bool, /// Disable nvmf target access control gates. @@ -122,6 +127,7 @@ impl Registry { reconcile_period: std::time::Duration, reconcile_idle_period: std::time::Duration, faulted_child_wait_period: Option, + pool_async_creat_tmo: std::time::Duration, disable_partial_rebuild: bool, disable_target_acc: bool, max_rebuilds: Option, @@ -165,6 +171,7 @@ impl Registry { reconcile_period, reconcile_idle_period, faulted_child_wait_period, + pool_async_creat_tmo, disable_partial_rebuild, disable_target_acc, reconciler: ReconcilerControl::new(), @@ -298,6 +305,10 @@ impl Registry { pub(crate) fn faulted_child_wait_period(&self) -> Option { self.faulted_child_wait_period } + /// Allow for this given time before assuming failure and allowing the pool to get deleted. + pub(crate) fn pool_async_creat_tmo(&self) -> std::time::Duration { + self.pool_async_creat_tmo + } /// The maximum number of concurrent create volume requests. pub(crate) fn create_volume_limit(&self) -> usize { self.create_volume_limit diff --git a/control-plane/agents/src/bin/core/controller/resources/operations_helper.rs b/control-plane/agents/src/bin/core/controller/resources/operations_helper.rs index 2fdd7a0d6..0a366d840 100644 --- a/control-plane/agents/src/bin/core/controller/resources/operations_helper.rs +++ b/control-plane/agents/src/bin/core/controller/resources/operations_helper.rs @@ -54,9 +54,9 @@ enum SpecError { } /// What to do when creation fails. 
+#[derive(Debug)] pub(crate) enum OnCreateFail { /// Leave object as `Creating`, could allow for frontend retries. - #[allow(unused)] LeaveAsIs, /// When frontend retries don't make sense, set it to deleting so we can clean-up. SetDeleting, @@ -71,7 +71,28 @@ impl OnCreateFail { pub(crate) fn eeinval_delete(result: &Result) -> Self { match result { Err(error) if error.tonic_code() == tonic::Code::InvalidArgument => Self::Delete, - Err(error) if error.tonic_code() == tonic::Code::NotFound => Self::Delete, + _ => Self::SetDeleting, + } + } + /// Map errors into `Self` for pool creation requests, specifically. + pub(crate) fn on_pool_create_err(result: &Result) -> Self { + let Err(ref error) = result else { + // nonsensical but that's how the api is today... + return Self::SetDeleting; + }; + match error.tonic_code() { + // 1. the disk is open by another pool or bdev + // 2. the disk contains a pool with another name + tonic::Code::InvalidArgument => Self::Delete, + // 1. the pool disk is not available (ie not found or broken somehow) + tonic::Code::NotFound => Self::Delete, + // In this case, it's the pool operator's job to attempt re-creation of the pool. + // 1. pre-2.6 dataplane, contention on the pool service + // 2. pool disk is very slow or extremely large + // 3. dataplane core is shared with other processes + // TODO: use higher timeout on larger pool sizes or potentially make this + // an async operation. + tonic::Code::Cancelled => Self::LeaveAsIs, _ => Self::SetDeleting, } } diff --git a/control-plane/agents/src/bin/core/main.rs b/control-plane/agents/src/bin/core/main.rs index 2da31a078..3d7dac5b2 100644 --- a/control-plane/agents/src/bin/core/main.rs +++ b/control-plane/agents/src/bin/core/main.rs @@ -47,6 +47,13 @@ pub(crate) struct CliArgs { #[clap(long)] pub(crate) faulted_child_wait_period: Option, + /// When the pool creation gRPC times out, the actual call in the io-engine + /// may still progress. + /// We wait up to this period before considering the operation a failure and + /// GC'ing the pool. + #[clap(long, default_value = "15m")] + pub(crate) pool_async_creat_tmo: humantime::Duration, + /// Disable partial rebuild for volume targets. 
#[clap(long, env = "DISABLE_PARTIAL_REBUILD")] pub(crate) disable_partial_rebuild: bool, @@ -194,6 +201,7 @@ async fn server(cli_args: CliArgs) -> anyhow::Result<()> { cli_args.reconcile_period.into(), cli_args.reconcile_idle_period.into(), cli_args.faulted_child_wait_period.map(|t| t.into()), + cli_args.pool_async_creat_tmo.into(), cli_args.disable_partial_rebuild, cli_args.disable_target_acc, cli_args.max_rebuilds, diff --git a/control-plane/agents/src/bin/core/pool/operations_helper.rs b/control-plane/agents/src/bin/core/pool/operations_helper.rs index ce02ba3ae..0ed7152bf 100644 --- a/control-plane/agents/src/bin/core/pool/operations_helper.rs +++ b/control-plane/agents/src/bin/core/pool/operations_helper.rs @@ -1,14 +1,77 @@ -use crate::controller::{ - registry::Registry, - resources::{operations::ResourceLifecycle, OperationGuardArc}, +use crate::{ + controller::{ + io_engine::PoolApi, + registry::Registry, + resources::{ + operations::ResourceLifecycle, + operations_helper::{GuardedOperationsHelper, OnCreateFail, SpecOperationsHelper}, + OperationGuardArc, + }, + }, + node::wrapper::GetterOps, }; use agents::{errors, errors::SvcError}; use snafu::OptionExt; +use std::ops::Deref; use stor_port::types::v0::{ - store::replica::{PoolRef, ReplicaSpec}, - transport::{DestroyReplica, NodeId, ReplicaOwners}, + store::{ + pool::PoolSpec, + replica::{PoolRef, ReplicaSpec}, + }, + transport::{CreatePool, DestroyReplica, NodeId, ReplicaOwners}, }; +impl OperationGuardArc { + /// Retries the creation of the pool which is being done in the background by the io-engine. + /// This may happen if the pool create gRPC times out, for very large pools. + /// We could increase the timeout but as things stand today that would block all gRPC + /// access to the node. + /// TODO: Since the data-plane now allows concurrent gRPC we should also modify the + /// control-plane to allow this, which would allows to set large timeouts for some gRPCs. + pub(crate) async fn retry_creating(&mut self, registry: &Registry) -> Result<(), SvcError> { + let request = { + let spec = self.lock(); + if on_create_fail(&spec, registry).is_some() { + return Ok(()); + } + CreatePool::from(spec.deref()) + }; + + let node = registry.node_wrapper(&request.node).await?; + if node.pool(&request.id).await.is_none() { + return Ok(()); + } + + let _ = self.start_create(registry, &request).await?; + let result = node.create_pool(&request).await; + let _state = self + .complete_create(result, registry, OnCreateFail::LeaveAsIs) + .await?; + + Ok(()) + } + + /// Ge the `OnCreateFail` policy. + /// For more information see [`Self::retry_creating`]. 
+ pub(crate) fn on_create_fail(&self, registry: &Registry) -> OnCreateFail { + let spec = self.lock(); + on_create_fail(&spec, registry).unwrap_or(OnCreateFail::LeaveAsIs) + } +} + +fn on_create_fail(pool: &PoolSpec, registry: &Registry) -> Option { + if !pool.status().creating() { + return Some(OnCreateFail::LeaveAsIs); + } + let Some(last_mod_elapsed) = pool.creat_tsc.and_then(|t| t.elapsed().ok()) else { + return Some(OnCreateFail::SetDeleting); + }; + if last_mod_elapsed > registry.pool_async_creat_tmo() { + return Some(OnCreateFail::SetDeleting); + } + None +} + impl OperationGuardArc { /// Destroy the replica from its volume pub(crate) async fn destroy_volume_replica( diff --git a/control-plane/agents/src/bin/core/pool/pool_operations.rs b/control-plane/agents/src/bin/core/pool/pool_operations.rs index da3ac6730..34d27abb0 100644 --- a/control-plane/agents/src/bin/core/pool/pool_operations.rs +++ b/control-plane/agents/src/bin/core/pool/pool_operations.rs @@ -65,8 +65,7 @@ impl ResourceLifecycle for OperationGuardArc { let _ = pool.start_create(registry, request).await?; let result = node.create_pool(request).await; - let on_fail = OnCreateFail::eeinval_delete(&result); - + let on_fail = OnCreateFail::on_pool_create_err(&result); let state = pool.complete_create(result, registry, on_fail).await?; let spec = pool.lock().clone(); Ok(Pool::new(spec, Some(CtrlPoolState::new(state)))) @@ -93,14 +92,11 @@ impl ResourceLifecycle for OperationGuardArc { Err(SvcError::PoolNotFound { .. }) => { match node.import_pool(&self.as_ref().into()).await { Ok(_) => node.destroy_pool(request).await, - Err(error) - if allow_not_found - && error.tonic_code() == tonic::Code::InvalidArgument => - { - Ok(()) - } - Err(error) if error.tonic_code() == tonic::Code::InvalidArgument => Ok(()), - Err(error) => Err(error), + Err(error) => match error.tonic_code() { + tonic::Code::NotFound if allow_not_found => Ok(()), + tonic::Code::InvalidArgument => Ok(()), + _other => Err(error), + }, } } Err(error) => Err(error), diff --git a/control-plane/agents/src/bin/core/pool/specs.rs b/control-plane/agents/src/bin/core/pool/specs.rs index 2ce080600..bcd070671 100644 --- a/control-plane/agents/src/bin/core/pool/specs.rs +++ b/control-plane/agents/src/bin/core/pool/specs.rs @@ -425,7 +425,9 @@ impl ResourceSpecsLocked { let pools = self.pools_rsc(); for pool in pools { if let Ok(mut guard) = pool.operation_guard() { - if !guard.handle_incomplete_ops(registry).await { + let on_fail = guard.on_create_fail(registry); + + if !guard.handle_incomplete_ops_ext(registry, on_fail).await { // Not all pending operations could be handled. pending_ops = true; } diff --git a/control-plane/agents/src/bin/core/tests/controller/mod.rs b/control-plane/agents/src/bin/core/tests/controller/mod.rs index 6ba1623f7..8cbf22989 100644 --- a/control-plane/agents/src/bin/core/tests/controller/mod.rs +++ b/control-plane/agents/src/bin/core/tests/controller/mod.rs @@ -9,7 +9,7 @@ use stor_port::{ }; use serde_json::Value; -use std::str::FromStr; +use std::{str::FromStr, time::Duration}; use uuid::Uuid; /// Test that the content of the registry is correctly loaded from the persistent store on start up. 
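A minimal, self-contained restatement of the retry-or-GC decision added in `pool/operations_helper.rs` above (the enum, struct and function names here are illustrative stand-ins, not the actual crate items): while a pool spec is still `Creating` and its `creat_tsc` falls within the configured `pool_async_creat_tmo` window the reconciler keeps retrying; once the window elapses, or no timestamp was recorded, the spec is set to deleting so the garbage collector can remove it.

```rust
use std::time::{Duration, SystemTime};

/// Illustrative stand-in for the control-plane's `OnCreateFail`.
#[derive(Debug, PartialEq)]
enum OnCreateFail {
    LeaveAsIs,
    SetDeleting,
}

/// Only the fields the policy actually inspects.
struct PoolSpecLite {
    creating: bool,
    creat_tsc: Option<SystemTime>,
}

/// Keep retrying while the pool is still `Creating` and inside the async-create
/// window; otherwise mark it for deletion so garbage collection cleans it up.
fn create_retry_policy(pool: &PoolSpecLite, async_creat_tmo: Duration) -> OnCreateFail {
    if !pool.creating {
        // Not a pending creation: nothing for the retry path to do.
        return OnCreateFail::LeaveAsIs;
    }
    match pool.creat_tsc.and_then(|t| t.elapsed().ok()) {
        // Still within the grace period: the io-engine may still be creating the pool.
        Some(elapsed) if elapsed <= async_creat_tmo => OnCreateFail::LeaveAsIs,
        // Window elapsed, or no timestamp recorded: give up and let GC delete the pool.
        _ => OnCreateFail::SetDeleting,
    }
}

fn main() {
    let pool = PoolSpecLite {
        creating: true,
        creat_tsc: Some(SystemTime::now()),
    };
    // A freshly started creation with a 15 minute window stays as `Creating`.
    assert_eq!(
        create_retry_policy(&pool, Duration::from_secs(15 * 60)),
        OnCreateFail::LeaveAsIs
    );
}
```

The real `on_create_fail` in the hunk above returns `Option<OnCreateFail>` so that `None` doubles as the signal to attempt the retry now, but the caller ends up with the same LeaveAsIs/SetDeleting split.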
@@ -227,6 +227,7 @@ async fn etcd_pagination() { .with_rest(false) .with_jaeger(false) .with_store_lease_ttl(lease_ttl) + .with_req_timeouts(Duration::from_millis(200), Duration::from_millis(200)) .build() .await .unwrap(); diff --git a/control-plane/agents/src/bin/core/tests/deserializer.rs b/control-plane/agents/src/bin/core/tests/deserializer.rs index bcf4e8871..cd5f4375c 100644 --- a/control-plane/agents/src/bin/core/tests/deserializer.rs +++ b/control-plane/agents/src/bin/core/tests/deserializer.rs @@ -151,6 +151,7 @@ fn test_deserialization_v1_to_v2() { }, sequencer: Default::default(), operation: None, + creat_tsc: None, }), }, TestEntry { diff --git a/control-plane/agents/src/bin/core/tests/pool/mod.rs b/control-plane/agents/src/bin/core/tests/pool/mod.rs index 76b05d476..6addabcd5 100644 --- a/control-plane/agents/src/bin/core/tests/pool/mod.rs +++ b/control-plane/agents/src/bin/core/tests/pool/mod.rs @@ -21,7 +21,10 @@ use stor_port::{ VolumePolicy, }, }, - store::replica::{ReplicaSpec, ReplicaSpecKey}, + store::{ + pool::PoolLabel, + replica::{ReplicaSpec, ReplicaSpecKey}, + }, transport::{ CreatePool, CreateReplica, DestroyPool, DestroyReplica, Filter, GetSpecs, NexusId, NodeId, Protocol, Replica, ReplicaId, ReplicaName, ReplicaOwners, ReplicaShareProtocol, @@ -1035,3 +1038,84 @@ async fn destroy_after_restart() { assert_eq!(pool.state().unwrap().id, create.id); } + +#[tokio::test] +async fn slow_create() { + const POOL_SIZE_BYTES: u64 = 128 * 1024 * 1024; + + let vg = deployer_cluster::lvm::VolGroup::new("slow-pool", POOL_SIZE_BYTES).unwrap(); + let lvol = vg.create_lvol("lvol0", POOL_SIZE_BYTES / 2).unwrap(); + lvol.suspend().unwrap(); + { + let cluster = ClusterBuilder::builder() + .with_io_engines(1) + .with_reconcile_period(Duration::from_millis(250), Duration::from_millis(250)) + .with_cache_period("200ms") + .with_options(|o| o.with_io_engine_devices(vec![lvol.path()])) + .with_req_timeouts(Duration::from_millis(500), Duration::from_millis(500)) + .compose_build(|b| b.with_clean(true)) + .await + .unwrap(); + + let client = cluster.grpc_client(); + + let create = CreatePool { + node: cluster.node(0), + id: "bob".into(), + disks: vec![lvol.path().into()], + labels: Some(PoolLabel::from([("a".into(), "b".into())])), + }; + + let error = client + .pool() + .create(&create, None) + .await + .expect_err("device suspended"); + assert_eq!(error.kind, ReplyErrorKind::Cancelled); + + lvol.resume().unwrap(); + + let start = std::time::Instant::now(); + let timeout = Duration::from_secs(30); + loop { + if std::time::Instant::now() > (start + timeout) { + panic!("Timeout waiting for the pool"); + } + tokio::time::sleep(Duration::from_millis(100)).await; + + let pools = client + .pool() + .get(Filter::Pool(create.id.clone()), None) + .await + .unwrap(); + + let Some(pool) = pools.0.first() else { + continue; + }; + let Some(pool_spec) = pool.spec() else { + continue; + }; + if !pool_spec.status.created() { + continue; + } + break; + } + let destroy = DestroyPool::from(create.clone()); + client.pool().destroy(&destroy, None).await.unwrap(); + + // Now we try to recreate using an API call, rather than using the reconciler + lvol.suspend().unwrap(); + + let error = client + .pool() + .create(&create, None) + .await + .expect_err("device suspended"); + assert_eq!(error.kind, ReplyErrorKind::Cancelled); + + lvol.resume().unwrap(); + + let pool = client.pool().create(&create, None).await.unwrap(); + assert!(pool.spec().unwrap().status.created()); + } +} diff --git 
a/control-plane/agents/src/bin/core/tests/snapshot/fs_cons_snapshot.rs b/control-plane/agents/src/bin/core/tests/snapshot/fs_cons_snapshot.rs index f3a56552c..c48e957bb 100644 --- a/control-plane/agents/src/bin/core/tests/snapshot/fs_cons_snapshot.rs +++ b/control-plane/agents/src/bin/core/tests/snapshot/fs_cons_snapshot.rs @@ -8,7 +8,7 @@ struct DeviceDisconnect(nvmeadm::NvmeTarget); impl Drop for DeviceDisconnect { fn drop(&mut self) { if self.0.disconnect().is_err() { - std::process::Command::new("sudo") + std::process::Command::new(env!("SUDO")) .args(["nvme", "disconnect-all"]) .status() .unwrap(); diff --git a/control-plane/agents/src/bin/core/tests/volume/capacity.rs b/control-plane/agents/src/bin/core/tests/volume/capacity.rs index fe0dead3c..375bb22a1 100644 --- a/control-plane/agents/src/bin/core/tests/volume/capacity.rs +++ b/control-plane/agents/src/bin/core/tests/volume/capacity.rs @@ -89,7 +89,7 @@ struct DeviceDisconnect(nvmeadm::NvmeTarget); impl Drop for DeviceDisconnect { fn drop(&mut self) { if self.0.disconnect().is_err() { - std::process::Command::new("sudo") + std::process::Command::new(env!("SUDO")) .args(["nvme", "disconnect-all"]) .status() .unwrap(); diff --git a/control-plane/agents/src/bin/core/volume/service.rs b/control-plane/agents/src/bin/core/volume/service.rs index 188c05833..30dfa800b 100644 --- a/control-plane/agents/src/bin/core/volume/service.rs +++ b/control-plane/agents/src/bin/core/volume/service.rs @@ -47,7 +47,7 @@ use stor_port::{ pub(super) struct Service { registry: Registry, create_volume_limiter: std::sync::Arc, - capacity_limit_borrow: std::sync::Arc>, + capacity_limit_borrow: std::sync::Arc>, } #[tonic::async_trait] @@ -257,7 +257,7 @@ impl Service { create_volume_limiter: std::sync::Arc::new(tokio::sync::Semaphore::new( registry.create_volume_limit(), )), - capacity_limit_borrow: std::sync::Arc::new(parking_lot::RwLock::new(0)), + capacity_limit_borrow: std::sync::Arc::new(parking_lot::Mutex::new(0)), registry, } } @@ -614,20 +614,17 @@ impl Service { .requested_size .checked_sub(volume.as_ref().size) .unwrap_or_default(); - *self.capacity_limit_borrow.write() += required; - // If there is a defined system wide capacity limit, ensure we don't breach that. - let current = *self.capacity_limit_borrow.read(); - self.specs() - .check_capacity_limit_for_resize(limit, current) - .map_err(|e| { - *self.capacity_limit_borrow.write() -= required; - e - })?; + { + let capacity_limit = self.capacity_limit_borrow.lock(); + // If there is a defined system wide capacity limit, ensure we don't breach that. + self.specs() + .check_capacity_limit_for_resize(limit, capacity_limit, required)?; + } let resize_ret = volume.resize(&self.registry, request).await; // Reset the capacity limit that we consumed and will now be accounted in the system's // current total. 
- *self.capacity_limit_borrow.write() -= required; + *self.capacity_limit_borrow.lock() -= required; resize_ret } } diff --git a/control-plane/agents/src/bin/core/volume/specs.rs b/control-plane/agents/src/bin/core/volume/specs.rs index 07639e145..05313effa 100644 --- a/control-plane/agents/src/bin/core/volume/specs.rs +++ b/control-plane/agents/src/bin/core/volume/specs.rs @@ -845,18 +845,20 @@ impl ResourceSpecsLocked { pub(crate) fn check_capacity_limit_for_resize( &self, cluster_capacity_limit: u64, - current_borrowed_limit: u64, + mut capacity_limit: parking_lot::MutexGuard, + required: u64, ) -> Result<(), SvcError> { let specs = self.write(); let total: u64 = specs.volumes.values().map(|v| v.lock().size).sum(); - let forthcoming_total = current_borrowed_limit + total; - tracing::trace!(current_borrowed_limit=%current_borrowed_limit, total=%total, forthcoming_total=%forthcoming_total, "Cluster capacity limit checks "); + let forthcoming_total = *capacity_limit + total + required; + tracing::trace!(current_borrowed_limit=%capacity_limit, total=%total, forthcoming_total=%forthcoming_total, "Cluster capacity limit checks"); if forthcoming_total > cluster_capacity_limit { return Err(SvcError::CapacityLimitExceeded { cluster_capacity_limit, excess: forthcoming_total - cluster_capacity_limit, }); } + *capacity_limit += required; Ok(()) } diff --git a/control-plane/agents/src/common/errors.rs b/control-plane/agents/src/common/errors.rs index 7d5001cc1..89d31c0b0 100644 --- a/control-plane/agents/src/common/errors.rs +++ b/control-plane/agents/src/common/errors.rs @@ -1124,7 +1124,7 @@ fn grpc_to_reply_error(error: SvcError) -> ReplyError { } => { let kind = match source.code() { Code::Ok => ReplyErrorKind::Internal, - Code::Cancelled => ReplyErrorKind::Internal, + Code::Cancelled => ReplyErrorKind::Cancelled, Code::Unknown => ReplyErrorKind::Internal, Code::InvalidArgument => ReplyErrorKind::InvalidArgument, Code::DeadlineExceeded => ReplyErrorKind::DeadlineExceeded, diff --git a/control-plane/csi-driver/src/bin/controller/controller.rs b/control-plane/csi-driver/src/bin/controller/controller.rs index 5736a6a31..b17e83e78 100644 --- a/control-plane/csi-driver/src/bin/controller/controller.rs +++ b/control-plane/csi-driver/src/bin/controller/controller.rs @@ -18,7 +18,7 @@ use utils::{dsp_created_by_key, DSP_OPERATOR}; use regex::Regex; use std::{collections::HashMap, str::FromStr}; -use tonic::{Request, Response, Status}; +use tonic::{Code, Request, Response, Status}; use tracing::{debug, error, instrument, trace, warn}; use uuid::Uuid; use volume_capability::AccessType; @@ -838,7 +838,7 @@ impl rpc::csi::controller_server::Controller for CsiControllerSvc { })?; let create_params = CreateSnapshotParams::try_from(&request.parameters)?; - // Get the volume object. Extract the app node endpoint if the quiesce is requested. + // Get the volume object. Extract the app node endpoint if quiesce is requested. let volume = RestApiClient::get_client().get_volume(&volume_uuid).await?; let app_node_endpoint_info = match volume_app_node(&volume) { // Volume is not published, so no need to quiesce. @@ -846,7 +846,7 @@ impl rpc::csi::controller_server::Controller for CsiControllerSvc { Some(info) => match create_params.quiesce().clone() { None | Some(QuiesceFsCandidate::Freeze) => { // If quiesce is requested, get the app node endpoint. Request would fail to - // proceed if app node endpoint is not retieved. + // proceed if app node endpoint is not retrieved. 
let app_node = RestApiClient::get_client().get_app_node(&info).await?; Some(app_node.spec.endpoint) } @@ -861,10 +861,18 @@ impl rpc::csi::controller_server::Controller for CsiControllerSvc { // If snapshot is already created in previous attempts, fetch and return it. Ok(snapshot) => Ok(snapshot), Err(ApiClientError::ResourceNotExists(_)) => { - if let Some(app_node_endpoint) = app_node_endpoint_info.clone() { - issue_fs_freeze(app_node_endpoint, volume_uuid.to_string()).await?; + if let Some(ref app_node_endpoint) = app_node_endpoint_info { + match issue_fs_freeze(app_node_endpoint.clone(), volume_uuid.to_string()).await + { + Err(error) if error.code() == Code::NotFound => { + Err(Status::not_found(format!( + "Failed to freeze volume {}, filesystem volume is not attached", + volume_uuid + ))) + } + _else => _else, + }?; } - // Create the snapshot. RestApiClient::get_client() .create_volume_snapshot(&volume_uuid, &snap_uuid) @@ -882,7 +890,7 @@ impl rpc::csi::controller_server::Controller for CsiControllerSvc { Err(error) => Err(error.into()), }; - // Always unfreeze the filesystem if it quiesce was requested, as the retry mechanism can + // Always unfreeze the filesystem if quiesce was requested, as the retry mechanism can // leave filesystem frozen. let snapshot = if let Some(app_node_endpoint) = app_node_endpoint_info { let unfreeze_result = @@ -1071,28 +1079,28 @@ fn context_into_topology(context: &CreateParams) -> CreateVolumeTopology { pool_inclusive_label_topology.extend( context .publish_params() - .pool_affinity_topology_label() + .pool_has_topology_key() .clone() .unwrap_or_default(), ); pool_inclusive_label_topology.extend( context .publish_params() - .pool_has_topology_key() + .pool_affinity_topology_label() .clone() .unwrap_or_default(), ); node_inclusive_label_topology.extend( context .publish_params() - .node_affinity_topology_label() + .node_has_topology_key() .clone() .unwrap_or_default(), ); node_inclusive_label_topology.extend( context .publish_params() - .node_has_topology_key() + .node_affinity_topology_label() .clone() .unwrap_or_default(), ); diff --git a/control-plane/grpc/proto/v1/misc/common.proto b/control-plane/grpc/proto/v1/misc/common.proto index 403f3805e..c4569eaa1 100644 --- a/control-plane/grpc/proto/v1/misc/common.proto +++ b/control-plane/grpc/proto/v1/misc/common.proto @@ -69,6 +69,7 @@ enum ReplyErrorKind { InUse = 29; CapacityLimitExceeded = 30; NotAcceptable = 31; + Cancelled = 32; } // ResourceKind for the resource which has undergone this error diff --git a/control-plane/grpc/src/misc/traits.rs b/control-plane/grpc/src/misc/traits.rs index 5ff69c1b8..8f6cc1480 100644 --- a/control-plane/grpc/src/misc/traits.rs +++ b/control-plane/grpc/src/misc/traits.rs @@ -102,6 +102,7 @@ impl From for common::ReplyErrorKind { ReplyErrorKind::InUse => Self::InUse, ReplyErrorKind::CapacityLimitExceeded => Self::CapacityLimitExceeded, ReplyErrorKind::NotAcceptable => Self::NotAcceptable, + ReplyErrorKind::Cancelled => Self::Cancelled, } } } @@ -141,6 +142,7 @@ impl From for ReplyErrorKind { common::ReplyErrorKind::InUse => Self::InUse, common::ReplyErrorKind::CapacityLimitExceeded => Self::CapacityLimitExceeded, common::ReplyErrorKind::NotAcceptable => Self::NotAcceptable, + common::ReplyErrorKind::Cancelled => Self::Cancelled, } } } diff --git a/control-plane/grpc/src/operations/pool/traits.rs b/control-plane/grpc/src/operations/pool/traits.rs index 1c0c653c3..ff2258f25 100644 --- a/control-plane/grpc/src/operations/pool/traits.rs +++ 
b/control-plane/grpc/src/operations/pool/traits.rs @@ -97,6 +97,7 @@ impl TryFrom for PoolSpec { }, sequencer: Default::default(), operation: None, + creat_tsc: None, }) } } diff --git a/control-plane/plugin/src/resources/pool.rs b/control-plane/plugin/src/resources/pool.rs index b438604c8..7a4668980 100644 --- a/control-plane/plugin/src/resources/pool.rs +++ b/control-plane/plugin/src/resources/pool.rs @@ -323,7 +323,9 @@ impl Label for Pool { } OutputFormat::None => { // In case the output format is not specified, show a success message. - let labels = pool.spec.unwrap().labels.unwrap_or_default(); + let mut labels = pool.spec.unwrap().labels.unwrap_or_default(); + let internal_label = external_utils::dsp_created_by_key(); + labels.remove(&internal_label); println!("Pool {id} labelled successfully. Current labels: {labels:?}"); } } diff --git a/control-plane/rest/openapi-specs/v0_api_spec.yaml b/control-plane/rest/openapi-specs/v0_api_spec.yaml index 6d42b70a2..c7074dfa5 100644 --- a/control-plane/rest/openapi-specs/v0_api_spec.yaml +++ b/control-plane/rest/openapi-specs/v0_api_spec.yaml @@ -3253,6 +3253,7 @@ components: - InUse - CapacityLimitExceeded - NotAcceptable + - Cancelled required: - details - kind diff --git a/control-plane/stor-port/src/transport_api/mod.rs b/control-plane/stor-port/src/transport_api/mod.rs index d6dab6e8b..39423c8f9 100644 --- a/control-plane/stor-port/src/transport_api/mod.rs +++ b/control-plane/stor-port/src/transport_api/mod.rs @@ -446,11 +446,14 @@ pub enum ReplyErrorKind { InUse, CapacityLimitExceeded, NotAcceptable, + Cancelled, } impl From for ReplyErrorKind { fn from(code: tonic::Code) -> Self { match code { + Code::Ok => Self::Internal, + Code::Unknown => Self::Internal, Code::InvalidArgument => Self::InvalidArgument, Code::DeadlineExceeded => Self::DeadlineExceeded, Code::NotFound => Self::NotFound, @@ -465,7 +468,7 @@ impl From for ReplyErrorKind { Code::Unavailable => Self::Unavailable, Code::DataLoss => Self::FailedPersist, Code::Unauthenticated => Self::Unauthenticated, - _ => Self::Aborted, + Code::Cancelled => Self::Cancelled, } } } diff --git a/control-plane/stor-port/src/types/mod.rs b/control-plane/stor-port/src/types/mod.rs index b299aa085..da70ce265 100644 --- a/control-plane/stor-port/src/types/mod.rs +++ b/control-plane/stor-port/src/types/mod.rs @@ -142,6 +142,10 @@ impl From for RestError { let error = RestJsonError::new(details, message, Kind::NotAcceptable); (StatusCode::NOT_ACCEPTABLE, error) } + ReplyErrorKind::Cancelled => { + let error = RestJsonError::new(details, message, Kind::Cancelled); + (StatusCode::GATEWAY_TIMEOUT, error) + } }; RestError::new(status, error) diff --git a/control-plane/stor-port/src/types/v0/store/pool.rs b/control-plane/stor-port/src/types/v0/store/pool.rs index f6a344051..b4fc9a3f9 100644 --- a/control-plane/stor-port/src/types/v0/store/pool.rs +++ b/control-plane/stor-port/src/types/v0/store/pool.rs @@ -53,6 +53,17 @@ impl From<&CreatePool> for PoolSpec { labels: request.labels.clone(), sequencer: OperationSequence::new(), operation: None, + creat_tsc: None, + } + } +} +impl From<&PoolSpec> for CreatePool { + fn from(pool: &PoolSpec) -> Self { + Self { + node: pool.node.clone(), + id: pool.id.clone(), + disks: pool.disks.clone(), + labels: pool.labels.clone(), } } } @@ -61,6 +72,7 @@ impl PartialEq for PoolSpec { let mut other = PoolSpec::from(other); other.status = self.status.clone(); other.sequencer = self.sequencer.clone(); + other.creat_tsc = self.creat_tsc; &other == self } } @@ -83,6 +95,9 @@ 
pub struct PoolSpec { pub sequencer: OperationSequence, /// Record of the operation in progress pub operation: Option, + /// Last modification timestamp. + #[serde(skip)] + pub creat_tsc: Option, } impl PoolSpec { @@ -206,6 +221,9 @@ impl SpecTransaction for PoolSpec { } fn start_op(&mut self, operation: PoolOperation) { + if matches!(operation, PoolOperation::Create) && self.creat_tsc.is_none() { + self.creat_tsc = Some(std::time::SystemTime::now()); + } self.operation = Some(PoolOperationState { operation, result: None, diff --git a/control-plane/stor-port/src/types/v0/transport/pool.rs b/control-plane/stor-port/src/types/v0/transport/pool.rs index be5d211c9..3567cc653 100644 --- a/control-plane/stor-port/src/types/v0/transport/pool.rs +++ b/control-plane/stor-port/src/types/v0/transport/pool.rs @@ -346,6 +346,11 @@ impl DestroyPool { Self { node, id } } } +impl From for DestroyPool { + fn from(value: CreatePool) -> Self { + Self::new(value.node, value.id) + } +} /// Label Pool Request. #[derive(Serialize, Deserialize, Default, Debug, Clone, Eq, PartialEq)] diff --git a/deployer/src/infra/io_engine.rs b/deployer/src/infra/io_engine.rs index a5e7b04d8..544bed9bf 100644 --- a/deployer/src/infra/io_engine.rs +++ b/deployer/src/infra/io_engine.rs @@ -11,6 +11,7 @@ use utils::DEFAULT_GRPC_CLIENT_ADDR; impl ComponentAction for IoEngine { fn configure(&self, options: &StartOptions, cfg: Builder) -> Result { let mut cfg = cfg; + let host_tmp = crate::host_tmp()?; for i in 0 .. options.io_engines + options.idle_io_engines { let io_engine_socket = format!("{}:10124", cfg.next_ip_for_name(&Self::name(i, options))?); @@ -53,7 +54,7 @@ impl ComponentAction for IoEngine { .with_env("NEXUS_NVMF_ANA_ENABLE", "1") .with_env("NVMF_TGT_CRDT", "0") .with_env("ENABLE_SNAPSHOT_REBUILD", "true") - .with_bind("/tmp", "/host/tmp") + .with_bind(&host_tmp, "/host/tmp") .with_bind("/var/run/dpdk", "/var/run/dpdk"); let core_list = match options.io_engine_isolate { @@ -117,7 +118,7 @@ impl ComponentAction for IoEngine { options.latest_io_api_version(), &name, socket, - 40, + 100, tokio::time::sleep, ) .await?; diff --git a/deployer/src/lib.rs b/deployer/src/lib.rs index bf7807e46..baff0bf06 100644 --- a/deployer/src/lib.rs +++ b/deployer/src/lib.rs @@ -777,3 +777,12 @@ impl std::fmt::Debug for ClusterLabel { write!(f, "{self}") } } + +/// Get the host tmp folder for this workspace. +pub fn host_tmp() -> Result { + let root_tmp = format!("{root}/.tmp", root = env!("WORKSPACE_ROOT")); + if !std::path::Path::new(&root_tmp).exists() { + std::fs::create_dir(&root_tmp)?; + } + Ok(root_tmp) +} diff --git a/scripts/rust/deployer-cleanup.sh b/scripts/rust/deployer-cleanup.sh index 4367ad6e8..983a42fc4 100755 --- a/scripts/rust/deployer-cleanup.sh +++ b/scripts/rust/deployer-cleanup.sh @@ -2,8 +2,26 @@ SCRIPT_DIR="$(dirname "$0")" export ROOT_DIR="$SCRIPT_DIR/../.." 
+SUDO=$(which sudo) -sudo nvme disconnect-all +cleanup_ws_tmp() { + # This contains tmp for container artifacts, example: pool disk images + tmp_dir="$(realpath "$ROOT_DIR/.tmp")" + + devices=$(losetup -l -J | jq -r --arg tmp_dir=$tmp_dir '.loopdevices[]|select(."back-file" | startswith($tmp_dir))') + for device in $(echo $devices | jq -r '.loopdevices[].name'); do + echo "Found stale loop device: $device" + + $SUDO $(which vgremove) -y --select="pv_name=$device" || : + $SUDO losetup -d "$device" + done + + $SUDO rm -rf "$tmp_dir" + + return 0 +} + +$SUDO nvme disconnect-all "$ROOT_DIR"/target/debug/deployer stop for c in $(docker ps -a --filter "label=io.composer.test.name" --format '{{.ID}}') ; do @@ -12,7 +30,7 @@ for c in $(docker ps -a --filter "label=io.composer.test.name" --format '{{.ID}} done for n in $(docker network ls --filter "label=io.composer.test.name" --format '{{.ID}}') ; do - docker network rm "$n" || ( sudo systemctl restart docker && docker network rm "$n" ) + docker network rm "$n" || ( $SUDO systemctl restart docker && docker network rm "$n" ) done -exit 0 \ No newline at end of file +cleanup_ws_tmp diff --git a/shell.nix b/shell.nix index b1dc71953..f16205a68 100644 --- a/shell.nix +++ b/shell.nix @@ -50,6 +50,7 @@ mkShell { pytest_inputs tini udev + lvm2 ] ++ pkgs.lib.optional (system == "aarch64-darwin") darwin.apple_sdk.frameworks.Security; LIBCLANG_PATH = "${llvmPackages.libclang.lib}/lib"; @@ -87,6 +88,7 @@ mkShell { [ ! -z "${io-engine}" ] && cowsay "${io-engine-moth}" [ ! -z "${io-engine}" ] && export IO_ENGINE_BIN="${io-engine-moth}" export PATH="$PATH:$(pwd)/target/debug" + export SUDO=$(which sudo || echo /run/wrappers/bin/sudo) DOCKER_CONFIG=~/.docker/config.json if [ -f "$DOCKER_CONFIG" ]; then diff --git a/terraform/cluster/mod/k8s/repo.sh b/terraform/cluster/mod/k8s/repo.sh index a561ec7d5..12044c52c 100644 --- a/terraform/cluster/mod/k8s/repo.sh +++ b/terraform/cluster/mod/k8s/repo.sh @@ -45,10 +45,13 @@ fi sudo mkdir -p /etc/apt/keyrings/ KUBE_APT_V=$(echo "${kube_version}" | awk -F. '{ sub(/-.*/, ""); print "v" $1 "." 
$2 }') -curl -fsSL https://pkgs.k8s.io/core:/stable:/$KUBE_APT_V/deb/Release.key | sudo gpg --dearmor -o /etc/apt/keyrings/kubernetes-apt-keyring.gpg -cat < 10 * 1024 * 1024 + 4 * 1024 * 1024 + wait_volume_replica_allocated(pytest.volume) @then("the total number of healthy replicas is restored") @@ -306,3 +297,16 @@ def wait_volume_new_replica(volume, prev_replicas): ) ) assert len(new_replicas) == 1 + + +@retry(wait_fixed=200, stop_max_attempt_number=20) +def wait_volume_replica_allocated(volume): + volume = ApiClient.volumes_api().get_volume(volume.spec.uuid) + total_allocated = 0 + for replica in volume.state.replica_topology.values(): + assert replica.usage.capacity == volume.state.usage.capacity + assert replica.usage.allocated == volume.state.usage.allocated + total_allocated += replica.usage.allocated + + assert volume.state.usage.total_allocated == total_allocated + assert volume.state.usage.allocated > 10 * 1024 * 1024 + 4 * 1024 * 1024 diff --git a/tests/bdd/features/cordon/node/test_cordon_node.py b/tests/bdd/features/cordon/node/test_cordon_node.py index caee1fc7d..98d5e481b 100644 --- a/tests/bdd/features/cordon/node/test_cordon_node.py +++ b/tests/bdd/features/cordon/node/test_cordon_node.py @@ -60,14 +60,6 @@ def init_scenario(init, disks): remove_all_cordons(NODE_NAME_2) -@pytest.fixture -def tmp_files(): - files = [] - for index in range(0, 1): - files.append(f"/tmp/disk_{index}") - yield files - - @pytest.fixture(scope="module") def disks(): yield Deployer.create_disks(2, size=100 * 1024 * 1024) diff --git a/tests/bdd/features/garbage-collection/replicas/test_feature.py b/tests/bdd/features/garbage-collection/replicas/test_feature.py index f01e57eda..2dbe94768 100644 --- a/tests/bdd/features/garbage-collection/replicas/test_feature.py +++ b/tests/bdd/features/garbage-collection/replicas/test_feature.py @@ -29,26 +29,15 @@ IO_ENGINE_1 = "io-engine-1" IO_ENGINE_2 = "io-engine-2" -POOL_DISK1 = "pdisk1.img" POOL1_UUID = "4cc6ee64-7232-497d-a26f-38284a444980" -POOL_DISK2 = "pdisk2.img" POOL2_UUID = "4cc6ee64-7232-497d-a26f-38284a444990" @pytest.fixture(scope="function") def create_pool_disk_images(): - # When starting Io-Engine instances with the deployer a bind mount is created from /tmp to - # /host/tmp, so create disk images in /tmp - for disk in [POOL_DISK1, POOL_DISK2]: - path = "/tmp/{}".format(disk) - with open(path, "w") as file: - file.truncate(20 * 1024 * 1024) - - yield - for disk in [POOL_DISK1, POOL_DISK2]: - path = "/tmp/{}".format(disk) - if os.path.exists(path): - os.remove(path) + pools = Deployer.create_disks(2, size=20 * 1024 * 1024) + yield pools + Deployer.cleanup_disks(len(pools)) # This fixture will be automatically used by all tests. 
@@ -63,12 +52,12 @@ def init(create_pool_disk_images): ApiClient.pools_api().put_node_pool( IO_ENGINE_1, POOL1_UUID, - CreatePoolBody(["aio:///host/tmp/{}".format(POOL_DISK1)]), + CreatePoolBody(["aio://{}".format(create_pool_disk_images[0])]), ) ApiClient.pools_api().put_node_pool( IO_ENGINE_2, POOL2_UUID, - CreatePoolBody(["aio:///host/tmp/{}".format(POOL_DISK2)]), + CreatePoolBody(["aio://{}".format(create_pool_disk_images[1])]), ) # Create and publish a volume on node 1 diff --git a/tests/bdd/features/ha/core-agent/test_target_switchover.py b/tests/bdd/features/ha/core-agent/test_target_switchover.py index ec045f76a..0fcfcc0d3 100644 --- a/tests/bdd/features/ha/core-agent/test_target_switchover.py +++ b/tests/bdd/features/ha/core-agent/test_target_switchover.py @@ -40,25 +40,9 @@ @pytest.fixture -def tmp_files(): - files = [] - for index in range(0, 2): - files.append(f"/tmp/disk_{index}") - yield files - - -@pytest.fixture -def disks(tmp_files): - for disk in tmp_files: - if os.path.exists(disk): - os.remove(disk) - with open(disk, "w") as file: - file.truncate(100 * 1024 * 1024) - # /tmp is mapped into /host/tmp within the io-engine containers - yield list(map(lambda file: f"/host{file}", tmp_files)) - for disk in tmp_files: - if os.path.exists(disk): - os.remove(disk) +def disks(): + yield Deployer.create_disks(2, size=100 * 1024 * 1024) + Deployer.cleanup_disks(2) @pytest.fixture(scope="module") diff --git a/tests/bdd/features/ha/test_robustness.py b/tests/bdd/features/ha/test_robustness.py index 871de8d54..c53c46f5a 100644 --- a/tests/bdd/features/ha/test_robustness.py +++ b/tests/bdd/features/ha/test_robustness.py @@ -236,25 +236,10 @@ def the_volume_target_node_has_iopath_fixed(): @pytest.fixture -def tmp_files(): - files = [] - for index in range(0, 2): - files.append(f"/tmp/disk_{index}") - yield files - - -@pytest.fixture -def disks(tmp_files): - for disk in tmp_files: - if os.path.exists(disk): - os.remove(disk) - with open(disk, "w") as file: - file.truncate(POOL_SIZE) - # /tmp is mapped into /host/tmp within the io-engine containers - yield list(map(lambda file: f"/host{file}", tmp_files)) - for disk in tmp_files: - if os.path.exists(disk): - os.remove(disk) +def disks(): + pools = Deployer.create_disks(2, size=POOL_SIZE) + yield pools + Deployer.cleanup_disks(len(pools)) @pytest.fixture diff --git a/tests/bdd/features/pool/create/test_disks.py b/tests/bdd/features/pool/create/test_disks.py index 2b71cc809..64760a170 100644 --- a/tests/bdd/features/pool/create/test_disks.py +++ b/tests/bdd/features/pool/create/test_disks.py @@ -116,25 +116,10 @@ def pool_attempt(context): @pytest.fixture -def tmp_files(): - files = [] - for index in range(0, POOL_DISK_COUNT + 1): - files.append(f"/tmp/disk_{index}") - yield files - - -@pytest.fixture -def disks(tmp_files): - for disk in tmp_files: - if os.path.exists(disk): - os.remove(disk) - with open(disk, "w") as file: - file.truncate(100 * 1024 * 1024) - # /tmp is mapped into /host/tmp within the io-engine containers - yield list(map(lambda file: f"/host{file}", tmp_files)) - for disk in tmp_files: - if os.path.exists(disk): - os.remove(disk) +def disks(): + pools = Deployer.create_disks(POOL_DISK_COUNT, size=100 * 1024 * 1024) + yield pools + Deployer.cleanup_disks(len(pools)) @pytest.fixture diff --git a/tests/bdd/features/pool/delete/test_feature.py b/tests/bdd/features/pool/delete/test_feature.py index a1974f187..f2aa87f0f 100644 --- a/tests/bdd/features/pool/delete/test_feature.py +++ 
b/tests/bdd/features/pool/delete/test_feature.py @@ -143,7 +143,6 @@ def the_pool_should_be_deleted(pool, attempt_delete_the_pool): """" Implementations """ -POOLS_PER_NODE = 2 VOLUME_UUID = "5cd5378e-3f05-47f1-a830-a0f5873a1449" VOLUME_SIZE = 10485761 @@ -161,32 +160,10 @@ def nodes(): @pytest.fixture -def tmp_files(nodes): - files = [] - for node in range(len(nodes)): - node_files = [] - for disk in range(0, POOLS_PER_NODE + 1): - node_files.append(f"/tmp/node-{node}-disk_{disk}") - files.append(node_files) - yield files - - -@pytest.fixture -def node_disks(tmp_files): - for node_disks in tmp_files: - for disk in node_disks: - if os.path.exists(disk): - os.remove(disk) - with open(disk, "w") as file: - file.truncate(100 * 1024 * 1024) - - # /tmp is mapped into /host/tmp within the io-engine containers - yield list(map(lambda arr: list(map(lambda file: f"/host{file}", arr)), tmp_files)) - - for node_disks in tmp_files: - for disk in node_disks: - if os.path.exists(disk): - os.remove(disk) +def node_disks(nodes): + pools = Deployer.create_disks(1, size=100 * 1024 * 1024) + yield pools + Deployer.cleanup_disks(len(pools)) @pytest.fixture @@ -202,9 +179,8 @@ def pool(node_disks, nodes): assert len(nodes) > 0 assert len(node_disks) > 0 node = nodes[0] - disks = node_disks[0] pool = ApiClient.pools_api().put_node_pool( - node.id, f"{node.id}-pool", CreatePoolBody([disks[0]]) + node.id, f"{node.id}-pool", CreatePoolBody([node_disks[0]]) ) yield pool diff --git a/tests/bdd/features/pool/reconcile/test_feature.py b/tests/bdd/features/pool/reconcile/test_feature.py index edbd22ea4..a9a02fb16 100644 --- a/tests/bdd/features/pool/reconcile/test_feature.py +++ b/tests/bdd/features/pool/reconcile/test_feature.py @@ -78,8 +78,6 @@ def the_pool_should_eventually_be_imported(pool): """" Implementations """ -POOLS_PER_NODE = 2 - @pytest.fixture(scope="module") def background(): @@ -94,32 +92,10 @@ def nodes(): @pytest.fixture -def tmp_files(nodes): - files = [] - for node in range(len(nodes)): - node_files = [] - for disk in range(0, POOLS_PER_NODE + 1): - node_files.append(f"/tmp/node-{node}-disk_{disk}") - files.append(node_files) - yield files - - -@pytest.fixture -def node_disks(tmp_files): - for node_disks in tmp_files: - for disk in node_disks: - if os.path.exists(disk): - os.remove(disk) - with open(disk, "w") as file: - file.truncate(100 * 1024 * 1024) - - # /tmp is mapped into /host/tmp within the io-engine containers - yield list(map(lambda arr: list(map(lambda file: f"/host{file}", arr)), tmp_files)) - - for node_disks in tmp_files: - for disk in node_disks: - if os.path.exists(disk): - os.remove(disk) +def node_disks(nodes): + pools = Deployer.create_disks(1, size=100 * 1024 * 1024) + yield pools + Deployer.cleanup_disks(len(pools)) @pytest.fixture @@ -135,9 +111,8 @@ def pool(node_disks, nodes): assert len(nodes) > 0 assert len(node_disks) > 0 node = nodes[0] - disks = node_disks[0] pool = ApiClient.pools_api().put_node_pool( - node.id, f"{node.id}-pool", CreatePoolBody([disks[0]]) + node.id, f"{node.id}-pool", CreatePoolBody([node_disks[0]]) ) yield pool diff --git a/tests/bdd/features/rebuild/test_log_based_rebuild.py b/tests/bdd/features/rebuild/test_log_based_rebuild.py index 8c52abaa7..5b2795e7d 100644 --- a/tests/bdd/features/rebuild/test_log_based_rebuild.py +++ b/tests/bdd/features/rebuild/test_log_based_rebuild.py @@ -86,27 +86,10 @@ def init_scenario(init, disks): @pytest.fixture -def tmp_files(): - files = [] - for itr in range(NUM_VOLUME_REPLICAS + 1): - 
files.append(f"/tmp/node-{itr + 1}-disk") - yield files - - -@pytest.fixture -def disks(tmp_files): - for disk in tmp_files: - if os.path.exists(disk): - os.remove(disk) - with open(disk, "w") as file: - file.truncate(POOL_SIZE) - - # /tmp is mapped into /host/tmp within the io-engine containers - yield list(map(lambda file: f"/host{file}", tmp_files)) - - for disk in tmp_files: - if os.path.exists(disk): - os.remove(disk) +def disks(): + pools = Deployer.create_disks(NUM_VOLUME_REPLICAS + 1, size=POOL_SIZE) + yield pools + Deployer.cleanup_disks(len(pools)) @scenario( diff --git a/tests/bdd/features/rebuild/test_rebuild.py b/tests/bdd/features/rebuild/test_rebuild.py index 0047fc625..347d72800 100644 --- a/tests/bdd/features/rebuild/test_rebuild.py +++ b/tests/bdd/features/rebuild/test_rebuild.py @@ -42,25 +42,10 @@ @pytest.fixture -def tmp_files(): - files = [] - for index in range(0, POOL_DISK_COUNT): - files.append(f"/tmp/disk_{index}") - yield files - - -@pytest.fixture -def disks(tmp_files): - for disk in tmp_files: - if os.path.exists(disk): - os.remove(disk) - with open(disk, "w") as file: - file.truncate(100 * 1024 * 1024) - # /tmp is mapped into /host/tmp within the io-engine containers - yield list(map(lambda file: f"/host{file}", tmp_files)) - for disk in tmp_files: - if os.path.exists(disk): - os.remove(disk) +def disks(): + pools = Deployer.create_disks(POOL_DISK_COUNT, size=100 * 1024 * 1024) + yield pools + Deployer.cleanup_disks(len(pools)) @pytest.fixture(scope="module") diff --git a/tests/bdd/features/snapshot/create/test_feature.py b/tests/bdd/features/snapshot/create/test_feature.py index 3f3a3b34e..be669bb3b 100644 --- a/tests/bdd/features/snapshot/create/test_feature.py +++ b/tests/bdd/features/snapshot/create/test_feature.py @@ -34,26 +34,10 @@ @pytest.fixture(scope="module") -def tmp_files(): - files = [] - for index in range(2): - files.append(f"/tmp/disk_{index}") - yield files - - -@pytest.fixture(scope="module") -def disks(tmp_files): - for disk in tmp_files: - if os.path.exists(disk): - os.remove(disk) - with open(disk, "w") as file: - file.truncate(POOL_SIZE) - # /tmp is mapped into /host/tmp within the io-engine containers - yield list(map(lambda file: f"/host{file}", tmp_files)) - - for disk in tmp_files: - if os.path.exists(disk): - os.remove(disk) +def disks(): + pools = Deployer.create_disks(2, size=POOL_SIZE) + yield pools + Deployer.cleanup_disks(len(pools)) @pytest.fixture(scope="module") diff --git a/tests/bdd/features/snapshot/garbage_collection/test_garbage_collection.py b/tests/bdd/features/snapshot/garbage_collection/test_garbage_collection.py index 2b91a0e06..f2e3a7c14 100644 --- a/tests/bdd/features/snapshot/garbage_collection/test_garbage_collection.py +++ b/tests/bdd/features/snapshot/garbage_collection/test_garbage_collection.py @@ -39,15 +39,9 @@ def test_garbage_collection_for_stuck_creating_snapshots_when_source_is_deleted( @pytest.fixture(scope="module") def create_pool_disk_images(): - # Create the file. 
- path = "/tmp/{}".format(POOL_DISK1) - with open(path, "w") as file: - file.truncate(100 * 1024 * 1024) - - yield - # Clear the file - if os.path.exists(path): - os.remove(path) + pools = Deployer.create_disks(1, size=100 * 1024 * 1024) + yield pools + Deployer.cleanup_disks(len(pools)) @pytest.fixture(scope="module") @@ -63,7 +57,7 @@ def setup(create_pool_disk_images): ApiClient.pools_api().put_node_pool( NODE1, POOL1_NAME, - CreatePoolBody(["aio:///host/tmp/{}".format(POOL_DISK1)]), + CreatePoolBody(["aio://{}".format(create_pool_disk_images[0])]), ) pytest.exception = None yield diff --git a/tests/bdd/features/snapshot/list/test_list.py b/tests/bdd/features/snapshot/list/test_list.py index b5071cb92..28a489dcf 100644 --- a/tests/bdd/features/snapshot/list/test_list.py +++ b/tests/bdd/features/snapshot/list/test_list.py @@ -77,15 +77,9 @@ def test_volume_snapshots_list_involving_disrupted_node(): @pytest.fixture(scope="module") def create_pool_disk_images(): - # Create the file. - path = "/tmp/{}".format(POOL_DISK1) - with open(path, "w") as file: - file.truncate(800 * 1024 * 1024) - - yield - # Clear the file - if os.path.exists(path): - os.remove(path) + pools = Deployer.create_disks(1, size=800 * 1024 * 1024) + yield pools + Deployer.cleanup_disks(len(pools)) @pytest.fixture(scope="module") @@ -98,7 +92,7 @@ def setup(create_pool_disk_images): ApiClient.pools_api().put_node_pool( NODE1, POOL1_NAME, - CreatePoolBody(["aio:///host/tmp/{}".format(POOL_DISK1)]), + CreatePoolBody(["aio://{}".format(create_pool_disk_images[0])]), ) pytest.exception = None yield diff --git a/tests/bdd/features/volume/create/test_feature.py b/tests/bdd/features/volume/create/test_feature.py index 6c92a2139..97bac2d16 100644 --- a/tests/bdd/features/volume/create/test_feature.py +++ b/tests/bdd/features/volume/create/test_feature.py @@ -58,26 +58,10 @@ def init_scenario(init, disks): @pytest.fixture -def tmp_files(): - files = [] - for index in range(0, 1): - files.append(f"/tmp/disk_{index}") - yield files - - -@pytest.fixture -def disks(tmp_files): - for disk in tmp_files: - if os.path.exists(disk): - os.remove(disk) - with open(disk, "w") as file: - file.truncate(100 * 1024 * 1024) - # /tmp is mapped into /host/tmp within the io-engine containers - yield list(map(lambda file: f"/host{file}", tmp_files)) - - for disk in tmp_files: - if os.path.exists(disk): - os.remove(disk) +def disks(): + pools = Deployer.create_disks(1, size=100 * 1024 * 1024) + yield pools + Deployer.cleanup_disks(len(pools)) # Fixture used to pass the volume create request between test steps. 
diff --git a/tests/bdd/features/volume/nexus/test_feature.py b/tests/bdd/features/volume/nexus/test_feature.py index e4a51ada6..f8a4f3ed8 100644 --- a/tests/bdd/features/volume/nexus/test_feature.py +++ b/tests/bdd/features/volume/nexus/test_feature.py @@ -97,26 +97,14 @@ def the_nexus_target_shall_be_removed_from_its_associated_node(): IO_ENGINE_1 = "io-engine-1" IO_ENGINE_2 = "io-engine-2" -POOL_DISK1 = "cdisk1.img" -POOL1_UUID = "4cc6ee64-7232-497d-a26f-38284a444980" -POOL_DISK2 = "cdisk2.img" -POOL2_UUID = "4cc6ee64-7232-497d-a26f-38284a444990" +POOL_UUID = "4cc6ee64-7232-497d-a26f-38284a444990" @pytest.fixture(scope="function") def create_pool_disk_images(): - # When starting Io-Engine instances with the deployer a bind mount is created from /tmp to - # /host/tmp, so create disk images in /tmp - for disk in [POOL_DISK1, POOL_DISK2]: - path = f"/tmp/{disk}" - with open(path, "w") as file: - file.truncate(20 * 1024 * 1024) - - yield - for disk in [POOL_DISK1, POOL_DISK2]: - path = f"/tmp/{disk}" - if os.path.exists(path): - os.remove(path) + pools = Deployer.create_disks(1, size=20 * 1024 * 1024) + yield pools + Deployer.cleanup_disks(len(pools)) @pytest.fixture(scope="function") @@ -127,8 +115,8 @@ def init(create_pool_disk_images): # Create pools ApiClient.pools_api().put_node_pool( IO_ENGINE_2, - POOL2_UUID, - CreatePoolBody([f"aio:///host/tmp/{POOL_DISK2}"], labels={"node": IO_ENGINE_2}), + POOL_UUID, + CreatePoolBody([create_pool_disk_images[0]], labels={"node": IO_ENGINE_2}), ) yield diff --git a/tests/bdd/features/volume/replicas/test_feature.py b/tests/bdd/features/volume/replicas/test_feature.py index 882713edd..16e6749eb 100644 --- a/tests/bdd/features/volume/replicas/test_feature.py +++ b/tests/bdd/features/volume/replicas/test_feature.py @@ -50,25 +50,10 @@ def background(): @pytest.fixture -def tmp_files(): - files = [] - for index in range(0, NUM_IO_ENGINES): - files.append(f"/tmp/disk_{index}") - yield files - - -@pytest.fixture -def disks(tmp_files): - for disk in tmp_files: - if os.path.exists(disk): - os.remove(disk) - with open(disk, "w") as file: - file.truncate(100 * 1024 * 1024) - # /tmp is mapped into /host/tmp within the io-engine containers - yield list(map(lambda file: f"/host{file}", tmp_files)) - for disk in tmp_files: - if os.path.exists(disk): - os.remove(disk) +def disks(): + pools = Deployer.create_disks(NUM_IO_ENGINES, size=100 * 1024 * 1024) + yield pools + Deployer.cleanup_disks(len(pools)) @pytest.fixture diff --git a/tests/bdd/features/volume/resize/test_resize_offline.py b/tests/bdd/features/volume/resize/test_resize_offline.py index 3a45063d1..de84a7cd6 100644 --- a/tests/bdd/features/volume/resize/test_resize_offline.py +++ b/tests/bdd/features/volume/resize/test_resize_offline.py @@ -185,27 +185,10 @@ def a_deployer_cluster(): # Fixtures - BEGIN @pytest.fixture(scope="module") -def tmp_files(): - files = [] - for itr in range(DEFAULT_REPLICA_CNT + 1): - files.append(f"/tmp/node-{itr + 1}-disk") - yield files - - -@pytest.fixture(scope="module") -def disks(tmp_files): - for disk in tmp_files: - if os.path.exists(disk): - os.remove(disk) - with open(disk, "w") as file: - file.truncate(DEFAULT_POOL_SIZE) - - # /tmp is mapped into /host/tmp within the io-engine containers - yield list(map(lambda file: f"/host{file}", tmp_files)) - - for disk in tmp_files: - if os.path.exists(disk): - os.remove(disk) +def disks(): + pools = Deployer.create_disks(DEFAULT_REPLICA_CNT + 1, size=DEFAULT_POOL_SIZE) + yield pools + Deployer.cleanup_disks(len(pools)) # 
Fixtures - END diff --git a/tests/bdd/features/volume/resize/test_resize_online.py b/tests/bdd/features/volume/resize/test_resize_online.py index bfb051791..70cd237dd 100644 --- a/tests/bdd/features/volume/resize/test_resize_online.py +++ b/tests/bdd/features/volume/resize/test_resize_online.py @@ -62,27 +62,10 @@ # fixtures - BEGIN @pytest.fixture(scope="module") -def tmp_files(): - files = [] - for itr in range(DEFAULT_REPLICA_CNT + 1): - files.append(f"/tmp/node-{itr + 1}-disk") - yield files - - -@pytest.fixture(scope="module") -def disks(tmp_files): - for disk in tmp_files: - if os.path.exists(disk): - os.remove(disk) - with open(disk, "w") as file: - file.truncate(DEFAULT_POOL_SIZE) - - # /tmp is mapped into /host/tmp within the io-engine containers - yield list(map(lambda file: f"/host{file}", tmp_files)) - - for disk in tmp_files: - if os.path.exists(disk): - os.remove(disk) +def disks(): + pools = Deployer.create_disks(DEFAULT_REPLICA_CNT + 1, size=DEFAULT_POOL_SIZE) + yield pools + Deployer.cleanup_disks(len(pools)) @pytest.fixture(autouse=True, scope="module") diff --git a/utils/deployer-cluster/src/lib.rs b/utils/deployer-cluster/src/lib.rs index 1a275a7f6..74dcb0c38 100644 --- a/utils/deployer-cluster/src/lib.rs +++ b/utils/deployer-cluster/src/lib.rs @@ -1,3 +1,4 @@ +pub mod lvm; pub mod rest_client; use composer::{Builder, ComposeTest}; @@ -202,7 +203,7 @@ impl Cluster { Some(opts) => opts, None => TimeoutOptions::new() .with_req_timeout(Duration::from_millis(500)) - .with_max_retries(10), + .with_max_retries(20), }; for x in 1 .. timeout_opts.max_retries().unwrap_or_default() { match client @@ -619,6 +620,10 @@ impl TmpDiskFile { pub fn uri(&self) -> &str { self.inner.uri() } + /// Disk path on the host. + pub fn path(&self) -> &str { + &self.inner.path + } /// Get the inner disk if there are no other references to it. pub fn into_inner(self) -> Result> { @@ -633,20 +638,19 @@ impl TmpDiskFileInner { disk } fn make_new(name: &str) -> Self { - let path = Self::make_path(name); + let (path, container) = Self::make_path(name); + let pool_id = PoolId::new(); Self { - // the io-engine is setup with a bind mount from /tmp to /host/tmp - uri: format!( - "aio:///host{}?blk_size=512&uuid={}", - path, - transport::PoolId::new() - ), + // the io-engine is setup with a bind mount from /workspace/tmp to /host/tmp + uri: format!("aio://{container}?blk_size=512&uuid={pool_id}"), path, cleanup: true, } } - fn make_path(name: &str) -> String { - format!("/tmp/io-engine-disk-{name}") + fn make_path(name: &str) -> (String, String) { + let file = format!("io-engine-disk-{name}"); + let host_tmp = deployer_lib::host_tmp().expect("workspace error"); + (format!("{host_tmp}/{file}"), format!("/host/tmp/{file}")) } fn uri(&self) -> &str { &self.uri diff --git a/utils/deployer-cluster/src/lvm.rs b/utils/deployer-cluster/src/lvm.rs new file mode 100644 index 000000000..a944da7c4 --- /dev/null +++ b/utils/deployer-cluster/src/lvm.rs @@ -0,0 +1,100 @@ +//! LVM helper methods which are useful for setting up test block devices. + +use crate::TmpDiskFile; + +/// An LVM Logical Volume. +pub struct Lvol { + name: String, + path: String, +} +impl Lvol { + /// Get the host path for the lvol. + pub fn path(&self) -> &str { + &self.path + } + /// Suspends the device for IO. + pub fn suspend(&self) -> anyhow::Result<()> { + let _ = VolGroup::command(&["dmsetup", "suspend", self.path.as_str()])?; + Ok(()) + } + /// Resumes the device for IO. 
+ pub fn resume(&self) -> anyhow::Result<()> { + let _ = VolGroup::command(&["dmsetup", "resume", self.path.as_str()])?; + Ok(()) + } +} +impl Drop for Lvol { + fn drop(&mut self) { + println!("Dropping Lvol {}", self.name); + self.resume().ok(); + } +} + +/// An LVM Volume Group. +pub struct VolGroup { + backing_file: TmpDiskFile, + dev_loop: String, + name: String, +} + +impl VolGroup { + /// Creates a new LVM Volume Group. + pub fn new(name: &str, size: u64) -> Result { + let backing_file = TmpDiskFile::new(name, size); + + let dev_loop = Self::command(&["losetup", "--show", "-f", backing_file.path()])?; + let dev_loop = dev_loop.trim_end().to_string(); + let _ = Self::command(&["pvcreate", dev_loop.as_str()])?; + let _ = Self::command(&["vgcreate", name, dev_loop.as_str()])?; + + Ok(Self { + backing_file, + dev_loop, + name: name.to_string(), + }) + } + /// Creates a new Lvol for the LVM Volume Group. + pub fn create_lvol(&self, name: &str, size: u64) -> Result { + let size = format!("{size}B"); + + let vg_name = self.name.as_str(); + let _ = Self::command(&["lvcreate", "-L", size.as_str(), "-n", name, vg_name])?; + + Ok(Lvol { + name: name.to_string(), + path: format!("/dev/{vg_name}/{name}"), + }) + } + /// Run a command with sudo, and the given args. + /// The output string is returned. + fn command(args: &[&str]) -> Result { + let cmd = args.first().unwrap(); + let output = std::process::Command::new(env!("SUDO")) + .arg("-E") + .args(args) + .output()?; + if !output.status.success() { + return Err(anyhow::anyhow!( + "{cmd} Exit Code: {}\nstdout: {}, stderr: {}", + output.status, + String::from_utf8(output.stdout).unwrap_or_default(), + String::from_utf8(output.stderr).unwrap_or_default() + )); + } + let output = String::from_utf8(output.stdout)?; + Ok(output) + } +} + +impl Drop for VolGroup { + fn drop(&mut self) { + println!( + "Dropping VolGroup {} <== {}", + self.name, + self.backing_file.path() + ); + + let _ = Self::command(&["vgremove", "-y", self.name.as_str()]); + let _ = Self::command(&["losetup", "-d", self.dev_loop.as_str()]); + } +}
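The LVM helpers above are consumed by the new `slow_create` test; the following is a minimal usage sketch mirroring that test, assuming the `deployer_cluster` crate from this workspace and a sudo-capable host (names and sizes are illustrative).

```rust
// Usage sketch only: requires the `deployer_cluster` crate and sudo/losetup/lvm on the host.
use deployer_cluster::lvm::VolGroup;

fn main() -> anyhow::Result<()> {
    const POOL_SIZE_BYTES: u64 = 128 * 1024 * 1024;

    // Back an LVM volume group with a loop device over a file under the workspace .tmp dir.
    let vg = VolGroup::new("slow-pool", POOL_SIZE_BYTES)?;
    // Carve out a logical volume for the io-engine to use as a pool disk.
    let lvol = vg.create_lvol("lvol0", POOL_SIZE_BYTES / 2)?;

    // Suspending the dm device makes pool creation on it hang, which is how the test
    // forces the create gRPC to time out and return `Cancelled`.
    lvol.suspend()?;
    // ... issue the CreatePool request here and assert the Cancelled error ...
    lvol.resume()?;

    // Dropping `lvol` resumes the device; dropping `vg` removes the VG and the loop device.
    Ok(())
}
```

The `Drop` impls are what keep the deployer cleanup script simple: even a panicking test resumes the suspended device and releases the volume group and loop device.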
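For completeness, a simplified sketch of the `Cancelled` error path this change set wires up end to end (simplified enums rather than the actual crate types, and the non-Cancelled arms are illustrative; assumes the `tonic` crate): a pool-create gRPC cancelled by the client-side timeout now surfaces as `ReplyErrorKind::Cancelled` and HTTP 504 Gateway Timeout instead of a generic internal error.

```rust
// Simplified stand-in types; only the Cancelled arms mirror the actual change set.
#[derive(Debug, PartialEq)]
enum ReplyErrorKind {
    Internal,
    Cancelled,
}

/// Mirrors the new `tonic::Code::Cancelled => ReplyErrorKind::Cancelled` mapping.
fn reply_kind(code: tonic::Code) -> ReplyErrorKind {
    match code {
        tonic::Code::Cancelled => ReplyErrorKind::Cancelled,
        _ => ReplyErrorKind::Internal, // remaining arms elided
    }
}

/// Mirrors the REST layer: `Cancelled` becomes 504 Gateway Timeout.
fn http_status(kind: &ReplyErrorKind) -> u16 {
    match kind {
        ReplyErrorKind::Cancelled => 504,
        ReplyErrorKind::Internal => 500, // illustrative
    }
}

fn main() {
    let kind = reply_kind(tonic::Code::Cancelled);
    assert_eq!(kind, ReplyErrorKind::Cancelled);
    assert_eq!(http_status(&kind), 504);
}
```

This is also why the new `slow_create` test asserts `ReplyErrorKind.Cancelled` while the pool disk is suspended.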