Commit a665fe1
HDFS-17299. Adding rack failure tolerance when creating a new file (#6614)
1 parent 3b4fe79 commit a665fe1

7 files changed (+245, -25 lines)

hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java

Lines changed: 40 additions & 16 deletions
@@ -94,6 +94,7 @@
 import com.google.common.cache.RemovalListener;
 import com.google.common.cache.RemovalNotification;
 
+import com.google.common.collect.Iterables;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -706,8 +707,8 @@ public void run() {
 
         // get new block from namenode.
         if (stage == BlockConstructionStage.PIPELINE_SETUP_CREATE) {
-          LOG.debug("Allocating new block");
-          setPipeline(nextBlockOutputStream());
+          LOG.debug("Allocating new block: {}", this);
+          setupPipelineForCreate();
           initDataStreaming();
         } else if (stage == BlockConstructionStage.PIPELINE_SETUP_APPEND) {
           LOG.debug("Append to block {}", block);
@@ -1241,7 +1242,8 @@ private boolean processDatanodeError() throws IOException {
       streamerClosed = true;
       return false;
     }
-    boolean doSleep = setupPipelineForAppendOrRecovery();
+
+    setupPipelineForAppendOrRecovery();
 
     if (!streamerClosed && dfsClient.clientRunning) {
       if (stage == BlockConstructionStage.PIPELINE_CLOSE) {
@@ -1275,7 +1277,7 @@ private boolean processDatanodeError() throws IOException {
       }
     }
 
-    return doSleep;
+    return false;
   }
 
   void setHflush() {
@@ -1449,9 +1451,11 @@ private void transfer(final DatanodeInfo src, final DatanodeInfo[] targets,
    * it can be written to.
    * This happens when a file is appended or data streaming fails
    * It keeps on trying until a pipeline is setup
+   *
+   * Returns boolean whether pipeline was setup successfully or not.
+   * This boolean is used upstream on whether to continue creating pipeline or throw exception
    */
   private boolean setupPipelineForAppendOrRecovery() throws IOException {
-    // check number of datanodes
     if (nodes == null || nodes.length == 0) {
       String msg = "Could not get block locations. " + "Source file \""
           + src + "\" - Aborting...";
@@ -1463,23 +1467,35 @@ private boolean setupPipelineForAppendOrRecovery() throws IOException {
 
     boolean success = false;
     long newGS = 0L;
+    boolean isCreateStage = BlockConstructionStage.PIPELINE_SETUP_CREATE == stage;
     while (!success && !streamerClosed && dfsClient.clientRunning) {
       if (!handleRestartingDatanode()) {
         return false;
       }
 
-      final boolean isRecovery = errorState.hasError();
+      final boolean isRecovery = errorState.hasError() && !isCreateStage;
+
       if (!handleBadDatanode()) {
         return false;
       }
 
       handleDatanodeReplacement();
 
+      // During create stage, min replication should still be satisfied.
+      if (isCreateStage && !(dfsClient.dtpReplaceDatanodeOnFailureReplication > 0 &&
+          nodes.length >= dfsClient.dtpReplaceDatanodeOnFailureReplication)) {
+        return false;
+      }
+
       // get a new generation stamp and an access token
       final LocatedBlock lb = updateBlockForPipeline();
       newGS = lb.getBlock().getGenerationStamp();
       accessToken = lb.getBlockToken();
 
+      if (isCreateStage) {
+        block.setCurrentBlock(lb.getBlock());
+      }
+
       // set up the pipeline again with the remaining nodes
       success = createBlockOutputStream(nodes, storageTypes, newGS, isRecovery);
 
@@ -1491,7 +1507,7 @@ private boolean setupPipelineForAppendOrRecovery() throws IOException {
     if (success) {
       updatePipeline(newGS);
     }
-    return false; // do not sleep, continue processing
+    return success;
   }
 
   /**
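The create-stage guard added above stops in-place recovery once the surviving pipeline can no longer satisfy the client's minimum-replication floor (dtpReplaceDatanodeOnFailureReplication), returning false so the caller can abandon the block and ask the namenode for a fresh pipeline instead. A minimal, self-contained restatement of that predicate; the method name is illustrative, not an HDFS API:

/**
 * Illustrative restatement of the create-stage check in the hunk above.
 * Recovery may continue only if a minimum-replication floor is configured
 * (> 0) and the datanodes that survived the failure still meet it; the
 * diff negates this condition to return false and fall back to a
 * brand-new block allocation.
 */
static boolean createStageMayRecover(int survivingNodes, int minReplication) {
  return minReplication > 0 && survivingNodes >= minReplication;
}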
@@ -1629,17 +1645,18 @@ DatanodeInfo[] getExcludedNodes() {
    * Must get block ID and the IDs of the destinations from the namenode.
    * Returns the list of target datanodes.
    */
-  protected LocatedBlock nextBlockOutputStream() throws IOException {
+  protected void setupPipelineForCreate() throws IOException {
     LocatedBlock lb;
     DatanodeInfo[] nodes;
-    StorageType[] storageTypes;
+    StorageType[] nextStorageTypes;
     int count = dfsClient.getConf().getNumBlockWriteRetry();
     boolean success;
     final ExtendedBlock oldBlock = block.getCurrentBlock();
     do {
       errorState.reset();
       lastException.clear();
       success = false;
+      streamerClosed = false;
 
       DatanodeInfo[] excluded = getExcludedNodes();
       lb = locateFollowingBlock(
@@ -1649,26 +1666,33 @@ protected LocatedBlock nextBlockOutputStream() throws IOException {
       bytesSent = 0;
       accessToken = lb.getBlockToken();
       nodes = lb.getLocations();
-      storageTypes = lb.getStorageTypes();
-
-      // Connect to first DataNode in the list.
-      success = createBlockOutputStream(nodes, storageTypes, 0L, false);
-
+      nextStorageTypes = lb.getStorageTypes();
+      setPipeline(lb);
+      try {
+        // Connect to first DataNode in the list.
+        success = createBlockOutputStream(nodes, nextStorageTypes, 0L, false)
+            || setupPipelineForAppendOrRecovery();
+      } catch(IOException ie) {
+        LOG.warn("Exception in setupPipelineForCreate " + this, ie);
+        success = false;
+      }
       if (!success) {
         LOG.warn("Abandoning " + block);
         dfsClient.namenode.abandonBlock(block.getCurrentBlock(),
             stat.getFileId(), src, dfsClient.clientName);
         block.setCurrentBlock(null);
-        final DatanodeInfo badNode = nodes[errorState.getBadNodeIndex()];
+        final DatanodeInfo badNode = errorState.getBadNodeIndex() == -1
+            ? Iterables.getLast(failed)
+            : nodes[errorState.getBadNodeIndex()];
         LOG.warn("Excluding datanode " + badNode);
         excludedNodes.put(badNode, badNode);
+        setPipeline(null, null, null);
       }
     } while (!success && --count >= 0);
 
     if (!success) {
       throw new IOException("Unable to create new block.");
     }
-    return lb;
   }
 
   // connects to the first datanode in the pipeline
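Taken together, these hunks let a failed create fall back to the same pipeline-recovery machinery that appends already use, instead of abandoning the block on the first bad connect. The toy model below condenses that control flow; Cluster, Node, and the methods on them are illustrative stand-ins for the namenode and datanode interactions in DataStreamer, not HDFS APIs:

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/** Self-contained toy model of the create-stage retry loop introduced here. */
final class CreateRetrySketch {
  interface Node {}

  interface Cluster {
    List<Node> allocate(List<Node> excluded) throws IOException; // ~ locateFollowingBlock
    boolean connect(List<Node> pipeline) throws IOException;     // ~ createBlockOutputStream
    boolean recover(List<Node> pipeline) throws IOException;     // ~ setupPipelineForAppendOrRecovery
  }

  static List<Node> create(Cluster cluster, int retries) throws IOException {
    List<Node> excluded = new ArrayList<>();
    for (int attempt = 0; attempt <= retries; attempt++) {
      List<Node> pipeline = cluster.allocate(excluded);
      boolean success;
      try {
        // A failed first connect now falls through to recovery rather
        // than abandoning the block outright.
        success = cluster.connect(pipeline) || cluster.recover(pipeline);
      } catch (IOException ie) {
        success = false;
      }
      if (success) {
        return pipeline;
      }
      // Abandon this block: blame the last failed node and exclude it
      // before requesting a fresh pipeline from the namenode.
      if (!pipeline.isEmpty()) {
        excluded.add(pipeline.get(pipeline.size() - 1));
      }
    }
    throw new IOException("Unable to create new block.");
  }
}

The essential change is the connect-then-recover disjunction: only when both the initial pipeline and in-place recovery fail does the client abandon the block, exclude the blamed node, and go back to the namenode, which is what makes losing a rack during create survivable.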

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java

Lines changed: 4 additions & 1 deletion
@@ -212,7 +212,10 @@ class BlockReceiver implements Closeable {
     } else {
       switch (stage) {
       case PIPELINE_SETUP_CREATE:
-        replicaHandler = datanode.data.createRbw(storageType, block, allowLazyPersist);
+        replicaHandler = datanode.data.createRbw(storageType, block, allowLazyPersist, newGs);
+        if (newGs != 0L) {
+          block.setGenerationStamp(newGs);
+        }
         datanode.notifyNamenodeReceivingBlock(
             block, replicaHandler.getReplica().getStorageUuid());
         break;
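The extra newGs argument uses 0L as a sentinel: a fresh create passes 0 and keeps the old strict semantics, while a retried create carries the bumped generation stamp so the receiver adopts it before notifying the namenode. A compilable toy illustration of that convention; GenStampBlock is a stand-in for ExtendedBlock, not an HDFS type:

/** Toy stand-in for ExtendedBlock, illustrating the newGs convention above. */
final class GenStampBlock {
  private long generationStamp;

  GenStampBlock(long generationStamp) {
    this.generationStamp = generationStamp;
  }

  /** Mirrors the PIPELINE_SETUP_CREATE branch: 0L means a fresh create. */
  void applyPipelineSetupCreate(long newGs) {
    if (newGs != 0L) {
      // A retried create: adopt the bumped stamp so everything downstream
      // (namenode notification, replica metadata) sees the new GS.
      generationStamp = newGs;
    }
  }

  long getGenerationStamp() {
    return generationStamp;
  }
}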

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java

Lines changed: 11 additions & 1 deletion
@@ -333,6 +333,16 @@ ReplicaHandler createTemporary(StorageType storageType,
   ReplicaHandler createRbw(StorageType storageType,
       ExtendedBlock b, boolean allowLazyPersist) throws IOException;
 
+  /**
+   * Creates a RBW replica and returns the meta info of the replica
+   *
+   * @param b block
+   * @return the meta info of the replica which is being written to
+   * @throws IOException if an error occurs
+   */
+  ReplicaHandler createRbw(StorageType storageType,
+      ExtendedBlock b, boolean allowLazyPersist, long newGS) throws IOException;
+
   /**
    * Recovers a RBW replica and returns the meta info of the replica.
    *
@@ -466,7 +476,7 @@ void checkBlock(ExtendedBlock b, long minLength, ReplicaState state)
   boolean isValidRbw(ExtendedBlock b);
 
   /**
-   * Invalidates the specified blocks
+   * Invalidates the specified blocks.
    * @param bpid Block pool Id
    * @param invalidBlks - the blocks to be invalidated
    * @throws IOException
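The interface keeps the legacy three-argument createRbw alongside the new overload; in FsDatasetImpl (next file) the old signature simply delegates with newGS == 0L. Reduced to a hedged sketch with stand-in types, the pattern looks like this, using a default method for brevity where the real interface declares both methods abstractly:

import java.io.IOException;

/** Stand-in types; ReplicaHandle and Block are illustrative, not HDFS types. */
interface DatasetSketch {
  interface ReplicaHandle {}
  interface Block {}

  /** New overload: newGS != 0L permits replacing a stale replica on retry. */
  ReplicaHandle createRbw(Block b, boolean allowLazyPersist, long newGS)
      throws IOException;

  /** Legacy signature, unchanged for callers that never retry a create. */
  default ReplicaHandle createRbw(Block b, boolean allowLazyPersist)
      throws IOException {
    return createRbw(b, allowLazyPersist, 0L); // 0L == no generation-stamp bump
  }
}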

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java

Lines changed: 27 additions & 6 deletions
@@ -1472,13 +1472,29 @@ private void bumpReplicaGS(ReplicaInfo replicaInfo,
   public ReplicaHandler createRbw(
       StorageType storageType, ExtendedBlock b, boolean allowLazyPersist)
       throws IOException {
+    return createRbw(storageType, b, allowLazyPersist, 0L);
+  }
+
+  @Override // FsDatasetSpi
+  public ReplicaHandler createRbw(
+      StorageType storageType, ExtendedBlock b,
+      boolean allowLazyPersist, long newGS) throws IOException {
     try(AutoCloseableLock lock = datasetWriteLock.acquire()) {
       ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(),
           b.getBlockId());
       if (replicaInfo != null) {
-        throw new ReplicaAlreadyExistsException("Block " + b +
-            " already exists in state " + replicaInfo.getState() +
-            " and thus cannot be created.");
+        // In case of retries with same blockPoolId + blockId as before
+        // with updated GS, cleanup the old replica to avoid
+        // any multiple copies with same blockPoolId + blockId
+        if (newGS != 0L) {
+          cleanupReplica(replicaInfo, replicaInfo.getBlockFile(), replicaInfo.getMetaFile(),
+              replicaInfo.getBlockFile().length(), replicaInfo.getMetaFile().length(),
+              b.getBlockPoolId());
+        } else {
+          throw new ReplicaAlreadyExistsException("Block " + b +
+              " already exists in state " + replicaInfo.getState() +
+              " and thus cannot be created.");
+        }
       }
       // create a new block
       FsVolumeReference ref = null;
@@ -3198,16 +3214,21 @@ private void removeOldReplica(ReplicaInfo replicaInfo,
         newReplicaInfo.isOnTransientStorage());
 
     // Remove the old replicas
+    cleanupReplica(replicaInfo, blockFile, metaFile, blockFileUsed, metaFileUsed, bpid);
+
+    // If deletion failed then the directory scanner will cleanup the blocks
+    // eventually.
+  }
+
+  private void cleanupReplica(ReplicaInfo replicaInfo, File blockFile, File metaFile,
+      long blockFileUsed, long metaFileUsed, final String bpid) {
     if (blockFile.delete() || !blockFile.exists()) {
       FsVolumeImpl volume = (FsVolumeImpl) replicaInfo.getVolume();
       volume.onBlockFileDeletion(bpid, blockFileUsed);
       if (metaFile.delete() || !metaFile.exists()) {
         volume.onMetaFileDeletion(bpid, metaFileUsed);
       }
     }
-
-    // If deletion failed then the directory scanner will cleanup the blocks
-    // eventually.
   }
 
   class LazyWriter implements Runnable {
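Extracting cleanupReplica lets both the hot-swap path (removeOldReplica) and the new retried-create path in createRbw share one delete-and-account routine. A self-contained sketch of that routine; Volume is an illustrative stand-in for the accounting callbacks on FsVolumeImpl:

import java.io.File;

final class ReplicaCleanupSketch {
  /** Stand-in for the per-volume space accounting on FsVolumeImpl. */
  interface Volume {
    void onBlockFileDeletion(String bpid, long bytes);
    void onMetaFileDeletion(String bpid, long bytes);
  }

  static void cleanupReplica(Volume volume, String bpid,
      File blockFile, File metaFile, long blockBytes, long metaBytes) {
    // "delete() || !exists()" treats an already-missing file as success,
    // which keeps a retried cleanup idempotent. Nothing is thrown on
    // failure: the directory scanner reclaims leftovers eventually.
    if (blockFile.delete() || !blockFile.exists()) {
      volume.onBlockFileDeletion(bpid, blockBytes);
      if (metaFile.delete() || !metaFile.exists()) {
        volume.onMetaFileDeletion(bpid, metaBytes);
      }
    }
  }
}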
