86 | 86 | import org.apache.hadoop.hdfs.server.blockmanagement.NumberReplicas.StoredReplicaState; |
87 | 87 | import org.apache.hadoop.hdfs.server.blockmanagement.PendingDataNodeMessages.ReportedBlockInfo; |
88 | 88 | import org.apache.hadoop.hdfs.server.blockmanagement.PendingReconstructionBlocks.PendingBlockInfo; |
| 89 | +import org.apache.hadoop.hdfs.server.blockmanagement.ExcessRedundancyMap.ExcessBlockInfo; |
89 | 90 | import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState; |
90 | 91 | import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState; |
91 | 92 | import org.apache.hadoop.hdfs.server.namenode.CachedBlock; |
|
116 | 117 |
117 | 118 | import static org.apache.hadoop.hdfs.util.StripedBlockUtil.getInternalBlockLength; |
118 | 119 |
| 120 | +import org.apache.hadoop.hdfs.util.LightWeightHashSet; |
119 | 121 | import org.apache.hadoop.metrics2.util.MBeans; |
120 | 122 | import org.apache.hadoop.net.Node; |
121 | 123 | import org.apache.hadoop.security.UserGroupInformation; |
@@ -482,6 +484,16 @@ public int getPendingSPSPaths() { |
482 | 484 | /** Storages accessible from multiple DNs. */ |
483 | 485 | private final ProvidedStorageMap providedStorageMap; |
484 | 486 |
| 487 | + /** |
| 488 | + * Timeout, in milliseconds, for excess redundancy blocks. |
| 489 | + */ |
| 490 | + private long excessRedundancyTimeout; |
| 491 | + |
| 492 | + /** |
| 493 | + * Maximum number of blocks checked per scan for excess redundancy timeout. |
| 494 | + */ |
| 495 | + private long excessRedundancyTimeoutCheckLimit; |
| 496 | + |
485 | 497 | public BlockManager(final Namesystem namesystem, boolean haEnabled, |
486 | 498 | final Configuration conf) throws IOException { |
487 | 499 | this.namesystem = namesystem; |
@@ -589,6 +601,12 @@ public BlockManager(final Namesystem namesystem, boolean haEnabled, |
589 | 601 | conf.getBoolean(DFS_NAMENODE_CORRUPT_BLOCK_DELETE_IMMEDIATELY_ENABLED, |
590 | 602 | DFS_NAMENODE_CORRUPT_BLOCK_DELETE_IMMEDIATELY_ENABLED_DEFAULT); |
591 | 603 |
| 604 | + setExcessRedundancyTimeout(conf.getLong(DFS_NAMENODE_EXCESS_REDUNDANCY_TIMEOUT_SEC_KEY, |
| 605 | + DFS_NAMENODE_EXCESS_REDUNDANCY_TIMEOUT_SEC)); |
| 606 | + setExcessRedundancyTimeoutCheckLimit(conf.getLong( |
| 607 | + DFS_NAMENODE_EXCESS_REDUNDANCY_TIMEOUT_CHECK_LIMIT, |
| 608 | + DFS_NAMENODE_EXCESS_REDUNDANCY_TIMEOUT_CHECK_LIMIT_DEFAULT)); |
| 609 | + |
592 | 610 | printInitialConfigs(); |
593 | 611 | } |
594 | 612 |
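For context, a hedged sketch of how these two new knobs might be set from a test, using only the constant names visible in this diff (the concrete configuration key strings and default values are defined elsewhere in the PR):

    Configuration conf = new HdfsConfiguration();
    // A short timeout so timed-out excess replicas are reclaimed quickly in a test run.
    conf.setLong(DFS_NAMENODE_EXCESS_REDUNDANCY_TIMEOUT_SEC_KEY, 30L);
    // Bound the per-cycle scan so a single pass cannot hold the write lock for long.
    conf.setLong(DFS_NAMENODE_EXCESS_REDUNDANCY_TIMEOUT_CHECK_LIMIT, 1000L);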
@@ -3040,6 +3058,98 @@ void rescanPostponedMisreplicatedBlocks() { |
3040 | 3058 | (Time.monotonicNow() - startTime), endSize, (startSize - endSize)); |
3041 | 3059 | } |
3042 | 3060 | } |
| 3061 | + |
| 3062 | + /** |
| 3063 | + * Sets the timeout (in seconds) for excess redundancy blocks. If the provided timeout |
| 3064 | + * is less than or equal to 0, the default value is used. The value is stored in milliseconds. |
| 3065 | + * @param timeOut the timeout (in seconds) for excess redundancy blocks. |
| 3066 | + */ |
| 3067 | + public void setExcessRedundancyTimeout(long timeOut) { |
| 3068 | + if (timeOut <= 0) { |
| 3069 | + this.excessRedundancyTimeout = DFS_NAMENODE_EXCESS_REDUNDANCY_TIMEOUT_SEC * 1000L; |
| 3070 | + } else { |
| 3071 | + this.excessRedundancyTimeout = timeOut * 1000L; |
| 3072 | + } |
| 3073 | + } |
| 3074 | + |
| 3075 | + /** |
| 3076 | + * Sets the maximum number of blocks checked per scan for excess redundancy timeout. |
| 3077 | + * If the provided limit is less than or equal to 0, the default limit is used. |
| 3078 | + * |
| 3079 | + * @param limit the maximum number of blocks checked per scan for excess redundancy timeout. |
| 3080 | + */ |
| 3081 | + public void setExcessRedundancyTimeoutCheckLimit(long limit) { |
| 3082 | + if (limit <= 0) { |
| 3083 | + this.excessRedundancyTimeoutCheckLimit = |
| 3084 | + DFS_NAMENODE_EXCESS_REDUNDANCY_TIMEOUT_CHECK_LIMIT_DEFAULT; |
| 3085 | + } else { |
| 3086 | + this.excessRedundancyTimeoutCheckLimit = limit; |
| 3087 | + } |
| 3088 | + } |
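A minimal illustration of the clamping behavior both setters share; the numeric values below are arbitrary:

    // Non-positive inputs fall back to the compile-time defaults.
    blockManager.setExcessRedundancyTimeout(-1);            // default seconds * 1000 ms
    blockManager.setExcessRedundancyTimeout(300);           // stored as 300,000 ms
    blockManager.setExcessRedundancyTimeoutCheckLimit(0);   // default check limit
    blockManager.setExcessRedundancyTimeoutCheckLimit(500); // at most 500 blocks per scan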
| 3089 | + |
| 3090 | + /** |
| 3091 | + * Process timed-out blocks in the excess redundancy map. |
| 3092 | + */ |
| 3093 | + void processTimedOutExcessBlocks() { |
| 3094 | + if (excessRedundancyMap.size() == 0) { |
| 3095 | + return; |
| 3096 | + } |
| 3097 | + namesystem.writeLock(); |
| 3098 | + long now = Time.monotonicNow(); |
| 3099 | + int processed = 0; |
| 3100 | + try { |
| 3101 | + Iterator<Map.Entry<String, LightWeightHashSet<ExcessBlockInfo>>> iter = |
| 3102 | + excessRedundancyMap.getExcessRedundancyMap().entrySet().iterator(); |
| 3103 | + while (iter.hasNext() && processed < excessRedundancyTimeoutCheckLimit) { |
| 3104 | + Map.Entry<String, LightWeightHashSet<ExcessBlockInfo>> entry = iter.next(); |
| 3105 | + String datanodeUuid = entry.getKey(); |
| 3106 | + LightWeightHashSet<ExcessBlockInfo> blocks = entry.getValue(); |
| 3107 | + List<ExcessBlockInfo> sortedBlocks = new ArrayList<>(blocks); |
| 3108 | + // Sort blocks by timestamp in ascending order, oldest first. |
| 3109 | + Collections.sort(sortedBlocks); |
| 3110 | + |
| 3111 | + for (ExcessBlockInfo excessBlockInfo : sortedBlocks) { |
| 3112 | + if (processed >= excessRedundancyTimeoutCheckLimit) { |
| 3113 | + break; |
| 3114 | + } |
| 3115 | + BlockInfo blockInfo = excessBlockInfo.getBlockInfo(); |
| 3116 | + BlockInfo bi = blocksMap.getStoredBlock(blockInfo); |
| 3117 | + if (bi == null || bi.isDeleted()) { |
| 3118 | + continue; |
| 3119 | + } |
| 3120 | + |
| 3121 | + // Blocks are sorted oldest first, so once we reach a block that has not |
| 3122 | + // timed out yet, none of the remaining blocks have either; exit this loop. |
| 3123 | + if (now <= excessBlockInfo.getTimeStamp() + excessRedundancyTimeout) { |
| 3124 | + break; |
| 3125 | + } |
| 3126 | + |
| 3127 | + Iterator<DatanodeStorageInfo> iterator = blockInfo.getStorageInfos(); |
| 3128 | + while (iterator.hasNext()) { |
| 3129 | + DatanodeStorageInfo datanodeStorageInfo = iterator.next(); |
| 3130 | + DatanodeDescriptor datanodeDescriptor = datanodeStorageInfo.getDatanodeDescriptor(); |
| 3131 | + if (datanodeDescriptor.getDatanodeUuid().equals(datanodeUuid)) { |
| 3132 | + if (datanodeStorageInfo.getState() == State.NORMAL) { |
| 3133 | + final Block block = getBlockOnStorage(blockInfo, |
| 3134 | + datanodeStorageInfo); |
| 3135 | + if (!containsInvalidateBlock(datanodeDescriptor, block)) { |
| 3136 | + addToInvalidates(block, datanodeDescriptor); |
| 3137 | + LOG.debug("Timed-out excess block ({}, {}) added to invalidates.", |
| 3138 | + block, datanodeDescriptor); |
| 3139 | + } |
| 3140 | + excessBlockInfo.setTimeStamp(); |
| 3141 | + processed++; |
| 3142 | + break; |
| 3143 | + } |
| 3144 | + } |
| 3145 | + } |
| 3146 | + } |
| 3147 | + } |
| 3148 | + } finally { |
| 3149 | + namesystem.writeUnlock("processTimedOutExcessBlocks"); |
| 3150 | + LOG.info("processTimedOutExcessBlocks processed {} blocks in {} msecs.", processed, Time.monotonicNow() - now); |
| 3151 | + } |
| 3152 | + } |
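The scan above depends on an ordering invariant: within each datanode's set, blocks are visited oldest first, so the first block that has not yet timed out lets the inner loop stop early. A standalone sketch of that pattern under hypothetical names (this is not the HDFS API):

    // Entries must be sorted ascending by timestamp for the early exit to be safe.
    static <T> void expireTimedOut(List<T> entries, ToLongFunction<T> timestampOf,
        long now, long timeoutMs, Consumer<T> expire) {
      entries.sort(Comparator.comparingLong(timestampOf)); // oldest first
      for (T e : entries) {
        if (now <= timestampOf.applyAsLong(e) + timeoutMs) {
          break; // every later entry is newer, so it cannot have timed out either
        }
        expire.accept(e);
      }
    }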
3043 | 3153 |
3044 | 3154 | Collection<Block> processReport( |
3045 | 3155 | final DatanodeStorageInfo storageInfo, |
@@ -5231,6 +5341,7 @@ public void run() { |
5231 | 5341 | computeDatanodeWork(); |
5232 | 5342 | processPendingReconstructions(); |
5233 | 5343 | rescanPostponedMisreplicatedBlocks(); |
| 5344 | + processTimedOutExcessBlocks(); |
5234 | 5345 | lastRedundancyCycleTS.set(Time.monotonicNow()); |
5235 | 5346 | } |
5236 | 5347 | TimeUnit.MILLISECONDS.sleep(redundancyRecheckIntervalMs); |
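Because the new call piggybacks on the RedundancyMonitor cycle, its scan frequency is bounded by redundancyRecheckIntervalMs. A rough, assumption-laden bound on invalidation throughput, assuming the common 3-second default for dfs.namenode.redundancy.interval.seconds:

    // At most excessRedundancyTimeoutCheckLimit blocks are invalidated per cycle.
    long cyclesPerMinute = 60_000L / redundancyRecheckIntervalMs;  // ~20 with a 3s interval
    long maxInvalidationsPerMinute = cyclesPerMinute * excessRedundancyTimeoutCheckLimit;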