Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add port forward #446

Merged
merged 9 commits into from
Jan 31, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,18 @@ path = "pod_evict.rs"
name = "pod_shell"
path = "pod_shell.rs"

[[example]]
name = "pod_portforward"
path = "pod_portforward.rs"

[[example]]
name = "pod_portforward_hyper_http"
path = "pod_portforward_hyper_http.rs"

[[example]]
name = "pod_portforward_bind"
path = "pod_portforward_bind.rs"

[[example]]
name = "pod_reflector"
path = "pod_reflector.rs"
Expand Down
65 changes: 65 additions & 0 deletions examples/pod_portforward.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
use futures::StreamExt;
use k8s_openapi::api::core::v1::Pod;

use kube::{
api::{Api, DeleteParams, PostParams},
runtime::wait::{await_condition, conditions::is_pod_running},
Client, ResourceExt,
};

use tokio::io::AsyncWriteExt;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
std::env::set_var("RUST_LOG", "info,kube=debug");
env_logger::init();
let client = Client::try_default().await?;
let namespace = std::env::var("NAMESPACE").unwrap_or_else(|_| "default".into());

let p: Pod = serde_json::from_value(serde_json::json!({
"apiVersion": "v1",
"kind": "Pod",
"metadata": { "name": "example" },
"spec": {
"containers": [{
"name": "nginx",
"image": "nginx",
}],
}
}))?;

let pods: Api<Pod> = Api::namespaced(client, &namespace);
// Stop on error including a pod already exists or is still being deleted.
pods.create(&PostParams::default(), &p).await?;

// Wait until the pod is running, otherwise we get 500 error.
let running = await_condition(pods.clone(), "example", is_pod_running());
let _ = tokio::time::timeout(std::time::Duration::from_secs(15), running).await?;

let mut pf = pods.portforward("example", &[80]).await?;
let ports = pf.ports();
let mut port = ports[0].stream().unwrap();
port.write_all(b"GET / HTTP/1.1\r\nHost: 127.0.0.1\r\nConnection: close\r\nAccept: */*\r\n\r\n")
.await?;
let mut rstream = tokio_util::io::ReaderStream::new(port);
if let Some(res) = rstream.next().await {
match res {
Ok(bytes) => {
let response = std::str::from_utf8(&bytes[..]).unwrap();
println!("{}", response);
assert!(response.contains("Welcome to nginx!"));
}
Err(err) => eprintln!("{:?}", err),
}
}

// Delete it
println!("deleting");
pods.delete("example", &DeleteParams::default())
.await?
.map_left(|pdel| {
assert_eq!(pdel.name(), "example");
});

Ok(())
}
113 changes: 113 additions & 0 deletions examples/pod_portforward_bind.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
// Example to listen on port 8080 locally, forwarding to port 80 in the example pod.
// Similar to `kubectl port-forward pod/example 8080:80`.
use std::{convert::Infallible, net::SocketAddr, sync::Arc};

use futures::FutureExt;
use hyper::{
service::{make_service_fn, service_fn},
Body, Request, Response, Server,
};
use tokio::sync::Mutex;
use tower::ServiceExt;

use k8s_openapi::api::core::v1::Pod;
use kube::{
api::{Api, DeleteParams, PostParams},
runtime::wait::{await_condition, conditions::is_pod_running},
Client, ResourceExt,
};

#[tokio::main]
async fn main() -> anyhow::Result<()> {
std::env::set_var("RUST_LOG", "info,kube=debug");
env_logger::init();
let client = Client::try_default().await?;
let namespace = std::env::var("NAMESPACE").unwrap_or_else(|_| "default".into());

let p: Pod = serde_json::from_value(serde_json::json!({
"apiVersion": "v1",
"kind": "Pod",
"metadata": { "name": "example" },
"spec": {
"containers": [{
"name": "nginx",
"image": "nginx",
}],
}
}))?;

let pods: Api<Pod> = Api::namespaced(client, &namespace);
// Stop on error including a pod already exists or is still being deleted.
pods.create(&PostParams::default(), &p).await?;

// Wait until the pod is running, otherwise we get 500 error.
let running = await_condition(pods.clone(), "example", is_pod_running());
let _ = tokio::time::timeout(std::time::Duration::from_secs(30), running).await?;

// Get `Portforwarder` that handles the WebSocket connection.
// There's no need to spawn a task to drive this, but it can be awaited to be notified on error.
let mut forwarder = pods.portforward("example", &[80]).await?;
let port = forwarder.ports()[0].stream().unwrap();

// let hyper drive the HTTP state in our DuplexStream via a task
let (sender, connection) = hyper::client::conn::handshake(port).await?;
tokio::spawn(async move {
if let Err(e) = connection.await {
log::error!("error in connection: {}", e);
}
});
// The following task is only used to show any error from the forwarder.
// This example can be stopped with Ctrl-C if anything happens.
tokio::spawn(async move {
if let Err(e) = forwarder.await {
log::error!("forwarder errored: {}", e);
}
});

// Shared `SendRequest<Body>` to relay the request.
let context = Arc::new(Mutex::new(sender));
let make_service = make_service_fn(move |_conn| {
let context = context.clone();
let service = service_fn(move |req| handle(context.clone(), req));
async move { Ok::<_, Infallible>(service) }
});
Comment on lines +68 to +73
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cool quick use of tower!


let (tx, rx) = tokio::sync::oneshot::channel::<()>();
let addr = SocketAddr::from(([127, 0, 0, 1], 8080));
let server = Server::bind(&addr)
.serve(make_service)
.with_graceful_shutdown(async {
rx.await.ok();
});
println!("Forwarding http://{} to port 80 in the pod", addr);
println!("Try opening http://{0} in a browser, or `curl http://{0}`", addr);
println!("Use Ctrl-C to stop the server and delete the pod");
// Stop the server and delete the pod on Ctrl-C.
tokio::spawn(async move {
tokio::signal::ctrl_c().map(|_| ()).await;
log::info!("stopping the server");
let _ = tx.send(());
});
if let Err(e) = server.await {
log::error!("server error: {}", e);
}

log::info!("deleting the pod");
pods.delete("example", &DeleteParams::default())
.await?
.map_left(|pdel| {
assert_eq!(pdel.name(), "example");
});

Ok(())
}

