Skip to content

Commit

Permalink
Switch to a callback-based approach
Browse files Browse the repository at this point in the history
  • Loading branch information
gwbrown committed Aug 14, 2019
1 parent 4f22069 commit 14e4b6e
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,10 @@ public static SnapshotHistoryItem deletionSuccessRecord(long timestamp, String s
return new SnapshotHistoryItem(timestamp, policyId, repository, snapshotName, DELETE_OPERATION, true, null, null);
}

public static SnapshotHistoryItem deletionPossibleSuccessRecord(long timestamp, String snapshotName, String policyId, String repository, String details) {
return new SnapshotHistoryItem(timestamp, policyId, repository, snapshotName, DELETE_OPERATION, true, null, details);
}

public static SnapshotHistoryItem deletionFailureRecord(long timestamp, String snapshotName, String policyId, String repository,
Exception exception) throws IOException {
String exceptionString = exceptionToString(exception);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.LongSupplier;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -256,21 +256,26 @@ void deleteSnapshots(Map<String, List<SnapshotInfo>> snapshotsToDelete,

logger.info("starting snapshot retention deletion for [{}] snapshots", count);
long startTime = nowNanoSupplier.getAsLong();
int deleted = 0;
int failed = 0;
final AtomicInteger deleted = new AtomicInteger(0);
final AtomicInteger failed = new AtomicInteger(0);
for (Map.Entry<String, List<SnapshotInfo>> entry : snapshotsToDelete.entrySet()) {
String repo = entry.getKey();
List<SnapshotInfo> snapshots = entry.getValue();
for (SnapshotInfo info : snapshots) {
Optional<SnapshotHistoryItem> result = deleteSnapshot(getPolicyId(info), repo, info, slmStats);
if (result.isPresent()) {
if (result.get().isSuccess()) {
deleted++;
deleteSnapshot(getPolicyId(info), repo, info, slmStats, historyItem -> {
// This would be nicer if we could use ifPresentOrElse
historyItem.ifPresent(item -> {
if (item.isSuccess()) {
deleted.incrementAndGet();
} else {
failed.incrementAndGet();
}
historyStore.putAsync(item);
});
if (historyItem.isEmpty()) {
failed.incrementAndGet();
}
historyStore.putAsync(result.get());
} else {
failed++;
}
});
// Check whether we have exceeded the maximum time allowed to spend deleting
// snapshots, if we have, short-circuit the rest of the deletions
TimeValue elapsedDeletionTime = TimeValue.timeValueNanos(nowNanoSupplier.getAsLong() - startTime);
Expand All @@ -292,24 +297,29 @@ void deleteSnapshots(Map<String, List<SnapshotInfo>> snapshotsToDelete,

/**
* Delete the given snapshot from the repository in blocking manner
* @param repo The repository the snapshot is in
* @param snapshot The snapshot metadata
* @return If present, a SnapshotHistoryItem containing the results of the deletion. Empty if no response or interrupted.
*
* @param repo The repository the snapshot is in
* @param snapshot The snapshot metadata
* @param onCompletion A callback taking info on the history of the snapshot. If present, a SnapshotHistoryItem containing the results
* of the deletion. Empty if interrupted or failed to serialize exception.
*/
Optional<SnapshotHistoryItem> deleteSnapshot(String slmPolicy, String repo, SnapshotInfo snapshot, SnapshotLifecycleStats slmStats) {
void deleteSnapshot(String slmPolicy, String repo, SnapshotInfo snapshot, SnapshotLifecycleStats slmStats,
Consumer<Optional<SnapshotHistoryItem>> onCompletion) {
logger.info("[{}] snapshot retention deleting snapshot [{}]", repo, snapshot.snapshotId());
CountDownLatch latch = new CountDownLatch(1);
AtomicReference<SnapshotHistoryItem> result = new AtomicReference<>();
client.admin().cluster().prepareDeleteSnapshot(repo, snapshot.snapshotId().getName())
.execute(new LatchedActionListener<>(new ActionListener<>() {
@Override
public void onResponse(AcknowledgedResponse acknowledgedResponse) {
if (acknowledgedResponse.isAcknowledged()) {
logger.debug("[{}] snapshot [{}] deleted successfully", repo, snapshot.snapshotId());
result.set(SnapshotHistoryItem.deletionSuccessRecord(Instant.now().toEpochMilli(),
snapshot.snapshotId().getName(), slmPolicy, repo));
onCompletion.accept(Optional.of(SnapshotHistoryItem.deletionSuccessRecord(Instant.now().toEpochMilli(),
snapshot.snapshotId().getName(), slmPolicy, repo)));
} else {
logger.warn("[{}] snapshot [{}] delete issued but the request was not acknowledged", repo, snapshot.snapshotId());
onCompletion.accept(Optional.of(SnapshotHistoryItem.deletionPossibleSuccessRecord(Instant.now().toEpochMilli(),
snapshot.snapshotId().getName(), slmPolicy, repo,
"deletion request issued successfully, no acknowledgement received")));
}
slmStats.snapshotDeleted(slmPolicy);
}
Expand All @@ -318,16 +328,19 @@ public void onResponse(AcknowledgedResponse acknowledgedResponse) {
public void onFailure(Exception e) {
logger.warn(new ParameterizedMessage("[{}] failed to delete snapshot [{}] for retention",
repo, snapshot.snapshotId()), e);
slmStats.snapshotDeleteFailure(slmPolicy);
SnapshotHistoryItem result;
try {
result.set(SnapshotHistoryItem.deletionFailureRecord(Instant.now().toEpochMilli(),
snapshot.snapshotId().getName(), slmPolicy, repo, e));
result = SnapshotHistoryItem.deletionFailureRecord(Instant.now().toEpochMilli(),
snapshot.snapshotId().getName(), slmPolicy, repo, e);
} catch (IOException ex) {
// This shouldn't happen unless there's an issue with serializing the original exception
logger.error(new ParameterizedMessage(
"failed to record snapshot creation failure for snapshot lifecycle policy [{}]",
slmPolicy), e);
result = null;
}
slmStats.snapshotDeleteFailure(slmPolicy);
onCompletion.accept(Optional.ofNullable(result));
}
}, latch));
try {
Expand All @@ -337,9 +350,9 @@ public void onFailure(Exception e) {
} catch (InterruptedException e) {
logger.error(new ParameterizedMessage("[{}] deletion of snapshot [{}] interrupted",
repo, snapshot.snapshotId()), e);
onCompletion.accept(Optional.empty());
slmStats.snapshotDeleteFailure(slmPolicy);
}
return Optional.ofNullable(result.get());
}

void updateStateWithStats(SnapshotLifecycleStats newStats) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,21 +190,20 @@ private void retentionTaskTest(final boolean deletionSuccess) throws Exception {
logger.info("--> retrieving snapshots [{}]", snaps);
return Collections.singletonMap(repoId, snaps);
},
(deletionPolicyId, repo, snapInfo, slmStats) -> {
(deletionPolicyId, repo, snapInfo, slmStats, onCompletion) -> {
logger.info("--> deleting {} from repo {}", snapInfo, repo);
deleted.add(snapInfo);
deletionLatch.countDown();
if (deletionSuccess) {
return Optional.of(SnapshotHistoryItem.deletionSuccessRecord(Instant.now().toEpochMilli(),
snapInfo.snapshotId().getName(), policy.getId(), repo));
onCompletion.accept(Optional.of(SnapshotHistoryItem.deletionSuccessRecord(Instant.now().toEpochMilli(),
snapInfo.snapshotId().getName(), policy.getId(), repo)));
} else {
try {
return Optional.of(SnapshotHistoryItem.deletionFailureRecord(Instant.now().toEpochMilli(),
snapInfo.snapshotId().getName(), policy.getId(), repo, new RuntimeException("deletion_failed")));
onCompletion.accept(Optional.of(SnapshotHistoryItem.deletionFailureRecord(Instant.now().toEpochMilli(),
snapInfo.snapshotId().getName(), policy.getId(), repo, new RuntimeException("deletion_failed"))));
} catch (IOException e) {
logger.error(e);
fail("failed to serialize an exception to json, this should never happen");
return Optional.empty(); // impossible to hit this but necessary to make the compiler happy
}
}
},
Expand Down Expand Up @@ -289,7 +288,7 @@ private void timeBoundedDeletion(final boolean deletionSuccess) throws Exception
logger.info("--> retrieving snapshots [{}]", snaps);
return Collections.singletonMap(repoId, snaps);
},
(deletionPolicyId, repo, snapInfo, slmStats) -> {
(deletionPolicyId, repo, snapInfo, slmStats, onCompletion) -> {
logger.info("--> deleting {}", snapInfo.snapshotId());
// Don't pause until snapshot 2
if (snapInfo.snapshotId().equals(snap2.snapshotId())) {
Expand All @@ -299,16 +298,15 @@ private void timeBoundedDeletion(final boolean deletionSuccess) throws Exception
deleted.add(snapInfo.snapshotId());
deletionLatch.countDown();
if (deletionSuccess) {
return Optional.of(SnapshotHistoryItem.deletionSuccessRecord(Instant.now().toEpochMilli(),
snapInfo.snapshotId().getName(), policy.getId(), repo));
onCompletion.accept(Optional.of(SnapshotHistoryItem.deletionSuccessRecord(Instant.now().toEpochMilli(),
snapInfo.snapshotId().getName(), policy.getId(), repo)));
} else {
try {
return Optional.of(SnapshotHistoryItem.deletionFailureRecord(Instant.now().toEpochMilli(),
snapInfo.snapshotId().getName(), policy.getId(), repo, new RuntimeException("deletion_failed")));
onCompletion.accept(Optional.of(SnapshotHistoryItem.deletionFailureRecord(Instant.now().toEpochMilli(),
snapInfo.snapshotId().getName(), policy.getId(), repo, new RuntimeException("deletion_failed"))));
} catch (IOException e) {
logger.error(e);
fail("failed to serialize an exception to json, this should never happen");
return Optional.empty(); // impossible to hit this but necessary to make the compiler happy
}
}
},
Expand Down Expand Up @@ -377,13 +375,15 @@ void getAllSuccessfulSnapshots(Collection<String> repositories,
}

@Override
Optional<SnapshotHistoryItem> deleteSnapshot(String policyId, String repo, SnapshotInfo snapshot, SnapshotLifecycleStats slmStats) {
return deleteRunner.apply(policyId, repo, snapshot, slmStats);
void deleteSnapshot(String policyId, String repo, SnapshotInfo snapshot, SnapshotLifecycleStats slmStats,
Consumer<Optional<SnapshotHistoryItem>> onCompletion) {
deleteRunner.apply(policyId, repo, snapshot, slmStats, onCompletion);
}
}

@FunctionalInterface
interface DeleteSnapshotMock {
Optional<SnapshotHistoryItem> apply(String policyId, String repo, SnapshotInfo snapshot, SnapshotLifecycleStats slmStats);
void apply(String policyId, String repo, SnapshotInfo snapshot, SnapshotLifecycleStats slmStats,
Consumer<Optional<SnapshotHistoryItem>> onCompletion);
}
}

0 comments on commit 14e4b6e

Please sign in to comment.