Skip to content

Commit

Permalink
Address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
Sophie Guo committed Aug 18, 2023
1 parent a6b696c commit 8b70ca8
Show file tree
Hide file tree
Showing 8 changed files with 23 additions and 107 deletions.
13 changes: 2 additions & 11 deletions ambry-api/src/main/java/com/github/ambry/named/NamedBlobDb.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,15 +50,6 @@ default CompletableFuture<NamedBlobRecord> get(String accountName, String contai
return get(accountName, containerName, blobName, GetOption.None);
}

/**
* Support ttl update for {@link NamedBlobRecord}
* @param record the {@link NamedBlobRecord}
* @param state the {@link NamedBlobState}
* @return a {@link CompletableFuture} that will eventually contain either the {@link NamedBlobRecord} for the named
* blob or an exception if an error occurred.
*/
CompletableFuture<PutResult> ttlUpdate(NamedBlobRecord record, NamedBlobState state);

/**
* List blobs that start with a provided prefix in a container. This returns paginated results. If there are
* additional pages to read, {@link Page#getNextPageToken()} will be non null.
Expand Down Expand Up @@ -95,12 +86,12 @@ default CompletableFuture<PutResult> put(NamedBlobRecord record) {
}

/**
* Update a {@link NamedBlobRecord}'s state to READY in the database.
* Update a {@link NamedBlobRecord}'s state to READY and ttl to permanent in the database.
* @param record the {@link NamedBlobRecord}
* @return a {@link CompletableFuture} that will eventually contain a {@link PutResult} or an exception if an error
* occurred.
*/
CompletableFuture<PutResult> updateBlobStateToReady(NamedBlobRecord record);
CompletableFuture<PutResult> updateBlobTtlAndStateToReady(NamedBlobRecord record);

