Skip to content

Commit eb68485

Browse files
committed
Prune only gc deletes below local checkpoint (#28790)
Once a document is deleted and Lucene is refreshed, we will not be able to look up the `version/seq#` associated with that delete in Lucene. As conflicting operations can still be indexed, we need another mechanism to remember these deletes. Therefore deletes should still be stored in the Version Map, even after Lucene is refreshed. Obviously, we can't remember all deletes forever so a trimming mechanism is needed. Currently, we remember deletes for at least 1 minute (the default GC deletes cycle) and clean them periodically. This is, at the moment, the best we can do on the primary for user facing APIs but this arbitrary time limit is problematic for replicas. Furthermore, we can't rely on the primary and replicas doing the trimming in a synchronized manner, and failing to do so results in the replica and primary making different decisions. The following scenario can cause inconsistency between primary and replica. 1. Primary index doc (index, id=1, v2) 2. Network packet issue causes index operation to back off and wait 3. Primary deletes doc (delete, id=1, v3) 4. Replica processes delete (delete, id=1, v3) 5. 1+ minute passes (GC deletes runs replica) 6. Indexing op is finally sent to the replica which no processes it because it forgot about the delete. We can reply on sequence-numbers to prevent this issue. If we prune only deletes whose seqno at most the local checkpoint, a replica will correctly remember what it needs. The correctness is explained as follows: Suppose o1 and o2 are two operations on the same document with seq#(o1) < seq#(o2), and o2 arrives before o1 on the replica. o2 is processed normally since it arrives first; when o1 arrives it should be discarded: 1. If seq#(o1) <= LCP, then it will be not be added to Lucene, as it was already previously added. 2. If seq#(o1) > LCP, then it depends on the nature of o2: - If o2 is a delete then its seq# is recorded in the VersionMap, since seq#(o2) > seq#(o1) > LCP, so a lookup can find it and determine that o1 is stale. - If o2 is an indexing then its seq# is either in Lucene (if refreshed) or the VersionMap (if not refreshed yet), so a real-time lookup can find it and determine that o1 is stale. In this PR, we prefer to deploy a single trimming strategy, which satisfies both requirements, on primary and replicas because: - It's simpler - no need to distinguish if an engine is running at primary mode or replica mode or being promoted. - If a replica subsequently is promoted, user experience is fully maintained as that replica remembers deletes for the last GC cycle. However, the version map may consume less memory if we deploy two different trimming strategies for primary and replicas.
1 parent 1b7c4c3 commit eb68485

File tree

7 files changed

+211
-33
lines changed

7 files changed

+211
-33
lines changed

server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

Lines changed: 30 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@
8282
import java.nio.file.Path;
8383
import java.util.ArrayList;
8484
import java.util.Arrays;
85+
import java.util.Collection;
8586
import java.util.HashMap;
8687
import java.util.List;
8788
import java.util.Map;
@@ -1717,15 +1718,41 @@ public void trimTranslog() throws EngineException {
17171718
}
17181719

17191720
private void pruneDeletedTombstones() {
1721+
/*
1722+
* We need to deploy two different trimming strategies for GC deletes on primary and replicas. Delete operations on primary
1723+
* are remembered for at least one GC delete cycle and trimmed periodically. This is, at the moment, the best we can do on
1724+
* primary for user facing APIs but this arbitrary time limit is problematic for replicas. On replicas however we should
1725+
* trim only deletes whose seqno at most the local checkpoint. This requirement is explained as follows.
1726+
*
1727+
* Suppose o1 and o2 are two operations on the same document with seq#(o1) < seq#(o2), and o2 arrives before o1 on the replica.
1728+
* o2 is processed normally since it arrives first; when o1 arrives it should be discarded:
1729+
* - If seq#(o1) <= LCP, then it will be not be added to Lucene, as it was already previously added.
1730+
* - If seq#(o1) > LCP, then it depends on the nature of o2:
1731+
* *) If o2 is a delete then its seq# is recorded in the VersionMap, since seq#(o2) > seq#(o1) > LCP,
1732+
* so a lookup can find it and determine that o1 is stale.
1733+
* *) If o2 is an indexing then its seq# is either in Lucene (if refreshed) or the VersionMap (if not refreshed yet),
1734+
* so a real-time lookup can find it and determine that o1 is stale.
1735+
*
1736+
* Here we prefer to deploy a single trimming strategy, which satisfies two constraints, on both primary and replicas because:
1737+
* - It's simpler - no need to distinguish if an engine is running at primary mode or replica mode or being promoted.
1738+
* - If a replica subsequently is promoted, user experience is maintained as that replica remembers deletes for the last GC cycle.
1739+
*
1740+
* However, the version map may consume less memory if we deploy two different trimming strategies for primary and replicas.
1741+
*/
17201742
final long timeMSec = engineConfig.getThreadPool().relativeTimeInMillis();
1721-
versionMap.pruneTombstones(timeMSec, engineConfig.getIndexSettings().getGcDeletesInMillis());
1743+
final long maxTimestampToPrune = timeMSec - engineConfig.getIndexSettings().getGcDeletesInMillis();
1744+
versionMap.pruneTombstones(maxTimestampToPrune, localCheckpointTracker.getCheckpoint());
17221745
lastDeleteVersionPruneTimeMSec = timeMSec;
17231746
}
17241747

17251748
// testing
17261749
void clearDeletedTombstones() {
1727-
// clean with current time Long.MAX_VALUE and interval 0 since we use a greater than relationship here.
1728-
versionMap.pruneTombstones(Long.MAX_VALUE, 0);
1750+
versionMap.pruneTombstones(Long.MAX_VALUE, localCheckpointTracker.getMaxSeqNo());
1751+
}
1752+
1753+
// for testing
1754+
final Collection<DeleteVersionValue> getDeletedTombstones() {
1755+
return versionMap.getAllTombstones().values();
17291756
}
17301757

17311758
@Override

server/src/main/java/org/elasticsearch/index/engine/LiveVersionMap.java

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -375,21 +375,25 @@ void removeTombstoneUnderLock(BytesRef uid) {
375375
}
376376
}
377377

