diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredForecastsRemover.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredForecastsRemover.java index bce61cb4c23eb..d82dd1a063dff 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredForecastsRemover.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredForecastsRemover.java @@ -14,11 +14,7 @@ import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.support.ThreadedActionListener; import org.elasticsearch.client.Client; -import org.elasticsearch.common.xcontent.LoggingDeprecationHandler; -import org.elasticsearch.common.xcontent.NamedXContentRegistry; -import org.elasticsearch.common.xcontent.XContentFactory; -import org.elasticsearch.common.xcontent.XContentParser; -import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.common.document.DocumentField; import org.elasticsearch.index.query.BoolQueryBuilder; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.QueryBuilders; @@ -39,8 +35,6 @@ import org.joda.time.DateTime; import org.joda.time.chrono.ISOChronology; -import java.io.IOException; -import java.io.InputStream; import java.util.ArrayList; import java.util.List; import java.util.Objects; @@ -83,6 +77,10 @@ public void remove(ActionListener listener, Supplier isTimedOu .filter(QueryBuilders.termQuery(Result.RESULT_TYPE.getPreferredName(), ForecastRequestStats.RESULT_TYPE_VALUE)) .filter(QueryBuilders.existsQuery(ForecastRequestStats.EXPIRY_TIME.getPreferredName()))); source.size(MAX_FORECASTS); + source.fetchSource(false); + source.docValueField(Job.ID.getPreferredName(), null); + source.docValueField(ForecastRequestStats.FORECAST_ID.getPreferredName(), null); + source.docValueField(ForecastRequestStats.EXPIRY_TIME.getPreferredName(), "epoch_millis"); // _doc is the most efficient sort order and will also disable scoring source.sort(ElasticsearchMappings.ES_DOC); @@ -94,11 +92,9 @@ public void remove(ActionListener listener, Supplier isTimedOu } private void deleteForecasts(SearchResponse searchResponse, ActionListener listener, Supplier isTimedOutSupplier) { - List forecastsToDelete; - try { - forecastsToDelete = findForecastsToDelete(searchResponse); - } catch (IOException e) { - listener.onFailure(e); + List forecastsToDelete = findForecastsToDelete(searchResponse); + if (forecastsToDelete.isEmpty()) { + listener.onResponse(true); return; } @@ -129,8 +125,8 @@ public void onFailure(Exception e) { }); } - private List findForecastsToDelete(SearchResponse searchResponse) throws IOException { - List forecastsToDelete = new ArrayList<>(); + private List findForecastsToDelete(SearchResponse searchResponse) { + List forecastsToDelete = new ArrayList<>(); SearchHits hits = searchResponse.getHits(); if (hits.getTotalHits() > MAX_FORECASTS) { @@ -138,19 +134,34 @@ private List findForecastsToDelete(SearchResponse searchRe } for (SearchHit hit : hits.getHits()) { - try (InputStream stream = hit.getSourceRef().streamInput(); - XContentParser parser = XContentFactory.xContent(XContentType.JSON).createParser( - NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, stream)) { - ForecastRequestStats forecastRequestStats = ForecastRequestStats.LENIENT_PARSER.apply(parser, null); - if (forecastRequestStats.getExpiryTime().toEpochMilli() < cutoffEpochMs) { - forecastsToDelete.add(forecastRequestStats); + DocumentField docField = hit.field(ForecastRequestStats.EXPIRY_TIME.getPreferredName()); + if (docField == null) { + LOGGER.warn("Forecast request stats document [{}] has a null [{}] field", hit.getId(), + ForecastRequestStats.EXPIRY_TIME.getPreferredName()); + continue; + } + + Long expiryMs = parseDateField(docField.getValue()); + if (expiryMs == null) { + LOGGER.warn("Forecast request stats document [{}] date field [{}] cannot be parsed", hit.getId(), + ForecastRequestStats.EXPIRY_TIME.getPreferredName()); + continue; + } + + if (expiryMs < cutoffEpochMs) { + JobForecastId idPair = new JobForecastId( + stringFieldValueOrNull(hit, Job.ID.getPreferredName()), + stringFieldValueOrNull(hit, Forecast.FORECAST_ID.getPreferredName())); + + if (idPair.hasNullValue() == false) { + forecastsToDelete.add(idPair); } } } return forecastsToDelete; } - private DeleteByQueryRequest buildDeleteByQuery(List forecastsToDelete) { + private DeleteByQueryRequest buildDeleteByQuery(List ids) { DeleteByQueryRequest request = new DeleteByQueryRequest(); request.setSlices(5); @@ -158,10 +169,12 @@ private DeleteByQueryRequest buildDeleteByQuery(List forec BoolQueryBuilder boolQuery = QueryBuilders.boolQuery().minimumShouldMatch(1); boolQuery.must(QueryBuilders.termsQuery(Result.RESULT_TYPE.getPreferredName(), ForecastRequestStats.RESULT_TYPE_VALUE, Forecast.RESULT_TYPE_VALUE)); - for (ForecastRequestStats forecastToDelete : forecastsToDelete) { - boolQuery.should(QueryBuilders.boolQuery() - .must(QueryBuilders.termQuery(Job.ID.getPreferredName(), forecastToDelete.getJobId())) - .must(QueryBuilders.termQuery(Forecast.FORECAST_ID.getPreferredName(), forecastToDelete.getForecastId()))); + for (JobForecastId jobForecastId : ids) { + if (jobForecastId.hasNullValue() == false) { + boolQuery.should(QueryBuilders.boolQuery() + .must(QueryBuilders.termQuery(Job.ID.getPreferredName(), jobForecastId.jobId)) + .must(QueryBuilders.termQuery(Forecast.FORECAST_ID.getPreferredName(), jobForecastId.forecastId))); + } } QueryBuilder query = QueryBuilders.boolQuery().filter(boolQuery); request.setQuery(query); @@ -171,4 +184,28 @@ private DeleteByQueryRequest buildDeleteByQuery(List forec return request; } + + static Long parseDateField(Object value) { + if (value instanceof String) { // doc_value field with the epoch_millis format + return Long.parseLong((String)value); + } else if (value instanceof Long) { // pre-6.0 field + return (Long)value; + } else { + return null; + } + } + + private static class JobForecastId { + private final String jobId; + private final String forecastId; + + private JobForecastId(String jobId, String forecastId) { + this.jobId = jobId; + this.forecastId = forecastId; + } + + boolean hasNullValue() { + return jobId == null || forecastId == null; + } + } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemover.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemover.java index 7f46f63516d73..0ee1029e636a4 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemover.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemover.java @@ -88,7 +88,14 @@ protected void removeDataBefore(Job job, long cutoffEpochMs, ActionListener(LOGGER, threadPool, MachineLearning.UTILITY_THREAD_POOL_NAME, expiredSnapshotsListener(job.getId(), listener), false)); @@ -99,11 +106,18 @@ private ActionListener expiredSnapshotsListener(String jobId, Ac @Override public void onResponse(SearchResponse searchResponse) { try { - List modelSnapshots = new ArrayList<>(); + List snapshotIds = new ArrayList<>(); for (SearchHit hit : searchResponse.getHits()) { - modelSnapshots.add(ModelSnapshot.fromJson(hit.getSourceRef())); + JobSnapshotId idPair = new JobSnapshotId( + stringFieldValueOrNull(hit, Job.ID.getPreferredName()), + stringFieldValueOrNull(hit, ModelSnapshotField.SNAPSHOT_ID.getPreferredName())); + + if (idPair.hasNullValue() == false) { + snapshotIds.add(idPair); + } } - deleteModelSnapshots(new VolatileCursorIterator<>(modelSnapshots), listener); + + deleteModelSnapshots(new VolatileCursorIterator<>(snapshotIds), listener); } catch (Exception e) { onFailure(e); } @@ -116,14 +130,14 @@ public void onFailure(Exception e) { }; } - private void deleteModelSnapshots(Iterator modelSnapshotIterator, ActionListener listener) { + private void deleteModelSnapshots(Iterator modelSnapshotIterator, ActionListener listener) { if (modelSnapshotIterator.hasNext() == false) { listener.onResponse(true); return; } - ModelSnapshot modelSnapshot = modelSnapshotIterator.next(); - DeleteModelSnapshotAction.Request deleteSnapshotRequest = new DeleteModelSnapshotAction.Request( - modelSnapshot.getJobId(), modelSnapshot.getSnapshotId()); + JobSnapshotId idPair = modelSnapshotIterator.next(); + DeleteModelSnapshotAction.Request deleteSnapshotRequest = + new DeleteModelSnapshotAction.Request(idPair.jobId, idPair.snapshotId); getClient().execute(DeleteModelSnapshotAction.INSTANCE, deleteSnapshotRequest, new ActionListener() { @Override public void onResponse(AcknowledgedResponse response) { @@ -136,9 +150,23 @@ public void onResponse(AcknowledgedResponse response) { @Override public void onFailure(Exception e) { - listener.onFailure(new ElasticsearchException("[" + modelSnapshot.getJobId() + "] Failed to delete snapshot [" - + modelSnapshot.getSnapshotId() + "]", e)); + listener.onFailure(new ElasticsearchException("[" + idPair.jobId + "] Failed to delete snapshot [" + + idPair.snapshotId + "]", e)); } }); } + + static class JobSnapshotId { + private final String jobId; + private final String snapshotId; + + JobSnapshotId(String jobId, String snapshotId) { + this.jobId = jobId; + this.snapshotId = snapshotId; + } + + boolean hasNullValue() { + return jobId == null || snapshotId == null; + } + } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/MlDataRemover.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/MlDataRemover.java index 485d8e9bfa22d..0fa06262801f3 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/MlDataRemover.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/MlDataRemover.java @@ -6,9 +6,29 @@ package org.elasticsearch.xpack.ml.job.retention; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.common.document.DocumentField; +import org.elasticsearch.search.SearchHit; import java.util.function.Supplier; public interface MlDataRemover { void remove(ActionListener listener, Supplier isTimedOutSupplier); + + /** + * Extract {@code fieldName} from {@code hit} and if it is a string + * return the string else {@code null}. + * @param hit The search hit + * @param fieldName Field to find + * @return value iff the docfield is present and it is a string. Otherwise {@code null} + */ + default String stringFieldValueOrNull(SearchHit hit, String fieldName) { + DocumentField docField = hit.field(fieldName); + if (docField != null) { + Object value = docField.getValue(); + if (value instanceof String) { + return (String)value; + } + } + return null; + } } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemoverTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemoverTests.java index 5256cb9cc0e35..a6287d6f1a031 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemoverTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemoverTests.java @@ -92,6 +92,13 @@ private static SearchResponse createSearchResponse(List to return searchResponse; } + static SearchResponse createSearchResponseFromHits(List hits) { + SearchHits searchHits = new SearchHits(hits.toArray(new SearchHit[] {}), hits.size(), 1.0f); + SearchResponse searchResponse = mock(SearchResponse.class); + when(searchResponse.getHits()).thenReturn(searchHits); + return searchResponse; + } + public void testRemoveGivenNoJobs() throws IOException { SearchResponse response = createSearchResponse(Collections.emptyList()); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredForecastsRemoverTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredForecastsRemoverTests.java new file mode 100644 index 0000000000000..6ce7951374f34 --- /dev/null +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredForecastsRemoverTests.java @@ -0,0 +1,20 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.ml.job.retention; + +import org.elasticsearch.test.ESTestCase; + +import java.util.Date; + +public class ExpiredForecastsRemoverTests extends ESTestCase { + + public void testDateParsing() { + assertEquals(Long.valueOf(1462096800000L), ExpiredForecastsRemover.parseDateField("1462096800000")); + assertEquals(Long.valueOf(1462096800000L), ExpiredForecastsRemover.parseDateField(1462096800000L)); + assertNull(ExpiredForecastsRemover.parseDateField(new Date())); + } +} diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemoverTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemoverTests.java index 42e97efbe3f13..6b66965829ac3 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemoverTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemoverTests.java @@ -17,6 +17,7 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.mock.orig.Mockito; +import org.elasticsearch.search.SearchHit; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.FixedExecutorBuilder; import org.elasticsearch.threadpool.ThreadPool; @@ -25,7 +26,9 @@ import org.elasticsearch.xpack.core.ml.job.config.JobTests; import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot; +import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshotField; import org.elasticsearch.xpack.ml.MachineLearning; +import org.elasticsearch.xpack.ml.test.SearchHitBuilder; import org.junit.After; import org.junit.Before; import org.mockito.invocation.InvocationOnMock; @@ -118,11 +121,13 @@ public void testRemove_GivenJobsWithMixedRetentionPolicies() throws IOException JobTests.buildJobBuilder("snapshots-2").setModelSnapshotRetentionDays(17L).setModelSnapshotId("active").build() )); - List snapshots1JobSnapshots = Arrays.asList(createModelSnapshot("snapshots-1", "snapshots-1_1"), - createModelSnapshot("snapshots-1", "snapshots-1_2")); - List snapshots2JobSnapshots = Collections.singletonList(createModelSnapshot("snapshots-2", "snapshots-2_1")); - searchResponsesPerCall.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(snapshots1JobSnapshots)); - searchResponsesPerCall.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(snapshots2JobSnapshots)); + SearchHit snapshot1_1 = createModelSnapshotQueryHit("snapshots-1", "snapshots-1_1"); + SearchHit snapshot1_2 = createModelSnapshotQueryHit("snapshots-1", "snapshots-1_2"); + searchResponsesPerCall.add( + AbstractExpiredJobDataRemoverTests.createSearchResponseFromHits(Arrays.asList(snapshot1_1, snapshot1_2))); + + SearchHit snapshot2_1 = createModelSnapshotQueryHit("snapshots-2", "snapshots-2_1"); + searchResponsesPerCall.add(AbstractExpiredJobDataRemoverTests.createSearchResponseFromHits(Collections.singletonList(snapshot2_1))); createExpiredModelSnapshotsRemover().remove(listener, () -> false); @@ -203,12 +208,13 @@ public void testRemove_GivenClientDeleteSnapshotRequestsFail() throws IOExceptio JobTests.buildJobBuilder("snapshots-2").setModelSnapshotRetentionDays(17L).setModelSnapshotId("active").build() )); - List snapshots1JobSnapshots = Arrays.asList(createModelSnapshot("snapshots-1", "snapshots-1_1"), - createModelSnapshot("snapshots-1", "snapshots-1_2")); - List snapshots2JobSnapshots = Collections.singletonList(createModelSnapshot("snapshots-2", "snapshots-2_1")); - searchResponsesPerCall.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(snapshots1JobSnapshots)); - searchResponsesPerCall.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(snapshots2JobSnapshots)); + SearchHit snapshot1_1 = createModelSnapshotQueryHit("snapshots-1", "snapshots-1_1"); + SearchHit snapshot1_2 = createModelSnapshotQueryHit("snapshots-1", "snapshots-1_2"); + searchResponsesPerCall.add(AbstractExpiredJobDataRemoverTests.createSearchResponseFromHits( + Arrays.asList(snapshot1_1, snapshot1_2))); + SearchHit snapshot2_2 = createModelSnapshotQueryHit("snapshots-2", "snapshots-2_1"); + searchResponsesPerCall.add(AbstractExpiredJobDataRemoverTests.createSearchResponseFromHits(Collections.singletonList(snapshot2_2))); createExpiredModelSnapshotsRemover().remove(listener, () -> false); listener.waitToCompletion(); @@ -224,6 +230,17 @@ public void testRemove_GivenClientDeleteSnapshotRequestsFail() throws IOExceptio assertThat(deleteSnapshotRequest.getSnapshotId(), equalTo("snapshots-1_1")); } + public void testJobSnapshotId() { + ExpiredModelSnapshotsRemover.JobSnapshotId id = new ExpiredModelSnapshotsRemover.JobSnapshotId("a", "b"); + assertFalse(id.hasNullValue()); + id = new ExpiredModelSnapshotsRemover.JobSnapshotId(null, "b"); + assertTrue(id.hasNullValue()); + id = new ExpiredModelSnapshotsRemover.JobSnapshotId("a", null); + assertTrue(id.hasNullValue()); + id = new ExpiredModelSnapshotsRemover.JobSnapshotId(null, null); + assertTrue(id.hasNullValue()); + } + @SuppressWarnings("unchecked") private void givenJobs(List jobs) throws IOException { SearchResponse response = AbstractExpiredJobDataRemoverTests.createSearchResponse(jobs); @@ -287,4 +304,10 @@ public Void answer(InvocationOnMock invocationOnMock) { }).when(client).execute(same(DeleteModelSnapshotAction.INSTANCE), any(), any()); } + private static SearchHit createModelSnapshotQueryHit(String jobId, String snapshotId) { + SearchHitBuilder hitBuilder = new SearchHitBuilder(0); + hitBuilder.addField(Job.ID.getPreferredName(), Collections.singletonList(jobId)); + hitBuilder.addField(ModelSnapshotField.SNAPSHOT_ID.getPreferredName(), Collections.singletonList(snapshotId)); + return hitBuilder.build(); + } } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/MlDataRemoverTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/MlDataRemoverTests.java new file mode 100644 index 0000000000000..5b5638a904a99 --- /dev/null +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/MlDataRemoverTests.java @@ -0,0 +1,30 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.ml.job.retention; + +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xpack.ml.test.SearchHitBuilder; + +import java.util.Collections; +import java.util.Date; + +public class MlDataRemoverTests extends ESTestCase { + public void testStringOrNull() { + MlDataRemover remover = (listener, isTimedOutSupplier) -> { }; + + SearchHitBuilder hitBuilder = new SearchHitBuilder(0); + assertNull(remover.stringFieldValueOrNull(hitBuilder.build(), "missing")); + + hitBuilder = new SearchHitBuilder(0); + hitBuilder.addField("not_a_string", Collections.singletonList(new Date())); + assertNull(remover.stringFieldValueOrNull(hitBuilder.build(), "not_a_string")); + + hitBuilder = new SearchHitBuilder(0); + hitBuilder.addField("string_field", Collections.singletonList("actual_string_value")); + assertEquals("actual_string_value", remover.stringFieldValueOrNull(hitBuilder.build(), "string_field")); + } +}