From 7a6c2facc3929782840daddcb9c1bac357dd6b03 Mon Sep 17 00:00:00 2001 From: Marc Handalian Date: Wed, 24 May 2023 10:21:52 -0700 Subject: [PATCH 1/5] Segment Replication - Allow shard idle when there are no replicas an index. Signed-off-by: Marc Handalian --- .../opensearch/index/shard/IndexShard.java | 5 +- .../SegmentReplicationIndexShardTests.java | 79 +++++++++++++------ 2 files changed, 58 insertions(+), 26 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 8541a1f5e554b..94329093a0557 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -4284,10 +4284,11 @@ public boolean scheduledRefresh() { if (listenerNeedsRefresh == false // if we have a listener that is waiting for a refresh we need to force it && isSearchIdle() && indexSettings.isExplicitRefresh() == false - && indexSettings.isSegRepEnabled() == false - // Indices with segrep enabled will never wait on a refresh and ignore shard idle. Primary shards push out new segments only + // Indices with segrep enabled will never wait on a refresh and ignore shard idle unless there are no replicas. Primary + // shards push out new segments only // after a refresh, so we don't want to wait for a search to trigger that cycle. Replicas will only refresh after receiving // a new set of segments. + && (indexSettings.isSegRepEnabled() == false || indexSettings.getNumberOfReplicas() == 0) && active.get()) { // it must be active otherwise we might not free up segment memory once the shard became inactive // lets skip this refresh since we are search idle and // don't necessarily need to refresh. the next searcher access will register a refreshListener and that will diff --git a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java index 1f5980ba9bfe0..aa590358efa25 100644 --- a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java @@ -410,37 +410,68 @@ public void testSegmentReplication_Index_Update_Delete() throws Exception { } public void testIgnoreShardIdle() throws Exception { - try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory())) { + Settings updatedSettings = Settings.builder() + .put(settings) + .put(IndexSettings.INDEX_SEARCH_IDLE_AFTER.getKey(), TimeValue.ZERO) + .build(); + try (ReplicationGroup shards = createGroup(1, updatedSettings, new NRTReplicationEngineFactory())) { shards.startAll(); final IndexShard primary = shards.getPrimary(); final IndexShard replica = shards.getReplicas().get(0); - - final int numDocs = shards.indexDocs(randomInt(10)); - primary.refresh("test"); + final int numDocs = shards.indexDocs(randomIntBetween(1, 10)); + // ensure search idle conditions are met. + assertTrue(primary.isSearchIdle()); + assertTrue(replica.isSearchIdle()); + + // invoke scheduledRefresh, returns true if refresh is immediately invoked. + assertTrue(primary.scheduledRefresh()); + // replica would always return false here as there is no indexed doc to refresh on. + assertFalse(replica.scheduledRefresh()); + + // assert there is no pending refresh + assertFalse(primary.hasRefreshPending()); + assertFalse(replica.hasRefreshPending()); + shards.refresh("test"); replicateSegments(primary, shards.getReplicas()); shards.assertAllEqual(numDocs); + } + } - primary.scheduledRefresh(); - replica.scheduledRefresh(); - - primary.awaitShardSearchActive(b -> assertFalse("A new RefreshListener should not be registered", b)); - replica.awaitShardSearchActive(b -> assertFalse("A new RefreshListener should not be registered", b)); - - // Update the search_idle setting, this will put both shards into search idle. - Settings updatedSettings = Settings.builder() - .put(settings) - .put(IndexSettings.INDEX_SEARCH_IDLE_AFTER.getKey(), TimeValue.ZERO) - .build(); - primary.indexSettings().getScopedSettings().applySettings(updatedSettings); - replica.indexSettings().getScopedSettings().applySettings(updatedSettings); - - primary.scheduledRefresh(); - replica.scheduledRefresh(); + public void testShardIdle_Docrep() throws Exception { + Settings settings = Settings.builder() + .put(IndexSettings.INDEX_SEARCH_IDLE_AFTER.getKey(), TimeValue.ZERO) + .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.DOCUMENT) + .build(); + try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory())) { + shards.startAll(); + final IndexShard primary = shards.getPrimary(); + final IndexShard replica = shards.getReplicas().get(0); + final int numDocs = shards.indexDocs(randomIntBetween(1, 10)); + // ensure search idle conditions are met. + assertTrue(primary.isSearchIdle()); + assertTrue(replica.isSearchIdle()); + assertFalse(primary.scheduledRefresh()); + assertFalse(replica.scheduledRefresh()); + assertTrue(primary.hasRefreshPending()); + assertTrue(replica.hasRefreshPending()); + shards.refresh("test"); + shards.assertAllEqual(numDocs); + } + } - // Shards without segrep will register a new RefreshListener on the engine and return true when registered, - // assert with segrep enabled that awaitShardSearchActive does not register a listener. - primary.awaitShardSearchActive(b -> assertFalse("A new RefreshListener should not be registered", b)); - replica.awaitShardSearchActive(b -> assertFalse("A new RefreshListener should not be registered", b)); + public void testShardIdleWithNoReplicas() throws Exception { + Settings updatedSettings = Settings.builder() + .put(settings) + .put(IndexSettings.INDEX_SEARCH_IDLE_AFTER.getKey(), TimeValue.ZERO) + .build(); + try (ReplicationGroup shards = createGroup(0, updatedSettings, new NRTReplicationEngineFactory())) { + shards.startAll(); + final IndexShard primary = shards.getPrimary(); + shards.indexDocs(randomIntBetween(1, 10)); + // ensure search idle conditions are met. + assertTrue(primary.isSearchIdle()); + assertFalse(primary.scheduledRefresh()); + assertTrue(primary.hasRefreshPending()); } } From 127cf8777df204d9e6c10022edd245901051be95 Mon Sep 17 00:00:00 2001 From: Marc Handalian Date: Tue, 20 Jun 2023 08:43:32 -0700 Subject: [PATCH 2/5] Add warning when updating search.idle.after with Segment Replication enabled. Signed-off-by: Marc Handalian --- server/src/main/java/org/opensearch/index/IndexSettings.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/server/src/main/java/org/opensearch/index/IndexSettings.java b/server/src/main/java/org/opensearch/index/IndexSettings.java index 30c9efde19cdc..bc853adb757cb 100644 --- a/server/src/main/java/org/opensearch/index/IndexSettings.java +++ b/server/src/main/java/org/opensearch/index/IndexSettings.java @@ -930,6 +930,9 @@ private void setSearchSegmentOrderReversed(boolean reversed) { } private void setSearchIdleAfter(TimeValue searchIdleAfter) { + if (this.replicationType == ReplicationType.SEGMENT && this.getNumberOfReplicas() > 0) { + logger.warn("Search idle is disabled for indices with replicas using the Segment Replication strategy"); + } this.searchIdleAfter = searchIdleAfter; } From 84b7e5bd3d02ff7f27e0e3b788208fd9eea6b66f Mon Sep 17 00:00:00 2001 From: Marc Handalian Date: Tue, 20 Jun 2023 11:54:02 -0700 Subject: [PATCH 3/5] PR cleanup. Signed-off-by: Marc Handalian --- .../org/opensearch/index/IndexSettings.java | 2 +- .../opensearch/index/shard/IndexShard.java | 19 ++++++++++++++----- 2 files changed, 15 insertions(+), 6 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/IndexSettings.java b/server/src/main/java/org/opensearch/index/IndexSettings.java index bc853adb757cb..732ae2f765284 100644 --- a/server/src/main/java/org/opensearch/index/IndexSettings.java +++ b/server/src/main/java/org/opensearch/index/IndexSettings.java @@ -931,7 +931,7 @@ private void setSearchSegmentOrderReversed(boolean reversed) { private void setSearchIdleAfter(TimeValue searchIdleAfter) { if (this.replicationType == ReplicationType.SEGMENT && this.getNumberOfReplicas() > 0) { - logger.warn("Search idle is disabled for indices with replicas using the Segment Replication strategy"); + logger.warn("Search idle is not supported for indices with replicas using 'replication.type: SEGMENT"); } this.searchIdleAfter = searchIdleAfter; } diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 94329093a0557..399a520057e67 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -4282,13 +4282,9 @@ public boolean scheduledRefresh() { boolean listenerNeedsRefresh = refreshListeners.refreshNeeded(); if (isReadAllowed() && (listenerNeedsRefresh || getEngine().refreshNeeded())) { if (listenerNeedsRefresh == false // if we have a listener that is waiting for a refresh we need to force it + && isSearchIdleSupported() && isSearchIdle() && indexSettings.isExplicitRefresh() == false - // Indices with segrep enabled will never wait on a refresh and ignore shard idle unless there are no replicas. Primary - // shards push out new segments only - // after a refresh, so we don't want to wait for a search to trigger that cycle. Replicas will only refresh after receiving - // a new set of segments. - && (indexSettings.isSegRepEnabled() == false || indexSettings.getNumberOfReplicas() == 0) && active.get()) { // it must be active otherwise we might not free up segment memory once the shard became inactive // lets skip this refresh since we are search idle and // don't necessarily need to refresh. the next searcher access will register a refreshListener and that will @@ -4316,6 +4312,19 @@ public final boolean isSearchIdle() { return (threadPool.relativeTimeInMillis() - lastSearcherAccess.get()) >= indexSettings.getSearchIdleAfter().getMillis(); } + /** + * + * Returns true if this shard supports search idle. + * + * Indices using Segment Replication will ignore search idle unless there are no replicas. + * Primary shards push out new segments only + * after a refresh, so we don't want to wait for a search to trigger that cycle. Replicas will only refresh after receiving + * a new set of segments. + */ + private boolean isSearchIdleSupported() { + return indexSettings.isSegRepEnabled() == false || indexSettings.getNumberOfReplicas() == 0; + } + /** * Returns the last timestamp the searcher was accessed. This is a relative timestamp in milliseconds. */ From dcbc2e56520114cb16eb555f56c2cef3f5ffbe9d Mon Sep 17 00:00:00 2001 From: Marc Handalian Date: Tue, 20 Jun 2023 15:00:13 -0700 Subject: [PATCH 4/5] Add more tests. Signed-off-by: Marc Handalian --- .../action/search/TransportSearchIT.java | 46 +++++++++++++++++++ .../org/opensearch/index/IndexSettings.java | 2 +- .../opensearch/index/shard/IndexShard.java | 2 +- 3 files changed, 48 insertions(+), 2 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/action/search/TransportSearchIT.java b/server/src/internalClusterTest/java/org/opensearch/action/search/TransportSearchIT.java index c22319271b4cf..3ca6cf8e23b7e 100644 --- a/server/src/internalClusterTest/java/org/opensearch/action/search/TransportSearchIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/action/search/TransportSearchIT.java @@ -61,6 +61,7 @@ import org.opensearch.index.query.RangeQueryBuilder; import org.opensearch.index.shard.IndexShard; import org.opensearch.indices.IndicesService; +import org.opensearch.indices.replication.common.ReplicationType; import org.opensearch.plugins.Plugin; import org.opensearch.plugins.SearchPlugin; import org.opensearch.core.rest.RestStatus; @@ -370,6 +371,51 @@ public void testSearchIdle() throws Exception { }); } + public void testSearchIdleWithSegmentReplication() throws Exception { + int numOfReplicas = 1; + internalCluster().ensureAtLeastNumDataNodes(numOfReplicas + 1); + final Settings.Builder settings = Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, randomIntBetween(1, 5)) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, numOfReplicas) + .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT); + assertAcked(prepareCreate("test").setSettings(settings).setMapping("created_date", "type=date,format=yyyy-MM-dd")); + ensureGreen("test"); + assertAcked( + client().admin() + .indices() + .prepareUpdateSettings("test") + .setSettings( + Settings.builder() + .put(IndexSettings.INDEX_SEARCH_IDLE_AFTER.getKey(), TimeValue.timeValueMillis(randomIntBetween(50, 500))) + ) + ); + + assertBusy(() -> { + for (String node : internalCluster().nodesInclude("test")) { + final IndicesService indicesService = internalCluster().getInstance(IndicesService.class, node); + for (IndexShard indexShard : indicesService.indexServiceSafe(resolveIndex("test"))) { + assertFalse(indexShard.isSearchIdleSupported()); + } + } + }); + + assertAcked( + client().admin() + .indices() + .prepareUpdateSettings("test") + .setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)) + ); + + assertBusy(() -> { + for (String node : internalCluster().nodesInclude("test")) { + final IndicesService indicesService = internalCluster().getInstance(IndicesService.class, node); + for (IndexShard indexShard : indicesService.indexServiceSafe(resolveIndex("test"))) { + assertTrue(indexShard.isSearchIdleSupported()); + } + } + }); + } + public void testCircuitBreakerReduceFail() throws Exception { int numShards = randomIntBetween(1, 10); indexSomeDocs("test", numShards, numShards * 3); diff --git a/server/src/main/java/org/opensearch/index/IndexSettings.java b/server/src/main/java/org/opensearch/index/IndexSettings.java index 732ae2f765284..5b96b09a65cea 100644 --- a/server/src/main/java/org/opensearch/index/IndexSettings.java +++ b/server/src/main/java/org/opensearch/index/IndexSettings.java @@ -931,7 +931,7 @@ private void setSearchSegmentOrderReversed(boolean reversed) { private void setSearchIdleAfter(TimeValue searchIdleAfter) { if (this.replicationType == ReplicationType.SEGMENT && this.getNumberOfReplicas() > 0) { - logger.warn("Search idle is not supported for indices with replicas using 'replication.type: SEGMENT"); + logger.warn("Search idle is not supported for indices with replicas using 'replication.type: SEGMENT'"); } this.searchIdleAfter = searchIdleAfter; } diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 399a520057e67..5deaf5f9a483a 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -4321,7 +4321,7 @@ public final boolean isSearchIdle() { * after a refresh, so we don't want to wait for a search to trigger that cycle. Replicas will only refresh after receiving * a new set of segments. */ - private boolean isSearchIdleSupported() { + public final boolean isSearchIdleSupported() { return indexSettings.isSegRepEnabled() == false || indexSettings.getNumberOfReplicas() == 0; } From fde836139084a7b9417dbaf377070fc99f11aae5 Mon Sep 17 00:00:00 2001 From: Marc Handalian Date: Fri, 14 Jul 2023 11:48:35 -0700 Subject: [PATCH 5/5] Remove unnecessary assertBusy Signed-off-by: Marc Handalian --- .../action/search/TransportSearchIT.java | 27 +++++++++---------- 1 file changed, 12 insertions(+), 15 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/action/search/TransportSearchIT.java b/server/src/internalClusterTest/java/org/opensearch/action/search/TransportSearchIT.java index 3ca6cf8e23b7e..afa5ac908c137 100644 --- a/server/src/internalClusterTest/java/org/opensearch/action/search/TransportSearchIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/action/search/TransportSearchIT.java @@ -371,7 +371,7 @@ public void testSearchIdle() throws Exception { }); } - public void testSearchIdleWithSegmentReplication() throws Exception { + public void testSearchIdleWithSegmentReplication() { int numOfReplicas = 1; internalCluster().ensureAtLeastNumDataNodes(numOfReplicas + 1); final Settings.Builder settings = Settings.builder() @@ -390,14 +390,12 @@ public void testSearchIdleWithSegmentReplication() throws Exception { ) ); - assertBusy(() -> { - for (String node : internalCluster().nodesInclude("test")) { - final IndicesService indicesService = internalCluster().getInstance(IndicesService.class, node); - for (IndexShard indexShard : indicesService.indexServiceSafe(resolveIndex("test"))) { - assertFalse(indexShard.isSearchIdleSupported()); - } + for (String node : internalCluster().nodesInclude("test")) { + final IndicesService indicesService = internalCluster().getInstance(IndicesService.class, node); + for (IndexShard indexShard : indicesService.indexServiceSafe(resolveIndex("test"))) { + assertFalse(indexShard.isSearchIdleSupported()); } - }); + } assertAcked( client().admin() @@ -406,14 +404,13 @@ public void testSearchIdleWithSegmentReplication() throws Exception { .setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)) ); - assertBusy(() -> { - for (String node : internalCluster().nodesInclude("test")) { - final IndicesService indicesService = internalCluster().getInstance(IndicesService.class, node); - for (IndexShard indexShard : indicesService.indexServiceSafe(resolveIndex("test"))) { - assertTrue(indexShard.isSearchIdleSupported()); - } + for (String node : internalCluster().nodesInclude("test")) { + final IndicesService indicesService = internalCluster().getInstance(IndicesService.class, node); + for (IndexShard indexShard : indicesService.indexServiceSafe(resolveIndex("test"))) { + assertTrue(indexShard.isSearchIdleSupported()); } - }); + } + ; } public void testCircuitBreakerReduceFail() throws Exception {