Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Parse PutPipelineRequest#source earlier #117775

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
package org.elasticsearch.logstashbridge.ingest;

import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.ingest.PipelineConfiguration;
import org.elasticsearch.logstashbridge.StableBridgeAPI;
import org.elasticsearch.xcontent.XContentType;
Expand All @@ -21,7 +22,12 @@ public PipelineConfigurationBridge(final PipelineConfiguration delegate) {
}

public PipelineConfigurationBridge(final String pipelineId, final String jsonEncodedConfig) {
this(new PipelineConfiguration(pipelineId, new BytesArray(jsonEncodedConfig), XContentType.JSON));
this(
new PipelineConfiguration(
pipelineId,
XContentHelper.convertToMap(new BytesArray(jsonEncodedConfig), true, XContentType.JSON).v2()
)
);
}

public String getId() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,21 +13,20 @@
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.ingest.IngestMetadata;
import org.elasticsearch.ingest.PipelineConfiguration;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.XContentType;
import org.elasticsearch.xcontent.json.JsonXContent;

import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static org.elasticsearch.ingest.IngestPipelineTestUtils.jsonPipelineConfiguration;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

Expand All @@ -47,9 +46,8 @@ public void testHasAtLeastOneGeoipProcessorWhenDownloadDatabaseOnPipelineCreatio
when(metadata.indices()).thenReturn(Map.of("index", indexMetadata));

