Skip to content

Commit

Permalink
Merge branch 'main' into alloc-routing
Browse files Browse the repository at this point in the history
  • Loading branch information
mch2 committed Sep 2, 2024
2 parents 5e08f77 + 7247266 commit bb19dc2
Show file tree
Hide file tree
Showing 77 changed files with 4,909 additions and 1,505 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [Reader Writer Separation] Add searchOnly replica routing configuration ([#15410](https://github.com/opensearch-project/OpenSearch/pull/15410))
- [Workload Management] Add query group level failure tracking ([#15227](https://github.com/opensearch-project/OpenSearch/pull/15527))
- Add support to upload snapshot shard blobs with hashed prefix ([#15426](https://github.com/opensearch-project/OpenSearch/pull/15426))
- [Remote Publication] Add remote download stats ([#15291](https://github.com/opensearch-project/OpenSearch/pull/15291))
- [Reader Writer Separation] Add routing preference for search replicas ([#15563](https://github.com/opensearch-project/OpenSearch/pull/15563))

### Dependencies
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.util.Base64;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

Expand All @@ -40,6 +41,7 @@
import static org.opensearch.gateway.remote.RemoteClusterStateCleanupManager.RETAINED_MANIFESTS;
import static org.opensearch.gateway.remote.RemoteClusterStateCleanupManager.SKIP_CLEANUP_STATE_CHANGES;
import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING;
import static org.opensearch.gateway.remote.RemoteUploadStats.REMOTE_UPLOAD;
import static org.opensearch.gateway.remote.routingtable.RemoteIndexRoutingTable.INDEX_ROUTING_TABLE;
import static org.opensearch.indices.IndicesService.CLUSTER_DEFAULT_INDEX_REFRESH_INTERVAL_SETTING;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY;
Expand Down Expand Up @@ -253,11 +255,13 @@ private void verifyIndexRoutingFilesDeletion(
DiscoveryStats discoveryStats = nodesStatsResponse.getNodes().get(0).getDiscoveryStats();
assertNotNull(discoveryStats.getClusterStateStats());
for (PersistedStateStats persistedStateStats : discoveryStats.getClusterStateStats().getPersistenceStats()) {
Map<String, AtomicLong> extendedFields = persistedStateStats.getExtendedFields();
assertTrue(extendedFields.containsKey(RemotePersistenceStats.INDEX_ROUTING_FILES_CLEANUP_ATTEMPT_FAILED_COUNT));
long cleanupAttemptFailedCount = extendedFields.get(RemotePersistenceStats.INDEX_ROUTING_FILES_CLEANUP_ATTEMPT_FAILED_COUNT)
.get();
assertEquals(0, cleanupAttemptFailedCount);
if (Objects.equals(persistedStateStats.getStatsName(), REMOTE_UPLOAD)) {
Map<String, AtomicLong> extendedFields = persistedStateStats.getExtendedFields();
assertTrue(extendedFields.containsKey(RemoteUploadStats.INDEX_ROUTING_FILES_CLEANUP_ATTEMPT_FAILED_COUNT));
long cleanupAttemptFailedCount = extendedFields.get(RemoteUploadStats.INDEX_ROUTING_FILES_CLEANUP_ATTEMPT_FAILED_COUNT)
.get();
assertEquals(0, cleanupAttemptFailedCount);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.opensearch.test.OpenSearchIntegTestCase;
import org.junit.Before;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.util.ArrayList;
Expand All @@ -46,7 +47,7 @@
public class RemoteRoutingTableServiceIT extends RemoteStoreBaseIntegTestCase {
private static final String INDEX_NAME = "test-index";
private static final String INDEX_NAME_1 = "test-index-1";
BlobPath indexRoutingPath;
List<BlobPath> indexRoutingPaths;
AtomicInteger indexRoutingFiles = new AtomicInteger();
private final RemoteStoreEnums.PathType pathType = RemoteStoreEnums.PathType.HASHED_PREFIX;

Expand Down Expand Up @@ -91,7 +92,7 @@ public void testRemoteRoutingTableIndexLifecycle() throws Exception {
updateIndexSettings(INDEX_NAME, IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 2);
ensureGreen(INDEX_NAME);
assertBusy(() -> {
int indexRoutingFilesAfterUpdate = repository.blobStore().blobContainer(indexRoutingPath).listBlobs().size();
int indexRoutingFilesAfterUpdate = repository.blobStore().blobContainer(indexRoutingPaths.get(0)).listBlobs().size();
// At-least 3 new index routing files will be created as shards will transition from INIT -> UNASSIGNED -> STARTED state
assertTrue(indexRoutingFilesAfterUpdate >= indexRoutingFiles.get() + 3);
});
Expand All @@ -112,6 +113,47 @@ public void testRemoteRoutingTableIndexLifecycle() throws Exception {
assertTrue(areRoutingTablesSame(routingTableVersions));
}

/**
 * Verifies that the remote routing table and the latest cluster metadata manifest stay in sync
 * when a second index is created.
 *
 * <p>The test first checks the state produced by {@code prepareClusterAndVerifyRepository()}
 * (one index routing entry in the manifest, {@code INDEX_NAME} in every node's routing table),
 * then creates {@code INDEX_NAME_1} and repeats the checks expecting two entries.
 *
 * @throws Exception if cluster preparation or any of the verification steps fails
 */
public void testRemoteRoutingTableWithMultipleIndex() throws Exception {
    BlobStoreRepository repository = prepareClusterAndVerifyRepository();

    RemoteClusterStateService remoteClusterStateService = internalCluster().getClusterManagerNodeInstance(
        RemoteClusterStateService.class
    );
    RemoteManifestManager remoteManifestManager = remoteClusterStateService.getRemoteManifestManager();
    Optional<ClusterMetadataManifest> latestManifest = remoteManifestManager.getLatestClusterMetadataManifest(
        getClusterState().getClusterName().value(),
        getClusterState().getMetadata().clusterUUID()
    );
    List<String> expectedIndexNames = new ArrayList<>();
    List<String> deletedIndexNames = new ArrayList<>();
    // Exactly one index routing entry is expected in the manifest before the second index exists
    verifyUpdatesInManifestFile(latestManifest, expectedIndexNames, 1, deletedIndexNames, true);

    List<RoutingTable> routingTables = getRoutingTableFromAllNodes();
    // Verify indices in routing table (expected value first so failure messages read correctly)
    Set<String> expectedIndicesInRoutingTable = Set.of(INDEX_NAME);
    assertEquals(expectedIndicesInRoutingTable, routingTables.get(0).getIndicesRouting().keySet());
    // Verify routing table across all nodes is equal
    assertTrue(areRoutingTablesSame(routingTables));

    // Create new index
    createIndex(INDEX_NAME_1, remoteStoreIndexSettings(1, 5));
    ensureGreen(INDEX_NAME_1);

    latestManifest = remoteManifestManager.getLatestClusterMetadataManifest(
        getClusterState().getClusterName().value(),
        getClusterState().getMetadata().clusterUUID()
    );

    // Refresh the tracked routing paths so the manifest check also recognises the new index's files
    updateIndexRoutingPaths(repository);
    verifyUpdatesInManifestFile(latestManifest, expectedIndexNames, 2, deletedIndexNames, true);
    routingTables = getRoutingTableFromAllNodes();
    // Verify indices in routing table
    expectedIndicesInRoutingTable = Set.of(INDEX_NAME, INDEX_NAME_1);
    assertEquals(expectedIndicesInRoutingTable, routingTables.get(0).getIndicesRouting().keySet());
    // Verify routing table across all nodes is equal
    assertTrue(areRoutingTablesSame(routingTables));
}

public void testRemoteRoutingTableEmptyRoutingTableDiff() throws Exception {
prepareClusterAndVerifyRepository();

Expand Down Expand Up @@ -166,7 +208,7 @@ public void testRemoteRoutingTableIndexNodeRestart() throws Exception {
assertRemoteStoreRepositoryOnAllNodes(REMOTE_ROUTING_TABLE_REPO);

assertBusy(() -> {
int indexRoutingFilesAfterNodeDrop = repository.blobStore().blobContainer(indexRoutingPath).listBlobs().size();
int indexRoutingFilesAfterNodeDrop = repository.blobStore().blobContainer(indexRoutingPaths.get(0)).listBlobs().size();
assertTrue(indexRoutingFilesAfterNodeDrop > indexRoutingFiles.get());
});

Expand Down Expand Up @@ -201,7 +243,7 @@ public void testRemoteRoutingTableIndexMasterRestart() throws Exception {
assertRemoteStoreRepositoryOnAllNodes(REMOTE_ROUTING_TABLE_REPO);

assertBusy(() -> {
int indexRoutingFilesAfterNodeDrop = repository.blobStore().blobContainer(indexRoutingPath).listBlobs().size();
int indexRoutingFilesAfterNodeDrop = repository.blobStore().blobContainer(indexRoutingPaths.get(0)).listBlobs().size();
assertTrue(indexRoutingFilesAfterNodeDrop > indexRoutingFiles.get());
});

Expand Down Expand Up @@ -240,10 +282,14 @@ private BlobStoreRepository prepareClusterAndVerifyRepository() throws Exception

BlobPath baseMetadataPath = getBaseMetadataPath(repository);
List<IndexRoutingTable> indexRoutingTables = new ArrayList<>(getClusterState().routingTable().indicesRouting().values());
indexRoutingPath = getIndexRoutingPath(baseMetadataPath.add(INDEX_ROUTING_TABLE), indexRoutingTables.get(0).getIndex().getUUID());
indexRoutingPaths = new ArrayList<>();
for (IndexRoutingTable indexRoutingTable : indexRoutingTables) {
indexRoutingPaths.add(getIndexRoutingPath(baseMetadataPath.add(INDEX_ROUTING_TABLE), indexRoutingTable.getIndex().getUUID()));
}

assertBusy(() -> {
indexRoutingFiles.set(repository.blobStore().blobContainer(indexRoutingPath).listBlobs().size());
int totalRoutingFiles = calculateTotalRoutingFiles(repository);
indexRoutingFiles.set(totalRoutingFiles);
// There would be >=3 files as shards will transition from UNASSIGNED -> INIT -> STARTED state
assertTrue(indexRoutingFiles.get() >= 3);
});
Expand Down Expand Up @@ -280,11 +326,19 @@ private void verifyUpdatesInManifestFile(
assertTrue(latestManifest.isPresent());
ClusterMetadataManifest manifest = latestManifest.get();

assertEquals(expectedIndexNames, manifest.getDiffManifest().getIndicesRoutingUpdated());
assertEquals(expectedDeletedIndex, manifest.getDiffManifest().getIndicesDeleted());
assertEquals(expectedIndicesRoutingFilesInManifest, manifest.getIndicesRouting().size());

// Check if all paths in manifest.getIndicesRouting() are present in indexRoutingPaths
for (ClusterMetadataManifest.UploadedIndexMetadata uploadedFilename : manifest.getIndicesRouting()) {
assertTrue(uploadedFilename.getUploadedFilename().contains(indexRoutingPath.buildAsString()));
boolean pathFound = false;
for (BlobPath indexRoutingPath : indexRoutingPaths) {
if (uploadedFilename.getUploadedFilename().contains(indexRoutingPath.buildAsString())) {
pathFound = true;
break;
}
}
assertTrue("Uploaded file not found in indexRoutingPaths: " + uploadedFilename.getUploadedFilename(), pathFound);
}
assertEquals(isRoutingTableDiffFileExpected, manifest.getDiffManifest().getIndicesRoutingDiffPath() != null);
}
Expand All @@ -305,6 +359,24 @@ private List<RoutingTable> getRoutingTableFromAllNodes() throws ExecutionExcepti
return routingTables;
}

/**
 * Rebuilds {@code indexRoutingPaths} from the indices present in the current cluster state's
 * routing table, replacing whatever paths were recorded before.
 *
 * @param repository repository whose base metadata path the index routing paths are derived from
 */
private void updateIndexRoutingPaths(BlobStoreRepository repository) {
    final BlobPath metadataRoot = getBaseMetadataPath(repository);
    final List<IndexRoutingTable> currentTables = new ArrayList<>(getClusterState().routingTable().indicesRouting().values());

    // Start from an empty list so paths recorded earlier do not linger
    indexRoutingPaths.clear();
    currentTables.forEach(
        table -> indexRoutingPaths.add(getIndexRoutingPath(metadataRoot.add(INDEX_ROUTING_TABLE), table.getIndex().getUUID()))
    );
}

/**
 * Sums the number of blobs listed under every path currently held in {@code indexRoutingPaths}.
 *
 * @param repository repository whose blob store is queried
 * @return total blob count across all tracked index routing paths
 * @throws IOException if listing the blobs of any path fails
 */
private int calculateTotalRoutingFiles(BlobStoreRepository repository) throws IOException {
    int blobCount = 0;
    for (BlobPath routingPath : indexRoutingPaths) {
        int blobsUnderPath = repository.blobStore().blobContainer(routingPath).listBlobs().size();
        blobCount = blobCount + blobsUnderPath;
    }
    return blobCount;
}

private boolean areRoutingTablesSame(List<RoutingTable> routingTables) {
if (routingTables == null || routingTables.isEmpty()) {
return false;
Expand Down Expand Up @@ -356,7 +428,6 @@ private void deleteIndexAndVerify(RemoteManifestManager remoteManifestManager) {
);
assertTrue(latestManifest.isPresent());
ClusterMetadataManifest manifest = latestManifest.get();
assertTrue(manifest.getDiffManifest().getIndicesRoutingUpdated().isEmpty());
assertTrue(manifest.getDiffManifest().getIndicesDeleted().contains(INDEX_NAME));
assertTrue(manifest.getIndicesRouting().isEmpty());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,15 @@

package org.opensearch.gateway.remote;

import org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest;
import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse;
import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.opensearch.action.admin.cluster.state.ClusterStateResponse;
import org.opensearch.client.Client;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.discovery.DiscoveryStats;
import org.opensearch.gateway.remote.model.RemoteClusterMetadataManifest;
import org.opensearch.indices.recovery.RecoverySettings;
import org.opensearch.remotestore.RemoteStoreBaseIntegTestCase;
Expand Down Expand Up @@ -155,6 +158,38 @@ public void testRemotePublicationDisableIfRemoteStateDisabled() {
assertNull(internalCluster().getCurrentClusterManagerNodeInstance(RemoteClusterStateService.class));
}

/**
 * Prepares a cluster with one cluster-manager node, fetches the discovery metric of the first
 * data node's stats, and checks it with {@code assertDataNodeDownloadStats}.
 */
public void testRemotePublicationDownloadStats() {
    final int shards = randomIntBetween(1, 2);
    final int replicas = 1;
    final int clusterManagerNodes = 1;
    // One data node per shard copy (primary plus replicas)
    final int dataNodes = shards * (replicas + 1);
    prepareCluster(clusterManagerNodes, dataNodes, INDEX_NAME, replicas, shards);

    String firstDataNode = internalCluster().getDataNodeNames().stream().collect(Collectors.toList()).get(0);

    // Request only the discovery metric for the chosen data node
    NodesStatsResponse dataNodeStats = client().admin()
        .cluster()
        .prepareNodesStats(firstDataNode)
        .addMetric(NodesStatsRequest.Metric.DISCOVERY.metricName())
        .get();

    assertDataNodeDownloadStats(dataNodeStats);
}

/**
 * Asserts the cluster state stats reported by the first node in the given response: no update
 * successes, and for each of the first two persistence stats entries at least one success, zero
 * failures and a positive total time.
 *
 * <p>NOTE(review): the two entries are addressed by position only; presumably they are the
 * remote download stats of a data node. Confirm the ordering guarantee against the producer.
 *
 * @param nodesStatsResponse node stats response whose first node is inspected
 */
private void assertDataNodeDownloadStats(NodesStatsResponse nodesStatsResponse) {
    DiscoveryStats discoveryStats = nodesStatsResponse.getNodes().get(0).getDiscoveryStats();
    assertNotNull(discoveryStats.getClusterStateStats());
    assertEquals(0, discoveryStats.getClusterStateStats().getUpdateSuccess());

    // Same three checks for persistence stats entries 0 and 1, in that order
    for (int statsIndex = 0; statsIndex < 2; statsIndex++) {
        assertTrue(discoveryStats.getClusterStateStats().getPersistenceStats().get(statsIndex).getSuccessCount() > 0);
        assertEquals(0, discoveryStats.getClusterStateStats().getPersistenceStats().get(statsIndex).getFailedCount());
        assertTrue(discoveryStats.getClusterStateStats().getPersistenceStats().get(statsIndex).getTotalTimeInMillis() > 0);
    }
}

private Map<String, Integer> getMetadataFiles(BlobStoreRepository repository, String subDirectory) throws IOException {
BlobPath metadataPath = repository.basePath()
.add(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -839,18 +839,13 @@ public void testCreateSnapshotV2() throws Exception {

String snapshotName2 = "test-create-snapshot2";

// verify even if waitForCompletion is not true, the request executes in a sync manner
CreateSnapshotResponse createSnapshotResponse2 = client().admin()
// verify response status if waitForCompletion is not true
RestStatus createSnapshotResponseStatus = client().admin()
.cluster()
.prepareCreateSnapshot(snapshotRepoName, snapshotName2)
.get();
snapshotInfo = createSnapshotResponse2.getSnapshotInfo();
assertThat(snapshotInfo.state(), equalTo(SnapshotState.SUCCESS));
assertThat(snapshotInfo.successfulShards(), greaterThan(0));
assertThat(snapshotInfo.successfulShards(), equalTo(snapshotInfo.totalShards()));
assertThat(snapshotInfo.snapshotId().getName(), equalTo(snapshotName2));
assertThat(snapshotInfo.getPinnedTimestamp(), greaterThan(0L));

.get()
.status();
assertEquals(RestStatus.ACCEPTED, createSnapshotResponseStatus);
}

public void testMixedSnapshotCreationWithV2RepositorySetting() throws Exception {
Expand Down Expand Up @@ -914,6 +909,7 @@ public void testMixedSnapshotCreationWithV2RepositorySetting() throws Exception
CreateSnapshotResponse createSnapshotResponse2 = client().admin()
.cluster()
.prepareCreateSnapshot(snapshotRepoName, snapshotName2)
.setWaitForCompletion(true)
.get();
snapshotInfo = createSnapshotResponse2.getSnapshotInfo();
assertThat(snapshotInfo.state(), equalTo(SnapshotState.SUCCESS));
Expand Down Expand Up @@ -968,6 +964,7 @@ public void testConcurrentSnapshotV2CreateOperation() throws InterruptedExceptio
CreateSnapshotResponse createSnapshotResponse2 = client().admin()
.cluster()
.prepareCreateSnapshot(snapshotRepoName, snapshotName)
.setWaitForCompletion(true)
.get();
SnapshotInfo snapshotInfo = createSnapshotResponse2.getSnapshotInfo();
assertThat(snapshotInfo.state(), equalTo(SnapshotState.SUCCESS));
Expand Down Expand Up @@ -1036,6 +1033,7 @@ public void testCreateSnapshotV2WithRedIndex() throws Exception {
CreateSnapshotResponse createSnapshotResponse2 = client().admin()
.cluster()
.prepareCreateSnapshot(snapshotRepoName, snapshotName1)
.setWaitForCompletion(true)
.get();
SnapshotInfo snapshotInfo = createSnapshotResponse2.getSnapshotInfo();
assertThat(snapshotInfo.state(), equalTo(SnapshotState.SUCCESS));
Expand Down Expand Up @@ -1097,6 +1095,7 @@ public void testCreateSnapshotV2WithIndexingLoad() throws Exception {
CreateSnapshotResponse createSnapshotResponse2 = client().admin()
.cluster()
.prepareCreateSnapshot(snapshotRepoName, snapshotName1)
.setWaitForCompletion(true)
.get();

SnapshotInfo snapshotInfo = createSnapshotResponse2.getSnapshotInfo();
Expand Down Expand Up @@ -1203,6 +1202,7 @@ public void testClusterManagerFailoverDuringSnapshotCreation() throws Exception
CreateSnapshotResponse createSnapshotResponse = client().admin()
.cluster()
.prepareCreateSnapshot(snapshotRepoName, snapshotName1)
.setWaitForCompletion(true)
.get();
snapshotInfo[0] = createSnapshotResponse.getSnapshotInfo();

Expand Down
Loading

0 comments on commit bb19dc2

Please sign in to comment.