[Segment Replication] Ensure replica's store always contains the previous commit point. (#2551)

* Ensure replica's store always contains the previous commit point.

This change:
1. Updates the cleanup and validation steps that run after a replication event so that files still referenced
by either the on-disk segments_N file or the in-memory SegmentInfos are not deleted.
2. Sends a metadata diff of the on-disk segments with each copy event, allowing replicas that are multiple commit points behind
to catch up.
3. Updates initial recovery in IndexShard to copy segments before the shard is marked active. This fixes a bug where replicas could not be added
after the primary.

Signed-off-by: Marc Handalian <handalm@amazon.com>

- Updated TrackShardRequestHandler to send errors back to replicas.
- Renamed additionalFiles to pendingDeleteFiles in TransportCheckpointInfoResponse.
- Refactored Store.cleanupAndVerify methods to remove duplication.

Signed-off-by: Marc Handalian <handalm@amazon.com>
mch2 authored Mar 25, 2022
1 parent e569e6f commit 9bcee79
Showing 12 changed files with 324 additions and 120 deletions.
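Before the file diffs, a minimal sketch of the idea behind point 1 of the commit message, assuming a replica-side finalize step. The enclosing method and the buildSnapshotFromInfos helper are illustrative only and not part of this commit; the two-snapshot Store.cleanupAndVerify overload it calls is the one added in Store.java further down.

    // Hypothetical replica-side finalize step; only the cleanupAndVerify overload is from this commit.
    void finalizeCopyEvent(Store store, Store.MetadataSnapshot remoteSnapshot, SegmentInfos inMemoryInfos) throws IOException {
        // Snapshot of files referenced by the in-memory SegmentInfos, which may be ahead
        // of the last segments_N committed to disk (assumed helper).
        final Store.MetadataSnapshot localSnapshot = buildSnapshotFromInfos(store, inMemoryInfos);
        // Keep any file referenced by the primary's snapshot, the in-memory snapshot,
        // or the latest on-disk commit point; everything else is eligible for deletion.
        store.cleanupAndVerify("segment replication finalization", remoteSnapshot, localSnapshot);
    }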
@@ -8,12 +8,15 @@

package org.opensearch.indices.replication;

import org.opensearch.action.support.WriteRequest;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.settings.Settings;
import org.opensearch.index.IndexModule;
import org.opensearch.test.BackgroundIndexer;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.OpenSearchIntegTestCase.ClusterScope;

import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;

@ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
@@ -23,17 +26,21 @@ public class SegmentReplicationIT extends OpenSearchIntegTestCase {
private static final int SHARD_COUNT = 1;
private static final int REPLICA_COUNT = 1;

@Override
public Settings indexSettings() {
return Settings.builder()
.put(super.indexSettings())
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, SHARD_COUNT)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, REPLICA_COUNT)
.put(IndexModule.INDEX_QUERY_CACHE_ENABLED_SETTING.getKey(), false)
.put(IndexMetadata.SETTING_SEGMENT_REPLICATION, true)
.build();
}

public void testReplicationAfterPrimaryRefreshAndFlush() throws Exception {
final String nodeA = internalCluster().startNode();
final String nodeB = internalCluster().startNode();
createIndex(
INDEX_NAME,
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, SHARD_COUNT)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, REPLICA_COUNT)
.put(IndexMetadata.SETTING_SEGMENT_REPLICATION, true)
.build()
);
createIndex(INDEX_NAME);
ensureGreen(INDEX_NAME);

final int initialDocCount = scaledRandomIntBetween(0, 200);
Expand All @@ -56,4 +63,64 @@ public void testReplicationAfterPrimaryRefreshAndFlush() throws Exception {
assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), totalDocs);
assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), totalDocs);
}

public void testReplicationAfterForceMerge() throws Exception {
final String nodeA = internalCluster().startNode();
final String nodeB = internalCluster().startNode();
createIndex(INDEX_NAME);
ensureGreen(INDEX_NAME);

final int initialDocCount = scaledRandomIntBetween(0, 200);
try (BackgroundIndexer indexer = new BackgroundIndexer(INDEX_NAME, "_doc", client(), initialDocCount)) {
waitForDocs(initialDocCount, indexer);
}
flush(INDEX_NAME);
// wait a short amount of time to give replication a chance to complete.
Thread.sleep(1000);
assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount);
assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount);

final int additionalDocCount = scaledRandomIntBetween(0, 200);
final int totalDocs = initialDocCount + additionalDocCount;
try (BackgroundIndexer indexer = new BackgroundIndexer(INDEX_NAME, "_doc", client(), additionalDocCount)) {
waitForDocs(additionalDocCount, indexer);
}
// Force a merge here so that the in memory SegmentInfos does not reference old segments on disk.
// This case tests that replicas preserve these files so the local store is not corrupt.
client().admin().indices().prepareForceMerge(INDEX_NAME).setMaxNumSegments(1).setFlush(false).get();
Thread.sleep(1000);
assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), totalDocs);
assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), totalDocs);
}

public void testReplicaSetupAfterPrimaryIndexesDocs() throws Exception {
final String nodeA = internalCluster().startNode();
createIndex(INDEX_NAME, Settings.builder().put(indexSettings()).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).build());
ensureGreen(INDEX_NAME);

// Index a doc to create the first set of segments. _s1.si
client().prepareIndex(INDEX_NAME).setId("1").setSource("foo", "bar").setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL).get();
// Flush segments to disk and create a new commit point (Primary: segments_3, _s1.si)
flushAndRefresh(INDEX_NAME);
assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 1);

// Index to create another segment
client().prepareIndex(INDEX_NAME).setId("2").setSource("foo", "bar").setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL).get();

// Force a merge here so that the in memory SegmentInfos does not reference old segments on disk.
// This case tests that we are still sending these older segments to replicas so the index on disk is not corrupt.
client().admin().indices().prepareForceMerge(INDEX_NAME).setMaxNumSegments(1).setFlush(false).get();
refresh(INDEX_NAME);

final String nodeB = internalCluster().startNode();
assertAcked(
client().admin()
.indices()
.prepareUpdateSettings(INDEX_NAME)
.setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1))
);
ensureGreen(INDEX_NAME);
assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 2);
assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 2);
}
}
@@ -235,7 +235,7 @@ public void verifyEngineBeforeIndexClosing() throws IllegalStateException {
}
}

public void updateCurrentInfos(byte[] infosBytes, long gen, long seqNo) throws IOException {};
public void updateCurrentInfos(SegmentInfos infos, long seqNo) throws IOException {}

public long getProcessedLocalCheckpoint() {
return 0L;
@@ -62,10 +62,6 @@
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.search.Weight;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.BufferedChecksumIndexInput;
import org.apache.lucene.store.ByteBuffersDataInput;
import org.apache.lucene.store.ByteBuffersIndexInput;
import org.apache.lucene.store.ChecksumIndexInput;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.LockObtainFailedException;
import org.apache.lucene.util.BytesRef;
@@ -119,7 +115,6 @@

import java.io.Closeable;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
@@ -341,21 +336,13 @@ public InternalEngine(EngineConfig engineConfig) {
}

@Override
public void updateCurrentInfos(byte[] infosBytes, long gen, long seqNo) throws IOException {
assert engineConfig.isReadOnly() == true : "Only read-only replicas should update Infos";
SegmentInfos infos = SegmentInfos.readCommit(this.store.directory(), toIndexInput(infosBytes), gen);
assert gen == infos.getGeneration();
public void updateCurrentInfos(SegmentInfos infos, long seqNo) throws IOException {
assert engineConfig.isReadOnly() : "Only replicas should update Infos";
externalReaderManager.internalReaderManager.updateSegments(infos);
externalReaderManager.maybeRefresh();
localCheckpointTracker.markSeqNoAsProcessed(seqNo);
}

private ChecksumIndexInput toIndexInput(byte[] input) {
return new BufferedChecksumIndexInput(
new ByteBuffersIndexInput(new ByteBuffersDataInput(Arrays.asList(ByteBuffer.wrap(input))), "SegmentInfos")
);
}

private LocalCheckpointTracker createLocalCheckpointTracker(
BiFunction<Long, Long, LocalCheckpointTracker> localCheckpointTrackerSupplier
) throws IOException {
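Since Engine#updateCurrentInfos now takes a SegmentInfos directly, the byte decoding removed above presumably moves to the caller (likely the replication target; where it lands is an assumption here). A sketch that mirrors the deleted helper, using the same Lucene classes dropped from the imports:

    // Decode the SegmentInfos bytes shipped from the primary before handing them to the engine.
    SegmentInfos readInfosFromBytes(Store store, byte[] infosBytes, long gen) throws IOException {
        final ChecksumIndexInput input = new BufferedChecksumIndexInput(
            new ByteBuffersIndexInput(new ByteBuffersDataInput(Arrays.asList(ByteBuffer.wrap(infosBytes))), "SegmentInfos")
        );
        return SegmentInfos.readCommit(store.directory(), input, gen);
    }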
39 changes: 15 additions & 24 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
@@ -166,7 +166,6 @@
import org.opensearch.indices.replication.copy.ReplicationCheckpoint;
import org.opensearch.indices.replication.copy.ReplicationFailedException;
import org.opensearch.indices.replication.copy.SegmentReplicationState;
import org.opensearch.indices.replication.copy.TrackShardResponse;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.Repository;
import org.opensearch.rest.RestStatus;
@@ -1349,6 +1348,10 @@ public void rollTranslogGeneration() {
}

public void forceMerge(ForceMergeRequest forceMerge) throws IOException {
if (indexSettings.isSegrepEnabled() && shardRouting.primary() == false) {
// With segment replication enabled, replicas do not perform this operation.
return;
}
verifyActive();
if (logger.isTraceEnabled()) {
logger.trace("force merge with {}", forceMerge);
@@ -1444,8 +1447,8 @@ public ReplicationCheckpoint getLatestReplicationCheckpoint() {
);
}

public void updateCurrentInfos(long gen, byte[] infosBytes, long seqNo) throws IOException {
getEngine().updateCurrentInfos(infosBytes, gen, seqNo);
public void updateCurrentInfos(SegmentInfos infos, long seqNo) throws IOException {
getEngine().updateCurrentInfos(infos, seqNo);
}

/**
@@ -1952,6 +1955,9 @@ public void openEngineAndSkipTranslogRecovery() throws IOException {
// TODO: Segrep - fix initial recovery stages from ReplicationTarget.
if (indexSettings.isSegrepEnabled() == false) {
recoveryState.validateCurrentStage(RecoveryState.Stage.TRANSLOG);
} else {
recoveryState.setStage(RecoveryState.Stage.VERIFY_INDEX);
recoveryState.setStage(RecoveryState.Stage.TRANSLOG);
}
loadGlobalCheckpointToReplicationTracker();
innerOpenEngineAndTranslog(replicationTracker);
@@ -3011,25 +3017,14 @@ public void startRecovery(
try {
markAsRecovering("from " + recoveryState.getSourceNode(), recoveryState);
if (indexSettings.isSegrepEnabled()) {
IndexShard indexShard = this;
segmentReplicationReplicaService.prepareForReplication(
// Start a "Recovery" using segment replication. This ensures the shard is tracked by the primary
// and started with the latest set of segments.
segmentReplicationReplicaService.startRecovery(
this,
recoveryState.getTargetNode(),
recoveryState.getSourceNode(),
new ActionListener<TrackShardResponse>() {
@Override
public void onResponse(TrackShardResponse unused) {
segRepListener.onReplicationDone(segRepState);
recoveryState.getIndex().setFileDetailsComplete();
finalizeRecovery();
postRecovery("Shard setup complete.");
}

@Override
public void onFailure(Exception e) {
segRepListener.onReplicationFailure(segRepState, new ReplicationFailedException(indexShard, e), true);
}
}
replicationSource,
segRepListener
);
} else {
peerRecoveryTargetService.startRecovery(this, recoveryState.getSourceNode(), recoveryListener);
@@ -3669,22 +3664,19 @@ public synchronized void onNewCheckpoint(
return;
}
if (isReplicating()) {
logger.info("Ignore - shard is currently replicating to a checkpoint");
logger.debug("Ignore - shard is currently replicating to a checkpoint");
return;
}
try {
markAsReplicating();
final ReplicationCheckpoint checkpoint = request.getCheckpoint();
logger.trace("Received new checkpoint {}", checkpoint);
// TODO: segrep - these are the states set after we perform our initial store recovery.
segmentReplicationReplicaService.startReplication(
checkpoint,
this,
source,
new SegmentReplicationReplicaService.SegmentReplicationListener() {
@Override
public void onReplicationDone(SegmentReplicationState state) {
markReplicationComplete();
logger.debug("Replication complete to {}", getLatestReplicationCheckpoint());
}

@@ -3694,7 +3686,6 @@ public void onReplicationFailure(
ReplicationFailedException e,
boolean sendShardFailure
) {
markReplicationComplete();
logger.error("Failure", e);
}
}
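The inline listener removed from startRecovery above handled shard finalization; with the new startRecovery entry point that responsibility presumably moves into SegmentReplicationReplicaService (an assumption here). A sketch of the steps the removed listener performed, which still have to happen once the initial segment copy finishes:

    // Steps taken from the removed inline listener; wherever they now run, they must
    // complete before the shard reports itself started.
    void onInitialCopyDone(IndexShard indexShard, RecoveryState recoveryState) {
        recoveryState.getIndex().setFileDetailsComplete(); // mark file transfer bookkeeping complete
        indexShard.finalizeRecovery();
        indexShard.postRecovery("Shard setup complete.");
    }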
71 changes: 51 additions & 20 deletions server/src/main/java/org/opensearch/index/store/Store.java
@@ -64,6 +64,7 @@
import org.apache.lucene.util.BytesRefBuilder;
import org.apache.lucene.util.Version;
import org.opensearch.ExceptionsHelper;
import org.opensearch.common.Nullable;
import org.opensearch.common.UUIDs;
import org.opensearch.common.bytes.BytesReference;
import org.opensearch.common.io.Streams;
@@ -672,26 +673,7 @@ private static void failIfCorrupted(Directory directory) throws IOException {
public void cleanupAndVerify(String reason, MetadataSnapshot sourceMetadata) throws IOException {
metadataLock.writeLock().lock();
try (Lock writeLock = directory.obtainLock(IndexWriter.WRITE_LOCK_NAME)) {
for (String existingFile : directory.listAll()) {
if (Store.isAutogenerated(existingFile) || sourceMetadata.contains(existingFile)) {
// don't delete snapshot file, or the checksums file (note, this is extra protection since the Store won't delete
// checksum)
continue;
}
try {
directory.deleteFile(reason, existingFile);
// FNF should not happen since we hold a write lock?
} catch (IOException ex) {
if (existingFile.startsWith(IndexFileNames.SEGMENTS) || existingFile.startsWith(CORRUPTED_MARKER_NAME_PREFIX)) {
// TODO do we need to also fail this if we can't delete the pending commit file?
// if one of those files can't be deleted we better fail the cleanup otherwise we might leave an old commit
// point around?
throw new IllegalStateException("Can't delete " + existingFile + " - cleanup failed", ex);
}
logger.debug(() -> new ParameterizedMessage("failed to delete file [{}]", existingFile), ex);
// ignore, we don't really care, will get deleted later on
}
}
cleanupFiles(reason, sourceMetadata, null);
directory.syncMetaData();
final Store.MetadataSnapshot metadataOrEmpty = getMetadata((IndexCommit) null);
verifyAfterCleanup(sourceMetadata, metadataOrEmpty);
@@ -700,6 +682,55 @@ public void cleanupAndVerify(String reason, MetadataSnapshot sourceMetadata) throws IOException {
}
}

/**
* This method deletes every file in this store that is not contained in either the remote or local metadata snapshots.
* This method is used for segment replication when the in memory SegmentInfos can be ahead of the on disk segment file.
* In this case files from both snapshots must be preserved.
* @param reason the reason for this cleanup operation logged for each deleted file
* @param remoteSnapshot The remote snapshot sent from primary shards.
* @param localSnapshot The local snapshot from in memory SegmentInfos.
* @throws IllegalStateException if the latest snapshot in this store differs from the given one after the cleanup.
*/
public void cleanupAndVerify(String reason, MetadataSnapshot remoteSnapshot, MetadataSnapshot localSnapshot) throws IOException {
// fetch a snapshot from the latest on disk Segments_N file. This can be behind
// the passed in local in memory snapshot, so we want to ensure files it references are not removed.
metadataLock.writeLock().lock();
try (Lock writeLock = directory.obtainLock(IndexWriter.WRITE_LOCK_NAME)) {
final Store.MetadataSnapshot latestCommitPointMetadata = getMetadata((IndexCommit) null);
cleanupFiles(reason, remoteSnapshot, latestCommitPointMetadata);
verifyAfterCleanup(remoteSnapshot, localSnapshot);
} finally {
metadataLock.writeLock().unlock();
}
}

private void cleanupFiles(String reason, MetadataSnapshot remoteSnapshot, @Nullable MetadataSnapshot additionalSnapshot)
throws IOException {
assert metadataLock.isWriteLockedByCurrentThread();
for (String existingFile : directory.listAll()) {
if (Store.isAutogenerated(existingFile)
|| remoteSnapshot.contains(existingFile)
|| (additionalSnapshot != null && additionalSnapshot.contains(existingFile))) {
// don't delete snapshot file, or the checksums file (note, this is extra protection since the Store won't delete
// checksum)
continue;
}
try {
directory.deleteFile(reason, existingFile);
// FNF should not happen since we hold a write lock?
} catch (IOException ex) {
if (existingFile.startsWith(IndexFileNames.SEGMENTS) || existingFile.startsWith(CORRUPTED_MARKER_NAME_PREFIX)) {
// TODO do we need to also fail this if we can't delete the pending commit file?
// if one of those files can't be deleted we better fail the cleanup otherwise we might leave an old commit
// point around?
throw new IllegalStateException("Can't delete " + existingFile + " - cleanup failed", ex);
}
logger.debug(() -> new ParameterizedMessage("failed to delete file [{}]", existingFile), ex);
// ignore, we don't really care, will get deleted later on
}
}
}

// pkg private for testing
final void verifyAfterCleanup(MetadataSnapshot sourceMetadata, MetadataSnapshot targetMetadata) {
final RecoveryDiff recoveryDiff = targetMetadata.recoveryDiff(sourceMetadata);
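A sketch of how a replica that is several commit points behind might use the metadata sent with a copy event (point 2 of the commit message) to work out which files it still needs. The method and its call site are illustrative; Store.MetadataSnapshot#recoveryDiff is the existing API already used by verifyAfterCleanup above, and identical/different/missing are the existing fields on Store.RecoveryDiff.

    // Files the replica must fetch: those missing locally plus those that differ from the primary's copy.
    List<StoreFileMetadata> filesToFetch(Store.MetadataSnapshot remoteSnapshot, Store.MetadataSnapshot localSnapshot) {
        final Store.RecoveryDiff diff = remoteSnapshot.recoveryDiff(localSnapshot);
        final List<StoreFileMetadata> toFetch = new ArrayList<>(diff.missing);
        toFetch.addAll(diff.different);
        return toFetch;
    }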
@@ -774,7 +774,7 @@ private ShardRoutingReplicationListener(final ShardRouting shardRouting, final l

@Override
public void onReplicationDone(final SegmentReplicationState state) {
logger.info("Shard setup complete, ready for segment copy.");
logger.trace("Shard setup complete, ready for segment copy.");
shardStateAction.shardStarted(shardRouting, primaryTerm, "after replication", SHARD_STATE_ACTION_LISTENER);
}
