Skip to content

Commit

Permalink
Extend kubectl with apply method
Browse files Browse the repository at this point in the history
Can apply yaml from multidoc files using helpers from #896

Signed-off-by: clux <sszynrae@gmail.com>
  • Loading branch information
clux committed May 4, 2022
1 parent d1d7e8d commit d475f1d
Showing 1 changed file with 112 additions and 80 deletions.
192 changes: 112 additions & 80 deletions examples/kubectl.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
//! This is a simple imitation of the basic functionality of kubectl
//! Supports kubectl {get, delete, watch} <resource> [name] (name optional) with labels and namespace selectors
use anyhow::{bail, Context, Result};
use clap::Parser;
use either::Either;
//! Supports kubectl {get, delete, apply, watch} <resource> [name] (name optional) with labels and namespace selectors
use anyhow::{Context, Result};
use futures::{StreamExt, TryStreamExt};
use k8s_openapi::chrono::{Duration, Utc};
use kube::{
api::{Api, DynamicObject, ListParams, ObjectMeta, Resource, ResourceExt},
api::{Api, DynamicObject, ListParams, ObjectMeta, Patch, PatchParams, Resource, ResourceExt},
core::GroupVersionKind,
discovery::{ApiCapabilities, ApiResource, Discovery, Scope},
runtime::{
utils::try_flatten_applied,
Expand All @@ -15,19 +14,22 @@ use kube::{
},
Client,
};
use tracing::*;

#[derive(clap::Parser)]
struct Opts {
#[clap(long, short, arg_enum, default_value_t)]
output: OutputMode,
#[clap(long, short)]
file: Option<std::path::PathBuf>,
#[clap(long, short = 'l')]
selector: Option<String>,
#[clap(long, short)]
namespace: Option<String>,
#[clap(long, short = 'A')]
all: bool,
verb: String,
resource: String,
resource: Option<String>,
name: Option<String>,
}

Expand Down Expand Up @@ -72,102 +74,123 @@ async fn main() -> Result<()> {
// 1. arg parsing
let Opts {
output,
file,
selector,
namespace,
all,
verb,
resource,
name,
} = Opts::parse();
let mut lp = ListParams::default();
if let Some(label) = selector {
lp = lp.labels(&label);
}
} = clap::Parser::parse();

// 2. discovery (to be able to infer apis from kind/plural only)
// discovery (to be able to infer apis from kind/plural only)
let discovery = Discovery::new(client.clone()).run().await?;
let (ar, caps) = resolve_api_resource(&discovery, &resource)
.with_context(|| format!("resource {:?} not found in cluster", resource))?;

// 3. capability sanity checks and verb -> cap remapping
let cap = if verb == "get" && name.is_none() {
"list".into()
} else if verb == "apply" {
"patch".into()
} else {
verb.clone() // normally the colloquial verb matches the capability verb
};
if !caps.supports_operation(&cap) {
//tracing::warn!("supported verbs: {:?}", caps.operations);
bail!("resource {:?} does not support verb {:?}", resource, cap);
}

// 4. create an Api based on parsed parameters
let api: Api<DynamicObject> = if caps.scope == Scope::Namespaced {
if all {
Api::all_with(client.clone(), &ar)
} else if let Some(ns) = namespace {
Api::namespaced_with(client.clone(), &ns, &ar)
} else {
Api::default_namespaced_with(client.clone(), &ar)
// specialized handling for apply (can handle multiple resources)
if verb == "apply" {
let ssapply = PatchParams::apply("kubectl-light").force();
if let Some(pth) = file {
let yaml =
std::fs::read_to_string(&pth).with_context(|| format!("Failed to read {}", pth.display()))?;
for doc in multidoc_deserialize(&yaml)? {
let yaml_name = doc.get("metadata").map(|m| m.get("name").map(|v| v.as_str()));
let gvk = GroupVersionKind::from_yaml(&doc)?;
if let Some((ar, caps)) = discovery.resolve_gvk(&gvk) {
if let Some(Some(Some(name))) = yaml_name {
let api = dynamic_api(ar, caps, client.clone(), &namespace, false);
trace!("Applying {}: \n{}", gvk.kind, serde_yaml::to_string(&doc)?);
let data: serde_json::Value = serde_json::to_value(&doc)?;
let r = api.patch(name, &ssapply, &Patch::Apply(data)).await?;
info!("applied {:?}", r);
}
} else {
warn!("Cannot apply document for unknown {:?}", gvk);
}
}
}
} else {
Api::all_with(client.clone(), &ar)
};
} else if let Some(resource) = resource {
// common getters that all use the same apisesource via
let (ar, caps) = resolve_api_resource(&discovery, &resource)
.with_context(|| format!("resource {:?} not found in cluster", resource))?;

// 5. specialized handling for each verb (but resource agnostic)
tracing::info!(?verb, ?resource, name = ?name.clone().unwrap_or_default(), "requested objects");
if verb == "get" {
let mut result: Vec<_> = if let Some(n) = &name {
vec![api.get(n).await?]
} else {
api.list(&lp).await?.items
};
for x in &mut result {
x.metadata.managed_fields = None; // hide managed fields by default
let mut lp = ListParams::default();
if let Some(label) = selector {
lp = lp.labels(&label);
}

match output {
OutputMode::Yaml => println!("{}", serde_yaml::to_string(&result)?),
OutputMode::Pretty => {
// Display style; size colums according to biggest name
let max_name = result.iter().map(|x| x.name().len() + 2).max().unwrap_or(63);
println!("{0:<width$} {1:<20}", "NAME", "AGE", width = max_name);
for inst in result {
let age = format_creation_since(inst.meta());
println!("{0:<width$} {1:<20}", inst.name(), age, width = max_name);
// 4. create an Api based on parsed parameters
let api = dynamic_api(ar, caps, client.clone(), &namespace, all);

tracing::info!(?verb, ?resource, name = ?name.clone().unwrap_or_default(), "requested objects");
if verb == "get" {
let mut result: Vec<_> = if let Some(n) = &name {
vec![api.get(n).await?]
} else {
api.list(&lp).await?.items
};
for x in &mut result {
x.metadata.managed_fields = None; // hide managed fields by default
}

match output {
OutputMode::Yaml => println!("{}", serde_yaml::to_string(&result)?),
OutputMode::Pretty => {
// Display style; size colums according to biggest name
let max_name = result.iter().map(|x| x.name().len() + 2).max().unwrap_or(63);
println!("{0:<width$} {1:<20}", "NAME", "AGE", width = max_name);
for inst in result {
let age = format_creation_since(inst.meta());
println!("{0:<width$} {1:<20}", inst.name(), age, width = max_name);
}
}
}
}
} else if verb == "delete" {
if let Some(n) = &name {
if let Either::Left(pdel) = api.delete(n, &Default::default()).await? {
// await delete before returning
await_condition(api, n, is_deleted(&pdel.uid().unwrap())).await?
} else if verb == "delete" {
if let Some(n) = &name {
if let either::Either::Left(pdel) = api.delete(n, &Default::default()).await? {
// await delete before returning
await_condition(api, n, is_deleted(&pdel.uid().unwrap())).await?
}
} else {
api.delete_collection(&Default::default(), &lp).await?;
}
} else {
api.delete_collection(&Default::default(), &lp).await?;
}
} else if verb == "watch" {
let w = if let Some(n) = &name {
lp = lp.fields(&format!("metadata.name={}", n));
watcher(api, lp) // NB: keeps watching even if object dies
} else {
watcher(api, lp)
};
} else if verb == "watch" {
if let Some(n) = &name {
lp = lp.fields(&format!("metadata.name={}", n));
}
let w = watcher(api, lp);

// present a dumb table for it for now. maybe drop the whole watch. kubectl does not do it anymore.
let mut stream = try_flatten_applied(w).boxed();
println!("{0:<width$} {1:<20}", "NAME", "AGE", width = 63);
while let Some(inst) = stream.try_next().await? {
let age = format_creation_since(inst.meta());
println!("{0:<width$} {1:<20}", inst.name(), age, width = 63);
// present a dumb table for it for now. maybe drop the whole watch. kubectl does not do it anymore.
let mut stream = try_flatten_applied(w).boxed();
println!("{0:<width$} {1:<20}", "NAME", "AGE", width = 63);
while let Some(inst) = stream.try_next().await? {
let age = format_creation_since(inst.meta());
println!("{0:<width$} {1:<20}", inst.name(), age, width = 63);
}
}
}

Ok(())
}

fn dynamic_api(
ar: ApiResource,
caps: ApiCapabilities,
client: Client,
ns: &Option<String>,
all: bool,
) -> Api<DynamicObject> {
if caps.scope == Scope::Namespaced {
if all {
Api::all_with(client, &ar)
} else if let Some(namespace) = ns {
Api::namespaced_with(client, namespace, &ar)
} else {
Api::default_namespaced_with(client, &ar)
}
} else {
Api::all_with(client, &ar)
}
}

fn format_creation_since(meta: &ObjectMeta) -> String {
let ts = meta.creation_timestamp.clone().unwrap().0;
let age = Utc::now().signed_duration_since(ts);
Expand All @@ -185,3 +208,12 @@ fn format_duration(dur: Duration) -> String {
format!("{}m", mins)
}
}

pub fn multidoc_deserialize(data: &str) -> Result<Vec<serde_yaml::Value>> {
use serde::Deserialize;
let mut docs = vec![];
for de in serde_yaml::Deserializer::from_str(data) {
docs.push(serde_yaml::Value::deserialize(de)?);
}
Ok(docs)
}

0 comments on commit d475f1d

Please sign in to comment.