Skip to content

Commit

Permalink
[Remote Store] Emit correct global checkpoint during translog upload
Browse files Browse the repository at this point in the history
Signed-off-by: Ashish Singh <ssashish@amazon.com>
  • Loading branch information
ashking94 committed Sep 19, 2024
1 parent 3a1b6d1 commit a1d5a87
Showing 1 changed file with 71 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.opensearch.action.admin.indices.settings.put.UpdateSettingsRequest;
import org.opensearch.action.index.IndexResponse;
import org.opensearch.action.search.SearchPhaseExecutionException;
import org.opensearch.client.Requests;
import org.opensearch.cluster.health.ClusterHealthStatus;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.routing.RecoverySource;
Expand Down Expand Up @@ -202,7 +203,7 @@ public void testRemoteTranslogCleanup() throws Exception {

public void testStaleCommitDeletionWithInvokeFlush() throws Exception {
String dataNode = internalCluster().startNode();
createIndex(INDEX_NAME, remoteStoreIndexSettings(1, 10000l, -1));
createIndex(INDEX_NAME, remoteStoreIndexSettings(1, 10000L, -1));
int numberOfIterations = randomIntBetween(5, 15);
indexData(numberOfIterations, true, INDEX_NAME);
String segmentsPathFixedPrefix = RemoteStoreSettings.CLUSTER_REMOTE_STORE_SEGMENTS_PATH_PREFIX.get(getNodeSettings());
Expand Down Expand Up @@ -1011,4 +1012,73 @@ public void testAsyncTranslogDurabilityRestrictionsThroughIdxTemplates() throws
.get()
);
}

public void testCloseIndexWithNoOpSyncAndFlushForSyncTranslog() throws InterruptedException {
internalCluster().startNodes(3);
client().admin()
.cluster()
.prepareUpdateSettings()
.setTransientSettings(Settings.builder().put(CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING.getKey(), "5s"))
.get();
Settings.Builder settings = Settings.builder()
.put(remoteStoreIndexSettings(0, 10000L, -1))
.put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), "1s");
createIndex(INDEX_NAME, settings.build());
CountDownLatch latch = new CountDownLatch(1);
new Thread(() -> {
if (randomBoolean()) {
for (int i = 0; i < randomIntBetween(1, 5); i++) {
indexSingleDoc(INDEX_NAME);
}
flushAndRefresh(INDEX_NAME);
}
// Index single doc to start the asyn io processor to run which will lead to 10s wait time before the next sync.
indexSingleDoc(INDEX_NAME);
// Reduce the latch for the main thread to flush after some sleep.
latch.countDown();
// Index another doc and in this case the flush would have happened before the sync.
indexSingleDoc(INDEX_NAME);
}).start();
// Wait for atleast one doc to be ingested.
latch.await();
// Sleep for some time for the next doc to be present in lucene buffer. If flush happens first before the doc #2
// gets indexed, then it goes into the happy case where the close index happens succefully.
Thread.sleep(1000);
// Flush so that the subsequent sync or flushes are no-op.
flush(INDEX_NAME);
// Closing the index involves translog.sync and shard.flush which are now no-op.
client().admin().indices().close(Requests.closeIndexRequest(INDEX_NAME)).actionGet();
Thread.sleep(10000);
ensureGreen(INDEX_NAME);
}

public void testCloseIndexWithNoOpSyncAndFlushForAsyncTranslog() throws InterruptedException {
internalCluster().startNodes(3);
Settings.Builder settings = Settings.builder()
.put(remoteStoreIndexSettings(0, 10000L, -1))
.put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), "1s")
.put(IndexSettings.INDEX_TRANSLOG_DURABILITY_SETTING.getKey(), Durability.ASYNC)
.put(IndexSettings.INDEX_TRANSLOG_SYNC_INTERVAL_SETTING.getKey(), "10s");
createIndex(INDEX_NAME, settings.build());
CountDownLatch latch = new CountDownLatch(1);
new Thread(() -> {
// Index some docs to start the asyn io processor to run which will lead to 10s wait time before the next sync.
indexSingleDoc(INDEX_NAME);
indexSingleDoc(INDEX_NAME);
indexSingleDoc(INDEX_NAME);
// Reduce the latch for the main thread to flush after some sleep.
latch.countDown();
}).start();
// Wait for atleast one doc to be ingested.
latch.await();
// Sleep for some time for the next doc to be present in lucene buffer. If flush happens first before the doc #2
// gets indexed, then it goes into the happy case where the close index happens succefully.
Thread.sleep(1000);
// Flush so that the subsequent sync or flushes are no-op.
flush(INDEX_NAME);
// Closing the index involves translog.sync and shard.flush which are now no-op.
client().admin().indices().close(Requests.closeIndexRequest(INDEX_NAME)).actionGet();
Thread.sleep(10000);
ensureGreen(INDEX_NAME);
}
}

0 comments on commit a1d5a87

Please sign in to comment.