Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,7 @@
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.ThreadedActionListener;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.common.document.DocumentField;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
Expand All @@ -39,8 +35,6 @@
import org.joda.time.DateTime;
import org.joda.time.chrono.ISOChronology;

import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
Expand Down Expand Up @@ -83,6 +77,10 @@ public void remove(ActionListener<Boolean> listener, Supplier<Boolean> isTimedOu
.filter(QueryBuilders.termQuery(Result.RESULT_TYPE.getPreferredName(), ForecastRequestStats.RESULT_TYPE_VALUE))
.filter(QueryBuilders.existsQuery(ForecastRequestStats.EXPIRY_TIME.getPreferredName())));
source.size(MAX_FORECASTS);
source.fetchSource(false);
source.docValueField(Job.ID.getPreferredName(), null);
source.docValueField(ForecastRequestStats.FORECAST_ID.getPreferredName(), null);
source.docValueField(ForecastRequestStats.EXPIRY_TIME.getPreferredName(), "epoch_millis");

// _doc is the most efficient sort order and will also disable scoring
source.sort(ElasticsearchMappings.ES_DOC);
Expand All @@ -94,11 +92,9 @@ public void remove(ActionListener<Boolean> listener, Supplier<Boolean> isTimedOu
}

private void deleteForecasts(SearchResponse searchResponse, ActionListener<Boolean> listener, Supplier<Boolean> isTimedOutSupplier) {
List<ForecastRequestStats> forecastsToDelete;
try {
forecastsToDelete = findForecastsToDelete(searchResponse);
} catch (IOException e) {
listener.onFailure(e);
List<JobForecastId> forecastsToDelete = findForecastsToDelete(searchResponse);
if (forecastsToDelete.isEmpty()) {
listener.onResponse(true);
return;
}

Expand Down Expand Up @@ -129,39 +125,56 @@ public void onFailure(Exception e) {
});
}

