Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support shard promotion with Segment Replication. #4135

Merged
merged 5 commits into from
Aug 17, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.routing.IndexShardRoutingTable;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.allocation.command.CancelAllocationCommand;
import org.opensearch.common.Nullable;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.index.Index;
Expand All @@ -30,6 +33,7 @@
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.test.BackgroundIndexer;
import org.opensearch.test.InternalTestCluster;
import org.opensearch.test.OpenSearchIntegTestCase;

import java.io.IOException;
Expand Down Expand Up @@ -73,6 +77,109 @@ protected boolean addMockInternalEngine() {
return false;
}

/**
 * Stops the node holding the primary and verifies that the replica is promoted, still serves every
 * previously indexed document, accepts new writes, and replicates to a newly started node.
 */
public void testPrimaryStopped_ReplicaPromoted() throws Exception {
    final String primary = internalCluster().startNode();
    createIndex(INDEX_NAME);
    ensureYellowAndNoInitializingShards(INDEX_NAME);
    final String replica = internalCluster().startNode();
    ensureGreen(INDEX_NAME);

    client().prepareIndex(INDEX_NAME).setId("1").setSource("foo", "bar").setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get();
    refresh(INDEX_NAME);

    waitForReplicaUpdate();
    assertDocCounts(1, primary, replica);

    // Index another doc but do NOT wait for the replica to sync (no waitForReplicaUpdate() here).
    // The write itself uses an IMMEDIATE refresh policy; the point of the test is that the doc is
    // searchable on the replica once it has been promoted.
    client().prepareIndex(INDEX_NAME).setId("2").setSource("bar", "baz").setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get();

    // stop the primary node - we only have one shard on here.
    internalCluster().stopRandomNode(InternalTestCluster.nameFilter(primary));
    ensureYellowAndNoInitializingShards(INDEX_NAME);

    final ShardRouting replicaShardRouting = getShardRoutingForNodeName(replica);
    assertNotNull(replicaShardRouting);
    assertTrue(replicaShardRouting + " should be promoted as a primary", replicaShardRouting.primary());
    assertDocCounts(2, replica);

    // assert we can index into the new primary.
    client().prepareIndex(INDEX_NAME).setId("3").setSource("bar", "baz").setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get();
    assertDocCounts(3, replica);

    // start another node, index another doc and replicate.
    final String nodeC = internalCluster().startNode();
    ensureGreen(INDEX_NAME);
    client().prepareIndex(INDEX_NAME).setId("4").setSource("baz", "baz").get();
    refresh(INDEX_NAME);
    waitForReplicaUpdate();
    assertDocCounts(4, nodeC, replica);
    assertSegmentStats(REPLICA_COUNT);
}

/**
 * Restarts the node holding the primary and verifies that the primary role moves to the other
 * node while both copies keep the same document count.
 */
public void testRestartPrimary() throws Exception {
    final String primary = internalCluster().startNode();
    createIndex(INDEX_NAME);
    ensureYellowAndNoInitializingShards(INDEX_NAME);
    final String replica = internalCluster().startNode();
    ensureGreen(INDEX_NAME);

    // expected value first, so a failure message reports expected/actual the right way round.
    assertEquals(primary, getNodeContainingPrimaryShard().getName());

    final int initialDocCount = 1;
    client().prepareIndex(INDEX_NAME).setId("1").setSource("foo", "bar").setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get();
    refresh(INDEX_NAME);

    waitForReplicaUpdate();
    assertDocCounts(initialDocCount, replica, primary);

    internalCluster().restartNode(primary);
    ensureGreen(INDEX_NAME);

    // after the restart the former replica node is expected to hold the primary.
    assertEquals(replica, getNodeContainingPrimaryShard().getName());

    flushAndRefresh(INDEX_NAME);
    waitForReplicaUpdate();

    assertDocCounts(initialDocCount, replica, primary);
    assertSegmentStats(REPLICA_COUNT);
}

/**
 * Cancels allocation on the primary - promoting the replica and recreating the former primary as a
 * replica - then verifies both copies still hold the same documents.
 */
