From 7c0f9e398e3e88e80a4e4911708aa415c8d778ef Mon Sep 17 00:00:00 2001 From: Joe Gallo Date: Wed, 30 Oct 2024 13:33:53 -0400 Subject: [PATCH 01/14] Whitespace --- .../java/org/elasticsearch/ingest/PipelineConfiguration.java | 1 - 1 file changed, 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/ingest/PipelineConfiguration.java b/server/src/main/java/org/elasticsearch/ingest/PipelineConfiguration.java index 9067cdb2040fd..5b2c1633357cc 100644 --- a/server/src/main/java/org/elasticsearch/ingest/PipelineConfiguration.java +++ b/server/src/main/java/org/elasticsearch/ingest/PipelineConfiguration.java @@ -41,7 +41,6 @@ public final class PipelineConfiguration implements SimpleDiffable getParser() { From 7f1bdeb33fbc320d55200e8f2ab52348f1cc6928 Mon Sep 17 00:00:00 2001 From: Joe Gallo Date: Mon, 18 Nov 2024 14:49:35 -0500 Subject: [PATCH 02/14] Rewrite some test assertions --- .../org/elasticsearch/ingest/PipelineConfigurationTests.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/ingest/PipelineConfigurationTests.java b/server/src/test/java/org/elasticsearch/ingest/PipelineConfigurationTests.java index 202c4edb2d0c8..d799d66c2a88b 100644 --- a/server/src/test/java/org/elasticsearch/ingest/PipelineConfigurationTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/PipelineConfigurationTests.java @@ -28,6 +28,7 @@ import java.nio.charset.StandardCharsets; import java.util.function.Predicate; +import static org.hamcrest.Matchers.anEmptyMap; import static org.hamcrest.Matchers.equalTo; public class PipelineConfigurationTests extends AbstractXContentTestCase { @@ -38,7 +39,7 @@ public void testSerialization() throws IOException { new BytesArray("{}".getBytes(StandardCharsets.UTF_8)), XContentType.JSON ); - assertEquals(XContentType.JSON, configuration.getXContentType()); + assertThat(configuration.getConfigAsMap(), anEmptyMap()); BytesStreamOutput out = new BytesStreamOutput(); configuration.writeTo(out); @@ -81,8 +82,8 @@ public void testParser() throws IOException { .createParser(NamedXContentRegistry.EMPTY, DeprecationHandler.THROW_UNSUPPORTED_OPERATION, bytes.streamInput()); PipelineConfiguration parsed = parser.parse(xContentParser, null); assertEquals(xContentType.canonical(), parsed.getXContentType()); + assertThat(parsed.getId(), equalTo("1")); assertEquals("{}", XContentHelper.convertToJson(parsed.getConfig(), false, parsed.getXContentType())); - assertEquals("1", parsed.getId()); } public void testGetVersion() { From 7ff58794f4df2fdcf546cf4c48e6b3c663851236 Mon Sep 17 00:00:00 2001 From: Joe Gallo Date: Tue, 22 Oct 2024 21:18:21 -0400 Subject: [PATCH 03/14] Cache a parsed deep copy and hand it out like candy --- .../elasticsearch/ingest/IngestService.java | 4 +- .../ingest/PipelineConfiguration.java | 37 ++++++++++++++++++- 2 files changed, 38 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestService.java b/server/src/main/java/org/elasticsearch/ingest/IngestService.java index ce61f197b4831..bb5b1f2c17508 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestService.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestService.java @@ -1292,7 +1292,7 @@ synchronized void innerUpdatePipelines(IngestMetadata newIngestMetadata) { try { Pipeline newPipeline = Pipeline.create( newConfiguration.getId(), - newConfiguration.getConfigAsMap(), + newConfiguration.parseConfigAsMap(), processorFactories, scriptService ); @@ -1416,7 +1416,7 @@ public

Collection getPipelineWithProcessorType(Cla public synchronized void reloadPipeline(String id) throws Exception { PipelineHolder holder = pipelines.get(id); - Pipeline updatedPipeline = Pipeline.create(id, holder.configuration.getConfigAsMap(), processorFactories, scriptService); + Pipeline updatedPipeline = Pipeline.create(id, holder.configuration.parseConfigAsMap(), processorFactories, scriptService); Map updatedPipelines = new HashMap<>(this.pipelines); updatedPipelines.put(id, new PipelineHolder(holder.configuration, updatedPipeline)); this.pipelines = Map.copyOf(updatedPipelines); diff --git a/server/src/main/java/org/elasticsearch/ingest/PipelineConfiguration.java b/server/src/main/java/org/elasticsearch/ingest/PipelineConfiguration.java index 5b2c1633357cc..90d3116948bde 100644 --- a/server/src/main/java/org/elasticsearch/ingest/PipelineConfiguration.java +++ b/server/src/main/java/org/elasticsearch/ingest/PipelineConfiguration.java @@ -15,6 +15,7 @@ import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.util.Maps; import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.xcontent.ContextParser; import org.elasticsearch.xcontent.ObjectParser; @@ -25,6 +26,9 @@ import java.io.IOException; import java.io.UncheckedIOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; import java.util.Map; import java.util.Objects; @@ -73,11 +77,13 @@ PipelineConfiguration build() { // also the get pipeline api just directly returns this to the caller private final BytesReference config; private final XContentType xContentType; + private final Map parsedConfigMap; public PipelineConfiguration(String id, BytesReference config, XContentType xContentType) { this.id = Objects.requireNonNull(id); this.config = Objects.requireNonNull(config); this.xContentType = Objects.requireNonNull(xContentType); + this.parsedConfigMap = deepCopy(parseConfigAsMap(), true); } public String getId() { @@ -85,9 +91,38 @@ public String getId() { } public Map getConfigAsMap() { + return parsedConfigMap; + } + + public Map parseConfigAsMap() { return XContentHelper.convertToMap(config, true, xContentType).v2(); } + @SuppressWarnings("unchecked") + private static T deepCopy(final T value, final boolean unmodifiable) { + return (T) innerDeepCopy(value, unmodifiable); + } + + private static Object innerDeepCopy(final Object value, final boolean unmodifiable) { + if (value instanceof Map mapValue) { + final Map copy = Maps.newLinkedHashMapWithExpectedSize(mapValue.size()); // n.b. maintain ordering + for (Map.Entry entry : mapValue.entrySet()) { + copy.put(innerDeepCopy(entry.getKey(), unmodifiable), innerDeepCopy(entry.getValue(), unmodifiable)); + } + return unmodifiable ? Collections.unmodifiableMap(copy) : copy; + } else if (value instanceof List listValue) { + final List copy = new ArrayList<>(listValue.size()); + for (Object itemValue : listValue) { + copy.add(innerDeepCopy(itemValue, unmodifiable)); + } + return unmodifiable ? Collections.unmodifiableList(copy) : copy; + } else if (value == null || value instanceof String || value instanceof Number || value instanceof Boolean) { + return value; + } else { + throw new IllegalArgumentException("unexpected value type [" + value.getClass() + "]"); + } + } + // pkg-private for tests XContentType getXContentType() { return xContentType; @@ -163,7 +198,7 @@ public int hashCode() { *

The given upgrader is applied to the config map for any processor of the given type. */ PipelineConfiguration maybeUpgradeProcessors(String type, IngestMetadata.ProcessorConfigUpgrader upgrader) { - Map mutableConfigMap = getConfigAsMap(); + Map mutableConfigMap = parseConfigAsMap(); boolean changed = false; // This should be a List of Maps, where the keys are processor types and the values are config maps. // But we'll skip upgrading rather than fail if not. From 8679f111945b00021ebb698abdf658b287193057 Mon Sep 17 00:00:00 2001 From: Joe Gallo Date: Mon, 18 Nov 2024 11:30:54 -0500 Subject: [PATCH 04/14] Add a transport version for changing this structure --- .../org/elasticsearch/TransportVersions.java | 1 + .../ingest/PipelineConfiguration.java | 87 +++++++++---------- .../ingest/PipelineConfigurationTests.java | 14 ++- 3 files changed, 49 insertions(+), 53 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/TransportVersions.java b/server/src/main/java/org/elasticsearch/TransportVersions.java index 1a99123ebdac6..887cfea36a199 100644 --- a/server/src/main/java/org/elasticsearch/TransportVersions.java +++ b/server/src/main/java/org/elasticsearch/TransportVersions.java @@ -203,6 +203,7 @@ static TransportVersion def(int id) { public static final TransportVersion INDEX_STATS_ADDITIONAL_FIELDS_REVERT = def(8_794_00_0); public static final TransportVersion FAST_REFRESH_RCO_2 = def(8_795_00_0); public static final TransportVersion ESQL_ENRICH_RUNTIME_WARNINGS = def(8_796_00_0); + public static final TransportVersion INGEST_PIPELINE_CONFIGURATION_AS_MAP = def(8_797_00_0); /* * STOP! READ THIS FIRST! No, really, diff --git a/server/src/main/java/org/elasticsearch/ingest/PipelineConfiguration.java b/server/src/main/java/org/elasticsearch/ingest/PipelineConfiguration.java index 90d3116948bde..226ee30d1f30d 100644 --- a/server/src/main/java/org/elasticsearch/ingest/PipelineConfiguration.java +++ b/server/src/main/java/org/elasticsearch/ingest/PipelineConfiguration.java @@ -9,6 +9,7 @@ package org.elasticsearch.ingest; +import org.elasticsearch.TransportVersions; import org.elasticsearch.cluster.Diff; import org.elasticsearch.cluster.SimpleDiffable; import org.elasticsearch.common.Strings; @@ -23,9 +24,9 @@ import org.elasticsearch.xcontent.ToXContentObject; import org.elasticsearch.xcontent.XContentBuilder; import org.elasticsearch.xcontent.XContentType; +import org.elasticsearch.xcontent.json.JsonXContent; import java.io.IOException; -import java.io.UncheckedIOException; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -40,11 +41,11 @@ public final class PipelineConfiguration implements SimpleDiffable PARSER = new ObjectParser<>("pipeline_config", true, Builder::new); static { PARSER.declareString(Builder::setId, new ParseField("id")); - PARSER.declareField((parser, builder, aVoid) -> { - XContentBuilder contentBuilder = XContentBuilder.builder(parser.contentType().xContent()); - contentBuilder.generator().copyCurrentStructure(parser); - builder.setConfig(BytesReference.bytes(contentBuilder), contentBuilder.contentType()); - }, new ParseField("config"), ObjectParser.ValueType.OBJECT); + PARSER.declareField( + (parser, builder, aVoid) -> builder.setConfig(parser.map()), + new ParseField("config"), + ObjectParser.ValueType.OBJECT + ); } public static ContextParser getParser() { @@ -54,36 +55,31 @@ public static ContextParser getParser() { private static class Builder { private String id; - private BytesReference config; - private XContentType xContentType; + private Map config; void setId(String id) { this.id = id; } - void setConfig(BytesReference config, XContentType xContentType) { + void setConfig(Map config) { this.config = config; - this.xContentType = xContentType; } PipelineConfiguration build() { - return new PipelineConfiguration(id, config, xContentType); + return new PipelineConfiguration(id, config); } } private final String id; - // Store config as bytes reference, because the config is only used when the pipeline store reads the cluster state - // and the way the map of maps config is read requires a deep copy (it removes instead of gets entries to check for unused options) - // also the get pipeline api just directly returns this to the caller - private final BytesReference config; - private final XContentType xContentType; - private final Map parsedConfigMap; + private final Map config; - public PipelineConfiguration(String id, BytesReference config, XContentType xContentType) { + public PipelineConfiguration(String id, Map config) { this.id = Objects.requireNonNull(id); - this.config = Objects.requireNonNull(config); - this.xContentType = Objects.requireNonNull(xContentType); - this.parsedConfigMap = deepCopy(parseConfigAsMap(), true); + this.config = deepCopy(config, true); // defensive deep copy + } + + public PipelineConfiguration(String id, BytesReference config, XContentType xContentType) { + this(id, XContentHelper.convertToMap(config, true, xContentType).v2()); } public String getId() { @@ -91,11 +87,11 @@ public String getId() { } public Map getConfigAsMap() { - return parsedConfigMap; + return config; } public Map parseConfigAsMap() { - return XContentHelper.convertToMap(config, true, xContentType).v2(); + return deepCopy(config, false); } @SuppressWarnings("unchecked") @@ -123,18 +119,8 @@ private static Object innerDeepCopy(final Object value, final boolean unmodifiab } } - // pkg-private for tests - XContentType getXContentType() { - return xContentType; - } - - // pkg-private for tests - BytesReference getConfig() { - return config; - } - public Integer getVersion() { - Object o = getConfigAsMap().get("version"); + Object o = config.get("version"); if (o == null) { return null; } else if (o instanceof Number number) { @@ -148,13 +134,22 @@ public Integer getVersion() { public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); builder.field("id", id); - builder.field("config", getConfigAsMap()); + builder.field("config", config); builder.endObject(); return builder; } public static PipelineConfiguration readFrom(StreamInput in) throws IOException { - return new PipelineConfiguration(in.readString(), in.readBytesReference(), in.readEnum(XContentType.class)); + final String id = in.readString(); + final Map config; + if (in.getTransportVersion().onOrAfter(TransportVersions.INGEST_PIPELINE_CONFIGURATION_AS_MAP)) { + config = in.readGenericMap(); + } else { + final BytesReference bytes = in.readBytesReference(); + final XContentType type = in.readEnum(XContentType.class); + config = XContentHelper.convertToMap(bytes, true, type).v2(); + } + return new PipelineConfiguration(id, config); } public static Diff readDiffFrom(StreamInput in) throws IOException { @@ -169,8 +164,14 @@ public String toString() { @Override public void writeTo(StreamOutput out) throws IOException { out.writeString(id); - out.writeBytesReference(config); - XContentHelper.writeTo(out, xContentType); + if (out.getTransportVersion().onOrAfter(TransportVersions.INGEST_PIPELINE_CONFIGURATION_AS_MAP)) { + out.writeGenericMap(config); + } else { + XContentBuilder builder = XContentBuilder.builder(JsonXContent.jsonXContent).prettyPrint(); + builder.map(config); + out.writeBytesReference(BytesReference.bytes(builder)); + XContentHelper.writeTo(out, XContentType.JSON); + } } @Override @@ -181,14 +182,14 @@ public boolean equals(Object o) { PipelineConfiguration that = (PipelineConfiguration) o; if (id.equals(that.id) == false) return false; - return getConfigAsMap().equals(that.getConfigAsMap()); + return config.equals(that.config); } @Override public int hashCode() { int result = id.hashCode(); - result = 31 * result + getConfigAsMap().hashCode(); + result = 31 * result + config.hashCode(); return result; } @@ -214,11 +215,7 @@ PipelineConfiguration maybeUpgradeProcessors(String type, IngestMetadata.Process } } if (changed) { - try (XContentBuilder builder = XContentBuilder.builder(xContentType.xContent())) { - return new PipelineConfiguration(id, BytesReference.bytes(builder.map(mutableConfigMap)), xContentType); - } catch (IOException e) { - throw new UncheckedIOException(e); - } + return new PipelineConfiguration(id, mutableConfigMap); } else { return this; } diff --git a/server/src/test/java/org/elasticsearch/ingest/PipelineConfigurationTests.java b/server/src/test/java/org/elasticsearch/ingest/PipelineConfigurationTests.java index d799d66c2a88b..0a6f1b22932b9 100644 --- a/server/src/test/java/org/elasticsearch/ingest/PipelineConfigurationTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/PipelineConfigurationTests.java @@ -40,13 +40,11 @@ public void testSerialization() throws IOException { XContentType.JSON ); assertThat(configuration.getConfigAsMap(), anEmptyMap()); - BytesStreamOutput out = new BytesStreamOutput(); configuration.writeTo(out); StreamInput in = StreamInput.wrap(out.bytes().toBytesRef().bytes); PipelineConfiguration serialized = PipelineConfiguration.readFrom(in); - assertEquals(XContentType.JSON, serialized.getXContentType()); - assertEquals("{}", serialized.getConfig().utf8ToString()); + assertThat(serialized.getConfigAsMap(), anEmptyMap()); } public void testMetaSerialization() throws IOException { @@ -57,13 +55,14 @@ public void testMetaSerialization() throws IOException { new BytesArray(configJson.getBytes(StandardCharsets.UTF_8)), XContentType.JSON ); - assertEquals(XContentType.JSON, configuration.getXContentType()); BytesStreamOutput out = new BytesStreamOutput(); configuration.writeTo(out); StreamInput in = StreamInput.wrap(out.bytes().toBytesRef().bytes); PipelineConfiguration serialized = PipelineConfiguration.readFrom(in); - assertEquals(XContentType.JSON, serialized.getXContentType()); - assertEquals(configJson, serialized.getConfig().utf8ToString()); + assertEquals( + XContentHelper.convertToMap(new BytesArray(configJson.getBytes(StandardCharsets.UTF_8)), true, XContentType.JSON).v2(), + serialized.getConfigAsMap() + ); } public void testParser() throws IOException { @@ -81,9 +80,8 @@ public void testParser() throws IOException { XContentParser xContentParser = xContentType.xContent() .createParser(NamedXContentRegistry.EMPTY, DeprecationHandler.THROW_UNSUPPORTED_OPERATION, bytes.streamInput()); PipelineConfiguration parsed = parser.parse(xContentParser, null); - assertEquals(xContentType.canonical(), parsed.getXContentType()); assertThat(parsed.getId(), equalTo("1")); - assertEquals("{}", XContentHelper.convertToJson(parsed.getConfig(), false, parsed.getXContentType())); + assertThat(parsed.getConfigAsMap(), anEmptyMap()); } public void testGetVersion() { From e6d827b1cd82ca98b52877f1c82bb4cffc40caa2 Mon Sep 17 00:00:00 2001 From: Joe Gallo Date: Tue, 12 Nov 2024 16:24:25 -0500 Subject: [PATCH 05/14] Hide this in an overloaded method --- .../java/org/elasticsearch/ingest/IngestService.java | 4 ++-- .../elasticsearch/ingest/PipelineConfiguration.java | 10 +++++++--- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestService.java b/server/src/main/java/org/elasticsearch/ingest/IngestService.java index bb5b1f2c17508..78a0487a27472 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestService.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestService.java @@ -1292,7 +1292,7 @@ synchronized void innerUpdatePipelines(IngestMetadata newIngestMetadata) { try { Pipeline newPipeline = Pipeline.create( newConfiguration.getId(), - newConfiguration.parseConfigAsMap(), + newConfiguration.getConfigAsMap(false), processorFactories, scriptService ); @@ -1416,7 +1416,7 @@ public

Collection getPipelineWithProcessorType(Cla public synchronized void reloadPipeline(String id) throws Exception { PipelineHolder holder = pipelines.get(id); - Pipeline updatedPipeline = Pipeline.create(id, holder.configuration.parseConfigAsMap(), processorFactories, scriptService); + Pipeline updatedPipeline = Pipeline.create(id, holder.configuration.getConfigAsMap(false), processorFactories, scriptService); Map updatedPipelines = new HashMap<>(this.pipelines); updatedPipelines.put(id, new PipelineHolder(holder.configuration, updatedPipeline)); this.pipelines = Map.copyOf(updatedPipelines); diff --git a/server/src/main/java/org/elasticsearch/ingest/PipelineConfiguration.java b/server/src/main/java/org/elasticsearch/ingest/PipelineConfiguration.java index 226ee30d1f30d..3b204eea934d9 100644 --- a/server/src/main/java/org/elasticsearch/ingest/PipelineConfiguration.java +++ b/server/src/main/java/org/elasticsearch/ingest/PipelineConfiguration.java @@ -87,11 +87,15 @@ public String getId() { } public Map getConfigAsMap() { - return config; + return getConfigAsMap(true); } - public Map parseConfigAsMap() { - return deepCopy(config, false); + public Map getConfigAsMap(boolean unmodifiable) { + if (unmodifiable) { + return config; // already unmodifiable + } else { + return deepCopy(config, false); + } } @SuppressWarnings("unchecked") From d577238121c6e5880fd911e01adbebafe7928969 Mon Sep 17 00:00:00 2001 From: Joe Gallo Date: Tue, 12 Nov 2024 16:24:54 -0500 Subject: [PATCH 06/14] Use a better method name --- .../ingest/PipelineConfigurationBridge.java | 2 +- .../ingest/geoip/GeoIpDownloaderTaskExecutor.java | 2 +- .../elasticsearch/action/ingest/GetPipelineResponse.java | 2 +- .../cluster/metadata/MetadataIndexTemplateService.java | 2 +- .../java/org/elasticsearch/ingest/IngestService.java | 6 +++--- .../org/elasticsearch/ingest/PipelineConfiguration.java | 9 +++++---- .../action/ingest/GetPipelineResponseTests.java | 2 +- .../org/elasticsearch/ingest/IngestMetadataTests.java | 4 ++-- .../elasticsearch/ingest/PipelineConfigurationTests.java | 8 ++++---- .../core/ml/utils/InferenceProcessorInfoExtractor.java | 8 ++++---- .../xpack/core/template/IndexTemplateRegistryTests.java | 2 +- .../xpack/enrich/EnrichPolicyReindexPipeline.java | 2 +- .../ml/inference/loadingservice/ModelLoadingService.java | 2 +- .../xpack/stack/LegacyStackTemplateRegistryTests.java | 2 +- .../xpack/stack/StackTemplateRegistryTests.java | 2 +- 15 files changed, 28 insertions(+), 27 deletions(-) diff --git a/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/ingest/PipelineConfigurationBridge.java b/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/ingest/PipelineConfigurationBridge.java index e146b06fe3f53..9d357a658c415 100644 --- a/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/ingest/PipelineConfigurationBridge.java +++ b/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/ingest/PipelineConfigurationBridge.java @@ -29,7 +29,7 @@ public String getId() { } public Map getConfigAsMap() { - return delegate.getConfigAsMap(); + return delegate.getConfig(); } @Override diff --git a/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderTaskExecutor.java b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderTaskExecutor.java index 61ca050d91c13..2f96aa3cbc69a 100644 --- a/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderTaskExecutor.java +++ b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderTaskExecutor.java @@ -268,7 +268,7 @@ private static Set pipelinesWithGeoIpProcessor(ClusterState clusterState Set ids = new HashSet<>(); // note: this loop is unrolled rather than streaming-style because it's hot enough to show up in a flamegraph for (PipelineConfiguration configuration : configurations) { - List> processors = (List>) configuration.getConfigAsMap().get(Pipeline.PROCESSORS_KEY); + List> processors = (List>) configuration.getConfig().get(Pipeline.PROCESSORS_KEY); if (hasAtLeastOneGeoipProcessor(processors, downloadDatabaseOnPipelineCreation)) { ids.add(configuration.getId()); } diff --git a/server/src/main/java/org/elasticsearch/action/ingest/GetPipelineResponse.java b/server/src/main/java/org/elasticsearch/action/ingest/GetPipelineResponse.java index 3ed1dfef50053..760b87af49a78 100644 --- a/server/src/main/java/org/elasticsearch/action/ingest/GetPipelineResponse.java +++ b/server/src/main/java/org/elasticsearch/action/ingest/GetPipelineResponse.java @@ -80,7 +80,7 @@ public RestStatus status() { public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); for (PipelineConfiguration pipeline : pipelines) { - builder.field(pipeline.getId(), summary ? Map.of() : pipeline.getConfigAsMap()); + builder.field(pipeline.getId(), summary ? Map.of() : pipeline.getConfig()); } builder.endObject(); return builder; diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataIndexTemplateService.java b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataIndexTemplateService.java index d6ed28454df96..3878a3329b634 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataIndexTemplateService.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataIndexTemplateService.java @@ -782,7 +782,7 @@ private void validateUseOfDeprecatedIngestPipelines(String name, IngestMetadata private void emitWarningIfPipelineIsDeprecated(String name, Map pipelines, String pipelineName) { Optional.ofNullable(pipelineName) .map(pipelines::get) - .filter(p -> Boolean.TRUE.equals(p.getConfigAsMap().get("deprecated"))) + .filter(p -> Boolean.TRUE.equals(p.getConfig().get("deprecated"))) .ifPresent( p -> deprecationLogger.warn( DeprecationCategory.TEMPLATES, diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestService.java b/server/src/main/java/org/elasticsearch/ingest/IngestService.java index 78a0487a27472..1494d2a46f9d0 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestService.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestService.java @@ -519,7 +519,7 @@ public static boolean isNoOpPipelineUpdate(ClusterState state, PutPipelineReques && currentIngestMetadata.getPipelines().containsKey(request.getId())) { var pipelineConfig = XContentHelper.convertToMap(request.getSource(), false, request.getXContentType()).v2(); var currentPipeline = currentIngestMetadata.getPipelines().get(request.getId()); - if (currentPipeline.getConfigAsMap().equals(pipelineConfig)) { + if (currentPipeline.getConfig().equals(pipelineConfig)) { return true; } } @@ -1292,7 +1292,7 @@ synchronized void innerUpdatePipelines(IngestMetadata newIngestMetadata) { try { Pipeline newPipeline = Pipeline.create( newConfiguration.getId(), - newConfiguration.getConfigAsMap(false), + newConfiguration.getConfig(false), processorFactories, scriptService ); @@ -1416,7 +1416,7 @@ public

Collection getPipelineWithProcessorType(Cla public synchronized void reloadPipeline(String id) throws Exception { PipelineHolder holder = pipelines.get(id); - Pipeline updatedPipeline = Pipeline.create(id, holder.configuration.getConfigAsMap(false), processorFactories, scriptService); + Pipeline updatedPipeline = Pipeline.create(id, holder.configuration.getConfig(false), processorFactories, scriptService); Map updatedPipelines = new HashMap<>(this.pipelines); updatedPipelines.put(id, new PipelineHolder(holder.configuration, updatedPipeline)); this.pipelines = Map.copyOf(updatedPipelines); diff --git a/server/src/main/java/org/elasticsearch/ingest/PipelineConfiguration.java b/server/src/main/java/org/elasticsearch/ingest/PipelineConfiguration.java index 3b204eea934d9..df26ba6f1b93b 100644 --- a/server/src/main/java/org/elasticsearch/ingest/PipelineConfiguration.java +++ b/server/src/main/java/org/elasticsearch/ingest/PipelineConfiguration.java @@ -86,11 +86,11 @@ public String getId() { return id; } - public Map getConfigAsMap() { - return getConfigAsMap(true); + public Map getConfig() { + return getConfig(true); } - public Map getConfigAsMap(boolean unmodifiable) { + public Map getConfig(boolean unmodifiable) { if (unmodifiable) { return config; // already unmodifiable } else { @@ -203,7 +203,8 @@ public int hashCode() { *

The given upgrader is applied to the config map for any processor of the given type. */ PipelineConfiguration maybeUpgradeProcessors(String type, IngestMetadata.ProcessorConfigUpgrader upgrader) { - Map mutableConfigMap = parseConfigAsMap(); + Map mutableConfigMap = getConfig(false); + ; boolean changed = false; // This should be a List of Maps, where the keys are processor types and the values are config maps. // But we'll skip upgrading rather than fail if not. diff --git a/server/src/test/java/org/elasticsearch/action/ingest/GetPipelineResponseTests.java b/server/src/test/java/org/elasticsearch/action/ingest/GetPipelineResponseTests.java index 4e6b2b17b2554..61284a49b2502 100644 --- a/server/src/test/java/org/elasticsearch/action/ingest/GetPipelineResponseTests.java +++ b/server/src/test/java/org/elasticsearch/action/ingest/GetPipelineResponseTests.java @@ -79,7 +79,7 @@ public void testXContentDeserialization() throws IOException { assertEquals(actualPipelines.size(), parsedPipelines.size()); for (PipelineConfiguration pipeline : parsedPipelines) { assertTrue(pipelinesMap.containsKey(pipeline.getId())); - assertEquals(pipelinesMap.get(pipeline.getId()).getConfigAsMap(), pipeline.getConfigAsMap()); + assertEquals(pipelinesMap.get(pipeline.getId()).getConfig(), pipeline.getConfig()); } } diff --git a/server/src/test/java/org/elasticsearch/ingest/IngestMetadataTests.java b/server/src/test/java/org/elasticsearch/ingest/IngestMetadataTests.java index b62fff2eceb28..8235c66ef976b 100644 --- a/server/src/test/java/org/elasticsearch/ingest/IngestMetadataTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/IngestMetadataTests.java @@ -56,8 +56,8 @@ public void testFromXContent() throws IOException { assertEquals(2, custom.getPipelines().size()); assertEquals("1", custom.getPipelines().get("1").getId()); assertEquals("2", custom.getPipelines().get("2").getId()); - assertEquals(pipeline.getConfigAsMap(), custom.getPipelines().get("1").getConfigAsMap()); - assertEquals(pipeline2.getConfigAsMap(), custom.getPipelines().get("2").getConfigAsMap()); + assertEquals(pipeline.getConfig(), custom.getPipelines().get("1").getConfig()); + assertEquals(pipeline2.getConfig(), custom.getPipelines().get("2").getConfig()); } } diff --git a/server/src/test/java/org/elasticsearch/ingest/PipelineConfigurationTests.java b/server/src/test/java/org/elasticsearch/ingest/PipelineConfigurationTests.java index 0a6f1b22932b9..951ebba7ba16f 100644 --- a/server/src/test/java/org/elasticsearch/ingest/PipelineConfigurationTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/PipelineConfigurationTests.java @@ -39,12 +39,12 @@ public void testSerialization() throws IOException { new BytesArray("{}".getBytes(StandardCharsets.UTF_8)), XContentType.JSON ); - assertThat(configuration.getConfigAsMap(), anEmptyMap()); + assertThat(configuration.getConfig(), anEmptyMap()); BytesStreamOutput out = new BytesStreamOutput(); configuration.writeTo(out); StreamInput in = StreamInput.wrap(out.bytes().toBytesRef().bytes); PipelineConfiguration serialized = PipelineConfiguration.readFrom(in); - assertThat(serialized.getConfigAsMap(), anEmptyMap()); + assertThat(serialized.getConfig(), anEmptyMap()); } public void testMetaSerialization() throws IOException { @@ -61,7 +61,7 @@ public void testMetaSerialization() throws IOException { PipelineConfiguration serialized = PipelineConfiguration.readFrom(in); assertEquals( XContentHelper.convertToMap(new BytesArray(configJson.getBytes(StandardCharsets.UTF_8)), true, XContentType.JSON).v2(), - serialized.getConfigAsMap() + serialized.getConfig() ); } @@ -81,7 +81,7 @@ public void testParser() throws IOException { .createParser(NamedXContentRegistry.EMPTY, DeprecationHandler.THROW_UNSUPPORTED_OPERATION, bytes.streamInput()); PipelineConfiguration parsed = parser.parse(xContentParser, null); assertThat(parsed.getId(), equalTo("1")); - assertThat(parsed.getConfigAsMap(), anEmptyMap()); + assertThat(parsed.getConfig(), anEmptyMap()); } public void testGetVersion() { diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/InferenceProcessorInfoExtractor.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/InferenceProcessorInfoExtractor.java index 83f7832645270..ad8a55a5f8443 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/InferenceProcessorInfoExtractor.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/InferenceProcessorInfoExtractor.java @@ -51,7 +51,7 @@ public static int countInferenceProcessors(ClusterState state) { } Counter counter = Counter.newCounter(); ingestMetadata.getPipelines().forEach((pipelineId, configuration) -> { - Map configMap = configuration.getConfigAsMap(); + Map configMap = configuration.getConfig(); List> processorConfigs = (List>) configMap.get(PROCESSORS_KEY); for (Map processorConfigWithKey : processorConfigs) { for (Map.Entry entry : processorConfigWithKey.entrySet()) { @@ -73,7 +73,7 @@ public static Set getModelIdsFromInferenceProcessors(IngestMetadata inge Set modelIds = new LinkedHashSet<>(); ingestMetadata.getPipelines().forEach((pipelineId, configuration) -> { - Map configMap = configuration.getConfigAsMap(); + Map configMap = configuration.getConfig(); List> processorConfigs = readList(configMap, PROCESSORS_KEY); for (Map processorConfigWithKey : processorConfigs) { for (Map.Entry entry : processorConfigWithKey.entrySet()) { @@ -100,7 +100,7 @@ public static Map> pipelineIdsByResource(ClusterState state, return pipelineIdsByModelIds; } ingestMetadata.getPipelines().forEach((pipelineId, configuration) -> { - Map configMap = configuration.getConfigAsMap(); + Map configMap = configuration.getConfig(); List> processorConfigs = readList(configMap, PROCESSORS_KEY); for (Map processorConfigWithKey : processorConfigs) { for (Map.Entry entry : processorConfigWithKey.entrySet()) { @@ -131,7 +131,7 @@ public static Set pipelineIdsForResource(ClusterState state, Set return pipelineIds; } ingestMetadata.getPipelines().forEach((pipelineId, configuration) -> { - Map configMap = configuration.getConfigAsMap(); + Map configMap = configuration.getConfig(); List> processorConfigs = readList(configMap, PROCESSORS_KEY); for (Map processorConfigWithKey : processorConfigs) { for (Map.Entry entry : processorConfigWithKey.entrySet()) { diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/template/IndexTemplateRegistryTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/template/IndexTemplateRegistryTests.java index e396712cbc360..356fac4539137 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/template/IndexTemplateRegistryTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/template/IndexTemplateRegistryTests.java @@ -726,7 +726,7 @@ private static void assertPutPipelineAction( putRequest.getSource(), putRequest.getXContentType() ); - List processors = (List) pipelineConfiguration.getConfigAsMap().get("processors"); + List processors = (List) pipelineConfiguration.getConfig().get("processors"); assertThat(processors, hasSize(1)); Map setProcessor = (Map) ((Map) processors.get(0)).get("set"); assertNotNull(setProcessor.get("field")); diff --git a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyReindexPipeline.java b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyReindexPipeline.java index 7cddd7e037742..512955a5fe2eb 100644 --- a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyReindexPipeline.java +++ b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyReindexPipeline.java @@ -56,7 +56,7 @@ static boolean exists(ClusterState clusterState) { if (ingestMetadata != null) { final PipelineConfiguration pipeline = ingestMetadata.getPipelines().get(pipelineName()); if (pipeline != null) { - Object version = pipeline.getConfigAsMap().get("version"); + Object version = pipeline.getConfig().get("version"); return version instanceof Number number && number.intValue() >= ENRICH_PIPELINE_LAST_UPDATED_VERSION; } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/loadingservice/ModelLoadingService.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/loadingservice/ModelLoadingService.java index deb645ff96133..4a9d65481d412 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/loadingservice/ModelLoadingService.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/loadingservice/ModelLoadingService.java @@ -981,7 +981,7 @@ private static Set countInferenceProcessors(IngestMetadata ingestMetadat return allReferencedModelKeys; } ingestMetadata.getPipelines().forEach((pipelineId, pipelineConfiguration) -> { - Object processors = pipelineConfiguration.getConfigAsMap().get("processors"); + Object processors = pipelineConfiguration.getConfig().get("processors"); if (processors instanceof List) { for (Object processor : (List) processors) { if (processor instanceof Map) { diff --git a/x-pack/plugin/stack/src/test/java/org/elasticsearch/xpack/stack/LegacyStackTemplateRegistryTests.java b/x-pack/plugin/stack/src/test/java/org/elasticsearch/xpack/stack/LegacyStackTemplateRegistryTests.java index b8c64f945db0a..654cf494e0e6f 100644 --- a/x-pack/plugin/stack/src/test/java/org/elasticsearch/xpack/stack/LegacyStackTemplateRegistryTests.java +++ b/x-pack/plugin/stack/src/test/java/org/elasticsearch/xpack/stack/LegacyStackTemplateRegistryTests.java @@ -56,7 +56,7 @@ public void testThatTemplatesAreDeprecated() { registry.getIngestPipelines() .stream() .map(ipc -> new PipelineConfiguration(ipc.getId(), ipc.loadConfig(), XContentType.JSON)) - .map(PipelineConfiguration::getConfigAsMap) + .map(PipelineConfiguration::getConfig) .forEach(p -> assertTrue((Boolean) p.get("deprecated"))); } diff --git a/x-pack/plugin/stack/src/test/java/org/elasticsearch/xpack/stack/StackTemplateRegistryTests.java b/x-pack/plugin/stack/src/test/java/org/elasticsearch/xpack/stack/StackTemplateRegistryTests.java index 35e81f6f4c8c7..a8043f3d5e4e5 100644 --- a/x-pack/plugin/stack/src/test/java/org/elasticsearch/xpack/stack/StackTemplateRegistryTests.java +++ b/x-pack/plugin/stack/src/test/java/org/elasticsearch/xpack/stack/StackTemplateRegistryTests.java @@ -516,7 +516,7 @@ public void testThatTemplatesAreNotDeprecated() { registry.getIngestPipelines() .stream() .map(ipc -> new PipelineConfiguration(ipc.getId(), ipc.loadConfig(), XContentType.JSON)) - .map(PipelineConfiguration::getConfigAsMap) + .map(PipelineConfiguration::getConfig) .forEach(p -> assertFalse((Boolean) p.get("deprecated"))); } From 3da58e10dec6c3063d91b02302a98be8333a45ea Mon Sep 17 00:00:00 2001 From: Joe Gallo Date: Tue, 12 Nov 2024 16:37:35 -0500 Subject: [PATCH 07/14] Updates for the logstashbridge --- .../logstashbridge/ingest/PipelineConfigurationBridge.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/ingest/PipelineConfigurationBridge.java b/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/ingest/PipelineConfigurationBridge.java index 9d357a658c415..cb90d10665659 100644 --- a/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/ingest/PipelineConfigurationBridge.java +++ b/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/ingest/PipelineConfigurationBridge.java @@ -28,10 +28,14 @@ public String getId() { return delegate.getId(); } - public Map getConfigAsMap() { + public Map getConfig() { return delegate.getConfig(); } + public Map getConfig(final boolean unmodifiable) { + return delegate.getConfig(unmodifiable); + } + @Override public int hashCode() { return delegate.hashCode(); From 612d2191112ee3f9f1ca14717f86cd42e0822806 Mon Sep 17 00:00:00 2001 From: Joe Gallo Date: Tue, 12 Nov 2024 17:08:05 -0500 Subject: [PATCH 08/14] Add some javadocs --- .../ingest/PipelineConfiguration.java | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/server/src/main/java/org/elasticsearch/ingest/PipelineConfiguration.java b/server/src/main/java/org/elasticsearch/ingest/PipelineConfiguration.java index df26ba6f1b93b..2c7063d09d5ab 100644 --- a/server/src/main/java/org/elasticsearch/ingest/PipelineConfiguration.java +++ b/server/src/main/java/org/elasticsearch/ingest/PipelineConfiguration.java @@ -78,6 +78,14 @@ public PipelineConfiguration(String id, Map config) { this.config = deepCopy(config, true); // defensive deep copy } + /** + * A convenience constructor that parses some bytes as a map representing a pipeline's config and then delegates to the + * conventional {@link #PipelineConfiguration(String, Map)} constructor. + * + * @param id the id of the pipeline + * @param config a parse-able bytes reference that will return a pipeline configuration + * @param xContentType the content-type to use while parsing the pipeline configuration + */ public PipelineConfiguration(String id, BytesReference config, XContentType xContentType) { this(id, XContentHelper.convertToMap(config, true, xContentType).v2()); } @@ -86,10 +94,18 @@ public String getId() { return id; } + /** + * @return a reference to the unmodifiable configuration map for this pipeline + */ public Map getConfig() { return getConfig(true); } + /** + * @param unmodifiable whether the returned map should be unmodifiable or not + * @return a reference to the unmodifiable config map (if unmodifiable is true) or + * a reference to a freshly-created mutable deep copy of the config map (if unmodifiable is false) + */ public Map getConfig(boolean unmodifiable) { if (unmodifiable) { return config; // already unmodifiable From 4030b560a858d1a187921a4dcc875333f1381762 Mon Sep 17 00:00:00 2001 From: Joe Gallo Date: Wed, 13 Nov 2024 10:29:28 -0500 Subject: [PATCH 09/14] Rewrite the class's javadoc --- .../org/elasticsearch/ingest/PipelineConfiguration.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/ingest/PipelineConfiguration.java b/server/src/main/java/org/elasticsearch/ingest/PipelineConfiguration.java index 2c7063d09d5ab..2a3b5386b1d97 100644 --- a/server/src/main/java/org/elasticsearch/ingest/PipelineConfiguration.java +++ b/server/src/main/java/org/elasticsearch/ingest/PipelineConfiguration.java @@ -34,7 +34,11 @@ import java.util.Objects; /** - * Encapsulates a pipeline's id and configuration as a blob + * Encapsulates a pipeline's id and configuration as a loosely typed map -- see {@link Pipeline} for the + * parsed and processed object(s) that a pipeline configuration will become. This class is used for things + * like keeping track of pipelines in the cluster state (where a pipeline is 'just some json') whereas the + * {@link Pipeline} class is used in the actual processing of ingest documents through pipelines in the + * {@link IngestService}. */ public final class PipelineConfiguration implements SimpleDiffable, ToXContentObject { From a2593547119ee38031539e6d883b065fde6ba660 Mon Sep 17 00:00:00 2001 From: Joe Gallo Date: Thu, 14 Nov 2024 09:50:15 -0500 Subject: [PATCH 10/14] Add a big test of the new code --- .../ingest/PipelineConfigurationTests.java | 32 +++++++++++++++++++ 1 file changed, 32 insertions(+) diff --git a/server/src/test/java/org/elasticsearch/ingest/PipelineConfigurationTests.java b/server/src/test/java/org/elasticsearch/ingest/PipelineConfigurationTests.java index 951ebba7ba16f..7be6e97762ccf 100644 --- a/server/src/test/java/org/elasticsearch/ingest/PipelineConfigurationTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/PipelineConfigurationTests.java @@ -26,13 +26,45 @@ import java.io.IOException; import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.Map; import java.util.function.Predicate; import static org.hamcrest.Matchers.anEmptyMap; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.not; +import static org.hamcrest.Matchers.sameInstance; public class PipelineConfigurationTests extends AbstractXContentTestCase { + public void testConfigInvariants() { + Map original = Map.of("a", 1); + Map mutable = new HashMap<>(original); + PipelineConfiguration configuration = new PipelineConfiguration("1", mutable); + // the config is equal to the original & mutable map, regardless of how you get a reference to it + assertThat(configuration.getConfig(), equalTo(original)); + assertThat(configuration.getConfig(), equalTo(mutable)); + assertThat(configuration.getConfig(), equalTo(configuration.getConfig(false))); + assertThat(configuration.getConfig(), equalTo(configuration.getConfig(true))); + // the config is the same instance as itself when unmodifiable is true + assertThat(configuration.getConfig(), sameInstance(configuration.getConfig())); + assertThat(configuration.getConfig(), sameInstance(configuration.getConfig(true))); + // but it's not the same instance as the original mutable map, nor if unmodifiable is false + assertThat(configuration.getConfig(), not(sameInstance(mutable))); + assertThat(configuration.getConfig(), not(sameInstance(configuration.getConfig(false)))); + + // changing the mutable map doesn't alter the pipeline's configuration + mutable.put("b", 2); + assertThat(configuration.getConfig(), equalTo(original)); + + // the modifiable map can be modified + Map modifiable = configuration.getConfig(false); + modifiable.put("c", 3); // this doesn't throw an exception + assertThat(modifiable.get("c"), equalTo(3)); + // but the next modifiable copy is a new fresh copy, and doesn't reflect those changes + assertThat(configuration.getConfig(), equalTo(configuration.getConfig(false))); + } + public void testSerialization() throws IOException { PipelineConfiguration configuration = new PipelineConfiguration( "1", From b41f2127c797f23c02af507bdd77deb2e3208732 Mon Sep 17 00:00:00 2001 From: Joe Gallo Date: Tue, 19 Nov 2024 10:20:21 -0500 Subject: [PATCH 11/14] Switch to an assert for the bottom case --- .../java/org/elasticsearch/ingest/PipelineConfiguration.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/ingest/PipelineConfiguration.java b/server/src/main/java/org/elasticsearch/ingest/PipelineConfiguration.java index 2a3b5386b1d97..3cfe77564915f 100644 --- a/server/src/main/java/org/elasticsearch/ingest/PipelineConfiguration.java +++ b/server/src/main/java/org/elasticsearch/ingest/PipelineConfiguration.java @@ -139,7 +139,10 @@ private static Object innerDeepCopy(final Object value, final boolean unmodifiab } else if (value == null || value instanceof String || value instanceof Number || value instanceof Boolean) { return value; } else { - throw new IllegalArgumentException("unexpected value type [" + value.getClass() + "]"); + // if the previous list of expected value types ends up not being exhaustive, then we want to learn about that + // at development time, but it's probably better to err on the side of passing through the value at runtime + assert false : "unexpected value type [" + value.getClass() + "]"; + return value; } } From c6f97d50583b218a0476e1c5ec79ca90caba7054 Mon Sep 17 00:00:00 2001 From: Joe Gallo Date: Tue, 19 Nov 2024 11:30:21 -0500 Subject: [PATCH 12/14] Remove a stray semicolon --- .../java/org/elasticsearch/ingest/PipelineConfiguration.java | 1 - 1 file changed, 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/ingest/PipelineConfiguration.java b/server/src/main/java/org/elasticsearch/ingest/PipelineConfiguration.java index 3cfe77564915f..41726612d6baf 100644 --- a/server/src/main/java/org/elasticsearch/ingest/PipelineConfiguration.java +++ b/server/src/main/java/org/elasticsearch/ingest/PipelineConfiguration.java @@ -227,7 +227,6 @@ public int hashCode() { */ PipelineConfiguration maybeUpgradeProcessors(String type, IngestMetadata.ProcessorConfigUpgrader upgrader) { Map mutableConfigMap = getConfig(false); - ; boolean changed = false; // This should be a List of Maps, where the keys are processor types and the values are config maps. // But we'll skip upgrading rather than fail if not. From 8467326d013fe51de6ba418c9191c4a267a06313 Mon Sep 17 00:00:00 2001 From: Joe Gallo Date: Tue, 19 Nov 2024 12:19:31 -0500 Subject: [PATCH 13/14] Use readSlicedBytesReference here --- .../java/org/elasticsearch/ingest/PipelineConfiguration.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/ingest/PipelineConfiguration.java b/server/src/main/java/org/elasticsearch/ingest/PipelineConfiguration.java index 41726612d6baf..2d5655ea53465 100644 --- a/server/src/main/java/org/elasticsearch/ingest/PipelineConfiguration.java +++ b/server/src/main/java/org/elasticsearch/ingest/PipelineConfiguration.java @@ -172,7 +172,7 @@ public static PipelineConfiguration readFrom(StreamInput in) throws IOException if (in.getTransportVersion().onOrAfter(TransportVersions.INGEST_PIPELINE_CONFIGURATION_AS_MAP)) { config = in.readGenericMap(); } else { - final BytesReference bytes = in.readBytesReference(); + final BytesReference bytes = in.readSlicedBytesReference(); final XContentType type = in.readEnum(XContentType.class); config = XContentHelper.convertToMap(bytes, true, type).v2(); } From 659ad014653ac795f069e82977a53a805da63dc5 Mon Sep 17 00:00:00 2001 From: Joe Gallo Date: Tue, 19 Nov 2024 12:19:44 -0500 Subject: [PATCH 14/14] Collapse the condition into the assertion --- .../org/elasticsearch/ingest/PipelineConfiguration.java | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/ingest/PipelineConfiguration.java b/server/src/main/java/org/elasticsearch/ingest/PipelineConfiguration.java index 2d5655ea53465..64142caf4189d 100644 --- a/server/src/main/java/org/elasticsearch/ingest/PipelineConfiguration.java +++ b/server/src/main/java/org/elasticsearch/ingest/PipelineConfiguration.java @@ -136,12 +136,11 @@ private static Object innerDeepCopy(final Object value, final boolean unmodifiab copy.add(innerDeepCopy(itemValue, unmodifiable)); } return unmodifiable ? Collections.unmodifiableList(copy) : copy; - } else if (value == null || value instanceof String || value instanceof Number || value instanceof Boolean) { - return value; } else { - // if the previous list of expected value types ends up not being exhaustive, then we want to learn about that + // if this list of expected value types ends up not being exhaustive, then we want to learn about that // at development time, but it's probably better to err on the side of passing through the value at runtime - assert false : "unexpected value type [" + value.getClass() + "]"; + assert (value == null || value instanceof String || value instanceof Number || value instanceof Boolean) + : "unexpected value type [" + value.getClass() + "]"; return value; } }