Skip to content

Commit

Permalink
Clean up cancel methods.
Browse files Browse the repository at this point in the history
Signed-off-by: Marc Handalian <handalm@amazon.com>
  • Loading branch information
mch2 committed Aug 10, 2022
1 parent c95112f commit 6548f9b
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;
import java.util.stream.Collectors;

/**
Expand Down Expand Up @@ -124,21 +125,6 @@ void startSegmentCopy(GetSegmentFilesRequest request, ActionListener<GetSegmentF
}
}

/**
 * Cancel any ongoing replications for a given {@link DiscoveryNode}.
 * <p>
 * The allocation ids of matching handlers are collected first and the handlers are removed
 * afterwards, so {@code allocationIdToHandlers} is never modified while it is being iterated.
 * Each handler that is actually removed is cancelled and its {@link CopyState} is passed to
 * {@code removeCopyState}.
 *
 * @param node {@link DiscoveryNode} node for which to cancel replication events.
 */
void cancelReplication(DiscoveryNode node) {
    final List<String> allocationIds = allocationIdToHandlers.entrySet()
        .stream()
        .filter(entry -> entry.getValue().getTargetNode().equals(node))
        .map(Map.Entry::getKey)
        .collect(Collectors.toList());
    for (String allocationId : allocationIds) {
        final SegmentReplicationSourceHandler handler = allocationIdToHandlers.remove(allocationId);
        // remove() yields null when the entry is already gone; skip it instead of dereferencing null.
        if (handler != null) {
            handler.cancel("Cancel on node left");
            removeCopyState(handler.getCopyState());
        }
    }
}

/**
* Prepare for a Replication event. This method constructs a {@link CopyState} holding files to be sent off of the current
* node's store. This state is intended to be sent back to Replicas before copy is initiated so the replica can perform a diff against its
Expand All @@ -154,7 +140,7 @@ CopyState prepareForReplication(CheckpointInfoRequest request, FileChunkWriter f
final CopyState copyState = getCachedCopyState(request.getCheckpoint());
if (allocationIdToHandlers.putIfAbsent(
request.getTargetAllocationId(),
createTargetHandler(request.getTargetNode(), copyState, fileChunkWriter)
createTargetHandler(request.getTargetNode(), copyState, request.getTargetAllocationId(), fileChunkWriter)
) != null) {
throw new OpenSearchException(
"Shard copy {} on node {} already replicating",
Expand All @@ -166,29 +152,23 @@ CopyState prepareForReplication(CheckpointInfoRequest request, FileChunkWriter f
}

/**
* Cancel all Replication events for the given shard, intended to be called when the current primary is shutting down.
* Cancel all Replication events for the given shard, intended to be called when a primary is shutting down.
*
* @param shard {@link IndexShard}
* @param reason {@link String} - Reason for the cancel
*/
synchronized void cancel(IndexShard shard, String reason) {
for (Map.Entry<String, SegmentReplicationSourceHandler> handlerEntry : allocationIdToHandlers.entrySet()) {
if (handlerEntry.getValue().getCopyState().getShard().shardId().equals(shard.shardId())) {
final SegmentReplicationSourceHandler handler = allocationIdToHandlers.remove(handlerEntry.getKey());
handler.cancel(reason);
}
}
final List<CopyState> list = copyStateMap.values()
.stream()
.filter(state -> state.getShard().shardId().equals(shard.shardId()))
.collect(Collectors.toList());
for (CopyState copyState : list) {
if (copyState.getShard().shardId().equals(shard.shardId())) {
while (copyStateMap.containsKey(copyState.getRequestedReplicationCheckpoint())) {
removeCopyState(copyState);
}
}
}
cancelHandlers(handler -> handler.getCopyState().getShard().shardId().equals(shard.shardId()), reason);
}

/**
 * Cancels the ongoing replications whose handler reports the supplied {@link DiscoveryNode}
 * as its target node. The work is delegated to {@code cancelHandlers} with the reason "Node left".
 *
 * @param node {@link DiscoveryNode} node for which to cancel replication events.
 */
