From 867e7c04adcf53fc32c7010ce29e510c13190846 Mon Sep 17 00:00:00 2001
From: mayastor-bors
Date: Tue, 26 Nov 2024 09:31:35 +0000
Subject: [PATCH] chore(bors): merge pull request #887

887: Fix regression for pool creation timeout retry r=tiagolobocastro a=tiagolobocastro

test: use tmp in project workspace

Use a tmp folder from the workspace, allowing us to clean up things like LVM
volumes much more easily, as we can simply purge it.

Signed-off-by: Tiago Castro
---
test(pool): create on very large or very slow disks

Uses LVM Lvols as backend devices for the pool. We suspend these before pool
creation, allowing us to simulate slow pool creation. This test ensures that
the pool creation completes by itself, and also that a client can complete it
by calling create again.

Signed-off-by: Tiago Castro
---
fix: allow pool creation to complete asynchronously

When the initial create gRPC times out, the data-plane may still be creating
the pool in the background, which can happen for very large pools. Rather than
assume failure, we allow this to complete in the background for up to a large,
arbitrary amount of time. If the pool creation completes within that window,
we retry the creation flow.
The reason we don't simply use very large timeouts is that the gRPC operations
are currently sequential, mostly for historical reasons. Now that the
data-plane allows concurrent calls, we should also allow this on the
control-plane.
TODO: allow concurrent node operations

Signed-off-by: Tiago Castro
---
fix: check for correct not found error code

A previous fix ended up not working because it was merged incorrectly, somehow!

Signed-off-by: Tiago Castro
---
chore: update terraform node prep

Pull the Release key from a recent k8s version, since the old keys are no
longer valid. This will have to be updated from time to time.
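As a simplified, illustrative sketch (not the patch code itself; the function name is illustrative), the grace-window policy described above boils down to: while the pool spec is still `Creating` and within the configured `--pool-async-creat-tmo` window, keep the spec as-is so the create can be retried; once the window expires, mark it for deletion so the garbage collector cleans it up.

    use std::time::{Duration, SystemTime};

    /// What to do with a pool spec stuck in `Creating` after the initial
    /// create gRPC timed out (variant names mirror the patch).
    enum OnCreateFail {
        /// Keep the spec as `Creating`; the reconciler or a client retry can
        /// attempt the create call again.
        LeaveAsIs,
        /// The grace window expired: mark the spec for deletion so the
        /// garbage collector cleans up the half-created pool.
        SetDeleting,
    }

    /// `creat_tsc` is recorded when the create operation first starts;
    /// `window` corresponds to --pool-async-creat-tmo (default 15m).
    fn creating_pool_policy(creat_tsc: Option<SystemTime>, window: Duration) -> OnCreateFail {
        match creat_tsc.and_then(|t| t.elapsed().ok()) {
            // Still within the window: leave as-is and retry the create flow.
            Some(elapsed) if elapsed <= window => OnCreateFail::LeaveAsIs,
            // No timestamp (or clock went backwards), or window expired:
            // assume failure and let GC delete the pool.
            _ => OnCreateFail::SetDeleting,
        }
    }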
Co-authored-by: Tiago Castro --- .gitignore | 1 + .../core/controller/reconciler/pool/mod.rs | 10 ++ .../src/bin/core/controller/registry.rs | 11 ++ .../controller/resources/operations_helper.rs | 25 ++++- control-plane/agents/src/bin/core/main.rs | 8 ++ .../src/bin/core/pool/operations_helper.rs | 73 ++++++++++++- .../src/bin/core/pool/pool_operations.rs | 16 ++- .../agents/src/bin/core/pool/specs.rs | 4 +- .../src/bin/core/tests/controller/mod.rs | 3 +- .../agents/src/bin/core/tests/deserializer.rs | 1 + .../agents/src/bin/core/tests/pool/mod.rs | 86 ++++++++++++++- .../core/tests/snapshot/fs_cons_snapshot.rs | 2 +- .../src/bin/core/tests/volume/capacity.rs | 2 +- control-plane/agents/src/common/errors.rs | 2 +- control-plane/grpc/proto/v1/misc/common.proto | 1 + control-plane/grpc/src/misc/traits.rs | 2 + .../grpc/src/operations/pool/traits.rs | 1 + .../rest/openapi-specs/v0_api_spec.yaml | 1 + .../stor-port/src/transport_api/mod.rs | 5 +- control-plane/stor-port/src/types/mod.rs | 4 + .../stor-port/src/types/v0/store/pool.rs | 18 ++++ .../stor-port/src/types/v0/transport/pool.rs | 5 + deployer/src/infra/io_engine.rs | 3 +- deployer/src/lib.rs | 9 ++ scripts/rust/deployer-cleanup.sh | 24 ++++- shell.nix | 2 + terraform/cluster/mod/k8s/repo.sh | 7 +- tests/bdd/common/deployer.py | 16 ++- .../capacity/thin/volume/test_create.py | 24 +---- .../features/cordon/node/test_cordon_node.py | 8 -- .../replicas/test_feature.py | 21 +--- .../ha/core-agent/test_target_switchover.py | 22 +--- tests/bdd/features/ha/test_robustness.py | 23 +--- tests/bdd/features/pool/create/test_disks.py | 23 +--- .../bdd/features/pool/delete/test_feature.py | 34 +----- .../features/pool/reconcile/test_feature.py | 35 +----- .../rebuild/test_log_based_rebuild.py | 25 +---- tests/bdd/features/rebuild/test_rebuild.py | 23 +--- .../features/snapshot/create/test_feature.py | 24 +---- .../test_garbage_collection.py | 14 +-- tests/bdd/features/snapshot/list/test_list.py | 14 +-- .../features/volume/create/test_feature.py | 24 +---- .../bdd/features/volume/nexus/test_feature.py | 24 ++--- .../features/volume/replicas/test_feature.py | 23 +--- .../volume/resize/test_resize_offline.py | 25 +---- .../volume/resize/test_resize_online.py | 25 +---- utils/deployer-cluster/src/lib.rs | 24 +++-- utils/deployer-cluster/src/lvm.rs | 100 ++++++++++++++++++ 48 files changed, 495 insertions(+), 382 deletions(-) create mode 100644 utils/deployer-cluster/src/lvm.rs diff --git a/.gitignore b/.gitignore index 84f75f546..5bf5967f4 100644 --- a/.gitignore +++ b/.gitignore @@ -24,3 +24,4 @@ __pycache__ /.rust-toolchain /rpc/mayastor-api /local-fio-0-verify.state +/.tmp diff --git a/control-plane/agents/src/bin/core/controller/reconciler/pool/mod.rs b/control-plane/agents/src/bin/core/controller/reconciler/pool/mod.rs index 73a6bbfe2..b2572e3d4 100644 --- a/control-plane/agents/src/bin/core/controller/reconciler/pool/mod.rs +++ b/control-plane/agents/src/bin/core/controller/reconciler/pool/mod.rs @@ -53,6 +53,7 @@ impl TaskPoller for PoolReconciler { results.push(Self::squash_results(vec![ pool.garbage_collect(context).await, pool.recreate_state(context).await, + retry_create_pool_reconciler(&mut pool, context).await, ])) } capacity::remove_larger_replicas(context.registry()).await; @@ -214,3 +215,12 @@ async fn deleting_pool_spec_reconciler( )) .await } + +#[tracing::instrument(skip(pool, context), level = "trace", fields(pool.id = %pool.id(), request.reconcile = true))] +async fn retry_create_pool_reconciler( + pool: &mut OperationGuardArc, + 
context: &PollContext, +) -> PollResult { + pool.retry_creating(context.registry()).await?; + Ok(PollerState::Idle) +} diff --git a/control-plane/agents/src/bin/core/controller/registry.rs b/control-plane/agents/src/bin/core/controller/registry.rs index 83dea07ea..41e3db591 100644 --- a/control-plane/agents/src/bin/core/controller/registry.rs +++ b/control-plane/agents/src/bin/core/controller/registry.rs @@ -86,6 +86,11 @@ pub(crate) struct RegistryInner { /// The duration for which the reconciler waits for the replica to /// to be healthy again before attempting to online the faulted child. faulted_child_wait_period: Option, + /// When the pool creation gRPC times out, the actual call in the io-engine + /// may still progress. + /// We wait up to this period before considering the operation a failure and + /// GC'ing the pool. + pool_async_creat_tmo: std::time::Duration, /// Disable partial rebuild for volume targets. disable_partial_rebuild: bool, /// Disable nvmf target access control gates. @@ -122,6 +127,7 @@ impl Registry { reconcile_period: std::time::Duration, reconcile_idle_period: std::time::Duration, faulted_child_wait_period: Option, + pool_async_creat_tmo: std::time::Duration, disable_partial_rebuild: bool, disable_target_acc: bool, max_rebuilds: Option, @@ -165,6 +171,7 @@ impl Registry { reconcile_period, reconcile_idle_period, faulted_child_wait_period, + pool_async_creat_tmo, disable_partial_rebuild, disable_target_acc, reconciler: ReconcilerControl::new(), @@ -298,6 +305,10 @@ impl Registry { pub(crate) fn faulted_child_wait_period(&self) -> Option { self.faulted_child_wait_period } + /// Allow for this given time before assuming failure and allowing the pool to get deleted. + pub(crate) fn pool_async_creat_tmo(&self) -> std::time::Duration { + self.pool_async_creat_tmo + } /// The maximum number of concurrent create volume requests. pub(crate) fn create_volume_limit(&self) -> usize { self.create_volume_limit diff --git a/control-plane/agents/src/bin/core/controller/resources/operations_helper.rs b/control-plane/agents/src/bin/core/controller/resources/operations_helper.rs index 2fdd7a0d6..0a366d840 100644 --- a/control-plane/agents/src/bin/core/controller/resources/operations_helper.rs +++ b/control-plane/agents/src/bin/core/controller/resources/operations_helper.rs @@ -54,9 +54,9 @@ enum SpecError { } /// What to do when creation fails. +#[derive(Debug)] pub(crate) enum OnCreateFail { /// Leave object as `Creating`, could allow for frontend retries. - #[allow(unused)] LeaveAsIs, /// When frontend retries don't make sense, set it to deleting so we can clean-up. SetDeleting, @@ -71,7 +71,28 @@ impl OnCreateFail { pub(crate) fn eeinval_delete(result: &Result) -> Self { match result { Err(error) if error.tonic_code() == tonic::Code::InvalidArgument => Self::Delete, - Err(error) if error.tonic_code() == tonic::Code::NotFound => Self::Delete, + _ => Self::SetDeleting, + } + } + /// Map errors into `Self` for pool creation requests, specifically. + pub(crate) fn on_pool_create_err(result: &Result) -> Self { + let Err(ref error) = result else { + // nonsensical but that's how the api is today... + return Self::SetDeleting; + }; + match error.tonic_code() { + // 1. the disk is open by another pool or bdev + // 2. the disk contains a pool with another name + tonic::Code::InvalidArgument => Self::Delete, + // 1. 
the pool disk is not available (i.e. not found or broken somehow) + tonic::Code::NotFound => Self::Delete, + // In this case, it's the pool operator's job to attempt re-creation of the pool. + // 1. pre-2.6 dataplane, contention on the pool service + // 2. pool disk is very slow or extremely large + // 3. dataplane core is shared with other processes + // TODO: use higher timeout on larger pool sizes or potentially make this + // an async operation. + tonic::Code::Cancelled => Self::LeaveAsIs, _ => Self::SetDeleting, } } diff --git a/control-plane/agents/src/bin/core/main.rs b/control-plane/agents/src/bin/core/main.rs index 2da31a078..3d7dac5b2 100644 --- a/control-plane/agents/src/bin/core/main.rs +++ b/control-plane/agents/src/bin/core/main.rs @@ -47,6 +47,13 @@ pub(crate) struct CliArgs { #[clap(long)] pub(crate) faulted_child_wait_period: Option, + /// When the pool creation gRPC times out, the actual call in the io-engine + /// may still progress. + /// We wait up to this period before considering the operation a failure and + /// GC'ing the pool. + #[clap(long, default_value = "15m")] + pub(crate) pool_async_creat_tmo: humantime::Duration, + /// Disable partial rebuild for volume targets. #[clap(long, env = "DISABLE_PARTIAL_REBUILD")] pub(crate) disable_partial_rebuild: bool, @@ -194,6 +201,7 @@ async fn server(cli_args: CliArgs) -> anyhow::Result<()> { cli_args.reconcile_period.into(), cli_args.reconcile_idle_period.into(), cli_args.faulted_child_wait_period.map(|t| t.into()), + cli_args.pool_async_creat_tmo.into(), cli_args.disable_partial_rebuild, cli_args.disable_target_acc, cli_args.max_rebuilds, diff --git a/control-plane/agents/src/bin/core/pool/operations_helper.rs b/control-plane/agents/src/bin/core/pool/operations_helper.rs index ce02ba3ae..0ed7152bf 100644 --- a/control-plane/agents/src/bin/core/pool/operations_helper.rs +++ b/control-plane/agents/src/bin/core/pool/operations_helper.rs @@ -1,14 +1,77 @@ -use crate::controller::{ - registry::Registry, - resources::{operations::ResourceLifecycle, OperationGuardArc}, +use crate::{ + controller::{ + io_engine::PoolApi, + registry::Registry, + resources::{ + operations::ResourceLifecycle, + operations_helper::{GuardedOperationsHelper, OnCreateFail, SpecOperationsHelper}, + OperationGuardArc, + }, + }, + node::wrapper::GetterOps, }; use agents::{errors, errors::SvcError}; use snafu::OptionExt; +use std::ops::Deref; use stor_port::types::v0::{ - store::replica::{PoolRef, ReplicaSpec}, - transport::{DestroyReplica, NodeId, ReplicaOwners}, + store::{ + pool::PoolSpec, + replica::{PoolRef, ReplicaSpec}, + }, + transport::{CreatePool, DestroyReplica, NodeId, ReplicaOwners}, }; +impl OperationGuardArc { + /// Retries the creation of the pool which is being done in the background by the io-engine. + /// This may happen if the pool create gRPC times out, for very large pools. + /// We could increase the timeout but as things stand today that would block all gRPC + /// access to the node. + /// TODO: Since the data-plane now allows concurrent gRPC we should also modify the + /// control-plane to allow this, which would allow us to set large timeouts for some gRPCs.
+ pub(crate) async fn retry_creating(&mut self, registry: &Registry) -> Result<(), SvcError> { + let request = { + let spec = self.lock(); + if on_create_fail(&spec, registry).is_some() { + return Ok(()); + } + CreatePool::from(spec.deref()) + }; + + let node = registry.node_wrapper(&request.node).await?; + if node.pool(&request.id).await.is_none() { + return Ok(()); + } + + let _ = self.start_create(registry, &request).await?; + let result = node.create_pool(&request).await; + let _state = self + .complete_create(result, registry, OnCreateFail::LeaveAsIs) + .await?; + + Ok(()) + } + + /// Get the `OnCreateFail` policy. + /// For more information see [`Self::retry_creating`]. + pub(crate) fn on_create_fail(&self, registry: &Registry) -> OnCreateFail { + let spec = self.lock(); + on_create_fail(&spec, registry).unwrap_or(OnCreateFail::LeaveAsIs) + } +} + +fn on_create_fail(pool: &PoolSpec, registry: &Registry) -> Option { + if !pool.status().creating() { + return Some(OnCreateFail::LeaveAsIs); + } + let Some(last_mod_elapsed) = pool.creat_tsc.and_then(|t| t.elapsed().ok()) else { + return Some(OnCreateFail::SetDeleting); + }; + if last_mod_elapsed > registry.pool_async_creat_tmo() { + return Some(OnCreateFail::SetDeleting); + } + None +} + impl OperationGuardArc { /// Destroy the replica from its volume pub(crate) async fn destroy_volume_replica( diff --git a/control-plane/agents/src/bin/core/pool/pool_operations.rs b/control-plane/agents/src/bin/core/pool/pool_operations.rs index da3ac6730..34d27abb0 100644 --- a/control-plane/agents/src/bin/core/pool/pool_operations.rs +++ b/control-plane/agents/src/bin/core/pool/pool_operations.rs @@ -65,8 +65,7 @@ impl ResourceLifecycle for OperationGuardArc { let _ = pool.start_create(registry, request).await?; let result = node.create_pool(request).await; - let on_fail = OnCreateFail::eeinval_delete(&result); - + let on_fail = OnCreateFail::on_pool_create_err(&result); let state = pool.complete_create(result, registry, on_fail).await?; let spec = pool.lock().clone(); Ok(Pool::new(spec, Some(CtrlPoolState::new(state)))) @@ -93,14 +92,11 @@ impl ResourceLifecycle for OperationGuardArc { Err(SvcError::PoolNotFound { .. }) => { match node.import_pool(&self.as_ref().into()).await { Ok(_) => node.destroy_pool(request).await, - Err(error) - if allow_not_found - && error.tonic_code() == tonic::Code::InvalidArgument => - { - Ok(()) - } - Err(error) if error.tonic_code() == tonic::Code::InvalidArgument => Ok(()), - Err(error) => Err(error), + Err(error) => match error.tonic_code() { + tonic::Code::NotFound if allow_not_found => Ok(()), + tonic::Code::InvalidArgument => Ok(()), + _other => Err(error), + }, } } Err(error) => Err(error), diff --git a/control-plane/agents/src/bin/core/pool/specs.rs b/control-plane/agents/src/bin/core/pool/specs.rs index 2ce080600..bcd070671 100644 --- a/control-plane/agents/src/bin/core/pool/specs.rs +++ b/control-plane/agents/src/bin/core/pool/specs.rs @@ -425,7 +425,9 @@ impl ResourceSpecsLocked { let pools = self.pools_rsc(); for pool in pools { if let Ok(mut guard) = pool.operation_guard() { - if !guard.handle_incomplete_ops(registry).await { + let on_fail = guard.on_create_fail(registry); + + if !guard.handle_incomplete_ops_ext(registry, on_fail).await { // Not all pending operations could be handled.
pending_ops = true; } diff --git a/control-plane/agents/src/bin/core/tests/controller/mod.rs b/control-plane/agents/src/bin/core/tests/controller/mod.rs index 6ba1623f7..8cbf22989 100644 --- a/control-plane/agents/src/bin/core/tests/controller/mod.rs +++ b/control-plane/agents/src/bin/core/tests/controller/mod.rs @@ -9,7 +9,7 @@ use stor_port::{ }; use serde_json::Value; -use std::str::FromStr; +use std::{str::FromStr, time::Duration}; use uuid::Uuid; /// Test that the content of the registry is correctly loaded from the persistent store on start up. @@ -227,6 +227,7 @@ async fn etcd_pagination() { .with_rest(false) .with_jaeger(false) .with_store_lease_ttl(lease_ttl) + .with_req_timeouts(Duration::from_millis(200), Duration::from_millis(200)) .build() .await .unwrap(); diff --git a/control-plane/agents/src/bin/core/tests/deserializer.rs b/control-plane/agents/src/bin/core/tests/deserializer.rs index bcf4e8871..cd5f4375c 100644 --- a/control-plane/agents/src/bin/core/tests/deserializer.rs +++ b/control-plane/agents/src/bin/core/tests/deserializer.rs @@ -151,6 +151,7 @@ fn test_deserialization_v1_to_v2() { }, sequencer: Default::default(), operation: None, + creat_tsc: None, }), }, TestEntry { diff --git a/control-plane/agents/src/bin/core/tests/pool/mod.rs b/control-plane/agents/src/bin/core/tests/pool/mod.rs index 76b05d476..6addabcd5 100644 --- a/control-plane/agents/src/bin/core/tests/pool/mod.rs +++ b/control-plane/agents/src/bin/core/tests/pool/mod.rs @@ -21,7 +21,10 @@ use stor_port::{ VolumePolicy, }, }, - store::replica::{ReplicaSpec, ReplicaSpecKey}, + store::{ + pool::PoolLabel, + replica::{ReplicaSpec, ReplicaSpecKey}, + }, transport::{ CreatePool, CreateReplica, DestroyPool, DestroyReplica, Filter, GetSpecs, NexusId, NodeId, Protocol, Replica, ReplicaId, ReplicaName, ReplicaOwners, ReplicaShareProtocol, @@ -1035,3 +1038,84 @@ async fn destroy_after_restart() { assert_eq!(pool.state().unwrap().id, create.id); } + +#[tokio::test] +async fn slow_create() { + const POOL_SIZE_BYTES: u64 = 128 * 1024 * 1024; + + let vg = deployer_cluster::lvm::VolGroup::new("slow-pool", POOL_SIZE_BYTES).unwrap(); + let lvol = vg.create_lvol("lvol0", POOL_SIZE_BYTES / 2).unwrap(); + lvol.suspend().unwrap(); + { + let cluster = ClusterBuilder::builder() + .with_io_engines(1) + .with_reconcile_period(Duration::from_millis(250), Duration::from_millis(250)) + .with_cache_period("200ms") + .with_options(|o| o.with_io_engine_devices(vec![lvol.path()])) + .with_req_timeouts(Duration::from_millis(500), Duration::from_millis(500)) + .compose_build(|b| b.with_clean(true)) + .await + .unwrap(); + + let client = cluster.grpc_client(); + + let create = CreatePool { + node: cluster.node(0), + id: "bob".into(), + disks: vec![lvol.path().into()], + labels: Some(PoolLabel::from([("a".into(), "b".into())])), + }; + + let error = client + .pool() + .create(&create, None) + .await + .expect_err("device suspended"); + assert_eq!(error.kind, ReplyErrorKind::Cancelled); + + lvol.resume().unwrap(); + + let start = std::time::Instant::now(); + let timeout = Duration::from_secs(30); + loop { + if std::time::Instant::now() > (start + timeout) { + panic!("Timeout waiting for the pool"); + } + tokio::time::sleep(Duration::from_millis(100)).await; + + let pools = client + .pool() + .get(Filter::Pool(create.id.clone()), None) + .await + .unwrap(); + + let Some(pool) = pools.0.first() else { + continue; + }; + let Some(pool_spec) = pool.spec() else { + continue; + }; + if !pool_spec.status.created() { + continue; + } + break; + } + 
let destroy = DestroyPool::from(create.clone()); + client.pool().destroy(&destroy, None).await.unwrap(); + + // Now we try to recreate using an API call, rather than using the reconciler + lvol.suspend().unwrap(); + + let error = client + .pool() + .create(&create, None) + .await + .expect_err("device suspended"); + assert_eq!(error.kind, ReplyErrorKind::Cancelled); + + lvol.resume().unwrap(); + + let pool = client.pool().create(&create, None).await.unwrap(); + assert!(pool.spec().unwrap().status.created()); + } +} diff --git a/control-plane/agents/src/bin/core/tests/snapshot/fs_cons_snapshot.rs b/control-plane/agents/src/bin/core/tests/snapshot/fs_cons_snapshot.rs index f3a56552c..c48e957bb 100644 --- a/control-plane/agents/src/bin/core/tests/snapshot/fs_cons_snapshot.rs +++ b/control-plane/agents/src/bin/core/tests/snapshot/fs_cons_snapshot.rs @@ -8,7 +8,7 @@ struct DeviceDisconnect(nvmeadm::NvmeTarget); impl Drop for DeviceDisconnect { fn drop(&mut self) { if self.0.disconnect().is_err() { - std::process::Command::new("sudo") + std::process::Command::new(env!("SUDO")) .args(["nvme", "disconnect-all"]) .status() .unwrap(); diff --git a/control-plane/agents/src/bin/core/tests/volume/capacity.rs b/control-plane/agents/src/bin/core/tests/volume/capacity.rs index fe0dead3c..375bb22a1 100644 --- a/control-plane/agents/src/bin/core/tests/volume/capacity.rs +++ b/control-plane/agents/src/bin/core/tests/volume/capacity.rs @@ -89,7 +89,7 @@ struct DeviceDisconnect(nvmeadm::NvmeTarget); impl Drop for DeviceDisconnect { fn drop(&mut self) { if self.0.disconnect().is_err() { - std::process::Command::new("sudo") + std::process::Command::new(env!("SUDO")) .args(["nvme", "disconnect-all"]) .status() .unwrap(); diff --git a/control-plane/agents/src/common/errors.rs b/control-plane/agents/src/common/errors.rs index 7d5001cc1..89d31c0b0 100644 --- a/control-plane/agents/src/common/errors.rs +++ b/control-plane/agents/src/common/errors.rs @@ -1124,7 +1124,7 @@ fn grpc_to_reply_error(error: SvcError) -> ReplyError { } => { let kind = match source.code() { Code::Ok => ReplyErrorKind::Internal, - Code::Cancelled => ReplyErrorKind::Internal, + Code::Cancelled => ReplyErrorKind::Cancelled, Code::Unknown => ReplyErrorKind::Internal, Code::InvalidArgument => ReplyErrorKind::InvalidArgument, Code::DeadlineExceeded => ReplyErrorKind::DeadlineExceeded, diff --git a/control-plane/grpc/proto/v1/misc/common.proto b/control-plane/grpc/proto/v1/misc/common.proto index 403f3805e..c4569eaa1 100644 --- a/control-plane/grpc/proto/v1/misc/common.proto +++ b/control-plane/grpc/proto/v1/misc/common.proto @@ -69,6 +69,7 @@ enum ReplyErrorKind { InUse = 29; CapacityLimitExceeded = 30; NotAcceptable = 31; + Cancelled = 32; } // ResourceKind for the resource which has undergone this error diff --git a/control-plane/grpc/src/misc/traits.rs b/control-plane/grpc/src/misc/traits.rs index 5ff69c1b8..8f6cc1480 100644 --- a/control-plane/grpc/src/misc/traits.rs +++ b/control-plane/grpc/src/misc/traits.rs @@ -102,6 +102,7 @@ impl From for common::ReplyErrorKind { ReplyErrorKind::InUse => Self::InUse, ReplyErrorKind::CapacityLimitExceeded => Self::CapacityLimitExceeded, ReplyErrorKind::NotAcceptable => Self::NotAcceptable, + ReplyErrorKind::Cancelled => Self::Cancelled, } } } @@ -141,6 +142,7 @@ impl From for ReplyErrorKind { common::ReplyErrorKind::InUse => Self::InUse, common::ReplyErrorKind::CapacityLimitExceeded => Self::CapacityLimitExceeded, common::ReplyErrorKind::NotAcceptable => Self::NotAcceptable, + 
common::ReplyErrorKind::Cancelled => Self::Cancelled, } } } diff --git a/control-plane/grpc/src/operations/pool/traits.rs b/control-plane/grpc/src/operations/pool/traits.rs index 1c0c653c3..ff2258f25 100644 --- a/control-plane/grpc/src/operations/pool/traits.rs +++ b/control-plane/grpc/src/operations/pool/traits.rs @@ -97,6 +97,7 @@ impl TryFrom for PoolSpec { }, sequencer: Default::default(), operation: None, + creat_tsc: None, }) } } diff --git a/control-plane/rest/openapi-specs/v0_api_spec.yaml b/control-plane/rest/openapi-specs/v0_api_spec.yaml index 6d42b70a2..c7074dfa5 100644 --- a/control-plane/rest/openapi-specs/v0_api_spec.yaml +++ b/control-plane/rest/openapi-specs/v0_api_spec.yaml @@ -3253,6 +3253,7 @@ components: - InUse - CapacityLimitExceeded - NotAcceptable + - Cancelled required: - details - kind diff --git a/control-plane/stor-port/src/transport_api/mod.rs b/control-plane/stor-port/src/transport_api/mod.rs index d6dab6e8b..39423c8f9 100644 --- a/control-plane/stor-port/src/transport_api/mod.rs +++ b/control-plane/stor-port/src/transport_api/mod.rs @@ -446,11 +446,14 @@ pub enum ReplyErrorKind { InUse, CapacityLimitExceeded, NotAcceptable, + Cancelled, } impl From for ReplyErrorKind { fn from(code: tonic::Code) -> Self { match code { + Code::Ok => Self::Internal, + Code::Unknown => Self::Internal, Code::InvalidArgument => Self::InvalidArgument, Code::DeadlineExceeded => Self::DeadlineExceeded, Code::NotFound => Self::NotFound, @@ -465,7 +468,7 @@ impl From for ReplyErrorKind { Code::Unavailable => Self::Unavailable, Code::DataLoss => Self::FailedPersist, Code::Unauthenticated => Self::Unauthenticated, - _ => Self::Aborted, + Code::Cancelled => Self::Cancelled, } } } diff --git a/control-plane/stor-port/src/types/mod.rs b/control-plane/stor-port/src/types/mod.rs index b299aa085..da70ce265 100644 --- a/control-plane/stor-port/src/types/mod.rs +++ b/control-plane/stor-port/src/types/mod.rs @@ -142,6 +142,10 @@ impl From for RestError { let error = RestJsonError::new(details, message, Kind::NotAcceptable); (StatusCode::NOT_ACCEPTABLE, error) } + ReplyErrorKind::Cancelled => { + let error = RestJsonError::new(details, message, Kind::Cancelled); + (StatusCode::GATEWAY_TIMEOUT, error) + } }; RestError::new(status, error) diff --git a/control-plane/stor-port/src/types/v0/store/pool.rs b/control-plane/stor-port/src/types/v0/store/pool.rs index f6a344051..b4fc9a3f9 100644 --- a/control-plane/stor-port/src/types/v0/store/pool.rs +++ b/control-plane/stor-port/src/types/v0/store/pool.rs @@ -53,6 +53,17 @@ impl From<&CreatePool> for PoolSpec { labels: request.labels.clone(), sequencer: OperationSequence::new(), operation: None, + creat_tsc: None, + } + } +} +impl From<&PoolSpec> for CreatePool { + fn from(pool: &PoolSpec) -> Self { + Self { + node: pool.node.clone(), + id: pool.id.clone(), + disks: pool.disks.clone(), + labels: pool.labels.clone(), } } } @@ -61,6 +72,7 @@ impl PartialEq for PoolSpec { let mut other = PoolSpec::from(other); other.status = self.status.clone(); other.sequencer = self.sequencer.clone(); + other.creat_tsc = self.creat_tsc; &other == self } } @@ -83,6 +95,9 @@ pub struct PoolSpec { pub sequencer: OperationSequence, /// Record of the operation in progress pub operation: Option, + /// Last modification timestamp. 
+ #[serde(skip)] + pub creat_tsc: Option, } impl PoolSpec { @@ -206,6 +221,9 @@ impl SpecTransaction for PoolSpec { } fn start_op(&mut self, operation: PoolOperation) { + if matches!(operation, PoolOperation::Create) && self.creat_tsc.is_none() { + self.creat_tsc = Some(std::time::SystemTime::now()); + } self.operation = Some(PoolOperationState { operation, result: None, diff --git a/control-plane/stor-port/src/types/v0/transport/pool.rs b/control-plane/stor-port/src/types/v0/transport/pool.rs index be5d211c9..3567cc653 100644 --- a/control-plane/stor-port/src/types/v0/transport/pool.rs +++ b/control-plane/stor-port/src/types/v0/transport/pool.rs @@ -346,6 +346,11 @@ impl DestroyPool { Self { node, id } } } +impl From for DestroyPool { + fn from(value: CreatePool) -> Self { + Self::new(value.node, value.id) + } +} /// Label Pool Request. #[derive(Serialize, Deserialize, Default, Debug, Clone, Eq, PartialEq)] diff --git a/deployer/src/infra/io_engine.rs b/deployer/src/infra/io_engine.rs index 4e6d145c5..544bed9bf 100644 --- a/deployer/src/infra/io_engine.rs +++ b/deployer/src/infra/io_engine.rs @@ -11,6 +11,7 @@ use utils::DEFAULT_GRPC_CLIENT_ADDR; impl ComponentAction for IoEngine { fn configure(&self, options: &StartOptions, cfg: Builder) -> Result { let mut cfg = cfg; + let host_tmp = crate::host_tmp()?; for i in 0 .. options.io_engines + options.idle_io_engines { let io_engine_socket = format!("{}:10124", cfg.next_ip_for_name(&Self::name(i, options))?); @@ -53,7 +54,7 @@ impl ComponentAction for IoEngine { .with_env("NEXUS_NVMF_ANA_ENABLE", "1") .with_env("NVMF_TGT_CRDT", "0") .with_env("ENABLE_SNAPSHOT_REBUILD", "true") - .with_bind("/tmp", "/host/tmp") + .with_bind(&host_tmp, "/host/tmp") .with_bind("/var/run/dpdk", "/var/run/dpdk"); let core_list = match options.io_engine_isolate { diff --git a/deployer/src/lib.rs b/deployer/src/lib.rs index bf7807e46..baff0bf06 100644 --- a/deployer/src/lib.rs +++ b/deployer/src/lib.rs @@ -777,3 +777,12 @@ impl std::fmt::Debug for ClusterLabel { write!(f, "{self}") } } + +/// Get the host tmp folder for this workspace. +pub fn host_tmp() -> Result { + let root_tmp = format!("{root}/.tmp", root = env!("WORKSPACE_ROOT")); + if !std::path::Path::new(&root_tmp).exists() { + std::fs::create_dir(&root_tmp)?; + } + Ok(root_tmp) +} diff --git a/scripts/rust/deployer-cleanup.sh b/scripts/rust/deployer-cleanup.sh index 4367ad6e8..983a42fc4 100755 --- a/scripts/rust/deployer-cleanup.sh +++ b/scripts/rust/deployer-cleanup.sh @@ -2,8 +2,26 @@ SCRIPT_DIR="$(dirname "$0")" export ROOT_DIR="$SCRIPT_DIR/../.." 
+SUDO=$(which sudo) -sudo nvme disconnect-all +cleanup_ws_tmp() { + # This contains tmp for container artifacts, example: pool disk images + tmp_dir="$(realpath "$ROOT_DIR/.tmp")" + + devices=$(losetup -l -J | jq -r --arg tmp_dir=$tmp_dir '.loopdevices[]|select(."back-file" | startswith($tmp_dir))') + for device in $(echo $devices | jq -r '.loopdevices[].name'); do + echo "Found stale loop device: $device" + + $SUDO $(which vgremove) -y --select="pv_name=$device" || : + $SUDO losetup -d "$device" + done + + $SUDO rm -rf "$tmp_dir" + + return 0 +} + +$SUDO nvme disconnect-all "$ROOT_DIR"/target/debug/deployer stop for c in $(docker ps -a --filter "label=io.composer.test.name" --format '{{.ID}}') ; do @@ -12,7 +30,7 @@ for c in $(docker ps -a --filter "label=io.composer.test.name" --format '{{.ID}} done for n in $(docker network ls --filter "label=io.composer.test.name" --format '{{.ID}}') ; do - docker network rm "$n" || ( sudo systemctl restart docker && docker network rm "$n" ) + docker network rm "$n" || ( $SUDO systemctl restart docker && docker network rm "$n" ) done -exit 0 \ No newline at end of file +cleanup_ws_tmp diff --git a/shell.nix b/shell.nix index b1dc71953..f16205a68 100644 --- a/shell.nix +++ b/shell.nix @@ -50,6 +50,7 @@ mkShell { pytest_inputs tini udev + lvm2 ] ++ pkgs.lib.optional (system == "aarch64-darwin") darwin.apple_sdk.frameworks.Security; LIBCLANG_PATH = "${llvmPackages.libclang.lib}/lib"; @@ -87,6 +88,7 @@ mkShell { [ ! -z "${io-engine}" ] && cowsay "${io-engine-moth}" [ ! -z "${io-engine}" ] && export IO_ENGINE_BIN="${io-engine-moth}" export PATH="$PATH:$(pwd)/target/debug" + export SUDO=$(which sudo || echo /run/wrappers/bin/sudo) DOCKER_CONFIG=~/.docker/config.json if [ -f "$DOCKER_CONFIG" ]; then diff --git a/terraform/cluster/mod/k8s/repo.sh b/terraform/cluster/mod/k8s/repo.sh index a561ec7d5..12044c52c 100644 --- a/terraform/cluster/mod/k8s/repo.sh +++ b/terraform/cluster/mod/k8s/repo.sh @@ -45,10 +45,13 @@ fi sudo mkdir -p /etc/apt/keyrings/ KUBE_APT_V=$(echo "${kube_version}" | awk -F. '{ sub(/-.*/, ""); print "v" $1 "." 
$2 }') -curl -fsSL https://pkgs.k8s.io/core:/stable:/$KUBE_APT_V/deb/Release.key | sudo gpg --dearmor -o /etc/apt/keyrings/kubernetes-apt-keyring.gpg -cat < 0 assert len(node_disks) > 0 node = nodes[0] - disks = node_disks[0] pool = ApiClient.pools_api().put_node_pool( - node.id, f"{node.id}-pool", CreatePoolBody([disks[0]]) + node.id, f"{node.id}-pool", CreatePoolBody([node_disks[0]]) ) yield pool diff --git a/tests/bdd/features/pool/reconcile/test_feature.py b/tests/bdd/features/pool/reconcile/test_feature.py index edbd22ea4..a9a02fb16 100644 --- a/tests/bdd/features/pool/reconcile/test_feature.py +++ b/tests/bdd/features/pool/reconcile/test_feature.py @@ -78,8 +78,6 @@ def the_pool_should_eventually_be_imported(pool): """" Implementations """ -POOLS_PER_NODE = 2 - @pytest.fixture(scope="module") def background(): @@ -94,32 +92,10 @@ def nodes(): @pytest.fixture -def tmp_files(nodes): - files = [] - for node in range(len(nodes)): - node_files = [] - for disk in range(0, POOLS_PER_NODE + 1): - node_files.append(f"/tmp/node-{node}-disk_{disk}") - files.append(node_files) - yield files - - -@pytest.fixture -def node_disks(tmp_files): - for node_disks in tmp_files: - for disk in node_disks: - if os.path.exists(disk): - os.remove(disk) - with open(disk, "w") as file: - file.truncate(100 * 1024 * 1024) - - # /tmp is mapped into /host/tmp within the io-engine containers - yield list(map(lambda arr: list(map(lambda file: f"/host{file}", arr)), tmp_files)) - - for node_disks in tmp_files: - for disk in node_disks: - if os.path.exists(disk): - os.remove(disk) +def node_disks(nodes): + pools = Deployer.create_disks(1, size=100 * 1024 * 1024) + yield pools + Deployer.cleanup_disks(len(pools)) @pytest.fixture @@ -135,9 +111,8 @@ def pool(node_disks, nodes): assert len(nodes) > 0 assert len(node_disks) > 0 node = nodes[0] - disks = node_disks[0] pool = ApiClient.pools_api().put_node_pool( - node.id, f"{node.id}-pool", CreatePoolBody([disks[0]]) + node.id, f"{node.id}-pool", CreatePoolBody([node_disks[0]]) ) yield pool diff --git a/tests/bdd/features/rebuild/test_log_based_rebuild.py b/tests/bdd/features/rebuild/test_log_based_rebuild.py index 8c52abaa7..5b2795e7d 100644 --- a/tests/bdd/features/rebuild/test_log_based_rebuild.py +++ b/tests/bdd/features/rebuild/test_log_based_rebuild.py @@ -86,27 +86,10 @@ def init_scenario(init, disks): @pytest.fixture -def tmp_files(): - files = [] - for itr in range(NUM_VOLUME_REPLICAS + 1): - files.append(f"/tmp/node-{itr + 1}-disk") - yield files - - -@pytest.fixture -def disks(tmp_files): - for disk in tmp_files: - if os.path.exists(disk): - os.remove(disk) - with open(disk, "w") as file: - file.truncate(POOL_SIZE) - - # /tmp is mapped into /host/tmp within the io-engine containers - yield list(map(lambda file: f"/host{file}", tmp_files)) - - for disk in tmp_files: - if os.path.exists(disk): - os.remove(disk) +def disks(): + pools = Deployer.create_disks(NUM_VOLUME_REPLICAS + 1, size=POOL_SIZE) + yield pools + Deployer.cleanup_disks(len(pools)) @scenario( diff --git a/tests/bdd/features/rebuild/test_rebuild.py b/tests/bdd/features/rebuild/test_rebuild.py index 0047fc625..347d72800 100644 --- a/tests/bdd/features/rebuild/test_rebuild.py +++ b/tests/bdd/features/rebuild/test_rebuild.py @@ -42,25 +42,10 @@ @pytest.fixture -def tmp_files(): - files = [] - for index in range(0, POOL_DISK_COUNT): - files.append(f"/tmp/disk_{index}") - yield files - - -@pytest.fixture -def disks(tmp_files): - for disk in tmp_files: - if os.path.exists(disk): - os.remove(disk) - with 
open(disk, "w") as file: - file.truncate(100 * 1024 * 1024) - # /tmp is mapped into /host/tmp within the io-engine containers - yield list(map(lambda file: f"/host{file}", tmp_files)) - for disk in tmp_files: - if os.path.exists(disk): - os.remove(disk) +def disks(): + pools = Deployer.create_disks(POOL_DISK_COUNT, size=100 * 1024 * 1024) + yield pools + Deployer.cleanup_disks(len(pools)) @pytest.fixture(scope="module") diff --git a/tests/bdd/features/snapshot/create/test_feature.py b/tests/bdd/features/snapshot/create/test_feature.py index 3f3a3b34e..be669bb3b 100644 --- a/tests/bdd/features/snapshot/create/test_feature.py +++ b/tests/bdd/features/snapshot/create/test_feature.py @@ -34,26 +34,10 @@ @pytest.fixture(scope="module") -def tmp_files(): - files = [] - for index in range(2): - files.append(f"/tmp/disk_{index}") - yield files - - -@pytest.fixture(scope="module") -def disks(tmp_files): - for disk in tmp_files: - if os.path.exists(disk): - os.remove(disk) - with open(disk, "w") as file: - file.truncate(POOL_SIZE) - # /tmp is mapped into /host/tmp within the io-engine containers - yield list(map(lambda file: f"/host{file}", tmp_files)) - - for disk in tmp_files: - if os.path.exists(disk): - os.remove(disk) +def disks(): + pools = Deployer.create_disks(2, size=POOL_SIZE) + yield pools + Deployer.cleanup_disks(len(pools)) @pytest.fixture(scope="module") diff --git a/tests/bdd/features/snapshot/garbage_collection/test_garbage_collection.py b/tests/bdd/features/snapshot/garbage_collection/test_garbage_collection.py index 2b91a0e06..f2e3a7c14 100644 --- a/tests/bdd/features/snapshot/garbage_collection/test_garbage_collection.py +++ b/tests/bdd/features/snapshot/garbage_collection/test_garbage_collection.py @@ -39,15 +39,9 @@ def test_garbage_collection_for_stuck_creating_snapshots_when_source_is_deleted( @pytest.fixture(scope="module") def create_pool_disk_images(): - # Create the file. - path = "/tmp/{}".format(POOL_DISK1) - with open(path, "w") as file: - file.truncate(100 * 1024 * 1024) - - yield - # Clear the file - if os.path.exists(path): - os.remove(path) + pools = Deployer.create_disks(1, size=100 * 1024 * 1024) + yield pools + Deployer.cleanup_disks(len(pools)) @pytest.fixture(scope="module") @@ -63,7 +57,7 @@ def setup(create_pool_disk_images): ApiClient.pools_api().put_node_pool( NODE1, POOL1_NAME, - CreatePoolBody(["aio:///host/tmp/{}".format(POOL_DISK1)]), + CreatePoolBody(["aio://{}".format(create_pool_disk_images[0])]), ) pytest.exception = None yield diff --git a/tests/bdd/features/snapshot/list/test_list.py b/tests/bdd/features/snapshot/list/test_list.py index b5071cb92..28a489dcf 100644 --- a/tests/bdd/features/snapshot/list/test_list.py +++ b/tests/bdd/features/snapshot/list/test_list.py @@ -77,15 +77,9 @@ def test_volume_snapshots_list_involving_disrupted_node(): @pytest.fixture(scope="module") def create_pool_disk_images(): - # Create the file. 
- path = "/tmp/{}".format(POOL_DISK1) - with open(path, "w") as file: - file.truncate(800 * 1024 * 1024) - - yield - # Clear the file - if os.path.exists(path): - os.remove(path) + pools = Deployer.create_disks(1, size=800 * 1024 * 1024) + yield pools + Deployer.cleanup_disks(len(pools)) @pytest.fixture(scope="module") @@ -98,7 +92,7 @@ def setup(create_pool_disk_images): ApiClient.pools_api().put_node_pool( NODE1, POOL1_NAME, - CreatePoolBody(["aio:///host/tmp/{}".format(POOL_DISK1)]), + CreatePoolBody(["aio://{}".format(create_pool_disk_images[0])]), ) pytest.exception = None yield diff --git a/tests/bdd/features/volume/create/test_feature.py b/tests/bdd/features/volume/create/test_feature.py index 6c92a2139..97bac2d16 100644 --- a/tests/bdd/features/volume/create/test_feature.py +++ b/tests/bdd/features/volume/create/test_feature.py @@ -58,26 +58,10 @@ def init_scenario(init, disks): @pytest.fixture -def tmp_files(): - files = [] - for index in range(0, 1): - files.append(f"/tmp/disk_{index}") - yield files - - -@pytest.fixture -def disks(tmp_files): - for disk in tmp_files: - if os.path.exists(disk): - os.remove(disk) - with open(disk, "w") as file: - file.truncate(100 * 1024 * 1024) - # /tmp is mapped into /host/tmp within the io-engine containers - yield list(map(lambda file: f"/host{file}", tmp_files)) - - for disk in tmp_files: - if os.path.exists(disk): - os.remove(disk) +def disks(): + pools = Deployer.create_disks(1, size=100 * 1024 * 1024) + yield pools + Deployer.cleanup_disks(len(pools)) # Fixture used to pass the volume create request between test steps. diff --git a/tests/bdd/features/volume/nexus/test_feature.py b/tests/bdd/features/volume/nexus/test_feature.py index e4a51ada6..f8a4f3ed8 100644 --- a/tests/bdd/features/volume/nexus/test_feature.py +++ b/tests/bdd/features/volume/nexus/test_feature.py @@ -97,26 +97,14 @@ def the_nexus_target_shall_be_removed_from_its_associated_node(): IO_ENGINE_1 = "io-engine-1" IO_ENGINE_2 = "io-engine-2" -POOL_DISK1 = "cdisk1.img" -POOL1_UUID = "4cc6ee64-7232-497d-a26f-38284a444980" -POOL_DISK2 = "cdisk2.img" -POOL2_UUID = "4cc6ee64-7232-497d-a26f-38284a444990" +POOL_UUID = "4cc6ee64-7232-497d-a26f-38284a444990" @pytest.fixture(scope="function") def create_pool_disk_images(): - # When starting Io-Engine instances with the deployer a bind mount is created from /tmp to - # /host/tmp, so create disk images in /tmp - for disk in [POOL_DISK1, POOL_DISK2]: - path = f"/tmp/{disk}" - with open(path, "w") as file: - file.truncate(20 * 1024 * 1024) - - yield - for disk in [POOL_DISK1, POOL_DISK2]: - path = f"/tmp/{disk}" - if os.path.exists(path): - os.remove(path) + pools = Deployer.create_disks(1, size=20 * 1024 * 1024) + yield pools + Deployer.cleanup_disks(len(pools)) @pytest.fixture(scope="function") @@ -127,8 +115,8 @@ def init(create_pool_disk_images): # Create pools ApiClient.pools_api().put_node_pool( IO_ENGINE_2, - POOL2_UUID, - CreatePoolBody([f"aio:///host/tmp/{POOL_DISK2}"], labels={"node": IO_ENGINE_2}), + POOL_UUID, + CreatePoolBody([create_pool_disk_images[0]], labels={"node": IO_ENGINE_2}), ) yield diff --git a/tests/bdd/features/volume/replicas/test_feature.py b/tests/bdd/features/volume/replicas/test_feature.py index 882713edd..16e6749eb 100644 --- a/tests/bdd/features/volume/replicas/test_feature.py +++ b/tests/bdd/features/volume/replicas/test_feature.py @@ -50,25 +50,10 @@ def background(): @pytest.fixture -def tmp_files(): - files = [] - for index in range(0, NUM_IO_ENGINES): - files.append(f"/tmp/disk_{index}") - yield 
files - - -@pytest.fixture -def disks(tmp_files): - for disk in tmp_files: - if os.path.exists(disk): - os.remove(disk) - with open(disk, "w") as file: - file.truncate(100 * 1024 * 1024) - # /tmp is mapped into /host/tmp within the io-engine containers - yield list(map(lambda file: f"/host{file}", tmp_files)) - for disk in tmp_files: - if os.path.exists(disk): - os.remove(disk) +def disks(): + pools = Deployer.create_disks(NUM_IO_ENGINES, size=100 * 1024 * 1024) + yield pools + Deployer.cleanup_disks(len(pools)) @pytest.fixture diff --git a/tests/bdd/features/volume/resize/test_resize_offline.py b/tests/bdd/features/volume/resize/test_resize_offline.py index 3a45063d1..de84a7cd6 100644 --- a/tests/bdd/features/volume/resize/test_resize_offline.py +++ b/tests/bdd/features/volume/resize/test_resize_offline.py @@ -185,27 +185,10 @@ def a_deployer_cluster(): # Fixtures - BEGIN @pytest.fixture(scope="module") -def tmp_files(): - files = [] - for itr in range(DEFAULT_REPLICA_CNT + 1): - files.append(f"/tmp/node-{itr + 1}-disk") - yield files - - -@pytest.fixture(scope="module") -def disks(tmp_files): - for disk in tmp_files: - if os.path.exists(disk): - os.remove(disk) - with open(disk, "w") as file: - file.truncate(DEFAULT_POOL_SIZE) - - # /tmp is mapped into /host/tmp within the io-engine containers - yield list(map(lambda file: f"/host{file}", tmp_files)) - - for disk in tmp_files: - if os.path.exists(disk): - os.remove(disk) +def disks(): + pools = Deployer.create_disks(DEFAULT_REPLICA_CNT + 1, size=DEFAULT_POOL_SIZE) + yield pools + Deployer.cleanup_disks(len(pools)) # Fixtures - END diff --git a/tests/bdd/features/volume/resize/test_resize_online.py b/tests/bdd/features/volume/resize/test_resize_online.py index bfb051791..70cd237dd 100644 --- a/tests/bdd/features/volume/resize/test_resize_online.py +++ b/tests/bdd/features/volume/resize/test_resize_online.py @@ -62,27 +62,10 @@ # fixtures - BEGIN @pytest.fixture(scope="module") -def tmp_files(): - files = [] - for itr in range(DEFAULT_REPLICA_CNT + 1): - files.append(f"/tmp/node-{itr + 1}-disk") - yield files - - -@pytest.fixture(scope="module") -def disks(tmp_files): - for disk in tmp_files: - if os.path.exists(disk): - os.remove(disk) - with open(disk, "w") as file: - file.truncate(DEFAULT_POOL_SIZE) - - # /tmp is mapped into /host/tmp within the io-engine containers - yield list(map(lambda file: f"/host{file}", tmp_files)) - - for disk in tmp_files: - if os.path.exists(disk): - os.remove(disk) +def disks(): + pools = Deployer.create_disks(DEFAULT_REPLICA_CNT + 1, size=DEFAULT_POOL_SIZE) + yield pools + Deployer.cleanup_disks(len(pools)) @pytest.fixture(autouse=True, scope="module") diff --git a/utils/deployer-cluster/src/lib.rs b/utils/deployer-cluster/src/lib.rs index 1a275a7f6..74dcb0c38 100644 --- a/utils/deployer-cluster/src/lib.rs +++ b/utils/deployer-cluster/src/lib.rs @@ -1,3 +1,4 @@ +pub mod lvm; pub mod rest_client; use composer::{Builder, ComposeTest}; @@ -202,7 +203,7 @@ impl Cluster { Some(opts) => opts, None => TimeoutOptions::new() .with_req_timeout(Duration::from_millis(500)) - .with_max_retries(10), + .with_max_retries(20), }; for x in 1 .. timeout_opts.max_retries().unwrap_or_default() { match client @@ -619,6 +620,10 @@ impl TmpDiskFile { pub fn uri(&self) -> &str { self.inner.uri() } + /// Disk path on the host. + pub fn path(&self) -> &str { + &self.inner.path + } /// Get the inner disk if there are no other references to it. 
pub fn into_inner(self) -> Result> { @@ -633,20 +638,19 @@ impl TmpDiskFileInner { disk } fn make_new(name: &str) -> Self { - let path = Self::make_path(name); + let (path, container) = Self::make_path(name); + let pool_id = PoolId::new(); Self { - // the io-engine is setup with a bind mount from /tmp to /host/tmp - uri: format!( - "aio:///host{}?blk_size=512&uuid={}", - path, - transport::PoolId::new() - ), + // the io-engine is setup with a bind mount from /workspace/tmp to /host/tmp + uri: format!("aio://{container}?blk_size=512&uuid={pool_id}"), path, cleanup: true, } } - fn make_path(name: &str) -> String { - format!("/tmp/io-engine-disk-{name}") + fn make_path(name: &str) -> (String, String) { + let file = format!("io-engine-disk-{name}"); + let host_tmp = deployer_lib::host_tmp().expect("workspace error"); + (format!("{host_tmp}/{file}"), format!("/host/tmp/{file}")) } fn uri(&self) -> &str { &self.uri diff --git a/utils/deployer-cluster/src/lvm.rs b/utils/deployer-cluster/src/lvm.rs new file mode 100644 index 000000000..a944da7c4 --- /dev/null +++ b/utils/deployer-cluster/src/lvm.rs @@ -0,0 +1,100 @@ +//! LVM helper methods which are useful for setting up test block devices. + +use crate::TmpDiskFile; + +/// An LVM Logical Volume. +pub struct Lvol { + name: String, + path: String, +} +impl Lvol { + /// Get the host path for the lvol. + pub fn path(&self) -> &str { + &self.path + } + /// Suspends the device for IO. + pub fn suspend(&self) -> anyhow::Result<()> { + let _ = VolGroup::command(&["dmsetup", "suspend", self.path.as_str()])?; + Ok(()) + } + /// Resumes the device for IO. + pub fn resume(&self) -> anyhow::Result<()> { + let _ = VolGroup::command(&["dmsetup", "resume", self.path.as_str()])?; + Ok(()) + } +} +impl Drop for Lvol { + fn drop(&mut self) { + println!("Dropping Lvol {}", self.name); + self.resume().ok(); + } +} + +/// An LVM Volume Group. +pub struct VolGroup { + backing_file: TmpDiskFile, + dev_loop: String, + name: String, +} + +impl VolGroup { + /// Creates a new LVM Volume Group. + pub fn new(name: &str, size: u64) -> Result { + let backing_file = TmpDiskFile::new(name, size); + + let dev_loop = Self::command(&["losetup", "--show", "-f", backing_file.path()])?; + let dev_loop = dev_loop.trim_end().to_string(); + let _ = Self::command(&["pvcreate", dev_loop.as_str()])?; + let _ = Self::command(&["vgcreate", name, dev_loop.as_str()])?; + + Ok(Self { + backing_file, + dev_loop, + name: name.to_string(), + }) + } + /// Creates a new Lvol for the LVM Volume Group. + pub fn create_lvol(&self, name: &str, size: u64) -> Result { + let size = format!("{size}B"); + + let vg_name = self.name.as_str(); + let _ = Self::command(&["lvcreate", "-L", size.as_str(), "-n", name, vg_name])?; + + Ok(Lvol { + name: name.to_string(), + path: format!("/dev/{vg_name}/{name}"), + }) + } + /// Run a command with sudo, and the given args. + /// The output string is returned. 
+ fn command(args: &[&str]) -> Result { + let cmd = args.first().unwrap(); + let output = std::process::Command::new(env!("SUDO")) + .arg("-E") + .args(args) + .output()?; + if !output.status.success() { + return Err(anyhow::anyhow!( + "{cmd} Exit Code: {}\nstdout: {}, stderr: {}", + output.status, + String::from_utf8(output.stdout).unwrap_or_default(), + String::from_utf8(output.stderr).unwrap_or_default() + )); + } + let output = String::from_utf8(output.stdout)?; + Ok(output) + } +} + +impl Drop for VolGroup { + fn drop(&mut self) { + println!( + "Dropping VolGroup {} <== {}", + self.name, + self.backing_file.path() + ); + + let _ = Self::command(&["vgremove", "-y", self.name.as_str()]); + let _ = Self::command(&["losetup", "-d", self.dev_loop.as_str()]); + } +}
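For reference, a hedged usage sketch of these LVM helpers, roughly as the new slow_create test exercises them (the pool/lvol names, sizes, and the wrapper function are illustrative, not part of the patch):

    // Illustrative only: back a pool with an LVM logical volume and suspend it
    // to simulate a very slow disk during pool creation, as the slow_create
    // test in this patch does.
    fn slow_disk_setup() -> anyhow::Result<()> {
        const POOL_SIZE_BYTES: u64 = 128 * 1024 * 1024;

        // Creates a loop device, pvcreate and vgcreate under the hood.
        let vg = deployer_cluster::lvm::VolGroup::new("slow-pool", POOL_SIZE_BYTES)?;
        let lvol = vg.create_lvol("lvol0", POOL_SIZE_BYTES / 2)?;

        lvol.suspend()?; // a pool create against lvol.path() now stalls
        // ... issue CreatePool with disks: vec![lvol.path().into()] here ...
        lvol.resume()?; // let the in-flight create complete in the background

        Ok(())
        // Drop impls resume the lvol and tear down the VG and loop device.
    }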