public void testCancelPrimaryAllocation() throws Exception {
    final String primary = internalCluster().startNode();
    createIndex(INDEX_NAME);
    ensureYellowAndNoInitializingShards(INDEX_NAME);
    final String replica = internalCluster().startNode();
    ensureGreen(INDEX_NAME);

    final int initialDocCount = 1;

    client().prepareIndex(INDEX_NAME).setId("1").setSource("foo", "bar").setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get();
    refresh(INDEX_NAME);

    waitForReplicaUpdate();
    assertDocCounts(initialDocCount, replica, primary);

    final IndexShard indexShard = getIndexShard(primary);
    // NOTE(review): the trailing `true` is presumably the allow-primary flag of
    // CancelAllocationCommand - confirm against its constructor.
    client().admin()
        .cluster()
        .prepareReroute()
        .add(new CancelAllocationCommand(INDEX_NAME, indexShard.shardId().id(), primary, true))
        .execute()
        .actionGet();
    ensureGreen(INDEX_NAME);

    // expected value first, so a failure message reports expected/actual the right way round.
    assertEquals(replica, getNodeContainingPrimaryShard().getName());

    flushAndRefresh(INDEX_NAME);
    waitForReplicaUpdate();

    assertDocCounts(initialDocCount, replica, primary);
    assertSegmentStats(REPLICA_COUNT);
}

