Applier: Improve reconciler reschedule context to avoid deadlocking on full channel #932

Merged: 7 commits, Jun 9, 2022
2 changes: 1 addition & 1 deletion kube-derive/src/custom_resource.rs
@@ -599,6 +599,6 @@ mod tests {
struct FooSpec { foo: String }
};
let input = syn::parse2(input).unwrap();
let kube_attrs = KubeAttrs::from_derive_input(&input).unwrap();
let _kube_attrs = KubeAttrs::from_derive_input(&input).unwrap();
Member:
I think this is fixed in master.

@nightkr (Member, Author), Jun 8, 2022:
Doesn't look like it; the only PR that wasn't included in this branch was #931, which didn't touch this.

@clux (Member), Jun 8, 2022:
Oh, my bad. Turns out I had a different fix for it (by testing more) in https://github.com/kube-rs/kube-rs/pull/924/files, but it hasn't been reviewed and thus hasn't made it into master.

Member (Author):
Ahh.

}
}
254 changes: 197 additions & 57 deletions kube-runtime/src/controller/mod.rs
@@ -16,14 +16,16 @@ use derivative::Derivative;
use futures::{
channel,
future::{self, BoxFuture},
stream, Future, FutureExt, SinkExt, Stream, StreamExt, TryFuture, TryFutureExt, TryStream, TryStreamExt,
ready, stream, Future, FutureExt, Stream, StreamExt, TryFuture, TryFutureExt, TryStream, TryStreamExt,
};
use kube_client::api::{Api, DynamicObject, ListParams, Resource};
use pin_project::pin_project;
use serde::de::DeserializeOwned;
use std::{
fmt::{Debug, Display},
hash::Hash,
sync::Arc,
task::Poll,
time::Duration,
};
use stream::BoxStream;
@@ -202,6 +204,8 @@ impl Display for ReconcileReason {
}
}

const APPLIER_REQUEUE_BUF_SIZE: usize = 100;

/// Apply a reconciler to an input stream, with a given retry policy
///
/// Takes a `store` parameter for the core objects, which should usually be updated by a [`reflector`].
@@ -215,7 +219,7 @@ impl Display for ReconcileReason {
/// (such as triggering from arbitrary [`Stream`]s), at the cost of being a bit more verbose.
pub fn applier<K, QueueStream, ReconcilerFut, Ctx>(
mut reconciler: impl FnMut(Arc<K>, Arc<Ctx>) -> ReconcilerFut,
mut error_policy: impl FnMut(&ReconcilerFut::Error, Arc<Ctx>) -> Action,
error_policy: impl Fn(&ReconcilerFut::Error, Arc<Ctx>) -> Action,
context: Arc<Ctx>,
store: Store<K>,
queue: QueueStream,
@@ -230,22 +234,25 @@
QueueStream::Error: std::error::Error + 'static,
{
let (scheduler_shutdown_tx, scheduler_shutdown_rx) = channel::oneshot::channel();
let err_context = context.clone();
let (scheduler_tx, scheduler_rx) = channel::mpsc::unbounded::<ScheduleRequest<ReconcileRequest<K>>>();
let (scheduler_tx, scheduler_rx) =
channel::mpsc::channel::<ScheduleRequest<ReconcileRequest<K>>>(APPLIER_REQUEUE_BUF_SIZE);
@clux (Member), Jun 8, 2022:
Some points I wanted to raise regarding the unbounded channels here, because I'm not sure about the full reasoning. It feels partly defensible to have unbounded queues:

  • there's generally a limit to how many items people put inside clusters, because of how impractical it is to have large-scale one-to-many relations with common items that have realistic cluster bounds (pods generally do not grow forever; people often make new clusters at some point after 10k or higher)
  • large numbers of objects make global reconcilers an O(n^2) problem; constant requeuing and retriggering in such cases are also likely to waste resources (node-affecting controllers in controller-manager are huge IO hogs in particular)
  • if we are in such a large-scale case where we have 10k+ items in our reflector cache, and we want to reconcile all of them, we first need enough memory to house 10k+ of these specs, and an unbounded queue would at most double resources

But on the other hand: if I understand this queue correctly, it also serves as a limiter on the amount of parallelism in a controller? In that case, limiting it actually makes a lot of sense, because 10k objects being reconciled at once might DoS a third-party service. Is that a correct understanding of this?

Member (Author):

Not quite; there are actually four different "queues" going on in applier:

  1. The executor itself; this has no hard limit, but it is de-duped (so there will only be one task running at any one moment per K8s object)
  2. The "pending reconciliations" queue (for objects that are already reconciling but should be retried as soon as their current job is done); this is also unbounded but deduped
  3. The scheduler (for objects that are scheduled to be reconciled again at some point in the future because they requested it via Action); this is also unbounded but deduped
  4. The "pending scheduling requests" queue, for objects that haven't been organized into one of the previous three queues yet

This PR is only concerned with queue 4, where we have no practical way to implement deduping.
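
To make the failure mode concrete, here is a minimal self-contained sketch (a toy model, not applier's actual code) of what can go wrong once a queue like this is bounded: if the only task draining the channel also awaits sends back into it, the first send that has to wait for capacity stalls the very loop that would have freed that capacity.

```rust
use futures::{channel::mpsc, SinkExt, StreamExt};

#[tokio::main]
async fn main() {
    // Bounded channel standing in for the requeue buffer.
    let (mut tx, mut rx) = mpsc::channel::<u32>(2);
    tx.send(0).await.unwrap();
    tx.send(1).await.unwrap();

    // The consumer is also the only producer of follow-up messages.
    while let Some(n) = rx.next().await {
        // Each received item fans out into two requeues, so the channel fills
        // almost immediately. As soon as one of these sends has to wait for
        // capacity, nothing is left to drain `rx`, so the wait never ends and
        // the program hangs here.
        tx.send(n + 1).await.unwrap();
        tx.send(n + 2).await.unwrap();
    }
}
```

As far as I can tell, that is the cycle this PR avoids by performing the reschedule send inside the per-object future (RescheduleReconciliation) that the runner polls concurrently, instead of in a later stage of the same stream.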

@clux (Member), Jun 9, 2022:

So if I understand it right:

  • the pending reconciliations queue is pending in Scheduler (i.e. 2)
  • the internal DelayQueue on Scheduler is 3
  • 4 is effectively the queuestream merged with unclassified requeues (scheduler_tx)

And the executor is going to work at its regular pace... so this means it is actually possible to run thousands of reconciles at the same time on re-lists currently? Is that something that is viable to bound at some point, somehow? In the Runner?

Member (Author):

> So if I understand it right:

Yes.

> And the executor is going to work at its regular pace... so this means it is actually possible to run thousands of reconciles at the same time on re-lists currently? 😬

Yes.

> Is that something that is viable to bound at some point, somehow? In the Runner?

Well, we could add additional constraints for when we actually start running a pending reconciliation. That's not implemented at the moment, on the assumption that you could "just" use a semaphore in your reconciler function.
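
A minimal sketch of that semaphore approach (the Ctx layout, the error type, and the limit of 16 are made up for illustration; only Action, Semaphore, and ConfigMap are real APIs):

```rust
use std::{sync::Arc, time::Duration};

use k8s_openapi::api::core::v1::ConfigMap;
use kube::runtime::controller::Action;
use tokio::sync::Semaphore;

// Illustrative context; the only relevant part is the semaphore.
struct Ctx {
    // Semaphore::new(16) => at most 16 reconciles inside the guarded section at once.
    concurrency_limit: Semaphore,
}

#[derive(Debug, thiserror::Error)]
#[error("reconcile failed")]
struct ReconcileError;

async fn reconcile(obj: Arc<ConfigMap>, ctx: Arc<Ctx>) -> Result<Action, ReconcileError> {
    // The permit is held until it drops at the end of the function, bounding
    // parallelism without the applier/runner knowing anything about it.
    let _permit = ctx
        .concurrency_limit
        .acquire()
        .await
        .map_err(|_| ReconcileError)?;
    let _ = obj; // ... talk to the rate-limited third-party service here ...
    Ok(Action::requeue(Duration::from_secs(300)))
}
```

Since the limit lives entirely inside the reconciler, applier itself needs no extra configuration for it.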

Member (Author):
Doing the semaphoring in the executor (which has other benefits, like not having to allocate the task before it actually has a semaphore permit) shouldn't be too difficult either; the main problem there is that applier's configuration is already getting pretty unwieldy as it is.

Member:
Maybe it makes sense to refactor a lot of applier's configuration into some kind of ApplierParams struct that can be heavily defaulted.

Member (Author):
Probably, yeah..
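
For reference, a rough sketch of what such a defaulted parameter struct could look like (nothing here is in this PR; the struct and field names are invented):

```rust
use std::time::Duration;

// Hypothetical bundle of applier's tuning knobs, so that future options extend
// a defaulted struct instead of growing the function's argument list.
pub struct ApplierParams {
    /// Capacity of the bounded requeue buffer between reconciler and scheduler.
    pub requeue_buffer_size: usize,
    /// Optional cap on concurrently running reconciliations (None = unlimited).
    pub max_concurrent_reconciles: Option<usize>,
    /// Artificial delay before newly queued objects are reconciled.
    pub debounce: Duration,
}

impl Default for ApplierParams {
    fn default() -> Self {
        Self {
            requeue_buffer_size: 100,
            max_concurrent_reconciles: None,
            debounce: Duration::from_millis(1),
        }
    }
}

// Callers would then write, e.g.:
//   applier(reconcile, error_policy, ctx, store, queue, ApplierParams::default())
// or override a single knob with struct-update syntax:
//   ApplierParams { max_concurrent_reconciles: Some(16), ..Default::default() }
```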

let error_policy = Arc::new(error_policy);
// Create a stream of ObjectRefs that need to be reconciled
trystream_try_via(
// input: stream combining scheduled tasks and user specified inputs event
Box::pin(stream::select(
// 1. inputs from users queue stream
queue.map_err(Error::QueueError).map_ok(|request| ScheduleRequest {
message: request.into(),
run_at: Instant::now() + Duration::from_millis(1),
})
.on_complete(async move {
// On error: scheduler has already been shut down and there is nothing for us to do
let _ = scheduler_shutdown_tx.send(());
tracing::debug!("applier queue terminated, starting graceful shutdown")
}),
queue
.map_err(Error::QueueError)
.map_ok(|request| ScheduleRequest {
message: request.into(),
run_at: Instant::now() + Duration::from_millis(1),
})
.on_complete(async move {
// On error: scheduler has already been shut down and there is nothing for us to do
let _ = scheduler_shutdown_tx.send(());
tracing::debug!("applier queue terminated, starting graceful shutdown")
}),
// 2. requests sent to scheduler_tx
scheduler_rx
.map(Ok)
@@ -258,56 +265,121 @@ where
let request = request.clone();
match store.get(&request.obj_ref) {
Some(obj) => {
let reconciler_span = info_span!("reconciling object", "object.ref" = %request.obj_ref, object.reason = %request.reason);
reconciler_span.in_scope(|| reconciler(obj, context.clone()))
.into_future()
.instrument(reconciler_span.clone())
// Reconciler errors are OK from the applier's PoV, we need to apply the error policy
// to them separately
.map(|res| Ok((request.obj_ref, res, reconciler_span)))
.left_future()
},
None => future::err(
Error::ObjectNotFound(request.obj_ref.erase())
)
.right_future(),
let scheduler_tx = scheduler_tx.clone();
let error_policy_ctx = context.clone();
let error_policy = error_policy.clone();
let reconciler_span = info_span!(
"reconciling object",
"object.ref" = %request.obj_ref,
object.reason = %request.reason
);
reconciler_span
.in_scope(|| reconciler(obj, context.clone()))
.into_future()
.then(move |res| {
let error_policy = error_policy;
RescheduleReconciliation::new(
res,
|err| error_policy(err, error_policy_ctx),
request.obj_ref.clone(),
scheduler_tx,
)
// Reconciler errors are OK from the applier's PoV, we need to apply the error policy
// to them separately
.map(|res| Ok((request.obj_ref, res)))
})
.instrument(reconciler_span)
.left_future()
}
None => future::err(Error::ObjectNotFound(request.obj_ref.erase())).right_future(),
}
})
.on_complete(async { tracing::debug!("applier runner terminated") })
},
)
.on_complete(async { tracing::debug!("applier runner-merge terminated") })
// finally, for each completed reconcile call:
.and_then(move |(obj_ref, reconciler_result, reconciler_span)| {
let (Action { requeue_after }, requeue_reason) = match &reconciler_result {
Ok(action) =>
// do what user told us
(action.clone(), ReconcileReason::ReconcilerRequestedRetry),
Err(err) =>
// reconciler fn call failed
(reconciler_span.in_scope(|| error_policy(err, err_context.clone())), ReconcileReason::ErrorPolicyRequestedRetry),
};
let mut scheduler_tx = scheduler_tx.clone();
async move {
// Transmit the requeue request to the scheduler (picked up again at top)
if let Some(delay) = requeue_after {
// Failure to schedule item = in graceful shutdown mode, ignore
let _ = scheduler_tx
.send(ScheduleRequest {
message: ReconcileRequest {obj_ref: obj_ref.clone(), reason: requeue_reason},
run_at: Instant::now() + delay,
})
.await;
}
match reconciler_result {
Ok(action) => Ok((obj_ref, action)),
Err(err) => Err(Error::ReconcilerFailed(err, obj_ref.erase()))
}
.and_then(move |(obj_ref, reconciler_result)| async move {
match reconciler_result {
Ok(action) => Ok((obj_ref, action)),
Err(err) => Err(Error::ReconcilerFailed(err, obj_ref.erase())),
}
})
.on_complete(async { tracing::debug!("applier terminated") })
}

/// Internal helper [`Future`] that reschedules reconciliation of objects (if required), in the scheduled context of the reconciler
///
/// This could be an `async fn`, but isn't because we want it to be [`Unpin`]
#[pin_project]
#[must_use]
struct RescheduleReconciliation<K: Resource, ReconcilerErr> {
reschedule_tx: channel::mpsc::Sender<ScheduleRequest<ReconcileRequest<K>>>,

reschedule_request: Option<ScheduleRequest<ReconcileRequest<K>>>,
result: Option<Result<Action, ReconcilerErr>>,
}

impl<K, ReconcilerErr> RescheduleReconciliation<K, ReconcilerErr>
where
K: Resource,
{
fn new(
result: Result<Action, ReconcilerErr>,
error_policy: impl FnOnce(&ReconcilerErr) -> Action,
obj_ref: ObjectRef<K>,
reschedule_tx: channel::mpsc::Sender<ScheduleRequest<ReconcileRequest<K>>>,
) -> Self {
let reconciler_finished_at = Instant::now();

let (action, reschedule_reason) = result.as_ref().map_or_else(
|err| (error_policy(err), ReconcileReason::ErrorPolicyRequestedRetry),
|action| (action.clone(), ReconcileReason::ReconcilerRequestedRetry),
);

Self {
reschedule_tx,
reschedule_request: action.requeue_after.map(|requeue_after| ScheduleRequest {
message: ReconcileRequest {
obj_ref,
reason: reschedule_reason,
},
run_at: reconciler_finished_at + requeue_after,
}),
result: Some(result),
}
}
}

impl<K, ReconcilerErr> Future for RescheduleReconciliation<K, ReconcilerErr>
where
K: Resource,
{
type Output = Result<Action, ReconcilerErr>;

fn poll(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();

if this.reschedule_request.is_some() {
let rescheduler_ready = ready!(this.reschedule_tx.poll_ready(cx));
let reschedule_request = this
.reschedule_request
.take()
.expect("PostReconciler::reschedule_request was taken during processing");
// Failure to schedule item = in graceful shutdown mode, ignore
if let Ok(()) = rescheduler_ready {
let _ = this.reschedule_tx.start_send(reschedule_request);
}
}

Poll::Ready(
this.result
.take()
.expect("PostReconciler::result was already taken"),
)
}
}

/// Controller
///
/// A controller is made up of:
@@ -736,7 +808,7 @@ where
pub fn run<ReconcilerFut, Ctx>(
self,
mut reconciler: impl FnMut(Arc<K>, Arc<Ctx>) -> ReconcilerFut,
error_policy: impl FnMut(&ReconcilerFut::Error, Arc<Ctx>) -> Action,
error_policy: impl Fn(&ReconcilerFut::Error, Arc<Ctx>) -> Action,
context: Arc<Ctx>,
) -> impl Stream<Item = Result<(ObjectRef<K>, Action), Error<ReconcilerFut::Error, watcher::Error>>>
where
@@ -763,12 +835,18 @@ where

#[cfg(test)]
mod tests {
use std::sync::Arc;

use super::Action;
use crate::Controller;
use std::{convert::Infallible, sync::Arc, time::Duration};

use super::{Action, APPLIER_REQUEUE_BUF_SIZE};
use crate::{
applier,
reflector::{self, ObjectRef},
watcher, Controller,
};
use futures::{pin_mut, StreamExt, TryStreamExt};
use k8s_openapi::api::core::v1::ConfigMap;
use kube_client::Api;
use kube_client::{core::ObjectMeta, Api};
use tokio::time::timeout;

fn assert_send<T: Send>(x: T) -> T {
x
@@ -791,4 +869,66 @@ mod tests {
),
);
}

#[tokio::test]
async fn applier_must_not_deadlock_if_reschedule_buffer_fills() {
// This tests that `applier` handles reschedule queue backpressure correctly, by trying to flood it with no-op reconciles
// This is intended to avoid regressing on https://github.com/kube-rs/kube-rs/issues/926

// Assume that we can keep APPLIER_REQUEUE_BUF_SIZE flooded if we have 100x the number of objects "in rotation"
// On my (@teozkr)'s 3900X I can reliably trigger this with 10x, but let's have some safety margin to avoid false negatives
let items = APPLIER_REQUEUE_BUF_SIZE * 50;
// Assume that everything's OK if we can reconcile every object 3 times on average
let reconciles = items * 3;

let (queue_tx, queue_rx) = futures::channel::mpsc::unbounded::<ObjectRef<ConfigMap>>();
let (store_rx, mut store_tx) = reflector::store();
let applier = applier(
|obj, _| {
Box::pin(async move {
// Try to flood the rescheduling buffer by just putting it back in the queue immediately
println!("reconciling {:?}", obj.metadata.name);
Ok(Action::requeue(Duration::ZERO))
})
},
|_: &Infallible, _| todo!(),
Arc::new(()),
store_rx,
queue_rx.map(Result::<_, Infallible>::Ok),
);
pin_mut!(applier);
for i in 0..items {
let obj = ConfigMap {
metadata: ObjectMeta {
name: Some(format!("cm-{i}")),
namespace: Some("default".to_string()),
..Default::default()
},
..Default::default()
};
store_tx.apply_watcher_event(&watcher::Event::Applied(obj.clone()));
queue_tx.unbounded_send(ObjectRef::from_obj(&obj)).unwrap();
}

timeout(
Duration::from_secs(10),
applier
.as_mut()
.take(reconciles)
.try_for_each(|_| async { Ok(()) }),
)
.await
.expect("test timeout expired, applier likely deadlocked")
.unwrap();

// Do an orderly shutdown to ensure that no individual reconcilers are stuck
drop(queue_tx);
timeout(
Duration::from_secs(10),
applier.try_for_each(|_| async { Ok(()) }),
)
.await
.expect("applier cleanup timeout expired, individual reconciler likely deadlocked?")
.unwrap();
}
}
33 changes: 33 additions & 0 deletions kube-runtime/src/utils/mod.rs
@@ -229,3 +229,36 @@ pub(crate) trait KubeRuntimeStreamExt: Stream + Sized {
}

impl<S: Stream> KubeRuntimeStreamExt for S {}

#[cfg(test)]
mod tests {
use std::convert::Infallible;

use futures::stream::{self, StreamExt};

use super::trystream_try_via;

// Type-level test does not need to be executed
#[allow(dead_code)]
fn trystream_try_via_should_be_able_to_borrow() {
struct WeirdComplexObject {}
impl Drop for WeirdComplexObject {
fn drop(&mut self) {}
}

let mut x = WeirdComplexObject {};
let y = WeirdComplexObject {};
drop(trystream_try_via(
Box::pin(stream::once(async {
let _ = &mut x;
Result::<_, Infallible>::Ok(())
})),
|s| {
s.map(|_| {
let _ = &y;
Ok(())
})
},
));
}
}