diff --git a/core/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java b/core/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java index 10f8741ecccb6..ae4ae78c03386 100644 --- a/core/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java +++ b/core/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java @@ -22,16 +22,12 @@ import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.bulk.BulkRequest; -import org.elasticsearch.action.bulk.BulkShardRequest; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.action.support.WriteResponse; -import org.elasticsearch.client.transport.NoNodeAvailableException; import org.elasticsearch.cluster.action.shard.ShardStateAction; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; -import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Nullable; @@ -46,7 +42,6 @@ import org.elasticsearch.transport.TransportException; import org.elasticsearch.transport.TransportResponse; import org.elasticsearch.transport.TransportService; -import org.apache.logging.log4j.core.pattern.ConverterKeys; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -302,15 +297,21 @@ private void maybeFinish() { } void run() { - // we either respond immediately ie. if we we don't fsync per request or wait for refresh - // OR we got an pass async operations on and wait for them to return to respond. - indexShard.maybeFlush(); - maybeFinish(); // decrement the pendingOpts by one, if there is nothing else to do we just respond with success. + /* + * We either respond immediately (i.e., if we do not fsync per request or wait for + * refresh), or we there are past async operations and we wait for them to return to + * respond. + */ + indexShard.afterWriteOperation(); + // decrement pending by one, if there is nothing else to do we just respond with success + maybeFinish(); if (waitUntilRefresh) { assert pendingOps.get() > 0; indexShard.addRefreshListener(location, forcedRefresh -> { if (forcedRefresh) { - logger.warn("block_until_refresh request ran out of slots and forced a refresh: [{}]", request); + logger.warn( + "block until refresh ran out of slots and forced a refresh: [{}]", + request); } refreshed.set(forcedRefresh); maybeFinish(); diff --git a/core/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java b/core/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java index badd80d5aea76..a072b68b2770d 100644 --- a/core/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java +++ b/core/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java @@ -125,6 +125,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings { EnableAllocationDecider.INDEX_ROUTING_REBALANCE_ENABLE_SETTING, EnableAllocationDecider.INDEX_ROUTING_ALLOCATION_ENABLE_SETTING, IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING, + IndexSettings.INDEX_TRANSLOG_GENERATION_THRESHOLD_SIZE_SETTING, IndexFieldDataService.INDEX_FIELDDATA_CACHE_KEY, FieldMapper.IGNORE_MALFORMED_SETTING, FieldMapper.COERCE_SETTING, diff --git a/core/src/main/java/org/elasticsearch/index/IndexSettings.java b/core/src/main/java/org/elasticsearch/index/IndexSettings.java index 47c7ffb71bc85..599fea1823800 100644 --- a/core/src/main/java/org/elasticsearch/index/IndexSettings.java +++ b/core/src/main/java/org/elasticsearch/index/IndexSettings.java @@ -22,7 +22,6 @@ import org.apache.lucene.index.MergePolicy; import org.elasticsearch.Version; import org.elasticsearch.cluster.metadata.IndexMetaData; -import org.elasticsearch.common.logging.DeprecationLogger; import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.settings.IndexScopedSettings; import org.elasticsearch.common.settings.Setting; @@ -112,6 +111,16 @@ public final class IndexSettings { Setting.byteSizeSetting("index.translog.flush_threshold_size", new ByteSizeValue(512, ByteSizeUnit.MB), Property.Dynamic, Property.IndexScope); + /** + * The maximum size of a translog generation. This is independent of the maximum size of + * translog operations that have not been flushed. + */ + public static final Setting INDEX_TRANSLOG_GENERATION_THRESHOLD_SIZE_SETTING = + Setting.byteSizeSetting( + "index.translog.generation_threshold_size", + new ByteSizeValue(64, ByteSizeUnit.MB), + new Property[]{Property.Dynamic, Property.IndexScope}); + public static final Setting INDEX_SEQ_NO_CHECKPOINT_SYNC_INTERVAL = Setting.timeSetting("index.seq_no.checkpoint_sync_interval", new TimeValue(30, TimeUnit.SECONDS), new TimeValue(-1, TimeUnit.MILLISECONDS), Property.Dynamic, Property.IndexScope); @@ -156,6 +165,7 @@ public final class IndexSettings { private volatile TimeValue refreshInterval; private final TimeValue globalCheckpointInterval; private volatile ByteSizeValue flushThresholdSize; + private volatile ByteSizeValue generationThresholdSize; private final MergeSchedulerConfig mergeSchedulerConfig; private final MergePolicyConfig mergePolicyConfig; private final IndexScopedSettings scopedSettings; @@ -250,6 +260,7 @@ public IndexSettings(final IndexMetaData indexMetaData, final Settings nodeSetti refreshInterval = scopedSettings.get(INDEX_REFRESH_INTERVAL_SETTING); globalCheckpointInterval = scopedSettings.get(INDEX_SEQ_NO_CHECKPOINT_SYNC_INTERVAL); flushThresholdSize = scopedSettings.get(INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING); + generationThresholdSize = scopedSettings.get(INDEX_TRANSLOG_GENERATION_THRESHOLD_SIZE_SETTING); mergeSchedulerConfig = new MergeSchedulerConfig(this); gcDeletesInMillis = scopedSettings.get(INDEX_GC_DELETES_SETTING).getMillis(); warmerEnabled = scopedSettings.get(INDEX_WARMER_ENABLED_SETTING); @@ -281,6 +292,9 @@ public IndexSettings(final IndexMetaData indexMetaData, final Settings nodeSetti scopedSettings.addSettingsUpdateConsumer(INDEX_WARMER_ENABLED_SETTING, this::setEnableWarmer); scopedSettings.addSettingsUpdateConsumer(INDEX_GC_DELETES_SETTING, this::setGCDeletes); scopedSettings.addSettingsUpdateConsumer(INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING, this::setTranslogFlushThresholdSize); + scopedSettings.addSettingsUpdateConsumer( + INDEX_TRANSLOG_GENERATION_THRESHOLD_SIZE_SETTING, + this::setGenerationThresholdSize); scopedSettings.addSettingsUpdateConsumer(INDEX_REFRESH_INTERVAL_SETTING, this::setRefreshInterval); scopedSettings.addSettingsUpdateConsumer(MAX_REFRESH_LISTENERS_PER_SHARD, this::setMaxRefreshListeners); scopedSettings.addSettingsUpdateConsumer(MAX_SLICES_PER_SCROLL, this::setMaxSlicesPerScroll); @@ -290,6 +304,10 @@ private void setTranslogFlushThresholdSize(ByteSizeValue byteSizeValue) { this.flushThresholdSize = byteSizeValue; } + private void setGenerationThresholdSize(final ByteSizeValue generationThresholdSize) { + this.generationThresholdSize = generationThresholdSize; + } + private void setGCDeletes(TimeValue timeValue) { this.gcDeletesInMillis = timeValue.getMillis(); } @@ -461,6 +479,19 @@ public TimeValue getGlobalCheckpointInterval() { */ public ByteSizeValue getFlushThresholdSize() { return flushThresholdSize; } + /** + * Returns the generation threshold size. As sequence numbers can cause multiple generations to + * be preserved for rollback purposes, we want to keep the size of individual generations from + * growing too large to avoid excessive disk space consumption. Therefore, the translog is + * automatically rolled to a new generation when the current generation exceeds this generation + * threshold size. + * + * @return the generation threshold size + */ + public ByteSizeValue getGenerationThresholdSize() { + return generationThresholdSize; + } + /** * Returns the {@link MergeSchedulerConfig} */ diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 0e6054deccd0f..32d3d4d4bf8ee 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -771,27 +771,44 @@ public Engine.SyncedFlushResult syncFlush(String syncId, Engine.CommitId expecte return engine.syncFlush(syncId, expectedCommitId); } - public Engine.CommitId flush(FlushRequest request) throws ElasticsearchException { - boolean waitIfOngoing = request.waitIfOngoing(); - boolean force = request.force(); - if (logger.isTraceEnabled()) { - logger.trace("flush with {}", request); - } - // we allows flush while recovering, since we allow for operations to happen - // while recovering, and we want to keep the translog at bay (up to deletes, which - // we don't gc). Yet, we don't use flush internally to clear deletes and flush the indexwriter since - // we use #writeIndexingBuffer for this now. + /** + * Executes the given flush request against the engine. + * + * @param request the flush request + * @return the commit ID + */ + public Engine.CommitId flush(FlushRequest request) { + final boolean waitIfOngoing = request.waitIfOngoing(); + final boolean force = request.force(); + logger.trace("flush with {}", request); + /* + * We allow flushes while recovery since we allow operations to happen while recovering and + * we want to keep the translog under control (up to deletes, which we do not GC). Yet, we + * do not use flush internally to clear deletes and flush the index writer since we use + * Engine#writeIndexingBuffer for this now. + */ verifyNotClosed(); - Engine engine = getEngine(); + final Engine engine = getEngine(); if (engine.isRecovering()) { - throw new IllegalIndexShardStateException(shardId(), state, "flush is only allowed if the engine is not recovery" + - " from translog"); + throw new IllegalIndexShardStateException( + shardId(), + state, + "flush is only allowed if the engine is not recovery from translog"); } - long time = System.nanoTime(); - Engine.CommitId commitId = engine.flush(force, waitIfOngoing); + final long time = System.nanoTime(); + final Engine.CommitId commitId = engine.flush(force, waitIfOngoing); flushMetric.inc(System.nanoTime() - time); return commitId; + } + /** + * Rolls the tranlog generation. + * + * @throws IOException if any file operations on the translog throw an I/O exception + */ + private void rollTranslogGeneration() throws IOException { + final Engine engine = getEngine(); + engine.getTranslog().rollGeneration(); } public void forceMerge(ForceMergeRequest forceMerge) throws IOException { @@ -1256,17 +1273,39 @@ public boolean restoreFromRepository(Repository repository) { } /** - * Returns true iff this shard needs to be flushed due to too many translog operation or a too large transaction log. - * Otherwise false. + * Tests whether or not the translog should be flushed. This test is based on the current size + * of the translog comparted to the configured flush threshold size. + * + * @return {@code true} if the translog should be flushed */ boolean shouldFlush() { - Engine engine = getEngineOrNull(); + final Engine engine = getEngineOrNull(); if (engine != null) { try { - Translog translog = engine.getTranslog(); - return translog.sizeInBytes() > indexSettings.getFlushThresholdSize().getBytes(); - } catch (AlreadyClosedException ex) { - // that's fine we are already close - no need to flush + final Translog translog = engine.getTranslog(); + return translog.shouldFlush(); + } catch (final AlreadyClosedException e) { + // we are already closed, no need to flush or roll + } + } + return false; + } + + /** + * Tests whether or not the translog generation should be rolled to a new generation. This test + * is based on the size of the current generation compared to the configured generation + * threshold size. + * + * @return {@code true} if the current generation should be rolled to a new generation + */ + boolean shouldRollTranslogGeneration() { + final Engine engine = getEngineOrNull(); + if (engine != null) { + try { + final Translog translog = engine.getTranslog(); + return translog.shouldRollGeneration(); + } catch (final AlreadyClosedException e) { + // we are already closed, no need to flush or roll } } return false; @@ -1810,28 +1849,31 @@ public Translog.Durability getTranslogDurability() { return indexSettings.getTranslogDurability(); } - private final AtomicBoolean asyncFlushRunning = new AtomicBoolean(); + // we can not protect with a lock since we "release" on a different thread + private final AtomicBoolean flushOrRollRunning = new AtomicBoolean(); /** - * Schedules a flush if needed but won't schedule more than one flush concurrently. The flush will be executed on the - * Flush thread-pool asynchronously. - * - * @return true if a new flush is scheduled otherwise false. + * Schedules a flush or translog generation roll if needed but will not schedule more than one + * concurrently. The operation will be executed asynchronously on the flush thread pool. */ - public boolean maybeFlush() { - if (shouldFlush()) { - if (asyncFlushRunning.compareAndSet(false, true)) { // we can't use a lock here since we "release" in a different thread - if (shouldFlush() == false) { - // we have to check again since otherwise there is a race when a thread passes - // the first shouldFlush() check next to another thread which flushes fast enough - // to finish before the current thread could flip the asyncFlushRunning flag. - // in that situation we have an extra unexpected flush. - asyncFlushRunning.compareAndSet(true, false); - } else { + public void afterWriteOperation() { + if (shouldFlush() || shouldRollTranslogGeneration()) { + if (flushOrRollRunning.compareAndSet(false, true)) { + /* + * We have to check again since otherwise there is a race when a thread passes the + * first check next to another thread which performs the operation quickly enough to + * finish before the current thread could flip the flag. In that situation, we have + * an extra operation. + * + * Additionally, a flush implicitly executes a translog generation roll so if we + * execute a flush then we do not need to check if we should roll the translog + * generation. + */ + if (shouldFlush()) { logger.debug("submitting async flush request"); - final AbstractRunnable abstractRunnable = new AbstractRunnable() { + final AbstractRunnable flush = new AbstractRunnable() { @Override - public void onFailure(Exception e) { + public void onFailure(final Exception e) { if (state != IndexShardState.CLOSED) { logger.warn("failed to flush index", e); } @@ -1844,16 +1886,38 @@ protected void doRun() throws Exception { @Override public void onAfter() { - asyncFlushRunning.compareAndSet(true, false); - maybeFlush(); // fire a flush up again if we have filled up the limits such that shouldFlush() returns true + flushOrRollRunning.compareAndSet(true, false); + afterWriteOperation(); } }; - threadPool.executor(ThreadPool.Names.FLUSH).execute(abstractRunnable); - return true; + threadPool.executor(ThreadPool.Names.FLUSH).execute(flush); + } else if (shouldRollTranslogGeneration()) { + logger.debug("submitting async roll translog generation request"); + final AbstractRunnable roll = new AbstractRunnable() { + @Override + public void onFailure(final Exception e) { + if (state != IndexShardState.CLOSED) { + logger.warn("failed to roll translog generation", e); + } + } + + @Override + protected void doRun() throws Exception { + rollTranslogGeneration(); + } + + @Override + public void onAfter() { + flushOrRollRunning.compareAndSet(true, false); + afterWriteOperation(); + } + }; + threadPool.executor(ThreadPool.Names.FLUSH).execute(roll); + } else { + flushOrRollRunning.compareAndSet(true, false); } } } - return false; } /** diff --git a/core/src/main/java/org/elasticsearch/index/translog/Translog.java b/core/src/main/java/org/elasticsearch/index/translog/Translog.java index ee4d0a4391a23..d9a8cc408f822 100644 --- a/core/src/main/java/org/elasticsearch/index/translog/Translog.java +++ b/core/src/main/java/org/elasticsearch/index/translog/Translog.java @@ -34,6 +34,7 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.lease.Releasables; import org.elasticsearch.common.lucene.uid.Versions; import org.elasticsearch.common.util.BigArrays; @@ -55,6 +56,7 @@ import java.nio.file.StandardOpenOption; import java.util.ArrayList; import java.util.List; +import java.util.Locale; import java.util.Optional; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; @@ -329,7 +331,7 @@ public Path location() { * Returns the generation of the current transaction log. */ public long currentFileGeneration() { - try (ReleasableLock lock = readLock.acquire()) { + try (ReleasableLock ignored = readLock.acquire()) { return current.getGeneration(); } } @@ -409,10 +411,9 @@ TranslogWriter createWriter(long fileGeneration) throws IOException { public Location add(final Operation operation) throws IOException { final ReleasableBytesStreamOutput out = new ReleasableBytesStreamOutput(bigArrays); try { - final BufferedChecksumStreamOutput checksumStreamOutput = new BufferedChecksumStreamOutput(out); final long start = out.position(); out.skip(Integer.BYTES); - writeOperationNoSize(checksumStreamOutput, operation); + writeOperationNoSize(new BufferedChecksumStreamOutput(out), operation); final long end = out.position(); final int operationSize = (int) (end - Integer.BYTES - start); out.seek(start); @@ -442,6 +443,30 @@ public Location add(final Operation operation) throws IOException { } } + /** + * Tests whether or not the translog should be flushed. This test is based on the current size + * of the translog comparted to the configured flush threshold size. + * + * @return {@code true} if the translog should be flushed + */ + public boolean shouldFlush() { + final long size = this.sizeInBytes(); + return size > this.indexSettings.getFlushThresholdSize().getBytes(); + } + + /** + * Tests whether or not the translog generation should be rolled to a new generation. This test + * is based on the size of the current generation compared to the configured generation + * threshold size. + * + * @return {@code true} if the current generation should be rolled to a new generation + */ + public boolean shouldRollGeneration() { + final long size = this.current.sizeInBytes(); + final long threshold = this.indexSettings.getGenerationThresholdSize().getBytes(); + return size > threshold; + } + /** * The a {@linkplain Location} that will sort after the {@linkplain Location} returned by the last write but before any locations which * can be returned by the next write. @@ -1322,44 +1347,63 @@ public static void writeOperationNoSize(BufferedChecksumStreamOutput out, Transl out.writeInt((int) checksum); } + /** + * Roll the current translog generation into a new generation. This does not commit the + * translog. + * + * @throws IOException if an I/O exception occurred during any file operations + */ + public void rollGeneration() throws IOException { + try (Releasable ignored = writeLock.acquire()) { + try { + final TranslogReader reader = current.closeIntoReader(); + readers.add(reader); + final Path checkpoint = location.resolve(CHECKPOINT_FILE_NAME); + assert Checkpoint.read(checkpoint).generation == current.getGeneration(); + final Path generationCheckpoint = + location.resolve(getCommitCheckpointFileName(current.getGeneration())); + Files.copy(checkpoint, generationCheckpoint); + IOUtils.fsync(generationCheckpoint, false); + IOUtils.fsync(generationCheckpoint.getParent(), true); + // create a new translog file; this will sync it and update the checkpoint data; + current = createWriter(current.getGeneration() + 1); + logger.trace("current translog set to [{}]", current.getGeneration()); + } catch (final Exception e) { + IOUtils.closeWhileHandlingException(this); // tragic event + throw e; + } + } + } + @Override public long prepareCommit() throws IOException { - try (ReleasableLock lock = writeLock.acquire()) { + try (ReleasableLock ignored = writeLock.acquire()) { ensureOpen(); if (currentCommittingGeneration != NOT_SET_GENERATION) { - throw new IllegalStateException("already committing a translog with generation: " + currentCommittingGeneration); + final String message = String.format( + Locale.ROOT, + "already committing a translog with generation [%d]", + currentCommittingGeneration); + throw new IllegalStateException(message); } currentCommittingGeneration = current.getGeneration(); - TranslogReader currentCommittingTranslog = current.closeIntoReader(); - readers.add(currentCommittingTranslog); - Path checkpoint = location.resolve(CHECKPOINT_FILE_NAME); - assert Checkpoint.read(checkpoint).generation == currentCommittingTranslog.getGeneration(); - Path commitCheckpoint = location.resolve(getCommitCheckpointFileName(currentCommittingTranslog.getGeneration())); - Files.copy(checkpoint, commitCheckpoint); - IOUtils.fsync(commitCheckpoint, false); - IOUtils.fsync(commitCheckpoint.getParent(), true); - // create a new translog file - this will sync it and update the checkpoint data; - current = createWriter(current.getGeneration() + 1); - logger.trace("current translog set to [{}]", current.getGeneration()); - - } catch (Exception e) { - IOUtils.closeWhileHandlingException(this); // tragic event - throw e; + rollGeneration(); } - return 0L; + return 0; } @Override public long commit() throws IOException { - try (ReleasableLock lock = writeLock.acquire()) { + try (ReleasableLock ignored = writeLock.acquire()) { ensureOpen(); if (currentCommittingGeneration == NOT_SET_GENERATION) { prepareCommit(); } assert currentCommittingGeneration != NOT_SET_GENERATION; - assert readers.stream().filter(r -> r.getGeneration() == currentCommittingGeneration).findFirst().isPresent() - : "reader list doesn't contain committing generation [" + currentCommittingGeneration + "]"; - lastCommittedTranslogFileGeneration = current.getGeneration(); // this is important - otherwise old files will not be cleaned up + assert readers.stream().anyMatch(r -> r.getGeneration() == currentCommittingGeneration) + : "readers missing committing generation [" + currentCommittingGeneration + "]"; + // set the last committed generation otherwise old files will not be cleaned up + lastCommittedTranslogFileGeneration = currentCommittingGeneration + 1; currentCommittingGeneration = NOT_SET_GENERATION; trimUnreferencedReaders(); } diff --git a/core/src/test/java/org/elasticsearch/index/IndexSettingsTests.java b/core/src/test/java/org/elasticsearch/index/IndexSettingsTests.java index a32d076272ba6..bc3ee4b5f06f1 100644 --- a/core/src/test/java/org/elasticsearch/index/IndexSettingsTests.java +++ b/core/src/test/java/org/elasticsearch/index/IndexSettingsTests.java @@ -370,6 +370,27 @@ public void testTranslogFlushSizeThreshold() { assertEquals(actualNewTranslogFlushThresholdSize, settings.getFlushThresholdSize()); } + public void testTranslogGenerationSizeThreshold() { + final ByteSizeValue size = new ByteSizeValue(Math.abs(randomInt())); + final String key = IndexSettings.INDEX_TRANSLOG_GENERATION_THRESHOLD_SIZE_SETTING.getKey(); + final ByteSizeValue actualValue = + ByteSizeValue.parseBytesSizeValue(size.toString(), key); + final IndexMetaData metaData = + newIndexMeta( + "index", + Settings.builder() + .put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT) + .put(key, size.toString()) + .build()); + final IndexSettings settings = new IndexSettings(metaData, Settings.EMPTY); + assertEquals(actualValue, settings.getGenerationThresholdSize()); + final ByteSizeValue newSize = new ByteSizeValue(Math.abs(randomInt())); + final ByteSizeValue actual = ByteSizeValue.parseBytesSizeValue(newSize.toString(), key); + settings.updateIndexMetaData( + newIndexMeta("index", Settings.builder().put(key, newSize.toString()).build())); + assertEquals(actual, settings.getGenerationThresholdSize()); + } + public void testArchiveBrokenIndexSettings() { Settings settings = IndexScopedSettings.DEFAULT_SCOPED_SETTINGS.archiveUnknownOrInvalidSettings( diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java index 97c96c8af12f7..ff5556089d28d 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java +++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java @@ -363,49 +363,104 @@ public void testMaybeFlush() throws Exception { assertEquals(0, shard.getEngine().getTranslog().totalOperations()); } - public void testStressMaybeFlush() throws Exception { + public void testMaybeRollTranslogGeneration() throws Exception { + final int generationThreshold = randomIntBetween(1, 512); + final Settings settings = + Settings + .builder() + .put("index.number_of_shards", 1) + .put("index.translog.generation_threshold_size", generationThreshold + "b") + .put() + .build(); + createIndex("test", settings); + ensureGreen("test"); + final IndicesService indicesService = getInstanceFromNode(IndicesService.class); + final IndexService test = indicesService.indexService(resolveIndex("test")); + final IndexShard shard = test.getShardOrNull(0); + int rolls = 0; + final Translog translog = shard.getEngine().getTranslog(); + final long generation = translog.currentFileGeneration(); + for (int i = 0; i < randomIntBetween(32, 128); i++) { + assertThat(translog.currentFileGeneration(), equalTo(generation + rolls)); + final ParsedDocument doc = testParsedDocument( + "1", + "test", + null, + SequenceNumbersService.UNASSIGNED_SEQ_NO, + new ParseContext.Document(), + new BytesArray(new byte[]{1}), XContentType.JSON, null); + final Engine.Index index = new Engine.Index(new Term("_uid", doc.uid()), doc); + final Engine.IndexResult result = shard.index(index); + final Translog.Location location = result.getTranslogLocation(); + shard.afterWriteOperation(); + if (location.translogLocation + location.size > generationThreshold) { + // wait until the roll completes + assertBusy(() -> assertFalse(shard.shouldRollTranslogGeneration())); + rolls++; + assertThat(translog.currentFileGeneration(), equalTo(generation + rolls)); + } + } + } + + public void testStressMaybeFlushOrRollTranslogGeneration() throws Exception { createIndex("test"); ensureGreen(); IndicesService indicesService = getInstanceFromNode(IndicesService.class); IndexService test = indicesService.indexService(resolveIndex("test")); final IndexShard shard = test.getShardOrNull(0); assertFalse(shard.shouldFlush()); - client().admin().indices().prepareUpdateSettings("test").setSettings(Settings.builder().put( - IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING.getKey(), - new ByteSizeValue(117/* size of the operation + header&footer*/, ByteSizeUnit.BYTES)).build()).get(); - client().prepareIndex("test", "test", "0").setSource("{}", XContentType.JSON) - .setRefreshPolicy(randomBoolean() ? IMMEDIATE : NONE).get(); + final String key; + final boolean flush = randomBoolean(); + if (flush) { + key = "index.translog.flush_threshold_size"; + } else { + key = "index.translog.generation_threshold_size"; + } + // size of the operation plus header and footer + final Settings settings = Settings.builder().put(key, "117b").build(); + client().admin().indices().prepareUpdateSettings("test").setSettings(settings).get(); + client().prepareIndex("test", "test", "0") + .setSource("{}", XContentType.JSON) + .setRefreshPolicy(randomBoolean() ? IMMEDIATE : NONE) + .get(); assertFalse(shard.shouldFlush()); final AtomicBoolean running = new AtomicBoolean(true); final int numThreads = randomIntBetween(2, 4); - Thread[] threads = new Thread[numThreads]; - CyclicBarrier barrier = new CyclicBarrier(numThreads + 1); + final Thread[] threads = new Thread[numThreads]; + final CyclicBarrier barrier = new CyclicBarrier(numThreads + 1); for (int i = 0; i < threads.length; i++) { - threads[i] = new Thread() { - @Override - public void run() { - try { - barrier.await(); - } catch (InterruptedException | BrokenBarrierException e) { - throw new RuntimeException(e); - } - while (running.get()) { - shard.maybeFlush(); - } + threads[i] = new Thread(() -> { + try { + barrier.await(); + } catch (final InterruptedException | BrokenBarrierException e) { + throw new RuntimeException(e); + } + while (running.get()) { + shard.afterWriteOperation(); } - }; + }); threads[i].start(); } barrier.await(); - FlushStats flushStats = shard.flushStats(); - long total = flushStats.getTotal(); - client().prepareIndex("test", "test", "1").setSource("{}", XContentType.JSON).get(); - assertBusy(() -> assertEquals(total + 1, shard.flushStats().getTotal())); + final Runnable check; + if (flush) { + final FlushStats flushStats = shard.flushStats(); + final long total = flushStats.getTotal(); + client().prepareIndex("test", "test", "1").setSource("{}", XContentType.JSON).get(); + check = () -> assertEquals(total + 1, shard.flushStats().getTotal()); + } else { + final long generation = shard.getEngine().getTranslog().currentFileGeneration(); + client().prepareIndex("test", "test", "1").setSource("{}", XContentType.JSON).get(); + check = () -> assertEquals( + generation + 1, + shard.getEngine().getTranslog().currentFileGeneration()); + } + assertBusy(check); running.set(false); for (int i = 0; i < threads.length; i++) { threads[i].join(); } - assertEquals(total + 1, shard.flushStats().getTotal()); + check.run(); } public void testShardHasMemoryBufferOnTranslogRecover() throws Throwable { diff --git a/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java b/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java index e47a5652b2431..36401deed4bc2 100644 --- a/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java +++ b/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java @@ -41,16 +41,18 @@ import org.elasticsearch.common.io.FileSystemUtils; import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; -import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; +import org.elasticsearch.common.util.concurrent.ReleasableLock; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.VersionType; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.Engine.Operation.Origin; @@ -100,6 +102,7 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; +import static org.elasticsearch.common.util.BigArrays.NON_RECYCLING_INSTANCE; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; @@ -156,12 +159,25 @@ private Translog create(Path path) throws IOException { return new Translog(getTranslogConfig(path), null, () -> globalCheckpoint.get()); } - private TranslogConfig getTranslogConfig(Path path) { - Settings build = Settings.builder() - .put(IndexMetaData.SETTING_VERSION_CREATED, org.elasticsearch.Version.CURRENT) - .build(); - ByteSizeValue bufferSize = randomBoolean() ? TranslogConfig.DEFAULT_BUFFER_SIZE : new ByteSizeValue(10 + randomInt(128 * 1024), ByteSizeUnit.BYTES); - return new TranslogConfig(shardId, path, IndexSettingsModule.newIndexSettings(shardId.getIndex(), build), BigArrays.NON_RECYCLING_INSTANCE, bufferSize); + private TranslogConfig getTranslogConfig(final Path path) { + final Settings settings = Settings + .builder() + .put(IndexMetaData.SETTING_VERSION_CREATED, org.elasticsearch.Version.CURRENT) + .build(); + return getTranslogConfig(path, settings); + } + + private TranslogConfig getTranslogConfig(final Path path, final Settings settings) { + final ByteSizeValue bufferSize; + if (randomBoolean()) { + bufferSize = TranslogConfig.DEFAULT_BUFFER_SIZE; + } else { + bufferSize = new ByteSizeValue(10 + randomInt(128 * 1024), ByteSizeUnit.BYTES); + } + + final IndexSettings indexSettings = + IndexSettingsModule.newIndexSettings(shardId.getIndex(), settings); + return new TranslogConfig(shardId, path, indexSettings, NON_RECYCLING_INSTANCE, bufferSize); } protected void addToTranslogAndList(Translog translog, ArrayList list, Translog.Operation op) throws IOException { @@ -2073,4 +2089,93 @@ public void testTranslogOpSerialization() throws Exception { Translog.Delete serializedDelete = new Translog.Delete(in); assertEquals(delete, serializedDelete); } + + public void testRollGeneration() throws IOException { + final long generation = translog.currentFileGeneration(); + final int rolls = randomIntBetween(1, 16); + int totalOperations = 0; + int seqNo = 0; + for (int i = 0; i < rolls; i++) { + final int operations = randomIntBetween(1, 128); + for (int j = 0; j < operations; j++) { + translog.add(new Translog.NoOp(seqNo++, 0, "test")); + totalOperations++; + } + try (ReleasableLock ignored = translog.writeLock.acquire()) { + translog.rollGeneration(); + } + assertThat(translog.currentFileGeneration(), equalTo(generation + i + 1)); + assertThat(translog.totalOperations(), equalTo(totalOperations)); + } + for (int i = 0; i <= rolls; i++) { + assertFileIsPresent(translog, generation + i); + } + translog.commit(); + assertThat(translog.currentFileGeneration(), equalTo(generation + rolls + 1)); + assertThat(translog.totalOperations(), equalTo(0)); + for (int i = 0; i <= rolls; i++) { + assertFileDeleted(translog, generation + i); + } + assertFileIsPresent(translog, generation + rolls + 1); + } + + public void testRollGenerationBetweenPrepareCommitAndCommit() throws IOException { + final long generation = translog.currentFileGeneration(); + int seqNo = 0; + + final int rollsBefore = randomIntBetween(0, 16); + for (int r = 1; r <= rollsBefore; r++) { + final int operationsBefore = randomIntBetween(1, 256); + for (int i = 0; i < operationsBefore; i++) { + translog.add(new Translog.NoOp(seqNo++, 0, "test")); + } + + try (Releasable ignored = translog.writeLock.acquire()) { + translog.rollGeneration(); + } + + assertThat(translog.currentFileGeneration(), equalTo(generation + r)); + for (int i = 0; i <= r; i++) { + assertFileIsPresent(translog, generation + r); + } + } + + assertThat(translog.currentFileGeneration(), equalTo(generation + rollsBefore)); + translog.prepareCommit(); + assertThat(translog.currentFileGeneration(), equalTo(generation + rollsBefore + 1)); + + for (int i = 0; i <= rollsBefore + 1; i++) { + assertFileIsPresent(translog, generation + i); + } + + final int rollsBetween = randomIntBetween(0, 16); + for (int r = 1; r <= rollsBetween; r++) { + final int operationsBetween = randomIntBetween(1, 256); + for (int i = 0; i < operationsBetween; i++) { + translog.add(new Translog.NoOp(seqNo++, 0, "test")); + } + + try (Releasable ignored = translog.writeLock.acquire()) { + translog.rollGeneration(); + } + + assertThat( + translog.currentFileGeneration(), + equalTo(generation + rollsBefore + 1 + r)); + for (int i = 0; i <= rollsBefore + 1 + r; i++) { + assertFileIsPresent(translog, generation + i); + } + } + + translog.commit(); + + for (int i = 0; i <= rollsBefore; i++) { + assertFileDeleted(translog, generation + i); + } + for (int i = rollsBefore + 1; i <= rollsBefore + 1 + rollsBetween; i++) { + assertFileIsPresent(translog, generation + i); + } + + } + } diff --git a/core/src/test/java/org/elasticsearch/indices/settings/UpdateSettingsIT.java b/core/src/test/java/org/elasticsearch/indices/settings/UpdateSettingsIT.java index ae6b4588271b4..762d409b6b75e 100644 --- a/core/src/test/java/org/elasticsearch/indices/settings/UpdateSettingsIT.java +++ b/core/src/test/java/org/elasticsearch/indices/settings/UpdateSettingsIT.java @@ -93,7 +93,11 @@ public void testResetDefault() { .admin() .indices() .prepareUpdateSettings("test") - .setSettings(Settings.builder().put("index.refresh_interval", -1).put("index.translog.flush_threshold_size", "1024b")) + .setSettings( + Settings.builder() + .put("index.refresh_interval", -1) + .put("index.translog.flush_threshold_size", "1024b") + .put("index.translog.generation_threshold_size", "4096b")) .execute() .actionGet(); IndexMetaData indexMetaData = client().admin().cluster().prepareState().execute().actionGet().getState().metaData().index("test"); @@ -103,6 +107,7 @@ public void testResetDefault() { if (indexService != null) { assertEquals(indexService.getIndexSettings().getRefreshInterval().millis(), -1); assertEquals(indexService.getIndexSettings().getFlushThresholdSize().getBytes(), 1024); + assertEquals(indexService.getIndexSettings().getGenerationThresholdSize().getBytes(), 4096); } } client() @@ -119,6 +124,7 @@ public void testResetDefault() { if (indexService != null) { assertEquals(indexService.getIndexSettings().getRefreshInterval().millis(), 1000); assertEquals(indexService.getIndexSettings().getFlushThresholdSize().getBytes(), 1024); + assertEquals(indexService.getIndexSettings().getGenerationThresholdSize().getBytes(), 4096); } } }