From b54a9e9c83287008e274d29f12353d1b2610afb9 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Mon, 27 Mar 2017 16:43:54 -0400 Subject: [PATCH] Introduce translog generation rolling This commit introduces a maximum size for a translog generation and automatically rolls the translog when a generation exceeds the threshold into a new generation. This threshold is configurable per index and defaults to sixty-four megabytes. We introduce this constraint as sequence numbers will require keeping around more than the current generation (to ensure that we can rollback to the global checkpoint). Without keeping the size of generations under control, having to keep old generations around could consume excessive disk space. A follow-up will enable commits to trim previous generations based on the global checkpoint. Relates #23606 --- .../replication/TransportWriteAction.java | 21 +-- .../common/settings/IndexScopedSettings.java | 1 + .../elasticsearch/index/IndexSettings.java | 33 +++- .../elasticsearch/index/shard/IndexShard.java | 152 +++++++++++++----- .../index/translog/Translog.java | 94 ++++++++--- .../index/IndexSettingsTests.java | 21 +++ .../index/shard/IndexShardIT.java | 105 +++++++++--- .../index/translog/TranslogTests.java | 119 +++++++++++++- .../indices/settings/UpdateSettingsIT.java | 8 +- 9 files changed, 441 insertions(+), 113 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java b/core/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java index 10f8741ecccb6..ae4ae78c03386 100644 --- a/core/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java +++ b/core/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java @@ -22,16 +22,12 @@ import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.bulk.BulkRequest; -import org.elasticsearch.action.bulk.BulkShardRequest; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.action.support.WriteResponse; -import org.elasticsearch.client.transport.NoNodeAvailableException; import org.elasticsearch.cluster.action.shard.ShardStateAction; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; -import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Nullable; @@ -46,7 +42,6 @@ import org.elasticsearch.transport.TransportException; import org.elasticsearch.transport.TransportResponse; import org.elasticsearch.transport.TransportService; -import org.apache.logging.log4j.core.pattern.ConverterKeys; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -302,15 +297,21 @@ private void maybeFinish() { } void run() { - // we either respond immediately ie. if we we don't fsync per request or wait for refresh - // OR we got an pass async operations on and wait for them to return to respond. - indexShard.maybeFlush(); - maybeFinish(); // decrement the pendingOpts by one, if there is nothing else to do we just respond with success. + /* + * We either respond immediately (i.e., if we do not fsync per request or wait for + * refresh), or we there are past async operations and we wait for them to return to + * respond. + */ + indexShard.afterWriteOperation(); + // decrement pending by one, if there is nothing else to do we just respond with success + maybeFinish(); if (waitUntilRefresh) { assert pendingOps.get() > 0; indexShard.addRefreshListener(location, forcedRefresh -> { if (forcedRefresh) { - logger.warn("block_until_refresh request ran out of slots and forced a refresh: [{}]", request); + logger.warn( + "block until refresh ran out of slots and forced a refresh: [{}]", + request); } refreshed.set(forcedRefresh); maybeFinish(); diff --git a/core/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java b/core/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java index badd80d5aea76..a072b68b2770d 100644 --- a/core/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java +++ b/core/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java @@ -125,6 +125,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings { EnableAllocationDecider.INDEX_ROUTING_REBALANCE_ENABLE_SETTING, EnableAllocationDecider.INDEX_ROUTING_ALLOCATION_ENABLE_SETTING, IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING, + IndexSettings.INDEX_TRANSLOG_GENERATION_THRESHOLD_SIZE_SETTING, IndexFieldDataService.INDEX_FIELDDATA_CACHE_KEY, FieldMapper.IGNORE_MALFORMED_SETTING, FieldMapper.COERCE_SETTING, diff --git a/core/src/main/java/org/elasticsearch/index/IndexSettings.java b/core/src/main/java/org/elasticsearch/index/IndexSettings.java index 47c7ffb71bc85..599fea1823800 100644 --- a/core/src/main/java/org/elasticsearch/index/IndexSettings.java +++ b/core/src/main/java/org/elasticsearch/index/IndexSettings.java @@ -22,7 +22,6 @@ import org.apache.lucene.index.MergePolicy; import org.elasticsearch.Version; import org.elasticsearch.cluster.metadata.IndexMetaData; -import org.elasticsearch.common.logging.DeprecationLogger; import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.settings.IndexScopedSettings; import org.elasticsearch.common.settings.Setting; @@ -112,6 +111,16 @@ public final class IndexSettings { Setting.byteSizeSetting("index.translog.flush_threshold_size", new ByteSizeValue(512, ByteSizeUnit.MB), Property.Dynamic, Property.IndexScope); + /** + * The maximum size of a translog generation. This is independent of the maximum size of + * translog operations that have not been flushed. + */ + public static final Setting INDEX_TRANSLOG_GENERATION_THRESHOLD_SIZE_SETTING = + Setting.byteSizeSetting( + "index.translog.generation_threshold_size", + new ByteSizeValue(64, ByteSizeUnit.MB), + new Property[]{Property.Dynamic, Property.IndexScope}); + public static final Setting INDEX_SEQ_NO_CHECKPOINT_SYNC_INTERVAL = Setting.timeSetting("index.seq_no.checkpoint_sync_interval", new TimeValue(30, TimeUnit.SECONDS), new TimeValue(-1, TimeUnit.MILLISECONDS), Property.Dynamic, Property.IndexScope); @@ -156,6 +165,7 @@ public final class IndexSettings { private volatile TimeValue refreshInterval; private final TimeValue globalCheckpointInterval; private volatile ByteSizeValue flushThresholdSize; + private volatile ByteSizeValue generationThresholdSize; private final MergeSchedulerConfig mergeSchedulerConfig; private final MergePolicyConfig mergePolicyConfig; private final IndexScopedSettings scopedSettings; @@ -250,6 +260,7 @@ public IndexSettings(final IndexMetaData indexMetaData, final Settings nodeSetti refreshInterval = scopedSettings.get(INDEX_REFRESH_INTERVAL_SETTING); globalCheckpointInterval = scopedSettings.get(INDEX_SEQ_NO_CHECKPOINT_SYNC_INTERVAL); flushThresholdSize = scopedSettings.get(INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING); + generationThresholdSize = scopedSettings.get(INDEX_TRANSLOG_GENERATION_THRESHOLD_SIZE_SETTING); mergeSchedulerConfig = new MergeSchedulerConfig(this); gcDeletesInMillis = scopedSettings.get(INDEX_GC_DELETES_SETTING).getMillis(); warmerEnabled = scopedSettings.get(INDEX_WARMER_ENABLED_SETTING); @@ -281,6 +292,9 @@ public IndexSettings(final IndexMetaData indexMetaData, final Settings nodeSetti scopedSettings.addSettingsUpdateConsumer(INDEX_WARMER_ENABLED_SETTING, this::setEnableWarmer); scopedSettings.addSettingsUpdateConsumer(INDEX_GC_DELETES_SETTING, this::setGCDeletes); scopedSettings.addSettingsUpdateConsumer(INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING, this::setTranslogFlushThresholdSize); + scopedSettings.addSettingsUpdateConsumer( + INDEX_TRANSLOG_GENERATION_THRESHOLD_SIZE_SETTING, + this::setGenerationThresholdSize); scopedSettings.addSettingsUpdateConsumer(INDEX_REFRESH_INTERVAL_SETTING, this::setRefreshInterval); scopedSettings.addSettingsUpdateConsumer(MAX_REFRESH_LISTENERS_PER_SHARD, this::setMaxRefreshListeners); scopedSettings.addSettingsUpdateConsumer(MAX_SLICES_PER_SCROLL, this::setMaxSlicesPerScroll); @@ -290,6 +304,10 @@ private void setTranslogFlushThresholdSize(ByteSizeValue byteSizeValue) { this.flushThresholdSize = byteSizeValue; } + private void setGenerationThresholdSize(final ByteSizeValue generationThresholdSize) { + this.generationThresholdSize = generationThresholdSize; + } + private void setGCDeletes(TimeValue timeValue) { this.gcDeletesInMillis = timeValue.getMillis(); } @@ -461,6 +479,19 @@ public TimeValue getGlobalCheckpointInterval() { */ public ByteSizeValue getFlushThresholdSize() { return flushThresholdSize; } + /** + * Returns the generation threshold size. As sequence numbers can cause multiple generations to + * be preserved for rollback purposes, we want to keep the size of individual generations from + * growing too large to avoid excessive disk space consumption. Therefore, the translog is + * automatically rolled to a new generation when the current generation exceeds this generation + * threshold size. + * + * @return the generation threshold size + */ + public ByteSizeValue getGenerationThresholdSize() { + return generationThresholdSize; + } + /** * Returns the {@link MergeSchedulerConfig} */ diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 0e6054deccd0f..32d3d4d4bf8ee 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -771,27 +771,44 @@ public Engine.SyncedFlushResult syncFlush(String syncId, Engine.CommitId expecte return engine.syncFlush(syncId, expectedCommitId); } - public Engine.CommitId flush(FlushRequest request) throws ElasticsearchException { - boolean waitIfOngoing = request.waitIfOngoing(); - boolean force = request.force(); - if (logger.isTraceEnabled()) { - logger.trace("flush with {}", request); - } - // we allows flush while recovering, since we allow for operations to happen - // while recovering, and we want to keep the translog at bay (up to deletes, which - // we don't gc). Yet, we don't use flush internally to clear deletes and flush the indexwriter since - // we use #writeIndexingBuffer for this now. + /** + * Executes the given flush request against the engine. + * + * @param request the flush request + * @return the commit ID + */ + public Engine.CommitId flush(FlushRequest request) { + final boolean waitIfOngoing = request.waitIfOngoing(); + final boolean force = request.force(); + logger.trace("flush with {}", request); + /* + * We allow flushes while recovery since we allow operations to happen while recovering and + * we want to keep the translog under control (up to deletes, which we do not GC). Yet, we + * do not use flush internally to clear deletes and flush the index writer since we use + * Engine#writeIndexingBuffer for this now. + */ verifyNotClosed(); - Engine engine = getEngine(); + final Engine engine = getEngine(); if (engine.isRecovering()) { - throw new IllegalIndexShardStateException(shardId(), state, "flush is only allowed if the engine is not recovery" + - " from translog"); + throw new IllegalIndexShardStateException( + shardId(), + state, + "flush is only allowed if the engine is not recovery from translog"); } - long time = System.nanoTime(); - Engine.CommitId commitId = engine.flush(force, waitIfOngoing); + final long time = System.nanoTime(); + final Engine.CommitId commitId = engine.flush(force, waitIfOngoing); flushMetric.inc(System.nanoTime() - time); return commitId; + } + /** + * Rolls the tranlog generation. + * + * @throws IOException if any file operations on the translog throw an I/O exception + */ + private void rollTranslogGeneration() throws IOException { + final Engine engine = getEngine(); + engine.getTranslog().rollGeneration(); } public void forceMerge(ForceMergeRequest forceMerge) throws IOException { @@ -1256,17 +1273,39 @@ public boolean restoreFromRepository(Repository repository) { } /** - * Returns true iff this shard needs to be flushed due to too many translog operation or a too large transaction log. - * Otherwise false. + * Tests whether or not the translog should be flushed. This test is based on the current size + * of the translog comparted to the configured flush threshold size. + * + * @return {@code true} if the translog should be flushed */ boolean shouldFlush() { - Engine engine = getEngineOrNull(); + final Engine engine = getEngineOrNull(); if (engine != null) { try { - Translog translog = engine.getTranslog(); - return translog.sizeInBytes() > indexSettings.getFlushThresholdSize().getBytes(); - } catch (AlreadyClosedException ex) { - // that's fine we are already close - no need to flush + final Translog translog = engine.getTranslog(); + return translog.shouldFlush(); + } catch (final AlreadyClosedException e) { + // we are already closed, no need to flush or roll + } + } + return false; + } + + /** + * Tests whether or not the translog generation should be rolled to a new generation. This test + * is based on the size of the current generation compared to the configured generation + * threshold size. + * + * @return {@code true} if the current generation should be rolled to a new generation + */ + boolean shouldRollTranslogGeneration() { + final Engine engine = getEngineOrNull(); + if (engine != null) { + try { + final Translog translog = engine.getTranslog(); + return translog.shouldRollGeneration(); + } catch (final AlreadyClosedException e) { + // we are already closed, no need to flush or roll } } return false; @@ -1810,28 +1849,31 @@ public Translog.Durability getTranslogDurability() { return indexSettings.getTranslogDurability(); } - private final AtomicBoolean asyncFlushRunning = new AtomicBoolean(); + // we can not protect with a lock since we "release" on a different thread + private final AtomicBoolean flushOrRollRunning = new AtomicBoolean(); /** - * Schedules a flush if needed but won't schedule more than one flush concurrently. The flush will be executed on the - * Flush thread-pool asynchronously. - * - * @return true if a new flush is scheduled otherwise false. + * Schedules a flush or translog generation roll if needed but will not schedule more than one + * concurrently. The operation will be executed asynchronously on the flush thread pool. */ - public boolean maybeFlush() { - if (shouldFlush()) { - if (asyncFlushRunning.compareAndSet(false, true)) { // we can't use a lock here since we "release" in a different thread - if (shouldFlush() == false) { - // we have to check again since otherwise there is a race when a thread passes - // the first shouldFlush() check next to another thread which flushes fast enough - // to finish before the current thread could flip the asyncFlushRunning flag. - // in that situation we have an extra unexpected flush. - asyncFlushRunning.compareAndSet(true, false); - } else { + public void afterWriteOperation() { + if (shouldFlush() || shouldRollTranslogGeneration()) { + if (flushOrRollRunning.compareAndSet(false, true)) { + /* + * We have to check again since otherwise there is a race when a thread passes the + * first check next to another thread which performs the operation quickly enough to + * finish before the current thread could flip the flag. In that situation, we have + * an extra operation. + * + * Additionally, a flush implicitly executes a translog generation roll so if we + * execute a flush then we do not need to check if we should roll the translog + * generation. + */ + if (shouldFlush()) { logger.debug("submitting async flush request"); - final AbstractRunnable abstractRunnable = new AbstractRunnable() { + final AbstractRunnable flush = new AbstractRunnable() { @Override - public void onFailure(Exception e) { + public void onFailure(final Exception e) { if (state != IndexShardState.CLOSED) { logger.warn("failed to flush index", e); } @@ -1844,16 +1886,38 @@ protected void doRun() throws Exception { @Override public void onAfter() { - asyncFlushRunning.compareAndSet(true, false); - maybeFlush(); // fire a flush up again if we have filled up the limits such that shouldFlush() returns true + flushOrRollRunning.compareAndSet(true, false); + afterWriteOperation(); } }; - threadPool.executor(ThreadPool.Names.FLUSH).execute(abstractRunnable); - return true; + threadPool.executor(ThreadPool.Names.FLUSH).execute(flush); + } else if (shouldRollTranslogGeneration()) { + logger.debug("submitting async roll translog generation request"); + final AbstractRunnable roll = new AbstractRunnable() { + @Override + public void onFailure(final Exception e) { + if (state != IndexShardState.CLOSED) { + logger.warn("failed to roll translog generation", e); + } + } + + @Override + protected void doRun() throws Exception { + rollTranslogGeneration(); + } + + @Override + public void onAfter() { + flushOrRollRunning.compareAndSet(true, false); + afterWriteOperation(); + } + }; + threadPool.executor(ThreadPool.Names.FLUSH).execute(roll); + } else { + flushOrRollRunning.compareAndSet(true, false); } } } - return false; } /** diff --git a/core/src/main/java/org/elasticsearch/index/translog/Translog.java b/core/src/main/java/org/elasticsearch/index/translog/Translog.java index ee4d0a4391a23..d9a8cc408f822 100644 --- a/core/src/main/java/org/elasticsearch/index/translog/Translog.java +++ b/core/src/main/java/org/elasticsearch/index/translog/Translog.java @@ -34,6 +34,7 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.lease.Releasables; import org.elasticsearch.common.lucene.uid.Versions; import org.elasticsearch.common.util.BigArrays; @@ -55,6 +56,7 @@ import java.nio.file.StandardOpenOption; import java.util.ArrayList; import java.util.List; +import java.util.Locale; import java.util.Optional; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; @@ -329,7 +331,7 @@ public Path location() { * Returns the generation of the current transaction log. */ public long currentFileGeneration() { - try (ReleasableLock lock = readLock.acquire()) { + try (ReleasableLock ignored = readLock.acquire()) { return current.getGeneration(); } } @@ -409,10 +411,9 @@ TranslogWriter createWriter(long fileGeneration) throws IOException { public Location add(final Operation operation) throws IOException { final ReleasableBytesStreamOutput out = new ReleasableBytesStreamOutput(bigArrays); try { - final BufferedChecksumStreamOutput checksumStreamOutput = new BufferedChecksumStreamOutput(out); final long start = out.position(); out.skip(Integer.BYTES); - writeOperationNoSize(checksumStreamOutput, operation); + writeOperationNoSize(new BufferedChecksumStreamOutput(out), operation); final long end = out.position(); final int operationSize = (int) (end - Integer.BYTES - start); out.seek(start); @@ -442,6 +443,30 @@ public Location add(final Operation operation) throws IOException { } } + /** + * Tests whether or not the translog should be flushed. This test is based on the current size + * of the translog comparted to the configured flush threshold size. + * + * @return {@code true} if the translog should be flushed + */ + public boolean shouldFlush() { + final long size = this.sizeInBytes(); + return size > this.indexSettings.getFlushThresholdSize().getBytes(); + } + + /** + * Tests whether or not the translog generation should be rolled to a new generation. This test + * is based on the size of the current generation compared to the configured generation + * threshold size. + * + * @return {@code true} if the current generation should be rolled to a new generation + */ + public boolean shouldRollGeneration() { + final long size = this.current.sizeInBytes(); + final long threshold = this.indexSettings.getGenerationThresholdSize().getBytes(); + return size > threshold; + } + /** * The a {@linkplain Location} that will sort after the {@linkplain Location} returned by the last write but before any locations which * can be returned by the next write. @@ -1322,44 +1347,63 @@ public static void writeOperationNoSize(BufferedChecksumStreamOutput out, Transl out.writeInt((int) checksum); } + /** + * Roll the current translog generation into a new generation. This does not commit the + * translog. + * + * @throws IOException if an I/O exception occurred during any file operations + */ + public void rollGeneration() throws IOException { + try (Releasable ignored = writeLock.acquire()) { + try { + final TranslogReader reader = current.closeIntoReader(); + readers.add(reader); + final Path checkpoint = location.resolve(CHECKPOINT_FILE_NAME); + assert Checkpoint.read(checkpoint).generation == current.getGeneration(); + final Path generationCheckpoint = + location.resolve(getCommitCheckpointFileName(current.getGeneration())); + Files.copy(checkpoint, generationCheckpoint); + IOUtils.fsync(generationCheckpoint, false); + IOUtils.fsync(generationCheckpoint.getParent(), true); + // create a new translog file; this will sync it and update the checkpoint data; + current = createWriter(current.getGeneration() + 1); + logger.trace("current translog set to [{}]", current.getGeneration()); + } catch (final Exception e) { + IOUtils.closeWhileHandlingException(this); // tragic event + throw e; + } + } + } + @Override public long prepareCommit() throws IOException { - try (ReleasableLock lock = writeLock.acquire()) { + try (ReleasableLock ignored = writeLock.acquire()) { ensureOpen(); if (currentCommittingGeneration != NOT_SET_GENERATION) { - throw new IllegalStateException("already committing a translog with generation: " + currentCommittingGeneration); + final String message = String.format( + Locale.ROOT, + "already committing a translog with generation [%d]", + currentCommittingGeneration); + throw new IllegalStateException(message); } currentCommittingGeneration = current.getGeneration(); - TranslogReader currentCommittingTranslog = current.closeIntoReader(); - readers.add(currentCommittingTranslog); - Path checkpoint = location.resolve(CHECKPOINT_FILE_NAME); - assert Checkpoint.read(checkpoint).generation == currentCommittingTranslog.getGeneration(); - Path commitCheckpoint = location.resolve(getCommitCheckpointFileName(currentCommittingTranslog.getGeneration())); - Files.copy(checkpoint, commitCheckpoint); - IOUtils.fsync(commitCheckpoint, false); - IOUtils.fsync(commitCheckpoint.getParent(), true); - // create a new translog file - this will sync it and update the checkpoint data; - current = createWriter(current.getGeneration() + 1); - logger.trace("current translog set to [{}]", current.getGeneration()); - - } catch (Exception e) { - IOUtils.closeWhileHandlingException(this); // tragic event - throw e; + rollGeneration(); } - return 0L; + return 0; } @Override public long commit() throws IOException { - try (ReleasableLock lock = writeLock.acquire()) { + try (ReleasableLock ignored = writeLock.acquire()) { ensureOpen(); if (currentCommittingGeneration == NOT_SET_GENERATION) { prepareCommit(); } assert currentCommittingGeneration != NOT_SET_GENERATION; - assert readers.stream().filter(r -> r.getGeneration() == currentCommittingGeneration).findFirst().isPresent() - : "reader list doesn't contain committing generation [" + currentCommittingGeneration + "]"; - lastCommittedTranslogFileGeneration = current.getGeneration(); // this is important - otherwise old files will not be cleaned up + assert readers.stream().anyMatch(r -> r.getGeneration() == currentCommittingGeneration) + : "readers missing committing generation [" + currentCommittingGeneration + "]"; + // set the last committed generation otherwise old files will not be cleaned up + lastCommittedTranslogFileGeneration = currentCommittingGeneration + 1; currentCommittingGeneration = NOT_SET_GENERATION; trimUnreferencedReaders(); } diff --git a/core/src/test/java/org/elasticsearch/index/IndexSettingsTests.java b/core/src/test/java/org/elasticsearch/index/IndexSettingsTests.java index a32d076272ba6..bc3ee4b5f06f1 100644 --- a/core/src/test/java/org/elasticsearch/index/IndexSettingsTests.java +++ b/core/src/test/java/org/elasticsearch/index/IndexSettingsTests.java @@ -370,6 +370,27 @@ public void testTranslogFlushSizeThreshold() { assertEquals(actualNewTranslogFlushThresholdSize, settings.getFlushThresholdSize()); } + public void testTranslogGenerationSizeThreshold() { + final ByteSizeValue size = new ByteSizeValue(Math.abs(randomInt())); + final String key = IndexSettings.INDEX_TRANSLOG_GENERATION_THRESHOLD_SIZE_SETTING.getKey(); + final ByteSizeValue actualValue = + ByteSizeValue.parseBytesSizeValue(size.toString(), key); + final IndexMetaData metaData = + newIndexMeta( + "index", + Settings.builder() + .put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT) + .put(key, size.toString()) + .build()); + final IndexSettings settings = new IndexSettings(metaData, Settings.EMPTY); + assertEquals(actualValue, settings.getGenerationThresholdSize()); + final ByteSizeValue newSize = new ByteSizeValue(Math.abs(randomInt())); + final ByteSizeValue actual = ByteSizeValue.parseBytesSizeValue(newSize.toString(), key); + settings.updateIndexMetaData( + newIndexMeta("index", Settings.builder().put(key, newSize.toString()).build())); + assertEquals(actual, settings.getGenerationThresholdSize()); + } + public void testArchiveBrokenIndexSettings() { Settings settings = IndexScopedSettings.DEFAULT_SCOPED_SETTINGS.archiveUnknownOrInvalidSettings( diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java index 97c96c8af12f7..ff5556089d28d 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java +++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java @@ -363,49 +363,104 @@ public void testMaybeFlush() throws Exception { assertEquals(0, shard.getEngine().getTranslog().totalOperations()); } - public void testStressMaybeFlush() throws Exception { + public void testMaybeRollTranslogGeneration() throws Exception { + final int generationThreshold = randomIntBetween(1, 512); + final Settings settings = + Settings + .builder() + .put("index.number_of_shards", 1) + .put("index.translog.generation_threshold_size", generationThreshold + "b") + .put() + .build(); + createIndex("test", settings); + ensureGreen("test"); + final IndicesService indicesService = getInstanceFromNode(IndicesService.class); + final IndexService test = indicesService.indexService(resolveIndex("test")); + final IndexShard shard = test.getShardOrNull(0); + int rolls = 0; + final Translog translog = shard.getEngine().getTranslog(); + final long generation = translog.currentFileGeneration(); + for (int i = 0; i < randomIntBetween(32, 128); i++) { + assertThat(translog.currentFileGeneration(), equalTo(generation + rolls)); + final ParsedDocument doc = testParsedDocument( + "1", + "test", + null, + SequenceNumbersService.UNASSIGNED_SEQ_NO, + new ParseContext.Document(), + new BytesArray(new byte[]{1}), XContentType.JSON, null); + final Engine.Index index = new Engine.Index(new Term("_uid", doc.uid()), doc); + final Engine.IndexResult result = shard.index(index); + final Translog.Location location = result.getTranslogLocation(); + shard.afterWriteOperation(); + if (location.translogLocation + location.size > generationThreshold) { + // wait until the roll completes + assertBusy(() -> assertFalse(shard.shouldRollTranslogGeneration())); + rolls++; + assertThat(translog.currentFileGeneration(), equalTo(generation + rolls)); + } + } + } + + public void testStressMaybeFlushOrRollTranslogGeneration() throws Exception { createIndex("test"); ensureGreen(); IndicesService indicesService = getInstanceFromNode(IndicesService.class); IndexService test = indicesService.indexService(resolveIndex("test")); final IndexShard shard = test.getShardOrNull(0); assertFalse(shard.shouldFlush()); - client().admin().indices().prepareUpdateSettings("test").setSettings(Settings.builder().put( - IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING.getKey(), - new ByteSizeValue(117/* size of the operation + header&footer*/, ByteSizeUnit.BYTES)).build()).get(); - client().prepareIndex("test", "test", "0").setSource("{}", XContentType.JSON) - .setRefreshPolicy(randomBoolean() ? IMMEDIATE : NONE).get(); + final String key; + final boolean flush = randomBoolean(); + if (flush) { + key = "index.translog.flush_threshold_size"; + } else { + key = "index.translog.generation_threshold_size"; + } + // size of the operation plus header and footer + final Settings settings = Settings.builder().put(key, "117b").build(); + client().admin().indices().prepareUpdateSettings("test").setSettings(settings).get(); + client().prepareIndex("test", "test", "0") + .setSource("{}", XContentType.JSON) + .setRefreshPolicy(randomBoolean() ? IMMEDIATE : NONE) + .get(); assertFalse(shard.shouldFlush()); final AtomicBoolean running = new AtomicBoolean(true); final int numThreads = randomIntBetween(2, 4); - Thread[] threads = new Thread[numThreads]; - CyclicBarrier barrier = new CyclicBarrier(numThreads + 1); + final Thread[] threads = new Thread[numThreads]; + final CyclicBarrier barrier = new CyclicBarrier(numThreads + 1); for (int i = 0; i < threads.length; i++) { - threads[i] = new Thread() { - @Override - public void run() { - try { - barrier.await(); - } catch (InterruptedException | BrokenBarrierException e) { - throw new RuntimeException(e); - } - while (running.get()) { - shard.maybeFlush(); - } + threads[i] = new Thread(() -> { + try { + barrier.await(); + } catch (final InterruptedException | BrokenBarrierException e) { + throw new RuntimeException(e); + } + while (running.get()) { + shard.afterWriteOperation(); } - }; + }); threads[i].start(); } barrier.await(); - FlushStats flushStats = shard.flushStats(); - long total = flushStats.getTotal(); - client().prepareIndex("test", "test", "1").setSource("{}", XContentType.JSON).get(); - assertBusy(() -> assertEquals(total + 1, shard.flushStats().getTotal())); + final Runnable check; + if (flush) { + final FlushStats flushStats = shard.flushStats(); + final long total = flushStats.getTotal(); + client().prepareIndex("test", "test", "1").setSource("{}", XContentType.JSON).get(); + check = () -> assertEquals(total + 1, shard.flushStats().getTotal()); + } else { + final long generation = shard.getEngine().getTranslog().currentFileGeneration(); + client().prepareIndex("test", "test", "1").setSource("{}", XContentType.JSON).get(); + check = () -> assertEquals( + generation + 1, + shard.getEngine().getTranslog().currentFileGeneration()); + } + assertBusy(check); running.set(false); for (int i = 0; i < threads.length; i++) { threads[i].join(); } - assertEquals(total + 1, shard.flushStats().getTotal()); + check.run(); } public void testShardHasMemoryBufferOnTranslogRecover() throws Throwable { diff --git a/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java b/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java index e47a5652b2431..36401deed4bc2 100644 --- a/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java +++ b/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java @@ -41,16 +41,18 @@ import org.elasticsearch.common.io.FileSystemUtils; import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; -import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; +import org.elasticsearch.common.util.concurrent.ReleasableLock; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.VersionType; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.Engine.Operation.Origin; @@ -100,6 +102,7 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; +import static org.elasticsearch.common.util.BigArrays.NON_RECYCLING_INSTANCE; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; @@ -156,12 +159,25 @@ private Translog create(Path path) throws IOException { return new Translog(getTranslogConfig(path), null, () -> globalCheckpoint.get()); } - private TranslogConfig getTranslogConfig(Path path) { - Settings build = Settings.builder() - .put(IndexMetaData.SETTING_VERSION_CREATED, org.elasticsearch.Version.CURRENT) - .build(); - ByteSizeValue bufferSize = randomBoolean() ? TranslogConfig.DEFAULT_BUFFER_SIZE : new ByteSizeValue(10 + randomInt(128 * 1024), ByteSizeUnit.BYTES); - return new TranslogConfig(shardId, path, IndexSettingsModule.newIndexSettings(shardId.getIndex(), build), BigArrays.NON_RECYCLING_INSTANCE, bufferSize); + private TranslogConfig getTranslogConfig(final Path path) { + final Settings settings = Settings + .builder() + .put(IndexMetaData.SETTING_VERSION_CREATED, org.elasticsearch.Version.CURRENT) + .build(); + return getTranslogConfig(path, settings); + } + + private TranslogConfig getTranslogConfig(final Path path, final Settings settings) { + final ByteSizeValue bufferSize; + if (randomBoolean()) { + bufferSize = TranslogConfig.DEFAULT_BUFFER_SIZE; + } else { + bufferSize = new ByteSizeValue(10 + randomInt(128 * 1024), ByteSizeUnit.BYTES); + } + + final IndexSettings indexSettings = + IndexSettingsModule.newIndexSettings(shardId.getIndex(), settings); + return new TranslogConfig(shardId, path, indexSettings, NON_RECYCLING_INSTANCE, bufferSize); } protected void addToTranslogAndList(Translog translog, ArrayList list, Translog.Operation op) throws IOException { @@ -2073,4 +2089,93 @@ public void testTranslogOpSerialization() throws Exception { Translog.Delete serializedDelete = new Translog.Delete(in); assertEquals(delete, serializedDelete); } + + public void testRollGeneration() throws IOException { + final long generation = translog.currentFileGeneration(); + final int rolls = randomIntBetween(1, 16); + int totalOperations = 0; + int seqNo = 0; + for (int i = 0; i < rolls; i++) { + final int operations = randomIntBetween(1, 128); + for (int j = 0; j < operations; j++) { + translog.add(new Translog.NoOp(seqNo++, 0, "test")); + totalOperations++; + } + try (ReleasableLock ignored = translog.writeLock.acquire()) { + translog.rollGeneration(); + } + assertThat(translog.currentFileGeneration(), equalTo(generation + i + 1)); + assertThat(translog.totalOperations(), equalTo(totalOperations)); + } + for (int i = 0; i <= rolls; i++) { + assertFileIsPresent(translog, generation + i); + } + translog.commit(); + assertThat(translog.currentFileGeneration(), equalTo(generation + rolls + 1)); + assertThat(translog.totalOperations(), equalTo(0)); + for (int i = 0; i <= rolls; i++) { + assertFileDeleted(translog, generation + i); + } + assertFileIsPresent(translog, generation + rolls + 1); + } + + public void testRollGenerationBetweenPrepareCommitAndCommit() throws IOException { + final long generation = translog.currentFileGeneration(); + int seqNo = 0; + + final int rollsBefore = randomIntBetween(0, 16); + for (int r = 1; r <= rollsBefore; r++) { + final int operationsBefore = randomIntBetween(1, 256); + for (int i = 0; i < operationsBefore; i++) { + translog.add(new Translog.NoOp(seqNo++, 0, "test")); + } + + try (Releasable ignored = translog.writeLock.acquire()) { + translog.rollGeneration(); + } + + assertThat(translog.currentFileGeneration(), equalTo(generation + r)); + for (int i = 0; i <= r; i++) { + assertFileIsPresent(translog, generation + r); + } + } + + assertThat(translog.currentFileGeneration(), equalTo(generation + rollsBefore)); + translog.prepareCommit(); + assertThat(translog.currentFileGeneration(), equalTo(generation + rollsBefore + 1)); + + for (int i = 0; i <= rollsBefore + 1; i++) { + assertFileIsPresent(translog, generation + i); + } + + final int rollsBetween = randomIntBetween(0, 16); + for (int r = 1; r <= rollsBetween; r++) { + final int operationsBetween = randomIntBetween(1, 256); + for (int i = 0; i < operationsBetween; i++) { + translog.add(new Translog.NoOp(seqNo++, 0, "test")); + } + + try (Releasable ignored = translog.writeLock.acquire()) { + translog.rollGeneration(); + } + + assertThat( + translog.currentFileGeneration(), + equalTo(generation + rollsBefore + 1 + r)); + for (int i = 0; i <= rollsBefore + 1 + r; i++) { + assertFileIsPresent(translog, generation + i); + } + } + + translog.commit(); + + for (int i = 0; i <= rollsBefore; i++) { + assertFileDeleted(translog, generation + i); + } + for (int i = rollsBefore + 1; i <= rollsBefore + 1 + rollsBetween; i++) { + assertFileIsPresent(translog, generation + i); + } + + } + } diff --git a/core/src/test/java/org/elasticsearch/indices/settings/UpdateSettingsIT.java b/core/src/test/java/org/elasticsearch/indices/settings/UpdateSettingsIT.java index ae6b4588271b4..762d409b6b75e 100644 --- a/core/src/test/java/org/elasticsearch/indices/settings/UpdateSettingsIT.java +++ b/core/src/test/java/org/elasticsearch/indices/settings/UpdateSettingsIT.java @@ -93,7 +93,11 @@ public void testResetDefault() { .admin() .indices() .prepareUpdateSettings("test") - .setSettings(Settings.builder().put("index.refresh_interval", -1).put("index.translog.flush_threshold_size", "1024b")) + .setSettings( + Settings.builder() + .put("index.refresh_interval", -1) + .put("index.translog.flush_threshold_size", "1024b") + .put("index.translog.generation_threshold_size", "4096b")) .execute() .actionGet(); IndexMetaData indexMetaData = client().admin().cluster().prepareState().execute().actionGet().getState().metaData().index("test"); @@ -103,6 +107,7 @@ public void testResetDefault() { if (indexService != null) { assertEquals(indexService.getIndexSettings().getRefreshInterval().millis(), -1); assertEquals(indexService.getIndexSettings().getFlushThresholdSize().getBytes(), 1024); + assertEquals(indexService.getIndexSettings().getGenerationThresholdSize().getBytes(), 4096); } } client() @@ -119,6 +124,7 @@ public void testResetDefault() { if (indexService != null) { assertEquals(indexService.getIndexSettings().getRefreshInterval().millis(), 1000); assertEquals(indexService.getIndexSettings().getFlushThresholdSize().getBytes(), 1024); + assertEquals(indexService.getIndexSettings().getGenerationThresholdSize().getBytes(), 4096); } } }