Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Segment Replication - Allow shard idle when there are no replicas in index #7736

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import org.opensearch.index.query.RangeQueryBuilder;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.plugins.Plugin;
import org.opensearch.plugins.SearchPlugin;
import org.opensearch.rest.RestStatus;
Expand Down Expand Up @@ -370,6 +371,34 @@ public void testSearchIdle() throws Exception {
});
}

public void testSearchIdle_SegmentReplication() {
// fail with replica count > 0
int numOfReplicas = 1;
internalCluster().ensureAtLeastNumDataNodes(numOfReplicas + 1);
final Settings.Builder settings = Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, randomIntBetween(1, 5))
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, numOfReplicas)
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
.put(IndexSettings.INDEX_SEARCH_IDLE_AFTER.getKey(), TimeValue.timeValueMillis(randomIntBetween(50, 500)));

expectThrows(IllegalArgumentException.class, () -> {
assertAcked(prepareCreate("test").setSettings(settings).setMapping("created_date", "type=date,format=yyyy-MM-dd"));
});

// setting allowed with 0 replicas.
settings.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0);
assertAcked(prepareCreate("test").setSettings(settings).setMapping("created_date", "type=date,format=yyyy-MM-dd"));

// add a replica
assertAcked(
client().admin()
.indices()
.prepareUpdateSettings("test")
.setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1))
);

}

