Skip to content

Commit

Permalink
Support shard promotion with Segment Replication.
Browse files Browse the repository at this point in the history
This change adds basic failover support with segment replication.  Once selected, a replica will commit and reopen a writeable engine.

Signed-off-by: Marc Handalian <handalm@amazon.com>
  • Loading branch information
mch2 committed Aug 5, 2022
1 parent d09c285 commit c28905a
Show file tree
Hide file tree
Showing 7 changed files with 331 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,13 @@
import org.opensearch.action.admin.indices.segments.IndicesSegmentResponse;
import org.opensearch.action.admin.indices.segments.IndicesSegmentsRequest;
import org.opensearch.action.admin.indices.segments.ShardSegments;
import org.opensearch.action.support.WriteRequest;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.routing.IndexShardRoutingTable;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.common.Nullable;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.index.Index;
Expand All @@ -29,6 +32,7 @@
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.test.BackgroundIndexer;
import org.opensearch.test.InternalTestCluster;
import org.opensearch.test.OpenSearchIntegTestCase;

import java.io.IOException;
Expand Down Expand Up @@ -70,6 +74,87 @@ protected boolean addMockInternalEngine() {
return false;
}

/**
 * Stops the node holding the primary and verifies that the remaining replica is promoted,
 * serves every previously indexed document, accepts new writes, and can replicate to a
 * freshly started node.
 */
public void testPrimaryStopped_ReplicaPromoted() throws Exception {
    final String nodeA = internalCluster().startNode();
    final String nodeB = internalCluster().startNode();
    createIndex(INDEX_NAME);
    ensureGreen(INDEX_NAME);

    client().prepareIndex(INDEX_NAME).setId("1").setSource("foo", "bar").setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get();
    refresh(INDEX_NAME);

    waitForReplicaUpdate();
    // both copies must see the first doc before we take the primary away.
    assertDocCounts(1, nodeA, nodeB);

    final DiscoveryNode primaryDiscoveryNode = getNodeContainingPrimaryShard();
    final String primaryNodeName = primaryDiscoveryNode.getName();
    final String replicaNodeName = nodeA.equals(primaryNodeName) ? nodeB : nodeA;

    // index a second doc and do not wait for the replica to catch up (no waitForReplicaUpdate here);
    // it must be searchable once the replica is promoted. Note the request itself uses RefreshPolicy.IMMEDIATE.
    client().prepareIndex(INDEX_NAME).setId("2").setSource("bar", "baz").setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get();

    // stop the primary node - we only have one shard on here.
    internalCluster().stopRandomNode(InternalTestCluster.nameFilter(primaryNodeName));
    ensureYellowAndNoInitializingShards(INDEX_NAME);

    final ShardRouting replicaShardRouting = getShardRoutingForNodeName(replicaNodeName);
    assertNotNull(replicaShardRouting);
    assertTrue(replicaShardRouting + " should be promoted as a primary", replicaShardRouting.primary());
    assertDocCounts(2, replicaNodeName);

    // assert we can index into the new primary.
    client().prepareIndex(INDEX_NAME).setId("3").setSource("bar", "baz").setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get();
    assertDocCounts(3, replicaNodeName);

    // start another node, index another doc and replicate.
    final String nodeC = internalCluster().startNode();
    ensureGreen(INDEX_NAME);
    client().prepareIndex(INDEX_NAME).setId("4").setSource("baz", "baz").get();
    refresh(INDEX_NAME);
    waitForReplicaUpdate();
    assertDocCounts(4, nodeC, replicaNodeName);
    assertSegmentStats(REPLICA_COUNT);
}

/**
 * Resolves the node that currently hosts the primary copy of shard 0 of {@code INDEX_NAME},
 * using the cluster state fetched through the cluster-manager node's client.
 *
 * @return the {@link DiscoveryNode} the primary shard's current node id resolves to
 */
private DiscoveryNode getNodeContainingPrimaryShard() {
    final String clusterManagerName = internalCluster().getClusterManagerName();
    final ClusterState clusterState = client(clusterManagerName).admin().cluster().prepareState().get().getState();
    final String primaryNodeId = clusterState.routingTable().index(INDEX_NAME).shard(0).primaryShard().currentNodeId();
    return clusterState.nodes().resolveNode(primaryNodeId);
}

/**
 * Restarts the node holding the primary and verifies that the primary afterwards lives on the
 * former replica's node, and that both nodes still report the initial document count.
 */
