Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
import org.apache.hadoop.hdfs.server.namenode.CheckpointSignature;
import org.apache.hadoop.hdfs.server.namenode.NNStorage;
import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
Expand Down Expand Up @@ -114,6 +115,17 @@ public long getMostRecentCheckpointTxId() throws IOException {
return rpcServer.invokeAtAvailableNs(method, long.class);
}

@Override
public long getMostRecentNameNodeFileTxId(NNStorage.NameNodeFile nnf)
throws IOException {
rpcServer.checkOperation(OperationCategory.READ);

RemoteMethod method =
new RemoteMethod(NamenodeProtocol.class, "getMostRecentNameNodeFileTxId",
new Class<?>[] {NNStorage.NameNodeFile.class}, nnf);
return rpcServer.invokeAtAvailableNs(method, long.class);
}

@Override
public CheckpointSignature rollEditLog() throws IOException {
rpcServer.checkOperation(OperationCategory.WRITE, false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@
import org.apache.hadoop.hdfs.server.namenode.CheckpointSignature;
import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException;
import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory;
import org.apache.hadoop.hdfs.server.namenode.NNStorage;
import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException;
import org.apache.hadoop.hdfs.server.namenode.SafeModeException;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
Expand Down Expand Up @@ -1640,6 +1641,12 @@ public long getMostRecentCheckpointTxId() throws IOException {
return nnProto.getMostRecentCheckpointTxId();
}

@Override // NamenodeProtocol
public long getMostRecentNameNodeFileTxId(NNStorage.NameNodeFile nnf)
throws IOException {
return nnProto.getMostRecentNameNodeFileTxId(nnf);
}

@Override // NamenodeProtocol
public CheckpointSignature rollEditLog() throws IOException {
return nnProto.rollEditLog();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetEditLogManifestResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetMostRecentCheckpointTxIdRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetMostRecentCheckpointTxIdResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetMostRecentNameNodeFileTxIdRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetMostRecentNameNodeFileTxIdResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetNextSPSPathRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetNextSPSPathResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetTransactionIdRequestProto;
Expand All @@ -51,6 +53,7 @@
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.StartCheckpointResponseProto;
import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
import org.apache.hadoop.hdfs.server.namenode.CheckpointSignature;
import org.apache.hadoop.hdfs.server.namenode.NNStorage;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
Expand Down Expand Up @@ -141,6 +144,20 @@ public GetMostRecentCheckpointTxIdResponseProto getMostRecentCheckpointTxId(
return GetMostRecentCheckpointTxIdResponseProto.newBuilder().setTxId(txid).build();
}

@Override
public GetMostRecentNameNodeFileTxIdResponseProto getMostRecentNameNodeFileTxId(
RpcController unused, GetMostRecentNameNodeFileTxIdRequestProto request)
throws ServiceException {
long txid;
try {
txid = impl.getMostRecentNameNodeFileTxId(
NNStorage.NameNodeFile.valueOf(request.getNameNodeFile()));
} catch (IOException e) {
throw new ServiceException(e);
}
return GetMostRecentNameNodeFileTxIdResponseProto.newBuilder().setTxId(txid).build();
}


@Override
public RollEditLogResponseProto rollEditLog(RpcController unused,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetBlocksRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetEditLogManifestRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetMostRecentCheckpointTxIdRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetMostRecentNameNodeFileTxIdRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetNextSPSPathRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetNextSPSPathResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetTransactionIdRequestProto;
Expand All @@ -46,6 +47,7 @@
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.StartCheckpointRequestProto;
import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
import org.apache.hadoop.hdfs.server.namenode.CheckpointSignature;
import org.apache.hadoop.hdfs.server.namenode.NNStorage;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
Expand Down Expand Up @@ -134,6 +136,14 @@ public long getMostRecentCheckpointTxId() throws IOException {
GetMostRecentCheckpointTxIdRequestProto.getDefaultInstance()).getTxId());
}

@Override
public long getMostRecentNameNodeFileTxId(NNStorage.NameNodeFile nnf) throws IOException {
return ipc(() -> rpcProxy.getMostRecentNameNodeFileTxId(NULL_CONTROLLER,
GetMostRecentNameNodeFileTxIdRequestProto.newBuilder()
.setNameNodeFile(nnf.toString()).build()).getTxId());

}

@Override
public CheckpointSignature rollEditLog() throws IOException {
return PBHelper.convert(ipc(() -> rpcProxy.rollEditLog(NULL_CONTROLLER,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1562,4 +1562,31 @@ public void updateLastAppliedTxIdFromWritten() {
public long getMostRecentCheckpointTxId() {
return storage.getMostRecentCheckpointTxId();
}

/**
* Given a NameNodeFile type, retrieve the latest txid for that file or {@link
* HdfsServerConstants#INVALID_TXID} if the file does not exist.
*
* @param nnf The NameNodeFile type to retrieve the latest txid from.
* @return the latest txid for the NameNodeFile type, or {@link
* HdfsServerConstants#INVALID_TXID} if there is no FSImage file of the type
* requested.
* @throws IOException
*/
public long getMostRecentNameNodeFileTxId(NameNodeFile nnf)
throws IOException {
final FSImageStorageInspector inspector =
new FSImageTransactionalStorageInspector(EnumSet.of(nnf));
storage.inspectStorageDirs(inspector);
try {
List<FSImageFile> images = inspector.getLatestImages();
if (images != null && !images.isEmpty()) {
return images.get(0).getCheckpointTxId();
} else {
return HdfsServerConstants.INVALID_TXID;
}
} catch (FileNotFoundException e) {
return HdfsServerConstants.INVALID_TXID;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1361,6 +1361,14 @@ public long getMostRecentCheckpointTxId() throws IOException {
namesystem.checkSuperuserPrivilege(operationName);
return namesystem.getFSImage().getMostRecentCheckpointTxId();
}

@Override // NamenodeProtocol
public long getMostRecentNameNodeFileTxId(NNStorage.NameNodeFile nnf) throws IOException {
checkNNStartup();
namesystem.checkOperation(OperationCategory.UNCHECKED);
namesystem.checkSuperuserPrivilege();
return namesystem.getFSImage().getMostRecentNameNodeFileTxId(nnf);
}

@Override // NamenodeProtocol
public CheckpointSignature rollEditLog() throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ private int doRun() throws IOException {
}

// download the fsimage from active namenode
int download = downloadImage(storage, proxy, proxyInfo);
int download = downloadImage(storage, proxy, proxyInfo, isRollingUpgrade);
if (download != 0) {
return download;
}
Expand Down Expand Up @@ -351,12 +351,32 @@ private void doUpgrade(NNStorage storage) throws IOException {
}
}

private int downloadImage(NNStorage storage, NamenodeProtocol proxy, RemoteNameNodeInfo proxyInfo)
private int downloadImage(NNStorage storage, NamenodeProtocol proxy, RemoteNameNodeInfo proxyInfo,
boolean isRollingUpgrade)
throws IOException {
// Load the newly formatted image, using all of the directories
// (including shared edits)
final long imageTxId = proxy.getMostRecentCheckpointTxId();
final long curTxId = proxy.getTransactionID();

if (isRollingUpgrade) {
final long rollbackTxId =
proxy.getMostRecentNameNodeFileTxId(NameNodeFile.IMAGE_ROLLBACK);
assert rollbackTxId != HdfsServerConstants.INVALID_TXID :
"Expected a valid TXID for fsimage_rollback file";
FSImage rollbackImage = new FSImage(conf);
try {
rollbackImage.getStorage().setStorageInfo(storage);
MD5Hash hash = TransferFsImage.downloadImageToStorage(
proxyInfo.getHttpAddress(), rollbackTxId, storage, true, true);
rollbackImage.saveDigestAndRenameCheckpointImage(
NameNodeFile.IMAGE_ROLLBACK, rollbackTxId, hash);
} catch (IOException ioe) {
throw ioe;
} finally {
rollbackImage.close();
}
}
FSImage image = new FSImage(conf);
try {
image.getStorage().setStorageInfo(storage);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
import org.apache.hadoop.hdfs.server.namenode.CheckpointSignature;
import org.apache.hadoop.hdfs.server.namenode.NNStorage;
import org.apache.hadoop.hdfs.server.namenode.ha.ReadOnly;
import org.apache.hadoop.io.retry.AtMostOnce;
import org.apache.hadoop.io.retry.Idempotent;
Expand Down Expand Up @@ -111,6 +112,12 @@ BlocksWithLocations getBlocks(DatanodeInfo datanode, long size, long
@Idempotent
public long getMostRecentCheckpointTxId() throws IOException;

/**
* Get the transaction ID of the most recent checkpoint for the given NameNodeFile.
*/
@Idempotent
long getMostRecentNameNodeFileTxId(NNStorage.NameNodeFile nnf) throws IOException;

/**
* Closes the current edit log and opens a new one. The
* call fails if the file system is in SafeMode.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,14 @@ message GetMostRecentCheckpointTxIdResponseProto{
required uint64 txId = 1;
}

message GetMostRecentNameNodeFileTxIdRequestProto {
required string nameNodeFile = 1;
}

message GetMostRecentNameNodeFileTxIdResponseProto{
required uint64 txId = 1;
}

/**
* registration - Namenode reporting the error
* errorCode - error code indicating the error
Expand Down Expand Up @@ -253,6 +261,12 @@ service NamenodeProtocolService {
rpc getMostRecentCheckpointTxId(GetMostRecentCheckpointTxIdRequestProto)
returns(GetMostRecentCheckpointTxIdResponseProto);

/**
* Get the transaction ID of the NameNodeFile
*/
rpc getMostRecentNameNodeFileTxId(GetMostRecentNameNodeFileTxIdRequestProto)
returns(GetMostRecentNameNodeFileTxIdResponseProto);

/**
* Close the current editlog and open a new one for checkpointing purposes
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -519,6 +519,23 @@ public static void assertNNHasCheckpoints(MiniDFSCluster cluster,
}
}

public static void assertNNHasRollbackCheckpoints(MiniDFSCluster cluster,
int nnIdx, List<Integer> txids) {

for (File nameDir : getNameNodeCurrentDirs(cluster, nnIdx)) {
LOG.info("examining name dir with files: {}",
Joiner.on(",").join(nameDir.listFiles()));
// Should have fsimage_N for the three checkpoints
LOG.info("Examining storage dir {} with contents: {}", nameDir,
StringUtils.join(nameDir.listFiles(), ", "));
for (long checkpointTxId : txids) {
File image = new File(nameDir,
NNStorage.getRollbackImageFileName(checkpointTxId));
assertTrue("Expected non-empty " + image, image.length() > 0);
}
}
}

public static List<File> getNameNodeCurrentDirs(MiniDFSCluster cluster, int nnIdx) {
List<File> nameDirs = Lists.newArrayList();
for (URI u : cluster.getNameDirs(nnIdx)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.apache.hadoop.hdfs.server.namenode.FSImage;
import org.apache.hadoop.hdfs.server.namenode.NameNodeLayoutVersion;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.test.LambdaTestUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
Expand Down Expand Up @@ -189,7 +190,8 @@ public void testDownloadingLaterCheckpoint() throws Exception {
*/
@Test
public void testRollingUpgradeBootstrapStandby() throws Exception {
removeStandbyNameDirs();
// This node is needed to create the rollback fsimage
cluster.restartNameNode(1);

int futureVersion = NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION - 1;

Expand All @@ -208,12 +210,18 @@ public void testRollingUpgradeBootstrapStandby() throws Exception {

// BootstrapStandby should fail if the node has a future version
// and the cluster isn't in rolling upgrade
bs.setConf(cluster.getConfiguration(1));
bs.setConf(cluster.getConfiguration(2));
assertEquals("BootstrapStandby should return ERR_CODE_INVALID_VERSION",
ERR_CODE_INVALID_VERSION, bs.run(new String[]{"-force"}));

// Start rolling upgrade
fs.rollingUpgrade(RollingUpgradeAction.PREPARE);
LambdaTestUtils.await(60000, 1000, () ->
fs.rollingUpgrade(RollingUpgradeAction.QUERY).createdRollbackImages());
// After the rollback image is created the standby is not needed
cluster.shutdownNameNode(1);
removeStandbyNameDirs();

nn0 = spy(nn0);

// Make nn0 think it is a future version
Expand All @@ -237,6 +245,9 @@ public void testRollingUpgradeBootstrapStandby() throws Exception {

long expectedCheckpointTxId = NameNodeAdapter.getNamesystem(nn0)
.getFSImage().getMostRecentCheckpointTxId();
long expectedRollbackTxId = NameNodeAdapter.getNamesystem(nn0)
.getFSImage().getMostRecentNameNodeFileTxId(
NNStorage.NameNodeFile.IMAGE_ROLLBACK);
assertEquals(11, expectedCheckpointTxId);

for (int i = 1; i < maxNNCount; i++) {
Expand All @@ -245,6 +256,8 @@ public void testRollingUpgradeBootstrapStandby() throws Exception {
bs.run(new String[]{"-force"});
FSImageTestUtil.assertNNHasCheckpoints(cluster, i,
ImmutableList.of((int) expectedCheckpointTxId));
FSImageTestUtil.assertNNHasRollbackCheckpoints(cluster, i,
ImmutableList.of((int) expectedRollbackTxId));
}

// Make sure the bootstrap was successful
Expand All @@ -253,6 +266,14 @@ public void testRollingUpgradeBootstrapStandby() throws Exception {
// We should now be able to start the standby successfully
restartNameNodesFromIndex(1, "-rollingUpgrade", "started");

for (int i = 1; i < maxNNCount; i++) {
NameNode nn = cluster.getNameNode(i);
assertTrue("NameNodes should all have the rollback FSImage",
nn.getFSImage().hasRollbackFSImage());
assertTrue("NameNodes should all be inRollingUpgrade",
nn.getNamesystem().isRollingUpgrade());
}

// Cleanup standby dirs
for (int i = 1; i < maxNNCount; i++) {
cluster.shutdownNameNode(i);
Expand Down