Skip to content

Commit

Permalink
Merge a34afea into 505e242
Browse files Browse the repository at this point in the history
  • Loading branch information
monusingh-1 committed Aug 17, 2023
2 parents 505e242 + a34afea commit 8b895cc
Show file tree
Hide file tree
Showing 8 changed files with 42 additions and 34 deletions.
3 changes: 3 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -931,6 +931,9 @@ task integTestRemote (type: RestIntegTestTask) {
systemProperty "tests.cluster.leaderCluster.security_enabled", System.getProperty("security_enabled")

nonInputProperties.systemProperty('tests.integTestRemote', "true")
var numberOfNodes = findProperty('numNodes') as Integer
systemProperty "tests.cluster.followCluster.total_nodes", "${-> numberOfNodes.toString()}"
systemProperty "tests.cluster.leaderCluster.total_nodes", "${-> numberOfNodes.toString()}"
systemProperty "build.dir", "${buildDir}"

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,6 @@ abstract class MultiClusterRestTestCase : OpenSearchTestCase() {
lateinit var testClusters : Map<String, TestCluster>
var isSecurityPropertyEnabled = false
var forceInitSecurityConfiguration = false
var isMultiNodeClusterConfiguration = true

internal fun createTestCluster(configuration: ClusterConfiguration) : TestCluster {
return createTestCluster(configuration.clusterName, configuration.preserveSnapshots, configuration.preserveIndices,
Expand All @@ -132,7 +131,6 @@ abstract class MultiClusterRestTestCase : OpenSearchTestCase() {
val httpHostsProp = systemProperties.get("tests.cluster.${cluster}.http_hosts") as String?
val transportHostsProp = systemProperties.get("tests.cluster.${cluster}.transport_hosts") as String?
val securityEnabled = systemProperties.get("tests.cluster.${cluster}.security_enabled") as String?
val totalNodes = systemProperties.get("tests.cluster.${cluster}.total_nodes") as String?

requireNotNull(httpHostsProp) { "Missing http hosts property for cluster: $cluster."}
requireNotNull(transportHostsProp) { "Missing transport hosts property for cluster: $cluster."}
Expand All @@ -144,9 +142,6 @@ abstract class MultiClusterRestTestCase : OpenSearchTestCase() {
isSecurityPropertyEnabled = true
}

if(totalNodes != null && totalNodes < "2") {
isMultiNodeClusterConfiguration = false
}

forceInitSecurityConfiguration = isSecurityPropertyEnabled && initSecurityConfiguration

Expand Down Expand Up @@ -666,6 +661,19 @@ abstract class MultiClusterRestTestCase : OpenSearchTestCase() {
return integTestRemote.equals("true")
}

protected fun isMultiNodeClusterConfiguration(leaderCluster: String, followerCluster: String): Boolean{
val systemProperties = BootstrapInfo.getSystemProperties()
val totalLeaderNodes = systemProperties.get("tests.cluster.${leaderCluster}.total_nodes") as String
val totalFollowerNodes = systemProperties.get("tests.cluster.${followerCluster}.total_nodes") as String

assertNotNull(totalLeaderNodes)
assertNotNull(totalFollowerNodes)
if(totalLeaderNodes < "2" || totalFollowerNodes < "2" ) {
return false
}
return true
}

protected fun docCount(cluster: RestHighLevelClient, indexName: String) : Int {
val persistentConnectionRequest = Request("GET", "/$indexName/_search?pretty&q=*")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ const val INDEX_TASK_CANCELLATION_REASON = "AutoPaused: Index replication task w
const val STATUS_REASON_USER_INITIATED = "User initiated"
const val STATUS_REASON_SHARD_TASK_CANCELLED = "Shard task killed or cancelled."
const val STATUS_REASON_INDEX_NOT_FOUND = "no such index"
const val ANALYZERS_NOT_ACCESSIBLE_FOR_REMOTE_CLUSTERS = "Analysers are not accessible when run on remote clusters."
const val SNAPSHOTS_NOT_ACCESSIBLE_FOR_REMOTE_CLUSTERS = "Snapshots are not accessible when run on remote clusters."
const val REROUTE_TESTS_NOT_ELIGIBLE_FOR_SINGLE_NODE_CLUSTER = "Reroute not eligible for single node clusters"


fun RestHighLevelClient.startReplication(request: StartReplicationRequest,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import org.opensearch.replication.MultiClusterRestTestCase
import org.opensearch.replication.MultiClusterAnnotations
import org.opensearch.replication.StartReplicationRequest
import org.opensearch.replication.startReplication
import org.opensearch.replication.stopReplication
import org.opensearch.replication.REROUTE_TESTS_NOT_ELIGIBLE_FOR_SINGLE_NODE_CLUSTER
import org.assertj.core.api.Assertions
import org.opensearch.client.RequestOptions
import org.opensearch.client.indices.CreateIndexRequest
Expand All @@ -25,7 +25,7 @@ class ClusterRerouteFollowerIT : MultiClusterRestTestCase() {

@Before
fun beforeTest() {
Assume.assumeTrue(isMultiNodeClusterConfiguration)
Assume.assumeTrue(REROUTE_TESTS_NOT_ELIGIBLE_FOR_SINGLE_NODE_CLUSTER, isMultiNodeClusterConfiguration(LEADER, FOLLOWER))
}

fun `test replication works after rerouting a shard from one node to another in follower cluster`() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import org.junit.Assert
import org.junit.Assume
import org.junit.Before
import org.junit.Ignore
import org.opensearch.replication.REROUTE_TESTS_NOT_ELIGIBLE_FOR_SINGLE_NODE_CLUSTER
import java.util.concurrent.TimeUnit

@MultiClusterAnnotations.ClusterConfigurations(
Expand All @@ -30,7 +31,7 @@ class ClusterRerouteLeaderIT : MultiClusterRestTestCase() {

@Before
fun beforeTest() {
Assume.assumeTrue(isMultiNodeClusterConfiguration)
Assume.assumeTrue(REROUTE_TESTS_NOT_ELIGIBLE_FOR_SINGLE_NODE_CLUSTER, isMultiNodeClusterConfiguration(LEADER, FOLLOWER),)
}

fun `test replication works after rerouting a shard from one node to another in leader cluster`() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,10 @@ import org.opensearch.client.indices.GetMappingsRequest
import org.opensearch.common.io.PathUtils
import org.opensearch.common.settings.Settings
import org.junit.Assert
import org.junit.Assume
import java.nio.file.Files
import java.util.concurrent.TimeUnit
import org.opensearch.bootstrap.BootstrapInfo
import org.opensearch.replication.ANALYZERS_NOT_ACCESSIBLE_FOR_REMOTE_CLUSTERS

@MultiClusterAnnotations.ClusterConfigurations(
MultiClusterAnnotations.ClusterConfiguration(clusterName = LEADER),
Expand Down Expand Up @@ -165,9 +166,7 @@ class ResumeReplicationIT: MultiClusterRestTestCase() {

fun `test that replication fails to resume when custom analyser is not present in follower`() {

if(checkifIntegTestRemote()){
return;
}
Assume.assumeFalse(ANALYZERS_NOT_ACCESSIBLE_FOR_REMOTE_CLUSTERS, checkifIntegTestRemote())

val synonyms = javaClass.getResourceAsStream("/analyzers/synonyms.txt")
val config = PathUtils.get(buildDir, leaderClusterPath, "config")
Expand Down Expand Up @@ -202,9 +201,7 @@ class ResumeReplicationIT: MultiClusterRestTestCase() {

fun `test that replication resumes when custom analyser is present in follower`() {

if(checkifIntegTestRemote()){
return;
}
Assume.assumeFalse(ANALYZERS_NOT_ACCESSIBLE_FOR_REMOTE_CLUSTERS, checkifIntegTestRemote())

val synonyms = javaClass.getResourceAsStream("/analyzers/synonyms.txt")
val config = PathUtils.get(buildDir, leaderClusterPath, "config")
Expand Down Expand Up @@ -246,9 +243,7 @@ class ResumeReplicationIT: MultiClusterRestTestCase() {

fun `test that replication resumes when custom analyser is overridden and present in follower`() {

if(checkifIntegTestRemote()){
return;
}
Assume.assumeFalse(ANALYZERS_NOT_ACCESSIBLE_FOR_REMOTE_CLUSTERS, checkifIntegTestRemote())

val synonyms = javaClass.getResourceAsStream("/analyzers/synonyms.txt")
val config = PathUtils.get(buildDir, leaderClusterPath, "config")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import org.opensearch.replication.resumeReplication
import org.opensearch.replication.`validate paused status response due to leader index deleted`
import org.opensearch.replication.`validate status syncing response`
import org.opensearch.replication.startReplication
import org.opensearch.replication.ANALYZERS_NOT_ACCESSIBLE_FOR_REMOTE_CLUSTERS
import org.opensearch.replication.SNAPSHOTS_NOT_ACCESSIBLE_FOR_REMOTE_CLUSTERS
import org.opensearch.replication.stopReplication
import org.opensearch.replication.updateReplication
import org.apache.hc.core5.http.HttpStatus
Expand Down Expand Up @@ -66,7 +68,7 @@ import org.opensearch.index.mapper.MapperService
import org.opensearch.repositories.fs.FsRepository
import org.opensearch.test.OpenSearchTestCase.assertBusy
import org.junit.Assert
import org.opensearch.cluster.metadata.AliasMetadata
import org.junit.Assume
import org.opensearch.core.xcontent.DeprecationHandler
import org.opensearch.core.xcontent.NamedXContentRegistry
import org.opensearch.replication.ReplicationPlugin.Companion.REPLICATION_INDEX_TRANSLOG_PRUNING_ENABLED_SETTING
Expand Down Expand Up @@ -585,9 +587,7 @@ class StartReplicationIT: MultiClusterRestTestCase() {

fun `test that replication fails to start when custom analyser is not present in follower`() {

if(checkifIntegTestRemote()){
return;
}
Assume.assumeFalse(ANALYZERS_NOT_ACCESSIBLE_FOR_REMOTE_CLUSTERS, checkifIntegTestRemote())

val synonyms = javaClass.getResourceAsStream("/analyzers/synonyms.txt")
val config = PathUtils.get(buildDir, leaderClusterPath, "config")
Expand Down Expand Up @@ -620,9 +620,7 @@ class StartReplicationIT: MultiClusterRestTestCase() {

fun `test that replication starts successfully when custom analyser is present in follower`() {

if(checkifIntegTestRemote()){
return;
}
Assume.assumeFalse(ANALYZERS_NOT_ACCESSIBLE_FOR_REMOTE_CLUSTERS, checkifIntegTestRemote())

val synonyms = javaClass.getResourceAsStream("/analyzers/synonyms.txt")
val leaderConfig = PathUtils.get(buildDir, leaderClusterPath, "config")
Expand Down Expand Up @@ -662,9 +660,7 @@ class StartReplicationIT: MultiClusterRestTestCase() {

fun `test that replication starts successfully when custom analyser is overridden and present in follower`() {

if(checkifIntegTestRemote()){
return;
}
Assume.assumeFalse(ANALYZERS_NOT_ACCESSIBLE_FOR_REMOTE_CLUSTERS, checkifIntegTestRemote())

val synonyms = javaClass.getResourceAsStream("/analyzers/synonyms.txt")
val leaderConfig = PathUtils.get(buildDir, leaderClusterPath, "config")
Expand Down Expand Up @@ -801,9 +797,7 @@ class StartReplicationIT: MultiClusterRestTestCase() {

fun `test that snapshot on leader does not affect replication during bootstrap`() {

if(checkifIntegTestRemote()){
return;
}
Assume.assumeFalse(SNAPSHOTS_NOT_ACCESSIBLE_FOR_REMOTE_CLUSTERS,checkifIntegTestRemote())

val settings = Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 20)
Expand Down Expand Up @@ -1147,6 +1141,10 @@ class StartReplicationIT: MultiClusterRestTestCase() {
}

fun `test that wait_for_active_shards setting is updated on follower through start replication api`() {

Assume.assumeTrue("Ignore this test if clusters dont have multiple nodes as this test reles on wait_for_active_shards",
isMultiNodeClusterConfiguration(LEADER, FOLLOWER))

val followerClient = getClientForCluster(FOLLOWER)
val leaderClient = getClientForCluster(LEADER)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import org.apache.hc.core5.http.io.entity.EntityUtils
import org.assertj.core.api.Assertions.assertThat
import org.assertj.core.api.Assertions.assertThatThrownBy
import org.junit.Assert
import org.junit.Assume
import org.opensearch.OpenSearchStatusException
import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest
import org.opensearch.action.admin.cluster.snapshots.create.CreateSnapshotRequest
Expand All @@ -40,6 +41,7 @@ import org.opensearch.cluster.metadata.IndexMetadata
import org.opensearch.common.settings.Settings
import org.opensearch.common.unit.TimeValue
import org.opensearch.index.mapper.MapperService
import org.opensearch.replication.SNAPSHOTS_NOT_ACCESSIBLE_FOR_REMOTE_CLUSTERS
import java.util.Random
import java.util.concurrent.TimeUnit

Expand Down Expand Up @@ -243,9 +245,7 @@ class StopReplicationIT: MultiClusterRestTestCase() {

fun `test stop replication with stale replication settings at leader cluster`() {

if(checkifIntegTestRemote()){
return;
}
Assume.assumeFalse(SNAPSHOTS_NOT_ACCESSIBLE_FOR_REMOTE_CLUSTERS, checkifIntegTestRemote())

val followerClient = getClientForCluster(FOLLOWER)
val leaderClient = getClientForCluster(LEADER)
Expand Down

0 comments on commit 8b895cc

Please sign in to comment.