Merged
Metrics.md
@@ -523,6 +523,7 @@ Each metrics record contains tags such as SessionId and Hostname as additional information
| `NumProcessedCommands` | Num of processed commands of all BPServiceActors |
| `ProcessedCommandsOpNumOps` | Total number of processed commands operations |
| `ProcessedCommandsOpAvgTime` | Average time of processed commands operations in milliseconds |
+| `NullStorageBlockReports` | Number of blocks in IBRs that failed due to null storage |

FsVolume
--------
BPOfferService.java
@@ -31,6 +31,7 @@
import org.apache.hadoop.hdfs.server.protocol.*;
import org.apache.hadoop.hdfs.server.protocol.BlockECReconstructionCommand.BlockECReconstructionInfo;
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo.BlockStatus;
+import org.apache.hadoop.thirdparty.com.google.common.base.Joiner;
import org.apache.hadoop.util.Lists;
import org.apache.hadoop.util.Sets;

@@ -324,6 +325,12 @@ private void notifyNamenodeBlock(ExtendedBlock block, BlockStatus status,
final ReceivedDeletedBlockInfo info = new ReceivedDeletedBlockInfo(
block.getLocalBlock(), status, delHint);
final DatanodeStorage storage = dn.getFSDataset().getStorage(storageUuid);
+if (storage == null) {
+  LOG.warn("Trying to add RDBI for null storage UUID {}. Trace: {}", storageUuid,
+      Joiner.on("\n").join(Thread.currentThread().getStackTrace()));
+  getDataNode().getMetrics().incrNullStorageBlockReports();
+  return;
+}

for (BPServiceActor actor : bpServices) {
actor.getIbrManager().notifyNamenodeBlock(info, storage,
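Why the new guard is needed: getStorage(storageUuid) can legitimately return null when the volume backing that UUID disappears between DirectoryScanner.scan() and reconcile(). A condensed sketch of the failing path this patch closes, assembled from the FsDatasetImpl hunk later in this PR (no new behavior is introduced here):

// 1. DirectoryScanner.scan() records a diff entry for a block on volume V.
// 2. V fails; DataNode.handleVolumeFailures() removes V's storage from the dataset.
// 3. FsDatasetImpl.checkAndUpdate() still walks the stale diff and reports the block:
datanode.notifyNamenodeReceivedBlock(
    new ExtendedBlock(bpid, diskBlockInfo), null,
    vol.getStorageID(), vol.isTransientStorage());
// 4. BPOfferService.notifyNamenodeBlock() then sees getStorage(storageUuid) == null;
//    without the guard a null DatanodeStorage would reach the IBR manager. With it,
//    the report is dropped, a WARN with a stack trace is logged, and the
//    NullStorageBlockReports counter is incremented.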
DataNode.java
@@ -4057,7 +4057,8 @@ public void checkDiskError() throws IOException {
}
}

-private void handleVolumeFailures(Set<FsVolumeSpi> unhealthyVolumes) {
+@VisibleForTesting
+public void handleVolumeFailures(Set<FsVolumeSpi> unhealthyVolumes) {
if (unhealthyVolumes.isEmpty()) {
LOG.debug("handleVolumeFailures done with empty " +
"unhealthyVolumes");
DataNodeFaultInjector.java
@@ -172,4 +172,10 @@ public void delayDiffRecord() {}
* Just delay getMetaDataInputStream a while.
*/
public void delayGetMetaDataInputStream() {}

+/**
+ * Used in {@link DirectoryScanner#reconcile()} to wait until a storage is removed,
+ * leaving a stale copy of {@link DirectoryScanner#diffs}.
+ */
+public void waitUntilStorageRemoved() {}
}
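Tests exercise this hook by swapping in their own injector and restoring the default afterwards. A minimal sketch of the idiom, mirroring the testNullStorage case added at the bottom of this PR (the body comments are illustrative placeholders, not patch code):

DataNodeFaultInjector previous = DataNodeFaultInjector.get();
DataNodeFaultInjector.set(new DataNodeFaultInjector() {
  @Override
  public void waitUntilStorageRemoved() {
    // trigger the racy event here, e.g. fail a volume so that its storage
    // is removed between DirectoryScanner.scan() and checkAndUpdate()
  }
});
try {
  // run the code under test, e.g. DirectoryScanner#reconcile()
} finally {
  DataNodeFaultInjector.set(previous); // always restore the default injector
}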
DirectoryScanner.java
@@ -466,7 +466,7 @@ void shutdown() {
public void reconcile() throws IOException {
LOG.debug("reconcile start DirectoryScanning");
scan();

+DataNodeFaultInjector.get().waitUntilStorageRemoved();
// HDFS-14476: run checkAndUpdate with batch to avoid holding the lock too
// long
int loopCount = 0;
FsDatasetImpl.java
@@ -2745,8 +2745,12 @@ public void checkAndUpdate(String bpid, ScanInfo scanInfo)
curDirScannerNotifyCount = 0;
lastDirScannerNotifyTime = startTimeMs;
}
-try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.VOLUME, bpid,
-    vol.getStorageID())) {
+String storageUuid = vol.getStorageID();
+try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.VOLUME, bpid, storageUuid)) {
+  if (!storageMap.containsKey(storageUuid)) {
+    // Storage was already removed
+    return;
+  }
memBlockInfo = volumeMap.get(bpid, blockId);
if (memBlockInfo != null &&
memBlockInfo.getState() != ReplicaState.FINALIZED) {
@@ -2833,7 +2837,7 @@ public void checkAndUpdate(String bpid, ScanInfo scanInfo)
maxDirScannerNotifyCount++;
datanode.notifyNamenodeReceivedBlock(
new ExtendedBlock(bpid, diskBlockInfo), null,
-    vol.getStorageID(), vol.isTransientStorage());
+    storageUuid, vol.isTransientStorage());
}
if (vol.isTransientStorage()) {
long lockedBytesReserved =
DataNodeMetrics.java
@@ -185,6 +185,8 @@ public class DataNodeMetrics {
private MutableCounterLong numProcessedCommands;
@Metric("Rate of processed commands of all BPServiceActors")
private MutableRate processedCommandsOp;
@Metric("Number of blocks in IBRs that failed due to null storage")
private MutableCounterLong nullStorageBlockReports;

// FsDatasetImpl local file process metrics.
@Metric private MutableRate createRbwOp;
@@ -812,4 +814,7 @@ public void incrReplaceBlockOpToOtherHost() {
replaceBlockOpToOtherHost.incr();
}

+public void incrNullStorageBlockReports() {
+  nullStorageBlockReports.incr();
+}
}
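A possible way to assert the new counter in a test, sketched with the stock org.apache.hadoop.test.MetricsAsserts helpers (this snippet is not part of the patch; `dn` is an assumed handle to the DataNode under test):

import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
import static org.apache.hadoop.test.MetricsAsserts.getMetrics;

import org.apache.hadoop.metrics2.MetricsRecordBuilder;

// 'dn' is an assumed DataNode reference; after driving an IBR whose
// storage UUID no longer resolves, the counter should have moved:
MetricsRecordBuilder rb = getMetrics(dn.getMetrics().name());
assertCounter("NullStorageBlockReports", 1L, rb);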
TestBPOfferService.java
@@ -136,6 +136,7 @@ public class TestBPOfferService {
private FsDatasetSpi<?> mockFSDataset;
private DataSetLockManager dataSetLockManager = new DataSetLockManager();
private boolean isSlownode;
+private String mockStorageID;

@Before
public void setupMocks() throws Exception {
@@ -157,6 +158,7 @@ public void setupMocks() throws Exception {
// Set up a simulated dataset with our fake BP
mockFSDataset = Mockito.spy(new SimulatedFSDataset(null, conf));
mockFSDataset.addBlockPool(FAKE_BPID, conf);
+mockStorageID = ((SimulatedFSDataset) mockFSDataset).getStorages().get(0).getStorageUuid();

// Wire the dataset to the DN.
Mockito.doReturn(mockFSDataset).when(mockDn).getFSDataset();
@@ -289,7 +291,7 @@ public void testBasicFunctionality() throws Exception {
waitForBlockReport(mockNN2);

// When we receive a block, it should report it to both NNs
bpos.notifyNamenodeReceivedBlock(FAKE_BLOCK, null, "", false);
bpos.notifyNamenodeReceivedBlock(FAKE_BLOCK, null, mockStorageID, false);

ReceivedDeletedBlockInfo[] ret = waitForBlockReceived(FAKE_BLOCK, mockNN1);
assertEquals(1, ret.length);
@@ -1099,7 +1101,7 @@ public void testRefreshNameNodes() throws Exception {
waitForBlockReport(mockNN2);

// When we receive a block, it should report it to both NNs
-bpos.notifyNamenodeReceivedBlock(FAKE_BLOCK, null, "", false);
+bpos.notifyNamenodeReceivedBlock(FAKE_BLOCK, null, mockStorageID, false);

ReceivedDeletedBlockInfo[] ret = waitForBlockReceived(FAKE_BLOCK,
mockNN1);
@@ -1140,7 +1142,7 @@ public void testRefreshNameNodes() throws Exception {
Mockito.verify(mockNN3).registerDatanode(Mockito.any());

// When we receive a block, it should report it to both NNs
-bpos.notifyNamenodeReceivedBlock(FAKE_BLOCK, null, "", false);
+bpos.notifyNamenodeReceivedBlock(FAKE_BLOCK, null, mockStorageID, false);

// verify new NN received block report
ret = waitForBlockReceived(FAKE_BLOCK, mockNN3);
TestDirectoryScanner.java
@@ -37,9 +37,11 @@
import java.nio.channels.FileChannel;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
+import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
@@ -1420,4 +1422,50 @@ private void writeFile(FileSystem fs, int numFiles) throws IOException {
DFSTestUtil.createFile(fs, filePath, 1, (short) 1, 0);
}
}

+@Test(timeout = 30000)
+public void testNullStorage() throws Exception {
+  DataNodeFaultInjector oldInjector = DataNodeFaultInjector.get();
+
+  Configuration conf = getConfiguration();
+  conf.setInt(DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY, 1);
+  cluster = new MiniDFSCluster.Builder(conf).build();
+  try {
+    cluster.waitActive();
+    bpid = cluster.getNamesystem().getBlockPoolId();
+    fds = DataNodeTestUtils.getFSDataset(cluster.getDataNodes().get(0));
+    client = cluster.getFileSystem().getClient();
+    conf.setInt(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THREADS_KEY, 1);
+    createFile(GenericTestUtils.getMethodName(), BLOCK_LENGTH, false);
+    // Make sure checkAndUpdate will run
+    truncateBlockFile();
+
+    // Mock a volume corruption after DirectoryScanner.scan() but before checkAndUpdate()
+    FsVolumeImpl volumeToRemove = fds.getVolumeList().get(0);
+    DataNodeFaultInjector injector = new DataNodeFaultInjector() {
+      @Override
+      public void waitUntilStorageRemoved() {
+        Set<FsVolumeSpi> volumesToRemove = new HashSet<>();
+        volumesToRemove.add(volumeToRemove);
+        cluster.getDataNodes().get(0).handleVolumeFailures(volumesToRemove);
+      }
+    };
+    DataNodeFaultInjector.set(injector);
+
+    GenericTestUtils.LogCapturer logCapturer =
+        GenericTestUtils.LogCapturer.captureLogs(DataNode.LOG);
+    scanner = new DirectoryScanner(fds, conf);
+    scanner.setRetainDiffs(true);
+    scanner.reconcile();
+    assertFalse(logCapturer.getOutput()
+        .contains("Trying to add RDBI for null storage UUID " + volumeToRemove.getStorageID()));
+  } finally {
+    if (scanner != null) {
+      scanner.shutdown();
+      scanner = null;
+    }
+    cluster.shutdown();
+    DataNodeFaultInjector.set(oldInjector);
+  }
+}
}
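To run just the new case from a standard Hadoop dev checkout, the usual surefire filter should work: mvn test -Dtest=TestDirectoryScanner#testNullStorage, executed in hadoop-hdfs-project/hadoop-hdfs.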