Skip to content

Commit

Permalink
Merge branch 'main' into fix_overloading
Browse files Browse the repository at this point in the history
  • Loading branch information
harshavamsi committed Sep 5, 2024
2 parents 8149760 + 2f1e209 commit c6246e6
Show file tree
Hide file tree
Showing 38 changed files with 1,130 additions and 135 deletions.
1 change: 1 addition & 0 deletions .ci/bwcVersions
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,4 @@ BWC_VERSION:
- "2.16.0"
- "2.16.1"
- "2.17.0"
- "2.18.0"
9 changes: 3 additions & 6 deletions .github/workflows/assemble.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,8 @@ jobs:
- name: Setup docker (missing on MacOS)
id: setup_docker
if: runner.os == 'macos'
uses: douglascamata/setup-docker-macos-action@main
continue-on-error: true
with:
upgrade-qemu: true
colima: v0.6.8
run: |
exit 0;
- name: Run Gradle (assemble)
if: runner.os == 'macos' && steps.setup_docker.outcome != 'success'
run: |
Expand All @@ -48,4 +45,4 @@ jobs:
- name: Run Gradle (assemble)
if: runner.os == 'macos' && steps.setup_docker.outcome == 'success'
run: |
./gradlew assemble --parallel --no-build-cache -PDISABLE_BUILD_CACHE -Druntime.java=${{ matrix.java }}
exit 0;
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [Workload Management] Add rejection logic for co-ordinator and shard level requests ([#15428](https://github.com/opensearch-project/OpenSearch/pull/15428)))
- Adding translog durability validation in index templates ([#15494](https://github.com/opensearch-project/OpenSearch/pull/15494))
- Add index creation using the context field ([#15290](https://github.com/opensearch-project/OpenSearch/pull/15290))
- [Reader Writer Separation] Add searchOnly replica routing configuration ([#15410](https://github.com/opensearch-project/OpenSearch/pull/15410))
- [Reader Writer Separation] Add experimental search replica shard type to achieve reader writer separation ([#15237](https://github.com/opensearch-project/OpenSearch/pull/15237))
- [Range Queries] Add new approximateable query framework to short-circuit range queries ([#13788](https://github.com/opensearch-project/OpenSearch/pull/13788))
- [Workload Management] Add query group level failure tracking ([#15227](https://github.com/opensearch-project/OpenSearch/pull/15527))
- Add support for pluggable deciders for concurrent search ([#15363](https://github.com/opensearch-project/OpenSearch/pull/15363))
Expand Down
1 change: 1 addition & 0 deletions libs/core/src/main/java/org/opensearch/Version.java
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ public class Version implements Comparable<Version>, ToXContentFragment {
public static final Version V_2_16_0 = new Version(2160099, org.apache.lucene.util.Version.LUCENE_9_11_1);
public static final Version V_2_16_1 = new Version(2160199, org.apache.lucene.util.Version.LUCENE_9_11_1);
public static final Version V_2_17_0 = new Version(2170099, org.apache.lucene.util.Version.LUCENE_9_11_1);
public static final Version V_2_18_0 = new Version(2180099, org.apache.lucene.util.Version.LUCENE_9_11_1);
public static final Version V_3_0_0 = new Version(3000099, org.apache.lucene.util.Version.LUCENE_9_12_0);
public static final Version CURRENT = V_3_0_0;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,11 @@
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.discovery.DiscoveryStats;
import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedIndexMetadata;
import org.opensearch.gateway.remote.model.RemoteClusterMetadataManifest;
import org.opensearch.gateway.remote.model.RemoteRoutingTableBlobStore;
import org.opensearch.index.remote.RemoteStoreEnums.PathHashAlgorithm;
import org.opensearch.index.remote.RemoteStoreEnums.PathType;
import org.opensearch.indices.recovery.RecoverySettings;
import org.opensearch.remotestore.RemoteStoreBaseIntegTestCase;
import org.opensearch.repositories.RepositoriesService;
Expand Down Expand Up @@ -48,19 +52,27 @@
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.startsWith;

@ClusterScope(scope = Scope.TEST, numDataNodes = 0)
public class RemoteStatePublicationIT extends RemoteStoreBaseIntegTestCase {

private static String INDEX_NAME = "test-index";
private static final String INDEX_NAME = "test-index";
private static final String REMOTE_STATE_PREFIX = "!";
private static final String REMOTE_ROUTING_PREFIX = "_";
private boolean isRemoteStateEnabled = true;
private String isRemotePublicationEnabled = "true";
private boolean hasRemoteStateCharPrefix;
private boolean hasRemoteRoutingCharPrefix;

@Before
public void setup() {
asyncUploadMockFsRepo = false;
isRemoteStateEnabled = true;
isRemotePublicationEnabled = "true";
hasRemoteStateCharPrefix = randomBoolean();
hasRemoteRoutingCharPrefix = randomBoolean();
}

@Override
Expand All @@ -84,6 +96,7 @@ protected Settings nodeSettings(int nodeOrdinal) {
"node.attr." + REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX,
routingTableRepoName
);

return Settings.builder()
.put(super.nodeSettings(nodeOrdinal))
.put(REMOTE_CLUSTER_STATE_ENABLED_SETTING.getKey(), isRemoteStateEnabled)
Expand All @@ -94,6 +107,19 @@ protected Settings nodeSettings(int nodeOrdinal) {
RemoteClusterStateService.REMOTE_CLUSTER_STATE_CHECKSUM_VALIDATION_MODE_SETTING.getKey(),
RemoteClusterStateService.RemoteClusterStateValidationMode.FAILURE
)
.put(
RemoteClusterStateService.CLUSTER_REMOTE_STORE_STATE_PATH_PREFIX.getKey(),
hasRemoteStateCharPrefix ? REMOTE_STATE_PREFIX : ""
)
.put(
RemoteRoutingTableBlobStore.CLUSTER_REMOTE_STORE_ROUTING_TABLE_PATH_PREFIX.getKey(),
hasRemoteRoutingCharPrefix ? REMOTE_ROUTING_PREFIX : ""
)
.put(RemoteIndexMetadataManager.REMOTE_INDEX_METADATA_PATH_TYPE_SETTING.getKey(), PathType.HASHED_PREFIX.toString())
.put(
RemoteIndexMetadataManager.REMOTE_INDEX_METADATA_PATH_HASH_ALGO_SETTING.getKey(),
PathHashAlgorithm.FNV_1A_COMPOSITE_1.toString()
)
.build();
}

Expand Down Expand Up @@ -137,6 +163,27 @@ public void testPublication() throws Exception {
Map<String, Integer> manifestFiles = getMetadataFiles(repository, RemoteClusterMetadataManifest.MANIFEST);
assertTrue(manifestFiles.containsKey(RemoteClusterMetadataManifest.MANIFEST));

RemoteClusterStateService remoteClusterStateService = internalCluster().getInstance(
RemoteClusterStateService.class,
internalCluster().getClusterManagerName()
);
ClusterMetadataManifest manifest = remoteClusterStateService.getLatestClusterMetadataManifest(
getClusterState().getClusterName().value(),
getClusterState().metadata().clusterUUID()
).get();
assertThat(manifest.getIndices().size(), is(1));
if (hasRemoteStateCharPrefix) {
for (UploadedIndexMetadata md : manifest.getIndices()) {
assertThat(md.getUploadedFilename(), startsWith(REMOTE_STATE_PREFIX));
}
}
assertThat(manifest.getIndicesRouting().size(), is(1));
if (hasRemoteRoutingCharPrefix) {
for (UploadedIndexMetadata md : manifest.getIndicesRouting()) {
assertThat(md.getUploadedFilename(), startsWith(REMOTE_ROUTING_PREFIX));
}
}

// get settings from each node and verify that it is updated
Settings settings = clusterService().getSettings();
logger.info("settings : {}", settings);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.indices.replication;

import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.junit.After;
import org.junit.Before;

import java.nio.file.Path;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class SearchReplicaReplicationIT extends SegmentReplicationBaseIT {

private static final String REPOSITORY_NAME = "test-remote-store-repo";
protected Path absolutePath;

private Boolean useRemoteStore;

@Before
public void randomizeRemoteStoreEnabled() {
useRemoteStore = randomBoolean();
}

@Override
protected Settings nodeSettings(int nodeOrdinal) {
if (useRemoteStore) {
if (absolutePath == null) {
absolutePath = randomRepoPath().toAbsolutePath();
}
return Settings.builder()
.put(super.nodeSettings(nodeOrdinal))
.put(remoteStoreClusterSettings(REPOSITORY_NAME, absolutePath))
.build();
}
return super.nodeSettings(nodeOrdinal);
}

@After
public void teardown() {
if (useRemoteStore) {
clusterAdmin().prepareCleanupRepository(REPOSITORY_NAME).get();
}
}

@Override
public Settings indexSettings() {
return Settings.builder()
.put(super.indexSettings())
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.put(IndexMetadata.SETTING_NUMBER_OF_SEARCH_REPLICAS, 1)
.build();
}

@Override
protected Settings featureFlagSettings() {
return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.READER_WRITER_SPLIT_EXPERIMENTAL, true).build();
}

public void testReplication() throws Exception {
internalCluster().startClusterManagerOnlyNode();
final String primary = internalCluster().startDataOnlyNode();
createIndex(INDEX_NAME);
ensureYellowAndNoInitializingShards(INDEX_NAME);
final String replica = internalCluster().startDataOnlyNode();
ensureGreen(INDEX_NAME);

final int docCount = 10;
for (int i = 0; i < docCount; i++) {
client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().get();
}
refresh(INDEX_NAME);
waitForSearchableDocs(docCount, primary, replica);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -328,10 +328,17 @@ public void testFullClusterRestoreManifestFilePointsToInvalidIndexMetadataPathTh
internalCluster().stopAllNodes();
// Step - 3 Delete index metadata file in remote
try {
Files.move(
segmentRepoPath.resolve(encodeString(clusterName) + "/cluster-state/" + prevClusterUUID + "/index"),
segmentRepoPath.resolve("cluster-state/")
RemoteClusterStateService remoteClusterStateService = internalCluster().getInstance(
RemoteClusterStateService.class,
internalCluster().getClusterManagerName()
);
ClusterMetadataManifest manifest = remoteClusterStateService.getLatestClusterMetadataManifest(
getClusterState().getClusterName().value(),
getClusterState().metadata().clusterUUID()
).get();
for (UploadedIndexMetadata md : manifest.getIndices()) {
Files.move(segmentRepoPath.resolve(md.getUploadedFilename()), segmentRepoPath.resolve("cluster-state/"));
}
} catch (IOException e) {
throw new RuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,7 @@ public void testDeleteShallowCopyV2MultipleSnapshots() throws Exception {

}

@AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/15692")
public void testRemoteStoreCleanupForDeletedIndexForSnapshotV2() throws Exception {
disableRepoConsistencyCheck("Remote store repository is being used in the test");
final Path remoteStoreRepoPath = randomRepoPath();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.io.IOUtils;
import org.opensearch.core.common.Strings;
import org.opensearch.core.common.breaker.CircuitBreakingException;
import org.opensearch.core.common.unit.ByteSizeUnit;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.index.IndexNotFoundException;
Expand Down Expand Up @@ -602,28 +603,28 @@ public void testSnapshotStatusApiFailureForTooManyShardsAcrossSnapshots() throws

// across a single snapshot
assertBusy(() -> {
TooManyShardsInSnapshotsStatusException exception = expectThrows(
TooManyShardsInSnapshotsStatusException.class,
CircuitBreakingException exception = expectThrows(
CircuitBreakingException.class,
() -> client().admin().cluster().prepareSnapshotStatus(repositoryName).setSnapshots(snapshot1).execute().actionGet()
);
assertEquals(exception.status(), RestStatus.REQUEST_ENTITY_TOO_LARGE);
assertEquals(exception.status(), RestStatus.TOO_MANY_REQUESTS);
assertTrue(
exception.getMessage().endsWith(" is more than the maximum allowed value of shard count [2] for snapshot status request")
);
}, 1, TimeUnit.MINUTES);

// across multiple snapshots
assertBusy(() -> {
TooManyShardsInSnapshotsStatusException exception = expectThrows(
TooManyShardsInSnapshotsStatusException.class,
CircuitBreakingException exception = expectThrows(
CircuitBreakingException.class,
() -> client().admin()
.cluster()
.prepareSnapshotStatus(repositoryName)
.setSnapshots(snapshot1, snapshot2)
.execute()
.actionGet()
);
assertEquals(exception.status(), RestStatus.REQUEST_ENTITY_TOO_LARGE);
assertEquals(exception.status(), RestStatus.TOO_MANY_REQUESTS);
assertTrue(
exception.getMessage().endsWith(" is more than the maximum allowed value of shard count [2] for snapshot status request")
);
Expand Down Expand Up @@ -741,8 +742,8 @@ public void testSnapshotStatusFailuresWithIndexFilter() throws Exception {
updateSettingsRequest.persistentSettings(Settings.builder().put(MAX_SHARDS_ALLOWED_IN_STATUS_API.getKey(), 2));
assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet());

TooManyShardsInSnapshotsStatusException ex = expectThrows(
TooManyShardsInSnapshotsStatusException.class,
CircuitBreakingException ex = expectThrows(
CircuitBreakingException.class,
() -> client().admin()
.cluster()
.prepareSnapshotStatus(repositoryName)
Expand All @@ -751,7 +752,7 @@ public void testSnapshotStatusFailuresWithIndexFilter() throws Exception {
.execute()
.actionGet()
);
assertEquals(ex.status(), RestStatus.REQUEST_ENTITY_TOO_LARGE);
assertEquals(ex.status(), RestStatus.TOO_MANY_REQUESTS);
assertTrue(ex.getMessage().endsWith(" is more than the maximum allowed value of shard count [2] for snapshot status request"));

logger.info("Reset MAX_SHARDS_ALLOWED_IN_STATUS_API to default value");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@
import org.opensearch.common.util.set.Sets;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.Strings;
import org.opensearch.core.common.breaker.CircuitBreaker;
import org.opensearch.core.common.breaker.CircuitBreakingException;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.util.CollectionUtils;
import org.opensearch.core.index.shard.ShardId;
Expand All @@ -66,7 +68,6 @@
import org.opensearch.snapshots.SnapshotShardsService;
import org.opensearch.snapshots.SnapshotState;
import org.opensearch.snapshots.SnapshotsService;
import org.opensearch.snapshots.TooManyShardsInSnapshotsStatusException;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportService;

Expand Down Expand Up @@ -458,13 +459,17 @@ private Map<SnapshotId, SnapshotInfo> snapshotsInfo(
snapshotsInfoMap.put(snapshotId, snapshotInfo);
}
if (totalShardsAcrossSnapshots > maximumAllowedShardCount && request.indices().length == 0) {
String message = "Total shard count ["
String message = "["
+ repositoryName
+ ":"
+ String.join(", ", request.snapshots())
+ "]"
+ " Total shard count ["
+ totalShardsAcrossSnapshots
+ "] is more than the maximum allowed value of shard count ["
+ maximumAllowedShardCount
+ "] for snapshot status request";

throw new TooManyShardsInSnapshotsStatusException(repositoryName, message, request.snapshots());
throw new CircuitBreakingException(message, CircuitBreaker.Durability.PERMANENT);
}
return unmodifiableMap(snapshotsInfoMap);
}
Expand Down Expand Up @@ -520,15 +525,19 @@ private Map<ShardId, IndexShardSnapshotStatus> snapshotShards(
}

if (totalShardsAcrossIndices > maximumAllowedShardCount && requestedIndexNames.isEmpty() == false && isV2Snapshot == false) {
String message = "Total shard count ["
String message = "["
+ repositoryName
+ ":"
+ String.join(", ", request.snapshots())
+ "]"
+ " Total shard count ["
+ totalShardsAcrossIndices
+ "] across the requested indices ["
+ requestedIndexNames.stream().collect(Collectors.joining(", "))
+ "] is more than the maximum allowed value of shard count ["
+ maximumAllowedShardCount
+ "] for snapshot status request";

throw new TooManyShardsInSnapshotsStatusException(repositoryName, message, snapshotName);
throw new CircuitBreakingException(message, CircuitBreaker.Durability.PERMANENT);
}

final Map<ShardId, IndexShardSnapshotStatus> shardStatus = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import org.opensearch.cluster.metadata.ViewMetadata;
import org.opensearch.cluster.metadata.WeightedRoutingMetadata;
import org.opensearch.cluster.routing.DelayedAllocationService;
import org.opensearch.cluster.routing.RerouteService;
import org.opensearch.cluster.routing.allocation.AllocationService;
import org.opensearch.cluster.routing.allocation.ExistingShardsAllocator;
import org.opensearch.cluster.routing.allocation.allocator.BalancedShardsAllocator;
Expand Down Expand Up @@ -479,4 +480,7 @@ public void setExistingShardsAllocators(GatewayAllocator gatewayAllocator, Shard
allocationService.setExistingShardsAllocators(existingShardsAllocators);
}

public void setRerouteServiceForAllocator(RerouteService rerouteService) {
shardsAllocator.setRerouteService(rerouteService);
}
}
Loading

0 comments on commit c6246e6

Please sign in to comment.