@@ -286,6 +286,10 @@ private void flipDataBuffers() {
286286 private CompletionService <Void > flushAllExecutorCompletionService ;
287287 private int blockGroupIndex ;
288288 private long datanodeRestartTimeout ;
289+ public static final String DFS_CLIENT_WRITE_EC_CHECKSTREAMER_REDUNENCY_KEY =
290+ "dfs.client.write.ec.checkstreamer.redunency" ;
291+ public static final int DFS_CLIENT_WRITE_EC_CHECKSTREAMER_REDUNENCY_DEFAULT = 0 ;
292+ private final int extraStreamerRedunency ;
289293
290294 /** Construct a new output stream for creating a file. */
291295 DFSStripedOutputStream (DFSClient dfsClient , String src , HdfsFileStatus stat ,
@@ -325,6 +329,9 @@ private void flipDataBuffers() {
325329 currentPackets = new DFSPacket [streamers .size ()];
326330 datanodeRestartTimeout = dfsClient .getConf ().getDatanodeRestartTimeout ();
327331 setCurrentStreamer (0 );
332+ int extraStreamerTmp = dfsClient .getConfiguration ().getInt (DFS_CLIENT_WRITE_EC_CHECKSTREAMER_REDUNENCY_KEY ,
333+ DFS_CLIENT_WRITE_EC_CHECKSTREAMER_REDUNENCY_DEFAULT );
334+ extraStreamerRedunency = Math .max (extraStreamerTmp , 0 );
328335 }
329336
330337 /** Construct a new output stream for appending to a file. */
@@ -690,7 +697,7 @@ private void checkStreamerFailures(boolean isNeedFlushAllPackets)
690697 // 2) create new block outputstream
691698 newFailed = waitCreatingStreamers (healthySet );
692699 if (newFailed .size () + failedStreamers .size () >
693- numAllBlocks - numDataBlocks ) {
700+ numAllBlocks - numDataBlocks - extraStreamerRedunency ) {
694701 // The write has failed, Close all the streamers.
695702 closeAllStreamers ();
696703 throw new IOException (
0 commit comments