Skip to content

Commit 8871ccc

Browse files
committed
Adding node_count to ML Usage (#33850) (#33863)
1 parent 8cd563b commit 8871ccc

File tree

3 files changed

+124
-11
lines changed

3 files changed

+124
-11
lines changed

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MachineLearningFeatureSetUsage.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
*/
66
package org.elasticsearch.xpack.core.ml;
77

8+
import org.elasticsearch.Version;
89
import org.elasticsearch.common.io.stream.StreamInput;
910
import org.elasticsearch.common.io.stream.StreamOutput;
1011
import org.elasticsearch.common.xcontent.XContentBuilder;
@@ -24,28 +25,39 @@ public class MachineLearningFeatureSetUsage extends XPackFeatureSet.Usage {
2425
public static final String DETECTORS = "detectors";
2526
public static final String FORECASTS = "forecasts";
2627
public static final String MODEL_SIZE = "model_size";
28+
public static final String NODE_COUNT = "node_count";
2729

2830
private final Map<String, Object> jobsUsage;
2931
private final Map<String, Object> datafeedsUsage;
32+
private final int nodeCount;
3033

3134
public MachineLearningFeatureSetUsage(boolean available, boolean enabled, Map<String, Object> jobsUsage,
32-
Map<String, Object> datafeedsUsage) {
35+
Map<String, Object> datafeedsUsage, int nodeCount) {
3336
super(XPackField.MACHINE_LEARNING, available, enabled);
3437
this.jobsUsage = Objects.requireNonNull(jobsUsage);
3538
this.datafeedsUsage = Objects.requireNonNull(datafeedsUsage);
39+
this.nodeCount = nodeCount;
3640
}
3741

3842
public MachineLearningFeatureSetUsage(StreamInput in) throws IOException {
3943
super(in);
4044
this.jobsUsage = in.readMap();
4145
this.datafeedsUsage = in.readMap();
46+
if (in.getVersion().onOrAfter(Version.V_6_5_0)) {
47+
this.nodeCount = in.readInt();
48+
} else {
49+
this.nodeCount = -1;
50+
}
4251
}
4352

4453
@Override
4554
public void writeTo(StreamOutput out) throws IOException {
4655
super.writeTo(out);
4756
out.writeMap(jobsUsage);
4857
out.writeMap(datafeedsUsage);
58+
if (out.getVersion().onOrAfter(Version.V_6_5_0)) {
59+
out.writeInt(nodeCount);
60+
}
4961
}
5062

5163
@Override
@@ -57,6 +69,9 @@ protected void innerXContent(XContentBuilder builder, Params params) throws IOEx
5769
if (datafeedsUsage != null) {
5870
builder.field(DATAFEEDS_FIELD, datafeedsUsage);
5971
}
72+
if (nodeCount >= 0) {
73+
builder.field(NODE_COUNT, nodeCount);
74+
}
6075
}
6176

6277
}

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearningFeatureSet.java

