Skip to content

Commit

Permalink
ShotoverManager -> ShotoverProcess in cassandra_int_tests::cluster_mu…
Browse files Browse the repository at this point in the history
…lti_rack (#1000)
  • Loading branch information
rukai authored Jan 31, 2023
1 parent 010a1d6 commit fc54773
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 18 deletions.
1 change: 0 additions & 1 deletion shotover-proxy/tests/cassandra_int_tests/cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ use tokio::time::timeout;

pub mod multi_rack;
pub mod single_rack_v3;
#[cfg(feature = "alpha-transforms")]
pub mod single_rack_v4;

pub async fn run_topology_task(ca_path: Option<&str>, port: Option<u32>) -> Vec<CassandraNode> {
Expand Down
47 changes: 33 additions & 14 deletions shotover-proxy/tests/cassandra_int_tests/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use crate::helpers::ShotoverManager;
use cassandra_protocol::frame::message_error::{ErrorBody, ErrorType};
use cdrs_tokio::frame::events::{
SchemaChange, SchemaChangeOptions, SchemaChangeTarget, SchemaChangeType, ServerEvent,
Expand All @@ -15,7 +14,9 @@ use test_helpers::connection::cassandra::{
};
use test_helpers::connection::redis_connection;
use test_helpers::docker_compose::DockerCompose;
use test_helpers::shotover_process::{shotover_from_topology_file, Count, EventMatcher, Level};
use test_helpers::shotover_process::{
shotover_from_topology_file, shotover_from_topology_file_with_name, Count, EventMatcher, Level,
};
use tokio::time::{timeout, Duration};

mod batch_statements;
Expand All @@ -27,7 +28,6 @@ mod keyspace;
mod native_types;
mod prepared_statements_all;
mod prepared_statements_simple;
#[cfg(feature = "alpha-transforms")]
mod protect;
mod routing;
mod table;
Expand Down Expand Up @@ -72,7 +72,6 @@ async fn passthrough_standard(#[case] driver: CassandraDriver) {
shotover.shutdown_and_then_consume_events(&[]).await;
}

#[cfg(feature = "alpha-transforms")]
#[rstest]
#[case::cdrs(CdrsTokio)]
#[cfg_attr(feature = "cassandra-cpp-driver-tests", case::datastax(Datastax))]
Expand Down Expand Up @@ -178,7 +177,6 @@ async fn cluster_single_rack_v3(#[case] driver: CassandraDriver) {
cluster::single_rack_v3::test_topology_task(None).await;
}

#[cfg(feature = "alpha-transforms")]
#[rstest]
#[case::cdrs(CdrsTokio)]
#[cfg_attr(feature = "cassandra-cpp-driver-tests", case::datastax(Datastax))]
Expand Down Expand Up @@ -279,15 +277,21 @@ async fn cluster_multi_rack(#[case] driver: CassandraDriver) {
DockerCompose::new("example-configs/cassandra-cluster-multi-rack/docker-compose.yaml");

{
let _shotover_manager_rack1 = ShotoverManager::from_topology_file_without_observability(
let shotover_rack1 = shotover_from_topology_file_with_name(
"example-configs/cassandra-cluster-multi-rack/topology_rack1.yaml",
);
let _shotover_manager_rack2 = ShotoverManager::from_topology_file_without_observability(
"Rack1",
)
.await;
let shotover_rack2 = shotover_from_topology_file_with_name(
"example-configs/cassandra-cluster-multi-rack/topology_rack2.yaml",
);
let _shotover_manager_rack3 = ShotoverManager::from_topology_file_without_observability(
"Rack2",
)
.await;
let shotover_rack3 = shotover_from_topology_file_with_name(
"example-configs/cassandra-cluster-multi-rack/topology_rack3.yaml",
);
"Rack3",
)
.await;

let connection = || async {
let mut connection = CassandraConnection::new("127.0.0.1", 9042, driver).await;
Expand All @@ -301,12 +305,29 @@ async fn cluster_multi_rack(#[case] driver: CassandraDriver) {

//Check for bugs in cross connection state
native_types::test(&connection().await).await;

// TODO: This can be removed by allowing the shotover_from_topology_file_* to specify the `observability_interface`.
// I've avoided doing that in this PR because shotover_from_topology_file should probably become a builder type,
// and its up in the air whether we will have a config file or not.
let metrics_port_collision = [EventMatcher::new()
.with_level(Level::Error)
.with_target("shotover_proxy::observability")
.with_message(r#"Metrics HTTP server failed: Failed to bind to 0.0.0.0:9001"#)
.with_count(Count::Any)];
shotover_rack1
.shutdown_and_then_consume_events(&metrics_port_collision)
.await;
shotover_rack2
.shutdown_and_then_consume_events(&metrics_port_collision)
.await;
shotover_rack3
.shutdown_and_then_consume_events(&metrics_port_collision)
.await;
}

cluster::multi_rack::test_topology_task(None).await;
}

#[cfg(feature = "alpha-transforms")]
#[rstest]
#[case::scylla(Scylla)]
//#[case::cdrs(CdrsTokio)] // TODO
Expand Down Expand Up @@ -397,7 +418,6 @@ async fn cassandra_redis_cache(#[case] driver: CassandraDriver) {
shotover.shutdown_and_then_consume_events(&[]).await;
}

#[cfg(feature = "alpha-transforms")]
#[rstest]
// #[case::cdrs(CdrsTokio)] // TODO
#[cfg_attr(feature = "cassandra-cpp-driver-tests", case::datastax(Datastax))]
Expand All @@ -420,7 +440,6 @@ async fn protect_transform_local(#[case] driver: CassandraDriver) {
shotover.shutdown_and_then_consume_events(&[]).await;
}

#[cfg(feature = "alpha-transforms")]
#[rstest]
//#[case::cdrs(CdrsTokio)] // TODO
#[cfg_attr(feature = "cassandra-cpp-driver-tests", case::datastax(Datastax))]
Expand Down
9 changes: 9 additions & 0 deletions test-helpers/src/shotover_process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,17 @@ pub use tokio_bin_process::event_matcher::{Count, EventMatcher, Events};
pub use tokio_bin_process::BinProcess;

pub async fn shotover_from_topology_file(topology_path: &str) -> BinProcess {
shotover_from_topology_file_with_name(topology_path, "Shotover").await
}

pub async fn shotover_from_topology_file_with_name(
topology_path: &str,
log_name: &str,
) -> BinProcess {
let mut shotover = BinProcess::start_with_args(
"shotover-proxy",
&["-t", topology_path, "--log-format", "json"],
log_name,
)
.await;

Expand All @@ -33,6 +41,7 @@ pub async fn shotover_from_topology_file_fail_to_startup(
BinProcess::start_with_args(
"shotover-proxy",
&["-t", topology_path, "--log-format", "json"],
"Shotover",
)
.await
.consume_remaining_events_expect_failure(expected_errors_and_warnings)
Expand Down
17 changes: 14 additions & 3 deletions tokio-bin-process/src/process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,19 @@ impl BinProcess {
///
/// Dropping the BinProcess will trigger a panic unless shutdown_and_then_consume_events or consume_remaining_events has been called.
/// This is done to avoid missing important assertions run by those methods.
pub async fn start_with_args(cargo_package_name: &str, binary_args: &[&str]) -> BinProcess {
pub async fn start_with_args(
cargo_package_name: &str,
binary_args: &[&str],
log_name: &str,
) -> BinProcess {
setup_tracing_subscriber_for_test_logic();

let log_name = if log_name.len() > 10 {
panic!("In order to line up in log outputs, argument log_name to BinProcess::start_with_args must be of length <= 10 but the value was: {log_name}");
} else {
format!("{log_name: <10}") //pads log_name up to 10 chars so that it lines up properly when included in log output.
};

// PROFILE is set in build.rs from PROFILE listed in https://doc.rust-lang.org/cargo/reference/environment-variables.html#environment-variables-cargo-sets-for-build-scripts
let release = env!("PROFILE") == "release";

Expand Down Expand Up @@ -104,7 +114,7 @@ impl BinProcess {
let (event_tx, event_rx) = tokio::sync::mpsc::unbounded_channel();
let reader = BufReader::new(child.stdout.take().unwrap()).lines();
tokio::spawn(async move {
if let Err(err) = process_stdout_events(reader, &event_tx).await {
if let Err(err) = process_stdout_events(reader, &event_tx, log_name).await {
// Because we are in a task, panicking is likely to be ignored.
// Instead we generate a fake error event, which is possibly a bit confusing for the user but will at least cause the test to fail.
event_tx
Expand Down Expand Up @@ -266,13 +276,14 @@ impl BinProcess {
async fn process_stdout_events(
mut reader: tokio::io::Lines<BufReader<ChildStdout>>,
event_tx: &mpsc::UnboundedSender<Event>,
name: String,
) -> Result<()> {
while let Some(line) = reader.next_line().await.context("An IO error occured while reading stdout from the application, I'm not actually sure when this happens?")? {
let event = Event::from_json_str(&line).context(format!(
"The application emitted a line that was not a valid event encoded in json: {}",
line
))?;
println!("{} {event}", Color::Default.dimmed().paint("BINPROCESS"));
println!("{} {event}", Color::Default.dimmed().paint(&name));
if event_tx.send(event).is_err() {
// BinProcess is no longer interested in events
return Ok(());
Expand Down

0 comments on commit fc54773

Please sign in to comment.