private List<ForecastRequestStats> findForecastsToDelete(SearchResponse searchResponse) throws IOException {
List<ForecastRequestStats> forecastsToDelete = new ArrayList<>();
private List<JobForecastId> findForecastsToDelete(SearchResponse searchResponse) {
List<JobForecastId> forecastsToDelete = new ArrayList<>();

SearchHits hits = searchResponse.getHits();
if (hits.getTotalHits() > MAX_FORECASTS) {
LOGGER.info("More than [{}] forecasts were found. This run will only delete [{}] of them", MAX_FORECASTS, MAX_FORECASTS);
}

for (SearchHit hit : hits.getHits()) {
try (InputStream stream = hit.getSourceRef().streamInput();
XContentParser parser = XContentFactory.xContent(XContentType.JSON).createParser(
NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, stream)) {
ForecastRequestStats forecastRequestStats = ForecastRequestStats.LENIENT_PARSER.apply(parser, null);
if (forecastRequestStats.getExpiryTime().toEpochMilli() < cutoffEpochMs) {
forecastsToDelete.add(forecastRequestStats);
DocumentField docField = hit.field(ForecastRequestStats.EXPIRY_TIME.getPreferredName());
if (docField == null) {
LOGGER.warn("Forecast request stats document [{}] has a null [{}] field", hit.getId(),
ForecastRequestStats.EXPIRY_TIME.getPreferredName());
continue;
}

Long expiryMs = parseDateField(docField.getValue());
if (expiryMs == null) {
LOGGER.warn("Forecast request stats document [{}] date field [{}] cannot be parsed", hit.getId(),
ForecastRequestStats.EXPIRY_TIME.getPreferredName());
continue;
}

if (expiryMs < cutoffEpochMs) {
JobForecastId idPair = new JobForecastId(
stringFieldValueOrNull(hit, Job.ID.getPreferredName()),
stringFieldValueOrNull(hit, Forecast.FORECAST_ID.getPreferredName()));

if (idPair.hasNullValue() == false) {
forecastsToDelete.add(idPair);
}
}
}
return forecastsToDelete;
}

private DeleteByQueryRequest buildDeleteByQuery(List<ForecastRequestStats> forecastsToDelete) {
private DeleteByQueryRequest buildDeleteByQuery(List<JobForecastId> ids) {
DeleteByQueryRequest request = new DeleteByQueryRequest();
request.setSlices(5);

request.indices(RESULTS_INDEX_PATTERN);
BoolQueryBuilder boolQuery = QueryBuilders.boolQuery().minimumShouldMatch(1);
boolQuery.must(QueryBuilders.termsQuery(Result.RESULT_TYPE.getPreferredName(),
ForecastRequestStats.RESULT_TYPE_VALUE, Forecast.RESULT_TYPE_VALUE));
for (ForecastRequestStats forecastToDelete : forecastsToDelete) {
boolQuery.should(QueryBuilders.boolQuery()
.must(QueryBuilders.termQuery(Job.ID.getPreferredName(), forecastToDelete.getJobId()))
.must(QueryBuilders.termQuery(Forecast.FORECAST_ID.getPreferredName(), forecastToDelete.getForecastId())));
for (JobForecastId jobForecastId : ids) {
if (jobForecastId.hasNullValue() == false) {
boolQuery.should(QueryBuilders.boolQuery()
.must(QueryBuilders.termQuery(Job.ID.getPreferredName(), jobForecastId.jobId))
.must(QueryBuilders.termQuery(Forecast.FORECAST_ID.getPreferredName(), jobForecastId.forecastId)));
}
}
QueryBuilder query = QueryBuilders.boolQuery().filter(boolQuery);
request.setQuery(query);
Expand All @@ -171,4 +184,28 @@ private DeleteByQueryRequest buildDeleteByQuery(List<ForecastRequestStats> forec

return request;
}

/**
 * Converts the raw value of a date field read from a search hit into epoch milliseconds.
 *
 * @param value the field value; a {@code String} holding a long or a {@code Long} is understood
 * @return the value as a {@code Long}, or {@code null} when {@code value} is {@code null},
 *         of any other type, or a string that does not contain a valid long
 */
static Long parseDateField(Object value) {
    if (value instanceof String) { // doc_value field with the epoch_millis format
        try {
            return Long.parseLong((String)value);
        } catch (NumberFormatException e) {
            // The caller treats null as "cannot be parsed": it logs a warning and skips the
            // document. Throwing here would abort processing of every remaining hit instead.
            return null;
        }
    } else if (value instanceof Long) { // pre-6.0 field
        return (Long)value;
    } else {
        return null;
    }
}

/**
 * Immutable holder for a job ID together with a forecast ID.
 * Either component may be {@code null}; use {@link #hasNullValue()} to check.
 */
private static class JobForecastId {
    private final String jobId;
    private final String forecastId;

    private JobForecastId(String jobId, String forecastId) {
        this.jobId = jobId;
        this.forecastId = forecastId;
    }

    /**
     * @return {@code true} if the job ID or the forecast ID (or both) is {@code null}
     */
    boolean hasNullValue() {
        if (jobId == null) {
            return true;
        }
        return forecastId == null;
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,14 @@ protected void removeDataBefore(Job job, long cutoffEpochMs, ActionListener<Bool
.mustNot(activeSnapshotFilter)
.mustNot(retainFilter);

searchRequest.source(new SearchSourceBuilder().query(query).size(MODEL_SNAPSHOT_SEARCH_SIZE).sort(ElasticsearchMappings.ES_DOC));
SearchSourceBuilder source = new SearchSourceBuilder();
source.query(query);
source.size(MODEL_SNAPSHOT_SEARCH_SIZE);
source.sort(ElasticsearchMappings.ES_DOC);
source.fetchSource(false);
source.docValueField(Job.ID.getPreferredName(), null);
source.docValueField(ModelSnapshotField.SNAPSHOT_ID.getPreferredName(), null);
searchRequest.source(source);

getClient().execute(SearchAction.INSTANCE, searchRequest, new ThreadedActionListener<>(LOGGER, threadPool,
MachineLearning.UTILITY_THREAD_POOL_NAME, expiredSnapshotsListener(job.getId(), listener), false));
Expand All @@ -99,11 +106,18 @@ private ActionListener<SearchResponse> expiredSnapshotsListener(String jobId, Ac
@Override
public void onResponse(SearchResponse searchResponse) {
try {
List<ModelSnapshot> modelSnapshots = new ArrayList<>();
List<JobSnapshotId> snapshotIds = new ArrayList<>();
for (SearchHit hit : searchResponse.getHits()) {
modelSnapshots.add(ModelSnapshot.fromJson(hit.getSourceRef()));
JobSnapshotId idPair = new JobSnapshotId(
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The logic here is different to the later branches as it doesn't have the new model snapshot retention options added in #56125

stringFieldValueOrNull(hit, Job.ID.getPreferredName()),
stringFieldValueOrNull(hit, ModelSnapshotField.SNAPSHOT_ID.getPreferredName()));

if (idPair.hasNullValue() == false) {
snapshotIds.add(idPair);
}
}
deleteModelSnapshots(new VolatileCursorIterator<>(modelSnapshots), listener);

deleteModelSnapshots(new VolatileCursorIterator<>(snapshotIds), listener);
} catch (Exception e) {
onFailure(e);
}
Expand All @@ -116,14 +130,14 @@ public void onFailure(Exception e) {
};
}

private void deleteModelSnapshots(Iterator<ModelSnapshot> modelSnapshotIterator, ActionListener<Boolean> listener) {
private void deleteModelSnapshots(Iterator<JobSnapshotId> modelSnapshotIterator, ActionListener<Boolean> listener) {
if (modelSnapshotIterator.hasNext() == false) {
listener.onResponse(true);
return;
}
ModelSnapshot modelSnapshot = modelSnapshotIterator.next();
DeleteModelSnapshotAction.Request deleteSnapshotRequest = new DeleteModelSnapshotAction.Request(
modelSnapshot.getJobId(), modelSnapshot.getSnapshotId());
JobSnapshotId idPair = modelSnapshotIterator.next();
DeleteModelSnapshotAction.Request deleteSnapshotRequest =
new DeleteModelSnapshotAction.Request(idPair.jobId, idPair.snapshotId);
getClient().execute(DeleteModelSnapshotAction.INSTANCE, deleteSnapshotRequest, new ActionListener<AcknowledgedResponse>() {
@Override
public void onResponse(AcknowledgedResponse response) {
Expand All @@ -136,9 +150,23 @@ public void onResponse(AcknowledgedResponse response) {

@Override
public void onFailure(Exception e) {
listener.onFailure(new ElasticsearchException("[" + modelSnapshot.getJobId() + "] Failed to delete snapshot ["
+ modelSnapshot.getSnapshotId() + "]", e));
listener.onFailure(new ElasticsearchException("[" + idPair.jobId + "] Failed to delete snapshot ["
+ idPair.snapshotId + "]", e));
}
});
}

/**
 * Immutable holder for a job ID together with a model snapshot ID.
 * Either component may be {@code null}; use {@link #hasNullValue()} to check.
 */
static class JobSnapshotId {
    private final String jobId;
    private final String snapshotId;

    JobSnapshotId(String jobId, String snapshotId) {
        this.jobId = jobId;
        this.snapshotId = snapshotId;
    }

    /**
     * @return {@code true} if the job ID or the snapshot ID (or both) is {@code null}
     */
    boolean hasNullValue() {
        if (jobId == null) {
            return true;
        }
        return snapshotId == null;
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,29 @@
package org.elasticsearch.xpack.ml.job.retention;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.document.DocumentField;
import org.elasticsearch.search.SearchHit;

import java.util.function.Supplier;

public interface MlDataRemover {
    /**
     * Performs the removal.
     * NOTE(review): the contract of the two arguments is not visible here; presumably the
     * listener receives the outcome and the supplier reports whether the run has timed out.
     */
    void remove(ActionListener<Boolean> listener, Supplier<Boolean> isTimedOutSupplier);

    /**
     * Reads the field {@code fieldName} from {@code hit} and returns its value
     * only when that value is a string.
     *
     * @param hit The search hit
     * @param fieldName Name of the field to look up on the hit
     * @return the field value if the field exists on the hit and its value is a
     *         {@code String}; {@code null} in every other case
     */
    default String stringFieldValueOrNull(SearchHit hit, String fieldName) {
        DocumentField field = hit.field(fieldName);
        if (field == null) {
            return null;
        }
        Object fieldValue = field.getValue();
        return fieldValue instanceof String ? (String) fieldValue : null;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,13 @@ private static SearchResponse createSearchResponse(List<? extends ToXContent> to
return searchResponse;
}

/**
 * Builds a mocked {@link SearchResponse} whose {@code getHits()} returns the supplied hits.
 * The {@link SearchHits} is constructed from the hits as an array, the list size and 1.0f.
 *
 * @param hits the hits the mocked response should return
 * @return the mocked search response
 */
static SearchResponse createSearchResponseFromHits(List<SearchHit> hits) {
    SearchHit[] hitArray = hits.toArray(new SearchHit[0]);
    SearchHits wrappedHits = new SearchHits(hitArray, hits.size(), 1.0f);

    SearchResponse response = mock(SearchResponse.class);
    when(response.getHits()).thenReturn(wrappedHits);
    return response;
}

public void testRemoveGivenNoJobs() throws IOException {
SearchResponse response = createSearchResponse(Collections.emptyList());

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

package org.elasticsearch.xpack.ml.job.retention;

import org.elasticsearch.test.ESTestCase;

import java.util.Date;

/**
 * Tests for the static helpers of {@link ExpiredForecastsRemover}.
 */
public class ExpiredForecastsRemoverTests extends ESTestCase {

    /**
     * A numeric string and a {@code Long} both yield the epoch millis value;
     * an unsupported type yields {@code null}.
     */
    public void testDateParsing() {
        final Long expectedMillis = 1462096800000L;

        Long fromString = ExpiredForecastsRemover.parseDateField("1462096800000");
        assertEquals(expectedMillis, fromString);

        Long fromLong = ExpiredForecastsRemover.parseDateField(1462096800000L);
        assertEquals(expectedMillis, fromLong);

        Long fromUnsupportedType = ExpiredForecastsRemover.parseDateField(new Date());
        assertNull(fromUnsupportedType);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.mock.orig.Mockito;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.FixedExecutorBuilder;
import org.elasticsearch.threadpool.ThreadPool;
Expand All @@ -25,7 +26,9 @@
import org.elasticsearch.xpack.core.ml.job.config.JobTests;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshotField;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.test.SearchHitBuilder;
import org.junit.After;
import org.junit.Before;
import org.mockito.invocation.InvocationOnMock;
Expand Down Expand Up @@ -118,11 +121,13 @@ public void testRemove_GivenJobsWithMixedRetentionPolicies() throws IOException
JobTests.buildJobBuilder("snapshots-2").setModelSnapshotRetentionDays(17L).setModelSnapshotId("active").build()
));

List<ModelSnapshot> snapshots1JobSnapshots = Arrays.asList(createModelSnapshot("snapshots-1", "snapshots-1_1"),
createModelSnapshot("snapshots-1", "snapshots-1_2"));
List<ModelSnapshot> snapshots2JobSnapshots = Collections.singletonList(createModelSnapshot("snapshots-2", "snapshots-2_1"));
searchResponsesPerCall.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(snapshots1JobSnapshots));
searchResponsesPerCall.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(snapshots2JobSnapshots));
SearchHit snapshot1_1 = createModelSnapshotQueryHit("snapshots-1", "snapshots-1_1");
SearchHit snapshot1_2 = createModelSnapshotQueryHit("snapshots-1", "snapshots-1_2");
searchResponsesPerCall.add(
AbstractExpiredJobDataRemoverTests.createSearchResponseFromHits(Arrays.asList(snapshot1_1, snapshot1_2)));

SearchHit snapshot2_1 = createModelSnapshotQueryHit("snapshots-2", "snapshots-2_1");
searchResponsesPerCall.add(AbstractExpiredJobDataRemoverTests.createSearchResponseFromHits(Collections.singletonList(snapshot2_1)));

createExpiredModelSnapshotsRemover().remove(listener, () -> false);

Expand Down Expand Up @@ -203,12 +208,13 @@ public void testRemove_GivenClientDeleteSnapshotRequestsFail() throws IOExceptio
JobTests.buildJobBuilder("snapshots-2").setModelSnapshotRetentionDays(17L).setModelSnapshotId("active").build()
));

List<ModelSnapshot> snapshots1JobSnapshots = Arrays.asList(createModelSnapshot("snapshots-1", "snapshots-1_1"),
createModelSnapshot("snapshots-1", "snapshots-1_2"));
List<ModelSnapshot> snapshots2JobSnapshots = Collections.singletonList(createModelSnapshot("snapshots-2", "snapshots-2_1"));
searchResponsesPerCall.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(snapshots1JobSnapshots));
searchResponsesPerCall.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(snapshots2JobSnapshots));
SearchHit snapshot1_1 = createModelSnapshotQueryHit("snapshots-1", "snapshots-1_1");
SearchHit snapshot1_2 = createModelSnapshotQueryHit("snapshots-1", "snapshots-1_2");
searchResponsesPerCall.add(AbstractExpiredJobDataRemoverTests.createSearchResponseFromHits(
Arrays.asList(snapshot1_1, snapshot1_2)));

SearchHit snapshot2_2 = createModelSnapshotQueryHit("snapshots-2", "snapshots-2_1");
searchResponsesPerCall.add(AbstractExpiredJobDataRemoverTests.createSearchResponseFromHits(Collections.singletonList(snapshot2_2)));
createExpiredModelSnapshotsRemover().remove(listener, () -> false);

listener.waitToCompletion();
Expand All @@ -224,6 +230,17 @@ public void testRemove_GivenClientDeleteSnapshotRequestsFail() throws IOExceptio
assertThat(deleteSnapshotRequest.getSnapshotId(), equalTo("snapshots-1_1"));
}

/**
 * {@code hasNullValue()} is false only when both IDs are present.
 */
public void testJobSnapshotId() {
    // both components set
    assertFalse(new ExpiredModelSnapshotsRemover.JobSnapshotId("a", "b").hasNullValue());

    // any missing component counts as a null value
    assertTrue(new ExpiredModelSnapshotsRemover.JobSnapshotId(null, "b").hasNullValue());
    assertTrue(new ExpiredModelSnapshotsRemover.JobSnapshotId("a", null).hasNullValue());
    assertTrue(new ExpiredModelSnapshotsRemover.JobSnapshotId(null, null).hasNullValue());
}

@SuppressWarnings("unchecked")
private void givenJobs(List<Job> jobs) throws IOException {
SearchResponse response = AbstractExpiredJobDataRemoverTests.createSearchResponse(jobs);
Expand Down Expand Up @@ -287,4 +304,10 @@ public Void answer(InvocationOnMock invocationOnMock) {
}).when(client).execute(same(DeleteModelSnapshotAction.INSTANCE), any(), any());
}

/**
 * Creates a search hit carrying only the job ID and snapshot ID fields,
 * each added as a single-element list.
 *
 * @param jobId value for the job ID field
 * @param snapshotId value for the snapshot ID field
 * @return the built hit
 */
private static SearchHit createModelSnapshotQueryHit(String jobId, String snapshotId) {
    final String jobIdField = Job.ID.getPreferredName();
    final String snapshotIdField = ModelSnapshotField.SNAPSHOT_ID.getPreferredName();

    SearchHitBuilder builder = new SearchHitBuilder(0);
    builder.addField(jobIdField, Collections.singletonList(jobId));
    builder.addField(snapshotIdField, Collections.singletonList(snapshotId));
    return builder.build();
}
}
Loading