Skip to content

Commit

Permalink
Attach PoC
Browse files Browse the repository at this point in the history
  • Loading branch information
kazk committed Dec 27, 2020
1 parent 5f00307 commit 877fa9e
Show file tree
Hide file tree
Showing 5 changed files with 297 additions and 4 deletions.
12 changes: 10 additions & 2 deletions kube/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ edition = "2018"

[features]
default = ["native-tls"]
native-tls = ["openssl", "reqwest/native-tls"]
rustls-tls = ["rustls", "reqwest/rustls-tls"]
native-tls = ["openssl", "reqwest/native-tls", "real-native-tls", "tokio-native-tls", "async-tungstenite/tokio-native-tls"]
rustls-tls = ["rustls", "reqwest/rustls-tls", "tokio-rustls", "async-tungstenite/tokio-rustls"]
derive = ["kube-derive"]

[package.metadata.docs.rs]
Expand All @@ -41,13 +41,21 @@ futures = "0.3.5"
pem = "0.8.1"
openssl = { version = "0.10.30", optional = true }
rustls = { version = "0.18.1", optional = true }
real-native-tls = { version = "0.2", optional = true, package = "native-tls" }
tokio-native-tls = { version = "0.1", optional = true }
tokio-rustls = { version = "0.14.1", optional = true }
bytes = "0.5.6"
Inflector = "0.11.4"
tokio = { version = "0.2.22", features = ["time", "signal", "sync"] }
static_assertions = "1.1.0"
kube-derive = { path = "../kube-derive", version = "^0.44.0", optional = true }
jsonpath_lib = "0.2.5"

[dependencies.async-tungstenite]
version = "0.9.3"
default-features = false
features = ["tokio-runtime"]

[dependencies.reqwest]
version = "0.10.8"
default-features = false
Expand Down
2 changes: 1 addition & 1 deletion kube/src/api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ mod dynamic;
pub use dynamic::DynamicResource;

mod subresource;
pub use subresource::{LogParams, LoggingObject, ScaleSpec, ScaleStatus};
pub use subresource::{AttachParams, AttachingObject, LogParams, LoggingObject, ScaleSpec, ScaleStatus};

pub(crate) mod object;
pub use self::object::{Object, ObjectList, WatchEvent};
Expand Down
135 changes: 134 additions & 1 deletion kube/src/api/subresource.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use async_tungstenite::tungstenite::{self as ws, Message};
use bytes::Bytes;
use futures::Stream;
use futures::{Stream, StreamExt};
use serde::de::DeserializeOwned;
use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt};

use crate::{
api::{Api, PatchParams, PostParams, Resource},
Expand Down Expand Up @@ -208,3 +210,134 @@ where
Ok(self.client.request_text_stream(req).await?)
}
}

// ----------------------------------------------------------------------------
// Attach subresource
// ----------------------------------------------------------------------------
/// Params for attaching
#[derive(Default, Debug)]
pub struct AttachParams<R, W>
where
R: AsyncRead + Unpin,
W: AsyncWrite + Unpin,
{
/// The container for which to attach. Defaults to only container if there is one container in the pod.
pub container: Option<String>,
/// If provided, the standard out stream of the pod will be redirected to it.
pub stdout: Option<W>,
/// If provided, the standard error stream of the pod will be redirected to it.
pub stderr: Option<W>,
/// If provided, the contents will be redirected to the standard input stream of the pod.
pub stdin: Option<R>,
/// Allocate a terminal for this attach call; defaults to `false`.
pub tty: bool,
}

impl Resource {
/// Attach to a pod
pub fn attach<R, W>(&self, name: &str, ap: &AttachParams<R, W>) -> Result<http::Request<()>>
where
R: AsyncRead + Unpin,
W: AsyncWrite + Unpin,
{
let base_url = self.make_url() + "/" + name + "/" + "attach?";
let mut qp = url::form_urlencoded::Serializer::new(base_url);

if ap.stdin.is_some() {
qp.append_pair("stdin", "true");
}
if ap.stdout.is_some() {
qp.append_pair("stdout", "true");
}
if ap.stderr.is_some() {
qp.append_pair("stderr", "true");
}
if ap.tty {
qp.append_pair("tty", "true");
}
if let Some(container) = &ap.container {
qp.append_pair("container", &container);
}

let req = http::Request::get(qp.finish());
req.body(()).map_err(Error::HttpError)
}
}

