diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformCheckpoint.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformCheckpoint.java index 3e2dd844c362f..8431abc886d85 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformCheckpoint.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformCheckpoint.java @@ -50,7 +50,7 @@ public class TransformCheckpoint implements Writeable, ToXContentObject { // checkpoint of the indexes (sequence id's) public static final ParseField INDICES = new ParseField("indices"); - private static final String NAME = "data_frame_transform_checkpoint"; + public static final String NAME = "data_frame_transform_checkpoint"; private static final ConstructingObjectParser STRICT_PARSER = createParser(false); private static final ConstructingObjectParser LENIENT_PARSER = createParser(true); diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/persistence/IndexBasedTransformConfigManager.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/persistence/IndexBasedTransformConfigManager.java index 5b747e9de01e1..e3345aa6a2d83 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/persistence/IndexBasedTransformConfigManager.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/persistence/IndexBasedTransformConfigManager.java @@ -39,6 +39,7 @@ import org.elasticsearch.index.query.BoolQueryBuilder; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.index.reindex.AbstractBulkByScrollRequest; import org.elasticsearch.index.reindex.BulkByScrollResponse; import org.elasticsearch.index.reindex.DeleteByQueryAction; import org.elasticsearch.index.reindex.DeleteByQueryRequest; @@ -63,6 +64,7 @@ import java.util.List; import java.util.Set; +import static org.elasticsearch.index.mapper.MapperService.SINGLE_MAPPING_NAME; import static org.elasticsearch.xpack.core.ClientHelper.TRANSFORM_ORIGIN; import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin; @@ -146,16 +148,18 @@ public void updateTransformConfiguration( @Override public void deleteOldTransformConfigurations(String transformId, ActionListener listener) { - DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest( + DeleteByQueryRequest deleteByQueryRequest = createDeleteByQueryRequest(); + deleteByQueryRequest.indices( TransformInternalIndexConstants.INDEX_NAME_PATTERN, TransformInternalIndexConstants.INDEX_NAME_PATTERN_DEPRECATED - ).setQuery( + ); + deleteByQueryRequest.setQuery( QueryBuilders.constantScoreQuery( QueryBuilders.boolQuery() .mustNot(QueryBuilders.termQuery("_index", TransformInternalIndexConstants.LATEST_INDEX_NAME)) .filter(QueryBuilders.termQuery("_id", TransformConfig.documentId(transformId))) ) - ).setIndicesOptions(IndicesOptions.lenientExpandOpen()); + ); executeAsyncWithOrigin( client, @@ -177,17 +181,18 @@ public void deleteOldTransformConfigurations(String transformId, ActionListener< @Override public void deleteOldTransformStoredDocuments(String transformId, ActionListener listener) { - DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest( + DeleteByQueryRequest deleteByQueryRequest = createDeleteByQueryRequest(); + deleteByQueryRequest.indices( TransformInternalIndexConstants.INDEX_NAME_PATTERN, TransformInternalIndexConstants.INDEX_NAME_PATTERN_DEPRECATED - ).setQuery( + ); + deleteByQueryRequest.setQuery( QueryBuilders.constantScoreQuery( QueryBuilders.boolQuery() .mustNot(QueryBuilders.termQuery("_index", TransformInternalIndexConstants.LATEST_INDEX_NAME)) .filter(QueryBuilders.termQuery("_id", TransformStoredDoc.documentId(transformId))) ) - ).setIndicesOptions(IndicesOptions.lenientExpandOpen()); - + ); executeAsyncWithOrigin( client, TRANSFORM_ORIGIN, @@ -206,6 +211,41 @@ public void deleteOldTransformStoredDocuments(String transformId, ActionListener ); } + @Override + public void deleteOldCheckpoints(String transformId, long deleteCheckpointsBelow, long deleteOlderThan, ActionListener listener) { + DeleteByQueryRequest deleteByQueryRequest = createDeleteByQueryRequest(); + deleteByQueryRequest.indices( + TransformInternalIndexConstants.INDEX_NAME_PATTERN, + TransformInternalIndexConstants.INDEX_NAME_PATTERN_DEPRECATED + ); + deleteByQueryRequest.setQuery( + QueryBuilders.boolQuery() + .filter(QueryBuilders.termQuery(TransformField.ID.getPreferredName(), transformId)) + .filter(QueryBuilders.termQuery(TransformField.INDEX_DOC_TYPE.getPreferredName(), TransformCheckpoint.NAME)) + .filter(QueryBuilders.rangeQuery(TransformCheckpoint.CHECKPOINT.getPreferredName()).lt(deleteCheckpointsBelow)) + .filter( + QueryBuilders.rangeQuery(TransformField.TIMESTAMP_MILLIS.getPreferredName()).lt(deleteOlderThan).format("epoch_millis") + ) + ); + logger.debug("Deleting old checkpoints using {}", deleteByQueryRequest.getSearchRequest()); + executeAsyncWithOrigin( + client, + TRANSFORM_ORIGIN, + DeleteByQueryAction.INSTANCE, + deleteByQueryRequest, + ActionListener.wrap(response -> { + if ((response.getBulkFailures().isEmpty() && response.getSearchFailures().isEmpty()) == false) { + Tuple statusAndReason = getStatusAndReason(response); + listener.onFailure( + new ElasticsearchStatusException(statusAndReason.v2().getMessage(), statusAndReason.v1(), statusAndReason.v2()) + ); + return; + } + listener.onResponse(response.getDeleted()); + }, listener::onFailure) + ); + } + private void putTransformConfiguration( TransformConfig transformConfig, DocWriteRequest.OpType optType, @@ -419,9 +459,7 @@ public void expandTransformIds( @Override public void deleteTransform(String transformId, ActionListener listener) { - DeleteByQueryRequest request = new DeleteByQueryRequest().setAbortOnVersionConflict(false); // since these documents are not - // updated, a conflict just means it was - // deleted previously + DeleteByQueryRequest request = createDeleteByQueryRequest(); request.indices(TransformInternalIndexConstants.INDEX_NAME_PATTERN, TransformInternalIndexConstants.INDEX_NAME_PATTERN_DEPRECATED); QueryBuilder query = QueryBuilders.termQuery(TransformField.ID.getPreferredName(), transformId); @@ -675,4 +713,22 @@ private static Tuple getStatusAndReason(final BulkByScrol } return new Tuple<>(status, reason); } + + /** + * Create DBQ request with good defaults + * + * @return new DeleteByQueryRequest with some defaults set + */ + private static DeleteByQueryRequest createDeleteByQueryRequest() { + + DeleteByQueryRequest deleteByQuery = new DeleteByQueryRequest(); + + deleteByQuery.setAbortOnVersionConflict(false) + .setSlices(AbstractBulkByScrollRequest.AUTO_SLICES) + .setIndicesOptions(IndicesOptions.lenientExpandOpen()); + + // disable scoring by using index order + deleteByQuery.getSearchRequest().source().sort(SINGLE_MAPPING_NAME); + return deleteByQuery; + } } diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/persistence/TransformConfigManager.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/persistence/TransformConfigManager.java index 4d512166b1d16..61c639964e963 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/persistence/TransformConfigManager.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/persistence/TransformConfigManager.java @@ -72,6 +72,18 @@ void updateTransformConfiguration( */ void deleteOldTransformStoredDocuments(String transformId, ActionListener listener); + /** + * This deletes stored checkpoint documents for the given transformId, based on number and age. + * + * Both criteria MUST apply for the deletion to happen. + * + * @param transformId The transform ID referenced by the documents + * @param deleteCheckpointsBelow checkpoints lower than this to delete + * @param deleteOlderThan checkpoints older than this to delete + * @param listener listener to alert on completion, returning number of deleted checkpoints + */ + void deleteOldCheckpoints(String transformId, long deleteCheckpointsBelow, long deleteOlderThan, ActionListener listener); + /** * Get a stored checkpoint, requires the transform id as well as the checkpoint id * diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/persistence/TransformInternalIndex.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/persistence/TransformInternalIndex.java index d0b94bf6e3e52..2951ade8f47fb 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/persistence/TransformInternalIndex.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/persistence/TransformInternalIndex.java @@ -28,6 +28,7 @@ import org.elasticsearch.xpack.core.transform.TransformField; import org.elasticsearch.xpack.core.transform.transforms.DestConfig; import org.elasticsearch.xpack.core.transform.transforms.SourceConfig; +import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpoint; import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerStats; import org.elasticsearch.xpack.core.transform.transforms.TransformProgress; import org.elasticsearch.xpack.core.transform.transforms.TransformState; @@ -54,8 +55,9 @@ public final class TransformInternalIndex { * progress::docs_processed, progress::docs_indexed, * stats::exponential_avg_checkpoint_duration_ms, stats::exponential_avg_documents_indexed, * stats::exponential_avg_documents_processed - * + * version 3 (7.5): rename to .transform-internal-xxx * version 4 (7.6): state::should_stop_at_checkpoint + * checkpoint::checkpoint */ // constants for mappings @@ -77,14 +79,16 @@ public final class TransformInternalIndex { public static IndexTemplateMetaData getIndexTemplateMetaData() throws IOException { IndexTemplateMetaData transformTemplate = IndexTemplateMetaData.builder(TransformInternalIndexConstants.LATEST_INDEX_VERSIONED_NAME) - .patterns(Collections.singletonList(TransformInternalIndexConstants.LATEST_INDEX_VERSIONED_NAME)) - .version(Version.CURRENT.id) - .settings(Settings.builder() - // the configurations are expected to be small - .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) - .put(IndexMetaData.SETTING_AUTO_EXPAND_REPLICAS, "0-1")) - .putMapping(MapperService.SINGLE_MAPPING_NAME, Strings.toString(mappings())) - .build(); + .patterns(Collections.singletonList(TransformInternalIndexConstants.LATEST_INDEX_VERSIONED_NAME)) + .version(Version.CURRENT.id) + .settings( + Settings.builder() + // the configurations are expected to be small + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetaData.SETTING_AUTO_EXPAND_REPLICAS, "0-1") + ) + .putMapping(MapperService.SINGLE_MAPPING_NAME, Strings.toString(mappings())) + .build(); return transformTemplate; } @@ -92,10 +96,12 @@ public static IndexTemplateMetaData getAuditIndexTemplateMetaData() throws IOExc IndexTemplateMetaData transformTemplate = IndexTemplateMetaData.builder(TransformInternalIndexConstants.AUDIT_INDEX) .patterns(Collections.singletonList(TransformInternalIndexConstants.AUDIT_INDEX_PREFIX + "*")) .version(Version.CURRENT.id) - .settings(Settings.builder() - // the audits are expected to be small - .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) - .put(IndexMetaData.SETTING_AUTO_EXPAND_REPLICAS, "0-1")) + .settings( + Settings.builder() + // the audits are expected to be small + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetaData.SETTING_AUTO_EXPAND_REPLICAS, "0-1") + ) .putMapping(MapperService.SINGLE_MAPPING_NAME, Strings.toString(auditMappings())) .putAlias(AliasMetaData.builder(TransformInternalIndexConstants.AUDIT_INDEX_READ_ALIAS)) .build(); @@ -107,26 +113,27 @@ private static XContentBuilder auditMappings() throws IOException { builder.startObject(SINGLE_MAPPING_NAME); addMetaInformation(builder); builder.field(DYNAMIC, "false"); - builder.startObject(PROPERTIES) - .startObject(TRANSFORM_ID) - .field(TYPE, KEYWORD) - .endObject() - .startObject(AbstractAuditMessage.LEVEL.getPreferredName()) - .field(TYPE, KEYWORD) - .endObject() - .startObject(AbstractAuditMessage.MESSAGE.getPreferredName()) - .field(TYPE, TEXT) - .startObject(FIELDS) - .startObject(RAW) - .field(TYPE, KEYWORD) - .endObject() - .endObject() + builder + .startObject(PROPERTIES) + .startObject(TRANSFORM_ID) + .field(TYPE, KEYWORD) + .endObject() + .startObject(AbstractAuditMessage.LEVEL.getPreferredName()) + .field(TYPE, KEYWORD) + .endObject() + .startObject(AbstractAuditMessage.MESSAGE.getPreferredName()) + .field(TYPE, TEXT) + .startObject(FIELDS) + .startObject(RAW) + .field(TYPE, KEYWORD) + .endObject() + .endObject() .endObject() .startObject(AbstractAuditMessage.TIMESTAMP.getPreferredName()) - .field(TYPE, DATE) + .field(TYPE, DATE) .endObject() .startObject(AbstractAuditMessage.NODE_NAME.getPreferredName()) - .field(TYPE, KEYWORD) + .field(TYPE, KEYWORD) .endObject() .endObject() .endObject() @@ -167,7 +174,6 @@ public static XContentBuilder mappings(XContentBuilder builder) throws IOExcepti return builder; } - private static XContentBuilder addTransformStoredDocMappings(XContentBuilder builder) throws IOException { return builder .startObject(TransformStoredDoc.STATE_FIELD.getPreferredName()) @@ -254,9 +260,6 @@ private static XContentBuilder addTransformStoredDocMappings(XContentBuilder bui .endObject() .endObject() .endObject(); - // This is obsolete and can be removed for future versions of the index, but is left here as a warning/reminder that - // we cannot declare this field differently in version 1 of the internal index as it would cause a mapping clash - // .startObject("checkpointing").field(ENABLED, false).endObject(); } public static XContentBuilder addTransformsConfigMappings(XContentBuilder builder) throws IOException { @@ -299,6 +302,9 @@ private static XContentBuilder addTransformCheckpointMappings(XContentBuilder bu .endObject() .startObject(TransformField.TIME_UPPER_BOUND_MILLIS.getPreferredName()) .field(TYPE, DATE) + .endObject() + .startObject(TransformCheckpoint.CHECKPOINT.getPreferredName()) + .field(TYPE, LONG) .endObject(); } @@ -310,9 +316,7 @@ private static XContentBuilder addTransformCheckpointMappings(XContentBuilder bu * @throws IOException On write error */ private static XContentBuilder addMetaInformation(XContentBuilder builder) throws IOException { - return builder.startObject("_meta") - .field("version", Version.CURRENT) - .endObject(); + return builder.startObject("_meta").field("version", Version.CURRENT).endObject(); } /** diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java index 01c2b8c4329b1..0848fc21217e5 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java @@ -79,8 +79,14 @@ private enum RunState { public static final int MINIMUM_PAGE_SIZE = 10; public static final String COMPOSITE_AGGREGATION_NAME = "_transform"; + private static final Logger logger = LogManager.getLogger(TransformIndexer.class); + // constant for checkpoint retention, static for now + private static final long NUMBER_OF_CHECKPOINTS_TO_KEEP = 10; + private static final long RETENTION_OF_CHECKPOINTS_MS = 864000000L; // 10 days + private static final long CHECKPOINT_CLEANUP_INTERVAL = 100L; // every 100 checkpoints + protected final TransformConfigManager transformsConfigManager; private final CheckpointProvider checkpointProvider; private final TransformProgressGatherer progressGatherer; @@ -111,6 +117,8 @@ private enum RunState { private volatile Map> changedBuckets; private volatile Map changedBucketsAfterKey; + private volatile long lastCheckpointCleanup = 0L; + public TransformIndexer( Executor executor, TransformConfigManager transformsConfigManager, @@ -375,7 +383,13 @@ protected void onFinish(ActionListener listener) { if (context.shouldStopAtCheckpoint()) { stop(); } - listener.onResponse(null); + + if (checkpoint - lastCheckpointCleanup > CHECKPOINT_CLEANUP_INTERVAL) { + // delete old checkpoints, on a failure we keep going + cleanupOldCheckpoints(listener); + } else { + listener.onResponse(null); + } } catch (Exception e) { listener.onFailure(e); } @@ -492,6 +506,44 @@ synchronized void handleFailure(Exception e) { } } + /** + * Cleanup old checkpoints + * + * @param listener listener to call after done + */ + private void cleanupOldCheckpoints(ActionListener listener) { + long now = getTime(); + long checkpointLowerBound = context.getCheckpoint() - NUMBER_OF_CHECKPOINTS_TO_KEEP; + long lowerBoundEpochMs = now - RETENTION_OF_CHECKPOINTS_MS; + + if (checkpointLowerBound > 0 && lowerBoundEpochMs > 0) { + transformsConfigManager.deleteOldCheckpoints( + transformConfig.getId(), + checkpointLowerBound, + lowerBoundEpochMs, + ActionListener.wrap(deletes -> { + logger.debug("[{}] deleted [{}] outdated checkpoints", getJobId(), deletes); + listener.onResponse(null); + lastCheckpointCleanup = context.getCheckpoint(); + }, e -> { + logger.warn( + new ParameterizedMessage("[{}] failed to cleanup old checkpoints, retrying after next checkpoint", getJobId()), + e + ); + auditor.warning( + getJobId(), + "Failed to cleanup old checkpoints, retrying after next checkpoint. Exception: " + e.getMessage() + ); + + listener.onResponse(null); + }) + ); + } else { + logger.debug("[{}] checked for outdated checkpoints", getJobId()); + listener.onResponse(null); + } + } + private void sourceHasChanged(ActionListener hasChangedListener) { checkpointProvider.sourceHasChanged(getLastCheckpoint(), ActionListener.wrap(hasChanged -> { logger.trace("[{}] change detected [{}].", getJobId(), hasChanged); @@ -788,6 +840,13 @@ protected void failIndexer(String failureMessage) { context.markAsFailed(failureMessage); } + /* + * Get the current time, abstracted for the purpose of testing + */ + long getTime() { + return System.currentTimeMillis(); + } + /** * Indicates if an audit message should be written when onFinish is called for the given checkpoint * We audit the first checkpoint, and then every 10 checkpoints until completedCheckpoint == 99 diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/persistence/InMemoryTransformConfigManager.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/persistence/InMemoryTransformConfigManager.java index 3b084c01a8a46..c52e8dea8fde2 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/persistence/InMemoryTransformConfigManager.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/persistence/InMemoryTransformConfigManager.java @@ -81,6 +81,16 @@ public void deleteOldTransformStoredDocuments(String transformId, ActionListener listener.onResponse(true); } + @Override + public void deleteOldCheckpoints(String transformId, long deleteCheckpointsBelow, long deleteOlderThan, ActionListener listener) { + List checkpointsById = checkpoints.get(transformId); + int sizeBeforeDelete = checkpointsById.size(); + if (checkpointsById != null) { + checkpointsById.removeIf(cp -> { return cp.getCheckpoint() < deleteCheckpointsBelow && cp.getTimestamp() < deleteOlderThan; }); + } + listener.onResponse(Long.valueOf(sizeBeforeDelete - checkpointsById.size())); + } + @Override public void getTransformCheckpoint(String transformId, long checkpoint, ActionListener resultListener) { List checkpointsById = checkpoints.get(transformId); diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/persistence/TransformConfigManagerTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/persistence/TransformConfigManagerTests.java index c3248a56afe92..818941bac5837 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/persistence/TransformConfigManagerTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/persistence/TransformConfigManagerTests.java @@ -425,4 +425,94 @@ public void testDeleteOldTransformStoredDocuments() throws Exception { is(true) ); } + + public void testDeleteOldCheckpoints() throws InterruptedException { + String transformId = randomAlphaOfLengthBetween(1, 10); + long timestamp = System.currentTimeMillis() - randomLongBetween(20000, 40000); + + // create some other docs to check they are not getting accidentally deleted + TransformStoredDoc storedDocs = TransformStoredDocTests.randomTransformStoredDoc(transformId); + SeqNoPrimaryTermAndIndex firstIndex = new SeqNoPrimaryTermAndIndex(0, 1, TransformInternalIndexConstants.LATEST_INDEX_NAME); + assertAsync(listener -> transformConfigManager.putOrUpdateTransformStoredDoc(storedDocs, null, listener), firstIndex, null, null); + + TransformConfig transformConfig = TransformConfigTests.randomTransformConfig(transformId); + assertAsync(listener -> transformConfigManager.putTransformConfiguration(transformConfig, listener), true, null, null); + + // create 100 checkpoints + for (int i = 1; i <= 100; i++) { + TransformCheckpoint checkpoint = new TransformCheckpoint( + transformId, + timestamp + i * 200, + i, + Collections.emptyMap(), + timestamp - 100 + i * 200 + ); + assertAsync(listener -> transformConfigManager.putTransformCheckpoint(checkpoint, listener), true, null, null); + } + + // read a random checkpoint + int randomCheckpoint = randomIntBetween(1, 100); + TransformCheckpoint checkpointExpected = new TransformCheckpoint( + transformId, + timestamp + randomCheckpoint * 200, + randomCheckpoint, + Collections.emptyMap(), + timestamp - 100 + randomCheckpoint * 200 + ); + + assertAsync( + listener -> transformConfigManager.getTransformCheckpoint(transformId, randomCheckpoint, listener), + checkpointExpected, + null, + null + ); + + // test delete based on checkpoint number (time would allow more) + assertAsync( + listener -> transformConfigManager.deleteOldCheckpoints(transformId, 11L, timestamp + 1 + 20L * 200, listener), + 10L, + null, + null + ); + + // test delete based on time (checkpoint number would allow more) + assertAsync( + listener -> transformConfigManager.deleteOldCheckpoints(transformId, 30L, timestamp + 1 + 20L * 200, listener), + 10L, + null, + null + ); + + // zero delete + assertAsync( + listener -> transformConfigManager.deleteOldCheckpoints(transformId, 30L, timestamp + 1 + 20L * 200, listener), + 0L, + null, + null + ); + + // delete the rest + assertAsync( + listener -> transformConfigManager.deleteOldCheckpoints(transformId, 101L, timestamp + 1 + 100L * 200, listener), + 80L, + null, + null + ); + + // test that the other docs are still there + assertAsync( + listener -> transformConfigManager.getTransformStoredDoc(transformId, listener), + Tuple.tuple(storedDocs, firstIndex), + null, + null + ); + + assertAsync( + listener -> transformConfigManager.getTransformConfiguration(transformConfig.getId(), listener), + transformConfig, + null, + null + ); + + } }