Skip to content

Commit

Permalink
Migrate muted auto follow rolling upgrade test and unmute this test (e…
Browse files Browse the repository at this point in the history
…lastic#38900)

The rest of `CCRIT` is now no longer relevant, because the remaining
test tests the same of the index following test in the rolling upgrade
multi cluster module.

Added `tests.upgrade_from_version` version to test. It is not needed
in this branch, but is in 6.7 branch.

Closes elastic#37231
  • Loading branch information
martijnvg committed Feb 15, 2019
1 parent cb973c8 commit 12eac68
Show file tree
Hide file tree
Showing 2 changed files with 150 additions and 286 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.elasticsearch.client.RestClient;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.ObjectPath;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.rest.RestStatus;

Expand Down Expand Up @@ -90,6 +91,123 @@ public void testIndexFollowing() throws Exception {
}
}

public void testAutoFollowing() throws Exception {
String leaderIndex1 = "logs-20200101";
String leaderIndex2 = "logs-20200102";
String leaderIndex3 = "logs-20200103";

if (clusterName == ClusterName.LEADER) {
switch (upgradeState) {
case NONE:
case ONE_THIRD:
case TWO_THIRD:
break;
case ALL:
index(leaderClient(), leaderIndex1, 64);
assertBusy(() -> {
String followerIndex = "copy-" + leaderIndex1;
assertTotalHitCount(followerIndex, 320, followerClient());
});
index(leaderClient(), leaderIndex2, 64);
assertBusy(() -> {
String followerIndex = "copy-" + leaderIndex2;
assertTotalHitCount(followerIndex, 256, followerClient());
});
index(leaderClient(), leaderIndex3, 64);
assertBusy(() -> {
String followerIndex = "copy-" + leaderIndex3;
assertTotalHitCount(followerIndex, 192, followerClient());
});

deleteAutoFollowPattern(followerClient(), "test_pattern");
stopIndexFollowing(followerClient(), "copy-" + leaderIndex1);
stopIndexFollowing(followerClient(), "copy-" + leaderIndex2);
stopIndexFollowing(followerClient(), "copy-" + leaderIndex3);
break;
default:
throw new AssertionError("unexpected upgrade_state [" + upgradeState + "]");
}
} else if (clusterName == ClusterName.FOLLOWER) {
switch (upgradeState) {
case NONE:
putAutoFollowPattern(followerClient(), "test_pattern", "leader", "logs-*");
createLeaderIndex(leaderClient(), leaderIndex1);
index(leaderClient(), leaderIndex1, 64);
assertBusy(() -> {
String followerIndex = "copy-" + leaderIndex1;
assertThat(getNumberOfSuccessfulFollowedIndices(), equalTo(1));
assertTotalHitCount(followerIndex, 64, followerClient());
});
break;
case ONE_THIRD:
index(leaderClient(), leaderIndex1, 64);
assertBusy(() -> {
String followerIndex = "copy-" + leaderIndex1;
assertTotalHitCount(followerIndex, 128, followerClient());
});
// Auto follow stats are kept in-memory on master elected node
// and if this node get updated then auto follow stats are reset
{
int previousNumberOfSuccessfulFollowedIndices = getNumberOfSuccessfulFollowedIndices();
createLeaderIndex(leaderClient(), leaderIndex2);
index(leaderClient(), leaderIndex2, 64);
assertBusy(() -> {
String followerIndex = "copy-" + leaderIndex2;
assertThat(getNumberOfSuccessfulFollowedIndices(), equalTo(previousNumberOfSuccessfulFollowedIndices + 1));
assertTotalHitCount(followerIndex, 64, followerClient());
});
}
break;
case TWO_THIRD:
index(leaderClient(), leaderIndex1, 64);
assertBusy(() -> {
String followerIndex = "copy-" + leaderIndex1;
assertTotalHitCount(followerIndex, 192, followerClient());
});
index(leaderClient(), leaderIndex2, 64);
assertBusy(() -> {
String followerIndex = "copy-" + leaderIndex2;
assertTotalHitCount(followerIndex, 128, followerClient());
});

// Auto follow stats are kept in-memory on master elected node
// and if this node get updated then auto follow stats are reset
{
int previousNumberOfSuccessfulFollowedIndices = getNumberOfSuccessfulFollowedIndices();
createLeaderIndex(leaderClient(), leaderIndex3);
index(leaderClient(), leaderIndex3, 64);
assertBusy(() -> {
String followerIndex = "copy-" + leaderIndex3;
assertThat(getNumberOfSuccessfulFollowedIndices(), equalTo(previousNumberOfSuccessfulFollowedIndices + 1));
assertTotalHitCount(followerIndex, 64, followerClient());
});
}
break;
case ALL:
index(leaderClient(), leaderIndex1, 64);
assertBusy(() -> {
String followerIndex = "copy-" + leaderIndex1;
assertTotalHitCount(followerIndex, 256, followerClient());
});
index(leaderClient(), leaderIndex2, 64);
assertBusy(() -> {
String followerIndex = "copy-" + leaderIndex2;
assertTotalHitCount(followerIndex, 192, followerClient());
});
index(leaderClient(), leaderIndex3, 64);
assertBusy(() -> {
String followerIndex = "copy-" + leaderIndex3;
assertTotalHitCount(followerIndex, 128, followerClient());
});
break;
default:
throw new UnsupportedOperationException("unexpected upgrade state [" + upgradeState + "]");
}
} else {
throw new AssertionError("unexpected cluster_name [" + clusterName + "]");
}
}

