|
73 | 73 | import java.util.concurrent.LinkedBlockingQueue; |
74 | 74 | import java.util.concurrent.TimeUnit; |
75 | 75 |
|
| 76 | +import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.Write.ECRedunency.DFS_CLIENT_EC_RS_10_4_1024k_FAILED_WRITE_BLOCK_TOLERATED; |
| 77 | +import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.Write.ECRedunency.DFS_CLIENT_EC_RS_10_4_1024k_FAILED_WRITE_BLOCK_TOLERATED_DEFAILT; |
| 78 | +import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.Write.ECRedunency.DFS_CLIENT_EC_RS_3_2_1024k_FAILED_WRITE_BLOCK_TOLERATED; |
| 79 | +import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.Write.ECRedunency.DFS_CLIENT_EC_RS_3_2_1024k_FAILED_WRITE_BLOCK_TOLERATED_DEFAILT; |
| 80 | +import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.Write.ECRedunency.DFS_CLIENT_EC_RS_6_3_1024k_FAILED_WRITE_BLOCK_TOLERATED; |
| 81 | +import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.Write.ECRedunency.DFS_CLIENT_EC_RS_6_3_1024k_FAILED_WRITE_BLOCK_TOLERATED_DEFAILT; |
| 82 | +import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.Write.ECRedunency.DFS_CLIENT_EC_RS_LEGACY_6_3_1024k_FAILED_WRITE_BLOCK_TOLERATED; |
| 83 | +import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.Write.ECRedunency.DFS_CLIENT_EC_RS_LEGACY_6_3_1024k_FAILED_WRITE_BLOCK_TOLERATED_DEFAILT; |
| 84 | +import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.Write.ECRedunency.DFS_CLIENT_EC_XOR_2_1_1024k_FAILED_WRITE_BLOCK_TOLERATED; |
| 85 | +import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.Write.ECRedunency.DFS_CLIENT_EC_XOR_2_1_1024k_FAILED_WRITE_BLOCK_TOLERATED_DEFAILT; |
76 | 86 | import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.Write.RECOVER_LEASE_ON_CLOSE_EXCEPTION_DEFAULT; |
77 | 87 | import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.Write.RECOVER_LEASE_ON_CLOSE_EXCEPTION_KEY; |
78 | 88 |
|
@@ -286,6 +296,7 @@ private void flipDataBuffers() { |
286 | 296 | private CompletionService<Void> flushAllExecutorCompletionService; |
287 | 297 | private int blockGroupIndex; |
288 | 298 | private long datanodeRestartTimeout; |
| 299 | + private final int failedStreamerTolerated; |
289 | 300 |
|
290 | 301 | /** Construct a new output stream for creating a file. */ |
291 | 302 | DFSStripedOutputStream(DFSClient dfsClient, String src, HdfsFileStatus stat, |
@@ -325,6 +336,38 @@ private void flipDataBuffers() { |
325 | 336 | currentPackets = new DFSPacket[streamers.size()]; |
326 | 337 | datanodeRestartTimeout = dfsClient.getConf().getDatanodeRestartTimeout(); |
327 | 338 | setCurrentStreamer(0); |
| 339 | + |
| 340 | + int extraFailedStreamerToleratedTmp; |
| 341 | + switch (ecPolicy.getName()) { |
| 342 | + case "RS-10-4-1024k": |
| 343 | + extraFailedStreamerToleratedTmp = dfsClient.getConfiguration().getInt( |
| 344 | + DFS_CLIENT_EC_RS_10_4_1024k_FAILED_WRITE_BLOCK_TOLERATED, |
| 345 | + DFS_CLIENT_EC_RS_10_4_1024k_FAILED_WRITE_BLOCK_TOLERATED_DEFAILT); |
| 346 | + break; |
| 347 | + case "RS-3-2-1024k": |
| 348 | + extraFailedStreamerToleratedTmp = dfsClient.getConfiguration().getInt( |
| 349 | + DFS_CLIENT_EC_RS_3_2_1024k_FAILED_WRITE_BLOCK_TOLERATED, |
| 350 | + DFS_CLIENT_EC_RS_3_2_1024k_FAILED_WRITE_BLOCK_TOLERATED_DEFAILT); |
| 351 | + break; |
| 352 | + case "RS-6-3-1024k": |
| 353 | + extraFailedStreamerToleratedTmp = dfsClient.getConfiguration().getInt( |
| 354 | + DFS_CLIENT_EC_RS_6_3_1024k_FAILED_WRITE_BLOCK_TOLERATED, |
| 355 | + DFS_CLIENT_EC_RS_6_3_1024k_FAILED_WRITE_BLOCK_TOLERATED_DEFAILT); |
| 356 | + break; |
| 357 | + case "RS-LEGACY-6-3-1024k": |
| 358 | + extraFailedStreamerToleratedTmp = dfsClient.getConfiguration().getInt( |
| 359 | + DFS_CLIENT_EC_RS_LEGACY_6_3_1024k_FAILED_WRITE_BLOCK_TOLERATED, |
| 360 | + DFS_CLIENT_EC_RS_LEGACY_6_3_1024k_FAILED_WRITE_BLOCK_TOLERATED_DEFAILT); |
| 361 | + break; |
| 362 | + case "XOR-2-1-1024k": |
| 363 | + extraFailedStreamerToleratedTmp = dfsClient.getConfiguration().getInt( |
| 364 | + DFS_CLIENT_EC_XOR_2_1_1024k_FAILED_WRITE_BLOCK_TOLERATED, |
| 365 | + DFS_CLIENT_EC_XOR_2_1_1024k_FAILED_WRITE_BLOCK_TOLERATED_DEFAILT); |
| 366 | + break; |
| 367 | + default : |
| 368 | + extraFailedStreamerToleratedTmp = 0; |
| 369 | + } |
| 370 | + failedStreamerTolerated = Math.max(extraFailedStreamerToleratedTmp, 0); |
328 | 371 | } |
329 | 372 |
|
330 | 373 | /** Construct a new output stream for appending to a file. */ |
@@ -690,7 +733,7 @@ private void checkStreamerFailures(boolean isNeedFlushAllPackets) |
690 | 733 | // 2) create new block outputstream |
691 | 734 | newFailed = waitCreatingStreamers(healthySet); |
692 | 735 | if (newFailed.size() + failedStreamers.size() > |
693 | | - numAllBlocks - numDataBlocks) { |
| 736 | + failedStreamerTolerated) { |
694 | 737 | // The write has failed, Close all the streamers. |
695 | 738 | closeAllStreamers(); |
696 | 739 | throw new IOException( |
|
0 commit comments