Tests trigger shotover shutdown and then unwrap the result of the join handle (#139)
rukai authored Aug 17, 2021
1 parent 00a8b04 commit e033bf7
Showing 14 changed files with 208 additions and 86 deletions.
52 changes: 41 additions & 11 deletions shotover-proxy/benches/helpers.rs
@@ -1,18 +1,48 @@
use anyhow::Result;
use shotover_proxy::runner::{ConfigOpts, Runner};
use tokio::runtime::Runtime;
use tokio::sync::broadcast;
use tokio::task::JoinHandle;

pub fn run_shotover_with_topology(topology_path: &str) -> (Runtime, JoinHandle<Result<()>>) {
    let opts = ConfigOpts {
        topology_file: topology_path.into(),
        config_file: "config/config.yaml".into(),
        ..ConfigOpts::default()
    };
    let spawn = Runner::new(opts).unwrap().run_spawn();
pub struct ShotoverManager {
    pub runtime: Runtime,
    pub handle: Option<JoinHandle<Result<()>>>,
    pub trigger_shutdown_tx: broadcast::Sender<()>,
}

impl ShotoverManager {
    pub fn from_topology_file(topology_path: &str) -> ShotoverManager {
        let opts = ConfigOpts {
            topology_file: topology_path.into(),
            config_file: "config/config.yaml".into(),
            ..ConfigOpts::default()
        };
        let spawn = Runner::new(opts).unwrap().run_spawn();

        // If we allow the tracing_guard to be dropped, then the following tests in the same file will not get tracing, so we mem::forget it.
        // This is because tracing can only be initialized once in the same execution; secondary attempts to initialize tracing will silently fail.
        std::mem::forget(spawn.tracing_guard);

        ShotoverManager {
            runtime: spawn.runtime,
            handle: Some(spawn.handle),
            trigger_shutdown_tx: spawn.trigger_shutdown_tx,
        }
    }
}

    // If we allow the tracing_guard to be dropped, then the following tests in the same file will not get tracing, so we mem::forget it.
    // This is because tracing can only be initialized once in the same execution; secondary attempts to initialize tracing will silently fail.
    std::mem::forget(spawn.tracing_guard);
    (spawn.runtime, spawn.handle)
impl Drop for ShotoverManager {
    fn drop(&mut self) {
        if std::thread::panicking() {
            // If already panicking, do nothing in order to avoid a double panic.
            // We only shut down shotover to test the shutdown process, not because we need to clean up any resources,
            // so skipping shutdown on panic is fine.
        } else {
            self.trigger_shutdown_tx.send(()).unwrap();
            self.runtime
                .block_on(self.handle.take().unwrap())
                .unwrap()
                .unwrap();
        }
    }
}
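The benches below consume this helper directly. As a quick orientation, a minimal sketch of a test that leans on the new Drop behavior might look like the following (the topology path is taken from the repository's examples; the test body is illustrative):

```rust
mod helpers;
use helpers::ShotoverManager;

#[test]
fn shotover_shuts_down_cleanly() {
    // Starting shotover hands back a manager that owns the runtime,
    // the join handle, and the shutdown trigger.
    let _shotover_manager =
        ShotoverManager::from_topology_file("examples/redis-passthrough/topology.yaml");

    // ... exercise shotover through its proxied endpoints here ...

    // On scope exit, Drop sends the shutdown signal, joins the shotover
    // task, and unwraps its Result, so a shutdown failure now fails the
    // test instead of being silently discarded.
}
```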
11 changes: 7 additions & 4 deletions shotover-proxy/benches/redis_benches.rs
@@ -4,11 +4,12 @@ use std::time::Duration;
use test_helpers::docker_compose::DockerCompose;

mod helpers;
use helpers::run_shotover_with_topology;
use helpers::ShotoverManager;

fn redis_active_bench(c: &mut Criterion) {
    let _compose = DockerCompose::new("examples/redis-multi/docker-compose.yml");
    let _running = run_shotover_with_topology("examples/redis-multi/topology.yaml");
    let _shotover_manager =
        ShotoverManager::from_topology_file("examples/redis-multi/topology.yaml");

    let client = redis::Client::open("redis://127.0.0.1:6379/").unwrap();
    let mut con;
@@ -40,7 +41,8 @@ fn redis_active_bench(c: &mut Criterion) {

fn redis_cluster_bench(c: &mut Criterion) {
    let _compose = DockerCompose::new("examples/redis-cluster/docker-compose.yml");
    let _running = run_shotover_with_topology("examples/redis-cluster/topology.yaml");
    let _shotover_manager =
        ShotoverManager::from_topology_file("examples/redis-cluster/topology.yaml");

    let client = redis::Client::open("redis://127.0.0.1:6379/").unwrap();
    let mut con;
@@ -72,7 +74,8 @@

fn redis_passthrough_bench(c: &mut Criterion) {
    let _compose = DockerCompose::new("examples/redis-passthrough/docker-compose.yml");
    let _running = run_shotover_with_topology("examples/redis-passthrough/topology.yaml");
    let _shotover_manager =
        ShotoverManager::from_topology_file("examples/redis-passthrough/topology.yaml");

    let client = redis::Client::open("redis://127.0.0.1:6379/").unwrap();
    let mut con;
8 changes: 5 additions & 3 deletions shotover-proxy/src/config/topology.rs
@@ -105,13 +105,15 @@ impl Topology {
    }

    #[allow(clippy::type_complexity)]
    pub async fn run_chains(&self) -> Result<(Vec<Sources>, Receiver<()>)> {
    pub async fn run_chains(
        &self,
        trigger_shutdown_rx: broadcast::Sender<()>,
    ) -> Result<(Vec<Sources>, Receiver<()>)> {
        let mut topics = self.build_topics();
        info!("Loaded topics {:?}", topics.topics_tx.keys());

        let mut sources_list: Vec<Sources> = Vec::new();

        let (notify_shutdown, _) = broadcast::channel(1);
        let (shutdown_complete_tx, shutdown_complete_rx) = mpsc::channel(1);

        let chains = self.build_chains(&topics).await?;
@@ -125,7 +127,7 @@ impl Topology {
                    .get_source(
                        chain,
                        &mut topics,
                        notify_shutdown.clone(),
                        trigger_shutdown_rx.clone(),
                        shutdown_complete_tx.clone(),
                    )
                    .await?,
41 changes: 33 additions & 8 deletions shotover-proxy/src/runner.rs
@@ -4,8 +4,10 @@ use anyhow::{anyhow, Result};
use clap::{crate_version, Clap};
use metrics_runtime::Receiver;
use tokio::runtime::{self, Runtime};
use tokio::signal;
use tokio::sync::broadcast;
use tokio::task::JoinHandle;
use tracing::{debug, info};
use tracing::{debug, error, info};
use tracing_appender::non_blocking::{NonBlocking, WorkerGuard};
use tracing_subscriber::fmt::format::{DefaultFields, Format};
use tracing_subscriber::fmt::Layer;
@@ -91,17 +93,30 @@ impl Runner {
    }

    pub fn run_spawn(self) -> RunnerSpawned {
        let handle = self.runtime.spawn(run(self.topology, self.config));
        let (trigger_shutdown_tx, _) = broadcast::channel(1);
        let handle =
            self.runtime
                .spawn(run(self.topology, self.config, trigger_shutdown_tx.clone()));

        RunnerSpawned {
            runtime: self.runtime,
            tracing_guard: self.tracing.guard,
            trigger_shutdown_tx,
            handle,
        }
    }

    pub fn run_block(self) -> Result<()> {
        self.runtime.block_on(run(self.topology, self.config))
        let (trigger_shutdown_tx, _) = broadcast::channel(1);

        let trigger_shutdown_tx_clone = trigger_shutdown_tx.clone();
        self.runtime.spawn(async move {
            signal::ctrl_c().await.unwrap();
            trigger_shutdown_tx_clone.send(()).unwrap();
        });

        self.runtime
            .block_on(run(self.topology, self.config, trigger_shutdown_tx))
    }
}

@@ -134,9 +149,14 @@ pub struct RunnerSpawned {
    pub runtime: Runtime,
    pub handle: JoinHandle<Result<()>>,
    pub tracing_guard: WorkerGuard,
    pub trigger_shutdown_tx: broadcast::Sender<()>,
}

pub async fn run(topology: Topology, config: Config) -> Result<()> {
pub async fn run(
    topology: Topology,
    config: Config,
    trigger_shutdown_tx: broadcast::Sender<()>,
) -> Result<()> {
    info!("Starting Shotover {}", crate_version!());
    info!(configuration = ?config);
    info!(topology = ?topology);
@@ -151,12 +171,17 @@ pub async fn run(topology: Topology, config: Config) -> Result<()> {
        std::mem::size_of::<Wrapper<'_>>()
    );

    match topology.run_chains().await {
    match topology.run_chains(trigger_shutdown_tx).await {
        Ok((_, mut shutdown_complete_rx)) => {
            let _ = shutdown_complete_rx.recv().await;
            info!("Goodbye!");
            shutdown_complete_rx.recv().await;
            info!("Shotover was shutdown cleanly.");
            Ok(())
        }
        Err(error) => Err(anyhow!("Failed to run chains: {}", error)),
        Err(error) => {
            error!("{:?}", error);
            Err(anyhow!(
                "Shotover failed to initialize, the fatal error was logged."
            ))
        }
    }
}
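run_block now wires ctrl_c into the same broadcast channel that run_spawn hands out to tests. The underlying tokio pattern, reduced to its essentials, looks roughly like this (a self-contained sketch; the names are illustrative, not shotover's API):

```rust
use tokio::sync::broadcast;

#[tokio::main]
async fn main() {
    // One sender; each task that must react to shutdown subscribes.
    let (trigger_shutdown_tx, _) = broadcast::channel::<()>(1);

    let mut shutdown_rx = trigger_shutdown_tx.subscribe();
    let worker = tokio::spawn(async move {
        tokio::select! {
            _ = do_work() => {}
            // A single send() fans out to every subscriber.
            _ = shutdown_rx.recv() => println!("worker shutting down"),
        }
    });

    trigger_shutdown_tx.send(()).unwrap();
    worker.await.unwrap();
}

async fn do_work() {
    // Stand-in for a long-running task.
    tokio::time::sleep(std::time::Duration::from_secs(3600)).await;
}
```

Because the receiver is created before the worker is spawned, a signal sent early is still buffered and delivered, which is what makes the trigger safe to fire from tests at any point.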
4 changes: 2 additions & 2 deletions shotover-proxy/src/server.rs
@@ -64,7 +64,7 @@ pub struct TcpCodecListener<C: Codec> {
    /// handle. When a graceful shutdown is initiated, a `()` value is sent via
    /// the broadcast::Sender. Each active connection receives it, reaches a
    /// safe terminal state, and completes the task.
    pub notify_shutdown: broadcast::Sender<()>,
    pub trigger_shutdown_tx: broadcast::Sender<()>,

    /// Used as part of the graceful shutdown process to wait for client
    /// connections to complete processing.
@@ -184,7 +184,7 @@ impl<C: Codec + 'static> TcpCodecListener<C> {
                limit_connections: self.limit_connections.clone(),

                // Receive shutdown notifications.
                shutdown: Shutdown::new(self.notify_shutdown.subscribe()),
                shutdown: Shutdown::new(self.trigger_shutdown_tx.subscribe()),

                // Notifies the receiver half once all clones are
                // dropped.
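The listener stores the Sender rather than a Receiver so it can hand a fresh subscription to every accepted connection. The Shutdown type used here is a small wrapper over a broadcast::Receiver; a stand-in with the shape implied by this usage (shotover's actual implementation may differ) could be:

```rust
use tokio::sync::broadcast;

/// Listens for the shutdown signal and remembers having seen it.
pub struct Shutdown {
    is_shutdown: bool,
    notify: broadcast::Receiver<()>,
}

impl Shutdown {
    pub fn new(notify: broadcast::Receiver<()>) -> Shutdown {
        Shutdown {
            is_shutdown: false,
            notify,
        }
    }

    /// Completes once the shutdown signal has been received.
    pub async fn recv(&mut self) {
        if self.is_shutdown {
            return;
        }
        // Any outcome (a value, lag, or a closed channel) means shut down.
        let _ = self.notify.recv().await;
        self.is_shutdown = true;
    }
}
```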
18 changes: 10 additions & 8 deletions shotover-proxy/src/sources/cassandra_source.rs
@@ -30,15 +30,15 @@ impl SourcesFromConfig for CassandraConfig {
        &self,
        chain: &TransformChain,
        _topics: &mut TopicHolder,
        notify_shutdown: broadcast::Sender<()>,
        trigger_shutdown_tx: broadcast::Sender<()>,
        shutdown_complete_tx: mpsc::Sender<()>,
    ) -> Result<Vec<Sources>> {
        Ok(vec![Sources::Cassandra(
            CassandraSource::new(
                chain,
                self.listen_addr.clone(),
                self.cassandra_ks.clone(),
                notify_shutdown,
                trigger_shutdown_tx,
                shutdown_complete_tx,
                self.bypass_query_processing.unwrap_or(true),
                self.connection_limit,
@@ -61,7 +61,7 @@ impl CassandraSource {
        chain: &TransformChain,
        listen_addr: String,
        cassandra_ks: HashMap<String, Vec<String>>,
        notify_shutdown: broadcast::Sender<()>,
        trigger_shutdown_tx: broadcast::Sender<()>,
        shutdown_complete_tx: mpsc::Sender<()>,
        bypass: bool,
        connection_limit: Option<usize>,
@@ -71,6 +71,8 @@ impl CassandraSource {

        info!("Starting Cassandra source on [{}]", listen_addr);

        let mut trigger_shutdown_rx = trigger_shutdown_tx.subscribe();

        let mut listener = TcpCodecListener {
            chain: chain.clone(),
            source_name: name.to_string(),
@@ -79,7 +81,7 @@
            hard_connection_limit: hard_connection_limit.unwrap_or(false),
            codec: CassandraCodec2::new(cassandra_ks, bypass),
            limit_connections: Arc::new(Semaphore::new(connection_limit.unwrap_or(512))),
            notify_shutdown,
            trigger_shutdown_tx,
            shutdown_complete_tx,
        };

@@ -90,19 +92,19 @@
                        error!(cause = %err, "failed to accept");
                    }
                }
                _ = tokio::signal::ctrl_c() => {
                    info!("Shutdown signal received - shutting down")
                _ = trigger_shutdown_rx.recv() => {
                    info!("cassandra source shutting down")
                }
            }

            let TcpCodecListener {
                notify_shutdown,
                trigger_shutdown_tx,
                shutdown_complete_tx,
                ..
            } = listener;

            drop(shutdown_complete_tx);
            drop(notify_shutdown);
            drop(trigger_shutdown_tx);

            Ok(())
        });
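The accept loop now races against the broadcast receiver instead of ctrl_c, which is what lets tests shut the source down programmatically. Stripped of shotover's specifics, the control flow is roughly the following (listener setup and connection handling are elided; names are illustrative):

```rust
use tokio::net::TcpListener;
use tokio::sync::broadcast;

async fn run_source(listener: TcpListener, mut trigger_shutdown_rx: broadcast::Receiver<()>) {
    tokio::select! {
        // Serve connections until the accept loop hits a fatal error...
        result = accept_loop(&listener) => {
            if let Err(err) = result {
                eprintln!("failed to accept: {err}");
            }
        }
        // ...or until anything is sent on the shutdown channel.
        _ = trigger_shutdown_rx.recv() => {
            println!("source shutting down");
        }
    }
}

async fn accept_loop(listener: &TcpListener) -> std::io::Result<()> {
    loop {
        let (_socket, _addr) = listener.accept().await?;
        // Hand each socket off to a per-connection task here.
    }
}
```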
10 changes: 5 additions & 5 deletions shotover-proxy/src/sources/mod.rs
@@ -62,20 +62,20 @@ impl SourcesConfig {
        &self,
        chain: &TransformChain,
        topics: &mut TopicHolder,
        notify_shutdown: broadcast::Sender<()>,
        trigger_shutdown_tx: broadcast::Sender<()>,
        shutdown_complete_tx: mpsc::Sender<()>,
    ) -> Result<Vec<Sources>> {
        match self {
            SourcesConfig::Cassandra(c) => {
                c.get_source(chain, topics, notify_shutdown, shutdown_complete_tx)
                c.get_source(chain, topics, trigger_shutdown_tx, shutdown_complete_tx)
                    .await
            }
            SourcesConfig::Mpsc(m) => {
                m.get_source(chain, topics, notify_shutdown, shutdown_complete_tx)
                m.get_source(chain, topics, trigger_shutdown_tx, shutdown_complete_tx)
                    .await
            }
            SourcesConfig::Redis(r) => {
                r.get_source(chain, topics, notify_shutdown, shutdown_complete_tx)
                r.get_source(chain, topics, trigger_shutdown_tx, shutdown_complete_tx)
                    .await
            }
        }
@@ -88,7 +88,7 @@ pub trait SourcesFromConfig: Send {
        &self,
        chain: &TransformChain,
        topics: &mut TopicHolder,
        notify_shutdown: broadcast::Sender<()>,
        trigger_shutdown: broadcast::Sender<()>,
        shutdown_complete_tx: mpsc::Sender<()>,
    ) -> Result<Vec<Sources>>;
}
4 changes: 2 additions & 2 deletions shotover-proxy/src/sources/mpsc_source.rs
@@ -29,7 +29,7 @@ impl SourcesFromConfig for AsyncMpscConfig {
        &self,
        chain: &TransformChain,
        topics: &mut TopicHolder,
        notify_shutdown: broadcast::Sender<()>,
        trigger_shutdown_on_drop_rx: broadcast::Sender<()>,
        shutdown_complete_tx: mpsc::Sender<()>,
    ) -> Result<Vec<Sources>> {
        if let Some(rx) = topics.get_rx(&self.topic_name) {
@@ -41,7 +41,7 @@
                chain.clone(),
                rx,
                &self.topic_name,
                Shutdown::new(notify_shutdown.subscribe()),
                Shutdown::new(trigger_shutdown_on_drop_rx.subscribe()),
                shutdown_complete_tx,
                behavior.clone(),
            ))])