Skip to content

Commit

Permalink
Add new version of DeleteRequest format (linkedin#2814)
Browse files Browse the repository at this point in the history
This version contains flag to indicate if we need to insert a delete tombstone even if blob is missing on servers
  • Loading branch information
Arun-LinkedIn authored Jun 27, 2024
1 parent 85b0c77 commit 031b655
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@
import com.github.ambry.utils.Utils;
import java.io.DataInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;


/**
Expand All @@ -28,34 +26,57 @@
public class DeleteRequest extends RequestOrResponse {
private final BlobId blobId;
private final long deletionTimeInMs;
private final boolean isForceDelete;
static final short DELETE_REQUEST_VERSION_1 = 1;
static final short DELETE_REQUEST_VERSION_2 = 2;
static final short DELETE_REQUEST_VERSION_3 = 3;
// TODO Update CURRENT_VERSION to 3 after the change is rolled out on all servers
private final static short CURRENT_VERSION = DELETE_REQUEST_VERSION_2;
protected static final int DELETION_TIME_FIELD_SIZE_IN_BYTES = Long.BYTES;
protected static final int FORCE_DELETE_FLAG_SIZE = 1;

/**
* Constructs {@link DeleteRequest} in {@link #DELETE_REQUEST_VERSION_2}
* Constructs {@link DeleteRequest} in current version
* @param correlationId correlationId of the delete request
* @param clientId clientId of the delete request
* @param blobId blobId of the delete request
* @param deletionTimeInMs deletion time of the blob in ms
* @param isForceDelete {@code true} if we should insert a Delete tombstone even if blob is missing on server.
* Else {@code false}.
*/
public DeleteRequest(int correlationId, String clientId, BlobId blobId, long deletionTimeInMs) {
this(correlationId, clientId, blobId, deletionTimeInMs, CURRENT_VERSION);
public DeleteRequest(int correlationId, String clientId, BlobId blobId, long deletionTimeInMs,
boolean isForceDelete) {
this(correlationId, clientId, blobId, deletionTimeInMs, CURRENT_VERSION, isForceDelete);
}

/**
* Constructs {@link DeleteRequest} in given version.
* Constructs {@link DeleteRequest} in current version
* @param correlationId correlationId of the delete request
* @param clientId clientId of the delete request
* @param blobId blobId of the delete request
* @param deletionTimeInMs deletion time of the blob in ms
* @param version version of the {@link DeleteRequest}
*/
protected DeleteRequest(int correlationId, String clientId, BlobId blobId, long deletionTimeInMs, short version) {
public DeleteRequest(int correlationId, String clientId, BlobId blobId, long deletionTimeInMs) {
this(correlationId, clientId, blobId, deletionTimeInMs, false);
}

/**
* Constructs {@link DeleteRequest} in given version.
*
* @param correlationId correlationId of the delete request
* @param clientId clientId of the delete request
* @param blobId blobId of the delete request
* @param deletionTimeInMs deletion time of the blob in ms
* @param version version of the {@link DeleteRequest}
* @param isForceDelete {@code true} if we should insert a Delete tombstone even if blob is missing on server.
* Else {@code false}.
*/
protected DeleteRequest(int correlationId, String clientId, BlobId blobId, long deletionTimeInMs, short version,
boolean isForceDelete) {
super(RequestOrResponseType.DeleteRequest, version, correlationId, clientId);
this.blobId = blobId;
this.deletionTimeInMs = deletionTimeInMs;
this.isForceDelete = isForceDelete;
}

public static DeleteRequest readFrom(DataInputStream stream, ClusterMap map) throws IOException {
Expand All @@ -65,6 +86,8 @@ public static DeleteRequest readFrom(DataInputStream stream, ClusterMap map) thr
return DeleteRequest_V1.readFrom(stream, map);
case DELETE_REQUEST_VERSION_2:
return DeleteRequest_V2.readFrom(stream, map);
case DELETE_REQUEST_VERSION_3:
return DeleteRequest_V3.readFrom(stream, map);
default:
throw new IllegalStateException("Unknown Delete Request version " + version);
}
Expand All @@ -82,6 +105,9 @@ protected void prepareBuffer() {
if (versionId == DELETE_REQUEST_VERSION_2) {
bufferToSend.writeLong(deletionTimeInMs);
}
if (versionId == DELETE_REQUEST_VERSION_3) {
bufferToSend.writeByte(isForceDelete ? (byte) 1 : (byte) 0);
}
}

public BlobId getBlobId() {
Expand All @@ -108,6 +134,10 @@ public long sizeInBytes() {
// deletion time
sizeInBytes += DELETION_TIME_FIELD_SIZE_IN_BYTES;
}
if (versionId == DELETE_REQUEST_VERSION_3) {
// Force delete request flag size
sizeInBytes += FORCE_DELETE_FLAG_SIZE;
}
return sizeInBytes;
}

Expand All @@ -122,6 +152,7 @@ public String toString() {
sb.append(", ").append("AccountId=").append(blobId.getAccountId());
sb.append(", ").append("ContainerId=").append(blobId.getContainerId());
sb.append(", ").append("DeletionTimeInMs=").append(deletionTimeInMs);
sb.append(", ").append("ForceDeleteFlag=").append(isForceDelete);
sb.append("]");
return sb.toString();
}
Expand All @@ -134,7 +165,7 @@ static DeleteRequest readFrom(DataInputStream stream, ClusterMap map) throws IOE
int correlationId = stream.readInt();
String clientId = Utils.readIntString(stream);
BlobId id = new BlobId(stream, map);
return new DeleteRequest(correlationId, clientId, id, Utils.Infinite_Time, DELETE_REQUEST_VERSION_1);
return new DeleteRequest(correlationId, clientId, id, Utils.Infinite_Time, DELETE_REQUEST_VERSION_1, false);
}
}

Expand All @@ -147,7 +178,18 @@ static DeleteRequest readFrom(DataInputStream stream, ClusterMap map) throws IOE
String clientId = Utils.readIntString(stream);
BlobId id = new BlobId(stream, map);
long deletionTimeInMs = stream.readLong();
return new DeleteRequest(correlationId, clientId, id, deletionTimeInMs);
return new DeleteRequest(correlationId, clientId, id, deletionTimeInMs, DELETE_REQUEST_VERSION_2, false);
}
}

private static class DeleteRequest_V3 {
static DeleteRequest readFrom(DataInputStream stream, ClusterMap map) throws IOException {
int correlationId = stream.readInt();
String clientId = Utils.readIntString(stream);
BlobId id = new BlobId(stream, map);
long deletionTimeInMs = stream.readLong();
boolean isForceDelete = stream.readByte() == 1;
return new DeleteRequest(correlationId, clientId, id, deletionTimeInMs, DELETE_REQUEST_VERSION_3, isForceDelete);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1360,7 +1360,7 @@ private class DeleteRequestV1 extends DeleteRequest {
* @param blobId blobId of the delete request
*/
private DeleteRequestV1(int correlationId, String clientId, BlobId blobId) {
super(correlationId, clientId, blobId, Utils.Infinite_Time, DeleteRequest.DELETE_REQUEST_VERSION_1);
super(correlationId, clientId, blobId, Utils.Infinite_Time, DeleteRequest.DELETE_REQUEST_VERSION_1, false);
}
}

Expand Down

0 comments on commit 031b655

Please sign in to comment.