Skip to content

Commit 20c6be9

Browse files
authored
Add pre-upgrade check to test cluster routing allocation is enabled (#39340) (#39817)
When following the steps mentioned in upgrade guide https://www.elastic.co/guide/en/elastic-stack/6.6/upgrading-elastic-stack.html if we disable the cluster shard allocation but fail to enable it after upgrading the nodes and plugins, the next step of upgrading internal indices fails. As we did not check the bulk request response for reindexing, we delete the old index assuming it has been created. This is fatal as we cannot recover from this state. This commit adds a pre-upgrade check to test the cluster shard allocation setting and fail upgrade if it is disabled. In case there are search or bulk failures then we remove the read-only block and fail the upgrade index request. Closes #39339
1 parent b0ff4d6 commit 20c6be9

File tree

7 files changed

+168
-57
lines changed

7 files changed

+168
-57
lines changed

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/upgrade/IndexUpgradeCheckVersion.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
package org.elasticsearch.xpack.core.upgrade;
77

88
public final class IndexUpgradeCheckVersion {
9-
public static final int UPRADE_VERSION = 6;
9+
public static final int UPGRADE_VERSION = 6;
1010

1111
private IndexUpgradeCheckVersion() {}
1212

x-pack/plugin/upgrade/src/main/java/org/elasticsearch/xpack/upgrade/IndexUpgradeCheck.java

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,13 @@
55
*/
66
package org.elasticsearch.xpack.upgrade;
77

8+
import org.elasticsearch.ElasticsearchException;
89
import org.elasticsearch.action.ActionListener;
910
import org.elasticsearch.client.Client;
1011
import org.elasticsearch.cluster.ClusterState;
1112
import org.elasticsearch.cluster.metadata.IndexMetaData;
13+
import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
14+
import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider.Allocation;
1215
import org.elasticsearch.cluster.service.ClusterService;
1316
import org.elasticsearch.common.component.AbstractComponent;
1417
import org.elasticsearch.index.reindex.BulkByScrollResponse;
@@ -19,7 +22,6 @@
1922
import org.elasticsearch.xpack.core.upgrade.IndexUpgradeCheckVersion;
2023

2124
import java.util.function.BiConsumer;
22-
import java.util.function.Consumer;
2325
import java.util.function.Function;
2426

2527
/**
@@ -52,7 +54,17 @@ public IndexUpgradeCheck(String name,
5254
Function<IndexMetaData, UpgradeActionRequired> actionRequired,
5355
Client client, ClusterService clusterService, String[] types, Script updateScript) {
5456
this(name, actionRequired, client, clusterService, types, updateScript,
55-
listener -> listener.onResponse(null), (t, listener) -> listener.onResponse(TransportResponse.Empty.INSTANCE));
57+
(cs, listener) -> {
58+
Allocation clusterRoutingAllocation = EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING
59+
.get(cs.getMetaData().settings());
60+
if (Allocation.NONE == clusterRoutingAllocation) {
61+
listener.onFailure(new ElasticsearchException(
62+
"pre-upgrade check failed, please enable cluster routing allocation using setting [{}]",
63+
EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING.getKey()));
64+
} else {
65+
listener.onResponse(null);
66+
}
67+
}, (t, listener) -> listener.onResponse(TransportResponse.Empty.INSTANCE));
5668
}
5769

5870
/**
@@ -70,11 +82,11 @@ public IndexUpgradeCheck(String name,
7082
public IndexUpgradeCheck(String name,
7183
Function<IndexMetaData, UpgradeActionRequired> actionRequired,
7284
Client client, ClusterService clusterService, String[] types, Script updateScript,
73-
Consumer<ActionListener<T>> preUpgrade,
85+
BiConsumer<ClusterState, ActionListener<T>> preUpgrade,
7486
BiConsumer<T, ActionListener<TransportResponse.Empty>> postUpgrade) {
7587
this.name = name;
7688
this.actionRequired = actionRequired;
77-
this.reindexer = new InternalIndexReindexer<>(client, clusterService, IndexUpgradeCheckVersion.UPRADE_VERSION, updateScript,
89+
this.reindexer = new InternalIndexReindexer<>(client, clusterService, IndexUpgradeCheckVersion.UPGRADE_VERSION, updateScript,
7890
types, preUpgrade, postUpgrade);
7991
}
8092

@@ -107,4 +119,9 @@ public void upgrade(TaskId task, IndexMetaData indexMetaData, ClusterState state
107119
ActionListener<BulkByScrollResponse> listener) {
108120
reindexer.upgrade(task, indexMetaData.getIndex().getName(), state, listener);
109121
}
122+
123+
// pkg scope for testing
124+
InternalIndexReindexer getInternalIndexReindexer() {
125+
return reindexer;
126+
}
110127
}

x-pack/plugin/upgrade/src/main/java/org/elasticsearch/xpack/upgrade/InternalIndexReindexer.java

Lines changed: 56 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,9 @@
55
*/
66
package org.elasticsearch.xpack.upgrade;
77

8+
import org.apache.logging.log4j.LogManager;
9+
import org.apache.logging.log4j.Logger;
10+
import org.elasticsearch.ElasticsearchException;
811
import org.elasticsearch.action.ActionListener;
912
import org.elasticsearch.action.support.master.AcknowledgedResponse;
1013
import org.elasticsearch.client.Client;
@@ -15,6 +18,7 @@
1518
import org.elasticsearch.cluster.metadata.IndexMetaData;
1619
import org.elasticsearch.cluster.metadata.MetaData;
1720
import org.elasticsearch.cluster.service.ClusterService;
21+
import org.elasticsearch.common.Strings;
1822
import org.elasticsearch.common.settings.Settings;
1923
import org.elasticsearch.index.IndexNotFoundException;
2024
import org.elasticsearch.index.reindex.BulkByScrollResponse;
@@ -25,7 +29,6 @@
2529
import org.elasticsearch.transport.TransportResponse;
2630

2731
import java.util.function.BiConsumer;
28-
import java.util.function.Consumer;
2932

3033
import static org.elasticsearch.index.IndexSettings.same;
3134

@@ -39,17 +42,18 @@
3942
* - Delete index .{name} and add alias .{name} to .{name}-6
4043
*/
4144
public class InternalIndexReindexer<T> {
45+
private static final Logger logger = LogManager.getLogger(InternalIndexReindexer.class);
4246

4347
private final Client client;
4448
private final ClusterService clusterService;
4549
private final Script transformScript;
4650
private final String[] types;
4751
private final int version;
48-
private final Consumer<ActionListener<T>> preUpgrade;
52+
private final BiConsumer<ClusterState, ActionListener<T>> preUpgrade;
4953
private final BiConsumer<T, ActionListener<TransportResponse.Empty>> postUpgrade;
5054

5155
public InternalIndexReindexer(Client client, ClusterService clusterService, int version, Script transformScript, String[] types,
52-
Consumer<ActionListener<T>> preUpgrade,
56+
BiConsumer<ClusterState,ActionListener<T>> preUpgrade,
5357
BiConsumer<T, ActionListener<TransportResponse.Empty>> postUpgrade) {
5458
this.client = client;
5559
this.clusterService = clusterService;
@@ -62,7 +66,7 @@ public InternalIndexReindexer(Client client, ClusterService clusterService, int
6266

6367
public void upgrade(TaskId task, String index, ClusterState clusterState, ActionListener<BulkByScrollResponse> listener) {
6468
ParentTaskAssigningClient parentAwareClient = new ParentTaskAssigningClient(client, task);
65-
preUpgrade.accept(ActionListener.wrap(
69+
preUpgrade.accept(clusterState, ActionListener.wrap(
6670
t -> innerUpgrade(parentAwareClient, index, clusterState, ActionListener.wrap(
6771
response -> postUpgrade.accept(t, ActionListener.wrap(
6872
empty -> listener.onResponse(response),
@@ -76,32 +80,61 @@ public void upgrade(TaskId task, String index, ClusterState clusterState, Action
7680
private void innerUpgrade(ParentTaskAssigningClient parentAwareClient, String index, ClusterState clusterState,
7781
ActionListener<BulkByScrollResponse> listener) {
7882
String newIndex = index + "-" + version;
83+
logger.trace("upgrading index {} to new index {}", index, newIndex);
7984
try {
8085
checkMasterAndDataNodeVersion(clusterState);
81-
parentAwareClient.admin().indices().prepareCreate(newIndex).execute(ActionListener.wrap(createIndexResponse ->
82-
setReadOnlyBlock(index, ActionListener.wrap(setReadOnlyResponse ->
83-
reindex(parentAwareClient, index, newIndex, ActionListener.wrap(
84-
bulkByScrollResponse -> // Successful completion of reindexing - delete old index
85-
removeReadOnlyBlock(parentAwareClient, index, ActionListener.wrap(unsetReadOnlyResponse ->
86-
parentAwareClient.admin().indices().prepareAliases().removeIndex(index)
87-
.addAlias(newIndex, index).execute(ActionListener.wrap(deleteIndexResponse ->
88-
listener.onResponse(bulkByScrollResponse), listener::onFailure
89-
)), listener::onFailure
90-
)),
91-
e -> // Something went wrong during reindexing - remove readonly flag and report the error
92-
removeReadOnlyBlock(parentAwareClient, index, ActionListener.wrap(unsetReadOnlyResponse -> {
93-
listener.onFailure(e);
94-
}, e1 -> {
95-
listener.onFailure(e);
96-
}))
97-
)), listener::onFailure
98-
)), listener::onFailure
99-
));
86+
parentAwareClient.admin().indices().prepareCreate(newIndex).execute(ActionListener.wrap(createIndexResponse -> {
87+
setReadOnlyBlock(index, ActionListener.wrap(
88+
setReadOnlyResponse -> reindex(parentAwareClient, index, newIndex, ActionListener.wrap(bulkByScrollResponse -> {
89+
if ((bulkByScrollResponse.getBulkFailures() != null
90+
&& bulkByScrollResponse.getBulkFailures().isEmpty() == false)
91+
|| (bulkByScrollResponse.getSearchFailures() != null
92+
&& bulkByScrollResponse.getSearchFailures().isEmpty() == false)) {
93+
ElasticsearchException ex = logAndThrowExceptionForFailures(bulkByScrollResponse);
94+
removeReadOnlyBlockOnReindexFailure(parentAwareClient, index, listener, ex);
95+
} else {
96+
// Successful completion of reindexing - remove read only and delete old index
97+
removeReadOnlyBlock(parentAwareClient, index,
98+
ActionListener.wrap(unsetReadOnlyResponse -> parentAwareClient.admin().indices().prepareAliases()
99+
.removeIndex(index).addAlias(newIndex, index)
100+
.execute(ActionListener.wrap(
101+
deleteIndexResponse -> listener.onResponse(bulkByScrollResponse),
102+
listener::onFailure)),
103+
listener::onFailure));
104+
}
105+
}, e -> {
106+
logger.error("error occurred while reindexing", e);
107+
removeReadOnlyBlockOnReindexFailure(parentAwareClient, index, listener, e);
108+
})), listener::onFailure));
109+
}, listener::onFailure));
100110
} catch (Exception ex) {
111+
logger.error("error occurred while upgrading index", ex);
112+
removeReadOnlyBlockOnReindexFailure(parentAwareClient, index, listener, ex);
101113
listener.onFailure(ex);
102114
}
103115
}
104116

117+
private void removeReadOnlyBlockOnReindexFailure(ParentTaskAssigningClient parentAwareClient, String index,
118+
ActionListener<BulkByScrollResponse> listener, Exception ex) {
119+
removeReadOnlyBlock(parentAwareClient, index, ActionListener.wrap(unsetReadOnlyResponse -> {
120+
listener.onFailure(ex);
121+
}, e1 -> {
122+
listener.onFailure(ex);
123+
}));
124+
}
125+
126+
private ElasticsearchException logAndThrowExceptionForFailures(BulkByScrollResponse bulkByScrollResponse) {
127+
String bulkFailures = (bulkByScrollResponse.getBulkFailures() != null)
128+
? Strings.collectionToCommaDelimitedString(bulkByScrollResponse.getBulkFailures())
129+
: "";
130+
String searchFailures = (bulkByScrollResponse.getSearchFailures() != null)
131+
? Strings.collectionToCommaDelimitedString(bulkByScrollResponse.getSearchFailures())
132+
: "";
133+
logger.error("error occurred while reindexing, bulk failures [{}], search failures [{}]", bulkFailures, searchFailures);
134+
return new ElasticsearchException("error occurred while reindexing, bulk failures [{}], search failures [{}]", bulkFailures,
135+
searchFailures);
136+
}
137+
105138
private void checkMasterAndDataNodeVersion(ClusterState clusterState) {
106139
if (clusterState.nodes().getMinNodeVersion().before(Upgrade.UPGRADE_INTRODUCED)) {
107140
throw new IllegalStateException("All nodes should have at least version [" + Upgrade.UPGRADE_INTRODUCED + "] to upgrade");

x-pack/plugin/upgrade/src/main/java/org/elasticsearch/xpack/upgrade/Upgrade.java

Lines changed: 36 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,12 @@
1616
import org.elasticsearch.action.support.master.AcknowledgedResponse;
1717
import org.elasticsearch.action.update.UpdateRequest;
1818
import org.elasticsearch.client.Client;
19+
import org.elasticsearch.cluster.ClusterState;
1920
import org.elasticsearch.cluster.metadata.IndexMetaData;
2021
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
2122
import org.elasticsearch.cluster.node.DiscoveryNodes;
23+
import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
24+
import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider.Allocation;
2225
import org.elasticsearch.cluster.service.ClusterService;
2326
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
2427
import org.elasticsearch.common.settings.ClusterSettings;
@@ -167,7 +170,15 @@ static BiFunction<Client, ClusterService, IndexUpgradeCheck> getSecurityUpgradeC
167170
" ctx._type = \"doc\";" +
168171
"}\n",
169172
new HashMap<>()),
170-
listener -> listener.onResponse(null),
173+
(cs, listener) -> {
174+
if (isClusterRoutingAllocationEnabled(cs) == false) {
175+
listener.onFailure(new ElasticsearchException(
176+
"pre-upgrade check failed, please enable cluster routing allocation using setting [{}]",
177+
EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING.getKey()));
178+
} else {
179+
listener.onResponse(null);
180+
}
181+
},
171182
(success, listener) -> postSecurityUpgrade(clientWithOrigin, listener));
172183
};
173184
}
@@ -261,7 +272,7 @@ static BiFunction<Client, ClusterService, IndexUpgradeCheck> getWatchesIndexUpgr
261272
" ctx._source.status = ctx._source.remove(\"_status\");\n" +
262273
"}",
263274
new HashMap<>()),
264-
booleanActionListener -> preWatchesIndexUpgrade(clientWithOrigin, booleanActionListener),
275+
(cs, booleanActionListener) -> preWatchesIndexUpgrade(clientWithOrigin, cs, booleanActionListener),
265276
(shouldStartWatcher, listener) -> postWatchesIndexUpgrade(clientWithOrigin, shouldStartWatcher, listener)
266277
);
267278
};
@@ -285,7 +296,7 @@ static BiFunction<Client, ClusterService, IndexUpgradeCheck> getTriggeredWatches
285296
clusterService,
286297
new String[]{"triggered-watch"},
287298
new Script(ScriptType.INLINE, "painless", "ctx._type = \"doc\";\n", new HashMap<>()),
288-
booleanActionListener -> preTriggeredWatchesIndexUpgrade(clientWithOrigin, booleanActionListener),
299+
(cs, booleanActionListener) -> preTriggeredWatchesIndexUpgrade(clientWithOrigin, cs, booleanActionListener),
289300
(shouldStartWatcher, listener) -> postWatchesIndexUpgrade(clientWithOrigin, shouldStartWatcher, listener)
290301
);
291302
};
@@ -295,7 +306,12 @@ private static boolean indexOrAliasExists(IndexMetaData indexMetaData, String na
295306
return name.equals(indexMetaData.getIndex().getName()) || indexMetaData.getAliases().containsKey(name);
296307
}
297308

298-
static void preTriggeredWatchesIndexUpgrade(Client client, ActionListener<Boolean> listener) {
309+
static void preTriggeredWatchesIndexUpgrade(Client client, ClusterState cs, ActionListener<Boolean> listener) {
310+
if (isClusterRoutingAllocationEnabled(cs) == false) {
311+
listener.onFailure(new ElasticsearchException(
312+
"pre-upgrade check failed, please enable cluster routing allocation using setting [{}]",
313+
EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING.getKey()));
314+
}
299315
new WatcherClient(client).prepareWatcherStats().execute(ActionListener.wrap(
300316
stats -> {
301317
if (stats.watcherMetaData().manuallyStopped()) {
@@ -316,6 +332,15 @@ static void preTriggeredWatchesIndexUpgrade(Client client, ActionListener<Boolea
316332
listener::onFailure));
317333
}
318334

335+
static boolean isClusterRoutingAllocationEnabled(ClusterState cs) {
336+
Allocation clusterRoutingAllocation = EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING
337+
.get(cs.getMetaData().settings());
338+
if (Allocation.NONE == clusterRoutingAllocation) {
339+
return false;
340+
}
341+
return true;
342+
}
343+
319344
private static void preTriggeredWatchesIndexUpgrade(final Client client, final ActionListener<Boolean> listener,
320345
final boolean restart) {
321346
final String legacyTriggeredWatchesTemplateName = "triggered_watches";
@@ -338,7 +363,13 @@ private static void preTriggeredWatchesIndexUpgrade(final Client client, final A
338363
.setSource(triggeredWatchesTemplate, XContentType.JSON).execute(putTriggeredWatchesListener);
339364
}
340365

341-
static void preWatchesIndexUpgrade(Client client, ActionListener<Boolean> listener) {
366+
static void preWatchesIndexUpgrade(Client client, ClusterState cs, ActionListener<Boolean> listener) {
367+
if (isClusterRoutingAllocationEnabled(cs) == false) {
368+
listener.onFailure(new ElasticsearchException(
369+
"pre-upgrade check failed, please enable cluster routing allocation using setting [{}]",
370+
EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING.getKey()));
371+
}
372+
342373
new WatcherClient(client).prepareWatcherStats().execute(ActionListener.wrap(
343374
stats -> {
344375
if (stats.watcherMetaData().manuallyStopped()) {

x-pack/plugin/upgrade/src/test/java/org/elasticsearch/xpack/upgrade/IndexUpgradeIT.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ public void testInternalUpgradePrePostChecks() throws Exception {
9797
}
9898
},
9999
client(), internalCluster().clusterService(internalCluster().getMasterName()), Strings.EMPTY_ARRAY, null,
100-
listener -> {
100+
(cs, listener) -> {
101101
assertFalse(preUpgradeIsCalled.getAndSet(true));
102102
assertFalse(postUpgradeIsCalled.get());
103103
listener.onResponse(val);

0 commit comments

Comments
 (0)