Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/main' into align-dot-prefix-vali…
Browse files Browse the repository at this point in the history
…dation-with-serverless
  • Loading branch information
dakrone committed Nov 5, 2024
2 parents 6a62be7 + 759bb7f commit 5ade6ac
Show file tree
Hide file tree
Showing 16 changed files with 408 additions and 53 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/116128.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 116128
summary: Add num docs and size to logsdb telemetry
area: Logs
type: enhancement
issues: []
14 changes: 14 additions & 0 deletions docs/changelog/116259.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
pr: 116259
summary: Fix `_type` deprecation on simulate pipeline API
area: Ingest Node
type: deprecation
issues: []
deprecation:
title: Document `_type` deprecated on simulate pipeline API
area: REST API
details: >-
Passing a document with a `_type` property is deprecated in the `/_ingest/pipeline/{id}/_simulate` and
`/_ingest/pipeline/_simulate` APIs.
impact: >-
Users should already have stopped using mapping types, which were deprecated in {es} 7. This deprecation warning
will fire if they specify mapping types on documents pass to the simulate pipeline API.
6 changes: 3 additions & 3 deletions muted-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -127,9 +127,6 @@ tests:
- class: org.elasticsearch.xpack.inference.InferenceCrudIT
method: testGet
issue: https://github.com/elastic/elasticsearch/issues/114135
- class: org.elasticsearch.xpack.inference.integration.ModelRegistryIT
method: testGetModel
issue: https://github.com/elastic/elasticsearch/issues/114657
- class: org.elasticsearch.smoketest.DocsClientYamlTestSuiteIT
method: test {yaml=reference/rest-api/usage/line_38}
issue: https://github.com/elastic/elasticsearch/issues/113694
Expand Down Expand Up @@ -282,6 +279,9 @@ tests:
- class: org.elasticsearch.reservedstate.service.FileSettingsServiceTests
method: testProcessFileChanges
issue: https://github.com/elastic/elasticsearch/issues/115280
- class: org.elasticsearch.xpack.test.rest.XPackRestIT
method: test {p0=ml/filter_crud/Test update filter}
issue: https://github.com/elastic/elasticsearch/issues/116271

# Examples:
#
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,11 @@

import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.IndexMode;
import org.elasticsearch.index.mapper.OnScriptError;
import org.elasticsearch.index.query.RangeQueryBuilder;
import org.elasticsearch.indices.IndicesService;
Expand Down Expand Up @@ -329,6 +331,10 @@ public void testIndicesMetrics() {
equalTo(0L)
)
);

verifyStatsPerIndexMode(
Map.of(IndexMode.STANDARD, numStandardDocs, IndexMode.LOGSDB, numLogsdbDocs, IndexMode.TIME_SERIES, numTimeSeriesDocs)
);
}

