Skip to content

Commit

Permalink
Add /exec subresource
Browse files Browse the repository at this point in the history
  • Loading branch information
kazk committed Dec 30, 2020
1 parent 8354966 commit 5dfff99
Show file tree
Hide file tree
Showing 4 changed files with 246 additions and 1 deletion.
4 changes: 4 additions & 0 deletions examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,10 @@ path = "pod_api.rs"
name = "pod_attach"
path = "pod_attach.rs"

[[example]]
name = "pod_exec"
path = "pod_exec.rs"

[[example]]
name = "proxy"
path = "proxy.rs"
Expand Down
150 changes: 150 additions & 0 deletions examples/pod_exec.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
#[macro_use]
extern crate log;

use futures::{StreamExt, TryStreamExt};
use k8s_openapi::api::core::v1::Pod;

use kube::{
api::{Api, AttachedProcess, DeleteParams, ExecParams, ListParams, Meta, PostParams, WatchEvent},
Client,
};
use tokio::io::AsyncWriteExt;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
std::env::set_var("RUST_LOG", "info,kube=debug");
env_logger::init();
let client = Client::try_default().await?;
let namespace = std::env::var("NAMESPACE").unwrap_or_else(|_| "default".into());

let p: Pod = serde_json::from_value(serde_json::json!({
"apiVersion": "v1",
"kind": "Pod",
"metadata": { "name": "example" },
"spec": {
"containers": [{
"name": "example",
"image": "alpine",
// Do nothing
"command": ["tail", "-f", "/dev/null"],
}],
}
}))?;

let pods: Api<Pod> = Api::namespaced(client, &namespace);
// Stop on error including a pod already exists or is still being deleted.
pods.create(&PostParams::default(), &p).await?;

// Wait until the pod is running, otherwise we get 500 error.
let lp = ListParams::default().fields("metadata.name=example").timeout(10);
let mut stream = pods.watch(&lp, "0").await?.boxed();
while let Some(status) = stream.try_next().await? {
match status {
WatchEvent::Added(o) => {
info!("Added {}", Meta::name(&o));
}
WatchEvent::Modified(o) => {
let s = o.status.as_ref().expect("status exists on pod");
if s.phase.clone().unwrap_or_default() == "Running" {
info!("Ready to attach to {}", Meta::name(&o));
break;
}
}
_ => {}
}
}

// These examples are mostly taken from Python client's integration tests.
{
let attached = pods
.exec(
"example",
&ExecParams {
command: Some(
vec!["sh", "-c", "for i in $(seq 1 3); do date; done"]
.into_iter()
.map(|s| s.to_owned())
.collect::<Vec<_>>(),
),
stderr: false,
..ExecParams::default()
},
)
.await?;
let output = get_output(attached).await;
println!("{}", output);
assert_eq!(output.lines().count(), 3);
}

{
let attached = pods
.exec(
"example",
&ExecParams {
command: Some(vec!["uptime".to_owned()]),
stderr: false,
..ExecParams::default()
},
)
.await?;
let output = get_output(attached).await;
println!("{}", output);
assert_eq!(output.lines().count(), 1);
}

// Stdin example
{
let mut attached = pods
.exec(
"example",
&ExecParams {
command: Some(vec!["sh".to_owned()]),
stdin: true,
stdout: true,
stderr: false,
..ExecParams::default()
},
)
.await?;
let mut stdin_writer = attached.stdin().take().unwrap();
let mut stdout_stream = attached.stdout().take().unwrap();
let next_stdout = stdout_stream.next();
stdin_writer.write(b"echo test string 1\n").await?;
let stdout = String::from_utf8(next_stdout.await.unwrap().ok().unwrap())
.ok()
.unwrap();
println!("{}", stdout);
assert_eq!(stdout, "test string 1\n");

// AttachedProcess resolves with status object.
// Send `exit 1` to get a failure status.
stdin_writer.write(b"exit 1\n").await?;
if let Some(status) = attached.await {
println!("{:?}", status);
assert_eq!(status.status, Some("Failure".to_owned()));
assert_eq!(status.reason, Some("NonZeroExitCode".to_owned()));
}
}

