From 2f7284dc1062ec7d7495ccfa1626feb4c4990985 Mon Sep 17 00:00:00 2001 From: Benjamin Trent Date: Mon, 18 May 2020 08:46:42 -0400 Subject: [PATCH] [ML] relax throttling on expired data cleanup (#56711) (#56895) Throttling nightly cleanup as much as we do has been over cautious. Night cleanup should be more lenient in its throttling. We still keep the same batch size, but now the requests per second scale with the number of data nodes. If we have more than 5 data nodes, we don't throttle at all. Additionally, the API now has `requests_per_second` and `timeout` set. So users calling the API directly can set the throttling. This commit also adds a new setting `xpack.ml.nightly_maintenance_requests_per_second`. This will allow users to adjust throttling of the nightly maintenance. --- .../client/MLRequestConverters.java | 4 +- .../client/ml/DeleteExpiredDataRequest.java | 64 ++++++++++++++- .../client/MLRequestConvertersTests.java | 8 +- .../MlClientDocumentationIT.java | 6 +- .../ml/DeleteExpiredDataRequestTests.java | 62 +++++++++++++++ .../ml/delete-expired-data.asciidoc | 4 + .../apis/delete-expired-data.asciidoc | 11 +++ .../ml/action/DeleteExpiredDataAction.java | 79 ++++++++++++++++++- .../DeleteExpiredDataActionRequestTests.java | 36 +++++++++ .../ml/integration/DeleteExpiredDataIT.java | 41 ++++++---- .../ml/integration/MlNativeIntegTestCase.java | 10 +++ .../xpack/ml/MachineLearning.java | 20 ++++- .../xpack/ml/MlDailyMaintenanceService.java | 22 ++++-- .../xpack/ml/MlInitializationService.java | 69 ++++++++-------- .../TransportDeleteExpiredDataAction.java | 41 ++++++++-- .../AbstractExpiredJobDataRemover.java | 26 ++++-- .../job/retention/EmptyStateIndexRemover.java | 2 +- .../retention/ExpiredForecastsRemover.java | 15 +++- .../ExpiredModelSnapshotsRemover.java | 8 +- .../job/retention/ExpiredResultsRemover.java | 20 +++-- .../xpack/ml/job/retention/MlDataRemover.java | 2 +- .../ml/job/retention/UnusedStateRemover.java | 12 +-- .../ml/rest/RestDeleteExpiredDataAction.java | 4 +- .../ml/MlDailyMaintenanceServiceTests.java | 3 +- .../ml/MlInitializationServiceTests.java | 21 +---- ...TransportDeleteExpiredDataActionTests.java | 10 ++- .../AbstractExpiredJobDataRemoverTests.java | 14 +++- .../EmptyStateIndexRemoverTests.java | 10 +-- .../ExpiredModelSnapshotsRemoverTests.java | 10 +-- .../retention/ExpiredResultsRemoverTests.java | 10 +-- .../api/ml.delete_expired_data.json | 3 + .../test/ml/delete_expired_data.yml | 36 +++++++++ 32 files changed, 539 insertions(+), 144 deletions(-) create mode 100644 client/rest-high-level/src/test/java/org/elasticsearch/client/ml/DeleteExpiredDataRequestTests.java create mode 100644 x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/DeleteExpiredDataActionRequestTests.java create mode 100644 x-pack/plugin/src/test/resources/rest-api-spec/test/ml/delete_expired_data.yml diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/MLRequestConverters.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/MLRequestConverters.java index 54dd11bf6caff..db64869c337a0 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/MLRequestConverters.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/MLRequestConverters.java @@ -167,13 +167,13 @@ static Request closeJob(CloseJobRequest closeJobRequest) throws IOException { return request; } - static Request deleteExpiredData(DeleteExpiredDataRequest deleteExpiredDataRequest) { + static Request deleteExpiredData(DeleteExpiredDataRequest deleteExpiredDataRequest) throws IOException { String endpoint = new EndpointBuilder() .addPathPartAsIs("_ml") .addPathPartAsIs("_delete_expired_data") .build(); Request request = new Request(HttpDelete.METHOD_NAME, endpoint); - + request.setEntity(createEntity(deleteExpiredDataRequest, REQUEST_BODY_CONTENT_TYPE)); return request; } diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/DeleteExpiredDataRequest.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/DeleteExpiredDataRequest.java index 25e340a8bab15..7a9cda69ffca7 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/DeleteExpiredDataRequest.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/DeleteExpiredDataRequest.java @@ -20,20 +20,82 @@ import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.ToXContentObject; +import org.elasticsearch.common.xcontent.XContentBuilder; + +import java.io.IOException; +import java.util.Objects; /** * Request to delete expired model snapshots and forecasts */ -public class DeleteExpiredDataRequest extends ActionRequest { +public class DeleteExpiredDataRequest extends ActionRequest implements ToXContentObject { + static final String REQUESTS_PER_SECOND = "requests_per_second"; + static final String TIMEOUT = "timeout"; + private final Float requestsPerSecond; + private final TimeValue timeout; /** * Create a new request to delete expired data */ public DeleteExpiredDataRequest() { + this(null, null); + } + + public DeleteExpiredDataRequest(Float requestsPerSecond, TimeValue timeout) { + this.requestsPerSecond = requestsPerSecond; + this.timeout = timeout; + } + + /** + * The requests allowed per second in the underlying Delete by Query requests executed. + * + * `-1.0f` indicates that the standard nightly cleanup behavior should be ran. + * Throttling scales according to the number of data nodes. + * `null` is default and means no throttling will occur. + */ + public Float getRequestsPerSecond() { + return requestsPerSecond; + } + + /** + * Indicates how long the deletion request will run until it timesout. + * + * Default value is 8 hours. + */ + public TimeValue getTimeout() { + return timeout; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + DeleteExpiredDataRequest that = (DeleteExpiredDataRequest) o; + return Objects.equals(requestsPerSecond, that.requestsPerSecond) && + Objects.equals(timeout, that.timeout); } @Override public ActionRequestValidationException validate() { return null; } + + public int hashCode() { + return Objects.hash(requestsPerSecond, timeout); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + if (requestsPerSecond != null) { + builder.field(REQUESTS_PER_SECOND, requestsPerSecond); + } + if (timeout != null) { + builder.field(TIMEOUT, timeout.getStringRep()); + } + builder.endObject(); + return builder; + } } diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/MLRequestConvertersTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/MLRequestConvertersTests.java index ac4196b4cc152..4f0c63ba4c611 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/MLRequestConvertersTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/MLRequestConvertersTests.java @@ -214,12 +214,16 @@ public void testCloseJob() throws Exception { requestEntityToString(request)); } - public void testDeleteExpiredData() { - DeleteExpiredDataRequest deleteExpiredDataRequest = new DeleteExpiredDataRequest(); + public void testDeleteExpiredData() throws Exception { + float requestsPerSec = randomBoolean() ? -1.0f : (float)randomDoubleBetween(0.0, 100000.0, false); + DeleteExpiredDataRequest deleteExpiredDataRequest = new DeleteExpiredDataRequest( + requestsPerSec, + TimeValue.timeValueHours(1)); Request request = MLRequestConverters.deleteExpiredData(deleteExpiredDataRequest); assertEquals(HttpDelete.METHOD_NAME, request.getMethod()); assertEquals("/_ml/_delete_expired_data", request.getEndpoint()); + assertEquals("{\"requests_per_second\":" + requestsPerSec + ",\"timeout\":\"1h\"}", requestEntityToString(request)); } public void testDeleteJob() { diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/MlClientDocumentationIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/MlClientDocumentationIT.java index 72bfe56c22848..dcbe83a567ab8 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/MlClientDocumentationIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/MlClientDocumentationIT.java @@ -2035,7 +2035,11 @@ public void testDeleteExpiredData() throws IOException, InterruptedException { MachineLearningIT.buildJob(jobId); { // tag::delete-expired-data-request - DeleteExpiredDataRequest request = new DeleteExpiredDataRequest(); // <1> + DeleteExpiredDataRequest request = new DeleteExpiredDataRequest( // <1> + 1000.0f, // <2> + TimeValue.timeValueHours(12) // <3> + ); + // end::delete-expired-data-request // tag::delete-expired-data-execute diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/DeleteExpiredDataRequestTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/DeleteExpiredDataRequestTests.java new file mode 100644 index 0000000000000..b5ed7a1e94b09 --- /dev/null +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/DeleteExpiredDataRequestTests.java @@ -0,0 +1,62 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.client.ml; + +import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.ConstructingObjectParser; +import org.elasticsearch.common.xcontent.ObjectParser; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.test.AbstractXContentTestCase; + +import java.io.IOException; + + +public class DeleteExpiredDataRequestTests extends AbstractXContentTestCase { + + private static ConstructingObjectParser PARSER = new ConstructingObjectParser<>( + "delete_expired_data_request", + true, + (a) -> new DeleteExpiredDataRequest((Float) a[0], (TimeValue) a[1]) + ); + static { + PARSER.declareFloat(ConstructingObjectParser.optionalConstructorArg(), + new ParseField(DeleteExpiredDataRequest.REQUESTS_PER_SECOND)); + PARSER.declareField(ConstructingObjectParser.optionalConstructorArg(), + (p, c) -> TimeValue.parseTimeValue(p.text(), DeleteExpiredDataRequest.TIMEOUT), + new ParseField(DeleteExpiredDataRequest.TIMEOUT), + ObjectParser.ValueType.STRING); + } + + @Override + protected DeleteExpiredDataRequest createTestInstance() { + return new DeleteExpiredDataRequest(randomBoolean() ? null : randomFloat(), + randomBoolean() ? null : TimeValue.parseTimeValue(randomTimeValue(), "test")); + } + + @Override + protected DeleteExpiredDataRequest doParseInstance(XContentParser parser) throws IOException { + return PARSER.apply(parser, null); + } + + @Override + protected boolean supportsUnknownFields() { + return true; + } +} diff --git a/docs/java-rest/high-level/ml/delete-expired-data.asciidoc b/docs/java-rest/high-level/ml/delete-expired-data.asciidoc index 8dc47750cbefe..e51e44c5549cd 100644 --- a/docs/java-rest/high-level/ml/delete-expired-data.asciidoc +++ b/docs/java-rest/high-level/ml/delete-expired-data.asciidoc @@ -21,6 +21,10 @@ A `DeleteExpiredDataRequest` object does not require any arguments. include-tagged::{doc-tests-file}[{api}-request] --------------------------------------------------- <1> Constructing a new request. +<2> Providing requests per second throttling for the + deletion processes. Default is no throttling. +<3> Setting how long the deletion processes will be allowed + to run before they are canceled. Default value is `8h` (8 hours). [id="{upid}-{api}-response"] ==== Delete Expired Data Response diff --git a/docs/reference/ml/anomaly-detection/apis/delete-expired-data.asciidoc b/docs/reference/ml/anomaly-detection/apis/delete-expired-data.asciidoc index c1450ec43f8ee..44679911ab255 100644 --- a/docs/reference/ml/anomaly-detection/apis/delete-expired-data.asciidoc +++ b/docs/reference/ml/anomaly-detection/apis/delete-expired-data.asciidoc @@ -27,6 +27,17 @@ Deletes all job results, model snapshots and forecast data that have exceeded their `retention days` period. Machine learning state documents that are not associated with any job are also deleted. +[[ml-delete-expired-data-request-body]] +==== {api-request-body-title} + +`requests_per_second`:: +(Optional, float) The desired requests per second for the deletion processes. +The default behavior is no throttling. + +`timeout`:: +(Optional, string) How long can the underlying delete processes run until they are canceled. +The default value is `8h` (8 hours). + [[ml-delete-expired-data-example]] ==== {api-examples-title} diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/DeleteExpiredDataAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/DeleteExpiredDataAction.java index eb2d3e94c9084..ad5b85757108c 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/DeleteExpiredDataAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/DeleteExpiredDataAction.java @@ -5,6 +5,7 @@ */ package org.elasticsearch.xpack.core.ml.action; +import org.elasticsearch.Version; import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionRequestBuilder; import org.elasticsearch.action.ActionRequestValidationException; @@ -14,6 +15,8 @@ import org.elasticsearch.common.ParseField; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.ObjectParser; import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.XContentBuilder; @@ -31,20 +34,94 @@ private DeleteExpiredDataAction() { public static class Request extends ActionRequest { + public static final ParseField REQUESTS_PER_SECOND = new ParseField("requests_per_second"); + public static final ParseField TIMEOUT = new ParseField("timeout"); + + public static final ObjectParser PARSER = new ObjectParser<>( + "delete_expired_data_request", + false, + Request::new); + + static { + PARSER.declareFloat(Request::setRequestsPerSecond, REQUESTS_PER_SECOND); + PARSER.declareString((obj, value) -> obj.setTimeout(TimeValue.parseTimeValue(value, TIMEOUT.getPreferredName())), + TIMEOUT); + } + + private Float requestsPerSecond; + private TimeValue timeout; + public Request() {} + public Request(Float requestsPerSecond, TimeValue timeValue) { + this.requestsPerSecond = requestsPerSecond; + this.timeout = timeValue; + } + public Request(StreamInput in) throws IOException { super(in); + if (in.getVersion().onOrAfter(Version.V_7_8_0)) { + this.requestsPerSecond = in.readOptionalFloat(); + this.timeout = in.readOptionalTimeValue(); + } else { + this.requestsPerSecond = null; + this.timeout = null; + } + } + + public Float getRequestsPerSecond() { + return requestsPerSecond; + } + + public TimeValue getTimeout() { + return timeout; + } + + public Request setRequestsPerSecond(Float requestsPerSecond) { + this.requestsPerSecond = requestsPerSecond; + return this; + } + + public Request setTimeout(TimeValue timeout) { + this.timeout = timeout; + return this; } @Override public ActionRequestValidationException validate() { + if (this.requestsPerSecond != null && this.requestsPerSecond != -1.0f && this.requestsPerSecond <= 0) { + ActionRequestValidationException requestValidationException = new ActionRequestValidationException(); + requestValidationException.addValidationError("[requests_per_second] must either be -1 or greater than 0"); + return requestValidationException; + } return null; } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + Request request = (Request) o; + return Objects.equals(requestsPerSecond, request.requestsPerSecond) + && Objects.equals(timeout, request.timeout); + } + + @Override + public int hashCode() { + return Objects.hash(requestsPerSecond, timeout); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + if (out.getVersion().onOrAfter(Version.V_7_8_0)) { + out.writeOptionalFloat(requestsPerSecond); + out.writeOptionalTimeValue(timeout); + } + } } static class RequestBuilder extends ActionRequestBuilder { - RequestBuilder(ElasticsearchClient client, DeleteExpiredDataAction action) { super(client, action, new Request()); } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/DeleteExpiredDataActionRequestTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/DeleteExpiredDataActionRequestTests.java new file mode 100644 index 0000000000000..91fc144a8d085 --- /dev/null +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/DeleteExpiredDataActionRequestTests.java @@ -0,0 +1,36 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.core.ml.action; + +import org.elasticsearch.Version; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.xpack.core.ml.AbstractBWCWireSerializationTestCase; +import org.elasticsearch.xpack.core.ml.action.DeleteExpiredDataAction.Request; + +public class DeleteExpiredDataActionRequestTests extends AbstractBWCWireSerializationTestCase { + + @Override + protected Request createTestInstance() { + return new Request( + randomBoolean() ? null : randomFloat(), + randomBoolean() ? null : TimeValue.parseTimeValue(randomTimeValue(), "test") + ); + } + + @Override + protected Writeable.Reader instanceReader() { + return Request::new; + } + + @Override + protected Request mutateInstanceForVersion(Request instance, Version version) { + if (version.before(Version.V_7_8_0)) { + return new Request(); + } + return instance; + } +} diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DeleteExpiredDataIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DeleteExpiredDataIT.java index 6abf68b9b0605..ee895121e8383 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DeleteExpiredDataIT.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DeleteExpiredDataIT.java @@ -96,12 +96,19 @@ public void testDeleteExpiredData_GivenNothingToDelete() throws Exception { client().execute(DeleteExpiredDataAction.INSTANCE, new DeleteExpiredDataAction.Request()).get(); } - @AwaitsFix(bugUrl = "https://github.com/elastic/ml-cpp/pulls/468") - public void testDeleteExpiredData() throws Exception { + public void testDeleteExpiredDataNoThrottle() throws Exception { + testExpiredDeletion(null, 10010); + } + + public void testDeleteExpiredDataWithStandardThrottle() throws Exception { + testExpiredDeletion(-1.0f, 100); + } + + private void testExpiredDeletion(Float customThrottle, int numUnusedState) throws Exception { // Index some unused state documents (more than 10K to test scrolling works) String mlStateIndexName = AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX + "-000001"; BulkRequestBuilder bulkRequestBuilder = client().prepareBulk().setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); - for (int i = 0; i < 10010; i++) { + for (int i = 0; i < numUnusedState; i++) { String docId = "non_existing_job_" + randomFrom("model_state_1234567#" + i, "quantiles", "categorizer_state#" + i); IndexRequest indexRequest = new IndexRequest(mlStateIndexName) @@ -167,10 +174,10 @@ public void testDeleteExpiredData() throws Exception { // We must set a very small value for expires_in to keep this testable as the deletion cutoff point is the moment // the DeleteExpiredDataAction is called. - String forecastShortExpiryId = forecast(job.getId(), TimeValue.timeValueHours(3), TimeValue.timeValueSeconds(1)); + String forecastShortExpiryId = forecast(job.getId(), TimeValue.timeValueHours(1), TimeValue.timeValueSeconds(1)); shortExpiryForecastIds.add(forecastShortExpiryId); - String forecastDefaultExpiryId = forecast(job.getId(), TimeValue.timeValueHours(3), null); - String forecastNoExpiryId = forecast(job.getId(), TimeValue.timeValueHours(3), TimeValue.ZERO); + String forecastDefaultExpiryId = forecast(job.getId(), TimeValue.timeValueHours(1), null); + String forecastNoExpiryId = forecast(job.getId(), TimeValue.timeValueHours(1), TimeValue.ZERO); waitForecastToFinish(job.getId(), forecastShortExpiryId); waitForecastToFinish(job.getId(), forecastDefaultExpiryId); waitForecastToFinish(job.getId(), forecastNoExpiryId); @@ -197,9 +204,9 @@ public void testDeleteExpiredData() throws Exception { retainAllSnapshots("snapshots-retention-with-retain"); long totalModelSizeStatsBeforeDelete = client().prepareSearch("*") - .setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN_CLOSED_HIDDEN) - .setQuery(QueryBuilders.termQuery("result_type", "model_size_stats")) - .get().getHits().getTotalHits().value; + .setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN_CLOSED_HIDDEN) + .setQuery(QueryBuilders.termQuery("result_type", "model_size_stats")) + .get().getHits().getTotalHits().value; long totalNotificationsCountBeforeDelete = client().prepareSearch(NotificationsIndex.NOTIFICATIONS_INDEX).get().getHits().getTotalHits().value; assertThat(totalModelSizeStatsBeforeDelete, greaterThan(0L)); @@ -216,7 +223,7 @@ public void testDeleteExpiredData() throws Exception { assertThat(indexUnusedStateDocsResponse.get().status(), equalTo(RestStatus.OK)); // Now call the action under test - assertThat(deleteExpiredData().isDeleted(), is(true)); + assertThat(deleteExpiredData(customThrottle).isDeleted(), is(true)); // no-retention job should have kept all data assertThat(getBuckets("no-retention").size(), is(greaterThanOrEqualTo(70))); @@ -246,9 +253,9 @@ public void testDeleteExpiredData() throws Exception { assertThat(getModelSnapshots("results-and-snapshots-retention").size(), equalTo(1)); long totalModelSizeStatsAfterDelete = client().prepareSearch("*") - .setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN_CLOSED_HIDDEN) - .setQuery(QueryBuilders.termQuery("result_type", "model_size_stats")) - .get().getHits().getTotalHits().value; + .setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN_CLOSED_HIDDEN) + .setQuery(QueryBuilders.termQuery("result_type", "model_size_stats")) + .get().getHits().getTotalHits().value; long totalNotificationsCountAfterDelete = client().prepareSearch(NotificationsIndex.NOTIFICATIONS_INDEX).get().getHits().getTotalHits().value; assertThat(totalModelSizeStatsAfterDelete, equalTo(totalModelSizeStatsBeforeDelete)); @@ -268,10 +275,10 @@ public void testDeleteExpiredData() throws Exception { // Verify .ml-state doesn't contain unused state documents SearchResponse stateDocsResponse = client().prepareSearch(AnomalyDetectorsIndex.jobStateIndexPattern()) - .setFetchSource(false) - .setTrackTotalHits(true) - .setSize(10000) - .get(); + .setFetchSource(false) + .setTrackTotalHits(true) + .setSize(10000) + .get(); // Assert at least one state doc for each job assertThat(stateDocsResponse.getHits().getTotalHits().value, greaterThanOrEqualTo(5L)); diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/MlNativeIntegTestCase.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/MlNativeIntegTestCase.java index f1271f7f48901..be7399bc54e77 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/MlNativeIntegTestCase.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/MlNativeIntegTestCase.java @@ -197,6 +197,16 @@ protected DeleteExpiredDataAction.Response deleteExpiredData() throws Exception return response; } + protected DeleteExpiredDataAction.Response deleteExpiredData(Float customThrottle) throws Exception { + DeleteExpiredDataAction.Request request = new DeleteExpiredDataAction.Request(); + request.setRequestsPerSecond(customThrottle); + DeleteExpiredDataAction.Response response = client().execute(DeleteExpiredDataAction.INSTANCE, request).get(); + // We need to refresh to ensure the deletion is visible + refresh("*"); + + return response; + } + protected PutFilterAction.Response putMlFilter(MlFilter filter) { return client().execute(PutFilterAction.INSTANCE, new PutFilterAction.Request(filter)).actionGet(); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java index b8ea35915c50d..b346574d354d8 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java @@ -407,6 +407,23 @@ public Set getRoles() { public static final Setting MIN_DISK_SPACE_OFF_HEAP = Setting.byteSizeSetting("xpack.ml.min_disk_space_off_heap", new ByteSizeValue(5, ByteSizeUnit.GB), Setting.Property.NodeScope); + // Requests per second throttling for the nightly maintenance task + public static final Setting NIGHTLY_MAINTENANCE_REQUESTS_PER_SECOND = + new Setting<>( + "xpack.ml.nightly_maintenance_requests_per_second", + (s) -> Float.toString(-1.0f), + (s) -> { + float value = Float.parseFloat(s); + if (value <= 0.0f && value != -1.0f) { + throw new IllegalArgumentException("Failed to parse value [" + + s + "] for setting [xpack.ml.nightly_maintenance_requests_per_second] must be > 0.0 or exactly equal to -1.0"); + } + return value; + }, + Property.Dynamic, + Property.NodeScope + ); + private static final Logger logger = LogManager.getLogger(MachineLearning.class); private final Settings settings; @@ -454,7 +471,8 @@ public List> getSettings() { InferenceProcessor.MAX_INFERENCE_PROCESSORS, ModelLoadingService.INFERENCE_MODEL_CACHE_SIZE, ModelLoadingService.INFERENCE_MODEL_CACHE_TTL, - ResultsPersisterService.PERSIST_RESULTS_MAX_RETRIES)); + ResultsPersisterService.PERSIST_RESULTS_MAX_RETRIES, + NIGHTLY_MAINTENANCE_REQUESTS_PER_SECOND)); } public Settings additionalSettings() { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlDailyMaintenanceService.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlDailyMaintenanceService.java index 1a9ff34522f32..b529495b7c3f9 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlDailyMaintenanceService.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlDailyMaintenanceService.java @@ -13,6 +13,7 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.lease.Releasable; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.persistent.PersistentTasksCustomMetadata; @@ -51,19 +52,25 @@ public class MlDailyMaintenanceService implements Releasable { private final Supplier schedulerProvider; private volatile Scheduler.Cancellable cancellable; + private volatile float deleteExpiredDataRequestsPerSecond; - MlDailyMaintenanceService(ThreadPool threadPool, Client client, ClusterService clusterService, + MlDailyMaintenanceService(Settings settings, ThreadPool threadPool, Client client, ClusterService clusterService, MlAssignmentNotifier mlAssignmentNotifier, Supplier scheduleProvider) { this.threadPool = Objects.requireNonNull(threadPool); this.client = Objects.requireNonNull(client); this.clusterService = Objects.requireNonNull(clusterService); this.mlAssignmentNotifier = Objects.requireNonNull(mlAssignmentNotifier); this.schedulerProvider = Objects.requireNonNull(scheduleProvider); + this.deleteExpiredDataRequestsPerSecond = MachineLearning.NIGHTLY_MAINTENANCE_REQUESTS_PER_SECOND.get(settings); } - public MlDailyMaintenanceService(ClusterName clusterName, ThreadPool threadPool, Client client, ClusterService clusterService, - MlAssignmentNotifier mlAssignmentNotifier) { - this(threadPool, client, clusterService, mlAssignmentNotifier, () -> delayToNextTime(clusterName)); + public MlDailyMaintenanceService(Settings settings, ClusterName clusterName, ThreadPool threadPool, + Client client, ClusterService clusterService, MlAssignmentNotifier mlAssignmentNotifier) { + this(settings, threadPool, client, clusterService, mlAssignmentNotifier, () -> delayToNextTime(clusterName)); + } + + void setDeleteExpiredDataRequestsPerSecond(float value) { + this.deleteExpiredDataRequestsPerSecond = value; } /** @@ -101,7 +108,7 @@ public synchronized void stop() { } } - public boolean isStarted() { + boolean isStarted() { return cancellable != null; } @@ -129,7 +136,10 @@ private void triggerTasks() { return; } LOGGER.info("triggering scheduled [ML] maintenance tasks"); - executeAsyncWithOrigin(client, ML_ORIGIN, DeleteExpiredDataAction.INSTANCE, new DeleteExpiredDataAction.Request(), + executeAsyncWithOrigin(client, + ML_ORIGIN, + DeleteExpiredDataAction.INSTANCE, + new DeleteExpiredDataAction.Request(deleteExpiredDataRequestsPerSecond, TimeValue.timeValueHours(8)), ActionListener.wrap( response -> { if (response.isDeleted()) { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlInitializationService.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlInitializationService.java index 67b4e1e36b7e1..2859bd0c6f732 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlInitializationService.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlInitializationService.java @@ -26,34 +26,56 @@ class MlInitializationService implements LocalNodeMasterListener, ClusterStateLi private static final Logger logger = LogManager.getLogger(MlInitializationService.class); - private final Settings settings; - private final ThreadPool threadPool; - private final ClusterService clusterService; private final Client client; - private final MlAssignmentNotifier mlAssignmentNotifier; private final AtomicBoolean isIndexCreationInProgress = new AtomicBoolean(false); - private volatile MlDailyMaintenanceService mlDailyMaintenanceService; + private final MlDailyMaintenanceService mlDailyMaintenanceService; MlInitializationService(Settings settings, ThreadPool threadPool, ClusterService clusterService, Client client, MlAssignmentNotifier mlAssignmentNotifier) { - this.settings = Objects.requireNonNull(settings); - this.threadPool = Objects.requireNonNull(threadPool); - this.clusterService = Objects.requireNonNull(clusterService); + this(client, + new MlDailyMaintenanceService( + settings, + Objects.requireNonNull(clusterService).getClusterName(), + threadPool, + client, + clusterService, + mlAssignmentNotifier + ), + clusterService); + } + + // For testing + MlInitializationService(Client client, MlDailyMaintenanceService dailyMaintenanceService, ClusterService clusterService) { this.client = Objects.requireNonNull(client); - this.mlAssignmentNotifier = Objects.requireNonNull(mlAssignmentNotifier); + this.mlDailyMaintenanceService = dailyMaintenanceService; clusterService.addListener(this); clusterService.addLocalNodeMasterListener(this); + clusterService.addLifecycleListener(new LifecycleListener() { + @Override + public void afterStart() { + clusterService.getClusterSettings().addSettingsUpdateConsumer( + MachineLearning.NIGHTLY_MAINTENANCE_REQUESTS_PER_SECOND, + mlDailyMaintenanceService::setDeleteExpiredDataRequestsPerSecond + ); + } + + @Override + public void beforeStop() { + offMaster(); + } + }); } + @Override public void onMaster() { - installDailyMaintenanceService(); + mlDailyMaintenanceService.start(); } @Override public void offMaster() { - uninstallDailyMaintenanceService(); + mlDailyMaintenanceService.stop(); } @Override @@ -85,35 +107,10 @@ public String executorName() { return ThreadPool.Names.GENERIC; } - private synchronized void installDailyMaintenanceService() { - if (mlDailyMaintenanceService == null) { - mlDailyMaintenanceService = - new MlDailyMaintenanceService(clusterService.getClusterName(), threadPool, client, clusterService, mlAssignmentNotifier); - mlDailyMaintenanceService.start(); - clusterService.addLifecycleListener(new LifecycleListener() { - @Override - public void beforeStop() { - uninstallDailyMaintenanceService(); - } - }); - } - } - - private synchronized void uninstallDailyMaintenanceService() { - if (mlDailyMaintenanceService != null) { - mlDailyMaintenanceService.stop(); - mlDailyMaintenanceService = null; - } - } - /** For testing */ MlDailyMaintenanceService getDailyMaintenanceService() { return mlDailyMaintenanceService; } - /** For testing */ - synchronized void setDailyMaintenanceService(MlDailyMaintenanceService service) { - mlDailyMaintenanceService = service; - } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteExpiredDataAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteExpiredDataAction.java index fe3d1a73c89d8..4357fa438ba75 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteExpiredDataAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteExpiredDataAction.java @@ -15,6 +15,7 @@ import org.elasticsearch.client.OriginSettingClient; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.index.reindex.AbstractBulkByScrollRequest; import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; @@ -43,8 +44,7 @@ public class TransportDeleteExpiredDataAction extends HandledTransportAction listener) { logger.info("Deleting expired data"); - Instant timeoutTime = Instant.now(clock).plus(MAX_DURATION); + Instant timeoutTime = Instant.now(clock).plus( + request.getTimeout() == null ? DEFAULT_MAX_DURATION : Duration.ofMillis(request.getTimeout().millis()) + ); + Supplier isTimedOutSupplier = () -> Instant.now(clock).isAfter(timeoutTime); - threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME).execute(() -> deleteExpiredData(listener, isTimedOutSupplier)); + threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME).execute( + () -> deleteExpiredData(request, listener, isTimedOutSupplier) + ); } - private void deleteExpiredData(ActionListener listener, + private void deleteExpiredData(DeleteExpiredDataAction.Request request, + ActionListener listener, Supplier isTimedOutSupplier) { AnomalyDetectionAuditor auditor = new AnomalyDetectionAuditor(client, clusterService.getNodeName()); List dataRemovers = Arrays.asList( @@ -89,24 +95,43 @@ private void deleteExpiredData(ActionListener new EmptyStateIndexRemover(client) ); Iterator dataRemoversIterator = new VolatileCursorIterator<>(dataRemovers); - deleteExpiredData(dataRemoversIterator, listener, isTimedOutSupplier, true); + // If there is no throttle provided, default to none + float requestsPerSec = request.getRequestsPerSecond() == null ? Float.POSITIVE_INFINITY : request.getRequestsPerSecond(); + int numberOfDatanodes = Math.max(clusterService.state().getNodes().getDataNodes().size(), 1); + if (requestsPerSec == -1.0f) { + // With DEFAULT_SCROLL_SIZE = 1000 and a single data node this implies we spread deletion of + // 1 million documents over 5000 seconds ~= 83 minutes. + // If we have > 5 data nodes, we don't set our throttling. + requestsPerSec = numberOfDatanodes < 5 ? + (float)(AbstractBulkByScrollRequest.DEFAULT_SCROLL_SIZE / 5) * numberOfDatanodes : + Float.POSITIVE_INFINITY; + } + deleteExpiredData(dataRemoversIterator, requestsPerSec, listener, isTimedOutSupplier, true); } void deleteExpiredData(Iterator mlDataRemoversIterator, + float requestsPerSecond, ActionListener listener, Supplier isTimedOutSupplier, boolean haveAllPreviousDeletionsCompleted) { if (haveAllPreviousDeletionsCompleted && mlDataRemoversIterator.hasNext()) { MlDataRemover remover = mlDataRemoversIterator.next(); ActionListener nextListener = ActionListener.wrap( - booleanResponse -> deleteExpiredData(mlDataRemoversIterator, listener, isTimedOutSupplier, booleanResponse), + booleanResponse -> + deleteExpiredData( + mlDataRemoversIterator, + requestsPerSecond, + listener, + isTimedOutSupplier, + booleanResponse + ), listener::onFailure); // Removing expired ML data and artifacts requires multiple operations. // These are queued up and executed sequentially in the action listener, // the chained calls must all run the ML utility thread pool NOT the thread // the previous action returned in which in the case of a transport_client_boss // thread is a disaster. - remover.remove(new ThreadedActionListener<>(logger, threadPool, executor, nextListener, false), + remover.remove(requestsPerSecond, new ThreadedActionListener<>(logger, threadPool, executor, nextListener, false), isTimedOutSupplier); } else { if (haveAllPreviousDeletionsCompleted) { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemover.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemover.java index 0173f5b27d0d6..322466b23dd6c 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemover.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemover.java @@ -38,11 +38,15 @@ abstract class AbstractExpiredJobDataRemover implements MlDataRemover { } @Override - public void remove(ActionListener listener, Supplier isTimedOutSupplier) { - removeData(newJobIterator(), listener, isTimedOutSupplier); + public void remove(float requestsPerSecond, + ActionListener listener, + Supplier isTimedOutSupplier) { + removeData(newJobIterator(), requestsPerSecond, listener, isTimedOutSupplier); } - private void removeData(WrappedBatchedJobsIterator jobIterator, ActionListener listener, + private void removeData(WrappedBatchedJobsIterator jobIterator, + float requestsPerSecond, + ActionListener listener, Supplier isTimedOutSupplier) { if (jobIterator.hasNext() == false) { listener.onResponse(true); @@ -62,17 +66,17 @@ private void removeData(WrappedBatchedJobsIterator jobIterator, ActionListener { if (response == null) { - removeData(jobIterator, listener, isTimedOutSupplier); + removeData(jobIterator, requestsPerSecond, listener, isTimedOutSupplier); } else { - removeDataBefore(job, response.latestTimeMs, response.cutoffEpochMs, ActionListener.wrap( - r -> removeData(jobIterator, listener, isTimedOutSupplier), + removeDataBefore(job, requestsPerSecond, response.latestTimeMs, response.cutoffEpochMs, ActionListener.wrap( + r -> removeData(jobIterator, requestsPerSecond, listener, isTimedOutSupplier), listener::onFailure)); } }, @@ -93,7 +97,13 @@ private WrappedBatchedJobsIterator newJobIterator() { * Template method to allow implementation details of various types of data (e.g. results, model snapshots). * Implementors need to call {@code listener.onResponse} when they are done in order to continue to the next job. */ - abstract void removeDataBefore(Job job, long latestTimeMs, long cutoffEpochMs, ActionListener listener); + abstract void removeDataBefore( + Job job, + float requestsPerSecond, + long latestTimeMs, + long cutoffEpochMs, + ActionListener listener + ); static BoolQueryBuilder createQuery(String jobId, long cutoffEpochMs) { return QueryBuilders.boolQuery() diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/EmptyStateIndexRemover.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/EmptyStateIndexRemover.java index 0246e42c778ae..4522ff8b4b66a 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/EmptyStateIndexRemover.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/EmptyStateIndexRemover.java @@ -33,7 +33,7 @@ public EmptyStateIndexRemover(OriginSettingClient client) { } @Override - public void remove(ActionListener listener, Supplier isTimedOutSupplier) { + public void remove(float requestsPerSec, ActionListener listener, Supplier isTimedOutSupplier) { try { if (isTimedOutSupplier.get()) { listener.onResponse(false); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredForecastsRemover.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredForecastsRemover.java index 40611438fda59..f4f85dc704360 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredForecastsRemover.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredForecastsRemover.java @@ -73,10 +73,10 @@ public ExpiredForecastsRemover(OriginSettingClient client, ThreadPool threadPool } @Override - public void remove(ActionListener listener, Supplier isTimedOutSupplier) { + public void remove(float requestsPerSec, ActionListener listener, Supplier isTimedOutSupplier) { LOGGER.debug("Removing forecasts that expire before [{}]", cutoffEpochMs); ActionListener forecastStatsHandler = ActionListener.wrap( - searchResponse -> deleteForecasts(searchResponse, listener, isTimedOutSupplier), + searchResponse -> deleteForecasts(searchResponse, requestsPerSec, listener, isTimedOutSupplier), e -> listener.onFailure(new ElasticsearchException("An error occurred while searching forecasts to delete", e))); SearchSourceBuilder source = new SearchSourceBuilder(); @@ -95,7 +95,12 @@ public void remove(ActionListener listener, Supplier isTimedOu MachineLearning.UTILITY_THREAD_POOL_NAME, forecastStatsHandler, false)); } - private void deleteForecasts(SearchResponse searchResponse, ActionListener listener, Supplier isTimedOutSupplier) { + private void deleteForecasts( + SearchResponse searchResponse, + float requestsPerSec, + ActionListener listener, + Supplier isTimedOutSupplier + ) { List forecastsToDelete; try { forecastsToDelete = findForecastsToDelete(searchResponse); @@ -109,7 +114,9 @@ private void deleteForecasts(SearchResponse searchResponse, ActionListener() { @Override public void onResponse(BulkByScrollResponse bulkByScrollResponse) { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemover.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemover.java index 1bc863c6f5e52..241527325dd82 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemover.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemover.java @@ -133,7 +133,13 @@ private void latestSnapshotTimeStamp(String jobId, ActionListener listener } @Override - protected void removeDataBefore(Job job, long latestTimeMs, long cutoffEpochMs, ActionListener listener) { + protected void removeDataBefore( + Job job, + float requestsPerSec, + long latestTimeMs, + long cutoffEpochMs, + ActionListener listener + ) { // TODO: delete this test if we ever allow users to revert a job to no model snapshot, e.g. to recover from data loss if (job.getModelSnapshotId() == null) { // No snapshot to remove diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemover.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemover.java index b143ce9675783..c5c441560cbee 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemover.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemover.java @@ -84,9 +84,15 @@ Long getRetentionDays(Job job) { } @Override - protected void removeDataBefore(Job job, long latestTimeMs, long cutoffEpochMs, ActionListener listener) { + protected void removeDataBefore( + Job job, + float requestsPerSecond, + long latestTimeMs, + long cutoffEpochMs, + ActionListener listener + ) { LOGGER.debug("Removing results of job [{}] that have a timestamp before [{}]", job.getId(), cutoffEpochMs); - DeleteByQueryRequest request = createDBQRequest(job, cutoffEpochMs); + DeleteByQueryRequest request = createDBQRequest(job, requestsPerSecond, cutoffEpochMs); client.execute(DeleteByQueryAction.INSTANCE, request, new ActionListener() { @Override @@ -108,14 +114,14 @@ public void onFailure(Exception e) { }); } - private DeleteByQueryRequest createDBQRequest(Job job, long cutoffEpochMs) { + DeleteByQueryRequest createDBQRequest(Job job, float requestsPerSec, long cutoffEpochMs) { DeleteByQueryRequest request = new DeleteByQueryRequest(); request.setSlices(AbstractBulkByScrollRequest.AUTO_SLICES); - // Delete the documents gradually. - // With DEFAULT_SCROLL_SIZE = 1000 this implies we spread deletion of 1 million documents over 5000 seconds ~= 83 minutes. - request.setBatchSize(AbstractBulkByScrollRequest.DEFAULT_SCROLL_SIZE); - request.setRequestsPerSecond(AbstractBulkByScrollRequest.DEFAULT_SCROLL_SIZE / 5); + request.setBatchSize(AbstractBulkByScrollRequest.DEFAULT_SCROLL_SIZE) + // We are deleting old data, we should simply proceed as a version conflict could mean that another deletion is taking place + .setAbortOnVersionConflict(false) + .setRequestsPerSecond(requestsPerSec); request.indices(AnomalyDetectorsIndex.jobResultsAliasedName(job.getId())); QueryBuilder excludeFilter = QueryBuilders.termsQuery(Result.RESULT_TYPE.getPreferredName(), diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/MlDataRemover.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/MlDataRemover.java index 485d8e9bfa22d..34a5335da8c76 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/MlDataRemover.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/MlDataRemover.java @@ -10,5 +10,5 @@ import java.util.function.Supplier; public interface MlDataRemover { - void remove(ActionListener listener, Supplier isTimedOutSupplier); + void remove(float requestsPerSecond, ActionListener listener, Supplier isTimedOutSupplier); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/UnusedStateRemover.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/UnusedStateRemover.java index 6d0a9de1569db..723902073916d 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/UnusedStateRemover.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/UnusedStateRemover.java @@ -58,14 +58,14 @@ public UnusedStateRemover(OriginSettingClient client, ClusterService clusterServ } @Override - public void remove(ActionListener listener, Supplier isTimedOutSupplier) { + public void remove(float requestsPerSec, ActionListener listener, Supplier isTimedOutSupplier) { try { List unusedStateDocIds = findUnusedStateDocIds(); if (isTimedOutSupplier.get()) { listener.onResponse(false); } else { if (unusedStateDocIds.size() > 0) { - executeDeleteUnusedStateDocs(unusedStateDocIds, listener); + executeDeleteUnusedStateDocs(unusedStateDocIds, requestsPerSec, listener); } else { listener.onResponse(true); } @@ -98,12 +98,12 @@ private List findUnusedStateDocIds() { private Set getJobIds() { Set jobIds = new HashSet<>(); - jobIds.addAll(getAnamalyDetectionJobIds()); + jobIds.addAll(getAnomalyDetectionJobIds()); jobIds.addAll(getDataFrameAnalyticsJobIds()); return jobIds; } - private Set getAnamalyDetectionJobIds() { + private Set getAnomalyDetectionJobIds() { Set jobIds = new HashSet<>(); // TODO Once at 8.0, we can stop searching for jobs in cluster state @@ -130,11 +130,13 @@ private Set getDataFrameAnalyticsJobIds() { return jobIds; } - private void executeDeleteUnusedStateDocs(List unusedDocIds, ActionListener listener) { + private void executeDeleteUnusedStateDocs(List unusedDocIds, float requestsPerSec, ActionListener listener) { LOGGER.info("Found [{}] unused state documents; attempting to delete", unusedDocIds.size()); DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest(AnomalyDetectorsIndex.jobStateIndexPattern()) .setIndicesOptions(IndicesOptions.lenientExpandOpen()) + .setAbortOnVersionConflict(false) + .setRequestsPerSecond(requestsPerSec) .setQuery(QueryBuilders.idsQuery().addIds(unusedDocIds.toArray(new String[0]))); // _doc is the most efficient sort order and will also disable scoring diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/rest/RestDeleteExpiredDataAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/rest/RestDeleteExpiredDataAction.java index e6eb50f8b0410..dca079e3c556e 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/rest/RestDeleteExpiredDataAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/rest/RestDeleteExpiredDataAction.java @@ -41,7 +41,9 @@ public String getName() { @Override protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException { - DeleteExpiredDataAction.Request request = new DeleteExpiredDataAction.Request(); + DeleteExpiredDataAction.Request request = restRequest.hasContent() ? + DeleteExpiredDataAction.Request.PARSER.apply(restRequest.contentParser(), null) : + new DeleteExpiredDataAction.Request(); return channel -> client.execute(DeleteExpiredDataAction.INSTANCE, request, new RestToXContentListener<>(channel)); } } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlDailyMaintenanceServiceTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlDailyMaintenanceServiceTests.java index b3718366983b2..c7bc4b50679bd 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlDailyMaintenanceServiceTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlDailyMaintenanceServiceTests.java @@ -11,6 +11,7 @@ import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.persistent.PersistentTasksCustomMetadata; import org.elasticsearch.test.ESTestCase; @@ -83,7 +84,7 @@ public void testScheduledTriggeringWhileUpgradeModeIsEnabled() throws Interrupte } private MlDailyMaintenanceService createService(CountDownLatch latch, Client client) { - return new MlDailyMaintenanceService(threadPool, client, clusterService, mlAssignmentNotifier, () -> { + return new MlDailyMaintenanceService(Settings.EMPTY, threadPool, client, clusterService, mlAssignmentNotifier, () -> { latch.countDown(); return TimeValue.timeValueMillis(100); }); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlInitializationServiceTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlInitializationServiceTests.java index 860f461327537..5dba0453124e5 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlInitializationServiceTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlInitializationServiceTests.java @@ -18,7 +18,6 @@ import static org.elasticsearch.mock.orig.Mockito.doAnswer; import static org.hamcrest.Matchers.is; -import static org.hamcrest.Matchers.nullValue; import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; @@ -65,31 +64,17 @@ public void testInitialize_noMasterNode() { MlInitializationService initializationService = new MlInitializationService(Settings.EMPTY, threadPool, clusterService, client, mlAssignmentNotifier); initializationService.offMaster(); - assertThat(initializationService.getDailyMaintenanceService(), is(nullValue())); - } - - public void testInitialize_alreadyInitialized() { - MlInitializationService initializationService = - new MlInitializationService(Settings.EMPTY, threadPool, clusterService, client, mlAssignmentNotifier); - MlDailyMaintenanceService initialDailyMaintenanceService = mock(MlDailyMaintenanceService.class); - initializationService.setDailyMaintenanceService(initialDailyMaintenanceService); - initializationService.onMaster(); - - assertSame(initialDailyMaintenanceService, initializationService.getDailyMaintenanceService()); + assertThat(initializationService.getDailyMaintenanceService().isStarted(), is(false)); } public void testNodeGoesFromMasterToNonMasterAndBack() { - MlInitializationService initializationService = - new MlInitializationService(Settings.EMPTY, threadPool, clusterService, client, mlAssignmentNotifier); MlDailyMaintenanceService initialDailyMaintenanceService = mock(MlDailyMaintenanceService.class); - initializationService.setDailyMaintenanceService(initialDailyMaintenanceService); + MlInitializationService initializationService = new MlInitializationService(client, initialDailyMaintenanceService, clusterService); initializationService.offMaster(); verify(initialDailyMaintenanceService).stop(); initializationService.onMaster(); - MlDailyMaintenanceService finalDailyMaintenanceService = initializationService.getDailyMaintenanceService(); - assertNotSame(initialDailyMaintenanceService, finalDailyMaintenanceService); - assertThat(initializationService.getDailyMaintenanceService().isStarted(), is(true)); + verify(initialDailyMaintenanceService).start(); } } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportDeleteExpiredDataActionTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportDeleteExpiredDataActionTests.java index fab6c8ec67ef7..1a7c6d3b1eeb7 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportDeleteExpiredDataActionTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportDeleteExpiredDataActionTests.java @@ -41,7 +41,11 @@ public class TransportDeleteExpiredDataActionTests extends ESTestCase { */ private static class DummyDataRemover implements MlDataRemover { - public void remove(ActionListener listener, Supplier isTimedOutSupplier) { + public void remove( + float requestsPerSec, + ActionListener listener, + Supplier isTimedOutSupplier + ) { listener.onResponse(isTimedOutSupplier.get() == false); } } @@ -77,7 +81,7 @@ public void testDeleteExpiredDataIterationNoTimeout() { Supplier isTimedOutSupplier = () -> false; - transportDeleteExpiredDataAction.deleteExpiredData(removers.iterator(), finalListener, isTimedOutSupplier, true); + transportDeleteExpiredDataAction.deleteExpiredData(removers.iterator(), 1.0f, finalListener, isTimedOutSupplier, true); assertTrue(succeeded.get()); } @@ -97,7 +101,7 @@ public void testDeleteExpiredDataIterationWithTimeout() { Supplier isTimedOutSupplier = () -> (removersRemaining.getAndDecrement() <= 0); - transportDeleteExpiredDataAction.deleteExpiredData(removers.iterator(), finalListener, isTimedOutSupplier, true); + transportDeleteExpiredDataAction.deleteExpiredData(removers.iterator(), 1.0f, finalListener, isTimedOutSupplier, true); assertFalse(succeeded.get()); } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemoverTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemoverTests.java index 8dc5bec70fbe5..432d8ffa3e41e 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemoverTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemoverTests.java @@ -69,7 +69,13 @@ void calcCutoffEpochMs(String jobId, long retentionDays, ActionListener listener) { + protected void removeDataBefore( + Job job, + float requestsPerSec, + long latestTimeMs, + long cutoffEpochMs, + ActionListener listener + ) { listener.onResponse(Boolean.TRUE); } } @@ -118,7 +124,7 @@ public void testRemoveGivenNoJobs() throws IOException { TestListener listener = new TestListener(); ConcreteExpiredJobDataRemover remover = new ConcreteExpiredJobDataRemover(originSettingClient); - remover.remove(listener, () -> false); + remover.remove(1.0f,listener, () -> false); listener.waitToCompletion(); assertThat(listener.success, is(true)); @@ -157,7 +163,7 @@ public void testRemoveGivenMultipleBatches() throws IOException { TestListener listener = new TestListener(); ConcreteExpiredJobDataRemover remover = new ConcreteExpiredJobDataRemover(originSettingClient); - remover.remove(listener, () -> false); + remover.remove(1.0f,listener, () -> false); listener.waitToCompletion(); assertThat(listener.success, is(true)); @@ -181,7 +187,7 @@ public void testRemoveGivenTimeOut() throws IOException { TestListener listener = new TestListener(); ConcreteExpiredJobDataRemover remover = new ConcreteExpiredJobDataRemover(originSettingClient); - remover.remove(listener, () -> (attemptsLeft.getAndDecrement() <= 0)); + remover.remove(1.0f,listener, () -> attemptsLeft.getAndDecrement() <= 0); listener.waitToCompletion(); assertThat(listener.success, is(false)); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/EmptyStateIndexRemoverTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/EmptyStateIndexRemoverTests.java index 2f250faef1406..cf253cb8e4233 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/EmptyStateIndexRemoverTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/EmptyStateIndexRemoverTests.java @@ -70,7 +70,7 @@ public void verifyNoOtherInteractionsWithMocks() { } public void testRemove_TimedOut() { - remover.remove(listener, () -> true); + remover.remove(1.0f, listener, () -> true); InOrder inOrder = inOrder(client, listener); inOrder.verify(listener).onResponse(false); @@ -81,7 +81,7 @@ public void testRemove_NoStateIndices() { when(indicesStatsResponse.getIndices()).thenReturn(Collections.emptyMap()); doAnswer(withResponse(indicesStatsResponse)).when(client).execute(any(), any(), any()); - remover.remove(listener, () -> false); + remover.remove(1.0f, listener, () -> false); InOrder inOrder = inOrder(client, listener); inOrder.verify(client).execute(eq(IndicesStatsAction.INSTANCE), any(), any()); @@ -98,7 +98,7 @@ public void testRemove_NoEmptyStateIndices() { doReturn(indexStatsMap).when(indicesStatsResponse).getIndices(); doAnswer(withResponse(indicesStatsResponse)).when(client).execute(eq(IndicesStatsAction.INSTANCE), any(), any()); - remover.remove(listener, () -> false); + remover.remove(1.0f, listener, () -> false); InOrder inOrder = inOrder(client, listener); inOrder.verify(client).execute(eq(IndicesStatsAction.INSTANCE), any(), any()); @@ -122,7 +122,7 @@ private void assertDeleteActionExecuted(boolean acknowledged) { AcknowledgedResponse deleteIndexResponse = new AcknowledgedResponse(acknowledged); doAnswer(withResponse(deleteIndexResponse)).when(client).execute(eq(DeleteIndexAction.INSTANCE), any(), any()); - remover.remove(listener, () -> false); + remover.remove(1.0f, listener, () -> false); InOrder inOrder = inOrder(client, listener); inOrder.verify(client).execute(eq(IndicesStatsAction.INSTANCE), any(), any()); @@ -150,7 +150,7 @@ public void testRemove_NoIndicesToRemove() { GetIndexResponse getIndexResponse = new GetIndexResponse(new String[] { ".ml-state-a" }, null, null, null, null, null); doAnswer(withResponse(getIndexResponse)).when(client).execute(eq(GetIndexAction.INSTANCE), any(), any()); - remover.remove(listener, () -> false); + remover.remove(1.0f, listener, () -> false); InOrder inOrder = inOrder(client, listener); inOrder.verify(client).execute(eq(IndicesStatsAction.INSTANCE), any(), any()); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemoverTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemoverTests.java index f48ad5a7e1225..9ee4cb177aafd 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemoverTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemoverTests.java @@ -74,7 +74,7 @@ public void testRemove_GivenJobWithoutActiveSnapshot() throws IOException { givenClientRequestsSucceed(responses); - createExpiredModelSnapshotsRemover().remove(listener, () -> false); + createExpiredModelSnapshotsRemover().remove(1.0f, listener, () -> false); listener.waitToCompletion(); assertThat(listener.success, is(true)); @@ -104,7 +104,7 @@ public void testRemove_GivenJobsWithMixedRetentionPolicies() throws IOException searchResponses.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(Collections.emptyList())); givenClientRequestsSucceed(searchResponses); - createExpiredModelSnapshotsRemover().remove(listener, () -> false); + createExpiredModelSnapshotsRemover().remove(1.0f, listener, () -> false); listener.waitToCompletion(); assertThat(listener.success, is(true)); @@ -141,7 +141,7 @@ public void testRemove_GivenTimeout() throws IOException { final int timeoutAfter = randomIntBetween(0, 1); AtomicInteger attemptsLeft = new AtomicInteger(timeoutAfter); - createExpiredModelSnapshotsRemover().remove(listener, () -> (attemptsLeft.getAndDecrement() <= 0)); + createExpiredModelSnapshotsRemover().remove(1.0f, listener, () -> (attemptsLeft.getAndDecrement() <= 0)); listener.waitToCompletion(); assertThat(listener.success, is(false)); @@ -156,7 +156,7 @@ public void testRemove_GivenClientSearchRequestsFail() throws IOException { ))); givenClientSearchRequestsFail(searchResponses); - createExpiredModelSnapshotsRemover().remove(listener, () -> false); + createExpiredModelSnapshotsRemover().remove(1.0f, listener, () -> false); listener.waitToCompletion(); assertThat(listener.success, is(false)); @@ -192,7 +192,7 @@ public void testRemove_GivenClientDeleteSnapshotRequestsFail() throws IOExceptio searchResponses.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(Collections.singletonList(snapshot2_2))); givenClientDeleteModelSnapshotRequestsFail(searchResponses); - createExpiredModelSnapshotsRemover().remove(listener, () -> false); + createExpiredModelSnapshotsRemover().remove(1.0f, listener, () -> false); listener.waitToCompletion(); assertThat(listener.success, is(false)); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemoverTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemoverTests.java index b1691baca0c79..92b9e85f661c3 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemoverTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemoverTests.java @@ -65,7 +65,7 @@ public void testRemove_GivenNoJobs() throws IOException { givenDBQRequestsSucceed(); AbstractExpiredJobDataRemoverTests.givenJobs(client, Collections.emptyList()); - createExpiredResultsRemover().remove(listener, () -> false); + createExpiredResultsRemover().remove(1.0f, listener, () -> false); verify(client).execute(eq(SearchAction.INSTANCE), any(), any()); verify(listener).onResponse(true); @@ -79,7 +79,7 @@ public void testRemove_GivenJobsWithoutRetentionPolicy() throws IOException { JobTests.buildJobBuilder("bar").build() )); - createExpiredResultsRemover().remove(listener, () -> false); + createExpiredResultsRemover().remove(1.0f, listener, () -> false); verify(listener).onResponse(true); verify(client).execute(eq(SearchAction.INSTANCE), any(), any()); @@ -94,7 +94,7 @@ public void testRemove_GivenJobsWithAndWithoutRetentionPolicy() { JobTests.buildJobBuilder("results-2").setResultsRetentionDays(20L).build()), new Bucket("id_not_important", new Date(), 60)); - createExpiredResultsRemover().remove(listener, () -> false); + createExpiredResultsRemover().remove(1.0f, listener, () -> false); assertThat(capturedDeleteByQueryRequests.size(), equalTo(2)); DeleteByQueryRequest dbqRequest = capturedDeleteByQueryRequests.get(0); @@ -114,7 +114,7 @@ public void testRemove_GivenTimeout() { final int timeoutAfter = randomIntBetween(0, 1); AtomicInteger attemptsLeft = new AtomicInteger(timeoutAfter); - createExpiredResultsRemover().remove(listener, () -> (attemptsLeft.getAndDecrement() <= 0)); + createExpiredResultsRemover().remove(1.0f, listener, () -> (attemptsLeft.getAndDecrement() <= 0)); assertThat(capturedDeleteByQueryRequests.size(), equalTo(timeoutAfter)); verify(listener).onResponse(false); @@ -129,7 +129,7 @@ public void testRemove_GivenClientRequestsFailed() { JobTests.buildJobBuilder("results-2").setResultsRetentionDays(20L).build()), new Bucket("id_not_important", new Date(), 60)); - createExpiredResultsRemover().remove(listener, () -> false); + createExpiredResultsRemover().remove(1.0f, listener, () -> false); assertThat(capturedDeleteByQueryRequests.size(), equalTo(1)); DeleteByQueryRequest dbqRequest = capturedDeleteByQueryRequests.get(0); diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/api/ml.delete_expired_data.json b/x-pack/plugin/src/test/resources/rest-api-spec/api/ml.delete_expired_data.json index ac38a7a344938..4c55f853a742d 100644 --- a/x-pack/plugin/src/test/resources/rest-api-spec/api/ml.delete_expired_data.json +++ b/x-pack/plugin/src/test/resources/rest-api-spec/api/ml.delete_expired_data.json @@ -14,6 +14,9 @@ ] } ] + }, + "body":{ + "description":"deleting expired data parameters" } } } diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/delete_expired_data.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/delete_expired_data.yml new file mode 100644 index 0000000000000..319c564b26699 --- /dev/null +++ b/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/delete_expired_data.yml @@ -0,0 +1,36 @@ +setup: + - skip: + features: headers + - do: + headers: + Authorization: "Basic eF9wYWNrX3Jlc3RfdXNlcjp4LXBhY2stdGVzdC1wYXNzd29yZA==" # run as x_pack_rest_user, i.e. the test setup superuser + ml.put_job: + job_id: delete-expired-data + body: > + { + "job_id": "delete-expired-data", + "description":"Analysis of response time by airline", + "analysis_config" : { + "bucket_span" : "1h", + "detectors" :[{"function":"metric","field_name":"responsetime","by_field_name":"airline"}] + }, + "data_description" : { + "field_delimiter":",", + "time_field":"time", + "time_format":"yyyy-MM-dd HH:mm:ssX" + } + } + +--- +"Test delete expired data with no body": + - do: + ml.delete_expired_data: {} + + - match: { deleted: true} +--- +"Test delete expired data with body parameters": + - do: + ml.delete_expired_data: + body: > + { "timeout": "10h", "requests_per_second": 100000.0 } + - match: { deleted: true}