Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Segment Replication - Allow search idle with no replicas #8173

Merged
merged 5 commits into from
Jul 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import org.opensearch.index.query.RangeQueryBuilder;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.plugins.Plugin;
import org.opensearch.plugins.SearchPlugin;
import org.opensearch.core.rest.RestStatus;
Expand Down Expand Up @@ -370,6 +371,48 @@ public void testSearchIdle() throws Exception {
});
}

public void testSearchIdleWithSegmentReplication() {
int numOfReplicas = 1;
internalCluster().ensureAtLeastNumDataNodes(numOfReplicas + 1);
final Settings.Builder settings = Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, randomIntBetween(1, 5))
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, numOfReplicas)
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT);
assertAcked(prepareCreate("test").setSettings(settings).setMapping("created_date", "type=date,format=yyyy-MM-dd"));
ensureGreen("test");
assertAcked(
client().admin()
.indices()
.prepareUpdateSettings("test")
.setSettings(
Settings.builder()
.put(IndexSettings.INDEX_SEARCH_IDLE_AFTER.getKey(), TimeValue.timeValueMillis(randomIntBetween(50, 500)))
)
);

for (String node : internalCluster().nodesInclude("test")) {
final IndicesService indicesService = internalCluster().getInstance(IndicesService.class, node);
for (IndexShard indexShard : indicesService.indexServiceSafe(resolveIndex("test"))) {
assertFalse(indexShard.isSearchIdleSupported());
}
}

assertAcked(
client().admin()
.indices()
.prepareUpdateSettings("test")
.setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0))
);

for (String node : internalCluster().nodesInclude("test")) {
final IndicesService indicesService = internalCluster().getInstance(IndicesService.class, node);
for (IndexShard indexShard : indicesService.indexServiceSafe(resolveIndex("test"))) {
assertTrue(indexShard.isSearchIdleSupported());
}
}
;
}

