Skip to content

Commit

Permalink
Support extension additional settings with extension REST initializat…
Browse files Browse the repository at this point in the history
…ion (opensearch-project#8414)

* Support extension additional settings with extension REST initialization

Signed-off-by: Craig Perkins <cwperx@amazon.com>

* Add CHANGELOG entry

Signed-off-by: Craig Perkins <cwperx@amazon.com>

* Add tests

Signed-off-by: Craig Perkins <cwperx@amazon.com>

* Add addition setting types to test

Signed-off-by: Craig Perkins <cwperx@amazon.com>

* Address code review feedback

Signed-off-by: Craig Perkins <cwperx@amazon.com>

* Check for missing values

Signed-off-by: Craig Perkins <cwperx@amazon.com>

* Use Version.CURRENT

Signed-off-by: Craig Perkins <cwperx@amazon.com>

* Switch minimum compat version back to 3.0.0

Signed-off-by: Craig Perkins <cwperx@amazon.com>

* Remove hardcoded versions

Signed-off-by: Craig Perkins <cwperx@amazon.com>

---------

Signed-off-by: Craig Perkins <cwperx@amazon.com>
Signed-off-by: Shivansh Arora <hishiv@amazon.com>
  • Loading branch information
cwperks authored and shiv0408 committed Apr 25, 2024
1 parent 9114fc9 commit 998ec20
Show file tree
Hide file tree
Showing 6 changed files with 202 additions and 84 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add API to initialize extensions ([#8029]()https://github.com/opensearch-project/OpenSearch/pull/8029)
- Add distributed tracing framework ([#7543](https://github.com/opensearch-project/OpenSearch/issues/7543))
- Enable Point based optimization for custom comparators ([#8168](https://github.com/opensearch-project/OpenSearch/pull/8168))
- [Extensions] Support extension additional settings with extension REST initialization ([#8414](https://github.com/opensearch-project/OpenSearch/pull/8414))

### Dependencies
- Bump `com.azure:azure-storage-common` from 12.21.0 to 12.21.1 (#7566, #7814)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,6 @@
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.common.io.stream.Writeable;
import org.opensearch.core.common.Strings;
import org.opensearch.core.xcontent.XContentParser;

import static org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken;

/**
* This class handles the dependent extensions information
Expand Down Expand Up @@ -60,39 +56,6 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeVersion(version);
}

public static ExtensionDependency parse(XContentParser parser) throws IOException {
String uniqueId = null;
Version version = null;
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.currentToken(), parser);
while (parser.nextToken() != XContentParser.Token.END_OBJECT) {
String fieldName = parser.currentName();
parser.nextToken();

switch (fieldName) {
case UNIQUE_ID:
uniqueId = parser.text();
break;
case VERSION:
try {
version = Version.fromString(parser.text());
} catch (IllegalArgumentException e) {
throw e;
}
break;
default:
parser.skipChildren();
break;
}
}
if (Strings.isNullOrEmpty(uniqueId)) {
throw new IOException("Required field [uniqueId] is missing in the request for the dependent extension");
} else if (version == null) {
throw new IOException("Required field [version] is missing in the request for the dependent extension");
}
return new ExtensionDependency(uniqueId, version);

}

/**
* The uniqueId of the dependency extension
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ public static enum OpenSearchRequestType {
/**
* Instantiate a new ExtensionsManager object to handle requests and responses from extensions. This is called during Node bootstrap.
*
* @param additionalSettings Additional settings to read in from extensions.yml
* @param additionalSettings Additional settings to read in from extension initialization request
* @throws IOException If the extensions discovery file is not properly retrieved.
*/
public ExtensionsManager(Set<Setting<?>> additionalSettings) throws IOException {
Expand Down Expand Up @@ -504,4 +504,8 @@ void setAddSettingsUpdateConsumerRequestHandler(AddSettingsUpdateConsumerRequest
Settings getEnvironmentSettings() {
return environmentSettings;
}

public Set<Setting<?>> getAdditionalSettings() {
return this.additionalSettings;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,14 @@

package org.opensearch.extensions.rest;

import org.opensearch.Version;
import org.opensearch.client.node.NodeClient;
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.xcontent.XContentHelper;
import org.opensearch.core.common.Strings;
import org.opensearch.core.xcontent.MediaType;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.extensions.ExtensionDependency;
import org.opensearch.extensions.ExtensionScopedSettings;
import org.opensearch.extensions.ExtensionsManager;
Expand All @@ -23,12 +28,16 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;

import static org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken;
import static org.opensearch.rest.RestRequest.Method.POST;

/**
Expand Down Expand Up @@ -62,36 +71,79 @@ public RestChannelConsumer prepareRequest(RestRequest request, NodeClient client
String openSearchVersion = null;
String minimumCompatibleVersion = null;
List<ExtensionDependency> dependencies = new ArrayList<>();
Set<String> additionalSettingsKeys = extensionsManager.getAdditionalSettings()
.stream()
.map(s -> s.getKey())
.collect(Collectors.toSet());

try (XContentParser parser = request.contentParser()) {
parser.nextToken();
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.currentToken(), parser);
while (parser.nextToken() != XContentParser.Token.END_OBJECT) {
String currentFieldName = parser.currentName();
parser.nextToken();
if ("name".equals(currentFieldName)) {
name = parser.text();
} else if ("uniqueId".equals(currentFieldName)) {
uniqueId = parser.text();
} else if ("hostAddress".equals(currentFieldName)) {
hostAddress = parser.text();
} else if ("port".equals(currentFieldName)) {
port = parser.text();
} else if ("version".equals(currentFieldName)) {
version = parser.text();
} else if ("opensearchVersion".equals(currentFieldName)) {
openSearchVersion = parser.text();
} else if ("minimumCompatibleVersion".equals(currentFieldName)) {
minimumCompatibleVersion = parser.text();
} else if ("dependencies".equals(currentFieldName)) {
ensureExpectedToken(XContentParser.Token.START_ARRAY, parser.currentToken(), parser);
while (parser.nextToken() != XContentParser.Token.END_ARRAY) {
dependencies.add(ExtensionDependency.parse(parser));
Tuple<? extends MediaType, Map<String, Object>> unreadExtensionTuple = XContentHelper.convertToMap(
request.content(),
false,
request.getXContentType().xContent().mediaType()
);
Map<String, Object> extensionMap = unreadExtensionTuple.v2();

ExtensionScopedSettings extAdditionalSettings = new ExtensionScopedSettings(extensionsManager.getAdditionalSettings());

try {
// checking to see whether any required fields are missing from extension initialization request or not
String[] requiredFields = {
"name",
"uniqueId",
"hostAddress",
"port",
"version",
"opensearchVersion",
"minimumCompatibleVersion" };
List<String> missingFields = Arrays.stream(requiredFields)
.filter(field -> !extensionMap.containsKey(field))
.collect(Collectors.toList());
if (!missingFields.isEmpty()) {
throw new IOException("Extension is missing these required fields : " + missingFields);
}

// Parse extension dependencies
List<ExtensionDependency> extensionDependencyList = new ArrayList<ExtensionDependency>();
if (extensionMap.get("dependencies") != null) {
List<HashMap<String, ?>> extensionDependencies = new ArrayList<>(
(Collection<HashMap<String, ?>>) extensionMap.get("dependencies")
);
for (HashMap<String, ?> dependency : extensionDependencies) {
if (Strings.isNullOrEmpty((String) dependency.get("uniqueId"))) {
throw new IOException("Required field [uniqueId] is missing in the request for the dependent extension");
} else if (dependency.get("version") == null) {
throw new IOException("Required field [version] is missing in the request for the dependent extension");
}
extensionDependencyList.add(
new ExtensionDependency(
dependency.get("uniqueId").toString(),
Version.fromString(dependency.get("version").toString())
)
);
}
}

Map<String, ?> additionalSettingsMap = extensionMap.entrySet()
.stream()
.filter(kv -> additionalSettingsKeys.contains(kv.getKey()))
.collect(Collectors.toMap(map -> map.getKey(), map -> map.getValue()));

Settings.Builder output = Settings.builder();
output.loadFromMap(additionalSettingsMap);
extAdditionalSettings.applySettings(output.build());

// Create extension read from initialization request
name = extensionMap.get("name").toString();
uniqueId = extensionMap.get("uniqueId").toString();
hostAddress = extensionMap.get("hostAddress").toString();
port = extensionMap.get("port").toString();
version = extensionMap.get("version").toString();
openSearchVersion = extensionMap.get("opensearchVersion").toString();
minimumCompatibleVersion = extensionMap.get("minimumCompatibleVersion").toString();
dependencies = extensionDependencyList;
} catch (IOException e) {
throw new IOException("Missing attribute", e);
logger.warn("loading extension has been failed because of exception : " + e.getMessage());
return channel -> channel.sendResponse(new BytesRestResponse(RestStatus.INTERNAL_SERVER_ERROR, e.getMessage()));
}

Extension extension = new Extension(
Expand All @@ -103,8 +155,7 @@ public RestChannelConsumer prepareRequest(RestRequest request, NodeClient client
openSearchVersion,
minimumCompatibleVersion,
dependencies,
// TODO add this to the API (https://github.com/opensearch-project/OpenSearch/issues/8032)
new ExtensionScopedSettings(Collections.emptySet())
extAdditionalSettings
);
try {
extensionsManager.loadExtension(extension);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,6 @@
import org.opensearch.client.node.NodeClient;
import org.opensearch.cluster.ClusterSettingsResponse;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.common.xcontent.json.JsonXContent;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.env.EnvironmentSettingsResponse;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.node.DiscoveryNode;
Expand Down Expand Up @@ -398,16 +396,6 @@ public void testExtensionDependency() throws Exception {
}
}

public void testParseExtensionDependency() throws Exception {
XContentParser parser = createParser(JsonXContent.jsonXContent, "{\"uniqueId\": \"test1\", \"version\": \"2.0.0\"}");

assertEquals(XContentParser.Token.START_OBJECT, parser.nextToken());
ExtensionDependency dependency = ExtensionDependency.parse(parser);

assertEquals("test1", dependency.getUniqueId());
assertEquals(Version.fromString("2.0.0"), dependency.getVersion());
}

public void testInitialize() throws Exception {
ExtensionsManager extensionsManager = new ExtensionsManager(Set.of());

Expand Down
Loading

0 comments on commit 998ec20

Please sign in to comment.