Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add new version of DeleteRequest format #2814

Merged
merged 3 commits into from
Jun 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@
import com.github.ambry.utils.Utils;
import java.io.DataInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;


/**
Expand All @@ -28,34 +26,57 @@
public class DeleteRequest extends RequestOrResponse {
private final BlobId blobId;
private final long deletionTimeInMs;
private final boolean isForceDelete;
static final short DELETE_REQUEST_VERSION_1 = 1;
static final short DELETE_REQUEST_VERSION_2 = 2;
static final short DELETE_REQUEST_VERSION_3 = 3;
// TODO Update CURRENT_VERSION to 3 after the change is rolled out on all servers
private final static short CURRENT_VERSION = DELETE_REQUEST_VERSION_2;
protected static final int DELETION_TIME_FIELD_SIZE_IN_BYTES = Long.BYTES;
protected static final int FORCE_DELETE_FLAG_SIZE = 1;

/**
* Constructs {@link DeleteRequest} in {@link #DELETE_REQUEST_VERSION_2}
* Constructs {@link DeleteRequest} in current version
* @param correlationId correlationId of the delete request
* @param clientId clientId of the delete request
* @param blobId blobId of the delete request
* @param deletionTimeInMs deletion time of the blob in ms
* @param isForceDelete {@code true} if we should insert a Delete tombstone even if blob is missing on server.
* Else {@code false}.
*/
public DeleteRequest(int correlationId, String clientId, BlobId blobId, long deletionTimeInMs) {
this(correlationId, clientId, blobId, deletionTimeInMs, CURRENT_VERSION);
public DeleteRequest(int correlationId, String clientId, BlobId blobId, long deletionTimeInMs,
boolean isForceDelete) {
this(correlationId, clientId, blobId, deletionTimeInMs, CURRENT_VERSION, isForceDelete);
}

/**
* Constructs {@link DeleteRequest} in given version.
* Constructs {@link DeleteRequest} in current version
* @param correlationId correlationId of the delete request
* @param clientId clientId of the delete request
* @param blobId blobId of the delete request
* @param deletionTimeInMs deletion time of the blob in ms
* @param version version of the {@link DeleteRequest}
*/
protected DeleteRequest(int correlationId, String clientId, BlobId blobId, long deletionTimeInMs, short version) {
public DeleteRequest(int correlationId, String clientId, BlobId blobId, long deletionTimeInMs) {
this(correlationId, clientId, blobId, deletionTimeInMs, false);
}

/**
* Constructs {@link DeleteRequest} in given version.
*
* @param correlationId correlationId of the delete request
* @param clientId clientId of the delete request
* @param blobId blobId of the delete request
* @param deletionTimeInMs deletion time of the blob in ms
* @param version version of the {@link DeleteRequest}
* @param isForceDelete {@code true} if we should insert a Delete tombstone even if blob is missing on server.
* Else {@code false}.
*/
protected DeleteRequest(int correlationId, String clientId, BlobId blobId, long deletionTimeInMs, short version,
boolean isForceDelete) {
super(RequestOrResponseType.DeleteRequest, version, correlationId, clientId);
this.blobId = blobId;
this.deletionTimeInMs = deletionTimeInMs;
this.isForceDelete = isForceDelete;
}

public static DeleteRequest readFrom(DataInputStream stream, ClusterMap map) throws IOException {
Expand All @@ -65,6 +86,8 @@ public static DeleteRequest readFrom(DataInputStream stream, ClusterMap map) thr
return DeleteRequest_V1.readFrom(stream, map);
case DELETE_REQUEST_VERSION_2:
return DeleteRequest_V2.readFrom(stream, map);
case DELETE_REQUEST_VERSION_3:
return DeleteRequest_V3.readFrom(stream, map);
default:
throw new IllegalStateException("Unknown Delete Request version " + version);
}
Expand All @@ -82,6 +105,9 @@ protected void prepareBuffer() {
if (versionId == DELETE_REQUEST_VERSION_2) {
bufferToSend.writeLong(deletionTimeInMs);
}
if (versionId == DELETE_REQUEST_VERSION_3) {
bufferToSend.writeByte(isForceDelete ? (byte) 1 : (byte) 0);
}
}

public BlobId getBlobId() {
Expand All @@ -108,6 +134,10 @@ public long sizeInBytes() {
// deletion time
sizeInBytes += DELETION_TIME_FIELD_SIZE_IN_BYTES;
}
if (versionId == DELETE_REQUEST_VERSION_3) {
// Force delete request flag size
sizeInBytes += FORCE_DELETE_FLAG_SIZE;
}
return sizeInBytes;
}

Expand All @@ -122,6 +152,7 @@ public String toString() {
sb.append(", ").append("AccountId=").append(blobId.getAccountId());
sb.append(", ").append("ContainerId=").append(blobId.getContainerId());
sb.append(", ").append("DeletionTimeInMs=").append(deletionTimeInMs);
sb.append(", ").append("ForceDeleteFlag=").append(isForceDelete);
sb.append("]");
return sb.toString();
}
Expand All @@ -134,7 +165,7 @@ static DeleteRequest readFrom(DataInputStream stream, ClusterMap map) throws IOE
int correlationId = stream.readInt();
String clientId = Utils.readIntString(stream);
BlobId id = new BlobId(stream, map);
return new DeleteRequest(correlationId, clientId, id, Utils.Infinite_Time, DELETE_REQUEST_VERSION_1);
return new DeleteRequest(correlationId, clientId, id, Utils.Infinite_Time, DELETE_REQUEST_VERSION_1, false);
}
}

Expand All @@ -147,7 +178,18 @@ static DeleteRequest readFrom(DataInputStream stream, ClusterMap map) throws IOE
String clientId = Utils.readIntString(stream);
BlobId id = new BlobId(stream, map);
long deletionTimeInMs = stream.readLong();
return new DeleteRequest(correlationId, clientId, id, deletionTimeInMs);
return new DeleteRequest(correlationId, clientId, id, deletionTimeInMs, DELETE_REQUEST_VERSION_2, false);
}
}

private static class DeleteRequest_V3 {
static DeleteRequest readFrom(DataInputStream stream, ClusterMap map) throws IOException {
int correlationId = stream.readInt();
String clientId = Utils.readIntString(stream);
BlobId id = new BlobId(stream, map);
long deletionTimeInMs = stream.readLong();
boolean isForceDelete = stream.readByte() == 1;
return new DeleteRequest(correlationId, clientId, id, deletionTimeInMs, DELETE_REQUEST_VERSION_3, isForceDelete);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1360,7 +1360,7 @@ private class DeleteRequestV1 extends DeleteRequest {
* @param blobId blobId of the delete request
*/
private DeleteRequestV1(int correlationId, String clientId, BlobId blobId) {
super(correlationId, clientId, blobId, Utils.Infinite_Time, DeleteRequest.DELETE_REQUEST_VERSION_1);
super(correlationId, clientId, blobId, Utils.Infinite_Time, DeleteRequest.DELETE_REQUEST_VERSION_1, false);
}
}

Expand Down
Loading