diff --git a/src/meta-srv/src/election/postgres.rs b/src/meta-srv/src/election/postgres.rs index 2aee20d0f930..fd10e5ba8b25 100644 --- a/src/meta-srv/src/election/postgres.rs +++ b/src/meta-srv/src/election/postgres.rs @@ -730,10 +730,8 @@ mod tests { use super::*; use crate::error::PostgresExecutionSnafu; - const CREATE_TABLE: &str = - "CREATE TABLE IF NOT EXISTS greptime_metakv(k bytea PRIMARY KEY, v bytea);"; - async fn create_postgres_client() -> Result { + async fn create_postgres_client(table_name: Option<&str>) -> Result { let endpoint = env::var("GT_POSTGRES_ENDPOINTS").unwrap_or_default(); if endpoint.is_empty() { return UnexpectedSnafu { @@ -747,17 +745,30 @@ mod tests { tokio::spawn(async move { connection.await.context(PostgresExecutionSnafu).unwrap(); }); - client.execute(CREATE_TABLE, &[]).await.unwrap(); + if let Some(table_name) = table_name { + let create_table_sql = format!( + "CREATE TABLE IF NOT EXISTS {}(k bytea PRIMARY KEY, v bytea);", + table_name + ); + client.execute(&create_table_sql, &[]).await.unwrap(); + } Ok(client) } + async fn drop_table(client: &Client, table_name: &str) { + let sql = format!("DROP TABLE IF EXISTS {};", table_name); + client.execute(&sql, &[]).await.unwrap(); + } + #[tokio::test] async fn test_postgres_crud() { - let client = create_postgres_client().await.unwrap(); - let key = "test_key".to_string(); let value = "test_value".to_string(); + let uuid = uuid::Uuid::new_v4().to_string(); + let table_name = "test_postgres_crud_greptime_metakv"; + let client = create_postgres_client(Some(table_name)).await.unwrap(); + let (tx, _) = broadcast::channel(100); let pg_election = PgElection { leader_value: "test_leader".to_string(), @@ -765,9 +776,9 @@ mod tests { is_leader: AtomicBool::new(false), leader_infancy: AtomicBool::new(true), leader_watcher: tx, - store_key_prefix: uuid::Uuid::new_v4().to_string(), + store_key_prefix: uuid, candidate_lease_ttl_secs: 10, - sql_set: ElectionSqlFactory::new(28319, "greptime_metakv").build(), + sql_set: ElectionSqlFactory::new(28319, table_name).build(), }; let res = pg_election @@ -823,14 +834,17 @@ mod tests { .unwrap(); assert!(res.is_empty()); assert!(current == Timestamp::default()); + + drop_table(&pg_election.client, table_name).await; } async fn candidate( leader_value: String, candidate_lease_ttl_secs: u64, store_key_prefix: String, + table_name: String, ) { - let client = create_postgres_client().await.unwrap(); + let client = create_postgres_client(None).await.unwrap(); let (tx, _) = broadcast::channel(100); let pg_election = PgElection { @@ -841,7 +855,7 @@ mod tests { leader_watcher: tx, store_key_prefix, candidate_lease_ttl_secs, - sql_set: ElectionSqlFactory::new(28319, "greptime_metakv").build(), + sql_set: ElectionSqlFactory::new(28319, &table_name).build(), }; let node_info = MetasrvNodeInfo { @@ -857,22 +871,24 @@ mod tests { async fn test_candidate_registration() { let leader_value_prefix = "test_leader".to_string(); let candidate_lease_ttl_secs = 5; - let store_key_prefix = uuid::Uuid::new_v4().to_string(); + let uuid = uuid::Uuid::new_v4().to_string(); + let table_name = "test_candidate_registration_greptime_metakv"; let mut handles = vec![]; + let client = create_postgres_client(Some(table_name)).await.unwrap(); + for i in 0..10 { let leader_value = format!("{}{}", leader_value_prefix, i); let handle = tokio::spawn(candidate( leader_value, candidate_lease_ttl_secs, - store_key_prefix.clone(), + uuid.clone(), + table_name.to_string(), )); handles.push(handle); } // Wait for candidates to registrate themselves and renew their leases at least once. tokio::time::sleep(Duration::from_secs(3)).await; - let client = create_postgres_client().await.unwrap(); - let (tx, _) = broadcast::channel(100); let leader_value = "test_leader".to_string(); let pg_election = PgElection { @@ -881,9 +897,9 @@ mod tests { is_leader: AtomicBool::new(false), leader_infancy: AtomicBool::new(true), leader_watcher: tx, - store_key_prefix: store_key_prefix.clone(), + store_key_prefix: uuid.clone(), candidate_lease_ttl_secs, - sql_set: ElectionSqlFactory::new(28319, "greptime_metakv").build(), + sql_set: ElectionSqlFactory::new(28319, table_name).build(), }; let candidates = pg_election.all_candidates().await.unwrap(); @@ -900,20 +916,21 @@ mod tests { // Garbage collection for i in 0..10 { - let key = format!( - "{}{}{}{}", - store_key_prefix, CANDIDATES_ROOT, leader_value_prefix, i - ); + let key = format!("{}{}{}{}", uuid, CANDIDATES_ROOT, leader_value_prefix, i); let res = pg_election.delete_value(&key).await.unwrap(); assert!(res); } + + drop_table(&pg_election.client, table_name).await; } #[tokio::test] async fn test_elected_and_step_down() { let leader_value = "test_leader".to_string(); let candidate_lease_ttl_secs = 5; - let client = create_postgres_client().await.unwrap(); + let uuid = uuid::Uuid::new_v4().to_string(); + let table_name = "test_elected_and_step_down_greptime_metakv"; + let client = create_postgres_client(Some(table_name)).await.unwrap(); let (tx, mut rx) = broadcast::channel(100); let leader_pg_election = PgElection { @@ -922,9 +939,9 @@ mod tests { is_leader: AtomicBool::new(false), leader_infancy: AtomicBool::new(true), leader_watcher: tx, - store_key_prefix: uuid::Uuid::new_v4().to_string(), + store_key_prefix: uuid, candidate_lease_ttl_secs, - sql_set: ElectionSqlFactory::new(28320, "greptime_metakv").build(), + sql_set: ElectionSqlFactory::new(28320, table_name).build(), }; leader_pg_election.elected().await.unwrap(); @@ -1015,14 +1032,17 @@ mod tests { } _ => panic!("Expected LeaderChangeMessage::StepDown"), } + + drop_table(&leader_pg_election.client, table_name).await; } #[tokio::test] async fn test_leader_action() { let leader_value = "test_leader".to_string(); - let store_key_prefix = uuid::Uuid::new_v4().to_string(); + let uuid = uuid::Uuid::new_v4().to_string(); + let table_name = "test_leader_action_greptime_metakv"; let candidate_lease_ttl_secs = 5; - let client = create_postgres_client().await.unwrap(); + let client = create_postgres_client(Some(table_name)).await.unwrap(); let (tx, mut rx) = broadcast::channel(100); let leader_pg_election = PgElection { @@ -1031,9 +1051,9 @@ mod tests { is_leader: AtomicBool::new(false), leader_infancy: AtomicBool::new(true), leader_watcher: tx, - store_key_prefix, + store_key_prefix: uuid, candidate_lease_ttl_secs, - sql_set: ElectionSqlFactory::new(28321, "greptime_metakv").build(), + sql_set: ElectionSqlFactory::new(28321, table_name).build(), }; // Step 1: No leader exists, campaign and elected. @@ -1246,15 +1266,18 @@ mod tests { .query(&leader_pg_election.sql_set.step_down, &[]) .await .unwrap(); + + drop_table(&leader_pg_election.client, table_name).await; } #[tokio::test] async fn test_follower_action() { common_telemetry::init_default_ut_logging(); let candidate_lease_ttl_secs = 5; - let store_key_prefix = uuid::Uuid::new_v4().to_string(); + let uuid = uuid::Uuid::new_v4().to_string(); + let table_name = "test_follower_action_greptime_metakv"; - let follower_client = create_postgres_client().await.unwrap(); + let follower_client = create_postgres_client(Some(table_name)).await.unwrap(); let (tx, mut rx) = broadcast::channel(100); let follower_pg_election = PgElection { leader_value: "test_follower".to_string(), @@ -1262,12 +1285,12 @@ mod tests { is_leader: AtomicBool::new(false), leader_infancy: AtomicBool::new(true), leader_watcher: tx, - store_key_prefix: store_key_prefix.clone(), + store_key_prefix: uuid.clone(), candidate_lease_ttl_secs, - sql_set: ElectionSqlFactory::new(28322, "greptime_metakv").build(), + sql_set: ElectionSqlFactory::new(28322, table_name).build(), }; - let leader_client = create_postgres_client().await.unwrap(); + let leader_client = create_postgres_client(Some(table_name)).await.unwrap(); let (tx, _) = broadcast::channel(100); let leader_pg_election = PgElection { leader_value: "test_leader".to_string(), @@ -1275,9 +1298,9 @@ mod tests { is_leader: AtomicBool::new(false), leader_infancy: AtomicBool::new(true), leader_watcher: tx, - store_key_prefix, + store_key_prefix: uuid, candidate_lease_ttl_secs, - sql_set: ElectionSqlFactory::new(28322, "greptime_metakv").build(), + sql_set: ElectionSqlFactory::new(28322, table_name).build(), }; leader_pg_election @@ -1326,5 +1349,7 @@ mod tests { .query(&leader_pg_election.sql_set.step_down, &[]) .await .unwrap(); + + drop_table(&follower_pg_election.client, table_name).await; } }