 import org.apache.hadoop.thirdparty.com.google.common.cache.LoadingCache;
 import org.apache.hadoop.thirdparty.com.google.common.cache.RemovalListener;
 import org.apache.hadoop.thirdparty.com.google.common.cache.RemovalNotification;
+import org.apache.hadoop.thirdparty.com.google.common.collect.Iterables;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -643,17 +644,17 @@ void setAccessToken(Token<BlockTokenIdentifier> t) {
     this.accessToken = t;
   }
 
-  private void setPipeline(LocatedBlock lb) {
+  protected void setPipeline(LocatedBlock lb) {
     setPipeline(lb.getLocations(), lb.getStorageTypes(), lb.getStorageIDs());
   }
 
-  private void setPipeline(DatanodeInfo[] nodes, StorageType[] storageTypes,
-      String[] storageIDs) {
+  protected void setPipeline(DatanodeInfo[] newNodes, StorageType[] newStorageTypes,
+      String[] newStorageIDs) {
     synchronized (nodesLock) {
-      this.nodes = nodes;
+      this.nodes = newNodes;
     }
-    this.storageTypes = storageTypes;
-    this.storageIDs = storageIDs;
+    this.storageTypes = newStorageTypes;
+    this.storageIDs = newStorageIDs;
   }
 
   /**
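A note on the rename above: `newNodes`/`newStorageTypes`/`newStorageIDs` also remove parameter-to-field shadowing, which is easy to get wrong under a lock. A minimal, self-contained sketch of the hazard (class and field names here are illustrative, not DataStreamer's):

```java
public class ShadowingSketch {
  private String[] nodes;

  // With the parameter shadowing the field, dropping "this." silently
  // degrades to a no-op self-assignment.
  void setNodesShadowed(String[] nodes) {
    nodes = nodes; // compiles (with a lint warning); the field stays null
  }

  // Renaming the parameter removes the ambiguity entirely.
  void setNodes(String[] newNodes) {
    nodes = newNodes; // unambiguously assigns the field
  }

  public static void main(String[] args) {
    ShadowingSketch s = new ShadowingSketch();
    s.setNodesShadowed(new String[] {"dn1"});
    System.out.println(s.nodes == null);  // true: the bug
    s.setNodes(new String[] {"dn1"});
    System.out.println(s.nodes.length);   // 1
  }
}
```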
@@ -748,7 +749,7 @@ public void run() {
 
       if (stage == BlockConstructionStage.PIPELINE_SETUP_CREATE) {
         LOG.debug("Allocating new block: {}", this);
-        setPipeline(nextBlockOutputStream());
+        setupPipelineForCreate();
         initDataStreaming();
       } else if (stage == BlockConstructionStage.PIPELINE_SETUP_APPEND) {
         LOG.debug("Append to block {}", block);
@@ -1607,8 +1608,11 @@ private void transfer(final DatanodeInfo src, final DatanodeInfo[] targets,
    * it can be written to.
    * This happens when a file is appended or data streaming fails
    * It keeps on trying until a pipeline is setup
+   *
+   * Returns whether the pipeline was set up successfully. Callers use the
+   * result to decide whether to keep building the pipeline or throw.
    */
-  private void setupPipelineForAppendOrRecovery() throws IOException {
+  private boolean setupPipelineForAppendOrRecovery() throws IOException {
     // Check number of datanodes. Note that if there is no healthy datanode,
     // this must be internal error because we mark external error in striped
     // outputstream only when all the streamers are in the DATA_STREAMING stage
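To make the new contract concrete: `true` means a pipeline is in place, `false` means this attempt gave up (possibly closing the streamer) and the caller decides whether to retry with a fresh block or throw. A toy model of that shape, with all names as illustrative stand-ins rather than the real DataStreamer API:

```java
import java.io.IOException;

public class PipelineContractSketch {
  private boolean streamerClosed = false;

  // Stand-in for setupPipelineForAppendOrRecovery(): before this change it
  // returned void, so "gave up" and "succeeded" were indistinguishable here.
  boolean setupPipeline(int healthyNodes) throws IOException {
    if (healthyNodes == 0) {
      streamerClosed = true; // mirrors the real method marking itself closed
      return false;          // previously a bare "return;"
    }
    return true;             // pipeline ready; streaming may proceed
  }

  public static void main(String[] args) throws IOException {
    PipelineContractSketch s = new PipelineContractSketch();
    if (!s.setupPipeline(0)) {
      System.out.println("setup failed; caller may retry with a new block or throw");
    }
  }
}
```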
@@ -1618,33 +1622,46 @@ private void setupPipelineForAppendOrRecovery() throws IOException {
       LOG.warn(msg);
       lastException.set(new IOException(msg));
       streamerClosed = true;
-      return;
+      return false;
     }
-    setupPipelineInternal(nodes, storageTypes, storageIDs);
+    return setupPipelineInternal(nodes, storageTypes, storageIDs);
   }
 
-  protected void setupPipelineInternal(DatanodeInfo[] datanodes,
+  protected boolean setupPipelineInternal(DatanodeInfo[] datanodes,
       StorageType[] nodeStorageTypes, String[] nodeStorageIDs)
       throws IOException {
     boolean success = false;
     long newGS = 0L;
+    boolean isCreateStage = BlockConstructionStage.PIPELINE_SETUP_CREATE == stage;
     while (!success && !streamerClosed && dfsClient.clientRunning) {
       if (!handleRestartingDatanode()) {
-        return;
+        return false;
       }
 
-      final boolean isRecovery = errorState.hasInternalError();
+      final boolean isRecovery = errorState.hasInternalError() && !isCreateStage;
+
+
       if (!handleBadDatanode()) {
-        return;
+        return false;
       }
 
       handleDatanodeReplacement();
 
+      // During the create stage, min replication should still be satisfied.
+      if (isCreateStage && !(dfsClient.dtpReplaceDatanodeOnFailureReplication > 0 &&
+          nodes.length >= dfsClient.dtpReplaceDatanodeOnFailureReplication)) {
+        return false;
+      }
+
       // get a new generation stamp and an access token
       final LocatedBlock lb = updateBlockForPipeline();
       newGS = lb.getBlock().getGenerationStamp();
       accessToken = lb.getBlockToken();
 
+      if (isCreateStage) {
+        block.setCurrentBlock(lb.getBlock());
+      }
+
       // set up the pipeline again with the remaining nodes
       success = createBlockOutputStream(nodes, storageTypes, storageIDs, newGS,
           isRecovery);
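The create-stage guard in this hunk is worth tabulating. Extracted as a pure predicate, with `minReplication` standing in for `dfsClient.dtpReplaceDatanodeOnFailureReplication` (a sketch, not the actual field access):

```java
public class MinReplicationSketch {
  // True iff create-stage recovery may continue; mirrors the negated
  // condition in the patch: !(min > 0 && liveNodes >= min) => bail out.
  static boolean mayContinue(int minReplication, int liveNodes) {
    return minReplication > 0 && liveNodes >= minReplication;
  }

  public static void main(String[] args) {
    System.out.println(mayContinue(2, 3)); // true: enough nodes remain
    System.out.println(mayContinue(2, 2)); // true: exactly at the floor
    System.out.println(mayContinue(2, 1)); // false: below min replication
    // Note: with the setting at 0 the guard also returns false, i.e.
    // create-stage recovery is only attempted when the feature is enabled.
    System.out.println(mayContinue(0, 3)); // false
  }
}
```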
@@ -1657,6 +1674,7 @@ protected void setupPipelineInternal(DatanodeInfo[] datanodes,
     if (success) {
       updatePipeline(newGS);
     }
+    return success;
   }
 
   /**
@@ -1795,7 +1813,7 @@ DatanodeInfo[] getExcludedNodes() {
    * Must get block ID and the IDs of the destinations from the namenode.
-   * Returns the list of target datanodes.
+   * Sets up the pipeline to the chosen target datanodes.
    */
-  protected LocatedBlock nextBlockOutputStream() throws IOException {
+  protected void setupPipelineForCreate() throws IOException {
     LocatedBlock lb;
     DatanodeInfo[] nodes;
     StorageType[] nextStorageTypes;
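For orientation, the renamed method keeps the same allocate/connect/abandon loop but now owns pipeline recovery as well. A compressed, runnable model of that loop's shape (the `Namenode` interface and the node strings are invented stand-ins, not the HDFS API):

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class CreateLoopSketch {
  interface Namenode {
    List<String> allocate(List<String> excluded); // stand-in for locateFollowingBlock
  }

  static List<String> setupPipelineForCreate(Namenode nn, int retries)
      throws IOException {
    List<String> excluded = new ArrayList<>();
    do {
      List<String> nodes = nn.allocate(excluded);
      if (connect(nodes)) {
        return nodes;             // pipeline established
      }
      excluded.add(nodes.get(0)); // abandon the block, exclude a bad node
    } while (--retries >= 0);
    throw new IOException("Unable to create new block.");
  }

  static boolean connect(List<String> nodes) {
    return !nodes.contains("dn-bad"); // pretend dn-bad always fails
  }

  public static void main(String[] args) throws IOException {
    Namenode nn = excluded -> excluded.isEmpty()
        ? List.of("dn-bad", "dn2") : List.of("dn1", "dn2");
    System.out.println(setupPipelineForCreate(nn, 3)); // [dn1, dn2]
  }
}
```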
@@ -1806,6 +1824,7 @@ protected LocatedBlock nextBlockOutputStream() throws IOException {
     do {
       errorState.resetInternalError();
       lastException.clear();
+      streamerClosed = false;
 
       DatanodeInfo[] excluded = getExcludedNodes();
       lb = locateFollowingBlock(
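The `streamerClosed = false` reset matters because a failed attempt inside this loop can now close the streamer (see `setupPipelineForAppendOrRecovery` above); each iteration starts over with a brand-new block, so stale closed state must not poison the retry. A toy illustration of the pattern, with invented names:

```java
public class RetryResetSketch {
  private boolean closed = false;
  private int failuresLeft = 2;

  boolean attempt() {
    if (closed) {
      return false;   // dead on arrival: poisoned state from the last try
    }
    if (failuresLeft-- > 0) {
      closed = true;  // a failed attempt closes the streamer
      return false;
    }
    return true;
  }

  public static void main(String[] args) {
    RetryResetSketch s = new RetryResetSketch();
    boolean success;
    int count = 3;
    do {
      s.closed = false; // the patch's "streamerClosed = false;"
      success = s.attempt();
    } while (!success && --count >= 0);
    // true on the third attempt; without the reset, every retry
    // after the first failure would fail immediately.
    System.out.println(success);
  }
}
```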
@@ -1817,26 +1836,33 @@ protected LocatedBlock nextBlockOutputStream() throws IOException {
       nodes = lb.getLocations();
       nextStorageTypes = lb.getStorageTypes();
       nextStorageIDs = lb.getStorageIDs();
+      setPipeline(lb);
+      try {
+        // Connect to first DataNode in the list.
+        success = createBlockOutputStream(nodes, nextStorageTypes, nextStorageIDs, 0L, false)
+            || setupPipelineForAppendOrRecovery();
 
-      // Connect to first DataNode in the list.
-      success = createBlockOutputStream(nodes, nextStorageTypes, nextStorageIDs,
-          0L, false);
-
+      } catch (IOException ie) {
+        LOG.warn("Exception in setupPipelineForCreate " + this, ie);
+        success = false;
+      }
       if (!success) {
         LOG.warn("Abandoning " + block);
         dfsClient.namenode.abandonBlock(block.getCurrentBlock(),
             stat.getFileId(), src, dfsClient.clientName);
         block.setCurrentBlock(null);
-        final DatanodeInfo badNode = nodes[errorState.getBadNodeIndex()];
+        final DatanodeInfo badNode = errorState.getBadNodeIndex() == -1
+            ? Iterables.getLast(failed)
+            : nodes[errorState.getBadNodeIndex()];
         LOG.warn("Excluding datanode " + badNode);
         excludedNodes.put(badNode, badNode);
+        setPipeline(null, null, null);
       }
     } while (!success && --count >= 0);
 
     if (!success) {
       throw new IOException("Unable to create new block.");
     }
-    return lb;
   }
 
   // connects to the first datanode in the pipeline
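On the `badNodeIndex == -1` branch above: when no bad node index was recorded (for example because the failure surfaced from the recovery path rather than from `createBlockOutputStream`), the patch falls back to the most recently failed datanode. A runnable sketch of that selection, using plain Guava in place of Hadoop's shaded `thirdparty` package and Strings in place of `DatanodeInfo`:

```java
import java.util.List;
import com.google.common.collect.Iterables;

public class BadNodeFallbackSketch {
  public static void main(String[] args) {
    List<String> nodes = List.of("dn1:9866", "dn2:9866", "dn3:9866");
    List<String> failed = List.of("dn2:9866"); // recorded during recovery

    int badNodeIndex = -1; // no index: the failure came from recovery
    String badNode = badNodeIndex == -1
        ? Iterables.getLast(failed)      // most recent recovery casualty
        : nodes.get(badNodeIndex);

    System.out.println("Excluding datanode " + badNode); // dn2:9866
  }
}
```

Note that `Iterables.getLast` throws `NoSuchElementException` on an empty iterable, so this branch implicitly assumes the recovery path recorded at least one failed node before giving up.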