From a17e43e3488a4567c4c84eaae0638bf09d045c4d Mon Sep 17 00:00:00 2001 From: Steven Landow Date: Mon, 11 Mar 2024 02:20:08 -0700 Subject: [PATCH] outbound doens't re-traverse waypoint --- src/proxy.rs | 254 +++++++++++++++++++++++++++++++++++++++- src/proxy/inbound.rs | 263 +----------------------------------------- src/proxy/outbound.rs | 30 +++-- 3 files changed, 276 insertions(+), 271 deletions(-) diff --git a/src/proxy.rs b/src/proxy.rs index a42a28a58..a79fbd922 100644 --- a/src/proxy.rs +++ b/src/proxy.rs @@ -29,7 +29,7 @@ use tracing::{error, trace, warn, Instrument}; use inbound::Inbound; pub use metrics::*; -use crate::identity::SecretManager; +use crate::identity::{Identity, SecretManager}; use crate::metrics::Recorder; use crate::proxy::connection_manager::{ConnectionManager, PolicyWatcher}; use crate::proxy::inbound_passthrough::InboundPassthrough; @@ -37,7 +37,8 @@ use crate::proxy::outbound::Outbound; use crate::proxy::socks5::Socks5; use crate::rbac::Connection; use crate::state::service::{endpoint_uid, Service, ServiceDescription}; -use crate::state::workload::{network_addr, Workload}; +use crate::state::workload::address::Address; +use crate::state::workload::{network_addr, GatewayAddress, Workload}; use crate::state::{DemandProxyState, WorkloadInfo}; use crate::{config, identity, socket, tls}; @@ -529,6 +530,60 @@ pub fn guess_inbound_service( .map(ServiceDescription::from) } +// Checks if the connection's source identity is the identity for the upstream's waypoint +async fn check_from_waypoint( + state: DemandProxyState, + upstream: &Workload, + src_identity: Option<&Identity>, +) -> bool { + check_gateway_address(state, src_identity, upstream.waypoint.as_ref()).await +} + +// Checks if the connection's source identity is the identity for the upstream's network +// gateway +async fn check_from_network_gateway( + state: DemandProxyState, + upstream: &Workload, + src_identity: Option<&Identity>, +) -> bool { + check_gateway_address(state, src_identity, upstream.network_gateway.as_ref()).await +} + +// Check if the source's identity matches any workloads that make up the given gateway +// TODO: This can be made more accurate by also checking addresses. +async fn check_gateway_address( + state: DemandProxyState, + src_identity: Option<&Identity>, + gateway_address: Option<&GatewayAddress>, +) -> bool { + let Some(src_identity) = src_identity else { + return false; + }; + if let Some(gateway_address) = gateway_address { + let from_gateway = match state.fetch_destination(&gateway_address.destination).await { + Some(Address::Workload(wl)) => &wl.identity() == src_identity, + Some(Address::Service(svc)) => { + for (_ep_uid, ep) in svc.endpoints.iter() { + // fetch workloads by workload UID since we may not have an IP for an endpoint (e.g., endpoint is just a hostname) + if state + .fetch_workload_by_uid(&ep.workload_uid) + .await + .map(|w| w.identity()) + .as_ref() + == Some(src_identity) + { + return true; + } + } + false + } + None => false, + }; + return from_gateway; + } + false // this occurs if gateway_address was None +} + #[cfg(test)] mod tests { use std::assert_eq; @@ -561,4 +616,199 @@ mod tests { let expect = expect.map(|i| i.parse::().unwrap()); assert_eq!(get_original_src_from_fwded(&headers), expect) } + + use hickory_resolver::config::{ResolverConfig, ResolverOpts}; + + use crate::state::service::endpoint_uid; + use crate::state::workload::{NamespacedHostname, NetworkAddress}; + use crate::{ + identity::Identity, + state::{ + self, + service::{Endpoint, Service}, + workload::gatewayaddress::Destination, + }, + }; + use std::{ + collections::HashMap, + net::{Ipv4Addr, SocketAddrV4}, + sync::RwLock, + }; + + #[tokio::test] + async fn check_gateway() { + let w = mock_default_gateway_workload(); + let s = mock_default_gateway_service(); + let mut state = state::ProxyState::default(); + if let Err(err) = state.workloads.insert(w) { + panic!("received error inserting workload: {}", err); + } + state.services.insert(s); + let state = state::DemandProxyState::new( + Arc::new(RwLock::new(state)), + None, + ResolverConfig::default(), + ResolverOpts::default(), + ); + + let gateawy_id = Identity::Spiffe { + trust_domain: "cluster.local".to_string(), + namespace: "gatewayns".to_string(), + service_account: "default".to_string(), + }; + let from_gw_conn = Some(gateawy_id); + let not_from_gw_conn = Some(Identity::default()); + + let upstream_with_address = mock_wokload_with_gateway(Some(mock_default_gateway_address())); + assert!( + check_from_network_gateway( + state.clone(), + &upstream_with_address, + from_gw_conn.as_ref(), + ) + .await + ); + assert!( + !check_from_network_gateway( + state.clone(), + &upstream_with_address, + not_from_gw_conn.as_ref(), + ) + .await + ); + + // using hostname (will check the service variant of address::Address) + let upstream_with_hostname = + mock_wokload_with_gateway(Some(mock_default_gateway_hostname())); + assert!( + check_from_network_gateway( + state.clone(), + &upstream_with_hostname, + from_gw_conn.as_ref(), + ) + .await + ); + assert!( + !check_from_network_gateway(state, &upstream_with_hostname, not_from_gw_conn.as_ref()) + .await + ); + } + + // private helpers + fn mock_wokload_with_gateway(gw: Option) -> Workload { + Workload { + workload_ips: vec![IpAddr::V4(Ipv4Addr::LOCALHOST)], + waypoint: None, + network_gateway: gw, + gateway_address: None, + protocol: Default::default(), + uid: "".to_string(), + name: "app".to_string(), + namespace: "appns".to_string(), + trust_domain: "cluster.local".to_string(), + service_account: "default".to_string(), + network: "".to_string(), + workload_name: "app".to_string(), + workload_type: "deployment".to_string(), + canonical_name: "app".to_string(), + canonical_revision: "".to_string(), + hostname: "".to_string(), + node: "".to_string(), + status: Default::default(), + cluster_id: "Kubernetes".to_string(), + + authorization_policies: Vec::new(), + native_tunnel: false, + } + } + + fn mock_default_gateway_workload() -> Workload { + Workload { + workload_ips: vec![IpAddr::V4(mock_default_gateway_ipaddr())], + waypoint: None, + network_gateway: None, + gateway_address: None, + protocol: Default::default(), + uid: "".to_string(), + name: "gateway".to_string(), + namespace: "gatewayns".to_string(), + trust_domain: "cluster.local".to_string(), + service_account: "default".to_string(), + network: "".to_string(), + workload_name: "gateway".to_string(), + workload_type: "deployment".to_string(), + canonical_name: "".to_string(), + canonical_revision: "".to_string(), + hostname: "".to_string(), + node: "".to_string(), + status: Default::default(), + cluster_id: "Kubernetes".to_string(), + + authorization_policies: Vec::new(), + native_tunnel: false, + } + } + + fn mock_default_gateway_service() -> Service { + let vip1 = NetworkAddress { + address: IpAddr::V4(Ipv4Addr::new(127, 0, 10, 1)), + network: "".to_string(), + }; + let vips = vec![vip1]; + let mut ports = HashMap::new(); + ports.insert(8080, 80); + let mut endpoints = HashMap::new(); + let addr = Some(NetworkAddress { + network: "".to_string(), + address: IpAddr::V4(mock_default_gateway_ipaddr()), + }); + endpoints.insert( + endpoint_uid(&mock_default_gateway_workload().uid, addr.as_ref()), + Endpoint { + workload_uid: mock_default_gateway_workload().uid, + service: NamespacedHostname { + namespace: "gatewayns".to_string(), + hostname: "gateway".to_string(), + }, + address: addr, + port: ports.clone(), + }, + ); + Service { + name: "gateway".to_string(), + namespace: "gatewayns".to_string(), + hostname: "gateway".to_string(), + vips, + ports, + endpoints, + subject_alt_names: vec![], + waypoint: None, + } + } + + fn mock_default_gateway_address() -> GatewayAddress { + GatewayAddress { + destination: Destination::Address(NetworkAddress { + network: "".to_string(), + address: IpAddr::V4(mock_default_gateway_ipaddr()), + }), + hbone_mtls_port: 15008, + hbone_single_tls_port: Some(15003), + } + } + + fn mock_default_gateway_hostname() -> GatewayAddress { + GatewayAddress { + destination: Destination::Hostname(state::workload::NamespacedHostname { + namespace: "gatewayns".to_string(), + hostname: "gateway".to_string(), + }), + hbone_mtls_port: 15008, + hbone_single_tls_port: Some(15003), + } + } + + fn mock_default_gateway_ipaddr() -> Ipv4Addr { + Ipv4Addr::new(127, 0, 0, 100) + } } diff --git a/src/proxy/inbound.rs b/src/proxy/inbound.rs index ca9eda09e..d54b9994a 100644 --- a/src/proxy/inbound.rs +++ b/src/proxy/inbound.rs @@ -44,7 +44,7 @@ use crate::state::service::Service; use crate::state::workload::address::Address; use crate::{proxy, tls}; -use crate::state::workload::{address, GatewayAddress, NetworkAddress, Workload}; +use crate::state::workload::{NetworkAddress, Workload}; use crate::state::DemandProxyState; use crate::tls::TlsError; @@ -316,9 +316,9 @@ impl Inbound { }; let has_waypoint = upstream.waypoint.is_some(); - let from_waypoint = Self::check_from_waypoint(pi.state.clone(), &upstream, &conn).await; + let from_waypoint = proxy::check_from_waypoint(pi.state.clone(), &upstream, conn.src_identity.as_ref()).await; let from_gateway = - Self::check_from_network_gateway(pi.state.clone(), &upstream, &conn).await; + proxy::check_from_network_gateway(pi.state.clone(), &upstream, conn.src_identity.as_ref()).await; if from_gateway { debug!("request from gateway"); @@ -417,55 +417,6 @@ impl Inbound { .unwrap()) } - // Checks if the connection's source identity is the identity for the upstream's waypoint - async fn check_from_waypoint( - state: DemandProxyState, - upstream: &Workload, - conn: &Connection, - ) -> bool { - Self::check_gateway_address(state, conn, upstream.waypoint.as_ref()).await - } - - // Checks if the connection's source identity is the identity for the upstream's network - // gateway - async fn check_from_network_gateway( - state: DemandProxyState, - upstream: &Workload, - conn: &Connection, - ) -> bool { - Self::check_gateway_address(state, conn, upstream.network_gateway.as_ref()).await - } - - // Check if the source's identity matches any workloads that make up the given gateway - async fn check_gateway_address( - state: DemandProxyState, - conn: &Connection, - gateway_address: Option<&GatewayAddress>, - ) -> bool { - if let Some(gateway_address) = gateway_address { - let from_gateway = match state.fetch_destination(&gateway_address.destination).await { - Some(address::Address::Workload(wl)) => Some(wl.identity()) == conn.src_identity, - Some(address::Address::Service(svc)) => { - for (_ep_uid, ep) in svc.endpoints.iter() { - // fetch workloads by workload UID since we may not have an IP for an endpoint (e.g., endpoint is just a hostname) - if state - .fetch_workload_by_uid(&ep.workload_uid) - .await - .map(|w| w.identity()) - == conn.src_identity - { - return true; - } - } - false - } - None => false, - }; - return from_gateway; - } - false // this occurs if gateway_address was None - } - async fn find_inbound_upstream( state: DemandProxyState, conn: &Connection, @@ -619,211 +570,3 @@ impl crate::tls::ServerCertProvider for InboundCertProvider { } } -#[cfg(test)] -mod test { - use hickory_resolver::config::{ResolverConfig, ResolverOpts}; - - use super::*; - use crate::state::service::endpoint_uid; - use crate::state::workload::NamespacedHostname; - use crate::{ - identity::Identity, - state::{ - self, - service::{Endpoint, Service}, - workload::gatewayaddress::Destination, - }, - }; - use std::{ - collections::HashMap, - net::{Ipv4Addr, SocketAddrV4}, - sync::RwLock, - }; - - #[tokio::test] - async fn check_gateway() { - let w = mock_default_gateway_workload(); - let s = mock_default_gateway_service(); - let mut state = state::ProxyState::default(); - if let Err(err) = state.workloads.insert(w) { - panic!("received error inserting workload: {}", err); - } - state.services.insert(s); - let state = state::DemandProxyState::new( - Arc::new(RwLock::new(state)), - None, - ResolverConfig::default(), - ResolverOpts::default(), - ); - - let gateawy_id = Identity::Spiffe { - trust_domain: "cluster.local".to_string(), - namespace: "gatewayns".to_string(), - service_account: "default".to_string(), - }; - let from_gw_conn = Connection { - src_identity: Some(gateawy_id), - src_ip: IpAddr::V4(mock_default_gateway_ipaddr()), - dst_network: "default".to_string(), - dst: SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 10), 80)), - }; - let not_from_gw_conn = Connection { - src_identity: Some(Identity::default()), - src_ip: IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), - dst_network: "default".to_string(), - dst: SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 10), 80)), - }; - - let upstream_with_address = mock_wokload_with_gateway(Some(mock_default_gateway_address())); - assert!( - Inbound::check_from_network_gateway( - state.clone(), - &upstream_with_address, - &from_gw_conn - ) - .await - ); - assert!( - !Inbound::check_from_network_gateway( - state.clone(), - &upstream_with_address, - ¬_from_gw_conn - ) - .await - ); - - // using hostname (will check the service variant of address::Address) - let upstream_with_hostname = - mock_wokload_with_gateway(Some(mock_default_gateway_hostname())); - assert!( - Inbound::check_from_network_gateway( - state.clone(), - &upstream_with_hostname, - &from_gw_conn - ) - .await - ); - assert!( - !Inbound::check_from_network_gateway(state, &upstream_with_hostname, ¬_from_gw_conn) - .await - ); - } - - // private helpers - fn mock_wokload_with_gateway(gw: Option) -> Workload { - Workload { - workload_ips: vec![IpAddr::V4(Ipv4Addr::LOCALHOST)], - waypoint: None, - network_gateway: gw, - gateway_address: None, - protocol: Default::default(), - uid: "".to_string(), - name: "app".to_string(), - namespace: "appns".to_string(), - trust_domain: "cluster.local".to_string(), - service_account: "default".to_string(), - network: "".to_string(), - workload_name: "app".to_string(), - workload_type: "deployment".to_string(), - canonical_name: "app".to_string(), - canonical_revision: "".to_string(), - hostname: "".to_string(), - node: "".to_string(), - status: Default::default(), - cluster_id: "Kubernetes".to_string(), - - authorization_policies: Vec::new(), - native_tunnel: false, - } - } - - fn mock_default_gateway_workload() -> Workload { - Workload { - workload_ips: vec![IpAddr::V4(mock_default_gateway_ipaddr())], - waypoint: None, - network_gateway: None, - gateway_address: None, - protocol: Default::default(), - uid: "".to_string(), - name: "gateway".to_string(), - namespace: "gatewayns".to_string(), - trust_domain: "cluster.local".to_string(), - service_account: "default".to_string(), - network: "".to_string(), - workload_name: "gateway".to_string(), - workload_type: "deployment".to_string(), - canonical_name: "".to_string(), - canonical_revision: "".to_string(), - hostname: "".to_string(), - node: "".to_string(), - status: Default::default(), - cluster_id: "Kubernetes".to_string(), - - authorization_policies: Vec::new(), - native_tunnel: false, - } - } - - fn mock_default_gateway_service() -> Service { - let vip1 = NetworkAddress { - address: IpAddr::V4(Ipv4Addr::new(127, 0, 10, 1)), - network: "".to_string(), - }; - let vips = vec![vip1]; - let mut ports = HashMap::new(); - ports.insert(8080, 80); - let mut endpoints = HashMap::new(); - let addr = Some(NetworkAddress { - network: "".to_string(), - address: IpAddr::V4(mock_default_gateway_ipaddr()), - }); - endpoints.insert( - endpoint_uid(&mock_default_gateway_workload().uid, addr.as_ref()), - Endpoint { - workload_uid: mock_default_gateway_workload().uid, - service: NamespacedHostname { - namespace: "gatewayns".to_string(), - hostname: "gateway".to_string(), - }, - address: addr, - port: ports.clone(), - }, - ); - Service { - name: "gateway".to_string(), - namespace: "gatewayns".to_string(), - hostname: "gateway".to_string(), - vips, - ports, - endpoints, - subject_alt_names: vec![], - waypoint: None, - } - } - - fn mock_default_gateway_address() -> GatewayAddress { - GatewayAddress { - destination: Destination::Address(NetworkAddress { - network: "".to_string(), - address: IpAddr::V4(mock_default_gateway_ipaddr()), - }), - hbone_mtls_port: 15008, - hbone_single_tls_port: Some(15003), - } - } - - fn mock_default_gateway_hostname() -> GatewayAddress { - GatewayAddress { - destination: Destination::Hostname(state::workload::NamespacedHostname { - namespace: "gatewayns".to_string(), - hostname: "gateway".to_string(), - }), - hbone_mtls_port: 15008, - hbone_single_tls_port: Some(15003), - } - } - - fn mock_default_gateway_ipaddr() -> Ipv4Addr { - Ipv4Addr::new(127, 0, 0, 100) - } -} diff --git a/src/proxy/outbound.rs b/src/proxy/outbound.rs index c6c868c35..f644989a6 100644 --- a/src/proxy/outbound.rs +++ b/src/proxy/outbound.rs @@ -470,15 +470,26 @@ impl OutboundConnection { ) .await?; + // TODO src_id may not be enough; should also check addresses/uid + let src_id = Some(source_workload.identity()); + let from_waypoint = proxy::check_from_waypoint( + self.pi.state.clone(), + &mutable_us.workload, + src_id.as_ref(), + ) + .await; + // For case upstream server has enabled waypoint - match self - .pi - .state - .fetch_waypoint(&mutable_us.workload, workload_ip) - .await - { - Ok(None) => {} // workload doesn't have a waypoint; this is fine - Ok(Some(waypoint_us)) => { + match ( + from_waypoint, + self.pi + .state + .fetch_waypoint(&mutable_us.workload, workload_ip) + .await, + ) { + (_, Ok(None)) => {} // workload doesn't have a waypoint; this is fine + (true, _) => {} // we already traversed the waypoint + (false, Ok(Some(waypoint_us))) => { let waypoint_workload = waypoint_us.workload; let waypoint_ip = self .pi @@ -507,7 +518,7 @@ impl OutboundConnection { }); } // we expected the workload to have a waypoint, but could not find one - Err(e) => return Err(Error::UnknownWaypoint(e.to_string())), + (_, Err(e)) => return Err(Error::UnknownWaypoint(e.to_string())), } let us = match set_gateway_address(&mut mutable_us, workload_ip, self.pi.hbone_port) { @@ -666,6 +677,7 @@ mod tests { namespace: "ns".to_string(), addresses: vec![Bytes::copy_from_slice(&[127, 0, 0, 10])], node: "local-node".to_string(), + service_account: "waypoint-sa".to_string(), ..Default::default() }; let state = match xds {