/// Marker trait for objects that has attach
pub trait AttachingObject {}

impl AttachingObject for k8s_openapi::api::core::v1::Pod {}

impl<K> Api<K>
where
K: Clone + DeserializeOwned + AttachingObject,
{
/// Attach to pod
pub async fn attach<R, W>(&self, name: &str, mut ap: AttachParams<R, W>) -> Result<()>
where
R: AsyncRead + Unpin,
W: AsyncWrite + Unpin,
{
let req = self.resource.attach(name, &ap)?;
let stream = self.client.connect(req).await?;

let (mut _send, mut recv) = stream.split();
let mut p_msg = recv.next();

loop {
// TODO Handle stdin and tty
match p_msg.await {
Some(Ok(msg)) => {
match msg {
Message::Binary(bin) if !bin.is_empty() => {
// Write to appropriate channel
match bin[0] {
// stdin
0 => {}
// stdout
1 => {
if let Some(stdout) = ap.stdout.as_mut() {
stdout.write_all(&bin[1..]).await?;
}
}
// stderr
2 => {
if let Some(stderr) = ap.stderr.as_mut() {
stderr.write_all(&bin[1..]).await?;
}
}
// error?
3 => {}
// resize?
4 => {}
_ => {}
}
}

Message::Binary(_) => {}
Message::Text(_) => {}
Message::Ping(_) => {}
Message::Pong(_) => {}
Message::Close(_) => {
// Connection will terminate when None is received.
}
}
p_msg = recv.next();
}

Some(Err(ws::Error::ConnectionClosed)) => {
// not actually an error
break;
}

Some(Err(err)) => return Err(Error::from(err)),

None => {
break;
}
}
}
Ok(())
}
}
144 changes: 144 additions & 0 deletions kube/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@ use crate::{
Error, Result,
};

use async_tungstenite::{
tokio::{connect_async_with_tls_connector, ConnectStream},
WebSocketStream,
};
use bytes::Bytes;
use either::{Either, Left, Right};
use futures::{self, Stream, TryStream, TryStreamExt};
Expand All @@ -21,6 +25,20 @@ use k8s_openapi::apimachinery::pkg::apis::meta::v1 as k8s_meta_v1;
use serde::{de::DeserializeOwned, Deserialize};
use serde_json::{self, Value};

// TODO Newer version of tokio_native_tls re-exports native_tls.
#[cfg(feature = "native-tls")]
use real_native_tls::{Certificate, Identity, TlsConnector};
#[cfg(feature = "native-tls")]
use tokio_native_tls::TlsConnector as AsyncTlsConnector;

#[cfg(feature = "rustls-tls")]
use std::sync::Arc;
#[cfg(feature = "rustls-tls")]
use tokio_rustls::{
rustls::{self, Certificate, ClientConfig},
webpki, TlsConnector as AsyncTlsConnector,
};

use std::convert::{TryFrom, TryInto};

/// Client for connecting with a Kubernetes cluster.
Expand All @@ -35,6 +53,7 @@ pub struct Client {
default_ns: String,
inner: reqwest::Client,
config: Config,
tls_connector: AsyncTlsConnector,
}

impl Client {
Expand Down Expand Up @@ -93,6 +112,26 @@ impl Client {
Ok(res)
}

/// Make WebSocket connection.
pub async fn connect(&self, request: http::Request<()>) -> Result<WebSocketStream<ConnectStream>> {
let (mut parts, _) = request.into_parts();
if let Some(auth_header) = self.config.get_auth_header().await? {
parts.headers.insert(http::header::AUTHORIZATION, auth_header);
}
// Replace scheme to ws(s).
let pandq = parts.uri.path_and_query().expect("valid path+query from kube");
parts.uri = finalize_url(&self.cluster_url, &pandq)
.replacen("http", "ws", 1)
.parse()
.expect("valid URL");
let req = http::Request::from_parts(parts, ());

let (stream, _) = connect_async_with_tls_connector(req, Some(self.tls_connector.clone()))
.await
.expect("handshake error");
Ok(stream)
}

/// Perform a raw HTTP request against the API and deserialize the response
/// as JSON to some known type.
pub async fn request<T>(&self, request: http::Request<Vec<u8>>) -> Result<T>
Expand Down Expand Up @@ -355,11 +394,13 @@ impl TryFrom<Config> for Client {
let cluster_url = config.cluster_url.clone();
let default_ns = config.default_ns.clone();
let config_clone = config.clone();
let tls_connector: AsyncTlsConnector = config.clone().try_into()?;
let builder: reqwest::ClientBuilder = config.try_into()?;
Ok(Self {
cluster_url,
default_ns,
inner: builder.build()?,
tls_connector,
config: config_clone,
})
}
Expand Down Expand Up @@ -395,6 +436,109 @@ impl TryFrom<Config> for reqwest::ClientBuilder {
}
}

