|
30 | 30 | import java.util.BitSet; |
31 | 31 | import java.util.Collection; |
32 | 32 | import java.util.Collections; |
| 33 | +import java.util.Comparator; |
33 | 34 | import java.util.EnumSet; |
34 | 35 | import java.util.HashMap; |
35 | 36 | import java.util.HashSet; |
|
50 | 51 | import java.util.concurrent.ConcurrentLinkedQueue; |
51 | 52 |
|
52 | 53 | import java.util.concurrent.atomic.AtomicLong; |
| 54 | +import java.util.stream.Collectors; |
53 | 55 | import javax.management.ObjectName; |
54 | 56 |
|
55 | 57 | import org.apache.hadoop.HadoopIllegalArgumentException; |
|
86 | 88 | import org.apache.hadoop.hdfs.server.blockmanagement.NumberReplicas.StoredReplicaState; |
87 | 89 | import org.apache.hadoop.hdfs.server.blockmanagement.PendingDataNodeMessages.ReportedBlockInfo; |
88 | 90 | import org.apache.hadoop.hdfs.server.blockmanagement.PendingReconstructionBlocks.PendingBlockInfo; |
| 91 | +import org.apache.hadoop.hdfs.server.blockmanagement.ExcessRedundancyMap.ExcessBlockInfo; |
89 | 92 | import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState; |
90 | 93 | import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState; |
91 | 94 | import org.apache.hadoop.hdfs.server.namenode.CachedBlock; |
|
116 | 119 |
|
117 | 120 | import static org.apache.hadoop.hdfs.util.StripedBlockUtil.getInternalBlockLength; |
118 | 121 |
|
| 122 | +import org.apache.hadoop.hdfs.util.LightWeightHashSet; |
119 | 123 | import org.apache.hadoop.metrics2.util.MBeans; |
120 | 124 | import org.apache.hadoop.net.Node; |
121 | 125 | import org.apache.hadoop.security.UserGroupInformation; |
@@ -482,6 +486,16 @@ public int getPendingSPSPaths() { |
482 | 486 | /** Storages accessible from multiple DNs. */ |
483 | 487 | private final ProvidedStorageMap providedStorageMap; |
484 | 488 |
|
| 489 | + /** |
| 490 | + * Timeout for excess redundancy block. |
| 491 | + */ |
| 492 | + private long excessRedundancyTimeout; |
| 493 | + |
| 494 | + /** |
| 495 | + * Limits number of blocks used to check for excess redundancy timeout. |
| 496 | + */ |
| 497 | + private long excessRedundancyTimeoutCheckLimit; |
| 498 | + |
485 | 499 | public BlockManager(final Namesystem namesystem, boolean haEnabled, |
486 | 500 | final Configuration conf) throws IOException { |
487 | 501 | this.namesystem = namesystem; |
@@ -589,6 +603,12 @@ public BlockManager(final Namesystem namesystem, boolean haEnabled, |
589 | 603 | conf.getBoolean(DFS_NAMENODE_CORRUPT_BLOCK_DELETE_IMMEDIATELY_ENABLED, |
590 | 604 | DFS_NAMENODE_CORRUPT_BLOCK_DELETE_IMMEDIATELY_ENABLED_DEFAULT); |
591 | 605 |
|
| 606 | + setExcessRedundancyTimeout(conf.getLong(DFS_NAMENODE_EXCESS_REDUNDANCY_TIMEOUT_SEC_KEY, |
| 607 | + DFS_NAMENODE_EXCESS_REDUNDANCY_TIMEOUT_SEC_DEAFULT)); |
| 608 | + setExcessRedundancyTimeoutCheckLimit(conf.getLong( |
| 609 | + DFS_NAMENODE_EXCESS_REDUNDANCY_TIMEOUT_CHECK_LIMIT, |
| 610 | + DFS_NAMENODE_EXCESS_REDUNDANCY_TIMEOUT_CHECK_LIMIT_DEFAULT)); |
| 611 | + |
592 | 612 | printInitialConfigs(); |
593 | 613 | } |
594 | 614 |
|
@@ -3041,6 +3061,100 @@ void rescanPostponedMisreplicatedBlocks() { |
3041 | 3061 | (Time.monotonicNow() - startTime), endSize, (startSize - endSize)); |
3042 | 3062 | } |
3043 | 3063 | } |
| 3064 | + |
| 3065 | + /** |
| 3066 | + * Sets the timeout (in seconds) for excess redundancy blocks, if the provided timeout is |
| 3067 | + * less than or equal to 0, the default value is used (converted to milliseconds). |
| 3068 | + * @param timeout The time (in seconds) to set as the excess redundancy block timeout. |
| 3069 | + */ |
| 3070 | + public void setExcessRedundancyTimeout(long timeout) { |
| 3071 | + if (timeout <= 0) { |
| 3072 | + this.excessRedundancyTimeout = DFS_NAMENODE_EXCESS_REDUNDANCY_TIMEOUT_SEC_DEAFULT * 1000L; |
| 3073 | + } else { |
| 3074 | + this.excessRedundancyTimeout = timeout * 1000L; |
| 3075 | + } |
| 3076 | + } |
| 3077 | + |
| 3078 | + /** |
| 3079 | + * Sets the limit number of blocks for checking excess redundancy timeout. |
| 3080 | + * If the provided limit is less than or equal to 0, the default limit is used. |
| 3081 | + * |
| 3082 | + * @param limit The limit number of blocks used to check for excess redundancy timeout. |
| 3083 | + */ |
| 3084 | + public void setExcessRedundancyTimeoutCheckLimit(long limit) { |
| 3085 | + if (excessRedundancyTimeoutCheckLimit <= 0) { |
| 3086 | + this.excessRedundancyTimeoutCheckLimit = |
| 3087 | + DFS_NAMENODE_EXCESS_REDUNDANCY_TIMEOUT_CHECK_LIMIT_DEFAULT; |
| 3088 | + } else { |
| 3089 | + this.excessRedundancyTimeoutCheckLimit = limit; |
| 3090 | + } |
| 3091 | + } |
| 3092 | + |
| 3093 | + /** |
| 3094 | + * Process timed-out blocks in the excess redundancy map. |
| 3095 | + */ |
| 3096 | + void processTimedOutExcessBlocks() { |
| 3097 | + if (excessRedundancyMap.size() == 0) { |
| 3098 | + return; |
| 3099 | + } |
| 3100 | + namesystem.writeLock(); |
| 3101 | + long now = Time.monotonicNow(); |
| 3102 | + int processed = 0; |
| 3103 | + try { |
| 3104 | + Iterator<Map.Entry<String, LightWeightHashSet<Block>>> iter = |
| 3105 | + excessRedundancyMap.getExcessRedundancyMap().entrySet().iterator(); |
| 3106 | + while (iter.hasNext() && processed < excessRedundancyTimeoutCheckLimit) { |
| 3107 | + Map.Entry<String, LightWeightHashSet<Block>> entry = iter.next(); |
| 3108 | + String datanodeUuid = entry.getKey(); |
| 3109 | + LightWeightHashSet<Block> blocks = entry.getValue(); |
| 3110 | + // Sort blocks by timestamp in descending order. |
| 3111 | + List<ExcessBlockInfo> sortedBlocks = blocks.stream() |
| 3112 | + .filter(block -> block instanceof ExcessBlockInfo) |
| 3113 | + .map(block -> (ExcessBlockInfo) block) |
| 3114 | + .sorted(Comparator.comparingLong(ExcessBlockInfo::getTimeStamp)) |
| 3115 | + .collect(Collectors.toList()); |
| 3116 | + |
| 3117 | + for (ExcessBlockInfo excessBlockInfo : sortedBlocks) { |
| 3118 | + if (processed >= excessRedundancyTimeoutCheckLimit) { |
| 3119 | + break; |
| 3120 | + } |
| 3121 | + |
| 3122 | + processed++; |
| 3123 | + // If the datanode doesn't have any excess block that has exceeded the timeout, |
| 3124 | + // can exit this loop. |
| 3125 | + if (now <= excessBlockInfo.getTimeStamp() + excessRedundancyTimeout) { |
| 3126 | + break; |
| 3127 | + } |
| 3128 | + |
| 3129 | + BlockInfo blockInfo = excessBlockInfo.getBlockInfo(); |
| 3130 | + BlockInfo bi = blocksMap.getStoredBlock(blockInfo); |
| 3131 | + if (bi == null || bi.isDeleted()) { |
| 3132 | + continue; |
| 3133 | + } |
| 3134 | + |
| 3135 | + Iterator<DatanodeStorageInfo> iterator = blockInfo.getStorageInfos(); |
| 3136 | + while (iterator.hasNext()) { |
| 3137 | + DatanodeStorageInfo datanodeStorageInfo = iterator.next(); |
| 3138 | + DatanodeDescriptor datanodeDescriptor = datanodeStorageInfo.getDatanodeDescriptor(); |
| 3139 | + if (datanodeDescriptor.getDatanodeUuid().equals(datanodeUuid) && |
| 3140 | + datanodeStorageInfo.getState().equals(State.NORMAL)) { |
| 3141 | + final Block b = getBlockOnStorage(blockInfo, datanodeStorageInfo); |
| 3142 | + if (!containsInvalidateBlock(datanodeDescriptor, b)) { |
| 3143 | + addToInvalidates(b, datanodeDescriptor); |
| 3144 | + LOG.debug("Excess block timeout ({}, {}) is added to invalidated.", |
| 3145 | + b, datanodeDescriptor); |
| 3146 | + } |
| 3147 | + excessBlockInfo.setTimeStamp(); |
| 3148 | + break; |
| 3149 | + } |
| 3150 | + } |
| 3151 | + } |
| 3152 | + } |
| 3153 | + } finally { |
| 3154 | + namesystem.writeUnlock("processTimedOutExcessBlocks"); |
| 3155 | + LOG.info("processTimedOutExcessBlocks {} msecs.", (Time.monotonicNow() - now)); |
| 3156 | + } |
| 3157 | + } |
3044 | 3158 |
|
3045 | 3159 | Collection<Block> processReport( |
3046 | 3160 | final DatanodeStorageInfo storageInfo, |
@@ -5232,6 +5346,7 @@ public void run() { |
5232 | 5346 | computeDatanodeWork(); |
5233 | 5347 | processPendingReconstructions(); |
5234 | 5348 | rescanPostponedMisreplicatedBlocks(); |
| 5349 | + processTimedOutExcessBlocks(); |
5235 | 5350 | lastRedundancyCycleTS.set(Time.monotonicNow()); |
5236 | 5351 | } |
5237 | 5352 | TimeUnit.MILLISECONDS.sleep(redundancyRecheckIntervalMs); |
|
0 commit comments