Skip to content

Commit

Permalink
Maintaing separate combined deletion policy for child level index writer
Browse files Browse the repository at this point in the history
  • Loading branch information
RS146BIJAY committed Oct 25, 2024
1 parent 908b5ca commit 0d2dde5
Show file tree
Hide file tree
Showing 6 changed files with 109 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ public void setupSuiteScopeCluster() throws Exception {
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.put(INDEX_CACHE_REQUEST_ENABLED_SETTING.getKey(), false)
.put(IndexMetadata.SETTING_CONTEXT_AWARE_ENABLED, true)
).setMapping("type", "type=keyword", "num", "type=integer", "score", "type=integer")
);
waitForRelocation(ClusterHealthStatus.GREEN);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ public void onCommit(List<? extends IndexCommit> commits) throws IOException {
safeCommit = this.safeCommit;
}

System.out.println("Local checkpoint set for safe commit " + safeCommit.getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY));
assert Thread.holdsLock(this) == false : "should not block concurrent acquire or relesase";
safeCommitInfo = new SafeCommitInfo(
Long.parseLong(safeCommit.getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)),
Expand Down Expand Up @@ -152,6 +153,7 @@ private void updateRetentionPolicy() throws IOException {
assert safeCommit.isDeleted() == false : "The safe commit must not be deleted";
assert lastCommit.isDeleted() == false : "The last commit must not be deleted";
final long localCheckpointOfSafeCommit = Long.parseLong(safeCommit.getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY));
System.out.println("Local checkpoint set for safe commit inside updateRetentionPolicy " + localCheckpointOfSafeCommit);
softDeletesPolicy.setLocalCheckpointOfSafeCommit(localCheckpointOfSafeCommit);
translogDeletionPolicy.setLocalCheckpointOfSafeCommit(localCheckpointOfSafeCommit);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Supplier;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;
import java.util.stream.Stream;
Expand Down Expand Up @@ -221,6 +222,7 @@ public class InternalEngine extends Engine {

@Nullable
private final String historyUUID;
private final TranslogDeletionPolicy translogDeletionPolicy;

/**
* UUID value that is updated every time the engine is force merged.
Expand Down Expand Up @@ -252,7 +254,7 @@ public TranslogManager translogManager() {
if (engineConfig.isAutoGeneratedIDsOptimizationEnabled() == false) {
updateAutoIdTimestamp(Long.MAX_VALUE, true);
}
final TranslogDeletionPolicy translogDeletionPolicy = getTranslogDeletionPolicy(engineConfig);
this.translogDeletionPolicy = getTranslogDeletionPolicy(engineConfig);
store.incRef();
IndexWriter writer = null;
ExternalReaderManager externalReaderManager = null;
Expand Down Expand Up @@ -341,7 +343,7 @@ public void onFailure(String reason, Exception ex) {
externalReaderManager = createReaderManager(new RefreshWarmerListener(logger, isClosed, engineConfig), parentIndexWriter);
if (engineConfig.isContextAwareEnabled()) {
for (IndexWriter groupLevelIndexWriter : criteriaBasedIndexWriters.values()) {
groupLevelExternalReaderManagers.add(createReaderManager(new RefreshWarmerListener(logger, isClosed, engineConfig), groupLevelIndexWriter));
groupLevelExternalReaderManagers.add(createReaderManager(new RefreshWarmerListener(logger, new AtomicBoolean(isClosed.get()), engineConfig), groupLevelIndexWriter));
}
}

Expand Down Expand Up @@ -401,6 +403,7 @@ private LocalCheckpointTracker createLocalCheckpointTracker(
return localCheckpointTrackerSupplier.apply(maxSeqNo, localCheckpoint);
}

// TODO: Fix this for multiple IndexWriter??
private SoftDeletesPolicy newSoftDeletesPolicy() throws IOException {
final Map<String, String> commitUserData = store.readLastCommittedSegmentsInfo().userData;
final long lastMinRetainedSeqNo;
Expand Down Expand Up @@ -1166,7 +1169,7 @@ private IndexResult indexIntoLucene(Index index, IndexingStrategy plan) throws I
try {
IndexWriter currentIndexWriter;
String criteria = getGroupingCriteriaForDoc(index.docs());
if (criteria.equals("0")) {
if (!config().isContextAwareEnabled() || criteria.equals("0")) {
currentIndexWriter = parentIndexWriter;
} else {
currentIndexWriter = criteriaBasedIndexWriters.get(criteria);
Expand Down Expand Up @@ -1898,17 +1901,18 @@ final boolean refresh(String source, SearcherScope scope, boolean block) throws
// it is intentional that we never refresh both internal / external together
// Refresh both child and Parent ReaderManager together.
if (block) {
referenceManager.maybeRefreshBlocking();
for (ReferenceManager<OpenSearchDirectoryReader> groupLevelExternalReaderManager: groupLevelExternalReaderManagers) {
groupLevelExternalReaderManager.maybeRefreshBlocking();
}

referenceManager.maybeRefreshBlocking();
refreshed = true;
} else {
refreshed = referenceManager.maybeRefresh();
for (ReferenceManager<OpenSearchDirectoryReader> groupLevelExternalReaderManager: groupLevelExternalReaderManagers) {
groupLevelExternalReaderManager.maybeRefresh();
}

refreshed = referenceManager.maybeRefresh();
}
} catch (Exception ex) {
throw new IOException("Failed refresh", ex);
Expand Down Expand Up @@ -2037,6 +2041,15 @@ public void flush(boolean force, boolean waitIfOngoing) throws EngineException {
commitIndexWriter(indexWriter, translogManager.getTranslogUUID());
}
}

try (StandardDirectoryReader r1 = (StandardDirectoryReader) StandardDirectoryReader.open(criteriaBasedIndexWriters.get("400"));
StandardDirectoryReader r2 = (StandardDirectoryReader) StandardDirectoryReader.open(criteriaBasedIndexWriters.get("200"))) {
parentIndexWriter.addIndexes(r1.getSegmentInfos(), r2.getSegmentInfos());
}
final Map<String, String> userData = getParentCommitData(translogManager.getTranslogUUID());
SegmentInfos latestSegmentInfos = parentIndexWriter.getSegmentInfos();
latestSegmentInfos.setUserData(userData, true);
latestSegmentInfos.commit(store.directory());
} else {
commitIndexWriter(parentIndexWriter, translogManager.getTranslogUUID());
}
Expand Down Expand Up @@ -2502,8 +2515,12 @@ protected final void closeNoLock(String reason, CountDownLatch closedLatch) {
internalReaderManager.removeListener(versionMap);
}
try {
for (ReferenceManager<OpenSearchDirectoryReader> groupLevelReaderManager: groupLevelExternalReaderManagers) {
ExternalReaderManager groupLevelExternalReaderManager = (ExternalReaderManager) groupLevelReaderManager;
IOUtils.close(groupLevelExternalReaderManager, groupLevelExternalReaderManager.internalReaderManager);
}

IOUtils.close(externalReaderManager, internalReaderManager);
IOUtils.close(groupLevelExternalReaderManagers);
} catch (Exception e) {
logger.warn("Failed to close ReaderManager", e);
}
Expand All @@ -2523,15 +2540,20 @@ protected final void closeNoLock(String reason, CountDownLatch closedLatch) {
}
}

if (config().isContextAwareEnabled()) {
// TODO: This is needed to keep pending num docs in sync.
try (StandardDirectoryReader r1 = (StandardDirectoryReader) StandardDirectoryReader.open(criteriaBasedIndexWriters.get("400"));
StandardDirectoryReader r2 = (StandardDirectoryReader) StandardDirectoryReader.open(criteriaBasedIndexWriters.get("200"))) {
parentIndexWriter.addIndexes(r1.getSegmentInfos(), r2.getSegmentInfos());
}
// TODO: Rollback SegmentInfos of parent IndexWriter.
// if (config().isContextAwareEnabled()) {
// // TODO: This is needed to keep pending num docs in sync.
// parentIndexWriter.addIndexes(criteriaBasedIndexWriters.get("400").getSegmentInfos(),
// criteriaBasedIndexWriters.get("200").getSegmentInfos());
// parentIndexWriter.close();
// } else {
// parentIndexWriter.rollback();
// }

if (!config().isContextAwareEnabled()) {
parentIndexWriter.rollback();
}

parentIndexWriter.rollback();
logger.trace("rollback indexWriter done");
} catch (Exception e) {
logger.warn("failed to rollback writer on close", e);
Expand Down Expand Up @@ -2568,7 +2590,7 @@ private void populateCriteriaIndexWriters() throws IOException {

private IndexWriter createWriter() throws IOException {
try {
final IndexWriterConfig iwc = getIndexWriterConfig(mergeSchedulerCriteriaMap.get("-1"), null);
final IndexWriterConfig iwc = getIndexWriterConfig(mergeSchedulerCriteriaMap.get("-1"), null, combinedDeletionPolicy);
iwc.setMergePolicy(NoMergePolicy.INSTANCE);
return createWriter(store.directory(), iwc);
} catch (LockObtainFailedException ex) {
Expand All @@ -2586,7 +2608,18 @@ IndexWriter createWriter(Directory directory, IndexWriterConfig iwc) throws IOEx
}
}

private IndexWriterConfig getIndexWriterConfig(OpenSearchConcurrentMergeScheduler mergeScheduler, AtomicLong seqNo) {
private IndexWriterConfig getIndexWriterConfig(OpenSearchConcurrentMergeScheduler mergeScheduler, AtomicLong seqNo) throws IOException {
CombinedDeletionPolicy combinedDeletionPolicy = new CombinedDeletionPolicy(
logger,
translogDeletionPolicy,
softDeletesPolicy,
translogManager::getLastSyncedGlobalCheckpoint
);

return getIndexWriterConfig(mergeScheduler, seqNo, combinedDeletionPolicy);
}

private IndexWriterConfig getIndexWriterConfig(OpenSearchConcurrentMergeScheduler mergeScheduler, AtomicLong seqNo, CombinedDeletionPolicy combinedDeletionPolicy) {
final IndexWriterConfig iwc = new IndexWriterConfig(engineConfig.getAnalyzer());
iwc.setCommitOnClose(false); // we by default don't commit on close
iwc.setOpenMode(IndexWriterConfig.OpenMode.APPEND);
Expand Down Expand Up @@ -2865,6 +2898,22 @@ protected void commitIndexWriter(final IndexWriter writer, final String translog
}
}

private Map<String, String> getParentCommitData(final String translogUUID) {
final Map<String, String> commitData = new HashMap<>(7);
commitData.put(Translog.TRANSLOG_UUID_KEY, translogUUID);
commitData.put(SequenceNumbers.LOCAL_CHECKPOINT_KEY, Long.toString(localCheckpointTracker.getProcessedCheckpoint()));
commitData.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(localCheckpointTracker.getMaxSeqNo()));
commitData.put(MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID, Long.toString(maxUnsafeAutoIdTimestamp.get()));
commitData.put(HISTORY_UUID_KEY, historyUUID);
commitData.put(Engine.MIN_RETAINED_SEQNO, Long.toString(softDeletesPolicy.getMinRetainedSeqNo()));
final String currentForceMergeUUID = forceMergeUUID;
if (currentForceMergeUUID != null) {
commitData.put(FORCE_MERGE_UUID_KEY, currentForceMergeUUID);
}

return commitData;
}

@Override
public void onSettingsChanged(TimeValue translogRetentionAge, ByteSizeValue translogRetentionSize, long softDeletesRetentionOps) {
for (OpenSearchConcurrentMergeScheduler mergeScheduler: mergeSchedulerCriteriaMap.values()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@

import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

Expand All @@ -53,27 +54,27 @@
*/
final class SoftDeletesPolicy {
private final LongSupplier globalCheckpointSupplier;
private long localCheckpointOfSafeCommit;
private AtomicLong localCheckpointOfSafeCommit;
// This lock count is used to prevent `minRetainedSeqNo` from advancing.
private int retentionLockCount;
// The extra number of operations before the global checkpoint are retained
private long retentionOperations;
// The min seq_no value that is retained - ops after this seq# should exist in the Lucene index.
private long minRetainedSeqNo;
private AtomicLong minRetainedSeqNo;
// provides the retention leases used to calculate the minimum sequence number to retain
private final Supplier<RetentionLeases> retentionLeasesSupplier;

SoftDeletesPolicy(
final LongSupplier globalCheckpointSupplier,
final long minRetainedSeqNo,
final long lastMinRetainedSeqNo,
final long retentionOperations,
final Supplier<RetentionLeases> retentionLeasesSupplier
) {
this.globalCheckpointSupplier = globalCheckpointSupplier;
this.retentionOperations = retentionOperations;
this.minRetainedSeqNo = minRetainedSeqNo;
this.minRetainedSeqNo = new AtomicLong(lastMinRetainedSeqNo);
this.retentionLeasesSupplier = Objects.requireNonNull(retentionLeasesSupplier);
this.localCheckpointOfSafeCommit = SequenceNumbers.NO_OPS_PERFORMED;
this.localCheckpointOfSafeCommit = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
this.retentionLockCount = 0;
}

Expand All @@ -86,21 +87,25 @@ synchronized void setRetentionOperations(long retentionOperations) {
}

/**
* Sets the local checkpoint of the current safe commit
* Sets the local checkpoint of the current safe commit. This can be different for different IndexWriter.
* TODO: With multiple IndexWriter LocalCheckPointOfSafeCommit can go backward as well. Can removing assertions here
* cause issues??
*/
synchronized void setLocalCheckpointOfSafeCommit(long newCheckpoint) {
if (newCheckpoint < this.localCheckpointOfSafeCommit) {
throw new IllegalArgumentException(
"Local checkpoint can't go backwards; "
+ "new checkpoint ["
+ newCheckpoint
+ "],"
+ "current checkpoint ["
+ localCheckpointOfSafeCommit
+ "]"
);
}
this.localCheckpointOfSafeCommit = newCheckpoint;
// if (newCheckpoint < this.localCheckpointOfSafeCommit) {
// throw new IllegalArgumentException(
// "Local checkpoint can't go backwards; "
// + "new checkpoint ["
// + newCheckpoint
// + "],"
// + "current checkpoint ["
// + localCheckpointOfSafeCommit
// + "]"
// );
// }

// Consider maximum value of localCheckpointOfSafeCommits across group level IndexWriters.
this.localCheckpointOfSafeCommit.set(Math.max(newCheckpoint, localCheckpointOfSafeCommit.get()));
}

/**
Expand Down Expand Up @@ -162,15 +167,16 @@ synchronized long getMinRetainedSeqNo() {
1 + globalCheckpointSupplier.getAsLong() - retentionOperations,
minimumRetainingSequenceNumber
);
final long minSeqNoToRetain = Math.min(minSeqNoForQueryingChanges, 1 + localCheckpointOfSafeCommit);
final long minSeqNoToRetain = Math.min(minSeqNoForQueryingChanges, 1 + localCheckpointOfSafeCommit.get());

/*
* We take the maximum as minSeqNoToRetain can go backward as the retention operations value can be changed in settings, or from
* the addition of leases with a retaining sequence number lower than previous retaining sequence numbers.
*/
minRetainedSeqNo = Math.max(minRetainedSeqNo, minSeqNoToRetain);

minRetainedSeqNo.set(Math.max(minRetainedSeqNo.get(), minSeqNoToRetain));
}
return minRetainedSeqNo;
return minRetainedSeqNo.get();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

/**
* Defines a translog deletion policy
Expand All @@ -68,7 +69,7 @@ public void assertNoOpenTranslogRefs() {
* translog generation
*/
private final Map<Long, Counter> translogRefCounts = new HashMap<>();
private long localCheckpointOfSafeCommit = SequenceNumbers.NO_OPS_PERFORMED;
private final AtomicLong localCheckpointOfSafeCommit = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);

