Skip to content

Commit

Permalink
Provide Extension API to OpenSearch (opensearch-project#74)
Browse files Browse the repository at this point in the history
* Provide Extension API to OpenSearch

Signed-off-by: Daniel Widdis <widdis@gmail.com>

* Fix newline, add test

Signed-off-by: Daniel Widdis <widdis@gmail.com>

* Clarify Valid API format

Signed-off-by: Daniel Widdis <widdis@gmail.com>

* Store uniqueID of extensions

Signed-off-by: Daniel Widdis <widdis@gmail.com>

* Fix errors in merge commit

Signed-off-by: Daniel Widdis <widdis@gmail.com>

* Rename Api to RestApi

Signed-off-by: Daniel Widdis <widdis@gmail.com>

* Delay sending REST API until after responding to initialization

Signed-off-by: Daniel Widdis <widdis@gmail.com>

* Move YAML file to classpath

Signed-off-by: Daniel Widdis <widdis@gmail.com>

* Temporarily disable failing test for debug

Signed-off-by: Daniel Widdis <widdis@gmail.com>

* fixed testHandleExtensionInitRequest, added protected transport service setter in ExtensionsRunner for testing purporses

Signed-off-by: Joshua Palis <jpalis@amazon.com>

* Change transport service setter to package privage

Signed-off-by: Daniel Widdis <widdis@gmail.com>

* Rename RestApi to RestActions or RestPaths

Signed-off-by: Daniel Widdis <widdis@gmail.com>

Signed-off-by: Daniel Widdis <widdis@gmail.com>
Signed-off-by: Joshua Palis <jpalis@amazon.com>
Co-authored-by: Joshua Palis <jpalis@amazon.com>
  • Loading branch information
2 people authored and kokibas committed Mar 17, 2023
1 parent 35ec7a0 commit a02464f
Show file tree
Hide file tree
Showing 12 changed files with 297 additions and 50 deletions.
4 changes: 2 additions & 2 deletions DEVELOPER_GUIDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,11 @@ Every extension will require metadata stored in an extensions.yml file in order
- Navigate to the extensions folder using `cd extensions`.
- Manually create a file titled `extensions.yml` within the extensions directory using an IDE or an in-line text editor.

Sample extensions.yml file:
Sample `extensions.yml` file (the name must match the `extensionName` field in the corresponding `extension.yml`:

```
extensions:
- name: opensearch-sdk
- name: sample-extension
uniqueId: opensearch-sdk-1
hostName: 'sdk_host'
hostAddress: '127.0.0.1'
Expand Down
1 change: 1 addition & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -96,5 +96,6 @@ test {
systemProperty 'tests.security.manager', 'false'
testLogging {
events "passed", "skipped", "failed"
exceptionFormat "full"
}
}
62 changes: 62 additions & 0 deletions src/main/java/org/opensearch/sdk/ExtensionRestPaths.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.sdk;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;

/**
* This class encapsulates the API of an Extension.
*/
public class ExtensionRestPaths {

private List<String> restPaths = new ArrayList<>();

/**
* Placeholder field. Eventually will read this from spec file
*/
public static final String EXTENSION_REST_PATHS_DESCRIPTOR = "/extension_rest_paths.yml";

/**
* Jackson requires a default constructor.
*/
private ExtensionRestPaths() {
super();
}

/**
* Gets the REST API Path and Method Strings.
*
* @return a copy of the list containing the REST API Path Strings
*/
public List<String> getRestPaths() {
return new ArrayList<>(restPaths);
}

@Override
public String toString() {
return "ExtensionRestPaths{restPaths=" + restPaths + "}";
}

/**
* Instantiates an instance of this class by reading from a YAML file.
*
* @return An instance of this class.
* @throws IOException if there is an error reading the file.
*/
static ExtensionRestPaths readFromYaml() throws IOException {
File file = new File(ExtensionRestPaths.class.getResource(EXTENSION_REST_PATHS_DESCRIPTOR).getPath());
ObjectMapper objectMapper = new ObjectMapper(new YAMLFactory());
return objectMapper.readValue(file, ExtensionRestPaths.class);
}
}
3 changes: 1 addition & 2 deletions src/main/java/org/opensearch/sdk/ExtensionSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ public String getHostPort() {

@Override
public String toString() {
return "\nnodename: " + extensionName + "\nhostaddress: " + hostAddress + "\nhostPort: " + hostPort + "\n";
return "ExtensionSettings{extensionName=" + extensionName + ", hostAddress=" + hostAddress + ", hostPort=" + hostPort + "}";
}

}
104 changes: 80 additions & 24 deletions src/main/java/org/opensearch/sdk/ExtensionsRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,13 @@
import org.opensearch.common.network.NetworkService;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.PageCacheRecycler;
import org.opensearch.extensions.DiscoveryExtension;
import org.opensearch.extensions.ExtensionBooleanResponse;
import org.opensearch.discovery.InitializeExtensionsRequest;
import org.opensearch.discovery.InitializeExtensionsResponse;
import org.opensearch.extensions.ExtensionRequest;
import org.opensearch.extensions.ExtensionsOrchestrator;
import org.opensearch.extensions.ExtensionBooleanResponse;
import org.opensearch.extensions.RegisterRestActionsRequest;
import org.opensearch.index.IndicesModuleRequest;
import org.opensearch.index.IndicesModuleResponse;
import org.opensearch.indices.IndicesModule;
Expand All @@ -37,14 +39,14 @@
import org.opensearch.sdk.handlers.ClusterSettingsResponseHandler;
import org.opensearch.sdk.handlers.ClusterStateResponseHandler;
import org.opensearch.sdk.handlers.LocalNodeResponseHandler;
import org.opensearch.sdk.handlers.RegisterRestActionsResponseHandler;
import org.opensearch.search.SearchModule;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.ClusterConnectionManager;
import org.opensearch.transport.ConnectionManager;
import org.opensearch.transport.TransportInterceptor;
import org.opensearch.transport.TransportService;
import org.opensearch.transport.TransportSettings;
import org.opensearch.transport.TransportInterceptor;
import org.opensearch.transport.TransportResponse;

import java.io.File;
Expand All @@ -65,7 +67,11 @@
*/
public class ExtensionsRunner {
private ExtensionSettings extensionSettings = readExtensionSettings();
private ExtensionRestPaths extensionRestPaths = ExtensionRestPaths.readFromYaml();
private String uniqueId;
private DiscoveryNode opensearchNode;
private TransportService extensionTransportService = null;

private final Settings settings = Settings.builder()
.put("node.name", extensionSettings.getExtensionName())
.put(TransportSettings.BIND_HOST.getKey(), extensionSettings.getHostAddress())
Expand All @@ -79,22 +85,33 @@ public class ExtensionsRunner {
/**
* Instantiates a new Extensions Runner.
*
* @throws IOException if the runner failed to connect to the OpenSearch cluster.
* @throws IOException if the runner failed to read settings or API.
*/
public ExtensionsRunner() throws IOException {}

private ExtensionSettings readExtensionSettings() throws IOException {
File file = new File(ExtensionSettings.EXTENSION_DESCRIPTOR);
ObjectMapper objectMapper = new ObjectMapper(new YAMLFactory());
ExtensionSettings extensionSettings = objectMapper.readValue(file, ExtensionSettings.class);
return extensionSettings;
return objectMapper.readValue(file, ExtensionSettings.class);
}

void setExtensionTransportService(TransportService extensionTransportService) {
this.extensionTransportService = extensionTransportService;
}

private void setUniqueId(String id) {
this.uniqueId = id;
}

String getUniqueId() {
return uniqueId;
}

private void setOpensearchNode(DiscoveryNode opensearchNode) {
this.opensearchNode = opensearchNode;
}

public DiscoveryNode getOpensearchNode() {
DiscoveryNode getOpensearchNode() {
return opensearchNode;
}

Expand All @@ -108,8 +125,28 @@ InitializeExtensionsResponse handleExtensionInitRequest(InitializeExtensionsRequ
logger.info("Registering Extension Request received from OpenSearch");
InitializeExtensionsResponse initializeExtensionsResponse = new InitializeExtensionsResponse(extensionSettings.getExtensionName());
opensearchNode = extensionInitRequest.getSourceNode();
setOpensearchNode(opensearchNode);
return initializeExtensionsResponse;
// Fetch the unique ID
for (DiscoveryExtension de : extensionInitRequest.getExtensions()) {
if (de.getName().equals(extensionSettings.getExtensionName())) {
setUniqueId(de.getId());
break;
}
}
// We could avoid this check if we only send one extension in the initialize request, rather than the entire list
if (getUniqueId() == null) {
throw new IllegalArgumentException(
"Extension " + extensionSettings.getExtensionName() + " does not match any requested extension."
);
}
// Successfully initialized. Send the response.
try {
return initializeExtensionsResponse;
} finally {
// After sending successful response to initialization, send the REST API
setOpensearchNode(opensearchNode);
extensionTransportService.connectToNode(opensearchNode);
sendRegisterRestActionsRequest(extensionTransportService);
}
}

/**
Expand Down Expand Up @@ -140,10 +177,6 @@ TransportResponse handleOpenSearchRequest(OpenSearchRequest request) throws Exce
IndicesModuleResponse handleIndicesModuleRequest(IndicesModuleRequest indicesModuleRequest, TransportService transportService) {
logger.info("Registering Indices Module Request received from OpenSearch");
IndicesModuleResponse indicesModuleResponse = new IndicesModuleResponse(true, true, true);

// handleExtensionInitRequest will set the opensearchNode while bootstraping of OpenSearch
DiscoveryNode opensearchNode = getOpensearchNode();
transportService.connectToNode(opensearchNode);
return indicesModuleResponse;
}

Expand Down Expand Up @@ -200,21 +233,26 @@ public Netty4Transport getNetty4Transport(Settings settings, ThreadPool threadPo
}

/**
* Creates a TransportService object. This object will control communication between the extension and OpenSearch.
* Initializes the TransportService object for this extension. This object will control communication between the extension and OpenSearch.
*
* @param settings The transport settings to configure.
* @return The configured TransportService object.
* @return The initialized TransportService object.
*/
public TransportService createTransportService(Settings settings) {
public TransportService initializeExtensionTransportService(Settings settings) {

ThreadPool threadPool = new ThreadPool(settings);

Netty4Transport transport = getNetty4Transport(settings, threadPool);

final ConnectionManager connectionManager = new ClusterConnectionManager(settings, transport);

// Stop any existing transport service
if (extensionTransportService != null) {
extensionTransportService.stop();
}

// create transport service
return new TransportService(
extensionTransportService = new TransportService(
settings,
transport,
threadPool,
Expand All @@ -228,6 +266,8 @@ public TransportService createTransportService(Settings settings) {
emptySet(),
connectionManager
);
startTransportService(extensionTransportService);
return extensionTransportService;
}

/**
Expand Down Expand Up @@ -289,6 +329,26 @@ public void startTransportService(TransportService transportService) {

}

/**
* Requests that OpenSearch register the REST Actions for this extension.
*
* @param transportService The TransportService defining the connection to OpenSearch.
*/
public void sendRegisterRestActionsRequest(TransportService transportService) {
logger.info("Sending Register REST Actions request to OpenSearch for " + extensionRestPaths.getRestPaths());
RegisterRestActionsResponseHandler registerActionsResponseHandler = new RegisterRestActionsResponseHandler();
try {
transportService.sendRequest(
opensearchNode,
ExtensionsOrchestrator.REQUEST_EXTENSION_REGISTER_REST_ACTIONS,
new RegisterRestActionsRequest(getUniqueId(), extensionRestPaths.getRestPaths()),
registerActionsResponseHandler
);
} catch (Exception e) {
logger.info("Failed to send Register REST Actions request to OpenSearch", e);
}
}

/**
* Requests the cluster state from OpenSearch. The result will be handled by a {@link ClusterStateResponseHandler}.
*
Expand Down Expand Up @@ -364,7 +424,7 @@ public void startActionListener(int timeout) {
}

/**
* Run the Extension. Imports settings, establishes a connection with an OpenSearch cluster via a Transport Service, and sets up a listener for responses.
* Run the Extension. Imports settings and sets up Transport Service listening for incoming connections.
*
* @param args Unused
* @throws IOException if the runner failed to connect to the OpenSearch cluster.
Expand All @@ -373,13 +433,9 @@ public static void main(String[] args) throws IOException {

ExtensionsRunner extensionsRunner = new ExtensionsRunner();

// configure and retrieve transport service with settings
Settings settings = extensionsRunner.getSettings();
TransportService transportService = extensionsRunner.createTransportService(settings);

// start transport service and action listener
extensionsRunner.startTransportService(transportService);
// initialize the transport service
extensionsRunner.initializeExtensionTransportService(extensionsRunner.getSettings());
// start listening on configured port and wait for connection from OpenSearch
extensionsRunner.startActionListener(0);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.sdk.handlers;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.extensions.RegisterRestActionsResponse;
import org.opensearch.sdk.ExtensionsRunner;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportException;
import org.opensearch.transport.TransportResponseHandler;
import org.opensearch.transport.TransportService;

import java.io.IOException;

/**
* This class handles the response from OpenSearch to a {@link ExtensionsRunner#sendRegisterRestActionsRequest(TransportService)} call.
*/
public class RegisterRestActionsResponseHandler implements TransportResponseHandler<RegisterRestActionsResponse> {
private static final Logger logger = LogManager.getLogger(RegisterRestActionsResponseHandler.class);

@Override
public void handleResponse(RegisterRestActionsResponse response) {
logger.info("received {}", response);
}

@Override
public void handleException(TransportException exp) {
logger.info("RegisterActionsRequest failed", exp);
}

@Override
public String executor() {
return ThreadPool.Names.GENERIC;
}

@Override
public RegisterRestActionsResponse read(StreamInput in) throws IOException {
return new RegisterRestActionsResponse(in);
}
}
9 changes: 9 additions & 0 deletions src/main/resources/extension_rest_paths.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
# This is a placeholder for a more formal API definition file, e.g., an OpenAPI
# spec file which can be used to generate documentation and streamline testing
# See https://github.com/opensearch-project/OpenSearch/issues/3090
# The API will include at least two components, a method (e.g., GET, PUT) and
# parts of a URI (e.g., /_extensions/_sample-extension/api_1)
restPaths:
- GET /api_1
- PUT /api_2
- POST /api_3
Loading

0 comments on commit a02464f

Please sign in to comment.