Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.x] [RW Separation] Add polling segment replication for search replicas (#15627) #15717

Merged
merged 2 commits into from
Sep 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Adding translog durability validation in index templates ([#15494](https://github.com/opensearch-project/OpenSearch/pull/15494))
- [Range Queries] Add new approximateable query framework to short-circuit range queries ([#13788](https://github.com/opensearch-project/OpenSearch/pull/13788))
- [Workload Management] Add query group level failure tracking ([#15227](https://github.com/opensearch-project/OpenSearch/pull/15527))
- [Reader Writer Separation] Add searchOnly replica routing configuration ([#15410](https://github.com/opensearch-project/OpenSearch/pull/15410))
- [Reader Writer Separation] Add experimental search replica shard type to achieve reader writer separation ([#15237](https://github.com/opensearch-project/OpenSearch/pull/15237))
- Add index creation using the context field ([#15290](https://github.com/opensearch-project/OpenSearch/pull/15290))
- [Remote Publication] Add remote download stats ([#15291](https://github.com/opensearch-project/OpenSearch/pull/15291)))
- Add support to upload snapshot shard blobs with hashed prefix ([#15426](https://github.com/opensearch-project/OpenSearch/pull/15426))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.indices.replication;

import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.junit.After;
import org.junit.Before;

import java.nio.file.Path;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class SearchReplicaReplicationIT extends SegmentReplicationBaseIT {

private static final String REPOSITORY_NAME = "test-remote-store-repo";
protected Path absolutePath;

private Boolean useRemoteStore;

@Before
public void randomizeRemoteStoreEnabled() {
useRemoteStore = randomBoolean();
}

@Override
protected Settings nodeSettings(int nodeOrdinal) {
if (useRemoteStore) {
if (absolutePath == null) {
absolutePath = randomRepoPath().toAbsolutePath();
}
return Settings.builder()
.put(super.nodeSettings(nodeOrdinal))
.put(remoteStoreClusterSettings(REPOSITORY_NAME, absolutePath))
.build();
}
return super.nodeSettings(nodeOrdinal);
}

@After
public void teardown() {
if (useRemoteStore) {
clusterAdmin().prepareCleanupRepository(REPOSITORY_NAME).get();
}
}

@Override
public Settings indexSettings() {
return Settings.builder()
.put(super.indexSettings())
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.put(IndexMetadata.SETTING_NUMBER_OF_SEARCH_REPLICAS, 1)
.build();
}

@Override
protected Settings featureFlagSettings() {
return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.READER_WRITER_SPLIT_EXPERIMENTAL, true).build();
}

public void testReplication() throws Exception {
internalCluster().startClusterManagerOnlyNode();
final String primary = internalCluster().startDataOnlyNode();
createIndex(INDEX_NAME);
ensureYellowAndNoInitializingShards(INDEX_NAME);
final String replica = internalCluster().startDataOnlyNode();
ensureGreen(INDEX_NAME);

final int docCount = 10;
for (int i = 0; i < docCount; i++) {
client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().get();
}
refresh(INDEX_NAME);
waitForSearchableDocs(docCount, primary, replica);
}

}
54 changes: 53 additions & 1 deletion server/src/main/java/org/opensearch/index/IndexModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@
import org.opensearch.index.engine.EngineFactory;
import org.opensearch.index.mapper.MapperService;
import org.opensearch.index.shard.IndexEventListener;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.shard.IndexingOperationListener;
import org.opensearch.index.shard.SearchOperationListener;
import org.opensearch.index.similarity.SimilarityService;
Expand Down Expand Up @@ -729,6 +730,56 @@
Supplier<TimeValue> clusterDefaultRefreshIntervalSupplier,
RecoverySettings recoverySettings,
RemoteStoreSettings remoteStoreSettings
) throws IOException {
return newIndexService(

Check warning on line 734 in server/src/main/java/org/opensearch/index/IndexModule.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/IndexModule.java#L734

Added line #L734 was not covered by tests
indexCreationContext,
environment,
xContentRegistry,
shardStoreDeleter,
circuitBreakerService,
bigArrays,
threadPool,
scriptService,
clusterService,
client,
indicesQueryCache,
mapperRegistry,
indicesFieldDataCache,
namedWriteableRegistry,
idFieldDataEnabled,
valuesSourceRegistry,
remoteDirectoryFactory,
translogFactorySupplier,
clusterDefaultRefreshIntervalSupplier,
recoverySettings,
remoteStoreSettings,
(s) -> {}

Check warning on line 756 in server/src/main/java/org/opensearch/index/IndexModule.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/IndexModule.java#L756

Added line #L756 was not covered by tests
);
}

public IndexService newIndexService(
IndexService.IndexCreationContext indexCreationContext,
NodeEnvironment environment,
NamedXContentRegistry xContentRegistry,
IndexService.ShardStoreDeleter shardStoreDeleter,
CircuitBreakerService circuitBreakerService,
BigArrays bigArrays,
ThreadPool threadPool,
ScriptService scriptService,
ClusterService clusterService,
Client client,
IndicesQueryCache indicesQueryCache,
MapperRegistry mapperRegistry,
IndicesFieldDataCache indicesFieldDataCache,
NamedWriteableRegistry namedWriteableRegistry,
BooleanSupplier idFieldDataEnabled,
ValuesSourceRegistry valuesSourceRegistry,
IndexStorePlugin.DirectoryFactory remoteDirectoryFactory,
BiFunction<IndexSettings, ShardRouting, TranslogFactory> translogFactorySupplier,
Supplier<TimeValue> clusterDefaultRefreshIntervalSupplier,
RecoverySettings recoverySettings,
RemoteStoreSettings remoteStoreSettings,
Consumer<IndexShard> replicator
) throws IOException {
final IndexEventListener eventListener = freeze();
Function<IndexService, CheckedFunction<DirectoryReader, DirectoryReader, IOException>> readerWrapperFactory = indexReaderWrapper
Expand Down Expand Up @@ -789,7 +840,8 @@
recoverySettings,
remoteStoreSettings,
fileCache,
compositeIndexSettings
compositeIndexSettings,
replicator
);
success = true;
return indexService;
Expand Down
73 changes: 70 additions & 3 deletions server/src/main/java/org/opensearch/index/IndexService.java
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@
import static java.util.Collections.emptyMap;
import static java.util.Collections.unmodifiableMap;
import static org.opensearch.common.collect.MapBuilder.newMapBuilder;
import static org.opensearch.common.util.FeatureFlags.READER_WRITER_SPLIT_EXPERIMENTAL_SETTING;
import static org.opensearch.index.remote.RemoteMigrationIndexMetadataUpdater.indexHasRemoteStoreSettings;

/**
Expand Down Expand Up @@ -174,6 +175,7 @@
private volatile AsyncTranslogFSync fsyncTask;
private volatile AsyncGlobalCheckpointTask globalCheckpointTask;
private volatile AsyncRetentionLeaseSyncTask retentionLeaseSyncTask;
private volatile AsyncReplicationTask asyncReplicationTask;

// don't convert to Setting<> and register... we only set this in tests and register via a plugin
private final String INDEX_TRANSLOG_RETENTION_CHECK_INTERVAL_SETTING = "index.translog.retention.check_interval";
Expand All @@ -194,6 +196,7 @@
private final RemoteStoreSettings remoteStoreSettings;
private final FileCache fileCache;
private final CompositeIndexSettings compositeIndexSettings;
private final Consumer<IndexShard> replicator;

public IndexService(
IndexSettings indexSettings,
Expand Down Expand Up @@ -231,7 +234,8 @@
RecoverySettings recoverySettings,
RemoteStoreSettings remoteStoreSettings,
FileCache fileCache,
CompositeIndexSettings compositeIndexSettings
CompositeIndexSettings compositeIndexSettings,
Consumer<IndexShard> replicator
) {
super(indexSettings);
this.allowExpensiveQueries = allowExpensiveQueries;
Expand Down Expand Up @@ -306,11 +310,15 @@
this.trimTranslogTask = new AsyncTrimTranslogTask(this);
this.globalCheckpointTask = new AsyncGlobalCheckpointTask(this);
this.retentionLeaseSyncTask = new AsyncRetentionLeaseSyncTask(this);
if (READER_WRITER_SPLIT_EXPERIMENTAL_SETTING.get(indexSettings.getNodeSettings())) {
this.asyncReplicationTask = new AsyncReplicationTask(this);
}
this.translogFactorySupplier = translogFactorySupplier;
this.recoverySettings = recoverySettings;
this.remoteStoreSettings = remoteStoreSettings;
this.compositeIndexSettings = compositeIndexSettings;
this.fileCache = fileCache;
this.replicator = replicator;
updateFsyncTaskIfNecessary();
}

Expand Down Expand Up @@ -387,7 +395,8 @@
recoverySettings,
remoteStoreSettings,
fileCache,
null
null,
(s) -> {}

Check warning on line 399 in server/src/main/java/org/opensearch/index/IndexService.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/IndexService.java#L399

Added line #L399 was not covered by tests
);
}

Expand Down Expand Up @@ -463,7 +472,8 @@
recoverySettings,
remoteStoreSettings,
null,
null
null,
s -> {}

Check warning on line 476 in server/src/main/java/org/opensearch/index/IndexService.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/IndexService.java#L476

Added line #L476 was not covered by tests
);
}

Expand All @@ -472,6 +482,11 @@
&& indexCreationContext == IndexCreationContext.CREATE_INDEX); // metadata verification needs a mapper service
}

// visible for tests
AsyncReplicationTask getReplicationTask() {
return asyncReplicationTask;
}

/**
* Context for index creation
*
Expand Down Expand Up @@ -1142,11 +1157,22 @@
}
onRefreshIntervalChange();
updateFsyncTaskIfNecessary();
if (READER_WRITER_SPLIT_EXPERIMENTAL_SETTING.get(indexSettings.getNodeSettings())) {
updateReplicationTask();
}
}

metadataListeners.forEach(c -> c.accept(newIndexMetadata));
}

private void updateReplicationTask() {
try {
asyncReplicationTask.close();
} finally {
asyncReplicationTask = new AsyncReplicationTask(this);
}
}

/**
* Called whenever the refresh interval changes. This can happen in 2 cases -
* 1. {@code cluster.default.index.refresh_interval} cluster setting changes. The change would only happen for
Expand Down Expand Up @@ -1411,6 +1437,47 @@
}
}

final class AsyncReplicationTask extends BaseAsyncTask {

AsyncReplicationTask(IndexService indexService) {
super(indexService, indexService.getRefreshInterval());
}

@Override
protected void runInternal() {
indexService.maybeSyncSegments(false);
}

@Override
protected String getThreadPool() {
return ThreadPool.Names.GENERIC;
}

@Override
public String toString() {
return "replication";
}

@Override
protected boolean mustReschedule() {
return indexSettings.isSegRepEnabledOrRemoteNode() && super.mustReschedule();
}
}

private void maybeSyncSegments(boolean force) {
if (getRefreshInterval().millis() > 0 || force) {
for (IndexShard shard : this.shards.values()) {
try {
if (shard.routingEntry().isSearchOnly() && shard.routingEntry().active()) {
replicator.accept(shard);

Check warning on line 1472 in server/src/main/java/org/opensearch/index/IndexService.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/IndexService.java#L1472

Added line #L1472 was not covered by tests
}
} catch (IndexShardClosedException | AlreadyClosedException ex) {

Check warning on line 1474 in server/src/main/java/org/opensearch/index/IndexService.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/IndexService.java#L1474

Added line #L1474 was not covered by tests
// do nothing
}
}

Check warning on line 1477 in server/src/main/java/org/opensearch/index/IndexService.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/IndexService.java#L1476-L1477

Added lines #L1476 - L1477 were not covered by tests
}
}

final class AsyncTrimTranslogTask extends BaseAsyncTask {

AsyncTrimTranslogTask(IndexService indexService) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1253,12 +1253,13 @@ public ReplicationCheckpoint getLatestReplicationCheckpoint() {
return this.latestReplicationCheckpoint;
}

private boolean isPrimaryRelocation(String allocationId) {
// skip any shard that is a relocating primary or search only replica (not tracked by primary)
private boolean shouldSkipReplicationTimer(String allocationId) {
Optional<ShardRouting> shardRouting = routingTable.shards()
.stream()
.filter(routing -> routing.allocationId().getId().equals(allocationId))
.findAny();
return shardRouting.isPresent() && shardRouting.get().primary();
return shardRouting.isPresent() && (shardRouting.get().primary() || shardRouting.get().isSearchOnly());
}

private void createReplicationLagTimers() {
Expand All @@ -1270,7 +1271,7 @@ private void createReplicationLagTimers() {
// it is possible for a shard to be in-sync but not yet removed from the checkpoints collection after a failover event.
if (cps.inSync
&& replicationGroup.getUnavailableInSyncShards().contains(allocationId) == false
&& isPrimaryRelocation(allocationId) == false
&& shouldSkipReplicationTimer(allocationId) == false
&& latestReplicationCheckpoint.isAheadOf(cps.visibleReplicationCheckpoint)
&& (indexSettings.isSegRepLocalEnabled() == true
|| isShardOnRemoteEnabledNode.apply(routingTable.getByAllocationId(allocationId).currentNodeId()))) {
Expand Down Expand Up @@ -1304,7 +1305,7 @@ public synchronized void startReplicationLagTimers(ReplicationCheckpoint checkpo
final CheckpointState cps = e.getValue();
if (cps.inSync
&& replicationGroup.getUnavailableInSyncShards().contains(allocationId) == false
&& isPrimaryRelocation(e.getKey()) == false
&& shouldSkipReplicationTimer(e.getKey()) == false
&& latestReplicationCheckpoint.isAheadOf(cps.visibleReplicationCheckpoint)
&& cps.checkpointTimers.containsKey(latestReplicationCheckpoint)) {
cps.checkpointTimers.get(latestReplicationCheckpoint).start();
Expand Down Expand Up @@ -1332,7 +1333,7 @@ public synchronized Set<SegmentReplicationShardStats> getSegmentReplicationStats
entry -> entry.getKey().equals(this.shardAllocationId) == false
&& entry.getValue().inSync
&& replicationGroup.getUnavailableInSyncShards().contains(entry.getKey()) == false
&& isPrimaryRelocation(entry.getKey()) == false
&& shouldSkipReplicationTimer(entry.getKey()) == false
/*Check if the current primary shard is migrating to remote and
all the other shard copies of the same index still hasn't completely moved over
to the remote enabled nodes. Ensures that:
Expand Down
Loading
Loading