// Simply forwards the request to the port through the shared `SendRequest<Body>`.
async fn handle(
context: Arc<Mutex<hyper::client::conn::SendRequest<hyper::Body>>>,
req: Request<Body>,
) -> Result<Response<Body>, Infallible> {
let mut sender = context.lock().await;
let response = sender.ready().await.unwrap().send_request(req).await.unwrap();
Ok(response)
}
74 changes: 74 additions & 0 deletions examples/pod_portforward_hyper_http.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
use k8s_openapi::api::core::v1::Pod;

use kube::{
api::{Api, DeleteParams, PostParams},
runtime::wait::{await_condition, conditions::is_pod_running},
Client, ResourceExt,
};

use hyper::{body, Body, Request};

#[tokio::main]
async fn main() -> anyhow::Result<()> {
std::env::set_var("RUST_LOG", "info,kube=debug");
env_logger::init();
let client = Client::try_default().await?;
let namespace = std::env::var("NAMESPACE").unwrap_or_else(|_| "default".into());

let p: Pod = serde_json::from_value(serde_json::json!({
"apiVersion": "v1",
"kind": "Pod",
"metadata": { "name": "example" },
"spec": {
"containers": [{
"name": "nginx",
"image": "nginx",
}],
}
}))?;

let pods: Api<Pod> = Api::namespaced(client, &namespace);
// Stop on error including a pod already exists or is still being deleted.
pods.create(&PostParams::default(), &p).await?;

// Wait until the pod is running, otherwise we get 500 error.
let running = await_condition(pods.clone(), "example", is_pod_running());
let _ = tokio::time::timeout(std::time::Duration::from_secs(15), running).await?;

let mut pf = pods.portforward("example", &[80]).await?;
let ports = pf.ports();
let port = ports[0].stream().unwrap();

// let hyper drive the HTTP state in our DuplexStream via a task
let (mut sender, connection) = hyper::client::conn::handshake(port).await?;
tokio::spawn(async move {
if let Err(e) = connection.await {
eprintln!("Error in connection: {}", e);
}
});

let http_req = Request::builder()
.uri("/")
.header("Connection", "close")
.header("Host", "127.0.0.1")
.method("GET")
.body(Body::from(""))
.unwrap();

let (parts, body) = sender.send_request(http_req).await?.into_parts();
assert!(parts.status == 200);

let body_bytes = body::to_bytes(body).await?;
let body_str = std::str::from_utf8(&body_bytes)?;
assert!(body_str.contains("Welcome to nginx!"));

// Delete it
println!("deleting");
pods.delete("example", &DeleteParams::default())
.await?
.map_left(|pdel| {
assert_eq!(pdel.name(), "example");
});

Ok(())
}
4 changes: 3 additions & 1 deletion kube-client/src/api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@
mod core_methods;
#[cfg(feature = "ws")] mod remote_command;
#[cfg(feature = "ws")] pub use remote_command::AttachedProcess;
#[cfg(feature = "ws")] mod portforward;
#[cfg(feature = "ws")] pub use portforward::Portforwarder;

mod subresource;
#[cfg(feature = "ws")]
#[cfg_attr(docsrs, doc(cfg(feature = "ws")))]
pub use subresource::{Attach, AttachParams, Execute};
pub use subresource::{Attach, AttachParams, Execute, Portforward};
pub use subresource::{Evict, EvictParams, Log, LogParams, ScaleSpec, ScaleStatus};

mod util;
Expand Down
Loading