Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove client feature tracking #44929

Merged
merged 5 commits into from
Jul 28, 2019
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 3 additions & 37 deletions server/src/main/java/org/elasticsearch/cluster/ClusterState.java
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.stream.StreamSupport;

Expand Down Expand Up @@ -95,40 +94,7 @@ public class ClusterState implements ToXContentFragment, Diffable<ClusterState>

public static final ClusterState EMPTY_STATE = builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)).build();

/**
* An interface that implementors use when a class requires a client to maybe have a feature.
*/
public interface FeatureAware {

/**
* An optional feature that is required for the client to have.
*
* @return an empty optional if no feature is required otherwise a string representing the required feature
*/
default Optional<String> getRequiredFeature() {
return Optional.empty();
}

/**
* Tests whether or not the custom should be serialized. The criteria are:
* <ul>
* <li>the output stream must be at least the minimum supported version of the custom</li>
* </ul>
* <p>
* That is, we only serialize customs to clients than can understand the custom based on the version of the client.
*
* @param out the output stream
* @param custom the custom to serialize
* @param <T> the type of the custom
* @return true if the custom should be serialized and false otherwise
*/
static <T extends VersionedNamedWriteable & FeatureAware> boolean shouldSerialize(final StreamOutput out, final T custom) {
return out.getVersion().onOrAfter(custom.getMinimalSupportedVersion());
}

}

