Skip to content

Commit

Permalink
Use dedicated ML APIs in tests (#30941)
Browse files Browse the repository at this point in the history
ML has dedicated APIs for datafeeds and jobs yet base test classes and
some tests were relying on the cluster state for this state. This commit
removes this usage in favor of using the dedicated endpoints.
  • Loading branch information
jasontedor committed May 30, 2018
1 parent fa126fd commit fa84940
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
package org.elasticsearch.xpack.core.ml.integration;

import org.apache.logging.log4j.Logger;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.test.rest.ESRestTestCase;
Expand Down Expand Up @@ -35,10 +37,12 @@ public void clearMlMetadata() throws IOException {

@SuppressWarnings("unchecked")
private void deleteAllDatafeeds() throws IOException {
Map<String, Object> clusterStateAsMap = testCase.entityAsMap(adminClient.performRequest("GET", "/_cluster/state",
Collections.singletonMap("filter_path", "metadata.ml.datafeeds")));
List<Map<String, Object>> datafeeds =
(List<Map<String, Object>>) XContentMapValues.extractValue("metadata.ml.datafeeds", clusterStateAsMap);
final Request datafeedsRequest = new Request("GET", "/_xpack/ml/datafeeds");
datafeedsRequest.addParameter("filter_path", "datafeeds");
final Response datafeedsResponse = adminClient.performRequest(datafeedsRequest);
@SuppressWarnings("unchecked")
final List<Map<String, Object>> datafeeds =
(List<Map<String, Object>>) XContentMapValues.extractValue("datafeeds", testCase.entityAsMap(datafeedsResponse));
if (datafeeds == null) {
return;
}
Expand Down Expand Up @@ -75,11 +79,12 @@ private void deleteAllDatafeeds() throws IOException {
}

private void deleteAllJobs() throws IOException {
Map<String, Object> clusterStateAsMap = testCase.entityAsMap(adminClient.performRequest("GET", "/_cluster/state",
Collections.singletonMap("filter_path", "metadata.ml.jobs")));
final Request jobsRequest = new Request("GET", "/_xpack/ml/anomaly_detectors");
jobsRequest.addParameter("filter_path", "jobs");
final Response response = adminClient.performRequest(jobsRequest);
@SuppressWarnings("unchecked")
List<Map<String, Object>> jobConfigs =
(List<Map<String, Object>>) XContentMapValues.extractValue("metadata.ml.jobs", clusterStateAsMap);
final List<Map<String, Object>> jobConfigs =
(List<Map<String, Object>>) XContentMapValues.extractValue("jobs", testCase.entityAsMap(response));
if (jobConfigs == null) {
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.discovery.TestZenDiscovery;
import org.elasticsearch.xpack.core.XPackSettings;
import org.elasticsearch.xpack.core.ml.action.GetDatafeedsAction;
import org.elasticsearch.xpack.core.ml.action.GetJobsAction;
import org.elasticsearch.xpack.core.ml.action.util.QueryPage;
import org.elasticsearch.xpack.core.ml.client.MachineLearningClient;
import org.elasticsearch.xpack.ml.LocalStateMachineLearning;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.core.ml.MachineLearningField;
Expand Down Expand Up @@ -270,7 +274,9 @@ public static GetDatafeedsStatsAction.Response.DatafeedStats getDatafeedStats(St
}

public static void deleteAllDatafeeds(Logger logger, Client client) throws Exception {
MlMetadata mlMetadata = MlMetadata.getMlMetadata(client.admin().cluster().prepareState().get().getState());
final MachineLearningClient mlClient = new MachineLearningClient(client);
final QueryPage<DatafeedConfig> datafeeds =
mlClient.getDatafeeds(new GetDatafeedsAction.Request(GetDatafeedsAction.ALL)).actionGet().getResponse();
try {
logger.info("Closing all datafeeds (using _all)");
StopDatafeedAction.Response stopResponse = client
Expand All @@ -291,25 +297,25 @@ public static void deleteAllDatafeeds(Logger logger, Client client) throws Excep
"Had to resort to force-stopping datafeed, something went wrong?", e1);
}

for (DatafeedConfig datafeed : mlMetadata.getDatafeeds().values()) {
String datafeedId = datafeed.getId();
for (final DatafeedConfig datafeed : datafeeds.results()) {
assertBusy(() -> {
try {
GetDatafeedsStatsAction.Request request = new GetDatafeedsStatsAction.Request(datafeedId);
GetDatafeedsStatsAction.Request request = new GetDatafeedsStatsAction.Request(datafeed.getId());
GetDatafeedsStatsAction.Response r = client.execute(GetDatafeedsStatsAction.INSTANCE, request).get();
assertThat(r.getResponse().results().get(0).getDatafeedState(), equalTo(DatafeedState.STOPPED));
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
});
DeleteDatafeedAction.Response deleteResponse =
client.execute(DeleteDatafeedAction.INSTANCE, new DeleteDatafeedAction.Request(datafeedId)).get();
client.execute(DeleteDatafeedAction.INSTANCE, new DeleteDatafeedAction.Request(datafeed.getId())).get();
assertTrue(deleteResponse.isAcknowledged());
}
}

public static void deleteAllJobs(Logger logger, Client client) throws Exception {
MlMetadata mlMetadata = MlMetadata.getMlMetadata(client.admin().cluster().prepareState().get().getState());
final MachineLearningClient mlClient = new MachineLearningClient(client);
final QueryPage<Job> jobs = mlClient.getJobs(new GetJobsAction.Request(MetaData.ALL)).actionGet().getResponse();

try {
CloseJobAction.Request closeRequest = new CloseJobAction.Request(MetaData.ALL);
Expand All @@ -333,15 +339,14 @@ public static void deleteAllJobs(Logger logger, Client client) throws Exception
e1);
}

for (Map.Entry<String, Job> entry : mlMetadata.getJobs().entrySet()) {
String jobId = entry.getKey();
for (final Job job : jobs.results()) {
assertBusy(() -> {
GetJobsStatsAction.Response statsResponse =
client().execute(GetJobsStatsAction.INSTANCE, new GetJobsStatsAction.Request(jobId)).actionGet();
client().execute(GetJobsStatsAction.INSTANCE, new GetJobsStatsAction.Request(job.getId())).actionGet();
assertEquals(JobState.CLOSED, statsResponse.getResponse().results().get(0).getState());
});
DeleteJobAction.Response response =
client.execute(DeleteJobAction.INSTANCE, new DeleteJobAction.Request(jobId)).get();
client.execute(DeleteJobAction.INSTANCE, new DeleteJobAction.Request(job.getId())).get();
assertTrue(response.isAcknowledged());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -548,10 +548,9 @@
- do:
headers:
Authorization: "Basic eF9wYWNrX3Jlc3RfdXNlcjp4LXBhY2stdGVzdC1wYXNzd29yZA==" # run as x_pack_rest_user, i.e. the test setup superuser
cluster.state:
metric: [ metadata ]
filter_path: metadata.persistent_tasks
- match: {"metadata.persistent_tasks.tasks.0.task.xpack/ml/job.status.state": opened}
xpack.ml.get_job_stats:
job_id: jobs-crud-close-job
- match: {"jobs.0.state": opened}

- do:
xpack.ml.close_job:
Expand All @@ -561,11 +560,9 @@
- do:
headers:
Authorization: "Basic eF9wYWNrX3Jlc3RfdXNlcjp4LXBhY2stdGVzdC1wYXNzd29yZA==" # run as x_pack_rest_user, i.e. the test setup superuser
cluster.state:
metric: [ metadata ]
filter_path: metadata.persistent_tasks
- match:
metadata.persistent_tasks.tasks: []
xpack.ml.get_job_stats:
job_id: jobs-crud-close-job
- match: {"jobs.0.state": closed}

---
"Test closing a closed job isn't an error":
Expand Down Expand Up @@ -789,10 +786,9 @@
- do:
headers:
Authorization: "Basic eF9wYWNrX3Jlc3RfdXNlcjp4LXBhY2stdGVzdC1wYXNzd29yZA==" # run as x_pack_rest_user, i.e. the test setup superuser
cluster.state:
metric: [ metadata ]
filter_path: metadata.persistent_tasks
- match: {"metadata.persistent_tasks.tasks.0.task.xpack/ml/job.status.state": opened}
xpack.ml.get_job_stats:
job_id: jobs-crud-force-close-job
- match: {"jobs.0.state": opened}

- do:
xpack.ml.close_job:
Expand All @@ -803,11 +799,9 @@
- do:
headers:
Authorization: "Basic eF9wYWNrX3Jlc3RfdXNlcjp4LXBhY2stdGVzdC1wYXNzd29yZA==" # run as x_pack_rest_user, i.e. the test setup superuser
cluster.state:
metric: [ metadata ]
filter_path: metadata.persistent_tasks
- match:
metadata.persistent_tasks.tasks: []
xpack.ml.get_job_stats:
job_id: jobs-crud-force-close-job
- match: {"jobs.0.state": closed}

---
"Test force closing a closed job isn't an error":
Expand Down

0 comments on commit fa84940

Please sign in to comment.