public void testReplicationAfterPrimaryRefreshAndFlush() throws Exception {
final String nodeA = internalCluster().startNode();
final String nodeB = internalCluster().startNode();
Expand Down Expand Up @@ -240,9 +347,8 @@ public void testStartReplicaAfterPrimaryIndexesDocs() throws Exception {
assertHitCount(client(primaryNode).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 3);
assertHitCount(client(replicaNode).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 3);

final Index index = resolveIndex(INDEX_NAME);
IndexShard primaryShard = getIndexShard(index, primaryNode);
IndexShard replicaShard = getIndexShard(index, replicaNode);
IndexShard primaryShard = getIndexShard(primaryNode);
IndexShard replicaShard = getIndexShard(replicaNode);
assertEquals(
primaryShard.translogStats().estimatedNumberOfOperations(),
replicaShard.translogStats().estimatedNumberOfOperations()
Expand Down Expand Up @@ -351,8 +457,7 @@ private void assertSegmentStats(int numberOfReplicas) throws IOException {
final ShardRouting replicaShardRouting = shardSegment.getShardRouting();
ClusterState state = client(internalCluster().getMasterName()).admin().cluster().prepareState().get().getState();
final DiscoveryNode replicaNode = state.nodes().resolveNode(replicaShardRouting.currentNodeId());
final Index index = resolveIndex(INDEX_NAME);
IndexShard indexShard = getIndexShard(index, replicaNode.getName());
IndexShard indexShard = getIndexShard(replicaNode.getName());
final String lastCommitSegmentsFileName = SegmentInfos.getLastCommitSegmentsFileName(indexShard.store().directory());
// calls to readCommit will fail if a valid commit point and all its segments are not in the store.
SegmentInfos.readCommit(indexShard.store().directory(), lastCommitSegmentsFileName);
Expand Down Expand Up @@ -392,7 +497,8 @@ private void waitForReplicaUpdate() throws Exception {
});
}

private IndexShard getIndexShard(Index index, String node) {
private IndexShard getIndexShard(String node) {
final Index index = resolveIndex(INDEX_NAME);
IndicesService indicesService = internalCluster().getInstance(IndicesService.class, node);
IndexService indexService = indicesService.indexServiceSafe(index);
final Optional<Integer> shardId = indexService.shardIds().stream().findFirst();
Expand All @@ -409,7 +515,8 @@ private List<ShardSegments[]> getShardSegments(IndicesSegmentResponse indicesSeg
}

private Map<String, Segment> getLatestSegments(ShardSegments segments) {
final Long latestPrimaryGen = segments.getSegments().stream().map(Segment::getGeneration).max(Long::compare).get();
final Optional<Long> generation = segments.getSegments().stream().map(Segment::getGeneration).max(Long::compare);
final Long latestPrimaryGen = generation.get();
return segments.getSegments()
.stream()
.filter(s -> s.getGeneration() == latestPrimaryGen)
Expand All @@ -419,4 +526,31 @@ private Map<String, Segment> getLatestSegments(ShardSegments segments) {
/**
 * Groups the given shard segments by whether their shard routing is a primary.
 *
 * @param replicationGroupSegments segments of every shard copy in one replication group
 * @return map keyed by {@code true} (primary) / {@code false} (replica); a key is present only
 *         if at least one shard of that type was supplied
 */
private Map<Boolean, List<ShardSegments>> segmentsByShardType(ShardSegments[] replicationGroupSegments) {
    final List<ShardSegments> groupMembers = Arrays.asList(replicationGroupSegments);
    return groupMembers.stream()
        .collect(Collectors.groupingBy(shardSegments -> shardSegments.getShardRouting().primary()));
}

@Nullable
private ShardRouting getShardRoutingForNodeName(String nodeName) {
final ClusterState state = client(internalCluster().getClusterManagerName()).admin().cluster().prepareState().get().getState();
for (IndexShardRoutingTable shardRoutingTable : state.routingTable().index(INDEX_NAME)) {
for (ShardRouting shardRouting : shardRoutingTable.activeShards()) {
final String nodeId = shardRouting.currentNodeId();
final DiscoveryNode discoveryNode = state.nodes().resolveNode(nodeId);
if (discoveryNode.getName().equals(nodeName)) {
return shardRouting;
}
}
}
return null;
mch2 marked this conversation as resolved.
Show resolved Hide resolved
}

/**
 * Asserts that a node-local search on each given node returns exactly {@code expectedDocCount} hits.
 *
 * @param expectedDocCount number of hits every node must report
 * @param nodeNames        names of the nodes to query, checked in the order given
 */
private void assertDocCounts(int expectedDocCount, String... nodeNames) {
    Arrays.stream(nodeNames)
        .forEach(
            nodeName -> assertHitCount(
                client(nodeName).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(),
                expectedDocCount
            )
        );
}

/**
 * Resolves the node that currently holds the primary of shard 0 of {@code INDEX_NAME}, based on
 * the cluster state fetched from the cluster manager node.
 *
 * @return the {@link DiscoveryNode} the primary shard is assigned to
 */
private DiscoveryNode getNodeContainingPrimaryShard() {
    final String clusterManager = internalCluster().getClusterManagerName();
    final ClusterState clusterState = client(clusterManager).admin().cluster().prepareState().get().getState();
    final IndexShardRoutingTable shardTable = clusterState.routingTable().index(INDEX_NAME).shard(0);
    final String primaryNodeId = shardTable.primaryShard().currentNodeId();
    return clusterState.nodes().resolveNode(primaryNodeId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -550,7 +550,7 @@ public synchronized IndexShard createShard(
circuitBreakerService,
// TODO Replace with remote translog factory in the follow up PR
this.indexSettings.isRemoteTranslogStoreEnabled() ? null : new InternalTranslogFactory(),
this.indexSettings.isSegRepEnabled() && routing.primary() ? checkpointPublisher : null,
this.indexSettings.isSegRepEnabled() ? checkpointPublisher : null,
remoteStore
);
eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,23 @@ public synchronized void updateSegments(final SegmentInfos infos, long seqNo) th
localCheckpointTracker.fastForwardProcessedSeqNo(seqNo);
}

/**
 * Persists the latest live SegmentInfos as a commit point and then syncs the translog.
 *
 * <p>Intended to be called when this shard is about to be promoted as the new primary.
 *
 * <p>TODO: If this method is invoked while the engine is currently updating segments on its reader,
 * wait for that update to complete so the updated segments are used.
 *
 * @throws IOException when there is an IO error committing the SegmentInfos.
 */
public void commitSegmentInfos() throws IOException {
    // TODO: This method should wait for replication events to finalize.
    final SegmentInfos infosToCommit = getLatestSegmentInfos();
    final long maxSeqNo = localCheckpointTracker.getMaxSeqNo();
    final long processedCheckpoint = localCheckpointTracker.getProcessedCheckpoint();
    store.commitSegmentInfos(infosToCommit, maxSeqNo, processedCheckpoint);
    translogManager.syncTranslog();
}

@Override
public String getHistoryUUID() {
return loadHistoryUUID(lastCommittedSegmentInfos.userData);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public void beforeRefresh() throws IOException {

// Refresh callback: publishes this shard through the checkpoint publisher when the refresh
// actually happened and the shard's replication tracker is in primary mode.
// NOTE(review): this text is a diff rendering without +/- markers. The two consecutive `if`
// lines below appear to be the pre-change and post-change versions of the same condition
// (only one closing brace follows for them), not two nested checks - confirm against the
// merged file.
@Override
public void afterRefresh(boolean didRefresh) throws IOException {
// presumably the removed line: publishes whenever a refresh happened in primary mode.
if (didRefresh && shard.getReplicationTracker().isPrimaryMode()) {
// presumably the added line: same condition, but additionally skips a shard whose state is CLOSED.
if (didRefresh && shard.state() != IndexShardState.CLOSED && shard.getReplicationTracker().isPrimaryMode()) {
publisher.publish(shard);
}
}
Expand Down
40 changes: 31 additions & 9 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
private final GlobalCheckpointListeners globalCheckpointListeners;
private final PendingReplicationActions pendingReplicationActions;
private final ReplicationTracker replicationTracker;
private final SegmentReplicationCheckpointPublisher checkpointPublisher;

protected volatile ShardRouting shardRouting;
protected volatile IndexShardState state;
Expand Down Expand Up @@ -306,8 +307,6 @@ Runnable getGlobalCheckpointSyncer() {
private final AtomicReference<Translog.Location> pendingRefreshLocation = new AtomicReference<>();
private final RefreshPendingLocationListener refreshPendingLocationListener;
private volatile boolean useRetentionLeasesInPeerRecovery;
private final ReferenceManager.RefreshListener checkpointRefreshListener;

private final Store remoteStore;
private final TranslogFactory translogFactory;

Expand Down Expand Up @@ -417,11 +416,7 @@ public boolean shouldCache(Query query) {
persistMetadata(path, indexSettings, shardRouting, null, logger);
this.useRetentionLeasesInPeerRecovery = replicationTracker.hasAllPeerRecoveryRetentionLeases();
this.refreshPendingLocationListener = new RefreshPendingLocationListener();
if (checkpointPublisher != null) {
this.checkpointRefreshListener = new CheckpointRefreshListener(this, checkpointPublisher);
} else {
this.checkpointRefreshListener = null;
}
this.checkpointPublisher = checkpointPublisher;
this.remoteStore = remoteStore;
this.translogFactory = translogFactory;
}
Expand Down Expand Up @@ -627,6 +622,11 @@ public void updateShardState(
+ newRouting;
assert getOperationPrimaryTerm() == newPrimaryTerm;
try {
if (indexSettings.isSegRepEnabled()) {
// this Shard's engine was read only, we need to update its engine before restoring local history from xlog.
assert newRouting.primary() && currentRouting.primary() == false;
promoteNRTReplicaToPrimary();
}
replicationTracker.activatePrimaryMode(getLocalCheckpoint());
ensurePeerRecoveryRetentionLeasesExist();
/*
Expand Down Expand Up @@ -3231,8 +3231,8 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) thro
Directory remoteDirectory = ((FilterDirectory) ((FilterDirectory) remoteStore.directory()).getDelegate()).getDelegate();
internalRefreshListener.add(new RemoteStoreRefreshListener(store.directory(), remoteDirectory));
}
if (this.checkpointRefreshListener != null) {
internalRefreshListener.add(checkpointRefreshListener);
if (this.checkpointPublisher != null && indexSettings.isSegRepEnabled() && shardRouting.primary()) {
internalRefreshListener.add(new CheckpointRefreshListener(this, this.checkpointPublisher));
}

return this.engineConfigFactory.newEngineConfig(
Expand Down Expand Up @@ -4123,4 +4123,26 @@ RetentionLeaseSyncer getRetentionLeaseSyncer() {
public GatedCloseable<SegmentInfos> getSegmentInfosSnapshot() {
return getEngine().getSegmentInfosSnapshot();
}

/**
* With segment replication enabled - prepare the shard's engine to be promoted as the new primary.
*
* If this shard is currently using a replication engine, this method:
* 1. Invokes {@link NRTReplicationEngine#commitSegmentInfos()} to ensure the engine can be reopened as writeable from the latest refresh point.
* InternalEngine opens its IndexWriter from an on-disk commit point, but this replica may have recently synced from a primary's refresh point, meaning it has documents searchable in its in-memory SegmentInfos
* that are not part of a commit point. This ensures that those documents are made part of a commit and do not need to be reindexed after promotion.
* 2. Invokes resetEngineToGlobalCheckpoint - This call performs the engine swap, opening up as a writeable engine and replays any operations in the xlog. The operations indexed from xlog here will be
* any ack'd writes that were not copied to this replica before promotion.
*/
private void promoteNRTReplicaToPrimary() {
assert shardRouting.primary() && indexSettings.isSegRepEnabled();
getReplicationEngine().ifPresentOrElse(engine -> {
try {
engine.commitSegmentInfos();
resetEngineToGlobalCheckpoint();
mch2 marked this conversation as resolved.
Show resolved Hide resolved
} catch (IOException e) {
throw new EngineException(shardId, "Unable to update replica to writeable engine, failing shard", e);
}
}, () -> { throw new EngineException(shardId, "Expected replica engine to be of type NRTReplicationEngine"); });
}
}
Loading