Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CCR] Make auto follow patterns work with security #33501

Merged
merged 16 commits into from
Sep 17, 2018
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion x-pack/plugin/ccr/qa/multi-cluster-with-security/roles.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ ccruser:
cluster:
- manage_ccr
indices:
- names: [ 'allowed-index' ]
- names: [ 'allowed-index', 'logs-eu-*' ]
privileges:
- monitor
- read
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
*/
package org.elasticsearch.xpack.ccr;

import org.apache.http.HttpHost;
import org.apache.http.util.EntityUtils;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
Expand Down Expand Up @@ -119,6 +120,45 @@ public void testFollowIndex() throws Exception {
}
}

public void testAutoFollowPatterns() throws Exception {
assumeFalse("Test should only run when both clusters are running", runningAgainstLeaderCluster);
String allowedIndex = "logs-eu-20190101";
String disallowedIndex = "logs-us-20190101";

Request request = new Request("PUT", "/_ccr/auto_follow/leader_cluster");
request.setJsonEntity("{\"leader_index_patterns\": [\"logs-*\"]}");
assertOK(client().performRequest(request));

try (RestClient leaderClient = buildLeaderClient()) {
for (String index : new String[]{allowedIndex, disallowedIndex}) {
Settings settings = Settings.builder()
.put("index.soft_deletes.enabled", true)
.build();
String requestBody = "{\"settings\": " + Strings.toString(settings) +
", \"mappings\": {\"_doc\": {\"properties\": {\"field\": {\"type\": \"keyword\"}}}} }";
request = new Request("PUT", "/" + index);
request.setJsonEntity(requestBody);
assertOK(leaderClient.performRequest(request));

for (int i = 0; i < 5; i++) {
String id = Integer.toString(i);
index(leaderClient, index, id, "field", i, "filtered_field", "true");
}
}
}

assertBusy(() -> {
ensureYellow(allowedIndex);
verifyDocuments(adminClient(), allowedIndex, 5);
});
assertThat(indexExists(adminClient(), disallowedIndex), is(false));

// Cleanup by deleting auto follow pattern and unfollowing:
request = new Request("DELETE", "/_ccr/auto_follow/leader_cluster");
assertOK(client().performRequest(request));
unfollowIndex(allowedIndex);
}

