diff --git a/kube/Cargo.toml b/kube/Cargo.toml index 70078eeef..cbfe4aaaf 100644 --- a/kube/Cargo.toml +++ b/kube/Cargo.toml @@ -16,8 +16,8 @@ edition = "2018" [features] default = ["native-tls"] -native-tls = ["openssl", "reqwest/native-tls"] -rustls-tls = ["rustls", "reqwest/rustls-tls"] +native-tls = ["openssl", "reqwest/native-tls", "real-native-tls", "tokio-native-tls", "async-tungstenite/tokio-native-tls"] +rustls-tls = ["rustls", "reqwest/rustls-tls", "tokio-rustls", "async-tungstenite/tokio-rustls"] derive = ["kube-derive"] [package.metadata.docs.rs] @@ -41,6 +41,9 @@ futures = "0.3.5" pem = "0.8.1" openssl = { version = "0.10.30", optional = true } rustls = { version = "0.18.1", optional = true } +real-native-tls = { version = "0.2", optional = true, package = "native-tls" } +tokio-native-tls = { version = "0.1", optional = true } +tokio-rustls = { version = "0.14.1", optional = true } bytes = "0.5.6" Inflector = "0.11.4" tokio = { version = "0.2.22", features = ["time", "signal", "sync"] } @@ -48,6 +51,11 @@ static_assertions = "1.1.0" kube-derive = { path = "../kube-derive", version = "^0.44.0", optional = true } jsonpath_lib = "0.2.5" +[dependencies.async-tungstenite] +version = "0.9.3" +default-features = false +features = ["tokio-runtime"] + [dependencies.reqwest] version = "0.10.8" default-features = false diff --git a/kube/src/api/subresource.rs b/kube/src/api/subresource.rs index 541553ec6..db12eb14c 100644 --- a/kube/src/api/subresource.rs +++ b/kube/src/api/subresource.rs @@ -1,6 +1,8 @@ +use async_tungstenite::tungstenite::{self as ws, Message}; use bytes::Bytes; -use futures::Stream; +use futures::{Stream, StreamExt}; use serde::de::DeserializeOwned; +use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt}; use crate::{ api::{Api, PatchParams, PostParams, Resource}, @@ -208,3 +210,134 @@ where Ok(self.client.request_text_stream(req).await?) } } + +// ---------------------------------------------------------------------------- +// Attach subresource +// ---------------------------------------------------------------------------- +/// Params for attaching +#[derive(Default)] +pub struct AttachParams +where + R: AsyncRead + Unpin, + W: AsyncWrite + Unpin, +{ + /// The container for which to attach. Defaults to only container if there is one container in the pod. + pub container: Option, + /// If provided, the standard out stream of the pod will be redirected to it. + pub stdout: Option, + /// If provided, the standard error stream of the pod will be redirected to it. + pub stderr: Option, + /// If provided, the contents will be redirected to the standard input stream of the pod. + pub stdin: Option, + /// Allocate a terminal for this attach call; defaults to `false`. + pub tty: bool, +} + +impl Resource { + /// Attach to a pod + pub fn attach(&self, name: &str, ap: &AttachParams) -> Result> + where + R: AsyncRead + Unpin, + W: AsyncWrite + Unpin, + { + let base_url = self.make_url() + "/" + name + "/" + "attach?"; + let mut qp = url::form_urlencoded::Serializer::new(base_url); + + if ap.stdin.is_some() { + qp.append_pair("stdin", "true"); + } + if ap.stdout.is_some() { + qp.append_pair("stdout", "true"); + } + if ap.stderr.is_some() { + qp.append_pair("stderr", "true"); + } + if ap.tty { + qp.append_pair("tty", "true"); + } + if let Some(container) = &ap.container { + qp.append_pair("container", &container); + } + + let req = http::Request::post(qp.finish()); + req.body(()).map_err(Error::HttpError) + } +} + +/// Marker trait for objects that has attach +pub trait AttachingObject {} + +impl AttachingObject for k8s_openapi::api::core::v1::Pod {} + +impl Api +where + K: Clone + DeserializeOwned + AttachingObject, +{ + /// Attach to pod + pub async fn attach(&self, name: &str, mut ap: AttachParams) -> Result<()> + where + R: AsyncRead + Unpin, + W: AsyncWrite + Unpin, + { + let req = self.resource.attach(name, &ap)?; + let stream = self.client.connect(req).await?; + + let (mut _send, mut recv) = stream.split(); + let mut p_msg = recv.next(); + + loop { + // TODO Handle stdin and tty + match p_msg.await { + Some(Ok(msg)) => { + match msg { + Message::Binary(bin) if !bin.is_empty() => { + // Write to appropriate channel + match bin[0] { + // stdin + 0 => {} + // stdout + 1 => { + if let Some(stdout) = ap.stdout.as_mut() { + stdout.write_all(&bin[1..]).await?; + } + } + // stderr + 2 => { + if let Some(stderr) = ap.stderr.as_mut() { + stderr.write_all(&bin[1..]).await?; + } + } + // error? + 3 => {} + // resize? + 4 => {} + _ => {} + } + } + + Message::Binary(_) => {} + Message::Text(_) => {} + Message::Ping(_) => {} + Message::Pong(_) => {} + Message::Close(_) => { + // Connection will terminate when None is received. + } + } + p_msg = recv.next(); + } + + Some(Err(ws::Error::ConnectionClosed)) => { + // not actually an error + break; + } + + Some(Err(err)) => return Err(Error::from(err)), + + None => { + break; + } + } + } + Ok(()) + } +} diff --git a/kube/src/client/mod.rs b/kube/src/client/mod.rs index 1f90dd5ac..7a6d684ba 100644 --- a/kube/src/client/mod.rs +++ b/kube/src/client/mod.rs @@ -13,6 +13,10 @@ use crate::{ Error, Result, }; +use async_tungstenite::{ + tokio::{connect_async_with_tls_connector, ConnectStream}, + WebSocketStream, +}; use bytes::Bytes; use either::{Either, Left, Right}; use futures::{self, Stream, TryStream, TryStreamExt}; @@ -21,6 +25,17 @@ use k8s_openapi::apimachinery::pkg::apis::meta::v1 as k8s_meta_v1; use serde::{de::DeserializeOwned, Deserialize}; use serde_json::{self, Value}; +// TODO Newer version of tokio_native_tls re-exports native_tls. +#[cfg(feature = "native-tls")] +use real_native_tls::{Certificate, Identity, TlsConnector}; +#[cfg(feature = "native-tls")] +use tokio_native_tls::TlsConnector as AsyncTlsConnector; + +#[cfg(feature = "rustls-tls")] +use std::sync::Arc; +#[cfg(feature = "rustls-tls")] +use tokio_rustls::{rustls::ClientConfig, TlsConnector as AsyncTlsConnector}; + use std::convert::{TryFrom, TryInto}; /// Client for connecting with a Kubernetes cluster. @@ -35,6 +50,7 @@ pub struct Client { default_ns: String, inner: reqwest::Client, config: Config, + tls_connector: AsyncTlsConnector, } impl Client { @@ -93,6 +109,26 @@ impl Client { Ok(res) } + /// Make WebSocket connection. + pub async fn connect(&self, request: http::Request<()>) -> Result> { + let (mut parts, _) = request.into_parts(); + if let Some(auth_header) = self.config.get_auth_header().await? { + parts.headers.insert(http::header::AUTHORIZATION, auth_header); + } + // Replace scheme to ws(s). + let pandq = parts.uri.path_and_query().expect("valid path+query from kube"); + parts.uri = finalize_url(&self.cluster_url, &pandq) + .replacen("http", "ws", 1) + .parse() + .expect("valid URL"); + let req = http::Request::from_parts(parts, ()); + + let (stream, _) = connect_async_with_tls_connector(req, Some(self.tls_connector.clone())) + .await + .expect("handshake error"); + Ok(stream) + } + /// Perform a raw HTTP request against the API and deserialize the response /// as JSON to some known type. pub async fn request(&self, request: http::Request>) -> Result @@ -355,11 +391,13 @@ impl TryFrom for Client { let cluster_url = config.cluster_url.clone(); let default_ns = config.default_ns.clone(); let config_clone = config.clone(); + let tls_connector: AsyncTlsConnector = config.clone().try_into()?; let builder: reqwest::ClientBuilder = config.try_into()?; Ok(Self { cluster_url, default_ns, inner: builder.build()?, + tls_connector, config: config_clone, }) } @@ -395,6 +433,42 @@ impl TryFrom for reqwest::ClientBuilder { } } +#[cfg(feature = "native-tls")] +impl TryFrom for AsyncTlsConnector { + type Error = Error; + fn try_from(config: Config) -> Result { + let mut builder = TlsConnector::builder(); + if let Some((identity, identity_password)) = config.identity.as_ref() { + builder.identity( + Identity::from_pkcs12(identity, identity_password) + .map_err(|e| Error::SslError(format!("{}", e)))?, + ); + } + if let Some(ders) = config.root_cert { + for der in ders { + builder.add_root_certificate( + Certificate::from_der(&der.0).map_err(|e| Error::SslError(format!("{}", e)))?, + ); + } + } + if config.accept_invalid_certs { + builder.danger_accept_invalid_certs(config.accept_invalid_certs); + } + let connector = builder.build().map_err(|e| Error::SslError(format!("{}", e)))?; + Ok(AsyncTlsConnector::from(connector)) + } +} + +// TODO rustls +#[cfg(feature = "rustls-tls")] +impl TryFrom for AsyncTlsConnector { + type Error = Error; + fn try_from(config: Config) -> Result { + let client_config = ClientConfig::new(); + Ok(AsyncTlsConnector::from(Arc::new(client_config))) + } +} + // TODO: replace with Status in k8s openapi? /// A Kubernetes status object diff --git a/kube/src/error.rs b/kube/src/error.rs index eb088ea14..c3254f3dd 100644 --- a/kube/src/error.rs +++ b/kube/src/error.rs @@ -70,6 +70,14 @@ pub enum Error { #[cfg(feature = "native-tls")] #[error("OpensslError: {0}")] OpensslError(#[from] openssl::error::ErrorStack), + + /// An error from `tokio::io` + #[error("IO Error: {0}")] + IoError(#[from] tokio::io::Error), + + /// An error from `async_tungstenite::tungstenite` + #[error("WebSocket Error: {0}")] + WebSocket(#[from] async_tungstenite::tungstenite::Error), } #[derive(Error, Debug)]