Fix flaky test SegmentReplicationRemoteStoreIT.testPressureServiceStats (#8827)

* Fix ReplicationTracker to not include unavailable former primary shards when computing replication stats.

Signed-off-by: Marc Handalian <handalm@amazon.com>

* Fix relocation IT relying on stats to determine if segrep has occurred. The API should still show a result for the replica even if it has not sync'd.

Signed-off-by: Marc Handalian <handalm@amazon.com>

---------

Signed-off-by: Marc Handalian <handalm@amazon.com>
mch2 authored Jul 24, 2023
1 parent 077e12c commit a3baa68
Showing 5 changed files with 150 additions and 83 deletions.
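The core change described in the first commit bullet lives in ReplicationTracker (diffed below): per-replica stats previously included in-sync allocation IDs with no assigned shard, such as a former primary whose node was stopped, which made the pressure-stats assertions flaky. The following is a minimal standalone sketch of that filtering idea; CheckpointState, eligibleAllocationIds, and the literal allocation IDs are illustrative stand-ins, not the actual OpenSearch types.

import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

// Simplified stand-in for the tracker's per-allocation checkpoint state.
record CheckpointState(boolean inSync) {}

class StatsFilterSketch {

    // Build stats only for allocation IDs that are in-sync, are not the local
    // (primary) shard itself, and are not "unavailable" -- i.e. marked in-sync
    // but currently unassigned, such as a former primary whose node was stopped.
    static Set<String> eligibleAllocationIds(
        String localAllocationId,
        Map<String, CheckpointState> checkpoints,
        Set<String> unavailableInSyncIds
    ) {
        return checkpoints.entrySet()
            .stream()
            .filter(e -> e.getKey().equals(localAllocationId) == false)
            .filter(e -> e.getValue().inSync())
            .filter(e -> unavailableInSyncIds.contains(e.getKey()) == false)
            .map(Map.Entry::getKey)
            .collect(Collectors.toUnmodifiableSet());
    }

    public static void main(String[] args) {
        Map<String, CheckpointState> checkpoints = Map.of(
            "primary-aid", new CheckpointState(true),
            "replica-aid", new CheckpointState(true),
            "old-primary-aid", new CheckpointState(true) // node stopped, still tracked as in-sync
        );
        // Before this fix, "old-primary-aid" would be reported and never make
        // progress, tripping the count assertions in the tests below.
        System.out.println(eligibleAllocationIds("primary-aid", checkpoints, Set.of("old-primary-aid")));
        // prints [replica-aid]
    }
}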
@@ -875,69 +875,84 @@ public void testPressureServiceStats() throws Exception {
waitForDocs(initialDocCount, indexer);
refresh(INDEX_NAME);

// get shard references.
final IndexShard primaryShard = getIndexShard(primaryNode, INDEX_NAME);
final IndexShard replicaShard = getIndexShard(replicaNode, INDEX_NAME);
logger.info("Replica aid {}", replicaShard.routingEntry().allocationId());
logger.info("former primary aid {}", primaryShard.routingEntry().allocationId());

// fetch pressure stats from the primary's node.
SegmentReplicationPressureService pressureService = internalCluster().getInstance(
SegmentReplicationPressureService.class,
primaryNode
);

final Map<ShardId, SegmentReplicationPerGroupStats> shardStats = pressureService.nodeStats().getShardStats();
assertEquals(1, shardStats.size());
final IndexShard primaryShard = getIndexShard(primaryNode, INDEX_NAME);
IndexShard replica = getIndexShard(replicaNode, INDEX_NAME);
SegmentReplicationPerGroupStats groupStats = shardStats.get(primaryShard.shardId());
Set<SegmentReplicationShardStats> replicaStats = groupStats.getReplicaStats();
assertEquals(1, replicaStats.size());

// assert replica node returns nothing.
// Fetch pressure stats from the replica's node; we will assert that it returns nothing until the replica is promoted.
SegmentReplicationPressureService replicaNode_service = internalCluster().getInstance(
SegmentReplicationPressureService.class,
replicaNode
);

final Map<ShardId, SegmentReplicationPerGroupStats> shardStats = pressureService.nodeStats().getShardStats();
assertEquals("We should have stats returned for the replication group", 1, shardStats.size());

SegmentReplicationPerGroupStats groupStats = shardStats.get(primaryShard.shardId());
Set<SegmentReplicationShardStats> replicaStats = groupStats.getReplicaStats();
assertAllocationIdsInReplicaShardStats(Set.of(replicaShard.routingEntry().allocationId().getId()), replicaStats);

assertTrue(replicaNode_service.nodeStats().getShardStats().isEmpty());

// drop the primary, this won't hand off SR state.
// drop the primary; this won't hand off pressure stats between the old and new primary.
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(primaryNode));
ensureYellowAndNoInitializingShards(INDEX_NAME);
replicaNode_service = internalCluster().getInstance(SegmentReplicationPressureService.class, replicaNode);
replica = getIndexShard(replicaNode, INDEX_NAME);
assertTrue("replica should be promoted as a primary", replica.routingEntry().primary());
assertEquals(1, replicaNode_service.nodeStats().getShardStats().size());
// we don't have a replica assigned yet, so this should be 0.
assertEquals(0, replicaNode_service.nodeStats().getShardStats().get(primaryShard.shardId()).getReplicaStats().size());

assertTrue("replica should be promoted as a primary", replicaShard.routingEntry().primary());
assertEquals(
"We should have stats returned for the replication group",
1,
replicaNode_service.nodeStats().getShardStats().size()
);
// after the primary is dropped and the replica is promoted we won't have a replica assigned yet, so the per-replica stats
// should be empty.
replicaStats = replicaNode_service.nodeStats().getShardStats().get(primaryShard.shardId()).getReplicaStats();
assertTrue(replicaStats.isEmpty());

// start another replica.
String replicaNode_2 = internalCluster().startDataOnlyNode();
ensureGreen(INDEX_NAME);
String docId = String.valueOf(initialDocCount + 1);
client().prepareIndex(INDEX_NAME).setId(docId).setSource("foo", "bar").get();
refresh(INDEX_NAME);
waitForSearchableDocs(initialDocCount + 1, replicaNode_2);
final IndexShard secondReplicaShard = getIndexShard(replicaNode_2, INDEX_NAME);
final String second_replica_aid = secondReplicaShard.routingEntry().allocationId().getId();
waitForSearchableDocs(initialDocCount, replicaNode_2);

replicaNode_service = internalCluster().getInstance(SegmentReplicationPressureService.class, replicaNode);
replica = getIndexShard(replicaNode_2, INDEX_NAME);
assertEquals(1, replicaNode_service.nodeStats().getShardStats().size());
replicaStats = replicaNode_service.nodeStats().getShardStats().get(primaryShard.shardId()).getReplicaStats();
assertEquals(1, replicaStats.size());
assertEquals(
"We should have stats returned for the replication group",
1,
replicaNode_service.nodeStats().getShardStats().size()
);
replicaStats = replicaNode_service.nodeStats().getShardStats().get(replicaShard.shardId()).getReplicaStats();
assertAllocationIdsInReplicaShardStats(Set.of(second_replica_aid), replicaStats);
final SegmentReplicationShardStats replica_entry = replicaStats.stream().findFirst().get();
assertEquals(replica_entry.getCheckpointsBehindCount(), 0);

// test a checkpoint without any new segments
flush(INDEX_NAME);
assertBusy(() -> {
final SegmentReplicationPressureService service = internalCluster().getInstance(
SegmentReplicationPressureService.class,
replicaNode
);
assertEquals(1, service.nodeStats().getShardStats().size());
final Set<SegmentReplicationShardStats> shardStatsSet = service.nodeStats()
assertEquals(1, replicaNode_service.nodeStats().getShardStats().size());
final Set<SegmentReplicationShardStats> shardStatsSet = replicaNode_service.nodeStats()
.getShardStats()
.get(primaryShard.shardId())
.get(replicaShard.shardId())
.getReplicaStats();
assertEquals(1, shardStatsSet.size());
assertAllocationIdsInReplicaShardStats(Set.of(second_replica_aid), shardStatsSet);
final SegmentReplicationShardStats stats = shardStatsSet.stream().findFirst().get();
assertEquals(0, stats.getCheckpointsBehindCount());
});
}
}

private void assertAllocationIdsInReplicaShardStats(Set<String> expected, Set<SegmentReplicationShardStats> replicaStats) {
assertEquals(expected, replicaStats.stream().map(SegmentReplicationShardStats::getAllocationId).collect(Collectors.toSet()));
}

/**
* Tests a scroll query on the replica
* @throws Exception when issue is encountered
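The test rewrite above replaces bare size assertions with checks on the exact set of allocation IDs reported for the replication group, via the new assertAllocationIdsInReplicaShardStats helper. A framework-free sketch of that pattern follows; ShardStats is a simplified stand-in for SegmentReplicationShardStats and the allocation IDs are illustrative.

import java.util.Set;
import java.util.stream.Collectors;

class AllocationIdAssertionSketch {

    // Simplified stand-in for SegmentReplicationShardStats.
    record ShardStats(String allocationId, long checkpointsBehindCount) {}

    // Compare the *set* of allocation ids instead of just the count, so a stale
    // entry (e.g. a dropped former primary) fails the test loudly rather than
    // passing whenever the sizes happen to match.
    static void assertAllocationIds(Set<String> expected, Set<ShardStats> stats) {
        Set<String> actual = stats.stream().map(ShardStats::allocationId).collect(Collectors.toSet());
        if (expected.equals(actual) == false) {
            throw new AssertionError("expected " + expected + " but was " + actual);
        }
    }

    public static void main(String[] args) {
        assertAllocationIds(Set.of("replica-aid"), Set.of(new ShardStats("replica-aid", 0)));
        System.out.println("assertion passed");
    }
}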
@@ -22,15 +22,19 @@
import org.opensearch.common.Priority;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.index.SegmentReplicationShardStats;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.indices.IndicesService;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.transport.MockTransportService;
import org.opensearch.transport.TransportService;

import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
@@ -528,13 +532,27 @@ public void testFlushAfterRelocation() throws Exception {
client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().actionGet();
}

// Verify segment replication event never happened on replica shard
final IndexShard replicaShard = getIndexShard(replicaNode, INDEX_NAME);

// Verify that no segment replication event other than recovery has occurred on the replica shard.
assertHitCount(client(primaryNode).prepareSearch(INDEX_NAME).setPreference("_only_local").setSize(0).get(), 0);
assertHitCount(client(replicaNode).prepareSearch(INDEX_NAME).setPreference("_only_local").setSize(0).get(), 0);

SegmentReplicationStatsResponse segmentReplicationStatsResponse = client().admin()
.indices()
.prepareSegmentReplicationStats(INDEX_NAME)
.execute()
.actionGet();
assertTrue(segmentReplicationStatsResponse.getReplicationStats().get(INDEX_NAME).get(0).getReplicaStats().isEmpty());
final Set<SegmentReplicationShardStats> replicaStats = segmentReplicationStatsResponse.getReplicationStats()
.get(INDEX_NAME)
.get(0)
.getReplicaStats();
assertEquals(
Set.of(replicaShard.routingEntry().allocationId().getId()),
replicaStats.stream().map(SegmentReplicationShardStats::getAllocationId).collect(Collectors.toSet())
);
// the primary still has not refreshed to update its checkpoint, so our replica is not yet behind.
assertEquals(0, replicaStats.stream().findFirst().get().getCheckpointsBehindCount());

// Relocate primary to new primary. When new primary starts it does perform a flush.
logger.info("--> relocate the shard from primary to newPrimary");
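The new assertion in testFlushAfterRelocation expects the replica to report zero checkpoints behind until the primary refreshes, because lag is defined by checkpoint ordering. A minimal model of that ordering follows; the field names and the isAheadOf semantics (higher primary term first, then newer SegmentInfos version) are assumptions inferred from the checkpoint comparisons visible in this diff, not the actual ReplicationCheckpoint implementation.

// A checkpoint is "ahead" of another if it has a higher primary term, or the
// same term and a newer SegmentInfos version (i.e. the primary has refreshed).
record Checkpoint(long primaryTerm, long segmentInfosVersion) {

    boolean isAheadOf(Checkpoint other) {
        return other == null
            || primaryTerm > other.primaryTerm
            || (primaryTerm == other.primaryTerm && segmentInfosVersion > other.segmentInfosVersion);
    }

    public static void main(String[] args) {
        Checkpoint replicaVisible = new Checkpoint(1, 5);
        // No refresh on the primary yet: identical checkpoint, replica is not behind.
        System.out.println(new Checkpoint(1, 5).isAheadOf(replicaVisible)); // false
        // After a refresh the primary publishes a newer SegmentInfos version.
        System.out.println(new Checkpoint(1, 6).isAheadOf(replicaVisible)); // true
    }
}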
@@ -247,7 +247,7 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L

private final Consumer<ReplicationGroup> onReplicationGroupUpdated;

private volatile ReplicationCheckpoint lastPublishedReplicationCheckpoint;
private volatile ReplicationCheckpoint latestReplicationCheckpoint;

/**
* Get all retention leases tracked on this shard.
@@ -1054,6 +1054,7 @@ public ReplicationTracker(
this.fileBasedRecoveryThreshold = IndexSettings.FILE_BASED_RECOVERY_THRESHOLD_SETTING.get(indexSettings.getSettings());
this.safeCommitInfoSupplier = safeCommitInfoSupplier;
this.onReplicationGroupUpdated = onReplicationGroupUpdated;
this.latestReplicationCheckpoint = indexSettings.isSegRepEnabled() ? ReplicationCheckpoint.empty(shardId) : null;
assert Version.V_EMPTY.equals(indexSettings.getIndexVersionCreated()) == false;
assert invariant();
}
@@ -1212,26 +1213,42 @@ public synchronized void updateVisibleCheckpointForShard(final String allocation
*/
public synchronized void setLatestReplicationCheckpoint(ReplicationCheckpoint checkpoint) {
assert indexSettings.isSegRepEnabled();
assert handoffInProgress == false;
if (checkpoint.equals(lastPublishedReplicationCheckpoint) == false) {
this.lastPublishedReplicationCheckpoint = checkpoint;
for (Map.Entry<String, CheckpointState> entry : checkpoints.entrySet()) {
if (entry.getKey().equals(this.shardAllocationId) == false) {
final CheckpointState cps = entry.getValue();
if (cps.inSync) {
cps.checkpointTimers.computeIfAbsent(checkpoint, ignored -> {
final ReplicationTimer replicationTimer = new ReplicationTimer();
replicationTimer.start();
return replicationTimer;
});
logger.trace(
() -> new ParameterizedMessage(
"updated last published checkpoint to {} - timers [{}]",
checkpoint,
cps.checkpointTimers.keySet()
)
);
}
if (checkpoint.equals(latestReplicationCheckpoint) == false) {
this.latestReplicationCheckpoint = checkpoint;
}
if (primaryMode) {
startReplicationLagTimers();
}
}

public ReplicationCheckpoint getLatestReplicationCheckpoint() {
return this.latestReplicationCheckpoint;
}

private void startReplicationLagTimers() {
for (Map.Entry<String, CheckpointState> entry : checkpoints.entrySet()) {
final String allocationId = entry.getKey();
if (allocationId.equals(this.shardAllocationId) == false) {
final CheckpointState cps = entry.getValue();
// if the shard is in checkpoints but is unavailable or out of sync we will not track its replication state.
// it is possible for a shard to be in-sync but not yet removed from the checkpoints collection after a failover event.
if (cps.inSync
&& replicationGroup.getUnavailableInSyncShards().contains(allocationId) == false
&& latestReplicationCheckpoint.isAheadOf(cps.visibleReplicationCheckpoint)) {
cps.checkpointTimers.computeIfAbsent(latestReplicationCheckpoint, ignored -> {
final ReplicationTimer replicationTimer = new ReplicationTimer();
replicationTimer.start();
return replicationTimer;
});
logger.trace(
() -> new ParameterizedMessage(
"updated last published checkpoint for {} at visible cp {} to {} - timers [{}]",
allocationId,
cps.visibleReplicationCheckpoint,
latestReplicationCheckpoint,
cps.checkpointTimers.keySet()
)
);
}
}
}
@@ -1244,12 +1261,17 @@ public synchronized void setLatestReplicationCheckpoint(ReplicationCheckpoint ch
*/
public synchronized Set<SegmentReplicationShardStats> getSegmentReplicationStats() {
assert indexSettings.isSegRepEnabled();
final ReplicationCheckpoint lastPublishedCheckpoint = this.lastPublishedReplicationCheckpoint;
if (primaryMode && lastPublishedCheckpoint != null) {
if (primaryMode) {
return this.checkpoints.entrySet()
.stream()
.filter(entry -> entry.getKey().equals(this.shardAllocationId) == false && entry.getValue().inSync)
.map(entry -> buildShardStats(lastPublishedCheckpoint.getLength(), entry.getKey(), entry.getValue()))
// filter out this shard's allocation id and any shards that are out of sync or unavailable (marked in-sync but not
// yet assigned to a node).
.filter(
entry -> entry.getKey().equals(this.shardAllocationId) == false
&& entry.getValue().inSync
&& replicationGroup.getUnavailableInSyncShards().contains(entry.getKey()) == false
)
.map(entry -> buildShardStats(latestReplicationCheckpoint.getLength(), entry.getKey(), entry.getValue()))
.collect(Collectors.toUnmodifiableSet());
}
return Collections.emptySet();
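The tracker change above splits timer bookkeeping out of setLatestReplicationCheckpoint into startReplicationLagTimers, which now runs only in primary mode and skips unavailable in-sync shards and replicas that are already caught up. Note the computeIfAbsent: it starts at most one timer per (replica, checkpoint) pair, so republishing the same checkpoint never resets measured lag. A standalone sketch of that bookkeeping, with LagTimer as a simplified stand-in for ReplicationTimer and a long version standing in for the checkpoint.

import java.util.HashMap;
import java.util.List;
import java.util.Map;

class LagTimerSketch {

    // Simplified replication timer: just records when lag on a checkpoint began.
    record LagTimer(long startNanos) {}

    // Per-replica timers, keyed by the published checkpoint version being lagged on.
    private final Map<String, Map<Long, LagTimer>> timersPerReplica = new HashMap<>();

    // Called when the primary publishes checkpoint `version`; `behindReplicas` is
    // the set of in-sync, *available* replicas whose visible checkpoint is older.
    void startLagTimers(long version, List<String> behindReplicas) {
        for (String allocationId : behindReplicas) {
            timersPerReplica.computeIfAbsent(allocationId, id -> new HashMap<>())
                // keep an existing timer for this checkpoint; never restart it.
                .computeIfAbsent(version, v -> new LagTimer(System.nanoTime()));
        }
    }

    public static void main(String[] args) {
        LagTimerSketch sketch = new LagTimerSketch();
        sketch.startLagTimers(42L, List.of("replica-aid"));
        sketch.startLagTimers(42L, List.of("replica-aid")); // no-op: timer already exists
        System.out.println(sketch.timersPerReplica); // one timer for replica-aid @ 42
    }
}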
@@ -33,6 +33,10 @@ public class ReplicationCheckpoint implements Writeable, Comparable<ReplicationC
private final long length;
private final String codec;

public static ReplicationCheckpoint empty(ShardId shardId) {
return empty(shardId, "");
}

public static ReplicationCheckpoint empty(ShardId shardId, String codec) {
return new ReplicationCheckpoint(shardId, codec);
}
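The single-argument empty(ShardId) overload added above lets ReplicationTracker initialize latestReplicationCheckpoint to a sentinel instead of null (the codec is not yet known at construction time), which is what allows getSegmentReplicationStats to drop its former null check. A miniature of that null-object idea, with MiniCheckpoint as an assumed, simplified shape.

// Null-object pattern in miniature: an "empty" sentinel checkpoint means callers
// (such as the stats path) never need a null check before reading fields.
record MiniCheckpoint(long length, String codec) {

    static MiniCheckpoint empty() {
        // codec is unknown until the engine is opened, hence the empty string.
        return new MiniCheckpoint(0L, "");
    }

    public static void main(String[] args) {
        System.out.println(MiniCheckpoint.empty().length()); // 0, no null check needed
    }
}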