convert Reflector to use Informer internally + self-drive
Fixes #151 and fixes #152 using tokio::signal and a futures::select call.
Drops a lot of Reflector logic by deferring to Informer.

Seems to work pretty well, but may need to handle desyncs better.
Ran into the unordered resourceVersion issue on a really old namespace.
Need to double-check whether that can also happen with the old version.
clux committed Apr 6, 2020
1 parent 3dceef4 commit 7f82abb
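The self-driving loop itself lands in the one changed file whose diff did not load below, so what follows is only a rough sketch of the shutdown shape the message describes — racing a polling loop against ctrl-c with tokio::signal and futures::select, using the tokio 0.2-era APIs that also appear in the examples below. The function names and the delay are made up; this is not the code from this commit:

```rust
use futures::{pin_mut, select, FutureExt};
use std::time::Duration;

// Stand-in for a self-driving poll loop; a real runner would issue watch
// calls and apply the resulting events to its internal state.
async fn poll_forever() {
    loop {
        tokio::time::delay_for(Duration::from_secs(1)).await;
    }
}

// Race the runner against ctrl-c so the process can shut down cleanly.
async fn run_until_ctrl_c() {
    let runner = poll_forever().fuse();
    let ctrl_c = tokio::signal::ctrl_c().fuse();
    pin_mut!(runner, ctrl_c);
    select! {
        _ = runner => {},
        _ = ctrl_c => println!("ctrl-c received, shutting down"),
    }
}
```

In the updated examples below, the polling future is the one returned by rf.clone().run(), awaited at the end of main.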
Showing 6 changed files with 155 additions and 194 deletions.
24 changes: 11 additions & 13 deletions kube/examples/configmap_reflector.rs
@@ -16,19 +16,17 @@ async fn main() -> anyhow::Result<()> {
     let cms: Api<ConfigMap> = Api::namespaced(client, &namespace);
     let lp = ListParams::default().timeout(10); // short watch timeout in this example
-    let rf = Reflector::new(cms, lp).init().await?;
+    let rf = Reflector::new(cms).params(lp);
+    let runner = rf.clone().run();

-    // Can read initial state now:
-    rf.state().await?.into_iter().for_each(|cm| {
-        info!("Found configmap {} with data: {:?}", Meta::name(&cm), cm.data);
+    tokio::spawn(async move {
+        loop {
+            // Periodically read our state
+            tokio::time::delay_for(std::time::Duration::from_secs(5)).await;
+            let pods: Vec<_> = rf.state().await.unwrap().iter().map(Meta::name).collect();
+            info!("Current configmaps: {:?}", pods);
+        }
     });

-    loop {
-        // Update internal state by calling watch (waits the full timeout)
-        rf.poll().await?; // ideally call this from a thread/task
-
-        // up to date state:
-        let pods: Vec<_> = rf.state().await?.iter().map(Meta::name).collect();
-        info!("Current configmaps: {:?}", pods);
-    }
+    runner.await?;
     Ok(())
 }
38 changes: 11 additions & 27 deletions kube/examples/deployment_reflector.rs
@@ -16,33 +16,17 @@ async fn main() -> anyhow::Result<()> {
     let deploys: Api<Deployment> = Api::namespaced(client, &namespace);
     let lp = ListParams::default().timeout(10); // short watch timeout in this example
-    let rf = Reflector::new(deploys, lp).init().await?;
+    let rf = Reflector::new(deploys).params(lp);
+    let runner = rf.clone().run();

-    // rf is initialized with full state, which can be extracted on demand.
-    // Output is an owned Vec<Deployment>
-    rf.state().await?.into_iter().for_each(|d| {
-        info!(
-            "Found deployment for {} - {} replicas running {:?}",
-            Meta::name(&d),
-            d.status.unwrap().replicas.unwrap(),
-            d.spec
-                .unwrap()
-                .template
-                .spec
-                .unwrap()
-                .containers
-                .into_iter()
-                .map(|c| c.image.unwrap())
-                .collect::<Vec<_>>()
-        );
+    tokio::spawn(async move {
+        loop {
+            // Periodically read our state
+            tokio::time::delay_for(std::time::Duration::from_secs(5)).await;
+            let deploys: Vec<_> = rf.state().await.unwrap().iter().map(Meta::name).collect();
+            info!("Current deploys: {:?}", deploys);
+        }
     });

-    loop {
-        // Update internal state by calling watch (waits the full timeout)
-        rf.poll().await?;
-
-        // Read the updated internal state (instant):
-        let deploys: Vec<_> = rf.state().await?.iter().map(Meta::name).collect();
-        info!("Current deploys: {:?}", deploys);
-    }
+    runner.await?;
     Ok(())
 }
2 changes: 1 addition & 1 deletion kube/examples/pod_informer.rs
@@ -16,7 +16,7 @@ async fn main() -> anyhow::Result<()> {
     let namespace = env::var("NAMESPACE").unwrap_or("default".into());

     let pods: Api<Pod> = Api::namespaced(client, &namespace);
-    let inf = Informer::new(pods, ListParams::default());
+    let inf = Informer::new(pods).params(ListParams::default().timeout(10));

     loop {
         let mut pods = inf.poll().await?.boxed();
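The remainder of this example's loop is collapsed above. As a rough sketch of how such a poll() stream is typically drained (the helper name is made up, the import paths are assumed from the other examples in this repo, and this is not the collapsed code):

```rust
use futures::{StreamExt, TryStreamExt};
use k8s_openapi::api::core::v1::Pod;
use kube::api::{Meta, WatchEvent};
use kube::runtime::Informer;

// Hypothetical helper: drain one poll() cycle and log each event.
async fn drain_one_poll(inf: &Informer<Pod>) -> anyhow::Result<()> {
    let mut pods = inf.poll().await?.boxed();
    while let Some(event) = pods.try_next().await? {
        match event {
            WatchEvent::Added(p) => println!("added {}", Meta::name(&p)),
            WatchEvent::Modified(p) => println!("modified {}", Meta::name(&p)),
            WatchEvent::Deleted(p) => println!("deleted {}", Meta::name(&p)),
            WatchEvent::Error(e) => println!("watch error: {:?}", e),
            _ => {} // ignore any other event variants
        }
    }
    Ok(())
}
```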
4 changes: 4 additions & 0 deletions kube/src/error.rs
@@ -44,6 +44,10 @@ pub enum Error {
     #[error("Invalid API method {0}")]
     InvalidMethod(String),

+    /// Runtime reached an irrecoverable state
+    #[error("Runtime Error {0}")]
+    RuntimeError(String),
+
     /// A request validation failed
     #[error("Request validation failed with {0}")]
     RequestValidation(String),
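A minimal sketch of how runtime code might surface the new variant; the guard function, threshold, and message are made up for illustration:

```rust
use kube::Error;

// Hypothetical guard: give up after repeated failed resyncs and report it
// through the new RuntimeError variant.
fn check_resyncs(failed_resyncs: u32) -> Result<(), Error> {
    if failed_resyncs > 5 {
        return Err(Error::RuntimeError("informer could not resync".into()));
    }
    Ok(())
}
```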
61 changes: 40 additions & 21 deletions kube/src/runtime/informer.rs
@@ -35,25 +35,31 @@ impl<K> Informer<K>
 where
     K: Clone + DeserializeOwned + Meta,
 {
-    /// Create an informer on an api resource with a set of parameters
-    pub fn new(api: Api<K>, lp: ListParams) -> Self {
+    /// Create an informer on an api resource
+    pub fn new(api: Api<K>) -> Self {
         Informer {
             api,
-            params: lp,
+            params: ListParams::default(),
             version: Arc::new(Mutex::new(0.to_string())),
             needs_resync: Arc::new(Mutex::new(false)),
         }
     }

-    /// Initialize from a prior version
+    /// Modify the default watch parameters for the underlying watch
+    pub fn params(mut self, lp: ListParams) -> Self {
+        self.params = lp;
+        self
+    }
+
+    /// Override the version to an externally tracked version
     ///
     /// Prefer not using this. Even if you track previous resource versions,
     /// you will miss deleted events if you have any downtime.
     ///
     /// Controllers/finalizers/ownerReferences are the preferred ways
     /// to garbage collect related resources.
-    pub fn init_from(self, v: String) -> Self {
-        info!("Recreating Informer for {} at {}", self.api.resource.kind, v);
+    pub fn set_version(self, v: String) -> Self {
+        debug!("Setting Informer version for {} to {}", self.api.resource.kind, v);

         // We need to block on this as our mutex needs go be async compatible
         futures::executor::block_on(async {
@@ -62,6 +68,20 @@ where
         self
     }

+    /// Reset the resourceVersion to 0
+    ///
+    /// This will trigger new Added events for all existing resources
+    pub async fn reset(&self) {
+        *self.version.lock().await = 0.to_string();
+    }
+
+    /// Return the current version
+    pub fn version(&self) -> String {
+        // We need to block on a future here quickly
+        // to get a lock on our version
+        futures::executor::block_on(async { self.version.lock().await.clone() })
+    }
+
     /// Start a single watch stream
     ///
     /// Opens a long polling GET and returns a stream of WatchEvents.
@@ -92,6 +112,8 @@

         // Clone Arcs for stream handling
         let version = self.version.clone();
+        let origin = self.version.lock().await.clone();
+        info!("poll start at {}", origin);
         let needs_resync = self.needs_resync.clone();

         // Start watching from our previous watch point
@@ -104,12 +126,23 @@
             let needs_resync = needs_resync.clone();
             let version = version.clone();
             async move {
+                let current = version.lock().await.clone();
                 // Check if we need to update our version based on the incoming events
                 match &event {
                     Ok(WatchEvent::Added(o)) | Ok(WatchEvent::Modified(o)) | Ok(WatchEvent::Deleted(o)) => {
                         // always store the last seen resourceVersion
                         if let Some(nv) = Meta::resource_ver(o) {
-                            *version.lock().await = nv.clone();
+                            use std::str::FromStr;
+                            let u = if let (Ok(nvu), Ok(cu)) = (u32::from_str(&nv), u32::from_str(&current)) {
+                                // actually parse int because k8s does not keep its contract
+                                // https://github.com/kubernetes-client/python/issues/819
+                                std::cmp::max(nvu, cu).to_string()
+                            } else {
+                                // recommended solution - treat resourceVersion as opaque string
+                                nv.clone()
+                            };
+                            info!("updating informer version to: {} (got {})", u, nv);
+                            *version.lock().await = u;
                         }
                     }
                     Ok(WatchEvent::Error(e)) => {
@@ -132,18 +165,4 @@
         });
         Ok(newstream)
     }
-
-    /// Reset the resourceVersion to 0
-    ///
-    /// This will trigger Added events for all existing resources
-    pub async fn reset(&self) {
-        *self.version.lock().await = 0.to_string();
-    }
-
-    /// Return the current version
-    pub fn version(&self) -> String {
-        // We need to block on a future here quickly
-        // to get a lock on our version
-        futures::executor::block_on(async { self.version.lock().await.clone() })
-    }
 }
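The version bookkeeping added above boils down to one rule: when both the stored and the incoming resourceVersion parse as integers, keep the larger; otherwise fall back to treating the incoming value as an opaque string. A standalone sketch of that rule (the free function and test are illustrative, not part of this commit):

```rust
use std::cmp::max;
use std::str::FromStr;

// Pick the next resourceVersion to store, mirroring the defensive logic above.
fn next_version(current: &str, incoming: &str) -> String {
    if let (Ok(cur), Ok(inc)) = (u32::from_str(current), u32::from_str(incoming)) {
        // both parse as integers: guard against out-of-order events
        max(cur, inc).to_string()
    } else {
        // recommended behaviour: resourceVersion is an opaque string
        incoming.to_string()
    }
}

#[test]
fn keeps_the_larger_numeric_version() {
    assert_eq!(next_version("3", "5"), "5");
    assert_eq!(next_version("10", "7"), "10"); // a stale event does not move us backwards
    assert_eq!(next_version("10", "not-a-number"), "not-a-number"); // opaque fallback
}
```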
(The diff for the remaining changed file did not load.)
