diff --git a/shotover-proxy/benches/helpers.rs b/shotover-proxy/benches/helpers.rs
index dc5fd9307..af7b4b777 100644
--- a/shotover-proxy/benches/helpers.rs
+++ b/shotover-proxy/benches/helpers.rs
@@ -1,18 +1,48 @@
 use anyhow::Result;
 use shotover_proxy::runner::{ConfigOpts, Runner};
 use tokio::runtime::Runtime;
+use tokio::sync::broadcast;
 use tokio::task::JoinHandle;
 
-pub fn run_shotover_with_topology(topology_path: &str) -> (Runtime, JoinHandle<Result<()>>) {
-    let opts = ConfigOpts {
-        topology_file: topology_path.into(),
-        config_file: "config/config.yaml".into(),
-        ..ConfigOpts::default()
-    };
-    let spawn = Runner::new(opts).unwrap().run_spawn();
+pub struct ShotoverManager {
+    pub runtime: Runtime,
+    pub handle: Option<JoinHandle<Result<()>>>,
+    pub trigger_shutdown_tx: broadcast::Sender<()>,
+}
+
+impl ShotoverManager {
+    pub fn from_topology_file(topology_path: &str) -> ShotoverManager {
+        let opts = ConfigOpts {
+            topology_file: topology_path.into(),
+            config_file: "config/config.yaml".into(),
+            ..ConfigOpts::default()
+        };
+        let spawn = Runner::new(opts).unwrap().run_spawn();
+
+        // If we allow the tracing_guard to be dropped then the following tests in the same file will not get tracing, so we mem::forget it.
+        // This is because tracing can only be initialized once in the same execution; secondary attempts to initialize tracing will silently fail.
+        std::mem::forget(spawn.tracing_guard);
+
+        ShotoverManager {
+            runtime: spawn.runtime,
+            handle: Some(spawn.handle),
+            trigger_shutdown_tx: spawn.trigger_shutdown_tx,
+        }
+    }
+}
 
-    // If we allow the tracing_guard to be dropped then the following tests in the same file will not get tracing so we mem::forget it.
-    // This is because tracing can only be initialized once in the same execution, secondary attempts to initalize tracing will silently fail.
-    std::mem::forget(spawn.tracing_guard);
-    (spawn.runtime, spawn.handle)
+impl Drop for ShotoverManager {
+    fn drop(&mut self) {
+        if std::thread::panicking() {
+            // If we are already panicking, do nothing in order to avoid a double panic.
+            // We only shut down shotover to test the shutdown process, not because we need to clean up any resources,
+            // so skipping shutdown on panic is fine.
+        } else {
+            self.trigger_shutdown_tx.send(()).unwrap();
+            self.runtime
+                .block_on(self.handle.take().unwrap())
+                .unwrap()
+                .unwrap();
+        }
+    }
 }
diff --git a/shotover-proxy/benches/redis_benches.rs b/shotover-proxy/benches/redis_benches.rs
index 36d131564..5c0ce9e82 100644
--- a/shotover-proxy/benches/redis_benches.rs
+++ b/shotover-proxy/benches/redis_benches.rs
@@ -4,11 +4,12 @@ use std::time::Duration;
 use test_helpers::docker_compose::DockerCompose;
 
 mod helpers;
-use helpers::run_shotover_with_topology;
+use helpers::ShotoverManager;
 
 fn redis_active_bench(c: &mut Criterion) {
     let _compose = DockerCompose::new("examples/redis-multi/docker-compose.yml");
-    let _running = run_shotover_with_topology("examples/redis-multi/topology.yaml");
+    let _shotover_manager =
+        ShotoverManager::from_topology_file("examples/redis-multi/topology.yaml");
 
     let client = redis::Client::open("redis://127.0.0.1:6379/").unwrap();
     let mut con;
@@ -40,7 +41,8 @@ fn redis_active_bench(c: &mut Criterion) {
 
 fn redis_cluster_bench(c: &mut Criterion) {
     let _compose = DockerCompose::new("examples/redis-cluster/docker-compose.yml");
-    let _running = run_shotover_with_topology("examples/redis-cluster/topology.yaml");
+    let _shotover_manager =
+        ShotoverManager::from_topology_file("examples/redis-cluster/topology.yaml");
 
     let client = redis::Client::open("redis://127.0.0.1:6379/").unwrap();
     let mut con;
@@ -72,7 +74,8 @@ fn redis_cluster_bench(c: &mut Criterion) {
 
 fn redis_passthrough_bench(c: &mut Criterion) {
     let _compose = DockerCompose::new("examples/redis-passthrough/docker-compose.yml");
-    let _running = run_shotover_with_topology("examples/redis-passthrough/topology.yaml");
+    let _shotover_manager =
+        ShotoverManager::from_topology_file("examples/redis-passthrough/topology.yaml");
 
     let client = redis::Client::open("redis://127.0.0.1:6379/").unwrap();
     let mut con;
diff --git a/shotover-proxy/src/config/topology.rs b/shotover-proxy/src/config/topology.rs
index 7b6edd44d..fe8d79bc4 100644
--- a/shotover-proxy/src/config/topology.rs
+++ b/shotover-proxy/src/config/topology.rs
@@ -105,13 +105,15 @@ impl Topology {
     }
 
     #[allow(clippy::type_complexity)]
-    pub async fn run_chains(&self) -> Result<(Vec<Sources>, Receiver<()>)> {
+    pub async fn run_chains(
+        &self,
+        trigger_shutdown_tx: broadcast::Sender<()>,
+    ) -> Result<(Vec<Sources>, Receiver<()>)> {
         let mut topics = self.build_topics();
         info!("Loaded topics {:?}", topics.topics_tx.keys());
 
         let mut sources_list: Vec<Sources> = Vec::new();
 
-        let (notify_shutdown, _) = broadcast::channel(1);
         let (shutdown_complete_tx, shutdown_complete_rx) = mpsc::channel(1);
 
         let chains = self.build_chains(&topics).await?;
@@ -125,7 +127,7 @@ impl Topology {
                     .get_source(
                         chain,
                         &mut topics,
-                        notify_shutdown.clone(),
+                        trigger_shutdown_tx.clone(),
                         shutdown_complete_tx.clone(),
                     )
                     .await?,
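The shape being threaded through `run_chains` here is the standard tokio two-channel graceful-shutdown idiom (the same one tokio's mini-redis example documents): a `broadcast::Sender<()>` fans the shutdown trigger out to every task, and an `mpsc` channel whose closure reports completion back. A minimal self-contained sketch of the idiom; `worker` is a hypothetical stand-in for a shotover source, not part of this diff:

```rust
use tokio::sync::{broadcast, mpsc};

async fn worker(
    mut trigger_shutdown_rx: broadcast::Receiver<()>,
    _shutdown_complete_tx: mpsc::Sender<()>,
) {
    // Run until the shutdown trigger fires. The mpsc sender is dropped on
    // return, which is what tells the coordinator this task has finished.
    let _ = trigger_shutdown_rx.recv().await;
}

#[tokio::main]
async fn main() {
    let (trigger_shutdown_tx, _) = broadcast::channel(1);
    let (shutdown_complete_tx, mut shutdown_complete_rx) = mpsc::channel::<()>(1);

    for _ in 0..3 {
        tokio::spawn(worker(
            trigger_shutdown_tx.subscribe(),
            shutdown_complete_tx.clone(),
        ));
    }
    drop(shutdown_complete_tx); // keep only the workers' clones alive

    trigger_shutdown_tx.send(()).unwrap(); // broadcast the shutdown trigger
    shutdown_complete_rx.recv().await; // resolves once every worker has exited
}
```

Moving the creation of the broadcast channel out of `run_chains` and up to the caller is what lets tests own the trigger instead of relying on a process signal.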
diff --git a/shotover-proxy/src/runner.rs b/shotover-proxy/src/runner.rs
index 75de1df37..1058909fb 100644
--- a/shotover-proxy/src/runner.rs
+++ b/shotover-proxy/src/runner.rs
@@ -4,8 +4,10 @@ use anyhow::{anyhow, Result};
 use clap::{crate_version, Clap};
 use metrics_runtime::Receiver;
 use tokio::runtime::{self, Runtime};
+use tokio::signal;
+use tokio::sync::broadcast;
 use tokio::task::JoinHandle;
-use tracing::{debug, info};
+use tracing::{debug, error, info};
 use tracing_appender::non_blocking::{NonBlocking, WorkerGuard};
 use tracing_subscriber::fmt::format::{DefaultFields, Format};
 use tracing_subscriber::fmt::Layer;
@@ -91,17 +93,30 @@ impl Runner {
     }
 
     pub fn run_spawn(self) -> RunnerSpawned {
-        let handle = self.runtime.spawn(run(self.topology, self.config));
+        let (trigger_shutdown_tx, _) = broadcast::channel(1);
+        let handle =
+            self.runtime
+                .spawn(run(self.topology, self.config, trigger_shutdown_tx.clone()));
 
         RunnerSpawned {
             runtime: self.runtime,
             tracing_guard: self.tracing.guard,
+            trigger_shutdown_tx,
             handle,
         }
     }
 
     pub fn run_block(self) -> Result<()> {
-        self.runtime.block_on(run(self.topology, self.config))
+        let (trigger_shutdown_tx, _) = broadcast::channel(1);
+
+        let trigger_shutdown_tx_clone = trigger_shutdown_tx.clone();
+        self.runtime.spawn(async move {
+            signal::ctrl_c().await.unwrap();
+            trigger_shutdown_tx_clone.send(()).unwrap();
+        });
+
+        self.runtime
+            .block_on(run(self.topology, self.config, trigger_shutdown_tx))
     }
 }
@@ -134,9 +149,14 @@ pub struct RunnerSpawned {
     pub runtime: Runtime,
     pub handle: JoinHandle<Result<()>>,
     pub tracing_guard: WorkerGuard,
+    pub trigger_shutdown_tx: broadcast::Sender<()>,
 }
 
-pub async fn run(topology: Topology, config: Config) -> Result<()> {
+pub async fn run(
+    topology: Topology,
+    config: Config,
+    trigger_shutdown_tx: broadcast::Sender<()>,
+) -> Result<()> {
     info!("Starting Shotover {}", crate_version!());
     info!(configuration = ?config);
     info!(topology = ?topology);
@@ -151,12 +171,17 @@ pub async fn run(topology: Topology, config: Config) -> Result<()> {
         std::mem::size_of::>()
     );
 
-    match topology.run_chains().await {
+    match topology.run_chains(trigger_shutdown_tx).await {
         Ok((_, mut shutdown_complete_rx)) => {
-            let _ = shutdown_complete_rx.recv().await;
-            info!("Goodbye!");
+            shutdown_complete_rx.recv().await;
+            info!("Shotover was shut down cleanly.");
             Ok(())
         }
-        Err(error) => Err(anyhow!("Failed to run chains: {}", error)),
+        Err(error) => {
+            error!("{:?}", error);
+            Err(anyhow!(
+                "Shotover failed to initialize, the fatal error was logged."
+            ))
+        }
     }
 }
diff --git a/shotover-proxy/src/server.rs b/shotover-proxy/src/server.rs
index dcd6c5b0e..110d9bca3 100644
--- a/shotover-proxy/src/server.rs
+++ b/shotover-proxy/src/server.rs
@@ -64,7 +64,7 @@ pub struct TcpCodecListener {
     /// handle. When a graceful shutdown is initiated, a `()` value is sent via
     /// the broadcast::Sender. Each active connection receives it, reaches a
     /// safe terminal state, and completes the task.
-    pub notify_shutdown: broadcast::Sender<()>,
+    pub trigger_shutdown_tx: broadcast::Sender<()>,
 
     /// Used as part of the graceful shutdown process to wait for client
     /// connections to complete processing.
@@ -184,7 +184,7 @@ impl TcpCodecListener {
                 limit_connections: self.limit_connections.clone(),
 
                 // Receive shutdown notifications.
-                shutdown: Shutdown::new(self.notify_shutdown.subscribe()),
+                shutdown: Shutdown::new(self.trigger_shutdown_tx.subscribe()),
 
                 // Notifies the receiver half once all clones are
                 // dropped.
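`run_block` now owns Ctrl-C handling instead of each source listening for the signal itself. A sketch of the same wiring in isolation (assumes tokio's `rt-multi-thread` and `signal` features; the closure over the real `run(topology, config, ...)` future is elided):

```rust
use tokio::runtime::Runtime;
use tokio::sync::broadcast;

fn main() {
    let runtime = Runtime::new().unwrap();
    let (trigger_shutdown_tx, mut trigger_shutdown_rx) = broadcast::channel::<()>(1);

    // Forward Ctrl-C to the broadcast channel; every subscriber sees the trigger.
    let tx = trigger_shutdown_tx.clone();
    runtime.spawn(async move {
        tokio::signal::ctrl_c().await.unwrap();
        tx.send(()).unwrap();
    });

    // Stand-in for blocking on `run(...)`: wait until the trigger fires,
    // then fall through to cleanup.
    runtime.block_on(async move {
        let _ = trigger_shutdown_rx.recv().await;
    });
}
```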
diff --git a/shotover-proxy/src/sources/cassandra_source.rs b/shotover-proxy/src/sources/cassandra_source.rs
index e54388cff..680148c7e 100644
--- a/shotover-proxy/src/sources/cassandra_source.rs
+++ b/shotover-proxy/src/sources/cassandra_source.rs
@@ -30,7 +30,7 @@ impl SourcesFromConfig for CassandraConfig {
         &self,
         chain: &TransformChain,
         _topics: &mut TopicHolder,
-        notify_shutdown: broadcast::Sender<()>,
+        trigger_shutdown_tx: broadcast::Sender<()>,
         shutdown_complete_tx: mpsc::Sender<()>,
     ) -> Result<Vec<Sources>> {
         Ok(vec![Sources::Cassandra(
@@ -38,7 +38,7 @@ impl SourcesFromConfig for CassandraConfig {
                 chain,
                 self.listen_addr.clone(),
                 self.cassandra_ks.clone(),
-                notify_shutdown,
+                trigger_shutdown_tx,
                 shutdown_complete_tx,
                 self.bypass_query_processing.unwrap_or(true),
                 self.connection_limit,
@@ -61,7 +61,7 @@ impl CassandraSource {
         chain: &TransformChain,
         listen_addr: String,
         cassandra_ks: HashMap<String, Vec<String>>,
-        notify_shutdown: broadcast::Sender<()>,
+        trigger_shutdown_tx: broadcast::Sender<()>,
         shutdown_complete_tx: mpsc::Sender<()>,
         bypass: bool,
         connection_limit: Option<usize>,
@@ -71,6 +71,8 @@ impl CassandraSource {
 
         info!("Starting Cassandra source on [{}]", listen_addr);
 
+        let mut trigger_shutdown_rx = trigger_shutdown_tx.subscribe();
+
         let mut listener = TcpCodecListener {
             chain: chain.clone(),
             source_name: name.to_string(),
@@ -79,7 +81,7 @@ impl CassandraSource {
             hard_connection_limit: hard_connection_limit.unwrap_or(false),
             codec: CassandraCodec2::new(cassandra_ks, bypass),
             limit_connections: Arc::new(Semaphore::new(connection_limit.unwrap_or(512))),
-            notify_shutdown,
+            trigger_shutdown_tx,
             shutdown_complete_tx,
         };
 
@@ -90,19 +92,19 @@ impl CassandraSource {
                     error!(cause = %err, "failed to accept");
                 }
             }
-            _ = tokio::signal::ctrl_c() => {
-                info!("Shutdown signal received - shutting down")
+            _ = trigger_shutdown_rx.recv() => {
+                info!("Cassandra source shutting down")
             }
         }
 
         let TcpCodecListener {
-            notify_shutdown,
+            trigger_shutdown_tx,
             shutdown_complete_tx,
             ..
         } = listener;
 
         drop(shutdown_complete_tx);
-        drop(notify_shutdown);
+        drop(trigger_shutdown_tx);
 
         Ok(())
     });
diff --git a/shotover-proxy/src/sources/mod.rs b/shotover-proxy/src/sources/mod.rs
index 8c82e9e2e..f0fffe861 100644
--- a/shotover-proxy/src/sources/mod.rs
+++ b/shotover-proxy/src/sources/mod.rs
@@ -62,20 +62,20 @@ impl SourcesConfig {
         &self,
         chain: &TransformChain,
         topics: &mut TopicHolder,
-        notify_shutdown: broadcast::Sender<()>,
+        trigger_shutdown_tx: broadcast::Sender<()>,
         shutdown_complete_tx: mpsc::Sender<()>,
     ) -> Result<Vec<Sources>> {
         match self {
             SourcesConfig::Cassandra(c) => {
-                c.get_source(chain, topics, notify_shutdown, shutdown_complete_tx)
+                c.get_source(chain, topics, trigger_shutdown_tx, shutdown_complete_tx)
                     .await
             }
             SourcesConfig::Mpsc(m) => {
-                m.get_source(chain, topics, notify_shutdown, shutdown_complete_tx)
+                m.get_source(chain, topics, trigger_shutdown_tx, shutdown_complete_tx)
                     .await
             }
             SourcesConfig::Redis(r) => {
-                r.get_source(chain, topics, notify_shutdown, shutdown_complete_tx)
+                r.get_source(chain, topics, trigger_shutdown_tx, shutdown_complete_tx)
                     .await
             }
         }
@@ -88,7 +88,7 @@ pub trait SourcesFromConfig: Send {
         &self,
         chain: &TransformChain,
         topics: &mut TopicHolder,
-        notify_shutdown: broadcast::Sender<()>,
+        trigger_shutdown_tx: broadcast::Sender<()>,
         shutdown_complete_tx: mpsc::Sender<()>,
     ) -> Result<Vec<Sources>>;
 }
diff --git a/shotover-proxy/src/sources/mpsc_source.rs b/shotover-proxy/src/sources/mpsc_source.rs
index eeb6f7add..62cb64417 100644
--- a/shotover-proxy/src/sources/mpsc_source.rs
+++ b/shotover-proxy/src/sources/mpsc_source.rs
@@ -29,7 +29,7 @@ impl SourcesFromConfig for AsyncMpscConfig {
         &self,
         chain: &TransformChain,
         topics: &mut TopicHolder,
-        notify_shutdown: broadcast::Sender<()>,
+        trigger_shutdown_tx: broadcast::Sender<()>,
         shutdown_complete_tx: mpsc::Sender<()>,
     ) -> Result<Vec<Sources>> {
         if let Some(rx) = topics.get_rx(&self.topic_name) {
@@ -41,7 +41,7 @@ impl SourcesFromConfig for AsyncMpscConfig {
                 chain.clone(),
                 rx,
                 &self.topic_name,
-                Shutdown::new(notify_shutdown.subscribe()),
+                Shutdown::new(trigger_shutdown_tx.subscribe()),
                 shutdown_complete_tx,
                 behavior.clone(),
             ))])
diff --git a/shotover-proxy/src/sources/redis_source.rs b/shotover-proxy/src/sources/redis_source.rs
index 8ac4938d3..366d57337 100644
--- a/shotover-proxy/src/sources/redis_source.rs
+++ b/shotover-proxy/src/sources/redis_source.rs
@@ -28,7 +28,7 @@ impl SourcesFromConfig for RedisConfig {
         &self,
         chain: &TransformChain,
         _topics: &mut TopicHolder,
-        notify_shutdown: broadcast::Sender<()>,
+        trigger_shutdown_tx: broadcast::Sender<()>,
         shutdown_complete_tx: mpsc::Sender<()>,
     ) -> Result<Vec<Sources>> {
         Ok(vec![Sources::Redis(
@@ -36,7 +36,7 @@ impl SourcesFromConfig for RedisConfig {
             chain,
             self.listen_addr.clone(),
             self.batch_size_hint,
-            notify_shutdown,
+            trigger_shutdown_tx,
             shutdown_complete_tx,
             self.connection_limit,
             self.hard_connection_limit,
@@ -58,7 +58,7 @@ impl RedisSource {
         chain: &TransformChain,
         listen_addr: String,
         batch_hint: u64,
-        notify_shutdown: broadcast::Sender<()>,
+        trigger_shutdown_tx: broadcast::Sender<()>,
         shutdown_complete_tx: mpsc::Sender<()>,
         connection_limit: Option<usize>,
         hard_connection_limit: Option<bool>,
@@ -66,6 +66,8 @@ impl RedisSource {
         info!("Starting Redis source on [{}]", listen_addr);
         let name = "Redis Source";
 
+        let mut trigger_shutdown_rx = trigger_shutdown_tx.subscribe();
+
         let mut listener = TcpCodecListener {
             chain: chain.clone(),
             source_name: name.to_string(),
@@ -74,7 +76,7 @@ impl RedisSource {
             hard_connection_limit: hard_connection_limit.unwrap_or(false),
             codec: RedisCodec::new(false, batch_hint as usize),
             limit_connections: Arc::new(Semaphore::new(connection_limit.unwrap_or(512))),
-            notify_shutdown,
+            trigger_shutdown_tx,
             shutdown_complete_tx,
         };
 
@@ -85,19 +87,19 @@ impl RedisSource {
                     error!(cause = %err, "failed to accept");
                 }
             }
-            _ = tokio::signal::ctrl_c() => {
-                info!("Shutdown signal received - shutting down")
+            _ = trigger_shutdown_rx.recv() => {
+                info!("Redis source shutting down")
             }
         }
 
         let TcpCodecListener {
-            notify_shutdown,
+            trigger_shutdown_tx,
             shutdown_complete_tx,
             ..
         } = listener;
 
         drop(shutdown_complete_tx);
-        drop(notify_shutdown);
+        drop(trigger_shutdown_tx);
 
         Ok(())
     });
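Both source accept-loops now select on the broadcast receiver instead of `tokio::signal::ctrl_c()`, which is what makes shutdown triggerable from tests rather than only from a process signal. A simplified sketch of the loop, with a bare `TcpListener` standing in for the real `TcpCodecListener`:

```rust
use tokio::net::TcpListener;
use tokio::sync::broadcast;

async fn serve(listener: TcpListener, mut trigger_shutdown_rx: broadcast::Receiver<()>) {
    loop {
        tokio::select! {
            // Keep accepting connections until the shutdown trigger fires.
            result = listener.accept() => {
                match result {
                    Ok((_socket, addr)) => println!("accepted connection from {}", addr),
                    Err(err) => eprintln!("failed to accept: {}", err),
                }
            }
            _ = trigger_shutdown_rx.recv() => {
                println!("source shutting down");
                return;
            }
        }
    }
}

#[tokio::main]
async fn main() {
    let (trigger_shutdown_tx, trigger_shutdown_rx) = broadcast::channel(1);
    let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
    let server = tokio::spawn(serve(listener, trigger_shutdown_rx));

    trigger_shutdown_tx.send(()).unwrap();
    server.await.unwrap();
}
```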
diff --git a/shotover-proxy/tests/helpers/mod.rs b/shotover-proxy/tests/helpers/mod.rs
index dc5fd9307..af7b4b777 100644
--- a/shotover-proxy/tests/helpers/mod.rs
+++ b/shotover-proxy/tests/helpers/mod.rs
@@ -1,18 +1,48 @@
 use anyhow::Result;
 use shotover_proxy::runner::{ConfigOpts, Runner};
 use tokio::runtime::Runtime;
+use tokio::sync::broadcast;
 use tokio::task::JoinHandle;
 
-pub fn run_shotover_with_topology(topology_path: &str) -> (Runtime, JoinHandle<Result<()>>) {
-    let opts = ConfigOpts {
-        topology_file: topology_path.into(),
-        config_file: "config/config.yaml".into(),
-        ..ConfigOpts::default()
-    };
-    let spawn = Runner::new(opts).unwrap().run_spawn();
+pub struct ShotoverManager {
+    pub runtime: Runtime,
+    pub handle: Option<JoinHandle<Result<()>>>,
+    pub trigger_shutdown_tx: broadcast::Sender<()>,
+}
+
+impl ShotoverManager {
+    pub fn from_topology_file(topology_path: &str) -> ShotoverManager {
+        let opts = ConfigOpts {
+            topology_file: topology_path.into(),
+            config_file: "config/config.yaml".into(),
+            ..ConfigOpts::default()
+        };
+        let spawn = Runner::new(opts).unwrap().run_spawn();
+
+        // If we allow the tracing_guard to be dropped then the following tests in the same file will not get tracing, so we mem::forget it.
+        // This is because tracing can only be initialized once in the same execution; secondary attempts to initialize tracing will silently fail.
+        std::mem::forget(spawn.tracing_guard);
+
+        ShotoverManager {
+            runtime: spawn.runtime,
+            handle: Some(spawn.handle),
+            trigger_shutdown_tx: spawn.trigger_shutdown_tx,
+        }
+    }
+}
 
-    // If we allow the tracing_guard to be dropped then the following tests in the same file will not get tracing so we mem::forget it.
-    // This is because tracing can only be initialized once in the same execution, secondary attempts to initalize tracing will silently fail.
-    std::mem::forget(spawn.tracing_guard);
-    (spawn.runtime, spawn.handle)
+impl Drop for ShotoverManager {
+    fn drop(&mut self) {
+        if std::thread::panicking() {
+            // If we are already panicking, do nothing in order to avoid a double panic.
+            // We only shut down shotover to test the shutdown process, not because we need to clean up any resources,
+            // so skipping shutdown on panic is fine.
+        } else {
+            self.trigger_shutdown_tx.send(()).unwrap();
+            self.runtime
+                .block_on(self.handle.take().unwrap())
+                .unwrap()
+                .unwrap();
+        }
+    }
 }
diff --git a/shotover-proxy/tests/redis_int_tests/basic_driver_tests.rs b/shotover-proxy/tests/redis_int_tests/basic_driver_tests.rs
index 75d99f58a..af52222fc 100644
--- a/shotover-proxy/tests/redis_int_tests/basic_driver_tests.rs
+++ b/shotover-proxy/tests/redis_int_tests/basic_driver_tests.rs
@@ -2,7 +2,7 @@
 
 use redis::{Commands, ErrorKind, RedisError, Value};
 
-use crate::helpers::run_shotover_with_topology;
+use crate::helpers::ShotoverManager;
 use crate::redis_int_tests::support::TestContext;
 use test_helpers::docker_compose::DockerCompose;
 
@@ -693,7 +693,9 @@ fn test_cluster_script() {
 #[serial(redis)]
 fn test_pass_through() {
     let _compose = DockerCompose::new("examples/redis-passthrough/docker-compose.yml");
-    let _running = run_shotover_with_topology("examples/redis-passthrough/topology.yaml");
+    let _shotover_manager =
+        ShotoverManager::from_topology_file("examples/redis-passthrough/topology.yaml");
+
     run_all();
 }
 
@@ -702,7 +704,9 @@ fn test_pass_through() {
 #[allow(dead_code)]
 fn test_pass_through_one() {
     let _compose = DockerCompose::new("examples/redis-passthrough/docker-compose.yml");
-    let _running = run_shotover_with_topology("examples/redis-passthrough/topology.yaml");
+    let _shotover_manager =
+        ShotoverManager::from_topology_file("examples/redis-passthrough/topology.yaml");
+
     test_real_transaction();
 }
 
@@ -710,7 +714,9 @@ fn test_pass_through_one() {
 #[serial(redis)]
 fn test_active_active_redis() {
     let _compose = DockerCompose::new("examples/redis-multi/docker-compose.yml");
-    let _running = run_shotover_with_topology("examples/redis-multi/topology.yaml");
+    let _shotover_manager =
+        ShotoverManager::from_topology_file("examples/redis-multi/topology.yaml");
+
     run_all_active_safe();
 }
 
@@ -718,7 +724,8 @@ fn test_active_active_redis() {
 #[serial(redis)]
 fn test_active_one_active_redis() {
     let _compose = DockerCompose::new("examples/redis-multi/docker-compose.yml");
-    let _running = run_shotover_with_topology("examples/redis-multi/topology.yaml");
+    let _shotover_manager =
+        ShotoverManager::from_topology_file("examples/redis-multi/topology.yaml");
 
     // test_args();
     test_cluster_basics();
@@ -731,7 +738,8 @@ fn test_active_one_active_redis() {
 #[serial(redis)]
 fn test_pass_redis_cluster_one() {
     let _compose = DockerCompose::new("examples/redis-cluster/docker-compose.yml");
-    let _running = run_shotover_with_topology("examples/redis-cluster/topology.yaml");
+    let _shotover_manager =
+        ShotoverManager::from_topology_file("examples/redis-cluster/topology.yaml");
 
     // test_args()test_args;
     test_pipeline_error(); //TODO: script does not seem to be loading in the server?
@@ -742,7 +750,8 @@ fn test_pass_redis_cluster_one() {
 // #[serial(redis)]
 fn _test_cluster_auth_redis() {
     let _compose = DockerCompose::new("examples/redis-cluster-auth/docker-compose.yml");
-    let _running = run_shotover_with_topology("examples/redis-cluster-auth/topology.yaml");
+    let _shotover_manager =
+        ShotoverManager::from_topology_file("examples/redis-cluster-auth/topology.yaml");
 
     let ctx = TestContext::new_auth();
     let mut con = ctx.connection();
@@ -797,7 +806,8 @@ fn _test_cluster_auth_redis() {
 #[serial(redis)]
 fn test_cluster_all_redis() {
     let _compose = DockerCompose::new("examples/redis-cluster/docker-compose.yml");
-    let _running = run_shotover_with_topology("examples/redis-cluster/topology.yaml");
+    let _shotover_manager =
+        ShotoverManager::from_topology_file("examples/redis-cluster/topology.yaml");
     // panic!("Loooks like we are getting some out of order issues with pipelined request");
     run_all_cluster_safe();
 }
@@ -806,7 +816,8 @@ fn test_cluster_all_redis() {
 #[serial(redis)]
 fn test_cluster_all_script_redis() {
     let _compose = DockerCompose::new("examples/redis-cluster/docker-compose.yml");
-    let _running = run_shotover_with_topology("examples/redis-cluster/topology.yaml");
+    let _shotover_manager =
+        ShotoverManager::from_topology_file("examples/redis-cluster/topology.yaml");
     // panic!("Loooks like we are getting some out of order issues with pipelined request");
     for _i in 0..1999 {
         test_script();
@@ -817,7 +828,8 @@ fn test_cluster_all_script_redis() {
 #[serial(redis)]
 fn test_cluster_all_pipeline_safe_redis() {
     let _compose = DockerCompose::new("examples/redis-cluster/docker-compose.yml");
-    let _running = run_shotover_with_topology("examples/redis-cluster/topology.yaml");
+    let _shotover_manager =
+        ShotoverManager::from_topology_file("examples/redis-cluster/topology.yaml");
 
     let ctx = TestContext::new();
     let mut con = ctx.connection();
diff --git a/shotover-proxy/tests/redis_presence_integration.rs b/shotover-proxy/tests/redis_presence_integration.rs
index 9b72e521c..d3d680daa 100644
--- a/shotover-proxy/tests/redis_presence_integration.rs
+++ b/shotover-proxy/tests/redis_presence_integration.rs
@@ -1,6 +1,6 @@
 mod helpers;
 
-use helpers::run_shotover_with_topology;
+use helpers::ShotoverManager;
 use redis::{Commands, Connection, RedisResult};
 use std::{thread, time};
 use tracing::info;
@@ -198,7 +198,8 @@ async fn test_simple_pipeline_workflow() {
 // #[tokio::test(threaded_scheduler)]
 #[allow(dead_code)]
 async fn run_all() {
-    let _running = run_shotover_with_topology("examples/redis-multi/topology.yaml");
+    let _manager = ShotoverManager::from_topology_file("examples/redis-multi/topology.yaml");
+
     thread::sleep(time::Duration::from_secs(2));
     test_simple_pipeline_workflow().await;
     test_presence_fresh_join_pipeline_workflow().await;
diff --git a/test-helpers/Cargo.toml b/test-helpers/Cargo.toml
index a940bce5e..b04071c75 100644
--- a/test-helpers/Cargo.toml
+++ b/test-helpers/Cargo.toml
@@ -8,3 +8,4 @@ edition = "2018"
 [dependencies]
 tracing = "0.1.15"
 subprocess = "0.2.7"
+anyhow = "1.0.42"
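Both new `Drop` impls in this diff guard on `std::thread::panicking()` for the same reason: panicking inside `drop` while the thread is already unwinding is a double panic, which aborts the process and swallows the original test failure. The pattern reduced to a sketch (`CleanupGuard` and its `clean_up` are illustrative names, not part of the diff):

```rust
use anyhow::Result;

struct CleanupGuard;

impl CleanupGuard {
    fn clean_up(&self) -> Result<()> {
        // A real implementation would tear down external resources here.
        Ok(())
    }
}

impl Drop for CleanupGuard {
    fn drop(&mut self) {
        if std::thread::panicking() {
            // Unwrapping here while already unwinding would abort the process
            // and hide the original failure, so log instead.
            if let Err(err) = self.clean_up() {
                println!("cleanup failed while already panicking: {:?}", err);
            }
        } else {
            // On the normal path a cleanup failure should fail the test loudly.
            self.clean_up().unwrap();
        }
    }
}

fn main() {
    let _guard = CleanupGuard;
    // If the body panicked, drop() would take the logging branch instead.
}
```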
diff --git a/test-helpers/src/docker_compose.rs b/test-helpers/src/docker_compose.rs
index f85ae6f0f..bc94c9e53 100644
--- a/test-helpers/src/docker_compose.rs
+++ b/test-helpers/src/docker_compose.rs
@@ -1,23 +1,26 @@
+use anyhow::{anyhow, Result};
 use std::thread;
 use std::time;
 use subprocess::{Exec, Redirection};
 use tracing::info;
 
-fn run_command(command: &str, args: &[&str]) {
+fn run_command(command: &str, args: &[&str]) -> Result<()> {
     let data = Exec::cmd(command)
         .args(args)
         .stdout(Redirection::Pipe)
         .stderr(Redirection::Merge)
-        .capture()
-        .unwrap();
-    if !data.exit_status.success() {
-        panic!(
+        .capture()?;
+
+    if data.exit_status.success() {
+        Ok(())
+    } else {
+        Err(anyhow!(
             "command {} {:?} exited with {:?} and output:\n{}",
             command,
             args,
             data.exit_status,
             data.stdout_str()
-        )
+        ))
     }
 }
 
@@ -27,11 +30,11 @@ pub struct DockerCompose {
 }
 
 impl DockerCompose {
     pub fn new(file_path: &str) -> Self {
-        DockerCompose::clean_up(file_path);
+        DockerCompose::clean_up(file_path).unwrap();
 
         info!("bringing up docker compose {}", file_path);
 
-        run_command("docker-compose", &["-f", file_path, "up", "-d"]);
+        run_command("docker-compose", &["-f", file_path, "up", "-d"]).unwrap();
 
         thread::sleep(time::Duration::from_secs(4));
 
@@ -40,18 +43,29 @@ impl DockerCompose {
         }
     }
 
-    fn clean_up(file_path: &str) {
+    fn clean_up(file_path: &str) -> Result<()> {
         info!("bringing down docker compose {}", file_path);
 
-        run_command("docker-compose", &["-f", file_path, "down", "-v"]);
-        run_command("docker-compose", &["-f", file_path, "rm", "-f", "-s", "-v"]);
+        run_command("docker-compose", &["-f", file_path, "down", "-v"])?;
+        run_command("docker-compose", &["-f", file_path, "rm", "-f", "-s", "-v"])?;
 
         thread::sleep(time::Duration::from_secs(1));
+
+        Ok(())
     }
 }
 
 impl Drop for DockerCompose {
     fn drop(&mut self) {
-        DockerCompose::clean_up(&self.file_path);
+        if std::thread::panicking() {
+            if let Err(err) = DockerCompose::clean_up(&self.file_path) {
+                println!(
+                    "ERROR: failed to bring down docker compose while already panicking: {:?}",
+                    err
+                );
+            }
+        } else {
+            DockerCompose::clean_up(&self.file_path).unwrap();
+        }
     }
 }
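One ordering detail for callers of the two guards: Rust drops locals in reverse declaration order, so declaring the `DockerCompose` before the `ShotoverManager` (as the tests above do) means shotover is shut down while its backing cluster is still running, and the cluster is torn down last. A sketch of the intended shape (test body elided):

```rust
fn test_shape() {
    // Created first, dropped last: the cluster outlives shotover's shutdown.
    let _compose = DockerCompose::new("examples/redis-passthrough/docker-compose.yml");
    // Created second, dropped first: shotover shuts down against a live cluster.
    let _shotover_manager =
        ShotoverManager::from_topology_file("examples/redis-passthrough/topology.yaml");

    // ... test body ...
} // drops: _shotover_manager, then _compose
```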