diff --git a/docs/reference/ccr/auto-follow.asciidoc b/docs/reference/ccr/auto-follow.asciidoc index d072dd8022b12..333bb708cf1da 100644 --- a/docs/reference/ccr/auto-follow.asciidoc +++ b/docs/reference/ccr/auto-follow.asciidoc @@ -7,6 +7,13 @@ each new index in the series is replicated automatically. Whenever the name of a new index on the remote cluster matches the auto-follow pattern, a corresponding follower index is added to the local cluster. +You can also create auto-follow patterns for data streams. When a new backing +index is generated on a remote cluster, that index and its data stream are +automatically followed if the data stream name matches an auto-follow +pattern. If you create a data stream after creating the auto-follow pattern, +all backing indices are followed automatically. + + Auto-follow patterns are especially useful with <>, which might continually create new indices on the cluster containing the leader index. diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/state/TransportClusterStateAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/state/TransportClusterStateAction.java index e6ea994fc5c0a..6c82947faee83 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/state/TransportClusterStateAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/state/TransportClusterStateAction.java @@ -29,6 +29,8 @@ import org.elasticsearch.cluster.ClusterStateObserver; import org.elasticsearch.cluster.NotMasterException; import org.elasticsearch.cluster.block.ClusterBlockException; +import org.elasticsearch.cluster.metadata.DataStream; +import org.elasticsearch.cluster.metadata.IndexAbstraction; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.Metadata; @@ -37,6 +39,7 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.index.Index; import org.elasticsearch.node.NodeClosedException; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; @@ -156,9 +159,21 @@ private ClusterStateResponse buildResponse(final ClusterStateRequest request, mdBuilder.version(currentState.metadata().version()); String[] indices = indexNameExpressionResolver.concreteIndexNames(currentState, request); for (String filteredIndex : indices) { - IndexMetadata indexMetadata = currentState.metadata().index(filteredIndex); - if (indexMetadata != null) { - mdBuilder.put(indexMetadata, false); + // If the requested index is part of a data stream then that data stream should also be included: + IndexAbstraction indexAbstraction = currentState.metadata().getIndicesLookup().get(filteredIndex); + if (indexAbstraction.getParentDataStream() != null) { + DataStream dataStream = indexAbstraction.getParentDataStream().getDataStream(); + mdBuilder.put(dataStream); + // Also the IMD of other backing indices need to be included, otherwise the cluster state api + // can't create a valid cluster state instance: + for (Index backingIndex : dataStream.getIndices()) { + mdBuilder.put(currentState.metadata().index(backingIndex), false); + } + } else { + IndexMetadata indexMetadata = currentState.metadata().index(filteredIndex); + if (indexMetadata != null) { + mdBuilder.put(indexMetadata, false); + } } } } else { diff --git a/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java b/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java index a2ccee0c92f3b..baba7e69832a8 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java @@ -86,6 +86,7 @@ import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.function.BiConsumer; import java.util.function.Function; import java.util.function.Predicate; import java.util.stream.Collectors; @@ -184,6 +185,20 @@ public RestoreService(ClusterService clusterService, RepositoriesService reposit * @param listener restore listener */ public void restoreSnapshot(final RestoreSnapshotRequest request, final ActionListener listener) { + restoreSnapshot(request, listener, (clusterState, builder) -> {}); + } + + /** + * Restores snapshot specified in the restore request. + * + * @param request restore request + * @param listener restore listener + * @param updater handler that allows callers to make modifications to {@link Metadata} + * in the same cluster state update as the restore operation + */ + public void restoreSnapshot(final RestoreSnapshotRequest request, + final ActionListener listener, + final BiConsumer updater) { try { // Read snapshot info and metadata from the repository final String repositoryName = request.repository(); @@ -471,6 +486,7 @@ restoreUUID, snapshot, overallState(RestoreInProgress.State.INIT, shards), } RoutingTable rt = rtBuilder.build(); + updater.accept(currentState, mdBuilder); ClusterState updatedState = builder.metadata(mdBuilder).blocks(blocks).routingTable(rt).build(); return allocationService.reroute(updatedState, "restored snapshot [" + snapshot + "]"); } diff --git a/x-pack/plugin/ccr/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ccr/AutoFollowIT.java b/x-pack/plugin/ccr/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ccr/AutoFollowIT.java index 2a2fcca41c51c..b84a13333058e 100644 --- a/x-pack/plugin/ccr/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ccr/AutoFollowIT.java +++ b/x-pack/plugin/ccr/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ccr/AutoFollowIT.java @@ -13,18 +13,27 @@ import org.elasticsearch.common.Strings; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.json.JsonXContent; +import org.elasticsearch.common.xcontent.support.XContentMapValues; import java.io.IOException; +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.List; +import java.util.Locale; import java.util.Map; import java.util.concurrent.TimeUnit; +import static org.elasticsearch.rest.action.search.RestSearchAction.TOTAL_HITS_AS_INT_PARAM; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasEntry; import static org.hamcrest.Matchers.hasKey; +import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.instanceOf; public class AutoFollowIT extends ESCCRRestTestCase { + private static final SimpleDateFormat DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd'T'hh:mm:ss", Locale.ROOT); + public void testMultipleAutoFollowPatternsDifferentClusters() throws Exception { if ("follow".equals(targetCluster) == false) { logger.info("skipping test, waiting for target cluster [follow]" ); @@ -64,6 +73,7 @@ public void testMultipleAutoFollowPatternsDifferentClusters() throws Exception { verifyDocuments("logs-20190101", 5, "filtered_field:true"); verifyDocuments("logs-20200101", 5, "filtered_field:true"); }); + deleteAutoFollowPattern("leader_cluster_pattern"); } public void testAutoFollowPatterns() throws Exception { @@ -122,6 +132,7 @@ public void testAutoFollowPatterns() throws Exception { verifyCcrMonitoring("metrics-20210101", "metrics-20210101"); verifyAutoFollowMonitoring(); }, 30, TimeUnit.SECONDS); + deleteAutoFollowPattern("test_pattern"); } public void testPutAutoFollowPatternThatOverridesRequiredLeaderSetting() throws IOException { @@ -163,6 +174,179 @@ public void testPutAutoFollowPatternThatOverridesRequiredLeaderSetting() throws ); } + public void testDataStreams() throws Exception { + if ("follow".equals(targetCluster) == false) { + return; + } + + final int numDocs = 64; + final String dataStreamName = "logs-mysql-error"; + + int initialNumberOfSuccessfulFollowedIndices = getNumberOfSuccessfulFollowedIndices(); + + // Create auto follow pattern + Request request = new Request("PUT", "/_ccr/auto_follow/test_pattern"); + try (XContentBuilder bodyBuilder = JsonXContent.contentBuilder()) { + bodyBuilder.startObject(); + { + bodyBuilder.startArray("leader_index_patterns"); + { + bodyBuilder.value("logs-*"); + } + bodyBuilder.endArray(); + bodyBuilder.field("remote_cluster", "leader_cluster"); + } + bodyBuilder.endObject(); + request.setJsonEntity(Strings.toString(bodyBuilder)); + } + assertOK(client().performRequest(request)); + + // Create data stream and ensure that is is auto followed + { + try (RestClient leaderClient = buildLeaderClient()) { + for (int i = 0; i < numDocs; i++) { + Request indexRequest = new Request("POST", "/" + dataStreamName + "/_doc"); + indexRequest.addParameter("refresh", "true"); + indexRequest.setJsonEntity("{\"@timestamp\": \"" + DATE_FORMAT.format(new Date()) + "\",\"message\":\"abc\"}"); + assertOK(leaderClient.performRequest(indexRequest)); + } + verifyDataStream(leaderClient, dataStreamName, ".ds-logs-mysql-error-000001"); + verifyDocuments(leaderClient, dataStreamName, numDocs); + } + assertBusy(() -> { + assertThat(getNumberOfSuccessfulFollowedIndices(), equalTo(initialNumberOfSuccessfulFollowedIndices + 1)); + verifyDataStream(client(), dataStreamName, ".ds-logs-mysql-error-000001"); + ensureYellow(dataStreamName); + verifyDocuments(client(), dataStreamName, numDocs); + }); + } + + // First rollover and ensure second backing index is replicated: + { + try (RestClient leaderClient = buildLeaderClient()) { + Request rolloverRequest = new Request("POST", "/" + dataStreamName + "/_rollover"); + assertOK(leaderClient.performRequest(rolloverRequest)); + verifyDataStream(leaderClient, dataStreamName, ".ds-logs-mysql-error-000001", ".ds-logs-mysql-error-000002"); + + Request indexRequest = new Request("POST", "/" + dataStreamName + "/_doc"); + indexRequest.addParameter("refresh", "true"); + indexRequest.setJsonEntity("{\"@timestamp\": \"" + DATE_FORMAT.format(new Date()) + "\",\"message\":\"abc\"}"); + assertOK(leaderClient.performRequest(indexRequest)); + verifyDocuments(leaderClient, dataStreamName, numDocs + 1); + } + assertBusy(() -> { + assertThat(getNumberOfSuccessfulFollowedIndices(), equalTo(initialNumberOfSuccessfulFollowedIndices + 2)); + verifyDataStream(client(), dataStreamName, ".ds-logs-mysql-error-000001", ".ds-logs-mysql-error-000002"); + ensureYellow(dataStreamName); + verifyDocuments(client(), dataStreamName, numDocs + 1); + }); + } + + // Second rollover and ensure third backing index is replicated: + { + try (RestClient leaderClient = buildLeaderClient()) { + Request rolloverRequest = new Request("POST", "/" + dataStreamName + "/_rollover"); + assertOK(leaderClient.performRequest(rolloverRequest)); + verifyDataStream(leaderClient, dataStreamName, ".ds-logs-mysql-error-000001", ".ds-logs-mysql-error-000002", "" + + ".ds-logs-mysql-error-000003"); + + Request indexRequest = new Request("POST", "/" + dataStreamName + "/_doc"); + indexRequest.addParameter("refresh", "true"); + indexRequest.setJsonEntity("{\"@timestamp\": \"" + DATE_FORMAT.format(new Date()) + "\",\"message\":\"abc\"}"); + assertOK(leaderClient.performRequest(indexRequest)); + verifyDocuments(leaderClient, dataStreamName, numDocs + 2); + } + assertBusy(() -> { + assertThat(getNumberOfSuccessfulFollowedIndices(), equalTo(initialNumberOfSuccessfulFollowedIndices + 3)); + verifyDataStream(client(), dataStreamName, ".ds-logs-mysql-error-000001", ".ds-logs-mysql-error-000002", + ".ds-logs-mysql-error-000003"); + ensureYellow(dataStreamName); + verifyDocuments(client(), dataStreamName, numDocs + 2); + }); + } + // Cleanup: + { + deleteAutoFollowPattern("test_pattern"); + deleteDataStream(dataStreamName); + } + } + + public void testDataStreams_autoFollowAfterDataStreamCreated() throws Exception { + if ("follow".equals(targetCluster) == false) { + return; + } + + final int initialNumDocs = 16; + final String dataStreamName = "logs-syslog-prod"; + int initialNumberOfSuccessfulFollowedIndices = getNumberOfSuccessfulFollowedIndices(); + // Initialize data stream prior to auto following + { + try (RestClient leaderClient = buildLeaderClient()) { + for (int i = 0; i < initialNumDocs; i++) { + Request indexRequest = new Request("POST", "/" + dataStreamName + "/_doc"); + indexRequest.addParameter("refresh", "true"); + indexRequest.setJsonEntity("{\"@timestamp\": \"" + DATE_FORMAT.format(new Date()) + "\",\"message\":\"abc\"}"); + assertOK(leaderClient.performRequest(indexRequest)); + } + verifyDataStream(leaderClient, dataStreamName, ".ds-logs-syslog-prod-000001"); + verifyDocuments(leaderClient, dataStreamName, initialNumDocs); + } + } + // Create auto follow pattern + { + Request request = new Request("PUT", "/_ccr/auto_follow/test_pattern"); + try (XContentBuilder bodyBuilder = JsonXContent.contentBuilder()) { + bodyBuilder.startObject(); + { + bodyBuilder.startArray("leader_index_patterns"); + { + bodyBuilder.value("logs-*"); + } + bodyBuilder.endArray(); + bodyBuilder.field("remote_cluster", "leader_cluster"); + } + bodyBuilder.endObject(); + request.setJsonEntity(Strings.toString(bodyBuilder)); + } + assertOK(client().performRequest(request)); + } + // Rollover and ensure only second backing index is replicated: + { + try (RestClient leaderClient = buildLeaderClient()) { + Request rolloverRequest = new Request("POST", "/" + dataStreamName + "/_rollover"); + assertOK(leaderClient.performRequest(rolloverRequest)); + verifyDataStream(leaderClient, dataStreamName, ".ds-logs-syslog-prod-000001", ".ds-logs-syslog-prod-000002"); + + Request indexRequest = new Request("POST", "/" + dataStreamName + "/_doc"); + indexRequest.addParameter("refresh", "true"); + indexRequest.setJsonEntity("{\"@timestamp\": \"" + DATE_FORMAT.format(new Date()) + "\",\"message\":\"abc\"}"); + assertOK(leaderClient.performRequest(indexRequest)); + verifyDocuments(leaderClient, dataStreamName, initialNumDocs + 1); + } + assertBusy(() -> { + assertThat(getNumberOfSuccessfulFollowedIndices(), equalTo(initialNumberOfSuccessfulFollowedIndices + 1)); + verifyDataStream(client(), dataStreamName, ".ds-logs-syslog-prod-000002"); + ensureYellow(dataStreamName); + verifyDocuments(client(), dataStreamName, 1); + }); + } + // Explicitly follow the first backing index and check that the data stream in follow cluster is updated correctly: + { + followIndex(".ds-logs-syslog-prod-000001", ".ds-logs-syslog-prod-000001"); + assertBusy(() -> { + assertThat(getNumberOfSuccessfulFollowedIndices(), equalTo(initialNumberOfSuccessfulFollowedIndices + 1)); + verifyDataStream(client(), dataStreamName, ".ds-logs-syslog-prod-000001", ".ds-logs-syslog-prod-000002"); + ensureYellow(dataStreamName); + verifyDocuments(client(), dataStreamName, initialNumDocs + 1); + }); + } + // Cleanup: + { + deleteAutoFollowPattern("test_pattern"); + deleteDataStream(dataStreamName); + } + } + private int getNumberOfSuccessfulFollowedIndices() throws IOException { Request statsRequest = new Request("GET", "/_ccr/stats"); Map response = toMap(client().performRequest(statsRequest)); @@ -170,5 +354,38 @@ private int getNumberOfSuccessfulFollowedIndices() throws IOException { return (Integer) response.get("number_of_successful_follow_indices"); } + private static void verifyDocuments(final RestClient client, + final String index, + final int expectedNumDocs) throws IOException { + final Request request = new Request("GET", "/" + index + "/_search"); + request.addParameter(TOTAL_HITS_AS_INT_PARAM, "true"); + Map response = toMap(client.performRequest(request)); + + int numDocs = (int) XContentMapValues.extractValue("hits.total", response); + assertThat(index, numDocs, equalTo(expectedNumDocs)); + } + + static void verifyDataStream(final RestClient client, + final String name, + final String... expectedBackingIndices) throws IOException { + Request request = new Request("GET", "/_data_stream/" + name); + Map response = toMap(client.performRequest(request)); + List retrievedDataStreams = (List) response.get("data_streams"); + assertThat(retrievedDataStreams, hasSize(1)); + List actualBackingIndices = (List) ((Map) retrievedDataStreams.get(0)).get("indices"); + assertThat(actualBackingIndices, hasSize(expectedBackingIndices.length)); + for (int i = 0; i < expectedBackingIndices.length; i++) { + Map actualBackingIndex = (Map) actualBackingIndices.get(i); + String expectedBackingIndex = expectedBackingIndices[i]; + assertThat(actualBackingIndex.get("index_name"), equalTo(expectedBackingIndex)); + } + } + + private void deleteDataStream(String name) throws IOException { + try (RestClient leaderClient = buildLeaderClient()) { + Request deleteTemplateRequest = new Request("DELETE", "/_data_stream/" + name); + assertOK(leaderClient.performRequest(deleteTemplateRequest)); + } + } } diff --git a/x-pack/plugin/ccr/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexIT.java b/x-pack/plugin/ccr/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexIT.java index 3880632d36f62..1acd0d4dc7ca8 100644 --- a/x-pack/plugin/ccr/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexIT.java +++ b/x-pack/plugin/ccr/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexIT.java @@ -15,6 +15,7 @@ import java.util.Map; import java.util.concurrent.TimeUnit; +import static org.elasticsearch.xpack.ccr.AutoFollowIT.verifyDataStream; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasEntry; @@ -123,4 +124,39 @@ public void testFollowNonExistingLeaderIndex() throws Exception { assertThat(e.getResponse().getStatusLine().getStatusCode(), equalTo(404)); } + public void testFollowDataStreamFails() throws Exception { + if ("follow".equals(targetCluster) == false) { + return; + } + + final String dataStreamName = "logs-syslog-prod"; + try (RestClient leaderClient = buildLeaderClient()) { + Request request = new Request("PUT", "/_data_stream/" + dataStreamName); + assertOK(leaderClient.performRequest(request)); + verifyDataStream(leaderClient, dataStreamName, ".ds-logs-syslog-prod-000001"); + } + + ResponseException failure = expectThrows(ResponseException.class, () -> followIndex(dataStreamName, dataStreamName)); + assertThat(failure.getResponse().getStatusLine().getStatusCode(), equalTo(400)); + assertThat(failure.getMessage(), containsString("cannot follow [logs-syslog-prod], because it is a DATA_STREAM")); + } + + public void testChangeBackingIndexNameFails() throws Exception { + if ("follow".equals(targetCluster) == false) { + return; + } + + final String dataStreamName = "logs-foobar-prod"; + try (RestClient leaderClient = buildLeaderClient()) { + Request request = new Request("PUT", "/_data_stream/" + dataStreamName); + assertOK(leaderClient.performRequest(request)); + verifyDataStream(leaderClient, dataStreamName, ".ds-logs-foobar-prod-000001"); + } + + ResponseException failure = expectThrows(ResponseException.class, + () -> followIndex(".ds-logs-foobar-prod-000001", ".ds-logs-barbaz-prod-000001")); + assertThat(failure.getResponse().getStatusLine().getStatusCode(), equalTo(400)); + assertThat(failure.getMessage(), containsString("a backing index name in the local and remote cluster must remain the same")); + } + } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrLicenseChecker.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrLicenseChecker.java index 6073a618ea69f..04b2347eb7001 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrLicenseChecker.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrLicenseChecker.java @@ -23,9 +23,12 @@ import org.elasticsearch.client.Client; import org.elasticsearch.client.FilterClient; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.DataStream; +import org.elasticsearch.cluster.metadata.IndexAbstraction; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.common.CheckedConsumer; import org.elasticsearch.common.Strings; +import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.index.IndexNotFoundException; @@ -109,11 +112,11 @@ public boolean isCcrAllowed() { * @param consumer the consumer for supplying the leader index metadata and historyUUIDs of all leader shards */ public void checkRemoteClusterLicenseAndFetchLeaderIndexMetadataAndHistoryUUIDs( - final Client client, - final String clusterAlias, - final String leaderIndex, - final Consumer onFailure, - final BiConsumer consumer) { + final Client client, + final String clusterAlias, + final String leaderIndex, + final Consumer onFailure, + final BiConsumer> consumer) { final ClusterStateRequest request = new ClusterStateRequest(); request.clear(); @@ -127,20 +130,35 @@ public void checkRemoteClusterLicenseAndFetchLeaderIndexMetadataAndHistoryUUIDs( onFailure, remoteClusterStateResponse -> { ClusterState remoteClusterState = remoteClusterStateResponse.getState(); - IndexMetadata leaderIndexMetadata = remoteClusterState.getMetadata().index(leaderIndex); + final IndexMetadata leaderIndexMetadata = remoteClusterState.getMetadata().index(leaderIndex); if (leaderIndexMetadata == null) { - onFailure.accept(new IndexNotFoundException(leaderIndex)); + final IndexAbstraction indexAbstraction = remoteClusterState.getMetadata().getIndicesLookup().get(leaderIndex); + final Exception failure; + if (indexAbstraction == null) { + failure = new IndexNotFoundException(leaderIndex); + } else { + // provided name may be an alias or data stream and in that case throw a specific error: + String message = String.format(Locale.ROOT, + "cannot follow [%s], because it is a %s", + leaderIndex, indexAbstraction.getType() + ); + failure = new IllegalArgumentException(message); + } + onFailure.accept(failure); return; } if (leaderIndexMetadata.getState() == IndexMetadata.State.CLOSE) { onFailure.accept(new IndexClosedException(leaderIndexMetadata.getIndex())); return; } + IndexAbstraction indexAbstraction = remoteClusterState.getMetadata().getIndicesLookup().get(leaderIndex); + final DataStream remoteDataStream = indexAbstraction.getParentDataStream() != null ? + indexAbstraction.getParentDataStream().getDataStream() : null; final Client remoteClient = client.getRemoteClusterClient(clusterAlias); hasPrivilegesToFollowIndices(remoteClient, new String[] {leaderIndex}, e -> { if (e == null) { fetchLeaderHistoryUUIDs(remoteClient, leaderIndexMetadata, onFailure, historyUUIDs -> - consumer.accept(historyUUIDs, leaderIndexMetadata)); + consumer.accept(historyUUIDs, Tuple.tuple(leaderIndexMetadata, remoteDataStream))); } else { onFailure.accept(e); } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java index 6af8affaefcab..4e036f7d21c68 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java @@ -20,6 +20,7 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateListener; import org.elasticsearch.cluster.ClusterStateUpdateTask; +import org.elasticsearch.cluster.metadata.IndexAbstraction; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.routing.IndexRoutingTable; @@ -105,7 +106,7 @@ public AutoFollowCoordinator( this.relativeMillisTimeProvider = relativeMillisTimeProvider; this.absoluteMillisTimeProvider = absoluteMillisTimeProvider; this.executor = Objects.requireNonNull(executor); - this.recentAutoFollowErrors = new LinkedHashMap>() { + this.recentAutoFollowErrors = new LinkedHashMap<>() { @Override protected boolean removeEldestEntry(final Map.Entry> eldest) { return size() > MAX_AUTO_FOLLOW_ERRORS; @@ -496,8 +497,9 @@ private void checkAutoFollowPattern(String autoFollowPattenName, leaderIndicesToFollow.size()); for (final Index indexToFollow : leaderIndicesToFollow) { + IndexAbstraction indexAbstraction = remoteMetadata.getIndicesLookup().get(indexToFollow.getName()); List otherMatchingPatterns = patternsForTheSameRemoteCluster.stream() - .filter(otherPattern -> otherPattern.v2().match(indexToFollow.getName())) + .filter(otherPattern -> otherPattern.v2().match(indexAbstraction)) .map(Tuple::v1) .collect(Collectors.toList()); if (otherMatchingPatterns.size() != 0) { @@ -615,7 +617,9 @@ static List getLeaderIndicesToFollow(AutoFollowPattern autoFollowPattern, if (leaderIndexMetadata.getState() != IndexMetadata.State.OPEN) { continue; } - if (autoFollowPattern.isActive() && autoFollowPattern.match(leaderIndexMetadata.getIndex().getName())) { + IndexAbstraction indexAbstraction = + remoteClusterState.getMetadata().getIndicesLookup().get(leaderIndexMetadata.getIndex().getName()); + if (autoFollowPattern.isActive() && autoFollowPattern.match(indexAbstraction)) { IndexRoutingTable indexRoutingTable = remoteClusterState.routingTable().index(leaderIndexMetadata.getIndex()); if (indexRoutingTable != null && // Leader indices can be in the cluster state, but not all primary shards may be ready yet. @@ -624,7 +628,6 @@ static List getLeaderIndicesToFollow(AutoFollowPattern autoFollowPattern, // this index will be auto followed. indexRoutingTable.allPrimaryShardsActive() && followedIndexUUIDs.contains(leaderIndexMetadata.getIndex().getUUID()) == false) { - leaderIndicesToFollow.add(leaderIndexMetadata.getIndex()); } } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutAutoFollowPatternAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutAutoFollowPatternAction.java index 65bdb5e8417c9..df8734575b47b 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutAutoFollowPatternAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutAutoFollowPatternAction.java @@ -16,6 +16,7 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; +import org.elasticsearch.cluster.metadata.IndexAbstraction; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.Metadata; @@ -199,7 +200,8 @@ private static void markExistingIndicesAsAutoFollowed( List followedIndexUUIDS) { for (final IndexMetadata indexMetadata : leaderMetadata) { - if (AutoFollowPattern.match(patterns, indexMetadata.getIndex().getName())) { + IndexAbstraction indexAbstraction = leaderMetadata.getIndicesLookup().get(indexMetadata.getIndex().getName()); + if (AutoFollowPattern.match(patterns, indexAbstraction)) { followedIndexUUIDS.add(indexMetadata.getIndexUUID()); } } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutFollowAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutFollowAction.java index 99a10d954fbbb..556f7a4a626b6 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutFollowAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutFollowAction.java @@ -20,12 +20,15 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; +import org.elasticsearch.cluster.metadata.DataStream; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.AbstractRunnable; +import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.license.LicenseUtils; import org.elasticsearch.snapshots.RestoreInfo; @@ -39,8 +42,12 @@ import org.elasticsearch.xpack.core.ccr.action.PutFollowAction; import org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; import java.util.Locale; import java.util.Objects; +import java.util.function.BiConsumer; public final class TransportPutFollowAction extends TransportMasterNodeAction { @@ -97,11 +104,12 @@ protected void masterOperation( remoteCluster, leaderIndex, listener::onFailure, - (historyUUID, leaderIndexMetadata) -> createFollowerIndex(leaderIndexMetadata, request, listener)); + (historyUUID, tuple) -> createFollowerIndex(tuple.v1(), tuple.v2(), request, listener)); } private void createFollowerIndex( final IndexMetadata leaderIndexMetadata, + final DataStream remoteDataStream, final PutFollowAction.Request request, final ActionListener listener) { if (leaderIndexMetadata == null) { @@ -125,6 +133,16 @@ private void createFollowerIndex( return; } + if (remoteDataStream != null) { + // when following a backing index then the names of the backing index must be remain the same in the local + // and remote cluster. + if (request.getLeaderIndex().equals(request.getFollowerIndex()) == false) { + listener.onFailure( + new IllegalArgumentException("a backing index name in the local and remote cluster must remain the same")); + return; + } + } + final Settings overrideSettings = Settings.builder() .put(IndexMetadata.SETTING_INDEX_PROVIDED_NAME, request.getFollowerIndex()) .put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true) @@ -147,9 +165,24 @@ public void onFailure(Exception e) { @Override protected void doRun() { - restoreService.restoreSnapshot(restoreRequest, - ActionListener.delegateFailure(listener, - (delegatedListener, response) -> afterRestoreStarted(clientWithHeaders, request, delegatedListener, response))); + ActionListener delegatelistener = ActionListener.delegateFailure( + listener, + (delegatedListener, response) -> afterRestoreStarted(clientWithHeaders, request, delegatedListener, response) + ); + if (remoteDataStream == null) { + restoreService.restoreSnapshot(restoreRequest, delegatelistener); + } else { + String followerIndexName = request.getFollowerIndex(); + BiConsumer updater = (currentState, mdBuilder) -> { + DataStream localDataStream = currentState.getMetadata().dataStreams().get(remoteDataStream.getName()); + Index followerIndex = mdBuilder.get(followerIndexName).getIndex(); + assert followerIndex != null; + + DataStream updatedDataStream = updateLocalDataStream(followerIndex, localDataStream, remoteDataStream); + mdBuilder.put(updatedDataStream); + }; + restoreService.restoreSnapshot(restoreRequest, delegatelistener, updater); + } } }); } @@ -160,7 +193,7 @@ private void afterRestoreStarted(Client clientWithHeaders, PutFollowAction.Reque final ActionListener listener; if (ActiveShardCount.NONE.equals(request.waitForActiveShards())) { originalListener.onResponse(new PutFollowAction.Response(true, false, false)); - listener = new ActionListener() { + listener = new ActionListener<>() { @Override public void onResponse(PutFollowAction.Response response) { @@ -210,6 +243,29 @@ private void initiateFollowing( )); } + static DataStream updateLocalDataStream(Index backingIndexToFollow, + DataStream localDataStream, + DataStream remoteDataStream) { + if (localDataStream == null) { + // The data stream and the backing indices have been created and validated in the remote cluster, + // just copying the data stream is in this case safe. + return new DataStream(remoteDataStream.getName(), remoteDataStream.getTimeStampField(), + List.of(backingIndexToFollow), remoteDataStream.getGeneration(), remoteDataStream.getMetadata()); + } else { + List backingIndices = new ArrayList<>(localDataStream.getIndices()); + backingIndices.add(backingIndexToFollow); + + // When following an older backing index it should be positioned before the newer backing indices. + // Currently the assumption is that the newest index (highest generation) is the write index. + // (just appending an older backing index to the list of backing indices would break that assumption) + // (string sorting works because of the naming backing index naming scheme) + backingIndices.sort(Comparator.comparing(Index::getName)); + + return new DataStream(localDataStream.getName(), localDataStream.getTimeStampField(), backingIndices, + remoteDataStream.getGeneration(), remoteDataStream.getMetadata()); + } + } + @Override protected ClusterBlockException checkBlock(final PutFollowAction.Request request, final ClusterState state) { return state.blocks().indexBlockedException(ClusterBlockLevel.METADATA_WRITE, request.getFollowerIndex()); diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportResumeFollowAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportResumeFollowAction.java index 62787fc703fdf..b3247dd246ed5 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportResumeFollowAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportResumeFollowAction.java @@ -133,7 +133,7 @@ protected void masterOperation(final ResumeFollowAction.Request request, listener::onFailure, (leaderHistoryUUID, leaderIndexMetadata) -> { try { - start(request, leaderCluster, leaderIndexMetadata, followerIndexMetadata, leaderHistoryUUID, listener); + start(request, leaderCluster, leaderIndexMetadata.v1(), followerIndexMetadata, leaderHistoryUUID, listener); } catch (final IOException e) { listener.onFailure(e); } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java index bdb30ed4e7355..09676579b5910 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java @@ -12,6 +12,7 @@ import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.DataStream; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.routing.IndexRoutingTable; @@ -170,6 +171,93 @@ void cleanFollowedRemoteIndices(ClusterState remoteClusterState, List pa assertThat(invoked[0], is(true)); } + public void testAutoFollower_dataStream() { + Client client = mock(Client.class); + when(client.getRemoteClusterClient(anyString())).thenReturn(client); + + ClusterState remoteState = createRemoteClusterStateWithDataStream("logs-foobar"); + + AutoFollowPattern autoFollowPattern = new AutoFollowPattern( + "remote", + Collections.singletonList("logs-*"), + null, + Settings.EMPTY, + true, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null + ); + Map patterns = new HashMap<>(); + patterns.put("remote", autoFollowPattern); + Map> followedLeaderIndexUUIDS = new HashMap<>(); + followedLeaderIndexUUIDS.put("remote", new ArrayList<>()); + Map> autoFollowHeaders = new HashMap<>(); + autoFollowHeaders.put("remote", Map.of("key", "val")); + AutoFollowMetadata autoFollowMetadata = new AutoFollowMetadata(patterns, followedLeaderIndexUUIDS, autoFollowHeaders); + + ClusterState currentState = ClusterState.builder(new ClusterName("name")) + .metadata(Metadata.builder().putCustom(AutoFollowMetadata.TYPE, autoFollowMetadata)) + .build(); + + boolean[] invoked = new boolean[]{false}; + Consumer> handler = results -> { + invoked[0] = true; + + assertThat(results.size(), equalTo(1)); + assertThat(results.get(0).clusterStateFetchException, nullValue()); + List> entries = new ArrayList<>(results.get(0).autoFollowExecutionResults.entrySet()); + assertThat(entries.size(), equalTo(1)); + assertThat(entries.get(0).getKey().getName(), equalTo(".ds-logs-foobar-000001")); + assertThat(entries.get(0).getValue(), nullValue()); + }; + AutoFollower autoFollower = new AutoFollower("remote", handler, localClusterStateSupplier(currentState), () -> 1L, Runnable::run) { + @Override + void getRemoteClusterState(String remoteCluster, + long metadataVersion, + BiConsumer handler) { + assertThat(remoteCluster, equalTo("remote")); + handler.accept(new ClusterStateResponse(new ClusterName("name"), remoteState, false), null); + } + + @Override + void createAndFollow(Map headers, + PutFollowAction.Request followRequest, + Runnable successHandler, + Consumer failureHandler) { + assertThat(headers, equalTo(autoFollowHeaders.get("remote"))); + assertThat(followRequest.getRemoteCluster(), equalTo("remote")); + assertThat(followRequest.getLeaderIndex(), equalTo(".ds-logs-foobar-000001")); + assertThat(followRequest.getFollowerIndex(), equalTo(".ds-logs-foobar-000001")); + assertThat(followRequest.masterNodeTimeout(), equalTo(TimeValue.MAX_VALUE)); + successHandler.run(); + } + + @Override + void updateAutoFollowMetadata(Function updateFunction, + Consumer handler) { + ClusterState resultCs = updateFunction.apply(currentState); + AutoFollowMetadata result = resultCs.metadata().custom(AutoFollowMetadata.TYPE); + assertThat(result.getFollowedLeaderIndexUUIDs().size(), equalTo(1)); + assertThat(result.getFollowedLeaderIndexUUIDs().get("remote").size(), equalTo(1)); + handler.accept(null); + } + + @Override + void cleanFollowedRemoteIndices(ClusterState remoteClusterState, List patterns) { + // Ignore, to avoid invoking updateAutoFollowMetadata(...) twice + } + }; + autoFollower.start(); + assertThat(invoked[0], is(true)); + } + public void testAutoFollowerClusterStateApiFailure() { Client client = mock(Client.class); when(client.getRemoteClusterClient(anyString())).thenReturn(client); @@ -2010,4 +2098,30 @@ private ClusterService mockClusterService() { return clusterService; } + private static ClusterState createRemoteClusterStateWithDataStream(String dataStreamName) { + Settings.Builder indexSettings = settings(Version.CURRENT); + indexSettings.put(IndexMetadata.SETTING_INDEX_UUID, UUIDs.randomBase64UUID(random())); + indexSettings.put("index.hidden", true); + + IndexMetadata indexMetadata = IndexMetadata.builder(DataStream.getDefaultBackingIndexName(dataStreamName, 1)) + .settings(indexSettings) + .numberOfShards(1) + .numberOfReplicas(0) + .build(); + DataStream dataStream = new DataStream(dataStreamName, new DataStream.TimestampField("@timestamp"), + List.of(indexMetadata.getIndex())); + ClusterState.Builder csBuilder = ClusterState.builder(new ClusterName("remote")) + .metadata(Metadata.builder() + .put(indexMetadata, true) + .put(dataStream) + .version(0L)); + + ShardRouting shardRouting = + TestShardRouting.newShardRouting(dataStreamName, 0, "1", true, ShardRoutingState.INITIALIZING).moveToStarted(); + IndexRoutingTable indexRoutingTable = IndexRoutingTable.builder(indexMetadata.getIndex()).addShard(shardRouting).build(); + csBuilder.routingTable(RoutingTable.builder().add(indexRoutingTable).build()).build(); + + return csBuilder.build(); + } + } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/TransportPutFollowActionTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/TransportPutFollowActionTests.java new file mode 100644 index 0000000000000..585a8f62ace1e --- /dev/null +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/TransportPutFollowActionTests.java @@ -0,0 +1,90 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.ccr.action; + +import org.elasticsearch.cluster.metadata.DataStream; +import org.elasticsearch.cluster.metadata.DataStream.TimestampField; +import org.elasticsearch.index.Index; +import org.elasticsearch.test.ESTestCase; + +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static org.hamcrest.Matchers.equalTo; + +public class TransportPutFollowActionTests extends ESTestCase { + + public void testCreateNewLocalDataStream() { + DataStream remoteDataStream = generateDataSteam("logs-foobar", 3); + Index backingIndexToFollow = remoteDataStream.getIndices().get(remoteDataStream.getIndices().size() - 1); + DataStream result = TransportPutFollowAction.updateLocalDataStream(backingIndexToFollow, null, remoteDataStream); + assertThat(result.getName(), equalTo(remoteDataStream.getName())); + assertThat(result.getTimeStampField(), equalTo(remoteDataStream.getTimeStampField())); + assertThat(result.getGeneration(), equalTo(remoteDataStream.getGeneration())); + assertThat(result.getIndices().size(), equalTo(1)); + assertThat(result.getIndices().get(0), equalTo(backingIndexToFollow)); + } + + public void testUpdateLocalDataStream_followNewBackingIndex() { + DataStream remoteDataStream = generateDataSteam("logs-foobar", 3); + DataStream localDataStream = generateDataSteam("logs-foobar", 2); + Index backingIndexToFollow = remoteDataStream.getIndices().get(remoteDataStream.getIndices().size() - 1); + DataStream result = TransportPutFollowAction.updateLocalDataStream(backingIndexToFollow, localDataStream, remoteDataStream); + assertThat(result.getName(), equalTo(remoteDataStream.getName())); + assertThat(result.getTimeStampField(), equalTo(remoteDataStream.getTimeStampField())); + assertThat(result.getGeneration(), equalTo(remoteDataStream.getGeneration())); + assertThat(result.getIndices().size(), equalTo(3)); + assertThat(result.getIndices().get(0).getName(), equalTo(DataStream.getDefaultBackingIndexName("logs-foobar", 1))); + assertThat(result.getIndices().get(1).getName(), equalTo(DataStream.getDefaultBackingIndexName("logs-foobar", 2))); + assertThat(result.getIndices().get(2).getName(), equalTo(DataStream.getDefaultBackingIndexName("logs-foobar", 3))); + } + + public void testUpdateLocalDataStream_followOlderBackingIndex() { + // follow first backing index: + DataStream remoteDataStream = generateDataSteam("logs-foobar", 5); + DataStream localDataStream = generateDataSteam("logs-foobar", 5, DataStream.getDefaultBackingIndexName("logs-foobar", 5)); + Index backingIndexToFollow = remoteDataStream.getIndices().get(0); + DataStream result = TransportPutFollowAction.updateLocalDataStream(backingIndexToFollow, localDataStream, remoteDataStream); + assertThat(result.getName(), equalTo(remoteDataStream.getName())); + assertThat(result.getTimeStampField(), equalTo(remoteDataStream.getTimeStampField())); + assertThat(result.getGeneration(), equalTo(remoteDataStream.getGeneration())); + assertThat(result.getIndices().size(), equalTo(2)); + assertThat(result.getIndices().get(0).getName(), equalTo(DataStream.getDefaultBackingIndexName("logs-foobar", 1))); + assertThat(result.getIndices().get(1).getName(), equalTo(DataStream.getDefaultBackingIndexName("logs-foobar", 5))); + + // follow second last backing index: + localDataStream = result; + backingIndexToFollow = remoteDataStream.getIndices().get(remoteDataStream.getIndices().size() - 2); + result = TransportPutFollowAction.updateLocalDataStream(backingIndexToFollow, localDataStream, remoteDataStream); + assertThat(result.getName(), equalTo(remoteDataStream.getName())); + assertThat(result.getTimeStampField(), equalTo(remoteDataStream.getTimeStampField())); + assertThat(result.getGeneration(), equalTo(remoteDataStream.getGeneration())); + assertThat(result.getIndices().size(), equalTo(3)); + assertThat(result.getIndices().get(0).getName(), equalTo(DataStream.getDefaultBackingIndexName("logs-foobar", 1))); + assertThat(result.getIndices().get(1).getName(), equalTo(DataStream.getDefaultBackingIndexName("logs-foobar", 4))); + assertThat(result.getIndices().get(2).getName(), equalTo(DataStream.getDefaultBackingIndexName("logs-foobar", 5))); + } + + static DataStream generateDataSteam(String name, int numBackingIndices) { + List backingIndices = IntStream.range(1, numBackingIndices + 1) + .mapToObj(value -> DataStream.getDefaultBackingIndexName(name, value)) + .map(value -> new Index(value, "uuid")) + .collect(Collectors.toList()); + return new DataStream(name, new TimestampField("@timestamp"), backingIndices); + } + + static DataStream generateDataSteam(String name, int generation, String... backingIndexNames) { + List backingIndices = Arrays.stream(backingIndexNames) + .map(value -> new Index(value, "uuid")) + .collect(Collectors.toList()); + return new DataStream(name, new TimestampField("@timestamp"), backingIndices, generation, Map.of()); + } + +} diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/AutoFollowMetadata.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/AutoFollowMetadata.java index 863404d03bf5c..16c66f4f288ca 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/AutoFollowMetadata.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/AutoFollowMetadata.java @@ -8,6 +8,7 @@ import org.elasticsearch.ElasticsearchParseException; import org.elasticsearch.Version; import org.elasticsearch.cluster.AbstractNamedDiffable; +import org.elasticsearch.cluster.metadata.IndexAbstraction; import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.common.ParseField; import org.elasticsearch.common.io.stream.StreamInput; @@ -276,12 +277,18 @@ private AutoFollowPattern(String remoteCluster, List leaderIndexPatterns } } - public boolean match(String indexName) { - return match(leaderIndexPatterns, indexName); + public boolean match(IndexAbstraction indexAbstraction) { + return match(leaderIndexPatterns, indexAbstraction); } - public static boolean match(List leaderIndexPatterns, String indexName) { - return Regex.simpleMatch(leaderIndexPatterns, indexName); + public static boolean match(List leaderIndexPatterns, IndexAbstraction indexAbstraction) { + boolean matches = Regex.simpleMatch(leaderIndexPatterns, indexAbstraction.getName()); + if (matches) { + return true; + } else { + return indexAbstraction.getParentDataStream() != null && + Regex.simpleMatch(leaderIndexPatterns, indexAbstraction.getParentDataStream().getName()); + } } public String getRemoteCluster() { diff --git a/x-pack/plugin/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamIT.java b/x-pack/plugin/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamIT.java index d429543f8ded5..a01b3b7f5d351 100644 --- a/x-pack/plugin/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamIT.java +++ b/x-pack/plugin/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamIT.java @@ -10,6 +10,7 @@ import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionRequestBuilder; import org.elasticsearch.action.DocWriteRequest; +import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest; import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest; import org.elasticsearch.action.admin.indices.get.GetIndexRequest; import org.elasticsearch.action.admin.indices.get.GetIndexResponse; @@ -35,6 +36,7 @@ import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.action.update.UpdateRequest; +import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.health.ClusterHealthStatus; import org.elasticsearch.cluster.metadata.ComposableIndexTemplate; import org.elasticsearch.cluster.metadata.DataStream; @@ -1139,6 +1141,18 @@ public void testDataStreamMetadata() throws Exception { assertThat(dataStream.getMetadata(), equalTo(Map.of("managed_by", "core-features"))); } + public void testClusterStateIncludeDataStream() throws Exception { + putComposableIndexTemplate("id1", List.of("metrics-foo*")); + CreateDataStreamAction.Request createDataStreamRequest = new CreateDataStreamAction.Request("metrics-foo"); + client().execute(CreateDataStreamAction.INSTANCE, createDataStreamRequest).get(); + + // when querying a backing index then the data stream should be included as well. + ClusterStateRequest request = new ClusterStateRequest().indices(".ds-metrics-foo-000001"); + ClusterState state = client().admin().cluster().state(request).get().getState(); + assertThat(state.metadata().dataStreams().size(), equalTo(1)); + assertThat(state.metadata().dataStreams().get("metrics-foo").getName(), equalTo("metrics-foo")); + } + private static void verifyResolvability(String dataStream, ActionRequestBuilder requestBuilder, boolean fail) { verifyResolvability(dataStream, requestBuilder, fail, 0); }