[ML] Add graceful retry for anomaly detector result indexing failures (elastic#49508)

All results indexing now retries up to the number of times configured in `xpack.ml.persist_results_max_retries`. Retries use a semi-random (jittered) exponential backoff; a sketch of the idea follows the changed-files summary below.
benwtrent authored and SivagurunathanV committed Jan 21, 2020
1 parent 9008cf3 commit 2f6a6b1
Showing 21 changed files with 960 additions and 223 deletions.
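
The retry loop itself lives in the new `ResultsPersisterService` (one of the changed files, not reproduced in this excerpt). A minimal, self-contained sketch of semi-random exponential backoff, with illustrative names, initial delay, and cap rather than the shipped values:

```java
import java.util.Random;
import java.util.concurrent.Callable;

final class BackoffSketch {
    private static final Random RANDOM = new Random();

    // Retry `action` up to `maxRetries` times with jittered exponential backoff.
    static <T> T withRetry(Callable<T> action, int maxRetries) throws Exception {
        long capMs = 50; // hypothetical initial backoff cap
        for (int attempt = 0; ; attempt++) {
            try {
                return action.call();
            } catch (Exception e) {
                if (attempt >= maxRetries) {
                    throw e; // attempts exhausted: surface the last failure
                }
                // Sleep a random duration up to the current cap, then double the
                // cap (bounded) so concurrent writers do not retry in lock step.
                Thread.sleep(1 + RANDOM.nextInt((int) capMs));
                capMs = Math.min(capMs * 2, 60_000);
            }
        }
    }
}
```

The attempt budget corresponds to the new `xpack.ml.persist_results_max_retries` dynamic cluster setting; the integration test below raises it to 15, forces bulk failures by making the results index read-only, and then clears the block so the pending retries can succeed.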
BulkFailureRetryIT.java (new file)
@@ -0,0 +1,214 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.integration;

import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.xpack.core.action.util.PageParams;
import org.elasticsearch.xpack.core.ml.action.GetBucketsAction;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig;
import org.elasticsearch.xpack.core.ml.job.config.DataDescription;
import org.elasticsearch.xpack.core.ml.job.config.Detector;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.job.results.Bucket;
import org.elasticsearch.xpack.core.ml.job.results.Result;
import org.junit.After;
import org.junit.Before;

import java.time.Duration;
import java.util.Collections;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

import static org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase.createDatafeedBuilder;
import static org.hamcrest.Matchers.greaterThan;

public class BulkFailureRetryIT extends MlNativeAutodetectIntegTestCase {

    private final String index = "bulk-failure-retry";
    private long now = System.currentTimeMillis();
    private static final long DAY = Duration.ofDays(1).toMillis();
    private final String jobId = "bulk-failure-retry-job";
    private final String resultsIndex = ".ml-anomalies-custom-bulk-failure-retry-job";

    @Before
    public void putPastDataIntoIndex() {
        client().admin().indices().prepareCreate(index)
            .addMapping("type", "time", "type=date", "value", "type=long")
            .get();
        long twoDaysAgo = now - DAY * 2;
        long threeDaysAgo = now - DAY * 3;
        writeData(logger, index, 250, threeDaysAgo, twoDaysAgo);
    }

    @After
    public void cleanUpTest() {
        client().admin()
            .cluster()
            .prepareUpdateSettings()
            .setTransientSettings(Settings.builder()
                .putNull("xpack.ml.persist_results_max_retries")
                .putNull("logger.org.elasticsearch.xpack.ml.datafeed.DatafeedJob")
                .putNull("logger.org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister")
                .putNull("logger.org.elasticsearch.xpack.ml.job.process.autodetect.output")
                .build()).get();
        cleanUp();
    }

    private void ensureAnomaliesWrite() throws InterruptedException {
        Settings settings = Settings.builder().put(IndexMetaData.INDEX_READ_ONLY_SETTING.getKey(), false).build();
        AtomicReference<AcknowledgedResponse> acknowledgedResponseHolder = new AtomicReference<>();
        AtomicReference<Exception> exceptionHolder = new AtomicReference<>();
        blockingCall(
            listener -> client().admin().indices().prepareUpdateSettings(resultsIndex).setSettings(settings).execute(listener),
            acknowledgedResponseHolder,
            exceptionHolder);
        if (exceptionHolder.get() != null) {
            fail("FAILED TO MARK [" + resultsIndex + "] as read-write again: " + exceptionHolder.get());
        }
    }

    private void setAnomaliesReadOnlyBlock() throws InterruptedException {
        Settings settings = Settings.builder().put(IndexMetaData.INDEX_READ_ONLY_SETTING.getKey(), true).build();
        AtomicReference<AcknowledgedResponse> acknowledgedResponseHolder = new AtomicReference<>();
        AtomicReference<Exception> exceptionHolder = new AtomicReference<>();
        blockingCall(
            listener -> client().admin().indices().prepareUpdateSettings(resultsIndex).setSettings(settings).execute(listener),
            acknowledgedResponseHolder,
            exceptionHolder);
        if (exceptionHolder.get() != null) {
            fail("FAILED TO MARK [" + resultsIndex + "] as read-only: " + exceptionHolder.get());
        }
    }

    public void testBulkFailureRetries() throws Exception {
        Job.Builder job = createJob(jobId, TimeValue.timeValueMinutes(5), "count", null);
        job.setResultsIndexName(jobId);

        DatafeedConfig.Builder datafeedConfigBuilder =
            createDatafeedBuilder(job.getId() + "-datafeed", job.getId(), Collections.singletonList(index));
        DatafeedConfig datafeedConfig = datafeedConfigBuilder.build();
        registerJob(job);
        putJob(job);
        openJob(job.getId());
        registerDatafeed(datafeedConfig);
        putDatafeed(datafeedConfig);
        long twoDaysAgo = now - 2 * DAY;
        startDatafeed(datafeedConfig.getId(), 0L, twoDaysAgo);
        waitUntilJobIsClosed(jobId);

        // Record the latest finalized bucket before any failures are induced
        Bucket initialLatestBucket = getLatestFinalizedBucket(jobId);
        assertThat(initialLatestBucket.getEpoch(), greaterThan(0L));

        client().admin()
            .cluster()
            .prepareUpdateSettings()
            .setTransientSettings(Settings.builder()
                .put("logger.org.elasticsearch.xpack.ml.datafeed.DatafeedJob", "TRACE")
                .put("logger.org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister", "TRACE")
                .put("logger.org.elasticsearch.xpack.ml.job.process.autodetect.output", "TRACE")
                .put("xpack.ml.persist_results_max_retries", "15")
                .build()).get();

        // Block writes to the results index so bulk indexing fails and retries kick in
        setAnomaliesReadOnlyBlock();

        int moreDocs = 1_000;
        writeData(logger, index, moreDocs, twoDaysAgo, now);

        openJob(job.getId());
        startDatafeed(datafeedConfig.getId(), twoDaysAgo, now);

        // Remove the block so the pending retries can eventually succeed
        ensureAnomaliesWrite();
        waitUntilJobIsClosed(jobId);

        Bucket newLatestBucket = getLatestFinalizedBucket(jobId);
        assertThat(newLatestBucket.getEpoch(), greaterThan(initialLatestBucket.getEpoch()));
    }

    private Job.Builder createJob(String id, TimeValue bucketSpan, String function, String field) {
        return createJob(id, bucketSpan, function, field, null);
    }

    private Job.Builder createJob(String id, TimeValue bucketSpan, String function, String field, String summaryCountField) {
        DataDescription.Builder dataDescription = new DataDescription.Builder();
        dataDescription.setFormat(DataDescription.DataFormat.XCONTENT);
        dataDescription.setTimeField("time");
        dataDescription.setTimeFormat(DataDescription.EPOCH_MS);

        Detector.Builder d = new Detector.Builder(function, field);
        AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(Collections.singletonList(d.build()))
            .setBucketSpan(bucketSpan)
            .setSummaryCountFieldName(summaryCountField);

        return new Job.Builder().setId(id).setAnalysisConfig(analysisConfig).setDataDescription(dataDescription);
    }

    private void writeData(Logger logger, String index, long numDocs, long start, long end) {
        int maxDelta = (int) (end - start - 1);
        BulkRequestBuilder bulkRequestBuilder = client().prepareBulk();
        for (int i = 0; i < numDocs; i++) {
            IndexRequest indexRequest = new IndexRequest(index);
            long timestamp = start + randomIntBetween(0, maxDelta);
            assert timestamp >= start && timestamp < end;
            indexRequest.source("time", timestamp, "value", i);
            bulkRequestBuilder.add(indexRequest);
        }
        BulkResponse bulkResponse = bulkRequestBuilder
            .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
            .get();
        if (bulkResponse.hasFailures()) {
            int failures = 0;
            for (BulkItemResponse itemResponse : bulkResponse) {
                if (itemResponse.isFailed()) {
                    failures++;
                    logger.error("Item response failure [{}]", itemResponse.getFailureMessage());
                }
            }
            fail("Bulk response contained " + failures + " failures");
        }
        logger.info("Indexed [{}] documents", numDocs);
    }

    private Bucket getLatestFinalizedBucket(String jobId) {
        GetBucketsAction.Request getBucketsRequest = new GetBucketsAction.Request(jobId);
        getBucketsRequest.setExcludeInterim(true);
        getBucketsRequest.setSort(Result.TIMESTAMP.getPreferredName());
        getBucketsRequest.setDescending(true);
        getBucketsRequest.setPageParams(new PageParams(0, 1));
        return getBuckets(getBucketsRequest).get(0);
    }

    private <T> void blockingCall(Consumer<ActionListener<T>> function,
                                  AtomicReference<T> response,
                                  AtomicReference<Exception> error) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);
        ActionListener<T> listener = ActionListener.wrap(
            r -> {
                response.set(r);
                latch.countDown();
            },
            e -> {
                error.set(e);
                latch.countDown();
            }
        );

        function.accept(listener);
        latch.await();
    }
}
MachineLearning.java
@@ -298,6 +298,7 @@
import org.elasticsearch.xpack.ml.rest.results.RestGetRecordsAction;
import org.elasticsearch.xpack.ml.rest.validate.RestValidateDetectorAction;
import org.elasticsearch.xpack.ml.rest.validate.RestValidateJobConfigAction;
+import org.elasticsearch.xpack.ml.utils.persistence.ResultsPersisterService;

import java.io.IOException;
import java.math.BigInteger;
@@ -446,7 +447,8 @@ public List<Setting<?>> getSettings() {
            MlConfigMigrationEligibilityCheck.ENABLE_CONFIG_MIGRATION,
            InferenceProcessor.MAX_INFERENCE_PROCESSORS,
            ModelLoadingService.INFERENCE_MODEL_CACHE_SIZE,
-           ModelLoadingService.INFERENCE_MODEL_CACHE_TTL
+           ModelLoadingService.INFERENCE_MODEL_CACHE_TTL,
+           ResultsPersisterService.PERSIST_RESULTS_MAX_RETRIES
        );
    }
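
`PERSIST_RESULTS_MAX_RETRIES` itself is declared in `ResultsPersisterService`, which is not part of this excerpt. By analogy with the other ML settings registered here, a declaration of roughly this shape would back the setting key; only the key is confirmed by this commit, and the default and bounds are assumptions:

```java
// Hypothetical declaration (org.elasticsearch.common.settings.Setting);
// only the key "xpack.ml.persist_results_max_retries" is confirmed here.
public static final Setting<Integer> PERSIST_RESULTS_MAX_RETRIES = Setting.intSetting(
    "xpack.ml.persist_results_max_retries",
    20,                        // illustrative default
    0,                         // illustrative minimum
    50,                        // illustrative maximum
    Setting.Property.Dynamic,  // the test drives it via transient cluster settings
    Setting.Property.NodeScope);
```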

@@ -520,9 +522,12 @@ public Collection<Object> createComponents(Client client, ClusterService cluster
        DataFrameAnalyticsAuditor dataFrameAnalyticsAuditor = new DataFrameAnalyticsAuditor(client, clusterService.getNodeName());
        InferenceAuditor inferenceAuditor = new InferenceAuditor(client, clusterService.getNodeName());
        this.dataFrameAnalyticsAuditor.set(dataFrameAnalyticsAuditor);
+       ResultsPersisterService resultsPersisterService = new ResultsPersisterService(client, clusterService, settings);
        JobResultsProvider jobResultsProvider = new JobResultsProvider(client, settings);
-       JobResultsPersister jobResultsPersister = new JobResultsPersister(client);
-       JobDataCountsPersister jobDataCountsPersister = new JobDataCountsPersister(client);
+       JobResultsPersister jobResultsPersister = new JobResultsPersister(client, resultsPersisterService, anomalyDetectionAuditor);
+       JobDataCountsPersister jobDataCountsPersister = new JobDataCountsPersister(client,
+           resultsPersisterService,
+           anomalyDetectionAuditor);
        JobConfigProvider jobConfigProvider = new JobConfigProvider(client, xContentRegistry);
        DatafeedConfigProvider datafeedConfigProvider = new DatafeedConfigProvider(client, xContentRegistry);
        UpdateJobProcessNotifier notifier = new UpdateJobProcessNotifier(client, clusterService, threadPool);
TransportRevertModelSnapshotAction.java
@@ -171,7 +171,7 @@ private ActionListener<RevertModelSnapshotAction.Response> wrapRevertDataCountsL
        return ActionListener.wrap(response -> {
            jobResultsProvider.dataCounts(jobId, counts -> {
                counts.setLatestRecordTimeStamp(modelSnapshot.getLatestRecordTimeStamp());
-               jobDataCountsPersister.persistDataCounts(jobId, counts, new ActionListener<Boolean>() {
+               jobDataCountsPersister.persistDataCountsAsync(jobId, counts, new ActionListener<Boolean>() {
                    @Override
                    public void onResponse(Boolean aBoolean) {
                        listener.onResponse(response);
JobDataCountsPersister.java
@@ -8,16 +8,19 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
-import org.apache.logging.log4j.util.Supplier;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.index.IndexAction;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
+import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts;
+import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
+import org.elasticsearch.xpack.ml.notifications.AnomalyDetectionAuditor;
+import org.elasticsearch.xpack.ml.utils.persistence.ResultsPersisterService;

import java.io.IOException;

@@ -33,30 +36,60 @@ public class JobDataCountsPersister {

    private static final Logger logger = LogManager.getLogger(JobDataCountsPersister.class);

+   private final ResultsPersisterService resultsPersisterService;
    private final Client client;
+   private final AnomalyDetectionAuditor auditor;

-   public JobDataCountsPersister(Client client) {
+   public JobDataCountsPersister(Client client, ResultsPersisterService resultsPersisterService, AnomalyDetectionAuditor auditor) {
+       this.resultsPersisterService = resultsPersisterService;
        this.client = client;
+       this.auditor = auditor;
    }

-   private XContentBuilder serialiseCounts(DataCounts counts) throws IOException {
+   private static XContentBuilder serialiseCounts(DataCounts counts) throws IOException {
        XContentBuilder builder = jsonBuilder();
        return counts.toXContent(builder, ToXContent.EMPTY_PARAMS);
    }

+   /**
+    * Update the job's data counts stats and figures.
+    * NOTE: This call is synchronous and pauses the calling thread.
+    * @param jobId Job to update
+    * @param counts The counts
+    */
+   public void persistDataCounts(String jobId, DataCounts counts) {
+       try {
+           resultsPersisterService.indexWithRetry(jobId,
+               AnomalyDetectorsIndex.resultsWriteAlias(jobId),
+               counts,
+               ToXContent.EMPTY_PARAMS,
+               WriteRequest.RefreshPolicy.NONE,
+               DataCounts.documentId(jobId),
+               () -> true,
+               (msg) -> auditor.warning(jobId, "Job data_counts " + msg));
+       } catch (IOException ioe) {
+           logger.error(() -> new ParameterizedMessage("[{}] Failed writing data_counts stats", jobId), ioe);
+       } catch (Exception ex) {
+           logger.error(() -> new ParameterizedMessage("[{}] Failed persisting data_counts stats", jobId), ex);
+       }
+   }

    /**
+    * The same as {@link JobDataCountsPersister#persistDataCounts(String, DataCounts)} but done asynchronously.
     *
+    * Two differences are:
+    *  - The listener is notified on persistence failure
+    *  - If the persistence fails, it is not automatically retried
     * @param jobId Job to update
     * @param counts The counts
     * @param listener ActionType response listener
     */
-   public void persistDataCounts(String jobId, DataCounts counts, ActionListener<Boolean> listener) {
+   public void persistDataCountsAsync(String jobId, DataCounts counts, ActionListener<Boolean> listener) {
        try (XContentBuilder content = serialiseCounts(counts)) {
            final IndexRequest request = new IndexRequest(AnomalyDetectorsIndex.resultsWriteAlias(jobId))
                .id(DataCounts.documentId(jobId))
                .source(content);
-           executeAsyncWithOrigin(client, ML_ORIGIN, IndexAction.INSTANCE, request, new ActionListener<IndexResponse>() {
+           executeAsyncWithOrigin(client, ML_ORIGIN, IndexAction.INSTANCE, request, new ActionListener<>() {
                @Override
                public void onResponse(IndexResponse indexResponse) {
                    listener.onResponse(true);
@@ -68,7 +101,9 @@ public void onFailure(Exception e) {
                }
            });
        } catch (IOException ioe) {
-           logger.warn((Supplier<?>)() -> new ParameterizedMessage("[{}] Error serialising DataCounts stats", jobId), ioe);
+           String msg = new ParameterizedMessage("[{}] Failed writing data_counts stats", jobId).getFormattedMessage();
+           logger.error(msg, ioe);
+           listener.onFailure(ExceptionsHelper.serverError(msg, ioe));
        }
    }
}
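
`ResultsPersisterService#indexWithRetry`, called by the new synchronous `persistDataCounts` above, is not included in this excerpt. Reconstructed from the call site alone, its shape is roughly the sketch below; the return type and exact parameter types are assumptions:

```java
import java.io.IOException;
import java.util.function.Consumer;
import java.util.function.Supplier;

import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.common.xcontent.ToXContent;

// Hypothetical shape inferred from the persistDataCounts call site; the real
// method lives in ResultsPersisterService and may differ.
interface RetryingResultsPersister {
    BulkResponse indexWithRetry(String jobId,                  // used in log/audit messages
                                String indexName,              // destination index or write alias
                                ToXContent object,             // document to serialise and index
                                ToXContent.Params params,
                                WriteRequest.RefreshPolicy refreshPolicy,
                                String id,                     // document id
                                Supplier<Boolean> shouldRetry, // () -> true: retry until attempts run out
                                Consumer<String> msgHandler)   // retry messages, audited as job warnings
        throws IOException;
}
```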
