Skip to content

Commit

Permalink
Segment Replication - Allow search idle with no replicas (opensearch-…
Browse files Browse the repository at this point in the history
…project#8173)

* Segment Replication - Allow shard idle when there are no replicas an index.

Signed-off-by: Marc Handalian <handalm@amazon.com>

* Add warning when updating search.idle.after with Segment Replication enabled.

Signed-off-by: Marc Handalian <handalm@amazon.com>

* PR cleanup.

Signed-off-by: Marc Handalian <handalm@amazon.com>

* Add more tests.

Signed-off-by: Marc Handalian <handalm@amazon.com>

* Remove unnecessary assertBusy

Signed-off-by: Marc Handalian <handalm@amazon.com>

---------

Signed-off-by: Marc Handalian <handalm@amazon.com>
Signed-off-by: Shivansh Arora <hishiv@amazon.com>
  • Loading branch information
mch2 authored and shiv0408 committed Apr 25, 2024
1 parent 93ff6f1 commit 7488bee
Show file tree
Hide file tree
Showing 4 changed files with 115 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import org.opensearch.index.query.RangeQueryBuilder;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.plugins.Plugin;
import org.opensearch.plugins.SearchPlugin;
import org.opensearch.core.rest.RestStatus;
Expand Down Expand Up @@ -370,6 +371,48 @@ public void testSearchIdle() throws Exception {
});
}

public void testSearchIdleWithSegmentReplication() {
int numOfReplicas = 1;
internalCluster().ensureAtLeastNumDataNodes(numOfReplicas + 1);
final Settings.Builder settings = Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, randomIntBetween(1, 5))
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, numOfReplicas)
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT);
assertAcked(prepareCreate("test").setSettings(settings).setMapping("created_date", "type=date,format=yyyy-MM-dd"));
ensureGreen("test");
assertAcked(
client().admin()
.indices()
.prepareUpdateSettings("test")
.setSettings(
Settings.builder()
.put(IndexSettings.INDEX_SEARCH_IDLE_AFTER.getKey(), TimeValue.timeValueMillis(randomIntBetween(50, 500)))
)
);

for (String node : internalCluster().nodesInclude("test")) {
final IndicesService indicesService = internalCluster().getInstance(IndicesService.class, node);
for (IndexShard indexShard : indicesService.indexServiceSafe(resolveIndex("test"))) {
assertFalse(indexShard.isSearchIdleSupported());
}
}

assertAcked(
client().admin()
.indices()
.prepareUpdateSettings("test")
.setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0))
);

for (String node : internalCluster().nodesInclude("test")) {
final IndicesService indicesService = internalCluster().getInstance(IndicesService.class, node);
for (IndexShard indexShard : indicesService.indexServiceSafe(resolveIndex("test"))) {
assertTrue(indexShard.isSearchIdleSupported());
}
}
;
}

