From 31256f3c584fbd21c99fe589cb485749c83ad775 Mon Sep 17 00:00:00 2001 From: Neetika Singhal Date: Thu, 16 May 2024 16:12:31 -0700 Subject: [PATCH] Add rest, transport layer changes for Hot to warm tiering - dedicated setup Signed-off-by: Neetika Singhal --- CHANGELOG.md | 1 + .../org/opensearch/action/ActionModule.java | 9 + .../tiering/HotToWarmTieringAction.java | 28 ++ .../tiering/HotToWarmTieringResponse.java | 157 +++++++++ .../tiering/RestWarmTieringAction.java | 61 ++++ .../indices/tiering/TieringIndexRequest.java | 195 +++++++++++ .../tiering/TieringValidationResult.java | 83 +++++ .../TransportHotToWarmTieringAction.java | 110 ++++++ .../admin/indices/tiering/package-info.java | 36 ++ .../common/settings/IndexScopedSettings.java | 2 +- .../org/opensearch/index/IndexModule.java | 20 ++ .../tiering/TieringRequestValidator.java | 277 +++++++++++++++ .../indices/tiering/package-info.java | 36 ++ .../HotToWarmTieringResponseTests.java | 101 ++++++ .../tiering/TieringIndexRequestTests.java | 79 +++++ .../TransportHotToWarmTieringActionTests.java | 118 +++++++ .../tiering/TieringRequestValidatorTests.java | 318 ++++++++++++++++++ 17 files changed, 1630 insertions(+), 1 deletion(-) create mode 100644 server/src/main/java/org/opensearch/action/admin/indices/tiering/HotToWarmTieringAction.java create mode 100644 server/src/main/java/org/opensearch/action/admin/indices/tiering/HotToWarmTieringResponse.java create mode 100644 server/src/main/java/org/opensearch/action/admin/indices/tiering/RestWarmTieringAction.java create mode 100644 server/src/main/java/org/opensearch/action/admin/indices/tiering/TieringIndexRequest.java create mode 100644 server/src/main/java/org/opensearch/action/admin/indices/tiering/TieringValidationResult.java create mode 100644 server/src/main/java/org/opensearch/action/admin/indices/tiering/TransportHotToWarmTieringAction.java create mode 100644 server/src/main/java/org/opensearch/action/admin/indices/tiering/package-info.java create mode 100644 server/src/main/java/org/opensearch/indices/tiering/TieringRequestValidator.java create mode 100644 server/src/main/java/org/opensearch/indices/tiering/package-info.java create mode 100644 server/src/test/java/org/opensearch/action/admin/indices/tiering/HotToWarmTieringResponseTests.java create mode 100644 server/src/test/java/org/opensearch/action/admin/indices/tiering/TieringIndexRequestTests.java create mode 100644 server/src/test/java/org/opensearch/action/admin/indices/tiering/TransportHotToWarmTieringActionTests.java create mode 100644 server/src/test/java/org/opensearch/indices/tiering/TieringRequestValidatorTests.java diff --git a/CHANGELOG.md b/CHANGELOG.md index 0931ff63c145b..ed5ae39881e13 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -27,6 +27,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Reduce logging in DEBUG for MasterService:run ([#14795](https://github.com/opensearch-project/OpenSearch/pull/14795)) - Enabling term version check on local state for all ClusterManager Read Transport Actions ([#14273](https://github.com/opensearch-project/OpenSearch/pull/14273)) - Add persian_stem filter (([#14847](https://github.com/opensearch-project/OpenSearch/pull/14847))) +- Add rest, transport layer changes for hot to warm tiering - dedicated setup (([#13980](https://github.com/opensearch-project/OpenSearch/pull/13980)) ### Dependencies - Bump `org.gradle.test-retry` from 1.5.8 to 1.5.9 ([#13442](https://github.com/opensearch-project/OpenSearch/pull/13442)) diff --git a/server/src/main/java/org/opensearch/action/ActionModule.java b/server/src/main/java/org/opensearch/action/ActionModule.java index 16c15f553951c..574b7029a6501 100644 --- a/server/src/main/java/org/opensearch/action/ActionModule.java +++ b/server/src/main/java/org/opensearch/action/ActionModule.java @@ -216,6 +216,9 @@ import org.opensearch.action.admin.indices.template.put.TransportPutComponentTemplateAction; import org.opensearch.action.admin.indices.template.put.TransportPutComposableIndexTemplateAction; import org.opensearch.action.admin.indices.template.put.TransportPutIndexTemplateAction; +import org.opensearch.action.admin.indices.tiering.HotToWarmTieringAction; +import org.opensearch.action.admin.indices.tiering.RestWarmTieringAction; +import org.opensearch.action.admin.indices.tiering.TransportHotToWarmTieringAction; import org.opensearch.action.admin.indices.upgrade.get.TransportUpgradeStatusAction; import org.opensearch.action.admin.indices.upgrade.get.UpgradeStatusAction; import org.opensearch.action.admin.indices.upgrade.post.TransportUpgradeAction; @@ -634,6 +637,9 @@ public void reg actions.register(CreateSnapshotAction.INSTANCE, TransportCreateSnapshotAction.class); actions.register(CloneSnapshotAction.INSTANCE, TransportCloneSnapshotAction.class); actions.register(RestoreSnapshotAction.INSTANCE, TransportRestoreSnapshotAction.class); + if (FeatureFlags.isEnabled(FeatureFlags.TIERED_REMOTE_INDEX)) { + actions.register(HotToWarmTieringAction.INSTANCE, TransportHotToWarmTieringAction.class); + } actions.register(SnapshotsStatusAction.INSTANCE, TransportSnapshotsStatusAction.class); actions.register(ClusterAddWeightedRoutingAction.INSTANCE, TransportAddWeightedRoutingAction.class); @@ -966,6 +972,9 @@ public void initRestHandlers(Supplier nodesInCluster) { registerHandler.accept(new RestNodeAttrsAction()); registerHandler.accept(new RestRepositoriesAction()); registerHandler.accept(new RestSnapshotAction()); + if (FeatureFlags.isEnabled(FeatureFlags.TIERED_REMOTE_INDEX)) { + registerHandler.accept(new RestWarmTieringAction()); + } registerHandler.accept(new RestTemplatesAction()); // Point in time API diff --git a/server/src/main/java/org/opensearch/action/admin/indices/tiering/HotToWarmTieringAction.java b/server/src/main/java/org/opensearch/action/admin/indices/tiering/HotToWarmTieringAction.java new file mode 100644 index 0000000000000..ae34a9a734221 --- /dev/null +++ b/server/src/main/java/org/opensearch/action/admin/indices/tiering/HotToWarmTieringAction.java @@ -0,0 +1,28 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.admin.indices.tiering; + +import org.opensearch.action.ActionType; +import org.opensearch.common.annotation.ExperimentalApi; + +/** + * Tiering action to move indices from hot to warm + * + * @opensearch.experimental + */ +@ExperimentalApi +public class HotToWarmTieringAction extends ActionType { + + public static final HotToWarmTieringAction INSTANCE = new HotToWarmTieringAction(); + public static final String NAME = "indices:admin/tier/hot_to_warm"; + + private HotToWarmTieringAction() { + super(NAME, HotToWarmTieringResponse::new); + } +} diff --git a/server/src/main/java/org/opensearch/action/admin/indices/tiering/HotToWarmTieringResponse.java b/server/src/main/java/org/opensearch/action/admin/indices/tiering/HotToWarmTieringResponse.java new file mode 100644 index 0000000000000..275decf7a8ea5 --- /dev/null +++ b/server/src/main/java/org/opensearch/action/admin/indices/tiering/HotToWarmTieringResponse.java @@ -0,0 +1,157 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.admin.indices.tiering; + +import org.opensearch.action.support.master.AcknowledgedResponse; +import org.opensearch.common.annotation.ExperimentalApi; +import org.opensearch.core.common.Strings; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.common.io.stream.Writeable; +import org.opensearch.core.xcontent.MediaTypeRegistry; +import org.opensearch.core.xcontent.ToXContentFragment; +import org.opensearch.core.xcontent.XContentBuilder; + +import java.io.IOException; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.Objects; +import java.util.stream.Collectors; + +/** + * Response object for an {@link TieringIndexRequest} which is sent to client after the initial verification of the request + * by the backend service. The format of the response object will be as below: + * + * { + * "acknowledged": true/false, + * "failed_indices": [ + * { + * "index": "index1", + * "error": "Low disk threshold watermark breached" + * }, + * { + * "index": "index2", + * "error": "Index is not a remote store backed index" + * } + * ] + * } + * + * @opensearch.experimental + */ +@ExperimentalApi +public class HotToWarmTieringResponse extends AcknowledgedResponse { + + private final List failedIndices; + + public HotToWarmTieringResponse(boolean acknowledged) { + super(acknowledged); + this.failedIndices = Collections.emptyList(); + } + + public HotToWarmTieringResponse(boolean acknowledged, List indicesResults) { + super(acknowledged); + this.failedIndices = (indicesResults == null) + ? Collections.emptyList() + : indicesResults.stream().sorted(Comparator.comparing(IndexResult::getIndex)).collect(Collectors.toList()); + } + + public HotToWarmTieringResponse(StreamInput in) throws IOException { + super(in); + failedIndices = Collections.unmodifiableList(in.readList(IndexResult::new)); + } + + public List getFailedIndices() { + return this.failedIndices; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeList(this.failedIndices); + } + + @Override + protected void addCustomFields(XContentBuilder builder, Params params) throws IOException { + super.addCustomFields(builder, params); + builder.startArray("failed_indices"); + + for (IndexResult failedIndex : failedIndices) { + failedIndex.toXContent(builder, params); + } + builder.endArray(); + } + + @Override + public String toString() { + return Strings.toString(MediaTypeRegistry.JSON, this); + } + + /** + * Inner class to represent the result of a failed index for tiering. + * @opensearch.experimental + */ + @ExperimentalApi + public static class IndexResult implements Writeable, ToXContentFragment { + private final String index; + private final String failureReason; + + public IndexResult(String index, String failureReason) { + this.index = index; + this.failureReason = failureReason; + } + + IndexResult(StreamInput in) throws IOException { + this.index = in.readString(); + this.failureReason = in.readString(); + } + + public String getIndex() { + return index; + } + + public String getFailureReason() { + return failureReason; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeString(index); + out.writeString(failureReason); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.field("index", index); + builder.field("error", failureReason); + return builder.endObject(); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + IndexResult that = (IndexResult) o; + return Objects.equals(index, that.index) && Objects.equals(failureReason, that.failureReason); + } + + @Override + public int hashCode() { + int result = Objects.hashCode(index); + result = 31 * result + Objects.hashCode(failureReason); + return result; + } + + @Override + public String toString() { + return Strings.toString(MediaTypeRegistry.JSON, this); + } + } +} diff --git a/server/src/main/java/org/opensearch/action/admin/indices/tiering/RestWarmTieringAction.java b/server/src/main/java/org/opensearch/action/admin/indices/tiering/RestWarmTieringAction.java new file mode 100644 index 0000000000000..6f2eceafa9e77 --- /dev/null +++ b/server/src/main/java/org/opensearch/action/admin/indices/tiering/RestWarmTieringAction.java @@ -0,0 +1,61 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.admin.indices.tiering; + +import org.opensearch.action.support.IndicesOptions; +import org.opensearch.client.node.NodeClient; +import org.opensearch.common.annotation.ExperimentalApi; +import org.opensearch.rest.BaseRestHandler; +import org.opensearch.rest.RestHandler; +import org.opensearch.rest.RestRequest; +import org.opensearch.rest.action.RestToXContentListener; + +import java.util.List; + +import static java.util.Collections.singletonList; +import static org.opensearch.core.common.Strings.splitStringByCommaToArray; +import static org.opensearch.rest.RestRequest.Method.POST; + +/** + * Rest Tiering API action to move indices to warm tier + * + * @opensearch.experimental + */ +@ExperimentalApi +public class RestWarmTieringAction extends BaseRestHandler { + + private static final String TARGET_TIER = "warm"; + + @Override + public List routes() { + return singletonList(new RestHandler.Route(POST, "/{index}/_tier/" + TARGET_TIER)); + } + + @Override + public String getName() { + return "warm_tiering_action"; + } + + @Override + protected BaseRestHandler.RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) { + final TieringIndexRequest tieringIndexRequest = new TieringIndexRequest( + TARGET_TIER, + splitStringByCommaToArray(request.param("index")) + ); + tieringIndexRequest.timeout(request.paramAsTime("timeout", tieringIndexRequest.timeout())); + tieringIndexRequest.clusterManagerNodeTimeout( + request.paramAsTime("cluster_manager_timeout", tieringIndexRequest.clusterManagerNodeTimeout()) + ); + tieringIndexRequest.indicesOptions(IndicesOptions.fromRequest(request, tieringIndexRequest.indicesOptions())); + tieringIndexRequest.waitForCompletion(request.paramAsBoolean("wait_for_completion", tieringIndexRequest.waitForCompletion())); + return channel -> client.admin() + .cluster() + .execute(HotToWarmTieringAction.INSTANCE, tieringIndexRequest, new RestToXContentListener<>(channel)); + } +} diff --git a/server/src/main/java/org/opensearch/action/admin/indices/tiering/TieringIndexRequest.java b/server/src/main/java/org/opensearch/action/admin/indices/tiering/TieringIndexRequest.java new file mode 100644 index 0000000000000..ed458a47ddb7d --- /dev/null +++ b/server/src/main/java/org/opensearch/action/admin/indices/tiering/TieringIndexRequest.java @@ -0,0 +1,195 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.admin.indices.tiering; + +import org.opensearch.action.ActionRequestValidationException; +import org.opensearch.action.IndicesRequest; +import org.opensearch.action.support.IndicesOptions; +import org.opensearch.action.support.master.AcknowledgedRequest; +import org.opensearch.common.annotation.ExperimentalApi; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Locale; +import java.util.Objects; + +import static org.opensearch.action.ValidateActions.addValidationError; + +/** + * Represents the tiering request for indices to move to a different tier + * + * @opensearch.experimental + */ +@ExperimentalApi +public class TieringIndexRequest extends AcknowledgedRequest implements IndicesRequest.Replaceable { + + private String[] indices; + private final Tier targetTier; + private IndicesOptions indicesOptions; + private boolean waitForCompletion; + + public TieringIndexRequest(String targetTier, String... indices) { + this.targetTier = Tier.fromString(targetTier); + this.indices = indices; + this.indicesOptions = IndicesOptions.fromOptions(false, false, true, false); + this.waitForCompletion = false; + } + + public TieringIndexRequest(StreamInput in) throws IOException { + super(in); + indices = in.readStringArray(); + targetTier = Tier.fromString(in.readString()); + indicesOptions = IndicesOptions.readIndicesOptions(in); + waitForCompletion = in.readBoolean(); + } + + // pkg private for testing + TieringIndexRequest(Tier targetTier, IndicesOptions indicesOptions, boolean waitForCompletion, String... indices) { + this.indices = indices; + this.targetTier = targetTier; + this.indicesOptions = indicesOptions; + this.waitForCompletion = waitForCompletion; + } + + @Override + public ActionRequestValidationException validate() { + ActionRequestValidationException validationException = null; + if (indices == null) { + validationException = addValidationError("Mandatory parameter - indices is missing from the request", validationException); + } else { + for (String index : indices) { + if (index == null || index.length() == 0) { + validationException = addValidationError( + String.format(Locale.ROOT, "Specified index in the request [%s] is null or empty", index), + validationException + ); + } + } + } + if (!Tier.WARM.equals(targetTier)) { + validationException = addValidationError("The specified tier is not supported", validationException); + } + return validationException; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeStringArray(indices); + out.writeString(targetTier.value()); + indicesOptions.writeIndicesOptions(out); + out.writeBoolean(waitForCompletion); + } + + @Override + public String[] indices() { + return indices; + } + + @Override + public IndicesOptions indicesOptions() { + return indicesOptions; + } + + @Override + public boolean includeDataStreams() { + return true; + } + + @Override + public TieringIndexRequest indices(String... indices) { + this.indices = indices; + return this; + } + + public TieringIndexRequest indicesOptions(IndicesOptions indicesOptions) { + this.indicesOptions = indicesOptions; + return this; + } + + /** + * If this parameter is set to true the operation will wait for completion of tiering process before returning. + * + * @param waitForCompletion if true the operation will wait for completion + * @return this request + */ + public TieringIndexRequest waitForCompletion(boolean waitForCompletion) { + this.waitForCompletion = waitForCompletion; + return this; + } + + /** + * Returns wait for completion setting + * + * @return true if the operation will wait for completion + */ + public boolean waitForCompletion() { + return waitForCompletion; + } + + public Tier tier() { + return targetTier; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + TieringIndexRequest that = (TieringIndexRequest) o; + return clusterManagerNodeTimeout.equals(that.clusterManagerNodeTimeout) + && timeout.equals(that.timeout) + && Objects.equals(indicesOptions, that.indicesOptions) + && Arrays.equals(indices, that.indices) + && targetTier.equals(that.targetTier) + && waitForCompletion == that.waitForCompletion; + } + + @Override + public int hashCode() { + return Objects.hash(clusterManagerNodeTimeout, timeout, indicesOptions, waitForCompletion, Arrays.hashCode(indices)); + } + + /** + * Represents the supported tiers for an index + * + * @opensearch.experimental + */ + @ExperimentalApi + public enum Tier { + HOT, + WARM; + + public static Tier fromString(String name) { + if (name == null) { + throw new IllegalArgumentException("Tiering type cannot be null"); + } + String upperCase = name.trim().toUpperCase(Locale.ROOT); + switch (upperCase) { + case "HOT": + return HOT; + case "WARM": + return WARM; + default: + throw new IllegalArgumentException( + "Tiering type [" + name + "] is not supported. Supported types are " + HOT + " and " + WARM + ); + } + } + + public String value() { + return name().toLowerCase(Locale.ROOT); + } + } +} diff --git a/server/src/main/java/org/opensearch/action/admin/indices/tiering/TieringValidationResult.java b/server/src/main/java/org/opensearch/action/admin/indices/tiering/TieringValidationResult.java new file mode 100644 index 0000000000000..ccd60daf027ce --- /dev/null +++ b/server/src/main/java/org/opensearch/action/admin/indices/tiering/TieringValidationResult.java @@ -0,0 +1,83 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.admin.indices.tiering; + +import org.opensearch.common.annotation.ExperimentalApi; +import org.opensearch.core.index.Index; + +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +/** + * Validation result for tiering + * + * @opensearch.experimental + */ + +@ExperimentalApi +public class TieringValidationResult { + private final Set acceptedIndices; + private final Map rejectedIndices; + + public TieringValidationResult(Set concreteIndices) { + // by default all the indices are added to the accepted set + this.acceptedIndices = ConcurrentHashMap.newKeySet(); + acceptedIndices.addAll(concreteIndices); + this.rejectedIndices = new HashMap<>(); + } + + public Set getAcceptedIndices() { + return acceptedIndices; + } + + public Map getRejectedIndices() { + return rejectedIndices; + } + + public void addToRejected(Index index, String reason) { + acceptedIndices.remove(index); + rejectedIndices.put(index, reason); + } + + public HotToWarmTieringResponse constructResponse() { + final List indicesResult = new LinkedList<>(); + for (Map.Entry rejectedIndex : rejectedIndices.entrySet()) { + indicesResult.add(new HotToWarmTieringResponse.IndexResult(rejectedIndex.getKey().getName(), rejectedIndex.getValue())); + } + return new HotToWarmTieringResponse(acceptedIndices.size() > 0, indicesResult); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + TieringValidationResult that = (TieringValidationResult) o; + + if (!Objects.equals(acceptedIndices, that.acceptedIndices)) return false; + return Objects.equals(rejectedIndices, that.rejectedIndices); + } + + @Override + public int hashCode() { + int result = acceptedIndices != null ? acceptedIndices.hashCode() : 0; + result = 31 * result + (rejectedIndices != null ? rejectedIndices.hashCode() : 0); + return result; + } + + @Override + public String toString() { + return "TieringValidationResult{" + "acceptedIndices=" + acceptedIndices + ", rejectedIndices=" + rejectedIndices + '}'; + } +} diff --git a/server/src/main/java/org/opensearch/action/admin/indices/tiering/TransportHotToWarmTieringAction.java b/server/src/main/java/org/opensearch/action/admin/indices/tiering/TransportHotToWarmTieringAction.java new file mode 100644 index 0000000000000..8d1ab0bb37cdd --- /dev/null +++ b/server/src/main/java/org/opensearch/action/admin/indices/tiering/TransportHotToWarmTieringAction.java @@ -0,0 +1,110 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.admin.indices.tiering; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.action.support.ActionFilters; +import org.opensearch.action.support.clustermanager.TransportClusterManagerNodeAction; +import org.opensearch.cluster.ClusterInfoService; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.block.ClusterBlockException; +import org.opensearch.cluster.block.ClusterBlockLevel; +import org.opensearch.cluster.metadata.IndexNameExpressionResolver; +import org.opensearch.cluster.routing.allocation.DiskThresholdSettings; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.annotation.ExperimentalApi; +import org.opensearch.common.inject.Inject; +import org.opensearch.common.settings.Settings; +import org.opensearch.core.action.ActionListener; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.index.Index; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.transport.TransportService; + +import java.io.IOException; +import java.util.Set; + +import static org.opensearch.indices.tiering.TieringRequestValidator.validateHotToWarm; + +/** + * Transport Tiering action to move indices from hot to warm + * + * @opensearch.experimental + */ +@ExperimentalApi +public class TransportHotToWarmTieringAction extends TransportClusterManagerNodeAction { + + private static final Logger logger = LogManager.getLogger(TransportHotToWarmTieringAction.class); + private final ClusterInfoService clusterInfoService; + private final DiskThresholdSettings diskThresholdSettings; + + @Inject + public TransportHotToWarmTieringAction( + TransportService transportService, + ClusterService clusterService, + ThreadPool threadPool, + ActionFilters actionFilters, + IndexNameExpressionResolver indexNameExpressionResolver, + ClusterInfoService clusterInfoService, + Settings settings + ) { + super( + HotToWarmTieringAction.NAME, + transportService, + clusterService, + threadPool, + actionFilters, + TieringIndexRequest::new, + indexNameExpressionResolver + ); + this.clusterInfoService = clusterInfoService; + this.diskThresholdSettings = new DiskThresholdSettings(settings, clusterService.getClusterSettings()); + } + + @Override + protected String executor() { + return ThreadPool.Names.SAME; + } + + @Override + protected HotToWarmTieringResponse read(StreamInput in) throws IOException { + return new HotToWarmTieringResponse(in); + } + + @Override + protected ClusterBlockException checkBlock(TieringIndexRequest request, ClusterState state) { + return state.blocks() + .indicesBlockedException(ClusterBlockLevel.METADATA_WRITE, indexNameExpressionResolver.concreteIndexNames(state, request)); + } + + @Override + protected void clusterManagerOperation( + TieringIndexRequest request, + ClusterState state, + ActionListener listener + ) throws Exception { + Index[] concreteIndices = indexNameExpressionResolver.concreteIndices(state, request); + if (concreteIndices == null || concreteIndices.length == 0) { + listener.onResponse(new HotToWarmTieringResponse(true)); + return; + } + final TieringValidationResult tieringValidationResult = validateHotToWarm( + state, + Set.of(concreteIndices), + clusterInfoService.getClusterInfo(), + diskThresholdSettings + ); + + if (tieringValidationResult.getAcceptedIndices().isEmpty()) { + listener.onResponse(tieringValidationResult.constructResponse()); + return; + } + } +} diff --git a/server/src/main/java/org/opensearch/action/admin/indices/tiering/package-info.java b/server/src/main/java/org/opensearch/action/admin/indices/tiering/package-info.java new file mode 100644 index 0000000000000..878e3575a3934 --- /dev/null +++ b/server/src/main/java/org/opensearch/action/admin/indices/tiering/package-info.java @@ -0,0 +1,36 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * Actions that OpenSearch can take to tier the indices + */ +/* + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. + */ + +package org.opensearch.action.admin.indices.tiering; diff --git a/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java b/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java index ca2c4dab6102b..6e7d77d0c00d4 100644 --- a/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java @@ -273,7 +273,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings { */ public static final Map> FEATURE_FLAGGED_INDEX_SETTINGS = Map.of( FeatureFlags.TIERED_REMOTE_INDEX, - List.of(IndexModule.INDEX_STORE_LOCALITY_SETTING) + List.of(IndexModule.INDEX_STORE_LOCALITY_SETTING, IndexModule.INDEX_TIERING_STATE) ); public static final IndexScopedSettings DEFAULT_SCOPED_SETTINGS = new IndexScopedSettings(Settings.EMPTY, BUILT_IN_INDEX_SETTINGS); diff --git a/server/src/main/java/org/opensearch/index/IndexModule.java b/server/src/main/java/org/opensearch/index/IndexModule.java index 09b904394ee09..93ff1b78b1ac5 100644 --- a/server/src/main/java/org/opensearch/index/IndexModule.java +++ b/server/src/main/java/org/opensearch/index/IndexModule.java @@ -48,6 +48,7 @@ import org.opensearch.common.CheckedFunction; import org.opensearch.common.SetOnce; import org.opensearch.common.TriFunction; +import org.opensearch.common.annotation.ExperimentalApi; import org.opensearch.common.annotation.PublicApi; import org.opensearch.common.logging.DeprecationLogger; import org.opensearch.common.settings.Setting; @@ -174,6 +175,14 @@ public final class IndexModule { Property.NodeScope ); + public static final Setting INDEX_TIERING_STATE = new Setting<>( + "index.tiering.state", + TieringState.HOT.name(), + Function.identity(), + Property.IndexScope, + Property.PrivateIndex + ); + /** Which lucene file extensions to load with the mmap directory when using hybridfs store. This settings is ignored if {@link #INDEX_STORE_HYBRID_NIO_EXTENSIONS} is set. * This is an expert setting. * @see Lucene File Extensions. @@ -663,6 +672,17 @@ public static Type defaultStoreType(final boolean allowMmap) { } } + /** + * Represents the tiering state of the index. + */ + @ExperimentalApi + public enum TieringState { + HOT, + HOT_TO_WARM, + WARM, + WARM_TO_HOT; + } + public IndexService newIndexService( IndexService.IndexCreationContext indexCreationContext, NodeEnvironment environment, diff --git a/server/src/main/java/org/opensearch/indices/tiering/TieringRequestValidator.java b/server/src/main/java/org/opensearch/indices/tiering/TieringRequestValidator.java new file mode 100644 index 0000000000000..2de50f4d4295d --- /dev/null +++ b/server/src/main/java/org/opensearch/indices/tiering/TieringRequestValidator.java @@ -0,0 +1,277 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.indices.tiering; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.action.admin.indices.tiering.TieringValidationResult; +import org.opensearch.cluster.ClusterInfo; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.DiskUsage; +import org.opensearch.cluster.health.ClusterHealthStatus; +import org.opensearch.cluster.health.ClusterIndexHealth; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.routing.IndexRoutingTable; +import org.opensearch.cluster.routing.ShardRouting; +import org.opensearch.cluster.routing.allocation.DiskThresholdSettings; +import org.opensearch.core.index.Index; +import org.opensearch.index.IndexModule; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.opensearch.index.IndexModule.INDEX_TIERING_STATE; + +/** + * Validator class to validate the tiering requests of the index + * @opensearch.experimental + */ +public class TieringRequestValidator { + + private static final Logger logger = LogManager.getLogger(TieringRequestValidator.class); + + /** + * Validates the tiering request for indices going from hot to warm tier + * + * @param currentState current cluster state + * @param concreteIndices set of indices to be validated + * @param clusterInfo the current nodes usage info for the cluster + * @param diskThresholdSettings the disk threshold settings of the cluster + * @return result of the validation + */ + public static TieringValidationResult validateHotToWarm( + final ClusterState currentState, + final Set concreteIndices, + final ClusterInfo clusterInfo, + final DiskThresholdSettings diskThresholdSettings + ) { + final String indexNames = concreteIndices.stream().map(Index::getName).collect(Collectors.joining(", ")); + validateSearchNodes(currentState, indexNames); + validateDiskThresholdWaterMarkNotBreached(currentState, clusterInfo, diskThresholdSettings, indexNames); + + final TieringValidationResult tieringValidationResult = new TieringValidationResult(concreteIndices); + + for (Index index : concreteIndices) { + if (!validateHotIndex(currentState, index)) { + tieringValidationResult.addToRejected(index, "index is not in the HOT tier"); + continue; + } + if (!validateRemoteStoreIndex(currentState, index)) { + tieringValidationResult.addToRejected(index, "index is not backed up by the remote store"); + continue; + } + if (!validateOpenIndex(currentState, index)) { + tieringValidationResult.addToRejected(index, "index is closed"); + continue; + } + if (!validateIndexHealth(currentState, index)) { + tieringValidationResult.addToRejected(index, "index is red"); + continue; + } + } + + validateEligibleNodesCapacity(clusterInfo, currentState, tieringValidationResult); + logger.info( + "Successfully accepted indices for tiering are [{}], rejected indices are [{}]", + tieringValidationResult.getAcceptedIndices(), + tieringValidationResult.getRejectedIndices() + ); + + return tieringValidationResult; + } + + /** + * Validates that there are eligible nodes with the search role in the current cluster state. + * (only for the dedicated case - to be removed later) + * + * @param currentState the current cluster state + * @param indexNames the names of the indices being validated + * @throws IllegalArgumentException if there are no eligible search nodes in the cluster + */ + static void validateSearchNodes(final ClusterState currentState, final String indexNames) { + if (getEligibleNodes(currentState).isEmpty()) { + final String errorMsg = "Rejecting tiering request for indices [" + + indexNames + + "] because there are no nodes found with the search role"; + logger.warn(errorMsg); + throw new IllegalArgumentException(errorMsg); + } + } + + /** + * Validates that the specified index has the remote store setting enabled. + * + * @param state the current cluster state + * @param index the index to be validated + * @return true if the remote store setting is enabled for the index, false otherwise + */ + static boolean validateRemoteStoreIndex(final ClusterState state, final Index index) { + return IndexMetadata.INDEX_REMOTE_STORE_ENABLED_SETTING.get(state.metadata().getIndexSafe(index).getSettings()); + } + + /** + * Validates that the specified index is in the "hot" tiering state. + * + * @param state the current cluster state + * @param index the index to be validated + * @return true if the index is in the "hot" tiering state, false otherwise + */ + static boolean validateHotIndex(final ClusterState state, final Index index) { + return IndexModule.TieringState.HOT.name().equals(INDEX_TIERING_STATE.get(state.metadata().getIndexSafe(index).getSettings())); + } + + /** + * Validates the health of the specified index in the current cluster state. + * + * @param currentState the current cluster state + * @param index the index to be validated + * @return true if the index health is not in the "red" state, false otherwise + */ + static boolean validateIndexHealth(final ClusterState currentState, final Index index) { + final IndexRoutingTable indexRoutingTable = currentState.routingTable().index(index); + final IndexMetadata indexMetadata = currentState.metadata().index(index); + final ClusterIndexHealth indexHealth = new ClusterIndexHealth(indexMetadata, indexRoutingTable); + return !ClusterHealthStatus.RED.equals(indexHealth.getStatus()); + } + + /** + * Validates that the specified index is in the open state in the current cluster state. + * + * @param currentState the current cluster state + * @param index the index to be validated + * @return true if the index is in the open state, false otherwise + */ + static boolean validateOpenIndex(final ClusterState currentState, final Index index) { + return currentState.metadata().index(index).getState() == IndexMetadata.State.OPEN; + } + + /** + * Validates that the disk threshold low watermark is not breached on all the eligible nodes in the cluster. + * + * @param currentState the current cluster state + * @param clusterInfo the current nodes usage info for the cluster + * @param diskThresholdSettings the disk threshold settings of the cluster + * @param indexNames the names of the indices being validated + * @throws IllegalArgumentException if the disk threshold low watermark is breached on all eligible nodes + */ + static void validateDiskThresholdWaterMarkNotBreached( + final ClusterState currentState, + final ClusterInfo clusterInfo, + final DiskThresholdSettings diskThresholdSettings, + final String indexNames + ) { + final Map usages = clusterInfo.getNodeLeastAvailableDiskUsages(); + if (usages == null) { + logger.trace("skipping monitor as no disk usage information is available"); + return; + } + final Set nodeIds = getEligibleNodes(currentState).stream().map(DiscoveryNode::getId).collect(Collectors.toSet()); + for (String node : nodeIds) { + final DiskUsage nodeUsage = usages.get(node); + if (nodeUsage != null && nodeUsage.getFreeBytes() > diskThresholdSettings.getFreeBytesThresholdLow().getBytes()) { + return; + } + } + throw new IllegalArgumentException( + "Disk threshold low watermark is breached on all the search nodes, rejecting tiering request for indices: " + indexNames + ); + } + + /** + * Validates the capacity of eligible nodes in the cluster to accommodate the specified indices + * and adds the rejected indices to tieringValidationResult + * + * @param clusterInfo the current nodes usage info for the cluster + * @param currentState the current cluster state + * @param tieringValidationResult contains the indices to validate + */ + static void validateEligibleNodesCapacity( + final ClusterInfo clusterInfo, + final ClusterState currentState, + final TieringValidationResult tieringValidationResult + ) { + + final Set eligibleNodeIds = getEligibleNodes(currentState).stream().map(DiscoveryNode::getId).collect(Collectors.toSet()); + long totalAvailableBytesInWarmTier = getTotalAvailableBytesInWarmTier( + clusterInfo.getNodeLeastAvailableDiskUsages(), + eligibleNodeIds + ); + + Map indexSizes = new HashMap<>(); + for (Index index : tieringValidationResult.getAcceptedIndices()) { + indexSizes.put(index, getIndexPrimaryStoreSize(currentState, clusterInfo, index.getName())); + } + + if (indexSizes.values().stream().mapToLong(Long::longValue).sum() < totalAvailableBytesInWarmTier) { + return; + } + HashMap sortedIndexSizes = indexSizes.entrySet() + .stream() + .sorted(Map.Entry.comparingByValue()) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, (e1, e2) -> e1, HashMap::new)); + + long requestIndexBytes = 0L; + for (Index index : sortedIndexSizes.keySet()) { + requestIndexBytes += sortedIndexSizes.get(index); + if (requestIndexBytes >= totalAvailableBytesInWarmTier) { + tieringValidationResult.addToRejected(index, "insufficient node capacity"); + } + } + } + + /** + * Calculates the total size of the specified index in the cluster. + * Note: This function only accounts for the primary shard size. + * + * @param clusterState the current state of the cluster + * @param clusterInfo the current nodes usage info for the cluster + * @param index the name of the index for which the total size is to be calculated + * @return the total size of the specified index in the cluster + */ + static long getIndexPrimaryStoreSize(ClusterState clusterState, ClusterInfo clusterInfo, String index) { + long totalIndexSize = 0; + List shardRoutings = clusterState.routingTable().allShards(index); + for (ShardRouting shardRouting : shardRoutings) { + if (shardRouting.primary()) { + totalIndexSize += clusterInfo.getShardSize(shardRouting, 0); + } + } + return totalIndexSize; + } + + /** + * Calculates the total available bytes in the warm tier of the cluster. + * + * @param usages the current disk usage of the cluster + * @param nodeIds the set of warm nodes ids in the cluster + * @return the total available bytes in the warm tier + */ + static long getTotalAvailableBytesInWarmTier(final Map usages, final Set nodeIds) { + long totalAvailableBytes = 0; + for (String node : nodeIds) { + totalAvailableBytes += usages.get(node).getFreeBytes(); + } + return totalAvailableBytes; + } + + /** + * Retrieves the set of eligible(search) nodes from the current cluster state. + * + * @param currentState the current cluster state + * @return the set of eligible nodes + */ + static Set getEligibleNodes(final ClusterState currentState) { + final Map nodes = currentState.getNodes().getDataNodes(); + return nodes.values().stream().filter(DiscoveryNode::isSearchNode).collect(Collectors.toSet()); + } +} diff --git a/server/src/main/java/org/opensearch/indices/tiering/package-info.java b/server/src/main/java/org/opensearch/indices/tiering/package-info.java new file mode 100644 index 0000000000000..552f87382ea15 --- /dev/null +++ b/server/src/main/java/org/opensearch/indices/tiering/package-info.java @@ -0,0 +1,36 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * Validator layer checks that OpenSearch can perform to tier the indices + */ +/* + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. + */ + +package org.opensearch.indices.tiering; diff --git a/server/src/test/java/org/opensearch/action/admin/indices/tiering/HotToWarmTieringResponseTests.java b/server/src/test/java/org/opensearch/action/admin/indices/tiering/HotToWarmTieringResponseTests.java new file mode 100644 index 0000000000000..85cabe0fa1491 --- /dev/null +++ b/server/src/test/java/org/opensearch/action/admin/indices/tiering/HotToWarmTieringResponseTests.java @@ -0,0 +1,101 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.admin.indices.tiering; + +import org.opensearch.common.xcontent.XContentFactory; +import org.opensearch.common.xcontent.XContentType; +import org.opensearch.core.common.io.stream.Writeable; +import org.opensearch.core.xcontent.ToXContent; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.test.AbstractWireSerializingTestCase; + +import java.util.LinkedList; +import java.util.List; + +public class HotToWarmTieringResponseTests extends AbstractWireSerializingTestCase { + + @Override + protected Writeable.Reader instanceReader() { + return HotToWarmTieringResponse::new; + } + + @Override + protected HotToWarmTieringResponse createTestInstance() { + return randomHotToWarmTieringResponse(); + } + + @Override + protected void assertEqualInstances(HotToWarmTieringResponse expected, HotToWarmTieringResponse actual) { + assertNotSame(expected, actual); + assertEquals(actual.isAcknowledged(), expected.isAcknowledged()); + + for (int i = 0; i < expected.getFailedIndices().size(); i++) { + HotToWarmTieringResponse.IndexResult expectedIndexResult = expected.getFailedIndices().get(i); + HotToWarmTieringResponse.IndexResult actualIndexResult = actual.getFailedIndices().get(i); + assertNotSame(expectedIndexResult, actualIndexResult); + assertEquals(actualIndexResult.getIndex(), expectedIndexResult.getIndex()); + assertEquals(actualIndexResult.getFailureReason(), expectedIndexResult.getFailureReason()); + } + } + + /** + * Verifies that ToXContent works with any random {@link HotToWarmTieringResponse} object + * @throws Exception - in case of error + */ + public void testToXContentWorksForRandomResponse() throws Exception { + HotToWarmTieringResponse testResponse = randomHotToWarmTieringResponse(); + XContentType xContentType = randomFrom(XContentType.values()); + try (XContentBuilder builder = XContentBuilder.builder(xContentType.xContent())) { + testResponse.toXContent(builder, ToXContent.EMPTY_PARAMS); + } + } + + /** + * Verify the XContent output of the response object + * @throws Exception - in case of error + */ + public void testToXContentOutput() throws Exception { + String[] indices = new String[] { "index2", "index1" }; + String[] errorReasons = new String[] { "reason2", "reason1" }; + List results = new LinkedList<>(); + for (int i = 0; i < indices.length; ++i) { + results.add(new HotToWarmTieringResponse.IndexResult(indices[i], errorReasons[i])); + } + HotToWarmTieringResponse testResponse = new HotToWarmTieringResponse(true, results); + + // generate a corresponding expected xcontent + XContentBuilder content = XContentFactory.jsonBuilder().startObject().field("acknowledged", true).startArray("failed_indices"); + // expected result should be in the sorted order + content.startObject().field("index", "index1").field("error", "reason1").endObject(); + content.startObject().field("index", "index2").field("error", "reason2").endObject(); + content.endArray().endObject(); + assertEquals(content.toString(), testResponse.toString()); + } + + /** + * @return - randomly generated object of type {@link HotToWarmTieringResponse.IndexResult} + */ + private HotToWarmTieringResponse.IndexResult randomIndexResult() { + String indexName = randomAlphaOfLengthBetween(1, 50); + String failureReason = randomAlphaOfLengthBetween(1, 200); + return new HotToWarmTieringResponse.IndexResult(indexName, failureReason); + } + + /** + * @return - randomly generated object of type {@link HotToWarmTieringResponse} + */ + private HotToWarmTieringResponse randomHotToWarmTieringResponse() { + int numIndexResult = randomIntBetween(0, 10); + List indexResults = new LinkedList<>(); + for (int i = 0; i < numIndexResult; ++i) { + indexResults.add(randomIndexResult()); + } + return new HotToWarmTieringResponse(randomBoolean(), indexResults); + } +} diff --git a/server/src/test/java/org/opensearch/action/admin/indices/tiering/TieringIndexRequestTests.java b/server/src/test/java/org/opensearch/action/admin/indices/tiering/TieringIndexRequestTests.java new file mode 100644 index 0000000000000..e33d10268a617 --- /dev/null +++ b/server/src/test/java/org/opensearch/action/admin/indices/tiering/TieringIndexRequestTests.java @@ -0,0 +1,79 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.admin.indices.tiering; + +import org.opensearch.action.ActionRequestValidationException; +import org.opensearch.action.support.IndicesOptions; +import org.opensearch.common.io.stream.BytesStreamOutput; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.test.OpenSearchTestCase; + +import java.io.IOException; + +import static org.hamcrest.CoreMatchers.equalTo; + +public class TieringIndexRequestTests extends OpenSearchTestCase { + + public void testTieringRequestWithListOfIndices() { + TieringIndexRequest request = new TieringIndexRequest( + TieringIndexRequest.Tier.WARM, + IndicesOptions.fromOptions(randomBoolean(), randomBoolean(), randomBoolean(), randomBoolean()), + false, + "foo", + "bar", + "baz" + ); + ActionRequestValidationException validationException = request.validate(); + assertNull(validationException); + } + + public void testTieringRequestWithIndexPattern() { + TieringIndexRequest request = new TieringIndexRequest(TieringIndexRequest.Tier.WARM.name(), "foo-*"); + ActionRequestValidationException validationException = request.validate(); + assertNull(validationException); + } + + public void testTieringRequestWithNullOrEmptyIndices() { + TieringIndexRequest request = new TieringIndexRequest(TieringIndexRequest.Tier.WARM.name(), null, ""); + ActionRequestValidationException validationException = request.validate(); + assertNotNull(validationException); + } + + public void testTieringRequestWithNotSupportedTier() { + TieringIndexRequest request = new TieringIndexRequest(TieringIndexRequest.Tier.HOT.name(), "test"); + ActionRequestValidationException validationException = request.validate(); + assertNotNull(validationException); + } + + public void testTieringTypeFromString() { + expectThrows(IllegalArgumentException.class, () -> TieringIndexRequest.Tier.fromString("tier")); + expectThrows(IllegalArgumentException.class, () -> TieringIndexRequest.Tier.fromString(null)); + } + + public void testSerDeOfTieringRequest() throws IOException { + TieringIndexRequest request = new TieringIndexRequest(TieringIndexRequest.Tier.WARM.name(), "test"); + try (BytesStreamOutput out = new BytesStreamOutput()) { + request.writeTo(out); + try (StreamInput in = out.bytes().streamInput()) { + final TieringIndexRequest deserializedRequest = new TieringIndexRequest(in); + assertEquals(request, deserializedRequest); + } + } + } + + public void testTieringRequestEquals() { + final TieringIndexRequest original = new TieringIndexRequest(TieringIndexRequest.Tier.WARM.name(), "test"); + original.indicesOptions(IndicesOptions.fromOptions(randomBoolean(), randomBoolean(), randomBoolean(), randomBoolean())); + final TieringIndexRequest expected = new TieringIndexRequest(TieringIndexRequest.Tier.WARM.name(), original.indices()); + expected.indicesOptions(original.indicesOptions()); + assertThat(expected, equalTo(original)); + assertThat(expected.indices(), equalTo(original.indices())); + assertThat(expected.indicesOptions(), equalTo(original.indicesOptions())); + } +} diff --git a/server/src/test/java/org/opensearch/action/admin/indices/tiering/TransportHotToWarmTieringActionTests.java b/server/src/test/java/org/opensearch/action/admin/indices/tiering/TransportHotToWarmTieringActionTests.java new file mode 100644 index 0000000000000..10273366af804 --- /dev/null +++ b/server/src/test/java/org/opensearch/action/admin/indices/tiering/TransportHotToWarmTieringActionTests.java @@ -0,0 +1,118 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.admin.indices.tiering; + +import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters; + +import org.opensearch.action.support.IndicesOptions; +import org.opensearch.cluster.ClusterInfoService; +import org.opensearch.cluster.MockInternalClusterInfoService; +import org.opensearch.cluster.block.ClusterBlockException; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.FeatureFlags; +import org.opensearch.core.common.unit.ByteSizeUnit; +import org.opensearch.core.common.unit.ByteSizeValue; +import org.opensearch.index.IndexNotFoundException; +import org.opensearch.index.store.remote.file.CleanerDaemonThreadLeakFilter; +import org.opensearch.monitor.fs.FsInfo; +import org.opensearch.plugins.Plugin; +import org.opensearch.test.OpenSearchIntegTestCase; +import org.junit.After; +import org.junit.Before; + +import java.util.Collection; +import java.util.Collections; + +import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_READ_ONLY_ALLOW_DELETE; +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; + +@ThreadLeakFilters(filters = CleanerDaemonThreadLeakFilter.class) +@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0, supportsDedicatedMasters = false) +public class TransportHotToWarmTieringActionTests extends OpenSearchIntegTestCase { + protected static final String TEST_IDX_1 = "test-idx-1"; + protected static final String TEST_IDX_2 = "idx-2"; + protected static final String TARGET_TIER = "warm"; + private String[] indices; + + @Override + protected Settings featureFlagSettings() { + Settings.Builder featureSettings = Settings.builder(); + featureSettings.put(FeatureFlags.TIERED_REMOTE_INDEX, true); + return featureSettings.build(); + } + + @Override + protected Collection> nodePlugins() { + return Collections.singletonList(MockInternalClusterInfoService.TestPlugin.class); + } + + @Before + public void setup() { + internalCluster().startClusterManagerOnlyNode(); + internalCluster().ensureAtLeastNumSearchAndDataNodes(1); + long bytes = new ByteSizeValue(1000, ByteSizeUnit.KB).getBytes(); + final MockInternalClusterInfoService clusterInfoService = getMockInternalClusterInfoService(); + clusterInfoService.setDiskUsageFunctionAndRefresh((discoveryNode, fsInfoPath) -> setDiskUsage(fsInfoPath, bytes, bytes - 1)); + + final int numReplicasIndex = 0; + final Settings settings = Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, numReplicasIndex) + .build(); + + indices = new String[] { TEST_IDX_1, TEST_IDX_2 }; + for (String index : indices) { + assertAcked(client().admin().indices().prepareCreate(index).setSettings(settings).get()); + ensureGreen(index); + } + } + + @After + public void cleanup() { + client().admin().indices().prepareDelete(indices).get(); + } + + MockInternalClusterInfoService getMockInternalClusterInfoService() { + return (MockInternalClusterInfoService) internalCluster().getCurrentClusterManagerNodeInstance(ClusterInfoService.class); + } + + static FsInfo.Path setDiskUsage(FsInfo.Path original, long totalBytes, long freeBytes) { + return new FsInfo.Path(original.getPath(), original.getMount(), totalBytes, freeBytes, freeBytes); + } + + public void testIndexLevelBlocks() { + enableIndexBlock(TEST_IDX_1, SETTING_READ_ONLY_ALLOW_DELETE); + TieringIndexRequest request = new TieringIndexRequest(TARGET_TIER, TEST_IDX_1); + expectThrows(ClusterBlockException.class, () -> client().execute(HotToWarmTieringAction.INSTANCE, request).actionGet()); + } + + public void testIndexNotFound() { + TieringIndexRequest request = new TieringIndexRequest(TARGET_TIER, "foo"); + expectThrows(IndexNotFoundException.class, () -> client().execute(HotToWarmTieringAction.INSTANCE, request).actionGet()); + } + + public void testNoConcreteIndices() { + TieringIndexRequest request = new TieringIndexRequest(TARGET_TIER, "foo"); + request.indicesOptions(IndicesOptions.fromOptions(true, true, true, false)); + HotToWarmTieringResponse response = client().admin().indices().execute(HotToWarmTieringAction.INSTANCE, request).actionGet(); + assertTrue(response.isAcknowledged()); + assertTrue(response.getFailedIndices().isEmpty()); + } + + public void testNoAcceptedIndices() { + TieringIndexRequest request = new TieringIndexRequest(TARGET_TIER, "test-idx-*", "idx-*"); + HotToWarmTieringResponse response = client().admin().indices().execute(HotToWarmTieringAction.INSTANCE, request).actionGet(); + assertFalse(response.isAcknowledged()); + assertEquals(2, response.getFailedIndices().size()); + for (HotToWarmTieringResponse.IndexResult result : response.getFailedIndices()) { + assertEquals("index is not backed up by the remote store", result.getFailureReason()); + } + } +} diff --git a/server/src/test/java/org/opensearch/indices/tiering/TieringRequestValidatorTests.java b/server/src/test/java/org/opensearch/indices/tiering/TieringRequestValidatorTests.java new file mode 100644 index 0000000000000..6b6f74353812b --- /dev/null +++ b/server/src/test/java/org/opensearch/indices/tiering/TieringRequestValidatorTests.java @@ -0,0 +1,318 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.indices.tiering; + +import org.opensearch.Version; +import org.opensearch.action.admin.indices.tiering.TieringValidationResult; +import org.opensearch.cluster.ClusterInfo; +import org.opensearch.cluster.ClusterName; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.DiskUsage; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.metadata.Metadata; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.node.DiscoveryNodeRole; +import org.opensearch.cluster.node.DiscoveryNodes; +import org.opensearch.cluster.routing.RoutingTable; +import org.opensearch.cluster.routing.allocation.DiskThresholdSettings; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; +import org.opensearch.core.index.Index; +import org.opensearch.index.IndexModule; +import org.opensearch.indices.replication.common.ReplicationType; +import org.opensearch.test.OpenSearchTestCase; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import java.util.UUID; + +import static org.opensearch.cluster.routing.allocation.DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_FLOOD_STAGE_WATERMARK_SETTING; +import static org.opensearch.cluster.routing.allocation.DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING; +import static org.opensearch.cluster.routing.allocation.DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING; +import static org.opensearch.indices.tiering.TieringRequestValidator.getEligibleNodes; +import static org.opensearch.indices.tiering.TieringRequestValidator.getIndexPrimaryStoreSize; +import static org.opensearch.indices.tiering.TieringRequestValidator.getTotalAvailableBytesInWarmTier; +import static org.opensearch.indices.tiering.TieringRequestValidator.validateDiskThresholdWaterMarkNotBreached; +import static org.opensearch.indices.tiering.TieringRequestValidator.validateEligibleNodesCapacity; +import static org.opensearch.indices.tiering.TieringRequestValidator.validateHotIndex; +import static org.opensearch.indices.tiering.TieringRequestValidator.validateIndexHealth; +import static org.opensearch.indices.tiering.TieringRequestValidator.validateOpenIndex; +import static org.opensearch.indices.tiering.TieringRequestValidator.validateRemoteStoreIndex; +import static org.opensearch.indices.tiering.TieringRequestValidator.validateSearchNodes; + +public class TieringRequestValidatorTests extends OpenSearchTestCase { + + public void testValidateSearchNodes() { + ClusterState clusterStateWithSearchNodes = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) + .nodes(createNodes(2, 0, 0)) + .build(); + + // throws no errors + validateSearchNodes(clusterStateWithSearchNodes, "test_index"); + } + + public void testWithNoSearchNodesInCluster() { + ClusterState clusterStateWithNoSearchNodes = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) + .nodes(createNodes(0, 1, 1)) + .build(); + // throws error + IllegalArgumentException e = expectThrows( + IllegalArgumentException.class, + () -> validateSearchNodes(clusterStateWithNoSearchNodes, "test") + ); + } + + public void testValidRemoteStoreIndex() { + String indexUuid = UUID.randomUUID().toString(); + String indexName = "test_index"; + + ClusterState clusterState1 = buildClusterState( + indexName, + indexUuid, + Settings.builder() + .put(IndexMetadata.INDEX_REMOTE_STORE_ENABLED_SETTING.getKey(), true) + .put(IndexMetadata.INDEX_REPLICATION_TYPE_SETTING.getKey(), ReplicationType.SEGMENT) + .build() + ); + + assertTrue(validateRemoteStoreIndex(clusterState1, new Index(indexName, indexUuid))); + } + + public void testDocRepIndex() { + String indexUuid = UUID.randomUUID().toString(); + String indexName = "test_index"; + assertFalse(validateRemoteStoreIndex(buildClusterState(indexName, indexUuid, Settings.EMPTY), new Index(indexName, indexUuid))); + } + + public void testValidHotIndex() { + String indexUuid = UUID.randomUUID().toString(); + String indexName = "test_index"; + assertTrue(validateHotIndex(buildClusterState(indexName, indexUuid, Settings.EMPTY), new Index(indexName, indexUuid))); + } + + public void testIndexWithOngoingOrCompletedTiering() { + String indexUuid = UUID.randomUUID().toString(); + String indexName = "test_index"; + + IndexModule.TieringState tieringState = randomBoolean() ? IndexModule.TieringState.HOT_TO_WARM : IndexModule.TieringState.WARM; + + ClusterState clusterState = buildClusterState( + indexName, + indexUuid, + Settings.builder().put(IndexModule.INDEX_TIERING_STATE.getKey(), tieringState).build() + ); + assertFalse(validateHotIndex(clusterState, new Index(indexName, indexUuid))); + } + + public void testValidateIndexHealth() { + String indexUuid = UUID.randomUUID().toString(); + String indexName = "test_index"; + ClusterState clusterState = buildClusterState(indexName, indexUuid, Settings.EMPTY); + assertTrue(validateIndexHealth(clusterState, new Index(indexName, indexUuid))); + } + + public void testValidOpenIndex() { + String indexUuid = UUID.randomUUID().toString(); + String indexName = "test_index"; + assertTrue(validateOpenIndex(buildClusterState(indexName, indexUuid, Settings.EMPTY), new Index(indexName, indexUuid))); + } + + public void testCloseIndex() { + String indexUuid = UUID.randomUUID().toString(); + String indexName = "test_index"; + assertFalse( + validateOpenIndex( + buildClusterState(indexName, indexUuid, Settings.EMPTY, IndexMetadata.State.CLOSE), + new Index(indexName, indexUuid) + ) + ); + } + + public void testValidateDiskThresholdWaterMarkNotBreached() { + int noOfNodes = 2; + ClusterState clusterState = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) + .nodes(createNodes(noOfNodes, 0, 0)) + .build(); + + ClusterInfo clusterInfo = clusterInfo(noOfNodes, 100, 20); + DiskThresholdSettings diskThresholdSettings = diskThresholdSettings("10b", "10b", "5b"); + // throws no error + validateDiskThresholdWaterMarkNotBreached(clusterState, clusterInfo, diskThresholdSettings, "test"); + } + + public void testValidateDiskThresholdWaterMarkNotBreachedThrowsError() { + int noOfNodes = 2; + ClusterState clusterState = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) + .nodes(createNodes(noOfNodes, 0, 0)) + .build(); + ClusterInfo clusterInfo = clusterInfo(noOfNodes, 100, 5); + DiskThresholdSettings diskThresholdSettings = diskThresholdSettings("10b", "10b", "5b"); + // throws error + expectThrows( + IllegalArgumentException.class, + () -> validateDiskThresholdWaterMarkNotBreached(clusterState, clusterInfo, diskThresholdSettings, "test") + ); + } + + public void testGetTotalIndexSize() { + String indexUuid = UUID.randomUUID().toString(); + String indexName = "test_index"; + ClusterState clusterState = ClusterState.builder(buildClusterState(indexName, indexUuid, Settings.EMPTY)) + .nodes(createNodes(1, 0, 0)) + .build(); + Map diskUsages = diskUsages(1, 100, 50); + final Map shardSizes = new HashMap<>(); + shardSizes.put("[test_index][0][p]", 10L); // 10 bytes + ClusterInfo clusterInfo = new ClusterInfo(diskUsages, null, shardSizes, null, Map.of(), Map.of()); + assertEquals(10, getIndexPrimaryStoreSize(clusterState, clusterInfo, indexName)); + } + + public void testValidateEligibleNodesCapacityWithAllAccepted() { + String indexUuid = UUID.randomUUID().toString(); + String indexName = "test_index"; + Set indices = Set.of(new Index(indexName, indexUuid)); + ClusterState clusterState = ClusterState.builder(buildClusterState(indexName, indexUuid, Settings.EMPTY)) + .nodes(createNodes(1, 0, 0)) + .build(); + Map diskUsages = diskUsages(1, 100, 50); + final Map shardSizes = new HashMap<>(); + shardSizes.put("[test_index][0][p]", 10L); // 10 bytes + ClusterInfo clusterInfo = new ClusterInfo(diskUsages, null, shardSizes, null, Map.of(), Map.of()); + TieringValidationResult tieringValidationResult = new TieringValidationResult(indices); + validateEligibleNodesCapacity(clusterInfo, clusterState, tieringValidationResult); + assertEquals(indices, tieringValidationResult.getAcceptedIndices()); + assertTrue(tieringValidationResult.getRejectedIndices().isEmpty()); + } + + public void testValidateEligibleNodesCapacityWithAllRejected() { + String indexUuid = UUID.randomUUID().toString(); + String indexName = "test_index"; + Set indices = Set.of(new Index(indexName, indexUuid)); + ClusterState clusterState = ClusterState.builder(buildClusterState(indexName, indexUuid, Settings.EMPTY)) + .nodes(createNodes(1, 0, 0)) + .build(); + Map diskUsages = diskUsages(1, 100, 10); + final Map shardSizes = new HashMap<>(); + shardSizes.put("[test_index][0][p]", 20L); // 20 bytes + ClusterInfo clusterInfo = new ClusterInfo(diskUsages, null, shardSizes, null, Map.of(), Map.of()); + TieringValidationResult tieringValidationResult = new TieringValidationResult(indices); + validateEligibleNodesCapacity(clusterInfo, clusterState, tieringValidationResult); + assertEquals(indices.size(), tieringValidationResult.getRejectedIndices().size()); + assertEquals(indices, tieringValidationResult.getRejectedIndices().keySet()); + assertTrue(tieringValidationResult.getAcceptedIndices().isEmpty()); + } + + public void testGetTotalAvailableBytesInWarmTier() { + Map diskUsages = diskUsages(2, 500, 100); + assertEquals(200, getTotalAvailableBytesInWarmTier(diskUsages, Set.of("node-s0", "node-s1"))); + } + + public void testEligibleNodes() { + ClusterState clusterState = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) + .nodes(createNodes(2, 0, 0)) + .build(); + + assertEquals(2, getEligibleNodes(clusterState).size()); + + clusterState = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) + .nodes(createNodes(0, 1, 1)) + .build(); + assertEquals(0, getEligibleNodes(clusterState).size()); + } + + private static ClusterState buildClusterState(String indexName, String indexUuid, Settings settings) { + return buildClusterState(indexName, indexUuid, settings, IndexMetadata.State.OPEN); + } + + private static ClusterState buildClusterState(String indexName, String indexUuid, Settings settings, IndexMetadata.State state) { + Settings combinedSettings = Settings.builder().put(settings).put(createDefaultIndexSettings(indexUuid)).build(); + + Metadata metadata = Metadata.builder().put(IndexMetadata.builder(indexName).settings(combinedSettings).state(state)).build(); + + RoutingTable routingTable = RoutingTable.builder().addAsNew(metadata.index(indexName)).build(); + + return ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) + .metadata(metadata) + .routingTable(routingTable) + .build(); + } + + private static Settings createDefaultIndexSettings(String indexUuid) { + return Settings.builder() + .put("index.version.created", Version.CURRENT) + .put(IndexMetadata.SETTING_INDEX_UUID, indexUuid) + .put(IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 2) + .put(IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 1) + .build(); + } + + private DiscoveryNodes createNodes(int numOfSearchNodes, int numOfDataNodes, int numOfIngestNodes) { + DiscoveryNodes.Builder discoveryNodesBuilder = DiscoveryNodes.builder(); + for (int i = 0; i < numOfSearchNodes; i++) { + discoveryNodesBuilder.add( + new DiscoveryNode( + "node-s" + i, + buildNewFakeTransportAddress(), + Collections.emptyMap(), + Collections.singleton(DiscoveryNodeRole.SEARCH_ROLE), + Version.CURRENT + ) + ); + } + for (int i = 0; i < numOfDataNodes; i++) { + discoveryNodesBuilder.add( + new DiscoveryNode( + "node-d" + i, + buildNewFakeTransportAddress(), + Collections.emptyMap(), + Collections.singleton(DiscoveryNodeRole.DATA_ROLE), + Version.CURRENT + ) + ); + } + for (int i = 0; i < numOfIngestNodes; i++) { + discoveryNodesBuilder.add( + new DiscoveryNode( + "node-i" + i, + buildNewFakeTransportAddress(), + Collections.emptyMap(), + Collections.singleton(DiscoveryNodeRole.INGEST_ROLE), + Version.CURRENT + ) + ); + } + return discoveryNodesBuilder.build(); + } + + private static DiskThresholdSettings diskThresholdSettings(String low, String high, String flood) { + return new DiskThresholdSettings( + Settings.builder() + .put(CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey(), low) + .put(CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.getKey(), high) + .put(CLUSTER_ROUTING_ALLOCATION_DISK_FLOOD_STAGE_WATERMARK_SETTING.getKey(), flood) + .build(), + new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS) + ); + } + + private static ClusterInfo clusterInfo(int noOfNodes, long totalBytes, long freeBytes) { + final Map diskUsages = diskUsages(noOfNodes, totalBytes, freeBytes); + return new ClusterInfo(diskUsages, null, null, null, Map.of(), Map.of()); + } + + private static Map diskUsages(int noOfSearchNodes, long totalBytes, long freeBytes) { + final Map diskUsages = new HashMap<>(); + for (int i = 0; i < noOfSearchNodes; i++) { + diskUsages.put("node-s" + i, new DiskUsage("node-s" + i, "node-s" + i, "/foo/bar", totalBytes, freeBytes)); + } + return diskUsages; + } +}