Skip to content

Commit

Permalink
fix(job-run): delete second allocation immediately
Browse files Browse the repository at this point in the history
  • Loading branch information
MasterPtato committed Aug 29, 2024
1 parent f526f3a commit 3c4cee7
Show file tree
Hide file tree
Showing 5 changed files with 223 additions and 157 deletions.
65 changes: 33 additions & 32 deletions svc/pkg/ds/src/workers/nomad_monitor_eval_update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,41 +63,42 @@ async fn worker(
}
};

// Fetch and update the run
let server_row = sql_fetch_optional!(
[ctx, ServerRow]
"
SELECT server_id, datacenter_id, nomad_eval_plan_ts
UPDATE db_ds.server_nomad
SET nomad_eval_plan_ts = $2
WHERE
nomad_dispatched_job_id = $1 AND
run_meta_nomad.nomad_eval_plan_ts IS NULL
",
job_id,
ctx.ts(),
)
.await?;

// Check if server found
let Some(server_row) = server_row else {
if ctx.req_dt() > util::duration::minutes(5) {
tracing::error!("discarding stale message");
return Ok(());
} else {
retry_bail!("server not found, may be race condition with insertion");
}
};
let server_id = server_row.server_id;
// TODO: Rewrite on workflows

if let Some(eval_plan_ts) = server_row.nomad_eval_plan_ts {
tracing::info!(?eval_plan_ts, "eval already planned");
return Ok(());
}
// // Fetch and update the run
// let server_row = sql_fetch_optional!(
// [ctx, ServerRow]
// "
// UPDATE db_ds.server_nomad
// SET nomad_eval_plan_ts = $2
// WHERE
// nomad_dispatched_job_id = $1 AND
// nomad_eval_plan_ts IS NULL
// RETURNING server_id, datacenter_id, nomad_eval_plan_ts
// ",
// job_id,
// ctx.ts(),
// )
// .await?;

// // Check if server found
// let Some(server_row) = server_row else {
// if ctx.req_dt() > util::duration::minutes(5) {
// tracing::error!("discarding stale message");
// return Ok(());
// } else {
// retry_bail!("server not found, may be race condition with insertion");
// }
// };
// let server_id = server_row.server_id;

tracing::info!(%job_id, %server_id, ?eval_status, "updated server");
// if let Some(eval_plan_ts) = server_row.nomad_eval_plan_ts {
// tracing::info!(?eval_plan_ts, "eval already planned");
// return Ok(());
// }

// tracing::info!(%job_id, %server_id, ?eval_status, "updated server");

// TODO: Rewrite on workflows
// match eval_status {
// EvalStatus::Failed => {
// tracing::info!(%server_id, "eval failed");
Expand Down
2 changes: 2 additions & 0 deletions svc/pkg/job-run/src/util.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
pub const NOMAD_REGION: &str = "global";

// Have to patch `nomad_client::apis::allocations_api::signal_allocation` because it uses `/allocation`
// instead of `/client/allocation`
pub async fn signal_allocation(
Expand Down
43 changes: 38 additions & 5 deletions svc/pkg/job-run/src/workers/nomad_monitor_alloc_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@ use proto::backend::{self, pkg::*};
use redis::AsyncCommands;
use serde::Deserialize;

use crate::workers::NEW_NOMAD_CONFIG;
use crate::{
util::{signal_allocation, NOMAD_REGION},
workers::{NEW_NOMAD_CONFIG, NOMAD_CONFIG},
};

#[derive(Debug, Deserialize)]
#[serde(rename_all = "PascalCase")]
Expand Down Expand Up @@ -229,9 +232,10 @@ async fn update_db(
let run_row = sql_fetch_optional!(
[ctx, RunRow, @tx tx]
"
SELECT runs.run_id, runs.region_id, runs.stop_ts, run_meta_nomad.alloc_plan_ts
FROM db_job_state.run_meta_nomad
INNER JOIN db_job_state.runs ON runs.run_id = run_meta_nomad.run_id
SELECT r.run_id, r.region_id, r.stop_ts, rn.alloc_plan_ts
FROM db_job_state.run_meta_nomad AS rn
INNER JOIN db_job_state.runs AS r
ON r.run_id = run_meta_nomad.run_id
WHERE dispatched_job_id = $1
FOR UPDATE OF run_meta_nomad
",
Expand All @@ -255,7 +259,13 @@ async fn update_db(
[ctx, @tx tx]
"
UPDATE db_job_state.run_meta_nomad
SET alloc_id = $2, alloc_plan_ts = $3, node_id = $4, node_name = $5, node_public_ipv4 = $6, node_vlan_ipv4 = $7
SET
alloc_id = $2,
alloc_plan_ts = $3,
node_id = $4,
node_name = $5,
node_public_ipv4 = $6,
node_vlan_ipv4 = $7
WHERE run_id = $1
",
run_row.run_id,
Expand Down Expand Up @@ -301,6 +311,29 @@ async fn update_db(
)
.await?;
}
} else {
tracing::warn!(%run_id, %alloc_id, "run was already allocated before, killing new allocation");

if let Err(err) = signal_allocation(
&NOMAD_CONFIG,
&alloc_id,
None,
Some(NOMAD_REGION),
None,
None,
Some(nomad_client::models::AllocSignalRequest {
task: None,
signal: Some("SIGKILL".to_string()),
}),
)
.await
{
tracing::warn!(
?err,
?alloc_id,
"error while trying to manually kill allocation"
);
}
}

// Update the run ports
Expand Down
Loading

0 comments on commit 3c4cee7

Please sign in to comment.