Skip to content

Commit

Permalink
chore: add meta stable check into integration test (#1202)
Browse files Browse the repository at this point in the history
## Rationale
Currently, we just left 20s for ceresmeta to do the initialization work
in integration test.
However, it will be not enough something... And the test failure will
make us so panic...
In this pr, I add a meta stable check to reduce the integration test
failure.

## Detailed Changes
Add a meta stable check by keep creating a radom named table utill
succeeding(surely, retry max exists).

## Test Plan
Test manually.
  • Loading branch information
Rachelint authored Sep 8, 2023
1 parent 395debb commit 0451a9c
Show file tree
Hide file tree
Showing 4 changed files with 76 additions and 10 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions integration_tests/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,4 @@ reqwest = { workspace = true }
serde = { workspace = true }
sqlness = "0.4.3"
tokio = { workspace = true }
uuid = { version = "1.3", features = ["v4"] }
80 changes: 72 additions & 8 deletions integration_tests/src/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ const CERESDB_CONFIG_FILE_0_ENV: &str = "CERESDB_CONFIG_FILE_0";
const CERESDB_CONFIG_FILE_1_ENV: &str = "CERESDB_CONFIG_FILE_1";
const CLUSTER_CERESDB_STDOUT_FILE_0_ENV: &str = "CLUSTER_CERESDB_STDOUT_FILE_0";
const CLUSTER_CERESDB_STDOUT_FILE_1_ENV: &str = "CLUSTER_CERESDB_STDOUT_FILE_1";
const CLUSTER_CERESDB_HEALTH_CHECK_INTERVAL_SECONDS: usize = 5;

const CERESDB_SERVER_ADDR: &str = "CERESDB_SERVER_ADDR";

Expand All @@ -63,9 +64,10 @@ impl HttpClient {
}
}

#[async_trait]
pub trait Backend {
fn start() -> Self;
fn wait_for_ready(&self);
async fn wait_for_ready(&self);
fn stop(&mut self);
}

Expand All @@ -77,6 +79,10 @@ pub struct CeresDBCluster {
server0: CeresDBServer,
server1: CeresDBServer,
ceresmeta_process: Child,

/// Used in meta health check
db_client: Arc<dyn DbClient>,
meta_stable_check_sql: String,
}

impl CeresDBServer {
Expand All @@ -97,6 +103,7 @@ impl CeresDBServer {
}
}

#[async_trait]
impl Backend for CeresDBServer {
fn start() -> Self {
let config = env::var(CERESDB_CONFIG_FILE_ENV).expect("Cannot parse ceresdb config env");
Expand All @@ -105,15 +112,33 @@ impl Backend for CeresDBServer {
Self::spawn(bin, config, stdout)
}

fn wait_for_ready(&self) {
std::thread::sleep(Duration::from_secs(5));
async fn wait_for_ready(&self) {
tokio::time::sleep(Duration::from_secs(10)).await
}

fn stop(&mut self) {
self.server_process.kill().expect("Failed to kill server");
}
}

impl CeresDBCluster {
async fn check_meta_stable(&self) -> bool {
let query_ctx = RpcContext {
database: Some("public".to_string()),
timeout: None,
};

let query_req = Request {
tables: vec![],
sql: self.meta_stable_check_sql.clone(),
};

let result = self.db_client.sql_query(&query_ctx, &query_req).await;
result.is_ok()
}
}

#[async_trait]
impl Backend for CeresDBCluster {
fn start() -> Self {
let ceresmeta_bin =
Expand Down Expand Up @@ -149,16 +174,55 @@ impl Backend for CeresDBCluster {
let server0 = CeresDBServer::spawn(ceresdb_bin.clone(), ceresdb_config_0, stdout0);
let server1 = CeresDBServer::spawn(ceresdb_bin, ceresdb_config_1, stdout1);

// Meta stable check context
let endpoint = env::var(SERVER_GRPC_ENDPOINT_ENV).unwrap_or_else(|_| {
panic!("Cannot read server endpoint from env {SERVER_GRPC_ENDPOINT_ENV:?}")
});
let db_client = Builder::new(endpoint, Mode::Proxy).build();

let meta_stable_check_sql = format!(
r#"CREATE TABLE `stable_check_{}`
(`name` string TAG, `value` double NOT NULL, `t` timestamp NOT NULL, TIMESTAMP KEY(t))"#,
uuid::Uuid::new_v4()
);

Self {
server0,
server1,
ceresmeta_process,
db_client,
meta_stable_check_sql,
}
}

fn wait_for_ready(&self) {
println!("wait for cluster service ready...\n");
std::thread::sleep(Duration::from_secs(20));
async fn wait_for_ready(&self) {
println!("wait for cluster service initialized...");
tokio::time::sleep(Duration::from_secs(20_u64)).await;

println!("wait for cluster service stable begin...");
let mut wait_cnt = 0;
let wait_max = 6;
loop {
if wait_cnt >= wait_max {
println!(
"wait too long for cluster service stable, maybe somethings went wrong..."
);
return;
}

if self.check_meta_stable().await {
println!("wait for cluster service stable finished...");
return;
}

wait_cnt += 1;
let has_waited = wait_cnt * CLUSTER_CERESDB_HEALTH_CHECK_INTERVAL_SECONDS;
println!("waiting for cluster service stable, has_waited:{has_waited}s");
tokio::time::sleep(Duration::from_secs(
CLUSTER_CERESDB_HEALTH_CHECK_INTERVAL_SECONDS as u64,
))
.await;
}
}

fn stop(&mut self) {
Expand Down Expand Up @@ -226,9 +290,9 @@ impl<T: Send + Sync> Database for CeresDB<T> {
}

impl<T: Backend> CeresDB<T> {
pub fn create() -> CeresDB<T> {
pub async fn create() -> CeresDB<T> {
let backend = T::start();
backend.wait_for_ready();
backend.wait_for_ready().await;

let endpoint = env::var(SERVER_GRPC_ENDPOINT_ENV).unwrap_or_else(|_| {
panic!("Cannot read server endpoint from env {SERVER_GRPC_ENDPOINT_ENV:?}")
Expand Down
4 changes: 2 additions & 2 deletions integration_tests/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,8 @@ impl EnvController for CeresDBController {
async fn start(&self, env: &str, _config: Option<&Path>) -> Self::DB {
println!("start with env {env}");
let db = match env {
"local" => Box::new(CeresDB::<CeresDBServer>::create()) as DbRef,
"cluster" => Box::new(CeresDB::<CeresDBCluster>::create()) as DbRef,
"local" => Box::new(CeresDB::<CeresDBServer>::create().await) as DbRef,
"cluster" => Box::new(CeresDB::<CeresDBCluster>::create().await) as DbRef,
_ => panic!("invalid env {env}"),
};

Expand Down

0 comments on commit 0451a9c

Please sign in to comment.