Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: File Copy MetaData Exchange Changes #2942

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package com.github.ambry.protocol;

import com.github.ambry.utils.Utils;
import io.netty.buffer.ByteBuf;
import java.io.DataInputStream;
import java.io.IOException;
import java.nio.charset.Charset;


public class FileInfo{
private String fileName;
private long fileSizeInBytes;

private static final int FileName_Field_Size_In_Bytes = 4;

private static final int FileSize_Field_Size_In_Bytes = 8;


public FileInfo(String fileName, long fileSize) {
this.fileName = fileName;
this.fileSizeInBytes = fileSize;
}

public long sizeInBytes() {
return FileName_Field_Size_In_Bytes + fileName.length() + FileSize_Field_Size_In_Bytes;
}
public static FileInfo readFrom(DataInputStream stream) throws IOException {
String fileName = Utils.readIntString(stream);
long fileSize = stream.readLong();
return new FileInfo(fileName, fileSize);
}
public void writeTo(ByteBuf buf) {
Utils.serializeString(buf, fileName, Charset.defaultCharset());
buf.writeLong(fileSizeInBytes);
}

public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("FileInfo[").append("FileName=").append(fileName).append(", FileSizeInBytes=").append(fileSizeInBytes)
.append("]");
return sb.toString();
}

public String getFileName() {
return fileName;
}

