Skip to content

Commit

Permalink
HotToWarmTieringService changes and changes in shard balancer to tier…
Browse files Browse the repository at this point in the history
… shards

Signed-off-by: Neetika Singhal <neetiks@amazon.com>
  • Loading branch information
neetikasinghal committed Jul 22, 2024
1 parent b1107ec commit fde4f91
Show file tree
Hide file tree
Showing 12 changed files with 1,486 additions and 2 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.remotestore.tiering;

import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;

import org.opensearch.action.admin.indices.get.GetIndexResponse;
import org.opensearch.action.admin.indices.tiering.HotToWarmTieringAction;
import org.opensearch.action.admin.indices.tiering.HotToWarmTieringResponse;
import org.opensearch.action.admin.indices.tiering.TieringIndexRequest;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.cluster.ClusterInfoService;
import org.opensearch.cluster.MockInternalClusterInfoService;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.core.common.unit.ByteSizeUnit;
import org.opensearch.core.common.unit.ByteSizeValue;
import org.opensearch.index.IndexModule;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.index.store.remote.file.CleanerDaemonThreadLeakFilter;
import org.opensearch.monitor.fs.FsInfo;
import org.opensearch.plugins.Plugin;
import org.opensearch.remotestore.RemoteStoreBaseIntegTestCase;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.junit.Before;

import java.util.Collection;
import java.util.Collections;
import java.util.Map;

import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.notNullValue;

@ThreadLeakFilters(filters = CleanerDaemonThreadLeakFilter.class)
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0, supportsDedicatedMasters = false)
// Uncomment the below line to enable trace level logs for this test for better debugging
// @TestLogging(reason = "Getting trace logs from tiering package", value =
// "org.opensearch.tiering:TRACE,org.opensearch.cluster.routing.allocation.decider:TRACE")
public class HotToWarmTieringServiceIT extends RemoteStoreBaseIntegTestCase {

protected static final String TEST_IDX_1 = "test-idx-1";
protected static final String TEST_IDX_2 = "test-idx-2";
protected static final String TARGET_TIER = "warm";
protected static final int NUM_DOCS_IN_BULK = 10;
private static final long TOTAL_SPACE_BYTES = new ByteSizeValue(1000, ByteSizeUnit.KB).getBytes();

/*
Disabling MockFSIndexStore plugin as the MockFSDirectoryFactory wraps the FSDirectory over a OpenSearchMockDirectoryWrapper which extends FilterDirectory (whereas FSDirectory extends BaseDirectory)
As a result of this wrapping the local directory of Composite Directory does not satisfy the assertion that local directory must be of type FSDirectory
*/
@Override
protected boolean addMockIndexStorePlugin() {
return false;
}

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Collections.singletonList(MockInternalClusterInfoService.TestPlugin.class);
}

@Override
protected Settings featureFlagSettings() {
Settings.Builder featureSettings = Settings.builder();
featureSettings.put(FeatureFlags.TIERED_REMOTE_INDEX, true);
return featureSettings.build();
}

@Before
public void setup() {
internalCluster().startClusterManagerOnlyNode();
}

public void testTieringBasic() {
final int numReplicasIndex = 0;
internalCluster().ensureAtLeastNumDataNodes(1);
final Settings settings = Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, numReplicasIndex)
.put(IndexModule.INDEX_STORE_LOCALITY_SETTING.getKey(), IndexModule.DataLocalityType.FULL.name())
.build();

String[] indices = new String[] { TEST_IDX_1, TEST_IDX_2 };
for (String index : indices) {
assertAcked(client().admin().indices().prepareCreate(index).setSettings(settings).get());
ensureGreen(index);
// Ingesting some docs
indexBulk(index, NUM_DOCS_IN_BULK);
flushAndRefresh(index);
// ensuring cluster is green after performing force-merge
ensureGreen();
SearchResponse searchResponse = client().prepareSearch(index).setQuery(QueryBuilders.matchAllQuery()).get();
// Asserting that search returns same number of docs as ingested
assertHitCount(searchResponse, NUM_DOCS_IN_BULK);
}

// Spin up node having search role
internalCluster().ensureAtLeastNumSearchAndDataNodes(1);

