Skip to content

Commit

Permalink
[#3214] improvement(IT): use Awaitility#await instead of `Thread#sleep` (#3529)
Browse files Browse the repository at this point in the history

### What changes were proposed in this pull request?

use `Awaitility#await` instead of `Thread#sleep`

### Why are the changes needed?

Fix: #3214 

### Does this PR introduce _any_ user-facing change?

N/A 

### How was this patch tested?

ITs

Co-authored-by: XiaoZ <57973980+xiaozcy@users.noreply.github.com>
Co-authored-by: zhanghan18 <zhanghan18@xiaomi.com>
  • Loading branch information
3 people authored May 23, 2024
1 parent dafca6f commit 1d1ff8e
Show file tree
Hide file tree
Showing 7 changed files with 263 additions and 279 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,13 @@

package com.datastrato.gravitino.catalog.lakehouse.iceberg.web.metrics;

import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.testcontainers.shaded.org.awaitility.Awaitility.await;

import com.datastrato.gravitino.catalog.lakehouse.iceberg.IcebergConfig;
import com.google.common.collect.ImmutableMap;
import java.time.Instant;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.iceberg.metrics.ImmutableCommitMetricsResult;
import org.apache.iceberg.metrics.ImmutableCommitReport;
import org.apache.iceberg.metrics.MetricsReport;
Expand All @@ -31,12 +34,11 @@ private MetricsReport createMetricsReport() {
return metricsReport;
}

private MetricsReport tryGetIcebergMetrics(MemoryMetricsStore memoryMetricsStore)
throws InterruptedException {
Instant waitTime = Instant.now().plusSeconds(20);
while (memoryMetricsStore.getMetricsReport() == null && Instant.now().isBefore(waitTime)) {
Thread.sleep(100);
}
private MetricsReport tryGetIcebergMetrics(MemoryMetricsStore memoryMetricsStore) {
await()
.atMost(20, TimeUnit.SECONDS)
.pollInterval(100, TimeUnit.MILLISECONDS)
.untilAsserted(() -> assertTrue(memoryMetricsStore.getMetricsReport() != null));
return memoryMetricsStore.getMetricsReport();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package com.datastrato.gravitino.integration.test.container;

import static java.lang.String.format;
import static org.testcontainers.shaded.org.awaitility.Awaitility.await;

import com.google.common.collect.ImmutableSet;
import java.io.File;
Expand All @@ -16,6 +17,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.rnorth.ducttape.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -91,39 +93,41 @@ private void copyDorisLog() {

@Override
protected boolean checkContainerStatus(int retryLimit) {
  String dorisJdbcUrl = format("jdbc:mysql://%s:%d/", getContainerIpAddress(), FE_MYSQL_PORT);
  // Parameterized logging instead of string concatenation.
  LOG.info("Doris url is {}", dorisJdbcUrl);

  // Spread the retryLimit polls across the 30s budget. Math.max guards against the
  // integer division producing a 0-second poll interval when retryLimit > 30.
  long pollIntervalSeconds = Math.max(1, 30 / retryLimit);
  try {
    await()
        .atMost(30, TimeUnit.SECONDS)
        .pollInterval(pollIntervalSeconds, TimeUnit.SECONDS)
        .until(
            () -> {
              try (Connection connection =
                      DriverManager.getConnection(dorisJdbcUrl, USER_NAME, "");
                  Statement statement = connection.createStatement()) {

                // execute `SHOW PROC '/backends';` to check if backends is ready
                String query = "SHOW PROC '/backends';";
                try (ResultSet resultSet = statement.executeQuery(query)) {
                  while (resultSet.next()) {
                    String alive = resultSet.getString("Alive");
                    String totalCapacity = resultSet.getString("TotalCapacity");
                    float totalCapacityFloat = Float.parseFloat(totalCapacity.split(" ")[0]);

                    // alive should be true and totalCapacity should not be 0.000
                    if (alive.equalsIgnoreCase("true") && totalCapacityFloat > 0.0f) {
                      LOG.info("Doris container startup success!");
                      return true;
                    }
                  }
                }
                LOG.info("Doris container is not ready yet!");
              } catch (Exception e) {
                LOG.error(e.getMessage(), e);
              }
              return false;
            });
  } catch (org.testcontainers.shaded.org.awaitility.core.ConditionTimeoutException e) {
    // Awaitility throws on timeout; catch it to preserve this method's boolean
    // contract (callers expect `false` for "not ready", not an exception).
    LOG.error("Doris container did not become ready within the timeout", e);
    return false;
  }

  return true;
}

private boolean changePassword() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package com.datastrato.gravitino.integration.test.container;

import static java.lang.String.format;
import static org.testcontainers.shaded.org.awaitility.Awaitility.await;

import com.google.common.collect.ImmutableSet;
import java.io.File;
Expand All @@ -13,6 +14,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.rnorth.ducttape.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -85,111 +87,90 @@ private void copyHiveLog() {

@Override
protected boolean checkContainerStatus(int retryLimit) {
  // Spread the retryLimit polls across each stage's timeout budget. Math.max guards
  // against the integer division producing a 0-second poll interval when retryLimit
  // exceeds the budget (e.g. retryLimit > 30 for the 30s stages).
  long longPollSeconds = Math.max(1, 100 / retryLimit);
  long shortPollSeconds = Math.max(1, 30 / retryLimit);
  String containerIp = getContainerIpAddress();

  try {
    // Stage 1: the in-container status script reports healthy.
    await()
        .atMost(100, TimeUnit.SECONDS)
        .pollInterval(longPollSeconds, TimeUnit.SECONDS)
        .until(
            () -> {
              try {
                String[] commandAndArgs = new String[] {"bash", "/tmp/check-status.sh"};
                Container.ExecResult execResult = executeInContainer(commandAndArgs);
                if (execResult.getExitCode() != 0) {
                  String message =
                      format(
                          "Command [%s] exited with %s",
                          String.join(" ", commandAndArgs), execResult.getExitCode());
                  LOG.error("{}", message);
                  LOG.error("stderr: {}", execResult.getStderr());
                  LOG.error("stdout: {}", execResult.getStdout());
                } else {
                  LOG.info("Hive container startup success!");
                  return true;
                }
              } catch (RuntimeException e) {
                LOG.error(e.getMessage(), e);
              }
              return false;
            });

    // Stage 2: HiveServer answers a query and lists the `default` database.
    String sql = "show databases";
    await()
        .atMost(30, TimeUnit.SECONDS)
        .pollInterval(shortPollSeconds, TimeUnit.SECONDS)
        .until(
            () -> {
              try {
                Container.ExecResult result = executeInContainer("hive", "-e", sql);
                if (result.getStdout().contains("default")) {
                  return true;
                }
              } catch (Exception e) {
                LOG.error("Failed to execute sql: {}", sql, e);
              }
              return false;
            });

    // Stage 3: HDFS is usable — create a simple table.
    String createTableSql =
        "CREATE TABLE IF NOT EXISTS default.employee ( eid int, name String, "
            + "salary String, destination String) ";
    await()
        .atMost(30, TimeUnit.SECONDS)
        .pollInterval(shortPollSeconds, TimeUnit.SECONDS)
        .until(
            () -> {
              try {
                Container.ExecResult result = executeInContainer("hive", "-e", createTableSql);
                if (result.getExitCode() == 0) {
                  return true;
                }
              } catch (Exception e) {
                // Log the statement that actually failed (previously this logged the
                // unrelated `show databases` query).
                LOG.error("Failed to execute sql: {}", createTableSql, e);
              }
              return false;
            });

    // Stage 4: the Hive Metastore port accepts TCP connections.
    await()
        .atMost(10, TimeUnit.SECONDS)
        .until(
            () -> {
              try (Socket socket = new Socket()) {
                socket.connect(
                    new InetSocketAddress(containerIp, HiveContainer.HIVE_METASTORE_PORT), 3000);
                return true;
              } catch (Exception e) {
                LOG.warn(
                    "Can't connect to Hive Metastore:[{}:{}]",
                    containerIp,
                    HiveContainer.HIVE_METASTORE_PORT,
                    e);
              }
              return false;
            });
  } catch (org.testcontainers.shaded.org.awaitility.core.ConditionTimeoutException e) {
    // Awaitility throws on timeout; catch it to preserve this method's boolean
    // contract (callers expect `false` for "not ready", not an exception).
    LOG.error("Hive container did not become ready within the timeout", e);
    return false;
  }

  return true;
}

public static class Builder extends BaseContainer.Builder<Builder, HiveContainer> {
Expand Down
Loading

0 comments on commit 1d1ff8e

Please sign in to comment.