|
73 | 73 | import java.util.concurrent.LinkedBlockingQueue; |
74 | 74 | import java.util.concurrent.TimeUnit; |
75 | 75 |
|
| 76 | +import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.Write.ECRedundancy.DFS_CLIENT_EC_WRITE_FAILED_BLOCKS_TOLERATED; |
| 77 | +import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.Write.ECRedundancy.DFS_CLIENT_EC_WRITE_FAILED_BLOCKS_TOLERATED_DEFAILT; |
| 78 | + |
76 | 79 | /** |
77 | 80 | * This class supports writing files in striped layout and erasure coded format. |
78 | 81 | * Each stripe contains a sequence of cells. |
@@ -283,6 +286,7 @@ private void flipDataBuffers() { |
283 | 286 | private CompletionService<Void> flushAllExecutorCompletionService; |
284 | 287 | private int blockGroupIndex; |
285 | 288 | private long datanodeRestartTimeout; |
| 289 | + private final int failedBlocksTolerated; |
286 | 290 |
|
287 | 291 | /** Construct a new output stream for creating a file. */ |
288 | 292 | DFSStripedOutputStream(DFSClient dfsClient, String src, HdfsFileStatus stat, |
@@ -322,6 +326,15 @@ private void flipDataBuffers() { |
322 | 326 | currentPackets = new DFSPacket[streamers.size()]; |
323 | 327 | datanodeRestartTimeout = dfsClient.getConf().getDatanodeRestartTimeout(); |
324 | 328 | setCurrentStreamer(0); |
| 329 | + |
| 330 | + int failedBlocksToleratedTmp = dfsClient.getConfiguration().getInt( |
| 331 | + DFS_CLIENT_EC_WRITE_FAILED_BLOCKS_TOLERATED, |
| 332 | + DFS_CLIENT_EC_WRITE_FAILED_BLOCKS_TOLERATED_DEFAILT); |
| 333 | + if (failedBlocksToleratedTmp < 0) { |
| 334 | + failedBlocksToleratedTmp = ecPolicy.getNumParityUnits(); |
| 335 | + } |
| 336 | + failedBlocksTolerated = Math.min(failedBlocksToleratedTmp, |
| 337 | + ecPolicy.getNumParityUnits()); |
325 | 338 | } |
326 | 339 |
|
327 | 340 | /** Construct a new output stream for appending to a file. */ |
@@ -402,11 +415,11 @@ private Set<StripedDataStreamer> checkStreamers() throws IOException { |
402 | 415 | LOG.debug("original failed streamers: {}", failedStreamers); |
403 | 416 | LOG.debug("newly failed streamers: {}", newFailed); |
404 | 417 | } |
405 | | - if (failCount > (numAllBlocks - numDataBlocks)) { |
| 418 | + if (failCount > failedBlocksTolerated) { |
406 | 419 | closeAllStreamers(); |
407 | 420 | throw new IOException("Failed: the number of failed blocks = " |
408 | | - + failCount + " > the number of parity blocks = " |
409 | | - + (numAllBlocks - numDataBlocks)); |
| 421 | + + failCount + " > the number of failed blocks tolerated = " |
| 422 | + + failedBlocksTolerated); |
410 | 423 | } |
411 | 424 | return newFailed; |
412 | 425 | } |
@@ -687,7 +700,7 @@ private void checkStreamerFailures(boolean isNeedFlushAllPackets) |
687 | 700 | // 2) create new block outputstream |
688 | 701 | newFailed = waitCreatingStreamers(healthySet); |
689 | 702 | if (newFailed.size() + failedStreamers.size() > |
690 | | - numAllBlocks - numDataBlocks) { |
| 703 | + failedBlocksTolerated) { |
691 | 704 | // The write has failed, Close all the streamers. |
692 | 705 | closeAllStreamers(); |
693 | 706 | throw new IOException( |
|
0 commit comments