From f509f09e91d93bb7b788b503848762c579874326 Mon Sep 17 00:00:00 2001 From: Hendrik Muhs Date: Fri, 22 Nov 2019 17:42:30 +0100 Subject: [PATCH 1/5] apply formatting --- .../persistence/TransformInternalIndex.java | 42 ++++++++++--------- 1 file changed, 23 insertions(+), 19 deletions(-) diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/persistence/TransformInternalIndex.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/persistence/TransformInternalIndex.java index d0b94bf6e3e52..545a5680ac214 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/persistence/TransformInternalIndex.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/persistence/TransformInternalIndex.java @@ -77,14 +77,16 @@ public final class TransformInternalIndex { public static IndexTemplateMetaData getIndexTemplateMetaData() throws IOException { IndexTemplateMetaData transformTemplate = IndexTemplateMetaData.builder(TransformInternalIndexConstants.LATEST_INDEX_VERSIONED_NAME) - .patterns(Collections.singletonList(TransformInternalIndexConstants.LATEST_INDEX_VERSIONED_NAME)) - .version(Version.CURRENT.id) - .settings(Settings.builder() - // the configurations are expected to be small - .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) - .put(IndexMetaData.SETTING_AUTO_EXPAND_REPLICAS, "0-1")) - .putMapping(MapperService.SINGLE_MAPPING_NAME, Strings.toString(mappings())) - .build(); + .patterns(Collections.singletonList(TransformInternalIndexConstants.LATEST_INDEX_VERSIONED_NAME)) + .version(Version.CURRENT.id) + .settings( + Settings.builder() + // the configurations are expected to be small + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetaData.SETTING_AUTO_EXPAND_REPLICAS, "0-1") + ) + .putMapping(MapperService.SINGLE_MAPPING_NAME, Strings.toString(mappings())) + .build(); return transformTemplate; } @@ -92,10 +94,12 @@ public static IndexTemplateMetaData 
getAuditIndexTemplateMetaData() throws IOExc IndexTemplateMetaData transformTemplate = IndexTemplateMetaData.builder(TransformInternalIndexConstants.AUDIT_INDEX) .patterns(Collections.singletonList(TransformInternalIndexConstants.AUDIT_INDEX_PREFIX + "*")) .version(Version.CURRENT.id) - .settings(Settings.builder() - // the audits are expected to be small - .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) - .put(IndexMetaData.SETTING_AUTO_EXPAND_REPLICAS, "0-1")) + .settings( + Settings.builder() + // the audits are expected to be small + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetaData.SETTING_AUTO_EXPAND_REPLICAS, "0-1") + ) .putMapping(MapperService.SINGLE_MAPPING_NAME, Strings.toString(auditMappings())) .putAlias(AliasMetaData.builder(TransformInternalIndexConstants.AUDIT_INDEX_READ_ALIAS)) .build(); @@ -167,8 +171,8 @@ public static XContentBuilder mappings(XContentBuilder builder) throws IOExcepti return builder; } - private static XContentBuilder addTransformStoredDocMappings(XContentBuilder builder) throws IOException { + // tag::disable_formating return builder .startObject(TransformStoredDoc.STATE_FIELD.getPreferredName()) .startObject(PROPERTIES) @@ -254,12 +258,11 @@ private static XContentBuilder addTransformStoredDocMappings(XContentBuilder bui .endObject() .endObject() .endObject(); - // This is obsolete and can be removed for future versions of the index, but is left here as a warning/reminder that - // we cannot declare this field differently in version 1 of the internal index as it would cause a mapping clash - // .startObject("checkpointing").field(ENABLED, false).endObject(); + // end::disable_formating } public static XContentBuilder addTransformsConfigMappings(XContentBuilder builder) throws IOException { + // tag::disable_formating return builder .startObject(TransformField.ID.getPreferredName()) .field(TYPE, KEYWORD) @@ -290,9 +293,11 @@ public static XContentBuilder addTransformsConfigMappings(XContentBuilder builde 
.startObject(TransformField.CREATE_TIME.getPreferredName()) .field(TYPE, DATE) .endObject(); + // end::disable_formating } private static XContentBuilder addTransformCheckpointMappings(XContentBuilder builder) throws IOException { + // tag::disable_formating return builder .startObject(TransformField.TIMESTAMP_MILLIS.getPreferredName()) .field(TYPE, DATE) @@ -300,6 +305,7 @@ private static XContentBuilder addTransformCheckpointMappings(XContentBuilder bu .startObject(TransformField.TIME_UPPER_BOUND_MILLIS.getPreferredName()) .field(TYPE, DATE) .endObject(); + // end::disable_formating } /** @@ -310,9 +316,7 @@ private static XContentBuilder addTransformCheckpointMappings(XContentBuilder bu * @throws IOException On write error */ private static XContentBuilder addMetaInformation(XContentBuilder builder) throws IOException { - return builder.startObject("_meta") - .field("version", Version.CURRENT) - .endObject(); + return builder.startObject("_meta").field("version", Version.CURRENT).endObject(); } /** From 46f3e3556989393a45cb702e88a4678aaffd4a00 Mon Sep 17 00:00:00 2001 From: Hendrik Muhs Date: Fri, 22 Nov 2019 17:45:22 +0100 Subject: [PATCH 2/5] automatically delete old checkpoints --- .../transforms/TransformCheckpoint.java | 2 +- .../IndexBasedTransformConfigManager.java | 78 +++++++++++++--- .../persistence/TransformConfigManager.java | 10 +++ .../persistence/TransformInternalIndex.java | 7 +- .../transforms/TransformIndexer.java | 50 ++++++++++- .../InMemoryTransformConfigManager.java | 10 +++ .../TransformConfigManagerTests.java | 90 +++++++++++++++++++ 7 files changed, 234 insertions(+), 13 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformCheckpoint.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformCheckpoint.java index 3e2dd844c362f..8431abc886d85 100644 --- 
a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformCheckpoint.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformCheckpoint.java @@ -50,7 +50,7 @@ public class TransformCheckpoint implements Writeable, ToXContentObject { // checkpoint of the indexes (sequence id's) public static final ParseField INDICES = new ParseField("indices"); - private static final String NAME = "data_frame_transform_checkpoint"; + public static final String NAME = "data_frame_transform_checkpoint"; private static final ConstructingObjectParser STRICT_PARSER = createParser(false); private static final ConstructingObjectParser LENIENT_PARSER = createParser(true); diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/persistence/IndexBasedTransformConfigManager.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/persistence/IndexBasedTransformConfigManager.java index 5b747e9de01e1..b96c690b3e774 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/persistence/IndexBasedTransformConfigManager.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/persistence/IndexBasedTransformConfigManager.java @@ -39,6 +39,7 @@ import org.elasticsearch.index.query.BoolQueryBuilder; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.index.reindex.AbstractBulkByScrollRequest; import org.elasticsearch.index.reindex.BulkByScrollResponse; import org.elasticsearch.index.reindex.DeleteByQueryAction; import org.elasticsearch.index.reindex.DeleteByQueryRequest; @@ -63,6 +64,7 @@ import java.util.List; import java.util.Set; +import static org.elasticsearch.index.mapper.MapperService.SINGLE_MAPPING_NAME; import static org.elasticsearch.xpack.core.ClientHelper.TRANSFORM_ORIGIN; import static 
org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin; @@ -146,16 +148,18 @@ public void updateTransformConfiguration( @Override public void deleteOldTransformConfigurations(String transformId, ActionListener listener) { - DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest( + DeleteByQueryRequest deleteByQueryRequest = createDeleteByQueryRequest(); + deleteByQueryRequest.indices( TransformInternalIndexConstants.INDEX_NAME_PATTERN, TransformInternalIndexConstants.INDEX_NAME_PATTERN_DEPRECATED - ).setQuery( + ); + deleteByQueryRequest.setQuery( QueryBuilders.constantScoreQuery( QueryBuilders.boolQuery() .mustNot(QueryBuilders.termQuery("_index", TransformInternalIndexConstants.LATEST_INDEX_NAME)) .filter(QueryBuilders.termQuery("_id", TransformConfig.documentId(transformId))) ) - ).setIndicesOptions(IndicesOptions.lenientExpandOpen()); + ); executeAsyncWithOrigin( client, @@ -177,17 +181,18 @@ public void deleteOldTransformConfigurations(String transformId, ActionListener< @Override public void deleteOldTransformStoredDocuments(String transformId, ActionListener listener) { - DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest( + DeleteByQueryRequest deleteByQueryRequest = createDeleteByQueryRequest(); + deleteByQueryRequest.indices( TransformInternalIndexConstants.INDEX_NAME_PATTERN, TransformInternalIndexConstants.INDEX_NAME_PATTERN_DEPRECATED - ).setQuery( + ); + deleteByQueryRequest.setQuery( QueryBuilders.constantScoreQuery( QueryBuilders.boolQuery() .mustNot(QueryBuilders.termQuery("_index", TransformInternalIndexConstants.LATEST_INDEX_NAME)) .filter(QueryBuilders.termQuery("_id", TransformStoredDoc.documentId(transformId))) ) - ).setIndicesOptions(IndicesOptions.lenientExpandOpen()); - + ); executeAsyncWithOrigin( client, TRANSFORM_ORIGIN, @@ -206,6 +211,43 @@ public void deleteOldTransformStoredDocuments(String transformId, ActionListener ); } + @Override + public void deleteOldCheckpoints(String transformId, long 
checkpointLowerBound, long lowerBoundEpochMs, ActionListener listener) { + DeleteByQueryRequest deleteByQueryRequest = createDeleteByQueryRequest(); + deleteByQueryRequest.indices( + TransformInternalIndexConstants.INDEX_NAME_PATTERN, + TransformInternalIndexConstants.INDEX_NAME_PATTERN_DEPRECATED + ); + deleteByQueryRequest.setQuery( + QueryBuilders.boolQuery() + .filter(QueryBuilders.termQuery(TransformField.ID.getPreferredName(), transformId)) + .filter(QueryBuilders.termQuery(TransformField.INDEX_DOC_TYPE.getPreferredName(), TransformCheckpoint.NAME)) + .filter(QueryBuilders.rangeQuery(TransformCheckpoint.CHECKPOINT.getPreferredName()).lt(checkpointLowerBound)) + .filter( + QueryBuilders.rangeQuery(TransformField.TIMESTAMP_MILLIS.getPreferredName()) + .lt(lowerBoundEpochMs) + .format("epoch_millis") + ) + ); + logger.debug("Deleting old checkpoints using {}", deleteByQueryRequest.getSearchRequest()); + executeAsyncWithOrigin( + client, + TRANSFORM_ORIGIN, + DeleteByQueryAction.INSTANCE, + deleteByQueryRequest, + ActionListener.wrap(response -> { + if ((response.getBulkFailures().isEmpty() && response.getSearchFailures().isEmpty()) == false) { + Tuple statusAndReason = getStatusAndReason(response); + listener.onFailure( + new ElasticsearchStatusException(statusAndReason.v2().getMessage(), statusAndReason.v1(), statusAndReason.v2()) + ); + return; + } + listener.onResponse(response.getDeleted()); + }, listener::onFailure) + ); + } + private void putTransformConfiguration( TransformConfig transformConfig, DocWriteRequest.OpType optType, @@ -419,9 +461,7 @@ public void expandTransformIds( @Override public void deleteTransform(String transformId, ActionListener listener) { - DeleteByQueryRequest request = new DeleteByQueryRequest().setAbortOnVersionConflict(false); // since these documents are not - // updated, a conflict just means it was - // deleted previously + DeleteByQueryRequest request = createDeleteByQueryRequest(); 
request.indices(TransformInternalIndexConstants.INDEX_NAME_PATTERN, TransformInternalIndexConstants.INDEX_NAME_PATTERN_DEPRECATED); QueryBuilder query = QueryBuilders.termQuery(TransformField.ID.getPreferredName(), transformId); @@ -675,4 +715,22 @@ private static Tuple getStatusAndReason(final BulkByScrol } return new Tuple<>(status, reason); } + + /** + * Create DBQ request with good defaults + * + * @return new DeleteByQueryRequest with some defaults set + */ + private static DeleteByQueryRequest createDeleteByQueryRequest() { + + DeleteByQueryRequest deleteByQuery = new DeleteByQueryRequest(); + + deleteByQuery.setAbortOnVersionConflict(false) + .setSlices(AbstractBulkByScrollRequest.AUTO_SLICES) + .setIndicesOptions(IndicesOptions.lenientExpandOpen()); + + // disable scoring by using index order + deleteByQuery.getSearchRequest().source().sort(SINGLE_MAPPING_NAME); + return deleteByQuery; + } } diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/persistence/TransformConfigManager.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/persistence/TransformConfigManager.java index 4d512166b1d16..3bf306e080e86 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/persistence/TransformConfigManager.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/persistence/TransformConfigManager.java @@ -72,6 +72,16 @@ void updateTransformConfiguration( */ void deleteOldTransformStoredDocuments(String transformId, ActionListener listener); + /** + * This deletes stored checkpoint documents for the given transformId, checkpoint lower bound and timestamp lower bound. 
+ * + * @param transformId The transform ID referenced by the documents + * @param checkpointLowerBound lower bound of checkpoints to be deleted + * @param lowerBoundEpochMs timestamp until checkpoints shall be deleted + * @param listener listener to alert on completion, returning number of deleted checkpoints + */ + void deleteOldCheckpoints(String transformId, long checkpointLowerBound, long lowerBoundEpochMs, ActionListener listener); + /** * Get a stored checkpoint, requires the transform id as well as the checkpoint id * diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/persistence/TransformInternalIndex.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/persistence/TransformInternalIndex.java index 545a5680ac214..3087a31243403 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/persistence/TransformInternalIndex.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/persistence/TransformInternalIndex.java @@ -28,6 +28,7 @@ import org.elasticsearch.xpack.core.transform.TransformField; import org.elasticsearch.xpack.core.transform.transforms.DestConfig; import org.elasticsearch.xpack.core.transform.transforms.SourceConfig; +import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpoint; import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerStats; import org.elasticsearch.xpack.core.transform.transforms.TransformProgress; import org.elasticsearch.xpack.core.transform.transforms.TransformState; @@ -54,8 +55,9 @@ public final class TransformInternalIndex { * progress::docs_processed, progress::docs_indexed, * stats::exponential_avg_checkpoint_duration_ms, stats::exponential_avg_documents_indexed, * stats::exponential_avg_documents_processed - * + * version 3 (7.5): rename to .transform-internal-xxx * version 4 (7.6): state::should_stop_at_checkpoint + * checkpoint::checkpoint */ // constants for mappings @@ 
-304,6 +306,9 @@ private static XContentBuilder addTransformCheckpointMappings(XContentBuilder bu .endObject() .startObject(TransformField.TIME_UPPER_BOUND_MILLIS.getPreferredName()) .field(TYPE, DATE) + .endObject() + .startObject(TransformCheckpoint.CHECKPOINT.getPreferredName()) + .field(TYPE, LONG) .endObject(); // end::disable_formating } diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java index 01c2b8c4329b1..472677ce20f60 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java @@ -79,8 +79,14 @@ private enum RunState { public static final int MINIMUM_PAGE_SIZE = 10; public static final String COMPOSITE_AGGREGATION_NAME = "_transform"; + private static final Logger logger = LogManager.getLogger(TransformIndexer.class); + // constant for checkpoint retention, static for now + private static final long NUMBER_OF_CHECKPOINTS_TO_KEEP = 10; + private static final long RETENTION_OF_CHECKPOINTS_MS = 864000000L; // 10 days + private static final long CHECKPOINT_CLEANUP_INTERVAL = 100L; // every 100 checkpoints + protected final TransformConfigManager transformsConfigManager; private final CheckpointProvider checkpointProvider; private final TransformProgressGatherer progressGatherer; @@ -111,6 +117,8 @@ private enum RunState { private volatile Map> changedBuckets; private volatile Map changedBucketsAfterKey; + private long lastCheckpointCleanup = 0L; + public TransformIndexer( Executor executor, TransformConfigManager transformsConfigManager, @@ -375,7 +383,14 @@ protected void onFinish(ActionListener listener) { if (context.shouldStopAtCheckpoint()) { stop(); } - listener.onResponse(null); + + if (checkpoint - 
lastCheckpointCleanup > CHECKPOINT_CLEANUP_INTERVAL) { + // delete old checkpoints, this might even fail, but it will be retried in this case + cleanupOldCheckpoints(listener); + lastCheckpointCleanup = checkpoint; + } else { + listener.onResponse(null); + } } catch (Exception e) { listener.onFailure(e); } @@ -492,6 +507,32 @@ synchronized void handleFailure(Exception e) { } } + /** + * Cleanup old checkpoints + * + * @param listener listener to call after done + */ + private void cleanupOldCheckpoints(ActionListener listener) { + long now = getTime(); + long checkpointLowerBound = context.getCheckpoint() - NUMBER_OF_CHECKPOINTS_TO_KEEP; + long lowerBoundEpochMs = now - RETENTION_OF_CHECKPOINTS_MS; + + if (checkpointLowerBound > 0 && lowerBoundEpochMs > 0) { + transformsConfigManager.deleteOldCheckpoints( + transformConfig.getId(), + checkpointLowerBound, + lowerBoundEpochMs, + ActionListener.wrap(deletes -> { + logger.debug("[{}] deleted [{}] outdated checkpoints", getJobId(), deletes); + listener.onResponse(null); + }, listener::onFailure) + ); + } else { + logger.debug("[{}] checked for outdated checkpoints", getJobId()); + listener.onResponse(null); + } + } + private void sourceHasChanged(ActionListener hasChangedListener) { checkpointProvider.sourceHasChanged(getLastCheckpoint(), ActionListener.wrap(hasChanged -> { logger.trace("[{}] change detected [{}].", getJobId(), hasChanged); @@ -788,6 +829,13 @@ protected void failIndexer(String failureMessage) { context.markAsFailed(failureMessage); } + /* + * Get the current time, abstracted for the purpose of testing + */ + long getTime() { + return System.currentTimeMillis(); + } + /** * Indicates if an audit message should be written when onFinish is called for the given checkpoint * We audit the first checkpoint, and then every 10 checkpoints until completedCheckpoint == 99 diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/persistence/InMemoryTransformConfigManager.java 
b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/persistence/InMemoryTransformConfigManager.java index 3b084c01a8a46..7522a5e1b0e0b 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/persistence/InMemoryTransformConfigManager.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/persistence/InMemoryTransformConfigManager.java @@ -81,6 +81,16 @@ public void deleteOldTransformStoredDocuments(String transformId, ActionListener listener.onResponse(true); } + @Override + public void deleteOldCheckpoints(String transformId, long checkpointLowerBound, long lowerBoundEpochMs, ActionListener listener) { + List checkpointsById = checkpoints.get(transformId); + int sizeBeforeDelete = checkpointsById.size(); + if (checkpointsById != null) { + checkpointsById.removeIf(cp -> { return cp.getCheckpoint() < checkpointLowerBound && cp.getTimestamp() < lowerBoundEpochMs; }); + } + listener.onResponse(Long.valueOf(sizeBeforeDelete - checkpointsById.size())); + } + @Override public void getTransformCheckpoint(String transformId, long checkpoint, ActionListener resultListener) { List checkpointsById = checkpoints.get(transformId); diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/persistence/TransformConfigManagerTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/persistence/TransformConfigManagerTests.java index c3248a56afe92..818941bac5837 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/persistence/TransformConfigManagerTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/persistence/TransformConfigManagerTests.java @@ -425,4 +425,94 @@ public void testDeleteOldTransformStoredDocuments() throws Exception { is(true) ); } + + public void testDeleteOldCheckpoints() throws InterruptedException { + String transformId = randomAlphaOfLengthBetween(1, 10); + long 
timestamp = System.currentTimeMillis() - randomLongBetween(20000, 40000); + + // create some other docs to check they are not getting accidentally deleted + TransformStoredDoc storedDocs = TransformStoredDocTests.randomTransformStoredDoc(transformId); + SeqNoPrimaryTermAndIndex firstIndex = new SeqNoPrimaryTermAndIndex(0, 1, TransformInternalIndexConstants.LATEST_INDEX_NAME); + assertAsync(listener -> transformConfigManager.putOrUpdateTransformStoredDoc(storedDocs, null, listener), firstIndex, null, null); + + TransformConfig transformConfig = TransformConfigTests.randomTransformConfig(transformId); + assertAsync(listener -> transformConfigManager.putTransformConfiguration(transformConfig, listener), true, null, null); + + // create 100 checkpoints + for (int i = 1; i <= 100; i++) { + TransformCheckpoint checkpoint = new TransformCheckpoint( + transformId, + timestamp + i * 200, + i, + Collections.emptyMap(), + timestamp - 100 + i * 200 + ); + assertAsync(listener -> transformConfigManager.putTransformCheckpoint(checkpoint, listener), true, null, null); + } + + // read a random checkpoint + int randomCheckpoint = randomIntBetween(1, 100); + TransformCheckpoint checkpointExpected = new TransformCheckpoint( + transformId, + timestamp + randomCheckpoint * 200, + randomCheckpoint, + Collections.emptyMap(), + timestamp - 100 + randomCheckpoint * 200 + ); + + assertAsync( + listener -> transformConfigManager.getTransformCheckpoint(transformId, randomCheckpoint, listener), + checkpointExpected, + null, + null + ); + + // test delete based on checkpoint number (time would allow more) + assertAsync( + listener -> transformConfigManager.deleteOldCheckpoints(transformId, 11L, timestamp + 1 + 20L * 200, listener), + 10L, + null, + null + ); + + // test delete based on time (checkpoint number would allow more) + assertAsync( + listener -> transformConfigManager.deleteOldCheckpoints(transformId, 30L, timestamp + 1 + 20L * 200, listener), + 10L, + null, + null + ); + + // zero 
delete + assertAsync( + listener -> transformConfigManager.deleteOldCheckpoints(transformId, 30L, timestamp + 1 + 20L * 200, listener), + 0L, + null, + null + ); + + // delete the rest + assertAsync( + listener -> transformConfigManager.deleteOldCheckpoints(transformId, 101L, timestamp + 1 + 100L * 200, listener), + 80L, + null, + null + ); + + // test that the other docs are still there + assertAsync( + listener -> transformConfigManager.getTransformStoredDoc(transformId, listener), + Tuple.tuple(storedDocs, firstIndex), + null, + null + ); + + assertAsync( + listener -> transformConfigManager.getTransformConfiguration(transformConfig.getId(), listener), + transformConfig, + null, + null + ); + + } } From e3656d27e6495c7dab7c66f06dd934df234ab4d4 Mon Sep 17 00:00:00 2001 From: Hendrik Muhs Date: Fri, 22 Nov 2019 20:19:32 +0100 Subject: [PATCH 3/5] revert disable formating tags as they cause problems with checkstyle --- .../persistence/TransformInternalIndex.java | 39 ++++++++----------- 1 file changed, 17 insertions(+), 22 deletions(-) diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/persistence/TransformInternalIndex.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/persistence/TransformInternalIndex.java index 3087a31243403..2951ade8f47fb 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/persistence/TransformInternalIndex.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/persistence/TransformInternalIndex.java @@ -113,26 +113,27 @@ private static XContentBuilder auditMappings() throws IOException { builder.startObject(SINGLE_MAPPING_NAME); addMetaInformation(builder); builder.field(DYNAMIC, "false"); - builder.startObject(PROPERTIES) - .startObject(TRANSFORM_ID) - .field(TYPE, KEYWORD) - .endObject() - .startObject(AbstractAuditMessage.LEVEL.getPreferredName()) - .field(TYPE, KEYWORD) - .endObject() - 
.startObject(AbstractAuditMessage.MESSAGE.getPreferredName()) - .field(TYPE, TEXT) - .startObject(FIELDS) - .startObject(RAW) - .field(TYPE, KEYWORD) - .endObject() - .endObject() + builder + .startObject(PROPERTIES) + .startObject(TRANSFORM_ID) + .field(TYPE, KEYWORD) + .endObject() + .startObject(AbstractAuditMessage.LEVEL.getPreferredName()) + .field(TYPE, KEYWORD) + .endObject() + .startObject(AbstractAuditMessage.MESSAGE.getPreferredName()) + .field(TYPE, TEXT) + .startObject(FIELDS) + .startObject(RAW) + .field(TYPE, KEYWORD) + .endObject() + .endObject() .endObject() .startObject(AbstractAuditMessage.TIMESTAMP.getPreferredName()) - .field(TYPE, DATE) + .field(TYPE, DATE) .endObject() .startObject(AbstractAuditMessage.NODE_NAME.getPreferredName()) - .field(TYPE, KEYWORD) + .field(TYPE, KEYWORD) .endObject() .endObject() .endObject() @@ -174,7 +175,6 @@ public static XContentBuilder mappings(XContentBuilder builder) throws IOExcepti } private static XContentBuilder addTransformStoredDocMappings(XContentBuilder builder) throws IOException { - // tag::disable_formating return builder .startObject(TransformStoredDoc.STATE_FIELD.getPreferredName()) .startObject(PROPERTIES) @@ -260,11 +260,9 @@ private static XContentBuilder addTransformStoredDocMappings(XContentBuilder bui .endObject() .endObject() .endObject(); - // end::disable_formating } public static XContentBuilder addTransformsConfigMappings(XContentBuilder builder) throws IOException { - // tag::disable_formating return builder .startObject(TransformField.ID.getPreferredName()) .field(TYPE, KEYWORD) @@ -295,11 +293,9 @@ public static XContentBuilder addTransformsConfigMappings(XContentBuilder builde .startObject(TransformField.CREATE_TIME.getPreferredName()) .field(TYPE, DATE) .endObject(); - // end::disable_formating } private static XContentBuilder addTransformCheckpointMappings(XContentBuilder builder) throws IOException { - // tag::disable_formating return builder 
.startObject(TransformField.TIMESTAMP_MILLIS.getPreferredName()) .field(TYPE, DATE) @@ -310,7 +306,6 @@ private static XContentBuilder addTransformCheckpointMappings(XContentBuilder bu .startObject(TransformCheckpoint.CHECKPOINT.getPreferredName()) .field(TYPE, LONG) .endObject(); - // end::disable_formating } /** From 31bcbd6dd4f04e01ca2cfade4f4b96ce293c5e52 Mon Sep 17 00:00:00 2001 From: Hendrik Muhs Date: Wed, 27 Nov 2019 10:36:44 +0100 Subject: [PATCH 4/5] improve variable naming, only warn/audit if cleanup fails --- .../IndexBasedTransformConfigManager.java | 8 +++----- .../persistence/TransformConfigManager.java | 10 ++++++---- .../transform/transforms/TransformIndexer.java | 17 ++++++++++++++--- .../InMemoryTransformConfigManager.java | 4 ++-- 4 files changed, 25 insertions(+), 14 deletions(-) diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/persistence/IndexBasedTransformConfigManager.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/persistence/IndexBasedTransformConfigManager.java index b96c690b3e774..e3345aa6a2d83 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/persistence/IndexBasedTransformConfigManager.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/persistence/IndexBasedTransformConfigManager.java @@ -212,7 +212,7 @@ public void deleteOldTransformStoredDocuments(String transformId, ActionListener } @Override - public void deleteOldCheckpoints(String transformId, long checkpointLowerBound, long lowerBoundEpochMs, ActionListener listener) { + public void deleteOldCheckpoints(String transformId, long deleteCheckpointsBelow, long deleteOlderThan, ActionListener listener) { DeleteByQueryRequest deleteByQueryRequest = createDeleteByQueryRequest(); deleteByQueryRequest.indices( TransformInternalIndexConstants.INDEX_NAME_PATTERN, @@ -222,11 +222,9 @@ public void deleteOldCheckpoints(String transformId, long 
checkpointLowerBound, QueryBuilders.boolQuery() .filter(QueryBuilders.termQuery(TransformField.ID.getPreferredName(), transformId)) .filter(QueryBuilders.termQuery(TransformField.INDEX_DOC_TYPE.getPreferredName(), TransformCheckpoint.NAME)) - .filter(QueryBuilders.rangeQuery(TransformCheckpoint.CHECKPOINT.getPreferredName()).lt(checkpointLowerBound)) + .filter(QueryBuilders.rangeQuery(TransformCheckpoint.CHECKPOINT.getPreferredName()).lt(deleteCheckpointsBelow)) .filter( - QueryBuilders.rangeQuery(TransformField.TIMESTAMP_MILLIS.getPreferredName()) - .lt(lowerBoundEpochMs) - .format("epoch_millis") + QueryBuilders.rangeQuery(TransformField.TIMESTAMP_MILLIS.getPreferredName()).lt(deleteOlderThan).format("epoch_millis") ) ); logger.debug("Deleting old checkpoints using {}", deleteByQueryRequest.getSearchRequest()); diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/persistence/TransformConfigManager.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/persistence/TransformConfigManager.java index 3bf306e080e86..61c639964e963 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/persistence/TransformConfigManager.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/persistence/TransformConfigManager.java @@ -73,14 +73,16 @@ void updateTransformConfiguration( void deleteOldTransformStoredDocuments(String transformId, ActionListener listener); /** - * This deletes stored checkpoint documents for the given transformId, checkpoint lower bound and timestamp lower bound. + * This deletes stored checkpoint documents for the given transformId, based on number and age. + * + * Both criteria MUST apply for the deletion to happen. 
* * @param transformId The transform ID referenced by the documents - * @param checkpointLowerBound lower bound of checkpoints to be deleted - * @param lowerBoundEpochMs timestamp until checkpoints shall be deleted + * @param deleteCheckpointsBelow checkpoints lower than this to delete + * @param deleteOlderThan checkpoints older than this to delete * @param listener listener to alert on completion, returning number of deleted checkpoints */ - void deleteOldCheckpoints(String transformId, long checkpointLowerBound, long lowerBoundEpochMs, ActionListener listener); + void deleteOldCheckpoints(String transformId, long deleteCheckpointsBelow, long deleteOlderThan, ActionListener listener); /** * Get a stored checkpoint, requires the transform id as well as the checkpoint id diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java index 472677ce20f60..569140cbcbdef 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java @@ -385,9 +385,8 @@ protected void onFinish(ActionListener listener) { } if (checkpoint - lastCheckpointCleanup > CHECKPOINT_CLEANUP_INTERVAL) { - // delete old checkpoints, this might even fail, but it will be retried in this case + // delete old checkpoints, on a failure we keep going cleanupOldCheckpoints(listener); - lastCheckpointCleanup = checkpoint; } else { listener.onResponse(null); } @@ -525,7 +524,19 @@ private void cleanupOldCheckpoints(ActionListener listener) { ActionListener.wrap(deletes -> { logger.debug("[{}] deleted [{}] outdated checkpoints", getJobId(), deletes); listener.onResponse(null); - }, listener::onFailure) + lastCheckpointCleanup = context.getCheckpoint(); + }, e -> { + logger.warn( + new 
ParameterizedMessage("[{}] failed to cleanup old checkpoints, retrying after next checkpoint", getJobId()), + e + ); + auditor.warning( + getJobId(), + "Failed to cleanup old checkpoints, retrying after next checkpoint. Exception: " + e.getMessage() + ); + + listener.onResponse(null); + }) ); } else { logger.debug("[{}] checked for outdated checkpoints", getJobId()); diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/persistence/InMemoryTransformConfigManager.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/persistence/InMemoryTransformConfigManager.java index 7522a5e1b0e0b..c52e8dea8fde2 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/persistence/InMemoryTransformConfigManager.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/persistence/InMemoryTransformConfigManager.java @@ -82,11 +82,14 @@ public void deleteOldTransformStoredDocuments(String transformId, ActionListener } @Override - public void deleteOldCheckpoints(String transformId, long checkpointLowerBound, long lowerBoundEpochMs, ActionListener listener) { + public void deleteOldCheckpoints(String transformId, long deleteCheckpointsBelow, long deleteOlderThan, ActionListener listener) { List checkpointsById = checkpoints.get(transformId); - int sizeBeforeDelete = checkpointsById.size(); - if (checkpointsById != null) { - checkpointsById.removeIf(cp -> { return cp.getCheckpoint() < checkpointLowerBound && cp.getTimestamp() < lowerBoundEpochMs; }); - } + if (checkpointsById == null) { + // no checkpoints are stored for this transform id: report zero deletions instead of dereferencing null + listener.onResponse(0L); + return; + } + int sizeBeforeDelete = checkpointsById.size(); + checkpointsById.removeIf(cp -> cp.getCheckpoint() < deleteCheckpointsBelow && cp.getTimestamp() < deleteOlderThan); listener.onResponse(Long.valueOf(sizeBeforeDelete - checkpointsById.size())); } From c08cda448b873c200a3b6e79b0ee8987effbad9b Mon Sep 17 00:00:00 2001 From: Hendrik Muhs Date: Mon, 2 Dec 2019 15:36:28 +0100 Subject: [PATCH 5/5] make lastCheckpointCleanup volatile --- 
.../xpack/transform/transforms/TransformIndexer.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java index 569140cbcbdef..0848fc21217e5 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java @@ -117,7 +117,7 @@ private enum RunState { private volatile Map> changedBuckets; private volatile Map changedBucketsAfterKey; - private long lastCheckpointCleanup = 0L; + private volatile long lastCheckpointCleanup = 0L; public TransformIndexer( Executor executor,