Skip to content

[Merged by Bors] - Use a watch for waiting for deleted objects rather than polling #452

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,12 @@ All notable changes to this project will be documented in this file.

## [Unreleased]

### Changed

- Objects are now streamed rather than polled when waiting for them to be deleted ([#452]).

[#452]: https://github.com/stackabletech/operator-rs/pull/452

## [0.24.0] - 2022-08-04

### Added
Expand Down
1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ thiserror = "1.0.31"
tokio = { version = "1.20.1", features = ["macros", "rt-multi-thread"] }
tracing = "0.1.35"
tracing-subscriber = { version = "0.3.15", features = ["env-filter"] }
backoff = "0.4.0"
derivative = "2.2.0"
tracing-opentelemetry = "0.17.4"
opentelemetry = { version = "0.17.0", features = ["rt-tokio"] }
Expand Down
57 changes: 16 additions & 41 deletions src/client.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,20 @@
use crate::error::OperatorResult;
use crate::error::{Error, OperatorResult};
use crate::label_selector;

use backoff::backoff::Backoff;
use backoff::ExponentialBackoff;
use either::Either;
use futures::StreamExt;
use k8s_openapi::apimachinery::pkg::apis::meta::v1::LabelSelector;
use kube::api::{DeleteParams, ListParams, Patch, PatchParams, PostParams, Resource, ResourceExt};
use kube::client::Client as KubeClient;
use kube::core::Status;
use kube::runtime::wait::delete::delete_and_finalize;
use kube::runtime::WatchStreamExt;
use kube::{Api, Config};
use serde::de::DeserializeOwned;
use serde::Serialize;
use std::convert::TryFrom;
use std::fmt::{Debug, Display};
use tracing::{error, info, trace};
use tracing::trace;

/// This `Client` can be used to access Kubernetes.
/// It wraps an underlying [kube::client::Client] and provides some common functionality.
Expand Down Expand Up @@ -370,45 +369,21 @@ impl Client {
/// from Kubernetes
pub async fn ensure_deleted<T>(&self, resource: T) -> OperatorResult<()>
where
T: Clone + Debug + DeserializeOwned + Resource,
T: Clone + Debug + DeserializeOwned + Resource + Send + 'static,
<T as Resource>::DynamicType: Default,
{
let mut backoff_strategy = ExponentialBackoff {
max_elapsed_time: None,
..ExponentialBackoff::default()
};

self.delete(&resource).await?;

loop {
if self
.get_opt::<T>(&resource.name_any(), resource.namespace().as_deref())
.await?
.is_none()
{
return Ok(());
}

// When backoff returns `None` the timeout has expired
match backoff_strategy.next_backoff() {
Some(backoff) => {
info!(
"Waiting [{}] seconds before trying again..",
backoff.as_secs()
);
tokio::time::sleep(backoff).await;
}
None => {
// We offer no way of specifying a timeout, so this shouldn't happen,
// if it does we'll log an error for now and continue iterating and wait for
// the last interval we saw
error!(
"Waiting for deletion timed out, but no timeout was specified, this is an error and should not happen!"
);
tokio::time::sleep(backoff_strategy.current_interval).await;
}
}
}
Ok(delete_and_finalize(
self.get_api::<T>(resource.namespace().as_deref()),
resource
.meta()
.name
.as_deref()
.ok_or(Error::MissingObjectKey {
key: "metadata.name",
})?,
&self.delete_params,
)
.await?)
}

/// Returns an [kube::Api] object which is either namespaced or not depending on whether
Expand Down
6 changes: 6 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,12 @@ pub enum Error {
source: kube::Error,
},

#[error("Kubernetes failed to delete object: {source}")]
KubeDeleteError {
#[from]
source: kube::runtime::wait::delete::Error,
},

#[error("Object is missing key: {key}")]
MissingObjectKey { key: &'static str },

Expand Down