Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(policy): run status reconcilation at fixed interval #13384

Merged
merged 1 commit into from
Nov 25, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 38 additions & 20 deletions policy-controller/k8s/status/src/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -371,30 +371,48 @@ impl Index {
/// Controller applies patches or the write leaseholder changes, all
/// routes have an up-to-date status.
pub async fn run(index: Arc<RwLock<Self>>, reconciliation_period: Duration) {
// Clone the claims watch out of the index. This will immediately
// drop the read lock on the index so that it is not held for the
// lifetime of this function.
let mut claims = index.read().claims.clone();
// Extract what we need from the index so we don't need to lock it for
// housekeeping.
let (instance, mut claims) = {
let idx = index.read();
(idx.name.clone(), idx.claims.clone())
};

// The timer is reset when this instance becomes the leader and it is
// polled as long as it is the leader. The timer ensures that
// reconciliation happens at consistent intervals after leadership is
// acquired.
let mut timer = time::interval(reconciliation_period);
timer.set_missed_tick_behavior(time::MissedTickBehavior::Delay);

let mut was_leader = false;
loop {
// Refresh the state of the lease on each iteration to ensure we're
// checking expiration.
let is_leader = claims.borrow_and_update().is_current_for(&instance);
if is_leader && !was_leader {
tracing::debug!("Became leader; resetting timer");
timer.reset_immediately();
}
was_leader = is_leader;

tokio::select! {
// Eagerly process claim updates to track leadership changes. If
// the claim changes, refesh the leadership status.
biased;
res = claims.changed() => {
res.expect("Claims watch must not be dropped");
tracing::trace!("Lease updated");
if tracing::enabled!(tracing::Level::TRACE) {
let c = claims.borrow();
tracing::trace!(claim=?*c, "Changed");
}
}
_ = time::sleep(reconciliation_period) => {}
}

// The claimant has changed, or we should attempt to reconcile all
//routes to account for any errors. In either case, we should
// only proceed if we are the current leader.
let claims = claims.borrow_and_update();
let index = index.read();

if !claims.is_current_for(&index.name) {
continue;
// Only wait for the timer if this instance is the leader.
_ = timer.tick(), if is_leader => {
index.read().reconcile_if_leader();
}
}
index.reconcile();
}
}

Expand Down Expand Up @@ -784,16 +802,16 @@ impl Index {
make_patch(id, status)
}

/// If this instance is the leader, reconcile the statuses for all resources
/// for which we control the status.
fn reconcile_if_leader(&self) {
let lease = self.claims.borrow();
if !lease.is_current_for(&self.name) {
tracing::trace!(%lease.holder, "Reconcilation skipped");
tracing::trace!(%lease.holder, ?lease.expiry, "Reconcilation skipped");
return;
}
self.reconcile();
}
drop(lease);

fn reconcile(&self) {
tracing::trace!(
egressnetworks = self.egress_networks.len(),
routes = self.route_refs.len(),
Expand Down
Loading