Skip to content

Commit

Permalink
Fix bug where replication lag grows post primary relocation (#11238)
Browse files Browse the repository at this point in the history
* Fix bug where replication lag grows post primary relocation

Signed-off-by: Marc Handalian <marc.handalian@gmail.com>

* Fix broken UT

Signed-off-by: Marc Handalian <marc.handalian@gmail.com>

* add unit test for cluster state update

Signed-off-by: Marc Handalian <marc.handalian@gmail.com>

* PR feedback

Signed-off-by: Marc Handalian <marc.handalian@gmail.com>

* add changelog entry

Signed-off-by: Marc Handalian <marc.handalian@gmail.com>

---------

Signed-off-by: Marc Handalian <marc.handalian@gmail.com>
  • Loading branch information
mch2 authored Dec 1, 2023
1 parent 21c0597 commit 6fa3a0d
Show file tree
Hide file tree
Showing 7 changed files with 209 additions and 16 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Fix for stuck update action in a bulk with `retry_on_conflict` property ([#11152](https://github.com/opensearch-project/OpenSearch/issues/11152))
- Remove shadowJar from `lang-painless` module publication ([#11369](https://github.com/opensearch-project/OpenSearch/issues/11369))
- Fix remote shards balancer and remove unused variables ([#11167](https://github.com/opensearch-project/OpenSearch/pull/11167))
- Fix bug where replication lag grows post primary relocation ([#11238](https://github.com/opensearch-project/OpenSearch/pull/11238))

### Security

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,16 @@

package org.opensearch.remotestore;

import org.opensearch.action.admin.cluster.health.ClusterHealthResponse;
import org.opensearch.action.admin.cluster.stats.ClusterStatsResponse;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.routing.allocation.command.MoveAllocationCommand;
import org.opensearch.common.Priority;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.index.Index;
import org.opensearch.index.IndexService;
import org.opensearch.index.ReplicationStats;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.replication.SegmentReplicationState;
Expand All @@ -20,10 +26,12 @@
import org.opensearch.indices.replication.common.ReplicationCollection;
import org.opensearch.test.InternalTestCluster;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.disruption.SlowClusterStateProcessing;

import java.nio.file.Path;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;

/**
* This class runs tests with remote store + segRep while blocking file downloads
Expand Down Expand Up @@ -111,6 +119,75 @@ public void testCancelReplicationWhileFetchingMetadata() throws Exception {
cleanupRepo();
}

public void testUpdateVisibleCheckpointWithLaggingClusterStateUpdates_primaryRelocation() throws Exception {
Path location = randomRepoPath().toAbsolutePath();
Settings nodeSettings = Settings.builder().put(buildRemoteStoreNodeAttributes(location, 0d, "metadata", Long.MAX_VALUE)).build();
internalCluster().startClusterManagerOnlyNode(nodeSettings);
internalCluster().startDataOnlyNodes(2, nodeSettings);
final Settings indexSettings = Settings.builder().put(indexSettings()).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1).build();
createIndex(INDEX_NAME, indexSettings);
ensureGreen(INDEX_NAME);
final Set<String> dataNodeNames = internalCluster().getDataNodeNames();
final String replicaNode = getNode(dataNodeNames, false);
final String oldPrimary = getNode(dataNodeNames, true);

// index a doc.
client().prepareIndex(INDEX_NAME).setId("1").setSource("foo", randomInt()).get();
refresh(INDEX_NAME);

logger.info("--> start another node");
final String newPrimary = internalCluster().startDataOnlyNode(nodeSettings);
ClusterHealthResponse clusterHealthResponse = client().admin()
.cluster()
.prepareHealth()
.setWaitForEvents(Priority.LANGUID)
.setWaitForNodes("4")
.get();
assertEquals(clusterHealthResponse.isTimedOut(), false);

SlowClusterStateProcessing disruption = new SlowClusterStateProcessing(replicaNode, random(), 0, 0, 1000, 2000);
internalCluster().setDisruptionScheme(disruption);
disruption.startDisrupting();

// relocate the primary
logger.info("--> relocate the shard");
client().admin()
.cluster()
.prepareReroute()
.add(new MoveAllocationCommand(INDEX_NAME, 0, oldPrimary, newPrimary))
.execute()
.actionGet();
clusterHealthResponse = client().admin()
.cluster()
.prepareHealth()
.setWaitForEvents(Priority.LANGUID)
.setWaitForNoRelocatingShards(true)
.setTimeout(new TimeValue(5, TimeUnit.MINUTES))
.execute()
.actionGet();
assertEquals(clusterHealthResponse.isTimedOut(), false);

IndexShard newPrimary_shard = getIndexShard(newPrimary, INDEX_NAME);
IndexShard replica = getIndexShard(replicaNode, INDEX_NAME);
assertBusy(() -> {
assertEquals(
newPrimary_shard.getLatestReplicationCheckpoint().getSegmentInfosVersion(),
replica.getLatestReplicationCheckpoint().getSegmentInfosVersion()
);
});

assertBusy(() -> {
ClusterStatsResponse clusterStatsResponse = client().admin().cluster().prepareClusterStats().get();
ReplicationStats replicationStats = clusterStatsResponse.getIndicesStats().getSegments().getReplicationStats();
assertEquals(0L, replicationStats.maxBytesBehind);
assertEquals(0L, replicationStats.maxReplicationLag);
assertEquals(0L, replicationStats.totalBytesBehind);
});
disruption.stopDisrupting();
disableRepoConsistencyCheck("Remote Store Creates System Repository");
cleanupRepo();
}

private String getNode(Set<String> dataNodeNames, boolean primary) {
assertEquals(2, dataNodeNames.size());
for (String name : dataNodeNames) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1322,8 +1322,10 @@ private SegmentReplicationShardStats buildShardStats(final String allocationId,
allocationId,
cps.checkpointTimers.size(),
bytesBehind,
cps.checkpointTimers.values().stream().mapToLong(SegmentReplicationLagTimer::time).max().orElse(0),
cps.checkpointTimers.values().stream().mapToLong(SegmentReplicationLagTimer::totalElapsedTime).max().orElse(0),
bytesBehind > 0L ? cps.checkpointTimers.values().stream().mapToLong(SegmentReplicationLagTimer::time).max().orElse(0) : 0,
bytesBehind > 0L
? cps.checkpointTimers.values().stream().mapToLong(SegmentReplicationLagTimer::totalElapsedTime).max().orElse(0)
: 0,
cps.lastCompletedReplicationLag
);
}
Expand Down
10 changes: 2 additions & 8 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -1764,8 +1764,8 @@ public final boolean shouldProcessCheckpoint(ReplicationCheckpoint requestCheckp
if (isSegmentReplicationAllowed() == false) {
return false;
}
ReplicationCheckpoint localCheckpoint = getLatestReplicationCheckpoint();
if (localCheckpoint.isAheadOf(requestCheckpoint)) {
final ReplicationCheckpoint localCheckpoint = getLatestReplicationCheckpoint();
if (requestCheckpoint.isAheadOf(localCheckpoint) == false) {
logger.trace(
() -> new ParameterizedMessage(
"Ignoring new replication checkpoint - Shard is already on checkpoint {} that is ahead of {}",
Expand All @@ -1775,12 +1775,6 @@ public final boolean shouldProcessCheckpoint(ReplicationCheckpoint requestCheckp
);
return false;
}
if (localCheckpoint.equals(requestCheckpoint)) {
logger.trace(
() -> new ParameterizedMessage("Ignoring new replication checkpoint - Shard is already on checkpoint {}", requestCheckpoint)
);
return false;
}
return true;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,21 @@
import org.opensearch.ExceptionsHelper;
import org.opensearch.OpenSearchCorruptionException;
import org.opensearch.action.support.ChannelActionListener;
import org.opensearch.cluster.ClusterChangedEvent;
import org.opensearch.cluster.ClusterStateListener;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.Nullable;
import org.opensearch.common.lifecycle.AbstractLifecycleComponent;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.CancellableThreads;
import org.opensearch.common.util.concurrent.AbstractRunnable;
import org.opensearch.common.util.concurrent.ConcurrentCollections;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.core.transport.TransportResponse;
import org.opensearch.index.IndexService;
import org.opensearch.index.shard.IndexEventListener;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.shard.IndexShardState;
Expand Down Expand Up @@ -61,7 +65,7 @@
*
* @opensearch.internal
*/
public class SegmentReplicationTargetService implements IndexEventListener {
public class SegmentReplicationTargetService extends AbstractLifecycleComponent implements ClusterStateListener, IndexEventListener {

private static final Logger logger = LogManager.getLogger(SegmentReplicationTargetService.class);

Expand Down Expand Up @@ -144,6 +148,53 @@ public SegmentReplicationTargetService(
);
}

@Override
protected void doStart() {
if (DiscoveryNode.isDataNode(clusterService.getSettings())) {
clusterService.addListener(this);
}
}

@Override
protected void doStop() {
if (DiscoveryNode.isDataNode(clusterService.getSettings())) {
clusterService.removeListener(this);
}
}

@Override
protected void doClose() throws IOException {

}

@Override
public void clusterChanged(ClusterChangedEvent event) {
if (event.routingTableChanged()) {
for (IndexService indexService : indicesService) {
if (indexService.getIndexSettings().isSegRepEnabled() && event.indexRoutingTableChanged(indexService.index().getName())) {
for (IndexShard shard : indexService) {
if (shard.routingEntry().primary() == false) {
// for this shard look up its primary routing, if it has completed a relocation trigger replication
final String previousNode = event.previousState()
.routingTable()
.shardRoutingTable(shard.shardId())
.primaryShard()
.currentNodeId();
final String currentNode = event.state()
.routingTable()
.shardRoutingTable(shard.shardId())
.primaryShard()
.currentNodeId();
if (previousNode.equals(currentNode) == false) {
processLatestReceivedCheckpoint(shard, Thread.currentThread());
}
}
}
}
}
}
}

/**
* Cancel any replications on this node for a replica that is about to be closed.
*/
Expand Down Expand Up @@ -395,7 +446,7 @@ private DiscoveryNode getPrimaryNode(ShardRouting primaryShard) {
// visible to tests
protected boolean processLatestReceivedCheckpoint(IndexShard replicaShard, Thread thread) {
final ReplicationCheckpoint latestPublishedCheckpoint = latestReceivedCheckpoint.get(replicaShard.shardId());
if (latestPublishedCheckpoint != null && latestPublishedCheckpoint.isAheadOf(replicaShard.getLatestReplicationCheckpoint())) {
if (latestPublishedCheckpoint != null) {
logger.trace(
() -> new ParameterizedMessage(
"Processing latest received checkpoint for shard {} {}",
Expand Down
2 changes: 2 additions & 0 deletions server/src/main/java/org/opensearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -1414,6 +1414,7 @@ public Node start() throws NodeValidationException {
assert transportService.getLocalNode().equals(localNodeFactory.getNode())
: "transportService has a different local node than the factory provided";
injector.getInstance(PeerRecoverySourceService.class).start();
injector.getInstance(SegmentReplicationTargetService.class).start();
injector.getInstance(SegmentReplicationSourceService.class).start();

final RemoteClusterStateService remoteClusterStateService = injector.getInstance(RemoteClusterStateService.class);
Expand Down Expand Up @@ -1602,6 +1603,7 @@ public synchronized void close() throws IOException {
toClose.add(injector.getInstance(IndicesStore.class));
toClose.add(injector.getInstance(PeerRecoverySourceService.class));
toClose.add(injector.getInstance(SegmentReplicationSourceService.class));
toClose.add(injector.getInstance(SegmentReplicationTargetService.class));
toClose.add(() -> stopWatch.stop().start("cluster"));
toClose.add(injector.getInstance(ClusterService.class));
toClose.add(() -> stopWatch.stop().start("node_connections_service"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,26 @@
import org.opensearch.ExceptionsHelper;
import org.opensearch.OpenSearchException;
import org.opensearch.Version;
import org.opensearch.cluster.ClusterChangedEvent;
import org.opensearch.cluster.ClusterName;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.cluster.routing.IndexRoutingTable;
import org.opensearch.cluster.routing.RecoverySource;
import org.opensearch.cluster.routing.RoutingTable;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.ShardRoutingState;
import org.opensearch.cluster.routing.UnassignedInfo;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.CancellableThreads;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.core.transport.TransportResponse;
import org.opensearch.index.IndexService;
import org.opensearch.index.engine.NRTReplicationEngineFactory;
import org.opensearch.index.replication.TestReplicationSource;
import org.opensearch.index.shard.IndexShard;
Expand Down Expand Up @@ -51,6 +59,7 @@
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
Expand Down Expand Up @@ -91,6 +100,8 @@ public class SegmentReplicationTargetServiceTests extends IndexShardTestCase {
private SegmentReplicationState state;
private ReplicationCheckpoint initialCheckpoint;

private ClusterState clusterState;

private static final long TRANSPORT_TIMEOUT = 30000;// 30sec

@Override
Expand Down Expand Up @@ -129,7 +140,7 @@ public void setUp() throws Exception {

indicesService = mock(IndicesService.class);
ClusterService clusterService = mock(ClusterService.class);
ClusterState clusterState = mock(ClusterState.class);
clusterState = mock(ClusterState.class);
RoutingTable mockRoutingTable = mock(RoutingTable.class);
when(clusterService.state()).thenReturn(clusterState);
when(clusterState.routingTable()).thenReturn(mockRoutingTable);
Expand Down Expand Up @@ -465,9 +476,22 @@ public void testStartReplicationListenerFailure() throws InterruptedException {
verify(spy, (never())).updateVisibleCheckpoint(eq(0L), eq(replicaShard));
}

public void testDoNotProcessLatestCheckpointIfItIsbehind() {
sut.updateLatestReceivedCheckpoint(replicaShard.getLatestReplicationCheckpoint(), replicaShard);
assertFalse(sut.processLatestReceivedCheckpoint(replicaShard, null));
public void testDoNotProcessLatestCheckpointIfCheckpointIsBehind() {
SegmentReplicationTargetService service = spy(sut);
doReturn(mock(SegmentReplicationTarget.class)).when(service).startReplication(any(), any(), any());
ReplicationCheckpoint checkpoint = replicaShard.getLatestReplicationCheckpoint();
service.updateLatestReceivedCheckpoint(checkpoint, replicaShard);
service.processLatestReceivedCheckpoint(replicaShard, null);
verify(service, times(0)).startReplication(eq(replicaShard), eq(checkpoint), any());
}

public void testProcessLatestCheckpointIfCheckpointAhead() {
SegmentReplicationTargetService service = spy(sut);
doNothing().when(service).startReplication(any());
doReturn(mock(SegmentReplicationTarget.class)).when(service).startReplication(any(), any(), any());
service.updateLatestReceivedCheckpoint(aheadCheckpoint, replicaShard);
service.processLatestReceivedCheckpoint(replicaShard, null);
verify(service, times(1)).startReplication(eq(replicaShard), eq(aheadCheckpoint), any());
}

public void testOnNewCheckpointInvokedOnClosedShardDoesNothing() throws IOException {
Expand Down Expand Up @@ -617,4 +641,46 @@ public void onReplicationFailure(SegmentReplicationState state, ReplicationFaile
target.cancel("test");
sut.startReplication(target);
}

public void testProcessCheckpointOnClusterStateUpdate() {
// set up mocks on indicies & index service to return our replica's index & shard.
IndexService indexService = mock(IndexService.class);
when(indexService.iterator()).thenReturn(Set.of(replicaShard).iterator());
when(indexService.getIndexSettings()).thenReturn(replicaShard.indexSettings());
when(indexService.index()).thenReturn(replicaShard.routingEntry().index());
when(indicesService.iterator()).thenReturn(Set.of(indexService).iterator());

// create old & new cluster states
final String targetNodeId = "targetNodeId";
ShardRouting initialRouting = primaryShard.routingEntry().relocate(targetNodeId, 0L);
assertEquals(ShardRoutingState.RELOCATING, initialRouting.state());

ShardRouting targetRouting = ShardRouting.newUnassigned(
primaryShard.shardId(),
true,
RecoverySource.PeerRecoverySource.INSTANCE,
new UnassignedInfo(UnassignedInfo.Reason.REINITIALIZED, "test")
).initialize(targetNodeId, initialRouting.allocationId().getId(), 0L).moveToStarted();
assertEquals(targetNodeId, targetRouting.currentNodeId());
assertEquals(ShardRoutingState.STARTED, targetRouting.state());
ClusterState oldState = ClusterState.builder(ClusterName.DEFAULT)
.routingTable(
RoutingTable.builder()
.add(IndexRoutingTable.builder(primaryShard.shardId().getIndex()).addShard(initialRouting).build())
.build()
)
.build();
ClusterState newState = ClusterState.builder(ClusterName.DEFAULT)
.routingTable(
RoutingTable.builder()
.add(IndexRoutingTable.builder(primaryShard.shardId().getIndex()).addShard(targetRouting).build())
.build()
)
.build();

// spy so we can verify process is invoked
SegmentReplicationTargetService spy = spy(sut);
spy.clusterChanged(new ClusterChangedEvent("ignored", oldState, newState));
verify(spy, times(1)).processLatestReceivedCheckpoint(eq(replicaShard), any());
}
}

0 comments on commit 6fa3a0d

Please sign in to comment.