#[cfg(feature = "native-tls")]
impl TryFrom<Config> for AsyncTlsConnector {
type Error = Error;
fn try_from(config: Config) -> Result<Self> {
let mut builder = TlsConnector::builder();
if let Some((identity, identity_password)) = config.identity.as_ref() {
builder.identity(
Identity::from_pkcs12(identity, identity_password)
.map_err(|e| Error::SslError(format!("{}", e)))?,
);
}
if let Some(ders) = config.root_cert {
for der in ders {
builder.add_root_certificate(
Certificate::from_der(&der.0).map_err(|e| Error::SslError(format!("{}", e)))?,
);
}
}
if config.accept_invalid_certs {
builder.danger_accept_invalid_certs(config.accept_invalid_certs);
}
let connector = builder.build().map_err(|e| Error::SslError(format!("{}", e)))?;
Ok(AsyncTlsConnector::from(connector))
}
}

#[cfg(feature = "rustls-tls")]
impl TryFrom<Config> for AsyncTlsConnector {
type Error = Error;
fn try_from(config: Config) -> Result<Self> {
use rustls::internal::pemfile;
use std::io::Cursor;

let mut client_config = ClientConfig::new();
// This is based on how `reqwest` does
if let Some((buf, _)) = config.identity.as_ref() {
let (key, certs) = {
let mut pem = Cursor::new(buf);
let certs = pemfile::certs(&mut pem)
.map_err(|_| Error::SslError("No valid certificate was found".into()))?;
pem.set_position(0);

let mut sk = pemfile::pkcs8_private_keys(&mut pem)
.and_then(|pkcs8_keys| {
if pkcs8_keys.is_empty() {
Err(())
} else {
Ok(pkcs8_keys)
}
})
.or_else(|_| {
pem.set_position(0);
pemfile::rsa_private_keys(&mut pem)
})
.map_err(|_| Error::SslError("No valid private key was found".into()))?;

if let (Some(sk), false) = (sk.pop(), certs.is_empty()) {
(sk, certs)
} else {
return Err(Error::SslError("private key or certificate not found".into()));
}
};

client_config
.set_single_client_cert(certs, key)
.map_err(|e| Error::SslError(format!("{}", e)))?;
}

if let Some(ders) = config.root_cert {
for der in ders {
client_config
.root_store
.add(&Certificate(der.0))
.map_err(|e| Error::SslError(format!("{}", e)))?;
}
}

if config.accept_invalid_certs {
client_config
.dangerous()
.set_certificate_verifier(Arc::new(NoCertificateVerification {}));
}

Ok(AsyncTlsConnector::from(Arc::new(client_config)))
}
}

#[cfg(feature = "rustls-tls")]
struct NoCertificateVerification {}

#[cfg(feature = "rustls-tls")]
impl rustls::ServerCertVerifier for NoCertificateVerification {
fn verify_server_cert(
&self,
_roots: &rustls::RootCertStore,
_presented_certs: &[rustls::Certificate],
_dns_name: webpki::DNSNameRef<'_>,
_ocsp: &[u8],
) -> Result<rustls::ServerCertVerified, rustls::TLSError> {
Ok(rustls::ServerCertVerified::assertion())
}
}

// TODO: replace with Status in k8s openapi?

/// A Kubernetes status object
Expand Down
8 changes: 8 additions & 0 deletions kube/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,14 @@ pub enum Error {
#[cfg(feature = "native-tls")]
#[error("OpensslError: {0}")]
OpensslError(#[from] openssl::error::ErrorStack),

/// An error from `tokio::io`
#[error("IO Error: {0}")]
IoError(#[from] tokio::io::Error),

/// An error from `async_tungstenite::tungstenite`
#[error("WebSocket Error: {0}")]
WebSocket(#[from] async_tungstenite::tungstenite::Error),
}

#[derive(Error, Debug)]
Expand Down

0 comments on commit 877fa9e

Please sign in to comment.