Skip to content

Commit

Permalink
Throw back replica local checkpoint on new primary
Browse files Browse the repository at this point in the history
This commit causes a replica to throwback its local checkpoint to the
global checkpoint when learning of a new primary through a replica
operation.
  • Loading branch information
jasontedor committed Jun 28, 2017
1 parent 5a4a473 commit 2115f4a
Show file tree
Hide file tree
Showing 5 changed files with 153 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,19 @@ public synchronized void markSeqNoAsCompleted(final long seqNo) {
}
}

/**
* Resets the checkpoint to the specified value.
*
* @param checkpoint the local checkpoint to reset this tracker to
*/
synchronized void resetCheckpoint(final long checkpoint) {
assert checkpoint <= this.checkpoint;
processedSeqNo.clear();
firstProcessedSeqNo = checkpoint + 1;
nextSeqNo = checkpoint + 1;
this.checkpoint = checkpoint;
}

/**
* The current checkpoint which can be advanced by {@link #markSeqNoAsCompleted(long)}.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,15 @@ public void markSeqNoAsCompleted(final long seqNo) {
localCheckpointTracker.markSeqNoAsCompleted(seqNo);
}

/**
* Resets the local checkpoint to the specified value.
*
* @param localCheckpoint the local checkpoint to reset to
*/
public void resetLocalCheckpoint(final long localCheckpoint) {
localCheckpointTracker.resetCheckpoint(localCheckpoint);
}

