-
Notifications
You must be signed in to change notification settings - Fork 8.9k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
HDDS-2169. Avoid buffer copies while submitting client requests in Ra…
…tis. Contributed by Tsz-wo Sze(#1517).
- Loading branch information
1 parent
55c5436
commit 022fe5f
Showing
7 changed files
with
294 additions
and
39 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
107 changes: 107 additions & 0 deletions
107
...dds/common/src/main/java/org/apache/hadoop/hdds/ratis/ContainerCommandRequestMessage.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,107 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
package org.apache.hadoop.hdds.ratis; | ||
|
||
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto; | ||
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.PutSmallFileRequestProto; | ||
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Type; | ||
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.WriteChunkRequestProto; | ||
import org.apache.ratis.protocol.Message; | ||
import org.apache.ratis.protocol.RaftGroupId; | ||
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; | ||
import org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException; | ||
import org.apache.ratis.util.JavaUtils; | ||
|
||
import java.util.Objects; | ||
import java.util.function.Supplier; | ||
|
||
/** | ||
* Implementing the {@link Message} interface | ||
* for {@link ContainerCommandRequestProto}. | ||
*/ | ||
public final class ContainerCommandRequestMessage implements Message { | ||
public static ContainerCommandRequestMessage toMessage( | ||
ContainerCommandRequestProto request, String traceId) { | ||
final ContainerCommandRequestProto.Builder b | ||
= ContainerCommandRequestProto.newBuilder(request); | ||
if (traceId != null) { | ||
b.setTraceID(traceId); | ||
} | ||
|
||
ByteString data = ByteString.EMPTY; | ||
if (request.getCmdType() == Type.WriteChunk) { | ||
final WriteChunkRequestProto w = request.getWriteChunk(); | ||
data = w.getData(); | ||
b.setWriteChunk(w.toBuilder().clearData()); | ||
} else if (request.getCmdType() == Type.PutSmallFile) { | ||
final PutSmallFileRequestProto p = request.getPutSmallFile(); | ||
data = p.getData(); | ||
b.setPutSmallFile(p.toBuilder().setData(ByteString.EMPTY)); | ||
} | ||
return new ContainerCommandRequestMessage(b.build(), data); | ||
} | ||
|
||
public static ContainerCommandRequestProto toProto( | ||
ByteString bytes, RaftGroupId groupId) | ||
throws InvalidProtocolBufferException { | ||
final int i = 4 + bytes.asReadOnlyByteBuffer().getInt(); | ||
final ContainerCommandRequestProto header | ||
= ContainerCommandRequestProto.parseFrom(bytes.substring(4, i)); | ||
// TODO: setting pipeline id can be avoided if the client is sending it. | ||
// In such case, just have to validate the pipeline id. | ||
final ContainerCommandRequestProto.Builder b = header.toBuilder(); | ||
if (groupId != null) { | ||
b.setPipelineID(groupId.getUuid().toString()); | ||
} | ||
final ByteString data = bytes.substring(i); | ||
if (header.getCmdType() == Type.WriteChunk) { | ||
b.setWriteChunk(b.getWriteChunkBuilder().setData(data)); | ||
} else if (header.getCmdType() == Type.PutSmallFile) { | ||
b.setPutSmallFile(b.getPutSmallFileBuilder().setData(data)); | ||
} | ||
return b.build(); | ||
} | ||
|
||
private final ContainerCommandRequestProto header; | ||
private final ByteString data; | ||
private final Supplier<ByteString> contentSupplier | ||
= JavaUtils.memoize(this::buildContent); | ||
|
||
private ContainerCommandRequestMessage( | ||
ContainerCommandRequestProto header, ByteString data) { | ||
this.header = Objects.requireNonNull(header, "header == null"); | ||
this.data = Objects.requireNonNull(data, "data == null"); | ||
} | ||
|
||
private ByteString buildContent() { | ||
final ByteString headerBytes = header.toByteString(); | ||
return RatisHelper.int2ByteString(headerBytes.size()) | ||
.concat(headerBytes) | ||
.concat(data); | ||
} | ||
|
||
@Override | ||
public ByteString getContent() { | ||
return contentSupplier.get(); | ||
} | ||
|
||
@Override | ||
public String toString() { | ||
return header + ", data.size=" + data.size(); | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
152 changes: 152 additions & 0 deletions
152
...common/src/test/java/org/apache/hadoop/hdds/ratis/TestContainerCommandRequestMessage.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,152 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
package org.apache.hadoop.hdds.ratis; | ||
|
||
import org.apache.hadoop.hdds.client.BlockID; | ||
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.BlockData; | ||
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo; | ||
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto; | ||
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.KeyValue; | ||
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.PutBlockRequestProto; | ||
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.PutSmallFileRequestProto; | ||
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Type; | ||
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.WriteChunkRequestProto; | ||
import org.apache.hadoop.ozone.common.Checksum; | ||
import org.apache.hadoop.ozone.common.ChecksumData; | ||
import org.apache.hadoop.ozone.common.OzoneChecksumException; | ||
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; | ||
import org.junit.Assert; | ||
import org.junit.Test; | ||
|
||
import java.util.Random; | ||
import java.util.UUID; | ||
import java.util.function.BiFunction; | ||
|
||
/** Testing {@link ContainerCommandRequestMessage}. */ | ||
public class TestContainerCommandRequestMessage { | ||
static final Random RANDOM = new Random(); | ||
|
||
static ByteString newData(int length, Random random) { | ||
final ByteString.Output out = ByteString.newOutput(); | ||
for(int i = 0; i < length; i++) { | ||
out.write(random.nextInt()); | ||
} | ||
return out.toByteString(); | ||
} | ||
|
||
static ChecksumData checksum(ByteString data) { | ||
try { | ||
return new Checksum().computeChecksum(data.toByteArray()); | ||
} catch (OzoneChecksumException e) { | ||
throw new IllegalStateException(e); | ||
} | ||
} | ||
|
||
static ContainerCommandRequestProto newPutSmallFile( | ||
BlockID blockID, ByteString data) { | ||
final BlockData.Builder blockData | ||
= BlockData.newBuilder() | ||
.setBlockID(blockID.getDatanodeBlockIDProtobuf()); | ||
final PutBlockRequestProto.Builder putBlockRequest | ||
= PutBlockRequestProto.newBuilder() | ||
.setBlockData(blockData); | ||
final KeyValue keyValue = KeyValue.newBuilder() | ||
.setKey("OverWriteRequested") | ||
.setValue("true") | ||
.build(); | ||
final ChunkInfo chunk = ChunkInfo.newBuilder() | ||
.setChunkName(blockID.getLocalID() + "_chunk") | ||
.setOffset(0) | ||
.setLen(data.size()) | ||
.addMetadata(keyValue) | ||
.setChecksumData(checksum(data).getProtoBufMessage()) | ||
.build(); | ||
final PutSmallFileRequestProto putSmallFileRequest | ||
= PutSmallFileRequestProto.newBuilder() | ||
.setChunkInfo(chunk) | ||
.setBlock(putBlockRequest) | ||
.setData(data) | ||
.build(); | ||
return ContainerCommandRequestProto.newBuilder() | ||
.setCmdType(Type.PutSmallFile) | ||
.setContainerID(blockID.getContainerID()) | ||
.setDatanodeUuid(UUID.randomUUID().toString()) | ||
.setPutSmallFile(putSmallFileRequest) | ||
.build(); | ||
} | ||
|
||
static ContainerCommandRequestProto newWriteChunk( | ||
BlockID blockID, ByteString data) { | ||
final ChunkInfo chunk = ChunkInfo.newBuilder() | ||
.setChunkName(blockID.getLocalID() + "_chunk_" + 1) | ||
.setOffset(0) | ||
.setLen(data.size()) | ||
.setChecksumData(checksum(data).getProtoBufMessage()) | ||
.build(); | ||
|
||
final WriteChunkRequestProto.Builder writeChunkRequest | ||
= WriteChunkRequestProto.newBuilder() | ||
.setBlockID(blockID.getDatanodeBlockIDProtobuf()) | ||
.setChunkData(chunk) | ||
.setData(data); | ||
return ContainerCommandRequestProto.newBuilder() | ||
.setCmdType(Type.WriteChunk) | ||
.setContainerID(blockID.getContainerID()) | ||
.setDatanodeUuid(UUID.randomUUID().toString()) | ||
.setWriteChunk(writeChunkRequest) | ||
.build(); | ||
} | ||
|
||
@Test | ||
public void testPutSmallFile() throws Exception { | ||
runTest(TestContainerCommandRequestMessage::newPutSmallFile); | ||
} | ||
|
||
@Test | ||
public void testWriteChunk() throws Exception { | ||
runTest(TestContainerCommandRequestMessage::newWriteChunk); | ||
} | ||
|
||
static void runTest( | ||
BiFunction<BlockID, ByteString, ContainerCommandRequestProto> method) | ||
throws Exception { | ||
for(int i = 0; i < 2; i++) { | ||
runTest(i, method); | ||
} | ||
for(int i = 2; i < 1 << 10;) { | ||
runTest(i + 1 + RANDOM.nextInt(i - 1), method); | ||
i <<= 1; | ||
runTest(i, method); | ||
} | ||
} | ||
|
||
static void runTest(int length, | ||
BiFunction<BlockID, ByteString, ContainerCommandRequestProto> method) | ||
throws Exception { | ||
System.out.println("length=" + length); | ||
final BlockID blockID = new BlockID(RANDOM.nextLong(), RANDOM.nextLong()); | ||
final ByteString data = newData(length, RANDOM); | ||
|
||
final ContainerCommandRequestProto original = method.apply(blockID, data); | ||
final ContainerCommandRequestMessage message | ||
= ContainerCommandRequestMessage.toMessage(original, null); | ||
final ContainerCommandRequestProto computed | ||
= ContainerCommandRequestMessage.toProto(message.getContent(), null); | ||
Assert.assertEquals(original, computed); | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.