Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Backport support for replicating closed indices to 7.x #39506

Merged
merged 20 commits into from
Mar 1, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
4c17697
[RCI] Add NoOpEngine for closed indices (#33903)
tlrx Sep 26, 2018
9eab2c7
[RCI] Adapt NoOpEngine to latest FrozenEngine changes
tlrx Nov 8, 2018
f3232f3
Relax NoOpEngine constraints (#37413)
tlrx Jan 23, 2019
a4ee55c
Fix compilation error in IndexShardIT after merge with master
tlrx Jan 29, 2019
7c6f8e9
Allow shards of closed indices to be replicated as regular shards (#3…
tlrx Feb 4, 2019
37e5e88
Adapt testPendingTasks() for replicated closed indices (#38326)
tlrx Feb 11, 2019
224ee2e
Adapt testIndexCanChangeCustomDataPath for replicated closed indices …
tlrx Feb 11, 2019
7a0c02e
Do not schedule Refresh/Translog/GlobalCheckpoint tasks for closed in…
tlrx Feb 11, 2019
c56d9bc
Mute CloseFollowerIndexIT.testCloseAndReopenFollowerIndex()
tlrx Feb 15, 2019
b28acaa
Remove index routing table of closed indices in mixed versions cluste…
tlrx Feb 18, 2019
4ec4db2
Test the Cluster Shard Allocation Explain API with closed indices (#3…
tlrx Feb 18, 2019
4d080e9
Add replica to primary promotion test for closed indices (#39110)
tlrx Feb 21, 2019
855d83e
Adapt more tests suites to closed indices (#39186)
tlrx Feb 22, 2019
95d9e9a
Adapt the Recovery API for closed indices (#38421)
tlrx Feb 25, 2019
dfe8463
Recover closed indices after a full cluster restart (#39249)
tlrx Feb 26, 2019
6fc3590
Adapt CloseFollowerIndexIT for replicated closed indices (#38767)
tlrx Feb 26, 2019
2b0ee91
Adapt the Cluster Health API to closed indices (#39364)
tlrx Feb 26, 2019
719d905
Wait for shards to be active after closing indices (#38854)
tlrx Feb 26, 2019
fa346c6
(7.x) BWC layer for replicated closed indices
tlrx Feb 27, 2019
f8be7f0
Apply feedback
tlrx Mar 1, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.WarningFailureException;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaDataIndexStateService;
import org.elasticsearch.common.Booleans;
import org.elasticsearch.common.CheckedFunction;
import org.elasticsearch.common.Strings;
Expand All @@ -47,6 +48,7 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Base64;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
Expand All @@ -65,8 +67,11 @@
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.Matchers.startsWith;

/**
Expand Down Expand Up @@ -1022,8 +1027,98 @@ public void testSoftDeletes() throws Exception {
}
}

private void checkSnapshot(final String snapshotName, final int count, final Version tookOnVersion)
throws IOException {
/**
* This test creates an index in the old cluster and then closes it. When the cluster is fully restarted in a newer version,
* it verifies that the index exists and is replicated if the old version supports replication.
*/
public void testClosedIndices() throws Exception {
if (isRunningAgainstOldCluster()) {
createIndex(index, Settings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
.build());
ensureGreen(index);

int numDocs = 0;
if (randomBoolean()) {
numDocs = between(1, 100);
for (int i = 0; i < numDocs; i++) {
final Request request = new Request("POST", "/" + index + "/_doc/" + i);
request.setJsonEntity(Strings.toString(JsonXContent.contentBuilder().startObject().field("field", "v1").endObject()));
assertOK(client().performRequest(request));
if (rarely()) {
refresh();
}
}
refresh();
}

assertTotalHits(numDocs, entityAsMap(client().performRequest(new Request("GET", "/" + index + "/_search"))));
saveInfoDocument(index + "_doc_count", Integer.toString(numDocs));
closeIndex(index);
}

if (getOldClusterVersion().onOrAfter(Version.V_7_1_0)) {
ensureGreenLongWait(index);
assertClosedIndex(index, true);
} else {
assertClosedIndex(index, false);
}

if (isRunningAgainstOldCluster() == false) {
openIndex(index);
ensureGreen(index);

final int expectedNumDocs = Integer.parseInt(loadInfoDocument(index + "_doc_count"));
assertTotalHits(expectedNumDocs, entityAsMap(client().performRequest(new Request("GET", "/" + index + "/_search"))));
}
}

/**
* Asserts that an index is closed in the cluster state. If `checkRoutingTable` is true, it also asserts
* that the index has started shards.
*/
@SuppressWarnings("unchecked")
private void assertClosedIndex(final String index, final boolean checkRoutingTable) throws IOException {
final Map<String, ?> state = entityAsMap(client().performRequest(new Request("GET", "/_cluster/state")));

final Map<String, ?> metadata = (Map<String, Object>) XContentMapValues.extractValue("metadata.indices." + index, state);
assertThat(metadata, notNullValue());
assertThat(metadata.get("state"), equalTo("close"));

final Map<String, ?> blocks = (Map<String, Object>) XContentMapValues.extractValue("blocks.indices." + index, state);
assertThat(blocks, notNullValue());
assertThat(blocks.containsKey(String.valueOf(MetaDataIndexStateService.INDEX_CLOSED_BLOCK_ID)), is(true));

final Map<String, ?> settings = (Map<String, Object>) XContentMapValues.extractValue("settings", metadata);
assertThat(settings, notNullValue());

final Map<String, ?> routingTable = (Map<String, Object>) XContentMapValues.extractValue("routing_table.indices." + index, state);
if (checkRoutingTable) {
assertThat(routingTable, notNullValue());
assertThat(Booleans.parseBoolean((String) XContentMapValues.extractValue("index.verified_before_close", settings)), is(true));
final String numberOfShards = (String) XContentMapValues.extractValue("index.number_of_shards", settings);
assertThat(numberOfShards, notNullValue());
final int nbShards = Integer.parseInt(numberOfShards);
assertThat(nbShards, greaterThanOrEqualTo(1));

for (int i = 0; i < nbShards; i++) {
final Collection<Map<String, ?>> shards =
(Collection<Map<String, ?>>) XContentMapValues.extractValue("shards." + i, routingTable);
assertThat(shards, notNullValue());
assertThat(shards.size(), equalTo(2));
for (Map<String, ?> shard : shards) {
assertThat(XContentMapValues.extractValue("shard", shard), equalTo(i));
assertThat(XContentMapValues.extractValue("state", shard), equalTo("STARTED"));
assertThat(XContentMapValues.extractValue("index", shard), equalTo(index));
}
}
} else {
assertThat(routingTable, nullValue());
assertThat(XContentMapValues.extractValue("index.verified_before_close", settings), nullValue());
}
}

private void checkSnapshot(final String snapshotName, final int count, final Version tookOnVersion) throws IOException {
// Check the snapshot metadata, especially the version
Request listSnapshotRequest = new Request("GET", "/_snapshot/repo/" + snapshotName);
Map<String, Object> listSnapshotResponse = entityAsMap(client().performRequest(listSnapshotRequest));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,21 @@
import org.elasticsearch.client.Response;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaDataIndexStateService;
import org.elasticsearch.common.Booleans;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.rest.action.document.RestIndexAction;
import org.elasticsearch.test.rest.yaml.ObjectPath;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.Future;
import java.util.function.Predicate;
Expand All @@ -43,7 +49,9 @@
import static org.elasticsearch.cluster.routing.allocation.decider.MaxRetryAllocationDecider.SETTING_ALLOCATION_MAX_RETRY;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;

/**
* In depth testing of the recovery mechanism during a rolling restart.
Expand Down Expand Up @@ -310,4 +318,148 @@ public void testRecoveryWithSoftDeletes() throws Exception {
}
ensureGreen(index);
}

/**
* This test creates an index in the non upgraded cluster and closes it. It then checks that the index
* is effectively closed and potentially replicated (if the version the index was created on supports
* the replication of closed indices) during the rolling upgrade.
*/
public void testRecoveryClosedIndex() throws Exception {
final String indexName = "closed_index_created_on_old";
if (CLUSTER_TYPE == ClusterType.OLD) {
createIndex(indexName, Settings.builder()
.put(IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1)
.put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 1)
// if the node with the replica is the first to be restarted, while a replica is still recovering
// then delayed allocation will kick in. When the node comes back, the master will search for a copy
// but the recovering copy will be seen as invalid and the cluster health won't return to GREEN
// before timing out
.put(INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), "100ms")
.put(SETTING_ALLOCATION_MAX_RETRY.getKey(), "0") // fail faster
.build());
ensureGreen(indexName);
closeIndex(indexName);
}

final Version indexVersionCreated = indexVersionCreated(indexName);
if (indexVersionCreated.onOrAfter(Version.V_7_1_0)) {
// index was created on a version that supports the replication of closed indices,
// so we expect the index to be closed and replicated
ensureGreen(indexName);
assertClosedIndex(indexName, true);
} else {
assertClosedIndex(indexName, false);
}
}

/**
* This test creates and closes a new index at every stage of the rolling upgrade. It then checks that the index
* is effectively closed and potentially replicated if the cluster supports replication of closed indices at the
* time the index was closed.
*/
public void testCloseIndexDuringRollingUpgrade() throws Exception {
final Version minimumNodeVersion = minimumNodeVersion();
final String indexName =
String.join("_", "index", CLUSTER_TYPE.toString(), Integer.toString(minimumNodeVersion.id)).toLowerCase(Locale.ROOT);

final Request indexExistsRequest = new Request("HEAD", "/" + indexName);
indexExistsRequest.setOptions(allowTypesRemovalWarnings());

final Response indexExistsResponse = client().performRequest(indexExistsRequest);
if (RestStatus.OK.getStatus() != indexExistsResponse.getStatusLine().getStatusCode()) {
createIndex(indexName, Settings.builder()
.put(IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1)
.put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 0)
.build());
ensureGreen(indexName);
closeIndex(indexName);
}

if (minimumNodeVersion.onOrAfter(Version.V_7_1_0)) {
// index is created on a version that supports the replication of closed indices,
// so we expect the index to be closed and replicated
ensureGreen(indexName);
assertClosedIndex(indexName, true);
} else {
assertClosedIndex(indexName, false);
}
}

/**
* Returns the version in which the given index has been created
*/
private static Version indexVersionCreated(final String indexName) throws IOException {
final Request request = new Request("GET", "/" + indexName + "/_settings");
final String versionCreatedSetting = indexName + ".settings.index.version.created";
request.addParameter("filter_path", versionCreatedSetting);

final Response response = client().performRequest(request);
return Version.fromId(Integer.parseInt(ObjectPath.createFromResponse(response).evaluate(versionCreatedSetting)));
}

/**
* Returns the minimum node version among all nodes of the cluster
*/
private static Version minimumNodeVersion() throws IOException {
final Request request = new Request("GET", "_nodes");
request.addParameter("filter_path", "nodes.*.version");

final Response response = client().performRequest(request);
final Map<String, Object> nodes = ObjectPath.createFromResponse(response).evaluate("nodes");

Version minVersion = null;
for (Map.Entry<String, Object> node : nodes.entrySet()) {
@SuppressWarnings("unchecked")
Version nodeVersion = Version.fromString((String) ((Map<String, Object>) node.getValue()).get("version"));
if (minVersion == null || minVersion.after(nodeVersion)) {
minVersion = nodeVersion;
}
}
assertNotNull(minVersion);
return minVersion;
}

/**
* Asserts that an index is closed in the cluster state. If `checkRoutingTable` is true, it also asserts
* that the index has started shards.
*/
@SuppressWarnings("unchecked")
private void assertClosedIndex(final String index, final boolean checkRoutingTable) throws IOException {
final Map<String, ?> state = entityAsMap(client().performRequest(new Request("GET", "/_cluster/state")));

final Map<String, ?> metadata = (Map<String, Object>) XContentMapValues.extractValue("metadata.indices." + index, state);
assertThat(metadata, notNullValue());
assertThat(metadata.get("state"), equalTo("close"));

final Map<String, ?> blocks = (Map<String, Object>) XContentMapValues.extractValue("blocks.indices." + index, state);
assertThat(blocks, notNullValue());
assertThat(blocks.containsKey(String.valueOf(MetaDataIndexStateService.INDEX_CLOSED_BLOCK_ID)), is(true));

final Map<String, ?> settings = (Map<String, Object>) XContentMapValues.extractValue("settings", metadata);
assertThat(settings, notNullValue());

final int numberOfShards = Integer.parseInt((String) XContentMapValues.extractValue("index.number_of_shards", settings));
final int numberOfReplicas = Integer.parseInt((String) XContentMapValues.extractValue("index.number_of_replicas", settings));

final Map<String, ?> routingTable = (Map<String, Object>) XContentMapValues.extractValue("routing_table.indices." + index, state);
if (checkRoutingTable) {
assertThat(routingTable, notNullValue());
assertThat(Booleans.parseBoolean((String) XContentMapValues.extractValue("index.verified_before_close", settings)), is(true));

for (int i = 0; i < numberOfShards; i++) {
final Collection<Map<String, ?>> shards =
(Collection<Map<String, ?>>) XContentMapValues.extractValue("shards." + i, routingTable);
assertThat(shards, notNullValue());
assertThat(shards.size(), equalTo(numberOfReplicas + 1));
for (Map<String, ?> shard : shards) {
assertThat(XContentMapValues.extractValue("shard", shard), equalTo(i));
assertThat(XContentMapValues.extractValue("state", shard), equalTo("STARTED"));
assertThat(XContentMapValues.extractValue("index", shard), equalTo(index));
}
}
} else {
assertThat(routingTable, nullValue());
assertThat(XContentMapValues.extractValue("index.verified_before_close", settings), nullValue());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,12 @@
}
},
"params": {
"expand_wildcards": {
"type" : "enum",
"options" : ["open","closed","none","all"],
"default" : "all",
"description" : "Whether to expand wildcard expression to concrete indices that are open, closed or both."
},
"level": {
"type" : "enum",
"options" : ["cluster","indices","shards"],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@
"options" : ["open","closed","none","all"],
"default" : "open",
"description" : "Whether to expand wildcard expression to concrete indices that are open, closed or both."
},
"wait_for_active_shards": {
"type" : "string",
"description" : "Sets the number of active shards to wait for before the operation returns."
}
}
},
Expand Down
Loading