Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Transform] automatic deletion of old checkpoints #49496

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public class TransformCheckpoint implements Writeable, ToXContentObject {
// checkpoint of the indices (sequence IDs)
public static final ParseField INDICES = new ParseField("indices");

private static final String NAME = "data_frame_transform_checkpoint";
public static final String NAME = "data_frame_transform_checkpoint";

private static final ConstructingObjectParser<TransformCheckpoint, Void> STRICT_PARSER = createParser(false);
private static final ConstructingObjectParser<TransformCheckpoint, Void> LENIENT_PARSER = createParser(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.reindex.AbstractBulkByScrollRequest;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.index.reindex.DeleteByQueryAction;
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
Expand All @@ -63,6 +64,7 @@
import java.util.List;
import java.util.Set;

import static org.elasticsearch.index.mapper.MapperService.SINGLE_MAPPING_NAME;
import static org.elasticsearch.xpack.core.ClientHelper.TRANSFORM_ORIGIN;
import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;

Expand Down Expand Up @@ -146,16 +148,18 @@ public void updateTransformConfiguration(

@Override
public void deleteOldTransformConfigurations(String transformId, ActionListener<Boolean> listener) {
DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest(
DeleteByQueryRequest deleteByQueryRequest = createDeleteByQueryRequest();
deleteByQueryRequest.indices(
TransformInternalIndexConstants.INDEX_NAME_PATTERN,
TransformInternalIndexConstants.INDEX_NAME_PATTERN_DEPRECATED
).setQuery(
);
deleteByQueryRequest.setQuery(
QueryBuilders.constantScoreQuery(
QueryBuilders.boolQuery()
.mustNot(QueryBuilders.termQuery("_index", TransformInternalIndexConstants.LATEST_INDEX_NAME))
.filter(QueryBuilders.termQuery("_id", TransformConfig.documentId(transformId)))
)
).setIndicesOptions(IndicesOptions.lenientExpandOpen());
);

executeAsyncWithOrigin(
client,
Expand All @@ -177,17 +181,18 @@ public void deleteOldTransformConfigurations(String transformId, ActionListener<

@Override
public void deleteOldTransformStoredDocuments(String transformId, ActionListener<Boolean> listener) {
DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest(
DeleteByQueryRequest deleteByQueryRequest = createDeleteByQueryRequest();
deleteByQueryRequest.indices(
TransformInternalIndexConstants.INDEX_NAME_PATTERN,
TransformInternalIndexConstants.INDEX_NAME_PATTERN_DEPRECATED
).setQuery(
);
deleteByQueryRequest.setQuery(
QueryBuilders.constantScoreQuery(
QueryBuilders.boolQuery()
.mustNot(QueryBuilders.termQuery("_index", TransformInternalIndexConstants.LATEST_INDEX_NAME))
.filter(QueryBuilders.termQuery("_id", TransformStoredDoc.documentId(transformId)))
)
).setIndicesOptions(IndicesOptions.lenientExpandOpen());

);
executeAsyncWithOrigin(
client,
TRANSFORM_ORIGIN,
Expand All @@ -206,6 +211,41 @@ public void deleteOldTransformStoredDocuments(String transformId, ActionListener
);
}

/**
 * Deletes checkpoint documents of a transform via delete-by-query.
 *
 * A document is only matched if all filters apply: it belongs to {@code transformId}, its doc type is
 * {@link TransformCheckpoint#NAME}, its checkpoint number is lower than {@code deleteCheckpointsBelow} and its
 * timestamp (compared as epoch milliseconds) is lower than {@code deleteOlderThan}.
 *
 * On success the listener receives the number of deleted documents. If the response contains bulk or search
 * failures, the listener is failed with an {@link ElasticsearchStatusException} built from the response.
 */
@Override
public void deleteOldCheckpoints(String transformId, long deleteCheckpointsBelow, long deleteOlderThan, ActionListener<Long> listener) {
    DeleteByQueryRequest request = createDeleteByQueryRequest();

    // cover the current as well as the deprecated internal index pattern
    request.indices(TransformInternalIndexConstants.INDEX_NAME_PATTERN, TransformInternalIndexConstants.INDEX_NAME_PATTERN_DEPRECATED);

    // every clause is a filter: no scoring needed, and all of them must match
    request.setQuery(
        QueryBuilders.boolQuery()
            .filter(QueryBuilders.termQuery(TransformField.ID.getPreferredName(), transformId))
            .filter(QueryBuilders.termQuery(TransformField.INDEX_DOC_TYPE.getPreferredName(), TransformCheckpoint.NAME))
            .filter(QueryBuilders.rangeQuery(TransformCheckpoint.CHECKPOINT.getPreferredName()).lt(deleteCheckpointsBelow))
            .filter(QueryBuilders.rangeQuery(TransformField.TIMESTAMP_MILLIS.getPreferredName()).lt(deleteOlderThan).format("epoch_millis"))
    );

    logger.debug("Deleting old checkpoints using {}", request.getSearchRequest());

    executeAsyncWithOrigin(client, TRANSFORM_ORIGIN, DeleteByQueryAction.INSTANCE, request, ActionListener.wrap(response -> {
        // bulk failures are checked first, search failures only if there were no bulk failures
        boolean failed = response.getBulkFailures().isEmpty() == false || response.getSearchFailures().isEmpty() == false;
        if (failed == false) {
            listener.onResponse(response.getDeleted());
            return;
        }

        // translate the failures of the response into a single status exception
        Tuple<RestStatus, Throwable> failure = getStatusAndReason(response);
        listener.onFailure(new ElasticsearchStatusException(failure.v2().getMessage(), failure.v1(), failure.v2()));
    }, listener::onFailure));
}

private void putTransformConfiguration(
TransformConfig transformConfig,
DocWriteRequest.OpType optType,
Expand Down Expand Up @@ -419,9 +459,7 @@ public void expandTransformIds(

@Override
public void deleteTransform(String transformId, ActionListener<Boolean> listener) {
DeleteByQueryRequest request = new DeleteByQueryRequest().setAbortOnVersionConflict(false); // since these documents are not
// updated, a conflict just means it was
// deleted previously
DeleteByQueryRequest request = createDeleteByQueryRequest();

request.indices(TransformInternalIndexConstants.INDEX_NAME_PATTERN, TransformInternalIndexConstants.INDEX_NAME_PATTERN_DEPRECATED);
QueryBuilder query = QueryBuilders.termQuery(TransformField.ID.getPreferredName(), transformId);
Expand Down Expand Up @@ -675,4 +713,22 @@ private static Tuple<RestStatus, Throwable> getStatusAndReason(final BulkByScrol
}
return new Tuple<>(status, reason);
}

/**
 * Builds the pre-configured delete-by-query request shared by the delete operations of this class.
 *
 * The returned request does not abort on version conflicts, uses automatic slicing, resolves indices with
 * {@link IndicesOptions#lenientExpandOpen()} and sorts its search by {@code SINGLE_MAPPING_NAME}.
 * Callers still have to set the target indices and the query.
 *
 * @return a new {@link DeleteByQueryRequest} with the defaults described above
 */
private static DeleteByQueryRequest createDeleteByQueryRequest() {
    DeleteByQueryRequest request = new DeleteByQueryRequest();

    request.setAbortOnVersionConflict(false);
    request.setSlices(AbstractBulkByScrollRequest.AUTO_SLICES);
    request.setIndicesOptions(IndicesOptions.lenientExpandOpen());

    // sort by index order so that no scoring is required
    // NOTE(review): this passes the mapping-name constant as sort field; presumably it resolves to "_doc" -
    // confirm, a dedicated sort-field constant would state the intent more directly
    request.getSearchRequest().source().sort(SINGLE_MAPPING_NAME);

    return request;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,18 @@ void updateTransformConfiguration(
*/
void deleteOldTransformStoredDocuments(String transformId, ActionListener<Boolean> listener);

/**
 * Deletes stored checkpoint documents of the given transform, selected by checkpoint number and age.
 *
 * A checkpoint document is only deleted if BOTH criteria apply: its checkpoint number is lower than
 * {@code deleteCheckpointsBelow} AND its timestamp is older than {@code deleteOlderThan}.
 *
 * @param transformId The transform ID referenced by the documents
 * @param deleteCheckpointsBelow exclusive upper bound: only checkpoints with a number lower than this are deleted
 * @param deleteOlderThan exclusive upper bound: only checkpoints with a timestamp older than this are deleted
 *                        (presumably epoch milliseconds - verify against the implementation)
 * @param listener listener to alert on completion, returning number of deleted checkpoints
 */
void deleteOldCheckpoints(String transformId, long deleteCheckpointsBelow, long deleteOlderThan, ActionListener<Long> listener);

/**
* Get a stored checkpoint, requires the transform id as well as the checkpoint id
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.elasticsearch.xpack.core.transform.TransformField;
import org.elasticsearch.xpack.core.transform.transforms.DestConfig;
import org.elasticsearch.xpack.core.transform.transforms.SourceConfig;
import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpoint;
import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerStats;
import org.elasticsearch.xpack.core.transform.transforms.TransformProgress;
import org.elasticsearch.xpack.core.transform.transforms.TransformState;
Expand All @@ -54,8 +55,9 @@ public final class TransformInternalIndex {
* progress::docs_processed, progress::docs_indexed,
* stats::exponential_avg_checkpoint_duration_ms, stats::exponential_avg_documents_indexed,
* stats::exponential_avg_documents_processed
*
* version 3 (7.5): rename to .transform-internal-xxx
* version 4 (7.6): state::should_stop_at_checkpoint
* checkpoint::checkpoint
*/

// constants for mappings
Expand All @@ -77,25 +79,29 @@ public final class TransformInternalIndex {

/**
 * Builds the index template for the latest versioned transform internal index.
 *
 * The template is named after and matches exactly {@code LATEST_INDEX_VERSIONED_NAME}, carries the current
 * version id, uses a single shard with auto-expanded replicas ("0-1") and the mappings created by
 * {@code mappings()}.
 *
 * @return the index template meta data
 * @throws IOException declared because building the mappings may fail on write
 */
public static IndexTemplateMetaData getIndexTemplateMetaData() throws IOException {
IndexTemplateMetaData transformTemplate = IndexTemplateMetaData.builder(TransformInternalIndexConstants.LATEST_INDEX_VERSIONED_NAME)
.patterns(Collections.singletonList(TransformInternalIndexConstants.LATEST_INDEX_VERSIONED_NAME))
.version(Version.CURRENT.id)
.settings(Settings.builder()
// the configurations are expected to be small
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_AUTO_EXPAND_REPLICAS, "0-1"))
.putMapping(MapperService.SINGLE_MAPPING_NAME, Strings.toString(mappings()))
.build();
.patterns(Collections.singletonList(TransformInternalIndexConstants.LATEST_INDEX_VERSIONED_NAME))
.version(Version.CURRENT.id)
.settings(
Settings.builder()
// the configurations are expected to be small
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_AUTO_EXPAND_REPLICAS, "0-1")
)
.putMapping(MapperService.SINGLE_MAPPING_NAME, Strings.toString(mappings()))
.build();
return transformTemplate;
}

public static IndexTemplateMetaData getAuditIndexTemplateMetaData() throws IOException {
IndexTemplateMetaData transformTemplate = IndexTemplateMetaData.builder(TransformInternalIndexConstants.AUDIT_INDEX)
.patterns(Collections.singletonList(TransformInternalIndexConstants.AUDIT_INDEX_PREFIX + "*"))
.version(Version.CURRENT.id)
.settings(Settings.builder()
// the audits are expected to be small
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_AUTO_EXPAND_REPLICAS, "0-1"))
.settings(
Settings.builder()
// the audits are expected to be small
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_AUTO_EXPAND_REPLICAS, "0-1")
)
.putMapping(MapperService.SINGLE_MAPPING_NAME, Strings.toString(auditMappings()))
.putAlias(AliasMetaData.builder(TransformInternalIndexConstants.AUDIT_INDEX_READ_ALIAS))
.build();
Expand All @@ -107,26 +113,27 @@ private static XContentBuilder auditMappings() throws IOException {
builder.startObject(SINGLE_MAPPING_NAME);
addMetaInformation(builder);
builder.field(DYNAMIC, "false");
builder.startObject(PROPERTIES)
.startObject(TRANSFORM_ID)
.field(TYPE, KEYWORD)
.endObject()
.startObject(AbstractAuditMessage.LEVEL.getPreferredName())
.field(TYPE, KEYWORD)
.endObject()
.startObject(AbstractAuditMessage.MESSAGE.getPreferredName())
.field(TYPE, TEXT)
.startObject(FIELDS)
.startObject(RAW)
.field(TYPE, KEYWORD)
.endObject()
.endObject()
builder
.startObject(PROPERTIES)
.startObject(TRANSFORM_ID)
.field(TYPE, KEYWORD)
.endObject()
.startObject(AbstractAuditMessage.LEVEL.getPreferredName())
.field(TYPE, KEYWORD)
.endObject()
.startObject(AbstractAuditMessage.MESSAGE.getPreferredName())
.field(TYPE, TEXT)
.startObject(FIELDS)
.startObject(RAW)
.field(TYPE, KEYWORD)
.endObject()
.endObject()
.endObject()
.startObject(AbstractAuditMessage.TIMESTAMP.getPreferredName())
.field(TYPE, DATE)
.field(TYPE, DATE)
.endObject()
.startObject(AbstractAuditMessage.NODE_NAME.getPreferredName())
.field(TYPE, KEYWORD)
.field(TYPE, KEYWORD)
.endObject()
.endObject()
.endObject()
Expand Down Expand Up @@ -167,7 +174,6 @@ public static XContentBuilder mappings(XContentBuilder builder) throws IOExcepti
return builder;
}


private static XContentBuilder addTransformStoredDocMappings(XContentBuilder builder) throws IOException {
return builder
.startObject(TransformStoredDoc.STATE_FIELD.getPreferredName())
Expand Down Expand Up @@ -254,9 +260,6 @@ private static XContentBuilder addTransformStoredDocMappings(XContentBuilder bui
.endObject()
.endObject()
.endObject();
// This is obsolete and can be removed for future versions of the index, but is left here as a warning/reminder that
// we cannot declare this field differently in version 1 of the internal index as it would cause a mapping clash
// .startObject("checkpointing").field(ENABLED, false).endObject();
}

public static XContentBuilder addTransformsConfigMappings(XContentBuilder builder) throws IOException {
Expand Down Expand Up @@ -299,6 +302,9 @@ private static XContentBuilder addTransformCheckpointMappings(XContentBuilder bu
.endObject()
.startObject(TransformField.TIME_UPPER_BOUND_MILLIS.getPreferredName())
.field(TYPE, DATE)
.endObject()
.startObject(TransformCheckpoint.CHECKPOINT.getPreferredName())
.field(TYPE, LONG)
.endObject();
}

Expand All @@ -310,9 +316,7 @@ private static XContentBuilder addTransformCheckpointMappings(XContentBuilder bu
* @throws IOException On write error
*/
private static XContentBuilder addMetaInformation(XContentBuilder builder) throws IOException {
// writes a "_meta" object whose "version" field holds the current version, then returns the same builder chain
return builder.startObject("_meta")
.field("version", Version.CURRENT)
.endObject();
return builder.startObject("_meta").field("version", Version.CURRENT).endObject();
}

/**
Expand Down
Loading