Skip to content

Commit

Permalink
Runtime: Add WatchStreamExt::subscribe (kube-rs#1131)
Browse files Browse the repository at this point in the history
* Added stream_subscribe which allows watch streams to have additional event subscribers

Signed-off-by: Dan Spencer <danrspen@gmail.com>

* StreamSubscribe now wraps items in arcs so that:

1. Remove expensive cloning of Kubernetes objects
2. Allow propogation of err events

Signed-off-by: Dan Spencer <danrspen@gmail.com>

* Renamed watch_ext subscribable to subscribe

Co-authored-by: Eirik A <sszynrae@gmail.com>
Signed-off-by: Dan Spencer <danrspen@gmail.com>

* StreamSubscribe now allows subscribers how to handle lagging

Signed-off-by: Dan Spencer <danrspen@gmail.com>

* Fixed clippy errors in StreamSubscribe

Signed-off-by: Dan Spencer <danrspen@gmail.com>

* Fixed grammar in StreamSubscribe docs

Signed-off-by: Dan Spencer <danrspen@gmail.com>

* Fixed rustfmt errors in StreamSubscribe

Signed-off-by: Dan Spencer <danrspen@gmail.com>

* Improved the documentation for WatchStreamExt::stream_subscribe method.

Also renamed WatchStreamExt::subscribe to WatchStreamExt::stream_subscribe. The compiler was unable to tell if we were trying to call WatchStreamExt::subscribe or StreamSubscribe::subscribe when they were named the same.

e.g. this code would not compile:

let stream_subscribe = stream.subscribe();
let subscription = stream_subscribe.subscribe();

Signed-off-by: Dan Spencer <danrspen@gmail.com>

* Put StreamSubscribe behind a feature flag unstable_runtime_subscribe

Signed-off-by: Dan Spencer <danrspen@gmail.com>

* Update kube-runtime/src/utils/mod.rs

Co-authored-by: Eirik A <sszynrae@gmail.com>
Signed-off-by: Dan Spencer <danrspen@gmail.com>

* Fixed rustfmt error in kube-runtime utils mod.rs

Signed-off-by: Dan Spencer <danrspen@gmail.com>

* Fixed incorrect feature flag usage for the unstable-runtime-subscribe feature

Signed-off-by: Dan Spencer <danrspen@gmail.com>

* Made substream_subscribe pub so that its error can be accessed / matched on by consumers

Signed-off-by: Dan Spencer <danrspen@gmail.com>

* Fixed issue with doctest for stream_subscribe

Signed-off-by: Dan Spencer <danrspen@gmail.com>

---------

Signed-off-by: Dan Spencer <danrspen@gmail.com>
Co-authored-by: Eirik A <sszynrae@gmail.com>
  • Loading branch information
2 people authored and jmintb committed Mar 6, 2023
1 parent c762933 commit 537c34b
Show file tree
Hide file tree
Showing 5 changed files with 318 additions and 2 deletions.
6 changes: 5 additions & 1 deletion kube-runtime/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,12 @@ categories = ["web-programming::http-client", "caching", "network-programming"]
rust-version = "1.63.0"
edition = "2021"

[features]
unstable-runtime = ["unstable-runtime-subscribe"]
unstable-runtime-subscribe = []

[package.metadata.docs.rs]
features = ["k8s-openapi/v1_26"]
features = ["k8s-openapi/v1_26", "unstable-runtime"]
# Define the configuration attribute `docsrs`. Used to enable `doc_cfg` feature.
rustdoc-args = ["--cfg", "docsrs"]

Expand Down
3 changes: 3 additions & 0 deletions kube-runtime/src/utils/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,14 @@
mod backoff_reset_timer;
mod event_flatten;
mod stream_backoff;
#[cfg(feature = "unstable-runtime-subscribe")] pub mod stream_subscribe;
mod watch_ext;

pub use backoff_reset_timer::ResetTimerBackoff;
pub use event_flatten::EventFlatten;
pub use stream_backoff::StreamBackoff;
#[cfg(feature = "unstable-runtime-subscribe")]
pub use stream_subscribe::StreamSubscribe;
pub use watch_ext::WatchStreamExt;

use futures::{
Expand Down
243 changes: 243 additions & 0 deletions kube-runtime/src/utils/stream_subscribe.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,243 @@
use core::{
pin::Pin,
task::{Context, Poll},
};
use futures::{stream, Stream};
use pin_project::pin_project;
use std::{fmt, sync::Arc};
use tokio::sync::{broadcast, broadcast::error::RecvError};

const CHANNEL_CAPACITY: usize = 128;

/// Exposes the [`StreamSubscribe::subscribe()`] method which allows additional
/// consumers of events from a stream without consuming the stream itself.
///
/// If a subscriber begins to lag behind the stream, it will receive an [`Error::Lagged`]
/// error. The subscriber can then decide to abort its task or tolerate the lost events.
///
/// If the [`Stream`] is dropped or ends, any [`StreamSubscribe::subscribe()`] streams
/// will also end.
///
/// ## Warning
///
/// If the primary [`Stream`] is not polled, the [`StreamSubscribe::subscribe()`] streams
/// will never receive any events.
#[pin_project]
#[must_use = "subscribers will not get events unless this stream is polled"]
pub struct StreamSubscribe<S>
where
S: Stream,
{
#[pin]
stream: S,
sender: broadcast::Sender<Option<Arc<S::Item>>>,
}

impl<S: Stream> StreamSubscribe<S> {
pub fn new(stream: S) -> Self {
let (sender, _) = broadcast::channel(CHANNEL_CAPACITY);

Self { stream, sender }
}

/// Subscribe to events from this stream
#[must_use = "streams do nothing unless polled"]
pub fn subscribe(&self) -> impl Stream<Item = Result<Arc<S::Item>, Error>> {
stream::unfold(self.sender.subscribe(), |mut rx| async {
match rx.recv().await {
Ok(Some(obj)) => Some((Ok(obj), rx)),
Err(RecvError::Lagged(amt)) => Some((Err(Error::Lagged(amt)), rx)),
_ => None,
}
})
}
}

impl<S: Stream> Stream for StreamSubscribe<S> {
type Item = Arc<S::Item>;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.project();
let item = this.stream.poll_next(cx);

match item {
Poll::Ready(Some(item)) => {
let item = Arc::new(item);
this.sender.send(Some(item.clone())).ok();
Poll::Ready(Some(item))
}
Poll::Ready(None) => {
this.sender.send(None).ok();
Poll::Ready(None)
}
Poll::Pending => Poll::Pending,
}
}
}

/// An error returned from the inner stream of a [`StreamSubscribe`].
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum Error {
/// The subscriber lagged too far behind. Polling again will return
/// the oldest event still retained.
///
/// Includes the number of skipped events.
Lagged(u64),
}

impl fmt::Display for Error {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Error::Lagged(amt) => write!(f, "subscriber lagged by {amt}"),
}
}
}