void cancelReplication(DiscoveryNode node) {
    final Predicate<SegmentReplicationSourceHandler> targetsGivenNode = candidate -> candidate.getTargetNode().equals(node);
    cancelHandlers(targetsGivenNode, "Node left");
}

/**
Expand All @@ -207,12 +187,18 @@ int cachedCopyStateSize() {
return copyStateMap.size();
}

private SegmentReplicationSourceHandler createTargetHandler(DiscoveryNode node, CopyState copyState, FileChunkWriter fileChunkWriter) {
private SegmentReplicationSourceHandler createTargetHandler(
DiscoveryNode node,
CopyState copyState,
String allocationId,
FileChunkWriter fileChunkWriter
) {
return new SegmentReplicationSourceHandler(
node,
fileChunkWriter,
copyState.getShard().getThreadPool(),
copyState,
allocationId,
Math.toIntExact(recoverySettings.getChunkSize().getBytes()),
recoverySettings.getMaxConcurrentFileChunks()
);
Expand Down Expand Up @@ -245,4 +231,23 @@ private synchronized void removeCopyState(CopyState copyState) {
copyStateMap.remove(copyState.getRequestedReplicationCheckpoint());
}
}

/**
 * Cancels and removes every handler in {@code allocationIdToHandlers} that the predicate accepts.
 * Each handler that is actually removed also has its CopyState passed to {@code removeCopyState}.
 *
 * @param predicate filter selecting the handlers to cancel
 * @param reason    reason string forwarded to each cancelled handler
 */
private void cancelHandlers(Predicate<? super SegmentReplicationSourceHandler> predicate, String reason) {
    // Collect the ids of the matching handlers up front, then remove them one at a time.
    final List<String> matchingIds = allocationIdToHandlers.values()
        .stream()
        .filter(predicate)
        .map(candidate -> candidate.getAllocationId())
        .collect(Collectors.toList());
    matchingIds.forEach(id -> {
        final SegmentReplicationSourceHandler removed = allocationIdToHandlers.remove(id);
        if (removed == null) {
            // Nothing is mapped to this id any more, so there is nothing to cancel.
            return;
        }
        removed.cancel(reason);
        removeCopyState(removed.getCopyState());
    });
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ class SegmentReplicationSourceHandler {
private final Logger logger;
private final AtomicBoolean isReplicating = new AtomicBoolean();
private final DiscoveryNode targetNode;
private final String allocationId;

/**
* Constructor.
Expand All @@ -71,6 +72,7 @@ class SegmentReplicationSourceHandler {
FileChunkWriter writer,
ThreadPool threadPool,
CopyState copyState,
String allocationId,
int fileChunkSizeInBytes,
int maxConcurrentFileChunks
) {
Expand All @@ -91,6 +93,7 @@ class SegmentReplicationSourceHandler {
fileChunkSizeInBytes,
maxConcurrentFileChunks
);
this.allocationId = allocationId;
this.copyState = copyState;
}

Expand Down Expand Up @@ -181,4 +184,8 @@ public boolean isReplicating() {
/**
 * @return the {@link DiscoveryNode} held in this handler's {@code targetNode} field
 */
public DiscoveryNode getTargetNode() {
    return this.targetNode;
}

/**
 * @return the allocation id that was passed to this handler's constructor
 */
public String getAllocationId() {
    return this.allocationId;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ public void testSendFiles() throws IOException {
chunkWriter,
threadPool,
copyState,
primary.routingEntry().allocationId().getId(),
5000,
1
);
Expand Down Expand Up @@ -100,6 +101,7 @@ public void testSendFiles_emptyRequest() throws IOException {
chunkWriter,
threadPool,
copyState,
primary.routingEntry().allocationId().getId(),
5000,
1
);
Expand Down Expand Up @@ -138,6 +140,7 @@ public void testSendFileFails() throws IOException {
chunkWriter,
threadPool,
copyState,
primary.routingEntry().allocationId().getId(),
5000,
1
);
Expand Down Expand Up @@ -175,6 +178,7 @@ public void testReplicationAlreadyRunning() throws IOException {
chunkWriter,
threadPool,
copyState,
primary.routingEntry().allocationId().getId(),
5000,
1
);
Expand Down

0 comments on commit 6548f9b

Please sign in to comment.