Skip to content

Commit

Permalink
Fix cleaning up identities (#1021)
Browse files Browse the repository at this point in the history
  • Loading branch information
howardjohn authored May 6, 2024
1 parent 89fa6f2 commit 3d7b6d0
Show file tree
Hide file tree
Showing 9 changed files with 138 additions and 15 deletions.
11 changes: 11 additions & 0 deletions src/cert_fetcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use tracing::{debug, error, info};
pub trait CertFetcher: Send + Sync {
fn prefetch_cert(&self, w: &Workload);
fn clear_cert(&self, id: &Identity);
fn should_track_certificates_for_removal(&self, w: &Workload) -> bool;
}

/// A no-op implementation of [CertFetcher].
Expand All @@ -33,6 +34,9 @@ pub struct NoCertFetcher();
impl CertFetcher for NoCertFetcher {
fn prefetch_cert(&self, _: &Workload) {}
fn clear_cert(&self, _: &Identity) {}
fn should_track_certificates_for_removal(&self, _w: &Workload) -> bool {
false
}
}

/// Constructs an appropriate [CertFetcher] for the proxy config.
Expand Down Expand Up @@ -114,4 +118,11 @@ impl CertFetcher for CertFetcherImpl {
info!("couldn't clear identity: {:?}", e)
}
}

