Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support extension additional settings with extension REST initialization #8414

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add API to initialize extensions ([#8029]()https://github.com/opensearch-project/OpenSearch/pull/8029)
- Add distributed tracing framework ([#7543](https://github.com/opensearch-project/OpenSearch/issues/7543))
- Enable Point based optimization for custom comparators ([#8168](https://github.com/opensearch-project/OpenSearch/pull/8168))
- [Extensions] Support extension additional settings with extension REST initialization ([#8414](https://github.com/opensearch-project/OpenSearch/pull/8414))

### Dependencies
- Bump `com.azure:azure-storage-common` from 12.21.0 to 12.21.1 (#7566, #7814)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,6 @@
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.common.io.stream.Writeable;
import org.opensearch.core.common.Strings;
import org.opensearch.core.xcontent.XContentParser;

import static org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken;

/**
* This class handles the dependent extensions information
Expand Down Expand Up @@ -60,39 +56,6 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeVersion(version);
}

public static ExtensionDependency parse(XContentParser parser) throws IOException {
String uniqueId = null;
Version version = null;
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.currentToken(), parser);
while (parser.nextToken() != XContentParser.Token.END_OBJECT) {
String fieldName = parser.currentName();
parser.nextToken();

switch (fieldName) {
case UNIQUE_ID:
uniqueId = parser.text();
break;
case VERSION:
try {
version = Version.fromString(parser.text());
} catch (IllegalArgumentException e) {
throw e;
}
break;
default:
parser.skipChildren();
break;
}
}
if (Strings.isNullOrEmpty(uniqueId)) {
throw new IOException("Required field [uniqueId] is missing in the request for the dependent extension");
} else if (version == null) {
throw new IOException("Required field [version] is missing in the request for the dependent extension");
}
return new ExtensionDependency(uniqueId, version);

}

/**
* The uniqueId of the dependency extension
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ public static enum OpenSearchRequestType {
/**
* Instantiate a new ExtensionsManager object to handle requests and responses from extensions. This is called during Node bootstrap.
*
* @param additionalSettings Additional settings to read in from extensions.yml
* @param additionalSettings Additional settings to read in from extension initialization request
* @throws IOException If the extensions discovery file is not properly retrieved.
*/
public ExtensionsManager(Set<Setting<?>> additionalSettings) throws IOException {
Expand Down Expand Up @@ -504,4 +504,8 @@ void setAddSettingsUpdateConsumerRequestHandler(AddSettingsUpdateConsumerRequest
Settings getEnvironmentSettings() {
return environmentSettings;
}

public Set<Setting<?>> getAdditionalSettings() {
return this.additionalSettings;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,14 @@

package org.opensearch.extensions.rest;

import org.opensearch.Version;
import org.opensearch.client.node.NodeClient;
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.xcontent.XContentHelper;
import org.opensearch.core.common.Strings;
import org.opensearch.core.xcontent.MediaType;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.extensions.ExtensionDependency;
import org.opensearch.extensions.ExtensionScopedSettings;
import org.opensearch.extensions.ExtensionsManager;
Expand All @@ -23,12 +28,16 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;

import static org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken;
import static org.opensearch.rest.RestRequest.Method.POST;

/**
Expand Down Expand Up @@ -62,36 +71,79 @@ public RestChannelConsumer prepareRequest(RestRequest request, NodeClient client
String openSearchVersion = null;
String minimumCompatibleVersion = null;
List<ExtensionDependency> dependencies = new ArrayList<>();
Set<String> additionalSettingsKeys = extensionsManager.getAdditionalSettings()
.stream()
.map(s -> s.getKey())
.collect(Collectors.toSet());

try (XContentParser parser = request.contentParser()) {
parser.nextToken();
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.currentToken(), parser);
while (parser.nextToken() != XContentParser.Token.END_OBJECT) {
String currentFieldName = parser.currentName();
parser.nextToken();
if ("name".equals(currentFieldName)) {
name = parser.text();
} else if ("uniqueId".equals(currentFieldName)) {
uniqueId = parser.text();
} else if ("hostAddress".equals(currentFieldName)) {
hostAddress = parser.text();
} else if ("port".equals(currentFieldName)) {
port = parser.text();
} else if ("version".equals(currentFieldName)) {
version = parser.text();
} else if ("opensearchVersion".equals(currentFieldName)) {
openSearchVersion = parser.text();
} else if ("minimumCompatibleVersion".equals(currentFieldName)) {
minimumCompatibleVersion = parser.text();
} else if ("dependencies".equals(currentFieldName)) {
ensureExpectedToken(XContentParser.Token.START_ARRAY, parser.currentToken(), parser);
while (parser.nextToken() != XContentParser.Token.END_ARRAY) {
dependencies.add(ExtensionDependency.parse(parser));
Tuple<? extends MediaType, Map<String, Object>> unreadExtensionTuple = XContentHelper.convertToMap(
request.content(),
false,
request.getXContentType().xContent().mediaType()
);
Map<String, Object> extensionMap = unreadExtensionTuple.v2();

ExtensionScopedSettings extAdditionalSettings = new ExtensionScopedSettings(extensionsManager.getAdditionalSettings());

try {
// checking to see whether any required fields are missing from extension initialization request or not
String[] requiredFields = {
"name",
"uniqueId",
"hostAddress",
"port",
"version",
"opensearchVersion",
"minimumCompatibleVersion" };
List<String> missingFields = Arrays.stream(requiredFields)
.filter(field -> !extensionMap.containsKey(field))
.collect(Collectors.toList());
if (!missingFields.isEmpty()) {
throw new IOException("Extension is missing these required fields : " + missingFields);
}

// Parse extension dependencies
List<ExtensionDependency> extensionDependencyList = new ArrayList<ExtensionDependency>();
if (extensionMap.get("dependencies") != null) {
List<HashMap<String, ?>> extensionDependencies = new ArrayList<>(
(Collection<HashMap<String, ?>>) extensionMap.get("dependencies")
);
for (HashMap<String, ?> dependency : extensionDependencies) {
if (Strings.isNullOrEmpty((String) dependency.get("uniqueId"))) {
throw new IOException("Required field [uniqueId] is missing in the request for the dependent extension");
} else if (dependency.get("version") == null) {
throw new IOException("Required field [version] is missing in the request for the dependent extension");
}
extensionDependencyList.add(
new ExtensionDependency(
owaiskazi19 marked this conversation as resolved.
Show resolved Hide resolved
dependency.get("uniqueId").toString(),
Version.fromString(dependency.get("version").toString())
)
);
}
}

owaiskazi19 marked this conversation as resolved.
Show resolved Hide resolved
Map<String, ?> additionalSettingsMap = extensionMap.entrySet()
.stream()
.filter(kv -> additionalSettingsKeys.contains(kv.getKey()))
.collect(Collectors.toMap(map -> map.getKey(), map -> map.getValue()));

Settings.Builder output = Settings.builder();
output.loadFromMap(additionalSettingsMap);
extAdditionalSettings.applySettings(output.build());

// Create extension read from initialization request
name = extensionMap.get("name").toString();
uniqueId = extensionMap.get("uniqueId").toString();
hostAddress = extensionMap.get("hostAddress").toString();
port = extensionMap.get("port").toString();
version = extensionMap.get("version").toString();
openSearchVersion = extensionMap.get("opensearchVersion").toString();
minimumCompatibleVersion = extensionMap.get("minimumCompatibleVersion").toString();
dependencies = extensionDependencyList;
} catch (IOException e) {
throw new IOException("Missing attribute", e);
logger.warn("loading extension has been failed because of exception : " + e.getMessage());
return channel -> channel.sendResponse(new BytesRestResponse(RestStatus.INTERNAL_SERVER_ERROR, e.getMessage()));
}

Extension extension = new Extension(
Expand All @@ -103,8 +155,7 @@ public RestChannelConsumer prepareRequest(RestRequest request, NodeClient client
openSearchVersion,
minimumCompatibleVersion,
dependencies,
// TODO add this to the API (https://github.com/opensearch-project/OpenSearch/issues/8032)
new ExtensionScopedSettings(Collections.emptySet())
extAdditionalSettings
);
try {
extensionsManager.loadExtension(extension);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,6 @@
import org.opensearch.client.node.NodeClient;
import org.opensearch.cluster.ClusterSettingsResponse;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.common.xcontent.json.JsonXContent;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.env.EnvironmentSettingsResponse;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.node.DiscoveryNode;
Expand Down Expand Up @@ -398,16 +396,6 @@ public void testExtensionDependency() throws Exception {
}
}

public void testParseExtensionDependency() throws Exception {
XContentParser parser = createParser(JsonXContent.jsonXContent, "{\"uniqueId\": \"test1\", \"version\": \"2.0.0\"}");

assertEquals(XContentParser.Token.START_OBJECT, parser.nextToken());
ExtensionDependency dependency = ExtensionDependency.parse(parser);

assertEquals("test1", dependency.getUniqueId());
assertEquals(Version.fromString("2.0.0"), dependency.getVersion());
}

public void testInitialize() throws Exception {
ExtensionsManager extensionsManager = new ExtensionsManager(Set.of());

Expand Down
Loading