/**
* Delete a record for a blob in the database.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,7 @@ private Callback<Void> routerTtlUpdateCallback(BlobInfo blobInfo, String blobId)
NamedBlobPath namedBlobPath = NamedBlobPath.parse(RestUtils.getRequestPath(restRequest), restRequest.getArgs());
NamedBlobRecord record = new NamedBlobRecord(namedBlobPath.getAccountName(), namedBlobPath.getContainerName(),
namedBlobPath.getBlobName(), blobIdClean, Utils.Infinite_Time, namedBlobVersion);
namedBlobDb.updateBlobStateToReady(record).get();
namedBlobDb.updateBlobTtlAndStateToReady(record).get();
if (RestUtils.isDatasetVersionQueryEnabled(restRequest.getArgs())) {
//Make sure to process response after delete finished
updateVersionStateAndDeleteDatasetVersionOutOfRetentionCount(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -666,7 +666,7 @@ public void testRetentionCountLogic() throws Exception {
blobNameNew)).thenReturn(CompletableFuture.completedFuture(new DeleteResult(blobIdFromRouter, false)));
when(namedBlobDb.get(namedBlobRecord.getAccountName(), namedBlobRecord.getContainerName(), blobNameNew,
GetOption.None)).thenReturn(CompletableFuture.completedFuture(namedBlobRecord));
when(namedBlobDb.updateBlobStateToReady(any())).thenReturn(
when(namedBlobDb.updateBlobTtlAndStateToReady(any())).thenReturn(
CompletableFuture.completedFuture(new PutResult(namedBlobRecord)));

doOperation(restRequest, restResponseChannel);
Expand Down Expand Up @@ -701,7 +701,7 @@ public void testRetentionCountLogic() throws Exception {
blobNameNew1)).thenReturn(CompletableFuture.completedFuture(new DeleteResult(blobIdFromRouter1, false)));
when(namedBlobDb.get(namedBlobRecord1.getAccountName(), namedBlobRecord1.getContainerName(), blobNameNew1,
GetOption.None)).thenReturn(CompletableFuture.completedFuture(namedBlobRecord1));
when(namedBlobDb.updateBlobStateToReady(any())).thenReturn(
when(namedBlobDb.updateBlobTtlAndStateToReady(any())).thenReturn(
CompletableFuture.completedFuture(new PutResult(namedBlobRecord1)));

doOperation(restRequest, restResponseChannel);
Expand Down Expand Up @@ -736,7 +736,7 @@ public void testRetentionCountLogic() throws Exception {
blobNameNew2)).thenReturn(CompletableFuture.completedFuture(new DeleteResult(blobIdFromRouter2, false)));
when(namedBlobDb.get(namedBlobRecord2.getAccountName(), namedBlobRecord2.getContainerName(), blobNameNew2,
GetOption.None)).thenReturn(CompletableFuture.completedFuture(namedBlobRecord2));
when(namedBlobDb.updateBlobStateToReady(any())).thenReturn(
when(namedBlobDb.updateBlobTtlAndStateToReady(any())).thenReturn(
CompletableFuture.completedFuture(new PutResult(namedBlobRecord2)));

doOperation(restRequest, restResponseChannel);
Expand Down Expand Up @@ -771,7 +771,7 @@ public void testRetentionCountLogic() throws Exception {
blobNameNew3)).thenReturn(CompletableFuture.completedFuture(new DeleteResult(blobIdFromRouter3, false)));
when(namedBlobDb.get(namedBlobRecord3.getAccountName(), namedBlobRecord3.getContainerName(), blobNameNew3,
GetOption.None)).thenReturn(CompletableFuture.completedFuture(namedBlobRecord3));
when(namedBlobDb.updateBlobStateToReady(any())).thenReturn(
when(namedBlobDb.updateBlobTtlAndStateToReady(any())).thenReturn(
CompletableFuture.completedFuture(new PutResult(namedBlobRecord3)));
doOperation(restRequest, restResponseChannel);
assertEquals("Mismatch on response status", ResponseStatus.Created, restResponseChannel.getStatus());
Expand Down Expand Up @@ -819,7 +819,7 @@ public void testRetentionCountLogic() throws Exception {
blobNameNew4)).thenReturn(CompletableFuture.completedFuture(new DeleteResult(blobIdFromRouter4, false)));
when(namedBlobDb.get(namedBlobRecord4.getAccountName(), namedBlobRecord4.getContainerName(), blobNameNew4,
GetOption.None)).thenReturn(CompletableFuture.completedFuture(namedBlobRecord4));
when(namedBlobDb.updateBlobStateToReady(any())).thenReturn(
when(namedBlobDb.updateBlobTtlAndStateToReady(any())).thenReturn(
CompletableFuture.completedFuture(new PutResult(namedBlobRecord4)));
doOperation(restRequest, restResponseChannel);
assertEquals("Mismatch on response status", ResponseStatus.Created, restResponseChannel.getStatus());
Expand Down Expand Up @@ -944,7 +944,7 @@ public void testPutSucceedWhenDeleteOutOfRetentionFailed() throws Exception {
blobNameNew)).thenReturn(CompletableFuture.completedFuture(new DeleteResult(blobIdFromRouter, false)));
when(namedBlobDb.get(namedBlobRecord.getAccountName(), namedBlobRecord.getContainerName(), blobNameNew,
GetOption.None)).thenReturn(CompletableFuture.completedFuture(namedBlobRecord));
when(namedBlobDb.updateBlobStateToReady(any())).thenReturn(
when(namedBlobDb.updateBlobTtlAndStateToReady(any())).thenReturn(
CompletableFuture.completedFuture(new PutResult(namedBlobRecord)));

doOperation(restRequest, restResponseChannel);
Expand Down Expand Up @@ -980,7 +980,7 @@ public void testPutSucceedWhenDeleteOutOfRetentionFailed() throws Exception {
blobNameNew1)).thenThrow(new RuntimeException());
when(namedBlobDb.get(namedBlobRecord1.getAccountName(), namedBlobRecord1.getContainerName(), blobNameNew1,
GetOption.None)).thenReturn(CompletableFuture.completedFuture(namedBlobRecord1));
when(namedBlobDb.updateBlobStateToReady(any())).thenReturn(
when(namedBlobDb.updateBlobTtlAndStateToReady(any())).thenReturn(
CompletableFuture.completedFuture(new PutResult(namedBlobRecord1)));

doOperation(restRequest, restResponseChannel);
Expand Down Expand Up @@ -1015,7 +1015,7 @@ public void testPutSucceedWhenDeleteOutOfRetentionFailed() throws Exception {
blobNameNew2)).thenReturn(CompletableFuture.completedFuture(new DeleteResult(blobIdFromRouter2, false)));
when(namedBlobDb.get(namedBlobRecord2.getAccountName(), namedBlobRecord2.getContainerName(), blobNameNew2,
GetOption.None)).thenReturn(CompletableFuture.completedFuture(namedBlobRecord2));
when(namedBlobDb.updateBlobStateToReady(any())).thenReturn(
when(namedBlobDb.updateBlobTtlAndStateToReady(any())).thenReturn(
CompletableFuture.completedFuture(new PutResult(namedBlobRecord2)));

doOperation(restRequest, restResponseChannel);
Expand Down Expand Up @@ -1050,7 +1050,7 @@ public void testPutSucceedWhenDeleteOutOfRetentionFailed() throws Exception {
blobNameNew3)).thenReturn(CompletableFuture.completedFuture(new DeleteResult(blobIdFromRouter3, false)));
when(namedBlobDb.get(namedBlobRecord3.getAccountName(), namedBlobRecord3.getContainerName(), blobNameNew3,
GetOption.None)).thenReturn(CompletableFuture.completedFuture(namedBlobRecord3));
when(namedBlobDb.updateBlobStateToReady(any())).thenReturn(
when(namedBlobDb.updateBlobTtlAndStateToReady(any())).thenReturn(
CompletableFuture.completedFuture(new PutResult(namedBlobRecord3)));
doOperation(restRequest, restResponseChannel);
assertEquals("Mismatch on response status", ResponseStatus.Created, restResponseChannel.getStatus());
Expand Down Expand Up @@ -1098,7 +1098,7 @@ public void testPutSucceedWhenDeleteOutOfRetentionFailed() throws Exception {
blobNameNew4)).thenReturn(CompletableFuture.completedFuture(new DeleteResult(blobIdFromRouter4, false)));
when(namedBlobDb.get(namedBlobRecord4.getAccountName(), namedBlobRecord4.getContainerName(), blobNameNew4,
GetOption.None)).thenReturn(CompletableFuture.completedFuture(namedBlobRecord4));
when(namedBlobDb.updateBlobStateToReady(any())).thenReturn(
when(namedBlobDb.updateBlobTtlAndStateToReady(any())).thenReturn(
CompletableFuture.completedFuture(new PutResult(namedBlobRecord4)));
doOperation(restRequest, restResponseChannel);
assertEquals("Mismatch on response status", ResponseStatus.Created, restResponseChannel.getStatus());
Expand Down Expand Up @@ -1354,7 +1354,7 @@ public void addAndGetDatasetVersionTest() throws Exception {
CompletableFuture.completedFuture(new DeleteResult(blobIdFromRouter, false)));
when(namedBlobDb.get(namedBlobRecord.getAccountName(), namedBlobRecord.getContainerName(),
blobName, null)).thenReturn(CompletableFuture.completedFuture(namedBlobRecord));
when(namedBlobDb.updateBlobStateToReady(any())).thenReturn(
when(namedBlobDb.updateBlobTtlAndStateToReady(any())).thenReturn(
CompletableFuture.completedFuture(new PutResult(namedBlobRecord)));

// Issue put dataset version request, should not throw null point exception.
Expand Down Expand Up @@ -1398,7 +1398,7 @@ public void addAndGetDatasetVersionTest() throws Exception {
CompletableFuture.completedFuture(new DeleteResult(blobIdFromRouter1, false)));
when(namedBlobDb.get(namedBlobRecord1.getAccountName(), namedBlobRecord1.getContainerName(),
blobName1, null)).thenReturn(CompletableFuture.completedFuture(namedBlobRecord1));
when(namedBlobDb.updateBlobStateToReady(any())).thenReturn(
when(namedBlobDb.updateBlobTtlAndStateToReady(any())).thenReturn(
CompletableFuture.completedFuture(new PutResult(namedBlobRecord1)));

doOperation(restRequest, restResponseChannel);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,13 +142,7 @@ public CompletableFuture<PutResult> put(NamedBlobRecord record, NamedBlobState s
}

@Override
public CompletableFuture<PutResult> ttlUpdate(NamedBlobRecord record, NamedBlobState state) {
//TODO: implement later
return null;
}

@Override
public CompletableFuture<PutResult> updateBlobStateToReady(NamedBlobRecord record) {
public CompletableFuture<PutResult> updateBlobTtlAndStateToReady(NamedBlobRecord record) {
CompletableFuture<PutResult> future = new CompletableFuture<>();
if (exception != null) {
future.completeExceptionally(exception);
Expand Down Expand Up @@ -285,6 +279,5 @@ private void setException(Exception exception) {

@Override
public void close() throws IOException {

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -199,31 +199,6 @@ public void testPutGetListDeleteSequence() throws Exception {
}
}

/**
* Test ttl update for named blob.
* @throws Exception
*/
@Test
public void testUpdateTtlForNamedBlob() throws Exception {
Account account = accountService.getAllAccounts().iterator().next();
Container container = account.getAllContainers().iterator().next();
String blobId = getBlobId(account, container);
String blobName = "blobName";
long expirationTime = System.currentTimeMillis() + TimeUnit.HOURS.toMillis(1);
NamedBlobRecord record =
new NamedBlobRecord(account.getName(), container.getName(), blobName, blobId, expirationTime);
namedBlobDb.put(record).get();
time.setCurrentMilliseconds(System.currentTimeMillis());
PutResult ttlUpdateResult = namedBlobDb.ttlUpdate(record, NamedBlobState.IN_PROGRESS).get();
record.setVersion(ttlUpdateResult.getInsertedRecord().getVersion());
time.setCurrentMilliseconds(System.currentTimeMillis());
namedBlobDb.updateBlobStateToReady(record).get();
time.setCurrentMilliseconds(System.currentTimeMillis());
NamedBlobRecord recordFromDb = namedBlobDb.get(account.getName(), container.getName(), blobName).get();
assertEquals("Mismatch on blob version", ttlUpdateResult.getInsertedRecord().getVersion(), recordFromDb.getVersion());
assertEquals("Mismatch on blob expiration time", Utils.Infinite_Time, recordFromDb.getExpirationTimeMs());
}

