Commit 86c9862 (parent: 1c0d18f)
Author: Lei Xu

HDFS-10636. Modify ReplicaInfo to remove the assumption that replica metadata and data are stored in java.io.File. (Virajith Jalaparti via lei)

41 files changed: +2219 -1308 lines
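
At a high level, the commit replaces java.io.File accessors on ReplicaInfo with URI-based ones, so replica data and metadata need not live on a local filesystem. A minimal caller-side sketch, assuming only the getBlockURI() accessor that appears in the diffs below; the resolveLocalFile helper and its null-return convention are illustrative, not part of the commit:

  import java.io.File;
  import java.net.URI;

  // Illustrative helper: resolve a replica's data to a local File only
  // when the backing URI is actually a file: URI.
  static File resolveLocalFile(URI blockURI) {
    try {
      // new File(URI) throws IllegalArgumentException for URIs that
      // are not absolute file: URIs (e.g. a remotely backed replica).
      return new File(blockURI);
    } catch (IllegalArgumentException e) {
      return null; // callers must cope with replicas lacking a local file
    }
  }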

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java (+15 -1)

@@ -20,6 +20,7 @@
 
 import java.io.File;
 import java.io.IOException;
+import java.net.URI;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -741,7 +742,20 @@ public boolean isTrashAllowed(File blockFile) {
    *
    * @return the trash directory for a given block file that is being deleted.
    */
-  public String getTrashDirectory(File blockFile) {
+  public String getTrashDirectory(ReplicaInfo info) {
+
+    URI blockURI = info.getBlockURI();
+    try {
+      File blockFile = new File(blockURI);
+      return getTrashDirectory(blockFile);
+    } catch (IllegalArgumentException e) {
+      LOG.warn("Failed to get block file for replica " + info, e);
+    }
+
+    return null;
+  }
+
+  private String getTrashDirectory(File blockFile) {
     if (isTrashAllowed(blockFile)) {
       Matcher matcher = BLOCK_POOL_CURRENT_PATH_PATTERN.matcher(blockFile.getParent());
       String trashDirectory = matcher.replaceFirst("$1$2" + TRASH_ROOT_DIR + "$4");
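
A usage sketch for the new entry point: trash lookup now takes the replica itself rather than its block file, so a replica that is not File-backed simply yields null. The bpStorage and replicaInfo variables below are illustrative, not from the commit:

  // During a rolling upgrade, finalized blocks are moved to trash
  // rather than deleted; a replica whose data is not a local file
  // returns null here and must be handled separately.
  String trash = bpStorage.getTrashDirectory(replicaInfo);
  if (trash != null) {
    // move the block and its meta file under the trash directory
  }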

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java (+1 -1)

@@ -121,7 +121,7 @@ class BlockReceiver implements Closeable {
   /** the block to receive */
   private final ExtendedBlock block;
   /** the replica to write */
-  private ReplicaInPipelineInterface replicaInfo;
+  private ReplicaInPipeline replicaInfo;
   /** pipeline stage */
   private final BlockConstructionStage stage;
   private final boolean isTransfer;

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java (+4 -3)

@@ -38,6 +38,7 @@
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
 import org.apache.hadoop.hdfs.util.DataTransferThrottler;
@@ -248,8 +249,8 @@ class BlockSender implements java.io.Closeable {
       }
       // if there is a write in progress
       ChunkChecksum chunkChecksum = null;
-      if (replica instanceof ReplicaBeingWritten) {
-        final ReplicaBeingWritten rbw = (ReplicaBeingWritten)replica;
+      if (replica.getState() == ReplicaState.RBW) {
+        final ReplicaInPipeline rbw = (ReplicaInPipeline) replica;
         waitForMinLength(rbw, startOffset + length);
         chunkChecksum = rbw.getLastChecksumAndDataLen();
       }
@@ -473,7 +474,7 @@ private static Replica getReplica(ExtendedBlock block, DataNode datanode)
    * @param len minimum length to reach
    * @throws IOException on failing to reach the len in given wait time
    */
-  private static void waitForMinLength(ReplicaBeingWritten rbw, long len)
+  private static void waitForMinLength(ReplicaInPipeline rbw, long len)
       throws IOException {
     // Wait for 3 seconds for rbw replica to reach the minimum length
     for (int i = 0; i < 30 && rbw.getBytesOnDisk() < len; i++) {
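
The pattern in the second hunk recurs throughout the commit: dispatch on the ReplicaState enum and cast to the ReplicaInPipeline interface instead of testing instanceof against the concrete File-backed ReplicaBeingWritten class. A hedged restatement, assuming (as the diff implies) that every RBW replica implements ReplicaInPipeline:

  // State-based dispatch keeps BlockSender independent of any concrete
  // replica implementation, File-backed or otherwise.
  if (replica.getState() == ReplicaState.RBW) {
    ReplicaInPipeline rbw = (ReplicaInPipeline) replica;
    waitForMinLength(rbw, startOffset + length); // wait for bytes on disk
    chunkChecksum = rbw.getLastChecksumAndDataLen();
  }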

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java (+1 -1)

@@ -3474,4 +3474,4 @@ public String getDiskBalancerSetting(String key) throws IOException {
   void setBlockScanner(BlockScanner blockScanner) {
     this.blockScanner = blockScanner;
   }
-}
+}

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java (+1 -1)

@@ -56,6 +56,6 @@ public void noRegistration() throws IOException { }
 
   public void failMirrorConnection() throws IOException { }
 
-  public void failPipeline(ReplicaInPipelineInterface replicaInfo,
+  public void failPipeline(ReplicaInPipeline replicaInfo,
       String mirrorAddr) throws IOException { }
 }

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java (+2 -2)

@@ -204,9 +204,9 @@ public void clearRollingUpgradeMarker(String bpid) throws IOException {
    * @return trash directory if rolling upgrade is in progress, null
    *         otherwise.
    */
-  public String getTrashDirectoryForBlockFile(String bpid, File blockFile) {
+  public String getTrashDirectoryForReplica(String bpid, ReplicaInfo info) {
     if (trashEnabledBpids.contains(bpid)) {
-      return getBPStorage(bpid).getTrashDirectory(blockFile);
+      return getBPStorage(bpid).getTrashDirectory(info);
     }
     return null;
   }

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java (+5 -5)

@@ -597,14 +597,14 @@ private void scan() {
       diffs.put(bpid, diffRecord);
 
       statsRecord.totalBlocks = blockpoolReport.length;
-      List<FinalizedReplica> bl = dataset.getFinalizedBlocks(bpid);
-      FinalizedReplica[] memReport = bl.toArray(new FinalizedReplica[bl.size()]);
+      List<ReplicaInfo> bl = dataset.getFinalizedBlocks(bpid);
+      ReplicaInfo[] memReport = bl.toArray(new ReplicaInfo[bl.size()]);
       Arrays.sort(memReport); // Sort based on blockId
 
       int d = 0; // index for blockpoolReport
       int m = 0; // index for memReprot
       while (m < memReport.length && d < blockpoolReport.length) {
-        FinalizedReplica memBlock = memReport[m];
+        ReplicaInfo memBlock = memReport[m];
         ScanInfo info = blockpoolReport[d];
         if (info.getBlockId() < memBlock.getBlockId()) {
           if (!dataset.isDeletingBlock(bpid, info.getBlockId())) {
@@ -633,7 +633,7 @@ private void scan() {
           // or block file length is different than expected
           statsRecord.mismatchBlocks++;
           addDifference(diffRecord, statsRecord, info);
-        } else if (info.getBlockFile().compareTo(memBlock.getBlockFile()) != 0) {
+        } else if (memBlock.compareWith(info) != 0) {
           // volumeMap record and on-disk files don't match.
           statsRecord.duplicateBlocks++;
           addDifference(diffRecord, statsRecord, info);
@@ -652,7 +652,7 @@ private void scan() {
         }
       }
       while (m < memReport.length) {
-        FinalizedReplica current = memReport[m++];
+        ReplicaInfo current = memReport[m++];
         addDifference(diffRecord, statsRecord,
             current.getBlockId(), current.getVolume());
       }
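
The scanner no longer compares block-file paths directly; each replica now decides how to compare itself against an on-disk ScanInfo via compareWith. A sketch of what a File-backed implementation could look like, inferred from the line it replaces (the actual LocalReplica code in this commit may differ):

  // Hypothetical File-backed comparison mirroring the old
  // info.getBlockFile().compareTo(memBlock.getBlockFile()) logic.
  @Override
  public int compareWith(ScanInfo info) {
    return info.getBlockFile().compareTo(getBlockFile());
  }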

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FinalizedReplica.java (+26 -1)

@@ -22,11 +22,12 @@
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
+import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
 
 /**
  * This class describes a replica that has been finalized.
  */
-public class FinalizedReplica extends ReplicaInfo {
+public class FinalizedReplica extends LocalReplica {
 
   /**
    * Constructor
@@ -88,4 +89,28 @@ public int hashCode() {
   public String toString() {
     return super.toString();
   }
+
+  @Override
+  public ReplicaInfo getOriginalReplica() {
+    throw new UnsupportedOperationException("Replica of type " + getState() +
+        " does not support getOriginalReplica");
+  }
+
+  @Override
+  public long getRecoveryID() {
+    throw new UnsupportedOperationException("Replica of type " + getState() +
+        " does not support getRecoveryID");
+  }
+
+  @Override
+  public void setRecoveryID(long recoveryId) {
+    throw new UnsupportedOperationException("Replica of type " + getState() +
+        " does not support setRecoveryID");
+  }
+
+  @Override
+  public ReplicaRecoveryInfo createInfo() {
+    throw new UnsupportedOperationException("Replica of type " + getState() +
+        " does not support createInfo");
+  }
 }
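
FinalizedReplica inherits the recovery-related accessors declared on ReplicaInfo but carries no recovery state, so each override fails fast. Callers are presumably expected to guard on replica state first; a hedged sketch of such a guard (the replica variable is illustrative):

  // Recovery accessors are only meaningful while a replica is under
  // recovery (RUR); a finalized replica throws
  // UnsupportedOperationException instead.
  if (replica.getState() == ReplicaState.RUR) {
    long recoveryId = replica.getRecoveryID();
    // ... use recoveryId during block recovery
  }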
