diff --git a/CHANGELOG.md b/CHANGELOG.md index 62176778824f3..04f3fbeb4b068 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,6 +15,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Allow mmap to use new JDK-19 preview APIs in Apache Lucene 9.4+ ([#5151](https://github.com/opensearch-project/OpenSearch/pull/5151)) - Add feature flag for extensions ([#5211](https://github.com/opensearch-project/OpenSearch/pull/5211)) - Added jackson dependency to server ([#5366] (https://github.com/opensearch-project/OpenSearch/pull/5366)) +- Added experimental extensions to main ([#5347](https://github.com/opensearch-project/OpenSearch/pull/5347)) ### Dependencies - Bumps `log4j-core` from 2.18.0 to 2.19.0 diff --git a/plugins/repository-azure/licenses/jackson-LICENSE b/plugins/repository-azure/licenses/jackson-LICENSE.txt similarity index 100% rename from plugins/repository-azure/licenses/jackson-LICENSE rename to plugins/repository-azure/licenses/jackson-LICENSE.txt diff --git a/plugins/repository-azure/licenses/jackson-NOTICE b/plugins/repository-azure/licenses/jackson-NOTICE.txt similarity index 100% rename from plugins/repository-azure/licenses/jackson-NOTICE rename to plugins/repository-azure/licenses/jackson-NOTICE.txt diff --git a/plugins/repository-hdfs/build.gradle b/plugins/repository-hdfs/build.gradle index e5d65c9451c1f..2ff0b4e3765b0 100644 --- a/plugins/repository-hdfs/build.gradle +++ b/plugins/repository-hdfs/build.gradle @@ -66,7 +66,6 @@ dependencies { api 'org.apache.htrace:htrace-core4:4.2.0-incubating' api "org.apache.logging.log4j:log4j-core:${versions.log4j}" api 'org.apache.avro:avro:1.11.1' - api "com.fasterxml.jackson.core:jackson-databind:${versions.jackson_databind}" api 'com.google.code.gson:gson:2.10' runtimeOnly 'com.google.guava:guava:31.1-jre' api 'com.google.protobuf:protobuf-java:3.21.9' diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/state/ClusterStateResponse.java b/server/src/main/java/org/opensearch/action/admin/cluster/state/ClusterStateResponse.java index d2d7d843e19db..f65d15c5c64aa 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/state/ClusterStateResponse.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/state/ClusterStateResponse.java @@ -96,6 +96,11 @@ public void writeTo(StreamOutput out) throws IOException { out.writeBoolean(waitForTimedOut); } + @Override + public String toString() { + return "ClusterStateResponse{" + "clusterState=" + clusterState + '}'; + } + @Override public boolean equals(Object o) { if (this == o) return true; diff --git a/server/src/main/java/org/opensearch/bootstrap/Security.java b/server/src/main/java/org/opensearch/bootstrap/Security.java index 749c146de4f16..0eab6f9cbcbf1 100644 --- a/server/src/main/java/org/opensearch/bootstrap/Security.java +++ b/server/src/main/java/org/opensearch/bootstrap/Security.java @@ -36,6 +36,7 @@ import org.opensearch.common.SuppressForbidden; import org.opensearch.common.io.PathUtils; import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.FeatureFlags; import org.opensearch.env.Environment; import org.opensearch.http.HttpTransportSettings; import org.opensearch.plugins.PluginInfo; @@ -316,6 +317,9 @@ static void addFilePermissions(Permissions policy, Environment environment) thro addDirectoryPath(policy, Environment.PATH_HOME_SETTING.getKey(), environment.libDir(), "read,readlink", false); addDirectoryPath(policy, Environment.PATH_HOME_SETTING.getKey(), environment.modulesDir(), "read,readlink", false); addDirectoryPath(policy, Environment.PATH_HOME_SETTING.getKey(), environment.pluginsDir(), "read,readlink", false); + if (FeatureFlags.isEnabled(FeatureFlags.EXTENSIONS)) { + addDirectoryPath(policy, Environment.PATH_HOME_SETTING.getKey(), environment.extensionDir(), "read,readlink", false); + } addDirectoryPath(policy, "path.conf'", environment.configDir(), "read,readlink", false); // read-write dirs addDirectoryPath(policy, "java.io.tmpdir", environment.tmpDir(), "read,readlink,write,delete", false); diff --git a/server/src/main/java/org/opensearch/cluster/ClusterSettingsResponse.java b/server/src/main/java/org/opensearch/cluster/ClusterSettingsResponse.java new file mode 100644 index 0000000000000..e84c2c902abdd --- /dev/null +++ b/server/src/main/java/org/opensearch/cluster/ClusterSettingsResponse.java @@ -0,0 +1,60 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.cluster; + +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.io.stream.StreamOutput; +import org.opensearch.common.settings.Settings; +import org.opensearch.transport.TransportResponse; + +import java.io.IOException; +import java.util.Objects; + +/** + * PluginSettings Response for Extensibility + * + * @opensearch.internal + */ +public class ClusterSettingsResponse extends TransportResponse { + private final Settings clusterSettings; + + public ClusterSettingsResponse(ClusterService clusterService) { + this.clusterSettings = clusterService.getSettings(); + } + + public ClusterSettingsResponse(StreamInput in) throws IOException { + super(in); + this.clusterSettings = Settings.readSettingsFromStream(in); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + Settings.writeSettingsToStream(clusterSettings, out); + } + + @Override + public String toString() { + return "ClusterSettingsResponse{" + "clusterSettings=" + clusterSettings + '}'; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + ClusterSettingsResponse that = (ClusterSettingsResponse) o; + return Objects.equals(clusterSettings, that.clusterSettings); + } + + @Override + public int hashCode() { + return Objects.hash(clusterSettings); + } + +} diff --git a/server/src/main/java/org/opensearch/cluster/LocalNodeResponse.java b/server/src/main/java/org/opensearch/cluster/LocalNodeResponse.java new file mode 100644 index 0000000000000..ef1ef4a49ad62 --- /dev/null +++ b/server/src/main/java/org/opensearch/cluster/LocalNodeResponse.java @@ -0,0 +1,60 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.cluster; + +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.io.stream.StreamOutput; +import org.opensearch.transport.TransportResponse; + +import java.io.IOException; +import java.util.Objects; + +/** + * LocalNode Response for Extensibility + * + * @opensearch.internal + */ +public class LocalNodeResponse extends TransportResponse { + private final DiscoveryNode localNode; + + public LocalNodeResponse(ClusterService clusterService) { + this.localNode = clusterService.localNode(); + } + + public LocalNodeResponse(StreamInput in) throws IOException { + super(in); + this.localNode = new DiscoveryNode(in); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + this.localNode.writeTo(out); + } + + @Override + public String toString() { + return "LocalNodeResponse{" + "localNode=" + localNode + '}'; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + LocalNodeResponse that = (LocalNodeResponse) o; + return Objects.equals(localNode, that.localNode); + } + + @Override + public int hashCode() { + return Objects.hash(localNode); + } + +} diff --git a/server/src/main/java/org/opensearch/discovery/PluginRequest.java b/server/src/main/java/org/opensearch/discovery/PluginRequest.java new file mode 100644 index 0000000000000..7992de4342d86 --- /dev/null +++ b/server/src/main/java/org/opensearch/discovery/PluginRequest.java @@ -0,0 +1,76 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.discovery; + +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.io.stream.StreamOutput; +import org.opensearch.extensions.DiscoveryExtensionNode; +import org.opensearch.transport.TransportRequest; + +import java.io.IOException; +import java.util.List; +import java.util.Objects; + +/** + * PluginRequest to intialize plugin + * + * @opensearch.internal + */ +public class PluginRequest extends TransportRequest { + private final DiscoveryNode sourceNode; + /* + * TODO change DiscoveryNode to Extension information + */ + private final List extensions; + + public PluginRequest(DiscoveryNode sourceNode, List extensions) { + this.sourceNode = sourceNode; + this.extensions = extensions; + } + + public PluginRequest(StreamInput in) throws IOException { + super(in); + sourceNode = new DiscoveryNode(in); + extensions = in.readList(DiscoveryExtensionNode::new); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + sourceNode.writeTo(out); + out.writeList(extensions); + } + + public List getExtensions() { + return extensions; + } + + public DiscoveryNode getSourceNode() { + return sourceNode; + } + + @Override + public String toString() { + return "PluginRequest{" + "sourceNode=" + sourceNode + ", extensions=" + extensions + '}'; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + PluginRequest that = (PluginRequest) o; + return Objects.equals(sourceNode, that.sourceNode) && Objects.equals(extensions, that.extensions); + } + + @Override + public int hashCode() { + return Objects.hash(sourceNode, extensions); + } +} diff --git a/server/src/main/java/org/opensearch/discovery/PluginResponse.java b/server/src/main/java/org/opensearch/discovery/PluginResponse.java new file mode 100644 index 0000000000000..f8f20214e5846 --- /dev/null +++ b/server/src/main/java/org/opensearch/discovery/PluginResponse.java @@ -0,0 +1,88 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/* + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. + */ + +package org.opensearch.discovery; + +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.io.stream.StreamOutput; +import org.opensearch.transport.TransportResponse; + +import java.io.IOException; +import java.util.Objects; + +/** + * PluginResponse to intialize plugin + * + * @opensearch.internal + */ +public class PluginResponse extends TransportResponse { + private String name; + + public PluginResponse(String name) { + this.name = name; + } + + public PluginResponse(StreamInput in) throws IOException { + name = in.readString(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeString(name); + } + + /** + * @return the node that is currently leading, according to the responding node. + */ + + public String getName() { + return this.name; + } + + @Override + public String toString() { + return "PluginResponse{" + "name" + name + "}"; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + PluginResponse that = (PluginResponse) o; + return Objects.equals(name, that.name); + } + + @Override + public int hashCode() { + return Objects.hash(name); + } +} diff --git a/server/src/main/java/org/opensearch/env/Environment.java b/server/src/main/java/org/opensearch/env/Environment.java index c9e75bcbb616f..938bca58c7081 100644 --- a/server/src/main/java/org/opensearch/env/Environment.java +++ b/server/src/main/java/org/opensearch/env/Environment.java @@ -93,6 +93,8 @@ public class Environment { private final Path pluginsDir; + private final Path extensionsDir; + private final Path modulesDir; private final Path sharedDataDir; @@ -137,6 +139,7 @@ public Environment(final Settings settings, final Path configPath, final boolean tmpDir = Objects.requireNonNull(tmpPath); pluginsDir = homeFile.resolve("plugins"); + extensionsDir = homeFile.resolve("extensions"); List dataPaths = PATH_DATA_SETTING.get(settings); if (nodeLocalStorage) { @@ -308,6 +311,10 @@ public Path pluginsDir() { return pluginsDir; } + public Path extensionDir() { + return extensionsDir; + } + public Path binDir() { return binDir; } diff --git a/server/src/main/java/org/opensearch/extensions/DiscoveryExtensionNode.java b/server/src/main/java/org/opensearch/extensions/DiscoveryExtensionNode.java new file mode 100644 index 0000000000000..e4fa0d74f78f0 --- /dev/null +++ b/server/src/main/java/org/opensearch/extensions/DiscoveryExtensionNode.java @@ -0,0 +1,70 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.extensions; + +import org.opensearch.Version; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.node.DiscoveryNodeRole; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.io.stream.StreamOutput; +import org.opensearch.common.io.stream.Writeable; +import org.opensearch.common.transport.TransportAddress; +import org.opensearch.common.xcontent.ToXContentFragment; +import org.opensearch.common.xcontent.XContentBuilder; +import org.opensearch.plugins.PluginInfo; + +import java.io.IOException; +import java.util.Map; + +/** + * Discover extensions running independently or in a separate process + * + * @opensearch.internal + */ +public class DiscoveryExtensionNode extends DiscoveryNode implements Writeable, ToXContentFragment { + + private final PluginInfo pluginInfo; + + public DiscoveryExtensionNode( + String name, + String id, + String ephemeralId, + String hostName, + String hostAddress, + TransportAddress address, + Map attributes, + Version version, + PluginInfo pluginInfo + ) { + super(name, id, ephemeralId, hostName, hostAddress, address, attributes, DiscoveryNodeRole.BUILT_IN_ROLES, version); + this.pluginInfo = pluginInfo; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + pluginInfo.writeTo(out); + } + + /** + * Construct DiscoveryExtensionNode from a stream. + * + * @param in the stream + * @throws IOException if an I/O exception occurred reading the plugin info from the stream + */ + public DiscoveryExtensionNode(final StreamInput in) throws IOException { + super(in); + this.pluginInfo = new PluginInfo(in); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + return null; + } +} diff --git a/server/src/main/java/org/opensearch/extensions/ExtensionRequest.java b/server/src/main/java/org/opensearch/extensions/ExtensionRequest.java new file mode 100644 index 0000000000000..924fce49a5dc2 --- /dev/null +++ b/server/src/main/java/org/opensearch/extensions/ExtensionRequest.java @@ -0,0 +1,66 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.extensions; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.io.stream.StreamOutput; +import org.opensearch.transport.TransportRequest; + +import java.io.IOException; +import java.util.Objects; + +/** + * CLusterService Request for Extensibility + * + * @opensearch.internal + */ +public class ExtensionRequest extends TransportRequest { + private static final Logger logger = LogManager.getLogger(ExtensionRequest.class); + private ExtensionsManager.RequestType requestType; + + public ExtensionRequest(ExtensionsManager.RequestType requestType) { + this.requestType = requestType; + } + + public ExtensionRequest(StreamInput in) throws IOException { + super(in); + this.requestType = in.readEnum(ExtensionsManager.RequestType.class); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeEnum(requestType); + } + + public ExtensionsManager.RequestType getRequestType() { + return this.requestType; + } + + public String toString() { + return "ExtensionRequest{" + "requestType=" + requestType + '}'; + } + + @Override + public boolean equals(Object o) { + + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + ExtensionRequest that = (ExtensionRequest) o; + return Objects.equals(requestType, that.requestType); + } + + @Override + public int hashCode() { + return Objects.hash(requestType); + } + +} diff --git a/server/src/main/java/org/opensearch/extensions/ExtensionsManager.java b/server/src/main/java/org/opensearch/extensions/ExtensionsManager.java new file mode 100644 index 0000000000000..b809f2e35a483 --- /dev/null +++ b/server/src/main/java/org/opensearch/extensions/ExtensionsManager.java @@ -0,0 +1,440 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.extensions; + +import java.io.IOException; +import java.io.InputStream; +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.opensearch.Version; +import org.opensearch.action.admin.cluster.state.ClusterStateResponse; +import org.opensearch.cluster.ClusterSettingsResponse; +import org.opensearch.cluster.LocalNodeResponse; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.io.FileSystemUtils; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.transport.TransportAddress; + +import org.opensearch.discovery.PluginRequest; +import org.opensearch.discovery.PluginResponse; +import org.opensearch.extensions.ExtensionsSettings.Extension; +import org.opensearch.index.IndexModule; +import org.opensearch.index.IndexService; +import org.opensearch.index.AcknowledgedResponse; +import org.opensearch.index.IndicesModuleRequest; +import org.opensearch.index.IndicesModuleResponse; +import org.opensearch.index.shard.IndexEventListener; +import org.opensearch.indices.cluster.IndicesClusterStateService; +import org.opensearch.plugins.PluginInfo; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.transport.TransportException; +import org.opensearch.transport.TransportResponse; +import org.opensearch.transport.TransportResponseHandler; +import org.opensearch.transport.TransportService; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; + +/** + * The main class for Plugin Extensibility + * + * @opensearch.internal + */ +public class ExtensionsManager { + public static final String REQUEST_EXTENSION_ACTION_NAME = "internal:discovery/extensions"; + public static final String INDICES_EXTENSION_POINT_ACTION_NAME = "indices:internal/extensions"; + public static final String INDICES_EXTENSION_NAME_ACTION_NAME = "indices:internal/name"; + public static final String REQUEST_EXTENSION_CLUSTER_STATE = "internal:discovery/clusterstate"; + public static final String REQUEST_EXTENSION_LOCAL_NODE = "internal:discovery/localnode"; + public static final String REQUEST_EXTENSION_CLUSTER_SETTINGS = "internal:discovery/clustersettings"; + + private static final Logger logger = LogManager.getLogger(ExtensionsManager.class); + + /** + * Enum for Extension Requests + * + * @opensearch.internal + */ + public static enum RequestType { + REQUEST_EXTENSION_CLUSTER_STATE, + REQUEST_EXTENSION_LOCAL_NODE, + REQUEST_EXTENSION_CLUSTER_SETTINGS, + CREATE_COMPONENT, + ON_INDEX_MODULE, + GET_SETTINGS + }; + + private final Path extensionsPath; + private final List uninitializedExtensions; + private List extensions; + private TransportService transportService; + private ClusterService clusterService; + + public ExtensionsManager() { + this.extensionsPath = Path.of(""); + this.uninitializedExtensions = new ArrayList(); + } + + public ExtensionsManager(Settings settings, Path extensionsPath) throws IOException { + logger.info("ExtensionsManager initialized"); + this.extensionsPath = extensionsPath; + this.transportService = null; + this.uninitializedExtensions = new ArrayList(); + this.extensions = new ArrayList(); + this.clusterService = null; + + /* + * Now Discover extensions + */ + discover(); + + } + + public void setTransportService(TransportService transportService) { + this.transportService = transportService; + registerRequestHandler(); + } + + public void setClusterService(ClusterService clusterService) { + this.clusterService = clusterService; + } + + private void registerRequestHandler() { + transportService.registerRequestHandler( + REQUEST_EXTENSION_CLUSTER_STATE, + ThreadPool.Names.GENERIC, + false, + false, + ExtensionRequest::new, + ((request, channel, task) -> channel.sendResponse(handleExtensionRequest(request))) + ); + transportService.registerRequestHandler( + REQUEST_EXTENSION_LOCAL_NODE, + ThreadPool.Names.GENERIC, + false, + false, + ExtensionRequest::new, + ((request, channel, task) -> channel.sendResponse(handleExtensionRequest(request))) + ); + transportService.registerRequestHandler( + REQUEST_EXTENSION_CLUSTER_SETTINGS, + ThreadPool.Names.GENERIC, + false, + false, + ExtensionRequest::new, + ((request, channel, task) -> channel.sendResponse(handleExtensionRequest(request))) + ); + } + + /* + * Load and populate all extensions + */ + private void discover() throws IOException { + logger.info("Extensions Config Directory :" + extensionsPath.toString()); + if (!FileSystemUtils.isAccessibleDirectory(extensionsPath, logger)) { + return; + } + + List extensions = new ArrayList(); + if (Files.exists(extensionsPath.resolve("extensions.yml"))) { + try { + extensions = readFromExtensionsYml(extensionsPath.resolve("extensions.yml")).getExtensions(); + } catch (IOException e) { + throw new IOException("Could not read from extensions.yml", e); + } + for (Extension extension : extensions) { + loadExtension(extension); + } + if (!uninitializedExtensions.isEmpty()) { + logger.info("Loaded all extensions"); + } + } else { + logger.info("Extensions.yml file is not present. No extensions will be loaded."); + } + } + + /** + * Loads a single extension + * @param extension The extension to be loaded + */ + private void loadExtension(Extension extension) throws IOException { + try { + uninitializedExtensions.add( + new DiscoveryExtensionNode( + extension.getName(), + extension.getUniqueId(), + // placeholder for ephemeral id, will change with POC discovery + extension.getUniqueId(), + extension.getHostName(), + extension.getHostAddress(), + new TransportAddress(InetAddress.getByName(extension.getHostAddress()), Integer.parseInt(extension.getPort())), + new HashMap(), + Version.fromString(extension.getOpensearchVersion()), + new PluginInfo( + extension.getName(), + extension.getDescription(), + extension.getVersion(), + Version.fromString(extension.getOpensearchVersion()), + extension.getJavaVersion(), + extension.getClassName(), + new ArrayList(), + Boolean.parseBoolean(extension.hasNativeController()) + ) + ) + ); + logger.info("Loaded extension: " + extension); + } catch (IllegalArgumentException e) { + throw e; + } + } + + public void initialize() { + for (DiscoveryNode extensionNode : uninitializedExtensions) { + initializeExtension(extensionNode); + } + } + + private void initializeExtension(DiscoveryNode extensionNode) { + + final TransportResponseHandler pluginResponseHandler = new TransportResponseHandler() { + + @Override + public PluginResponse read(StreamInput in) throws IOException { + return new PluginResponse(in); + } + + @Override + public void handleResponse(PluginResponse response) { + for (DiscoveryExtensionNode extension : uninitializedExtensions) { + if (extension.getName().equals(response.getName())) { + extensions.add(extension); + break; + } + } + } + + @Override + public void handleException(TransportException exp) { + logger.error(new ParameterizedMessage("Plugin request failed"), exp); + } + + @Override + public String executor() { + return ThreadPool.Names.GENERIC; + } + }; + try { + transportService.connectToExtensionNode(extensionNode); + transportService.sendRequest( + extensionNode, + REQUEST_EXTENSION_ACTION_NAME, + new PluginRequest(transportService.getLocalNode(), new ArrayList(uninitializedExtensions)), + pluginResponseHandler + ); + } catch (Exception e) { + throw e; + } + } + + TransportResponse handleExtensionRequest(ExtensionRequest extensionRequest) throws Exception { + // Read enum + if (extensionRequest.getRequestType() == RequestType.REQUEST_EXTENSION_CLUSTER_STATE) { + ClusterStateResponse clusterStateResponse = new ClusterStateResponse( + clusterService.getClusterName(), + clusterService.state(), + false + ); + return clusterStateResponse; + } else if (extensionRequest.getRequestType() == RequestType.REQUEST_EXTENSION_LOCAL_NODE) { + LocalNodeResponse localNodeResponse = new LocalNodeResponse(clusterService); + return localNodeResponse; + } else if (extensionRequest.getRequestType() == RequestType.REQUEST_EXTENSION_CLUSTER_SETTINGS) { + ClusterSettingsResponse clusterSettingsResponse = new ClusterSettingsResponse(clusterService); + return clusterSettingsResponse; + } + throw new IllegalStateException("Handler not present for the provided request: " + extensionRequest.getRequestType()); + } + + public void onIndexModule(IndexModule indexModule) throws UnknownHostException { + for (DiscoveryNode extensionNode : uninitializedExtensions) { + onIndexModule(indexModule, extensionNode); + } + } + + private void onIndexModule(IndexModule indexModule, DiscoveryNode extensionNode) throws UnknownHostException { + logger.info("onIndexModule index:" + indexModule.getIndex()); + final CompletableFuture inProgressFuture = new CompletableFuture<>(); + final CompletableFuture inProgressIndexNameFuture = new CompletableFuture<>(); + final TransportResponseHandler acknowledgedResponseHandler = new TransportResponseHandler< + AcknowledgedResponse>() { + @Override + public void handleResponse(AcknowledgedResponse response) { + logger.info("ACK Response" + response); + inProgressIndexNameFuture.complete(response); + } + + @Override + public void handleException(TransportException exp) { + + } + + @Override + public String executor() { + return ThreadPool.Names.GENERIC; + } + + @Override + public AcknowledgedResponse read(StreamInput in) throws IOException { + return new AcknowledgedResponse(in); + } + + }; + + final TransportResponseHandler indicesModuleResponseHandler = new TransportResponseHandler< + IndicesModuleResponse>() { + + @Override + public IndicesModuleResponse read(StreamInput in) throws IOException { + return new IndicesModuleResponse(in); + } + + @Override + public void handleResponse(IndicesModuleResponse response) { + logger.info("received {}", response); + if (response.getIndexEventListener() == true) { + indexModule.addIndexEventListener(new IndexEventListener() { + @Override + public void beforeIndexRemoved( + IndexService indexService, + IndicesClusterStateService.AllocatedIndices.IndexRemovalReason reason + ) { + logger.info("Index Event Listener is called"); + String indexName = indexService.index().getName(); + logger.info("Index Name" + indexName.toString()); + try { + logger.info("Sending request of index name to extension"); + transportService.sendRequest( + extensionNode, + INDICES_EXTENSION_NAME_ACTION_NAME, + new IndicesModuleRequest(indexModule), + acknowledgedResponseHandler + ); + /* + * Making async synchronous for now. + */ + inProgressIndexNameFuture.get(100, TimeUnit.SECONDS); + logger.info("Received ack response from Extension"); + } catch (Exception e) { + logger.error(e.toString()); + } + } + }); + } + inProgressFuture.complete(response); + } + + @Override + public void handleException(TransportException exp) { + logger.error(new ParameterizedMessage("IndicesModuleRequest failed"), exp); + inProgressFuture.completeExceptionally(exp); + } + + @Override + public String executor() { + return ThreadPool.Names.GENERIC; + } + }; + + try { + logger.info("Sending request to extension"); + transportService.sendRequest( + extensionNode, + INDICES_EXTENSION_POINT_ACTION_NAME, + new IndicesModuleRequest(indexModule), + indicesModuleResponseHandler + ); + /* + * Making async synchronous for now. + */ + inProgressFuture.get(100, TimeUnit.SECONDS); + logger.info("Received response from Extension"); + } catch (Exception e) { + logger.error(e.toString()); + } + } + + private ExtensionsSettings readFromExtensionsYml(Path filePath) throws IOException { + ObjectMapper objectMapper = new ObjectMapper(new YAMLFactory()); + InputStream input = Files.newInputStream(filePath); + ExtensionsSettings extensionSettings = objectMapper.readValue(input, ExtensionsSettings.class); + return extensionSettings; + } + + public static String getRequestExtensionActionName() { + return REQUEST_EXTENSION_ACTION_NAME; + } + + public static String getIndicesExtensionPointActionName() { + return INDICES_EXTENSION_POINT_ACTION_NAME; + } + + public static String getIndicesExtensionNameActionName() { + return INDICES_EXTENSION_NAME_ACTION_NAME; + } + + public static String getRequestExtensionClusterState() { + return REQUEST_EXTENSION_CLUSTER_STATE; + } + + public static String getRequestExtensionLocalNode() { + return REQUEST_EXTENSION_LOCAL_NODE; + } + + public static String getRequestExtensionClusterSettings() { + return REQUEST_EXTENSION_CLUSTER_SETTINGS; + } + + public static Logger getLogger() { + return logger; + } + + public Path getExtensionsPath() { + return extensionsPath; + } + + public List getUninitializedExtensions() { + return uninitializedExtensions; + } + + public List getExtensions() { + return extensions; + } + + public TransportService getTransportService() { + return transportService; + } + + public ClusterService getClusterService() { + return clusterService; + } + +} diff --git a/server/src/main/java/org/opensearch/extensions/ExtensionsSettings.java b/server/src/main/java/org/opensearch/extensions/ExtensionsSettings.java new file mode 100644 index 0000000000000..8b6226e578ea3 --- /dev/null +++ b/server/src/main/java/org/opensearch/extensions/ExtensionsSettings.java @@ -0,0 +1,202 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.extensions; + +import java.util.ArrayList; +import java.util.List; + +/** + * List of extension configurations from extension.yml + * + * @opensearch.internal + */ +public class ExtensionsSettings { + + private List extensions; + + public ExtensionsSettings() { + extensions = new ArrayList(); + } + + /** + * Extension configuration used for extension discovery + * + * @opensearch.internal + */ + public static class Extension { + + private String name; + private String uniqueId; + private String hostName; + private String hostAddress; + private String port; + private String version; + private String description; + private String opensearchVersion; + private String jvmVersion; + private String className; + private String customFolderName; + private String hasNativeController; + + public Extension() { + name = ""; + uniqueId = ""; + hostName = ""; + hostAddress = ""; + port = ""; + version = ""; + description = ""; + opensearchVersion = ""; + jvmVersion = ""; + className = ""; + customFolderName = ""; + hasNativeController = "false"; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public String getUniqueId() { + return uniqueId; + } + + public void setUniqueId(String uniqueId) { + this.uniqueId = uniqueId; + } + + public String getHostName() { + return hostName; + } + + public void setHostName(String hostName) { + this.hostName = hostName; + } + + public String getHostAddress() { + return hostAddress; + } + + public void setHostAddress(String hostAddress) { + this.hostAddress = hostAddress; + } + + public String getPort() { + return port; + } + + public void setPort(String port) { + this.port = port; + } + + public String getVersion() { + return version; + } + + public void setVersion(String version) { + this.version = version; + } + + @Override + public String toString() { + return "Extension [className=" + + className + + ", customFolderName=" + + customFolderName + + ", description=" + + description + + ", hasNativeController=" + + hasNativeController + + ", hostAddress=" + + hostAddress + + ", hostName=" + + hostName + + ", jvmVersion=" + + jvmVersion + + ", name=" + + name + + ", opensearchVersion=" + + opensearchVersion + + ", port=" + + port + + ", uniqueId=" + + uniqueId + + ", version=" + + version + + "]"; + } + + public String getDescription() { + return description; + } + + public void setDescription(String description) { + this.description = description; + } + + public String getOpensearchVersion() { + return opensearchVersion; + } + + public void setOpensearchVersion(String opensearchVersion) { + this.opensearchVersion = opensearchVersion; + } + + public String getJavaVersion() { + return jvmVersion; + } + + public void setJavaVersion(String jvmVersion) { + this.jvmVersion = jvmVersion; + } + + public String getClassName() { + return className; + } + + public void setClassName(String className) { + this.className = className; + } + + public String getCustomFolderName() { + return customFolderName; + } + + public void setCustomFolderName(String customFolderName) { + this.customFolderName = customFolderName; + } + + public String hasNativeController() { + return hasNativeController; + } + + public void setHasNativeController(String hasNativeController) { + this.hasNativeController = hasNativeController; + } + + } + + public List getExtensions() { + return extensions; + } + + public void setExtensions(List extensions) { + this.extensions = extensions; + } + + @Override + public String toString() { + return "ExtensionsSettings [extensions=" + extensions + "]"; + } + +} diff --git a/server/src/main/java/org/opensearch/extensions/NoopExtensionsManager.java b/server/src/main/java/org/opensearch/extensions/NoopExtensionsManager.java new file mode 100644 index 0000000000000..24f71476dcb1e --- /dev/null +++ b/server/src/main/java/org/opensearch/extensions/NoopExtensionsManager.java @@ -0,0 +1,21 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.extensions; + +/** + * Noop class for ExtensionsManager + * + * @opensearch.internal + */ +public class NoopExtensionsManager extends ExtensionsManager { + + public NoopExtensionsManager() { + super(); + } +} diff --git a/server/src/main/java/org/opensearch/extensions/package-info.java b/server/src/main/java/org/opensearch/extensions/package-info.java new file mode 100644 index 0000000000000..c6efd42499240 --- /dev/null +++ b/server/src/main/java/org/opensearch/extensions/package-info.java @@ -0,0 +1,10 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** Main OpenSearch extensions package. OpenSearch extensions provide extensibility to OpenSearch.*/ +package org.opensearch.extensions; diff --git a/server/src/main/java/org/opensearch/index/AcknowledgedResponse.java b/server/src/main/java/org/opensearch/index/AcknowledgedResponse.java new file mode 100644 index 0000000000000..5993a81158d30 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/AcknowledgedResponse.java @@ -0,0 +1,42 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index; + +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.io.stream.StreamOutput; +import org.opensearch.transport.TransportResponse; + +import java.io.IOException; + +/** + * Response for index name of onIndexModule extension point + * + * @opensearch.internal + */ +public class AcknowledgedResponse extends TransportResponse { + private boolean requestAck; + + public AcknowledgedResponse(StreamInput in) throws IOException { + this.requestAck = in.readBoolean(); + } + + public AcknowledgedResponse(Boolean requestAck) { + this.requestAck = requestAck; + } + + public void AcknowledgedResponse(StreamInput in) throws IOException { + this.requestAck = in.readBoolean(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeBoolean(requestAck); + } + +} diff --git a/server/src/main/java/org/opensearch/index/IndicesModuleRequest.java b/server/src/main/java/org/opensearch/index/IndicesModuleRequest.java new file mode 100644 index 0000000000000..0e0fe87df76cd --- /dev/null +++ b/server/src/main/java/org/opensearch/index/IndicesModuleRequest.java @@ -0,0 +1,68 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index; + +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.io.stream.StreamOutput; +import org.opensearch.common.settings.Settings; +import org.opensearch.transport.TransportRequest; + +import java.io.IOException; +import java.util.Objects; + +/** + * Request for onIndexModule extension point + * + * @opensearch.internal + */ +public class IndicesModuleRequest extends TransportRequest { + private final Index index; + private final Settings indexSettings; + + public IndicesModuleRequest(IndexModule indexModule) { + this.index = indexModule.getIndex(); + this.indexSettings = indexModule.getSettings(); + } + + public IndicesModuleRequest(StreamInput in) throws IOException { + super(in); + this.index = new Index(in); + this.indexSettings = Settings.readSettingsFromStream(in); + } + + public IndicesModuleRequest(Index index, Settings settings) { + this.index = index; + this.indexSettings = settings; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + index.writeTo(out); + Settings.writeSettingsToStream(indexSettings, out); + } + + @Override + public String toString() { + return "IndicesModuleRequest{" + "index=" + index + ", indexSettings=" + indexSettings + '}'; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + IndicesModuleRequest that = (IndicesModuleRequest) o; + return Objects.equals(index, that.index) && Objects.equals(indexSettings, that.indexSettings); + } + + @Override + public int hashCode() { + return Objects.hash(index, indexSettings); + } +} diff --git a/server/src/main/java/org/opensearch/index/IndicesModuleResponse.java b/server/src/main/java/org/opensearch/index/IndicesModuleResponse.java new file mode 100644 index 0000000000000..7b41f629e48ed --- /dev/null +++ b/server/src/main/java/org/opensearch/index/IndicesModuleResponse.java @@ -0,0 +1,89 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index; + +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.io.stream.StreamOutput; +import org.opensearch.transport.TransportResponse; + +import java.io.IOException; +import java.util.Objects; + +/** + * Response for onIndexModule extension point + * + * @opensearch.internal + */ +public class IndicesModuleResponse extends TransportResponse { + private boolean supportsIndexEventListener; + private boolean addIndexOperationListener; + private boolean addSearchOperationListener; + + public IndicesModuleResponse( + boolean supportsIndexEventListener, + boolean addIndexOperationListener, + boolean addSearchOperationListener + ) { + this.supportsIndexEventListener = supportsIndexEventListener; + this.addIndexOperationListener = addIndexOperationListener; + this.addSearchOperationListener = addSearchOperationListener; + } + + public IndicesModuleResponse(StreamInput in) throws IOException { + this.supportsIndexEventListener = in.readBoolean(); + this.addIndexOperationListener = in.readBoolean(); + this.addSearchOperationListener = in.readBoolean(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeBoolean(supportsIndexEventListener); + out.writeBoolean(addIndexOperationListener); + out.writeBoolean(addSearchOperationListener); + } + + public boolean getIndexEventListener() { + return this.supportsIndexEventListener; + } + + public boolean getIndexOperationListener() { + return this.addIndexOperationListener; + } + + public boolean getSearchOperationListener() { + return this.addSearchOperationListener; + } + + @Override + public String toString() { + return "IndicesModuleResponse{" + + "supportsIndexEventListener" + + supportsIndexEventListener + + " addIndexOperationListener" + + addIndexOperationListener + + " addSearchOperationListener" + + addSearchOperationListener + + "}"; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + IndicesModuleResponse that = (IndicesModuleResponse) o; + return Objects.equals(supportsIndexEventListener, that.supportsIndexEventListener) + && Objects.equals(addIndexOperationListener, that.addIndexOperationListener) + && Objects.equals(addSearchOperationListener, that.addSearchOperationListener); + } + + @Override + public int hashCode() { + return Objects.hash(supportsIndexEventListener, addIndexOperationListener, addSearchOperationListener); + } +} diff --git a/server/src/main/java/org/opensearch/indices/IndicesService.java b/server/src/main/java/org/opensearch/indices/IndicesService.java index b2f48ccdd389c..204bf4204511e 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesService.java +++ b/server/src/main/java/org/opensearch/indices/IndicesService.java @@ -82,6 +82,7 @@ import org.opensearch.common.util.concurrent.OpenSearchExecutors; import org.opensearch.common.util.concurrent.OpenSearchRejectedExecutionException; import org.opensearch.common.util.concurrent.OpenSearchThreadPoolExecutor; +import org.opensearch.common.util.FeatureFlags; import org.opensearch.common.util.iterable.Iterables; import org.opensearch.common.util.set.Sets; import org.opensearch.common.xcontent.LoggingDeprecationHandler; @@ -142,6 +143,7 @@ import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher; import org.opensearch.node.Node; import org.opensearch.plugins.IndexStorePlugin; +import org.opensearch.extensions.ExtensionsManager; import org.opensearch.plugins.PluginsService; import org.opensearch.repositories.RepositoriesService; import org.opensearch.script.ScriptService; @@ -227,6 +229,7 @@ public class IndicesService extends AbstractLifecycleComponent */ private final Settings settings; private final PluginsService pluginsService; + private final ExtensionsManager extensionsManager; private final NodeEnvironment nodeEnv; private final NamedXContentRegistry xContentRegistry; private final TimeValue shardsClosedTimeout; @@ -299,6 +302,120 @@ public IndicesService( this.settings = settings; this.threadPool = threadPool; this.pluginsService = pluginsService; + this.extensionsManager = null; + this.nodeEnv = nodeEnv; + this.xContentRegistry = xContentRegistry; + this.valuesSourceRegistry = valuesSourceRegistry; + this.shardsClosedTimeout = settings.getAsTime(INDICES_SHARDS_CLOSED_TIMEOUT, new TimeValue(1, TimeUnit.DAYS)); + this.analysisRegistry = analysisRegistry; + this.indexNameExpressionResolver = indexNameExpressionResolver; + this.indicesRequestCache = new IndicesRequestCache(settings); + this.indicesQueryCache = new IndicesQueryCache(settings); + this.mapperRegistry = mapperRegistry; + this.namedWriteableRegistry = namedWriteableRegistry; + indexingMemoryController = new IndexingMemoryController( + settings, + threadPool, + // ensure we pull an iter with new shards - flatten makes a copy + () -> Iterables.flatten(this).iterator() + ); + this.indexScopedSettings = indexScopedSettings; + this.circuitBreakerService = circuitBreakerService; + this.bigArrays = bigArrays; + this.scriptService = scriptService; + this.clusterService = clusterService; + this.client = client; + this.idFieldDataEnabled = INDICES_ID_FIELD_DATA_ENABLED_SETTING.get(clusterService.getSettings()); + clusterService.getClusterSettings().addSettingsUpdateConsumer(INDICES_ID_FIELD_DATA_ENABLED_SETTING, this::setIdFieldDataEnabled); + this.indicesFieldDataCache = new IndicesFieldDataCache(settings, new IndexFieldDataCache.Listener() { + @Override + public void onRemoval(ShardId shardId, String fieldName, boolean wasEvicted, long sizeInBytes) { + assert sizeInBytes >= 0 : "When reducing circuit breaker, it should be adjusted with a number higher or " + + "equal to 0 and not [" + + sizeInBytes + + "]"; + circuitBreakerService.getBreaker(CircuitBreaker.FIELDDATA).addWithoutBreaking(-sizeInBytes); + } + }); + this.cleanInterval = INDICES_CACHE_CLEAN_INTERVAL_SETTING.get(settings); + this.cacheCleaner = new CacheCleaner(indicesFieldDataCache, indicesRequestCache, logger, threadPool, this.cleanInterval); + this.metaStateService = metaStateService; + this.engineFactoryProviders = engineFactoryProviders; + + this.directoryFactories = directoryFactories; + this.recoveryStateFactories = recoveryStateFactories; + // doClose() is called when shutting down a node, yet there might still be ongoing requests + // that we need to wait for before closing some resources such as the caches. In order to + // avoid closing these resources while ongoing requests are still being processed, we use a + // ref count which will only close them when both this service and all index services are + // actually closed + indicesRefCount = new AbstractRefCounted("indices") { + @Override + protected void closeInternal() { + try { + IOUtils.close( + analysisRegistry, + indexingMemoryController, + indicesFieldDataCache, + cacheCleaner, + indicesRequestCache, + indicesQueryCache + ); + } catch (IOException e) { + throw new UncheckedIOException(e); + } finally { + closeLatch.countDown(); + } + } + }; + + final String nodeName = Objects.requireNonNull(Node.NODE_NAME_SETTING.get(settings)); + nodeWriteDanglingIndicesInfo = WRITE_DANGLING_INDICES_INFO_SETTING.get(settings); + danglingIndicesThreadPoolExecutor = nodeWriteDanglingIndicesInfo + ? OpenSearchExecutors.newScaling( + nodeName + "/" + DANGLING_INDICES_UPDATE_THREAD_NAME, + 1, + 1, + 0, + TimeUnit.MILLISECONDS, + daemonThreadFactory(nodeName, DANGLING_INDICES_UPDATE_THREAD_NAME), + threadPool.getThreadContext() + ) + : null; + + this.allowExpensiveQueries = ALLOW_EXPENSIVE_QUERIES.get(clusterService.getSettings()); + clusterService.getClusterSettings().addSettingsUpdateConsumer(ALLOW_EXPENSIVE_QUERIES, this::setAllowExpensiveQueries); + this.remoteDirectoryFactory = remoteDirectoryFactory; + } + + public IndicesService( + Settings settings, + PluginsService pluginsService, + ExtensionsManager extensionsManager, + NodeEnvironment nodeEnv, + NamedXContentRegistry xContentRegistry, + AnalysisRegistry analysisRegistry, + IndexNameExpressionResolver indexNameExpressionResolver, + MapperRegistry mapperRegistry, + NamedWriteableRegistry namedWriteableRegistry, + ThreadPool threadPool, + IndexScopedSettings indexScopedSettings, + CircuitBreakerService circuitBreakerService, + BigArrays bigArrays, + ScriptService scriptService, + ClusterService clusterService, + Client client, + MetaStateService metaStateService, + Collection>> engineFactoryProviders, + Map directoryFactories, + ValuesSourceRegistry valuesSourceRegistry, + Map recoveryStateFactories, + IndexStorePlugin.RemoteDirectoryFactory remoteDirectoryFactory + ) { + this.settings = settings; + this.threadPool = threadPool; + this.pluginsService = pluginsService; + this.extensionsManager = extensionsManager; this.nodeEnv = nodeEnv; this.xContentRegistry = xContentRegistry; this.valuesSourceRegistry = valuesSourceRegistry; @@ -721,6 +838,9 @@ private synchronized IndexService createIndexService( indexModule.addIndexOperationListener(operationListener); } pluginsService.onIndexModule(indexModule); + if (FeatureFlags.isEnabled(FeatureFlags.EXTENSIONS)) { + extensionsManager.onIndexModule(indexModule); + } for (IndexEventListener listener : builtInListeners) { indexModule.addIndexEventListener(listener); } diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index 93de057285012..f204723709965 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -43,6 +43,8 @@ import org.opensearch.indices.replication.SegmentReplicationSourceFactory; import org.opensearch.indices.replication.SegmentReplicationTargetService; import org.opensearch.indices.replication.SegmentReplicationSourceService; +import org.opensearch.extensions.ExtensionsManager; +import org.opensearch.extensions.NoopExtensionsManager; import org.opensearch.search.backpressure.SearchBackpressureService; import org.opensearch.search.backpressure.settings.SearchBackpressureSettings; import org.opensearch.tasks.TaskResourceTrackingService; @@ -340,6 +342,7 @@ public static class DiscoverySettings { private final Environment environment; private final NodeEnvironment nodeEnvironment; private final PluginsService pluginsService; + private final ExtensionsManager extensionsManager; private final NodeClient client; private final Collection pluginLifecycleComponents; private final LocalNodeFactory localNodeFactory; @@ -424,6 +427,13 @@ protected Node( initialEnvironment.pluginsDir(), classpathPlugins ); + + if (FeatureFlags.isEnabled(FeatureFlags.EXTENSIONS)) { + this.extensionsManager = new ExtensionsManager(tmpSettings, initialEnvironment.extensionDir()); + } else { + this.extensionsManager = new NoopExtensionsManager(); + } + final Settings settings = pluginsService.updatedSettings(); final Set additionalRoles = pluginsService.filterPlugins(Plugin.class) @@ -652,29 +662,58 @@ protected Node( repositoriesServiceReference::get ); - final IndicesService indicesService = new IndicesService( - settings, - pluginsService, - nodeEnvironment, - xContentRegistry, - analysisModule.getAnalysisRegistry(), - clusterModule.getIndexNameExpressionResolver(), - indicesModule.getMapperRegistry(), - namedWriteableRegistry, - threadPool, - settingsModule.getIndexScopedSettings(), - circuitBreakerService, - bigArrays, - scriptService, - clusterService, - client, - metaStateService, - engineFactoryProviders, - Map.copyOf(directoryFactories), - searchModule.getValuesSourceRegistry(), - recoveryStateFactories, - remoteDirectoryFactory - ); + final IndicesService indicesService; + + if (FeatureFlags.isEnabled(FeatureFlags.EXTENSIONS)) { + indicesService = new IndicesService( + settings, + pluginsService, + extensionsManager, + nodeEnvironment, + xContentRegistry, + analysisModule.getAnalysisRegistry(), + clusterModule.getIndexNameExpressionResolver(), + indicesModule.getMapperRegistry(), + namedWriteableRegistry, + threadPool, + settingsModule.getIndexScopedSettings(), + circuitBreakerService, + bigArrays, + scriptService, + clusterService, + client, + metaStateService, + engineFactoryProviders, + Map.copyOf(directoryFactories), + searchModule.getValuesSourceRegistry(), + recoveryStateFactories, + remoteDirectoryFactory + ); + } else { + indicesService = new IndicesService( + settings, + pluginsService, + nodeEnvironment, + xContentRegistry, + analysisModule.getAnalysisRegistry(), + clusterModule.getIndexNameExpressionResolver(), + indicesModule.getMapperRegistry(), + namedWriteableRegistry, + threadPool, + settingsModule.getIndexScopedSettings(), + circuitBreakerService, + bigArrays, + scriptService, + clusterService, + client, + metaStateService, + engineFactoryProviders, + Map.copyOf(directoryFactories), + searchModule.getValuesSourceRegistry(), + recoveryStateFactories, + remoteDirectoryFactory + ); + } final AliasValidator aliasValidator = new AliasValidator(); @@ -787,6 +826,10 @@ protected Node( settingsModule.getClusterSettings(), taskHeaders ); + if (FeatureFlags.isEnabled(FeatureFlags.EXTENSIONS)) { + this.extensionsManager.setTransportService(transportService); + this.extensionsManager.setClusterService(clusterService); + } final GatewayMetaState gatewayMetaState = new GatewayMetaState(); final ResponseCollectorService responseCollectorService = new ResponseCollectorService(clusterService); final SearchTransportService searchTransportService = new SearchTransportService( @@ -1200,6 +1243,9 @@ public Node start() throws NodeValidationException { assert clusterService.localNode().equals(localNodeFactory.getNode()) : "clusterService has a different local node than the factory provided"; transportService.acceptIncomingRequests(); + if (FeatureFlags.isEnabled(FeatureFlags.EXTENSIONS)) { + extensionsManager.initialize(); + } discovery.startInitialJoin(); final TimeValue initialStateTimeout = DiscoverySettings.INITIAL_STATE_TIMEOUT_SETTING.get(settings()); configureNodeAndClusterIdStateListener(clusterService); diff --git a/server/src/main/java/org/opensearch/plugins/PluginsService.java b/server/src/main/java/org/opensearch/plugins/PluginsService.java index bff880e5a41d7..c336bf156f40c 100644 --- a/server/src/main/java/org/opensearch/plugins/PluginsService.java +++ b/server/src/main/java/org/opensearch/plugins/PluginsService.java @@ -305,6 +305,7 @@ public Collection> getGuiceServiceClasses() } public void onIndexModule(IndexModule indexModule) { + logger.info("PluginService:onIndexModule index:" + indexModule.getIndex()); for (Tuple plugin : plugins) { plugin.v2().onIndexModule(indexModule); } diff --git a/server/src/main/java/org/opensearch/transport/TransportService.java b/server/src/main/java/org/opensearch/transport/TransportService.java index b9bf035a7fa77..1d94c5600818f 100644 --- a/server/src/main/java/org/opensearch/transport/TransportService.java +++ b/server/src/main/java/org/opensearch/transport/TransportService.java @@ -397,6 +397,11 @@ public void connectToNode(DiscoveryNode node) throws ConnectTransportException { connectToNode(node, (ConnectionProfile) null); } + // We are skipping node validation for extensibility as extensionNode and opensearchNode(LocalNode) will have different ephemeral id's + public void connectToExtensionNode(final DiscoveryNode node) { + PlainActionFuture.get(fut -> connectToExtensionNode(node, (ConnectionProfile) null, ActionListener.map(fut, x -> null))); + } + /** * Connect to the specified node with the given connection profile * @@ -407,6 +412,10 @@ public void connectToNode(final DiscoveryNode node, ConnectionProfile connection PlainActionFuture.get(fut -> connectToNode(node, connectionProfile, ActionListener.map(fut, x -> null))); } + public void connectToExtensionNode(final DiscoveryNode node, ConnectionProfile connectionProfile) { + PlainActionFuture.get(fut -> connectToExtensionNode(node, connectionProfile, ActionListener.map(fut, x -> null))); + } + /** * Connect to the specified node with the given connection profile. * The ActionListener will be called on the calling thread or the generic thread pool. @@ -418,6 +427,10 @@ public void connectToNode(DiscoveryNode node, ActionListener listener) thr connectToNode(node, null, listener); } + public void connectToExtensionNode(DiscoveryNode node, ActionListener listener) throws ConnectTransportException { + connectToExtensionNode(node, null, listener); + } + /** * Connect to the specified node with the given connection profile. * The ActionListener will be called on the calling thread or the generic thread pool. @@ -434,14 +447,35 @@ public void connectToNode(final DiscoveryNode node, ConnectionProfile connection connectionManager.connectToNode(node, connectionProfile, connectionValidator(node), listener); } + public void connectToExtensionNode(final DiscoveryNode node, ConnectionProfile connectionProfile, ActionListener listener) { + if (isLocalNode(node)) { + listener.onResponse(null); + return; + } + connectionManager.connectToNode(node, connectionProfile, extensionConnectionValidator(node), listener); + } + public ConnectionManager.ConnectionValidator connectionValidator(DiscoveryNode node) { return (newConnection, actualProfile, listener) -> { // We don't validate cluster names to allow for CCS connections. handshake(newConnection, actualProfile.getHandshakeTimeout().millis(), cn -> true, ActionListener.map(listener, resp -> { final DiscoveryNode remote = resp.discoveryNode; + if (node.equals(remote) == false) { throw new ConnectTransportException(node, "handshake failed. unexpected remote node " + remote); } + + return null; + })); + }; + } + + public ConnectionManager.ConnectionValidator extensionConnectionValidator(DiscoveryNode node) { + return (newConnection, actualProfile, listener) -> { + // We don't validate cluster names to allow for CCS connections. + handshake(newConnection, actualProfile.getHandshakeTimeout().millis(), cn -> true, ActionListener.map(listener, resp -> { + final DiscoveryNode remote = resp.discoveryNode; + logger.info("Connection validation was skipped"); return null; })); }; @@ -731,6 +765,7 @@ public final void sendRequest( final TransportResponseHandler handler ) { try { + logger.info("Action: " + action); final TransportResponseHandler delegate; if (request.getParentTask().isSet()) { // TODO: capture the connection instead so that we can cancel child tasks on the remote connections. diff --git a/server/src/test/java/org/opensearch/common/util/FeatureFlagTests.java b/server/src/test/java/org/opensearch/common/util/FeatureFlagTests.java index a4f2b242564e2..b493771876b99 100644 --- a/server/src/test/java/org/opensearch/common/util/FeatureFlagTests.java +++ b/server/src/test/java/org/opensearch/common/util/FeatureFlagTests.java @@ -22,6 +22,7 @@ public class FeatureFlagTests extends OpenSearchTestCase { public static void enableFeature() { AccessController.doPrivileged((PrivilegedAction) () -> System.setProperty(FeatureFlags.REPLICATION_TYPE, "true")); AccessController.doPrivileged((PrivilegedAction) () -> System.setProperty(FeatureFlags.REMOTE_STORE, "true")); + AccessController.doPrivileged((PrivilegedAction) () -> System.setProperty(FeatureFlags.EXTENSIONS, "true")); } public void testReplicationTypeFeatureFlag() { diff --git a/server/src/test/java/org/opensearch/extensions/ExtensionsManagerTests.java b/server/src/test/java/org/opensearch/extensions/ExtensionsManagerTests.java new file mode 100644 index 0000000000000..cbd86378c0fac --- /dev/null +++ b/server/src/test/java/org/opensearch/extensions/ExtensionsManagerTests.java @@ -0,0 +1,418 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.extensions; + +import static java.util.Collections.emptyMap; +import static java.util.Collections.emptySet; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.mock; +import static org.opensearch.test.ClusterServiceUtils.createClusterService; + +import java.io.IOException; +import java.net.InetAddress; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.security.AccessControlException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import org.apache.logging.log4j.Level; +import org.apache.logging.log4j.LogManager; +import org.junit.After; +import org.junit.Before; +import org.opensearch.Version; +import org.opensearch.action.admin.cluster.state.ClusterStateResponse; +import org.opensearch.cluster.ClusterSettingsResponse; +import org.opensearch.cluster.LocalNodeResponse; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.metadata.IndexNameExpressionResolver; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.io.PathUtils; +import org.opensearch.common.io.stream.NamedWriteableRegistry; +import org.opensearch.common.network.NetworkService; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.transport.TransportAddress; +import org.opensearch.common.util.FeatureFlagTests; +import org.opensearch.common.util.PageCacheRecycler; +import org.opensearch.common.util.concurrent.ThreadContext; +import org.opensearch.env.Environment; +import org.opensearch.env.TestEnvironment; +import org.opensearch.index.IndexModule; +import org.opensearch.index.IndexSettings; +import org.opensearch.index.analysis.AnalysisRegistry; +import org.opensearch.index.engine.EngineConfigFactory; +import org.opensearch.index.engine.InternalEngineFactory; +import org.opensearch.indices.breaker.NoneCircuitBreakerService; +import org.opensearch.plugins.PluginInfo; +import org.opensearch.test.IndexSettingsModule; +import org.opensearch.test.MockLogAppender; +import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.test.transport.MockTransportService; +import org.opensearch.threadpool.TestThreadPool; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.transport.ConnectTransportException; +import org.opensearch.transport.Transport; +import org.opensearch.transport.TransportService; +import org.opensearch.transport.nio.MockNioTransport; + +public class ExtensionsManagerTests extends OpenSearchTestCase { + + private TransportService transportService; + private ClusterService clusterService; + private MockNioTransport transport; + private final ThreadPool threadPool = new TestThreadPool(ExtensionsManagerTests.class.getSimpleName()); + private final Settings settings = Settings.builder() + .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT) + .put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString()) + .build(); + + @Before + public void setup() throws Exception { + FeatureFlagTests.enableFeature(); + Settings settings = Settings.builder().put("cluster.name", "test").build(); + transport = new MockNioTransport( + settings, + Version.CURRENT, + threadPool, + new NetworkService(Collections.emptyList()), + PageCacheRecycler.NON_RECYCLING_INSTANCE, + new NamedWriteableRegistry(Collections.emptyList()), + new NoneCircuitBreakerService() + ); + transportService = new MockTransportService( + settings, + transport, + threadPool, + TransportService.NOOP_TRANSPORT_INTERCEPTOR, + (boundAddress) -> new DiscoveryNode( + "test_node", + "test_node", + boundAddress.publishAddress(), + emptyMap(), + emptySet(), + Version.CURRENT + ), + null, + Collections.emptySet() + ); + clusterService = createClusterService(threadPool); + } + + @After + public void tearDown() throws Exception { + super.tearDown(); + transportService.close(); + ThreadPool.terminate(threadPool, 30, TimeUnit.SECONDS); + } + + public void testExtensionsDiscovery() throws Exception { + Path extensionDir = createTempDir(); + + List extensionsYmlLines = Arrays.asList( + "extensions:", + " - name: firstExtension", + " uniqueId: uniqueid1", + " hostName: 'myIndependentPluginHost1'", + " hostAddress: '127.0.0.0'", + " port: '9300'", + " version: '0.0.7'", + " description: Fake description 1", + " opensearchVersion: '3.0.0'", + " javaVersion: '14'", + " className: fakeClass1", + " customFolderName: fakeFolder1", + " hasNativeController: false", + " - name: secondExtension", + " uniqueId: 'uniqueid2'", + " hostName: 'myIndependentPluginHost2'", + " hostAddress: '127.0.0.1'", + " port: '9301'", + " version: '3.14.16'", + " description: Fake description 2", + " opensearchVersion: '2.0.0'", + " javaVersion: '17'", + " className: fakeClass2", + " customFolderName: fakeFolder2", + " hasNativeController: true" + ); + Files.write(extensionDir.resolve("extensions.yml"), extensionsYmlLines, StandardCharsets.UTF_8); + + ExtensionsManager extensionsManager = new ExtensionsManager(settings, extensionDir); + + List expectedUninitializedExtensions = new ArrayList(); + + expectedUninitializedExtensions.add( + new DiscoveryExtensionNode( + "firstExtension", + "uniqueid1", + "uniqueid1", + "myIndependentPluginHost1", + "127.0.0.0", + new TransportAddress(InetAddress.getByName("127.0.0.0"), 9300), + new HashMap(), + Version.fromString("3.0.0"), + new PluginInfo( + "firstExtension", + "Fake description 1", + "0.0.7", + Version.fromString("3.0.0"), + "14", + "fakeClass1", + new ArrayList(), + false + ) + ) + ); + + expectedUninitializedExtensions.add( + new DiscoveryExtensionNode( + "secondExtension", + "uniqueid2", + "uniqueid2", + "myIndependentPluginHost2", + "127.0.0.1", + new TransportAddress(TransportAddress.META_ADDRESS, 9301), + new HashMap(), + Version.fromString("2.0.0"), + new PluginInfo( + "secondExtension", + "Fake description 2", + "3.14.16", + Version.fromString("2.0.0"), + "17", + "fakeClass2", + new ArrayList(), + true + ) + ) + ); + assertEquals(expectedUninitializedExtensions, extensionsManager.getUninitializedExtensions()); + } + + public void testNonAccessibleDirectory() throws Exception { + AccessControlException e = expectThrows( + + AccessControlException.class, + () -> new ExtensionsManager(settings, PathUtils.get("")) + ); + assertEquals("access denied (\"java.io.FilePermission\" \"\" \"read\")", e.getMessage()); + } + + public void testNoExtensionsFile() throws Exception { + Path extensionDir = createTempDir(); + + Settings settings = Settings.builder().build(); + + try (MockLogAppender mockLogAppender = MockLogAppender.createForLoggers(LogManager.getLogger(ExtensionsManager.class))) { + + mockLogAppender.addExpectation( + new MockLogAppender.SeenEventExpectation( + "No Extensions File Present", + "org.opensearch.extensions.ExtensionsManager", + Level.INFO, + "Extensions.yml file is not present. No extensions will be loaded." + ) + ); + + new ExtensionsManager(settings, extensionDir); + + mockLogAppender.assertAllExpectationsMatched(); + } + } + + public void testEmptyExtensionsFile() throws Exception { + Path extensionDir = createTempDir(); + + List extensionsYmlLines = Arrays.asList(); + Files.write(extensionDir.resolve("extensions.yml"), extensionsYmlLines, StandardCharsets.UTF_8); + + Settings settings = Settings.builder().build(); + + expectThrows(IOException.class, () -> new ExtensionsManager(settings, extensionDir)); + } + + public void testInitialize() throws Exception { + Path extensionDir = createTempDir(); + + List extensionsYmlLines = Arrays.asList( + "extensions:", + " - name: firstExtension", + " uniqueId: uniqueid1", + " hostName: 'myIndependentPluginHost1'", + " hostAddress: '127.0.0.0'", + " port: '9300'", + " version: '0.0.7'", + " description: Fake description 1", + " opensearchVersion: '3.0.0'", + " javaVersion: '14'", + " className: fakeClass1", + " customFolderName: fakeFolder1", + " hasNativeController: false", + " - name: secondExtension", + " uniqueId: 'uniqueid2'", + " hostName: 'myIndependentPluginHost2'", + " hostAddress: '127.0.0.1'", + " port: '9301'", + " version: '3.14.16'", + " description: Fake description 2", + " opensearchVersion: '2.0.0'", + " javaVersion: '17'", + " className: fakeClass2", + " customFolderName: fakeFolder2", + " hasNativeController: true" + ); + Files.write(extensionDir.resolve("extensions.yml"), extensionsYmlLines, StandardCharsets.UTF_8); + + ExtensionsManager extensionsManager = new ExtensionsManager(settings, extensionDir); + + transportService.start(); + transportService.acceptIncomingRequests(); + extensionsManager.setTransportService(transportService); + + expectThrows(ConnectTransportException.class, () -> extensionsManager.initialize()); + + } + + public void testHandleExtensionRequest() throws Exception { + + Path extensionDir = createTempDir(); + + ExtensionsManager extensionsManager = new ExtensionsManager(settings, extensionDir); + + extensionsManager.setTransportService(transportService); + extensionsManager.setClusterService(clusterService); + ExtensionRequest clusterStateRequest = new ExtensionRequest(ExtensionsManager.RequestType.REQUEST_EXTENSION_CLUSTER_STATE); + assertEquals(extensionsManager.handleExtensionRequest(clusterStateRequest).getClass(), ClusterStateResponse.class); + + ExtensionRequest clusterSettingRequest = new ExtensionRequest(ExtensionsManager.RequestType.REQUEST_EXTENSION_CLUSTER_SETTINGS); + assertEquals(extensionsManager.handleExtensionRequest(clusterSettingRequest).getClass(), ClusterSettingsResponse.class); + + ExtensionRequest localNodeRequest = new ExtensionRequest(ExtensionsManager.RequestType.REQUEST_EXTENSION_LOCAL_NODE); + assertEquals(extensionsManager.handleExtensionRequest(localNodeRequest).getClass(), LocalNodeResponse.class); + + ExtensionRequest exceptionRequest = new ExtensionRequest(ExtensionsManager.RequestType.GET_SETTINGS); + Exception exception = expectThrows(IllegalStateException.class, () -> extensionsManager.handleExtensionRequest(exceptionRequest)); + assertEquals(exception.getMessage(), "Handler not present for the provided request: " + exceptionRequest.getRequestType()); + } + + public void testRegisterHandler() throws Exception { + Path extensionDir = createTempDir(); + + ExtensionsManager extensionsManager = new ExtensionsManager(settings, extensionDir); + + TransportService mockTransportService = spy( + new TransportService( + Settings.EMPTY, + mock(Transport.class), + null, + TransportService.NOOP_TRANSPORT_INTERCEPTOR, + x -> null, + null, + Collections.emptySet() + ) + ); + + extensionsManager.setTransportService(mockTransportService); + verify(mockTransportService, times(3)).registerRequestHandler(anyString(), anyString(), anyBoolean(), anyBoolean(), any(), any()); + + } + + public void testOnIndexModule() throws Exception { + + Path extensionDir = createTempDir(); + + List extensionsYmlLines = Arrays.asList( + "extensions:", + " - name: firstExtension", + " uniqueId: uniqueid1", + " hostName: 'myIndependentPluginHost1'", + " hostAddress: '127.0.0.0'", + " port: '9300'", + " version: '0.0.7'", + " description: Fake description 1", + " opensearchVersion: '3.0.0'", + " javaVersion: '14'", + " className: fakeClass1", + " customFolderName: fakeFolder1", + " hasNativeController: false", + " - name: secondExtension", + " uniqueId: 'uniqueid2'", + " hostName: 'myIndependentPluginHost2'", + " hostAddress: '127.0.0.1'", + " port: '9301'", + " version: '3.14.16'", + " description: Fake description 2", + " opensearchVersion: '2.0.0'", + " javaVersion: '17'", + " className: fakeClass2", + " customFolderName: fakeFolder2", + " hasNativeController: true" + ); + Files.write(extensionDir.resolve("extensions.yml"), extensionsYmlLines, StandardCharsets.UTF_8); + + ExtensionsManager extensionsManager = new ExtensionsManager(settings, extensionDir); + + transportService.start(); + transportService.acceptIncomingRequests(); + extensionsManager.setTransportService(transportService); + + Environment environment = TestEnvironment.newEnvironment(settings); + AnalysisRegistry emptyAnalysisRegistry = new AnalysisRegistry( + environment, + emptyMap(), + emptyMap(), + emptyMap(), + emptyMap(), + emptyMap(), + emptyMap(), + emptyMap(), + emptyMap(), + emptyMap() + ); + + IndexSettings indexSettings = IndexSettingsModule.newIndexSettings("test_index", settings); + IndexModule indexModule = new IndexModule( + indexSettings, + emptyAnalysisRegistry, + new InternalEngineFactory(), + new EngineConfigFactory(indexSettings), + Collections.emptyMap(), + () -> true, + new IndexNameExpressionResolver(new ThreadContext(Settings.EMPTY)), + Collections.emptyMap() + ); + + try (MockLogAppender mockLogAppender = MockLogAppender.createForLoggers(LogManager.getLogger(ExtensionsManager.class))) { + + mockLogAppender.addExpectation( + new MockLogAppender.SeenEventExpectation( + "IndicesModuleRequest Failure", + "org.opensearch.extensions.ExtensionsManager", + Level.ERROR, + "IndicesModuleRequest failed" + ) + ); + + extensionsManager.onIndexModule(indexModule); + mockLogAppender.assertAllExpectationsMatched(); + } + } + +} diff --git a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java index ff4005d9bcedf..663c325db12c2 100644 --- a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java @@ -156,6 +156,7 @@ import org.opensearch.common.settings.Settings; import org.opensearch.common.transport.TransportAddress; import org.opensearch.common.util.BigArrays; +import org.opensearch.common.util.FeatureFlags; import org.opensearch.common.util.PageCacheRecycler; import org.opensearch.common.util.concurrent.AbstractRunnable; import org.opensearch.common.util.concurrent.PrioritizedOpenSearchThreadPoolExecutor; @@ -191,6 +192,7 @@ import org.opensearch.ingest.IngestService; import org.opensearch.monitor.StatusInfo; import org.opensearch.node.ResponseCollectorService; +import org.opensearch.extensions.ExtensionsManager; import org.opensearch.plugins.PluginsService; import org.opensearch.repositories.RepositoriesService; import org.opensearch.repositories.Repository; @@ -1795,40 +1797,79 @@ public void onFailure(final Exception e) { ); final BigArrays bigArrays = new BigArrays(new PageCacheRecycler(settings), null, "test"); final MapperRegistry mapperRegistry = new IndicesModule(Collections.emptyList()).getMapperRegistry(); - indicesService = new IndicesService( - settings, - mock(PluginsService.class), - nodeEnv, - namedXContentRegistry, - new AnalysisRegistry( - environment, - emptyMap(), - emptyMap(), - emptyMap(), - emptyMap(), + if (FeatureFlags.isEnabled(FeatureFlags.EXTENSIONS)) { + indicesService = new IndicesService( + settings, + mock(PluginsService.class), + mock(ExtensionsManager.class), + nodeEnv, + namedXContentRegistry, + new AnalysisRegistry( + environment, + emptyMap(), + emptyMap(), + emptyMap(), + emptyMap(), + emptyMap(), + emptyMap(), + emptyMap(), + emptyMap(), + emptyMap() + ), + indexNameExpressionResolver, + mapperRegistry, + namedWriteableRegistry, + threadPool, + indexScopedSettings, + new NoneCircuitBreakerService(), + bigArrays, + scriptService, + clusterService, + client, + new MetaStateService(nodeEnv, namedXContentRegistry), + Collections.emptyList(), emptyMap(), + null, emptyMap(), + new RemoteSegmentStoreDirectoryFactory(() -> repositoriesService) + ); + } else { + indicesService = new IndicesService( + settings, + mock(PluginsService.class), + nodeEnv, + namedXContentRegistry, + new AnalysisRegistry( + environment, + emptyMap(), + emptyMap(), + emptyMap(), + emptyMap(), + emptyMap(), + emptyMap(), + emptyMap(), + emptyMap(), + emptyMap() + ), + indexNameExpressionResolver, + mapperRegistry, + namedWriteableRegistry, + threadPool, + indexScopedSettings, + new NoneCircuitBreakerService(), + bigArrays, + scriptService, + clusterService, + client, + new MetaStateService(nodeEnv, namedXContentRegistry), + Collections.emptyList(), emptyMap(), + null, emptyMap(), - emptyMap() - ), - indexNameExpressionResolver, - mapperRegistry, - namedWriteableRegistry, - threadPool, - indexScopedSettings, - new NoneCircuitBreakerService(), - bigArrays, - scriptService, - clusterService, - client, - new MetaStateService(nodeEnv, namedXContentRegistry), - Collections.emptyList(), - emptyMap(), - null, - emptyMap(), - new RemoteSegmentStoreDirectoryFactory(() -> repositoriesService) - ); + new RemoteSegmentStoreDirectoryFactory(() -> repositoriesService) + ); + } + final RecoverySettings recoverySettings = new RecoverySettings(settings, clusterSettings); snapshotShardsService = new SnapshotShardsService( settings, diff --git a/server/src/test/java/org/opensearch/transport/TransportServiceHandshakeTests.java b/server/src/test/java/org/opensearch/transport/TransportServiceHandshakeTests.java index 8463d9268e760..c0af5d6e76c59 100644 --- a/server/src/test/java/org/opensearch/transport/TransportServiceHandshakeTests.java +++ b/server/src/test/java/org/opensearch/transport/TransportServiceHandshakeTests.java @@ -41,12 +41,15 @@ import org.opensearch.common.settings.Settings; import org.opensearch.common.util.PageCacheRecycler; import org.opensearch.indices.breaker.NoneCircuitBreakerService; +import org.opensearch.test.MockLogAppender; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.test.VersionUtils; import org.opensearch.test.transport.MockTransportService; import org.opensearch.threadpool.TestThreadPool; import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.nio.MockNioTransport; +import org.apache.logging.log4j.Level; +import org.apache.logging.log4j.LogManager; import org.junit.After; import org.junit.AfterClass; import org.junit.BeforeClass; @@ -223,6 +226,36 @@ public void testNodeConnectWithDifferentNodeId() { assertFalse(handleA.transportService.nodeConnected(discoveryNode)); } + public void testNodeConnectWithDifferentNodeIDSkipValidation() throws IllegalAccessException { + Settings settings = Settings.builder().put("cluster.name", "test").build(); + NetworkHandle handleA = startServices("TS_A", settings, Version.CURRENT); + NetworkHandle handleB = startServices("TS_B", settings, Version.CURRENT); + DiscoveryNode discoveryNode = new DiscoveryNode( + randomAlphaOfLength(10), + handleB.discoveryNode.getAddress(), + emptyMap(), + emptySet(), + handleB.discoveryNode.getVersion() + ); + try (MockLogAppender mockLogAppender = MockLogAppender.createForLoggers(LogManager.getLogger(TransportService.class))) { + + mockLogAppender.addExpectation( + new MockLogAppender.SeenEventExpectation( + "Validation Skipped", + "org.opensearch.transport.TransportService", + Level.INFO, + "Connection validation was skipped" + ) + ); + + handleA.transportService.connectToExtensionNode(discoveryNode, TestProfiles.LIGHT_PROFILE); + + mockLogAppender.assertAllExpectationsMatched(); + + assertTrue(handleA.transportService.nodeConnected(discoveryNode)); + } + } + private static class NetworkHandle { private TransportService transportService; private DiscoveryNode discoveryNode; diff --git a/server/src/test/resources/config/extensions.yml b/server/src/test/resources/config/extensions.yml new file mode 100644 index 0000000000000..6264e9630ad60 --- /dev/null +++ b/server/src/test/resources/config/extensions.yml @@ -0,0 +1,13 @@ +extensions: + - name: firstExtension + uniqueId: uniqueid1 + hostName: 'myIndependentPluginHost1' + hostAddress: '127.0.0.0' + port: '9300' + version: '3.0.0' + - name: "secondExtension" + uniqueId: 'uniqueid2' + hostName: 'myIndependentPluginHost2' + hostAddress: '127.0.0.1' + port: '9301' + version: '2.0.0'