Skip to content

Commit 549ccf9

Browse files
committed
HLRC: ML Flush job (#33187)
* HLRC: ML Flush job * Fixing package, paths, and test * Addressing comments
1 parent dabea4a commit 549ccf9

File tree

11 files changed

+668
-0
lines changed

11 files changed

+668
-0
lines changed

client/rest-high-level/src/main/java/org/elasticsearch/client/MLRequestConverters.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import org.elasticsearch.client.ml.OpenJobRequest;
3434
import org.elasticsearch.client.ml.PutJobRequest;
3535
import org.elasticsearch.common.Strings;
36+
import org.elasticsearch.client.ml.FlushJobRequest;
3637

3738
import java.io.IOException;
3839

@@ -127,6 +128,19 @@ static Request getBuckets(GetBucketsRequest getBucketsRequest) throws IOExceptio
127128
return request;
128129
}
129130

131+
static Request flushJob(FlushJobRequest flushJobRequest) throws IOException {
132+
String endpoint = new EndpointBuilder()
133+
.addPathPartAsIs("_xpack")
134+
.addPathPartAsIs("ml")
135+
.addPathPartAsIs("anomaly_detectors")
136+
.addPathPart(flushJobRequest.getJobId())
137+
.addPathPartAsIs("_flush")
138+
.build();
139+
Request request = new Request(HttpPost.METHOD_NAME, endpoint);
140+
request.setEntity(createEntity(flushJobRequest, REQUEST_BODY_CONTENT_TYPE));
141+
return request;
142+
}
143+
130144
static Request getJobStats(GetJobStatsRequest getJobStatsRequest) {
131145
String endpoint = new EndpointBuilder()
132146
.addPathPartAsIs("_xpack")

client/rest-high-level/src/main/java/org/elasticsearch/client/MachineLearningClient.java

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919
package org.elasticsearch.client;
2020

2121
import org.elasticsearch.action.ActionListener;
22+
import org.elasticsearch.client.ml.FlushJobRequest;
23+
import org.elasticsearch.client.ml.FlushJobResponse;
2224
import org.elasticsearch.client.ml.GetJobStatsRequest;
2325
import org.elasticsearch.client.ml.GetJobStatsResponse;
2426
import org.elasticsearch.client.ml.job.stats.JobStats;
@@ -292,6 +294,60 @@ public void getBucketsAsync(GetBucketsRequest request, RequestOptions options, A
292294
}
293295

294296
/**
297+
* Flushes internally buffered data for the given Machine Learning Job ensuring all data sent to the has been processed.
298+
* This may cause new results to be calculated depending on the contents of the buffer
299+
*
300+
* Both flush and close operations are similar,
301+
* however the flush is more efficient if you are expecting to send more data for analysis.
302+
*
303+
* When flushing, the job remains open and is available to continue analyzing data.
304+
* A close operation additionally prunes and persists the model state to disk and the
305+
* job must be opened again before analyzing further data.
306+
*
307+
* <p>
308+
* For additional info
309+
* see <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/ml-flush-job.html">Flush ML job documentation</a>
310+
*
311+
* @param request The {@link FlushJobRequest} object enclosing the `jobId` and additional request options
312+
* @param options Additional request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
313+
*/
314+
public FlushJobResponse flushJob(FlushJobRequest request, RequestOptions options) throws IOException {
315+
return restHighLevelClient.performRequestAndParseEntity(request,
316+
MLRequestConverters::flushJob,
317+
options,
318+
FlushJobResponse::fromXContent,
319+
Collections.emptySet());
320+
}
321+
322+
/**
323+
* Flushes internally buffered data for the given Machine Learning Job asynchronously ensuring all data sent to the has been processed.
324+
* This may cause new results to be calculated depending on the contents of the buffer
325+
*
326+
* Both flush and close operations are similar,
327+
* however the flush is more efficient if you are expecting to send more data for analysis.
328+
*
329+
* When flushing, the job remains open and is available to continue analyzing data.
330+
* A close operation additionally prunes and persists the model state to disk and the
331+
* job must be opened again before analyzing further data.
332+
*
333+
* <p>
334+
* For additional info
335+
* see <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/ml-flush-job.html">Flush ML job documentation</a>
336+
*
337+
* @param request The {@link FlushJobRequest} object enclosing the `jobId` and additional request options
338+
* @param options Additional request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
339+
* @param listener Listener to be notified upon request completion
340+
*/
341+
public void flushJobAsync(FlushJobRequest request, RequestOptions options, ActionListener<FlushJobResponse> listener) {
342+
restHighLevelClient.performRequestAsyncAndParseEntity(request,
343+
MLRequestConverters::flushJob,
344+
options,
345+
FlushJobResponse::fromXContent,
346+
listener,
347+
Collections.emptySet());
348+
}
349+
350+
/**
295351
* Gets usage statistics for one or more Machine Learning jobs
296352
*
297353
* <p>
Lines changed: 195 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,195 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.elasticsearch.client.ml;
20+
21+
import org.elasticsearch.action.ActionRequest;
22+
import org.elasticsearch.action.ActionRequestValidationException;
23+
import org.elasticsearch.client.ml.job.config.Job;
24+
import org.elasticsearch.common.ParseField;
25+
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
26+
import org.elasticsearch.common.xcontent.ToXContentObject;
27+
import org.elasticsearch.common.xcontent.XContentBuilder;
28+
29+
import java.io.IOException;
30+
import java.util.Objects;
31+
32+
/**
33+
* Request object to flush a given Machine Learning job.
34+
*/
35+
public class FlushJobRequest extends ActionRequest implements ToXContentObject {
36+
37+
public static final ParseField CALC_INTERIM = new ParseField("calc_interim");
38+
public static final ParseField START = new ParseField("start");
39+
public static final ParseField END = new ParseField("end");
40+
public static final ParseField ADVANCE_TIME = new ParseField("advance_time");
41+
public static final ParseField SKIP_TIME = new ParseField("skip_time");
42+
43+
public static final ConstructingObjectParser<FlushJobRequest, Void> PARSER =
44+
new ConstructingObjectParser<>("flush_job_request", (a) -> new FlushJobRequest((String) a[0]));
45+
46+
static {
47+
PARSER.declareString(ConstructingObjectParser.constructorArg(), Job.ID);
48+
PARSER.declareBoolean(FlushJobRequest::setCalcInterim, CALC_INTERIM);
49+
PARSER.declareString(FlushJobRequest::setStart, START);
50+
PARSER.declareString(FlushJobRequest::setEnd, END);
51+
PARSER.declareString(FlushJobRequest::setAdvanceTime, ADVANCE_TIME);
52+
PARSER.declareString(FlushJobRequest::setSkipTime, SKIP_TIME);
53+
}
54+
55+
private final String jobId;
56+
private Boolean calcInterim;
57+
private String start;
58+
private String end;
59+
private String advanceTime;
60+
private String skipTime;
61+
62+
/**
63+
* Create new Flush job request
64+
*
65+
* @param jobId The job ID of the job to flush
66+
*/
67+
public FlushJobRequest(String jobId) {
68+
this.jobId = jobId;
69+
}
70+
71+
public String getJobId() {
72+
return jobId;
73+
}
74+
75+
public boolean getCalcInterim() {
76+
return calcInterim;
77+
}
78+
79+
/**
80+
* When {@code true} calculates the interim results for the most recent bucket or all buckets within the latency period.
81+
*
82+
* @param calcInterim defaults to {@code false}.
83+
*/
84+
public void setCalcInterim(boolean calcInterim) {
85+
this.calcInterim = calcInterim;
86+
}
87+
88+
public String getStart() {
89+
return start;
90+
}
91+
92+
/**
93+
* When used in conjunction with {@link FlushJobRequest#calcInterim},
94+
* specifies the start of the range of buckets on which to calculate interim results.
95+
*
96+
* @param start the beginning of the range of buckets; may be an epoch seconds, epoch millis or an ISO string
97+
*/
98+
public void setStart(String start) {
99+
this.start = start;
100+
}
101+
102+
public String getEnd() {
103+
return end;
104+
}
105+
106+
/**
107+
* When used in conjunction with {@link FlushJobRequest#calcInterim}, specifies the end of the range
108+
* of buckets on which to calculate interim results
109+
*
110+
* @param end the end of the range of buckets; may be an epoch seconds, epoch millis or an ISO string
111+
*/
112+
public void setEnd(String end) {
113+
this.end = end;
114+
}
115+
116+
public String getAdvanceTime() {
117+
return advanceTime;
118+
}
119+
120+
/**
121+
* Specifies to advance to a particular time value.
122+
* Results are generated and the model is updated for data from the specified time interval.
123+
*
124+
* @param advanceTime String representation of a timestamp; may be an epoch seconds, epoch millis or an ISO string
125+
*/
126+
public void setAdvanceTime(String advanceTime) {
127+
this.advanceTime = advanceTime;
128+
}
129+
130+
public String getSkipTime() {
131+
return skipTime;
132+
}
133+
134+
/**
135+
* Specifies to skip to a particular time value.
136+
* Results are not generated and the model is not updated for data from the specified time interval.
137+
*
138+
* @param skipTime String representation of a timestamp; may be an epoch seconds, epoch millis or an ISO string
139+
*/
140+
public void setSkipTime(String skipTime) {
141+
this.skipTime = skipTime;
142+
}
143+
144+
@Override
145+
public int hashCode() {
146+
return Objects.hash(jobId, calcInterim, start, end, advanceTime, skipTime);
147+
}
148+
149+
@Override
150+
public boolean equals(Object obj) {
151+
if (this == obj) {
152+
return true;
153+
}
154+
155+
if (obj == null || getClass() != obj.getClass()) {
156+
return false;
157+
}
158+
159+
FlushJobRequest other = (FlushJobRequest) obj;
160+
return Objects.equals(jobId, other.jobId) &&
161+
calcInterim == other.calcInterim &&
162+
Objects.equals(start, other.start) &&
163+
Objects.equals(end, other.end) &&
164+
Objects.equals(advanceTime, other.advanceTime) &&
165+
Objects.equals(skipTime, other.skipTime);
166+
}
167+
168+
@Override
169+
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
170+
builder.startObject();
171+
builder.field(Job.ID.getPreferredName(), jobId);
172+
if (calcInterim != null) {
173+
builder.field(CALC_INTERIM.getPreferredName(), calcInterim);
174+
}
175+
if (start != null) {
176+
builder.field(START.getPreferredName(), start);
177+
}
178+
if (end != null) {
179+
builder.field(END.getPreferredName(), end);
180+
}
181+
if (advanceTime != null) {
182+
builder.field(ADVANCE_TIME.getPreferredName(), advanceTime);
183+
}
184+
if (skipTime != null) {
185+
builder.field(SKIP_TIME.getPreferredName(), skipTime);
186+
}
187+
builder.endObject();
188+
return builder;
189+
}
190+
191+
@Override
192+
public ActionRequestValidationException validate() {
193+
return null;
194+
}
195+
}

0 commit comments

Comments
 (0)