final MockInternalClusterInfoService clusterInfoService = getMockInternalClusterInfoService();
clusterInfoService.setDiskUsageFunctionAndRefresh(
(discoveryNode, fsInfoPath) -> setDiskUsage(fsInfoPath, TOTAL_SPACE_BYTES, TOTAL_SPACE_BYTES)
);

TieringIndexRequest request = new TieringIndexRequest(TARGET_TIER, indices);
request.waitForCompletion(true);
HotToWarmTieringResponse response = client().admin().indices().execute(HotToWarmTieringAction.INSTANCE, request).actionGet();
assertAcked(response);
assertTrue(response.getFailedIndices().isEmpty());
assertTrue(response.isAcknowledged());
ensureGreen();
for (String index : indices) {
SearchResponse searchResponse = client().prepareSearch(index).setQuery(QueryBuilders.matchAllQuery()).get();
// Asserting that search returns same number of docs as ingested
assertHitCount(searchResponse, NUM_DOCS_IN_BULK);
GetIndexResponse getIndexResponse = client().admin().indices().prepareGetIndex().addIndices(index).get();
assertWarmSettings(getIndexResponse, index);
assertAcked(client().admin().indices().prepareDelete(index).get());
}
}

private MockInternalClusterInfoService getMockInternalClusterInfoService() {
return (MockInternalClusterInfoService) internalCluster().getCurrentClusterManagerNodeInstance(ClusterInfoService.class);
}

private static FsInfo.Path setDiskUsage(FsInfo.Path original, long totalBytes, long freeBytes) {
return new FsInfo.Path(original.getPath(), original.getMount(), totalBytes, freeBytes, freeBytes);
}