void collectThenAssertMetrics(TestTelemetryPlugin telemetry, int times, Map<String, Matcher<Long>> matchers) {
Expand Down Expand Up @@ -434,6 +440,16 @@ int populateLogsdbIndices(long numIndices) {
return totalDocs;
}

private void verifyStatsPerIndexMode(Map<IndexMode, Long> expectedDocs) {
var nodes = clusterService().state().nodes().stream().toArray(DiscoveryNode[]::new);
var request = new IndexModeStatsActionType.StatsRequest(nodes);
var resp = client().execute(IndexModeStatsActionType.TYPE, request).actionGet();
var stats = resp.stats();
for (Map.Entry<IndexMode, Long> e : expectedDocs.entrySet()) {
assertThat(stats.get(e.getKey()).numDocs(), equalTo(e.getValue()));
}
}

private Map<String, Object> parseMapping(String mapping) throws IOException {
try (XContentParser parser = createParser(JsonXContent.jsonXContent, mapping)) {
return parser.map();
Expand Down
1 change: 1 addition & 0 deletions server/src/main/java/module-info.java
Original file line number Diff line number Diff line change
Expand Up @@ -470,5 +470,6 @@
org.elasticsearch.serverless.apifiltering;
exports org.elasticsearch.lucene.spatial;
exports org.elasticsearch.inference.configuration;
exports org.elasticsearch.monitor.metrics;

}
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,7 @@ static TransportVersion def(int id) {
public static final TransportVersion QUERY_RULES_RETRIEVER = def(8_782_00_0);
public static final TransportVersion ESQL_CCS_EXEC_INFO_WITH_FAILURES = def(8_783_00_0);
public static final TransportVersion LOGSDB_TELEMETRY = def(8_784_00_0);
public static final TransportVersion LOGSDB_TELEMETRY_STATS = def(8_785_00_0);

/*
* STOP! READ THIS FIRST! No, really,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,7 @@
import org.elasticsearch.injection.guice.AbstractModule;
import org.elasticsearch.injection.guice.TypeLiteral;
import org.elasticsearch.injection.guice.multibindings.MapBinder;
import org.elasticsearch.monitor.metrics.IndexModeStatsActionType;
import org.elasticsearch.persistent.CompletionPersistentTaskAction;
import org.elasticsearch.persistent.RemovePersistentTaskAction;
import org.elasticsearch.persistent.StartPersistentTaskAction;
Expand Down Expand Up @@ -628,6 +629,7 @@ public <Request extends ActionRequest, Response extends ActionResponse> void reg
actions.register(TransportNodesFeaturesAction.TYPE, TransportNodesFeaturesAction.class);
actions.register(RemoteClusterNodesAction.TYPE, RemoteClusterNodesAction.TransportAction.class);
actions.register(TransportNodesStatsAction.TYPE, TransportNodesStatsAction.class);
actions.register(IndexModeStatsActionType.TYPE, IndexModeStatsActionType.TransportAction.class);
actions.register(TransportNodesUsageAction.TYPE, TransportNodesUsageAction.class);
actions.register(TransportNodesHotThreadsAction.TYPE, TransportNodesHotThreadsAction.class);
actions.register(TransportListTasksAction.TYPE, TransportListTasksAction.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.core.RestApiVersion;
import org.elasticsearch.core.UpdateForV10;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.ingest.ConfigurationUtils;
import org.elasticsearch.ingest.IngestDocument;
Expand Down Expand Up @@ -156,6 +157,7 @@ static Parsed parse(Map<String, Object> config, boolean verbose, IngestService i
return new Parsed(pipeline, ingestDocumentList, verbose);
}

@UpdateForV10(owner = UpdateForV10.Owner.DATA_MANAGEMENT) // Unconditionally deprecate the _type field once V8 BWC support is removed
private static List<IngestDocument> parseDocs(Map<String, Object> config, RestApiVersion restApiVersion) {
List<Map<String, Object>> docs = ConfigurationUtils.readList(null, null, config, Fields.DOCS);
if (docs.isEmpty()) {
Expand All @@ -172,7 +174,7 @@ private static List<IngestDocument> parseDocs(Map<String, Object> config, RestAp
String index = ConfigurationUtils.readStringOrIntProperty(null, null, dataMap, Metadata.INDEX.getFieldName(), "_index");
String id = ConfigurationUtils.readStringOrIntProperty(null, null, dataMap, Metadata.ID.getFieldName(), "_id");
String routing = ConfigurationUtils.readOptionalStringOrIntProperty(null, null, dataMap, Metadata.ROUTING.getFieldName());
if (restApiVersion == RestApiVersion.V_7 && dataMap.containsKey(Metadata.TYPE.getFieldName())) {
if (restApiVersion != RestApiVersion.V_8 && dataMap.containsKey(Metadata.TYPE.getFieldName())) {
deprecationLogger.compatibleCritical(
"simulate_pipeline_with_types",
"[types removal] specifying _type in pipeline simulation requests is deprecated"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.monitor.metrics;

import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.nodes.BaseNodeResponse;
import org.elasticsearch.action.support.nodes.BaseNodesRequest;
import org.elasticsearch.action.support.nodes.BaseNodesResponse;
import org.elasticsearch.action.support.nodes.TransportNodesAction;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.index.IndexMode;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.injection.guice.Inject;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;

public final class IndexModeStatsActionType extends ActionType<IndexModeStatsActionType.StatsResponse> {
public static final IndexModeStatsActionType TYPE = new IndexModeStatsActionType();

private IndexModeStatsActionType() {
super("cluster:monitor/nodes/index_mode_stats");
}

public static final class StatsRequest extends BaseNodesRequest {
public StatsRequest(String[] nodesIds) {
super(nodesIds);
}

public StatsRequest(DiscoveryNode... concreteNodes) {
super(concreteNodes);
}
}

public static final class StatsResponse extends BaseNodesResponse<NodeResponse> {
StatsResponse(ClusterName clusterName, List<NodeResponse> nodes, List<FailedNodeException> failures) {
super(clusterName, nodes, failures);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
assert false : "must be local";
throw new UnsupportedOperationException("must be local");
}

@Override
protected List<NodeResponse> readNodesFrom(StreamInput in) throws IOException {
assert false : "must be local";
throw new UnsupportedOperationException("must be local");
}

@Override
protected void writeNodesTo(StreamOutput out, List<NodeResponse> nodes) throws IOException {
assert false : "must be local";
throw new UnsupportedOperationException("must be local");
}

public Map<IndexMode, IndexStats> stats() {
final Map<IndexMode, IndexStats> stats = new EnumMap<>(IndexMode.class);
for (IndexMode mode : IndexMode.values()) {
stats.put(mode, new IndexStats());
}
for (NodeResponse node : getNodes()) {
for (Map.Entry<IndexMode, IndexStats> e : node.stats.entrySet()) {
stats.get(e.getKey()).add(e.getValue());
}
}
return stats;
}
}

public static final class NodeRequest extends TransportRequest {
NodeRequest() {

}

NodeRequest(StreamInput in) throws IOException {
super(in);
}
}

public static class NodeResponse extends BaseNodeResponse {
private final Map<IndexMode, IndexStats> stats;

NodeResponse(DiscoveryNode node, Map<IndexMode, IndexStats> stats) {
super(node);
this.stats = stats;
}

NodeResponse(StreamInput in, DiscoveryNode node) throws IOException {
super(in, node);
stats = in.readMap(IndexMode::readFrom, IndexStats::new);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeMap(stats, (o, m) -> IndexMode.writeTo(m, o), (o, s) -> s.writeTo(o));
}
}

public static class TransportAction extends TransportNodesAction<StatsRequest, StatsResponse, NodeRequest, NodeResponse, Void> {
private final IndicesService indicesService;

@Inject
public TransportAction(
ClusterService clusterService,
TransportService transportService,
ActionFilters actionFilters,
IndicesService indicesService
) {
super(
TYPE.name(),
clusterService,
transportService,
actionFilters,
NodeRequest::new,
transportService.getThreadPool().executor(ThreadPool.Names.MANAGEMENT)
);
this.indicesService = indicesService;
}

@Override
protected StatsResponse newResponse(StatsRequest request, List<NodeResponse> nodeResponses, List<FailedNodeException> failures) {
return new StatsResponse(ClusterName.DEFAULT, nodeResponses, failures);
}

@Override
protected NodeRequest newNodeRequest(StatsRequest request) {
return new NodeRequest();
}

@Override
protected NodeResponse newNodeResponse(StreamInput in, DiscoveryNode node) throws IOException {
return new NodeResponse(in, node);
}

@Override
protected NodeResponse nodeOperation(NodeRequest request, Task task) {
return new NodeResponse(clusterService.localNode(), IndicesMetrics.getStatsWithoutCache(indicesService));
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.monitor.metrics;

import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.index.search.stats.SearchStats;
import org.elasticsearch.index.shard.IndexingStats;

import java.io.IOException;

public final class IndexStats implements Writeable {
int numIndices = 0;
long numDocs = 0;
long numBytes = 0;
SearchStats.Stats search = new SearchStats().getTotal();
IndexingStats.Stats indexing = new IndexingStats().getTotal();

IndexStats() {

}

IndexStats(StreamInput in) throws IOException {
this.numIndices = in.readVInt();
this.numDocs = in.readVLong();
this.numBytes = in.readVLong();
this.search = SearchStats.Stats.readStats(in);
this.indexing = new IndexingStats.Stats(in);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVInt(numIndices);
out.writeVLong(numDocs);
out.writeVLong(numBytes);
search.writeTo(out);
indexing.writeTo(out);
}

void add(IndexStats other) {
this.numIndices += other.numIndices;
this.numDocs += other.numDocs;
this.numBytes += other.numBytes;
this.search.add(other.search);
this.indexing.add(other.indexing);
}

public int numIndices() {
return numIndices;
}

public long numDocs() {
return numDocs;
}

public long numBytes() {
return numBytes;
}
}
Loading

0 comments on commit 5ade6ac

Please sign in to comment.