From c8ad04c8c7bf1ca1a22ed25cdc44eb891849bd66 Mon Sep 17 00:00:00 2001 From: Joe Gallo Date: Tue, 19 Nov 2024 16:42:07 -0500 Subject: [PATCH] Optimize PipelineConfiguration-checking ClusterStateListeners (#117038) --- .../ingest/PipelineConfigurationBridge.java | 8 +- .../geoip/GeoIpDownloaderTaskExecutor.java | 2 +- .../org/elasticsearch/TransportVersions.java | 1 + .../action/ingest/GetPipelineResponse.java | 2 +- .../MetadataIndexTemplateService.java | 2 +- .../elasticsearch/ingest/IngestService.java | 6 +- .../ingest/PipelineConfiguration.java | 132 +++++++++++++----- .../ingest/GetPipelineResponseTests.java | 2 +- .../ingest/IngestMetadataTests.java | 4 +- .../ingest/PipelineConfigurationTests.java | 51 +++++-- .../InferenceProcessorInfoExtractor.java | 8 +- .../template/IndexTemplateRegistryTests.java | 2 +- .../enrich/EnrichPolicyReindexPipeline.java | 2 +- .../loadingservice/ModelLoadingService.java | 2 +- .../LegacyStackTemplateRegistryTests.java | 2 +- .../stack/StackTemplateRegistryTests.java | 2 +- 16 files changed, 163 insertions(+), 65 deletions(-) diff --git a/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/ingest/PipelineConfigurationBridge.java b/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/ingest/PipelineConfigurationBridge.java index e146b06fe3f53..cb90d10665659 100644 --- a/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/ingest/PipelineConfigurationBridge.java +++ b/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/ingest/PipelineConfigurationBridge.java @@ -28,8 +28,12 @@ public String getId() { return delegate.getId(); } - public Map getConfigAsMap() { - return delegate.getConfigAsMap(); + public Map getConfig() { + return delegate.getConfig(); + } + + public Map getConfig(final boolean unmodifiable) { + return delegate.getConfig(unmodifiable); } @Override diff --git a/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderTaskExecutor.java b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderTaskExecutor.java index 24ed2ba73c0af..f245b0cfd2405 100644 --- a/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderTaskExecutor.java +++ b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderTaskExecutor.java @@ -268,7 +268,7 @@ private static Set pipelinesWithGeoIpProcessor(ClusterState clusterState Set ids = new HashSet<>(); // note: this loop is unrolled rather than streaming-style because it's hot enough to show up in a flamegraph for (PipelineConfiguration configuration : configurations) { - List> processors = (List>) configuration.getConfigAsMap().get(Pipeline.PROCESSORS_KEY); + List> processors = (List>) configuration.getConfig().get(Pipeline.PROCESSORS_KEY); if (hasAtLeastOneGeoipProcessor(processors, downloadDatabaseOnPipelineCreation)) { ids.add(configuration.getId()); } diff --git a/server/src/main/java/org/elasticsearch/TransportVersions.java b/server/src/main/java/org/elasticsearch/TransportVersions.java index 824a49a4372f5..6a2d2457e4a66 100644 --- a/server/src/main/java/org/elasticsearch/TransportVersions.java +++ b/server/src/main/java/org/elasticsearch/TransportVersions.java @@ -201,6 +201,7 @@ static TransportVersion def(int id) { public static final TransportVersion QUERY_RULES_LIST_INCLUDES_TYPES = def(8_792_00_0); public static final TransportVersion INDEX_STATS_ADDITIONAL_FIELDS = def(8_793_00_0); public static final TransportVersion INDEX_STATS_ADDITIONAL_FIELDS_REVERT = def(8_794_00_0); + public static final TransportVersion INGEST_PIPELINE_CONFIGURATION_AS_MAP = def(8_797_00_0); /* * STOP! READ THIS FIRST! No, really, diff --git a/server/src/main/java/org/elasticsearch/action/ingest/GetPipelineResponse.java b/server/src/main/java/org/elasticsearch/action/ingest/GetPipelineResponse.java index 3ed1dfef50053..760b87af49a78 100644 --- a/server/src/main/java/org/elasticsearch/action/ingest/GetPipelineResponse.java +++ b/server/src/main/java/org/elasticsearch/action/ingest/GetPipelineResponse.java @@ -80,7 +80,7 @@ public RestStatus status() { public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); for (PipelineConfiguration pipeline : pipelines) { - builder.field(pipeline.getId(), summary ? Map.of() : pipeline.getConfigAsMap()); + builder.field(pipeline.getId(), summary ? Map.of() : pipeline.getConfig()); } builder.endObject(); return builder; diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataIndexTemplateService.java b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataIndexTemplateService.java index d6ed28454df96..3878a3329b634 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataIndexTemplateService.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataIndexTemplateService.java @@ -782,7 +782,7 @@ private void validateUseOfDeprecatedIngestPipelines(String name, IngestMetadata private void emitWarningIfPipelineIsDeprecated(String name, Map pipelines, String pipelineName) { Optional.ofNullable(pipelineName) .map(pipelines::get) - .filter(p -> Boolean.TRUE.equals(p.getConfigAsMap().get("deprecated"))) + .filter(p -> Boolean.TRUE.equals(p.getConfig().get("deprecated"))) .ifPresent( p -> deprecationLogger.warn( DeprecationCategory.TEMPLATES, diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestService.java b/server/src/main/java/org/elasticsearch/ingest/IngestService.java index c891dd21f0753..a2a0ad48cdc73 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestService.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestService.java @@ -523,7 +523,7 @@ public static boolean isNoOpPipelineUpdate(ClusterState state, PutPipelineReques && currentIngestMetadata.getPipelines().containsKey(request.getId())) { var pipelineConfig = XContentHelper.convertToMap(request.getSource(), false, request.getXContentType()).v2(); var currentPipeline = currentIngestMetadata.getPipelines().get(request.getId()); - if (currentPipeline.getConfigAsMap().equals(pipelineConfig)) { + if (currentPipeline.getConfig().equals(pipelineConfig)) { return true; } } @@ -1287,7 +1287,7 @@ synchronized void innerUpdatePipelines(IngestMetadata newIngestMetadata) { try { Pipeline newPipeline = Pipeline.create( newConfiguration.getId(), - newConfiguration.getConfigAsMap(), + newConfiguration.getConfig(false), processorFactories, scriptService ); @@ -1411,7 +1411,7 @@ public

Collection getPipelineWithProcessorType(Cla public synchronized void reloadPipeline(String id) throws Exception { PipelineHolder holder = pipelines.get(id); - Pipeline updatedPipeline = Pipeline.create(id, holder.configuration.getConfigAsMap(), processorFactories, scriptService); + Pipeline updatedPipeline = Pipeline.create(id, holder.configuration.getConfig(false), processorFactories, scriptService); Map updatedPipelines = new HashMap<>(this.pipelines); updatedPipelines.put(id, new PipelineHolder(holder.configuration, updatedPipeline)); this.pipelines = Map.copyOf(updatedPipelines); diff --git a/server/src/main/java/org/elasticsearch/ingest/PipelineConfiguration.java b/server/src/main/java/org/elasticsearch/ingest/PipelineConfiguration.java index 7406ee8837264..9f3f3aaba62fc 100644 --- a/server/src/main/java/org/elasticsearch/ingest/PipelineConfiguration.java +++ b/server/src/main/java/org/elasticsearch/ingest/PipelineConfiguration.java @@ -9,12 +9,14 @@ package org.elasticsearch.ingest; +import org.elasticsearch.TransportVersions; import org.elasticsearch.cluster.Diff; import org.elasticsearch.cluster.SimpleDiffable; import org.elasticsearch.common.Strings; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.util.Maps; import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.xcontent.ContextParser; import org.elasticsearch.xcontent.ObjectParser; @@ -22,25 +24,32 @@ import org.elasticsearch.xcontent.ToXContentObject; import org.elasticsearch.xcontent.XContentBuilder; import org.elasticsearch.xcontent.XContentType; +import org.elasticsearch.xcontent.json.JsonXContent; import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; import java.util.Map; import java.util.Objects; /** - * Encapsulates a pipeline's id and configuration as a blob + * Encapsulates a pipeline's id and configuration as a loosely typed map -- see {@link Pipeline} for the + * parsed and processed object(s) that a pipeline configuration will become. This class is used for things + * like keeping track of pipelines in the cluster state (where a pipeline is 'just some json') whereas the + * {@link Pipeline} class is used in the actual processing of ingest documents through pipelines in the + * {@link IngestService}. */ public final class PipelineConfiguration implements SimpleDiffable, ToXContentObject { private static final ObjectParser PARSER = new ObjectParser<>("pipeline_config", true, Builder::new); static { PARSER.declareString(Builder::setId, new ParseField("id")); - PARSER.declareField((parser, builder, aVoid) -> { - XContentBuilder contentBuilder = XContentBuilder.builder(parser.contentType().xContent()); - contentBuilder.generator().copyCurrentStructure(parser); - builder.setConfig(BytesReference.bytes(contentBuilder), contentBuilder.contentType()); - }, new ParseField("config"), ObjectParser.ValueType.OBJECT); - + PARSER.declareField( + (parser, builder, aVoid) -> builder.setConfig(parser.map()), + new ParseField("config"), + ObjectParser.ValueType.OBJECT + ); } public static ContextParser getParser() { @@ -50,56 +59,94 @@ public static ContextParser getParser() { private static class Builder { private String id; - private BytesReference config; - private XContentType xContentType; + private Map config; void setId(String id) { this.id = id; } - void setConfig(BytesReference config, XContentType xContentType) { + void setConfig(Map config) { this.config = config; - this.xContentType = xContentType; } PipelineConfiguration build() { - return new PipelineConfiguration(id, config, xContentType); + return new PipelineConfiguration(id, config); } } private final String id; - // Store config as bytes reference, because the config is only used when the pipeline store reads the cluster state - // and the way the map of maps config is read requires a deep copy (it removes instead of gets entries to check for unused options) - // also the get pipeline api just directly returns this to the caller - private final BytesReference config; - private final XContentType xContentType; + private final Map config; - public PipelineConfiguration(String id, BytesReference config, XContentType xContentType) { + public PipelineConfiguration(String id, Map config) { this.id = Objects.requireNonNull(id); - this.config = Objects.requireNonNull(config); - this.xContentType = Objects.requireNonNull(xContentType); + this.config = deepCopy(config, true); // defensive deep copy + } + + /** + * A convenience constructor that parses some bytes as a map representing a pipeline's config and then delegates to the + * conventional {@link #PipelineConfiguration(String, Map)} constructor. + * + * @param id the id of the pipeline + * @param config a parse-able bytes reference that will return a pipeline configuration + * @param xContentType the content-type to use while parsing the pipeline configuration + */ + public PipelineConfiguration(String id, BytesReference config, XContentType xContentType) { + this(id, XContentHelper.convertToMap(config, true, xContentType).v2()); } public String getId() { return id; } - public Map getConfigAsMap() { - return XContentHelper.convertToMap(config, true, xContentType).v2(); + /** + * @return a reference to the unmodifiable configuration map for this pipeline + */ + public Map getConfig() { + return getConfig(true); } - // pkg-private for tests - XContentType getXContentType() { - return xContentType; + /** + * @param unmodifiable whether the returned map should be unmodifiable or not + * @return a reference to the unmodifiable config map (if unmodifiable is true) or + * a reference to a freshly-created mutable deep copy of the config map (if unmodifiable is false) + */ + public Map getConfig(boolean unmodifiable) { + if (unmodifiable) { + return config; // already unmodifiable + } else { + return deepCopy(config, false); + } } - // pkg-private for tests - BytesReference getConfig() { - return config; + @SuppressWarnings("unchecked") + private static T deepCopy(final T value, final boolean unmodifiable) { + return (T) innerDeepCopy(value, unmodifiable); + } + + private static Object innerDeepCopy(final Object value, final boolean unmodifiable) { + if (value instanceof Map mapValue) { + final Map copy = Maps.newLinkedHashMapWithExpectedSize(mapValue.size()); // n.b. maintain ordering + for (Map.Entry entry : mapValue.entrySet()) { + copy.put(innerDeepCopy(entry.getKey(), unmodifiable), innerDeepCopy(entry.getValue(), unmodifiable)); + } + return unmodifiable ? Collections.unmodifiableMap(copy) : copy; + } else if (value instanceof List listValue) { + final List copy = new ArrayList<>(listValue.size()); + for (Object itemValue : listValue) { + copy.add(innerDeepCopy(itemValue, unmodifiable)); + } + return unmodifiable ? Collections.unmodifiableList(copy) : copy; + } else { + // if this list of expected value types ends up not being exhaustive, then we want to learn about that + // at development time, but it's probably better to err on the side of passing through the value at runtime + assert (value == null || value instanceof String || value instanceof Number || value instanceof Boolean) + : "unexpected value type [" + value.getClass() + "]"; + return value; + } } public Integer getVersion() { - Object o = getConfigAsMap().get("version"); + Object o = config.get("version"); if (o == null) { return null; } else if (o instanceof Number number) { @@ -113,13 +160,22 @@ public Integer getVersion() { public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); builder.field("id", id); - builder.field("config", getConfigAsMap()); + builder.field("config", config); builder.endObject(); return builder; } public static PipelineConfiguration readFrom(StreamInput in) throws IOException { - return new PipelineConfiguration(in.readString(), in.readBytesReference(), in.readEnum(XContentType.class)); + final String id = in.readString(); + final Map config; + if (in.getTransportVersion().onOrAfter(TransportVersions.INGEST_PIPELINE_CONFIGURATION_AS_MAP)) { + config = in.readGenericMap(); + } else { + final BytesReference bytes = in.readSlicedBytesReference(); + final XContentType type = in.readEnum(XContentType.class); + config = XContentHelper.convertToMap(bytes, true, type).v2(); + } + return new PipelineConfiguration(id, config); } public static Diff readDiffFrom(StreamInput in) throws IOException { @@ -134,8 +190,14 @@ public String toString() { @Override public void writeTo(StreamOutput out) throws IOException { out.writeString(id); - out.writeBytesReference(config); - XContentHelper.writeTo(out, xContentType); + if (out.getTransportVersion().onOrAfter(TransportVersions.INGEST_PIPELINE_CONFIGURATION_AS_MAP)) { + out.writeGenericMap(config); + } else { + XContentBuilder builder = XContentBuilder.builder(JsonXContent.jsonXContent).prettyPrint(); + builder.map(config); + out.writeBytesReference(BytesReference.bytes(builder)); + XContentHelper.writeTo(out, XContentType.JSON); + } } @Override @@ -146,14 +208,14 @@ public boolean equals(Object o) { PipelineConfiguration that = (PipelineConfiguration) o; if (id.equals(that.id) == false) return false; - return getConfigAsMap().equals(that.getConfigAsMap()); + return config.equals(that.config); } @Override public int hashCode() { int result = id.hashCode(); - result = 31 * result + getConfigAsMap().hashCode(); + result = 31 * result + config.hashCode(); return result; } } diff --git a/server/src/test/java/org/elasticsearch/action/ingest/GetPipelineResponseTests.java b/server/src/test/java/org/elasticsearch/action/ingest/GetPipelineResponseTests.java index 4e6b2b17b2554..61284a49b2502 100644 --- a/server/src/test/java/org/elasticsearch/action/ingest/GetPipelineResponseTests.java +++ b/server/src/test/java/org/elasticsearch/action/ingest/GetPipelineResponseTests.java @@ -79,7 +79,7 @@ public void testXContentDeserialization() throws IOException { assertEquals(actualPipelines.size(), parsedPipelines.size()); for (PipelineConfiguration pipeline : parsedPipelines) { assertTrue(pipelinesMap.containsKey(pipeline.getId())); - assertEquals(pipelinesMap.get(pipeline.getId()).getConfigAsMap(), pipeline.getConfigAsMap()); + assertEquals(pipelinesMap.get(pipeline.getId()).getConfig(), pipeline.getConfig()); } } diff --git a/server/src/test/java/org/elasticsearch/ingest/IngestMetadataTests.java b/server/src/test/java/org/elasticsearch/ingest/IngestMetadataTests.java index 6198d6580cb3d..7ddd2b3d636d7 100644 --- a/server/src/test/java/org/elasticsearch/ingest/IngestMetadataTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/IngestMetadataTests.java @@ -59,8 +59,8 @@ public void testFromXContent() throws IOException { assertEquals(2, m.getPipelines().size()); assertEquals("1", m.getPipelines().get("1").getId()); assertEquals("2", m.getPipelines().get("2").getId()); - assertEquals(pipeline.getConfigAsMap(), m.getPipelines().get("1").getConfigAsMap()); - assertEquals(pipeline2.getConfigAsMap(), m.getPipelines().get("2").getConfigAsMap()); + assertEquals(pipeline.getConfig(), m.getPipelines().get("1").getConfig()); + assertEquals(pipeline2.getConfig(), m.getPipelines().get("2").getConfig()); } } diff --git a/server/src/test/java/org/elasticsearch/ingest/PipelineConfigurationTests.java b/server/src/test/java/org/elasticsearch/ingest/PipelineConfigurationTests.java index 202c4edb2d0c8..7be6e97762ccf 100644 --- a/server/src/test/java/org/elasticsearch/ingest/PipelineConfigurationTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/PipelineConfigurationTests.java @@ -26,26 +26,57 @@ import java.io.IOException; import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.Map; import java.util.function.Predicate; +import static org.hamcrest.Matchers.anEmptyMap; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.not; +import static org.hamcrest.Matchers.sameInstance; public class PipelineConfigurationTests extends AbstractXContentTestCase { + public void testConfigInvariants() { + Map original = Map.of("a", 1); + Map mutable = new HashMap<>(original); + PipelineConfiguration configuration = new PipelineConfiguration("1", mutable); + // the config is equal to the original & mutable map, regardless of how you get a reference to it + assertThat(configuration.getConfig(), equalTo(original)); + assertThat(configuration.getConfig(), equalTo(mutable)); + assertThat(configuration.getConfig(), equalTo(configuration.getConfig(false))); + assertThat(configuration.getConfig(), equalTo(configuration.getConfig(true))); + // the config is the same instance as itself when unmodifiable is true + assertThat(configuration.getConfig(), sameInstance(configuration.getConfig())); + assertThat(configuration.getConfig(), sameInstance(configuration.getConfig(true))); + // but it's not the same instance as the original mutable map, nor if unmodifiable is false + assertThat(configuration.getConfig(), not(sameInstance(mutable))); + assertThat(configuration.getConfig(), not(sameInstance(configuration.getConfig(false)))); + + // changing the mutable map doesn't alter the pipeline's configuration + mutable.put("b", 2); + assertThat(configuration.getConfig(), equalTo(original)); + + // the modifiable map can be modified + Map modifiable = configuration.getConfig(false); + modifiable.put("c", 3); // this doesn't throw an exception + assertThat(modifiable.get("c"), equalTo(3)); + // but the next modifiable copy is a new fresh copy, and doesn't reflect those changes + assertThat(configuration.getConfig(), equalTo(configuration.getConfig(false))); + } + public void testSerialization() throws IOException { PipelineConfiguration configuration = new PipelineConfiguration( "1", new BytesArray("{}".getBytes(StandardCharsets.UTF_8)), XContentType.JSON ); - assertEquals(XContentType.JSON, configuration.getXContentType()); - + assertThat(configuration.getConfig(), anEmptyMap()); BytesStreamOutput out = new BytesStreamOutput(); configuration.writeTo(out); StreamInput in = StreamInput.wrap(out.bytes().toBytesRef().bytes); PipelineConfiguration serialized = PipelineConfiguration.readFrom(in); - assertEquals(XContentType.JSON, serialized.getXContentType()); - assertEquals("{}", serialized.getConfig().utf8ToString()); + assertThat(serialized.getConfig(), anEmptyMap()); } public void testMetaSerialization() throws IOException { @@ -56,13 +87,14 @@ public void testMetaSerialization() throws IOException { new BytesArray(configJson.getBytes(StandardCharsets.UTF_8)), XContentType.JSON ); - assertEquals(XContentType.JSON, configuration.getXContentType()); BytesStreamOutput out = new BytesStreamOutput(); configuration.writeTo(out); StreamInput in = StreamInput.wrap(out.bytes().toBytesRef().bytes); PipelineConfiguration serialized = PipelineConfiguration.readFrom(in); - assertEquals(XContentType.JSON, serialized.getXContentType()); - assertEquals(configJson, serialized.getConfig().utf8ToString()); + assertEquals( + XContentHelper.convertToMap(new BytesArray(configJson.getBytes(StandardCharsets.UTF_8)), true, XContentType.JSON).v2(), + serialized.getConfig() + ); } public void testParser() throws IOException { @@ -80,9 +112,8 @@ public void testParser() throws IOException { XContentParser xContentParser = xContentType.xContent() .createParser(NamedXContentRegistry.EMPTY, DeprecationHandler.THROW_UNSUPPORTED_OPERATION, bytes.streamInput()); PipelineConfiguration parsed = parser.parse(xContentParser, null); - assertEquals(xContentType.canonical(), parsed.getXContentType()); - assertEquals("{}", XContentHelper.convertToJson(parsed.getConfig(), false, parsed.getXContentType())); - assertEquals("1", parsed.getId()); + assertThat(parsed.getId(), equalTo("1")); + assertThat(parsed.getConfig(), anEmptyMap()); } public void testGetVersion() { diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/InferenceProcessorInfoExtractor.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/InferenceProcessorInfoExtractor.java index 83f7832645270..ad8a55a5f8443 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/InferenceProcessorInfoExtractor.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/InferenceProcessorInfoExtractor.java @@ -51,7 +51,7 @@ public static int countInferenceProcessors(ClusterState state) { } Counter counter = Counter.newCounter(); ingestMetadata.getPipelines().forEach((pipelineId, configuration) -> { - Map configMap = configuration.getConfigAsMap(); + Map configMap = configuration.getConfig(); List> processorConfigs = (List>) configMap.get(PROCESSORS_KEY); for (Map processorConfigWithKey : processorConfigs) { for (Map.Entry entry : processorConfigWithKey.entrySet()) { @@ -73,7 +73,7 @@ public static Set getModelIdsFromInferenceProcessors(IngestMetadata inge Set modelIds = new LinkedHashSet<>(); ingestMetadata.getPipelines().forEach((pipelineId, configuration) -> { - Map configMap = configuration.getConfigAsMap(); + Map configMap = configuration.getConfig(); List> processorConfigs = readList(configMap, PROCESSORS_KEY); for (Map processorConfigWithKey : processorConfigs) { for (Map.Entry entry : processorConfigWithKey.entrySet()) { @@ -100,7 +100,7 @@ public static Map> pipelineIdsByResource(ClusterState state, return pipelineIdsByModelIds; } ingestMetadata.getPipelines().forEach((pipelineId, configuration) -> { - Map configMap = configuration.getConfigAsMap(); + Map configMap = configuration.getConfig(); List> processorConfigs = readList(configMap, PROCESSORS_KEY); for (Map processorConfigWithKey : processorConfigs) { for (Map.Entry entry : processorConfigWithKey.entrySet()) { @@ -131,7 +131,7 @@ public static Set pipelineIdsForResource(ClusterState state, Set return pipelineIds; } ingestMetadata.getPipelines().forEach((pipelineId, configuration) -> { - Map configMap = configuration.getConfigAsMap(); + Map configMap = configuration.getConfig(); List> processorConfigs = readList(configMap, PROCESSORS_KEY); for (Map processorConfigWithKey : processorConfigs) { for (Map.Entry entry : processorConfigWithKey.entrySet()) { diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/template/IndexTemplateRegistryTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/template/IndexTemplateRegistryTests.java index e396712cbc360..356fac4539137 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/template/IndexTemplateRegistryTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/template/IndexTemplateRegistryTests.java @@ -726,7 +726,7 @@ private static void assertPutPipelineAction( putRequest.getSource(), putRequest.getXContentType() ); - List processors = (List) pipelineConfiguration.getConfigAsMap().get("processors"); + List processors = (List) pipelineConfiguration.getConfig().get("processors"); assertThat(processors, hasSize(1)); Map setProcessor = (Map) ((Map) processors.get(0)).get("set"); assertNotNull(setProcessor.get("field")); diff --git a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyReindexPipeline.java b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyReindexPipeline.java index 5b7020c3f2bb0..28300974dd8fb 100644 --- a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyReindexPipeline.java +++ b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyReindexPipeline.java @@ -53,7 +53,7 @@ static boolean exists(ClusterState clusterState) { if (ingestMetadata != null) { final PipelineConfiguration pipeline = ingestMetadata.getPipelines().get(pipelineName()); if (pipeline != null) { - Object version = pipeline.getConfigAsMap().get("version"); + Object version = pipeline.getConfig().get("version"); return version instanceof Number number && number.intValue() >= ENRICH_PIPELINE_LAST_UPDATED_VERSION; } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/loadingservice/ModelLoadingService.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/loadingservice/ModelLoadingService.java index deb645ff96133..4a9d65481d412 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/loadingservice/ModelLoadingService.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/loadingservice/ModelLoadingService.java @@ -981,7 +981,7 @@ private static Set countInferenceProcessors(IngestMetadata ingestMetadat return allReferencedModelKeys; } ingestMetadata.getPipelines().forEach((pipelineId, pipelineConfiguration) -> { - Object processors = pipelineConfiguration.getConfigAsMap().get("processors"); + Object processors = pipelineConfiguration.getConfig().get("processors"); if (processors instanceof List) { for (Object processor : (List) processors) { if (processor instanceof Map) { diff --git a/x-pack/plugin/stack/src/test/java/org/elasticsearch/xpack/stack/LegacyStackTemplateRegistryTests.java b/x-pack/plugin/stack/src/test/java/org/elasticsearch/xpack/stack/LegacyStackTemplateRegistryTests.java index 39f58e638aa68..cf20bad0fc2cf 100644 --- a/x-pack/plugin/stack/src/test/java/org/elasticsearch/xpack/stack/LegacyStackTemplateRegistryTests.java +++ b/x-pack/plugin/stack/src/test/java/org/elasticsearch/xpack/stack/LegacyStackTemplateRegistryTests.java @@ -67,7 +67,7 @@ public void testThatTemplatesAreDeprecated() { registry.getIngestPipelines() .stream() .map(ipc -> new PipelineConfiguration(ipc.getId(), ipc.loadConfig(), XContentType.JSON)) - .map(PipelineConfiguration::getConfigAsMap) + .map(PipelineConfiguration::getConfig) .forEach(p -> assertTrue((Boolean) p.get("deprecated"))); } diff --git a/x-pack/plugin/stack/src/test/java/org/elasticsearch/xpack/stack/StackTemplateRegistryTests.java b/x-pack/plugin/stack/src/test/java/org/elasticsearch/xpack/stack/StackTemplateRegistryTests.java index 25ff3b5311fa2..2ca3efa5655f5 100644 --- a/x-pack/plugin/stack/src/test/java/org/elasticsearch/xpack/stack/StackTemplateRegistryTests.java +++ b/x-pack/plugin/stack/src/test/java/org/elasticsearch/xpack/stack/StackTemplateRegistryTests.java @@ -551,7 +551,7 @@ public void testThatTemplatesAreNotDeprecated() { registry.getIngestPipelines() .stream() .map(ipc -> new PipelineConfiguration(ipc.getId(), ipc.loadConfig(), XContentType.JSON)) - .map(PipelineConfiguration::getConfigAsMap) + .map(PipelineConfiguration::getConfig) .forEach(p -> assertFalse((Boolean) p.get("deprecated"))); }