Skip to content

Commit

Permalink
refactor(kube-proxy): expose error to make it extensible
Browse files Browse the repository at this point in the history
The plugin can then add a more human error message.

Signed-off-by: Tiago Castro <tiagolobocastro@gmail.com>
  • Loading branch information
tiagolobocastro committed Nov 7, 2024
1 parent 1670d8c commit 401e460
Show file tree
Hide file tree
Showing 10 changed files with 163 additions and 76 deletions.
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions k8s/forward/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ tower = "0.5.1"
tower-http = { version = "0.6.1", features = ["map-response-body"] }
hyper = { version = "1.5.0", features = ["client", "http1", "http2"] }
hyper-body = { path = "../../utils/hyper-body" }
thiserror = "1.0.68"

[dev-dependencies]
tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
38 changes: 38 additions & 0 deletions k8s/forward/src/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/// A kube-forward error.
#[derive(Debug, thiserror::Error)]
#[allow(missing_docs)]
pub enum Error {
#[error("Service '{selector}' not found on '{namespace:?}'")]
ServiceNotFound {
selector: String,
namespace: crate::NameSpace,
},
#[error("{source}")]
Kube { source: kube::Error },
#[error("{source}")]
AnyHow { source: anyhow::Error },
#[error("Invalid uri: {source}")]
InvalidUri {
source: hyper::http::uri::InvalidUri,
},
#[error("{source}")]
Io { source: std::io::Error },
}

impl From<anyhow::Error> for Error {
fn from(source: anyhow::Error) -> Self {
Self::AnyHow { source }
}
}

impl From<kube::Error> for Error {
fn from(source: kube::Error) -> Self {
Self::Kube { source }
}
}

impl From<hyper::http::uri::InvalidUri> for Error {
fn from(source: hyper::http::uri::InvalidUri) -> Self {
Self::InvalidUri { source }
}
}
14 changes: 9 additions & 5 deletions k8s/forward/src/http_forward.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::{
error::Error,
pod_selection::{AnyReady, PodSelection},
vx::{Pod, Service},
};
Expand Down Expand Up @@ -48,10 +49,10 @@ impl HttpForward {
}