fn should_track_certificates_for_removal(&self, w: &Workload) -> bool {
// Only shared mode fetches other workloads's certs
self.proxy_mode == ProxyMode::Shared &&
// We only get certs for our own node
Some(w.node.as_str()) == self.local_node.as_deref()
}
}
2 changes: 1 addition & 1 deletion src/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -620,7 +620,7 @@ mod tests {
let w = mock_default_gateway_workload();
let s = mock_default_gateway_service();
let mut state = state::ProxyState::default();
state.workloads.insert(Arc::new(w));
state.workloads.insert(Arc::new(w), true);
state.services.insert(s);
let state = state::DemandProxyState::new(
Arc::new(RwLock::new(state)),
Expand Down
2 changes: 1 addition & 1 deletion src/proxy/inbound.rs
Original file line number Diff line number Diff line change
Expand Up @@ -768,7 +768,7 @@ mod tests {
state.services.insert(svc);
}
for wl in workloads {
state.workloads.insert(Arc::new(wl));
state.workloads.insert(Arc::new(wl), true);
}

Ok(DemandProxyState::new(
Expand Down
12 changes: 7 additions & 5 deletions src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -876,7 +876,7 @@ mod tests {
let mut state = ProxyState::default();
state
.workloads
.insert(Arc::new(test_helpers::test_default_workload()));
.insert(Arc::new(test_helpers::test_default_workload()), true);
state.services.insert(test_helpers::mock_default_service());

let mock_proxy_state = DemandProxyState::new(
Expand Down Expand Up @@ -950,7 +950,7 @@ mod tests {
workload_ips: vec![IpAddr::V4(Ipv4Addr::new(192, 168, 0, 2))],
..test_helpers::test_default_workload()
};
state.workloads.insert(Arc::new(wl));
state.workloads.insert(Arc::new(wl), true);

let mock_proxy_state = DemandProxyState::new(
Arc::new(RwLock::new(state)),
Expand Down Expand Up @@ -1153,9 +1153,11 @@ mod tests {
}),
..test_helpers::mock_default_service()
};
state.workloads.insert(Arc::new(wl_no_locality.clone()));
state.workloads.insert(Arc::new(wl_match.clone()));
state.workloads.insert(Arc::new(wl_almost.clone()));
state
.workloads
.insert(Arc::new(wl_no_locality.clone()), true);
state.workloads.insert(Arc::new(wl_match.clone()), true);
state.workloads.insert(Arc::new(wl_almost.clone()), true);
state.services.insert(strict_svc.clone());
state.services.insert(failover_svc.clone());

Expand Down
11 changes: 9 additions & 2 deletions src/state/workload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -597,12 +597,12 @@ pub struct WorkloadStore {
by_uid: HashMap<Strng, Arc<Workload>>,
/// byHostname maps workload hostname to workloads.
by_hostname: HashMap<Strng, Arc<Workload>>,
// Identity->Set of UIDs
// Identity->Set of UIDs. Only stores local nodes
by_identity: HashMap<Identity, HashSet<Strng>>,
}

impl WorkloadStore {
pub fn insert(&mut self, w: Arc<Workload>) {
pub fn insert(&mut self, w: Arc<Workload>, track_identity: bool) {
// First, remove the entry entirely to make sure things are cleaned up properly.
self.remove(&w.uid);

Expand All @@ -614,6 +614,13 @@ impl WorkloadStore {
self.by_hostname.insert(w.hostname.clone(), w.clone());
}
self.by_uid.insert(w.uid.clone(), w.clone());
// Only track local nodes to avoid overhead
if track_identity {
self.by_identity
.entry(w.identity())
.or_default()
.insert(w.uid.clone());
}
}

pub fn remove(&mut self, uid: &Strng) -> Option<Workload> {
Expand Down
19 changes: 16 additions & 3 deletions src/test_helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,9 @@ pub fn local_xds_config(
Ok(b.into_inner().freeze())
}

pub async fn assert_eventually<F, T, Fut>(dur: Duration, f: F, expected: T)
/// check_eventually runs a function many times until it reaches the expected result.
/// If it doesn't the last result is returned
pub async fn check_eventually<F, T, Fut>(dur: Duration, f: F, expected: T) -> Result<(), T>
where
F: Fn() -> Fut,
Fut: Future<Output = T>,
Expand All @@ -381,17 +383,28 @@ where
attempts += 1;
last = f().await;
if last == expected {
return;
return Ok(());
}
trace!("attempt {attempts} with delay {delay:?}");
if SystemTime::now().add(delay) > end {
panic!("assert_eventually failed after {attempts}: last response: {last:?}")
return Err(last);
}
tokio::time::sleep(delay).await;
delay *= 2;
}
}

pub async fn assert_eventually<F, T, Fut>(dur: Duration, f: F, expected: T)
where
F: Fn() -> Fut,
Fut: Future<Output = T>,
T: Eq + Debug,
{
if let Err(last) = check_eventually(dur, f, expected).await {
panic!("assert_eventually failed: last response: {last:?}")
}
}

pub fn new_proxy_state(
xds_workloads: &[XdsWorkload],
xds_services: &[XdsService],
Expand Down
6 changes: 6 additions & 0 deletions src/test_helpers/linux.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,12 @@ impl WorkloadManager {
Ok(())
}

pub async fn delete_workload(&mut self, name: &str) -> anyhow::Result<()> {
self.workloads.retain(|w| w.workload.name != name);
self.refresh_config().await?;
Ok(())
}

/// workload_builder allows creating a new workload. It will run in its own network namespace.
pub fn workload_builder(&mut self, name: &str, node: &str) -> TestWorkloadBuilder {
TestWorkloadBuilder::new(name, self)
Expand Down
17 changes: 14 additions & 3 deletions src/xds.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,9 +138,12 @@ impl ProxyStateUpdateMutator {
self.cert_fetcher.prefetch_cert(&workload);

// Lock and upstate the stores.
let track = self
.cert_fetcher
.should_track_certificates_for_removal(&workload);
state.workloads.insert(workload.clone(), track);
// Unhealthy workloads are always inserted, as we may get or receive traffic to them.
// But we shouldn't include them in load balancing we do to Services.
state.workloads.insert(workload.clone());
if workload.status == HealthStatus::Healthy {
insert_service_endpoints(&workload, &services, &mut state.services)?;
}
Expand Down Expand Up @@ -174,7 +177,12 @@ impl ProxyStateUpdateMutator {

// This is a real removal (not a removal before insertion), and nothing else references the cert
// Clear it out
if !for_insert && !state.workloads.has_identity(&prev.identity()) {
if !for_insert
&& self
.cert_fetcher
.should_track_certificates_for_removal(&prev)
&& !state.workloads.has_identity(&prev.identity())
{
self.cert_fetcher.clear_cert(&prev.identity());
}
// We removed a workload, no reason to attempt to remove a service with the same name
Expand Down Expand Up @@ -457,7 +465,10 @@ impl LocalClient {
trace!("inserting local workload {}", &wl.workload.uid);
self.cert_fetcher.prefetch_cert(&wl.workload);
let w = Arc::new(wl.workload);
state.workloads.insert(w.clone());
state.workloads.insert(
w.clone(),
self.cert_fetcher.should_track_certificates_for_removal(&w),
);

let services: HashMap<String, PortList> = wl
.services
Expand Down
73 changes: 73 additions & 0 deletions tests/namespaced.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ mod namespaced {
use std::collections::HashMap;
use std::fs;
use std::fs::File;

use std::net::{IpAddr, SocketAddr};

use std::path::PathBuf;
Expand Down Expand Up @@ -713,6 +714,78 @@ mod namespaced {
Ok(())
}

#[tokio::test]
async fn test_prefetch_forget_certs() -> anyhow::Result<()> {
// TODO: this test doesn't really need namespacing, but the direct test doesn't allow dynamic config changes.
let mut manager = setup_netns_test!(InPod);
let id1 = identity::Identity::Spiffe {
trust_domain: "cluster.local".into(),
service_account: "sa1".into(),
namespace: "default".into(),
};
let id1s = id1.to_string();

let ta = manager.deploy_ztunnel(DEFAULT_NODE).await?;

let check = |want: Vec<String>, help: &str| {
let cm = ta.cert_manager.clone();
let help = help.to_string();
async move {
// Cert manager is async, so we need to wait
let res = check_eventually(
Duration::from_secs(2),
|| cm.collect_certs(|a, _b| a.to_string()),
want,
)
.await;
assert!(res.is_ok(), "{}: got {:?}", help, res.err().unwrap());
}
};
check(vec![], "initially empty").await;
manager
.workload_builder("id1-a-remote-node", REMOTE_NODE)
.identity(id1.clone())
.register()
.await?;
check(vec![], "we should not prefetch remote nodes").await;
manager
.workload_builder("id1-a-same-node", DEFAULT_NODE)
.identity(id1.clone())
.register()
.await?;
check(vec![id1s.clone()], "we should prefetch our nodes").await;
manager
.workload_builder("id1-b-same-node", DEFAULT_NODE)
.identity(id1.clone())
.register()
.await?;
check(
vec![id1s.clone()],
"multiple of same identity shouldn't do anything",
)
.await;
manager.delete_workload("id1-a-remote-node").await?;
check(
vec![id1s.clone()],
"removing remote node shouldn't impact anything",
)
.await;
manager.delete_workload("id1-b-same-node").await?;
check(
vec![id1s.clone()],
"removing local node shouldn't impact anything if I still have some running",
)
.await;
manager.delete_workload("id1-a-same-node").await?;
// TODO: this should be vec![], but our testing setup doesn't exercise the real codepath
check(
vec![id1s.clone()],
"removing final workload should clear things out",
)
.await;
Ok(())
}

/// initialize_namespace_tests sets up the namespace tests.
/// These utilize the `unshare` syscall to setup an environment where we:
/// * Are "root"
Expand Down

0 comments on commit 3d7b6d0

Please sign in to comment.