-
Notifications
You must be signed in to change notification settings - Fork 25.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[CCR] Added auto follow patterns feature #33118
Conversation
Auto Following Patterns is a cross cluster replication feature that keeps track whether in the leader cluster indices are being created with names that match with a specific pattern and if so automatically let the follower cluster follow these newly created indices. This change adds an `AutoFollowCoordinator` component that is only active on the elected master node. Periodically this component checks the the cluster state of remote clusters if there new leader indices that match with configured auto follow patterns that have been defined in `AutoFollowMetadata` custom metadata. This change also adds two new APIs to manage auto follow patterns. A put auto follow pattern api: ``` PUT /_ccr/_autofollow/{{remote_cluster}} { "leader_index_pattern": ["logs-*", ...], "follow_index_pattern": "{{leader_index}}-copy", "max_concurrent_read_batches": 2 ... // other optional parameters } ``` and delete auto follow pattern api: ``` DELETE /_ccr/_autofollow/{{remote_cluster_alias}} ``` The auto follow patterns are directly tied to the remote cluster aliases configured in the follow cluster. Relates to elastic#33007
Pinging @elastic/es-distributed |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Heya @martijnvg this is a big pull request that will take me some time to go through. I’ve started buffering some comments but rather than dropping a giant review on you, I’ll submit them as I go. I’ll pick more of this up tomorrow morning.
/** | ||
* Custom metadata that contains auto follow patterns and what leader indices an auto follow pattern has already followed. | ||
*/ | ||
public class AutoFollowMetadata extends AbstractNamedDiffable<MetaData.Custom> implements MetaData.Custom { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This has to implement XPackPlugin$MetaDataCustom
otherwise the feature-aware check fails:
> Task :x-pack:plugin:core:featureAwareCheck FAILED
checking for custom violations
class [org/elasticsearch/xpack/core/ccr/AutoFollowMetadata] implements [org/elasticsearch/cluster/metadata/MetaData$Custom but does not implement [org/elasticsearch/xpack/core/XPackPlugin$XPackMetaDataCustom]
Exception in thread "main" java.lang.IllegalStateException: found custom in X-Pack not extending appropriate X-Pack mix-in
at org.elasticsearch.xpack.test.feature_aware.FeatureAwareCheck.main(FeatureAwareCheck.java:58)
The reason for this check is to prevent us from adding new metadata customs to X-Pack that to do not specify they require X-Pack. See #31020 and #31081.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks. I didn't notice this, because I only ran: ./gradlew x-pack:plugin:ccr:check
prior to opening this PR and the PR build failed for a different reason (mixed cluster qa tests).
private static final ParseField PATTERNS_FIELD = new ParseField("patterns"); | ||
private static final ParseField FOLLOWED_LEADER_INDICES_FIELD = new ParseField("followed_leader_indices"); | ||
|
||
private static final ConstructingObjectParser<AutoFollowMetadata, Void> PARSER = new ConstructingObjectParser<>("autofollow", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: autofollow
-> auto_follow
.
private static final ParseField PATTERNS_FIELD = new ParseField("patterns"); | ||
private static final ParseField FOLLOWED_LEADER_INDICES_FIELD = new ParseField("followed_leader_indices"); | ||
|
||
private static final ConstructingObjectParser<AutoFollowMetadata, Void> PARSER = new ConstructingObjectParser<>("autofollow", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's apply @SuppressWarnings("unchecked")
to this field so we do not have to see those obnoxious unchecked cast warnings.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Similarly for autofollow
elsewhere, and for other unchecked casts in this file.
|
||
@Override | ||
public Version getMinimalSupportedVersion() { | ||
return Version.CURRENT.minimumCompatibilityVersion(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is not right, this should be 6.5.0.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
whoops
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I reviewed the coordinator today. Overall I think this is a great start and we are merely polishing around the edges. I will try to review another big chunk of code later tonight, but I want to not block you on making further progress on this change so I submit another piece of the review now. I will probably keep doing it in chunks like this. Let me know if you would prefer a different approach.
clusterService.addStateApplier(this); | ||
} | ||
|
||
void doAutoFollow() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can it be private?
errorHolder.get().addSuppressed(failure); | ||
} | ||
|
||
if (executedRequests.incrementAndGet() == autoFollowMetadata.getPatterns().size()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we use a CountDown
since it gives us safety about never over counting down?
|
||
Consumer<Exception> handler = e -> { | ||
if (e != null) { | ||
LOGGER.error("Failure occurred during auto following indices", e); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this should be at the warn level.
for (Map.Entry<String, AutoFollowMetadata.AutoFollowPattern> entry : autoFollowMetadata.getPatterns().entrySet()) { | ||
String clusterAlias = entry.getKey(); | ||
AutoFollowMetadata.AutoFollowPattern autoFollowPattern = entry.getValue(); | ||
Client remoteClient = clusterAlias.equals("_local_") ? client : client.getRemoteClusterClient(clusterAlias); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The name of this variable is misleading since it might not be a remote client.
String clusterAlias = entry.getKey(); | ||
AutoFollowMetadata.AutoFollowPattern autoFollowPattern = entry.getValue(); | ||
Client remoteClient = clusterAlias.equals("_local_") ? client : client.getRemoteClusterClient(clusterAlias); | ||
List<String> followedIndices = autoFollowMetadata.getFollowedLeaderIndexUUIDS().get(clusterAlias); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Lowercase s
in UUIDS
.
} | ||
}); | ||
}; | ||
LOGGER.info("Auto following leader index [{}] as follow index [{}]", leaderIndexName, followIndexName); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the placement of this log message is misleading since we can still fail after this.
|
||
void createAndFollowApiCall(FollowIndexAction.Request followRequest, Consumer<Exception> handler) { | ||
client.execute(CreateAndFollowIndexAction.INSTANCE, new CreateAndFollowIndexAction.Request(followRequest), | ||
ActionListener.wrap(r -> handler.accept(null), handler::accept)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This can be handler
instead of handler::accept
.
autoFollowPattern.getIdleShardRetryDelay()); | ||
|
||
// This runs on the elected master node, so we can update cluster state here: | ||
Consumer<Exception> handler = followError -> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is difficult to read because it breaks the usual linear flow of reading code. So we are defining a callback, and then within that defining another callback and then it's really the updateAutoMetadata
that executes first, and then there's another callback defined. I think it might be easier if we broke it down into onSuccess/onFailure
instead of the Consumer<Exception>
so that we can see the logic separately and think of it as "this is what happens if there is success, and this is what happens if there is failure".
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Because really what we are after here is:
- try to create the follower index and follow the leader
- if that succeeds, update the auto-follow metadata, then do some accounting
- if that fails, do some accounting and track the failure
it would be nice if the code read a little closer to that.
.build()); | ||
return newState.build(); | ||
}; | ||
updateAutoMetadata(clusterStateUpdateFunction, updateError -> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe updateAutoFollowMetadata
?
}); | ||
}; | ||
LOGGER.info("Auto following leader index [{}] as follow index [{}]", leaderIndexName, followIndexName); | ||
createAndFollowApiCall(followRequest, handler); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think that createAndFollow
would be a fine name.
determine the follow index name into testable methods.
@jasontedor Thanks for the review. I've updated the PR.
I think that for this change this is the best way to get it reviewed. |
to make sure that al least one shard is available to avoid 'all shards failed' error in search api. so instead of checking if index exists in cluster state a wait for yellow check is done.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @martijnvg, let's keep going on working on the coordinator. I left a few more high-level comments. Let's come up with a plan for these.
I am going to be submitting a review for the other components later tonight, most of it is fine, I don't see anything glaring.
This will free us to focus the remainder of our effort on polishing the coordinator.
}; | ||
AutoFollower operation = new AutoFollower(client, handler, autoFollowMetadata) { | ||
|
||
void getLeaderClusterState(Client leaderClient, BiConsumer<ClusterState, Exception> handler) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I can haz @Override
annotations? 😄
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
whoops, not sure what happened to these annotations :) I'll add them back.
if (leaderIndicesToFollow.isEmpty()) { | ||
finalise(null); | ||
} else { | ||
CountDown countDown = new CountDown(leaderIndicesToFollow.size()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reusing the name countDown
scares me here because it shadows the countDown
from the AutoFollower
. It is especially confusing with this block of code:
if (countDown.countDown()) {
finalise(updateError);
}
where that is counting down the shadower (the local countDown
defined in handleClusterAlias
and then finalise
itself counts down another CountDown
named countDown
(the AutoFollower
CountDown
). Can we give these clearer names, and definitely not shadow?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agreed. I'll rename them.
LOGGER.debug("Successfully marked leader index [{}] as auto followed", leaderIndexName); | ||
} | ||
if (countDown.countDown()) { | ||
finalise(updateError); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So I think that something like this means that we are losing errors. So we have:
- a pattern
metricbeat-*
- let's say that matches two indices in the leader cluster
metricbeat-2018-08-27
andmetricbeat-2018-08-28
- we have an outer
CountDown
AutoFollower#countDown
initialized to one for the single pattern - we have an inner
CountDown
countDown
inhandleClusterAlias
initialized to two for the two indices - the first cluster state update task for the first index fails
- we count down
handleClusterAlias#countDown
and it decrements to one; we do nothing - the second cluster state update task for the second index succeeds, or fails; either way, we would count down
handleClusterAlias#countDown
again and the exception from the first failure would be lost
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point. I'll keep track of the errorHolder and suppress subsequent errors to the first error if multiple exceptions occur.
// The coordinator always runs on the elected master node, so we can update cluster state here: | ||
updateAutoFollowMetadata(function, updateError -> { | ||
if (updateError != null) { | ||
LOGGER.error("Failed to mark leader index [" + leaderIndexName + "] as auto followed", updateError); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So I am concerned about the error handling here. What happens if we have created the follower and actively started replication, but failed to add the index to the metadata? I think that it means we will retry on the next auto follower execution, which will then fail because the follower index already exists. And we will keep retrying. I think we need some protection here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think that it means we will retry on the next auto follower execution, which will then fail because the follower index already exists. And we will keep retrying.
Agreed. that would be a bad situation. I suspect such a thing is possible when the current master node fails.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The only think that I can think of that we can do here without making this code more complicated if to have another background check that verifies that there is an entry for all leader indices being followed. We could check persistent tasks to figure this out or use the leader index metadata that we're going to add as custom to IndexMetaData. What do you think about this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, I am thinking something along those lines too. Like: if a leader index matches a pattern, and we are already following it, then we can skip create and follow and only need to get that index into the metadata. Your approach of using the follower index metadata about the leader index it is following sounds like a good approach to me.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it then makes sense to do this in a follow up PR? (because the follower index metadata needs to be added first)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, I think so. I think we can put some skeleton code into place now, with a bunch of TODOs
. Would you add this to the meta-issue?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I added an entry to #33007
…ture we can check for indices that are already being followed but we failed to update AutoMetaData#followedLeaderIndexUUIDs
@jasontedor I've updated the PR. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I left some comments about put auto-follow patterns, I will be following up shortly with comments about delete auto-follow patterns.
"documentation": "http://www.elastic.co/guide/en/elasticsearch/reference/current", | ||
"methods": [ "PUT" ], | ||
"url": { | ||
"path": "/_ccr/_autofollow/{remote_cluster_alias}", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let us make the endpoints /_ccr/auto_follow
for both delete and put.
|
||
@Override | ||
public String getName() { | ||
return "ccr_put_auto_folow_pattern_action"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
folow
-> follow
clusterStateRequest.metaData(true); | ||
|
||
remoteClient.admin().cluster().state(clusterStateRequest, ActionListener.wrap(resp -> { | ||
clusterService.submitStateUpdateTask("put_auto_follow_pattern-" + request.getRemoteClusterAlias(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can use dashes throughout: put-auto-follow-pattern-
|
||
AutoFollowMetadata.AutoFollowPattern previousPattern = configurations.get(request.getRemoteClusterAlias()); | ||
List<String> followedIndexUUIDS = followedLeaderIndices.get(request.getRemoteClusterAlias()); | ||
if (followedIndexUUIDS == null) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Lowercase s
in UUIDs
.
|
||
AutoFollowMetadata currentAutoFollowMetadata = localState.metaData().custom(AutoFollowMetadata.TYPE); | ||
Map<String, List<String>> followedLeaderIndices; | ||
Map<String, AutoFollowMetadata.AutoFollowPattern> configurations; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Call it patterns
?
protected void masterOperation(PutAutoFollowPatternAction.Request request, | ||
ClusterState state, | ||
ActionListener<AcknowledgedResponse> listener) throws Exception { | ||
final Client remoteClient; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let us call this leaderClient
.
for (String newPattern : request.getLeaderIndexPatterns()) { | ||
if (previousPattern.getLeaderIndexPatterns().contains(newPattern) == false) { | ||
for (IndexMetaData indexMetaData : remoteClusterState.getMetaData()) { | ||
if (Regex.simpleMatch(newPattern, indexMetaData.getIndex().getName())) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We are doing matching like this in a few places; can we get a utility class AutoFollowPatterns
now that has a match
method that simply delegates to Regex#simpleMatch
. I want this now so we don't have to hunt through the code later if we ever make the matching more intricate.
} else { | ||
for (IndexMetaData indexMetaData : remoteClusterState.getMetaData()) { | ||
String[] patterns = request.getLeaderIndexPatterns().toArray(new String[0]); | ||
if (Regex.simpleMatch(patterns, indexMetaData.getIndex().getName())) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Delegate to AutoFollowPatterns#match
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This now looks identical to the markExistingIndicesAsAutoFollowed
that I suggested above so I think we can collapse these into a single implementation parameterized by patterns
. We would have:
diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutAutoFollowPatternAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutAutoFollowPatternAction.java
index bff2809f321..f94be2a3b41 100644
--- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutAutoFollowPatternAction.java
+++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutAutoFollowPatternAction.java
@@ -111,22 +111,11 @@ public class TransportPutAutoFollowPatternAction extends
// Mark existing leader indices as already auto followed:
if (previousPattern != null) {
- for (String newPattern : request.getLeaderIndexPatterns()) {
- if (previousPattern.getLeaderIndexPatterns().contains(newPattern) == false) {
- for (IndexMetaData indexMetaData : remoteClusterState.getMetaData()) {
- if (Regex.simpleMatch(newPattern, indexMetaData.getIndex().getName())) {
- followedIndexUUIDS.add(indexMetaData.getIndexUUID());
- }
- }
- }
- }
+ markExistingIndicesMatchingNewPatternsAsAutoFollowed(
+ previousPattern, request.getLeaderIndexPatterns(), remoteClusterState.metaData(), followedIndexUUIDS);
} else {
- for (IndexMetaData indexMetaData : remoteClusterState.getMetaData()) {
- String[] patterns = request.getLeaderIndexPatterns().toArray(new String[0]);
- if (Regex.simpleMatch(patterns, indexMetaData.getIndex().getName())) {
- followedIndexUUIDS.add(indexMetaData.getIndexUUID());
- }
- }
+ final String[] patterns = request.getLeaderIndexPatterns().toArray(new String[0]);
+ markExistingIndicesMatchingPatternsAsAutoFollowed(patterns, remoteClusterState.metaData(), followedIndexUUIDS);
}
AutoFollowMetadata.AutoFollowPattern autoFollowPattern = new AutoFollowMetadata.AutoFollowPattern(request.getLeaderIndexPatterns(),
@@ -141,6 +130,27 @@ public class TransportPutAutoFollowPatternAction extends
return newState.build();
}
+ private static void markExistingIndicesMatchingNewPatternsAsAutoFollowed(
+ AutoFollowMetadata.AutoFollowPattern previousPattern,
+ List<String> leaderIndexPatterns,
+ MetaData leaderMetaData,
+ List<String> followedIndexUUIDS) {
+ final String[] newPatterns = leaderIndexPatterns
+ .stream()
+ .filter(p -> previousPattern.getLeaderIndexPatterns().contains(p) == false)
+ .toArray(String[]::new);
+ markExistingIndicesMatchingPatternsAsAutoFollowed(newPatterns, leaderMetaData, followedIndexUUIDS);
+ }
+
+ private static void markExistingIndicesMatchingPatternsAsAutoFollowed(
+ String[] patterns, MetaData leaderMetaData, List<String> followedIndexUUIDS) {
+ for (final IndexMetaData indexMetaData : leaderMetaData) {
+ if (Regex.simpleMatch(patterns, indexMetaData.getIndex().getName())) {
+ followedIndexUUIDS.add(indexMetaData.getIndexUUID());
+ }
+ }
+ }
+
@Override
protected ClusterBlockException checkBlock(PutAutoFollowPatternAction.Request request, ClusterState state) {
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
if (previousPattern != null) { | ||
for (String newPattern : request.getLeaderIndexPatterns()) { | ||
if (previousPattern.getLeaderIndexPatterns().contains(newPattern) == false) { | ||
for (IndexMetaData indexMetaData : remoteClusterState.getMetaData()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let us collapse some of the nesting here. For example, we can replace this loop with addIndicesMatchingPattern(newPattern, leaderClusterState.getMetaData())
whose implementation is exactly this loop. This method becomes unit testable too.
|
||
// Mark existing leader indices as already auto followed: | ||
if (previousPattern != null) { | ||
for (String newPattern : request.getLeaderIndexPatterns()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Collapse this whole loop into a method: markExistingIndicesAsFollowed
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also I think this loop can be rewritten more simply. I would go through two transformations, like this:
diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutAutoFollowPatternAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutAutoFollowPatternAction.java
index bff2809f321..e3308c30345 100644
--- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutAutoFollowPatternAction.java
+++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutAutoFollowPatternAction.java
@@ -111,15 +111,8 @@ public class TransportPutAutoFollowPatternAction extends
// Mark existing leader indices as already auto followed:
if (previousPattern != null) {
- for (String newPattern : request.getLeaderIndexPatterns()) {
- if (previousPattern.getLeaderIndexPatterns().contains(newPattern) == false) {
- for (IndexMetaData indexMetaData : remoteClusterState.getMetaData()) {
- if (Regex.simpleMatch(newPattern, indexMetaData.getIndex().getName())) {
- followedIndexUUIDS.add(indexMetaData.getIndexUUID());
- }
- }
- }
- }
+ markExistingIndicesAsAutoFollowed(
+ request.getLeaderIndexPatterns(), remoteClusterState.metaData(), previousPattern, followedIndexUUIDS);
} else {
for (IndexMetaData indexMetaData : remoteClusterState.getMetaData()) {
String[] patterns = request.getLeaderIndexPatterns().toArray(new String[0]);
@@ -141,6 +134,26 @@ public class TransportPutAutoFollowPatternAction extends
return newState.build();
}
+ private static void markExistingIndicesAsAutoFollowed(
+ List<String> leaderIndexPatterns,
+ MetaData leaderMetaData,
+ AutoFollowMetadata.AutoFollowPattern previousPattern,
+ List<String> followedIndexUUIDS) {
+ for (String newPattern : leaderIndexPatterns) {
+ if (previousPattern.getLeaderIndexPatterns().contains(newPattern) == false) {
+ addIndicesMatchingPattern(leaderMetaData, followedIndexUUIDS, newPattern);
+ }
+ }
+ }
+
+ private static void addIndicesMatchingPattern(MetaData leaderMetaData, List<String> followedIndexUUIDS, String newPattern) {
+ for (IndexMetaData indexMetaData : leaderMetaData) {
+ if (Regex.simpleMatch(newPattern, indexMetaData.getIndex().getName())) {
+ followedIndexUUIDS.add(indexMetaData.getIndexUUID());
+ }
+ }
+ }
+
@Override
protected ClusterBlockException checkBlock(PutAutoFollowPatternAction.Request request, ClusterState state) {
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
And then a second iteration:
diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutAutoFollowPatternAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutAutoFollowPatternAction.java
index e3308c30345..73b4d44511f 100644
--- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutAutoFollowPatternAction.java
+++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutAutoFollowPatternAction.java
@@ -30,6 +30,7 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.stream.Collectors;
public class TransportPutAutoFollowPatternAction extends
TransportMasterNodeAction<PutAutoFollowPatternAction.Request, AcknowledgedResponse> {
@@ -139,16 +140,12 @@ public class TransportPutAutoFollowPatternAction extends
MetaData leaderMetaData,
AutoFollowMetadata.AutoFollowPattern previousPattern,
List<String> followedIndexUUIDS) {
- for (String newPattern : leaderIndexPatterns) {
- if (previousPattern.getLeaderIndexPatterns().contains(newPattern) == false) {
- addIndicesMatchingPattern(leaderMetaData, followedIndexUUIDS, newPattern);
- }
- }
- }
-
- private static void addIndicesMatchingPattern(MetaData leaderMetaData, List<String> followedIndexUUIDS, String newPattern) {
+ final String[] newPatterns = leaderIndexPatterns
+ .stream()
+ .filter(p -> previousPattern.getLeaderIndexPatterns().contains(p) == false)
+ .toArray(String[]::new);
for (IndexMetaData indexMetaData : leaderMetaData) {
- if (Regex.simpleMatch(newPattern, indexMetaData.getIndex().getName())) {
+ if (Regex.simpleMatch(newPatterns, indexMetaData.getIndex().getName())) {
followedIndexUUIDS.add(indexMetaData.getIndexUUID());
}
}
removes some of the nesting and lets us drop one of the methods that we just added. Then the final diff looks like:
diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutAutoFollowPatternAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutAutoFollowPatternAction.java
index bff2809f321..73b4d44511f 100644
--- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutAutoFollowPatternAction.java
+++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutAutoFollowPatternAction.java
@@ -30,6 +30,7 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.stream.Collectors;
public class TransportPutAutoFollowPatternAction extends
TransportMasterNodeAction<PutAutoFollowPatternAction.Request, AcknowledgedResponse> {
@@ -111,15 +112,8 @@ public class TransportPutAutoFollowPatternAction extends
// Mark existing leader indices as already auto followed:
if (previousPattern != null) {
- for (String newPattern : request.getLeaderIndexPatterns()) {
- if (previousPattern.getLeaderIndexPatterns().contains(newPattern) == false) {
- for (IndexMetaData indexMetaData : remoteClusterState.getMetaData()) {
- if (Regex.simpleMatch(newPattern, indexMetaData.getIndex().getName())) {
- followedIndexUUIDS.add(indexMetaData.getIndexUUID());
- }
- }
- }
- }
+ markExistingIndicesAsAutoFollowed(
+ request.getLeaderIndexPatterns(), remoteClusterState.metaData(), previousPattern, followedIndexUUIDS);
} else {
for (IndexMetaData indexMetaData : remoteClusterState.getMetaData()) {
String[] patterns = request.getLeaderIndexPatterns().toArray(new String[0]);
@@ -141,6 +135,22 @@ public class TransportPutAutoFollowPatternAction extends
return newState.build();
}
+ private static void markExistingIndicesAsAutoFollowed(
+ List<String> leaderIndexPatterns,
+ MetaData leaderMetaData,
+ AutoFollowMetadata.AutoFollowPattern previousPattern,
+ List<String> followedIndexUUIDS) {
+ final String[] newPatterns = leaderIndexPatterns
+ .stream()
+ .filter(p -> previousPattern.getLeaderIndexPatterns().contains(p) == false)
+ .toArray(String[]::new);
+ for (IndexMetaData indexMetaData : leaderMetaData) {
+ if (Regex.simpleMatch(newPatterns, indexMetaData.getIndex().getName())) {
+ followedIndexUUIDS.add(indexMetaData.getIndexUUID());
+ }
+ }
+ }
+
@Override
protected ClusterBlockException checkBlock(PutAutoFollowPatternAction.Request request, ClusterState state) {
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I left a few more comments about put.
|
||
public static class Request extends AcknowledgedRequest<Request> implements ToXContentObject { | ||
|
||
static final ParseField REMOTE_CLUSTER_ALIAS_FIELD = new ParseField("remote_cluster_alias"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remote_cluster
-> leader_cluster
return request; | ||
} | ||
|
||
private String remoteClusterAlias; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
leaderClusterAlias
public ActionRequestValidationException validate() { | ||
ActionRequestValidationException validationException = null; | ||
if (remoteClusterAlias == null) { | ||
validationException = addValidationError("remoteClusterAlias is missing", validationException); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
leaderClusterAlias
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I left some comments about delete auto follow patterns API.
|
||
public static class Request extends AcknowledgedRequest<Request> { | ||
|
||
private String remoteClusterAlias; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
leaderClusterAlias
public ActionRequestValidationException validate() { | ||
ActionRequestValidationException validationException = null; | ||
if (remoteClusterAlias == null) { | ||
validationException = addValidationError("remoteClusterAlias is missing", validationException); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
leaderClusterAlias
@Override | ||
protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException { | ||
Request request = new Request(); | ||
request.setRemoteClusterAlias(restRequest.param("remote_cluster_alias")); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
leader_cluster_alias
protected void masterOperation(DeleteAutoFollowPatternAction.Request request, | ||
ClusterState state, | ||
ActionListener<AcknowledgedResponse> listener) throws Exception { | ||
clusterService.submitStateUpdateTask("put_auto_follow_pattern-" + request.getRemoteClusterAlias(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
put
-> delete
and _
-> -
so: delete-auto-follow-pattern-
static ClusterState innerDelete(DeleteAutoFollowPatternAction.Request request, ClusterState currentState) { | ||
AutoFollowMetadata currentAutoFollowMetadata = currentState.metaData().custom(AutoFollowMetadata.TYPE); | ||
if (currentAutoFollowMetadata == null) { | ||
throw new ResourceNotFoundException("auto follow patterns for [{}] cluster alias are missing", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
auto follow patterns for [{}] cluster alias are missing
-> no auto-follow patterns for cluster alias [{}] found
if (request.getRemoteClusterAlias().equals(configurationKey)) { | ||
toRemove.add(configurationKey); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am confused by this loop. Isn't configurations.get(request.getRemoteClusterAlias())
all we need here?
} | ||
} | ||
if (toRemove.isEmpty()) { | ||
throw new ResourceNotFoundException("auto follow patterns for [{}] cluster alias are missing", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think that we should replace this with the same exception message that I suggested above.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🤦♂️ yup
|
||
public RestDeleteAutoFollowPatternAction(Settings settings, RestController controller) { | ||
super(settings); | ||
controller.registerHandler(RestRequest.Method.DELETE, "/_ccr/_autofollow/{remote_cluster_alias}", this); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remote_cluster_alias
-> leader_cluster_alias
(and in the specification)
|
||
public RestDeleteAutoFollowPatternAction(Settings settings, RestController controller) { | ||
super(settings); | ||
controller.registerHandler(RestRequest.Method.DELETE, "/_ccr/_autofollow/{remote_cluster_alias}", this); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
_autofollow
-> auto_follow
final Map<String, AutoFollowMetadata.AutoFollowPattern> configurationsCopy = new HashMap<>(configurations); | ||
final Map<String, List<String>> followedLeaderIndexUUIDSCopy = | ||
new HashMap<>(currentAutoFollowMetadata.getFollowedLeaderIndexUUIDs()); | ||
for (String key : toRemove) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There should be one or none, right? And if none, we can return the current cluster state.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🤦♂️ again
@jasontedor I've updated the PR. |
* master: (64 commits) HLREST: add update by query API (elastic#32760) TEST: Increase timeout testFollowIndexAndCloseNode (elastic#33333) HLRC: ML Flush job (elastic#33187) HLRC: Adding ML Job stats (elastic#33183) LLREST: Drop deprecated methods (elastic#33223) Mute testSyncerOnClosingShard [DOCS] Moves machine learning APIs to docs folder (elastic#31118) Mute test watcher usage stats output [Rollup] Fix FullClusterRestart test Adjust soft-deletes version after backport into 6.5 completely drop `index.shard.check_on_startup: fix` for 7.0 (elastic#33194) Fix AwaitsFix issue number Mute SmokeTestWatcherWithSecurityIT testsi drop `index.shard.check_on_startup: fix` (elastic#32279) tracked at [DOCS] Moves ml folder from x-pack/docs to docs (elastic#33248) [DOCS] Move rollup APIs to docs (elastic#31450) [DOCS] Rename X-Pack Commands section (elastic#33005) TEST: Disable soft-deletes in ParentChildTestCase Fixes SecurityIntegTestCase so it always adds at least one alias (elastic#33296) ...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice work, LGTM. Let us get this in and start work on the follow-ups that we have discussed. We have smoothed this one out enough to iterate on.
} | ||
return false; | ||
public static boolean simpleMatch(final List<String> patterns, final String str) { | ||
// #simpleMatch(String[], String) is likely to be inlined into this method |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
Auto Following Patterns is a cross cluster replication feature that keeps track whether in the leader cluster indices are being created with names that match with a specific pattern and if so automatically let the follower cluster follow these newly created indices. This change adds an `AutoFollowCoordinator` component that is only active on the elected master node. Periodically this component checks the the cluster state of remote clusters if there new leader indices that match with configured auto follow patterns that have been defined in `AutoFollowMetadata` custom metadata. This change also adds two new APIs to manage auto follow patterns. A put auto follow pattern api: ``` PUT /_ccr/_autofollow/{{remote_cluster}} { "leader_index_pattern": ["logs-*", ...], "follow_index_pattern": "{{leader_index}}-copy", "max_concurrent_read_batches": 2 ... // other optional parameters } ``` and delete auto follow pattern api: ``` DELETE /_ccr/_autofollow/{{remote_cluster_alias}} ``` The auto follow patterns are directly tied to the remote cluster aliases configured in the follow cluster. Relates to #33007 Co-authored-by: Jason Tedor <jason@tedor.me>
Auto Following Patterns is a cross cluster replication feature that
keeps track whether in the leader cluster indices are being created with
names that match with a specific pattern and if so automatically let
the follower cluster follow these newly created indices.
This change adds an
AutoFollowCoordinator
component that is only activeon the elected master node. Periodically this component checks the
the cluster state of remote clusters if there new leader indices that
match with configured auto follow patterns that have been defined in
AutoFollowMetadata
custom metadata.This change also adds two new APIs to manage auto follow patterns. A put
auto follow pattern api:
and delete auto follow pattern api:
The auto follow patterns are directly tied to the remote cluster aliases
configured in the follow cluster.
Note that this PR is not feature complete and several things will be added
via follow up PRs. (auto follow patterns work correctly with security, component
that cleans up leader index UUIDS from auto follow metadata if leader index have
been removed, etc. see meta issue for more)
Co-authored-by: Jason Tedor jason@tedor.me
Relates to #33007