diff --git a/svc/pkg/ds/src/workers/nomad_monitor_eval_update.rs b/svc/pkg/ds/src/workers/nomad_monitor_eval_update.rs
index 0184c2a8c2..f5dae0751a 100644
--- a/svc/pkg/ds/src/workers/nomad_monitor_eval_update.rs
+++ b/svc/pkg/ds/src/workers/nomad_monitor_eval_update.rs
@@ -63,41 +63,42 @@ async fn worker(
 		}
 	};
 
-	// Fetch and update the run
-	let server_row = sql_fetch_optional!(
-		[ctx, ServerRow]
-		"
-		SELECT server_id, datacenter_id, nomad_eval_plan_ts
-		UPDATE db_ds.server_nomad
-		SET nomad_eval_plan_ts = $2
-		WHERE
-			nomad_dispatched_job_id = $1 AND
-			run_meta_nomad.nomad_eval_plan_ts IS NULL
-		",
-		job_id,
-		ctx.ts(),
-	)
-	.await?;
-
-	// Check if server found
-	let Some(server_row) = server_row else {
-		if ctx.req_dt() > util::duration::minutes(5) {
-			tracing::error!("discarding stale message");
-			return Ok(());
-		} else {
-			retry_bail!("server not found, may be race condition with insertion");
-		}
-	};
-	let server_id = server_row.server_id;
+	// TODO: Rewrite on workflows
 
-	if let Some(eval_plan_ts) = server_row.nomad_eval_plan_ts {
-		tracing::info!(?eval_plan_ts, "eval already planned");
-		return Ok(());
-	}
+	// // Fetch and update the run
+	// let server_row = sql_fetch_optional!(
+	// 	[ctx, ServerRow]
+	// 	"
+	// 	UPDATE db_ds.server_nomad
+	// 	SET nomad_eval_plan_ts = $2
+	// 	WHERE
+	// 		nomad_dispatched_job_id = $1 AND
+	// 		nomad_eval_plan_ts IS NULL
+	// 	RETURNING server_id, datacenter_id, nomad_eval_plan_ts
+	// 	",
+	// 	job_id,
+	// 	ctx.ts(),
+	// )
+	// .await?;
+
+	// // Check if server found
+	// let Some(server_row) = server_row else {
+	// 	if ctx.req_dt() > util::duration::minutes(5) {
+	// 		tracing::error!("discarding stale message");
+	// 		return Ok(());
+	// 	} else {
+	// 		retry_bail!("server not found, may be race condition with insertion");
+	// 	}
+	// };
+	// let server_id = server_row.server_id;
 
-	tracing::info!(%job_id, %server_id, ?eval_status, "updated server");
+	// if let Some(eval_plan_ts) = server_row.nomad_eval_plan_ts {
+	// 	tracing::info!(?eval_plan_ts, "eval already planned");
+	// 	return Ok(());
+	// }
+
+	// tracing::info!(%job_id, %server_id, ?eval_status, "updated server");
 
-	// TODO: Rewrite on workflows
 	// match eval_status {
 	// 	EvalStatus::Failed => {
 	// 		tracing::info!(%server_id, "eval failed");