public TranslogDeletionPolicy() {
if (Assertions.ENABLED) {
Expand All @@ -79,17 +80,17 @@ public TranslogDeletionPolicy() {
}

public synchronized void setLocalCheckpointOfSafeCommit(long newCheckpoint) {
if (newCheckpoint < this.localCheckpointOfSafeCommit) {
throw new IllegalArgumentException(
"local checkpoint of the safe commit can't go backwards: "
+ "current ["
+ this.localCheckpointOfSafeCommit
+ "] new ["
+ newCheckpoint
+ "]"
);
}
this.localCheckpointOfSafeCommit = newCheckpoint;
// if (newCheckpoint < this.localCheckpointOfSafeCommit) {
// throw new IllegalArgumentException(
// "local checkpoint of the safe commit can't go backwards: "
// + "current ["
// + this.localCheckpointOfSafeCommit
// + "] new ["
// + newCheckpoint
// + "]"
// );
// }
this.localCheckpointOfSafeCommit.set(Math.max(newCheckpoint, localCheckpointOfSafeCommit.get()));
}

public abstract void setRetentionSizeInBytes(long bytes);
Expand Down Expand Up @@ -204,7 +205,7 @@ protected long getMinTranslogGenRequiredByLocks() {
* Returns the local checkpoint of the safe commit. This value is used to calculate the min required generation for recovery.
*/
public synchronized long getLocalCheckpointOfSafeCommit() {
return localCheckpointOfSafeCommit;
return localCheckpointOfSafeCommit.get();
}

synchronized long getTranslogRefCount(long gen) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ public void testWhenRetentionLeasesDictateThePolicy() {
version,
Collections.unmodifiableCollection(new ArrayList<>(leases))
);
final SoftDeletesPolicy policy = new SoftDeletesPolicy(globalCheckpoint::get, 0, retentionOperations, leasesSupplier);
final SoftDeletesPolicy policy = new SoftDeletesPolicy(globalCheckpoint::get,0, retentionOperations, leasesSupplier);
policy.setLocalCheckpointOfSafeCommit(localCheckpointOfSafeCommit);
assertThat(policy.getMinRetainedSeqNo(), equalTo(minimumRetainingSequenceNumber.getAsLong()));
}
Expand Down

0 comments on commit 0d2dde5

Please sign in to comment.