Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

IT framework supports restarting cluster #12022

Merged
merged 10 commits into from
Feb 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -420,6 +420,12 @@ public CommonConfig setDriverTaskExecutionTimeSliceInMs(long driverTaskExecution
return this;
}

/**
 * Stores the given value under the {@code wal_mode} property of this configuration.
 *
 * @param walMode the value to write for the {@code wal_mode} key
 * @return this config, so that calls can be chained
 */
@Override
public CommonConfig setWalMode(String walMode) {
  final String propertyKey = "wal_mode";
  setProperty(propertyKey, walMode);
  return this;
}

// For part of the log directory
public String getClusterConfigStr() {
return fromConsensusFullNameToAbbr(properties.getProperty(CONFIG_NODE_CONSENSUS_PROTOCOL_CLASS))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -424,4 +424,11 @@ public CommonConfig setDriverTaskExecutionTimeSliceInMs(long driverTaskExecution
cnConfig.setDriverTaskExecutionTimeSliceInMs(driverTaskExecutionTimeSliceInMs);
return this;
}

/**
 * Forwards the given WAL mode to both wrapped configs: {@code dnConfig} first, then {@code
 * cnConfig}.
 *
 * @param walMode the value passed unchanged to each wrapped config's {@code setWalMode}
 * @return this config, so that calls can be chained
 */
@Override
public CommonConfig setWalMode(String walMode) {
dnConfig.setWalMode(walMode);
cnConfig.setWalMode(walMode);
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,7 @@ public abstract class AbstractEnv implements BaseEnv {
protected String testMethodName = null;
protected int index = 0;
protected long startTime;
protected int testWorkingRetryCount = 30;

protected int retryCount = 30;
private IClientManager<TEndPoint, SyncConfigNodeIServiceClient> clientManager;

/**
Expand Down Expand Up @@ -133,11 +132,11 @@ public List<String> getMetricPrometheusReporterContents() {
}

protected void initEnvironment(int configNodesNum, int dataNodesNum) {
initEnvironment(configNodesNum, dataNodesNum, 30);
initEnvironment(configNodesNum, dataNodesNum, retryCount);
}

protected void initEnvironment(int configNodesNum, int dataNodesNum, int testWorkingRetryCount) {
this.testWorkingRetryCount = testWorkingRetryCount;
protected void initEnvironment(int configNodesNum, int dataNodesNum, int retryCount) {
this.retryCount = retryCount;
this.configNodeWrapperList = new ArrayList<>();
this.dataNodeWrapperList = new ArrayList<>();

Expand Down Expand Up @@ -245,7 +244,7 @@ protected void initEnvironment(int configNodesNum, int dataNodesNum, int testWor
throw new AssertionError();
}

testWorkingNoUnknown();
checkClusterStatusWithoutUnknown();
}

public String getTestClassName() {
Expand All @@ -265,64 +264,34 @@ private Map<String, Integer> countNodeStatus(Map<Integer, String> nodeStatus) {
return result;
}

public void testWorkingNoUnknown() {
testWorking(nodeStatusMap -> nodeStatusMap.values().stream().noneMatch("Unknown"::equals));
public boolean checkClusterStatusWithoutUnknown() {
return checkClusterStatus(
nodeStatusMap -> nodeStatusMap.values().stream().noneMatch("Unknown"::equals))
&& testJDBCConnection();
}

public void testWorkingOneUnknownOtherRunning() {
testWorking(
nodeStatus -> {
Map<String, Integer> count = countNodeStatus(nodeStatus);
return count.getOrDefault("Unknown", 0) == 1
&& count.getOrDefault("Running", 0) == nodeStatus.size() - 1;
});
public boolean checkClusterStatusOneUnknownOtherRunning() {
return checkClusterStatus(
nodeStatus -> {
Map<String, Integer> count = countNodeStatus(nodeStatus);
return count.getOrDefault("Unknown", 0) == 1
&& count.getOrDefault("Running", 0) == nodeStatus.size() - 1;
})
&& testJDBCConnection();
}

/**
 * Probes every DataNode over JDBC and, when at least one ConfigNode is registered, runs the
 * heartbeat check with the supplied predicate.
 *
 * <p>Each DataNode endpoint gets its own request in a {@link ParallelRequestDelegate}. A probe is
 * retried up to {@code testWorkingRetryCount} times, pausing one second between attempts (but not
 * after the last one).
 *
 * @param statusCheck predicate handed to {@code checkNodeHeartbeat}
 * @throws AssertionError if a probe or the heartbeat check fails; the underlying exception is
 *     attached as the cause
 */
public void testWorking(Predicate<Map<Integer, String>> statusCheck) {
  logger.info("Testing DataNode connection...");
  List<String> endpoints =
      dataNodeWrapperList.stream()
          .map(DataNodeWrapper::getIpAndPortString)
          .collect(Collectors.toList());
  RequestDelegate<Void> testDelegate =
      new ParallelRequestDelegate<>(endpoints, NODE_START_TIMEOUT);
  for (DataNodeWrapper dataNode : dataNodeWrapperList) {
    final String dataNodeEndpoint = dataNode.getIpAndPortString();
    testDelegate.addRequest(
        () -> {
          Exception lastException = null;
          for (int i = 0; i < testWorkingRetryCount; i++) {
            try (Connection ignored = getConnection(dataNodeEndpoint, PROBE_TIMEOUT_MS)) {
              logger.info("Successfully connecting to DataNode: {}.", dataNodeEndpoint);
              return null;
            } catch (Exception e) {
              lastException = e;
            }
            // Only pause when another attempt follows; sleeping after the last failure
            // just delays the error report.
            if (i < testWorkingRetryCount - 1) {
              TimeUnit.SECONDS.sleep(1L);
            }
          }
          if (lastException == null) {
            // The loop never ran (retry count <= 0); "throw null" would surface as an
            // uninformative NullPointerException.
            throw new IllegalStateException(
                "No connection attempt was made to DataNode "
                    + dataNodeEndpoint
                    + ": retry count is "
                    + testWorkingRetryCount);
          }
          throw lastException;
        });
  }
  try {
    // Named differently from the startTime field to avoid shadowing it.
    long probeStartTime = System.currentTimeMillis();
    testDelegate.requestAll();
    if (!configNodeWrapperList.isEmpty()) {
      checkNodeHeartbeat(statusCheck);
    }
    logger.info(
        "Start cluster costs: {}s", (System.currentTimeMillis() - probeStartTime) / 1000.0);
  } catch (Exception e) {
    logger.error("exception in testWorking of ClusterID, message: {}", e.getMessage(), e);
    // Keep the root cause on the AssertionError so the failure shows up in test reports.
    throw new AssertionError(
        String.format("After %d times retry, the cluster can't work!", testWorkingRetryCount), e);
  }
}

private void checkNodeHeartbeat(Predicate<Map<Integer, String>> statusCheck) throws Exception {
/**
* Returns whether the node statuses of the cluster satisfy the provided predicate. The nodes are checked via RPC.
*
* @param statusCheck the predicate to test the status of nodes
* @return {@code true} if all nodes' status of the cluster match the provided predicate,
* otherwise {@code false}
*/
public boolean checkClusterStatus(Predicate<Map<Integer, String>> statusCheck) {
logger.info("Testing cluster environment...");
TShowClusterResp showClusterResp;
Exception lastException = null;
boolean flag;
for (int i = 0; i < 30; i++) {
for (int i = 0; i < retryCount; i++) {
try (SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient) getLeaderConfigNodeConnection()) {
flag = true;
Expand All @@ -347,18 +316,26 @@ private void checkNodeHeartbeat(Predicate<Map<Integer, String>> statusCheck) thr

if (flag) {
logger.info("The cluster is now ready for testing!");
return;
return true;
}
} catch (Exception e) {
lastException = e;
}
TimeUnit.SECONDS.sleep(1L);
try {
TimeUnit.SECONDS.sleep(1L);
} catch (InterruptedException e) {
lastException = e;
Thread.currentThread().interrupt();
}
}

if (lastException != null) {
throw lastException;
logger.error(
"exception in testWorking of ClusterID, message: {}",
lastException.getMessage(),
lastException);
}
throw new Exception("Check not pass");
logger.info("checkNodeHeartbeat failed after {} retries", retryCount);
return false;
}

@Override
Expand Down Expand Up @@ -392,18 +369,6 @@ public Connection getConnectionWithSpecifiedDataNode(
getReadConnections(null, username, password));
}

/**
 * Opens a JDBC connection to one endpoint and applies the given query timeout to it.
 *
 * <p>The user name and password are read from the {@code User} and {@code Password} system
 * properties; each falls back to {@code root} when unset.
 *
 * @param endpoint the endpoint string appended to {@code Config.IOTDB_URL_PREFIX}
 * @param queryTimeout value passed both to {@code getParam} and to {@code setQueryTimeout}
 * @return the opened connection
 * @throws SQLException if the driver cannot open the connection
 */
private Connection getConnection(String endpoint, int queryTimeout) throws SQLException {
  final String url = Config.IOTDB_URL_PREFIX + endpoint + getParam(null, queryTimeout);
  final String user = System.getProperty("User", "root");
  final String password = System.getProperty("Password", "root");

  final IoTDBConnection opened =
      (IoTDBConnection) DriverManager.getConnection(url, user, password);
  opened.setQueryTimeout(queryTimeout);
  return opened;
}

@Override
public Connection getConnection(Constant.Version version, String username, String password)
throws SQLException {
Expand Down Expand Up @@ -519,6 +484,54 @@ protected List<NodeConnection> getReadConnections(
return readConnRequestDelegate.requestAll();
}

/**
 * Verifies that a JDBC connection can be opened to every DataNode, retrying on failure.
 *
 * <p>Use this to avoid runtime exceptions when later obtaining JDBC connections: it is hard to add
 * retry and exception handling inside getWriteConnectionWithSpecifiedDataNode and
 * getReadConnections, so the retry is done here once the cluster is ready.
 *
 * <p>Each DataNode is probed in its own request of a {@link ParallelRequestDelegate}. A probe is
 * attempted up to {@code retryCount} times with a one-second pause between attempts (but not after
 * the last one). If {@code retryCount} is not positive no attempt is made and the probe is treated
 * as successful.
 *
 * @return {@code true} if {@code requestAll()} completes without throwing, {@code false} otherwise
 */
protected boolean testJDBCConnection() {
  logger.info("Testing JDBC connection...");
  List<String> endpoints =
      dataNodeWrapperList.stream()
          .map(DataNodeWrapper::getIpAndPortString)
          .collect(Collectors.toList());
  RequestDelegate<Void> testDelegate =
      new ParallelRequestDelegate<>(endpoints, NODE_START_TIMEOUT);
  for (DataNodeWrapper dataNode : dataNodeWrapperList) {
    final String dataNodeEndpoint = dataNode.getIpAndPortString();
    testDelegate.addRequest(
        () -> {
          Exception lastException = null;
          for (int i = 0; i < retryCount; i++) {
            try (IoTDBConnection ignored =
                (IoTDBConnection)
                    DriverManager.getConnection(
                        Config.IOTDB_URL_PREFIX
                            + dataNodeEndpoint
                            + getParam(null, NODE_NETWORK_TIMEOUT_MS),
                        System.getProperty("User", "root"),
                        System.getProperty("Password", "root"))) {
              logger.info("Successfully connecting to DataNode: {}.", dataNodeEndpoint);
              return null;
            } catch (Exception e) {
              lastException = e;
            }
            // Only pause when another attempt follows; sleeping after the final failure
            // would just delay reporting it.
            if (i < retryCount - 1) {
              TimeUnit.SECONDS.sleep(1L);
            }
          }
          if (lastException != null) {
            throw lastException;
          }
          return null;
        });
  }
  try {
    testDelegate.requestAll();
  } catch (Exception e) {
    logger.error("Failed to connect to DataNode", e);
    return false;
  }
  return true;
}

private String getParam(Constant.Version version, int timeout) {
StringBuilder sb = new StringBuilder("?");
sb.append(Config.NETWORK_TIMEOUT).append("=").append(timeout);
Expand Down Expand Up @@ -570,7 +583,7 @@ public IConfigNodeRPCService.Iface getLeaderConfigNodeConnection()
throws IOException, InterruptedException {
Exception lastException = null;
ConfigNodeWrapper lastErrorNode = null;
for (int i = 0; i < 30; i++) {
for (int i = 0; i < retryCount; i++) {
for (ConfigNodeWrapper configNodeWrapper : configNodeWrapperList) {
try {
lastErrorNode = configNodeWrapper;
Expand Down Expand Up @@ -632,7 +645,7 @@ public IConfigNodeRPCService.Iface getConfigNodeConnection(int index) throws Exc
public int getLeaderConfigNodeIndex() throws IOException, InterruptedException {
Exception lastException = null;
ConfigNodeWrapper lastErrorNode = null;
for (int retry = 0; retry < 30; retry++) {
for (int retry = 0; retry < retryCount; retry++) {
for (int configNodeId = 0; configNodeId < configNodeWrapperList.size(); configNodeId++) {
ConfigNodeWrapper configNodeWrapper = configNodeWrapperList.get(configNodeId);
lastErrorNode = configNodeWrapper;
Expand Down Expand Up @@ -675,11 +688,25 @@ public void startConfigNode(int index) {
configNodeWrapperList.get(index).start();
}

/** Calls {@code start()} on every ConfigNode wrapper, in list order. */
@Override
public void startAllConfigNodes() {
  configNodeWrapperList.forEach(ConfigNodeWrapper::start);
}

/**
 * Calls {@code stop()} on the ConfigNode wrapper at the given list position.
 *
 * @param index position of the wrapper in {@code configNodeWrapperList}
 */
@Override
public void shutdownConfigNode(int index) {
  final ConfigNodeWrapper target = configNodeWrapperList.get(index);
  target.stop();
}

/** Calls {@code stop()} on every ConfigNode wrapper, in list order. */
@Override
public void shutdownAllConfigNodes() {
  configNodeWrapperList.forEach(ConfigNodeWrapper::stop);
}

@Override
public ConfigNodeWrapper getConfigNodeWrapper(int index) {
return configNodeWrapperList.get(index);
Expand Down Expand Up @@ -765,7 +792,7 @@ public void registerNewConfigNode(ConfigNodeWrapper newConfigNodeWrapper, boolea

if (isNeedVerify) {
// Test whether register success
testWorkingNoUnknown();
checkClusterStatusWithoutUnknown();
}
}

Expand All @@ -790,7 +817,7 @@ public void registerNewDataNode(DataNodeWrapper newDataNodeWrapper, boolean isNe

if (isNeedVerify) {
// Test whether register success
testWorkingNoUnknown();
checkClusterStatusWithoutUnknown();
}
}

Expand All @@ -799,16 +826,30 @@ public void startDataNode(int index) {
dataNodeWrapperList.get(index).start();
}

/** Calls {@code start()} on every DataNode wrapper, in list order. */
@Override
public void startAllDataNodes() {
  dataNodeWrapperList.forEach(DataNodeWrapper::start);
}

/**
 * Calls {@code stop()} on the DataNode wrapper at the given list position.
 *
 * @param index position of the wrapper in {@code dataNodeWrapperList}
 */
@Override
public void shutdownDataNode(int index) {
  final DataNodeWrapper target = dataNodeWrapperList.get(index);
  target.stop();
}

/** Calls {@code stop()} on every DataNode wrapper, in list order. */
@Override
public void shutdownAllDataNodes() {
  dataNodeWrapperList.forEach(DataNodeWrapper::stop);
}

@Override
public void ensureNodeStatus(List<BaseNodeWrapper> nodes, List<NodeStatus> targetStatus)
throws IllegalStateException {
Throwable lastException = null;
for (int i = 0; i < 30; i++) {
for (int i = 0; i < retryCount; i++) {
try (SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
List<String> errorMessages = new ArrayList<>(nodes.size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -298,4 +298,9 @@ public CommonConfig setPipeAirGapReceiverEnabled(boolean isPipeAirGapReceiverEna
public CommonConfig setDriverTaskExecutionTimeSliceInMs(long driverTaskExecutionTimeSliceInMs) {
// No-op in this implementation: the argument is ignored and this config is returned unchanged.
return this;
}

/**
 * No-op in this implementation: the argument is ignored.
 *
 * @param walMode unused
 * @return this config, so that calls can be chained
 */
@Override
public CommonConfig setWalMode(String walMode) {
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -244,11 +244,21 @@ public void startConfigNode(int index) {
throw new UnsupportedOperationException();
}

/**
 * Not supported by this implementation.
 *
 * @throws UnsupportedOperationException always
 */
@Override
public void startAllConfigNodes() {
throw new UnsupportedOperationException();
}

/**
 * Not supported by this implementation.
 *
 * @param index unused
 * @throws UnsupportedOperationException always
 */
@Override
public void shutdownConfigNode(int index) {
throw new UnsupportedOperationException();
}

/**
 * Not supported by this implementation.
 *
 * @throws UnsupportedOperationException always
 */
@Override
public void shutdownAllConfigNodes() {
throw new UnsupportedOperationException();
}

@Override
public void ensureNodeStatus(List<BaseNodeWrapper> nodes, List<NodeStatus> targetStatus) {
throw new UnsupportedOperationException();
Expand Down Expand Up @@ -299,11 +309,21 @@ public void startDataNode(int index) {
throw new UnsupportedOperationException();
}

/**
 * Not supported by this implementation.
 *
 * @throws UnsupportedOperationException always
 */
@Override
public void startAllDataNodes() {
throw new UnsupportedOperationException();
}

/**
 * Not supported by this implementation.
 *
 * @param index unused
 * @throws UnsupportedOperationException always
 */
@Override
public void shutdownDataNode(int index) {
throw new UnsupportedOperationException();
}

/**
 * Not supported by this implementation.
 *
 * @throws UnsupportedOperationException always
 */
@Override
public void shutdownAllDataNodes() {
throw new UnsupportedOperationException();
}

@Override
public int getMqttPort() {
throw new UnsupportedOperationException();
Expand Down
Loading
Loading