Skip to content

Commit 8fed077

Browse files
authored
[ML] relax throttling on expired data cleanup (#56711)
Throttling nightly cleanup as much as we do has been over cautious. Night cleanup should be more lenient in its throttling. We still keep the same batch size, but now the requests per second scale with the number of data nodes. If we have more than 5 data nodes, we don't throttle at all. Additionally, the API now has `requests_per_second` and `timeout` set. So users calling the API directly can set the throttling. This commit also adds a new setting `xpack.ml.nightly_maintenance_requests_per_second`. This will allow users to adjust throttling of the nightly maintenance.
1 parent 79a69cb commit 8fed077

File tree

32 files changed

+577
-181
lines changed

32 files changed

+577
-181
lines changed

client/rest-high-level/src/main/java/org/elasticsearch/client/MLRequestConverters.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -167,13 +167,13 @@ static Request closeJob(CloseJobRequest closeJobRequest) throws IOException {
167167
return request;
168168
}
169169

170-
static Request deleteExpiredData(DeleteExpiredDataRequest deleteExpiredDataRequest) {
170+
static Request deleteExpiredData(DeleteExpiredDataRequest deleteExpiredDataRequest) throws IOException {
171171
String endpoint = new EndpointBuilder()
172172
.addPathPartAsIs("_ml")
173173
.addPathPartAsIs("_delete_expired_data")
174174
.build();
175175
Request request = new Request(HttpDelete.METHOD_NAME, endpoint);
176-
176+
request.setEntity(createEntity(deleteExpiredDataRequest, REQUEST_BODY_CONTENT_TYPE));
177177
return request;
178178
}
179179

client/rest-high-level/src/main/java/org/elasticsearch/client/ml/DeleteExpiredDataRequest.java

Lines changed: 63 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,16 +19,78 @@
1919
package org.elasticsearch.client.ml;
2020

2121
import org.elasticsearch.client.Validatable;
22+
import org.elasticsearch.common.unit.TimeValue;
23+
import org.elasticsearch.common.xcontent.ToXContentObject;
24+
import org.elasticsearch.common.xcontent.XContentBuilder;
25+
26+
import java.io.IOException;
27+
import java.util.Objects;
2228

2329
/**
2430
* Request to delete expired model snapshots and forecasts
2531
*/
26-
public class DeleteExpiredDataRequest implements Validatable {
32+
public class DeleteExpiredDataRequest implements Validatable, ToXContentObject {
2733

34+
static final String REQUESTS_PER_SECOND = "requests_per_second";
35+
static final String TIMEOUT = "timeout";
36+
private final Float requestsPerSecond;
37+
private final TimeValue timeout;
2838
/**
2939
* Create a new request to delete expired data
3040
*/
3141
public DeleteExpiredDataRequest() {
42+
this(null, null);
43+
}
44+
45+
public DeleteExpiredDataRequest(Float requestsPerSecond, TimeValue timeout) {
46+
this.requestsPerSecond = requestsPerSecond;
47+
this.timeout = timeout;
48+
}
49+
50+
/**
51+
* The requests allowed per second in the underlying Delete by Query requests executed.
52+
*
53+
* `-1.0f` indicates that the standard nightly cleanup behavior should be ran.
54+
* Throttling scales according to the number of data nodes.
55+
* `null` is default and means no throttling will occur.
56+
*/
57+
public Float getRequestsPerSecond() {
58+
return requestsPerSecond;
59+
}
60+
61+
/**
62+
* Indicates how long the deletion request will run until it timesout.
63+
*
64+
* Default value is 8 hours.
65+
*/
66+
public TimeValue getTimeout() {
67+
return timeout;
68+
}
69+
70+
@Override
71+
public boolean equals(Object o) {
72+
if (this == o) return true;
73+
if (o == null || getClass() != o.getClass()) return false;
74+
DeleteExpiredDataRequest that = (DeleteExpiredDataRequest) o;
75+
return Objects.equals(requestsPerSecond, that.requestsPerSecond) &&
76+
Objects.equals(timeout, that.timeout);
3277
}
3378

79+
@Override
80+
public int hashCode() {
81+
return Objects.hash(requestsPerSecond, timeout);
82+
}
83+
84+
@Override
85+
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
86+
builder.startObject();
87+
if (requestsPerSecond != null) {
88+
builder.field(REQUESTS_PER_SECOND, requestsPerSecond);
89+
}
90+
if (timeout != null) {
91+
builder.field(TIMEOUT, timeout.getStringRep());
92+
}
93+
builder.endObject();
94+
return builder;
95+
}
3496
}

client/rest-high-level/src/test/java/org/elasticsearch/client/MLRequestConvertersTests.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -214,12 +214,16 @@ public void testCloseJob() throws Exception {
214214
requestEntityToString(request));
215215
}
216216

217-
public void testDeleteExpiredData() {
218-
DeleteExpiredDataRequest deleteExpiredDataRequest = new DeleteExpiredDataRequest();
217+
public void testDeleteExpiredData() throws Exception {
218+
float requestsPerSec = randomBoolean() ? -1.0f : (float)randomDoubleBetween(0.0, 100000.0, false);
219+
DeleteExpiredDataRequest deleteExpiredDataRequest = new DeleteExpiredDataRequest(
220+
requestsPerSec,
221+
TimeValue.timeValueHours(1));
219222

220223
Request request = MLRequestConverters.deleteExpiredData(deleteExpiredDataRequest);
221224
assertEquals(HttpDelete.METHOD_NAME, request.getMethod());
222225
assertEquals("/_ml/_delete_expired_data", request.getEndpoint());
226+
assertEquals("{\"requests_per_second\":" + requestsPerSec + ",\"timeout\":\"1h\"}", requestEntityToString(request));
223227
}
224228

225229
public void testDeleteJob() {

client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/MlClientDocumentationIT.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2035,7 +2035,11 @@ public void testDeleteExpiredData() throws IOException, InterruptedException {
20352035
MachineLearningIT.buildJob(jobId);
20362036
{
20372037
// tag::delete-expired-data-request
2038-
DeleteExpiredDataRequest request = new DeleteExpiredDataRequest(); // <1>
2038+
DeleteExpiredDataRequest request = new DeleteExpiredDataRequest( // <1>
2039+
1000.0f, // <2>
2040+
TimeValue.timeValueHours(12) // <3>
2041+
);
2042+
20392043
// end::delete-expired-data-request
20402044

20412045
// tag::delete-expired-data-execute
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.elasticsearch.client.ml;
20+
21+
import org.elasticsearch.common.ParseField;
22+
import org.elasticsearch.common.unit.TimeValue;
23+
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
24+
import org.elasticsearch.common.xcontent.ObjectParser;
25+
import org.elasticsearch.common.xcontent.XContentParser;
26+
import org.elasticsearch.test.AbstractXContentTestCase;
27+
28+
import java.io.IOException;
29+
30+
31+
public class DeleteExpiredDataRequestTests extends AbstractXContentTestCase<DeleteExpiredDataRequest> {
32+
33+
private static ConstructingObjectParser<DeleteExpiredDataRequest, Void> PARSER = new ConstructingObjectParser<>(
34+
"delete_expired_data_request",
35+
true,
36+
(a) -> new DeleteExpiredDataRequest((Float) a[0], (TimeValue) a[1])
37+
);
38+
static {
39+
PARSER.declareFloat(ConstructingObjectParser.optionalConstructorArg(),
40+
new ParseField(DeleteExpiredDataRequest.REQUESTS_PER_SECOND));
41+
PARSER.declareField(ConstructingObjectParser.optionalConstructorArg(),
42+
(p, c) -> TimeValue.parseTimeValue(p.text(), DeleteExpiredDataRequest.TIMEOUT),
43+
new ParseField(DeleteExpiredDataRequest.TIMEOUT),
44+
ObjectParser.ValueType.STRING);
45+
}
46+
47+
@Override
48+
protected DeleteExpiredDataRequest createTestInstance() {
49+
return new DeleteExpiredDataRequest(randomBoolean() ? null : randomFloat(),
50+
randomBoolean() ? null : TimeValue.parseTimeValue(randomTimeValue(), "test"));
51+
}
52+
53+
@Override
54+
protected DeleteExpiredDataRequest doParseInstance(XContentParser parser) throws IOException {
55+
return PARSER.apply(parser, null);
56+
}
57+
58+
@Override
59+
protected boolean supportsUnknownFields() {
60+
return true;
61+
}
62+
}

docs/java-rest/high-level/ml/delete-expired-data.asciidoc

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,10 @@ A `DeleteExpiredDataRequest` object does not require any arguments.
2121
include-tagged::{doc-tests-file}[{api}-request]
2222
---------------------------------------------------
2323
<1> Constructing a new request.
24+
<2> Providing requests per second throttling for the
25+
deletion processes. Default is no throttling.
26+
<3> Setting how long the deletion processes will be allowed
27+
to run before they are canceled. Default value is `8h` (8 hours).
2428

2529
[id="{upid}-{api}-response"]
2630
==== Delete Expired Data Response

docs/reference/ml/anomaly-detection/apis/delete-expired-data.asciidoc

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,17 @@ Deletes all job results, model snapshots and forecast data that have exceeded
2727
their `retention days` period. Machine learning state documents that are not
2828
associated with any job are also deleted.
2929

30+
[[ml-delete-expired-data-request-body]]
31+
==== {api-request-body-title}
32+
33+
`requests_per_second`::
34+
(Optional, float) The desired requests per second for the deletion processes.
35+
The default behavior is no throttling.
36+
37+
`timeout`::
38+
(Optional, string) How long can the underlying delete processes run until they are canceled.
39+
The default value is `8h` (8 hours).
40+
3041
[[ml-delete-expired-data-example]]
3142
==== {api-examples-title}
3243

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/DeleteExpiredDataAction.java

Lines changed: 78 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
*/
66
package org.elasticsearch.xpack.core.ml.action;
77

8+
import org.elasticsearch.Version;
89
import org.elasticsearch.action.ActionRequest;
910
import org.elasticsearch.action.ActionRequestBuilder;
1011
import org.elasticsearch.action.ActionRequestValidationException;
@@ -14,6 +15,8 @@
1415
import org.elasticsearch.common.ParseField;
1516
import org.elasticsearch.common.io.stream.StreamInput;
1617
import org.elasticsearch.common.io.stream.StreamOutput;
18+
import org.elasticsearch.common.unit.TimeValue;
19+
import org.elasticsearch.common.xcontent.ObjectParser;
1720
import org.elasticsearch.common.xcontent.ToXContentObject;
1821
import org.elasticsearch.common.xcontent.XContentBuilder;
1922

@@ -31,20 +34,94 @@ private DeleteExpiredDataAction() {
3134

3235
public static class Request extends ActionRequest {
3336

37+
public static final ParseField REQUESTS_PER_SECOND = new ParseField("requests_per_second");
38+
public static final ParseField TIMEOUT = new ParseField("timeout");
39+
40+
public static final ObjectParser<Request, Void> PARSER = new ObjectParser<>(
41+
"delete_expired_data_request",
42+
false,
43+
Request::new);
44+
45+
static {
46+
PARSER.declareFloat(Request::setRequestsPerSecond, REQUESTS_PER_SECOND);
47+
PARSER.declareString((obj, value) -> obj.setTimeout(TimeValue.parseTimeValue(value, TIMEOUT.getPreferredName())),
48+
TIMEOUT);
49+
}
50+
51+
private Float requestsPerSecond;
52+
private TimeValue timeout;
53+
3454
public Request() {}
3555

56+
public Request(Float requestsPerSecond, TimeValue timeValue) {
57+
this.requestsPerSecond = requestsPerSecond;
58+
this.timeout = timeValue;
59+
}
60+
3661
public Request(StreamInput in) throws IOException {
3762
super(in);
63+
if (in.getVersion().onOrAfter(Version.V_8_0_0)) {
64+
this.requestsPerSecond = in.readOptionalFloat();
65+
this.timeout = in.readOptionalTimeValue();
66+
} else {
67+
this.requestsPerSecond = null;
68+
this.timeout = null;
69+
}
70+
}
71+
72+
public Float getRequestsPerSecond() {
73+
return requestsPerSecond;
74+
}
75+
76+
public TimeValue getTimeout() {
77+
return timeout;
78+
}
79+
80+
public Request setRequestsPerSecond(Float requestsPerSecond) {
81+
this.requestsPerSecond = requestsPerSecond;
82+
return this;
83+
}
84+
85+
public Request setTimeout(TimeValue timeout) {
86+
this.timeout = timeout;
87+
return this;
3888
}
3989

4090
@Override
4191
public ActionRequestValidationException validate() {
92+
if (this.requestsPerSecond != null && this.requestsPerSecond != -1.0f && this.requestsPerSecond <= 0) {
93+
ActionRequestValidationException requestValidationException = new ActionRequestValidationException();
94+
requestValidationException.addValidationError("[requests_per_second] must either be -1 or greater than 0");
95+
return requestValidationException;
96+
}
4297
return null;
4398
}
99+
100+
@Override
101+
public boolean equals(Object o) {
102+
if (this == o) return true;
103+
if (o == null || getClass() != o.getClass()) return false;
104+
Request request = (Request) o;
105+
return Objects.equals(requestsPerSecond, request.requestsPerSecond)
106+
&& Objects.equals(timeout, request.timeout);
107+
}
108+
109+
@Override
110+
public int hashCode() {
111+
return Objects.hash(requestsPerSecond, timeout);
112+
}
113+
114+
@Override
115+
public void writeTo(StreamOutput out) throws IOException {
116+
super.writeTo(out);
117+
if (out.getVersion().onOrAfter(Version.V_8_0_0)) {
118+
out.writeOptionalFloat(requestsPerSecond);
119+
out.writeOptionalTimeValue(timeout);
120+
}
121+
}
44122
}
45123

46124
static class RequestBuilder extends ActionRequestBuilder<Request, Response> {
47-
48125
RequestBuilder(ElasticsearchClient client, DeleteExpiredDataAction action) {
49126
super(client, action, new Request());
50127
}
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
package org.elasticsearch.xpack.core.ml.action;
7+
8+
import org.elasticsearch.Version;
9+
import org.elasticsearch.common.io.stream.Writeable;
10+
import org.elasticsearch.common.unit.TimeValue;
11+
import org.elasticsearch.xpack.core.ml.AbstractBWCWireSerializationTestCase;
12+
import org.elasticsearch.xpack.core.ml.action.DeleteExpiredDataAction.Request;
13+
14+
public class DeleteExpiredDataActionRequestTests extends AbstractBWCWireSerializationTestCase<Request> {
15+
16+
@Override
17+
protected Request createTestInstance() {
18+
return new Request(
19+
randomBoolean() ? null : randomFloat(),
20+
randomBoolean() ? null : TimeValue.parseTimeValue(randomTimeValue(), "test")
21+
);
22+
}
23+
24+
@Override
25+
protected Writeable.Reader<Request> instanceReader() {
26+
return Request::new;
27+
}
28+
29+
@Override
30+
protected Request mutateInstanceForVersion(Request instance, Version version) {
31+
if (version.before(Version.V_8_0_0)) {
32+
return new Request();
33+
}
34+
return instance;
35+
}
36+
}

0 commit comments

Comments
 (0)