Skip to content

Commit

Permalink
improve variable naming, only warn/audit if cleanup fails
Browse files Browse the repository at this point in the history
  • Loading branch information
Hendrik Muhs committed Dec 2, 2019
1 parent e3656d2 commit 31bcbd6
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ public void deleteOldTransformStoredDocuments(String transformId, ActionListener
}

@Override
public void deleteOldCheckpoints(String transformId, long checkpointLowerBound, long lowerBoundEpochMs, ActionListener<Long> listener) {
public void deleteOldCheckpoints(String transformId, long deleteCheckpointsBelow, long deleteOlderThan, ActionListener<Long> listener) {
DeleteByQueryRequest deleteByQueryRequest = createDeleteByQueryRequest();
deleteByQueryRequest.indices(
TransformInternalIndexConstants.INDEX_NAME_PATTERN,
Expand All @@ -222,11 +222,9 @@ public void deleteOldCheckpoints(String transformId, long checkpointLowerBound,
QueryBuilders.boolQuery()
.filter(QueryBuilders.termQuery(TransformField.ID.getPreferredName(), transformId))
.filter(QueryBuilders.termQuery(TransformField.INDEX_DOC_TYPE.getPreferredName(), TransformCheckpoint.NAME))
.filter(QueryBuilders.rangeQuery(TransformCheckpoint.CHECKPOINT.getPreferredName()).lt(checkpointLowerBound))
.filter(QueryBuilders.rangeQuery(TransformCheckpoint.CHECKPOINT.getPreferredName()).lt(deleteCheckpointsBelow))
.filter(
QueryBuilders.rangeQuery(TransformField.TIMESTAMP_MILLIS.getPreferredName())
.lt(lowerBoundEpochMs)
.format("epoch_millis")
QueryBuilders.rangeQuery(TransformField.TIMESTAMP_MILLIS.getPreferredName()).lt(deleteOlderThan).format("epoch_millis")
)
);
logger.debug("Deleting old checkpoints using {}", deleteByQueryRequest.getSearchRequest());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,14 +73,16 @@ void updateTransformConfiguration(
void deleteOldTransformStoredDocuments(String transformId, ActionListener<Boolean> listener);

/**
* This deletes stored checkpoint documents for the given transformId, checkpoint lower bound and timestamp lower bound.
* This deletes stored checkpoint documents for the given transformId, based on number and age.
*
* Both criteria MUST apply for the deletion to happen.
*
* @param transformId The transform ID referenced by the documents
* @param checkpointLowerBound lower bound of checkpoints to be deleted
* @param lowerBoundEpochMs timestamp until checkpoints shall be deleted
* @param deleteCheckpointsBelow checkpoints lower than this to delete
* @param deleteOlderThan checkpoints older than this to delete
* @param listener listener to alert on completion, returning number of deleted checkpoints
*/
void deleteOldCheckpoints(String transformId, long checkpointLowerBound, long lowerBoundEpochMs, ActionListener<Long> listener);
void deleteOldCheckpoints(String transformId, long deleteCheckpointsBelow, long deleteOlderThan, ActionListener<Long> listener);

/**
* Get a stored checkpoint, requires the transform id as well as the checkpoint id
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -385,9 +385,8 @@ protected void onFinish(ActionListener<Void> listener) {
}

if (checkpoint - lastCheckpointCleanup > CHECKPOINT_CLEANUP_INTERVAL) {
// delete old checkpoints, this might even fail, but it will be retried in this case
// delete old checkpoints, on a failure we keep going
cleanupOldCheckpoints(listener);
lastCheckpointCleanup = checkpoint;
} else {
listener.onResponse(null);
}
Expand Down Expand Up @@ -525,7 +524,19 @@ private void cleanupOldCheckpoints(ActionListener<Void> listener) {
ActionListener.wrap(deletes -> {
logger.debug("[{}] deleted [{}] outdated checkpoints", getJobId(), deletes);
listener.onResponse(null);
}, listener::onFailure)
lastCheckpointCleanup = context.getCheckpoint();
}, e -> {
logger.warn(
new ParameterizedMessage("[{}] failed to cleanup old checkpoints, retrying after next checkpoint", getJobId()),
e
);
auditor.warning(
getJobId(),
"Failed to cleanup old checkpoints, retrying after next checkpoint. Exception: " + e.getMessage()
);

listener.onResponse(null);
})
);
} else {
logger.debug("[{}] checked for outdated checkpoints", getJobId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,11 +82,11 @@ public void deleteOldTransformStoredDocuments(String transformId, ActionListener
}

@Override
public void deleteOldCheckpoints(String transformId, long checkpointLowerBound, long lowerBoundEpochMs, ActionListener<Long> listener) {
public void deleteOldCheckpoints(String transformId, long deleteCheckpointsBelow, long deleteOlderThan, ActionListener<Long> listener) {
List<TransformCheckpoint> checkpointsById = checkpoints.get(transformId);
int sizeBeforeDelete = checkpointsById.size();
if (checkpointsById != null) {
checkpointsById.removeIf(cp -> { return cp.getCheckpoint() < checkpointLowerBound && cp.getTimestamp() < lowerBoundEpochMs; });
checkpointsById.removeIf(cp -> { return cp.getCheckpoint() < deleteCheckpointsBelow && cp.getTimestamp() < deleteOlderThan; });
}
listener.onResponse(Long.valueOf(sizeBeforeDelete - checkpointsById.size()));
}
Expand Down

0 comments on commit 31bcbd6

Please sign in to comment.