Skip to content

Commit

Permalink
Cleanup interfaces around ExtensionManager
Browse files Browse the repository at this point in the history
ExtensionsManager has a whole lot of functionality that shouldn't be
exposed to external classes, marked much of this package protected for a
cleaner interface.  Then updated the Noop version to override all public
functions to remove extranious feature flag checks.

Note; I didn't go so far to remove all getters/setters that were unused
but that could be a good follow up in the future.

Signed-off-by: Peter Nied <petern@amazon.com>
  • Loading branch information
peternied committed May 2, 2023
1 parent 1151308 commit 5eead5e
Show file tree
Hide file tree
Showing 5 changed files with 141 additions and 197 deletions.
148 changes: 44 additions & 104 deletions server/src/main/java/org/opensearch/extensions/ExtensionsManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -107,8 +108,7 @@ public static enum OpenSearchRequestType {

private final Path extensionsPath;
private ExtensionTransportActionsHandler extensionTransportActionsHandler;
// A list of initialized extensions, a subset of the values of map below which includes all extensions
private List<DiscoveryExtensionNode> extensions;
private Map<String, DiscoveryExtensionNode> initializedExtensions;
private Map<String, DiscoveryExtensionNode> extensionIdMap;
private RestActionsRequestHandler restActionsRequestHandler;
private CustomSettingsRequestHandler customSettingsRequestHandler;
Expand All @@ -118,21 +118,16 @@ public static enum OpenSearchRequestType {
private AddSettingsUpdateConsumerRequestHandler addSettingsUpdateConsumerRequestHandler;
private NodeClient client;

public ExtensionsManager() {
this.extensionsPath = Path.of("");
}

/**
* Instantiate a new ExtensionsManager object to handle requests and responses from extensions. This is called during Node bootstrap.
*
* @param settings Settings from the node the orchestrator is running on.
* @param extensionsPath Path to a directory containing extensions.
* @throws IOException If the extensions discovery file is not properly retrieved.
*/
public ExtensionsManager(Settings settings, Path extensionsPath) throws IOException {
public ExtensionsManager(Path extensionsPath) throws IOException {
logger.info("ExtensionsManager initialized");
this.extensionsPath = extensionsPath;
this.extensions = new ArrayList<DiscoveryExtensionNode>();
this.initializedExtensions = new HashMap<String, DiscoveryExtensionNode>();
this.extensionIdMap = new HashMap<String, DiscoveryExtensionNode>();
// will be initialized in initializeServicesAndRestHandler which is called after the Node is initialized
this.transportService = null;
Expand Down Expand Up @@ -187,6 +182,16 @@ public void initializeServicesAndRestHandler(
registerRequestHandler();
}

/**
* Lookup an initialized extension by its unique id
*
* @param extensionId The unique extension identifier
* @return An optional of the DiscoveryExtensionNode instance for the matching extension
*/
public Optional<DiscoveryExtensionNode> lookupInitializedExtensionById(final String extensionId) {
return Optional.ofNullable(this.initializedExtensions.get(extensionId));
}

/**
* Handles Transport Request from {@link org.opensearch.extensions.action.ExtensionTransportAction} which was invoked by an extension via {@link ExtensionTransportActionsHandler}.
*
Expand Down Expand Up @@ -342,7 +347,7 @@ private void loadExtension(Extension extension) throws IOException {
}

/**
* Iterate through all extensions and initialize them. Initialized extensions will be added to the {@link #extensions}.
* Iterate through all extensions and initialize them. Initialized extensions will be added to the {@link #initializedExtensions}.
*/
public void initialize() {
for (DiscoveryExtensionNode extension : extensionIdMap.values()) {
Expand All @@ -366,7 +371,7 @@ public void handleResponse(InitializeExtensionResponse response) {
for (DiscoveryExtensionNode extension : extensionIdMap.values()) {
if (extension.getName().equals(response.getName())) {
extension.setImplementedInterfaces(response.getImplementedInterfaces());
extensions.add(extension);
initializedExtensions.put(extension.getId(), extension);
logger.info("Initialized extension: " + extension.getName());
break;
}
Expand Down Expand Up @@ -426,11 +431,17 @@ TransportResponse handleExtensionRequest(ExtensionRequest extensionRequest) thro
case REQUEST_EXTENSION_DEPENDENCY_INFORMATION:
String uniqueId = extensionRequest.getUniqueId();
if (uniqueId == null) {
return new ExtensionDependencyResponse(extensions);
return new ExtensionDependencyResponse(
initializedExtensions.entrySet().stream().map(e -> e.getValue()).collect(Collectors.toList())
);
} else {
ExtensionDependency matchingId = new ExtensionDependency(uniqueId, Version.CURRENT);
return new ExtensionDependencyResponse(
extensions.stream().filter(e -> e.dependenciesContain(matchingId)).collect(Collectors.toList())
initializedExtensions.entrySet()
.stream()
.map(e -> e.getValue())
.filter(e -> e.dependenciesContain(matchingId))
.collect(Collectors.toList())
);
}
default:
Expand Down Expand Up @@ -623,154 +634,83 @@ private ExtensionsSettings readFromExtensionsYml(Path filePath) throws IOExcepti
}
}

public static String getRequestExtensionActionName() {
static String getRequestExtensionActionName() {
return REQUEST_EXTENSION_ACTION_NAME;
}

public static String getIndicesExtensionPointActionName() {
static String getIndicesExtensionPointActionName() {
return INDICES_EXTENSION_POINT_ACTION_NAME;
}

public static String getIndicesExtensionNameActionName() {
static String getIndicesExtensionNameActionName() {
return INDICES_EXTENSION_NAME_ACTION_NAME;
}

public static String getRequestExtensionClusterState() {
static String getRequestExtensionClusterState() {
return REQUEST_EXTENSION_CLUSTER_STATE;
}

public static String getRequestExtensionClusterSettings() {
static String getRequestExtensionClusterSettings() {
return REQUEST_EXTENSION_CLUSTER_SETTINGS;
}

public static Logger getLogger() {
static Logger getLogger() {
return logger;
}

public Path getExtensionsPath() {
Path getExtensionsPath() {
return extensionsPath;
}

public List<DiscoveryExtensionNode> getExtensions() {
return extensions;
}

public TransportService getTransportService() {
TransportService getTransportService() {
return transportService;
}

public ClusterService getClusterService() {
ClusterService getClusterService() {
return clusterService;
}

public static String getRequestExtensionRegisterRestActions() {
return REQUEST_EXTENSION_REGISTER_REST_ACTIONS;
}

public static String getRequestRestExecuteOnExtensionAction() {
return REQUEST_REST_EXECUTE_ON_EXTENSION_ACTION;
}

public Map<String, DiscoveryExtensionNode> getExtensionIdMap() {
Map<String, DiscoveryExtensionNode> getExtensionIdMap() {
return extensionIdMap;
}

public RestActionsRequestHandler getRestActionsRequestHandler() {
RestActionsRequestHandler getRestActionsRequestHandler() {
return restActionsRequestHandler;
}

public void setExtensions(List<DiscoveryExtensionNode> extensions) {
this.extensions = extensions;
}

public void setExtensionIdMap(Map<String, DiscoveryExtensionNode> extensionIdMap) {
void setExtensionIdMap(Map<String, DiscoveryExtensionNode> extensionIdMap) {
this.extensionIdMap = extensionIdMap;
}

public void setRestActionsRequestHandler(RestActionsRequestHandler restActionsRequestHandler) {
void setRestActionsRequestHandler(RestActionsRequestHandler restActionsRequestHandler) {
this.restActionsRequestHandler = restActionsRequestHandler;
}

public void setTransportService(TransportService transportService) {
void setTransportService(TransportService transportService) {
this.transportService = transportService;
}

public void setClusterService(ClusterService clusterService) {
void setClusterService(ClusterService clusterService) {
this.clusterService = clusterService;
}

public static String getRequestExtensionRegisterTransportActions() {
return REQUEST_EXTENSION_REGISTER_TRANSPORT_ACTIONS;
}

public static String getRequestExtensionRegisterCustomSettings() {
return REQUEST_EXTENSION_REGISTER_CUSTOM_SETTINGS;
}

public CustomSettingsRequestHandler getCustomSettingsRequestHandler() {
CustomSettingsRequestHandler getCustomSettingsRequestHandler() {
return customSettingsRequestHandler;
}

public void setCustomSettingsRequestHandler(CustomSettingsRequestHandler customSettingsRequestHandler) {
void setCustomSettingsRequestHandler(CustomSettingsRequestHandler customSettingsRequestHandler) {
this.customSettingsRequestHandler = customSettingsRequestHandler;
}

public static String getRequestExtensionEnvironmentSettings() {
return REQUEST_EXTENSION_ENVIRONMENT_SETTINGS;
}

public static String getRequestExtensionAddSettingsUpdateConsumer() {
return REQUEST_EXTENSION_ADD_SETTINGS_UPDATE_CONSUMER;
}

public static String getRequestExtensionUpdateSettings() {
return REQUEST_EXTENSION_UPDATE_SETTINGS;
}

public AddSettingsUpdateConsumerRequestHandler getAddSettingsUpdateConsumerRequestHandler() {
AddSettingsUpdateConsumerRequestHandler getAddSettingsUpdateConsumerRequestHandler() {
return addSettingsUpdateConsumerRequestHandler;
}

public void setAddSettingsUpdateConsumerRequestHandler(
AddSettingsUpdateConsumerRequestHandler addSettingsUpdateConsumerRequestHandler
) {
void setAddSettingsUpdateConsumerRequestHandler(AddSettingsUpdateConsumerRequestHandler addSettingsUpdateConsumerRequestHandler) {
this.addSettingsUpdateConsumerRequestHandler = addSettingsUpdateConsumerRequestHandler;
}

public Settings getEnvironmentSettings() {
Settings getEnvironmentSettings() {
return environmentSettings;
}

public void setEnvironmentSettings(Settings environmentSettings) {
this.environmentSettings = environmentSettings;
}

public static String getRequestExtensionHandleTransportAction() {
return REQUEST_EXTENSION_HANDLE_TRANSPORT_ACTION;
}

public static String getTransportActionRequestFromExtension() {
return TRANSPORT_ACTION_REQUEST_FROM_EXTENSION;
}

public static int getExtensionRequestWaitTimeout() {
return EXTENSION_REQUEST_WAIT_TIMEOUT;
}

public ExtensionTransportActionsHandler getExtensionTransportActionsHandler() {
return extensionTransportActionsHandler;
}

public void setExtensionTransportActionsHandler(ExtensionTransportActionsHandler extensionTransportActionsHandler) {
this.extensionTransportActionsHandler = extensionTransportActionsHandler;
}

public NodeClient getClient() {
return client;
}

public void setClient(NodeClient client) {
this.client = client;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,65 @@

package org.opensearch.extensions;

import java.io.IOException;
import java.net.UnknownHostException;
import java.nio.file.Path;
import java.util.Optional;

import org.opensearch.action.ActionModule;
import org.opensearch.client.node.NodeClient;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.settings.SettingsModule;

import org.opensearch.extensions.action.ExtensionActionRequest;
import org.opensearch.extensions.action.ExtensionActionResponse;
import org.opensearch.extensions.action.RemoteExtensionActionResponse;
import org.opensearch.index.IndexModule;
import org.opensearch.transport.TransportService;

/**
* Noop class for ExtensionsManager
*
* @opensearch.internal
*/
public class NoopExtensionsManager extends ExtensionsManager {

public NoopExtensionsManager() {
super();
public NoopExtensionsManager() throws IOException {
super(Path.of(""));
}

public void initializeServicesAndRestHandler(
ActionModule actionModule,
SettingsModule settingsModule,
TransportService transportService,
ClusterService clusterService,
Settings initialEnvironmentSettings,
NodeClient client
) {
// no-op
}

public RemoteExtensionActionResponse handleRemoteTransportRequest(ExtensionActionRequest request) throws Exception {
// no-op empty response
return new RemoteExtensionActionResponse(true, new byte[0]);
}

public ExtensionActionResponse handleTransportRequest(ExtensionActionRequest request) throws Exception {
// no-op empty response
return new ExtensionActionResponse(new byte[0]);
}

public void initialize() {
// no-op
}

public void onIndexModule(IndexModule indexModule) throws UnknownHostException {
// no-op
}

public Optional<DiscoveryExtensionNode> lookupInitializedExtensionById(final String extensionId) {
// no-op not found
return Optional.empty();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,6 @@
import org.opensearch.common.util.concurrent.OpenSearchExecutors;
import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException;
import org.opensearch.common.util.concurrent.OpenSearchThreadPoolExecutor;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.common.util.iterable.Iterables;
import org.opensearch.common.util.set.Sets;
import org.opensearch.common.xcontent.LoggingDeprecationHandler;
Expand Down Expand Up @@ -926,9 +925,7 @@ private synchronized IndexService createIndexService(
indexModule.addIndexOperationListener(operationListener);
}
pluginsService.onIndexModule(indexModule);
if (FeatureFlags.isEnabled(FeatureFlags.EXTENSIONS)) {
extensionsManager.onIndexModule(indexModule);
}
extensionsManager.onIndexModule(indexModule);
for (IndexEventListener listener : builtInListeners) {
indexModule.addIndexEventListener(listener);
}
Expand Down
Loading

0 comments on commit 5eead5e

Please sign in to comment.