Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix test testDropPrimaryDuringReplication and clean up ReplicationCheckpoint validation #8889

Merged
merged 8 commits into from
Aug 3, 2023
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.ShardRoutingState;
import org.opensearch.cluster.routing.allocation.command.CancelAllocationCommand;
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.concurrent.GatedCloseable;
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
import org.opensearch.common.lucene.index.OpenSearchDirectoryReader;
import org.opensearch.common.settings.Settings;
Expand All @@ -60,6 +62,7 @@
import org.opensearch.index.shard.IndexShard;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.indices.recovery.FileChunkRequest;
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.search.SearchService;
import org.opensearch.search.builder.PointInTimeBuilder;
Expand Down Expand Up @@ -983,8 +986,11 @@ public void testScrollCreatedOnReplica() throws Exception {
)
);
final IndexShard replicaShard = getIndexShard(replica, INDEX_NAME);
final SegmentInfos segmentInfos = replicaShard.getLatestSegmentInfosAndCheckpoint().v1().get();
final Collection<String> snapshottedSegments = segmentInfos.files(false);
final Tuple<GatedCloseable<SegmentInfos>, ReplicationCheckpoint> tuple = replicaShard.getLatestSegmentInfosAndCheckpoint();
final Collection<String> snapshottedSegments;
try (final GatedCloseable<SegmentInfos> closeable = tuple.v1()) {
snapshottedSegments = closeable.get().files(false);
}
// opens a scrolled query before a flush is called.
// this is for testing scroll segment consistency between refresh and flush
SearchResponse searchResponse = client(replica).prepareSearch()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.io.Closeable;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand Down Expand Up @@ -445,6 +446,20 @@ protected SegmentInfos getLatestSegmentInfos() {
return readerManager.getSegmentInfos();
}

@Override
public synchronized GatedCloseable<SegmentInfos> getSegmentInfosSnapshot() {
// get reference to latest infos
final SegmentInfos latestSegmentInfos = getLatestSegmentInfos();
// incref all files
try {
final Collection<String> files = latestSegmentInfos.files(false);
store.incRefFileDeleter(files);
return new GatedCloseable<>(latestSegmentInfos, () -> store.decRefFileDeleter(files));
} catch (IOException e) {
throw new EngineException(shardId, e.getMessage(), e);
}
}

protected LocalCheckpointTracker getLocalCheckpointTracker() {
return localCheckpointTracker;
}
Expand Down
69 changes: 47 additions & 22 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -677,8 +677,19 @@ public void updateShardState(
// this Shard's engine was read only, we need to update its engine before restoring local history from xlog.
assert newRouting.primary() && currentRouting.primary() == false;
resetEngineToGlobalCheckpoint();
// It is possible an engine can open with a SegmentInfos on a higher gen but the reader does not refresh to
// trigger our refresh listener.
// Force update the checkpoint post engine reset.
updateReplicationCheckpoint();
}

mch2 marked this conversation as resolved.
Show resolved Hide resolved
replicationTracker.activatePrimaryMode(getLocalCheckpoint());
if (indexSettings.isSegRepEnabled()) {
dreamer-89 marked this conversation as resolved.
Show resolved Hide resolved
// force publish a checkpoint once in primary mode so that replicas not caught up to previous primary
// are brought up to date.
checkpointPublisher.publish(this, getLatestReplicationCheckpoint());
mch2 marked this conversation as resolved.
Show resolved Hide resolved
}

