Skip to content

Commit

Permalink
Add data stream support to CCR (elastic#61993)
Browse files Browse the repository at this point in the history
This commit adds data stream support to CCR's auto following by making the following changes:
* When the auto follow coordinator iterates over the candidate indices to follow,
  it also checks whether each index is part of a data stream. If the name of that
  data stream matches the auto follow pattern, then the index is auto followed.
* When following an index, the put follow api also checks whether that index is part
  of a data stream and if so then also replicates the data stream definition to the
  local cluster.
* In order for the follow index api to determine whether an index is part of a data
  stream, the cluster state api was modified to also return the data stream definition
  when the cluster state is requested for specific indices only.

When a data stream is auto followed, only new backing indices are auto followed.
This is in line with how time-based index patterns are replicated today. This
means that the data stream isn't copied 1 to 1 into the local cluster. The local
cluster's data stream definition contains the same name, timestamp field and
generation, but the list of backing indices may be different (depending on when
a data stream was auto followed).

Closes elastic#56259
  • Loading branch information
martijnvg committed Nov 3, 2020
1 parent 508d6f6 commit ce75bad
Show file tree
Hide file tree
Showing 14 changed files with 621 additions and 26 deletions.
7 changes: 7 additions & 0 deletions docs/reference/ccr/auto-follow.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,13 @@ each new index in the series is replicated automatically. Whenever the name of
a new index on the remote cluster matches the auto-follow pattern, a
corresponding follower index is added to the local cluster.

You can also create auto-follow patterns for data streams. When a new backing
index is generated on a remote cluster, that index and its data stream are
automatically followed if the data stream name matches an auto-follow
pattern. If you create a data stream after creating the auto-follow pattern,
all backing indices are followed automatically.


Auto-follow patterns are especially useful with
<<index-lifecycle-management,{ilm-cap}>>, which might continually create
new indices on the cluster containing the leader index.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import org.elasticsearch.cluster.ClusterStateObserver;
import org.elasticsearch.cluster.NotMasterException;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.cluster.metadata.IndexAbstraction;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.Metadata;
Expand All @@ -37,6 +39,7 @@
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.Index;
import org.elasticsearch.node.NodeClosedException;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
Expand Down Expand Up @@ -156,9 +159,21 @@ private ClusterStateResponse buildResponse(final ClusterStateRequest request,
mdBuilder.version(currentState.metadata().version());
String[] indices = indexNameExpressionResolver.concreteIndexNames(currentState, request);
for (String filteredIndex : indices) {
IndexMetadata indexMetadata = currentState.metadata().index(filteredIndex);
if (indexMetadata != null) {
mdBuilder.put(indexMetadata, false);
// If the requested index is part of a data stream then that data stream should also be included:
IndexAbstraction indexAbstraction = currentState.metadata().getIndicesLookup().get(filteredIndex);
if (indexAbstraction.getParentDataStream() != null) {
DataStream dataStream = indexAbstraction.getParentDataStream().getDataStream();
mdBuilder.put(dataStream);
// Also the IMD of other backing indices need to be included, otherwise the cluster state api
// can't create a valid cluster state instance:
for (Index backingIndex : dataStream.getIndices()) {
mdBuilder.put(currentState.metadata().index(backingIndex), false);
}
} else {
IndexMetadata indexMetadata = currentState.metadata().index(filteredIndex);
if (indexMetadata != null) {
mdBuilder.put(indexMetadata, false);
}
}
}
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -184,6 +185,20 @@ public RestoreService(ClusterService clusterService, RepositoriesService reposit
* @param listener restore listener
*/
public void restoreSnapshot(final RestoreSnapshotRequest request, final ActionListener<RestoreCompletionResponse> listener) {
    // A plain restore makes no extra metadata modifications, so hand the three-argument
    // overload an updater that does nothing.
    final BiConsumer<ClusterState, Metadata.Builder> noOpUpdater = (state, metadataBuilder) -> {};
    restoreSnapshot(request, listener, noOpUpdater);
}

/**
* Restores snapshot specified in the restore request.
*
* @param request restore request
* @param listener restore listener
* @param updater handler that allows callers to make modifications to {@link Metadata}
* in the same cluster state update as the restore operation
*/
public void restoreSnapshot(final RestoreSnapshotRequest request,
final ActionListener<RestoreCompletionResponse> listener,
final BiConsumer<ClusterState, Metadata.Builder> updater) {
try {
// Read snapshot info and metadata from the repository
final String repositoryName = request.repository();
Expand Down Expand Up @@ -471,6 +486,7 @@ restoreUUID, snapshot, overallState(RestoreInProgress.State.INIT, shards),
}

RoutingTable rt = rtBuilder.build();
updater.accept(currentState, mdBuilder);
ClusterState updatedState = builder.metadata(mdBuilder).blocks(blocks).routingTable(rt).build();
return allocationService.reroute(updatedState, "restored snapshot [" + snapshot + "]");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,27 @@
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.common.xcontent.support.XContentMapValues;

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import static org.elasticsearch.rest.action.search.RestSearchAction.TOTAL_HITS_AS_INT_PARAM;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasEntry;
import static org.hamcrest.Matchers.hasKey;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.instanceOf;

public class AutoFollowIT extends ESCCRRestTestCase {

// Formats the @timestamp values of the documents indexed by the data stream tests.
// 'HH' is the 24-hour hour-of-day field. The previous 'hh' is the 1-12 clock hour and, since the
// pattern carries no AM/PM marker, it rendered afternoon times as if they were morning times.
private static final SimpleDateFormat DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss", Locale.ROOT);

public void testMultipleAutoFollowPatternsDifferentClusters() throws Exception {
if ("follow".equals(targetCluster) == false) {
logger.info("skipping test, waiting for target cluster [follow]" );
Expand Down Expand Up @@ -64,6 +73,7 @@ public void testMultipleAutoFollowPatternsDifferentClusters() throws Exception {
verifyDocuments("logs-20190101", 5, "filtered_field:true");
verifyDocuments("logs-20200101", 5, "filtered_field:true");
});
deleteAutoFollowPattern("leader_cluster_pattern");
}

public void testAutoFollowPatterns() throws Exception {
Expand Down Expand Up @@ -122,6 +132,7 @@ public void testAutoFollowPatterns() throws Exception {
verifyCcrMonitoring("metrics-20210101", "metrics-20210101");
verifyAutoFollowMonitoring();
}, 30, TimeUnit.SECONDS);
deleteAutoFollowPattern("test_pattern");
}

public void testPutAutoFollowPatternThatOverridesRequiredLeaderSetting() throws IOException {
Expand Down Expand Up @@ -163,12 +174,218 @@ public void testPutAutoFollowPatternThatOverridesRequiredLeaderSetting() throws
);
}

/**
 * Registers a {@code logs-*} auto-follow pattern, then indexes into a new data stream through the
 * leader client and rolls it over twice. After each step the test waits until the data stream seen
 * through {@code client()} lists the expected backing indices and document count, and until the
 * auto-follow stats counter has grown by one more followed index.
 */
public void testDataStreams() throws Exception {
    if ("follow".equals(targetCluster) == false) {
        return;
    }

    final int numDocs = 64;
    final String dataStreamName = "logs-mysql-error";
    final String firstBackingIndex = ".ds-logs-mysql-error-000001";
    final String secondBackingIndex = ".ds-logs-mysql-error-000002";
    final String thirdBackingIndex = ".ds-logs-mysql-error-000003";
    final String docEndpoint = "/" + dataStreamName + "/_doc";
    final String rolloverEndpoint = "/" + dataStreamName + "/_rollover";

    // Baseline of the stats counter; every later assertion on it is relative to this value.
    final int followedBefore = getNumberOfSuccessfulFollowedIndices();

    // Create auto follow pattern
    final Request putPatternRequest = new Request("PUT", "/_ccr/auto_follow/test_pattern");
    try (XContentBuilder patternBody = JsonXContent.contentBuilder()) {
        patternBody.startObject();
        patternBody.startArray("leader_index_patterns");
        patternBody.value("logs-*");
        patternBody.endArray();
        patternBody.field("remote_cluster", "leader_cluster");
        patternBody.endObject();
        putPatternRequest.setJsonEntity(Strings.toString(patternBody));
    }
    assertOK(client().performRequest(putPatternRequest));

    // Create data stream and ensure that it is auto followed
    try (RestClient leaderClient = buildLeaderClient()) {
        for (int docIndex = 0; docIndex < numDocs; docIndex++) {
            final Request indexRequest = new Request("POST", docEndpoint);
            indexRequest.addParameter("refresh", "true");
            indexRequest.setJsonEntity("{\"@timestamp\": \"" + DATE_FORMAT.format(new Date()) + "\",\"message\":\"abc\"}");
            assertOK(leaderClient.performRequest(indexRequest));
        }
        verifyDataStream(leaderClient, dataStreamName, firstBackingIndex);
        verifyDocuments(leaderClient, dataStreamName, numDocs);
    }
    assertBusy(() -> {
        assertThat(getNumberOfSuccessfulFollowedIndices(), equalTo(followedBefore + 1));
        verifyDataStream(client(), dataStreamName, firstBackingIndex);
        ensureYellow(dataStreamName);
        verifyDocuments(client(), dataStreamName, numDocs);
    });

    // First rollover and ensure second backing index is replicated:
    try (RestClient leaderClient = buildLeaderClient()) {
        assertOK(leaderClient.performRequest(new Request("POST", rolloverEndpoint)));
        verifyDataStream(leaderClient, dataStreamName, firstBackingIndex, secondBackingIndex);

        final Request indexRequest = new Request("POST", docEndpoint);
        indexRequest.addParameter("refresh", "true");
        indexRequest.setJsonEntity("{\"@timestamp\": \"" + DATE_FORMAT.format(new Date()) + "\",\"message\":\"abc\"}");
        assertOK(leaderClient.performRequest(indexRequest));
        verifyDocuments(leaderClient, dataStreamName, numDocs + 1);
    }
    assertBusy(() -> {
        assertThat(getNumberOfSuccessfulFollowedIndices(), equalTo(followedBefore + 2));
        verifyDataStream(client(), dataStreamName, firstBackingIndex, secondBackingIndex);
        ensureYellow(dataStreamName);
        verifyDocuments(client(), dataStreamName, numDocs + 1);
    });

    // Second rollover and ensure third backing index is replicated:
    try (RestClient leaderClient = buildLeaderClient()) {
        assertOK(leaderClient.performRequest(new Request("POST", rolloverEndpoint)));
        verifyDataStream(leaderClient, dataStreamName, firstBackingIndex, secondBackingIndex, thirdBackingIndex);

        final Request indexRequest = new Request("POST", docEndpoint);
        indexRequest.addParameter("refresh", "true");
        indexRequest.setJsonEntity("{\"@timestamp\": \"" + DATE_FORMAT.format(new Date()) + "\",\"message\":\"abc\"}");
        assertOK(leaderClient.performRequest(indexRequest));
        verifyDocuments(leaderClient, dataStreamName, numDocs + 2);
    }
    assertBusy(() -> {
        assertThat(getNumberOfSuccessfulFollowedIndices(), equalTo(followedBefore + 3));
        verifyDataStream(client(), dataStreamName, firstBackingIndex, secondBackingIndex, thirdBackingIndex);
        ensureYellow(dataStreamName);
        verifyDocuments(client(), dataStreamName, numDocs + 2);
    });

    // Cleanup:
    deleteAutoFollowPattern("test_pattern");
    deleteDataStream(dataStreamName);
}

/**
 * Indexes into a data stream through the leader client before any auto-follow pattern exists, then
 * registers a {@code logs-*} pattern and rolls the data stream over. The test expects the data
 * stream seen through {@code client()} to list only the second backing index at first, and to list
 * both backing indices once the first one has been followed explicitly via {@code followIndex}.
 */
public void testDataStreams_autoFollowAfterDataStreamCreated() throws Exception {
    if ("follow".equals(targetCluster) == false) {
        return;
    }

    final int initialNumDocs = 16;
    final String dataStreamName = "logs-syslog-prod";
    final String firstBackingIndex = ".ds-logs-syslog-prod-000001";
    final String secondBackingIndex = ".ds-logs-syslog-prod-000002";
    final String docEndpoint = "/" + dataStreamName + "/_doc";

    // Baseline of the stats counter; every later assertion on it is relative to this value.
    final int followedBefore = getNumberOfSuccessfulFollowedIndices();

    // Initialize data stream prior to auto following
    try (RestClient leaderClient = buildLeaderClient()) {
        for (int docIndex = 0; docIndex < initialNumDocs; docIndex++) {
            final Request indexRequest = new Request("POST", docEndpoint);
            indexRequest.addParameter("refresh", "true");
            indexRequest.setJsonEntity("{\"@timestamp\": \"" + DATE_FORMAT.format(new Date()) + "\",\"message\":\"abc\"}");
            assertOK(leaderClient.performRequest(indexRequest));
        }
        verifyDataStream(leaderClient, dataStreamName, firstBackingIndex);
        verifyDocuments(leaderClient, dataStreamName, initialNumDocs);
    }

    // Create auto follow pattern
    final Request putPatternRequest = new Request("PUT", "/_ccr/auto_follow/test_pattern");
    try (XContentBuilder patternBody = JsonXContent.contentBuilder()) {
        patternBody.startObject();
        patternBody.startArray("leader_index_patterns");
        patternBody.value("logs-*");
        patternBody.endArray();
        patternBody.field("remote_cluster", "leader_cluster");
        patternBody.endObject();
        putPatternRequest.setJsonEntity(Strings.toString(patternBody));
    }
    assertOK(client().performRequest(putPatternRequest));

    // Rollover and ensure only second backing index is replicated:
    try (RestClient leaderClient = buildLeaderClient()) {
        assertOK(leaderClient.performRequest(new Request("POST", "/" + dataStreamName + "/_rollover")));
        verifyDataStream(leaderClient, dataStreamName, firstBackingIndex, secondBackingIndex);

        final Request indexRequest = new Request("POST", docEndpoint);
        indexRequest.addParameter("refresh", "true");
        indexRequest.setJsonEntity("{\"@timestamp\": \"" + DATE_FORMAT.format(new Date()) + "\",\"message\":\"abc\"}");
        assertOK(leaderClient.performRequest(indexRequest));
        verifyDocuments(leaderClient, dataStreamName, initialNumDocs + 1);
    }
    assertBusy(() -> {
        assertThat(getNumberOfSuccessfulFollowedIndices(), equalTo(followedBefore + 1));
        verifyDataStream(client(), dataStreamName, secondBackingIndex);
        ensureYellow(dataStreamName);
        verifyDocuments(client(), dataStreamName, 1);
    });

    // Explicitly follow the first backing index and check that the data stream in follow cluster is updated correctly:
    followIndex(firstBackingIndex, firstBackingIndex);
    assertBusy(() -> {
        // The counter is expected to stay at +1 after the explicit follow request.
        assertThat(getNumberOfSuccessfulFollowedIndices(), equalTo(followedBefore + 1));
        verifyDataStream(client(), dataStreamName, firstBackingIndex, secondBackingIndex);
        ensureYellow(dataStreamName);
        verifyDocuments(client(), dataStreamName, initialNumDocs + 1);
    });

    // Cleanup:
    deleteAutoFollowPattern("test_pattern");
    deleteDataStream(dataStreamName);
}

/**
 * Reads {@code auto_follow_stats.number_of_successful_follow_indices} from the response of
 * {@code GET /_ccr/stats} issued through {@code client()}.
 *
 * @return the counter value reported in the stats response
 * @throws IOException if the request fails
 */
private int getNumberOfSuccessfulFollowedIndices() throws IOException {
    final Map<?, ?> ccrStats = toMap(client().performRequest(new Request("GET", "/_ccr/stats")));
    final Map<?, ?> autoFollowStats = (Map<?, ?>) ccrStats.get("auto_follow_stats");
    return (Integer) autoFollowStats.get("number_of_successful_follow_indices");
}

/**
 * Searches {@code index} with the given client and asserts that the reported total hit count
 * equals {@code expectedNumDocs}.
 *
 * @param client          client used to run the search
 * @param index           name placed in the search path; also used as the assertion reason
 * @param expectedNumDocs expected value of {@code hits.total}
 * @throws IOException if the search request fails
 */
private static void verifyDocuments(final RestClient client, final String index, final int expectedNumDocs) throws IOException {
    final Request searchRequest = new Request("GET", "/" + index + "/_search");
    // Request the total as a plain integer so it can be read directly from "hits.total".
    searchRequest.addParameter(TOTAL_HITS_AS_INT_PARAM, "true");
    final Map<String, ?> searchResponse = toMap(client.performRequest(searchRequest));

    final int actualNumDocs = (int) XContentMapValues.extractValue("hits.total", searchResponse);
    assertThat(index, actualNumDocs, equalTo(expectedNumDocs));
}

/**
 * Fetches {@code /_data_stream/<name>} with the given client and asserts that exactly one data
 * stream is returned whose {@code indices} entries carry the expected {@code index_name} values,
 * in the given order.
 *
 * @param client                 client used to fetch the data stream
 * @param name                   data stream name placed in the request path
 * @param expectedBackingIndices expected backing index names, in order
 * @throws IOException if the request fails
 */
static void verifyDataStream(final RestClient client,
                             final String name,
                             final String... expectedBackingIndices) throws IOException {
    final Map<String, ?> body = toMap(client.performRequest(new Request("GET", "/_data_stream/" + name)));
    final List<?> dataStreams = (List<?>) body.get("data_streams");
    assertThat(dataStreams, hasSize(1));

    final Map<?, ?> dataStream = (Map<?, ?>) dataStreams.get(0);
    final List<?> backingIndices = (List<?>) dataStream.get("indices");
    assertThat(backingIndices, hasSize(expectedBackingIndices.length));

    int position = 0;
    for (final String expectedName : expectedBackingIndices) {
        final Map<?, ?> backingIndex = (Map<?, ?>) backingIndices.get(position++);
        assertThat(backingIndex.get("index_name"), equalTo(expectedName));
    }
}

/**
 * Sends {@code DELETE /_data_stream/<name>} through the client returned by
 * {@code buildLeaderClient()} and asserts that the request succeeded.
 *
 * @param name data stream name placed in the request path
 * @throws IOException if the request fails or the client cannot be closed
 */
private void deleteDataStream(String name) throws IOException {
    try (RestClient leaderClient = buildLeaderClient()) {
        final Request deleteDataStreamRequest = new Request("DELETE", "/_data_stream/" + name);
        assertOK(leaderClient.performRequest(deleteDataStreamRequest));
    }
}

}
Loading

0 comments on commit ce75bad

Please sign in to comment.