Skip to content

Commit

Permalink
Split the pr to support named blob ttl update only
Browse files Browse the repository at this point in the history
  • Loading branch information
Sophie Guo committed Aug 17, 2023
1 parent 1307007 commit 90d2f5c
Show file tree
Hide file tree
Showing 8 changed files with 128 additions and 4 deletions.
10 changes: 10 additions & 0 deletions ambry-api/src/main/java/com/github/ambry/named/NamedBlobDb.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ public interface NamedBlobDb extends Closeable {
*/
CompletableFuture<NamedBlobRecord> get(String accountName, String containerName, String blobName, GetOption option);


/**
* Look up a {@link NamedBlobRecord} by name.
* @param accountName the name of the account.
Expand All @@ -50,6 +51,15 @@ default CompletableFuture<NamedBlobRecord> get(String accountName, String contai
return get(accountName, containerName, blobName, GetOption.None);
}

/**
* Support ttl update for {@link NamedBlobRecord}
* @param record the {@link NamedBlobRecord}
* @param state the {@link NamedBlobState}
* @return a {@link CompletableFuture} that will eventually contain either the {@link NamedBlobRecord} for the named
* blob or an exception if an error occurred.
*/
CompletableFuture<PutResult> ttlUpdate(NamedBlobRecord record, NamedBlobState state);

/**
* List blobs that start with a provided prefix in a container. This returns paginated results. If there are
* additional pages to read, {@link Page#getNextPageToken()} will be non null.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@ public class NamedBlobRecord {
private final String accountName;
private final String containerName;
private final String blobName;
private final String blobId;
private final long expirationTimeMs;
private long version;
private String blobId;

/**
* @param accountName the account name.
Expand Down Expand Up @@ -98,6 +98,10 @@ public void setVersion(long newVersion) {
version = newVersion;
}

public void setBlobId(String blobId) {
this.blobId = blobId;
}

/**
* @return the expiration time in milliseconds since epoch, or -1 if the blob should be permanent.
*/
Expand Down
5 changes: 5 additions & 0 deletions ambry-api/src/main/java/com/github/ambry/rest/RestUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -448,6 +448,11 @@ public static final class InternalKeys {
* The version for the NamedBlob record in MySQL DB
*/
public static final String NAMED_BLOB_VERSION = KEY_PREFIX + "named-blob-version";

/**
* The blob id for the NamedBlob record in MySQL DB.
*/
public static final String NAMED_BLOB_MAPPED_ID = KEY_PREFIX + "named-blob-mapped-id";
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static com.github.ambry.utils.Utils.*;


/**
* Factory that instantiates an {@link IdConverter} implementation for the frontend.
Expand Down Expand Up @@ -167,6 +169,18 @@ private CompletionStage<String> convertId(String input, RestRequest restRequest,
// on delete requests we can soft delete the record from NamedBlobDb and get the blob ID in one step.
conversionFuture = getNamedBlobDb().delete(namedBlobPath.getAccountName(), namedBlobPath.getContainerName(),
namedBlobPath.getBlobName()).thenApply(DeleteResult::getBlobId);
} else if (restRequest.getRestMethod() == RestMethod.PUT && RestUtils.getRequestPath(restRequest)
.matchesOperation(Operations.UPDATE_TTL)) {
NamedBlobRecord record = new NamedBlobRecord(namedBlobPath.getAccountName(), namedBlobPath.getContainerName(),
namedBlobPath.getBlobName(), null, Infinite_Time);
// Set named blob state as 'IN_PROGRESS', will set the state to be 'READY' in the ttlUpdate success callback: routerTtlUpdateCallback
NamedBlobState state = NamedBlobState.IN_PROGRESS;
// Always enable upsert to ttl update request.
conversionFuture = getNamedBlobDb().ttlUpdate(record, state).thenApply(result -> {
restRequest.setArg(RestUtils.InternalKeys.NAMED_BLOB_VERSION, result.getInsertedRecord().getVersion());
restRequest.setArg(RestUtils.InternalKeys.NAMED_BLOB_MAPPED_ID, result.getInsertedRecord().getBlobId());
return result.getInsertedRecord().getBlobId();
});
} else {
conversionFuture = getNamedBlobDb().get(namedBlobPath.getAccountName(), namedBlobPath.getContainerName(),
namedBlobPath.getBlobName(), getOption).thenApply(NamedBlobRecord::getBlobId);
Expand All @@ -182,7 +196,7 @@ private CompletionStage<String> convertId(String input, RestRequest restRequest,
NamedBlobRecord record = new NamedBlobRecord(namedBlobPath.getAccountName(), namedBlobPath.getContainerName(),
namedBlobPath.getBlobName(), blobId, expirationTimeMs);
NamedBlobState state = NamedBlobState.READY;
if (properties.getTimeToLiveInSeconds() == Utils.Infinite_Time) {
if (properties.getTimeToLiveInSeconds() == Infinite_Time) {
// Set named blob state as 'IN_PROGRESS', will set the state to be 'READY' in the ttlUpdate success callback: routerTtlUpdateCallback
state = NamedBlobState.IN_PROGRESS;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.github.ambry.rest.RestServiceException;
import com.github.ambry.utils.Pair;
import com.github.ambry.utils.Time;
import com.github.ambry.utils.Utils;
import java.io.IOException;
import java.sql.Timestamp;
import java.util.ArrayList;
Expand Down Expand Up @@ -70,7 +71,7 @@ public CompletableFuture<NamedBlobRecord> get(String accountName, String contain
} else if (recordWithDelete.getSecond() != 0 && recordWithDelete.getSecond() < time.milliseconds()
&& !includeDeletedOptions.contains(option)) {
future.completeExceptionally(new RestServiceException("Deleted", RestServiceErrorCode.Deleted));
} else if (recordWithDelete.getFirst().getExpirationTimeMs() != 0
} else if (recordWithDelete.getFirst().getExpirationTimeMs() != Utils.Infinite_Time
&& recordWithDelete.getFirst().getExpirationTimeMs() < time.milliseconds() && !includeExpiredOptions.contains(
option)) {
future.completeExceptionally(new RestServiceException("Deleted", RestServiceErrorCode.Deleted));
Expand Down Expand Up @@ -173,6 +174,26 @@ public CompletableFuture<DeleteResult> delete(String accountName, String contain
return future;
}

@Override
public CompletableFuture<PutResult> ttlUpdate(NamedBlobRecord record, NamedBlobState state) {
CompletableFuture<PutResult> future = new CompletableFuture<>();
if (exception != null) {
future.completeExceptionally(exception);
return future;
}
Pair<NamedBlobRecord, Long> recordFromDb = getInternal(record.getAccountName(), record.getContainerName(), record.getBlobName());
if (recordFromDb == null) {
future.completeExceptionally(new RestServiceException("NotFound", RestServiceErrorCode.NotFound));
} else if (recordFromDb.getSecond() != 0 && recordFromDb.getSecond() < time.milliseconds()) {
future.completeExceptionally(new RestServiceException("Deleted", RestServiceErrorCode.Deleted));
} else {
record.setBlobId(recordFromDb.getFirst().getBlobId());
putInternal(record, NamedBlobState.IN_PROGRESS, 0L);
future.complete(new PutResult(record));
}
return future;
}

@Override
public CompletableFuture<List<StaleNamedBlob>> pullStaleBlobs() {
CompletableFuture<List<StaleNamedBlob>> future = new CompletableFuture<>();
Expand Down Expand Up @@ -278,6 +299,6 @@ private void setException(Exception exception) {

@Override
public void close() throws IOException {

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,31 @@ public void testPutGetListDeleteSequence() throws Exception {
}
}

/**
* Test ttl update for named blob.
* @throws Exception
*/
@Test
public void testUpdateTtlForNamedBlob() throws Exception {
Account account = accountService.getAllAccounts().iterator().next();
Container container = account.getAllContainers().iterator().next();
String blobId = getBlobId(account, container);
String blobName = "blobName";
long expirationTime = System.currentTimeMillis() + TimeUnit.HOURS.toMillis(1);
NamedBlobRecord record =
new NamedBlobRecord(account.getName(), container.getName(), blobName, blobId, expirationTime);
namedBlobDb.put(record).get();
time.setCurrentMilliseconds(System.currentTimeMillis());
PutResult ttlUpdateResult = namedBlobDb.ttlUpdate(record, NamedBlobState.IN_PROGRESS).get();
record.setVersion(ttlUpdateResult.getInsertedRecord().getVersion());
time.setCurrentMilliseconds(System.currentTimeMillis());
namedBlobDb.updateBlobStateToReady(record).get();
time.setCurrentMilliseconds(System.currentTimeMillis());
NamedBlobRecord recordFromDb = namedBlobDb.get(account.getName(), container.getName(), blobName).get();
assertEquals("Mismatch on blob version", ttlUpdateResult.getInsertedRecord().getVersion(), recordFromDb.getVersion());
assertEquals("Mismatch on blob expiration time", Utils.Infinite_Time, recordFromDb.getExpirationTimeMs());
}

/**
* Test behavior with expired blobs
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -764,6 +764,31 @@ private void applySoftDelete(short accountId, short containerId, String blobName
}
}

@Override
public CompletableFuture<PutResult> ttlUpdate(NamedBlobRecord record, NamedBlobState state) {
return executeTransactionAsync(record.getAccountName(), record.getContainerName(), true,
(accountId, containerId, connection) -> {
long startTime = this.time.milliseconds();
// Do upsert when it's using new table and 'x-ambry-named-upsert' header is not set to false (default is true)
logger.trace("NamedBlobPutInfo: accountId='{}', containerId='{}', blobName='{}'", accountId, containerId,
record.getBlobName());
//for ttl update, get the blob id first before insert a new record.
NamedBlobRecord recordCurrent;
try {
recordCurrent =
run_get_v2(record.getAccountName(), record.getContainerName(), record.getBlobName(), GetOption.None,
accountId, containerId, connection);
} catch (RestServiceException e) {
throw buildException("Failed to do ttl update due to not able to found existing record", e.getErrorCode(),
record.getAccountName(), record.getContainerName(), record.getBlobName());
}
record.setBlobId(recordCurrent.getBlobId());
PutResult putResult = run_put_v2(record, state, accountId, containerId, connection);
metricsRecoder.namedBlobPutTimeInMs.update(this.time.milliseconds() - startTime);
return putResult;
}, null);
}

/**
* Build the version for Named Blob row based on timestamp and uuid postfix.
* @return a long number whose rightmost 5 digits are uuid postfix, and the remaining digits are current timestamp
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import com.github.ambry.rest.RestServiceErrorCode;
import com.github.ambry.rest.RestServiceException;
import com.github.ambry.utils.TestUtils;
import com.github.ambry.utils.Utils;
import java.io.IOException;
import java.sql.Connection;
import java.sql.PreparedStatement;
Expand Down Expand Up @@ -164,6 +165,25 @@ public void testUpdateBlobStateToReady() throws Exception {
assertEquals("Blob Id is not matched with the record", id, namedBlobRecord.getBlobId());
}

/**
* Test update ttl for named blob.
* @throws Exception
*/
@Test
public void testUpdateTtlForNamedBlob() throws Exception {
dataSourceFactory.setLocalDatacenter(foundDatacenter);
dataSourceFactory.triggerDataResultSet(datacenters);
long expirationTimeMs = System.currentTimeMillis() + 3600;
NamedBlobRecord record = new NamedBlobRecord(account.getName(), container.getName(), "blobName", id, expirationTimeMs);
namedBlobDb.put(record, NamedBlobState.READY, true).get();
assertEquals("Mismatch on expiration time", expirationTimeMs , record.getExpirationTimeMs());
PutResult putResult = namedBlobDb.ttlUpdate(record, NamedBlobState.IN_PROGRESS).get();
record.setVersion(putResult.getInsertedRecord().getVersion());
namedBlobDb.updateBlobStateToReady(record).get();
record = namedBlobDb.get(account.getName(), container.getName(), "blobName").get();
assertEquals("Mismatch on expiration time", Utils.Infinite_Time , record.getExpirationTimeMs());
}

/**
* @param callable an async call, where the {@link Future} is expected to be completed with an exception.
* @param errorCode the expected {@link RestServiceErrorCode}.
Expand Down

0 comments on commit 90d2f5c

Please sign in to comment.