-
Notifications
You must be signed in to change notification settings - Fork 25.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Optimize PipelineConfiguration-checking ClusterStateListeners #117038
Changes from 12 commits
7c0f9e3
7f1bdeb
7ff5879
8679f11
e6d827b
d577238
3da58e1
612d219
4030b56
a259354
b41f212
c6f97d5
8467326
659ad01
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -9,39 +9,47 @@ | |
|
||
package org.elasticsearch.ingest; | ||
|
||
import org.elasticsearch.TransportVersions; | ||
import org.elasticsearch.cluster.Diff; | ||
import org.elasticsearch.cluster.SimpleDiffable; | ||
import org.elasticsearch.common.Strings; | ||
import org.elasticsearch.common.bytes.BytesReference; | ||
import org.elasticsearch.common.io.stream.StreamInput; | ||
import org.elasticsearch.common.io.stream.StreamOutput; | ||
import org.elasticsearch.common.util.Maps; | ||
import org.elasticsearch.common.xcontent.XContentHelper; | ||
import org.elasticsearch.xcontent.ContextParser; | ||
import org.elasticsearch.xcontent.ObjectParser; | ||
import org.elasticsearch.xcontent.ParseField; | ||
import org.elasticsearch.xcontent.ToXContentObject; | ||
import org.elasticsearch.xcontent.XContentBuilder; | ||
import org.elasticsearch.xcontent.XContentType; | ||
import org.elasticsearch.xcontent.json.JsonXContent; | ||
|
||
import java.io.IOException; | ||
import java.io.UncheckedIOException; | ||
import java.util.ArrayList; | ||
import java.util.Collections; | ||
import java.util.List; | ||
import java.util.Map; | ||
import java.util.Objects; | ||
|
||
/** | ||
* Encapsulates a pipeline's id and configuration as a blob | ||
* Encapsulates a pipeline's id and configuration as a loosely typed map -- see {@link Pipeline} for the | ||
* parsed and processed object(s) that a pipeline configuration will become. This class is used for things | ||
* like keeping track of pipelines in the cluster state (where a pipeline is 'just some json') whereas the | ||
* {@link Pipeline} class is used in the actual processing of ingest documents through pipelines in the | ||
* {@link IngestService}. | ||
*/ | ||
public final class PipelineConfiguration implements SimpleDiffable<PipelineConfiguration>, ToXContentObject { | ||
|
||
private static final ObjectParser<Builder, Void> PARSER = new ObjectParser<>("pipeline_config", true, Builder::new); | ||
static { | ||
PARSER.declareString(Builder::setId, new ParseField("id")); | ||
PARSER.declareField((parser, builder, aVoid) -> { | ||
XContentBuilder contentBuilder = XContentBuilder.builder(parser.contentType().xContent()); | ||
contentBuilder.generator().copyCurrentStructure(parser); | ||
builder.setConfig(BytesReference.bytes(contentBuilder), contentBuilder.contentType()); | ||
}, new ParseField("config"), ObjectParser.ValueType.OBJECT); | ||
|
||
PARSER.declareField( | ||
(parser, builder, aVoid) -> builder.setConfig(parser.map()), | ||
new ParseField("config"), | ||
ObjectParser.ValueType.OBJECT | ||
); | ||
} | ||
|
||
public static ContextParser<Void, PipelineConfiguration> getParser() { | ||
|
@@ -51,56 +59,95 @@ public static ContextParser<Void, PipelineConfiguration> getParser() { | |
private static class Builder { | ||
|
||
private String id; | ||
private BytesReference config; | ||
private XContentType xContentType; | ||
private Map<String, Object> config; | ||
|
||
void setId(String id) { | ||
this.id = id; | ||
} | ||
|
||
void setConfig(BytesReference config, XContentType xContentType) { | ||
void setConfig(Map<String, Object> config) { | ||
this.config = config; | ||
this.xContentType = xContentType; | ||
} | ||
|
||
PipelineConfiguration build() { | ||
return new PipelineConfiguration(id, config, xContentType); | ||
return new PipelineConfiguration(id, config); | ||
} | ||
} | ||
|
||
private final String id; | ||
// Store config as bytes reference, because the config is only used when the pipeline store reads the cluster state | ||
// and the way the map of maps config is read requires a deep copy (it removes instead of gets entries to check for unused options) | ||
// also the get pipeline api just directly returns this to the caller | ||
private final BytesReference config; | ||
private final XContentType xContentType; | ||
private final Map<String, Object> config; | ||
|
||
public PipelineConfiguration(String id, BytesReference config, XContentType xContentType) { | ||
public PipelineConfiguration(String id, Map<String, Object> config) { | ||
this.id = Objects.requireNonNull(id); | ||
this.config = Objects.requireNonNull(config); | ||
this.xContentType = Objects.requireNonNull(xContentType); | ||
this.config = deepCopy(config, true); // defensive deep copy | ||
} | ||
|
||
/** | ||
* A convenience constructor that parses some bytes as a map representing a pipeline's config and then delegates to the | ||
* conventional {@link #PipelineConfiguration(String, Map)} constructor. | ||
* | ||
* @param id the id of the pipeline | ||
* @param config a parse-able bytes reference that will return a pipeline configuration | ||
* @param xContentType the content-type to use while parsing the pipeline configuration | ||
*/ | ||
public PipelineConfiguration(String id, BytesReference config, XContentType xContentType) { | ||
this(id, XContentHelper.convertToMap(config, true, xContentType).v2()); | ||
} | ||
|
||
public String getId() { | ||
return id; | ||
} | ||
|
||
public Map<String, Object> getConfigAsMap() { | ||
return XContentHelper.convertToMap(config, true, xContentType).v2(); | ||
/** | ||
* @return a reference to the unmodifiable configuration map for this pipeline | ||
*/ | ||
public Map<String, Object> getConfig() { | ||
return getConfig(true); | ||
} | ||
|
||
// pkg-private for tests | ||
XContentType getXContentType() { | ||
return xContentType; | ||
/** | ||
* @param unmodifiable whether the returned map should be unmodifiable or not | ||
* @return a reference to the unmodifiable config map (if unmodifiable is true) or | ||
* a reference to a freshly-created mutable deep copy of the config map (if unmodifiable is false) | ||
*/ | ||
public Map<String, Object> getConfig(boolean unmodifiable) { | ||
if (unmodifiable) { | ||
return config; // already unmodifiable | ||
} else { | ||
return deepCopy(config, false); | ||
} | ||
} | ||
|
||
@SuppressWarnings("unchecked") | ||
private static <T> T deepCopy(final T value, final boolean unmodifiable) { | ||
return (T) innerDeepCopy(value, unmodifiable); | ||
} | ||
|
||
// pkg-private for tests | ||
BytesReference getConfig() { | ||
return config; | ||
private static Object innerDeepCopy(final Object value, final boolean unmodifiable) { | ||
if (value instanceof Map<?, ?> mapValue) { | ||
final Map<Object, Object> copy = Maps.newLinkedHashMapWithExpectedSize(mapValue.size()); // n.b. maintain ordering | ||
for (Map.Entry<?, ?> entry : mapValue.entrySet()) { | ||
copy.put(innerDeepCopy(entry.getKey(), unmodifiable), innerDeepCopy(entry.getValue(), unmodifiable)); | ||
} | ||
return unmodifiable ? Collections.unmodifiableMap(copy) : copy; | ||
} else if (value instanceof List<?> listValue) { | ||
final List<Object> copy = new ArrayList<>(listValue.size()); | ||
for (Object itemValue : listValue) { | ||
copy.add(innerDeepCopy(itemValue, unmodifiable)); | ||
} | ||
return unmodifiable ? Collections.unmodifiableList(copy) : copy; | ||
} else if (value == null || value instanceof String || value instanceof Number || value instanceof Boolean) { | ||
return value; | ||
} else { | ||
// if the previous list of expected value types ends up not being exhaustive, then we want to learn about that | ||
// at development time, but it's probably better to err on the side of passing through the value at runtime | ||
assert false : "unexpected value type [" + value.getClass() + "]"; | ||
return value; | ||
} | ||
} | ||
|
||
public Integer getVersion() { | ||
Object o = getConfigAsMap().get("version"); | ||
Object o = config.get("version"); | ||
if (o == null) { | ||
return null; | ||
} else if (o instanceof Number number) { | ||
|
@@ -114,13 +161,22 @@ public Integer getVersion() { | |
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { | ||
builder.startObject(); | ||
builder.field("id", id); | ||
builder.field("config", getConfigAsMap()); | ||
builder.field("config", config); | ||
builder.endObject(); | ||
return builder; | ||
} | ||
|
||
public static PipelineConfiguration readFrom(StreamInput in) throws IOException { | ||
return new PipelineConfiguration(in.readString(), in.readBytesReference(), in.readEnum(XContentType.class)); | ||
final String id = in.readString(); | ||
final Map<String, Object> config; | ||
if (in.getTransportVersion().onOrAfter(TransportVersions.INGEST_PIPELINE_CONFIGURATION_AS_MAP)) { | ||
config = in.readGenericMap(); | ||
} else { | ||
final BytesReference bytes = in.readBytesReference(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. NIT: Not too important since it's BwC and these things aren't gigantic but in the spirit of doing this consistently, you could use There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nice, okay, 8467326. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
@original-brownbear can you explain more about this? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That delegation is only the default behavior. For the real world network buffers we have here we have overriding implementations that just slice the underlying buffer without copying. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Gotcha, thanks for the explanation! |
||
final XContentType type = in.readEnum(XContentType.class); | ||
config = XContentHelper.convertToMap(bytes, true, type).v2(); | ||
} | ||
return new PipelineConfiguration(id, config); | ||
} | ||
|
||
public static Diff<PipelineConfiguration> readDiffFrom(StreamInput in) throws IOException { | ||
|
@@ -135,8 +191,14 @@ public String toString() { | |
@Override | ||
public void writeTo(StreamOutput out) throws IOException { | ||
out.writeString(id); | ||
out.writeBytesReference(config); | ||
XContentHelper.writeTo(out, xContentType); | ||
if (out.getTransportVersion().onOrAfter(TransportVersions.INGEST_PIPELINE_CONFIGURATION_AS_MAP)) { | ||
out.writeGenericMap(config); | ||
} else { | ||
XContentBuilder builder = XContentBuilder.builder(JsonXContent.jsonXContent).prettyPrint(); | ||
builder.map(config); | ||
out.writeBytesReference(BytesReference.bytes(builder)); | ||
XContentHelper.writeTo(out, XContentType.JSON); | ||
} | ||
} | ||
|
||
@Override | ||
|
@@ -147,14 +209,14 @@ public boolean equals(Object o) { | |
PipelineConfiguration that = (PipelineConfiguration) o; | ||
|
||
if (id.equals(that.id) == false) return false; | ||
return getConfigAsMap().equals(that.getConfigAsMap()); | ||
return config.equals(that.config); | ||
|
||
} | ||
|
||
@Override | ||
public int hashCode() { | ||
int result = id.hashCode(); | ||
result = 31 * result + getConfigAsMap().hashCode(); | ||
result = 31 * result + config.hashCode(); | ||
return result; | ||
} | ||
|
||
|
@@ -164,7 +226,7 @@ public int hashCode() { | |
* <p>The given upgrader is applied to the config map for any processor of the given type. | ||
*/ | ||
PipelineConfiguration maybeUpgradeProcessors(String type, IngestMetadata.ProcessorConfigUpgrader upgrader) { | ||
Map<String, Object> mutableConfigMap = getConfigAsMap(); | ||
Map<String, Object> mutableConfigMap = getConfig(false); | ||
boolean changed = false; | ||
// This should be a List of Maps, where the keys are processor types and the values are config maps. | ||
// But we'll skip upgrading rather than fail if not. | ||
|
@@ -180,11 +242,7 @@ PipelineConfiguration maybeUpgradeProcessors(String type, IngestMetadata.Process | |
} | ||
} | ||
if (changed) { | ||
try (XContentBuilder builder = XContentBuilder.builder(xContentType.xContent())) { | ||
return new PipelineConfiguration(id, BytesReference.bytes(builder.map(mutableConfigMap)), xContentType); | ||
} catch (IOException e) { | ||
throw new UncheckedIOException(e); | ||
} | ||
return new PipelineConfiguration(id, mutableConfigMap); | ||
} else { | ||
return this; | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could just put this condition in the assertion since it only has an effect with assertions on anyway, otherwise it's the same as the else branch?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was on the fence about that one, I'm happy to take your comment as a tiebreaker in the opposite direction, 659ad01.