// Delete it
pods.delete("example", &DeleteParams::default())
.await?
.map_left(|pdel| {
assert_eq!(Meta::name(&pdel), "example");
});

Ok(())
}

async fn get_output(mut attached: AttachedProcess) -> String {
let out = attached
.stdout()
.take()
.unwrap()
.filter_map(|r| async { r.ok().and_then(|v| String::from_utf8(v).ok()) })
.collect::<Vec<_>>()
.await
.join("");
attached.await;
out
}
2 changes: 1 addition & 1 deletion kube/src/api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ pub use streaming::AttachedProcess;

mod subresource;
#[cfg(feature = "ws")]
pub use subresource::{AttachParams, AttachingObject};
pub use subresource::{AttachParams, AttachingObject, ExecParams, ExecutingObject};
pub use subresource::{LogParams, LoggingObject, ScaleSpec, ScaleStatus};

pub(crate) mod object;
Expand Down
91 changes: 91 additions & 0 deletions kube/src/api/subresource.rs
Original file line number Diff line number Diff line change
Expand Up @@ -294,3 +294,94 @@ where
Ok(AttachedProcess::new(stream, ap.stdin, ap.stdout, ap.stderr))
}
}

// ----------------------------------------------------------------------------
// Exec subresource
// ----------------------------------------------------------------------------
/// Params for execution
///
/// Note that the server rejects when none of `stdin`, `stdout`, `stderr` are attached.
#[cfg(feature = "ws")]
pub struct ExecParams {
/// The container in which to execute the command.
/// Defaults to only container if there is only one container in the pod.
pub container: Option<String>,
/// Command to execute.
pub command: Option<Vec<String>>,
/// If `true`, the contents will be redirected to the standard input stream of the pod. Defaults to `false`.
pub stdin: bool,
/// If `true`, the standard out stream of the pod will be redirected to it. Defaults to `true`.
pub stdout: bool,
/// If `true`, the standard error stream of the pod will be redirected to it. Defaults to `true`.
pub stderr: bool,
/// If `true`, TTY will be allocated for the attach call. Defaults to `false`.
pub tty: bool,
}

#[cfg(feature = "ws")]
impl Default for ExecParams {
// Default matching the server's defaults.
fn default() -> Self {
Self {
container: None,
command: None,
stdin: false,
stdout: true,
stderr: true,
tty: false,
}
}
}

#[cfg(feature = "ws")]
impl Resource {
/// Execute command in a pod
pub fn exec(&self, name: &str, ep: &ExecParams) -> Result<http::Request<()>> {
let base_url = self.make_url() + "/" + name + "/" + "exec?";
let mut qp = url::form_urlencoded::Serializer::new(base_url);

if ep.stdin {
qp.append_pair("stdin", "true");
}
if ep.stdout {
qp.append_pair("stdout", "true");
}
if ep.stderr {
qp.append_pair("stderr", "true");
}
if ep.tty {
qp.append_pair("tty", "true");
}
if let Some(container) = &ep.container {
qp.append_pair("container", &container);
}
if let Some(command) = &ep.command {
for c in command.iter() {
qp.append_pair("command", &c);
}
}

let req = http::Request::get(qp.finish());
req.body(()).map_err(Error::HttpError)
}
}

/// Marker trait for objects that has exec
#[cfg(feature = "ws")]
pub trait ExecutingObject {}

#[cfg(feature = "ws")]
impl ExecutingObject for k8s_openapi::api::core::v1::Pod {}

#[cfg(feature = "ws")]
impl<K> Api<K>
where
K: Clone + DeserializeOwned + ExecutingObject,
{
/// Execute a command in a pod
pub async fn exec(&self, name: &str, ep: &ExecParams) -> Result<AttachedProcess> {
let req = self.resource.exec(name, ep)?;
let stream = self.client.connect(req).await?;
Ok(AttachedProcess::new(stream, ep.stdin, ep.stdout, ep.stderr))
}
}

0 comments on commit 5dfff99

Please sign in to comment.