Skip to content

Commit aad533d

Browse files
authored
HDFS-14989. Add a 'swapBlockList' operation to Namenode. (#1819)
* HDFS-14989. Add a 'swapBlockList' operation to Namenode. * HDFS-14989. Add a 'swapBlockList' operation to Namenode. (Fix checkstyle issues) * HDFS-14989. Address review comments. * HDFS-14989. Swap storage policy ID, check destination file genStamp. * HDFS-14989. Remove unused import. * HDFS-14989. Address review comment. * HDFS-14989. Use FSDirectory.resolveLastINode.
1 parent 839e607 commit aad533d

File tree

5 files changed

+481
-0
lines changed

5 files changed

+481
-0
lines changed

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,7 @@
112112
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReportListing;
113113
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
114114
import org.apache.hadoop.hdfs.server.common.ECTopologyVerifier;
115+
import org.apache.hadoop.hdfs.server.namenode.SwapBlockListOp.SwapBlockListResult;
115116
import org.apache.hadoop.hdfs.server.namenode.metrics.ReplicatedBlocksMBean;
116117
import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
117118
import org.apache.hadoop.ipc.ObserverRetryOnActiveException;
@@ -8498,5 +8499,35 @@ public void checkErasureCodingSupported(String operationName)
84988499
throw new UnsupportedActionException(operationName + " not supported.");
84998500
}
85008501
}
8502+
8503+
/**
8504+
* Namesystem API to swap block list between source and destination files.
8505+
*
8506+
* @param src source file.
8507+
* @param dst destination file.
8508+
* @throws IOException on Error.
8509+
*/
8510+
boolean swapBlockList(final String src, final String dst, long genTimestamp)
8511+
throws IOException {
8512+
final String operationName = "swapBlockList";
8513+
checkOperation(OperationCategory.WRITE);
8514+
final FSPermissionChecker pc = getPermissionChecker();
8515+
SwapBlockListResult res = null;
8516+
try {
8517+
writeLock();
8518+
try {
8519+
checkOperation(OperationCategory.WRITE);
8520+
checkNameNodeSafeMode("Cannot swap block list." + src + ", " + dst);
8521+
res = SwapBlockListOp.swapBlocks(dir, pc, src, dst, genTimestamp);
8522+
} finally {
8523+
writeUnlock(operationName);
8524+
}
8525+
} catch (AccessControlException e) {
8526+
logAuditEvent(false, operationName, src, dst, null);
8527+
throw e;
8528+
}
8529+
logAuditEvent(true, operationName, src, dst, res.getDstFileAuditStat());
8530+
return res.isSuccess();
8531+
}
85018532
}
85028533

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,10 @@ static byte getStoragePolicyID(long header) {
163163
return (byte)STORAGE_POLICY_ID.BITS.retrieve(header);
164164
}
165165

166+
  /**
   * Extracts the block layout and redundancy bits from a packed
   * INodeFile header word (the BLOCK_LAYOUT_AND_REDUNDANCY field).
   * @param header the packed header long.
   * @return the block layout and redundancy policy byte.
   */
  static byte getBlockLayoutPolicy(long header) {
    return (byte)BLOCK_LAYOUT_AND_REDUNDANCY.BITS.retrieve(header);
  }
169+
166170
// Union of all the block type masks. Currently there is only
167171
// BLOCK_TYPE_MASK_STRIPED
168172
static final long BLOCK_TYPE_MASK = 1 << 11;
@@ -728,6 +732,17 @@ public void clearBlocks() {
728732
this.blocks = BlockInfo.EMPTY_ARRAY;
729733
}
730734

735+
  /**
   * This method replaces blocks in a file with the supplied blocks.
   * The supplied array is defensively copied, and every block is
   * re-parented to this file by setting its block collection id to
   * this inode's id.
   * @param newBlocks List of new blocks.
   */
  void replaceBlocks(BlockInfo[] newBlocks) {
    // Copy so later mutation of the caller's array cannot affect this file.
    this.blocks = Arrays.copyOf(newBlocks, newBlocks.length);
    for (BlockInfo block : blocks) {
      // Point each block back at this inode.
      block.setBlockCollectionId(getId());
    }
  }