public void testCannotFollowLeaderInUpgradedCluster() throws Exception {
assumeTrue("Tests only runs with upgrade_state [all]", upgradeState == UpgradeState.ALL);
assumeTrue("Put follow api does not restore from ccr repository before 6.7.0",
Expand Down Expand Up @@ -117,12 +235,11 @@ public void testCannotFollowLeaderInUpgradedCluster() throws Exception {
}

private static void createLeaderIndex(RestClient client, String indexName) throws IOException {
Settings indexSettings = Settings.builder()
Settings.Builder indexSettings = Settings.builder()
.put("index.soft_deletes.enabled", true)
.put("index.number_of_shards", 1)
.put("index.number_of_replicas", 0)
.build();
createIndex(client, indexName, indexSettings);
.put("index.number_of_replicas", 0);
createIndex(client, indexName, indexSettings.build());
}

private static void createIndex(RestClient client, String name, Settings settings) throws IOException {
Expand All @@ -138,6 +255,29 @@ private static void followIndex(RestClient client, String leaderCluster, String
assertOK(client.performRequest(request));
}

private static void putAutoFollowPattern(RestClient client, String name, String remoteCluster, String pattern) throws IOException {
Request request = new Request("PUT", "/_ccr/auto_follow/" + name);
request.setJsonEntity("{\"leader_index_patterns\": [\"" + pattern + "\"], \"remote_cluster\": \"" + remoteCluster + "\"," +
"\"follow_index_pattern\": \"copy-{{leader_index}}\", \"read_poll_timeout\": \"10ms\"}");
assertOK(client.performRequest(request));
}

private static void deleteAutoFollowPattern(RestClient client, String patternName) throws IOException {
Request request = new Request("DELETE", "/_ccr/auto_follow/" + patternName);
assertOK(client.performRequest(request));
}

private int getNumberOfSuccessfulFollowedIndices() throws IOException {
Request statsRequest = new Request("GET", "/_ccr/stats");
Map<?, ?> response = toMap(client().performRequest(statsRequest));
Integer actualSuccessfulFollowedIndices = ObjectPath.eval("auto_follow_stats.number_of_successful_follow_indices", response);
if (actualSuccessfulFollowedIndices != null) {
return actualSuccessfulFollowedIndices;
} else {
return -1;
}
}

private static void index(RestClient client, String index, int numDocs) throws IOException {
for (int i = 0; i < numDocs; i++) {
final Request request = new Request("POST", "/" + index + "/_doc/");
Expand Down Expand Up @@ -171,4 +311,10 @@ private static void verifyTotalHitCount(final String index,
assertThat(totalHits, equalTo(expectedTotalHits));
}

private static void stopIndexFollowing(RestClient client, String followerIndex) throws IOException {
assertOK(client.performRequest(new Request("POST", "/" + followerIndex + "/_ccr/pause_follow")));
assertOK(client.performRequest(new Request("POST", "/" + followerIndex + "/_close")));
assertOK(client.performRequest(new Request("POST", "/" + followerIndex + "/_ccr/unfollow")));
}

}
Loading

0 comments on commit 12eac68

Please sign in to comment.