Introduce translog generation rolling #23606

Merged
merged 29 commits into from
Mar 27, 2017
Changes from 1 commit
cabd994
Introduce translog generation folding
jasontedor Mar 14, 2017
152cae7
Fix ordering of checking/locks
jasontedor Mar 16, 2017
91d5f85
Remove unnecessary import
jasontedor Mar 16, 2017
c88e788
Change method to be instance method
jasontedor Mar 16, 2017
7e69ae8
Remove generation parameter from fold
jasontedor Mar 16, 2017
fb00dd3
Remove obsolete catch block
jasontedor Mar 16, 2017
0d5e6e2
Move field
jasontedor Mar 16, 2017
bbab126
Rename folding generation to rolling generation
jasontedor Mar 17, 2017
421b336
Merge branch 'master' into translog-generation
jasontedor Mar 17, 2017
8f6b609
Stricter roll checking
jasontedor Mar 17, 2017
70ade35
Merge branch 'master' into translog-generation
jasontedor Mar 17, 2017
d9fbc42
Add test for rolling and committing
jasontedor Mar 17, 2017
9076d79
Simplify tests
jasontedor Mar 17, 2017
b79053e
Fix typo
jasontedor Mar 18, 2017
b4ed67d
Merge branch 'master' into translog-generation
jasontedor Mar 20, 2017
1669224
Stronger test for precommit/roll/commit sequences
jasontedor Mar 20, 2017
35d0119
Add Javadocs to IndexSettings#getGenerationThresholdSize
jasontedor Mar 20, 2017
32afd9e
Add assert message
jasontedor Mar 20, 2017
518f12e
Merge branch 'master' into translog-generation
jasontedor Mar 22, 2017
7480faf
Merge branch 'master' into translog-generation
jasontedor Mar 22, 2017
dd14ba8
Migrate translog generation rolling to index shard
jasontedor Mar 22, 2017
f59233c
Add Javadocs and tidy up
jasontedor Mar 22, 2017
3c78802
Merge branch 'master' into translog-generation
jasontedor Mar 22, 2017
192a7ce
Funny business
jasontedor Mar 23, 2017
6c2adb6
Revert "Funny business"
jasontedor Mar 23, 2017
1a31061
Committed generation
jasontedor Mar 23, 2017
07fc092
Remove unneeded field
jasontedor Mar 23, 2017
2e7f722
Fix thread pool name
jasontedor Mar 27, 2017
a14937d
Change method name
jasontedor Mar 27, 2017
@@ -125,6 +125,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
EnableAllocationDecider.INDEX_ROUTING_REBALANCE_ENABLE_SETTING,
EnableAllocationDecider.INDEX_ROUTING_ALLOCATION_ENABLE_SETTING,
IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING,
IndexSettings.INDEX_TRANSLOG_GENERATION_THRESHOLD_SIZE_SETTING,
IndexFieldDataService.INDEX_FIELDDATA_CACHE_KEY,
FieldMapper.IGNORE_MALFORMED_SETTING,
FieldMapper.COERCE_SETTING,
24 changes: 23 additions & 1 deletion core/src/main/java/org/elasticsearch/index/IndexSettings.java
@@ -22,7 +22,6 @@
import org.apache.lucene.index.MergePolicy;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.logging.DeprecationLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.IndexScopedSettings;
import org.elasticsearch.common.settings.Setting;
@@ -112,6 +111,16 @@ public final class IndexSettings {
Setting.byteSizeSetting("index.translog.flush_threshold_size", new ByteSizeValue(512, ByteSizeUnit.MB), Property.Dynamic,
Property.IndexScope);

/**
* The maximum size of a translog generation. This is independent of the maximum size of
* translog operations that have not been flushed.
*/
public static final Setting<ByteSizeValue> INDEX_TRANSLOG_GENERATION_THRESHOLD_SIZE_SETTING =
Setting.byteSizeSetting(
"index.translog.generation_threshold_size",
Contributor

nit: wondering if this should just be index.translog.generation_size

Member Author

I picked index.translog.generation_threshold_size to be consistent with index.translog.flush_threshold_size. Do you still wonder if it should be changed?

Contributor

We can keep as is then. I don't mind much.

Contributor

Can you maybe document what happens if that size is exceeded, or add a link to the explanation?

Contributor

I wanna understand why this is 64MB. Why can't we use INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING or its default rather than an arbitrary value? I would love to get some insight that we can also document. Then there is also the question of why we can't simply flush this from the outside via IndexShard#maybeFlush; that would potentially simplify the translog. One property that I liked about the Translog#add() operation was that it would never acquire a write lock. We are now entering potentially dangerous territory here; locking can be a beast.

Contributor

> I wanna understand why this is 64MB

Jason can say why he chose 64MB. I think any value we choose now will be arbitrary, so I was good with it for now; we can tweak it later if needed.

> why can't we use INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING or its default rather than an arbitrary value?

We want a number that's much smaller than the flush size. We're heading towards keeping the generations that are needed to recover all seq# ops after a certain point. That means that we won't always clean the previous generation when flushing. If we use the flush size for generations, we can end up in a poisonous situation where we repeatedly try to flush but the translog is not trimmed.

> why we can't simply flush this from the outside via IndexShard#maybeFlush

Do you mean doing both flushing and potentially opening a new generation from the same method that is called after indexing? i.e., call it maybeFlushAndRoll ( :) ) ? I would be good with that (though prefer the current approach).

Contributor

> Do you mean doing both flushing and potentially opening a new generation from the same method that is called after indexing? i.e., call it maybeFlushAndRoll ( :) ) ? I would be good with that (though prefer the current approach).

I was looking into having a Translog#rollover() method that we can call from maybeFlush; we can rename maybeFlush to onAfterOperation so that it does not necessarily expose implementation details. I am a bit concerned about the write lock, which might not be a problem today but may be tomorrow.

> We want a number that's much smaller than the flush size. We're heading towards keeping the generations that are needed to recover all seq# ops after a certain point.

Makes sense. Let's document this; it's not clear from reviewing the PR.

Member Author

I pushed dd14ba8 to move the control of rolling to index shard. Would @nik9000, @bleskes, and @s1monw please take another look?
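
To make the reasoning in this thread concrete, here is a toy model (not Elasticsearch code) of why a generation threshold well below the flush threshold helps once generations are retained by sequence number. It assumes fixed-size operations, a roll check after each append, and that a closed generation can be deleted only if every sequence number in it is below a hypothetical `minRequiredSeqNo`. The class and method names are invented for illustration.

```java
final class GenerationRetentionModel {

    /**
     * Returns how many bytes stay on disk after trimming, in a model where a closed
     * generation is kept whenever it contains a sequence number at or above
     * minRequiredSeqNo (a hypothetical retention point).
     */
    static long retainedBytes(final int ops, final int opSizeBytes,
                              final long generationThresholdBytes, final long minRequiredSeqNo) {
        long retained = 0;
        long generationSize = 0;
        for (long seqNo = 0; seqNo < ops; seqNo++) {
            generationSize += opSizeBytes;
            if (generationSize > generationThresholdBytes) {
                // the generation closes here; seqNo is the highest sequence number in it
                if (seqNo >= minRequiredSeqNo) {
                    retained += generationSize;
                }
                generationSize = 0;
            }
        }
        // the current (still open) generation is never trimmed
        return retained + generationSize;
    }

    public static void main(final String[] args) {
        // 100 operations of 10 bytes each; operations from sequence number 90 on must be kept
        System.out.println(retainedBytes(100, 10, 50, 90));     // small generations
        System.out.println(retainedBytes(100, 10, 10_000, 90)); // one huge generation
    }
}
```

In this model the small threshold leaves 100 bytes on disk, while the single large generation leaves all 1000 bytes: the "flush but nothing is trimmed" situation described above.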

new ByteSizeValue(64, ByteSizeUnit.MB),
new Property[]{Property.Dynamic, Property.IndexScope});

public static final Setting<TimeValue> INDEX_SEQ_NO_CHECKPOINT_SYNC_INTERVAL =
Setting.timeSetting("index.seq_no.checkpoint_sync_interval", new TimeValue(30, TimeUnit.SECONDS),
new TimeValue(-1, TimeUnit.MILLISECONDS), Property.Dynamic, Property.IndexScope);
@@ -156,6 +165,7 @@ public final class IndexSettings {
private volatile TimeValue refreshInterval;
private final TimeValue globalCheckpointInterval;
private volatile ByteSizeValue flushThresholdSize;
private volatile ByteSizeValue generationThresholdSize;
private final MergeSchedulerConfig mergeSchedulerConfig;
private final MergePolicyConfig mergePolicyConfig;
private final IndexScopedSettings scopedSettings;
@@ -250,6 +260,7 @@ public IndexSettings(final IndexMetaData indexMetaData, final Settings nodeSetti
refreshInterval = scopedSettings.get(INDEX_REFRESH_INTERVAL_SETTING);
globalCheckpointInterval = scopedSettings.get(INDEX_SEQ_NO_CHECKPOINT_SYNC_INTERVAL);
flushThresholdSize = scopedSettings.get(INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING);
generationThresholdSize = scopedSettings.get(INDEX_TRANSLOG_GENERATION_THRESHOLD_SIZE_SETTING);
mergeSchedulerConfig = new MergeSchedulerConfig(this);
gcDeletesInMillis = scopedSettings.get(INDEX_GC_DELETES_SETTING).getMillis();
warmerEnabled = scopedSettings.get(INDEX_WARMER_ENABLED_SETTING);
@@ -281,6 +292,9 @@ public IndexSettings(final IndexMetaData indexMetaData, final Settings nodeSetti
scopedSettings.addSettingsUpdateConsumer(INDEX_WARMER_ENABLED_SETTING, this::setEnableWarmer);
scopedSettings.addSettingsUpdateConsumer(INDEX_GC_DELETES_SETTING, this::setGCDeletes);
scopedSettings.addSettingsUpdateConsumer(INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING, this::setTranslogFlushThresholdSize);
scopedSettings.addSettingsUpdateConsumer(
INDEX_TRANSLOG_GENERATION_THRESHOLD_SIZE_SETTING,
this::setGenerationThresholdSize);
scopedSettings.addSettingsUpdateConsumer(INDEX_REFRESH_INTERVAL_SETTING, this::setRefreshInterval);
scopedSettings.addSettingsUpdateConsumer(MAX_REFRESH_LISTENERS_PER_SHARD, this::setMaxRefreshListeners);
scopedSettings.addSettingsUpdateConsumer(MAX_SLICES_PER_SCROLL, this::setMaxSlicesPerScroll);
@@ -290,6 +304,10 @@ private void setTranslogFlushThresholdSize(ByteSizeValue byteSizeValue) {
this.flushThresholdSize = byteSizeValue;
}

private void setGenerationThresholdSize(final ByteSizeValue generationThresholdSize) {
this.generationThresholdSize = generationThresholdSize;
}

private void setGCDeletes(TimeValue timeValue) {
this.gcDeletesInMillis = timeValue.getMillis();
}
@@ -461,6 +479,10 @@ public TimeValue getGlobalCheckpointInterval() {
*/
public ByteSizeValue getFlushThresholdSize() { return flushThresholdSize; }

public ByteSizeValue getGenerationThresholdSize() {
Contributor

javadocs please

Member Author

I pushed 35d0119.

return generationThresholdSize;
}

/**
* Returns the {@link MergeSchedulerConfig}
*/
86 changes: 66 additions & 20 deletions core/src/main/java/org/elasticsearch/index/translog/Translog.java
@@ -329,7 +329,7 @@ public Path location() {
* Returns the generation of the current transaction log.
*/
public long currentFileGeneration() {
try (ReleasableLock lock = readLock.acquire()) {
try (ReleasableLock ignored = readLock.acquire()) {
return current.getGeneration();
}
}
@@ -399,6 +399,8 @@ TranslogWriter createWriter(long fileGeneration) throws IOException {
return newFile;
}

final AtomicBoolean foldingGeneration = new AtomicBoolean();
Member

Putting this down here rather than up with the rest of the members will make it much harder to find. Even if this is the one place it is used.

Member Author

I pushed 0d5e6e2.


/**
* Adds an operation to the transaction log.
*
@@ -409,20 +411,31 @@ TranslogWriter createWriter(long fileGeneration) throws IOException {
public Location add(final Operation operation) throws IOException {
final ReleasableBytesStreamOutput out = new ReleasableBytesStreamOutput(bigArrays);
try {
final BufferedChecksumStreamOutput checksumStreamOutput = new BufferedChecksumStreamOutput(out);
final long start = out.position();
out.skip(Integer.BYTES);
writeOperationNoSize(checksumStreamOutput, operation);
writeOperationNoSize(new BufferedChecksumStreamOutput(out), operation);
final long end = out.position();
final int operationSize = (int) (end - Integer.BYTES - start);
out.seek(start);
out.writeInt(operationSize);
out.seek(end);
final ReleasablePagedBytesReference bytes = out.bytes();
final Location location;
try (ReleasableLock ignored = readLock.acquire()) {
ensureOpen();
return current.add(bytes, operation.seqNo());
location = current.add(bytes, operation.seqNo());
}
try (ReleasableLock ignored = writeLock.acquire()) {
if (shouldFoldGeneration(this) && foldingGeneration.compareAndSet(false, true)) {
// we have to check the condition again lest we could fold twice in a race
Member

If you already have the writeLock can this race?

Member Author

My fingers did not do what they were supposed to do here. 😉 I pushed 152cae7.

if (shouldFoldGeneration(this)) {
this.foldGeneration(current.getGeneration());
}
final boolean wasFoldingGeneration = foldingGeneration.getAndSet(false);
Contributor

I would feel better if this was in a finally clause (although if we have any exception here, we will fail the translog anyway). Thinking about this more - shall we move this logic into its own "foldIfNeeded" method?

Member Author

I pushed 8f6b609.
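
For readers following this thread, the shape being suggested (a dedicated roll-if-needed method that clears its flag in a `finally` block and re-checks the condition under the write lock) can be sketched outside Elasticsearch as follows. This is a simplified stand-in, not the code from 8f6b609: `RollingLog`, `rollIfNeeded`, and the byte counter are hypothetical names, and the real translog does file I/O where this sketch only bumps a counter.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;

final class RollingLog {
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private final AtomicBoolean rolling = new AtomicBoolean();
    private final AtomicLong currentSizeBytes = new AtomicLong();
    private final long thresholdBytes;
    private long generation = 1; // guarded by lock

    RollingLog(final long thresholdBytes) {
        this.thresholdBytes = thresholdBytes;
    }

    /** Appends under the read lock, then rolls if the generation grew too large. */
    void add(final int opSizeBytes) {
        lock.readLock().lock();
        try {
            currentSizeBytes.addAndGet(opSizeBytes);
        } finally {
            lock.readLock().unlock();
        }
        rollIfNeeded();
    }

    long currentGeneration() {
        lock.readLock().lock();
        try {
            return generation;
        } finally {
            lock.readLock().unlock();
        }
    }

    private boolean shouldRoll() {
        return currentSizeBytes.get() > thresholdBytes;
    }

    private void rollIfNeeded() {
        // cheap check without the write lock; the flag lets only one thread attempt the roll
        if (shouldRoll() && rolling.compareAndSet(false, true)) {
            try {
                lock.writeLock().lock();
                try {
                    // re-check under the write lock, since in a real log another path
                    // (such as a commit) might have rolled in the meantime
                    if (shouldRoll()) {
                        generation++;
                        currentSizeBytes.set(0);
                    }
                } finally {
                    lock.writeLock().unlock();
                }
            } finally {
                // always clear the flag, even if rolling throws
                rolling.set(false);
            }
        }
    }
}
```

Appending stays on the read lock in this sketch; only the thread that wins the `compareAndSet` ever waits for the write lock.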

assert wasFoldingGeneration;
}
}
return location;
} catch (final AlreadyClosedException | IOException ex) {
try {
closeOnTragicEvent(ex);
@@ -442,6 +455,20 @@ public Location add(final Operation operation) throws IOException {
}
}
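
The size handling in `add` above reserves four bytes, serializes the operation, and then seeks back to fill in the size once it is known. A minimal stand-alone sketch of that pattern, using `java.nio.ByteBuffer` in place of Elasticsearch's stream classes (the class name, the `frame` method, and the fixed capacity are assumptions made for illustration, and the checksum is left out):

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.function.Consumer;

final class SizePrefixedFrame {

    /** Frames whatever {@code writer} emits as [int size][payload], writing the size last. */
    static ByteBuffer frame(final int capacity, final Consumer<ByteBuffer> writer) {
        final ByteBuffer out = ByteBuffer.allocate(capacity);
        final int start = out.position();
        // skip the header: the payload size is not known yet
        out.position(start + Integer.BYTES);
        writer.accept(out);
        final int end = out.position();
        final int size = end - Integer.BYTES - start;
        // the absolute put back-fills the header and leaves the position at the end of the payload
        out.putInt(start, size);
        out.flip();
        return out;
    }

    public static void main(final String[] args) {
        final byte[] payload = "noop".getBytes(StandardCharsets.UTF_8);
        final ByteBuffer framed = frame(64, buffer -> buffer.put(payload));
        System.out.println(framed.getInt(0) + " payload bytes, " + framed.remaining() + " in total");
    }
}
```

Writing the size last avoids a separate pass to measure the serialized operation before copying it.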

/**
* Tests whether or not the current generation of the translog should be folded into a new
* generation. This test is based on the size of the current generation compared to the
* configured generation threshold size.
*
* @param translog the translog
* @return {@code true} if the current generation should be folded into a new generation
*/
private static boolean shouldFoldGeneration(final Translog translog) {
Member

I wouldn't have declared this as static. I'm really curious why you did. I expect I have something to learn from this....

Member Author

I pushed c88e788.

final long size = translog.current.sizeInBytes();
final long threshold = translog.indexSettings.getGenerationThresholdSize().getBytes();
return size > threshold;
}

/**
* The a {@linkplain Location} that will sort after the {@linkplain Location} returned by the last write but before any locations which
* can be returned by the next write.
@@ -1322,27 +1349,46 @@ public static void writeOperationNoSize(BufferedChecksumStreamOutput out, Transl
out.writeInt((int) checksum);
}

/**
* Fold the current translog generation into a new generation. This does not commit the
* translog. The translog write lock must be held by the current thread.
*
* @param generation the current translog generation
* @throws IOException if an I/O exception occurred during any file operations
*/
void foldGeneration(final long generation) throws IOException {
Member

s/generation/currentGeneration/?

Member Author

I pushed 7e69ae8.

assert writeLock.isHeldByCurrentThread();
Contributor

maybe allow this assert to give us a message?

Member Author

I pushed 32afd9e.

try {
final TranslogReader reader = current.closeIntoReader();
readers.add(reader);
final Path checkpoint = location.resolve(CHECKPOINT_FILE_NAME);
assert Checkpoint.read(checkpoint).generation == generation;
final Path generationCheckpoint =
location.resolve(getCommitCheckpointFileName(generation));
Files.copy(checkpoint, generationCheckpoint);
IOUtils.fsync(generationCheckpoint, false);
IOUtils.fsync(generationCheckpoint.getParent(), true);
// create a new translog file; this will sync it and update the checkpoint data;
current = createWriter(generation + 1);
logger.trace("current translog set to [{}]", current.getGeneration());
} catch (final Exception e) {
IOUtils.closeWhileHandlingException(this); // tragic event
throw e;
}
}
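
The durability steps in `foldGeneration` above (copy the live checkpoint to a per-generation checkpoint, fsync the copy, then fsync the parent directory) can be tried in isolation with plain `java.nio`. The sketch below is an approximation under stated assumptions: the file names are placeholders chosen for illustration, `CheckpointSnapshot` is a hypothetical class, and the `fsync` helper is a simplified stand-in for the `IOUtils.fsync` call used in the diff.

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

final class CheckpointSnapshot {

    /** Copies the live checkpoint to a per-generation file and makes the copy durable. */
    static Path snapshot(final Path dir, final long generation) throws IOException {
        final Path checkpoint = dir.resolve("translog.ckp"); // assumed file name
        final Path generationCheckpoint =
            dir.resolve("translog-" + generation + ".ckp"); // assumed naming scheme
        Files.copy(checkpoint, generationCheckpoint); // fails if the target already exists
        fsync(generationCheckpoint, false);
        fsync(dir, true); // makes the new directory entry durable
        return generationCheckpoint;
    }

    static void fsync(final Path path, final boolean isDirectory) throws IOException {
        final StandardOpenOption option =
            isDirectory ? StandardOpenOption.READ : StandardOpenOption.WRITE;
        try (FileChannel channel = FileChannel.open(path, option)) {
            channel.force(true);
        } catch (final IOException e) {
            if (isDirectory == false) {
                throw e;
            }
            // some platforms cannot open or sync a directory; treat that as best effort
        }
    }
}
```

Syncing the directory after the file matters because the copy's contents can be durable while its directory entry is not.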

@Override
public long prepareCommit() throws IOException {
try (ReleasableLock lock = writeLock.acquire()) {
try (ReleasableLock ignored = writeLock.acquire()) {
ensureOpen();
if (currentCommittingGeneration != NOT_SET_GENERATION) {
throw new IllegalStateException("already committing a translog with generation: " + currentCommittingGeneration);
throw new IllegalStateException("already committing a translog with generation: " +
currentCommittingGeneration);
}
currentCommittingGeneration = current.getGeneration();
TranslogReader currentCommittingTranslog = current.closeIntoReader();
readers.add(currentCommittingTranslog);
Path checkpoint = location.resolve(CHECKPOINT_FILE_NAME);
assert Checkpoint.read(checkpoint).generation == currentCommittingTranslog.getGeneration();
Path commitCheckpoint = location.resolve(getCommitCheckpointFileName(currentCommittingTranslog.getGeneration()));
Files.copy(checkpoint, commitCheckpoint);
IOUtils.fsync(commitCheckpoint, false);
IOUtils.fsync(commitCheckpoint.getParent(), true);
// create a new translog file - this will sync it and update the checkpoint data;
current = createWriter(current.getGeneration() + 1);
logger.trace("current translog set to [{}]", current.getGeneration());

} catch (Exception e) {
final long generation = current.getGeneration();
currentCommittingGeneration = generation;
foldGeneration(generation);
} catch (final Exception e) {
Member

Is this catch block still needed? foldGeneration has one just like it. Probably worth adding a comment if it is.

Member Author

I pushed fb00dd3.

IOUtils.closeWhileHandlingException(this); // tragic event
throw e;
}
21 changes: 21 additions & 0 deletions core/src/test/java/org/elasticsearch/index/IndexSettingsTests.java
@@ -370,6 +370,27 @@ public void testTranslogFlushSizeThreshold() {
assertEquals(actualNewTranslogFlushThresholdSize, settings.getFlushThresholdSize());
}

public void testTranslogGenerationSizeThreshold() {
final ByteSizeValue size = new ByteSizeValue(Math.abs(randomInt()));
Contributor

should we protect against 0? Thinking about it more, we should at least fit one op in there. So I wonder if we should have a decent lower bound. The problem in making it too big is testing - how about making it 1kb?

Member Author

Is it necessary? Even at zero one operation can squeeze into the generation before rolling (we do not check until after an operation is added to the translog). I like it small for testing, it's easier to think about. If you feel strongly that we should lower bound it, I'm fine.

Contributor

I'm on the fence. Ideally we should protect from abusing a production system. Let's keep things as they are and not over engineer this. Thanks for pointing out that a single op will always go in there.
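
The point made in this thread, that one operation always fits because the threshold is only checked after the append, can be illustrated with a small model. It is a sketch under simplifying assumptions: generations carry no header bytes, `CheckAfterAdd` and `opsPerGeneration` are hypothetical names, and sizes are plain integers rather than `ByteSizeValue`.

```java
import java.util.ArrayList;
import java.util.List;

final class CheckAfterAdd {

    /** Returns the number of operations that landed in each generation, in order. */
    static List<Integer> opsPerGeneration(final long thresholdBytes, final int[] opSizes) {
        final List<Integer> counts = new ArrayList<>();
        long size = 0;
        int ops = 0;
        for (final int opSize : opSizes) {
            // the operation is always appended first
            size += opSize;
            ops++;
            // the threshold is only consulted afterwards
            if (size > thresholdBytes) {
                counts.add(ops);
                size = 0;
                ops = 0;
            }
        }
        if (ops > 0) {
            counts.add(ops); // the partially filled current generation
        }
        return counts;
    }

    public static void main(final String[] args) {
        System.out.println(opsPerGeneration(0, new int[] {43, 43, 43}));
        System.out.println(opsPerGeneration(100, new int[] {43, 43, 43, 43}));
    }
}
```

With a threshold of zero every generation holds exactly one operation, so the setting degrades gracefully rather than rejecting writes.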

final String key = IndexSettings.INDEX_TRANSLOG_GENERATION_THRESHOLD_SIZE_SETTING.getKey();
final ByteSizeValue actualValue =
ByteSizeValue.parseBytesSizeValue(size.toString(), key);
final IndexMetaData metaData =
newIndexMeta(
"index",
Settings.builder()
.put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
.put(key, size.toString())
.build());
final IndexSettings settings = new IndexSettings(metaData, Settings.EMPTY);
assertEquals(actualValue, settings.getGenerationThresholdSize());
final ByteSizeValue newSize = new ByteSizeValue(Math.abs(randomInt()));
final ByteSizeValue actual = ByteSizeValue.parseBytesSizeValue(newSize.toString(), key);
settings.updateIndexMetaData(
newIndexMeta("index", Settings.builder().put(key, newSize.toString()).build()));
assertEquals(actual, settings.getGenerationThresholdSize());
}

public void testArchiveBrokenIndexSettings() {
Settings settings =
IndexScopedSettings.DEFAULT_SCOPED_SETTINGS.archiveUnknownOrInvalidSettings(
@@ -19,6 +19,7 @@

package org.elasticsearch.index.translog;

import com.carrotsearch.randomizedtesting.annotations.Repeat;
import com.carrotsearch.randomizedtesting.generators.RandomPicks;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier;
@@ -44,13 +45,14 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.ReleasableLock;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.Engine.Operation.Origin;
@@ -100,6 +102,7 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;

import static org.elasticsearch.common.util.BigArrays.NON_RECYCLING_INSTANCE;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
@@ -156,12 +159,25 @@ private Translog create(Path path) throws IOException {
return new Translog(getTranslogConfig(path), null, () -> globalCheckpoint.get());
}

private TranslogConfig getTranslogConfig(Path path) {
Settings build = Settings.builder()
.put(IndexMetaData.SETTING_VERSION_CREATED, org.elasticsearch.Version.CURRENT)
.build();
ByteSizeValue bufferSize = randomBoolean() ? TranslogConfig.DEFAULT_BUFFER_SIZE : new ByteSizeValue(10 + randomInt(128 * 1024), ByteSizeUnit.BYTES);
return new TranslogConfig(shardId, path, IndexSettingsModule.newIndexSettings(shardId.getIndex(), build), BigArrays.NON_RECYCLING_INSTANCE, bufferSize);
private TranslogConfig getTranslogConfig(final Path path) {
final Settings settings = Settings
.builder()
.put(IndexMetaData.SETTING_VERSION_CREATED, org.elasticsearch.Version.CURRENT)
.build();
return getTranslogConfig(path, settings);
}

private TranslogConfig getTranslogConfig(final Path path, final Settings settings) {
final ByteSizeValue bufferSize;
if (randomBoolean()) {
bufferSize = TranslogConfig.DEFAULT_BUFFER_SIZE;
} else {
bufferSize = new ByteSizeValue(10 + randomInt(128 * 1024), ByteSizeUnit.BYTES);
Member

This value interacts with the test for generation folding, right? Would making this too low break other tests? Can you leave a comment about the size that you set here?

Member Author

This is only a refactoring of some existing logic.

}

final IndexSettings indexSettings =
IndexSettingsModule.newIndexSettings(shardId.getIndex(), settings);
return new TranslogConfig(shardId, path, indexSettings, NON_RECYCLING_INSTANCE, bufferSize);
}

protected void addToTranslogAndList(Translog translog, ArrayList<Translog.Operation> list, Translog.Operation op) throws IOException {
@@ -2073,4 +2089,65 @@ public void testTranslogOpSerialization() throws Exception {
Translog.Delete serializedDelete = new Translog.Delete(in);
assertEquals(delete, serializedDelete);
}

public void testFoldGeneration() throws IOException {
final long generation = translog.currentFileGeneration();
final int folds = randomIntBetween(1, 16);
int totalOperations = 0;
int seqNo = 0;
for (int i = 0; i < folds; i++) {
final int operations = randomIntBetween(1, 128);
for (int j = 0; j < operations; j++) {
translog.add(new Translog.NoOp(seqNo++, 0, "test"));
totalOperations++;
}
try (ReleasableLock ignored = translog.writeLock.acquire()) {
translog.foldGeneration(generation + i);
}
assertThat(translog.currentFileGeneration(), equalTo(generation + i + 1));
assertThat(translog.totalOperations(), equalTo(totalOperations));
}
for (int i = 0; i <= folds; i++) {
assertFileIsPresent(translog, generation + i);
}
translog.commit();
Contributor

can we check that we can also fold between precommit and commit?

Member Author

I pushed d9fbc42.

assertThat(translog.currentFileGeneration(), equalTo(generation + folds + 1));
assertThat(translog.totalOperations(), equalTo(0));
for (int i = 0; i <= folds; i++) {
assertFileDeleted(translog, generation + i);
}
assertFileIsPresent(translog, generation + folds + 1);
}

public void testGenerationThreshold() throws IOException {
translog.close();
final int generationThreshold = randomIntBetween(1, 512);
final Settings settings = Settings
.builder()
.put("index.translog.generation_threshold_size", generationThreshold + "b")
.build();
long seqNo = 0;
long folds = 0;
final TranslogConfig config = getTranslogConfig(translogDir, settings);
try (Translog translog =
new Translog(config, null, () -> SequenceNumbersService.UNASSIGNED_SEQ_NO)) {
final long generation = translog.currentFileGeneration();
for (int i = 0; i < randomIntBetween(32, 128); i++) {
assertThat(translog.currentFileGeneration(), equalTo(generation + folds));
final Location location = translog.add(new Translog.NoOp(seqNo++, 0, "test"));
if (location.translogLocation + location.size > generationThreshold) {
folds++;
assertThat(translog.currentFileGeneration(), equalTo(generation + folds));
for (int j = 0; j < folds; j++) {
assertFileIsPresent(translog, generation + j);
}
}
}

for (int j = 0; j < folds; j++) {
assertFileIsPresent(translog, generation + j);
}
}
}

}