Skip to content

Commit

Permalink
Introduce translog generation rolling
Browse files Browse the repository at this point in the history
This commit introduces a maximum size for a translog generation and
automatically rolls the translog when a generation exceeds the threshold
into a new generation. This threshold is configurable per index and
defaults to sixty-four megabytes. We introduce this constraint as
sequence numbers will require keeping around more than the current
generation (to ensure that we can rollback to the global
checkpoint). Without keeping the size of generations under control,
having to keep old generations around could consume excessive disk
space. A follow-up will enable commits to trim previous generations
based on the global checkpoint.

Relates #23606
  • Loading branch information
jasontedor authored Mar 27, 2017
1 parent defd045 commit b54a9e9
Show file tree
Hide file tree
Showing 9 changed files with 441 additions and 113 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,12 @@
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkShardRequest;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.support.WriteResponse;
import org.elasticsearch.client.transport.NoNodeAvailableException;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Nullable;
Expand All @@ -46,7 +42,6 @@
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportService;
import org.apache.logging.log4j.core.pattern.ConverterKeys;

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -302,15 +297,21 @@ private void maybeFinish() {
}

void run() {
// we either respond immediately ie. if we we don't fsync per request or wait for refresh
// OR we got an pass async operations on and wait for them to return to respond.
indexShard.maybeFlush();
maybeFinish(); // decrement the pendingOpts by one, if there is nothing else to do we just respond with success.
/*
* We either respond immediately (i.e., if we do not fsync per request or wait for
* refresh), or we there are past async operations and we wait for them to return to
* respond.
*/
indexShard.afterWriteOperation();
// decrement pending by one, if there is nothing else to do we just respond with success
maybeFinish();
if (waitUntilRefresh) {
assert pendingOps.get() > 0;
indexShard.addRefreshListener(location, forcedRefresh -> {
if (forcedRefresh) {
logger.warn("block_until_refresh request ran out of slots and forced a refresh: [{}]", request);
logger.warn(
"block until refresh ran out of slots and forced a refresh: [{}]",
request);
}
refreshed.set(forcedRefresh);
maybeFinish();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
EnableAllocationDecider.INDEX_ROUTING_REBALANCE_ENABLE_SETTING,
EnableAllocationDecider.INDEX_ROUTING_ALLOCATION_ENABLE_SETTING,
IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING,
IndexSettings.INDEX_TRANSLOG_GENERATION_THRESHOLD_SIZE_SETTING,
IndexFieldDataService.INDEX_FIELDDATA_CACHE_KEY,
FieldMapper.IGNORE_MALFORMED_SETTING,
FieldMapper.COERCE_SETTING,
Expand Down
33 changes: 32 additions & 1 deletion core/src/main/java/org/elasticsearch/index/IndexSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.apache.lucene.index.MergePolicy;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.logging.DeprecationLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.IndexScopedSettings;
import org.elasticsearch.common.settings.Setting;
Expand Down Expand Up @@ -112,6 +111,16 @@ public final class IndexSettings {
Setting.byteSizeSetting("index.translog.flush_threshold_size", new ByteSizeValue(512, ByteSizeUnit.MB), Property.Dynamic,
Property.IndexScope);

/**
* The maximum size of a translog generation. This is independent of the maximum size of
* translog operations that have not been flushed.
*/
public static final Setting<ByteSizeValue> INDEX_TRANSLOG_GENERATION_THRESHOLD_SIZE_SETTING =
Setting.byteSizeSetting(
"index.translog.generation_threshold_size",
new ByteSizeValue(64, ByteSizeUnit.MB),
new Property[]{Property.Dynamic, Property.IndexScope});

public static final Setting<TimeValue> INDEX_SEQ_NO_CHECKPOINT_SYNC_INTERVAL =
Setting.timeSetting("index.seq_no.checkpoint_sync_interval", new TimeValue(30, TimeUnit.SECONDS),
new TimeValue(-1, TimeUnit.MILLISECONDS), Property.Dynamic, Property.IndexScope);
Expand Down Expand Up @@ -156,6 +165,7 @@ public final class IndexSettings {
private volatile TimeValue refreshInterval;
private final TimeValue globalCheckpointInterval;
private volatile ByteSizeValue flushThresholdSize;
private volatile ByteSizeValue generationThresholdSize;
private final MergeSchedulerConfig mergeSchedulerConfig;
private final MergePolicyConfig mergePolicyConfig;
private final IndexScopedSettings scopedSettings;
Expand Down Expand Up @@ -250,6 +260,7 @@ public IndexSettings(final IndexMetaData indexMetaData, final Settings nodeSetti
refreshInterval = scopedSettings.get(INDEX_REFRESH_INTERVAL_SETTING);
globalCheckpointInterval = scopedSettings.get(INDEX_SEQ_NO_CHECKPOINT_SYNC_INTERVAL);
flushThresholdSize = scopedSettings.get(INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING);
generationThresholdSize = scopedSettings.get(INDEX_TRANSLOG_GENERATION_THRESHOLD_SIZE_SETTING);
mergeSchedulerConfig = new MergeSchedulerConfig(this);
gcDeletesInMillis = scopedSettings.get(INDEX_GC_DELETES_SETTING).getMillis();
warmerEnabled = scopedSettings.get(INDEX_WARMER_ENABLED_SETTING);
Expand Down Expand Up @@ -281,6 +292,9 @@ public IndexSettings(final IndexMetaData indexMetaData, final Settings nodeSetti
scopedSettings.addSettingsUpdateConsumer(INDEX_WARMER_ENABLED_SETTING, this::setEnableWarmer);
scopedSettings.addSettingsUpdateConsumer(INDEX_GC_DELETES_SETTING, this::setGCDeletes);
scopedSettings.addSettingsUpdateConsumer(INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING, this::setTranslogFlushThresholdSize);
scopedSettings.addSettingsUpdateConsumer(
INDEX_TRANSLOG_GENERATION_THRESHOLD_SIZE_SETTING,
this::setGenerationThresholdSize);
scopedSettings.addSettingsUpdateConsumer(INDEX_REFRESH_INTERVAL_SETTING, this::setRefreshInterval);
scopedSettings.addSettingsUpdateConsumer(MAX_REFRESH_LISTENERS_PER_SHARD, this::setMaxRefreshListeners);
scopedSettings.addSettingsUpdateConsumer(MAX_SLICES_PER_SCROLL, this::setMaxSlicesPerScroll);
Expand All @@ -290,6 +304,10 @@ private void setTranslogFlushThresholdSize(ByteSizeValue byteSizeValue) {
this.flushThresholdSize = byteSizeValue;
}

private void setGenerationThresholdSize(final ByteSizeValue generationThresholdSize) {
this.generationThresholdSize = generationThresholdSize;
}

private void setGCDeletes(TimeValue timeValue) {
this.gcDeletesInMillis = timeValue.getMillis();
}
Expand Down Expand Up @@ -461,6 +479,19 @@ public TimeValue getGlobalCheckpointInterval() {
*/
public ByteSizeValue getFlushThresholdSize() { return flushThresholdSize; }

/**
* Returns the generation threshold size. As sequence numbers can cause multiple generations to
* be preserved for rollback purposes, we want to keep the size of individual generations from
* growing too large to avoid excessive disk space consumption. Therefore, the translog is
* automatically rolled to a new generation when the current generation exceeds this generation
* threshold size.
*
* @return the generation threshold size
*/
public ByteSizeValue getGenerationThresholdSize() {
return generationThresholdSize;
}

/**
* Returns the {@link MergeSchedulerConfig}
*/
Expand Down
152 changes: 108 additions & 44 deletions core/src/main/java/org/elasticsearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -771,27 +771,44 @@ public Engine.SyncedFlushResult syncFlush(String syncId, Engine.CommitId expecte
return engine.syncFlush(syncId, expectedCommitId);
}

public Engine.CommitId flush(FlushRequest request) throws ElasticsearchException {
boolean waitIfOngoing = request.waitIfOngoing();
boolean force = request.force();
if (logger.isTraceEnabled()) {
logger.trace("flush with {}", request);
}
// we allows flush while recovering, since we allow for operations to happen
// while recovering, and we want to keep the translog at bay (up to deletes, which
// we don't gc). Yet, we don't use flush internally to clear deletes and flush the indexwriter since
// we use #writeIndexingBuffer for this now.
/**
* Executes the given flush request against the engine.
*
* @param request the flush request
* @return the commit ID
*/
public Engine.CommitId flush(FlushRequest request) {
final boolean waitIfOngoing = request.waitIfOngoing();
final boolean force = request.force();
logger.trace("flush with {}", request);
/*
* We allow flushes while recovery since we allow operations to happen while recovering and
* we want to keep the translog under control (up to deletes, which we do not GC). Yet, we
* do not use flush internally to clear deletes and flush the index writer since we use
* Engine#writeIndexingBuffer for this now.
*/
verifyNotClosed();
Engine engine = getEngine();
final Engine engine = getEngine();
if (engine.isRecovering()) {
throw new IllegalIndexShardStateException(shardId(), state, "flush is only allowed if the engine is not recovery" +
" from translog");
throw new IllegalIndexShardStateException(
shardId(),
state,
"flush is only allowed if the engine is not recovery from translog");
}
long time = System.nanoTime();
Engine.CommitId commitId = engine.flush(force, waitIfOngoing);
final long time = System.nanoTime();
final Engine.CommitId commitId = engine.flush(force, waitIfOngoing);
flushMetric.inc(System.nanoTime() - time);
return commitId;
}

/**
* Rolls the tranlog generation.
*
* @throws IOException if any file operations on the translog throw an I/O exception
*/
private void rollTranslogGeneration() throws IOException {
final Engine engine = getEngine();
engine.getTranslog().rollGeneration();
}

public void forceMerge(ForceMergeRequest forceMerge) throws IOException {
Expand Down Expand Up @@ -1256,17 +1273,39 @@ public boolean restoreFromRepository(Repository repository) {
}

/**
* Returns <code>true</code> iff this shard needs to be flushed due to too many translog operation or a too large transaction log.
* Otherwise <code>false</code>.
* Tests whether or not the translog should be flushed. This test is based on the current size
* of the translog comparted to the configured flush threshold size.
*
* @return {@code true} if the translog should be flushed
*/
boolean shouldFlush() {
Engine engine = getEngineOrNull();
final Engine engine = getEngineOrNull();
if (engine != null) {
try {
Translog translog = engine.getTranslog();
return translog.sizeInBytes() > indexSettings.getFlushThresholdSize().getBytes();
} catch (AlreadyClosedException ex) {
// that's fine we are already close - no need to flush
final Translog translog = engine.getTranslog();
return translog.shouldFlush();
} catch (final AlreadyClosedException e) {
// we are already closed, no need to flush or roll
}
}
return false;
}

/**
* Tests whether or not the translog generation should be rolled to a new generation. This test
* is based on the size of the current generation compared to the configured generation
* threshold size.
*
* @return {@code true} if the current generation should be rolled to a new generation
*/
boolean shouldRollTranslogGeneration() {
final Engine engine = getEngineOrNull();
if (engine != null) {
try {
final Translog translog = engine.getTranslog();
return translog.shouldRollGeneration();
} catch (final AlreadyClosedException e) {
// we are already closed, no need to flush or roll
}
}
return false;
Expand Down Expand Up @@ -1810,28 +1849,31 @@ public Translog.Durability getTranslogDurability() {
return indexSettings.getTranslogDurability();
}

private final AtomicBoolean asyncFlushRunning = new AtomicBoolean();
// we can not protect with a lock since we "release" on a different thread
private final AtomicBoolean flushOrRollRunning = new AtomicBoolean();

/**
* Schedules a flush if needed but won't schedule more than one flush concurrently. The flush will be executed on the
* Flush thread-pool asynchronously.
*
* @return <code>true</code> if a new flush is scheduled otherwise <code>false</code>.
* Schedules a flush or translog generation roll if needed but will not schedule more than one
* concurrently. The operation will be executed asynchronously on the flush thread pool.
*/
public boolean maybeFlush() {
if (shouldFlush()) {
if (asyncFlushRunning.compareAndSet(false, true)) { // we can't use a lock here since we "release" in a different thread
if (shouldFlush() == false) {
// we have to check again since otherwise there is a race when a thread passes
// the first shouldFlush() check next to another thread which flushes fast enough
// to finish before the current thread could flip the asyncFlushRunning flag.
// in that situation we have an extra unexpected flush.
asyncFlushRunning.compareAndSet(true, false);
} else {
public void afterWriteOperation() {
if (shouldFlush() || shouldRollTranslogGeneration()) {
if (flushOrRollRunning.compareAndSet(false, true)) {
/*
* We have to check again since otherwise there is a race when a thread passes the
* first check next to another thread which performs the operation quickly enough to
* finish before the current thread could flip the flag. In that situation, we have
* an extra operation.
*
* Additionally, a flush implicitly executes a translog generation roll so if we
* execute a flush then we do not need to check if we should roll the translog
* generation.
*/
if (shouldFlush()) {
logger.debug("submitting async flush request");
final AbstractRunnable abstractRunnable = new AbstractRunnable() {
final AbstractRunnable flush = new AbstractRunnable() {
@Override
public void onFailure(Exception e) {
public void onFailure(final Exception e) {
if (state != IndexShardState.CLOSED) {
logger.warn("failed to flush index", e);
}
Expand All @@ -1844,16 +1886,38 @@ protected void doRun() throws Exception {

@Override
public void onAfter() {
asyncFlushRunning.compareAndSet(true, false);
maybeFlush(); // fire a flush up again if we have filled up the limits such that shouldFlush() returns true
flushOrRollRunning.compareAndSet(true, false);
afterWriteOperation();
}
};
threadPool.executor(ThreadPool.Names.FLUSH).execute(abstractRunnable);
return true;
threadPool.executor(ThreadPool.Names.FLUSH).execute(flush);
} else if (shouldRollTranslogGeneration()) {
logger.debug("submitting async roll translog generation request");
final AbstractRunnable roll = new AbstractRunnable() {
@Override
public void onFailure(final Exception e) {
if (state != IndexShardState.CLOSED) {
logger.warn("failed to roll translog generation", e);
}
}

@Override
protected void doRun() throws Exception {
rollTranslogGeneration();
}

@Override
public void onAfter() {
flushOrRollRunning.compareAndSet(true, false);
afterWriteOperation();
}
};
threadPool.executor(ThreadPool.Names.FLUSH).execute(roll);
} else {
flushOrRollRunning.compareAndSet(true, false);
}
}
}
return false;
}

/**
Expand Down
Loading

0 comments on commit b54a9e9

Please sign in to comment.