diff --git a/CHANGELOG-3.0.md b/CHANGELOG-3.0.md index 964383078c38d..cd899bec28525 100644 --- a/CHANGELOG-3.0.md +++ b/CHANGELOG-3.0.md @@ -24,6 +24,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Add task completion count in search backpressure stats API ([#10028](https://github.com/opensearch-project/OpenSearch/pull/10028/)) - Deprecate CamelCase `PathHierarchy` tokenizer name in favor to lowercase `path_hierarchy` ([#10894](https://github.com/opensearch-project/OpenSearch/pull/10894)) - Breaking change: Do not request "search_pipelines" metrics by default in NodesInfoRequest ([#12497](https://github.com/opensearch-project/OpenSearch/pull/12497)) +- [QueryGroup] Add QueryGroup CRUD APIs ([#13315](https://github.com/opensearch-project/OpenSearch/pull/13315)) +- [QueryGroup] Add QueryGroup Create API ([#14459](https://github.com/opensearch-project/OpenSearch/pull/14459)) ### Deprecated diff --git a/plugins/workload-management/build.gradle b/plugins/workload-management/build.gradle new file mode 100644 index 0000000000000..e91ef90005c74 --- /dev/null +++ b/plugins/workload-management/build.gradle @@ -0,0 +1,18 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. + */ + +opensearchplugin { + description 'OpenSearch Workload Management Plugin.' + classname 'org.opensearch.plugin.wlm.WorkloadManagementPlugin' +} + +dependencies { +} diff --git a/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/CreateQueryGroupAction.java b/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/CreateQueryGroupAction.java new file mode 100644 index 0000000000000..068f7822fd437 --- /dev/null +++ b/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/CreateQueryGroupAction.java @@ -0,0 +1,36 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.wlm; + +import org.opensearch.action.ActionType; + +/** + * Rest action to create QueryGroup + * + * @opensearch.api + */ +public class CreateQueryGroupAction extends ActionType { + + /** + * An instance of CreateQueryGroupAction + */ + public static final CreateQueryGroupAction INSTANCE = new CreateQueryGroupAction(); + + /** + * Name for CreateQueryGroupAction + */ + public static final String NAME = "cluster:admin/opensearch/query_group/wlm/_create"; + + /** + * Default constructor + */ + private CreateQueryGroupAction() { + super(NAME, CreateQueryGroupResponse::new); + } +} diff --git a/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/CreateQueryGroupRequest.java b/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/CreateQueryGroupRequest.java new file mode 100644 index 0000000000000..675e471e38581 --- /dev/null +++ b/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/CreateQueryGroupRequest.java @@ -0,0 +1,332 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.wlm; + +import org.opensearch.action.ActionRequest; +import org.opensearch.action.ActionRequestValidationException; +import org.opensearch.cluster.metadata.QueryGroup; +import org.opensearch.cluster.metadata.QueryGroup.QueryGroupMode; +import org.opensearch.common.UUIDs; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.common.io.stream.Writeable; +import org.opensearch.core.xcontent.XContentParser; +import org.joda.time.Instant; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Locale; +import java.util.Map; +import java.util.Objects; + +import static org.opensearch.cluster.metadata.QueryGroup.ALLOWED_RESOURCES; +import static org.opensearch.cluster.metadata.QueryGroup.MAX_CHARS_ALLOWED_IN_NAME; + +/** + * A request for create QueryGroup + * User input schema: + * { + * "jvm": 0.4, + * "mode": "enforced", + * "name": "analytics" + * } + * + * @opensearch.internal + */ +public class CreateQueryGroupRequest extends ActionRequest implements Writeable.Reader { + private String name; + private String _id; + private QueryGroupMode mode; + private Map resourceLimits; + private long updatedAtInMillis; + + /** + * Default constructor for CreateQueryGroupRequest + */ + public CreateQueryGroupRequest() {} + + /** + * Constructor for CreateQueryGroupRequest + * @param queryGroup - A {@link QueryGroup} object + */ + public CreateQueryGroupRequest(QueryGroup queryGroup) { + this.name = queryGroup.getName(); + this._id = queryGroup.get_id(); + this.resourceLimits = queryGroup.getResourceLimits(); + this.mode = queryGroup.getMode(); + this.updatedAtInMillis = queryGroup.getUpdatedAtInMillis(); + verifyCreateQueryGroupRequest(name, _id, mode, resourceLimits, updatedAtInMillis); + } + + /** + * Constructor for CreateQueryGroupRequest + * @param name - QueryGroup name for CreateQueryGroupRequest + * @param _id - QueryGroup _id for CreateQueryGroupRequest + * @param mode - QueryGroup mode for CreateQueryGroupRequest + * @param resourceLimits - QueryGroup resourceLimits for CreateQueryGroupRequest + * @param updatedAtInMillis - QueryGroup updated time in millis for CreateQueryGroupRequest + */ + public CreateQueryGroupRequest( + String name, + String _id, + QueryGroupMode mode, + Map resourceLimits, + long updatedAtInMillis + ) { + this.name = name; + this._id = _id; + this.resourceLimits = resourceLimits; + this.mode = mode; + this.updatedAtInMillis = updatedAtInMillis; + verifyCreateQueryGroupRequest(name, _id, mode, resourceLimits, updatedAtInMillis); + } + + /** + * Constructor for CreateQueryGroupRequest + * @param in - A {@link StreamInput} object + */ + public CreateQueryGroupRequest(StreamInput in) throws IOException { + super(in); + name = in.readString(); + _id = in.readString(); + mode = QueryGroupMode.fromName(in.readString()); + resourceLimits = in.readMap(); + updatedAtInMillis = in.readLong(); + verifyCreateQueryGroupRequest(name, _id, mode, resourceLimits, updatedAtInMillis); + } + + /** + * Verification for CreateQueryGroupRequest + * @param name - QueryGroup name for CreateQueryGroupRequest + * @param _id - QueryGroup _id for CreateQueryGroupRequest + * @param mode - QueryGroup mode for CreateQueryGroupRequest + * @param resourceLimits - QueryGroup resourceLimits for CreateQueryGroupRequest + * @param updatedAtInMillis - QueryGroup updated time in millis for CreateQueryGroupRequest + */ + private void verifyCreateQueryGroupRequest( + String name, + String _id, + QueryGroupMode mode, + Map resourceLimits, + long updatedAtInMillis + ) { + Objects.requireNonNull(_id, "QueryGroup._id can't be null"); + Objects.requireNonNull(name, "QueryGroup.name can't be null"); + Objects.requireNonNull(resourceLimits, "QueryGroup.resourceLimits can't be null"); + Objects.requireNonNull(mode, "QueryGroup.mode can't be null"); + Objects.requireNonNull(updatedAtInMillis, "QueryGroup.updatedAtInMillis can't be null"); + + validateName(name); + validateMode(mode); + validateResourceLimits(resourceLimits); + validateUpdatedAtInMillis(updatedAtInMillis); + } + + /** + * Verification for CreateQueryGroupRequest.name + * @param name - name to be verified + */ + public static void validateName(String name) { + if (name.isEmpty()) { + throw new IllegalArgumentException("QueryGroup.name cannot be empty"); + } + if (name.length() > MAX_CHARS_ALLOWED_IN_NAME) { + throw new IllegalArgumentException("QueryGroup.name shouldn't be more than 50 chars long"); + } + if (name.startsWith("-") || name.startsWith("_")) { + throw new IllegalArgumentException("QueryGroup.name cannot start with '_' or '-'."); + } + if (name.matches(".*[ ,:\"*+/\\\\|?#><].*")) { + throw new IllegalArgumentException("QueryGroup.name can't contain spaces, commas, quotes, slashes, :, *, +, |, ?, #, >, or <"); + } + } + + /** + * Verification for CreateQueryGroupRequest.mode + * @param mode - mode to be verified + */ + public static void validateMode(QueryGroupMode mode) { + if (!mode.getName().equals("monitor")) { + throw new IllegalArgumentException("QueryGroup.mode must be monitor"); + } + } + + /** + * Verification for CreateQueryGroupRequest.resourceLimits + * @param resourceLimits - resourceLimits to be verified + */ + public static void validateResourceLimits(Map resourceLimits) { + if (resourceLimits.isEmpty()) { + throw new IllegalArgumentException("QueryGroup.resourceLimits should at least have 1 resource limit."); + } + for (Map.Entry resource : resourceLimits.entrySet()) { + String resourceName = resource.getKey(); + Double threshold = (Double) resource.getValue(); + Objects.requireNonNull(resourceName, "resourceName can't be null"); + Objects.requireNonNull(threshold, "resource value can't be null"); + + if (threshold < 0 || threshold > 1) { + throw new IllegalArgumentException("Resource value should be between 0 and 1"); + } + String str = String.valueOf(threshold); + if (str.contains(".") && str.split("\\.")[1].length() > 2) { + throw new IllegalArgumentException("Resource value should have at most two digits after the decimal point"); + } + if (!ALLOWED_RESOURCES.contains(resourceName.toLowerCase(Locale.ROOT))) { + throw new IllegalArgumentException( + "resource has to be valid, valid resources are: " + ALLOWED_RESOURCES.stream().reduce((x, e) -> x + ", " + e).get() + ); + } + } + } + + /** + * Verification for CreateQueryGroupRequest.updatedAtInMillis + * @param updatedAt - updated time to be verified + */ + public static void validateUpdatedAtInMillis(long updatedAt) { + long minValidTimestamp = Instant.ofEpochMilli(0L).getMillis(); + long currentSeconds = Instant.now().getMillis(); + if (minValidTimestamp > updatedAt || updatedAt > currentSeconds) { + throw new IllegalArgumentException("QueryGroup.updatedAtInMillis is not a valid epoch"); + } + } + + @Override + public CreateQueryGroupRequest read(StreamInput in) throws IOException { + return new CreateQueryGroupRequest(in); + } + + /** + * Generate a CreateQueryGroupRequest from XContent + * @param parser - A {@link XContentParser} object + */ + public static CreateQueryGroupRequest fromXContent(XContentParser parser) throws IOException { + + while (parser.currentToken() != XContentParser.Token.START_OBJECT) { + parser.nextToken(); + } + + if (parser.currentToken() != XContentParser.Token.START_OBJECT) { + throw new IllegalArgumentException("expected start object but got a " + parser.currentToken()); + } + + XContentParser.Token token; + String fieldName = ""; + String name = null; + String _id = UUIDs.randomBase64UUID(); + QueryGroupMode mode = null; + long updatedAtInMillis = Instant.now().getMillis(); + + // Map to hold resources + final Map resourceLimits_ = new HashMap<>(); + while ((token = parser.nextToken()) != null) { + if (token == XContentParser.Token.FIELD_NAME) { + fieldName = parser.currentName(); + } else if (token.isValue()) { + if (fieldName.equals("name")) { + name = parser.text(); + } else if (fieldName.equals("mode")) { + mode = QueryGroupMode.fromName(parser.text()); + } else if (ALLOWED_RESOURCES.contains(fieldName)) { + resourceLimits_.put(fieldName, parser.doubleValue()); + } else { + throw new IllegalArgumentException("unrecognised [field=" + fieldName + " in QueryGroup"); + } + } + } + return new CreateQueryGroupRequest(name, _id, mode, resourceLimits_, updatedAtInMillis); + } + + @Override + public ActionRequestValidationException validate() { + return null; + } + + /** + * name getter + */ + public String getName() { + return name; + } + + /** + * name setter + * @param name - name to be set + */ + public void setName(String name) { + this.name = name; + } + + /** + * resourceLimits getter + */ + public Map getResourceLimits() { + return resourceLimits; + } + + /** + * resourceLimits setter + * @param resourceLimits - resourceLimit to be set + */ + public void setResourceLimits(Map resourceLimits) { + this.resourceLimits = resourceLimits; + } + + /** + * mode getter + */ + public QueryGroupMode getMode() { + return mode; + } + + /** + * mode setter + * @param mode - mode to be set + */ + public void setMode(QueryGroupMode mode) { + this.mode = mode; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + QueryGroup.writeToOutputStream(out, name, _id, mode, resourceLimits, updatedAtInMillis); + } + + /** + * _id getter + */ + public String get_id() { + return _id; + } + + /** + * UUID setter + * @param _id - _id to be set + */ + public void set_id(String _id) { + this._id = _id; + } + + /** + * updatedAtInMillis getter + */ + public long getUpdatedAtInMillis() { + return updatedAtInMillis; + } + + /** + * updatedAtInMillis setter + * @param updatedAtInMillis - updatedAtInMillis to be set + */ + public void setUpdatedAtInMillis(long updatedAtInMillis) { + this.updatedAtInMillis = updatedAtInMillis; + } +} diff --git a/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/CreateQueryGroupResponse.java b/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/CreateQueryGroupResponse.java new file mode 100644 index 0000000000000..ea112f95f1097 --- /dev/null +++ b/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/CreateQueryGroupResponse.java @@ -0,0 +1,80 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.wlm; + +import org.opensearch.cluster.metadata.QueryGroup; +import org.opensearch.core.action.ActionResponse; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.rest.RestStatus; +import org.opensearch.core.xcontent.ToXContent; +import org.opensearch.core.xcontent.ToXContentObject; +import org.opensearch.core.xcontent.XContentBuilder; + +import java.io.IOException; + +/** + * Response for the create API for QueryGroup + * + * @opensearch.internal + */ +public class CreateQueryGroupResponse extends ActionResponse implements ToXContent, ToXContentObject { + private final QueryGroup queryGroup; + private RestStatus restStatus; + + /** + * Constructor for CreateQueryGroupResponse + * @param queryGroup - The QueryGroup to be created + * @param restStatus - The resStatus for the response + */ + public CreateQueryGroupResponse(final QueryGroup queryGroup, RestStatus restStatus) { + this.queryGroup = queryGroup; + this.restStatus = restStatus; + } + + /** + * Constructor for CreateQueryGroupResponse + * @param in - A {@link StreamInput} object + */ + public CreateQueryGroupResponse(StreamInput in) throws IOException { + queryGroup = new QueryGroup(in); + restStatus = RestStatus.readFrom(in); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + queryGroup.writeTo(out); + RestStatus.writeTo(out, restStatus); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.field("name", queryGroup.getName()); + builder.field("mode", queryGroup.getMode().getName()); + builder.field("updatedAt", queryGroup.getUpdatedAtInMillis()); + builder.mapContents(queryGroup.getResourceLimits()); + builder.endObject(); + return builder; + } + + /** + * queryGroup getter + */ + public QueryGroup getQueryGroup() { + return queryGroup; + } + + /** + * restStatus getter + */ + public RestStatus getRestStatus() { + return restStatus; + } +} diff --git a/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/TransportCreateQueryGroupAction.java b/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/TransportCreateQueryGroupAction.java new file mode 100644 index 0000000000000..1454ad6a0865c --- /dev/null +++ b/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/TransportCreateQueryGroupAction.java @@ -0,0 +1,65 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.wlm; + +import org.opensearch.action.support.ActionFilters; +import org.opensearch.action.support.HandledTransportAction; +import org.opensearch.cluster.metadata.QueryGroup; +import org.opensearch.common.inject.Inject; +import org.opensearch.core.action.ActionListener; +import org.opensearch.plugin.wlm.service.Persistable; +import org.opensearch.tasks.Task; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.transport.TransportService; + +import static org.opensearch.cluster.metadata.QueryGroup.builder; + +/** + * Transport action for create QueryGroup + * + * @opensearch.internal + */ +public class TransportCreateQueryGroupAction extends HandledTransportAction { + + private final ThreadPool threadPool; + private final Persistable queryGroupPersistenceService; + + /** + * Constructor for TransportCreateQueryGroupAction + * + * @param actionName - action name + * @param transportService - a {@link TransportService} object + * @param actionFilters - a {@link ActionFilters} object + * @param threadPool - a {@link ThreadPool} object + * @param queryGroupPersistenceService - a {@link Persistable} object + */ + @Inject + public TransportCreateQueryGroupAction( + String actionName, + TransportService transportService, + ActionFilters actionFilters, + ThreadPool threadPool, + Persistable queryGroupPersistenceService + ) { + super(CreateQueryGroupAction.NAME, transportService, actionFilters, CreateQueryGroupRequest::new); + this.threadPool = threadPool; + this.queryGroupPersistenceService = queryGroupPersistenceService; + } + + @Override + protected void doExecute(Task task, CreateQueryGroupRequest request, ActionListener listener) { + QueryGroup queryGroup = builder().name(request.getName()) + ._id(request.get_id()) + .mode(request.getMode().getName()) + .resourceLimits(request.getResourceLimits()) + .updatedAt(request.getUpdatedAtInMillis()) + .build(); + threadPool.executor(ThreadPool.Names.GENERIC).execute(() -> queryGroupPersistenceService.persist(queryGroup, listener)); + } +} diff --git a/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/WorkloadManagementPlugin.java b/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/WorkloadManagementPlugin.java new file mode 100644 index 0000000000000..065c2f3229d15 --- /dev/null +++ b/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/WorkloadManagementPlugin.java @@ -0,0 +1,62 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.wlm; + +import org.opensearch.action.ActionRequest; +import org.opensearch.cluster.metadata.IndexNameExpressionResolver; +import org.opensearch.cluster.node.DiscoveryNodes; +import org.opensearch.common.inject.Module; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.IndexScopedSettings; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.settings.SettingsFilter; +import org.opensearch.core.action.ActionResponse; +import org.opensearch.plugin.wlm.rest.RestCreateQueryGroupAction; +import org.opensearch.plugins.ActionPlugin; +import org.opensearch.plugins.Plugin; +import org.opensearch.rest.RestController; +import org.opensearch.rest.RestHandler; + +import java.util.Collection; +import java.util.List; +import java.util.function.Supplier; + +/** + * Plugin class for WorkloadManagement + */ +public class WorkloadManagementPlugin extends Plugin implements ActionPlugin { + + /** + * Default constructor + */ + public WorkloadManagementPlugin() {} + + @Override + public List> getActions() { + return List.of(new ActionPlugin.ActionHandler<>(CreateQueryGroupAction.INSTANCE, TransportCreateQueryGroupAction.class)); + } + + @Override + public List getRestHandlers( + Settings settings, + RestController restController, + ClusterSettings clusterSettings, + IndexScopedSettings indexScopedSettings, + SettingsFilter settingsFilter, + IndexNameExpressionResolver indexNameExpressionResolver, + Supplier nodesInCluster + ) { + return List.of(new RestCreateQueryGroupAction()); + } + + @Override + public Collection createGuiceModules() { + return List.of(new WorkloadManagementPluginModule()); + } +} diff --git a/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/WorkloadManagementPluginModule.java b/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/WorkloadManagementPluginModule.java new file mode 100644 index 0000000000000..58208af293f14 --- /dev/null +++ b/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/WorkloadManagementPluginModule.java @@ -0,0 +1,32 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.wlm; + +import org.opensearch.cluster.metadata.QueryGroup; +import org.opensearch.common.inject.AbstractModule; +import org.opensearch.common.inject.TypeLiteral; +import org.opensearch.plugin.wlm.service.Persistable; +import org.opensearch.plugin.wlm.service.QueryGroupPersistenceService; + +/** + * Guice Module to manage WorkloadManagement related objects + */ +public class WorkloadManagementPluginModule extends AbstractModule { + + /** + * Constructor for WorkloadManagementPluginModule + */ + public WorkloadManagementPluginModule() {} + + @Override + protected void configure() { + bind(new TypeLiteral>() { + }).to(QueryGroupPersistenceService.class).asEagerSingleton(); + } +} diff --git a/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/package-info.java b/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/package-info.java new file mode 100644 index 0000000000000..73027f70bb294 --- /dev/null +++ b/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/package-info.java @@ -0,0 +1,12 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** + * Base Package of CRUD API of QueryGroup + */ +package org.opensearch.plugin.wlm; diff --git a/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/rest/RestCreateQueryGroupAction.java b/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/rest/RestCreateQueryGroupAction.java new file mode 100644 index 0000000000000..48de27d4fc02a --- /dev/null +++ b/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/rest/RestCreateQueryGroupAction.java @@ -0,0 +1,80 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.wlm.rest; + +import org.opensearch.client.node.NodeClient; +import org.opensearch.core.rest.RestStatus; +import org.opensearch.core.xcontent.ToXContent; +import org.opensearch.core.xcontent.XContentParser; +import org.opensearch.plugin.wlm.CreateQueryGroupAction; +import org.opensearch.plugin.wlm.CreateQueryGroupRequest; +import org.opensearch.plugin.wlm.CreateQueryGroupResponse; +import org.opensearch.rest.BaseRestHandler; +import org.opensearch.rest.BytesRestResponse; +import org.opensearch.rest.RestChannel; +import org.opensearch.rest.RestRequest; +import org.opensearch.rest.RestResponse; +import org.opensearch.rest.action.RestResponseListener; + +import java.io.IOException; +import java.util.List; + +import static org.opensearch.rest.RestRequest.Method.POST; +import static org.opensearch.rest.RestRequest.Method.PUT; + +/** + * Rest action to create a QueryGroup + * + * @opensearch.api + */ +public class RestCreateQueryGroupAction extends BaseRestHandler { + + /** + * Constructor for RestCreateQueryGroupAction + */ + public RestCreateQueryGroupAction() {} + + @Override + public String getName() { + return "create_query_group"; + } + + /** + * The list of {@link Route}s that this RestHandler is responsible for handling. + */ + @Override + public List routes() { + return List.of(new Route(POST, "_query_group/"), new Route(PUT, "_query_group/")); + } + + @Override + protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException { + CreateQueryGroupRequest createQueryGroupRequest = new CreateQueryGroupRequest(); + request.applyContentParser((parser) -> parseRestRequest(createQueryGroupRequest, parser)); + return channel -> client.execute(CreateQueryGroupAction.INSTANCE, createQueryGroupRequest, createQueryGroupResponse(channel)); + } + + private void parseRestRequest(CreateQueryGroupRequest request, XContentParser parser) throws IOException { + final CreateQueryGroupRequest createQueryGroupRequest = CreateQueryGroupRequest.fromXContent(parser); + request.setName(createQueryGroupRequest.getName()); + request.set_id(createQueryGroupRequest.get_id()); + request.setMode(createQueryGroupRequest.getMode()); + request.setResourceLimits(createQueryGroupRequest.getResourceLimits()); + request.setUpdatedAtInMillis(createQueryGroupRequest.getUpdatedAtInMillis()); + } + + private RestResponseListener createQueryGroupResponse(final RestChannel channel) { + return new RestResponseListener<>(channel) { + @Override + public RestResponse buildResponse(final CreateQueryGroupResponse response) throws Exception { + return new BytesRestResponse(RestStatus.OK, response.toXContent(channel.newBuilder(), ToXContent.EMPTY_PARAMS)); + } + }; + } +} diff --git a/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/rest/package-info.java b/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/rest/package-info.java new file mode 100644 index 0000000000000..6d1853f04ce58 --- /dev/null +++ b/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/rest/package-info.java @@ -0,0 +1,12 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** + * Package for the rest classes for QueryGroup CRUD operations + */ +package org.opensearch.plugin.wlm.rest; diff --git a/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/service/Persistable.java b/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/service/Persistable.java new file mode 100644 index 0000000000000..37a6b4a58bc17 --- /dev/null +++ b/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/service/Persistable.java @@ -0,0 +1,25 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.wlm.service; + +import org.opensearch.core.action.ActionListener; +import org.opensearch.plugin.wlm.CreateQueryGroupResponse; + +/** + * This interface defines the key APIs for implementing QueruGroup persistence + */ +public interface Persistable { + + /** + * persists the QueryGroup in a durable storage + * @param queryGroup + * @param listener + */ + void persist(T queryGroup, ActionListener listener); +} diff --git a/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/service/QueryGroupPersistenceService.java b/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/service/QueryGroupPersistenceService.java new file mode 100644 index 0000000000000..768dfcdd22fa1 --- /dev/null +++ b/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/service/QueryGroupPersistenceService.java @@ -0,0 +1,236 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.wlm.service; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.ClusterStateUpdateTask; +import org.opensearch.cluster.metadata.Metadata; +import org.opensearch.cluster.metadata.QueryGroup; +import org.opensearch.cluster.service.ClusterManagerTaskThrottler.ThrottlingKey; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.Priority; +import org.opensearch.common.inject.Inject; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; +import org.opensearch.core.action.ActionListener; +import org.opensearch.core.rest.RestStatus; +import org.opensearch.plugin.wlm.CreateQueryGroupResponse; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.DoubleAdder; + +import static org.opensearch.search.query_group.QueryGroupServiceSettings.MAX_QUERY_GROUP_COUNT; +import static org.opensearch.search.query_group.QueryGroupServiceSettings.QUERY_GROUP_COUNT_SETTING_NAME; + +/** + * This class defines the functions for QueryGroup persistence + */ +public class QueryGroupPersistenceService implements Persistable { + private static final Logger logger = LogManager.getLogger(QueryGroupPersistenceService.class); + private final ClusterService clusterService; + private static final String SOURCE = "query-group-persistence-service"; + private static final String CREATE_QUERY_GROUP_THROTTLING_KEY = "create-query-group"; + private static final String UPDATE_QUERY_GROUP_THROTTLING_KEY = "update-query-group"; + private static final String DELETE_QUERY_GROUP_THROTTLING_KEY = "delete-query-group"; + private final AtomicInteger inflightCreateQueryGroupRequestCount; + private final Map inflightResourceLimitValues; + private volatile int maxQueryGroupCount; + final ThrottlingKey createQueryGroupThrottlingKey; + final ThrottlingKey updateQueryGroupThrottlingKey; + final ThrottlingKey deleteQueryGroupThrottlingKey; + + /** + * Constructor for QueryGroupPersistenceService + * + * @param clusterService {@link ClusterService} - The cluster service to be used by QueryGroupPersistenceService + * @param settings {@link Settings} - The settings to be used by QueryGroupPersistenceService + * @param clusterSettings {@link ClusterSettings} - The cluster settings to be used by QueryGroupPersistenceService + */ + @Inject + public QueryGroupPersistenceService( + final ClusterService clusterService, + final Settings settings, + final ClusterSettings clusterSettings + ) { + this.clusterService = clusterService; + this.createQueryGroupThrottlingKey = clusterService.registerClusterManagerTask(CREATE_QUERY_GROUP_THROTTLING_KEY, true); + this.deleteQueryGroupThrottlingKey = clusterService.registerClusterManagerTask(DELETE_QUERY_GROUP_THROTTLING_KEY, true); + this.updateQueryGroupThrottlingKey = clusterService.registerClusterManagerTask(UPDATE_QUERY_GROUP_THROTTLING_KEY, true); + maxQueryGroupCount = MAX_QUERY_GROUP_COUNT.get(settings); + clusterSettings.addSettingsUpdateConsumer(MAX_QUERY_GROUP_COUNT, this::setMaxQueryGroupCount); + inflightCreateQueryGroupRequestCount = new AtomicInteger(); + inflightResourceLimitValues = new HashMap<>(); + } + + /** + * Set maxQueryGroupCount to be newMaxQueryGroupCount + * @param newMaxQueryGroupCount - the max number of QueryGroup allowed + */ + public void setMaxQueryGroupCount(int newMaxQueryGroupCount) { + if (newMaxQueryGroupCount < 0) { + throw new IllegalArgumentException("node.query_group.max_count can't be negative"); + } + this.maxQueryGroupCount = newMaxQueryGroupCount; + } + + @Override + public void persist(QueryGroup queryGroup, ActionListener listener) { + persistInClusterStateMetadata(queryGroup, listener); + } + + /** + * Update cluster state to include the new QueryGroup + * @param queryGroup {@link QueryGroup} - the QueryGroup we're currently creating + */ + void persistInClusterStateMetadata(QueryGroup queryGroup, ActionListener listener) { + clusterService.submitStateUpdateTask(SOURCE, new ClusterStateUpdateTask(Priority.URGENT) { + @Override + public ClusterState execute(ClusterState currentState) throws Exception { + return saveQueryGroupInClusterState(queryGroup, currentState); + } + + @Override + public ThrottlingKey getClusterManagerThrottlingKey() { + return createQueryGroupThrottlingKey; + } + + @Override + public void onFailure(String source, Exception e) { + restoreInflightValues(queryGroup.getResourceLimits()); + inflightCreateQueryGroupRequestCount.decrementAndGet(); + logger.warn("failed to save QueryGroup object due to error: {}, for source: {}", e.getMessage(), source); + listener.onFailure(e); + } + + @Override + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + restoreInflightValues(queryGroup.getResourceLimits()); + inflightCreateQueryGroupRequestCount.decrementAndGet(); + CreateQueryGroupResponse response = new CreateQueryGroupResponse(queryGroup, RestStatus.OK); + listener.onResponse(response); + } + }); + } + + /** + * Get the allocation value for resourceName for the QueryGroup + * @param resourceName - the resourceName we want to get the usage for + * @param resourceLimits - the resource limit from which to get the allocation value for resourceName + */ + private double getResourceLimitValue(String resourceName, final Map resourceLimits) { + return (double) resourceLimits.getOrDefault(resourceName, 0.0); + } + + /** + * This method will be executed before we submit the new cluster state + * @param queryGroup - the QueryGroup we're currently creating + * @param currentClusterState - the cluster state before the update + */ + ClusterState saveQueryGroupInClusterState(final QueryGroup queryGroup, final ClusterState currentClusterState) { + final Metadata metadata = currentClusterState.metadata(); + String groupName = queryGroup.getName(); + final Set previousGroups = metadata.queryGroups(); + + // check if there's any resource allocation that exceed limit of 1.0 + String resourceNameWithThresholdExceeded = ""; + for (String resourceName : queryGroup.getResourceLimits().keySet()) { + double existingUsage = calculateExistingUsage(resourceName, previousGroups, groupName); + double newGroupUsage = getResourceLimitValue(resourceName, queryGroup.getResourceLimits()); + inflightResourceLimitValues.computeIfAbsent(resourceName, k -> new DoubleAdder()).add(newGroupUsage); + double totalUsage = existingUsage + inflightResourceLimitValues.get(resourceName).doubleValue(); + if (totalUsage > 1) { + resourceNameWithThresholdExceeded = resourceName; + } + } + // check if group count exceed max + boolean groupCountExceeded = inflightCreateQueryGroupRequestCount.incrementAndGet() + previousGroups.size() > maxQueryGroupCount; + + if (previousGroups.stream().anyMatch(group -> group.getName().equals(groupName))) { + logger.warn("QueryGroup with name {} already exists. Not creating a new one.", groupName); + throw new RuntimeException("QueryGroup with name " + groupName + " already exists. Not creating a new one."); + } + if (!resourceNameWithThresholdExceeded.isEmpty()) { + logger.error("Total resource allocation for {} will go above the max limit of 1.0", resourceNameWithThresholdExceeded); + throw new RuntimeException( + "Total resource allocation for " + resourceNameWithThresholdExceeded + " will go above the max limit of 1.0" + ); + } + if (groupCountExceeded) { + logger.error("{} value exceeded its assigned limit of {}", QUERY_GROUP_COUNT_SETTING_NAME, maxQueryGroupCount); + throw new RuntimeException("Can't create more than " + maxQueryGroupCount + " QueryGroups in the system"); + } + Metadata newData = Metadata.builder(metadata).put(queryGroup).build(); + + return ClusterState.builder(currentClusterState).metadata(newData).build(); + } + + /** + * This method restores the inflight values to be before the QueryGroup is processed + * @param resourceLimits - the resourceLimits we're currently restoring + */ + void restoreInflightValues(Map resourceLimits) { + if (resourceLimits == null || resourceLimits.isEmpty()) { + return; + } + for (String currResourceName : resourceLimits.keySet()) { + inflightResourceLimitValues.get(currResourceName).add(-getResourceLimitValue(currResourceName, resourceLimits)); + } + } + + /** + * This method calculates the existing total usage of the resource (except the group that we're updating here) + * @param resourceName - the resource name we're calculating + * @param groupsList - existing QueryGroups + * @param groupName - the QueryGroup name we're updating + */ + private double calculateExistingUsage(String resourceName, Set groupsList, String groupName) { + double existingUsage = 0; + for (QueryGroup group : groupsList) { + if (!group.getName().equals(groupName)) { + existingUsage += getResourceLimitValue(resourceName, group.getResourceLimits()); + } + } + return existingUsage; + } + + List getFromClusterStateMetadata(String name, ClusterState currentState) { + Set currentGroups = currentState.getMetadata().queryGroups(); + List resultGroups = new ArrayList<>(); + if (name == null || name.isEmpty()) { + resultGroups = new ArrayList<>(currentGroups); + } + Optional findResultGroups = currentGroups.stream().filter(group -> group.getName().equals(name)).findFirst(); + if (findResultGroups.isPresent()) { + resultGroups = List.of(findResultGroups.get()); + } + return resultGroups; + } + + /** + * inflightCreateQueryGroupRequestCount getter + */ + public AtomicInteger getInflightCreateQueryGroupRequestCount() { + return inflightCreateQueryGroupRequestCount; + } + + /** + * inflightResourceLimitValues getter + */ + public Map getInflightResourceLimitValues() { + return inflightResourceLimitValues; + } +} diff --git a/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/service/package-info.java b/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/service/package-info.java new file mode 100644 index 0000000000000..ecfc94568888a --- /dev/null +++ b/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/service/package-info.java @@ -0,0 +1,12 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** + * Package for the service classes for QueryGroup CRUD operations + */ +package org.opensearch.plugin.wlm.service; diff --git a/plugins/workload-management/src/test/java/org/opensearch/plugin/wlm/CreateQueryGroupRequestTests.java b/plugins/workload-management/src/test/java/org/opensearch/plugin/wlm/CreateQueryGroupRequestTests.java new file mode 100644 index 0000000000000..09fce8d911d96 --- /dev/null +++ b/plugins/workload-management/src/test/java/org/opensearch/plugin/wlm/CreateQueryGroupRequestTests.java @@ -0,0 +1,96 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.wlm; + +import org.opensearch.cluster.metadata.QueryGroup.QueryGroupMode; +import org.opensearch.common.io.stream.BytesStreamOutput; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.test.OpenSearchTestCase; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +import static org.opensearch.plugin.wlm.QueryGroupTestUtils.MONITOR; +import static org.opensearch.plugin.wlm.QueryGroupTestUtils.NAME_ONE; +import static org.opensearch.plugin.wlm.QueryGroupTestUtils.TIMESTAMP_ONE; +import static org.opensearch.plugin.wlm.QueryGroupTestUtils._ID_ONE; +import static org.opensearch.plugin.wlm.QueryGroupTestUtils.compareResourceLimits; +import static org.opensearch.plugin.wlm.QueryGroupTestUtils.queryGroupOne; + +public class CreateQueryGroupRequestTests extends OpenSearchTestCase { + + public void testSerialization() throws IOException { + CreateQueryGroupRequest request = new CreateQueryGroupRequest(queryGroupOne); + BytesStreamOutput out = new BytesStreamOutput(); + request.writeTo(out); + StreamInput streamInput = out.bytes().streamInput(); + CreateQueryGroupRequest otherRequest = new CreateQueryGroupRequest(streamInput); + assertEquals(request.getName(), otherRequest.getName()); + assertEquals(request.getResourceLimits().size(), otherRequest.getResourceLimits().size()); + assertEquals(request.getMode(), otherRequest.getMode()); + compareResourceLimits(request.getResourceLimits(), otherRequest.getResourceLimits()); + assertEquals(request.getUpdatedAtInMillis(), otherRequest.getUpdatedAtInMillis()); + } + + public void testInvalidName() { + assertThrows( + IllegalArgumentException.class, + () -> new CreateQueryGroupRequest("", _ID_ONE, QueryGroupMode.fromName(MONITOR), Map.of("jvm", 0.3), TIMESTAMP_ONE) + ); + assertThrows( + IllegalArgumentException.class, + () -> new CreateQueryGroupRequest("-test", _ID_ONE, QueryGroupMode.fromName(MONITOR), Map.of("jvm", 0.3), TIMESTAMP_ONE) + ); + assertThrows( + IllegalArgumentException.class, + () -> new CreateQueryGroupRequest(":test", _ID_ONE, QueryGroupMode.fromName(MONITOR), Map.of("jvm", 0.3), TIMESTAMP_ONE) + ); + } + + public void testInvalidResourceLimitList() { + assertThrows( + IllegalArgumentException.class, + () -> new CreateQueryGroupRequest(NAME_ONE, _ID_ONE, QueryGroupMode.fromName(MONITOR), new HashMap<>(), TIMESTAMP_ONE) + ); + + assertThrows( + IllegalArgumentException.class, + () -> new CreateQueryGroupRequest( + NAME_ONE, + _ID_ONE, + QueryGroupMode.fromName(MONITOR), + Map.of("jvm", 0.3, "jvm", 0.4), + TIMESTAMP_ONE + ) + ); + + assertThrows( + IllegalArgumentException.class, + () -> new CreateQueryGroupRequest(NAME_ONE, _ID_ONE, QueryGroupMode.fromName(MONITOR), Map.of("jvm", -0.3), TIMESTAMP_ONE) + ); + + assertThrows( + IllegalArgumentException.class, + () -> new CreateQueryGroupRequest(NAME_ONE, _ID_ONE, QueryGroupMode.fromName(MONITOR), Map.of("jvm", 12.0), TIMESTAMP_ONE) + ); + + assertThrows( + IllegalArgumentException.class, + () -> new CreateQueryGroupRequest(NAME_ONE, _ID_ONE, QueryGroupMode.fromName(MONITOR), Map.of("cpu", 0.3), TIMESTAMP_ONE) + ); + } + + public void testInvalidEnforcement() { + assertThrows( + IllegalArgumentException.class, + () -> new CreateQueryGroupRequest(NAME_ONE, _ID_ONE, QueryGroupMode.fromName("random"), Map.of("jvm", 0.3), TIMESTAMP_ONE) + ); + } +} diff --git a/plugins/workload-management/src/test/java/org/opensearch/plugin/wlm/CreateQueryGroupResponseTests.java b/plugins/workload-management/src/test/java/org/opensearch/plugin/wlm/CreateQueryGroupResponseTests.java new file mode 100644 index 0000000000000..c5d570427faa5 --- /dev/null +++ b/plugins/workload-management/src/test/java/org/opensearch/plugin/wlm/CreateQueryGroupResponseTests.java @@ -0,0 +1,33 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.wlm; + +import org.opensearch.cluster.metadata.QueryGroup; +import org.opensearch.common.io.stream.BytesStreamOutput; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.rest.RestStatus; +import org.opensearch.test.OpenSearchTestCase; + +import java.io.IOException; +import java.util.List; + +public class CreateQueryGroupResponseTests extends OpenSearchTestCase { + + public void testSerialization() throws IOException { + CreateQueryGroupResponse response = new CreateQueryGroupResponse(QueryGroupTestUtils.queryGroupOne, RestStatus.OK); + BytesStreamOutput out = new BytesStreamOutput(); + response.writeTo(out); + StreamInput streamInput = out.bytes().streamInput(); + CreateQueryGroupResponse otherResponse = new CreateQueryGroupResponse(streamInput); + assertEquals(response.getRestStatus(), otherResponse.getRestStatus()); + QueryGroup responseGroup = response.getQueryGroup(); + QueryGroup otherResponseGroup = otherResponse.getQueryGroup(); + QueryGroupTestUtils.compareQueryGroups(List.of(responseGroup), List.of(otherResponseGroup)); + } +} diff --git a/plugins/workload-management/src/test/java/org/opensearch/plugin/wlm/QueryGroupTestUtils.java b/plugins/workload-management/src/test/java/org/opensearch/plugin/wlm/QueryGroupTestUtils.java new file mode 100644 index 0000000000000..f40fe929d9fd6 --- /dev/null +++ b/plugins/workload-management/src/test/java/org/opensearch/plugin/wlm/QueryGroupTestUtils.java @@ -0,0 +1,125 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.wlm; + +import org.opensearch.cluster.ClusterName; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.metadata.Metadata; +import org.opensearch.cluster.metadata.QueryGroup; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; +import org.opensearch.plugin.wlm.service.QueryGroupPersistenceService; +import org.opensearch.threadpool.ThreadPool; + +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.DoubleAdder; +import java.util.stream.Collectors; + +import static org.opensearch.cluster.metadata.QueryGroup.builder; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; + +public class QueryGroupTestUtils { + public static final String SANDBOX_MAX_SETTING_NAME = "node.sandbox.max_count"; + public static final String NAME_ONE = "sandbox_one"; + public static final String NAME_TWO = "sandbox_two"; + public static final String _ID_ONE = "AgfUO5Ja9yfsYlONlYi3TQ=="; + public static final String _ID_TWO = "G5iIqHy4g7eK1qIAAAAIH53=1"; + public static final String NAME_NONE_EXISTED = "sandbox_none_existed"; + public static final String MONITOR = "monitor"; + public static final long TIMESTAMP_ONE = 4513232413L; + public static final long TIMESTAMP_TWO = 4513232415L; + public static final QueryGroup queryGroupOne = builder().name(NAME_ONE) + ._id(_ID_ONE) + .mode(MONITOR) + .resourceLimits(Map.of("jvm", 0.3)) + .updatedAt(TIMESTAMP_ONE) + .build(); + + public static final QueryGroup queryGroupTwo = builder().name(NAME_TWO) + ._id(_ID_TWO) + .mode(MONITOR) + .resourceLimits(Map.of("jvm", 0.6)) + .updatedAt(TIMESTAMP_TWO) + .build(); + + public static final Map queryGroupMap = Map.of(NAME_ONE, queryGroupOne, NAME_TWO, queryGroupTwo); + + public static List queryGroupList() { + return List.of(queryGroupOne, queryGroupTwo); + } + + public static ClusterState clusterState() { + final Metadata metadata = Metadata.builder().queryGroups(new HashSet<>(queryGroupList())).build(); + return ClusterState.builder(new ClusterName("_name")).metadata(metadata).build(); + } + + public static Settings settings() { + return Settings.builder().build(); + } + + public static ClusterSettings clusterSettings() { + return new ClusterSettings(settings(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + } + + public static QueryGroupPersistenceService queryGroupPersistenceService() { + ClusterService clusterService = new ClusterService(settings(), clusterSettings(), mock(ThreadPool.class)); + return new QueryGroupPersistenceService(clusterService, settings(), clusterSettings()); + } + + public static List prepareSandboxPersistenceService(List queryGroups) { + Metadata metadata = Metadata.builder().queryGroups(new HashSet<>(queryGroups)).build(); + Settings settings = Settings.builder().build(); + ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + ClusterService clusterService = new ClusterService(settings, clusterSettings, mock(ThreadPool.class)); + ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).metadata(metadata).build(); + QueryGroupPersistenceService queryGroupPersistenceService = new QueryGroupPersistenceService( + clusterService, + settings, + clusterSettings + ); + return List.of(queryGroupPersistenceService, clusterState); + } + + public static void compareResourceLimits(Map resourceLimitMapOne, Map resourceLimitMapTwo) { + for (String resourceName : resourceLimitMapOne.keySet()) { + assertTrue(resourceLimitMapTwo.containsKey(resourceName)); + assertEquals(resourceLimitMapOne.get(resourceName), resourceLimitMapTwo.get(resourceName)); + } + } + + public static void compareQueryGroups(List listOne, List listTwo) { + assertEquals(listOne.size(), listTwo.size()); + for (QueryGroup groupOne : listOne) { + String groupOneName = groupOne.getName(); + List groupTwoList = listTwo.stream().filter(sb -> sb.getName().equals(groupOneName)).collect(Collectors.toList()); + assertEquals(1, groupTwoList.size()); + QueryGroup groupTwo = groupTwoList.get(0); + assertEquals(groupOne.getName(), groupTwo.getName()); + assertEquals(groupOne.get_id(), groupTwo.get_id()); + compareResourceLimits(groupOne.getResourceLimits(), groupTwo.getResourceLimits()); + assertEquals(groupOne.getMode(), groupTwo.getMode()); + assertEquals(groupOne.getUpdatedAtInMillis(), groupTwo.getUpdatedAtInMillis()); + } + } + + public static void assertInflightValuesAreZero(QueryGroupPersistenceService queryGroupPersistenceService) { + assertEquals(0, queryGroupPersistenceService.getInflightCreateQueryGroupRequestCount().get()); + Map inflightResourceMap = queryGroupPersistenceService.getInflightResourceLimitValues(); + if (inflightResourceMap != null) { + for (String resourceName : inflightResourceMap.keySet()) { + assertEquals(0, inflightResourceMap.get(resourceName).intValue()); + } + } + } +} diff --git a/plugins/workload-management/src/test/java/org/opensearch/plugin/wlm/service/QueryGroupPersistenceServiceTests.java b/plugins/workload-management/src/test/java/org/opensearch/plugin/wlm/service/QueryGroupPersistenceServiceTests.java new file mode 100644 index 0000000000000..0f5e20f9b9474 --- /dev/null +++ b/plugins/workload-management/src/test/java/org/opensearch/plugin/wlm/service/QueryGroupPersistenceServiceTests.java @@ -0,0 +1,118 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.wlm.service; + +import org.opensearch.cluster.ClusterName; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.metadata.Metadata; +import org.opensearch.cluster.metadata.QueryGroup; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; +import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.threadpool.ThreadPool; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +import static org.opensearch.cluster.metadata.QueryGroup.builder; +import static org.opensearch.plugin.wlm.QueryGroupTestUtils.MONITOR; +import static org.opensearch.plugin.wlm.QueryGroupTestUtils.NAME_NONE_EXISTED; +import static org.opensearch.plugin.wlm.QueryGroupTestUtils.NAME_ONE; +import static org.opensearch.plugin.wlm.QueryGroupTestUtils.NAME_TWO; +import static org.opensearch.plugin.wlm.QueryGroupTestUtils.SANDBOX_MAX_SETTING_NAME; +import static org.opensearch.plugin.wlm.QueryGroupTestUtils.assertInflightValuesAreZero; +import static org.opensearch.plugin.wlm.QueryGroupTestUtils.compareQueryGroups; +import static org.opensearch.plugin.wlm.QueryGroupTestUtils.prepareSandboxPersistenceService; +import static org.opensearch.plugin.wlm.QueryGroupTestUtils.queryGroupList; +import static org.opensearch.plugin.wlm.QueryGroupTestUtils.queryGroupOne; +import static org.opensearch.plugin.wlm.QueryGroupTestUtils.queryGroupPersistenceService; +import static org.opensearch.plugin.wlm.QueryGroupTestUtils.queryGroupTwo; +import static org.mockito.Mockito.mock; + +public class QueryGroupPersistenceServiceTests extends OpenSearchTestCase { + + public void testCreateQueryGroup() { + List setup = prepareSandboxPersistenceService(new ArrayList<>()); + QueryGroupPersistenceService queryGroupPersistenceService1 = (QueryGroupPersistenceService) setup.get(0); + ClusterState clusterState = (ClusterState) setup.get(1); + ClusterState newClusterState = queryGroupPersistenceService1.saveQueryGroupInClusterState(queryGroupOne, clusterState); + Set updatedGroupsMap = newClusterState.getMetadata().queryGroups(); + assertEquals(1, updatedGroupsMap.size()); + Optional findUpdatedGroupOne = newClusterState.metadata() + .queryGroups() + .stream() + .filter(group -> group.getName().equals(NAME_ONE)) + .findFirst(); + assertTrue(findUpdatedGroupOne.isPresent()); + compareQueryGroups(List.of(queryGroupOne), List.of(findUpdatedGroupOne.get())); + assertInflightValuesAreZero(queryGroupPersistenceService()); + } + + public void testCreateAnotherQueryGroup() { + List setup = prepareSandboxPersistenceService(List.of(queryGroupOne)); + QueryGroupPersistenceService queryGroupPersistenceService1 = (QueryGroupPersistenceService) setup.get(0); + ClusterState clusterState = (ClusterState) setup.get(1); + ClusterState newClusterState = queryGroupPersistenceService1.saveQueryGroupInClusterState(queryGroupTwo, clusterState); + Set updatedGroups = newClusterState.getMetadata().queryGroups(); + assertEquals(2, updatedGroups.size()); + compareQueryGroups(queryGroupList(), new ArrayList<>(updatedGroups)); + assertInflightValuesAreZero(queryGroupPersistenceService()); + } + + public void testCreateQueryGroupDuplicateName() { + List setup = prepareSandboxPersistenceService(List.of(queryGroupOne)); + QueryGroupPersistenceService queryGroupPersistenceService1 = (QueryGroupPersistenceService) setup.get(0); + ClusterState clusterState = (ClusterState) setup.get(1); + QueryGroup toCreate = builder().name(NAME_ONE) + ._id("W5iIqHyhgi4K1qIAAAAIHw==") + .mode(MONITOR) + .resourceLimits(Map.of("jvm", 0.3)) + .updatedAt(1690934400000L) + .build(); + assertThrows(RuntimeException.class, () -> queryGroupPersistenceService1.saveQueryGroupInClusterState(toCreate, clusterState)); + } + + public void testCreateQueryGroupOverflowAllocation() { + List setup = prepareSandboxPersistenceService(List.of(queryGroupTwo)); + QueryGroup toCreate = builder().name(NAME_TWO) + ._id("W5iIqHyhgi4K1qIAAAAIHw==") + .mode(MONITOR) + .resourceLimits(Map.of("jvm", 0.5)) + .updatedAt(1690934400000L) + .build(); + QueryGroupPersistenceService queryGroupPersistenceService1 = (QueryGroupPersistenceService) setup.get(0); + ClusterState clusterState = (ClusterState) setup.get(1); + assertThrows(RuntimeException.class, () -> queryGroupPersistenceService1.saveQueryGroupInClusterState(toCreate, clusterState)); + } + + public void testCreateQueryGroupOverflowCount() { + QueryGroup toCreate = builder().name(NAME_NONE_EXISTED) + ._id("W5iIqHyhgi4K1qIAAAAIHw==") + .mode(MONITOR) + .resourceLimits(Map.of("jvm", 0.5)) + .updatedAt(1690934400000L) + .build(); + Metadata metadata = Metadata.builder().queryGroups(new HashSet<>(queryGroupList())).build(); + Settings settings = Settings.builder().put(SANDBOX_MAX_SETTING_NAME, 2).build(); + ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + ClusterService clusterService = new ClusterService(settings, clusterSettings, mock(ThreadPool.class)); + ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).metadata(metadata).build(); + QueryGroupPersistenceService queryGroupPersistenceService1 = new QueryGroupPersistenceService( + clusterService, + settings, + clusterSettings + ); + assertThrows(RuntimeException.class, () -> queryGroupPersistenceService1.saveQueryGroupInClusterState(toCreate, clusterState)); + } +} diff --git a/server/src/main/java/org/opensearch/cluster/ClusterModule.java b/server/src/main/java/org/opensearch/cluster/ClusterModule.java index c7fd263bda56a..bb51c42252448 100644 --- a/server/src/main/java/org/opensearch/cluster/ClusterModule.java +++ b/server/src/main/java/org/opensearch/cluster/ClusterModule.java @@ -48,6 +48,7 @@ import org.opensearch.cluster.metadata.MetadataIndexTemplateService; import org.opensearch.cluster.metadata.MetadataMappingService; import org.opensearch.cluster.metadata.MetadataUpdateSettingsService; +import org.opensearch.cluster.metadata.QueryGroupMetadata; import org.opensearch.cluster.metadata.RepositoriesMetadata; import org.opensearch.cluster.metadata.ViewMetadata; import org.opensearch.cluster.metadata.WeightedRoutingMetadata; @@ -214,6 +215,8 @@ public static List getNamedWriteables() { DecommissionAttributeMetadata::new, DecommissionAttributeMetadata::readDiffFrom ); + + registerMetadataCustom(entries, QueryGroupMetadata.TYPE, QueryGroupMetadata::new, QueryGroupMetadata::readDiffFrom); // Task Status (not Diffable) entries.add(new Entry(Task.Status.class, PersistentTasksNodeService.Status.NAME, PersistentTasksNodeService.Status::new)); return entries; diff --git a/server/src/main/java/org/opensearch/cluster/metadata/Metadata.java b/server/src/main/java/org/opensearch/cluster/metadata/Metadata.java index a0ef8de07fbf2..0657a74851636 100644 --- a/server/src/main/java/org/opensearch/cluster/metadata/Metadata.java +++ b/server/src/main/java/org/opensearch/cluster/metadata/Metadata.java @@ -844,6 +844,12 @@ public Map views() { return Optional.ofNullable((ViewMetadata) this.custom(ViewMetadata.TYPE)).map(ViewMetadata::views).orElse(Collections.emptyMap()); } + public Set queryGroups() { + return Optional.ofNullable((QueryGroupMetadata) this.custom(QueryGroupMetadata.TYPE)) + .map(QueryGroupMetadata::queryGroups) + .orElse(Collections.emptySet()); + } + public DecommissionAttributeMetadata decommissionAttributeMetadata() { return custom(DecommissionAttributeMetadata.TYPE); } @@ -1367,6 +1373,34 @@ public Builder removeDataStream(String name) { return this; } + public Builder queryGroups(final Set queryGroups) { + this.customs.put(QueryGroupMetadata.TYPE, new QueryGroupMetadata(queryGroups)); + return this; + } + + public Builder put(final QueryGroup queryGroup) { + Objects.requireNonNull(queryGroup, "queryGroup should not be null"); + Set existing = getQueryGroups(); + HashSet updatedGroups = new HashSet<>(existing); + updatedGroups.add(queryGroup); + return queryGroups(updatedGroups); + } + + public Builder remove(final QueryGroup queryGroup) { + Objects.requireNonNull(queryGroup, "queryGroup should not be null"); + Set existing = getQueryGroups(); + HashSet updatedGroups = new HashSet<>(existing); + updatedGroups.remove(queryGroup); + return queryGroups(updatedGroups); + } + + public Set getQueryGroups() { + return Optional.ofNullable(this.customs.get(QueryGroupMetadata.TYPE)) + .map(o -> (QueryGroupMetadata) o) + .map(QueryGroupMetadata::queryGroups) + .orElse(Collections.emptySet()); + } + private Map getViews() { return Optional.ofNullable(customs.get(ViewMetadata.TYPE)) .map(o -> (ViewMetadata) o) @@ -1763,8 +1797,8 @@ static void validateDataStreams(SortedMap indicesLooku for (DataStream ds : dsMetadata.dataStreams().values()) { String prefix = DataStream.BACKING_INDEX_PREFIX + ds.getName() + "-"; Set conflicts = indicesLookup.subMap(prefix, DataStream.BACKING_INDEX_PREFIX + ds.getName() + ".") // '.' is the - // char after - // '-' + // char after + // '-' .keySet() .stream() .filter(s -> NUMBER_PATTERN.matcher(s.substring(prefix.length())).matches()) diff --git a/server/src/main/java/org/opensearch/cluster/metadata/QueryGroup.java b/server/src/main/java/org/opensearch/cluster/metadata/QueryGroup.java new file mode 100644 index 0000000000000..87a6fc897e9c0 --- /dev/null +++ b/server/src/main/java/org/opensearch/cluster/metadata/QueryGroup.java @@ -0,0 +1,315 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.cluster.metadata; + +import org.opensearch.cluster.AbstractDiffable; +import org.opensearch.cluster.Diff; +import org.opensearch.common.annotation.ExperimentalApi; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.xcontent.ToXContentObject; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.core.xcontent.XContentParser; +import org.joda.time.Instant; + +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Objects; + +/** + * Class to define the QueryGroup schema + * { + * "fafjafjkaf9ag8a9ga9g7ag0aagaga" : { + * "jvm": 0.4, + * "mode": "enforced", + * "name": "analytics", + * "updatedAt": 4513232415 + * } + * } + */ +@ExperimentalApi +public class QueryGroup extends AbstractDiffable implements ToXContentObject { + + public static final int MAX_CHARS_ALLOWED_IN_NAME = 50; + private final String name; + private final String _id; + private final QueryGroupMode mode; + // It is an epoch in millis + private final long updatedAtInMillis; + private final Map resourceLimits; + + // list of resources that are allowed to be present in the QueryGroup schema + public static final List ALLOWED_RESOURCES = List.of("jvm"); + + public QueryGroup(String name, String _id, QueryGroupMode mode, Map resourceLimits, long updatedAt) { + Objects.requireNonNull(name, "QueryGroup.name can't be null"); + Objects.requireNonNull(resourceLimits, "QueryGroup.resourceLimits can't be null"); + Objects.requireNonNull(mode, "QueryGroup.mode can't be null"); + Objects.requireNonNull(_id, "QueryGroup._id can't be null"); + + if (name.length() > MAX_CHARS_ALLOWED_IN_NAME) { + throw new IllegalArgumentException("QueryGroup.name shouldn't be more than 50 chars long"); + } + + if (resourceLimits.isEmpty()) { + throw new IllegalArgumentException("QueryGroup.resourceLimits should at least have 1 resource limit"); + } + validateResourceLimits(resourceLimits); + if (!isValid(updatedAt)) { + throw new IllegalArgumentException("QueryGroup.updatedAtInMillis is not a valid epoch"); + } + + this.name = name; + this._id = _id; + this.mode = mode; + this.resourceLimits = resourceLimits; + this.updatedAtInMillis = updatedAt; + } + + private static boolean isValid(long updatedAt) { + long minValidTimestamp = Instant.ofEpochMilli(0L).getMillis(); + + // Use Instant.now() to get the current time in seconds since epoch + long currentSeconds = Instant.now().getMillis(); + + // Check if the timestamp is within a reasonable range + return minValidTimestamp <= updatedAt && updatedAt <= currentSeconds; + } + + public QueryGroup(StreamInput in) throws IOException { + this(in.readString(), in.readString(), QueryGroupMode.fromName(in.readString()), in.readMap(), in.readLong()); + } + + /** + * Write this into the {@linkplain StreamOutput}. + * + * @param out + */ + @Override + public void writeTo(StreamOutput out) throws IOException { + writeToOutputStream(out, name, _id, mode, resourceLimits, updatedAtInMillis); + } + + public static void writeToOutputStream( + StreamOutput out, + String name, + String _id, + QueryGroupMode mode, + Map resourceLimits, + long updatedAtInMillis + ) throws IOException { + out.writeString(name); + out.writeString(_id); + out.writeString(mode.getName()); + out.writeMap(resourceLimits); + out.writeLong(updatedAtInMillis); + } + + private void validateResourceLimits(Map resourceLimits) { + for (Map.Entry resource : resourceLimits.entrySet()) { + String resourceName = resource.getKey(); + Double threshold = (Double) resource.getValue(); + Objects.requireNonNull(resourceName, "resourceName can't be null"); + Objects.requireNonNull(threshold, "resource value can't be null"); + + if (Double.compare(threshold, 1.0) > 0) { + throw new IllegalArgumentException("resource value should be less than 1.0"); + } + + if (!ALLOWED_RESOURCES.contains(resourceName.toLowerCase(Locale.ROOT))) { + throw new IllegalArgumentException( + "resource has to be valid, valid resources " + ALLOWED_RESOURCES.stream().reduce((x, e) -> x + ", " + e).get() + ); + } + } + } + + /** + * @param builder + * @param params + * @return + * @throws IOException + */ + @Override + public XContentBuilder toXContent(final XContentBuilder builder, final Params params) throws IOException { + builder.startObject(); + builder.startObject(this._id); + builder.field("name", name); + builder.field("mode", mode.getName()); + builder.field("updatedAt", updatedAtInMillis); + builder.mapContents(resourceLimits); + builder.endObject(); + builder.endObject(); + return builder; + } + + public static QueryGroup fromXContent(final XContentParser parser) throws IOException { + if (parser.currentToken() == null) { // fresh parser? move to the first token + parser.nextToken(); + } + if (parser.currentToken() == XContentParser.Token.START_OBJECT) { + parser.nextToken(); // move to field name + } + if (parser.currentToken() != XContentParser.Token.FIELD_NAME) { // on a start object move to next token + throw new IllegalArgumentException("Expected FIELD_NAME token but found [" + parser.currentName() + "]"); + } + + Builder builder = builder()._id(parser.currentName()); + + XContentParser.Token token = parser.nextToken(); + + if (token != XContentParser.Token.START_OBJECT) { + throw new IllegalArgumentException("Expected START_OBJECT token but found [" + parser.currentName() + "]"); + } + + String fieldName = ""; + // Map to hold resources + final Map resourceLimits_ = new HashMap<>(); + while ((token = parser.nextToken()) != null) { + if (token == XContentParser.Token.FIELD_NAME) { + fieldName = parser.currentName(); + } else if (token.isValue()) { + if (fieldName.equals("name")) { + builder.name(parser.text()); + } else if (fieldName.equals("mode")) { + builder.mode(parser.text()); + } else if (fieldName.equals("updatedAt")) { + builder.updatedAt(parser.longValue()); + } else if (ALLOWED_RESOURCES.contains(fieldName)) { + resourceLimits_.put(fieldName, parser.doubleValue()); + } else { + throw new IllegalArgumentException("unrecognised [field=" + fieldName + " in QueryGroup"); + } + } + } + builder.resourceLimits(resourceLimits_); + return builder.build(); + } + + public static Diff readDiff(final StreamInput in) throws IOException { + return readDiffFrom(QueryGroup::new, in); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + QueryGroup that = (QueryGroup) o; + return Objects.equals(name, that.name) && Objects.equals(resourceLimits, that.resourceLimits); + } + + @Override + public int hashCode() { + return Objects.hash(name, resourceLimits); + } + + public String getName() { + return name; + } + + public QueryGroupMode getMode() { + return mode; + } + + public Map getResourceLimits() { + return resourceLimits; + } + + public String get_id() { + return _id; + } + + public long getUpdatedAtInMillis() { + return updatedAtInMillis; + } + + /** + * builder method for this {@link QueryGroup} + * @return + */ + public static Builder builder() { + return new Builder(); + } + + /** + * This enum models the different sandbox modes + */ + @ExperimentalApi + public enum QueryGroupMode { + SOFT("soft"), + ENFORCED("enforced"), + MONITOR("monitor"); + + private final String name; + + QueryGroupMode(String mode) { + this.name = mode; + } + + public String getName() { + return name; + } + + public static QueryGroupMode fromName(String s) { + for (QueryGroupMode mode : values()) { + if (mode.getName().equalsIgnoreCase(s)) return mode; + + } + throw new IllegalArgumentException("Invalid value for QueryGroupMode: " + s); + } + + } + + /** + * Builder class for {@link QueryGroup} + */ + @ExperimentalApi + public static class Builder { + private String name; + private String _id; + private QueryGroupMode mode; + private long updatedAt; + private Map resourceLimits; + + private Builder() {} + + public Builder name(String name) { + this.name = name; + return this; + } + + public Builder _id(String _id) { + this._id = _id; + return this; + } + + public Builder mode(String mode) { + this.mode = QueryGroupMode.fromName(mode); + return this; + } + + public Builder updatedAt(long updatedAt) { + this.updatedAt = updatedAt; + return this; + } + + public Builder resourceLimits(Map resourceLimits) { + this.resourceLimits = resourceLimits; + return this; + } + + public QueryGroup build() { + return new QueryGroup(name, _id, mode, resourceLimits, updatedAt); + } + + } +} diff --git a/server/src/main/java/org/opensearch/cluster/metadata/QueryGroupMetadata.java b/server/src/main/java/org/opensearch/cluster/metadata/QueryGroupMetadata.java new file mode 100644 index 0000000000000..073432e6aa15a --- /dev/null +++ b/server/src/main/java/org/opensearch/cluster/metadata/QueryGroupMetadata.java @@ -0,0 +1,218 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.cluster.metadata; + +import org.opensearch.Version; +import org.opensearch.cluster.Diff; +import org.opensearch.cluster.DiffableUtils; +import org.opensearch.cluster.NamedDiff; +import org.opensearch.common.annotation.ExperimentalApi; +import org.opensearch.core.ParseField; +import org.opensearch.core.common.Strings; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.xcontent.ConstructingObjectParser; +import org.opensearch.core.xcontent.MediaTypeRegistry; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.core.xcontent.XContentParser; + +import java.io.IOException; +import java.util.EnumSet; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Objects; +import java.util.Set; + +import static org.opensearch.cluster.metadata.Metadata.ALL_CONTEXTS; + +/** + * This class holds the QueryGroupMetadata + * sample schema + * { + * "queryGroups": { + * "_id": { + * {@link QueryGroup} + * }, + * ... + * } + * } + */ +@ExperimentalApi +public class QueryGroupMetadata implements Metadata.Custom { + public static final String TYPE = "queryGroup"; + private static final ParseField QUERY_GROUP_FIELD = new ParseField("queryGroups"); + + @SuppressWarnings("unchecked") + static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>( + "queryGroupParser", + args -> new QueryGroupMetadata((Set) args[0]) + ); + + static { + PARSER.declareObjectArray(ConstructingObjectParser.constructorArg(), (p, c) -> QueryGroup.fromXContent(p), QUERY_GROUP_FIELD); + } + + private final Set queryGroups; + + public QueryGroupMetadata(Set queryGroups) { + this.queryGroups = queryGroups; + } + + public QueryGroupMetadata(StreamInput in) throws IOException { + this.queryGroups = in.readSet(QueryGroup::new); + } + + public Set queryGroups() { + return this.queryGroups; + } + + /** + * Returns the name of the writeable object + */ + @Override + public String getWriteableName() { + return TYPE; + } + + /** + * The minimal version of the recipient this object can be sent to + */ + @Override + public Version getMinimalSupportedVersion() { + return Version.V_3_0_0; + } + + /** + * Write this into the {@linkplain StreamOutput}. + * + * @param out + */ + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeCollection(queryGroups); + } + + /** + * @param builder + * @param params + * @return + * @throws IOException + */ + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.field(QUERY_GROUP_FIELD.getPreferredName(), queryGroups); + return builder; + } + + public static QueryGroupMetadata fromXContent(XContentParser parser) throws IOException { + return PARSER.parse(parser, null); + } + + /** + * Returns serializable object representing differences between this and previousState + * + * @param previousState + */ + @Override + public Diff diff(final Metadata.Custom previousState) { + return new QueryGroupMetadataDiff((QueryGroupMetadata) previousState, this); + } + + /** + * @param in + * @return the metadata diff for {@link QueryGroupMetadata} objects + * @throws IOException + */ + public static NamedDiff readDiffFrom(StreamInput in) throws IOException { + return new QueryGroupMetadataDiff(in); + } + + @Override + public EnumSet context() { + return ALL_CONTEXTS; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + QueryGroupMetadata that = (QueryGroupMetadata) o; + return Objects.equals(queryGroups, that.queryGroups); + } + + @Override + public int hashCode() { + return Objects.hash(queryGroups); + } + + @Override + public String toString() { + return Strings.toString(MediaTypeRegistry.JSON, this); + } + + /** + * QueryGroupMetadataDiff + */ + static class QueryGroupMetadataDiff implements NamedDiff { + final Diff> dataStreamDiff; + + QueryGroupMetadataDiff(final QueryGroupMetadata before, final QueryGroupMetadata after) { + dataStreamDiff = DiffableUtils.diff( + toMap(before.queryGroups), + toMap(after.queryGroups), + DiffableUtils.getStringKeySerializer() + ); + } + + QueryGroupMetadataDiff(final StreamInput in) throws IOException { + this.dataStreamDiff = DiffableUtils.readJdkMapDiff( + in, + DiffableUtils.getStringKeySerializer(), + QueryGroup::new, + QueryGroup::readDiff + ); + } + + private Map toMap(Set queryGroups) { + final Map queryGroupMap = new HashMap<>(); + + queryGroups.forEach(queryGroup -> queryGroupMap.put(queryGroup.getName(), queryGroup)); + return queryGroupMap; + } + + /** + * Returns the name of the writeable object + */ + @Override + public String getWriteableName() { + return TYPE; + } + + /** + * Write this into the {@linkplain StreamOutput}. + * + * @param out + */ + @Override + public void writeTo(StreamOutput out) throws IOException { + dataStreamDiff.writeTo(out); + } + + /** + * Applies difference to the specified part and returns the resulted part + * + * @param part + */ + @Override + public Metadata.Custom apply(Metadata.Custom part) { + return new QueryGroupMetadata(new HashSet<>(dataStreamDiff.apply(toMap(((QueryGroupMetadata) part).queryGroups)).values())); + } + } +} diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index 7ea04acf00415..9007e2272ccec 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -158,6 +158,7 @@ import org.opensearch.search.backpressure.settings.SearchShardTaskSettings; import org.opensearch.search.backpressure.settings.SearchTaskSettings; import org.opensearch.search.fetch.subphase.highlight.FastVectorHighlighter; +import org.opensearch.search.query_group.QueryGroupServiceSettings; import org.opensearch.snapshots.InternalSnapshotsInfoService; import org.opensearch.snapshots.SnapshotsService; import org.opensearch.tasks.TaskCancellationMonitoringSettings; @@ -327,6 +328,7 @@ public void apply(Settings value, Settings current, Settings previous) { DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_INCLUDE_RELOCATIONS_SETTING, DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_REROUTE_INTERVAL_SETTING, SameShardAllocationDecider.CLUSTER_ROUTING_ALLOCATION_SAME_HOST_SETTING, + QueryGroupServiceSettings.MAX_QUERY_GROUP_COUNT, ShardStateAction.FOLLOW_UP_REROUTE_PRIORITY_SETTING, InternalClusterInfoService.INTERNAL_CLUSTER_INFO_UPDATE_INTERVAL_SETTING, InternalClusterInfoService.INTERNAL_CLUSTER_INFO_TIMEOUT_SETTING, @@ -755,7 +757,12 @@ public void apply(Settings value, Settings current, Settings previous) { RemoteStoreSettings.CLUSTER_REMOTE_STORE_PATH_HASH_ALGORITHM_SETTING, RemoteStoreSettings.CLUSTER_REMOTE_MAX_TRANSLOG_READERS, RemoteStoreSettings.CLUSTER_REMOTE_STORE_TRANSLOG_METADATA, - SearchService.CLUSTER_ALLOW_DERIVED_FIELD_SETTING + SearchService.CLUSTER_ALLOW_DERIVED_FIELD_SETTING, + + // QueryGroup settings + QueryGroupServiceSettings.MAX_QUERY_GROUP_COUNT, + QueryGroupServiceSettings.NODE_LEVEL_REJECTION_THRESHOLD, + QueryGroupServiceSettings.NODE_LEVEL_CANCELLATION_THRESHOLD ) ) ); diff --git a/server/src/main/java/org/opensearch/search/query_group/QueryGroupServiceSettings.java b/server/src/main/java/org/opensearch/search/query_group/QueryGroupServiceSettings.java new file mode 100644 index 0000000000000..db54737b3c458 --- /dev/null +++ b/server/src/main/java/org/opensearch/search/query_group/QueryGroupServiceSettings.java @@ -0,0 +1,198 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search.query_group; + +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Setting; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; + +/** + * Main class to declare the QueryGroup feature related settings + */ +public class QueryGroupServiceSettings { + private static final Long DEFAULT_RUN_INTERVAL_MILLIS = 1000l; + private static final Double DEFAULT_NODE_LEVEL_REJECTION_THRESHOLD = 0.8; + private static final Double DEFAULT_NODE_LEVEL_CANCELLATION_THRESHOLD = 0.9; + /** + * default max queryGroup count on any node at any given point in time + */ + public static final int DEFAULT_MAX_QUERY_GROUP_COUNT_VALUE = 100; + + public static final String QUERY_GROUP_COUNT_SETTING_NAME = "node.query_group.max_count"; + public static final double NODE_LEVEL_CANCELLATION_THRESHOLD_MAX_VALUE = 0.95; + public static final double NODE_LEVEL_REJECTION_THRESHOLD_MAX_VALUE = 0.90; + + private TimeValue runIntervalMillis; + private Double nodeLevelJvmCancellationThreshold; + private Double nodeLevelJvmRejectionThreshold; + private volatile int maxQueryGroupCount; + /** + * max QueryGroup count setting + */ + public static final Setting MAX_QUERY_GROUP_COUNT = Setting.intSetting( + QUERY_GROUP_COUNT_SETTING_NAME, + DEFAULT_MAX_QUERY_GROUP_COUNT_VALUE, + 0, + (newVal) -> { + if (newVal > 100 || newVal < 1) throw new IllegalArgumentException( + QUERY_GROUP_COUNT_SETTING_NAME + " should be in range [1-100]" + ); + }, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + /** + * Setting name for default QueryGroup count + */ + public static final String SERVICE_RUN_INTERVAL_MILLIS_SETTING_NAME = "query_group.service.run_interval_millis"; + /** + * Setting to control the run interval of QSB service + */ + private static final Setting QUERY_GROUP_RUN_INTERVAL_SETTING = Setting.longSetting( + SERVICE_RUN_INTERVAL_MILLIS_SETTING_NAME, + DEFAULT_RUN_INTERVAL_MILLIS, + 1, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + + /** + * Setting name for node level rejection threshold for QSB + */ + public static final String NODE_REJECTION_THRESHOLD_SETTING_NAME = "query_group.node.rejection_threshold"; + /** + * Setting to control the rejection threshold + */ + public static final Setting NODE_LEVEL_REJECTION_THRESHOLD = Setting.doubleSetting( + NODE_REJECTION_THRESHOLD_SETTING_NAME, + DEFAULT_NODE_LEVEL_REJECTION_THRESHOLD, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + /** + * Setting name for node level cancellation threshold + */ + public static final String NODE_CANCELLATION_THRESHOLD_SETTING_NAME = "query_group.node.cancellation_threshold"; + /** + * Setting name for node level cancellation threshold + */ + public static final Setting NODE_LEVEL_CANCELLATION_THRESHOLD = Setting.doubleSetting( + NODE_CANCELLATION_THRESHOLD_SETTING_NAME, + DEFAULT_NODE_LEVEL_CANCELLATION_THRESHOLD, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + + /** + * QueryGroup service settings constructor + * @param settings + * @param clusterSettings + */ + public QueryGroupServiceSettings(Settings settings, ClusterSettings clusterSettings) { + runIntervalMillis = new TimeValue(QUERY_GROUP_RUN_INTERVAL_SETTING.get(settings)); + nodeLevelJvmCancellationThreshold = NODE_LEVEL_CANCELLATION_THRESHOLD.get(settings); + nodeLevelJvmRejectionThreshold = NODE_LEVEL_REJECTION_THRESHOLD.get(settings); + maxQueryGroupCount = MAX_QUERY_GROUP_COUNT.get(settings); + + ensureRejectionThresholdIsLessThanCancellation(nodeLevelJvmRejectionThreshold, nodeLevelJvmCancellationThreshold); + + clusterSettings.addSettingsUpdateConsumer(MAX_QUERY_GROUP_COUNT, this::setMaxQueryGroupCount); + clusterSettings.addSettingsUpdateConsumer(NODE_LEVEL_CANCELLATION_THRESHOLD, this::setNodeLevelJvmCancellationThreshold); + clusterSettings.addSettingsUpdateConsumer(NODE_LEVEL_REJECTION_THRESHOLD, this::setNodeLevelJvmRejectionThreshold); + } + + /** + * Method to get runInterval for QSB + * @return runInterval in milliseconds for QSB Service + */ + public TimeValue getRunIntervalMillis() { + return runIntervalMillis; + } + + /** + * Method to set the new QueryGroup count + * @param newMaxQueryGroupCount is the new maxQueryGroupCount per node + */ + public void setMaxQueryGroupCount(int newMaxQueryGroupCount) { + if (newMaxQueryGroupCount < 0) { + throw new IllegalArgumentException("node.QueryGroup.max_count can't be negative"); + } + this.maxQueryGroupCount = newMaxQueryGroupCount; + } + + /** + * Method to get the node level cancellation threshold + * @return current node level cancellation threshold + */ + public Double getNodeLevelJvmCancellationThreshold() { + return nodeLevelJvmCancellationThreshold; + } + + /** + * Method to set the node level cancellation threshold + * @param nodeLevelJvmCancellationThreshold sets the new node level cancellation threshold + * @throws IllegalArgumentException if the value is > 0.95 and cancellation < rejection threshold + */ + public void setNodeLevelJvmCancellationThreshold(Double nodeLevelJvmCancellationThreshold) { + if (Double.compare(nodeLevelJvmCancellationThreshold, NODE_LEVEL_CANCELLATION_THRESHOLD_MAX_VALUE) > 0) { + throw new IllegalArgumentException( + NODE_CANCELLATION_THRESHOLD_SETTING_NAME + " value should not be greater than 0.95 as it pose a threat of node drop" + ); + } + + ensureRejectionThresholdIsLessThanCancellation(nodeLevelJvmRejectionThreshold, nodeLevelJvmCancellationThreshold); + + this.nodeLevelJvmCancellationThreshold = nodeLevelJvmCancellationThreshold; + } + + /** + * Method to get the node level rejection threshold + * @return the current node level rejection threshold + */ + public Double getNodeLevelJvmRejectionThreshold() { + return nodeLevelJvmRejectionThreshold; + } + + /** + * Method to set the node level rejection threshold + * @param nodeLevelJvmRejectionThreshold sets the new rejection threshold + * @throws IllegalArgumentException if rejection > 0.90 and rejection < cancellation threshold + */ + public void setNodeLevelJvmRejectionThreshold(Double nodeLevelJvmRejectionThreshold) { + if (Double.compare(nodeLevelJvmRejectionThreshold, NODE_LEVEL_REJECTION_THRESHOLD_MAX_VALUE) > 0) { + throw new IllegalArgumentException( + NODE_REJECTION_THRESHOLD_SETTING_NAME + " value not be greater than 0.90 as it pose a threat of node drop" + ); + } + + ensureRejectionThresholdIsLessThanCancellation(nodeLevelJvmRejectionThreshold, nodeLevelJvmCancellationThreshold); + + this.nodeLevelJvmRejectionThreshold = nodeLevelJvmRejectionThreshold; + } + + private void ensureRejectionThresholdIsLessThanCancellation( + Double nodeLevelJvmRejectionThreshold, + Double nodeLevelJvmCancellationThreshold + ) { + if (Double.compare(nodeLevelJvmCancellationThreshold, nodeLevelJvmRejectionThreshold) < 0) { + throw new IllegalArgumentException( + NODE_CANCELLATION_THRESHOLD_SETTING_NAME + " value should not be less than " + NODE_REJECTION_THRESHOLD_SETTING_NAME + ); + } + } + + /** + * Method to get the current QueryGroup count + * @return the current max QueryGroup count + */ + public int getMaxQueryGroupCount() { + return maxQueryGroupCount; + } +} diff --git a/server/src/main/java/org/opensearch/search/query_group/package-info.java b/server/src/main/java/org/opensearch/search/query_group/package-info.java new file mode 100644 index 0000000000000..e7b8443799e83 --- /dev/null +++ b/server/src/main/java/org/opensearch/search/query_group/package-info.java @@ -0,0 +1,12 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** + * Query Sandboxing related artifacts + */ +package org.opensearch.search.query_group; diff --git a/server/src/test/java/org/opensearch/cluster/metadata/QueryGroupMetadataTests.java b/server/src/test/java/org/opensearch/cluster/metadata/QueryGroupMetadataTests.java new file mode 100644 index 0000000000000..18cdeb114a012 --- /dev/null +++ b/server/src/test/java/org/opensearch/cluster/metadata/QueryGroupMetadataTests.java @@ -0,0 +1,43 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.cluster.metadata; + +import org.opensearch.core.common.io.stream.NamedWriteableRegistry; +import org.opensearch.test.AbstractNamedWriteableTestCase; + +import java.util.Collections; +import java.util.Set; + +import static org.opensearch.cluster.metadata.QueryGroupTests.createRandomQueryGroup; + +public class QueryGroupMetadataTests extends AbstractNamedWriteableTestCase { + + @Override + protected NamedWriteableRegistry getNamedWriteableRegistry() { + return new NamedWriteableRegistry( + Collections.singletonList( + new NamedWriteableRegistry.Entry(QueryGroupMetadata.class, QueryGroupMetadata.TYPE, QueryGroupMetadata::new) + ) + ); + } + + @Override + protected Class categoryClass() { + return QueryGroupMetadata.class; + } + + @Override + protected QueryGroupMetadata createTestInstance() { + return new QueryGroupMetadata(getRandomQueryGroups()); + } + + private Set getRandomQueryGroups() { + return Set.of(createRandomQueryGroup(), createRandomQueryGroup()); + } +} diff --git a/server/src/test/java/org/opensearch/cluster/metadata/QueryGroupTests.java b/server/src/test/java/org/opensearch/cluster/metadata/QueryGroupTests.java new file mode 100644 index 0000000000000..1d900b3c02d27 --- /dev/null +++ b/server/src/test/java/org/opensearch/cluster/metadata/QueryGroupTests.java @@ -0,0 +1,144 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.cluster.metadata; + +import org.opensearch.core.common.io.stream.Writeable; +import org.opensearch.core.xcontent.XContentParser; +import org.opensearch.test.AbstractSerializingTestCase; +import org.joda.time.Instant; + +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class QueryGroupTests extends AbstractSerializingTestCase { + + private static final List allowedModes = List.of( + QueryGroup.QueryGroupMode.SOFT, + QueryGroup.QueryGroupMode.ENFORCED, + QueryGroup.QueryGroupMode.MONITOR + ); + + static QueryGroup createRandomQueryGroup() { + String name = randomAlphaOfLength(10); + Map resourceLimit = new HashMap<>(); + resourceLimit.put("jvm", randomDoubleBetween(0.0, 0.80, false)); + return new QueryGroup(name, "random", randomMode(), resourceLimit, Instant.now().getMillis()); + } + + private static QueryGroup.QueryGroupMode randomMode() { + return allowedModes.get(randomIntBetween(0, allowedModes.size() - 1)); + } + + /** + * Parses to a new instance using the provided {@link XContentParser} + * + * @param parser + */ + @Override + protected QueryGroup doParseInstance(XContentParser parser) throws IOException { + return QueryGroup.fromXContent(parser); + } + + /** + * Returns a {@link Writeable.Reader} that can be used to de-serialize the instance + */ + @Override + protected Writeable.Reader instanceReader() { + return QueryGroup::new; + } + + /** + * Creates a random test instance to use in the tests. This method will be + * called multiple times during test execution and should return a different + * random instance each time it is called. + */ + @Override + protected QueryGroup createTestInstance() { + return createRandomQueryGroup(); + } + + public void testNullName() { + assertThrows( + NullPointerException.class, + () -> new QueryGroup(null, "_id", randomMode(), Collections.emptyMap(), Instant.now().getMillis()) + ); + } + + public void testNullId() { + assertThrows( + NullPointerException.class, + () -> new QueryGroup("Dummy", null, randomMode(), Collections.emptyMap(), Instant.now().getMillis()) + ); + } + + public void testNullResourceLimits() { + assertThrows(NullPointerException.class, () -> new QueryGroup("analytics", "_id", randomMode(), null, Instant.now().getMillis())); + } + + public void testEmptyResourceLimits() { + assertThrows( + IllegalArgumentException.class, + () -> new QueryGroup("analytics", "_id", randomMode(), Collections.emptyMap(), Instant.now().getMillis()) + ); + } + + public void testIllegalQueryGroupMode() { + assertThrows( + NullPointerException.class, + () -> new QueryGroup("analytics", "_id", null, Map.of("jvm", (Object) 0.4), Instant.now().getMillis()) + ); + } + + public void testInvalidResourceLimitWhenInvalidSystemResourceNameIsGiven() { + assertThrows( + IllegalArgumentException.class, + () -> new QueryGroup( + "analytics", + "_id", + randomMode(), + Map.of("RequestRate", (Object) randomDoubleBetween(0.01, 0.8, false)), + Instant.now().getMillis() + ) + ); + } + + public void testInvalidResourceLimitWhenInvalidSystemResourceValueIsGiven() { + assertThrows( + IllegalArgumentException.class, + () -> new QueryGroup( + "analytics", + "_id", + randomMode(), + Map.of("RequestRate", (Object) randomDoubleBetween(1.1, 1.8, false)), + Instant.now().getMillis() + ) + ); + } + + public void testValidQueryGroup() { + QueryGroup queryGroup = new QueryGroup( + "analytics", + "_id", + randomMode(), + Map.of("jvm", randomDoubleBetween(0.01, 0.8, false)), + Instant.ofEpochMilli(1717187289).getMillis() + ); + + assertNotNull(queryGroup.getName()); + assertEquals("analytics", queryGroup.getName()); + assertNotNull(queryGroup.getResourceLimits()); + assertFalse(queryGroup.getResourceLimits().isEmpty()); + assertEquals(1, queryGroup.getResourceLimits().size()); + assertTrue(allowedModes.contains(queryGroup.getMode())); + assertEquals(1717187289, queryGroup.getUpdatedAtInMillis()); + } +}