Lines changed: 25 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import org.elasticsearch.client.Client;
1313
import org.elasticsearch.cluster.ClusterState;
1414
import org.elasticsearch.cluster.metadata.MetaData;
15+
import org.elasticsearch.cluster.node.DiscoveryNode;
1516
import org.elasticsearch.cluster.service.ClusterService;
1617
import org.elasticsearch.common.Nullable;
1718
import org.elasticsearch.common.inject.Inject;
@@ -134,7 +135,22 @@ public Map<String, Object> nativeCodeInfo() {
134135
@Override
135136
public void usage(ActionListener<XPackFeatureSet.Usage> listener) {
136137
ClusterState state = clusterService.state();
137-
new Retriever(client, MlMetadata.getMlMetadata(state), available(), enabled()).execute(listener);
138+
new Retriever(client, MlMetadata.getMlMetadata(state), available(), enabled(), mlNodeCount(state)).execute(listener);
139+
}
140+
141+
private int mlNodeCount(final ClusterState clusterState) {
142+
if (enabled == false) {
143+
return 0;
144+
}
145+
146+
int mlNodeCount = 0;
147+
for (DiscoveryNode node : clusterState.getNodes()) {
148+
String enabled = node.getAttributes().get(MachineLearning.ML_ENABLED_NODE_ATTR);
149+
if (Boolean.parseBoolean(enabled)) {
150+
++mlNodeCount;
151+
}
152+
}
153+
return mlNodeCount;
138154
}
139155

140156
public static class Retriever {
@@ -145,19 +161,22 @@ public static class Retriever {
145161
private final boolean enabled;
146162
private Map<String, Object> jobsUsage;
147163
private Map<String, Object> datafeedsUsage;
164+
private int nodeCount;
148165

149-
public Retriever(Client client, MlMetadata mlMetadata, boolean available, boolean enabled) {
166+
public Retriever(Client client, MlMetadata mlMetadata, boolean available, boolean enabled, int nodeCount) {
150167
this.client = Objects.requireNonNull(client);
151168
this.mlMetadata = mlMetadata;
152169
this.available = available;
153170
this.enabled = enabled;
154171
this.jobsUsage = new LinkedHashMap<>();
155172
this.datafeedsUsage = new LinkedHashMap<>();
173+
this.nodeCount = nodeCount;
156174
}
157175

158176
public void execute(ActionListener<Usage> listener) {
159177
if (enabled == false) {
160-
listener.onResponse(new MachineLearningFeatureSetUsage(available, enabled, Collections.emptyMap(), Collections.emptyMap()));
178+
listener.onResponse(
179+
new MachineLearningFeatureSetUsage(available, enabled, Collections.emptyMap(), Collections.emptyMap(), 0));
161180
return;
162181
}
163182

@@ -166,11 +185,9 @@ public void execute(ActionListener<Usage> listener) {
166185
ActionListener.wrap(response -> {
167186
addDatafeedsUsage(response);
168187
listener.onResponse(new MachineLearningFeatureSetUsage(
169-
available, enabled, jobsUsage, datafeedsUsage));
188+
available, enabled, jobsUsage, datafeedsUsage, nodeCount));
170189
},
171-
error -> {
172-
listener.onFailure(error);
173-
}
190+
listener::onFailure
174191
);
175192

176193
// Step 1. Extract usage from jobs stats and then request stats for all datafeeds
@@ -183,9 +200,7 @@ public void execute(ActionListener<Usage> listener) {
183200
client.execute(GetDatafeedsStatsAction.INSTANCE, datafeedStatsRequest,
184201
datafeedStatsListener);
185202
},
186-
error -> {
187-
listener.onFailure(error);
188-
}
203+
listener::onFailure
189204
);
190205

191206
// Step 0. Kick off the chain of callbacks by requesting jobs stats

x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MachineLearningFeatureSetTests.java

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,14 +6,19 @@
66
package org.elasticsearch.xpack.ml;
77

88
import org.elasticsearch.ElasticsearchException;
9+
import org.elasticsearch.Version;
910
import org.elasticsearch.action.ActionListener;
1011
import org.elasticsearch.action.support.PlainActionFuture;
1112
import org.elasticsearch.client.Client;
1213
import org.elasticsearch.cluster.ClusterState;
1314
import org.elasticsearch.cluster.metadata.MetaData;
15+
import org.elasticsearch.cluster.node.DiscoveryNode;
16+
import org.elasticsearch.cluster.node.DiscoveryNodes;
1417
import org.elasticsearch.cluster.service.ClusterService;
1518
import org.elasticsearch.common.io.stream.BytesStreamOutput;
19+
import org.elasticsearch.common.io.stream.StreamInput;
1620
import org.elasticsearch.common.settings.Settings;
21+
import org.elasticsearch.common.transport.TransportAddress;
1722
import org.elasticsearch.common.xcontent.ToXContent;
1823
import org.elasticsearch.common.xcontent.XContentBuilder;
1924
import org.elasticsearch.common.xcontent.XContentFactory;
@@ -46,7 +51,11 @@
4651
import java.util.Arrays;
4752
import java.util.Collections;
4853
import java.util.Date;
54+
import java.util.HashMap;
55+
import java.util.HashSet;
4956
import java.util.List;
57+
import java.util.Map;
58+
import java.util.Set;
5059

5160
import static org.hamcrest.Matchers.equalTo;
5261
import static org.hamcrest.Matchers.notNullValue;
@@ -223,6 +232,49 @@ public void testUsage() throws Exception {
223232
}
224233
}
225234

235+
public void testNodeCount() throws Exception {
236+
when(licenseState.isMachineLearningAllowed()).thenReturn(true);
237+
int nodeCount = randomIntBetween(1, 3);
238+
givenNodeCount(nodeCount);
239+
Settings.Builder settings = Settings.builder().put(commonSettings);
240+
settings.put("xpack.ml.enabled", true);
241+
MachineLearningFeatureSet featureSet = new MachineLearningFeatureSet(TestEnvironment.newEnvironment(settings.build()),
242+
clusterService, client, licenseState);
243+
244+
PlainActionFuture<Usage> future = new PlainActionFuture<>();
245+
featureSet.usage(future);
246+
XPackFeatureSet.Usage usage = future.get();
247+
248+
assertThat(usage.available(), is(true));
249+
assertThat(usage.enabled(), is(true));
250+
251+
BytesStreamOutput out = new BytesStreamOutput();
252+
usage.writeTo(out);
253+
XPackFeatureSet.Usage serializedUsage = new MachineLearningFeatureSetUsage(out.bytes().streamInput());
254+
255+
XContentSource source;
256+
try (XContentBuilder builder = XContentFactory.jsonBuilder()) {
257+
serializedUsage.toXContent(builder, ToXContent.EMPTY_PARAMS);
258+
source = new XContentSource(builder);
259+
}
260+
assertThat(source.getValue("node_count"), equalTo(nodeCount));
261+
262+
BytesStreamOutput oldOut = new BytesStreamOutput();
263+
oldOut.setVersion(Version.V_6_0_0);
264+
usage.writeTo(oldOut);
265+
StreamInput oldInput = oldOut.bytes().streamInput();
266+
oldInput.setVersion(Version.V_6_0_0);
267+
XPackFeatureSet.Usage oldSerializedUsage = new MachineLearningFeatureSetUsage(oldInput);
268+
269+
XContentSource oldSource;
270+
try (XContentBuilder builder = XContentFactory.jsonBuilder()) {
271+
oldSerializedUsage.toXContent(builder, ToXContent.EMPTY_PARAMS);
272+
oldSource = new XContentSource(builder);
273+
}
274+
275+
assertNull(oldSource.getValue("node_count"));
276+
}
277+
226278
public void testUsageGivenMlMetadataNotInstalled() throws Exception {
227279
when(licenseState.isMachineLearningAllowed()).thenReturn(true);
228280
Settings.Builder settings = Settings.builder().put(commonSettings);
@@ -286,6 +338,37 @@ private void givenJobs(List<Job> jobs, List<GetJobsStatsAction.Response.JobStats
286338
}).when(client).execute(same(GetJobsStatsAction.INSTANCE), any(), any());
287339
}
288340

341+
private void givenNodeCount(int nodeCount) {
342+
DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.builder();
343+
for (int i = 0; i < nodeCount; i++) {
344+
Map<String, String> attrs = new HashMap<>();
345+
attrs.put(MachineLearning.ML_ENABLED_NODE_ATTR, Boolean.toString(true));
346+
Set<DiscoveryNode.Role> roles = new HashSet<>();
347+
roles.add(DiscoveryNode.Role.DATA);
348+
roles.add(DiscoveryNode.Role.MASTER);
349+
roles.add(DiscoveryNode.Role.INGEST);
350+
nodesBuilder.add(new DiscoveryNode(randomAlphaOfLength(i+1),
351+
new TransportAddress(TransportAddress.META_ADDRESS, 9100 + i),
352+
attrs,
353+
roles,
354+
Version.CURRENT));
355+
}
356+
for (int i = 0; i < randomIntBetween(1, 3); i++) {
357+
Map<String, String> attrs = new HashMap<>();
358+
Set<DiscoveryNode.Role> roles = new HashSet<>();
359+
roles.add(DiscoveryNode.Role.DATA);
360+
roles.add(DiscoveryNode.Role.MASTER);
361+
roles.add(DiscoveryNode.Role.INGEST);
362+
nodesBuilder.add(new DiscoveryNode(randomAlphaOfLength(i+1),
363+
new TransportAddress(TransportAddress.META_ADDRESS, 9300 + i),
364+
attrs,
365+
roles,
366+
Version.CURRENT));
367+
}
368+
ClusterState clusterState = new ClusterState.Builder(ClusterState.EMPTY_STATE).nodes(nodesBuilder.build()).build();
369+
when(clusterService.state()).thenReturn(clusterState);
370+
}
371+
289372
private void givenDatafeeds(List<GetDatafeedsStatsAction.Response.DatafeedStats> datafeedStats) {
290373
doAnswer(invocationOnMock -> {
291374
ActionListener<GetDatafeedsStatsAction.Response> listener =

0 commit comments

Comments
 (0)