Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport to 2.x] [Segment Replication] - Update replicas to commit SegmentInfos instead of relying on segments_N from primary shards. #4450

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
- Dependency updates (httpcore, mockito, slf4j, httpasyncclient, commons-codec) ([#4308](https://github.com/opensearch-project/OpenSearch/pull/4308))
- Use RemoteSegmentStoreDirectory instead of RemoteDirectory ([#4240](https://github.com/opensearch-project/OpenSearch/pull/4240))
- Add index specific setting for remote repository ([#4253](https://github.com/opensearch-project/OpenSearch/pull/4253))
- [Segment Replication] Update replicas to commit SegmentInfos instead of relying on SIS files from primary shards. ([#4402](https://github.com/opensearch-project/OpenSearch/pull/4402))

### Deprecated

Expand Down Expand Up @@ -55,6 +56,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
- [Segment Replication] Extend FileChunkWriter to allow cancel on transport client ([#4386](https://github.com/opensearch-project/OpenSearch/pull/4386))
- [Segment Replication] Fix NoSuchFileExceptions with segment replication when computing primary metadata snapshots ([#4366](https://github.com/opensearch-project/OpenSearch/pull/4366))
- [Segment Replication] Fix timeout issue by calculating time needed to process getSegmentFiles ([#4434](https://github.com/opensearch-project/OpenSearch/pull/4434))
- [Segment Replication] Update replicas to commit SegmentInfos instead of relying on segments_N from primary shards.

### Security

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ public class NRTReplicationEngine extends Engine implements LifecycleAware {
private final LocalCheckpointTracker localCheckpointTracker;
private final WriteOnlyTranslogManager translogManager;

private volatile long lastReceivedGen = SequenceNumbers.NO_OPS_PERFORMED;

private static final int SI_COUNTER_INCREMENT = 10;

public NRTReplicationEngine(EngineConfig engineConfig) {
Expand Down Expand Up @@ -120,14 +122,16 @@ public TranslogManager translogManager() {

public synchronized void updateSegments(final SegmentInfos infos, long seqNo) throws IOException {
// Update the current infos reference on the Engine's reader.
final long incomingGeneration = infos.getGeneration();
readerManager.updateSegments(infos);

// only update the persistedSeqNo and "lastCommitted" infos reference if the incoming segments have a higher
// generation. We can still refresh with incoming SegmentInfos that are not part of a commit point.
if (infos.getGeneration() > lastCommittedSegmentInfos.getGeneration()) {
this.lastCommittedSegmentInfos = infos;
// Commit and roll the xlog when we receive a different generation than what was last received.
// lower/higher gens are possible from a new primary that was just elected.
if (incomingGeneration != lastReceivedGen) {
commitSegmentInfos();
translogManager.rollTranslogGeneration();
}
lastReceivedGen = incomingGeneration;
localCheckpointTracker.fastForwardProcessedSeqNo(seqNo);
}

Expand All @@ -141,20 +145,16 @@ public synchronized void updateSegments(final SegmentInfos infos, long seqNo) th
*
* @throws IOException - When there is an IO error committing the SegmentInfos.
*/
public void commitSegmentInfos() throws IOException {
// TODO: This method should wait for replication events to finalize.
final SegmentInfos latestSegmentInfos = getLatestSegmentInfos();
/*
This is a workaround solution which decreases the chances of conflict on replica nodes when same file is copied
from two different primaries during failover. Increasing counter helps in avoiding this conflict as counter is
used to generate new segment file names. The ideal solution is to identify the counter from previous primary.
*/
latestSegmentInfos.counter = latestSegmentInfos.counter + SI_COUNTER_INCREMENT;
latestSegmentInfos.changed();
store.commitSegmentInfos(latestSegmentInfos, localCheckpointTracker.getMaxSeqNo(), localCheckpointTracker.getProcessedCheckpoint());
private void commitSegmentInfos(SegmentInfos infos) throws IOException {
store.commitSegmentInfos(infos, localCheckpointTracker.getMaxSeqNo(), localCheckpointTracker.getProcessedCheckpoint());
this.lastCommittedSegmentInfos = store.readLastCommittedSegmentsInfo();
translogManager.syncTranslog();
}

protected void commitSegmentInfos() throws IOException {
commitSegmentInfos(getLatestSegmentInfos());
}

@Override
public String getHistoryUUID() {
return loadHistoryUUID(lastCommittedSegmentInfos.userData);
Expand Down Expand Up @@ -405,7 +405,16 @@ protected final void closeNoLock(String reason, CountDownLatch closedLatch) {
assert rwl.isWriteLockedByCurrentThread() || failEngineLock.isHeldByCurrentThread()
: "Either the write lock must be held or the engine must be currently be failing itself";
try {
IOUtils.close(readerManager, translogManager.getTranslog(), store::decRef);
final SegmentInfos latestSegmentInfos = getLatestSegmentInfos();
/*
This is a workaround solution which decreases the chances of conflict on replica nodes when same file is copied
from two different primaries during failover. Increasing counter helps in avoiding this conflict as counter is
used to generate new segment file names. The ideal solution is to identify the counter from previous primary.
*/
latestSegmentInfos.counter = latestSegmentInfos.counter + SI_COUNTER_INCREMENT;
latestSegmentInfos.changed();
commitSegmentInfos(latestSegmentInfos);
IOUtils.close(readerManager, translogManager, store::decRef);
} catch (Exception e) {
logger.warn("failed to close engine", e);
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,9 @@ protected OpenSearchDirectoryReader refreshIfNeeded(OpenSearchDirectoryReader re
* @throws IOException - When Refresh fails with an IOException.
*/
public synchronized void updateSegments(SegmentInfos infos) throws IOException {
// roll over the currentInfo's generation, this ensures the on-disk gen
// is always increased.
infos.updateGeneration(currentInfos);
currentInfos = infos;
maybeRefresh();
}
Expand Down
28 changes: 4 additions & 24 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -620,7 +620,7 @@ public void updateShardState(
if (indexSettings.isSegRepEnabled()) {
// this Shard's engine was read only, we need to update its engine before restoring local history from xlog.
assert newRouting.primary() && currentRouting.primary() == false;
promoteNRTReplicaToPrimary();
resetEngineToGlobalCheckpoint();
}
replicationTracker.activatePrimaryMode(getLocalCheckpoint());
ensurePeerRecoveryRetentionLeasesExist();
Expand Down Expand Up @@ -3571,7 +3571,9 @@ private void innerAcquireReplicaOperationPermit(
currentGlobalCheckpoint,
maxSeqNo
);
if (currentGlobalCheckpoint < maxSeqNo) {
// With Segment Replication enabled, we never want to reset a replica's engine unless
// it is promoted to primary.
if (currentGlobalCheckpoint < maxSeqNo && indexSettings.isSegRepEnabled() == false) {
resetEngineToGlobalCheckpoint();
} else {
getEngine().rollTranslogGeneration();
Expand Down Expand Up @@ -4132,26 +4134,4 @@ RetentionLeaseSyncer getRetentionLeaseSyncer() {
public GatedCloseable<SegmentInfos> getSegmentInfosSnapshot() {
return getEngine().getSegmentInfosSnapshot();
}

/**
* With segment replication enabled - prepare the shard's engine to be promoted as the new primary.
*
* If this shard is currently using a replication engine, this method:
* 1. Invokes {@link NRTReplicationEngine#commitSegmentInfos()} to ensure the engine can be reopened as writeable from the latest refresh point.
* InternalEngine opens its IndexWriter from an on-disk commit point, but this replica may have recently synced from a primary's refresh point, meaning it has documents searchable in its in-memory SegmentInfos
* that are not part of a commit point. This ensures that those documents are made part of a commit and do not need to be reindexed after promotion.
* 2. Invokes resetEngineToGlobalCheckpoint - This call performs the engine swap, opening up as a writeable engine and replays any operations in the xlog. The operations indexed from xlog here will be
* any ack'd writes that were not copied to this replica before promotion.
*/
private void promoteNRTReplicaToPrimary() {
assert shardRouting.primary() && indexSettings.isSegRepEnabled();
getReplicationEngine().ifPresentOrElse(engine -> {
try {
engine.commitSegmentInfos();
resetEngineToGlobalCheckpoint();
} catch (IOException e) {
throw new EngineException(shardId, "Unable to update replica to writeable engine, failing shard", e);
}
}, () -> { throw new EngineException(shardId, "Expected replica engine to be of type NRTReplicationEngine"); });
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,10 @@ private void innerWriteFileChunk(StoreFileMetadata fileMetadata, long position,
+ temporaryFileName
+ "] in "
+ Arrays.toString(store.directory().listAll());
store.directory().sync(Collections.singleton(temporaryFileName));
// With Segment Replication, we will fsync after a full commit has been received.
if (store.indexSettings().isSegRepEnabled() == false) {
store.directory().sync(Collections.singleton(temporaryFileName));
}
IndexOutput remove = removeOpenIndexOutputs(name);
assert remove == null || remove == indexOutput; // remove maybe null if we got finished
}
Expand Down
Loading