public void testCircuitBreakerReduceFail() throws Exception {
int numShards = randomIntBetween(1, 10);
indexSomeDocs("test", numShards, numShards * 3);
Expand Down
3 changes: 3 additions & 0 deletions server/src/main/java/org/opensearch/index/IndexSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -930,6 +930,9 @@ private void setSearchSegmentOrderReversed(boolean reversed) {
}

private void setSearchIdleAfter(TimeValue searchIdleAfter) {
if (this.replicationType == ReplicationType.SEGMENT && this.getNumberOfReplicas() > 0) {
logger.warn("Search idle is not supported for indices with replicas using 'replication.type: SEGMENT'");
}
this.searchIdleAfter = searchIdleAfter;
}

Expand Down
18 changes: 14 additions & 4 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -4282,12 +4282,9 @@ public boolean scheduledRefresh() {
boolean listenerNeedsRefresh = refreshListeners.refreshNeeded();
if (isReadAllowed() && (listenerNeedsRefresh || getEngine().refreshNeeded())) {
if (listenerNeedsRefresh == false // if we have a listener that is waiting for a refresh we need to force it
&& isSearchIdleSupported()
&& isSearchIdle()
&& indexSettings.isExplicitRefresh() == false
&& indexSettings.isSegRepEnabled() == false
// Indices with segrep enabled will never wait on a refresh and ignore shard idle. Primary shards push out new segments only
// after a refresh, so we don't want to wait for a search to trigger that cycle. Replicas will only refresh after receiving
// a new set of segments.
&& active.get()) { // it must be active otherwise we might not free up segment memory once the shard became inactive
// lets skip this refresh since we are search idle and
// don't necessarily need to refresh. the next searcher access will register a refreshListener and that will
Expand Down Expand Up @@ -4315,6 +4312,19 @@ public final boolean isSearchIdle() {
return (threadPool.relativeTimeInMillis() - lastSearcherAccess.get()) >= indexSettings.getSearchIdleAfter().getMillis();
}

/**
*
* Returns true if this shard supports search idle.
*
* Indices using Segment Replication will ignore search idle unless there are no replicas.
* Primary shards push out new segments only
* after a refresh, so we don't want to wait for a search to trigger that cycle. Replicas will only refresh after receiving
* a new set of segments.
*/
public final boolean isSearchIdleSupported() {
return indexSettings.isSegRepEnabled() == false || indexSettings.getNumberOfReplicas() == 0;
}

/**
* Returns the last timestamp the searcher was accessed. This is a relative timestamp in milliseconds.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -410,37 +410,68 @@ public void testSegmentReplication_Index_Update_Delete() throws Exception {
}

public void testIgnoreShardIdle() throws Exception {
try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory())) {
Settings updatedSettings = Settings.builder()
.put(settings)
.put(IndexSettings.INDEX_SEARCH_IDLE_AFTER.getKey(), TimeValue.ZERO)
.build();
try (ReplicationGroup shards = createGroup(1, updatedSettings, new NRTReplicationEngineFactory())) {
shards.startAll();
final IndexShard primary = shards.getPrimary();
final IndexShard replica = shards.getReplicas().get(0);

final int numDocs = shards.indexDocs(randomInt(10));
primary.refresh("test");
final int numDocs = shards.indexDocs(randomIntBetween(1, 10));
// ensure search idle conditions are met.
assertTrue(primary.isSearchIdle());
assertTrue(replica.isSearchIdle());

// invoke scheduledRefresh, returns true if refresh is immediately invoked.
assertTrue(primary.scheduledRefresh());
// replica would always return false here as there is no indexed doc to refresh on.
assertFalse(replica.scheduledRefresh());

// assert there is no pending refresh
assertFalse(primary.hasRefreshPending());
assertFalse(replica.hasRefreshPending());
shards.refresh("test");
replicateSegments(primary, shards.getReplicas());
shards.assertAllEqual(numDocs);
}
}

primary.scheduledRefresh();
replica.scheduledRefresh();

primary.awaitShardSearchActive(b -> assertFalse("A new RefreshListener should not be registered", b));
replica.awaitShardSearchActive(b -> assertFalse("A new RefreshListener should not be registered", b));

// Update the search_idle setting, this will put both shards into search idle.
Settings updatedSettings = Settings.builder()
.put(settings)
.put(IndexSettings.INDEX_SEARCH_IDLE_AFTER.getKey(), TimeValue.ZERO)
.build();
primary.indexSettings().getScopedSettings().applySettings(updatedSettings);
replica.indexSettings().getScopedSettings().applySettings(updatedSettings);

primary.scheduledRefresh();
replica.scheduledRefresh();
public void testShardIdle_Docrep() throws Exception {
Settings settings = Settings.builder()
.put(IndexSettings.INDEX_SEARCH_IDLE_AFTER.getKey(), TimeValue.ZERO)
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.DOCUMENT)
.build();
try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory())) {
shards.startAll();
final IndexShard primary = shards.getPrimary();
final IndexShard replica = shards.getReplicas().get(0);
final int numDocs = shards.indexDocs(randomIntBetween(1, 10));
// ensure search idle conditions are met.
assertTrue(primary.isSearchIdle());
assertTrue(replica.isSearchIdle());
assertFalse(primary.scheduledRefresh());
assertFalse(replica.scheduledRefresh());
assertTrue(primary.hasRefreshPending());
assertTrue(replica.hasRefreshPending());
shards.refresh("test");
shards.assertAllEqual(numDocs);
}
}

// Shards without segrep will register a new RefreshListener on the engine and return true when registered,
// assert with segrep enabled that awaitShardSearchActive does not register a listener.
primary.awaitShardSearchActive(b -> assertFalse("A new RefreshListener should not be registered", b));
replica.awaitShardSearchActive(b -> assertFalse("A new RefreshListener should not be registered", b));
public void testShardIdleWithNoReplicas() throws Exception {
Settings updatedSettings = Settings.builder()
.put(settings)
.put(IndexSettings.INDEX_SEARCH_IDLE_AFTER.getKey(), TimeValue.ZERO)
.build();
try (ReplicationGroup shards = createGroup(0, updatedSettings, new NRTReplicationEngineFactory())) {
shards.startAll();
final IndexShard primary = shards.getPrimary();
shards.indexDocs(randomIntBetween(1, 10));
// ensure search idle conditions are met.
assertTrue(primary.isSearchIdle());
assertFalse(primary.scheduledRefresh());
assertTrue(primary.hasRefreshPending());
}
}

Expand Down

0 comments on commit 7488bee

Please sign in to comment.