Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Segment Replication] Refactor file cleanup logic and fix PIT/Scroll with remote store. #9111

Merged
merged 8 commits into from
Aug 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,6 @@ public void testIndexReopenClose() throws Exception {
}

public void testScrollWithConcurrentIndexAndSearch() throws Exception {
assumeFalse("Skipping the test with Remote store as its flaky.", segmentReplicationWithRemoteEnabled());
dreamer-89 marked this conversation as resolved.
Show resolved Hide resolved
final String primary = internalCluster().startDataOnlyNode();
final String replica = internalCluster().startDataOnlyNode();
createIndex(INDEX_NAME);
Expand Down Expand Up @@ -657,7 +656,6 @@ public void testDeleteOperations() throws Exception {
* from xlog.
*/
public void testReplicationPostDeleteAndForceMerge() throws Exception {
assumeFalse("Skipping the test with Remote store as its flaky.", segmentReplicationWithRemoteEnabled());
final String primary = internalCluster().startDataOnlyNode();
createIndex(INDEX_NAME);
final String replica = internalCluster().startDataOnlyNode();
Expand Down Expand Up @@ -966,7 +964,6 @@ private void assertAllocationIdsInReplicaShardStats(Set<String> expected, Set<Se
* @throws Exception when issue is encountered
*/
public void testScrollCreatedOnReplica() throws Exception {
assumeFalse("Skipping the test with Remote store as its flaky.", segmentReplicationWithRemoteEnabled());
// create the cluster with one primary node containing primary shard and replica node containing replica shard
final String primary = internalCluster().startDataOnlyNode();
createIndex(INDEX_NAME);
Expand Down Expand Up @@ -1059,8 +1056,9 @@ public void testScrollCreatedOnReplica() throws Exception {
* @throws Exception when issue is encountered
*/
public void testScrollWithOngoingSegmentReplication() throws Exception {
// this test stubs transport calls specific to node-node replication.
assumeFalse(
"Skipping the test as its not compatible with segment replication with remote store yet.",
"Skipping the test as its not compatible with segment replication with remote store.",
segmentReplicationWithRemoteEnabled()
);

Expand Down Expand Up @@ -1188,10 +1186,6 @@ public void testScrollWithOngoingSegmentReplication() throws Exception {
}

public void testPitCreatedOnReplica() throws Exception {
assumeFalse(
"Skipping the test as it is flaky with remote store. Tracking issue https://github.com/opensearch-project/OpenSearch/issues/8850",
segmentReplicationWithRemoteEnabled()
);
final String primary = internalCluster().startDataOnlyNode();
createIndex(INDEX_NAME);
ensureYellowAndNoInitializingShards(INDEX_NAME);
Expand Down Expand Up @@ -1425,4 +1419,23 @@ public void testIndexWhileRecoveringReplica() throws Exception {
.get();
assertNoFailures(response);
}

/**
 * Verifies that a primary shard with no replicas still serves its indexed document after the
 * node hosting it is restarted.
 *
 * The test indexes a single document, randomly either flushes or refreshes the index, restarts
 * the primary node and then asserts that the document count on that node is still one.
 *
 * @throws Exception when issue is encountered
 */
public void testRestartPrimary_NoReplicas() throws Exception {
    final String primary = internalCluster().startDataOnlyNode();
    createIndex(INDEX_NAME);
    ensureYellow(INDEX_NAME);

    // expected value first, actual second, so a failure message reports the values the right way round.
    assertEquals(primary, getNodeContainingPrimaryShard().getName());

    client().prepareIndex(INDEX_NAME).setId("1").setSource("foo", "bar").setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get();
    // randomly exercise both the flush path and the refresh-only path before the restart.
    if (randomBoolean()) {
        flush(INDEX_NAME);
    } else {
        refresh(INDEX_NAME);
    }

    internalCluster().restartNode(primary);
    ensureYellow(INDEX_NAME);
    assertDocCounts(1, primary);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -87,12 +87,8 @@ private void testPeerRecovery(int numberOfIterations, boolean invokeFlush) throw
.filter(rs -> rs.getRecoverySource().getType() == RecoverySource.Type.PEER)
.findFirst();
assertFalse(recoverySource.isEmpty());
if (numberOfIterations == 1 && invokeFlush) {
// segments_N file is copied to new replica
assertEquals(1, recoverySource.get().getIndex().recoveredFileCount());
} else {
assertEquals(0, recoverySource.get().getIndex().recoveredFileCount());
}
// segments_N file is copied to new replica
assertEquals(1, recoverySource.get().getIndex().recoveredFileCount());

IndexResponse response = indexSingleDoc(INDEX_NAME);
assertEquals(indexStats.get(MAX_SEQ_NO_TOTAL) + 1, response.getSeqNo());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,9 @@ public class NRTReplicationEngine extends Engine {
private final CompletionStatsCache completionStatsCache;
private final LocalCheckpointTracker localCheckpointTracker;
private final WriteOnlyTranslogManager translogManager;
private final boolean shouldCommit;
protected final ReplicaFileTracker replicaFileTracker;

private volatile long lastReceivedGen = SequenceNumbers.NO_OPS_PERFORMED;
private volatile long lastReceivedPrimaryGen = SequenceNumbers.NO_OPS_PERFORMED;

private static final int SI_COUNTER_INCREMENT = 10;

Expand All @@ -69,7 +69,12 @@ public NRTReplicationEngine(EngineConfig engineConfig) {
NRTReplicationReaderManager readerManager = null;
WriteOnlyTranslogManager translogManagerRef = null;
try {
lastCommittedSegmentInfos = store.readLastCommittedSegmentsInfo();
this.replicaFileTracker = new ReplicaFileTracker(store::deleteQuiet);
this.lastCommittedSegmentInfos = store.readLastCommittedSegmentsInfo();
// always protect latest commit on disk.
replicaFileTracker.incRef(this.lastCommittedSegmentInfos.files(true));
// cleanup anything not referenced by the latest infos.
cleanUnreferencedFiles();
readerManager = buildReaderManager();
final SequenceNumbers.CommitInfo commitInfo = SequenceNumbers.loadSeqNoInfoFromLuceneCommit(
this.lastCommittedSegmentInfos.getUserData().entrySet()
Expand All @@ -85,7 +90,7 @@ public NRTReplicationEngine(EngineConfig engineConfig) {
for (ReferenceManager.RefreshListener listener : engineConfig.getInternalRefreshListener()) {
this.readerManager.addListener(listener);
}
final Map<String, String> userData = store.readLastCommittedSegmentsInfo().getUserData();
final Map<String, String> userData = this.lastCommittedSegmentInfos.getUserData();
final String translogUUID = Objects.requireNonNull(userData.get(Translog.TRANSLOG_UUID_KEY));
translogManagerRef = new WriteOnlyTranslogManager(
engineConfig.getTranslogConfig(),
Expand Down Expand Up @@ -116,18 +121,21 @@ public void onAfterTranslogSync() {
engineConfig.getPrimaryModeSupplier()
);
this.translogManager = translogManagerRef;
this.shouldCommit = engineConfig.getIndexSettings().isRemoteStoreEnabled() == false;
} catch (IOException e) {
IOUtils.closeWhileHandlingException(store::decRef, readerManager, translogManagerRef);
throw new EngineCreationFailureException(shardId, "failed to create engine", e);
}
}

/**
 * Lists every file currently in the store's directory and hands the listing to the
 * {@code replicaFileTracker}, which deletes the ones it considers unreferenced.
 *
 * @throws IOException if the directory listing fails
 */
public void cleanUnreferencedFiles() throws IOException {
    final String[] filesOnDisk = store.directory().listAll();
    replicaFileTracker.deleteUnreferencedFiles(filesOnDisk);
}

private NRTReplicationReaderManager buildReaderManager() throws IOException {
return new NRTReplicationReaderManager(
OpenSearchDirectoryReader.wrap(getDirectoryReader(), shardId),
store::incRefFileDeleter,
store::decRefFileDeleter
replicaFileTracker::incRef,
replicaFileTracker::decRef
);
}

Expand All @@ -143,34 +151,36 @@ public synchronized void updateSegments(final SegmentInfos infos) throws IOExcep
final long maxSeqNo = Long.parseLong(infos.userData.get(MAX_SEQ_NO));
final long incomingGeneration = infos.getGeneration();
readerManager.updateSegments(infos);

// Commit and roll the translog when we receive a different generation than what was last received.
// lower/higher gens are possible from a new primary that was just elected.
if (incomingGeneration != lastReceivedGen) {
// Ensure that we commit and clear the local translog if a new commit has been made on the primary.
// We do not compare against the last local commit gen here because it is possible to receive
// a lower gen from a newly elected primary shard that is behind this shard's last commit gen.
// In that case we still commit into the next local generation.
if (incomingGeneration != this.lastReceivedPrimaryGen) {
commitSegmentInfos();
translogManager.getDeletionPolicy().setLocalCheckpointOfSafeCommit(maxSeqNo);
translogManager.rollTranslogGeneration();
}
lastReceivedGen = incomingGeneration;
this.lastReceivedPrimaryGen = incomingGeneration;
localCheckpointTracker.fastForwardProcessedSeqNo(maxSeqNo);
}
}

/**
* Persist the latest live SegmentInfos.
*
* This method creates a commit point from the latest SegmentInfos. It is intended to be used when this shard is about to be promoted as the new primary.
*
* TODO: If this method is invoked while the engine is currently updating segments on its reader, wait for that update to complete so the updated segments are used.
*
* This method creates a commit point from the latest SegmentInfos.
*
* @throws IOException - When there is an IO error committing the SegmentInfos.
*/
private void commitSegmentInfos(SegmentInfos infos) throws IOException {
if (shouldCommit) {
store.commitSegmentInfos(infos, localCheckpointTracker.getMaxSeqNo(), localCheckpointTracker.getProcessedCheckpoint());
}
// get a reference to the previous commit files so they can be decref'd once a new commit is made.
final Collection<String> previousCommitFiles = getLastCommittedSegmentInfos().files(true);
store.commitSegmentInfos(infos, localCheckpointTracker.getMaxSeqNo(), localCheckpointTracker.getProcessedCheckpoint());
this.lastCommittedSegmentInfos = store.readLastCommittedSegmentsInfo();
// incref the latest on-disk commit.
replicaFileTracker.incRef(this.lastCommittedSegmentInfos.files(true));
// decref the prev commit.
replicaFileTracker.decRef(previousCommitFiles);
translogManager.syncTranslog();
}

Expand Down Expand Up @@ -379,21 +389,19 @@ protected final void closeNoLock(String reason, CountDownLatch closedLatch) {
assert rwl.isWriteLockedByCurrentThread() || failEngineLock.isHeldByCurrentThread()
: "Either the write lock must be held or the engine must be currently be failing itself";
try {
// if remote store is enabled, all segments durably persisted
if (shouldCommit) {
final SegmentInfos latestSegmentInfos = getLatestSegmentInfos();
/*
This is a workaround solution which decreases the chances of conflict on replica nodes when same file is copied
from two different primaries during failover. Increasing counter helps in avoiding this conflict as counter is
used to generate new segment file names. The ideal solution is to identify the counter from previous primary.
*/
final SegmentInfos latestSegmentInfos = getLatestSegmentInfos();
/*
This is a workaround solution which decreases the chances of conflict on replica nodes when same file is copied
from two different primaries during failover. Increasing counter helps in avoiding this conflict as counter is
used to generate new segment file names. The ideal solution is to identify the counter from previous primary.
mch2 marked this conversation as resolved.
Show resolved Hide resolved
This is not required for remote store implementations given on failover the replica re-syncs with the store
during promotion.
*/
if (engineConfig.getIndexSettings().isRemoteStoreEnabled() == false) {
latestSegmentInfos.counter = latestSegmentInfos.counter + SI_COUNTER_INCREMENT;
latestSegmentInfos.changed();
commitSegmentInfos(latestSegmentInfos);
} else {
store.directory().sync(List.of(store.directory().listAll()));
store.directory().syncMetaData();
}
commitSegmentInfos(latestSegmentInfos);
IOUtils.close(readerManager, translogManager, store::decRef);
} catch (Exception e) {
logger.warn("failed to close engine", e);
Expand Down Expand Up @@ -453,8 +461,8 @@ public synchronized GatedCloseable<SegmentInfos> getSegmentInfosSnapshot() {
// incref all files
try {
final Collection<String> files = latestSegmentInfos.files(false);
store.incRefFileDeleter(files);
return new GatedCloseable<>(latestSegmentInfos, () -> store.decRefFileDeleter(files));
replicaFileTracker.incRef(files);
return new GatedCloseable<>(latestSegmentInfos, () -> { replicaFileTracker.decRef(files); });
} catch (IOException e) {
throw new EngineException(shardId, e.getMessage(), e);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.index.engine;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.BiConsumer;

/**
 * Keeps a reference count per segment file name and passes files to the supplied deleter once
 * their count drops to zero, or when they are offered to {@link #deleteUnreferencedFiles(String...)}
 * without holding any reference.
 *
 * This class is heavily influenced by Lucene's ReplicaFileDeleter class used to keep track of
 * segment files that should be preserved on replicas between replication events.
 *
 * https://github.com/apache/lucene/blob/main/lucene/replicator/src/java/org/apache/lucene/replicator/nrt/ReplicaFileDeleter.java
 *
 * Every method that reads or mutates the reference counts synchronizes on this instance, so a
 * "is it referenced?" check and the deletion that follows it are performed atomically with
 * respect to concurrent {@link #incRef(Collection)} / {@link #decRef(Collection)} calls.
 *
 * @opensearch.internal
 */
final class ReplicaFileTracker {

    public static final Logger logger = LogManager.getLogger(ReplicaFileTracker.class);

    /** File names that must never be handed to the deleter, regardless of reference count. */
    private static final Set<String> EXCLUDE_FILES = Set.of("write.lock");

    /** Current reference count per file name; a file with no entry is unreferenced. */
    private final Map<String, Integer> refCounts = new HashMap<>();

    /** Callback invoked as {@code (reason, fileName)} for each file that should be removed. */
    private final BiConsumer<String, String> fileDeleter;

    /**
     * @param fileDeleter callback that receives a reason string and the name of the file to delete
     */
    public ReplicaFileTracker(BiConsumer<String, String> fileDeleter) {
        this.fileDeleter = fileDeleter;
    }

    /**
     * Increments the reference count of every given file, starting at one for unknown files.
     *
     * @param fileNames names of the files to protect from deletion
     */
    public synchronized void incRef(Collection<String> fileNames) {
        for (String fileName : fileNames) {
            refCounts.merge(fileName, 1, Integer::sum);
        }
    }

    /**
     * @param file file name to look up
     * @return the current reference count of the file, or zero when it is not tracked
     */
    public synchronized int refCount(String file) {
        return Optional.ofNullable(refCounts.get(file)).orElse(0);
    }

    /**
     * Decrements the reference count of every given file and deletes those whose count reaches zero.
     * Each file is expected to have been previously passed to {@link #incRef(Collection)}.
     *
     * @param fileNames names of the files to release
     */
    public synchronized void decRef(Collection<String> fileNames) {
        Set<String> toDelete = new HashSet<>();
        for (String fileName : fileNames) {
            Integer curCount = refCounts.get(fileName);
            assert curCount != null : "fileName=" + fileName;
            assert curCount > 0 : "fileName=" + fileName + " curCount=" + curCount;
            if (curCount == 1) {
                // last reference released: stop tracking the file and schedule it for deletion.
                refCounts.remove(fileName);
                toDelete.add(fileName);
            } else {
                refCounts.put(fileName, curCount - 1);
            }
        }
        if (toDelete.isEmpty() == false) {
            delete(toDelete);
        }
    }

    /**
     * Deletes each of the given files that is neither excluded nor currently referenced.
     *
     * The whole pass runs under this instance's lock so that a file cannot gain a reference
     * between the {@code canDelete} check and the deletion itself.
     *
     * @param toDelete candidate file names, typically a full directory listing
     */
    public synchronized void deleteUnreferencedFiles(String... toDelete) {
        for (String file : toDelete) {
            if (canDelete(file)) {
                delete(file);
            }
        }
    }

    /** Deletes every file in the collection; each must satisfy {@link #canDelete(String)}. */
    private synchronized void delete(Collection<String> toDelete) {
        for (String fileName : toDelete) {
            delete(fileName);
        }
    }

    /** Hands a single unreferenced, non-excluded file to the deleter callback. */
    private synchronized void delete(String fileName) {
        assert canDelete(fileName);
        fileDeleter.accept("delete unreferenced", fileName);
    }

    /** @return true when the file is not in the exclusion set and holds no reference */
    private synchronized boolean canDelete(String fileName) {
        return EXCLUDE_FILES.contains(fileName) == false && refCounts.containsKey(fileName) == false;
    }

}
35 changes: 21 additions & 14 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.util.ThreadInterruptedException;
import org.opensearch.common.lucene.store.ByteArrayIndexInput;
import org.opensearch.cluster.metadata.DataStream;
import org.opensearch.core.Assertions;
import org.opensearch.ExceptionsHelper;
Expand Down Expand Up @@ -4657,7 +4656,13 @@ public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, boolean re
RemoteSegmentMetadata remoteSegmentMetadata = remoteDirectory.init();

Map<String, RemoteSegmentStoreDirectory.UploadedSegmentMetadata> uploadedSegments = remoteDirectory
.getSegmentsUploadedToRemoteStore();
.getSegmentsUploadedToRemoteStore()
.entrySet()
.stream()
// if this is a refresh level sync, ignore any segments_n uploaded to the store, we will commit the received infos bytes
// locally.
.filter(entry -> refreshLevelSegmentSync && entry.getKey().startsWith(IndexFileNames.SEGMENTS) == false)
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
store.incRef();
remoteStore.incRef();
try {
Expand All @@ -4678,19 +4683,21 @@ public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, boolean re
copySegmentFiles(storeDirectory, remoteDirectory, null, uploadedSegments, overrideLocal);

if (refreshLevelSegmentSync && remoteSegmentMetadata != null) {
mch2 marked this conversation as resolved.
Show resolved Hide resolved
try (
ChecksumIndexInput indexInput = new BufferedChecksumIndexInput(
new ByteArrayIndexInput("Snapshot of SegmentInfos", remoteSegmentMetadata.getSegmentInfosBytes())
);
) {
SegmentInfos infosSnapshot = SegmentInfos.readCommit(
store.directory(),
indexInput,
remoteSegmentMetadata.getGeneration()
);
long processedLocalCheckpoint = Long.parseLong(infosSnapshot.getUserData().get(LOCAL_CHECKPOINT_KEY));
store.commitSegmentInfos(infosSnapshot, processedLocalCheckpoint, processedLocalCheckpoint);
final SegmentInfos infosSnapshot = store.buildSegmentInfos(
remoteSegmentMetadata.getSegmentInfosBytes(),
remoteSegmentMetadata.getGeneration()
);
long processedLocalCheckpoint = Long.parseLong(infosSnapshot.getUserData().get(LOCAL_CHECKPOINT_KEY));
// delete any other commits, we want to start the engine only from a new commit made with the downloaded infos bytes.
// Extra segments will be wiped on engine open.
for (String file : List.of(store.directory().listAll())) {
mch2 marked this conversation as resolved.
Show resolved Hide resolved
if (file.startsWith(IndexFileNames.SEGMENTS)) {
store.deleteQuiet(file);
}
}
assert Arrays.stream(store.directory().listAll()).filter(f -> f.startsWith(IndexFileNames.SEGMENTS)).findAny().isEmpty()
mch2 marked this conversation as resolved.
Show resolved Hide resolved
: "There should not be any segments file in the dir";
store.commitSegmentInfos(infosSnapshot, processedLocalCheckpoint, processedLocalCheckpoint);
}
} catch (IOException e) {
throw new IndexShardRecoveryException(shardId, "Exception while copying segment files from remote segment store", e);
Expand Down
Loading