Skip to content

Commit

Permalink
Move cluster state publication to remote
Browse files Browse the repository at this point in the history
Signed-off-by: Arpit Bandejiya <abandeji@amazon.com>
  • Loading branch information
Arpit-Bandejiya committed May 24, 2024
1 parent d499a74 commit baf7960
Show file tree
Hide file tree
Showing 70 changed files with 7,244 additions and 1,347 deletions.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -162,4 +162,14 @@ public String toString() {
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
return builder.value(toString());
}

public static TransportAddress fromString(String address) throws UnknownHostException {
String[] addressSplit = address.split(":");
if (addressSplit.length != 2) {
throw new IllegalArgumentException("address must be of the form [hostname/ip]:[port]");
}
String hostname = addressSplit[0];
int port = Integer.parseInt(addressSplit[1]);
return new TransportAddress(InetAddress.getByName(hostname), port);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@
import org.opensearch.core.xcontent.XContentParser.Token;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.function.Consumer;

Expand Down Expand Up @@ -178,4 +180,14 @@ public static <T> void parseTypedKeysObject(XContentParser parser, String delimi
throw new ParsingException(parser.getTokenLocation(), "Failed to parse object: empty key");
}
}

public static List<String> parseStringList(XContentParser parser) throws IOException {
List<String> valueList = new ArrayList<>();
ensureExpectedToken(Token.START_ARRAY, parser.currentToken(), parser);
while (parser.nextToken() != Token.END_ARRAY) {
ensureExpectedToken(Token.VALUE_STRING, parser.currentToken(), parser);
valueList.add(parser.text());
}
return valueList;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.gateway.remote;

import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.settings.Settings;
import org.opensearch.remotestore.RemoteStoreBaseIntegTestCase;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.blobstore.BlobStoreRepository;
import org.opensearch.test.OpenSearchIntegTestCase;

import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import static org.opensearch.gateway.remote.RemoteClusterStateCleanupManager.CLUSTER_STATE_CLEANUP_INTERVAL_DEFAULT;
import static org.opensearch.gateway.remote.RemoteClusterStateCleanupManager.REMOTE_CLUSTER_STATE_CLEANUP_INTERVAL_SETTING;
import static org.opensearch.gateway.remote.RemoteClusterStateCleanupManager.RETAINED_MANIFESTS;
import static org.opensearch.gateway.remote.RemoteClusterStateCleanupManager.SKIP_CLEANUP_STATE_CHANGES;
import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING;
import static org.opensearch.indices.IndicesService.CLUSTER_DEFAULT_INDEX_REFRESH_INTERVAL_SETTING;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class RemoteClusterStateCleanupManagerIT extends RemoteStoreBaseIntegTestCase {

private static final String INDEX_NAME = "test-index";

@Override
protected Settings nodeSettings(int nodeOrdinal) {
return Settings.builder().put(super.nodeSettings(nodeOrdinal)).put(REMOTE_CLUSTER_STATE_ENABLED_SETTING.getKey(), true).build();
}

private Map<String, Long> initialTestSetup(int shardCount, int replicaCount, int dataNodeCount, int clusterManagerNodeCount) {
prepareCluster(clusterManagerNodeCount, dataNodeCount, INDEX_NAME, replicaCount, shardCount);
Map<String, Long> indexStats = indexData(1, false, INDEX_NAME);
assertEquals(shardCount * (replicaCount + 1), getNumShards(INDEX_NAME).totalNumShards);
ensureGreen(INDEX_NAME);
return indexStats;
}

public void testRemoteCleanupTaskUpdated() {
int shardCount = randomIntBetween(1, 2);
int replicaCount = 1;
int dataNodeCount = shardCount * (replicaCount + 1);
int clusterManagerNodeCount = 1;

initialTestSetup(shardCount, replicaCount, dataNodeCount, clusterManagerNodeCount);
RemoteClusterStateCleanupManager remoteClusterStateCleanupManager = internalCluster().getClusterManagerNodeInstance(
RemoteClusterStateCleanupManager.class
);

assertEquals(CLUSTER_STATE_CLEANUP_INTERVAL_DEFAULT, remoteClusterStateCleanupManager.getStaleFileDeletionTask().getInterval());
assertTrue(remoteClusterStateCleanupManager.getStaleFileDeletionTask().isScheduled());

// now disable
client().admin()
.cluster()
.prepareUpdateSettings()
.setPersistentSettings(Settings.builder().put(REMOTE_CLUSTER_STATE_CLEANUP_INTERVAL_SETTING.getKey(), -1))
.get();

assertEquals(-1, remoteClusterStateCleanupManager.getStaleFileDeletionTask().getInterval().getMillis());
assertFalse(remoteClusterStateCleanupManager.getStaleFileDeletionTask().isScheduled());

// now set Clean up interval to 1 min
client().admin()
.cluster()
.prepareUpdateSettings()
.setPersistentSettings(Settings.builder().put(REMOTE_CLUSTER_STATE_CLEANUP_INTERVAL_SETTING.getKey(), "1m"))
.get();
assertEquals(1, remoteClusterStateCleanupManager.getStaleFileDeletionTask().getInterval().getMinutes());
}

public void testRemoteCleanupDeleteStale() throws Exception {
int shardCount = randomIntBetween(1, 2);
int replicaCount = 1;
int dataNodeCount = shardCount * (replicaCount + 1);
int clusterManagerNodeCount = 1;

initialTestSetup(shardCount, replicaCount, dataNodeCount, clusterManagerNodeCount);

// set cleanup interval to 100 ms to make the test faster
ClusterUpdateSettingsResponse response = client().admin()
.cluster()
.prepareUpdateSettings()
.setPersistentSettings(Settings.builder().put(REMOTE_CLUSTER_STATE_CLEANUP_INTERVAL_SETTING.getKey(), "100ms"))
.get();

assertTrue(response.isAcknowledged());

// update cluster state 21 times to ensure that clean up has run after this will upload 42 manifest files
// to repository, if manifest files are less than that it means clean up has run
updateClusterStateNTimes(RETAINED_MANIFESTS + SKIP_CLEANUP_STATE_CHANGES + 1);

RepositoriesService repositoriesService = internalCluster().getClusterManagerNodeInstance(RepositoriesService.class);
BlobStoreRepository repository = (BlobStoreRepository) repositoriesService.repository(REPOSITORY_NAME);
BlobPath baseMetadataPath = repository.basePath()
.add(
Base64.getUrlEncoder()
.withoutPadding()
.encodeToString(getClusterState().getClusterName().value().getBytes(StandardCharsets.UTF_8))
)
.add("cluster-state")
.add(getClusterState().metadata().clusterUUID());
BlobPath manifestContainerPath = baseMetadataPath.add("manifest");

assertBusy(() -> {
int manifestFiles = repository.blobStore().blobContainer(manifestContainerPath).listBlobsByPrefix("manifest").size();
logger.info("number of current manifest file: {}", manifestFiles);
// we can't guarantee that we have same number of manifest as Retained manifest in our repo as there can be other queued task
// other than replica count change which can upload new manifest files, that's why we check that number of manifests is between
// Retained manifests and Retained manifests + 2 * Skip cleanup state changes (each cluster state update uploads 2 manifests)
assertTrue(
"Current number of manifest files: " + manifestFiles,
manifestFiles >= RETAINED_MANIFESTS && manifestFiles < RETAINED_MANIFESTS + 2 * SKIP_CLEANUP_STATE_CHANGES
);
}, 500, TimeUnit.MILLISECONDS);
}

private void updateClusterStateNTimes(int n) {
int newReplicaCount = randomIntBetween(0, 3);
for (int i = n; i > 0; i--) {
ClusterUpdateSettingsResponse response = client().admin()
.cluster()
.prepareUpdateSettings()
.setPersistentSettings(Settings.builder().put(CLUSTER_DEFAULT_INDEX_REFRESH_INTERVAL_SETTING.getKey(), i, TimeUnit.SECONDS))
.get();
assertTrue(response.isAcknowledged());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -311,4 +311,14 @@ protected void restore(boolean restoreAllShards, String... indices) {
PlainActionFuture.newFuture()
);
}

protected void prepareCluster(int numClusterManagerNodes, int numDataOnlyNodes, String indices, int replicaCount, int shardCount) {
internalCluster().startClusterManagerOnlyNodes(numClusterManagerNodes);
internalCluster().startDataOnlyNodes(numDataOnlyNodes);
for (String index : indices.split(",")) {
createIndex(index, remoteStoreIndexSettings(replicaCount, shardCount));
ensureYellowAndNoInitializingShards(index);
ensureGreen(index);
}
}
}
30 changes: 2 additions & 28 deletions server/src/main/java/org/opensearch/cluster/ClusterState.java
Original file line number Diff line number Diff line change
Expand Up @@ -496,38 +496,12 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
}

if (metrics.contains(Metric.BLOCKS)) {
builder.startObject("blocks");

if (blocks().global().isEmpty() == false) {
builder.startObject("global");
for (ClusterBlock block : blocks().global()) {
block.toXContent(builder, params);
}
builder.endObject();
}

if (blocks().indices().isEmpty() == false) {
builder.startObject("indices");
for (final Map.Entry<String, Set<ClusterBlock>> entry : blocks().indices().entrySet()) {
builder.startObject(entry.getKey());
for (ClusterBlock block : entry.getValue()) {
block.toXContent(builder, params);
}
builder.endObject();
}
builder.endObject();
}

builder.endObject();
blocks().toXContent(builder, params);
}

// nodes
if (metrics.contains(Metric.NODES)) {
builder.startObject("nodes");
for (DiscoveryNode node : nodes) {
node.toXContent(builder, params);
}
builder.endObject();
nodes.toXContent(builder, params);
}

// meta data
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,15 @@

import org.opensearch.LegacyESVersion;
import org.opensearch.Version;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.core.common.Strings;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.common.io.stream.Writeable;
import org.opensearch.core.xcontent.MediaTypeRegistry;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.core.xcontent.XContentParserUtils;
import org.opensearch.repositories.RepositoryOperation;

import java.io.IOException;
Expand Down Expand Up @@ -101,13 +104,46 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.startObject();
{
builder.field("repository", entry.repository);
if (params.param(Metadata.CONTEXT_MODE_PARAM, Metadata.CONTEXT_MODE_API).equals(Metadata.CONTEXT_MODE_GATEWAY)) {
builder.field("repository_state_id", entry.repositoryStateId);
} // else we don't serialize it
}
builder.endObject();
}
builder.endArray();
return builder;
}