745+
731746
private void updateRemovedUnderConstructionFiles(
732747
ReclaimContext reclaimContext) {
733748
if (isUnderConstruction() && reclaimContext.removedUCFiles != null) {
@@ -1257,4 +1272,17 @@ boolean isBlockInLatestSnapshot(BlockInfo block) {
12571272
return snapshotBlocks != null &&
12581273
Arrays.asList(snapshotBlocks).contains(block);
12591274
}
1275+
1276+
  /**
   * Update Header with new Block Layout and Redundancy bits.
   * Only the layout/redundancy and storage-policy fields of the packed
   * header are replaced; the preferred block size is carried over from
   * the current header.
   * @param newBlockLayoutPolicy new block layout policy.
   * @param newStoragePolicy new storage policy ID.
   */
  void updateHeaderWithNewPolicy(byte newBlockLayoutPolicy,
      byte newStoragePolicy) {
    this.header = HeaderFormat.toLong(
        HeaderFormat.getPreferredBlockSize(header),
        newBlockLayoutPolicy,
        newStoragePolicy);
  }
12601288
}

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2667,4 +2667,14 @@ public Long getNextSPSPath() throws IOException {
26672667
}
26682668
return namesystem.getBlockManager().getSPSManager().getNextPathId();
26692669
}
2670+
2671+
  /**
   * RPC entry point to swap the block lists of two files.
   * Verifies the NameNode has started, logs the request at debug level,
   * and delegates to the namesystem.
   * @param src source file path.
   * @param dst destination file path.
   * @param maxTimestamp expected generation stamp of the destination
   *        file's last block (passed through to the namesystem check).
   * @return true if the swap succeeded.
   * @throws IOException on error.
   */
  public boolean swapBlockList(String src, String dst, long maxTimestamp)
      throws IOException {
    checkNNStartup();
    if (stateChangeLog.isDebugEnabled()) {
      stateChangeLog.debug("*DIR* NameNode.swapBlockList: {} and {}", src, dst);
    }
    return namesystem.swapBlockList(src, dst, maxTimestamp);
  }
2679+
26702680
}
Lines changed: 183 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,183 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.hadoop.hdfs.server.namenode;
20+
21+
import java.io.IOException;
22+
23+
import org.apache.hadoop.fs.FileAlreadyExistsException;
24+
import org.apache.hadoop.fs.FileStatus;
25+
import org.apache.hadoop.fs.permission.FsAction;
26+
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
27+
import org.apache.hadoop.hdfs.server.namenode.FSDirectory.DirOp;
28+
import org.apache.hadoop.hdfs.server.namenode.INodeFile.HeaderFormat;
29+
import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
30+
import org.apache.hadoop.util.Time;
31+
32+
/**
 * Class to carry out the operation of swapping blocks from one file to another.
 * Along with swapping blocks, we can also optionally swap the block layout
 * of a file header, which is useful for client operations like converting
 * replicated to EC file.
 */
public final class SwapBlockListOp {

  // Static-method holder; never instantiated.
  private SwapBlockListOp() {
  }

  /**
   * Resolve both paths, enforce permission checks, then perform the swap
   * under the FSDirectory write lock.
   *
   * @param fsd the directory tree.
   * @param pc permission checker for the caller.
   * @param src source file path.
   * @param dst destination file path.
   * @param genTimestamp expected generation stamp of the destination
   *        file's last block.
   * @return result with success flag and audit file status for both files.
   * @throws IOException on validation failure or access denial.
   */
  static SwapBlockListResult swapBlocks(FSDirectory fsd, FSPermissionChecker pc,
      String src, String dst, long genTimestamp)
      throws IOException {

    final INodesInPath srcIIP = fsd.resolvePath(pc, src, DirOp.WRITE);
    final INodesInPath dstIIP = fsd.resolvePath(pc, dst, DirOp.WRITE);
    if (fsd.isPermissionEnabled()) {
      // Caller needs WRITE access on the ancestors of both files.
      fsd.checkAncestorAccess(pc, srcIIP, FsAction.WRITE);
      fsd.checkAncestorAccess(pc, dstIIP, FsAction.WRITE);
    }
    if (NameNode.stateChangeLog.isDebugEnabled()) {
      NameNode.stateChangeLog.debug("DIR* FSDirectory.swapBlockList: "
          + srcIIP.getPath() + " and " + dstIIP.getPath());
    }
    SwapBlockListResult result;
    fsd.writeLock();
    try {
      result = swapBlockList(fsd, srcIIP, dstIIP, genTimestamp);
    } finally {
      fsd.writeUnlock();
    }
    return result;
  }

