Harden scheduler to converge on common SchedulingPlan
This commit adds a periodic SchedulingPlan check which allows multiple
Schedulers to eventually agree on the same SchedulingPlan. Previously, two
Schedulers could hold different SchedulingPlans (e.g. one still on an older
version). If those plans contradict each other, the Schedulers instruct
nodes differently. The periodic update helps prevent this situation.

Note: This logic can be removed once we allow the scheduler to accept
equivalent partition processor placements. See #2242.
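
The mechanism is a simple time-based refresh: if the plan has not been
reloaded from the metadata store within the last 10 seconds, the Scheduler
re-reads it and adopts whatever is stored. The following is a minimal,
self-contained sketch of that idea, not the actual implementation:
`SchedulingPlan` and `MetadataStore` here are hypothetical, pared-down
stand-ins for the real types, and the real fetch is an async read of
SCHEDULING_PLAN_KEY via the metadata store client.

```rust
use std::time::{Duration, Instant};

// Hypothetical, pared-down stand-ins for the real types: the actual
// SchedulingPlan carries partition processor placements, and the real
// metadata store client is asynchronous and remote.
#[derive(Clone, Debug)]
struct SchedulingPlan {
    version: u64,
}

struct MetadataStore {
    plan: SchedulingPlan,
}

impl MetadataStore {
    // In the real code this is an async read of SCHEDULING_PLAN_KEY.
    fn fetch_plan(&self) -> SchedulingPlan {
        self.plan.clone()
    }
}

struct Scheduler {
    scheduling_plan: SchedulingPlan,
    last_refresh: Instant,
    refresh_interval: Duration,
}

impl Scheduler {
    // Periodically re-read the authoritative plan so that a Scheduler stuck
    // on an older version eventually adopts the newest one. Versions only
    // move forward, so all Schedulers converge on the same plan.
    fn maybe_refresh(&mut self, store: &MetadataStore) {
        if self.last_refresh.elapsed() >= self.refresh_interval {
            let newest = store.fetch_plan();
            if newest.version > self.scheduling_plan.version {
                println!("updating to plan version {}", newest.version);
            }
            self.scheduling_plan = newest;
            self.last_refresh = Instant::now();
        }
    }
}

fn main() {
    let store = MetadataStore {
        plan: SchedulingPlan { version: 7 },
    };
    let mut scheduler = Scheduler {
        scheduling_plan: SchedulingPlan { version: 3 },
        last_refresh: Instant::now(),
        // Zero interval so the check fires immediately in this demo;
        // the commit uses a 10-second interval.
        refresh_interval: Duration::ZERO,
    };
    scheduler.maybe_refresh(&store);
    assert_eq!(scheduler.scheduling_plan.version, 7);
}
```

Since plan versions in the store only move forward, every Scheduler running
this check converges on the newest plan within one refresh interval.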
tillrohrmann committed Nov 22, 2024
1 parent ecc429b commit d169372
Showing 1 changed file with 31 additions and 11 deletions.
crates/admin/src/cluster_controller/scheduler.rs (+31 −11)
@@ -8,9 +8,9 @@
 // the Business Source License, use of this software will be governed
 // by the Apache License, Version 2.0.
 
-use std::collections::{BTreeMap, BTreeSet};
-
 use rand::seq::IteratorRandom;
+use std::collections::{BTreeMap, BTreeSet};
+use std::time::{Duration, Instant};
 use tracing::debug;
 use xxhash_rust::xxh3::Xxh3Builder;
 
@@ -90,6 +90,7 @@ impl<T: PartitionProcessorPlacementHints> PartitionProcessorPlacementHints for &
 
 pub struct Scheduler<T> {
     scheduling_plan: SchedulingPlan,
+    last_updated_scheduling_plan: Instant,
 
     task_center: TaskCenter,
     metadata_store_client: MetadataStoreClient,
@@ -118,6 +119,7 @@ impl<T: TransportConnect> Scheduler<T> {
 
         Ok(Self {
             scheduling_plan,
+            last_updated_scheduling_plan: Instant::now(),
             task_center,
             metadata_store_client,
             networking,
@@ -180,12 +182,12 @@
             let scheduling_plan = self.try_update_scheduling_plan(scheduling_plan).await?;
             match scheduling_plan {
                 UpdateOutcome::Written(scheduling_plan) => {
-                    self.scheduling_plan = scheduling_plan;
+                    self.assign_scheduling_plan(scheduling_plan);
                     break;
                 }
                 UpdateOutcome::NewerVersionFound(scheduling_plan) => {
-                    self.scheduling_plan = scheduling_plan.clone();
-                    builder = scheduling_plan.into_builder();
+                    self.assign_scheduling_plan(scheduling_plan);
+                    builder = self.scheduling_plan.clone().into_builder();
                 }
             }
         } else {
@@ -202,6 +204,21 @@
         nodes_config: &NodesConfiguration,
         placement_hints: impl PartitionProcessorPlacementHints,
     ) -> Result<(), Error> {
+        // todo temporary band-aid to ensure convergence of multiple schedulers. Remove once we
+        // accept equivalent configurations and remove persisting of the SchedulingPlan
+        if self.last_updated_scheduling_plan.elapsed() > Duration::from_secs(10) {
+            let new_scheduling_plan = self.fetch_scheduling_plan().await?;
+
+            if new_scheduling_plan.version() > self.scheduling_plan.version() {
+                debug!(
+                    "Found a newer scheduling plan in the metadata store. Updating to version {}.",
+                    new_scheduling_plan.version()
+                );
+            }
+
+            self.assign_scheduling_plan(new_scheduling_plan);
+        }
+
         let mut builder = self.scheduling_plan.clone().into_builder();
 
         self.ensure_replication(&mut builder, alive_workers, nodes_config, &placement_hints);
@@ -214,12 +231,17 @@
                 .into_inner();
 
             debug!("Updated scheduling plan: {scheduling_plan:?}");
-            self.scheduling_plan = scheduling_plan;
+            self.assign_scheduling_plan(scheduling_plan);
         }
 
         Ok(())
     }
 
+    fn assign_scheduling_plan(&mut self, scheduling_plan: SchedulingPlan) {
+        self.scheduling_plan = scheduling_plan;
+        self.last_updated_scheduling_plan = Instant::now();
+    }
+
     async fn try_update_scheduling_plan(
         &self,
         scheduling_plan: SchedulingPlan,
@@ -237,21 +259,19 @@
             Err(err) => match err {
                 WriteError::FailedPrecondition(_) => {
                     // There was a concurrent modification of the scheduling plan. Fetch the latest version.
-                    let scheduling_plan = self
-                        .fetch_scheduling_plan()
-                        .await?
-                        .expect("must be present");
+                    let scheduling_plan = self.fetch_scheduling_plan().await?;
                     Ok(UpdateOutcome::NewerVersionFound(scheduling_plan))
                 }
                 err => Err(err.into()),
             },
         }
     }
 
-    async fn fetch_scheduling_plan(&self) -> Result<Option<SchedulingPlan>, ReadError> {
+    async fn fetch_scheduling_plan(&self) -> Result<SchedulingPlan, ReadError> {
         self.metadata_store_client
             .get(SCHEDULING_PLAN_KEY.clone())
             .await
+            .map(|scheduling_plan| scheduling_plan.expect("must be present"))
     }
 
     fn ensure_replication(
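
For context, the retry path in `try_update_scheduling_plan` is a versioned
compare-and-swap: a write succeeds only if the stored plan still has the
version the Scheduler based its changes on, and a `FailedPrecondition` means
another Scheduler wrote first, so the latest plan is fetched and the update
is rebuilt on top of it. Below is a sketch of that pattern under stated
assumptions; the in-memory `Store` and its `put_if_version` method are
hypothetical stand-ins for the real async `MetadataStoreClient` API.

```rust
// Hypothetical, in-memory reduction of the versioned compare-and-swap that
// try_update_scheduling_plan performs; the real store is remote and async.
#[derive(Clone, Debug)]
struct SchedulingPlan {
    version: u64,
}

#[derive(Debug)]
enum UpdateOutcome {
    // Our plan was written; it is now the authoritative one.
    Written(SchedulingPlan),
    // Someone else wrote first; here is the plan they stored.
    NewerVersionFound(SchedulingPlan),
}

struct Store {
    plan: SchedulingPlan,
}

impl Store {
    // Succeeds only if the caller's expected version matches the stored one.
    // The real client surfaces a mismatch as WriteError::FailedPrecondition.
    fn put_if_version(&mut self, expected: u64, new: SchedulingPlan) -> Result<(), ()> {
        if self.plan.version == expected {
            self.plan = new;
            Ok(())
        } else {
            Err(())
        }
    }
}

fn try_update(store: &mut Store, based_on_version: u64, new: SchedulingPlan) -> UpdateOutcome {
    match store.put_if_version(based_on_version, new.clone()) {
        Ok(()) => UpdateOutcome::Written(new),
        // Lost the race: return the latest plan so the caller can rebuild
        // its changes on top of it and retry.
        Err(()) => UpdateOutcome::NewerVersionFound(store.plan.clone()),
    }
}

fn main() {
    let mut store = Store {
        plan: SchedulingPlan { version: 5 },
    };
    // This Scheduler based its update on the stale version 4, so the write
    // is rejected and it learns about the version-5 plan instead.
    let outcome = try_update(&mut store, 4, SchedulingPlan { version: 5 });
    println!("{outcome:?}");
}
```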
