Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Merged by Bors] - fix: fluvio-test harness bug #2790

Closed
wants to merge 12 commits into from
6 changes: 6 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -530,6 +530,7 @@ jobs:
spu: [2]
test:
[
validate-test-harness,
smoke-test,
smoke-test-tls,
smoke-test-at-most-once,
Expand Down Expand Up @@ -582,6 +583,11 @@ jobs:
fluvio cluster start --local --develop --spu ${{ matrix.spu }} --rust-log ${{ env.SERVER_LOG }} ${{ env.TLS_ARGS }}
- name: sleep
run: sleep 15
- name: Validate test harness
if: matrix.test == 'validate-test-harness'
timeout-minutes: 5
run : |
make validate-test-harness
- name: Run smoke-test
if: matrix.test == 'smoke-test'
timeout-minutes: 5
Expand Down
3 changes: 2 additions & 1 deletion .github/workflows/hourly.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ env:

jobs:
build_long_binaries:
name: Test buiild for ${{ matrix.binary }} on (${{ matrix.os }})
name: Test build for ${{ matrix.binary }} on (${{ matrix.os }})
if: ${{ false }}
runs-on: ${{ matrix.os }}
strategy:
Expand Down Expand Up @@ -191,6 +191,7 @@ jobs:
--topic ${{ matrix.topic }} \
--topic-segment-size ${{ matrix.topic_segment_bytes }} \
--topic-retention ${{ matrix.topic_retention }} \
--expect-timeout
longevity

## If the test passed, then copy the data from cluster to store into artifacts
Expand Down
10 changes: 9 additions & 1 deletion crates/fluvio-test-util/test_meta/environment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ pub struct EnvironmentSetup {
#[clap(long)]
pub disable_timeout: bool,

/// Global timeout for a test. Will report as fail when reached
/// Global timeout for a test. Will report as fail when reached (unless --expect-timeout)
/// ex. 30s, 15m, 2h, 1w
#[clap(long, default_value = "1h", value_parser=parse_duration)]
pub timeout: Duration,
Expand All @@ -276,4 +276,12 @@ pub struct EnvironmentSetup {
/// K8: use sc address
#[clap(long)]
pub proxy_addr: Option<String>,

/// Will report fail unless test times out
#[clap(long, conflicts_with = "expect_fail")]
pub expect_timeout: bool,

/// Expect a test to fail. (fail-> pass. pass or timeout -> fail)
#[clap(long, conflicts_with = "expect_timeout")]
pub expect_fail: bool,
}
66 changes: 45 additions & 21 deletions crates/fluvio-test/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,9 @@ fn run_test(
let test_cluster_opts = TestCluster::new(environment.clone());
let test_driver = TestDriver::new(Some(test_cluster_opts));

let ok_signal = Arc::new(AtomicBool::new(false));
let finished_signal = Arc::new(AtomicBool::new(false));
let fail_signal = Arc::new(AtomicBool::new(false));
signal_hook::flag::register(signal_hook::consts::SIGUSR1, Arc::clone(&ok_signal))
signal_hook::flag::register(signal_hook::consts::SIGUSR1, Arc::clone(&finished_signal))
.expect("fail to register ok signal hook");
signal_hook::flag::register(signal_hook::consts::SIGUSR2, Arc::clone(&fail_signal))
.expect("fail to register fail signal hook");
Expand All @@ -107,17 +107,26 @@ fn run_test(
let status = std::panic::catch_unwind(AssertUnwindSafe(|| {
(test_meta.test_fn)(test_driver, test_case)
}));
match status {
Ok(_) => {
println!("test passed, signalling success to parent");
root_process.kill_with(Signal::User1);
process::exit(0)
}
Err(err) => {
println!("test failed {:#?}, signalling parent", err);
root_process.kill_with(Signal::User2);
process::exit(1);
let parent_id = get_parent_pid();
debug!(
"catch_unwind. PID {:?}, root_pid {root_pid}, parent_id {parent_id}",
get_current_pid().unwrap(),
);
if parent_id == root_pid {
println!("test complete, signaling to parent");
root_process.kill_with(Signal::User1);
}
if let Err(err) = status {
if environment.expect_fail {
println!("test failed as expected, signaling parent");
} else {
println!("test failed {:#?}, signaling parent", err);
}
// This doesn't actually kill root_process, just sends it the signal
root_process.kill_with(Signal::User2);
process::exit(1);
} else {
process::exit(0);
}
}
Err(_) => panic!("Fork failed"),
Expand All @@ -132,7 +141,7 @@ fn run_test(
let mut timed_out = false;

loop {
if ok_signal.load(Ordering::Relaxed) || fail_signal.load(Ordering::Relaxed) {
if finished_signal.load(Ordering::Relaxed) {
debug!("signal received");
break;
}
Expand All @@ -148,29 +157,36 @@ fn run_test(
}
}

let ok = ok_signal.load(Ordering::Relaxed);
let fail = fail_signal.load(Ordering::Relaxed);
let success = fail_signal.load(Ordering::Relaxed) == environment.expect_fail;

debug!(ok, fail, "signal status");
debug!(success, "signal status");

if timed_out {
println!("Test timed out after {} seconds", timeout.as_secs());
let success = if environment.expect_timeout {
println!(
"Test timed out as expected after {} seconds",
timeout.as_secs()
);
true
} else {
println!("Test timed out after {} seconds", timeout.as_secs());
false
};
kill_child_processes(root_process);
TestResult {
success: true,
success,
duration: start.elapsed().unwrap(),
..std::default::Default::default()
}
} else if ok {
} else if success {
println!("Test passed");
TestResult {
success: true,
duration: start.elapsed().unwrap(),
..std::default::Default::default()
}
} else {
println!("test failed, killing all child processes");
kill_child_processes(root_process);
println!("Test failed");
TestResult {
success: false,
duration: start.elapsed().unwrap(),
Expand Down Expand Up @@ -277,6 +293,14 @@ fn create_spinning_indicator() -> Option<ProgressBar> {
}
}

fn get_parent_pid() -> sysinfo::Pid {
let pid = get_current_pid().expect("Unable to get current pid");
let mut sys2 = System::new();
sys2.refresh_processes();
let current_process = sys2.process(pid).expect("Current process not found");
current_process.parent().expect("Parent process not found")
}

#[cfg(test)]
mod tests {
// CLI Tests
Expand Down
40 changes: 40 additions & 0 deletions crates/fluvio-test/src/tests/expected_fail.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
use std::any::Any;
use std::time::Duration;

use clap::Parser;
use fluvio_future::timer::sleep;
use fluvio_test_derive::fluvio_test;
use fluvio_test_util::test_meta::{TestOption, TestCase};
use fluvio_test_util::async_process;

#[derive(Debug, Clone)]
pub struct ExpectedFailTestCase {}

impl From<TestCase> for ExpectedFailTestCase {
fn from(_test_case: TestCase) -> Self {
ExpectedFailTestCase {}
}
}

#[derive(Debug, Parser, Clone)]
#[clap(name = "Fluvio Expected Fail Test")]
pub struct ExpectedFailTestOption {}
impl TestOption for ExpectedFailTestOption {
fn as_any(&self) -> &dyn Any {
self
}
}

#[fluvio_test(name = "expected_fail", topic = "unused")]
pub fn run(mut test_driver: FluvioTestDriver, mut test_case: TestCase) {
println!("\nStarting example test that fails");

let fast_fail = async_process!(
async {
sleep(Duration::from_millis(2000)).await;
panic!("This test should fail");
},
"fast-fail"
);
fast_fail.join().unwrap();
}
48 changes: 48 additions & 0 deletions crates/fluvio-test/src/tests/expected_fail_join_fail_first.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
use std::any::Any;
use std::time::Duration;

use clap::Parser;
use fluvio_future::timer::sleep;
use fluvio_test_derive::fluvio_test;
use fluvio_test_util::test_meta::{TestOption, TestCase};
use fluvio_test_util::async_process;

#[derive(Debug, Clone)]
pub struct ExpectedFailJoinFailFirstTestCase {}

impl From<TestCase> for ExpectedFailJoinFailFirstTestCase {
fn from(_test_case: TestCase) -> Self {
ExpectedFailJoinFailFirstTestCase {}
}
}

#[derive(Debug, Parser, Clone)]
#[clap(name = "Fluvio Expected FailJoinFailFirst Test")]
pub struct ExpectedFailJoinFailFirstTestOption {}
impl TestOption for ExpectedFailJoinFailFirstTestOption {
fn as_any(&self) -> &dyn Any {
self
}
}

#[fluvio_test(name = r#"expected_fail_join_fail_first"#, topic = "unused")]
pub fn run(mut test_driver: FluvioTestDriver, mut test_case: TestCase) {
println!("\nStarting example test that fails");

let success = async_process!(
async {
sleep(Duration::from_millis(2000)).await;
},
"success"
);

let fail = async_process!(
async {
sleep(Duration::from_millis(200)).await;
panic!("This test should fail");
},
"fail"
);
fail.join().unwrap();
success.join().unwrap();
}
48 changes: 48 additions & 0 deletions crates/fluvio-test/src/tests/expected_fail_join_success_first.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
use std::any::Any;
use std::time::Duration;

use clap::Parser;
use fluvio_future::timer::sleep;
use fluvio_test_derive::fluvio_test;
use fluvio_test_util::test_meta::{TestOption, TestCase};
use fluvio_test_util::async_process;

#[derive(Debug, Clone)]
pub struct ExpectedFailJoinSuccessFirstTestCase {}

impl From<TestCase> for ExpectedFailJoinSuccessFirstTestCase {
fn from(_test_case: TestCase) -> Self {
ExpectedFailJoinSuccessFirstTestCase {}
}
}

#[derive(Debug, Parser, Clone)]
#[clap(name = "Fluvio Expected FailJoinSuccessFirst Test")]
pub struct ExpectedFailJoinSuccessFirstTestOption {}
impl TestOption for ExpectedFailJoinSuccessFirstTestOption {
fn as_any(&self) -> &dyn Any {
self
}
}

#[fluvio_test(name = "expected_fail_join_success_first", topic = "unused")]
pub fn run(mut test_driver: FluvioTestDriver, mut test_case: TestCase) {
println!("\nStarting example test that fails");

let success = async_process!(
async {
sleep(Duration::from_millis(100)).await;
},
"success"
);

let fail = async_process!(
async {
sleep(Duration::from_millis(200)).await;
panic!("This test should fail");
},
"fail"
);
success.join().unwrap();
fail.join().unwrap();
}
37 changes: 37 additions & 0 deletions crates/fluvio-test/src/tests/expected_pass.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
use std::any::Any;

use clap::Parser;
use fluvio_test_derive::fluvio_test;
use fluvio_test_util::test_meta::{TestOption, TestCase};
use fluvio_test_util::async_process;

#[derive(Debug, Clone)]
pub struct ExpectedPassTestCase {}

impl From<TestCase> for ExpectedPassTestCase {
fn from(_test_case: TestCase) -> Self {
ExpectedPassTestCase {}
}
}

#[derive(Debug, Parser, Clone)]
#[clap(name = "Fluvio Expected Fail Test")]
pub struct ExpectedPassTestOption {}
impl TestOption for ExpectedPassTestOption {
fn as_any(&self) -> &dyn Any {
self
}
}

#[fluvio_test(name = "expected_pass", topic = "unused")]
pub fn run(mut test_driver: FluvioTestDriver, mut test_case: TestCase) {
println!("\nStarting example test that passes");

let fast_success = async_process!(
async {
// Do nothing and exit
},
"fast-success"
);
fast_success.join().unwrap();
}
42 changes: 42 additions & 0 deletions crates/fluvio-test/src/tests/expected_timeout.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
use std::any::Any;
use std::time::Duration;

use clap::Parser;
use fluvio_future::timer::sleep;
use fluvio_test_derive::fluvio_test;
use fluvio_test_util::test_meta::{TestOption, TestCase};
use fluvio_test_util::async_process;

#[derive(Debug, Clone)]
pub struct ExpectedTimeoutTestCase {}

impl From<TestCase> for ExpectedTimeoutTestCase {
fn from(_test_case: TestCase) -> Self {
ExpectedTimeoutTestCase {}
}
}

#[derive(Debug, Parser, Clone)]
#[clap(name = "Fluvio Expected timeout Test")]
pub struct ExpectedTimeoutTestOption {}
impl TestOption for ExpectedTimeoutTestOption {
fn as_any(&self) -> &dyn Any {
self
}
}

#[fluvio_test(name = "expected_timeout", topic = "unused")]
pub fn run(mut test_driver: FluvioTestDriver, mut test_case: TestCase) {
println!("\nStarting example test that timeouts");

let infinite_loop = async_process!(
async {
loop {
sleep(Duration::from_secs(1)).await
}
// Do nothing and exit
},
"infinite loop"
);
infinite_loop.join().unwrap();
}
Loading