public void testRestartPrimary() throws Exception {
    final String nodeA = internalCluster().startNode();
    final String nodeB = internalCluster().startNode();
    createIndex(INDEX_NAME);
    ensureGreen(INDEX_NAME);

    final DiscoveryNode primaryDiscoveryNode = getNodeContainingPrimaryShard();
    final String primaryNodeName = primaryDiscoveryNode.getName();
    final String replicaNodeName = nodeA.equals(primaryNodeName) ? nodeB : nodeA;

    final int initialDocCount = 1;

    client().prepareIndex(INDEX_NAME).setId("1").setSource("foo", "bar").setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get();
    refresh(INDEX_NAME);

    waitForReplicaUpdate();
    assertDocCounts(initialDocCount, replicaNodeName, primaryNodeName);

    internalCluster().restartNode(primaryNodeName);
    ensureGreen(INDEX_NAME);

    final DiscoveryNode newPrimaryNode = getNodeContainingPrimaryShard();
    // expected value goes first so that a failure reports "expected <replica> but was <actual>".
    assertEquals(replicaNodeName, newPrimaryNode.getName());

    flushAndRefresh(INDEX_NAME);
    waitForReplicaUpdate();

    assertDocCounts(initialDocCount, replicaNodeName, primaryNodeName);
    assertSegmentStats(REPLICA_COUNT);
}

