Skip to content

Commit

Permalink
outbound doens't re-traverse waypoint
Browse files Browse the repository at this point in the history
  • Loading branch information
stevenctl committed Mar 11, 2024
1 parent a517c5a commit a17e43e
Show file tree
Hide file tree
Showing 3 changed files with 276 additions and 271 deletions.
254 changes: 252 additions & 2 deletions src/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,16 @@ use tracing::{error, trace, warn, Instrument};
use inbound::Inbound;
pub use metrics::*;

use crate::identity::SecretManager;
use crate::identity::{Identity, SecretManager};
use crate::metrics::Recorder;
use crate::proxy::connection_manager::{ConnectionManager, PolicyWatcher};
use crate::proxy::inbound_passthrough::InboundPassthrough;
use crate::proxy::outbound::Outbound;
use crate::proxy::socks5::Socks5;
use crate::rbac::Connection;
use crate::state::service::{endpoint_uid, Service, ServiceDescription};
use crate::state::workload::{network_addr, Workload};
use crate::state::workload::address::Address;
use crate::state::workload::{network_addr, GatewayAddress, Workload};
use crate::state::{DemandProxyState, WorkloadInfo};
use crate::{config, identity, socket, tls};

Expand Down Expand Up @@ -529,6 +530,60 @@ pub fn guess_inbound_service(
.map(ServiceDescription::from)
}

// Checks if the connection's source identity is the identity for the upstream's waypoint
async fn check_from_waypoint(
state: DemandProxyState,
upstream: &Workload,
src_identity: Option<&Identity>,
) -> bool {
check_gateway_address(state, src_identity, upstream.waypoint.as_ref()).await
}

// Checks if the connection's source identity is the identity for the upstream's network
// gateway
async fn check_from_network_gateway(
state: DemandProxyState,
upstream: &Workload,
src_identity: Option<&Identity>,
) -> bool {
check_gateway_address(state, src_identity, upstream.network_gateway.as_ref()).await
}

// Check if the source's identity matches any workloads that make up the given gateway
// TODO: This can be made more accurate by also checking addresses.
async fn check_gateway_address(
state: DemandProxyState,
src_identity: Option<&Identity>,
gateway_address: Option<&GatewayAddress>,
) -> bool {
let Some(src_identity) = src_identity else {
return false;
};
if let Some(gateway_address) = gateway_address {
let from_gateway = match state.fetch_destination(&gateway_address.destination).await {
Some(Address::Workload(wl)) => &wl.identity() == src_identity,
Some(Address::Service(svc)) => {
for (_ep_uid, ep) in svc.endpoints.iter() {
// fetch workloads by workload UID since we may not have an IP for an endpoint (e.g., endpoint is just a hostname)
if state
.fetch_workload_by_uid(&ep.workload_uid)
.await
.map(|w| w.identity())
.as_ref()
== Some(src_identity)
{
return true;
}
}
false
}
None => false,
};
return from_gateway;
}
false // this occurs if gateway_address was None
}

