diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/MultiEnvFactory.java b/integration-test/src/main/java/org/apache/iotdb/it/env/MultiEnvFactory.java index f8c28567be1c..5832f1c485bc 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/MultiEnvFactory.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/MultiEnvFactory.java @@ -38,24 +38,25 @@ private MultiEnvFactory() { // Empty constructor } - public static void setTestMethodName(String testMethodName) { + public static void setTestMethodName(final String testMethodName) { currentMethodName = testMethodName; + envList.forEach(baseEnv -> baseEnv.setTestMethodName(testMethodName)); } /** Get an environment with the specific index. */ - public static BaseEnv getEnv(int index) throws IndexOutOfBoundsException { + public static BaseEnv getEnv(final int index) throws IndexOutOfBoundsException { return envList.get(index); } /** Create several environments according to the specific number. */ - public static void createEnv(int num) { + public static void createEnv(final int num) { // Not judge EnvType for individual test convenience - long startTime = System.currentTimeMillis(); + final long startTime = System.currentTimeMillis(); for (int i = 0; i < num; ++i) { try { Class.forName(Config.JDBC_DRIVER_NAME); envList.add(new MultiClusterEnv(startTime, i, currentMethodName)); - } catch (ClassNotFoundException e) { + } catch (final ClassNotFoundException e) { logger.error("Create env error", e); System.exit(-1); } diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/ClusterConstant.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/ClusterConstant.java index b3802f3ea8d8..147b57f2f67a 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/ClusterConstant.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/ClusterConstant.java @@ -115,9 +115,6 @@ public class ClusterConstant { "strongConsistencyClusterMode.dataRegionReplicaNumber"; // Property file names - public static final String CONFIG_NODE_PROPERTIES_FILE = "iotdb-confignode.properties"; - public static final String DATA_NODE_PROPERTIES_FILE = "iotdb-datanode.properties"; - public static final String COMMON_PROPERTIES_FILE = "iotdb-common.properties"; public static final String IOTDB_SYSTEM_PROPERTIES_FILE = "iotdb-system.properties"; // Properties' keys @@ -142,10 +139,7 @@ public class ClusterConstant { // ConfigNode public static final String CN_SYSTEM_DIR = "cn_system_dir"; public static final String CN_CONSENSUS_DIR = "cn_consensus_dir"; - public static final String CN_METRIC_PROMETHEUS_REPORTER_PORT = - "cn_metric_prometheus_reporter_port"; public static final String CN_METRIC_IOTDB_REPORTER_HOST = "cn_metric_iotdb_reporter_host"; - public static final String CN_METRIC_IOTDB_REPORTER_PORT = "cn_metric_iotdb_reporter_port"; public static final String CN_CONNECTION_TIMEOUT_MS = "cn_connection_timeout_ms"; @@ -157,13 +151,10 @@ public class ClusterConstant { public static final String DN_TRACING_DIR = "dn_tracing_dir"; public static final String DN_SYNC_DIR = "dn_sync_dir"; public static final String DN_METRIC_IOTDB_REPORTER_HOST = "dn_metric_iotdb_reporter_host"; - public static final String DN_METRIC_PROMETHEUS_REPORTER_PORT = - "dn_metric_prometheus_reporter_port"; public static final String DN_MPP_DATA_EXCHANGE_PORT = "dn_mpp_data_exchange_port"; public static final String DN_DATA_REGION_CONSENSUS_PORT = "dn_data_region_consensus_port"; public static final String DN_SCHEMA_REGION_CONSENSUS_PORT = "dn_schema_region_consensus_port"; - public static final String PIPE_AIR_GAP_RECEIVER_ENABLED = "pipe_air_gap_receiver_enabled"; public static final String PIPE_AIR_GAP_RECEIVER_PORT = "pipe_air_gap_receiver_port"; public static final String MAX_TSBLOCK_SIZE_IN_BYTES = "max_tsblock_size_in_bytes"; public static final String PAGE_SIZE_IN_BYTE = "page_size_in_byte"; @@ -205,7 +196,6 @@ public class ClusterConstant { // Env Constant public static final int NODE_START_TIMEOUT = 100; - public static final int PROBE_TIMEOUT_MS = 2000; public static final int NODE_NETWORK_TIMEOUT_MS = 0; public static final String ZERO_TIME_ZONE = "GMT+0"; diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/EnvUtils.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/EnvUtils.java index f3c9527e5952..9663fa371e95 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/EnvUtils.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/EnvUtils.java @@ -70,22 +70,22 @@ public static int[] searchAvailablePorts() { while (true) { int randomPortStart = 1000 + (int) (Math.random() * (1999 - 1000)); randomPortStart = randomPortStart * (length + 1) + 1; - String lockFilePath = getLockFilePath(randomPortStart); - File lockFile = new File(lockFilePath); + final String lockFilePath = getLockFilePath(randomPortStart); + final File lockFile = new File(lockFilePath); try { // Lock the ports first to avoid to be occupied by other ForkedBooters during ports // available detecting if (!lockFile.createNewFile()) { continue; } - List requiredPorts = + final List requiredPorts = IntStream.rangeClosed(randomPortStart, randomPortStart + length) .boxed() .collect(Collectors.toList()); if (checkPortsAvailable(requiredPorts)) { return requiredPorts.stream().mapToInt(Integer::intValue).toArray(); } - } catch (IOException e) { + } catch (final IOException ignore) { // ignore } // Delete the lock file if the ports can't be used or some error happens @@ -95,39 +95,35 @@ public static int[] searchAvailablePorts() { } } - private static boolean checkPortsAvailable(List ports) { - String cmd = getSearchAvailablePortCmd(ports); + private static boolean checkPortsAvailable(final List ports) { + final String cmd = getSearchAvailablePortCmd(ports); try { - Process proc = Runtime.getRuntime().exec(cmd); - return proc.waitFor() == 1; - } catch (IOException e) { + return Runtime.getRuntime().exec(cmd).waitFor() == 1; + } catch (final IOException ignore) { // ignore - } catch (InterruptedException e) { + } catch (final InterruptedException e) { Thread.currentThread().interrupt(); } return false; } - private static String getSearchAvailablePortCmd(List ports) { - if (SystemUtils.IS_OS_WINDOWS) { - return getWindowsSearchPortCmd(ports); - } - return getUnixSearchPortCmd(ports); + private static String getSearchAvailablePortCmd(final List ports) { + return SystemUtils.IS_OS_WINDOWS ? getWindowsSearchPortCmd(ports) : getUnixSearchPortCmd(ports); } - private static String getWindowsSearchPortCmd(List ports) { - String cmd = "netstat -aon -p tcp | findStr "; - return cmd + private static String getWindowsSearchPortCmd(final List ports) { + return "netstat -aon -p tcp | findStr " + ports.stream().map(v -> "/C:'127.0.0.1:" + v + "'").collect(Collectors.joining(" ")); } - private static String getUnixSearchPortCmd(List ports) { - String cmd = "lsof -iTCP -sTCP:LISTEN -P -n | awk '{print $9}' | grep -E "; - return cmd + ports.stream().map(String::valueOf).collect(Collectors.joining("|")) + "\""; + private static String getUnixSearchPortCmd(final List ports) { + return "lsof -iTCP -sTCP:LISTEN -P -n | awk '{print $9}' | grep -E " + + ports.stream().map(String::valueOf).collect(Collectors.joining("|")) + + "\""; } - private static Pair getClusterNodesNum(int index) { - String valueStr = System.getProperty(CLUSTER_CONFIGURATIONS); + private static Pair getClusterNodesNum(final int index) { + final String valueStr = System.getProperty(CLUSTER_CONFIGURATIONS); if (valueStr == null) { return null; } @@ -154,17 +150,17 @@ private static Pair getClusterNodesNum(int index) { // Print nothing to avoid polluting test outputs return null; } - } catch (NumberFormatException ignore) { + } catch (final NumberFormatException ignore) { return null; } } - public static String getLockFilePath(int port) { + public static String getLockFilePath(final int port) { return LOCK_FILE_PATH + port; } public static Pair getNodeNum() { - Pair nodesNum = getClusterNodesNum(0); + final Pair nodesNum = getClusterNodesNum(0); if (nodesNum != null) { return nodesNum; } @@ -173,8 +169,8 @@ public static Pair getNodeNum() { getIntFromSysVar(DEFAULT_DATA_NODE_NUM, 3, 0)); } - public static Pair getNodeNum(int index) { - Pair nodesNum = getClusterNodesNum(index); + public static Pair getNodeNum(final int index) { + final Pair nodesNum = getClusterNodesNum(index); if (nodesNum != null) { return nodesNum; } @@ -183,38 +179,38 @@ public static Pair getNodeNum(int index) { getIntFromSysVar(DEFAULT_DATA_NODE_NUM, 3, index)); } - public static String getFilePathFromSysVar(String key, int index) { - String valueStr = System.getProperty(key); + public static String getFilePathFromSysVar(final String key, final int index) { + final String valueStr = System.getProperty(key); if (valueStr == null) { return null; } return System.getProperty(USER_DIR) + getValueOfIndex(valueStr, index); } - public static int getIntFromSysVar(String key, int defaultValue, int index) { - String valueStr = System.getProperty(key); + public static int getIntFromSysVar(final String key, final int defaultValue, final int index) { + final String valueStr = System.getProperty(key); if (valueStr == null) { return defaultValue; } - String value = getValueOfIndex(valueStr, index); + final String value = getValueOfIndex(valueStr, index); try { return Integer.parseInt(value); - } catch (NumberFormatException e) { + } catch (final NumberFormatException e) { throw new IllegalArgumentException("Invalid property value: " + value + " of key " + key); } } - public static String getValueOfIndex(String valueStr, int index) { - String[] values = valueStr.split(DELIMITER); + public static String getValueOfIndex(final String valueStr, final int index) { + final String[] values = valueStr.split(DELIMITER); return index <= values.length - 1 ? values[index] : values[values.length - 1]; } - public static String getTimeForLogDirectory(long startTime) { + public static String getTimeForLogDirectory(final long startTime) { return convertLongToDate(startTime, "ms").replace(":", DIR_TIME_REPLACEMENT); } - public static String fromConsensusFullNameToAbbr(String consensus) { + public static String fromConsensusFullNameToAbbr(final String consensus) { switch (consensus) { case SIMPLE_CONSENSUS: return SIMPLE_CONSENSUS_STR; @@ -233,7 +229,7 @@ public static String fromConsensusFullNameToAbbr(String consensus) { } } - public static String fromConsensusAbbrToFullName(String consensus) { + public static String fromConsensusAbbrToFullName(final String consensus) { switch (consensus) { case SIMPLE_CONSENSUS_STR: return SIMPLE_CONSENSUS; diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java index 482e869cf1c3..2b2a262dbd2a 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java @@ -19,9 +19,7 @@ package org.apache.iotdb.it.env.cluster.env; -import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation; import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType; -import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation; import org.apache.iotdb.common.rpc.thrift.TEndPoint; import org.apache.iotdb.commons.client.ClientPoolFactory; import org.apache.iotdb.commons.client.IClientManager; @@ -104,7 +102,7 @@ protected AbstractEnv() { } // For multiple environment ITs, time must be consistent across environments. - protected AbstractEnv(long startTime) { + protected AbstractEnv(final long startTime) { this.startTime = startTime; this.clusterConfig = new MppClusterConfig(); } @@ -116,10 +114,10 @@ public ClusterConfig getConfig() { @Override public List getMetricPrometheusReporterContents() { - List result = new ArrayList<>(); + final List result = new ArrayList<>(); // get all report content of confignodes - for (ConfigNodeWrapper configNode : this.configNodeWrapperList) { - String configNodeMetricContent = + for (final ConfigNodeWrapper configNode : this.configNodeWrapperList) { + final String configNodeMetricContent = getUrlContent( Config.IOTDB_HTTP_URL_PREFIX + configNode.getIp() @@ -129,8 +127,8 @@ public List getMetricPrometheusReporterContents() { result.add(configNodeMetricContent); } // get all report content of datanodes - for (DataNodeWrapper dataNode : this.dataNodeWrapperList) { - String dataNodeMetricContent = + for (final DataNodeWrapper dataNode : this.dataNodeWrapperList) { + final String dataNodeMetricContent = getUrlContent( Config.IOTDB_HTTP_URL_PREFIX + dataNode.getIp() @@ -142,16 +140,20 @@ public List getMetricPrometheusReporterContents() { return result; } - protected void initEnvironment(int configNodesNum, int dataNodesNum) { + protected void initEnvironment(final int configNodesNum, final int dataNodesNum) { initEnvironment(configNodesNum, dataNodesNum, retryCount); } - protected void initEnvironment(int configNodesNum, int dataNodesNum, int testWorkingRetryCount) { + protected void initEnvironment( + final int configNodesNum, final int dataNodesNum, final int testWorkingRetryCount) { initEnvironment(configNodesNum, dataNodesNum, testWorkingRetryCount, false); } protected void initEnvironment( - int configNodesNum, int dataNodesNum, int retryCount, boolean addAINode) { + final int configNodesNum, + final int dataNodesNum, + final int retryCount, + final boolean addAINode) { this.retryCount = retryCount; this.configNodeWrapperList = new ArrayList<>(); this.dataNodeWrapperList = new ArrayList<>(); @@ -162,7 +164,7 @@ protected void initEnvironment( final String testClassName = getTestClassName(); - ConfigNodeWrapper seedConfigNodeWrapper = + final ConfigNodeWrapper seedConfigNodeWrapper = new ConfigNodeWrapper( true, "", @@ -180,22 +182,23 @@ protected void initEnvironment( seedConfigNodeWrapper.createLogDir(); seedConfigNodeWrapper.setKillPoints(configNodeKillPoints); seedConfigNodeWrapper.start(); - String seedConfigNode = seedConfigNodeWrapper.getIpAndPortString(); + final String seedConfigNode = seedConfigNodeWrapper.getIpAndPortString(); this.configNodeWrapperList.add(seedConfigNodeWrapper); // Check if the Seed-ConfigNode started successfully - try (SyncConfigNodeIServiceClient ignored = + try (final SyncConfigNodeIServiceClient ignored = (SyncConfigNodeIServiceClient) getLeaderConfigNodeConnection()) { // Do nothing logger.info("The Seed-ConfigNode started successfully!"); - } catch (Exception e) { + } catch (final Exception e) { logger.error("Failed to get connection to the Seed-ConfigNode", e); } - List configNodeEndpoints = new ArrayList<>(); - RequestDelegate configNodesDelegate = new SerialRequestDelegate<>(configNodeEndpoints); + final List configNodeEndpoints = new ArrayList<>(); + final RequestDelegate configNodesDelegate = + new SerialRequestDelegate<>(configNodeEndpoints); for (int i = 1; i < configNodesNum; i++) { - ConfigNodeWrapper configNodeWrapper = + final ConfigNodeWrapper configNodeWrapper = new ConfigNodeWrapper( false, seedConfigNode, @@ -222,16 +225,16 @@ protected void initEnvironment( } try { configNodesDelegate.requestAll(); - } catch (SQLException e) { + } catch (final SQLException e) { logger.error("Start configNodes failed", e); throw new AssertionError(); } - List dataNodeEndpoints = new ArrayList<>(); - RequestDelegate dataNodesDelegate = + final List dataNodeEndpoints = new ArrayList<>(); + final RequestDelegate dataNodesDelegate = new ParallelRequestDelegate<>(dataNodeEndpoints, NODE_START_TIMEOUT); for (int i = 0; i < dataNodesNum; i++) { - DataNodeWrapper dataNodeWrapper = + final DataNodeWrapper dataNodeWrapper = new DataNodeWrapper( seedConfigNode, testClassName, @@ -258,7 +261,7 @@ protected void initEnvironment( try { dataNodesDelegate.requestAll(); - } catch (SQLException e) { + } catch (final SQLException e) { logger.error("Start dataNodes failed", e); throw new AssertionError(); } @@ -271,9 +274,9 @@ protected void initEnvironment( checkClusterStatusWithoutUnknown(); } - private void startAINode(String seedConfigNode, String testClassName) { - String aiNodeEndPoint; - AINodeWrapper aiNodeWrapper = + private void startAINode(final String seedConfigNode, final String testClassName) { + final String aiNodeEndPoint; + final AINodeWrapper aiNodeWrapper = new AINodeWrapper( seedConfigNode, testClassName, @@ -285,29 +288,29 @@ private void startAINode(String seedConfigNode, String testClassName) { aiNodeEndPoint = aiNodeWrapper.getIpAndPortString(); aiNodeWrapper.createNodeDir(); aiNodeWrapper.createLogDir(); - RequestDelegate AINodesDelegate = + final RequestDelegate aiNodesDelegate = new ParallelRequestDelegate<>( Collections.singletonList(aiNodeEndPoint), NODE_START_TIMEOUT); - AINodesDelegate.addRequest( + aiNodesDelegate.addRequest( () -> { aiNodeWrapper.start(); return null; }); try { - AINodesDelegate.requestAll(); - } catch (SQLException e) { + aiNodesDelegate.requestAll(); + } catch (final SQLException e) { logger.error("Start aiNodes failed", e); } } public String getTestClassName() { - StackTraceElement[] stack = Thread.currentThread().getStackTrace(); - for (StackTraceElement stackTraceElement : stack) { - String className = stackTraceElement.getClassName(); + final StackTraceElement[] stack = Thread.currentThread().getStackTrace(); + for (final StackTraceElement stackTraceElement : stack) { + final String className = stackTraceElement.getClassName(); if (className.endsWith("IT")) { - String result = className.substring(className.lastIndexOf(".") + 1); + final String result = className.substring(className.lastIndexOf(".") + 1); if (!result.startsWith("Abstract")) { return result; } @@ -316,8 +319,8 @@ public String getTestClassName() { return "UNKNOWN-IT"; } - private Map countNodeStatus(Map nodeStatus) { - Map result = new HashMap<>(); + private Map countNodeStatus(final Map nodeStatus) { + final Map result = new HashMap<>(); nodeStatus.values().forEach(status -> result.put(status, result.getOrDefault(status, 0) + 1)); return result; } @@ -344,13 +347,13 @@ public void checkClusterStatusOneUnknownOtherRunning() { * * @param statusCheck the predicate to test the status of nodes */ - public void checkClusterStatus(Predicate> statusCheck) { + public void checkClusterStatus(final Predicate> statusCheck) { logger.info("Testing cluster environment..."); TShowClusterResp showClusterResp; Exception lastException = null; boolean flag; for (int i = 0; i < retryCount; i++) { - try (SyncConfigNodeIServiceClient client = + try (final SyncConfigNodeIServiceClient client = (SyncConfigNodeIServiceClient) getLeaderConfigNodeConnection()) { flag = true; showClusterResp = client.showCluster(); @@ -370,20 +373,19 @@ public void checkClusterStatus(Predicate> statusCheck) { // Check the status of nodes if (flag) { - Map nodeStatus = showClusterResp.getNodeStatus(); - flag = statusCheck.test(nodeStatus); + flag = statusCheck.test(showClusterResp.getNodeStatus()); } if (flag) { logger.info("The cluster is now ready for testing!"); return; } - } catch (Exception e) { + } catch (final Exception e) { lastException = e; } try { TimeUnit.SECONDS.sleep(1L); - } catch (InterruptedException e) { + } catch (final InterruptedException e) { lastException = e; Thread.currentThread().interrupt(); } @@ -400,7 +402,7 @@ public void checkClusterStatus(Predicate> statusCheck) { @Override public void cleanClusterEnvironment() { - List allNodeWrappers = + final List allNodeWrappers = Stream.concat( dataNodeWrapperList.stream(), Stream.concat(configNodeWrapperList.stream(), aiNodeWrapperList.stream())) @@ -409,10 +411,10 @@ public void cleanClusterEnvironment() { .findAny() .ifPresent( nodeWrapper -> logger.info("You can find logs at {}", nodeWrapper.getLogDirPath())); - for (AbstractNodeWrapper nodeWrapper : allNodeWrappers) { + for (final AbstractNodeWrapper nodeWrapper : allNodeWrappers) { nodeWrapper.stopForcibly(); nodeWrapper.destroyDir(); - String lockPath = EnvUtils.getLockFilePath(nodeWrapper.getPort()); + final String lockPath = EnvUtils.getLockFilePath(nodeWrapper.getPort()); if (!new File(lockPath).delete()) { logger.error("Delete lock file {} failed", lockPath); } @@ -425,8 +427,8 @@ public void cleanClusterEnvironment() { } @Override - public Connection getConnection(String username, String password, String sqlDialect) - throws SQLException { + public Connection getConnection( + final String username, final String password, final String sqlDialect) throws SQLException { return new ClusterTestConnection( getWriteConnection(null, username, password, sqlDialect), getReadConnections(null, username, password, sqlDialect)); @@ -434,7 +436,8 @@ public Connection getConnection(String username, String password, String sqlDial @Override public Connection getWriteOnlyConnectionWithSpecifiedDataNode( - DataNodeWrapper dataNode, String username, String password) throws SQLException { + final DataNodeWrapper dataNode, final String username, final String password) + throws SQLException { return new ClusterTestConnection( getWriteConnectionWithSpecifiedDataNode( dataNode, null, username, password, TREE_SQL_DIALECT), @@ -443,7 +446,8 @@ public Connection getWriteOnlyConnectionWithSpecifiedDataNode( @Override public Connection getConnectionWithSpecifiedDataNode( - DataNodeWrapper dataNode, String username, String password) throws SQLException { + final DataNodeWrapper dataNode, final String username, final String password) + throws SQLException { return new ClusterTestConnection( getWriteConnectionWithSpecifiedDataNode( dataNode, null, username, password, TREE_SQL_DIALECT), @@ -452,22 +456,23 @@ public Connection getConnectionWithSpecifiedDataNode( @Override public Connection getConnection( - Constant.Version version, String username, String password, String sqlDialect) + final Constant.Version version, + final String username, + final String password, + final String sqlDialect) throws SQLException { - if (System.getProperty("ReadAndVerifyWithMultiNode", "true").equalsIgnoreCase("true")) { - return new ClusterTestConnection( - getWriteConnection(version, username, password, sqlDialect), - getReadConnections(version, username, password, sqlDialect)); - } else { - return getWriteConnection(version, username, password, sqlDialect).getUnderlyingConnecton(); - } + return System.getProperty("ReadAndVerifyWithMultiNode", "true").equalsIgnoreCase("true") + ? new ClusterTestConnection( + getWriteConnection(version, username, password, sqlDialect), + getReadConnections(version, username, password, sqlDialect)) + : getWriteConnection(version, username, password, sqlDialect).getUnderlyingConnecton(); } @Override - public ISession getSessionConnection(String sqlDialect) throws IoTDBConnectionException { - DataNodeWrapper dataNode = + public ISession getSessionConnection(final String sqlDialect) throws IoTDBConnectionException { + final DataNodeWrapper dataNode = this.dataNodeWrapperList.get(rand.nextInt(this.dataNodeWrapperList.size())); - Session session = + final Session session = new Session.Builder() .host(dataNode.getIp()) .port(dataNode.getPort()) @@ -478,11 +483,11 @@ public ISession getSessionConnection(String sqlDialect) throws IoTDBConnectionEx } @Override - public ISession getSessionConnectionWithDB(String sqlDialect, String database) + public ISession getSessionConnectionWithDB(final String sqlDialect, final String database) throws IoTDBConnectionException { - DataNodeWrapper dataNode = + final DataNodeWrapper dataNode = this.dataNodeWrapperList.get(rand.nextInt(this.dataNodeWrapperList.size())); - Session session = + final Session session = new Session.Builder() .host(dataNode.getIp()) .port(dataNode.getPort()) @@ -494,11 +499,12 @@ public ISession getSessionConnectionWithDB(String sqlDialect, String database) } @Override - public ISession getSessionConnection(String userName, String password, String sqlDialect) + public ISession getSessionConnection( + final String userName, final String password, final String sqlDialect) throws IoTDBConnectionException { - DataNodeWrapper dataNode = + final DataNodeWrapper dataNode = this.dataNodeWrapperList.get(rand.nextInt(this.dataNodeWrapperList.size())); - Session session = + final Session session = new Session.Builder() .host(dataNode.getIp()) .port(dataNode.getPort()) @@ -511,9 +517,9 @@ public ISession getSessionConnection(String userName, String password, String sq } @Override - public ISession getSessionConnection(List nodeUrls, String sqlDialect) + public ISession getSessionConnection(final List nodeUrls, final String sqlDialect) throws IoTDBConnectionException { - Session session = + final Session session = new Session.Builder() .nodeUrls(nodeUrls) .username(SessionConfig.DEFAULT_USER) @@ -531,8 +537,8 @@ public ISession getSessionConnection(List nodeUrls, String sqlDialect) } @Override - public ISessionPool getSessionPool(int maxSize, String sqlDialect) { - DataNodeWrapper dataNode = + public ISessionPool getSessionPool(final int maxSize, final String sqlDialect) { + final DataNodeWrapper dataNode = this.dataNodeWrapperList.get(rand.nextInt(this.dataNodeWrapperList.size())); return new SessionPool.Builder() .host(dataNode.getIp()) @@ -545,7 +551,8 @@ public ISessionPool getSessionPool(int maxSize, String sqlDialect) { } @Override - public ISessionPool getSessionPool(int maxSize, String sqlDialect, String database) { + public ISessionPool getSessionPool( + final int maxSize, final String sqlDialect, final String database) { DataNodeWrapper dataNode = this.dataNodeWrapperList.get(rand.nextInt(this.dataNodeWrapperList.size())); return new SessionPool.Builder() @@ -576,14 +583,14 @@ protected NodeConnection getWriteConnection( } protected NodeConnection getWriteConnectionWithSpecifiedDataNode( - DataNodeWrapper dataNode, - Constant.Version version, - String username, - String password, - String sqlDialect) + final DataNodeWrapper dataNode, + final Constant.Version version, + final String username, + final String password, + final String sqlDialect) throws SQLException { - String endpoint = dataNode.getIp() + ":" + dataNode.getPort(); - Connection writeConnection = + final String endpoint = dataNode.getIp() + ":" + dataNode.getPort(); + final Connection writeConnection = DriverManager.getConnection( Config.IOTDB_URL_PREFIX + endpoint @@ -597,20 +604,20 @@ protected NodeConnection getWriteConnectionWithSpecifiedDataNode( } protected NodeConnection getWriteConnectionFromDataNodeList( - List dataNodeList, - Constant.Version version, - String username, - String password, - String sqlDialect) + final List dataNodeList, + final Constant.Version version, + final String username, + final String password, + final String sqlDialect) throws SQLException { - List dataNodeWrapperListCopy = new ArrayList<>(dataNodeList); + final List dataNodeWrapperListCopy = new ArrayList<>(dataNodeList); Collections.shuffle(dataNodeWrapperListCopy); SQLException lastException = null; - for (DataNodeWrapper dataNode : dataNodeWrapperListCopy) { + for (final DataNodeWrapper dataNode : dataNodeWrapperListCopy) { try { return getWriteConnectionWithSpecifiedDataNode( dataNode, version, username, password, sqlDialect); - } catch (SQLException e) { + } catch (final SQLException e) { lastException = e; } } @@ -621,29 +628,32 @@ protected NodeConnection getWriteConnectionFromDataNodeList( } protected List getReadConnections( - Constant.Version version, String username, String password, String sqlDialect) + final Constant.Version version, + final String username, + final String password, + final String sqlDialect) throws SQLException { - List endpoints = new ArrayList<>(); - ParallelRequestDelegate readConnRequestDelegate = + final List endpoints = new ArrayList<>(); + final ParallelRequestDelegate readConnRequestDelegate = new ParallelRequestDelegate<>(endpoints, NODE_START_TIMEOUT); - for (DataNodeWrapper dataNodeWrapper : this.dataNodeWrapperList) { - final String endpoint = dataNodeWrapper.getIpAndPortString(); - endpoints.add(endpoint); - readConnRequestDelegate.addRequest( - () -> { - Connection readConnection = - DriverManager.getConnection( - Config.IOTDB_URL_PREFIX - + endpoint - + getParam(version, NODE_NETWORK_TIMEOUT_MS, ZERO_TIME_ZONE), - BaseEnv.constructProperties(username, password, sqlDialect)); - return new NodeConnection( - endpoint, - NodeConnection.NodeRole.DATA_NODE, - NodeConnection.ConnectionRole.READ, - readConnection); - }); - } + + dataNodeWrapperList.stream() + .map(AbstractNodeWrapper::getIpAndPortString) + .forEach( + endpoint -> { + endpoints.add(endpoint); + readConnRequestDelegate.addRequest( + () -> + new NodeConnection( + endpoint, + NodeConnection.NodeRole.DATA_NODE, + NodeConnection.ConnectionRole.READ, + DriverManager.getConnection( + Config.IOTDB_URL_PREFIX + + endpoint + + getParam(version, NODE_NETWORK_TIMEOUT_MS, ZERO_TIME_ZONE), + BaseEnv.constructProperties(username, password, sqlDialect)))); + }); return readConnRequestDelegate.requestAll(); } @@ -655,19 +665,19 @@ protected List getReadConnections( // AssertionError. protected void testJDBCConnection() { logger.info("Testing JDBC connection..."); - List endpoints = + final List endpoints = dataNodeWrapperList.stream() .map(DataNodeWrapper::getIpAndPortString) .collect(Collectors.toList()); - RequestDelegate testDelegate = + final RequestDelegate testDelegate = new ParallelRequestDelegate<>(endpoints, NODE_START_TIMEOUT); - for (DataNodeWrapper dataNode : dataNodeWrapperList) { + for (final DataNodeWrapper dataNode : dataNodeWrapperList) { final String dataNodeEndpoint = dataNode.getIpAndPortString(); testDelegate.addRequest( () -> { Exception lastException = null; for (int i = 0; i < retryCount; i++) { - try (IoTDBConnection ignored = + try (final IoTDBConnection ignored = (IoTDBConnection) DriverManager.getConnection( Config.IOTDB_URL_PREFIX @@ -677,7 +687,7 @@ protected void testJDBCConnection() { System.getProperty("Password", "root"))) { logger.info("Successfully connecting to DataNode: {}.", dataNodeEndpoint); return null; - } catch (Exception e) { + } catch (final Exception e) { lastException = e; TimeUnit.SECONDS.sleep(1L); } @@ -690,15 +700,16 @@ protected void testJDBCConnection() { } try { testDelegate.requestAll(); - } catch (Exception e) { + } catch (final Exception e) { logger.error("exception in test Cluster with RPC, message: {}", e.getMessage(), e); throw new AssertionError( String.format("After %d times retry, the cluster can't work!", retryCount)); } } - private String getParam(Constant.Version version, int timeout, String timeZone) { - StringBuilder sb = new StringBuilder("?"); + private String getParam( + final Constant.Version version, final int timeout, final String timeZone) { + final StringBuilder sb = new StringBuilder("?"); sb.append(Config.NETWORK_TIMEOUT).append("=").append(timeout); if (version != null) { sb.append("&").append(VERSION).append("=").append(version); @@ -714,23 +725,20 @@ public String getTestMethodName() { } @Override - public void setTestMethodName(String testMethodName) { + public void setTestMethodName(final String testMethodName) { this.testMethodName = testMethodName; } @Override public void dumpTestJVMSnapshot() { - for (ConfigNodeWrapper configNodeWrapper : configNodeWrapperList) { - configNodeWrapper.executeJstack(testMethodName); - } - for (DataNodeWrapper dataNodeWrapper : dataNodeWrapperList) { - dataNodeWrapper.executeJstack(testMethodName); - } + configNodeWrapperList.forEach( + configNodeWrapper -> configNodeWrapper.executeJstack(testMethodName)); + dataNodeWrapperList.forEach(dataNodeWrapper -> dataNodeWrapper.executeJstack(testMethodName)); } @Override public List getNodeWrapperList() { - List result = new ArrayList<>(configNodeWrapperList); + final List result = new ArrayList<>(configNodeWrapperList); result.addAll(dataNodeWrapperList); return result; } @@ -759,13 +767,13 @@ public IConfigNodeRPCService.Iface getLeaderConfigNodeConnection() Exception lastException = null; ConfigNodeWrapper lastErrorNode = null; for (int i = 0; i < retryCount; i++) { - for (ConfigNodeWrapper configNodeWrapper : configNodeWrapperList) { + for (final ConfigNodeWrapper configNodeWrapper : configNodeWrapperList) { try { lastErrorNode = configNodeWrapper; - SyncConfigNodeIServiceClient client = + final SyncConfigNodeIServiceClient client = clientManager.borrowClient( new TEndPoint(configNodeWrapper.getIp(), configNodeWrapper.getPort())); - TShowClusterResp resp = client.showCluster(); + final TShowClusterResp resp = client.showCluster(); if (resp.getStatus().getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { // Only the ConfigNodeClient who connects to the ConfigNode-leader @@ -780,7 +788,7 @@ public IConfigNodeRPCService.Iface getLeaderConfigNodeConnection() + " message: " + resp.getStatus().getMessage()); } - } catch (Exception e) { + } catch (final Exception e) { lastException = e; } @@ -801,12 +809,12 @@ public IConfigNodeRPCService.Iface getLeaderConfigNodeConnection() @Override public IConfigNodeRPCService.Iface getConfigNodeConnection(int index) throws Exception { Exception lastException = null; - ConfigNodeWrapper configNodeWrapper = configNodeWrapperList.get(index); + final ConfigNodeWrapper configNodeWrapper = configNodeWrapperList.get(index); for (int i = 0; i < 30; i++) { try { return clientManager.borrowClient( new TEndPoint(configNodeWrapper.getIp(), configNodeWrapper.getPort())); - } catch (Exception e) { + } catch (final Exception e) { lastException = e; } // Sleep 1s before next retry @@ -822,9 +830,9 @@ public int getFirstLeaderSchemaRegionDataNodeIndex() throws IOException, Interru ConfigNodeWrapper lastErrorNode = null; for (int retry = 0; retry < 30; retry++) { for (int configNodeId = 0; configNodeId < configNodeWrapperList.size(); configNodeId++) { - ConfigNodeWrapper configNodeWrapper = configNodeWrapperList.get(configNodeId); + final ConfigNodeWrapper configNodeWrapper = configNodeWrapperList.get(configNodeId); lastErrorNode = configNodeWrapper; - try (SyncConfigNodeIServiceClient client = + try (final SyncConfigNodeIServiceClient client = clientManager.borrowClient( new TEndPoint(configNodeWrapper.getIp(), configNodeWrapper.getPort()))) { TShowRegionResp resp = @@ -837,12 +845,12 @@ public int getFirstLeaderSchemaRegionDataNodeIndex() throws IOException, Interru int port; if (resp.getStatus().getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { - for (TRegionInfo tRegionInfo : resp.getRegionInfoList()) { + for (final TRegionInfo tRegionInfo : resp.getRegionInfoList()) { if (tRegionInfo.getRoleType().equals("Leader")) { ip = tRegionInfo.getClientRpcIp(); port = tRegionInfo.getClientRpcPort(); for (int dataNodeId = 0; dataNodeId < dataNodeWrapperList.size(); ++dataNodeId) { - DataNodeWrapper dataNodeWrapper = dataNodeWrapperList.get(dataNodeId); + final DataNodeWrapper dataNodeWrapper = dataNodeWrapperList.get(dataNodeId); if (dataNodeWrapper.getIp().equals(ip) && dataNodeWrapper.getPort() == port) { return dataNodeId; } @@ -858,7 +866,7 @@ public int getFirstLeaderSchemaRegionDataNodeIndex() throws IOException, Interru + " message: " + resp.getStatus().getMessage()); } - } catch (Exception e) { + } catch (final Exception e) { lastException = e; } @@ -882,12 +890,12 @@ public int getLeaderConfigNodeIndex() throws IOException, InterruptedException { ConfigNodeWrapper lastErrorNode = null; for (int retry = 0; retry < retryCount; retry++) { for (int configNodeId = 0; configNodeId < configNodeWrapperList.size(); configNodeId++) { - ConfigNodeWrapper configNodeWrapper = configNodeWrapperList.get(configNodeId); + final ConfigNodeWrapper configNodeWrapper = configNodeWrapperList.get(configNodeId); lastErrorNode = configNodeWrapper; - try (SyncConfigNodeIServiceClient client = + try (final SyncConfigNodeIServiceClient client = clientManager.borrowClient( new TEndPoint(configNodeWrapper.getIp(), configNodeWrapper.getPort()))) { - TShowClusterResp resp = client.showCluster(); + final TShowClusterResp resp = client.showCluster(); // Only the ConfigNodeClient who connects to the ConfigNode-leader // will respond the SUCCESS_STATUS if (resp.getStatus().getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { @@ -899,7 +907,7 @@ public int getLeaderConfigNodeIndex() throws IOException, InterruptedException { + " message: " + resp.getStatus().getMessage()); } - } catch (Exception e) { + } catch (final Exception e) { lastException = e; } @@ -919,15 +927,13 @@ public int getLeaderConfigNodeIndex() throws IOException, InterruptedException { } @Override - public void startConfigNode(int index) { + public void startConfigNode(final int index) { configNodeWrapperList.get(index).start(); } @Override public void startAllConfigNodes() { - for (ConfigNodeWrapper configNodeWrapper : configNodeWrapperList) { - configNodeWrapper.start(); - } + configNodeWrapperList.forEach(AbstractNodeWrapper::start); } @Override @@ -937,24 +943,22 @@ public void shutdownConfigNode(int index) { @Override public void shutdownAllConfigNodes() { - for (ConfigNodeWrapper configNodeWrapper : configNodeWrapperList) { - configNodeWrapper.stop(); - } + configNodeWrapperList.forEach(AbstractNodeWrapper::stop); } @Override - public ConfigNodeWrapper getConfigNodeWrapper(int index) { + public ConfigNodeWrapper getConfigNodeWrapper(final int index) { return configNodeWrapperList.get(index); } @Override - public DataNodeWrapper getDataNodeWrapper(int index) { + public DataNodeWrapper getDataNodeWrapper(final int index) { return dataNodeWrapperList.get(index); } @Override public ConfigNodeWrapper generateRandomConfigNodeWrapper() { - ConfigNodeWrapper newConfigNodeWrapper = + final ConfigNodeWrapper newConfigNodeWrapper = new ConfigNodeWrapper( false, configNodeWrapperList.get(0).getIpAndPortString(), @@ -976,7 +980,7 @@ public ConfigNodeWrapper generateRandomConfigNodeWrapper() { @Override public DataNodeWrapper generateRandomDataNodeWrapper() { - DataNodeWrapper newDataNodeWrapper = + final DataNodeWrapper newDataNodeWrapper = new DataNodeWrapper( configNodeWrapperList.get(0).getIpAndPortString(), getTestClassName(), @@ -996,19 +1000,20 @@ public DataNodeWrapper generateRandomDataNodeWrapper() { } @Override - public void registerNewDataNode(boolean isNeedVerify) { + public void registerNewDataNode(final boolean isNeedVerify) { registerNewDataNode(generateRandomDataNodeWrapper(), isNeedVerify); } @Override - public void registerNewConfigNode(boolean isNeedVerify) { + public void registerNewConfigNode(final boolean isNeedVerify) { registerNewConfigNode(generateRandomConfigNodeWrapper(), isNeedVerify); } @Override - public void registerNewConfigNode(ConfigNodeWrapper newConfigNodeWrapper, boolean isNeedVerify) { + public void registerNewConfigNode( + final ConfigNodeWrapper newConfigNodeWrapper, final boolean isNeedVerify) { // Start new ConfigNode - RequestDelegate configNodeDelegate = + final RequestDelegate configNodeDelegate = new ParallelRequestDelegate<>( Collections.singletonList(newConfigNodeWrapper.getIpAndPortString()), NODE_START_TIMEOUT); @@ -1020,7 +1025,7 @@ public void registerNewConfigNode(ConfigNodeWrapper newConfigNodeWrapper, boolea try { configNodeDelegate.requestAll(); - } catch (SQLException e) { + } catch (final SQLException e) { logger.error("Start configNode failed", e); throw new AssertionError(); } @@ -1032,11 +1037,12 @@ public void registerNewConfigNode(ConfigNodeWrapper newConfigNodeWrapper, boolea } @Override - public void registerNewDataNode(DataNodeWrapper newDataNodeWrapper, boolean isNeedVerify) { + public void registerNewDataNode( + final DataNodeWrapper newDataNodeWrapper, final boolean isNeedVerify) { // Start new DataNode - List dataNodeEndpoints = + final List dataNodeEndpoints = Collections.singletonList(newDataNodeWrapper.getIpAndPortString()); - RequestDelegate dataNodesDelegate = + final RequestDelegate dataNodesDelegate = new ParallelRequestDelegate<>(dataNodeEndpoints, NODE_START_TIMEOUT); dataNodesDelegate.addRequest( () -> { @@ -1045,7 +1051,7 @@ public void registerNewDataNode(DataNodeWrapper newDataNodeWrapper, boolean isNe }); try { dataNodesDelegate.requestAll(); - } catch (SQLException e) { + } catch (final SQLException e) { logger.error("Start dataNodes failed", e); throw new AssertionError(); } @@ -1057,58 +1063,63 @@ public void registerNewDataNode(DataNodeWrapper newDataNodeWrapper, boolean isNe } @Override - public void startDataNode(int index) { + public void startDataNode(final int index) { dataNodeWrapperList.get(index).start(); } @Override public void startAllDataNodes() { - for (DataNodeWrapper dataNodeWrapper : dataNodeWrapperList) { - dataNodeWrapper.start(); - } + dataNodeWrapperList.forEach(AbstractNodeWrapper::start); } @Override - public void shutdownDataNode(int index) { + public void shutdownDataNode(final int index) { dataNodeWrapperList.get(index).stop(); } @Override public void shutdownAllDataNodes() { - for (DataNodeWrapper dataNodeWrapper : dataNodeWrapperList) { - dataNodeWrapper.stop(); - } + dataNodeWrapperList.forEach(AbstractNodeWrapper::stop); } @Override - public void ensureNodeStatus(List nodes, List targetStatus) + public void ensureNodeStatus( + final List nodes, final List targetStatus) throws IllegalStateException { Throwable lastException = null; for (int i = 0; i < retryCount; i++) { - try (SyncConfigNodeIServiceClient client = + try (final SyncConfigNodeIServiceClient client = (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) { - List errorMessages = new ArrayList<>(nodes.size()); - Map nodeIds = new HashMap<>(nodes.size()); - TShowClusterResp showClusterResp = client.showCluster(); - for (TConfigNodeLocation node : showClusterResp.getConfigNodeList()) { - nodeIds.put( - node.getInternalEndPoint().getIp() + ":" + node.getInternalEndPoint().getPort(), - node.getConfigNodeId()); - } - for (TDataNodeLocation node : showClusterResp.getDataNodeList()) { - nodeIds.put( - node.getClientRpcEndPoint().getIp() + ":" + node.getClientRpcEndPoint().getPort(), - node.getDataNodeId()); - } + final List errorMessages = new ArrayList<>(nodes.size()); + final Map nodeIds = new HashMap<>(nodes.size()); + final TShowClusterResp showClusterResp = client.showCluster(); + showClusterResp + .getConfigNodeList() + .forEach( + node -> + nodeIds.put( + node.getInternalEndPoint().getIp() + + ":" + + node.getInternalEndPoint().getPort(), + node.getConfigNodeId())); + showClusterResp + .getDataNodeList() + .forEach( + node -> + nodeIds.put( + node.getClientRpcEndPoint().getIp() + + ":" + + node.getClientRpcEndPoint().getPort(), + node.getDataNodeId())); for (int j = 0; j < nodes.size(); j++) { - String endpoint = nodes.get(j).getIpAndPortString(); + final String endpoint = nodes.get(j).getIpAndPortString(); if (!nodeIds.containsKey(endpoint)) { // Node not exist // Notice: Never modify this line, since the NodeLocation might be modified in IT errorMessages.add("The node " + nodes.get(j).getIpAndPortString() + " is not found!"); continue; } - String status = showClusterResp.getNodeStatus().get(nodeIds.get(endpoint)); + final String status = showClusterResp.getNodeStatus().get(nodeIds.get(endpoint)); if (!targetStatus.get(j).getStatus().equals(status)) { // Error status errorMessages.add( @@ -1122,12 +1133,12 @@ public void ensureNodeStatus(List nodes, List targe } else { lastException = new IllegalStateException(String.join(". ", errorMessages)); } - } catch (TException | ClientManagerException | IOException | InterruptedException e) { + } catch (final TException | ClientManagerException | IOException | InterruptedException e) { lastException = e; } try { TimeUnit.SECONDS.sleep(1); - } catch (InterruptedException e) { + } catch (final InterruptedException e) { throw new RuntimeException(e); } } @@ -1136,8 +1147,9 @@ public void ensureNodeStatus(List nodes, List targe @Override public int getMqttPort() { - int randomIndex = new Random(System.currentTimeMillis()).nextInt(dataNodeWrapperList.size()); - return dataNodeWrapperList.get(randomIndex).getMqttPort(); + return dataNodeWrapperList + .get(new Random(System.currentTimeMillis()).nextInt(dataNodeWrapperList.size())) + .getMqttPort(); } @Override @@ -1166,11 +1178,11 @@ public String getLibPath() { } @Override - public Optional dataNodeIdToWrapper(int nodeId) { - try (SyncConfigNodeIServiceClient leaderClient = + public Optional dataNodeIdToWrapper(final int nodeId) { + try (final SyncConfigNodeIServiceClient leaderClient = (SyncConfigNodeIServiceClient) getLeaderConfigNodeConnection()) { - TShowDataNodesResp resp = leaderClient.showDataNodes(); - for (TDataNodeInfo dataNodeInfo : resp.getDataNodesInfoList()) { + final TShowDataNodesResp resp = leaderClient.showDataNodes(); + for (final TDataNodeInfo dataNodeInfo : resp.getDataNodesInfoList()) { if (dataNodeInfo.getDataNodeId() == nodeId) { return dataNodeWrapperList.stream() .filter(dataNodeWrapper -> dataNodeWrapper.getPort() == dataNodeInfo.getRpcPort()) @@ -1178,18 +1190,18 @@ public Optional dataNodeIdToWrapper(int nodeId) { } } return Optional.empty(); - } catch (Exception e) { + } catch (final Exception e) { return Optional.empty(); } } @Override - public void registerConfigNodeKillPoints(List killPoints) { + public void registerConfigNodeKillPoints(final List killPoints) { this.configNodeKillPoints = killPoints; } @Override - public void registerDataNodeKillPoints(List killPoints) { + public void registerDataNodeKillPoints(final List killPoints) { this.dataNodeKillPoints = killPoints; } } diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/MultiClusterEnv.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/MultiClusterEnv.java index 1eb05cb315f4..d462b5a668b3 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/MultiClusterEnv.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/MultiClusterEnv.java @@ -25,7 +25,7 @@ public class MultiClusterEnv extends AbstractEnv { - public MultiClusterEnv(long startTime, int index, String currentMethodName) { + public MultiClusterEnv(final long startTime, final int index, final String currentMethodName) { super(startTime); this.index = index; this.testMethodName = currentMethodName; @@ -33,18 +33,18 @@ public MultiClusterEnv(long startTime, int index, String currentMethodName) { @Override public void initClusterEnvironment() { - Pair nodeNum = EnvUtils.getNodeNum(index); + final Pair nodeNum = EnvUtils.getNodeNum(index); super.initEnvironment(nodeNum.getLeft(), nodeNum.getRight()); } @Override - public void initClusterEnvironment(int configNodesNum, int dataNodesNum) { + public void initClusterEnvironment(final int configNodesNum, final int dataNodesNum) { super.initEnvironment(configNodesNum, dataNodesNum); } @Override public void initClusterEnvironment( - int configNodesNum, int dataNodesNum, int testWorkingRetryCount) { + final int configNodesNum, final int dataNodesNum, final int testWorkingRetryCount) { super.initEnvironment(configNodesNum, dataNodesNum, testWorkingRetryCount); } } diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/AbstractNodeWrapper.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/AbstractNodeWrapper.java index bf28c407f728..e3f9b0b5d3ec 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/AbstractNodeWrapper.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/AbstractNodeWrapper.java @@ -649,7 +649,7 @@ protected String getTestLogDirName() { if (testMethodName == null) { return testClassName; } - return testClassName + "_" + testMethodName; + return testClassName + File.separator + testMethodName; } public void setKillPoints(List killPoints) { diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/ConfigNodeWrapper.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/ConfigNodeWrapper.java index a509790ad075..2b3211fe8681 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/ConfigNodeWrapper.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/ConfigNodeWrapper.java @@ -57,23 +57,17 @@ public class ConfigNodeWrapper extends AbstractNodeWrapper { private final String defaultCommonPropertiesFile; public ConfigNodeWrapper( - boolean isSeed, - String targetCNs, - String testClassName, - String testMethodName, - int[] portList, - int clusterIndex, - boolean isMultiCluster, - long startTime) { + final boolean isSeed, + final String targetCNs, + final String testClassName, + final String testMethodName, + final int[] portList, + final int clusterIndex, + final boolean isMultiCluster, + final long startTime) { super(testClassName, testMethodName, portList, clusterIndex, isMultiCluster, startTime); this.consensusPort = portList[1]; this.isSeed = isSeed; - String seedConfigNodes; - if (isSeed) { - seedConfigNodes = getIpAndPortString(); - } else { - seedConfigNodes = targetCNs; - } this.defaultNodePropertiesFile = EnvUtils.getFilePathFromSysVar(DEFAULT_CONFIG_NODE_PROPERTIES, clusterIndex); this.defaultCommonPropertiesFile = @@ -83,7 +77,8 @@ public ConfigNodeWrapper( reloadMutableFields(); // initialize immutable properties - immutableNodeProperties.setProperty(IoTDBConstant.CN_SEED_CONFIG_NODE, seedConfigNodes); + immutableNodeProperties.setProperty( + IoTDBConstant.CN_SEED_CONFIG_NODE, isSeed ? getIpAndPortString() : targetCNs); immutableNodeProperties.setProperty(CN_SYSTEM_DIR, MppBaseConfig.NULL_VALUE); immutableNodeProperties.setProperty(CN_CONSENSUS_DIR, MppBaseConfig.NULL_VALUE); immutableNodeProperties.setProperty(CN_METRIC_IOTDB_REPORTER_HOST, MppBaseConfig.NULL_VALUE); @@ -129,7 +124,7 @@ public final String getId() { } @Override - protected void addStartCmdParams(List params) { + protected void addStartCmdParams(final List params) { final String workDir = getNodePath(); final String confDir = workDir + File.separator + "conf"; params.addAll( @@ -166,14 +161,14 @@ protected void reloadMutableFields() { @Override protected void renameFile() { - String configNodeName = isSeed ? "SeedConfigNode" : "ConfigNode"; + final String configNodeName = isSeed ? "SeedConfigNode" : "ConfigNode"; // rename log file - File oldLogFile = + final File oldLogFile = new File(getLogDirPath() + File.separator + configNodeName + portList[0] + ".log"); oldLogFile.renameTo(new File(getLogDirPath() + File.separator + getId() + ".log")); // rename node dir - File oldNodeDir = + final File oldNodeDir = new File( System.getProperty(USER_DIR) + File.separator @@ -184,7 +179,7 @@ protected void renameFile() { oldNodeDir.renameTo(new File(getNodePath())); } - public void setConsensusPort(int consensusPort) { + public void setConsensusPort(final int consensusPort) { this.consensusPort = consensusPort; } diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/DataNodeWrapper.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/DataNodeWrapper.java index cb7e840af636..127f9b331857 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/DataNodeWrapper.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/DataNodeWrapper.java @@ -83,13 +83,13 @@ public class DataNodeWrapper extends AbstractNodeWrapper { private final String defaultCommonPropertiesFile; public DataNodeWrapper( - String seedConfigNode, - String testClassName, - String testMethodName, - int[] portList, - int clusterIndex, - boolean isMultiCluster, - long startTime) { + final String seedConfigNode, + final String testClassName, + final String testMethodName, + final int[] portList, + final int clusterIndex, + final boolean isMultiCluster, + final long startTime) { super(testClassName, testMethodName, portList, clusterIndex, isMultiCluster, startTime); this.internalAddress = super.getIp(); this.mppDataExchangePort = portList[1]; @@ -161,7 +161,7 @@ public final String getId() { } @Override - protected void addStartCmdParams(List params) { + protected void addStartCmdParams(final List params) { final String workDir = getNodePath(); final String confDir = workDir + File.separator + "conf"; params.addAll( @@ -214,22 +214,22 @@ protected void reloadMutableFields() { @Override public void renameFile() { // Rename log file - String oldLogFilePath = + final String oldLogFilePath = getLogDirPath() + File.separator + DATA_NODE_NAME + portList[0] + ".log"; - String newLogFilePath = getLogDirPath() + File.separator + getId() + ".log"; - File oldLogFile = new File(oldLogFilePath); + final String newLogFilePath = getLogDirPath() + File.separator + getId() + ".log"; + final File oldLogFile = new File(oldLogFilePath); oldLogFile.renameTo(new File(newLogFilePath)); // Rename node dir - String oldNodeDirPath = + final String oldNodeDirPath = System.getProperty(USER_DIR) + File.separator + TARGET + File.separator + DATA_NODE_NAME + portList[0]; - String newNodeDirPath = getNodePath(); - File oldNodeDir = new File(oldNodeDirPath); + final String newNodeDirPath = getNodePath(); + final File oldNodeDir = new File(oldNodeDirPath); oldNodeDir.renameTo(new File(newNodeDirPath)); } @@ -237,7 +237,7 @@ public int getMppDataExchangePort() { return mppDataExchangePort; } - public void setMppDataExchangePort(int mppDataExchangePort) { + public void setMppDataExchangePort(final int mppDataExchangePort) { this.mppDataExchangePort = mppDataExchangePort; } @@ -249,7 +249,7 @@ public int getInternalPort() { return internalPort; } - public void setInternalPort(int internalPort) { + public void setInternalPort(final int internalPort) { this.internalPort = internalPort; } diff --git a/integration-test/src/test/java/org/apache/iotdb/it/framework/IoTDBTestRunner.java b/integration-test/src/test/java/org/apache/iotdb/it/framework/IoTDBTestRunner.java index 07e045146883..17e20d91e9ac 100644 --- a/integration-test/src/test/java/org/apache/iotdb/it/framework/IoTDBTestRunner.java +++ b/integration-test/src/test/java/org/apache/iotdb/it/framework/IoTDBTestRunner.java @@ -37,33 +37,30 @@ public class IoTDBTestRunner extends BlockJUnit4ClassRunner { private static final Logger logger = IoTDBTestLogger.logger; private IoTDBTestListener listener; - public IoTDBTestRunner(Class testClass) throws InitializationError { + public IoTDBTestRunner(final Class testClass) throws InitializationError { super(testClass); } @Override - public void run(RunNotifier notifier) { - TimeZone.setDefault(TimeZone.getTimeZone("Bejing")); + public void run(final RunNotifier notifier) { + TimeZone.setDefault(TimeZone.getTimeZone("UTC+08:00")); listener = new IoTDBTestListener(this.getName()); notifier.addListener(listener); super.run(notifier); } @Override - protected void runChild(final FrameworkMethod method, RunNotifier notifier) { - Description description = describeChild(method); + protected void runChild(final FrameworkMethod method, final RunNotifier notifier) { + final Description description = describeChild(method); logger.info("Run {}", description.getMethodName()); - long currentTime = System.currentTimeMillis(); + final long currentTime = System.currentTimeMillis(); if (EnvType.getSystemEnvType() != EnvType.MultiCluster) { EnvFactory.getEnv().setTestMethodName(description.getMethodName()); - } else { - // TestMethodName must be set globally in MultiEnvFactory, since the - // cluster environments are not created now - MultiEnvFactory.setTestMethodName(description.getMethodName()); } + MultiEnvFactory.setTestMethodName(description.getMethodName()); super.runChild(method, notifier); - double timeCost = (System.currentTimeMillis() - currentTime) / 1000.0; - String testName = description.getClassName() + "." + description.getMethodName(); + final double timeCost = (System.currentTimeMillis() - currentTime) / 1000.0; + final String testName = description.getClassName() + "." + description.getMethodName(); logger.info("Done {}. Cost: {}s", description.getMethodName(), timeCost); listener.addTestStat(new IoTDBTestStat(testName, timeCost)); } diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java index 14567b01f112..8caabaf6f626 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java @@ -22,15 +22,21 @@ import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient; import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq; import org.apache.iotdb.db.it.utils.TestUtils; +import org.apache.iotdb.it.framework.IoTDBTestRunner; +import org.apache.iotdb.itbase.category.MultiClusterIT1; import org.apache.iotdb.rpc.TSStatusCode; import org.junit.Assert; import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; import java.util.Collections; import java.util.HashMap; import java.util.Map; +@RunWith(IoTDBTestRunner.class) +@Category({MultiClusterIT1.class}) public class IoTDBPipeOPCUAIT extends AbstractPipeSingleIT { @Test public void testOPCUASink() throws Exception { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java index 6b2b17a308bd..5ecdc23c72cb 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java @@ -533,10 +533,7 @@ private boolean mayTsFileResourceOverlappedWithPattern(final TsFileResource reso return true; } - return deviceSet.stream() - .anyMatch( - // TODO: use IDeviceID - deviceID -> pipePattern.mayOverlapWithDevice(deviceID)); + return deviceSet.stream().anyMatch(deviceID -> pipePattern.mayOverlapWithDevice(deviceID)); } private boolean isTsFileResourceOverlappedWithTimeRange(final TsFileResource resource) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/tools/schema/SchemaRegionSnapshotParser.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/tools/schema/SchemaRegionSnapshotParser.java index 7f6a719070be..a9ad880bebb6 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/tools/schema/SchemaRegionSnapshotParser.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/tools/schema/SchemaRegionSnapshotParser.java @@ -34,7 +34,6 @@ import java.nio.file.DirectoryStream; import java.nio.file.Files; import java.nio.file.Path; -import java.nio.file.Paths; import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -49,7 +48,8 @@ private SchemaRegionSnapshotParser() { // Empty constructor } - private static Path getLatestSnapshotPath(List snapshotPathList, boolean includingTmp) { + private static Path getLatestSnapshotPath( + final List snapshotPathList, final boolean includingTmp) { if (snapshotPathList.isEmpty()) { return null; } @@ -68,69 +68,28 @@ private static Path getLatestSnapshotPath(List snapshotPathList, boolean i return pathArray[pathArray.length - 1]; } - // Return all schema region's latest snapshot units in this datanode. - public static List> getSnapshotPaths() { - final String snapshotPath = CONFIG.getSchemaRegionConsensusDir(); - final File snapshotDir = new File(snapshotPath); - final ArrayList> snapshotUnits = new ArrayList<>(); - - // Get schema region path - try (DirectoryStream stream = - Files.newDirectoryStream(snapshotDir.toPath(), "[0-9]*-[0-9]*-[0-9]*-[0-9]*-[0-9]*")) { - for (Path path : stream) { - try (DirectoryStream filestream = - Files.newDirectoryStream(Paths.get(path.toString() + File.separator + "sm"))) { - // Find the latest snapshots - final ArrayList snapshotList = new ArrayList<>(); - for (Path snapshotFolder : filestream) { - if (snapshotFolder.toFile().isDirectory()) { - snapshotList.add(snapshotFolder); - } - } - final Path latestSnapshotPath = getLatestSnapshotPath(snapshotList, false); - if (latestSnapshotPath != null) { - // Get metadata from the latest snapshot folder. - final File mTreeSnapshot = - SystemFileFactory.INSTANCE.getFile( - latestSnapshotPath + File.separator + SchemaConstant.MTREE_SNAPSHOT); - final File tagSnapshot = - SystemFileFactory.INSTANCE.getFile( - latestSnapshotPath + File.separator + SchemaConstant.TAG_LOG_SNAPSHOT); - if (mTreeSnapshot.exists()) { - snapshotUnits.add( - new Pair<>( - mTreeSnapshot.toPath(), tagSnapshot.exists() ? tagSnapshot.toPath() : null)); - } - } - } - } - } catch (IOException exception) { - LOGGER.warn("Cannot construct snapshot directory stream", exception); - } - return snapshotUnits; - } - // In schema snapshot path: datanode/consensus/schema_region/47474747-4747-4747-4747-000200000000 // this func will get schema region id = 47474747-4747-4747-4747-000200000000's latest snapshot. // In one schema region, there is only one snapshot unit. - public static Pair getSnapshotPaths(String schemaRegionId, boolean isTmp) { + public static Pair getSnapshotPaths( + final String schemaRegionId, final boolean isTmp) { final String snapshotPath = CONFIG.getSchemaRegionConsensusDir(); final File snapshotDir = new File(snapshotPath + File.separator + schemaRegionId + File.separator + "sm"); // Get the latest snapshot file final ArrayList snapshotList = new ArrayList<>(); - try (DirectoryStream stream = + try (final DirectoryStream stream = Files.newDirectoryStream( snapshotDir.toPath(), isTmp ? ".tmp.[0-9]*_[0-9]*" : "[0-9]*_[0-9]*")) { - for (Path path : stream) { + for (final Path path : stream) { snapshotList.add(path); } - } catch (IOException ioException) { + } catch (final IOException ioException) { LOGGER.warn("ioexception when get {}'s folder", schemaRegionId, ioException); return null; } - Path latestSnapshotPath = getLatestSnapshotPath(snapshotList, isTmp); + final Path latestSnapshotPath = getLatestSnapshotPath(snapshotList, isTmp); if (latestSnapshotPath != null) { // Get metadata from the latest snapshot folder. final File mTreeSnapshot = @@ -148,17 +107,14 @@ public static Pair getSnapshotPaths(String schemaRegionId, boolean i } public static SRStatementGenerator translate2Statements( - Path mtreePath, Path tagFilePath, PartialPath databasePath) throws IOException { + final Path mtreePath, final Path tagFilePath, final PartialPath databasePath) + throws IOException { if (mtreePath == null) { return null; } final File mtreefile = mtreePath.toFile(); - final File tagfile; - if (tagFilePath != null && tagFilePath.toFile().exists()) { - tagfile = tagFilePath.toFile(); - } else { - tagfile = null; - } + final File tagfile = + tagFilePath != null && tagFilePath.toFile().exists() ? tagFilePath.toFile() : null; if (!mtreefile.exists()) { return null; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/DateTimeUtils.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/DateTimeUtils.java index 99c0445485e6..95f24142229c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/DateTimeUtils.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/DateTimeUtils.java @@ -738,16 +738,17 @@ public static String convertLongToDate(long timestamp) { ZoneId.systemDefault()); } - public static String convertLongToDate(long timestamp, ZoneId zoneId) { + public static String convertLongToDate(final long timestamp, final ZoneId zoneId) { return convertLongToDate( timestamp, CommonDescriptor.getInstance().getConfig().getTimestampPrecision(), zoneId); } - public static String convertLongToDate(long timestamp, String sourcePrecision) { + public static String convertLongToDate(final long timestamp, final String sourcePrecision) { return convertLongToDate(timestamp, sourcePrecision, ZoneId.systemDefault()); } - public static String convertLongToDate(long timestamp, String sourcePrecision, ZoneId zoneId) { + public static String convertLongToDate( + long timestamp, final String sourcePrecision, final ZoneId zoneId) { switch (sourcePrecision) { case "ns": case "nanosecond": @@ -757,6 +758,8 @@ public static String convertLongToDate(long timestamp, String sourcePrecision, Z case "microsecond": timestamp /= 1000; break; + default: + break; } return LocalDateTime.ofInstant(Instant.ofEpochMilli(timestamp), zoneId).toString(); }