From 5cc7c45473510b57140232293977ba8628567bd1 Mon Sep 17 00:00:00 2001 From: Benjamin Trent <4357155+benwtrent@users.noreply.github.com> Date: Thu, 1 Oct 2020 11:41:53 -0400 Subject: [PATCH 1/6] [ML] optimize delete expired snapshots When deleting expired snapshots, we do an individual delete action per snapshot per job. We should instead gather the expired snapshots and delete them in a single call. This commit achieves this and a side-effect is there is less audit log spam on nightly cleanup closes https://github.com/elastic/elasticsearch/issues/62875 --- .../xpack/core/ml/job/messages/Messages.java | 1 + .../TransportDeleteExpiredDataAction.java | 17 ++- .../ExpiredModelSnapshotsRemover.java | 95 ++++++++------- ...TransportDeleteExpiredDataActionTests.java | 5 +- .../ExpiredModelSnapshotsRemoverTests.java | 113 ++++++++++++------ 5 files changed, 142 insertions(+), 89 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/messages/Messages.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/messages/Messages.java index 4d18a8f1006a9..222aafbfde479 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/messages/Messages.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/messages/Messages.java @@ -144,6 +144,7 @@ public final class Messages { public static final String JOB_AUDIT_SNAPSHOT_STORED = "Job model snapshot with id [{0}] stored"; public static final String JOB_AUDIT_REVERTED = "Job model snapshot reverted to ''{0}''"; public static final String JOB_AUDIT_SNAPSHOT_DELETED = "Model snapshot [{0}] with description ''{1}'' deleted"; + public static final String JOB_AUDIT_SNAPSHOTS_DELETED = "Model snapshots {0} with descriptions {1} deleted"; public static final String JOB_AUDIT_FILTER_UPDATED_ON_PROCESS = "Updated filter [{0}] in running process"; public static final String JOB_AUDIT_CALENDARS_UPDATED_ON_PROCESS = "Updated calendars in 
running process"; public static final String JOB_AUDIT_MEMORY_STATUS_SOFT_LIMIT = "Job memory status changed to soft_limit; memory pruning will now be " + diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteExpiredDataAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteExpiredDataAction.java index d8c97d165da0d..7f04f31810d5d 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteExpiredDataAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteExpiredDataAction.java @@ -26,6 +26,7 @@ import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.ml.MachineLearning; import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider; +import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider; import org.elasticsearch.xpack.ml.job.persistence.SearchAfterJobsIterator; import org.elasticsearch.xpack.ml.job.retention.EmptyStateIndexRemover; import org.elasticsearch.xpack.ml.job.retention.ExpiredForecastsRemover; @@ -60,18 +61,19 @@ public class TransportDeleteExpiredDataAction extends HandledTransportAction createDataRemovers(OriginSettingClient client, new WrappedBatchedJobsIterator(new SearchAfterJobsIterator(client)), parentTaskId, auditor, threadPool), new ExpiredForecastsRemover(client, threadPool, parentTaskId), new ExpiredModelSnapshotsRemover(client, - new WrappedBatchedJobsIterator(new SearchAfterJobsIterator(client)), threadPool, parentTaskId), + new WrappedBatchedJobsIterator(new SearchAfterJobsIterator(client)), threadPool, parentTaskId, jobResultsProvider, auditor), new UnusedStateRemover(client, clusterService, parentTaskId), new EmptyStateIndexRemover(client, parentTaskId), new UnusedStatsRemover(client, parentTaskId)); @@ -185,7 +188,11 @@ private List createDataRemovers(List jobs, TaskId parentTask return Arrays.asList( new ExpiredResultsRemover(client, new 
VolatileCursorIterator<>(jobs), parentTaskId, auditor, threadPool), new ExpiredForecastsRemover(client, threadPool, parentTaskId), - new ExpiredModelSnapshotsRemover(client, new VolatileCursorIterator<>(jobs), threadPool, parentTaskId), + new ExpiredModelSnapshotsRemover(client, + new VolatileCursorIterator<>(jobs), + threadPool, parentTaskId, + jobResultsProvider, + auditor), new UnusedStateRemover(client, clusterService, parentTaskId), new EmptyStateIndexRemover(client, parentTaskId), new UnusedStatsRemover(client, parentTaskId)); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemover.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemover.java index ac9566bb2f731..84be7721485dc 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemover.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemover.java @@ -7,13 +7,13 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.search.SearchAction; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.support.ThreadedActionListener; -import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.client.OriginSettingClient; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.query.QueryBuilder; @@ -26,14 +26,16 @@ import org.elasticsearch.tasks.TaskId; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.common.time.TimeUtils; -import org.elasticsearch.xpack.core.ml.action.DeleteModelSnapshotAction; import 
org.elasticsearch.xpack.core.ml.job.config.Job; +import org.elasticsearch.xpack.core.ml.job.messages.Messages; import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshotField; import org.elasticsearch.xpack.ml.MachineLearning; +import org.elasticsearch.xpack.ml.job.persistence.JobDataDeleter; +import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider; +import org.elasticsearch.xpack.ml.notifications.AnomalyDetectionAuditor; import org.elasticsearch.xpack.ml.utils.MlIndicesUtils; -import org.elasticsearch.xpack.ml.utils.VolatileCursorIterator; import java.util.ArrayList; import java.util.Iterator; @@ -65,11 +67,16 @@ public class ExpiredModelSnapshotsRemover extends AbstractExpiredJobDataRemover private static final int MODEL_SNAPSHOT_SEARCH_SIZE = 10000; private final ThreadPool threadPool; + private final JobResultsProvider jobResultsProvider; + private final AnomalyDetectionAuditor auditor; public ExpiredModelSnapshotsRemover(OriginSettingClient client, Iterator jobIterator, - ThreadPool threadPool, TaskId parentTaskId) { + ThreadPool threadPool, TaskId parentTaskId, JobResultsProvider jobResultsProvider, + AnomalyDetectionAuditor auditor) { super(client, jobIterator, parentTaskId); this.threadPool = Objects.requireNonNull(threadPool); + this.jobResultsProvider = jobResultsProvider; + this.auditor = auditor; } @Override @@ -157,7 +164,10 @@ protected void removeDataBefore( listener.onResponse(true); return; } - LOGGER.debug("Considering model snapshots of job [{}] that have a timestamp before [{}] for removal", job.getId(), cutoffEpochMs); + LOGGER.debug(() -> new ParameterizedMessage( + "Considering model snapshots of job [{}] that have a timestamp before [{}] for removal", + job.getId(), + cutoffEpochMs)); SearchRequest searchRequest = new SearchRequest(); 
searchRequest.indices(AnomalyDetectorsIndex.jobResultsAliasedName(job.getId())); @@ -187,14 +197,15 @@ protected void removeDataBefore( MachineLearning.UTILITY_THREAD_POOL_NAME, expiredSnapshotsListener(job.getId(), deleteAllBeforeMs, listener), false)); } - private ActionListener expiredSnapshotsListener(String jobId, long deleteAllBeforeMs, + private ActionListener expiredSnapshotsListener(String jobId, + long deleteAllBeforeMs, ActionListener listener) { return new ActionListener<>() { @Override public void onResponse(SearchResponse searchResponse) { long nextToKeepMs = deleteAllBeforeMs; try { - List snapshotIds = new ArrayList<>(); + List snapshots = new ArrayList<>(); for (SearchHit hit : searchResponse.getHits()) { String timestamp = stringFieldValueOrNull(hit, ModelSnapshot.TIMESTAMP.getPreferredName()); if (timestamp == null) { @@ -208,15 +219,22 @@ public void onResponse(SearchResponse searchResponse) { } while (timestampMs >= nextToKeepMs); continue; } - JobSnapshotId idPair = new JobSnapshotId( - stringFieldValueOrNull(hit, Job.ID.getPreferredName()), - stringFieldValueOrNull(hit, ModelSnapshotField.SNAPSHOT_ID.getPreferredName())); + String jobId = stringFieldValueOrNull(hit, Job.ID.getPreferredName()); + String snapshotId = stringFieldValueOrNull(hit, ModelSnapshotField.SNAPSHOT_ID.getPreferredName()); - if (idPair.hasNullValue() == false) { - snapshotIds.add(idPair); + if (jobId != null && snapshotId != null) { + jobResultsProvider.getModelSnapshot( + jobId, + snapshotId, + // We are safe to grab this snapshot as the query is by DOC ID and we already filtered out the + // currently active snapshot earlier in the call chain + (shots) -> snapshots.add(shots.result), + (failure) -> + LOGGER.warn(new ParameterizedMessage("[{}] failed to find snapshot [{}]", jobId, snapshotId), failure) + ); } } - deleteModelSnapshots(new VolatileCursorIterator<>(snapshotIds), listener); + deleteModelSnapshots(snapshots, jobId, listener); } catch (Exception e) { 
onFailure(e); } @@ -224,49 +242,30 @@ public void onResponse(SearchResponse searchResponse) { @Override public void onFailure(Exception e) { - listener.onFailure(new ElasticsearchException("[" + jobId + "] Search for expired snapshots failed", e)); + listener.onFailure(new ElasticsearchException("[{}] Search for expired snapshots failed", e, jobId)); } }; } - private void deleteModelSnapshots(Iterator modelSnapshotIterator, ActionListener listener) { - if (modelSnapshotIterator.hasNext() == false) { + private void deleteModelSnapshots(List modelSnapshots, String jobId, ActionListener listener) { + if (modelSnapshots.isEmpty()) { listener.onResponse(true); return; } - JobSnapshotId idPair = modelSnapshotIterator.next(); - DeleteModelSnapshotAction.Request deleteSnapshotRequest = - new DeleteModelSnapshotAction.Request(idPair.jobId, idPair.snapshotId); - deleteSnapshotRequest.setParentTask(getParentTaskId()); - client.execute(DeleteModelSnapshotAction.INSTANCE, deleteSnapshotRequest, new ActionListener<>() { - @Override - public void onResponse(AcknowledgedResponse response) { - try { - deleteModelSnapshots(modelSnapshotIterator, listener); - } catch (Exception e) { - onFailure(e); - } - } + JobDataDeleter deleter = new JobDataDeleter(client, jobId); + deleter.deleteModelSnapshots(modelSnapshots, ActionListener.wrap( + bulkResponse -> { + String msg = Messages.getMessage( + Messages.JOB_AUDIT_SNAPSHOTS_DELETED, + modelSnapshots.stream().map(ModelSnapshot::getSnapshotId), + modelSnapshots.stream().map(ModelSnapshot::getDescription)); - @Override - public void onFailure(Exception e) { - listener.onFailure(new ElasticsearchException("[" + idPair.jobId + "] Failed to delete snapshot [" - + idPair.snapshotId + "]", e)); - } - }); + auditor.info(jobId, msg); + LOGGER.debug(() -> new ParameterizedMessage("[{}] {}", jobId, msg)); + listener.onResponse(true); + }, + listener::onFailure + )); } - static class JobSnapshotId { - private final String jobId; - private final String 
snapshotId; - - JobSnapshotId(String jobId, String snapshotId) { - this.jobId = jobId; - this.snapshotId = snapshotId; - } - - boolean hasNullValue() { - return jobId == null || snapshotId == null; - } - } } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportDeleteExpiredDataActionTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportDeleteExpiredDataActionTests.java index 7c3fcbfe975be..2a424763c44e8 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportDeleteExpiredDataActionTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportDeleteExpiredDataActionTests.java @@ -15,6 +15,7 @@ import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.core.ml.action.DeleteExpiredDataAction; import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider; +import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider; import org.elasticsearch.xpack.ml.job.retention.MlDataRemover; import org.junit.After; import org.junit.Before; @@ -56,7 +57,9 @@ public void setup() { Client client = mock(Client.class); ClusterService clusterService = mock(ClusterService.class); transportDeleteExpiredDataAction = new TransportDeleteExpiredDataAction(threadPool, ThreadPool.Names.SAME, transportService, - new ActionFilters(Collections.emptySet()), client, clusterService, mock(JobConfigProvider.class), Clock.systemUTC()); + new ActionFilters(Collections.emptySet()), client, clusterService, mock(JobConfigProvider.class), + mock(JobResultsProvider.class), + Clock.systemUTC()); } @After diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemoverTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemoverTests.java index 263e56ce7c615..4f48024b53dd8 100644 --- 
a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemoverTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemoverTests.java @@ -13,18 +13,24 @@ import org.elasticsearch.client.Client; import org.elasticsearch.client.OriginSettingClient; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.index.query.IdsQueryBuilder; +import org.elasticsearch.index.reindex.DeleteByQueryAction; +import org.elasticsearch.index.reindex.DeleteByQueryRequest; import org.elasticsearch.search.SearchHit; import org.elasticsearch.tasks.TaskId; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.ClientHelper; -import org.elasticsearch.xpack.core.ml.action.DeleteModelSnapshotAction; +import org.elasticsearch.xpack.core.ml.annotations.AnnotationIndex; import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.config.JobTests; import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshotField; +import org.elasticsearch.xpack.core.ml.job.results.Result; import org.elasticsearch.xpack.ml.MachineLearning; +import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider; +import org.elasticsearch.xpack.ml.notifications.AnomalyDetectionAuditor; import org.elasticsearch.xpack.ml.test.MockOriginSettingClient; import org.elasticsearch.xpack.ml.test.SearchHitBuilder; import org.junit.Before; @@ -40,8 +46,10 @@ import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; import static org.elasticsearch.xpack.ml.job.retention.AbstractExpiredJobDataRemoverTests.TestListener; +import static 
org.hamcrest.Matchers.arrayContainingInAnyOrder; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; import static org.mockito.Matchers.any; @@ -56,9 +64,10 @@ public class ExpiredModelSnapshotsRemoverTests extends ESTestCase { private Client client; + private JobResultsProvider resultsProvider; private OriginSettingClient originSettingClient; private List capturedSearchRequests; - private List capturedDeleteModelSnapshotRequests; + private List capturedDeleteModelSnapshotRequests; private TestListener listener; @Before @@ -68,6 +77,7 @@ public void setUpTests() { client = mock(Client.class); originSettingClient = MockOriginSettingClient.mockOriginSettingClient(client, ClientHelper.ML_ORIGIN); + resultsProvider = mock(JobResultsProvider.class); listener = new TestListener(); } @@ -77,7 +87,7 @@ public void testRemove_GivenJobWithoutActiveSnapshot() throws IOException { List responses = Collections.singletonList( AbstractExpiredJobDataRemoverTests.createSearchResponse(Collections.emptyList())); - givenClientRequestsSucceed(responses); + givenClientRequestsSucceed(responses, Collections.emptyList()); createExpiredModelSnapshotsRemover(jobs.iterator()).remove(1.0f, listener, () -> false); @@ -110,7 +120,11 @@ public void testRemove_GivenJobsWithMixedRetentionPolicies() { searchResponses.add(AbstractExpiredJobDataRemoverTests.createSearchResponseFromHits(Collections.singletonList(snapshot2_1))); searchResponses.add(AbstractExpiredJobDataRemoverTests.createSearchResponseFromHits(Collections.emptyList())); - givenClientRequestsSucceed(searchResponses); + givenClientRequestsSucceed(searchResponses, Arrays.asList( + createModelSnapshot("job-1", "snapshots-1_1", eightDaysAndOneMsAgo), + createModelSnapshot("job-1", "fresh-snapshot", oneDayAgo), + createModelSnapshot("job-1", "old-snapshot", eightDaysAndOneMsAgo) + )); createExpiredModelSnapshotsRemover(jobs.iterator()).remove(1.0f, listener, () -> false); listener.waitToCompletion(); @@ 
-123,9 +137,15 @@ public void testRemove_GivenJobsWithMixedRetentionPolicies() { assertThat(searchRequest.indices(), equalTo(new String[] {AnomalyDetectorsIndex.jobResultsAliasedName("job-2")})); assertThat(capturedDeleteModelSnapshotRequests.size(), equalTo(1)); - DeleteModelSnapshotAction.Request deleteSnapshotRequest = capturedDeleteModelSnapshotRequests.get(0); - assertThat(deleteSnapshotRequest.getJobId(), equalTo("job-1")); - assertThat(deleteSnapshotRequest.getSnapshotId(), equalTo("old-snapshot")); + DeleteByQueryRequest deleteSnapshotRequest = capturedDeleteModelSnapshotRequests.get(0); + assertThat(deleteSnapshotRequest.indices(), + arrayContainingInAnyOrder(AnomalyDetectorsIndex.jobResultsAliasedName("job-1"), + AnomalyDetectorsIndex.jobStateIndexPattern(), + AnnotationIndex.READ_ALIAS_NAME)); + assertThat(deleteSnapshotRequest.getSearchRequest().source().query() instanceof IdsQueryBuilder, is(true)); + IdsQueryBuilder idsQueryBuilder = (IdsQueryBuilder)deleteSnapshotRequest.getSearchRequest().source().query(); + assertTrue("expected ids related to [old-snapshot] but received [" + idsQueryBuilder.ids() + "]", + idsQueryBuilder.ids().stream().allMatch(s -> s.contains("old-snapshot"))); } public void testRemove_GivenTimeout() throws IOException { @@ -142,7 +162,11 @@ public void testRemove_GivenTimeout() throws IOException { searchResponses.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(snapshots1JobSnapshots)); searchResponses.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(snapshots2JobSnapshots)); - givenClientRequestsSucceed(searchResponses); + givenClientRequestsSucceed(searchResponses, Arrays.asList( + createModelSnapshot("snapshots-1", "snapshots-1_1", now), + createModelSnapshot("snapshots-1", "snapshots-1_2", now), + createModelSnapshot("snapshots-2", "snapshots-2_1", now) + )); final int timeoutAfter = randomIntBetween(0, 1); AtomicInteger attemptsLeft = new AtomicInteger(timeoutAfter); @@ -160,7 +184,7 @@ public void 
testRemove_GivenClientSearchRequestsFail() { JobTests.buildJobBuilder("snapshots-2").setModelSnapshotRetentionDays(17L).setModelSnapshotId("active").build() ); - givenClientSearchRequestsFail(searchResponses); + givenClientSearchRequestsFail(searchResponses, Collections.emptyList()); createExpiredModelSnapshotsRemover(jobs.iterator()).remove(1.0f, listener, () -> false); listener.waitToCompletion(); @@ -195,7 +219,11 @@ public void testRemove_GivenClientDeleteSnapshotRequestsFail() { SearchHit snapshot2_2 = createModelSnapshotQueryHit("snapshots-2", "snapshots-2_1", eightDaysAndOneMsAgo); searchResponses.add(AbstractExpiredJobDataRemoverTests.createSearchResponseFromHits(Collections.singletonList(snapshot2_2))); - givenClientDeleteModelSnapshotRequestsFail(searchResponses); + givenClientDeleteModelSnapshotRequestsFail(searchResponses, Arrays.asList( + createModelSnapshot("snapshots-1", "snapshots-1_1", oneDayAgo), + createModelSnapshot("snapshots-1", "snapshots-1_2", eightDaysAndOneMsAgo), + createModelSnapshot("snapshots-2", "snapshots-2_1", eightDaysAndOneMsAgo) + )); createExpiredModelSnapshotsRemover(jobs.iterator()).remove(1.0f, listener, () -> false); listener.waitToCompletion(); @@ -206,9 +234,15 @@ public void testRemove_GivenClientDeleteSnapshotRequestsFail() { assertThat(searchRequest.indices(), equalTo(new String[] {AnomalyDetectorsIndex.jobResultsAliasedName("snapshots-1")})); assertThat(capturedDeleteModelSnapshotRequests.size(), equalTo(1)); - DeleteModelSnapshotAction.Request deleteSnapshotRequest = capturedDeleteModelSnapshotRequests.get(0); - assertThat(deleteSnapshotRequest.getJobId(), equalTo("snapshots-1")); - assertThat(deleteSnapshotRequest.getSnapshotId(), equalTo("snapshots-1_2")); + DeleteByQueryRequest deleteSnapshotRequest = capturedDeleteModelSnapshotRequests.get(0); + assertThat(deleteSnapshotRequest.indices(), + arrayContainingInAnyOrder(AnomalyDetectorsIndex.jobResultsAliasedName("snapshots-1"), + 
AnomalyDetectorsIndex.jobStateIndexPattern(), + AnnotationIndex.READ_ALIAS_NAME)); + assertThat(deleteSnapshotRequest.getSearchRequest().source().query() instanceof IdsQueryBuilder, is(true)); + IdsQueryBuilder idsQueryBuilder = (IdsQueryBuilder)deleteSnapshotRequest.getSearchRequest().source().query(); + assertTrue("expected ids related to [snapshots-1_2] but received [" + idsQueryBuilder.ids() + "]", + idsQueryBuilder.ids().stream().allMatch(s -> s.contains("snapshots-1_2"))); } @SuppressWarnings("unchecked") @@ -219,7 +253,10 @@ public void testCalcCutoffEpochMs() { SearchHit snapshot1_1 = createModelSnapshotQueryHit("job-1", "newest-snapshot", oneDayAgo); searchResponses.add(AbstractExpiredJobDataRemoverTests.createSearchResponseFromHits(Collections.singletonList(snapshot1_1))); - givenClientRequests(searchResponses, true, true); + givenClientRequests(searchResponses, + true, + true, + Collections.singletonList(createModelSnapshot("job-1", "newest-snapshot", oneDayAgo))); long retentionDays = 3L; ActionListener cutoffListener = mock(ActionListener.class); @@ -230,17 +267,6 @@ public void testCalcCutoffEpochMs() { verify(cutoffListener).onResponse(eq(new AbstractExpiredJobDataRemover.CutoffDetails(oneDayAgo.getTime(), expectedCutoffTime))); } - public void testJobSnapshotId() { - ExpiredModelSnapshotsRemover.JobSnapshotId id = new ExpiredModelSnapshotsRemover.JobSnapshotId("a", "b"); - assertFalse(id.hasNullValue()); - id = new ExpiredModelSnapshotsRemover.JobSnapshotId(null, "b"); - assertTrue(id.hasNullValue()); - id = new ExpiredModelSnapshotsRemover.JobSnapshotId("a", null); - assertTrue(id.hasNullValue()); - id = new ExpiredModelSnapshotsRemover.JobSnapshotId(null, null); - assertTrue(id.hasNullValue()); - } - private ExpiredModelSnapshotsRemover createExpiredModelSnapshotsRemover(Iterator jobIterator) { ThreadPool threadPool = mock(ThreadPool.class); ExecutorService executor = mock(ExecutorService.class); @@ -253,7 +279,13 @@ private 
ExpiredModelSnapshotsRemover createExpiredModelSnapshotsRemover(Iterator return null; } ).when(executor).execute(any()); - return new ExpiredModelSnapshotsRemover(originSettingClient, jobIterator, threadPool, new TaskId("test", 0L)); + return new ExpiredModelSnapshotsRemover( + originSettingClient, + jobIterator, + threadPool, + new TaskId("test", 0L), + resultsProvider, + mock(AnomalyDetectionAuditor.class)); } private static ModelSnapshot createModelSnapshot(String jobId, String snapshotId, Date date) { @@ -269,21 +301,23 @@ private static SearchHit createModelSnapshotQueryHit(String jobId, String snapsh return hitBuilder.build(); } - private void givenClientRequestsSucceed(List searchResponses) { - givenClientRequests(searchResponses, true, true); + private void givenClientRequestsSucceed(List searchResponses, List modelSnapshots) { + givenClientRequests(searchResponses, true, true, modelSnapshots); } - private void givenClientSearchRequestsFail(List searchResponses) { - givenClientRequests(searchResponses, false, true); + private void givenClientSearchRequestsFail(List searchResponses, List snapshots) { + givenClientRequests(searchResponses, false, true, snapshots); } - private void givenClientDeleteModelSnapshotRequestsFail(List searchResponses) { - givenClientRequests(searchResponses, true, false); + private void givenClientDeleteModelSnapshotRequestsFail(List searchResponses, List snapshots) { + givenClientRequests(searchResponses, true, false, snapshots); } @SuppressWarnings("unchecked") private void givenClientRequests(List searchResponses, - boolean shouldSearchRequestsSucceed, boolean shouldDeleteSnapshotRequestsSucceed) { + boolean shouldSearchRequestsSucceed, + boolean shouldDeleteSnapshotRequestsSucceed, + List snapshots) { doAnswer(new Answer() { AtomicInteger callCount = new AtomicInteger(); @@ -306,7 +340,7 @@ public Void answer(InvocationOnMock invocationOnMock) { }).when(client).execute(same(SearchAction.INSTANCE), any(), any()); 
doAnswer(invocationOnMock -> { - capturedDeleteModelSnapshotRequests.add((DeleteModelSnapshotAction.Request) invocationOnMock.getArguments()[1]); + capturedDeleteModelSnapshotRequests.add((DeleteByQueryRequest) invocationOnMock.getArguments()[1]); ActionListener listener = (ActionListener) invocationOnMock.getArguments()[2]; if (shouldDeleteSnapshotRequestsSucceed) { @@ -316,6 +350,15 @@ public Void answer(InvocationOnMock invocationOnMock) { } return null; } - ).when(client).execute(same(DeleteModelSnapshotAction.INSTANCE), any(), any()); + ).when(client).execute(same(DeleteByQueryAction.INSTANCE), any(), any()); + + for (ModelSnapshot snapshot : snapshots) { + doAnswer(invocationOnMock -> { + Consumer> listener = (Consumer>) invocationOnMock.getArguments()[2]; + listener.accept(new Result<>("", snapshot)); + return null; + }).when(resultsProvider).getModelSnapshot(eq(snapshot.getJobId()), eq(snapshot.getSnapshotId()), any(), any()); + } + } } From 371c61b1821acb06e02fd0d748167b8a38e5f140 Mon Sep 17 00:00:00 2001 From: Benjamin Trent <4357155+benwtrent@users.noreply.github.com> Date: Thu, 1 Oct 2020 12:41:02 -0400 Subject: [PATCH 2/6] addressing pr comments --- .../xpack/core/ml/job/messages/Messages.java | 2 +- .../retention/ExpiredModelSnapshotsRemover.java | 15 ++++++++------- 2 files changed, 9 insertions(+), 8 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/messages/Messages.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/messages/Messages.java index 222aafbfde479..ad885062665eb 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/messages/Messages.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/messages/Messages.java @@ -144,7 +144,7 @@ public final class Messages { public static final String JOB_AUDIT_SNAPSHOT_STORED = "Job model snapshot with id [{0}] stored"; public static final String JOB_AUDIT_REVERTED = "Job model 
snapshot reverted to ''{0}''"; public static final String JOB_AUDIT_SNAPSHOT_DELETED = "Model snapshot [{0}] with description ''{1}'' deleted"; - public static final String JOB_AUDIT_SNAPSHOTS_DELETED = "Model snapshots {0} with descriptions {1} deleted"; + public static final String JOB_AUDIT_SNAPSHOTS_DELETED = "[{0}] expired model snapshots deleted"; public static final String JOB_AUDIT_FILTER_UPDATED_ON_PROCESS = "Updated filter [{0}] in running process"; public static final String JOB_AUDIT_CALENDARS_UPDATED_ON_PROCESS = "Updated calendars in running process"; public static final String JOB_AUDIT_MEMORY_STATUS_SOFT_LIMIT = "Job memory status changed to soft_limit; memory pruning will now be " + diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemover.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemover.java index 84be7721485dc..a46452bf3afe5 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemover.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemover.java @@ -42,6 +42,7 @@ import java.util.List; import java.util.Objects; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; /** * Deletes all model snapshots that have expired the configured retention time @@ -255,13 +256,13 @@ private void deleteModelSnapshots(List modelSnapshots, String job JobDataDeleter deleter = new JobDataDeleter(client, jobId); deleter.deleteModelSnapshots(modelSnapshots, ActionListener.wrap( bulkResponse -> { - String msg = Messages.getMessage( - Messages.JOB_AUDIT_SNAPSHOTS_DELETED, - modelSnapshots.stream().map(ModelSnapshot::getSnapshotId), - modelSnapshots.stream().map(ModelSnapshot::getDescription)); - - auditor.info(jobId, msg); - LOGGER.debug(() -> new ParameterizedMessage("[{}] {}", jobId, msg)); + auditor.info(jobId, 
Messages.getMessage(Messages.JOB_AUDIT_SNAPSHOTS_DELETED, modelSnapshots.size())); + LOGGER.debug(() -> new ParameterizedMessage( + "[{}] deleted model snapshots {} with descriptions {}", + jobId, + modelSnapshots.stream().map(ModelSnapshot::getSnapshotId).collect(Collectors.toList()), + modelSnapshots.stream().map(ModelSnapshot::getDescription).collect(Collectors.toList()) + )); listener.onResponse(true); }, listener::onFailure From 474670fc65225d9fb2efd4648d763f9b3749030c Mon Sep 17 00:00:00 2001 From: Benjamin Trent <4357155+benwtrent@users.noreply.github.com> Date: Thu, 1 Oct 2020 14:13:28 -0400 Subject: [PATCH 3/6] addressing asynchronicity and reducing number of queries --- .../ExpiredModelSnapshotsRemover.java | 78 ++++------ .../ExpiredModelSnapshotsRemoverTests.java | 146 +++++++++++------- 2 files changed, 112 insertions(+), 112 deletions(-) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemover.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemover.java index a46452bf3afe5..97f91f3987557 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemover.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemover.java @@ -10,9 +10,7 @@ import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.search.SearchAction; import org.elasticsearch.action.search.SearchRequest; -import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.support.ThreadedActionListener; import org.elasticsearch.client.OriginSettingClient; import org.elasticsearch.common.unit.TimeValue; @@ -25,12 +23,12 @@ import org.elasticsearch.search.sort.SortOrder; import org.elasticsearch.tasks.TaskId; import 
org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.xpack.core.action.util.QueryPage; import org.elasticsearch.xpack.core.common.time.TimeUtils; import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.messages.Messages; import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot; -import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshotField; import org.elasticsearch.xpack.ml.MachineLearning; import org.elasticsearch.xpack.ml.job.persistence.JobDataDeleter; import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider; @@ -170,72 +168,48 @@ protected void removeDataBefore( job.getId(), cutoffEpochMs)); - SearchRequest searchRequest = new SearchRequest(); - searchRequest.indices(AnomalyDetectorsIndex.jobResultsAliasedName(job.getId())); - - QueryBuilder activeSnapshotFilter = QueryBuilders.termQuery( - ModelSnapshotField.SNAPSHOT_ID.getPreferredName(), job.getModelSnapshotId()); - QueryBuilder retainFilter = QueryBuilders.termQuery(ModelSnapshot.RETAIN.getPreferredName(), true); - QueryBuilder query = createQuery(job.getId(), cutoffEpochMs) - .filter(QueryBuilders.existsQuery(ModelSnapshot.SNAPSHOT_DOC_COUNT.getPreferredName())) - .mustNot(activeSnapshotFilter) - .mustNot(retainFilter); - - SearchSourceBuilder source = new SearchSourceBuilder(); - source.query(query); - source.size(MODEL_SNAPSHOT_SEARCH_SIZE); - source.sort(ModelSnapshot.TIMESTAMP.getPreferredName()); - source.fetchSource(false); - source.docValueField(Job.ID.getPreferredName(), null); - source.docValueField(ModelSnapshotField.SNAPSHOT_ID.getPreferredName(), null); - source.docValueField(ModelSnapshot.TIMESTAMP.getPreferredName(), "epoch_millis"); - searchRequest.source(source); - searchRequest.setParentTask(getParentTaskId()); - long deleteAllBeforeMs = (job.getModelSnapshotRetentionDays() == null) ? 
0 : latestTimeMs - TimeValue.timeValueDays(job.getModelSnapshotRetentionDays()).getMillis(); - client.execute(SearchAction.INSTANCE, searchRequest, new ThreadedActionListener<>(LOGGER, threadPool, - MachineLearning.UTILITY_THREAD_POOL_NAME, expiredSnapshotsListener(job.getId(), deleteAllBeforeMs, listener), false)); + ActionListener> snapshotsListener = expiredSnapshotsListener(job, deleteAllBeforeMs, listener); + jobResultsProvider.modelSnapshots(job.getId(), + 0, + MODEL_SNAPSHOT_SEARCH_SIZE, + null, + String.valueOf(cutoffEpochMs), + ModelSnapshot.TIMESTAMP.getPreferredName(), + true, + null, + snapshotsListener::onResponse, + snapshotsListener::onFailure); } - private ActionListener expiredSnapshotsListener(String jobId, - long deleteAllBeforeMs, - ActionListener listener) { + private ActionListener> expiredSnapshotsListener(Job job, + long deleteAllBeforeMs, + ActionListener listener) { return new ActionListener<>() { @Override - public void onResponse(SearchResponse searchResponse) { + public void onResponse(QueryPage searchResponse) { long nextToKeepMs = deleteAllBeforeMs; try { List snapshots = new ArrayList<>(); - for (SearchHit hit : searchResponse.getHits()) { - String timestamp = stringFieldValueOrNull(hit, ModelSnapshot.TIMESTAMP.getPreferredName()); - if (timestamp == null) { - LOGGER.warn("Model snapshot document [{}] has a null timestamp field", hit.getId()); + for (ModelSnapshot snapshot: searchResponse.results()) { + if (snapshot.getSnapshotId().equals(job.getModelSnapshotId())) { continue; } - long timestampMs = TimeUtils.parseToEpochMs(timestamp); + if (snapshot.getTimestamp() == null) { + LOGGER.warn("Model snapshot document [{}] has a null timestamp field", snapshot.getSnapshotId()); + continue; + } + long timestampMs = snapshot.getTimestamp().getTime(); if (timestampMs >= nextToKeepMs) { do { nextToKeepMs += MS_IN_ONE_DAY; } while (timestampMs >= nextToKeepMs); continue; } - String jobId = stringFieldValueOrNull(hit, 
Job.ID.getPreferredName()); - String snapshotId = stringFieldValueOrNull(hit, ModelSnapshotField.SNAPSHOT_ID.getPreferredName()); - - if (jobId != null && snapshotId != null) { - jobResultsProvider.getModelSnapshot( - jobId, - snapshotId, - // We are safe to grab this snapshot as the query is by DOC ID and we already filtered out the - // currently active snapshot earlier in the call chain - (shots) -> snapshots.add(shots.result), - (failure) -> - LOGGER.warn(new ParameterizedMessage("[{}] failed to find snapshot [{}]", jobId, snapshotId), failure) - ); - } + snapshots.add(snapshot); } - deleteModelSnapshots(snapshots, jobId, listener); + deleteModelSnapshots(snapshots, job.getId(), listener); } catch (Exception e) { onFailure(e); } @@ -243,7 +217,7 @@ public void onResponse(SearchResponse searchResponse) { @Override public void onFailure(Exception e) { - listener.onFailure(new ElasticsearchException("[{}] Search for expired snapshots failed", e, jobId)); + listener.onFailure(new ElasticsearchException("[{}] Search for expired snapshots failed", e, job.getId())); } }; } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemoverTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemoverTests.java index 4f48024b53dd8..6e2585c933698 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemoverTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemoverTests.java @@ -12,6 +12,7 @@ import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.client.Client; import org.elasticsearch.client.OriginSettingClient; +import org.elasticsearch.common.ParseField; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.query.IdsQueryBuilder; import org.elasticsearch.index.reindex.DeleteByQueryAction; @@ -21,13 
+22,13 @@ import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.ClientHelper; +import org.elasticsearch.xpack.core.action.util.QueryPage; import org.elasticsearch.xpack.core.ml.annotations.AnnotationIndex; import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.config.JobTests; import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshotField; -import org.elasticsearch.xpack.core.ml.job.results.Result; import org.elasticsearch.xpack.ml.MachineLearning; import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider; import org.elasticsearch.xpack.ml.notifications.AnomalyDetectionAuditor; @@ -42,8 +43,10 @@ import java.util.Arrays; import java.util.Collections; import java.util.Date; +import java.util.HashMap; import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; @@ -53,6 +56,8 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyBoolean; +import static org.mockito.Matchers.anyInt; import static org.mockito.Matchers.eq; import static org.mockito.Matchers.same; import static org.mockito.Mockito.doAnswer; @@ -66,13 +71,13 @@ public class ExpiredModelSnapshotsRemoverTests extends ESTestCase { private Client client; private JobResultsProvider resultsProvider; private OriginSettingClient originSettingClient; - private List capturedSearchRequests; + private List capturedJobIds; private List capturedDeleteModelSnapshotRequests; private TestListener listener; @Before public void setUpTests() { - capturedSearchRequests = 
new ArrayList<>(); + capturedJobIds = new ArrayList<>(); capturedDeleteModelSnapshotRequests = new ArrayList<>(); client = mock(Client.class); @@ -87,7 +92,7 @@ public void testRemove_GivenJobWithoutActiveSnapshot() throws IOException { List responses = Collections.singletonList( AbstractExpiredJobDataRemoverTests.createSearchResponse(Collections.emptyList())); - givenClientRequestsSucceed(responses, Collections.emptyList()); + givenClientRequestsSucceed(responses, Collections.emptyMap()); createExpiredModelSnapshotsRemover(jobs.iterator()).remove(1.0f, listener, () -> false); @@ -107,34 +112,33 @@ public void testRemove_GivenJobsWithMixedRetentionPolicies() { Date oneDayAgo = new Date(now.getTime() - TimeValue.timeValueDays(1).getMillis()); SearchHit snapshot1_1 = createModelSnapshotQueryHit("job-1", "fresh-snapshot", oneDayAgo); searchResponses.add(AbstractExpiredJobDataRemoverTests.createSearchResponseFromHits(Collections.singletonList(snapshot1_1))); + SearchHit snapshot2_1 = createModelSnapshotQueryHit("job-2", "fresh-snapshot", oneDayAgo); + searchResponses.add(AbstractExpiredJobDataRemoverTests.createSearchResponseFromHits(Collections.singletonList(snapshot2_1))); // It needs to be strictly more than 7 days before the most recent snapshot, hence the extra millisecond Date eightDaysAndOneMsAgo = new Date(now.getTime() - TimeValue.timeValueDays(8).getMillis() - 1); - SearchHit snapshotToBeDeleted = createModelSnapshotQueryHit("job-1", "old-snapshot", eightDaysAndOneMsAgo); - - - searchResponses.add( - AbstractExpiredJobDataRemoverTests.createSearchResponseFromHits(Collections.singletonList(snapshotToBeDeleted))); - - SearchHit snapshot2_1 = createModelSnapshotQueryHit("job-1", "snapshots-1_1", eightDaysAndOneMsAgo); - searchResponses.add(AbstractExpiredJobDataRemoverTests.createSearchResponseFromHits(Collections.singletonList(snapshot2_1))); - searchResponses.add(AbstractExpiredJobDataRemoverTests.createSearchResponseFromHits(Collections.emptyList())); - - 
givenClientRequestsSucceed(searchResponses, Arrays.asList( - createModelSnapshot("job-1", "snapshots-1_1", eightDaysAndOneMsAgo), - createModelSnapshot("job-1", "fresh-snapshot", oneDayAgo), - createModelSnapshot("job-1", "old-snapshot", eightDaysAndOneMsAgo) - )); + Map> snapshotResponses = new HashMap<>(); + snapshotResponses.put("job-1", + Arrays.asList( + createModelSnapshot("job-1", "active", oneDayAgo), + createModelSnapshot("job-1", "fresh-snapshot", oneDayAgo), + createModelSnapshot("job-1", "old-snapshot", eightDaysAndOneMsAgo) + )); + snapshotResponses.put("job-2", + Arrays.asList( + createModelSnapshot("job-2", "active", oneDayAgo), + createModelSnapshot("job-2", "snapshots-1_1", eightDaysAndOneMsAgo), + createModelSnapshot("job-2", "fresh-snapshot", oneDayAgo) + )); + givenClientRequestsSucceed(searchResponses, snapshotResponses); createExpiredModelSnapshotsRemover(jobs.iterator()).remove(1.0f, listener, () -> false); listener.waitToCompletion(); assertThat(listener.success, is(true)); - assertThat(capturedSearchRequests.size(), equalTo(4)); - SearchRequest searchRequest = capturedSearchRequests.get(1); - assertThat(searchRequest.indices(), equalTo(new String[] {AnomalyDetectorsIndex.jobResultsAliasedName("job-1")})); - searchRequest = capturedSearchRequests.get(3); - assertThat(searchRequest.indices(), equalTo(new String[] {AnomalyDetectorsIndex.jobResultsAliasedName("job-2")})); + assertThat(capturedJobIds.size(), equalTo(2)); + assertThat(capturedJobIds.get(0), equalTo("job-1")); + assertThat(capturedJobIds.get(1), equalTo("job-2")); assertThat(capturedDeleteModelSnapshotRequests.size(), equalTo(1)); DeleteByQueryRequest deleteSnapshotRequest = capturedDeleteModelSnapshotRequests.get(0); @@ -161,12 +165,18 @@ public void testRemove_GivenTimeout() throws IOException { List snapshots2JobSnapshots = Collections.singletonList(createModelSnapshot("snapshots-2", "snapshots-2_1", now)); 
searchResponses.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(snapshots1JobSnapshots)); searchResponses.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(snapshots2JobSnapshots)); + HashMap> snapshots = new HashMap<>(); - givenClientRequestsSucceed(searchResponses, Arrays.asList( + snapshots.put("snapshots-1", Arrays.asList( + createModelSnapshot("snapshots-1", "active", now), createModelSnapshot("snapshots-1", "snapshots-1_1", now), - createModelSnapshot("snapshots-1", "snapshots-1_2", now), + createModelSnapshot("snapshots-1", "snapshots-1_2", now) + )); + snapshots.put("snapshots-2", Arrays.asList( + createModelSnapshot("snapshots-2", "active", now), createModelSnapshot("snapshots-2", "snapshots-2_1", now) )); + givenClientRequestsSucceed(searchResponses, snapshots); final int timeoutAfter = randomIntBetween(0, 1); AtomicInteger attemptsLeft = new AtomicInteger(timeoutAfter); @@ -184,16 +194,12 @@ public void testRemove_GivenClientSearchRequestsFail() { JobTests.buildJobBuilder("snapshots-2").setModelSnapshotRetentionDays(17L).setModelSnapshotId("active").build() ); - givenClientSearchRequestsFail(searchResponses, Collections.emptyList()); + givenClientSearchRequestsFail(searchResponses, Collections.emptyMap()); createExpiredModelSnapshotsRemover(jobs.iterator()).remove(1.0f, listener, () -> false); listener.waitToCompletion(); assertThat(listener.success, is(false)); - assertThat(capturedSearchRequests.size(), equalTo(1)); - SearchRequest searchRequest = capturedSearchRequests.get(0); - assertThat(searchRequest.indices(), equalTo(new String[] {AnomalyDetectorsIndex.jobResultsAliasedName("snapshots-1")})); - assertThat(capturedDeleteModelSnapshotRequests.size(), equalTo(0)); } @@ -206,32 +212,31 @@ public void testRemove_GivenClientDeleteSnapshotRequestsFail() { Date now = new Date(); Date oneDayAgo = new Date(new Date().getTime() - TimeValue.timeValueDays(1).getMillis()); + Date eightDaysAndOneMsAgo = new Date(now.getTime() - 
TimeValue.timeValueDays(8).getMillis() - 1); SearchHit snapshot1_1 = createModelSnapshotQueryHit("snapshots-1", "snapshots-1_1", oneDayAgo); searchResponses.add(AbstractExpiredJobDataRemoverTests.createSearchResponseFromHits(Collections.singletonList(snapshot1_1))); - - // It needs to be strictly more than 7 days before the most recent snapshot, hence the extra millisecond - Date eightDaysAndOneMsAgo = new Date(now.getTime() - TimeValue.timeValueDays(8).getMillis() - 1); - List snapshots1JobSnapshots = Arrays.asList( - snapshot1_1, - createModelSnapshotQueryHit("snapshots-1", "snapshots-1_2", eightDaysAndOneMsAgo)); - searchResponses.add(AbstractExpiredJobDataRemoverTests.createSearchResponseFromHits(snapshots1JobSnapshots)); + Map> snapshots = new HashMap<>(); + snapshots.put("snapshots-1", Arrays.asList( + createModelSnapshot("snapshots-1", "active", oneDayAgo), + createModelSnapshot("snapshots-1", "snapshots-1_1", oneDayAgo), + createModelSnapshot("snapshots-1", "snapshots-1_2", eightDaysAndOneMsAgo) + )); + snapshots.put("snapshots-2", Arrays.asList( + createModelSnapshot("snapshots-2", "active", eightDaysAndOneMsAgo), + createModelSnapshot("snapshots-2", "snapshots-2_1", eightDaysAndOneMsAgo) + )); SearchHit snapshot2_2 = createModelSnapshotQueryHit("snapshots-2", "snapshots-2_1", eightDaysAndOneMsAgo); searchResponses.add(AbstractExpiredJobDataRemoverTests.createSearchResponseFromHits(Collections.singletonList(snapshot2_2))); - givenClientDeleteModelSnapshotRequestsFail(searchResponses, Arrays.asList( - createModelSnapshot("snapshots-1", "snapshots-1_1", oneDayAgo), - createModelSnapshot("snapshots-1", "snapshots-1_2", eightDaysAndOneMsAgo), - createModelSnapshot("snapshots-2", "snapshots-2_1", eightDaysAndOneMsAgo) - )); + givenClientDeleteModelSnapshotRequestsFail(searchResponses, snapshots); createExpiredModelSnapshotsRemover(jobs.iterator()).remove(1.0f, listener, () -> false); listener.waitToCompletion(); assertThat(listener.success, is(false)); - 
assertThat(capturedSearchRequests.size(), equalTo(2)); - SearchRequest searchRequest = capturedSearchRequests.get(1); - assertThat(searchRequest.indices(), equalTo(new String[] {AnomalyDetectorsIndex.jobResultsAliasedName("snapshots-1")})); + assertThat(capturedJobIds.size(), equalTo(1)); + assertThat(capturedJobIds.get(0), equalTo("snapshots-1")); assertThat(capturedDeleteModelSnapshotRequests.size(), equalTo(1)); DeleteByQueryRequest deleteSnapshotRequest = capturedDeleteModelSnapshotRequests.get(0); @@ -256,7 +261,7 @@ public void testCalcCutoffEpochMs() { givenClientRequests(searchResponses, true, true, - Collections.singletonList(createModelSnapshot("job-1", "newest-snapshot", oneDayAgo))); + Collections.singletonMap("job-1", Collections.singletonList(createModelSnapshot("job-1", "newest-snapshot", oneDayAgo)))); long retentionDays = 3L; ActionListener cutoffListener = mock(ActionListener.class); @@ -301,15 +306,18 @@ private static SearchHit createModelSnapshotQueryHit(String jobId, String snapsh return hitBuilder.build(); } - private void givenClientRequestsSucceed(List searchResponses, List modelSnapshots) { - givenClientRequests(searchResponses, true, true, modelSnapshots); + private void givenClientRequestsSucceed(List searchResponses, + Map> snapshots) { + givenClientRequests(searchResponses, true, true, snapshots); } - private void givenClientSearchRequestsFail(List searchResponses, List snapshots) { + private void givenClientSearchRequestsFail(List searchResponses, + Map> snapshots) { givenClientRequests(searchResponses, false, true, snapshots); } - private void givenClientDeleteModelSnapshotRequestsFail(List searchResponses, List snapshots) { + private void givenClientDeleteModelSnapshotRequestsFail(List searchResponses, + Map> snapshots) { givenClientRequests(searchResponses, true, false, snapshots); } @@ -317,7 +325,7 @@ private void givenClientDeleteModelSnapshotRequestsFail(List sea private void givenClientRequests(List searchResponses, boolean 
shouldSearchRequestsSucceed, boolean shouldDeleteSnapshotRequestsSucceed, - List snapshots) { + Map> snapshots) { doAnswer(new Answer() { AtomicInteger callCount = new AtomicInteger(); @@ -327,9 +335,8 @@ public Void answer(InvocationOnMock invocationOnMock) { ActionListener listener = (ActionListener) invocationOnMock.getArguments()[2]; SearchRequest searchRequest = (SearchRequest) invocationOnMock.getArguments()[1]; - capturedSearchRequests.add(searchRequest); // Only the last search request should fail - if (shouldSearchRequestsSucceed || callCount.get() < searchResponses.size()) { + if (shouldSearchRequestsSucceed || callCount.get() < (searchResponses.size() + snapshots.size())) { SearchResponse response = searchResponses.get(callCount.getAndIncrement()); listener.onResponse(response); } else { @@ -352,12 +359,31 @@ public Void answer(InvocationOnMock invocationOnMock) { } ).when(client).execute(same(DeleteByQueryAction.INSTANCE), any(), any()); - for (ModelSnapshot snapshot : snapshots) { - doAnswer(invocationOnMock -> { - Consumer> listener = (Consumer>) invocationOnMock.getArguments()[2]; - listener.accept(new Result<>("", snapshot)); - return null; - }).when(resultsProvider).getModelSnapshot(eq(snapshot.getJobId()), eq(snapshot.getSnapshotId()), any(), any()); + for (Map.Entry> snapshot : snapshots.entrySet()) { + doAnswer(new Answer() { + AtomicInteger callCount = new AtomicInteger(); + @Override + public Void answer(InvocationOnMock invocationOnMock) throws Throwable { + capturedJobIds.add((String)invocationOnMock.getArguments()[0]); + Consumer> listener = (Consumer>) invocationOnMock.getArguments()[8]; + Consumer failure = (Consumer) invocationOnMock.getArguments()[9]; + if (shouldSearchRequestsSucceed || callCount.get() < snapshots.size()) { + callCount.incrementAndGet(); + listener.accept(new QueryPage<>(snapshot.getValue(), 10, new ParseField("snapshots"))); + } else { + failure.accept(new RuntimeException("search failed")); + } + return null; } + 
}).when(resultsProvider).modelSnapshots(eq(snapshot.getKey()), + anyInt(), + anyInt(), + any(), + any(), + any(), + anyBoolean(), + any(), + any(), + any()); } } From 49e5ea913b96a603b584809f7f9ef4a61933bd47 Mon Sep 17 00:00:00 2001 From: Benjamin Trent <4357155+benwtrent@users.noreply.github.com> Date: Fri, 2 Oct 2020 07:41:03 -0400 Subject: [PATCH 4/6] respect retain --- .../core/ml/job/process/autodetect/state/ModelSnapshot.java | 4 ++++ .../xpack/ml/job/retention/ExpiredModelSnapshotsRemover.java | 3 ++- 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/process/autodetect/state/ModelSnapshot.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/process/autodetect/state/ModelSnapshot.java index 4e4d58105c494..80b4eab384e41 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/process/autodetect/state/ModelSnapshot.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/process/autodetect/state/ModelSnapshot.java @@ -238,6 +238,10 @@ public Date getLatestResultTimeStamp() { return latestResultTimeStamp; } + public boolean isRetain() { + return retain; + } + @Override public int hashCode() { return Objects.hash(jobId, minVersion, timestamp, description, snapshotId, quantiles, snapshotDocCount, modelSizeStats, diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemover.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemover.java index 97f91f3987557..dc7d1afbb017a 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemover.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemover.java @@ -193,7 +193,8 @@ public void onResponse(QueryPage searchResponse) { try { List snapshots = new ArrayList<>(); for 
(ModelSnapshot snapshot: searchResponse.results()) { - if (snapshot.getSnapshotId().equals(job.getModelSnapshotId())) { + // We don't want to delete the currently used snapshot or a snapshot marked to be retained + if (snapshot.getSnapshotId().equals(job.getModelSnapshotId()) || snapshot.isRetain()) { continue; } if (snapshot.getTimestamp() == null) { From 5f7028394ab807f5fe2764f2cd7944e5b85bd02e Mon Sep 17 00:00:00 2001 From: Benjamin Trent <4357155+benwtrent@users.noreply.github.com> Date: Fri, 2 Oct 2020 08:00:41 -0400 Subject: [PATCH 5/6] fixing sort order --- .../xpack/ml/job/retention/ExpiredModelSnapshotsRemover.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemover.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemover.java index dc7d1afbb017a..7b416dd1b2dee 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemover.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemover.java @@ -177,7 +177,7 @@ protected void removeDataBefore( null, String.valueOf(cutoffEpochMs), ModelSnapshot.TIMESTAMP.getPreferredName(), - true, + false, null, snapshotsListener::onResponse, snapshotsListener::onFailure); From 1bdb6b53d8731b3890add41e0efe8e2cacc24e74 Mon Sep 17 00:00:00 2001 From: Benjamin Trent <4357155+benwtrent@users.noreply.github.com> Date: Fri, 2 Oct 2020 10:02:54 -0400 Subject: [PATCH 6/6] adjusting tests --- .../ml/integration/DeleteExpiredDataIT.java | 1 - .../ExpiredModelSnapshotsRemoverTests.java | 37 +++++++------------ 2 files changed, 13 insertions(+), 25 deletions(-) diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/DeleteExpiredDataIT.java 
b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/DeleteExpiredDataIT.java index 84dc8f84ead49..06ee0e48b6fd3 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/DeleteExpiredDataIT.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/DeleteExpiredDataIT.java @@ -137,7 +137,6 @@ public void testDeleteExpiredDataActionDeletesEmptyStateIndices() throws Excepti is(arrayContaining(".ml-state-000001", ".ml-state-000005", ".ml-state-000007"))); } - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/62699") public void testDeleteExpiredDataWithStandardThrottle() throws Exception { testExpiredDeletion(-1.0f, 100); } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemoverTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemoverTests.java index 6e2585c933698..bf3aa6e1849c7 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemoverTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemoverTests.java @@ -120,16 +120,13 @@ public void testRemove_GivenJobsWithMixedRetentionPolicies() { Map> snapshotResponses = new HashMap<>(); snapshotResponses.put("job-1", Arrays.asList( - createModelSnapshot("job-1", "active", oneDayAgo), - createModelSnapshot("job-1", "fresh-snapshot", oneDayAgo), + // Keeping active as its expiration is not known. 
We can assume "worst case" and verify it is not removed + createModelSnapshot("job-1", "active", eightDaysAndOneMsAgo), createModelSnapshot("job-1", "old-snapshot", eightDaysAndOneMsAgo) )); - snapshotResponses.put("job-2", - Arrays.asList( - createModelSnapshot("job-2", "active", oneDayAgo), - createModelSnapshot("job-2", "snapshots-1_1", eightDaysAndOneMsAgo), - createModelSnapshot("job-2", "fresh-snapshot", oneDayAgo) - )); + // Retention days for job-2 is 17 days, consequently, its query should not return anything as we don't ask for snapshots + // created AFTER 17 days ago + snapshotResponses.put("job-2", Collections.emptyList()); givenClientRequestsSucceed(searchResponses, snapshotResponses); createExpiredModelSnapshotsRemover(jobs.iterator()).remove(1.0f, listener, () -> false); @@ -167,15 +164,10 @@ public void testRemove_GivenTimeout() throws IOException { searchResponses.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(snapshots2JobSnapshots)); HashMap> snapshots = new HashMap<>(); - snapshots.put("snapshots-1", Arrays.asList( - createModelSnapshot("snapshots-1", "active", now), - createModelSnapshot("snapshots-1", "snapshots-1_1", now), - createModelSnapshot("snapshots-1", "snapshots-1_2", now) - )); - snapshots.put("snapshots-2", Arrays.asList( - createModelSnapshot("snapshots-2", "active", now), - createModelSnapshot("snapshots-2", "snapshots-2_1", now) - )); + // All snapshots are "now" and the retention days is much longer than that.
+ // So, getting the snapshots should return empty for both + snapshots.put("snapshots-1", Collections.emptyList()); + snapshots.put("snapshots-2", Collections.emptyList()); givenClientRequestsSucceed(searchResponses, snapshots); final int timeoutAfter = randomIntBetween(0, 1); @@ -216,15 +208,12 @@ public void testRemove_GivenClientDeleteSnapshotRequestsFail() { SearchHit snapshot1_1 = createModelSnapshotQueryHit("snapshots-1", "snapshots-1_1", oneDayAgo); searchResponses.add(AbstractExpiredJobDataRemoverTests.createSearchResponseFromHits(Collections.singletonList(snapshot1_1))); Map> snapshots = new HashMap<>(); - snapshots.put("snapshots-1", Arrays.asList( - createModelSnapshot("snapshots-1", "active", oneDayAgo), - createModelSnapshot("snapshots-1", "snapshots-1_1", oneDayAgo), + // Should only return the one from 8 days ago + snapshots.put("snapshots-1", Collections.singletonList( createModelSnapshot("snapshots-1", "snapshots-1_2", eightDaysAndOneMsAgo) )); - snapshots.put("snapshots-2", Arrays.asList( - createModelSnapshot("snapshots-2", "active", eightDaysAndOneMsAgo), - createModelSnapshot("snapshots-2", "snapshots-2_1", eightDaysAndOneMsAgo) - )); + // Shouldn't return anything as retention is 17 days + snapshots.put("snapshots-2", Collections.emptyList()); SearchHit snapshot2_2 = createModelSnapshotQueryHit("snapshots-2", "snapshots-2_1", eightDaysAndOneMsAgo); searchResponses.add(AbstractExpiredJobDataRemoverTests.createSearchResponseFromHits(Collections.singletonList(snapshot2_2)));