/**
* Test behavior with expired blobs
*/
Expand Down Expand Up @@ -579,7 +554,7 @@ public void testCleanupBlobGoodCase5() throws Exception {
time.setCurrentMilliseconds(staleCutoffTime);
PutResult putResult = namedBlobDb.put(record, NamedBlobState.IN_PROGRESS, false).get();
record.setVersion(putResult.getInsertedRecord().getVersion());
namedBlobDb.updateBlobStateToReady(record).get();
namedBlobDb.updateBlobTtlAndStateToReady(record).get();

List<StaleNamedBlob> staleNamedBlobs = namedBlobDb.pullStaleBlobs().get();

Expand Down Expand Up @@ -611,7 +586,7 @@ public void testCleanupBlobGoodCase6() throws Exception {
PutResult putResult = namedBlobDb.put(record, NamedBlobState.IN_PROGRESS, true).get();
checkErrorCode(() -> namedBlobDb.get(record.getAccountName(), record.getContainerName(), record.getBlobName()),
RestServiceErrorCode.NotFound);
PutResult updateResult = namedBlobDb.updateBlobStateToReady(record).get();
PutResult updateResult = namedBlobDb.updateBlobTtlAndStateToReady(record).get();

List<StaleNamedBlob> staleNamedBlobs = namedBlobDb.pullStaleBlobs().get();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -360,11 +360,11 @@ public CompletableFuture<PutResult> put(NamedBlobRecord record, NamedBlobState s
}

@Override
public CompletableFuture<PutResult> updateBlobStateToReady(NamedBlobRecord record) {
public CompletableFuture<PutResult> updateBlobTtlAndStateToReady(NamedBlobRecord record) {
return executeTransactionAsync(record.getAccountName(), record.getContainerName(), true,
(accountId, containerId, connection) -> {
long startTime = this.time.milliseconds();
logger.trace("Updating to READY for Named Blob: {}", record);
logger.trace("Updating ttl and status to READY for Named Blob: {}", record);
PutResult result = apply_ttl_update(record, accountId, containerId, connection);
metricsRecoder.namedTtlupdateTimeInMs.update(this.time.milliseconds() - startTime);
return result;
Expand Down Expand Up @@ -658,7 +658,7 @@ private PutResult run_put_v2(NamedBlobRecord record, NamedBlobState state, short
}

private PutResult apply_ttl_update(NamedBlobRecord record, short accountId, short containerId, Connection connection)
throws Exception{
throws Exception {
String query = "";
try (PreparedStatement statement = connection.prepareStatement(TTL_UPDATE_QUERY)) {
statement.setInt(1, accountId);
Expand Down Expand Up @@ -764,30 +764,6 @@ private void applySoftDelete(short accountId, short containerId, String blobName
}
}

@Override
public CompletableFuture<PutResult> ttlUpdate(NamedBlobRecord record, NamedBlobState state) {
return executeTransactionAsync(record.getAccountName(), record.getContainerName(), true,
(accountId, containerId, connection) -> {
long startTime = this.time.milliseconds();
logger.trace("NamedBlobPutInfo: accountId='{}', containerId='{}', blobName='{}'", accountId, containerId,
record.getBlobName());
//for ttl update, get the blob id first before insert a new record.
NamedBlobRecord recordCurrent;
try {
recordCurrent =
run_get_v2(record.getAccountName(), record.getContainerName(), record.getBlobName(), GetOption.None,
accountId, containerId, connection);
} catch (RestServiceException e) {
throw buildException("Failed to do ttl update due to not able to found existing record", e.getErrorCode(),
record.getAccountName(), record.getContainerName(), record.getBlobName());
}
record.setBlobId(recordCurrent.getBlobId());
PutResult putResult = run_put_v2(record, state, accountId, containerId, connection);
metricsRecoder.namedBlobPutTimeInMs.update(this.time.milliseconds() - startTime);
return putResult;
}, null);
}

/**
* Build the version for Named Blob row based on timestamp and uuid postfix.
* @return a long number whose rightmost 5 digits are uuid postfix, and the remaining digits are current timestamp
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,31 +159,12 @@ public void testUpdateBlobStateToReady() throws Exception {
PutResult putResult = namedBlobDb.put(record, NamedBlobState.IN_PROGRESS, true).get();

record.setVersion(putResult.getInsertedRecord().getVersion());
namedBlobDb.updateBlobStateToReady(record).get();
namedBlobDb.updateBlobTtlAndStateToReady(record).get();

NamedBlobRecord namedBlobRecord = namedBlobDb.get(account.getName(), container.getName(), "blobName").get();
assertEquals("Blob Id is not matched with the record", id, namedBlobRecord.getBlobId());
}

/**
* Test update ttl for named blob.
* @throws Exception
*/
@Test
public void testUpdateTtlForNamedBlob() throws Exception {
dataSourceFactory.setLocalDatacenter(foundDatacenter);
dataSourceFactory.triggerDataResultSet(datacenters);
long expirationTimeMs = System.currentTimeMillis() + 3600;
NamedBlobRecord record = new NamedBlobRecord(account.getName(), container.getName(), "blobName", id, expirationTimeMs);
namedBlobDb.put(record, NamedBlobState.READY, true).get();
assertEquals("Mismatch on expiration time", expirationTimeMs , record.getExpirationTimeMs());
PutResult putResult = namedBlobDb.ttlUpdate(record, NamedBlobState.IN_PROGRESS).get();
record.setVersion(putResult.getInsertedRecord().getVersion());
namedBlobDb.updateBlobStateToReady(record).get();
record = namedBlobDb.get(account.getName(), container.getName(), "blobName").get();
assertEquals("Mismatch on expiration time", Utils.Infinite_Time , record.getExpirationTimeMs());
}

/**
* @param callable an async call, where the {@link Future} is expected to be completed with an exception.
* @param errorCode the expected {@link RestServiceErrorCode}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -516,7 +516,7 @@ public void run() {
System.out.println("PerformanceTestWorker " + id + " finishes writing " + numberOfPuts + " records");

for (NamedBlobRecord record : allRecords) {
namedBlobDb.updateBlobStateToReady(record).get();
namedBlobDb.updateBlobTtlAndStateToReady(record).get();
}
System.out.println("PerformanceTestWorker " + id + " finishes updating " + numberOfPuts + " records");

Expand Down

0 comments on commit 8b70ca8

Please sign in to comment.