diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/tiering/HotToWarmTieringServiceIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/tiering/HotToWarmTieringServiceIT.java new file mode 100644 index 0000000000000..09b6fe7a22ec9 --- /dev/null +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/tiering/HotToWarmTieringServiceIT.java @@ -0,0 +1,150 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.remotestore.tiering; + +import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters; + +import org.opensearch.action.admin.indices.get.GetIndexResponse; +import org.opensearch.action.admin.indices.tiering.HotToWarmTieringAction; +import org.opensearch.action.admin.indices.tiering.HotToWarmTieringResponse; +import org.opensearch.action.admin.indices.tiering.TieringIndexRequest; +import org.opensearch.action.search.SearchResponse; +import org.opensearch.cluster.ClusterInfoService; +import org.opensearch.cluster.MockInternalClusterInfoService; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.FeatureFlags; +import org.opensearch.core.common.unit.ByteSizeUnit; +import org.opensearch.core.common.unit.ByteSizeValue; +import org.opensearch.index.IndexModule; +import org.opensearch.index.query.QueryBuilders; +import org.opensearch.index.store.remote.file.CleanerDaemonThreadLeakFilter; +import org.opensearch.monitor.fs.FsInfo; +import org.opensearch.plugins.Plugin; +import org.opensearch.remotestore.RemoteStoreBaseIntegTestCase; +import org.opensearch.test.OpenSearchIntegTestCase; +import org.junit.Before; + +import java.util.Collection; +import java.util.Collections; +import java.util.Map; + +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.notNullValue; + +@ThreadLeakFilters(filters = CleanerDaemonThreadLeakFilter.class) +@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0, supportsDedicatedMasters = false) +// Uncomment the below line to enable trace level logs for this test for better debugging +// @TestLogging(reason = "Getting trace logs from tiering package", value = +// "org.opensearch.tiering:TRACE,org.opensearch.cluster.routing.allocation.decider:TRACE") +public class HotToWarmTieringServiceIT extends RemoteStoreBaseIntegTestCase { + + protected static final String TEST_IDX_1 = "test-idx-1"; + protected static final String TEST_IDX_2 = "test-idx-2"; + protected static final String TARGET_TIER = "warm"; + protected static final int NUM_DOCS_IN_BULK = 10; + private static final long TOTAL_SPACE_BYTES = new ByteSizeValue(1000, ByteSizeUnit.KB).getBytes(); + + /* + Disabling the MockFSIndexStore plugin, as the MockFSDirectoryFactory wraps the FSDirectory in an OpenSearchMockDirectoryWrapper, which extends FilterDirectory (whereas FSDirectory extends BaseDirectory). + As a result of this wrapping, the local directory of a Composite Directory does not satisfy the assertion that the local directory must be of type FSDirectory. + */ + @Override + protected boolean addMockIndexStorePlugin() { + return false; + } + + @Override + protected 
Collection<Class<? extends Plugin>> nodePlugins() { + return Collections.singletonList(MockInternalClusterInfoService.TestPlugin.class); + } + + @Override + protected Settings featureFlagSettings() { + Settings.Builder featureSettings = Settings.builder(); + featureSettings.put(FeatureFlags.TIERED_REMOTE_INDEX, true); + return featureSettings.build(); + } + + @Before + public void setup() { + internalCluster().startClusterManagerOnlyNode(); + } + + public void testTieringBasic() { + final int numReplicasIndex = 0; + internalCluster().ensureAtLeastNumDataNodes(1); + final Settings settings = Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, numReplicasIndex) + .put(IndexModule.INDEX_STORE_LOCALITY_SETTING.getKey(), IndexModule.DataLocalityType.FULL.name()) + .build(); + + String[] indices = new String[] { TEST_IDX_1, TEST_IDX_2 }; + for (String index : indices) { + assertAcked(client().admin().indices().prepareCreate(index).setSettings(settings).get()); + ensureGreen(index); + // Ingesting some docs + indexBulk(index, NUM_DOCS_IN_BULK); + flushAndRefresh(index); + // ensuring cluster is green after flush and refresh + ensureGreen(); + SearchResponse searchResponse = client().prepareSearch(index).setQuery(QueryBuilders.matchAllQuery()).get(); + // Asserting that search returns same number of docs as ingested + assertHitCount(searchResponse, NUM_DOCS_IN_BULK); + } + + // Spin up node having search role + internalCluster().ensureAtLeastNumSearchAndDataNodes(1); + + final MockInternalClusterInfoService clusterInfoService = getMockInternalClusterInfoService(); + clusterInfoService.setDiskUsageFunctionAndRefresh( + (discoveryNode, fsInfoPath) -> setDiskUsage(fsInfoPath, TOTAL_SPACE_BYTES, TOTAL_SPACE_BYTES) + ); + + TieringIndexRequest request = new TieringIndexRequest(TARGET_TIER, indices); + request.waitForCompletion(true); + HotToWarmTieringResponse response = client().admin().indices().execute(HotToWarmTieringAction.INSTANCE, request).actionGet(); + assertAcked(response); + assertTrue(response.getFailedIndices().isEmpty()); + assertTrue(response.isAcknowledged()); + ensureGreen(); + for (String index : indices) { + SearchResponse searchResponse = client().prepareSearch(index).setQuery(QueryBuilders.matchAllQuery()).get(); + // Asserting that search returns same number of docs as ingested + assertHitCount(searchResponse, NUM_DOCS_IN_BULK); + GetIndexResponse getIndexResponse = client().admin().indices().prepareGetIndex().addIndices(index).get(); + assertWarmSettings(getIndexResponse, index); + assertAcked(client().admin().indices().prepareDelete(index).get()); + } + } + + private MockInternalClusterInfoService getMockInternalClusterInfoService() { + return (MockInternalClusterInfoService) internalCluster().getCurrentClusterManagerNodeInstance(ClusterInfoService.class); + } + + private static FsInfo.Path setDiskUsage(FsInfo.Path original, long totalBytes, long freeBytes) { + return new FsInfo.Path(original.getPath(), original.getMount(), totalBytes, freeBytes, freeBytes); + } + + private void assertWarmSettings(GetIndexResponse response, String indexName) { + final Map<String, Settings> settings = response.settings(); + assertThat(settings, notNullValue()); + assertThat(settings.size(), equalTo(1)); + Settings indexSettings = settings.get(indexName); + assertThat(indexSettings, notNullValue()); + assertThat( + indexSettings.get(IndexModule.INDEX_STORE_LOCALITY_SETTING.getKey()), + equalTo(IndexModule.DataLocalityType.PARTIAL.name()) + ); 
assertThat(indexSettings.get(IndexModule.INDEX_TIERING_STATE.getKey()), equalTo(IndexModule.TieringState.WARM.name())); + } +}
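The context class added in the next file is driven entirely by its three index collections: a request is complete exactly when the in-progress set drains. A minimal lifecycle sketch, assuming `request` and `listener` come from the transport action below; the index values are hypothetical:

```java
import org.opensearch.core.index.Index;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Set;

Index indexA = new Index("logs-1", "uuid-a"); // hypothetical indices
Index indexB = new Index("logs-2", "uuid-b");
Set<Index> accepted = new HashSet<>(Set.of(indexA, indexB));
TieringRequestContext context = new TieringRequestContext(request, listener, accepted, new HashMap<>());

context.addToSuccessful(indexA);               // drains indexA from the in-progress set
context.addToFailed(indexB, "example reason"); // records indexB with its failure reason
assert context.isRequestProcessingComplete();  // true: the in-progress set is now empty

// acknowledged == true because at least one index tiered successfully;
// the failed index and its reason are carried as an IndexResult
HotToWarmTieringResponse response = context.constructHotToWarmTieringResponse();
```

diff --git 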
a/server/src/main/java/org/opensearch/action/admin/indices/tiering/TransportHotToWarmTieringAction.java b/server/src/main/java/org/opensearch/action/admin/indices/tiering/TransportHotToWarmTieringAction.java index 628d4bcfe3383..ef5c1291f234f 100644 --- a/server/src/main/java/org/opensearch/action/admin/indices/tiering/TransportHotToWarmTieringAction.java +++ b/server/src/main/java/org/opensearch/action/admin/indices/tiering/TransportHotToWarmTieringAction.java @@ -25,6 +25,7 @@ import org.opensearch.core.action.ActionListener; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.index.Index; +import org.opensearch.indices.tiering.HotToWarmTieringService; import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.TransportService; @@ -45,6 +46,8 @@ public class TransportHotToWarmTieringAction extends TransportClusterManagerNode private final ClusterInfoService clusterInfoService; private final DiskThresholdSettings diskThresholdSettings; + private final HotToWarmTieringService hotToWarmTieringService; + @Inject public TransportHotToWarmTieringAction( TransportService transportService, @@ -53,7 +56,8 @@ public TransportHotToWarmTieringAction( ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, ClusterInfoService clusterInfoService, - Settings settings + Settings settings, + HotToWarmTieringService hotToWarmTieringService ) { super( HotToWarmTieringAction.NAME, @@ -66,6 +70,7 @@ public TransportHotToWarmTieringAction( ); this.clusterInfoService = clusterInfoService; this.diskThresholdSettings = new DiskThresholdSettings(settings, clusterService.getClusterSettings()); + this.hotToWarmTieringService = hotToWarmTieringService; } @Override @@ -106,5 +111,12 @@ protected void clusterManagerOperation( listener.onResponse(tieringValidationResult.constructResponse()); return; } + TieringRequestContext tieringRequestContext = new TieringRequestContext( + request, + listener, + tieringValidationResult.getAcceptedIndices(), + tieringValidationResult.getRejectedIndices() + ); + hotToWarmTieringService.tier(tieringRequestContext, listener); } } diff --git a/server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java b/server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java index 9e7fe23f29872..195021252135f 100644 --- a/server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java +++ b/server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java @@ -637,6 +637,7 @@ public static APIBlock readFrom(StreamInput input) throws IOException { static final String KEY_SYSTEM = "system"; public static final String KEY_PRIMARY_TERMS = "primary_terms"; public static final String REMOTE_STORE_CUSTOM_KEY = "remote_store"; + public static final String TIERING_CUSTOM_KEY = "tiering"; public static final String TRANSLOG_METADATA_KEY = "translog_metadata"; public static final String INDEX_STATE_FILE_PREFIX = "state-"; diff --git a/server/src/main/java/org/opensearch/cluster/routing/RoutingPool.java b/server/src/main/java/org/opensearch/cluster/routing/RoutingPool.java index db10ad61c7d6d..c57b2ee33ec92 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/RoutingPool.java +++ b/server/src/main/java/org/opensearch/cluster/routing/RoutingPool.java @@ -11,6 +11,8 @@ import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.routing.allocation.RoutingAllocation; +import org.opensearch.common.util.FeatureFlags; 
+import org.opensearch.index.IndexModule; /** * {@link RoutingPool} defines the different node types based on the assigned capabilities. The methods @@ -58,6 +60,11 @@ public static RoutingPool getShardPool(ShardRouting shard, RoutingAllocation all * @return {@link RoutingPool} for the given index. */ public static RoutingPool getIndexPool(IndexMetadata indexMetadata) { - return indexMetadata.isRemoteSnapshot() ? REMOTE_CAPABLE : LOCAL_ONLY; + return indexMetadata.isRemoteSnapshot() + || (FeatureFlags.isEnabled(FeatureFlags.TIERED_REMOTE_INDEX) + && IndexModule.DataLocalityType.PARTIAL.name() + .equals(indexMetadata.getSettings().get(IndexModule.INDEX_STORE_LOCALITY_SETTING.getKey()))) + ? REMOTE_CAPABLE + : LOCAL_ONLY; } } diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java index b2443490dd973..80e72485960f9 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java @@ -54,6 +54,7 @@ import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Setting.Property; import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.FeatureFlags; import java.util.HashMap; import java.util.HashSet; @@ -284,6 +285,9 @@ public void allocate(RoutingAllocation allocation) { preferPrimaryShardBalance, preferPrimaryShardRebalance ); + if (FeatureFlags.isEnabled(FeatureFlags.TIERED_REMOTE_INDEX)) { + localShardsBalancer.tierShards(); + } localShardsBalancer.allocateUnassigned(); localShardsBalancer.moveShards(); localShardsBalancer.balance(); diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/LocalShardsBalancer.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/LocalShardsBalancer.java index 00eb79add9f1d..fb5fdbafd7f62 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/LocalShardsBalancer.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/LocalShardsBalancer.java @@ -28,9 +28,12 @@ import org.opensearch.cluster.routing.allocation.decider.AllocationDeciders; import org.opensearch.cluster.routing.allocation.decider.Decision; import org.opensearch.cluster.routing.allocation.decider.DiskThresholdDecider; +import org.opensearch.common.Randomness; import org.opensearch.common.collect.Tuple; import org.opensearch.gateway.PriorityComparator; +import org.opensearch.index.IndexModule; +import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; @@ -39,6 +42,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Queue; import java.util.Set; import java.util.stream.Stream; import java.util.stream.StreamSupport; @@ -527,6 +531,133 @@ private void checkAndAddInEligibleTargetNode(RoutingNode targetNode) { } } + /** + * Triggers shard relocation for the shards that are submitted + * for tiering to go from local node pool to remote capable node + * pool. 
+ */ + public void tierShards() { + final Set<String> indicesPendingTiering = new HashSet<>(); + allocation.routingTable().indicesRouting().keySet().stream().forEach(index -> { + IndexMetadata indexMetadata = allocation.metadata().index(index); + if (IndexModule.TieringState.HOT_TO_WARM.name() + .equals(indexMetadata.getSettings().get(IndexModule.INDEX_TIERING_STATE.getKey()))) { + indicesPendingTiering.add(index); + } + }); + if (indicesPendingTiering.isEmpty()) { + logger.debug("No indices found eligible for hot to warm tiering."); + return; + } + + logger.debug("[HotToWarmTiering] Indices pending tiering are [{}]", indicesPendingTiering); + final List<ShardRouting> shardsPendingTiering = new ArrayList<>(); + for (String index : indicesPendingTiering) { + final List<ShardRouting> indexShards = allocation.routingTable().index(index).shardsWithState(ShardRoutingState.STARTED); + logger.debug("[HotToWarmTiering] Found shards {} in STARTED state for index [{}] for tiering", indexShards, index); + for (ShardRouting shard : indexShards) { + RoutingPool targetPool = RoutingPool.getShardPool(shard, allocation); + RoutingPool currentNodePool = RoutingPool.getNodePool(allocation.routingNodes().node(shard.currentNodeId())); + if (targetPool != currentNodePool) { + logger.trace( + "[HotToWarmTiering] Found shard [{}] with target pool [{}] on node pool [{}]. Adding to tiering list.", + shard.toString(), + targetPool.toString(), + currentNodePool.toString() + ); + shardsPendingTiering.add(shard); + } + } + } + + if (shardsPendingTiering.isEmpty()) { + logger.debug("No shards found eligible for hot to warm tiering."); + return; + } + + final List<RoutingNode> remoteRoutingNodes = new ArrayList<>(); + routingNodes.stream().forEach(node -> { + RoutingPool pool = RoutingPool.getNodePool(node); + if (pool == RoutingPool.REMOTE_CAPABLE) { + remoteRoutingNodes.add(node); + } + }); + + if (remoteRoutingNodes.isEmpty()) { + logger.warn("No nodes available in the remote capable pool for hot to warm tiering"); + return; + } + + Randomness.shuffle(remoteRoutingNodes); + Queue<RoutingNode> nodeQueue = new ArrayDeque<>(remoteRoutingNodes); + + // Relocate shards pending tiering + for (ShardRouting shard : shardsPendingTiering) { + if (nodeQueue.isEmpty()) { + logger.error("[HotToWarmTiering] No nodes available. Cannot tier to target pool: [{}]", RoutingPool.REMOTE_CAPABLE.name()); + break; + } + + logger.info( + "[HotToWarmTiering] Processing tiering, Target Pool: [{}]. Pending Shards: [{}], Available Nodes: [{}]", + RoutingPool.REMOTE_CAPABLE.name(), + shardsPendingTiering, + nodeQueue.size() + ); + + // Find a tiering target node for shard and initiate relocation + int nodesCheckedForShard = 0; + while (!nodeQueue.isEmpty()) { + RoutingNode targetNode = nodeQueue.poll(); + Decision tieringDecision = allocation.deciders().canAllocate(shard, targetNode, allocation); + + if (tieringDecision.type() == Decision.Type.YES) { + logger.debug( + "[HotToWarmTiering] Relocating shard: [{}] from node: [{}] to node: [{}].", + shard.toString(), + shard.currentNodeId(), + targetNode.nodeId() + ); + routingNodes.relocateShard( + shard, + targetNode.nodeId(), + allocation.clusterInfo().getShardSize(shard, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE), + allocation.changes() + ); + nodeQueue.offer(targetNode); + break; + } else { + logger.trace( + "[HotToWarmTiering] Cannot relocate shard: [{}] to node: [{}]. 
Decisions: [{}]", + shard.toString(), + targetNode.nodeId(), + tieringDecision.getDecisions() + ); + + Decision nodeLevelDecision = allocation.deciders().canAllocateAnyShardToNode(targetNode, allocation); + if (nodeLevelDecision.type() == Decision.Type.YES) { + logger.debug( + "[HotToWarmTiering] Node: [{}] can still accept shards. Adding it back to the queue.", + targetNode.nodeId() + ); + nodeQueue.offer(targetNode); + nodesCheckedForShard++; + } else { + logger.debug( + "[HotToWarmTiering] Node: [{}] cannot accept any more shards. Removing it from queue.", + targetNode.nodeId() + ); + } + + // Break out if all nodes in the queue have been checked for this shard + if (nodeQueue.size() == nodesCheckedForShard) { + break; + } + } + } + } + } + /** * Move started shards that can not be allocated to a node anymore *

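The relocation loop in tierShards() above follows one pattern: shuffle the remote-capable nodes once, then walk them as a rotating queue, consulting the allocation deciders before every move. A stripped-down sketch of that queue discipline, with plain generics standing in for the OpenSearch routing types (`canAllocate` stands in for the decider call; the relocation itself is elided):

```java
import java.util.List;
import java.util.Queue;
import java.util.function.BiPredicate;

// Generic form of the node-queue walk: poll a candidate, test it, and
// re-queue it so later shards can still use it; stop once every node
// in the queue has rejected the current shard.
static <S, N> void assignRoundRobin(List<S> shards, Queue<N> nodes, BiPredicate<S, N> canAllocate) {
    for (S shard : shards) {
        int checked = 0;
        while (!nodes.isEmpty()) {
            N node = nodes.poll();
            boolean accepted = canAllocate.test(shard, node);
            nodes.offer(node); // keep the node available for the remaining shards
            if (accepted) {
                // the real code calls routingNodes.relocateShard(shard, node, ...) here
                break;
            }
            if (++checked == nodes.size()) {
                break; // every queued node rejected this shard; move on
            }
        }
    }
}
```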
diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/RemoteShardsBalancer.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/RemoteShardsBalancer.java index a05938c176678..5ac2faa80876e 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/RemoteShardsBalancer.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/RemoteShardsBalancer.java @@ -266,6 +266,17 @@ void balance() { } } + + /** + * Intended to trigger shard relocation for the shards that are submitted + * for tiering to go from the remote-capable node pool to the local node + * pool. + * Not yet implemented. + */ + @Override + void tierShards() { + throw new UnsupportedOperationException(); + } + /** * Calculates the total number of primary shards per node. * @param remoteRoutingNodes routing nodes for which the aggregation needs to be performed diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/ShardsBalancer.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/ShardsBalancer.java index ef2dbd34644a7..17b0ed0234e53 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/ShardsBalancer.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/ShardsBalancer.java @@ -37,6 +37,8 @@ public abstract class ShardsBalancer { */ abstract void balance(); + + abstract void tierShards(); + /** * Make a decision for allocating an unassigned shard. * @param shardRouting the shard for which the decision has to be made diff --git a/server/src/main/java/org/opensearch/indices/tiering/HotToWarmTieringService.java b/server/src/main/java/org/opensearch/indices/tiering/HotToWarmTieringService.java new file mode 100644 index 0000000000000..29d6c8042051b --- /dev/null +++ b/server/src/main/java/org/opensearch/indices/tiering/HotToWarmTieringService.java @@ -0,0 +1,875 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.indices.tiering; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.apache.logging.log4j.util.Supplier; +import org.opensearch.action.admin.indices.tiering.HotToWarmTieringResponse; +import org.opensearch.action.admin.indices.tiering.TieringIndexRequest; +import org.opensearch.action.admin.indices.tiering.TieringRequestContext; +import org.opensearch.cluster.ClusterChangedEvent; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.ClusterStateListener; +import org.opensearch.cluster.ClusterStateUpdateTask; +import org.opensearch.cluster.NotClusterManagerException; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.metadata.IndexNameExpressionResolver; +import org.opensearch.cluster.metadata.Metadata; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.routing.RoutingTable; +import org.opensearch.cluster.routing.ShardRouting; +import org.opensearch.cluster.routing.ShardRoutingState; +import org.opensearch.cluster.routing.allocation.AllocationService; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.Priority; +import org.opensearch.common.annotation.ExperimentalApi; +import org.opensearch.common.inject.Inject; +import org.opensearch.common.lifecycle.AbstractLifecycleComponent; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.common.util.FeatureFlags; +import org.opensearch.core.action.ActionListener; +import org.opensearch.core.index.Index; +import org.opensearch.core.index.shard.ShardId; +import org.opensearch.index.IndexModule; + +import java.io.IOException; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +import static org.opensearch.cluster.metadata.IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING; +import static org.opensearch.cluster.metadata.IndexMetadata.TIERING_CUSTOM_KEY; +import static org.opensearch.index.IndexModule.INDEX_STORE_LOCALITY_SETTING; +import static org.opensearch.index.IndexModule.INDEX_TIERING_STATE; + +/** + * Service responsible for tiering indices from hot to warm + * @opensearch.experimental + */ +public class HotToWarmTieringService extends AbstractLifecycleComponent implements ClusterStateListener { + private static final Logger logger = LogManager.getLogger(HotToWarmTieringService.class); + protected final ClusterService clusterService; + protected final IndexNameExpressionResolver indexNameExpressionResolver; + protected final AllocationService allocationService; + // map to store all the in progress tiering requests + private final Map<String, TieringRequestContext> requestUuidToRequestContext = new ConcurrentHashMap<>(); + // map to store index info of all the indices undergoing hot to warm tiering + private final Map<Index, IndexTieringInfo> indexTieringInfoMap = new ConcurrentHashMap<>(); + public static final String HOT_TO_WARM_START_TIME = "hot_to_warm_start_time"; + public static final String HOT_TO_WARM_END_TIME = "hot_to_warm_end_time"; + + @Inject + public HotToWarmTieringService( + Settings settings, + ClusterService clusterService, + IndexNameExpressionResolver indexNameExpressionResolver, + AllocationService allocationService + ) { + super(); + this.clusterService = clusterService; + this.indexNameExpressionResolver = 
indexNameExpressionResolver; + this.allocationService = allocationService; + + if (DiscoveryNode.isClusterManagerNode(settings) && FeatureFlags.isEnabled(FeatureFlags.TIERED_REMOTE_INDEX)) { + clusterService.addListener(this); + } + } + + @Override + public void clusterChanged(ClusterChangedEvent event) { + // TODO add handling for master switch, dangling indices, master reload + if (event.routingTableChanged()) { + if (!indexTieringInfoMap.isEmpty()) { + updateIndexShardStatus(event.state()); + } + if (!requestUuidToRequestContext.isEmpty()) { + completeRequestLevelTiering(); + } + } + } + + /** + * Checks and completes the request level tiering for + * the request uuids in requestUuidToRequestContext map. + */ + void completeRequestLevelTiering() { + for (String requestUuid : requestUuidToRequestContext.keySet()) { + final TieringRequestContext requestContext = requestUuidToRequestContext.get(requestUuid); + if (requestContext.isRequestProcessingComplete()) { + logger.info("Tiering is completed for the request [{}]", requestContext); + // remove only the index entries that belong to this completed request + indexTieringInfoMap.values().removeIf(indexTieringInfo -> requestUuid.equals(indexTieringInfo.getRequestUuid())); + requestUuidToRequestContext.remove(requestUuid); + if (requestContext.getRequest().waitForCompletion()) { + requestContext.getListener().onResponse(requestContext.constructHotToWarmTieringResponse()); + } + } + } + } + + /** + * Checks if the given node is a search node. + * @param node the node to check. + * @return true if the node is a search node, false otherwise. + */ + boolean isSearchNode(final DiscoveryNode node) { + return node != null && node.isSearchNode(); + } + + /** + * Updates the shard tiering status based on the shard state and the node that it is assigned to. + * @param shard the shard to update the status for. + * @param clusterState the cluster state. + * @return the updated shard tiering status. + */ + ShardTieringStatus updateShardTieringStatus(final ShardRouting shard, final ClusterState clusterState) { + final boolean isShardFoundOnSearchNode = isSearchNode(clusterState.getNodes().get(shard.currentNodeId())); + final boolean isShardRelocatingToSearchNode = isSearchNode(clusterState.getNodes().get(shard.relocatingNodeId())); + ShardTieringState tieringState; + String reason = null; + if (shard.started() && isShardFoundOnSearchNode) { + tieringState = ShardTieringState.SUCCESSFUL; + } else if (shard.unassigned()) { + tieringState = ShardTieringState.INIT; + reason = "Shard is unassigned due to " + shard.unassignedInfo().getReason(); + } else if (((shard.initializing() || shard.started()) && !isShardFoundOnSearchNode) + || (shard.relocating() && !isShardRelocatingToSearchNode)) { + tieringState = ShardTieringState.FAILED; + reason = "Shard with current state: " + + shard.state().toString() + + " is neither allocated nor relocating to the search node, " + + "current node: " + + shard.currentNodeId() + + ", relocating node: " + + shard.relocatingNodeId(); + } else { + tieringState = ShardTieringState.PROCESSING; + } + return new ShardTieringStatus( + shard.relocating() ? shard.relocatingNodeId() : shard.currentNodeId(), + shard.state(), + tieringState, + reason + ); + }
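For quick reference, the branch logic above collapses a shard's routing state and node placement into four tiering states. A standalone restatement of those branches, simplified to plain booleans so it can be read and run in isolation (this is not the service's API):

```java
enum State { INIT, PROCESSING, SUCCESSFUL, FAILED }

// Mirrors updateShardTieringStatus: STARTED on a search node wins,
// UNASSIGNED resets to INIT, a shard sitting on (or relocating to) a
// non-search node fails, and everything else is still in flight.
static State tieringStateOf(boolean started, boolean unassigned, boolean initializing,
                            boolean relocating, boolean onSearchNode, boolean relocatingToSearchNode) {
    if (started && onSearchNode) return State.SUCCESSFUL;
    if (unassigned) return State.INIT;
    if (((initializing || started) && !onSearchNode) || (relocating && !relocatingToSearchNode)) return State.FAILED;
    return State.PROCESSING;
}
```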
+ + /** + * Checks for updates in the index shard status for the indices + * undergoing tiering and triggers a cluster state update for + * successful/failed indices. + * @param clusterState the cluster state. + */ + void updateIndexShardStatus(ClusterState clusterState) { + List<ShardRouting> shardRoutings; + // to include successful/failed indices + final Set<IndexTieringInfo> completedIndices = new HashSet<>(); + for (Index index : indexTieringInfoMap.keySet()) { + IndexTieringInfo indexTieringInfo = indexTieringInfoMap.get(index); + if (indexTieringInfo.isCompleted()) { + continue; + } + if (clusterState.routingTable().hasIndex(index)) { + // Ensure index is not deleted + shardRoutings = clusterState.routingTable().allShards(index.getName()); + } else { + // Index already deleted, nothing to do + logger.warn("Index [{}] deleted before hot to warm relocation finished", index.getName()); + requestUuidToRequestContext.get(indexTieringInfo.getRequestUuid()) + .addToFailed(indexTieringInfo.getIndex(), "index not found"); + continue; + } + final Map<ShardInfo, ShardTieringStatus> shardTieringStatusMap = indexTieringInfo.getShardTieringStatus(); + boolean relocationCompleted = true; + for (ShardRouting shard : shardRoutings) { + ShardInfo shardInfo = new ShardInfo(shard.shardId(), shard.primary()); + ShardTieringStatus currentShardTieringStatus = shardTieringStatusMap.get(shardInfo); + if (currentShardTieringStatus != null && (currentShardTieringStatus.isShardCompleted())) { + // shard is either already successful or failed + continue; + } + logger.debug("Shard tiering status for the shard {} : {}", shard, currentShardTieringStatus); + ShardTieringStatus updatedShardTieringStatus = updateShardTieringStatus(shard, clusterState); + logger.debug("Updated Shard tiering status for the shard {} : {}", shard, updatedShardTieringStatus); + if (updatedShardTieringStatus.isShardNotSuccessful()) { + relocationCompleted = false; + } + shardTieringStatusMap.put(shardInfo, updatedShardTieringStatus); + if (!completedIndices.contains(indexTieringInfo) && updatedShardTieringStatus.hasShardFailed()) { + indexTieringInfo.setState(IndexTieringStatus.FAILED); + completedIndices.add(indexTieringInfo); + } + } + indexTieringInfo.setShardTieringStatus(shardTieringStatusMap); + if (relocationCompleted) { + logger.info("Hot to warm relocation completed for index [{}]", index.getName()); + indexTieringInfo.setState(IndexTieringStatus.SUCCESSFUL); + completedIndices.add(indexTieringInfo); + } + indexTieringInfoMap.put(index, indexTieringInfo); + } + if (!completedIndices.isEmpty()) { + updateRequestContextForCompletedIndices(completedIndices); + updateClusterStateForCompletedIndices(completedIndices); + } + } + + /** + * Updates the index metadata with the tiering settings/metadata for a completed index. + * Completed index is a successful or failed tiered index. 
+ * @param metadataBuilder metadata builder + * @param indexTieringInfo index tiering info + * @return updated index metadata builder + */ + IndexMetadata.Builder updateIndexMetadataForCompletedIndex( + final Metadata.Builder metadataBuilder, + final IndexTieringInfo indexTieringInfo + ) { + final IndexMetadata indexMetadata = metadataBuilder.get(indexTieringInfo.getIndex().getName()); + Settings.Builder indexSettingsBuilder = Settings.builder().put(indexMetadata.getSettings()); + // update tiering settings here + if (indexTieringInfo.hasFailed()) { + indexSettingsBuilder.put(INDEX_STORE_LOCALITY_SETTING.getKey(), IndexModule.DataLocalityType.FULL); + indexSettingsBuilder.put(INDEX_TIERING_STATE.getKey(), IndexModule.TieringState.HOT); + + } else { // successful case here + indexSettingsBuilder.put(INDEX_TIERING_STATE.getKey(), IndexModule.TieringState.WARM); + } + + // trying to put transient index metadata in the custom index metadata + IndexMetadata.Builder indexMetadataBuilder = IndexMetadata.builder(indexMetadata).settings(indexSettingsBuilder); + Map<String, String> tieringCustomData = new HashMap<>(); + tieringCustomData.put(HOT_TO_WARM_START_TIME, indexMetadata.getCustomData(TIERING_CUSTOM_KEY).get(HOT_TO_WARM_START_TIME)); + tieringCustomData.put(HOT_TO_WARM_END_TIME, String.valueOf(System.currentTimeMillis())); + indexMetadataBuilder.putCustom(TIERING_CUSTOM_KEY, tieringCustomData); + + // Update index settings version + indexMetadataBuilder.settingsVersion(1 + indexMetadataBuilder.settingsVersion()); + return indexMetadataBuilder; + } + + /** + * Checks if any of the indices in the completed indices set has failed. + * @param completedIndices set of completed indices + * @return true if any of the indices has failed, false otherwise + */ + boolean hasFailedIndices(final Set<IndexTieringInfo> completedIndices) { + for (IndexTieringInfo indexTieringInfo : completedIndices) { + if (indexTieringInfo.hasFailed()) { + return true; + } + } + return false; + } + + /** + * Updates the request context object in the requestUuidToRequestContext map for + * completed (successful/failed) indices. + * @param completedIndices set of completed indices + */ + void updateRequestContextForCompletedIndices(final Set<IndexTieringInfo> completedIndices) { + for (IndexTieringInfo indexTieringInfo : completedIndices) { + String reqId = indexTieringInfo.getRequestUuid(); + TieringRequestContext requestContext = requestUuidToRequestContext.get(reqId); + if (indexTieringInfo.hasFailed()) { + requestContext.addToFailed(indexTieringInfo.getIndex(), indexTieringInfo.getFailureReason()); + } else { + requestContext.addToSuccessful(indexTieringInfo.getIndex()); + } + } + }
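The custom metadata written above is a plain string map on the IndexMetadata. A hedged sketch of how a consumer could read the hot-to-warm window back (the duration arithmetic is illustrative, not part of this diff):

```java
import org.opensearch.cluster.metadata.IndexMetadata;
import java.util.Map;

Map<String, String> tiering = indexMetadata.getCustomData(IndexMetadata.TIERING_CUSTOM_KEY);
if (tiering != null) {
    long start = Long.parseLong(tiering.get(HotToWarmTieringService.HOT_TO_WARM_START_TIME));
    long end = Long.parseLong(tiering.get(HotToWarmTieringService.HOT_TO_WARM_END_TIME));
    // end stays -1 while tiering is in flight (see updateIndexMetadataForAcceptedIndex below)
    if (end != -1) {
        System.out.println("hot to warm took " + (end - start) + " ms");
    }
}
```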
+ + /** + * Updates the cluster state by updating the index metadata for completed (successful/failed) indices. + * @param completedIndices set of completed indices + */ + void updateClusterStateForCompletedIndices(final Set<IndexTieringInfo> completedIndices) { + clusterService.submitStateUpdateTask( + "process hot to warm tiering for completed indices", + new ClusterStateUpdateTask(Priority.URGENT) { + + @Override + public ClusterState execute(ClusterState currentState) { + final Metadata.Builder metadataBuilder = Metadata.builder(currentState.metadata()); + final RoutingTable.Builder routingTableBuilder = RoutingTable.builder(currentState.routingTable()); + for (IndexTieringInfo indexTieringInfo : completedIndices) { + final IndexMetadata.Builder indexMetadataBuilder = updateIndexMetadataForCompletedIndex( + metadataBuilder, + indexTieringInfo + ); + metadataBuilder.put(indexMetadataBuilder); + } + ClusterState updatedState = ClusterState.builder(currentState) + .metadata(metadataBuilder) + .routingTable(routingTableBuilder.build()) + .build(); + + if (hasFailedIndices(completedIndices)) { + // only reroute in case of failed indices to trigger shard relocation for shards to go back to hot nodes + updatedState = allocationService.reroute(updatedState, "hot to warm revert tiering"); + } + return updatedState; + } + + @Override + public void onFailure(String source, Exception e) { + logger.warn( + (Supplier<?>) () -> new ParameterizedMessage( + "failed to process hot to warm tiering for completed indices " + "[{}]", + completedIndices + ), + e + ); + } + + @Override + public void onNoLongerClusterManager(String source) { + this.onFailure(source, new NotClusterManagerException("no longer cluster manager. source: [" + source + "]")); + } + + @Override + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + logger.info("Cluster state updated for source " + source); + } + } + ); + } + + /** + * Updates the index metadata with the tiering settings/metadata for an accepted index. + * Accepted index is an index to be tiered from hot to warm. 
+ * @param metadataBuilder metadata builder + * @param routingTableBuilder routing builder + * @param index index + * @return updated index metadata builder + */ + IndexMetadata.Builder updateIndexMetadataForAcceptedIndex( + final Metadata.Builder metadataBuilder, + final RoutingTable.Builder routingTableBuilder, + final Index index + ) { + final IndexMetadata indexMetadata = metadataBuilder.get(index.getName()); + Settings.Builder indexSettingsBuilder = Settings.builder().put(indexMetadata.getSettings()); + // update index settings here + indexSettingsBuilder.put(INDEX_STORE_LOCALITY_SETTING.getKey(), IndexModule.DataLocalityType.PARTIAL); + indexSettingsBuilder.put(INDEX_TIERING_STATE.getKey(), IndexModule.TieringState.HOT_TO_WARM); + + // Update number of replicas to 0 in case the number of replicas is greater than or equal to 1 + if (Integer.parseInt(metadataBuilder.getSafe(index).getSettings().get(INDEX_NUMBER_OF_REPLICAS_SETTING.getKey())) >= 1) { + final String[] indices = new String[] { index.getName() }; + routingTableBuilder.updateNumberOfReplicas(0, indices); + metadataBuilder.updateNumberOfReplicas(0, indices); + } + // trying to put transient index metadata in the custom index metadata + IndexMetadata.Builder indexMetadataBuilder = IndexMetadata.builder(indexMetadata).settings(indexSettingsBuilder); + final Map<String, String> tieringCustomData = new HashMap<>(); + tieringCustomData.put(HOT_TO_WARM_START_TIME, String.valueOf(System.currentTimeMillis())); + tieringCustomData.put(HOT_TO_WARM_END_TIME, "-1"); + indexMetadataBuilder.putCustom(TIERING_CUSTOM_KEY, tieringCustomData); + // Update index settings version + indexMetadataBuilder.settingsVersion(1 + indexMetadataBuilder.settingsVersion()); + return indexMetadataBuilder; + }
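Flipping the locality setting to PARTIAL is what hands the index over to the remote-capable pool: with FeatureFlags.TIERED_REMOTE_INDEX enabled, the RoutingPool.getIndexPool change earlier in this diff keys off exactly this setting. A minimal sketch of the in-flight settings this method produces (values per the builder above):

```java
import org.opensearch.common.settings.Settings;
import org.opensearch.index.IndexModule;

// Settings an accepted index carries while hot-to-warm tiering is in flight.
Settings inFlight = Settings.builder()
    .put(IndexModule.INDEX_STORE_LOCALITY_SETTING.getKey(), IndexModule.DataLocalityType.PARTIAL)
    .put(IndexModule.INDEX_TIERING_STATE.getKey(), IndexModule.TieringState.HOT_TO_WARM)
    .build();
// RoutingPool.getIndexPool(...) now resolves to REMOTE_CAPABLE for this index,
// so LocalShardsBalancer.tierShards() will pick its shards up on the next reroute.
```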
+ + /** + * Tier indices from hot to warm + * @param tieringRequestContext - request context for the tiering request + * @param listener - callback listener + */ + public void tier(final TieringRequestContext tieringRequestContext, final ActionListener<HotToWarmTieringResponse> listener) { + final TieringIndexRequest request = tieringRequestContext.getRequest(); + logger.info("Starting hot to warm tiering for request [{}]", tieringRequestContext); + + clusterService.submitStateUpdateTask("start hot to warm tiering", new ClusterStateUpdateTask(Priority.URGENT) { + + @Override + public ClusterState execute(ClusterState currentState) { + final RoutingTable.Builder routingTableBuilder = RoutingTable.builder(currentState.routingTable()); + final Metadata.Builder metadataBuilder = Metadata.builder(currentState.metadata()); + for (Index index : tieringRequestContext.getInProgressIndices()) { + final IndexMetadata.Builder indexMetadataBuilder = updateIndexMetadataForAcceptedIndex( + metadataBuilder, + routingTableBuilder, + index + ); + metadataBuilder.put(indexMetadataBuilder); + final Map<ShardInfo, ShardTieringStatus> shardTieringStatus = new HashMap<>(); + currentState.routingTable().allShards(index.getName()).forEach(shardRouting -> { + shardTieringStatus.put( + new ShardInfo(shardRouting.shardId(), shardRouting.primary()), + new ShardTieringStatus(currentState.nodes().getLocalNodeId(), shardRouting.state()) + ); + }); + + final IndexTieringInfo indexTieringInfo = new IndexTieringInfo( + tieringRequestContext.getRequestUuid(), + index, + shardTieringStatus + ); + indexTieringInfoMap.put(index, indexTieringInfo); + } + requestUuidToRequestContext.put(tieringRequestContext.getRequestUuid(), tieringRequestContext); + + ClusterState updatedState = ClusterState.builder(currentState) + .metadata(metadataBuilder) + .routingTable(routingTableBuilder.build()) + .build(); + + // now, reroute to trigger shard relocation for the dedicated case + updatedState = allocationService.reroute(updatedState, "hot to warm tiering"); + + return updatedState; + } + + @Override + public void onFailure(String source, Exception e) { + logger.warn( + (Supplier<?>) () -> new ParameterizedMessage( + "failed to start hot to warm tiering for indices " + "[{}]", + (Object) request.indices() + ), + e + ); + listener.onFailure(e); + } + + @Override + public void onNoLongerClusterManager(String source) { + this.onFailure(source, new NotClusterManagerException("no longer cluster manager. source: [" + source + "]")); + } + + @Override + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + logger.info("Cluster state updated for source " + source); + if (!request.waitForCompletion()) { + listener.onResponse(tieringRequestContext.constructHotToWarmTieringResponse()); + } + } + + @Override + public TimeValue timeout() { + return request.clusterManagerNodeTimeout(); + } + }); + } + + @Override + protected void doStart() {} + + @Override + protected void doStop() {} + + @Override + protected void doClose() throws IOException {} + + /** + * Represents info of a tiering index + * + * @opensearch.experimental + */ + @ExperimentalApi + public static class IndexTieringInfo { + private final String requestUuid; + private final Index index; + private IndexTieringStatus state; + private Map<ShardInfo, ShardTieringStatus> shardTieringStatus; + + public IndexTieringInfo(String requestUuid, Index index, Map<ShardInfo, ShardTieringStatus> shardTieringStatus) { + this.requestUuid = requestUuid; + this.index = index; + this.state = IndexTieringStatus.INIT; + this.shardTieringStatus = shardTieringStatus; + } + + IndexTieringInfo(String requestUuid, Index index, IndexTieringStatus state, Map<ShardInfo, ShardTieringStatus> shardTieringStatus) { + this.requestUuid = requestUuid; + this.index = index; + this.state = state; + this.shardTieringStatus = shardTieringStatus; + } + + public Index getIndex() { + return index; + } + + public IndexTieringStatus getState() { + return state; + } + + public void setState(IndexTieringStatus state) { + this.state = state; + } + + public boolean isSuccessful() { + return state.successful(); + } + + public boolean hasFailed() { + return state.failed(); + } + + public boolean isCompleted() { + return state.successful() || state.failed(); + } + + public String getRequestUuid() { + return requestUuid; + } + + public Map<ShardInfo, ShardTieringStatus> getShardTieringStatus() { + return shardTieringStatus; + } + + public void setShardTieringStatus(Map<ShardInfo, ShardTieringStatus> shardTieringStatus) { + this.shardTieringStatus = shardTieringStatus; + } + + public String getFailureReason() { + StringBuilder reason = new StringBuilder(); + reason.append("Failure for index: ").append(index.getName()).append("["); + for (ShardInfo shardInfo : shardTieringStatus.keySet()) { + ShardTieringStatus shardStatus = shardTieringStatus.get(shardInfo); + if (shardStatus.hasShardFailed()) { + reason.append("{Shard: ").append(shardInfo.toString()).append(", reason: ").append(shardStatus.reason()).append("}"); + } + } + reason.append("]"); + return reason.toString(); + } + + @Override + public String toString() { + return "IndexTieringInfo{" + "requestUuid='" + requestUuid + '\'' + ", index=" + index + ", state=" + state + ", shardTieringStatus=" + shardTieringStatus + '}'; + } + }
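A short hedged sketch of how IndexTieringInfo folds per-shard failures into the single reason string reported back on the response (index, node, and reason values hypothetical; the four-argument constructor is the package-private one used by the tests below):

```java
import org.opensearch.cluster.routing.ShardRoutingState;
import org.opensearch.core.index.Index;
import org.opensearch.core.index.shard.ShardId;
import java.util.HashMap;
import java.util.Map;

Index index = new Index("logs-1", "uuid-a"); // hypothetical
Map<HotToWarmTieringService.ShardInfo, HotToWarmTieringService.ShardTieringStatus> statuses = new HashMap<>();
statuses.put(
    new HotToWarmTieringService.ShardInfo(new ShardId(index, 0), true),
    new HotToWarmTieringService.ShardTieringStatus(
        "node-1", ShardRoutingState.STARTED, HotToWarmTieringService.ShardTieringState.FAILED, "not on a search node")
);
HotToWarmTieringService.IndexTieringInfo info =
    new HotToWarmTieringService.IndexTieringInfo("req-uuid", index, HotToWarmTieringService.IndexTieringStatus.FAILED, statuses);
assert info.hasFailed();
// getFailureReason() -> "Failure for index: logs-1[{Shard: ShardInfo{shardId=[logs-1][0], primary=true}, reason: not on a search node}]"
String reason = info.getFailureReason();
```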
+ + /** + * Represents info of a tiering shard + * + * @opensearch.experimental + */ + @ExperimentalApi + public static class ShardInfo { + private final ShardId shardId; + private final boolean primary; + + public ShardInfo(ShardId shardId, boolean primary) { + this.shardId = shardId; + this.primary = primary; + } + + public ShardId getShardId() { + return shardId; + } + + public boolean isPrimary() { + return primary; + } + + @Override + public String toString() { + return "ShardInfo{" + "shardId=" + shardId + ", primary=" + primary + '}'; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + ShardInfo shardInfo = (ShardInfo) o; + + if (primary != shardInfo.primary) return false; + return Objects.equals(shardId, shardInfo.shardId); + } + + @Override + public int hashCode() { + int result = shardId != null ? shardId.hashCode() : 0; + result = 31 * result + (primary ? 1 : 0); + return result; + } + } + + /** + * Represents status of a tiering shard + * + * @opensearch.experimental + */ + @ExperimentalApi + public static class ShardTieringStatus { + private final ShardTieringState state; + private final ShardRoutingState shardRoutingState; + private final String nodeId; + private final String reason; + + /** + * Constructs a new shard tiering status with the specified state on the given node + * + * @param nodeId node id + * @param shardRoutingState shard state + */ + public ShardTieringStatus(String nodeId, ShardRoutingState shardRoutingState) { + this(nodeId, shardRoutingState, ShardTieringState.INIT, null); + } + + /** + * Constructs a new shard tiering status with the specified state on the given node and the specified failure reason + * + * @param nodeId node id + * @param shardRoutingState shard routing state + * @param state shard tiering state + * @param reason failure reason + */ + public ShardTieringStatus(String nodeId, ShardRoutingState shardRoutingState, ShardTieringState state, String reason) { + this.nodeId = nodeId; + this.state = state; + this.reason = reason; + this.shardRoutingState = shardRoutingState; + } + + /** + * Returns current state + * + * @return current state + */ + public ShardTieringState state() { + return state; + } + + /** + * Returns the node id of the node where the shard is getting tiered + * + * @return node id + */ + public String nodeId() { + return nodeId; + } + + /** + * Returns failure reason + * + * @return failure reason + */ + public String reason() { + return reason; + } + + public boolean isShardCompleted() { + return this.state == ShardTieringState.SUCCESSFUL || this.state == ShardTieringState.FAILED; + } + + public boolean hasShardFailed() { + return this.state == ShardTieringState.FAILED; + } + + public boolean isShardNotSuccessful() { + return this.state != ShardTieringState.SUCCESSFUL; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + ShardTieringStatus status = (ShardTieringStatus) o; + return state == status.state && Objects.equals(nodeId, status.nodeId) && Objects.equals(reason, status.reason); + } + + @Override + public int hashCode() { + return Objects.hash(state, nodeId, reason); + } + + @Override + public String toString() { + return "ShardTieringStatus{" + "state=" + state + ", shardRoutingState=" + shardRoutingState + ", nodeId='" + nodeId + '\'' + ", reason='" + reason + '\'' + '}'; + } + } + + /** + * Index Tiering status + * + * @opensearch.experimental + */ + @ExperimentalApi + public enum IndexTieringStatus { + /** + * Initializing state + */ + 
INIT((byte) 0), + /** + * Tiering finished successfully + */ + SUCCESSFUL((byte) 1), + /** + * Tiering failed + */ + FAILED((byte) 2); + + private final byte value; + + /** + * Constructs new state + * + * @param value state code + */ + IndexTieringStatus(byte value) { + this.value = value; + } + + /** + * Returns state code + * + * @return state code + */ + public byte value() { + return value; + } + + /** + * @return true if tiering is successful + */ + public boolean successful() { + return this == SUCCESSFUL; + } + + /** + * @return true if tiering has failed + */ + public boolean failed() { + return this == FAILED; + } + + /** + * Returns state corresponding to state code + * + * @param value state code + * @return state + */ + public static IndexTieringStatus fromValue(byte value) { + switch (value) { + case 0: + return INIT; + case 1: + return SUCCESSFUL; + case 2: + return FAILED; + default: + throw new IllegalArgumentException("No tiering state for value [" + value + "]"); + } + } + } + + /** + * Shard Tiering state + * + * @opensearch.experimental + */ + @ExperimentalApi + public enum ShardTieringState { + /** + * Initializing state + */ + INIT((byte) 0), + /** + * Processing state + */ + PROCESSING((byte) 1), + /** + * Tiering finished successfully + */ + SUCCESSFUL((byte) 2), + /** + * Tiering failed + */ + FAILED((byte) 3); + + private final byte value; + + /** + * Constructs new state + * + * @param value state code + */ + ShardTieringState(byte value) { + this.value = value; + } + + /** + * Returns state code + * + * @return state code + */ + public byte value() { + return value; + } + + /** + * Returns true if tiering completed (either successfully or with failure) + * + * @return true if tiering completed + */ + public boolean completed() { + return this == SUCCESSFUL || this == FAILED; + } + + /** + * Returns state corresponding to state code + * + * @param value state code + * @return state + */ + public static ShardTieringState fromValue(byte value) { + switch (value) { + case 0: + return INIT; + case 1: + return PROCESSING; + case 2: + return SUCCESSFUL; + case 3: + return FAILED; + default: + throw new IllegalArgumentException("No tiering state for value [" + value + "]"); + } + } + } +} diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index d91b2a45a48c6..1ecdb30f7f797 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -178,6 +178,7 @@ import org.opensearch.indices.replication.SegmentReplicationSourceService; import org.opensearch.indices.replication.SegmentReplicationTargetService; import org.opensearch.indices.store.IndicesStore; +import org.opensearch.indices.tiering.HotToWarmTieringService; import org.opensearch.ingest.IngestService; import org.opensearch.monitor.MonitorService; import org.opensearch.monitor.fs.FsHealthService; @@ -1195,6 +1196,13 @@ protected Node( remoteClusterStateService ); + final HotToWarmTieringService hotToWarmTieringService = new HotToWarmTieringService( + settings, + clusterService, + clusterModule.getIndexNameExpressionResolver(), + clusterModule.getAllocationService() + ); + final DiskThresholdMonitor diskThresholdMonitor = new DiskThresholdMonitor( settings, clusterService::state, @@ -1391,6 +1399,9 @@ protected Node( b.bind(TransportNodesSnapshotsStatus.class).toInstance(nodesSnapshotsStatus); b.bind(RestoreService.class).toInstance(restoreService); 
b.bind(RemoteStoreRestoreService.class).toInstance(remoteStoreRestoreService); + if (FeatureFlags.isEnabled(FeatureFlags.TIERED_REMOTE_INDEX)) { + b.bind(HotToWarmTieringService.class).toInstance(hotToWarmTieringService); + } b.bind(RerouteService.class).toInstance(rerouteService); b.bind(ShardLimitValidator.class).toInstance(shardLimitValidator); b.bind(FsHealthService.class).toInstance(fsHealthService); diff --git a/server/src/test/java/org/opensearch/indices/tiering/HotToWarmTieringServiceTests.java b/server/src/test/java/org/opensearch/indices/tiering/HotToWarmTieringServiceTests.java new file mode 100644 index 0000000000000..675d8d078e2ef --- /dev/null +++ b/server/src/test/java/org/opensearch/indices/tiering/HotToWarmTieringServiceTests.java @@ -0,0 +1,167 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.indices.tiering; + +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.metadata.Metadata; +import org.opensearch.cluster.routing.RoutingTable; +import org.opensearch.cluster.routing.ShardRoutingState; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.core.index.Index; +import org.opensearch.core.index.shard.ShardId; +import org.opensearch.index.IndexModule; +import org.opensearch.test.OpenSearchSingleNodeTestCase; +import org.junit.Before; + +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import java.util.UUID; + +import static org.opensearch.index.IndexModule.INDEX_STORE_LOCALITY_SETTING; +import static org.opensearch.index.IndexModule.INDEX_TIERING_STATE; + +public class HotToWarmTieringServiceTests extends OpenSearchSingleNodeTestCase { + + private ClusterService clusterService; + private HotToWarmTieringService hotToWarmTieringService; + + @Before + public void beforeTest() { + clusterService = this.getInstanceFromNode(ClusterService.class); + hotToWarmTieringService = this.getInstanceFromNode(HotToWarmTieringService.class); + } + + public void testUpdateIndexMetadataForAcceptedIndices() { + String indexName = "test_index"; + createIndex(indexName); + Index index = resolveIndex(indexName); + final Metadata.Builder metadataBuilder = Metadata.builder(clusterService.state().metadata()); + final RoutingTable.Builder routingTableBuilder = RoutingTable.builder(clusterService.state().routingTable()); + IndexMetadata.Builder builder = hotToWarmTieringService.updateIndexMetadataForAcceptedIndex( + metadataBuilder, + routingTableBuilder, + index + ); + assertNotNull(builder); + IndexMetadata indexMetadata = builder.index(indexName).build(); + assertEquals( + IndexModule.DataLocalityType.PARTIAL, + IndexModule.DataLocalityType.getValueOf(indexMetadata.getSettings().get(INDEX_STORE_LOCALITY_SETTING.getKey())) + ); + assertEquals(IndexModule.TieringState.HOT_TO_WARM.name(), indexMetadata.getSettings().get(INDEX_TIERING_STATE.getKey())); + Map<String, String> customData = indexMetadata.getCustomData(IndexMetadata.TIERING_CUSTOM_KEY); + assertNotNull(customData); + assertNotNull(customData.get(HotToWarmTieringService.HOT_TO_WARM_START_TIME)); + assertNotNull(customData.get(HotToWarmTieringService.HOT_TO_WARM_END_TIME)); + } + + public void testHasFailedIndices() { + String indexName = "test_index"; + createIndex(indexName); + Index index = resolveIndex(indexName); + final Map<HotToWarmTieringService.ShardInfo, HotToWarmTieringService.ShardTieringStatus> shardTieringStatus = new HashMap<>(); + 
shardTieringStatus.put( + new HotToWarmTieringService.ShardInfo(new ShardId(index, 0), false), + new HotToWarmTieringService.ShardTieringStatus("nodeId", ShardRoutingState.UNASSIGNED) + ); + final HotToWarmTieringService.IndexTieringInfo indexTieringInfo = new HotToWarmTieringService.IndexTieringInfo( + UUID.randomUUID().toString(), + index, + HotToWarmTieringService.IndexTieringStatus.FAILED, + shardTieringStatus + ); + boolean hasFailedIndices = hotToWarmTieringService.hasFailedIndices(Set.of(indexTieringInfo)); + assertTrue(hasFailedIndices); + } + + public void testNoFailedIndices() { + String indexName = "test_index"; + createIndex(indexName); + Index index = resolveIndex(indexName); + final Map<HotToWarmTieringService.ShardInfo, HotToWarmTieringService.ShardTieringStatus> shardTieringStatus = new HashMap<>(); + shardTieringStatus.put( + new HotToWarmTieringService.ShardInfo(new ShardId(index, 0), false), + new HotToWarmTieringService.ShardTieringStatus("nodeId", ShardRoutingState.STARTED) + ); + final HotToWarmTieringService.IndexTieringInfo indexTieringInfo = new HotToWarmTieringService.IndexTieringInfo( + UUID.randomUUID().toString(), + index, + HotToWarmTieringService.IndexTieringStatus.SUCCESSFUL, + shardTieringStatus + ); + boolean hasFailedIndices = hotToWarmTieringService.hasFailedIndices(Set.of(indexTieringInfo)); + assertFalse(hasFailedIndices); + } + + public void testUpdateIndexMetadataForSuccessfulIndex() { + String indexName = "test_index"; + createIndex(indexName); + Index index = resolveIndex(indexName); + final Metadata.Builder metadataBuilder = Metadata.builder(clusterService.state().metadata()); + final Map<HotToWarmTieringService.ShardInfo, HotToWarmTieringService.ShardTieringStatus> shardTieringStatus = new HashMap<>(); + shardTieringStatus.put( + new HotToWarmTieringService.ShardInfo(new ShardId(index, 0), false), + new HotToWarmTieringService.ShardTieringStatus("nodeId", ShardRoutingState.STARTED) + ); + final HotToWarmTieringService.IndexTieringInfo indexTieringInfo = new HotToWarmTieringService.IndexTieringInfo( + UUID.randomUUID().toString(), + index, + HotToWarmTieringService.IndexTieringStatus.SUCCESSFUL, + shardTieringStatus + ); + Map<String, String> customData = new HashMap<>(); + customData.put(HotToWarmTieringService.HOT_TO_WARM_START_TIME, String.valueOf(System.currentTimeMillis())); + metadataBuilder.put(IndexMetadata.builder(metadataBuilder.getSafe(index)).putCustom(IndexMetadata.TIERING_CUSTOM_KEY, customData)); + IndexMetadata.Builder builder = hotToWarmTieringService.updateIndexMetadataForCompletedIndex(metadataBuilder, indexTieringInfo); + assertNotNull(builder); + IndexMetadata indexMetadata = builder.index(indexName).build(); + assertEquals(IndexModule.TieringState.WARM.name(), indexMetadata.getSettings().get(INDEX_TIERING_STATE.getKey())); + customData = indexMetadata.getCustomData(IndexMetadata.TIERING_CUSTOM_KEY); + assertNotNull(customData); + String endTime = customData.get(HotToWarmTieringService.HOT_TO_WARM_END_TIME); + assertNotNull(endTime); + assertNotEquals("-1", endTime); + } + + public void testUpdateIndexMetadataForFailedIndex() { + String indexName = "test_index"; + createIndex(indexName); + Index index = resolveIndex(indexName); + final Metadata.Builder metadataBuilder = Metadata.builder(clusterService.state().metadata()); + final Map<HotToWarmTieringService.ShardInfo, HotToWarmTieringService.ShardTieringStatus> shardTieringStatus = new HashMap<>(); + shardTieringStatus.put( + new HotToWarmTieringService.ShardInfo(new ShardId(index, 0), false), + new HotToWarmTieringService.ShardTieringStatus("nodeId", ShardRoutingState.UNASSIGNED) + ); + final HotToWarmTieringService.IndexTieringInfo indexTieringInfo = new HotToWarmTieringService.IndexTieringInfo( 
UUID.randomUUID().toString(), + index, + HotToWarmTieringService.IndexTieringStatus.FAILED, + shardTieringStatus + ); + Map<String, String> customData = new HashMap<>(); + customData.put(HotToWarmTieringService.HOT_TO_WARM_START_TIME, String.valueOf(System.currentTimeMillis())); + metadataBuilder.put(IndexMetadata.builder(metadataBuilder.getSafe(index)).putCustom(IndexMetadata.TIERING_CUSTOM_KEY, customData)); + + IndexMetadata.Builder builder = hotToWarmTieringService.updateIndexMetadataForCompletedIndex(metadataBuilder, indexTieringInfo); + assertNotNull(builder); + IndexMetadata indexMetadata = builder.index(indexName).build(); + assertEquals( + IndexModule.DataLocalityType.FULL, + IndexModule.DataLocalityType.getValueOf(indexMetadata.getSettings().get(INDEX_STORE_LOCALITY_SETTING.getKey())) + ); + assertEquals(IndexModule.TieringState.HOT.name(), indexMetadata.getSettings().get(INDEX_TIERING_STATE.getKey())); + customData = indexMetadata.getCustomData(IndexMetadata.TIERING_CUSTOM_KEY); + assertNotNull(customData); + String endTime = customData.get(HotToWarmTieringService.HOT_TO_WARM_END_TIME); + assertNotNull(endTime); + assertNotEquals("-1", endTime); + } +}
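Taken together with the integration test at the top of this diff, the client-facing flow is compact. A hedged end-to-end sketch (index names hypothetical; `client` is any Client handle on the cluster):

```java
import org.opensearch.action.admin.indices.tiering.HotToWarmTieringAction;
import org.opensearch.action.admin.indices.tiering.HotToWarmTieringResponse;
import org.opensearch.action.admin.indices.tiering.TieringIndexRequest;

// Submit a hot-to-warm tiering request and block until the service reports completion.
TieringIndexRequest request = new TieringIndexRequest("warm", "logs-2024-05", "logs-2024-06");
request.waitForCompletion(true); // the listener fires only once every index has succeeded or failed

HotToWarmTieringResponse response = client.admin().indices().execute(HotToWarmTieringAction.INSTANCE, request).actionGet();
if (response.isAcknowledged() == false) {
    // per-index failure reasons are carried on the response as IndexResult entries
    response.getFailedIndices().forEach(indexResult -> System.err.println(indexResult));
}
```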