ensurePeerRecoveryRetentionLeasesExist();
/*
* If this shard was serving as a replica shard when another shard was promoted to primary then
Expand Down Expand Up @@ -1551,15 +1562,7 @@ public GatedCloseable<IndexCommit> acquireSafeIndexCommit() throws EngineExcepti
* @return EMPTY checkpoint before the engine is opened and null for non-segrep enabled indices
*/
public ReplicationCheckpoint getLatestReplicationCheckpoint() {
final Tuple<GatedCloseable<SegmentInfos>, ReplicationCheckpoint> infosAndCheckpoint = getLatestSegmentInfosAndCheckpoint();
if (infosAndCheckpoint == null) {
return null;
}
try (final GatedCloseable<SegmentInfos> ignored = infosAndCheckpoint.v1()) {
return infosAndCheckpoint.v2();
} catch (IOException e) {
throw new OpenSearchException("Error Closing SegmentInfos Snapshot", e);
}
return replicationTracker.getLatestReplicationCheckpoint();
}

/**
Expand All @@ -1573,13 +1576,11 @@ public ReplicationCheckpoint getLatestReplicationCheckpoint() {
*
*/
public Tuple<GatedCloseable<SegmentInfos>, ReplicationCheckpoint> getLatestSegmentInfosAndCheckpoint() {
if (indexSettings.isSegRepEnabled() == false) {
return null;
}
assert indexSettings.isSegRepEnabled();

Tuple<GatedCloseable<SegmentInfos>, ReplicationCheckpoint> nullSegmentInfosEmptyCheckpoint = new Tuple<>(
new GatedCloseable<>(null, () -> {}),
ReplicationCheckpoint.empty(shardId, getDefaultCodecName())
getLatestReplicationCheckpoint()
);

if (getEngineOrNull() == null) {
Expand All @@ -1598,11 +1599,7 @@ public Tuple<GatedCloseable<SegmentInfos>, ReplicationCheckpoint> getLatestSegme
getOperationPrimaryTerm(),
segmentInfos.getGeneration(),
segmentInfos.getVersion(),
// TODO: Update replicas to compute length from SegmentInfos. Replicas do not yet incref segments with
// getSegmentInfosSnapshot, so computing length from SegmentInfos can cause issues.
shardRouting.primary()
? store.getSegmentMetadataMap(segmentInfos).values().stream().mapToLong(StoreFileMetadata::length).sum()
: store.stats(StoreStats.UNKNOWN_RESERVED_BYTES).getSizeInBytes(),
store.getSegmentMetadataMap(segmentInfos).values().stream().mapToLong(StoreFileMetadata::length).sum(),
getEngine().config().getCodec().getName()
)
);
Expand Down Expand Up @@ -1858,10 +1855,6 @@ public void resetToWriteableEngine() throws IOException, InterruptedException, T
indexShardOperationPermits.blockOperations(30, TimeUnit.MINUTES, () -> { resetEngineToGlobalCheckpoint(); });
}

public void onCheckpointPublished(ReplicationCheckpoint checkpoint) {
replicationTracker.setLatestReplicationCheckpoint(checkpoint);
}

/**
* Wrapper for a non-closing reader
*
Expand Down Expand Up @@ -2342,6 +2335,11 @@ private void innerOpenEngineAndTranslog(LongSupplier globalCheckpointSupplier, b
final Engine newEngine = engineFactory.newReadWriteEngine(config);
onNewEngine(newEngine);
currentEngineReference.set(newEngine);

if (indexSettings.isSegRepEnabled()) {
// set initial replication checkpoints into tracker.
updateReplicationCheckpoint();
}
// We set active because we are now writing operations to the engine; this way,
// we can flush if we go idle after some time and become inactive.
active.set(true);
Expand Down Expand Up @@ -3667,6 +3665,9 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) thro

internalRefreshListener.clear();
internalRefreshListener.add(new RefreshMetricUpdater(refreshMetric));
if (indexSettings.isSegRepEnabled()) {
internalRefreshListener.add(new ReplicationCheckpointUpdater());
mch2 marked this conversation as resolved.
Show resolved Hide resolved
}
if (this.checkpointPublisher != null && shardRouting.primary() && indexSettings.isSegRepLocalEnabled()) {
internalRefreshListener.add(new CheckpointRefreshListener(this, this.checkpointPublisher));
}
Expand Down Expand Up @@ -4471,6 +4472,30 @@ public void afterRefresh(boolean didRefresh) throws IOException {
}
}

/**
* Refresh listener to update the Shard's ReplicationCheckpoint post refresh.
*/
private class ReplicationCheckpointUpdater implements ReferenceManager.RefreshListener {
@Override
public void beforeRefresh() throws IOException {}

@Override
public void afterRefresh(boolean didRefresh) throws IOException {
if (didRefresh) {
updateReplicationCheckpoint();
}
}
}