/**
* The current sequence number stats.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2057,6 +2057,15 @@ public void acquireReplicaOperationPermit(final long operationPrimaryTerm, final
assert operationPrimaryTerm > primaryTerm :
"shard term already update. op term [" + operationPrimaryTerm + "], shardTerm [" + primaryTerm + "]";
primaryTerm = operationPrimaryTerm;
logger.trace(
"detected new primary with primary term [{}], "
+ "resetting local checkpoint from [{}] to [{}], "
+ "updating global checkpoint to [{}]",
operationPrimaryTerm,
getLocalCheckpoint(),
globalCheckpoint,
globalCheckpoint);
getEngine().seqNoService().resetLocalCheckpoint(globalCheckpoint);
updateGlobalCheckpointOnReplica(globalCheckpoint);
getEngine().getTranslog().rollGeneration();
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,10 @@
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.isOneOf;
import static org.hamcrest.Matchers.not;

public class LocalCheckpointTrackerTests extends ESTestCase {

Expand All @@ -49,14 +51,14 @@ public class LocalCheckpointTrackerTests extends ESTestCase {

public static LocalCheckpointTracker createEmptyTracker() {
return new LocalCheckpointTracker(
IndexSettingsModule.newIndexSettings(
"test",
Settings
.builder()
.put(LocalCheckpointTracker.SETTINGS_BIT_ARRAYS_SIZE.getKey(), SMALL_CHUNK_SIZE)
.build()),
SequenceNumbersService.NO_OPS_PERFORMED,
SequenceNumbersService.NO_OPS_PERFORMED
IndexSettingsModule.newIndexSettings(
"test",
Settings
.builder()
.put(LocalCheckpointTracker.SETTINGS_BIT_ARRAYS_SIZE.getKey(), SMALL_CHUNK_SIZE)
.build()),
SequenceNumbersService.NO_OPS_PERFORMED,
SequenceNumbersService.NO_OPS_PERFORMED
);
}

Expand Down Expand Up @@ -236,4 +238,24 @@ public void testWaitForOpsToComplete() throws BrokenBarrierException, Interrupte

thread.join();
}

public void testResetCheckpoint() {
final int operations = 1024 - scaledRandomIntBetween(0, 1024);
for (int i = 0; i < operations; i++) {
if (!rarely()) {
tracker.markSeqNoAsCompleted(i);
}
}

final int localCheckpoint =
randomIntBetween(Math.toIntExact(SequenceNumbersService.NO_OPS_PERFORMED), Math.toIntExact(tracker.getCheckpoint()));
tracker.resetCheckpoint(localCheckpoint);
assertThat(tracker.getCheckpoint(), equalTo((long) localCheckpoint));
assertThat(tracker.getMaxSeqNo(), equalTo((long) localCheckpoint));
assertThat(tracker.processedSeqNo, empty());
assertThat(tracker.generateSeqNo(), equalTo((long) (localCheckpoint + 1)));
tracker.markSeqNoAsCompleted((long) (localCheckpoint + 1));
assertThat(tracker.processedSeqNo, not(empty()));
assertThat(tracker.processedSeqNo.peek().get(0), equalTo(true));
}
}
112 changes: 92 additions & 20 deletions core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,6 @@
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.hasToString;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.nullValue;

/**
Expand Down Expand Up @@ -405,26 +404,10 @@ public void testPrimaryFillsSeqNoGapsOnPromotion() throws Exception {

// most of the time this is large enough that most of the time there will be at least one gap
final int operations = 1024 - scaledRandomIntBetween(0, 1024);
int max = Math.toIntExact(SequenceNumbersService.NO_OPS_PERFORMED);
boolean gap = false;
for (int i = 0; i < operations; i++) {
if (!rarely()) {
final String id = Integer.toString(i);
SourceToParse sourceToParse = SourceToParse.source(indexShard.shardId().getIndexName(), "test", id,
new BytesArray("{}"), XContentType.JSON);
indexShard.applyIndexOperationOnReplica(i, indexShard.getPrimaryTerm(),
1, VersionType.EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, sourceToParse,
getMappingUpdater(indexShard, sourceToParse.type()));
max = i;
} else {
gap = true;
}
}
final Result result = indexOnReplicaWithGaps(indexShard, operations, Math.toIntExact(SequenceNumbersService.NO_OPS_PERFORMED));

final int maxSeqNo = max;
if (gap) {
assertThat(indexShard.getLocalCheckpoint(), not(equalTo(maxSeqNo)));
}
final int maxSeqNo = result.maxSeqNo;
final boolean gap = result.gap;

// promote the replica
final ShardRouting replicaRouting = indexShard.routingEntry();
Expand Down Expand Up @@ -637,6 +620,7 @@ public void onFailure(Exception e) {
@Override
public void onResponse(Releasable releasable) {
assertThat(indexShard.getPrimaryTerm(), equalTo(newPrimaryTerm));
assertThat(indexShard.getLocalCheckpoint(), equalTo(newGlobalCheckPoint));
assertThat(indexShard.getGlobalCheckpoint(), equalTo(newGlobalCheckPoint));
onResponse.set(true);
releasable.close();
Expand Down Expand Up @@ -697,6 +681,7 @@ private void finish() {
assertTrue(onResponse.get());
assertNull(onFailure.get());
assertThat(indexShard.getTranslog().getGeneration().translogFileGeneration, equalTo(translogGen + 1));
assertThat(indexShard.getLocalCheckpoint(), equalTo(newGlobalCheckPoint));
assertThat(indexShard.getGlobalCheckpoint(), equalTo(newGlobalCheckPoint));
}
}
Expand All @@ -707,6 +692,44 @@ private void finish() {
closeShards(indexShard);
}

public void testThrowbackLocalCheckpointOnReplica() throws IOException, InterruptedException {
final IndexShard indexShard = newStartedShard(false);

// most of the time this is large enough that most of the time there will be at least one gap
final int operations = 1024 - scaledRandomIntBetween(0, 1024);
indexOnReplicaWithGaps(indexShard, operations, Math.toIntExact(SequenceNumbersService.NO_OPS_PERFORMED));

final int globalCheckpoint =
randomIntBetween(
Math.toIntExact(SequenceNumbersService.NO_OPS_PERFORMED),
Math.toIntExact(indexShard.getLocalCheckpoint()));
final CountDownLatch latch = new CountDownLatch(1);
indexShard.acquireReplicaOperationPermit(
indexShard.primaryTerm + 1,
globalCheckpoint,
new ActionListener<Releasable>() {
@Override
public void onResponse(Releasable releasable) {
latch.countDown();
}

@Override
public void onFailure(Exception e) {

}
},
ThreadPool.Names.SAME);

latch.await();
assertThat(indexShard.getLocalCheckpoint(), equalTo((long) globalCheckpoint));

// ensure that after the local checkpoint throwback and indexing again, the local checkpoint advances
final Result result = indexOnReplicaWithGaps(indexShard, operations, Math.toIntExact(indexShard.getLocalCheckpoint()));
assertThat(indexShard.getLocalCheckpoint(), equalTo((long) result.localCheckpoint));

closeShards(indexShard);
}

public void testConcurrentTermIncreaseOnReplicaShard() throws BrokenBarrierException, InterruptedException, IOException {
final IndexShard indexShard = newStartedShard(false);

Expand Down Expand Up @@ -1966,6 +1989,55 @@ public void testReadSnapshotConcurrently() throws IOException, InterruptedExcept
closeShards(newShard);
}

class Result {
private final int localCheckpoint;
private final int maxSeqNo;
private final boolean gap;

public Result(final int localCheckpoint, final int maxSeqNo, final boolean gap) {
this.localCheckpoint = localCheckpoint;
this.maxSeqNo = maxSeqNo;
this.gap = gap;
}
}

/**
* Index on the specified shard while introducing sequence number gaps.
*
* @param indexShard the shard
* @param operations the number of operations
* @param offset the starting sequence number
* @return a pair of the maximum sequence number and whether or not a gap was introduced
* @throws IOException if an I/O exception occurs while indexing on the shard
*/
private Result indexOnReplicaWithGaps(
final IndexShard indexShard,
final int operations,
final int offset) throws IOException {
int localCheckpoint = offset;
int max = offset;
boolean gap = false;
for (int i = offset + 1; i < operations; i++) {
if (!rarely()) {
final String id = Integer.toString(i);
SourceToParse sourceToParse = SourceToParse.source(indexShard.shardId().getIndexName(), "test", id,
new BytesArray("{}"), XContentType.JSON);
indexShard.applyIndexOperationOnReplica(i, indexShard.getPrimaryTerm(),
1, VersionType.EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, sourceToParse,
getMappingUpdater(indexShard, sourceToParse.type()));
if (!gap && i == localCheckpoint + 1) {
localCheckpoint++;
}
max = i;
} else {
gap = true;
}
}
assert localCheckpoint == indexShard.getLocalCheckpoint();
assert !gap || (localCheckpoint != max);
return new Result(localCheckpoint, max, gap);
}

/** A dummy repository for testing which just needs restore overridden */
private abstract static class RestoreOnlyRepository extends AbstractLifecycleComponent implements Repository {
private final String indexName;
Expand Down

0 comments on commit 2115f4a

Please sign in to comment.