public long getFileSizeInBytes() {
return fileSizeInBytes;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package com.github.ambry.protocol;

import com.github.ambry.clustermap.ClusterMap;
import com.github.ambry.clustermap.PartitionId;
import com.github.ambry.utils.Utils;
import java.io.DataInputStream;
import java.io.IOException;
import java.nio.charset.Charset;


public class FileMetaDataRequest extends RequestOrResponse{
private PartitionId partitionId;
private String hostName;

private static final short File_Metadata_Request_Version_V1 = 1;
private static final int HostName_Field_Size_In_Bytes = 4;

public FileMetaDataRequest(short versionId, int correlationId, String clientId,
PartitionId partitionId, String hostName) {
super(RequestOrResponseType.FileCopyMetaDataRequest, versionId, correlationId, clientId);
if (partitionId == null || hostName.isEmpty()) {
throw new IllegalArgumentException("Partition and Host Name cannot be null");
}
this.partitionId = partitionId;
this.hostName = hostName;
}

public String getHostName() {
return hostName;
}

public PartitionId getPartitionId() {
return partitionId;
}

protected static FileMetaDataRequest readFrom(DataInputStream stream, ClusterMap clusterMap) throws IOException {
Short versionId = stream.readShort();
validateVersion(versionId);
int correlationId = stream.readInt();
String clientId = Utils.readIntString(stream);
String hostName = Utils.readIntString(stream);
PartitionId partitionId = clusterMap.getPartitionIdFromStream(stream);
return new FileMetaDataRequest(versionId, correlationId, clientId, partitionId, hostName);
}

public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("FileMetaDataRequest[").append("PartitionId=").append(partitionId).append(", HostName=").append(hostName)
.append("]");
return sb.toString();
}

public long sizeInBytes() {
return super.sizeInBytes() + HostName_Field_Size_In_Bytes + hostName.length() + partitionId.getBytes().length;
}

protected void prepareBuffer() {
super.prepareBuffer();
Utils.serializeString(bufferToSend, hostName, Charset.defaultCharset());
bufferToSend.writeBytes(partitionId.getBytes());
}

static void validateVersion(short version) {
if (version != File_Metadata_Request_Version_V1) {
throw new IllegalArgumentException("Unknown version for FileMetadataRequest: " + version);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package com.github.ambry.protocol;

import com.github.ambry.server.ServerErrorCode;
import com.github.ambry.utils.Utils;
import java.io.DataInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;


public class FileMetaDataResponse extends Response {
private int numberOfLogfiles;
private List<LogInfo> logInfoList;

public FileMetaDataResponse(short versionId, int correlationId, String clientId, int numberOfLogfiles,
List<LogInfo> logInfoList, ServerErrorCode errorCode) {
super(RequestOrResponseType.FileCopyMetaDataResponse, versionId, correlationId, clientId, errorCode);
this.numberOfLogfiles = numberOfLogfiles;
this.logInfoList = logInfoList;
}

public static FileMetaDataResponse readFrom(DataInputStream stream) throws IOException {
RequestOrResponseType type = RequestOrResponseType.values()[stream.readShort()];
if (type != RequestOrResponseType.FileCopyMetaDataResponse) {
throw new IllegalArgumentException("The type of request response is not compatible");
}
short versionId = stream.readShort();
int correlationId = stream.readInt();
String clientId = Utils.readIntString(stream);
ServerErrorCode errorCode = ServerErrorCode.values()[stream.readShort()];
int numberOfLogfiles = stream.readInt();
int logInfoListSize = stream.readInt();
List<LogInfo> logInfoList = new ArrayList<>();
for (int i = 0; i < logInfoListSize; i++) {
logInfoList.add(LogInfo.readFrom(stream));
}
return new FileMetaDataResponse(versionId, correlationId, clientId, numberOfLogfiles, logInfoList, errorCode);
}
protected void prepareBuffer() {
super.prepareBuffer();
bufferToSend.writeInt(numberOfLogfiles);
bufferToSend.writeInt(logInfoList.size());
for (LogInfo logInfo : logInfoList) {
logInfo.writeTo(bufferToSend);
}
}

public long sizeInBytes() {
return super.sizeInBytes() + Integer.BYTES + Integer.BYTES + logInfoList.stream().mapToLong(LogInfo::sizeInBytes).sum();
}

public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("FileMetaDataResponse[NumberOfLogfiles=").append(numberOfLogfiles).append(", logInfoList").append(logInfoList.toString()).append("]");
return sb.toString();
}

public int getNumberOfLogfiles() {
return numberOfLogfiles;
}

public List<LogInfo> getLogInfoList() {
return logInfoList;
}
}
103 changes: 103 additions & 0 deletions ambry-protocol/src/main/java/com/github/ambry/protocol/LogInfo.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
package com.github.ambry.protocol;

import com.github.ambry.utils.Utils;
import io.netty.buffer.ByteBuf;
import java.io.DataInputStream;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;


public class LogInfo {
private String fileName;
private long fileSizeInBytes;
List<FileInfo> listOfIndexFiles;
List<FileInfo> listOfBloomFilters;

private static final int FileName_Field_Size_In_Bytes = 4;
private static final int FileSize_Field_Size_In_Bytes = 8;

private static final int ListSize_In_Bytes = 4;
public LogInfo(String fileName, long fileSizeInBytes, List<FileInfo> listOfIndexFiles, List<FileInfo> listOfBloomFilters) {
this.fileName = fileName;
this.fileSizeInBytes = fileSizeInBytes;
this.listOfIndexFiles = listOfIndexFiles;
this.listOfBloomFilters = listOfBloomFilters;
}

public String getFileName() {
return fileName;
}

public long getFileSizeInBytes() {
return fileSizeInBytes;
}

public List<FileInfo> getListOfBloomFilters() {
return listOfBloomFilters;
}

public List<FileInfo> getListOfIndexFiles() {
return listOfIndexFiles;
}

public long sizeInBytes() {
long size = FileName_Field_Size_In_Bytes + fileName.length() + FileSize_Field_Size_In_Bytes + ListSize_In_Bytes;
for (FileInfo fileInfo : listOfIndexFiles) {
size += fileInfo.sizeInBytes();
}
size += ListSize_In_Bytes;
for (FileInfo fileInfo : listOfBloomFilters) {
size += fileInfo.sizeInBytes();
}
return size;
}

public static LogInfo readFrom(DataInputStream stream) throws IOException {
String fileName = Utils.readIntString(stream );
long fileSize = stream.readLong();
List<FileInfo> listOfIndexFiles = new ArrayList<>();
List<FileInfo> listOfBloomFilters = new ArrayList<>();

int indexFilesCount = stream.readInt();
for (int i = 0; i < indexFilesCount; i++) {
listOfIndexFiles.add(FileInfo.readFrom(stream));
}

int bloomFiltersCount = stream.readInt();
for(int i= 0;i< bloomFiltersCount; i++){
listOfBloomFilters.add(FileInfo.readFrom(stream));
}
return new LogInfo(fileName, fileSize, listOfIndexFiles, listOfBloomFilters);
}

public void writeTo(ByteBuf buf){
Utils.serializeString(buf, fileName, Charset.defaultCharset());
buf.writeLong(fileSizeInBytes);
buf.writeInt(listOfIndexFiles.size());
for(FileInfo fileInfo : listOfIndexFiles){
fileInfo.writeTo(buf);
}
buf.writeInt(listOfBloomFilters.size());
for(FileInfo fileInfo: listOfBloomFilters){
fileInfo.writeTo(buf);
}
}

public String toString(){
StringBuilder sb = new StringBuilder();
sb.append("LogInfo[");
sb.append("FileName=").append(fileName).append(", FileSizeInBytes=").append(fileSizeInBytes).append(",");
for(FileInfo fileInfo : listOfIndexFiles) {
sb.append(fileInfo.toString());
}
for(FileInfo fileInfo: listOfBloomFilters){
sb.append(fileInfo.toString());
}
sb.append("]");
return sb.toString();
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ public void writeTo(ByteBuf buf) {

public long sizeInBytes() {
long size = HostName_Field_Size_In_Bytes + hostName.getBytes().length + ReplicaPath_Field_Size_In_Bytes
+ replicaPath.getBytes().length + +partitionId.getBytes().length + token.toBytes().length;
+ replicaPath.getBytes().length + partitionId.getBytes().length + token.toBytes().length;
if (requestVersion == ReplicaMetadataRequest.Replica_Metadata_Request_Version_V2) {
size += ReplicaType_Size_In_Bytes;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,5 +37,8 @@ public enum RequestOrResponseType {
PurgeRequest,
PurgeResponse,
BatchDeleteRequest,
BatchDeleteResponse
BatchDeleteResponse,

FileCopyMetaDataRequest,
FileCopyMetaDataResponse
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.github.ambry.clustermap.PartitionId;
import com.github.ambry.clustermap.ReplicaType;
import com.github.ambry.commons.BlobId;
import com.github.ambry.protocol.FileMetaDataRequest;
import com.github.ambry.commons.BlobIdFactory;
import com.github.ambry.commons.CommonTestUtils;
import com.github.ambry.messageformat.BlobProperties;
Expand Down Expand Up @@ -48,6 +49,7 @@
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;
Expand Down Expand Up @@ -672,6 +674,37 @@
MessageInfoAndMetadataListSerde.AUTO_VERSION = oldMessageInfoVersion;
}


@Test
public void doFileMetaDataRequestTest() throws IOException {
MockClusterMap clusterMap = new MockClusterMap();
short requestVersionToUse = 1;
FileMetaDataRequest request = new FileMetaDataRequest(requestVersionToUse, 111, "id1", new MockPartitionId(), "host3");
DataInputStream requestStream = serAndPrepForRead(request, -1, true);
FileMetaDataRequest fileMetadataRequestFromBytes = FileMetaDataRequest.readFrom(requestStream, new MockClusterMap());
Assert.assertEquals(fileMetadataRequestFromBytes.getHostName(), "host3");
Assert.assertEquals(fileMetadataRequestFromBytes.getPartitionId().getId(), 0l);
Assert.assertEquals(fileMetadataRequestFromBytes.getPartitionId().toPathString(), "0");


List<LogInfo> logInfoList = new ArrayList<>();
List<FileInfo> indexInfoList = new ArrayList<>();
indexInfoList.add(new FileInfo("0_8_index", 100));
List<FileInfo> bloomInfolist = new ArrayList<>();
bloomInfolist.add(new FileInfo("0_1_bloom" ,50));

FileInfo fileInfo = new FileInfo("0_log", 20);
fileInfo.writeTo(ByteBuffer.allocate((int) fileInfo.sizeInBytes()));

Check failure on line 697 in ambry-protocol/src/test/java/com/github/ambry/protocol/RequestResponseTest.java

View workflow job for this annotation

GitHub Actions / int-test

[Task :ambry-protocol:compileTestJava FAILED] incompatible types: ByteBuffer cannot be converted to ByteBuf fileInfo.writeTo(ByteBuffer.allocate((int) fileInfo.sizeInBytes())); ^
// logInfoList.add(new LogInfo("0_log", 20, indexInfoList, bloomInfolist));
// FileMetaDataResponse response = new FileMetaDataResponse(requestVersionToUse, 111, "id2", 4, logInfoList, ServerErrorCode.No_Error);
// requestStream = serAndPrepForRead(response, -1, false);
// FileMetaDataResponse fileMetaDataResponse = FileMetaDataResponse.readFrom(requestStream);
// fileMetaDataResponse.getNumberOfLgogfiles();

FileInfo fileInfo = new FileInfo("g");

Check failure on line 704 in ambry-protocol/src/test/java/com/github/ambry/protocol/RequestResponseTest.java

View workflow job for this annotation

GitHub Actions / int-test

[Task :ambry-protocol:compileTestJava FAILED] variable fileInfo is already defined in method doFileMetaDataRequestTest() FileInfo fileInfo = new FileInfo("g"); ^

Check failure on line 704 in ambry-protocol/src/test/java/com/github/ambry/protocol/RequestResponseTest.java

View workflow job for this annotation

GitHub Actions / int-test

[Task :ambry-protocol:compileTestJava FAILED] constructor FileInfo in class FileInfo cannot be applied to given types; FileInfo fileInfo = new FileInfo("g"); ^ required: String,long
request.release();
}

private void doReplicaMetadataRequestTest(short responseVersionToUse, short requestVersionToUse,
short messageInfoToUse, ReplicaType replicaType) throws IOException {
MessageInfoAndMetadataListSerde.AUTO_VERSION = messageInfoToUse;
Expand Down
Loading
Loading