Skip to content

Commit c5de961

Browse files
committed
add test
Signed-off-by: guojialiang <guojialiang.2012@bytedance.com>
1 parent 0e84bd5 commit c5de961

File tree

3 files changed

+190
-9
lines changed

3 files changed

+190
-9
lines changed

server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java

Lines changed: 89 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@
6060
import org.opensearch.indices.replication.SegmentReplicationState;
6161
import org.opensearch.indices.replication.SegmentReplicationTarget;
6262
import org.opensearch.indices.replication.SegmentReplicationTargetService;
63+
import org.opensearch.indices.replication.checkpoint.MergedSegmentPublisher;
6364
import org.opensearch.indices.replication.checkpoint.ReferencedSegmentsCheckpoint;
6465
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
6566
import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
@@ -158,7 +159,66 @@ public void testReplication() throws Exception {
158159
@LockFeatureFlag(MERGED_SEGMENT_WARMER_EXPERIMENTAL_FLAG)
159160
public void testMergedSegmentReplication() throws Exception {
160161
// Test that the pre-copy merged segment logic does not block the merge process of the primary shard when there are 1 replica shard.
161-
try (ReplicationGroup shards = createGroup(1, getIndexSettings(), indexMapping, new NRTReplicationEngineFactory());) {
162+
final RecoverySettings recoverySettings = new RecoverySettings(
163+
Settings.builder().put(RecoverySettings.INDICES_MERGED_SEGMENT_REPLICATION_WARMER_ENABLED_SETTING.getKey(), true).build(),
164+
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)
165+
);
166+
try (
167+
ReplicationGroup shards = createGroup(
168+
1,
169+
getIndexSettings(),
170+
indexMapping,
171+
new NRTReplicationEngineFactory(),
172+
recoverySettings,
173+
MergedSegmentPublisher.EMPTY
174+
)
175+
) {
176+
shards.startAll();
177+
final IndexShard primaryShard = shards.getPrimary();
178+
final IndexShard replicaShard = shards.getReplicas().get(0);
179+
180+
// index and replicate segments to replica.
181+
int numDocs = randomIntBetween(10, 20);
182+
shards.indexDocs(numDocs);
183+
primaryShard.refresh("test");
184+
flushShard(primaryShard);
185+
186+
shards.indexDocs(numDocs);
187+
primaryShard.refresh("test");
188+
flushShard(primaryShard);
189+
replicateSegments(primaryShard, List.of(replicaShard));
190+
shards.assertAllEqual(2 * numDocs);
191+
192+
primaryShard.forceMerge(new ForceMergeRequest("test").maxNumSegments(1));
193+
replicateMergedSegments(primaryShard, List.of(replicaShard));
194+
primaryShard.refresh("test");
195+
assertEquals(1, primaryShard.segments(false).size());
196+
// After the pre-copy merged segment is completed, the merged segment is not visible in the replica, and the number of segments
197+
// in the replica shard is still 2.
198+
assertEquals(2, replicaShard.segments(false).size());
199+
}
200+
}
201+
202+
@LockFeatureFlag(MERGED_SEGMENT_WARMER_EXPERIMENTAL_FLAG)
203+
public void testMergedSegmentReplicationWithException() throws Exception {
204+
// Test that the pre-copy merged segment exception will not cause primary shard to fail
205+
MergedSegmentPublisher mergedSegmentPublisherWithException = new MergedSegmentPublisher((indexShard, checkpoint) -> {
206+
throw new RuntimeException("mock exception");
207+
});
208+
final RecoverySettings recoverySettings = new RecoverySettings(
209+
Settings.builder().put(RecoverySettings.INDICES_MERGED_SEGMENT_REPLICATION_WARMER_ENABLED_SETTING.getKey(), true).build(),
210+
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)
211+
);
212+
try (
213+
ReplicationGroup shards = createGroup(
214+
1,
215+
getIndexSettings(),
216+
indexMapping,
217+
new NRTReplicationEngineFactory(),
218+
recoverySettings,
219+
mergedSegmentPublisherWithException
220+
)
221+
) {
162222
shards.startAll();
163223
final IndexShard primaryShard = shards.getPrimary();
164224
final IndexShard replicaShard = shards.getReplicas().get(0);
@@ -188,7 +248,20 @@ public void testMergedSegmentReplication() throws Exception {
188248
@LockFeatureFlag(MERGED_SEGMENT_WARMER_EXPERIMENTAL_FLAG)
189249
public void testMergedSegmentReplicationWithZeroReplica() throws Exception {
190250
// Test that the pre-copy merged segment logic does not block the merge process of the primary shard when there are 0 replica shard.
191-
try (ReplicationGroup shards = createGroup(0, getIndexSettings(), indexMapping, new NRTReplicationEngineFactory());) {
251+
final RecoverySettings recoverySettings = new RecoverySettings(
252+
Settings.builder().put(RecoverySettings.INDICES_MERGED_SEGMENT_REPLICATION_WARMER_ENABLED_SETTING.getKey(), true).build(),
253+
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)
254+
);
255+
try (
256+
ReplicationGroup shards = createGroup(
257+
0,
258+
getIndexSettings(),
259+
indexMapping,
260+
new NRTReplicationEngineFactory(),
261+
recoverySettings,
262+
MergedSegmentPublisher.EMPTY
263+
)
264+
) {
192265
shards.startAll();
193266
final IndexShard primaryShard = shards.getPrimary();
194267

@@ -210,7 +283,20 @@ public void testMergedSegmentReplicationWithZeroReplica() throws Exception {
210283

211284
@LockFeatureFlag(MERGED_SEGMENT_WARMER_EXPERIMENTAL_FLAG)
212285
public void testCleanupRedundantPendingMergeSegment() throws Exception {
213-
try (ReplicationGroup shards = createGroup(1, getIndexSettings(), indexMapping, new NRTReplicationEngineFactory());) {
286+
final RecoverySettings recoverySettings = new RecoverySettings(
287+
Settings.builder().put(RecoverySettings.INDICES_MERGED_SEGMENT_REPLICATION_WARMER_ENABLED_SETTING.getKey(), true).build(),
288+
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)
289+
);
290+
try (
291+
ReplicationGroup shards = createGroup(
292+
1,
293+
getIndexSettings(),
294+
indexMapping,
295+
new NRTReplicationEngineFactory(),
296+
recoverySettings,
297+
MergedSegmentPublisher.EMPTY
298+
)
299+
) {
214300
shards.startAll();
215301
final IndexShard primaryShard = shards.getPrimary();
216302
final IndexShard replicaShard = shards.getReplicas().get(0);

test/framework/src/main/java/org/opensearch/index/replication/OpenSearchIndexLevelReplicationTestCase.java

Lines changed: 48 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -101,8 +101,11 @@
101101
import org.opensearch.index.shard.PrimaryReplicaSyncer;
102102
import org.opensearch.index.shard.ShardPath;
103103
import org.opensearch.index.translog.Translog;
104+
import org.opensearch.indices.recovery.DefaultRecoverySettings;
105+
import org.opensearch.indices.recovery.RecoverySettings;
104106
import org.opensearch.indices.recovery.RecoveryState;
105107
import org.opensearch.indices.recovery.RecoveryTarget;
108+
import org.opensearch.indices.replication.checkpoint.MergedSegmentPublisher;
106109
import org.opensearch.tasks.TaskManager;
107110
import org.opensearch.threadpool.ThreadPool;
108111
import org.opensearch.threadpool.ThreadPool.Names;
@@ -150,17 +153,48 @@ protected ReplicationGroup createGroup(int replicas, Settings settings, EngineFa
150153

151154
protected ReplicationGroup createGroup(int replicas, Settings settings, String mappings, EngineFactory engineFactory)
152155
throws IOException {
156+
return createGroup(replicas, settings, mappings, engineFactory, DefaultRecoverySettings.INSTANCE, MergedSegmentPublisher.EMPTY);
157+
}
158+
159+
protected ReplicationGroup createGroup(
160+
int replicas,
161+
Settings settings,
162+
String mappings,
163+
EngineFactory engineFactory,
164+
RecoverySettings recoverySettings,
165+
MergedSegmentPublisher mergedSegmentPublisher
166+
) throws IOException {
153167
Path remotePath = null;
154168
if ("true".equals(settings.get(IndexMetadata.SETTING_REMOTE_STORE_ENABLED))) {
155169
remotePath = createTempDir();
156170
}
157-
return createGroup(replicas, settings, mappings, engineFactory, remotePath);
171+
return createGroup(replicas, settings, mappings, engineFactory, recoverySettings, remotePath, mergedSegmentPublisher);
158172
}
159173

160174
protected ReplicationGroup createGroup(int replicas, Settings settings, String mappings, EngineFactory engineFactory, Path remotePath)
161175
throws IOException {
176+
return createGroup(
177+
replicas,
178+
settings,
179+
mappings,
180+
engineFactory,
181+
DefaultRecoverySettings.INSTANCE,
182+
remotePath,
183+
MergedSegmentPublisher.EMPTY
184+
);
185+
}
186+
187+
protected ReplicationGroup createGroup(
188+
int replicas,
189+
Settings settings,
190+
String mappings,
191+
EngineFactory engineFactory,
192+
RecoverySettings recoverySettings,
193+
Path remotePath,
194+
MergedSegmentPublisher mergedSegmentPublisher
195+
) throws IOException {
162196
IndexMetadata metadata = buildIndexMetadata(replicas, settings, mappings);
163-
return new ReplicationGroup(metadata, remotePath) {
197+
return new ReplicationGroup(metadata, recoverySettings, remotePath, mergedSegmentPublisher) {
164198
@Override
165199
protected EngineFactory getEngineFactory(ShardRouting routing) {
166200
return engineFactory;
@@ -253,6 +287,15 @@ protected ReplicationGroup(final IndexMetadata indexMetadata) throws IOException
253287
}
254288

255289
protected ReplicationGroup(final IndexMetadata indexMetadata, Path remotePath) throws IOException {
290+
this(indexMetadata, DefaultRecoverySettings.INSTANCE, remotePath, MergedSegmentPublisher.EMPTY);
291+
}
292+
293+
protected ReplicationGroup(
294+
final IndexMetadata indexMetadata,
295+
RecoverySettings recoverySettings,
296+
Path remotePath,
297+
MergedSegmentPublisher mergedSegmentPublisher
298+
) throws IOException {
256299
final ShardRouting primaryRouting = this.createShardRouting("s0", true);
257300
primary = newShard(
258301
primaryRouting,
@@ -261,7 +304,9 @@ protected ReplicationGroup(final IndexMetadata indexMetadata, Path remotePath) t
261304
getEngineFactory(primaryRouting),
262305
() -> {},
263306
retentionLeaseSyncer,
264-
remotePath
307+
recoverySettings,
308+
remotePath,
309+
mergedSegmentPublisher
265310
);
266311
replicas = new CopyOnWriteArrayList<>();
267312
this.indexMetadata = indexMetadata;

test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java

Lines changed: 53 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -485,6 +485,44 @@ protected IndexShard newShard(
485485
RetentionLeaseSyncer retentionLeaseSyncer,
486486
Path path,
487487
IndexingOperationListener... listeners
488+
) throws IOException {
489+
return newShard(
490+
routing,
491+
indexMetadata,
492+
indexReaderWrapper,
493+
engineFactory,
494+
globalCheckpointSyncer,
495+
retentionLeaseSyncer,
496+
DefaultRecoverySettings.INSTANCE,
497+
path,
498+
MergedSegmentPublisher.EMPTY,
499+
listeners
500+
);
501+
}
502+
503+
/**
504+
* creates a new initializing shard. The shard will be put in its proper path under the
505+
* current node id the shard is assigned to.
506+
* @param routing shard routing to use
507+
* @param indexMetadata indexMetadata for the shard, including any mapping
508+
* @param indexReaderWrapper an optional wrapper to be used during search
509+
* @param globalCheckpointSyncer callback for syncing global checkpoints
510+
* @param recoverySettings recovery settings
511+
* @param path remote path
512+
* @param mergedSegmentPublisher merged segment publisher
513+
* @param listeners an optional set of listeners to add to the shard
514+
*/
515+
protected IndexShard newShard(
516+
ShardRouting routing,
517+
IndexMetadata indexMetadata,
518+
@Nullable CheckedFunction<DirectoryReader, DirectoryReader, IOException> indexReaderWrapper,
519+
@Nullable EngineFactory engineFactory,
520+
Runnable globalCheckpointSyncer,
521+
RetentionLeaseSyncer retentionLeaseSyncer,
522+
RecoverySettings recoverySettings,
523+
Path path,
524+
MergedSegmentPublisher mergedSegmentPublisher,
525+
IndexingOperationListener... listeners
488526
) throws IOException {
489527
// add node id as name to settings for proper logging
490528
final ShardId shardId = routing.shardId();
@@ -501,7 +539,10 @@ protected IndexShard newShard(
501539
globalCheckpointSyncer,
502540
retentionLeaseSyncer,
503541
EMPTY_EVENT_LISTENER,
542+
SegmentReplicationCheckpointPublisher.EMPTY,
543+
recoverySettings,
504544
path,
545+
mergedSegmentPublisher,
505546
listeners
506547
);
507548
}
@@ -544,7 +585,9 @@ protected IndexShard newShard(
544585
retentionLeaseSyncer,
545586
indexEventListener,
546587
SegmentReplicationCheckpointPublisher.EMPTY,
588+
DefaultRecoverySettings.INSTANCE,
547589
remotePath,
590+
MergedSegmentPublisher.EMPTY,
548591
listeners
549592
);
550593
}
@@ -597,7 +640,9 @@ protected IndexShard newShard(boolean primary, SegmentReplicationCheckpointPubli
597640
RetentionLeaseSyncer.EMPTY,
598641
EMPTY_EVENT_LISTENER,
599642
checkpointPublisher,
600-
null
643+
DefaultRecoverySettings.INSTANCE,
644+
null,
645+
MergedSegmentPublisher.EMPTY
601646
);
602647
}
603648

@@ -611,6 +656,9 @@ protected IndexShard newShard(boolean primary, SegmentReplicationCheckpointPubli
611656
* @param globalCheckpointSyncer callback for syncing global checkpoints
612657
* @param indexEventListener index event listener
613658
* @param checkpointPublisher segment Replication Checkpoint Publisher to publish checkpoint
659+
* @param recoverySettings recovery settings
660+
* @param remotePath remote path
661+
* @param mergedSegmentPublisher merged segment publisher
614662
* @param listeners an optional set of listeners to add to the shard
615663
*/
616664
protected IndexShard newShard(
@@ -625,7 +673,9 @@ protected IndexShard newShard(
625673
RetentionLeaseSyncer retentionLeaseSyncer,
626674
IndexEventListener indexEventListener,
627675
SegmentReplicationCheckpointPublisher checkpointPublisher,
676+
RecoverySettings recoverySettings,
628677
@Nullable Path remotePath,
678+
MergedSegmentPublisher mergedSegmentPublisher,
629679
IndexingOperationListener... listeners
630680
) throws IOException {
631681
Settings nodeSettings = Settings.builder().put("node.name", routing.currentNodeId()).build();
@@ -726,7 +776,7 @@ protected IndexShard newShard(
726776
remoteStore,
727777
remoteStoreStatsTrackerFactory,
728778
"dummy-node",
729-
DefaultRecoverySettings.INSTANCE,
779+
recoverySettings,
730780
DefaultRemoteStoreSettings.INSTANCE,
731781
false,
732782
discoveryNodes,
@@ -737,7 +787,7 @@ protected IndexShard newShard(
737787
indexSettings::getRefreshInterval,
738788
new Object(),
739789
clusterService.getClusterApplierService(),
740-
MergedSegmentPublisher.EMPTY,
790+
mergedSegmentPublisher,
741791
ReferencedSegmentsPublisher.EMPTY
742792
);
743793
indexShard.addShardFailureCallback(DEFAULT_SHARD_FAILURE_HANDLER);

0 commit comments

Comments
 (0)