diff --git a/CHANGELOG.md b/CHANGELOG.md index 07f70f9f4a7aa..42cd569c9fb49 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,7 +16,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Add feature flag for extensions ([#5211](https://github.com/opensearch-project/OpenSearch/pull/5211)) - Added jackson dependency to server ([#5366] (https://github.com/opensearch-project/OpenSearch/pull/5366)) - Adding support to register settings dynamically ([#5495](https://github.com/opensearch-project/OpenSearch/pull/5495)) -- Added experimental support for extensions ([#5347](https://github.com/opensearch-project/OpenSearch/pull/5347)), ([#5518](https://github.com/opensearch-project/OpenSearch/pull/5518), ([#5597](https://github.com/opensearch-project/OpenSearch/pull/5597))) +- Added experimental support for extensions ([#5347](https://github.com/opensearch-project/OpenSearch/pull/5347)), ([#5518](https://github.com/opensearch-project/OpenSearch/pull/5518), ([#5597](https://github.com/opensearch-project/OpenSearch/pull/5597)), ([#5615](https://github.com/opensearch-project/OpenSearch/pull/5615))) - Add CI bundle pattern to distribution download ([#5348](https://github.com/opensearch-project/OpenSearch/pull/5348)) - Add support for ppc64le architecture ([#5459](https://github.com/opensearch-project/OpenSearch/pull/5459)) diff --git a/server/src/main/java/org/opensearch/action/ActionModule.java b/server/src/main/java/org/opensearch/action/ActionModule.java index 84bc9b395c5dc..bba3aabdd61f9 100644 --- a/server/src/main/java/org/opensearch/action/ActionModule.java +++ b/server/src/main/java/org/opensearch/action/ActionModule.java @@ -280,6 +280,8 @@ import org.opensearch.common.inject.TypeLiteral; import org.opensearch.common.inject.multibindings.MapBinder; import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.extensions.action.ExtensionProxyAction; +import org.opensearch.extensions.action.ExtensionTransportAction; import org.opensearch.common.settings.IndexScopedSettings; import org.opensearch.common.settings.Settings; import org.opensearch.common.settings.SettingsFilter; @@ -703,6 +705,11 @@ public void reg // Remote Store actions.register(RestoreRemoteStoreAction.INSTANCE, TransportRestoreRemoteStoreAction.class); + if (FeatureFlags.isEnabled(FeatureFlags.EXTENSIONS)) { + // ExtensionProxyAction + actions.register(ExtensionProxyAction.INSTANCE, ExtensionTransportAction.class); + } + // Decommission actions actions.register(DecommissionAction.INSTANCE, TransportDecommissionAction.class); actions.register(GetDecommissionStateAction.INSTANCE, TransportGetDecommissionStateAction.class); diff --git a/server/src/main/java/org/opensearch/cluster/LocalNodeResponse.java b/server/src/main/java/org/opensearch/cluster/LocalNodeResponse.java deleted file mode 100644 index ef1ef4a49ad62..0000000000000 --- a/server/src/main/java/org/opensearch/cluster/LocalNodeResponse.java +++ /dev/null @@ -1,60 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.cluster; - -import org.opensearch.cluster.node.DiscoveryNode; -import org.opensearch.cluster.service.ClusterService; -import org.opensearch.common.io.stream.StreamInput; -import org.opensearch.common.io.stream.StreamOutput; -import org.opensearch.transport.TransportResponse; - -import java.io.IOException; -import java.util.Objects; - -/** - * LocalNode Response for Extensibility - * - * @opensearch.internal - */ -public class LocalNodeResponse extends TransportResponse { - private final DiscoveryNode localNode; - - public LocalNodeResponse(ClusterService clusterService) { - this.localNode = clusterService.localNode(); - } - - public LocalNodeResponse(StreamInput in) throws IOException { - super(in); - this.localNode = new DiscoveryNode(in); - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - this.localNode.writeTo(out); - } - - @Override - public String toString() { - return "LocalNodeResponse{" + "localNode=" + localNode + '}'; - } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - LocalNodeResponse that = (LocalNodeResponse) o; - return Objects.equals(localNode, that.localNode); - } - - @Override - public int hashCode() { - return Objects.hash(localNode); - } - -} diff --git a/server/src/main/java/org/opensearch/extensions/DiscoveryExtensionNode.java b/server/src/main/java/org/opensearch/extensions/DiscoveryExtensionNode.java index e4fa0d74f78f0..1d9e8b768be33 100644 --- a/server/src/main/java/org/opensearch/extensions/DiscoveryExtensionNode.java +++ b/server/src/main/java/org/opensearch/extensions/DiscoveryExtensionNode.java @@ -20,6 +20,9 @@ import org.opensearch.plugins.PluginInfo; import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; import java.util.Map; /** @@ -30,6 +33,7 @@ public class DiscoveryExtensionNode extends DiscoveryNode implements Writeable, ToXContentFragment { private final PluginInfo pluginInfo; + private List dependencies = Collections.emptyList(); public DiscoveryExtensionNode( String name, @@ -40,16 +44,22 @@ public DiscoveryExtensionNode( TransportAddress address, Map attributes, Version version, - PluginInfo pluginInfo + PluginInfo pluginInfo, + List dependencies ) { super(name, id, ephemeralId, hostName, hostAddress, address, attributes, DiscoveryNodeRole.BUILT_IN_ROLES, version); this.pluginInfo = pluginInfo; + this.dependencies = dependencies; } @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); pluginInfo.writeTo(out); + out.writeVInt(dependencies.size()); + for (ExtensionDependency dependency : dependencies) { + dependency.writeTo(out); + } } /** @@ -61,6 +71,15 @@ public void writeTo(StreamOutput out) throws IOException { public DiscoveryExtensionNode(final StreamInput in) throws IOException { super(in); this.pluginInfo = new PluginInfo(in); + int size = in.readVInt(); + dependencies = new ArrayList<>(size); + for (int i = 0; i < size; i++) { + dependencies.add(new ExtensionDependency(in)); + } + } + + public List getDependencies() { + return dependencies; } @Override diff --git a/server/src/main/java/org/opensearch/extensions/ExtensionDependency.java b/server/src/main/java/org/opensearch/extensions/ExtensionDependency.java new file mode 100644 index 0000000000000..5e7fd651edfac --- /dev/null +++ b/server/src/main/java/org/opensearch/extensions/ExtensionDependency.java @@ -0,0 +1,89 @@ +/* +* Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.extensions; + +import java.io.IOException; +import java.util.Objects; + +import org.opensearch.Version; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.io.stream.StreamOutput; +import org.opensearch.common.io.stream.Writeable; + +/** + * This class handles the dependent extensions information + * + * @opensearch.internal + */ +public class ExtensionDependency implements Writeable { + private String uniqueId; + private Version version; + + public ExtensionDependency(String uniqueId, Version version) { + this.uniqueId = uniqueId; + this.version = version; + } + + /** + * Jackson requires a no-arg constructor. + * + */ + @SuppressWarnings("unused") + private ExtensionDependency() {} + + /** + * Reads the extension dependency information + * + * @throws IOException if an I/O exception occurred reading the extension dependency information + */ + public ExtensionDependency(StreamInput in) throws IOException { + uniqueId = in.readString(); + version = Version.readVersion(in); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeString(uniqueId); + Version.writeVersion(version, out); + } + + /** + * The uniqueId of the dependency extension + * + * @return the extension uniqueId + */ + public String getUniqueId() { + return uniqueId; + } + + /** + * The minimum version of the dependency extension + * + * @return the extension version + */ + public Version getVersion() { + return version; + } + + public String toString() { + return "ExtensionDependency:{uniqueId=" + uniqueId + ", version=" + version + "}"; + } + + public boolean equals(Object obj) { + if (this == obj) return true; + if (obj == null || getClass() != obj.getClass()) return false; + ExtensionDependency that = (ExtensionDependency) obj; + return Objects.equals(uniqueId, that.uniqueId) && Objects.equals(version, that.version); + } + + public int hashCode() { + return Objects.hash(uniqueId, version); + } +} diff --git a/server/src/main/java/org/opensearch/extensions/ExtensionsManager.java b/server/src/main/java/org/opensearch/extensions/ExtensionsManager.java index e42b9d1e755a5..e638faab3a747 100644 --- a/server/src/main/java/org/opensearch/extensions/ExtensionsManager.java +++ b/server/src/main/java/org/opensearch/extensions/ExtensionsManager.java @@ -26,8 +26,8 @@ import org.apache.logging.log4j.message.ParameterizedMessage; import org.opensearch.Version; import org.opensearch.action.admin.cluster.state.ClusterStateResponse; +import org.opensearch.client.node.NodeClient; import org.opensearch.cluster.ClusterSettingsResponse; -import org.opensearch.cluster.LocalNodeResponse; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.io.FileSystemUtils; @@ -39,6 +39,10 @@ import org.opensearch.discovery.InitializeExtensionRequest; import org.opensearch.discovery.InitializeExtensionResponse; import org.opensearch.extensions.ExtensionsSettings.Extension; +import org.opensearch.extensions.action.ExtensionActionRequest; +import org.opensearch.extensions.action.ExtensionActionResponse; +import org.opensearch.extensions.action.ExtensionTransportActionsHandler; +import org.opensearch.extensions.action.TransportActionRequestFromExtension; import org.opensearch.extensions.rest.RegisterRestActionsRequest; import org.opensearch.extensions.rest.RestActionsRequestHandler; import org.opensearch.extensions.settings.CustomSettingsRequestHandler; @@ -71,7 +75,6 @@ public class ExtensionsManager { public static final String INDICES_EXTENSION_POINT_ACTION_NAME = "indices:internal/extensions"; public static final String INDICES_EXTENSION_NAME_ACTION_NAME = "indices:internal/name"; public static final String REQUEST_EXTENSION_CLUSTER_STATE = "internal:discovery/clusterstate"; - public static final String REQUEST_EXTENSION_LOCAL_NODE = "internal:discovery/localnode"; public static final String REQUEST_EXTENSION_CLUSTER_SETTINGS = "internal:discovery/clustersettings"; public static final String REQUEST_EXTENSION_ENVIRONMENT_SETTINGS = "internal:discovery/enviornmentsettings"; public static final String REQUEST_EXTENSION_ADD_SETTINGS_UPDATE_CONSUMER = "internal:discovery/addsettingsupdateconsumer"; @@ -81,6 +84,9 @@ public class ExtensionsManager { public static final String REQUEST_EXTENSION_REGISTER_TRANSPORT_ACTIONS = "internal:discovery/registertransportactions"; public static final String REQUEST_OPENSEARCH_PARSE_NAMED_WRITEABLE = "internal:discovery/parsenamedwriteable"; public static final String REQUEST_REST_EXECUTE_ON_EXTENSION_ACTION = "internal:extensions/restexecuteonextensiontaction"; + public static final String REQUEST_EXTENSION_HANDLE_TRANSPORT_ACTION = "internal:extensions/handle-transportaction"; + public static final String TRANSPORT_ACTION_REQUEST_FROM_EXTENSION = "internal:extensions/request-transportaction-from-extension"; + public static final int EXTENSION_REQUEST_WAIT_TIMEOUT = 10; private static final Logger logger = LogManager.getLogger(ExtensionsManager.class); @@ -91,7 +97,6 @@ public class ExtensionsManager { */ public static enum RequestType { REQUEST_EXTENSION_CLUSTER_STATE, - REQUEST_EXTENSION_LOCAL_NODE, REQUEST_EXTENSION_CLUSTER_SETTINGS, REQUEST_EXTENSION_REGISTER_REST_ACTIONS, REQUEST_EXTENSION_REGISTER_SETTINGS, @@ -111,6 +116,7 @@ public static enum OpenSearchRequestType { } private final Path extensionsPath; + private ExtensionTransportActionsHandler extensionTransportActionsHandler; // A list of initialized extensions, a subset of the values of map below which includes all extensions private List extensions; private Map extensionIdMap; @@ -120,6 +126,7 @@ public static enum OpenSearchRequestType { private ClusterService clusterService; private Settings environmentSettings; private AddSettingsUpdateConsumerRequestHandler addSettingsUpdateConsumerRequestHandler; + private NodeClient client; public ExtensionsManager() { this.extensionsPath = Path.of(""); @@ -135,10 +142,13 @@ public ExtensionsManager() { public ExtensionsManager(Settings settings, Path extensionsPath) throws IOException { logger.info("ExtensionsManager initialized"); this.extensionsPath = extensionsPath; - this.transportService = null; this.extensions = new ArrayList(); this.extensionIdMap = new HashMap(); + // will be initialized in initializeServicesAndRestHandler which is called after the Node is initialized + this.transportService = null; this.clusterService = null; + this.client = null; + this.extensionTransportActionsHandler = null; /* * Now Discover extensions @@ -156,13 +166,15 @@ public ExtensionsManager(Settings settings, Path extensionsPath) throws IOExcept * @param transportService The Node's transport service. * @param clusterService The Node's cluster service. * @param initialEnvironmentSettings The finalized view of settings for the Environment + * @param client The client used to make transport requests */ public void initializeServicesAndRestHandler( RestController restController, SettingsModule settingsModule, TransportService transportService, ClusterService clusterService, - Settings initialEnvironmentSettings + Settings initialEnvironmentSettings, + NodeClient client ) { this.restActionsRequestHandler = new RestActionsRequestHandler(restController, extensionIdMap, transportService); this.customSettingsRequestHandler = new CustomSettingsRequestHandler(settingsModule); @@ -174,9 +186,20 @@ public void initializeServicesAndRestHandler( transportService, REQUEST_EXTENSION_UPDATE_SETTINGS ); + this.client = client; + this.extensionTransportActionsHandler = new ExtensionTransportActionsHandler(extensionIdMap, transportService, client); registerRequestHandler(); } + /** + * Handles Transport Request from {@link org.opensearch.extensions.action.ExtensionTransportAction} which was invoked by an extension via {@link ExtensionTransportActionsHandler}. + * + * @param request which was sent by an extension. + */ + public ExtensionActionResponse handleTransportRequest(ExtensionActionRequest request) throws InterruptedException { + return extensionTransportActionsHandler.sendTransportRequestToExtension(request); + } + private void registerRequestHandler() { transportService.registerRequestHandler( REQUEST_EXTENSION_REGISTER_REST_ACTIONS, @@ -202,14 +225,6 @@ private void registerRequestHandler() { ExtensionRequest::new, ((request, channel, task) -> channel.sendResponse(handleExtensionRequest(request))) ); - transportService.registerRequestHandler( - REQUEST_EXTENSION_LOCAL_NODE, - ThreadPool.Names.GENERIC, - false, - false, - ExtensionRequest::new, - ((request, channel, task) -> channel.sendResponse(handleExtensionRequest(request))) - ); transportService.registerRequestHandler( REQUEST_EXTENSION_CLUSTER_SETTINGS, ThreadPool.Names.GENERIC, @@ -242,7 +257,19 @@ private void registerRequestHandler() { false, false, RegisterTransportActionsRequest::new, - ((request, channel, task) -> channel.sendResponse(handleRegisterTransportActionsRequest(request))) + ((request, channel, task) -> channel.sendResponse( + extensionTransportActionsHandler.handleRegisterTransportActionsRequest(request) + )) + ); + transportService.registerRequestHandler( + TRANSPORT_ACTION_REQUEST_FROM_EXTENSION, + ThreadPool.Names.GENERIC, + false, + false, + TransportActionRequestFromExtension::new, + ((request, channel, task) -> channel.sendResponse( + extensionTransportActionsHandler.handleTransportActionRequestFromExtension(request) + )) ); } @@ -301,7 +328,8 @@ private void loadExtension(Extension extension) throws IOException { extension.getClassName(), new ArrayList(), Boolean.parseBoolean(extension.hasNativeController()) - ) + ), + extension.getDependencies() ); extensionIdMap.put(extension.getUniqueId(), discoveryExtensionNode); logger.info("Loaded extension with uniqueId " + extension.getUniqueId() + ": " + extension); @@ -364,7 +392,7 @@ public String executor() { initializeExtensionResponseHandler ); // TODO: make asynchronous - inProgressFuture.get(100, TimeUnit.SECONDS); + inProgressFuture.get(EXTENSION_REQUEST_WAIT_TIMEOUT, TimeUnit.SECONDS); } catch (Exception e) { try { throw e; @@ -374,22 +402,6 @@ public String executor() { } } - /** - * Handles a {@link RegisterTransportActionsRequest}. - * - * @param transportActionsRequest The request to handle. - * @return A {@link AcknowledgedResponse} indicating success. - * @throws Exception if the request is not handled properly. - */ - TransportResponse handleRegisterTransportActionsRequest(RegisterTransportActionsRequest transportActionsRequest) throws Exception { - /* - * TODO: https://github.com/opensearch-project/opensearch-sdk-java/issues/107 - * Register these new Transport Actions with ActionModule - * and add support for NodeClient to recognise these actions when making transport calls. - */ - return new AcknowledgedResponse(true); - } - /** * Handles an {@link ExtensionRequest}. * @@ -401,8 +413,6 @@ TransportResponse handleExtensionRequest(ExtensionRequest extensionRequest) thro switch (extensionRequest.getRequestType()) { case REQUEST_EXTENSION_CLUSTER_STATE: return new ClusterStateResponse(clusterService.getClusterName(), clusterService.state(), false); - case REQUEST_EXTENSION_LOCAL_NODE: - return new LocalNodeResponse(clusterService); case REQUEST_EXTENSION_CLUSTER_SETTINGS: return new ClusterSettingsResponse(clusterService); case REQUEST_EXTENSION_ENVIRONMENT_SETTINGS: @@ -477,7 +487,7 @@ public void beforeIndexRemoved( acknowledgedResponseHandler ); // TODO: make asynchronous - inProgressIndexNameFuture.get(100, TimeUnit.SECONDS); + inProgressIndexNameFuture.get(EXTENSION_REQUEST_WAIT_TIMEOUT, TimeUnit.SECONDS); logger.info("Received ack response from Extension"); } catch (Exception e) { try { @@ -513,7 +523,7 @@ public String executor() { indicesModuleResponseHandler ); // TODO: make asynchronous - inProgressFuture.get(100, TimeUnit.SECONDS); + inProgressFuture.get(EXTENSION_REQUEST_WAIT_TIMEOUT, TimeUnit.SECONDS); logger.info("Received response from Extension"); } catch (Exception e) { try { @@ -547,10 +557,6 @@ public static String getRequestExtensionClusterState() { return REQUEST_EXTENSION_CLUSTER_STATE; } - public static String getRequestExtensionLocalNode() { - return REQUEST_EXTENSION_LOCAL_NODE; - } - public static String getRequestExtensionClusterSettings() { return REQUEST_EXTENSION_CLUSTER_SETTINGS; } @@ -661,4 +667,32 @@ public void setEnvironmentSettings(Settings environmentSettings) { this.environmentSettings = environmentSettings; } + public static String getRequestExtensionHandleTransportAction() { + return REQUEST_EXTENSION_HANDLE_TRANSPORT_ACTION; + } + + public static String getTransportActionRequestFromExtension() { + return TRANSPORT_ACTION_REQUEST_FROM_EXTENSION; + } + + public static int getExtensionRequestWaitTimeout() { + return EXTENSION_REQUEST_WAIT_TIMEOUT; + } + + public ExtensionTransportActionsHandler getExtensionTransportActionsHandler() { + return extensionTransportActionsHandler; + } + + public void setExtensionTransportActionsHandler(ExtensionTransportActionsHandler extensionTransportActionsHandler) { + this.extensionTransportActionsHandler = extensionTransportActionsHandler; + } + + public NodeClient getClient() { + return client; + } + + public void setClient(NodeClient client) { + this.client = client; + } + } diff --git a/server/src/main/java/org/opensearch/extensions/ExtensionsSettings.java b/server/src/main/java/org/opensearch/extensions/ExtensionsSettings.java index 8b6226e578ea3..61ab481bc0b76 100644 --- a/server/src/main/java/org/opensearch/extensions/ExtensionsSettings.java +++ b/server/src/main/java/org/opensearch/extensions/ExtensionsSettings.java @@ -9,6 +9,7 @@ package org.opensearch.extensions; import java.util.ArrayList; +import java.util.Collections; import java.util.List; /** @@ -43,6 +44,7 @@ public static class Extension { private String className; private String customFolderName; private String hasNativeController; + private List dependencies = Collections.emptyList(); public Extension() { name = ""; @@ -184,6 +186,10 @@ public void setHasNativeController(String hasNativeController) { this.hasNativeController = hasNativeController; } + public List getDependencies() { + return dependencies; + } + } public List getExtensions() { diff --git a/server/src/main/java/org/opensearch/extensions/RegisterTransportActionsRequest.java b/server/src/main/java/org/opensearch/extensions/RegisterTransportActionsRequest.java index a3603aaf22dd0..47061f94dee83 100644 --- a/server/src/main/java/org/opensearch/extensions/RegisterTransportActionsRequest.java +++ b/server/src/main/java/org/opensearch/extensions/RegisterTransportActionsRequest.java @@ -8,6 +8,9 @@ package org.opensearch.extensions; +import org.opensearch.action.ActionRequest; +import org.opensearch.action.ActionResponse; +import org.opensearch.action.support.TransportAction; import org.opensearch.common.io.stream.StreamInput; import org.opensearch.common.io.stream.StreamOutput; import org.opensearch.transport.TransportRequest; @@ -16,6 +19,7 @@ import java.util.HashMap; import java.util.Map; import java.util.Objects; +import java.util.Map.Entry; /** * Request to register extension Transport actions @@ -23,20 +27,28 @@ * @opensearch.internal */ public class RegisterTransportActionsRequest extends TransportRequest { - private Map transportActions; + private String uniqueId; + private Map>> transportActions; - public RegisterTransportActionsRequest(Map transportActions) { + public RegisterTransportActionsRequest( + String uniqueId, + Map>> transportActions + ) { + this.uniqueId = uniqueId; this.transportActions = new HashMap<>(transportActions); } public RegisterTransportActionsRequest(StreamInput in) throws IOException { super(in); - Map actions = new HashMap<>(); + this.uniqueId = in.readString(); + Map>> actions = new HashMap<>(); int actionCount = in.readVInt(); for (int i = 0; i < actionCount; i++) { try { String actionName = in.readString(); - Class transportAction = Class.forName(in.readString()); + @SuppressWarnings("unchecked") + Class> transportAction = (Class< + ? extends TransportAction>) Class.forName(in.readString()); actions.put(actionName, transportAction); } catch (ClassNotFoundException e) { throw new IllegalArgumentException("Could not read transport action"); @@ -45,15 +57,21 @@ public RegisterTransportActionsRequest(StreamInput in) throws IOException { this.transportActions = actions; } - public Map getTransportActions() { + public String getUniqueId() { + return uniqueId; + } + + public Map>> getTransportActions() { return transportActions; } @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); + out.writeString(uniqueId); out.writeVInt(this.transportActions.size()); - for (Map.Entry action : transportActions.entrySet()) { + for (Entry>> action : transportActions + .entrySet()) { out.writeString(action.getKey()); out.writeString(action.getValue().getName()); } @@ -61,7 +79,7 @@ public void writeTo(StreamOutput out) throws IOException { @Override public String toString() { - return "TransportActionsRequest{actions=" + transportActions + "}"; + return "TransportActionsRequest{uniqueId=" + uniqueId + ", actions=" + transportActions + "}"; } @Override @@ -69,11 +87,11 @@ public boolean equals(Object obj) { if (this == obj) return true; if (obj == null || getClass() != obj.getClass()) return false; RegisterTransportActionsRequest that = (RegisterTransportActionsRequest) obj; - return Objects.equals(transportActions, that.transportActions); + return Objects.equals(uniqueId, that.uniqueId) && Objects.equals(transportActions, that.transportActions); } @Override public int hashCode() { - return Objects.hash(transportActions); + return Objects.hash(uniqueId, transportActions); } } diff --git a/server/src/main/java/org/opensearch/extensions/action/ExtensionActionRequest.java b/server/src/main/java/org/opensearch/extensions/action/ExtensionActionRequest.java new file mode 100644 index 0000000000000..801b40e847d21 --- /dev/null +++ b/server/src/main/java/org/opensearch/extensions/action/ExtensionActionRequest.java @@ -0,0 +1,76 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.extensions.action; + +import org.opensearch.action.ActionRequest; +import org.opensearch.action.ActionRequestValidationException; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.io.stream.StreamOutput; + +import java.io.IOException; + +/** + * This class translates Extension transport request to ActionRequest + * which is internally used to make transport action call. + * + * @opensearch.internal + */ +public class ExtensionActionRequest extends ActionRequest { + /** + * action is the transport action intended to be invoked which is registered by an extension via {@link ExtensionTransportActionsHandler}. + */ + private final String action; + /** + * requestBytes is the raw bytes being transported between extensions. + */ + private final byte[] requestBytes; + + /** + * ExtensionActionRequest constructor. + * + * @param action is the transport action intended to be invoked which is registered by an extension via {@link ExtensionTransportActionsHandler}. + * @param requestBytes is the raw bytes being transported between extensions. + */ + public ExtensionActionRequest(String action, byte[] requestBytes) { + this.action = action; + this.requestBytes = requestBytes; + } + + /** + * ExtensionActionRequest constructor from {@link StreamInput}. + * + * @param in bytes stream input used to de-serialize the message. + * @throws IOException when message de-serialization fails. + */ + ExtensionActionRequest(StreamInput in) throws IOException { + super(in); + action = in.readString(); + requestBytes = in.readByteArray(); + } + + public String getAction() { + return action; + } + + public byte[] getRequestBytes() { + return requestBytes; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeString(action); + out.writeByteArray(requestBytes); + } + + @Override + public ActionRequestValidationException validate() { + return null; + } +} diff --git a/server/src/main/java/org/opensearch/extensions/action/ExtensionActionResponse.java b/server/src/main/java/org/opensearch/extensions/action/ExtensionActionResponse.java new file mode 100644 index 0000000000000..68729ada48c25 --- /dev/null +++ b/server/src/main/java/org/opensearch/extensions/action/ExtensionActionResponse.java @@ -0,0 +1,59 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.extensions.action; + +import org.opensearch.action.ActionResponse; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.io.stream.StreamOutput; + +import java.io.IOException; + +/** + * This class encapsulates the transport response from extension + * + * @opensearch.internal + */ +public class ExtensionActionResponse extends ActionResponse { + /** + * responseBytes is the raw bytes being transported between extensions. + */ + private byte[] responseBytes; + + /** + * ExtensionActionResponse constructor. + * + * @param responseBytes is the raw bytes being transported between extensions. + */ + public ExtensionActionResponse(byte[] responseBytes) { + this.responseBytes = responseBytes; + } + + /** + * ExtensionActionResponse constructor from {@link StreamInput}. + * + * @param in bytes stream input used to de-serialize the message. + * @throws IOException when message de-serialization fails. + */ + public ExtensionActionResponse(StreamInput in) throws IOException { + responseBytes = in.readByteArray(); + } + + public byte[] getResponseBytes() { + return responseBytes; + } + + public void setResponseBytes(byte[] responseBytes) { + this.responseBytes = responseBytes; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeByteArray(responseBytes); + } +} diff --git a/server/src/main/java/org/opensearch/extensions/action/ExtensionHandleTransportRequest.java b/server/src/main/java/org/opensearch/extensions/action/ExtensionHandleTransportRequest.java new file mode 100644 index 0000000000000..1b946d08f0459 --- /dev/null +++ b/server/src/main/java/org/opensearch/extensions/action/ExtensionHandleTransportRequest.java @@ -0,0 +1,89 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.extensions.action; + +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.io.stream.StreamOutput; +import org.opensearch.transport.TransportRequest; + +import java.io.IOException; +import java.util.Objects; + +/** + * This class encapsulates a transport request to extension + * + * @opensearch.api + */ +public class ExtensionHandleTransportRequest extends TransportRequest { + /** + * action is the transport action intended to be invoked which is registered by an extension via {@link ExtensionTransportActionsHandler}. + */ + private final String action; + /** + * requestBytes is the raw bytes being transported between extensions. + */ + private final byte[] requestBytes; + + /** + * ExtensionHandleTransportRequest constructor. + * + * @param action is the transport action intended to be invoked which is registered by an extension via {@link ExtensionTransportActionsHandler}. + * @param requestBytes is the raw bytes being transported between extensions. + */ + public ExtensionHandleTransportRequest(String action, byte[] requestBytes) { + this.action = action; + this.requestBytes = requestBytes; + } + + /** + * ExtensionHandleTransportRequest constructor from {@link StreamInput}. + * + * @param in bytes stream input used to de-serialize the message. + * @throws IOException when message de-serialization fails. + */ + public ExtensionHandleTransportRequest(StreamInput in) throws IOException { + super(in); + this.action = in.readString(); + this.requestBytes = in.readByteArray(); + } + + public String getAction() { + return this.action; + } + + public byte[] getRequestBytes() { + return this.requestBytes; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeString(action); + out.writeByteArray(requestBytes); + } + + @Override + public String toString() { + return "ExtensionHandleTransportRequest{action=" + action + ", requestBytes=" + requestBytes + "}"; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) return true; + if (obj == null || getClass() != obj.getClass()) return false; + ExtensionHandleTransportRequest that = (ExtensionHandleTransportRequest) obj; + return Objects.equals(action, that.action) && Objects.equals(requestBytes, that.requestBytes); + } + + @Override + public int hashCode() { + return Objects.hash(action, requestBytes); + } + +} diff --git a/server/src/main/java/org/opensearch/extensions/action/ExtensionProxyAction.java b/server/src/main/java/org/opensearch/extensions/action/ExtensionProxyAction.java new file mode 100644 index 0000000000000..7345cf44e007f --- /dev/null +++ b/server/src/main/java/org/opensearch/extensions/action/ExtensionProxyAction.java @@ -0,0 +1,25 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.extensions.action; + +import org.opensearch.action.ActionType; + +/** + * The main proxy action for all extensions + * + * @opensearch.internal + */ +public class ExtensionProxyAction extends ActionType { + public static final String NAME = "cluster:internal/extensions"; + public static final ExtensionProxyAction INSTANCE = new ExtensionProxyAction(); + + public ExtensionProxyAction() { + super(NAME, ExtensionActionResponse::new); + } +} diff --git a/server/src/main/java/org/opensearch/extensions/action/ExtensionTransportAction.java b/server/src/main/java/org/opensearch/extensions/action/ExtensionTransportAction.java new file mode 100644 index 0000000000000..5976db78002eb --- /dev/null +++ b/server/src/main/java/org/opensearch/extensions/action/ExtensionTransportAction.java @@ -0,0 +1,55 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.extensions.action; + +import org.opensearch.action.ActionListener; +import org.opensearch.action.support.ActionFilters; +import org.opensearch.action.support.HandledTransportAction; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.inject.Inject; +import org.opensearch.common.settings.Settings; +import org.opensearch.extensions.ExtensionsManager; +import org.opensearch.node.Node; +import org.opensearch.tasks.Task; +import org.opensearch.transport.TransportService; + +/** + * The main proxy transport action used to proxy a transport request from extension to another extension + * + * @opensearch.internal + */ +public class ExtensionTransportAction extends HandledTransportAction { + + private final String nodeName; + private final ClusterService clusterService; + private final ExtensionsManager extensionsManager; + + @Inject + public ExtensionTransportAction( + Settings settings, + TransportService transportService, + ActionFilters actionFilters, + ClusterService clusterService, + ExtensionsManager extensionsManager + ) { + super(ExtensionProxyAction.NAME, transportService, actionFilters, ExtensionActionRequest::new); + this.nodeName = Node.NODE_NAME_SETTING.get(settings); + this.clusterService = clusterService; + this.extensionsManager = extensionsManager; + } + + @Override + protected void doExecute(Task task, ExtensionActionRequest request, ActionListener listener) { + try { + listener.onResponse(extensionsManager.handleTransportRequest(request)); + } catch (Exception e) { + listener.onFailure(e); + } + } +} diff --git a/server/src/main/java/org/opensearch/extensions/action/ExtensionTransportActionsHandler.java b/server/src/main/java/org/opensearch/extensions/action/ExtensionTransportActionsHandler.java new file mode 100644 index 0000000000000..ac3ec6630634a --- /dev/null +++ b/server/src/main/java/org/opensearch/extensions/action/ExtensionTransportActionsHandler.java @@ -0,0 +1,193 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.extensions.action; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.action.ActionListener; +import org.opensearch.client.node.NodeClient; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.extensions.DiscoveryExtensionNode; +import org.opensearch.extensions.AcknowledgedResponse; +import org.opensearch.extensions.ExtensionsManager; +import org.opensearch.extensions.RegisterTransportActionsRequest; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.transport.ActionNotFoundTransportException; +import org.opensearch.transport.TransportException; +import org.opensearch.transport.TransportResponse; +import org.opensearch.transport.TransportResponseHandler; +import org.opensearch.transport.TransportService; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +/** + * This class manages TransportActions for extensions + * + * @opensearch.internal + */ +public class ExtensionTransportActionsHandler { + private static final Logger logger = LogManager.getLogger(ExtensionTransportActionsHandler.class); + private Map actionsMap; + private final Map extensionIdMap; + private final TransportService transportService; + private final NodeClient client; + + public ExtensionTransportActionsHandler( + Map extensionIdMap, + TransportService transportService, + NodeClient client + ) { + this.actionsMap = new HashMap<>(); + this.extensionIdMap = extensionIdMap; + this.transportService = transportService; + this.client = client; + } + + /** + * Method to register actions for extensions. + * + * @param action to be registered. + * @param extension for which action is being registered. + * @throws IllegalArgumentException when action being registered already is registered. + */ + void registerAction(String action, DiscoveryExtensionNode extension) throws IllegalArgumentException { + if (actionsMap.containsKey(action)) { + throw new IllegalArgumentException("The " + action + " you are trying to register is already registered"); + } + actionsMap.putIfAbsent(action, extension); + } + + /** + * Method to get extension for a given action. + * + * @param action for which to get the registered extension. + * @return the extension. + */ + public DiscoveryExtensionNode getExtension(String action) { + return actionsMap.get(action); + } + + /** + * Handles a {@link RegisterTransportActionsRequest}. + * + * @param transportActionsRequest The request to handle. + * @return A {@link AcknowledgedResponse} indicating success. + */ + public TransportResponse handleRegisterTransportActionsRequest(RegisterTransportActionsRequest transportActionsRequest) { + /* + * We are proxying the transport Actions through ExtensionProxyAction, so we really dont need to register dynamic actions for now. + */ + logger.debug("Register Transport Actions request recieved {}", transportActionsRequest); + DiscoveryExtensionNode extension = extensionIdMap.get(transportActionsRequest.getUniqueId()); + try { + for (String action : transportActionsRequest.getTransportActions().keySet()) { + registerAction(action, extension); + } + } catch (Exception e) { + logger.error("Could not register Transport Action " + e); + return new AcknowledgedResponse(false); + } + return new AcknowledgedResponse(true); + } + + /** + * Method which handles transport action request from an extension. + * + * @param request from extension. + * @return {@link TransportResponse} which is sent back to the transport action invoker. + * @throws InterruptedException when message transport fails. + */ + public TransportResponse handleTransportActionRequestFromExtension(TransportActionRequestFromExtension request) + throws InterruptedException { + DiscoveryExtensionNode extension = extensionIdMap.get(request.getUniqueId()); + final CountDownLatch inProgressLatch = new CountDownLatch(1); + final TransportActionResponseToExtension response = new TransportActionResponseToExtension(new byte[0]); + client.execute( + ExtensionProxyAction.INSTANCE, + new ExtensionActionRequest(request.getAction(), request.getRequestBytes()), + new ActionListener() { + @Override + public void onResponse(ExtensionActionResponse actionResponse) { + response.setResponseBytes(actionResponse.getResponseBytes()); + inProgressLatch.countDown(); + } + + @Override + public void onFailure(Exception exp) { + logger.debug("Transport request failed", exp); + byte[] responseBytes = ("Request failed: " + exp.getMessage()).getBytes(StandardCharsets.UTF_8); + response.setResponseBytes(responseBytes); + inProgressLatch.countDown(); + } + } + ); + inProgressLatch.await(ExtensionsManager.EXTENSION_REQUEST_WAIT_TIMEOUT, TimeUnit.SECONDS); + return response; + } + + /** + * Method to send transport action request to an extension to handle. + * + * @param request to extension to handle transport request. + * @return {@link ExtensionActionResponse} which encapsulates the transport response from the extension. + * @throws InterruptedException when message transport fails. + */ + public ExtensionActionResponse sendTransportRequestToExtension(ExtensionActionRequest request) throws InterruptedException { + DiscoveryExtensionNode extension = actionsMap.get(request.getAction()); + if (extension == null) { + throw new ActionNotFoundTransportException(request.getAction()); + } + final CountDownLatch inProgressLatch = new CountDownLatch(1); + final ExtensionActionResponse extensionActionResponse = new ExtensionActionResponse(new byte[0]); + final TransportResponseHandler extensionActionResponseTransportResponseHandler = + new TransportResponseHandler() { + + @Override + public ExtensionActionResponse read(StreamInput in) throws IOException { + return new ExtensionActionResponse(in); + } + + @Override + public void handleResponse(ExtensionActionResponse response) { + extensionActionResponse.setResponseBytes(response.getResponseBytes()); + inProgressLatch.countDown(); + } + + @Override + public void handleException(TransportException exp) { + logger.debug("Transport request failed", exp); + byte[] responseBytes = ("Request failed: " + exp.getMessage()).getBytes(StandardCharsets.UTF_8); + extensionActionResponse.setResponseBytes(responseBytes); + inProgressLatch.countDown(); + } + + @Override + public String executor() { + return ThreadPool.Names.GENERIC; + } + }; + try { + transportService.sendRequest( + extension, + ExtensionsManager.REQUEST_EXTENSION_HANDLE_TRANSPORT_ACTION, + new ExtensionHandleTransportRequest(request.getAction(), request.getRequestBytes()), + extensionActionResponseTransportResponseHandler + ); + } catch (Exception e) { + logger.info("Failed to send transport action to extension " + extension.getName(), e); + } + inProgressLatch.await(ExtensionsManager.EXTENSION_REQUEST_WAIT_TIMEOUT, TimeUnit.SECONDS); + return extensionActionResponse; + } +} diff --git a/server/src/main/java/org/opensearch/extensions/action/TransportActionRequestFromExtension.java b/server/src/main/java/org/opensearch/extensions/action/TransportActionRequestFromExtension.java new file mode 100644 index 0000000000000..df494297559b3 --- /dev/null +++ b/server/src/main/java/org/opensearch/extensions/action/TransportActionRequestFromExtension.java @@ -0,0 +1,102 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.extensions.action; + +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.io.stream.StreamOutput; +import org.opensearch.transport.TransportRequest; + +import java.io.IOException; +import java.util.Objects; + +/** + * Transport Action Request from Extension + * + * @opensearch.api + */ +public class TransportActionRequestFromExtension extends TransportRequest { + /** + * action is the transport action intended to be invoked which is registered by an extension via {@link ExtensionTransportActionsHandler}. + */ + private final String action; + /** + * requestBytes is the raw bytes being transported between extensions. + */ + private final byte[] requestBytes; + /** + * uniqueId to identify which extension is making a transport request call. + */ + private final String uniqueId; + + /** + * TransportActionRequestFromExtension constructor. + * + * @param action is the transport action intended to be invoked which is registered by an extension via {@link ExtensionTransportActionsHandler}. + * @param requestBytes is the raw bytes being transported between extensions. + * @param uniqueId to identify which extension is making a transport request call. + */ + public TransportActionRequestFromExtension(String action, byte[] requestBytes, String uniqueId) { + this.action = action; + this.requestBytes = requestBytes; + this.uniqueId = uniqueId; + } + + /** + * TransportActionRequestFromExtension constructor from {@link StreamInput}. + * + * @param in bytes stream input used to de-serialize the message. + * @throws IOException when message de-serialization fails. + */ + public TransportActionRequestFromExtension(StreamInput in) throws IOException { + super(in); + this.action = in.readString(); + this.requestBytes = in.readByteArray(); + this.uniqueId = in.readString(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeString(action); + out.writeByteArray(requestBytes); + out.writeString(uniqueId); + } + + public String getAction() { + return this.action; + } + + public byte[] getRequestBytes() { + return this.requestBytes; + } + + public String getUniqueId() { + return this.uniqueId; + } + + @Override + public String toString() { + return "TransportActionRequestFromExtension{action=" + action + ", requestBytes=" + requestBytes + ", uniqueId=" + uniqueId + "}"; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) return true; + if (obj == null || getClass() != obj.getClass()) return false; + TransportActionRequestFromExtension that = (TransportActionRequestFromExtension) obj; + return Objects.equals(action, that.action) + && Objects.equals(requestBytes, that.requestBytes) + && Objects.equals(uniqueId, that.uniqueId); + } + + @Override + public int hashCode() { + return Objects.hash(action, requestBytes, uniqueId); + } +} diff --git a/server/src/main/java/org/opensearch/extensions/action/TransportActionResponseToExtension.java b/server/src/main/java/org/opensearch/extensions/action/TransportActionResponseToExtension.java new file mode 100644 index 0000000000000..2913402bcd5e1 --- /dev/null +++ b/server/src/main/java/org/opensearch/extensions/action/TransportActionResponseToExtension.java @@ -0,0 +1,58 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.extensions.action; + +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.io.stream.StreamOutput; +import org.opensearch.transport.TransportResponse; + +import java.io.IOException; + +/** + * This class encapsulates transport response to extension. + * + * @opensearch.api + */ +public class TransportActionResponseToExtension extends TransportResponse { + /** + * responseBytes is the raw bytes being transported between extensions. + */ + private byte[] responseBytes; + + /** + * TransportActionResponseToExtension constructor. + * + * @param responseBytes is the raw bytes being transported between extensions. + */ + public TransportActionResponseToExtension(byte[] responseBytes) { + this.responseBytes = responseBytes; + } + + /** + * TransportActionResponseToExtension constructor from {@link StreamInput} + * @param in bytes stream input used to de-serialize the message. + * @throws IOException when message de-serialization fails. + */ + public TransportActionResponseToExtension(StreamInput in) throws IOException { + this.responseBytes = in.readByteArray(); + } + + public void setResponseBytes(byte[] responseBytes) { + this.responseBytes = responseBytes; + } + + public byte[] getResponseBytes() { + return responseBytes; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeByteArray(responseBytes); + } +} diff --git a/server/src/main/java/org/opensearch/extensions/action/package-info.java b/server/src/main/java/org/opensearch/extensions/action/package-info.java new file mode 100644 index 0000000000000..9bad08eaeb921 --- /dev/null +++ b/server/src/main/java/org/opensearch/extensions/action/package-info.java @@ -0,0 +1,10 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** Actions classes for the extensions package. OpenSearch extensions provide extensibility to OpenSearch.*/ +package org.opensearch.extensions.action; diff --git a/server/src/main/java/org/opensearch/extensions/rest/ExtensionRestRequest.java b/server/src/main/java/org/opensearch/extensions/rest/ExtensionRestRequest.java new file mode 100644 index 0000000000000..da59578b4917e --- /dev/null +++ b/server/src/main/java/org/opensearch/extensions/rest/ExtensionRestRequest.java @@ -0,0 +1,292 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.extensions.rest; + +import org.opensearch.OpenSearchParseException; +import org.opensearch.common.bytes.BytesReference; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.io.stream.StreamOutput; +import org.opensearch.common.xcontent.LoggingDeprecationHandler; +import org.opensearch.common.xcontent.NamedXContentRegistry; +import org.opensearch.common.xcontent.XContentParser; +import org.opensearch.common.xcontent.XContentType; +import org.opensearch.rest.RestRequest; +import org.opensearch.rest.RestRequest.Method; +import org.opensearch.transport.TransportRequest; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; + +/** + * Request to execute REST actions on extension node. + * This contains necessary portions of a {@link RestRequest} object, but does not pass the full request for security concerns. + * + * @opensearch.api + */ +public class ExtensionRestRequest extends TransportRequest { + + private Method method; + private String path; + private Map params; + private XContentType xContentType = null; + private BytesReference content; + // The owner of this request object + // Will be replaced with PrincipalIdentifierToken class from feature/identity + private String principalIdentifierToken; + + // Tracks consumed parameters and content + private final Set consumedParams = new HashSet<>(); + private boolean contentConsumed = false; + + /** + * This object can be instantiated given method, uri, params, content and identifier + * + * @param method of type {@link Method} + * @param path the REST path string (excluding the query) + * @param params the REST params + * @param xContentType the content type, or null for plain text or no content + * @param content the REST request content + * @param principalIdentifier the owner of this request + */ + public ExtensionRestRequest( + Method method, + String path, + Map params, + XContentType xContentType, + BytesReference content, + String principalIdentifier + ) { + this.method = method; + this.path = path; + this.params = params; + this.xContentType = xContentType; + this.content = content; + this.principalIdentifierToken = principalIdentifier; + } + + /** + * Instantiate this request from input stream + * + * @param in Input stream + * @throws IOException on failure to read the stream + */ + public ExtensionRestRequest(StreamInput in) throws IOException { + super(in); + method = in.readEnum(RestRequest.Method.class); + path = in.readString(); + params = in.readMap(StreamInput::readString, StreamInput::readString); + if (in.readBoolean()) { + xContentType = in.readEnum(XContentType.class); + } + content = in.readBytesReference(); + principalIdentifierToken = in.readString(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeEnum(method); + out.writeString(path); + out.writeMap(params, StreamOutput::writeString, StreamOutput::writeString); + out.writeBoolean(xContentType != null); + if (xContentType != null) { + out.writeEnum(xContentType); + } + out.writeBytesReference(content); + out.writeString(principalIdentifierToken); + } + + /** + * Gets the REST method + * + * @return This REST request {@link Method} type + */ + public Method method() { + return method; + } + + /** + * Gets the REST path + * + * @return This REST request's path + */ + public String path() { + return path; + } + + /** + * Gets the full map of params without consuming them. Rest Handlers should use {@link #param(String)} or {@link #param(String, String)} + * to get parameter values. + * + * @return This REST request's params + */ + public Map params() { + return params; + } + + /** + * Tests whether a parameter named {@code key} exists. + * + * @param key The parameter to test. + * @return True if there is a value for this parameter. + */ + public boolean hasParam(String key) { + return params.containsKey(key); + } + + /** + * Gets the value of a parameter, consuming it in the process. + * + * @param key The parameter key + * @return The parameter value if it exists, or null. + */ + public String param(String key) { + consumedParams.add(key); + return params.get(key); + } + + /** + * Gets the value of a parameter, consuming it in the process. + * + * @param key The parameter key + * @param defaultValue A value to return if the parameter value doesn't exist. + * @return The parameter value if it exists, or the default value. + */ + public String param(String key, String defaultValue) { + consumedParams.add(key); + return params.getOrDefault(key, defaultValue); + } + + /** + * Gets the value of a parameter as a long, consuming it in the process. + * + * @param key The parameter key + * @param defaultValue A value to return if the parameter value doesn't exist. + * @return The parameter value if it exists, or the default value. + */ + public long paramAsLong(String key, long defaultValue) { + String value = param(key); + if (value == null) { + return defaultValue; + } + try { + return Long.parseLong(value); + } catch (NumberFormatException e) { + throw new IllegalArgumentException("Unable to parse param '" + key + "' value '" + value + "' to a long.", e); + } + } + + /** + * Returns parameters consumed by {@link #param(String)} or {@link #param(String, String)}. + * + * @return a list of consumed parameters. + */ + public List consumedParams() { + return new ArrayList<>(consumedParams); + } + + /** + * Gets the content type, if any. + * + * @return the content type of the {@link #content()}, or null if the context is plain text or if there is no content. + */ + public XContentType getXContentType() { + return xContentType; + } + + /** + * Gets the content. + * + * @return This REST request's content + */ + public BytesReference content() { + contentConsumed = true; + return content; + } + + /** + * Tests whether content exists. + * + * @return True if there is non-empty content. + */ + public boolean hasContent() { + return content.length() > 0; + } + + /** + * Tests whether content has been consumed. + * + * @return True if the content was consumed. + */ + public boolean isContentConsumed() { + return contentConsumed; + } + + /** + * Gets a parser for the contents of this request if there is content and an xContentType. + * + * @param xContentRegistry The extension's xContentRegistry + * @return A parser for the given content and content type. + * @throws OpenSearchParseException on missing body or xContentType. + * @throws IOException on a failure creating the parser. + */ + public final XContentParser contentParser(NamedXContentRegistry xContentRegistry) throws IOException { + if (!hasContent() || getXContentType() == null) { + throw new OpenSearchParseException("There is no request body or the ContentType is invalid."); + } + return getXContentType().xContent().createParser(xContentRegistry, LoggingDeprecationHandler.INSTANCE, content.streamInput()); + } + + /** + * @return This REST request issuer's identity token + */ + public String getRequestIssuerIdentity() { + return principalIdentifierToken; + } + + @Override + public String toString() { + return "ExtensionRestRequest{method=" + + method + + ", path=" + + path + + ", params=" + + params + + ", xContentType=" + + xContentType + + ", contentLength=" + + content.length() + + ", requester=" + + principalIdentifierToken + + "}"; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) return true; + if (obj == null || getClass() != obj.getClass()) return false; + ExtensionRestRequest that = (ExtensionRestRequest) obj; + return Objects.equals(method, that.method) + && Objects.equals(path, that.path) + && Objects.equals(params, that.params) + && Objects.equals(xContentType, that.xContentType) + && Objects.equals(content, that.content) + && Objects.equals(principalIdentifierToken, that.principalIdentifierToken); + } + + @Override + public int hashCode() { + return Objects.hash(method, path, params, xContentType, content, principalIdentifierToken); + } +} diff --git a/server/src/main/java/org/opensearch/extensions/rest/ExtensionRestResponse.java b/server/src/main/java/org/opensearch/extensions/rest/ExtensionRestResponse.java new file mode 100644 index 0000000000000..0eb59823bee93 --- /dev/null +++ b/server/src/main/java/org/opensearch/extensions/rest/ExtensionRestResponse.java @@ -0,0 +1,113 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.extensions.rest; + +import org.opensearch.common.bytes.BytesReference; +import org.opensearch.common.xcontent.XContentBuilder; +import org.opensearch.rest.BytesRestResponse; +import org.opensearch.rest.RestStatus; + +import java.util.List; + +/** + * A subclass of {@link BytesRestResponse} which also tracks consumed parameters and content. + * + * @opensearch.api + */ +public class ExtensionRestResponse extends BytesRestResponse { + + private final List consumedParams; + private final boolean contentConsumed; + + /** + * Creates a new response based on {@link XContentBuilder}. + * + * @param request the REST request being responded to. + * @param status The REST status. + * @param builder The builder for the response. + */ + public ExtensionRestResponse(ExtensionRestRequest request, RestStatus status, XContentBuilder builder) { + super(status, builder); + this.consumedParams = request.consumedParams(); + this.contentConsumed = request.isContentConsumed(); + } + + /** + * Creates a new plain text response. + * + * @param request the REST request being responded to. + * @param status The REST status. + * @param content A plain text response string. + */ + public ExtensionRestResponse(ExtensionRestRequest request, RestStatus status, String content) { + super(status, content); + this.consumedParams = request.consumedParams(); + this.contentConsumed = request.isContentConsumed(); + } + + /** + * Creates a new plain text response. + * + * @param request the REST request being responded to. + * @param status The REST status. + * @param contentType The content type of the response string. + * @param content A response string. + */ + public ExtensionRestResponse(ExtensionRestRequest request, RestStatus status, String contentType, String content) { + super(status, contentType, content); + this.consumedParams = request.consumedParams(); + this.contentConsumed = request.isContentConsumed(); + } + + /** + * Creates a binary response. + * + * @param request the REST request being responded to. + * @param status The REST status. + * @param contentType The content type of the response bytes. + * @param content Response bytes. + */ + public ExtensionRestResponse(ExtensionRestRequest request, RestStatus status, String contentType, byte[] content) { + super(status, contentType, content); + this.consumedParams = request.consumedParams(); + this.contentConsumed = request.isContentConsumed(); + } + + /** + * Creates a binary response. + * + * @param request the REST request being responded to. + * @param status The REST status. + * @param contentType The content type of the response bytes. + * @param content Response bytes. + */ + public ExtensionRestResponse(ExtensionRestRequest request, RestStatus status, String contentType, BytesReference content) { + super(status, contentType, content); + this.consumedParams = request.consumedParams(); + this.contentConsumed = request.isContentConsumed(); + } + + /** + * Gets the list of consumed parameters. These are needed to consume the parameters of the original request. + * + * @return the list of consumed params. + */ + public List getConsumedParams() { + return consumedParams; + } + + /** + * Reports whether content was consumed. + * + * @return true if the content was consumed, false otherwise. + */ + public boolean isContentConsumed() { + return contentConsumed; + } +} diff --git a/server/src/main/java/org/opensearch/extensions/rest/RestExecuteOnExtensionRequest.java b/server/src/main/java/org/opensearch/extensions/rest/RestExecuteOnExtensionRequest.java deleted file mode 100644 index 128dad2645b42..0000000000000 --- a/server/src/main/java/org/opensearch/extensions/rest/RestExecuteOnExtensionRequest.java +++ /dev/null @@ -1,77 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.extensions.rest; - -import org.opensearch.common.io.stream.StreamInput; -import org.opensearch.common.io.stream.StreamOutput; -import org.opensearch.rest.RestRequest; -import org.opensearch.rest.RestRequest.Method; -import org.opensearch.transport.TransportRequest; - -import java.io.IOException; -import java.util.Objects; - -/** - * Request to execute REST actions on extension node - * - * @opensearch.internal - */ -public class RestExecuteOnExtensionRequest extends TransportRequest { - - private Method method; - private String uri; - - public RestExecuteOnExtensionRequest(Method method, String uri) { - this.method = method; - this.uri = uri; - } - - public RestExecuteOnExtensionRequest(StreamInput in) throws IOException { - super(in); - try { - method = RestRequest.Method.valueOf(in.readString()); - } catch (IllegalArgumentException e) { - throw new IOException(e); - } - uri = in.readString(); - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); - out.writeString(method.name()); - out.writeString(uri); - } - - public Method getMethod() { - return method; - } - - public String getUri() { - return uri; - } - - @Override - public String toString() { - return "RestExecuteOnExtensionRequest{method=" + method + ", uri=" + uri + "}"; - } - - @Override - public boolean equals(Object obj) { - if (this == obj) return true; - if (obj == null || getClass() != obj.getClass()) return false; - RestExecuteOnExtensionRequest that = (RestExecuteOnExtensionRequest) obj; - return Objects.equals(method, that.method) && Objects.equals(uri, that.uri); - } - - @Override - public int hashCode() { - return Objects.hash(method, uri); - } -} diff --git a/server/src/main/java/org/opensearch/extensions/rest/RestExecuteOnExtensionResponse.java b/server/src/main/java/org/opensearch/extensions/rest/RestExecuteOnExtensionResponse.java index b7d7aae3faaab..e2625105e705c 100644 --- a/server/src/main/java/org/opensearch/extensions/rest/RestExecuteOnExtensionResponse.java +++ b/server/src/main/java/org/opensearch/extensions/rest/RestExecuteOnExtensionResponse.java @@ -10,14 +10,11 @@ import org.opensearch.common.io.stream.StreamInput; import org.opensearch.common.io.stream.StreamOutput; -import org.opensearch.rest.BytesRestResponse; import org.opensearch.rest.RestResponse; import org.opensearch.rest.RestStatus; import org.opensearch.transport.TransportResponse; import java.io.IOException; -import java.nio.charset.StandardCharsets; -import java.util.Collections; import java.util.List; import java.util.Map; @@ -27,20 +24,13 @@ * @opensearch.internal */ public class RestExecuteOnExtensionResponse extends TransportResponse { + private RestStatus status; private String contentType; private byte[] content; private Map> headers; - - /** - * Instantiate this object with a status and response string. - * - * @param status The REST status. - * @param responseString The response content as a String. - */ - public RestExecuteOnExtensionResponse(RestStatus status, String responseString) { - this(status, BytesRestResponse.TEXT_CONTENT_TYPE, responseString.getBytes(StandardCharsets.UTF_8), Collections.emptyMap()); - } + private List consumedParams; + private boolean contentConsumed; /** * Instantiate this object with the components of a {@link RestResponse}. @@ -49,33 +39,49 @@ public RestExecuteOnExtensionResponse(RestStatus status, String responseString) * @param contentType The type of the content. * @param content The content. * @param headers The headers. + * @param consumedParams The consumed params. + * @param contentConsumed Whether content was consumed. */ - public RestExecuteOnExtensionResponse(RestStatus status, String contentType, byte[] content, Map> headers) { + public RestExecuteOnExtensionResponse( + RestStatus status, + String contentType, + byte[] content, + Map> headers, + List consumedParams, + boolean contentConsumed + ) { + super(); setStatus(status); setContentType(contentType); setContent(content); setHeaders(headers); + setConsumedParams(consumedParams); + setContentConsumed(contentConsumed); } /** - * Instantiate this object from a Transport Stream + * Instantiate this object from a Transport Stream. * * @param in The stream input. * @throws IOException on transport failure. */ public RestExecuteOnExtensionResponse(StreamInput in) throws IOException { - setStatus(RestStatus.readFrom(in)); + setStatus(in.readEnum(RestStatus.class)); setContentType(in.readString()); setContent(in.readByteArray()); setHeaders(in.readMapOfLists(StreamInput::readString, StreamInput::readString)); + setConsumedParams(in.readStringList()); + setContentConsumed(in.readBoolean()); } @Override public void writeTo(StreamOutput out) throws IOException { - RestStatus.writeTo(out, status); + out.writeEnum(status); out.writeString(contentType); out.writeByteArray(content); out.writeMapOfLists(headers, StreamOutput::writeString, StreamOutput::writeString); + out.writeStringCollection(consumedParams); + out.writeBoolean(contentConsumed); } public RestStatus getStatus() { @@ -109,4 +115,20 @@ public Map> getHeaders() { public void setHeaders(Map> headers) { this.headers = Map.copyOf(headers); } + + public List getConsumedParams() { + return consumedParams; + } + + public void setConsumedParams(List consumedParams) { + this.consumedParams = consumedParams; + } + + public boolean isContentConsumed() { + return contentConsumed; + } + + public void setContentConsumed(boolean contentConsumed) { + this.contentConsumed = contentConsumed; + } } diff --git a/server/src/main/java/org/opensearch/extensions/rest/RestSendToExtensionAction.java b/server/src/main/java/org/opensearch/extensions/rest/RestSendToExtensionAction.java index d08a74c0ba314..38e92ed604a09 100644 --- a/server/src/main/java/org/opensearch/extensions/rest/RestSendToExtensionAction.java +++ b/server/src/main/java/org/opensearch/extensions/rest/RestSendToExtensionAction.java @@ -11,7 +11,9 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.opensearch.client.node.NodeClient; +import org.opensearch.common.bytes.BytesReference; import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.xcontent.XContentType; import org.opensearch.extensions.DiscoveryExtensionNode; import org.opensearch.extensions.ExtensionsManager; import org.opensearch.rest.BaseRestHandler; @@ -26,14 +28,13 @@ import java.io.IOException; import java.nio.charset.StandardCharsets; +import java.security.Principal; import java.util.ArrayList; import java.util.List; import java.util.Map; -import java.util.Map.Entry; -import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; - +import static java.util.Collections.emptyList; import static java.util.Collections.emptyMap; import static java.util.Collections.unmodifiableList; @@ -44,17 +45,23 @@ public class RestSendToExtensionAction extends BaseRestHandler { private static final String SEND_TO_EXTENSION_ACTION = "send_to_extension_action"; private static final Logger logger = LogManager.getLogger(RestSendToExtensionAction.class); - private static final String CONSUMED_PARAMS_KEY = "extension.consumed.parameters"; + // To replace with user identity see https://github.com/opensearch-project/OpenSearch/pull/4247 + private static final Principal DEFAULT_PRINCIPAL = new Principal() { + @Override + public String getName() { + return "OpenSearchUser"; + } + }; private final List routes; - private final String uriPrefix; + private final String pathPrefix; private final DiscoveryExtensionNode discoveryExtensionNode; private final TransportService transportService; /** * Instantiates this object using a {@link RegisterRestActionsRequest} to populate the routes. * - * @param restActionsRequest A request encapsulating a list of Strings with the API methods and URIs. + * @param restActionsRequest A request encapsulating a list of Strings with the API methods and paths. * @param transportService The OpenSearch transport service * @param discoveryExtensionNode The extension node to which to send actions */ @@ -63,20 +70,20 @@ public RestSendToExtensionAction( DiscoveryExtensionNode discoveryExtensionNode, TransportService transportService ) { - this.uriPrefix = "/_extensions/_" + restActionsRequest.getUniqueId(); + this.pathPrefix = "/_extensions/_" + restActionsRequest.getUniqueId(); List restActionsAsRoutes = new ArrayList<>(); for (String restAction : restActionsRequest.getRestActions()) { RestRequest.Method method; - String uri; + String path; try { int delim = restAction.indexOf(' '); method = RestRequest.Method.valueOf(restAction.substring(0, delim)); - uri = uriPrefix + restAction.substring(delim).trim(); + path = pathPrefix + restAction.substring(delim).trim(); } catch (IndexOutOfBoundsException | IllegalArgumentException e) { throw new IllegalArgumentException(restAction + " does not begin with a valid REST method"); } - logger.info("Registering: " + method + " " + uri); - restActionsAsRoutes.add(new Route(method, uri)); + logger.info("Registering: " + method + " " + path); + restActionsAsRoutes.add(new Route(method, path)); } this.routes = unmodifiableList(restActionsAsRoutes); this.discoveryExtensionNode = discoveryExtensionNode; @@ -95,21 +102,27 @@ public List routes() { @Override public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException { - Method method = request.getHttpRequest().method(); - String uri = request.getHttpRequest().uri(); - if (uri.startsWith(uriPrefix)) { - uri = uri.substring(uriPrefix.length()); + Method method = request.method(); + String path = request.path(); + Map params = request.params(); + XContentType contentType = request.getXContentType(); + BytesReference content = request.content(); + + if (path.startsWith(pathPrefix)) { + path = path.substring(pathPrefix.length()); } - String message = "Forwarding the request " + method + " " + uri + " to " + discoveryExtensionNode; + String message = "Forwarding the request " + method + " " + path + " to " + discoveryExtensionNode; logger.info(message); // Initialize response. Values will be changed in the handler. final RestExecuteOnExtensionResponse restExecuteOnExtensionResponse = new RestExecuteOnExtensionResponse( RestStatus.INTERNAL_SERVER_ERROR, BytesRestResponse.TEXT_CONTENT_TYPE, message.getBytes(StandardCharsets.UTF_8), - emptyMap() + emptyMap(), + emptyList(), + false ); - final CompletableFuture inProgressFuture = new CompletableFuture<>(); + final CountDownLatch inProgressLatch = new CountDownLatch(1); final TransportResponseHandler restExecuteOnExtensionResponseHandler = new TransportResponseHandler< RestExecuteOnExtensionResponse>() { @@ -124,27 +137,21 @@ public void handleResponse(RestExecuteOnExtensionResponse response) { restExecuteOnExtensionResponse.setStatus(response.getStatus()); restExecuteOnExtensionResponse.setContentType(response.getContentType()); restExecuteOnExtensionResponse.setContent(response.getContent()); - // Extract the consumed parameters from the header - Map> headers = response.getHeaders(); - List consumedParams = headers.get(CONSUMED_PARAMS_KEY); - if (consumedParams != null) { - consumedParams.stream().forEach(p -> request.param(p)); + restExecuteOnExtensionResponse.setHeaders(response.getHeaders()); + // Consume parameters and content + response.getConsumedParams().stream().forEach(p -> request.param(p)); + if (response.isContentConsumed()) { + request.content(); } - Map> headersWithoutConsumedParams = headers.entrySet() - .stream() - .filter(e -> !e.getKey().equals(CONSUMED_PARAMS_KEY)) - .collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue())); - restExecuteOnExtensionResponse.setHeaders(headersWithoutConsumedParams); - inProgressFuture.complete(response); } @Override public void handleException(TransportException exp) { - logger.error("REST request failed", exp); + logger.debug("REST request failed", exp); // Status is already defaulted to 500 (INTERNAL_SERVER_ERROR) byte[] responseBytes = ("Request failed: " + exp.getMessage()).getBytes(StandardCharsets.UTF_8); restExecuteOnExtensionResponse.setContent(responseBytes); - inProgressFuture.completeExceptionally(exp); + inProgressLatch.countDown(); } @Override @@ -153,17 +160,20 @@ public String executor() { } }; try { + // Will be replaced with ExtensionTokenProcessor and PrincipalIdentifierToken classes from feature/identity + final String extensionTokenProcessor = "placeholder_token_processor"; + final String requestIssuerIdentity = "placeholder_request_issuer_identity"; + transportService.sendRequest( discoveryExtensionNode, ExtensionsManager.REQUEST_REST_EXECUTE_ON_EXTENSION_ACTION, // HERE BE DRAGONS - DO NOT INCLUDE HEADERS // SEE https://github.com/opensearch-project/OpenSearch/issues/4429 - new RestExecuteOnExtensionRequest(method, uri), + new ExtensionRestRequest(method, path, params, contentType, content, requestIssuerIdentity), restExecuteOnExtensionResponseHandler ); try { - // TODO: make asynchronous - inProgressFuture.get(5, TimeUnit.SECONDS); + inProgressLatch.await(5, TimeUnit.SECONDS); } catch (InterruptedException e) { return channel -> channel.sendResponse( new BytesRestResponse(RestStatus.REQUEST_TIMEOUT, "No response from extension to request.") @@ -177,11 +187,11 @@ public String executor() { restExecuteOnExtensionResponse.getContentType(), restExecuteOnExtensionResponse.getContent() ); - for (Entry> headerEntry : restExecuteOnExtensionResponse.getHeaders().entrySet()) { - for (String value : headerEntry.getValue()) { - restResponse.addHeader(headerEntry.getKey(), value); - } - } + // No constructor that includes headers so we roll our own + restExecuteOnExtensionResponse.getHeaders() + .entrySet() + .stream() + .forEach(e -> { e.getValue().stream().forEach(v -> restResponse.addHeader(e.getKey(), v)); }); return channel -> channel.sendResponse(restResponse); } diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index e127826921fe9..46270230ccf27 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -833,7 +833,8 @@ protected Node( settingsModule, transportService, clusterService, - environment.settings() + environment.settings(), + client ); } final GatewayMetaState gatewayMetaState = new GatewayMetaState(); @@ -1001,6 +1002,9 @@ protected Node( b.bind(Client.class).toInstance(client); b.bind(NodeClient.class).toInstance(client); b.bind(Environment.class).toInstance(this.environment); + if (FeatureFlags.isEnabled(FeatureFlags.EXTENSIONS)) { + b.bind(ExtensionsManager.class).toInstance(this.extensionsManager); + } b.bind(ThreadPool.class).toInstance(threadPool); b.bind(NodeEnvironment.class).toInstance(nodeEnvironment); b.bind(ResourceWatcherService.class).toInstance(resourceWatcherService); diff --git a/server/src/test/java/org/opensearch/extensions/ExtensionsManagerTests.java b/server/src/test/java/org/opensearch/extensions/ExtensionsManagerTests.java index 472899db7dad6..47820ee739c49 100644 --- a/server/src/test/java/org/opensearch/extensions/ExtensionsManagerTests.java +++ b/server/src/test/java/org/opensearch/extensions/ExtensionsManagerTests.java @@ -42,7 +42,6 @@ import org.opensearch.action.admin.cluster.state.ClusterStateResponse; import org.opensearch.client.node.NodeClient; import org.opensearch.cluster.ClusterSettingsResponse; -import org.opensearch.cluster.LocalNodeResponse; import org.opensearch.env.EnvironmentSettingsResponse; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.metadata.IndexNameExpressionResolver; @@ -79,6 +78,7 @@ import org.opensearch.test.IndexSettingsModule; import org.opensearch.test.MockLogAppender; import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.test.client.NoOpNodeClient; import org.opensearch.test.transport.MockTransportService; import org.opensearch.threadpool.TestThreadPool; import org.opensearch.threadpool.ThreadPool; @@ -94,6 +94,7 @@ public class ExtensionsManagerTests extends OpenSearchTestCase { private RestController restController; private SettingsModule settingsModule; private ClusterService clusterService; + private NodeClient client; private MockNioTransport transport; private Path extensionDir; private final ThreadPool threadPool = new TestThreadPool(ExtensionsManagerTests.class.getSimpleName()); @@ -126,7 +127,10 @@ public class ExtensionsManagerTests extends OpenSearchTestCase { " javaVersion: '17'", " className: fakeClass2", " customFolderName: fakeFolder2", - " hasNativeController: true" + " hasNativeController: true", + " dependencies:", + " - uniqueId: 'uniqueid0'", + " - version: '2.0.0'" ); private DiscoveryExtensionNode extensionNode; @@ -190,8 +194,10 @@ public void setup() throws Exception { "fakeClass1", new ArrayList(), false - ) + ), + Collections.emptyList() ); + client = new NoOpNodeClient(this.getTestName()); } @Override @@ -199,6 +205,7 @@ public void setup() throws Exception { public void tearDown() throws Exception { super.tearDown(); transportService.close(); + client.close(); ThreadPool.terminate(threadPool, 30, TimeUnit.SECONDS); } @@ -209,6 +216,10 @@ public void testDiscover() throws Exception { List expectedUninitializedExtensions = new ArrayList(); + String expectedUniqueId = "uniqueid0"; + Version expectedVersion = Version.fromString("2.0.0"); + ExtensionDependency expectedDependency = new ExtensionDependency(expectedUniqueId, expectedVersion); + expectedUninitializedExtensions.add( new DiscoveryExtensionNode( "firstExtension", @@ -228,7 +239,8 @@ public void testDiscover() throws Exception { "fakeClass1", new ArrayList(), false - ) + ), + Collections.emptyList() ) ); @@ -251,10 +263,12 @@ public void testDiscover() throws Exception { "fakeClass2", new ArrayList(), true - ) + ), + List.of(expectedDependency) ) ); assertEquals(expectedUninitializedExtensions.size(), extensionsManager.getExtensionIdMap().values().size()); + assertEquals(List.of(expectedDependency), expectedUninitializedExtensions.get(1).getDependencies()); assertTrue(expectedUninitializedExtensions.containsAll(extensionsManager.getExtensionIdMap().values())); assertTrue(extensionsManager.getExtensionIdMap().values().containsAll(expectedUninitializedExtensions)); } @@ -289,12 +303,74 @@ public void testNonUniqueExtensionsDiscovery() throws Exception { "fakeClass1", new ArrayList(), false - ) + ), + Collections.emptyList() ) ); assertEquals(expectedUninitializedExtensions.size(), extensionsManager.getExtensionIdMap().values().size()); assertTrue(expectedUninitializedExtensions.containsAll(extensionsManager.getExtensionIdMap().values())); assertTrue(extensionsManager.getExtensionIdMap().values().containsAll(expectedUninitializedExtensions)); + assertTrue(expectedUninitializedExtensions.containsAll(emptyList())); + } + + public void testDiscoveryExtension() throws Exception { + String expectedId = "test id"; + Version expectedVersion = Version.fromString("2.0.0"); + ExtensionDependency expectedDependency = new ExtensionDependency(expectedId, expectedVersion); + + DiscoveryExtensionNode discoveryExtensionNode = new DiscoveryExtensionNode( + "firstExtension", + "uniqueid1", + "uniqueid1", + "myIndependentPluginHost1", + "127.0.0.0", + new TransportAddress(InetAddress.getByName("127.0.0.0"), 9300), + new HashMap(), + Version.fromString("3.0.0"), + new PluginInfo( + "firstExtension", + "Fake description 1", + "0.0.7", + Version.fromString("3.0.0"), + "14", + "fakeClass1", + new ArrayList(), + false + ), + List.of(expectedDependency) + ); + + assertEquals(List.of(expectedDependency), discoveryExtensionNode.getDependencies()); + + try (BytesStreamOutput out = new BytesStreamOutput()) { + discoveryExtensionNode.writeTo(out); + out.flush(); + try (BytesStreamInput in = new BytesStreamInput(BytesReference.toBytes(out.bytes()))) { + discoveryExtensionNode = new DiscoveryExtensionNode(in); + + assertEquals(List.of(expectedDependency), discoveryExtensionNode.getDependencies()); + } + } + } + + public void testExtensionDependency() throws Exception { + String expectedUniqueId = "Test uniqueId"; + Version expectedVersion = Version.fromString("3.0.0"); + + ExtensionDependency dependency = new ExtensionDependency(expectedUniqueId, expectedVersion); + + assertEquals(expectedUniqueId, dependency.getUniqueId()); + assertEquals(expectedVersion, dependency.getVersion()); + + try (BytesStreamOutput out = new BytesStreamOutput()) { + dependency.writeTo(out); + out.flush(); + try (BytesStreamInput in = new BytesStreamInput(BytesReference.toBytes(out.bytes()))) { + dependency = new ExtensionDependency(in); + assertEquals(expectedUniqueId, dependency.getUniqueId()); + assertEquals(expectedVersion, dependency.getVersion()); + } + } } public void testNonAccessibleDirectory() throws Exception { @@ -339,12 +415,9 @@ public void testEmptyExtensionsFile() throws Exception { public void testInitialize() throws Exception { Files.write(extensionDir.resolve("extensions.yml"), extensionsYmlLines, StandardCharsets.UTF_8); - ExtensionsManager extensionsManager = new ExtensionsManager(settings, extensionDir); - transportService.start(); - transportService.acceptIncomingRequests(); - extensionsManager.initializeServicesAndRestHandler(restController, settingsModule, transportService, clusterService, settings); + initialize(extensionsManager); try (MockLogAppender mockLogAppender = MockLogAppender.createForLoggers(LogManager.getLogger(ExtensionsManager.class))) { @@ -379,8 +452,8 @@ public void testHandleRegisterRestActionsRequest() throws Exception { Files.write(extensionDir.resolve("extensions.yml"), extensionsYmlLines, StandardCharsets.UTF_8); ExtensionsManager extensionsManager = new ExtensionsManager(settings, extensionDir); + initialize(extensionsManager); - extensionsManager.initializeServicesAndRestHandler(restController, settingsModule, transportService, clusterService, settings); String uniqueIdStr = "uniqueid1"; List actionsList = List.of("GET /foo", "PUT /bar", "POST /baz"); RegisterRestActionsRequest registerActionsRequest = new RegisterRestActionsRequest(uniqueIdStr, actionsList); @@ -392,10 +465,9 @@ public void testHandleRegisterRestActionsRequest() throws Exception { public void testHandleRegisterSettingsRequest() throws Exception { Files.write(extensionDir.resolve("extensions.yml"), extensionsYmlLines, StandardCharsets.UTF_8); - ExtensionsManager extensionsManager = new ExtensionsManager(settings, extensionDir); + initialize(extensionsManager); - extensionsManager.initializeServicesAndRestHandler(restController, settingsModule, transportService, clusterService, settings); String uniqueIdStr = "uniqueid1"; List> settingsList = List.of( Setting.boolSetting("index.falseSetting", false, Property.IndexScope, Property.Dynamic), @@ -410,8 +482,8 @@ public void testHandleRegisterSettingsRequest() throws Exception { public void testHandleRegisterRestActionsRequestWithInvalidMethod() throws Exception { ExtensionsManager extensionsManager = new ExtensionsManager(settings, extensionDir); + initialize(extensionsManager); - extensionsManager.initializeServicesAndRestHandler(restController, settingsModule, transportService, clusterService, settings); String uniqueIdStr = "uniqueid1"; List actionsList = List.of("FOO /foo", "PUT /bar", "POST /baz"); RegisterRestActionsRequest registerActionsRequest = new RegisterRestActionsRequest(uniqueIdStr, actionsList); @@ -422,12 +494,8 @@ public void testHandleRegisterRestActionsRequestWithInvalidMethod() throws Excep } public void testHandleRegisterRestActionsRequestWithInvalidUri() throws Exception { - - Path extensionDir = createTempDir(); - ExtensionsManager extensionsManager = new ExtensionsManager(settings, extensionDir); - - extensionsManager.initializeServicesAndRestHandler(restController, settingsModule, transportService, clusterService, settings); + initialize(extensionsManager); String uniqueIdStr = "uniqueid1"; List actionsList = List.of("GET", "PUT /bar", "POST /baz"); RegisterRestActionsRequest registerActionsRequest = new RegisterRestActionsRequest(uniqueIdStr, actionsList); @@ -438,19 +506,15 @@ public void testHandleRegisterRestActionsRequestWithInvalidUri() throws Exceptio } public void testHandleExtensionRequest() throws Exception { - ExtensionsManager extensionsManager = new ExtensionsManager(settings, extensionDir); + initialize(extensionsManager); - extensionsManager.initializeServicesAndRestHandler(restController, settingsModule, transportService, clusterService, settings); ExtensionRequest clusterStateRequest = new ExtensionRequest(ExtensionsManager.RequestType.REQUEST_EXTENSION_CLUSTER_STATE); assertEquals(ClusterStateResponse.class, extensionsManager.handleExtensionRequest(clusterStateRequest).getClass()); ExtensionRequest clusterSettingRequest = new ExtensionRequest(ExtensionsManager.RequestType.REQUEST_EXTENSION_CLUSTER_SETTINGS); assertEquals(ClusterSettingsResponse.class, extensionsManager.handleExtensionRequest(clusterSettingRequest).getClass()); - ExtensionRequest localNodeRequest = new ExtensionRequest(ExtensionsManager.RequestType.REQUEST_EXTENSION_LOCAL_NODE); - assertEquals(LocalNodeResponse.class, extensionsManager.handleExtensionRequest(localNodeRequest).getClass()); - ExtensionRequest environmentSettingsRequest = new ExtensionRequest( ExtensionsManager.RequestType.REQUEST_EXTENSION_ENVIRONMENT_SETTINGS ); @@ -529,7 +593,7 @@ public void testAddSettingsUpdateConsumerRequest() throws Exception { Path extensionDir = createTempDir(); Files.write(extensionDir.resolve("extensions.yml"), extensionsYmlLines, StandardCharsets.UTF_8); ExtensionsManager extensionsManager = new ExtensionsManager(settings, extensionDir); - extensionsManager.initializeServicesAndRestHandler(restController, settingsModule, transportService, clusterService, settings); + initialize(extensionsManager); List> componentSettings = List.of( Setting.boolSetting("falseSetting", false, Property.IndexScope, Property.NodeScope), @@ -576,8 +640,7 @@ public void testHandleAddSettingsUpdateConsumerRequest() throws Exception { Path extensionDir = createTempDir(); Files.write(extensionDir.resolve("extensions.yml"), extensionsYmlLines, StandardCharsets.UTF_8); ExtensionsManager extensionsManager = new ExtensionsManager(settings, extensionDir); - - extensionsManager.initializeServicesAndRestHandler(restController, settingsModule, transportService, clusterService, settings); + initialize(extensionsManager); List> componentSettings = List.of( Setting.boolSetting("falseSetting", false, Property.Dynamic), @@ -599,7 +662,7 @@ public void testUpdateSettingsRequest() throws Exception { Path extensionDir = createTempDir(); Files.write(extensionDir.resolve("extensions.yml"), extensionsYmlLines, StandardCharsets.UTF_8); ExtensionsManager extensionsManager = new ExtensionsManager(settings, extensionDir); - extensionsManager.initializeServicesAndRestHandler(restController, settingsModule, transportService, clusterService, settings); + initialize(extensionsManager); Setting componentSetting = Setting.boolSetting("falseSetting", false, Property.Dynamic); SettingType settingType = SettingType.Boolean; @@ -640,19 +703,22 @@ public void testRegisterHandler() throws Exception { Collections.emptySet() ) ); - extensionsManager.initializeServicesAndRestHandler(restController, settingsModule, mockTransportService, clusterService, settings); + extensionsManager.initializeServicesAndRestHandler( + restController, + settingsModule, + mockTransportService, + clusterService, + settings, + client + ); verify(mockTransportService, times(8)).registerRequestHandler(anyString(), anyString(), anyBoolean(), anyBoolean(), any(), any()); } public void testOnIndexModule() throws Exception { Files.write(extensionDir.resolve("extensions.yml"), extensionsYmlLines, StandardCharsets.UTF_8); - ExtensionsManager extensionsManager = new ExtensionsManager(settings, extensionDir); - - transportService.start(); - transportService.acceptIncomingRequests(); - extensionsManager.initializeServicesAndRestHandler(restController, settingsModule, transportService, clusterService, settings); + initialize(extensionsManager); Environment environment = TestEnvironment.newEnvironment(settings); AnalysisRegistry emptyAnalysisRegistry = new AnalysisRegistry( @@ -696,4 +762,16 @@ public void testOnIndexModule() throws Exception { } } + private void initialize(ExtensionsManager extensionsManager) { + transportService.start(); + transportService.acceptIncomingRequests(); + extensionsManager.initializeServicesAndRestHandler( + restController, + settingsModule, + transportService, + clusterService, + settings, + client + ); + } } diff --git a/server/src/test/java/org/opensearch/extensions/RegisterTransportActionsRequestTests.java b/server/src/test/java/org/opensearch/extensions/RegisterTransportActionsRequestTests.java index ed36cc5290bb1..eb59c80ac6461 100644 --- a/server/src/test/java/org/opensearch/extensions/RegisterTransportActionsRequestTests.java +++ b/server/src/test/java/org/opensearch/extensions/RegisterTransportActionsRequestTests.java @@ -9,6 +9,7 @@ package org.opensearch.extensions; import org.junit.Before; +import org.opensearch.action.admin.indices.create.AutoCreateAction.TransportAction; import org.opensearch.common.collect.Map; import org.opensearch.common.io.stream.BytesStreamOutput; import org.opensearch.common.io.stream.StreamInput; @@ -21,7 +22,7 @@ public class RegisterTransportActionsRequestTests extends OpenSearchTestCase { @Before public void setup() { - this.originalRequest = new RegisterTransportActionsRequest(Map.of("testAction", Map.class)); + this.originalRequest = new RegisterTransportActionsRequest("extension-uniqueId", Map.of("testAction", TransportAction.class)); } public void testRegisterTransportActionsRequest() throws IOException { @@ -37,6 +38,9 @@ public void testRegisterTransportActionsRequest() throws IOException { } public void testToString() { - assertEquals(originalRequest.toString(), "TransportActionsRequest{actions={testAction=class org.opensearch.common.collect.Map}}"); + assertEquals( + originalRequest.toString(), + "TransportActionsRequest{uniqueId=extension-uniqueId, actions={testAction=class org.opensearch.action.admin.indices.create.AutoCreateAction$TransportAction}}" + ); } } diff --git a/server/src/test/java/org/opensearch/extensions/action/ExtensionActionRequestTests.java b/server/src/test/java/org/opensearch/extensions/action/ExtensionActionRequestTests.java new file mode 100644 index 0000000000000..2d4f2b5d8aa66 --- /dev/null +++ b/server/src/test/java/org/opensearch/extensions/action/ExtensionActionRequestTests.java @@ -0,0 +1,37 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.extensions.action; + +import org.opensearch.common.bytes.BytesReference; +import org.opensearch.common.io.stream.BytesStreamInput; +import org.opensearch.common.io.stream.BytesStreamOutput; +import org.opensearch.test.OpenSearchTestCase; + +import java.nio.charset.StandardCharsets; + +public class ExtensionActionRequestTests extends OpenSearchTestCase { + + public void testExtensionActionRequest() throws Exception { + String expectedAction = "test-action"; + byte[] expectedRequestBytes = "request-bytes".getBytes(StandardCharsets.UTF_8); + ExtensionActionRequest request = new ExtensionActionRequest(expectedAction, expectedRequestBytes); + + assertEquals(expectedAction, request.getAction()); + assertEquals(expectedRequestBytes, request.getRequestBytes()); + + BytesStreamOutput out = new BytesStreamOutput(); + request.writeTo(out); + BytesStreamInput in = new BytesStreamInput(BytesReference.toBytes(out.bytes())); + request = new ExtensionActionRequest(in); + + assertEquals(expectedAction, request.getAction()); + assertArrayEquals(expectedRequestBytes, request.getRequestBytes()); + assertNull(request.validate()); + } +} diff --git a/server/src/test/java/org/opensearch/extensions/action/ExtensionActionResponseTests.java b/server/src/test/java/org/opensearch/extensions/action/ExtensionActionResponseTests.java new file mode 100644 index 0000000000000..5ec8c16027da2 --- /dev/null +++ b/server/src/test/java/org/opensearch/extensions/action/ExtensionActionResponseTests.java @@ -0,0 +1,32 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.extensions.action; + +import org.opensearch.common.bytes.BytesReference; +import org.opensearch.common.io.stream.BytesStreamInput; +import org.opensearch.common.io.stream.BytesStreamOutput; +import org.opensearch.test.OpenSearchTestCase; + +import java.nio.charset.StandardCharsets; + +public class ExtensionActionResponseTests extends OpenSearchTestCase { + + public void testExtensionActionResponse() throws Exception { + byte[] expectedResponseBytes = "response-bytes".getBytes(StandardCharsets.UTF_8); + ExtensionActionResponse response = new ExtensionActionResponse(expectedResponseBytes); + + assertEquals(expectedResponseBytes, response.getResponseBytes()); + + BytesStreamOutput out = new BytesStreamOutput(); + response.writeTo(out); + BytesStreamInput in = new BytesStreamInput(BytesReference.toBytes(out.bytes())); + response = new ExtensionActionResponse(in); + assertArrayEquals(expectedResponseBytes, response.getResponseBytes()); + } +} diff --git a/server/src/test/java/org/opensearch/extensions/action/ExtensionHandleTransportRequestTests.java b/server/src/test/java/org/opensearch/extensions/action/ExtensionHandleTransportRequestTests.java new file mode 100644 index 0000000000000..15e7320ba7556 --- /dev/null +++ b/server/src/test/java/org/opensearch/extensions/action/ExtensionHandleTransportRequestTests.java @@ -0,0 +1,35 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.extensions.action; + +import org.opensearch.common.bytes.BytesReference; +import org.opensearch.common.io.stream.BytesStreamInput; +import org.opensearch.common.io.stream.BytesStreamOutput; +import org.opensearch.test.OpenSearchTestCase; + +import java.nio.charset.StandardCharsets; + +public class ExtensionHandleTransportRequestTests extends OpenSearchTestCase { + public void testExtensionHandleTransportRequest() throws Exception { + String expectedAction = "test-action"; + byte[] expectedRequestBytes = "request-bytes".getBytes(StandardCharsets.UTF_8); + ExtensionHandleTransportRequest request = new ExtensionHandleTransportRequest(expectedAction, expectedRequestBytes); + + assertEquals(expectedAction, request.getAction()); + assertEquals(expectedRequestBytes, request.getRequestBytes()); + + BytesStreamOutput out = new BytesStreamOutput(); + request.writeTo(out); + BytesStreamInput in = new BytesStreamInput(BytesReference.toBytes(out.bytes())); + request = new ExtensionHandleTransportRequest(in); + + assertEquals(expectedAction, request.getAction()); + assertArrayEquals(expectedRequestBytes, request.getRequestBytes()); + } +} diff --git a/server/src/test/java/org/opensearch/extensions/action/ExtensionProxyActionTests.java b/server/src/test/java/org/opensearch/extensions/action/ExtensionProxyActionTests.java new file mode 100644 index 0000000000000..3719c29090287 --- /dev/null +++ b/server/src/test/java/org/opensearch/extensions/action/ExtensionProxyActionTests.java @@ -0,0 +1,18 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.extensions.action; + +import org.opensearch.test.OpenSearchTestCase; + +public class ExtensionProxyActionTests extends OpenSearchTestCase { + public void testExtensionProxyAction() { + assertEquals("cluster:internal/extensions", ExtensionProxyAction.NAME); + assertEquals(ExtensionProxyAction.class, ExtensionProxyAction.INSTANCE.getClass()); + } +} diff --git a/server/src/test/java/org/opensearch/extensions/action/ExtensionTransportActionsHandlerTests.java b/server/src/test/java/org/opensearch/extensions/action/ExtensionTransportActionsHandlerTests.java new file mode 100644 index 0000000000000..c3d6372a4f6b8 --- /dev/null +++ b/server/src/test/java/org/opensearch/extensions/action/ExtensionTransportActionsHandlerTests.java @@ -0,0 +1,181 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.extensions.action; + +import org.junit.After; +import org.junit.Before; +import org.opensearch.Version; +import org.opensearch.action.admin.indices.create.AutoCreateAction.TransportAction; +import org.opensearch.client.node.NodeClient; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.common.io.stream.NamedWriteableRegistry; +import org.opensearch.common.network.NetworkService; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.transport.TransportAddress; +import org.opensearch.common.util.PageCacheRecycler; +import org.opensearch.extensions.DiscoveryExtensionNode; +import org.opensearch.extensions.AcknowledgedResponse; +import org.opensearch.extensions.RegisterTransportActionsRequest; +import org.opensearch.extensions.rest.RestSendToExtensionActionTests; +import org.opensearch.indices.breaker.NoneCircuitBreakerService; +import org.opensearch.plugins.PluginInfo; +import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.test.client.NoOpNodeClient; +import org.opensearch.test.transport.MockTransportService; +import org.opensearch.threadpool.TestThreadPool; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.transport.ActionNotFoundTransportException; +import org.opensearch.transport.TransportService; +import org.opensearch.transport.nio.MockNioTransport; + +import java.net.InetAddress; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import static java.util.Collections.emptyMap; +import static java.util.Collections.emptySet; + +public class ExtensionTransportActionsHandlerTests extends OpenSearchTestCase { + private TransportService transportService; + private MockNioTransport transport; + private DiscoveryExtensionNode discoveryExtensionNode; + private ExtensionTransportActionsHandler extensionTransportActionsHandler; + private NodeClient client; + private final ThreadPool threadPool = new TestThreadPool(RestSendToExtensionActionTests.class.getSimpleName()); + + @Before + public void setup() throws Exception { + Settings settings = Settings.builder().put("cluster.name", "test").build(); + transport = new MockNioTransport( + settings, + Version.CURRENT, + threadPool, + new NetworkService(Collections.emptyList()), + PageCacheRecycler.NON_RECYCLING_INSTANCE, + new NamedWriteableRegistry(Collections.emptyList()), + new NoneCircuitBreakerService() + ); + transportService = new MockTransportService( + settings, + transport, + threadPool, + TransportService.NOOP_TRANSPORT_INTERCEPTOR, + (boundAddress) -> new DiscoveryNode( + "test_node", + "test_node", + boundAddress.publishAddress(), + emptyMap(), + emptySet(), + Version.CURRENT + ), + null, + Collections.emptySet() + ); + discoveryExtensionNode = new DiscoveryExtensionNode( + "firstExtension", + "uniqueid1", + "uniqueid1", + "myIndependentPluginHost1", + "127.0.0.0", + new TransportAddress(InetAddress.getByName("127.0.0.0"), 9300), + new HashMap(), + Version.fromString("3.0.0"), + new PluginInfo( + "firstExtension", + "Fake description 1", + "0.0.7", + Version.fromString("3.0.0"), + "14", + "fakeClass1", + new ArrayList(), + false + ), + Collections.emptyList() + ); + client = new NoOpNodeClient(this.getTestName()); + extensionTransportActionsHandler = new ExtensionTransportActionsHandler( + Map.of("uniqueid1", discoveryExtensionNode), + transportService, + client + ); + } + + @Override + @After + public void tearDown() throws Exception { + super.tearDown(); + transportService.close(); + client.close(); + ThreadPool.terminate(threadPool, 30, TimeUnit.SECONDS); + } + + public void testRegisterAction() { + String action = "test-action"; + extensionTransportActionsHandler.registerAction(action, discoveryExtensionNode); + assertEquals(discoveryExtensionNode, extensionTransportActionsHandler.getExtension(action)); + + // Test duplicate action registration + expectThrows(IllegalArgumentException.class, () -> extensionTransportActionsHandler.registerAction(action, discoveryExtensionNode)); + assertEquals(discoveryExtensionNode, extensionTransportActionsHandler.getExtension(action)); + } + + public void testRegisterTransportActionsRequest() { + String action = "test-action"; + RegisterTransportActionsRequest request = new RegisterTransportActionsRequest("uniqueid1", Map.of(action, TransportAction.class)); + AcknowledgedResponse response = (AcknowledgedResponse) extensionTransportActionsHandler.handleRegisterTransportActionsRequest( + request + ); + assertTrue(response.getStatus()); + assertEquals(discoveryExtensionNode, extensionTransportActionsHandler.getExtension(action)); + + // Test duplicate action registration + response = (AcknowledgedResponse) extensionTransportActionsHandler.handleRegisterTransportActionsRequest(request); + assertFalse(response.getStatus()); + } + + public void testTransportActionRequestFromExtension() throws InterruptedException { + String action = "test-action"; + byte[] requestBytes = "requestBytes".getBytes(StandardCharsets.UTF_8); + TransportActionRequestFromExtension request = new TransportActionRequestFromExtension(action, requestBytes, "uniqueid1"); + // NoOpNodeClient returns null as response + expectThrows(NullPointerException.class, () -> extensionTransportActionsHandler.handleTransportActionRequestFromExtension(request)); + } + + public void testSendTransportRequestToExtension() throws InterruptedException { + String action = "test-action"; + byte[] requestBytes = "request-bytes".getBytes(StandardCharsets.UTF_8); + ExtensionActionRequest request = new ExtensionActionRequest(action, requestBytes); + + // Action not registered, expect exception + expectThrows( + ActionNotFoundTransportException.class, + () -> extensionTransportActionsHandler.sendTransportRequestToExtension(request) + ); + + // Register Action + RegisterTransportActionsRequest registerRequest = new RegisterTransportActionsRequest( + "uniqueid1", + Map.of(action, TransportAction.class) + ); + AcknowledgedResponse response = (AcknowledgedResponse) extensionTransportActionsHandler.handleRegisterTransportActionsRequest( + registerRequest + ); + assertTrue(response.getStatus()); + + ExtensionActionResponse extensionResponse = extensionTransportActionsHandler.sendTransportRequestToExtension(request); + assertEquals( + "Request failed: [firstExtension][127.0.0.0:9300] Node not connected", + new String(extensionResponse.getResponseBytes(), StandardCharsets.UTF_8) + ); + } +} diff --git a/server/src/test/java/org/opensearch/extensions/action/TransportActionRequestFromExtensionTests.java b/server/src/test/java/org/opensearch/extensions/action/TransportActionRequestFromExtensionTests.java new file mode 100644 index 0000000000000..a8ef5372800d9 --- /dev/null +++ b/server/src/test/java/org/opensearch/extensions/action/TransportActionRequestFromExtensionTests.java @@ -0,0 +1,42 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.extensions.action; + +import org.opensearch.common.bytes.BytesReference; +import org.opensearch.common.io.stream.BytesStreamInput; +import org.opensearch.common.io.stream.BytesStreamOutput; +import org.opensearch.test.OpenSearchTestCase; + +import java.nio.charset.StandardCharsets; + +public class TransportActionRequestFromExtensionTests extends OpenSearchTestCase { + public void testTransportActionRequestFromExtension() throws Exception { + String expectedAction = "test-action"; + byte[] expectedRequestBytes = "request-bytes".getBytes(StandardCharsets.UTF_8); + String uniqueId = "test-uniqueId"; + TransportActionRequestFromExtension request = new TransportActionRequestFromExtension( + expectedAction, + expectedRequestBytes, + uniqueId + ); + + assertEquals(expectedAction, request.getAction()); + assertEquals(expectedRequestBytes, request.getRequestBytes()); + assertEquals(uniqueId, request.getUniqueId()); + + BytesStreamOutput out = new BytesStreamOutput(); + request.writeTo(out); + BytesStreamInput in = new BytesStreamInput(BytesReference.toBytes(out.bytes())); + request = new TransportActionRequestFromExtension(in); + + assertEquals(expectedAction, request.getAction()); + assertArrayEquals(expectedRequestBytes, request.getRequestBytes()); + assertEquals(uniqueId, request.getUniqueId()); + } +} diff --git a/server/src/test/java/org/opensearch/extensions/action/TransportActionResponseToExtensionTests.java b/server/src/test/java/org/opensearch/extensions/action/TransportActionResponseToExtensionTests.java new file mode 100644 index 0000000000000..070feaa240d98 --- /dev/null +++ b/server/src/test/java/org/opensearch/extensions/action/TransportActionResponseToExtensionTests.java @@ -0,0 +1,43 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.extensions.action; + +import org.opensearch.common.bytes.BytesReference; +import org.opensearch.common.io.stream.BytesStreamInput; +import org.opensearch.common.io.stream.BytesStreamOutput; +import org.opensearch.test.OpenSearchTestCase; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; + +public class TransportActionResponseToExtensionTests extends OpenSearchTestCase { + public void testTransportActionRequestToExtension() throws IOException { + byte[] expectedResponseBytes = "response-bytes".getBytes(StandardCharsets.UTF_8); + TransportActionResponseToExtension response = new TransportActionResponseToExtension(expectedResponseBytes); + + assertEquals(expectedResponseBytes, response.getResponseBytes()); + + BytesStreamOutput out = new BytesStreamOutput(); + response.writeTo(out); + BytesStreamInput in = new BytesStreamInput(BytesReference.toBytes(out.bytes())); + response = new TransportActionResponseToExtension(in); + + assertArrayEquals(expectedResponseBytes, response.getResponseBytes()); + } + + public void testSetBytes() { + byte[] expectedResponseBytes = "response-bytes".getBytes(StandardCharsets.UTF_8); + byte[] expectedEmptyBytes = new byte[0]; + TransportActionResponseToExtension response = new TransportActionResponseToExtension(expectedEmptyBytes); + assertArrayEquals(expectedEmptyBytes, response.getResponseBytes()); + + response.setResponseBytes(expectedResponseBytes); + assertArrayEquals(expectedResponseBytes, response.getResponseBytes()); + } +} diff --git a/server/src/test/java/org/opensearch/extensions/rest/ExtensionRestRequestTests.java b/server/src/test/java/org/opensearch/extensions/rest/ExtensionRestRequestTests.java new file mode 100644 index 0000000000000..9f09735dbe38d --- /dev/null +++ b/server/src/test/java/org/opensearch/extensions/rest/ExtensionRestRequestTests.java @@ -0,0 +1,262 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.extensions.rest; + +import org.opensearch.rest.RestStatus; +import org.opensearch.OpenSearchParseException; +import org.opensearch.common.bytes.BytesArray; +import org.opensearch.common.bytes.BytesReference; +import org.opensearch.common.io.stream.BytesStreamInput; +import org.opensearch.common.io.stream.BytesStreamOutput; +import org.opensearch.common.io.stream.NamedWriteableRegistry; +import org.opensearch.common.xcontent.NamedXContentRegistry; +import org.opensearch.common.xcontent.XContentParser; +import org.opensearch.common.xcontent.XContentType; +import org.opensearch.common.io.stream.NamedWriteableAwareStreamInput; +import org.opensearch.rest.BytesRestResponse; +import org.opensearch.rest.RestRequest.Method; +import org.opensearch.test.OpenSearchTestCase; + +import java.nio.charset.StandardCharsets; +import java.security.Principal; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import static java.util.Map.entry; + +public class ExtensionRestRequestTests extends OpenSearchTestCase { + + private Method expectedMethod; + private String expectedPath; + Map expectedParams; + XContentType expectedContentType; + BytesReference expectedContent; + String extensionUniqueId1; + Principal userPrincipal; + // Will be replaced with ExtensionTokenProcessor and PrincipalIdentifierToken classes from feature/identity + String extensionTokenProcessor; + String expectedRequestIssuerIdentity; + NamedWriteableRegistry registry; + + public void setUp() throws Exception { + super.setUp(); + expectedMethod = Method.GET; + expectedPath = "/test/uri"; + expectedParams = Map.ofEntries(entry("foo", "bar"), entry("baz", "42")); + expectedContentType = XContentType.JSON; + expectedContent = new BytesArray("{\"key\": \"value\"}".getBytes(StandardCharsets.UTF_8)); + extensionUniqueId1 = "ext_1"; + userPrincipal = () -> "user1"; + extensionTokenProcessor = "placeholder_extension_token_processor"; + expectedRequestIssuerIdentity = "placeholder_request_issuer_identity"; + } + + public void testExtensionRestRequest() throws Exception { + ExtensionRestRequest request = new ExtensionRestRequest( + expectedMethod, + expectedPath, + expectedParams, + expectedContentType, + expectedContent, + expectedRequestIssuerIdentity + ); + + assertEquals(expectedMethod, request.method()); + assertEquals(expectedPath, request.path()); + + assertEquals(expectedParams, request.params()); + assertEquals(Collections.emptyList(), request.consumedParams()); + assertTrue(request.hasParam("foo")); + assertFalse(request.hasParam("bar")); + assertEquals("bar", request.param("foo")); + assertEquals("baz", request.param("bar", "baz")); + assertEquals(42L, request.paramAsLong("baz", 0L)); + assertEquals(0L, request.paramAsLong("bar", 0L)); + assertTrue(request.consumedParams().contains("foo")); + assertTrue(request.consumedParams().contains("baz")); + + assertEquals(expectedContentType, request.getXContentType()); + assertTrue(request.hasContent()); + assertFalse(request.isContentConsumed()); + assertEquals(expectedContent, request.content()); + assertTrue(request.isContentConsumed()); + + XContentParser parser = request.contentParser(NamedXContentRegistry.EMPTY); + Map contentMap = parser.mapStrings(); + assertEquals("value", contentMap.get("key")); + + assertEquals(expectedRequestIssuerIdentity, request.getRequestIssuerIdentity()); + + try (BytesStreamOutput out = new BytesStreamOutput()) { + request.writeTo(out); + out.flush(); + try (BytesStreamInput in = new BytesStreamInput(BytesReference.toBytes(out.bytes()))) { + try (NamedWriteableAwareStreamInput nameWritableAwareIn = new NamedWriteableAwareStreamInput(in, registry)) { + request = new ExtensionRestRequest(nameWritableAwareIn); + assertEquals(expectedMethod, request.method()); + assertEquals(expectedPath, request.path()); + assertEquals(expectedParams, request.params()); + assertEquals(expectedContent, request.content()); + assertEquals(expectedRequestIssuerIdentity, request.getRequestIssuerIdentity()); + } + } + } + } + + public void testExtensionRestRequestWithNoContent() throws Exception { + ExtensionRestRequest request = new ExtensionRestRequest( + expectedMethod, + expectedPath, + expectedParams, + null, + new BytesArray(new byte[0]), + expectedRequestIssuerIdentity + ); + + assertEquals(expectedMethod, request.method()); + assertEquals(expectedPath, request.path()); + assertEquals(expectedParams, request.params()); + assertNull(request.getXContentType()); + assertEquals(0, request.content().length()); + assertEquals(expectedRequestIssuerIdentity, request.getRequestIssuerIdentity()); + + final ExtensionRestRequest requestWithNoContent = request; + assertThrows(OpenSearchParseException.class, () -> requestWithNoContent.contentParser(NamedXContentRegistry.EMPTY)); + + try (BytesStreamOutput out = new BytesStreamOutput()) { + request.writeTo(out); + out.flush(); + try (BytesStreamInput in = new BytesStreamInput(BytesReference.toBytes(out.bytes()))) { + try (NamedWriteableAwareStreamInput nameWritableAwareIn = new NamedWriteableAwareStreamInput(in, registry)) { + request = new ExtensionRestRequest(nameWritableAwareIn); + assertEquals(expectedMethod, request.method()); + assertEquals(expectedPath, request.path()); + assertEquals(expectedParams, request.params()); + assertNull(request.getXContentType()); + assertEquals(0, request.content().length()); + assertEquals(expectedRequestIssuerIdentity, request.getRequestIssuerIdentity()); + + final ExtensionRestRequest requestWithNoContentType = request; + assertThrows(OpenSearchParseException.class, () -> requestWithNoContentType.contentParser(NamedXContentRegistry.EMPTY)); + } + } + } + } + + public void testExtensionRestRequestWithPlainTextContent() throws Exception { + BytesReference expectedText = new BytesArray("Plain text"); + + ExtensionRestRequest request = new ExtensionRestRequest( + expectedMethod, + expectedPath, + expectedParams, + null, + expectedText, + expectedRequestIssuerIdentity + ); + + assertEquals(expectedMethod, request.method()); + assertEquals(expectedPath, request.path()); + assertEquals(expectedParams, request.params()); + assertNull(request.getXContentType()); + assertEquals(expectedText, request.content()); + assertEquals(expectedRequestIssuerIdentity, request.getRequestIssuerIdentity()); + + try (BytesStreamOutput out = new BytesStreamOutput()) { + request.writeTo(out); + out.flush(); + try (BytesStreamInput in = new BytesStreamInput(BytesReference.toBytes(out.bytes()))) { + try (NamedWriteableAwareStreamInput nameWritableAwareIn = new NamedWriteableAwareStreamInput(in, registry)) { + request = new ExtensionRestRequest(nameWritableAwareIn); + assertEquals(expectedMethod, request.method()); + assertEquals(expectedPath, request.path()); + assertEquals(expectedParams, request.params()); + assertNull(request.getXContentType()); + assertEquals(expectedText, request.content()); + assertEquals(expectedRequestIssuerIdentity, request.getRequestIssuerIdentity()); + } + } + } + } + + public void testRestExecuteOnExtensionResponse() throws Exception { + RestStatus expectedStatus = RestStatus.OK; + String expectedContentType = BytesRestResponse.TEXT_CONTENT_TYPE; + String expectedResponse = "Test response"; + byte[] expectedResponseBytes = expectedResponse.getBytes(StandardCharsets.UTF_8); + + RestExecuteOnExtensionResponse response = new RestExecuteOnExtensionResponse( + expectedStatus, + expectedContentType, + expectedResponseBytes, + Collections.emptyMap(), + Collections.emptyList(), + false + ); + + assertEquals(expectedStatus, response.getStatus()); + assertEquals(expectedContentType, response.getContentType()); + assertArrayEquals(expectedResponseBytes, response.getContent()); + assertEquals(0, response.getHeaders().size()); + assertEquals(0, response.getConsumedParams().size()); + assertFalse(response.isContentConsumed()); + + String headerKey = "foo"; + List headerValueList = List.of("bar", "baz"); + Map> expectedHeaders = Map.of(headerKey, headerValueList); + List expectedConsumedParams = List.of("foo", "bar"); + + response = new RestExecuteOnExtensionResponse( + expectedStatus, + expectedContentType, + expectedResponseBytes, + expectedHeaders, + expectedConsumedParams, + true + ); + + assertEquals(expectedStatus, response.getStatus()); + assertEquals(expectedContentType, response.getContentType()); + assertArrayEquals(expectedResponseBytes, response.getContent()); + + assertEquals(1, response.getHeaders().keySet().size()); + assertTrue(response.getHeaders().containsKey(headerKey)); + + List fooList = response.getHeaders().get(headerKey); + assertEquals(2, fooList.size()); + assertTrue(fooList.containsAll(headerValueList)); + + assertEquals(2, response.getConsumedParams().size()); + assertTrue(response.getConsumedParams().containsAll(expectedConsumedParams)); + assertTrue(response.isContentConsumed()); + + try (BytesStreamOutput out = new BytesStreamOutput()) { + response.writeTo(out); + out.flush(); + try (BytesStreamInput in = new BytesStreamInput(BytesReference.toBytes(out.bytes()))) { + response = new RestExecuteOnExtensionResponse(in); + + assertEquals(expectedStatus, response.getStatus()); + assertEquals(expectedContentType, response.getContentType()); + assertArrayEquals(expectedResponseBytes, response.getContent()); + + assertEquals(1, response.getHeaders().keySet().size()); + assertTrue(response.getHeaders().containsKey(headerKey)); + + fooList = response.getHeaders().get(headerKey); + assertEquals(2, fooList.size()); + assertTrue(fooList.containsAll(headerValueList)); + + assertEquals(2, response.getConsumedParams().size()); + assertTrue(response.getConsumedParams().containsAll(expectedConsumedParams)); + assertTrue(response.isContentConsumed()); + } + } + } +} diff --git a/server/src/test/java/org/opensearch/extensions/rest/ExtensionRestResponseTests.java b/server/src/test/java/org/opensearch/extensions/rest/ExtensionRestResponseTests.java new file mode 100644 index 0000000000000..82ae61b02cb32 --- /dev/null +++ b/server/src/test/java/org/opensearch/extensions/rest/ExtensionRestResponseTests.java @@ -0,0 +1,132 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.extensions.rest; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Collections; + +import org.opensearch.common.bytes.BytesArray; +import org.opensearch.common.bytes.BytesReference; +import org.opensearch.common.xcontent.XContentBuilder; +import org.opensearch.common.xcontent.XContentType; +import org.opensearch.rest.RestRequest.Method; +import org.opensearch.test.OpenSearchTestCase; + +import static org.opensearch.rest.BytesRestResponse.TEXT_CONTENT_TYPE; +import static org.opensearch.rest.RestStatus.ACCEPTED; +import static org.opensearch.rest.RestStatus.OK; + +public class ExtensionRestResponseTests extends OpenSearchTestCase { + + private static final String OCTET_CONTENT_TYPE = "application/octet-stream"; + private static final String JSON_CONTENT_TYPE = "application/json; charset=UTF-8"; + + private String testText; + private byte[] testBytes; + + @Override + public void setUp() throws Exception { + super.setUp(); + testText = "plain text"; + testBytes = new byte[] { 1, 2 }; + } + + private ExtensionRestRequest generateTestRequest() { + ExtensionRestRequest request = new ExtensionRestRequest( + Method.GET, + "/foo", + Collections.emptyMap(), + null, + new BytesArray("Text Content"), + null + ); + // consume params "foo" and "bar" + request.param("foo"); + request.param("bar"); + // consume content + request.content(); + return request; + } + + public void testConstructorWithBuilder() throws IOException { + XContentBuilder builder = XContentBuilder.builder(XContentType.JSON.xContent()); + builder.startObject(); + builder.field("status", ACCEPTED); + builder.endObject(); + ExtensionRestRequest request = generateTestRequest(); + ExtensionRestResponse response = new ExtensionRestResponse(request, OK, builder); + + assertEquals(OK, response.status()); + assertEquals(JSON_CONTENT_TYPE, response.contentType()); + assertEquals("{\"status\":\"ACCEPTED\"}", response.content().utf8ToString()); + for (String param : response.getConsumedParams()) { + assertTrue(request.consumedParams().contains(param)); + } + assertTrue(request.isContentConsumed()); + } + + public void testConstructorWithPlainText() { + ExtensionRestRequest request = generateTestRequest(); + ExtensionRestResponse response = new ExtensionRestResponse(request, OK, testText); + + assertEquals(OK, response.status()); + assertEquals(TEXT_CONTENT_TYPE, response.contentType()); + assertEquals(testText, response.content().utf8ToString()); + for (String param : response.getConsumedParams()) { + assertTrue(request.consumedParams().contains(param)); + } + assertTrue(request.isContentConsumed()); + } + + public void testConstructorWithText() { + ExtensionRestRequest request = generateTestRequest(); + ExtensionRestResponse response = new ExtensionRestResponse(request, OK, TEXT_CONTENT_TYPE, testText); + + assertEquals(OK, response.status()); + assertEquals(TEXT_CONTENT_TYPE, response.contentType()); + assertEquals(testText, response.content().utf8ToString()); + + for (String param : response.getConsumedParams()) { + assertTrue(request.consumedParams().contains(param)); + } + assertTrue(request.isContentConsumed()); + } + + public void testConstructorWithByteArray() { + ExtensionRestRequest request = generateTestRequest(); + ExtensionRestResponse response = new ExtensionRestResponse(request, OK, OCTET_CONTENT_TYPE, testBytes); + + assertEquals(OK, response.status()); + assertEquals(OCTET_CONTENT_TYPE, response.contentType()); + assertArrayEquals(testBytes, BytesReference.toBytes(response.content())); + for (String param : response.getConsumedParams()) { + assertTrue(request.consumedParams().contains(param)); + } + assertTrue(request.isContentConsumed()); + } + + public void testConstructorWithBytesReference() { + ExtensionRestRequest request = generateTestRequest(); + ExtensionRestResponse response = new ExtensionRestResponse( + request, + OK, + OCTET_CONTENT_TYPE, + BytesReference.fromByteBuffer(ByteBuffer.wrap(testBytes, 0, 2)) + ); + + assertEquals(OK, response.status()); + assertEquals(OCTET_CONTENT_TYPE, response.contentType()); + assertArrayEquals(testBytes, BytesReference.toBytes(response.content())); + for (String param : response.getConsumedParams()) { + assertTrue(request.consumedParams().contains(param)); + } + assertTrue(request.isContentConsumed()); + } +} diff --git a/server/src/test/java/org/opensearch/extensions/rest/RestExecuteOnExtensionTests.java b/server/src/test/java/org/opensearch/extensions/rest/RestExecuteOnExtensionTests.java deleted file mode 100644 index 98521ddcf1e26..0000000000000 --- a/server/src/test/java/org/opensearch/extensions/rest/RestExecuteOnExtensionTests.java +++ /dev/null @@ -1,94 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.extensions.rest; - -import org.opensearch.rest.RestStatus; -import org.opensearch.common.bytes.BytesReference; -import org.opensearch.common.io.stream.BytesStreamInput; -import org.opensearch.common.io.stream.BytesStreamOutput; -import org.opensearch.rest.BytesRestResponse; -import org.opensearch.rest.RestRequest.Method; -import org.opensearch.test.OpenSearchTestCase; - -import java.nio.charset.StandardCharsets; -import java.util.List; -import java.util.Map; - -public class RestExecuteOnExtensionTests extends OpenSearchTestCase { - - public void testRestExecuteOnExtensionRequest() throws Exception { - Method expectedMethod = Method.GET; - String expectedUri = "/test/uri"; - RestExecuteOnExtensionRequest request = new RestExecuteOnExtensionRequest(expectedMethod, expectedUri); - - assertEquals(expectedMethod, request.getMethod()); - assertEquals(expectedUri, request.getUri()); - - try (BytesStreamOutput out = new BytesStreamOutput()) { - request.writeTo(out); - out.flush(); - try (BytesStreamInput in = new BytesStreamInput(BytesReference.toBytes(out.bytes()))) { - request = new RestExecuteOnExtensionRequest(in); - - assertEquals(expectedMethod, request.getMethod()); - assertEquals(expectedUri, request.getUri()); - } - } - } - - public void testRestExecuteOnExtensionResponse() throws Exception { - RestStatus expectedStatus = RestStatus.OK; - String expectedContentType = BytesRestResponse.TEXT_CONTENT_TYPE; - String expectedResponse = "Test response"; - byte[] expectedResponseBytes = expectedResponse.getBytes(StandardCharsets.UTF_8); - - RestExecuteOnExtensionResponse response = new RestExecuteOnExtensionResponse(expectedStatus, expectedResponse); - - assertEquals(expectedStatus, response.getStatus()); - assertEquals(expectedContentType, response.getContentType()); - assertArrayEquals(expectedResponseBytes, response.getContent()); - assertEquals(0, response.getHeaders().size()); - - String headerKey = "foo"; - List headerValueList = List.of("bar", "baz"); - Map> expectedHeaders = Map.of(headerKey, headerValueList); - - response = new RestExecuteOnExtensionResponse(expectedStatus, expectedContentType, expectedResponseBytes, expectedHeaders); - - assertEquals(expectedStatus, response.getStatus()); - assertEquals(expectedContentType, response.getContentType()); - assertArrayEquals(expectedResponseBytes, response.getContent()); - - assertEquals(1, expectedHeaders.keySet().size()); - assertTrue(expectedHeaders.containsKey(headerKey)); - - List fooList = expectedHeaders.get(headerKey); - assertEquals(2, fooList.size()); - assertTrue(fooList.containsAll(headerValueList)); - - try (BytesStreamOutput out = new BytesStreamOutput()) { - response.writeTo(out); - out.flush(); - try (BytesStreamInput in = new BytesStreamInput(BytesReference.toBytes(out.bytes()))) { - response = new RestExecuteOnExtensionResponse(in); - - assertEquals(expectedStatus, response.getStatus()); - assertEquals(expectedContentType, response.getContentType()); - assertArrayEquals(expectedResponseBytes, response.getContent()); - - assertEquals(1, expectedHeaders.keySet().size()); - assertTrue(expectedHeaders.containsKey(headerKey)); - - fooList = expectedHeaders.get(headerKey); - assertEquals(2, fooList.size()); - assertTrue(fooList.containsAll(headerValueList)); - } - } - } -} diff --git a/server/src/test/java/org/opensearch/extensions/rest/RestSendToExtensionActionTests.java b/server/src/test/java/org/opensearch/extensions/rest/RestSendToExtensionActionTests.java index 2a593a8d251e9..97eeae8fb95af 100644 --- a/server/src/test/java/org/opensearch/extensions/rest/RestSendToExtensionActionTests.java +++ b/server/src/test/java/org/opensearch/extensions/rest/RestSendToExtensionActionTests.java @@ -93,7 +93,8 @@ public void setup() throws Exception { "fakeClass1", new ArrayList(), false - ) + ), + Collections.emptyList() ); } diff --git a/server/src/test/resources/config/extensions.yml b/server/src/test/resources/config/extensions.yml index 6264e9630ad60..e02a2913385d9 100644 --- a/server/src/test/resources/config/extensions.yml +++ b/server/src/test/resources/config/extensions.yml @@ -7,6 +7,11 @@ extensions: version: '3.0.0' - name: "secondExtension" uniqueId: 'uniqueid2' + dependencies: + - name: 'uniqueid0' + version: '2.0.0' + - name: 'uniqueid1' + version: '3.0.0' hostName: 'myIndependentPluginHost2' hostAddress: '127.0.0.1' port: '9301'