private int countCcrNodeTasks() throws IOException {
final Request request = new Request("GET", "/_tasks");
request.addParameter("detailed", "true");
Expand All @@ -139,14 +179,18 @@ private int countCcrNodeTasks() throws IOException {
}

private static void index(String index, String id, Object... fields) throws IOException {
index(adminClient(), index, id, fields);
}

private static void index(RestClient client, String index, String id, Object... fields) throws IOException {
XContentBuilder document = jsonBuilder().startObject();
for (int i = 0; i < fields.length; i += 2) {
document.field((String) fields[i], fields[i + 1]);
}
document.endObject();
final Request request = new Request("POST", "/" + index + "/_doc/" + id);
request.setJsonEntity(Strings.toString(document));
assertOK(adminClient().performRequest(request));
assertOK(client.performRequest(request));
}

private static void refresh(String index) throws IOException {
Expand Down Expand Up @@ -201,11 +245,34 @@ protected static void createIndex(String name, Settings settings, String mapping
assertOK(adminClient().performRequest(request));
}

private static void ensureYellow(String index) throws IOException {
Request request = new Request("GET", "/_cluster/health/" + index);
request.addParameter("wait_for_status", "yellow");
request.addParameter("wait_for_no_relocating_shards", "true");
request.addParameter("wait_for_no_initializing_shards", "true");
request.addParameter("timeout", "70s");
request.addParameter("level", "shards");
adminClient().performRequest(request);
}

private RestClient buildLeaderClient() throws IOException {
assert runningAgainstLeaderCluster == false;
String leaderUrl = System.getProperty("tests.leader_host");
int portSeparator = leaderUrl.lastIndexOf(':');
HttpHost httpHost = new HttpHost(leaderUrl.substring(0, portSeparator),
Integer.parseInt(leaderUrl.substring(portSeparator + 1)), getProtocol());
return buildClient(restAdminSettings(), new HttpHost[]{httpHost});
}

private static boolean indexExists(RestClient client, String index) throws IOException {
Response response = client.performRequest(new Request("HEAD", "/" + index));
return RestStatus.OK.getStatus() == response.getStatusLine().getStatusCode();
}

private static void unfollowIndex(String followIndex) throws IOException {
assertOK(client().performRequest(new Request("POST", "/" + followIndex + "/_ccr/unfollow")));
}

private static void verifyCcrMonitoring(String expectedLeaderIndex, String expectedFollowerIndex) throws IOException {
ensureYellow(".monitoring-*");

Expand Down Expand Up @@ -239,14 +306,4 @@ private static void verifyCcrMonitoring(String expectedLeaderIndex, String expec
assertThat(numberOfOperationsIndexed, greaterThanOrEqualTo(1));
}

private static void ensureYellow(String index) throws IOException {
Request request = new Request("GET", "/_cluster/health/" + index);
request.addParameter("wait_for_status", "yellow");
request.addParameter("wait_for_no_relocating_shards", "true");
request.addParameter("wait_for_no_initializing_shards", "true");
request.addParameter("timeout", "70s");
request.addParameter("level", "shards");
adminClient().performRequest(request);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -7,33 +7,43 @@
package org.elasticsearch.xpack.ccr;

import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.action.support.ContextPreservingActionListener;
import org.elasticsearch.action.admin.indices.stats.IndexShardStats;
import org.elasticsearch.action.admin.indices.stats.IndexStats;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequest;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
import org.elasticsearch.action.admin.indices.stats.ShardStats;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.FilterClient;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.common.CheckedConsumer;
import org.elasticsearch.index.engine.CommitStats;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.license.RemoteClusterLicenseChecker;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.xpack.ccr.action.ShardFollowTask;
import org.elasticsearch.xpack.core.XPackPlugin;

import java.util.Collections;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.function.BiConsumer;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;

/**
* Encapsulates licensing checking for CCR.
Expand Down Expand Up @@ -93,6 +103,7 @@ public <T> void checkRemoteClusterLicenseAndFetchLeaderIndexMetadataAndHistoryUU
request.indices(leaderIndex);
checkRemoteClusterLicenseAndFetchClusterState(
client,
Collections.emptyMap(),
clusterAlias,
request,
onFailure,
Expand All @@ -115,19 +126,22 @@ public <T> void checkRemoteClusterLicenseAndFetchLeaderIndexMetadataAndHistoryUU
*
* @param client the client
* @param clusterAlias the remote cluster alias
* @param headers the headers to use for leader client
* @param request the cluster state request
* @param onFailure the failure consumer
* @param leaderClusterStateConsumer the leader cluster state consumer
* @param <T> the type of response the listener is waiting for
*/
public <T> void checkRemoteClusterLicenseAndFetchClusterState(
final Client client,
final Map<String, String> headers,
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not super happy with the changes made to this file. Maybe the caller should provide the leaderClient? So that headers and clusterAlias does not need to be provided?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As we discussed offline, this is okay for now, we can always refactor this later. The implementation is fine, we just want to take a step back and look at how ML uses the remote license checker too (multiple clusters) which has kind of forced how we implement this.

final String clusterAlias,
final ClusterStateRequest request,
final Consumer<Exception> onFailure,
final Consumer<ClusterState> leaderClusterStateConsumer) {
checkRemoteClusterLicenseAndFetchClusterState(
client,
headers,
clusterAlias,
request,
onFailure,
Expand All @@ -144,6 +158,7 @@ public <T> void checkRemoteClusterLicenseAndFetchClusterState(
*
* @param client the client
* @param clusterAlias the remote cluster alias
* @param headers the headers to use for leader client
* @param request the cluster state request
* @param onFailure the failure consumer
* @param leaderClusterStateConsumer the leader cluster state consumer
Expand All @@ -153,6 +168,7 @@ public <T> void checkRemoteClusterLicenseAndFetchClusterState(
*/
private <T> void checkRemoteClusterLicenseAndFetchClusterState(
final Client client,
final Map<String, String> headers,
final String clusterAlias,
final ClusterStateRequest request,
final Consumer<Exception> onFailure,
Expand All @@ -167,7 +183,7 @@ private <T> void checkRemoteClusterLicenseAndFetchClusterState(
@Override
public void onResponse(final RemoteClusterLicenseChecker.LicenseCheck licenseCheck) {
if (licenseCheck.isSuccess()) {
final Client leaderClient = client.getRemoteClusterClient(clusterAlias);
final Client leaderClient = wrapClient(client.getRemoteClusterClient(clusterAlias), headers);
final ActionListener<ClusterStateResponse> clusterStateListener =
ActionListener.wrap(s -> leaderClusterStateConsumer.accept(s.getState()), onFailure);
// following an index in remote cluster, so use remote client to fetch leader index metadata
Expand Down Expand Up @@ -237,6 +253,33 @@ public void fetchLeaderHistoryUUIDs(
leaderClient.admin().indices().stats(request, ActionListener.wrap(indicesStatsHandler, onFailure));
}

public static Client wrapClient(Client client, Map<String, String> headers) {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moved it here, because I did not want that this code depends on ccr action code.

if (headers.isEmpty()) {
return client;
} else {
final ThreadContext threadContext = client.threadPool().getThreadContext();
Map<String, String> filteredHeaders = headers.entrySet().stream()
.filter(e -> ShardFollowTask.HEADER_FILTERS.contains(e.getKey()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
return new FilterClient(client) {
@Override
protected <Request extends ActionRequest, Response extends ActionResponse>
void doExecute(Action<Response> action, Request request, ActionListener<Response> listener) {
final Supplier<ThreadContext.StoredContext> supplier = threadContext.newRestorableContext(false);
try (ThreadContext.StoredContext ignore = stashWithHeaders(threadContext, filteredHeaders)) {
super.doExecute(action, request, new ContextPreservingActionListener<>(supplier, listener));
}
}
};
}
}

private static ThreadContext.StoredContext stashWithHeaders(ThreadContext threadContext, Map<String, String> headers) {
final ThreadContext.StoredContext storedContext = threadContext.stashContext();
threadContext.copyHeaders(headers.entrySet());
return storedContext;
}

private static ElasticsearchStatusException indexMetadataNonCompliantRemoteLicense(
final String leaderIndex, final RemoteClusterLicenseChecker.LicenseCheck licenseCheck) {
final String clusterAlias = licenseCheck.remoteClusterLicenseInfo().clusterAlias();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,19 +103,22 @@ private void doAutoFollow() {
AutoFollower operation = new AutoFollower(handler, followerClusterState) {

@Override
void getLeaderClusterState(final String leaderClusterAlias, final BiConsumer<ClusterState, Exception> handler) {
void getLeaderClusterState(final Map<String, String> headers,
final String leaderClusterAlias,
final BiConsumer<ClusterState, Exception> handler) {
final ClusterStateRequest request = new ClusterStateRequest();
request.clear();
request.metaData(true);

if ("_local_".equals(leaderClusterAlias)) {
Client client = CcrLicenseChecker.wrapClient(AutoFollowCoordinator.this.client, headers);
client.admin().cluster().state(
request, ActionListener.wrap(r -> handler.accept(r.getState(), null), e -> handler.accept(null, e)));
} else {
final Client leaderClient = client.getRemoteClusterClient(leaderClusterAlias);
// TODO: set non-compliant status on auto-follow coordination that can be viewed via a stats API
ccrLicenseChecker.checkRemoteClusterLicenseAndFetchClusterState(
leaderClient,
client,
headers,
leaderClusterAlias,
request,
e -> handler.accept(null, e),
Expand All @@ -125,15 +128,22 @@ void getLeaderClusterState(final String leaderClusterAlias, final BiConsumer<Clu
}

@Override
void createAndFollow(FollowIndexAction.Request followRequest,
void createAndFollow(Map<String, String> headers,
FollowIndexAction.Request followRequest,
Runnable successHandler,
Consumer<Exception> failureHandler) {
client.execute(CreateAndFollowIndexAction.INSTANCE, new CreateAndFollowIndexAction.Request(followRequest),
ActionListener.wrap(r -> successHandler.run(), failureHandler));
Client followerClient = CcrLicenseChecker.wrapClient(client, headers);
CreateAndFollowIndexAction.Request request = new CreateAndFollowIndexAction.Request(followRequest);
followerClient.execute(
CreateAndFollowIndexAction.INSTANCE,
request,
ActionListener.wrap(r -> successHandler.run(), failureHandler)
);
}

@Override
void updateAutoFollowMetadata(Function<ClusterState, ClusterState> updateFunction, Consumer<Exception> handler) {
void updateAutoFollowMetadata(Function<ClusterState, ClusterState> updateFunction,
Consumer<Exception> handler) {
clusterService.submitStateUpdateTask("update_auto_follow_metadata", new ClusterStateUpdateTask() {

@Override
Expand Down Expand Up @@ -188,7 +198,7 @@ void autoFollowIndices() {
AutoFollowPattern autoFollowPattern = entry.getValue();
List<String> followedIndices = autoFollowMetadata.getFollowedLeaderIndexUUIDs().get(clusterAlias);

getLeaderClusterState(clusterAlias, (leaderClusterState, e) -> {
getLeaderClusterState(autoFollowPattern.getHeaders(), clusterAlias, (leaderClusterState, e) -> {
if (leaderClusterState != null) {
assert e == null;
handleClusterAlias(clusterAlias, autoFollowPattern, followedIndices, leaderClusterState);
Expand Down Expand Up @@ -251,7 +261,7 @@ private void handleClusterAlias(String clusterAlias, AutoFollowPattern autoFollo
finalise(followError);
}
};
createAndFollow(followRequest, successHandler, failureHandler);
createAndFollow(autoFollowPattern.getHeaders(), followRequest, successHandler, failureHandler);
}
}
}
Expand Down Expand Up @@ -314,14 +324,27 @@ static Function<ClusterState, ClusterState> recordLeaderIndexAsFollowFunction(St
/**
* Fetch the cluster state from the leader with the specified cluster alias
*
* @param headers the client headers
* @param leaderClusterAlias the cluster alias of the leader
* @param handler the callback to invoke
*/
abstract void getLeaderClusterState(String leaderClusterAlias, BiConsumer<ClusterState, Exception> handler);

abstract void createAndFollow(FollowIndexAction.Request followRequest, Runnable successHandler, Consumer<Exception> failureHandler);

abstract void updateAutoFollowMetadata(Function<ClusterState, ClusterState> updateFunction, Consumer<Exception> handler);
abstract void getLeaderClusterState(
Map<String, String> headers,
String leaderClusterAlias,
BiConsumer<ClusterState, Exception> handler
);

abstract void createAndFollow(
Map<String, String> headers,
FollowIndexAction.Request followRequest,
Runnable successHandler,
Consumer<Exception> failureHandler
);

abstract void updateAutoFollowMetadata(
Function<ClusterState, ClusterState> updateFunction,
Consumer<Exception> handler
);

}
}
Loading