diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/messages/Messages.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/messages/Messages.java index 4d18a8f1006a9..ad885062665eb 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/messages/Messages.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/messages/Messages.java @@ -144,6 +144,7 @@ public final class Messages { public static final String JOB_AUDIT_SNAPSHOT_STORED = "Job model snapshot with id [{0}] stored"; public static final String JOB_AUDIT_REVERTED = "Job model snapshot reverted to ''{0}''"; public static final String JOB_AUDIT_SNAPSHOT_DELETED = "Model snapshot [{0}] with description ''{1}'' deleted"; + public static final String JOB_AUDIT_SNAPSHOTS_DELETED = "[{0}] expired model snapshots deleted"; public static final String JOB_AUDIT_FILTER_UPDATED_ON_PROCESS = "Updated filter [{0}] in running process"; public static final String JOB_AUDIT_CALENDARS_UPDATED_ON_PROCESS = "Updated calendars in running process"; public static final String JOB_AUDIT_MEMORY_STATUS_SOFT_LIMIT = "Job memory status changed to soft_limit; memory pruning will now be " + diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/process/autodetect/state/ModelSnapshot.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/process/autodetect/state/ModelSnapshot.java index 4e4d58105c494..80b4eab384e41 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/process/autodetect/state/ModelSnapshot.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/process/autodetect/state/ModelSnapshot.java @@ -238,6 +238,10 @@ public Date getLatestResultTimeStamp() { return latestResultTimeStamp; } + public boolean isRetain() { + return retain; + } + @Override public int hashCode() { return Objects.hash(jobId, minVersion, timestamp, 
description, snapshotId, quantiles, snapshotDocCount, modelSizeStats, diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/DeleteExpiredDataIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/DeleteExpiredDataIT.java index 84dc8f84ead49..06ee0e48b6fd3 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/DeleteExpiredDataIT.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/DeleteExpiredDataIT.java @@ -137,7 +137,6 @@ public void testDeleteExpiredDataActionDeletesEmptyStateIndices() throws Excepti is(arrayContaining(".ml-state-000001", ".ml-state-000005", ".ml-state-000007"))); } - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/62699") public void testDeleteExpiredDataWithStandardThrottle() throws Exception { testExpiredDeletion(-1.0f, 100); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteExpiredDataAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteExpiredDataAction.java index d8c97d165da0d..7f04f31810d5d 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteExpiredDataAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteExpiredDataAction.java @@ -26,6 +26,7 @@ import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.ml.MachineLearning; import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider; +import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider; import org.elasticsearch.xpack.ml.job.persistence.SearchAfterJobsIterator; import org.elasticsearch.xpack.ml.job.retention.EmptyStateIndexRemover; import org.elasticsearch.xpack.ml.job.retention.ExpiredForecastsRemover; 
@@ -60,18 +61,19 @@ public class TransportDeleteExpiredDataAction extends HandledTransportAction createDataRemovers(OriginSettingClient client, new WrappedBatchedJobsIterator(new SearchAfterJobsIterator(client)), parentTaskId, auditor, threadPool), new ExpiredForecastsRemover(client, threadPool, parentTaskId), new ExpiredModelSnapshotsRemover(client, - new WrappedBatchedJobsIterator(new SearchAfterJobsIterator(client)), threadPool, parentTaskId), + new WrappedBatchedJobsIterator(new SearchAfterJobsIterator(client)), threadPool, parentTaskId, jobResultsProvider, auditor), new UnusedStateRemover(client, clusterService, parentTaskId), new EmptyStateIndexRemover(client, parentTaskId), new UnusedStatsRemover(client, parentTaskId)); @@ -185,7 +188,11 @@ private List createDataRemovers(List jobs, TaskId parentTask return Arrays.asList( new ExpiredResultsRemover(client, new VolatileCursorIterator<>(jobs), parentTaskId, auditor, threadPool), new ExpiredForecastsRemover(client, threadPool, parentTaskId), - new ExpiredModelSnapshotsRemover(client, new VolatileCursorIterator<>(jobs), threadPool, parentTaskId), + new ExpiredModelSnapshotsRemover(client, + new VolatileCursorIterator<>(jobs), + threadPool, parentTaskId, + jobResultsProvider, + auditor), new UnusedStateRemover(client, clusterService, parentTaskId), new EmptyStateIndexRemover(client, parentTaskId), new UnusedStatsRemover(client, parentTaskId)); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemover.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemover.java index ac9566bb2f731..7b416dd1b2dee 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemover.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemover.java @@ -7,13 +7,11 @@ import org.apache.logging.log4j.LogManager; import 
org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.search.SearchAction; import org.elasticsearch.action.search.SearchRequest; -import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.support.ThreadedActionListener; -import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.client.OriginSettingClient; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.query.QueryBuilder; @@ -25,21 +23,24 @@ import org.elasticsearch.search.sort.SortOrder; import org.elasticsearch.tasks.TaskId; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.xpack.core.action.util.QueryPage; import org.elasticsearch.xpack.core.common.time.TimeUtils; -import org.elasticsearch.xpack.core.ml.action.DeleteModelSnapshotAction; import org.elasticsearch.xpack.core.ml.job.config.Job; +import org.elasticsearch.xpack.core.ml.job.messages.Messages; import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot; -import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshotField; import org.elasticsearch.xpack.ml.MachineLearning; +import org.elasticsearch.xpack.ml.job.persistence.JobDataDeleter; +import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider; +import org.elasticsearch.xpack.ml.notifications.AnomalyDetectionAuditor; import org.elasticsearch.xpack.ml.utils.MlIndicesUtils; -import org.elasticsearch.xpack.ml.utils.VolatileCursorIterator; import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.Objects; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; /** * Deletes all model snapshots that have expired the configured retention 
time @@ -65,11 +66,16 @@ public class ExpiredModelSnapshotsRemover extends AbstractExpiredJobDataRemover private static final int MODEL_SNAPSHOT_SEARCH_SIZE = 10000; private final ThreadPool threadPool; + private final JobResultsProvider jobResultsProvider; + private final AnomalyDetectionAuditor auditor; public ExpiredModelSnapshotsRemover(OriginSettingClient client, Iterator jobIterator, - ThreadPool threadPool, TaskId parentTaskId) { + ThreadPool threadPool, TaskId parentTaskId, JobResultsProvider jobResultsProvider, + AnomalyDetectionAuditor auditor) { super(client, jobIterator, parentTaskId); this.threadPool = Objects.requireNonNull(threadPool); + this.jobResultsProvider = jobResultsProvider; + this.auditor = auditor; } @Override @@ -157,66 +163,54 @@ protected void removeDataBefore( listener.onResponse(true); return; } - LOGGER.debug("Considering model snapshots of job [{}] that have a timestamp before [{}] for removal", job.getId(), cutoffEpochMs); - - SearchRequest searchRequest = new SearchRequest(); - searchRequest.indices(AnomalyDetectorsIndex.jobResultsAliasedName(job.getId())); - - QueryBuilder activeSnapshotFilter = QueryBuilders.termQuery( - ModelSnapshotField.SNAPSHOT_ID.getPreferredName(), job.getModelSnapshotId()); - QueryBuilder retainFilter = QueryBuilders.termQuery(ModelSnapshot.RETAIN.getPreferredName(), true); - QueryBuilder query = createQuery(job.getId(), cutoffEpochMs) - .filter(QueryBuilders.existsQuery(ModelSnapshot.SNAPSHOT_DOC_COUNT.getPreferredName())) - .mustNot(activeSnapshotFilter) - .mustNot(retainFilter); - - SearchSourceBuilder source = new SearchSourceBuilder(); - source.query(query); - source.size(MODEL_SNAPSHOT_SEARCH_SIZE); - source.sort(ModelSnapshot.TIMESTAMP.getPreferredName()); - source.fetchSource(false); - source.docValueField(Job.ID.getPreferredName(), null); - source.docValueField(ModelSnapshotField.SNAPSHOT_ID.getPreferredName(), null); - source.docValueField(ModelSnapshot.TIMESTAMP.getPreferredName(), 
"epoch_millis"); - searchRequest.source(source); - searchRequest.setParentTask(getParentTaskId()); + LOGGER.debug(() -> new ParameterizedMessage( + "Considering model snapshots of job [{}] that have a timestamp before [{}] for removal", + job.getId(), + cutoffEpochMs)); long deleteAllBeforeMs = (job.getModelSnapshotRetentionDays() == null) ? 0 : latestTimeMs - TimeValue.timeValueDays(job.getModelSnapshotRetentionDays()).getMillis(); - client.execute(SearchAction.INSTANCE, searchRequest, new ThreadedActionListener<>(LOGGER, threadPool, - MachineLearning.UTILITY_THREAD_POOL_NAME, expiredSnapshotsListener(job.getId(), deleteAllBeforeMs, listener), false)); + ActionListener> snapshotsListener = expiredSnapshotsListener(job, deleteAllBeforeMs, listener); + jobResultsProvider.modelSnapshots(job.getId(), + 0, + MODEL_SNAPSHOT_SEARCH_SIZE, + null, + String.valueOf(cutoffEpochMs), + ModelSnapshot.TIMESTAMP.getPreferredName(), + false, + null, + snapshotsListener::onResponse, + snapshotsListener::onFailure); } - private ActionListener expiredSnapshotsListener(String jobId, long deleteAllBeforeMs, - ActionListener listener) { + private ActionListener> expiredSnapshotsListener(Job job, + long deleteAllBeforeMs, + ActionListener listener) { return new ActionListener<>() { @Override - public void onResponse(SearchResponse searchResponse) { + public void onResponse(QueryPage searchResponse) { long nextToKeepMs = deleteAllBeforeMs; try { - List snapshotIds = new ArrayList<>(); - for (SearchHit hit : searchResponse.getHits()) { - String timestamp = stringFieldValueOrNull(hit, ModelSnapshot.TIMESTAMP.getPreferredName()); - if (timestamp == null) { - LOGGER.warn("Model snapshot document [{}] has a null timestamp field", hit.getId()); + List snapshots = new ArrayList<>(); + for (ModelSnapshot snapshot: searchResponse.results()) { + // We don't want to delete the currently used snapshot or a snapshot marked to be retained + if (snapshot.getSnapshotId().equals(job.getModelSnapshotId()) 
|| snapshot.isRetain()) { continue; } - long timestampMs = TimeUtils.parseToEpochMs(timestamp); + if (snapshot.getTimestamp() == null) { + LOGGER.warn("Model snapshot document [{}] has a null timestamp field", snapshot.getSnapshotId()); + continue; + } + long timestampMs = snapshot.getTimestamp().getTime(); if (timestampMs >= nextToKeepMs) { do { nextToKeepMs += MS_IN_ONE_DAY; } while (timestampMs >= nextToKeepMs); continue; } - JobSnapshotId idPair = new JobSnapshotId( - stringFieldValueOrNull(hit, Job.ID.getPreferredName()), - stringFieldValueOrNull(hit, ModelSnapshotField.SNAPSHOT_ID.getPreferredName())); - - if (idPair.hasNullValue() == false) { - snapshotIds.add(idPair); - } + snapshots.add(snapshot); } - deleteModelSnapshots(new VolatileCursorIterator<>(snapshotIds), listener); + deleteModelSnapshots(snapshots, job.getId(), listener); } catch (Exception e) { onFailure(e); } @@ -224,49 +218,30 @@ public void onResponse(SearchResponse searchResponse) { @Override public void onFailure(Exception e) { - listener.onFailure(new ElasticsearchException("[" + jobId + "] Search for expired snapshots failed", e)); + listener.onFailure(new ElasticsearchException("[{}] Search for expired snapshots failed", e, job.getId())); } }; } - private void deleteModelSnapshots(Iterator modelSnapshotIterator, ActionListener listener) { - if (modelSnapshotIterator.hasNext() == false) { + private void deleteModelSnapshots(List modelSnapshots, String jobId, ActionListener listener) { + if (modelSnapshots.isEmpty()) { listener.onResponse(true); return; } - JobSnapshotId idPair = modelSnapshotIterator.next(); - DeleteModelSnapshotAction.Request deleteSnapshotRequest = - new DeleteModelSnapshotAction.Request(idPair.jobId, idPair.snapshotId); - deleteSnapshotRequest.setParentTask(getParentTaskId()); - client.execute(DeleteModelSnapshotAction.INSTANCE, deleteSnapshotRequest, new ActionListener<>() { - @Override - public void onResponse(AcknowledgedResponse response) { - try { - 
deleteModelSnapshots(modelSnapshotIterator, listener); - } catch (Exception e) { - onFailure(e); - } - } - - @Override - public void onFailure(Exception e) { - listener.onFailure(new ElasticsearchException("[" + idPair.jobId + "] Failed to delete snapshot [" - + idPair.snapshotId + "]", e)); - } - }); + JobDataDeleter deleter = new JobDataDeleter(client, jobId); + deleter.deleteModelSnapshots(modelSnapshots, ActionListener.wrap( + bulkResponse -> { + auditor.info(jobId, Messages.getMessage(Messages.JOB_AUDIT_SNAPSHOTS_DELETED, modelSnapshots.size())); + LOGGER.debug(() -> new ParameterizedMessage( + "[{}] deleted model snapshots {} with descriptions {}", + jobId, + modelSnapshots.stream().map(ModelSnapshot::getSnapshotId).collect(Collectors.toList()), + modelSnapshots.stream().map(ModelSnapshot::getDescription).collect(Collectors.toList()) + )); + listener.onResponse(true); + }, + listener::onFailure + )); } - static class JobSnapshotId { - private final String jobId; - private final String snapshotId; - - JobSnapshotId(String jobId, String snapshotId) { - this.jobId = jobId; - this.snapshotId = snapshotId; - } - - boolean hasNullValue() { - return jobId == null || snapshotId == null; - } - } } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportDeleteExpiredDataActionTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportDeleteExpiredDataActionTests.java index 7c3fcbfe975be..2a424763c44e8 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportDeleteExpiredDataActionTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportDeleteExpiredDataActionTests.java @@ -15,6 +15,7 @@ import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.core.ml.action.DeleteExpiredDataAction; import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider; +import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider; 
import org.elasticsearch.xpack.ml.job.retention.MlDataRemover; import org.junit.After; import org.junit.Before; @@ -56,7 +57,9 @@ public void setup() { Client client = mock(Client.class); ClusterService clusterService = mock(ClusterService.class); transportDeleteExpiredDataAction = new TransportDeleteExpiredDataAction(threadPool, ThreadPool.Names.SAME, transportService, - new ActionFilters(Collections.emptySet()), client, clusterService, mock(JobConfigProvider.class), Clock.systemUTC()); + new ActionFilters(Collections.emptySet()), client, clusterService, mock(JobConfigProvider.class), + mock(JobResultsProvider.class), + Clock.systemUTC()); } @After diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemoverTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemoverTests.java index 263e56ce7c615..bf3aa6e1849c7 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemoverTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemoverTests.java @@ -12,19 +12,26 @@ import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.client.Client; import org.elasticsearch.client.OriginSettingClient; +import org.elasticsearch.common.ParseField; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.index.query.IdsQueryBuilder; +import org.elasticsearch.index.reindex.DeleteByQueryAction; +import org.elasticsearch.index.reindex.DeleteByQueryRequest; import org.elasticsearch.search.SearchHit; import org.elasticsearch.tasks.TaskId; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.ClientHelper; -import org.elasticsearch.xpack.core.ml.action.DeleteModelSnapshotAction; +import org.elasticsearch.xpack.core.action.util.QueryPage; +import 
org.elasticsearch.xpack.core.ml.annotations.AnnotationIndex; import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.config.JobTests; import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshotField; import org.elasticsearch.xpack.ml.MachineLearning; +import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider; +import org.elasticsearch.xpack.ml.notifications.AnomalyDetectionAuditor; import org.elasticsearch.xpack.ml.test.MockOriginSettingClient; import org.elasticsearch.xpack.ml.test.SearchHitBuilder; import org.junit.Before; @@ -36,15 +43,21 @@ import java.util.Arrays; import java.util.Collections; import java.util.Date; +import java.util.HashMap; import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; import static org.elasticsearch.xpack.ml.job.retention.AbstractExpiredJobDataRemoverTests.TestListener; +import static org.hamcrest.Matchers.arrayContainingInAnyOrder; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyBoolean; +import static org.mockito.Matchers.anyInt; import static org.mockito.Matchers.eq; import static org.mockito.Matchers.same; import static org.mockito.Mockito.doAnswer; @@ -56,18 +69,20 @@ public class ExpiredModelSnapshotsRemoverTests extends ESTestCase { private Client client; + private JobResultsProvider resultsProvider; private OriginSettingClient originSettingClient; - private List capturedSearchRequests; - private List capturedDeleteModelSnapshotRequests; + private List capturedJobIds; + private List capturedDeleteModelSnapshotRequests; private 
TestListener listener; @Before public void setUpTests() { - capturedSearchRequests = new ArrayList<>(); + capturedJobIds = new ArrayList<>(); capturedDeleteModelSnapshotRequests = new ArrayList<>(); client = mock(Client.class); originSettingClient = MockOriginSettingClient.mockOriginSettingClient(client, ClientHelper.ML_ORIGIN); + resultsProvider = mock(JobResultsProvider.class); listener = new TestListener(); } @@ -77,7 +92,7 @@ public void testRemove_GivenJobWithoutActiveSnapshot() throws IOException { List responses = Collections.singletonList( AbstractExpiredJobDataRemoverTests.createSearchResponse(Collections.emptyList())); - givenClientRequestsSucceed(responses); + givenClientRequestsSucceed(responses, Collections.emptyMap()); createExpiredModelSnapshotsRemover(jobs.iterator()).remove(1.0f, listener, () -> false); @@ -97,35 +112,41 @@ public void testRemove_GivenJobsWithMixedRetentionPolicies() { Date oneDayAgo = new Date(now.getTime() - TimeValue.timeValueDays(1).getMillis()); SearchHit snapshot1_1 = createModelSnapshotQueryHit("job-1", "fresh-snapshot", oneDayAgo); searchResponses.add(AbstractExpiredJobDataRemoverTests.createSearchResponseFromHits(Collections.singletonList(snapshot1_1))); + SearchHit snapshot2_1 = createModelSnapshotQueryHit("job-2", "fresh-snapshot", oneDayAgo); + searchResponses.add(AbstractExpiredJobDataRemoverTests.createSearchResponseFromHits(Collections.singletonList(snapshot2_1))); // It needs to be strictly more than 7 days before the most recent snapshot, hence the extra millisecond Date eightDaysAndOneMsAgo = new Date(now.getTime() - TimeValue.timeValueDays(8).getMillis() - 1); - SearchHit snapshotToBeDeleted = createModelSnapshotQueryHit("job-1", "old-snapshot", eightDaysAndOneMsAgo); - - - searchResponses.add( - AbstractExpiredJobDataRemoverTests.createSearchResponseFromHits(Collections.singletonList(snapshotToBeDeleted))); - - SearchHit snapshot2_1 = createModelSnapshotQueryHit("job-1", "snapshots-1_1", eightDaysAndOneMsAgo); - 
searchResponses.add(AbstractExpiredJobDataRemoverTests.createSearchResponseFromHits(Collections.singletonList(snapshot2_1))); - searchResponses.add(AbstractExpiredJobDataRemoverTests.createSearchResponseFromHits(Collections.emptyList())); - - givenClientRequestsSucceed(searchResponses); + Map> snapshotResponses = new HashMap<>(); + snapshotResponses.put("job-1", + Arrays.asList( + // Keeping active as its expiration is not known. We can assume "worst case" and verify it is not removed + createModelSnapshot("job-1", "active", eightDaysAndOneMsAgo), + createModelSnapshot("job-1", "old-snapshot", eightDaysAndOneMsAgo) + )); + // Retention days for job-2 is 17 days, consequently, its query should not return anything as we don't ask for snapshots + // created AFTER 17 days ago + snapshotResponses.put("job-2", Collections.emptyList()); + givenClientRequestsSucceed(searchResponses, snapshotResponses); createExpiredModelSnapshotsRemover(jobs.iterator()).remove(1.0f, listener, () -> false); listener.waitToCompletion(); assertThat(listener.success, is(true)); - assertThat(capturedSearchRequests.size(), equalTo(4)); - SearchRequest searchRequest = capturedSearchRequests.get(1); - assertThat(searchRequest.indices(), equalTo(new String[] {AnomalyDetectorsIndex.jobResultsAliasedName("job-1")})); - searchRequest = capturedSearchRequests.get(3); - assertThat(searchRequest.indices(), equalTo(new String[] {AnomalyDetectorsIndex.jobResultsAliasedName("job-2")})); + assertThat(capturedJobIds.size(), equalTo(2)); + assertThat(capturedJobIds.get(0), equalTo("job-1")); + assertThat(capturedJobIds.get(1), equalTo("job-2")); assertThat(capturedDeleteModelSnapshotRequests.size(), equalTo(1)); - DeleteModelSnapshotAction.Request deleteSnapshotRequest = capturedDeleteModelSnapshotRequests.get(0); - assertThat(deleteSnapshotRequest.getJobId(), equalTo("job-1")); - assertThat(deleteSnapshotRequest.getSnapshotId(), equalTo("old-snapshot")); + DeleteByQueryRequest deleteSnapshotRequest = 
capturedDeleteModelSnapshotRequests.get(0); + assertThat(deleteSnapshotRequest.indices(), + arrayContainingInAnyOrder(AnomalyDetectorsIndex.jobResultsAliasedName("job-1"), + AnomalyDetectorsIndex.jobStateIndexPattern(), + AnnotationIndex.READ_ALIAS_NAME)); + assertThat(deleteSnapshotRequest.getSearchRequest().source().query() instanceof IdsQueryBuilder, is(true)); + IdsQueryBuilder idsQueryBuilder = (IdsQueryBuilder)deleteSnapshotRequest.getSearchRequest().source().query(); + assertTrue("expected ids related to [old-snapshot] but received [" + idsQueryBuilder.ids() + "]", + idsQueryBuilder.ids().stream().allMatch(s -> s.contains("old-snapshot"))); } public void testRemove_GivenTimeout() throws IOException { @@ -141,8 +162,13 @@ public void testRemove_GivenTimeout() throws IOException { List snapshots2JobSnapshots = Collections.singletonList(createModelSnapshot("snapshots-2", "snapshots-2_1", now)); searchResponses.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(snapshots1JobSnapshots)); searchResponses.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(snapshots2JobSnapshots)); + HashMap> snapshots = new HashMap<>(); - givenClientRequestsSucceed(searchResponses); + // All snapshots are "now" and the retention period is much longer than that. 
+ // So, getting the snapshots should return empty for both + snapshots.put("snapshots-1", Collections.emptyList()); + snapshots.put("snapshots-2", Collections.emptyList()); + givenClientRequestsSucceed(searchResponses, snapshots); final int timeoutAfter = randomIntBetween(0, 1); AtomicInteger attemptsLeft = new AtomicInteger(timeoutAfter); @@ -160,16 +186,12 @@ public void testRemove_GivenClientSearchRequestsFail() { JobTests.buildJobBuilder("snapshots-2").setModelSnapshotRetentionDays(17L).setModelSnapshotId("active").build() ); - givenClientSearchRequestsFail(searchResponses); + givenClientSearchRequestsFail(searchResponses, Collections.emptyMap()); createExpiredModelSnapshotsRemover(jobs.iterator()).remove(1.0f, listener, () -> false); listener.waitToCompletion(); assertThat(listener.success, is(false)); - assertThat(capturedSearchRequests.size(), equalTo(1)); - SearchRequest searchRequest = capturedSearchRequests.get(0); - assertThat(searchRequest.indices(), equalTo(new String[] {AnomalyDetectorsIndex.jobResultsAliasedName("snapshots-1")})); - assertThat(capturedDeleteModelSnapshotRequests.size(), equalTo(0)); } @@ -182,33 +204,39 @@ public void testRemove_GivenClientDeleteSnapshotRequestsFail() { Date now = new Date(); Date oneDayAgo = new Date(new Date().getTime() - TimeValue.timeValueDays(1).getMillis()); + Date eightDaysAndOneMsAgo = new Date(now.getTime() - TimeValue.timeValueDays(8).getMillis() - 1); SearchHit snapshot1_1 = createModelSnapshotQueryHit("snapshots-1", "snapshots-1_1", oneDayAgo); searchResponses.add(AbstractExpiredJobDataRemoverTests.createSearchResponseFromHits(Collections.singletonList(snapshot1_1))); - - // It needs to be strictly more than 7 days before the most recent snapshot, hence the extra millisecond - Date eightDaysAndOneMsAgo = new Date(now.getTime() - TimeValue.timeValueDays(8).getMillis() - 1); - List snapshots1JobSnapshots = Arrays.asList( - snapshot1_1, - createModelSnapshotQueryHit("snapshots-1", "snapshots-1_2", 
eightDaysAndOneMsAgo)); - searchResponses.add(AbstractExpiredJobDataRemoverTests.createSearchResponseFromHits(snapshots1JobSnapshots)); + Map> snapshots = new HashMap<>(); + // Should only return the one from 8 days ago + snapshots.put("snapshots-1", Collections.singletonList( + createModelSnapshot("snapshots-1", "snapshots-1_2", eightDaysAndOneMsAgo) + )); + // Shouldn't return anything as retention is 17 days + snapshots.put("snapshots-2", Collections.emptyList()); SearchHit snapshot2_2 = createModelSnapshotQueryHit("snapshots-2", "snapshots-2_1", eightDaysAndOneMsAgo); searchResponses.add(AbstractExpiredJobDataRemoverTests.createSearchResponseFromHits(Collections.singletonList(snapshot2_2))); - givenClientDeleteModelSnapshotRequestsFail(searchResponses); + givenClientDeleteModelSnapshotRequestsFail(searchResponses, snapshots); createExpiredModelSnapshotsRemover(jobs.iterator()).remove(1.0f, listener, () -> false); listener.waitToCompletion(); assertThat(listener.success, is(false)); - assertThat(capturedSearchRequests.size(), equalTo(2)); - SearchRequest searchRequest = capturedSearchRequests.get(1); - assertThat(searchRequest.indices(), equalTo(new String[] {AnomalyDetectorsIndex.jobResultsAliasedName("snapshots-1")})); + assertThat(capturedJobIds.size(), equalTo(1)); + assertThat(capturedJobIds.get(0), equalTo("snapshots-1")); assertThat(capturedDeleteModelSnapshotRequests.size(), equalTo(1)); - DeleteModelSnapshotAction.Request deleteSnapshotRequest = capturedDeleteModelSnapshotRequests.get(0); - assertThat(deleteSnapshotRequest.getJobId(), equalTo("snapshots-1")); - assertThat(deleteSnapshotRequest.getSnapshotId(), equalTo("snapshots-1_2")); + DeleteByQueryRequest deleteSnapshotRequest = capturedDeleteModelSnapshotRequests.get(0); + assertThat(deleteSnapshotRequest.indices(), + arrayContainingInAnyOrder(AnomalyDetectorsIndex.jobResultsAliasedName("snapshots-1"), + AnomalyDetectorsIndex.jobStateIndexPattern(), + AnnotationIndex.READ_ALIAS_NAME)); + 
assertThat(deleteSnapshotRequest.getSearchRequest().source().query() instanceof IdsQueryBuilder, is(true)); + IdsQueryBuilder idsQueryBuilder = (IdsQueryBuilder)deleteSnapshotRequest.getSearchRequest().source().query(); + assertTrue("expected ids related to [snapshots-1_2] but received [" + idsQueryBuilder.ids() + "]", + idsQueryBuilder.ids().stream().allMatch(s -> s.contains("snapshots-1_2"))); } @SuppressWarnings("unchecked") @@ -219,7 +247,10 @@ public void testCalcCutoffEpochMs() { SearchHit snapshot1_1 = createModelSnapshotQueryHit("job-1", "newest-snapshot", oneDayAgo); searchResponses.add(AbstractExpiredJobDataRemoverTests.createSearchResponseFromHits(Collections.singletonList(snapshot1_1))); - givenClientRequests(searchResponses, true, true); + givenClientRequests(searchResponses, + true, + true, + Collections.singletonMap("job-1", Collections.singletonList(createModelSnapshot("job-1", "newest-snapshot", oneDayAgo)))); long retentionDays = 3L; ActionListener cutoffListener = mock(ActionListener.class); @@ -230,17 +261,6 @@ public void testCalcCutoffEpochMs() { verify(cutoffListener).onResponse(eq(new AbstractExpiredJobDataRemover.CutoffDetails(oneDayAgo.getTime(), expectedCutoffTime))); } - public void testJobSnapshotId() { - ExpiredModelSnapshotsRemover.JobSnapshotId id = new ExpiredModelSnapshotsRemover.JobSnapshotId("a", "b"); - assertFalse(id.hasNullValue()); - id = new ExpiredModelSnapshotsRemover.JobSnapshotId(null, "b"); - assertTrue(id.hasNullValue()); - id = new ExpiredModelSnapshotsRemover.JobSnapshotId("a", null); - assertTrue(id.hasNullValue()); - id = new ExpiredModelSnapshotsRemover.JobSnapshotId(null, null); - assertTrue(id.hasNullValue()); - } - private ExpiredModelSnapshotsRemover createExpiredModelSnapshotsRemover(Iterator jobIterator) { ThreadPool threadPool = mock(ThreadPool.class); ExecutorService executor = mock(ExecutorService.class); @@ -253,7 +273,13 @@ private ExpiredModelSnapshotsRemover createExpiredModelSnapshotsRemover(Iterator 
return null; } ).when(executor).execute(any()); - return new ExpiredModelSnapshotsRemover(originSettingClient, jobIterator, threadPool, new TaskId("test", 0L)); + return new ExpiredModelSnapshotsRemover( + originSettingClient, + jobIterator, + threadPool, + new TaskId("test", 0L), + resultsProvider, + mock(AnomalyDetectionAuditor.class)); } private static ModelSnapshot createModelSnapshot(String jobId, String snapshotId, Date date) { @@ -269,21 +295,26 @@ private static SearchHit createModelSnapshotQueryHit(String jobId, String snapsh return hitBuilder.build(); } - private void givenClientRequestsSucceed(List searchResponses) { - givenClientRequests(searchResponses, true, true); + private void givenClientRequestsSucceed(List searchResponses, + Map> snapshots) { + givenClientRequests(searchResponses, true, true, snapshots); } - private void givenClientSearchRequestsFail(List searchResponses) { - givenClientRequests(searchResponses, false, true); + private void givenClientSearchRequestsFail(List searchResponses, + Map> snapshots) { + givenClientRequests(searchResponses, false, true, snapshots); } - private void givenClientDeleteModelSnapshotRequestsFail(List searchResponses) { - givenClientRequests(searchResponses, true, false); + private void givenClientDeleteModelSnapshotRequestsFail(List searchResponses, + Map> snapshots) { + givenClientRequests(searchResponses, true, false, snapshots); } @SuppressWarnings("unchecked") private void givenClientRequests(List searchResponses, - boolean shouldSearchRequestsSucceed, boolean shouldDeleteSnapshotRequestsSucceed) { + boolean shouldSearchRequestsSucceed, + boolean shouldDeleteSnapshotRequestsSucceed, + Map> snapshots) { doAnswer(new Answer() { AtomicInteger callCount = new AtomicInteger(); @@ -293,9 +324,8 @@ public Void answer(InvocationOnMock invocationOnMock) { ActionListener listener = (ActionListener) invocationOnMock.getArguments()[2]; SearchRequest searchRequest = (SearchRequest) invocationOnMock.getArguments()[1]; 
- capturedSearchRequests.add(searchRequest); // Only the last search request should fail - if (shouldSearchRequestsSucceed || callCount.get() < searchResponses.size()) { + if (shouldSearchRequestsSucceed || callCount.get() < (searchResponses.size() + snapshots.size())) { SearchResponse response = searchResponses.get(callCount.getAndIncrement()); listener.onResponse(response); } else { @@ -306,7 +336,7 @@ public Void answer(InvocationOnMock invocationOnMock) { }).when(client).execute(same(SearchAction.INSTANCE), any(), any()); doAnswer(invocationOnMock -> { - capturedDeleteModelSnapshotRequests.add((DeleteModelSnapshotAction.Request) invocationOnMock.getArguments()[1]); + capturedDeleteModelSnapshotRequests.add((DeleteByQueryRequest) invocationOnMock.getArguments()[1]); ActionListener listener = (ActionListener) invocationOnMock.getArguments()[2]; if (shouldDeleteSnapshotRequestsSucceed) { @@ -316,6 +346,34 @@ public Void answer(InvocationOnMock invocationOnMock) { } return null; } - ).when(client).execute(same(DeleteModelSnapshotAction.INSTANCE), any(), any()); + ).when(client).execute(same(DeleteByQueryAction.INSTANCE), any(), any()); + + for (Map.Entry> snapshot : snapshots.entrySet()) { + doAnswer(new Answer() { + AtomicInteger callCount = new AtomicInteger(); + @Override + public Void answer(InvocationOnMock invocationOnMock) throws Throwable { + capturedJobIds.add((String)invocationOnMock.getArguments()[0]); + Consumer> listener = (Consumer>) invocationOnMock.getArguments()[8]; + Consumer failure = (Consumer) invocationOnMock.getArguments()[9]; + if (shouldSearchRequestsSucceed || callCount.get() < snapshots.size()) { + callCount.incrementAndGet(); + listener.accept(new QueryPage<>(snapshot.getValue(), 10, new ParseField("snapshots"))); + } else { + failure.accept(new RuntimeException("search failed")); + } + return null; } + }).when(resultsProvider).modelSnapshots(eq(snapshot.getKey()), + anyInt(), + anyInt(), + any(), + any(), + any(), + anyBoolean(), + 
any(), + any(), + any()); + } + } }