Skip to content

Commit

Permalink
chore(bors): merge pull request #885
Browse files Browse the repository at this point in the history
885: fix(k8s): use provided kube-config path r=tiagolobocastro a=tiagolobocastro

Ensure we always use the provided kube-config path.

Co-authored-by: Tiago Castro <tiagolobocastro@gmail.com>
  • Loading branch information
mayastor-bors and tiagolobocastro committed Nov 7, 2024
2 parents 4ec8c08 + 401e460 commit dd21e4e
Show file tree
Hide file tree
Showing 12 changed files with 194 additions and 98 deletions.
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions k8s/forward/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ tower = "0.5.1"
tower-http = { version = "0.6.1", features = ["map-response-body"] }
hyper = { version = "1.5.0", features = ["client", "http1", "http2"] }
hyper-body = { path = "../../utils/hyper-body" }
thiserror = "1.0.68"

[dev-dependencies]
tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
3 changes: 1 addition & 2 deletions k8s/forward/examples/http-forward.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,7 @@ async fn main() -> anyhow::Result<()> {

let selector = kube_forward::TargetSelector::svc_label("app", "api-rest");
let target = kube_forward::Target::new(selector, "http", "mayastor");
let uri = kube_forward::HttpForward::new(target, None)
.await?
let uri = kube_forward::HttpForward::new(target, None, kube::Client::try_default().await?)
.uri()
.await?;

Expand Down
2 changes: 1 addition & 1 deletion k8s/forward/examples/port-forward.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ async fn main() -> anyhow::Result<()> {

let selector = kube_forward::TargetSelector::svc_label("app", "api-rest");
let target = kube_forward::Target::new(selector, "http", "mayastor");
let pf = kube_forward::PortForward::new(target, 30011).await?;
let pf = kube_forward::PortForward::new(target, 30011, kube::Client::try_default().await?);

let (_, handle) = pf.port_forward().await?;
handle.await?;
Expand Down
38 changes: 38 additions & 0 deletions k8s/forward/src/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/// A kube-forward error.
#[derive(Debug, thiserror::Error)]
#[allow(missing_docs)]
pub enum Error {
#[error("Service '{selector}' not found on '{namespace:?}'")]
ServiceNotFound {
selector: String,
namespace: crate::NameSpace,
},
#[error("{source}")]
Kube { source: kube::Error },
#[error("{source}")]
AnyHow { source: anyhow::Error },
#[error("Invalid uri: {source}")]
InvalidUri {
source: hyper::http::uri::InvalidUri,
},
#[error("{source}")]
Io { source: std::io::Error },
}

impl From<anyhow::Error> for Error {
fn from(source: anyhow::Error) -> Self {
Self::AnyHow { source }
}
}

impl From<kube::Error> for Error {
fn from(source: kube::Error) -> Self {
Self::Kube { source }
}
}

impl From<hyper::http::uri::InvalidUri> for Error {
fn from(source: hyper::http::uri::InvalidUri) -> Self {
Self::InvalidUri { source }
}
}
27 changes: 16 additions & 11 deletions k8s/forward/src/http_forward.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::{
error::Error,
pod_selection::{AnyReady, PodSelection},
vx::{Pod, Service},
};
Expand Down Expand Up @@ -32,26 +33,26 @@ impl HttpForward {
/// Return a new `Self`.
/// # Arguments
/// * `target` - the target we'll forward to
pub async fn new<SO: Into<Option<Scheme>>>(
pub fn new<SO: Into<Option<Scheme>>>(
target: crate::Target,
scheme: SO,
) -> anyhow::Result<Self> {
let client = kube::Client::try_default().await?;
client: kube::Client,
) -> Self {
let namespace = target.namespace.name_any();

Ok(Self {
Self {
target,
pod_api: Api::namespaced(client.clone(), &namespace),
svc_api: Api::namespaced(client, &namespace),
scheme: scheme.into().unwrap_or(Scheme::HTTP),
})
}
}

/// Returns the `hyper::Uri` that can be used to proxy with the kubeapi server.
pub async fn uri(self) -> anyhow::Result<hyper::Uri> {
pub async fn uri(self) -> Result<hyper::Uri, Error> {
let target = self.finder().find(&self.target).await?;
let uri = hyper::Uri::try_from(target.with_scheme(self.scheme))?;
tracing::info!(%uri, "generated kube-api");
tracing::debug!(%uri, "generated kube-api");
Ok(uri)
}

Expand All @@ -70,7 +71,8 @@ impl HttpForward {
/// ```ignore
/// let selector = kube_forward::TargetSelector::svc_label("app", "api-rest");
/// let target = kube_forward::Target::new(selector, "http", "mayastor");
/// let pf = kube_forward::HttpForward::new(target, None).await?;
/// let client = kube::Client::try_default().await?;
/// let pf = kube_forward::HttpForward::new(target, None, client).await?;
///
/// let uri = pf.uri().await?;
/// tracing::info!(%uri, "generated kube-api");
Expand Down Expand Up @@ -100,7 +102,7 @@ impl HttpProxy {
Self { client }
}
/// Tries to return a default `HttpProxy` with a default `kube::Client`.
pub async fn try_default() -> anyhow::Result<Self> {
pub async fn try_default() -> Result<Self, Error> {
Ok(Self {
client: kube::Client::try_default().await?,
})
Expand Down Expand Up @@ -166,7 +168,7 @@ impl<'a> TargetFinder<'a> {
/// Finds the `HttpTarget` according to the specified target.
/// # Arguments
/// * `target` - the target to be found
async fn find(&self, target: &crate::Target) -> anyhow::Result<HttpTarget> {
async fn find(&self, target: &crate::Target) -> Result<HttpTarget, Error> {
let pod_api = self.pod_api;
let svc_api = self.svc_api;

Expand All @@ -191,7 +193,10 @@ impl<'a> TargetFinder<'a> {
let services = svc_api.list(&Self::svc_params(&selector)).await?;
let service = match services.items.into_iter().next() {
Some(service) => Ok(service),
None => Err(anyhow::anyhow!("Service '{}' not found", selector)),
None => Err(Error::ServiceNotFound {
selector,
namespace: namespace.clone(),
}),
}?;

Ok(HttpTarget::new(
Expand Down
14 changes: 10 additions & 4 deletions k8s/forward/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
//!
//! If you're looking at a higher-level construct, please take a look at kube-proxy.

mod error;
mod http_forward;
mod pod_selection;
mod port_forward;
Expand All @@ -23,6 +24,9 @@ use k8s_openapi::apimachinery::pkg::util::intstr::IntOrString;
use kube::ResourceExt;
use vx::Pod;

/// The error exposed.
pub use crate::error::Error;

/// Different types of target selectors.
#[derive(Clone)]
pub enum TargetSelector {
Expand Down Expand Up @@ -104,8 +108,8 @@ pub struct Target {

/// A kubernetes namespace.
/// If None, the default is "default".
#[derive(Clone)]
pub(crate) struct NameSpace(Option<String>);
#[derive(Debug, Clone)]
pub struct NameSpace(Option<String>);
impl NameSpace {
/// Returns the configured namespace or the default.
pub(crate) fn name_any(&self) -> String {
Expand All @@ -121,7 +125,7 @@ pub(crate) struct TargetPod {
port_number: u16,
}
impl TargetPod {
fn new(pod_name: String, port_number: i32) -> anyhow::Result<Self> {
fn new(pod_name: String, port_number: i32) -> Result<Self, Error> {
let port_number = u16::try_from(port_number).context("Port not valid")?;
Ok(Self {
pod_name,
Expand All @@ -140,6 +144,8 @@ impl Target {
/// * `selector` - target selector
/// * `port` - target port
/// * `namespace` - target namespace
///
/// TODO: this namespace api is not bad, needs refactoring...
pub fn new<I: Into<Option<T>>, T: Into<String>, P: Into<Port>>(
selector: TargetSelector,
port: P,
Expand Down Expand Up @@ -177,7 +183,7 @@ impl Target {
}

/// Returns the `TargetPod` for the given pod/port or pod/self.port.
pub(crate) fn find(&self, pod: &Pod, port: Option<Port>) -> anyhow::Result<TargetPod> {
pub(crate) fn find(&self, pod: &Pod, port: Option<Port>) -> Result<TargetPod, Error> {
let port = match &port {
None => &self.port,
Some(port) => port,
Expand Down
43 changes: 21 additions & 22 deletions k8s/forward/src/port_forward.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use tokio_stream::wrappers::TcpListenerStream;
use crate::{
pod_selection::{AnyReady, PodSelection},
vx::{Pod, Service},
Error,
};
use kube::{
api::{Api, ListParams},
Expand All @@ -18,7 +19,8 @@ use kube::{
/// ```ignore
/// let selector = kube_forward::TargetSelector::pod_label("app", "etcd");
/// let target = kube_forward::Target::new(selector, "client", "mayastor");
/// let pf = kube_forward::PortForward::new(target, 35003).await?;
/// let client = kube::Client::try_default().await?;
/// let pf = kube_forward::PortForward::new(target, 35003, client).await?;
///
/// let (_port, handle) = pf.port_forward().await?;
/// handle.await?;
Expand All @@ -36,19 +38,15 @@ impl PortForward {
/// # Arguments
/// * `target` - the target we'll forward to
/// * `local_port` - specific local port to use, if Some
pub async fn new(
target: crate::Target,
local_port: impl Into<Option<u16>>,
) -> anyhow::Result<Self> {
let client = Client::try_default().await?;
pub fn new(target: crate::Target, local_port: impl Into<Option<u16>>, client: Client) -> Self {
let namespace = target.namespace.name_any();

Ok(Self {
Self {
target,
local_port: local_port.into(),
pod_api: Api::namespaced(client.clone(), &namespace),
svc_api: Api::namespaced(client, &namespace),
})
}
}

/// The specified local port, or 0.
Expand All @@ -58,11 +56,16 @@ impl PortForward {
}

/// Runs the port forwarding proxy until a SIGINT signal is received.
pub async fn port_forward(self) -> anyhow::Result<(u16, tokio::task::JoinHandle<()>)> {
pub async fn port_forward(self) -> Result<(u16, tokio::task::JoinHandle<()>), Error> {
let addr = SocketAddr::from(([127, 0, 0, 1], self.local_port()));

let bind = TcpListener::bind(addr).await?;
let port = bind.local_addr()?.port();
let bind = TcpListener::bind(addr)
.await
.map_err(|source| Error::Io { source })?;
let port = bind
.local_addr()
.map_err(|source| Error::Io { source })?
.port();
tracing::trace!(port, "Bound to local port");

let server = TcpListenerStream::new(bind)
Expand All @@ -77,11 +80,8 @@ impl PortForward {
}

tokio::spawn(async move {
if let Err(e) = pf.forward_connection(client_conn).await {
tracing::error!(
error = e.as_ref() as &dyn std::error::Error,
"failed to forward connection"
);
if let Err(error) = pf.forward_connection(client_conn).await {
tracing::error!(%error, "failed to forward connection");
}
});

Expand All @@ -99,10 +99,7 @@ impl PortForward {
}),
))
}
async fn forward_connection(
self,
mut client_conn: tokio::net::TcpStream,
) -> anyhow::Result<()> {
async fn forward_connection(self, mut client_conn: tokio::net::TcpStream) -> Result<(), Error> {
let target = self.finder().find(&self.target).await?;
let (pod_name, pod_port) = target.into_parts();

Expand All @@ -123,7 +120,9 @@ impl PortForward {
}

drop(upstream_conn);
forwarder.join().await?;
forwarder.join().await.map_err(|error| Error::AnyHow {
source: error.into(),
})?;
tracing::debug!(local_port, pod_port, pod_name, "connection closed");
Ok(())
}
Expand All @@ -146,7 +145,7 @@ impl<'a> TargetPodFinder<'a> {
/// Finds the name and port of the target pod specified by the selector.
/// # Arguments
/// * `target` - the target to be found
pub(crate) async fn find(&self, target: &crate::Target) -> anyhow::Result<crate::TargetPod> {
pub(crate) async fn find(&self, target: &crate::Target) -> Result<crate::TargetPod, Error> {
let pod_api = self.pod_api;
let svc_api = self.svc_api;
let ready_pod = AnyReady {};
Expand Down
2 changes: 2 additions & 0 deletions k8s/proxy/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,5 @@ hyper = { version = "1.5.0", features = ["client", "http1", "http2"] }
hyper-body = { path = "../../utils/hyper-body" }

anyhow = "1.0.92"
thiserror = "1.0.68"
url = "2.5.2"
47 changes: 47 additions & 0 deletions k8s/proxy/src/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/// A kube-proxy error.
#[derive(Debug, thiserror::Error)]
#[allow(missing_docs)]
pub enum Error {
#[error("{source}")]
Forward { source: kube_forward::Error },
#[error("{source}")]
Kube { source: kube::Error },
#[error("{source}")]
AnyHow { source: anyhow::Error },
#[error("Invalid url: {source}")]
InvalidUrl { source: url::ParseError },
#[error("Invalid uri: {source}")]
InvalidUri {
source: hyper::http::uri::InvalidUri,
},
}

impl From<kube_forward::Error> for Error {
fn from(source: kube_forward::Error) -> Self {
Self::Forward { source }
}
}

impl From<anyhow::Error> for Error {
fn from(source: anyhow::Error) -> Self {
Self::AnyHow { source }
}
}

impl From<kube::Error> for Error {
fn from(source: kube::Error) -> Self {
Self::Kube { source }
}
}

impl From<url::ParseError> for Error {
fn from(source: url::ParseError) -> Self {
Self::InvalidUrl { source }
}
}

impl From<hyper::http::uri::InvalidUri> for Error {
fn from(source: hyper::http::uri::InvalidUri) -> Self {
Self::InvalidUri { source }
}
}
Loading

0 comments on commit dd21e4e

Please sign in to comment.