use u16 for the concurrency setting; modify runner
Signed-off-by: Sanskar Jaiswal <jaiswalsanskar078@gmail.com>
aryan9600 committed Aug 16, 2023
1 parent d99370c commit 5e7db07
Showing 3 changed files with 101 additions and 17 deletions.
kube-runtime/src/controller/mod.rs (12 changes: 8 additions & 4 deletions)
@@ -427,7 +427,7 @@ where
 #[derive(Clone, Debug, Default)]
 pub struct Config {
     debounce: Duration,
-    concurrency: usize,
+    concurrency: u16,
 }

 impl Config {
@@ -448,12 +448,16 @@ impl Config
         self
     }

-    /// The number of concurrent reconciliations that are allowed to run at any
-    /// given moment. This can be adjusted to the controller's needs to increase
+    /// The number of concurrent reconciliations that are allowed to run at any given moment.
+    ///
+    /// This can be adjusted to the controller's needs to increase
     /// performance and/or make performance predictable. By default, it's 0, meaning
     /// the controller runs with unbounded concurrency.
+    ///
+    /// Note that regardless of the concurrency setting, a controller never schedules
+    /// concurrent reconciles on the same object.
     #[must_use]
-    pub fn concurrency(mut self, concurrency: usize) -> Self {
+    pub fn concurrency(mut self, concurrency: u16) -> Self {
        self.concurrency = concurrency;
        self
     }
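For orientation, a minimal usage sketch of the new setting. The `debounce` builder and `Controller::with_config` are assumptions based on kube's surrounding API at the time, not shown in this diff:

    // Hypothetical usage sketch; `debounce` and `with_config` are assumed
    // builder names from the surrounding kube API, not part of this commit.
    let config = Config::default()
        .debounce(Duration::from_secs(1))
        // At most 16 reconciliations in flight; 0 keeps the default unbounded behaviour.
        .concurrency(16);
    // A controller would then consume it, e.g. `controller.with_config(config)`.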
kube-runtime/src/controller/runner.rs (63 changes: 50 additions & 13 deletions)
@@ -32,7 +32,7 @@ pub struct Runner<T, R, F, MkF, Ready = future::Ready<Result<(), Infallible>>> {
     ready_to_execute_after: future::Fuse<Ready>,
     is_ready_to_execute: bool,
     stopped: bool,
-    max_concurrent_executions: usize,
+    max_concurrent_executions: u16,
 }

 impl<T, R, F, MkF> Runner<T, R, F, MkF>
@@ -43,7 +43,7 @@ where
     /// Creates a new [`Runner`]. [`max_concurrent_executions`] can be used to
     /// limit the number of items that are run concurrently. It can be set to 0 to
     /// allow for unbounded concurrency.
-    pub fn new(scheduler: Scheduler<T, R>, max_concurrent_executions: usize, run_msg: MkF) -> Self {
+    pub fn new(scheduler: Scheduler<T, R>, max_concurrent_executions: u16, run_msg: MkF) -> Self {
         Self {
             scheduler,
             run_msg,
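As a sketch of the constructor in use, modelled on the test added further down in this diff (`scheduler`, `ScheduleRequest`, and the channel setup come from the test helpers in this crate):

    // Sketch based on the test below; assumes the scheduler test helpers in scope.
    let (sched_tx, sched_rx) = mpsc::unbounded::<ScheduleRequest<u8>>();
    let runner = Runner::new(scheduler(sched_rx), 2, |msg: &u8| {
        let msg = *msg;
        // At most 2 of these reconcile futures run concurrently; 0 would mean unbounded.
        // Box::pin'd like the original test's reconcile futures.
        Box::pin(async move { println!("reconciling {msg}") })
    });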
@@ -112,13 +112,22 @@
             // If we are at our limit or not ready to start executing, then there's
             // no point in trying to get something from the scheduler, so just put
             // all expired messages emitted from the queue into pending.
-            if (*this.max_concurrent_executions > 0 && slots.len() >= *this.max_concurrent_executions)
+            if (*this.max_concurrent_executions > 0
+                && slots.len() >= *this.max_concurrent_executions as usize)
                 || !*this.is_ready_to_execute
             {
-                match scheduler.as_mut().project().pop_queue_message_into_pending(cx) {
+                match scheduler.as_mut().hold().poll_next_unpin(cx) {
                     Poll::Pending => break Poll::Pending,
-                    // Since the above method never returns anything other than Poll::Pending
-                    // we don't need to handle any other variant.
+                    Poll::Ready(None) => {
+                        break if has_active_slots {
+                            // We're done listening for new messages, but still have some that
+                            // haven't finished quite yet
+                            Poll::Pending
+                        } else {
+                            Poll::Ready(None)
+                        };
+                    }
+                    // The above future never returns Poll::Ready(Some(_)).
                     _ => unreachable!(),
                 };
             };
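The capacity gate is worth stating in isolation; a hypothetical standalone helper (not part of the commit) equivalent to the widened comparison above:

    // Hypothetical helper mirroring the condition above: a limit of 0 means
    // "unbounded", and the u16 limit is widened to usize before the comparison.
    fn at_capacity(max_concurrent_executions: u16, active_slots: usize) -> bool {
        max_concurrent_executions > 0 && active_slots >= max_concurrent_executions as usize
    }

    // e.g. at_capacity(0, 1_000) == false (unbounded); at_capacity(2, 2) == true.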
@@ -163,18 +172,21 @@ mod tests {
     };
     use futures::{
         channel::{mpsc, oneshot},
-        future, poll, stream, SinkExt, StreamExt, TryStreamExt,
+        future::{self},
+        poll, stream, Future, SinkExt, StreamExt, TryStreamExt,
     };
     use std::{
         cell::RefCell,
         collections::HashMap,
+        pin::Pin,
         sync::{Arc, Mutex},
+        task::{Context, Poll},
         time::Duration,
     };
     use tokio::{
         runtime::Handle,
         task::yield_now,
-        time::{pause, sleep, timeout, Instant},
+        time::{advance, pause, sleep, timeout, Instant},
     };

     #[tokio::test]
@@ -362,6 +374,33 @@ mod tests {
         ));
     }

+    // A Future that becomes Ready once the specified duration has elapsed since its initialization.
+    struct ConditionalFuture {
+        start: Instant,
+        expires_in: Duration,
+    }
+
+    impl ConditionalFuture {
+        fn new(expires_in: Duration) -> Self {
+            let start = Instant::now();
+            ConditionalFuture { start, expires_in }
+        }
+    }
+
+    impl Future for ConditionalFuture {
+        type Output = ();
+
+        fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+            let now = Instant::now();
+            if now.duration_since(self.start) > self.expires_in {
+                Poll::Ready(())
+            } else {
+                cx.waker().wake_by_ref();
+                Poll::Pending
+            }
+        }
+    }
+
     #[tokio::test]
     async fn runner_should_respect_max_concurrent_executions() {
         pause();
@@ -372,9 +411,7 @@
         Runner::new(scheduler(sched_rx), 2, |_| {
             let mut num = count.lock().unwrap();
             *num += 1;
-            Box::pin(async move {
-                sleep(Duration::from_secs(2)).await;
-            })
+            ConditionalFuture::new(Duration::from_secs(2))
         })
         .for_each(|_| async {}),
     );
@@ -406,9 +443,9 @@
         // Assert that we only ran two out of the three requests
         assert_eq!(*count.lock().unwrap(), 2);

-        let _ = sleep(Duration::from_secs(5));
+        advance(Duration::from_secs(3)).await;
         assert!(poll!(runner.as_mut()).is_pending());
         // Assert that we run the third request when we have the capacity to
-        assert_eq!(*count.lock().unwrap(), 2);
+        assert_eq!(*count.lock().unwrap(), 3);
     }
 }
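A hedged sketch of why ConditionalFuture pairs with the paused clock: it self-wakes on every poll, so it stays pending until advance() moves mock time strictly past its deadline. The start_paused attribute is assumed to be available via tokio's test-util feature:

    // Sketch, assuming the same test-module imports as above.
    #[tokio::test(start_paused = true)]
    async fn conditional_future_completes_after_advance() {
        let mut fut = ConditionalFuture::new(Duration::from_secs(2));
        // Deadline not reached yet under the paused clock.
        assert!(poll!(&mut fut).is_pending());
        // Move mock time strictly past the 2s deadline.
        advance(Duration::from_secs(3)).await;
        assert!(poll!(&mut fut).is_ready());
    }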
kube-runtime/src/scheduler.rs (43 changes: 43 additions & 0 deletions)
@@ -127,6 +127,7 @@ impl<'a, T: Hash + Eq + Clone, R> SchedulerProj<'a, T, R> {
         }
     }

+    /// Attempt to retrieve a message from the queue and mark it as pending.
     pub fn pop_queue_message_into_pending(&mut self, cx: &mut Context<'_>) -> Poll<T> {
         loop {
             match self.queue.poll_expired(cx) {
@@ -140,6 +141,40 @@ impl<'a, T: Hash + Eq + Clone, R> SchedulerProj<'a, T, R> {
     }
 }

+/// See [`Scheduler::hold`]
+pub struct Hold<'a, T, R> {
+    scheduler: Pin<&'a mut Scheduler<T, R>>,
+}
+
+impl<'a, T, R> Stream for Hold<'a, T, R>
+where
+    T: Eq + Hash + Clone,
+    R: Stream<Item = ScheduleRequest<T>>,
+{
+    type Item = T;
+
+    #[allow(clippy::match_wildcard_for_single_variants)]
+    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+        let this = self.get_mut();
+        let mut scheduler = this.scheduler.as_mut().project();
+
+        loop {
+            match scheduler.requests.as_mut().poll_next(cx) {
+                Poll::Ready(Some(request)) => scheduler.schedule_message(request),
+                Poll::Ready(None) => return Poll::Ready(None),
+                Poll::Pending => break,
+            }
+        }
+
+        match scheduler.pop_queue_message_into_pending(cx) {
+            Poll::Pending => Poll::Pending,
+            // Since the above method never returns anything other than Poll::Pending,
+            // we don't need to handle any other variant.
+            _ => unreachable!(),
+        }
+    }
+}
+
 /// See [`Scheduler::hold_unless`]
 pub struct HoldUnless<'a, T, R, C> {
     scheduler: Pin<&'a mut Scheduler<T, R>>,
@@ -196,6 +231,14 @@
         }
     }

+    /// A restricted view of the [`Scheduler`], which will keep all items "pending".
+    /// It's equivalent to doing `self.hold_unless(|_| false)` and is useful when the
+    /// consumer is not ready to consume the expired messages that the [`Scheduler`] emits.
+    #[must_use]
+    pub fn hold(self: Pin<&mut Self>) -> Hold<T, R> {
+        Hold { scheduler: self }
+    }
+
     /// Checks whether `msg` is currently a pending message (held by `hold_unless`)
     #[cfg(test)]
     pub fn contains_pending(&self, msg: &T) -> bool {
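And a sketch of the guarantee hold() provides, modelled on this module's existing hold_unless tests (`scheduler()`, `ScheduleRequest`, and `contains_pending` come from the surrounding file): a held scheduler never yields expired items, it only parks them as pending.

    // Sketch, assuming this file's existing test imports.
    #[tokio::test]
    async fn hold_should_park_expired_messages() {
        let (mut tx, rx) = futures::channel::mpsc::unbounded();
        let mut scheduler = Box::pin(scheduler(rx));
        tx.send(ScheduleRequest {
            message: 1_u8,
            run_at: Instant::now(),
        })
        .await
        .unwrap();
        // The held view stays Pending even though the message has already expired...
        assert!(poll!(scheduler.as_mut().hold().next()).is_pending());
        // ...because it was parked in the pending set instead of being yielded.
        assert!(scheduler.contains_pending(&1));
    }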
