Skip to content

Commit ef80728

Browse files
[ML] Extract base class for integ tests with native processes (#38850)
1 parent 07fd261 commit ef80728

File tree

2 files changed

+158
-128
lines changed

2 files changed

+158
-128
lines changed

x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/MlNativeAutodetectIntegTestCase.java

Lines changed: 2 additions & 128 deletions
Original file line numberDiff line numberDiff line change
@@ -5,19 +5,10 @@
55
*/
66
package org.elasticsearch.xpack.ml.integration;
77

8-
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
98
import org.elasticsearch.action.search.SearchResponse;
109
import org.elasticsearch.action.support.master.AcknowledgedResponse;
11-
import org.elasticsearch.client.Client;
12-
import org.elasticsearch.cluster.ClusterModule;
13-
import org.elasticsearch.cluster.ClusterState;
14-
import org.elasticsearch.cluster.metadata.MetaData;
1510
import org.elasticsearch.common.Strings;
1611
import org.elasticsearch.common.bytes.BytesArray;
17-
import org.elasticsearch.common.io.PathUtils;
18-
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
19-
import org.elasticsearch.common.network.NetworkModule;
20-
import org.elasticsearch.common.settings.Settings;
2112
import org.elasticsearch.common.unit.TimeValue;
2213
import org.elasticsearch.common.xcontent.DeprecationHandler;
2314
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
@@ -26,23 +17,10 @@
2617
import org.elasticsearch.common.xcontent.XContentType;
2718
import org.elasticsearch.common.xcontent.json.JsonXContent;
2819
import org.elasticsearch.index.query.QueryBuilders;
29-
import org.elasticsearch.index.reindex.ReindexPlugin;
30-
import org.elasticsearch.persistent.PersistentTaskParams;
31-
import org.elasticsearch.persistent.PersistentTaskState;
32-
import org.elasticsearch.plugins.Plugin;
3320
import org.elasticsearch.search.SearchHit;
3421
import org.elasticsearch.search.SearchHits;
35-
import org.elasticsearch.search.SearchModule;
3622
import org.elasticsearch.search.sort.SortBuilders;
3723
import org.elasticsearch.search.sort.SortOrder;
38-
import org.elasticsearch.test.ESIntegTestCase;
39-
import org.elasticsearch.test.SecuritySettingsSourceField;
40-
import org.elasticsearch.transport.Netty4Plugin;
41-
import org.elasticsearch.xpack.core.LocalStateCompositeXPackPlugin;
42-
import org.elasticsearch.xpack.core.XPackClientPlugin;
43-
import org.elasticsearch.xpack.core.XPackSettings;
44-
import org.elasticsearch.xpack.core.ml.MlMetadata;
45-
import org.elasticsearch.xpack.core.ml.MlTasks;
4624
import org.elasticsearch.xpack.core.ml.action.CloseJobAction;
4725
import org.elasticsearch.xpack.core.ml.action.DeleteDatafeedAction;
4826
import org.elasticsearch.xpack.core.ml.action.DeleteJobAction;
@@ -70,10 +48,8 @@
7048
import org.elasticsearch.xpack.core.ml.calendars.Calendar;
7149
import org.elasticsearch.xpack.core.ml.calendars.ScheduledEvent;
7250
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
73-
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState;
7451
import org.elasticsearch.xpack.core.ml.job.config.Job;
7552
import org.elasticsearch.xpack.core.ml.job.config.JobState;
76-
import org.elasticsearch.xpack.core.ml.job.config.JobTaskState;
7753
import org.elasticsearch.xpack.core.ml.job.config.JobUpdate;
7854
import org.elasticsearch.xpack.core.ml.job.config.MlFilter;
7955
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
@@ -85,71 +61,31 @@
8561
import org.elasticsearch.xpack.core.ml.job.results.Forecast;
8662
import org.elasticsearch.xpack.core.ml.job.results.ForecastRequestStats;
8763
import org.elasticsearch.xpack.core.ml.job.results.Result;
88-
import org.elasticsearch.xpack.core.security.SecurityField;
89-
import org.elasticsearch.xpack.core.security.authc.TokenMetaData;
9064

9165
import java.io.IOException;
92-
import java.net.URISyntaxException;
93-
import java.nio.file.Path;
9466
import java.util.ArrayList;
95-
import java.util.Arrays;
96-
import java.util.Collection;
97-
import java.util.Collections;
9867
import java.util.HashMap;
9968
import java.util.List;
10069
import java.util.Map;
10170
import java.util.concurrent.TimeUnit;
10271
import java.util.function.Function;
10372

104-
import static org.elasticsearch.test.XContentTestUtils.convertToMap;
105-
import static org.elasticsearch.test.XContentTestUtils.differenceBetweenMapsIgnoringArrayOrder;
10673
import static org.hamcrest.Matchers.equalTo;
10774
import static org.hamcrest.Matchers.is;
10875
import static org.hamcrest.Matchers.notNullValue;
10976

11077
/**
11178
* Base class of ML integration tests that use a native autodetect process
11279
*/
113-
abstract class MlNativeAutodetectIntegTestCase extends ESIntegTestCase {
80+
abstract class MlNativeAutodetectIntegTestCase extends MlNativeIntegTestCase {
11481

11582
private List<Job.Builder> jobs = new ArrayList<>();
11683
private List<DatafeedConfig> datafeeds = new ArrayList<>();
117-
@Override
118-
protected Collection<Class<? extends Plugin>> nodePlugins() {
119-
return Arrays.asList(LocalStateCompositeXPackPlugin.class, Netty4Plugin.class);
120-
}
12184

12285
@Override
123-
protected Collection<Class<? extends Plugin>> transportClientPlugins() {
124-
return Arrays.asList(XPackClientPlugin.class, Netty4Plugin.class, ReindexPlugin.class);
125-
}
126-
127-
@Override
128-
protected Settings externalClusterClientSettings() {
129-
Path key;
130-
Path certificate;
131-
try {
132-
key = PathUtils.get(getClass().getResource("/testnode.pem").toURI());
133-
certificate = PathUtils.get(getClass().getResource("/testnode.crt").toURI());
134-
} catch (URISyntaxException e) {
135-
throw new IllegalStateException("error trying to get keystore path", e);
136-
}
137-
Settings.Builder builder = Settings.builder();
138-
builder.put(NetworkModule.TRANSPORT_TYPE_KEY, SecurityField.NAME4);
139-
builder.put(SecurityField.USER_SETTING.getKey(), "x_pack_rest_user:" + SecuritySettingsSourceField.TEST_PASSWORD_SECURE_STRING);
140-
builder.put(XPackSettings.MACHINE_LEARNING_ENABLED.getKey(), true);
141-
builder.put("xpack.security.transport.ssl.enabled", true);
142-
builder.put("xpack.security.transport.ssl.key", key.toAbsolutePath().toString());
143-
builder.put("xpack.security.transport.ssl.certificate", certificate.toAbsolutePath().toString());
144-
builder.put("xpack.security.transport.ssl.key_passphrase", "testnode");
145-
builder.put("xpack.security.transport.ssl.verification_mode", "certificate");
146-
return builder.build();
147-
}
148-
149-
protected void cleanUp() {
86+
protected void cleanUpResources() {
15087
cleanUpDatafeeds();
15188
cleanUpJobs();
152-
waitForPendingTasks();
15389
}
15490

15591
private void cleanUpDatafeeds() {
@@ -182,18 +118,6 @@ private void cleanUpJobs() {
182118
}
183119
}
184120

185-
private void waitForPendingTasks() {
186-
ListTasksRequest listTasksRequest = new ListTasksRequest();
187-
listTasksRequest.setWaitForCompletion(true);
188-
listTasksRequest.setDetailed(true);
189-
listTasksRequest.setTimeout(TimeValue.timeValueSeconds(10));
190-
try {
191-
admin().cluster().listTasks(listTasksRequest).get();
192-
} catch (Exception e) {
193-
throw new AssertionError("Failed to wait for pending tasks to complete", e);
194-
}
195-
}
196-
197121
protected void registerJob(Job.Builder job) {
198122
if (jobs.add(job) == false) {
199123
throw new IllegalArgumentException("job [" + job.getId() + "] is already registered");
@@ -441,56 +365,6 @@ protected PersistJobAction.Response persistJob(String jobId) {
441365
return client().execute(PersistJobAction.INSTANCE, request).actionGet();
442366
}
443367

444-
@Override
445-
protected void ensureClusterStateConsistency() throws IOException {
446-
if (cluster() != null && cluster().size() > 0) {
447-
List<NamedWriteableRegistry.Entry> entries = new ArrayList<>(ClusterModule.getNamedWriteables());
448-
entries.addAll(new SearchModule(Settings.EMPTY, true, Collections.emptyList()).getNamedWriteables());
449-
entries.add(new NamedWriteableRegistry.Entry(MetaData.Custom.class, "ml", MlMetadata::new));
450-
entries.add(new NamedWriteableRegistry.Entry(PersistentTaskParams.class, MlTasks.DATAFEED_TASK_NAME,
451-
StartDatafeedAction.DatafeedParams::new));
452-
entries.add(new NamedWriteableRegistry.Entry(PersistentTaskParams.class, MlTasks.JOB_TASK_NAME,
453-
OpenJobAction.JobParams::new));
454-
entries.add(new NamedWriteableRegistry.Entry(PersistentTaskState.class, JobTaskState.NAME, JobTaskState::new));
455-
entries.add(new NamedWriteableRegistry.Entry(PersistentTaskState.class, DatafeedState.NAME, DatafeedState::fromStream));
456-
entries.add(new NamedWriteableRegistry.Entry(ClusterState.Custom.class, TokenMetaData.TYPE, TokenMetaData::new));
457-
final NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(entries);
458-
ClusterState masterClusterState = client().admin().cluster().prepareState().all().get().getState();
459-
byte[] masterClusterStateBytes = ClusterState.Builder.toBytes(masterClusterState);
460-
// remove local node reference
461-
masterClusterState = ClusterState.Builder.fromBytes(masterClusterStateBytes, null, namedWriteableRegistry);
462-
Map<String, Object> masterStateMap = convertToMap(masterClusterState);
463-
int masterClusterStateSize = ClusterState.Builder.toBytes(masterClusterState).length;
464-
String masterId = masterClusterState.nodes().getMasterNodeId();
465-
for (Client client : cluster().getClients()) {
466-
ClusterState localClusterState = client.admin().cluster().prepareState().all().setLocal(true).get().getState();
467-
byte[] localClusterStateBytes = ClusterState.Builder.toBytes(localClusterState);
468-
// remove local node reference
469-
localClusterState = ClusterState.Builder.fromBytes(localClusterStateBytes, null, namedWriteableRegistry);
470-
final Map<String, Object> localStateMap = convertToMap(localClusterState);
471-
final int localClusterStateSize = ClusterState.Builder.toBytes(localClusterState).length;
472-
// Check that the non-master node has the same version of the cluster state as the master and
473-
// that the master node matches the master (otherwise there is no requirement for the cluster state to match)
474-
if (masterClusterState.version() == localClusterState.version() &&
475-
masterId.equals(localClusterState.nodes().getMasterNodeId())) {
476-
try {
477-
assertEquals("clusterstate UUID does not match", masterClusterState.stateUUID(), localClusterState.stateUUID());
478-
// We cannot compare serialization bytes since serialization order of maps is not guaranteed
479-
// but we can compare serialization sizes - they should be the same
480-
assertEquals("clusterstate size does not match", masterClusterStateSize, localClusterStateSize);
481-
// Compare JSON serialization
482-
assertNull("clusterstate JSON serialization does not match",
483-
differenceBetweenMapsIgnoringArrayOrder(masterStateMap, localStateMap));
484-
} catch (AssertionError error) {
485-
logger.error("Cluster state from master:\n{}\nLocal cluster state:\n{}",
486-
masterClusterState.toString(), localClusterState.toString());
487-
throw error;
488-
}
489-
}
490-
}
491-
}
492-
}
493-
494368
protected List<String> generateData(long timestamp, TimeValue bucketSpan, int bucketCount,
495369
Function<Integer, Integer> timeToCountFunction) throws IOException {
496370
List<String> data = new ArrayList<>();
Lines changed: 156 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,156 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
package org.elasticsearch.xpack.ml.integration;
7+
8+
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
9+
import org.elasticsearch.client.Client;
10+
import org.elasticsearch.cluster.ClusterModule;
11+
import org.elasticsearch.cluster.ClusterState;
12+
import org.elasticsearch.cluster.metadata.MetaData;
13+
import org.elasticsearch.common.io.PathUtils;
14+
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
15+
import org.elasticsearch.common.network.NetworkModule;
16+
import org.elasticsearch.common.settings.Settings;
17+
import org.elasticsearch.common.unit.TimeValue;
18+
import org.elasticsearch.index.reindex.ReindexPlugin;
19+
import org.elasticsearch.persistent.PersistentTaskParams;
20+
import org.elasticsearch.persistent.PersistentTaskState;
21+
import org.elasticsearch.plugins.Plugin;
22+
import org.elasticsearch.search.SearchModule;
23+
import org.elasticsearch.test.ESIntegTestCase;
24+
import org.elasticsearch.test.SecuritySettingsSourceField;
25+
import org.elasticsearch.transport.Netty4Plugin;
26+
import org.elasticsearch.xpack.core.LocalStateCompositeXPackPlugin;
27+
import org.elasticsearch.xpack.core.XPackClientPlugin;
28+
import org.elasticsearch.xpack.core.XPackSettings;
29+
import org.elasticsearch.xpack.core.ml.MlMetadata;
30+
import org.elasticsearch.xpack.core.ml.MlTasks;
31+
import org.elasticsearch.xpack.core.ml.action.OpenJobAction;
32+
import org.elasticsearch.xpack.core.ml.action.StartDatafeedAction;
33+
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState;
34+
import org.elasticsearch.xpack.core.ml.job.config.JobTaskState;
35+
import org.elasticsearch.xpack.core.security.SecurityField;
36+
import org.elasticsearch.xpack.core.security.authc.TokenMetaData;
37+
38+
import java.io.IOException;
39+
import java.net.URISyntaxException;
40+
import java.nio.file.Path;
41+
import java.util.ArrayList;
42+
import java.util.Arrays;
43+
import java.util.Collection;
44+
import java.util.Collections;
45+
import java.util.List;
46+
import java.util.Map;
47+
48+
import static org.elasticsearch.test.XContentTestUtils.convertToMap;
49+
import static org.elasticsearch.test.XContentTestUtils.differenceBetweenMapsIgnoringArrayOrder;
50+
51+
/**
52+
* Base class of ML integration tests that use a native autodetect process
53+
*/
54+
abstract class MlNativeIntegTestCase extends ESIntegTestCase {
55+
56+
@Override
57+
protected Collection<Class<? extends Plugin>> nodePlugins() {
58+
return Arrays.asList(LocalStateCompositeXPackPlugin.class, Netty4Plugin.class);
59+
}
60+
61+
@Override
62+
protected Collection<Class<? extends Plugin>> transportClientPlugins() {
63+
return Arrays.asList(XPackClientPlugin.class, Netty4Plugin.class, ReindexPlugin.class);
64+
}
65+
66+
@Override
67+
protected Settings externalClusterClientSettings() {
68+
Path key;
69+
Path certificate;
70+
try {
71+
key = PathUtils.get(getClass().getResource("/testnode.pem").toURI());
72+
certificate = PathUtils.get(getClass().getResource("/testnode.crt").toURI());
73+
} catch (URISyntaxException e) {
74+
throw new IllegalStateException("error trying to get keystore path", e);
75+
}
76+
Settings.Builder builder = Settings.builder();
77+
builder.put(NetworkModule.TRANSPORT_TYPE_KEY, SecurityField.NAME4);
78+
builder.put(SecurityField.USER_SETTING.getKey(), "x_pack_rest_user:" + SecuritySettingsSourceField.TEST_PASSWORD_SECURE_STRING);
79+
builder.put(XPackSettings.MACHINE_LEARNING_ENABLED.getKey(), true);
80+
builder.put("xpack.security.transport.ssl.enabled", true);
81+
builder.put("xpack.security.transport.ssl.key", key.toAbsolutePath().toString());
82+
builder.put("xpack.security.transport.ssl.certificate", certificate.toAbsolutePath().toString());
83+
builder.put("xpack.security.transport.ssl.key_passphrase", "testnode");
84+
builder.put("xpack.security.transport.ssl.verification_mode", "certificate");
85+
return builder.build();
86+
}
87+
88+
protected void cleanUp() {
89+
cleanUpResources();
90+
waitForPendingTasks();
91+
}
92+
93+
protected abstract void cleanUpResources();
94+
95+
private void waitForPendingTasks() {
96+
ListTasksRequest listTasksRequest = new ListTasksRequest();
97+
listTasksRequest.setWaitForCompletion(true);
98+
listTasksRequest.setDetailed(true);
99+
listTasksRequest.setTimeout(TimeValue.timeValueSeconds(10));
100+
try {
101+
admin().cluster().listTasks(listTasksRequest).get();
102+
} catch (Exception e) {
103+
throw new AssertionError("Failed to wait for pending tasks to complete", e);
104+
}
105+
}
106+
107+
@Override
108+
protected void ensureClusterStateConsistency() throws IOException {
109+
if (cluster() != null && cluster().size() > 0) {
110+
List<NamedWriteableRegistry.Entry> entries = new ArrayList<>(ClusterModule.getNamedWriteables());
111+
entries.addAll(new SearchModule(Settings.EMPTY, true, Collections.emptyList()).getNamedWriteables());
112+
entries.add(new NamedWriteableRegistry.Entry(MetaData.Custom.class, "ml", MlMetadata::new));
113+
entries.add(new NamedWriteableRegistry.Entry(PersistentTaskParams.class, MlTasks.DATAFEED_TASK_NAME,
114+
StartDatafeedAction.DatafeedParams::new));
115+
entries.add(new NamedWriteableRegistry.Entry(PersistentTaskParams.class, MlTasks.JOB_TASK_NAME,
116+
OpenJobAction.JobParams::new));
117+
entries.add(new NamedWriteableRegistry.Entry(PersistentTaskState.class, JobTaskState.NAME, JobTaskState::new));
118+
entries.add(new NamedWriteableRegistry.Entry(PersistentTaskState.class, DatafeedState.NAME, DatafeedState::fromStream));
119+
entries.add(new NamedWriteableRegistry.Entry(ClusterState.Custom.class, TokenMetaData.TYPE, TokenMetaData::new));
120+
final NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(entries);
121+
ClusterState masterClusterState = client().admin().cluster().prepareState().all().get().getState();
122+
byte[] masterClusterStateBytes = ClusterState.Builder.toBytes(masterClusterState);
123+
// remove local node reference
124+
masterClusterState = ClusterState.Builder.fromBytes(masterClusterStateBytes, null, namedWriteableRegistry);
125+
Map<String, Object> masterStateMap = convertToMap(masterClusterState);
126+
int masterClusterStateSize = ClusterState.Builder.toBytes(masterClusterState).length;
127+
String masterId = masterClusterState.nodes().getMasterNodeId();
128+
for (Client client : cluster().getClients()) {
129+
ClusterState localClusterState = client.admin().cluster().prepareState().all().setLocal(true).get().getState();
130+
byte[] localClusterStateBytes = ClusterState.Builder.toBytes(localClusterState);
131+
// remove local node reference
132+
localClusterState = ClusterState.Builder.fromBytes(localClusterStateBytes, null, namedWriteableRegistry);
133+
final Map<String, Object> localStateMap = convertToMap(localClusterState);
134+
final int localClusterStateSize = ClusterState.Builder.toBytes(localClusterState).length;
135+
// Check that the non-master node has the same version of the cluster state as the master and
136+
// that the master node matches the master (otherwise there is no requirement for the cluster state to match)
137+
if (masterClusterState.version() == localClusterState.version() &&
138+
masterId.equals(localClusterState.nodes().getMasterNodeId())) {
139+
try {
140+
assertEquals("clusterstate UUID does not match", masterClusterState.stateUUID(), localClusterState.stateUUID());
141+
// We cannot compare serialization bytes since serialization order of maps is not guaranteed
142+
// but we can compare serialization sizes - they should be the same
143+
assertEquals("clusterstate size does not match", masterClusterStateSize, localClusterStateSize);
144+
// Compare JSON serialization
145+
assertNull("clusterstate JSON serialization does not match",
146+
differenceBetweenMapsIgnoringArrayOrder(masterStateMap, localStateMap));
147+
} catch (AssertionError error) {
148+
logger.error("Cluster state from master:\n{}\nLocal cluster state:\n{}",
149+
masterClusterState.toString(), localClusterState.toString());
150+
throw error;
151+
}
152+
}
153+
}
154+
}
155+
}
156+
}

0 commit comments

Comments
 (0)