Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix issue of red index on close for remote enabled clusters #15990

Merged
merged 3 commits into from
Sep 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.opensearch.action.admin.indices.settings.put.UpdateSettingsRequest;
import org.opensearch.action.index.IndexResponse;
import org.opensearch.action.search.SearchPhaseExecutionException;
import org.opensearch.client.Requests;
import org.opensearch.cluster.health.ClusterHealthStatus;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.routing.RecoverySource;
Expand Down Expand Up @@ -202,7 +203,7 @@ public void testRemoteTranslogCleanup() throws Exception {

public void testStaleCommitDeletionWithInvokeFlush() throws Exception {
String dataNode = internalCluster().startNode();
createIndex(INDEX_NAME, remoteStoreIndexSettings(1, 10000l, -1));
createIndex(INDEX_NAME, remoteStoreIndexSettings(1, 10000L, -1));
int numberOfIterations = randomIntBetween(5, 15);
indexData(numberOfIterations, true, INDEX_NAME);
String segmentsPathFixedPrefix = RemoteStoreSettings.CLUSTER_REMOTE_STORE_SEGMENTS_PATH_PREFIX.get(getNodeSettings());
Expand Down Expand Up @@ -1011,4 +1012,70 @@ public void testAsyncTranslogDurabilityRestrictionsThroughIdxTemplates() throws
.get()
);
}

public void testCloseIndexWithNoOpSyncAndFlushForSyncTranslog() throws InterruptedException {
internalCluster().startNodes(3);
client().admin()
.cluster()
.prepareUpdateSettings()
.setTransientSettings(Settings.builder().put(CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING.getKey(), "5s"))
.get();
Settings.Builder settings = Settings.builder()
.put(remoteStoreIndexSettings(0, 10000L, -1))
.put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), "1s");
createIndex(INDEX_NAME, settings.build());
CountDownLatch latch = new CountDownLatch(1);
new Thread(() -> {
if (randomBoolean()) {
for (int i = 0; i < randomIntBetween(1, 5); i++) {
indexSingleDoc(INDEX_NAME);
}
flushAndRefresh(INDEX_NAME);
}
// Index single doc to start the asyn io processor to run which will lead to 10s wait time before the next sync.
indexSingleDoc(INDEX_NAME);
// Reduce the latch for the main thread to flush after some sleep.
latch.countDown();
// Index another doc and in this case the flush would have happened before the sync.
indexSingleDoc(INDEX_NAME);
}).start();
// Wait for atleast one doc to be ingested.
latch.await();
// Sleep for some time for the next doc to be present in lucene buffer. If flush happens first before the doc #2
// gets indexed, then it goes into the happy case where the close index happens succefully.
Thread.sleep(1000);
// Flush so that the subsequent sync or flushes are no-op.
flush(INDEX_NAME);
// Closing the index involves translog.sync and shard.flush which are now no-op.
client().admin().indices().close(Requests.closeIndexRequest(INDEX_NAME)).actionGet();
Thread.sleep(10000);
ensureGreen(INDEX_NAME);
}

