
Commit 765f72e

HDFS-16757. Add a new method copyBlockCrossNamespace to DataNode
1 parent c33d868 commit 765f72e

22 files changed (+566, -63 lines)

hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java

Lines changed: 21 additions & 0 deletions
@@ -28,6 +28,7 @@
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_DEFAULT;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_KEY;
 
+import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.FileNotFoundException;
 import java.io.IOException;
@@ -1956,6 +1957,26 @@ protected IOStreamPair connectToDN(DatanodeInfo dn, int timeout,
         socketFactory, getConf().isConnectToDnViaHostname(), this, blockToken);
   }
 
+  protected void copyBlockCrossNamespace(ExtendedBlock sourceBlk,
+      Token<BlockTokenIdentifier> sourceBlockToken, DatanodeInfo sourceDatanode,
+      ExtendedBlock targetBlk, Token<BlockTokenIdentifier> targetBlockToken,
+      DatanodeInfo targetDatanode) throws IOException {
+    IOStreamPair pair =
+        DFSUtilClient.connectToDN(sourceDatanode, getConf().getSocketTimeout(), conf, saslClient,
+            socketFactory, getConf().isConnectToDnViaHostname(), this, sourceBlockToken);
+
+    new Sender((DataOutputStream) pair.out).copyBlockCrossNamespace(sourceBlk, sourceBlockToken,
+        targetBlk, targetBlockToken, targetDatanode);
+
+    pair.out.flush();
+
+    DataInputStream reply = new DataInputStream(pair.in);
+    BlockOpResponseProto proto = BlockOpResponseProto.parseFrom(PBHelperClient.vintPrefixed(reply));
+    DataTransferProtoUtil.checkBlockOpStatus(proto,
+        "copyBlockCrossNamespace " + sourceBlk + " to " + targetBlk + " from " + sourceDatanode
+            + " to " + targetDatanode);
+  }
+
   /**
    * Infer the checksum type for a replica by sending an OP_READ_BLOCK
    * for the first byte of that replica. This is used for compatibility
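
For context, a minimal sketch (not part of this commit) of how a same-package caller, such as a hypothetical FastCopy-style helper, might drive the new client-side method once it has paired a source block with a pre-allocated target block in the other namespace. LocatedBlock, ExtendedBlock and DatanodeInfo are existing HDFS types; everything else here is assumed:

// Hypothetical sketch, not part of this commit. Lives in org.apache.hadoop.hdfs
// so it can reach the protected DFSClient helper.
package org.apache.hadoop.hdfs;

import java.io.IOException;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;

class FastCopyBlockSketch {
  static void copyOneBlock(DFSClient client, LocatedBlock source, LocatedBlock target)
      throws IOException {
    DatanodeInfo sourceDn = source.getLocations()[0]; // any replica holding the source block
    DatanodeInfo targetDn = target.getLocations()[0]; // datanode chosen to receive the copy
    client.copyBlockCrossNamespace(
        source.getBlock(), source.getBlockToken(), sourceDn,
        target.getBlock(), target.getBlockToken(), targetDn);
  }
}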

hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java

Lines changed: 24 additions & 0 deletions
@@ -131,6 +131,7 @@ public class DFSOutputStream extends FSOutputSummer
   private FileEncryptionInfo fileEncryptionInfo;
   private int writePacketSize;
   private boolean leaseRecovered = false;
+  private ExtendedBlock userAssignmentLastBlock;
 
   /** Use {@link ByteArrayManager} to create buffer for non-heartbeat packets.*/
   protected DFSPacket createPacket(int packetSize, int chunksPerPkt,
@@ -949,6 +950,9 @@ protected void recoverLease(boolean recoverLeaseOnCloseException) {
   void completeFile() throws IOException {
     // get last block before destroying the streamer
     ExtendedBlock lastBlock = getStreamer().getBlock();
+    if (lastBlock == null) {
+      lastBlock = getUserAssignmentLastBlock();
+    }
     try (TraceScope ignored = dfsClient.getTracer()
         .newScope("DFSOutputStream#completeFile")) {
       completeFile(lastBlock);
@@ -1095,6 +1099,14 @@ ExtendedBlock getBlock() {
     return getStreamer().getBlock();
   }
 
+  public ExtendedBlock getUserAssignmentLastBlock() {
+    return userAssignmentLastBlock;
+  }
+
+  public void setUserAssignmentLastBlock(ExtendedBlock userAssignmentLastBlock) {
+    this.userAssignmentLastBlock = userAssignmentLastBlock;
+  }
+
   @VisibleForTesting
   public long getFileId() {
     return fileId;
@@ -1199,4 +1211,16 @@ private static long calculateDelayForNextRetry(long previousDelay,
       long maxDelay) {
     return Math.min(previousDelay * 2, maxDelay);
   }
+
+  public DFSClient getDfsClient() {
+    return dfsClient;
+  }
+
+  protected void copyBlockCrossNamespace(ExtendedBlock sourceBlk,
+      Token<BlockTokenIdentifier> sourceBlockToken, DatanodeInfo sourceDatanode,
+      ExtendedBlock targetBlk, Token<BlockTokenIdentifier> targetBlockToken,
+      DatanodeInfo targetDatanode) throws IOException {
+    dfsClient.copyBlockCrossNamespace(sourceBlk, sourceBlockToken, sourceDatanode, targetBlk,
+        targetBlockToken, targetDatanode);
+  }
 }
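
A hedged usage sketch (assumed, not from this commit) of why the userAssignmentLastBlock setter exists: when every block of the target file is filled through copyBlockCrossNamespace instead of the normal write path, the streamer never carries a block, so the caller records the last target block before closing so that completeFile() still has a block to report to the NameNode:

// Hypothetical sketch, not part of this commit.
import java.io.IOException;
import org.apache.hadoop.hdfs.DFSOutputStream;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;

class FinishCopiedFileSketch {
  static void finish(DFSOutputStream out, ExtendedBlock lastTargetBlock) throws IOException {
    // Picked up by completeFile() when the streamer has no block of its own.
    out.setUserAssignmentLastBlock(lastTargetBlock);
    out.close(); // completes the file on the target NameNode
  }
}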

hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java

Lines changed: 3 additions & 0 deletions
@@ -1271,6 +1271,9 @@ protected synchronized void closeImpl() throws IOException {
 
     try (TraceScope ignored =
         dfsClient.getTracer().newScope("completeFile")) {
+      if (currentBlockGroup == null) {
+        currentBlockGroup = getUserAssignmentLastBlock();
+      }
       completeFile(currentBlockGroup);
     }
     logCorruptBlocks();

hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java

Lines changed: 4 additions & 3 deletions
@@ -600,9 +600,10 @@ public FSDataOutputStream next(final FileSystem fs, final Path p)
    * inherited policy.
    *
    */
-  private HdfsDataOutputStream create(final Path f,
-      final FsPermission permission, final EnumSet<CreateFlag> flag,
-      final int bufferSize, final short replication, final long blockSize,
+  public HdfsDataOutputStream create(
+      final Path f, final FsPermission permission,
+      final EnumSet<CreateFlag> flag, final int bufferSize,
+      final short replication, final long blockSize,
       final Progressable progress, final ChecksumOpt checksumOpt,
       final InetSocketAddress[] favoredNodes, final String ecPolicyName,
       final String storagePolicy)
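
Making this overload public lets callers outside DistributedFileSystem supply favored nodes and policies directly when creating the target file. A hedged sketch of such a call (the parameter choices here are assumptions, not from this commit):

// Hypothetical sketch, not part of this commit.
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.EnumSet;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;

class CreateTargetFileSketch {
  static HdfsDataOutputStream createTarget(DistributedFileSystem dfs, Path target,
      InetSocketAddress[] favoredNodes) throws IOException {
    return dfs.create(target, FsPermission.getFileDefault(),
        EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE),
        dfs.getConf().getInt("io.file.buffer.size", 4096),
        dfs.getDefaultReplication(target), dfs.getDefaultBlockSize(target),
        null /* progress */, null /* checksumOpt */, favoredNodes,
        null /* ecPolicyName */, null /* storagePolicy: fall back to inherited policy */);
  }
}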

hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java

Lines changed: 14 additions & 0 deletions
@@ -238,4 +238,18 @@ void blockGroupChecksum(StripedBlockInfo stripedBlockInfo,
       Token<BlockTokenIdentifier> blockToken,
       long requestedNumBytes,
       BlockChecksumOptions blockChecksumOptions) throws IOException;
+
+  /**
+   * Copy a block across namespaces.
+   * It is used for fastcopy.
+   *
+   * @param sourceBlk the block being copied.
+   * @param sourceBlockToken security token for accessing sourceBlk.
+   * @param targetBlk the block to be written.
+   * @param targetBlockToken security token for accessing targetBlk.
+   * @param targetDatanode the target datanode to which sourceBlk will be copied as targetBlk.
+   */
+  void copyBlockCrossNamespace(ExtendedBlock sourceBlk,
+      Token<BlockTokenIdentifier> sourceBlockToken, ExtendedBlock targetBlk,
+      Token<BlockTokenIdentifier> targetBlockToken, DatanodeInfo targetDatanode) throws IOException;
 }

hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Op.java

Lines changed: 1 addition & 0 deletions
@@ -39,6 +39,7 @@ public enum Op {
   RELEASE_SHORT_CIRCUIT_FDS((byte)88),
   REQUEST_SHORT_CIRCUIT_SHM((byte)89),
   BLOCK_GROUP_CHECKSUM((byte)90),
+  COPY_BLOCK_CROSSNAMESPACE((byte)91),
   CUSTOM((byte)127);
 
   /** The code for this operation. */

hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java

Lines changed: 15 additions & 0 deletions
@@ -37,6 +37,7 @@
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferTraceInfoProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockGroupChecksumProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpCopyBlockCrossNamespaceProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpCopyBlockProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReadBlockProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReplaceBlockProto;
@@ -308,4 +309,18 @@ public void blockGroupChecksum(StripedBlockInfo stripedBlockInfo,
 
     send(out, Op.BLOCK_GROUP_CHECKSUM, proto);
   }
+
+  @Override
+  public void copyBlockCrossNamespace(ExtendedBlock sourceBlk,
+      Token<BlockTokenIdentifier> sourceBlockToken, ExtendedBlock targetBlk,
+      Token<BlockTokenIdentifier> targetBlockToken, DatanodeInfo targetDatanode)
+      throws IOException {
+    OpCopyBlockCrossNamespaceProto proto = OpCopyBlockCrossNamespaceProto.newBuilder()
+        .setHeader(DataTransferProtoUtil.buildBaseHeader(sourceBlk, sourceBlockToken))
+        .setTargetBlock(PBHelperClient.convert(targetBlk))
+        .setTargetToken(PBHelperClient.convert(targetBlockToken))
+        .setTargetDatanode(PBHelperClient.convert(targetDatanode)).build();
+
+    send(out, Op.COPY_BLOCK_CROSSNAMESPACE, proto);
+  }
 }

hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/datatransfer.proto

Lines changed: 7 additions & 0 deletions
@@ -331,3 +331,10 @@ message OpBlockChecksumResponseProto {
 message OpCustomProto {
   required string customId = 1;
 }
+
+message OpCopyBlockCrossNamespaceProto {
+  required BaseHeaderProto header = 1;
+  required ExtendedBlockProto targetBlock = 2;
+  required hadoop.common.TokenProto targetToken = 3;
+  required DatanodeInfoProto targetDatanode = 4;
+}

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java

Lines changed: 5 additions & 0 deletions
@@ -136,6 +136,11 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
       "dfs.datanode.ec.reconstruct.write.bandwidthPerSec";
   public static final long DFS_DATANODE_EC_RECONSTRUCT_WRITE_BANDWIDTHPERSEC_DEFAULT =
       0; // A value of zero indicates no limit
+  public static final String DFS_DATANODE_COPY_BLOCK_CROSS_NAMESPACE_SOCKET_TIMEOUT_MS_KEY =
+      "dfs.datanode.copy.block.cross.namespace.socket-timeout.ms";
+  public static final int DFS_DATANODE_COPY_BLOCK_CROSS_NAMESPACE_SOCKET_TIMEOUT_MS_DEFAULT =
+      5 * 60 * 1000;
+
   @Deprecated
   public static final String DFS_DATANODE_READAHEAD_BYTES_KEY =
       HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_KEY;
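
A small sketch, assuming the usual Configuration lookup pattern, of how datanode-side code would be expected to read the new timeout key with its 5-minute default:

// Hypothetical sketch, not part of this commit.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;

class CopyTimeoutSketch {
  static int copyBlockCrossNamespaceTimeoutMs(Configuration conf) {
    // Falls back to the 5 * 60 * 1000 ms default defined above.
    return conf.getInt(
        DFSConfigKeys.DFS_DATANODE_COPY_BLOCK_CROSS_NAMESPACE_SOCKET_TIMEOUT_MS_KEY,
        DFSConfigKeys.DFS_DATANODE_COPY_BLOCK_CROSS_NAMESPACE_SOCKET_TIMEOUT_MS_DEFAULT);
  }
}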

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java

Lines changed: 21 additions & 0 deletions
@@ -34,6 +34,7 @@
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientOperationHeaderProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockGroupChecksumProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpCopyBlockCrossNamespaceProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpCopyBlockProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReadBlockProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReplaceBlockProto;
@@ -133,6 +134,9 @@ protected final void processOp(Op op) throws IOException {
     case REQUEST_SHORT_CIRCUIT_SHM:
       opRequestShortCircuitShm(in);
       break;
+    case COPY_BLOCK_CROSSNAMESPACE:
+      opCopyBlockCrossNamespace(in);
+      break;
     default:
       throw new IOException("Unknown op " + op + " in data stream");
     }
@@ -339,4 +343,21 @@ private void opStripedBlockChecksum(DataInputStream dis) throws IOException {
       }
     }
   }
+
+  private void opCopyBlockCrossNamespace(DataInputStream dis) throws IOException {
+    OpCopyBlockCrossNamespaceProto proto =
+        OpCopyBlockCrossNamespaceProto.parseFrom(vintPrefixed(dis));
+    TraceScope traceScope = continueTraceSpan(proto.getHeader(), proto.getClass().getSimpleName());
+    try {
+      copyBlockCrossNamespace(PBHelperClient.convert(proto.getHeader().getBlock()),
+          PBHelperClient.convert(proto.getHeader().getToken()),
+          PBHelperClient.convert(proto.getTargetBlock()),
+          PBHelperClient.convert(proto.getTargetToken()),
+          PBHelperClient.convert(proto.getTargetDatanode()));
+    } finally {
+      if (traceScope != null) {
+        traceScope.close();
+      }
+    }
+  }
 }
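
The datanode-side implementation (the DataXceiver override of copyBlockCrossNamespace) lives in other files of this commit and is not shown in this excerpt. Whatever it does internally, it has to finish by writing a BlockOpResponseProto, since DFSClient#copyBlockCrossNamespace above blocks on checkBlockOpStatus for exactly that reply. A minimal hedged sketch of that response step only:

// Hypothetical sketch, not part of this excerpt.
import java.io.DataOutputStream;
import java.io.IOException;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;

class ResponseSketch {
  static void sendSuccess(DataOutputStream replyOut) throws IOException {
    BlockOpResponseProto.newBuilder()
        .setStatus(Status.SUCCESS)
        .build()
        .writeDelimitedTo(replyOut); // matches the vintPrefixed parse on the client side
  }
}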