impl std::error::Error for Error {}

#[cfg(test)]
mod tests {
use super::*;
use futures::{pin_mut, poll, stream, StreamExt};

#[tokio::test]
async fn stream_subscribe_continues_to_propagate_values() {
let rx = stream::iter([Ok(0), Ok(1), Err(2), Ok(3), Ok(4)]);
let rx = StreamSubscribe::new(rx);

pin_mut!(rx);
assert_eq!(poll!(rx.next()), Poll::Ready(Some(Arc::new(Ok(0)))));
assert_eq!(poll!(rx.next()), Poll::Ready(Some(Arc::new(Ok(1)))));
assert_eq!(poll!(rx.next()), Poll::Ready(Some(Arc::new(Err(2)))));
assert_eq!(poll!(rx.next()), Poll::Ready(Some(Arc::new(Ok(3)))));
assert_eq!(poll!(rx.next()), Poll::Ready(Some(Arc::new(Ok(4)))));
assert_eq!(poll!(rx.next()), Poll::Ready(None));
}

#[tokio::test]
async fn all_subscribers_get_events() {
let events = [Ok(0), Ok(1), Err(2), Ok(3), Ok(4)];
let rx = stream::iter(events);
let rx = StreamSubscribe::new(rx);

let rx_s1 = rx.subscribe();
let rx_s2 = rx.subscribe();

pin_mut!(rx);
pin_mut!(rx_s1);
pin_mut!(rx_s2);

// Subscribers are pending until we start consuming the stream
assert_eq!(poll!(rx_s1.next()), Poll::Pending, "rx_s1");
assert_eq!(poll!(rx_s2.next()), Poll::Pending, "rx_s2");

for item in events {
assert_eq!(poll!(rx.next()), Poll::Ready(Some(Arc::new(item))), "rx");
let expected = Poll::Ready(Some(Ok(Arc::new(item))));
assert_eq!(poll!(rx_s1.next()), expected, "rx_s1");
assert_eq!(poll!(rx_s2.next()), expected, "rx_s2");
}

// Ensure that if the stream is closed, all subscribers are closed
assert_eq!(poll!(rx.next()), Poll::Ready(None), "rx");
assert_eq!(poll!(rx_s1.next()), Poll::Ready(None), "rx_s1");
assert_eq!(poll!(rx_s2.next()), Poll::Ready(None), "rx_s2");
}

#[tokio::test]
async fn subscribers_can_catch_up_to_the_main_stream() {
let events = (0..CHANNEL_CAPACITY).map(Ok::<_, ()>).collect::<Vec<_>>();
let rx = stream::iter(events.clone());
let rx = StreamSubscribe::new(rx);

let rx_s1 = rx.subscribe();

pin_mut!(rx);
pin_mut!(rx_s1);

for item in events.clone() {
assert_eq!(poll!(rx.next()), Poll::Ready(Some(Arc::new(item))), "rx",);
}

for item in events {
assert_eq!(
poll!(rx_s1.next()),
Poll::Ready(Some(Ok(Arc::new(item)))),
"rx_s1"
);
}
}

#[tokio::test]
async fn if_the_subscribers_lag_they_get_a_lagged_error_as_the_next_event() {
// The broadcast channel rounds the capacity up to the next power of two.
let max_capacity = CHANNEL_CAPACITY.next_power_of_two();
let overflow = 5;
let events = (0..max_capacity + overflow).collect::<Vec<_>>();
let rx = stream::iter(events.clone());
let rx = StreamSubscribe::new(rx);

let rx_s1 = rx.subscribe();

pin_mut!(rx);
pin_mut!(rx_s1);

// Consume the entire stream, overflowing the inner channel
for _ in events {
rx.next().await;
}

assert_eq!(
poll!(rx_s1.next()),
Poll::Ready(Some(Err(Error::Lagged(overflow as u64)))),
);

let expected_next_event = overflow;
assert_eq!(
poll!(rx_s1.next()),
Poll::Ready(Some(Ok(Arc::new(expected_next_event)))),
);
}

#[tokio::test]
async fn a_lagging_subscriber_does_not_impact_a_well_behaved_subscriber() {
// The broadcast channel rounds the capacity up to the next power of two.
let max_capacity = CHANNEL_CAPACITY.next_power_of_two();
let overflow = 5;
let events = (0..max_capacity + overflow).collect::<Vec<_>>();
let rx = stream::iter(events.clone());
let rx = StreamSubscribe::new(rx);

let rx_s1 = rx.subscribe();
let rx_s2 = rx.subscribe();

pin_mut!(rx);
pin_mut!(rx_s1);
pin_mut!(rx_s2);

for event in events {
assert_eq!(poll!(rx_s1.next()), Poll::Pending, "rx_s1");

rx.next().await;

assert_eq!(
poll!(rx_s1.next()),
Poll::Ready(Some(Ok(Arc::new(event)))),
"rx_s1"
);
}

assert_eq!(
poll!(rx_s2.next()),
Poll::Ready(Some(Err(Error::Lagged(overflow as u64)))),
"rx_s2"
);

let expected_next_event = overflow;
assert_eq!(
poll!(rx_s2.next()),
Poll::Ready(Some(Ok(Arc::new(expected_next_event)))),
"rx_s2"
);
}
}
67 changes: 66 additions & 1 deletion kube-runtime/src/utils/watch_ext.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
#[cfg(feature = "unstable-runtime-subscribe")]
use crate::utils::stream_subscribe::StreamSubscribe;
use crate::{
utils::{event_flatten::EventFlatten, stream_backoff::StreamBackoff},
watcher,
};
use backoff::backoff::Backoff;

