diff --git a/CHANGELOG.md b/CHANGELOG.md index 27117db3f..8b7036626 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,12 @@ All notable changes to this project will be documented in this file. ## [Unreleased] +### Changed + +- Objects are now streamed rather than polled when waiting for them to be deleted ([#452]). + +[#452]: https://github.com/stackabletech/operator-rs/pull/452 + ## [0.24.0] - 2022-08-04 ### Added diff --git a/Cargo.toml b/Cargo.toml index 68e285d4c..94cb65ecf 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -29,7 +29,6 @@ thiserror = "1.0.31" tokio = { version = "1.20.1", features = ["macros", "rt-multi-thread"] } tracing = "0.1.35" tracing-subscriber = { version = "0.3.15", features = ["env-filter"] } -backoff = "0.4.0" derivative = "2.2.0" tracing-opentelemetry = "0.17.4" opentelemetry = { version = "0.17.0", features = ["rt-tokio"] } diff --git a/src/client.rs b/src/client.rs index cbf0437c3..a85f8a349 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1,21 +1,20 @@ -use crate::error::OperatorResult; +use crate::error::{Error, OperatorResult}; use crate::label_selector; -use backoff::backoff::Backoff; -use backoff::ExponentialBackoff; use either::Either; use futures::StreamExt; use k8s_openapi::apimachinery::pkg::apis::meta::v1::LabelSelector; use kube::api::{DeleteParams, ListParams, Patch, PatchParams, PostParams, Resource, ResourceExt}; use kube::client::Client as KubeClient; use kube::core::Status; +use kube::runtime::wait::delete::delete_and_finalize; use kube::runtime::WatchStreamExt; use kube::{Api, Config}; use serde::de::DeserializeOwned; use serde::Serialize; use std::convert::TryFrom; use std::fmt::{Debug, Display}; -use tracing::{error, info, trace}; +use tracing::trace; /// This `Client` can be used to access Kubernetes. /// It wraps an underlying [kube::client::Client] and provides some common functionality. @@ -370,45 +369,21 @@ impl Client { /// from Kubernetes pub async fn ensure_deleted(&self, resource: T) -> OperatorResult<()> where - T: Clone + Debug + DeserializeOwned + Resource, + T: Clone + Debug + DeserializeOwned + Resource + Send + 'static, ::DynamicType: Default, { - let mut backoff_strategy = ExponentialBackoff { - max_elapsed_time: None, - ..ExponentialBackoff::default() - }; - - self.delete(&resource).await?; - - loop { - if self - .get_opt::(&resource.name_any(), resource.namespace().as_deref()) - .await? - .is_none() - { - return Ok(()); - } - - // When backoff returns `None` the timeout has expired - match backoff_strategy.next_backoff() { - Some(backoff) => { - info!( - "Waiting [{}] seconds before trying again..", - backoff.as_secs() - ); - tokio::time::sleep(backoff).await; - } - None => { - // We offer no way of specifying a timeout, so this shouldn't happen, - // if it does we'll log an error for now and continue iterating and wait for - // the last interval we saw - error!( - "Waiting for deletion timed out, but no timeout was specified, this is an error and should not happen!" - ); - tokio::time::sleep(backoff_strategy.current_interval).await; - } - } - } + Ok(delete_and_finalize( + self.get_api::(resource.namespace().as_deref()), + resource + .meta() + .name + .as_deref() + .ok_or(Error::MissingObjectKey { + key: "metadata.name", + })?, + &self.delete_params, + ) + .await?) } /// Returns an [kube::Api] object which is either namespaced or not depending on whether diff --git a/src/error.rs b/src/error.rs index eb4fac511..9b80b031c 100644 --- a/src/error.rs +++ b/src/error.rs @@ -15,6 +15,12 @@ pub enum Error { source: kube::Error, }, + #[error("Kubernetes failed to delete object: {source}")] + KubeDeleteError { + #[from] + source: kube::runtime::wait::delete::Error, + }, + #[error("Object is missing key: {key}")] MissingObjectKey { key: &'static str },