From 0beafa9dc655d64ea0db8ca398a4f471d6c5557c Mon Sep 17 00:00:00 2001 From: Yogesh Gaikwad <902768+bizybot@users.noreply.github.com> Date: Fri, 8 Mar 2019 11:47:14 +1100 Subject: [PATCH] Add pre-upgrade check to test cluster routing allocation is enabled (#39340) When following the steps mentioned in upgrade guide https://www.elastic.co/guide/en/elastic-stack/6.6/upgrading-elastic-stack.html if we disable the cluster shard allocation but fail to enable it after upgrading the nodes and plugins, the next step of upgrading internal indices fails. As we did not check the bulk request response for reindexing, we delete the old index assuming it has been created. This is fatal as we cannot recover from this state. This commit adds a pre-upgrade check to test the cluster shard allocation setting and fail upgrade if it is disabled. In case there are search or bulk failures then we remove the read-only block and fail the upgrade index request. Closes #39339 --- .../upgrade/IndexUpgradeCheckVersion.java | 2 +- .../xpack/upgrade/IndexUpgradeCheck.java | 25 +++++- .../xpack/upgrade/InternalIndexReindexer.java | 79 +++++++++++++------ .../xpack/upgrade/IndexUpgradeIT.java | 2 +- .../upgrade/InternalIndexReindexerIT.java | 60 ++++++++++---- 5 files changed, 122 insertions(+), 46 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/upgrade/IndexUpgradeCheckVersion.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/upgrade/IndexUpgradeCheckVersion.java index e09f73a688e57..298c8ac95c2e5 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/upgrade/IndexUpgradeCheckVersion.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/upgrade/IndexUpgradeCheckVersion.java @@ -6,7 +6,7 @@ package org.elasticsearch.xpack.core.upgrade; public final class IndexUpgradeCheckVersion { - public static final int UPRADE_VERSION = 6; + public static final int UPGRADE_VERSION = 6; private IndexUpgradeCheckVersion() {} diff --git a/x-pack/plugin/upgrade/src/main/java/org/elasticsearch/xpack/upgrade/IndexUpgradeCheck.java b/x-pack/plugin/upgrade/src/main/java/org/elasticsearch/xpack/upgrade/IndexUpgradeCheck.java index 62a2829b9258c..102827f87f771 100644 --- a/x-pack/plugin/upgrade/src/main/java/org/elasticsearch/xpack/upgrade/IndexUpgradeCheck.java +++ b/x-pack/plugin/upgrade/src/main/java/org/elasticsearch/xpack/upgrade/IndexUpgradeCheck.java @@ -5,10 +5,13 @@ */ package org.elasticsearch.xpack.upgrade; +import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider; +import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider.Allocation; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.index.reindex.BulkByScrollResponse; import org.elasticsearch.protocol.xpack.migration.UpgradeActionRequired; @@ -18,7 +21,6 @@ import org.elasticsearch.xpack.core.upgrade.IndexUpgradeCheckVersion; import java.util.function.BiConsumer; -import java.util.function.Consumer; import java.util.function.Function; /** @@ -51,7 +53,17 @@ public IndexUpgradeCheck(String name, Function actionRequired, Client client, ClusterService clusterService, String[] types, Script updateScript) { this(name, actionRequired, client, clusterService, types, updateScript, - listener -> listener.onResponse(null), (t, listener) -> listener.onResponse(TransportResponse.Empty.INSTANCE)); + (cs, listener) -> { + Allocation clusterRoutingAllocation = EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING + .get(cs.getMetaData().settings()); + if (Allocation.NONE == clusterRoutingAllocation) { + listener.onFailure(new ElasticsearchException( + "pre-upgrade check failed, please enable cluster routing allocation using setting [{}]", + EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING.getKey())); + } else { + listener.onResponse(null); + } + }, (t, listener) -> listener.onResponse(TransportResponse.Empty.INSTANCE)); } /** @@ -69,11 +81,11 @@ public IndexUpgradeCheck(String name, public IndexUpgradeCheck(String name, Function actionRequired, Client client, ClusterService clusterService, String[] types, Script updateScript, - Consumer> preUpgrade, + BiConsumer> preUpgrade, BiConsumer> postUpgrade) { this.name = name; this.actionRequired = actionRequired; - this.reindexer = new InternalIndexReindexer<>(client, clusterService, IndexUpgradeCheckVersion.UPRADE_VERSION, updateScript, + this.reindexer = new InternalIndexReindexer<>(client, clusterService, IndexUpgradeCheckVersion.UPGRADE_VERSION, updateScript, types, preUpgrade, postUpgrade); } @@ -106,4 +118,9 @@ public void upgrade(TaskId task, IndexMetaData indexMetaData, ClusterState state ActionListener listener) { reindexer.upgrade(task, indexMetaData.getIndex().getName(), state, listener); } + + // pkg scope for testing + InternalIndexReindexer getInternalIndexReindexer() { + return reindexer; + } } diff --git a/x-pack/plugin/upgrade/src/main/java/org/elasticsearch/xpack/upgrade/InternalIndexReindexer.java b/x-pack/plugin/upgrade/src/main/java/org/elasticsearch/xpack/upgrade/InternalIndexReindexer.java index 6ab920555bb0b..763fc7d92deb8 100644 --- a/x-pack/plugin/upgrade/src/main/java/org/elasticsearch/xpack/upgrade/InternalIndexReindexer.java +++ b/x-pack/plugin/upgrade/src/main/java/org/elasticsearch/xpack/upgrade/InternalIndexReindexer.java @@ -5,6 +5,9 @@ */ package org.elasticsearch.xpack.upgrade; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.client.Client; @@ -15,6 +18,7 @@ import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.Strings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.reindex.BulkByScrollResponse; @@ -25,7 +29,6 @@ import org.elasticsearch.transport.TransportResponse; import java.util.function.BiConsumer; -import java.util.function.Consumer; import static org.elasticsearch.index.IndexSettings.same; @@ -39,17 +42,18 @@ * - Delete index .{name} and add alias .{name} to .{name}-6 */ public class InternalIndexReindexer { + private static final Logger logger = LogManager.getLogger(InternalIndexReindexer.class); private final Client client; private final ClusterService clusterService; private final Script transformScript; private final String[] types; private final int version; - private final Consumer> preUpgrade; + private final BiConsumer> preUpgrade; private final BiConsumer> postUpgrade; public InternalIndexReindexer(Client client, ClusterService clusterService, int version, Script transformScript, String[] types, - Consumer> preUpgrade, + BiConsumer> preUpgrade, BiConsumer> postUpgrade) { this.client = client; this.clusterService = clusterService; @@ -62,7 +66,7 @@ public InternalIndexReindexer(Client client, ClusterService clusterService, int public void upgrade(TaskId task, String index, ClusterState clusterState, ActionListener listener) { ParentTaskAssigningClient parentAwareClient = new ParentTaskAssigningClient(client, task); - preUpgrade.accept(ActionListener.wrap( + preUpgrade.accept(clusterState, ActionListener.wrap( t -> innerUpgrade(parentAwareClient, index, clusterState, ActionListener.wrap( response -> postUpgrade.accept(t, ActionListener.wrap( empty -> listener.onResponse(response), @@ -76,32 +80,61 @@ public void upgrade(TaskId task, String index, ClusterState clusterState, Action private void innerUpgrade(ParentTaskAssigningClient parentAwareClient, String index, ClusterState clusterState, ActionListener listener) { String newIndex = index + "-" + version; + logger.trace("upgrading index {} to new index {}", index, newIndex); try { checkMasterAndDataNodeVersion(clusterState); - parentAwareClient.admin().indices().prepareCreate(newIndex).execute(ActionListener.wrap(createIndexResponse -> - setReadOnlyBlock(index, ActionListener.wrap(setReadOnlyResponse -> - reindex(parentAwareClient, index, newIndex, ActionListener.wrap( - bulkByScrollResponse -> // Successful completion of reindexing - delete old index - removeReadOnlyBlock(parentAwareClient, index, ActionListener.wrap(unsetReadOnlyResponse -> - parentAwareClient.admin().indices().prepareAliases().removeIndex(index) - .addAlias(newIndex, index).execute(ActionListener.wrap(deleteIndexResponse -> - listener.onResponse(bulkByScrollResponse), listener::onFailure - )), listener::onFailure - )), - e -> // Something went wrong during reindexing - remove readonly flag and report the error - removeReadOnlyBlock(parentAwareClient, index, ActionListener.wrap(unsetReadOnlyResponse -> { - listener.onFailure(e); - }, e1 -> { - listener.onFailure(e); - })) - )), listener::onFailure - )), listener::onFailure - )); + parentAwareClient.admin().indices().prepareCreate(newIndex).execute(ActionListener.wrap(createIndexResponse -> { + setReadOnlyBlock(index, ActionListener.wrap( + setReadOnlyResponse -> reindex(parentAwareClient, index, newIndex, ActionListener.wrap(bulkByScrollResponse -> { + if ((bulkByScrollResponse.getBulkFailures() != null + && bulkByScrollResponse.getBulkFailures().isEmpty() == false) + || (bulkByScrollResponse.getSearchFailures() != null + && bulkByScrollResponse.getSearchFailures().isEmpty() == false)) { + ElasticsearchException ex = logAndThrowExceptionForFailures(bulkByScrollResponse); + removeReadOnlyBlockOnReindexFailure(parentAwareClient, index, listener, ex); + } else { + // Successful completion of reindexing - remove read only and delete old index + removeReadOnlyBlock(parentAwareClient, index, + ActionListener.wrap(unsetReadOnlyResponse -> parentAwareClient.admin().indices().prepareAliases() + .removeIndex(index).addAlias(newIndex, index) + .execute(ActionListener.wrap( + deleteIndexResponse -> listener.onResponse(bulkByScrollResponse), + listener::onFailure)), + listener::onFailure)); + } + }, e -> { + logger.error("error occurred while reindexing", e); + removeReadOnlyBlockOnReindexFailure(parentAwareClient, index, listener, e); + })), listener::onFailure)); + }, listener::onFailure)); } catch (Exception ex) { + logger.error("error occurred while upgrading index", ex); + removeReadOnlyBlockOnReindexFailure(parentAwareClient, index, listener, ex); listener.onFailure(ex); } } + private void removeReadOnlyBlockOnReindexFailure(ParentTaskAssigningClient parentAwareClient, String index, + ActionListener listener, Exception ex) { + removeReadOnlyBlock(parentAwareClient, index, ActionListener.wrap(unsetReadOnlyResponse -> { + listener.onFailure(ex); + }, e1 -> { + listener.onFailure(ex); + })); + } + + private ElasticsearchException logAndThrowExceptionForFailures(BulkByScrollResponse bulkByScrollResponse) { + String bulkFailures = (bulkByScrollResponse.getBulkFailures() != null) + ? Strings.collectionToCommaDelimitedString(bulkByScrollResponse.getBulkFailures()) + : ""; + String searchFailures = (bulkByScrollResponse.getSearchFailures() != null) + ? Strings.collectionToCommaDelimitedString(bulkByScrollResponse.getSearchFailures()) + : ""; + logger.error("error occurred while reindexing, bulk failures [{}], search failures [{}]", bulkFailures, searchFailures); + return new ElasticsearchException("error occurred while reindexing, bulk failures [{}], search failures [{}]", bulkFailures, + searchFailures); + } + private void checkMasterAndDataNodeVersion(ClusterState clusterState) { if (clusterState.nodes().getMinNodeVersion().before(Upgrade.UPGRADE_INTRODUCED)) { throw new IllegalStateException("All nodes should have at least version [" + Upgrade.UPGRADE_INTRODUCED + "] to upgrade"); diff --git a/x-pack/plugin/upgrade/src/test/java/org/elasticsearch/xpack/upgrade/IndexUpgradeIT.java b/x-pack/plugin/upgrade/src/test/java/org/elasticsearch/xpack/upgrade/IndexUpgradeIT.java index 3663d586159d9..c764966d1132c 100644 --- a/x-pack/plugin/upgrade/src/test/java/org/elasticsearch/xpack/upgrade/IndexUpgradeIT.java +++ b/x-pack/plugin/upgrade/src/test/java/org/elasticsearch/xpack/upgrade/IndexUpgradeIT.java @@ -96,7 +96,7 @@ public void testInternalUpgradePrePostChecks() throws Exception { } }, client(), internalCluster().clusterService(internalCluster().getMasterName()), Strings.EMPTY_ARRAY, null, - listener -> { + (cs, listener) -> { assertFalse(preUpgradeIsCalled.getAndSet(true)); assertFalse(postUpgradeIsCalled.get()); listener.onResponse(val); diff --git a/x-pack/plugin/upgrade/src/test/java/org/elasticsearch/xpack/upgrade/InternalIndexReindexerIT.java b/x-pack/plugin/upgrade/src/test/java/org/elasticsearch/xpack/upgrade/InternalIndexReindexerIT.java index 013680ee2d17b..9f9c7353ad62b 100644 --- a/x-pack/plugin/upgrade/src/test/java/org/elasticsearch/xpack/upgrade/InternalIndexReindexerIT.java +++ b/x-pack/plugin/upgrade/src/test/java/org/elasticsearch/xpack/upgrade/InternalIndexReindexerIT.java @@ -6,8 +6,11 @@ package org.elasticsearch.xpack.upgrade; import com.carrotsearch.hppc.cursors.ObjectCursor; + +import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ResourceAlreadyExistsException; import org.elasticsearch.Version; +import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse; import org.elasticsearch.action.admin.indices.alias.get.GetAliasesResponse; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.support.PlainActionFuture; @@ -19,6 +22,7 @@ import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider; import org.elasticsearch.common.Strings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentType; @@ -26,13 +30,16 @@ import org.elasticsearch.index.reindex.ReindexPlugin; import org.elasticsearch.indices.InvalidIndexNameException; import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.protocol.xpack.migration.UpgradeActionRequired; import org.elasticsearch.script.MockScriptPlugin; import org.elasticsearch.script.Script; import org.elasticsearch.script.ScriptType; import org.elasticsearch.search.SearchHit; import org.elasticsearch.tasks.TaskId; -import org.elasticsearch.transport.TransportResponse; +import org.elasticsearch.test.ESIntegTestCase.ClusterScope; +import org.elasticsearch.test.ESIntegTestCase.Scope; import org.elasticsearch.xpack.core.LocalStateCompositeXPackPlugin; +import org.elasticsearch.xpack.core.upgrade.IndexUpgradeCheckVersion; import java.util.ArrayList; import java.util.Arrays; @@ -45,10 +52,13 @@ import static org.elasticsearch.test.VersionUtils.randomVersionBetween; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertThrows; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.startsWith; import static org.hamcrest.core.IsEqual.equalTo; +@ClusterScope(scope=Scope.TEST) public class InternalIndexReindexerIT extends IndexUpgradeIntegTestCase { @Override @@ -77,13 +87,13 @@ protected Map, Object>> pluginScripts() { public void testUpgradeIndex() throws Exception { createTestIndex("test"); - InternalIndexReindexer reindexer = createIndexReindexer(123, script("add_bar"), Strings.EMPTY_ARRAY); + InternalIndexReindexer reindexer = createIndexReindexer(script("add_bar"), Strings.EMPTY_ARRAY); PlainActionFuture future = PlainActionFuture.newFuture(); reindexer.upgrade(new TaskId("abc", 123), "test", clusterState(), future); BulkByScrollResponse response = future.actionGet(); assertThat(response.getCreated(), equalTo(2L)); - SearchResponse searchResponse = client().prepareSearch("test-123").get(); + SearchResponse searchResponse = client().prepareSearch("test-" + IndexUpgradeCheckVersion.UPGRADE_VERSION).get(); assertThat(searchResponse.getHits().getTotalHits().value, equalTo(2L)); assertThat(searchResponse.getHits().getHits().length, equalTo(2)); for (SearchHit hit : searchResponse.getHits().getHits()) { @@ -94,7 +104,7 @@ public void testUpgradeIndex() throws Exception { GetAliasesResponse aliasesResponse = client().admin().indices().prepareGetAliases("test").get(); assertThat(aliasesResponse.getAliases().size(), equalTo(1)); - List testAlias = aliasesResponse.getAliases().get("test-123"); + List testAlias = aliasesResponse.getAliases().get("test-" + IndexUpgradeCheckVersion.UPGRADE_VERSION); assertNotNull(testAlias); assertThat(testAlias.size(), equalTo(1)); assertThat(testAlias.get(0).alias(), equalTo("test")); @@ -102,8 +112,8 @@ public void testUpgradeIndex() throws Exception { public void testTargetIndexExists() throws Exception { createTestIndex("test"); - createTestIndex("test-123"); - InternalIndexReindexer reindexer = createIndexReindexer(123, script("add_bar"), Strings.EMPTY_ARRAY); + createTestIndex("test-" + IndexUpgradeCheckVersion.UPGRADE_VERSION); + InternalIndexReindexer reindexer = createIndexReindexer(script("add_bar"), Strings.EMPTY_ARRAY); PlainActionFuture future = PlainActionFuture.newFuture(); reindexer.upgrade(new TaskId("abc", 123), "test", clusterState(), future); assertThrows(future, ResourceAlreadyExistsException.class); @@ -115,14 +125,14 @@ public void testTargetIndexExists() throws Exception { public void testTargetIndexExistsAsAlias() throws Exception { createTestIndex("test"); createTestIndex("test-foo"); - client().admin().indices().prepareAliases().addAlias("test-foo", "test-123").get(); - InternalIndexReindexer reindexer = createIndexReindexer(123, script("add_bar"), Strings.EMPTY_ARRAY); + client().admin().indices().prepareAliases().addAlias("test-foo", "test-" + IndexUpgradeCheckVersion.UPGRADE_VERSION).get(); + InternalIndexReindexer reindexer = createIndexReindexer(script("add_bar"), Strings.EMPTY_ARRAY); PlainActionFuture future = PlainActionFuture.newFuture(); reindexer.upgrade(new TaskId("abc", 123), "test", clusterState(), future); assertThrows(future, InvalidIndexNameException.class); // Make sure that the index is not marked as read-only - client().prepareIndex("test-123", "doc").setSource("foo", "bar").get(); + client().prepareIndex("test-" + IndexUpgradeCheckVersion.UPGRADE_VERSION, "doc").setSource("foo", "bar").get(); } public void testSourceIndexIsReadonly() throws Exception { @@ -130,7 +140,7 @@ public void testSourceIndexIsReadonly() throws Exception { try { Settings settings = Settings.builder().put(IndexMetaData.INDEX_READ_ONLY_SETTING.getKey(), true).build(); assertAcked(client().admin().indices().prepareUpdateSettings("test").setSettings(settings).get()); - InternalIndexReindexer reindexer = createIndexReindexer(123, script("add_bar"), Strings.EMPTY_ARRAY); + InternalIndexReindexer reindexer = createIndexReindexer(script("add_bar"), Strings.EMPTY_ARRAY); PlainActionFuture future = PlainActionFuture.newFuture(); reindexer.upgrade(new TaskId("abc", 123), "test", clusterState(), future); assertThrows(future, IllegalStateException.class); @@ -144,12 +154,30 @@ public void testSourceIndexIsReadonly() throws Exception { } } + public void testReindexingFailureWithClusterRoutingAllocationDisabled() throws Exception { + createTestIndex("test"); + + Settings settings = Settings.builder().put(EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING.getKey(), "none") + .build(); + ClusterUpdateSettingsResponse clusterUpdateResponse = client().admin().cluster().prepareUpdateSettings() + .setTransientSettings(settings).get(); + assertThat(clusterUpdateResponse.isAcknowledged(), is(true)); + assertThat(clusterUpdateResponse.getTransientSettings() + .get(EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING.getKey()), is("none")); + + InternalIndexReindexer reindexer = createIndexReindexer(script("add_bar"), Strings.EMPTY_ARRAY); + PlainActionFuture future = PlainActionFuture.newFuture(); + reindexer.upgrade(new TaskId("abc", 123), "test", clusterState(), future); + ElasticsearchException e = expectThrows(ElasticsearchException.class, () -> future.actionGet()); + assertThat(e.getMessage(), containsString( + "pre-upgrade check failed, please enable cluster routing allocation using setting [cluster.routing.allocation.enable]")); + } public void testReindexingFailure() throws Exception { createTestIndex("test"); // Make sure that the index is not marked as read-only client().prepareIndex("test", "doc").setSource("foo", "bar").setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get(); - InternalIndexReindexer reindexer = createIndexReindexer(123, script("fail"), Strings.EMPTY_ARRAY); + InternalIndexReindexer reindexer = createIndexReindexer(script("fail"), Strings.EMPTY_ARRAY); PlainActionFuture future = PlainActionFuture.newFuture(); reindexer.upgrade(new TaskId("abc", 123), "test", clusterState(), future); assertThrows(future, RuntimeException.class); @@ -161,7 +189,7 @@ public void testReindexingFailure() throws Exception { public void testMixedNodeVersion() throws Exception { createTestIndex("test"); - InternalIndexReindexer reindexer = createIndexReindexer(123, script("add_bar"), Strings.EMPTY_ARRAY); + InternalIndexReindexer reindexer = createIndexReindexer(script("add_bar"), Strings.EMPTY_ARRAY); PlainActionFuture future = PlainActionFuture.newFuture(); reindexer.upgrade(new TaskId("abc", 123), "test", withRandomOldNode(), future); assertThrows(future, IllegalStateException.class); @@ -183,11 +211,9 @@ private Script script(String name) { return new Script(ScriptType.INLINE, CustomScriptPlugin.NAME, name, new HashMap<>()); } - private InternalIndexReindexer createIndexReindexer(int version, Script transformScript, String[] types) { - return new InternalIndexReindexer(client(), internalCluster().clusterService(internalCluster().getMasterName()), - version, transformScript, types, voidActionListener -> voidActionListener.onResponse(null), - (aVoid, listener) -> listener.onResponse(TransportResponse.Empty.INSTANCE)); - + private InternalIndexReindexer createIndexReindexer(Script transformScript, String[] types) { + return new IndexUpgradeCheck("test", imd -> UpgradeActionRequired.UPGRADE, client(), + internalCluster().clusterService(internalCluster().getMasterName()), types, transformScript).getInternalIndexReindexer(); } private ClusterState clusterState() {