
Commit d51a440
Ensure replica's store always contains the previous commit point.
This change:
1. Updates the cleanup and validation steps that run after a replication event to avoid deleting files still referenced by either the on-disk segments_N file or the in-memory SegmentInfos.
2. Sends a metadata diff of the on-disk segments with each copy event, allowing replicas that are multiple commit points behind to catch up (see the sketch below).
3. Updates initial recovery in IndexShard to copy segments before the shard is marked active. This fixes a bug where replicas could not be added after the primary.

Signed-off-by: Marc Handalian <handalm@amazon.com>
mch2 committed Mar 22, 2022
1 parent 3b57a13 commit d51a440
Showing 12 changed files with 297 additions and 95 deletions.
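To illustrate change (2): shipping the primary's on-disk segment metadata with each copy event lets a replica diff that metadata against its own store and request exactly the files it lacks, even across several commit points. The sketch below is illustrative rather than the commit's code; recoveryDiff is the existing Store API (used by verifyAfterCleanup later in this diff), while catchUp and fetchFromPrimary are hypothetical names.

// Illustrative sketch, not the commit's code. remoteSnapshot is the metadata snapshot of the
// primary's on-disk segments shipped with the copy event; localSnapshot describes the replica's store.
void catchUp(Store.MetadataSnapshot remoteSnapshot, Store.MetadataSnapshot localSnapshot) throws IOException {
    // source.recoveryDiff(target): missing = files the primary has that the replica lacks
    final Store.RecoveryDiff diff = remoteSnapshot.recoveryDiff(localSnapshot);
    for (StoreFileMetadata file : diff.missing) {
        fetchFromPrimary(file); // hypothetical helper: copy a file the replica does not have
    }
    for (StoreFileMetadata file : diff.different) {
        fetchFromPrimary(file); // re-copy files whose length or checksum changed
    }
    // diff.identical needs no transfer; the replica already has those files.
}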
@@ -8,12 +8,15 @@

package org.opensearch.indices.replication;

import org.opensearch.action.support.WriteRequest;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.settings.Settings;
import org.opensearch.index.IndexModule;
import org.opensearch.test.BackgroundIndexer;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.OpenSearchIntegTestCase.ClusterScope;

import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;

@ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
@@ -23,17 +26,21 @@ public class SegmentReplicationIT extends OpenSearchIntegTestCase {
private static final int SHARD_COUNT = 1;
private static final int REPLICA_COUNT = 1;

@Override
public Settings indexSettings() {
return Settings.builder()
.put(super.indexSettings())
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, SHARD_COUNT)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, REPLICA_COUNT)
.put(IndexModule.INDEX_QUERY_CACHE_ENABLED_SETTING.getKey(), false)
.put(IndexMetadata.SETTING_SEGMENT_REPLICATION, true)
.build();
}

public void testReplicationAfterPrimaryRefreshAndFlush() throws Exception {
final String nodeA = internalCluster().startNode();
final String nodeB = internalCluster().startNode();
createIndex(
INDEX_NAME,
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, SHARD_COUNT)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, REPLICA_COUNT)
.put(IndexMetadata.SETTING_SEGMENT_REPLICATION, true)
.build()
);
createIndex(INDEX_NAME);
ensureGreen(INDEX_NAME);

final int initialDocCount = scaledRandomIntBetween(0, 200);
Expand All @@ -56,4 +63,63 @@ public void testReplicationAfterPrimaryRefreshAndFlush() throws Exception {
assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), totalDocs);
assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), totalDocs);
}

public void testReplicationAfterForceMerge() throws Exception {
final String nodeA = internalCluster().startNode();
final String nodeB = internalCluster().startNode();
createIndex(INDEX_NAME);
ensureGreen(INDEX_NAME);

final int initialDocCount = scaledRandomIntBetween(0, 200);
try (BackgroundIndexer indexer = new BackgroundIndexer(INDEX_NAME, "_doc", client(), initialDocCount)) {
waitForDocs(initialDocCount, indexer);
}
flush(INDEX_NAME);
Thread.sleep(1000);
assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount);
assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount);

final int additionalDocCount = scaledRandomIntBetween(0, 200);
final int totalDocs = initialDocCount + additionalDocCount;
try (BackgroundIndexer indexer = new BackgroundIndexer(INDEX_NAME, "_doc", client(), additionalDocCount)) {
waitForDocs(additionalDocCount, indexer);
}
// Force a merge here so that the in memory SegmentInfos does not reference old segments on disk.
// This case tests that replicas preserve these files so the local store is not corrupt.
client().admin().indices().prepareForceMerge(INDEX_NAME).setMaxNumSegments(1).setFlush(false).get();
Thread.sleep(1000);
assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), totalDocs);
assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), totalDocs);
}

public void testReplicaSetupAfterPrimaryIndexesDocs() throws Exception {
final String nodeA = internalCluster().startNode();
createIndex(INDEX_NAME, Settings.builder().put(indexSettings()).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).build());
ensureGreen(INDEX_NAME);

// Index a doc to create the first set of segments. _s1.si
client().prepareIndex(INDEX_NAME).setId("1").setSource("foo", "bar").setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL).get();
// Flush segments to disk and create a new commit point (Primary: segments_3, _s1.si)
flushAndRefresh(INDEX_NAME);
assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 1);

// Index to create another segment
client().prepareIndex(INDEX_NAME).setId("2").setSource("foo", "bar").setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL).get();

// Force a merge here so that the in memory SegmentInfos does not reference old segments on disk.
// This case tests that we are still sending these older segments to replicas so the index on disk is not corrupt.
client().admin().indices().prepareForceMerge(INDEX_NAME).setMaxNumSegments(1).setFlush(false).get();
refresh(INDEX_NAME);

final String nodeB = internalCluster().startNode();
assertAcked(
client().admin()
.indices()
.prepareUpdateSettings(INDEX_NAME)
.setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1))
);
ensureGreen(INDEX_NAME);
assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 2);
assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 2);
}
}
server/src/main/java/org/opensearch/index/engine/Engine.java
@@ -235,7 +235,7 @@ public void verifyEngineBeforeIndexClosing() throws IllegalStateException {
}
}

public void updateCurrentInfos(byte[] infosBytes, long gen, long seqNo) throws IOException {};
public void updateCurrentInfos(SegmentInfos infos, long seqNo) throws IOException {}

public long getProcessedLocalCheckpoint() {
return 0L;
server/src/main/java/org/opensearch/index/engine/InternalEngine.java
@@ -62,10 +62,6 @@
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.search.Weight;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.BufferedChecksumIndexInput;
import org.apache.lucene.store.ByteBuffersDataInput;
import org.apache.lucene.store.ByteBuffersIndexInput;
import org.apache.lucene.store.ChecksumIndexInput;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.LockObtainFailedException;
import org.apache.lucene.util.BytesRef;
@@ -119,7 +115,6 @@

import java.io.Closeable;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
@@ -341,21 +336,13 @@ public InternalEngine(EngineConfig engineConfig) {
}

@Override
public void updateCurrentInfos(byte[] infosBytes, long gen, long seqNo) throws IOException {
assert engineConfig.isReadOnly() == true : "Only read-only replicas should update Infos";
SegmentInfos infos = SegmentInfos.readCommit(this.store.directory(), toIndexInput(infosBytes), gen);
assert gen == infos.getGeneration();
public void updateCurrentInfos(SegmentInfos infos, long seqNo) throws IOException {
assert engineConfig.isReadOnly() : "Only replicas should update Infos";
externalReaderManager.internalReaderManager.updateSegments(infos);
externalReaderManager.maybeRefresh();
localCheckpointTracker.markSeqNoAsProcessed(seqNo);
}

private ChecksumIndexInput toIndexInput(byte[] input) {
return new BufferedChecksumIndexInput(
new ByteBuffersIndexInput(new ByteBuffersDataInput(Arrays.asList(ByteBuffer.wrap(input))), "SegmentInfos")
);
}
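With this change the engine receives a ready-built SegmentInfos instead of raw bytes, so the byte[]-to-SegmentInfos conversion presumably moves to the caller. A minimal sketch of that conversion, reusing the Lucene calls from the method removed above (the helper's name and location are assumptions):

// Hypothetical caller-side helper mirroring the removed toIndexInput + readCommit logic:
// wrap the wire bytes in a checksumming input and decode them as the SegmentInfos of the given generation.
private static SegmentInfos readInfosFromBytes(Directory directory, byte[] infosBytes, long gen) throws IOException {
    try (ChecksumIndexInput input = new BufferedChecksumIndexInput(
        new ByteBuffersIndexInput(new ByteBuffersDataInput(Arrays.asList(ByteBuffer.wrap(infosBytes))), "SegmentInfos"))) {
        return SegmentInfos.readCommit(directory, input, gen);
    }
}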

private LocalCheckpointTracker createLocalCheckpointTracker(
BiFunction<Long, Long, LocalCheckpointTracker> localCheckpointTrackerSupplier
) throws IOException {
36 changes: 13 additions & 23 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
@@ -166,7 +166,6 @@
import org.opensearch.indices.replication.copy.ReplicationCheckpoint;
import org.opensearch.indices.replication.copy.ReplicationFailedException;
import org.opensearch.indices.replication.copy.SegmentReplicationState;
import org.opensearch.indices.replication.copy.TrackShardResponse;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.Repository;
import org.opensearch.rest.RestStatus;
@@ -1349,6 +1348,9 @@ public void rollTranslogGeneration() {
}

public void forceMerge(ForceMergeRequest forceMerge) throws IOException {
if (indexSettings.isSegrepEnabled() && shardRouting.primary() == false) {
return;
}
verifyActive();
if (logger.isTraceEnabled()) {
logger.trace("force merge with {}", forceMerge);
@@ -1444,8 +1446,8 @@ public ReplicationCheckpoint getLatestReplicationCheckpoint() {
);
}

public void updateCurrentInfos(long gen, byte[] infosBytes, long seqNo) throws IOException {
getEngine().updateCurrentInfos(infosBytes, gen, seqNo);
public void updateCurrentInfos(SegmentInfos infos, long seqNo) throws IOException {
getEngine().updateCurrentInfos(infos, seqNo);
}

/**
@@ -1952,6 +1954,9 @@ public void openEngineAndSkipTranslogRecovery() throws IOException {
// TODO: Segrep - fix initial recovery stages from ReplicationTarget.
if (indexSettings.isSegrepEnabled() == false) {
recoveryState.validateCurrentStage(RecoveryState.Stage.TRANSLOG);
} else {
recoveryState.setStage(RecoveryState.Stage.VERIFY_INDEX);
recoveryState.setStage(RecoveryState.Stage.TRANSLOG);
}
loadGlobalCheckpointToReplicationTracker();
innerOpenEngineAndTranslog(replicationTracker);
@@ -3011,25 +3016,13 @@ public void startRecovery(
try {
markAsRecovering("from " + recoveryState.getSourceNode(), recoveryState);
if (indexSettings.isSegrepEnabled()) {
IndexShard indexShard = this;
segmentReplicationReplicaService.prepareForReplication(
markAsReplicating();
segmentReplicationReplicaService.startRecovery(
this,
recoveryState.getTargetNode(),
recoveryState.getSourceNode(),
new ActionListener<TrackShardResponse>() {
@Override
public void onResponse(TrackShardResponse unused) {
segRepListener.onReplicationDone(segRepState);
recoveryState.getIndex().setFileDetailsComplete();
finalizeRecovery();
postRecovery("Shard setup complete.");
}

@Override
public void onFailure(Exception e) {
segRepListener.onReplicationFailure(segRepState, new ReplicationFailedException(indexShard, e), true);
}
}
replicationSource,
segRepListener
);
} else {
peerRecoveryTargetService.startRecovery(this, recoveryState.getSourceNode(), recoveryListener);
@@ -3669,22 +3662,20 @@ public synchronized void onNewCheckpoint(
return;
}
if (isReplicating()) {
logger.info("Ignore - shard is currently replicating to a checkpoint");
logger.debug("Ignore - shard is currently replicating to a checkpoint");
return;
}
try {
markAsReplicating();
final ReplicationCheckpoint checkpoint = request.getCheckpoint();
logger.trace("Received new checkpoint {}", checkpoint);
// TODO: segrep - these are the states set after we perform our initial store recovery.
segmentReplicationReplicaService.startReplication(
checkpoint,
this,
source,
new SegmentReplicationReplicaService.SegmentReplicationListener() {
@Override
public void onReplicationDone(SegmentReplicationState state) {
markReplicationComplete();
logger.debug("Replication complete to {}", getLatestReplicationCheckpoint());
}

@@ -3694,7 +3685,6 @@ public void onReplicationFailure(
ReplicationFailedException e,
boolean sendShardFailure
) {
markReplicationComplete();
logger.error("Failure", e);
}
}
43 changes: 43 additions & 0 deletions server/src/main/java/org/opensearch/index/store/Store.java
@@ -700,6 +700,49 @@ public void cleanupAndVerify(String reason, MetadataSnapshot sourceMetadata) throws IOException {
}
}

/**
* Deletes every file in this store that is not referenced by either the remote or the local metadata snapshot.
* Used for segment replication, where the in-memory SegmentInfos can be ahead of the on-disk segments_N file.
* In that case files from both snapshots must be preserved.
* @param reason the reason for this cleanup operation, logged for each deleted file
* @param remoteSnapshot the remote snapshot sent from the primary shard
* @param localSnapshot the local snapshot built from the in-memory SegmentInfos
* @throws IllegalStateException if the latest snapshot in this store differs from the given one after the cleanup
*/
public void cleanupAndVerify(String reason, MetadataSnapshot remoteSnapshot, MetadataSnapshot localSnapshot) throws IOException {
// fetch a snapshot from the latest on-disk segments_N file. This can be behind
// the passed-in local in-memory snapshot, so we must ensure files it references are not removed.
final Store.MetadataSnapshot latestCommitPointMetadata = getMetadata((IndexCommit) null);
metadataLock.writeLock().lock();
try (Lock writeLock = directory.obtainLock(IndexWriter.WRITE_LOCK_NAME)) {
for (String existingFile : directory.listAll()) {
if (Store.isAutogenerated(existingFile)
|| remoteSnapshot.contains(existingFile)
|| latestCommitPointMetadata.contains(existingFile)) {
// don't delete snapshot file, or the checksums file (note, this is extra protection since the Store won't delete
// checksum)
continue;
}
try {
directory.deleteFile(reason, existingFile);
// FNF should not happen since we hold a write lock?
} catch (IOException ex) {
if (existingFile.startsWith(IndexFileNames.SEGMENTS) || existingFile.startsWith(CORRUPTED_MARKER_NAME_PREFIX)) {
// TODO do we need to also fail this if we can't delete the pending commit file?
// if one of those files can't be deleted we better fail the cleanup otherwise we might leave an old commit
// point around?
throw new IllegalStateException("Can't delete " + existingFile + " - cleanup failed", ex);
}
logger.debug(() -> new ParameterizedMessage("failed to delete file [{}]", existingFile), ex);
// ignore, we don't really care, will get deleted later on
}
}
verifyAfterCleanup(remoteSnapshot, localSnapshot);
} finally {
metadataLock.writeLock().unlock();
}
}
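A hedged sketch of how a replica might call this overload when finalizing a copy event. Only the cleanupAndVerify signature comes from this commit; how the two snapshots are obtained here is an assumption:

// Illustrative only: remoteSnapshot arrives with the copy event from the primary, and
// localSnapshot is assumed to be built from the replica's in-memory SegmentInfos.
// Files referenced by either snapshot, or by the latest on-disk segments_N, survive the cleanup.
store.cleanupAndVerify("segment replication finalize", remoteSnapshot, localSnapshot);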

// pkg private for testing
final void verifyAfterCleanup(MetadataSnapshot sourceMetadata, MetadataSnapshot targetMetadata) {
final RecoveryDiff recoveryDiff = targetMetadata.recoveryDiff(sourceMetadata);
Expand Down
@@ -767,7 +767,7 @@ private ShardRoutingReplicationListener(final ShardRouting shardRouting, final long primaryTerm) {

@Override
public void onReplicationDone(final SegmentReplicationState state) {
logger.info("Shard setup complete, ready for segment copy.");
logger.trace("Shard setup complete, ready for segment copy.");
shardStateAction.shardStarted(shardRouting, primaryTerm, "after replication", SHARD_STATE_ACTION_LISTENER);
}
