Skip to content

Commit

Permalink
[Segment Replication] Compatibility check for differing lucene codec …
Browse files Browse the repository at this point in the history
…versions (#6730) (#6991)

This change aims to fail segment replications between the primary and replica if they are utilizing differing lucene codec versions. This is to avoid the current behavior of failing the replica shard in such situations.

(cherry picked from commit c334bbd)

Signed-off-by: Poojita Raj <poojiraj@amazon.com>
  • Loading branch information
opensearch-trigger-bot[bot] authored Apr 6, 2023
1 parent d95effb commit c7e3b02
Show file tree
Hide file tree
Showing 15 changed files with 232 additions and 52 deletions.
20 changes: 17 additions & 3 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -496,6 +496,13 @@ public boolean isSystem() {
return indexSettings.getIndexMetadata().isSystem();
}

/**
* Returns the name of the default codec in codecService
*/
public String getDefaultCodecName() {
return codecService.codec(CodecService.DEFAULT_CODEC).getName();
}

/**
* USE THIS METHOD WITH CARE!
* Returns the primary term the index shard is supposed to be on. In case of primary promotion or when a replica learns about
Expand Down Expand Up @@ -1489,7 +1496,7 @@ public Tuple<GatedCloseable<SegmentInfos>, ReplicationCheckpoint> getLatestSegme
return null;
}
if (getEngineOrNull() == null) {
return new Tuple<>(new GatedCloseable<>(null, () -> {}), ReplicationCheckpoint.empty(shardId));
return new Tuple<>(new GatedCloseable<>(null, () -> {}), ReplicationCheckpoint.empty(shardId, getDefaultCodecName()));
}
// do not close the snapshot - caller will close it.
final GatedCloseable<SegmentInfos> snapshot = getSegmentInfosSnapshot();
Expand All @@ -1506,13 +1513,14 @@ public Tuple<GatedCloseable<SegmentInfos>, ReplicationCheckpoint> getLatestSegme
// getSegmentInfosSnapshot, so computing length from SegmentInfos can cause issues.
shardRouting.primary()
? store.getSegmentMetadataMap(segmentInfos).values().stream().mapToLong(StoreFileMetadata::length).sum()
: store.stats(StoreStats.UNKNOWN_RESERVED_BYTES).getSizeInBytes()
: store.stats(StoreStats.UNKNOWN_RESERVED_BYTES).getSizeInBytes(),
getEngine().config().getCodec().getName()
)
);
} catch (IOException e) {
throw new OpenSearchException("Error Fetching SegmentInfos and latest checkpoint", e);
}
}).orElseGet(() -> new Tuple<>(new GatedCloseable<>(null, () -> {}), ReplicationCheckpoint.empty(shardId)));
}).orElseGet(() -> new Tuple<>(new GatedCloseable<>(null, () -> {}), ReplicationCheckpoint.empty(shardId, getDefaultCodecName())));
}

/**
Expand Down Expand Up @@ -1582,6 +1590,12 @@ public final boolean shouldProcessCheckpoint(ReplicationCheckpoint requestCheckp
);
return false;
}
if (localCheckpoint.getCodec().equals(requestCheckpoint.getCodec()) == false) {
logger.trace(
() -> new ParameterizedMessage("Shard does not support the received lucene codec version {}", requestCheckpoint.getCodec())
);
return false;
}
return true;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.opensearch.OpenSearchException;
import org.opensearch.action.ActionListener;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.common.util.CancellableThreads;
import org.opensearch.common.util.concurrent.ConcurrentCollections;
import org.opensearch.index.IndexService;
import org.opensearch.index.shard.IndexShard;
Expand Down Expand Up @@ -147,6 +148,12 @@ void startSegmentCopy(GetSegmentFilesRequest request, ActionListener<GetSegmentF
*/
CopyState prepareForReplication(CheckpointInfoRequest request, FileChunkWriter fileChunkWriter) throws IOException {
final CopyState copyState = getCachedCopyState(request.getCheckpoint());
if (copyState.getCheckpoint().getCodec().equals(request.getCheckpoint().getCodec()) == false) {
logger.trace("Requested unsupported codec version {}", request.getCheckpoint().getCodec());
throw new CancellableThreads.ExecutionCancelledException(
new ParameterizedMessage("Requested unsupported codec version {}", request.getCheckpoint().getCodec()).toString()
);
}
allocationIdToHandlers.compute(request.getTargetAllocationId(), (allocationId, segrepHandler) -> {
if (segrepHandler != null) {
logger.warn("Override handler for allocation id {}", request.getTargetAllocationId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -401,7 +401,7 @@ public void messageReceived(final ForceSyncRequest request, TransportChannel cha
return;
}
startReplication(
ReplicationCheckpoint.empty(request.getShardId()),
ReplicationCheckpoint.empty(request.getShardId(), indexShard.getDefaultCodecName()),
indexShard,
new SegmentReplicationTargetService.SegmentReplicationListener() {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

package org.opensearch.indices.replication.checkpoint;

import org.opensearch.Version;
import org.opensearch.common.Nullable;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
Expand All @@ -30,37 +31,46 @@ public class ReplicationCheckpoint implements Writeable, Comparable<ReplicationC
private final long segmentsGen;
private final long segmentInfosVersion;
private final long length;
private final String codec;

public static ReplicationCheckpoint empty(ShardId shardId) {
return new ReplicationCheckpoint(shardId);
public static ReplicationCheckpoint empty(ShardId shardId, String codec) {
return new ReplicationCheckpoint(shardId, codec);
}

private ReplicationCheckpoint(ShardId shardId) {
private ReplicationCheckpoint(ShardId shardId, String codec) {
this.shardId = shardId;
primaryTerm = SequenceNumbers.UNASSIGNED_PRIMARY_TERM;
segmentsGen = SequenceNumbers.NO_OPS_PERFORMED;
segmentInfosVersion = SequenceNumbers.NO_OPS_PERFORMED;
length = 0L;
this.codec = codec;
}

public ReplicationCheckpoint(ShardId shardId, long primaryTerm, long segmentsGen, long segmentInfosVersion) {
this(shardId, primaryTerm, segmentsGen, segmentInfosVersion, 0L);
public ReplicationCheckpoint(ShardId shardId, long primaryTerm, long segmentsGen, long segmentInfosVersion, String codec) {
this(shardId, primaryTerm, segmentsGen, segmentInfosVersion, 0L, codec);
}

public ReplicationCheckpoint(ShardId shardId, long primaryTerm, long segmentsGen, long segmentInfosVersion, long length) {
public ReplicationCheckpoint(ShardId shardId, long primaryTerm, long segmentsGen, long segmentInfosVersion, long length, String codec) {
this.shardId = shardId;
this.primaryTerm = primaryTerm;
this.segmentsGen = segmentsGen;
this.segmentInfosVersion = segmentInfosVersion;
this.length = length;
this.codec = codec;
}

public ReplicationCheckpoint(StreamInput in) throws IOException {
shardId = new ShardId(in);
primaryTerm = in.readLong();
segmentsGen = in.readLong();
segmentInfosVersion = in.readLong();
length = in.readLong();
if (in.getVersion().onOrAfter(Version.V_2_7_0)) {
length = in.readLong();
codec = in.readString();
} else {
length = 0L;
codec = null;
}
}

/**
Expand Down Expand Up @@ -102,13 +112,25 @@ public long getLength() {
return length;
}

/**
* Latest supported codec version
*
* @return the codec name
*/
public String getCodec() {
return codec;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
shardId.writeTo(out);
out.writeLong(primaryTerm);
out.writeLong(segmentsGen);
out.writeLong(segmentInfosVersion);
out.writeLong(length);
if (out.getVersion().onOrAfter(Version.V_2_7_0)) {
out.writeLong(length);
out.writeString(codec);
}
}

@Override
Expand All @@ -124,7 +146,8 @@ public boolean equals(Object o) {
return primaryTerm == that.primaryTerm
&& segmentsGen == that.segmentsGen
&& segmentInfosVersion == that.segmentInfosVersion
&& Objects.equals(shardId, that.shardId);
&& Objects.equals(shardId, that.shardId)
&& codec.equals(that.codec);
}

@Override
Expand Down Expand Up @@ -155,6 +178,8 @@ public String toString() {
+ segmentInfosVersion
+ ", size="
+ length
+ ", codec="
+ codec
+ '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

package org.opensearch.gateway;

import org.apache.lucene.codecs.Codec;
import org.apache.lucene.index.CorruptIndexException;
import org.opensearch.Version;
import org.opensearch.cluster.ClusterName;
Expand Down Expand Up @@ -61,6 +62,7 @@
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.set.Sets;
import org.opensearch.env.ShardLockObtainFailedException;
import org.opensearch.index.codec.CodecService;
import org.opensearch.index.shard.ShardId;
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
import org.opensearch.repositories.IndexId;
Expand Down Expand Up @@ -220,9 +222,9 @@ public void testPreferReplicaWithHighestPrimaryTerm() {
allocId2,
allocId3
);
testAllocator.addData(node1, allocId1, false, new ReplicationCheckpoint(shardId, 20, 101, 1));
testAllocator.addData(node2, allocId2, false, new ReplicationCheckpoint(shardId, 22, 120, 2));
testAllocator.addData(node3, allocId3, false, new ReplicationCheckpoint(shardId, 20, 120, 2));
testAllocator.addData(node1, allocId1, false, new ReplicationCheckpoint(shardId, 20, 101, 1, Codec.getDefault().getName()));
testAllocator.addData(node2, allocId2, false, new ReplicationCheckpoint(shardId, 22, 120, 2, Codec.getDefault().getName()));
testAllocator.addData(node3, allocId3, false, new ReplicationCheckpoint(shardId, 20, 120, 2, Codec.getDefault().getName()));
allocateAllUnassigned(allocation);
assertThat(allocation.routingNodesChanged(), equalTo(true));
assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true));
Expand Down Expand Up @@ -253,9 +255,9 @@ public void testPreferReplicaWithNullReplicationCheckpoint() {
allocId2,
allocId3
);
testAllocator.addData(node1, allocId1, false, new ReplicationCheckpoint(shardId, 20, 101, 1));
testAllocator.addData(node1, allocId1, false, new ReplicationCheckpoint(shardId, 20, 101, 1, Codec.getDefault().getName()));
testAllocator.addData(node2, allocId2, false);
testAllocator.addData(node3, allocId3, false, new ReplicationCheckpoint(shardId, 40, 120, 2));
testAllocator.addData(node3, allocId3, false, new ReplicationCheckpoint(shardId, 40, 120, 2, Codec.getDefault().getName()));
allocateAllUnassigned(allocation);
assertThat(allocation.routingNodesChanged(), equalTo(true));
assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true));
Expand Down Expand Up @@ -319,9 +321,9 @@ public void testPreferReplicaWithHighestSegmentInfoVersion() {
allocId2,
allocId3
);
testAllocator.addData(node1, allocId1, false, new ReplicationCheckpoint(shardId, 10, 101, 1));
testAllocator.addData(node2, allocId2, false, new ReplicationCheckpoint(shardId, 20, 120, 3));
testAllocator.addData(node3, allocId3, false, new ReplicationCheckpoint(shardId, 20, 120, 2));
testAllocator.addData(node1, allocId1, false, new ReplicationCheckpoint(shardId, 10, 101, 1, Codec.getDefault().getName()));
testAllocator.addData(node2, allocId2, false, new ReplicationCheckpoint(shardId, 20, 120, 3, Codec.getDefault().getName()));
testAllocator.addData(node3, allocId3, false, new ReplicationCheckpoint(shardId, 20, 120, 2, Codec.getDefault().getName()));
allocateAllUnassigned(allocation);
assertThat(allocation.routingNodesChanged(), equalTo(true));
assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true));
Expand Down Expand Up @@ -351,9 +353,9 @@ public void testOutOfSyncHighestRepCheckpointIsIgnored() {
allocId1,
allocId3
);
testAllocator.addData(node1, allocId1, false, new ReplicationCheckpoint(shardId, 10, 101, 1));
testAllocator.addData(node2, allocId2, false, new ReplicationCheckpoint(shardId, 20, 120, 2));
testAllocator.addData(node3, allocId3, false, new ReplicationCheckpoint(shardId, 15, 120, 2));
testAllocator.addData(node1, allocId1, false, new ReplicationCheckpoint(shardId, 10, 101, 1, Codec.getDefault().getName()));
testAllocator.addData(node2, allocId2, false, new ReplicationCheckpoint(shardId, 20, 120, 2, Codec.getDefault().getName()));
testAllocator.addData(node3, allocId3, false, new ReplicationCheckpoint(shardId, 15, 120, 2, Codec.getDefault().getName()));
allocateAllUnassigned(allocation);
assertThat(allocation.routingNodesChanged(), equalTo(true));
assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true));
Expand Down Expand Up @@ -384,9 +386,9 @@ public void testPreferAllocatingPreviousPrimaryWithLowerRepCheckpoint() {
allocId2,
allocId3
);
testAllocator.addData(node1, allocId1, true, new ReplicationCheckpoint(shardId, 10, 101, 1));
testAllocator.addData(node2, allocId2, false, new ReplicationCheckpoint(shardId, 20, 120, 2));
testAllocator.addData(node3, allocId3, false, new ReplicationCheckpoint(shardId, 15, 120, 2));
testAllocator.addData(node1, allocId1, true, new ReplicationCheckpoint(shardId, 10, 101, 1, Codec.getDefault().getName()));
testAllocator.addData(node2, allocId2, false, new ReplicationCheckpoint(shardId, 20, 120, 2, Codec.getDefault().getName()));
testAllocator.addData(node3, allocId3, false, new ReplicationCheckpoint(shardId, 15, 120, 2, Codec.getDefault().getName()));
allocateAllUnassigned(allocation);
assertThat(allocation.routingNodesChanged(), equalTo(true));
assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true));
Expand Down Expand Up @@ -809,11 +811,23 @@ public TestAllocator addData(
}

public TestAllocator addData(DiscoveryNode node, String allocationId, boolean primary) {
return addData(node, allocationId, primary, ReplicationCheckpoint.empty(shardId), null);
return addData(
node,
allocationId,
primary,
ReplicationCheckpoint.empty(shardId, new CodecService(null, null).codec("default").getName()),
null
);
}

public TestAllocator addData(DiscoveryNode node, String allocationId, boolean primary, @Nullable Exception storeException) {
return addData(node, allocationId, primary, ReplicationCheckpoint.empty(shardId), storeException);
return addData(
node,
allocationId,
primary,
ReplicationCheckpoint.empty(shardId, new CodecService(null, null).codec("default").getName()),
storeException
);
}

public TestAllocator addData(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

package org.opensearch.index.seqno;

import org.apache.lucene.codecs.Codec;
import org.opensearch.action.ActionListener;
import org.opensearch.action.support.replication.ReplicationResponse;
import org.opensearch.cluster.routing.AllocationId;
Expand Down Expand Up @@ -1800,9 +1801,30 @@ public void testSegmentReplicationCheckpointTracking() {
.filter(id -> tracker.shardAllocationId.equals(id) == false)
.collect(Collectors.toSet());

final ReplicationCheckpoint initialCheckpoint = new ReplicationCheckpoint(tracker.shardId(), 0L, 1, 1, 1L);
final ReplicationCheckpoint secondCheckpoint = new ReplicationCheckpoint(tracker.shardId(), 0L, 2, 2, 50L);
final ReplicationCheckpoint thirdCheckpoint = new ReplicationCheckpoint(tracker.shardId(), 0L, 2, 3, 100L);
final ReplicationCheckpoint initialCheckpoint = new ReplicationCheckpoint(
tracker.shardId(),
0L,
1,
1,
1L,
Codec.getDefault().getName()
);
final ReplicationCheckpoint secondCheckpoint = new ReplicationCheckpoint(
tracker.shardId(),
0L,
2,
2,
50L,
Codec.getDefault().getName()
);
final ReplicationCheckpoint thirdCheckpoint = new ReplicationCheckpoint(
tracker.shardId(),
0L,
2,
3,
100L,
Codec.getDefault().getName()
);

tracker.setLatestReplicationCheckpoint(initialCheckpoint);
tracker.setLatestReplicationCheckpoint(secondCheckpoint);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

package org.opensearch.index.shard;

import org.apache.lucene.codecs.Codec;
import org.apache.lucene.index.SegmentInfos;
import org.junit.Assert;
import org.opensearch.ExceptionsHelper;
Expand Down Expand Up @@ -306,7 +307,7 @@ public void testRejectCheckpointOnShardRoutingPrimary() throws IOException {
assertEquals(false, primaryShard.getReplicationTracker().isPrimaryMode());
assertEquals(true, primaryShard.routingEntry().primary());

spy.onNewCheckpoint(new ReplicationCheckpoint(primaryShard.shardId(), 0L, 0L, 0L), spyShard);
spy.onNewCheckpoint(new ReplicationCheckpoint(primaryShard.shardId(), 0L, 0L, 0L, Codec.getDefault().getName()), spyShard);

// Verify that checkpoint is not processed as shard routing is primary.
verify(spy, times(0)).startReplication(any(), any(), any());
Expand Down Expand Up @@ -1020,7 +1021,7 @@ private void assertDocCounts(IndexShard indexShard, int expectedPersistedDocCoun

private void resolveCheckpointInfoResponseListener(ActionListener<CheckpointInfoResponse> listener, IndexShard primary) {
try {
final CopyState copyState = new CopyState(ReplicationCheckpoint.empty(primary.shardId), primary);
final CopyState copyState = new CopyState(ReplicationCheckpoint.empty(primary.shardId, primary.getDefaultCodecName()), primary);
listener.onResponse(
new CheckpointInfoResponse(copyState.getCheckpoint(), copyState.getMetadataMap(), copyState.getInfosBytes())
);
Expand All @@ -1034,7 +1035,7 @@ private void startReplicationAndAssertCancellation(IndexShard replica, SegmentRe
throws InterruptedException {
CountDownLatch latch = new CountDownLatch(1);
final SegmentReplicationTarget target = targetService.startReplication(
ReplicationCheckpoint.empty(replica.shardId),
ReplicationCheckpoint.empty(replica.shardId, replica.getDefaultCodecName()),
replica,
new SegmentReplicationTargetService.SegmentReplicationListener() {
@Override
Expand Down
Loading

0 comments on commit c7e3b02

Please sign in to comment.