Fix cleaning up identities #1021

Merged · 1 commit · May 6, 2024
11 changes: 11 additions & 0 deletions src/cert_fetcher.rs
@@ -25,6 +25,7 @@ use tracing::{debug, error, info};
pub trait CertFetcher: Send + Sync {
fn prefetch_cert(&self, w: &Workload);
fn clear_cert(&self, id: &Identity);
fn should_track_certificates_for_removal(&self, w: &Workload) -> bool;
}

/// A no-op implementation of [CertFetcher].
@@ -33,6 +34,9 @@ pub struct NoCertFetcher();
impl CertFetcher for NoCertFetcher {
fn prefetch_cert(&self, _: &Workload) {}
fn clear_cert(&self, _: &Identity) {}
fn should_track_certificates_for_removal(&self, _w: &Workload) -> bool {
false
}
}

/// Constructs an appropriate [CertFetcher] for the proxy config.
@@ -114,4 +118,11 @@ impl CertFetcher for CertFetcherImpl {
info!("couldn't clear identity: {:?}", e)
}
}

fn should_track_certificates_for_removal(&self, w: &Workload) -> bool {
// Only shared mode fetches other workloads' certs
self.proxy_mode == ProxyMode::Shared &&
// We only get certs for our own node
Some(w.node.as_str()) == self.local_node.as_deref()
}
}
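For illustration: the predicate requires both that the proxy runs in shared mode and that the workload lives on the proxy's own node. A minimal standalone restatement (a hypothetical helper, not part of the PR; ProxyMode::Dedicated is assumed to be the non-shared variant):

// Hypothetical restatement of should_track_certificates_for_removal's two conditions.
fn tracks(proxy_mode: ProxyMode, local_node: Option<&str>, workload_node: &str) -> bool {
    proxy_mode == ProxyMode::Shared && Some(workload_node) == local_node
}

// tracks(ProxyMode::Shared, Some("node-a"), "node-a") -> true
// tracks(ProxyMode::Shared, Some("node-a"), "node-b") -> false (remote node)
// tracks(ProxyMode::Dedicated, Some("node-a"), "node-a") -> false (not shared mode)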
2 changes: 1 addition & 1 deletion src/proxy.rs
@@ -620,7 +620,7 @@ mod tests {
let w = mock_default_gateway_workload();
let s = mock_default_gateway_service();
let mut state = state::ProxyState::default();
state.workloads.insert(Arc::new(w));
state.workloads.insert(Arc::new(w), true);
state.services.insert(s);
let state = state::DemandProxyState::new(
Arc::new(RwLock::new(state)),
2 changes: 1 addition & 1 deletion src/proxy/inbound.rs
@@ -768,7 +768,7 @@ mod tests {
state.services.insert(svc);
}
for wl in workloads {
state.workloads.insert(Arc::new(wl));
state.workloads.insert(Arc::new(wl), true);
}

Ok(DemandProxyState::new(
12 changes: 7 additions & 5 deletions src/state.rs
@@ -876,7 +876,7 @@ mod tests {
let mut state = ProxyState::default();
state
.workloads
.insert(Arc::new(test_helpers::test_default_workload()));
.insert(Arc::new(test_helpers::test_default_workload()), true);
state.services.insert(test_helpers::mock_default_service());

let mock_proxy_state = DemandProxyState::new(
@@ -950,7 +950,7 @@
workload_ips: vec![IpAddr::V4(Ipv4Addr::new(192, 168, 0, 2))],
..test_helpers::test_default_workload()
};
state.workloads.insert(Arc::new(wl));
state.workloads.insert(Arc::new(wl), true);

let mock_proxy_state = DemandProxyState::new(
Arc::new(RwLock::new(state)),
@@ -1153,9 +1153,11 @@
}),
..test_helpers::mock_default_service()
};
state.workloads.insert(Arc::new(wl_no_locality.clone()));
state.workloads.insert(Arc::new(wl_match.clone()));
state.workloads.insert(Arc::new(wl_almost.clone()));
state
.workloads
.insert(Arc::new(wl_no_locality.clone()), true);
state.workloads.insert(Arc::new(wl_match.clone()), true);
state.workloads.insert(Arc::new(wl_almost.clone()), true);
state.services.insert(strict_svc.clone());
state.services.insert(failover_svc.clone());

11 changes: 9 additions & 2 deletions src/state/workload.rs
@@ -597,12 +597,12 @@ pub struct WorkloadStore {
by_uid: HashMap<Strng, Arc<Workload>>,
/// byHostname maps workload hostname to workloads.
by_hostname: HashMap<Strng, Arc<Workload>>,
// Identity->Set of UIDs
// Identity->Set of UIDs. Only stores local nodes
by_identity: HashMap<Identity, HashSet<Strng>>,
}

impl WorkloadStore {
pub fn insert(&mut self, w: Arc<Workload>) {
pub fn insert(&mut self, w: Arc<Workload>, track_identity: bool) {
// First, remove the entry entirely to make sure things are cleaned up properly.
self.remove(&w.uid);

@@ -614,6 +614,13 @@ impl WorkloadStore {
self.by_hostname.insert(w.hostname.clone(), w.clone());
}
self.by_uid.insert(w.uid.clone(), w.clone());
// Only track local nodes to avoid overhead
if track_identity {
self.by_identity
.entry(w.identity())
.or_default()
.insert(w.uid.clone());
}
Comment on lines +617 to +623

Contributor: I think this comment describes the intended use for track_identity rather than what this code does. Should we consider adding this comment to the insert method itself and/or changing track_identity to be is_node_local, perhaps?

Contributor: should_track_certificates_for_removal() probably belongs in workload and should be checked as part of insert, not in cert_mgr; it's being called before workload.insert pretty much everywhere.

Member (author): The reason I did it this way is that workload doesn't have access to the proxy mode, local node, etc. at all, so it cannot make the call itself. We could plumb it down, but then it's inconsistent with should_prefetch_certificate and requires even more plumbing.

Contributor:

> The reason I did it this way is that workload doesn't have access to the proxy mode, local node, etc. at all

Figured.

> We could plumb it down, but then it's inconsistent with should_prefetch_certificate and requires even more plumbing.

should_prefetch and should_track are almost the same check, and it feels like both should probably go in workload, but yeah, that's getting out of scope for this change.

}

pub fn remove(&mut self, uid: &Strng) -> Option<Workload> {
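The diff above shows how identities are tracked on insert but not the removal-side bookkeeping. As context, a minimal sketch of what the lookup and cleanup might look like (has_identity is referenced by the src/xds.rs change below; its body and the untrack_identity helper here are assumptions, not code from this PR):

impl WorkloadStore {
    /// Returns whether any tracked workload still uses this identity.
    pub fn has_identity(&self, id: &Identity) -> bool {
        self.by_identity.contains_key(id)
    }

    /// Hypothetical helper for the removal path: drop the workload's UID from
    /// the identity map, and drop the entry once no UIDs remain, so that
    /// has_identity() turns false only when the last workload with that
    /// identity is gone.
    fn untrack_identity(&mut self, w: &Workload) {
        if let Some(uids) = self.by_identity.get_mut(&w.identity()) {
            uids.remove(&w.uid);
            if uids.is_empty() {
                self.by_identity.remove(&w.identity());
            }
        }
    }
}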
19 changes: 16 additions & 3 deletions src/test_helpers.rs
@@ -367,7 +367,9 @@ pub fn local_xds_config(
Ok(b.into_inner().freeze())
}

pub async fn assert_eventually<F, T, Fut>(dur: Duration, f: F, expected: T)
/// check_eventually runs a function repeatedly until it returns the expected result.
/// If it doesn't within the given duration, the last result is returned.
pub async fn check_eventually<F, T, Fut>(dur: Duration, f: F, expected: T) -> Result<(), T>
where
F: Fn() -> Fut,
Fut: Future<Output = T>,
@@ -381,17 +383,28 @@ where
attempts += 1;
last = f().await;
if last == expected {
return;
return Ok(());
}
trace!("attempt {attempts} with delay {delay:?}");
if SystemTime::now().add(delay) > end {
panic!("assert_eventually failed after {attempts}: last response: {last:?}")
return Err(last);
}
tokio::time::sleep(delay).await;
delay *= 2;
}
}

pub async fn assert_eventually<F, T, Fut>(dur: Duration, f: F, expected: T)
where
F: Fn() -> Fut,
Fut: Future<Output = T>,
T: Eq + Debug,
{
if let Err(last) = check_eventually(dur, f, expected).await {
panic!("assert_eventually failed: last response: {last:?}")
}
}

pub fn new_proxy_state(
xds_workloads: &[XdsWorkload],
xds_services: &[XdsService],
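As a usage sketch (hypothetical test code, not part of this diff): unlike assert_eventually, check_eventually hands the last observed value back to the caller, which lets a test attach its own failure message, as the namespaced test further down does.

// Hypothetical: poll an atomic counter until it reaches 3, for up to two seconds.
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;

let counter = Arc::new(AtomicU64::new(0));
let c = counter.clone();
let res = check_eventually(
    Duration::from_secs(2),
    move || {
        let c = c.clone();
        async move { c.fetch_add(1, Ordering::SeqCst) + 1 }
    },
    3u64,
)
.await;
// Ok(()) once the expected value is observed; Err(last) carries the final value on timeout.
assert_eq!(res, Ok(()));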
6 changes: 6 additions & 0 deletions src/test_helpers/linux.rs
@@ -199,6 +199,12 @@ impl WorkloadManager {
Ok(())
}

pub async fn delete_workload(&mut self, name: &str) -> anyhow::Result<()> {
self.workloads.retain(|w| w.workload.name != name);
self.refresh_config().await?;
Ok(())
}

/// workload_builder allows creating a new workload. It will run in its own network namespace.
pub fn workload_builder(&mut self, name: &str, node: &str) -> TestWorkloadBuilder {
TestWorkloadBuilder::new(name, self)
17 changes: 14 additions & 3 deletions src/xds.rs
@@ -138,9 +138,12 @@ impl ProxyStateUpdateMutator {
self.cert_fetcher.prefetch_cert(&workload);

// Lock and update the stores.
let track = self
.cert_fetcher
.should_track_certificates_for_removal(&workload);
state.workloads.insert(workload.clone(), track);
// Unhealthy workloads are always inserted, as we may still receive traffic destined for them.
// But we shouldn't include them in the load balancing we do for Services.
state.workloads.insert(workload.clone());
if workload.status == HealthStatus::Healthy {
insert_service_endpoints(&workload, &services, &mut state.services)?;
}
@@ -174,7 +177,12 @@

// This is a real removal (not a removal before insertion), and nothing else references the cert
// Clear it out
if !for_insert && !state.workloads.has_identity(&prev.identity()) {
if !for_insert
&& self
.cert_fetcher
.should_track_certificates_for_removal(&prev)
&& !state.workloads.has_identity(&prev.identity())
{
self.cert_fetcher.clear_cert(&prev.identity());
}
// We removed a workload, no reason to attempt to remove a service with the same name
@@ -457,7 +465,10 @@ impl LocalClient {
trace!("inserting local workload {}", &wl.workload.uid);
self.cert_fetcher.prefetch_cert(&wl.workload);
let w = Arc::new(wl.workload);
state.workloads.insert(w.clone());
state.workloads.insert(
w.clone(),
self.cert_fetcher.should_track_certificates_for_removal(&w),
);

let services: HashMap<String, PortList> = wl
.services
73 changes: 73 additions & 0 deletions tests/namespaced.rs
@@ -20,6 +20,7 @@ mod namespaced {
use std::collections::HashMap;
use std::fs;
use std::fs::File;

use std::net::{IpAddr, SocketAddr};

use std::path::PathBuf;
@@ -713,6 +714,78 @@
Ok(())
}

#[tokio::test]
async fn test_prefetch_forget_certs() -> anyhow::Result<()> {
// TODO: this test doesn't really need namespacing, but the direct test doesn't allow dynamic config changes.
let mut manager = setup_netns_test!(InPod);
let id1 = identity::Identity::Spiffe {
trust_domain: "cluster.local".into(),
service_account: "sa1".into(),
namespace: "default".into(),
};
let id1s = id1.to_string();

let ta = manager.deploy_ztunnel(DEFAULT_NODE).await?;

let check = |want: Vec<String>, help: &str| {
let cm = ta.cert_manager.clone();
let help = help.to_string();
async move {
// Cert manager is async, so we need to wait
let res = check_eventually(
Duration::from_secs(2),
|| cm.collect_certs(|a, _b| a.to_string()),
want,
)
.await;
assert!(res.is_ok(), "{}: got {:?}", help, res.err().unwrap());
}
};
check(vec![], "initially empty").await;
manager
.workload_builder("id1-a-remote-node", REMOTE_NODE)
.identity(id1.clone())
.register()
.await?;
check(vec![], "we should not prefetch remote nodes").await;
manager
.workload_builder("id1-a-same-node", DEFAULT_NODE)
.identity(id1.clone())
.register()
.await?;
check(vec![id1s.clone()], "we should prefetch our nodes").await;
manager
.workload_builder("id1-b-same-node", DEFAULT_NODE)
.identity(id1.clone())
.register()
.await?;
check(
vec![id1s.clone()],
"multiple of same identity shouldn't do anything",
)
.await;
manager.delete_workload("id1-a-remote-node").await?;
check(
vec![id1s.clone()],
"removing remote node shouldn't impact anything",
)
.await;
manager.delete_workload("id1-b-same-node").await?;
check(
vec![id1s.clone()],
"removing local node shouldn't impact anything if I still have some running",
)
.await;
manager.delete_workload("id1-a-same-node").await?;
// TODO: this should be vec![], but our testing setup doesn't exercise the real codepath
check(
vec![id1s.clone()],
"removing final workload should clear things out",
)
.await;
Comment on lines +780 to +785

Contributor: Is this something we're looking to address before merging? Is it covered elsewhere, so we're not that worried? It's kind of odd to assert a final state that isn't actually the state it should be.

Member (author): I would love to fix it before merging, though it will take a lot of work that I may not have time for within the timeline this would be good to merge in (1.22.0). I think asserting the final state in the meantime is better than no assertion, so if/when we fix it, it is remembered.

Contributor: Yeah, odd, but probably the least bad option, I suppose.

Ok(())
}

/// initialize_namespace_tests sets up the namespace tests.
/// These utilize the `unshare` syscall to setup an environment where we:
/// * Are "root"