9494import com .google .common .cache .RemovalListener ;
9595import com .google .common .cache .RemovalNotification ;
9696
97+ import com .google .common .collect .Iterables ;
9798import org .slf4j .Logger ;
9899import org .slf4j .LoggerFactory ;
99100
@@ -607,6 +608,10 @@ private void setPipeline(DatanodeInfo[] nodes, StorageType[] storageTypes,
607608 this .storageIDs = storageIDs ;
608609 }
609610
/**
 * Replaces the access token stored in this object's {@code accessToken}
 * field with {@code t}. Declared without an access modifier, so it is
 * package-private.
 *
 * @param t the token to store; it is assigned as-is, with no null check
 */
611+ void setAccessToken (Token <BlockTokenIdentifier > t ) {
612+ this .accessToken = t ;
613+ }
614+
610615 /**
611616 * Initialize for data streaming
612617 */
@@ -706,8 +711,8 @@ public void run() {
706711
707712 // get new block from namenode.
708713 if (stage == BlockConstructionStage .PIPELINE_SETUP_CREATE ) {
709- LOG .debug ("Allocating new block" );
710- setPipeline ( nextBlockOutputStream () );
714+ LOG .debug ("Allocating new block: {}" , this );
715+ setupPipelineForCreate ( );
711716 initDataStreaming ();
712717 } else if (stage == BlockConstructionStage .PIPELINE_SETUP_APPEND ) {
713718 LOG .debug ("Append to block {}" , block );
@@ -1449,9 +1454,11 @@ private void transfer(final DatanodeInfo src, final DatanodeInfo[] targets,
14491454 * it can be written to.
14501455 * This happens when a file is appended or data streaming fails.
14511456 * It keeps on trying until a pipeline is set up.
1457+ *
1458+ * Returns true if the pipeline was set up successfully, false otherwise.
1459+ * The caller uses this result to decide whether to continue creating the pipeline or to throw an exception.
14521460 */
14531461 private boolean setupPipelineForAppendOrRecovery () throws IOException {
1454- // check number of datanodes
14551462 if (nodes == null || nodes .length == 0 ) {
14561463 String msg = "Could not get block locations. " + "Source file \" "
14571464 + src + "\" - Aborting..." ;
@@ -1463,23 +1470,35 @@ private boolean setupPipelineForAppendOrRecovery() throws IOException {
14631470
14641471 boolean success = false ;
14651472 long newGS = 0L ;
1473+ boolean isCreateStage = BlockConstructionStage .PIPELINE_SETUP_CREATE == stage ;
14661474 while (!success && !streamerClosed && dfsClient .clientRunning ) {
14671475 if (!handleRestartingDatanode ()) {
14681476 return false ;
14691477 }
14701478
1471- final boolean isRecovery = errorState .hasError ();
1479+ final boolean isRecovery = errorState .hasError () && !isCreateStage ;
1480+
14721481 if (!handleBadDatanode ()) {
14731482 return false ;
14741483 }
14751484
14761485 handleDatanodeReplacement ();
14771486
1487+ // During create stage, min replication should still be satisfied.
1488+ if (isCreateStage && !(dfsClient .dtpReplaceDatanodeOnFailureReplication > 0 &&
1489+ nodes .length >= dfsClient .dtpReplaceDatanodeOnFailureReplication )) {
1490+ return false ;
1491+ }
1492+
14781493 // get a new generation stamp and an access token
14791494 final LocatedBlock lb = updateBlockForPipeline ();
14801495 newGS = lb .getBlock ().getGenerationStamp ();
14811496 accessToken = lb .getBlockToken ();
14821497
1498+ if (isCreateStage ) {
1499+ block .setCurrentBlock (lb .getBlock ());
1500+ }
1501+
14831502 // set up the pipeline again with the remaining nodes
14841503 success = createBlockOutputStream (nodes , storageTypes , newGS , isRecovery );
14851504
@@ -1491,7 +1510,7 @@ private boolean setupPipelineForAppendOrRecovery() throws IOException {
14911510 if (success ) {
14921511 updatePipeline (newGS );
14931512 }
1494- return false ; // do not sleep, continue processing
1513+ return success ;
14951514 }
14961515
14971516 /**
@@ -1629,17 +1648,18 @@ DatanodeInfo[] getExcludedNodes() {
16291648 * Must get block ID and the IDs of the destinations from the namenode.
16301650 * Sets up the pipeline with those target datanodes; the method itself returns nothing.
16311650 */
1632- protected LocatedBlock nextBlockOutputStream () throws IOException {
1651+ protected void setupPipelineForCreate () throws IOException {
16331652 LocatedBlock lb ;
16341653 DatanodeInfo [] nodes ;
1635- StorageType [] storageTypes ;
1654+ StorageType [] nextStorageTypes ;
16361655 int count = dfsClient .getConf ().getNumBlockWriteRetry ();
16371656 boolean success ;
16381657 final ExtendedBlock oldBlock = block .getCurrentBlock ();
16391658 do {
16401659 errorState .reset ();
16411660 lastException .clear ();
16421661 success = false ;
1662+ streamerClosed = false ;
16431663
16441664 DatanodeInfo [] excluded = getExcludedNodes ();
16451665 lb = locateFollowingBlock (
@@ -1649,26 +1669,34 @@ protected LocatedBlock nextBlockOutputStream() throws IOException {
16491669 bytesSent = 0 ;
16501670 accessToken = lb .getBlockToken ();
16511671 nodes = lb .getLocations ();
1652- storageTypes = lb .getStorageTypes ();
1653-
1654- // Connect to first DataNode in the list.
1655- success = createBlockOutputStream (nodes , storageTypes , 0L , false );
1672+ nextStorageTypes = lb .getStorageTypes ();
1673+ setPipeline (lb );
1674+ try {
1675+ // Connect to first DataNode in the list.
1676+ success = createBlockOutputStream (nodes , nextStorageTypes , 0L , false )
1677+ || setupPipelineForAppendOrRecovery ();
16561678
1679+ } catch (IOException ie ) {
1680+ LOG .warn ("Exception in setupPipelineForCreate " + this , ie );
1681+ success = false ;
1682+ }
16571683 if (!success ) {
16581684 LOG .warn ("Abandoning " + block );
16591685 dfsClient .namenode .abandonBlock (block .getCurrentBlock (),
16601686 stat .getFileId (), src , dfsClient .clientName );
16611687 block .setCurrentBlock (null );
1662- final DatanodeInfo badNode = nodes [errorState .getBadNodeIndex ()];
1688+ final DatanodeInfo badNode = errorState .getBadNodeIndex () == -1
1689+ ? Iterables .getLast (failed )
1690+ : nodes [errorState .getBadNodeIndex ()];
16631691 LOG .warn ("Excluding datanode " + badNode );
16641692 excludedNodes .put (badNode , badNode );
1693+ setPipeline (null , null , null );
16651694 }
16661695 } while (!success && --count >= 0 );
16671696
16681697 if (!success ) {
16691698 throw new IOException ("Unable to create new block." );
16701699 }
1671- return lb ;
16721700 }
16731701
16741702 // connects to the first datanode in the pipeline
0 commit comments