Skip to content

Commit 43b2549

Browse files
author
Harsh Kothari
committed
auto force merge manager
Code Spotless Fixes Signed-off-by: Harsh Kothari <techarsh@amazon.com>
1 parent 2f5c585 commit 43b2549

File tree

10 files changed

+512
-239
lines changed

10 files changed

+512
-239
lines changed

server/src/internalClusterTest/java/org/opensearch/autoforcemerge/AutoForceMergeManagerIT.java

Lines changed: 69 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -9,24 +9,27 @@
99
package org.opensearch.autoforcemerge;
1010

1111
import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;
12+
1213
import org.opensearch.cluster.metadata.IndexMetadata;
1314
import org.opensearch.common.settings.Settings;
1415
import org.opensearch.common.unit.TimeValue;
16+
import org.opensearch.common.util.concurrent.OpenSearchExecutors;
1517
import org.opensearch.core.common.unit.ByteSizeUnit;
1618
import org.opensearch.core.common.unit.ByteSizeValue;
19+
import org.opensearch.index.IndexSettings;
1720
import org.opensearch.index.engine.SegmentsStats;
21+
import org.opensearch.index.shard.IndexShard;
1822
import org.opensearch.index.store.remote.file.CleanerDaemonThreadLeakFilter;
1923
import org.opensearch.node.Node;
2024
import org.opensearch.remotestore.RemoteStoreBaseIntegTestCase;
21-
import org.opensearch.test.OpenSearchIntegTestCase;
22-
import org.opensearch.index.shard.IndexShard;
2325
import org.opensearch.test.InternalTestCluster;
26+
import org.opensearch.test.OpenSearchIntegTestCase;
2427

2528
import java.util.concurrent.ExecutionException;
2629
import java.util.concurrent.atomic.AtomicLong;
2730

2831
import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING;
29-
import static org.opensearch.test.hamcrest.OpenSearchAssertions.*;
32+
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
3033

3134
@ThreadLeakFilters(filters = CleanerDaemonThreadLeakFilter.class)
3235
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0, supportsDedicatedMasters = false)
@@ -52,8 +55,9 @@ protected Settings nodeSettings(int nodeOrdinal) {
5255
.put(super.nodeSettings(nodeOrdinal))
5356
.put(REMOTE_CLUSTER_STATE_ENABLED_SETTING.getKey(), true)
5457
.put(Node.NODE_SEARCH_CACHE_SIZE_SETTING.getKey(), cacheSize.toString())
58+
.put(OpenSearchExecutors.NODE_PROCESSORS_SETTING.getKey(), 32)
5559
.put(ForceMergeManagerSettings.AUTO_FORCE_MERGE_SCHEDULER_INTERVAL.getKey(), SCHEDULER_INTERVAL)
56-
.put(ForceMergeManagerSettings.SEGMENT_COUNT_THRESHOLD_FOR_AUTO_FORCE_MERGE.getKey(), SEGMENT_COUNT)
60+
.put(ForceMergeManagerSettings.SEGMENT_COUNT_FOR_AUTO_FORCE_MERGE.getKey(), SEGMENT_COUNT)
5761
.put(ForceMergeManagerSettings.MERGE_DELAY_BETWEEN_SHARDS_FOR_AUTO_FORCE_MERGE.getKey(), MERGE_DELAY)
5862
.build();
5963
}
@@ -102,6 +106,35 @@ public void testAutoForceMergeFeatureFlagDisabled() throws InterruptedException,
102106
assertAcked(client().admin().indices().prepareDelete(INDEX_NAME_1).get());
103107
}
104108

