HDFS-16663. Allow block reconstruction pending timeout to be refreshable (apache#4567)

Reviewed-by: Hiroyuki Adachi <hadachi@yahoo-corp.jp>
Signed-off-by: Takanobu Asanuma <tasanuma@apache.org>
lfxy authored and zhanghaobo@kanzhun.com committed Aug 21, 2023
1 parent 860e9ea commit f6266fa
Showing 5 changed files with 108 additions and 16 deletions.
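
This change makes dfs.namenode.reconstruction.pending.timeout-sec (DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_KEY) a reconfigurable NameNode property, so the pending-reconstruction timeout can be refreshed at runtime with hdfs dfsadmin -reconfig namenode <host:ipc_port> start instead of a NameNode restart. The sketch below drives that flow programmatically through DFSAdmin; it is an illustration only, it assumes the new value has already been placed in the NameNode's hdfs-site.xml, and the nn-host:8020 address is a placeholder, not part of the patch.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.tools.DFSAdmin;
import org.apache.hadoop.util.ToolRunner;

public class RefreshPendingTimeoutSketch {
  public static void main(String[] args) throws Exception {
    // Assumes dfs.namenode.reconstruction.pending.timeout-sec was already
    // changed in the NameNode's hdfs-site.xml; nn-host:8020 is a placeholder.
    Configuration conf = new HdfsConfiguration();
    DFSAdmin admin = new DFSAdmin(conf);
    // Start the reconfiguration task on the NameNode ...
    ToolRunner.run(conf, admin, new String[] {
        "-reconfig", "namenode", "nn-host:8020", "start"});
    // ... then poll until the changed property is reported.
    ToolRunner.run(conf, admin, new String[] {
        "-reconfig", "namenode", "nn-host:8020", "status"});
  }
}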
@@ -1027,6 +1027,26 @@ public void setBlocksReplWorkMultiplier(int newVal) {
blocksReplWorkMultiplier = newVal;
}

/**
* Updates the value used for pendingReconstruction timeout, which is set by
* {@code DFSConfigKeys.
* DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_KEY} initially.
*
* @param newVal - Must be a positive non-zero integer.
*/
public void setReconstructionPendingTimeout(int newVal) {
ensurePositiveInt(newVal,
DFSConfigKeys.DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_KEY);
pendingReconstruction.setTimeout(newVal * 1000L);
}

/** Returns the current setting for pendingReconstruction timeout, set by
* {@code DFSConfigKeys.DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_KEY}.
*/
public int getReconstructionPendingTimeout() {
return (int)(pendingReconstruction.getTimeout() / 1000L);
}

public int getDefaultStorageNum(BlockInfo block) {
switch (block.getBlockType()) {
case STRIPED: return ((BlockInfoStriped) block).getRealTotalBlockNum();
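
The new BlockManager accessors keep the public unit in seconds, matching the configuration key, while PendingReconstructionBlocks stores the timeout in milliseconds; ensurePositiveInt (an existing helper not shown in this diff) is expected to reject zero and negative values, which the invalid-value test below relies on. A standalone illustration of the seconds/milliseconds round trip, with example values that are not part of the patch:

public class PendingTimeoutUnitsSketch {
  public static void main(String[] args) {
    int newVal = 300;                            // seconds, as passed to setReconstructionPendingTimeout
    long storedMillis = newVal * 1000L;          // what PendingReconstructionBlocks#setTimeout receives
    int readBack = (int) (storedMillis / 1000L); // what getReconstructionPendingTimeout returns
    System.out.println(storedMillis + " ms stored, " + readBack + " s reported");
    // The getter uses integer division, so a value set directly in raw
    // milliseconds (not a whole number of seconds) is rounded down when read
    // back through BlockManager.
  }
}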
@@ -76,6 +76,14 @@ void start() {
timerThread.start();
}

public void setTimeout(long timeoutPeriod) {
this.timeout = timeoutPeriod;
}

public long getTimeout() {
return this.timeout;
}

/**
* Add a block to the list of pending reconstructions
* @param block The corresponding block
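
setTimeout and getTimeout are plain accessors on the timeout field that the pending-reconstruction monitor consults on each scan, which is why a refreshed value takes effect without a restart. The model below is only a sketch of that behaviour; the real monitor is outside this diff, and the PendingEntry type and its fields are invented for the example.

import java.util.ArrayList;
import java.util.List;

public class TimeoutScanSketch {
  // Refreshed at runtime, e.g. via BlockManager#setReconstructionPendingTimeout.
  private volatile long timeout = 300 * 1000L;

  static class PendingEntry {                 // invented stand-in for a pending block
    final long enqueuedAtMillis;
    PendingEntry(long enqueuedAtMillis) { this.enqueuedAtMillis = enqueuedAtMillis; }
  }

  // Each pass reads the current timeout, so lowering it makes older entries
  // time out (and become eligible for re-scheduling) on the very next scan.
  List<PendingEntry> findTimedOut(List<PendingEntry> pending, long nowMillis) {
    List<PendingEntry> timedOut = new ArrayList<>();
    for (PendingEntry e : pending) {
      if (nowMillis - e.enqueuedAtMillis > timeout) {
        timedOut.add(e);
      }
    }
    return timedOut;
  }
}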
@@ -198,6 +198,8 @@
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_BLOCKPLACEMENTPOLICY_EXCLUDE_SLOW_NODES_ENABLED_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_MAX_SLOWPEER_COLLECT_NODES_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_MAX_SLOWPEER_COLLECT_NODES_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_DEFAULT;

import static org.apache.hadoop.util.ExitUtil.terminate;
import static org.apache.hadoop.util.ToolRunner.confirmPrompt;
@@ -342,7 +344,8 @@ public enum OperationCategory {
DFS_IMAGE_PARALLEL_LOAD_KEY,
DFS_NAMENODE_AVOID_SLOW_DATANODE_FOR_READ_KEY,
DFS_NAMENODE_BLOCKPLACEMENTPOLICY_EXCLUDE_SLOW_NODES_ENABLED_KEY,
DFS_NAMENODE_MAX_SLOWPEER_COLLECT_NODES_KEY));
DFS_NAMENODE_MAX_SLOWPEER_COLLECT_NODES_KEY,
DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_KEY));

private static final String USAGE = "Usage: hdfs namenode ["
+ StartupOption.BACKUP.getName() + "] | \n\t["
@@ -2245,7 +2248,8 @@ protected String reconfigurePropertyImpl(String property, String newVal)
} else if (property.equals(DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY)
|| property.equals(DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_KEY)
|| property.equals(
DFS_NAMENODE_REPLICATION_WORK_MULTIPLIER_PER_ITERATION)) {
DFS_NAMENODE_REPLICATION_WORK_MULTIPLIER_PER_ITERATION)
|| property.equals(DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_KEY)) {
return reconfReplicationParameters(newVal, property);
} else if (property.equals(DFS_BLOCK_REPLICATOR_CLASSNAME_KEY) || property
.equals(DFS_BLOCK_PLACEMENT_EC_CLASSNAME_KEY)) {
@@ -2287,6 +2291,14 @@ private String reconfReplicationParameters(final String newVal,
DFS_NAMENODE_REPLICATION_WORK_MULTIPLIER_PER_ITERATION_DEFAULT,
newVal));
newSetting = bm.getBlocksReplWorkMultiplier();
} else if (
property.equals(
DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_KEY)) {
bm.setReconstructionPendingTimeout(
adjustNewVal(
DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_DEFAULT,
newVal));
newSetting = bm.getReconstructionPendingTimeout();
} else {
throw new IllegalArgumentException("Unexpected property " +
property + " in reconfReplicationParameters");
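
adjustNewVal is an existing NameNode helper reused here and is not part of this diff; a plausible shape, stated as an assumption, is that it restores the compiled-in default when the property is being unset and otherwise parses the supplied value, with validation then happening in the BlockManager setter:

// Assumed shape of the existing adjustNewVal helper (not shown in this diff).
private int adjustNewVal(int defaultVal, String newVal) {
  // Unsetting the property through reconfiguration falls back to the default;
  // otherwise the new value is parsed and validated by the BlockManager setter.
  return newVal == null ? defaultVal : Integer.parseInt(newVal);
}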
@@ -49,6 +49,9 @@ public void setup() throws IOException {
config.setInt(
DFSConfigKeys.DFS_NAMENODE_REPLICATION_WORK_MULTIPLIER_PER_ITERATION,
12);
config.setInt(
DFSConfigKeys.DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_KEY,
300);

cluster = new MiniDFSCluster.Builder(config)
.nnTopology(MiniDFSNNTopology.simpleSingleNN(0, 0))
@@ -72,6 +75,7 @@ public void testParamsCanBeReconfigured() throws ReconfigurationException {
assertEquals(8, bm.getMaxReplicationStreams());
assertEquals(10, bm.getReplicationStreamsHardLimit());
assertEquals(12, bm.getBlocksReplWorkMultiplier());
assertEquals(300, bm.getReconstructionPendingTimeout());

cluster.getNameNode().reconfigurePropertyImpl(
DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY, "20");
@@ -81,10 +85,14 @@ public void testParamsCanBeReconfigured() throws ReconfigurationException {
cluster.getNameNode().reconfigurePropertyImpl(
DFSConfigKeys.DFS_NAMENODE_REPLICATION_WORK_MULTIPLIER_PER_ITERATION,
"24");
cluster.getNameNode().reconfigurePropertyImpl(
DFSConfigKeys.DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_KEY,
"180");

assertEquals(20, bm.getMaxReplicationStreams());
assertEquals(22, bm.getReplicationStreamsHardLimit());
assertEquals(24, bm.getBlocksReplWorkMultiplier());
assertEquals(180, bm.getReconstructionPendingTimeout());
}

/**
@@ -96,7 +104,8 @@ public void testReconfigureFailsWithInvalidValues() throws Exception {
String[] keys = new String[]{
DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY,
DFSConfigKeys.DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_KEY,
DFSConfigKeys.DFS_NAMENODE_REPLICATION_WORK_MULTIPLIER_PER_ITERATION
DFSConfigKeys.DFS_NAMENODE_REPLICATION_WORK_MULTIPLIER_PER_ITERATION,
DFSConfigKeys.DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_KEY
};

// Ensure we cannot set any of the parameters negative
@@ -112,6 +121,7 @@ public void testReconfigureFailsWithInvalidValues() throws Exception {
assertEquals(8, bm.getMaxReplicationStreams());
assertEquals(10, bm.getReplicationStreamsHardLimit());
assertEquals(12, bm.getBlocksReplWorkMultiplier());
assertEquals(300, bm.getReconstructionPendingTimeout());

for (String key : keys) {
ReconfigurationException e =
@@ -126,6 +136,7 @@ public void testReconfigureFailsWithInvalidValues() throws Exception {
assertEquals(8, bm.getMaxReplicationStreams());
assertEquals(10, bm.getReplicationStreamsHardLimit());
assertEquals(12, bm.getBlocksReplWorkMultiplier());
assertEquals(300, bm.getReconstructionPendingTimeout());

// Ensure none of the parameters can be set to a string value
for (String key : keys) {
@@ -139,5 +150,6 @@ public void testReconfigureFailsWithInvalidValues() throws Exception {
assertEquals(8, bm.getMaxReplicationStreams());
assertEquals(10, bm.getReplicationStreamsHardLimit());
assertEquals(12, bm.getBlocksReplWorkMultiplier());
assertEquals(300, bm.getReconstructionPendingTimeout());
}
}
}
@@ -19,14 +19,11 @@

import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_PEER_STATS_ENABLED_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_IMAGE_PARALLEL_LOAD_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_PLACEMENT_EC_CLASSNAME_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_AVOID_SLOW_DATANODE_FOR_READ_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_BLOCKPLACEMENTPOLICY_EXCLUDE_SLOW_NODES_ENABLED_KEY;

import java.util.function.Supplier;
import org.apache.hadoop.thirdparty.com.google.common.collect.Lists;
@@ -90,7 +87,6 @@
import java.util.Scanner;
import java.util.concurrent.TimeoutException;

import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_MAX_SLOWPEER_COLLECT_NODES_KEY;
import static org.hamcrest.CoreMatchers.allOf;
import static org.hamcrest.CoreMatchers.anyOf;
import static org.hamcrest.CoreMatchers.is;
@@ -482,15 +478,12 @@ public void testNameNodeGetReconfigurableProperties() throws IOException {
final List<String> outs = Lists.newArrayList();
final List<String> errs = Lists.newArrayList();
getReconfigurableProperties("namenode", address, outs, errs);
assertEquals(16, outs.size());
assertEquals(17, outs.size());
assertTrue(outs.get(0).contains("Reconfigurable properties:"));
assertEquals(DFS_BLOCK_PLACEMENT_EC_CLASSNAME_KEY, outs.get(1));
assertEquals(DFS_BLOCK_REPLICATOR_CLASSNAME_KEY, outs.get(2));
assertEquals(DFS_HEARTBEAT_INTERVAL_KEY, outs.get(3));
assertEquals(DFS_IMAGE_PARALLEL_LOAD_KEY, outs.get(4));
assertEquals(DFS_NAMENODE_AVOID_SLOW_DATANODE_FOR_READ_KEY, outs.get(5));
assertEquals(DFS_NAMENODE_BLOCKPLACEMENTPOLICY_EXCLUDE_SLOW_NODES_ENABLED_KEY, outs.get(6));
assertEquals(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, outs.get(7));
assertEquals(DFS_NAMENODE_MAX_SLOWPEER_COLLECT_NODES_KEY, outs.get(8));
System.out.println(outs);
assertEquals(errs.size(), 0);
}

@@ -1114,4 +1107,51 @@ public Integer run() throws Exception {
}
});
}
}

@Test
public void testAllDatanodesReconfig()
throws IOException, InterruptedException, TimeoutException {
ReconfigurationUtil reconfigurationUtil = mock(ReconfigurationUtil.class);
cluster.getDataNodes().get(0).setReconfigurationUtil(reconfigurationUtil);
cluster.getDataNodes().get(1).setReconfigurationUtil(reconfigurationUtil);

List<ReconfigurationUtil.PropertyChange> changes = new ArrayList<>();
changes.add(new ReconfigurationUtil.PropertyChange(
DFS_DATANODE_PEER_STATS_ENABLED_KEY, "true",
datanode.getConf().get(DFS_DATANODE_PEER_STATS_ENABLED_KEY)));
when(reconfigurationUtil.parseChangedProperties(any(Configuration.class),
any(Configuration.class))).thenReturn(changes);

assertEquals(0, admin.startReconfiguration("datanode", "livenodes"));
final List<String> outsForStartReconf = new ArrayList<>();
final List<String> errsForStartReconf = new ArrayList<>();
reconfigurationOutErrFormatter("startReconfiguration", "datanode",
"livenodes", outsForStartReconf, errsForStartReconf);
assertEquals(3, outsForStartReconf.size());
assertEquals(0, errsForStartReconf.size());
assertTrue(outsForStartReconf.get(0).startsWith("Started reconfiguration task on node"));
assertTrue(outsForStartReconf.get(1).startsWith("Started reconfiguration task on node"));
assertEquals("Starting of reconfiguration task successful on 2 nodes, failed on 0 nodes.",
outsForStartReconf.get(2));

Thread.sleep(1000);
final List<String> outs = new ArrayList<>();
final List<String> errs = new ArrayList<>();
awaitReconfigurationFinished("datanode", "livenodes", outs, errs);
assertEquals(9, outs.size());
assertEquals(0, errs.size());
LOG.info("dfsadmin -status -livenodes output:");
outs.forEach(s -> LOG.info("{}", s));
assertTrue(outs.get(0).startsWith("Reconfiguring status for node"));
assertTrue("SUCCESS: Changed property dfs.datanode.peer.stats.enabled".equals(outs.get(2))
|| "SUCCESS: Changed property dfs.datanode.peer.stats.enabled".equals(outs.get(1)));
assertTrue("\tFrom: \"false\"".equals(outs.get(3)) || "\tFrom: \"false\"".equals(outs.get(2)));
assertTrue("\tTo: \"true\"".equals(outs.get(4)) || "\tTo: \"true\"".equals(outs.get(3)));
assertEquals("SUCCESS: Changed property dfs.datanode.peer.stats.enabled", outs.get(5));
assertEquals("\tFrom: \"false\"", outs.get(6));
assertEquals("\tTo: \"true\"", outs.get(7));
assertEquals("Retrieval of reconfiguration status successful on 2 nodes, failed on 0 nodes.",
outs.get(8));
}

}
