diff --git a/catalogs/catalog-lakehouse-iceberg/src/test/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/web/metrics/TestIcebergMetricsManager.java b/catalogs/catalog-lakehouse-iceberg/src/test/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/web/metrics/TestIcebergMetricsManager.java index 5a295488091..1c978e37894 100644 --- a/catalogs/catalog-lakehouse-iceberg/src/test/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/web/metrics/TestIcebergMetricsManager.java +++ b/catalogs/catalog-lakehouse-iceberg/src/test/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/web/metrics/TestIcebergMetricsManager.java @@ -5,10 +5,13 @@ package com.datastrato.gravitino.catalog.lakehouse.iceberg.web.metrics; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.testcontainers.shaded.org.awaitility.Awaitility.await; + import com.datastrato.gravitino.catalog.lakehouse.iceberg.IcebergConfig; import com.google.common.collect.ImmutableMap; -import java.time.Instant; import java.util.Map; +import java.util.concurrent.TimeUnit; import org.apache.iceberg.metrics.ImmutableCommitMetricsResult; import org.apache.iceberg.metrics.ImmutableCommitReport; import org.apache.iceberg.metrics.MetricsReport; @@ -31,12 +34,11 @@ private MetricsReport createMetricsReport() { return metricsReport; } - private MetricsReport tryGetIcebergMetrics(MemoryMetricsStore memoryMetricsStore) - throws InterruptedException { - Instant waitTime = Instant.now().plusSeconds(20); - while (memoryMetricsStore.getMetricsReport() == null && Instant.now().isBefore(waitTime)) { - Thread.sleep(100); - } + private MetricsReport tryGetIcebergMetrics(MemoryMetricsStore memoryMetricsStore) { + await() + .atMost(20, TimeUnit.SECONDS) + .pollInterval(100, TimeUnit.MILLISECONDS) + .untilAsserted(() -> assertTrue(memoryMetricsStore.getMetricsReport() != null)); return memoryMetricsStore.getMetricsReport(); } diff --git a/integration-test-common/src/test/java/com/datastrato/gravitino/integration/test/container/DorisContainer.java b/integration-test-common/src/test/java/com/datastrato/gravitino/integration/test/container/DorisContainer.java index 9f70db4ebb7..f173b060836 100644 --- a/integration-test-common/src/test/java/com/datastrato/gravitino/integration/test/container/DorisContainer.java +++ b/integration-test-common/src/test/java/com/datastrato/gravitino/integration/test/container/DorisContainer.java @@ -5,6 +5,7 @@ package com.datastrato.gravitino.integration.test.container; import static java.lang.String.format; +import static org.testcontainers.shaded.org.awaitility.Awaitility.await; import com.google.common.collect.ImmutableSet; import java.io.File; @@ -16,6 +17,7 @@ import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.concurrent.TimeUnit; import org.rnorth.ducttape.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -91,39 +93,41 @@ private void copyDorisLog() { @Override protected boolean checkContainerStatus(int retryLimit) { - int nRetry = 0; - String dorisJdbcUrl = format("jdbc:mysql://%s:%d/", getContainerIpAddress(), FE_MYSQL_PORT); LOG.info("Doris url is " + dorisJdbcUrl); - while (nRetry++ < retryLimit) { - try (Connection connection = DriverManager.getConnection(dorisJdbcUrl, USER_NAME, ""); - Statement statement = connection.createStatement()) { - - // execute `SHOW PROC '/backends';` to check if backends is ready - String query = "SHOW PROC '/backends';"; - try (ResultSet resultSet = statement.executeQuery(query)) { - while 
(resultSet.next()) { - String alive = resultSet.getString("Alive"); - String totalCapacity = resultSet.getString("TotalCapacity"); - float totalCapacityFloat = Float.parseFloat(totalCapacity.split(" ")[0]); - - // alive should be true and totalCapacity should not be 0.000 - if (alive.equalsIgnoreCase("true") && totalCapacityFloat > 0.0f) { - LOG.info("Doris container startup success!"); - return true; - } - } - } - - LOG.info("Doris container is not ready yet!"); - Thread.sleep(5000); - } catch (Exception e) { - LOG.error(e.getMessage(), e); - } - } - - return false; + await() + .atMost(30, TimeUnit.SECONDS) + .pollInterval(30 / retryLimit, TimeUnit.SECONDS) + .until( + () -> { + try (Connection connection = + DriverManager.getConnection(dorisJdbcUrl, USER_NAME, ""); + Statement statement = connection.createStatement()) { + + // execute `SHOW PROC '/backends';` to check if backends is ready + String query = "SHOW PROC '/backends';"; + try (ResultSet resultSet = statement.executeQuery(query)) { + while (resultSet.next()) { + String alive = resultSet.getString("Alive"); + String totalCapacity = resultSet.getString("TotalCapacity"); + float totalCapacityFloat = Float.parseFloat(totalCapacity.split(" ")[0]); + + // alive should be true and totalCapacity should not be 0.000 + if (alive.equalsIgnoreCase("true") && totalCapacityFloat > 0.0f) { + LOG.info("Doris container startup success!"); + return true; + } + } + } + LOG.info("Doris container is not ready yet!"); + } catch (Exception e) { + LOG.error(e.getMessage(), e); + } + return false; + }); + + return true; } private boolean changePassword() { diff --git a/integration-test-common/src/test/java/com/datastrato/gravitino/integration/test/container/HiveContainer.java b/integration-test-common/src/test/java/com/datastrato/gravitino/integration/test/container/HiveContainer.java index edb63b21e33..1006b957dbd 100644 --- a/integration-test-common/src/test/java/com/datastrato/gravitino/integration/test/container/HiveContainer.java +++ b/integration-test-common/src/test/java/com/datastrato/gravitino/integration/test/container/HiveContainer.java @@ -5,6 +5,7 @@ package com.datastrato.gravitino.integration.test.container; import static java.lang.String.format; +import static org.testcontainers.shaded.org.awaitility.Awaitility.await; import com.google.common.collect.ImmutableSet; import java.io.File; @@ -13,6 +14,7 @@ import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.concurrent.TimeUnit; import org.rnorth.ducttape.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -85,111 +87,90 @@ private void copyHiveLog() { @Override protected boolean checkContainerStatus(int retryLimit) { - int nRetry = 0; - boolean isHiveContainerReady = false; - int sleepTimeMillis = 10_000; - while (nRetry++ < retryLimit) { - try { - String[] commandAndArgs = new String[] {"bash", "/tmp/check-status.sh"}; - Container.ExecResult execResult = executeInContainer(commandAndArgs); - if (execResult.getExitCode() != 0) { - String message = - format( - "Command [%s] exited with %s", - String.join(" ", commandAndArgs), execResult.getExitCode()); - LOG.error("{}", message); - LOG.error("stderr: {}", execResult.getStderr()); - LOG.error("stdout: {}", execResult.getStdout()); - } else { - LOG.info("Hive container startup success!"); - isHiveContainerReady = true; - break; - } - LOG.info( - "Hive container is not ready, recheck({}/{}) after {}ms", - nRetry, - retryLimit, - sleepTimeMillis); - Thread.sleep(sleepTimeMillis); - } 
catch (RuntimeException e) { - LOG.error(e.getMessage(), e); - } catch (InterruptedException e) { - // ignore - } - } - - // Use JDBC driver to test if hive server is ready - boolean isHiveConnectSuccess = false; - boolean isHdfsConnectSuccess = false; + await() + .atMost(100, TimeUnit.SECONDS) + .pollInterval(100 / retryLimit, TimeUnit.SECONDS) + .until( + () -> { + try { + String[] commandAndArgs = new String[] {"bash", "/tmp/check-status.sh"}; + Container.ExecResult execResult = executeInContainer(commandAndArgs); + if (execResult.getExitCode() != 0) { + String message = + format( + "Command [%s] exited with %s", + String.join(" ", commandAndArgs), execResult.getExitCode()); + LOG.error("{}", message); + LOG.error("stderr: {}", execResult.getStderr()); + LOG.error("stdout: {}", execResult.getStdout()); + } else { + LOG.info("Hive container startup success!"); + return true; + } + } catch (RuntimeException e) { + LOG.error(e.getMessage(), e); + } + return false; + }); - // list all databases - int i = 0; - Container.ExecResult result; String sql = "show databases"; - while (i++ < retryLimit) { - try { - result = executeInContainer("hive", "-e", sql); - if (result.getStdout().contains("default")) { - isHiveConnectSuccess = true; - break; - } - Thread.sleep(3000); - } catch (Exception e) { - LOG.error("Failed to execute sql: {}", sql, e); - } - } - - if (!isHiveConnectSuccess) { - return false; - } - - i = 0; - // Create a simple table and insert a record - while (i++ < retryLimit) { - try { - result = - executeInContainer( - "hive", - "-e", - "CREATE TABLE IF NOT EXISTS default.employee ( eid int, name String, " - + "salary String, destination String) "); - if (result.getExitCode() == 0) { - isHdfsConnectSuccess = true; - break; - } - Thread.sleep(3000); - } catch (Exception e) { - LOG.error("Failed to execute sql: {}", sql, e); - } - } + await() + .atMost(30, TimeUnit.SECONDS) + .pollInterval(30 / retryLimit, TimeUnit.SECONDS) + .until( + () -> { + try { + Container.ExecResult result = executeInContainer("hive", "-e", sql); + if (result.getStdout().contains("default")) { + return true; + } + } catch (Exception e) { + LOG.error("Failed to execute sql: {}", sql, e); + } + return false; + }); + + await() + .atMost(30, TimeUnit.SECONDS) + .pollInterval(30 / retryLimit, TimeUnit.SECONDS) + .until( + () -> { + try { + Container.ExecResult result = + executeInContainer( + "hive", + "-e", + "CREATE TABLE IF NOT EXISTS default.employee ( eid int, name String, " + + "salary String, destination String) "); + if (result.getExitCode() == 0) { + return true; + } + } catch (Exception e) { + LOG.error("Failed to execute sql: {}", sql, e); + } + return false; + }); - i = 0; String containerIp = getContainerIpAddress(); - while (i++ < retryLimit) { - try (Socket socket = new Socket()) { - socket.connect(new InetSocketAddress(containerIp, HiveContainer.HIVE_METASTORE_PORT), 3000); - break; - } catch (Exception e) { - LOG.warn( - "Can't connect to Hive Metastore:[{}:{}]", - containerIp, - HiveContainer.HIVE_METASTORE_PORT, - e); - } - } - - if (i == retryLimit) { - LOG.error("Can't connect to Hive Metastore"); - return false; - } - - LOG.info( - "Hive container status: isHiveContainerReady={}, isHiveConnectSuccess={}, isHdfsConnectSuccess={}", - isHiveContainerReady, - isHiveConnectSuccess, - isHdfsConnectSuccess); - - return isHiveContainerReady && isHiveConnectSuccess && isHdfsConnectSuccess; + await() + .atMost(10, TimeUnit.SECONDS) + .until( + () -> { + try (Socket socket = new Socket()) { + 
socket.connect( + new InetSocketAddress(containerIp, HiveContainer.HIVE_METASTORE_PORT), 3000); + return true; + } catch (Exception e) { + LOG.warn( + "Can't connect to Hive Metastore:[{}:{}]", + containerIp, + HiveContainer.HIVE_METASTORE_PORT, + e); + } + return false; + }); + + return true; } public static class Builder extends BaseContainer.Builder { diff --git a/integration-test-common/src/test/java/com/datastrato/gravitino/integration/test/container/MySQLContainer.java b/integration-test-common/src/test/java/com/datastrato/gravitino/integration/test/container/MySQLContainer.java index 6547c9497ae..d634385f427 100644 --- a/integration-test-common/src/test/java/com/datastrato/gravitino/integration/test/container/MySQLContainer.java +++ b/integration-test-common/src/test/java/com/datastrato/gravitino/integration/test/container/MySQLContainer.java @@ -62,47 +62,42 @@ public void start() { @Override protected boolean checkContainerStatus(int retryLimit) { - int nRetry = 0; - boolean isMySQLContainerReady = false; - int sleepTimeMillis = 20_00; - while (nRetry++ < retryLimit) { - try { - String[] commandAndArgs = - new String[] { - "mysqladmin", - "ping", - "-h", - "localhost", - "-u", - getUsername(), - String.format("-p%s", getPassword()) - }; - Container.ExecResult execResult = executeInContainer(commandAndArgs); - if (execResult.getExitCode() != 0) { - String message = - format( - "Command [%s] exited with %s", - String.join(" ", commandAndArgs), execResult.getExitCode()); - LOG.error("{}", message); - LOG.error("stderr: {}", execResult.getStderr()); - LOG.error("stdout: {}", execResult.getStdout()); - } else { - LOG.info("MySQL container startup success!"); - isMySQLContainerReady = true; - break; - } - LOG.info( - "MySQL container is not ready, recheck({}/{}) after {}ms", - nRetry, - retryLimit, - sleepTimeMillis); - await().atLeast(sleepTimeMillis, TimeUnit.MILLISECONDS); - } catch (RuntimeException e) { - LOG.error(e.getMessage(), e); - } - } - - return isMySQLContainerReady; + await() + .atMost(10, TimeUnit.SECONDS) + .pollInterval(10 / retryLimit, TimeUnit.SECONDS) + .until( + () -> { + try { + String[] commandAndArgs = + new String[] { + "mysqladmin", + "ping", + "-h", + "localhost", + "-u", + getUsername(), + String.format("-p%s", getPassword()) + }; + Container.ExecResult execResult = executeInContainer(commandAndArgs); + if (execResult.getExitCode() != 0) { + String message = + format( + "Command [%s] exited with %s", + String.join(" ", commandAndArgs), execResult.getExitCode()); + LOG.error("{}", message); + LOG.error("stderr: {}", execResult.getStderr()); + LOG.error("stdout: {}", execResult.getStdout()); + } else { + LOG.info("MySQL container startup success!"); + return true; + } + } catch (RuntimeException e) { + LOG.error(e.getMessage(), e); + } + return false; + }); + + return true; } public void createDatabase(TestDatabaseName testDatabaseName) { diff --git a/integration-test-common/src/test/java/com/datastrato/gravitino/integration/test/container/TrinoContainer.java b/integration-test-common/src/test/java/com/datastrato/gravitino/integration/test/container/TrinoContainer.java index 3710d0a31af..354dfd3eb67 100644 --- a/integration-test-common/src/test/java/com/datastrato/gravitino/integration/test/container/TrinoContainer.java +++ b/integration-test-common/src/test/java/com/datastrato/gravitino/integration/test/container/TrinoContainer.java @@ -5,6 +5,7 @@ package com.datastrato.gravitino.integration.test.container; import static java.lang.String.format; +import 
static org.testcontainers.shaded.org.awaitility.Awaitility.await; import com.google.common.collect.ImmutableSet; import java.sql.Connection; @@ -17,6 +18,7 @@ import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.concurrent.TimeUnit; import org.rnorth.ducttape.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -65,23 +67,20 @@ public void start() { @Override protected boolean checkContainerStatus(int retryLimit) { - int nRetry = 0; - boolean isTrinoJdbcConnectionReady = false; - int sleepTime = 5000; - while (nRetry++ < retryLimit && !isTrinoJdbcConnectionReady) { - isTrinoJdbcConnectionReady = testTrinoJdbcConnection(); - if (isTrinoJdbcConnectionReady) { - break; - } else { - try { - Thread.sleep(sleepTime); - LOG.warn("Waiting for trino server to be ready... ({}ms)", nRetry * sleepTime); - } catch (InterruptedException e) { - // ignore - } - } - } - return isTrinoJdbcConnectionReady; + await() + .atMost(30, TimeUnit.SECONDS) + .pollInterval(30 / retryLimit, TimeUnit.SECONDS) + .until( + () -> { + try { + return testTrinoJdbcConnection(); + } catch (Exception e) { + LOG.error("Trino container startup failed!", e); + } + return false; + }); + + return true; } @Override @@ -118,27 +117,23 @@ public boolean initTrinoJdbcConnection() { // Check tha Trino has synchronized the catalog from Gravitino public boolean checkSyncCatalogFromGravitino(int retryLimit, String catalogName) { - int nRetry = 0; - int sleepTime = 5000; - while (nRetry++ < retryLimit) { - ArrayList> queryData = - executeQuerySQL(format("SHOW CATALOGS LIKE '%s'", catalogName)); - for (ArrayList record : queryData) { - String columnValue = record.get(0); - if (columnValue.equals(catalogName)) { - return true; - } - } - try { - Thread.sleep(sleepTime); - LOG.warn( - "Waiting for Trino synchronized the catalog from Gravitino... 
({}ms)", - nRetry * sleepTime); - } catch (InterruptedException e) { - // ignore - } - } - return false; + await() + .atMost(30, TimeUnit.SECONDS) + .pollInterval(60 / retryLimit, TimeUnit.SECONDS) + .until( + () -> { + ArrayList> queryData = + executeQuerySQL(format("SHOW CATALOGS LIKE '%s'", catalogName)); + for (ArrayList record : queryData) { + String columnValue = record.get(0); + if (columnValue.equals(catalogName)) { + return true; + } + } + return false; + }); + + return true; } private boolean testTrinoJdbcConnection() { diff --git a/integration-test/src/test/java/com/datastrato/gravitino/integration/test/trino/TrinoConnectorIT.java b/integration-test/src/test/java/com/datastrato/gravitino/integration/test/trino/TrinoConnectorIT.java index 9a23f9de9ef..52d3075e40b 100644 --- a/integration-test/src/test/java/com/datastrato/gravitino/integration/test/trino/TrinoConnectorIT.java +++ b/integration-test/src/test/java/com/datastrato/gravitino/integration/test/trino/TrinoConnectorIT.java @@ -4,6 +4,8 @@ */ package com.datastrato.gravitino.integration.test.trino; +import static org.testcontainers.shaded.org.awaitility.Awaitility.await; + import com.datastrato.gravitino.Catalog; import com.datastrato.gravitino.NameIdentifier; import com.datastrato.gravitino.client.GravitinoMetalake; @@ -40,6 +42,7 @@ import java.util.Locale; import java.util.Map; import java.util.Set; +import java.util.concurrent.TimeUnit; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.thrift.TException; import org.junit.jupiter.api.AfterAll; @@ -434,7 +437,7 @@ void testHiveTableCreatedByTrino() { } @Test - void testHiveSchemaCreatedByGravitino() throws InterruptedException { + void testHiveSchemaCreatedByGravitino() { String catalogName = GravitinoITUtils.genRandomName("catalog").toLowerCase(); String schemaName = GravitinoITUtils.genRandomName("schema").toLowerCase(); @@ -477,50 +480,51 @@ void testHiveSchemaCreatedByGravitino() throws InterruptedException { } private static boolean checkTrinoHasRemoved(String sql, long maxWaitTimeSec) { - long current = System.currentTimeMillis(); - while (System.currentTimeMillis() - current <= maxWaitTimeSec * 1000) { - try { - ArrayList> lists = - containerSuite.getTrinoContainer().executeQuerySQL(sql); - if (lists.isEmpty()) { - return true; - } - - LOG.info("Catalog has not synchronized yet, wait 200ms and retry. The SQL is '{}'", sql); - } catch (Exception e) { - LOG.warn("Failed to execute sql: {}", sql, e); - } - - try { - Thread.sleep(200); - } catch (InterruptedException e) { - LOG.warn("Failed to sleep 200ms", e); - } - } + await() + .atMost(maxWaitTimeSec, TimeUnit.SECONDS) + .pollInterval(200, TimeUnit.MILLISECONDS) + .until( + () -> { + try { + ArrayList> lists = + containerSuite.getTrinoContainer().executeQuerySQL(sql); + if (lists.isEmpty()) { + return true; + } + + LOG.info( + "Catalog has not synchronized yet, wait 200ms and retry. The SQL is '{}'", sql); + } catch (Exception e) { + LOG.warn("Failed to execute sql: {}", sql, e); + } + return false; + }); - return false; + return true; } - private static boolean checkTrinoHasLoaded(String sql, long maxWaitTimeSec) - throws InterruptedException { - long current = System.currentTimeMillis(); - while (System.currentTimeMillis() - current <= maxWaitTimeSec * 1000) { - try { - ArrayList> lists = - containerSuite.getTrinoContainer().executeQuerySQL(sql); - if (!lists.isEmpty()) { - return true; - } - - LOG.info("Trino has not load the data yet, wait 200ms and retry. 
The SQL is '{}'", sql); - } catch (Exception e) { - LOG.warn("Failed to execute sql: {}", sql, e); - } - - Thread.sleep(200); - } + private static boolean checkTrinoHasLoaded(String sql, long maxWaitTimeSec) { + await() + .atMost(maxWaitTimeSec, TimeUnit.SECONDS) + .pollInterval(200, TimeUnit.MILLISECONDS) + .until( + () -> { + try { + ArrayList> lists = + containerSuite.getTrinoContainer().executeQuerySQL(sql); + if (!lists.isEmpty()) { + return true; + } + + LOG.info( + "Trino has not load the data yet, wait 200ms and retry. The SQL is '{}'", sql); + } catch (Exception e) { + LOG.warn("Failed to execute sql: {}", sql, e); + } + return false; + }); - return false; + return true; } private Column[] createHiveFullTypeColumns() { @@ -584,7 +588,7 @@ private Column[] createFullTypeColumns() { } @Test - void testColumnTypeNotNullByTrino() throws InterruptedException { + void testColumnTypeNotNullByTrino() { String catalogName = GravitinoITUtils.genRandomName("mysql_catalog").toLowerCase(); GravitinoMetalake createdMetalake = client.loadMetalake(NameIdentifier.of(metalakeName)); String[] command = { @@ -674,7 +678,7 @@ void testColumnTypeNotNullByTrino() throws InterruptedException { } @Test - void testHiveTableCreatedByGravitino() throws InterruptedException { + void testHiveTableCreatedByGravitino() { String catalogName = GravitinoITUtils.genRandomName("catalog").toLowerCase(); String schemaName = GravitinoITUtils.genRandomName("schema").toLowerCase(); String tableName = GravitinoITUtils.genRandomName("table").toLowerCase(); @@ -812,7 +816,7 @@ void testHiveTableCreatedByGravitino() throws InterruptedException { } @Test - void testHiveCatalogCreatedByGravitino() throws InterruptedException { + void testHiveCatalogCreatedByGravitino() { String catalogName = GravitinoITUtils.genRandomName("catalog").toLowerCase(); GravitinoMetalake createdMetalake = client.loadMetalake(NameIdentifier.of(metalakeName)); createdMetalake.createCatalog( @@ -854,7 +858,7 @@ void testHiveCatalogCreatedByGravitino() throws InterruptedException { } @Test - void testWrongHiveCatalogProperty() throws InterruptedException { + void testWrongHiveCatalogProperty() { String catalogName = GravitinoITUtils.genRandomName("catalog").toLowerCase(); GravitinoMetalake createdMetalake = client.loadMetalake(NameIdentifier.of(metalakeName)); createdMetalake.createCatalog( @@ -886,13 +890,14 @@ void testWrongHiveCatalogProperty() throws InterruptedException { "true", catalog.properties().get("trino.bypass.hive.validate-bucketing")); String sql = String.format("show catalogs like '%s'", catalogName); - checkTrinoHasLoaded(sql, 6); + await().atLeast(6, TimeUnit.SECONDS); + // Because we assign 'hive.target-max-file-size' a wrong value, trino can't load the catalog Assertions.assertTrue(containerSuite.getTrinoContainer().executeQuerySQL(sql).isEmpty()); } @Test - void testIcebergTableAndSchemaCreatedByGravitino() throws InterruptedException { + void testIcebergTableAndSchemaCreatedByGravitino() { String catalogName = GravitinoITUtils.genRandomName("catalog").toLowerCase(); String schemaName = GravitinoITUtils.genRandomName("schema").toLowerCase(); String tableName = GravitinoITUtils.genRandomName("table").toLowerCase(); @@ -1007,7 +1012,7 @@ void testIcebergTableAndSchemaCreatedByTrino() { } @Test - void testIcebergCatalogCreatedByGravitino() throws InterruptedException { + void testIcebergCatalogCreatedByGravitino() { String catalogName = GravitinoITUtils.genRandomName("iceberg_catalog").toLowerCase(); String schemaName = 
GravitinoITUtils.genRandomName("iceberg_catalog").toLowerCase(); GravitinoMetalake createdMetalake = client.loadMetalake(NameIdentifier.of(metalakeName)); @@ -1091,7 +1096,7 @@ void testIcebergCatalogCreatedByGravitino() throws InterruptedException { } @Test - void testMySQLCatalogCreatedByGravitino() throws InterruptedException { + void testMySQLCatalogCreatedByGravitino() { String catalogName = GravitinoITUtils.genRandomName("mysql_catalog").toLowerCase(); GravitinoMetalake createdMetalake = client.loadMetalake(NameIdentifier.of(metalakeName)); String[] command = { @@ -1132,7 +1137,7 @@ void testMySQLCatalogCreatedByGravitino() throws InterruptedException { } @Test - void testMySQLTableCreatedByGravitino() throws InterruptedException { + void testMySQLTableCreatedByGravitino() { String catalogName = GravitinoITUtils.genRandomName("mysql_catalog").toLowerCase(); String schemaName = GravitinoITUtils.genRandomName("mysql_schema").toLowerCase(); String tableName = GravitinoITUtils.genRandomName("mysql_table").toLowerCase(); @@ -1246,7 +1251,7 @@ void testMySQLTableCreatedByGravitino() throws InterruptedException { } @Test - void testMySQLTableCreatedByTrino() throws InterruptedException { + void testMySQLTableCreatedByTrino() { String catalogName = GravitinoITUtils.genRandomName("mysql_catalog").toLowerCase(); String schemaName = GravitinoITUtils.genRandomName("mysql_schema").toLowerCase(); String tableName = GravitinoITUtils.genRandomName("mysql_table").toLowerCase(); @@ -1368,7 +1373,7 @@ void testMySQLTableCreatedByTrino() throws InterruptedException { } @Test - void testDropCatalogAndCreateAgain() throws InterruptedException { + void testDropCatalogAndCreateAgain() { String catalogName = GravitinoITUtils.genRandomName("mysql_catalog").toLowerCase(); GravitinoMetalake createdMetalake = client.loadMetalake(NameIdentifier.of(metalakeName)); String[] command = { diff --git a/integration-test/src/test/java/com/datastrato/gravitino/integration/test/trino/TrinoQueryIT.java b/integration-test/src/test/java/com/datastrato/gravitino/integration/test/trino/TrinoQueryIT.java index 95d1ea01c29..d7700e68559 100644 --- a/integration-test/src/test/java/com/datastrato/gravitino/integration/test/trino/TrinoQueryIT.java +++ b/integration-test/src/test/java/com/datastrato/gravitino/integration/test/trino/TrinoQueryIT.java @@ -4,6 +4,8 @@ */ package com.datastrato.gravitino.integration.test.trino; +import static org.testcontainers.shaded.org.awaitility.Awaitility.await; + import com.datastrato.gravitino.Namespace; import com.datastrato.gravitino.integration.test.util.ITUtils; import java.io.FileOutputStream; @@ -22,6 +24,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -80,16 +83,15 @@ private static void cleanupTestEnv() throws Exception { .filter(catalog -> catalog.name().startsWith("gt_")) .forEach(catalog -> TrinoQueryITBase.dropCatalog(catalog.name())); - int tries = 30; - while (tries-- >= 0) { - String[] catalogs = trinoQueryRunner.runQuery("show catalogs").split("\n"); - LOG.info("Catalogs: {}", Arrays.toString(catalogs)); - if (Arrays.stream(catalogs).filter(s -> s.startsWith("\"test.gt_")).count() == 0) { - break; - } - Thread.sleep(1000); - LOG.info("Waiting for test catalogs to be dropped"); - } + await() + .atMost(30, TimeUnit.SECONDS) + .pollInterval(1, 
TimeUnit.SECONDS) + .until( + () -> { + String[] catalogs = trinoQueryRunner.runQuery("show catalogs").split("\n"); + LOG.info("Catalogs: {}", Arrays.toString(catalogs)); + return Arrays.stream(catalogs).filter(s -> s.startsWith("\"test.gt_")).count() == 0; + }); } catch (Exception e) { throw new Exception("Failed to clean up test env: " + e.getMessage(), e); }
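
A minimal sketch of the Awaitility polling pattern this patch adopts, shown outside any particular container class. The class name `AwaitilityPollingSketch`, the `isServiceReady()` probe, and the `Math.max` guard on the poll interval are illustrative assumptions, not part of the patch; the patched tests import the Awaitility copy shaded inside testcontainers (`org.testcontainers.shaded.org.awaitility`), while the sketch uses plain `org.awaitility` so it compiles on its own.

import static org.awaitility.Awaitility.await;

import java.util.concurrent.TimeUnit;
import org.awaitility.core.ConditionTimeoutException;

public class AwaitilityPollingSketch {

  // Hypothetical readiness probe; the real checks wrap a JDBC ping, an exec'd
  // check-status.sh, a SHOW CATALOGS query, or a socket connect, and treat
  // transient failures as "not ready yet".
  private static boolean isServiceReady() {
    return System.nanoTime() % 2 == 0; // placeholder condition for the sketch
  }

  public static boolean checkStatus(int retryLimit) {
    try {
      await()
          .atMost(30, TimeUnit.SECONDS)
          // The patch derives the interval as 30 / retryLimit (integer division);
          // this sketch adds a guard so a large retryLimit cannot yield zero.
          .pollInterval(Math.max(1, 30 / retryLimit), TimeUnit.SECONDS)
          .until(
              () -> {
                try {
                  return isServiceReady();
                } catch (Exception e) {
                  // Swallow the exception and keep polling until the timeout.
                  return false;
                }
              });
      return true;
    } catch (ConditionTimeoutException e) {
      // Unlike the removed retry loops, await() throws on timeout rather than
      // falling through; catching it here preserves a boolean return contract.
      return false;
    }
  }

  public static void main(String[] args) {
    System.out.println("ready: " + checkStatus(10));
  }
}

Usage note: in the patch itself the `checkContainerStatus` overrides simply `return true;` after `await()`, relying on the `ConditionTimeoutException` to fail the test on timeout instead of returning `false`; the try/catch above is only one way to keep the original boolean semantics if a caller depends on them.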