109+
public void testAutoForceMergeTriggeringWithOneShardOfNonWarmCandidate() throws Exception {
110+
Settings clusterSettings = Settings.builder()
111+
.put(super.nodeSettings(0))
112+
.put(ForceMergeManagerSettings.AUTO_FORCE_MERGE_SETTING.getKey(), true)
113+
.build();
114+
InternalTestCluster internalTestCluster = internalCluster();
115+
internalTestCluster.startClusterManagerOnlyNode(clusterSettings);
116+
String dataNode = internalTestCluster.startDataOnlyNodes(1, clusterSettings).getFirst();
117+
internalCluster().startWarmOnlyNodes(1, clusterSettings).getFirst();
118+
Settings settings = Settings.builder()
119+
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
120+
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
121+
.put(IndexSettings.INDEX_ALLOW_AUTO_FORCE_MERGES.getKey(), false)
122+
.build();
123+
assertAcked(client().admin().indices().prepareCreate(INDEX_NAME_1).setSettings(settings).get());
124+
for (int i = 0; i < INGESTION_COUNT; i++) {
125+
indexBulk(INDEX_NAME_1, NUM_DOCS_IN_BULK);
126+
flushAndRefresh(INDEX_NAME_1);
127+
}
128+
IndexShard shard = getIndexShard(dataNode, INDEX_NAME_1);
129+
assertNotNull(shard);
130+
SegmentsStats segmentsStatsBefore = shard.segmentStats(false, false);
131+
Thread.sleep(TimeValue.parseTimeValue(SCHEDULER_INTERVAL, "test").getMillis() * 3);
132+
flushAndRefresh(INDEX_NAME_1);
133+
SegmentsStats segmentsStatsAfter = shard.segmentStats(false, false);
134+
assertEquals(segmentsStatsBefore.getCount(), segmentsStatsAfter.getCount());
135+
assertAcked(client().admin().indices().prepareDelete(INDEX_NAME_1).get());
136+
}
137+
105138
public void testAutoForceMergeTriggeringBasicWithOneShard() throws Exception {
106139

107140
Settings clusterSettings = Settings.builder()
@@ -158,14 +191,30 @@ public void testAutoForceMergeTriggeringBasicWithFiveShardsOfTwoIndex() throws E
158191
internalTestCluster.startClusterManagerOnlyNode(clusterSettings);
159192
String dataNode = internalTestCluster.startDataOnlyNodes(1, clusterSettings).getFirst();
160193
internalCluster().startWarmOnlyNodes(1, clusterSettings).getFirst();
161-
assertAcked(client().admin().indices().prepareCreate(INDEX_NAME_1).setSettings(Settings.builder()
162-
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 3)
163-
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
164-
.build()).get());
165-
assertAcked(client().admin().indices().prepareCreate(INDEX_NAME_2).setSettings(Settings.builder()
166-
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 2)
167-
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
168-
.build()).get());
194+
assertAcked(
195+
client().admin()
196+
.indices()
197+
.prepareCreate(INDEX_NAME_1)
198+
.setSettings(
199+
Settings.builder()
200+
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 3)
201+
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
202+
.build()
203+
)
204+
.get()
205+
);
206+
assertAcked(
207+
client().admin()
208+
.indices()
209+
.prepareCreate(INDEX_NAME_2)
210+
.setSettings(
211+
Settings.builder()
212+
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 2)
213+
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
214+
.build()
215+
)
216+
.get()
217+
);
169218

170219
// Each ingestion request creates a segment here
171220
for (int i = 0; i < INGESTION_COUNT; i++) {
@@ -192,11 +241,10 @@ public void testAutoForceMergeTriggeringBasicWithFiveShardsOfTwoIndex() throws E
192241
SegmentsStats segmentsStatsForShard3Before = shard3.segmentStats(false, false);
193242
SegmentsStats segmentsStatsForShard4Before = shard4.segmentStats(false, false);
194243
SegmentsStats segmentsStatsForShard5Before = shard5.segmentStats(false, false);
195-
AtomicLong totalSegments = new AtomicLong(segmentsStatsForShard1Before.getCount()
196-
+ segmentsStatsForShard2Before.getCount()
197-
+ segmentsStatsForShard3Before.getCount()
198-
+ segmentsStatsForShard4Before.getCount()
199-
+ segmentsStatsForShard5Before.getCount());
244+
AtomicLong totalSegments = new AtomicLong(
245+
segmentsStatsForShard1Before.getCount() + segmentsStatsForShard2Before.getCount() + segmentsStatsForShard3Before.getCount()
246+
+ segmentsStatsForShard4Before.getCount() + segmentsStatsForShard5Before.getCount()
247+
);
200248

201249
assertTrue(totalSegments.get() > 5);
202250

@@ -208,11 +256,10 @@ public void testAutoForceMergeTriggeringBasicWithFiveShardsOfTwoIndex() throws E
208256
SegmentsStats segmentsStatsForShard3After = shard3.segmentStats(false, false);
209257
SegmentsStats segmentsStatsForShard4After = shard4.segmentStats(false, false);
210258
SegmentsStats segmentsStatsForShard5After = shard5.segmentStats(false, false);
211-
totalSegments.set(segmentsStatsForShard1After.getCount()
212-
+ segmentsStatsForShard2After.getCount()
213-
+ segmentsStatsForShard3After.getCount()
214-
+ segmentsStatsForShard4After.getCount()
215-
+ segmentsStatsForShard5After.getCount());
259+
totalSegments.set(
260+
segmentsStatsForShard1After.getCount() + segmentsStatsForShard2After.getCount() + segmentsStatsForShard3After.getCount()
261+
+ segmentsStatsForShard4After.getCount() + segmentsStatsForShard5After.getCount()
262+
);
216263
// refresh to clear old segments
217264
flushAndRefresh(INDEX_NAME_1);
218265
flushAndRefresh(INDEX_NAME_2);
@@ -224,4 +271,3 @@ public void testAutoForceMergeTriggeringBasicWithFiveShardsOfTwoIndex() throws E
224271
assertAcked(client().admin().indices().prepareDelete(INDEX_NAME_2).get());
225272
}
226273
}
227-

0 commit comments

Comments
 (0)