Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ public final class Messages {
public static final String JOB_AUDIT_SNAPSHOT_STORED = "Job model snapshot with id [{0}] stored";
public static final String JOB_AUDIT_REVERTED = "Job model snapshot reverted to ''{0}''";
public static final String JOB_AUDIT_SNAPSHOT_DELETED = "Model snapshot [{0}] with description ''{1}'' deleted";
public static final String JOB_AUDIT_SNAPSHOTS_DELETED = "[{0}] expired model snapshots deleted";
public static final String JOB_AUDIT_FILTER_UPDATED_ON_PROCESS = "Updated filter [{0}] in running process";
public static final String JOB_AUDIT_CALENDARS_UPDATED_ON_PROCESS = "Updated calendars in running process";
public static final String JOB_AUDIT_MEMORY_STATUS_SOFT_LIMIT = "Job memory status changed to soft_limit; memory pruning will now be " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,10 @@ public Date getLatestResultTimeStamp() {
return latestResultTimeStamp;
}

/**
 * Returns the {@code retain} flag.
 *
 * @return the current value of the {@code retain} field
 */
public boolean isRetain() {
return retain;
}

@Override
public int hashCode() {
return Objects.hash(jobId, minVersion, timestamp, description, snapshotId, quantiles, snapshotDocCount, modelSizeStats,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,6 @@ public void testDeleteExpiredDataActionDeletesEmptyStateIndices() throws Excepti
is(arrayContaining(".ml-state-000001", ".ml-state-000005", ".ml-state-000007")));
}

// Delegates to testExpiredDeletion with the arguments (-1.0f, 100).
// NOTE(review): the test name suggests -1.0f selects the standard (default) throttle;
// confirm against the parameter definitions of testExpiredDeletion.
// The @AwaitsFix annotation ties this test to the linked issue 62699.
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/62699")
public void testDeleteExpiredDataWithStandardThrottle() throws Exception {
testExpiredDeletion(-1.0f, 100);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider;
import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider;
import org.elasticsearch.xpack.ml.job.persistence.SearchAfterJobsIterator;
import org.elasticsearch.xpack.ml.job.retention.EmptyStateIndexRemover;
import org.elasticsearch.xpack.ml.job.retention.ExpiredForecastsRemover;
Expand Down Expand Up @@ -60,25 +61,27 @@ public class TransportDeleteExpiredDataAction extends HandledTransportAction<Del
private final ClusterService clusterService;
private final Clock clock;
private final JobConfigProvider jobConfigProvider;
private final JobResultsProvider jobResultsProvider;

@Inject
public TransportDeleteExpiredDataAction(ThreadPool threadPool, TransportService transportService,
ActionFilters actionFilters, Client client, ClusterService clusterService,
JobConfigProvider jobConfigProvider) {
JobConfigProvider jobConfigProvider, JobResultsProvider jobResultsProvider) {
this(threadPool, MachineLearning.UTILITY_THREAD_POOL_NAME, transportService, actionFilters, client, clusterService,
jobConfigProvider, Clock.systemUTC());
jobConfigProvider, jobResultsProvider, Clock.systemUTC());
}

TransportDeleteExpiredDataAction(ThreadPool threadPool, String executor, TransportService transportService,
ActionFilters actionFilters, Client client, ClusterService clusterService,
JobConfigProvider jobConfigProvider, Clock clock) {
JobConfigProvider jobConfigProvider, JobResultsProvider jobResultsProvider, Clock clock) {
super(DeleteExpiredDataAction.NAME, transportService, actionFilters, DeleteExpiredDataAction.Request::new, executor);
this.threadPool = threadPool;
this.executor = executor;
this.client = new OriginSettingClient(client, ClientHelper.ML_ORIGIN);
this.clusterService = clusterService;
this.clock = clock;
this.jobConfigProvider = jobConfigProvider;
this.jobResultsProvider = jobResultsProvider;
}

@Override
Expand Down Expand Up @@ -175,7 +178,7 @@ private List<MlDataRemover> createDataRemovers(OriginSettingClient client,
new WrappedBatchedJobsIterator(new SearchAfterJobsIterator(client)), parentTaskId, auditor, threadPool),
new ExpiredForecastsRemover(client, threadPool, parentTaskId),
new ExpiredModelSnapshotsRemover(client,
new WrappedBatchedJobsIterator(new SearchAfterJobsIterator(client)), threadPool, parentTaskId),
new WrappedBatchedJobsIterator(new SearchAfterJobsIterator(client)), threadPool, parentTaskId, jobResultsProvider, auditor),
new UnusedStateRemover(client, clusterService, parentTaskId),
new EmptyStateIndexRemover(client, parentTaskId),
new UnusedStatsRemover(client, parentTaskId));
Expand All @@ -185,7 +188,11 @@ private List<MlDataRemover> createDataRemovers(List<Job> jobs, TaskId parentTask
return Arrays.asList(
new ExpiredResultsRemover(client, new VolatileCursorIterator<>(jobs), parentTaskId, auditor, threadPool),
new ExpiredForecastsRemover(client, threadPool, parentTaskId),
new ExpiredModelSnapshotsRemover(client, new VolatileCursorIterator<>(jobs), threadPool, parentTaskId),
new ExpiredModelSnapshotsRemover(client,
new VolatileCursorIterator<>(jobs),
threadPool, parentTaskId,
jobResultsProvider,
auditor),
new UnusedStateRemover(client, clusterService, parentTaskId),
new EmptyStateIndexRemover(client, parentTaskId),
new UnusedStatsRemover(client, parentTaskId));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,11 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.search.SearchAction;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.ThreadedActionListener;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.OriginSettingClient;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.QueryBuilder;
Expand All @@ -25,21 +23,24 @@
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.action.util.QueryPage;
import org.elasticsearch.xpack.core.common.time.TimeUtils;
import org.elasticsearch.xpack.core.ml.action.DeleteModelSnapshotAction;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshotField;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.job.persistence.JobDataDeleter;
import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider;
import org.elasticsearch.xpack.ml.notifications.AnomalyDetectionAuditor;
import org.elasticsearch.xpack.ml.utils.MlIndicesUtils;
import org.elasticsearch.xpack.ml.utils.VolatileCursorIterator;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

/**
* Deletes all model snapshots that have expired the configured retention time
Expand All @@ -65,11 +66,16 @@ public class ExpiredModelSnapshotsRemover extends AbstractExpiredJobDataRemover
private static final int MODEL_SNAPSHOT_SEARCH_SIZE = 10000;

private final ThreadPool threadPool;
private final JobResultsProvider jobResultsProvider;
private final AnomalyDetectionAuditor auditor;

public ExpiredModelSnapshotsRemover(OriginSettingClient client, Iterator<Job> jobIterator,
ThreadPool threadPool, TaskId parentTaskId) {
ThreadPool threadPool, TaskId parentTaskId, JobResultsProvider jobResultsProvider,
AnomalyDetectionAuditor auditor) {
super(client, jobIterator, parentTaskId);
this.threadPool = Objects.requireNonNull(threadPool);
this.jobResultsProvider = jobResultsProvider;
this.auditor = auditor;
}

@Override
Expand Down Expand Up @@ -157,116 +163,85 @@ protected void removeDataBefore(
listener.onResponse(true);
return;
}
LOGGER.debug("Considering model snapshots of job [{}] that have a timestamp before [{}] for removal", job.getId(), cutoffEpochMs);

SearchRequest searchRequest = new SearchRequest();
searchRequest.indices(AnomalyDetectorsIndex.jobResultsAliasedName(job.getId()));

QueryBuilder activeSnapshotFilter = QueryBuilders.termQuery(
ModelSnapshotField.SNAPSHOT_ID.getPreferredName(), job.getModelSnapshotId());
QueryBuilder retainFilter = QueryBuilders.termQuery(ModelSnapshot.RETAIN.getPreferredName(), true);
QueryBuilder query = createQuery(job.getId(), cutoffEpochMs)
.filter(QueryBuilders.existsQuery(ModelSnapshot.SNAPSHOT_DOC_COUNT.getPreferredName()))
.mustNot(activeSnapshotFilter)
.mustNot(retainFilter);

SearchSourceBuilder source = new SearchSourceBuilder();
source.query(query);
source.size(MODEL_SNAPSHOT_SEARCH_SIZE);
source.sort(ModelSnapshot.TIMESTAMP.getPreferredName());
source.fetchSource(false);
source.docValueField(Job.ID.getPreferredName(), null);
source.docValueField(ModelSnapshotField.SNAPSHOT_ID.getPreferredName(), null);
source.docValueField(ModelSnapshot.TIMESTAMP.getPreferredName(), "epoch_millis");
searchRequest.source(source);
searchRequest.setParentTask(getParentTaskId());
LOGGER.debug(() -> new ParameterizedMessage(
"Considering model snapshots of job [{}] that have a timestamp before [{}] for removal",
job.getId(),
cutoffEpochMs));

long deleteAllBeforeMs = (job.getModelSnapshotRetentionDays() == null)
? 0 : latestTimeMs - TimeValue.timeValueDays(job.getModelSnapshotRetentionDays()).getMillis();
client.execute(SearchAction.INSTANCE, searchRequest, new ThreadedActionListener<>(LOGGER, threadPool,
MachineLearning.UTILITY_THREAD_POOL_NAME, expiredSnapshotsListener(job.getId(), deleteAllBeforeMs, listener), false));
ActionListener<QueryPage<ModelSnapshot>> snapshotsListener = expiredSnapshotsListener(job, deleteAllBeforeMs, listener);
jobResultsProvider.modelSnapshots(job.getId(),
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was running into async callback hell. But if we just use the underlying modelSnapshots request from here and then filter out the assigned snapshot afterwards, we reduce the number of calls and the code flows more cleanly.

0,
MODEL_SNAPSHOT_SEARCH_SIZE,
null,
String.valueOf(cutoffEpochMs),
ModelSnapshot.TIMESTAMP.getPreferredName(),
false,
null,
snapshotsListener::onResponse,
snapshotsListener::onFailure);
}

private ActionListener<SearchResponse> expiredSnapshotsListener(String jobId, long deleteAllBeforeMs,
ActionListener<Boolean> listener) {
private ActionListener<QueryPage<ModelSnapshot>> expiredSnapshotsListener(Job job,
long deleteAllBeforeMs,
ActionListener<Boolean> listener) {
return new ActionListener<>() {
@Override
public void onResponse(SearchResponse searchResponse) {
public void onResponse(QueryPage<ModelSnapshot> searchResponse) {
long nextToKeepMs = deleteAllBeforeMs;
try {
List<JobSnapshotId> snapshotIds = new ArrayList<>();
for (SearchHit hit : searchResponse.getHits()) {
String timestamp = stringFieldValueOrNull(hit, ModelSnapshot.TIMESTAMP.getPreferredName());
if (timestamp == null) {
LOGGER.warn("Model snapshot document [{}] has a null timestamp field", hit.getId());
List<ModelSnapshot> snapshots = new ArrayList<>();
for (ModelSnapshot snapshot: searchResponse.results()) {
// We don't want to delete the currently used snapshot or a snapshot marked to be retained
if (snapshot.getSnapshotId().equals(job.getModelSnapshotId()) || snapshot.isRetain()) {
continue;
}
long timestampMs = TimeUtils.parseToEpochMs(timestamp);
if (snapshot.getTimestamp() == null) {
LOGGER.warn("Model snapshot document [{}] has a null timestamp field", snapshot.getSnapshotId());
continue;
}
long timestampMs = snapshot.getTimestamp().getTime();
if (timestampMs >= nextToKeepMs) {
do {
nextToKeepMs += MS_IN_ONE_DAY;
} while (timestampMs >= nextToKeepMs);
continue;
}
JobSnapshotId idPair = new JobSnapshotId(
stringFieldValueOrNull(hit, Job.ID.getPreferredName()),
stringFieldValueOrNull(hit, ModelSnapshotField.SNAPSHOT_ID.getPreferredName()));

if (idPair.hasNullValue() == false) {
snapshotIds.add(idPair);
}
snapshots.add(snapshot);
}
deleteModelSnapshots(new VolatileCursorIterator<>(snapshotIds), listener);
deleteModelSnapshots(snapshots, job.getId(), listener);
} catch (Exception e) {
onFailure(e);
}
}

@Override
public void onFailure(Exception e) {
listener.onFailure(new ElasticsearchException("[" + jobId + "] Search for expired snapshots failed", e));
listener.onFailure(new ElasticsearchException("[{}] Search for expired snapshots failed", e, job.getId()));
}
};
}

private void deleteModelSnapshots(Iterator<JobSnapshotId> modelSnapshotIterator, ActionListener<Boolean> listener) {
if (modelSnapshotIterator.hasNext() == false) {
private void deleteModelSnapshots(List<ModelSnapshot> modelSnapshots, String jobId, ActionListener<Boolean> listener) {
if (modelSnapshots.isEmpty()) {
listener.onResponse(true);
return;
}
JobSnapshotId idPair = modelSnapshotIterator.next();
DeleteModelSnapshotAction.Request deleteSnapshotRequest =
new DeleteModelSnapshotAction.Request(idPair.jobId, idPair.snapshotId);
deleteSnapshotRequest.setParentTask(getParentTaskId());
client.execute(DeleteModelSnapshotAction.INSTANCE, deleteSnapshotRequest, new ActionListener<>() {
@Override
public void onResponse(AcknowledgedResponse response) {
try {
deleteModelSnapshots(modelSnapshotIterator, listener);
} catch (Exception e) {
onFailure(e);
}
}

@Override
public void onFailure(Exception e) {
listener.onFailure(new ElasticsearchException("[" + idPair.jobId + "] Failed to delete snapshot ["
+ idPair.snapshotId + "]", e));
}
});
JobDataDeleter deleter = new JobDataDeleter(client, jobId);
deleter.deleteModelSnapshots(modelSnapshots, ActionListener.wrap(
bulkResponse -> {
auditor.info(jobId, Messages.getMessage(Messages.JOB_AUDIT_SNAPSHOTS_DELETED, modelSnapshots.size()));
LOGGER.debug(() -> new ParameterizedMessage(
"[{}] deleted model snapshots {} with descriptions {}",
jobId,
modelSnapshots.stream().map(ModelSnapshot::getSnapshotId).collect(Collectors.toList()),
modelSnapshots.stream().map(ModelSnapshot::getDescription).collect(Collectors.toList())
));
listener.onResponse(true);
},
listener::onFailure
));
}

/**
 * Immutable pair of a job ID and a model snapshot ID.
 * Either component may be {@code null}; use {@link #hasNullValue()} to detect that.
 */
static class JobSnapshotId {
    private final String jobId;
    private final String snapshotId;

    /**
     * @param jobId      the job ID, possibly {@code null}
     * @param snapshotId the model snapshot ID, possibly {@code null}
     */
    JobSnapshotId(String jobId, String snapshotId) {
        this.snapshotId = snapshotId;
        this.jobId = jobId;
    }

    /**
     * @return {@code true} if the job ID or the snapshot ID (or both) is {@code null}
     */
    boolean hasNullValue() {
        final boolean bothPresent = jobId != null && snapshotId != null;
        return bothPresent == false;
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.ml.action.DeleteExpiredDataAction;
import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider;
import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider;
import org.elasticsearch.xpack.ml.job.retention.MlDataRemover;
import org.junit.After;
import org.junit.Before;
Expand Down Expand Up @@ -56,7 +57,9 @@ public void setup() {
Client client = mock(Client.class);
ClusterService clusterService = mock(ClusterService.class);
transportDeleteExpiredDataAction = new TransportDeleteExpiredDataAction(threadPool, ThreadPool.Names.SAME, transportService,
new ActionFilters(Collections.emptySet()), client, clusterService, mock(JobConfigProvider.class), Clock.systemUTC());
new ActionFilters(Collections.emptySet()), client, clusterService, mock(JobConfigProvider.class),
mock(JobResultsProvider.class),
Clock.systemUTC());
}

@After
Expand Down
Loading