/// Returns the `hyper::Uri` that can be used to proxy with the kubeapi server.
pub async fn uri(self) -> anyhow::Result<hyper::Uri> {
pub async fn uri(self) -> Result<hyper::Uri, Error> {
let target = self.finder().find(&self.target).await?;
let uri = hyper::Uri::try_from(target.with_scheme(self.scheme))?;
tracing::info!(%uri, "generated kube-api");
tracing::debug!(%uri, "generated kube-api");
Ok(uri)
}

Expand Down Expand Up @@ -101,7 +102,7 @@ impl HttpProxy {
Self { client }
}
/// Tries to return a default `HttpProxy` with a default `kube::Client`.
pub async fn try_default() -> anyhow::Result<Self> {
pub async fn try_default() -> Result<Self, Error> {
Ok(Self {
client: kube::Client::try_default().await?,
})
Expand Down Expand Up @@ -167,7 +168,7 @@ impl<'a> TargetFinder<'a> {
/// Finds the `HttpTarget` according to the specified target.
/// # Arguments
/// * `target` - the target to be found
async fn find(&self, target: &crate::Target) -> anyhow::Result<HttpTarget> {
async fn find(&self, target: &crate::Target) -> Result<HttpTarget, Error> {
let pod_api = self.pod_api;
let svc_api = self.svc_api;

Expand All @@ -192,7 +193,10 @@ impl<'a> TargetFinder<'a> {
let services = svc_api.list(&Self::svc_params(&selector)).await?;
let service = match services.items.into_iter().next() {
Some(service) => Ok(service),
None => Err(anyhow::anyhow!("Service '{}' not found", selector)),
None => Err(Error::ServiceNotFound {
selector,
namespace: namespace.clone(),
}),
}?;

Ok(HttpTarget::new(
Expand Down
14 changes: 10 additions & 4 deletions k8s/forward/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
//!
//! If you're looking at a higher-level construct, please take a look at kube-proxy.

mod error;
mod http_forward;
mod pod_selection;
mod port_forward;
Expand All @@ -23,6 +24,9 @@ use k8s_openapi::apimachinery::pkg::util::intstr::IntOrString;
use kube::ResourceExt;
use vx::Pod;

/// The error exposed.
pub use crate::error::Error;

/// Different types of target selectors.
#[derive(Clone)]
pub enum TargetSelector {
Expand Down Expand Up @@ -104,8 +108,8 @@ pub struct Target {

/// A kubernetes namespace.
/// If None, the default is "default".
#[derive(Clone)]
pub(crate) struct NameSpace(Option<String>);
#[derive(Debug, Clone)]
pub struct NameSpace(Option<String>);
impl NameSpace {
/// Returns the configured namespace or the default.
pub(crate) fn name_any(&self) -> String {
Expand All @@ -121,7 +125,7 @@ pub(crate) struct TargetPod {
port_number: u16,
}
impl TargetPod {
fn new(pod_name: String, port_number: i32) -> anyhow::Result<Self> {
fn new(pod_name: String, port_number: i32) -> Result<Self, Error> {
let port_number = u16::try_from(port_number).context("Port not valid")?;
Ok(Self {
pod_name,
Expand All @@ -140,6 +144,8 @@ impl Target {
/// * `selector` - target selector
/// * `port` - target port
/// * `namespace` - target namespace
///
/// TODO: this namespace api is not bad, needs refactoring...
pub fn new<I: Into<Option<T>>, T: Into<String>, P: Into<Port>>(
selector: TargetSelector,
port: P,
Expand Down Expand Up @@ -177,7 +183,7 @@ impl Target {
}

/// Returns the `TargetPod` for the given pod/port or pod/self.port.
pub(crate) fn find(&self, pod: &Pod, port: Option<Port>) -> anyhow::Result<TargetPod> {
pub(crate) fn find(&self, pod: &Pod, port: Option<Port>) -> Result<TargetPod, Error> {
let port = match &port {
None => &self.port,
Some(port) => port,
Expand Down
30 changes: 16 additions & 14 deletions k8s/forward/src/port_forward.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use tokio_stream::wrappers::TcpListenerStream;
use crate::{
pod_selection::{AnyReady, PodSelection},
vx::{Pod, Service},
Error,
};
use kube::{
api::{Api, ListParams},
Expand Down Expand Up @@ -55,11 +56,16 @@ impl PortForward {
}

/// Runs the port forwarding proxy until a SIGINT signal is received.
pub async fn port_forward(self) -> anyhow::Result<(u16, tokio::task::JoinHandle<()>)> {
pub async fn port_forward(self) -> Result<(u16, tokio::task::JoinHandle<()>), Error> {
let addr = SocketAddr::from(([127, 0, 0, 1], self.local_port()));

let bind = TcpListener::bind(addr).await?;
let port = bind.local_addr()?.port();
let bind = TcpListener::bind(addr)
.await
.map_err(|source| Error::Io { source })?;
let port = bind
.local_addr()
.map_err(|source| Error::Io { source })?
.port();
tracing::trace!(port, "Bound to local port");

let server = TcpListenerStream::new(bind)
Expand All @@ -74,11 +80,8 @@ impl PortForward {
}

tokio::spawn(async move {
if let Err(e) = pf.forward_connection(client_conn).await {
tracing::error!(
error = e.as_ref() as &dyn std::error::Error,
"failed to forward connection"
);
if let Err(error) = pf.forward_connection(client_conn).await {
tracing::error!(%error, "failed to forward connection");
}
});

Expand All @@ -96,10 +99,7 @@ impl PortForward {
}),
))
}
async fn forward_connection(
self,
mut client_conn: tokio::net::TcpStream,
) -> anyhow::Result<()> {
async fn forward_connection(self, mut client_conn: tokio::net::TcpStream) -> Result<(), Error> {
let target = self.finder().find(&self.target).await?;
let (pod_name, pod_port) = target.into_parts();

Expand All @@ -120,7 +120,9 @@ impl PortForward {
}

drop(upstream_conn);
forwarder.join().await?;
forwarder.join().await.map_err(|error| Error::AnyHow {
source: error.into(),
})?;
tracing::debug!(local_port, pod_port, pod_name, "connection closed");
Ok(())
}
Expand All @@ -143,7 +145,7 @@ impl<'a> TargetPodFinder<'a> {
/// Finds the name and port of the target pod specified by the selector.
/// # Arguments
/// * `target` - the target to be found
pub(crate) async fn find(&self, target: &crate::Target) -> anyhow::Result<crate::TargetPod> {
pub(crate) async fn find(&self, target: &crate::Target) -> Result<crate::TargetPod, Error> {
let pod_api = self.pod_api;
let svc_api = self.svc_api;
let ready_pod = AnyReady {};
Expand Down
2 changes: 2 additions & 0 deletions k8s/proxy/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,5 @@ hyper = { version = "1.5.0", features = ["client", "http1", "http2"] }
hyper-body = { path = "../../utils/hyper-body" }

anyhow = "1.0.92"
thiserror = "1.0.68"
url = "2.5.2"
47 changes: 47 additions & 0 deletions k8s/proxy/src/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/// A kube-proxy error.
#[derive(Debug, thiserror::Error)]
#[allow(missing_docs)]
pub enum Error {
#[error("{source}")]
Forward { source: kube_forward::Error },
#[error("{source}")]
Kube { source: kube::Error },
#[error("{source}")]
AnyHow { source: anyhow::Error },
#[error("Invalid url: {source}")]
InvalidUrl { source: url::ParseError },
#[error("Invalid uri: {source}")]
InvalidUri {
source: hyper::http::uri::InvalidUri,
},
}

impl From<kube_forward::Error> for Error {
fn from(source: kube_forward::Error) -> Self {
Self::Forward { source }
}
}

impl From<anyhow::Error> for Error {
fn from(source: anyhow::Error) -> Self {
Self::AnyHow { source }
}
}

impl From<kube::Error> for Error {
fn from(source: kube::Error) -> Self {
Self::Kube { source }
}
}

impl From<url::ParseError> for Error {
fn from(source: url::ParseError) -> Self {
Self::InvalidUrl { source }
}
}

impl From<hyper::http::uri::InvalidUri> for Error {
fn from(source: hyper::http::uri::InvalidUri) -> Self {
Self::InvalidUri { source }
}
}
58 changes: 23 additions & 35 deletions k8s/proxy/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,52 +2,40 @@
//! A utility library to facilitate connections to a kubernetes cluster via
//! the k8s-proxy library.

use std::{
env,
path::{Path, PathBuf},
};
use std::path::PathBuf;

mod error;
mod proxy;

/// A [`error::Error`].
pub use error::Error;
use kube::config::KubeConfigOptions;
/// OpenApi client helpers.
pub use proxy::{ConfigBuilder, ForwardingProxy, LokiClient, Scheme};

/// Get the `kube::Config` from the given kubeconfig file, or the default.
pub async fn config_from_kubeconfig(
kube_config_path: Option<PathBuf>,
) -> anyhow::Result<kube::Config> {
let file = match kube_config_path {
Some(config_path) => config_path,
None => {
let file_path = match env::var("KUBECONFIG") {
Ok(value) => Some(value),
Err(_) => {
// Look for kubeconfig file in default location.
#[cfg(any(target_os = "linux", target_os = "macos"))]
let default_path = format!("{}/.kube/config", env::var("HOME")?);
#[cfg(target_os = "windows")]
let default_path = format!("{}/.kube/config", env::var("USERPROFILE")?);
match Path::new(&default_path).exists() {
true => Some(default_path),
false => None,
}
}
};
if file_path.is_none() {
return Err(anyhow::anyhow!(
"kubeconfig file not found in default location"
));
}
let mut path = PathBuf::new();
path.push(file_path.unwrap_or_default());
path
let mut config = match kube_config_path {
Some(config_path) => {
// NOTE: Kubeconfig file may hold multiple contexts to communicate
// with different kubernetes clusters. We have to pick master
// address of current-context config only
let kube_config = kube::config::Kubeconfig::read_from(&config_path)?;
kube::Config::from_custom_kubeconfig(kube_config, &Default::default()).await?
}
None => kube::Config::from_kubeconfig(&KubeConfigOptions::default()).await?,
};

// NOTE: Kubeconfig file may hold multiple contexts to communicate
// with different kubernetes clusters. We have to pick master
// address of current-context config only
let kube_config = kube::config::Kubeconfig::read_from(&file)?;
let config = kube::Config::from_custom_kubeconfig(kube_config, &Default::default()).await?;
config.apply_debug_overrides();
Ok(config)
}

/// Get the `kube::Client` from the given kubeconfig file, or the default.
pub async fn client_from_kubeconfig(
kube_config_path: Option<PathBuf>,
) -> anyhow::Result<kube::Client> {
Ok(kube::Client::try_from(
config_from_kubeconfig(kube_config_path).await?,
)?)
}
Loading

0 comments on commit 401e460

Please sign in to comment.