public interface Custom extends NamedDiffable<Custom>, ToXContentFragment, FeatureAware {
public interface Custom extends NamedDiffable<Custom>, ToXContentFragment {

/**
* Returns <code>true</code> iff this {@link Custom} is private to the cluster and should never be send to a client.
Expand Down Expand Up @@ -777,13 +743,13 @@ public void writeTo(StreamOutput out) throws IOException {
// filter out custom states not supported by the other node
int numberOfCustoms = 0;
for (final ObjectCursor<Custom> cursor : customs.values()) {
if (FeatureAware.shouldSerialize(out, cursor.value)) {
if (VersionedNamedWriteable.shouldSerialize(out, cursor.value)) {
numberOfCustoms++;
}
}
out.writeVInt(numberOfCustoms);
for (final ObjectCursor<Custom> cursor : customs.values()) {
if (FeatureAware.shouldSerialize(out, cursor.value)) {
if (VersionedNamedWriteable.shouldSerialize(out, cursor.value)) {
out.writeNamedWriteable(cursor.value);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,6 @@
import org.apache.lucene.util.CollectionUtil;
import org.elasticsearch.Version;
import org.elasticsearch.action.AliasesRequest;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterState.FeatureAware;
import org.elasticsearch.cluster.Diff;
import org.elasticsearch.cluster.Diffable;
import org.elasticsearch.cluster.DiffableUtils;
Expand All @@ -44,6 +42,7 @@
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.VersionedNamedWriteable;
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
Expand Down Expand Up @@ -125,7 +124,7 @@ public enum XContentContext {
*/
public static EnumSet<XContentContext> ALL_CONTEXTS = EnumSet.allOf(XContentContext.class);

public interface Custom extends NamedDiffable<Custom>, ToXContentFragment, ClusterState.FeatureAware {
public interface Custom extends NamedDiffable<Custom>, ToXContentFragment {

EnumSet<XContentContext> context();
}
Expand Down Expand Up @@ -916,13 +915,13 @@ public void writeTo(StreamOutput out) throws IOException {
// filter out custom states not supported by the other node
int numberOfCustoms = 0;
for (final ObjectCursor<Custom> cursor : customs.values()) {
if (FeatureAware.shouldSerialize(out, cursor.value)) {
if (VersionedNamedWriteable.shouldSerialize(out, cursor.value)) {
numberOfCustoms++;
}
}
out.writeVInt(numberOfCustoms);
for (final ObjectCursor<Custom> cursor : customs.values()) {
if (FeatureAware.shouldSerialize(out, cursor.value)) {
if (VersionedNamedWriteable.shouldSerialize(out, cursor.value)) {
out.writeNamedWriteable(cursor.value);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,19 @@ public interface VersionedNamedWriteable extends NamedWriteable {
* The minimal version of the recipient this object can be sent to
*/
Version getMinimalSupportedVersion();

/**
* Tests whether or not the custom should be serialized. The criteria is the output stream must be at least the minimum supported
* version of the custom. That is, we only serialize customs to clients than can understand the custom based on the version of the
* client.
*
* @param out the output stream
* @param custom the custom to serialize
* @param <T> the type of the custom
* @return true if the custom should be serialized and false otherwise
*/
static <T extends VersionedNamedWriteable> boolean shouldSerialize(final StreamOutput out, final T custom) {
return out.getVersion().onOrAfter(custom.getMinimalSupportedVersion());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,12 @@

package org.elasticsearch.persistent;

import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.common.io.stream.VersionedNamedWriteable;
import org.elasticsearch.common.xcontent.ToXContentObject;

/**
* Parameters used to start persistent task
*/
public interface PersistentTaskParams extends VersionedNamedWriteable, ToXContentObject, ClusterState.FeatureAware {
public interface PersistentTaskParams extends VersionedNamedWriteable, ToXContentObject {

}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.VersionedNamedWriteable;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ObjectParser;
Expand Down Expand Up @@ -532,7 +533,7 @@ public PersistentTasksCustomMetaData(StreamInput in) throws IOException {
public void writeTo(StreamOutput out) throws IOException {
out.writeLong(lastAllocationId);
Map<String, PersistentTask<?>> filteredTasks = tasks.values().stream()
.filter(t -> ClusterState.FeatureAware.shouldSerialize(out, t.getParams()))
.filter(t -> VersionedNamedWriteable.shouldSerialize(out, t.getParams()))
.collect(Collectors.toMap(PersistentTask::getId, Function.identity()));
out.writeMap(filteredTasks, StreamOutput::writeString, (stream, value) -> value.writeTo(stream));
}
Expand Down
13 changes: 0 additions & 13 deletions server/src/main/java/org/elasticsearch/plugins/Plugin.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

import org.elasticsearch.bootstrap.BootstrapCheck;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexTemplateMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
Expand Down Expand Up @@ -49,7 +48,6 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.UnaryOperator;

Expand All @@ -72,17 +70,6 @@
*/
public abstract class Plugin implements Closeable {

/**
* A feature exposed by the plugin. This should be used if a plugin exposes {@link ClusterState.Custom} or {@link MetaData.Custom}; see
* also {@link ClusterState.FeatureAware}.
*
* @return a feature set represented by this plugin, or the empty optional if the plugin does not expose cluster state or metadata
* customs
*/
protected Optional<String> getFeature() {
return Optional.empty();
}

/**
* Returns components added by this plugin.
*
Expand Down
21 changes: 0 additions & 21 deletions server/src/main/java/org/elasticsearch/plugins/PluginsService.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexModule;
import org.elasticsearch.threadpool.ExecutorBuilder;
import org.elasticsearch.transport.TransportSettings;

import java.io.IOException;
import java.lang.reflect.Constructor;
Expand All @@ -59,9 +58,7 @@
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;
import java.util.function.Function;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -202,7 +199,6 @@ private static void logPluginInfo(final List<PluginInfo> pluginInfos, final Stri

public Settings updatedSettings() {
Map<String, String> foundSettings = new HashMap<>();
final Map<String, String> features = new TreeMap<>();
final Settings.Builder builder = Settings.builder();
for (Tuple<PluginInfo, Plugin> plugin : plugins) {
Settings settings = plugin.v2().additionalSettings();
Expand All @@ -214,23 +210,6 @@ public Settings updatedSettings() {
}
}
builder.put(settings);
final Optional<String> maybeFeature = plugin.v2().getFeature();
if (maybeFeature.isPresent()) {
final String feature = maybeFeature.get();
if (features.containsKey(feature)) {
final String message = String.format(
Locale.ROOT,
"duplicate feature [%s] in plugin [%s], already added in [%s]",
feature,
plugin.v1().getName(),
features.get(feature));
throw new IllegalArgumentException(message);
}
features.put(feature, plugin.v1().getName());
}
}
for (final String feature : features.keySet()) {
builder.put(TransportSettings.FEATURE_PREFIX + "." + feature, true);
}
return builder.put(this.settings).build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import java.net.InetSocketAddress;
import java.util.Collections;
import java.util.Map;
import java.util.Set;

public class InboundHandler {

Expand Down Expand Up @@ -153,7 +152,6 @@ private void messageReceived(BytesReference reference, TcpChannel channel) throw
}

private void handleRequest(TcpChannel channel, InboundMessage.Request message, int messageLengthBytes) {
final Set<String> features = message.getFeatures();
final String action = message.getActionName();
final long requestId = message.getRequestId();
final StreamInput stream = message.getStreamInput();
Expand All @@ -162,7 +160,7 @@ private void handleRequest(TcpChannel channel, InboundMessage.Request message, i
TransportChannel transportChannel = null;
try {
if (message.isHandshake()) {
handshaker.handleHandshake(version, features, channel, requestId, stream);
handshaker.handleHandshake(version, channel, requestId, stream);
} else {
final RequestHandlerRegistry reg = getRequestHandler(action);
if (reg == null) {
Expand All @@ -174,7 +172,7 @@ private void handleRequest(TcpChannel channel, InboundMessage.Request message, i
} else {
breaker.addWithoutBreaking(messageLengthBytes);
}
transportChannel = new TcpTransportChannel(outboundHandler, channel, action, requestId, version, features,
transportChannel = new TcpTransportChannel(outboundHandler, channel, action, requestId, version,
circuitBreakerService, messageLengthBytes, message.isCompress());
final TransportRequest request = reg.newRequest(stream);
request.remoteAddress(new TransportAddress(channel.getRemoteAddress()));
Expand All @@ -190,7 +188,7 @@ private void handleRequest(TcpChannel channel, InboundMessage.Request message, i
} catch (Exception e) {
// the circuit breaker tripped
if (transportChannel == null) {
transportChannel = new TcpTransportChannel(outboundHandler, channel, action, requestId, version, features,
transportChannel = new TcpTransportChannel(outboundHandler, channel, action, requestId, version,
circuitBreakerService, 0, message.isCompress());
}
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,6 @@

import java.io.Closeable;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.Set;
import java.util.TreeSet;

public abstract class InboundMessage extends NetworkMessage implements Closeable {

Expand Down Expand Up @@ -96,15 +92,12 @@ InboundMessage deserialize(BytesReference reference) throws IOException {

InboundMessage message;
if (TransportStatus.isRequest(status)) {
final String[] featuresFound = streamInput.readStringArray();
final Set<String> features;
if (featuresFound.length == 0) {
features = Collections.emptySet();
} else {
features = Collections.unmodifiableSet(new TreeSet<>(Arrays.asList(featuresFound)));
if (streamInput.getVersion().before(Version.V_8_0_0)) {
// discard features
streamInput.readStringArray();
}
final String action = streamInput.readString();
message = new Request(threadContext, remoteVersion, status, requestId, action, features, streamInput);
message = new Request(threadContext, remoteVersion, status, requestId, action, streamInput);
} else {
message = new Response(threadContext, remoteVersion, status, requestId, streamInput);
}
Expand Down Expand Up @@ -146,22 +139,17 @@ private static void ensureVersionCompatibility(Version version, Version currentV
public static class Request extends InboundMessage {

private final String actionName;
private final Set<String> features;

Request(ThreadContext threadContext, Version version, byte status, long requestId, String actionName, Set<String> features,
Request(ThreadContext threadContext, Version version, byte status, long requestId, String actionName,
StreamInput streamInput) {
super(threadContext, version, status, requestId, streamInput);
this.actionName = actionName;
this.features = features;
}

String getActionName() {
return actionName;
}

Set<String> getFeatures() {
return features;
}
}

public static class Response extends InboundMessage {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,17 +49,15 @@ final class OutboundHandler {

private final String nodeName;
private final Version version;
private final String[] features;
private final ThreadPool threadPool;
private final BigArrays bigArrays;
private final TransportLogger transportLogger;
private volatile TransportMessageListener messageListener = TransportMessageListener.NOOP_LISTENER;

OutboundHandler(String nodeName, Version version, String[] features, ThreadPool threadPool, BigArrays bigArrays,
OutboundHandler(String nodeName, Version version, ThreadPool threadPool, BigArrays bigArrays,
TransportLogger transportLogger) {
this.nodeName = nodeName;
this.version = version;
this.features = features;
this.threadPool = threadPool;
this.bigArrays = bigArrays;
this.transportLogger = transportLogger;
Expand All @@ -83,8 +81,8 @@ void sendRequest(final DiscoveryNode node, final TcpChannel channel, final long
final TransportRequest request, final TransportRequestOptions options, final Version channelVersion,
final boolean compressRequest, final boolean isHandshake) throws IOException, TransportException {
Version version = Version.min(this.version, channelVersion);
OutboundMessage.Request message = new OutboundMessage.Request(threadPool.getThreadContext(), features, request, version, action,
requestId, isHandshake, compressRequest);
OutboundMessage.Request message =
new OutboundMessage.Request(threadPool.getThreadContext(), request, version, action, requestId, isHandshake, compressRequest);
ActionListener<Void> listener = ActionListener.wrap(() ->
messageListener.onRequestSent(node, requestId, action, request, options));
sendMessage(channel, message, listener);
Expand Down
Loading