Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pipe & IT : Refactor pipe / IT and improved IT behaviour #13521

Merged
merged 4 commits into from
Sep 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,24 +38,25 @@ private MultiEnvFactory() {
// Empty constructor
}

public static void setTestMethodName(String testMethodName) {
public static void setTestMethodName(final String testMethodName) {
currentMethodName = testMethodName;
envList.forEach(baseEnv -> baseEnv.setTestMethodName(testMethodName));
}

/** Get an environment with the specific index. */
public static BaseEnv getEnv(int index) throws IndexOutOfBoundsException {
public static BaseEnv getEnv(final int index) throws IndexOutOfBoundsException {
return envList.get(index);
}

/** Create several environments according to the specific number. */
public static void createEnv(int num) {
public static void createEnv(final int num) {
// Not judge EnvType for individual test convenience
long startTime = System.currentTimeMillis();
final long startTime = System.currentTimeMillis();
for (int i = 0; i < num; ++i) {
try {
Class.forName(Config.JDBC_DRIVER_NAME);
envList.add(new MultiClusterEnv(startTime, i, currentMethodName));
} catch (ClassNotFoundException e) {
} catch (final ClassNotFoundException e) {
logger.error("Create env error", e);
System.exit(-1);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,9 +115,6 @@ public class ClusterConstant {
"strongConsistencyClusterMode.dataRegionReplicaNumber";

// Property file names
public static final String CONFIG_NODE_PROPERTIES_FILE = "iotdb-confignode.properties";
public static final String DATA_NODE_PROPERTIES_FILE = "iotdb-datanode.properties";
public static final String COMMON_PROPERTIES_FILE = "iotdb-common.properties";
public static final String IOTDB_SYSTEM_PROPERTIES_FILE = "iotdb-system.properties";

// Properties' keys
Expand All @@ -142,10 +139,7 @@ public class ClusterConstant {
// ConfigNode
public static final String CN_SYSTEM_DIR = "cn_system_dir";
public static final String CN_CONSENSUS_DIR = "cn_consensus_dir";
public static final String CN_METRIC_PROMETHEUS_REPORTER_PORT =
"cn_metric_prometheus_reporter_port";
public static final String CN_METRIC_IOTDB_REPORTER_HOST = "cn_metric_iotdb_reporter_host";
public static final String CN_METRIC_IOTDB_REPORTER_PORT = "cn_metric_iotdb_reporter_port";

public static final String CN_CONNECTION_TIMEOUT_MS = "cn_connection_timeout_ms";

Expand All @@ -157,13 +151,10 @@ public class ClusterConstant {
public static final String DN_TRACING_DIR = "dn_tracing_dir";
public static final String DN_SYNC_DIR = "dn_sync_dir";
public static final String DN_METRIC_IOTDB_REPORTER_HOST = "dn_metric_iotdb_reporter_host";
public static final String DN_METRIC_PROMETHEUS_REPORTER_PORT =
"dn_metric_prometheus_reporter_port";

public static final String DN_MPP_DATA_EXCHANGE_PORT = "dn_mpp_data_exchange_port";
public static final String DN_DATA_REGION_CONSENSUS_PORT = "dn_data_region_consensus_port";
public static final String DN_SCHEMA_REGION_CONSENSUS_PORT = "dn_schema_region_consensus_port";
public static final String PIPE_AIR_GAP_RECEIVER_ENABLED = "pipe_air_gap_receiver_enabled";
public static final String PIPE_AIR_GAP_RECEIVER_PORT = "pipe_air_gap_receiver_port";
public static final String MAX_TSBLOCK_SIZE_IN_BYTES = "max_tsblock_size_in_bytes";
public static final String PAGE_SIZE_IN_BYTE = "page_size_in_byte";
Expand Down Expand Up @@ -205,7 +196,6 @@ public class ClusterConstant {

// Env Constant
public static final int NODE_START_TIMEOUT = 100;
public static final int PROBE_TIMEOUT_MS = 2000;
public static final int NODE_NETWORK_TIMEOUT_MS = 0;
public static final String ZERO_TIME_ZONE = "GMT+0";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,22 +70,22 @@ public static int[] searchAvailablePorts() {
while (true) {
int randomPortStart = 1000 + (int) (Math.random() * (1999 - 1000));
randomPortStart = randomPortStart * (length + 1) + 1;
String lockFilePath = getLockFilePath(randomPortStart);
File lockFile = new File(lockFilePath);
final String lockFilePath = getLockFilePath(randomPortStart);
final File lockFile = new File(lockFilePath);
try {
// Lock the ports first to avoid to be occupied by other ForkedBooters during ports
// available detecting
if (!lockFile.createNewFile()) {
continue;
}
List<Integer> requiredPorts =
final List<Integer> requiredPorts =
IntStream.rangeClosed(randomPortStart, randomPortStart + length)
.boxed()
.collect(Collectors.toList());
if (checkPortsAvailable(requiredPorts)) {
return requiredPorts.stream().mapToInt(Integer::intValue).toArray();
}
} catch (IOException e) {
} catch (final IOException ignore) {
// ignore
}
// Delete the lock file if the ports can't be used or some error happens
Expand All @@ -95,39 +95,35 @@ public static int[] searchAvailablePorts() {
}
}

private static boolean checkPortsAvailable(List<Integer> ports) {
String cmd = getSearchAvailablePortCmd(ports);
private static boolean checkPortsAvailable(final List<Integer> ports) {
final String cmd = getSearchAvailablePortCmd(ports);
try {
Process proc = Runtime.getRuntime().exec(cmd);
return proc.waitFor() == 1;
} catch (IOException e) {
return Runtime.getRuntime().exec(cmd).waitFor() == 1;
} catch (final IOException ignore) {
// ignore
} catch (InterruptedException e) {
} catch (final InterruptedException e) {
Thread.currentThread().interrupt();
}
return false;
}

private static String getSearchAvailablePortCmd(List<Integer> ports) {
if (SystemUtils.IS_OS_WINDOWS) {
return getWindowsSearchPortCmd(ports);
}
return getUnixSearchPortCmd(ports);
private static String getSearchAvailablePortCmd(final List<Integer> ports) {
return SystemUtils.IS_OS_WINDOWS ? getWindowsSearchPortCmd(ports) : getUnixSearchPortCmd(ports);
}

private static String getWindowsSearchPortCmd(List<Integer> ports) {
String cmd = "netstat -aon -p tcp | findStr ";
return cmd
private static String getWindowsSearchPortCmd(final List<Integer> ports) {
return "netstat -aon -p tcp | findStr "
+ ports.stream().map(v -> "/C:'127.0.0.1:" + v + "'").collect(Collectors.joining(" "));
}

private static String getUnixSearchPortCmd(List<Integer> ports) {
String cmd = "lsof -iTCP -sTCP:LISTEN -P -n | awk '{print $9}' | grep -E ";
return cmd + ports.stream().map(String::valueOf).collect(Collectors.joining("|")) + "\"";
private static String getUnixSearchPortCmd(final List<Integer> ports) {
return "lsof -iTCP -sTCP:LISTEN -P -n | awk '{print $9}' | grep -E "
+ ports.stream().map(String::valueOf).collect(Collectors.joining("|"))
+ "\"";
}

private static Pair<Integer, Integer> getClusterNodesNum(int index) {
String valueStr = System.getProperty(CLUSTER_CONFIGURATIONS);
private static Pair<Integer, Integer> getClusterNodesNum(final int index) {
final String valueStr = System.getProperty(CLUSTER_CONFIGURATIONS);
if (valueStr == null) {
return null;
}
Expand All @@ -154,17 +150,17 @@ private static Pair<Integer, Integer> getClusterNodesNum(int index) {
// Print nothing to avoid polluting test outputs
return null;
}
} catch (NumberFormatException ignore) {
} catch (final NumberFormatException ignore) {
return null;
}
}

public static String getLockFilePath(int port) {
public static String getLockFilePath(final int port) {
return LOCK_FILE_PATH + port;
}

public static Pair<Integer, Integer> getNodeNum() {
Pair<Integer, Integer> nodesNum = getClusterNodesNum(0);
final Pair<Integer, Integer> nodesNum = getClusterNodesNum(0);
if (nodesNum != null) {
return nodesNum;
}
Expand All @@ -173,8 +169,8 @@ public static Pair<Integer, Integer> getNodeNum() {
getIntFromSysVar(DEFAULT_DATA_NODE_NUM, 3, 0));
}

public static Pair<Integer, Integer> getNodeNum(int index) {
Pair<Integer, Integer> nodesNum = getClusterNodesNum(index);
public static Pair<Integer, Integer> getNodeNum(final int index) {
final Pair<Integer, Integer> nodesNum = getClusterNodesNum(index);
if (nodesNum != null) {
return nodesNum;
}
Expand All @@ -183,38 +179,38 @@ public static Pair<Integer, Integer> getNodeNum(int index) {
getIntFromSysVar(DEFAULT_DATA_NODE_NUM, 3, index));
}

public static String getFilePathFromSysVar(String key, int index) {
String valueStr = System.getProperty(key);
public static String getFilePathFromSysVar(final String key, final int index) {
final String valueStr = System.getProperty(key);
if (valueStr == null) {
return null;
}
return System.getProperty(USER_DIR) + getValueOfIndex(valueStr, index);
}

public static int getIntFromSysVar(String key, int defaultValue, int index) {
String valueStr = System.getProperty(key);
public static int getIntFromSysVar(final String key, final int defaultValue, final int index) {
final String valueStr = System.getProperty(key);
if (valueStr == null) {
return defaultValue;
}

String value = getValueOfIndex(valueStr, index);
final String value = getValueOfIndex(valueStr, index);
try {
return Integer.parseInt(value);
} catch (NumberFormatException e) {
} catch (final NumberFormatException e) {
throw new IllegalArgumentException("Invalid property value: " + value + " of key " + key);
}
}

public static String getValueOfIndex(String valueStr, int index) {
String[] values = valueStr.split(DELIMITER);
public static String getValueOfIndex(final String valueStr, final int index) {
final String[] values = valueStr.split(DELIMITER);
return index <= values.length - 1 ? values[index] : values[values.length - 1];
}

public static String getTimeForLogDirectory(long startTime) {
public static String getTimeForLogDirectory(final long startTime) {
return convertLongToDate(startTime, "ms").replace(":", DIR_TIME_REPLACEMENT);
}

public static String fromConsensusFullNameToAbbr(String consensus) {
public static String fromConsensusFullNameToAbbr(final String consensus) {
switch (consensus) {
case SIMPLE_CONSENSUS:
return SIMPLE_CONSENSUS_STR;
Expand All @@ -233,7 +229,7 @@ public static String fromConsensusFullNameToAbbr(String consensus) {
}
}

public static String fromConsensusAbbrToFullName(String consensus) {
public static String fromConsensusAbbrToFullName(final String consensus) {
switch (consensus) {
case SIMPLE_CONSENSUS_STR:
return SIMPLE_CONSENSUS;
Expand Down
Loading
Loading