fix(clusters): fix dc scale job downscale logic, prebake disk waiting #1078

Merged
108 changes: 65 additions & 43 deletions svc/pkg/cluster/src/workflows/datacenter/scale.rs
@@ -9,7 +9,6 @@
// tainted server: a tainted server

use std::{
cmp::Ordering,
collections::HashMap,
convert::{TryFrom, TryInto},
iter::{DoubleEndedIterator, Iterator},
@@ -278,13 +277,6 @@ async fn inner(
};

scale_servers(&ctx, tx, &mut actions, &servers, &pool_ctx).await?;

match pool_ctx.pool_type {
PoolType::Job => {
cleanup_tainted_job_servers(&ctx, tx, &mut actions, &servers, &pool_ctx).await?
}
_ => cleanup_tainted_servers(&ctx, tx, &mut actions, &servers, &pool_ctx).await?,
}
}

destroy_drained_servers(&ctx, tx, &mut actions, &servers).await?;
@@ -304,62 +296,92 @@ async fn scale_servers(
.filter(|server| server.pool_type == pctx.pool_type)
.filter(|server| !server.is_tainted);

let active_servers_in_pool = servers_in_pool
// Active servers may not be entirely installed. This is important as we cannot filter out servers that
// aren't installed or provisioned yet here.
let active_servers = servers_in_pool
.clone()
.filter(|server| matches!(server.drain_state, DrainState::None));
let active_count = active_servers_in_pool.clone().count();
let active_count = active_servers.clone().count();

tracing::info!(desired=%pctx.desired_count, active=%active_count, "comparing {:?}", pctx.pool_type);

match pctx.desired_count.cmp(&active_count) {
Ordering::Less => match pctx.pool_type {
PoolType::Job => {
scale_down_job_servers(ctx, tx, actions, pctx, active_servers_in_pool, active_count)
.await?
// Scale up
if pctx.desired_count > active_count {
scale_up_servers(ctx, tx, actions, pctx, servers_in_pool, active_count).await?;
}

// Scale down
match pctx.pool_type {
PoolType::Job => {
let (nomad_servers, without_nomad_servers) = active_servers
.clone()
.partition::<Vec<_>, _>(|server| server.has_nomad_node);

if pctx.desired_count < nomad_servers.len() {
scale_down_job_servers(
ctx,
tx,
actions,
pctx,
nomad_servers,
without_nomad_servers,
)
.await?;
}
PoolType::Gg => {
scale_down_gg_servers(ctx, tx, actions, pctx, active_servers_in_pool, active_count)
.await?
}
PoolType::Gg => {
let installed_servers = active_servers.filter(|server| server.is_installed);
let installed_count = installed_servers.clone().count();

if pctx.desired_count < installed_count {
scale_down_gg_servers(ctx, tx, actions, pctx, installed_servers, installed_count)
.await?;
}
PoolType::Ats => {
scale_down_ats_servers(ctx, tx, actions, pctx, active_servers_in_pool, active_count)
.await?
}
PoolType::Ats => {
let installed_servers = active_servers.filter(|server| server.is_installed);
let installed_count = installed_servers.clone().count();

if pctx.desired_count < installed_count {
scale_down_ats_servers(ctx, tx, actions, pctx, installed_servers, installed_count)
.await?;
}
},
Ordering::Greater => {
scale_up_servers(ctx, tx, actions, pctx, servers_in_pool, active_count).await?;
}
Ordering::Equal => {}
}

// Cleanup
match pctx.pool_type {
PoolType::Job => cleanup_tainted_job_servers(ctx, tx, actions, servers, pctx).await?,
_ => cleanup_tainted_servers(ctx, tx, actions, servers, pctx).await?,
}

Ok(())
}

async fn scale_down_job_servers<'a, I: Iterator<Item = &'a Server>>(
async fn scale_down_job_servers(
ctx: &ActivityCtx,
tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
actions: &mut Vec<Action>,
pctx: &PoolCtx,
active_servers: I,
active_count: usize,
nomad_servers: Vec<&Server>,
without_nomad_servers: Vec<&Server>,
) -> GlobalResult<()> {
tracing::info!(
datacenter_id=?pctx.datacenter_id,
desired=%pctx.desired_count,
active=%active_count,
nomad_servers=%nomad_servers.len(),
"scaling down job"
);

let (nomad_servers, without_nomad_servers) =
active_servers.partition::<Vec<_>, _>(|server| server.has_nomad_node);
let diff = nomad_servers.len().saturating_sub(pctx.desired_count);

let destroy_count = match pctx.provider {
// Never destroy servers when scaling down with Linode, always drain
Provider::Linode => 0,
#[allow(unreachable_patterns)]
_ => (active_count - pctx.desired_count).min(without_nomad_servers.len()),
_ => diff.min(without_nomad_servers.len()),
};
let drain_count = active_count - pctx.desired_count - destroy_count;
let drain_count = diff - destroy_count;

// Destroy servers
if destroy_count != 0 {
@@ -394,23 +416,23 @@ async fn scale_down_gg_servers<'a, I: Iterator<Item = &'a Server> + DoubleEndedI
tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
actions: &mut Vec<Action>,
pctx: &PoolCtx,
active_servers: I,
active_count: usize,
installed_servers: I,
installed_count: usize,
) -> GlobalResult<()> {
tracing::info!(
datacenter_id=?pctx.datacenter_id,
desired=%pctx.desired_count,
active=%active_count,
installed=%installed_count,
"scaling down gg"
);

let drain_count = active_count - pctx.desired_count;
let drain_count = installed_count.saturating_sub(pctx.desired_count);

// Drain servers
if drain_count != 0 {
tracing::info!(count=%drain_count, "draining gg servers");

let drain_candidates = active_servers
let drain_candidates = installed_servers
.rev()
.take(drain_count)
.map(|server| server.server_id);
@@ -429,23 +451,23 @@ async fn scale_down_ats_servers<
tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
actions: &mut Vec<Action>,
pctx: &PoolCtx,
active_servers: I,
active_count: usize,
installed_servers: I,
installed_count: usize,
) -> GlobalResult<()> {
tracing::info!(
datacenter_id=?pctx.datacenter_id,
desired=%pctx.desired_count,
active=%active_count,
installed=%installed_count,
"scaling down ats"
);

let drain_count = active_count - pctx.desired_count;
let drain_count = installed_count.saturating_sub(pctx.desired_count);

// Drain servers
if drain_count != 0 {
tracing::info!(count=%drain_count, "draining ats servers");

let drain_candidates = active_servers
let drain_candidates = installed_servers
.rev()
.take(drain_count)
.map(|server| server.server_id);
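
For orientation, the core of the downscale fix is the arithmetic above: the job pool is now compared against servers that have actually registered a Nomad node, and saturating_sub keeps the math from underflowing when the desired count exceeds that number. Below is a minimal standalone sketch of that accounting; the Provider enum and the counts are simplified stand-ins for the crate's real types.

// Standalone sketch of the job-pool downscale accounting; `Provider` is a
// simplified stand-in for the crate's real provider type.
enum Provider {
    Linode,
    Other,
}

/// Returns (destroy_count, drain_count) for a job pool.
fn job_downscale_counts(
    provider: &Provider,
    desired_count: usize,
    nomad_server_count: usize,
    without_nomad_count: usize,
) -> (usize, usize) {
    // Only servers with a Nomad node count toward the pool size; saturating_sub
    // keeps the diff at zero when desired >= registered instead of underflowing.
    let diff = nomad_server_count.saturating_sub(desired_count);

    let destroy_count = match provider {
        // Never destroy servers when scaling down with Linode, always drain
        Provider::Linode => 0,
        Provider::Other => diff.min(without_nomad_count),
    };
    let drain_count = diff - destroy_count;

    (destroy_count, drain_count)
}

fn main() {
    // 5 servers have Nomad nodes, 1 does not, 3 desired: destroy 1, drain 1.
    assert_eq!(job_downscale_counts(&Provider::Other, 3, 5, 1), (1, 1));
    // Linode never destroys on downscale; the whole difference is drained.
    assert_eq!(job_downscale_counts(&Provider::Linode, 3, 5, 1), (0, 2));
    // Desired exceeds registered servers: nothing to destroy or drain, no underflow.
    assert_eq!(job_downscale_counts(&Provider::Other, 9, 5, 1), (0, 0));
}
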
9 changes: 9 additions & 0 deletions svc/pkg/cluster/src/workflows/server/mod.rs
@@ -213,6 +213,15 @@ pub(crate) async fn cluster_server(ctx: &mut WorkflowCtx, input: &Input) -> Glob
})
.await?;

// Scale to bring up a new server to take this server's place
ctx.tagged_signal(
&json!({
"datacenter_id": input.datacenter_id,
}),
crate::workflows::datacenter::Scale {},
)
.await?;

bail!("failed all attempts to provision server");
};

63 changes: 43 additions & 20 deletions svc/pkg/cluster/standalone/workflow-backfill/src/lib.rs
@@ -557,23 +557,17 @@ pub async fn run_from_env() -> GlobalResult<()> {
)?;

#[derive(Serialize, Hash)]
struct CreateDisksInput {
struct CreateBootDiskInput {
api_token: Option<String>,
image: String,
ssh_public_key: String,
linode_id: u64,
disk_size: u64,
}

#[derive(Serialize, Hash)]
struct CreateDisksOutput {
boot_id: u64,
swap_id: u64,
}

wf.activity(
"create_disks",
CreateDisksInput {
"create_boot_disk",
CreateBootDiskInput {
api_token: dc.provider_api_token.clone(),
image: "linode/debian11".to_string(),
// Not the actual public key, but not required
@@ -582,20 +576,51 @@ pub async fn run_from_env() -> GlobalResult<()> {
// Not the actual server disk size, but not required
disk_size: 0,
},
CreateDisksOutput {
// Not the actual boot id, but not required
0,
)?;

#[derive(Serialize, Hash)]
struct WaitDiskReadyInput {
api_token: Option<String>,
linode_id: u64,
disk_id: u64,
}

wf.activity(
"wait_disk_ready",
WaitDiskReadyInput {
api_token: dc.provider_api_token.clone(),
linode_id,
// Not the actual boot id, but not required
boot_id: 0,
// Not the actual swap id, but not required
swap_id: 0,
disk_id: 0,
},
serde_json::Value::Null,
)?;

#[derive(Serialize, Hash)]
struct CreateSwapDiskInput {
api_token: Option<String>,
linode_id: u64,
}

wf.activity(
"create_swap_disk",
CreateSwapDiskInput {
api_token: dc.provider_api_token.clone(),
linode_id,
},
// Not the actual boot id, but not required
0,
)?;

#[derive(Serialize, Hash)]
struct CreateInstanceConfigInput {
api_token: Option<String>,
vlan_ip: Option<Ipv4Addr>,
linode_id: u64,
disks: CreateDisksOutput,
boot_disk_id: u64,
swap_disk_id: u64,
}

wf.activity(
@@ -613,12 +638,10 @@ pub async fn run_from_env() -> GlobalResult<()> {
})
.transpose()?,
linode_id,
disks: CreateDisksOutput {
// Not the actual boot id, but not required
boot_id: 0,
// Not the actual swap id, but not required
swap_id: 0,
},
// Not the actual boot id, but not required
boot_disk_id: 0,
// Not the actual swap id, but not required
swap_disk_id: 0,
},
serde_json::Value::Null,
)?;
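
The wait_disk_ready activity backfilled above (and split out of create_disks in the API helpers below) is not itself shown in this diff. As a rough orientation only, a disk-readiness poller against Linode's API might look like the sketch below; the endpoint path, the "ready" status string, the poll interval, and the direct use of reqwest are assumptions here, not the crate's actual implementation, which goes through its own Client wrapper.

use std::time::Duration;

// Hypothetical poller, for illustration only: waits until a Linode disk
// reports a "ready" status before the workflow proceeds to configure the
// instance (relevant for prebake images, where copying the image takes time).
async fn wait_disk_ready_sketch(
    api_token: &str,
    linode_id: u64,
    disk_id: u64,
) -> Result<(), Box<dyn std::error::Error>> {
    let http = reqwest::Client::new();
    let url =
        format!("https://api.linode.com/v4/linode/instances/{linode_id}/disks/{disk_id}");

    loop {
        let disk: serde_json::Value = http
            .get(&url)
            .bearer_auth(api_token)
            .send()
            .await?
            .error_for_status()?
            .json()
            .await?;

        // Treat "ready" as the terminal success state; keep polling otherwise.
        if disk.get("status").and_then(|s| s.as_str()) == Some("ready") {
            return Ok(());
        }

        tokio::time::sleep(Duration::from_secs(2)).await;
    }
}

Keeping the wait as its own activity lets the workflow engine retry or resume a long disk copy without re-creating the disk itself.
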
18 changes: 6 additions & 12 deletions svc/pkg/linode/src/util/api.rs
@@ -107,18 +107,13 @@ pub struct CreateDiskResponse {
pub id: u64,
}

pub struct CreateDisksResponse {
pub boot_id: u64,
pub swap_id: u64,
}

pub async fn create_disks(
pub async fn create_boot_disk(
client: &Client,
ssh_key: &str,
linode_id: u64,
image: &str,
server_disk_size: u64,
) -> GlobalResult<CreateDisksResponse> {
) -> GlobalResult<u64> {
tracing::info!("creating boot disk");

let boot_disk_res = client
@@ -134,8 +129,10 @@ pub async fn create_disks(
)
.await?;

wait_disk_ready(client, linode_id, boot_disk_res.id).await?;
Ok(boot_disk_res.id)
}

pub async fn create_swap_disk(client: &Client, linode_id: u64) -> GlobalResult<u64> {
tracing::info!("creating swap disk");

let swap_disk_res = client
@@ -149,10 +146,7 @@ pub async fn create_disks(
)
.await?;

Ok(CreateDisksResponse {
boot_id: boot_disk_res.id,
swap_id: swap_disk_res.id,
})
Ok(swap_disk_res.id)
}

pub async fn create_instance_config(
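
With create_disks split apart, a caller now sequences the helpers itself. The sketch below shows that sequencing; it assumes the Client and GlobalResult types from this crate, the wait_disk_ready signature seen in the removed code, and that these items are exported from the paths used in the imports (both paths are assumptions).

use crate::util::api::{create_boot_disk, create_swap_disk, wait_disk_ready, Client};
use global_error::prelude::*; // GlobalResult; import path assumed

// Sketch only: in the real provision workflow each of these calls is its own
// activity so it can be retried and replayed independently.
async fn provision_disks(
    client: &Client,
    ssh_public_key: &str,
    linode_id: u64,
    image: &str,
    server_disk_size: u64,
) -> GlobalResult<(u64, u64)> {
    // Boot disk creation no longer waits for the disk internally...
    let boot_disk_id =
        create_boot_disk(client, ssh_public_key, linode_id, image, server_disk_size).await?;

    // ...readiness is awaited as an explicit, separate step.
    wait_disk_ready(client, linode_id, boot_disk_id).await?;

    let swap_disk_id = create_swap_disk(client, linode_id).await?;

    // Both ids are then handed to create_instance_config as boot_disk_id / swap_disk_id.
    Ok((boot_disk_id, swap_disk_id))
}
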