From 97ccdc5472239ce047ec16dab58ba54b6aa19d8d Mon Sep 17 00:00:00 2001 From: Ankit Kala Date: Tue, 19 Jul 2022 09:59:05 +0530 Subject: [PATCH 1/2] SegRep POC --- .github/workflows/security-tests.yml | 12 ---- build.gradle | 26 +++++++- .../replication/ReplicationPlugin.kt | 13 +++- .../repository/RemoteClusterRepository.kt | 4 +- .../seqno/RemoteClusterTranslogService.kt | 11 +++- .../task/index/IndexReplicationTask.kt | 59 +++++++++++++++++-- .../task/shard/ShardReplicationTask.kt | 22 +++++++ .../replication/util/ValidationUtil.kt | 2 +- 8 files changed, 124 insertions(+), 25 deletions(-) diff --git a/.github/workflows/security-tests.yml b/.github/workflows/security-tests.yml index ee435d15..9b7bcb3f 100644 --- a/.github/workflows/security-tests.yml +++ b/.github/workflows/security-tests.yml @@ -22,18 +22,6 @@ jobs: # This step uses the checkout Github action: https://github.com/actions/checkout - name: Checkout Branch uses: actions/checkout@v2 - # Security plugin dependency - - name: Checkout security - uses: actions/checkout@v2 - with: - repository: 'opensearch-project/security' - path: security - ref: 'main' - - name: Build security - working-directory: ./security - run: | - ./gradlew clean build -Dbuild.snapshot=false -x test - cp build/distributions/opensearch-security-*.zip ../src/test/resources/security/plugin/opensearch-security.zip - name: Build and run Replication tests run: | ls -al src/test/resources/security/plugin diff --git a/build.gradle b/build.gradle index 4ff5e3fe..53355ba6 100644 --- a/build.gradle +++ b/build.gradle @@ -36,7 +36,7 @@ import org.opensearch.gradle.test.RestIntegTestTask buildscript { ext { isSnapshot = "true" == System.getProperty("build.snapshot", "true") - opensearch_version = System.getProperty("opensearch.version", "2.1.0-SNAPSHOT") + opensearch_version = System.getProperty("opensearch.version", "3.0.0-SNAPSHOT") buildVersionQualifier = System.getProperty("build.version_qualifier", "") // e.g. 2.0.0-rc1-SNAPSHOT -> 2.0.0.0-rc1-SNAPSHOT version_tokens = opensearch_version.tokenize('-') @@ -56,6 +56,14 @@ buildscript { common_utils_version = System.getProperty("common_utils.version", opensearch_build) kotlin_version = System.getProperty("kotlin.version", "1.6.0") + // For fetching security zip from Maven. + // https://ci.opensearch.org/ci/dbc/distribution-build-opensearch/2.1.0/latest/linux/x64/tar/builds/opensearch/plugins/opensearch-security-2.1.0.0.zip + opensearch_no_snapshot = opensearch_version.replace("-SNAPSHOT","") + security_no_snapshot = opensearch_build.replace("-SNAPSHOT","") + security_plugin_path = "build/dependencies/security" + security_plugin_download_url = 'https://ci.opensearch.org/ci/dbc/distribution-build-opensearch/' + opensearch_no_snapshot + + '/latest/linux/x64/tar/builds/opensearch/plugins/opensearch-security-' + security_no_snapshot + '.zip' + } repositories { @@ -63,6 +71,7 @@ buildscript { mavenCentral() maven { url "https://aws.oss.sonatype.org/content/repositories/snapshots" } maven { url "https://plugins.gradle.org/m2/" } + maven { url "https://d1nvenhzbhpy0q.cloudfront.net/snapshots/lucene/" } } dependencies { @@ -209,7 +218,14 @@ def securityPluginFile = new Callable() { return new RegularFile() { @Override File getAsFile() { - return fileTree("$projectDir/src/test/resources/security/plugin/opensearch-security.zip").getSingleFile() + if (new File("$project.rootDir/$security_plugin_path").exists()) { + project.delete(files("$project.rootDir/$security_plugin_path")) + } + project.mkdir security_plugin_path + ant.get(src: security_plugin_download_url, + dest: security_plugin_path, + httpusecaches: false) + return fileTree(security_plugin_path).getSingleFile() } } } @@ -337,6 +353,8 @@ testClusters { //numberOfNodes = 3 setting 'path.repo', repo.absolutePath if(_numNodes == 1) jvmArgs "${-> getDebugJvmArgs(debugPort++)}" + systemProperty "opensearch.experimental.feature.replication_type.enabled", "true" + systemProperty "opensearch.experimental.feature.remote.segment_replication.enabled", "true" } followCluster { testDistribution = "INTEG_TEST" @@ -349,6 +367,8 @@ testClusters { //numberOfNodes = 3 setting 'path.repo', repo.absolutePath if(_numNodes == 1) jvmArgs "${-> getDebugJvmArgs(debugPort++)}" + systemProperty "opensearch.experimental.feature.replication_type.enabled", "true" + systemProperty "opensearch.experimental.feature.remote.segment_replication.enabled", "true" } } @@ -360,6 +380,8 @@ def configureCluster(OpenSearchCluster cluster, Boolean securityEnabled) { node.getAllTransportPortURI().stream() }.collect(Collectors.joining("\n")) cluster.nodes.forEach {node -> + node.systemProperty("opensearch.experimental.feature.replication_type.enabled", "true") + node.systemProperty("opensearch.experimental.feature.remote.segment_replication.enabled", "true") try { // Manually write the unicast hosts as we are not depending on the internal method Files.write(node.getConfigDir().resolve("unicast_hosts.txt"), unicastUris.getBytes(StandardCharsets.UTF_8)); diff --git a/src/main/kotlin/org/opensearch/replication/ReplicationPlugin.kt b/src/main/kotlin/org/opensearch/replication/ReplicationPlugin.kt index 3fa602b0..afc2b4ed 100644 --- a/src/main/kotlin/org/opensearch/replication/ReplicationPlugin.kt +++ b/src/main/kotlin/org/opensearch/replication/ReplicationPlugin.kt @@ -76,6 +76,7 @@ import org.opensearch.action.ActionRequest import org.opensearch.action.ActionResponse import org.opensearch.client.Client import org.opensearch.cluster.NamedDiff +import org.opensearch.cluster.metadata.IndexMetadata import org.opensearch.cluster.metadata.IndexNameExpressionResolver import org.opensearch.cluster.metadata.Metadata import org.opensearch.cluster.metadata.RepositoryMetadata @@ -104,6 +105,7 @@ import org.opensearch.env.NodeEnvironment import org.opensearch.index.IndexModule import org.opensearch.index.IndexSettings import org.opensearch.index.engine.EngineFactory +import org.opensearch.index.engine.NRTReplicationEngine import org.opensearch.index.translog.ReplicationTranslogDeletionPolicy import org.opensearch.index.translog.TranslogDeletionPolicyFactory import org.opensearch.indices.recovery.RecoverySettings @@ -359,7 +361,16 @@ internal class ReplicationPlugin : Plugin(), ActionPlugin, PersistentTaskPlugin, override fun getEngineFactory(indexSettings: IndexSettings): Optional { return if (indexSettings.settings.get(REPLICATED_INDEX_SETTING.key) != null) { - Optional.of(EngineFactory { config -> ReplicationEngine(config) }) + Optional.of(EngineFactory { config -> + // Do not override the Engine for replica shards + if (indexSettings.settings.get(IndexMetadata.INDEX_REPLICATION_TYPE_SETTING.key) != null + && indexSettings.settings.get(IndexMetadata.INDEX_REPLICATION_TYPE_SETTING.key).equals("SEGMENT") + && (config.isReadOnlyReplica || config.isReadOnlyPrimary)) { + NRTReplicationEngine(config) + } else { + ReplicationEngine(config) + } + }) } else { Optional.empty() } diff --git a/src/main/kotlin/org/opensearch/replication/repository/RemoteClusterRepository.kt b/src/main/kotlin/org/opensearch/replication/repository/RemoteClusterRepository.kt index 6cfbff0d..8d40f6b4 100644 --- a/src/main/kotlin/org/opensearch/replication/repository/RemoteClusterRepository.kt +++ b/src/main/kotlin/org/opensearch/replication/repository/RemoteClusterRepository.kt @@ -100,8 +100,6 @@ class RemoteClusterRepository(private val repositoryMetadata: RepositoryMetadata fun repoForCluster(leaderClusterName: String): String = REMOTE_REPOSITORY_PREFIX + leaderClusterName } - - override fun getRestoreThrottleTimeInNanos(): Long { return restoreRateLimitingTimeInNanos.count() } @@ -238,10 +236,12 @@ class RemoteClusterRepository(private val repositoryMetadata: RepositoryMetadata val builder = Settings.builder().put(indexMetadata.settings) val replicatedIndex = "${repositoryMetadata.leaderClusterName()}:${index.name}" builder.put(ReplicationPlugin.REPLICATED_INDEX_SETTING.key, replicatedIndex) + builder.put(IndexMetadata.INDEX_REMOTE_REPLICATION_TYPE_SETTING.key, "SEGMENT") // Remove translog pruning for the follower index builder.remove(REPLICATION_INDEX_TRANSLOG_PRUNING_ENABLED_SETTING.key) + val indexMdBuilder = IndexMetadata.builder(indexMetadata).settings(builder) indexMetadata.aliases.valuesIt().forEach { indexMdBuilder.putAlias(it) diff --git a/src/main/kotlin/org/opensearch/replication/seqno/RemoteClusterTranslogService.kt b/src/main/kotlin/org/opensearch/replication/seqno/RemoteClusterTranslogService.kt index 21edfea3..3d7973dd 100644 --- a/src/main/kotlin/org/opensearch/replication/seqno/RemoteClusterTranslogService.kt +++ b/src/main/kotlin/org/opensearch/replication/seqno/RemoteClusterTranslogService.kt @@ -12,10 +12,8 @@ package org.opensearch.replication.seqno import org.apache.logging.log4j.LogManager -import org.opensearch.ResourceNotFoundException import org.opensearch.common.component.AbstractLifecycleComponent import org.opensearch.common.inject.Singleton -import org.opensearch.index.engine.Engine import org.opensearch.index.shard.IndexShard import org.opensearch.index.translog.Translog @@ -38,7 +36,14 @@ class RemoteClusterTranslogService : AbstractLifecycleComponent(){ public fun getHistoryOfOperations(indexShard: IndexShard, startSeqNo: Long, toSeqNo: Long): List { log.trace("Fetching translog snapshot for $indexShard - from $startSeqNo to $toSeqNo") // Ref issue: https://github.com/opensearch-project/OpenSearch/issues/2482 - val snapshot = indexShard.getHistoryOperationsFromTranslogFile(SOURCE_NAME, startSeqNo, toSeqNo) + //val snapshot = indexShard.getHistoryOperationsFromTranslogFile(SOURCE_NAME, startSeqNo, toSeqNo) + val snapshot = object: Translog.Snapshot { + override fun close() { } + override fun totalOperations(): Int { return 0 } + override fun next(): Translog.Operation { throw Exception("") } + } + + // Total ops to be fetched (both toSeqNo and startSeqNo are inclusive) val opsSize = toSeqNo - startSeqNo + 1 diff --git a/src/main/kotlin/org/opensearch/replication/task/index/IndexReplicationTask.kt b/src/main/kotlin/org/opensearch/replication/task/index/IndexReplicationTask.kt index 33a311c0..711f321f 100644 --- a/src/main/kotlin/org/opensearch/replication/task/index/IndexReplicationTask.kt +++ b/src/main/kotlin/org/opensearch/replication/task/index/IndexReplicationTask.kt @@ -52,8 +52,12 @@ import org.opensearch.action.ActionListener import org.opensearch.action.admin.indices.alias.IndicesAliasesRequest import org.opensearch.action.admin.indices.alias.IndicesAliasesRequest.AliasActions import org.opensearch.action.admin.indices.alias.get.GetAliasesRequest +import org.opensearch.action.admin.indices.create.CreateIndexRequestBuilder import org.opensearch.action.admin.indices.delete.DeleteIndexRequest +import org.opensearch.action.admin.indices.mapping.get.GetMappingsRequest +import org.opensearch.action.admin.indices.mapping.get.GetMappingsRequestBuilder import org.opensearch.action.admin.indices.settings.get.GetSettingsRequest +import org.opensearch.action.admin.indices.settings.get.GetSettingsRequestBuilder import org.opensearch.action.admin.indices.settings.put.UpdateSettingsRequest import org.opensearch.client.Client import org.opensearch.client.Requests @@ -93,7 +97,7 @@ import org.opensearch.rest.RestStatus import org.opensearch.tasks.TaskId import org.opensearch.tasks.TaskManager import org.opensearch.threadpool.ThreadPool -import java.util.Collections +import java.util.* import java.util.function.Predicate import java.util.stream.Collectors import kotlin.coroutines.resume @@ -143,7 +147,10 @@ open class IndexReplicationTask(id: Long, type: String, action: String, descript IndexMetadata.INDEX_BLOCKS_READ_ONLY_ALLOW_DELETE_SETTING, EnableAllocationDecider.INDEX_ROUTING_REBALANCE_ENABLE_SETTING, EnableAllocationDecider.INDEX_ROUTING_ALLOCATION_ENABLE_SETTING, - IndexSettings.INDEX_SOFT_DELETES_RETENTION_LEASE_PERIOD_SETTING + IndexSettings.INDEX_SOFT_DELETES_RETENTION_LEASE_PERIOD_SETTING, + // TODO: Should also be removing other settings like index.creation_date, index.provided_name, index.uuid, index.version.created + IndexMetadata.INDEX_REMOTE_REPLICATION_TYPE_SETTING, + IndexMetadata.INDEX_REPLICATION_TYPE_SETTING ) val blockListedSettings :Set = blSettings.stream().map { k -> k.key }.collect(Collectors.toSet()) @@ -409,7 +416,7 @@ open class IndexReplicationTask(id: Long, type: String, action: String, descript private suspend fun pollForMetadata(scope: CoroutineScope) { while (scope.isActive) { try { - log.debug("Polling for metadata for $followerIndexName") + log.info("Polling for metadata for $followerIndexName") var staticUpdated = false var gsr = GetSettingsRequest().includeDefaults(false).indices(this.leaderIndex.name) var settingsResponse = remoteClient.suspending(remoteClient.admin().indices()::getSettings, injectSecurityContext = true)(gsr) @@ -421,6 +428,9 @@ open class IndexReplicationTask(id: Long, type: String, action: String, descript leaderSettings = leaderSettings.filter { k: String? -> !blockListedSettings.contains(k) } + log.info("Filtered leaser settings are $leaderSettings") + + //leaderSettings.filter() gsr = GetSettingsRequest().includeDefaults(false).indices(this.followerIndexName) settingsResponse = client.suspending(client.admin().indices()::getSettings, injectSecurityContext = true)(gsr) @@ -436,6 +446,7 @@ open class IndexReplicationTask(id: Long, type: String, action: String, descript val indexScopedSettings = settingsModule.indexScopedSettings val settingsList = arrayOf(leaderSettings, overriddenSettings) + log.info("Final settings to be updated on follower are $leaderSettings") val desiredSettingsBuilder = Settings.builder() // Desired settings are taking leader Settings and then overriding them with desired settings for (settings in settingsList) { @@ -445,10 +456,12 @@ open class IndexReplicationTask(id: Long, type: String, action: String, descript } val setting = indexScopedSettings[key] if (!setting.isPrivateIndex) { - desiredSettingsBuilder.copy(key, settings); + desiredSettingsBuilder.copy(key, settings) } } } + desiredSettingsBuilder.copy(IndexMetadata.SETTING_REMOTE_REPLICATION_TYPE,followerSettings) + desiredSettingsBuilder.copy(IndexMetadata.SETTING_REPLICATION_TYPE,followerSettings) val desiredSettings = desiredSettingsBuilder.build() val changedSettingsBuilder = Settings.builder() @@ -744,6 +757,42 @@ open class IndexReplicationTask(id: Long, type: String, action: String, descript log.error("Unable to update setting for translog pruning based on retention lease") } + /* + //Get leader settings + val getSettingRequest = GetSettingsRequest().includeDefaults(false).indices(leaderIndex.name) + + val settingsResponse = remoteClient.suspending(remoteClient.admin().indices()::getSettings, + injectSecurityContext = true)(getSettingRequest) + val leaderSettings = settingsResponse.indexToSettings.get(leaderIndex.name) + + + //Update Follower settings + val followerSettingBuilder = Settings.builder() + followerSettingBuilder.put(leaderSettings) + val replicatedIndex = "${leaderAlias}:${leaderIndex}" //TODO: fix the replicated Index value + followerSettingBuilder.put(REPLICATED_INDEX_SETTING.key, replicatedIndex) + followerSettingBuilder.put(IndexMetadata.INDEX_REMOTE_REPLICATION_TYPE_SETTING.key, "SEGMENT") + // Remove translog pruning for the follower index + followerSettingBuilder.remove(REPLICATION_INDEX_TRANSLOG_PRUNING_ENABLED_SETTING.key) + followerSettingBuilder.remove("index.creation_date") + followerSettingBuilder.remove("index.uuid") + followerSettingBuilder.remove("index.version.created") + followerSettingBuilder.remove("index.provided_name") + + + val getMappingsRequest = GetMappingsRequest().indices(leaderIndex.name) + val getMappingsResponse = remoteClient.suspending(remoteClient.admin().indices()::getMappings, injectSecurityContext = true)(getMappingsRequest) + val followerMappingSource = getMappingsResponse?.mappings()?.get(leaderIndex.name)?.source()?.string() + + + client.admin().indices().prepareCreate(followerIndexName) + .setSettings(followerSettingBuilder.build()) + .setMapping(followerMappingSource) + .execute().get() + + log.info("ankikala: Follower Index Created") + */ + val restoreRequest = client.admin().cluster() .prepareRestoreSnapshot(RemoteClusterRepository.repoForCluster(leaderAlias), REMOTE_SNAPSHOT_NAME) .setIndices(leaderIndex.name) @@ -769,10 +818,12 @@ open class IndexReplicationTask(id: Long, type: String, action: String, descript return FailedState(Collections.emptyMap(), err) } cso.waitForNextChange("remote restore start") { inProgressRestore(it) != null } + return RestoreState } private suspend fun waitForRestore(): IndexReplicationState { + var restore = inProgressRestore(clusterService.state()) // Waiting for snapshot restore to reach a terminal stage. diff --git a/src/main/kotlin/org/opensearch/replication/task/shard/ShardReplicationTask.kt b/src/main/kotlin/org/opensearch/replication/task/shard/ShardReplicationTask.kt index 44493bc7..e3c020e8 100644 --- a/src/main/kotlin/org/opensearch/replication/task/shard/ShardReplicationTask.kt +++ b/src/main/kotlin/org/opensearch/replication/task/shard/ShardReplicationTask.kt @@ -11,6 +11,8 @@ package org.opensearch.replication.task.shard +import org.opensearch.action.support.replication.crosscluster.follower.SyncLeaderSegmentsRequest +import org.opensearch.action.support.replication.crosscluster.follower.SyncLeaderSegmentsAction import org.opensearch.replication.ReplicationSettings import org.opensearch.replication.action.changes.GetChangesAction import org.opensearch.replication.action.changes.GetChangesRequest @@ -50,6 +52,7 @@ import org.opensearch.index.seqno.RetentionLeaseNotFoundException import org.opensearch.index.shard.ShardId import org.opensearch.persistent.PersistentTaskState import org.opensearch.persistent.PersistentTasksNodeService +import org.opensearch.replication.util.suspendExecute import org.opensearch.rest.RestStatus import org.opensearch.tasks.TaskId import org.opensearch.threadpool.ThreadPool @@ -198,6 +201,15 @@ class ShardReplicationTask(id: Long, type: String, action: String, description: val followerIndexService = indicesService.indexServiceSafe(followerShardId.index) val indexShard = followerIndexService.getShard(followerShardId.id) + addListenerToInterruptTask() + coroutineScope { + while (isActive) { + syncSegments() + delay(10000) + } + } + + /* try { //Retention leases preserve the operations including and starting from the retainingSequenceNumber we specify when we take the lease . //hence renew retention lease with lastSyncedGlobalCheckpoint + 1 @@ -209,6 +221,7 @@ class ShardReplicationTask(id: Long, type: String, action: String, description: } addListenerToInterruptTask() + this.followerClusterStats.stats[followerShardId] = FollowerShardMetric() // Since this setting is not dynamic, setting update would only reflect after pause-resume or on a new replication job. @@ -293,6 +306,15 @@ class ShardReplicationTask(id: Long, type: String, action: String, description: } } sequencer.close() + */ + } + private suspend fun syncSegments(){ + val remoteClient = client.getRemoteClusterClient(leaderAlias) + val request = SyncLeaderSegmentsRequest(leaderAlias, leaderShardId, followerShardId) + logInfo("ankikala: Fetching changes from leader") + var changesResp = client.suspendExecute(replicationMetadata = replicationMetadata, + action = SyncLeaderSegmentsAction.INSTANCE, req = request) + logInfo("ankikala: Sync complete") } private suspend fun getChanges(fromSeqNo: Long, toSeqNo: Long): GetChangesResponse { diff --git a/src/main/kotlin/org/opensearch/replication/util/ValidationUtil.kt b/src/main/kotlin/org/opensearch/replication/util/ValidationUtil.kt index ad4c20d4..a4d66a13 100644 --- a/src/main/kotlin/org/opensearch/replication/util/ValidationUtil.kt +++ b/src/main/kotlin/org/opensearch/replication/util/ValidationUtil.kt @@ -35,7 +35,7 @@ object ValidationUtil { val analyserSettings = leaderSettings.filter { k: String? -> k!!.matches(Regex("index.analysis.*path")) } for (analyserSetting in analyserSettings.keySet()) { val settingValue = if (overriddenSettings.hasValue(analyserSetting)) overriddenSettings.get(analyserSetting) else analyserSettings.get(analyserSetting) - val path: Path = environment.configFile().resolve(settingValue) + val path: Path = environment.configDir().resolve(settingValue) if (!Files.exists(path)) { val message = "IOException while reading ${analyserSetting}: ${path.toString()}" log.error(message) From d9436fe0b5b9f90409bef9bab63cdb525fbcfa29 Mon Sep 17 00:00:00 2001 From: Ankit Kala Date: Tue, 23 Aug 2022 21:03:14 +0530 Subject: [PATCH 2/2] temp --- .../replication/task/index/IndexReplicationTask.kt | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/src/main/kotlin/org/opensearch/replication/task/index/IndexReplicationTask.kt b/src/main/kotlin/org/opensearch/replication/task/index/IndexReplicationTask.kt index 711f321f..eed173a5 100644 --- a/src/main/kotlin/org/opensearch/replication/task/index/IndexReplicationTask.kt +++ b/src/main/kotlin/org/opensearch/replication/task/index/IndexReplicationTask.kt @@ -757,7 +757,7 @@ open class IndexReplicationTask(id: Long, type: String, action: String, descript log.error("Unable to update setting for translog pruning based on retention lease") } - /* + //Get leader settings val getSettingRequest = GetSettingsRequest().includeDefaults(false).indices(leaderIndex.name) @@ -791,7 +791,8 @@ open class IndexReplicationTask(id: Long, type: String, action: String, descript .execute().get() log.info("ankikala: Follower Index Created") - */ + + /* val restoreRequest = client.admin().cluster() .prepareRestoreSnapshot(RemoteClusterRepository.repoForCluster(leaderAlias), REMOTE_SNAPSHOT_NAME) @@ -818,12 +819,13 @@ open class IndexReplicationTask(id: Long, type: String, action: String, descript return FailedState(Collections.emptyMap(), err) } cso.waitForNextChange("remote restore start") { inProgressRestore(it) != null } - + */ return RestoreState } private suspend fun waitForRestore(): IndexReplicationState { - + return InitFollowState + /* var restore = inProgressRestore(clusterService.state()) // Waiting for snapshot restore to reach a terminal stage. @@ -860,6 +862,7 @@ open class IndexReplicationTask(id: Long, type: String, action: String, descript } else { return InitFollowState } + */ } /**