Skip to content

Commit

Permalink
Update to use the colum map to original value
Browse files Browse the repository at this point in the history
  • Loading branch information
Sophie Guo committed Nov 14, 2024
1 parent 50a47a2 commit e93152a
Show file tree
Hide file tree
Showing 16 changed files with 500 additions and 106 deletions.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -621,6 +621,21 @@ public void deleteDatasetVersion(String accountName, String containerName, Strin
}
}

@Override
public void deleteDatasetVersionForDatasetDelete(String accountName, String containerName, String datasetName,
String version) throws AccountServiceException {
try {
Pair<Short, Short> accountAndContainerIdPair = getAccountAndContainerIdFromName(accountName, containerName);
if (mySqlAccountStore == null) {
mySqlAccountStore = this.supplier.get();
}
mySqlAccountStore.deleteDatasetVersion(accountAndContainerIdPair.getFirst(),
accountAndContainerIdPair.getSecond(), datasetName, version);
} catch (SQLException e) {
throw translateSQLException(e);
}
}

@Override
public void updateDatasetVersionTtl(String accountName, String containerName, String datasetName, String version)
throws AccountServiceException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,12 @@ public void deleteDatasetVersion(String accountName, String containerName, Strin
cachedAccountService.deleteDatasetVersion(accountName, containerName, datasetName, version);
}

@Override
public void deleteDatasetVersionForDatasetDelete(String accountName, String containerName, String datasetName,
String version) throws AccountServiceException {
cachedAccountService.deleteDatasetVersionForDatasetDelete(accountName, containerName, datasetName, version);
}

