Skip to content

Commit

Permalink
Pipe & IT : Refactor pipe / IT and improved IT behaviour (apache#13521)
Browse files Browse the repository at this point in the history
1. Refactored pipe and IT codes.
2. Changed the IT's log_dir from class_method to class(separator)method, and fixed the bug that multiClusterIT's method is not set.
3. Fixed the bug that IoTDBPipeOPCUAIT is not run.
  • Loading branch information
Caideyipi authored Sep 18, 2024
1 parent f1cfc48 commit e07d39d
Show file tree
Hide file tree
Showing 13 changed files with 317 additions and 364 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,24 +38,25 @@ private MultiEnvFactory() {
// Empty constructor
}

public static void setTestMethodName(String testMethodName) {
public static void setTestMethodName(final String testMethodName) {
currentMethodName = testMethodName;
envList.forEach(baseEnv -> baseEnv.setTestMethodName(testMethodName));
}

/** Get an environment with the specific index. */
public static BaseEnv getEnv(int index) throws IndexOutOfBoundsException {
public static BaseEnv getEnv(final int index) throws IndexOutOfBoundsException {
return envList.get(index);
}

/** Create several environments according to the specific number. */
public static void createEnv(int num) {
public static void createEnv(final int num) {
// Not judge EnvType for individual test convenience
long startTime = System.currentTimeMillis();
final long startTime = System.currentTimeMillis();
for (int i = 0; i < num; ++i) {
try {
Class.forName(Config.JDBC_DRIVER_NAME);
envList.add(new MultiClusterEnv(startTime, i, currentMethodName));
} catch (ClassNotFoundException e) {
} catch (final ClassNotFoundException e) {
logger.error("Create env error", e);
System.exit(-1);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,9 +115,6 @@ public class ClusterConstant {
"strongConsistencyClusterMode.dataRegionReplicaNumber";

// Property file names
public static final String CONFIG_NODE_PROPERTIES_FILE = "iotdb-confignode.properties";
public static final String DATA_NODE_PROPERTIES_FILE = "iotdb-datanode.properties";
public static final String COMMON_PROPERTIES_FILE = "iotdb-common.properties";
public static final String IOTDB_SYSTEM_PROPERTIES_FILE = "iotdb-system.properties";

// Properties' keys
Expand All @@ -142,10 +139,7 @@ public class ClusterConstant {
// ConfigNode
public static final String CN_SYSTEM_DIR = "cn_system_dir";
public static final String CN_CONSENSUS_DIR = "cn_consensus_dir";
public static final String CN_METRIC_PROMETHEUS_REPORTER_PORT =
"cn_metric_prometheus_reporter_port";
public static final String CN_METRIC_IOTDB_REPORTER_HOST = "cn_metric_iotdb_reporter_host";
public static final String CN_METRIC_IOTDB_REPORTER_PORT = "cn_metric_iotdb_reporter_port";

public static final String CN_CONNECTION_TIMEOUT_MS = "cn_connection_timeout_ms";

Expand All @@ -157,13 +151,10 @@ public class ClusterConstant {
public static final String DN_TRACING_DIR = "dn_tracing_dir";
public static final String DN_SYNC_DIR = "dn_sync_dir";
public static final String DN_METRIC_IOTDB_REPORTER_HOST = "dn_metric_iotdb_reporter_host";
public static final String DN_METRIC_PROMETHEUS_REPORTER_PORT =
"dn_metric_prometheus_reporter_port";

public static final String DN_MPP_DATA_EXCHANGE_PORT = "dn_mpp_data_exchange_port";
public static final String DN_DATA_REGION_CONSENSUS_PORT = "dn_data_region_consensus_port";
public static final String DN_SCHEMA_REGION_CONSENSUS_PORT = "dn_schema_region_consensus_port";
public static final String PIPE_AIR_GAP_RECEIVER_ENABLED = "pipe_air_gap_receiver_enabled";
public static final String PIPE_AIR_GAP_RECEIVER_PORT = "pipe_air_gap_receiver_port";
public static final String MAX_TSBLOCK_SIZE_IN_BYTES = "max_tsblock_size_in_bytes";
public static final String PAGE_SIZE_IN_BYTE = "page_size_in_byte";
Expand Down Expand Up @@ -205,7 +196,6 @@ public class ClusterConstant {

// Env Constant
public static final int NODE_START_TIMEOUT = 100;
public static final int PROBE_TIMEOUT_MS = 2000;
public static final int NODE_NETWORK_TIMEOUT_MS = 0;
public static final String ZERO_TIME_ZONE = "GMT+0";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,22 +70,22 @@ public static int[] searchAvailablePorts() {
while (true) {
int randomPortStart = 1000 + (int) (Math.random() * (1999 - 1000));
randomPortStart = randomPortStart * (length + 1) + 1;
String lockFilePath = getLockFilePath(randomPortStart);
File lockFile = new File(lockFilePath);
final String lockFilePath = getLockFilePath(randomPortStart);
final File lockFile = new File(lockFilePath);
try {
// Lock the ports first to avoid to be occupied by other ForkedBooters during ports
// available detecting
if (!lockFile.createNewFile()) {
continue;
}
List<Integer> requiredPorts =
final List<Integer> requiredPorts =
IntStream.rangeClosed(randomPortStart, randomPortStart + length)
.boxed()
.collect(Collectors.toList());
if (checkPortsAvailable(requiredPorts)) {
return requiredPorts.stream().mapToInt(Integer::intValue).toArray();
}
} catch (IOException e) {
} catch (final IOException ignore) {
// ignore
}
// Delete the lock file if the ports can't be used or some error happens
Expand All @@ -95,39 +95,35 @@ public static int[] searchAvailablePorts() {
}
}

private static boolean checkPortsAvailable(List<Integer> ports) {
String cmd = getSearchAvailablePortCmd(ports);
private static boolean checkPortsAvailable(final List<Integer> ports) {
final String cmd = getSearchAvailablePortCmd(ports);
try {
Process proc = Runtime.getRuntime().exec(cmd);
return proc.waitFor() == 1;
} catch (IOException e) {
return Runtime.getRuntime().exec(cmd).waitFor() == 1;
} catch (final IOException ignore) {
// ignore
} catch (InterruptedException e) {
} catch (final InterruptedException e) {
Thread.currentThread().interrupt();
}
return false;
}

private static String getSearchAvailablePortCmd(List<Integer> ports) {
if (SystemUtils.IS_OS_WINDOWS) {
return getWindowsSearchPortCmd(ports);
}
return getUnixSearchPortCmd(ports);
private static String getSearchAvailablePortCmd(final List<Integer> ports) {
return SystemUtils.IS_OS_WINDOWS ? getWindowsSearchPortCmd(ports) : getUnixSearchPortCmd(ports);
}

private static String getWindowsSearchPortCmd(List<Integer> ports) {
String cmd = "netstat -aon -p tcp | findStr ";
return cmd
private static String getWindowsSearchPortCmd(final List<Integer> ports) {
return "netstat -aon -p tcp | findStr "
+ ports.stream().map(v -> "/C:'127.0.0.1:" + v + "'").collect(Collectors.joining(" "));
}

private static String getUnixSearchPortCmd(List<Integer> ports) {
String cmd = "lsof -iTCP -sTCP:LISTEN -P -n | awk '{print $9}' | grep -E ";
return cmd + ports.stream().map(String::valueOf).collect(Collectors.joining("|")) + "\"";
private static String getUnixSearchPortCmd(final List<Integer> ports) {
return "lsof -iTCP -sTCP:LISTEN -P -n | awk '{print $9}' | grep -E "
+ ports.stream().map(String::valueOf).collect(Collectors.joining("|"))
+ "\"";
}

private static Pair<Integer, Integer> getClusterNodesNum(int index) {
String valueStr = System.getProperty(CLUSTER_CONFIGURATIONS);
private static Pair<Integer, Integer> getClusterNodesNum(final int index) {
final String valueStr = System.getProperty(CLUSTER_CONFIGURATIONS);
if (valueStr == null) {
return null;
}
Expand All @@ -154,17 +150,17 @@ private static Pair<Integer, Integer> getClusterNodesNum(int index) {
// Print nothing to avoid polluting test outputs
return null;
}
} catch (NumberFormatException ignore) {
} catch (final NumberFormatException ignore) {
return null;
}
}

public static String getLockFilePath(int port) {
public static String getLockFilePath(final int port) {
return LOCK_FILE_PATH + port;
}

public static Pair<Integer, Integer> getNodeNum() {
Pair<Integer, Integer> nodesNum = getClusterNodesNum(0);
final Pair<Integer, Integer> nodesNum = getClusterNodesNum(0);
if (nodesNum != null) {
return nodesNum;
}
Expand All @@ -173,8 +169,8 @@ public static Pair<Integer, Integer> getNodeNum() {
getIntFromSysVar(DEFAULT_DATA_NODE_NUM, 3, 0));
}

public static Pair<Integer, Integer> getNodeNum(int index) {
Pair<Integer, Integer> nodesNum = getClusterNodesNum(index);
public static Pair<Integer, Integer> getNodeNum(final int index) {
final Pair<Integer, Integer> nodesNum = getClusterNodesNum(index);
if (nodesNum != null) {
return nodesNum;
}
Expand All @@ -183,38 +179,38 @@ public static Pair<Integer, Integer> getNodeNum(int index) {
getIntFromSysVar(DEFAULT_DATA_NODE_NUM, 3, index));
}

public static String getFilePathFromSysVar(String key, int index) {
String valueStr = System.getProperty(key);
public static String getFilePathFromSysVar(final String key, final int index) {
final String valueStr = System.getProperty(key);
if (valueStr == null) {
return null;
}
return System.getProperty(USER_DIR) + getValueOfIndex(valueStr, index);
}

public static int getIntFromSysVar(String key, int defaultValue, int index) {
String valueStr = System.getProperty(key);
public static int getIntFromSysVar(final String key, final int defaultValue, final int index) {
final String valueStr = System.getProperty(key);
if (valueStr == null) {
return defaultValue;
}

String value = getValueOfIndex(valueStr, index);
final String value = getValueOfIndex(valueStr, index);
try {
return Integer.parseInt(value);
} catch (NumberFormatException e) {
} catch (final NumberFormatException e) {
throw new IllegalArgumentException("Invalid property value: " + value + " of key " + key);
}
}

public static String getValueOfIndex(String valueStr, int index) {
String[] values = valueStr.split(DELIMITER);
public static String getValueOfIndex(final String valueStr, final int index) {
final String[] values = valueStr.split(DELIMITER);
return index <= values.length - 1 ? values[index] : values[values.length - 1];
}

public static String getTimeForLogDirectory(long startTime) {
public static String getTimeForLogDirectory(final long startTime) {
return convertLongToDate(startTime, "ms").replace(":", DIR_TIME_REPLACEMENT);
}

public static String fromConsensusFullNameToAbbr(String consensus) {
public static String fromConsensusFullNameToAbbr(final String consensus) {
switch (consensus) {
case SIMPLE_CONSENSUS:
return SIMPLE_CONSENSUS_STR;
Expand All @@ -233,7 +229,7 @@ public static String fromConsensusFullNameToAbbr(String consensus) {
}
}

public static String fromConsensusAbbrToFullName(String consensus) {
public static String fromConsensusAbbrToFullName(final String consensus) {
switch (consensus) {
case SIMPLE_CONSENSUS_STR:
return SIMPLE_CONSENSUS;
Expand Down
Loading

0 comments on commit e07d39d

Please sign in to comment.