-
Notifications
You must be signed in to change notification settings - Fork 25.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[CCR] Added auto follow patterns feature #33118
Changes from 5 commits
75b77f8
e5160d5
78c178a
80b517e
f818a5d
1292150
c7c2287
3baa210
812ebb7
1943f41
5fcfb3e
d2b0643
1634592
5edc457
5045a62
df5fa78
91dedcd
cf792d9
4c4caee
01819ab
6907ce9
720e53e
cf984b3
f8e9ffe
7dea40c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,261 @@ | ||
/* | ||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
* or more contributor license agreements. Licensed under the Elastic License; | ||
* you may not use this file except in compliance with the Elastic License. | ||
*/ | ||
package org.elasticsearch.xpack.ccr.action; | ||
|
||
import org.apache.logging.log4j.LogManager; | ||
import org.apache.logging.log4j.Logger; | ||
import org.elasticsearch.action.ActionListener; | ||
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest; | ||
import org.elasticsearch.client.Client; | ||
import org.elasticsearch.cluster.ClusterChangedEvent; | ||
import org.elasticsearch.cluster.ClusterState; | ||
import org.elasticsearch.cluster.ClusterStateApplier; | ||
import org.elasticsearch.cluster.ClusterStateUpdateTask; | ||
import org.elasticsearch.cluster.metadata.IndexMetaData; | ||
import org.elasticsearch.cluster.metadata.MetaData; | ||
import org.elasticsearch.cluster.service.ClusterService; | ||
import org.elasticsearch.common.regex.Regex; | ||
import org.elasticsearch.common.settings.Settings; | ||
import org.elasticsearch.common.unit.TimeValue; | ||
import org.elasticsearch.threadpool.ThreadPool; | ||
import org.elasticsearch.xpack.ccr.CcrSettings; | ||
import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata; | ||
|
||
import java.util.ArrayList; | ||
import java.util.HashMap; | ||
import java.util.List; | ||
import java.util.Map; | ||
import java.util.concurrent.atomic.AtomicInteger; | ||
import java.util.concurrent.atomic.AtomicReference; | ||
import java.util.function.BiConsumer; | ||
import java.util.function.Consumer; | ||
import java.util.function.Function; | ||
|
||
/** | ||
* A component that runs only on the elected master node and follows leader indices automatically | ||
* if they match with an auto follow pattern that is defined in {@link AutoFollowMetadata}. | ||
*/ | ||
public class AutoFollowCoordinator implements ClusterStateApplier { | ||
|
||
private static final Logger LOGGER = LogManager.getLogger(AutoFollowCoordinator.class); | ||
|
||
private final Client client; | ||
private final TimeValue pollInterval; | ||
private final ThreadPool threadPool; | ||
private final ClusterService clusterService; | ||
|
||
private volatile boolean localNodeMaster = false; | ||
|
||
/**
 * Creates the coordinator and registers it as a cluster state applier.
 *
 * @param settings       settings from which the auto follow poll interval is read
 * @param client         client handed to the auto follower for its API calls
 * @param threadPool     thread pool used to schedule auto follow rounds
 * @param clusterService cluster service used to read and update the local cluster state
 */
public AutoFollowCoordinator(Settings settings,
                             Client client,
                             ThreadPool threadPool,
                             ClusterService clusterService) {
    this.pollInterval = CcrSettings.CCR_AUTO_FOLLOW_POLL_INTERVAL.get(settings);
    this.clusterService = clusterService;
    this.threadPool = threadPool;
    this.client = client;

    // Register only after every field has been assigned.
    clusterService.addStateApplier(this);
}
|
||
void doAutoFollow() { | ||
if (localNodeMaster == false) { | ||
return; | ||
} | ||
ClusterState localClusterState = clusterService.state(); | ||
AutoFollowMetadata autoFollowMetadata = localClusterState.getMetaData().custom(AutoFollowMetadata.TYPE); | ||
if (autoFollowMetadata == null) { | ||
threadPool.schedule(pollInterval, ThreadPool.Names.SAME, this::doAutoFollow); | ||
return; | ||
} | ||
|
||
if (autoFollowMetadata.getPatterns().isEmpty()) { | ||
threadPool.schedule(pollInterval, ThreadPool.Names.SAME, this::doAutoFollow); | ||
return; | ||
} | ||
|
||
Consumer<Exception> handler = e -> { | ||
if (e != null) { | ||
LOGGER.error("Failure occurred during auto following indices", e); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think this should be at the warn level. |
||
} | ||
threadPool.schedule(pollInterval, ThreadPool.Names.SAME, this::doAutoFollow); | ||
}; | ||
AutoFollower operation = new AutoFollower(client, handler, autoFollowMetadata) { | ||
|
||
void clusterStateApiCall(Client remoteClient, BiConsumer<ClusterState, Exception> handler) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let us call this parameter a name other than There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I am not a fan of the |
||
ClusterStateRequest request = new ClusterStateRequest(); | ||
request.clear(); | ||
request.metaData(true); | ||
remoteClient.admin().cluster().state(request, | ||
ActionListener.wrap( | ||
r -> handler.accept(r.getState(), null), | ||
e -> handler.accept(null, e) | ||
) | ||
); | ||
} | ||
|
||
void createAndFollowApiCall(FollowIndexAction.Request followRequest, Consumer<Exception> handler) { | ||
client.execute(CreateAndFollowIndexAction.INSTANCE, new CreateAndFollowIndexAction.Request(followRequest), | ||
ActionListener.wrap(r -> handler.accept(null), handler::accept)); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This can be |
||
} | ||
|
||
void updateAutoMetadata(Function<ClusterState, ClusterState> updateFunction, Consumer<Exception> handler) { | ||
clusterService.submitStateUpdateTask("update_auto_follow_metadata", new ClusterStateUpdateTask() { | ||
|
||
@Override | ||
public ClusterState execute(ClusterState currentState) throws Exception { | ||
return updateFunction.apply(currentState); | ||
} | ||
|
||
@Override | ||
public void onFailure(String source, Exception e) { | ||
handler.accept(e); | ||
} | ||
|
||
@Override | ||
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { | ||
handler.accept(null); | ||
} | ||
}); | ||
} | ||
|
||
}; | ||
operation.autoFollowIndices(); | ||
} | ||
|
||
@Override | ||
public void applyClusterState(ClusterChangedEvent event) { | ||
final boolean beforeLocalMasterNode = localNodeMaster; | ||
localNodeMaster = event.localNodeMaster(); | ||
if (beforeLocalMasterNode == false && localNodeMaster) { | ||
threadPool.schedule(pollInterval, ThreadPool.Names.SAME, this::doAutoFollow); | ||
} | ||
} | ||
|
||
abstract static class AutoFollower { | ||
|
||
private final Client client; | ||
private final Consumer<Exception> handler; | ||
private final AutoFollowMetadata autoFollowMetadata; | ||
|
||
private final AtomicInteger executedRequests = new AtomicInteger(0); | ||
private final AtomicReference<Exception> errorHolder = new AtomicReference<>(); | ||
|
||
/**
 * @param client             client used for the local cluster and to obtain remote cluster clients
 * @param handler            invoked once all patterns have been processed, with the recorded failure or {@code null}
 * @param autoFollowMetadata the auto follow patterns and the leader index UUIDs that are already followed
 */
AutoFollower(Client client, Consumer<Exception> handler, AutoFollowMetadata autoFollowMetadata) {
    this.autoFollowMetadata = autoFollowMetadata;
    this.handler = handler;
    this.client = client;
}
|
||
void autoFollowIndices() { | ||
for (Map.Entry<String, AutoFollowMetadata.AutoFollowPattern> entry : autoFollowMetadata.getPatterns().entrySet()) { | ||
String clusterAlias = entry.getKey(); | ||
AutoFollowMetadata.AutoFollowPattern autoFollowPattern = entry.getValue(); | ||
Client remoteClient = clusterAlias.equals("_local_") ? client : client.getRemoteClusterClient(clusterAlias); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The name of this variable is misleading since it might not be a remote client. |
||
List<String> followedIndices = autoFollowMetadata.getFollowedLeaderIndexUUIDS().get(clusterAlias); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Lowercase |
||
|
||
clusterStateApiCall(remoteClient, (remoteClusterState, e) -> { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let us avoid the name |
||
if (remoteClusterState != null) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let us assert that |
||
handleClusterAlias(clusterAlias, autoFollowPattern, followedIndices, remoteClusterState); | ||
} else { | ||
finalise(e); | ||
} | ||
}); | ||
} | ||
} | ||
|
||
private void handleClusterAlias(String clusterAlias, AutoFollowMetadata.AutoFollowPattern autoFollowPattern, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let us break this method down a lot. We can chunk it into unit-testable pieces and it should help with readability. |
||
List<String> followedIndexUUIDS, ClusterState remoteClusterState) { | ||
List<IndexMetaData> leaderIndicesToFollow = new ArrayList<>(); | ||
String[] patterns = autoFollowPattern.getLeaderIndexPatterns().toArray(new String[0]); | ||
for (IndexMetaData indexMetaData : remoteClusterState.getMetaData()) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. For example, this loop could be a method: |
||
if (Regex.simpleMatch(patterns, indexMetaData.getIndex().getName())) { | ||
if (followedIndexUUIDS.contains(indexMetaData.getIndex().getUUID()) == false) { | ||
leaderIndicesToFollow.add(indexMetaData); | ||
} | ||
} | ||
} | ||
if (leaderIndicesToFollow.isEmpty()) { | ||
finalise(null); | ||
} else { | ||
AtomicInteger numRequests = new AtomicInteger(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe this should be a |
||
for (IndexMetaData indexToFollow : leaderIndicesToFollow) { | ||
String leaderIndexName = indexToFollow.getIndex().getName(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe this block of code can be a method |
||
String followIndexName = leaderIndexName; | ||
if (autoFollowPattern.getFollowIndexPattern() != null) { | ||
followIndexName = autoFollowPattern.getFollowIndexPattern().replace("{{leader_index}}", leaderIndexName); | ||
} | ||
|
||
String leaderIndexNameWithClusterAliasPrefix = clusterAlias.equals("_local_") ? leaderIndexName : | ||
clusterAlias + ":" + leaderIndexName; | ||
FollowIndexAction.Request followRequest = | ||
new FollowIndexAction.Request(leaderIndexNameWithClusterAliasPrefix, followIndexName, | ||
autoFollowPattern.getMaxBatchOperationCount(), autoFollowPattern.getMaxConcurrentReadBatches(), | ||
autoFollowPattern.getMaxOperationSizeInBytes(), autoFollowPattern.getMaxConcurrentWriteBatches(), | ||
autoFollowPattern.getMaxWriteBufferSize(), autoFollowPattern.getRetryTimeout(), | ||
autoFollowPattern.getIdleShardRetryDelay()); | ||
|
||
// This runs on the elected master node, so we can update cluster state here: | ||
Consumer<Exception> handler = followError -> { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is difficult to read because it breaks the usual linear flow of reading code. So we are defining a callback, and then within that defining another callback and then it's really the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Because really what we are after here is:
it would be nice if the code read a little closer to that. |
||
if (followError != null) { | ||
LOGGER.error("Failed to auto follow leader index [" + leaderIndexName + "]", followError); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think this should be |
||
if (numRequests.incrementAndGet() == leaderIndicesToFollow.size()) { | ||
finalise(followError); | ||
} | ||
return; | ||
} | ||
Function<ClusterState, ClusterState> clusterStateUpdateFunction = currentState -> { | ||
AutoFollowMetadata currentAutoFollowMetadata = currentState.metaData().custom(AutoFollowMetadata.TYPE); | ||
|
||
Map<String, List<String>> newFollowedIndexUUIDS = | ||
new HashMap<>(currentAutoFollowMetadata.getFollowedLeaderIndexUUIDS()); | ||
newFollowedIndexUUIDS.get(clusterAlias).add(indexToFollow.getIndexUUID()); | ||
|
||
ClusterState.Builder newState = ClusterState.builder(currentState); | ||
AutoFollowMetadata newAutoFollowMetadata = | ||
new AutoFollowMetadata(currentAutoFollowMetadata.getPatterns(), newFollowedIndexUUIDS); | ||
newState.metaData(MetaData.builder(currentState.getMetaData()) | ||
.putCustom(AutoFollowMetadata.TYPE, newAutoFollowMetadata) | ||
.build()); | ||
return newState.build(); | ||
}; | ||
updateAutoMetadata(clusterStateUpdateFunction, updateError -> { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe |
||
if (updateError != null) { | ||
LOGGER.error("Failed to mark leader index [" + leaderIndexName + "] as auto followed", updateError); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. So I am concerned about the error handling here. What happens if we have created the follower and actively started replication, but failed to add the index to the metadata? I think that it means we will retry on the next auto follower execution, which will then fail because the follower index already exists. And we will keep retrying. I think we need some protection here. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Agreed. that would be a bad situation. I suspect such a thing is possible when the current master node fails. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The only think that I can think of that we can do here without making this code more complicated if to have another background check that verifies that there is an entry for all leader indices being followed. We could check persistent tasks to figure this out or use the leader index metadata that we're going to add as custom to IndexMetaData. What do you think about this? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah, I am thinking something along those lines too. Like: if a leader index matches a pattern, and we are already following it, then we can skip create and follow and only need to get that index into the metadata. Your approach of using the follower index metadata about the leader index it is following sounds like a good approach to me. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think it then makes sense to do this in a follow up PR? (because the follower index metadata needs to be added first) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah, I think so. I think we can put some skeleton code into place now, with a bunch of There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I added an entry to #33007 |
||
} else { | ||
LOGGER.debug("Successfully marked leader index [{}] as auto followed", leaderIndexName); | ||
} | ||
if (numRequests.incrementAndGet() == leaderIndicesToFollow.size()) { | ||
finalise(updateError); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. So I think that something like this means that we are losing errors. So we have:
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good point. I'll keep track of the errorHolder and suppress subsequent errors to the first error if multiple exceptions occur. |
||
} | ||
}); | ||
}; | ||
LOGGER.info("Auto following leader index [{}] as follow index [{}]", leaderIndexName, followIndexName); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think the placement of this log message is misleading since we can still fail after this. |
||
createAndFollowApiCall(followRequest, handler); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think that |
||
} | ||
} | ||
} | ||
|
||
private void finalise(Exception failure) { | ||
if (errorHolder.compareAndSet(null, failure) == false) { | ||
errorHolder.get().addSuppressed(failure); | ||
} | ||
|
||
if (executedRequests.incrementAndGet() == autoFollowMetadata.getPatterns().size()) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we use a |
||
handler.accept(errorHolder.get()); | ||
} | ||
} | ||
|
||
// abstract methods to make unit testing possible: | ||
|
||
abstract void clusterStateApiCall(Client remoteClient, BiConsumer<ClusterState, Exception> handler); | ||
|
||
abstract void createAndFollowApiCall(FollowIndexAction.Request followRequest, Consumer<Exception> handler); | ||
|
||
abstract void updateAutoMetadata(Function<ClusterState, ClusterState> updateFunction, Consumer<Exception> handler); | ||
|
||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can it be private?