diff --git a/docs/reference/ccr/apis/follow/get-follow-stats.asciidoc b/docs/reference/ccr/apis/follow/get-follow-stats.asciidoc index 0efa156b95a49..2cbc0a8ccb3bd 100644 --- a/docs/reference/ccr/apis/follow/get-follow-stats.asciidoc +++ b/docs/reference/ccr/apis/follow/get-follow-stats.asciidoc @@ -111,6 +111,9 @@ The `shards` array consists of objects containing the following fields: `indices[].shards[].follower_mapping_version`:: (long) the mapping version the follower is synced up to +`indices[].shards[].follower_settings_version`:: + (long) the index settings version the follower is synced up to + `indices[].shards[].total_read_time_millis`:: (long) the total time reads were outstanding, measured from the time a read was sent to the leader to the time a reply was returned to the follower @@ -206,6 +209,7 @@ The API returns the following results: "outstanding_write_requests" : 2, "write_buffer_operation_count" : 64, "follower_mapping_version" : 4, + "follower_settings_version" : 2, "total_read_time_millis" : 32768, "total_read_remote_exec_time_millis" : 16384, "successful_read_requests" : 32, @@ -234,6 +238,7 @@ The API returns the following results: // TESTRESPONSE[s/"outstanding_write_requests" : 2/"outstanding_write_requests" : $body.indices.0.shards.0.outstanding_write_requests/] // TESTRESPONSE[s/"write_buffer_operation_count" : 64/"write_buffer_operation_count" : $body.indices.0.shards.0.write_buffer_operation_count/] // TESTRESPONSE[s/"follower_mapping_version" : 4/"follower_mapping_version" : $body.indices.0.shards.0.follower_mapping_version/] +// TESTRESPONSE[s/"follower_settings_version" : 2/"follower_settings_version" : $body.indices.0.shards.0.follower_settings_version/] // TESTRESPONSE[s/"total_read_time_millis" : 32768/"total_read_time_millis" : $body.indices.0.shards.0.total_read_time_millis/] // TESTRESPONSE[s/"total_read_remote_exec_time_millis" : 16384/"total_read_remote_exec_time_millis" : $body.indices.0.shards.0.total_read_remote_exec_time_millis/] // TESTRESPONSE[s/"successful_read_requests" : 32/"successful_read_requests" : $body.indices.0.shards.0.successful_read_requests/] diff --git a/server/src/main/java/org/elasticsearch/node/Node.java b/server/src/main/java/org/elasticsearch/node/Node.java index 3546e770c99df..f3433dfa1ba1a 100644 --- a/server/src/main/java/org/elasticsearch/node/Node.java +++ b/server/src/main/java/org/elasticsearch/node/Node.java @@ -493,7 +493,7 @@ protected Node( final List> tasksExecutors = pluginsService .filterPlugins(PersistentTaskPlugin.class).stream() - .map(p -> p.getPersistentTasksExecutor(clusterService, threadPool, client)) + .map(p -> p.getPersistentTasksExecutor(clusterService, threadPool, client, settingsModule)) .flatMap(List::stream) .collect(toList()); diff --git a/server/src/main/java/org/elasticsearch/plugins/PersistentTaskPlugin.java b/server/src/main/java/org/elasticsearch/plugins/PersistentTaskPlugin.java index 5e3319a2bc5cd..7c383f752071a 100644 --- a/server/src/main/java/org/elasticsearch/plugins/PersistentTaskPlugin.java +++ b/server/src/main/java/org/elasticsearch/plugins/PersistentTaskPlugin.java @@ -20,6 +20,7 @@ import org.elasticsearch.client.Client; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.settings.SettingsModule; import org.elasticsearch.persistent.PersistentTasksExecutor; import org.elasticsearch.threadpool.ThreadPool; @@ -35,7 +36,9 @@ public interface PersistentTaskPlugin { * Returns additional persistent tasks executors added by this plugin. */ default List> getPersistentTasksExecutor(ClusterService clusterService, - ThreadPool threadPool, Client client) { + ThreadPool threadPool, + Client client, + SettingsModule settingsModule) { return Collections.emptyList(); } diff --git a/server/src/test/java/org/elasticsearch/persistent/TestPersistentTasksPlugin.java b/server/src/test/java/org/elasticsearch/persistent/TestPersistentTasksPlugin.java index 7722a8ad66527..085a1cc264dd2 100644 --- a/server/src/test/java/org/elasticsearch/persistent/TestPersistentTasksPlugin.java +++ b/server/src/test/java/org/elasticsearch/persistent/TestPersistentTasksPlugin.java @@ -44,6 +44,7 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.settings.SettingsModule; import org.elasticsearch.common.xcontent.ConstructingObjectParser; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.common.xcontent.XContentBuilder; @@ -89,7 +90,9 @@ public class TestPersistentTasksPlugin extends Plugin implements ActionPlugin, P @Override public List> getPersistentTasksExecutor(ClusterService clusterService, - ThreadPool threadPool, Client client) { + ThreadPool threadPool, + Client client, + SettingsModule settingsModule) { return Collections.singletonList(new TestPersistentTasksExecutor(clusterService)); } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java index 085d58bce83da..ec196d637e1a9 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java @@ -19,6 +19,7 @@ import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.SettingsFilter; +import org.elasticsearch.common.settings.SettingsModule; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.env.Environment; import org.elasticsearch.env.NodeEnvironment; @@ -149,8 +150,11 @@ public Collection createComponents( @Override public List> getPersistentTasksExecutor(ClusterService clusterService, - ThreadPool threadPool, Client client) { - return Collections.singletonList(new ShardFollowTasksExecutor(client, threadPool, clusterService)); + ThreadPool threadPool, + Client client, + SettingsModule settingsModule) { + IndexScopedSettings indexScopedSettings = settingsModule.getIndexScopedSettings(); + return Collections.singletonList(new ShardFollowTasksExecutor(client, threadPool, clusterService, indexScopedSettings)); } public List> getActions() { diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java index 8dacb7c745444..9274708f22623 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java @@ -16,6 +16,7 @@ import org.elasticsearch.action.support.single.shard.SingleShardRequest; import org.elasticsearch.action.support.single.shard.TransportSingleShardAction; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.routing.ShardsIterator; import org.elasticsearch.cluster.service.ClusterService; @@ -205,6 +206,12 @@ public long getMappingVersion() { return mappingVersion; } + private long settingsVersion; + + public long getSettingsVersion() { + return settingsVersion; + } + private long globalCheckpoint; public long getGlobalCheckpoint() { @@ -240,6 +247,7 @@ public long getTookInMillis() { Response( final long mappingVersion, + final long settingsVersion, final long globalCheckpoint, final long maxSeqNo, final long maxSeqNoOfUpdatesOrDeletes, @@ -247,6 +255,7 @@ public long getTookInMillis() { final long tookInMillis) { this.mappingVersion = mappingVersion; + this.settingsVersion = settingsVersion; this.globalCheckpoint = globalCheckpoint; this.maxSeqNo = maxSeqNo; this.maxSeqNoOfUpdatesOrDeletes = maxSeqNoOfUpdatesOrDeletes; @@ -258,6 +267,7 @@ public long getTookInMillis() { public void readFrom(final StreamInput in) throws IOException { super.readFrom(in); mappingVersion = in.readVLong(); + settingsVersion = in.readVLong(); globalCheckpoint = in.readZLong(); maxSeqNo = in.readZLong(); maxSeqNoOfUpdatesOrDeletes = in.readZLong(); @@ -269,6 +279,7 @@ public void readFrom(final StreamInput in) throws IOException { public void writeTo(final StreamOutput out) throws IOException { super.writeTo(out); out.writeVLong(mappingVersion); + out.writeVLong(settingsVersion); out.writeZLong(globalCheckpoint); out.writeZLong(maxSeqNo); out.writeZLong(maxSeqNoOfUpdatesOrDeletes); @@ -282,6 +293,7 @@ public boolean equals(final Object o) { if (o == null || getClass() != o.getClass()) return false; final Response that = (Response) o; return mappingVersion == that.mappingVersion && + settingsVersion == that.settingsVersion && globalCheckpoint == that.globalCheckpoint && maxSeqNo == that.maxSeqNo && maxSeqNoOfUpdatesOrDeletes == that.maxSeqNoOfUpdatesOrDeletes && @@ -291,8 +303,14 @@ public boolean equals(final Object o) { @Override public int hashCode() { - return Objects.hash(mappingVersion, globalCheckpoint, maxSeqNo, maxSeqNoOfUpdatesOrDeletes, - Arrays.hashCode(operations), tookInMillis); + return Objects.hash( + mappingVersion, + settingsVersion, + globalCheckpoint, + maxSeqNo, + maxSeqNoOfUpdatesOrDeletes, + Arrays.hashCode(operations), + tookInMillis); } } @@ -317,7 +335,9 @@ protected Response shardOperation(Request request, ShardId shardId) throws IOExc final IndexService indexService = indicesService.indexServiceSafe(request.getShard().getIndex()); final IndexShard indexShard = indexService.getShard(request.getShard().id()); final SeqNoStats seqNoStats = indexShard.seqNoStats(); - final long mappingVersion = clusterService.state().metaData().index(shardId.getIndex()).getMappingVersion(); + final IndexMetaData indexMetaData = clusterService.state().metaData().index(shardId.getIndex()); + final long mappingVersion = indexMetaData.getMappingVersion(); + final long settingsVersion = indexMetaData.getSettingsVersion(); final Translog.Operation[] operations = getOperations( indexShard, @@ -328,7 +348,13 @@ protected Response shardOperation(Request request, ShardId shardId) throws IOExc request.getMaxBatchSize()); // must capture after after snapshotting operations to ensure this MUS is at least the highest MUS of any of these operations. final long maxSeqNoOfUpdatesOrDeletes = indexShard.getMaxSeqNoOfUpdatesOrDeletes(); - return getResponse(mappingVersion, seqNoStats, maxSeqNoOfUpdatesOrDeletes, operations, request.relativeStartNanos); + return getResponse( + mappingVersion, + settingsVersion, + seqNoStats, + maxSeqNoOfUpdatesOrDeletes, + operations, + request.relativeStartNanos); } @Override @@ -404,12 +430,19 @@ private void globalCheckpointAdvancementFailure( e); if (e instanceof TimeoutException) { try { - final long mappingVersion = - clusterService.state().metaData().index(shardId.getIndex()).getMappingVersion(); + final IndexMetaData indexMetaData = clusterService.state().metaData().index(shardId.getIndex()); + final long mappingVersion = indexMetaData.getMappingVersion(); + final long settingsVersion = indexMetaData.getSettingsVersion(); final SeqNoStats latestSeqNoStats = indexShard.seqNoStats(); final long maxSeqNoOfUpdatesOrDeletes = indexShard.getMaxSeqNoOfUpdatesOrDeletes(); - listener.onResponse(getResponse(mappingVersion, latestSeqNoStats, maxSeqNoOfUpdatesOrDeletes, EMPTY_OPERATIONS_ARRAY, - request.relativeStartNanos)); + listener.onResponse( + getResponse( + mappingVersion, + settingsVersion, + latestSeqNoStats, + maxSeqNoOfUpdatesOrDeletes, + EMPTY_OPERATIONS_ARRAY, + request.relativeStartNanos)); } catch (final Exception caught) { caught.addSuppressed(e); listener.onFailure(caught); @@ -494,12 +527,23 @@ static Translog.Operation[] getOperations( return operations.toArray(EMPTY_OPERATIONS_ARRAY); } - static Response getResponse(final long mappingVersion, final SeqNoStats seqNoStats, - final long maxSeqNoOfUpdates, final Translog.Operation[] operations, long relativeStartNanos) { + static Response getResponse( + final long mappingVersion, + final long settingsVersion, + final SeqNoStats seqNoStats, + final long maxSeqNoOfUpdates, + final Translog.Operation[] operations, + long relativeStartNanos) { long tookInNanos = System.nanoTime() - relativeStartNanos; long tookInMillis = TimeUnit.NANOSECONDS.toMillis(tookInNanos); - return new Response(mappingVersion, seqNoStats.getGlobalCheckpoint(), seqNoStats.getMaxSeqNo(), maxSeqNoOfUpdates, - operations, tookInMillis); + return new Response( + mappingVersion, + settingsVersion, + seqNoStats.getGlobalCheckpoint(), + seqNoStats.getMaxSeqNo(), + maxSeqNoOfUpdates, + operations, + tookInMillis); } } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java index 13dc736722e99..f213d5a4999f1 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java @@ -20,8 +20,8 @@ import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.transport.NetworkExceptionHelper; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.index.shard.IllegalIndexShardStateException; import org.elasticsearch.index.seqno.SequenceNumbers; +import org.elasticsearch.index.shard.IllegalIndexShardStateException; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardNotFoundException; import org.elasticsearch.index.translog.Translog; @@ -74,6 +74,7 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask { private int numOutstandingReads = 0; private int numOutstandingWrites = 0; private long currentMappingVersion = 0; + private long currentSettingsVersion = 0; private long totalReadRemoteExecTimeMillis = 0; private long totalReadTimeMillis = 0; private long successfulReadRequests = 0; @@ -134,9 +135,19 @@ void start( synchronized (ShardFollowNodeTask.this) { currentMappingVersion = followerMappingVersion; } - LOGGER.info("{} Started to follow leader shard {}, followGlobalCheckPoint={}, followerMappingVersion={}", - params.getFollowShardId(), params.getLeaderShardId(), followerGlobalCheckpoint, followerMappingVersion); - coordinateReads(); + updateSettings(followerSettingsVersion -> { + synchronized (ShardFollowNodeTask.this) { + currentSettingsVersion = followerSettingsVersion; + } + LOGGER.info( + "{} following leader shard {}, follower global checkpoint=[{}], mapping version=[{}], settings version=[{}]", + params.getFollowShardId(), + params.getLeaderShardId(), + followerGlobalCheckpoint, + followerMappingVersion, + followerSettingsVersion); + coordinateReads(); + }); }); } @@ -269,7 +280,15 @@ private void sendShardChangesRequest(long from, int maxOperationCount, long maxR } void handleReadResponse(long from, long maxRequiredSeqNo, ShardChangesAction.Response response) { - maybeUpdateMapping(response.getMappingVersion(), () -> innerHandleReadResponse(from, maxRequiredSeqNo, response)); + // In order to process this read response (3), we need to check and potentially update the follow index's setting (1) and + // check and potentially update the follow index's mappings (2). + + // 3) handle read response: + Runnable handleResponseTask = () -> innerHandleReadResponse(from, maxRequiredSeqNo, response); + // 2) update follow index mapping: + Runnable updateMappingsTask = () -> maybeUpdateMapping(response.getMappingVersion(), handleResponseTask); + // 1) update follow index settings: + maybeUpdateSettings(response.getSettingsVersion(), updateMappingsTask); } /** Called when some operations are fetched from the leading */ @@ -367,6 +386,21 @@ private synchronized void maybeUpdateMapping(Long minimumRequiredMappingVersion, } } + private synchronized void maybeUpdateSettings(final Long minimumRequiredSettingsVersion, Runnable task) { + if (currentSettingsVersion >= minimumRequiredSettingsVersion) { + LOGGER.trace("{} settings version [{}] is higher or equal than minimum required mapping version [{}]", + params.getFollowShardId(), currentSettingsVersion, minimumRequiredSettingsVersion); + task.run(); + } else { + LOGGER.trace("{} updating settings, settings version [{}] is lower than minimum required settings version [{}]", + params.getFollowShardId(), currentSettingsVersion, minimumRequiredSettingsVersion); + updateSettings(settingsVersion -> { + currentSettingsVersion = settingsVersion; + task.run(); + }); + } + } + private void updateMapping(LongConsumer handler) { updateMapping(handler, new AtomicInteger(0)); } @@ -375,6 +409,14 @@ private void updateMapping(LongConsumer handler, AtomicInteger retryCounter) { innerUpdateMapping(handler, e -> handleFailure(e, retryCounter, () -> updateMapping(handler, retryCounter))); } + private void updateSettings(final LongConsumer handler) { + updateSettings(handler, new AtomicInteger(0)); + } + + private void updateSettings(final LongConsumer handler, final AtomicInteger retryCounter) { + innerUpdateSettings(handler, e -> handleFailure(e, retryCounter, () -> updateSettings(handler, retryCounter))); + } + private void handleFailure(Exception e, AtomicInteger retryCounter, Runnable task) { assert e != null; if (shouldRetry(e) && isStopped() == false) { @@ -424,6 +466,8 @@ static boolean shouldRetry(Exception e) { // These methods are protected for testing purposes: protected abstract void innerUpdateMapping(LongConsumer handler, Consumer errorHandler); + protected abstract void innerUpdateSettings(LongConsumer handler, Consumer errorHandler); + protected abstract void innerSendBulkShardOperationsRequest(String followerHistoryUUID, List operations, long leaderMaxSeqNoOfUpdatesOrDeletes, @@ -470,6 +514,7 @@ public synchronized ShardFollowNodeTaskStatus getStatus() { buffer.size(), bufferSizeInBytes, currentMappingVersion, + currentSettingsVersion, totalReadTimeMillis, totalReadRemoteExecTimeMillis, successfulReadRequests, diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java index 446e3aaee41d3..79de49c7a28f2 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java @@ -8,16 +8,25 @@ import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest; +import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; +import org.elasticsearch.action.admin.indices.close.CloseIndexRequest; import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest; +import org.elasticsearch.action.admin.indices.open.OpenIndexRequest; +import org.elasticsearch.action.admin.indices.open.OpenIndexResponse; +import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest; import org.elasticsearch.action.admin.indices.stats.IndexStats; import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequest; import org.elasticsearch.action.admin.indices.stats.ShardStats; +import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MappingMetaData; import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.CheckedConsumer; +import org.elasticsearch.common.settings.IndexScopedSettings; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.common.xcontent.XContentType; @@ -56,12 +65,17 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor erro }, errorHandler)); } + @Override + protected void innerUpdateSettings(final LongConsumer finalHandler, final Consumer errorHandler) { + final Index leaderIndex = params.getLeaderShardId().getIndex(); + final Index followIndex = params.getFollowShardId().getIndex(); + + final ClusterStateRequest clusterStateRequest = new ClusterStateRequest(); + clusterStateRequest.clear(); + clusterStateRequest.metaData(true); + clusterStateRequest.indices(leaderIndex.getName()); + + CheckedConsumer onResponse = clusterStateResponse -> { + final IndexMetaData leaderIMD = clusterStateResponse.getState().metaData().getIndexSafe(leaderIndex); + final IndexMetaData followerIMD = clusterService.state().metaData().getIndexSafe(followIndex); + + final Settings existingSettings = TransportResumeFollowAction.filter(followerIMD.getSettings()); + final Settings settings = TransportResumeFollowAction.filter(leaderIMD.getSettings()); + if (existingSettings.equals(settings)) { + // If no settings have been changed then just propagate settings version to shard follow node task: + finalHandler.accept(leaderIMD.getSettingsVersion()); + } else { + // Figure out which settings have been updated: + final Settings updatedSettings = settings.filter( + s -> existingSettings.get(s) == null || existingSettings.get(s).equals(settings.get(s)) == false + ); + + // Figure out whether the updated settings are all dynamic settings and + // if so just update the follower index's settings: + if (updatedSettings.keySet().stream().allMatch(indexScopedSettings::isDynamicSetting)) { + // If only dynamic settings have been updated then just update these settings in follower index: + final UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest(followIndex.getName()); + updateSettingsRequest.settings(updatedSettings); + followerClient.admin().indices().updateSettings(updateSettingsRequest, + ActionListener.wrap(response -> finalHandler.accept(leaderIMD.getSettingsVersion()), errorHandler)); + } else { + // If one or more setting are not dynamic then close follow index, update leader settings and + // then open leader index: + Runnable handler = () -> finalHandler.accept(leaderIMD.getSettingsVersion()); + closeIndexUpdateSettingsAndOpenIndex(followIndex.getName(), updatedSettings, handler, errorHandler); + } + } + }; + leaderClient.admin().cluster().state(clusterStateRequest, ActionListener.wrap(onResponse, errorHandler)); + } + + private void closeIndexUpdateSettingsAndOpenIndex(String followIndex, + Settings updatedSettings, + Runnable handler, + Consumer onFailure) { + CloseIndexRequest closeRequest = new CloseIndexRequest(followIndex); + CheckedConsumer onResponse = response -> { + updateSettingsAndOpenIndex(followIndex, updatedSettings, handler, onFailure); + }; + followerClient.admin().indices().close(closeRequest, ActionListener.wrap(onResponse, onFailure)); + } + + private void updateSettingsAndOpenIndex(String followIndex, + Settings updatedSettings, + Runnable handler, + Consumer onFailure) { + final UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest(followIndex); + updateSettingsRequest.settings(updatedSettings); + CheckedConsumer onResponse = response -> openIndex(followIndex, handler, onFailure); + followerClient.admin().indices().updateSettings(updateSettingsRequest, ActionListener.wrap(onResponse, onFailure)); + } + + private void openIndex(String followIndex, + Runnable handler, + Consumer onFailure) { + OpenIndexRequest openIndexRequest = new OpenIndexRequest(followIndex); + CheckedConsumer onResponse = response -> handler.run(); + followerClient.admin().indices().open(openIndexRequest, ActionListener.wrap(onResponse, onFailure)); + } + @Override protected void innerSendBulkShardOperationsRequest( final String followerHistoryUUID, @@ -141,8 +228,7 @@ protected void innerSendBulkShardOperationsRequest( final BulkShardOperationsRequest request = new BulkShardOperationsRequest(params.getFollowShardId(), followerHistoryUUID, operations, maxSeqNoOfUpdatesOrDeletes); - followerClient.execute(BulkShardOperationsAction.INSTANCE, request, - ActionListener.wrap(response -> handler.accept(response), errorHandler)); + followerClient.execute(BulkShardOperationsAction.INSTANCE, request, ActionListener.wrap(handler::accept, errorHandler)); } @Override diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportResumeFollowAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportResumeFollowAction.java index 3164c2522fd7e..3583167fbb07f 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportResumeFollowAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportResumeFollowAction.java @@ -385,10 +385,11 @@ static String[] extractLeaderShardHistoryUUIDs(Map ccrIndexMetaD WHITE_LISTED_SETTINGS = Collections.unmodifiableSet(whiteListedSettings); } - private static Settings filter(Settings originalSettings) { + static Settings filter(Settings originalSettings) { Settings.Builder settings = Settings.builder().put(originalSettings); // Remove settings that are always going to be different between leader and follow index: settings.remove(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey()); + settings.remove(IndexMetaData.SETTING_INDEX_VERSION_CREATED.getKey()); settings.remove(IndexMetaData.SETTING_INDEX_UUID); settings.remove(IndexMetaData.SETTING_INDEX_PROVIDED_NAME); settings.remove(IndexMetaData.SETTING_CREATION_DATE); diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java index 45360041c4d27..39975d5c29174 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java @@ -14,7 +14,12 @@ import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest; import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse; +import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequest; +import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse; +import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest; import org.elasticsearch.action.admin.indices.open.OpenIndexRequest; +import org.elasticsearch.action.admin.indices.settings.get.GetSettingsRequest; +import org.elasticsearch.action.admin.indices.settings.get.GetSettingsResponse; import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest; import org.elasticsearch.action.admin.indices.stats.ShardStats; import org.elasticsearch.action.bulk.BulkProcessor; @@ -27,6 +32,7 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MappingMetaData; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.CheckedRunnable; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.settings.Settings; @@ -64,6 +70,8 @@ import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.BooleanSupplier; +import java.util.stream.Collectors; import static java.util.Collections.singletonMap; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; @@ -585,6 +593,227 @@ public void testLeaderIndexRed() throws Exception { } } + public void testUpdateDynamicLeaderIndexSettings() throws Exception { + final String leaderIndexSettings = getIndexSettings(1, 0, + singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true")); + assertAcked(leaderClient().admin().indices().prepareCreate("leader").setSource(leaderIndexSettings, XContentType.JSON)); + ensureLeaderYellow("leader"); + + final PutFollowAction.Request followRequest = putFollow("leader", "follower"); + followerClient().execute(PutFollowAction.INSTANCE, followRequest).get(); + BooleanSupplier hasFollowIndexBeenClosedChecker = hasFollowIndexBeenClosed("follower"); + + final long firstBatchNumDocs = randomIntBetween(2, 64); + for (long i = 0; i < firstBatchNumDocs; i++) { + leaderClient().prepareIndex("leader", "doc").setSource("{}", XContentType.JSON).get(); + } + assertBusy(() -> assertThat(followerClient().prepareSearch("follower").get().getHits().totalHits, equalTo(firstBatchNumDocs))); + + // Sanity check that the setting has not been set in follower index: + { + GetSettingsRequest getSettingsRequest = new GetSettingsRequest(); + getSettingsRequest.indices("follower"); + GetSettingsResponse getSettingsResponse = followerClient().admin().indices().getSettings(getSettingsRequest).actionGet(); + assertThat(getSettingsResponse.getSetting("follower", "index.max_ngram_diff"), nullValue()); + } + assertThat(getFollowTaskSettingsVersion("follower"), equalTo(1L)); + UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest("leader"); + updateSettingsRequest.settings(Settings.builder().put("index.max_ngram_diff", 2)); + assertAcked(leaderClient().admin().indices().updateSettings(updateSettingsRequest).actionGet()); + + final int secondBatchNumDocs = randomIntBetween(2, 64); + for (long i = firstBatchNumDocs; i < firstBatchNumDocs + secondBatchNumDocs; i++) { + leaderClient().prepareIndex("leader", "doc").setSource("{}", XContentType.JSON).get(); + } + assertBusy(() -> { + // Check that the setting has been set in follower index: + GetSettingsRequest getSettingsRequest = new GetSettingsRequest(); + getSettingsRequest.indices("follower"); + GetSettingsResponse getSettingsResponse = followerClient().admin().indices().getSettings(getSettingsRequest).actionGet(); + assertThat(getSettingsResponse.getSetting("follower", "index.max_ngram_diff"), equalTo("2")); + assertThat(getFollowTaskSettingsVersion("follower"), equalTo(2L)); + + try { + assertThat(followerClient().prepareSearch("follower").get().getHits().totalHits, + equalTo(firstBatchNumDocs + secondBatchNumDocs)); + } catch (Exception e) { + throw new AssertionError("error while searching", e); + } + }); + assertThat(hasFollowIndexBeenClosedChecker.getAsBoolean(), is(false)); + } + + public void testLeaderIndexSettingNotPercolatedToFollower() throws Exception { + // Sets an index setting on leader index that is excluded from being replicated to the follower index and + // expects that this setting is not replicated to the follower index, but does expect that the settings version + // is incremented. + final String leaderIndexSettings = getIndexSettings(1, 0, + singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true")); + assertAcked(leaderClient().admin().indices().prepareCreate("leader").setSource(leaderIndexSettings, XContentType.JSON)); + ensureLeaderYellow("leader"); + + final PutFollowAction.Request followRequest = putFollow("leader", "follower"); + followerClient().execute(PutFollowAction.INSTANCE, followRequest).get(); + BooleanSupplier hasFollowIndexBeenClosedChecker = hasFollowIndexBeenClosed("follower"); + + final long firstBatchNumDocs = randomIntBetween(2, 64); + for (long i = 0; i < firstBatchNumDocs; i++) { + leaderClient().prepareIndex("leader", "doc").setSource("{}", XContentType.JSON).get(); + } + assertBusy(() -> assertThat(followerClient().prepareSearch("follower").get().getHits().totalHits, equalTo(firstBatchNumDocs))); + + // Sanity check that the setting has not been set in follower index: + { + GetSettingsRequest getSettingsRequest = new GetSettingsRequest(); + getSettingsRequest.indices("follower"); + GetSettingsResponse getSettingsResponse = followerClient().admin().indices().getSettings(getSettingsRequest).actionGet(); + assertThat(getSettingsResponse.getSetting("follower", "index.number_of_replicas"), equalTo("0")); + } + assertThat(getFollowTaskSettingsVersion("follower"), equalTo(1L)); + UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest("leader"); + updateSettingsRequest.settings(Settings.builder().put("index.number_of_replicas", 1)); + assertAcked(leaderClient().admin().indices().updateSettings(updateSettingsRequest).actionGet()); + + final int secondBatchNumDocs = randomIntBetween(2, 64); + for (long i = firstBatchNumDocs; i < firstBatchNumDocs + secondBatchNumDocs; i++) { + leaderClient().prepareIndex("leader", "doc").setSource("{}", XContentType.JSON).get(); + } + assertBusy(() -> { + GetSettingsRequest getSettingsRequest = new GetSettingsRequest(); + getSettingsRequest.indices("follower"); + GetSettingsResponse getSettingsResponse = followerClient().admin().indices().getSettings(getSettingsRequest).actionGet(); + assertThat(getSettingsResponse.getSetting("follower", "index.number_of_replicas"), equalTo("0")); + assertThat(getFollowTaskSettingsVersion("follower"), equalTo(2L)); + + try { + assertThat(followerClient().prepareSearch("follower").get().getHits().totalHits, + equalTo(firstBatchNumDocs + secondBatchNumDocs)); + } catch (Exception e) { + throw new AssertionError("error while searching", e); + } + }); + assertThat(hasFollowIndexBeenClosedChecker.getAsBoolean(), is(false)); + } + + public void testUpdateAnalysisLeaderIndexSettings() throws Exception { + final String leaderIndexSettings = getIndexSettings(1, 0, + singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true")); + assertAcked(leaderClient().admin().indices().prepareCreate("leader").setSource(leaderIndexSettings, XContentType.JSON)); + ensureLeaderYellow("leader"); + + final PutFollowAction.Request followRequest = putFollow("leader", "follower"); + followerClient().execute(PutFollowAction.INSTANCE, followRequest).get(); + BooleanSupplier hasFollowIndexBeenClosedChecker = hasFollowIndexBeenClosed("follower"); + + final long firstBatchNumDocs = randomIntBetween(2, 64); + for (long i = 0; i < firstBatchNumDocs; i++) { + leaderClient().prepareIndex("leader", "doc").setSource("{}", XContentType.JSON).get(); + } + + assertBusy(() -> assertThat(followerClient().prepareSearch("follower").get().getHits().totalHits, equalTo(firstBatchNumDocs))); + assertThat(getFollowTaskSettingsVersion("follower"), equalTo(1L)); + assertThat(getFollowTaskMappingVersion("follower"), equalTo(1L)); + + CloseIndexRequest closeIndexRequest = new CloseIndexRequest("leader"); + assertAcked(leaderClient().admin().indices().close(closeIndexRequest).actionGet()); + + UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest("leader"); + updateSettingsRequest.settings(Settings.builder() + .put("index.analysis.analyzer.my_analyzer.type", "custom") + .put("index.analysis.analyzer.my_analyzer.tokenizer", "keyword") + ); + assertAcked(leaderClient().admin().indices().updateSettings(updateSettingsRequest).actionGet()); + + OpenIndexRequest openIndexRequest = new OpenIndexRequest("leader"); + assertAcked(leaderClient().admin().indices().open(openIndexRequest).actionGet()); + ensureLeaderGreen("leader"); + + PutMappingRequest putMappingRequest = new PutMappingRequest("leader"); + putMappingRequest.type("doc"); + putMappingRequest.source("new_field", "type=text,analyzer=my_analyzer"); + assertAcked(leaderClient().admin().indices().putMapping(putMappingRequest).actionGet()); + + final int secondBatchNumDocs = randomIntBetween(2, 64); + for (long i = firstBatchNumDocs; i < firstBatchNumDocs + secondBatchNumDocs; i++) { + final String source = String.format(Locale.ROOT, "{\"new_field\":\"value %d\"}", i); + leaderClient().prepareIndex("leader", "doc").setSource(source, XContentType.JSON).get(); + } + + assertBusy(() -> { + assertThat(getFollowTaskSettingsVersion("follower"), equalTo(2L)); + assertThat(getFollowTaskMappingVersion("follower"), equalTo(2L)); + + GetSettingsRequest getSettingsRequest = new GetSettingsRequest(); + getSettingsRequest.indices("follower"); + GetSettingsResponse getSettingsResponse = followerClient().admin().indices().getSettings(getSettingsRequest).actionGet(); + assertThat(getSettingsResponse.getSetting("follower", "index.analysis.analyzer.my_analyzer.type"), equalTo("custom")); + assertThat(getSettingsResponse.getSetting("follower", "index.analysis.analyzer.my_analyzer.tokenizer"), equalTo("keyword")); + + GetMappingsRequest getMappingsRequest = new GetMappingsRequest(); + getMappingsRequest.indices("follower"); + GetMappingsResponse getMappingsResponse = followerClient().admin().indices().getMappings(getMappingsRequest).actionGet(); + MappingMetaData mappingMetaData = getMappingsResponse.getMappings().get("follower").get("doc"); + assertThat(XContentMapValues.extractValue("properties.new_field.type", mappingMetaData.sourceAsMap()), equalTo("text")); + assertThat(XContentMapValues.extractValue("properties.new_field.analyzer", mappingMetaData.sourceAsMap()), + equalTo("my_analyzer")); + + try { + assertThat(followerClient().prepareSearch("follower").get().getHits().totalHits, + equalTo(firstBatchNumDocs + secondBatchNumDocs)); + } catch (Exception e) { + throw new AssertionError("error while searching", e); + } + }); + assertThat(hasFollowIndexBeenClosedChecker.getAsBoolean(), is(true)); + } + + private long getFollowTaskSettingsVersion(String followerIndex) { + long settingsVersion = -1L; + for (ShardFollowNodeTaskStatus status : getFollowTaskStatuses(followerIndex)) { + if (settingsVersion == -1L) { + settingsVersion = status.followerSettingsVersion(); + } else { + assert settingsVersion == status.followerSettingsVersion(); + } + } + return settingsVersion; + } + + private long getFollowTaskMappingVersion(String followerIndex) { + long mappingVersion = -1L; + for (ShardFollowNodeTaskStatus status : getFollowTaskStatuses(followerIndex)) { + if (mappingVersion == -1L) { + mappingVersion = status.followerMappingVersion(); + } else { + assert mappingVersion == status.followerMappingVersion(); + } + } + return mappingVersion; + } + + private List getFollowTaskStatuses(String followerIndex) { + FollowStatsAction.StatsRequest request = new StatsRequest(); + request.setIndices(new String[]{followerIndex}); + FollowStatsAction.StatsResponses response = followerClient().execute(FollowStatsAction.INSTANCE, request).actionGet(); + return response.getStatsResponses().stream() + .map(FollowStatsAction.StatsResponse::status) + .filter(status -> status.followerIndex().equals(followerIndex)) + .collect(Collectors.toList()); + } + + private BooleanSupplier hasFollowIndexBeenClosed(String indexName) { + String electedMasterNode = getFollowerCluster().getMasterName(); + ClusterService clusterService = getFollowerCluster().getInstance(ClusterService.class, electedMasterNode); + AtomicBoolean closed = new AtomicBoolean(false); + clusterService.addListener(event -> { + IndexMetaData indexMetaData = event.state().metaData().index(indexName); + if (indexMetaData.getState() == IndexMetaData.State.CLOSE) { + closed.set(true); + } + }); + return closed::get; + } + private CheckedRunnable assertTask(final int numberOfPrimaryShards, final Map numDocsPerShard) { return () -> { final ClusterState clusterState = followerClient().admin().cluster().prepareState().get().getState(); diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardChangesResponseTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardChangesResponseTests.java index b9ac4fee3d23d..0e48fc8e57ca3 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardChangesResponseTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardChangesResponseTests.java @@ -13,6 +13,7 @@ public class ShardChangesResponseTests extends AbstractStreamableTestCase fromToSlot = new HashMap<>(); @Override @@ -113,6 +115,11 @@ protected void innerUpdateMapping(LongConsumer handler, Consumer erro handler.accept(mappingVersion); } + @Override + protected void innerUpdateSettings(LongConsumer handler, Consumer errorHandler) { + handler.accept(settingsVersion); + } + @Override protected void innerSendBulkShardOperationsRequest( String followerHistoryUUID, List operations, @@ -153,6 +160,7 @@ protected void innerSendShardChangesRequest(long from, int maxOperationCount, Co // if too many invocations occur with the same from then AOBE occurs, this ok and then something is wrong. } mappingVersion = testResponse.mappingVersion; + settingsVersion = testResponse.settingsVersion; if (testResponse.exception != null) { errorHandler.accept(testResponse.exception); } else { @@ -162,8 +170,8 @@ protected void innerSendShardChangesRequest(long from, int maxOperationCount, Co assert from >= testRun.finalExpectedGlobalCheckpoint; final long globalCheckpoint = tracker.getCheckpoint(); final long maxSeqNo = tracker.getMaxSeqNo(); - handler.accept(new ShardChangesAction.Response( - 0L, globalCheckpoint, maxSeqNo, randomNonNegativeLong(), new Translog.Operation[0], 1L)); + handler.accept(new ShardChangesAction.Response(0L, 0L, globalCheckpoint, maxSeqNo, randomNonNegativeLong(), + new Translog.Operation[0], 1L)); } }; threadPool.generic().execute(task); @@ -206,9 +214,10 @@ private void tearDown() { }; } - private static TestRun createTestRun(long startSeqNo, long startMappingVersion, int maxOperationCount) { + private static TestRun createTestRun(long startSeqNo, long startMappingVersion, long startSettingsVersion, int maxOperationCount) { long prevGlobalCheckpoint = startSeqNo; long mappingVersion = startMappingVersion; + long settingsVersion = startSettingsVersion; int numResponses = randomIntBetween(16, 256); Map> responses = new HashMap<>(numResponses); for (int i = 0; i < numResponses; i++) { @@ -216,13 +225,16 @@ private static TestRun createTestRun(long startSeqNo, long startMappingVersion, if (sometimes()) { mappingVersion++; } + if (sometimes()) { + settingsVersion++; + } if (sometimes()) { List item = new ArrayList<>(); // Sometimes add a random retryable error if (sometimes()) { Exception error = new UnavailableShardsException(new ShardId("test", "test", 0), ""); - item.add(new TestResponse(error, mappingVersion, null)); + item.add(new TestResponse(error, mappingVersion, settingsVersion, null)); } List ops = new ArrayList<>(); for (long seqNo = prevGlobalCheckpoint; seqNo <= nextGlobalCheckPoint; seqNo++) { @@ -233,8 +245,10 @@ private static TestRun createTestRun(long startSeqNo, long startMappingVersion, item.add(new TestResponse( null, mappingVersion, + settingsVersion, new ShardChangesAction.Response( mappingVersion, + settingsVersion, nextGlobalCheckPoint, nextGlobalCheckPoint, randomNonNegativeLong(), @@ -253,19 +267,20 @@ private static TestRun createTestRun(long startSeqNo, long startMappingVersion, // Sometimes add a random retryable error if (sometimes()) { Exception error = new UnavailableShardsException(new ShardId("test", "test", 0), ""); - item.add(new TestResponse(error, mappingVersion, null)); + item.add(new TestResponse(error, mappingVersion, settingsVersion, null)); } // Sometimes add an empty shard changes response to also simulate a leader shard lagging behind if (sometimes()) { ShardChangesAction.Response response = new ShardChangesAction.Response( mappingVersion, + settingsVersion, prevGlobalCheckpoint, prevGlobalCheckpoint, randomNonNegativeLong(), EMPTY, randomNonNegativeLong() ); - item.add(new TestResponse(null, mappingVersion, response)); + item.add(new TestResponse(null, mappingVersion, settingsVersion, response)); } List ops = new ArrayList<>(); for (long seqNo = fromSeqNo; seqNo <= toSeqNo; seqNo++) { @@ -277,13 +292,14 @@ private static TestRun createTestRun(long startSeqNo, long startMappingVersion, long localLeaderGCP = randomBoolean() ? ops.get(ops.size() - 1).seqNo() : toSeqNo; ShardChangesAction.Response response = new ShardChangesAction.Response( mappingVersion, + settingsVersion, localLeaderGCP, localLeaderGCP, randomNonNegativeLong(), ops.toArray(EMPTY), randomNonNegativeLong() ); - item.add(new TestResponse(null, mappingVersion, response)); + item.add(new TestResponse(null, mappingVersion, settingsVersion, response)); responses.put(fromSeqNo, Collections.unmodifiableList(item)); } } @@ -323,11 +339,13 @@ private static class TestResponse { final Exception exception; final long mappingVersion; + final long settingsVersion; final ShardChangesAction.Response response; - private TestResponse(Exception exception, long mappingVersion, ShardChangesAction.Response response) { + private TestResponse(Exception exception, long mappingVersion, long settingsVersion, ShardChangesAction.Response response) { this.exception = exception; this.mappingVersion = mappingVersion; + this.settingsVersion = settingsVersion; this.response = response; } } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskStatusTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskStatusTests.java index 95f8e86e09657..9cac01d278e74 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskStatusTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskStatusTests.java @@ -58,6 +58,7 @@ protected ShardFollowNodeTaskStatus createTestInstance() { randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(), + randomNonNegativeLong(), randomReadExceptions(), randomLong(), randomBoolean() ? new ElasticsearchException("fatal error") : null); @@ -78,6 +79,7 @@ protected void assertEqualInstances(final ShardFollowNodeTaskStatus expectedInst assertThat(newInstance.outstandingWriteRequests(), equalTo(expectedInstance.outstandingWriteRequests())); assertThat(newInstance.writeBufferOperationCount(), equalTo(expectedInstance.writeBufferOperationCount())); assertThat(newInstance.followerMappingVersion(), equalTo(expectedInstance.followerMappingVersion())); + assertThat(newInstance.followerSettingsVersion(), equalTo(expectedInstance.followerSettingsVersion())); assertThat(newInstance.totalReadTimeMillis(), equalTo(expectedInstance.totalReadTimeMillis())); assertThat(newInstance.successfulReadRequests(), equalTo(expectedInstance.successfulReadRequests())); assertThat(newInstance.failedReadRequests(), equalTo(expectedInstance.failedReadRequests())); diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java index aeac0ac451806..cf7f901aaba33 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java @@ -57,6 +57,8 @@ public class ShardFollowNodeTaskTests extends ESTestCase { private Queue writeFailures; private Queue mappingUpdateFailures; private Queue mappingVersions; + private Queue settingsUpdateFailures; + private Queue settingsVersions; private Queue leaderGlobalCheckpoints; private Queue followerGlobalCheckpoints; private Queue maxSeqNos; @@ -73,7 +75,7 @@ public void testCoordinateReads() { task.coordinateReads(); assertThat(shardChangesRequests, contains(new long[]{0L, 8L})); // treat this a peak request shardChangesRequests.clear(); - task.innerHandleReadResponse(0, 5L, generateShardChangesResponse(0, 5L, 0L, 60L)); + task.innerHandleReadResponse(0, 5L, generateShardChangesResponse(0, 5L, 0L, 0L, 60L)); assertThat(shardChangesRequests, contains(new long[][]{ {6L, 8L}, {14L, 8L}, {22L, 8L}, {30L, 8L}, {38L, 8L}, {46L, 8L}, {54L, 7L}} )); @@ -98,7 +100,7 @@ public void testMaxWriteBufferCount() { shardChangesRequests.clear(); // Also invokes the coordinatesReads() method: - task.innerHandleReadResponse(0L, 63L, generateShardChangesResponse(0, 63, 0L, 128L)); + task.innerHandleReadResponse(0L, 63L, generateShardChangesResponse(0, 63, 0L, 0L, 128L)); assertThat(shardChangesRequests.size(), equalTo(0)); // no more reads, because write buffer count limit has been reached ShardFollowNodeTaskStatus status = task.getStatus(); @@ -124,7 +126,7 @@ public void testMaxWriteBufferSize() { shardChangesRequests.clear(); // Also invokes the coordinatesReads() method: - task.innerHandleReadResponse(0L, 63L, generateShardChangesResponse(0, 63, 0L, 128L)); + task.innerHandleReadResponse(0L, 63L, generateShardChangesResponse(0, 63, 0L, 0L, 128L)); assertThat(shardChangesRequests.size(), equalTo(0)); // no more reads, because write buffer size limit has been reached ShardFollowNodeTaskStatus status = task.getStatus(); @@ -189,7 +191,7 @@ public void testTaskCancelledAfterReadLimitHasBeenReached() { task.markAsCompleted(); shardChangesRequests.clear(); // Also invokes the coordinatesReads() method: - task.innerHandleReadResponse(0L, 15L, generateShardChangesResponse(0, 15, 0L, 31L)); + task.innerHandleReadResponse(0L, 15L, generateShardChangesResponse(0, 15, 0L, 0L, 31L)); assertThat(shardChangesRequests.size(), equalTo(0)); // no more reads, because task has been cancelled assertThat(bulkShardOperationRequests.size(), equalTo(0)); // no more writes, because task has been cancelled @@ -219,7 +221,7 @@ public void testTaskCancelledAfterWriteBufferLimitHasBeenReached() { task.markAsCompleted(); shardChangesRequests.clear(); // Also invokes the coordinatesReads() method: - task.innerHandleReadResponse(0L, 63L, generateShardChangesResponse(0, 63, 0L, 128L)); + task.innerHandleReadResponse(0L, 63L, generateShardChangesResponse(0, 63, 0L, 0L, 128L)); assertThat(shardChangesRequests.size(), equalTo(0)); // no more reads, because task has been cancelled assertThat(bulkShardOperationRequests.size(), equalTo(0)); // no more writes, because task has been cancelled @@ -439,7 +441,7 @@ public void testHandleReadResponse() { startTask(task, 63, -1); task.coordinateReads(); - ShardChangesAction.Response response = generateShardChangesResponse(0, 63, 0L, 63L); + ShardChangesAction.Response response = generateShardChangesResponse(0, 63, 0L, 0L, 63L); task.innerHandleReadResponse(0L, 63L, response); assertThat(bulkShardOperationRequests.size(), equalTo(1)); @@ -469,7 +471,7 @@ public void testReceiveLessThanRequested() { assertThat(shardChangesRequests.get(0)[1], equalTo(64L)); shardChangesRequests.clear(); - ShardChangesAction.Response response = generateShardChangesResponse(0, 20, 0L, 31L); + ShardChangesAction.Response response = generateShardChangesResponse(0, 20, 0L, 0L, 31L); task.innerHandleReadResponse(0L, 63L, response); assertThat(shardChangesRequests.size(), equalTo(1)); @@ -498,7 +500,7 @@ public void testCancelAndReceiveLessThanRequested() { shardChangesRequests.clear(); task.markAsCompleted(); - ShardChangesAction.Response response = generateShardChangesResponse(0, 31, 0L, 31L); + ShardChangesAction.Response response = generateShardChangesResponse(0, 31, 0L, 0L, 31L); task.innerHandleReadResponse(0L, 64L, response); assertThat(shardChangesRequests.size(), equalTo(0)); @@ -524,7 +526,7 @@ public void testReceiveNothingExpectedSomething() { assertThat(shardChangesRequests.get(0)[1], equalTo(64L)); shardChangesRequests.clear(); - task.innerHandleReadResponse(0L, 63L, new ShardChangesAction.Response(0, 0, 0, 100, new Translog.Operation[0], 1L)); + task.innerHandleReadResponse(0L, 63L, new ShardChangesAction.Response(0, 0, 0, 0, 100, new Translog.Operation[0], 1L)); assertThat(shardChangesRequests.size(), equalTo(1)); assertThat(shardChangesRequests.get(0)[0], equalTo(0L)); @@ -547,7 +549,7 @@ public void testMappingUpdate() { mappingVersions.add(1L); task.coordinateReads(); - ShardChangesAction.Response response = generateShardChangesResponse(0, 63, 1L, 63L); + ShardChangesAction.Response response = generateShardChangesResponse(0, 63, 1L, 0L, 63L); task.handleReadResponse(0L, 63L, response); assertThat(bulkShardOperationRequests.size(), equalTo(1)); @@ -576,7 +578,7 @@ public void testMappingUpdateRetryableError() { } mappingVersions.add(1L); task.coordinateReads(); - ShardChangesAction.Response response = generateShardChangesResponse(0, 63, 1L, 63L); + ShardChangesAction.Response response = generateShardChangesResponse(0, 63, 1L, 0L, 63L); task.handleReadResponse(0L, 63L, response); assertThat(mappingUpdateFailures.size(), equalTo(0)); @@ -601,7 +603,7 @@ public void testMappingUpdateNonRetryableError() { mappingUpdateFailures.add(new RuntimeException()); task.coordinateReads(); - ShardChangesAction.Response response = generateShardChangesResponse(0, 64, 1L, 64L); + ShardChangesAction.Response response = generateShardChangesResponse(0, 64, 1L, 0L, 64L); task.handleReadResponse(0L, 64L, response); assertThat(bulkShardOperationRequests.size(), equalTo(0)); @@ -614,6 +616,85 @@ public void testMappingUpdateNonRetryableError() { assertThat(status.leaderGlobalCheckpoint(), equalTo(63L)); } + public void testSettingsUpdate() { + ShardFollowTaskParams params = new ShardFollowTaskParams(); + params.maxReadRequestOperationCount = 64; + params.maxOutstandingReadRequests = 1; + params.maxOutstandingWriteRequests = 1; + ShardFollowNodeTask task = createShardFollowTask(params); + startTask(task, 63, -1); + + settingsVersions.add(1L); + task.coordinateReads(); + ShardChangesAction.Response response = generateShardChangesResponse(0, 63, 0L, 1L, 63L); + task.handleReadResponse(0L, 63L, response); + + assertThat(bulkShardOperationRequests.size(), equalTo(1)); + assertThat(bulkShardOperationRequests.get(0), equalTo(Arrays.asList(response.getOperations()))); + + ShardFollowNodeTaskStatus status = task.getStatus(); + assertThat(status.followerMappingVersion(), equalTo(0L)); + assertThat(status.followerSettingsVersion(), equalTo(1L)); + assertThat(status.outstandingReadRequests(), equalTo(1)); + assertThat(status.outstandingWriteRequests(), equalTo(1)); + assertThat(status.lastRequestedSeqNo(), equalTo(63L)); + assertThat(status.leaderGlobalCheckpoint(), equalTo(63L)); + assertThat(status.followerGlobalCheckpoint(), equalTo(-1L)); + } + + public void testSettingsUpdateRetryableError() { + ShardFollowTaskParams params = new ShardFollowTaskParams(); + params.maxReadRequestOperationCount = 64; + params.maxOutstandingReadRequests = 1; + params.maxOutstandingWriteRequests = 1; + ShardFollowNodeTask task = createShardFollowTask(params); + startTask(task, 63, -1); + + int max = randomIntBetween(1, 30); + for (int i = 0; i < max; i++) { + settingsUpdateFailures.add(new ConnectException()); + } + settingsVersions.add(1L); + task.coordinateReads(); + ShardChangesAction.Response response = generateShardChangesResponse(0, 63, 0L, 1L, 63L); + task.handleReadResponse(0L, 63L, response); + + assertThat(mappingUpdateFailures.size(), equalTo(0)); + assertThat(bulkShardOperationRequests.size(), equalTo(1)); + assertThat(task.isStopped(), equalTo(false)); + ShardFollowNodeTaskStatus status = task.getStatus(); + assertThat(status.followerMappingVersion(), equalTo(0L)); + assertThat(status.followerSettingsVersion(), equalTo(1L)); + assertThat(status.outstandingReadRequests(), equalTo(1)); + assertThat(status.outstandingWriteRequests(), equalTo(1)); + assertThat(status.lastRequestedSeqNo(), equalTo(63L)); + assertThat(status.leaderGlobalCheckpoint(), equalTo(63L)); + } + + public void testSettingsUpdateNonRetryableError() { + ShardFollowTaskParams params = new ShardFollowTaskParams(); + params.maxReadRequestOperationCount = 64; + params.maxOutstandingReadRequests = 1; + params.maxOutstandingWriteRequests = 1; + ShardFollowNodeTask task = createShardFollowTask(params); + startTask(task, 63, -1); + + settingsUpdateFailures.add(new RuntimeException()); + task.coordinateReads(); + ShardChangesAction.Response response = generateShardChangesResponse(0, 64, 0L, 1L, 64L); + task.handleReadResponse(0L, 64L, response); + + assertThat(bulkShardOperationRequests.size(), equalTo(0)); + assertThat(task.isStopped(), equalTo(true)); + ShardFollowNodeTaskStatus status = task.getStatus(); + assertThat(status.followerMappingVersion(), equalTo(0L)); + assertThat(status.followerSettingsVersion(), equalTo(0L)); + assertThat(status.outstandingReadRequests(), equalTo(1)); + assertThat(status.outstandingWriteRequests(), equalTo(0)); + assertThat(status.lastRequestedSeqNo(), equalTo(63L)); + assertThat(status.leaderGlobalCheckpoint(), equalTo(63L)); + } + public void testCoordinateWrites() { ShardFollowTaskParams params = new ShardFollowTaskParams(); params.maxReadRequestOperationCount = 128; @@ -629,7 +710,7 @@ public void testCoordinateWrites() { assertThat(shardChangesRequests.get(0)[0], equalTo(0L)); assertThat(shardChangesRequests.get(0)[1], equalTo(128L)); - ShardChangesAction.Response response = generateShardChangesResponse(0, 63, 0L, 63L); + ShardChangesAction.Response response = generateShardChangesResponse(0, 63, 0L, 0L, 63L); // Also invokes coordinatesWrites() task.innerHandleReadResponse(0L, 63L, response); @@ -649,7 +730,7 @@ public void testMaxOutstandingWrites() { params.maxWriteRequestOperationCount = 64; params.maxOutstandingWriteRequests = 2; ShardFollowNodeTask task = createShardFollowTask(params); - ShardChangesAction.Response response = generateShardChangesResponse(0, 256, 0L, 256L); + ShardChangesAction.Response response = generateShardChangesResponse(0, 256, 0L, 0L, 256L); // Also invokes coordinatesWrites() task.innerHandleReadResponse(0L, 64L, response); @@ -662,7 +743,7 @@ public void testMaxOutstandingWrites() { params.maxOutstandingWriteRequests = 4; // change to 4 outstanding writers task = createShardFollowTask(params); - response = generateShardChangesResponse(0, 256, 0L, 256L); + response = generateShardChangesResponse(0, 256, 0L, 0L, 256L); // Also invokes coordinatesWrites() task.innerHandleReadResponse(0L, 64L, response); @@ -681,7 +762,7 @@ public void testMaxWriteRequestCount() { params.maxWriteRequestOperationCount = 8; params.maxOutstandingWriteRequests = 32; ShardFollowNodeTask task = createShardFollowTask(params); - ShardChangesAction.Response response = generateShardChangesResponse(0, 256, 0L, 256L); + ShardChangesAction.Response response = generateShardChangesResponse(0, 256, 0L, 0L, 256L); // Also invokes coordinatesWrites() task.innerHandleReadResponse(0L, 64L, response); @@ -712,7 +793,7 @@ public void testRetryableError() { for (int i = 0; i < max; i++) { writeFailures.add(new ShardNotFoundException(new ShardId("leader_index", "", 0))); } - ShardChangesAction.Response response = generateShardChangesResponse(0, 63, 0L, 63L); + ShardChangesAction.Response response = generateShardChangesResponse(0, 63, 0L, 0L, 63L); // Also invokes coordinatesWrites() task.innerHandleReadResponse(0L, 63L, response); @@ -741,7 +822,7 @@ public void testNonRetryableError() { assertThat(shardChangesRequests.get(0)[1], equalTo(64L)); writeFailures.add(new RuntimeException()); - ShardChangesAction.Response response = generateShardChangesResponse(0, 63, 0L, 63L); + ShardChangesAction.Response response = generateShardChangesResponse(0, 63, 0L, 0L, 63L); // Also invokes coordinatesWrites() task.innerHandleReadResponse(0L, 63L, response); @@ -768,7 +849,7 @@ public void testMaxWriteRequestSize() { assertThat(shardChangesRequests.get(0)[0], equalTo(0L)); assertThat(shardChangesRequests.get(0)[1], equalTo(64L)); - ShardChangesAction.Response response = generateShardChangesResponse(0, 63, 0L, 64L); + ShardChangesAction.Response response = generateShardChangesResponse(0, 63, 0L, 0L, 64L); // Also invokes coordinatesWrites() task.innerHandleReadResponse(0L, 64L, response); @@ -791,7 +872,7 @@ public void testHandleWriteResponse() { shardChangesRequests.clear(); followerGlobalCheckpoints.add(63L); - ShardChangesAction.Response response = generateShardChangesResponse(0, 63, 0L, 63L); + ShardChangesAction.Response response = generateShardChangesResponse(0, 63, 0L, 0L, 63L); // Also invokes coordinatesWrites() task.innerHandleReadResponse(0L, 63L, response); @@ -866,6 +947,8 @@ private ShardFollowNodeTask createShardFollowTask(ShardFollowTaskParams params) writeFailures = new LinkedList<>(); mappingUpdateFailures = new LinkedList<>(); mappingVersions = new LinkedList<>(); + settingsUpdateFailures = new LinkedList<>(); + settingsVersions = new LinkedList<>(); leaderGlobalCheckpoints = new LinkedList<>(); followerGlobalCheckpoints = new LinkedList<>(); maxSeqNos = new LinkedList<>(); @@ -887,6 +970,20 @@ protected void innerUpdateMapping(LongConsumer handler, Consumer erro } } + @Override + protected void innerUpdateSettings(LongConsumer handler, Consumer errorHandler) { + Exception failure = settingsUpdateFailures.poll(); + if (failure != null) { + errorHandler.accept(failure); + return; + } + + final Long settingsVersion = settingsVersions.poll(); + if (settingsVersion != null) { + handler.accept(settingsVersion); + } + } + @Override protected void innerSendBulkShardOperationsRequest( String followerHistoryUUID, final List operations, @@ -924,6 +1021,7 @@ protected void innerSendShardChangesRequest(long from, int requestBatchSize, Con } final ShardChangesAction.Response response = new ShardChangesAction.Response( mappingVersions.poll(), + 0L, leaderGlobalCheckpoints.poll(), maxSeqNos.poll(), randomNonNegativeLong(), @@ -946,7 +1044,10 @@ public void markAsCompleted() { }; } - private static ShardChangesAction.Response generateShardChangesResponse(long fromSeqNo, long toSeqNo, long mappingVersion, + private static ShardChangesAction.Response generateShardChangesResponse(long fromSeqNo, + long toSeqNo, + long mappingVersion, + long settingsVersion, long leaderGlobalCheckPoint) { List ops = new ArrayList<>(); for (long seqNo = fromSeqNo; seqNo <= toSeqNo; seqNo++) { @@ -956,6 +1057,7 @@ private static ShardChangesAction.Response generateShardChangesResponse(long fro } return new ShardChangesAction.Response( mappingVersion, + settingsVersion, leaderGlobalCheckPoint, leaderGlobalCheckPoint, randomNonNegativeLong(), diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java index d2f09c3900dfd..17ce05ccff132 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java @@ -402,6 +402,12 @@ protected void innerUpdateMapping(LongConsumer handler, Consumer erro handler.accept(1L); } + @Override + protected void innerUpdateSettings(LongConsumer handler, Consumer errorHandler) { + // no-op as settings updates are not tested here + handler.accept(1L); + } + @Override protected void innerSendBulkShardOperationsRequest( final String followerHistoryUUID, @@ -432,7 +438,7 @@ protected void innerSendShardChangesRequest(long from, int maxOperationCount, Co final SeqNoStats seqNoStats = indexShard.seqNoStats(); final long maxSeqNoOfUpdatesOrDeletes = indexShard.getMaxSeqNoOfUpdatesOrDeletes(); if (from > seqNoStats.getGlobalCheckpoint()) { - handler.accept(ShardChangesAction.getResponse(1L, seqNoStats, + handler.accept(ShardChangesAction.getResponse(1L, 1L, seqNoStats, maxSeqNoOfUpdatesOrDeletes, ShardChangesAction.EMPTY_OPERATIONS_ARRAY, 1L)); return; } @@ -440,6 +446,7 @@ protected void innerSendShardChangesRequest(long from, int maxOperationCount, Co maxOperationCount, recordedLeaderIndexHistoryUUID, params.getMaxReadRequestSize()); // hard code mapping version; this is ok, as mapping updates are not tested here final ShardChangesAction.Response response = new ShardChangesAction.Response( + 1L, 1L, seqNoStats.getGlobalCheckpoint(), seqNoStats.getMaxSeqNo(), diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/StatsResponsesTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/StatsResponsesTests.java index 86851d98ffed2..c173404955826 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/StatsResponsesTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/StatsResponsesTests.java @@ -55,6 +55,7 @@ static FollowStatsAction.StatsResponses createStatsResponse() { randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(), + randomNonNegativeLong(), Collections.emptyNavigableMap(), randomLong(), randomBoolean() ? new ElasticsearchException("fatal error") : null); diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/monitoring/collector/ccr/FollowStatsMonitoringDocTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/monitoring/collector/ccr/FollowStatsMonitoringDocTests.java index f3e0c2d5bd7b3..410d573e1b4c0 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/monitoring/collector/ccr/FollowStatsMonitoringDocTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/monitoring/collector/ccr/FollowStatsMonitoringDocTests.java @@ -93,7 +93,8 @@ public void testToXContent() throws IOException { final int numberOfConcurrentWrites = randomIntBetween(1, Integer.MAX_VALUE); final int writeBufferOperationCount = randomIntBetween(0, Integer.MAX_VALUE); final long writeBufferSizeInBytes = randomNonNegativeLong(); - final long followerMappingVersion = randomIntBetween(0, Integer.MAX_VALUE); + final long followerMappingVersion = randomNonNegativeLong(); + final long followerSettingsVersion = randomNonNegativeLong(); final long totalReadTimeMillis = randomLongBetween(0, 4096); final long totalReadRemoteExecTimeMillis = randomLongBetween(0, 4096); final long successfulReadRequests = randomNonNegativeLong(); @@ -124,6 +125,7 @@ public void testToXContent() throws IOException { writeBufferOperationCount, writeBufferSizeInBytes, followerMappingVersion, + followerSettingsVersion, totalReadTimeMillis, totalReadRemoteExecTimeMillis, successfulReadRequests, @@ -170,6 +172,7 @@ public void testToXContent() throws IOException { + "\"write_buffer_operation_count\":" + writeBufferOperationCount + "," + "\"write_buffer_size_in_bytes\":" + writeBufferSizeInBytes + "," + "\"follower_mapping_version\":" + followerMappingVersion + "," + + "\"follower_settings_version\":" + followerSettingsVersion + "," + "\"total_read_time_millis\":" + totalReadTimeMillis + "," + "\"total_read_remote_exec_time_millis\":" + totalReadRemoteExecTimeMillis + "," + "\"successful_read_requests\":" + successfulReadRequests + "," @@ -214,6 +217,7 @@ public void testShardFollowNodeTaskStatusFieldsMapped() throws IOException { 1, 1, 1, + 1, 100, 50, 10, diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/ShardFollowNodeTaskStatus.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/ShardFollowNodeTaskStatus.java index b8f645eea4430..33a5d495c1631 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/ShardFollowNodeTaskStatus.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/ShardFollowNodeTaskStatus.java @@ -48,6 +48,7 @@ public class ShardFollowNodeTaskStatus implements Task.Status { private static final ParseField WRITE_BUFFER_OPERATION_COUNT_FIELD = new ParseField("write_buffer_operation_count"); private static final ParseField WRITE_BUFFER_SIZE_IN_BYTES_FIELD = new ParseField("write_buffer_size_in_bytes"); private static final ParseField FOLLOWER_MAPPING_VERSION_FIELD = new ParseField("follower_mapping_version"); + private static final ParseField FOLLOWER_SETTINGS_VERSION_FIELD = new ParseField("follower_settings_version"); private static final ParseField TOTAL_READ_TIME_MILLIS_FIELD = new ParseField("total_read_time_millis"); private static final ParseField TOTAL_READ_REMOTE_EXEC_TIME_MILLIS_FIELD = new ParseField("total_read_remote_exec_time_millis"); private static final ParseField SUCCESSFUL_READ_REQUESTS_FIELD = new ParseField("successful_read_requests"); @@ -91,12 +92,13 @@ public class ShardFollowNodeTaskStatus implements Task.Status { (long) args[21], (long) args[22], (long) args[23], + (long) args[24], new TreeMap<>( - ((List>>) args[24]) + ((List>>) args[25]) .stream() .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))), - (long) args[25], - (ElasticsearchException) args[26])); + (long) args[26], + (ElasticsearchException) args[27])); public static final String READ_EXCEPTIONS_ENTRY_PARSER_NAME = "shard-follow-node-task-status-read-exceptions-entry"; @@ -120,6 +122,7 @@ public class ShardFollowNodeTaskStatus implements Task.Status { STATUS_PARSER.declareInt(ConstructingObjectParser.constructorArg(), WRITE_BUFFER_OPERATION_COUNT_FIELD); STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), WRITE_BUFFER_SIZE_IN_BYTES_FIELD); STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), FOLLOWER_MAPPING_VERSION_FIELD); + STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), FOLLOWER_SETTINGS_VERSION_FIELD); STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), TOTAL_READ_TIME_MILLIS_FIELD); STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), TOTAL_READ_REMOTE_EXEC_TIME_MILLIS_FIELD); STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), SUCCESSFUL_READ_REQUESTS_FIELD); @@ -234,6 +237,12 @@ public long followerMappingVersion() { return followerMappingVersion; } + private final long followerSettingsVersion; + + public long followerSettingsVersion() { + return followerSettingsVersion; + } + private final long totalReadTimeMillis; public long totalReadTimeMillis() { @@ -327,6 +336,7 @@ public ShardFollowNodeTaskStatus( final int writeBufferOperationCount, final long writeBufferSizeInBytes, final long followerMappingVersion, + final long followerSettingsVersion, final long totalReadTimeMillis, final long totalReadRemoteExecTimeMillis, final long successfulReadRequests, @@ -354,6 +364,7 @@ public ShardFollowNodeTaskStatus( this.writeBufferOperationCount = writeBufferOperationCount; this.writeBufferSizeInBytes = writeBufferSizeInBytes; this.followerMappingVersion = followerMappingVersion; + this.followerSettingsVersion = followerSettingsVersion; this.totalReadTimeMillis = totalReadTimeMillis; this.totalReadRemoteExecTimeMillis = totalReadRemoteExecTimeMillis; this.successfulReadRequests = successfulReadRequests; @@ -384,6 +395,7 @@ public ShardFollowNodeTaskStatus(final StreamInput in) throws IOException { this.writeBufferOperationCount = in.readVInt(); this.writeBufferSizeInBytes = in.readVLong(); this.followerMappingVersion = in.readVLong(); + this.followerSettingsVersion = in.readVLong(); this.totalReadTimeMillis = in.readVLong(); this.totalReadRemoteExecTimeMillis = in.readVLong(); this.successfulReadRequests = in.readVLong(); @@ -421,6 +433,7 @@ public void writeTo(final StreamOutput out) throws IOException { out.writeVInt(writeBufferOperationCount); out.writeVLong(writeBufferSizeInBytes); out.writeVLong(followerMappingVersion); + out.writeVLong(followerSettingsVersion); out.writeVLong(totalReadTimeMillis); out.writeVLong(totalReadRemoteExecTimeMillis); out.writeVLong(successfulReadRequests); @@ -470,6 +483,7 @@ public XContentBuilder toXContentFragment(final XContentBuilder builder, final P "write_buffer_size", new ByteSizeValue(writeBufferSizeInBytes)); builder.field(FOLLOWER_MAPPING_VERSION_FIELD.getPreferredName(), followerMappingVersion); + builder.field(FOLLOWER_SETTINGS_VERSION_FIELD.getPreferredName(), followerSettingsVersion); builder.humanReadableField( TOTAL_READ_TIME_MILLIS_FIELD.getPreferredName(), "total_read_time", @@ -550,6 +564,7 @@ public boolean equals(final Object o) { writeBufferOperationCount == that.writeBufferOperationCount && writeBufferSizeInBytes == that.writeBufferSizeInBytes && followerMappingVersion == that.followerMappingVersion && + followerSettingsVersion== that.followerSettingsVersion && totalReadTimeMillis == that.totalReadTimeMillis && totalReadRemoteExecTimeMillis == that.totalReadRemoteExecTimeMillis && successfulReadRequests == that.successfulReadRequests && @@ -588,6 +603,7 @@ public int hashCode() { writeBufferOperationCount, writeBufferSizeInBytes, followerMappingVersion, + followerSettingsVersion, totalReadTimeMillis, totalReadRemoteExecTimeMillis, successfulReadRequests, diff --git a/x-pack/plugin/core/src/main/resources/monitoring-es.json b/x-pack/plugin/core/src/main/resources/monitoring-es.json index 259cba34f5dc6..1e6d3ec892af7 100644 --- a/x-pack/plugin/core/src/main/resources/monitoring-es.json +++ b/x-pack/plugin/core/src/main/resources/monitoring-es.json @@ -971,6 +971,9 @@ "follower_mapping_version": { "type": "long" }, + "follower_settings_version": { + "type": "long" + }, "total_read_time_millis": { "type": "long" }, diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/LocalStateCompositeXPackPlugin.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/LocalStateCompositeXPackPlugin.java index f0db64d3271c8..92427fb92c439 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/LocalStateCompositeXPackPlugin.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/LocalStateCompositeXPackPlugin.java @@ -24,6 +24,7 @@ import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.SettingsFilter; +import org.elasticsearch.common.settings.SettingsModule; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.PageCacheRecycler; import org.elasticsearch.common.util.concurrent.ThreadContext; @@ -74,7 +75,6 @@ import java.util.Map; import java.util.Objects; import java.util.Optional; -import java.util.Optional; import java.util.function.BiConsumer; import java.util.function.Function; import java.util.function.Predicate; @@ -384,9 +384,11 @@ public BiConsumer getJoinValidator() { @Override public List> getPersistentTasksExecutor(ClusterService clusterService, - ThreadPool threadPool, Client client) { + ThreadPool threadPool, + Client client, + SettingsModule settingsModule) { return filterPlugins(PersistentTaskPlugin.class).stream() - .map(p -> p.getPersistentTasksExecutor(clusterService, threadPool, client)) + .map(p -> p.getPersistentTasksExecutor(clusterService, threadPool, client, settingsModule)) .flatMap(List::stream) .collect(toList()); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java index 086754054b494..233e697407ff6 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java @@ -29,6 +29,7 @@ import org.elasticsearch.common.settings.Setting.Property; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.SettingsFilter; +import org.elasticsearch.common.settings.SettingsModule; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.common.xcontent.XContentBuilder; @@ -431,7 +432,9 @@ public Collection createComponents(Client client, ClusterService cluster } public List> getPersistentTasksExecutor(ClusterService clusterService, - ThreadPool threadPool, Client client) { + ThreadPool threadPool, + Client client, + SettingsModule settingsModule) { if (enabled == false || transportClientMode) { return emptyList(); } diff --git a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/Rollup.java b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/Rollup.java index 62bb18ee03331..f3c6f6df85740 100644 --- a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/Rollup.java +++ b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/Rollup.java @@ -18,6 +18,7 @@ import org.elasticsearch.common.settings.IndexScopedSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.SettingsFilter; +import org.elasticsearch.common.settings.SettingsModule; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.env.Environment; import org.elasticsearch.env.NodeEnvironment; @@ -189,7 +190,9 @@ public List> getExecutorBuilders(Settings settings) { @Override public List> getPersistentTasksExecutor(ClusterService clusterService, - ThreadPool threadPool, Client client) { + ThreadPool threadPool, + Client client, + SettingsModule settingsModule) { if (enabled == false || transportClientMode ) { return emptyList(); }