use futures::{Stream, TryStream};

/// Extension trait for streams returned by [`watcher`](watcher()) or [`reflector`](crate::reflector::reflector)
Expand Down Expand Up @@ -36,5 +37,69 @@ pub trait WatchStreamExt: Stream {
{
EventFlatten::new(self, true)
}

/// Create a [`StreamSubscribe`] from a [`watcher()`] stream.
///
/// The [`StreamSubscribe::subscribe()`] method which allows additional consumers
/// of events from a stream without consuming the stream itself.
///
/// If a subscriber begins to lag behind the stream, it will receive an [`Error::Lagged`]
/// error. The subscriber can then decide to abort its task or tolerate the lost events.
///
/// If the [`Stream`] is dropped or ends, any [`StreamSubscribe::subscribe()`] streams
/// will also end.
///
/// ## Warning
///
/// If the primary [`Stream`] is not polled, the [`StreamSubscribe::subscribe()`] streams
/// will never receive any events.
///
/// # Usage
///
/// ```
/// use futures::{Stream, StreamExt};
/// use std::{fmt::Debug, sync::Arc};
/// use kube_runtime::{watcher, WatchStreamExt};
///
/// fn explain_events<K, S>(
/// stream: S,
/// ) -> (
/// impl Stream<Item = Arc<Result<watcher::Event<K>, watcher::Error>>> + Send + Sized + 'static,
/// impl Stream<Item = String> + Send + Sized + 'static,
/// )
/// where
/// K: Debug + Send + Sync + 'static,
/// S: Stream<Item = Result<watcher::Event<K>, watcher::Error>> + Send + Sized + 'static,
/// {
/// // Create a stream that can be subscribed to
/// let stream_subscribe = stream.stream_subscribe();
/// // Create a subscription to that stream
/// let subscription = stream_subscribe.subscribe();
///
/// // Create a stream of descriptions of the events
/// let explain_stream = subscription.filter_map(|event| async move {
/// // We don't care about lagged events so we can throw that error away
/// match event.ok()?.as_ref() {
/// Ok(watcher::Event::Applied(event)) => {
/// Some(format!("An object was added or modified: {event:?}"))
/// }
/// Ok(_) => todo!("explain other events"),
/// // We don't care about watcher errors either
/// Err(_) => None,
/// }
/// });
///
/// // We now still have the original stream, and a secondary stream of explanations
/// (stream_subscribe, explain_stream)
/// }
/// ```
#[cfg(feature = "unstable-runtime-subscribe")]
fn stream_subscribe<K>(self) -> StreamSubscribe<Self>
where
Self: Stream<Item = Result<watcher::Event<K>, watcher::Error>> + Send + Sized + 'static,
{
StreamSubscribe::new(self)
}
}

impl<St: ?Sized> WatchStreamExt for St where St: Stream {}
1 change: 1 addition & 0 deletions kube/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ admission = ["kube-core/admission"]
derive = ["kube-derive", "kube-core/schema"]
config = ["kube-client/config"]
runtime = ["kube-runtime"]
unstable-runtime = ["kube-runtime/unstable-runtime"]

[package.metadata.docs.rs]
features = ["client", "rustls-tls", "openssl-tls", "derive", "ws", "oauth", "jsonpatch", "admission", "runtime", "k8s-openapi/v1_26"]
Expand Down

0 comments on commit 537c34b

Please sign in to comment.