public void testReplicationAfterPrimaryRefreshAndFlush() throws Exception {
final String nodeA = internalCluster().startNode();
final String nodeB = internalCluster().startNode();
Expand Down Expand Up @@ -303,4 +388,25 @@ private Map<String, Segment> getLatestSegments(ShardSegments segments) {
private Map<Boolean, List<ShardSegments>> segmentsByShardType(ShardSegments[] replicationGroupSegments) {
    // Bucket each shard's segments by whether its routing entry is a primary (key true) or a replica (key false).
    return Arrays.stream(replicationGroupSegments)
        .collect(
            Collectors.groupingBy(shardSegments -> {
                return shardSegments.getShardRouting().primary();
            })
        );
}

/**
 * Finds an active shard of {@code INDEX_NAME} that is currently assigned to the node with the given name.
 *
 * @param nodeName name of the node to look for
 * @return the first matching active {@link ShardRouting}, or {@code null} when no active shard resolves to that node
 */
@Nullable
private ShardRouting getShardRoutingForNodeName(String nodeName) {
    final String clusterManagerName = internalCluster().getClusterManagerName();
    final ClusterState clusterState = client(clusterManagerName).admin().cluster().prepareState().get().getState();
    for (IndexShardRoutingTable routingTableForShard : clusterState.routingTable().index(INDEX_NAME)) {
        for (ShardRouting activeShard : routingTableForShard.activeShards()) {
            final DiscoveryNode hostingNode = clusterState.nodes().resolveNode(activeShard.currentNodeId());
            final boolean hostedOnRequestedNode = hostingNode.getName().equals(nodeName);
            if (hostedOnRequestedNode) {
                return activeShard;
            }
        }
    }
    return null;
}

/**
 * Asserts that a size-0 search against {@code INDEX_NAME}, issued through each named node's client
 * with the {@code _only_local} preference, reports the expected hit count.
 *
 * @param expectedDocCount hit count every listed node must report
 * @param nodeNames        names of the nodes to query, checked in the order given
 */
private void assertDocCounts(int expectedDocCount, String... nodeNames) {
    for (int i = 0; i < nodeNames.length; i++) {
        final String nodeName = nodeNames[i];
        assertHitCount(
            client(nodeName).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(),
            expectedDocCount
        );
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -547,7 +547,7 @@ public synchronized IndexShard createShard(
() -> globalCheckpointSyncer.accept(shardId),
retentionLeaseSyncer,
circuitBreakerService,
this.indexSettings.isSegRepEnabled() && routing.primary() ? checkpointPublisher : null,
this.indexSettings.isSegRepEnabled() ? checkpointPublisher : null,
remoteStore
);
eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,22 @@ public synchronized void updateSegments(final SegmentInfos infos, long seqNo) th
localCheckpointTracker.fastForwardProcessedSeqNo(seqNo);
}

/**
 * Persist the latest live SegmentInfos.
 *
 * Creates a commit point from whatever {@code getLatestSegmentInfos()} returns at the time of the call, stamped with the
 * local checkpoint tracker's max sequence number and processed checkpoint, and then syncs the translog.
 * It is intended to be used when this shard is about to be promoted as the new primary.
 *
 * This method does not wait for an in-flight segment copy to complete; see the TODO in the body.
 *
 * @throws IOException - When there is an IO error committing the SegmentInfos.
 */
public void commitSegmentInfos() throws IOException {
    // TODO: This method should wait for replication events to finalize.
    final SegmentInfos infosToCommit = getLatestSegmentInfos();
    final long maxSeqNo = localCheckpointTracker.getMaxSeqNo();
    final long processedCheckpoint = localCheckpointTracker.getProcessedCheckpoint();
    store.commitSegmentInfos(infosToCommit, maxSeqNo, processedCheckpoint);
    translogManager.syncTranslog();
}

/**
 * Returns the history UUID obtained by passing the user data of {@code lastCommittedSegmentInfos}
 * to {@code loadHistoryUUID}.
 */
@Override
public String getHistoryUUID() {
    return loadHistoryUUID(lastCommittedSegmentInfos.userData);
}
Expand Down
39 changes: 30 additions & 9 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
private final GlobalCheckpointListeners globalCheckpointListeners;
private final PendingReplicationActions pendingReplicationActions;
private final ReplicationTracker replicationTracker;
private final SegmentReplicationCheckpointPublisher checkpointPublisher;

protected volatile ShardRouting shardRouting;
protected volatile IndexShardState state;
Expand Down Expand Up @@ -305,8 +306,6 @@ Runnable getGlobalCheckpointSyncer() {
private final AtomicReference<Translog.Location> pendingRefreshLocation = new AtomicReference<>();
private final RefreshPendingLocationListener refreshPendingLocationListener;
private volatile boolean useRetentionLeasesInPeerRecovery;
private final ReferenceManager.RefreshListener checkpointRefreshListener;

private final Store remoteStore;

public IndexShard(
Expand Down Expand Up @@ -414,11 +413,7 @@ public boolean shouldCache(Query query) {
persistMetadata(path, indexSettings, shardRouting, null, logger);
this.useRetentionLeasesInPeerRecovery = replicationTracker.hasAllPeerRecoveryRetentionLeases();
this.refreshPendingLocationListener = new RefreshPendingLocationListener();
if (checkpointPublisher != null) {
this.checkpointRefreshListener = new CheckpointRefreshListener(this, checkpointPublisher);
} else {
this.checkpointRefreshListener = null;
}
this.checkpointPublisher = checkpointPublisher;
this.remoteStore = remoteStore;
}

Expand Down Expand Up @@ -623,6 +618,10 @@ public void updateShardState(
+ newRouting;
assert getOperationPrimaryTerm() == newPrimaryTerm;
try {
if (indexSettings.isSegRepEnabled()) {
// this Shard's engine was read only, we need to update its engine before restoring local history from xlog.
promoteNRTReplicaToPrimary();
}
replicationTracker.activatePrimaryMode(getLocalCheckpoint());
ensurePeerRecoveryRetentionLeasesExist();
/*
Expand Down Expand Up @@ -3220,8 +3219,8 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) thro
Directory remoteDirectory = ((FilterDirectory) ((FilterDirectory) remoteStore.directory()).getDelegate()).getDelegate();
internalRefreshListener.add(new RemoteStoreRefreshListener(store.directory(), remoteDirectory));
}
if (this.checkpointRefreshListener != null) {
internalRefreshListener.add(checkpointRefreshListener);
if (this.checkpointPublisher != null && indexSettings.isSegRepEnabled()) {
internalRefreshListener.add(new CheckpointRefreshListener(this, this.checkpointPublisher));
}

return this.engineConfigFactory.newEngineConfig(
Expand Down Expand Up @@ -4111,4 +4110,26 @@ RetentionLeaseSyncer getRetentionLeaseSyncer() {
public GatedCloseable<SegmentInfos> getSegmentInfosSnapshot() {
    // Pure delegation: the snapshot (and its GatedCloseable wrapper) is produced by the shard's current engine.
    return getEngine().getSegmentInfosSnapshot();
}

/**
 * With segment replication enabled - prepare the shard's engine to be promoted as the new primary.
 *
 * The shard is expected to be running a replication engine; if it is not, an {@link OpenSearchException} is thrown.
 * Otherwise two steps run in order:
 * 1. {@link NRTReplicationEngine#commitSegmentInfos()} is invoked so the engine can be reopened as writeable from the latest
 *    refresh point. A writeable engine opens its IndexWriter from an on-disk commit point, but this replica may hold
 *    searchable documents in its in-memory SegmentInfos that are not yet part of any commit. Committing first means those
 *    documents do not need to be reindexed after promotion.
 * 2. resetEngineToGlobalCheckpoint is invoked - this performs the engine swap, opening a writeable engine and replaying
 *    operations from the xlog, i.e. ack'd writes that were not copied to this replica before promotion.
 *
 * An {@link IOException} from either step is rethrown wrapped in an {@link OpenSearchException}.
 */
private void promoteNRTReplicaToPrimary() {
    assert shardRouting.primary() && indexSettings.isSegRepEnabled();
    final NRTReplicationEngine replicationEngine = getReplicationEngine().orElseThrow(
        () -> new OpenSearchException("Expected replica engine to be of type NRTReplicationEngine")
    );
    try {
        replicationEngine.commitSegmentInfos();
        resetEngineToGlobalCheckpoint();
    } catch (IOException e) {
        throw new OpenSearchException("Unable to change engine type, failing", e);
    }
}
}
41 changes: 41 additions & 0 deletions server/src/main/java/org/opensearch/index/store/Store.java
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@

import static java.util.Collections.emptyMap;
import static java.util.Collections.unmodifiableMap;
import static org.opensearch.index.seqno.SequenceNumbers.LOCAL_CHECKPOINT_KEY;

/**
* A Store provides plain access to files written by an opensearch index shard. Each shard
Expand Down Expand Up @@ -799,6 +800,46 @@ public void beforeClose() {
shardLock.setDetails("closing shard");
}

/**
 * Commit a live {@link SegmentInfos} to this store. This method should only be used with Segment Replication.
 *
 * Background: replica engines with segrep do not have an IndexWriter, and Lucene does not currently offer a way to create a
 * writer directly from a SegmentInfos object. To promote a replica to primary without reindexing, the replica must first
 * write a commit so that it can be reopened with a writeable engine. In addition, InternalEngine currently invokes
 * `trimUnsafeCommits`, which reverts to a previous safe commit whose max seqNo is less than or equal to the current global
 * checkpoint; the replica's maxSeqNo is likely higher than the global checkpoint, so a plain new commit would be wiped.
 *
 * Steps performed while holding the metadata write lock:
 * 1. Copy the infos' user data, record the processed checkpoint and max seqNo in it, and commit the SegmentInfos directly.
 * 2. Open an appending IndexWriter on the most recent commit and commit again with that commit's user data. Similar to
 *    trimUnsafeCommits, this is intended to fsync all files of the new commit and delete older commit points, leaving the
 *    new commit as the only one to start from.
 *
 * Note that the passed {@code latestSegmentInfos} instance is modified: its user data is replaced.
 *
 * @param latestSegmentInfos {@link SegmentInfos} The latest active infos
 * @param maxSeqNo The engine's current maxSeqNo
 * @param processedCheckpoint The engine's current processed checkpoint.
 * @throws IOException when there is an IO error committing.
 */
public void commitSegmentInfos(SegmentInfos latestSegmentInfos, long maxSeqNo, long processedCheckpoint) throws IOException {
    assert indexSettings.isSegRepEnabled();
    metadataLock.writeLock().lock();
    try {
        // stamp the sequence number markers onto a copy of the existing user data before committing.
        final Map<String, String> commitUserData = new HashMap<>(latestSegmentInfos.getUserData());
        commitUserData.put(LOCAL_CHECKPOINT_KEY, Long.toString(processedCheckpoint));
        commitUserData.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(maxSeqNo));
        latestSegmentInfos.setUserData(commitUserData, true);
        latestSegmentInfos.commit(directory());

        // similar to TrimUnsafeCommits: re-commit through an appending IW opened on the newest commit, which is meant to
        // remove old commits and make sure every file referenced by the SegmentInfos commit is fsynced.
        final List<IndexCommit> commitsOnDisk = DirectoryReader.listCommits(directory);
        assert commitsOnDisk.isEmpty() == false : "No commits found";
        final IndexCommit newestCommit = commitsOnDisk.get(commitsOnDisk.size() - 1);
        try (IndexWriter appendingWriter = newAppendingIndexWriter(directory, newestCommit)) {
            appendingWriter.setLiveCommitData(newestCommit.getUserData().entrySet());
            appendingWriter.commit();
        }
    } finally {
        metadataLock.writeLock().unlock();
    }
}

/**
* A store directory
*
Expand Down
Loading

0 comments on commit c28905a

Please sign in to comment.