diff --git a/README.md b/README.md index ee66e3687..f98b02591 100644 --- a/README.md +++ b/README.md @@ -106,7 +106,7 @@ curl -XPOST "http://${LEADER}/leader-01/_doc/1" -H 'Content-Type: application/js ```bash curl -XPUT "http://${FOLLOWER}/_plugins/_replication/follower-01/_start?pretty" \ -H 'Content-type: application/json' \ --d'{"leader_cluster":"leader-cluster", "leader_index": "leader-01"}' +-d'{"leader_alias":"leader-cluster", "leader_index": "leader-01"}' ``` ### Step 5: Make changes to data on leader index @@ -157,6 +157,12 @@ For more details on design and architecture, please refer to [RFC](docs/RFC.md) See [CONTRIBUTING](CONTRIBUTING.md) for more information. +## Getting Help + +If you find a bug, or have a feature request, please don't hesitate to open an issue in this repository. + +For more information, see the [project website](https://opensearch.org/) and [technical documentation](https://opensearch.org/docs/latest/replication-plugin/index/). If you need help and are unsure where to open an issue, try the OpenSearch [Forum](https://forum.opensearch.org/c/plugins/cross-cluster-replication/53). + ## License This project is licensed under the Apache-2.0 License. diff --git a/build.gradle b/build.gradle index a6fcb5a94..deb5a94df 100644 --- a/build.gradle +++ b/build.gradle @@ -56,14 +56,7 @@ buildscript { common_utils_version = System.getProperty("common_utils.version", opensearch_build) kotlin_version = System.getProperty("kotlin.version", "1.6.0") - // For fetching security zip from Maven. - // https://ci.opensearch.org/ci/dbc/distribution-build-opensearch/2.1.0/latest/linux/x64/tar/builds/opensearch/plugins/opensearch-security-2.1.0.0.zip - opensearch_no_snapshot = opensearch_version.replace("-SNAPSHOT","") - security_no_snapshot = opensearch_build.replace("-SNAPSHOT","") - security_plugin_path = "build/dependencies/security" - security_plugin_download_url = 'https://ci.opensearch.org/ci/dbc/distribution-build-opensearch/' + opensearch_no_snapshot + - '/latest/linux/x64/tar/builds/opensearch/plugins/opensearch-security-' + security_no_snapshot + '.zip' - + security_plugin_version = opensearch_build.replace("-SNAPSHOT","") } repositories { @@ -120,12 +113,16 @@ configurations.all { force 'org.apache.httpcomponents:httpclient-osgi:4.5.13' force 'org.apache.httpcomponents.client5:httpclient5:5.0.3' force 'org.apache.httpcomponents.client5:httpclient5-osgi:5.0.3' - force 'com.fasterxml.jackson.core:jackson-databind:2.12.6' - force 'org.yaml:snakeyaml:1.31' + force 'com.fasterxml.jackson.core:jackson-databind:2.13.4' + force 'org.yaml:snakeyaml:1.32' force 'org.codehaus.plexus:plexus-utils:3.0.24' } } +configurations { + opensearchPlugin +} + dependencies { runtimeOnly "org.opensearch:opensearch:${opensearch_version}" implementation "org.jetbrains.kotlin:kotlin-stdlib-jdk8" @@ -143,6 +140,8 @@ dependencies { testImplementation "org.jetbrains.kotlinx:kotlinx-coroutines-test:${kotlin_version}" testImplementation "org.jetbrains.kotlin:kotlin-test:${kotlin_version}" testImplementation "com.nhaarman.mockitokotlin2:mockito-kotlin:2.2.0" + + opensearchPlugin "org.opensearch.plugin:opensearch-security:${security_plugin_version}@zip" } repositories { @@ -218,14 +217,9 @@ def securityPluginFile = new Callable() { return new RegularFile() { @Override File getAsFile() { - if (new File("$project.rootDir/$security_plugin_path").exists()) { - project.delete(files("$project.rootDir/$security_plugin_path")) - } - project.mkdir security_plugin_path - ant.get(src: security_plugin_download_url, - dest: security_plugin_path, - httpusecaches: false) - return fileTree(security_plugin_path).getSingleFile() + return configurations.opensearchPlugin.resolvedConfiguration.resolvedArtifacts + .find { ResolvedArtifact f -> f.name.contains('opensearch-security') } + .file } } } diff --git a/release-notes/opensearch-cross-cluster-replication.release-notes-2.2.0.0.md b/release-notes/opensearch-cross-cluster-replication.release-notes-2.2.0.0.md new file mode 100644 index 000000000..d3530e214 --- /dev/null +++ b/release-notes/opensearch-cross-cluster-replication.release-notes-2.2.0.0.md @@ -0,0 +1,9 @@ +## Version 2.2.0.0 Release Notes + +Compatible with OpenSearch 2.2.0 + +### Bug Fixes +* Adding Index Settings validation before starting replication ([#461](https://github.com/opensearch-project/cross-cluster-replication/pull/461)) + +### Infrastructure +* Use the published zip for security plugin ([#455](https://github.com/opensearch-project/cross-cluster-replication/pull/455)) diff --git a/src/main/kotlin/org/opensearch/replication/task/shard/ShardReplicationTask.kt b/src/main/kotlin/org/opensearch/replication/task/shard/ShardReplicationTask.kt index a74189176..44493bc7d 100644 --- a/src/main/kotlin/org/opensearch/replication/task/shard/ShardReplicationTask.kt +++ b/src/main/kotlin/org/opensearch/replication/task/shard/ShardReplicationTask.kt @@ -217,6 +217,7 @@ class ShardReplicationTask(id: Long, type: String, action: String, description: TaskId(clusterService.nodeName, id), client, indexShard.localCheckpoint, followerClusterStats) val changeTracker = ShardReplicationChangesTracker(indexShard, replicationSettings) + followerClusterStats.stats[followerShardId]!!.followerCheckpoint = indexShard.localCheckpoint coroutineScope { while (isActive) { rateLimiter.acquire() @@ -273,7 +274,6 @@ class ShardReplicationTask(id: Long, type: String, action: String, description: //hence renew retention lease with lastSyncedGlobalCheckpoint + 1 so that any shard that picks up shard replication task has data until then. try { retentionLeaseHelper.renewRetentionLease(leaderShardId, indexShard.lastSyncedGlobalCheckpoint + 1, followerShardId) - followerClusterStats.stats[followerShardId]!!.followerCheckpoint = indexShard.lastSyncedGlobalCheckpoint lastLeaseRenewalMillis = System.currentTimeMillis() } catch (ex: Exception) { when (ex) { diff --git a/src/main/kotlin/org/opensearch/replication/task/shard/TranslogSequencer.kt b/src/main/kotlin/org/opensearch/replication/task/shard/TranslogSequencer.kt index be5fe89cc..38b625bf3 100644 --- a/src/main/kotlin/org/opensearch/replication/task/shard/TranslogSequencer.kt +++ b/src/main/kotlin/org/opensearch/replication/task/shard/TranslogSequencer.kt @@ -28,6 +28,7 @@ import org.opensearch.client.Client import org.opensearch.common.logging.Loggers import org.opensearch.index.shard.ShardId import org.opensearch.index.translog.Translog +import org.opensearch.replication.util.indicesService import org.opensearch.tasks.TaskId import java.util.ArrayList import java.util.concurrent.ConcurrentHashMap @@ -55,6 +56,9 @@ class TranslogSequencer(scope: CoroutineScope, private val replicationMetadata: private val log = Loggers.getLogger(javaClass, followerShardId)!! private val completed = CompletableDeferred() + val followerIndexService = indicesService.indexServiceSafe(followerShardId.index) + val indexShard = followerIndexService.getShard(followerShardId.id) + private val sequencer = scope.actor(capacity = Channel.UNLIMITED) { // Exceptions thrown here will mark the channel as failed and the next attempt to send to the channel will // raise the same exception. See [SendChannel.close] method for details. @@ -88,6 +92,7 @@ class TranslogSequencer(scope: CoroutineScope, private val replicationMetadata: val tookInNanos = System.nanoTime() - relativeStartNanos followerClusterStats.stats[followerShardId]!!.totalWriteTime.addAndGet(TimeUnit.NANOSECONDS.toMillis(tookInNanos)) followerClusterStats.stats[followerShardId]!!.opsWritten.addAndGet(replayRequest.changes.size.toLong()) + followerClusterStats.stats[followerShardId]!!.followerCheckpoint = indexShard.localCheckpoint } highWatermark = next.changes.lastOrNull()?.seqNo() ?: highWatermark } diff --git a/src/test/kotlin/org/opensearch/replication/integ/rest/StartReplicationIT.kt b/src/test/kotlin/org/opensearch/replication/integ/rest/StartReplicationIT.kt index f989859c4..ee82c3c44 100644 --- a/src/test/kotlin/org/opensearch/replication/integ/rest/StartReplicationIT.kt +++ b/src/test/kotlin/org/opensearch/replication/integ/rest/StartReplicationIT.kt @@ -1095,6 +1095,7 @@ class StartReplicationIT: MultiClusterRestTestCase() { assertThat(stats.getValue("operations_read").toString()).isEqualTo("50") assertThat(stats.getValue("failed_read_requests").toString()).isEqualTo("0") assertThat(stats.getValue("failed_write_requests").toString()).isEqualTo("0") + assertThat(stats.getValue("follower_checkpoint").toString()).isEqualTo((docCount-1).toString()) assertThat(stats.containsKey("index_stats")) assertThat(stats.size).isEqualTo(16) diff --git a/src/test/kotlin/org/opensearch/replication/task/shard/TranslogSequencerTests.kt b/src/test/kotlin/org/opensearch/replication/task/shard/TranslogSequencerTests.kt index ed5afb063..ac377687b 100644 --- a/src/test/kotlin/org/opensearch/replication/task/shard/TranslogSequencerTests.kt +++ b/src/test/kotlin/org/opensearch/replication/task/shard/TranslogSequencerTests.kt @@ -11,32 +11,37 @@ package org.opensearch.replication.task.shard -import org.opensearch.replication.action.changes.GetChangesResponse -import org.opensearch.replication.action.replay.ReplayChangesAction -import org.opensearch.replication.action.replay.ReplayChangesRequest -import org.opensearch.replication.action.replay.ReplayChangesResponse -import org.opensearch.replication.metadata.ReplicationOverallState -import org.opensearch.replication.metadata.store.ReplicationContext -import org.opensearch.replication.metadata.store.ReplicationMetadata -import org.opensearch.replication.metadata.store.ReplicationStoreMetadataType import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.ObsoleteCoroutinesApi import kotlinx.coroutines.test.runBlockingTest import org.assertj.core.api.Assertions.assertThat +import org.mockito.Mockito import org.opensearch.action.ActionListener import org.opensearch.action.ActionRequest import org.opensearch.action.ActionResponse import org.opensearch.action.ActionType import org.opensearch.action.support.replication.ReplicationResponse.ShardInfo import org.opensearch.common.settings.Settings +import org.opensearch.index.IndexService +import org.opensearch.index.shard.IndexShard import org.opensearch.index.shard.ShardId import org.opensearch.index.translog.Translog +import org.opensearch.indices.IndicesService +import org.opensearch.replication.action.changes.GetChangesResponse +import org.opensearch.replication.action.replay.ReplayChangesAction +import org.opensearch.replication.action.replay.ReplayChangesRequest +import org.opensearch.replication.action.replay.ReplayChangesResponse +import org.opensearch.replication.metadata.ReplicationOverallState +import org.opensearch.replication.metadata.store.ReplicationContext +import org.opensearch.replication.metadata.store.ReplicationMetadata +import org.opensearch.replication.metadata.store.ReplicationStoreMetadataType +import org.opensearch.replication.util.indicesService import org.opensearch.tasks.TaskId.EMPTY_TASK_ID import org.opensearch.test.OpenSearchTestCase -import org.opensearch.test.OpenSearchTestCase.randomList import org.opensearch.test.client.NoOpClient import java.util.Locale + @ObsoleteCoroutinesApi class TranslogSequencerTests : OpenSearchTestCase() { @@ -83,6 +88,11 @@ class TranslogSequencerTests : OpenSearchTestCase() { val stats = FollowerClusterStats() stats.stats[followerShardId] = FollowerShardMetric() val startSeqNo = randomNonNegativeLong() + indicesService = Mockito.mock(IndicesService::class.java) + val followerIndexService = Mockito.mock(IndexService::class.java) + val indexShard = Mockito.mock(IndexShard::class.java) + Mockito.`when`(indicesService.indexServiceSafe(followerShardId.index)).thenReturn(followerIndexService) + Mockito.`when`(followerIndexService.getShard(followerShardId.id)).thenReturn(indexShard) val sequencer = TranslogSequencer(this, replicationMetadata, followerShardId, leaderAlias, leaderIndex, EMPTY_TASK_ID, client, startSeqNo, stats)