
[Remote Store] Change behaviour in replica recovery for remote translog enabled indices #4318

Merged: 12 commits, Sep 17, 2022
3 changes: 2 additions & 1 deletion CHANGELOG.md
@@ -39,6 +39,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
- Add index specific setting for remote repository ([#4253](https://github.com/opensearch-project/OpenSearch/pull/4253))
- [Segment Replication] Update replicas to commit SegmentInfos instead of relying on SIS files from primary shards. ([#4402](https://github.com/opensearch-project/OpenSearch/pull/4402))
- [CCR] Add getHistoryOperationsFromTranslog method to fetch the history snapshot from translogs ([#3948](https://github.com/opensearch-project/OpenSearch/pull/3948))
- [Remote Store] Change behaviour in replica recovery for remote translog enabled indices ([#4318](https://github.com/opensearch-project/OpenSearch/pull/4318))

### Deprecated

@@ -91,4 +92,4 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)


[Unreleased]: https://github.com/opensearch-project/OpenSearch/compare/2.2.0...HEAD
[2.x]: https://github.com/opensearch-project/OpenSearch/compare/2.2.0...2.x
[2.x]: https://github.com/opensearch-project/OpenSearch/compare/2.2.0...2.x
Collaborator review comment: nit: unwanted

66 changes: 57 additions & 9 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
@@ -163,8 +163,8 @@
import org.opensearch.indices.recovery.RecoveryListener;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.indices.recovery.RecoveryTarget;
import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.Repository;
import org.opensearch.rest.RestStatus;
@@ -203,6 +203,7 @@
import java.util.stream.StreamSupport;

import static org.opensearch.index.seqno.RetentionLeaseActions.RETAIN_ALL;
import static org.opensearch.index.seqno.SequenceNumbers.MAX_SEQ_NO;
import static org.opensearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;

/**
@@ -1703,13 +1704,8 @@ public void prepareForIndexRecovery() {
* @return a sequence number that an operation-based peer recovery can start with.
* This is the first operation after the local checkpoint of the safe commit if exists.
*/
public long recoverLocallyUpToGlobalCheckpoint() {
assert Thread.holdsLock(mutex) == false : "recover locally under mutex";
if (state != IndexShardState.RECOVERING) {
throw new IndexShardNotRecoveringException(shardId, state);
}
recoveryState.validateCurrentStage(RecoveryState.Stage.INDEX);
assert routingEntry().recoverySource().getType() == RecoverySource.Type.PEER : "not a peer recovery [" + routingEntry() + "]";
private long recoverLocallyUpToGlobalCheckpoint() {
validateLocalRecoveryState();
final Optional<SequenceNumbers.CommitInfo> safeCommit;
final long globalCheckpoint;
try {
@@ -1792,6 +1788,54 @@ public long recoverLocallyUpToGlobalCheckpoint() {
}
}

public long recoverLocallyAndFetchStartSeqNo(boolean localTranslog) {
if (localTranslog) {
return recoverLocallyUpToGlobalCheckpoint();
} else {
return recoverLocallyUptoLastCommit();
}
}
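For context, a minimal sketch of how this new entry point is exercised by the recovery target service; the actual call site appears in the PeerRecoveryTargetService diff further down in this PR:

```java
// Sketch based on the doRecovery() change later in this PR: replicas of
// remote-translog-enabled indices skip the translog-based local recovery,
// while every other shard keeps the existing recover-up-to-global-checkpoint path.
boolean remoteTranslogEnabled = recoveryTarget.state().getPrimary() == false
    && indexShard.isRemoteTranslogEnabled();
final long startingSeqNo = indexShard.recoverLocallyAndFetchStartSeqNo(remoteTranslogEnabled == false);
```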

/**
* The method figures out the sequence number based on the last commit.
*
* @return the starting sequence number from which the recovery should start.
*/
private long recoverLocallyUptoLastCommit() {
assert isRemoteTranslogEnabled() : "Remote translog store is not enabled";
long seqNo;
validateLocalRecoveryState();

try {
seqNo = Long.parseLong(store.readLastCommittedSegmentsInfo().getUserData().get(MAX_SEQ_NO));
} catch (org.apache.lucene.index.IndexNotFoundException e) {
Collaborator review comment: nit: will prefer import. Also add an assertion that this will be called on remote translog enabled indices

Member (author) reply: Did you mean to use static import for MAX_SEQ_NO? If so, made that change. Added the assertions.

logger.error("skip local recovery as no index commit found", e);
return UNASSIGNED_SEQ_NO;
} catch (Exception e) {
logger.error("skip local recovery as failed to find the safe commit", e);
return UNASSIGNED_SEQ_NO;
}

try {
maybeCheckIndex();
recoveryState.setStage(RecoveryState.Stage.TRANSLOG);
recoveryState.getTranslog().totalLocal(0);
} catch (Exception e) {
logger.error("check index failed during fetch seqNo", e);
return UNASSIGNED_SEQ_NO;
}
return seqNo;
}

private void validateLocalRecoveryState() {
assert Thread.holdsLock(mutex) == false : "recover locally under mutex";
if (state != IndexShardState.RECOVERING) {
throw new IndexShardNotRecoveringException(shardId, state);
}
recoveryState.validateCurrentStage(RecoveryState.Stage.INDEX);
assert routingEntry().recoverySource().getType() == RecoverySource.Type.PEER : "not a peer recovery [" + routingEntry() + "]";
}

public void trimOperationOfPreviousPrimaryTerms(long aboveSeqNo) {
getEngine().translogManager().trimOperationsFromTranslog(getOperationPrimaryTerm(), aboveSeqNo);
}
@@ -1998,7 +2042,7 @@ private void innerOpenEngineAndTranslog(LongSupplier globalCheckpointSupplier) t
private boolean assertSequenceNumbersInCommit() throws IOException {
final Map<String, String> userData = SegmentInfos.readLatestCommit(store.directory()).getUserData();
assert userData.containsKey(SequenceNumbers.LOCAL_CHECKPOINT_KEY) : "commit point doesn't contains a local checkpoint";
assert userData.containsKey(SequenceNumbers.MAX_SEQ_NO) : "commit point doesn't contains a maximum sequence number";
assert userData.containsKey(MAX_SEQ_NO) : "commit point doesn't contains a maximum sequence number";
assert userData.containsKey(Engine.HISTORY_UUID_KEY) : "commit point doesn't contains a history uuid";
assert userData.get(Engine.HISTORY_UUID_KEY).equals(getHistoryUUID()) : "commit point history uuid ["
+ userData.get(Engine.HISTORY_UUID_KEY)
@@ -3275,6 +3319,10 @@ private boolean isRemoteStoreEnabled() {
return (remoteStore != null && shardRouting.primary());
}

public boolean isRemoteTranslogEnabled() {
return indexSettings() != null && indexSettings().isRemoteTranslogStoreEnabled();
}
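As a hedged illustration only, an index that opts into the remote translog store would be created with remote store settings along these lines; the setting keys shown are assumptions, not part of this diff, and the helper itself simply delegates to IndexSettings#isRemoteTranslogStoreEnabled():

```java
import org.opensearch.common.settings.Settings;

// Hypothetical example: the setting keys below are assumptions for illustration,
// not taken from this PR; only isRemoteTranslogStoreEnabled() is referenced by the diff.
Settings indexSettings = Settings.builder()
    .put("index.remote_store.enabled", true)            // assumed key for the remote segment store
    .put("index.remote_store.translog.enabled", true)   // assumed key for the remote translog store
    .build();
```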

/**
* Acquire a primary operation permit whenever the shard is ready for indexing. If a permit is directly available, the provided
* ActionListener will be called on the calling thread. During relocation hand-off, permit acquisition can be delayed. The provided
server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java
@@ -36,10 +36,10 @@
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.store.AlreadyClosedException;
import org.opensearch.ExceptionsHelper;
import org.opensearch.LegacyESVersion;
import org.opensearch.OpenSearchException;
import org.opensearch.OpenSearchTimeoutException;
import org.opensearch.ExceptionsHelper;
import org.opensearch.action.ActionListener;
import org.opensearch.action.ActionRunnable;
import org.opensearch.cluster.ClusterState;
@@ -219,6 +219,12 @@ protected void reestablishRecovery(final StartRecoveryRequest request, final Str
threadPool.scheduleUnlessShuttingDown(retryAfter, ThreadPool.Names.GENERIC, new RecoveryRunner(recoveryId, request));
}

/**
* Initiates recovery of the replica. TODO - Need to revisit it with PRRL and later. @see
* <a href="https://github.com/opensearch-project/OpenSearch/issues/4502">github issue</a> on it.
* @param recoveryId recovery id
* @param preExistingRequest start recovery request
*/
private void doRecovery(final long recoveryId, final StartRecoveryRequest preExistingRequest) {
final String actionName;
final TransportRequest requestToSend;
@@ -238,10 +244,17 @@ private void doRecovery(final long recoveryId, final StartRecoveryRequest preExi
assert recoveryTarget.sourceNode() != null : "can not do a recovery without a source node";
logger.trace("{} preparing shard for peer recovery", recoveryTarget.shardId());
indexShard.prepareForIndexRecovery();
final long startingSeqNo = indexShard.recoverLocallyUpToGlobalCheckpoint();
boolean remoteTranslogEnabled = recoveryTarget.state().getPrimary() == false && indexShard.isRemoteTranslogEnabled();
final long startingSeqNo = indexShard.recoverLocallyAndFetchStartSeqNo(!remoteTranslogEnabled);
assert startingSeqNo == UNASSIGNED_SEQ_NO || recoveryTarget.state().getStage() == RecoveryState.Stage.TRANSLOG
: "unexpected recovery stage [" + recoveryTarget.state().getStage() + "] starting seqno [ " + startingSeqNo + "]";
startRequest = getStartRecoveryRequest(logger, clusterService.localNode(), recoveryTarget, startingSeqNo);
startRequest = getStartRecoveryRequest(
logger,
clusterService.localNode(),
recoveryTarget,
startingSeqNo,
!remoteTranslogEnabled
);
requestToSend = startRequest;
actionName = PeerRecoverySourceService.Actions.START_RECOVERY;
} catch (final Exception e) {
@@ -270,44 +283,58 @@ private void doRecovery(final long recoveryId, final StartRecoveryRequest preExi
);
}

public static StartRecoveryRequest getStartRecoveryRequest(
Logger logger,
DiscoveryNode localNode,
RecoveryTarget recoveryTarget,
long startingSeqNo
) {
return getStartRecoveryRequest(logger, localNode, recoveryTarget, startingSeqNo, true);
}

/**
* Prepare the start recovery request.
*
* @param logger the logger
* @param localNode the local node of the recovery target
* @param recoveryTarget the target of the recovery
* @param startingSeqNo a sequence number that an operation-based peer recovery can start with.
* This is the first operation after the local checkpoint of the safe commit if exists.
* @param logger the logger
* @param localNode the local node of the recovery target
* @param recoveryTarget the target of the recovery
* @param startingSeqNo a sequence number that an operation-based peer recovery can start with.
* This is the first operation after the local checkpoint of the safe commit if exists.
* @param verifyTranslog should the recovery request validate translog consistency with snapshot store metadata.
* @return a start recovery request
*/
public static StartRecoveryRequest getStartRecoveryRequest(
Logger logger,
DiscoveryNode localNode,
RecoveryTarget recoveryTarget,
long startingSeqNo
long startingSeqNo,
boolean verifyTranslog
) {
final StartRecoveryRequest request;
logger.trace("{} collecting local files for [{}]", recoveryTarget.shardId(), recoveryTarget.sourceNode());

Store.MetadataSnapshot metadataSnapshot;
try {
metadataSnapshot = recoveryTarget.indexShard().snapshotStoreMetadata();
// Make sure that the current translog is consistent with the Lucene index; otherwise, we have to throw away the Lucene index.
try {
final String expectedTranslogUUID = metadataSnapshot.getCommitUserData().get(Translog.TRANSLOG_UUID_KEY);
final long globalCheckpoint = Translog.readGlobalCheckpoint(recoveryTarget.translogLocation(), expectedTranslogUUID);
assert globalCheckpoint + 1 >= startingSeqNo : "invalid startingSeqNo " + startingSeqNo + " >= " + globalCheckpoint;
} catch (IOException | TranslogCorruptedException e) {
logger.warn(
new ParameterizedMessage(
"error while reading global checkpoint from translog, "
+ "resetting the starting sequence number from {} to unassigned and recovering as if there are none",
startingSeqNo
),
e
);
metadataSnapshot = Store.MetadataSnapshot.EMPTY;
startingSeqNo = UNASSIGNED_SEQ_NO;
if (verifyTranslog) {
// Make sure that the current translog is consistent with the Lucene index; otherwise, we have to throw away the Lucene
// index.
try {
final String expectedTranslogUUID = metadataSnapshot.getCommitUserData().get(Translog.TRANSLOG_UUID_KEY);
final long globalCheckpoint = Translog.readGlobalCheckpoint(recoveryTarget.translogLocation(), expectedTranslogUUID);
assert globalCheckpoint + 1 >= startingSeqNo : "invalid startingSeqNo " + startingSeqNo + " >= " + globalCheckpoint;
} catch (IOException | TranslogCorruptedException e) {
logger.warn(
new ParameterizedMessage(
"error while reading global checkpoint from translog, "
+ "resetting the starting sequence number from {} to unassigned and recovering as if there are none",
startingSeqNo
),
e
);
metadataSnapshot = Store.MetadataSnapshot.EMPTY;
startingSeqNo = UNASSIGNED_SEQ_NO;
}
}
} catch (final org.apache.lucene.index.IndexNotFoundException e) {
// happens on an empty folder. no need to log
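Taken together, the two getStartRecoveryRequest signatures in this hunk preserve the old behaviour for existing callers while letting the remote translog path opt out of the consistency check; roughly:

```java
// Existing callers keep the old behaviour: the 4-argument overload delegates
// with verifyTranslog = true, so the translog/Lucene consistency check still runs.
StartRecoveryRequest legacyRequest = getStartRecoveryRequest(
    logger, clusterService.localNode(), recoveryTarget, startingSeqNo);

// Replica recovery for a remote-translog-enabled index passes verifyTranslog = false
// (doRecovery() above passes !remoteTranslogEnabled), skipping the local translog check.
StartRecoveryRequest remoteTranslogRequest = getStartRecoveryRequest(
    logger, clusterService.localNode(), recoveryTarget, startingSeqNo, false);
```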