fix: clean up nomad jobs per test
MasterPtato committed Apr 18, 2024
1 parent 57a5f79 commit be3f2fc
Showing 28 changed files with 254 additions and 141 deletions.
25 changes: 7 additions & 18 deletions .github/actions/pre-init/action.yaml
@@ -12,6 +12,8 @@ inputs:
required: true
SCCACHE_AWS_ACCESS_KEY_ID:
required: true
OP_SERVICE_ACCOUNT_TOKEN:
required: true

runs:
using: composite
@@ -51,28 +53,15 @@ runs:
run: nix-shell --pure --run "echo 'Built shell'"

# MARK: Bolt
- name: Write Configs
- name: Pull Config
shell: bash -e {0}
env:
OP_SERVICE_ACCOUNT_TOKEN: ${{ inputs.OP_SERVICE_ACCOUNT_TOKEN }}
# Cannot use --pure `https://github.com/NixOS/nixpkgs/issues/66716`
run: |
cat << 'EOF' > Bolt.local.toml
namespace = "ci"
EOF
cat << 'EOF' > namespaces/ci.toml
[rivet.test]
# We set the public IP to localhost because we are running inside of a GitHub Action,
# so it is not reachable from the public internet.
[cluster.single_node]
public_ip = "127.0.0.1"
EOF
nix-shell --run "bolt config pull -y ci --op-namespace-path 'op://Engineering/ice6g6zdnag4lxl5d3dcs7jj64/config' --op-secrets-path 'op://Engineering/ice6g6zdnag4lxl5d3dcs7jj64/secrets'"
# Delete old Bolt binaries in order to fall back to the Nix-built binary
- name: Remove Old Bolt Builds
shell: bash -e {0}
run: rm -f target/debug/bolt target/release/bolt

# Run `bolt config generate` so that the `Check` job can start working sooner
- name: Generate Bolt Config
shell: bash -e {0}
run: nix-shell --pure --run "bolt config generate ci"
7 changes: 2 additions & 5 deletions .github/workflows/bolt-test.yaml
@@ -33,21 +33,18 @@ jobs:
with:
SCCACHE_AWS_SECRET_ACCESS_KEY: ${{ secrets.SCCACHE_AWS_SECRET_ACCESS_KEY }}
SCCACHE_AWS_ACCESS_KEY_ID: ${{ secrets.SCCACHE_AWS_ACCESS_KEY_ID }}
OP_SERVICE_ACCOUNT_TOKEN: ${{ secrets.OP_SERVICE_ACCOUNT_TOKEN }}

- name: Bolt Init
run: nix-shell --pure --run "bolt init --yes ci"

- name: Bolt Test
run: nix-shell --pure --run "bolt test"
run: nix-shell --pure --run "bolt test -c 4"

- name: Tmate
if: failure()
uses: mxschmitt/action-tmate@v3

- name: Bolt Cleanup
if: always()
run: nix-shell --pure --run "bolt infra destroy --yes"

- name: Force Parallel Failure
if: failure()
uses: andymckay/cancel-action@0.3
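
The new `-c 4` flag caps how many test binaries run at once. Below is a minimal sketch of such a concurrency cap using `futures_util`, which `test.rs` already imports; mapping `-c` onto `buffer_unordered` is an assumption for illustration, not something this diff confirms, and the sketch assumes `tokio` and `futures-util` as dependencies.

use std::time::Duration;

use futures_util::{stream, StreamExt};

#[tokio::main]
async fn main() {
    let results: Vec<u64> = stream::iter(1..=8u64)
        .map(|i| async move {
            // Stand-in for one test binary run.
            tokio::time::sleep(Duration::from_millis(10 * i)).await;
            i
        })
        .buffer_unordered(4) // at most 4 runs in flight, like `-c 4`
        .collect()
        .await;
    assert_eq!(results.len(), 8);
}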
1 change: 0 additions & 1 deletion infra/tf/infra_artifacts/providers.tf
@@ -13,4 +13,3 @@ provider "aws" {
s3 = local.s3_provider.endpoint_external
}
}

21 changes: 14 additions & 7 deletions lib/bolt/core/src/context/project.rs
@@ -192,11 +192,6 @@ impl ProjectContextData {
9000, *minio_port,
"minio_port must not be changed if dns is configured"
);
} else {
assert!(
self.ns().rivet.provisioning.is_none(),
"must have dns configured to provision servers"
);
}
}
config::ns::ClusterKind::Distributed { .. } => {
@@ -209,8 +204,8 @@
);

assert!(
self.ns().dns.is_some(),
"must have dns configured with a distributed cluster"
self.dns_enabled(),
"must have dns provider configured with a distributed cluster"
);
}
}
@@ -223,6 +218,10 @@
.as_ref()
.and_then(|p| p.cluster.as_ref())
{
assert!(
self.dns_enabled(),
"must have dns configured to provision servers"
);
let mut unique_datacenter_ids = HashSet::new();

for (name_id, datacenter) in &cluster.datacenters {
@@ -787,6 +786,14 @@ impl ProjectContextData {
pub fn tls_enabled(&self) -> bool {
self.ns().dns.is_some()
}

pub fn dns_enabled(&self) -> bool {
self.ns()
.dns
.as_ref()
.and_then(|dns| dns.provider.as_ref())
.is_some()
}
}

pub struct S3Credentials {
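The new `dns_enabled()` is stricter than `tls_enabled()`: TLS only requires a `dns` block to exist, while provisioning now requires a provider to be configured inside it. A standalone sketch of that distinction, with `Dns` and `Provider` as stand-ins for the real bolt config types:

struct Provider;
struct Dns {
    provider: Option<Provider>,
}

fn tls_enabled(dns: &Option<Dns>) -> bool {
    dns.is_some()
}

fn dns_enabled(dns: &Option<Dns>) -> bool {
    dns.as_ref().and_then(|d| d.provider.as_ref()).is_some()
}

fn main() {
    // A `dns` block with no provider: TLS stays on, but provisioning
    // checks that call `dns_enabled()` now fail.
    let dns = Some(Dns { provider: None });
    assert!(tls_enabled(&dns));
    assert!(!dns_enabled(&dns));
}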
4 changes: 3 additions & 1 deletion lib/bolt/core/src/context/service.rs
@@ -770,12 +770,14 @@ impl ServiceContextData {
));

// Provide default Nomad variables if in test
if matches!(run_context, RunContext::Test { .. }) {
if let RunContext::Test { test_id } = run_context {
env.push(("KUBERNETES_REGION".into(), "global".into()));
env.push((
"KUBERNETES_TASK_DIR".into(),
project_ctx.gen_path().display().to_string(),
));

env.push(("RIVET_TEST_ID".into(), test_id.clone()));
}

// Generic context
162 changes: 115 additions & 47 deletions lib/bolt/core/src/tasks/test.rs
@@ -1,22 +1,21 @@
use std::fmt;

use anyhow::*;
use futures_util::{FutureExt, StreamExt, TryStreamExt};
use rand::{seq::SliceRandom, thread_rng};
use reqwest::header;
use rivet_term::console::style;
use serde::Deserialize;

use indoc::formatdoc;
use std::{
collections::{HashMap, HashSet},
fmt,
path::{Path, PathBuf},
sync::{
atomic::{AtomicUsize, Ordering},
Arc,
},
time::{Duration, Instant},
};

use anyhow::*;
use futures_util::{FutureExt, StreamExt, TryStreamExt};
use indoc::formatdoc;
use rand::{seq::SliceRandom, thread_rng};
use reqwest::header;
use rivet_term::console::style;
use serde::Deserialize;
use tokio::{io::AsyncWriteExt, process::Command};

use crate::{
@@ -165,10 +164,11 @@ pub async fn test_services<T: AsRef<str>>(
};

// Generate test ID
let test_suite_id = gen_test_id();
let purge = !test_ctx.no_purge;

// Run tests
eprintln!();
let test_suite_id = gen_test_id();
rivet_term::status::progress("Running tests", &test_suite_id);
let tests_complete = Arc::new(AtomicUsize::new(0));
let test_count = test_binaries.len();
@@ -177,6 +177,7 @@
let test_suite_id = test_suite_id.clone();
let tests_complete = tests_complete.clone();
let timeout = test_ctx.timeout;

async move {
run_test(
&ctx,
@@ -185,6 +186,7 @@
tests_complete.clone(),
test_count,
timeout,
purge,
)
.await
}
@@ -196,7 +198,7 @@
// Print results
print_results(&test_results);

cleanup_jobs(ctx, test_ctx.no_purge).await?;
cleanup_nomad(ctx, purge).await?;
cleanup_servers(ctx).await?;

// Error on failure
@@ -232,6 +234,7 @@ async fn run_test(
tests_complete: Arc<AtomicUsize>,
test_count: usize,
timeout: Option<u64>,
purge_nomad_jobs: bool,
) -> Result<TestResult> {
let svc_ctx = ctx
.all_services()
@@ -331,6 +334,8 @@
}
}

cleanup_nomad_test(ctx, &test_id, purge_nomad_jobs).await?;

Ok(TestResult { status })
}

@@ -505,41 +510,6 @@ fn print_results(test_results: &[TestResult]) {
}
}

async fn cleanup_jobs(ctx: &ProjectContext, no_purge: bool) -> Result<()> {
eprintln!();
rivet_term::status::progress("Cleaning up jobs", "");

let purge = if no_purge { "" } else { "-purge" };
let cleanup_cmd = formatdoc!(
r#"
nomad job status |
grep -v -e "ID" -e "No running jobs" |
cut -f1 -d ' ' |
xargs -I {{}} nomad job stop {purge} -detach {{}}
"#
);

let mut cmd = Command::new("kubectl");
cmd.args(&[
"exec",
"service/nomad-server",
"-n",
"nomad",
"--container",
"nomad-instance",
"--",
"sh",
"-c",
&cleanup_cmd,
]);
cmd.env("KUBECONFIG", ctx.gen_kubeconfig_path());

let status = cmd.status().await?;
ensure!(status.success());

Ok(())
}

#[derive(Deserialize)]
struct ApiErrorResponse {
errors: Vec<ApiError>,
@@ -759,3 +729,101 @@ fn gen_test_id() -> String {
})
.collect()
}

// Cleans up all nomad jobs from a specific test
async fn cleanup_nomad_test(ctx: &ProjectContext, test_id: &str, purge: bool) -> Result<()> {
// Fetch all jobs from this test
let fetch_cmd = format!(
r#"nomad operator api -filter 'Meta.rivet_test_id == "{test_id}"' /v1/jobs?meta=true"#
);

let mut cmd = Command::new("kubectl");
cmd.args(&[
"exec",
"service/nomad-server",
"-n",
"nomad",
"--container",
"nomad-instance",
"--",
"sh",
"-c",
&fetch_cmd,
]);
cmd.env("KUBECONFIG", ctx.gen_kubeconfig_path());

let output = cmd.output().await?;
ensure!(output.status.success());

let jobs: Vec<NomadJob> = serde_json::from_slice(&output.stdout)?;

// Cleanup jobs
let purge = if purge { "-purge" } else { "" };
let cleanup_cmd = jobs
.iter()
.map(|job| format!("nomad job stop {purge} -detach {}", job.id))
.collect::<Vec<_>>()
.join("\n");

let mut cmd = Command::new("kubectl");
cmd.args(&[
"exec",
"service/nomad-server",
"-n",
"nomad",
"--container",
"nomad-instance",
"--",
"sh",
"-c",
&cleanup_cmd,
]);
cmd.env("KUBECONFIG", ctx.gen_kubeconfig_path());

let output = cmd.output().await?;
ensure!(output.status.success());

Ok(())
}

// Cleans up all nomad jobs
async fn cleanup_nomad(ctx: &ProjectContext, purge: bool) -> Result<()> {
eprintln!();
rivet_term::status::progress("Cleaning up jobs", "");

let purge = if purge { "-purge" } else { "" };
let cleanup_cmd = formatdoc!(
r#"
nomad job status |
grep -v -e "ID" -e "No running jobs" |
cut -f1 -d ' ' |
xargs -I {{}} nomad job stop {purge} -detach {{}}
"#
);

let mut cmd = Command::new("kubectl");
cmd.args(&[
"exec",
"service/nomad-server",
"-n",
"nomad",
"--container",
"nomad-instance",
"--",
"sh",
"-c",
&cleanup_cmd,
]);
cmd.env("KUBECONFIG", ctx.gen_kubeconfig_path());

let status = cmd.status().await?;
ensure!(status.success());

Ok(())
}

#[derive(Debug, serde::Deserialize)]
struct NomadJob {
#[serde(rename = "ID")]
id: String,
}
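
`NomadJob` only needs the `ID` field of each job stub returned by `/v1/jobs?meta=true`; serde skips unknown fields by default. A self-contained sketch, assuming `serde` (with derive) and `serde_json` as dependencies and an illustrative payload rather than real cluster output:

#[derive(Debug, serde::Deserialize)]
struct NomadJob {
    #[serde(rename = "ID")]
    id: String,
}

fn main() -> Result<(), serde_json::Error> {
    // Illustrative payload, not captured from a real cluster.
    let body = r#"[{"ID":"job-abc123","Meta":{"rivet_test_id":"abcd1234"}}]"#;
    let jobs: Vec<NomadJob> = serde_json::from_str(body)?;
    // Fields like `Meta` are simply ignored during deserialization.
    assert_eq!(jobs[0].id, "job-abc123");
    Ok(())
}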
1 change: 1 addition & 0 deletions lib/util/env/Cargo.toml
@@ -11,3 +11,4 @@ serde = { version = "1.0.193", features = ["derive"] }
serde_json = "1.0.109"
thiserror = "1.0"
uuid = { version = "1", features = ["v4"] }
types = { path = "../../types/core" }
8 changes: 8 additions & 0 deletions lib/util/env/src/lib.rs
@@ -32,6 +32,7 @@ lazy_static::lazy_static! {
static ref BILLING: Option<RivetBilling> = std::env::var("RIVET_BILLING")
.ok()
.map(|x| serde_json::from_str(&x).expect("failed to parse billing"));
static ref TEST_ID: Option<String> = std::env::var("RIVET_TEST_ID").ok();
}

/// Where this code is being written from. This is derived from the `RIVET_RUN_CONTEXT` environment
@@ -107,6 +108,13 @@ pub fn support_deprecated_subdomains() -> bool {
*SUPPORT_DEPRECATED_SUBDOMAINS
}

pub fn test_id_param() -> Vec<types::rivet::backend::job::Parameter> {
TEST_ID.as_ref().iter().map(|x| types::rivet::backend::job::Parameter {
key: "rivet_test_id".to_string(),
value: x.to_string(),
}).collect()
}

/// The host for the API.
pub fn host_api() -> &'static str {
match &*HOST_API {
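
Together with the `RIVET_TEST_ID` variable injected in `service.rs`, `test_id_param()` is what lets `cleanup_nomad_test` find a test's jobs by meta. A self-contained sketch of the pattern, where the local `Parameter` struct stands in for the generated `types::rivet::backend::job::Parameter`:

#[derive(Debug)]
struct Parameter {
    key: String,
    value: String,
}

fn test_id_param(test_id: Option<&String>) -> Vec<Parameter> {
    test_id
        .iter()
        .map(|x| Parameter {
            key: "rivet_test_id".to_string(),
            value: x.to_string(),
        })
        .collect()
}

fn main() {
    // Set per test run by bolt during `bolt test`; absent otherwise,
    // in which case no parameter is attached.
    let test_id = std::env::var("RIVET_TEST_ID").ok();
    let params = test_id_param(test_id.as_ref());
    assert!(params.iter().all(|p| p.key == "rivet_test_id"));
}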