From ab431393250630ad9fde83809ba065056f40ef2e Mon Sep 17 00:00:00 2001 From: lyf <1548150065@qq.com> Date: Sun, 4 Feb 2024 10:22:25 +0800 Subject: [PATCH 01/10] fix recover it --- .../env/cluster/config/MppCommonConfig.java | 5 + .../cluster/config/MppSharedCommonConfig.java | 7 + .../iotdb/it/env/cluster/env/AbstractEnv.java | 52 ++-- .../env/remote/config/RemoteCommonConfig.java | 5 + .../it/env/remote/env/RemoteServerEnv.java | 18 +- .../org/apache/iotdb/itbase/env/BaseEnv.java | 21 +- .../apache/iotdb/itbase/env/CommonConfig.java | 2 + .../it/cluster/IoTDBClusterRestartIT.java | 30 +- .../apache/iotdb/db/it/IoTDBRecoverIT.java | 284 ++++++------------ .../iotdb/db/it/IoTDBRecoverUnclosedIT.java | 149 ++++----- .../iotdb/db/utils/EnvironmentUtils.java | 57 ++-- 11 files changed, 258 insertions(+), 372 deletions(-) diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java index e1b16ee6e7d5..3a98ae81d262 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java @@ -419,6 +419,11 @@ public CommonConfig setDriverTaskExecutionTimeSliceInMs(long driverTaskExecution "driver_task_execution_time_slice_in_ms", String.valueOf(driverTaskExecutionTimeSliceInMs)); return this; } + @Override + public CommonConfig setWalMode(String walMode) { + setProperty("wal_mode", walMode); + return this; + } // For part of the log directory public String getClusterConfigStr() { diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java index 3b478765c2c8..cfbe1814afa3 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java @@ -424,4 +424,11 @@ public CommonConfig setDriverTaskExecutionTimeSliceInMs(long driverTaskExecution cnConfig.setDriverTaskExecutionTimeSliceInMs(driverTaskExecutionTimeSliceInMs); return this; } + + @Override + public CommonConfig setWalMode(String walMode) { + dnConfig.setWalMode(walMode); + cnConfig.setWalMode(walMode); + return this; + } } diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java index b96c8f4d4f96..729989467dea 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java @@ -610,24 +610,6 @@ public IConfigNodeRPCService.Iface getLeaderConfigNodeConnection() } } - @Override - public IConfigNodeRPCService.Iface getConfigNodeConnection(int index) throws Exception { - Exception lastException = null; - ConfigNodeWrapper configNodeWrapper = configNodeWrapperList.get(index); - for (int i = 0; i < 30; i++) { - try { - return clientManager.borrowClient( - new TEndPoint(configNodeWrapper.getIp(), configNodeWrapper.getPort())); - } catch (Exception e) { - lastException = e; - } - // Sleep 1s before next retry - TimeUnit.SECONDS.sleep(1); - } - throw new IOException( - "Failed to get connection to this ConfigNode. Last error: " + lastException); - } - @Override public int getLeaderConfigNodeIndex() throws IOException, InterruptedException { Exception lastException = null; @@ -675,11 +657,25 @@ public void startConfigNode(int index) { configNodeWrapperList.get(index).start(); } + @Override + public void startAllConfigNodes(){ + for(ConfigNodeWrapper configNodeWrapper:configNodeWrapperList){ + configNodeWrapper.start(); + } + } + @Override public void shutdownConfigNode(int index) { configNodeWrapperList.get(index).stop(); } + @Override + public void shutdownAllConfigNodes() { + for (ConfigNodeWrapper configNodeWrapper : configNodeWrapperList) { + configNodeWrapper.stop(); + } + } + @Override public ConfigNodeWrapper getConfigNodeWrapper(int index) { return configNodeWrapperList.get(index); @@ -799,11 +795,25 @@ public void startDataNode(int index) { dataNodeWrapperList.get(index).start(); } + @Override + public void startAllDataNodes() { + for (DataNodeWrapper dataNodeWrapper : dataNodeWrapperList) { + dataNodeWrapper.start(); + } + } + @Override public void shutdownDataNode(int index) { dataNodeWrapperList.get(index).stop(); } + @Override + public void shutdownAllDataNodes() { + for (DataNodeWrapper dataNodeWrapper : dataNodeWrapperList) { + dataNodeWrapper.stop(); + } + } + @Override public void ensureNodeStatus(List nodes, List targetStatus) throws IllegalStateException { @@ -858,12 +868,6 @@ public void ensureNodeStatus(List nodes, List targe throw new IllegalStateException(lastException); } - @Override - public int getMqttPort() { - int randomIndex = new Random(System.currentTimeMillis()).nextInt(dataNodeWrapperList.size()); - return dataNodeWrapperList.get(randomIndex).getMqttPort(); - } - @Override public String getIP() { return dataNodeWrapperList.get(0).getIp(); diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java index f7ee0789b773..0495edb933e8 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java @@ -298,4 +298,9 @@ public CommonConfig setPipeAirGapReceiverEnabled(boolean isPipeAirGapReceiverEna public CommonConfig setDriverTaskExecutionTimeSliceInMs(long driverTaskExecutionTimeSliceInMs) { return this; } + + @Override + public CommonConfig setWalMode(String walMode) { + return this; + } } diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/env/RemoteServerEnv.java b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/env/RemoteServerEnv.java index 15f07a143700..be4638f49e69 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/env/RemoteServerEnv.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/env/RemoteServerEnv.java @@ -244,11 +244,22 @@ public void startConfigNode(int index) { throw new UnsupportedOperationException(); } + @Override + public void startAllConfigNodes(){ + throw new UnsupportedOperationException(); + } + @Override public void shutdownConfigNode(int index) { throw new UnsupportedOperationException(); } + @Override + public void shutdownAllConfigNodes(){ + throw new UnsupportedOperationException(); + } + + @Override public void ensureNodeStatus(List nodes, List targetStatus) { throw new UnsupportedOperationException(); @@ -299,13 +310,18 @@ public void startDataNode(int index) { throw new UnsupportedOperationException(); } + @Override + public void startAllDataNodes() { + throw new UnsupportedOperationException(); + } + @Override public void shutdownDataNode(int index) { throw new UnsupportedOperationException(); } @Override - public int getMqttPort() { + public void shutdownAllDataNodes(){ throw new UnsupportedOperationException(); } diff --git a/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java b/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java index ebdeb6a18753..971b9b07fb41 100644 --- a/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java +++ b/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java @@ -52,10 +52,6 @@ public interface BaseEnv { */ void initClusterEnvironment(int configNodesNum, int dataNodesNum); - default void addClusterDataNodes(int dataNodesNum) throws IOException, InterruptedException { - throw new UnsupportedOperationException(); - } - /** * Init a cluster with the specified number of ConfigNodes and DataNodes. * @@ -131,10 +127,6 @@ Connection getConnectionWithSpecifiedDataNode( IConfigNodeRPCService.Iface getLeaderConfigNodeConnection() throws ClientManagerException, IOException, InterruptedException; - default IConfigNodeRPCService.Iface getConfigNodeConnection(int index) throws Exception { - throw new UnsupportedOperationException(); - } - ISessionPool getSessionPool(int maxSize); ISession getSessionConnection() throws IoTDBConnectionException; @@ -153,9 +145,15 @@ default IConfigNodeRPCService.Iface getConfigNodeConnection(int index) throws Ex /** Start an existed ConfigNode. */ void startConfigNode(int index); + /** Start all existed ConfigNodes. */ + void startAllConfigNodes(); + /** Shutdown an existed ConfigNode. */ void shutdownConfigNode(int index); + /** Shutdown all existed ConfigNodes. */ + void shutdownAllConfigNodes(); + /** * Ensure all the nodes being in the corresponding status. * @@ -211,10 +209,15 @@ void ensureNodeStatus(List nodes, List targetStatus /** Start an existed DataNode. */ void startDataNode(int index); + /** Start all existed DataNodes. */ + void startAllDataNodes(); + /** Shutdown an existed DataNode. */ void shutdownDataNode(int index); - int getMqttPort(); + /** Shutdown all existed DataNodes. */ + void shutdownAllDataNodes(); + String getIP(); diff --git a/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java b/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java index 3bc7407470e1..fa0e69927041 100644 --- a/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java +++ b/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java @@ -133,4 +133,6 @@ CommonConfig setEnableAutoLeaderBalanceForIoTConsensus( CommonConfig setPipeAirGapReceiverEnabled(boolean isPipeAirGapReceiverEnabled); CommonConfig setDriverTaskExecutionTimeSliceInMs(long driverTaskExecutionTimeSliceInMs); + + CommonConfig setWalMode(String walMode); } diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/cluster/IoTDBClusterRestartIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/cluster/IoTDBClusterRestartIT.java index 8e5e17b17bf2..c6c0e2c6cc5b 100644 --- a/integration-test/src/test/java/org/apache/iotdb/confignode/it/cluster/IoTDBClusterRestartIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/cluster/IoTDBClusterRestartIT.java @@ -93,35 +93,17 @@ public void tearDown() { } @Test - public void clusterRestartTest() throws InterruptedException { + public void clusterRestartTest(){ // Shutdown all cluster nodes - for (int i = 0; i < testConfigNodeNum; i++) { - EnvFactory.getEnv().shutdownConfigNode(i); - } - for (int i = 0; i < testDataNodeNum; i++) { - EnvFactory.getEnv().shutdownDataNode(i); - } - - // Sleep 1s before restart - TimeUnit.SECONDS.sleep(1); + logger.info("Shutting down all ConfigNodes and DataNodes..."); + EnvFactory.getEnv().shutdownAllConfigNodes(); + EnvFactory.getEnv().shutdownAllDataNodes(); // Restart all cluster nodes logger.info("Restarting all ConfigNodes..."); - for (int i = 0; i < testConfigNodeNum; i++) { - EnvFactory.getEnv().startConfigNode(i); - } - try (SyncConfigNodeIServiceClient client = - (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) { - // Do noting, just try to connect to the ConfigNode-leader - // in order to ensure the ConfigNode-leader is ready - } catch (Exception e) { - logger.error("Failed to get ConfigNode-leader connection", e); - } + EnvFactory.getEnv().startAllConfigNodes(); logger.info("Restarting all DataNodes..."); - for (int i = 0; i < testDataNodeNum; i++) { - EnvFactory.getEnv().startDataNode(i); - } - + EnvFactory.getEnv().startAllDataNodes(); ((AbstractEnv) EnvFactory.getEnv()).testWorkingNoUnknown(); } diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBRecoverIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBRecoverIT.java index c93ebde8f59c..c4df4ccef4b6 100644 --- a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBRecoverIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBRecoverIT.java @@ -20,6 +20,7 @@ package org.apache.iotdb.db.it; import org.apache.iotdb.it.env.EnvFactory; +import org.apache.iotdb.it.env.cluster.env.AbstractEnv; import org.apache.iotdb.it.framework.IoTDBTestRunner; import org.apache.iotdb.itbase.category.ClusterIT; import org.apache.iotdb.itbase.category.LocalStandaloneIT; @@ -27,14 +28,14 @@ import org.junit.After; import org.junit.Assert; import org.junit.Before; -import org.junit.Ignore; import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.runner.RunWith; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.sql.Connection; import java.sql.ResultSet; -import java.sql.SQLException; import java.sql.Statement; import java.util.Locale; @@ -45,14 +46,15 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.fail; -@Ignore @RunWith(IoTDBTestRunner.class) @Category({LocalStandaloneIT.class, ClusterIT.class}) public class IoTDBRecoverIT { + private static final Logger logger = LoggerFactory.getLogger(IoTDBRecoverIT.class); + private static final String TIMESTAMP_STR = "Time"; private static final String TEMPERATURE_STR = "root.ln.wf01.wt01.temperature"; - private static String[] creationSqls = + private static final String[] creationSqls = new String[] { "CREATE DATABASE root.vehicle.d0", "CREATE DATABASE root.vehicle.d1", @@ -62,7 +64,7 @@ public class IoTDBRecoverIT { "CREATE TIMESERIES root.vehicle.d0.s3 WITH DATATYPE=TEXT, ENCODING=PLAIN", "CREATE TIMESERIES root.vehicle.d0.s4 WITH DATATYPE=BOOLEAN, ENCODING=PLAIN" }; - private static String[] dataSet2 = + private static final String[] dataSet2 = new String[] { "CREATE DATABASE root.ln.wf01.wt01", "CREATE TIMESERIES root.ln.wf01.wt01.status WITH DATATYPE=BOOLEAN, ENCODING=PLAIN", @@ -83,10 +85,8 @@ public class IoTDBRecoverIT { private final String d0s1 = "root.vehicle.d0.s1"; private final String d0s2 = "root.vehicle.d0.s2"; private final String d0s3 = "root.vehicle.d0.s3"; - private String insertTemplate = - "INSERT INTO root.vehicle.d0(timestamp,s0,s1,s2,s3,s4)" + " VALUES(%d,%d,%d,%f,%s,%s)"; - @Before + @Before public void setUp() throws Exception { EnvFactory.getEnv().initClusterEnvironment(); prepareData(); @@ -98,53 +98,53 @@ public void tearDown() throws Exception { } @Test - public void mergeTest() { + public void RecoverTest1() { + // stop cluster + EnvFactory.getEnv().shutdownAllDataNodes(); + logger.info("All DataNodes are shut down"); + EnvFactory.getEnv().shutdownAllConfigNodes(); + logger.info("All ConfigNodes are shut down"); + EnvFactory.getEnv().startAllConfigNodes(); + logger.info("All ConfigNodes are started"); + EnvFactory.getEnv().startAllDataNodes(); + logger.info("All DataNodes are started"); + // check cluster whether restart + ((AbstractEnv) EnvFactory.getEnv()).testWorkingNoUnknown(); String[] retArray = new String[] {"0,2", "0,4", "0,3"}; try (Connection connection = EnvFactory.getEnv().getConnection(); Statement statement = connection.createStatement()) { String selectSql = "select count(temperature) from root.ln.wf01.wt01 where time > 3"; - int cnt; try (ResultSet resultSet = statement.executeQuery(selectSql)) { assertNotNull(resultSet); - cnt = 0; - while (resultSet.next()) { + resultSet.next(); String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString(count(TEMPERATURE_STR)); - Assert.assertEquals(retArray[cnt], ans); - cnt++; - } - Assert.assertEquals(1, cnt); + Assert.assertEquals(retArray[0], ans); } selectSql = "select min_time(temperature) from root.ln.wf01.wt01 where time > 3"; try (ResultSet resultSet = statement.executeQuery(selectSql)) { assertNotNull(resultSet); - while (resultSet.next()) { + resultSet.next(); String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString(minTime(TEMPERATURE_STR)); - Assert.assertEquals(retArray[cnt], ans); - cnt++; - } - Assert.assertEquals(2, cnt); + Assert.assertEquals(retArray[1], ans); } selectSql = "select min_time(temperature) from root.ln.wf01.wt01 where temperature > 3"; try (ResultSet resultSet = statement.executeQuery(selectSql)) { assertNotNull(resultSet); - while (resultSet.next()) { - String ans = - resultSet.getString(TIMESTAMP_STR) - + "," - + resultSet.getString(minTime(TEMPERATURE_STR)); - Assert.assertEquals(retArray[cnt], ans); - cnt++; - } - Assert.assertEquals(3, cnt); + resultSet.next(); + String ans = + resultSet.getString(TIMESTAMP_STR) + + "," + + resultSet.getString(minTime(TEMPERATURE_STR)); + Assert.assertEquals(retArray[2], ans); } } catch (Exception e) { @@ -152,85 +152,7 @@ public void mergeTest() { fail(e.getMessage()); } - // we want to recover - // TODO: replace stopDaemon() and activeDaemon() with new methods in Env. - // EnvironmentUtils.stopDaemon(); - // wait for close - try { - Thread.sleep(1000); - } catch (InterruptedException e) { - e.printStackTrace(); - Thread.currentThread().interrupt(); - } - // EnvironmentUtils.activeDaemon(); - - // count test - retArray = new String[] {"0,2001,2001,2001,2001", "0,7500,7500,7500,7500"}; - try (Connection connection = EnvFactory.getEnv().getConnection(); - Statement statement = connection.createStatement()) { - - String selectSql = - "select count(s0),count(s1),count(s2),count(s3) " - + "from root.vehicle.d0 where time >= 6000 and time <= 9000"; - int cnt; - try (ResultSet resultSet = statement.executeQuery(selectSql)) { - assertNotNull(resultSet); - cnt = 0; - while (resultSet.next()) { - String ans = - resultSet.getString(TIMESTAMP_STR) - + "," - + resultSet.getString(count(d0s0)) - + "," - + resultSet.getString(count(d0s1)) - + "," - + resultSet.getString(count(d0s2)) - + "," - + resultSet.getString(count(d0s3)); - Assert.assertEquals(retArray[cnt], ans); - cnt++; - } - Assert.assertEquals(1, cnt); - } - - selectSql = "select count(s0),count(s1),count(s2),count(s3) " + "from root.vehicle.d0"; - try (ResultSet resultSet = statement.executeQuery(selectSql)) { - assertNotNull(resultSet); - while (resultSet.next()) { - String ans = - resultSet.getString(TIMESTAMP_STR) - + "," - + resultSet.getString(count(d0s0)) - + "," - + resultSet.getString(count(d0s1)) - + "," - + resultSet.getString(count(d0s2)) - + "," - + resultSet.getString(count(d0s3)); - Assert.assertEquals(retArray[cnt], ans); - cnt++; - } - Assert.assertEquals(2, cnt); - } - } catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - - // we want to recover - // EnvironmentUtils.stopDaemon(); - // wait for close - try { - Thread.sleep(1000); - } catch (InterruptedException e) { - e.printStackTrace(); - Thread.currentThread().interrupt(); - } - - // EnvironmentUtils.activeDaemon(); - - // maxminValueTest - + // max min ValueTest retArray = new String[] {"0,8499,500.0", "0,2499,500.0"}; try (Connection connection = EnvFactory.getEnv().getConnection(); Statement statement = connection.createStatement()) { @@ -238,37 +160,29 @@ public void mergeTest() { String selectSql = "select max_value(s0),min_value(s2) " + "from root.vehicle.d0 where time >= 100 and time < 9000"; - int cnt; try (ResultSet resultSet = statement.executeQuery(selectSql)) { assertNotNull(resultSet); - cnt = 0; - while (resultSet.next()) { + resultSet.next(); String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString(maxValue(d0s0)) + "," + resultSet.getString(minValue(d0s2)); - Assert.assertEquals(retArray[cnt], ans); - cnt++; - } - Assert.assertEquals(1, cnt); + Assert.assertEquals(retArray[0], ans); } selectSql = "select max_value(s0),min_value(s2) from root.vehicle.d0 where time < 2500"; try (ResultSet resultSet = statement.executeQuery(selectSql)) { assertNotNull(resultSet); - while (resultSet.next()) { - String ans = - resultSet.getString(TIMESTAMP_STR) - + "," - + resultSet.getString(maxValue(d0s0)) - + "," - + resultSet.getString(minValue(d0s2)); - Assert.assertEquals(retArray[cnt], ans); - cnt++; - } - Assert.assertEquals(2, cnt); + resultSet.next(); + String ans = + resultSet.getString(TIMESTAMP_STR) + + "," + + resultSet.getString(maxValue(d0s0)) + + "," + + resultSet.getString(minValue(d0s2)); + Assert.assertEquals(retArray[1], ans); } } catch (Exception e) { e.printStackTrace(); @@ -277,29 +191,18 @@ public void mergeTest() { } @Test - public void vmTest() throws SQLException { - try (Connection connection = EnvFactory.getEnv().getConnection(); - Statement statement = connection.createStatement()) { - // prepare more data to flush - for (int i = 2000; i < 2500; i++) { - statement.execute( - String.format( - Locale.ENGLISH, insertTemplate, i, i, i, (double) i, "'" + i + "'", "false")); - } - statement.execute("flush"); - } - - // we want to recover - // EnvironmentUtils.stopDaemon(); - // wait for close - try { - Thread.sleep(1000); - } catch (InterruptedException e) { - e.printStackTrace(); - Thread.currentThread().interrupt(); - } - // EnvironmentUtils.activeDaemon(); - + public void recoverTest2(){ + // stop cluster + EnvFactory.getEnv().shutdownAllDataNodes(); + logger.info("All DataNodes are shut down"); + EnvFactory.getEnv().shutdownAllConfigNodes(); + logger.info("All ConfigNodes are shut down"); + EnvFactory.getEnv().startAllConfigNodes(); + logger.info("All ConfigNodes are started"); + EnvFactory.getEnv().startAllDataNodes(); + logger.info("All DataNodes are started"); + // wait for cluster to start and check + ((AbstractEnv) EnvFactory.getEnv()).testWorkingNoUnknown(); // count test String[] retArray = new String[] {"0,2001,2001,2001,2001", "0,7500,7500,7500,7500"}; try (Connection connection = EnvFactory.getEnv().getConnection(); @@ -308,45 +211,37 @@ public void vmTest() throws SQLException { String selectSql = "select count(s0),count(s1),count(s2),count(s3) " + "from root.vehicle.d0 where time >= 6000 and time <= 9000"; - int cnt; try (ResultSet resultSet = statement.executeQuery(selectSql)) { assertNotNull(resultSet); - cnt = 0; - while (resultSet.next()) { - String ans = - resultSet.getString(TIMESTAMP_STR) - + "," - + resultSet.getString(count(d0s0)) - + "," - + resultSet.getString(count(d0s1)) - + "," - + resultSet.getString(count(d0s2)) - + "," - + resultSet.getString(count(d0s3)); - Assert.assertEquals(retArray[cnt], ans); - cnt++; - } - Assert.assertEquals(1, cnt); + resultSet.next(); + String ans = + resultSet.getString(TIMESTAMP_STR) + + "," + + resultSet.getString(count(d0s0)) + + "," + + resultSet.getString(count(d0s1)) + + "," + + resultSet.getString(count(d0s2)) + + "," + + resultSet.getString(count(d0s3)); + Assert.assertEquals(retArray[0], ans); } selectSql = "select count(s0),count(s1),count(s2),count(s3) from root.vehicle.d0"; try (ResultSet resultSet = statement.executeQuery(selectSql)) { assertNotNull(resultSet); - while (resultSet.next()) { - String ans = - resultSet.getString(TIMESTAMP_STR) - + "," - + resultSet.getString(count(d0s0)) - + "," - + resultSet.getString(count(d0s1)) - + "," - + resultSet.getString(count(d0s2)) - + "," - + resultSet.getString(count(d0s3)); - Assert.assertEquals(retArray[cnt], ans); - cnt++; - } - Assert.assertEquals(2, cnt); + resultSet.next(); + String ans = + resultSet.getString(TIMESTAMP_STR) + + "," + + resultSet.getString(count(d0s0)) + + "," + + resultSet.getString(count(d0s1)) + + "," + + resultSet.getString(count(d0s2)) + + "," + + resultSet.getString(count(d0s3)); + Assert.assertEquals(retArray[1], ans); } } catch (Exception e) { e.printStackTrace(); @@ -367,45 +262,52 @@ private void prepareData() { } // prepare BufferWrite file - for (int i = 5000; i < 7000; i++) { - statement.execute( + String insertTemplate = "INSERT INTO root.vehicle.d0(timestamp,s0,s1,s2,s3,s4)" + " VALUES(%d,%d,%d,%f,%s,%s)"; + for (int i = 5000; i < 7000; i++) { + statement.addBatch( String.format( Locale.ENGLISH, insertTemplate, i, i, i, (double) i, "'" + i + "'", "true")); } + statement.executeBatch(); statement.execute("flush"); for (int i = 7500; i < 8500; i++) { - statement.execute( + statement.addBatch( String.format( Locale.ENGLISH, insertTemplate, i, i, i, (double) i, "'" + i + "'", "false")); } + statement.executeBatch(); statement.execute("flush"); // prepare Unseq-File for (int i = 500; i < 1500; i++) { - statement.execute( + statement.addBatch( String.format( Locale.ENGLISH, insertTemplate, i, i, i, (double) i, "'" + i + "'", "true")); } + statement.executeBatch(); statement.execute("flush"); for (int i = 3000; i < 6500; i++) { - statement.execute( + statement.addBatch( String.format( Locale.ENGLISH, insertTemplate, i, i, i, (double) i, "'" + i + "'", "false")); } - statement.execute("merge"); + statement.executeBatch(); + statement.execute("flush"); // prepare BufferWrite cache for (int i = 9000; i < 10000; i++) { - statement.execute( + statement.addBatch( String.format( Locale.ENGLISH, insertTemplate, i, i, i, (double) i, "'" + i + "'", "true")); } + statement.executeBatch(); // prepare Overflow cache for (int i = 2000; i < 2500; i++) { - statement.execute( + statement.addBatch( String.format( Locale.ENGLISH, insertTemplate, i, i, i, (double) i, "'" + i + "'", "false")); } - + statement.executeBatch(); + statement.execute("flush"); } catch (Exception e) { e.printStackTrace(); } diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBRecoverUnclosedIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBRecoverUnclosedIT.java index 48c7426614f1..31febfa7baea 100644 --- a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBRecoverUnclosedIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBRecoverUnclosedIT.java @@ -19,20 +19,19 @@ package org.apache.iotdb.db.it; -import org.apache.iotdb.db.conf.IoTDBConfig; -import org.apache.iotdb.db.conf.IoTDBDescriptor; -import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALMode; import org.apache.iotdb.it.env.EnvFactory; +import org.apache.iotdb.it.env.cluster.env.AbstractEnv; import org.apache.iotdb.it.framework.IoTDBTestRunner; import org.apache.iotdb.itbase.category.ClusterIT; import org.apache.iotdb.itbase.category.LocalStandaloneIT; import org.junit.After; import org.junit.Before; -import org.junit.Ignore; import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.runner.RunWith; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; import java.sql.Connection; @@ -49,15 +48,13 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.fail; -@Ignore @RunWith(IoTDBTestRunner.class) @Category({LocalStandaloneIT.class, ClusterIT.class}) public class IoTDBRecoverUnclosedIT { - private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig(); - + private static final Logger logger = LoggerFactory.getLogger(IoTDBRecoverUnclosedIT.class); private static final String TIMESTAMP_STR = "Time"; private static final String TEMPERATURE_STR = "root.ln.wf01.wt01.temperature"; - private static String[] creationSqls = + private static final String[] creationSqls = new String[] { "CREATE DATABASE root.vehicle.d0", "CREATE DATABASE root.vehicle.d1", @@ -67,7 +64,7 @@ public class IoTDBRecoverUnclosedIT { "CREATE TIMESERIES root.vehicle.d0.s3 WITH DATATYPE=TEXT, ENCODING=PLAIN", "CREATE TIMESERIES root.vehicle.d0.s4 WITH DATATYPE=BOOLEAN, ENCODING=PLAIN" }; - private static String[] dataSet2 = + private static final String[] dataSet2 = new String[] { "CREATE DATABASE root.ln.wf01.wt01", "CREATE TIMESERIES root.ln.wf01.wt01.status WITH DATATYPE=BOOLEAN, ENCODING=PLAIN", @@ -84,26 +81,16 @@ public class IoTDBRecoverUnclosedIT { "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) " + "values(5, 5.5, false, 55)" }; - private final String d0s0 = "root.vehicle.d0.s0"; - private final String d0s1 = "root.vehicle.d0.s1"; - private final String d0s2 = "root.vehicle.d0.s2"; - private final String d0s3 = "root.vehicle.d0.s3"; - private String insertTemplate = - "INSERT INTO root.vehicle.d0(timestamp,s0,s1,s2,s3,s4)" + " VALUES(%d,%d,%d,%f,%s,%s)"; - - private WALMode prevWALMode; - @Before + @Before public void setUp() throws Exception { + EnvFactory.getEnv().getConfig().getCommonConfig().setWalMode("SYNC"); EnvFactory.getEnv().initClusterEnvironment(); - prevWALMode = config.getWalMode(); - config.setWalMode(WALMode.SYNC); prepareData(); } @After public void tearDown() throws Exception { - config.setWalMode(prevWALMode); EnvFactory.getEnv().cleanClusterEnvironment(); } @@ -114,47 +101,36 @@ public void test() throws SQLException, IOException { Statement statement = connection.createStatement()) { String selectSql = "select count(temperature) from root.ln.wf01.wt01 where time > 3"; - int cnt; try (ResultSet resultSet = statement.executeQuery(selectSql)) { assertNotNull(resultSet); - cnt = 0; - while (resultSet.next()) { + resultSet.next(); String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString(count(TEMPERATURE_STR)); - assertEquals(retArray[cnt], ans); - cnt++; - } - assertEquals(1, cnt); + assertEquals(retArray[0], ans); } selectSql = "select min_time(temperature) from root.ln.wf01.wt01 where time > 3"; try (ResultSet resultSet = statement.executeQuery(selectSql)) { assertNotNull(resultSet); - while (resultSet.next()) { + resultSet.next(); String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString(minTime(TEMPERATURE_STR)); - assertEquals(retArray[cnt], ans); - cnt++; - } - assertEquals(2, cnt); + assertEquals(retArray[1], ans); } selectSql = "select min_time(temperature) from root.ln.wf01.wt01 where temperature > 3"; try (ResultSet resultSet = statement.executeQuery(selectSql)) { assertNotNull(resultSet); - while (resultSet.next()) { + resultSet.next(); String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString(minTime(TEMPERATURE_STR)); - assertEquals(retArray[cnt], ans); - cnt++; - } - assertEquals(3, cnt); + assertEquals(retArray[2], ans); } } catch (Exception e) { @@ -162,63 +138,60 @@ public void test() throws SQLException, IOException { fail(e.getMessage()); } - // TODO: replace restartDaemon() with new methods in Env. - try { - // EnvironmentUtils.restartDaemon(); - } catch (Exception e) { - fail(); - } insertMoreData(); - try { - // EnvironmentUtils.restartDaemon(); - } catch (Exception e) { - fail(); - } - // test count, max, min value - retArray = new String[] {"0,8499,500.0", "0,2499,500.0"}; + // stop cluster + EnvFactory.getEnv().shutdownAllDataNodes(); + logger.info("All DataNodes are shut down"); + EnvFactory.getEnv().shutdownAllConfigNodes(); + logger.info("All ConfigNodes are shut down"); + EnvFactory.getEnv().startAllConfigNodes(); + logger.info("All ConfigNodes are started"); + EnvFactory.getEnv().startAllDataNodes(); + logger.info("All DataNodes are started"); + // wait for cluster to start and check + ((AbstractEnv) EnvFactory.getEnv()).testWorkingNoUnknown(); + + // test count, try (Connection connection = EnvFactory.getEnv().getConnection(); Statement statement = connection.createStatement()) { String selectSql = "select count(*) from root.vehicle.d0"; ResultSet tempResultSet = statement.executeQuery(selectSql); assertNotNull(tempResultSet); tempResultSet.next(); - assertEquals(7500, tempResultSet.getInt("count(" + d0s0 + ")")); + String d0s0 = "root.vehicle.d0.s0"; + String d0s1 = "root.vehicle.d0.s1"; + String d0s2 = "root.vehicle.d0.s2"; + assertEquals(7500, tempResultSet.getInt("count(" + d0s0 + ")")); + // test max, min value + retArray = new String[] {"0,8499,500.0", "0,2499,500.0"}; selectSql = "select max_value(s0),min_value(s2) " + "from root.vehicle.d0 where time >= 100 and time < 9000"; - int cnt; try (ResultSet resultSet = statement.executeQuery(selectSql)) { assertNotNull(resultSet); - cnt = 0; - while (resultSet.next()) { + resultSet.next(); String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString(maxValue(d0s0)) + "," + resultSet.getString(minValue(d0s2)); - assertEquals(retArray[cnt], ans); - cnt++; - } - assertEquals(1, cnt); + assertEquals(retArray[0], ans); } - selectSql = "select max_value(s0),min_value(s2) from root.vehicle.d0 where time < 2500"; + selectSql = "select max_value(s1),min_value(s2) from root.vehicle.d0 where time < 2500"; try (ResultSet resultSet = statement.executeQuery(selectSql)) { assertNotNull(resultSet); - while (resultSet.next()) { + resultSet.next(); String ans = resultSet.getString(TIMESTAMP_STR) + "," - + resultSet.getString(maxValue(d0s0)) + + resultSet.getString(maxValue(d0s1)) + "," + resultSet.getString(minValue(d0s2)); - assertEquals(retArray[cnt], ans); - cnt++; - } - assertEquals(2, cnt); + assertEquals(retArray[1], ans); } } catch (Exception e) { e.printStackTrace(); @@ -247,41 +220,45 @@ private void insertMoreData() { Statement statement = connection.createStatement()) { // prepare BufferWrite file + String insertTemplate = "INSERT INTO root.vehicle.d0(timestamp,s0,s1,s2,s3,s4)" + " VALUES(%d,%d,%d,%f,%s,%s)"; for (int i = 5000; i < 7000; i++) { - statement.execute( - String.format( - Locale.ENGLISH, insertTemplate, i, i, i, (double) i, "'" + i + "'", "true")); + statement.addBatch( + String.format( + Locale.ENGLISH, insertTemplate, i, i, i, (double) i, "'" + i + "'", "true")); } + statement.executeBatch(); for (int i = 7500; i < 8500; i++) { - statement.execute( - String.format( - Locale.ENGLISH, insertTemplate, i, i, i, (double) i, "'" + i + "'", "false")); + statement.addBatch( + String.format( + Locale.ENGLISH, insertTemplate, i, i, i, (double) i, "'" + i + "'", "false")); } + statement.executeBatch(); // prepare Unseq-File for (int i = 500; i < 1500; i++) { - statement.execute( - String.format( - Locale.ENGLISH, insertTemplate, i, i, i, (double) i, "'" + i + "'", "true")); + statement.addBatch( + String.format( + Locale.ENGLISH, insertTemplate, i, i, i, (double) i, "'" + i + "'", "true")); } for (int i = 3000; i < 6500; i++) { - statement.execute( - String.format( - Locale.ENGLISH, insertTemplate, i, i, i, (double) i, "'" + i + "'", "false")); + statement.addBatch( + String.format( + Locale.ENGLISH, insertTemplate, i, i, i, (double) i, "'" + i + "'", "false")); } - + statement.executeBatch(); // prepare BufferWrite cache for (int i = 9000; i < 10000; i++) { - statement.execute( - String.format( - Locale.ENGLISH, insertTemplate, i, i, i, (double) i, "'" + i + "'", "true")); + statement.addBatch( + String.format( + Locale.ENGLISH, insertTemplate, i, i, i, (double) i, "'" + i + "'", "true")); } + statement.executeBatch(); // prepare Overflow cache for (int i = 2000; i < 2500; i++) { - statement.execute( - String.format( - Locale.ENGLISH, insertTemplate, i, i, i, (double) i, "'" + i + "'", "false")); + statement.addBatch( + String.format( + Locale.ENGLISH, insertTemplate, i, i, i, (double) i, "'" + i + "'", "false")); } - + statement.executeBatch(); } catch (Exception e) { e.printStackTrace(); } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java index 9766269e4207..db3235ce5c90 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java @@ -175,29 +175,29 @@ public static void cleanEnv() throws IOException, StorageEngineException { private static boolean examinePorts() { TTransport transport = TSocketWrapper.wrap(tConfiguration, "127.0.0.1", 6667, 100); - if (!transport.isOpen()) { - try { - transport.open(); - logger.error("stop daemon failed. 6667 can be connected now."); - transport.close(); - return false; - } catch (TTransportException e) { - // do nothing + if (transport != null && !transport.isOpen()) { + try { + transport.open(); + logger.error("stop daemon failed. 6667 can be connected now."); + transport.close(); + return false; + } catch (TTransportException e) { + // do nothing + } } - } - // try sync service + // try sync service transport = TSocketWrapper.wrap(tConfiguration, "127.0.0.1", 5555, 100); - if (!transport.isOpen()) { - try { - transport.open(); - logger.error("stop Sync daemon failed. 5555 can be connected now."); - transport.close(); - return false; - } catch (TTransportException e) { - // do nothing + if (transport != null && !transport.isOpen()) { + try { + transport.open(); + logger.error("stop Sync daemon failed. 5555 can be connected now."); + transport.close(); + return false; + } catch (TTransportException e) { + // do nothing + } } - } - // try jmx connection + // try jmx connection try { JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://localhost:31999/jmxrmi"); JMXConnector jmxConnector = JMXConnectorFactory.connect(url); @@ -288,23 +288,6 @@ public static void envSetUp() { TEST_QUERY_CONTEXT = new QueryContext(TEST_QUERY_JOB_ID); } - public static void stopDaemon() {} - - public static void shutdownDaemon() throws Exception {} - - public static void activeDaemon() {} - - public static void reactiveDaemon() {} - - public static void restartDaemon() throws Exception { - shutdownDaemon(); - stopDaemon(); - TsFileResourceManager.getInstance().clear(); - WALManager.getInstance().clear(); - WALRecoverManager.getInstance().clear(); - reactiveDaemon(); - } - private static void createAllDir() { // create sequential files for (String path : tierManager.getAllLocalSequenceFileFolders()) { From 598a4c86dc8c9a5f2659f321e4315f97df074f1d Mon Sep 17 00:00:00 2001 From: lyf <1548150065@qq.com> Date: Sun, 4 Feb 2024 10:23:20 +0800 Subject: [PATCH 02/10] spotless apply --- .../env/cluster/config/MppCommonConfig.java | 1 + .../iotdb/it/env/cluster/env/AbstractEnv.java | 4 +- .../it/env/remote/env/RemoteServerEnv.java | 7 +- .../org/apache/iotdb/itbase/env/BaseEnv.java | 1 - .../it/cluster/IoTDBClusterRestartIT.java | 2 +- .../apache/iotdb/db/it/IoTDBRecoverIT.java | 39 ++++----- .../iotdb/db/it/IoTDBRecoverUnclosedIT.java | 87 +++++++++---------- .../iotdb/db/utils/EnvironmentUtils.java | 40 ++++----- 8 files changed, 89 insertions(+), 92 deletions(-) diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java index 3a98ae81d262..496e9e5e23f7 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java @@ -419,6 +419,7 @@ public CommonConfig setDriverTaskExecutionTimeSliceInMs(long driverTaskExecution "driver_task_execution_time_slice_in_ms", String.valueOf(driverTaskExecutionTimeSliceInMs)); return this; } + @Override public CommonConfig setWalMode(String walMode) { setProperty("wal_mode", walMode); diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java index 729989467dea..9286688a449c 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java @@ -658,8 +658,8 @@ public void startConfigNode(int index) { } @Override - public void startAllConfigNodes(){ - for(ConfigNodeWrapper configNodeWrapper:configNodeWrapperList){ + public void startAllConfigNodes() { + for (ConfigNodeWrapper configNodeWrapper : configNodeWrapperList) { configNodeWrapper.start(); } } diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/env/RemoteServerEnv.java b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/env/RemoteServerEnv.java index be4638f49e69..543fec5bbdb4 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/env/RemoteServerEnv.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/env/RemoteServerEnv.java @@ -245,7 +245,7 @@ public void startConfigNode(int index) { } @Override - public void startAllConfigNodes(){ + public void startAllConfigNodes() { throw new UnsupportedOperationException(); } @@ -255,11 +255,10 @@ public void shutdownConfigNode(int index) { } @Override - public void shutdownAllConfigNodes(){ + public void shutdownAllConfigNodes() { throw new UnsupportedOperationException(); } - @Override public void ensureNodeStatus(List nodes, List targetStatus) { throw new UnsupportedOperationException(); @@ -321,7 +320,7 @@ public void shutdownDataNode(int index) { } @Override - public void shutdownAllDataNodes(){ + public void shutdownAllDataNodes() { throw new UnsupportedOperationException(); } diff --git a/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java b/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java index 971b9b07fb41..9ff51830de0b 100644 --- a/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java +++ b/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java @@ -218,7 +218,6 @@ void ensureNodeStatus(List nodes, List targetStatus /** Shutdown all existed DataNodes. */ void shutdownAllDataNodes(); - String getIP(); String getPort(); diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/cluster/IoTDBClusterRestartIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/cluster/IoTDBClusterRestartIT.java index c6c0e2c6cc5b..f82efcdd06c3 100644 --- a/integration-test/src/test/java/org/apache/iotdb/confignode/it/cluster/IoTDBClusterRestartIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/cluster/IoTDBClusterRestartIT.java @@ -93,7 +93,7 @@ public void tearDown() { } @Test - public void clusterRestartTest(){ + public void clusterRestartTest() { // Shutdown all cluster nodes logger.info("Shutting down all ConfigNodes and DataNodes..."); EnvFactory.getEnv().shutdownAllConfigNodes(); diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBRecoverIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBRecoverIT.java index c4df4ccef4b6..8c3d084697e0 100644 --- a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBRecoverIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBRecoverIT.java @@ -86,7 +86,7 @@ public class IoTDBRecoverIT { private final String d0s2 = "root.vehicle.d0.s2"; private final String d0s3 = "root.vehicle.d0.s3"; - @Before + @Before public void setUp() throws Exception { EnvFactory.getEnv().initClusterEnvironment(); prepareData(); @@ -118,10 +118,8 @@ public void RecoverTest1() { try (ResultSet resultSet = statement.executeQuery(selectSql)) { assertNotNull(resultSet); resultSet.next(); - String ans = - resultSet.getString(TIMESTAMP_STR) - + "," - + resultSet.getString(count(TEMPERATURE_STR)); + String ans = + resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString(count(TEMPERATURE_STR)); Assert.assertEquals(retArray[0], ans); } @@ -129,10 +127,10 @@ public void RecoverTest1() { try (ResultSet resultSet = statement.executeQuery(selectSql)) { assertNotNull(resultSet); resultSet.next(); - String ans = - resultSet.getString(TIMESTAMP_STR) - + "," - + resultSet.getString(minTime(TEMPERATURE_STR)); + String ans = + resultSet.getString(TIMESTAMP_STR) + + "," + + resultSet.getString(minTime(TEMPERATURE_STR)); Assert.assertEquals(retArray[1], ans); } @@ -163,13 +161,13 @@ public void RecoverTest1() { try (ResultSet resultSet = statement.executeQuery(selectSql)) { assertNotNull(resultSet); resultSet.next(); - String ans = - resultSet.getString(TIMESTAMP_STR) - + "," - + resultSet.getString(maxValue(d0s0)) - + "," - + resultSet.getString(minValue(d0s2)); - Assert.assertEquals(retArray[0], ans); + String ans = + resultSet.getString(TIMESTAMP_STR) + + "," + + resultSet.getString(maxValue(d0s0)) + + "," + + resultSet.getString(minValue(d0s2)); + Assert.assertEquals(retArray[0], ans); } selectSql = "select max_value(s0),min_value(s2) from root.vehicle.d0 where time < 2500"; @@ -191,7 +189,7 @@ public void RecoverTest1() { } @Test - public void recoverTest2(){ + public void recoverTest2() { // stop cluster EnvFactory.getEnv().shutdownAllDataNodes(); logger.info("All DataNodes are shut down"); @@ -224,7 +222,7 @@ public void recoverTest2(){ + resultSet.getString(count(d0s2)) + "," + resultSet.getString(count(d0s3)); - Assert.assertEquals(retArray[0], ans); + Assert.assertEquals(retArray[0], ans); } selectSql = "select count(s0),count(s1),count(s2),count(s3) from root.vehicle.d0"; @@ -262,8 +260,9 @@ private void prepareData() { } // prepare BufferWrite file - String insertTemplate = "INSERT INTO root.vehicle.d0(timestamp,s0,s1,s2,s3,s4)" + " VALUES(%d,%d,%d,%f,%s,%s)"; - for (int i = 5000; i < 7000; i++) { + String insertTemplate = + "INSERT INTO root.vehicle.d0(timestamp,s0,s1,s2,s3,s4)" + " VALUES(%d,%d,%d,%f,%s,%s)"; + for (int i = 5000; i < 7000; i++) { statement.addBatch( String.format( Locale.ENGLISH, insertTemplate, i, i, i, (double) i, "'" + i + "'", "true")); diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBRecoverUnclosedIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBRecoverUnclosedIT.java index 31febfa7baea..b8d2a90a01d6 100644 --- a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBRecoverUnclosedIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBRecoverUnclosedIT.java @@ -82,7 +82,7 @@ public class IoTDBRecoverUnclosedIT { + "values(5, 5.5, false, 55)" }; - @Before + @Before public void setUp() throws Exception { EnvFactory.getEnv().getConfig().getCommonConfig().setWalMode("SYNC"); EnvFactory.getEnv().initClusterEnvironment(); @@ -104,33 +104,31 @@ public void test() throws SQLException, IOException { try (ResultSet resultSet = statement.executeQuery(selectSql)) { assertNotNull(resultSet); resultSet.next(); - String ans = - resultSet.getString(TIMESTAMP_STR) - + "," - + resultSet.getString(count(TEMPERATURE_STR)); - assertEquals(retArray[0], ans); + String ans = + resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString(count(TEMPERATURE_STR)); + assertEquals(retArray[0], ans); } selectSql = "select min_time(temperature) from root.ln.wf01.wt01 where time > 3"; try (ResultSet resultSet = statement.executeQuery(selectSql)) { assertNotNull(resultSet); resultSet.next(); - String ans = - resultSet.getString(TIMESTAMP_STR) - + "," - + resultSet.getString(minTime(TEMPERATURE_STR)); - assertEquals(retArray[1], ans); + String ans = + resultSet.getString(TIMESTAMP_STR) + + "," + + resultSet.getString(minTime(TEMPERATURE_STR)); + assertEquals(retArray[1], ans); } selectSql = "select min_time(temperature) from root.ln.wf01.wt01 where temperature > 3"; try (ResultSet resultSet = statement.executeQuery(selectSql)) { assertNotNull(resultSet); resultSet.next(); - String ans = - resultSet.getString(TIMESTAMP_STR) - + "," - + resultSet.getString(minTime(TEMPERATURE_STR)); - assertEquals(retArray[2], ans); + String ans = + resultSet.getString(TIMESTAMP_STR) + + "," + + resultSet.getString(minTime(TEMPERATURE_STR)); + assertEquals(retArray[2], ans); } } catch (Exception e) { @@ -162,7 +160,7 @@ public void test() throws SQLException, IOException { String d0s0 = "root.vehicle.d0.s0"; String d0s1 = "root.vehicle.d0.s1"; String d0s2 = "root.vehicle.d0.s2"; - assertEquals(7500, tempResultSet.getInt("count(" + d0s0 + ")")); + assertEquals(7500, tempResultSet.getInt("count(" + d0s0 + ")")); // test max, min value retArray = new String[] {"0,8499,500.0", "0,2499,500.0"}; @@ -172,26 +170,26 @@ public void test() throws SQLException, IOException { try (ResultSet resultSet = statement.executeQuery(selectSql)) { assertNotNull(resultSet); resultSet.next(); - String ans = - resultSet.getString(TIMESTAMP_STR) - + "," - + resultSet.getString(maxValue(d0s0)) - + "," - + resultSet.getString(minValue(d0s2)); - assertEquals(retArray[0], ans); + String ans = + resultSet.getString(TIMESTAMP_STR) + + "," + + resultSet.getString(maxValue(d0s0)) + + "," + + resultSet.getString(minValue(d0s2)); + assertEquals(retArray[0], ans); } selectSql = "select max_value(s1),min_value(s2) from root.vehicle.d0 where time < 2500"; try (ResultSet resultSet = statement.executeQuery(selectSql)) { assertNotNull(resultSet); resultSet.next(); - String ans = - resultSet.getString(TIMESTAMP_STR) - + "," - + resultSet.getString(maxValue(d0s1)) - + "," - + resultSet.getString(minValue(d0s2)); - assertEquals(retArray[1], ans); + String ans = + resultSet.getString(TIMESTAMP_STR) + + "," + + resultSet.getString(maxValue(d0s1)) + + "," + + resultSet.getString(minValue(d0s2)); + assertEquals(retArray[1], ans); } } catch (Exception e) { e.printStackTrace(); @@ -220,43 +218,44 @@ private void insertMoreData() { Statement statement = connection.createStatement()) { // prepare BufferWrite file - String insertTemplate = "INSERT INTO root.vehicle.d0(timestamp,s0,s1,s2,s3,s4)" + " VALUES(%d,%d,%d,%f,%s,%s)"; + String insertTemplate = + "INSERT INTO root.vehicle.d0(timestamp,s0,s1,s2,s3,s4)" + " VALUES(%d,%d,%d,%f,%s,%s)"; for (int i = 5000; i < 7000; i++) { statement.addBatch( - String.format( - Locale.ENGLISH, insertTemplate, i, i, i, (double) i, "'" + i + "'", "true")); + String.format( + Locale.ENGLISH, insertTemplate, i, i, i, (double) i, "'" + i + "'", "true")); } statement.executeBatch(); for (int i = 7500; i < 8500; i++) { statement.addBatch( - String.format( - Locale.ENGLISH, insertTemplate, i, i, i, (double) i, "'" + i + "'", "false")); + String.format( + Locale.ENGLISH, insertTemplate, i, i, i, (double) i, "'" + i + "'", "false")); } statement.executeBatch(); // prepare Unseq-File for (int i = 500; i < 1500; i++) { statement.addBatch( - String.format( - Locale.ENGLISH, insertTemplate, i, i, i, (double) i, "'" + i + "'", "true")); + String.format( + Locale.ENGLISH, insertTemplate, i, i, i, (double) i, "'" + i + "'", "true")); } for (int i = 3000; i < 6500; i++) { statement.addBatch( - String.format( - Locale.ENGLISH, insertTemplate, i, i, i, (double) i, "'" + i + "'", "false")); + String.format( + Locale.ENGLISH, insertTemplate, i, i, i, (double) i, "'" + i + "'", "false")); } statement.executeBatch(); // prepare BufferWrite cache for (int i = 9000; i < 10000; i++) { statement.addBatch( - String.format( - Locale.ENGLISH, insertTemplate, i, i, i, (double) i, "'" + i + "'", "true")); + String.format( + Locale.ENGLISH, insertTemplate, i, i, i, (double) i, "'" + i + "'", "true")); } statement.executeBatch(); // prepare Overflow cache for (int i = 2000; i < 2500; i++) { statement.addBatch( - String.format( - Locale.ENGLISH, insertTemplate, i, i, i, (double) i, "'" + i + "'", "false")); + String.format( + Locale.ENGLISH, insertTemplate, i, i, i, (double) i, "'" + i + "'", "false")); } statement.executeBatch(); } catch (Exception e) { diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java index db3235ce5c90..b416f0033b7f 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java @@ -175,29 +175,29 @@ public static void cleanEnv() throws IOException, StorageEngineException { private static boolean examinePorts() { TTransport transport = TSocketWrapper.wrap(tConfiguration, "127.0.0.1", 6667, 100); - if (transport != null && !transport.isOpen()) { - try { - transport.open(); - logger.error("stop daemon failed. 6667 can be connected now."); - transport.close(); - return false; - } catch (TTransportException e) { - // do nothing - } + if (transport != null && !transport.isOpen()) { + try { + transport.open(); + logger.error("stop daemon failed. 6667 can be connected now."); + transport.close(); + return false; + } catch (TTransportException e) { + // do nothing } - // try sync service + } + // try sync service transport = TSocketWrapper.wrap(tConfiguration, "127.0.0.1", 5555, 100); - if (transport != null && !transport.isOpen()) { - try { - transport.open(); - logger.error("stop Sync daemon failed. 5555 can be connected now."); - transport.close(); - return false; - } catch (TTransportException e) { - // do nothing - } + if (transport != null && !transport.isOpen()) { + try { + transport.open(); + logger.error("stop Sync daemon failed. 5555 can be connected now."); + transport.close(); + return false; + } catch (TTransportException e) { + // do nothing } - // try jmx connection + } + // try jmx connection try { JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://localhost:31999/jmxrmi"); JMXConnector jmxConnector = JMXConnectorFactory.connect(url); From 0aab75e3d81f0dff5fa9e25d0428e8f3c6ef6309 Mon Sep 17 00:00:00 2001 From: lyf <1548150065@qq.com> Date: Sun, 4 Feb 2024 12:24:59 +0800 Subject: [PATCH 03/10] make cluster status check more clear --- .../iotdb/it/env/cluster/env/AbstractEnv.java | 106 +++++------------- .../it/cluster/IoTDBClusterRestartIT.java | 4 +- .../apache/iotdb/db/it/IoTDBRecoverIT.java | 4 +- .../iotdb/db/it/IoTDBRecoverUnclosedIT.java | 3 +- .../apache/iotdb/db/it/utils/TestUtils.java | 2 +- .../iotdb/pipe/it/IoTDBPipeClusterIT.java | 4 +- 6 files changed, 40 insertions(+), 83 deletions(-) diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java index 9286688a449c..42d01d3f555e 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java @@ -45,7 +45,6 @@ import org.apache.iotdb.itbase.runtime.*; import org.apache.iotdb.jdbc.Config; import org.apache.iotdb.jdbc.Constant; -import org.apache.iotdb.jdbc.IoTDBConnection; import org.apache.iotdb.rpc.IoTDBConnectionException; import org.apache.iotdb.rpc.TSStatusCode; import org.apache.iotdb.session.Session; @@ -133,7 +132,7 @@ public List getMetricPrometheusReporterContents() { } protected void initEnvironment(int configNodesNum, int dataNodesNum) { - initEnvironment(configNodesNum, dataNodesNum, 30); + initEnvironment(configNodesNum, dataNodesNum, testWorkingRetryCount); } protected void initEnvironment(int configNodesNum, int dataNodesNum, int testWorkingRetryCount) { @@ -245,7 +244,7 @@ protected void initEnvironment(int configNodesNum, int dataNodesNum, int testWor throw new AssertionError(); } - testWorkingNoUnknown(); + checkClusterStatusWithoutUnknown(); } public String getTestClassName() { @@ -265,66 +264,32 @@ private Map countNodeStatus(Map nodeStatus) { return result; } - public void testWorkingNoUnknown() { - testWorking(nodeStatusMap -> nodeStatusMap.values().stream().noneMatch("Unknown"::equals)); + public boolean checkClusterStatusWithoutUnknown() { + return checkClusterStatus(nodeStatusMap -> nodeStatusMap.values().stream().noneMatch("Unknown"::equals)); } - public void testWorkingOneUnknownOtherRunning() { - testWorking( + public boolean checkClusterStatusOneUnknownOtherRunning() { + return checkClusterStatus( nodeStatus -> { Map count = countNodeStatus(nodeStatus); return count.getOrDefault("Unknown", 0) == 1 && count.getOrDefault("Running", 0) == nodeStatus.size() - 1; }); } - - public void testWorking(Predicate> statusCheck) { - logger.info("Testing DataNode connection..."); - List endpoints = - dataNodeWrapperList.stream() - .map(DataNodeWrapper::getIpAndPortString) - .collect(Collectors.toList()); - RequestDelegate testDelegate = - new ParallelRequestDelegate<>(endpoints, NODE_START_TIMEOUT); - for (DataNodeWrapper dataNode : dataNodeWrapperList) { - final String dataNodeEndpoint = dataNode.getIpAndPortString(); - testDelegate.addRequest( - () -> { - Exception lastException = null; - for (int i = 0; i < testWorkingRetryCount; i++) { - try (Connection ignored = getConnection(dataNodeEndpoint, PROBE_TIMEOUT_MS)) { - logger.info("Successfully connecting to DataNode: {}.", dataNodeEndpoint); - return null; - } catch (Exception e) { - lastException = e; - TimeUnit.SECONDS.sleep(1L); - } - } - throw lastException; - }); - } - try { - long startTime = System.currentTimeMillis(); - testDelegate.requestAll(); - if (!configNodeWrapperList.isEmpty()) { - checkNodeHeartbeat(statusCheck); - } - logger.info("Start cluster costs: {}s", (System.currentTimeMillis() - startTime) / 1000.0); - } catch (Exception e) { - logger.error("exception in testWorking of ClusterID, message: {}", e.getMessage(), e); - throw new AssertionError( - String.format("After %d times retry, the cluster can't work!", testWorkingRetryCount)); - } - } - - private void checkNodeHeartbeat(Predicate> statusCheck) throws Exception { + /** + * Returns whether the all nodes' status all match the provided predicate. + * @param statusCheck the predicate to test the status of nodes + * @return {@code true} if all nodes' status of the cluster match the provided + * predicate, otherwise {@code false} + */ + public boolean checkClusterStatus(Predicate> statusCheck){ logger.info("Testing cluster environment..."); TShowClusterResp showClusterResp; Exception lastException = null; boolean flag; - for (int i = 0; i < 30; i++) { + for (int i = 0; i < testWorkingRetryCount; i++) { try (SyncConfigNodeIServiceClient client = - (SyncConfigNodeIServiceClient) getLeaderConfigNodeConnection()) { + (SyncConfigNodeIServiceClient) getLeaderConfigNodeConnection()) { flag = true; showClusterResp = client.showCluster(); @@ -335,7 +300,7 @@ private void checkNodeHeartbeat(Predicate> statusCheck) thr // Check the number of nodes if (showClusterResp.getNodeStatus().size() - != configNodeWrapperList.size() + dataNodeWrapperList.size()) { + != configNodeWrapperList.size() + dataNodeWrapperList.size()) { flag = false; } @@ -347,20 +312,24 @@ private void checkNodeHeartbeat(Predicate> statusCheck) thr if (flag) { logger.info("The cluster is now ready for testing!"); - return; + return true; } } catch (Exception e) { lastException = e; } - TimeUnit.SECONDS.sleep(1L); + try { + TimeUnit.SECONDS.sleep(1L); + } catch (InterruptedException e) { + lastException = e; + Thread.currentThread().interrupt(); + } } - if (lastException != null) { - throw lastException; + logger.error("exception in testWorking of ClusterID, message: {}", lastException.getMessage(), lastException); } - throw new Exception("Check not pass"); + logger.info("checkNodeHeartbeat failed after {} retries",testWorkingRetryCount); + return false; } - @Override public void cleanClusterEnvironment() { for (AbstractNodeWrapper nodeWrapper : @@ -391,19 +360,6 @@ public Connection getConnectionWithSpecifiedDataNode( getWriteConnectionWithSpecifiedDataNode(dataNode, null, username, password), getReadConnections(null, username, password)); } - - private Connection getConnection(String endpoint, int queryTimeout) throws SQLException { - IoTDBConnection connection = - (IoTDBConnection) - DriverManager.getConnection( - Config.IOTDB_URL_PREFIX + endpoint + getParam(null, queryTimeout), - System.getProperty("User", "root"), - System.getProperty("Password", "root")); - connection.setQueryTimeout(queryTimeout); - - return connection; - } - @Override public Connection getConnection(Constant.Version version, String username, String password) throws SQLException { @@ -570,7 +526,7 @@ public IConfigNodeRPCService.Iface getLeaderConfigNodeConnection() throws IOException, InterruptedException { Exception lastException = null; ConfigNodeWrapper lastErrorNode = null; - for (int i = 0; i < 30; i++) { + for (int i = 0; i < testWorkingRetryCount; i++) { for (ConfigNodeWrapper configNodeWrapper : configNodeWrapperList) { try { lastErrorNode = configNodeWrapper; @@ -614,7 +570,7 @@ public IConfigNodeRPCService.Iface getLeaderConfigNodeConnection() public int getLeaderConfigNodeIndex() throws IOException, InterruptedException { Exception lastException = null; ConfigNodeWrapper lastErrorNode = null; - for (int retry = 0; retry < 30; retry++) { + for (int retry = 0; retry < testWorkingRetryCount; retry++) { for (int configNodeId = 0; configNodeId < configNodeWrapperList.size(); configNodeId++) { ConfigNodeWrapper configNodeWrapper = configNodeWrapperList.get(configNodeId); lastErrorNode = configNodeWrapper; @@ -761,7 +717,7 @@ public void registerNewConfigNode(ConfigNodeWrapper newConfigNodeWrapper, boolea if (isNeedVerify) { // Test whether register success - testWorkingNoUnknown(); + checkClusterStatusWithoutUnknown(); } } @@ -786,7 +742,7 @@ public void registerNewDataNode(DataNodeWrapper newDataNodeWrapper, boolean isNe if (isNeedVerify) { // Test whether register success - testWorkingNoUnknown(); + checkClusterStatusWithoutUnknown(); } } @@ -818,7 +774,7 @@ public void shutdownAllDataNodes() { public void ensureNodeStatus(List nodes, List targetStatus) throws IllegalStateException { Throwable lastException = null; - for (int i = 0; i < 30; i++) { + for (int i = 0; i < testWorkingRetryCount; i++) { try (SyncConfigNodeIServiceClient client = (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) { List errorMessages = new ArrayList<>(nodes.size()); diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/cluster/IoTDBClusterRestartIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/cluster/IoTDBClusterRestartIT.java index f82efcdd06c3..65ef103c5545 100644 --- a/integration-test/src/test/java/org/apache/iotdb/confignode/it/cluster/IoTDBClusterRestartIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/cluster/IoTDBClusterRestartIT.java @@ -104,7 +104,7 @@ public void clusterRestartTest() { EnvFactory.getEnv().startAllConfigNodes(); logger.info("Restarting all DataNodes..."); EnvFactory.getEnv().startAllDataNodes(); - ((AbstractEnv) EnvFactory.getEnv()).testWorkingNoUnknown(); + Assert.assertTrue(((AbstractEnv) EnvFactory.getEnv()).checkClusterStatusWithoutUnknown()); } @Test @@ -232,7 +232,7 @@ public void clusterRestartWithoutSeedConfigNode() { EnvFactory.getEnv().startDataNode(i); } logger.info("Restarted"); - ((AbstractEnv) EnvFactory.getEnv()).testWorkingOneUnknownOtherRunning(); + Assert.assertTrue(((AbstractEnv) EnvFactory.getEnv()).checkClusterStatusOneUnknownOtherRunning()); logger.info("Working without Seed-ConfigNode"); } } diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBRecoverIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBRecoverIT.java index 8c3d084697e0..2b6c558618d2 100644 --- a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBRecoverIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBRecoverIT.java @@ -109,7 +109,7 @@ public void RecoverTest1() { EnvFactory.getEnv().startAllDataNodes(); logger.info("All DataNodes are started"); // check cluster whether restart - ((AbstractEnv) EnvFactory.getEnv()).testWorkingNoUnknown(); + Assert.assertTrue(((AbstractEnv) EnvFactory.getEnv()).checkClusterStatusWithoutUnknown()); String[] retArray = new String[] {"0,2", "0,4", "0,3"}; try (Connection connection = EnvFactory.getEnv().getConnection(); Statement statement = connection.createStatement()) { @@ -200,7 +200,7 @@ public void recoverTest2() { EnvFactory.getEnv().startAllDataNodes(); logger.info("All DataNodes are started"); // wait for cluster to start and check - ((AbstractEnv) EnvFactory.getEnv()).testWorkingNoUnknown(); + Assert.assertTrue(((AbstractEnv) EnvFactory.getEnv()).checkClusterStatusWithoutUnknown()); // count test String[] retArray = new String[] {"0,2001,2001,2001,2001", "0,7500,7500,7500,7500"}; try (Connection connection = EnvFactory.getEnv().getConnection(); diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBRecoverUnclosedIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBRecoverUnclosedIT.java index b8d2a90a01d6..8478bf290da7 100644 --- a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBRecoverUnclosedIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBRecoverUnclosedIT.java @@ -26,6 +26,7 @@ import org.apache.iotdb.itbase.category.LocalStandaloneIT; import org.junit.After; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -148,7 +149,7 @@ public void test() throws SQLException, IOException { EnvFactory.getEnv().startAllDataNodes(); logger.info("All DataNodes are started"); // wait for cluster to start and check - ((AbstractEnv) EnvFactory.getEnv()).testWorkingNoUnknown(); + Assert.assertTrue(((AbstractEnv) EnvFactory.getEnv()).checkClusterStatusWithoutUnknown()); // test count, try (Connection connection = EnvFactory.getEnv().getConnection(); diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/utils/TestUtils.java b/integration-test/src/test/java/org/apache/iotdb/db/it/utils/TestUtils.java index 5e5c8bdd1cb0..9a89788838e3 100644 --- a/integration-test/src/test/java/org/apache/iotdb/db/it/utils/TestUtils.java +++ b/integration-test/src/test/java/org/apache/iotdb/db/it/utils/TestUtils.java @@ -645,7 +645,7 @@ public static void restartCluster(BaseEnv env) throws Exception { for (int i = 0; i < env.getDataNodeWrapperList().size(); ++i) { env.startDataNode(i); } - ((AbstractEnv) env).testWorkingNoUnknown(); + ((AbstractEnv) env).checkClusterStatusWithoutUnknown(); } public static void assertDataOnEnv( diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeClusterIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeClusterIT.java index 91a4c13d1f90..7f2722608fb5 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeClusterIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeClusterIT.java @@ -237,7 +237,7 @@ public void testPipeAfterDataRegionLeaderStop() throws Exception { } try { senderEnv.startDataNode(i); - ((AbstractEnv) senderEnv).testWorkingNoUnknown(); + Assert.assertTrue(((AbstractEnv) senderEnv).checkClusterStatusWithoutUnknown()); } catch (Exception e) { e.printStackTrace(); return; @@ -637,7 +637,7 @@ public void testNewDataNodeFailureParallelToTransferringData() throws Exception senderEnv.startDataNode(senderEnv.getDataNodeWrapperList().size() - 1); senderEnv.shutdownDataNode(senderEnv.getDataNodeWrapperList().size() - 1); senderEnv.getDataNodeWrapperList().remove(senderEnv.getDataNodeWrapperList().size() - 1); - ((AbstractEnv) senderEnv).testWorkingNoUnknown(); + Assert.assertTrue(((AbstractEnv) senderEnv).checkClusterStatusWithoutUnknown()); } catch (Exception e) { e.printStackTrace(); return; From 1c2b5402c6ca77e961d6ef59a5e76ca767a3fac4 Mon Sep 17 00:00:00 2001 From: lyf <1548150065@qq.com> Date: Sun, 4 Feb 2024 12:25:41 +0800 Subject: [PATCH 04/10] spotless apply --- .../iotdb/it/env/cluster/env/AbstractEnv.java | 29 ++++++++++++------- .../it/cluster/IoTDBClusterRestartIT.java | 3 +- 2 files changed, 20 insertions(+), 12 deletions(-) diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java index 42d01d3f555e..c605c71cb7bb 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java @@ -265,7 +265,8 @@ private Map countNodeStatus(Map nodeStatus) { } public boolean checkClusterStatusWithoutUnknown() { - return checkClusterStatus(nodeStatusMap -> nodeStatusMap.values().stream().noneMatch("Unknown"::equals)); + return checkClusterStatus( + nodeStatusMap -> nodeStatusMap.values().stream().noneMatch("Unknown"::equals)); } public boolean checkClusterStatusOneUnknownOtherRunning() { @@ -277,19 +278,20 @@ public boolean checkClusterStatusOneUnknownOtherRunning() { }); } /** - * Returns whether the all nodes' status all match the provided predicate. - * @param statusCheck the predicate to test the status of nodes - * @return {@code true} if all nodes' status of the cluster match the provided - * predicate, otherwise {@code false} - */ - public boolean checkClusterStatus(Predicate> statusCheck){ + * Returns whether the all nodes' status all match the provided predicate. + * + * @param statusCheck the predicate to test the status of nodes + * @return {@code true} if all nodes' status of the cluster match the provided predicate, + * otherwise {@code false} + */ + public boolean checkClusterStatus(Predicate> statusCheck) { logger.info("Testing cluster environment..."); TShowClusterResp showClusterResp; Exception lastException = null; boolean flag; for (int i = 0; i < testWorkingRetryCount; i++) { try (SyncConfigNodeIServiceClient client = - (SyncConfigNodeIServiceClient) getLeaderConfigNodeConnection()) { + (SyncConfigNodeIServiceClient) getLeaderConfigNodeConnection()) { flag = true; showClusterResp = client.showCluster(); @@ -300,7 +302,7 @@ public boolean checkClusterStatus(Predicate> statusCheck){ // Check the number of nodes if (showClusterResp.getNodeStatus().size() - != configNodeWrapperList.size() + dataNodeWrapperList.size()) { + != configNodeWrapperList.size() + dataNodeWrapperList.size()) { flag = false; } @@ -325,11 +327,15 @@ public boolean checkClusterStatus(Predicate> statusCheck){ } } if (lastException != null) { - logger.error("exception in testWorking of ClusterID, message: {}", lastException.getMessage(), lastException); + logger.error( + "exception in testWorking of ClusterID, message: {}", + lastException.getMessage(), + lastException); } - logger.info("checkNodeHeartbeat failed after {} retries",testWorkingRetryCount); + logger.info("checkNodeHeartbeat failed after {} retries", testWorkingRetryCount); return false; } + @Override public void cleanClusterEnvironment() { for (AbstractNodeWrapper nodeWrapper : @@ -360,6 +366,7 @@ public Connection getConnectionWithSpecifiedDataNode( getWriteConnectionWithSpecifiedDataNode(dataNode, null, username, password), getReadConnections(null, username, password)); } + @Override public Connection getConnection(Constant.Version version, String username, String password) throws SQLException { diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/cluster/IoTDBClusterRestartIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/cluster/IoTDBClusterRestartIT.java index 65ef103c5545..39a73ea6ac43 100644 --- a/integration-test/src/test/java/org/apache/iotdb/confignode/it/cluster/IoTDBClusterRestartIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/cluster/IoTDBClusterRestartIT.java @@ -232,7 +232,8 @@ public void clusterRestartWithoutSeedConfigNode() { EnvFactory.getEnv().startDataNode(i); } logger.info("Restarted"); - Assert.assertTrue(((AbstractEnv) EnvFactory.getEnv()).checkClusterStatusOneUnknownOtherRunning()); + Assert.assertTrue( + ((AbstractEnv) EnvFactory.getEnv()).checkClusterStatusOneUnknownOtherRunning()); logger.info("Working without Seed-ConfigNode"); } } From 28003485a8c834da78f2c84cb365aac1245bfad8 Mon Sep 17 00:00:00 2001 From: lyf <1548150065@qq.com> Date: Mon, 5 Feb 2024 17:13:23 +0800 Subject: [PATCH 05/10] add all confignodes/datanodes start/stop --- .../iotdb/it/env/cluster/env/AbstractEnv.java | 37 ++++++++++++++----- .../org/apache/iotdb/itbase/env/BaseEnv.java | 4 ++ .../it/cluster/IoTDBClusterRestartIT.java | 16 ++------ .../apache/iotdb/db/it/utils/TestUtils.java | 21 +++-------- .../iotdb/pipe/it/IoTDBPipeClusterIT.java | 27 +++----------- .../iotdb/pipe/it/IoTDBPipeLifeCycleIT.java | 25 +++---------- 6 files changed, 51 insertions(+), 79 deletions(-) diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java index c605c71cb7bb..4dac8107b2e9 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java @@ -76,8 +76,7 @@ public abstract class AbstractEnv implements BaseEnv { protected String testMethodName = null; protected int index = 0; protected long startTime; - protected int testWorkingRetryCount = 30; - + protected int retryCount = 30; private IClientManager clientManager; /** @@ -132,11 +131,11 @@ public List getMetricPrometheusReporterContents() { } protected void initEnvironment(int configNodesNum, int dataNodesNum) { - initEnvironment(configNodesNum, dataNodesNum, testWorkingRetryCount); + initEnvironment(configNodesNum, dataNodesNum, retryCount); } - protected void initEnvironment(int configNodesNum, int dataNodesNum, int testWorkingRetryCount) { - this.testWorkingRetryCount = testWorkingRetryCount; + protected void initEnvironment(int configNodesNum, int dataNodesNum, int retryCount) { + this.retryCount = retryCount; this.configNodeWrapperList = new ArrayList<>(); this.dataNodeWrapperList = new ArrayList<>(); @@ -289,7 +288,7 @@ public boolean checkClusterStatus(Predicate> statusCheck) { TShowClusterResp showClusterResp; Exception lastException = null; boolean flag; - for (int i = 0; i < testWorkingRetryCount; i++) { + for (int i = 0; i < retryCount; i++) { try (SyncConfigNodeIServiceClient client = (SyncConfigNodeIServiceClient) getLeaderConfigNodeConnection()) { flag = true; @@ -332,7 +331,7 @@ public boolean checkClusterStatus(Predicate> statusCheck) { lastException.getMessage(), lastException); } - logger.info("checkNodeHeartbeat failed after {} retries", testWorkingRetryCount); + logger.info("checkNodeHeartbeat failed after {} retries", retryCount); return false; } @@ -533,7 +532,7 @@ public IConfigNodeRPCService.Iface getLeaderConfigNodeConnection() throws IOException, InterruptedException { Exception lastException = null; ConfigNodeWrapper lastErrorNode = null; - for (int i = 0; i < testWorkingRetryCount; i++) { + for (int i = 0; i < retryCount; i++) { for (ConfigNodeWrapper configNodeWrapper : configNodeWrapperList) { try { lastErrorNode = configNodeWrapper; @@ -573,11 +572,29 @@ public IConfigNodeRPCService.Iface getLeaderConfigNodeConnection() } } + @Override + public IConfigNodeRPCService.Iface getConfigNodeConnection(int index) throws Exception { + Exception lastException = null; + ConfigNodeWrapper configNodeWrapper = configNodeWrapperList.get(index); + for (int i = 0; i < 30; i++) { + try { + return clientManager.borrowClient( + new TEndPoint(configNodeWrapper.getIp(), configNodeWrapper.getPort())); + } catch (Exception e) { + lastException = e; + } + // Sleep 1s before next retry + TimeUnit.SECONDS.sleep(1); + } + throw new IOException( + "Failed to get connection to this ConfigNode. Last error: " + lastException); + } + @Override public int getLeaderConfigNodeIndex() throws IOException, InterruptedException { Exception lastException = null; ConfigNodeWrapper lastErrorNode = null; - for (int retry = 0; retry < testWorkingRetryCount; retry++) { + for (int retry = 0; retry < retryCount; retry++) { for (int configNodeId = 0; configNodeId < configNodeWrapperList.size(); configNodeId++) { ConfigNodeWrapper configNodeWrapper = configNodeWrapperList.get(configNodeId); lastErrorNode = configNodeWrapper; @@ -781,7 +798,7 @@ public void shutdownAllDataNodes() { public void ensureNodeStatus(List nodes, List targetStatus) throws IllegalStateException { Throwable lastException = null; - for (int i = 0; i < testWorkingRetryCount; i++) { + for (int i = 0; i < retryCount; i++) { try (SyncConfigNodeIServiceClient client = (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) { List errorMessages = new ArrayList<>(nodes.size()); diff --git a/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java b/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java index 9ff51830de0b..ad7b598e2e3a 100644 --- a/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java +++ b/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java @@ -142,6 +142,10 @@ IConfigNodeRPCService.Iface getLeaderConfigNodeConnection() */ int getLeaderConfigNodeIndex() throws IOException, InterruptedException; + default IConfigNodeRPCService.Iface getConfigNodeConnection(int index) throws Exception { + throw new UnsupportedOperationException(); + } + /** Start an existed ConfigNode. */ void startConfigNode(int index); diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/cluster/IoTDBClusterRestartIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/cluster/IoTDBClusterRestartIT.java index 39a73ea6ac43..b04a9fc3d7c9 100644 --- a/integration-test/src/test/java/org/apache/iotdb/confignode/it/cluster/IoTDBClusterRestartIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/cluster/IoTDBClusterRestartIT.java @@ -150,9 +150,7 @@ public void clusterRestartAfterUpdateDataNodeTest() dataPartitionTableResp.getDataPartitionTable()); // Shutdown all DataNodes - for (int i = 0; i < testDataNodeNum; i++) { - EnvFactory.getEnv().shutdownDataNode(i); - } + EnvFactory.getEnv().shutdownAllDataNodes(); TimeUnit.SECONDS.sleep(1); List dataNodeWrapperList = EnvFactory.getEnv().getDataNodeWrapperList(); @@ -217,20 +215,14 @@ public void clusterRestartAfterUpdateDataNodeTest() @Test public void clusterRestartWithoutSeedConfigNode() { // shutdown all ConfigNodes and DataNodes - for (int i = testConfigNodeNum - 1; i >= 0; i--) { - EnvFactory.getEnv().shutdownConfigNode(i); - } - for (int i = testDataNodeNum - 1; i >= 0; i--) { - EnvFactory.getEnv().shutdownDataNode(i); - } + EnvFactory.getEnv().shutdownAllConfigNodes(); + EnvFactory.getEnv().shutdownAllDataNodes(); logger.info("Shutdown all ConfigNodes and DataNodes"); // restart without seed ConfigNode, the cluster should still work for (int i = 1; i < testConfigNodeNum; i++) { EnvFactory.getEnv().startConfigNode(i); } - for (int i = 0; i < testDataNodeNum; i++) { - EnvFactory.getEnv().startDataNode(i); - } + EnvFactory.getEnv().startAllDataNodes(); logger.info("Restarted"); Assert.assertTrue( ((AbstractEnv) EnvFactory.getEnv()).checkClusterStatusOneUnknownOtherRunning()); diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/utils/TestUtils.java b/integration-test/src/test/java/org/apache/iotdb/db/it/utils/TestUtils.java index 9a89788838e3..a87e3282f235 100644 --- a/integration-test/src/test/java/org/apache/iotdb/db/it/utils/TestUtils.java +++ b/integration-test/src/test/java/org/apache/iotdb/db/it/utils/TestUtils.java @@ -631,21 +631,12 @@ public static void revokeUserSeriesPrivilege( } } - public static void restartCluster(BaseEnv env) throws Exception { - for (int i = 0; i < env.getConfigNodeWrapperList().size(); ++i) { - env.shutdownConfigNode(i); - } - for (int i = 0; i < env.getDataNodeWrapperList().size(); ++i) { - env.shutdownDataNode(i); - } - TimeUnit.SECONDS.sleep(1); - for (int i = 0; i < env.getConfigNodeWrapperList().size(); ++i) { - env.startConfigNode(i); - } - for (int i = 0; i < env.getDataNodeWrapperList().size(); ++i) { - env.startDataNode(i); - } - ((AbstractEnv) env).checkClusterStatusWithoutUnknown(); + public static boolean restartCluster(BaseEnv env){ + env.shutdownAllDataNodes(); + env.shutdownAllConfigNodes(); + env.startAllConfigNodes(); + env.startAllDataNodes(); + return ((AbstractEnv) env).checkClusterStatusWithoutUnknown(); } public static void assertDataOnEnv( diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeClusterIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeClusterIT.java index 7f2722608fb5..0639497a6b1a 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeClusterIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeClusterIT.java @@ -261,14 +261,8 @@ public void testPipeAfterDataRegionLeaderStop() throws Exception { "count(root.db.d1.s1),", Collections.singleton("2,")); } - - try { - TestUtils.restartCluster(senderEnv); - TestUtils.restartCluster(receiverEnv); - } catch (Exception e) { - e.printStackTrace(); - return; - } + Assert.assertTrue(TestUtils.restartCluster(senderEnv)); + Assert.assertTrue(TestUtils.restartCluster(receiverEnv)); try (SyncConfigNodeIServiceClient client = (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) { @@ -363,13 +357,8 @@ public void testPipeAfterRegisterNewDataNode() throws Exception { Collections.singleton("2,")); } - try { - TestUtils.restartCluster(senderEnv); - TestUtils.restartCluster(receiverEnv); - } catch (Exception e) { - e.printStackTrace(); - return; - } + Assert.assertTrue(TestUtils.restartCluster(senderEnv)); + Assert.assertTrue(TestUtils.restartCluster(receiverEnv)); try (SyncConfigNodeIServiceClient client = (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) { @@ -695,13 +684,7 @@ public void testSenderRestartWhenTransferring() throws Exception { return; } - try { - TestUtils.restartCluster(senderEnv); - } catch (Exception e) { - e.printStackTrace(); - return; - } - + Assert.assertTrue(TestUtils.restartCluster(senderEnv)); TestUtils.assertDataOnEnv( receiverEnv, "select count(*) from root.**", diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeLifeCycleIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeLifeCycleIT.java index aadf05a9b6b6..473b9ea80867 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeLifeCycleIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeLifeCycleIT.java @@ -427,13 +427,8 @@ public void testLifeCycleWithClusterRestart() throws Exception { receiverEnv, "select * from root.**", "Time,root.db.d1.s1,", expectedResSet); } - try { - TestUtils.restartCluster(senderEnv); - TestUtils.restartCluster(receiverEnv); - } catch (Exception e) { - e.printStackTrace(); - return; - } + Assert.assertTrue(TestUtils.restartCluster(senderEnv)); + Assert.assertTrue(TestUtils.restartCluster(receiverEnv)); try (SyncConfigNodeIServiceClient ignored = (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) { @@ -496,12 +491,7 @@ public void testReceiverRestartWhenTransferring() throws Exception { }); t.start(); - try { - TestUtils.restartCluster(receiverEnv); - } catch (Exception e) { - e.printStackTrace(); - return; - } + Assert.assertTrue(TestUtils.restartCluster(receiverEnv)); t.join(); TestUtils.assertDataOnEnv( @@ -674,13 +664,8 @@ public void testDoubleLiving() throws Exception { TestUtils.assertDataOnEnv( receiverEnv, "select * from root.**", "Time,root.db.d1.s1,", expectedResSet); - try { - TestUtils.restartCluster(senderEnv); - TestUtils.restartCluster(receiverEnv); - } catch (Exception e) { - e.printStackTrace(); - return; - } + Assert.assertTrue(TestUtils.restartCluster(senderEnv)); + Assert.assertTrue(TestUtils.restartCluster(receiverEnv)); for (int i = 400; i < 500; ++i) { if (!TestUtils.tryExecuteNonQueryWithRetry( From 3477e9d121254fed997f5bb8c25d56c3c235c614 Mon Sep 17 00:00:00 2001 From: lyf <1548150065@qq.com> Date: Mon, 5 Feb 2024 18:05:33 +0800 Subject: [PATCH 06/10] add some retry --- .../iotdb/it/env/cluster/env/AbstractEnv.java | 68 +++++++++++++------ .../apache/iotdb/db/it/utils/TestUtils.java | 2 +- 2 files changed, 48 insertions(+), 22 deletions(-) diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java index 4dac8107b2e9..ce469560ec1f 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java @@ -443,17 +443,29 @@ protected NodeConnection getWriteConnection( protected NodeConnection getWriteConnectionWithSpecifiedDataNode( DataNodeWrapper dataNode, Constant.Version version, String username, String password) throws SQLException { - String endpoint = dataNode.getIp() + ":" + dataNode.getPort(); - Connection writeConnection = - DriverManager.getConnection( - Config.IOTDB_URL_PREFIX + endpoint + getParam(version, NODE_NETWORK_TIMEOUT_MS), - System.getProperty("User", username), - System.getProperty("Password", password)); - return new NodeConnection( - endpoint, - NodeConnection.NodeRole.DATA_NODE, - NodeConnection.ConnectionRole.WRITE, - writeConnection); + for (int i = 0; i < retryCount; i++) { + try { + String endpoint = dataNode.getIp() + ":" + dataNode.getPort(); + Connection writeConnection = + DriverManager.getConnection( + Config.IOTDB_URL_PREFIX + endpoint + getParam(version, NODE_NETWORK_TIMEOUT_MS), + System.getProperty("User", username), + System.getProperty("Password", password)); + return new NodeConnection( + endpoint, + NodeConnection.NodeRole.DATA_NODE, + NodeConnection.ConnectionRole.WRITE, + writeConnection); + } catch (Exception e) { + retryCount++; + try { + TimeUnit.SECONDS.sleep(1L); + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + } + } + } + throw new SQLException("Failed to get write connection"); } protected List getReadConnections( @@ -466,16 +478,30 @@ protected List getReadConnections( endpoints.add(endpoint); readConnRequestDelegate.addRequest( () -> { - Connection readConnection = - DriverManager.getConnection( - Config.IOTDB_URL_PREFIX + endpoint + getParam(version, NODE_NETWORK_TIMEOUT_MS), - System.getProperty("User", username), - System.getProperty("Password", password)); - return new NodeConnection( - endpoint, - NodeConnection.NodeRole.DATA_NODE, - NodeConnection.ConnectionRole.READ, - readConnection); + for (int i = 0; i < retryCount; i++) { + try { + Connection readConnection = + DriverManager.getConnection( + Config.IOTDB_URL_PREFIX + + endpoint + + getParam(version, NODE_NETWORK_TIMEOUT_MS), + System.getProperty("User", username), + System.getProperty("Password", password)); + return new NodeConnection( + endpoint, + NodeConnection.NodeRole.DATA_NODE, + NodeConnection.ConnectionRole.READ, + readConnection); + } catch (Exception e) { + retryCount++; + try { + TimeUnit.SECONDS.sleep(1L); + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + } + } + } + return null; }); } return readConnRequestDelegate.requestAll(); diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/utils/TestUtils.java b/integration-test/src/test/java/org/apache/iotdb/db/it/utils/TestUtils.java index a87e3282f235..5062f6e9ee9d 100644 --- a/integration-test/src/test/java/org/apache/iotdb/db/it/utils/TestUtils.java +++ b/integration-test/src/test/java/org/apache/iotdb/db/it/utils/TestUtils.java @@ -631,7 +631,7 @@ public static void revokeUserSeriesPrivilege( } } - public static boolean restartCluster(BaseEnv env){ + public static boolean restartCluster(BaseEnv env) { env.shutdownAllDataNodes(); env.shutdownAllConfigNodes(); env.startAllConfigNodes(); From c276d0db47d4e20041b3106870ee94a2863e20a7 Mon Sep 17 00:00:00 2001 From: lyf <1548150065@qq.com> Date: Tue, 6 Feb 2024 11:04:36 +0800 Subject: [PATCH 07/10] fix it --- .../iotdb/it/env/cluster/env/AbstractEnv.java | 123 +++++++++++------- 1 file changed, 74 insertions(+), 49 deletions(-) diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java index ce469560ec1f..d1535d588e89 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java @@ -45,6 +45,7 @@ import org.apache.iotdb.itbase.runtime.*; import org.apache.iotdb.jdbc.Config; import org.apache.iotdb.jdbc.Constant; +import org.apache.iotdb.jdbc.IoTDBConnection; import org.apache.iotdb.rpc.IoTDBConnectionException; import org.apache.iotdb.rpc.TSStatusCode; import org.apache.iotdb.session.Session; @@ -265,16 +266,18 @@ private Map countNodeStatus(Map nodeStatus) { public boolean checkClusterStatusWithoutUnknown() { return checkClusterStatus( - nodeStatusMap -> nodeStatusMap.values().stream().noneMatch("Unknown"::equals)); + nodeStatusMap -> nodeStatusMap.values().stream().noneMatch("Unknown"::equals)) + && testJDBCConnection(); } public boolean checkClusterStatusOneUnknownOtherRunning() { return checkClusterStatus( - nodeStatus -> { - Map count = countNodeStatus(nodeStatus); - return count.getOrDefault("Unknown", 0) == 1 - && count.getOrDefault("Running", 0) == nodeStatus.size() - 1; - }); + nodeStatus -> { + Map count = countNodeStatus(nodeStatus); + return count.getOrDefault("Unknown", 0) == 1 + && count.getOrDefault("Running", 0) == nodeStatus.size() - 1; + }) + && testJDBCConnection(); } /** * Returns whether the all nodes' status all match the provided predicate. @@ -443,29 +446,17 @@ protected NodeConnection getWriteConnection( protected NodeConnection getWriteConnectionWithSpecifiedDataNode( DataNodeWrapper dataNode, Constant.Version version, String username, String password) throws SQLException { - for (int i = 0; i < retryCount; i++) { - try { - String endpoint = dataNode.getIp() + ":" + dataNode.getPort(); - Connection writeConnection = - DriverManager.getConnection( - Config.IOTDB_URL_PREFIX + endpoint + getParam(version, NODE_NETWORK_TIMEOUT_MS), - System.getProperty("User", username), - System.getProperty("Password", password)); - return new NodeConnection( - endpoint, - NodeConnection.NodeRole.DATA_NODE, - NodeConnection.ConnectionRole.WRITE, - writeConnection); - } catch (Exception e) { - retryCount++; - try { - TimeUnit.SECONDS.sleep(1L); - } catch (InterruptedException ex) { - Thread.currentThread().interrupt(); - } - } - } - throw new SQLException("Failed to get write connection"); + String endpoint = dataNode.getIp() + ":" + dataNode.getPort(); + Connection writeConnection = + DriverManager.getConnection( + Config.IOTDB_URL_PREFIX + endpoint + getParam(version, NODE_NETWORK_TIMEOUT_MS), + System.getProperty("User", username), + System.getProperty("Password", password)); + return new NodeConnection( + endpoint, + NodeConnection.NodeRole.DATA_NODE, + NodeConnection.ConnectionRole.WRITE, + writeConnection); } protected List getReadConnections( @@ -478,33 +469,67 @@ protected List getReadConnections( endpoints.add(endpoint); readConnRequestDelegate.addRequest( () -> { + Connection readConnection = + DriverManager.getConnection( + Config.IOTDB_URL_PREFIX + endpoint + getParam(version, NODE_NETWORK_TIMEOUT_MS), + System.getProperty("User", username), + System.getProperty("Password", password)); + return new NodeConnection( + endpoint, + NodeConnection.NodeRole.DATA_NODE, + NodeConnection.ConnectionRole.READ, + readConnection); + }); + } + return readConnRequestDelegate.requestAll(); + } + + // use this to avoid some runtimeExceptions when try to get jdbc connections. + // because it is hard to add retry when getting jdbc connections in + // getWriteConnectionWithSpecifiedDataNode and getReadConnections. + // so use this function to add retry when cluster is ready. + protected boolean testJDBCConnection() { + logger.info("Testing JDBC connection..."); + List endpoints = + dataNodeWrapperList.stream() + .map(DataNodeWrapper::getIpAndPortString) + .collect(Collectors.toList()); + RequestDelegate testDelegate = + new ParallelRequestDelegate<>(endpoints, NODE_START_TIMEOUT); + for (DataNodeWrapper dataNode : dataNodeWrapperList) { + final String dataNodeEndpoint = dataNode.getIpAndPortString(); + testDelegate.addRequest( + () -> { + Exception lastException = null; for (int i = 0; i < retryCount; i++) { - try { - Connection readConnection = - DriverManager.getConnection( - Config.IOTDB_URL_PREFIX - + endpoint - + getParam(version, NODE_NETWORK_TIMEOUT_MS), - System.getProperty("User", username), - System.getProperty("Password", password)); - return new NodeConnection( - endpoint, - NodeConnection.NodeRole.DATA_NODE, - NodeConnection.ConnectionRole.READ, - readConnection); + try (IoTDBConnection ignored = + (IoTDBConnection) + DriverManager.getConnection( + Config.IOTDB_URL_PREFIX + + dataNodeEndpoint + + getParam(null, NODE_NETWORK_TIMEOUT_MS), + System.getProperty("User", "root"), + System.getProperty("Password", "root"))) { + logger.info("Successfully connecting to DataNode: {}.", dataNodeEndpoint); + return null; } catch (Exception e) { - retryCount++; - try { - TimeUnit.SECONDS.sleep(1L); - } catch (InterruptedException ex) { - Thread.currentThread().interrupt(); - } + lastException = e; + TimeUnit.SECONDS.sleep(1L); } } + if (lastException != null) { + throw lastException; + } return null; }); } - return readConnRequestDelegate.requestAll(); + try { + testDelegate.requestAll(); + } catch (Exception e) { + logger.error("Failed to connect to DataNode", e); + return false; + } + return true; } private String getParam(Constant.Version version, int timeout) { From 89090f82f383f5f3348c2b30de0f924a7f96b02d Mon Sep 17 00:00:00 2001 From: lyf <1548150065@qq.com> Date: Tue, 6 Feb 2024 11:05:55 +0800 Subject: [PATCH 08/10] spotless --- .../java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java index d1535d588e89..eb64870678fe 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java @@ -485,7 +485,7 @@ protected List getReadConnections( } // use this to avoid some runtimeExceptions when try to get jdbc connections. - // because it is hard to add retry when getting jdbc connections in + // because it is hard to add retry and handle exception when getting jdbc connections in // getWriteConnectionWithSpecifiedDataNode and getReadConnections. // so use this function to add retry when cluster is ready. protected boolean testJDBCConnection() { From 3cbb9bb6af270b2922ae9fcfb31fe94a173aa12c Mon Sep 17 00:00:00 2001 From: lyf <1548150065@qq.com> Date: Tue, 6 Feb 2024 14:38:41 +0800 Subject: [PATCH 09/10] fix by review --- .../org/apache/iotdb/it/env/cluster/env/AbstractEnv.java | 2 +- .../confignode/it/cluster/IoTDBClusterRestartIT.java | 8 ++++++-- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java index eb64870678fe..286460a0cae1 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java @@ -280,7 +280,7 @@ public boolean checkClusterStatusOneUnknownOtherRunning() { && testJDBCConnection(); } /** - * Returns whether the all nodes' status all match the provided predicate. + * Returns whether the all nodes' status all match the provided predicate. check nodes with RPC * * @param statusCheck the predicate to test the status of nodes * @return {@code true} if all nodes' status of the cluster match the provided predicate, diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/cluster/IoTDBClusterRestartIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/cluster/IoTDBClusterRestartIT.java index b04a9fc3d7c9..f05274f12895 100644 --- a/integration-test/src/test/java/org/apache/iotdb/confignode/it/cluster/IoTDBClusterRestartIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/cluster/IoTDBClusterRestartIT.java @@ -215,8 +215,12 @@ public void clusterRestartAfterUpdateDataNodeTest() @Test public void clusterRestartWithoutSeedConfigNode() { // shutdown all ConfigNodes and DataNodes - EnvFactory.getEnv().shutdownAllConfigNodes(); - EnvFactory.getEnv().shutdownAllDataNodes(); + for (int i = testConfigNodeNum - 1; i >= 0; i--) { + EnvFactory.getEnv().shutdownConfigNode(i); + } + for (int i = testDataNodeNum - 1; i >= 0; i--) { + EnvFactory.getEnv().shutdownDataNode(i); + } logger.info("Shutdown all ConfigNodes and DataNodes"); // restart without seed ConfigNode, the cluster should still work for (int i = 1; i < testConfigNodeNum; i++) { From ce690c91e8de81cde35e006bf605c89df5d2e23d Mon Sep 17 00:00:00 2001 From: lyf <1548150065@qq.com> Date: Tue, 6 Feb 2024 14:50:32 +0800 Subject: [PATCH 10/10] fix by review --- .../org/apache/iotdb/it/env/cluster/env/AbstractEnv.java | 6 ++++++ .../org/apache/iotdb/it/env/remote/env/RemoteServerEnv.java | 5 +++++ .../src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java | 2 ++ 3 files changed, 13 insertions(+) diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java index 286460a0cae1..b7e4c4dcb50b 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java @@ -899,6 +899,12 @@ public void ensureNodeStatus(List nodes, List targe throw new IllegalStateException(lastException); } + @Override + public int getMqttPort() { + int randomIndex = new Random(System.currentTimeMillis()).nextInt(dataNodeWrapperList.size()); + return dataNodeWrapperList.get(randomIndex).getMqttPort(); + } + @Override public String getIP() { return dataNodeWrapperList.get(0).getIp(); diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/env/RemoteServerEnv.java b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/env/RemoteServerEnv.java index 543fec5bbdb4..25a9ac319ca1 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/env/RemoteServerEnv.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/env/RemoteServerEnv.java @@ -324,6 +324,11 @@ public void shutdownAllDataNodes() { throw new UnsupportedOperationException(); } + @Override + public int getMqttPort() { + throw new UnsupportedOperationException(); + } + @Override public String getIP() { return ip_addr; diff --git a/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java b/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java index ad7b598e2e3a..120b0c1a8105 100644 --- a/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java +++ b/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java @@ -222,6 +222,8 @@ void ensureNodeStatus(List nodes, List targetStatus /** Shutdown all existed DataNodes. */ void shutdownAllDataNodes(); + int getMqttPort(); + String getIP(); String getPort();