#[cfg(test)]
mod tests {
use std::assert_eq;
Expand Down Expand Up @@ -561,4 +616,199 @@ mod tests {
let expect = expect.map(|i| i.parse::<IpAddr>().unwrap());
assert_eq!(get_original_src_from_fwded(&headers), expect)
}

use hickory_resolver::config::{ResolverConfig, ResolverOpts};

use crate::state::service::endpoint_uid;
use crate::state::workload::{NamespacedHostname, NetworkAddress};
use crate::{
identity::Identity,
state::{
self,
service::{Endpoint, Service},
workload::gatewayaddress::Destination,
},
};
use std::{
collections::HashMap,
net::{Ipv4Addr, SocketAddrV4},
sync::RwLock,
};

#[tokio::test]
async fn check_gateway() {
let w = mock_default_gateway_workload();
let s = mock_default_gateway_service();
let mut state = state::ProxyState::default();
if let Err(err) = state.workloads.insert(w) {
panic!("received error inserting workload: {}", err);
}
state.services.insert(s);
let state = state::DemandProxyState::new(
Arc::new(RwLock::new(state)),
None,
ResolverConfig::default(),
ResolverOpts::default(),
);

let gateawy_id = Identity::Spiffe {
trust_domain: "cluster.local".to_string(),
namespace: "gatewayns".to_string(),
service_account: "default".to_string(),
};
let from_gw_conn = Some(gateawy_id);
let not_from_gw_conn = Some(Identity::default());

let upstream_with_address = mock_wokload_with_gateway(Some(mock_default_gateway_address()));
assert!(
check_from_network_gateway(
state.clone(),
&upstream_with_address,
from_gw_conn.as_ref(),
)
.await
);
assert!(
!check_from_network_gateway(
state.clone(),
&upstream_with_address,
not_from_gw_conn.as_ref(),
)
.await
);

// using hostname (will check the service variant of address::Address)
let upstream_with_hostname =
mock_wokload_with_gateway(Some(mock_default_gateway_hostname()));
assert!(
check_from_network_gateway(
state.clone(),
&upstream_with_hostname,
from_gw_conn.as_ref(),
)
.await
);
assert!(
!check_from_network_gateway(state, &upstream_with_hostname, not_from_gw_conn.as_ref())
.await
);
}

// private helpers
fn mock_wokload_with_gateway(gw: Option<GatewayAddress>) -> Workload {
Workload {
workload_ips: vec![IpAddr::V4(Ipv4Addr::LOCALHOST)],
waypoint: None,
network_gateway: gw,
gateway_address: None,
protocol: Default::default(),
uid: "".to_string(),
name: "app".to_string(),
namespace: "appns".to_string(),
trust_domain: "cluster.local".to_string(),
service_account: "default".to_string(),
network: "".to_string(),
workload_name: "app".to_string(),
workload_type: "deployment".to_string(),
canonical_name: "app".to_string(),
canonical_revision: "".to_string(),
hostname: "".to_string(),
node: "".to_string(),
status: Default::default(),
cluster_id: "Kubernetes".to_string(),

authorization_policies: Vec::new(),
native_tunnel: false,
}
}

fn mock_default_gateway_workload() -> Workload {
Workload {
workload_ips: vec![IpAddr::V4(mock_default_gateway_ipaddr())],
waypoint: None,
network_gateway: None,
gateway_address: None,
protocol: Default::default(),
uid: "".to_string(),
name: "gateway".to_string(),
namespace: "gatewayns".to_string(),
trust_domain: "cluster.local".to_string(),
service_account: "default".to_string(),
network: "".to_string(),
workload_name: "gateway".to_string(),
workload_type: "deployment".to_string(),
canonical_name: "".to_string(),
canonical_revision: "".to_string(),
hostname: "".to_string(),
node: "".to_string(),
status: Default::default(),
cluster_id: "Kubernetes".to_string(),

authorization_policies: Vec::new(),
native_tunnel: false,
}
}

fn mock_default_gateway_service() -> Service {
let vip1 = NetworkAddress {
address: IpAddr::V4(Ipv4Addr::new(127, 0, 10, 1)),
network: "".to_string(),
};
let vips = vec![vip1];
let mut ports = HashMap::new();
ports.insert(8080, 80);
let mut endpoints = HashMap::new();
let addr = Some(NetworkAddress {
network: "".to_string(),
address: IpAddr::V4(mock_default_gateway_ipaddr()),
});
endpoints.insert(
endpoint_uid(&mock_default_gateway_workload().uid, addr.as_ref()),
Endpoint {
workload_uid: mock_default_gateway_workload().uid,
service: NamespacedHostname {
namespace: "gatewayns".to_string(),
hostname: "gateway".to_string(),
},
address: addr,
port: ports.clone(),
},
);
Service {
name: "gateway".to_string(),
namespace: "gatewayns".to_string(),
hostname: "gateway".to_string(),
vips,
ports,
endpoints,
subject_alt_names: vec![],
waypoint: None,
}
}

fn mock_default_gateway_address() -> GatewayAddress {
GatewayAddress {
destination: Destination::Address(NetworkAddress {
network: "".to_string(),
address: IpAddr::V4(mock_default_gateway_ipaddr()),
}),
hbone_mtls_port: 15008,
hbone_single_tls_port: Some(15003),
}
}

fn mock_default_gateway_hostname() -> GatewayAddress {
GatewayAddress {
destination: Destination::Hostname(state::workload::NamespacedHostname {
namespace: "gatewayns".to_string(),
hostname: "gateway".to_string(),
}),
hbone_mtls_port: 15008,
hbone_single_tls_port: Some(15003),
}
}

fn mock_default_gateway_ipaddr() -> Ipv4Addr {
Ipv4Addr::new(127, 0, 0, 100)
}
}
Loading

0 comments on commit a17e43e

Please sign in to comment.