@Override
public void updateDatasetVersionTtl(String accountName, String containerName, String datasetName,
String version) throws AccountServiceException {
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,11 @@ public synchronized void deleteDatasetVersion(short accountId, short containerId
datasetDao.deleteDatasetVersion(accountId, containerId, datasetName, version);
}

public synchronized void deleteDatasetVersionForDatasetDelete(short accountId, short containerId, String datasetName,
String version) throws SQLException, AccountServiceException {
datasetDao.deleteDatasetVersionForDatasetDelete(accountId, containerId, datasetName, version);
}

/**
* Update ttl for a version of {@link Dataset}
* @param accountId the id for the parent account.
Expand Down
1 change: 1 addition & 0 deletions ambry-account/src/main/resources/AccountSchema.ddl
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ CREATE TABLE IF NOT EXISTS DatasetVersions (
lastModifiedTime DATETIME(3) NOT NULL,
delete_ts DATETIME(6) DEFAULT NULL,
deleted_ts DATETIME(6) DEFAULT NULL,
rename_from VARCHAR(25) DEFAULT NULL,
PRIMARY KEY (accountId, containerId, datasetName, version)
)
CHARACTER SET utf8 COLLATE utf8_bin;
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,11 @@ default void deleteDatasetVersion(String accountName, String containerName, Stri
throw new UnsupportedOperationException("This method is not supported");
}

default void deleteDatasetVersionForDatasetDelete(String accountName, String containerName, String datasetName,
String version) throws AccountServiceException {
throw new UnsupportedOperationException("This method is not supported");
}

/**
* Get all valid dataset versions for dataset deletion.
* @param accountName The name for the parent account.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,37 +27,43 @@ public class DatasetVersionRecord {
private final String version;
private final long expirationTimeMs;
private final Long creationTimeMs;
private final String renameFrom;

/**
* Constructor that takes individual arguments.
* @param accountId the id of the parent account.
* @param containerId the id of the container.
* @param datasetName the name of the dataset.
* @param version the version of the dataset.
*
* @param accountId the id of the parent account.
* @param containerId the id of the container.
* @param datasetName the name of the dataset.
* @param version the version of the dataset.
* @param expirationTimeMs the expiration time in milliseconds since epoch, or -1 if the blob should be permanent.
* @param renameFrom the original version which renamed from
*/
public DatasetVersionRecord(int accountId, int containerId, String datasetName, String version,
long expirationTimeMs) {
this(accountId, containerId, datasetName, version, expirationTimeMs, null);
long expirationTimeMs, String renameFrom) {
this(accountId, containerId, datasetName, version, expirationTimeMs, null, renameFrom);
}

/**
* Constructor for retention policy support.
* @param accountId the id of the parent account.
* @param containerId the id of the container.
* @param datasetName the name of the dataset.
* @param version the version of the dataset.
*
* @param accountId the id of the parent account.
* @param containerId the id of the container.
* @param datasetName the name of the dataset.
* @param version the version of the dataset.
* @param expirationTimeMs the expiration time in milliseconds since epoch, or -1 if the blob should be permanent.
* @param creationTimeMs the creation time in milliseconds since epoch for dataset version.
* @param creationTimeMs the creation time in milliseconds since epoch for dataset version.
* @param renameFrom
*/
public DatasetVersionRecord(int accountId, int containerId, String datasetName, String version, long expirationTimeMs,
Long creationTimeMs) {
Long creationTimeMs, String renameFrom) {
this.accountId = accountId;
this.containerId = containerId;
this.datasetName = datasetName;
this.version = version;
this.expirationTimeMs = expirationTimeMs;
this.creationTimeMs = creationTimeMs;
this.renameFrom = renameFrom;
}

/**
Expand Down Expand Up @@ -102,6 +108,10 @@ public long getCreationTimeMs() {
return creationTimeMs;
}

public String getRenameFrom() {
return renameFrom;
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,6 @@

public enum DatasetVersionState {
IN_PROGRESS,
READY
READY,
RENAMED
}
10 changes: 10 additions & 0 deletions ambry-api/src/main/java/com/github/ambry/rest/RestUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,10 @@ public static final class Headers {
* for put or get dataset request; long; version of dataset.
*/
public final static String TARGET_DATASET_VERSION = "x-ambry-target-dataset-version";
/**
* The original named blob name for a certain dataset version.
*/
public final static String TARGET_ORIGINAL_NAMED_BLOB_NAME = "x-ambry-target-original-named-blob-name";
/**
* The dataset version expiration time.
*/
Expand Down Expand Up @@ -478,6 +482,12 @@ public static final class InternalKeys {
*/
public static final String REQUEST_PATH = KEY_PREFIX + "request-path";

/**
* The internal header to determine if the delete request is coming from a dataset deletion.
*/
public static final String DATASET_DELETE_ENABLED = KEY_PREFIX + "dataset-delete-enabled";


/**
* To be set to {@code true} if failures reason should be attached to frontend responses.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ private Callback<Void> securityProcessRequestCallback() {
private Callback<Void> securityPostProcessRequestCallback() {
return buildCallback(metrics.deleteBlobSecurityPostProcessRequestMetrics, result -> {
String serviceId = RestUtils.getHeader(restRequest.getArgs(), RestUtils.Headers.SERVICE_ID, false);
//TODO: Delete the renamed blob if rename exist.
router.deleteBlob(restRequest, null, serviceId, routerCallback(),
QuotaUtils.buildQuotaChargeCallback(restRequest, quotaManager, false));
}, restRequest.getUri(), LOGGER, finalCallback);
Expand Down Expand Up @@ -207,7 +208,11 @@ private void deleteDatasetVersion(RestRequest restRequest) throws RestServiceExc
containerName = dataset.getContainerName();
datasetName = dataset.getDatasetName();
version = (String) restRequest.getArgs().get(TARGET_DATASET_VERSION);
accountService.deleteDatasetVersion(accountName, containerName, datasetName, version);
if (restRequest.getArgs().get(DATASET_DELETE_ENABLED) != null) {
accountService.deleteDatasetVersionForDatasetDelete(accountName, containerName, datasetName, version);
} else {
accountService.deleteDatasetVersion(accountName, containerName, datasetName, version);
}
LOGGER.debug(
"Successfully deleteDataset version for accountName: " + accountName + " containerName: " + containerName
+ " datasetName: " + datasetName + " version: " + version);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,13 +148,22 @@ private void deleteDataset() throws RestServiceException {
DatasetVersionRecord record = datasetVersionRecordList.get(0);
String version = record.getVersion();
RequestPath requestPath = getRequestPath(restRequest);
RequestPath newRequestPath =
new RequestPath(requestPath.getPrefix(), requestPath.getClusterName(), requestPath.getPathAfterPrefixes(),
NAMED_BLOB_PREFIX + SLASH + accountName + SLASH + containerName + SLASH + datasetName + SLASH
+ version, requestPath.getSubResource(), requestPath.getBlobSegmentIdx());
RequestPath newRequestPath;
if (record.getRenameFrom() != null) {
newRequestPath = new RequestPath(requestPath.getPrefix(), requestPath.getClusterName(),
requestPath.getPathAfterPrefixes(),
NAMED_BLOB_PREFIX + SLASH + accountName + SLASH + containerName + SLASH + datasetName + SLASH
+ record.getRenameFrom(), requestPath.getSubResource(), requestPath.getBlobSegmentIdx());
} else {
newRequestPath =
new RequestPath(requestPath.getPrefix(), requestPath.getClusterName(), requestPath.getPathAfterPrefixes(),
NAMED_BLOB_PREFIX + SLASH + accountName + SLASH + containerName + SLASH + datasetName + SLASH + version,
requestPath.getSubResource(), requestPath.getBlobSegmentIdx());
}
// Replace RequestPath in the RestRequest and call DeleteBlobHandler.handle.
restRequest.setArg(InternalKeys.REQUEST_PATH, newRequestPath);
restRequest.setArg(Headers.DATASET_VERSION_QUERY_ENABLED, "true");
restRequest.setArg(InternalKeys.DATASET_DELETE_ENABLED, "true");
deleteBlobHandler.handle(restRequest, restResponseChannel,
recursiveCallback(datasetVersionRecordList, 1, accountName, containerName, datasetName));
} else {
Expand Down Expand Up @@ -199,10 +208,18 @@ private Callback<Void> recursiveCallback(List<DatasetVersionRecord> datasetVersi
return buildCallback(frontendMetrics.deleteBlobSecurityProcessResponseMetrics, securityCheckResult -> {
String version = record.getVersion();
RequestPath requestPath = getRequestPath(restRequest);
RequestPath newRequestPath =
new RequestPath(requestPath.getPrefix(), requestPath.getClusterName(), requestPath.getPathAfterPrefixes(),
NAMED_BLOB_PREFIX + SLASH + accountName + SLASH + containerName + SLASH + datasetName + SLASH + version,
requestPath.getSubResource(), requestPath.getBlobSegmentIdx());
RequestPath newRequestPath;
if (record.getRenameFrom() != null) {
newRequestPath =
new RequestPath(requestPath.getPrefix(), requestPath.getClusterName(), requestPath.getPathAfterPrefixes(),
NAMED_BLOB_PREFIX + SLASH + accountName + SLASH + containerName + SLASH + datasetName + SLASH
+ record.getRenameFrom(), requestPath.getSubResource(), requestPath.getBlobSegmentIdx());
} else {
newRequestPath =
new RequestPath(requestPath.getPrefix(), requestPath.getClusterName(), requestPath.getPathAfterPrefixes(),
NAMED_BLOB_PREFIX + SLASH + accountName + SLASH + containerName + SLASH + datasetName + SLASH + version,
requestPath.getSubResource(), requestPath.getBlobSegmentIdx());
}
// Replace RequestPath in the RestRequest and call DeleteBlobHandler.handle.
restRequest.setArg(InternalKeys.REQUEST_PATH, newRequestPath);
restRequest.setArg(Headers.DATASET_VERSION_QUERY_ENABLED, "true");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -342,14 +342,21 @@ private String getDatasetVersion(RestRequest restRequest) throws RestServiceExce
restResponseChannel.setHeader(RestUtils.Headers.TARGET_CONTAINER_NAME, containerName);
restResponseChannel.setHeader(RestUtils.Headers.TARGET_DATASET_NAME, datasetName);
restResponseChannel.setHeader(RestUtils.Headers.TARGET_DATASET_VERSION, datasetVersionRecord.getVersion());
restResponseChannel.setHeader(RestUtils.Headers.TARGET_ORIGINAL_NAMED_BLOB_NAME,
datasetVersionRecord.getRenameFrom());
if (datasetVersionRecord.getExpirationTimeMs() != Utils.Infinite_Time) {
restResponseChannel.setHeader(RestUtils.Headers.DATASET_EXPIRATION_TIME,
new Date(datasetVersionRecord.getExpirationTimeMs()));
}
metrics.getDatasetVersionProcessingTimeInMs.update(System.currentTimeMillis() - startGetDatasetVersionTime);
// If version is null, use the latest version + 1 from DatasetVersionRecord to construct named blob path.
return NAMED_BLOB_PREFIX + SLASH + accountName + SLASH + containerName + SLASH + datasetName + SLASH
+ datasetVersionRecord.getVersion();
if (datasetVersionRecord.getRenameFrom() != null) {
return NAMED_BLOB_PREFIX + SLASH + accountName + SLASH + containerName + SLASH + datasetName + SLASH
+ datasetVersionRecord.getRenameFrom();
} else {
return NAMED_BLOB_PREFIX + SLASH + accountName + SLASH + containerName + SLASH + datasetName + SLASH
+ datasetVersionRecord.getVersion();
}
} catch (AccountServiceException ex) {
LOGGER.error(
"Failed to get dataset version for accountName: " + accountName + " containerName: " + containerName
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -748,10 +748,18 @@ private void recursiveDeleteDatasetVersions(RestRequest restRequest,
DatasetVersionRecord record = datasetVersionRecordList.get(idx);
String version = record.getVersion();
RequestPath requestPath = getRequestPath(restRequest);
RequestPath newRequestPath =
new RequestPath(requestPath.getPrefix(), requestPath.getClusterName(), requestPath.getPathAfterPrefixes(),
NAMED_BLOB_PREFIX + SLASH + accountName + SLASH + containerName + SLASH + datasetName + SLASH + version,
requestPath.getSubResource(), requestPath.getBlobSegmentIdx());
RequestPath newRequestPath;
if (record.getRenameFrom() != null) {
newRequestPath =
new RequestPath(requestPath.getPrefix(), requestPath.getClusterName(), requestPath.getPathAfterPrefixes(),
NAMED_BLOB_PREFIX + SLASH + accountName + SLASH + containerName + SLASH + datasetName + SLASH
+ record.getRenameFrom(), requestPath.getSubResource(), requestPath.getBlobSegmentIdx());
} else {
newRequestPath =
new RequestPath(requestPath.getPrefix(), requestPath.getClusterName(), requestPath.getPathAfterPrefixes(),
NAMED_BLOB_PREFIX + SLASH + accountName + SLASH + containerName + SLASH + datasetName + SLASH + version,
requestPath.getSubResource(), requestPath.getBlobSegmentIdx());
}
LOGGER.debug("New request path : " + newRequestPath);
// Replace RequestPath in the WrappedRestRequest for delete and call DeleteBlobHandler.handle.
restRequest.setArg(InternalKeys.REQUEST_PATH, newRequestPath);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ private Callback<String> idConverterCallback() {
private Callback<Void> securityPostProcessRequestCallback(BlobId convertedBlobId) {
return buildCallback(metrics.updateBlobTtlSecurityPostProcessRequestMetrics, result -> {
String serviceId = RestUtils.getHeader(restRequest.getArgs(), RestUtils.Headers.SERVICE_ID, true);
//TODO: rebuild the request if the dataset has a renamed field
router.updateBlobTtl(null, convertedBlobId.getID(), serviceId, Utils.Infinite_Time, routerCallback(convertedBlobId),
QuotaUtils.buildQuotaChargeCallback(restRequest, quotaManager, false));
}, restRequest.getUri(), LOGGER, finalCallback);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ public synchronized DatasetVersionRecord addDatasetVersion(String accountName, S
}
}
DatasetVersionRecord datasetVersionRecord =
new DatasetVersionRecord(accountId, containerId, datasetName, version, updatedExpirationTimeMs);
new DatasetVersionRecord(accountId, containerId, datasetName, version, updatedExpirationTimeMs, null);
idToDatasetVersionMap.get(new Pair<>(accountId, containerId))
.put(new Pair<>(datasetName, version), datasetVersionRecord);
return datasetVersionRecord;
Expand All @@ -184,6 +184,16 @@ public synchronized void deleteDatasetVersion(String accountName, String contain
Account account = nameToAccountMap.get(accountName);
short accountId = account.getId();
short containerId = account.getContainerByName(containerName).getId();
//TODO: if state == renamed, return not found.
idToDatasetVersionMap.get(new Pair<>(accountId, containerId)).remove(new Pair<>(datasetName, version));
}

@Override
public synchronized void deleteDatasetVersionForDatasetDelete(String accountName, String containerName,
String datasetName, String version) throws AccountServiceException {
Account account = nameToAccountMap.get(accountName);
short accountId = account.getId();
short containerId = account.getContainerByName(containerName).getId();
idToDatasetVersionMap.get(new Pair<>(accountId, containerId)).remove(new Pair<>(datasetName, version));
}

Expand Down Expand Up @@ -324,7 +334,7 @@ public synchronized void updateDatasetVersionTtl(String accountName, String cont
throw new AccountServiceException("Dataset version has been deleted", AccountServiceErrorCode.Deleted);
}
DatasetVersionRecord updatedDatasetVersionRecord =
new DatasetVersionRecord(accountId, containerId, datasetName, version, Utils.Infinite_Time);
new DatasetVersionRecord(accountId, containerId, datasetName, version, Utils.Infinite_Time, null);
idToDatasetVersionMap.get(new Pair<>(accountId, containerId))
.put(new Pair<>(datasetName, version), updatedDatasetVersionRecord);
}
Expand Down

0 comments on commit e93152a

Please sign in to comment.