for (String pipelineConfigJson : getPipelinesWithGeoIpProcessors(false)) {
ingestMetadata[0] = new IngestMetadata(
Map.of("_id1", new PipelineConfiguration("_id1", new BytesArray(pipelineConfigJson), XContentType.JSON))
);
ingestMetadata[0] = new IngestMetadata(Map.of("_id1", jsonPipelineConfiguration("_id1", pipelineConfigJson)));

// The pipeline is not used in any index, expected to return false.
indexSettings[0] = Settings.EMPTY;
assertFalse(GeoIpDownloaderTaskExecutor.hasAtLeastOneGeoipProcessor(clusterState));
Expand All @@ -76,18 +74,14 @@ public void testHasAtLeastOneGeoipProcessor() throws IOException {
{
// Test that hasAtLeastOneGeoipProcessor returns true for any pipeline with a geoip processor:
for (String pipeline : expectHitsInputs) {
ingestMetadata[0] = new IngestMetadata(
Map.of("_id1", new PipelineConfiguration("_id1", new BytesArray(pipeline), XContentType.JSON))
);
ingestMetadata[0] = new IngestMetadata(Map.of("_id1", jsonPipelineConfiguration("_id1", pipeline)));
assertTrue(GeoIpDownloaderTaskExecutor.hasAtLeastOneGeoipProcessor(clusterState));
}
}
{
// Test that hasAtLeastOneGeoipProcessor returns false for any pipeline without a geoip processor:
for (String pipeline : expectMissesInputs) {
ingestMetadata[0] = new IngestMetadata(
Map.of("_id1", new PipelineConfiguration("_id1", new BytesArray(pipeline), XContentType.JSON))
);
ingestMetadata[0] = new IngestMetadata(Map.of("_id1", jsonPipelineConfiguration("_id1", pipeline)));
assertFalse(GeoIpDownloaderTaskExecutor.hasAtLeastOneGeoipProcessor(clusterState));
}
}
Expand All @@ -99,11 +93,11 @@ public void testHasAtLeastOneGeoipProcessor() throws IOException {
Map<String, PipelineConfiguration> configs = new HashMap<>();
for (String pipeline : expectHitsInputs) {
String id = randomAlphaOfLength(20);
configs.put(id, new PipelineConfiguration(id, new BytesArray(pipeline), XContentType.JSON));
configs.put(id, jsonPipelineConfiguration(id, pipeline));
}
for (String pipeline : expectMissesInputs) {
String id = randomAlphaOfLength(20);
configs.put(id, new PipelineConfiguration(id, new BytesArray(pipeline), XContentType.JSON));
configs.put(id, jsonPipelineConfiguration(id, pipeline));
}
ingestMetadata[0] = new IngestMetadata(configs);
assertTrue(GeoIpDownloaderTaskExecutor.hasAtLeastOneGeoipProcessor(clusterState));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.action.ingest;

import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.TimeValue;

import java.util.Map;
import java.util.Objects;

/**
 * An already-parsed representation of a put-pipeline request: the pipeline source has been
 * converted from bytes into a {@code Map} up front, so downstream consumers do not need to
 * re-parse it.
 *
 * @param masterNodeTimeout timeout for the master-node operation; must not be {@code null}
 * @param id                the pipeline id; must not be {@code null}
 * @param version           the expected current pipeline version, or {@code null} if no
 *                          version check is requested
 * @param pipelineConfig    the parsed pipeline configuration; must not be {@code null}
 */
public record ParsedPutPipelineRequest(
    TimeValue masterNodeTimeout,
    String id,
    @Nullable Integer version,
    Map<String, Object> pipelineConfig
) {

    public ParsedPutPipelineRequest {
        // Fail fast with a named component so a null argument is easy to diagnose.
        Objects.requireNonNull(masterNodeTimeout, "masterNodeTimeout");
        Objects.requireNonNull(id, "id");
        Objects.requireNonNull(pipelineConfig, "pipelineConfig");
    }

}
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,12 @@
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.xcontent.ToXContentObject;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.XContentType;

import java.io.IOException;
import java.util.Objects;

public class PutPipelineRequest extends AcknowledgedRequest<PutPipelineRequest> implements ToXContentObject {
public class PutPipelineRequest extends AcknowledgedRequest<PutPipelineRequest> {

private final String id;
private final BytesReference source;
Expand Down Expand Up @@ -90,13 +88,12 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeOptionalInt(version);
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
if (source != null) {
builder.rawValue(source.streamInput(), xContentType);
} else {
builder.startObject().endObject();
}
return builder;
public ParsedPutPipelineRequest parse() {
return new ParsedPutPipelineRequest(
masterNodeTimeout(),
getId(),
getVersion(),
XContentHelper.convertToMap(getSource(), true, getXContentType()).v2()
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public PutPipelineTransportAction(
@Override
protected void masterOperation(Task task, PutPipelineRequest request, ClusterState state, ActionListener<AcknowledgedResponse> listener)
throws Exception {
ingestService.putPipeline(request, listener, (nodeListener) -> {
ingestService.putPipeline(request.parse(), listener, nodeListener -> {
NodesInfoRequest nodesInfoRequest = new NodesInfoRequest();
nodesInfoRequest.clear();
nodesInfoRequest.addMetric(NodesInfoMetrics.Metric.INGEST.metricName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,8 @@ public TransformState transform(Object source, TransformState prevState) throws

ClusterState state = prevState.state();

for (var request : requests) {
for (var rawRequest : requests) {
final var request = rawRequest.parse();
var nopUpdate = IngestService.isNoOpPipelineUpdate(state, request);

if (nopUpdate) {
Expand Down
70 changes: 26 additions & 44 deletions server/src/main/java/org/elasticsearch/ingest/IngestService.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import org.elasticsearch.action.bulk.TransportBulkAction;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.ingest.DeletePipelineRequest;
import org.elasticsearch.action.ingest.PutPipelineRequest;
import org.elasticsearch.action.ingest.ParsedPutPipelineRequest;
import org.elasticsearch.action.support.RefCountingRunnable;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.internal.Client;
Expand All @@ -46,14 +46,12 @@
import org.elasticsearch.cluster.service.MasterServiceTaskQueue;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.TriConsumer;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.logging.DeprecationCategory;
import org.elasticsearch.common.logging.DeprecationLogger;
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.CollectionUtils;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.TimeValue;
Expand All @@ -72,15 +70,14 @@
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.threadpool.Scheduler;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xcontent.XContentBuilder;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Locale;
Expand Down Expand Up @@ -481,7 +478,7 @@ static List<PipelineConfiguration> innerGetPipelines(IngestMetadata ingestMetada
* Stores the specified pipeline definition in the request.
*/
public void putPipeline(
PutPipelineRequest request,
ParsedPutPipelineRequest request,
ActionListener<AcknowledgedResponse> listener,
Consumer<ActionListener<NodesInfoResponse>> nodeInfoListener
) throws Exception {
Expand All @@ -495,36 +492,27 @@ public void putPipeline(
validatePipelineRequest(request, nodeInfos);

taskQueue.submitTask(
"put-pipeline-" + request.getId(),
"put-pipeline-" + request.id(),
new PutPipelineClusterStateUpdateTask(l, request),
request.masterNodeTimeout()
);
}));
}

public void validatePipelineRequest(PutPipelineRequest request, NodesInfoResponse nodeInfos) throws Exception {
final Map<String, Object> config = XContentHelper.convertToMap(request.getSource(), false, request.getXContentType()).v2();
public void validatePipelineRequest(ParsedPutPipelineRequest request, NodesInfoResponse nodeInfos) throws Exception {
Map<DiscoveryNode, IngestInfo> ingestInfos = new HashMap<>();
for (NodeInfo nodeInfo : nodeInfos.getNodes()) {
ingestInfos.put(nodeInfo.getNode(), nodeInfo.getInfo(IngestInfo.class));
}

validatePipeline(ingestInfos, request.getId(), config);
validatePipeline(ingestInfos, request.id(), request.pipelineConfig());
}

public static boolean isNoOpPipelineUpdate(ClusterState state, PutPipelineRequest request) {
public static boolean isNoOpPipelineUpdate(ClusterState state, ParsedPutPipelineRequest request) {
IngestMetadata currentIngestMetadata = state.metadata().custom(IngestMetadata.TYPE);
if (request.getVersion() == null
return request.version() == null
&& currentIngestMetadata != null
&& currentIngestMetadata.getPipelines().containsKey(request.getId())) {
var pipelineConfig = XContentHelper.convertToMap(request.getSource(), false, request.getXContentType()).v2();
var currentPipeline = currentIngestMetadata.getPipelines().get(request.getId());
if (currentPipeline.getConfig().equals(pipelineConfig)) {
return true;
}
}

return false;
&& currentIngestMetadata.getPipelines().containsKey(request.id())
&& currentIngestMetadata.getPipelines().get(request.id()).getConfig().equals(request.pipelineConfig());
}

/**
Expand Down Expand Up @@ -609,71 +597,65 @@ private static void collectProcessorMetrics(
* Used in this class and externally by the {@link org.elasticsearch.action.ingest.ReservedPipelineAction}
*/
public static class PutPipelineClusterStateUpdateTask extends PipelineClusterStateUpdateTask {
private final PutPipelineRequest request;
private final ParsedPutPipelineRequest request;

PutPipelineClusterStateUpdateTask(ActionListener<AcknowledgedResponse> listener, PutPipelineRequest request) {
PutPipelineClusterStateUpdateTask(ActionListener<AcknowledgedResponse> listener, ParsedPutPipelineRequest request) {
super(listener);
this.request = request;
}

/**
* Used by {@link org.elasticsearch.action.ingest.ReservedPipelineAction}
*/
public PutPipelineClusterStateUpdateTask(PutPipelineRequest request) {
public PutPipelineClusterStateUpdateTask(ParsedPutPipelineRequest request) {
this(null, request);
}

@Override
public IngestMetadata execute(IngestMetadata currentIngestMetadata, Collection<IndexMetadata> allIndexMetadata) {
BytesReference pipelineSource = request.getSource();
if (request.getVersion() != null) {
var currentPipeline = currentIngestMetadata != null ? currentIngestMetadata.getPipelines().get(request.getId()) : null;
var pipelineConfig = request.pipelineConfig();
if (request.version() != null) {
var currentPipeline = currentIngestMetadata != null ? currentIngestMetadata.getPipelines().get(request.id()) : null;
if (currentPipeline == null) {
throw new IllegalArgumentException(
String.format(
Locale.ROOT,
"version conflict, required version [%s] for pipeline [%s] but no pipeline was found",
request.getVersion(),
request.getId()
request.version(),
request.id()
)
);
}

final Integer currentVersion = currentPipeline.getVersion();
if (Objects.equals(request.getVersion(), currentVersion) == false) {
if (Objects.equals(request.version(), currentVersion) == false) {
throw new IllegalArgumentException(
String.format(
Locale.ROOT,
"version conflict, required version [%s] for pipeline [%s] but current version is [%s]",
request.getVersion(),
request.getId(),
request.version(),
request.id(),
currentVersion
)
);
}

var pipelineConfig = XContentHelper.convertToMap(request.getSource(), false, request.getXContentType()).v2();
final Integer specifiedVersion = (Integer) pipelineConfig.get("version");
if (pipelineConfig.containsKey("version") && Objects.equals(specifiedVersion, currentVersion)) {
throw new IllegalArgumentException(
String.format(
Locale.ROOT,
"cannot update pipeline [%s] with the same version [%s]",
request.getId(),
request.getVersion()
request.id(),
request.version()
)
);
}

// if no version specified in the pipeline definition, inject a version of [request.getVersion() + 1]
if (specifiedVersion == null) {
pipelineConfig.put("version", request.getVersion() == null ? 1 : request.getVersion() + 1);
try {
var builder = XContentBuilder.builder(request.getXContentType().xContent()).map(pipelineConfig);
pipelineSource = BytesReference.bytes(builder);
} catch (IOException e) {
throw new IllegalStateException(e);
}
pipelineConfig = new LinkedHashMap<>(pipelineConfig); // shallow copy
pipelineConfig.put("version", request.version() == null ? 1 : request.version() + 1);
}
}

Expand All @@ -684,7 +666,7 @@ public IngestMetadata execute(IngestMetadata currentIngestMetadata, Collection<I
pipelines = new HashMap<>();
}

pipelines.put(request.getId(), new PipelineConfiguration(request.getId(), pipelineSource, request.getXContentType()));
pipelines.put(request.id(), new PipelineConfiguration(request.id(), pipelineConfig));
return new IngestMetadata(pipelines);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,18 +82,6 @@ public PipelineConfiguration(String id, Map<String, Object> config) {
this.config = deepCopy(config, true); // defensive deep copy
}

/**
* A convenience constructor that parses some bytes as a map representing a pipeline's config and then delegates to the
* conventional {@link #PipelineConfiguration(String, Map)} constructor.
*
* @param id the id of the pipeline
* @param config a parse-able bytes reference that will return a pipeline configuration
* @param xContentType the content-type to use while parsing the pipeline configuration
*/
public PipelineConfiguration(String id, BytesReference config, XContentType xContentType) {
this(id, XContentHelper.convertToMap(config, true, xContentType).v2());
}

public String getId() {
return id;
}
Expand Down
Loading