From 031b655a291d3f417d5e0da0cec5d1e4fec7853a Mon Sep 17 00:00:00 2001 From: Arun Sai Bhima Date: Thu, 27 Jun 2024 10:23:22 -0700 Subject: [PATCH] Add new version of DeleteRequest format (#2814) This version contains flag to indicate if we need to insert a delete tombstone even if blob is missing on servers --- .../github/ambry/protocol/DeleteRequest.java | 62 ++++++++++++++++--- .../ambry/protocol/RequestResponseTest.java | 2 +- 2 files changed, 53 insertions(+), 11 deletions(-) diff --git a/ambry-protocol/src/main/java/com/github/ambry/protocol/DeleteRequest.java b/ambry-protocol/src/main/java/com/github/ambry/protocol/DeleteRequest.java index 11ab240621..aff29d8b1d 100644 --- a/ambry-protocol/src/main/java/com/github/ambry/protocol/DeleteRequest.java +++ b/ambry-protocol/src/main/java/com/github/ambry/protocol/DeleteRequest.java @@ -18,8 +18,6 @@ import com.github.ambry.utils.Utils; import java.io.DataInputStream; import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.channels.WritableByteChannel; /** @@ -28,34 +26,57 @@ public class DeleteRequest extends RequestOrResponse { private final BlobId blobId; private final long deletionTimeInMs; + private final boolean isForceDelete; static final short DELETE_REQUEST_VERSION_1 = 1; static final short DELETE_REQUEST_VERSION_2 = 2; + static final short DELETE_REQUEST_VERSION_3 = 3; + // TODO Update CURRENT_VERSION to 3 after the change is rolled out on all servers private final static short CURRENT_VERSION = DELETE_REQUEST_VERSION_2; protected static final int DELETION_TIME_FIELD_SIZE_IN_BYTES = Long.BYTES; + protected static final int FORCE_DELETE_FLAG_SIZE = 1; /** - * Constructs {@link DeleteRequest} in {@link #DELETE_REQUEST_VERSION_2} + * Constructs {@link DeleteRequest} in current version * @param correlationId correlationId of the delete request * @param clientId clientId of the delete request * @param blobId blobId of the delete request * @param deletionTimeInMs deletion time of the blob in ms + * @param isForceDelete {@code true} if we should insert a Delete tombstone even if blob is missing on server. + * Else {@code false}. */ - public DeleteRequest(int correlationId, String clientId, BlobId blobId, long deletionTimeInMs) { - this(correlationId, clientId, blobId, deletionTimeInMs, CURRENT_VERSION); + public DeleteRequest(int correlationId, String clientId, BlobId blobId, long deletionTimeInMs, + boolean isForceDelete) { + this(correlationId, clientId, blobId, deletionTimeInMs, CURRENT_VERSION, isForceDelete); } /** - * Constructs {@link DeleteRequest} in given version. + * Constructs {@link DeleteRequest} in current version * @param correlationId correlationId of the delete request * @param clientId clientId of the delete request * @param blobId blobId of the delete request * @param deletionTimeInMs deletion time of the blob in ms - * @param version version of the {@link DeleteRequest} */ - protected DeleteRequest(int correlationId, String clientId, BlobId blobId, long deletionTimeInMs, short version) { + public DeleteRequest(int correlationId, String clientId, BlobId blobId, long deletionTimeInMs) { + this(correlationId, clientId, blobId, deletionTimeInMs, false); + } + + /** + * Constructs {@link DeleteRequest} in given version. + * + * @param correlationId correlationId of the delete request + * @param clientId clientId of the delete request + * @param blobId blobId of the delete request + * @param deletionTimeInMs deletion time of the blob in ms + * @param version version of the {@link DeleteRequest} + * @param isForceDelete {@code true} if we should insert a Delete tombstone even if blob is missing on server. + * Else {@code false}. + */ + protected DeleteRequest(int correlationId, String clientId, BlobId blobId, long deletionTimeInMs, short version, + boolean isForceDelete) { super(RequestOrResponseType.DeleteRequest, version, correlationId, clientId); this.blobId = blobId; this.deletionTimeInMs = deletionTimeInMs; + this.isForceDelete = isForceDelete; } public static DeleteRequest readFrom(DataInputStream stream, ClusterMap map) throws IOException { @@ -65,6 +86,8 @@ public static DeleteRequest readFrom(DataInputStream stream, ClusterMap map) thr return DeleteRequest_V1.readFrom(stream, map); case DELETE_REQUEST_VERSION_2: return DeleteRequest_V2.readFrom(stream, map); + case DELETE_REQUEST_VERSION_3: + return DeleteRequest_V3.readFrom(stream, map); default: throw new IllegalStateException("Unknown Delete Request version " + version); } @@ -82,6 +105,9 @@ protected void prepareBuffer() { if (versionId == DELETE_REQUEST_VERSION_2) { bufferToSend.writeLong(deletionTimeInMs); } + if (versionId == DELETE_REQUEST_VERSION_3) { + bufferToSend.writeByte(isForceDelete ? (byte) 1 : (byte) 0); + } } public BlobId getBlobId() { @@ -108,6 +134,10 @@ public long sizeInBytes() { // deletion time sizeInBytes += DELETION_TIME_FIELD_SIZE_IN_BYTES; } + if (versionId == DELETE_REQUEST_VERSION_3) { + // Force delete request flag size + sizeInBytes += FORCE_DELETE_FLAG_SIZE; + } return sizeInBytes; } @@ -122,6 +152,7 @@ public String toString() { sb.append(", ").append("AccountId=").append(blobId.getAccountId()); sb.append(", ").append("ContainerId=").append(blobId.getContainerId()); sb.append(", ").append("DeletionTimeInMs=").append(deletionTimeInMs); + sb.append(", ").append("ForceDeleteFlag=").append(isForceDelete); sb.append("]"); return sb.toString(); } @@ -134,7 +165,7 @@ static DeleteRequest readFrom(DataInputStream stream, ClusterMap map) throws IOE int correlationId = stream.readInt(); String clientId = Utils.readIntString(stream); BlobId id = new BlobId(stream, map); - return new DeleteRequest(correlationId, clientId, id, Utils.Infinite_Time, DELETE_REQUEST_VERSION_1); + return new DeleteRequest(correlationId, clientId, id, Utils.Infinite_Time, DELETE_REQUEST_VERSION_1, false); } } @@ -147,7 +178,18 @@ static DeleteRequest readFrom(DataInputStream stream, ClusterMap map) throws IOE String clientId = Utils.readIntString(stream); BlobId id = new BlobId(stream, map); long deletionTimeInMs = stream.readLong(); - return new DeleteRequest(correlationId, clientId, id, deletionTimeInMs); + return new DeleteRequest(correlationId, clientId, id, deletionTimeInMs, DELETE_REQUEST_VERSION_2, false); + } + } + + private static class DeleteRequest_V3 { + static DeleteRequest readFrom(DataInputStream stream, ClusterMap map) throws IOException { + int correlationId = stream.readInt(); + String clientId = Utils.readIntString(stream); + BlobId id = new BlobId(stream, map); + long deletionTimeInMs = stream.readLong(); + boolean isForceDelete = stream.readByte() == 1; + return new DeleteRequest(correlationId, clientId, id, deletionTimeInMs, DELETE_REQUEST_VERSION_3, isForceDelete); } } } diff --git a/ambry-protocol/src/test/java/com/github/ambry/protocol/RequestResponseTest.java b/ambry-protocol/src/test/java/com/github/ambry/protocol/RequestResponseTest.java index b1b859254b..d5895c4459 100644 --- a/ambry-protocol/src/test/java/com/github/ambry/protocol/RequestResponseTest.java +++ b/ambry-protocol/src/test/java/com/github/ambry/protocol/RequestResponseTest.java @@ -1360,7 +1360,7 @@ private class DeleteRequestV1 extends DeleteRequest { * @param blobId blobId of the delete request */ private DeleteRequestV1(int correlationId, String clientId, BlobId blobId) { - super(correlationId, clientId, blobId, Utils.Infinite_Time, DeleteRequest.DELETE_REQUEST_VERSION_1); + super(correlationId, clientId, blobId, Utils.Infinite_Time, DeleteRequest.DELETE_REQUEST_VERSION_1, false); } }