public static RepositoryCleanupInProgress fromXContent(XContentParser parser) throws IOException {
if (parser.currentToken() == null) {
parser.nextToken();
}
XContentParserUtils.ensureFieldName(parser, parser.currentToken(), TYPE);
parser.nextToken();
XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_ARRAY, parser.currentToken(), parser);
List<Entry> entries = new ArrayList<>();
while (parser.nextToken() != XContentParser.Token.END_ARRAY) {
XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser);
String repository = null;
long repositoryStateId = -1L;
while (parser.nextToken() != XContentParser.Token.END_OBJECT) {
XContentParserUtils.ensureExpectedToken(XContentParser.Token.FIELD_NAME, parser.currentToken(), parser);
String currentFieldName = parser.currentName();
parser.nextToken();
if ("repository".equals(currentFieldName)) {
repository = parser.text();
} else if ("repository_state_id".equals(currentFieldName)) {
// only XContent parsed with {@link Metadata.CONTEXT_MODE_GATEWAY} will have the repository state id and can be deserialized
repositoryStateId = parser.longValue();
} else {
throw new IllegalArgumentException("unknown field [" + currentFieldName + "]");
}
}
entries.add(new Entry(repository, repositoryStateId));
}
return new RepositoryCleanupInProgress(entries);
}

@Override
public String toString() {
return Strings.toString(MediaTypeRegistry.JSON, this);
Expand Down
Loading

0 comments on commit baf7960

Please sign in to comment.