[Draft] SegRep POC #479

Closed · wants to merge 2 commits
12 changes: 0 additions & 12 deletions .github/workflows/security-tests.yml
@@ -22,18 +22,6 @@ jobs:
# This step uses the checkout Github action: https://github.com/actions/checkout
- name: Checkout Branch
uses: actions/checkout@v2
# Security plugin dependency
- name: Checkout security
uses: actions/checkout@v2
with:
repository: 'opensearch-project/security'
path: security
ref: 'main'
- name: Build security
working-directory: ./security
run: |
./gradlew clean build -Dbuild.snapshot=false -x test
cp build/distributions/opensearch-security-*.zip ../src/test/resources/security/plugin/opensearch-security.zip
- name: Build and run Replication tests
run: |
ls -al src/test/resources/security/plugin
26 changes: 24 additions & 2 deletions build.gradle
@@ -36,7 +36,7 @@ import org.opensearch.gradle.test.RestIntegTestTask
buildscript {
ext {
isSnapshot = "true" == System.getProperty("build.snapshot", "true")
opensearch_version = System.getProperty("opensearch.version", "2.1.0-SNAPSHOT")
opensearch_version = System.getProperty("opensearch.version", "3.0.0-SNAPSHOT")
buildVersionQualifier = System.getProperty("build.version_qualifier", "")
// e.g. 2.0.0-rc1-SNAPSHOT -> 2.0.0.0-rc1-SNAPSHOT
version_tokens = opensearch_version.tokenize('-')
@@ -56,13 +56,22 @@ buildscript {
common_utils_version = System.getProperty("common_utils.version", opensearch_build)
kotlin_version = System.getProperty("kotlin.version", "1.6.0")

// For fetching security zip from Maven.
// https://ci.opensearch.org/ci/dbc/distribution-build-opensearch/2.1.0/latest/linux/x64/tar/builds/opensearch/plugins/opensearch-security-2.1.0.0.zip
opensearch_no_snapshot = opensearch_version.replace("-SNAPSHOT","")
security_no_snapshot = opensearch_build.replace("-SNAPSHOT","")
security_plugin_path = "build/dependencies/security"
security_plugin_download_url = 'https://ci.opensearch.org/ci/dbc/distribution-build-opensearch/' + opensearch_no_snapshot +
'/latest/linux/x64/tar/builds/opensearch/plugins/opensearch-security-' + security_no_snapshot + '.zip'

}

repositories {
mavenLocal()
mavenCentral()
maven { url "https://aws.oss.sonatype.org/content/repositories/snapshots" }
maven { url "https://plugins.gradle.org/m2/" }
maven { url "https://d1nvenhzbhpy0q.cloudfront.net/snapshots/lucene/" }
}

dependencies {
@@ -209,7 +218,14 @@ def securityPluginFile = new Callable<RegularFile>() {
return new RegularFile() {
@Override
File getAsFile() {
return fileTree("$projectDir/src/test/resources/security/plugin/opensearch-security.zip").getSingleFile()
if (new File("$project.rootDir/$security_plugin_path").exists()) {
project.delete(files("$project.rootDir/$security_plugin_path"))
}
project.mkdir security_plugin_path
ant.get(src: security_plugin_download_url,
dest: security_plugin_path,
httpusecaches: false)
return fileTree(security_plugin_path).getSingleFile()
}
}
}
@@ -337,6 +353,8 @@ testClusters {
//numberOfNodes = 3
setting 'path.repo', repo.absolutePath
if(_numNodes == 1) jvmArgs "${-> getDebugJvmArgs(debugPort++)}"
systemProperty "opensearch.experimental.feature.replication_type.enabled", "true"
systemProperty "opensearch.experimental.feature.remote.segment_replication.enabled", "true"
}
followCluster {
testDistribution = "INTEG_TEST"
@@ -349,6 +367,8 @@ testClusters {
//numberOfNodes = 3
setting 'path.repo', repo.absolutePath
if(_numNodes == 1) jvmArgs "${-> getDebugJvmArgs(debugPort++)}"
systemProperty "opensearch.experimental.feature.replication_type.enabled", "true"
systemProperty "opensearch.experimental.feature.remote.segment_replication.enabled", "true"
}
}

@@ -360,6 +380,8 @@ def configureCluster(OpenSearchCluster cluster, Boolean securityEnabled) {
node.getAllTransportPortURI().stream()
}.collect(Collectors.joining("\n"))
cluster.nodes.forEach {node ->
node.systemProperty("opensearch.experimental.feature.replication_type.enabled", "true")
node.systemProperty("opensearch.experimental.feature.remote.segment_replication.enabled", "true")
try {
// Manually write the unicast hosts as we are not depending on the internal method
Files.write(node.getConfigDir().resolve("unicast_hosts.txt"), unicastUris.getBytes(StandardCharsets.UTF_8));
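For reference, the reworked securityPluginFile callable above now resolves the security plugin by downloading it from the OpenSearch CI distribution site (security_plugin_download_url) into security_plugin_path instead of expecting a locally built zip. A rough plain-Kotlin sketch of that behavior, outside Gradle, with fetchSecurityPlugin, downloadUrl, and pluginDir as illustrative names:

```kotlin
import java.net.URL
import java.nio.file.Files
import java.nio.file.Path
import java.nio.file.StandardCopyOption

// Illustration only: mirrors what the Gradle callable does with ant.get and fileTree().
// Assumes pluginDir contains only previously downloaded files (no subdirectories).
fun fetchSecurityPlugin(downloadUrl: String, pluginDir: Path): Path {
    if (Files.exists(pluginDir)) {
        // start from a clean directory so a getSingleFile()-style lookup sees exactly one zip
        Files.newDirectoryStream(pluginDir).use { entries -> entries.forEach { Files.delete(it) } }
    }
    Files.createDirectories(pluginDir)
    val target = pluginDir.resolve(downloadUrl.substringAfterLast('/'))
    URL(downloadUrl).openStream().use { input ->
        Files.copy(input, target, StandardCopyOption.REPLACE_EXISTING)
    }
    return target
}
```

The Gradle version relies on ant.get with httpusecaches: false plus fileTree(...).getSingleFile(), which amounts to the same thing for a directory holding a single downloaded zip.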
@@ -76,6 +76,7 @@ import org.opensearch.action.ActionRequest
import org.opensearch.action.ActionResponse
import org.opensearch.client.Client
import org.opensearch.cluster.NamedDiff
import org.opensearch.cluster.metadata.IndexMetadata
import org.opensearch.cluster.metadata.IndexNameExpressionResolver
import org.opensearch.cluster.metadata.Metadata
import org.opensearch.cluster.metadata.RepositoryMetadata
@@ -104,6 +105,7 @@ import org.opensearch.env.NodeEnvironment
import org.opensearch.index.IndexModule
import org.opensearch.index.IndexSettings
import org.opensearch.index.engine.EngineFactory
import org.opensearch.index.engine.NRTReplicationEngine
import org.opensearch.index.translog.ReplicationTranslogDeletionPolicy
import org.opensearch.index.translog.TranslogDeletionPolicyFactory
import org.opensearch.indices.recovery.RecoverySettings
@@ -359,7 +361,16 @@ internal class ReplicationPlugin : Plugin(), ActionPlugin, PersistentTaskPlugin,

override fun getEngineFactory(indexSettings: IndexSettings): Optional<EngineFactory> {
return if (indexSettings.settings.get(REPLICATED_INDEX_SETTING.key) != null) {
Optional.of(EngineFactory { config -> ReplicationEngine(config) })
Optional.of(EngineFactory { config ->
// For SEGMENT-replicated read-only shard copies, keep the core NRT engine instead of the CCR ReplicationEngine
if (indexSettings.settings.get(IndexMetadata.INDEX_REPLICATION_TYPE_SETTING.key) != null
&& indexSettings.settings.get(IndexMetadata.INDEX_REPLICATION_TYPE_SETTING.key).equals("SEGMENT")
&& (config.isReadOnlyReplica || config.isReadOnlyPrimary)) {
NRTReplicationEngine(config)
} else {
ReplicationEngine(config)
}
})
} else {
Optional.empty()
}
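The engine selection above boils down to a small rule: indices that are not replicated keep the default engine; replicated indices get the plugin's translog-based ReplicationEngine, except that SEGMENT-replicated read-only shard copies keep the core NRTReplicationEngine so they receive copied segments rather than replayed operations. A hedged sketch of that rule as a pure function (the enum and parameter names below are invented for illustration):

```kotlin
// Illustrative only: the decision made inside getEngineFactory, reduced to plain values.
enum class EngineChoice { DEFAULT, CCR_TRANSLOG_REPLICATION, NRT_SEGMENT_REPLICATION }

fun chooseEngine(isReplicatedIndex: Boolean, replicationType: String?, isReadOnlyShardCopy: Boolean): EngineChoice {
    if (!isReplicatedIndex) return EngineChoice.DEFAULT          // plugin does not override the engine
    return if (replicationType == "SEGMENT" && isReadOnlyShardCopy) {
        EngineChoice.NRT_SEGMENT_REPLICATION                     // replicas apply copied segments, not operations
    } else {
        EngineChoice.CCR_TRANSLOG_REPLICATION                    // translog-based cross-cluster replication engine
    }
}
```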
@@ -100,8 +100,6 @@ class RemoteClusterRepository(private val repositoryMetadata: RepositoryMetadata
fun repoForCluster(leaderClusterName: String): String = REMOTE_REPOSITORY_PREFIX + leaderClusterName
}



override fun getRestoreThrottleTimeInNanos(): Long {
return restoreRateLimitingTimeInNanos.count()
}
@@ -238,10 +236,12 @@ class RemoteClusterRepository(private val repositoryMetadata: RepositoryMetadata
val builder = Settings.builder().put(indexMetadata.settings)
val replicatedIndex = "${repositoryMetadata.leaderClusterName()}:${index.name}"
builder.put(ReplicationPlugin.REPLICATED_INDEX_SETTING.key, replicatedIndex)
builder.put(IndexMetadata.INDEX_REMOTE_REPLICATION_TYPE_SETTING.key, "SEGMENT")

// Remove translog pruning for the follower index
builder.remove(REPLICATION_INDEX_TRANSLOG_PRUNING_ENABLED_SETTING.key)


val indexMdBuilder = IndexMetadata.builder(indexMetadata).settings(builder)
indexMetadata.aliases.valuesIt().forEach {
indexMdBuilder.putAlias(it)
@@ -12,10 +12,8 @@
package org.opensearch.replication.seqno

import org.apache.logging.log4j.LogManager
import org.opensearch.ResourceNotFoundException
import org.opensearch.common.component.AbstractLifecycleComponent
import org.opensearch.common.inject.Singleton
import org.opensearch.index.engine.Engine
import org.opensearch.index.shard.IndexShard
import org.opensearch.index.translog.Translog

@@ -38,7 +36,14 @@ class RemoteClusterTranslogService : AbstractLifecycleComponent(){
public fun getHistoryOfOperations(indexShard: IndexShard, startSeqNo: Long, toSeqNo: Long): List<Translog.Operation> {
log.trace("Fetching translog snapshot for $indexShard - from $startSeqNo to $toSeqNo")
// Ref issue: https://github.com/opensearch-project/OpenSearch/issues/2482
val snapshot = indexShard.getHistoryOperationsFromTranslogFile(SOURCE_NAME, startSeqNo, toSeqNo)
//val snapshot = indexShard.getHistoryOperationsFromTranslogFile(SOURCE_NAME, startSeqNo, toSeqNo)
// POC stub: with segment replication there are no translog operations to replay, so return an empty snapshot
val snapshot = object: Translog.Snapshot {
override fun close() { }
override fun totalOperations(): Int = 0
override fun next(): Translog.Operation? = null // null signals the end of the snapshot
}



// Total ops to be fetched (both toSeqNo and startSeqNo are inclusive)
val opsSize = toSeqNo - startSeqNo + 1
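A Translog.Snapshot is drained by calling next() until it returns null, so an empty snapshot simply yields no operations regardless of the opsSize computed above. A minimal sketch of that consumption pattern (drainSnapshot is a hypothetical helper, not part of this PR):

```kotlin
import org.opensearch.index.translog.Translog

// Hypothetical helper: drain a snapshot into a list. For the empty POC snapshot this returns an empty list.
fun drainSnapshot(snapshot: Translog.Snapshot): List<Translog.Operation> {
    val ops = ArrayList<Translog.Operation>(snapshot.totalOperations())
    snapshot.use { s ->
        var op = s.next()
        while (op != null) {
            ops.add(op)
            op = s.next()
        }
    }
    return ops
}
```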
@@ -52,8 +52,12 @@ import org.opensearch.action.ActionListener
import org.opensearch.action.admin.indices.alias.IndicesAliasesRequest
import org.opensearch.action.admin.indices.alias.IndicesAliasesRequest.AliasActions
import org.opensearch.action.admin.indices.alias.get.GetAliasesRequest
import org.opensearch.action.admin.indices.create.CreateIndexRequestBuilder
import org.opensearch.action.admin.indices.delete.DeleteIndexRequest
import org.opensearch.action.admin.indices.mapping.get.GetMappingsRequest
import org.opensearch.action.admin.indices.mapping.get.GetMappingsRequestBuilder
import org.opensearch.action.admin.indices.settings.get.GetSettingsRequest
import org.opensearch.action.admin.indices.settings.get.GetSettingsRequestBuilder
import org.opensearch.action.admin.indices.settings.put.UpdateSettingsRequest
import org.opensearch.client.Client
import org.opensearch.client.Requests
@@ -93,7 +97,7 @@ import org.opensearch.rest.RestStatus
import org.opensearch.tasks.TaskId
import org.opensearch.tasks.TaskManager
import org.opensearch.threadpool.ThreadPool
import java.util.Collections
import java.util.*
import java.util.function.Predicate
import java.util.stream.Collectors
import kotlin.coroutines.resume
@@ -143,7 +147,10 @@ open class IndexReplicationTask(id: Long, type: String, action: String, descript
IndexMetadata.INDEX_BLOCKS_READ_ONLY_ALLOW_DELETE_SETTING,
EnableAllocationDecider.INDEX_ROUTING_REBALANCE_ENABLE_SETTING,
EnableAllocationDecider.INDEX_ROUTING_ALLOCATION_ENABLE_SETTING,
IndexSettings.INDEX_SOFT_DELETES_RETENTION_LEASE_PERIOD_SETTING
IndexSettings.INDEX_SOFT_DELETES_RETENTION_LEASE_PERIOD_SETTING,
// TODO: Should also be removing other settings like index.creation_date, index.provided_name, index.uuid, index.version.created
IndexMetadata.INDEX_REMOTE_REPLICATION_TYPE_SETTING,
IndexMetadata.INDEX_REPLICATION_TYPE_SETTING
)

val blockListedSettings :Set<String> = blSettings.stream().map { k -> k.key }.collect(Collectors.toSet())
@@ -409,7 +416,7 @@ open class IndexReplicationTask(id: Long, type: String, action: String, descript
private suspend fun pollForMetadata(scope: CoroutineScope) {
while (scope.isActive) {
try {
log.debug("Polling for metadata for $followerIndexName")
log.info("Polling for metadata for $followerIndexName")
var staticUpdated = false
var gsr = GetSettingsRequest().includeDefaults(false).indices(this.leaderIndex.name)
var settingsResponse = remoteClient.suspending(remoteClient.admin().indices()::getSettings, injectSecurityContext = true)(gsr)
@@ -421,6 +428,9 @@ open class IndexReplicationTask(id: Long, type: String, action: String, descript
leaderSettings = leaderSettings.filter { k: String? ->
!blockListedSettings.contains(k)
}
log.info("Filtered leaser settings are $leaderSettings")

//leaderSettings.filter()

gsr = GetSettingsRequest().includeDefaults(false).indices(this.followerIndexName)
settingsResponse = client.suspending(client.admin().indices()::getSettings, injectSecurityContext = true)(gsr)
@@ -436,6 +446,7 @@ open class IndexReplicationTask(id: Long, type: String, action: String, descript
val indexScopedSettings = settingsModule.indexScopedSettings

val settingsList = arrayOf(leaderSettings, overriddenSettings)
log.info("Final settings to be updated on follower are $leaderSettings")
val desiredSettingsBuilder = Settings.builder()
// Desired settings are taking leader Settings and then overriding them with desired settings
for (settings in settingsList) {
@@ -445,10 +456,12 @@ open class IndexReplicationTask(id: Long, type: String, action: String, descript
}
val setting = indexScopedSettings[key]
if (!setting.isPrivateIndex) {
desiredSettingsBuilder.copy(key, settings);
desiredSettingsBuilder.copy(key, settings)
}
}
}
desiredSettingsBuilder.copy(IndexMetadata.SETTING_REMOTE_REPLICATION_TYPE, followerSettings)
desiredSettingsBuilder.copy(IndexMetadata.SETTING_REPLICATION_TYPE, followerSettings)
val desiredSettings = desiredSettingsBuilder.build()

val changedSettingsBuilder = Settings.builder()
@@ -744,6 +757,43 @@ open class IndexReplicationTask(id: Long, type: String, action: String, descript
log.error("Unable to update setting for translog pruning based on retention lease")
}


// Get leader settings
val getSettingRequest = GetSettingsRequest().includeDefaults(false).indices(leaderIndex.name)

val settingsResponse = remoteClient.suspending(remoteClient.admin().indices()::getSettings,
injectSecurityContext = true)(getSettingRequest)
val leaderSettings = settingsResponse.indexToSettings.get(leaderIndex.name)


// Update follower settings
val followerSettingBuilder = Settings.builder()
followerSettingBuilder.put(leaderSettings)
val replicatedIndex = "${leaderAlias}:${leaderIndex}" //TODO: fix the replicated Index value
followerSettingBuilder.put(REPLICATED_INDEX_SETTING.key, replicatedIndex)
followerSettingBuilder.put(IndexMetadata.INDEX_REMOTE_REPLICATION_TYPE_SETTING.key, "SEGMENT")
// Remove translog pruning for the follower index
followerSettingBuilder.remove(REPLICATION_INDEX_TRANSLOG_PRUNING_ENABLED_SETTING.key)
followerSettingBuilder.remove("index.creation_date")
followerSettingBuilder.remove("index.uuid")
followerSettingBuilder.remove("index.version.created")
followerSettingBuilder.remove("index.provided_name")


val getMappingsRequest = GetMappingsRequest().indices(leaderIndex.name)
val getMappingsResponse = remoteClient.suspending(remoteClient.admin().indices()::getMappings, injectSecurityContext = true)(getMappingsRequest)
val followerMappingSource = getMappingsResponse?.mappings()?.get(leaderIndex.name)?.source()?.string()


client.admin().indices().prepareCreate(followerIndexName)
.setSettings(followerSettingBuilder.build())
.setMapping(followerMappingSource)
.execute().get()

log.info("ankikala: Follower Index Created")

/*

val restoreRequest = client.admin().cluster()
.prepareRestoreSnapshot(RemoteClusterRepository.repoForCluster(leaderAlias), REMOTE_SNAPSHOT_NAME)
.setIndices(leaderIndex.name)
@@ -769,10 +819,13 @@ open class IndexReplicationTask(id: Long, type: String, action: String, descript
return FailedState(Collections.emptyMap(), err)
}
cso.waitForNextChange("remote restore start") { inProgressRestore(it) != null }
*/
return RestoreState
}

private suspend fun waitForRestore(): IndexReplicationState {
return InitFollowState
/*
var restore = inProgressRestore(clusterService.state())

// Waiting for snapshot restore to reach a terminal stage.
@@ -809,6 +862,7 @@ open class IndexReplicationTask(id: Long, type: String, action: String, descript
} else {
return InitFollowState
}
*/
}
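Both the pollForMetadata changes and the new follower-index creation above follow the same settings rule: copy the leader's settings, drop block-listed and index-identity keys (index.uuid, index.creation_date, and so on), and let follower-specific values, including the replication-type settings, win. A rough sketch of that rule over plain maps, with hypothetical names:

```kotlin
// Illustration only: the leader-to-follower settings derivation as a pure function.
fun deriveFollowerSettings(
    leaderSettings: Map<String, String>,
    followerOverrides: Map<String, String>,
    blockListedKeys: Set<String>          // stands in for blockListedSettings plus the index-identity keys removed above
): Map<String, String> {
    val desired = LinkedHashMap<String, String>()
    for ((key, value) in leaderSettings) {
        if (key !in blockListedKeys) desired[key] = value   // leader values that are safe to copy
    }
    desired.putAll(followerOverrides)                        // follower-specific values always win
    return desired
}
```

In the diff this corresponds to the desiredSettingsBuilder loop plus the two copy(...) calls that pull the replication-type settings from followerSettings.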

/**
@@ -11,6 +11,8 @@

package org.opensearch.replication.task.shard

import org.opensearch.action.support.replication.crosscluster.follower.SyncLeaderSegmentsRequest
import org.opensearch.action.support.replication.crosscluster.follower.SyncLeaderSegmentsAction
import org.opensearch.replication.ReplicationSettings
import org.opensearch.replication.action.changes.GetChangesAction
import org.opensearch.replication.action.changes.GetChangesRequest
@@ -50,6 +52,7 @@ import org.opensearch.index.seqno.RetentionLeaseNotFoundException
import org.opensearch.index.shard.ShardId
import org.opensearch.persistent.PersistentTaskState
import org.opensearch.persistent.PersistentTasksNodeService
import org.opensearch.replication.util.suspendExecute
import org.opensearch.rest.RestStatus
import org.opensearch.tasks.TaskId
import org.opensearch.threadpool.ThreadPool
@@ -198,6 +201,15 @@ class ShardReplicationTask(id: Long, type: String, action: String, description:
val followerIndexService = indicesService.indexServiceSafe(followerShardId.index)
val indexShard = followerIndexService.getShard(followerShardId.id)

addListenerToInterruptTask()
coroutineScope {
while (isActive) {
syncSegments()
delay(10000)
}
}

/*
try {
//Retention leases preserve the operations including and starting from the retainingSequenceNumber we specify when we take the lease .
//hence renew retention lease with lastSyncedGlobalCheckpoint + 1
Expand All @@ -209,6 +221,7 @@ class ShardReplicationTask(id: Long, type: String, action: String, description:
}

addListenerToInterruptTask()

this.followerClusterStats.stats[followerShardId] = FollowerShardMetric()

// Since this setting is not dynamic, setting update would only reflect after pause-resume or on a new replication job.
@@ -293,6 +306,15 @@ class ShardReplicationTask(id: Long, type: String, action: String, description:
}
}
sequencer.close()
*/
}
private suspend fun syncSegments() {
val request = SyncLeaderSegmentsRequest(leaderAlias, leaderShardId, followerShardId)
logInfo("Fetching segments from leader shard $leaderShardId")
client.suspendExecute(replicationMetadata = replicationMetadata,
action = SyncLeaderSegmentsAction.INSTANCE, req = request)
logInfo("Segment sync complete for follower shard $followerShardId")
}
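In this POC the translog-based change polling is commented out and replaced by a fixed-interval segment sync: a cancellable coroutine calls syncSegments() every 10 seconds until the task is stopped. A standalone sketch of that loop (pollSegments and syncOnce are illustrative names; the interval matches the hard-coded delay above):

```kotlin
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.isActive

// Illustrative sketch of the poll-and-sync loop used by ShardReplicationTask.
suspend fun pollSegments(intervalMillis: Long = 10_000, syncOnce: suspend () -> Unit) = coroutineScope {
    while (isActive) {
        try {
            syncOnce()                       // one round of copying leader segments to the follower shard
        } catch (e: CancellationException) {
            throw e                          // let task cancellation propagate
        } catch (e: Exception) {
            // a production task would log the failure and apply a retry or failure policy here
        }
        delay(intervalMillis)                // wait before polling the leader again
    }
}
```

Rethrowing CancellationException keeps task interruption working; any other failure would need the retry and failure handling that the commented-out translog path used to provide.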

private suspend fun getChanges(fromSeqNo: Long, toSeqNo: Long): GetChangesResponse {