378-
private boolean canRemoveTombstone(long currentTime, long pruneInterval, DeleteVersionValue versionValue) {
379-
// check if the value is old enough to be removed
380-
final boolean isTooOld = currentTime - versionValue.time > pruneInterval;
378+
private boolean canRemoveTombstone(long maxTimestampToPrune, long maxSeqNoToPrune, DeleteVersionValue versionValue) {
379+
// check if the value is old enough and safe to be removed
380+
final boolean isTooOld = versionValue.time < maxTimestampToPrune;
381+
final boolean isSafeToPrune = versionValue.seqNo <= maxSeqNoToPrune;
381382
// version value can't be removed it's
382383
// not yet flushed to lucene ie. it's part of this current maps object
383384
final boolean isNotTrackedByCurrentMaps = versionValue.time < maps.getMinDeleteTimestamp();
384-
return isTooOld && isNotTrackedByCurrentMaps;
385+
return isTooOld && isSafeToPrune && isNotTrackedByCurrentMaps;
385386
}
386387

387-
void pruneTombstones(long currentTime, long pruneInterval) {
388+
/**
389+
* Try to prune tombstones whose timestamp is less than maxTimestampToPrune and seqno at most the maxSeqNoToPrune.
390+
*/
391+
void pruneTombstones(long maxTimestampToPrune, long maxSeqNoToPrune) {
388392
for (Map.Entry<BytesRef, DeleteVersionValue> entry : tombstones.entrySet()) {
389393
// we do check before we actually lock the key - this way we don't need to acquire the lock for tombstones that are not
390394
// prune-able. If the tombstone changes concurrently we will re-read and step out below since if we can't collect it now w
391395
// we won't collect the tombstone below since it must be newer than this one.
392-
if (canRemoveTombstone(currentTime, pruneInterval, entry.getValue())) {
396+
if (canRemoveTombstone(maxTimestampToPrune, maxSeqNoToPrune, entry.getValue())) {
393397
final BytesRef uid = entry.getKey();
394398
try (Releasable lock = keyedLock.tryAcquire(uid)) {
395399
// we use tryAcquire here since this is a best effort and we try to be least disruptive
@@ -399,7 +403,7 @@ void pruneTombstones(long currentTime, long pruneInterval) {
399403
// Must re-get it here, vs using entry.getValue(), in case the uid was indexed/deleted since we pulled the iterator:
400404
final DeleteVersionValue versionValue = tombstones.get(uid);
401405
if (versionValue != null) {
402-
if (canRemoveTombstone(currentTime, pruneInterval, versionValue)) {
406+
if (canRemoveTombstone(maxTimestampToPrune, maxSeqNoToPrune, versionValue)) {
403407
removeTombstoneUnderLock(uid);
404408
}
405409
}

server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@
7777
import org.elasticsearch.cluster.routing.TestShardRouting;
7878
import org.elasticsearch.common.Randomness;
7979
import org.elasticsearch.common.Strings;
80+
import org.elasticsearch.common.UUIDs;
8081
import org.elasticsearch.common.bytes.BytesArray;
8182
import org.elasticsearch.common.bytes.BytesReference;
8283
import org.elasticsearch.common.collect.Tuple;
@@ -165,6 +166,8 @@
165166
import static org.hamcrest.CoreMatchers.instanceOf;
166167
import static org.hamcrest.CoreMatchers.sameInstance;
167168
import static org.hamcrest.Matchers.contains;
169+
import static org.hamcrest.Matchers.containsInAnyOrder;
170+
import static org.hamcrest.Matchers.empty;
168171
import static org.hamcrest.Matchers.equalTo;
169172
import static org.hamcrest.Matchers.everyItem;
170173
import static org.hamcrest.Matchers.greaterThan;
@@ -175,6 +178,8 @@
175178
import static org.hamcrest.Matchers.not;
176179
import static org.hamcrest.Matchers.notNullValue;
177180
import static org.hamcrest.Matchers.nullValue;
181+
import static org.mockito.Mockito.spy;
182+
import static org.mockito.Mockito.when;
178183

179184
public class InternalEngineTests extends EngineTestCase {
180185

@@ -4633,4 +4638,65 @@ public void testStressUpdateSameDocWhileGettingIt() throws IOException, Interrup
46334638
}
46344639
}
46354640
}
4641+
4642+
public void testPruneOnlyDeletesAtMostLocalCheckpoint() throws Exception {
4643+
final AtomicLong clock = new AtomicLong(0);
4644+
threadPool = spy(threadPool);
4645+
when(threadPool.relativeTimeInMillis()).thenAnswer(invocation -> clock.get());
4646+
final EngineConfig config = engine.config();
4647+
final long gcInterval = randomIntBetween(0, 10);
4648+
final IndexSettings indexSettings = engine.config().getIndexSettings();
4649+
final IndexMetaData indexMetaData = IndexMetaData.builder(indexSettings.getIndexMetaData())
4650+
.settings(Settings.builder().put(indexSettings.getSettings())
4651+
.put(IndexSettings.INDEX_GC_DELETES_SETTING.getKey(), TimeValue.timeValueMillis(gcInterval).getStringRep())).build();
4652+
indexSettings.updateIndexMetaData(indexMetaData);
4653+
4654+
try (Store store = createStore();
4655+
InternalEngine engine = createEngine(new EngineConfig(config.getShardId(), config.getAllocationId(), threadPool,
4656+
indexSettings, config.getWarmer(), store, config.getMergePolicy(), config.getAnalyzer(), config.getSimilarity(),
4657+
new CodecService(null, logger), config.getEventListener(), config.getQueryCache(), config.getQueryCachingPolicy(),
4658+
config.getTranslogConfig(), config.getFlushMergesAfter(), config.getExternalRefreshListener(), Collections.emptyList(),
4659+
config.getIndexSort(), config.getTranslogRecoveryRunner(), config.getCircuitBreakerService(),
4660+
config.getGlobalCheckpointSupplier()))) {
4661+
engine.config().setEnableGcDeletes(false);
4662+
for (int i = 0, docs = scaledRandomIntBetween(0, 10); i < docs; i++) {
4663+
index(engine, i);
4664+
}
4665+
final long deleteBatch = between(10, 20);
4666+
final long gapSeqNo = randomLongBetween(
4667+
engine.getLocalCheckpointTracker().getMaxSeqNo() + 1, engine.getLocalCheckpointTracker().getMaxSeqNo() + deleteBatch);
4668+
for (int i = 0; i < deleteBatch; i++) {
4669+
final long seqno = engine.getLocalCheckpointTracker().generateSeqNo();
4670+
if (seqno != gapSeqNo) {
4671+
if (randomBoolean()) {
4672+
clock.incrementAndGet();
4673+
}
4674+
engine.delete(replicaDeleteForDoc(UUIDs.randomBase64UUID(), 1, seqno, threadPool.relativeTimeInMillis()));
4675+
}
4676+
}
4677+
List<DeleteVersionValue> tombstones = new ArrayList<>(engine.getDeletedTombstones());
4678+
engine.config().setEnableGcDeletes(true);
4679+
// Prune tombstones whose seqno < gap_seqno and timestamp < clock-gcInterval.
4680+
clock.set(randomLongBetween(gcInterval, deleteBatch + gcInterval));
4681+
engine.refresh("test");
4682+
tombstones.removeIf(v -> v.seqNo < gapSeqNo && v.time < clock.get() - gcInterval);
4683+
assertThat(engine.getDeletedTombstones(), containsInAnyOrder(tombstones.toArray()));
4684+
// Prune tombstones whose seqno at most the local checkpoint (eg. seqno < gap_seqno).
4685+
clock.set(randomLongBetween(deleteBatch + gcInterval * 4/3, 100)); // Need a margin for gcInterval/4.
4686+
engine.refresh("test");
4687+
tombstones.removeIf(v -> v.seqNo < gapSeqNo);
4688+
assertThat(engine.getDeletedTombstones(), containsInAnyOrder(tombstones.toArray()));
4689+
// Fill the seqno gap - should prune all tombstones.
4690+
clock.set(between(0, 100));
4691+
if (randomBoolean()) {
4692+
engine.index(replicaIndexForDoc(testParsedDocument("d", null, testDocumentWithTextField(), SOURCE, null), 1, gapSeqNo, false));
4693+
} else {
4694+
engine.delete(replicaDeleteForDoc(UUIDs.randomBase64UUID(), Versions.MATCH_ANY, gapSeqNo, threadPool.relativeTimeInMillis()));
4695+
}
4696+
clock.set(randomLongBetween(100 + gcInterval * 4/3, Long.MAX_VALUE)); // Need a margin for gcInterval/4.
4697+
engine.refresh("test");
4698+
assertThat(engine.getDeletedTombstones(), empty());
4699+
}
4700+
}
4701+
46364702
}

server/src/test/java/org/elasticsearch/index/engine/LiveVersionMapTests.java

Lines changed: 23 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,8 @@
3737
import java.util.concurrent.CountDownLatch;
3838
import java.util.concurrent.atomic.AtomicBoolean;
3939
import java.util.concurrent.atomic.AtomicLong;
40-
import java.util.stream.StreamSupport;
40+
41+
import static org.hamcrest.Matchers.empty;
4142

4243
public class LiveVersionMapTests extends ESTestCase {
4344

@@ -106,14 +107,15 @@ public void testBasics() throws IOException {
106107
map.afterRefresh(randomBoolean());
107108
assertNull(map.getUnderLock(uid("test")));
108109

109-
110110
map.putUnderLock(uid("test"), new DeleteVersionValue(1,1,1,1));
111111
assertEquals(new DeleteVersionValue(1,1,1,1), map.getUnderLock(uid("test")));
112112
map.beforeRefresh();
113113
assertEquals(new DeleteVersionValue(1,1,1,1), map.getUnderLock(uid("test")));
114114
map.afterRefresh(randomBoolean());
115115
assertEquals(new DeleteVersionValue(1,1,1,1), map.getUnderLock(uid("test")));
116116
map.pruneTombstones(2, 0);
117+
assertEquals(new DeleteVersionValue(1,1,1,1), map.getUnderLock(uid("test")));
118+
map.pruneTombstones(2, 1);
117119
assertNull(map.getUnderLock(uid("test")));
118120
}
119121
}
@@ -134,8 +136,10 @@ public void testConcurrently() throws IOException, InterruptedException {
134136
CountDownLatch startGun = new CountDownLatch(numThreads);
135137
CountDownLatch done = new CountDownLatch(numThreads);
136138
int randomValuesPerThread = randomIntBetween(5000, 20000);
137-
AtomicLong clock = new AtomicLong(0);
138-
AtomicLong lastPrunedTimestamp = new AtomicLong(-1);
139+
final AtomicLong clock = new AtomicLong(0);
140+
final AtomicLong lastPrunedTimestamp = new AtomicLong(-1);
141+
final AtomicLong maxSeqNo = new AtomicLong();
142+
final AtomicLong lastPrunedSeqNo = new AtomicLong();
139143
for (int j = 0; j < threads.length; j++) {
140144
threads[j] = new Thread(() -> {
141145
startGun.countDown();
@@ -148,29 +152,31 @@ public void testConcurrently() throws IOException, InterruptedException {
148152
try {
149153
for (int i = 0; i < randomValuesPerThread; ++i) {
150154
BytesRef bytesRef = randomFrom(random(), keyList);
151-
final long clockTick = clock.get();
152155
try (Releasable r = map.acquireLock(bytesRef)) {
153156
VersionValue versionValue = values.computeIfAbsent(bytesRef,
154-
v -> new VersionValue(randomLong(), randomLong(), randomLong()));
157+
v -> new VersionValue(randomLong(), maxSeqNo.incrementAndGet(), randomLong()));
155158
boolean isDelete = versionValue instanceof DeleteVersionValue;
156159
if (isDelete) {
157160
map.removeTombstoneUnderLock(bytesRef);
158161
deletes.remove(bytesRef);
159162
}
160163
if (isDelete == false && rarely()) {
161-
versionValue = new DeleteVersionValue(versionValue.version + 1, versionValue.seqNo + 1,
164+
versionValue = new DeleteVersionValue(versionValue.version + 1, maxSeqNo.incrementAndGet(),
162165
versionValue.term, clock.getAndIncrement());
163166
deletes.put(bytesRef, (DeleteVersionValue) versionValue);
164167
} else {
165-
versionValue = new VersionValue(versionValue.version + 1, versionValue.seqNo + 1, versionValue.term);
168+
versionValue = new VersionValue(versionValue.version + 1, maxSeqNo.incrementAndGet(), versionValue.term);
166169
}
167170
values.put(bytesRef, versionValue);
168171
map.putUnderLock(bytesRef, versionValue);
169172
}
170173
if (rarely()) {
171-
map.pruneTombstones(clockTick, 0);
172-
// timestamp we pruned the deletes
173-
lastPrunedTimestamp.updateAndGet(prev -> Math.max(clockTick, prev)); // make sure we track the latest
174+
final long pruneSeqNo = randomLongBetween(0, maxSeqNo.get());
175+
final long clockTick = randomLongBetween(0, clock.get());
176+
map.pruneTombstones(clockTick, pruneSeqNo);
177+
// make sure we track the latest timestamp and seqno we pruned the deletes
178+
lastPrunedTimestamp.updateAndGet(prev -> Math.max(clockTick, prev));
179+
lastPrunedSeqNo.updateAndGet(prev -> Math.max(pruneSeqNo, prev));
174180
}
175181
}
176182
} finally {
@@ -234,15 +240,17 @@ public void testConcurrently() throws IOException, InterruptedException {
234240
VersionValue value = map.getUnderLock(e.getKey());
235241
// here we keep track of the deletes and ensure that all deletes that are not visible anymore ie. not in the map
236242
// have a timestamp that is smaller or equal to the maximum timestamp that we pruned on
243+
final DeleteVersionValue delete = e.getValue();
237244
if (value == null) {
238-
assertTrue(e.getValue().time + " > " + lastPrunedTimestamp.get(), e.getValue().time <= lastPrunedTimestamp.get());
245+
assertTrue(delete.time + " > " + lastPrunedTimestamp.get() + "," + delete.seqNo + " > " + lastPrunedSeqNo.get(),
246+
delete.time <= lastPrunedTimestamp.get() && delete.seqNo <= lastPrunedSeqNo.get());
239247
} else {
240-
assertEquals(value, e.getValue());
248+
assertEquals(value, delete);
241249
}
242250
}
243251
});
244-
map.pruneTombstones(clock.incrementAndGet(), 0);
245-
assertEquals(0, StreamSupport.stream(map.getAllTombstones().entrySet().spliterator(), false).count());
252+
map.pruneTombstones(clock.incrementAndGet(), maxSeqNo.get());
253+
assertThat(map.getAllTombstones().entrySet(), empty());
246254
}
247255

248256
public void testCarryOnSafeAccess() throws IOException {

0 commit comments

Comments
 (0)