From 9e98f7484939e4fe82931e12649510a5ebce9e06 Mon Sep 17 00:00:00 2001 From: MasterPtato Date: Thu, 28 Mar 2024 21:01:31 +0000 Subject: [PATCH] fix: run all tests in one pod --- lib/bolt/core/src/context/service.rs | 228 ++++++------ lib/bolt/core/src/dep/k8s/gen.rs | 2 +- lib/bolt/core/src/tasks/test.rs | 522 +++++++++++++++++++-------- 3 files changed, 490 insertions(+), 262 deletions(-) diff --git a/lib/bolt/core/src/context/service.rs b/lib/bolt/core/src/context/service.rs index dca60a4735..04ae6fba4e 100644 --- a/lib/bolt/core/src/context/service.rs +++ b/lib/bolt/core/src/context/service.rs @@ -1,5 +1,6 @@ use anyhow::{ensure, Context, Result}; use async_recursion::async_recursion; +use indexmap::IndexMap; use std::{ collections::HashMap, hash::{Hash, Hasher}, @@ -711,11 +712,11 @@ impl ServiceContextData { Ok(secrets) } - pub async fn env(&self, run_context: &RunContext) -> Result> { + pub async fn env(&self, run_context: &RunContext) -> Result> { let project_ctx = self.project().await; let region_id = project_ctx.primary_region_or_local(); - let mut env = Vec::new(); + let mut env = IndexMap::new(); // HACK: Link to dynamically linked libraries in /nix/store // @@ -724,19 +725,19 @@ impl ServiceContextData { // // The `/nix/store` directory is mounted as a volume. if let config::ns::ClusterKind::SingleNode { .. } = project_ctx.ns().cluster.kind { - env.push(( + env.insert( "LD_LIBRARY_PATH".into(), std::env::var("LD_LIBRARY_PATH").context("missing LD_LIBRARY_PATH")?, - )); + ); } // TODO: This is re-running the hashing function for every service when we already did this in the planning step // Provide source hash to purge the cache when the service is updated let source_hash = self.source_hash_dev(&BuildOptimization::Debug).await?; - env.push(("RIVET_SOURCE_HASH".into(), source_hash.clone())); + env.insert("RIVET_SOURCE_HASH".into(), source_hash.clone()); let ns_service_config = self.ns_service_config().await; - env.push(( + env.insert( "TOKIO_WORKER_THREADS".into(), if ns_service_config.resources.cpu >= 1000 { (ns_service_config.resources.cpu / 1000).max(1) @@ -744,60 +745,58 @@ impl ServiceContextData { 1 } .to_string(), - )); + ); // Provide default Nomad variables if in test - if let RunContext::Test { test_id } = run_context { - env.push(("KUBERNETES_REGION".into(), "global".into())); - env.push(("KUBERNETES_DC".into(), region_id.clone())); - env.push(( + if let RunContext::Test { .. 
} = run_context { + env.insert("KUBERNETES_REGION".into(), "global".into()); + env.insert("KUBERNETES_DC".into(), region_id.clone()); + env.insert( "KUBERNETES_TASK_DIR".into(), project_ctx.gen_path().display().to_string(), - )); - - env.push(("RIVET_TEST_ID".into(), test_id.clone())); + ); } // Generic context - env.push(("RIVET_RUN_CONTEXT".into(), run_context.short().into())); - env.push(("RIVET_NAMESPACE".into(), project_ctx.ns_id().into())); - env.push(( + env.insert("RIVET_RUN_CONTEXT".into(), run_context.short().into()); + env.insert("RIVET_NAMESPACE".into(), project_ctx.ns_id().into()); + env.insert( "RIVET_CLUSTER_ID".into(), project_ctx.ns().cluster.id.to_string(), - )); + ); if self.enable_tokio_console() { - env.push(("TOKIO_CONSOLE_ENABLE".into(), "1".into())); - env.push(( + env.insert("TOKIO_CONSOLE_ENABLE".into(), "1".into()); + env.insert( "TOKIO_CONSOLE_BIND".into(), format!("0.0.0.0:{}", k8s::gen::TOKIO_CONSOLE_PORT), - )); + ); } - env.push(( + env.insert( "RIVET_ACCESS_KIND".into(), match project_ctx.ns().rivet.access { config::ns::RivetAccess::Private {} => "private".into(), config::ns::RivetAccess::Public {} => "public".into(), }, - )); + ); if project_ctx.ns().rivet.login.enable_admin { - env.push(("RIVET_ACCESS_TOKEN_LOGIN".into(), "1".into())); + env.insert("RIVET_ACCESS_TOKEN_LOGIN".into(), "1".into()); } // Domains if let Some(x) = project_ctx.domain_main() { - env.push(("RIVET_DOMAIN_MAIN".into(), x)); + env.insert("RIVET_DOMAIN_MAIN".into(), x); } if let Some(x) = project_ctx.domain_cdn() { - env.push(("RIVET_DOMAIN_CDN".into(), x)); + env.insert("RIVET_DOMAIN_CDN".into(), x); } if let Some(x) = project_ctx.domain_job() { - env.push(("RIVET_DOMAIN_JOB".into(), x)); + env.insert("RIVET_DOMAIN_JOB".into(), x); } if let Some(x) = project_ctx.domain_main_api() { - env.push(("RIVET_DOMAIN_MAIN_API".into(), x)); + env.insert("RIVET_DOMAIN_MAIN_API".into(), x); } if let Some(true) = project_ctx .ns() @@ -805,142 +804,142 @@ impl ServiceContextData { .as_ref() .map(|x| x.deprecated_subdomains) { - env.push(("RIVET_SUPPORT_DEPRECATED_SUBDOMAINS".into(), "1".into())); + env.insert("RIVET_SUPPORT_DEPRECATED_SUBDOMAINS".into(), "1".into()); } - env.push(("RIVET_HOST_API".into(), project_ctx.host_api())); - env.push(("RIVET_ORIGIN_API".into(), project_ctx.origin_api())); - env.push(("RIVET_ORIGIN_HUB".into(), project_ctx.origin_hub())); + env.insert("RIVET_HOST_API".into(), project_ctx.host_api()); + env.insert("RIVET_ORIGIN_API".into(), project_ctx.origin_api()); + env.insert("RIVET_ORIGIN_HUB".into(), project_ctx.origin_hub()); // DNS if let Some(dns) = &project_ctx.ns().dns { if let Some(provider) = &dns.provider { - env.push(( + env.insert( "RIVET_DNS_PROVIDER".into(), match provider { config::ns::DnsProvider::Cloudflare { .. } => "cloudflare".into(), }, - )); + ); } } // Pools if !project_ctx.ns().pools.is_empty() { - env.push(("RIVET_HAS_POOLS".into(), "1".into())); + env.insert("RIVET_HAS_POOLS".into(), "1".into()); } // Regions - env.push(("RIVET_REGION".into(), region_id.clone())); - env.push(("RIVET_PRIMARY_REGION".into(), project_ctx.primary_region())); + env.insert("RIVET_REGION".into(), region_id.clone()); + env.insert("RIVET_PRIMARY_REGION".into(), project_ctx.primary_region()); // Networking if matches!(run_context, RunContext::Service { .. 
}) { - env.push(("HEALTH_PORT".into(), k8s::gen::HEALTH_PORT.to_string())); - env.push(("METRICS_PORT".into(), k8s::gen::METRICS_PORT.to_string())); + env.insert("HEALTH_PORT".into(), k8s::gen::HEALTH_PORT.to_string()); + env.insert("METRICS_PORT".into(), k8s::gen::METRICS_PORT.to_string()); if self.config().kind.has_server() { - env.push(("PORT".into(), k8s::gen::HTTP_SERVER_PORT.to_string())); + env.insert("PORT".into(), k8s::gen::HTTP_SERVER_PORT.to_string()); } } // Add billing flag if let Some(billing) = &project_ctx.ns().rivet.billing { - env.push(( + env.insert( "RIVET_BILLING".to_owned(), serde_json::to_string(&billing).unwrap(), - )); + ); } if project_ctx.ns().dns.is_some() { let dns = terraform::output::read_dns(&project_ctx).await; - env.push(( + env.insert( "CLOUDFLARE_ZONE_ID_BASE".into(), (*dns.cloudflare_zone_ids).main.clone(), - )); - env.push(( + ); + env.insert( "CLOUDFLARE_ZONE_ID_GAME".into(), (*dns.cloudflare_zone_ids).cdn.clone(), - )); - env.push(( + ); + env.insert( "CLOUDFLARE_ZONE_ID_JOB".into(), (*dns.cloudflare_zone_ids).job.clone(), - )); + ); } if self.depends_on_captcha() { if let Some(hcaptcha) = &project_ctx.ns().captcha.hcaptcha { - env.push(( + env.insert( "HCAPTCHA_SITE_KEY_FALLBACK".into(), hcaptcha.site_key_fallback.clone(), - )); + ); } if let Some(turnstile) = &project_ctx.ns().captcha.turnstile { - env.push(( + env.insert( "TURNSTILE_SITE_KEY_MAIN".into(), turnstile.site_key_main.clone(), - )); - env.push(( + ); + env.insert( "TURNSTILE_SITE_KEY_CDN".into(), turnstile.site_key_cdn.clone(), - )); + ); } } if self.depends_on_nomad_api() { // TODO: Read host url from terraform - env.push(( + env.insert( "NOMAD_URL".into(), "http://nomad-server.nomad.svc.cluster.local:4646".into(), - )); + ); } - env.push(( + env.insert( "CRDB_MIN_CONNECTIONS".into(), self.config().cockroachdb.min_connections.to_string(), - )); + ); if self.depends_on_prometheus_api() { - env.push(( + env.insert( format!("PROMETHEUS_URL"), "http://prometheus-operated.prometheus.svc.cluster.local:9090".into(), - )); + ); } // NATS - env.push(( + env.insert( "NATS_URL".into(), // TODO: Add back passing multiple NATS nodes for failover instead of using DNS resolution "nats.nats.svc.cluster.local:4222".into(), - )); + ); // Chirp config (used for both Chirp clients and Chirp workers) - env.push(("CHIRP_SERVICE_NAME".into(), self.name())); + env.insert("CHIRP_SERVICE_NAME".into(), self.name()); // Chirp worker config if (matches!(run_context, RunContext::Service { .. }) && matches!(&self.config().kind, ServiceKind::Consumer { .. 
})) || self.is_monolith_worker() { - env.push(( + env.insert( "CHIRP_WORKER_INSTANCE".into(), format!("{}-$(KUBERNETES_POD_ID)", self.name()), - )); + ); - env.push(("CHIRP_WORKER_KIND".into(), "consumer".into())); - env.push(("CHIRP_WORKER_CONSUMER_GROUP".into(), self.name())); + env.insert("CHIRP_WORKER_KIND".into(), "consumer".into()); + env.insert("CHIRP_WORKER_CONSUMER_GROUP".into(), self.name()); } // Fly if let Some(fly) = &project_ctx.ns().fly { - env.push(("FLY_ORGANIZATION_ID".into(), fly.organization_id.clone())); - env.push(("FLY_REGION".into(), fly.region.clone())); + env.insert("FLY_ORGANIZATION_ID".into(), fly.organization_id.clone()); + env.insert("FLY_REGION".into(), fly.region.clone()); } // Add default provider let (default_provider, _) = project_ctx.default_s3_provider()?; - env.push(( + env.insert( "S3_DEFAULT_PROVIDER".to_string(), default_provider.as_str().to_string(), - )); + ); // Expose all S3 endpoints to services that need them let s3_deps = if self.depends_on_s3() { @@ -975,36 +974,36 @@ impl ServiceContextData { // S3 backfill if self.depends_on_s3_backfill() { if let Some(backfill) = &project_ctx.ns().s3.backfill { - env.push(("S3_BACKFILL_PROVIDER".into(), backfill.as_str().to_string())); + env.insert("S3_BACKFILL_PROVIDER".into(), backfill.as_str().to_string()); } } // Runtime-specific match &self.config().runtime { RuntimeKind::Rust { .. } => { - env.push(("RUST_BACKTRACE".into(), "1".into())); + env.insert("RUST_BACKTRACE".into(), "1".into()); } _ => {} } if project_ctx.ns().rivet.telemetry.disable { - env.push(("RIVET_TELEMETRY_DISABLE".into(), "1".into())); + env.insert("RIVET_TELEMETRY_DISABLE".into(), "1".into()); } - env.push(( + env.insert( "RIVET_API_HUB_ORIGIN_REGEX".into(), project_ctx.origin_hub_regex(), - )); + ); if project_ctx.ns().rivet.api.error_verbose { - env.push(("RIVET_API_ERROR_VERBOSE".into(), "1".into())); + env.insert("RIVET_API_ERROR_VERBOSE".into(), "1".into()); } if project_ctx.ns().rivet.profanity.filter_disable { - env.push(("RIVET_PROFANITY_FILTER_DISABLE".into(), "1".into())); + env.insert("RIVET_PROFANITY_FILTER_DISABLE".into(), "1".into()); } if project_ctx.ns().rivet.upload.nsfw_error_verbose { - env.push(("RIVET_UPLOAD_NSFW_ERROR_VERBOSE".into(), "1".into())); + env.insert("RIVET_UPLOAD_NSFW_ERROR_VERBOSE".into(), "1".into()); } - env.push(( + env.insert( "RIVET_DS_BUILD_DELIVERY_METHOD".into(), project_ctx .ns() @@ -1012,55 +1011,52 @@ impl ServiceContextData { .dynamic_servers .build_delivery_method .to_string(), - )); - - // Sort env by keys so it's always in the same order - env.sort_by_cached_key(|x| x.0.clone()); + ); Ok(env) } - pub async fn secret_env(&self, run_context: &RunContext) -> Result> { + pub async fn secret_env(&self, run_context: &RunContext) -> Result> { let project_ctx = self.project().await; - let mut env = Vec::new(); + let mut env = IndexMap::new(); // Write secrets for (secret_key, secret_config) in self.required_secrets(run_context).await? { let env_key = secret_env_var_key(&secret_key); if secret_config.optional { if let Some(value) = project_ctx.read_secret_opt(&secret_key).await? 
{ - env.push((env_key, value)); + env.insert(env_key, value); } } else { - env.push((env_key, project_ctx.read_secret(&secret_key).await?)); + env.insert(env_key, project_ctx.read_secret(&secret_key).await?); } } // NATS - env.push(("NATS_USERNAME".into(), "chirp".into())); - env.push(("NATS_PASSWORD".into(), "password".into())); + env.insert("NATS_USERNAME".into(), "chirp".into()); + env.insert("NATS_PASSWORD".into(), "password".into()); - env.push(( + env.insert( "RIVET_JWT_KEY_PUBLIC".into(), project_ctx .read_secret(&["jwt", "key", "public_pem"]) .await?, - )); + ); if self.depends_on_jwt_key_private() { - env.push(( + env.insert( "RIVET_JWT_KEY_PRIVATE".into(), project_ctx .read_secret(&["jwt", "key", "private_pem"]) .await?, - )); + ); } if self.depends_on_sendgrid_key(&project_ctx) { - env.push(( + env.insert( "SENDGRID_KEY".into(), project_ctx.read_secret(&["sendgrid", "key"]).await?, - )); + ); } // CRDB @@ -1076,7 +1072,7 @@ impl ServiceContextData { "postgres://{}:{}@{crdb_host}/postgres?sslmode={sslmode}", username, password, ); - env.push(("CRDB_URL".into(), uri)); + env.insert("CRDB_URL".into(), uri); } // Redis @@ -1149,10 +1145,10 @@ impl ServiceContextData { format!("rediss://{}@{host}", username) }; - env.push(( + env.insert( format!("REDIS_URL_{}", db_name.to_uppercase().replace("-", "_")), url, - )); + ); } // ClickHouse @@ -1167,7 +1163,7 @@ impl ServiceContextData { username, password, *clickhouse_data.host, *clickhouse_data.port_https ); - env.push(("CLICKHOUSE_URL".into(), uri)); + env.insert("CLICKHOUSE_URL".into(), uri); } // Expose S3 endpoints to services that need them @@ -1202,12 +1198,12 @@ impl ServiceContextData { } if project_ctx.ns().dns.is_some() && self.depends_on_cloudflare() { - env.push(( + env.insert( "CLOUDFLARE_AUTH_TOKEN".into(), project_ctx .read_secret(&["cloudflare", "terraform", "auth_token"]) .await?, - )); + ); } Ok(env) @@ -1289,7 +1285,7 @@ impl ServiceContextData { async fn add_s3_env( project_ctx: &ProjectContext, - env: &mut Vec<(String, String)>, + env: &mut IndexMap, s3_dep: &Arc, provider: s3_util::Provider, ) -> Result<()> { @@ -1298,14 +1294,14 @@ async fn add_s3_env( let s3_dep_name = s3_dep.name_screaming_snake(); let s3_config = project_ctx.s3_config(provider).await?; - env.push(( + env.insert( format!("S3_{provider_upper}_BUCKET_{s3_dep_name}"), s3_dep.s3_bucket_name().await, - )); - env.push(( + ); + env.insert( format!("S3_{provider_upper}_ENDPOINT_INTERNAL_{s3_dep_name}"), s3_config.endpoint_internal, - )); + ); // External endpoint { let mut external_endpoint = s3_config.endpoint_external; @@ -1331,22 +1327,22 @@ async fn add_s3_env( } } - env.push(( + env.insert( format!("S3_{provider_upper}_ENDPOINT_EXTERNAL_{s3_dep_name}",), external_endpoint, - )); + ); } - env.push(( + env.insert( format!("S3_{provider_upper}_REGION_{s3_dep_name}"), s3_config.region, - )); + ); Ok(()) } async fn add_s3_secret_env( project_ctx: &ProjectContext, - env: &mut Vec<(String, String)>, + env: &mut IndexMap, s3_dep: &Arc, provider: s3_util::Provider, ) -> Result<()> { @@ -1355,14 +1351,14 @@ async fn add_s3_secret_env( let s3_dep_name = s3_dep.name_screaming_snake(); let s3_creds = project_ctx.s3_credentials(provider).await?; - env.push(( + env.insert( format!("S3_{provider_upper}_ACCESS_KEY_ID_{s3_dep_name}"), s3_creds.access_key_id, - )); - env.push(( + ); + env.insert( format!("S3_{provider_upper}_SECRET_ACCESS_KEY_{s3_dep_name}"), s3_creds.access_key_secret, - )); + ); Ok(()) } diff --git a/lib/bolt/core/src/dep/k8s/gen.rs 
b/lib/bolt/core/src/dep/k8s/gen.rs index 7acd8ab9e0..5b1a05584c 100644 --- a/lib/bolt/core/src/dep/k8s/gen.rs +++ b/lib/bolt/core/src/dep/k8s/gen.rs @@ -782,7 +782,7 @@ async fn build_volumes( } // Added for ease of use -fn generate_k8s_variables() -> Vec { +pub fn generate_k8s_variables() -> Vec { vec![ json!({ "name": "KUBERNETES_NODE_NAME", diff --git a/lib/bolt/core/src/tasks/test.rs b/lib/bolt/core/src/tasks/test.rs index e4332801d6..a2cfa78fd5 100644 --- a/lib/bolt/core/src/tasks/test.rs +++ b/lib/bolt/core/src/tasks/test.rs @@ -10,18 +10,19 @@ use std::{ use anyhow::*; use futures_util::{StreamExt, TryStreamExt}; +use indexmap::{IndexMap, IndexSet}; use indoc::formatdoc; use rand::{seq::SliceRandom, thread_rng}; use rivet_term::console::style; +use serde_json::json; use tokio::{io::AsyncWriteExt, process::Command}; use crate::{ config::{ns, service::RuntimeKind}, - context::{ProjectContext, RunContext}, + context::{ProjectContext, RunContext, ServiceContext}, dep::{ self, cargo::{self, cli::TestBinary}, - k8s::gen::{ExecServiceContext, ExecServiceDriver}, }, utils, }; @@ -128,7 +129,7 @@ pub async fn test_services>( let test_binaries = { // Collect rust services by their workspace root let mut svcs_by_workspace = HashMap::new(); - for svc in rust_svcs { + for svc in &rust_svcs { let workspace = svcs_by_workspace .entry(svc.workspace_path()) .or_insert_with(Vec::new); @@ -164,14 +165,69 @@ pub async fn test_services>( let test_suite_id = gen_test_id(); let purge = !test_ctx.no_purge; + // Build exec ctx + let run_context = RunContext::Test { + test_id: test_suite_id.clone(), + }; + let k8s_svc_name = format!("test-{test_suite_id}"); + + // Apply pod + let specs = gen_spec(ctx, &run_context, &rust_svcs, &k8s_svc_name).await; + dep::k8s::cli::apply_specs(ctx, specs).await?; + + // Wait for pod to start + eprintln!(); + rivet_term::status::progress("Waiting for pod start", ""); + let label = format!("app.kubernetes.io/name={k8s_svc_name}"); + let status = Command::new("kubectl") + .args(&[ + "wait", + "--for=condition=Ready", + "pod", + "--selector", + &label, + "-n", + "rivet-service", + ]) + .env("KUBECONFIG", ctx.gen_kubeconfig_path()) + .stdout(std::process::Stdio::null()) + .status() + .await?; + if !status.success() { + bail!("failed to check pod readiness"); + } + + // Install CA + rivet_term::status::progress("Installing CA", ""); + let status = Command::new("kubectl") + .args(&[ + "exec", + &format!("job/{k8s_svc_name}"), + "-n", + "rivet-service", + "--", + "/usr/bin/install_ca.sh", + ]) + .env("KUBECONFIG", ctx.gen_kubeconfig_path()) + .stdout(std::process::Stdio::null()) + .status() + .await?; + if !status.success() { + bail!("failed to install CA"); + } + // Run tests + let test_suite_start_time = Instant::now(); + eprintln!(); rivet_term::status::progress("Running tests", &test_suite_id); + eprintln!(); let tests_complete = Arc::new(AtomicUsize::new(0)); let test_count = test_binaries.len(); let test_results = futures_util::stream::iter(test_binaries.into_iter().map(|test_binary| { let ctx = ctx.clone(); let test_suite_id = test_suite_id.clone(); + let k8s_svc_name = k8s_svc_name.clone(); let tests_complete = tests_complete.clone(); let timeout = test_ctx.timeout; @@ -179,6 +235,7 @@ pub async fn test_services>( run_test( &ctx, test_suite_id, + k8s_svc_name, test_binary, tests_complete.clone(), test_count, @@ -192,8 +249,15 @@ pub async fn test_services>( .try_collect::>() .await?; + // Delete job + Command::new("kubectl") + .args(&["delete", "job", &k8s_svc_name,
"-n", "rivet-service"]) + .env("KUBECONFIG", ctx.gen_kubeconfig_path()) + .output() + .await?; + // Print results - print_results(&test_results); + print_results(&test_results, test_suite_start_time); cleanup_nomad(ctx, purge).await?; @@ -226,6 +290,7 @@ struct TestResult { async fn run_test( ctx: &ProjectContext, test_suite_id: String, + k8s_svc_name: String, test_binary: TestBinary, tests_complete: Arc, test_count: usize, @@ -239,78 +304,51 @@ async fn run_test( .find(|x| x.cargo_name() == Some(&test_binary.package)) .context("svc not found for package")?; let display_name = format!("{}::{}", svc_ctx.name(), test_binary.test_name); - - // Convert path relative to project - let relative_path = test_binary - .path - .strip_prefix(ctx.path()) - .context("path not in project")?; - let container_path = Path::new("/rivet-src").join(relative_path); - - // Build exec ctx let test_id = gen_test_id(); - let exec_ctx = ExecServiceContext { - svc_ctx: svc_ctx.clone(), - run_context: RunContext::Test { - test_id: test_id.clone(), - }, - driver: ExecServiceDriver::LocalBinaryArtifact { - exec_path: container_path, - // Limit test running in parallel & filter the tests that get ran - args: vec!["--exact".to_string(), test_binary.test_name.clone()], - }, - }; - - // Build specs - let specs = dep::k8s::gen::gen_svc(&exec_ctx).await; - let k8s_svc_name = dep::k8s::gen::k8s_svc_name(&exec_ctx); - - // Apply pod - dep::k8s::cli::apply_specs(ctx, specs).await?; // Build path to logs let logs_dir = Path::new("/tmp") .join(test_suite_id) .join(svc_ctx.name()) - .join(test_binary.target); + .join(&test_binary.target); tokio::fs::create_dir_all(&logs_dir).await?; let logs_path = logs_dir.join(format!("{}.log", test_binary.test_name)); - // Watch pod rivet_term::status::info( "Running", format!( - "{display_name} [{test_id}] [{logs_path}]", + "{display_name} [{logs_path}]", logs_path = logs_path.display() ), ); + let test_start_time = Instant::now(); let timeout = timeout .map(Duration::from_secs) .unwrap_or(DEFAULT_TEST_TIMEOUT); - let status = - match tokio::time::timeout(timeout, watch_pod(ctx, &k8s_svc_name, logs_path.clone())).await - { - Result::Ok(Result::Ok(x)) => x, - Result::Ok(Err(err)) => TestStatus::UnknownError(err.to_string()), - Err(_) => { - Command::new("kubectl") - .args(&["delete", "job", &k8s_svc_name, "-n", "rivet-service"]) - .env("KUBECONFIG", ctx.gen_kubeconfig_path()) - .output() - .await?; - - TestStatus::Timeout - } - }; + let status = match tokio::time::timeout( + timeout, + exec_test( + ctx, + &k8s_svc_name, + &test_binary, + &test_id, + logs_path.clone(), + ), + ) + .await + { + Result::Ok(Result::Ok(x)) => x, + Result::Ok(Err(err)) => TestStatus::UnknownError(err.to_string()), + Err(_) => TestStatus::Timeout, + }; // Print status - let test_duration = test_start_time.elapsed(); + let test_duration = test_start_time.elapsed().as_secs_f32(); let complete_count = tests_complete.fetch_add(1, Ordering::SeqCst) + 1; let run_info = format!( - "{display_name} ({complete_count}/{test_count}) [{test_id}] [{logs_path}] [{td:.1}s]", - td = test_duration.as_secs_f32(), - logs_path = logs_path.display() + "{display_name} ({complete_count}/{test_count}) [{logs_path}] [{test_duration:.1}s]", + logs_path = logs_path.display(), ); match &status { TestStatus::Pass => { @@ -336,117 +374,66 @@ async fn run_test( } /// Follow the pod logs and write them to a file. 
-async fn pipe_pod_logs( - ctx: ProjectContext, - k8s_svc_name: String, +async fn exec_test( + ctx: &ProjectContext, + k8s_svc_name: &str, + test_binary: &TestBinary, + test_id: &str, logs_path: PathBuf, -) -> Result<()> { - let label = format!("app.kubernetes.io/name={k8s_svc_name}"); +) -> Result { + // Convert path relative to project + let relative_path = test_binary + .path + .strip_prefix(ctx.path()) + .context("path not in project")?; + let container_path = Path::new("/rivet-src").join(relative_path); + + let command = format!( + "RIVET_TEST_ID={test_id} {} --exact {}", + &container_path.display(), + &test_binary.test_name + ); // Write logs to file let file = tokio::task::block_in_place(|| std::fs::File::create(&logs_path))?; let mut logs_child = Command::new("kubectl") - .args(&["logs", "-f", "--selector", &label, "-n", "rivet-service"]) + .args(&[ + "exec", + &format!("job/{k8s_svc_name}"), + "-n", + "rivet-service", + "--", + "sh", + "-c", + &command, + ]) .env("KUBECONFIG", ctx.gen_kubeconfig_path()) .stdout(file) + .stderr(std::process::Stdio::null()) + .kill_on_drop(true) .spawn()?; - logs_child.wait().await?; + let status = logs_child.wait().await?; // Write end of file let mut file = tokio::fs::OpenOptions::new() .append(true) .open(&logs_path) .await?; - file.write_all(b"\n=== POD STOPPED ===\n").await?; - - Ok(()) -} - -/// Watch the pod to look for completion or failure. -async fn watch_pod( - ctx: &ProjectContext, - k8s_svc_name: &str, - logs_path: PathBuf, -) -> Result { - let label = format!("app.kubernetes.io/name={k8s_svc_name}"); + file.write_all(b"\n=== TEST FINISHED ===\n").await?; - let mut spawned_pipe_logs_task = false; - loop { - // TODO: Use --wait for better performance - let output = Command::new("kubectl") - .args(&[ - "get", - "pod", - "--selector", - &label, - "-n", - "rivet-service", - "-o", - "jsonpath={.items[0].status.phase}", - ]) - .env("KUBECONFIG", ctx.gen_kubeconfig_path()) - .output() - .await?; - - let output_str = String::from_utf8_lossy(&output.stdout); - let output_str = output_str.trim(); - - // Start piping logs to a file once the container has started - if !spawned_pipe_logs_task && matches!(output_str, "Running" | "Succeeded" | "Failed") { - spawned_pipe_logs_task = true; - tokio::spawn(pipe_pod_logs( - ctx.clone(), - k8s_svc_name.to_string(), - logs_path.clone(), - )); - } - - // Wait for container to finish - match output_str { - "Pending" | "Running" | "" => { - // Continue - tokio::time::sleep(Duration::from_millis(500)).await; - } - "Succeeded" | "Failed" => { - // Get the exit code of the pod - let output = Command::new("kubectl") - .args(&[ - "get", - "pod", - "--selector", - &label, - "-n", - "rivet-service", - "-o", - "jsonpath={.items[0].status.containerStatuses[0].state.terminated.exitCode}", - ]) - .env("KUBECONFIG", ctx.gen_kubeconfig_path()) - .output() - .await?; - - let exit_code_str = String::from_utf8_lossy(&output.stdout); - let Result::Ok(exit_code) = exit_code_str.trim().parse::() else { - return Ok(TestStatus::UnknownError(format!( - "Could not parse exit code: {exit_code_str:?}" - ))); - }; - - let test_status = match exit_code { - 0 => TestStatus::Pass, - 101 => TestStatus::TestFailed, - x => TestStatus::UnknownExitCode(x), - }; - - return Ok(test_status); - } - _ => bail!("unexpected pod status: {}", output_str), - } + match status.code() { + Some(0) => Ok(TestStatus::Pass), + Some(101) => Ok(TestStatus::TestFailed), + Some(x) => Ok(TestStatus::UnknownExitCode(x)), + None => Ok(TestStatus::UnknownError("no 
status code".to_string())), } } -fn print_results(test_results: &[TestResult]) { +fn print_results(test_results: &[TestResult], start_time: Instant) { + let test_duration = start_time.elapsed().as_secs_f32(); + eprintln!(); - rivet_term::status::success("Complete", ""); + rivet_term::status::success("Complete", format!("{test_duration:.1}s")); let passed_count = test_results .iter() @@ -614,3 +601,248 @@ struct NomadJob { #[serde(rename = "ID")] id: String, } + +/// Generates the k8s spec for the main test pod. +pub async fn gen_spec( + ctx: &ProjectContext, + run_context: &RunContext, + svcs: &[&ServiceContext], + k8s_svc_name: &str, +) -> Vec { + let mut specs = Vec::new(); + + // Render env + let mut env = IndexMap::new(); + let mut secret_env = IndexMap::new(); + + for svc_ctx in svcs { + env.extend(svc_ctx.env(&run_context).await.unwrap()); + secret_env.extend(svc_ctx.secret_env(&run_context).await.unwrap()); + } + + let env = dep::k8s::gen::generate_k8s_variables() + .into_iter() + .chain( + env.into_iter() + .map(|(k, v)| json!({ "name": k, "value": v })), + ) + .collect::>(); + + // Create secret env vars + let secret_env_name = format!("{}-secret-env", k8s_svc_name); + let secret_data = secret_env + .into_iter() + .map(|(k, v)| (k, base64::encode(v))) + .collect::>(); + specs.push(json!({ + "apiVersion": "v1", + "kind": "Secret", + "metadata": { + "name": secret_env_name, + "namespace": "rivet-service" + }, + "data": secret_data + })); + + let (volumes, volume_mounts) = build_volumes(&ctx, run_context, svcs).await; + + let metadata = json!({ + "name": k8s_svc_name, + "namespace": "rivet-service", + "labels": { + "app.kubernetes.io/name": k8s_svc_name + } + }); + + let pod_spec = json!({ + "restartPolicy": "Never", + "terminationGracePeriodSeconds": 0, + "imagePullSecrets": [{ + "name": "docker-auth" + }], + "containers": [{ + "name": "service", + "image": "ghcr.io/rivet-gg/rivet-local-binary-artifact-runner:07e8de0", + "imagePullPolicy": "IfNotPresent", + "command": ["/bin/sh"], + "args": ["-c", "sleep 100000"], + "env": env, + "envFrom": [{ + "secretRef": { + "name": secret_env_name + } + }], + "volumeMounts": volume_mounts, + }], + "volumes": volumes + }); + let pod_template = json!({ + "metadata": { + "labels": { + "app.kubernetes.io/name": k8s_svc_name + }, + }, + "spec": pod_spec, + }); + + specs.push(json!({ + "apiVersion": "batch/v1", + "kind": "Job", + "metadata": metadata, + "spec": { + "ttlSecondsAfterFinished": 3, + "completions": 1, + "backoffLimit": 0, + "template": pod_template + } + })); + + specs +} + +pub async fn build_volumes( + project_ctx: &ProjectContext, + run_context: &RunContext, + svcs: &[&ServiceContext], +) -> (Vec, Vec) { + // Shared data between containers + let mut volumes = Vec::::new(); + let mut volume_mounts = Vec::::new(); + + // Volumes + volumes.push(json!({ + "name": "rivet-src", + "hostPath": { + "path": "/rivet-src", + "type": "Directory" + } + })); + volumes.push(json!({ + "name": "nix-store", + "hostPath": { + "path": "/nix/store", + "type": "Directory" + } + })); + + // Mounts + volume_mounts.push(json!({ + "name": "rivet-src", + "mountPath": "/rivet-src", + "readOnly": true + })); + volume_mounts.push(json!({ + "name": "nix-store", + "mountPath": "/nix/store", + "readOnly": true + })); + + // Add Redis CA + match project_ctx.ns().redis.provider { + ns::RedisProvider::Kubernetes {} => { + let mut redis_deps = IndexSet::with_capacity(2); + + for svc in svcs { + let svc_redis_deps = + svc.redis_dependencies(run_context) + .await + 
.into_iter() + .map(|redis_dep| { + if let RuntimeKind::Redis { persistent } = redis_dep.config().runtime { + if persistent { + "persistent" + } else { + "ephemeral" + } + } else { + unreachable!(); + } + }); + + redis_deps.extend(svc_redis_deps); + } + + volumes.extend(redis_deps.iter().map(|db| { + json!({ + "name": format!("redis-{}-ca", db), + "configMap": { + "name": format!("redis-{}-ca", db), + "defaultMode": 420, + "items": [ + { + "key": "ca.crt", + "path": format!("redis-{}-ca.crt", db) + } + ] + } + }) + })); + volume_mounts.extend(redis_deps.iter().map(|db| { + json!({ + "name": format!("redis-{}-ca", db), + "mountPath": format!("/usr/local/share/ca-certificates/redis-{}-ca.crt", db), + "subPath": format!("redis-{}-ca.crt", db) + }) + })); + } + ns::RedisProvider::Aws { .. } | ns::RedisProvider::Aiven { .. } => { + // Uses publicly signed cert + } + } + + // Add CRDB CA + match project_ctx.ns().cockroachdb.provider { + ns::CockroachDBProvider::Kubernetes {} => { + volumes.push(json!({ + "name": "crdb-ca", + "configMap": { + "name": "crdb-ca", + "defaultMode": 420, + "items": [ + { + "key": "ca.crt", + "path": "crdb-ca.crt" + } + ] + } + })); + volume_mounts.push(json!({ + "name": "crdb-ca", + "mountPath": "/usr/local/share/ca-certificates/crdb-ca.crt", + "subPath": "crdb-ca.crt" + })); + } + ns::CockroachDBProvider::Managed { .. } => { + // Uses publicly signed cert + } + } + + // Add ClickHouse CA + match project_ctx.ns().clickhouse.provider { + ns::ClickHouseProvider::Kubernetes {} => { + volumes.push(json!({ + "name": "clickhouse-ca", + "configMap": { + "name": "clickhouse-ca", + "defaultMode": 420, + "items": [ + { + "key": "ca.crt", + "path": "clickhouse-ca.crt" + } + ] + } + })); + volume_mounts.push(json!({ + "name": "clickhouse-ca", + "mountPath": "/usr/local/share/ca-certificates/clickhouse-ca.crt", + "subPath": "clickhouse-ca.crt" + })); + } + ns::ClickHouseProvider::Managed { .. } => { + // Uses publicly signed cert + } + } + + (volumes, volume_mounts) +}
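For reference, the Job manifest that gen_spec() renders for a test suite comes out roughly as below. This is a sketch assembled from the json! literals in this patch: the test-abc123 suite id is illustrative, the merged "env" entries from generate_k8s_variables() and the services' env() are elided, provider-specific CA volumes (Redis/CRDB/ClickHouse) are omitted, and the spec is applied through dep::k8s::cli::apply_specs rather than kubectl apply.

{
  "apiVersion": "batch/v1",
  "kind": "Job",
  "metadata": {
    "name": "test-abc123",
    "namespace": "rivet-service",
    "labels": { "app.kubernetes.io/name": "test-abc123" }
  },
  "spec": {
    "ttlSecondsAfterFinished": 3,
    "completions": 1,
    "backoffLimit": 0,
    "template": {
      "metadata": { "labels": { "app.kubernetes.io/name": "test-abc123" } },
      "spec": {
        "restartPolicy": "Never",
        "terminationGracePeriodSeconds": 0,
        "imagePullSecrets": [{ "name": "docker-auth" }],
        "containers": [{
          "name": "service",
          "image": "ghcr.io/rivet-gg/rivet-local-binary-artifact-runner:07e8de0",
          "imagePullPolicy": "IfNotPresent",
          "command": ["/bin/sh"],
          "args": ["-c", "sleep 100000"],
          "envFrom": [{ "secretRef": { "name": "test-abc123-secret-env" } }],
          "volumeMounts": [
            { "name": "rivet-src", "mountPath": "/rivet-src", "readOnly": true },
            { "name": "nix-store", "mountPath": "/nix/store", "readOnly": true }
          ]
        }],
        "volumes": [
          { "name": "rivet-src", "hostPath": { "path": "/rivet-src", "type": "Directory" } },
          { "name": "nix-store", "hostPath": { "path": "/nix/store", "type": "Directory" } }
        ]
      }
    }
  }
}

Each test binary then runs inside this one long-lived pod via kubectl exec against job/test-abc123, with RIVET_TEST_ID set per test and --exact filtering the test name, so the whole suite shares a single warm pod instead of scheduling a separate job per test.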