Skip to content

Commit

Permalink
Remove datanode optimization (apache#13559)
Browse files Browse the repository at this point in the history
  • Loading branch information
HxpSerein authored Sep 30, 2024
1 parent b488a79 commit cc73946
Show file tree
Hide file tree
Showing 31 changed files with 1,555 additions and 616 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -444,7 +444,7 @@ private static String buildRegionMigrateCommand(int who, int from, int to) {
return result;
}

private static Map<Integer, Set<Integer>> getRegionMap(ResultSet showRegionsResult)
public static Map<Integer, Set<Integer>> getRegionMap(ResultSet showRegionsResult)
throws SQLException {
Map<Integer, Set<Integer>> regionMap = new HashMap<>();
while (showRegionsResult.next()) {
Expand Down Expand Up @@ -727,7 +727,7 @@ protected static <T extends Enum<?>> KeySetView<String, Boolean> buildSet(T... k
return result;
}

private static <T> T closeQuietly(T t) {
public static <T> T closeQuietly(T t) {
InvocationHandler handler =
(proxy, method, args) -> {
try {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,288 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.confignode.it.removedatanode;

import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRemoveReq;
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRemoveResp;
import org.apache.iotdb.consensus.ConsensusFactory;
import org.apache.iotdb.db.queryengine.common.header.ColumnHeaderConstant;
import org.apache.iotdb.it.env.EnvFactory;
import org.apache.iotdb.itbase.exception.InconsistentDataException;
import org.apache.iotdb.rpc.TSStatusCode;

import org.apache.thrift.TException;
import org.awaitility.Awaitility;
import org.awaitility.core.ConditionTimeoutException;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;

import static org.apache.iotdb.confignode.it.regionmigration.IoTDBRegionMigrateReliabilityITFramework.closeQuietly;
import static org.apache.iotdb.confignode.it.regionmigration.IoTDBRegionMigrateReliabilityITFramework.getRegionMap;

/**
 * Shared framework for DataNode-removal integration tests.
 *
 * <p>Subclasses call {@link #successTest} / {@link #failTest} with the desired cluster shape; this
 * class boots the cluster, submits a {@code removeDataNode} request to the ConfigNode leader, and
 * waits (up to 2 minutes) for the removed nodes to disappear from the DataNode configuration map.
 */
public class IoTDBRemoveDataNodeITFramework {
  private static final Logger LOGGER =
      LoggerFactory.getLogger(IoTDBRemoveDataNodeITFramework.class);

  // Single write used to force at least one data region to hold data before removal.
  private static final String INSERTION1 =
      "INSERT INTO root.sg.d1(timestamp,speed,temperature) values(100, 1, 2)";

  private static final String SHOW_REGIONS = "show regions";
  private static final String SHOW_DATANODES = "show datanodes";

  // CUSTOM keeps the region-group count fixed at what testRemoveDataNode configures,
  // instead of letting the cluster extend region groups on demand.
  private static final String defaultSchemaRegionGroupExtensionPolicy = "CUSTOM";
  private static final String defaultDataRegionGroupExtensionPolicy = "CUSTOM";

  @Before
  public void setUp() throws Exception {
    // RATIS consensus for config/schema regions, IoT consensus for data regions.
    EnvFactory.getEnv()
        .getConfig()
        .getCommonConfig()
        .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
        .setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
        .setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS)
        .setSchemaRegionGroupExtensionPolicy(defaultSchemaRegionGroupExtensionPolicy)
        .setDataRegionGroupExtensionPolicy(defaultDataRegionGroupExtensionPolicy);
  }

  @After
  public void tearDown() throws InterruptedException {
    EnvFactory.getEnv().cleanClusterEnvironment();
  }

  /** Runs a removal scenario that is expected to complete successfully. */
  public void successTest(
      final int dataReplicateFactor,
      final int schemaReplicationFactor,
      final int configNodeNum,
      final int dataNodeNum,
      final int removeDataNodeNum,
      final int dataRegionPerDataNode)
      throws Exception {
    testRemoveDataNode(
        dataReplicateFactor,
        schemaReplicationFactor,
        configNodeNum,
        dataNodeNum,
        removeDataNodeNum,
        dataRegionPerDataNode,
        true);
  }

  /** Runs a removal scenario that is expected to be rejected or to time out. */
  public void failTest(
      final int dataReplicateFactor,
      final int schemaReplicationFactor,
      final int configNodeNum,
      final int dataNodeNum,
      final int removeDataNodeNum,
      final int dataRegionPerDataNode)
      throws Exception {
    testRemoveDataNode(
        dataReplicateFactor,
        schemaReplicationFactor,
        configNodeNum,
        dataNodeNum,
        removeDataNodeNum,
        dataRegionPerDataNode,
        false);
  }

  /**
   * Boots a cluster, removes {@code removeDataNodeNum} DataNodes and checks the outcome against
   * {@code expectRemoveSuccess}.
   *
   * @param expectRemoveSuccess when {@code true} the removal must be accepted and finish within the
   *     await window; when {@code false} either an up-front rejection or a timeout is the expected
   *     outcome, and an actual completion fails the test.
   */
  public void testRemoveDataNode(
      final int dataReplicateFactor,
      final int schemaReplicationFactor,
      final int configNodeNum,
      final int dataNodeNum,
      final int removeDataNodeNum,
      final int dataRegionPerDataNode,
      final boolean expectRemoveSuccess)
      throws Exception {
    // Set up the environment
    EnvFactory.getEnv()
        .getConfig()
        .getCommonConfig()
        .setSchemaReplicationFactor(schemaReplicationFactor)
        .setDataReplicationFactor(dataReplicateFactor)
        .setDefaultDataRegionGroupNumPerDatabase(
            dataRegionPerDataNode * dataNodeNum / dataReplicateFactor);
    EnvFactory.getEnv().initClusterEnvironment(configNodeNum, dataNodeNum);

    try (final Connection connection = closeQuietly(EnvFactory.getEnv().getConnection());
        final Statement statement = closeQuietly(connection.createStatement());
        SyncConfigNodeIServiceClient client =
            (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {

      // Insert data so the data regions are not empty.
      statement.execute(INSERTION1);

      ResultSet result = statement.executeQuery(SHOW_REGIONS);
      Map<Integer, Set<Integer>> regionMap = getRegionMap(result);

      // Get all data nodes
      result = statement.executeQuery(SHOW_DATANODES);
      Set<Integer> allDataNodeId = new HashSet<>();
      while (result.next()) {
        allDataNodeId.add(result.getInt(ColumnHeaderConstant.NODE_ID));
      }

      // Select data nodes to remove
      final Set<Integer> removeDataNodes =
          selectRemoveDataNodes(allDataNodeId, regionMap, removeDataNodeNum);

      // clientRef lets awaitUntilSuccess swap in a fresh leader connection on failover.
      AtomicReference<SyncConfigNodeIServiceClient> clientRef = new AtomicReference<>(client);
      List<TDataNodeLocation> removeDataNodeLocations =
          clientRef
              .get()
              .getDataNodeConfiguration(-1)
              .getDataNodeConfigurationMap()
              .values()
              .stream()
              .map(TDataNodeConfiguration::getLocation)
              .filter(location -> removeDataNodes.contains(location.getDataNodeId()))
              .collect(Collectors.toList());
      TDataNodeRemoveReq removeReq = new TDataNodeRemoveReq(removeDataNodeLocations);

      // Remove data nodes
      TDataNodeRemoveResp removeResp = clientRef.get().removeDataNode(removeReq);
      LOGGER.info("Submit Remove DataNodes result {} ", removeResp);
      if (removeResp.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
        if (expectRemoveSuccess) {
          LOGGER.error("Submit Remove DataNodes fail");
          Assert.fail();
        } else {
          // Up-front rejection is the expected outcome for fail-tests.
          LOGGER.info("Submit Remove DataNodes fail, as expected");
          return;
        }
      }
      LOGGER.info("Submit Remove DataNodes request: {}", removeReq);

      // Wait until the removed nodes disappear from the configuration map.
      boolean removeSuccess = false;
      try {
        awaitUntilSuccess(clientRef, removeDataNodeLocations);
        removeSuccess = true;
      } catch (ConditionTimeoutException e) {
        if (expectRemoveSuccess) {
          LOGGER.error("Remove DataNodes timeout in 2 minutes");
          Assert.fail();
        }
      }

      if (!expectRemoveSuccess && removeSuccess) {
        LOGGER.error("Remove DataNodes success, but expect fail");
        Assert.fail();
      }

      // Fix: only claim success when the removal actually completed; the old code logged
      // "Remove DataNodes success" even after an expected timeout.
      if (removeSuccess) {
        LOGGER.info("Remove DataNodes success");
      } else {
        LOGGER.info("Remove DataNodes did not complete, as expected");
      }
    } catch (InconsistentDataException ignored) {
      // "show regions"/"show datanodes" can transiently return inconsistent data while the
      // cluster is reconfiguring; in that case the scenario is abandoned without failing.
    }
  }

  /**
   * Picks {@code removeDataNodeNum} node ids out of {@code allDataNodeId} (removing them from the
   * input set as a side effect) and returns them.
   *
   * <p>NOTE(review): {@code regionMap} is currently unused — selection is just iteration order of
   * the set, not region-aware. Kept in the signature for future region-aware selection.
   */
  private static Set<Integer> selectRemoveDataNodes(
      Set<Integer> allDataNodeId, Map<Integer, Set<Integer>> regionMap, int removeDataNodeNum) {
    // Fail fast with a clear message instead of a NoSuchElementException mid-loop.
    if (removeDataNodeNum > allDataNodeId.size()) {
      throw new IllegalArgumentException(
          "Cannot remove "
              + removeDataNodeNum
              + " DataNodes, only "
              + allDataNodeId.size()
              + " available");
    }
    Set<Integer> removeDataNodeIds = new HashSet<>();
    for (int i = 0; i < removeDataNodeNum; i++) {
      int removeDataNodeId = allDataNodeId.iterator().next();
      removeDataNodeIds.add(removeDataNodeId);
      allDataNodeId.remove(removeDataNodeId);
    }
    return removeDataNodeIds;
  }

  /**
   * Polls the ConfigNode leader until none of {@code removeDataNodeLocations} appear in the
   * DataNode configuration map, for at most 2 minutes.
   *
   * @throws ConditionTimeoutException if the nodes are still registered after 2 minutes; the last
   *     observed node set and last exception (if any) are logged before rethrowing.
   */
  private static void awaitUntilSuccess(
      AtomicReference<SyncConfigNodeIServiceClient> clientRef,
      List<TDataNodeLocation> removeDataNodeLocations) {
    AtomicReference<List<TDataNodeLocation>> lastTimeDataNodeLocations = new AtomicReference<>();
    AtomicReference<Exception> lastException = new AtomicReference<>();

    try {
      Awaitility.await()
          .atMost(2, TimeUnit.MINUTES)
          .pollDelay(2, TimeUnit.SECONDS)
          .until(
              () -> {
                try {
                  List<TDataNodeLocation> remainingDataNodes =
                      clientRef
                          .get()
                          .getDataNodeConfiguration(-1)
                          .getDataNodeConfigurationMap()
                          .values()
                          .stream()
                          .map(TDataNodeConfiguration::getLocation)
                          .collect(Collectors.toList());
                  lastTimeDataNodeLocations.set(remainingDataNodes);
                  for (TDataNodeLocation location : removeDataNodeLocations) {
                    if (remainingDataNodes.contains(location)) {
                      return false;
                    }
                  }
                  return true;
                } catch (TException e) {
                  // Likely a leader failover: refresh the leader connection and retry.
                  clientRef.set(
                      (SyncConfigNodeIServiceClient)
                          EnvFactory.getEnv().getLeaderConfigNodeConnection());
                  lastException.set(e);
                  return false;
                } catch (Exception e) {
                  // Any other exception is treated as "not yet done" and retried.
                  lastException.set(e);
                  return false;
                }
              });
    } catch (ConditionTimeoutException e) {
      if (lastTimeDataNodeLocations.get() == null) {
        LOGGER.error(
            "Maybe getDataNodeConfiguration fail, lastTimeDataNodeLocations is null, last Exception:",
            lastException.get());
        throw e;
      }
      String actualSetStr = lastTimeDataNodeLocations.get().toString();
      lastTimeDataNodeLocations.get().removeAll(removeDataNodeLocations);
      String expectedSetStr = lastTimeDataNodeLocations.get().toString();
      LOGGER.error(
          "Remove DataNodes timeout in 2 minutes, expected set: {}, actual set: {}",
          expectedSetStr,
          actualSetStr);
      if (lastException.get() == null) {
        LOGGER.info("No exception during awaiting");
      } else {
        LOGGER.error("Last exception during awaiting:", lastException.get());
      }
      throw e;
    }

    LOGGER.info("DataNodes has been successfully changed to {}", lastTimeDataNodeLocations.get());
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.confignode.it.removedatanode;

import org.apache.iotdb.it.framework.IoTDBTestRunner;
import org.apache.iotdb.itbase.category.ClusterIT;

import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;

@Category({ClusterIT.class})
@RunWith(IoTDBTestRunner.class)
public class IoTDBRemoveDataNodeNormalIT extends IoTDBRemoveDataNodeITFramework {

  // Argument order (from the framework): dataReplicateFactor, schemaReplicationFactor,
  // configNodeNum, dataNodeNum, removeDataNodeNum, dataRegionPerDataNode.

  /** Removing 1 of 4 DataNodes (1 ConfigNode cluster) is expected to complete successfully. */
  @Test
  public void success1C4DTest() throws Exception {
    successTest(2, 3, 1, 4, 1, 2);
  }

  /**
   * Removing 1 of only 3 DataNodes is expected to fail — presumably the remaining 2 nodes cannot
   * satisfy the configured replication factors (TODO confirm against ConfigNode rejection logic).
   */
  @Test
  public void fail1C3DTest() throws Exception {
    failTest(2, 3, 1, 3, 1, 2);
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
public enum CnToDnRequestType {

// Node Maintenance
DISABLE_DATA_NODE,
CLEAN_DATA_NODE_CACHE,
STOP_DATA_NODE,

FLUSH,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.apache.iotdb.mpp.rpc.thrift.TAlterViewReq;
import org.apache.iotdb.mpp.rpc.thrift.TCheckSchemaRegionUsingTemplateReq;
import org.apache.iotdb.mpp.rpc.thrift.TCheckTimeSeriesExistenceReq;
import org.apache.iotdb.mpp.rpc.thrift.TCleanDataNodeCacheReq;
import org.apache.iotdb.mpp.rpc.thrift.TConstructSchemaBlackListReq;
import org.apache.iotdb.mpp.rpc.thrift.TConstructSchemaBlackListWithTemplateReq;
import org.apache.iotdb.mpp.rpc.thrift.TConstructViewSchemaBlackListReq;
Expand Down Expand Up @@ -363,6 +364,14 @@ protected void initActionMapBuilder() {
CnToDnRequestType.UPDATE_TABLE,
(req, client, handler) ->
client.updateTable((TUpdateTableReq) req, (DataNodeTSStatusRPCHandler) handler));
actionMapBuilder.put(
CnToDnRequestType.CLEAN_DATA_NODE_CACHE,
(req, client, handler) ->
client.cleanDataNodeCache(
(TCleanDataNodeCacheReq) req, (DataNodeTSStatusRPCHandler) handler));
actionMapBuilder.put(
CnToDnRequestType.STOP_DATA_NODE,
(req, client, handler) -> client.stopDataNode((DataNodeTSStatusRPCHandler) handler));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,8 @@ public static DataNodeAsyncRequestRPCHandler<?> buildHandler(
case FULL_MERGE:
case FLUSH:
case CLEAR_CACHE:
case CLEAN_DATA_NODE_CACHE:
case STOP_DATA_NODE:
case START_REPAIR_DATA:
case STOP_REPAIR_DATA:
case LOAD_CONFIGURATION:
Expand Down
Loading

0 comments on commit cc73946

Please sign in to comment.