public void testCloseIndexWithNoOpSyncAndFlushForAsyncTranslog() throws InterruptedException {
ashking94 marked this conversation as resolved.
Show resolved Hide resolved
internalCluster().startNodes(3);
Settings.Builder settings = Settings.builder()
.put(remoteStoreIndexSettings(0, 10000L, -1))
.put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), "1s")
.put(IndexSettings.INDEX_TRANSLOG_DURABILITY_SETTING.getKey(), Durability.ASYNC)
.put(IndexSettings.INDEX_TRANSLOG_SYNC_INTERVAL_SETTING.getKey(), "10s");
createIndex(INDEX_NAME, settings.build());
CountDownLatch latch = new CountDownLatch(1);
new Thread(() -> {
// Index some docs to start the asyn io processor to run which will lead to 10s wait time before the next sync.
indexSingleDoc(INDEX_NAME);
indexSingleDoc(INDEX_NAME);
indexSingleDoc(INDEX_NAME);
// Reduce the latch for the main thread to flush after some sleep.
latch.countDown();
}).start();
// Wait for atleast one doc to be ingested.
latch.await();
// Flush so that the subsequent sync or flushes are no-op.
flush(INDEX_NAME);
// Closing the index involves translog.sync and shard.flush which are now no-op.
client().admin().indices().close(Requests.closeIndexRequest(INDEX_NAME)).actionGet();
Thread.sleep(10000);
ensureGreen(INDEX_NAME);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,9 @@ public class RemoteFsTranslog extends Translog {
// min generation referred by last uploaded translog
protected volatile long minRemoteGenReferenced;

// the max global checkpoint that has been synced
protected volatile long globalCheckpointSynced;

// clean up translog folder uploaded by previous primaries once
protected final SetOnce<Boolean> olderPrimaryCleaned = new SetOnce<>();

Expand Down Expand Up @@ -437,9 +440,10 @@ private boolean upload(long primaryTerm, long generation, long maxSeqNo) throws
config.getNodeId()
).build()
) {
Checkpoint checkpoint = current.getLastSyncedCheckpoint();
return translogTransferManager.transferSnapshot(
transferSnapshotProvider,
new RemoteFsTranslogTransferListener(generation, primaryTerm, maxSeqNo)
new RemoteFsTranslogTransferListener(generation, primaryTerm, maxSeqNo, checkpoint.globalCheckpoint)
);
} finally {
syncPermit.release(SYNC_PERMIT);
Expand Down Expand Up @@ -474,7 +478,10 @@ public void sync() throws IOException {
public boolean syncNeeded() {
try (ReleasableLock lock = readLock.acquire()) {
return current.syncNeeded()
|| (maxRemoteTranslogGenerationUploaded + 1 < this.currentFileGeneration() && current.totalOperations() == 0);
|| (maxRemoteTranslogGenerationUploaded + 1 < this.currentFileGeneration() && current.totalOperations() == 0)
// The below condition on GCP exists to handle global checkpoint updates during close index.
// Refer issue - https://github.com/opensearch-project/OpenSearch/issues/15989
|| (current.getLastSyncedCheckpoint().globalCheckpoint > globalCheckpointSynced);
}
}

Expand Down Expand Up @@ -674,17 +681,25 @@ private class RemoteFsTranslogTransferListener implements TranslogTransferListen

private final long maxSeqNo;

RemoteFsTranslogTransferListener(long generation, long primaryTerm, long maxSeqNo) {
private final long globalCheckpoint;

RemoteFsTranslogTransferListener(long generation, long primaryTerm, long maxSeqNo, long globalCheckpoint) {
this.generation = generation;
this.primaryTerm = primaryTerm;
this.maxSeqNo = maxSeqNo;
this.globalCheckpoint = globalCheckpoint;
}

@Override
public void onUploadComplete(TransferSnapshot transferSnapshot) throws IOException {
maxRemoteTranslogGenerationUploaded = generation;
long previousMinRemoteGenReferenced = minRemoteGenReferenced;
minRemoteGenReferenced = getMinFileGeneration();
// Update the global checkpoint only if the supplied global checkpoint is greater than it
// When a new writer is created the
if (globalCheckpoint > globalCheckpointSynced) {
globalCheckpointSynced = globalCheckpoint;
}
if (previousMinRemoteGenReferenced != minRemoteGenReferenced) {
onMinRemoteGenReferencedChange();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1801,6 +1801,83 @@ public void testDownloadWithEmptyTranslogOnlyInLocal() throws IOException {
assertArrayEquals(filesPostFirstDownload, filesPostSecondDownload);
}

public void testSyncWithGlobalCheckpointUpdate() throws IOException {
ArrayList<Translog.Operation> ops = new ArrayList<>();
addToTranslogAndListAndUpload(translog, ops, new Translog.Index("1", 0, primaryTerm.get(), new byte[] { 1 }));
addToTranslogAndListAndUpload(translog, ops, new Translog.Index("2", 1, primaryTerm.get(), new byte[] { 2 }));

// Set a global checkpoint
long initialGlobalCheckpoint = 1L;
globalCheckpoint.set(initialGlobalCheckpoint);

// Sync the translog
translog.sync();

// Verify that the globalCheckpointSynced is updated
assertEquals(initialGlobalCheckpoint, ((RemoteFsTranslog) translog).getLastSyncedCheckpoint().globalCheckpoint);

// Update global checkpoint
long newGlobalCheckpoint = 2L;
globalCheckpoint.set(newGlobalCheckpoint);

// Add a new operation and sync
addToTranslogAndListAndUpload(translog, ops, new Translog.Index("3", 2, primaryTerm.get(), new byte[] { 3 }));
translog.sync();

// Verify that the globalCheckpointSynced is updated to the new value
assertEquals(newGlobalCheckpoint, ((RemoteFsTranslog) translog).getLastSyncedCheckpoint().globalCheckpoint);
}

public void testSyncNeededWithGlobalCheckpointUpdate() throws IOException {
ArrayList<Translog.Operation> ops = new ArrayList<>();
addToTranslogAndListAndUpload(translog, ops, new Translog.Index("1", 0, primaryTerm.get(), new byte[] { 1 }));

// Set initial global checkpoint
long initialGlobalCheckpoint = 0L;
globalCheckpoint.set(initialGlobalCheckpoint);

// Sync the translog
translog.sync();

// Verify that sync is not needed
assertFalse(translog.syncNeeded());

// Update global checkpoint
long newGlobalCheckpoint = 1L;
globalCheckpoint.set(newGlobalCheckpoint);

// Verify that sync is now needed due to global checkpoint update
assertTrue(translog.syncNeeded());

// Sync again
translog.sync();

// Verify that sync is not needed after syncing
assertFalse(translog.syncNeeded());
}

public void testGlobalCheckpointUpdateDuringClose() throws IOException {
ArrayList<Translog.Operation> ops = new ArrayList<>();
addToTranslogAndListAndUpload(translog, ops, new Translog.Index("1", 0, primaryTerm.get(), new byte[] { 1 }));

// Set initial global checkpoint
long initialGlobalCheckpoint = 0L;
globalCheckpoint.set(initialGlobalCheckpoint);

// Sync the translog
translog.sync();

// Update global checkpoint
long newGlobalCheckpoint = 1L;
globalCheckpoint.set(newGlobalCheckpoint);

// Close the translog
translog.close();

// Verify that the last synced checkpoint includes the updated global checkpoint
assertEquals(newGlobalCheckpoint, ((RemoteFsTranslog) translog).getLastSyncedCheckpoint().globalCheckpoint);
}

public class ThrowingBlobRepository extends FsRepository {

private final Environment environment;
Expand Down
Loading