Skip to content

Commit c746d07

Browse files
kh3rajainankitk
authored andcommitted
Added a dynamic cluster setting to control merge segment warming (opensearch-project#18942)
* Adding dynamic cluster setting to control merge segment warming feature. The feature would be disabled by default Signed-off-by: kh3ra <adityakh3ra@gmail.com> Signed-off-by: Aditya Khera <kheraadi@amazon.com> Signed-off-by: Ankit Jain <jainankitk@apache.org>
1 parent f42e2bd commit c746d07

File tree

10 files changed

+275
-31
lines changed

10 files changed

+275
-31
lines changed

CHANGELOG.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
1515
- Add a dynamic setting to change skip_cache_factor and min_frequency for querycache ([#18351](https://github.com/opensearch-project/OpenSearch/issues/18351))
1616
- Add overload constructor for Translog to accept Channel Factory as a parameter ([#18918](https://github.com/opensearch-project/OpenSearch/pull/18918))
1717
- Add subdirectory-aware store module with recovery support ([#19132](https://github.com/opensearch-project/OpenSearch/pull/19132))
18-
18+
- Add a dynamic cluster setting to control the enablement of the merged segment warmer ([#18929](https://github.com/opensearch-project/OpenSearch/pull/18929))
1919
### Changed
2020
- Add CompletionStage variants to methods in the Client Interface and default to ActionListener impl ([#18998](https://github.com/opensearch-project/OpenSearch/pull/18998))
2121
- IllegalArgumentException when scroll ID references a node not found in Cluster ([#19031](https://github.com/opensearch-project/OpenSearch/pull/19031))

server/src/internalClusterTest/java/org/opensearch/indices/replication/MergedSegmentWarmerIT.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.opensearch.index.engine.Segment;
2525
import org.opensearch.index.shard.IndexShard;
2626
import org.opensearch.index.store.StoreFileMetadata;
27+
import org.opensearch.indices.recovery.RecoverySettings;
2728
import org.opensearch.test.OpenSearchIntegTestCase;
2829
import org.opensearch.test.transport.MockTransportService;
2930
import org.opensearch.transport.ConnectTransportException;
@@ -45,7 +46,10 @@
4546
public class MergedSegmentWarmerIT extends SegmentReplicationIT {
4647
@Override
4748
protected Settings nodeSettings(int nodeOrdinal) {
48-
return Settings.builder().put(super.nodeSettings(nodeOrdinal)).build();
49+
return Settings.builder()
50+
.put(super.nodeSettings(nodeOrdinal))
51+
.put(RecoverySettings.INDICES_MERGED_SEGMENT_REPLICATION_WARMER_ENABLED_SETTING.getKey(), true)
52+
.build();
4953
}
5054

5155
@Override

server/src/internalClusterTest/java/org/opensearch/indices/replication/RemoteStoreMergedSegmentWarmerIT.java

Lines changed: 72 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import org.opensearch.common.util.FeatureFlags;
1717
import org.opensearch.index.IndexSettings;
1818
import org.opensearch.index.TieredMergePolicyProvider;
19+
import org.opensearch.indices.recovery.RecoverySettings;
1920
import org.opensearch.indices.replication.checkpoint.RemoteStorePublishMergedSegmentRequest;
2021
import org.opensearch.test.OpenSearchIntegTestCase;
2122
import org.opensearch.test.transport.MockTransportService;
@@ -42,6 +43,7 @@ protected Settings nodeSettings(int nodeOrdinal) {
4243
return Settings.builder()
4344
.put(super.nodeSettings(nodeOrdinal))
4445
.put(remoteStoreClusterSettings("test-remote-store-repo", absolutePath))
46+
.put(RecoverySettings.INDICES_MERGED_SEGMENT_REPLICATION_WARMER_ENABLED_SETTING.getKey(), true)
4547
.build();
4648
}
4749

@@ -62,13 +64,11 @@ public void testMergeSegmentWarmerRemote() throws Exception {
6264
final String node2 = internalCluster().startDataOnlyNode();
6365
createIndex(INDEX_NAME);
6466
ensureGreen(INDEX_NAME);
65-
MockTransportService mockTransportServiceNode1 = (MockTransportService) internalCluster().getInstance(
66-
TransportService.class,
67-
node1
68-
);
69-
MockTransportService mockTransportServiceNode2 = (MockTransportService) internalCluster().getInstance(
67+
68+
String primaryShardNode = findprimaryShardNode(INDEX_NAME);
69+
MockTransportService mockTransportServicePrimary = (MockTransportService) internalCluster().getInstance(
7070
TransportService.class,
71-
node2
71+
primaryShardNode
7272
);
7373
final CountDownLatch latch = new CountDownLatch(1);
7474
StubbableTransport.SendRequestBehavior behavior = (connection, requestId, action, request, options) -> {
@@ -82,6 +82,8 @@ public void testMergeSegmentWarmerRemote() throws Exception {
8282
connection.sendRequest(requestId, action, request, options);
8383
};
8484

85+
mockTransportServicePrimary.addSendBehavior(behavior);
86+
8587
for (int i = 0; i < 30; i++) {
8688
client().prepareIndex(INDEX_NAME)
8789
.setId(String.valueOf(i))
@@ -92,14 +94,10 @@ public void testMergeSegmentWarmerRemote() throws Exception {
9294

9395
waitForSearchableDocs(30, node1, node2);
9496

95-
mockTransportServiceNode1.addSendBehavior(behavior);
96-
mockTransportServiceNode2.addSendBehavior(behavior);
97-
9897
client().admin().indices().forceMerge(new ForceMergeRequest(INDEX_NAME).maxNumSegments(2));
9998
waitForSegmentCount(INDEX_NAME, 2, logger);
10099
assertTrue(latch.await(10, TimeUnit.SECONDS));
101-
mockTransportServiceNode1.clearAllRules();
102-
mockTransportServiceNode2.clearAllRules();
100+
mockTransportServicePrimary.clearAllRules();
103101
}
104102

105103
public void testConcurrentMergeSegmentWarmerRemote() throws Exception {
@@ -115,14 +113,13 @@ public void testConcurrentMergeSegmentWarmerRemote() throws Exception {
115113
.build()
116114
);
117115
ensureGreen(INDEX_NAME);
118-
MockTransportService mockTransportServiceNode1 = (MockTransportService) internalCluster().getInstance(
119-
TransportService.class,
120-
node1
121-
);
122-
MockTransportService mockTransportServiceNode2 = (MockTransportService) internalCluster().getInstance(
116+
117+
String primaryShardNode = findprimaryShardNode(INDEX_NAME);
118+
MockTransportService mockTransportServicePrimary = (MockTransportService) internalCluster().getInstance(
123119
TransportService.class,
124-
node2
120+
primaryShardNode
125121
);
122+
126123
CountDownLatch latch = new CountDownLatch(2);
127124
AtomicLong numInvocations = new AtomicLong(0);
128125
Set<String> executingThreads = ConcurrentHashMap.newKeySet();
@@ -139,8 +136,7 @@ public void testConcurrentMergeSegmentWarmerRemote() throws Exception {
139136
connection.sendRequest(requestId, action, request, options);
140137
};
141138

142-
mockTransportServiceNode1.addSendBehavior(behavior);
143-
mockTransportServiceNode2.addSendBehavior(behavior);
139+
mockTransportServicePrimary.addSendBehavior(behavior);
144140

145141
for (int i = 0; i < 30; i++) {
146142
client().prepareIndex(INDEX_NAME)
@@ -158,8 +154,7 @@ public void testConcurrentMergeSegmentWarmerRemote() throws Exception {
158154
assertTrue(executingThreads.size() > 1);
159155
// Verify concurrent execution by checking that multiple unique threads handled merge operations
160156
assertTrue(numInvocations.get() > 1);
161-
mockTransportServiceNode1.clearAllRules();
162-
mockTransportServiceNode2.clearAllRules();
157+
mockTransportServicePrimary.clearAllRules();
163158
}
164159

165160
public void testMergeSegmentWarmerWithInactiveReplicaRemote() throws Exception {
@@ -179,4 +174,60 @@ public void testMergeSegmentWarmerWithInactiveReplicaRemote() throws Exception {
179174
final IndicesSegmentResponse response = client().admin().indices().prepareSegments(INDEX_NAME).get();
180175
assertEquals(1, response.getIndices().get(INDEX_NAME).getShards().values().size());
181176
}
177+
178+
public void testMergeSegmentWarmerWithWarmingDisabled() throws Exception {
179+
internalCluster().startDataOnlyNode();
180+
internalCluster().startDataOnlyNode();
181+
createIndex(INDEX_NAME);
182+
ensureGreen(INDEX_NAME);
183+
184+
String primaryNodeName = findprimaryShardNode(INDEX_NAME);
185+
internalCluster().client()
186+
.admin()
187+
.cluster()
188+
.prepareUpdateSettings()
189+
.setPersistentSettings(
190+
Settings.builder().put(RecoverySettings.INDICES_MERGED_SEGMENT_REPLICATION_WARMER_ENABLED_SETTING.getKey(), false).build()
191+
)
192+
.get();
193+
194+
MockTransportService mockTransportServicePrimary = (MockTransportService) internalCluster().getInstance(
195+
TransportService.class,
196+
primaryNodeName
197+
);
198+
199+
CountDownLatch warmingLatch = new CountDownLatch(1);
200+
StubbableTransport.SendRequestBehavior behavior = (connection, requestId, action, request, options) -> {
201+
if (action.equals("indices:admin/remote_publish_merged_segment[r]")) {
202+
warmingLatch.countDown(); // This should NOT happen
203+
}
204+
connection.sendRequest(requestId, action, request, options);
205+
};
206+
207+
mockTransportServicePrimary.addSendBehavior(behavior);
208+
209+
for (int i = 0; i < 30; i++) {
210+
client().prepareIndex(INDEX_NAME)
211+
.setId(String.valueOf(i))
212+
.setSource("foo" + i, "bar" + i)
213+
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
214+
.get();
215+
}
216+
217+
client().admin().indices().forceMerge(new ForceMergeRequest(INDEX_NAME).maxNumSegments(1)).get();
218+
final IndicesSegmentResponse response = client().admin().indices().prepareSegments(INDEX_NAME).get();
219+
assertEquals(1, response.getIndices().get(INDEX_NAME).getShards().values().size());
220+
assertFalse("Warming should be skipped when disabled", warmingLatch.await(5, TimeUnit.SECONDS));
221+
mockTransportServicePrimary.clearAllRules();
222+
}
223+
224+
/**
225+
* Returns the node name for the node hosting the primary shard for index "indexName"
226+
*/
227+
private String findprimaryShardNode(String indexName) {
228+
String nodeId = internalCluster().clusterService().state().routingTable().index(indexName).shard(0).primaryShard().currentNodeId();
229+
230+
return internalCluster().clusterService().state().nodes().get(nodeId).getName();
231+
232+
}
182233
}

server/src/main/java/org/opensearch/common/settings/ClusterSettings.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -319,6 +319,7 @@ public void apply(Settings value, Settings current, Settings previous) {
319319
ShardLimitValidator.SETTING_CLUSTER_IGNORE_DOT_INDEXES,
320320
RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING,
321321
RecoverySettings.INDICES_REPLICATION_MAX_BYTES_PER_SEC_SETTING,
322+
RecoverySettings.INDICES_MERGED_SEGMENT_REPLICATION_WARMER_ENABLED_SETTING,
322323
RecoverySettings.INDICES_MERGED_SEGMENT_REPLICATION_MAX_BYTES_PER_SEC_SETTING,
323324
RecoverySettings.INDICES_MERGED_SEGMENT_REPLICATION_TIMEOUT_SETTING,
324325
RecoverySettings.INDICES_RECOVERY_RETRY_DELAY_STATE_SYNC_SETTING,

server/src/main/java/org/opensearch/index/engine/MergedSegmentWarmer.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,9 @@ public MergedSegmentWarmer(
5151

5252
@Override
5353
public void warm(LeafReader leafReader) throws IOException {
54+
if (shouldWarm() == false) {
55+
return;
56+
}
5457
// IndexWriter.IndexReaderWarmer#warm is called by IndexWriter#mergeMiddle. The type of leafReader should be SegmentReader.
5558
assert leafReader instanceof SegmentReader;
5659
assert indexShard.indexSettings().isSegRepLocalEnabled() || indexShard.indexSettings().isRemoteStoreEnabled();
@@ -72,4 +75,9 @@ public void warm(LeafReader leafReader) throws IOException {
7275
);
7376
});
7477
}
78+
79+
// package-private for tests
80+
boolean shouldWarm() {
81+
return indexShard.getRecoverySettings().isMergedSegmentReplicationWarmerEnabled() == true;
82+
}
7583
}

server/src/main/java/org/opensearch/index/engine/MergedSegmentWarmerFactory.java

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import org.apache.lucene.index.IndexWriter;
1212
import org.opensearch.cluster.service.ClusterService;
1313
import org.opensearch.common.annotation.ExperimentalApi;
14+
import org.opensearch.common.util.FeatureFlags;
1415
import org.opensearch.index.shard.IndexShard;
1516
import org.opensearch.indices.recovery.RecoverySettings;
1617
import org.opensearch.transport.TransportService;
@@ -34,12 +35,15 @@ public MergedSegmentWarmerFactory(TransportService transportService, RecoverySet
3435
}
3536

3637
public IndexWriter.IndexReaderWarmer get(IndexShard shard) {
37-
if (shard.indexSettings().isSegRepLocalEnabled() || shard.indexSettings().isRemoteStoreEnabled()) {
38-
return new MergedSegmentWarmer(transportService, recoverySettings, clusterService, shard);
39-
} else if (shard.indexSettings().isDocumentReplication()) {
40-
// MergedSegmentWarmerFactory#get is called when IndexShard is initialized. In scenario document replication,
41-
// IndexWriter.IndexReaderWarmer should be null.
38+
if (FeatureFlags.isEnabled(FeatureFlags.MERGED_SEGMENT_WARMER_EXPERIMENTAL_FLAG) == false
39+
|| shard.indexSettings().isDocumentReplication()) {
40+
// MergedSegmentWarmerFactory#get is called by IndexShard#newEngineConfig on the initialization of a new indexShard and
41+
// in cases of updates to shard state.
42+
// 1. IndexWriter.IndexReaderWarmer should be null when IndexMetadata.INDEX_REPLICATION_TYPE_SETTING == ReplicationType.DOCUMENT
43+
// 2. IndexWriter.IndexReaderWarmer should be null when the FeatureFlags.MERGED_SEGMENT_WARMER_EXPERIMENTAL_FLAG == false
4244
return null;
45+
} else if (shard.indexSettings().isSegRepLocalEnabled() || shard.indexSettings().isRemoteStoreEnabled()) {
46+
return new MergedSegmentWarmer(transportService, recoverySettings, clusterService, shard);
4347
}
4448
// We just handle known cases and throw exception at the last. This will allow predictability on the IndexReaderWarmer behaviour.
4549
throw new IllegalStateException(shard.shardId() + " can't determine IndexReaderWarmer");

server/src/main/java/org/opensearch/indices/recovery/RecoverySettings.java

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,8 +40,10 @@
4040
import org.opensearch.common.settings.ClusterSettings;
4141
import org.opensearch.common.settings.Setting;
4242
import org.opensearch.common.settings.Setting.Property;
43+
import org.opensearch.common.settings.Setting.Validator;
4344
import org.opensearch.common.settings.Settings;
4445
import org.opensearch.common.unit.TimeValue;
46+
import org.opensearch.common.util.FeatureFlags;
4547
import org.opensearch.common.util.concurrent.OpenSearchExecutors;
4648
import org.opensearch.core.common.unit.ByteSizeUnit;
4749
import org.opensearch.core.common.unit.ByteSizeValue;
@@ -75,6 +77,28 @@ public class RecoverySettings {
7577
Property.NodeScope
7678
);
7779

80+
/**
81+
* Dynamic setting to enable the merged segment warming(pre-copy) feature, default: false
82+
*/
83+
public static final Setting<Boolean> INDICES_MERGED_SEGMENT_REPLICATION_WARMER_ENABLED_SETTING = Setting.boolSetting(
84+
"indices.replication.merged_segment_warmer_enabled",
85+
false,
86+
new Validator<Boolean>() {
87+
@Override
88+
public void validate(Boolean value) {
89+
if (FeatureFlags.isEnabled(FeatureFlags.MERGED_SEGMENT_WARMER_EXPERIMENTAL_FLAG) == false && value == true) {
90+
throw new IllegalArgumentException(
91+
"FeatureFlag "
92+
+ FeatureFlags.MERGED_SEGMENT_WARMER_EXPERIMENTAL_FLAG
93+
+ " must be enabled to set this property to true."
94+
);
95+
}
96+
}
97+
},
98+
Property.Dynamic,
99+
Property.NodeScope
100+
);
101+
78102
/**
79103
* Individual speed setting for merged segment replication, default -1B to reuse the setting of recovery.
80104
*/
@@ -211,6 +235,7 @@ public class RecoverySettings {
211235

212236
private volatile ByteSizeValue recoveryMaxBytesPerSec;
213237
private volatile ByteSizeValue replicationMaxBytesPerSec;
238+
private volatile boolean mergedSegmentReplicationWarmerEnabled;
214239
private volatile ByteSizeValue mergedSegmentReplicationMaxBytesPerSec;
215240
private volatile int maxConcurrentFileChunks;
216241
private volatile int maxConcurrentOperations;
@@ -250,6 +275,7 @@ public RecoverySettings(Settings settings, ClusterSettings clusterSettings) {
250275
recoveryRateLimiter = new SimpleRateLimiter(recoveryMaxBytesPerSec.getMbFrac());
251276
}
252277
this.replicationMaxBytesPerSec = INDICES_REPLICATION_MAX_BYTES_PER_SEC_SETTING.get(settings);
278+
this.mergedSegmentReplicationWarmerEnabled = INDICES_MERGED_SEGMENT_REPLICATION_WARMER_ENABLED_SETTING.get(settings);
253279
this.mergedSegmentReplicationMaxBytesPerSec = INDICES_MERGED_SEGMENT_REPLICATION_MAX_BYTES_PER_SEC_SETTING.get(settings);
254280
this.mergedSegmentReplicationTimeout = INDICES_MERGED_SEGMENT_REPLICATION_TIMEOUT_SETTING.get(settings);
255281
replicationRateLimiter = getReplicationRateLimiter(replicationMaxBytesPerSec);
@@ -261,6 +287,10 @@ public RecoverySettings(Settings settings, ClusterSettings clusterSettings) {
261287

262288
clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING, this::setRecoveryMaxBytesPerSec);
263289
clusterSettings.addSettingsUpdateConsumer(INDICES_REPLICATION_MAX_BYTES_PER_SEC_SETTING, this::setReplicationMaxBytesPerSec);
290+
clusterSettings.addSettingsUpdateConsumer(
291+
INDICES_MERGED_SEGMENT_REPLICATION_WARMER_ENABLED_SETTING,
292+
this::setIndicesMergedSegmentReplicationWarmerEnabled
293+
);
264294
clusterSettings.addSettingsUpdateConsumer(
265295
INDICES_MERGED_SEGMENT_REPLICATION_MAX_BYTES_PER_SEC_SETTING,
266296
this::setMergedSegmentReplicationMaxBytesPerSec
@@ -442,4 +472,12 @@ private void setMaxConcurrentRemoteStoreStreams(int maxConcurrentRemoteStoreStre
442472
this.maxConcurrentRemoteStoreStreams = maxConcurrentRemoteStoreStreams;
443473
}
444474

475+
public boolean isMergedSegmentReplicationWarmerEnabled() {
476+
return mergedSegmentReplicationWarmerEnabled;
477+
}
478+
479+
public void setIndicesMergedSegmentReplicationWarmerEnabled(boolean mergedSegmentReplicationWarmerEnabled) {
480+
this.mergedSegmentReplicationWarmerEnabled = mergedSegmentReplicationWarmerEnabled;
481+
}
482+
445483
}

0 commit comments

Comments
 (0)