Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add handleUndeleteRequest method to AmbryRequests #1379

Merged
merged 6 commits into from
Feb 14, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,13 @@ public class ServerConfig {
@Default("false")
public final boolean serverValidateRequestBasedOnStoreState;

/**
* True to enable ambry server handling undelete requests.
*/
@Config("server.handle.undelete.request.enabled")
@Default("false")
public final boolean serverHandleUndeleteRequestEnabled;

public ServerConfig(VerifiableProperties verifiableProperties) {
serverRequestHandlerNumOfThreads = verifiableProperties.getInt("server.request.handler.num.of.threads", 7);
serverSchedulerNumOfthreads = verifiableProperties.getInt("server.scheduler.num.of.threads", 10);
Expand All @@ -110,5 +117,7 @@ public ServerConfig(VerifiableProperties verifiableProperties) {
Utils.splitString(verifiableProperties.getString("server.stats.reports.to.publish", ""), ",");
serverValidateRequestBasedOnStoreState =
verifiableProperties.getBoolean("server.validate.request.based.on.store.state", false);
serverHandleUndeleteRequestEnabled =
verifiableProperties.getBoolean("server.handle.undelete.request.enabled", false);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,5 +37,9 @@ public enum ServerErrorCode {
Blob_Already_Updated,
Blob_Update_Not_Allowed,
Replica_Unavailable,
Blob_Authorization_Failure
Blob_Authorization_Failure,
Blob_Life_Version_Conflict,
Blob_Not_Deleted,
Blob_Already_Undeleted,
Blob_Deleted_Permanently
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ public class ErrorMapping {
tempMap.put(StoreErrorCodes.Authorization_Failure, ServerErrorCode.Blob_Authorization_Failure);
tempMap.put(StoreErrorCodes.Already_Updated, ServerErrorCode.Blob_Already_Updated);
tempMap.put(StoreErrorCodes.Update_Not_Allowed, ServerErrorCode.Blob_Update_Not_Allowed);
tempMap.put(StoreErrorCodes.Life_Version_Conflict, ServerErrorCode.Blob_Life_Version_Conflict);
tempMap.put(StoreErrorCodes.ID_Not_Deleted, ServerErrorCode.Blob_Not_Deleted);
tempMap.put(StoreErrorCodes.ID_Undeleted, ServerErrorCode.Blob_Already_Undeleted);
tempMap.put(StoreErrorCodes.ID_Deleted_Permanently, ServerErrorCode.Blob_Deleted_Permanently);
storeErrorMapping = Collections.unmodifiableMap(tempMap);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,12 @@ public class ServerMetrics {
public final Histogram deleteBlobSendTimeInMs;
public final Histogram deleteBlobTotalTimeInMs;

public final Histogram undeleteBlobRequestQueueTimeInMs;
public final Histogram undeleteBlobProcessingTimeInMs;
public final Histogram undeleteBlobResponseQueueTimeInMs;
public final Histogram undeleteBlobSendTimeInMs;
public final Histogram undeleteBlobTotalTimeInMs;

public final Histogram updateBlobTtlRequestQueueTimeInMs;
public final Histogram updateBlobTtlProcessingTimeInMs;
public final Histogram updateBlobTtlResponseQueueTimeInMs;
Expand Down Expand Up @@ -157,6 +163,7 @@ public class ServerMetrics {
public final Meter getBlobAllByReplicaRequestRate;
public final Meter getBlobInfoRequestRate;
public final Meter deleteBlobRequestRate;
public final Meter undeleteBlobRequestRate;
public final Meter updateBlobTtlRequestRate;
public final Meter replicaMetadataRequestRate;
public final Meter triggerCompactionRequestRate;
Expand All @@ -183,18 +190,23 @@ public class ServerMetrics {
public final Counter unExpectedStoreGetError;
public final Counter unExpectedStoreTtlUpdateError;
public final Counter unExpectedStoreDeleteError;
public final Counter unExpectedStoreUndeleteError;
public final Counter unExpectedAdminOperationError;
public final Counter unExpectedStoreFindEntriesError;
public final Counter idAlreadyExistError;
public final Counter dataCorruptError;
public final Counter unknownFormatError;
public final Counter idNotFoundError;
public final Counter idDeletedError;
public final Counter idUndeletedError;
public final Counter idNotDeletedError;
public final Counter lifeVersionConflictError;
public final Counter ttlExpiredError;
public final Counter badRequestError;
public final Counter temporarilyDisabledError;
public final Counter getAuthorizationFailure;
public final Counter deleteAuthorizationFailure;
public final Counter undeleteAuthorizationFailure;
public final Counter ttlUpdateAuthorizationFailure;
public final Counter ttlAlreadyUpdatedError;
public final Counter ttlUpdateRejectedError;
Expand Down Expand Up @@ -303,6 +315,15 @@ public ServerMetrics(MetricRegistry registry, Class<?> requestClass, Class<?> se
deleteBlobSendTimeInMs = registry.histogram(MetricRegistry.name(requestClass, "DeleteBlobSendTime"));
deleteBlobTotalTimeInMs = registry.histogram(MetricRegistry.name(requestClass, "DeleteBlobTotalTime"));

undeleteBlobRequestQueueTimeInMs =
registry.histogram(MetricRegistry.name(requestClass, "UndeleteBlobRequestQueueTime"));
undeleteBlobProcessingTimeInMs =
registry.histogram(MetricRegistry.name(requestClass, "UndeleteBlobProcessingTime"));
undeleteBlobResponseQueueTimeInMs =
registry.histogram(MetricRegistry.name(requestClass, "UndeleteBlobResponseQueueTime"));
undeleteBlobSendTimeInMs = registry.histogram(MetricRegistry.name(requestClass, "UndeleteBlobSendTime"));
undeleteBlobTotalTimeInMs = registry.histogram(MetricRegistry.name(requestClass, "UndeleteBlobTotalTime"));

updateBlobTtlRequestQueueTimeInMs =
registry.histogram(MetricRegistry.name(requestClass, "UpdateBlobTtlRequestQueueTime"));
updateBlobTtlProcessingTimeInMs =
Expand Down Expand Up @@ -399,6 +420,7 @@ public ServerMetrics(MetricRegistry registry, Class<?> requestClass, Class<?> se
registry.meter(MetricRegistry.name(requestClass, "GetBlobAllByReplicaRequestRate"));
getBlobInfoRequestRate = registry.meter(MetricRegistry.name(requestClass, "GetBlobInfoRequestRate"));
deleteBlobRequestRate = registry.meter(MetricRegistry.name(requestClass, "DeleteBlobRequestRate"));
undeleteBlobRequestRate = registry.meter(MetricRegistry.name(requestClass, "UndeleteBlobRequestRate"));
updateBlobTtlRequestRate = registry.meter(MetricRegistry.name(requestClass, "UpdateBlobTtlRequestRate"));
replicaMetadataRequestRate = registry.meter(MetricRegistry.name(requestClass, "ReplicaMetadataRequestRate"));
triggerCompactionRequestRate = registry.meter(MetricRegistry.name(requestClass, "TriggerCompactionRequestRate"));
Expand Down Expand Up @@ -426,12 +448,16 @@ public ServerMetrics(MetricRegistry registry, Class<?> requestClass, Class<?> se
unknownFormatError = registry.counter(MetricRegistry.name(requestClass, "UnknownFormatError"));
idNotFoundError = registry.counter(MetricRegistry.name(requestClass, "IDNotFoundError"));
idDeletedError = registry.counter(MetricRegistry.name(requestClass, "IDDeletedError"));
idUndeletedError = registry.counter(MetricRegistry.name(requestClass, "IDUndeletedError"));
idNotDeletedError = registry.counter(MetricRegistry.name(requestClass, "IDNotDeletedError"));
lifeVersionConflictError = registry.counter(MetricRegistry.name(requestClass, "lifeVersionConflictError"));
ttlExpiredError = registry.counter(MetricRegistry.name(requestClass, "TTLExpiredError"));
temporarilyDisabledError = registry.counter(MetricRegistry.name(requestClass, "TemporarilyDisabledError"));
badRequestError = registry.counter(MetricRegistry.name(requestClass, "BadRequestError"));
unExpectedStorePutError = registry.counter(MetricRegistry.name(requestClass, "UnexpectedStorePutError"));
unExpectedStoreGetError = registry.counter(MetricRegistry.name(requestClass, "UnexpectedStoreGetError"));
unExpectedStoreDeleteError = registry.counter(MetricRegistry.name(requestClass, "UnexpectedStoreDeleteError"));
unExpectedStoreUndeleteError = registry.counter(MetricRegistry.name(requestClass, "UnexpectedStoreUndeleteError"));
unExpectedAdminOperationError =
registry.counter(MetricRegistry.name(requestClass, "UnexpectedAdminOperationError"));
unExpectedStoreTtlUpdateError =
Expand All @@ -440,6 +466,7 @@ public ServerMetrics(MetricRegistry registry, Class<?> requestClass, Class<?> se
registry.counter(MetricRegistry.name(requestClass, "UnexpectedStoreFindEntriesError"));
getAuthorizationFailure = registry.counter(MetricRegistry.name(requestClass, "GetAuthorizationFailure"));
deleteAuthorizationFailure = registry.counter(MetricRegistry.name(requestClass, "DeleteAuthorizationFailure"));
undeleteAuthorizationFailure = registry.counter(MetricRegistry.name(requestClass, "UndeleteAuthorizationFailure"));
ttlUpdateAuthorizationFailure =
registry.counter(MetricRegistry.name(requestClass, "TtlUpdateAuthorizationFailure"));
ttlAlreadyUpdatedError = registry.counter(MetricRegistry.name(requestClass, "TtlAlreadyUpdatedError"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -505,7 +505,7 @@ public void messageFormatUndeleteUpdateRecordTest() throws Exception {
* @param updateTimeMs the expected time of update
* @throws Exception any error.
*/
private static void checkUndeleteMessage(InputStream stream, Long expectedRecordSize, StoreKey key, short accountId,
public static void checkUndeleteMessage(InputStream stream, Long expectedRecordSize, StoreKey key, short accountId,
short containerId, long updateTimeMs, short lifeVersion) throws Exception {
checkHeaderAndStoreKeyForUpdate(stream, expectedRecordSize, key, lifeVersion);
checkUndeleteSubRecord(stream, accountId, containerId, updateTimeMs);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,9 @@ public void handleRequests(NetworkRequest request) throws InterruptedException {
case AdminRequest:
handleAdminRequest(request);
break;
case UndeleteRequest:
handleUndeleteRequest(request);
break;
default:
throw new UnsupportedOperationException("Request type not supported");
}
Expand Down Expand Up @@ -641,9 +644,84 @@ public void handleReplicaMetadataRequest(NetworkRequest request) throws IOExcept
metrics.replicaMetadataSendTimeInMs, metrics.replicaMetadataTotalTimeInMs, null, null, totalTimeSpent));
}

private void sendPutResponse(RequestResponseChannel requestResponseChannel, PutResponse response, NetworkRequest request,
Histogram responseQueueTime, Histogram responseSendTime, Histogram requestTotalTime, long totalTimeSpent,
long blobSize, ServerMetrics metrics) throws InterruptedException {
@Override
public void handleUndeleteRequest(NetworkRequest request) throws IOException, InterruptedException {
UndeleteRequest undeleteRequest =
UndeleteRequest.readFrom(new DataInputStream(request.getInputStream()), clusterMap);
long requestQueueTime = SystemTime.getInstance().milliseconds() - request.getStartTimeInMs();
long totalTimeSpent = requestQueueTime;
metrics.undeleteBlobRequestQueueTimeInMs.update(requestQueueTime);
metrics.undeleteBlobRequestRate.mark();
long startTime = SystemTime.getInstance().milliseconds();
UndeleteResponse response = null;
try {
StoreKey convertedStoreKey = getConvertedStoreKeys(Collections.singletonList(undeleteRequest.getBlobId())).get(0);
ServerErrorCode error =
validateRequest(undeleteRequest.getBlobId().getPartition(), RequestOrResponseType.UndeleteRequest, false);
if (error != ServerErrorCode.No_Error) {
logger.error("Validating undelete request failed with error {} for request {}", error, undeleteRequest);
response = new UndeleteResponse(undeleteRequest.getCorrelationId(), undeleteRequest.getClientId(), error);
} else {
BlobId convertedBlobId = (BlobId) convertedStoreKey;
MessageInfo info =
new MessageInfo(convertedBlobId, 0, convertedBlobId.getAccountId(), convertedBlobId.getContainerId(),
undeleteRequest.getOperationTimeMs());
Store storeToUndelete = storeManager.getStore(undeleteRequest.getBlobId().getPartition());
short lifeVersion = storeToUndelete.undelete(info);
response = new UndeleteResponse(undeleteRequest.getCorrelationId(), undeleteRequest.getClientId(), lifeVersion);
if (notification != null) {
notification.onBlobReplicaUndeleted(currentNode.getHostname(), currentNode.getPort(),
convertedStoreKey.getID(), BlobReplicaSourceType.PRIMARY);
}
}
} catch (StoreException e) {
boolean logInErrorLevel = false;
if (e.getErrorCode() == StoreErrorCodes.ID_Not_Found) {
metrics.idNotFoundError.inc();
} else if (e.getErrorCode() == StoreErrorCodes.TTL_Expired) {
metrics.ttlExpiredError.inc();
} else if (e.getErrorCode() == StoreErrorCodes.ID_Deleted_Permanently) {
metrics.idDeletedError.inc();
} else if (e.getErrorCode() == StoreErrorCodes.Life_Version_Conflict) {
metrics.lifeVersionConflictError.inc();
} else if (e.getErrorCode() == StoreErrorCodes.ID_Not_Deleted) {
metrics.idNotDeletedError.inc();
} else if (e.getErrorCode() == StoreErrorCodes.ID_Undeleted) {
metrics.idUndeletedError.inc();
} else if (e.getErrorCode() == StoreErrorCodes.Authorization_Failure) {
metrics.undeleteAuthorizationFailure.inc();
} else {
logInErrorLevel = true;
metrics.unExpectedStoreUndeleteError.inc();
}
if (logInErrorLevel) {
logger.error("Store exception on a undelete with error code {} for request {}", e.getErrorCode(),
undeleteRequest, e);
} else {
logger.trace("Store exception on a undelete with error code {} for request {}", e.getErrorCode(),
undeleteRequest, e);
}
response = new UndeleteResponse(undeleteRequest.getCorrelationId(), undeleteRequest.getClientId(),
ErrorMapping.getStoreErrorMapping(e.getErrorCode()));
} catch (Exception e) {
logger.error("Unknown exception for undelete request " + undeleteRequest, e);
response = new UndeleteResponse(undeleteRequest.getCorrelationId(), undeleteRequest.getClientId(),
ServerErrorCode.Unknown_Error);
metrics.unExpectedStoreUndeleteError.inc();
} finally {
long processingTime = SystemTime.getInstance().milliseconds() - startTime;
totalTimeSpent += processingTime;
publicAccessLogger.info("{} {} processingTime {}", undeleteRequest, response, processingTime);
metrics.undeleteBlobProcessingTimeInMs.update(processingTime);
}
requestResponseChannel.sendResponse(response, request,
new ServerNetworkResponseMetrics(metrics.undeleteBlobResponseQueueTimeInMs, metrics.undeleteBlobSendTimeInMs,
metrics.undeleteBlobTotalTimeInMs, null, null, totalTimeSpent));
}

private void sendPutResponse(RequestResponseChannel requestResponseChannel, PutResponse response,
NetworkRequest request, Histogram responseQueueTime, Histogram responseSendTime, Histogram requestTotalTime,
long totalTimeSpent, long blobSize, ServerMetrics metrics) throws InterruptedException {
if (response.getError() == ServerErrorCode.No_Error) {
metrics.markPutBlobRequestRateBySize(blobSize);
if (blobSize <= ServerMetrics.smallBlob) {
Expand All @@ -666,9 +744,9 @@ private void sendPutResponse(RequestResponseChannel requestResponseChannel, PutR
}
}

private void sendGetResponse(RequestResponseChannel requestResponseChannel, GetResponse response, NetworkRequest request,
Histogram responseQueueTime, Histogram responseSendTime, Histogram requestTotalTime, long totalTimeSpent,
long blobSize, MessageFormatFlags flags, ServerMetrics metrics) throws InterruptedException {
private void sendGetResponse(RequestResponseChannel requestResponseChannel, GetResponse response,
NetworkRequest request, Histogram responseQueueTime, Histogram responseSendTime, Histogram requestTotalTime,
long totalTimeSpent, long blobSize, MessageFormatFlags flags, ServerMetrics metrics) throws InterruptedException {

if (blobSize <= ServerMetrics.smallBlob) {
if (flags == MessageFormatFlags.Blob || flags == MessageFormatFlags.All) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
*/
package com.github.ambry.protocol;

import com.github.ambry.router.AsyncWritableChannel;
import com.github.ambry.router.Callback;
import com.github.ambry.server.ServerErrorCode;
import com.github.ambry.utils.Utils;
import java.io.DataInputStream;
Expand Down Expand Up @@ -80,9 +82,7 @@ public static UndeleteResponse readFrom(DataInputStream stream) throws IOExcepti
}
}

@Override
public long writeTo(WritableByteChannel channel) throws IOException {
long written = 0;
private void prepareBuffer() {
if (bufferToSend == null) {
bufferToSend = ByteBuffer.allocate((int) sizeInBytes());
writeHeader();
Expand All @@ -91,12 +91,24 @@ public long writeTo(WritableByteChannel channel) throws IOException {
}
bufferToSend.flip();
}
}

@Override
public long writeTo(WritableByteChannel channel) throws IOException {
long written = 0;
prepareBuffer();
if (bufferToSend.remaining() > 0) {
written = channel.write(bufferToSend);
}
return written;
}

@Override
public void writeTo(AsyncWritableChannel channel, Callback<Long> callback) {
prepareBuffer();
channel.write(bufferToSend, callback);
}

@Override
public long sizeInBytes() {
return super.sizeInBytes() + (long) (getError() == ServerErrorCode.No_Error ? Life_Version_InBytes : 0);
Expand Down
Loading