Skip to content

Commit

Permalink
Fix the GlobalCheckpointSyncIT
Browse files Browse the repository at this point in the history
  • Loading branch information
Arpit-Bandejiya committed Sep 1, 2023
1 parent 3be1f3d commit 998cc58
Showing 1 changed file with 13 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.translog.Translog;
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.replication.SegmentReplicationBaseIT;
import org.opensearch.plugins.Plugin;
import org.opensearch.test.InternalSettingsPlugin;
import org.opensearch.test.OpenSearchIntegTestCase;
Expand Down Expand Up @@ -254,12 +255,16 @@ public void testPersistGlobalCheckpoint() throws Exception {
client().prepareIndex("test").setId(Integer.toString(i)).setSource("{}", MediaTypeRegistry.JSON).get();
}
ensureGreen("test");
flushAndRefresh("test");
Thread.sleep(30000);
assertBusy(() -> {
for (IndicesService indicesService : internalCluster().getDataNodeInstances(IndicesService.class)) {
for (IndexService indexService : indicesService) {
for (IndexShard shard : indexService) {
final SeqNoStats seqNoStats = shard.seqNoStats();
assertThat(seqNoStats.getLocalCheckpoint(), equalTo(seqNoStats.getMaxSeqNo()));
if(shard.isRemoteTranslogEnabled() == false) {
assertThat(seqNoStats.getLocalCheckpoint(), equalTo(seqNoStats.getMaxSeqNo()));
}
assertThat(shard.getLastKnownGlobalCheckpoint(), equalTo(seqNoStats.getMaxSeqNo()));
assertThat(shard.getLastSyncedGlobalCheckpoint(), equalTo(seqNoStats.getMaxSeqNo()));
}
Expand All @@ -268,16 +273,16 @@ public void testPersistGlobalCheckpoint() throws Exception {
});
}

public void testPersistLocalCheckpoint() {
public void testPersistLocalCheckpoint() throws Exception{
internalCluster().ensureAtLeastNumDataNodes(2);
Settings.Builder indexSettings = Settings.builder()
.put(IndexService.GLOBAL_CHECKPOINT_SYNC_INTERVAL_SETTING.getKey(), "10m")
.put(IndexSettings.INDEX_TRANSLOG_DURABILITY_SETTING.getKey(), Translog.Durability.REQUEST)
.put("index.number_of_shards", 1)
.put("index.number_of_replicas", randomIntBetween(0, 1));
.put("index.number_of_replicas", 1);
prepareCreate("test", indexSettings).get();
ensureGreen("test");
int numDocs = randomIntBetween(1, 20);
int numDocs = randomIntBetween(3, 10);
logger.info("numDocs {}", numDocs);
long maxSeqNo = 0;
for (int i = 0; i < numDocs; i++) {
Expand All @@ -288,9 +293,10 @@ public void testPersistLocalCheckpoint() {
for (IndexService indexService : indicesService) {
for (IndexShard shard : indexService) {
final SeqNoStats seqNoStats = shard.seqNoStats();
assertThat(maxSeqNo, equalTo(seqNoStats.getMaxSeqNo()));
assertThat(seqNoStats.getLocalCheckpoint(), equalTo(seqNoStats.getMaxSeqNo()));
;
if (shard.isRemoteTranslogEnabled() == false) {
assertThat(maxSeqNo, equalTo(seqNoStats.getMaxSeqNo()));
assertThat(seqNoStats.getLocalCheckpoint(), equalTo(seqNoStats.getMaxSeqNo()));
}
}
}
}
Expand Down

0 comments on commit 998cc58

Please sign in to comment.