
Commit 707364c

tomscut authored and Bogdan Stolojan committed
HDFS-16398. Reconfig block report parameters for datanode (apache#3831)
(cherry picked from commit c2ff390)

Conflicts:
    hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
1 parent 05ddaff commit 707364c

File tree

4 files changed: +130 -79 lines changed

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java

Lines changed: 30 additions & 17 deletions

@@ -107,12 +107,12 @@ public class DNConf {
   final long heartBeatInterval;
   private final long lifelineIntervalMs;
   volatile long blockReportInterval;
-  final long blockReportSplitThreshold;
+  volatile long blockReportSplitThreshold;
   final boolean peerStatsEnabled;
   final boolean diskStatsEnabled;
   final long outliersReportIntervalMs;
   final long ibrInterval;
-  final long initialBlockReportDelayMs;
+  volatile long initialBlockReportDelayMs;
   volatile long cacheReportInterval;
   final long datanodeSlowIoWarningThresholdMs;

@@ -215,19 +215,7 @@ public DNConf(final Configurable dn) {
     this.datanodeSlowIoWarningThresholdMs = getConf().getLong(
         DFSConfigKeys.DFS_DATANODE_SLOW_IO_WARNING_THRESHOLD_KEY,
         DFSConfigKeys.DFS_DATANODE_SLOW_IO_WARNING_THRESHOLD_DEFAULT);
-
-    long initBRDelay = getConf().getTimeDuration(
-        DFS_BLOCKREPORT_INITIAL_DELAY_KEY,
-        DFS_BLOCKREPORT_INITIAL_DELAY_DEFAULT,
-        TimeUnit.SECONDS, TimeUnit.MILLISECONDS);
-    if (initBRDelay >= blockReportInterval) {
-      initBRDelay = 0;
-      DataNode.LOG.info(DFS_BLOCKREPORT_INITIAL_DELAY_KEY + " is "
-          + "greater than or equal to" + DFS_BLOCKREPORT_INTERVAL_MSEC_KEY
-          + ". Setting initial delay to 0 msec:");
-    }
-    initialBlockReportDelayMs = initBRDelay;
-
+    initBlockReportDelay();
     heartBeatInterval = getConf().getTimeDuration(DFS_HEARTBEAT_INTERVAL_KEY,
         DFS_HEARTBEAT_INTERVAL_DEFAULT, TimeUnit.SECONDS,
         TimeUnit.MILLISECONDS);

@@ -311,6 +299,19 @@ public DNConf(final Configurable dn) {
     );
   }

+  private void initBlockReportDelay() {
+    long initBRDelay = getConf().getTimeDuration(
+        DFS_BLOCKREPORT_INITIAL_DELAY_KEY,
+        DFS_BLOCKREPORT_INITIAL_DELAY_DEFAULT, TimeUnit.SECONDS, TimeUnit.MILLISECONDS);
+    if (initBRDelay >= blockReportInterval || initBRDelay < 0) {
+      initBRDelay = 0;
+      DataNode.LOG.info(DFS_BLOCKREPORT_INITIAL_DELAY_KEY +
+          " is greater than or equal to " + DFS_BLOCKREPORT_INTERVAL_MSEC_KEY +
+          ". Setting initial delay to 0 msec.");
+    }
+    initialBlockReportDelayMs = initBRDelay;
+  }
+
   // We get minimumNameNodeVersion via a method so it can be mocked out in tests.
   String getMinimumNameNodeVersion() {
     return this.minimumNameNodeVersion;

@@ -477,7 +478,8 @@ public long getProcessCommandsThresholdMs() {
   }

   void setBlockReportInterval(long intervalMs) {
-    Preconditions.checkArgument(intervalMs > 0);
+    Preconditions.checkArgument(intervalMs > 0,
+        DFS_BLOCKREPORT_INTERVAL_MSEC_KEY + " should be larger than 0");
     blockReportInterval = intervalMs;
   }

@@ -487,11 +489,22 @@ public long getBlockReportInterval() {

   void setCacheReportInterval(long intervalMs) {
     Preconditions.checkArgument(intervalMs > 0,
-        "dfs.cachereport.intervalMsec should be larger than 0");
+        DFS_CACHEREPORT_INTERVAL_MSEC_KEY + " should be larger than 0");
     cacheReportInterval = intervalMs;
   }

   public long getCacheReportInterval() {
     return cacheReportInterval;
   }
+
+  void setBlockReportSplitThreshold(long threshold) {
+    Preconditions.checkArgument(threshold >= 0,
+        DFS_BLOCKREPORT_SPLIT_THRESHOLD_KEY + " should be larger than or equal to 0");
+    blockReportSplitThreshold = threshold;
+  }
+
+  void setInitBRDelayMs(String delayMs) {
+    dn.getConf().set(DFS_BLOCKREPORT_INITIAL_DELAY_KEY, delayMs);
+    initBlockReportDelay();
+  }
 }
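A note on units in the new initBlockReportDelay() above: dfs.blockreport.initialDelay is read through getTimeDuration with SECONDS as the default unit and MILLISECONDS as the return unit, so a bare value such as "123" becomes 123000 ms, and anything negative or not smaller than dfs.blockreport.intervalMsec is clamped to 0. A minimal sketch of that conversion, assuming a plain Configuration object stands in for the DataNode's configuration:

// Illustrative sketch only; a plain Configuration stands in for dn.getConf().
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;

public class InitialDelayUnitsSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // No unit suffix, so the value is interpreted in the default unit (seconds).
    conf.set("dfs.blockreport.initialDelay", "123");
    long delayMs = conf.getTimeDuration("dfs.blockreport.initialDelay",
        0, TimeUnit.SECONDS, TimeUnit.MILLISECONDS);
    // Prints 123000, which is what the updated test below asserts for
    // initialBlockReportDelayMs after reconfiguring the key to "123".
    System.out.println(delayMs);
  }
}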

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java

Lines changed: 53 additions & 33 deletions

@@ -18,10 +18,14 @@
 package org.apache.hadoop.hdfs.server.datanode;


+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INITIAL_DELAY_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INITIAL_DELAY_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CACHEREPORT_INTERVAL_MSEC_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CACHEREPORT_INTERVAL_MSEC_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_SPLIT_THRESHOLD_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_SPLIT_THRESHOLD_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_ADDRESS_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_ADDRESS_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;

@@ -312,6 +316,8 @@ public class DataNode extends ReconfigurableBase
               DFS_DATANODE_DATA_DIR_KEY,
               DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY,
               DFS_BLOCKREPORT_INTERVAL_MSEC_KEY,
+              DFS_BLOCKREPORT_SPLIT_THRESHOLD_KEY,
+              DFS_BLOCKREPORT_INITIAL_DELAY_KEY,
               DFS_DATANODE_MAX_RECEIVER_THREADS_KEY,
               DFS_CACHEREPORT_INTERVAL_MSEC_KEY));

@@ -619,39 +625,10 @@ public String reconfigurePropertyImpl(String property, String newVal)
       }
       break;
     }
-    case DFS_BLOCKREPORT_INTERVAL_MSEC_KEY: {
-      ReconfigurationException rootException = null;
-      try {
-        LOG.info("Reconfiguring {} to {}", property, newVal);
-        long intervalMs;
-        if (newVal == null) {
-          // Set to default.
-          intervalMs = DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT;
-        } else {
-          intervalMs = Long.parseLong(newVal);
-        }
-        dnConf.setBlockReportInterval(intervalMs);
-        for (BPOfferService bpos : blockPoolManager.getAllNamenodeThreads()) {
-          if (bpos != null) {
-            for (BPServiceActor actor : bpos.getBPServiceActors()) {
-              actor.getScheduler().setBlockReportIntervalMs(intervalMs);
-            }
-          }
-        }
-        return Long.toString(intervalMs);
-      } catch (IllegalArgumentException e) {
-        rootException = new ReconfigurationException(
-            property, newVal, getConf().get(property), e);
-      } finally {
-        if (rootException != null) {
-          LOG.warn(String.format(
-              "Exception in updating block report interval %s to %s",
-              property, newVal), rootException);
-          throw rootException;
-        }
-      }
-      break;
-    }
+    case DFS_BLOCKREPORT_INTERVAL_MSEC_KEY:
+    case DFS_BLOCKREPORT_SPLIT_THRESHOLD_KEY:
+    case DFS_BLOCKREPORT_INITIAL_DELAY_KEY:
+      return reconfBlockReportParameters(property, newVal);
     case DFS_DATANODE_MAX_RECEIVER_THREADS_KEY:
       return reconfDataXceiverParameters(property, newVal);
     case DFS_CACHEREPORT_INTERVAL_MSEC_KEY:

@@ -697,6 +674,44 @@ private String reconfCacheReportParameters(String property, String newVal)
     }
   }

+  private String reconfBlockReportParameters(String property, String newVal)
+      throws ReconfigurationException {
+    String result = null;
+    try {
+      LOG.info("Reconfiguring {} to {}", property, newVal);
+      if (property.equals(DFS_BLOCKREPORT_INTERVAL_MSEC_KEY)) {
+        Preconditions.checkNotNull(dnConf, "DNConf has not been initialized.");
+        long intervalMs = newVal == null ? DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT :
+            Long.parseLong(newVal);
+        result = Long.toString(intervalMs);
+        dnConf.setBlockReportInterval(intervalMs);
+        for (BPOfferService bpos : blockPoolManager.getAllNamenodeThreads()) {
+          if (bpos != null) {
+            for (BPServiceActor actor : bpos.getBPServiceActors()) {
+              actor.getScheduler().setBlockReportIntervalMs(intervalMs);
+            }
+          }
+        }
+      } else if (property.equals(DFS_BLOCKREPORT_SPLIT_THRESHOLD_KEY)) {
+        Preconditions.checkNotNull(dnConf, "DNConf has not been initialized.");
+        long threshold = newVal == null ? DFS_BLOCKREPORT_SPLIT_THRESHOLD_DEFAULT :
+            Long.parseLong(newVal);
+        result = Long.toString(threshold);
+        dnConf.setBlockReportSplitThreshold(threshold);
+      } else if (property.equals(DFS_BLOCKREPORT_INITIAL_DELAY_KEY)) {
+        Preconditions.checkNotNull(dnConf, "DNConf has not been initialized.");
+        int initialDelay = newVal == null ? DFS_BLOCKREPORT_INITIAL_DELAY_DEFAULT :
+            Integer.parseInt(newVal);
+        result = Integer.toString(initialDelay);
+        dnConf.setInitBRDelayMs(result);
+      }
+      LOG.info("RECONFIGURE* changed {} to {}", property, newVal);
+      return result;
+    } catch (IllegalArgumentException e) {
+      throw new ReconfigurationException(property, newVal, getConf().get(property), e);
+    }
+  }
+
   /**
    * Get a list of the keys of the re-configurable properties in configuration.
    */

@@ -3823,4 +3838,9 @@ private static boolean isWrite(BlockConstructionStage stage) {
     return (stage == PIPELINE_SETUP_STREAMING_RECOVERY
         || stage == PIPELINE_SETUP_APPEND_RECOVERY);
   }
+
+  @VisibleForTesting
+  public BlockPoolManager getBlockPoolManager() {
+    return blockPoolManager;
+  }
 }
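The new switch cases above are reached through the public reconfigureProperty entry point inherited from ReconfigurableBase, which is exactly what the updated test exercises. A rough sketch of driving the three keys at runtime, assuming the DataNode reference is obtained elsewhere (for example from MiniDFSCluster#getDataNodes in a test) and that the chosen values are placeholders only:

// Illustrative sketch only: mirrors the calls made in TestDataNodeReconfiguration.
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INITIAL_DELAY_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_SPLIT_THRESHOLD_KEY;

import org.apache.hadoop.conf.ReconfigurationException;
import org.apache.hadoop.hdfs.server.datanode.DataNode;

public final class BlockReportReconfigSketch {
  static void reconfigure(DataNode dn) throws ReconfigurationException {
    // Full block report every 5 minutes; this key takes milliseconds.
    dn.reconfigureProperty(DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, String.valueOf(300 * 1000));
    // Block-count threshold above which reports are split per storage directory.
    dn.reconfigureProperty(DFS_BLOCKREPORT_SPLIT_THRESHOLD_KEY, String.valueOf(1000000));
    // Initial delay in seconds; out-of-range values are clamped to 0 in DNConf.
    dn.reconfigureProperty(DFS_BLOCKREPORT_INITIAL_DELAY_KEY, "120");
    // Passing null reverts a key to its default, as the test also verifies.
    dn.reconfigureProperty(DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, null);
  }
}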

hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeReconfiguration.java

Lines changed: 46 additions & 28 deletions

@@ -18,8 +18,10 @@

 package org.apache.hadoop.hdfs.server.datanode;

+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INITIAL_DELAY_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_SPLIT_THRESHOLD_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CACHEREPORT_INTERVAL_MSEC_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CACHEREPORT_INTERVAL_MSEC_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY;

@@ -303,40 +305,49 @@ private void testAcquireOnMaxConcurrentMoversReconfiguration(

   @Test
   public void testBlockReportIntervalReconfiguration()
-      throws ReconfigurationException, IOException {
+      throws ReconfigurationException {
     int blockReportInterval = 300 * 1000;
+    String[] blockReportParameters = {
+        DFS_BLOCKREPORT_INTERVAL_MSEC_KEY,
+        DFS_BLOCKREPORT_SPLIT_THRESHOLD_KEY,
+        DFS_BLOCKREPORT_INITIAL_DELAY_KEY};
+
     for (int i = 0; i < NUM_DATA_NODE; i++) {
       DataNode dn = cluster.getDataNodes().get(i);
+      BlockPoolManager blockPoolManager = dn.getBlockPoolManager();

       // Try invalid values.
+      for (String blockReportParameter : blockReportParameters) {
+        try {
+          dn.reconfigureProperty(blockReportParameter, "text");
+          fail("ReconfigurationException expected");
+        } catch (ReconfigurationException expected) {
+          assertTrue("expecting NumberFormatException",
+              expected.getCause() instanceof NumberFormatException);
+        }
+      }
+
       try {
-        dn.reconfigureProperty(
-            DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, "text");
+        dn.reconfigureProperty(DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, String.valueOf(-1));
         fail("ReconfigurationException expected");
       } catch (ReconfigurationException expected) {
-        assertTrue("expecting NumberFormatException",
-            expected.getCause() instanceof NumberFormatException);
+        assertTrue("expecting IllegalArgumentException",
+            expected.getCause() instanceof IllegalArgumentException);
       }
       try {
-        dn.reconfigureProperty(
-            DFS_BLOCKREPORT_INTERVAL_MSEC_KEY,
-            String.valueOf(-1));
+        dn.reconfigureProperty(DFS_BLOCKREPORT_SPLIT_THRESHOLD_KEY, String.valueOf(-1));
         fail("ReconfigurationException expected");
       } catch (ReconfigurationException expected) {
         assertTrue("expecting IllegalArgumentException",
             expected.getCause() instanceof IllegalArgumentException);
       }
+      dn.reconfigureProperty(DFS_BLOCKREPORT_INITIAL_DELAY_KEY, String.valueOf(-1));
+      assertEquals(0, dn.getDnConf().initialBlockReportDelayMs);

-      // Change properties.
+      // Change properties and verify the change.
       dn.reconfigureProperty(DFS_BLOCKREPORT_INTERVAL_MSEC_KEY,
           String.valueOf(blockReportInterval));
-
-      // Verify change.
-      assertEquals(String.format("%s has wrong value",
-          DFS_BLOCKREPORT_INTERVAL_MSEC_KEY),
-          blockReportInterval,
-          dn.getDnConf().getBlockReportInterval());
-      for (BPOfferService bpos : dn.getAllBpOs()) {
+      for (BPOfferService bpos : blockPoolManager.getAllNamenodeThreads()) {
         if (bpos != null) {
           for (BPServiceActor actor : bpos.getBPServiceActors()) {
             assertEquals(String.format("%s has wrong value",

@@ -347,15 +358,15 @@ public void testBlockReportIntervalReconfiguration()
         }
       }

-      // Revert to default.
-      dn.reconfigureProperty(DFS_BLOCKREPORT_INTERVAL_MSEC_KEY,
-          null);
-      assertEquals(String.format("%s has wrong value",
-          DFS_BLOCKREPORT_INTERVAL_MSEC_KEY),
-          DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT,
-          dn.getDnConf().getBlockReportInterval());
-      // Verify default.
-      for (BPOfferService bpos : dn.getAllBpOs()) {
+      dn.reconfigureProperty(DFS_BLOCKREPORT_SPLIT_THRESHOLD_KEY, String.valueOf(123));
+      assertEquals(123, dn.getDnConf().blockReportSplitThreshold);
+
+      dn.reconfigureProperty(DFS_BLOCKREPORT_INITIAL_DELAY_KEY, "123");
+      assertEquals(123000, dn.getDnConf().initialBlockReportDelayMs);
+
+      // Revert to default and verify default.
+      dn.reconfigureProperty(DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, null);
+      for (BPOfferService bpos : blockPoolManager.getAllNamenodeThreads()) {
         if (bpos != null) {
           for (BPServiceActor actor : bpos.getBPServiceActors()) {
             assertEquals(String.format("%s has wrong value",

@@ -365,9 +376,16 @@ public void testBlockReportIntervalReconfiguration()
           }
         }
       }
-      assertEquals(String.format("expect %s is not configured",
-          DFS_BLOCKREPORT_INTERVAL_MSEC_KEY), null, dn
-          .getConf().get(DFS_BLOCKREPORT_INTERVAL_MSEC_KEY));
+      assertNull(String.format("expect %s is not configured", DFS_BLOCKREPORT_INTERVAL_MSEC_KEY),
+          dn.getConf().get(DFS_BLOCKREPORT_INTERVAL_MSEC_KEY));
+
+      dn.reconfigureProperty(DFS_BLOCKREPORT_SPLIT_THRESHOLD_KEY, null);
+      assertNull(String.format("expect %s is not configured", DFS_BLOCKREPORT_SPLIT_THRESHOLD_KEY),
+          dn.getConf().get(DFS_BLOCKREPORT_SPLIT_THRESHOLD_KEY));
+
+      dn.reconfigureProperty(DFS_BLOCKREPORT_INITIAL_DELAY_KEY, null);
+      assertNull(String.format("expect %s is not configured", DFS_BLOCKREPORT_INITIAL_DELAY_KEY),
+          dn.getConf().get(DFS_BLOCKREPORT_INITIAL_DELAY_KEY));
     }
   }

hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java

Lines changed: 1 addition & 1 deletion

@@ -330,7 +330,7 @@ public void testDataNodeGetReconfigurableProperties() throws IOException {
     final List<String> outs = Lists.newArrayList();
     final List<String> errs = Lists.newArrayList();
     getReconfigurableProperties("datanode", address, outs, errs);
-    assertEquals(6, outs.size());
+    assertEquals(8, outs.size());
     assertEquals(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, outs.get(1));
   }

0 commit comments