diff --git a/svc/pkg/job-run/src/util.rs b/svc/pkg/job-run/src/util.rs
index e453f38667..cd0a880762 100644
--- a/svc/pkg/job-run/src/util.rs
+++ b/svc/pkg/job-run/src/util.rs
@@ -1,3 +1,5 @@
+pub const NOMAD_REGION: &str = "global";
+
 // Have to patch `nomad_client::apis::allocations_api::signal_allocation` because it uses `/allocation`
 // instead of `/client/allocation`
 pub async fn signal_allocation(
diff --git a/svc/pkg/job-run/src/workers/nomad_monitor_alloc_plan.rs b/svc/pkg/job-run/src/workers/nomad_monitor_alloc_plan.rs
index ee03f32009..bafe9591a7 100644
--- a/svc/pkg/job-run/src/workers/nomad_monitor_alloc_plan.rs
+++ b/svc/pkg/job-run/src/workers/nomad_monitor_alloc_plan.rs
@@ -3,7 +3,10 @@ use proto::backend::{self, pkg::*};
 use redis::AsyncCommands;
 use serde::Deserialize;
 
-use crate::workers::NEW_NOMAD_CONFIG;
+use crate::{
+	util::{signal_allocation, NOMAD_REGION},
+	workers::{NEW_NOMAD_CONFIG, NOMAD_CONFIG},
+};
 
 #[derive(Debug, Deserialize)]
 #[serde(rename_all = "PascalCase")]
@@ -229,9 +232,10 @@ async fn update_db(
 	let run_row = sql_fetch_optional!(
 		[ctx, RunRow, @tx tx]
 		"
-		SELECT runs.run_id, runs.region_id, runs.stop_ts, run_meta_nomad.alloc_plan_ts
-		FROM db_job_state.run_meta_nomad
-		INNER JOIN db_job_state.runs ON runs.run_id = run_meta_nomad.run_id
+		SELECT r.run_id, r.region_id, r.stop_ts, rn.alloc_plan_ts
+		FROM db_job_state.run_meta_nomad AS rn
+		INNER JOIN db_job_state.runs AS r
+			ON r.run_id = rn.run_id
 		WHERE dispatched_job_id = $1
 		FOR UPDATE OF run_meta_nomad
 		",
@@ -255,7 +259,13 @@
 			[ctx, @tx tx]
 			"
 			UPDATE db_job_state.run_meta_nomad
-			SET alloc_id = $2, alloc_plan_ts = $3, node_id = $4, node_name = $5, node_public_ipv4 = $6, node_vlan_ipv4 = $7
+			SET
+				alloc_id = $2,
+				alloc_plan_ts = $3,
+				node_id = $4,
+				node_name = $5,
+				node_public_ipv4 = $6,
+				node_vlan_ipv4 = $7
 			WHERE run_id = $1
 			",
 			run_row.run_id,
@@ -301,6 +311,29 @@
 			)
 			.await?;
 		}
+	} else {
+		tracing::warn!(%run_id, %alloc_id, "run was already allocated before, killing new allocation");
+
+		if let Err(err) = signal_allocation(
+			&NOMAD_CONFIG,
+			&alloc_id,
+			None,
+			Some(NOMAD_REGION),
+			None,
+			None,
+			Some(nomad_client::models::AllocSignalRequest {
+				task: None,
+				signal: Some("SIGKILL".to_string()),
+			}),
+		)
+		.await
+		{
+			tracing::warn!(
+				?err,
+				?alloc_id,
+				"error while trying to manually kill allocation"
+			);
+		}
 	}
 
 	// Update the run ports
diff --git a/svc/pkg/job-run/src/workers/nomad_monitor_alloc_update.rs b/svc/pkg/job-run/src/workers/nomad_monitor_alloc_update.rs
index 2968aeee9a..6e4b412ba1 100644
--- a/svc/pkg/job-run/src/workers/nomad_monitor_alloc_update.rs
+++ b/svc/pkg/job-run/src/workers/nomad_monitor_alloc_update.rs
@@ -15,14 +15,28 @@ enum TaskState {
 	Dead,
 }
 
+#[derive(Clone)]
+struct RunData {
+	job_id: String,
+	alloc_id: String,
+	alloc_state_json: String,
+	main_task_state: TaskState,
+}
+
+#[derive(Debug, sqlx::FromRow)]
+struct RunRow {
+	run_id: Uuid,
+	alloc_id: Option<String>,
+	start_ts: Option<i64>,
+	finish_ts: Option<i64>,
+}
+
 #[worker(name = "job-run-nomad-monitor-alloc-update")]
 async fn worker(
 	ctx: &OperationContext,
 ) -> GlobalResult<()> {
-	let _crdb = ctx.crdb().await?;
-
 	let AllocationUpdated { allocation: alloc } = serde_json::from_str(&ctx.payload_json)?;
-	let alloc_state_json = serde_json::to_value(&alloc)?;
+	let alloc_state_json = serde_json::to_string(&alloc)?;
 
 	let alloc_id = unwrap_ref!(alloc.ID);
 	let eval_id = unwrap_ref!(alloc.eval_id, "alloc has no eval");
@@ -64,142 +78,157 @@
 		}
 	};
 
