Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cleanup interfaces around ExtensionManager #7374

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
150 changes: 45 additions & 105 deletions server/src/main/java/org/opensearch/extensions/ExtensionsManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -107,8 +108,7 @@ public static enum OpenSearchRequestType {

private final Path extensionsPath;
private ExtensionTransportActionsHandler extensionTransportActionsHandler;
// A list of initialized extensions, a subset of the values of map below which includes all extensions
private List<DiscoveryExtensionNode> extensions;
private Map<String, DiscoveryExtensionNode> initializedExtensions;
private Map<String, DiscoveryExtensionNode> extensionIdMap;
private RestActionsRequestHandler restActionsRequestHandler;
private CustomSettingsRequestHandler customSettingsRequestHandler;
Expand All @@ -118,21 +118,16 @@ public static enum OpenSearchRequestType {
private AddSettingsUpdateConsumerRequestHandler addSettingsUpdateConsumerRequestHandler;
private NodeClient client;

public ExtensionsManager() {
this.extensionsPath = Path.of("");
}

/**
* Instantiate a new ExtensionsManager object to handle requests and responses from extensions. This is called during Node bootstrap.
*
* @param settings Settings from the node the orchestrator is running on.
* @param extensionsPath Path to a directory containing extensions.
* @throws IOException If the extensions discovery file is not properly retrieved.
*/
public ExtensionsManager(Settings settings, Path extensionsPath) throws IOException {
public ExtensionsManager(Path extensionsPath) throws IOException {
logger.info("ExtensionsManager initialized");
this.extensionsPath = extensionsPath;
this.extensions = new ArrayList<DiscoveryExtensionNode>();
this.initializedExtensions = new HashMap<String, DiscoveryExtensionNode>();
peternied marked this conversation as resolved.
Show resolved Hide resolved
this.extensionIdMap = new HashMap<String, DiscoveryExtensionNode>();
// will be initialized in initializeServicesAndRestHandler which is called after the Node is initialized
this.transportService = null;
Expand Down Expand Up @@ -187,6 +182,16 @@ public void initializeServicesAndRestHandler(
registerRequestHandler();
}

/**
* Lookup an initialized extension by its unique id
*
* @param extensionId The unique extension identifier
* @return An optional of the DiscoveryExtensionNode instance for the matching extension
*/
public Optional<DiscoveryExtensionNode> lookupInitializedExtensionById(final String extensionId) {
return Optional.ofNullable(this.initializedExtensions.get(extensionId));
}

/**
* Handles Transport Request from {@link org.opensearch.extensions.action.ExtensionTransportAction} which was invoked by an extension via {@link ExtensionTransportActionsHandler}.
*
Expand Down Expand Up @@ -289,7 +294,7 @@ private void registerRequestHandler() {
/*
* Load and populate all extensions
*/
private void discover() throws IOException {
protected void discover() throws IOException {
logger.info("Loading extensions : {}", extensionsPath);
if (!FileSystemUtils.isAccessibleDirectory(extensionsPath, logger)) {
return;
Expand Down Expand Up @@ -342,7 +347,7 @@ private void loadExtension(Extension extension) throws IOException {
}

/**
* Iterate through all extensions and initialize them. Initialized extensions will be added to the {@link #extensions}.
* Iterate through all extensions and initialize them. Initialized extensions will be added to the {@link #initializedExtensions}.
*/
public void initialize() {
for (DiscoveryExtensionNode extension : extensionIdMap.values()) {
Expand All @@ -366,7 +371,7 @@ public void handleResponse(InitializeExtensionResponse response) {
for (DiscoveryExtensionNode extension : extensionIdMap.values()) {
if (extension.getName().equals(response.getName())) {
extension.setImplementedInterfaces(response.getImplementedInterfaces());
extensions.add(extension);
initializedExtensions.put(extension.getId(), extension);
logger.info("Initialized extension: " + extension.getName());
break;
}
Expand Down Expand Up @@ -426,11 +431,17 @@ TransportResponse handleExtensionRequest(ExtensionRequest extensionRequest) thro
case REQUEST_EXTENSION_DEPENDENCY_INFORMATION:
String uniqueId = extensionRequest.getUniqueId();
if (uniqueId == null) {
return new ExtensionDependencyResponse(extensions);
return new ExtensionDependencyResponse(
initializedExtensions.entrySet().stream().map(e -> e.getValue()).collect(Collectors.toList())
);
} else {
ExtensionDependency matchingId = new ExtensionDependency(uniqueId, Version.CURRENT);
return new ExtensionDependencyResponse(
extensions.stream().filter(e -> e.dependenciesContain(matchingId)).collect(Collectors.toList())
initializedExtensions.entrySet()
.stream()
.map(e -> e.getValue())
.filter(e -> e.dependenciesContain(matchingId))
.collect(Collectors.toList())
);
}
default:
Expand Down Expand Up @@ -623,154 +634,83 @@ private ExtensionsSettings readFromExtensionsYml(Path filePath) throws IOExcepti
}
}

public static String getRequestExtensionActionName() {
static String getRequestExtensionActionName() {
return REQUEST_EXTENSION_ACTION_NAME;
}

public static String getIndicesExtensionPointActionName() {
static String getIndicesExtensionPointActionName() {
return INDICES_EXTENSION_POINT_ACTION_NAME;
}

public static String getIndicesExtensionNameActionName() {
static String getIndicesExtensionNameActionName() {
return INDICES_EXTENSION_NAME_ACTION_NAME;
}

public static String getRequestExtensionClusterState() {
static String getRequestExtensionClusterState() {
return REQUEST_EXTENSION_CLUSTER_STATE;
}

public static String getRequestExtensionClusterSettings() {
static String getRequestExtensionClusterSettings() {
return REQUEST_EXTENSION_CLUSTER_SETTINGS;
}

public static Logger getLogger() {
static Logger getLogger() {
return logger;
}

public Path getExtensionsPath() {
Path getExtensionsPath() {
return extensionsPath;
}

public List<DiscoveryExtensionNode> getExtensions() {
return extensions;
}

public TransportService getTransportService() {
TransportService getTransportService() {
return transportService;
}

public ClusterService getClusterService() {
ClusterService getClusterService() {
return clusterService;
}

public static String getRequestExtensionRegisterRestActions() {
return REQUEST_EXTENSION_REGISTER_REST_ACTIONS;
}

public static String getRequestRestExecuteOnExtensionAction() {
return REQUEST_REST_EXECUTE_ON_EXTENSION_ACTION;
}

public Map<String, DiscoveryExtensionNode> getExtensionIdMap() {
Map<String, DiscoveryExtensionNode> getExtensionIdMap() {
return extensionIdMap;
}

public RestActionsRequestHandler getRestActionsRequestHandler() {
RestActionsRequestHandler getRestActionsRequestHandler() {
return restActionsRequestHandler;
}

public void setExtensions(List<DiscoveryExtensionNode> extensions) {
this.extensions = extensions;
}

public void setExtensionIdMap(Map<String, DiscoveryExtensionNode> extensionIdMap) {
void setExtensionIdMap(Map<String, DiscoveryExtensionNode> extensionIdMap) {
this.extensionIdMap = extensionIdMap;
}

public void setRestActionsRequestHandler(RestActionsRequestHandler restActionsRequestHandler) {
void setRestActionsRequestHandler(RestActionsRequestHandler restActionsRequestHandler) {
this.restActionsRequestHandler = restActionsRequestHandler;
}

public void setTransportService(TransportService transportService) {
void setTransportService(TransportService transportService) {
this.transportService = transportService;
}

public void setClusterService(ClusterService clusterService) {
void setClusterService(ClusterService clusterService) {
this.clusterService = clusterService;
}

public static String getRequestExtensionRegisterTransportActions() {
return REQUEST_EXTENSION_REGISTER_TRANSPORT_ACTIONS;
}

public static String getRequestExtensionRegisterCustomSettings() {
return REQUEST_EXTENSION_REGISTER_CUSTOM_SETTINGS;
}

public CustomSettingsRequestHandler getCustomSettingsRequestHandler() {
CustomSettingsRequestHandler getCustomSettingsRequestHandler() {
return customSettingsRequestHandler;
}

public void setCustomSettingsRequestHandler(CustomSettingsRequestHandler customSettingsRequestHandler) {
void setCustomSettingsRequestHandler(CustomSettingsRequestHandler customSettingsRequestHandler) {
this.customSettingsRequestHandler = customSettingsRequestHandler;
}

public static String getRequestExtensionEnvironmentSettings() {
return REQUEST_EXTENSION_ENVIRONMENT_SETTINGS;
}

public static String getRequestExtensionAddSettingsUpdateConsumer() {
return REQUEST_EXTENSION_ADD_SETTINGS_UPDATE_CONSUMER;
}

public static String getRequestExtensionUpdateSettings() {
return REQUEST_EXTENSION_UPDATE_SETTINGS;
}

public AddSettingsUpdateConsumerRequestHandler getAddSettingsUpdateConsumerRequestHandler() {
AddSettingsUpdateConsumerRequestHandler getAddSettingsUpdateConsumerRequestHandler() {
return addSettingsUpdateConsumerRequestHandler;
}

public void setAddSettingsUpdateConsumerRequestHandler(
AddSettingsUpdateConsumerRequestHandler addSettingsUpdateConsumerRequestHandler
) {
void setAddSettingsUpdateConsumerRequestHandler(AddSettingsUpdateConsumerRequestHandler addSettingsUpdateConsumerRequestHandler) {
this.addSettingsUpdateConsumerRequestHandler = addSettingsUpdateConsumerRequestHandler;
}

public Settings getEnvironmentSettings() {
Settings getEnvironmentSettings() {
return environmentSettings;
}

public void setEnvironmentSettings(Settings environmentSettings) {
this.environmentSettings = environmentSettings;
}

public static String getRequestExtensionHandleTransportAction() {
return REQUEST_EXTENSION_HANDLE_TRANSPORT_ACTION;
}

public static String getTransportActionRequestFromExtension() {
return TRANSPORT_ACTION_REQUEST_FROM_EXTENSION;
}

public static int getExtensionRequestWaitTimeout() {
return EXTENSION_REQUEST_WAIT_TIMEOUT;
}

public ExtensionTransportActionsHandler getExtensionTransportActionsHandler() {
return extensionTransportActionsHandler;
}

public void setExtensionTransportActionsHandler(ExtensionTransportActionsHandler extensionTransportActionsHandler) {
this.extensionTransportActionsHandler = extensionTransportActionsHandler;
}

public NodeClient getClient() {
return client;
}

public void setClient(NodeClient client) {
this.client = client;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,76 @@

package org.opensearch.extensions;

import java.io.IOException;
import java.net.UnknownHostException;
import java.nio.file.Path;
import java.util.Optional;

import org.opensearch.action.ActionModule;
import org.opensearch.client.node.NodeClient;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.settings.SettingsModule;

import org.opensearch.extensions.action.ExtensionActionRequest;
import org.opensearch.extensions.action.ExtensionActionResponse;
import org.opensearch.extensions.action.RemoteExtensionActionResponse;
import org.opensearch.index.IndexModule;
import org.opensearch.transport.TransportService;

/**
* Noop class for ExtensionsManager
*
* @opensearch.internal
*/
public class NoopExtensionsManager extends ExtensionsManager {

public NoopExtensionsManager() {
super();
public NoopExtensionsManager() throws IOException {
super(Path.of(""));
}

@Override
public void initializeServicesAndRestHandler(
ActionModule actionModule,
SettingsModule settingsModule,
TransportService transportService,
ClusterService clusterService,
Settings initialEnvironmentSettings,
NodeClient client
) {
// no-op
}

@Override
public RemoteExtensionActionResponse handleRemoteTransportRequest(ExtensionActionRequest request) throws Exception {
// no-op empty response
return new RemoteExtensionActionResponse(true, new byte[0]);
}

@Override
public ExtensionActionResponse handleTransportRequest(ExtensionActionRequest request) throws Exception {
// no-op empty response
return new ExtensionActionResponse(new byte[0]);
}

@Override
protected void discover() throws IOException {
// no-op
}

@Override
public void initialize() {
// no-op
}

@Override
public void onIndexModule(IndexModule indexModule) throws UnknownHostException {
// no-op
}

@Override
public Optional<DiscoveryExtensionNode> lookupInitializedExtensionById(final String extensionId) {
// no-op not found
return Optional.empty();
}
}
Loading