public void testCircuitBreakerReduceFail() throws Exception {
int numShards = randomIntBetween(1, 10);
indexSomeDocs("test", numShards, numShards * 3);
Expand Down
33 changes: 33 additions & 0 deletions server/src/main/java/org/opensearch/index/IndexSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,10 @@
import org.opensearch.search.pipeline.SearchPipelineService;

import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
Expand All @@ -70,6 +72,7 @@
import static org.opensearch.index.mapper.MapperService.INDEX_MAPPING_NESTED_FIELDS_LIMIT_SETTING;
import static org.opensearch.index.mapper.MapperService.INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING;
import static org.opensearch.index.store.remote.directory.RemoteSnapshotDirectory.SEARCHABLE_SNAPSHOT_EXTENDED_COMPATIBILITY_MINIMUM_VERSION;
import static org.opensearch.indices.IndicesService.CLUSTER_REPLICATION_TYPE_SETTING;

/**
* This class encapsulates all index level settings and handles settings updates.
Expand Down Expand Up @@ -122,6 +125,36 @@ public final class IndexSettings {
"index.search.idle.after",
TimeValue.timeValueSeconds(30),
TimeValue.timeValueMinutes(0),
new Setting.Validator<>() {

@Override
public void validate(TimeValue value) {

}

@Override
public void validate(TimeValue value, Map<Setting<?>, Object> settings, boolean isPresent) {
if (isPresent) {
final Object clusterSettingReplicationType = settings.get(CLUSTER_REPLICATION_TYPE_SETTING);
final Object replicationType = settings.get(IndexMetadata.INDEX_REPLICATION_TYPE_SETTING);
int numReplicas = (int) settings.get(IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING);
if ((ReplicationType.SEGMENT.equals(clusterSettingReplicationType) || ReplicationType.SEGMENT.equals(replicationType))
&& numReplicas > 0) {
throw new IllegalArgumentException("Shard idle is disabled for indices with replicas using Segment Replication");
}
}
}

@Override
public Iterator<Setting<?>> settings() {
final List<Setting<?>> settings = List.of(
CLUSTER_REPLICATION_TYPE_SETTING,
IndexMetadata.INDEX_REPLICATION_TYPE_SETTING,
IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING
);
return settings.iterator();
}
},
Property.IndexScope,
Property.Dynamic
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4224,10 +4224,11 @@ public boolean scheduledRefresh() {
if (listenerNeedsRefresh == false // if we have a listener that is waiting for a refresh we need to force it
&& isSearchIdle()
&& indexSettings.isExplicitRefresh() == false
&& indexSettings.isSegRepEnabled() == false
// Indices with segrep enabled will never wait on a refresh and ignore shard idle. Primary shards push out new segments only
// Indices with segrep enabled will never wait on a refresh and ignore shard idle unless there are no replicas. Primary
// shards push out new segments only
// after a refresh, so we don't want to wait for a search to trigger that cycle. Replicas will only refresh after receiving
// a new set of segments.
&& (indexSettings.isSegRepEnabled() == false || indexSettings.getNumberOfReplicas() == 0)
&& active.get()) { // it must be active otherwise we might not free up segment memory once the shard became inactive
// lets skip this refresh since we are search idle and
// don't necessarily need to refresh. the next searcher access will register a refreshListener and that will
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,37 +220,68 @@ public void testSegmentReplication_Index_Update_Delete() throws Exception {
}

public void testIgnoreShardIdle() throws Exception {
try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory())) {
Settings updatedSettings = Settings.builder()
.put(settings)
.put(IndexSettings.INDEX_SEARCH_IDLE_AFTER.getKey(), TimeValue.ZERO)
.build();
try (ReplicationGroup shards = createGroup(1, updatedSettings, new NRTReplicationEngineFactory())) {
shards.startAll();
final IndexShard primary = shards.getPrimary();
final IndexShard replica = shards.getReplicas().get(0);

final int numDocs = shards.indexDocs(randomInt(10));
primary.refresh("test");
final int numDocs = shards.indexDocs(randomIntBetween(1, 10));
// ensure search idle conditions are met.
assertTrue(primary.isSearchIdle());
assertTrue(replica.isSearchIdle());

// invoke scheduledRefresh, returns true if refresh is immediately invoked.
assertTrue(primary.scheduledRefresh());
// replica would always return false here as there is no indexed doc to refresh on.
assertFalse(replica.scheduledRefresh());

// assert there is no pending refresh
assertFalse(primary.hasRefreshPending());
assertFalse(replica.hasRefreshPending());
shards.refresh("test");
replicateSegments(primary, shards.getReplicas());
shards.assertAllEqual(numDocs);
}
}

primary.scheduledRefresh();
replica.scheduledRefresh();

primary.awaitShardSearchActive(b -> assertFalse("A new RefreshListener should not be registered", b));
replica.awaitShardSearchActive(b -> assertFalse("A new RefreshListener should not be registered", b));

// Update the search_idle setting, this will put both shards into search idle.
Settings updatedSettings = Settings.builder()
.put(settings)
.put(IndexSettings.INDEX_SEARCH_IDLE_AFTER.getKey(), TimeValue.ZERO)
.build();
primary.indexSettings().getScopedSettings().applySettings(updatedSettings);
replica.indexSettings().getScopedSettings().applySettings(updatedSettings);

primary.scheduledRefresh();
replica.scheduledRefresh();
public void testShardIdle_Docrep() throws Exception {
Settings settings = Settings.builder()
.put(IndexSettings.INDEX_SEARCH_IDLE_AFTER.getKey(), TimeValue.ZERO)
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.DOCUMENT)
.build();
try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory())) {
shards.startAll();
final IndexShard primary = shards.getPrimary();
final IndexShard replica = shards.getReplicas().get(0);
final int numDocs = shards.indexDocs(randomIntBetween(1, 10));
// ensure search idle conditions are met.
assertTrue(primary.isSearchIdle());
assertTrue(replica.isSearchIdle());
assertFalse(primary.scheduledRefresh());
assertFalse(replica.scheduledRefresh());
assertTrue(primary.hasRefreshPending());
assertTrue(replica.hasRefreshPending());
shards.refresh("test");
shards.assertAllEqual(numDocs);
}
}

// Shards without segrep will register a new RefreshListener on the engine and return true when registered,
// assert with segrep enabled that awaitShardSearchActive does not register a listener.
primary.awaitShardSearchActive(b -> assertFalse("A new RefreshListener should not be registered", b));
replica.awaitShardSearchActive(b -> assertFalse("A new RefreshListener should not be registered", b));
public void testShardIdleWithNoReplicas() throws Exception {
Settings updatedSettings = Settings.builder()
.put(settings)
.put(IndexSettings.INDEX_SEARCH_IDLE_AFTER.getKey(), TimeValue.ZERO)
.build();
try (ReplicationGroup shards = createGroup(0, updatedSettings, new NRTReplicationEngineFactory())) {
shards.startAll();
final IndexShard primary = shards.getPrimary();
shards.indexDocs(randomIntBetween(1, 10));
// ensure search idle conditions are met.
assertTrue(primary.isSearchIdle());
assertFalse(primary.scheduledRefresh());
assertTrue(primary.hasRefreshPending());
}
}

Expand Down