From f6266faa9e78f9a1c98713fd7d6e4ca1e2cbb2f7 Mon Sep 17 00:00:00 2001 From: caozhiqiang Date: Mon, 12 Sep 2022 10:45:01 +0800 Subject: [PATCH] HDFS-16663. Allow block reconstruction pending timeout to be refreshable (#4567) Reviewed-by: Hiroyuki Adachi Signed-off-by: Takanobu Asanuma --- .../server/blockmanagement/BlockManager.java | 20 ++++++ .../PendingReconstructionBlocks.java | 8 +++ .../hadoop/hdfs/server/namenode/NameNode.java | 16 ++++- .../TestRefreshNamenodeReplicationConfig.java | 16 ++++- .../hadoop/hdfs/tools/TestDFSAdmin.java | 64 +++++++++++++++---- 5 files changed, 108 insertions(+), 16 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java index 98adedded07e91..57a2acaae1c5a1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java @@ -1027,6 +1027,26 @@ public void setBlocksReplWorkMultiplier(int newVal) { blocksReplWorkMultiplier = newVal; } + /** + * Updates the value used for pendingReconstruction timeout, which is set by + * {@code DFSConfigKeys. + * DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_KEY} initially. + * + * @param newVal - Must be a positive non-zero integer. + */ + public void setReconstructionPendingTimeout(int newVal) { + ensurePositiveInt(newVal, + DFSConfigKeys.DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_KEY); + pendingReconstruction.setTimeout(newVal * 1000L); + } + + /** Returns the current setting for pendingReconstruction timeout, set by + * {@code DFSConfigKeys.DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_KEY}. + */ + public int getReconstructionPendingTimeout() { + return (int)(pendingReconstruction.getTimeout() / 1000L); + } + public int getDefaultStorageNum(BlockInfo block) { switch (block.getBlockType()) { case STRIPED: return ((BlockInfoStriped) block).getRealTotalBlockNum(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingReconstructionBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingReconstructionBlocks.java index 81495ebaf249a0..6a29199ef6494a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingReconstructionBlocks.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingReconstructionBlocks.java @@ -76,6 +76,14 @@ void start() { timerThread.start(); } + public void setTimeout(long timeoutPeriod) { + this.timeout = timeoutPeriod; + } + + public long getTimeout() { + return this.timeout; + } + /** * Add a block to the list of pending reconstructions * @param block The corresponding block diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java index 64569927684410..bebddbc9b0c029 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java @@ -198,6 +198,8 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_BLOCKPLACEMENTPOLICY_EXCLUDE_SLOW_NODES_ENABLED_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_MAX_SLOWPEER_COLLECT_NODES_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_MAX_SLOWPEER_COLLECT_NODES_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_DEFAULT; import static org.apache.hadoop.util.ExitUtil.terminate; import static org.apache.hadoop.util.ToolRunner.confirmPrompt; @@ -342,7 +344,8 @@ public enum OperationCategory { DFS_IMAGE_PARALLEL_LOAD_KEY, DFS_NAMENODE_AVOID_SLOW_DATANODE_FOR_READ_KEY, DFS_NAMENODE_BLOCKPLACEMENTPOLICY_EXCLUDE_SLOW_NODES_ENABLED_KEY, - DFS_NAMENODE_MAX_SLOWPEER_COLLECT_NODES_KEY)); + DFS_NAMENODE_MAX_SLOWPEER_COLLECT_NODES_KEY, + DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_KEY)); private static final String USAGE = "Usage: hdfs namenode [" + StartupOption.BACKUP.getName() + "] | \n\t[" @@ -2245,7 +2248,8 @@ protected String reconfigurePropertyImpl(String property, String newVal) } else if (property.equals(DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY) || property.equals(DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_KEY) || property.equals( - DFS_NAMENODE_REPLICATION_WORK_MULTIPLIER_PER_ITERATION)) { + DFS_NAMENODE_REPLICATION_WORK_MULTIPLIER_PER_ITERATION) + || property.equals(DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_KEY)) { return reconfReplicationParameters(newVal, property); } else if (property.equals(DFS_BLOCK_REPLICATOR_CLASSNAME_KEY) || property .equals(DFS_BLOCK_PLACEMENT_EC_CLASSNAME_KEY)) { @@ -2287,6 +2291,14 @@ private String reconfReplicationParameters(final String newVal, DFS_NAMENODE_REPLICATION_WORK_MULTIPLIER_PER_ITERATION_DEFAULT, newVal)); newSetting = bm.getBlocksReplWorkMultiplier(); + } else if ( + property.equals( + DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_KEY)) { + bm.setReconstructionPendingTimeout( + adjustNewVal( + DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_DEFAULT, + newVal)); + newSetting = bm.getReconstructionPendingTimeout(); } else { throw new IllegalArgumentException("Unexpected property " + property + " in reconfReplicationParameters"); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestRefreshNamenodeReplicationConfig.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestRefreshNamenodeReplicationConfig.java index 8dc81f8c1a21d3..8336a432b9a41e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestRefreshNamenodeReplicationConfig.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestRefreshNamenodeReplicationConfig.java @@ -49,6 +49,9 @@ public void setup() throws IOException { config.setInt( DFSConfigKeys.DFS_NAMENODE_REPLICATION_WORK_MULTIPLIER_PER_ITERATION, 12); + config.setInt( + DFSConfigKeys.DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_KEY, + 300); cluster = new MiniDFSCluster.Builder(config) .nnTopology(MiniDFSNNTopology.simpleSingleNN(0, 0)) @@ -72,6 +75,7 @@ public void testParamsCanBeReconfigured() throws ReconfigurationException { assertEquals(8, bm.getMaxReplicationStreams()); assertEquals(10, bm.getReplicationStreamsHardLimit()); assertEquals(12, bm.getBlocksReplWorkMultiplier()); + assertEquals(300, bm.getReconstructionPendingTimeout()); cluster.getNameNode().reconfigurePropertyImpl( DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY, "20"); @@ -81,10 +85,14 @@ public void testParamsCanBeReconfigured() throws ReconfigurationException { cluster.getNameNode().reconfigurePropertyImpl( DFSConfigKeys.DFS_NAMENODE_REPLICATION_WORK_MULTIPLIER_PER_ITERATION, "24"); + cluster.getNameNode().reconfigurePropertyImpl( + DFSConfigKeys.DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_KEY, + "180"); assertEquals(20, bm.getMaxReplicationStreams()); assertEquals(22, bm.getReplicationStreamsHardLimit()); assertEquals(24, bm.getBlocksReplWorkMultiplier()); + assertEquals(180, bm.getReconstructionPendingTimeout()); } /** @@ -96,7 +104,8 @@ public void testReconfigureFailsWithInvalidValues() throws Exception { String[] keys = new String[]{ DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY, DFSConfigKeys.DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_KEY, - DFSConfigKeys.DFS_NAMENODE_REPLICATION_WORK_MULTIPLIER_PER_ITERATION + DFSConfigKeys.DFS_NAMENODE_REPLICATION_WORK_MULTIPLIER_PER_ITERATION, + DFSConfigKeys.DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_KEY }; // Ensure we cannot set any of the parameters negative @@ -112,6 +121,7 @@ public void testReconfigureFailsWithInvalidValues() throws Exception { assertEquals(8, bm.getMaxReplicationStreams()); assertEquals(10, bm.getReplicationStreamsHardLimit()); assertEquals(12, bm.getBlocksReplWorkMultiplier()); + assertEquals(300, bm.getReconstructionPendingTimeout()); for (String key : keys) { ReconfigurationException e = @@ -126,6 +136,7 @@ public void testReconfigureFailsWithInvalidValues() throws Exception { assertEquals(8, bm.getMaxReplicationStreams()); assertEquals(10, bm.getReplicationStreamsHardLimit()); assertEquals(12, bm.getBlocksReplWorkMultiplier()); + assertEquals(300, bm.getReconstructionPendingTimeout()); // Ensure none of the parameters can be set to a string value for (String key : keys) { @@ -139,5 +150,6 @@ public void testReconfigureFailsWithInvalidValues() throws Exception { assertEquals(8, bm.getMaxReplicationStreams()); assertEquals(10, bm.getReplicationStreamsHardLimit()); assertEquals(12, bm.getBlocksReplWorkMultiplier()); + assertEquals(300, bm.getReconstructionPendingTimeout()); } -} \ No newline at end of file +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java index 3d22806d352ddd..dbb4c329c07c4f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java @@ -19,14 +19,11 @@ import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_PEER_STATS_ENABLED_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_IMAGE_PARALLEL_LOAD_KEY; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_PLACEMENT_EC_CLASSNAME_KEY; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_AVOID_SLOW_DATANODE_FOR_READ_KEY; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_BLOCKPLACEMENTPOLICY_EXCLUDE_SLOW_NODES_ENABLED_KEY; import java.util.function.Supplier; import org.apache.hadoop.thirdparty.com.google.common.collect.Lists; @@ -90,7 +87,6 @@ import java.util.Scanner; import java.util.concurrent.TimeoutException; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_MAX_SLOWPEER_COLLECT_NODES_KEY; import static org.hamcrest.CoreMatchers.allOf; import static org.hamcrest.CoreMatchers.anyOf; import static org.hamcrest.CoreMatchers.is; @@ -482,15 +478,12 @@ public void testNameNodeGetReconfigurableProperties() throws IOException { final List outs = Lists.newArrayList(); final List errs = Lists.newArrayList(); getReconfigurableProperties("namenode", address, outs, errs); - assertEquals(16, outs.size()); + assertEquals(17, outs.size()); + assertTrue(outs.get(0).contains("Reconfigurable properties:")); assertEquals(DFS_BLOCK_PLACEMENT_EC_CLASSNAME_KEY, outs.get(1)); - assertEquals(DFS_BLOCK_REPLICATOR_CLASSNAME_KEY, outs.get(2)); assertEquals(DFS_HEARTBEAT_INTERVAL_KEY, outs.get(3)); - assertEquals(DFS_IMAGE_PARALLEL_LOAD_KEY, outs.get(4)); assertEquals(DFS_NAMENODE_AVOID_SLOW_DATANODE_FOR_READ_KEY, outs.get(5)); - assertEquals(DFS_NAMENODE_BLOCKPLACEMENTPOLICY_EXCLUDE_SLOW_NODES_ENABLED_KEY, outs.get(6)); - assertEquals(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, outs.get(7)); - assertEquals(DFS_NAMENODE_MAX_SLOWPEER_COLLECT_NODES_KEY, outs.get(8)); + System.out.println(outs); assertEquals(errs.size(), 0); } @@ -1114,4 +1107,51 @@ public Integer run() throws Exception { } }); } -} \ No newline at end of file + + @Test + public void testAllDatanodesReconfig() + throws IOException, InterruptedException, TimeoutException { + ReconfigurationUtil reconfigurationUtil = mock(ReconfigurationUtil.class); + cluster.getDataNodes().get(0).setReconfigurationUtil(reconfigurationUtil); + cluster.getDataNodes().get(1).setReconfigurationUtil(reconfigurationUtil); + + List changes = new ArrayList<>(); + changes.add(new ReconfigurationUtil.PropertyChange( + DFS_DATANODE_PEER_STATS_ENABLED_KEY, "true", + datanode.getConf().get(DFS_DATANODE_PEER_STATS_ENABLED_KEY))); + when(reconfigurationUtil.parseChangedProperties(any(Configuration.class), + any(Configuration.class))).thenReturn(changes); + + assertEquals(0, admin.startReconfiguration("datanode", "livenodes")); + final List outsForStartReconf = new ArrayList<>(); + final List errsForStartReconf = new ArrayList<>(); + reconfigurationOutErrFormatter("startReconfiguration", "datanode", + "livenodes", outsForStartReconf, errsForStartReconf); + assertEquals(3, outsForStartReconf.size()); + assertEquals(0, errsForStartReconf.size()); + assertTrue(outsForStartReconf.get(0).startsWith("Started reconfiguration task on node")); + assertTrue(outsForStartReconf.get(1).startsWith("Started reconfiguration task on node")); + assertEquals("Starting of reconfiguration task successful on 2 nodes, failed on 0 nodes.", + outsForStartReconf.get(2)); + + Thread.sleep(1000); + final List outs = new ArrayList<>(); + final List errs = new ArrayList<>(); + awaitReconfigurationFinished("datanode", "livenodes", outs, errs); + assertEquals(9, outs.size()); + assertEquals(0, errs.size()); + LOG.info("dfsadmin -status -livenodes output:"); + outs.forEach(s -> LOG.info("{}", s)); + assertTrue(outs.get(0).startsWith("Reconfiguring status for node")); + assertTrue("SUCCESS: Changed property dfs.datanode.peer.stats.enabled".equals(outs.get(2)) + || "SUCCESS: Changed property dfs.datanode.peer.stats.enabled".equals(outs.get(1))); + assertTrue("\tFrom: \"false\"".equals(outs.get(3)) || "\tFrom: \"false\"".equals(outs.get(2))); + assertTrue("\tTo: \"true\"".equals(outs.get(4)) || "\tTo: \"true\"".equals(outs.get(3))); + assertEquals("SUCCESS: Changed property dfs.datanode.peer.stats.enabled", outs.get(5)); + assertEquals("\tFrom: \"false\"", outs.get(6)); + assertEquals("\tTo: \"true\"", outs.get(7)); + assertEquals("Retrieval of reconfiguration status successful on 2 nodes, failed on 0 nodes.", + outs.get(8)); + } + +}