Skip to content

Commit

Permalink
Merge branch 'main' into backport/backport-589-to-2.x
Browse files Browse the repository at this point in the history
  • Loading branch information
ankitkala authored Oct 19, 2022
2 parents ca9743d + 5882c09 commit 5aa0b77
Show file tree
Hide file tree
Showing 7 changed files with 54 additions and 29 deletions.
8 changes: 7 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ curl -XPOST "http://${LEADER}/leader-01/_doc/1" -H 'Content-Type: application/js
```bash
curl -XPUT "http://${FOLLOWER}/_plugins/_replication/follower-01/_start?pretty" \
-H 'Content-type: application/json' \
-d'{"leader_cluster":"leader-cluster", "leader_index": "leader-01"}'
-d'{"leader_alias":"leader-cluster", "leader_index": "leader-01"}'
```

### Step 5: Make changes to data on leader index
Expand Down Expand Up @@ -157,6 +157,12 @@ For more details on design and architecture, please refer to [RFC](docs/RFC.md)

See [CONTRIBUTING](CONTRIBUTING.md) for more information.

## Getting Help

If you find a bug, or have a feature request, please don't hesitate to open an issue in this repository.

For more information, see the [project website](https://opensearch.org/) and [technical documentation](https://opensearch.org/docs/latest/replication-plugin/index/). If you need help and are unsure where to open an issue, try the OpenSearch [Forum](https://forum.opensearch.org/c/plugins/cross-cluster-replication/53).

## License

This project is licensed under the Apache-2.0 License.
Expand Down
30 changes: 12 additions & 18 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -56,14 +56,7 @@ buildscript {
common_utils_version = System.getProperty("common_utils.version", opensearch_build)
kotlin_version = System.getProperty("kotlin.version", "1.6.0")

// For fetching security zip from Maven.
// https://ci.opensearch.org/ci/dbc/distribution-build-opensearch/2.1.0/latest/linux/x64/tar/builds/opensearch/plugins/opensearch-security-2.1.0.0.zip
opensearch_no_snapshot = opensearch_version.replace("-SNAPSHOT","")
security_no_snapshot = opensearch_build.replace("-SNAPSHOT","")
security_plugin_path = "build/dependencies/security"
security_plugin_download_url = 'https://ci.opensearch.org/ci/dbc/distribution-build-opensearch/' + opensearch_no_snapshot +
'/latest/linux/x64/tar/builds/opensearch/plugins/opensearch-security-' + security_no_snapshot + '.zip'

security_plugin_version = opensearch_build.replace("-SNAPSHOT","")
}

repositories {
Expand Down Expand Up @@ -120,12 +113,16 @@ configurations.all {
force 'org.apache.httpcomponents:httpclient-osgi:4.5.13'
force 'org.apache.httpcomponents.client5:httpclient5:5.0.3'
force 'org.apache.httpcomponents.client5:httpclient5-osgi:5.0.3'
force 'com.fasterxml.jackson.core:jackson-databind:2.12.6'
force 'org.yaml:snakeyaml:1.31'
force 'com.fasterxml.jackson.core:jackson-databind:2.13.4'
force 'org.yaml:snakeyaml:1.32'
force 'org.codehaus.plexus:plexus-utils:3.0.24'
}
}

configurations {
opensearchPlugin
}

dependencies {
runtimeOnly "org.opensearch:opensearch:${opensearch_version}"
implementation "org.jetbrains.kotlin:kotlin-stdlib-jdk8"
Expand All @@ -143,6 +140,8 @@ dependencies {
testImplementation "org.jetbrains.kotlinx:kotlinx-coroutines-test:${kotlin_version}"
testImplementation "org.jetbrains.kotlin:kotlin-test:${kotlin_version}"
testImplementation "com.nhaarman.mockitokotlin2:mockito-kotlin:2.2.0"

opensearchPlugin "org.opensearch.plugin:opensearch-security:${security_plugin_version}@zip"
}

repositories {
Expand Down Expand Up @@ -218,14 +217,9 @@ def securityPluginFile = new Callable<RegularFile>() {
return new RegularFile() {
@Override
File getAsFile() {
if (new File("$project.rootDir/$security_plugin_path").exists()) {
project.delete(files("$project.rootDir/$security_plugin_path"))
}
project.mkdir security_plugin_path
ant.get(src: security_plugin_download_url,
dest: security_plugin_path,
httpusecaches: false)
return fileTree(security_plugin_path).getSingleFile()
return configurations.opensearchPlugin.resolvedConfiguration.resolvedArtifacts
.find { ResolvedArtifact f -> f.name.contains('opensearch-security') }
.file
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
## Version 2.2.0.0 Release Notes

Compatible with OpenSearch 2.2.0

### Bug Fixes
* Adding Index Settings validation before starting replication ([#461](https://github.com/opensearch-project/cross-cluster-replication/pull/461))

### Infrastructure
* Use the published zip for security plugin ([#455](https://github.com/opensearch-project/cross-cluster-replication/pull/455))
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,7 @@ class ShardReplicationTask(id: Long, type: String, action: String, description:
TaskId(clusterService.nodeName, id), client, indexShard.localCheckpoint, followerClusterStats)

val changeTracker = ShardReplicationChangesTracker(indexShard, replicationSettings)
followerClusterStats.stats[followerShardId]!!.followerCheckpoint = indexShard.localCheckpoint
coroutineScope {
while (isActive) {
rateLimiter.acquire()
Expand Down Expand Up @@ -273,7 +274,6 @@ class ShardReplicationTask(id: Long, type: String, action: String, description:
//hence renew retention lease with lastSyncedGlobalCheckpoint + 1 so that any shard that picks up shard replication task has data until then.
try {
retentionLeaseHelper.renewRetentionLease(leaderShardId, indexShard.lastSyncedGlobalCheckpoint + 1, followerShardId)
followerClusterStats.stats[followerShardId]!!.followerCheckpoint = indexShard.lastSyncedGlobalCheckpoint
lastLeaseRenewalMillis = System.currentTimeMillis()
} catch (ex: Exception) {
when (ex) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import org.opensearch.client.Client
import org.opensearch.common.logging.Loggers
import org.opensearch.index.shard.ShardId
import org.opensearch.index.translog.Translog
import org.opensearch.replication.util.indicesService
import org.opensearch.tasks.TaskId
import java.util.ArrayList
import java.util.concurrent.ConcurrentHashMap
Expand Down Expand Up @@ -55,6 +56,9 @@ class TranslogSequencer(scope: CoroutineScope, private val replicationMetadata:
private val log = Loggers.getLogger(javaClass, followerShardId)!!
private val completed = CompletableDeferred<Unit>()

val followerIndexService = indicesService.indexServiceSafe(followerShardId.index)
val indexShard = followerIndexService.getShard(followerShardId.id)

private val sequencer = scope.actor<Unit>(capacity = Channel.UNLIMITED) {
// Exceptions thrown here will mark the channel as failed and the next attempt to send to the channel will
// raise the same exception. See [SendChannel.close] method for details.
Expand Down Expand Up @@ -88,6 +92,7 @@ class TranslogSequencer(scope: CoroutineScope, private val replicationMetadata:
val tookInNanos = System.nanoTime() - relativeStartNanos
followerClusterStats.stats[followerShardId]!!.totalWriteTime.addAndGet(TimeUnit.NANOSECONDS.toMillis(tookInNanos))
followerClusterStats.stats[followerShardId]!!.opsWritten.addAndGet(replayRequest.changes.size.toLong())
followerClusterStats.stats[followerShardId]!!.followerCheckpoint = indexShard.localCheckpoint
}
highWatermark = next.changes.lastOrNull()?.seqNo() ?: highWatermark
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1095,6 +1095,7 @@ class StartReplicationIT: MultiClusterRestTestCase() {
assertThat(stats.getValue("operations_read").toString()).isEqualTo("50")
assertThat(stats.getValue("failed_read_requests").toString()).isEqualTo("0")
assertThat(stats.getValue("failed_write_requests").toString()).isEqualTo("0")
assertThat(stats.getValue("follower_checkpoint").toString()).isEqualTo((docCount-1).toString())
assertThat(stats.containsKey("index_stats"))
assertThat(stats.size).isEqualTo(16)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,32 +11,37 @@

package org.opensearch.replication.task.shard

import org.opensearch.replication.action.changes.GetChangesResponse
import org.opensearch.replication.action.replay.ReplayChangesAction
import org.opensearch.replication.action.replay.ReplayChangesRequest
import org.opensearch.replication.action.replay.ReplayChangesResponse
import org.opensearch.replication.metadata.ReplicationOverallState
import org.opensearch.replication.metadata.store.ReplicationContext
import org.opensearch.replication.metadata.store.ReplicationMetadata
import org.opensearch.replication.metadata.store.ReplicationStoreMetadataType
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.ObsoleteCoroutinesApi
import kotlinx.coroutines.test.runBlockingTest
import org.assertj.core.api.Assertions.assertThat
import org.mockito.Mockito
import org.opensearch.action.ActionListener
import org.opensearch.action.ActionRequest
import org.opensearch.action.ActionResponse
import org.opensearch.action.ActionType
import org.opensearch.action.support.replication.ReplicationResponse.ShardInfo
import org.opensearch.common.settings.Settings
import org.opensearch.index.IndexService
import org.opensearch.index.shard.IndexShard
import org.opensearch.index.shard.ShardId
import org.opensearch.index.translog.Translog
import org.opensearch.indices.IndicesService
import org.opensearch.replication.action.changes.GetChangesResponse
import org.opensearch.replication.action.replay.ReplayChangesAction
import org.opensearch.replication.action.replay.ReplayChangesRequest
import org.opensearch.replication.action.replay.ReplayChangesResponse
import org.opensearch.replication.metadata.ReplicationOverallState
import org.opensearch.replication.metadata.store.ReplicationContext
import org.opensearch.replication.metadata.store.ReplicationMetadata
import org.opensearch.replication.metadata.store.ReplicationStoreMetadataType
import org.opensearch.replication.util.indicesService
import org.opensearch.tasks.TaskId.EMPTY_TASK_ID
import org.opensearch.test.OpenSearchTestCase
import org.opensearch.test.OpenSearchTestCase.randomList
import org.opensearch.test.client.NoOpClient
import java.util.Locale


@ObsoleteCoroutinesApi
class TranslogSequencerTests : OpenSearchTestCase() {

Expand Down Expand Up @@ -83,6 +88,11 @@ class TranslogSequencerTests : OpenSearchTestCase() {
val stats = FollowerClusterStats()
stats.stats[followerShardId] = FollowerShardMetric()
val startSeqNo = randomNonNegativeLong()
indicesService = Mockito.mock(IndicesService::class.java)
val followerIndexService = Mockito.mock(IndexService::class.java)
val indexShard = Mockito.mock(IndexShard::class.java)
Mockito.`when`(indicesService.indexServiceSafe(followerShardId.index)).thenReturn(followerIndexService)
Mockito.`when`(followerIndexService.getShard(followerShardId.id)).thenReturn(indexShard)
val sequencer = TranslogSequencer(this, replicationMetadata, followerShardId, leaderAlias, leaderIndex, EMPTY_TASK_ID,
client, startSeqNo, stats)

Expand Down

0 comments on commit 5aa0b77

Please sign in to comment.