-	match main_task_state {
+	let run_data = RunData {
+		job_id: job_id.clone(),
+		alloc_id: alloc_id.clone(),
+		alloc_state_json,
+		main_task_state,
+	};
+
+	let run_found = rivet_pools::utils::crdb::tx(&ctx.crdb().await?, |tx| {
+		let ctx = ctx.clone();
+		let run_data = run_data.clone();
+		Box::pin(update_db(ctx, tx, run_data))
+	})
+	.await?;
+
+	// Check if run found
+	if !run_found {
+		if ctx.req_dt() > util::duration::minutes(5) {
+			tracing::error!("discarding stale message");
+			return Ok(());
+		} else {
+			retry_bail!("run not found, may be race condition with insertion");
+		}
+	};
+
+	Ok(())
+}
+
+/// Returns false if the run could not be found.
+#[tracing::instrument(skip_all)]
+async fn update_db(
+	ctx: OperationContext,
+	tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
+	run_data: RunData,
+) -> GlobalResult<bool> {
+	let run_row = sql_fetch_optional!(
+		[ctx, RunRow, @tx tx]
+		"
+		SELECT r.run_id, rn.alloc_id, r.start_ts, r.finish_ts
+		FROM db_job_state.run_meta_nomad AS rn
+		INNER JOIN db_job_state.runs AS r
+			ON r.run_id = rn.run_id
+		WHERE dispatched_job_id = $1
+		FOR UPDATE OF r, rn
+		",
+		&run_data.job_id,
+	)
+	.await?;
+
+	// Check if run found
+	let run_row = if let Some(run_row) = run_row {
+		run_row
+	} else {
+		tracing::info!("caught race condition");
+		return Ok(false);
+	};
+
+	if run_row
+		.alloc_id
+		.as_ref()
+		.map(|id| id != &run_data.alloc_id)
+		.unwrap_or_default()
+	{
+		tracing::warn!(existing_alloc_id=?run_row.alloc_id, new_alloc_id=%run_data.alloc_id, "alloc id does not match existing alloc id for job");
+
+		return Ok(true);
+	}
+
+	let run_id = run_row.run_id;
+
+	match run_data.main_task_state {
 		TaskState::Pending => {
 			tracing::info!("run pending");
 
-			let run_row = sql_fetch_optional!(
-				[ctx, (Uuid,)]
+			sql_execute!(
+				[ctx, @tx tx]
 				"
 				UPDATE db_job_state.run_meta_nomad
 				SET alloc_state = $2
-				WHERE dispatched_job_id = $1
-				RETURNING run_id
+				WHERE run_id = $1
 				",
-				job_id,
-				&alloc_state_json,
+				run_id,
+				&run_data.alloc_state_json,
 			)
 			.await?;
-
-			if run_row.is_none() {
-				if ctx.req_dt() > util::duration::minutes(5) {
-					tracing::error!("discarding stale message");
-					return Ok(());
-				} else {
-					retry_bail!("run not found, may be race condition with insertion");
-				}
-			};
-
-			Ok(())
 		}
 		TaskState::Running => {
-			let run_row = sql_fetch_optional!(
-				[ctx, (Uuid, Option<i64>)]
-				"
-				WITH
-					select_run AS (
-						SELECT runs.run_id, runs.start_ts
-						FROM db_job_state.run_meta_nomad
-						INNER JOIN db_job_state.runs ON runs.run_id = run_meta_nomad.run_id
-						WHERE dispatched_job_id = $1
-					),
-					_update_runs AS (
-						UPDATE db_job_state.runs
-						SET start_ts = $2
-						FROM select_run
-						WHERE
-							runs.run_id = select_run.run_id AND
-							runs.start_ts IS NULL
-						RETURNING 1
-					),
-					_update_run_meta_nomad AS (
-						UPDATE db_job_state.run_meta_nomad
-						SET alloc_state = $3
-						FROM select_run
-						WHERE run_meta_nomad.run_id = select_run.run_id
-						RETURNING 1
-					)
-				SELECT * FROM select_run
-				",
-				job_id,
-				ctx.ts(),
-				&alloc_state_json,
-			)
-			.await?;
-
-			let Some((run_id, start_ts)) = run_row else {
-				if ctx.req_dt() > util::duration::minutes(5) {
-					tracing::error!("discarding stale message");
-					return Ok(());
-				} else {
-					retry_bail!("run not found, may be race condition with insertion");
-				}
-			};
+			if run_row.start_ts.is_none() {
+				sql_execute!(
+					[ctx, @tx tx]
+					"
+					WITH
+						_update_runs AS (
+							UPDATE db_job_state.runs
+							SET start_ts = $2
+							WHERE run_id = $1
+							RETURNING 1
+						),
+						_update_run_meta_nomad AS (
+							UPDATE db_job_state.run_meta_nomad
+							SET alloc_state = $3
+							WHERE run_id = $1
+							RETURNING 1
+						)
+					SELECT 1
+					",
+					run_id,
+					ctx.ts(),
+					&run_data.alloc_state_json,
+				)
+				.await?;
 
-			if start_ts.is_none() {
 				tracing::info!("run started");
 
 				msg!([ctx] job_run::msg::started(run_id) {
 					run_id: Some(run_id.into()),
 				})
 				.await?;
-
-				Ok(())
 			} else {
 				tracing::info!("run already started");
-
-				Ok(())
 			}
 		}
 		TaskState::Dead => {
-			let run_row = sql_fetch_optional!(
-				[ctx, (Uuid, Option<i64>)]
-				r#"
-				WITH
-					select_run AS (
-						SELECT runs.run_id, runs.finish_ts
-						FROM db_job_state.run_meta_nomad
-						INNER JOIN db_job_state.runs ON runs.run_id = run_meta_nomad.run_id
-						WHERE dispatched_job_id = $1
-					),
-					_update_runs AS (
-						UPDATE db_job_state.runs
-						SET
-							-- If the job stops immediately, the task state will never be "running" so we need to
-							-- make sure start_ts is set here as well
-							start_ts = COALESCE(start_ts, $2),
-							finish_ts = $2
-						FROM select_run
-						WHERE
-							runs.run_id = select_run.run_id AND
-							runs.finish_ts IS NULL
-						RETURNING 1
-					),
-					_update_run_meta_nomad AS (
-						UPDATE db_job_state.run_meta_nomad
-						SET alloc_state = $3
-						FROM select_run
-						WHERE run_meta_nomad.run_id = select_run.run_id
-						RETURNING 1
-					)
-				SELECT * FROM select_run
-				"#,
-				job_id,
-				ctx.ts(),
-				&alloc_state_json,
-			)
-			.await?;
-
-			let Some((run_id, finish_ts)) = run_row else {
-				if ctx.req_dt() > util::duration::minutes(5) {
-					tracing::error!("discarding stale message");
-					return Ok(());
-				} else {
-					retry_bail!("run not found, may be race condition with insertion");
-				}
-			};
+			if run_row.finish_ts.is_none() {
+				sql_execute!(
+					[ctx, @tx tx]
+					r#"
+					WITH
+						update_runs AS (
+							UPDATE db_job_state.runs
+							SET
+								-- If the job stops immediately, the task state will never be "running" so we need to
+								-- make sure start_ts is set here as well
+								start_ts = COALESCE(start_ts, $2),
+								finish_ts = $2
+							WHERE run_id = $1
+							RETURNING 1
+						),
+						update_run_meta_nomad AS (
+							UPDATE db_job_state.run_meta_nomad
+							SET alloc_state = $3
+							WHERE run_id = $1
+							RETURNING 1
+						)
+					SELECT 1
+					"#,
+					run_id,
+					ctx.ts(),
+					&run_data.alloc_state_json,
+				)
+				.await?;
 
-			if finish_ts.is_none() {
 				tracing::info!("run finished");
 
 				// Publish message
@@ -216,12 +245,11 @@ async fn worker(
 					run_id: Some(run_id.into()),
 				})
 				.await?;
-
-				Ok(())
 			} else {
 				tracing::info!("run already finished");
-				Ok(())
 			}
 		}
 	}
+
+	Ok(true)
 }
diff --git a/svc/pkg/job-run/src/workflows/drain_all.rs b/svc/pkg/job-run/src/workflows/drain_all.rs
index a733cd1d38..a480af5631 100644
--- a/svc/pkg/job-run/src/workflows/drain_all.rs
+++ b/svc/pkg/job-run/src/workflows/drain_all.rs
@@ -2,12 +2,14 @@ use chirp_workflow::prelude::*;
 use futures_util::StreamExt;
 use rivet_operation::prelude::proto::backend::pkg::*;
 
-use crate::{util::signal_allocation, workers::NOMAD_CONFIG};
+use crate::{
+	util::{signal_allocation, NOMAD_REGION},
+	workers::NOMAD_CONFIG,
+};
 
 // In ms, a small amount of time to separate the completion of the drain to the deletion of the
 // cluster server. We want the drain to complete first.
 const DRAIN_PADDING: u64 = 10000;
-const NOMAD_REGION: &str = "global";
 
 #[derive(Debug, Serialize, Deserialize)]
 pub struct Input {