private void updateReplicationCheckpoint() {
final Tuple<GatedCloseable<SegmentInfos>, ReplicationCheckpoint> tuple = getLatestSegmentInfosAndCheckpoint();
try (final GatedCloseable<SegmentInfos> ignored = tuple.v1()) {
replicationTracker.setLatestReplicationCheckpoint(tuple.v2());
} catch (IOException e) {
throw new OpenSearchException("Error Closing SegmentInfos Snapshot", e);
}
}

private EngineConfig.TombstoneDocSupplier tombstoneDocSupplier() {
final RootObjectMapper.Builder noopRootMapper = new RootObjectMapper.Builder("__noop");
final DocumentMapper noopDocumentMapper = mapperService != null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,6 @@ private synchronized boolean syncSegments() {
return true;
}
ReplicationCheckpoint checkpoint = indexShard.getLatestReplicationCheckpoint();
indexShard.onCheckpointPublished(checkpoint);
beforeSegmentsSync();
long refreshTimeMs = segmentTracker.getLocalRefreshTimeMs(), refreshClockTimeMs = segmentTracker.getLocalRefreshClockTimeMs();
long refreshSeqNo = segmentTracker.getLocalRefreshSeqNo();
Expand All @@ -207,6 +206,10 @@ private synchronized boolean syncSegments() {

try (GatedCloseable<SegmentInfos> segmentInfosGatedCloseable = indexShard.getSegmentInfosSnapshot()) {
SegmentInfos segmentInfos = segmentInfosGatedCloseable.get();
assert segmentInfos.getGeneration() == checkpoint.getSegmentsGen() : "SegmentInfos generation: "
+ segmentInfos.getGeneration()
+ " does not match metadata generation: "
+ checkpoint.getSegmentsGen();
// Capture replication checkpoint before uploading the segments as upload can take some time and checkpoint can
// move.
long lastRefreshedCheckpoint = ((InternalEngine) indexShard.getEngine()).lastRefreshedCheckpoint();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,14 @@ public class SegmentReplicationTarget extends ReplicationTarget {

public final static String REPLICATION_PREFIX = "replication.";

public SegmentReplicationTarget(IndexShard indexShard, SegmentReplicationSource source, ReplicationListener listener) {
public SegmentReplicationTarget(
IndexShard indexShard,
ReplicationCheckpoint checkpoint,
SegmentReplicationSource source,
ReplicationListener listener
) {
super("replication_target", indexShard, new ReplicationLuceneIndex(), listener);
this.checkpoint = indexShard.getLatestReplicationCheckpoint();
this.checkpoint = checkpoint;
this.source = source;
this.state = new SegmentReplicationState(
indexShard.routingEntry(),
Expand Down Expand Up @@ -90,12 +95,19 @@ public SegmentReplicationState state() {
}

public SegmentReplicationTarget retryCopy() {
return new SegmentReplicationTarget(indexShard, source, listener);
return new SegmentReplicationTarget(indexShard, checkpoint, source, listener);
}

@Override
public String description() {
return String.format(Locale.ROOT, "Id:[%d] Shard:[%s] Source:[%s]", getId(), shardId(), source.getDescription());
return String.format(
Locale.ROOT,
"Id:[%d] Checkpoint [%s] Shard:[%s] Source:[%s]",
getId(),
getCheckpoint(),
shardId(),
source.getDescription()
);
}

@Override
Expand Down
Loading