@@ -94,6 +94,7 @@
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;

import com.google.common.collect.Iterables;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -706,8 +707,8 @@ public void run() {

// get new block from namenode.
if (stage == BlockConstructionStage.PIPELINE_SETUP_CREATE) {
LOG.debug("Allocating new block");
setPipeline(nextBlockOutputStream());
LOG.debug("Allocating new block: {}", this);
setupPipelineForCreate();
initDataStreaming();
} else if (stage == BlockConstructionStage.PIPELINE_SETUP_APPEND) {
LOG.debug("Append to block {}", block);
@@ -1241,7 +1242,8 @@ private boolean processDatanodeError() throws IOException {
streamerClosed = true;
return false;
}
boolean doSleep = setupPipelineForAppendOrRecovery();

setupPipelineForAppendOrRecovery();

if (!streamerClosed && dfsClient.clientRunning) {
if (stage == BlockConstructionStage.PIPELINE_CLOSE) {
@@ -1275,7 +1277,7 @@ private boolean processDatanodeError() throws IOException {
}
}

return doSleep;
return false;
}

void setHflush() {
@@ -1449,9 +1451,11 @@ private void transfer(final DatanodeInfo src, final DatanodeInfo[] targets,
* it can be written to.
* This happens when a file is appended or data streaming fails
* It keeps on trying until a pipeline is setup
*
* Returns boolean whether pipeline was setup successfully or not.
* This boolean is used upstream on whether to continue creating pipeline or throw exception
*/
private boolean setupPipelineForAppendOrRecovery() throws IOException {
// check number of datanodes
if (nodes == null || nodes.length == 0) {
String msg = "Could not get block locations. " + "Source file \""
+ src + "\" - Aborting...";
@@ -1463,23 +1467,35 @@ private boolean setupPipelineForAppendOrRecovery() throws IOException {

boolean success = false;
long newGS = 0L;
boolean isCreateStage = BlockConstructionStage.PIPELINE_SETUP_CREATE == stage;
while (!success && !streamerClosed && dfsClient.clientRunning) {
if (!handleRestartingDatanode()) {
return false;
}

final boolean isRecovery = errorState.hasError();
final boolean isRecovery = errorState.hasError() && !isCreateStage;

if (!handleBadDatanode()) {
return false;
}

handleDatanodeReplacement();

// During create stage, min replication should still be satisfied.
if (isCreateStage && !(dfsClient.dtpReplaceDatanodeOnFailureReplication > 0 &&
nodes.length >= dfsClient.dtpReplaceDatanodeOnFailureReplication)) {
return false;
}

// get a new generation stamp and an access token
final LocatedBlock lb = updateBlockForPipeline();
newGS = lb.getBlock().getGenerationStamp();
accessToken = lb.getBlockToken();

if (isCreateStage) {
block.setCurrentBlock(lb.getBlock());
}

// set up the pipeline again with the remaining nodes
success = createBlockOutputStream(nodes, storageTypes, newGS, isRecovery);

@@ -1491,7 +1507,7 @@ private boolean setupPipelineForAppendOrRecovery() throws IOException {
if (success) {
updatePipeline(newGS);
}
return false; // do not sleep, continue processing
return success;
}

/**
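The two hunks above carry the key client-side change: the recovery loop that previously served only append and mid-stream failures now also handles a failed create. When `isCreateStage` is true, the datanode connection is not marked as a recovery (the datanodes hold no data for this block yet), the loop bails out early if the surviving nodes can no longer satisfy `dtpReplaceDatanodeOnFailureReplication`, and the client refreshes its current block from the namenode's `updateBlockForPipeline` reply. The method also reports its outcome (`return success`) instead of always returning `false`; correspondingly, `processDatanodeError` above no longer treats the return value as a sleep hint. A self-contained sketch of the branching, with hypothetical stand-in types and a `minReplication` field standing in for the real configuration value:

```java
// Self-contained sketch of the create-aware recovery loop. All types are
// hypothetical stand-ins; only the branching mirrors the patch.
import java.util.Arrays;

public class RecoveryLoopSketch {
  enum Stage { PIPELINE_SETUP_CREATE, PIPELINE_SETUP_APPEND }

  Stage stage = Stage.PIPELINE_SETUP_CREATE;
  String[] nodes = { "dn1", "dn2" };
  int minReplication = 1;   // cf. dtpReplaceDatanodeOnFailureReplication
  boolean hasError = true;

  boolean setupPipeline() {
    boolean isCreateStage = stage == Stage.PIPELINE_SETUP_CREATE;
    boolean success = false;
    while (!success) {
      // A failed *create* is retried as a fresh setup, not a recovery:
      // the datanodes have no data for this block yet.
      boolean isRecovery = hasError && !isCreateStage;

      // During create, give up early if the surviving pipeline can no
      // longer satisfy the configured minimum replication.
      if (isCreateStage && !(minReplication > 0
          && nodes.length >= minReplication)) {
        return false;
      }

      long newGS = nextGenerationStamp();  // namenode bumps the GS
      success = connect(nodes, newGS, isRecovery);
      if (!success) {
        nodes = Arrays.copyOfRange(nodes, 1, nodes.length); // drop bad node
        if (nodes.length == 0) return false;
      }
    }
    return true;
  }

  long nextGenerationStamp() { return System.nanoTime(); }

  boolean connect(String[] pipeline, long gs, boolean recovery) {
    return pipeline.length < 2; // pretend dn1 is bad: succeed once dropped
  }

  public static void main(String[] args) {
    System.out.println(new RecoveryLoopSketch().setupPipeline()); // true
  }
}
```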
@@ -1629,17 +1645,18 @@ DatanodeInfo[] getExcludedNodes() {
* Must get block ID and the IDs of the destinations from the namenode.
* Returns the list of target datanodes.
*/
protected LocatedBlock nextBlockOutputStream() throws IOException {
protected void setupPipelineForCreate() throws IOException {
LocatedBlock lb;
DatanodeInfo[] nodes;
StorageType[] storageTypes;
StorageType[] nextStorageTypes;
int count = dfsClient.getConf().getNumBlockWriteRetry();
boolean success;
final ExtendedBlock oldBlock = block.getCurrentBlock();
do {
errorState.reset();
lastException.clear();
success = false;
streamerClosed = false;

DatanodeInfo[] excluded = getExcludedNodes();
lb = locateFollowingBlock(
@@ -1649,26 +1666,33 @@ protected LocatedBlock nextBlockOutputStream() throws IOException {
bytesSent = 0;
accessToken = lb.getBlockToken();
nodes = lb.getLocations();
storageTypes = lb.getStorageTypes();

// Connect to first DataNode in the list.
success = createBlockOutputStream(nodes, storageTypes, 0L, false);

nextStorageTypes = lb.getStorageTypes();
setPipeline(lb);
try {
// Connect to first DataNode in the list.
success = createBlockOutputStream(nodes, nextStorageTypes, 0L, false)
|| setupPipelineForAppendOrRecovery();
} catch(IOException ie) {
LOG.warn("Exception in setupPipelineForCreate " + this, ie);
success = false;
}
if (!success) {
LOG.warn("Abandoning " + block);
dfsClient.namenode.abandonBlock(block.getCurrentBlock(),
stat.getFileId(), src, dfsClient.clientName);
block.setCurrentBlock(null);
final DatanodeInfo badNode = nodes[errorState.getBadNodeIndex()];
final DatanodeInfo badNode = errorState.getBadNodeIndex() == -1
? Iterables.getLast(failed)
: nodes[errorState.getBadNodeIndex()];
LOG.warn("Excluding datanode " + badNode);
excludedNodes.put(badNode, badNode);
setPipeline(null, null, null);
}
} while (!success && --count >= 0);

if (!success) {
throw new IOException("Unable to create new block.");
}
return lb;
}

// connects to the first datanode in the pipeline
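`nextBlockOutputStream()` is renamed to `setupPipelineForCreate()` and now returns `void`: it installs the pipeline itself via `setPipeline(lb)`, and when the first connection fails it tries `setupPipelineForAppendOrRecovery()` before abandoning the block. Note the bad-node bookkeeping: if `errorState.getBadNodeIndex()` is `-1`, the failure came from the recovery path, so the node to exclude is taken from the tail of the `failed` list with the newly imported `Iterables.getLast`. A hedged sketch of the outer retry shape; the names below (`maxRetries`, `recoverPipeline`, and so on) are stand-ins, with `maxRetries` playing the role of `getNumBlockWriteRetry()`:

```java
// Self-contained sketch of the create retry loop; hypothetical stand-ins.
import java.util.ArrayDeque;
import java.util.Deque;

public class CreateRetrySketch {
  int maxRetries = 3;                        // cf. getNumBlockWriteRetry()
  Deque<String> failed = new ArrayDeque<>(); // nodes failed during recovery
  int attempts = 0;

  void setupPipelineForCreate() throws Exception {
    boolean success = false;
    int count = maxRetries;
    do {
      String[] nodes = locateFollowingBlock(); // ask namenode for a block
      try {
        // First try a plain connect; if that fails, fall back to the
        // append/recovery path instead of abandoning immediately.
        success = connectToFirstDatanode(nodes) || recoverPipeline(nodes);
      } catch (Exception e) {
        success = false;
      }
      if (!success) {
        abandonBlock();
        // Exclude whichever node is known bad: the one the connect attempt
        // flagged, or else the last one the recovery path failed on.
        String badNode = failed.isEmpty() ? nodes[0] : failed.peekLast();
        exclude(badNode);
      }
    } while (!success && --count >= 0);
    if (!success) throw new Exception("Unable to create new block.");
  }

  String[] locateFollowingBlock() { return new String[] { "dn" + attempts }; }
  boolean connectToFirstDatanode(String[] nodes) { return ++attempts > 2; }
  boolean recoverPipeline(String[] nodes) { failed.add(nodes[0]); return false; }
  void abandonBlock() { }
  void exclude(String node) { }

  public static void main(String[] args) throws Exception {
    new CreateRetrySketch().setupPipelineForCreate(); // succeeds on 3rd try
  }
}
```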
@@ -212,7 +212,10 @@ class BlockReceiver implements Closeable {
} else {
switch (stage) {
case PIPELINE_SETUP_CREATE:
replicaHandler = datanode.data.createRbw(storageType, block, allowLazyPersist);
replicaHandler = datanode.data.createRbw(storageType, block, allowLazyPersist, newGs);
if (newGs != 0L) {
block.setGenerationStamp(newGs);
}
datanode.notifyNamenodeReceivingBlock(
block, replicaHandler.getReplica().getStorageUuid());
break;
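On the datanode side, `BlockReceiver` forwards the client's bumped generation stamp (`newGs`) into `createRbw` during `PIPELINE_SETUP_CREATE`, and applies it to the in-memory block once the replica exists. The sketch below illustrates the convention this relies on, with `0L` meaning a plain first attempt; the types are stand-ins:

```java
// Sketch of the newGs convention assumed by the patch; stand-in types.
public class GenStampSketch {
  static final long NO_GS = 0L; // 0 means "first attempt, no stale replica"

  static long receiveCreate(long currentGs, long newGs) {
    // createRbw(..., newGs) may replace a stale replica when newGs != 0;
    // the receiver then adopts the newer generation stamp.
    return (newGs != NO_GS) ? newGs : currentGs;
  }

  public static void main(String[] args) {
    System.out.println(receiveCreate(100L, 0L));   // 100: plain create
    System.out.println(receiveCreate(100L, 101L)); // 101: retried create
  }
}
```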
@@ -333,6 +333,16 @@ ReplicaHandler createTemporary(StorageType storageType,
ReplicaHandler createRbw(StorageType storageType,
ExtendedBlock b, boolean allowLazyPersist) throws IOException;

/**
* Creates a RBW replica and returns the meta info of the replica
*
* @param b block
* @return the meta info of the replica which is being written to
* @throws IOException if an error occurs
*/
ReplicaHandler createRbw(StorageType storageType,
ExtendedBlock b, boolean allowLazyPersist, long newGS) throws IOException;

/**
* Recovers a RBW replica and returns the meta info of the replica.
*
@@ -466,7 +476,7 @@ void checkBlock(ExtendedBlock b, long minLength, ReplicaState state)
boolean isValidRbw(ExtendedBlock b);

/**
* Invalidates the specified blocks
* Invalidates the specified blocks.
* @param bpid Block pool Id
* @param invalidBlks - the blocks to be invalidated
* @throws IOException
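Since `FsDatasetSpi` is an extension point, the new four-argument `createRbw` must be provided by every implementation. The natural migration is to delegate the old overload with `newGS == 0`, exactly as `FsDatasetImpl` does in the next file. A trimmed-down, hypothetical version of the interface showing that delegation; this is not the real `FsDatasetSpi`:

```java
// Hypothetical, trimmed-down SPI showing the delegation pattern an
// implementation can use; not the real FsDatasetSpi.
import java.io.IOException;

interface MiniDatasetSpi {
  Object createRbw(String block, boolean lazyPersist) throws IOException;
  Object createRbw(String block, boolean lazyPersist, long newGS)
      throws IOException;
}

class MiniDataset implements MiniDatasetSpi {
  @Override
  public Object createRbw(String block, boolean lazyPersist)
      throws IOException {
    // The old overload keeps its exact pre-patch semantics: newGS == 0
    // means "never clean up an existing replica, fail instead".
    return createRbw(block, lazyPersist, 0L);
  }

  @Override
  public Object createRbw(String block, boolean lazyPersist, long newGS)
      throws IOException {
    return new Object(); // real code would allocate an RBW replica here
  }
}
```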
@@ -1472,13 +1472,29 @@ private void bumpReplicaGS(ReplicaInfo replicaInfo,
public ReplicaHandler createRbw(
StorageType storageType, ExtendedBlock b, boolean allowLazyPersist)
throws IOException {
return createRbw(storageType, b, allowLazyPersist, 0L);
}

@Override // FsDatasetSpi
public ReplicaHandler createRbw(
StorageType storageType, ExtendedBlock b,
boolean allowLazyPersist, long newGS) throws IOException {
try(AutoCloseableLock lock = datasetWriteLock.acquire()) {
ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(),
b.getBlockId());
if (replicaInfo != null) {
throw new ReplicaAlreadyExistsException("Block " + b +
" already exists in state " + replicaInfo.getState() +
" and thus cannot be created.");
// In case of retries with same blockPoolId + blockId as before
// with updated GS, cleanup the old replica to avoid
// any multiple copies with same blockPoolId + blockId
if (newGS != 0L) {
cleanupReplica(replicaInfo, replicaInfo.getBlockFile(), replicaInfo.getMetaFile(),
replicaInfo.getBlockFile().length(), replicaInfo.getMetaFile().length(),
b.getBlockPoolId());
} else {
throw new ReplicaAlreadyExistsException("Block " + b +
" already exists in state " + replicaInfo.getState() +
" and thus cannot be created.");
}
}
// create a new block
FsVolumeReference ref = null;
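This is the heart of the datanode-side fix: when a create is retried with a bumped generation stamp (`newGS != 0`), an existing replica under the same block pool ID and block ID is treated as leftover from the failed attempt and cleaned up instead of raising `ReplicaAlreadyExistsException`, so two on-disk copies never share a block ID. A self-contained sketch of just that decision, with stand-in types; the real code additionally holds the dataset write lock, as the hunk above shows:

```java
// Stand-in sketch of the "retry with newer GS" decision in createRbw.
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

public class RbwRetrySketch {
  // volumeMap stand-in: (blockPoolId + blockId) -> replica's current GS
  final Map<String, Long> replicas = new HashMap<>();

  void createRbw(String key, long newGS) throws IOException {
    Long existing = replicas.get(key);
    if (existing != null) {
      if (newGS != 0L) {
        // Retried create: drop the stale replica so we never keep two
        // copies under the same blockPoolId + blockId.
        replicas.remove(key);
      } else {
        throw new IOException("Block " + key + " already exists"
            + " and thus cannot be created.");
      }
    }
    replicas.put(key, newGS);
  }

  public static void main(String[] args) throws IOException {
    RbwRetrySketch d = new RbwRetrySketch();
    d.createRbw("bp-1:blk_42", 0L); // first attempt
    d.createRbw("bp-1:blk_42", 7L); // retry with bumped GS: replaces
    System.out.println(d.replicas); // {bp-1:blk_42=7}
  }
}
```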
@@ -3198,16 +3214,21 @@ private void removeOldReplica(ReplicaInfo replicaInfo,
newReplicaInfo.isOnTransientStorage());

// Remove the old replicas
cleanupReplica(replicaInfo, blockFile, metaFile, blockFileUsed, metaFileUsed, bpid);

// If deletion failed then the directory scanner will cleanup the blocks
// eventually.
}

private void cleanupReplica(ReplicaInfo replicaInfo, File blockFile, File metaFile,
long blockFileUsed, long metaFileUsed, final String bpid) {
if (blockFile.delete() || !blockFile.exists()) {
FsVolumeImpl volume = (FsVolumeImpl) replicaInfo.getVolume();
volume.onBlockFileDeletion(bpid, blockFileUsed);
if (metaFile.delete() || !metaFile.exists()) {
volume.onMetaFileDeletion(bpid, metaFileUsed);
}
}

// If deletion failed then the directory scanner will cleanup the blocks
// eventually.
}

class LazyWriter implements Runnable {
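`cleanupReplica` is factored out of `removeOldReplica` so the retry path in `createRbw` can reuse it. The idiom is best-effort deletion: credit the freed space to the volume only once a file is confirmed gone, and leave anything undeletable for the directory scanner to reap later. A small stand-alone sketch of that shape, with a `freed` counter standing in for the volume accounting callbacks:

```java
// Sketch of the best-effort deletion idiom used by cleanupReplica:
// delete, and only credit freed space once the file is really gone.
import java.io.File;
import java.io.IOException;

public class CleanupSketch {
  long freed = 0; // stands in for the volume's space accounting

  void cleanupReplica(File blockFile, File metaFile,
      long blockUsed, long metaUsed) {
    // A false return from delete() is acceptable if the file no longer
    // exists; a file that survives is left for the directory scanner.
    if (blockFile.delete() || !blockFile.exists()) {
      freed += blockUsed;  // cf. onBlockFileDeletion
      if (metaFile.delete() || !metaFile.exists()) {
        freed += metaUsed; // cf. onMetaFileDeletion
      }
    }
  }

  public static void main(String[] args) throws IOException {
    File blk = File.createTempFile("blk", ".data");
    File meta = File.createTempFile("blk", ".meta");
    CleanupSketch s = new CleanupSketch();
    s.cleanupReplica(blk, meta, 128, 16);
    System.out.println(s.freed); // 144 when both deletions succeed
  }
}
```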