  /**
   * Core swap, called with the FSDirectory write lock held: validates both
   * inodes, checks the destination generation stamp, then exchanges the
   * block arrays and the layout/storage-policy header bits of the two files.
   */
  private static SwapBlockListResult swapBlockList(FSDirectory fsd,
      final INodesInPath srcIIP,
      final INodesInPath dstIIP,
      long genTimestamp)
      throws IOException {

    assert fsd.hasWriteLock();
    validateInode(srcIIP);
    validateInode(dstIIP);
    // Reuse rename's encryption-zone validity rules for the file pair.
    fsd.ezManager.checkMoveValidity(srcIIP, dstIIP);

    final String src = srcIIP.getPath();
    final String dst = dstIIP.getPath();
    if (dst.equals(src)) {
      throw new FileAlreadyExistsException("The source " + src +
          " and destination " + dst + " are the same");
    }

    INodeFile srcINodeFile = srcIIP.getLastINode().asFile();
    INodeFile dstINodeFile = dstIIP.getLastINode().asFile();

    String errorPrefix = "DIR* FSDirectory.swapBlockList: ";
    String error = "Swap Block List destination file ";
    // Reject the swap if the destination's last block no longer carries the
    // generation stamp the caller observed (destination changed concurrently).
    // Note: only the destination is checked, not the source.
    BlockInfo lastBlock = dstINodeFile.getLastBlock();
    if (lastBlock != null && lastBlock.getGenerationStamp() != genTimestamp) {
      error += dstIIP.getPath() +
          " has last block with different gen timestamp.";
      NameNode.stateChangeLog.warn(errorPrefix + error);
      throw new IOException(error);
    }

    long mtime = Time.now();
    // Exchange the two files' block arrays (replaceBlocks also re-parents
    // the blocks to their new owning inode).
    BlockInfo[] dstINodeFileBlocks = dstINodeFile.getBlocks();
    dstINodeFile.replaceBlocks(srcINodeFile.getBlocks());
    srcINodeFile.replaceBlocks(dstINodeFileBlocks);

    // Exchange the block-layout/redundancy and storage-policy header bits so
    // each header stays consistent with the block list it just received.
    long srcHeader = srcINodeFile.getHeaderLong();
    long dstHeader = dstINodeFile.getHeaderLong();

    byte dstBlockLayoutPolicy =
        HeaderFormat.getBlockLayoutPolicy(dstHeader);
    byte srcBlockLayoutPolicy =
        HeaderFormat.getBlockLayoutPolicy(srcHeader);

    byte dstStoragePolicyID = HeaderFormat.getStoragePolicyID(dstHeader);
    byte srcStoragePolicyID = HeaderFormat.getStoragePolicyID(srcHeader);

    dstINodeFile.updateHeaderWithNewPolicy(srcBlockLayoutPolicy,
        srcStoragePolicyID);
    dstINodeFile.setModificationTime(mtime);

    srcINodeFile.updateHeaderWithNewPolicy(dstBlockLayoutPolicy,
        dstStoragePolicyID);
    srcINodeFile.setModificationTime(mtime);

    return new SwapBlockListResult(true,
        fsd.getAuditFileInfo(srcIIP),
        fsd.getAuditFileInfo(dstIIP));
  }

  /**
   * Validate that the resolved path is a regular file that is not under
   * construction and whose path is not inside a snapshot.
   * @param srcIIP resolved path to validate (used for both src and dst).
   * @throws IOException if any check fails.
   */
  private static void validateInode(INodesInPath srcIIP)
      throws IOException {

    String errorPrefix = "DIR* FSDirectory.swapBlockList: ";
    String error = "Swap Block List input ";

    INode srcInode = FSDirectory.resolveLastINode(srcIIP);

    // Check if INode is a file and NOT a directory.
    if (!srcInode.isFile()) {
      error += srcIIP.getPath() + " is not a file.";
      NameNode.stateChangeLog.warn(errorPrefix + error);
      throw new IOException(error);
    }

    // Check if file is under construction.
    INodeFile iNodeFile = (INodeFile) srcIIP.getLastINode();
    if (iNodeFile.isUnderConstruction()) {
      error += srcIIP.getPath() + " is under construction.";
      NameNode.stateChangeLog.warn(errorPrefix + error);
      throw new IOException(error);
    }

    // Check if any parent directory is in a snapshot.
    if (srcIIP.getLatestSnapshotId() != Snapshot.CURRENT_STATE_ID) {
      error += srcIIP.getPath() + " is in a snapshot directory.";
      NameNode.stateChangeLog.warn(errorPrefix + error);
      throw new IOException(error);
    }
  }

  /**
   * Result of a swap: a success flag plus audit {@link FileStatus} entries
   * for the source and destination files.
   */
  static class SwapBlockListResult {
    private final boolean success;
    private final FileStatus srcFileAuditStat;
    private final FileStatus dstFileAuditStat;

    SwapBlockListResult(boolean success,
        FileStatus srcFileAuditStat,
        FileStatus dstFileAuditStat) {
      this.success = success;
      this.srcFileAuditStat = srcFileAuditStat;
      this.dstFileAuditStat = dstFileAuditStat;
    }

    public boolean isSuccess() {
      return success;
    }

    public FileStatus getDstFileAuditStat() {
      return dstFileAuditStat;
    }

    public FileStatus getSrcFileAuditStat() {
      return srcFileAuditStat;
    }
  }
}

0 commit comments

Comments
 (0)