[Segment Replication] Prevent store clean up post reader close and refactor (#8463)

* [Segment Replication] Prevent store clean up on reader close action

Signed-off-by: Suraj Singh <surajrider@gmail.com>

* [Segment Replication] Self review

Signed-off-by: Suraj Singh <surajrider@gmail.com>

* Address review comment

Signed-off-by: Suraj Singh <surajrider@gmail.com>

* Address review comments & refactor

Signed-off-by: Suraj Singh <surajrider@gmail.com>

* Comment

Signed-off-by: Suraj Singh <surajrider@gmail.com>

* Fix unit test

Signed-off-by: Suraj Singh <surajrider@gmail.com>

* Unit test to verify temporary files are not deleted from commits

Signed-off-by: Suraj Singh <surajrider@gmail.com>

* Compilation error fix

Signed-off-by: Suraj Singh <surajrider@gmail.com>

* Javadoc

Signed-off-by: Suraj Singh <surajrider@gmail.com>

* Skip testScrollWithConcurrentIndexAndSearch with remote store

Signed-off-by: Suraj Singh <surajrider@gmail.com>

---------

Signed-off-by: Suraj Singh <surajrider@gmail.com>
dreamer-89 authored Jul 8, 2023
1 parent 2bced5d commit 516685d
Showing 10 changed files with 480 additions and 130 deletions.
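The commit replaces the eager store cleanup that used to run inside the reader-close callback with a ref-count driven cleanup inside Store#decRefFileDeleter, scoped to only the files the closing reader held. Below is a minimal, self-contained sketch of that flow, not OpenSearch code: the class name, method names, and the TEMP_REPLICATION_PREFIX constant are illustrative placeholders, and the real implementation in Store.java appears in the diff further down.

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Illustrative sketch only: replicas ref-count files held by open readers, and the
// cleanup triggered on reader close removes a file only when it is no longer
// referenced, is not part of the latest on-disk commit, and is not an in-flight
// temporary replication file.
public class ReaderCloseCleanupSketch {

    // Placeholder for Store.REPLICATION_PREFIX; the real constant lives in Store.java.
    private static final String TEMP_REPLICATION_PREFIX = "replication.";

    private final Map<String, Integer> refCounts = new HashMap<>();

    public synchronized void incRef(Collection<String> files) {
        for (String file : files) {
            refCounts.merge(file, 1, Integer::sum);
        }
    }

    // Mirrors decRefFileDeleter + cleanupAndPreserveLatestCommitPoint in simplified form.
    public synchronized void decRefAndCleanup(Collection<String> closedReaderFiles, Set<String> lastCommitFiles) {
        for (String file : closedReaderFiles) {
            refCounts.computeIfPresent(file, (f, count) -> count > 1 ? count - 1 : null);
        }
        for (String file : closedReaderFiles) {
            if (canDelete(file, lastCommitFiles)) {
                // The real Store deletes through the directory under its metadata write lock
                // and logs (but does not rethrow) IOExceptions.
                System.out.println("would delete " + file);
            }
        }
    }

    private boolean canDelete(String file, Set<String> lastCommitFiles) {
        return refCounts.containsKey(file) == false             // not held by any open reader
            && lastCommitFiles.contains(file) == false           // not referenced by the latest commit point
            && file.startsWith(TEMP_REPLICATION_PREFIX) == false; // temp files are cleaned up by MultiFileWriter
    }
}

The key difference from the previous behavior is scope: cleanup now only ever considers the files the closing reader had inc-ref'd, so it can no longer touch temporary replication files or commit files that another consumer still needs.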
@@ -25,6 +25,7 @@
import org.opensearch.action.admin.indices.flush.FlushRequest;
import org.opensearch.action.admin.indices.stats.IndicesStatsRequest;
import org.opensearch.action.admin.indices.stats.IndicesStatsResponse;
import org.opensearch.action.index.IndexResponse;
import org.opensearch.action.search.CreatePitAction;
import org.opensearch.action.search.CreatePitRequest;
import org.opensearch.action.search.CreatePitResponse;
@@ -79,6 +80,7 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import static java.util.Arrays.asList;
@@ -272,6 +274,59 @@ public void testIndexReopenClose() throws Exception {
verifyStoreContent();
}

public void testScrollWithConcurrentIndexAndSearch() throws Exception {
assumeFalse("Skipping the test with remote store as it is flaky.", segmentReplicationWithRemoteEnabled());
final String primary = internalCluster().startDataOnlyNode();
final String replica = internalCluster().startDataOnlyNode();
createIndex(INDEX_NAME);
ensureGreen(INDEX_NAME);
final List<ActionFuture<IndexResponse>> pendingIndexResponses = new ArrayList<>();
final List<ActionFuture<SearchResponse>> pendingSearchResponse = new ArrayList<>();
final int searchCount = randomIntBetween(10, 20);
final WriteRequest.RefreshPolicy refreshPolicy = randomFrom(WriteRequest.RefreshPolicy.values());

for (int i = 0; i < searchCount; i++) {
pendingIndexResponses.add(
client().prepareIndex(INDEX_NAME)
.setId(Integer.toString(i))
.setRefreshPolicy(refreshPolicy)
.setSource("field", "value" + i)
.execute()
);
flush(INDEX_NAME);
forceMerge();
}

final SearchResponse searchResponse = client().prepareSearch()
.setQuery(matchAllQuery())
.setIndices(INDEX_NAME)
.setRequestCache(false)
.setScroll(TimeValue.timeValueDays(1))
.setSize(10)
.get();

for (int i = searchCount; i < searchCount * 2; i++) {
pendingIndexResponses.add(
client().prepareIndex(INDEX_NAME)
.setId(Integer.toString(i))
.setRefreshPolicy(refreshPolicy)
.setSource("field", "value" + i)
.execute()
);
}
flush(INDEX_NAME);
forceMerge();
client().prepareClearScroll().addScrollId(searchResponse.getScrollId()).get();

assertBusy(() -> {
client().admin().indices().prepareRefresh().execute().actionGet();
assertTrue(pendingIndexResponses.stream().allMatch(ActionFuture::isDone));
assertTrue(pendingSearchResponse.stream().allMatch(ActionFuture::isDone));
}, 1, TimeUnit.MINUTES);
verifyStoreContent();
waitForSearchableDocs(INDEX_NAME, 2 * searchCount, List.of(primary, replica));
}

public void testMultipleShards() throws Exception {
Settings indexSettings = Settings.builder()
.put(super.indexSettings())
@@ -123,21 +123,7 @@ private NRTReplicationReaderManager buildReaderManager() throws IOException {
return new NRTReplicationReaderManager(
OpenSearchDirectoryReader.wrap(getDirectoryReader(), shardId),
store::incRefFileDeleter,
(files) -> {
store.decRefFileDeleter(files);
try {
store.cleanupAndPreserveLatestCommitPoint(
"On reader closed",
getLatestSegmentInfos(),
getLastCommittedSegmentInfos(),
false
);
} catch (IOException e) {
// Log but do not rethrow - we can try cleaning up again after next replication cycle.
// If that were to fail, the shard will as well.
logger.error("Unable to clean store after reader closed", e);
}
}
store::decRefFileDeleter
);
}

@@ -147,9 +133,9 @@ public TranslogManager translogManager() {
}

public synchronized void updateSegments(final SegmentInfos infos) throws IOException {
// Update the current infos reference on the Engine's reader.
ensureOpen();
try (ReleasableLock lock = writeLock.acquire()) {
// Update the current infos reference on the Engine's reader.
ensureOpen();
final long maxSeqNo = Long.parseLong(infos.userData.get(MAX_SEQ_NO));
final long incomingGeneration = infos.getGeneration();
readerManager.updateSegments(infos);
@@ -72,6 +72,8 @@ protected OpenSearchDirectoryReader refreshIfNeeded(OpenSearchDirectoryReader re
for (LeafReaderContext ctx : standardDirectoryReader.leaves()) {
subs.add(ctx.reader());
}
// The segments_N file is ignored here because it is either already committed on disk as part of the previous
// commit point or does not yet exist in the store (not yet committed)
final Collection<String> files = currentInfos.files(false);
DirectoryReader innerReader = StandardDirectoryReader.open(referenceToRefresh.directory(), currentInfos, subs, null);
final DirectoryReader softDeletesDirectoryReaderWrapper = new SoftDeletesDirectoryReaderWrapper(
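The refreshIfNeeded comment above about skipping segments_N relies on Lucene's SegmentInfos.files(boolean includeSegmentsFile). The following standalone Lucene sketch (not OpenSearch code, and assuming only a lucene-core dependency on the classpath) shows the difference between the two variants:

import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.TextField;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.store.ByteBuffersDirectory;
import org.apache.lucene.store.Directory;

public class SegmentInfosFilesDemo {
    public static void main(String[] args) throws Exception {
        try (Directory dir = new ByteBuffersDirectory(); IndexWriter writer = new IndexWriter(dir, new IndexWriterConfig())) {
            Document doc = new Document();
            doc.add(new TextField("field", "value", Field.Store.NO));
            writer.addDocument(doc);
            writer.commit();

            SegmentInfos infos = SegmentInfos.readLatestCommit(dir);
            // files(false): only per-segment files; the segments_N commit file is excluded,
            // which matches what the reader manager inc-refs on refresh.
            System.out.println("without segments_N: " + infos.files(false));
            // files(true): additionally includes the segments_N file of this commit point.
            System.out.println("with segments_N:    " + infos.files(true));
        }
    }
}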
117 changes: 68 additions & 49 deletions server/src/main/java/org/opensearch/index/store/Store.java
@@ -50,6 +50,7 @@
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.BufferedChecksum;
import org.apache.lucene.store.BufferedChecksumIndexInput;
import org.apache.lucene.store.ByteArrayDataInput;
import org.apache.lucene.store.ChecksumIndexInput;
import org.apache.lucene.store.Directory;
@@ -64,7 +65,7 @@
import org.apache.lucene.util.BytesRefBuilder;
import org.apache.lucene.util.Version;
import org.opensearch.ExceptionsHelper;
import org.opensearch.common.Nullable;
import org.opensearch.common.CheckedConsumer;
import org.opensearch.common.UUIDs;
import org.opensearch.common.bytes.BytesReference;
import org.opensearch.common.io.stream.BytesStreamOutput;
@@ -789,69 +790,33 @@ public void cleanupAndVerify(String reason, MetadataSnapshot sourceMetadata) thr

/**
* Segment Replication method
* This method deletes every file in this store that is not referenced by the passed in SegmentInfos or
* part of the latest on-disk commit point.
* This method deletes files in the store that are not referenced by the latest on-disk commit point
*
* This method is used for segment replication when the in memory SegmentInfos can be ahead of the on disk segment file.
* In this case files from both snapshots must be preserved. Verification has been done that all files are present on disk.
* @param reason the reason for this cleanup operation logged for each deleted file
* @param infos {@link SegmentInfos} Files from this infos will be preserved on disk if present.
* @throws IllegalStateException if the latest snapshot in this store differs from the given one after the cleanup.
*/
public void cleanupAndPreserveLatestCommitPoint(String reason, SegmentInfos infos) throws IOException {
this.cleanupAndPreserveLatestCommitPoint(reason, infos, readLastCommittedSegmentsInfo(), true);
}

/**
* Segment Replication method
*
* Similar to {@link Store#cleanupAndPreserveLatestCommitPoint(String, SegmentInfos)} with extra parameters for cleanup
*
* This method deletes every file in this store, except:
* 1. Files referenced by the passed-in SegmentInfos, usually the in-memory segment infos copied from the primary
* 2. Files that are part of the passed-in segment infos, typically the last committed segment infos
* 3. Files ref-counted by an active reader for PIT/scroll queries
* 4. Temporary replication files, unless deleteTempFiles is true.
*
* @param reason the reason for this cleanup operation logged for each deleted file
* @param infos {@link SegmentInfos} Files from this infos will be preserved on disk if present.
* @param lastCommittedSegmentInfos {@link SegmentInfos} Last committed segment infos
* @param deleteTempFiles Whether this cleanup should delete temporary replication files
* @param fileToConsiderForCleanUp Files to consider for clean up.
*
* @throws IllegalStateException if the latest snapshot in this store differs from the given one after the cleanup.
* @throws IOException Exception on locking.
*/
public void cleanupAndPreserveLatestCommitPoint(
String reason,
SegmentInfos infos,
SegmentInfos lastCommittedSegmentInfos,
boolean deleteTempFiles
) throws IOException {
public void cleanupAndPreserveLatestCommitPoint(Collection<String> fileToConsiderForCleanUp, String reason) throws IOException {
assert indexSettings.isSegRepEnabled();
// fetch a snapshot from the latest on disk Segments_N file. This can be behind
// the passed in local in memory snapshot, so we want to ensure files it references are not removed.
metadataLock.writeLock().lock();
try (Lock writeLock = directory.obtainLock(IndexWriter.WRITE_LOCK_NAME)) {
cleanupFiles(reason, lastCommittedSegmentInfos.files(true), infos.files(true), deleteTempFiles);
cleanupFiles(fileToConsiderForCleanUp, reason, this.readLastCommittedSegmentsInfo().files(true));
} finally {
metadataLock.writeLock().unlock();
}
}

private void cleanupFiles(
String reason,
Collection<String> localSnapshot,
@Nullable Collection<String> additionalFiles,
boolean deleteTempFiles
) throws IOException {
private void cleanupFiles(Collection<String> filesToConsiderForCleanup, String reason, Collection<String> lastCommittedSegmentInfos) {
assert metadataLock.isWriteLockedByCurrentThread();
for (String existingFile : directory.listAll()) {
if (Store.isAutogenerated(existingFile)
|| localSnapshot != null && localSnapshot.contains(existingFile)
|| (additionalFiles != null && additionalFiles.contains(existingFile))
// also ensure we are not deleting a file referenced by an active reader.
for (String existingFile : filesToConsiderForCleanup) {
if (Store.isAutogenerated(existingFile) || lastCommittedSegmentInfos != null && lastCommittedSegmentInfos.contains(existingFile)
// also ensure we are not deleting a file referenced by an active reader.
|| replicaFileTracker != null && replicaFileTracker.canDelete(existingFile) == false
// prevent temporary file deletion during reader cleanup
|| deleteTempFiles == false && existingFile.startsWith(REPLICATION_PREFIX)) {
// Prevent deletion of temporary replication files, as they are cleaned up by MultiFileWriter
|| existingFile.startsWith(REPLICATION_PREFIX)) {
// don't delete snapshot file, or the checksums file (note, this is extra protection since the Store won't delete
// checksum)
continue;
@@ -871,6 +836,53 @@
}
}

/**
* Segment replication method
*
* This method builds a SegmentInfos from the given segment infos bytes. It increments the reference count of the files
* referenced by the passed-in SegmentInfos bytes to ensure they are not deleted.
*
* @param tmpToFileName Map of temporary replication file to actual file name
* @param infosBytes byte[] of the SegmentInfos sent over by the primary, excluding the segments_N file
* @param segmentsGen segment generation number
* @param consumer consumer for generated SegmentInfos
* @throws IOException Exception while reading store and building segment infos
*/
public void buildInfosFromBytes(
Map<String, String> tmpToFileName,
byte[] infosBytes,
long segmentsGen,
CheckedConsumer<SegmentInfos, IOException> consumer
) throws IOException {
metadataLock.writeLock().lock();
try {
final List<String> values = new ArrayList<>(tmpToFileName.values());
incRefFileDeleter(values);
try {
renameTempFilesSafe(tmpToFileName);
consumer.accept(buildSegmentInfos(infosBytes, segmentsGen));
} finally {
decRefFileDeleter(values);
}
} finally {
metadataLock.writeLock().unlock();
}
}

private SegmentInfos buildSegmentInfos(byte[] infosBytes, long segmentsGen) throws IOException {
try (final ChecksumIndexInput input = toIndexInput(infosBytes)) {
return SegmentInfos.readCommit(directory, input, segmentsGen);
}
}

/**
* This method wraps a byte[] containing the primary's SegmentInfos in Lucene's {@link ChecksumIndexInput} so that it can be
* passed to SegmentInfos.readCommit
*/
private ChecksumIndexInput toIndexInput(byte[] input) {
return new BufferedChecksumIndexInput(new ByteArrayIndexInput("Snapshot of SegmentInfos", input));
}

// pkg private for testing
final void verifyAfterCleanup(MetadataSnapshot sourceMetadata, MetadataSnapshot targetMetadata) {
final RecoveryDiff recoveryDiff = targetMetadata.recoveryDiff(sourceMetadata);
@@ -945,7 +957,7 @@ public void commitSegmentInfos(SegmentInfos latestSegmentInfos, long maxSeqNo, l
latestSegmentInfos.commit(directory());
directory.sync(latestSegmentInfos.files(true));
directory.syncMetaData();
cleanupAndPreserveLatestCommitPoint("After commit", latestSegmentInfos);
cleanupAndPreserveLatestCommitPoint(List.of(this.directory.listAll()), "After commit");
} finally {
metadataLock.writeLock().unlock();
}
@@ -1961,6 +1973,13 @@ public void incRefFileDeleter(Collection<String> files) {
public void decRefFileDeleter(Collection<String> files) {
if (this.indexSettings.isSegRepEnabled()) {
this.replicaFileTracker.decRef(files);
try {
this.cleanupAndPreserveLatestCommitPoint(files, "On reader close");
} catch (IOException e) {
// Log but do not rethrow - we can try cleaning up again after the next replication cycle.
// If that were to fail, the shard will fail as well.
logger.error("Unable to clean store after reader closed", e);
}
}
}
}
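For context, a hedged sketch of how a caller is expected to use the new buildInfosFromBytes helper. This is not repository code: the wrapper class and method name are hypothetical, and the parameter sources named in the comments are assumptions based on the SegmentReplicationTarget hunk further down, which is the real caller.

import java.io.IOException;
import java.util.Map;

import org.apache.lucene.index.SegmentInfos;
import org.opensearch.index.store.Store;

public final class BuildInfosUsageSketch {

    // Illustrative only: tempFileNames would come from MultiFileWriter#getTempFileNames(),
    // infosBytes and segmentsGen from the primary's CheckpointInfoResponse.
    static void finalizeFromPrimary(Store store, Map<String, String> tempFileNames, byte[] infosBytes, long segmentsGen)
        throws IOException {
        store.buildInfosFromBytes(tempFileNames, infosBytes, segmentsGen, (SegmentInfos infos) -> {
            // The consumer runs after the temp files have been renamed and while the referenced
            // files are inc-ref'd, so the shard can finalize replication against `infos` without
            // the cleanup path deleting files underneath it, e.g. indexShard.finalizeReplication(infos);
        });
    }
}

Passing a CheckedConsumer instead of returning the SegmentInfos keeps the inc-ref/dec-ref window around the caller's finalize step, which is the point of the refactor.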
@@ -97,6 +97,10 @@ String getTempNameForFile(String origFile) {
return tempFilePrefix + origFile;
}

public Map<String, String> getTempFileNames() {
return tempFileNames;
}

public IndexOutput getOpenIndexOutput(String key) {
ensureOpen.run();
return openIndexOutputs.get(key);
@@ -21,6 +21,7 @@
import org.opensearch.OpenSearchException;
import org.opensearch.action.ActionListener;
import org.opensearch.action.StepListener;
import org.opensearch.common.CheckedConsumer;
import org.opensearch.common.UUIDs;
import org.opensearch.common.bytes.BytesReference;
import org.opensearch.common.lucene.Lucene;
@@ -221,18 +222,15 @@ private void finalizeReplication(CheckpointInfoResponse checkpointInfoResponse,
state.setStage(SegmentReplicationState.Stage.FINALIZE_REPLICATION);
Store store = null;
try {
multiFileWriter.renameAllTempFiles();
store = store();
store.incRef();
// Deserialize the new SegmentInfos object sent from the primary.
final ReplicationCheckpoint responseCheckpoint = checkpointInfoResponse.getCheckpoint();
SegmentInfos infos = SegmentInfos.readCommit(
store.directory(),
toIndexInput(checkpointInfoResponse.getInfosBytes()),
responseCheckpoint.getSegmentsGen()
CheckedConsumer<SegmentInfos, IOException> finalizeReplication = indexShard::finalizeReplication;
store.buildInfosFromBytes(
multiFileWriter.getTempFileNames(),
checkpointInfoResponse.getInfosBytes(),
checkpointInfoResponse.getCheckpoint().getSegmentsGen(),
finalizeReplication
);
cancellableThreads.checkForCancel();
indexShard.finalizeReplication(infos);
} catch (CorruptIndexException | IndexFormatTooNewException | IndexFormatTooOldException ex) {
// this is a fatal exception at this stage.
// this means we transferred files from the remote that have not be checksummed and they are
@@ -31,6 +31,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;

@@ -160,6 +161,38 @@ public void testUpdateSegments_replicaReceivesSISWithLowerGen() throws IOExcepti
}
}

public void testSimultaneousEngineCloseAndCommit() throws IOException, InterruptedException {
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
try (
final Store nrtEngineStore = createStore(INDEX_SETTINGS, newDirectory());
final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore)
) {
CountDownLatch latch = new CountDownLatch(1);
Thread commitThread = new Thread(() -> {
try {
nrtEngine.updateSegments(store.readLastCommittedSegmentsInfo());
latch.countDown();
} catch (IOException e) {
throw new RuntimeException(e);
}
});
Thread closeThread = new Thread(() -> {
try {
latch.await();
nrtEngine.close();
} catch (IOException e) {
throw new RuntimeException(e);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
commitThread.start();
closeThread.start();
commitThread.join();
closeThread.join();
}
}

public void testUpdateSegments_replicaCommitsFirstReceivedInfos() throws IOException {
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
