Skip to content

Commit

Permalink
[Transform] automatic deletion of old checkpoints (elastic#49496)
Browse files Browse the repository at this point in the history
add automatic deletion of old checkpoints based on count and time
  • Loading branch information
Hendrik Muhs authored and SivagurunathanV committed Jan 21, 2020
1 parent f4368c3 commit cf952e7
Show file tree
Hide file tree
Showing 7 changed files with 279 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public class TransformCheckpoint implements Writeable, ToXContentObject {
// checkpoint of the indexes (sequence id's)
public static final ParseField INDICES = new ParseField("indices");

private static final String NAME = "data_frame_transform_checkpoint";
public static final String NAME = "data_frame_transform_checkpoint";

private static final ConstructingObjectParser<TransformCheckpoint, Void> STRICT_PARSER = createParser(false);
private static final ConstructingObjectParser<TransformCheckpoint, Void> LENIENT_PARSER = createParser(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.reindex.AbstractBulkByScrollRequest;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.index.reindex.DeleteByQueryAction;
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
Expand All @@ -63,6 +64,7 @@
import java.util.List;
import java.util.Set;

import static org.elasticsearch.index.mapper.MapperService.SINGLE_MAPPING_NAME;
import static org.elasticsearch.xpack.core.ClientHelper.TRANSFORM_ORIGIN;
import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;

Expand Down Expand Up @@ -146,16 +148,18 @@ public void updateTransformConfiguration(

@Override
public void deleteOldTransformConfigurations(String transformId, ActionListener<Boolean> listener) {
DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest(
DeleteByQueryRequest deleteByQueryRequest = createDeleteByQueryRequest();
deleteByQueryRequest.indices(
TransformInternalIndexConstants.INDEX_NAME_PATTERN,
TransformInternalIndexConstants.INDEX_NAME_PATTERN_DEPRECATED
).setQuery(
);
deleteByQueryRequest.setQuery(
QueryBuilders.constantScoreQuery(
QueryBuilders.boolQuery()
.mustNot(QueryBuilders.termQuery("_index", TransformInternalIndexConstants.LATEST_INDEX_NAME))
.filter(QueryBuilders.termQuery("_id", TransformConfig.documentId(transformId)))
)
).setIndicesOptions(IndicesOptions.lenientExpandOpen());
);

executeAsyncWithOrigin(
client,
Expand All @@ -177,17 +181,18 @@ public void deleteOldTransformConfigurations(String transformId, ActionListener<

@Override
public void deleteOldTransformStoredDocuments(String transformId, ActionListener<Boolean> listener) {
DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest(
DeleteByQueryRequest deleteByQueryRequest = createDeleteByQueryRequest();
deleteByQueryRequest.indices(
TransformInternalIndexConstants.INDEX_NAME_PATTERN,
TransformInternalIndexConstants.INDEX_NAME_PATTERN_DEPRECATED
).setQuery(
);
deleteByQueryRequest.setQuery(
QueryBuilders.constantScoreQuery(
QueryBuilders.boolQuery()
.mustNot(QueryBuilders.termQuery("_index", TransformInternalIndexConstants.LATEST_INDEX_NAME))
.filter(QueryBuilders.termQuery("_id", TransformStoredDoc.documentId(transformId)))
)
).setIndicesOptions(IndicesOptions.lenientExpandOpen());

);
executeAsyncWithOrigin(
client,
TRANSFORM_ORIGIN,
Expand All @@ -206,6 +211,41 @@ public void deleteOldTransformStoredDocuments(String transformId, ActionListener
);
}

@Override
public void deleteOldCheckpoints(String transformId, long deleteCheckpointsBelow, long deleteOlderThan, ActionListener<Long> listener) {
DeleteByQueryRequest deleteByQueryRequest = createDeleteByQueryRequest();
deleteByQueryRequest.indices(
TransformInternalIndexConstants.INDEX_NAME_PATTERN,
TransformInternalIndexConstants.INDEX_NAME_PATTERN_DEPRECATED
);
deleteByQueryRequest.setQuery(
QueryBuilders.boolQuery()
.filter(QueryBuilders.termQuery(TransformField.ID.getPreferredName(), transformId))
.filter(QueryBuilders.termQuery(TransformField.INDEX_DOC_TYPE.getPreferredName(), TransformCheckpoint.NAME))
.filter(QueryBuilders.rangeQuery(TransformCheckpoint.CHECKPOINT.getPreferredName()).lt(deleteCheckpointsBelow))
.filter(
QueryBuilders.rangeQuery(TransformField.TIMESTAMP_MILLIS.getPreferredName()).lt(deleteOlderThan).format("epoch_millis")
)
);
logger.debug("Deleting old checkpoints using {}", deleteByQueryRequest.getSearchRequest());
executeAsyncWithOrigin(
client,
TRANSFORM_ORIGIN,
DeleteByQueryAction.INSTANCE,
deleteByQueryRequest,
ActionListener.wrap(response -> {
if ((response.getBulkFailures().isEmpty() && response.getSearchFailures().isEmpty()) == false) {
Tuple<RestStatus, Throwable> statusAndReason = getStatusAndReason(response);
listener.onFailure(
new ElasticsearchStatusException(statusAndReason.v2().getMessage(), statusAndReason.v1(), statusAndReason.v2())
);
return;
}
listener.onResponse(response.getDeleted());
}, listener::onFailure)
);
}

private void putTransformConfiguration(
TransformConfig transformConfig,
DocWriteRequest.OpType optType,
Expand Down Expand Up @@ -419,9 +459,7 @@ public void expandTransformIds(

@Override
public void deleteTransform(String transformId, ActionListener<Boolean> listener) {
DeleteByQueryRequest request = new DeleteByQueryRequest().setAbortOnVersionConflict(false); // since these documents are not
// updated, a conflict just means it was
// deleted previously
DeleteByQueryRequest request = createDeleteByQueryRequest();

request.indices(TransformInternalIndexConstants.INDEX_NAME_PATTERN, TransformInternalIndexConstants.INDEX_NAME_PATTERN_DEPRECATED);
QueryBuilder query = QueryBuilders.termQuery(TransformField.ID.getPreferredName(), transformId);
Expand Down Expand Up @@ -675,4 +713,22 @@ private static Tuple<RestStatus, Throwable> getStatusAndReason(final BulkByScrol
}
return new Tuple<>(status, reason);
}

/**
* Create DBQ request with good defaults
*
* @return new DeleteByQueryRequest with some defaults set
*/
private static DeleteByQueryRequest createDeleteByQueryRequest() {

DeleteByQueryRequest deleteByQuery = new DeleteByQueryRequest();

deleteByQuery.setAbortOnVersionConflict(false)
.setSlices(AbstractBulkByScrollRequest.AUTO_SLICES)
.setIndicesOptions(IndicesOptions.lenientExpandOpen());

// disable scoring by using index order
deleteByQuery.getSearchRequest().source().sort(SINGLE_MAPPING_NAME);
return deleteByQuery;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,18 @@ void updateTransformConfiguration(
*/
void deleteOldTransformStoredDocuments(String transformId, ActionListener<Boolean> listener);

/**
* This deletes stored checkpoint documents for the given transformId, based on number and age.
*
* Both criteria MUST apply for the deletion to happen.
*
* @param transformId The transform ID referenced by the documents
* @param deleteCheckpointsBelow checkpoints lower than this to delete
* @param deleteOlderThan checkpoints older than this to delete
* @param listener listener to alert on completion, returning number of deleted checkpoints
*/
void deleteOldCheckpoints(String transformId, long deleteCheckpointsBelow, long deleteOlderThan, ActionListener<Long> listener);

/**
* Get a stored checkpoint, requires the transform id as well as the checkpoint id
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.elasticsearch.xpack.core.transform.TransformField;
import org.elasticsearch.xpack.core.transform.transforms.DestConfig;
import org.elasticsearch.xpack.core.transform.transforms.SourceConfig;
import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpoint;
import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerStats;
import org.elasticsearch.xpack.core.transform.transforms.TransformProgress;
import org.elasticsearch.xpack.core.transform.transforms.TransformState;
Expand All @@ -54,8 +55,9 @@ public final class TransformInternalIndex {
* progress::docs_processed, progress::docs_indexed,
* stats::exponential_avg_checkpoint_duration_ms, stats::exponential_avg_documents_indexed,
* stats::exponential_avg_documents_processed
*
* version 3 (7.5): rename to .transform-internal-xxx
* version 4 (7.6): state::should_stop_at_checkpoint
* checkpoint::checkpoint
*/

// constants for mappings
Expand All @@ -77,25 +79,29 @@ public final class TransformInternalIndex {

public static IndexTemplateMetaData getIndexTemplateMetaData() throws IOException {
IndexTemplateMetaData transformTemplate = IndexTemplateMetaData.builder(TransformInternalIndexConstants.LATEST_INDEX_VERSIONED_NAME)
.patterns(Collections.singletonList(TransformInternalIndexConstants.LATEST_INDEX_VERSIONED_NAME))
.version(Version.CURRENT.id)
.settings(Settings.builder()
// the configurations are expected to be small
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_AUTO_EXPAND_REPLICAS, "0-1"))
.putMapping(MapperService.SINGLE_MAPPING_NAME, Strings.toString(mappings()))
.build();
.patterns(Collections.singletonList(TransformInternalIndexConstants.LATEST_INDEX_VERSIONED_NAME))
.version(Version.CURRENT.id)
.settings(
Settings.builder()
// the configurations are expected to be small
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_AUTO_EXPAND_REPLICAS, "0-1")
)
.putMapping(MapperService.SINGLE_MAPPING_NAME, Strings.toString(mappings()))
.build();
return transformTemplate;
}

public static IndexTemplateMetaData getAuditIndexTemplateMetaData() throws IOException {
IndexTemplateMetaData transformTemplate = IndexTemplateMetaData.builder(TransformInternalIndexConstants.AUDIT_INDEX)
.patterns(Collections.singletonList(TransformInternalIndexConstants.AUDIT_INDEX_PREFIX + "*"))
.version(Version.CURRENT.id)
.settings(Settings.builder()
// the audits are expected to be small
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_AUTO_EXPAND_REPLICAS, "0-1"))
.settings(
Settings.builder()
// the audits are expected to be small
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_AUTO_EXPAND_REPLICAS, "0-1")
)
.putMapping(MapperService.SINGLE_MAPPING_NAME, Strings.toString(auditMappings()))
.putAlias(AliasMetaData.builder(TransformInternalIndexConstants.AUDIT_INDEX_READ_ALIAS))
.build();
Expand All @@ -107,26 +113,27 @@ private static XContentBuilder auditMappings() throws IOException {
builder.startObject(SINGLE_MAPPING_NAME);
addMetaInformation(builder);
builder.field(DYNAMIC, "false");
builder.startObject(PROPERTIES)
.startObject(TRANSFORM_ID)
.field(TYPE, KEYWORD)
.endObject()
.startObject(AbstractAuditMessage.LEVEL.getPreferredName())
.field(TYPE, KEYWORD)
.endObject()
.startObject(AbstractAuditMessage.MESSAGE.getPreferredName())
.field(TYPE, TEXT)
.startObject(FIELDS)
.startObject(RAW)
.field(TYPE, KEYWORD)
.endObject()
.endObject()
builder
.startObject(PROPERTIES)
.startObject(TRANSFORM_ID)
.field(TYPE, KEYWORD)
.endObject()
.startObject(AbstractAuditMessage.LEVEL.getPreferredName())
.field(TYPE, KEYWORD)
.endObject()
.startObject(AbstractAuditMessage.MESSAGE.getPreferredName())
.field(TYPE, TEXT)
.startObject(FIELDS)
.startObject(RAW)
.field(TYPE, KEYWORD)
.endObject()
.endObject()
.endObject()
.startObject(AbstractAuditMessage.TIMESTAMP.getPreferredName())
.field(TYPE, DATE)
.field(TYPE, DATE)
.endObject()
.startObject(AbstractAuditMessage.NODE_NAME.getPreferredName())
.field(TYPE, KEYWORD)
.field(TYPE, KEYWORD)
.endObject()
.endObject()
.endObject()
Expand Down Expand Up @@ -167,7 +174,6 @@ public static XContentBuilder mappings(XContentBuilder builder) throws IOExcepti
return builder;
}


private static XContentBuilder addTransformStoredDocMappings(XContentBuilder builder) throws IOException {
return builder
.startObject(TransformStoredDoc.STATE_FIELD.getPreferredName())
Expand Down Expand Up @@ -254,9 +260,6 @@ private static XContentBuilder addTransformStoredDocMappings(XContentBuilder bui
.endObject()
.endObject()
.endObject();
// This is obsolete and can be removed for future versions of the index, but is left here as a warning/reminder that
// we cannot declare this field differently in version 1 of the internal index as it would cause a mapping clash
// .startObject("checkpointing").field(ENABLED, false).endObject();
}

public static XContentBuilder addTransformsConfigMappings(XContentBuilder builder) throws IOException {
Expand Down Expand Up @@ -299,6 +302,9 @@ private static XContentBuilder addTransformCheckpointMappings(XContentBuilder bu
.endObject()
.startObject(TransformField.TIME_UPPER_BOUND_MILLIS.getPreferredName())
.field(TYPE, DATE)
.endObject()
.startObject(TransformCheckpoint.CHECKPOINT.getPreferredName())
.field(TYPE, LONG)
.endObject();
}

Expand All @@ -310,9 +316,7 @@ private static XContentBuilder addTransformCheckpointMappings(XContentBuilder bu
* @throws IOException On write error
*/
private static XContentBuilder addMetaInformation(XContentBuilder builder) throws IOException {
return builder.startObject("_meta")
.field("version", Version.CURRENT)
.endObject();
return builder.startObject("_meta").field("version", Version.CURRENT).endObject();
}

/**
Expand Down
Loading

0 comments on commit cf952e7

Please sign in to comment.