From c95112f2c471c5e2e77ba01c90442da74425a5b0 Mon Sep 17 00:00:00 2001 From: Marc Handalian Date: Tue, 9 Aug 2022 20:04:42 -0700 Subject: [PATCH 1/2] Fix OngoingSegmentReplications to key by allocation ID instead of DiscoveryNode. This change fixes segrep to work with multiple shards per node by keying ongoing replications on allocation ID. This also updates cancel methods to ensure state is properly cleared on shard cancel. Signed-off-by: Marc Handalian --- .../replication/SegmentReplicationIT.java | 66 ++++++++++++++++--- .../OngoingSegmentReplications.java | 46 ++++++++----- .../SegmentReplicationSourceHandler.java | 8 ++- .../OngoingSegmentReplicationsTests.java | 42 +++++++++++- 4 files changed, 136 insertions(+), 26 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java index a1cc0148dcdac..dae2fa04a3a7e 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java @@ -111,6 +111,54 @@ public void testReplicationAfterPrimaryRefreshAndFlush() throws Exception { } } + public void testMultipleShards() throws Exception { + Settings indexSettings = Settings.builder() + .put(super.indexSettings()) + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 3) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1) + .put(IndexModule.INDEX_QUERY_CACHE_ENABLED_SETTING.getKey(), false) + .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) + .build(); + final String nodeA = internalCluster().startNode(); + final String nodeB = internalCluster().startNode(); + createIndex(INDEX_NAME, indexSettings); + ensureGreen(INDEX_NAME); + + final int initialDocCount = scaledRandomIntBetween(1, 200); + try ( + BackgroundIndexer indexer = new BackgroundIndexer( + INDEX_NAME, + "_doc", + client(), + -1, + RandomizedTest.scaledRandomIntBetween(2, 5), + false, + random() + ) + ) { + indexer.start(initialDocCount); + waitForDocs(initialDocCount, indexer); + refresh(INDEX_NAME); + waitForReplicaUpdate(); + + assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount); + assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount); + + final int additionalDocCount = scaledRandomIntBetween(0, 200); + final int expectedHitCount = initialDocCount + additionalDocCount; + indexer.start(additionalDocCount); + waitForDocs(expectedHitCount, indexer); + + flushAndRefresh(INDEX_NAME); + waitForReplicaUpdate(); + assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), expectedHitCount); + assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), expectedHitCount); + + ensureGreen(INDEX_NAME); + assertSegmentStats(REPLICA_COUNT); + } + } + public void testReplicationAfterForceMerge() throws Exception { final String nodeA = internalCluster().startNode(); final String nodeB = internalCluster().startNode(); @@ -262,15 +310,17 @@ private void waitForReplicaUpdate() throws Exception { final Map> segmentListMap = segmentsByShardType(replicationGroupSegments); final List primaryShardSegmentsList = segmentListMap.get(true); final List replicaShardSegments = segmentListMap.get(false); - + // if we don't have any segments yet, proceed. final ShardSegments primaryShardSegments = primaryShardSegmentsList.stream().findFirst().get(); - final Map latestPrimarySegments = getLatestSegments(primaryShardSegments); - final Long latestPrimaryGen = latestPrimarySegments.values().stream().findFirst().map(Segment::getGeneration).get(); - for (ShardSegments shardSegments : replicaShardSegments) { - final boolean isReplicaCaughtUpToPrimary = shardSegments.getSegments() - .stream() - .anyMatch(segment -> segment.getGeneration() == latestPrimaryGen); - assertTrue(isReplicaCaughtUpToPrimary); + if (primaryShardSegments.getSegments().isEmpty() == false) { + final Map latestPrimarySegments = getLatestSegments(primaryShardSegments); + final Long latestPrimaryGen = latestPrimarySegments.values().stream().findFirst().map(Segment::getGeneration).get(); + for (ShardSegments shardSegments : replicaShardSegments) { + final boolean isReplicaCaughtUpToPrimary = shardSegments.getSegments() + .stream() + .anyMatch(segment -> segment.getGeneration() == latestPrimaryGen); + assertTrue(isReplicaCaughtUpToPrimary); + } } } }); diff --git a/server/src/main/java/org/opensearch/indices/replication/OngoingSegmentReplications.java b/server/src/main/java/org/opensearch/indices/replication/OngoingSegmentReplications.java index a9b032c98b70f..b6055ce7a6d12 100644 --- a/server/src/main/java/org/opensearch/indices/replication/OngoingSegmentReplications.java +++ b/server/src/main/java/org/opensearch/indices/replication/OngoingSegmentReplications.java @@ -24,7 +24,9 @@ import java.io.IOException; import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; +import java.util.stream.Collectors; /** * Manages references to ongoing segrep events on a node. @@ -38,7 +40,7 @@ class OngoingSegmentReplications { private final RecoverySettings recoverySettings; private final IndicesService indicesService; private final Map copyStateMap; - private final Map nodesToHandlers; + private final Map allocationIdToHandlers; /** * Constructor. @@ -50,7 +52,7 @@ class OngoingSegmentReplications { this.indicesService = indicesService; this.recoverySettings = recoverySettings; this.copyStateMap = Collections.synchronizedMap(new HashMap<>()); - this.nodesToHandlers = ConcurrentCollections.newConcurrentMap(); + this.allocationIdToHandlers = ConcurrentCollections.newConcurrentMap(); } /** @@ -96,8 +98,7 @@ synchronized CopyState getCachedCopyState(ReplicationCheckpoint checkpoint) thro * @param listener {@link ActionListener} that resolves when sending files is complete. */ void startSegmentCopy(GetSegmentFilesRequest request, ActionListener listener) { - final DiscoveryNode node = request.getTargetNode(); - final SegmentReplicationSourceHandler handler = nodesToHandlers.get(node); + final SegmentReplicationSourceHandler handler = allocationIdToHandlers.get(request.getTargetAllocationId()); if (handler != null) { if (handler.isReplicating()) { throw new OpenSearchException( @@ -108,7 +109,7 @@ void startSegmentCopy(GetSegmentFilesRequest request, ActionListener wrappedListener = ActionListener.runBefore(listener, () -> { - final SegmentReplicationSourceHandler sourceHandler = nodesToHandlers.remove(node); + final SegmentReplicationSourceHandler sourceHandler = allocationIdToHandlers.remove(request.getTargetAllocationId()); if (sourceHandler != null) { removeCopyState(sourceHandler.getCopyState()); } @@ -129,10 +130,12 @@ void startSegmentCopy(GetSegmentFilesRequest request, ActionListener entry : allocationIdToHandlers.entrySet()) { + if (entry.getValue().getTargetNode().equals(node)) { + final SegmentReplicationSourceHandler handler = allocationIdToHandlers.remove(entry.getKey()); + handler.cancel("Cancel on node left"); + removeCopyState(handler.getCopyState()); + } } } @@ -149,8 +152,8 @@ void cancelReplication(DiscoveryNode node) { */ CopyState prepareForReplication(CheckpointInfoRequest request, FileChunkWriter fileChunkWriter) throws IOException { final CopyState copyState = getCachedCopyState(request.getCheckpoint()); - if (nodesToHandlers.putIfAbsent( - request.getTargetNode(), + if (allocationIdToHandlers.putIfAbsent( + request.getTargetAllocationId(), createTargetHandler(request.getTargetNode(), copyState, fileChunkWriter) ) != null) { throw new OpenSearchException( @@ -169,12 +172,23 @@ CopyState prepareForReplication(CheckpointInfoRequest request, FileChunkWriter f * @param reason {@link String} - Reason for the cancel */ synchronized void cancel(IndexShard shard, String reason) { - for (SegmentReplicationSourceHandler entry : nodesToHandlers.values()) { - if (entry.getCopyState().getShard().equals(shard)) { - entry.cancel(reason); + for (Map.Entry handlerEntry : allocationIdToHandlers.entrySet()) { + if (handlerEntry.getValue().getCopyState().getShard().shardId().equals(shard.shardId())) { + final SegmentReplicationSourceHandler handler = allocationIdToHandlers.remove(handlerEntry.getKey()); + handler.cancel(reason); + } + } + final List list = copyStateMap.values() + .stream() + .filter(state -> state.getShard().shardId().equals(shard.shardId())) + .collect(Collectors.toList()); + for (CopyState copyState : list) { + if (copyState.getShard().shardId().equals(shard.shardId())) { + while (copyStateMap.containsKey(copyState.getRequestedReplicationCheckpoint())) { + removeCopyState(copyState); + } } } - copyStateMap.clear(); } /** @@ -186,7 +200,7 @@ boolean isInCopyStateMap(ReplicationCheckpoint replicationCheckpoint) { } int size() { - return nodesToHandlers.size(); + return allocationIdToHandlers.size(); } int cachedCopyStateSize() { diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceHandler.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceHandler.java index 8911302a722f5..e9bacf4481202 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceHandler.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceHandler.java @@ -54,6 +54,7 @@ class SegmentReplicationSourceHandler { private final List resources = new CopyOnWriteArrayList<>(); private final Logger logger; private final AtomicBoolean isReplicating = new AtomicBoolean(); + private final DiscoveryNode targetNode; /** * Constructor. @@ -73,6 +74,7 @@ class SegmentReplicationSourceHandler { int fileChunkSizeInBytes, int maxConcurrentFileChunks ) { + this.targetNode = targetNode; this.shard = copyState.getShard(); this.logger = Loggers.getLogger( SegmentReplicationSourceHandler.class, @@ -118,7 +120,7 @@ public synchronized void sendFiles(GetSegmentFilesRequest request, ActionListene logger.debug( "delaying replication of {} as it is not listed as assigned to target node {}", shard.shardId(), - request.getTargetNode() + targetNode ); throw new DelayRecoveryException("source node does not have the shard listed in its state as allocated on the node"); } @@ -175,4 +177,8 @@ CopyState getCopyState() { public boolean isReplicating() { return isReplicating.get(); } + + public DiscoveryNode getTargetNode() { + return targetNode; + } } diff --git a/server/src/test/java/org/opensearch/indices/replication/OngoingSegmentReplicationsTests.java b/server/src/test/java/org/opensearch/indices/replication/OngoingSegmentReplicationsTests.java index d42e75871a45a..38c55620e1223 100644 --- a/server/src/test/java/org/opensearch/indices/replication/OngoingSegmentReplicationsTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/OngoingSegmentReplicationsTests.java @@ -155,6 +155,9 @@ public void testCancelReplication() throws IOException { } public void testMultipleReplicasUseSameCheckpoint() throws IOException { + IndexShard secondReplica = newShard(primary.shardId(), false); + recoverReplica(secondReplica, primary, true); + OngoingSegmentReplications replications = new OngoingSegmentReplications(mockIndicesService, recoverySettings); final CheckpointInfoRequest request = new CheckpointInfoRequest( 1L, @@ -172,7 +175,7 @@ public void testMultipleReplicasUseSameCheckpoint() throws IOException { final CheckpointInfoRequest secondRequest = new CheckpointInfoRequest( 1L, - replica.routingEntry().allocationId().getId(), + secondReplica.routingEntry().allocationId().getId(), replicaDiscoveryNode, testCheckpoint ); @@ -187,6 +190,7 @@ public void testMultipleReplicasUseSameCheckpoint() throws IOException { assertEquals(0, copyState.refCount()); assertEquals(0, replications.size()); assertEquals(0, replications.cachedCopyStateSize()); + closeShards(secondReplica); } public void testStartCopyWithoutPrepareStep() { @@ -272,4 +276,40 @@ public void onFailure(Exception e) { } }); } + + public void testCancelAllReplicationsForShard() throws IOException { + // This tests when primary has multiple ongoing replications. + IndexShard replica_2 = newShard(primary.shardId(), false); + recoverReplica(replica_2, primary, true); + + OngoingSegmentReplications replications = new OngoingSegmentReplications(mockIndicesService, recoverySettings); + final CheckpointInfoRequest request = new CheckpointInfoRequest( + 1L, + replica.routingEntry().allocationId().getId(), + primaryDiscoveryNode, + testCheckpoint + ); + + final CopyState copyState = replications.prepareForReplication(request, mock(FileChunkWriter.class)); + assertEquals(1, copyState.refCount()); + + final CheckpointInfoRequest secondRequest = new CheckpointInfoRequest( + 1L, + replica_2.routingEntry().allocationId().getId(), + replicaDiscoveryNode, + testCheckpoint + ); + replications.prepareForReplication(secondRequest, mock(FileChunkWriter.class)); + + assertEquals(2, copyState.refCount()); + assertEquals(2, replications.size()); + assertEquals(1, replications.cachedCopyStateSize()); + + // cancel the primary's ongoing replications. + replications.cancel(primary, "Test"); + assertEquals(0, copyState.refCount()); + assertEquals(0, replications.size()); + assertEquals(0, replications.cachedCopyStateSize()); + closeShards(replica_2); + } } From 6548f9b9614160cb1f193f9724d066913098dc6f Mon Sep 17 00:00:00 2001 From: Marc Handalian Date: Tue, 9 Aug 2022 20:54:38 -0700 Subject: [PATCH 2/2] Clean up cancel methods. Signed-off-by: Marc Handalian --- .../OngoingSegmentReplications.java | 75 ++++++++++--------- .../SegmentReplicationSourceHandler.java | 7 ++ .../SegmentReplicationSourceHandlerTests.java | 4 + 3 files changed, 51 insertions(+), 35 deletions(-) diff --git a/server/src/main/java/org/opensearch/indices/replication/OngoingSegmentReplications.java b/server/src/main/java/org/opensearch/indices/replication/OngoingSegmentReplications.java index b6055ce7a6d12..dfebe5f7cabf2 100644 --- a/server/src/main/java/org/opensearch/indices/replication/OngoingSegmentReplications.java +++ b/server/src/main/java/org/opensearch/indices/replication/OngoingSegmentReplications.java @@ -26,6 +26,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.function.Predicate; import java.util.stream.Collectors; /** @@ -124,21 +125,6 @@ void startSegmentCopy(GetSegmentFilesRequest request, ActionListener entry : allocationIdToHandlers.entrySet()) { - if (entry.getValue().getTargetNode().equals(node)) { - final SegmentReplicationSourceHandler handler = allocationIdToHandlers.remove(entry.getKey()); - handler.cancel("Cancel on node left"); - removeCopyState(handler.getCopyState()); - } - } - } - /** * Prepare for a Replication event. This method constructs a {@link CopyState} holding files to be sent off of the current * nodes's store. This state is intended to be sent back to Replicas before copy is initiated so the replica can perform a diff against its @@ -154,7 +140,7 @@ CopyState prepareForReplication(CheckpointInfoRequest request, FileChunkWriter f final CopyState copyState = getCachedCopyState(request.getCheckpoint()); if (allocationIdToHandlers.putIfAbsent( request.getTargetAllocationId(), - createTargetHandler(request.getTargetNode(), copyState, fileChunkWriter) + createTargetHandler(request.getTargetNode(), copyState, request.getTargetAllocationId(), fileChunkWriter) ) != null) { throw new OpenSearchException( "Shard copy {} on node {} already replicating", @@ -166,29 +152,23 @@ CopyState prepareForReplication(CheckpointInfoRequest request, FileChunkWriter f } /** - * Cancel all Replication events for the given shard, intended to be called when the current primary is shutting down. + * Cancel all Replication events for the given shard, intended to be called when a primary is shutting down. * * @param shard {@link IndexShard} * @param reason {@link String} - Reason for the cancel */ synchronized void cancel(IndexShard shard, String reason) { - for (Map.Entry handlerEntry : allocationIdToHandlers.entrySet()) { - if (handlerEntry.getValue().getCopyState().getShard().shardId().equals(shard.shardId())) { - final SegmentReplicationSourceHandler handler = allocationIdToHandlers.remove(handlerEntry.getKey()); - handler.cancel(reason); - } - } - final List list = copyStateMap.values() - .stream() - .filter(state -> state.getShard().shardId().equals(shard.shardId())) - .collect(Collectors.toList()); - for (CopyState copyState : list) { - if (copyState.getShard().shardId().equals(shard.shardId())) { - while (copyStateMap.containsKey(copyState.getRequestedReplicationCheckpoint())) { - removeCopyState(copyState); - } - } - } + cancelHandlers(handler -> handler.getCopyState().getShard().shardId().equals(shard.shardId()), reason); + } + + /** + * Cancel any ongoing replications for a given {@link DiscoveryNode} + * + * @param node {@link DiscoveryNode} node for which to cancel replication events. + */ + void cancelReplication(DiscoveryNode node) { + cancelHandlers(handler -> handler.getTargetNode().equals(node), "Node left"); + } /** @@ -207,12 +187,18 @@ int cachedCopyStateSize() { return copyStateMap.size(); } - private SegmentReplicationSourceHandler createTargetHandler(DiscoveryNode node, CopyState copyState, FileChunkWriter fileChunkWriter) { + private SegmentReplicationSourceHandler createTargetHandler( + DiscoveryNode node, + CopyState copyState, + String allocationId, + FileChunkWriter fileChunkWriter + ) { return new SegmentReplicationSourceHandler( node, fileChunkWriter, copyState.getShard().getThreadPool(), copyState, + allocationId, Math.toIntExact(recoverySettings.getChunkSize().getBytes()), recoverySettings.getMaxConcurrentFileChunks() ); @@ -245,4 +231,23 @@ private synchronized void removeCopyState(CopyState copyState) { copyStateMap.remove(copyState.getRequestedReplicationCheckpoint()); } } + + /** + * Remove handlers from allocationIdToHandlers map based on a filter predicate. + * This will also decref the handler's CopyState reference. + */ + private void cancelHandlers(Predicate predicate, String reason) { + final List allocationIds = allocationIdToHandlers.values() + .stream() + .filter(predicate) + .map(SegmentReplicationSourceHandler::getAllocationId) + .collect(Collectors.toList()); + for (String allocationId : allocationIds) { + final SegmentReplicationSourceHandler handler = allocationIdToHandlers.remove(allocationId); + if (handler != null) { + handler.cancel(reason); + removeCopyState(handler.getCopyState()); + } + } + } } diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceHandler.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceHandler.java index e9bacf4481202..46bad7951a2e1 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceHandler.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceHandler.java @@ -55,6 +55,7 @@ class SegmentReplicationSourceHandler { private final Logger logger; private final AtomicBoolean isReplicating = new AtomicBoolean(); private final DiscoveryNode targetNode; + private final String allocationId; /** * Constructor. @@ -71,6 +72,7 @@ class SegmentReplicationSourceHandler { FileChunkWriter writer, ThreadPool threadPool, CopyState copyState, + String allocationId, int fileChunkSizeInBytes, int maxConcurrentFileChunks ) { @@ -91,6 +93,7 @@ class SegmentReplicationSourceHandler { fileChunkSizeInBytes, maxConcurrentFileChunks ); + this.allocationId = allocationId; this.copyState = copyState; } @@ -181,4 +184,8 @@ public boolean isReplicating() { public DiscoveryNode getTargetNode() { return targetNode; } + + public String getAllocationId() { + return allocationId; + } } diff --git a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationSourceHandlerTests.java b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationSourceHandlerTests.java index 70061c54d0da2..a644e3a861eaa 100644 --- a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationSourceHandlerTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationSourceHandlerTests.java @@ -63,6 +63,7 @@ public void testSendFiles() throws IOException { chunkWriter, threadPool, copyState, + primary.routingEntry().allocationId().getId(), 5000, 1 ); @@ -100,6 +101,7 @@ public void testSendFiles_emptyRequest() throws IOException { chunkWriter, threadPool, copyState, + primary.routingEntry().allocationId().getId(), 5000, 1 ); @@ -138,6 +140,7 @@ public void testSendFileFails() throws IOException { chunkWriter, threadPool, copyState, + primary.routingEntry().allocationId().getId(), 5000, 1 ); @@ -175,6 +178,7 @@ public void testReplicationAlreadyRunning() throws IOException { chunkWriter, threadPool, copyState, + primary.routingEntry().allocationId().getId(), 5000, 1 );