Skip to content

Commit

Permalink
codec service usage change
Browse files Browse the repository at this point in the history
Signed-off-by: Poojita Raj <poojiraj@amazon.com>
  • Loading branch information
Poojita-Raj committed Mar 28, 2023
1 parent e41e757 commit d5d63b2
Show file tree
Hide file tree
Showing 7 changed files with 24 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -494,6 +494,10 @@ public boolean isSystem() {
return indexSettings.getIndexMetadata().isSystem();
}

public CodecService codecService() {
return codecService;
}

/**
* USE THIS METHOD WITH CARE!
* Returns the primary term the index shard is supposed to be on. In case of primary promotion or when a replica learns about
Expand Down Expand Up @@ -1484,7 +1488,7 @@ public Tuple<GatedCloseable<SegmentInfos>, ReplicationCheckpoint> getLatestSegme
return null;
}
if (getEngineOrNull() == null) {
return new Tuple<>(new GatedCloseable<>(null, () -> {}), ReplicationCheckpoint.empty(shardId));
return new Tuple<>(new GatedCloseable<>(null, () -> {}), ReplicationCheckpoint.empty(shardId, codecService));
}
// do not close the snapshot - caller will close it.
final GatedCloseable<SegmentInfos> snapshot = getSegmentInfosSnapshot();
Expand All @@ -1508,7 +1512,7 @@ public Tuple<GatedCloseable<SegmentInfos>, ReplicationCheckpoint> getLatestSegme
} catch (IOException e) {
throw new OpenSearchException("Error Fetching SegmentInfos and latest checkpoint", e);
}
}).orElseGet(() -> new Tuple<>(new GatedCloseable<>(null, () -> {}), ReplicationCheckpoint.empty(shardId)));
}).orElseGet(() -> new Tuple<>(new GatedCloseable<>(null, () -> {}), ReplicationCheckpoint.empty(shardId, codecService)));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -401,7 +401,7 @@ public void messageReceived(final ForceSyncRequest request, TransportChannel cha
return;
}
startReplication(
ReplicationCheckpoint.empty(request.getShardId()),
ReplicationCheckpoint.empty(request.getShardId(), indexShard.codecService()),
indexShard,
new SegmentReplicationTargetService.SegmentReplicationListener() {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,12 @@ public class ReplicationCheckpoint implements Writeable, Comparable<ReplicationC
private final long segmentInfosVersion;
private final long length;
private final String latestSupportedCodec;
private CodecService codecService;

public static ReplicationCheckpoint empty(ShardId shardId) {
return new ReplicationCheckpoint(shardId);
public static ReplicationCheckpoint empty(ShardId shardId, CodecService codecService) {
return new ReplicationCheckpoint(shardId, codecService);
}

private ReplicationCheckpoint(ShardId shardId) {
codecService = new CodecService(null, null);
private ReplicationCheckpoint(ShardId shardId, CodecService codecService) {
this.shardId = shardId;
primaryTerm = SequenceNumbers.UNASSIGNED_PRIMARY_TERM;
segmentsGen = SequenceNumbers.NO_OPS_PERFORMED;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.set.Sets;
import org.opensearch.env.ShardLockObtainFailedException;
import org.opensearch.index.codec.CodecService;
import org.opensearch.index.shard.ShardId;
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
import org.opensearch.repositories.IndexId;
Expand Down Expand Up @@ -810,11 +811,11 @@ public TestAllocator addData(
}

public TestAllocator addData(DiscoveryNode node, String allocationId, boolean primary) {
return addData(node, allocationId, primary, ReplicationCheckpoint.empty(shardId), null);
return addData(node, allocationId, primary, ReplicationCheckpoint.empty(shardId, new CodecService(null, null)), null);
}

public TestAllocator addData(DiscoveryNode node, String allocationId, boolean primary, @Nullable Exception storeException) {
return addData(node, allocationId, primary, ReplicationCheckpoint.empty(shardId), storeException);
return addData(node, allocationId, primary, ReplicationCheckpoint.empty(shardId, new CodecService(null, null)), storeException);
}

public TestAllocator addData(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1021,7 +1021,7 @@ private void assertDocCounts(IndexShard indexShard, int expectedPersistedDocCoun

private void resolveCheckpointInfoResponseListener(ActionListener<CheckpointInfoResponse> listener, IndexShard primary) {
try {
final CopyState copyState = new CopyState(ReplicationCheckpoint.empty(primary.shardId), primary);
final CopyState copyState = new CopyState(ReplicationCheckpoint.empty(primary.shardId, primary.codecService()), primary);
listener.onResponse(
new CheckpointInfoResponse(copyState.getCheckpoint(), copyState.getMetadataMap(), copyState.getInfosBytes())
);
Expand All @@ -1035,7 +1035,7 @@ private void startReplicationAndAssertCancellation(IndexShard replica, SegmentRe
throws InterruptedException {
CountDownLatch latch = new CountDownLatch(1);
final SegmentReplicationTarget target = targetService.startReplication(
ReplicationCheckpoint.empty(replica.shardId),
ReplicationCheckpoint.empty(replica.shardId, replica.codecService()),
replica,
new SegmentReplicationTargetService.SegmentReplicationListener() {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,10 @@ public class CopyStateTests extends IndexShardTestCase {

public void testCopyStateCreation() throws IOException {
final IndexShard mockIndexShard = createMockIndexShard();
CopyState copyState = new CopyState(ReplicationCheckpoint.empty(mockIndexShard.shardId()), mockIndexShard);
CopyState copyState = new CopyState(
ReplicationCheckpoint.empty(mockIndexShard.shardId(), mockIndexShard.codecService()),
mockIndexShard
);
ReplicationCheckpoint checkpoint = copyState.getCheckpoint();
assertEquals(TEST_SHARD_ID, checkpoint.getShardId());
// version was never set so this should be zero
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1299,7 +1299,10 @@ public void getCheckpointMetadata(
ActionListener<CheckpointInfoResponse> listener
) {
try {
final CopyState copyState = new CopyState(ReplicationCheckpoint.empty(primaryShard.shardId), primaryShard);
final CopyState copyState = new CopyState(
ReplicationCheckpoint.empty(primaryShard.shardId, primaryShard.codecService()),
primaryShard
);
listener.onResponse(
new CheckpointInfoResponse(copyState.getCheckpoint(), copyState.getMetadataMap(), copyState.getInfosBytes())
);
Expand Down Expand Up @@ -1353,7 +1356,7 @@ public final List<SegmentReplicationTarget> replicateSegments(IndexShard primary
for (IndexShard replica : replicaShards) {
final SegmentReplicationTargetService targetService = prepareForReplication(primaryShard, replica);
final SegmentReplicationTarget target = targetService.startReplication(
ReplicationCheckpoint.empty(replica.shardId),
ReplicationCheckpoint.empty(replica.shardId, replica.codecService()),
replica,
new SegmentReplicationTargetService.SegmentReplicationListener() {
@Override
Expand Down

0 comments on commit d5d63b2

Please sign in to comment.