From 836984bceb3006cd4676adcad8ffb1259218b9ef Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Sun, 14 Oct 2018 10:25:39 -0400 Subject: [PATCH 01/14] Replicate index settings to followers This commit uses the index settings version so that a follower can replicate index settings changes as needed from the leader. --- .../xpack/ccr/action/ShardChangesAction.java | 68 ++++++++++++--- .../xpack/ccr/action/ShardFollowNodeTask.java | 48 +++++++++-- .../ccr/action/ShardFollowTasksExecutor.java | 82 +++++++++++++++++++ .../action/TransportResumeFollowAction.java | 3 +- .../ShardFollowTaskReplicationTests.java | 9 +- 5 files changed, 191 insertions(+), 19 deletions(-) diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java index cf54a236a0451..e733b21b6101c 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java @@ -14,6 +14,7 @@ import org.elasticsearch.action.support.single.shard.SingleShardRequest; import org.elasticsearch.action.support.single.shard.TransportSingleShardAction; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.routing.ShardsIterator; import org.elasticsearch.cluster.service.ClusterService; @@ -202,6 +203,12 @@ public long getMappingVersion() { return mappingVersion; } + private long settingsVersion; + + public long getSettingsVersion() { + return settingsVersion; + } + private long globalCheckpoint; public long getGlobalCheckpoint() { @@ -237,6 +244,7 @@ public long getTookInMillis() { Response( final long mappingVersion, + final long settingsVersion, final long globalCheckpoint, final long maxSeqNo, final long maxSeqNoOfUpdatesOrDeletes, @@ -244,6 +252,7 @@ public long getTookInMillis() { final long tookInMillis) { this.mappingVersion = mappingVersion; + this.settingsVersion = settingsVersion; this.globalCheckpoint = globalCheckpoint; this.maxSeqNo = maxSeqNo; this.maxSeqNoOfUpdatesOrDeletes = maxSeqNoOfUpdatesOrDeletes; @@ -255,6 +264,7 @@ public long getTookInMillis() { public void readFrom(final StreamInput in) throws IOException { super.readFrom(in); mappingVersion = in.readVLong(); + settingsVersion = in.readVLong(); globalCheckpoint = in.readZLong(); maxSeqNo = in.readZLong(); maxSeqNoOfUpdatesOrDeletes = in.readZLong(); @@ -266,6 +276,7 @@ public void readFrom(final StreamInput in) throws IOException { public void writeTo(final StreamOutput out) throws IOException { super.writeTo(out); out.writeVLong(mappingVersion); + out.writeVLong(settingsVersion); out.writeZLong(globalCheckpoint); out.writeZLong(maxSeqNo); out.writeZLong(maxSeqNoOfUpdatesOrDeletes); @@ -279,6 +290,7 @@ public boolean equals(final Object o) { if (o == null || getClass() != o.getClass()) return false; final Response that = (Response) o; return mappingVersion == that.mappingVersion && + settingsVersion == that.settingsVersion && globalCheckpoint == that.globalCheckpoint && maxSeqNo == that.maxSeqNo && maxSeqNoOfUpdatesOrDeletes == that.maxSeqNoOfUpdatesOrDeletes && @@ -288,8 +300,14 @@ public boolean equals(final Object o) { @Override public int hashCode() { - return Objects.hash(mappingVersion, globalCheckpoint, maxSeqNo, maxSeqNoOfUpdatesOrDeletes, - Arrays.hashCode(operations), tookInMillis); + return Objects.hash( + mappingVersion, + settingsVersion, + globalCheckpoint, + maxSeqNo, + maxSeqNoOfUpdatesOrDeletes, + Arrays.hashCode(operations), + tookInMillis); } } @@ -315,7 +333,9 @@ protected Response shardOperation(Request request, ShardId shardId) throws IOExc final IndexService indexService = indicesService.indexServiceSafe(request.getShard().getIndex()); final IndexShard indexShard = indexService.getShard(request.getShard().id()); final SeqNoStats seqNoStats = indexShard.seqNoStats(); - final long mappingVersion = clusterService.state().metaData().index(shardId.getIndex()).getMappingVersion(); + final IndexMetaData indexMetaData = clusterService.state().metaData().index(shardId.getIndex()); + final long mappingVersion = indexMetaData.getMappingVersion(); + final long settingsVersion = indexMetaData.getSettingsVersion(); final Translog.Operation[] operations = getOperations( indexShard, @@ -326,7 +346,13 @@ protected Response shardOperation(Request request, ShardId shardId) throws IOExc request.getMaxBatchSize()); // must capture after after snapshotting operations to ensure this MUS is at least the highest MUS of any of these operations. final long maxSeqNoOfUpdatesOrDeletes = indexShard.getMaxSeqNoOfUpdatesOrDeletes(); - return getResponse(mappingVersion, seqNoStats, maxSeqNoOfUpdatesOrDeletes, operations, request.relativeStartNanos); + return getResponse( + mappingVersion, + settingsVersion, + seqNoStats, + maxSeqNoOfUpdatesOrDeletes, + operations, + request.relativeStartNanos); } @Override @@ -387,12 +413,19 @@ private void globalCheckpointAdvancementFailure( e); if (e instanceof TimeoutException) { try { - final long mappingVersion = - clusterService.state().metaData().index(shardId.getIndex()).getMappingVersion(); + final IndexMetaData indexMetaData = clusterService.state().metaData().index(shardId.getIndex()); + final long mappingVersion = indexMetaData.getMappingVersion(); + final long settingsVersion = indexMetaData.getSettingsVersion(); final SeqNoStats latestSeqNoStats = indexShard.seqNoStats(); final long maxSeqNoOfUpdatesOrDeletes = indexShard.getMaxSeqNoOfUpdatesOrDeletes(); - listener.onResponse(getResponse(mappingVersion, latestSeqNoStats, maxSeqNoOfUpdatesOrDeletes, EMPTY_OPERATIONS_ARRAY, - request.relativeStartNanos)); + listener.onResponse( + getResponse( + mappingVersion, + settingsVersion, + latestSeqNoStats, + maxSeqNoOfUpdatesOrDeletes, + EMPTY_OPERATIONS_ARRAY, + request.relativeStartNanos)); } catch (final Exception caught) { caught.addSuppressed(e); listener.onFailure(caught); @@ -477,12 +510,23 @@ static Translog.Operation[] getOperations( return operations.toArray(EMPTY_OPERATIONS_ARRAY); } - static Response getResponse(final long mappingVersion, final SeqNoStats seqNoStats, - final long maxSeqNoOfUpdates, final Translog.Operation[] operations, long relativeStartNanos) { + static Response getResponse( + final long mappingVersion, + final long settingsVersion, + final SeqNoStats seqNoStats, + final long maxSeqNoOfUpdates, + final Translog.Operation[] operations, + long relativeStartNanos) { long tookInNanos = System.nanoTime() - relativeStartNanos; long tookInMillis = TimeUnit.NANOSECONDS.toMillis(tookInNanos); - return new Response(mappingVersion, seqNoStats.getGlobalCheckpoint(), seqNoStats.getMaxSeqNo(), maxSeqNoOfUpdates, - operations, tookInMillis); + return new Response( + mappingVersion, + settingsVersion, + seqNoStats.getGlobalCheckpoint(), + seqNoStats.getMaxSeqNo(), + maxSeqNoOfUpdates, + operations, + tookInMillis); } } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java index 8c302344ad86d..9ac80818028b9 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java @@ -20,8 +20,8 @@ import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.transport.NetworkExceptionHelper; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.index.shard.IllegalIndexShardStateException; import org.elasticsearch.index.seqno.SequenceNumbers; +import org.elasticsearch.index.shard.IllegalIndexShardStateException; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardNotFoundException; import org.elasticsearch.index.translog.Translog; @@ -73,6 +73,7 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask { private int numOutstandingReads = 0; private int numOutstandingWrites = 0; private long currentMappingVersion = 0; + private long currentSettingsVersion = 0; private long totalReadRemoteExecTimeMillis = 0; private long totalReadTimeMillis = 0; private long successfulReadRequests = 0; @@ -133,9 +134,19 @@ void start( synchronized (ShardFollowNodeTask.this) { currentMappingVersion = followerMappingVersion; } - LOGGER.info("{} Started to follow leader shard {}, followGlobalCheckPoint={}, followerMappingVersion={}", - params.getFollowShardId(), params.getLeaderShardId(), followerGlobalCheckpoint, followerMappingVersion); - coordinateReads(); + updateSettings(followerSettingsVersion -> { + synchronized (ShardFollowNodeTask.this) { + currentSettingsVersion = followerSettingsVersion; + } + LOGGER.info( + "{} following leader shard {}, follower global checkpoint=[{}], mapping version=[{}], settings version=[{}]", + params.getFollowShardId(), + params.getLeaderShardId(), + followerGlobalCheckpoint, + followerMappingVersion, + followerSettingsVersion); + coordinateReads(); + }); }); } @@ -268,7 +279,9 @@ private void sendShardChangesRequest(long from, int maxOperationCount, long maxR } void handleReadResponse(long from, long maxRequiredSeqNo, ShardChangesAction.Response response) { - maybeUpdateMapping(response.getMappingVersion(), () -> innerHandleReadResponse(from, maxRequiredSeqNo, response)); + maybeUpdateMapping( + response.getMappingVersion(), + () -> maybeUpdateSettings(response.getSettingsVersion(), () -> innerHandleReadResponse(from, maxRequiredSeqNo, response))); } /** Called when some operations are fetched from the leading */ @@ -366,6 +379,21 @@ private synchronized void maybeUpdateMapping(Long minimumRequiredMappingVersion, } } + private synchronized void maybeUpdateSettings(final Long minimumRequiredSettingsVersion, Runnable task) { + if (currentSettingsVersion >= minimumRequiredSettingsVersion) { + LOGGER.trace("{} settings version [{}] is higher or equal than minimum required mapping version [{}]", + params.getFollowShardId(), currentSettingsVersion, minimumRequiredSettingsVersion); + task.run(); + } else { + LOGGER.trace("{} updating settings, settings version [{}] is lower than minimum required settings version [{}]", + params.getFollowShardId(), currentSettingsVersion, minimumRequiredSettingsVersion); + updateSettings(settingsVersion -> { + currentSettingsVersion = settingsVersion; + task.run(); + }); + } + } + private void updateMapping(LongConsumer handler) { updateMapping(handler, new AtomicInteger(0)); } @@ -374,6 +402,14 @@ private void updateMapping(LongConsumer handler, AtomicInteger retryCounter) { innerUpdateMapping(handler, e -> handleFailure(e, retryCounter, () -> updateMapping(handler, retryCounter))); } + private void updateSettings(final LongConsumer handler) { + updateSettings(handler, new AtomicInteger()); + } + + private void updateSettings(final LongConsumer handler, final AtomicInteger retryCounter) { + innerUpdateSettings(handler, e -> handleFailure(e, retryCounter, () -> updateSettings(handler, retryCounter))); + } + private void handleFailure(Exception e, AtomicInteger retryCounter, Runnable task) { assert e != null; if (shouldRetry(e) && isStopped() == false) { @@ -422,6 +458,8 @@ static boolean shouldRetry(Exception e) { // These methods are protected for testing purposes: protected abstract void innerUpdateMapping(LongConsumer handler, Consumer errorHandler); + protected abstract void innerUpdateSettings(LongConsumer handler, Consumer errorHandler); + protected abstract void innerSendBulkShardOperationsRequest(String followerHistoryUUID, List operations, long leaderMaxSeqNoOfUpdatesOrDeletes, diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java index 88d07566c74bd..5933f7a952b3c 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java @@ -8,10 +8,14 @@ import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest; +import org.elasticsearch.action.admin.indices.close.CloseIndexRequest; import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest; +import org.elasticsearch.action.admin.indices.open.OpenIndexRequest; +import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest; import org.elasticsearch.action.admin.indices.stats.IndexStats; import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequest; import org.elasticsearch.action.admin.indices.stats.ShardStats; +import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetaData; @@ -132,6 +136,84 @@ protected void innerUpdateMapping(LongConsumer handler, Consumer erro }, errorHandler)); } + @Override + protected void innerUpdateSettings(final LongConsumer handler, final Consumer errorHandler) { + final Index leaderIndex = params.getLeaderShardId().getIndex(); + final Index followIndex = params.getFollowShardId().getIndex(); + + final ClusterStateRequest clusterStateRequest = new ClusterStateRequest(); + clusterStateRequest.clear(); + clusterStateRequest.metaData(true); + clusterStateRequest.indices(leaderIndex.getName()); + + leaderClient + .admin() + .cluster() + .state( + clusterStateRequest, + ActionListener.wrap( + clusterStateResponse -> { + final IndexMetaData indexMetaData = + clusterStateResponse.getState().metaData().getIndexSafe(leaderIndex); + + final Settings existingSettings = + TransportResumeFollowAction.filter( + clusterService.state().metaData().getIndexSafe(followIndex).getSettings()); + final Settings settings = TransportResumeFollowAction.filter(indexMetaData.getSettings()); + if (existingSettings.equals(settings)) { + handler.accept(indexMetaData.getSettingsVersion()); + return; + } + final Settings updatedSettings = settings.filter( + s -> existingSettings.get(s) == null + || existingSettings.get(s).equals(settings.get(s)) == false); + final UpdateSettingsRequest updateSettingsRequest = + new UpdateSettingsRequest(followIndex.getName()); + updateSettingsRequest.settings(updatedSettings); + + // TODO: we should only close if needed, iterate over the settings and check if any non-dynamic + final ActionListener close = + getChainedListener( + handler, + errorHandler, + followIndex, + indexMetaData, + updateSettingsRequest); + + followerClient + .admin() + .indices() + .close(new CloseIndexRequest(followIndex.getName()), close); + + }, + errorHandler)); + } + + // TODO: this is horribly unreadable, we need a better approach here + private ActionListener getChainedListener( + LongConsumer handler, + Consumer errorHandler, + Index followIndex, + IndexMetaData indexMetaData, + UpdateSettingsRequest updateSettingsRequest) { + return ActionListener.wrap( + response -> followerClient + .admin() + .indices() + .updateSettings( + updateSettingsRequest, + ActionListener.wrap( + us -> followerClient.admin() + .indices() + .open( + new OpenIndexRequest(followIndex.getName()), + ActionListener.wrap( + open -> handler.accept(indexMetaData.getSettingsVersion()), + errorHandler)), + errorHandler)), + errorHandler); + } + @Override protected void innerSendBulkShardOperationsRequest( final String followerHistoryUUID, diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportResumeFollowAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportResumeFollowAction.java index d088d0f551e98..34cdd2aa7a1a8 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportResumeFollowAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportResumeFollowAction.java @@ -370,10 +370,11 @@ static String[] extractLeaderShardHistoryUUIDs(Map ccrIndexMetaD WHITE_LISTED_SETTINGS = Collections.unmodifiableSet(whiteListedSettings); } - private static Settings filter(Settings originalSettings) { + static Settings filter(Settings originalSettings) { Settings.Builder settings = Settings.builder().put(originalSettings); // Remove settings that are always going to be different between leader and follow index: settings.remove(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey()); + settings.remove(IndexMetaData.SETTING_INDEX_VERSION_CREATED.getKey()); settings.remove(IndexMetaData.SETTING_INDEX_UUID); settings.remove(IndexMetaData.SETTING_INDEX_PROVIDED_NAME); settings.remove(IndexMetaData.SETTING_CREATION_DATE); diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java index d2f09c3900dfd..17ce05ccff132 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java @@ -402,6 +402,12 @@ protected void innerUpdateMapping(LongConsumer handler, Consumer erro handler.accept(1L); } + @Override + protected void innerUpdateSettings(LongConsumer handler, Consumer errorHandler) { + // no-op as settings updates are not tested here + handler.accept(1L); + } + @Override protected void innerSendBulkShardOperationsRequest( final String followerHistoryUUID, @@ -432,7 +438,7 @@ protected void innerSendShardChangesRequest(long from, int maxOperationCount, Co final SeqNoStats seqNoStats = indexShard.seqNoStats(); final long maxSeqNoOfUpdatesOrDeletes = indexShard.getMaxSeqNoOfUpdatesOrDeletes(); if (from > seqNoStats.getGlobalCheckpoint()) { - handler.accept(ShardChangesAction.getResponse(1L, seqNoStats, + handler.accept(ShardChangesAction.getResponse(1L, 1L, seqNoStats, maxSeqNoOfUpdatesOrDeletes, ShardChangesAction.EMPTY_OPERATIONS_ARRAY, 1L)); return; } @@ -440,6 +446,7 @@ protected void innerSendShardChangesRequest(long from, int maxOperationCount, Co maxOperationCount, recordedLeaderIndexHistoryUUID, params.getMaxReadRequestSize()); // hard code mapping version; this is ok, as mapping updates are not tested here final ShardChangesAction.Response response = new ShardChangesAction.Response( + 1L, 1L, seqNoStats.getGlobalCheckpoint(), seqNoStats.getMaxSeqNo(), From fc251f0d21997039325ea04511353283ecf03b78 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Wed, 31 Oct 2018 11:10:39 +0100 Subject: [PATCH 02/14] Fixed tests --- .../xpack/ccr/action/ShardFollowNodeTask.java | 2 +- .../ccr/action/ShardChangesResponseTests.java | 2 + .../ShardFollowNodeTaskRandomTests.java | 38 ++++++++--- .../ccr/action/ShardFollowNodeTaskTests.java | 65 +++++++++++++------ 4 files changed, 75 insertions(+), 32 deletions(-) diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java index 9ac80818028b9..23e16795e7ae0 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java @@ -403,7 +403,7 @@ private void updateMapping(LongConsumer handler, AtomicInteger retryCounter) { } private void updateSettings(final LongConsumer handler) { - updateSettings(handler, new AtomicInteger()); + updateSettings(handler, new AtomicInteger(0)); } private void updateSettings(final LongConsumer handler, final AtomicInteger retryCounter) { diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardChangesResponseTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardChangesResponseTests.java index b9ac4fee3d23d..0e48fc8e57ca3 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardChangesResponseTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardChangesResponseTests.java @@ -13,6 +13,7 @@ public class ShardChangesResponseTests extends AbstractStreamableTestCase fromToSlot = new HashMap<>(); @Override @@ -113,6 +115,11 @@ protected void innerUpdateMapping(LongConsumer handler, Consumer erro handler.accept(mappingVersion); } + @Override + protected void innerUpdateSettings(LongConsumer handler, Consumer errorHandler) { + handler.accept(settingsVersion); + } + @Override protected void innerSendBulkShardOperationsRequest( String followerHistoryUUID, List operations, @@ -153,6 +160,7 @@ protected void innerSendShardChangesRequest(long from, int maxOperationCount, Co // if too many invocations occur with the same from then AOBE occurs, this ok and then something is wrong. } mappingVersion = testResponse.mappingVersion; + settingsVersion = testResponse.settingsVersion; if (testResponse.exception != null) { errorHandler.accept(testResponse.exception); } else { @@ -162,8 +170,8 @@ protected void innerSendShardChangesRequest(long from, int maxOperationCount, Co assert from >= testRun.finalExpectedGlobalCheckpoint; final long globalCheckpoint = tracker.getCheckpoint(); final long maxSeqNo = tracker.getMaxSeqNo(); - handler.accept(new ShardChangesAction.Response( - 0L, globalCheckpoint, maxSeqNo, randomNonNegativeLong(), new Translog.Operation[0], 1L)); + handler.accept(new ShardChangesAction.Response(0L, 0L, globalCheckpoint, maxSeqNo, randomNonNegativeLong(), + new Translog.Operation[0], 1L)); } }; threadPool.generic().execute(task); @@ -206,9 +214,10 @@ private void tearDown() { }; } - private static TestRun createTestRun(long startSeqNo, long startMappingVersion, int maxOperationCount) { + private static TestRun createTestRun(long startSeqNo, long startMappingVersion, long startSettingsVersion, int maxOperationCount) { long prevGlobalCheckpoint = startSeqNo; long mappingVersion = startMappingVersion; + long settingsVersion = startSettingsVersion; int numResponses = randomIntBetween(16, 256); Map> responses = new HashMap<>(numResponses); for (int i = 0; i < numResponses; i++) { @@ -216,13 +225,16 @@ private static TestRun createTestRun(long startSeqNo, long startMappingVersion, if (sometimes()) { mappingVersion++; } + if (sometimes()) { + settingsVersion++; + } if (sometimes()) { List item = new ArrayList<>(); // Sometimes add a random retryable error if (sometimes()) { Exception error = new UnavailableShardsException(new ShardId("test", "test", 0), ""); - item.add(new TestResponse(error, mappingVersion, null)); + item.add(new TestResponse(error, mappingVersion, settingsVersion, null)); } List ops = new ArrayList<>(); for (long seqNo = prevGlobalCheckpoint; seqNo <= nextGlobalCheckPoint; seqNo++) { @@ -233,8 +245,10 @@ private static TestRun createTestRun(long startSeqNo, long startMappingVersion, item.add(new TestResponse( null, mappingVersion, + settingsVersion, new ShardChangesAction.Response( mappingVersion, + settingsVersion, nextGlobalCheckPoint, nextGlobalCheckPoint, randomNonNegativeLong(), @@ -253,19 +267,20 @@ private static TestRun createTestRun(long startSeqNo, long startMappingVersion, // Sometimes add a random retryable error if (sometimes()) { Exception error = new UnavailableShardsException(new ShardId("test", "test", 0), ""); - item.add(new TestResponse(error, mappingVersion, null)); + item.add(new TestResponse(error, mappingVersion, settingsVersion, null)); } // Sometimes add an empty shard changes response to also simulate a leader shard lagging behind if (sometimes()) { ShardChangesAction.Response response = new ShardChangesAction.Response( mappingVersion, + settingsVersion, prevGlobalCheckpoint, prevGlobalCheckpoint, randomNonNegativeLong(), EMPTY, randomNonNegativeLong() ); - item.add(new TestResponse(null, mappingVersion, response)); + item.add(new TestResponse(null, mappingVersion, settingsVersion, response)); } List ops = new ArrayList<>(); for (long seqNo = fromSeqNo; seqNo <= toSeqNo; seqNo++) { @@ -277,13 +292,14 @@ private static TestRun createTestRun(long startSeqNo, long startMappingVersion, long localLeaderGCP = randomBoolean() ? ops.get(ops.size() - 1).seqNo() : toSeqNo; ShardChangesAction.Response response = new ShardChangesAction.Response( mappingVersion, + settingsVersion, localLeaderGCP, localLeaderGCP, randomNonNegativeLong(), ops.toArray(EMPTY), randomNonNegativeLong() ); - item.add(new TestResponse(null, mappingVersion, response)); + item.add(new TestResponse(null, mappingVersion, settingsVersion, response)); responses.put(fromSeqNo, Collections.unmodifiableList(item)); } } @@ -323,11 +339,13 @@ private static class TestResponse { final Exception exception; final long mappingVersion; + final long settingsVersion; final ShardChangesAction.Response response; - private TestResponse(Exception exception, long mappingVersion, ShardChangesAction.Response response) { + private TestResponse(Exception exception, long mappingVersion, long settingsVersion, ShardChangesAction.Response response) { this.exception = exception; this.mappingVersion = mappingVersion; + this.settingsVersion = settingsVersion; this.response = response; } } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java index aeac0ac451806..4421db63cbab7 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java @@ -57,6 +57,8 @@ public class ShardFollowNodeTaskTests extends ESTestCase { private Queue writeFailures; private Queue mappingUpdateFailures; private Queue mappingVersions; + private Queue settingsUpdateFailures; + private Queue settingsVersions; private Queue leaderGlobalCheckpoints; private Queue followerGlobalCheckpoints; private Queue maxSeqNos; @@ -73,7 +75,7 @@ public void testCoordinateReads() { task.coordinateReads(); assertThat(shardChangesRequests, contains(new long[]{0L, 8L})); // treat this a peak request shardChangesRequests.clear(); - task.innerHandleReadResponse(0, 5L, generateShardChangesResponse(0, 5L, 0L, 60L)); + task.innerHandleReadResponse(0, 5L, generateShardChangesResponse(0, 5L, 0L, 0L, 60L)); assertThat(shardChangesRequests, contains(new long[][]{ {6L, 8L}, {14L, 8L}, {22L, 8L}, {30L, 8L}, {38L, 8L}, {46L, 8L}, {54L, 7L}} )); @@ -98,7 +100,7 @@ public void testMaxWriteBufferCount() { shardChangesRequests.clear(); // Also invokes the coordinatesReads() method: - task.innerHandleReadResponse(0L, 63L, generateShardChangesResponse(0, 63, 0L, 128L)); + task.innerHandleReadResponse(0L, 63L, generateShardChangesResponse(0, 63, 0L, 0L, 128L)); assertThat(shardChangesRequests.size(), equalTo(0)); // no more reads, because write buffer count limit has been reached ShardFollowNodeTaskStatus status = task.getStatus(); @@ -124,7 +126,7 @@ public void testMaxWriteBufferSize() { shardChangesRequests.clear(); // Also invokes the coordinatesReads() method: - task.innerHandleReadResponse(0L, 63L, generateShardChangesResponse(0, 63, 0L, 128L)); + task.innerHandleReadResponse(0L, 63L, generateShardChangesResponse(0, 63, 0L, 0L, 128L)); assertThat(shardChangesRequests.size(), equalTo(0)); // no more reads, because write buffer size limit has been reached ShardFollowNodeTaskStatus status = task.getStatus(); @@ -189,7 +191,7 @@ public void testTaskCancelledAfterReadLimitHasBeenReached() { task.markAsCompleted(); shardChangesRequests.clear(); // Also invokes the coordinatesReads() method: - task.innerHandleReadResponse(0L, 15L, generateShardChangesResponse(0, 15, 0L, 31L)); + task.innerHandleReadResponse(0L, 15L, generateShardChangesResponse(0, 15, 0L, 0L, 31L)); assertThat(shardChangesRequests.size(), equalTo(0)); // no more reads, because task has been cancelled assertThat(bulkShardOperationRequests.size(), equalTo(0)); // no more writes, because task has been cancelled @@ -219,7 +221,7 @@ public void testTaskCancelledAfterWriteBufferLimitHasBeenReached() { task.markAsCompleted(); shardChangesRequests.clear(); // Also invokes the coordinatesReads() method: - task.innerHandleReadResponse(0L, 63L, generateShardChangesResponse(0, 63, 0L, 128L)); + task.innerHandleReadResponse(0L, 63L, generateShardChangesResponse(0, 63, 0L, 0L, 128L)); assertThat(shardChangesRequests.size(), equalTo(0)); // no more reads, because task has been cancelled assertThat(bulkShardOperationRequests.size(), equalTo(0)); // no more writes, because task has been cancelled @@ -439,7 +441,7 @@ public void testHandleReadResponse() { startTask(task, 63, -1); task.coordinateReads(); - ShardChangesAction.Response response = generateShardChangesResponse(0, 63, 0L, 63L); + ShardChangesAction.Response response = generateShardChangesResponse(0, 63, 0L, 0L, 63L); task.innerHandleReadResponse(0L, 63L, response); assertThat(bulkShardOperationRequests.size(), equalTo(1)); @@ -469,7 +471,7 @@ public void testReceiveLessThanRequested() { assertThat(shardChangesRequests.get(0)[1], equalTo(64L)); shardChangesRequests.clear(); - ShardChangesAction.Response response = generateShardChangesResponse(0, 20, 0L, 31L); + ShardChangesAction.Response response = generateShardChangesResponse(0, 20, 0L, 0L, 31L); task.innerHandleReadResponse(0L, 63L, response); assertThat(shardChangesRequests.size(), equalTo(1)); @@ -498,7 +500,7 @@ public void testCancelAndReceiveLessThanRequested() { shardChangesRequests.clear(); task.markAsCompleted(); - ShardChangesAction.Response response = generateShardChangesResponse(0, 31, 0L, 31L); + ShardChangesAction.Response response = generateShardChangesResponse(0, 31, 0L, 0L, 31L); task.innerHandleReadResponse(0L, 64L, response); assertThat(shardChangesRequests.size(), equalTo(0)); @@ -524,7 +526,7 @@ public void testReceiveNothingExpectedSomething() { assertThat(shardChangesRequests.get(0)[1], equalTo(64L)); shardChangesRequests.clear(); - task.innerHandleReadResponse(0L, 63L, new ShardChangesAction.Response(0, 0, 0, 100, new Translog.Operation[0], 1L)); + task.innerHandleReadResponse(0L, 63L, new ShardChangesAction.Response(0, 0, 0, 0, 100, new Translog.Operation[0], 1L)); assertThat(shardChangesRequests.size(), equalTo(1)); assertThat(shardChangesRequests.get(0)[0], equalTo(0L)); @@ -547,7 +549,7 @@ public void testMappingUpdate() { mappingVersions.add(1L); task.coordinateReads(); - ShardChangesAction.Response response = generateShardChangesResponse(0, 63, 1L, 63L); + ShardChangesAction.Response response = generateShardChangesResponse(0, 63, 1L, 0L, 63L); task.handleReadResponse(0L, 63L, response); assertThat(bulkShardOperationRequests.size(), equalTo(1)); @@ -576,7 +578,7 @@ public void testMappingUpdateRetryableError() { } mappingVersions.add(1L); task.coordinateReads(); - ShardChangesAction.Response response = generateShardChangesResponse(0, 63, 1L, 63L); + ShardChangesAction.Response response = generateShardChangesResponse(0, 63, 1L, 0L, 63L); task.handleReadResponse(0L, 63L, response); assertThat(mappingUpdateFailures.size(), equalTo(0)); @@ -601,7 +603,7 @@ public void testMappingUpdateNonRetryableError() { mappingUpdateFailures.add(new RuntimeException()); task.coordinateReads(); - ShardChangesAction.Response response = generateShardChangesResponse(0, 64, 1L, 64L); + ShardChangesAction.Response response = generateShardChangesResponse(0, 64, 1L, 0L, 64L); task.handleReadResponse(0L, 64L, response); assertThat(bulkShardOperationRequests.size(), equalTo(0)); @@ -629,7 +631,7 @@ public void testCoordinateWrites() { assertThat(shardChangesRequests.get(0)[0], equalTo(0L)); assertThat(shardChangesRequests.get(0)[1], equalTo(128L)); - ShardChangesAction.Response response = generateShardChangesResponse(0, 63, 0L, 63L); + ShardChangesAction.Response response = generateShardChangesResponse(0, 63, 0L, 0L, 63L); // Also invokes coordinatesWrites() task.innerHandleReadResponse(0L, 63L, response); @@ -649,7 +651,7 @@ public void testMaxOutstandingWrites() { params.maxWriteRequestOperationCount = 64; params.maxOutstandingWriteRequests = 2; ShardFollowNodeTask task = createShardFollowTask(params); - ShardChangesAction.Response response = generateShardChangesResponse(0, 256, 0L, 256L); + ShardChangesAction.Response response = generateShardChangesResponse(0, 256, 0L, 0L, 256L); // Also invokes coordinatesWrites() task.innerHandleReadResponse(0L, 64L, response); @@ -662,7 +664,7 @@ public void testMaxOutstandingWrites() { params.maxOutstandingWriteRequests = 4; // change to 4 outstanding writers task = createShardFollowTask(params); - response = generateShardChangesResponse(0, 256, 0L, 256L); + response = generateShardChangesResponse(0, 256, 0L, 0L, 256L); // Also invokes coordinatesWrites() task.innerHandleReadResponse(0L, 64L, response); @@ -681,7 +683,7 @@ public void testMaxWriteRequestCount() { params.maxWriteRequestOperationCount = 8; params.maxOutstandingWriteRequests = 32; ShardFollowNodeTask task = createShardFollowTask(params); - ShardChangesAction.Response response = generateShardChangesResponse(0, 256, 0L, 256L); + ShardChangesAction.Response response = generateShardChangesResponse(0, 256, 0L, 0L, 256L); // Also invokes coordinatesWrites() task.innerHandleReadResponse(0L, 64L, response); @@ -712,7 +714,7 @@ public void testRetryableError() { for (int i = 0; i < max; i++) { writeFailures.add(new ShardNotFoundException(new ShardId("leader_index", "", 0))); } - ShardChangesAction.Response response = generateShardChangesResponse(0, 63, 0L, 63L); + ShardChangesAction.Response response = generateShardChangesResponse(0, 63, 0L, 0L, 63L); // Also invokes coordinatesWrites() task.innerHandleReadResponse(0L, 63L, response); @@ -741,7 +743,7 @@ public void testNonRetryableError() { assertThat(shardChangesRequests.get(0)[1], equalTo(64L)); writeFailures.add(new RuntimeException()); - ShardChangesAction.Response response = generateShardChangesResponse(0, 63, 0L, 63L); + ShardChangesAction.Response response = generateShardChangesResponse(0, 63, 0L, 0L, 63L); // Also invokes coordinatesWrites() task.innerHandleReadResponse(0L, 63L, response); @@ -768,7 +770,7 @@ public void testMaxWriteRequestSize() { assertThat(shardChangesRequests.get(0)[0], equalTo(0L)); assertThat(shardChangesRequests.get(0)[1], equalTo(64L)); - ShardChangesAction.Response response = generateShardChangesResponse(0, 63, 0L, 64L); + ShardChangesAction.Response response = generateShardChangesResponse(0, 63, 0L, 0L, 64L); // Also invokes coordinatesWrites() task.innerHandleReadResponse(0L, 64L, response); @@ -791,7 +793,7 @@ public void testHandleWriteResponse() { shardChangesRequests.clear(); followerGlobalCheckpoints.add(63L); - ShardChangesAction.Response response = generateShardChangesResponse(0, 63, 0L, 63L); + ShardChangesAction.Response response = generateShardChangesResponse(0, 63, 0L, 0L, 63L); // Also invokes coordinatesWrites() task.innerHandleReadResponse(0L, 63L, response); @@ -866,6 +868,8 @@ private ShardFollowNodeTask createShardFollowTask(ShardFollowTaskParams params) writeFailures = new LinkedList<>(); mappingUpdateFailures = new LinkedList<>(); mappingVersions = new LinkedList<>(); + settingsUpdateFailures = new LinkedList<>(); + settingsVersions = new LinkedList<>(); leaderGlobalCheckpoints = new LinkedList<>(); followerGlobalCheckpoints = new LinkedList<>(); maxSeqNos = new LinkedList<>(); @@ -887,6 +891,20 @@ protected void innerUpdateMapping(LongConsumer handler, Consumer erro } } + @Override + protected void innerUpdateSettings(LongConsumer handler, Consumer errorHandler) { + Exception failure = settingsUpdateFailures.poll(); + if (failure != null) { + errorHandler.accept(failure); + return; + } + + final Long settingsVersion = settingsVersions.poll(); + if (settingsVersion != null) { + handler.accept(settingsVersion); + } + } + @Override protected void innerSendBulkShardOperationsRequest( String followerHistoryUUID, final List operations, @@ -924,6 +942,7 @@ protected void innerSendShardChangesRequest(long from, int requestBatchSize, Con } final ShardChangesAction.Response response = new ShardChangesAction.Response( mappingVersions.poll(), + 0L, leaderGlobalCheckpoints.poll(), maxSeqNos.poll(), randomNonNegativeLong(), @@ -946,7 +965,10 @@ public void markAsCompleted() { }; } - private static ShardChangesAction.Response generateShardChangesResponse(long fromSeqNo, long toSeqNo, long mappingVersion, + private static ShardChangesAction.Response generateShardChangesResponse(long fromSeqNo, + long toSeqNo, + long mappingVersion, + long settingsVersion, long leaderGlobalCheckPoint) { List ops = new ArrayList<>(); for (long seqNo = fromSeqNo; seqNo <= toSeqNo; seqNo++) { @@ -956,6 +978,7 @@ private static ShardChangesAction.Response generateShardChangesResponse(long fro } return new ShardChangesAction.Response( mappingVersion, + settingsVersion, leaderGlobalCheckPoint, leaderGlobalCheckPoint, randomNonNegativeLong(), From 0ff9ff1e594b09d949600b6367ea7e6c5386253e Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Wed, 31 Oct 2018 13:26:32 +0100 Subject: [PATCH 03/14] Added followerSettingsVersion to follow shard stats. --- .../xpack/ccr/action/ShardFollowNodeTask.java | 1 + .../ShardFollowNodeTaskStatusTests.java | 2 ++ .../xpack/ccr/action/StatsResponsesTests.java | 1 + .../ccr/FollowStatsMonitoringDocTests.java | 6 ++++- .../core/ccr/ShardFollowNodeTaskStatus.java | 22 ++++++++++++++++--- .../src/main/resources/monitoring-es.json | 3 +++ 6 files changed, 31 insertions(+), 4 deletions(-) diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java index 23e16795e7ae0..262b584ae5952 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java @@ -506,6 +506,7 @@ public synchronized ShardFollowNodeTaskStatus getStatus() { buffer.size(), bufferSizeInBytes, currentMappingVersion, + currentSettingsVersion, totalReadTimeMillis, totalReadRemoteExecTimeMillis, successfulReadRequests, diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskStatusTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskStatusTests.java index 95f8e86e09657..9cac01d278e74 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskStatusTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskStatusTests.java @@ -58,6 +58,7 @@ protected ShardFollowNodeTaskStatus createTestInstance() { randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(), + randomNonNegativeLong(), randomReadExceptions(), randomLong(), randomBoolean() ? new ElasticsearchException("fatal error") : null); @@ -78,6 +79,7 @@ protected void assertEqualInstances(final ShardFollowNodeTaskStatus expectedInst assertThat(newInstance.outstandingWriteRequests(), equalTo(expectedInstance.outstandingWriteRequests())); assertThat(newInstance.writeBufferOperationCount(), equalTo(expectedInstance.writeBufferOperationCount())); assertThat(newInstance.followerMappingVersion(), equalTo(expectedInstance.followerMappingVersion())); + assertThat(newInstance.followerSettingsVersion(), equalTo(expectedInstance.followerSettingsVersion())); assertThat(newInstance.totalReadTimeMillis(), equalTo(expectedInstance.totalReadTimeMillis())); assertThat(newInstance.successfulReadRequests(), equalTo(expectedInstance.successfulReadRequests())); assertThat(newInstance.failedReadRequests(), equalTo(expectedInstance.failedReadRequests())); diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/StatsResponsesTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/StatsResponsesTests.java index 86851d98ffed2..c173404955826 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/StatsResponsesTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/StatsResponsesTests.java @@ -55,6 +55,7 @@ static FollowStatsAction.StatsResponses createStatsResponse() { randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(), + randomNonNegativeLong(), Collections.emptyNavigableMap(), randomLong(), randomBoolean() ? new ElasticsearchException("fatal error") : null); diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/monitoring/collector/ccr/FollowStatsMonitoringDocTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/monitoring/collector/ccr/FollowStatsMonitoringDocTests.java index f3e0c2d5bd7b3..410d573e1b4c0 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/monitoring/collector/ccr/FollowStatsMonitoringDocTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/monitoring/collector/ccr/FollowStatsMonitoringDocTests.java @@ -93,7 +93,8 @@ public void testToXContent() throws IOException { final int numberOfConcurrentWrites = randomIntBetween(1, Integer.MAX_VALUE); final int writeBufferOperationCount = randomIntBetween(0, Integer.MAX_VALUE); final long writeBufferSizeInBytes = randomNonNegativeLong(); - final long followerMappingVersion = randomIntBetween(0, Integer.MAX_VALUE); + final long followerMappingVersion = randomNonNegativeLong(); + final long followerSettingsVersion = randomNonNegativeLong(); final long totalReadTimeMillis = randomLongBetween(0, 4096); final long totalReadRemoteExecTimeMillis = randomLongBetween(0, 4096); final long successfulReadRequests = randomNonNegativeLong(); @@ -124,6 +125,7 @@ public void testToXContent() throws IOException { writeBufferOperationCount, writeBufferSizeInBytes, followerMappingVersion, + followerSettingsVersion, totalReadTimeMillis, totalReadRemoteExecTimeMillis, successfulReadRequests, @@ -170,6 +172,7 @@ public void testToXContent() throws IOException { + "\"write_buffer_operation_count\":" + writeBufferOperationCount + "," + "\"write_buffer_size_in_bytes\":" + writeBufferSizeInBytes + "," + "\"follower_mapping_version\":" + followerMappingVersion + "," + + "\"follower_settings_version\":" + followerSettingsVersion + "," + "\"total_read_time_millis\":" + totalReadTimeMillis + "," + "\"total_read_remote_exec_time_millis\":" + totalReadRemoteExecTimeMillis + "," + "\"successful_read_requests\":" + successfulReadRequests + "," @@ -214,6 +217,7 @@ public void testShardFollowNodeTaskStatusFieldsMapped() throws IOException { 1, 1, 1, + 1, 100, 50, 10, diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/ShardFollowNodeTaskStatus.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/ShardFollowNodeTaskStatus.java index b8f645eea4430..33a5d495c1631 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/ShardFollowNodeTaskStatus.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/ShardFollowNodeTaskStatus.java @@ -48,6 +48,7 @@ public class ShardFollowNodeTaskStatus implements Task.Status { private static final ParseField WRITE_BUFFER_OPERATION_COUNT_FIELD = new ParseField("write_buffer_operation_count"); private static final ParseField WRITE_BUFFER_SIZE_IN_BYTES_FIELD = new ParseField("write_buffer_size_in_bytes"); private static final ParseField FOLLOWER_MAPPING_VERSION_FIELD = new ParseField("follower_mapping_version"); + private static final ParseField FOLLOWER_SETTINGS_VERSION_FIELD = new ParseField("follower_settings_version"); private static final ParseField TOTAL_READ_TIME_MILLIS_FIELD = new ParseField("total_read_time_millis"); private static final ParseField TOTAL_READ_REMOTE_EXEC_TIME_MILLIS_FIELD = new ParseField("total_read_remote_exec_time_millis"); private static final ParseField SUCCESSFUL_READ_REQUESTS_FIELD = new ParseField("successful_read_requests"); @@ -91,12 +92,13 @@ public class ShardFollowNodeTaskStatus implements Task.Status { (long) args[21], (long) args[22], (long) args[23], + (long) args[24], new TreeMap<>( - ((List>>) args[24]) + ((List>>) args[25]) .stream() .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))), - (long) args[25], - (ElasticsearchException) args[26])); + (long) args[26], + (ElasticsearchException) args[27])); public static final String READ_EXCEPTIONS_ENTRY_PARSER_NAME = "shard-follow-node-task-status-read-exceptions-entry"; @@ -120,6 +122,7 @@ public class ShardFollowNodeTaskStatus implements Task.Status { STATUS_PARSER.declareInt(ConstructingObjectParser.constructorArg(), WRITE_BUFFER_OPERATION_COUNT_FIELD); STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), WRITE_BUFFER_SIZE_IN_BYTES_FIELD); STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), FOLLOWER_MAPPING_VERSION_FIELD); + STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), FOLLOWER_SETTINGS_VERSION_FIELD); STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), TOTAL_READ_TIME_MILLIS_FIELD); STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), TOTAL_READ_REMOTE_EXEC_TIME_MILLIS_FIELD); STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), SUCCESSFUL_READ_REQUESTS_FIELD); @@ -234,6 +237,12 @@ public long followerMappingVersion() { return followerMappingVersion; } + private final long followerSettingsVersion; + + public long followerSettingsVersion() { + return followerSettingsVersion; + } + private final long totalReadTimeMillis; public long totalReadTimeMillis() { @@ -327,6 +336,7 @@ public ShardFollowNodeTaskStatus( final int writeBufferOperationCount, final long writeBufferSizeInBytes, final long followerMappingVersion, + final long followerSettingsVersion, final long totalReadTimeMillis, final long totalReadRemoteExecTimeMillis, final long successfulReadRequests, @@ -354,6 +364,7 @@ public ShardFollowNodeTaskStatus( this.writeBufferOperationCount = writeBufferOperationCount; this.writeBufferSizeInBytes = writeBufferSizeInBytes; this.followerMappingVersion = followerMappingVersion; + this.followerSettingsVersion = followerSettingsVersion; this.totalReadTimeMillis = totalReadTimeMillis; this.totalReadRemoteExecTimeMillis = totalReadRemoteExecTimeMillis; this.successfulReadRequests = successfulReadRequests; @@ -384,6 +395,7 @@ public ShardFollowNodeTaskStatus(final StreamInput in) throws IOException { this.writeBufferOperationCount = in.readVInt(); this.writeBufferSizeInBytes = in.readVLong(); this.followerMappingVersion = in.readVLong(); + this.followerSettingsVersion = in.readVLong(); this.totalReadTimeMillis = in.readVLong(); this.totalReadRemoteExecTimeMillis = in.readVLong(); this.successfulReadRequests = in.readVLong(); @@ -421,6 +433,7 @@ public void writeTo(final StreamOutput out) throws IOException { out.writeVInt(writeBufferOperationCount); out.writeVLong(writeBufferSizeInBytes); out.writeVLong(followerMappingVersion); + out.writeVLong(followerSettingsVersion); out.writeVLong(totalReadTimeMillis); out.writeVLong(totalReadRemoteExecTimeMillis); out.writeVLong(successfulReadRequests); @@ -470,6 +483,7 @@ public XContentBuilder toXContentFragment(final XContentBuilder builder, final P "write_buffer_size", new ByteSizeValue(writeBufferSizeInBytes)); builder.field(FOLLOWER_MAPPING_VERSION_FIELD.getPreferredName(), followerMappingVersion); + builder.field(FOLLOWER_SETTINGS_VERSION_FIELD.getPreferredName(), followerSettingsVersion); builder.humanReadableField( TOTAL_READ_TIME_MILLIS_FIELD.getPreferredName(), "total_read_time", @@ -550,6 +564,7 @@ public boolean equals(final Object o) { writeBufferOperationCount == that.writeBufferOperationCount && writeBufferSizeInBytes == that.writeBufferSizeInBytes && followerMappingVersion == that.followerMappingVersion && + followerSettingsVersion== that.followerSettingsVersion && totalReadTimeMillis == that.totalReadTimeMillis && totalReadRemoteExecTimeMillis == that.totalReadRemoteExecTimeMillis && successfulReadRequests == that.successfulReadRequests && @@ -588,6 +603,7 @@ public int hashCode() { writeBufferOperationCount, writeBufferSizeInBytes, followerMappingVersion, + followerSettingsVersion, totalReadTimeMillis, totalReadRemoteExecTimeMillis, successfulReadRequests, diff --git a/x-pack/plugin/core/src/main/resources/monitoring-es.json b/x-pack/plugin/core/src/main/resources/monitoring-es.json index bdd16d3b58cc6..c1cd0e2aef12d 100644 --- a/x-pack/plugin/core/src/main/resources/monitoring-es.json +++ b/x-pack/plugin/core/src/main/resources/monitoring-es.json @@ -971,6 +971,9 @@ "follower_mapping_version": { "type": "long" }, + "follower_settings_version": { + "type": "long" + }, "total_read_time_millis": { "type": "long" }, From b71bee27f282c96708f394bcf3d9f5ab428574bd Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Wed, 31 Oct 2018 13:51:24 +0100 Subject: [PATCH 04/14] added unit tests --- .../ccr/action/ShardFollowNodeTaskTests.java | 79 +++++++++++++++++++ 1 file changed, 79 insertions(+) diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java index 4421db63cbab7..cf7f901aaba33 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java @@ -616,6 +616,85 @@ public void testMappingUpdateNonRetryableError() { assertThat(status.leaderGlobalCheckpoint(), equalTo(63L)); } + public void testSettingsUpdate() { + ShardFollowTaskParams params = new ShardFollowTaskParams(); + params.maxReadRequestOperationCount = 64; + params.maxOutstandingReadRequests = 1; + params.maxOutstandingWriteRequests = 1; + ShardFollowNodeTask task = createShardFollowTask(params); + startTask(task, 63, -1); + + settingsVersions.add(1L); + task.coordinateReads(); + ShardChangesAction.Response response = generateShardChangesResponse(0, 63, 0L, 1L, 63L); + task.handleReadResponse(0L, 63L, response); + + assertThat(bulkShardOperationRequests.size(), equalTo(1)); + assertThat(bulkShardOperationRequests.get(0), equalTo(Arrays.asList(response.getOperations()))); + + ShardFollowNodeTaskStatus status = task.getStatus(); + assertThat(status.followerMappingVersion(), equalTo(0L)); + assertThat(status.followerSettingsVersion(), equalTo(1L)); + assertThat(status.outstandingReadRequests(), equalTo(1)); + assertThat(status.outstandingWriteRequests(), equalTo(1)); + assertThat(status.lastRequestedSeqNo(), equalTo(63L)); + assertThat(status.leaderGlobalCheckpoint(), equalTo(63L)); + assertThat(status.followerGlobalCheckpoint(), equalTo(-1L)); + } + + public void testSettingsUpdateRetryableError() { + ShardFollowTaskParams params = new ShardFollowTaskParams(); + params.maxReadRequestOperationCount = 64; + params.maxOutstandingReadRequests = 1; + params.maxOutstandingWriteRequests = 1; + ShardFollowNodeTask task = createShardFollowTask(params); + startTask(task, 63, -1); + + int max = randomIntBetween(1, 30); + for (int i = 0; i < max; i++) { + settingsUpdateFailures.add(new ConnectException()); + } + settingsVersions.add(1L); + task.coordinateReads(); + ShardChangesAction.Response response = generateShardChangesResponse(0, 63, 0L, 1L, 63L); + task.handleReadResponse(0L, 63L, response); + + assertThat(mappingUpdateFailures.size(), equalTo(0)); + assertThat(bulkShardOperationRequests.size(), equalTo(1)); + assertThat(task.isStopped(), equalTo(false)); + ShardFollowNodeTaskStatus status = task.getStatus(); + assertThat(status.followerMappingVersion(), equalTo(0L)); + assertThat(status.followerSettingsVersion(), equalTo(1L)); + assertThat(status.outstandingReadRequests(), equalTo(1)); + assertThat(status.outstandingWriteRequests(), equalTo(1)); + assertThat(status.lastRequestedSeqNo(), equalTo(63L)); + assertThat(status.leaderGlobalCheckpoint(), equalTo(63L)); + } + + public void testSettingsUpdateNonRetryableError() { + ShardFollowTaskParams params = new ShardFollowTaskParams(); + params.maxReadRequestOperationCount = 64; + params.maxOutstandingReadRequests = 1; + params.maxOutstandingWriteRequests = 1; + ShardFollowNodeTask task = createShardFollowTask(params); + startTask(task, 63, -1); + + settingsUpdateFailures.add(new RuntimeException()); + task.coordinateReads(); + ShardChangesAction.Response response = generateShardChangesResponse(0, 64, 0L, 1L, 64L); + task.handleReadResponse(0L, 64L, response); + + assertThat(bulkShardOperationRequests.size(), equalTo(0)); + assertThat(task.isStopped(), equalTo(true)); + ShardFollowNodeTaskStatus status = task.getStatus(); + assertThat(status.followerMappingVersion(), equalTo(0L)); + assertThat(status.followerSettingsVersion(), equalTo(0L)); + assertThat(status.outstandingReadRequests(), equalTo(1)); + assertThat(status.outstandingWriteRequests(), equalTo(0)); + assertThat(status.lastRequestedSeqNo(), equalTo(63L)); + assertThat(status.leaderGlobalCheckpoint(), equalTo(63L)); + } + public void testCoordinateWrites() { ShardFollowTaskParams params = new ShardFollowTaskParams(); params.maxReadRequestOperationCount = 128; From 05b51adfbf92cee3f314df36a02126d77ef8fa2c Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Wed, 31 Oct 2018 18:12:16 +0100 Subject: [PATCH 05/14] Added integration tests --- .../xpack/ccr/IndexFollowingIT.java | 108 ++++++++++++++++++ 1 file changed, 108 insertions(+) diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java index acfc9996294c3..f8f9b4d8f6dab 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java @@ -11,7 +11,12 @@ import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse; import org.elasticsearch.action.admin.indices.close.CloseIndexRequest; import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; +import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequest; +import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse; +import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest; import org.elasticsearch.action.admin.indices.open.OpenIndexRequest; +import org.elasticsearch.action.admin.indices.settings.get.GetSettingsRequest; +import org.elasticsearch.action.admin.indices.settings.get.GetSettingsResponse; import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest; import org.elasticsearch.action.admin.indices.stats.ShardStats; import org.elasticsearch.action.bulk.BulkProcessor; @@ -540,6 +545,109 @@ public void testUnknownClusterAlias() throws Exception { assertThat(e.getMessage(), equalTo("unknown cluster alias [another_cluster]")); } + public void testUpdateLeaderIndexSettings() throws Exception { + final String leaderIndexSettings = getIndexSettings(1, 0, + singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true")); + assertAcked(leaderClient().admin().indices().prepareCreate("leader").setSource(leaderIndexSettings, XContentType.JSON)); + ensureLeaderYellow("leader"); + + final PutFollowAction.Request followRequest = putFollow("leader", "follower"); + followerClient().execute(PutFollowAction.INSTANCE, followRequest).get(); + + final long firstBatchNumDocs = randomIntBetween(2, 64); + for (long i = 0; i < firstBatchNumDocs; i++) { + leaderClient().prepareIndex("leader", "doc").setSource("{}", XContentType.JSON).get(); + } + assertBusy(() -> assertThat(followerClient().prepareSearch("follower").get().getHits().totalHits, equalTo(firstBatchNumDocs))); + + UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest("leader"); + updateSettingsRequest.settings(Settings.builder().put("index.max_ngram_diff", 2)); + assertAcked(leaderClient().admin().indices().updateSettings(updateSettingsRequest).actionGet()); + + final int secondBatchNumDocs = randomIntBetween(2, 64); + for (long i = firstBatchNumDocs; i < firstBatchNumDocs + secondBatchNumDocs; i++) { + leaderClient().prepareIndex("leader", "doc").setSource("{}", XContentType.JSON).get(); + } + assertBusy(() -> { + GetSettingsRequest getSettingsRequest = new GetSettingsRequest(); + getSettingsRequest.indices("follower"); + GetSettingsResponse getSettingsResponse = followerClient().admin().indices().getSettings(getSettingsRequest).actionGet(); + assertThat(getSettingsResponse.getSetting("follower", "index.max_ngram_diff"), equalTo("2")); + + try { + assertThat(followerClient().prepareSearch("follower").get().getHits().totalHits, + equalTo(firstBatchNumDocs + secondBatchNumDocs)); + } catch (Exception e) { + throw new AssertionError("error while searching", e); + } + }); + } + + public void testUpdateLeaderAnalysisSettings() throws Exception { + final String leaderIndexSettings = getIndexSettings(1, 0, + singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true")); + assertAcked(leaderClient().admin().indices().prepareCreate("leader").setSource(leaderIndexSettings, XContentType.JSON)); + ensureLeaderYellow("leader"); + + final PutFollowAction.Request followRequest = putFollow("leader", "follower"); + followerClient().execute(PutFollowAction.INSTANCE, followRequest).get(); + + final long firstBatchNumDocs = randomIntBetween(2, 64); + for (long i = 0; i < firstBatchNumDocs; i++) { + leaderClient().prepareIndex("leader", "doc").setSource("{}", XContentType.JSON).get(); + } + + assertBusy(() -> assertThat(followerClient().prepareSearch("follower").get().getHits().totalHits, equalTo(firstBatchNumDocs))); + + CloseIndexRequest closeIndexRequest = new CloseIndexRequest("leader"); + assertAcked(leaderClient().admin().indices().close(closeIndexRequest).actionGet()); + + UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest("leader"); + updateSettingsRequest.settings(Settings.builder() + .put("index.analysis.analyzer.my_analyzer.type", "custom") + .put("index.analysis.analyzer.my_analyzer.tokenizer", "keyword") + ); + assertAcked(leaderClient().admin().indices().updateSettings(updateSettingsRequest).actionGet()); + + OpenIndexRequest openIndexRequest = new OpenIndexRequest("leader"); + assertAcked(leaderClient().admin().indices().open(openIndexRequest).actionGet()); + ensureLeaderGreen("leader"); + + PutMappingRequest putMappingRequest = new PutMappingRequest("leader"); + putMappingRequest.type("doc"); + putMappingRequest.source("new_field", "type=text,analyzer=my_analyzer"); + assertAcked(leaderClient().admin().indices().putMapping(putMappingRequest).actionGet()); + + final int secondBatchNumDocs = randomIntBetween(2, 64); + for (long i = firstBatchNumDocs; i < firstBatchNumDocs + secondBatchNumDocs; i++) { + final String source = String.format(Locale.ROOT, "{\"new_field\":\"value %d\"}", i); + leaderClient().prepareIndex("leader", "doc").setSource(source, XContentType.JSON).get(); + } + + assertBusy(() -> { + GetSettingsRequest getSettingsRequest = new GetSettingsRequest(); + getSettingsRequest.indices("follower"); + GetSettingsResponse getSettingsResponse = followerClient().admin().indices().getSettings(getSettingsRequest).actionGet(); + assertThat(getSettingsResponse.getSetting("follower", "index.analysis.analyzer.my_analyzer.type"), equalTo("custom")); + assertThat(getSettingsResponse.getSetting("follower", "index.analysis.analyzer.my_analyzer.tokenizer"), equalTo("keyword")); + + GetMappingsRequest getMappingsRequest = new GetMappingsRequest(); + getMappingsRequest.indices("follower"); + GetMappingsResponse getMappingsResponse = followerClient().admin().indices().getMappings(getMappingsRequest).actionGet(); + MappingMetaData mappingMetaData = getMappingsResponse.getMappings().get("follower").get("doc"); + assertThat(XContentMapValues.extractValue("properties.new_field.type", mappingMetaData.sourceAsMap()), equalTo("text")); + assertThat(XContentMapValues.extractValue("properties.new_field.analyzer", mappingMetaData.sourceAsMap()), + equalTo("my_analyzer")); + + try { + assertThat(followerClient().prepareSearch("follower").get().getHits().totalHits, + equalTo(firstBatchNumDocs + secondBatchNumDocs)); + } catch (Exception e) { + throw new AssertionError("error while searching", e); + } + }); + } + private CheckedRunnable assertTask(final int numberOfPrimaryShards, final Map numDocsPerShard) { return () -> { final ClusterState clusterState = followerClient().admin().cluster().prepareState().get().getState(); From 02f3635e55148f130566a1db1248651eca0247db Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Thu, 1 Nov 2018 09:48:04 +0100 Subject: [PATCH 06/14] Tidy up async calls --- .../xpack/ccr/action/ShardFollowNodeTask.java | 12 +- .../ccr/action/ShardFollowTasksExecutor.java | 110 +++++++----------- 2 files changed, 52 insertions(+), 70 deletions(-) diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java index 262b584ae5952..543df44e6f5ae 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java @@ -279,9 +279,15 @@ private void sendShardChangesRequest(long from, int maxOperationCount, long maxR } void handleReadResponse(long from, long maxRequiredSeqNo, ShardChangesAction.Response response) { - maybeUpdateMapping( - response.getMappingVersion(), - () -> maybeUpdateSettings(response.getSettingsVersion(), () -> innerHandleReadResponse(from, maxRequiredSeqNo, response))); + // In order to process this read response (3), we need to check and potentially update the follow index's setting (1) and + // check and potentially update the follow index's mappings (2). + + // 3) handle read response: + Runnable handleResponseTask = () -> innerHandleReadResponse(from, maxRequiredSeqNo, response); + // 2) update follow index mapping: + Runnable updateMappingsTask = () -> maybeUpdateMapping(response.getMappingVersion(), handleResponseTask); + // 1) update follow index settings: + maybeUpdateSettings(response.getSettingsVersion(), updateMappingsTask); } /** Called when some operations are fetched from the leading */ diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java index 2004891b11f30..08d7f38f611fe 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java @@ -8,9 +8,11 @@ import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest; +import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; import org.elasticsearch.action.admin.indices.close.CloseIndexRequest; import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest; import org.elasticsearch.action.admin.indices.open.OpenIndexRequest; +import org.elasticsearch.action.admin.indices.open.OpenIndexResponse; import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest; import org.elasticsearch.action.admin.indices.stats.IndexStats; import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequest; @@ -22,6 +24,8 @@ import org.elasticsearch.cluster.metadata.MappingMetaData; import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.CheckedConsumer; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.common.xcontent.XContentType; @@ -136,7 +140,7 @@ protected void innerUpdateMapping(LongConsumer handler, Consumer erro } @Override - protected void innerUpdateSettings(final LongConsumer handler, final Consumer errorHandler) { + protected void innerUpdateSettings(final LongConsumer finalHandler, final Consumer errorHandler) { final Index leaderIndex = params.getLeaderShardId().getIndex(); final Index followIndex = params.getFollowShardId().getIndex(); @@ -145,72 +149,45 @@ protected void innerUpdateSettings(final LongConsumer handler, final Consumer { - final IndexMetaData indexMetaData = - clusterStateResponse.getState().metaData().getIndexSafe(leaderIndex); - - final Settings existingSettings = - TransportResumeFollowAction.filter( - clusterService.state().metaData().getIndexSafe(followIndex).getSettings()); - final Settings settings = TransportResumeFollowAction.filter(indexMetaData.getSettings()); - if (existingSettings.equals(settings)) { - handler.accept(indexMetaData.getSettingsVersion()); - return; - } - final Settings updatedSettings = settings.filter( - s -> existingSettings.get(s) == null - || existingSettings.get(s).equals(settings.get(s)) == false); - final UpdateSettingsRequest updateSettingsRequest = - new UpdateSettingsRequest(followIndex.getName()); - updateSettingsRequest.settings(updatedSettings); - - // TODO: we should only close if needed, iterate over the settings and check if any non-dynamic - final ActionListener close = - getChainedListener( - handler, - errorHandler, - followIndex, - indexMetaData, - updateSettingsRequest); - - followerClient - .admin() - .indices() - .close(new CloseIndexRequest(followIndex.getName()), close); - - }, - errorHandler)); + CheckedConsumer onResponse = clusterStateResponse -> { + final IndexMetaData leaderIMD = clusterStateResponse.getState().metaData().getIndexSafe(leaderIndex); + final IndexMetaData followerIMD = clusterService.state().metaData().getIndexSafe(followIndex); + + final Settings existingSettings = TransportResumeFollowAction.filter(followerIMD.getSettings()); + final Settings settings = TransportResumeFollowAction.filter(leaderIMD.getSettings()); + if (existingSettings.equals(settings)) { + finalHandler.accept(leaderIMD.getSettingsVersion()); + } else { + final Settings updatedSettings = settings.filter( + s -> existingSettings.get(s) == null || existingSettings.get(s).equals(settings.get(s)) == false + ); + // TODO: we should only close if needed, iterate over the settings and check if any non-dynamic + Runnable handler = () -> finalHandler.accept(leaderIMD.getSettingsVersion()); + closeIndex(followIndex.getName(), updatedSettings, handler, errorHandler); + } + }; + leaderClient.admin().cluster().state(clusterStateRequest, ActionListener.wrap(onResponse, errorHandler)); + } + + private void closeIndex(String followIndex, Settings updatedSettings, Runnable handler, Consumer onFailure) { + CloseIndexRequest closeRequest = new CloseIndexRequest(followIndex); + CheckedConsumer onResponse = response -> { + updateSettings(followIndex, updatedSettings, handler, onFailure); + }; + followerClient.admin().indices().close(closeRequest, ActionListener.wrap(onResponse, onFailure)); + } + + private void updateSettings(String followIndex, Settings updatedSettings, Runnable handler, Consumer onFailure) { + final UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest(followIndex); + updateSettingsRequest.settings(updatedSettings); + CheckedConsumer onResponse = response -> openIndex(followIndex, handler, onFailure); + followerClient.admin().indices().updateSettings(updateSettingsRequest, ActionListener.wrap(onResponse, onFailure)); } - // TODO: this is horribly unreadable, we need a better approach here - private ActionListener getChainedListener( - LongConsumer handler, - Consumer errorHandler, - Index followIndex, - IndexMetaData indexMetaData, - UpdateSettingsRequest updateSettingsRequest) { - return ActionListener.wrap( - response -> followerClient - .admin() - .indices() - .updateSettings( - updateSettingsRequest, - ActionListener.wrap( - us -> followerClient.admin() - .indices() - .open( - new OpenIndexRequest(followIndex.getName()), - ActionListener.wrap( - open -> handler.accept(indexMetaData.getSettingsVersion()), - errorHandler)), - errorHandler)), - errorHandler); + private void openIndex(String followIndex, Runnable handler, Consumer onFailure) { + OpenIndexRequest openIndexRequest = new OpenIndexRequest(followIndex); + CheckedConsumer onResponse = response -> handler.run(); + followerClient.admin().indices().open(openIndexRequest, ActionListener.wrap(onResponse, onFailure)); } @Override @@ -223,8 +200,7 @@ protected void innerSendBulkShardOperationsRequest( final BulkShardOperationsRequest request = new BulkShardOperationsRequest(params.getFollowShardId(), followerHistoryUUID, operations, maxSeqNoOfUpdatesOrDeletes); - followerClient.execute(BulkShardOperationsAction.INSTANCE, request, - ActionListener.wrap(response -> handler.accept(response), errorHandler)); + followerClient.execute(BulkShardOperationsAction.INSTANCE, request, ActionListener.wrap(handler::accept, errorHandler)); } @Override From 485aa479a2e1d5825dafb04fe0320f849c2efc9a Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Thu, 1 Nov 2018 10:32:00 +0100 Subject: [PATCH 07/14] added test for whitelisted settings --- .../xpack/ccr/IndexFollowingIT.java | 97 +++++++++++++++++++ 1 file changed, 97 insertions(+) diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java index f8f9b4d8f6dab..f6eefff33af19 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java @@ -64,6 +64,7 @@ import java.util.Objects; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; import static java.util.Collections.singletonMap; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; @@ -560,6 +561,14 @@ public void testUpdateLeaderIndexSettings() throws Exception { } assertBusy(() -> assertThat(followerClient().prepareSearch("follower").get().getHits().totalHits, equalTo(firstBatchNumDocs))); + // Sanity check that the setting has not been set in follower index: + { + GetSettingsRequest getSettingsRequest = new GetSettingsRequest(); + getSettingsRequest.indices("follower"); + GetSettingsResponse getSettingsResponse = followerClient().admin().indices().getSettings(getSettingsRequest).actionGet(); + assertThat(getSettingsResponse.getSetting("follower", "index.max_ngram_diff"), nullValue()); + } + assertThat(getFollowTaskSettingsVersion("follower"), equalTo(1L)); UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest("leader"); updateSettingsRequest.settings(Settings.builder().put("index.max_ngram_diff", 2)); assertAcked(leaderClient().admin().indices().updateSettings(updateSettingsRequest).actionGet()); @@ -569,10 +578,59 @@ public void testUpdateLeaderIndexSettings() throws Exception { leaderClient().prepareIndex("leader", "doc").setSource("{}", XContentType.JSON).get(); } assertBusy(() -> { + // Check that the setting has been set in follower index: GetSettingsRequest getSettingsRequest = new GetSettingsRequest(); getSettingsRequest.indices("follower"); GetSettingsResponse getSettingsResponse = followerClient().admin().indices().getSettings(getSettingsRequest).actionGet(); assertThat(getSettingsResponse.getSetting("follower", "index.max_ngram_diff"), equalTo("2")); + assertThat(getFollowTaskSettingsVersion("follower"), equalTo(2L)); + + try { + assertThat(followerClient().prepareSearch("follower").get().getHits().totalHits, + equalTo(firstBatchNumDocs + secondBatchNumDocs)); + } catch (Exception e) { + throw new AssertionError("error while searching", e); + } + }); + } + + public void testUpdateWhiteListedSetting() throws Exception { + final String leaderIndexSettings = getIndexSettings(1, 0, + singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true")); + assertAcked(leaderClient().admin().indices().prepareCreate("leader").setSource(leaderIndexSettings, XContentType.JSON)); + ensureLeaderYellow("leader"); + + final PutFollowAction.Request followRequest = putFollow("leader", "follower"); + followerClient().execute(PutFollowAction.INSTANCE, followRequest).get(); + + final long firstBatchNumDocs = randomIntBetween(2, 64); + for (long i = 0; i < firstBatchNumDocs; i++) { + leaderClient().prepareIndex("leader", "doc").setSource("{}", XContentType.JSON).get(); + } + assertBusy(() -> assertThat(followerClient().prepareSearch("follower").get().getHits().totalHits, equalTo(firstBatchNumDocs))); + + // Sanity check that the setting has not been set in follower index: + { + GetSettingsRequest getSettingsRequest = new GetSettingsRequest(); + getSettingsRequest.indices("follower"); + GetSettingsResponse getSettingsResponse = followerClient().admin().indices().getSettings(getSettingsRequest).actionGet(); + assertThat(getSettingsResponse.getSetting("follower", "index.number_of_replicas"), equalTo("0")); + } + assertThat(getFollowTaskSettingsVersion("follower"), equalTo(1L)); + UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest("leader"); + updateSettingsRequest.settings(Settings.builder().put("index.number_of_replicas", 1)); + assertAcked(leaderClient().admin().indices().updateSettings(updateSettingsRequest).actionGet()); + + final int secondBatchNumDocs = randomIntBetween(2, 64); + for (long i = firstBatchNumDocs; i < firstBatchNumDocs + secondBatchNumDocs; i++) { + leaderClient().prepareIndex("leader", "doc").setSource("{}", XContentType.JSON).get(); + } + assertBusy(() -> { + GetSettingsRequest getSettingsRequest = new GetSettingsRequest(); + getSettingsRequest.indices("follower"); + GetSettingsResponse getSettingsResponse = followerClient().admin().indices().getSettings(getSettingsRequest).actionGet(); + assertThat(getSettingsResponse.getSetting("follower", "index.number_of_replicas"), equalTo("0")); + assertThat(getFollowTaskSettingsVersion("follower"), equalTo(2L)); try { assertThat(followerClient().prepareSearch("follower").get().getHits().totalHits, @@ -598,6 +656,8 @@ public void testUpdateLeaderAnalysisSettings() throws Exception { } assertBusy(() -> assertThat(followerClient().prepareSearch("follower").get().getHits().totalHits, equalTo(firstBatchNumDocs))); + assertThat(getFollowTaskSettingsVersion("follower"), equalTo(1L)); + assertThat(getFollowTaskMappingVersion("follower"), equalTo(1L)); CloseIndexRequest closeIndexRequest = new CloseIndexRequest("leader"); assertAcked(leaderClient().admin().indices().close(closeIndexRequest).actionGet()); @@ -625,6 +685,9 @@ public void testUpdateLeaderAnalysisSettings() throws Exception { } assertBusy(() -> { + assertThat(getFollowTaskSettingsVersion("follower"), equalTo(2L)); + assertThat(getFollowTaskMappingVersion("follower"), equalTo(2L)); + GetSettingsRequest getSettingsRequest = new GetSettingsRequest(); getSettingsRequest.indices("follower"); GetSettingsResponse getSettingsResponse = followerClient().admin().indices().getSettings(getSettingsRequest).actionGet(); @@ -648,6 +711,40 @@ public void testUpdateLeaderAnalysisSettings() throws Exception { }); } + private long getFollowTaskSettingsVersion(String followerIndex) { + long settingsVersion = -1L; + for (ShardFollowNodeTaskStatus status : getFollowTaskStatuses(followerIndex)) { + if (settingsVersion == -1L) { + settingsVersion = status.followerSettingsVersion(); + } else { + assert settingsVersion == status.followerSettingsVersion(); + } + } + return settingsVersion; + } + + private long getFollowTaskMappingVersion(String followerIndex) { + long mappingVersion = -1L; + for (ShardFollowNodeTaskStatus status : getFollowTaskStatuses(followerIndex)) { + if (mappingVersion == -1L) { + mappingVersion = status.followerMappingVersion(); + } else { + assert mappingVersion == status.followerMappingVersion(); + } + } + return mappingVersion; + } + + private List getFollowTaskStatuses(String followerIndex) { + FollowStatsAction.StatsRequest request = new StatsRequest(); + request.setIndices(new String[]{followerIndex}); + FollowStatsAction.StatsResponses response = followerClient().execute(FollowStatsAction.INSTANCE, request).actionGet(); + return response.getStatsResponses().stream() + .map(FollowStatsAction.StatsResponse::status) + .filter(status -> status.followerIndex().equals(followerIndex)) + .collect(Collectors.toList()); + } + private CheckedRunnable assertTask(final int numberOfPrimaryShards, final Map numDocsPerShard) { return () -> { final ClusterState clusterState = followerClient().admin().cluster().prepareState().get().getState(); From 91ca75395fc37c0df56a14fb7c81e839859a1685 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Thu, 1 Nov 2018 11:30:32 +0100 Subject: [PATCH 08/14] Only close index if a setting is not dynamic. --- .../java/org/elasticsearch/node/Node.java | 4 +- .../plugins/PersistentTaskPlugin.java | 5 ++- .../persistent/TestPersistentTasksPlugin.java | 5 ++- .../java/org/elasticsearch/xpack/ccr/Ccr.java | 6 ++- .../ccr/action/ShardFollowTasksExecutor.java | 37 ++++++++++++++++--- .../xpack/ccr/IndexFollowingIT.java | 27 ++++++++++++-- .../core/LocalStateCompositeXPackPlugin.java | 7 ++-- .../xpack/ml/MachineLearning.java | 4 +- .../elasticsearch/xpack/rollup/Rollup.java | 4 +- 9 files changed, 81 insertions(+), 18 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/node/Node.java b/server/src/main/java/org/elasticsearch/node/Node.java index 6480ef3ffaebc..24ec97a012628 100644 --- a/server/src/main/java/org/elasticsearch/node/Node.java +++ b/server/src/main/java/org/elasticsearch/node/Node.java @@ -71,6 +71,7 @@ import org.elasticsearch.common.network.NetworkModule; import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.common.settings.IndexScopedSettings; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Setting.Property; import org.elasticsearch.common.settings.SettingUpgrader; @@ -488,9 +489,10 @@ protected Node( threadPool, scriptModule.getScriptService(), bigArrays, searchModule.getFetchPhase(), responseCollectorService); + final IndexScopedSettings indexScopedSettings = settingsModule.getIndexScopedSettings(); final List> tasksExecutors = pluginsService .filterPlugins(PersistentTaskPlugin.class).stream() - .map(p -> p.getPersistentTasksExecutor(clusterService, threadPool, client)) + .map(p -> p.getPersistentTasksExecutor(clusterService, threadPool, client, indexScopedSettings)) .flatMap(List::stream) .collect(toList()); diff --git a/server/src/main/java/org/elasticsearch/plugins/PersistentTaskPlugin.java b/server/src/main/java/org/elasticsearch/plugins/PersistentTaskPlugin.java index 5e3319a2bc5cd..c96b5aa1f5c03 100644 --- a/server/src/main/java/org/elasticsearch/plugins/PersistentTaskPlugin.java +++ b/server/src/main/java/org/elasticsearch/plugins/PersistentTaskPlugin.java @@ -20,6 +20,7 @@ import org.elasticsearch.client.Client; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.settings.IndexScopedSettings; import org.elasticsearch.persistent.PersistentTasksExecutor; import org.elasticsearch.threadpool.ThreadPool; @@ -35,7 +36,9 @@ public interface PersistentTaskPlugin { * Returns additional persistent tasks executors added by this plugin. */ default List> getPersistentTasksExecutor(ClusterService clusterService, - ThreadPool threadPool, Client client) { + ThreadPool threadPool, + Client client, + IndexScopedSettings indexScopedSettings) { return Collections.emptyList(); } diff --git a/server/src/test/java/org/elasticsearch/persistent/TestPersistentTasksPlugin.java b/server/src/test/java/org/elasticsearch/persistent/TestPersistentTasksPlugin.java index 2e99b83378b54..f7bbaf2d562a9 100644 --- a/server/src/test/java/org/elasticsearch/persistent/TestPersistentTasksPlugin.java +++ b/server/src/test/java/org/elasticsearch/persistent/TestPersistentTasksPlugin.java @@ -44,6 +44,7 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.settings.IndexScopedSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.ConstructingObjectParser; import org.elasticsearch.common.xcontent.NamedXContentRegistry; @@ -90,7 +91,9 @@ public class TestPersistentTasksPlugin extends Plugin implements ActionPlugin, P @Override public List> getPersistentTasksExecutor(ClusterService clusterService, - ThreadPool threadPool, Client client) { + ThreadPool threadPool, + Client client, + IndexScopedSettings indexScopedSettings) { return Collections.singletonList(new TestPersistentTasksExecutor(clusterService)); } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java index 085d58bce83da..b3428ed34a455 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java @@ -149,8 +149,10 @@ public Collection createComponents( @Override public List> getPersistentTasksExecutor(ClusterService clusterService, - ThreadPool threadPool, Client client) { - return Collections.singletonList(new ShardFollowTasksExecutor(client, threadPool, clusterService)); + ThreadPool threadPool, + Client client, + IndexScopedSettings indexScopedSettings) { + return Collections.singletonList(new ShardFollowTasksExecutor(client, threadPool, clusterService, indexScopedSettings)); } public List> getActions() { diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java index 08d7f38f611fe..6cef9689409b4 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java @@ -25,6 +25,7 @@ import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.CheckedConsumer; +import org.elasticsearch.common.settings.IndexScopedSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; @@ -64,12 +65,17 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor onResponse = clusterStateResponse -> { + CheckedConsumer onResponse = clusterStateResponse -> { final IndexMetaData leaderIMD = clusterStateResponse.getState().metaData().getIndexSafe(leaderIndex); final IndexMetaData followerIMD = clusterService.state().metaData().getIndexSafe(followIndex); final Settings existingSettings = TransportResumeFollowAction.filter(followerIMD.getSettings()); final Settings settings = TransportResumeFollowAction.filter(leaderIMD.getSettings()); if (existingSettings.equals(settings)) { + // If no settings have been changed then just propagate settings version to shard follow node task: finalHandler.accept(leaderIMD.getSettingsVersion()); } else { + // Figure out which settings have been updated: final Settings updatedSettings = settings.filter( s -> existingSettings.get(s) == null || existingSettings.get(s).equals(settings.get(s)) == false ); - // TODO: we should only close if needed, iterate over the settings and check if any non-dynamic - Runnable handler = () -> finalHandler.accept(leaderIMD.getSettingsVersion()); - closeIndex(followIndex.getName(), updatedSettings, handler, errorHandler); + + // Figure out whether the updated settings are all dynamic settings: + boolean onlyDynamicSettings = true; + for (String key : updatedSettings.keySet()) { + if (indexScopedSettings.isDynamicSetting(key) == false) { + onlyDynamicSettings = false; + break; + } + } + + if (onlyDynamicSettings) { + // If only dynamic settings have been updated then just update these settings in follower index: + final UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest(followIndex.getName()); + updateSettingsRequest.settings(updatedSettings); + followerClient.admin().indices().updateSettings(updateSettingsRequest, + ActionListener.wrap(response -> finalHandler.accept(leaderIMD.getSettingsVersion()), errorHandler)); + } else { + // If one or more setting are not dynamic then close follow index, update leader settings and + // then open leader index: + Runnable handler = () -> finalHandler.accept(leaderIMD.getSettingsVersion()); + closeIndex(followIndex.getName(), updatedSettings, handler, errorHandler); + } } }; leaderClient.admin().cluster().state(clusterStateRequest, ActionListener.wrap(onResponse, errorHandler)); diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java index f6eefff33af19..f53517ad7c317 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java @@ -28,6 +28,7 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MappingMetaData; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.CheckedRunnable; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.settings.Settings; @@ -64,6 +65,7 @@ import java.util.Objects; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.BooleanSupplier; import java.util.stream.Collectors; import static java.util.Collections.singletonMap; @@ -546,7 +548,7 @@ public void testUnknownClusterAlias() throws Exception { assertThat(e.getMessage(), equalTo("unknown cluster alias [another_cluster]")); } - public void testUpdateLeaderIndexSettings() throws Exception { + public void testUpdateDynamicLeaderIndexSettings() throws Exception { final String leaderIndexSettings = getIndexSettings(1, 0, singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true")); assertAcked(leaderClient().admin().indices().prepareCreate("leader").setSource(leaderIndexSettings, XContentType.JSON)); @@ -554,6 +556,7 @@ public void testUpdateLeaderIndexSettings() throws Exception { final PutFollowAction.Request followRequest = putFollow("leader", "follower"); followerClient().execute(PutFollowAction.INSTANCE, followRequest).get(); + BooleanSupplier hasFollowIndexBeenClosedChecker = hasFollowIndexBeenClosed("follower"); final long firstBatchNumDocs = randomIntBetween(2, 64); for (long i = 0; i < firstBatchNumDocs; i++) { @@ -592,9 +595,10 @@ public void testUpdateLeaderIndexSettings() throws Exception { throw new AssertionError("error while searching", e); } }); + assertThat(hasFollowIndexBeenClosedChecker.getAsBoolean(), is(false)); } - public void testUpdateWhiteListedSetting() throws Exception { + public void testUpdateWhiteListedLeaderIndexSettings() throws Exception { final String leaderIndexSettings = getIndexSettings(1, 0, singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true")); assertAcked(leaderClient().admin().indices().prepareCreate("leader").setSource(leaderIndexSettings, XContentType.JSON)); @@ -602,6 +606,7 @@ public void testUpdateWhiteListedSetting() throws Exception { final PutFollowAction.Request followRequest = putFollow("leader", "follower"); followerClient().execute(PutFollowAction.INSTANCE, followRequest).get(); + BooleanSupplier hasFollowIndexBeenClosedChecker = hasFollowIndexBeenClosed("follower"); final long firstBatchNumDocs = randomIntBetween(2, 64); for (long i = 0; i < firstBatchNumDocs; i++) { @@ -639,9 +644,10 @@ public void testUpdateWhiteListedSetting() throws Exception { throw new AssertionError("error while searching", e); } }); + assertThat(hasFollowIndexBeenClosedChecker.getAsBoolean(), is(false)); } - public void testUpdateLeaderAnalysisSettings() throws Exception { + public void testUpdateAnalysisLeaderIndexSettings() throws Exception { final String leaderIndexSettings = getIndexSettings(1, 0, singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true")); assertAcked(leaderClient().admin().indices().prepareCreate("leader").setSource(leaderIndexSettings, XContentType.JSON)); @@ -649,6 +655,7 @@ public void testUpdateLeaderAnalysisSettings() throws Exception { final PutFollowAction.Request followRequest = putFollow("leader", "follower"); followerClient().execute(PutFollowAction.INSTANCE, followRequest).get(); + BooleanSupplier hasFollowIndexBeenClosedChecker = hasFollowIndexBeenClosed("follower"); final long firstBatchNumDocs = randomIntBetween(2, 64); for (long i = 0; i < firstBatchNumDocs; i++) { @@ -709,6 +716,7 @@ public void testUpdateLeaderAnalysisSettings() throws Exception { throw new AssertionError("error while searching", e); } }); + assertThat(hasFollowIndexBeenClosedChecker.getAsBoolean(), is(true)); } private long getFollowTaskSettingsVersion(String followerIndex) { @@ -745,6 +753,19 @@ private List getFollowTaskStatuses(String followerInd .collect(Collectors.toList()); } + private BooleanSupplier hasFollowIndexBeenClosed(String indexName) { + String electedMasterNode = getFollowerCluster().getMasterName(); + ClusterService clusterService = getFollowerCluster().getInstance(ClusterService.class, electedMasterNode); + AtomicBoolean closed = new AtomicBoolean(false); + clusterService.addListener(event -> { + IndexMetaData indexMetaData = event.state().metaData().index(indexName); + if (indexMetaData.getState() == IndexMetaData.State.CLOSE) { + closed.set(true); + } + }); + return closed::get; + } + private CheckedRunnable assertTask(final int numberOfPrimaryShards, final Map numDocsPerShard) { return () -> { final ClusterState clusterState = followerClient().admin().cluster().prepareState().get().getState(); diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/LocalStateCompositeXPackPlugin.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/LocalStateCompositeXPackPlugin.java index f0db64d3271c8..eb184d44a8b5c 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/LocalStateCompositeXPackPlugin.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/LocalStateCompositeXPackPlugin.java @@ -74,7 +74,6 @@ import java.util.Map; import java.util.Objects; import java.util.Optional; -import java.util.Optional; import java.util.function.BiConsumer; import java.util.function.Function; import java.util.function.Predicate; @@ -384,9 +383,11 @@ public BiConsumer getJoinValidator() { @Override public List> getPersistentTasksExecutor(ClusterService clusterService, - ThreadPool threadPool, Client client) { + ThreadPool threadPool, + Client client, + IndexScopedSettings indexScopedSettings) { return filterPlugins(PersistentTaskPlugin.class).stream() - .map(p -> p.getPersistentTasksExecutor(clusterService, threadPool, client)) + .map(p -> p.getPersistentTasksExecutor(clusterService, threadPool, client, indexScopedSettings)) .flatMap(List::stream) .collect(toList()); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java index 086754054b494..31f550fd45332 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java @@ -431,7 +431,9 @@ public Collection createComponents(Client client, ClusterService cluster } public List> getPersistentTasksExecutor(ClusterService clusterService, - ThreadPool threadPool, Client client) { + ThreadPool threadPool, + Client client, + IndexScopedSettings indexScopedSettings) { if (enabled == false || transportClientMode) { return emptyList(); } diff --git a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/Rollup.java b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/Rollup.java index 62bb18ee03331..ca458e61606e2 100644 --- a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/Rollup.java +++ b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/Rollup.java @@ -189,7 +189,9 @@ public List> getExecutorBuilders(Settings settings) { @Override public List> getPersistentTasksExecutor(ClusterService clusterService, - ThreadPool threadPool, Client client) { + ThreadPool threadPool, + Client client, + IndexScopedSettings indexScopedSettings) { if (enabled == false || transportClientMode ) { return emptyList(); } From acbfd12067e03c90bae74fe49ea472ff76dda783 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Thu, 1 Nov 2018 13:25:57 +0100 Subject: [PATCH 09/14] Fixed docs --- docs/reference/ccr/apis/follow/get-follow-stats.asciidoc | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/docs/reference/ccr/apis/follow/get-follow-stats.asciidoc b/docs/reference/ccr/apis/follow/get-follow-stats.asciidoc index 0efa156b95a49..2cbc0a8ccb3bd 100644 --- a/docs/reference/ccr/apis/follow/get-follow-stats.asciidoc +++ b/docs/reference/ccr/apis/follow/get-follow-stats.asciidoc @@ -111,6 +111,9 @@ The `shards` array consists of objects containing the following fields: `indices[].shards[].follower_mapping_version`:: (long) the mapping version the follower is synced up to +`indices[].shards[].follower_settings_version`:: + (long) the index settings version the follower is synced up to + `indices[].shards[].total_read_time_millis`:: (long) the total time reads were outstanding, measured from the time a read was sent to the leader to the time a reply was returned to the follower @@ -206,6 +209,7 @@ The API returns the following results: "outstanding_write_requests" : 2, "write_buffer_operation_count" : 64, "follower_mapping_version" : 4, + "follower_settings_version" : 2, "total_read_time_millis" : 32768, "total_read_remote_exec_time_millis" : 16384, "successful_read_requests" : 32, @@ -234,6 +238,7 @@ The API returns the following results: // TESTRESPONSE[s/"outstanding_write_requests" : 2/"outstanding_write_requests" : $body.indices.0.shards.0.outstanding_write_requests/] // TESTRESPONSE[s/"write_buffer_operation_count" : 64/"write_buffer_operation_count" : $body.indices.0.shards.0.write_buffer_operation_count/] // TESTRESPONSE[s/"follower_mapping_version" : 4/"follower_mapping_version" : $body.indices.0.shards.0.follower_mapping_version/] +// TESTRESPONSE[s/"follower_settings_version" : 2/"follower_settings_version" : $body.indices.0.shards.0.follower_settings_version/] // TESTRESPONSE[s/"total_read_time_millis" : 32768/"total_read_time_millis" : $body.indices.0.shards.0.total_read_time_millis/] // TESTRESPONSE[s/"total_read_remote_exec_time_millis" : 16384/"total_read_remote_exec_time_millis" : $body.indices.0.shards.0.total_read_remote_exec_time_millis/] // TESTRESPONSE[s/"successful_read_requests" : 32/"successful_read_requests" : $body.indices.0.shards.0.successful_read_requests/] From e66bcae1a594ab899bb02919e1b08bc46b9215f2 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Tue, 6 Nov 2018 14:59:19 +0100 Subject: [PATCH 10/14] changed `PersistentTaskPlugin#getPersistentTasksExecutor()`` to use `SettingsModule` instead of `IndexScopedSettings` --- server/src/main/java/org/elasticsearch/node/Node.java | 2 +- .../java/org/elasticsearch/plugins/PersistentTaskPlugin.java | 4 ++-- .../elasticsearch/persistent/TestPersistentTasksPlugin.java | 4 ++-- .../ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java | 4 +++- .../xpack/core/LocalStateCompositeXPackPlugin.java | 5 +++-- .../java/org/elasticsearch/xpack/ml/MachineLearning.java | 3 ++- .../src/main/java/org/elasticsearch/xpack/rollup/Rollup.java | 3 ++- 7 files changed, 15 insertions(+), 10 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/node/Node.java b/server/src/main/java/org/elasticsearch/node/Node.java index 24ec97a012628..dc0f421afdd64 100644 --- a/server/src/main/java/org/elasticsearch/node/Node.java +++ b/server/src/main/java/org/elasticsearch/node/Node.java @@ -492,7 +492,7 @@ protected Node( final IndexScopedSettings indexScopedSettings = settingsModule.getIndexScopedSettings(); final List> tasksExecutors = pluginsService .filterPlugins(PersistentTaskPlugin.class).stream() - .map(p -> p.getPersistentTasksExecutor(clusterService, threadPool, client, indexScopedSettings)) + .map(p -> p.getPersistentTasksExecutor(clusterService, threadPool, client, settingsModule)) .flatMap(List::stream) .collect(toList()); diff --git a/server/src/main/java/org/elasticsearch/plugins/PersistentTaskPlugin.java b/server/src/main/java/org/elasticsearch/plugins/PersistentTaskPlugin.java index c96b5aa1f5c03..7c383f752071a 100644 --- a/server/src/main/java/org/elasticsearch/plugins/PersistentTaskPlugin.java +++ b/server/src/main/java/org/elasticsearch/plugins/PersistentTaskPlugin.java @@ -20,7 +20,7 @@ import org.elasticsearch.client.Client; import org.elasticsearch.cluster.service.ClusterService; -import org.elasticsearch.common.settings.IndexScopedSettings; +import org.elasticsearch.common.settings.SettingsModule; import org.elasticsearch.persistent.PersistentTasksExecutor; import org.elasticsearch.threadpool.ThreadPool; @@ -38,7 +38,7 @@ public interface PersistentTaskPlugin { default List> getPersistentTasksExecutor(ClusterService clusterService, ThreadPool threadPool, Client client, - IndexScopedSettings indexScopedSettings) { + SettingsModule settingsModule) { return Collections.emptyList(); } diff --git a/server/src/test/java/org/elasticsearch/persistent/TestPersistentTasksPlugin.java b/server/src/test/java/org/elasticsearch/persistent/TestPersistentTasksPlugin.java index c03ca5a20d61e..085a1cc264dd2 100644 --- a/server/src/test/java/org/elasticsearch/persistent/TestPersistentTasksPlugin.java +++ b/server/src/test/java/org/elasticsearch/persistent/TestPersistentTasksPlugin.java @@ -44,7 +44,7 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; -import org.elasticsearch.common.settings.IndexScopedSettings; +import org.elasticsearch.common.settings.SettingsModule; import org.elasticsearch.common.xcontent.ConstructingObjectParser; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.common.xcontent.XContentBuilder; @@ -92,7 +92,7 @@ public class TestPersistentTasksPlugin extends Plugin implements ActionPlugin, P public List> getPersistentTasksExecutor(ClusterService clusterService, ThreadPool threadPool, Client client, - IndexScopedSettings indexScopedSettings) { + SettingsModule settingsModule) { return Collections.singletonList(new TestPersistentTasksExecutor(clusterService)); } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java index b3428ed34a455..ec196d637e1a9 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java @@ -19,6 +19,7 @@ import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.SettingsFilter; +import org.elasticsearch.common.settings.SettingsModule; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.env.Environment; import org.elasticsearch.env.NodeEnvironment; @@ -151,7 +152,8 @@ public Collection createComponents( public List> getPersistentTasksExecutor(ClusterService clusterService, ThreadPool threadPool, Client client, - IndexScopedSettings indexScopedSettings) { + SettingsModule settingsModule) { + IndexScopedSettings indexScopedSettings = settingsModule.getIndexScopedSettings(); return Collections.singletonList(new ShardFollowTasksExecutor(client, threadPool, clusterService, indexScopedSettings)); } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/LocalStateCompositeXPackPlugin.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/LocalStateCompositeXPackPlugin.java index eb184d44a8b5c..92427fb92c439 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/LocalStateCompositeXPackPlugin.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/LocalStateCompositeXPackPlugin.java @@ -24,6 +24,7 @@ import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.SettingsFilter; +import org.elasticsearch.common.settings.SettingsModule; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.PageCacheRecycler; import org.elasticsearch.common.util.concurrent.ThreadContext; @@ -385,9 +386,9 @@ public BiConsumer getJoinValidator() { public List> getPersistentTasksExecutor(ClusterService clusterService, ThreadPool threadPool, Client client, - IndexScopedSettings indexScopedSettings) { + SettingsModule settingsModule) { return filterPlugins(PersistentTaskPlugin.class).stream() - .map(p -> p.getPersistentTasksExecutor(clusterService, threadPool, client, indexScopedSettings)) + .map(p -> p.getPersistentTasksExecutor(clusterService, threadPool, client, settingsModule)) .flatMap(List::stream) .collect(toList()); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java index 31f550fd45332..233e697407ff6 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java @@ -29,6 +29,7 @@ import org.elasticsearch.common.settings.Setting.Property; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.SettingsFilter; +import org.elasticsearch.common.settings.SettingsModule; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.common.xcontent.XContentBuilder; @@ -433,7 +434,7 @@ public Collection createComponents(Client client, ClusterService cluster public List> getPersistentTasksExecutor(ClusterService clusterService, ThreadPool threadPool, Client client, - IndexScopedSettings indexScopedSettings) { + SettingsModule settingsModule) { if (enabled == false || transportClientMode) { return emptyList(); } diff --git a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/Rollup.java b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/Rollup.java index ca458e61606e2..f3c6f6df85740 100644 --- a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/Rollup.java +++ b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/Rollup.java @@ -18,6 +18,7 @@ import org.elasticsearch.common.settings.IndexScopedSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.SettingsFilter; +import org.elasticsearch.common.settings.SettingsModule; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.env.Environment; import org.elasticsearch.env.NodeEnvironment; @@ -191,7 +192,7 @@ public List> getExecutorBuilders(Settings settings) { public List> getPersistentTasksExecutor(ClusterService clusterService, ThreadPool threadPool, Client client, - IndexScopedSettings indexScopedSettings) { + SettingsModule settingsModule) { if (enabled == false || transportClientMode ) { return emptyList(); } From e3a41bc403920e48ab9ab1a112e6df2f988049bd Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Tue, 6 Nov 2018 15:01:39 +0100 Subject: [PATCH 11/14] iter --- .../ccr/action/ShardFollowTasksExecutor.java | 31 ++++++++++--------- 1 file changed, 16 insertions(+), 15 deletions(-) diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java index 6cef9689409b4..79de49c7a28f2 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java @@ -170,16 +170,9 @@ protected void innerUpdateSettings(final LongConsumer finalHandler, final Consum s -> existingSettings.get(s) == null || existingSettings.get(s).equals(settings.get(s)) == false ); - // Figure out whether the updated settings are all dynamic settings: - boolean onlyDynamicSettings = true; - for (String key : updatedSettings.keySet()) { - if (indexScopedSettings.isDynamicSetting(key) == false) { - onlyDynamicSettings = false; - break; - } - } - - if (onlyDynamicSettings) { + // Figure out whether the updated settings are all dynamic settings and + // if so just update the follower index's settings: + if (updatedSettings.keySet().stream().allMatch(indexScopedSettings::isDynamicSetting)) { // If only dynamic settings have been updated then just update these settings in follower index: final UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest(followIndex.getName()); updateSettingsRequest.settings(updatedSettings); @@ -189,29 +182,37 @@ protected void innerUpdateSettings(final LongConsumer finalHandler, final Consum // If one or more setting are not dynamic then close follow index, update leader settings and // then open leader index: Runnable handler = () -> finalHandler.accept(leaderIMD.getSettingsVersion()); - closeIndex(followIndex.getName(), updatedSettings, handler, errorHandler); + closeIndexUpdateSettingsAndOpenIndex(followIndex.getName(), updatedSettings, handler, errorHandler); } } }; leaderClient.admin().cluster().state(clusterStateRequest, ActionListener.wrap(onResponse, errorHandler)); } - private void closeIndex(String followIndex, Settings updatedSettings, Runnable handler, Consumer onFailure) { + private void closeIndexUpdateSettingsAndOpenIndex(String followIndex, + Settings updatedSettings, + Runnable handler, + Consumer onFailure) { CloseIndexRequest closeRequest = new CloseIndexRequest(followIndex); CheckedConsumer onResponse = response -> { - updateSettings(followIndex, updatedSettings, handler, onFailure); + updateSettingsAndOpenIndex(followIndex, updatedSettings, handler, onFailure); }; followerClient.admin().indices().close(closeRequest, ActionListener.wrap(onResponse, onFailure)); } - private void updateSettings(String followIndex, Settings updatedSettings, Runnable handler, Consumer onFailure) { + private void updateSettingsAndOpenIndex(String followIndex, + Settings updatedSettings, + Runnable handler, + Consumer onFailure) { final UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest(followIndex); updateSettingsRequest.settings(updatedSettings); CheckedConsumer onResponse = response -> openIndex(followIndex, handler, onFailure); followerClient.admin().indices().updateSettings(updateSettingsRequest, ActionListener.wrap(onResponse, onFailure)); } - private void openIndex(String followIndex, Runnable handler, Consumer onFailure) { + private void openIndex(String followIndex, + Runnable handler, + Consumer onFailure) { OpenIndexRequest openIndexRequest = new OpenIndexRequest(followIndex); CheckedConsumer onResponse = response -> handler.run(); followerClient.admin().indices().open(openIndexRequest, ActionListener.wrap(onResponse, onFailure)); From 64a95928195c2936532fd6ad983cefa1d372f2b7 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Tue, 6 Nov 2018 15:09:52 +0100 Subject: [PATCH 12/14] removed unused variable --- server/src/main/java/org/elasticsearch/node/Node.java | 1 - 1 file changed, 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/node/Node.java b/server/src/main/java/org/elasticsearch/node/Node.java index dc0f421afdd64..2b7a78eef5411 100644 --- a/server/src/main/java/org/elasticsearch/node/Node.java +++ b/server/src/main/java/org/elasticsearch/node/Node.java @@ -489,7 +489,6 @@ protected Node( threadPool, scriptModule.getScriptService(), bigArrays, searchModule.getFetchPhase(), responseCollectorService); - final IndexScopedSettings indexScopedSettings = settingsModule.getIndexScopedSettings(); final List> tasksExecutors = pluginsService .filterPlugins(PersistentTaskPlugin.class).stream() .map(p -> p.getPersistentTasksExecutor(clusterService, threadPool, client, settingsModule)) From 1c2e8da97aa3aed112cc688d8d4332c567d27f58 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Tue, 6 Nov 2018 15:20:23 +0100 Subject: [PATCH 13/14] fixed checkstyle violation --- server/src/main/java/org/elasticsearch/node/Node.java | 1 - 1 file changed, 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/node/Node.java b/server/src/main/java/org/elasticsearch/node/Node.java index 2b7a78eef5411..28d559b15df6d 100644 --- a/server/src/main/java/org/elasticsearch/node/Node.java +++ b/server/src/main/java/org/elasticsearch/node/Node.java @@ -71,7 +71,6 @@ import org.elasticsearch.common.network.NetworkModule; import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.settings.ClusterSettings; -import org.elasticsearch.common.settings.IndexScopedSettings; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Setting.Property; import org.elasticsearch.common.settings.SettingUpgrader; From 4eb9807c4e877a8102994e8f4807287b6dfa37c7 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Wed, 7 Nov 2018 09:47:12 +0100 Subject: [PATCH 14/14] renamed test --- .../java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java index 5c00d9117bc3c..cd6163dc948c9 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java @@ -610,7 +610,10 @@ public void testUpdateDynamicLeaderIndexSettings() throws Exception { assertThat(hasFollowIndexBeenClosedChecker.getAsBoolean(), is(false)); } - public void testUpdateWhiteListedLeaderIndexSettings() throws Exception { + public void testLeaderIndexSettingNotPercolatedToFollower() throws Exception { + // Sets an index setting on leader index that is excluded from being replicated to the follower index and + // expects that this setting is not replicated to the follower index, but does expect that the settings version + // is incremented. final String leaderIndexSettings = getIndexSettings(1, 0, singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true")); assertAcked(leaderClient().admin().indices().prepareCreate("leader").setSource(leaderIndexSettings, XContentType.JSON));