private void assertWarmSettings(GetIndexResponse response, String indexName) {
final Map<String, Settings> settings = response.settings();
assertThat(settings, notNullValue());
assertThat(settings.size(), equalTo(1));
Settings indexSettings = settings.get(indexName);
assertThat(indexSettings, notNullValue());
assertThat(
indexSettings.get(IndexModule.INDEX_STORE_LOCALITY_SETTING.getKey()),
equalTo(IndexModule.DataLocalityType.PARTIAL.name())
);
assertThat(indexSettings.get(IndexModule.INDEX_TIERING_STATE.getKey()), equalTo(IndexModule.TieringState.WARM.name()));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.admin.indices.tiering;

import org.opensearch.common.UUIDs;
import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.index.Index;

import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;

/**
* Context class to hold indices to be tiered per request. It also holds
* the listener per request to mark the request as complete once all
* tiering operations are completed.
*
* @opensearch.experimental
*/

@ExperimentalApi
public class TieringRequestContext {
private final String requestUuid;
private final TieringIndexRequest request;
private final ActionListener<HotToWarmTieringResponse> actionListener;
private final Set<Index> inProgressIndices;
private final Set<Index> successfulIndices;
private final Map<Index, String> failedIndices;

public TieringRequestContext(
TieringIndexRequest request,
ActionListener<HotToWarmTieringResponse> actionListener,
Set<Index> inProgressIndices,
Map<Index, String> failedIndices
) {
this.request = request;
this.actionListener = actionListener;
this.inProgressIndices = inProgressIndices;
this.failedIndices = failedIndices;
this.requestUuid = UUIDs.randomBase64UUID();
this.successfulIndices = new HashSet<>();
}

public ActionListener<HotToWarmTieringResponse> getListener() {
return actionListener;
}

public TieringIndexRequest getRequest() {
return request;
}

public String getRequestUuid() {
return requestUuid;
}

public Set<Index> getInProgressIndices() {
return inProgressIndices;
}

public Map<Index, String> getFailedIndices() {
return failedIndices;
}

public Set<Index> getSuccessfulIndices() {
return successfulIndices;
}

public void addToFailed(Index index, String reason) {
inProgressIndices.remove(index);
failedIndices.put(index, reason);
}

public void addToSuccessful(Index index) {
inProgressIndices.remove(index);
successfulIndices.add(index);
}

public boolean isRequestProcessingComplete() {
return inProgressIndices.isEmpty();
}

public HotToWarmTieringResponse constructHotToWarmTieringResponse() {
final List<HotToWarmTieringResponse.IndexResult> indicesResult = new LinkedList<>();
for (Map.Entry<Index, String> rejectedIndex : failedIndices.entrySet()) {
indicesResult.add(new HotToWarmTieringResponse.IndexResult(rejectedIndex.getKey().getName(), rejectedIndex.getValue()));
}
return new HotToWarmTieringResponse(successfulIndices.size() > 0, indicesResult);
}

@Override
public String toString() {
return "TieringRequestContext{"
+ "requestUuid='"
+ requestUuid
+ '\''
+ ", inProgressIndices="
+ inProgressIndices
+ ", successfulIndices="
+ successfulIndices
+ ", failedIndices="
+ failedIndices
+ '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.index.Index;
import org.opensearch.indices.tiering.HotToWarmTieringService;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportService;

Expand All @@ -45,6 +46,8 @@ public class TransportHotToWarmTieringAction extends TransportClusterManagerNode
private final ClusterInfoService clusterInfoService;
private final DiskThresholdSettings diskThresholdSettings;

private final HotToWarmTieringService hotToWarmTieringService;

@Inject
public TransportHotToWarmTieringAction(
TransportService transportService,
Expand All @@ -53,7 +56,8 @@ public TransportHotToWarmTieringAction(
ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver,
ClusterInfoService clusterInfoService,
Settings settings
Settings settings,
HotToWarmTieringService hotToWarmTieringService
) {
super(
HotToWarmTieringAction.NAME,
Expand All @@ -66,6 +70,7 @@ public TransportHotToWarmTieringAction(
);
this.clusterInfoService = clusterInfoService;
this.diskThresholdSettings = new DiskThresholdSettings(settings, clusterService.getClusterSettings());
this.hotToWarmTieringService = hotToWarmTieringService;
}

@Override
Expand Down Expand Up @@ -106,5 +111,12 @@ protected void clusterManagerOperation(
listener.onResponse(tieringValidationResult.constructResponse());
return;
}
TieringRequestContext tieringRequestContext = new TieringRequestContext(
request,
listener,
tieringValidationResult.getAcceptedIndices(),
tieringValidationResult.getRejectedIndices()
);
hotToWarmTieringService.tier(tieringRequestContext, listener);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -637,6 +637,7 @@ public static APIBlock readFrom(StreamInput input) throws IOException {
static final String KEY_SYSTEM = "system";
public static final String KEY_PRIMARY_TERMS = "primary_terms";
public static final String REMOTE_STORE_CUSTOM_KEY = "remote_store";
public static final String TIERING_CUSTOM_KEY = "tiering";
public static final String TRANSLOG_METADATA_KEY = "translog_metadata";

public static final String INDEX_STATE_FILE_PREFIX = "state-";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.routing.allocation.RoutingAllocation;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.index.IndexModule;

/**
* {@link RoutingPool} defines the different node types based on the assigned capabilities. The methods
Expand Down Expand Up @@ -58,6 +60,11 @@ public static RoutingPool getShardPool(ShardRouting shard, RoutingAllocation all
* @return {@link RoutingPool} for the given index.
*/
public static RoutingPool getIndexPool(IndexMetadata indexMetadata) {
return indexMetadata.isRemoteSnapshot() ? REMOTE_CAPABLE : LOCAL_ONLY;
return indexMetadata.isRemoteSnapshot()
|| (FeatureFlags.isEnabled(FeatureFlags.TIERED_REMOTE_INDEX)
&& IndexModule.DataLocalityType.PARTIAL.name()
.equals(indexMetadata.getSettings().get(IndexModule.INDEX_STORE_LOCALITY_SETTING.getKey())))
? REMOTE_CAPABLE
: LOCAL_ONLY;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Setting.Property;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;

import java.util.HashMap;
import java.util.HashSet;
Expand Down Expand Up @@ -284,6 +285,9 @@ public void allocate(RoutingAllocation allocation) {
preferPrimaryShardBalance,
preferPrimaryShardRebalance
);
if (FeatureFlags.isEnabled(FeatureFlags.TIERED_REMOTE_INDEX)) {
localShardsBalancer.tierShards();
}
localShardsBalancer.allocateUnassigned();
localShardsBalancer.moveShards();
localShardsBalancer.balance();
Expand Down
Loading

0 comments on commit fde4f91

Please sign in to comment.