Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remote state download #143

Open
wants to merge 21 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -162,4 +162,14 @@ public String toString() {
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
return builder.value(toString());
}

public static TransportAddress fromString(String address) throws UnknownHostException {
String[] addressSplit = address.split(":");
if (addressSplit.length != 2) {
throw new IllegalArgumentException("address must be of the form [hostname/ip]:[port]");
}
String hostname = addressSplit[0];
int port = Integer.parseInt(addressSplit[1]);
return new TransportAddress(InetAddress.getByName(hostname), port);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.upgrades;

import org.opensearch.client.Request;
import org.opensearch.client.Response;

import java.util.Map;

public class ClusterStateIT extends AbstractRollingTestCase{
public void testTemplateMetadataUpgrades() throws Exception {
if (CLUSTER_TYPE == ClusterType.OLD) {
String templateName = "my_template";
Request putIndexTemplate = new Request("PUT", "_template/" + templateName);
putIndexTemplate.setJsonEntity("{\"index_patterns\": [\"pattern-1\", \"log-*\"]}");
client().performRequest(putIndexTemplate);
verifyTemplateMetadataInClusterState();
} else {
verifyTemplateMetadataInClusterState();
}
}

@SuppressWarnings("unchecked")
private static void verifyTemplateMetadataInClusterState() throws Exception {
Request request = new Request("GET", "_cluster/state/metadata");
Response response = client().performRequest(request);
assertOK(response);
Map<String, Object> metadata = (Map<String, Object>) entityAsMap(response).get("metadata");
assertNotNull(metadata.get("templates"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,24 @@
import org.opensearch.repositories.blobstore.BlobStoreRepository;
import org.opensearch.test.OpenSearchIntegTestCase;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;

import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS;
import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_CLUSTER_STATE_CLEANUP_INTERVAL_SETTING;
import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING;
import static org.opensearch.gateway.remote.RemoteClusterStateService.RETAINED_MANIFESTS;
import static org.opensearch.gateway.remote.RemoteClusterStateUtils.DELIMITER;
import static org.opensearch.gateway.remote.RemoteClusterStateUtils.METADATA_FILE_PREFIX;
import static org.opensearch.gateway.remote.RemoteGlobalMetadataManager.COORDINATION_METADATA;
import static org.opensearch.gateway.remote.RemoteGlobalMetadataManager.CUSTOM_METADATA;
import static org.opensearch.gateway.remote.RemoteGlobalMetadataManager.SETTING_METADATA;
import static org.opensearch.gateway.remote.RemoteGlobalMetadataManager.TEMPLATES_METADATA;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class RemoteClusterStateServiceIT extends RemoteStoreBaseIntegTestCase {
Expand Down Expand Up @@ -55,27 +66,59 @@ private Map<String, Long> initialTestSetup(int shardCount, int replicaCount, int
return indexStats;
}

public void testFullClusterRestoreStaleDelete() throws Exception {
public void testRemoteCleanupTaskUpdated() {
int shardCount = randomIntBetween(1, 2);
int replicaCount = 1;
int dataNodeCount = shardCount * (replicaCount + 1);
int clusterManagerNodeCount = 1;

initialTestSetup(shardCount, replicaCount, dataNodeCount, clusterManagerNodeCount);
setReplicaCount(0);
setReplicaCount(2);
setReplicaCount(0);
setReplicaCount(1);
setReplicaCount(0);
setReplicaCount(1);
setReplicaCount(0);
setReplicaCount(2);
setReplicaCount(0);
RemoteClusterStateService remoteClusterStateService = internalCluster().getClusterManagerNodeInstance(
RemoteClusterStateService.class
);

assertEquals(5, remoteClusterStateService.getStaleFileDeletionTask().getInterval().getMinutes());
assertTrue(remoteClusterStateService.getStaleFileDeletionTask().isScheduled());

// now disable
client().admin()
.cluster()
.prepareUpdateSettings()
.setPersistentSettings(Settings.builder().put(REMOTE_CLUSTER_STATE_CLEANUP_INTERVAL_SETTING.getKey(), -1))
.get();

assertEquals(-1, remoteClusterStateService.getStaleFileDeletionTask().getInterval().getMillis());
assertFalse(remoteClusterStateService.getStaleFileDeletionTask().isScheduled());

// now set Clean up interval to 1 min
client().admin()
.cluster()
.prepareUpdateSettings()
.setPersistentSettings(Settings.builder().put(REMOTE_CLUSTER_STATE_CLEANUP_INTERVAL_SETTING.getKey(), "1m"))
.get();
assertEquals(1, remoteClusterStateService.getStaleFileDeletionTask().getInterval().getMinutes());
}

public void testRemoteCleanupOnlyAfter10Updates() throws Exception {
int shardCount = randomIntBetween(1, 2);
int replicaCount = 1;
int dataNodeCount = shardCount * (replicaCount + 1);
int clusterManagerNodeCount = 1;

initialTestSetup(shardCount, replicaCount, dataNodeCount, clusterManagerNodeCount);
RemoteClusterStateService remoteClusterStateService = internalCluster().getClusterManagerNodeInstance(
RemoteClusterStateService.class
);

// set cleanup interval to 1 min
client().admin()
.cluster()
.prepareUpdateSettings()
.setPersistentSettings(Settings.builder().put(REMOTE_CLUSTER_STATE_CLEANUP_INTERVAL_SETTING.getKey(), "1m"))
.get();

replicaCount = updateReplicaCountNTimes(9, replicaCount);

RepositoriesService repositoriesService = internalCluster().getClusterManagerNodeInstance(RepositoriesService.class);

BlobStoreRepository repository = (BlobStoreRepository) repositoriesService.repository(REPOSITORY_NAME);
Expand All @@ -87,14 +130,39 @@ public void testFullClusterRestoreStaleDelete() throws Exception {
)
.add("cluster-state")
.add(getClusterState().metadata().clusterUUID());
BlobPath manifestContainerPath = baseMetadataPath.add("manifest");

assertEquals(10, repository.blobStore().blobContainer(baseMetadataPath.add("manifest")).listBlobsByPrefix("manifest").size());
assertBusy(() -> {
assertEquals(
RETAINED_MANIFESTS - 1,
repository.blobStore().blobContainer(manifestContainerPath).listBlobsByPrefix("manifest").size()
);
}, 1, TimeUnit.MINUTES);

replicaCount = updateReplicaCountNTimes(8, replicaCount);

// wait for 1 min, to ensure that clean up task ran and didn't clean up stale files because it was less than 10
Thread.sleep(60000);
assertNotEquals(
RETAINED_MANIFESTS - 1,
repository.blobStore().blobContainer(manifestContainerPath).listBlobsByPrefix("manifest").size()
);

// Do 2 more updates, now since the total successful state changes are more than 10, stale files will be cleaned up
replicaCount = updateReplicaCountNTimes(2, replicaCount);

assertBusy(() -> {
assertEquals(
RETAINED_MANIFESTS - 1,
repository.blobStore().blobContainer(manifestContainerPath).listBlobsByPrefix("manifest").size()
);
}, 1, TimeUnit.MINUTES);

Map<String, IndexMetadata> indexMetadataMap = remoteClusterStateService.getLatestClusterState(
cluster().getClusterName(),
getClusterState().metadata().clusterUUID()
).getMetadata().getIndices();
assertEquals(0, indexMetadataMap.values().stream().findFirst().get().getNumberOfReplicas());
assertEquals(replicaCount, indexMetadataMap.values().stream().findFirst().get().getNumberOfReplicas());
assertEquals(shardCount, indexMetadataMap.values().stream().findFirst().get().getNumberOfShards());
}

Expand Down Expand Up @@ -181,6 +249,45 @@ public void testRemoteStateStatsFromAllNodes() {
}
}

public void testRemoteClusterStateMetadataSplit() throws IOException {
initialTestSetup(1, 0, 1, 1);

RemoteClusterStateService remoteClusterStateService = internalCluster().getClusterManagerNodeInstance(
RemoteClusterStateService.class
);
RepositoriesService repositoriesService = internalCluster().getClusterManagerNodeInstance(RepositoriesService.class);
BlobStoreRepository repository = (BlobStoreRepository) repositoriesService.repository(REPOSITORY_NAME);
BlobPath globalMetadataPath = repository.basePath()
.add(
Base64.getUrlEncoder()
.withoutPadding()
.encodeToString(getClusterState().getClusterName().value().getBytes(StandardCharsets.UTF_8))
)
.add("cluster-state")
.add(getClusterState().metadata().clusterUUID())
.add("global-metadata");

Map<String, Integer> metadataFiles = repository.blobStore()
.blobContainer(globalMetadataPath)
.listBlobs()
.keySet()
.stream()
.map(fileName -> {
logger.info(fileName);
return fileName.split(DELIMITER)[0];
})
.collect(Collectors.toMap(Function.identity(), key -> 1, Integer::sum));

assertTrue(metadataFiles.containsKey(COORDINATION_METADATA));
assertEquals(1, (int) metadataFiles.get(COORDINATION_METADATA));
assertTrue(metadataFiles.containsKey(SETTING_METADATA));
assertEquals(1, (int) metadataFiles.get(SETTING_METADATA));
assertTrue(metadataFiles.containsKey(TEMPLATES_METADATA));
assertEquals(1, (int) metadataFiles.get(TEMPLATES_METADATA));
assertTrue(metadataFiles.keySet().stream().anyMatch(key -> key.startsWith(CUSTOM_METADATA)));
assertFalse(metadataFiles.containsKey(METADATA_FILE_PREFIX));
}

private void validateNodesStatsResponse(NodesStatsResponse nodesStatsResponse) {
// _nodes/stats/discovery must never fail due to any exception
assertFalse(nodesStatsResponse.toString().contains("exception"));
Expand All @@ -196,4 +303,17 @@ private void setReplicaCount(int replicaCount) {
.setSettings(Settings.builder().put(SETTING_NUMBER_OF_REPLICAS, replicaCount))
.get();
}

private int updateReplicaCountNTimes(int n, int initialCount) {
int newReplicaCount = randomIntBetween(0, 3);
;
for (int i = 0; i < n; i++) {
while (newReplicaCount == initialCount) {
newReplicaCount = randomIntBetween(0, 3);
}
setReplicaCount(newReplicaCount);
initialCount = newReplicaCount;
}
return newReplicaCount;
}
}
Loading
Loading