Skip to content

Commit

Permalink
Fix unclosed store references with node-node segrep when primary node…
Browse files Browse the repository at this point in the history
… is unknown.

This PR fixes a bug with node-node pull based replication where if the replica does not know
the DiscoveryNode of its primary we would fail after constructing a
SegmentReplicationTarget that holds a store reference.  Only after replication
is started would a failure occur because the source node is null, and the target would not get cleaned up.
Push based replication already handled this case by catching any error and closing the target.
This update ensures the validation is done before constructing our PrimaryShardReplicationSource, before
any target object is created in both cases push and pull.

Signed-off-by: Marc Handalian <marc.handalian@gmail.com>
  • Loading branch information
mch2 committed Sep 27, 2024
1 parent 7caca26 commit 9510a7b
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,8 @@ public void testFailoverWithSearchReplica_WithoutWriterReplicas() throws IOExcep
.put(indexSettings())
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, numWriterReplicas)
.put(IndexMetadata.SETTING_NUMBER_OF_SEARCH_REPLICAS, numSearchReplicas)
.put("index.refresh_interval", "40ms") // set lower interval so replica attempts replication cycles after primary is
// removed.
.build()
);
ensureYellow(TEST_INDEX);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ public PrimaryShardReplicationSource(
RecoverySettings recoverySettings,
DiscoveryNode sourceNode
) {
assert targetNode != null : "Target node must be set";
assert sourceNode != null : "Source node must be set";
this.targetAllocationId = targetAllocationId;
this.transportService = transportService;
this.sourceNode = sourceNode;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@ public SegmentReplicationSource get(IndexShard shard) {

private DiscoveryNode getPrimaryNode(ShardId shardId) {
ShardRouting primaryShard = clusterService.state().routingTable().shardRoutingTable(shardId).primaryShard();
return clusterService.state().nodes().get(primaryShard.currentNodeId());
DiscoveryNode node = clusterService.state().nodes().get(primaryShard.currentNodeId());
if (node == null) {
throw new IllegalStateException("Cannot replicate, primary shard for " + shardId + " is not allocated on any node");
}
return node;

Check warning on line 60 in server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceFactory.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceFactory.java#L60

Added line #L60 was not covered by tests
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,13 @@

import org.apache.lucene.store.IOContext;
import org.opensearch.OpenSearchCorruptionException;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.routing.IndexRoutingTable;
import org.opensearch.cluster.routing.IndexShardRoutingTable;
import org.opensearch.cluster.routing.RoutingTable;
import org.opensearch.cluster.routing.UnassignedInfo;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.lucene.Lucene;
import org.opensearch.common.settings.Settings;
import org.opensearch.core.action.ActionListener;
Expand All @@ -19,10 +25,12 @@
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.shard.IndexShardTestCase;
import org.opensearch.index.store.StoreFileMetadata;
import org.opensearch.indices.recovery.RecoverySettings;
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
import org.opensearch.indices.replication.common.CopyState;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportService;

import java.io.IOException;
import java.io.UncheckedIOException;
Expand All @@ -45,6 +53,48 @@ public class SegmentReplicatorTests extends IndexShardTestCase {
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
.build();

public void testReplicationWithUnassignedPrimary() throws Exception {
final IndexShard replica = newStartedShard(false, settings, new NRTReplicationEngineFactory());
final IndexShard primary = newStartedShard(true, settings, new NRTReplicationEngineFactory());
SegmentReplicator replicator = new SegmentReplicator(threadPool);

ClusterService cs = mock(ClusterService.class);
IndexShardRoutingTable.Builder shardRoutingTable = new IndexShardRoutingTable.Builder(replica.shardId());
shardRoutingTable.addShard(replica.routingEntry());
shardRoutingTable.addShard(primary.routingEntry().moveToUnassigned(new UnassignedInfo(UnassignedInfo.Reason.NODE_LEFT, "test")));

when(cs.state()).thenReturn(buildClusterState(replica, shardRoutingTable));
replicator.setSourceFactory(new SegmentReplicationSourceFactory(mock(TransportService.class), mock(RecoverySettings.class), cs));
expectThrows(IllegalStateException.class, () -> replicator.startReplication(replica));
closeShards(replica, primary);
}

public void testReplicationWithUnknownPrimaryNode() throws Exception {
final IndexShard replica = newStartedShard(false, settings, new NRTReplicationEngineFactory());
final IndexShard primary = newStartedShard(true, settings, new NRTReplicationEngineFactory());
SegmentReplicator replicator = new SegmentReplicator(threadPool);

ClusterService cs = mock(ClusterService.class);
IndexShardRoutingTable.Builder shardRoutingTable = new IndexShardRoutingTable.Builder(replica.shardId());
shardRoutingTable.addShard(replica.routingEntry());
shardRoutingTable.addShard(primary.routingEntry());

when(cs.state()).thenReturn(buildClusterState(replica, shardRoutingTable));
replicator.setSourceFactory(new SegmentReplicationSourceFactory(mock(TransportService.class), mock(RecoverySettings.class), cs));
expectThrows(IllegalStateException.class, () -> replicator.startReplication(replica));
closeShards(replica, primary);
}

private ClusterState buildClusterState(IndexShard replica, IndexShardRoutingTable.Builder indexShard) {
return ClusterState.builder(clusterService.state())
.routingTable(
RoutingTable.builder()
.add(IndexRoutingTable.builder(replica.shardId().getIndex()).addIndexShard(indexShard.build()).build())
.build()
)
.build();
}

public void testStartReplicationWithoutSourceFactory() {
ThreadPool threadpool = mock(ThreadPool.class);
ExecutorService mock = mock(ExecutorService.class);
Expand Down

0 comments on commit 9510a7b

Please sign in to comment.