public void testCircuitBreakerReduceFail() throws Exception {
int numShards = randomIntBetween(1, 10);
indexSomeDocs("test", numShards, numShards * 3);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -930,6 +930,9 @@ private void setSearchSegmentOrderReversed(boolean reversed) {
}

private void setSearchIdleAfter(TimeValue searchIdleAfter) {
if (this.replicationType == ReplicationType.SEGMENT && this.getNumberOfReplicas() > 0) {
logger.warn("Search idle is not supported for indices with replicas using 'replication.type: SEGMENT'");
}
this.searchIdleAfter = searchIdleAfter;
}

Expand Down
18 changes: 14 additions & 4 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -4282,12 +4282,9 @@ public boolean scheduledRefresh() {
boolean listenerNeedsRefresh = refreshListeners.refreshNeeded();
if (isReadAllowed() && (listenerNeedsRefresh || getEngine().refreshNeeded())) {
if (listenerNeedsRefresh == false // if we have a listener that is waiting for a refresh we need to force it
&& isSearchIdleSupported()
&& isSearchIdle()
&& indexSettings.isExplicitRefresh() == false
&& indexSettings.isSegRepEnabled() == false
// Indices with segrep enabled will never wait on a refresh and ignore shard idle. Primary shards push out new segments only
// after a refresh, so we don't want to wait for a search to trigger that cycle. Replicas will only refresh after receiving
// a new set of segments.
&& active.get()) { // it must be active otherwise we might not free up segment memory once the shard became inactive
// lets skip this refresh since we are search idle and
// don't necessarily need to refresh. the next searcher access will register a refreshListener and that will
Expand Down Expand Up @@ -4315,6 +4312,19 @@ public final boolean isSearchIdle() {
return (threadPool.relativeTimeInMillis() - lastSearcherAccess.get()) >= indexSettings.getSearchIdleAfter().getMillis();
}

/**
*
* Returns true if this shard supports search idle.
*
* Indices using Segment Replication will ignore search idle unless there are no replicas.
* Primary shards push out new segments only
* after a refresh, so we don't want to wait for a search to trigger that cycle. Replicas will only refresh after receiving
* a new set of segments.
*/
public final boolean isSearchIdleSupported() {
return indexSettings.isSegRepEnabled() == false || indexSettings.getNumberOfReplicas() == 0;
}

/**
* Returns the last timestamp the searcher was accessed. This is a relative timestamp in milliseconds.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -410,37 +410,68 @@ public void testSegmentReplication_Index_Update_Delete() throws Exception {
}

public void testIgnoreShardIdle() throws Exception {
try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory())) {
Settings updatedSettings = Settings.builder()
.put(settings)
.put(IndexSettings.INDEX_SEARCH_IDLE_AFTER.getKey(), TimeValue.ZERO)
.build();
try (ReplicationGroup shards = createGroup(1, updatedSettings, new NRTReplicationEngineFactory())) {
shards.startAll();
final IndexShard primary = shards.getPrimary();
final IndexShard replica = shards.getReplicas().get(0);

final int numDocs = shards.indexDocs(randomInt(10));
primary.refresh("test");
final int numDocs = shards.indexDocs(randomIntBetween(1, 10));
// ensure search idle conditions are met.
assertTrue(primary.isSearchIdle());
assertTrue(replica.isSearchIdle());

// invoke scheduledRefresh, returns true if refresh is immediately invoked.
assertTrue(primary.scheduledRefresh());
// replica would always return false here as there is no indexed doc to refresh on.
assertFalse(replica.scheduledRefresh());

// assert there is no pending refresh
assertFalse(primary.hasRefreshPending());
assertFalse(replica.hasRefreshPending());
shards.refresh("test");
replicateSegments(primary, shards.getReplicas());
shards.assertAllEqual(numDocs);
}
}

primary.scheduledRefresh();
replica.scheduledRefresh();

primary.awaitShardSearchActive(b -> assertFalse("A new RefreshListener should not be registered", b));
replica.awaitShardSearchActive(b -> assertFalse("A new RefreshListener should not be registered", b));

// Update the search_idle setting, this will put both shards into search idle.
Settings updatedSettings = Settings.builder()
.put(settings)
.put(IndexSettings.INDEX_SEARCH_IDLE_AFTER.getKey(), TimeValue.ZERO)
.build();
primary.indexSettings().getScopedSettings().applySettings(updatedSettings);
replica.indexSettings().getScopedSettings().applySettings(updatedSettings);

primary.scheduledRefresh();
replica.scheduledRefresh();
public void testShardIdle_Docrep() throws Exception {
Settings settings = Settings.builder()
.put(IndexSettings.INDEX_SEARCH_IDLE_AFTER.getKey(), TimeValue.ZERO)
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.DOCUMENT)
.build();
try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory())) {
shards.startAll();
final IndexShard primary = shards.getPrimary();
final IndexShard replica = shards.getReplicas().get(0);
final int numDocs = shards.indexDocs(randomIntBetween(1, 10));
// ensure search idle conditions are met.
assertTrue(primary.isSearchIdle());
assertTrue(replica.isSearchIdle());
assertFalse(primary.scheduledRefresh());
assertFalse(replica.scheduledRefresh());
assertTrue(primary.hasRefreshPending());
assertTrue(replica.hasRefreshPending());
shards.refresh("test");
shards.assertAllEqual(numDocs);
}
}

// Shards without segrep will register a new RefreshListener on the engine and return true when registered,
// assert with segrep enabled that awaitShardSearchActive does not register a listener.
primary.awaitShardSearchActive(b -> assertFalse("A new RefreshListener should not be registered", b));
replica.awaitShardSearchActive(b -> assertFalse("A new RefreshListener should not be registered", b));
public void testShardIdleWithNoReplicas() throws Exception {
Settings updatedSettings = Settings.builder()
.put(settings)
.put(IndexSettings.INDEX_SEARCH_IDLE_AFTER.getKey(), TimeValue.ZERO)
.build();
try (ReplicationGroup shards = createGroup(0, updatedSettings, new NRTReplicationEngineFactory())) {
shards.startAll();
final IndexShard primary = shards.getPrimary();
shards.indexDocs(randomIntBetween(1, 10));
// ensure search idle conditions are met.
assertTrue(primary.isSearchIdle());
assertFalse(primary.scheduledRefresh());
assertTrue(primary.hasRefreshPending());
}
}

Expand Down