diff --git a/server/src/internalClusterTest/java/org/elasticsearch/index/engine/MaxDocsLimitIT.java b/server/src/internalClusterTest/java/org/elasticsearch/index/engine/MaxDocsLimitIT.java
new file mode 100644
index 0000000000000..700e1488cf695
--- /dev/null
+++ b/server/src/internalClusterTest/java/org/elasticsearch/index/engine/MaxDocsLimitIT.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.index.engine;
+
+import org.apache.lucene.index.IndexWriterMaxDocsChanger;
+import org.elasticsearch.action.index.IndexResponse;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.cluster.metadata.IndexMetadata;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.xcontent.XContentType;
+import org.elasticsearch.index.IndexSettings;
+import org.elasticsearch.index.query.MatchAllQueryBuilder;
+import org.elasticsearch.index.translog.Translog;
+import org.elasticsearch.plugins.EnginePlugin;
+import org.elasticsearch.plugins.Plugin;
+import org.elasticsearch.rest.RestStatus;
+import org.elasticsearch.test.ESIntegTestCase;
+import org.elasticsearch.test.hamcrest.ElasticsearchAssertions;
+import org.junit.After;
+import org.junit.Before;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.Phaser;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
+import static org.hamcrest.Matchers.both;
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.lessThanOrEqualTo;
+
+public class MaxDocsLimitIT extends ESIntegTestCase {
+
+    private static final AtomicInteger maxDocs = new AtomicInteger();
+
+    public static class TestEnginePlugin extends Plugin implements EnginePlugin {
+        @Override
+        public Optional<EngineFactory> getEngineFactory(IndexSettings indexSettings) {
+            return Optional.of(config -> {
+                assert maxDocs.get() > 0 : "maxDocs is unset";
+                return EngineTestCase.createEngine(config, maxDocs.get());
+            });
+        }
+    }
+
+    @Override
+    protected boolean addMockInternalEngine() {
+        return false;
+    }
+
+    @Override
+    protected Collection<Class<? extends Plugin>> nodePlugins() {
+        List<Class<? extends Plugin>> plugins = new ArrayList<>(super.nodePlugins());
+        plugins.add(TestEnginePlugin.class);
+        return plugins;
+    }
+
+    @Before
+    public void setMaxDocs() {
+        maxDocs.set(randomIntBetween(10, 100)); // Do not set this too low as we can fail to write the cluster state
+        IndexWriterMaxDocsChanger.setMaxDocs(maxDocs.get());
+    }
+
+    @After
+    public void restoreMaxDocs() {
+        IndexWriterMaxDocsChanger.restoreMaxDocs();
+    }
+
+    public
void testMaxDocsLimit() throws Exception { + internalCluster().ensureAtLeastNumDataNodes(1); + assertAcked(client().admin().indices().prepareCreate("test") + .setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexSettings.INDEX_TRANSLOG_DURABILITY_SETTING.getKey(), Translog.Durability.REQUEST))); + IndexingResult indexingResult = indexDocs(maxDocs.get(), 1); + assertThat(indexingResult.numSuccess, equalTo(maxDocs.get())); + assertThat(indexingResult.numFailures, equalTo(0)); + int rejectedRequests = between(1, 10); + indexingResult = indexDocs(rejectedRequests, between(1, 8)); + assertThat(indexingResult.numFailures, equalTo(rejectedRequests)); + assertThat(indexingResult.numSuccess, equalTo(0)); + final IllegalArgumentException deleteError = expectThrows(IllegalArgumentException.class, + () -> client().prepareDelete("test", "any-id").get()); + assertThat(deleteError.getMessage(), containsString("Number of documents in the index can't exceed [" + maxDocs.get() + "]")); + client().admin().indices().prepareRefresh("test").get(); + SearchResponse searchResponse = client().prepareSearch("test").setQuery(new MatchAllQueryBuilder()) + .setTrackTotalHitsUpTo(Integer.MAX_VALUE).setSize(0).get(); + ElasticsearchAssertions.assertNoFailures(searchResponse); + assertThat(searchResponse.getHits().getTotalHits().value, equalTo((long) maxDocs.get())); + if (randomBoolean()) { + client().admin().indices().prepareFlush("test").get(); + } + internalCluster().fullRestart(); + internalCluster().ensureAtLeastNumDataNodes(2); + ensureGreen("test"); + searchResponse = client().prepareSearch("test").setQuery(new MatchAllQueryBuilder()) + .setTrackTotalHitsUpTo(Integer.MAX_VALUE).setSize(0).get(); + ElasticsearchAssertions.assertNoFailures(searchResponse); + assertThat(searchResponse.getHits().getTotalHits().value, equalTo((long) maxDocs.get())); + } + + public void testMaxDocsLimitConcurrently() throws Exception { + internalCluster().ensureAtLeastNumDataNodes(1); + assertAcked(client().admin().indices().prepareCreate("test") + .setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1))); + IndexingResult indexingResult = indexDocs(between(maxDocs.get() + 1, maxDocs.get() * 2), between(2, 8)); + assertThat(indexingResult.numFailures, greaterThan(0)); + assertThat(indexingResult.numSuccess, both(greaterThan(0)).and(lessThanOrEqualTo(maxDocs.get()))); + client().admin().indices().prepareRefresh("test").get(); + SearchResponse searchResponse = client().prepareSearch("test").setQuery(new MatchAllQueryBuilder()) + .setTrackTotalHitsUpTo(Integer.MAX_VALUE).setSize(0).get(); + ElasticsearchAssertions.assertNoFailures(searchResponse); + assertThat(searchResponse.getHits().getTotalHits().value, equalTo((long) indexingResult.numSuccess)); + int totalSuccess = indexingResult.numSuccess; + while (totalSuccess < maxDocs.get()) { + indexingResult = indexDocs(between(1, 10), between(1, 8)); + assertThat(indexingResult.numSuccess, greaterThan(0)); + totalSuccess += indexingResult.numSuccess; + } + if (randomBoolean()) { + indexingResult = indexDocs(between(1, 10), between(1, 8)); + assertThat(indexingResult.numSuccess, equalTo(0)); + } + client().admin().indices().prepareRefresh("test").get(); + searchResponse = client().prepareSearch("test").setQuery(new MatchAllQueryBuilder()) + .setTrackTotalHitsUpTo(Integer.MAX_VALUE).setSize(0).get(); + ElasticsearchAssertions.assertNoFailures(searchResponse); + assertThat(searchResponse.getHits().getTotalHits().value, equalTo((long) 
totalSuccess));
+    }
+
+    static final class IndexingResult {
+        final int numSuccess;
+        final int numFailures;
+
+        IndexingResult(int numSuccess, int numFailures) {
+            this.numSuccess = numSuccess;
+            this.numFailures = numFailures;
+        }
+    }
+
+    static IndexingResult indexDocs(int numRequests, int numThreads) throws Exception {
+        final AtomicInteger completedRequests = new AtomicInteger();
+        final AtomicInteger numSuccess = new AtomicInteger();
+        final AtomicInteger numFailure = new AtomicInteger();
+        Thread[] indexers = new Thread[numThreads];
+        Phaser phaser = new Phaser(indexers.length);
+        for (int i = 0; i < indexers.length; i++) {
+            indexers[i] = new Thread(() -> {
+                phaser.arriveAndAwaitAdvance();
+                while (completedRequests.incrementAndGet() <= numRequests) {
+                    try {
+                        final IndexResponse resp = client().prepareIndex("test").setSource("{}", XContentType.JSON).get();
+                        numSuccess.incrementAndGet();
+                        assertThat(resp.status(), equalTo(RestStatus.CREATED));
+                    } catch (IllegalArgumentException e) {
+                        numFailure.incrementAndGet();
+                        assertThat(e.getMessage(), containsString("Number of documents in the index can't exceed [" + maxDocs.get() + "]"));
+                    }
+                }
+            });
+            indexers[i].start();
+        }
+        for (Thread indexer : indexers) {
+            indexer.join();
+        }
+        internalCluster().assertNoInFlightDocsInEngine();
+        return new IndexingResult(numSuccess.get(), numFailure.get());
+    }
+}
diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
index 336e60a3ce427..db5ac0db99da8 100644
--- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
+++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
@@ -176,6 +176,18 @@ public class InternalEngine extends Engine {
     private final KeyedLock<Long> noOpKeyedLock = new KeyedLock<>();
     private final AtomicBoolean shouldPeriodicallyFlushAfterBigMerge = new AtomicBoolean(false);
+    /**
+     * If multiple writes passed {@link InternalEngine#tryAcquireInFlightDocs(Operation, int)} but they haven't adjusted
+     * {@link IndexWriter#getPendingNumDocs()} yet, then IndexWriter can fail with too many documents. In this case, we have to fail
+     * the engine because we already generated sequence numbers for write operations; otherwise we will have gaps in sequence numbers.
+     * To avoid this, we keep track of the number of documents that are being added to the IndexWriter and account for them in
+     * {@link InternalEngine#tryAcquireInFlightDocs(Operation, int)}. Although we can briefly double count some in-flight documents
+     * in both the IndexWriter and the engine, this shouldn't be an issue because the window is short and we adjust the
+     * inFlightDocCount once an indexing operation completes.
+ */ + private final AtomicLong inFlightDocCount = new AtomicLong(); + + private final int maxDocs; + @Nullable private final String historyUUID; @@ -186,13 +198,12 @@ public class InternalEngine extends Engine { private volatile String forceMergeUUID; public InternalEngine(EngineConfig engineConfig) { - this(engineConfig, LocalCheckpointTracker::new); + this(engineConfig, IndexWriter.MAX_DOCS, LocalCheckpointTracker::new); } - InternalEngine( - final EngineConfig engineConfig, - final BiFunction localCheckpointTrackerSupplier) { + InternalEngine(EngineConfig engineConfig, int maxDocs, BiFunction localCheckpointTrackerSupplier) { super(engineConfig); + this.maxDocs = maxDocs; final TranslogDeletionPolicy translogDeletionPolicy = new TranslogDeletionPolicy(); store.incRef(); IndexWriter writer = null; @@ -835,6 +846,7 @@ public IndexResult index(Index index) throws IOException { try (ReleasableLock releasableLock = readLock.acquire()) { ensureOpen(); assert assertIncomingSequenceNumber(index.origin(), index.seqNo()); + int reservedDocs = 0; try (Releasable ignored = versionMap.acquireLock(index.uid().bytes()); Releasable indexThrottle = doThrottle ? throttle.acquireThrottle() : () -> {}) { lastWriteNanos = index.startTime(); @@ -865,9 +877,11 @@ public IndexResult index(Index index) throws IOException { * or calls updateDocument. */ final IndexingStrategy plan = indexingStrategyForOperation(index); + reservedDocs = plan.reservedDocs; final IndexResult indexResult; if (plan.earlyResultOnPreFlightError.isPresent()) { + assert index.origin() == Operation.Origin.PRIMARY : index.origin(); indexResult = plan.earlyResultOnPreFlightError.get(); assert indexResult.getResultType() == Result.Type.FAILURE : indexResult.getResultType(); } else { @@ -922,6 +936,8 @@ public IndexResult index(Index index) throws IOException { indexResult.setTook(System.nanoTime() - index.startTime()); indexResult.freeze(); return indexResult; + } finally { + releaseInFlightDocs(reservedDocs); } } catch (RuntimeException | IOException e) { try { @@ -960,14 +976,14 @@ protected final IndexingStrategy planIndexingAsNonPrimary(Index index) throws IO } else if (maxSeqNoOfUpdatesOrDeletes <= localCheckpointTracker.getProcessedCheckpoint()) { // see Engine#getMaxSeqNoOfUpdatesOrDeletes for the explanation of the optimization using sequence numbers assert maxSeqNoOfUpdatesOrDeletes < index.seqNo() : index.seqNo() + ">=" + maxSeqNoOfUpdatesOrDeletes; - plan = IndexingStrategy.optimizedAppendOnly(index.version()); + plan = IndexingStrategy.optimizedAppendOnly(index.version(), 0); } else { versionMap.enforceSafeAccess(); final OpVsLuceneDocStatus opVsLucene = compareOpToLuceneDocBasedOnSeqNo(index); if (opVsLucene == OpVsLuceneDocStatus.OP_STALE_OR_EQUAL) { - plan = IndexingStrategy.processAsStaleOp(index.version()); + plan = IndexingStrategy.processAsStaleOp(index.version(), 0); } else { - plan = IndexingStrategy.processNormally(opVsLucene == OpVsLuceneDocStatus.LUCENE_DOC_NOT_FOUND, index.version()); + plan = IndexingStrategy.processNormally(opVsLucene == OpVsLuceneDocStatus.LUCENE_DOC_NOT_FOUND, index.version(), 0); } } return plan; @@ -984,11 +1000,17 @@ protected IndexingStrategy indexingStrategyForOperation(final Index index) throw private IndexingStrategy planIndexingAsPrimary(Index index) throws IOException { assert index.origin() == Operation.Origin.PRIMARY : "planing as primary but origin isn't. 
got " + index.origin(); + final int reservingDocs = index.parsedDoc().docs().size(); final IndexingStrategy plan; // resolve an external operation into an internal one which is safe to replay final boolean canOptimizeAddDocument = canOptimizeAddDocument(index); if (canOptimizeAddDocument && mayHaveBeenIndexedBefore(index) == false) { - plan = IndexingStrategy.optimizedAppendOnly(1L); + final Exception reserveError = tryAcquireInFlightDocs(index, reservingDocs); + if (reserveError != null) { + plan = IndexingStrategy.failAsTooManyDocs(reserveError); + } else { + plan = IndexingStrategy.optimizedAppendOnly(1L, reservingDocs); + } } else { versionMap.enforceSafeAccess(); // resolves incoming version @@ -1020,9 +1042,14 @@ private IndexingStrategy planIndexingAsPrimary(Index index) throws IOException { new VersionConflictEngineException(shardId, index, currentVersion, currentNotFoundOrDeleted); plan = IndexingStrategy.skipDueToVersionConflict(e, currentNotFoundOrDeleted, currentVersion); } else { - plan = IndexingStrategy.processNormally(currentNotFoundOrDeleted, - canOptimizeAddDocument ? 1L : index.versionType().updateVersion(currentVersion, index.version()) - ); + final Exception reserveError = tryAcquireInFlightDocs(index, reservingDocs); + if (reserveError != null) { + plan = IndexingStrategy.failAsTooManyDocs(reserveError); + } else { + plan = IndexingStrategy.processNormally(currentNotFoundOrDeleted, + canOptimizeAddDocument ? 1L : index.versionType().updateVersion(currentVersion, index.version()), + reservingDocs); + } } } return plan; @@ -1133,52 +1160,56 @@ protected static final class IndexingStrategy { final long versionForIndexing; final boolean indexIntoLucene; final boolean addStaleOpToLucene; + final int reservedDocs; final Optional earlyResultOnPreFlightError; private IndexingStrategy(boolean currentNotFoundOrDeleted, boolean useLuceneUpdateDocument, boolean indexIntoLucene, boolean addStaleOpToLucene, - long versionForIndexing, IndexResult earlyResultOnPreFlightError) { + long versionForIndexing, int reservedDocs, IndexResult earlyResultOnPreFlightError) { assert useLuceneUpdateDocument == false || indexIntoLucene : "use lucene update is set to true, but we're not indexing into lucene"; assert (indexIntoLucene && earlyResultOnPreFlightError != null) == false : "can only index into lucene or have a preflight result but not both." + "indexIntoLucene: " + indexIntoLucene + " earlyResultOnPreFlightError:" + earlyResultOnPreFlightError; + assert reservedDocs == 0 || indexIntoLucene || addStaleOpToLucene : reservedDocs; this.currentNotFoundOrDeleted = currentNotFoundOrDeleted; this.useLuceneUpdateDocument = useLuceneUpdateDocument; this.versionForIndexing = versionForIndexing; this.indexIntoLucene = indexIntoLucene; this.addStaleOpToLucene = addStaleOpToLucene; + this.reservedDocs = reservedDocs; this.earlyResultOnPreFlightError = earlyResultOnPreFlightError == null ? 
Optional.empty() : Optional.of(earlyResultOnPreFlightError); } - static IndexingStrategy optimizedAppendOnly(long versionForIndexing) { - return new IndexingStrategy(true, false, true, false, versionForIndexing, null); + static IndexingStrategy optimizedAppendOnly(long versionForIndexing, int reservedDocs) { + return new IndexingStrategy(true, false, true, false, versionForIndexing, reservedDocs, null); } public static IndexingStrategy skipDueToVersionConflict( VersionConflictEngineException e, boolean currentNotFoundOrDeleted, long currentVersion) { final IndexResult result = new IndexResult(e, currentVersion); - return new IndexingStrategy( - currentNotFoundOrDeleted, false, false, false, - Versions.NOT_FOUND, result); + return new IndexingStrategy(currentNotFoundOrDeleted, false, false, false, Versions.NOT_FOUND, 0, result); } - static IndexingStrategy processNormally(boolean currentNotFoundOrDeleted, - long versionForIndexing) { + static IndexingStrategy processNormally(boolean currentNotFoundOrDeleted, long versionForIndexing, int reservedDocs) { return new IndexingStrategy(currentNotFoundOrDeleted, currentNotFoundOrDeleted == false, - true, false, versionForIndexing, null); + true, false, versionForIndexing, reservedDocs, null); } public static IndexingStrategy processButSkipLucene(boolean currentNotFoundOrDeleted, long versionForIndexing) { - return new IndexingStrategy(currentNotFoundOrDeleted, false, false, - false, versionForIndexing, null); + return new IndexingStrategy(currentNotFoundOrDeleted, false, false, false, versionForIndexing, 0, null); + } + + static IndexingStrategy processAsStaleOp(long versionForIndexing, int reservedDocs) { + return new IndexingStrategy(false, false, false, true, versionForIndexing, reservedDocs, null); } - static IndexingStrategy processAsStaleOp(long versionForIndexing) { - return new IndexingStrategy(false, false, false, true, versionForIndexing, null); + static IndexingStrategy failAsTooManyDocs(Exception e) { + final IndexResult result = new IndexResult(e, Versions.NOT_FOUND); + return new IndexingStrategy(false, false, false, false, Versions.NOT_FOUND, 0, result); } } @@ -1221,13 +1252,15 @@ public DeleteResult delete(Delete delete) throws IOException { assert Objects.equals(delete.uid().field(), IdFieldMapper.NAME) : delete.uid().field(); assert assertIncomingSequenceNumber(delete.origin(), delete.seqNo()); final DeleteResult deleteResult; + int reservedDocs = 0; // NOTE: we don't throttle this when merges fall behind because delete-by-id does not create new segments: try (ReleasableLock ignored = readLock.acquire(); Releasable ignored2 = versionMap.acquireLock(delete.uid().bytes())) { ensureOpen(); lastWriteNanos = delete.startTime(); final DeletionStrategy plan = deletionStrategyForOperation(delete); - + reservedDocs = plan.reservedDocs; if (plan.earlyResultOnPreflightError.isPresent()) { + assert delete.origin() == Operation.Origin.PRIMARY : delete.origin(); deleteResult = plan.earlyResultOnPreflightError.get(); } else { // generate or register sequence number @@ -1275,11 +1308,36 @@ public DeleteResult delete(Delete delete) throws IOException { e.addSuppressed(inner); } throw e; + } finally { + releaseInFlightDocs(reservedDocs); } maybePruneDeletes(); return deleteResult; } + private Exception tryAcquireInFlightDocs(Operation operation, int addingDocs) { + assert operation.origin() == Operation.Origin.PRIMARY : operation; + assert operation.seqNo() == SequenceNumbers.UNASSIGNED_SEQ_NO : operation; + assert addingDocs > 0 : addingDocs; + 
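// Reserve before checking: the documents are added to the in-flight count first, and the reservation is rolled back
// via releaseInFlightDocs below if the combined total would exceed maxDocs.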
final long totalDocs = indexWriter.getPendingNumDocs() + inFlightDocCount.addAndGet(addingDocs); + if (totalDocs > maxDocs) { + releaseInFlightDocs(addingDocs); + return new IllegalArgumentException("Number of documents in the index can't exceed [" + maxDocs + "]"); + } else { + return null; + } + } + + private void releaseInFlightDocs(int numDocs) { + assert numDocs >= 0 : numDocs; + final long newValue = inFlightDocCount.addAndGet(-numDocs); + assert newValue >= 0 : "inFlightDocCount must not be negative [" + newValue + "]"; + } + + long getInFlightDocCount() { + return inFlightDocCount.get(); + } + protected DeletionStrategy deletionStrategyForOperation(final Delete delete) throws IOException { if (delete.origin() == Operation.Origin.PRIMARY) { return planDeletionAsPrimary(delete); @@ -1306,7 +1364,7 @@ protected final DeletionStrategy planDeletionAsNonPrimary(Delete delete) throws if (opVsLucene == OpVsLuceneDocStatus.OP_STALE_OR_EQUAL) { plan = DeletionStrategy.processAsStaleOp(delete.version()); } else { - plan = DeletionStrategy.processNormally(opVsLucene == OpVsLuceneDocStatus.LUCENE_DOC_NOT_FOUND, delete.version()); + plan = DeletionStrategy.processNormally(opVsLucene == OpVsLuceneDocStatus.LUCENE_DOC_NOT_FOUND, delete.version(), 0); } } return plan; @@ -1346,7 +1404,13 @@ private DeletionStrategy planDeletionAsPrimary(Delete delete) throws IOException final VersionConflictEngineException e = new VersionConflictEngineException(shardId, delete, currentVersion, currentlyDeleted); plan = DeletionStrategy.skipDueToVersionConflict(e, currentVersion, currentlyDeleted); } else { - plan = DeletionStrategy.processNormally(currentlyDeleted, delete.versionType().updateVersion(currentVersion, delete.version())); + final Exception reserveError = tryAcquireInFlightDocs(delete, 1); + if (reserveError != null) { + plan = DeletionStrategy.failAsTooManyDocs(reserveError); + } else { + final long versionOfDeletion = delete.versionType().updateVersion(currentVersion, delete.version()); + plan = DeletionStrategy.processNormally(currentlyDeleted, versionOfDeletion, 1); + } } return plan; } @@ -1394,9 +1458,10 @@ protected static final class DeletionStrategy { final boolean currentlyDeleted; final long versionOfDeletion; final Optional earlyResultOnPreflightError; + final int reservedDocs; private DeletionStrategy(boolean deleteFromLucene, boolean addStaleOpToLucene, boolean currentlyDeleted, - long versionOfDeletion, DeleteResult earlyResultOnPreflightError) { + long versionOfDeletion, int reservedDocs, DeleteResult earlyResultOnPreflightError) { assert (deleteFromLucene && earlyResultOnPreflightError != null) == false : "can only delete from lucene or have a preflight result but not both." + "deleteFromLucene: " + deleteFromLucene @@ -1405,6 +1470,8 @@ private DeletionStrategy(boolean deleteFromLucene, boolean addStaleOpToLucene, b this.addStaleOpToLucene = addStaleOpToLucene; this.currentlyDeleted = currentlyDeleted; this.versionOfDeletion = versionOfDeletion; + this.reservedDocs = reservedDocs; + assert reservedDocs == 0 || deleteFromLucene || addStaleOpToLucene : reservedDocs; this.earlyResultOnPreflightError = earlyResultOnPreflightError == null ? 
Optional.empty() : Optional.of(earlyResultOnPreflightError); } @@ -1413,20 +1480,26 @@ public static DeletionStrategy skipDueToVersionConflict( VersionConflictEngineException e, long currentVersion, boolean currentlyDeleted) { final DeleteResult deleteResult = new DeleteResult(e, currentVersion, SequenceNumbers.UNASSIGNED_PRIMARY_TERM, SequenceNumbers.UNASSIGNED_SEQ_NO, currentlyDeleted == false); - return new DeletionStrategy(false, false, currentlyDeleted, Versions.NOT_FOUND, deleteResult); + return new DeletionStrategy(false, false, currentlyDeleted, Versions.NOT_FOUND, 0, deleteResult); } - static DeletionStrategy processNormally(boolean currentlyDeleted, long versionOfDeletion) { - return new DeletionStrategy(true, false, currentlyDeleted, versionOfDeletion, null); + static DeletionStrategy processNormally(boolean currentlyDeleted, long versionOfDeletion, int reservedDocs) { + return new DeletionStrategy(true, false, currentlyDeleted, versionOfDeletion, reservedDocs, null); } public static DeletionStrategy processButSkipLucene(boolean currentlyDeleted, long versionOfDeletion) { - return new DeletionStrategy(false, false, currentlyDeleted, versionOfDeletion, null); + return new DeletionStrategy(false, false, currentlyDeleted, versionOfDeletion, 0, null); } static DeletionStrategy processAsStaleOp(long versionOfDeletion) { - return new DeletionStrategy(false, true, false, versionOfDeletion, null); + return new DeletionStrategy(false, true, false, versionOfDeletion, 0, null); + } + + static DeletionStrategy failAsTooManyDocs(Exception e) { + final DeleteResult deleteResult = new DeleteResult(e, Versions.NOT_FOUND, + SequenceNumbers.UNASSIGNED_PRIMARY_TERM, SequenceNumbers.UNASSIGNED_SEQ_NO, false); + return new DeletionStrategy(false, false, false, Versions.NOT_FOUND, 0, deleteResult); } } diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index ed2127d8aa16b..9e618b3010dbb 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -37,6 +37,7 @@ import org.apache.lucene.index.IndexReader; import org.apache.lucene.index.IndexWriter; import org.apache.lucene.index.IndexWriterConfig; +import org.apache.lucene.index.IndexWriterMaxDocsChanger; import org.apache.lucene.index.IndexableField; import org.apache.lucene.index.LeafReader; import org.apache.lucene.index.LeafReaderContext; @@ -184,6 +185,7 @@ import static org.elasticsearch.index.seqno.SequenceNumbers.NO_OPS_PERFORMED; import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_PRIMARY_TERM; import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO; +import static org.elasticsearch.index.seqno.SequenceNumbers.max; import static org.hamcrest.CoreMatchers.instanceOf; import static org.hamcrest.CoreMatchers.sameInstance; import static org.hamcrest.Matchers.contains; @@ -4026,7 +4028,7 @@ public void testNoOps() throws IOException { localCheckpoint); EngineConfig noopEngineConfig = copy(engine.config(), new SoftDeletesRetentionMergePolicy(Lucene.SOFT_DELETES_FIELD, () -> new MatchAllDocsQuery(), engine.config().getMergePolicy())); - noOpEngine = new InternalEngine(noopEngineConfig, supplier) { + noOpEngine = new InternalEngine(noopEngineConfig, IndexWriter.MAX_DOCS, supplier) { @Override protected long doGenerateSeqNoForOperation(Operation operation) { throw new 
UnsupportedOperationException(); @@ -5991,4 +5993,62 @@ public void testProducesStoredFieldsReader() throws Exception { } } } + + public void testMaxDocsOnPrimary() throws Exception { + engine.close(); + int maxDocs = randomIntBetween(1, 100); + IndexWriterMaxDocsChanger.setMaxDocs(maxDocs); + try { + engine = new InternalTestEngine(engine.config(), maxDocs, LocalCheckpointTracker::new); + int numDocs = between(maxDocs + 1, maxDocs * 2); + List operations = new ArrayList<>(numDocs); + for (int i = 0; i < numDocs; i++) { + final String id = Integer.toString(randomInt(numDocs)); + if (randomBoolean()) { + operations.add(indexForDoc(createParsedDoc(id, null))); + } else { + operations.add(new Engine.Delete(id, newUid(id), primaryTerm.get())); + } + } + for (int i = 0; i < numDocs; i++) { + final long maxSeqNo = engine.getLocalCheckpointTracker().getMaxSeqNo(); + final Engine.Result result = applyOperation(engine, operations.get(i)); + if (i < maxDocs) { + assertThat(result.getResultType(), equalTo(Engine.Result.Type.SUCCESS)); + assertNull(result.getFailure()); + assertThat(engine.getLocalCheckpointTracker().getMaxSeqNo(), equalTo(maxSeqNo + 1L)); + } else { + assertThat(result.getResultType(), equalTo(Engine.Result.Type.FAILURE)); + assertNotNull(result.getFailure()); + assertThat(result.getFailure().getMessage(), + containsString("Number of documents in the index can't exceed [" + maxDocs + "]")); + assertThat(result.getSeqNo(), equalTo(UNASSIGNED_SEQ_NO)); + assertThat(engine.getLocalCheckpointTracker().getMaxSeqNo(), equalTo(maxSeqNo)); + } + assertFalse(engine.isClosed.get()); + } + } finally { + IndexWriterMaxDocsChanger.restoreMaxDocs(); + } + } + + public void testMaxDocsOnReplica() throws Exception { + engine.close(); + int maxDocs = randomIntBetween(1, 100); + IndexWriterMaxDocsChanger.setMaxDocs(maxDocs); + try { + engine = new InternalTestEngine(engine.config(), maxDocs, LocalCheckpointTracker::new); + int numDocs = between(maxDocs + 1, maxDocs * 2); + List operations = generateHistoryOnReplica(numDocs, randomBoolean(), randomBoolean(), randomBoolean()); + final IllegalArgumentException error = expectThrows(IllegalArgumentException.class, () -> { + for (Engine.Operation op : operations) { + applyOperation(engine, op); + } + }); + assertThat(error.getMessage(), containsString("number of documents in the index cannot exceed " + maxDocs)); + assertTrue(engine.isClosed.get()); + } finally { + IndexWriterMaxDocsChanger.restoreMaxDocs(); + } + } } diff --git a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java index 63abd9f4701da..4a947181c5e70 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java @@ -267,12 +267,14 @@ public void tearDown() throws Exception { try { if (engine != null && engine.isClosed.get() == false) { engine.getTranslog().getDeletionPolicy().assertNoOpenTranslogRefs(); + assertNoInFlightDocuments(engine); assertConsistentHistoryBetweenTranslogAndLuceneIndex(engine, createMapperService()); assertMaxSeqNoInCommitUserData(engine); assertAtMostOneLuceneDocumentPerSequenceNumber(engine); } if (replicaEngine != null && replicaEngine.isClosed.get() == false) { replicaEngine.getTranslog().getDeletionPolicy().assertNoOpenTranslogRefs(); + assertNoInFlightDocuments(replicaEngine); 
assertConsistentHistoryBetweenTranslogAndLuceneIndex(replicaEngine, createMapperService()); assertMaxSeqNoInCommitUserData(replicaEngine); assertAtMostOneLuceneDocumentPerSequenceNumber(replicaEngine); @@ -529,6 +531,10 @@ protected InternalEngine createEngine(@Nullable IndexWriterFactory indexWriterFa return internalEngine; } + public static InternalEngine createEngine(EngineConfig engineConfig, int maxDocs) { + return new InternalEngine(engineConfig, maxDocs, LocalCheckpointTracker::new); + } + @FunctionalInterface public interface IndexWriterFactory { @@ -566,7 +572,7 @@ protected long doGenerateSeqNoForOperation(final Operation operation) { } }; } else { - return new InternalTestEngine(config, localCheckpointTrackerSupplier) { + return new InternalTestEngine(config, IndexWriter.MAX_DOCS, localCheckpointTrackerSupplier) { @Override IndexWriter createWriter(Directory directory, IndexWriterConfig iwc) throws IOException { return (indexWriterFactory != null) ? @@ -1241,4 +1247,16 @@ static long maxSeqNosInReader(DirectoryReader reader) throws IOException { public static long getNumVersionLookups(Engine engine) { return ((InternalEngine) engine).getNumVersionLookups(); } + + public static long getInFlightDocCount(Engine engine) { + if (engine instanceof InternalEngine) { + return ((InternalEngine) engine).getInFlightDocCount(); + } else { + return 0; + } + } + + public static void assertNoInFlightDocuments(Engine engine) throws Exception { + assertBusy(() -> assertThat(getInFlightDocCount(engine), equalTo(0L))); + } } diff --git a/test/framework/src/main/java/org/elasticsearch/index/engine/InternalTestEngine.java b/test/framework/src/main/java/org/elasticsearch/index/engine/InternalTestEngine.java index 8c52d57aabc39..d31fe609e6203 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/engine/InternalTestEngine.java +++ b/test/framework/src/main/java/org/elasticsearch/index/engine/InternalTestEngine.java @@ -37,8 +37,9 @@ class InternalTestEngine extends InternalEngine { super(engineConfig); } - InternalTestEngine(EngineConfig engineConfig, BiFunction localCheckpointTrackerSupplier) { - super(engineConfig, localCheckpointTrackerSupplier); + InternalTestEngine(EngineConfig engineConfig, int maxDocs, + BiFunction localCheckpointTrackerSupplier) { + super(engineConfig, maxDocs, localCheckpointTrackerSupplier); } @Override diff --git a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java index 1a3f0a1795d24..85419c6974389 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java +++ b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java @@ -1272,6 +1272,24 @@ public void assertConsistentHistoryBetweenTranslogAndLuceneIndex() throws IOExce } } + public void assertNoInFlightDocsInEngine() throws Exception { + assertBusy(() -> { + for (String nodeName : getNodeNames()) { + IndicesService indexServices = getInstance(IndicesService.class, nodeName); + for (IndexService indexService : indexServices) { + for (IndexShard indexShard : indexService) { + try { + final Engine engine = IndexShardTestCase.getEngine(indexShard); + assertThat(indexShard.routingEntry().toString(), EngineTestCase.getInFlightDocCount(engine), equalTo(0L)); + } catch (AlreadyClosedException ignored) { + // shard is closed + } + } + } + } + }); + } + private IndexShard getShardOrNull(ClusterState clusterState, ShardRouting shardRouting) { if (shardRouting == null 
                || shardRouting.assignedToNode() == false) {
            return null;
        }
@@ -2285,9 +2303,10 @@ public void ensureEstimatedStats() {
     }
     @Override
-    public synchronized void assertAfterTest() throws IOException {
+    public synchronized void assertAfterTest() throws Exception {
         super.assertAfterTest();
         assertRequestsFinished();
+        assertNoInFlightDocsInEngine();
         for (NodeAndClient nodeAndClient : nodes.values()) {
             NodeEnvironment env = nodeAndClient.node().getNodeEnvironment();
             Set<ShardId> shardIds = env.lockedShards();
diff --git a/test/framework/src/main/java/org/elasticsearch/test/TestCluster.java b/test/framework/src/main/java/org/elasticsearch/test/TestCluster.java
index e799f0eba2f4d..fabbdab0d83ab 100644
--- a/test/framework/src/main/java/org/elasticsearch/test/TestCluster.java
+++ b/test/framework/src/main/java/org/elasticsearch/test/TestCluster.java
@@ -85,7 +85,7 @@ public void beforeIndexDeletion() throws Exception {
     /**
      * This method checks all the things that need to be checked after each test
      */
-    public void assertAfterTest() throws IOException {
+    public void assertAfterTest() throws Exception {
         ensureEstimatedStats();
     }
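
Note for readers of this diff: the standalone sketch below distills the reservation scheme that the InternalEngine changes above implement. A write reserves its document count against the hard cap before a sequence number would be assigned, and drops the reservation once the writer's own pending-doc count covers the documents. This is an illustration only, not Elasticsearch code; the class and member names (DocLimitSketch, pendingWriterDocs, markIndexed) are invented for the example, and error signalling is reduced to a boolean return.

import java.util.concurrent.atomic.AtomicLong;

final class DocLimitSketch {
    private final long maxDocs;
    // Documents reserved by in-flight writes that the writer has not accounted for yet.
    private final AtomicLong inFlightDocs = new AtomicLong();
    // Stand-in for IndexWriter.getPendingNumDocs() in the real engine.
    private final AtomicLong pendingWriterDocs = new AtomicLong();

    DocLimitSketch(long maxDocs) {
        this.maxDocs = maxDocs;
    }

    // Reserve addingDocs slots before any sequence number would be issued. The reservation is
    // added first and rolled back on rejection, so concurrent writers can never collectively
    // push the total past maxDocs.
    boolean tryAcquire(int addingDocs) {
        final long total = pendingWriterDocs.get() + inFlightDocs.addAndGet(addingDocs);
        if (total > maxDocs) {
            inFlightDocs.addAndGet(-addingDocs);
            return false;
        }
        return true;
    }

    // Called once the write has reached the writer (or failed); the writer's own pending count
    // now covers the documents, so the reservation can be dropped.
    void release(int reservedDocs) {
        final long remaining = inFlightDocs.addAndGet(-reservedDocs);
        assert remaining >= 0 : remaining;
    }

    // Simulates the writer acknowledging the documents.
    void markIndexed(int docs) {
        pendingWriterDocs.addAndGet(docs);
    }

    public static void main(String[] args) {
        DocLimitSketch limiter = new DocLimitSketch(2);
        for (int i = 1; i <= 3; i++) {
            if (limiter.tryAcquire(1)) {
                limiter.markIndexed(1); // document reaches the writer
                limiter.release(1);     // reservation no longer needed
                System.out.println("doc " + i + " accepted");
            } else {
                System.out.println("doc " + i + " rejected: limit of 2 reached");
            }
        }
    }
}

As in the engine change, a document can briefly be counted both in the in-flight counter and in the writer's pending count. The check is therefore conservative: it may reject a write slightly early, but it can never admit more than maxDocs documents.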