Skip to content

Commit e420b21

Browse files
jasontedormartijnvg
andcommitted
Replicate index settings to followers (#35089)
This commit uses the index settings version so that a follower can replicate index settings changes as needed from the leader. Co-authored-by: Martijn van Groningen <martijn.v.groningen@gmail.com>
1 parent 067e23c commit e420b21

File tree

22 files changed

+667
-83
lines changed

22 files changed

+667
-83
lines changed

docs/reference/ccr/apis/follow/get-follow-stats.asciidoc

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,9 @@ The `shards` array consists of objects containing the following fields:
111111
`indices[].shards[].follower_mapping_version`::
112112
(long) the mapping version the follower is synced up to
113113

114+
`indices[].shards[].follower_settings_version`::
115+
(long) the index settings version the follower is synced up to
116+
114117
`indices[].shards[].total_read_time_millis`::
115118
(long) the total time reads were outstanding, measured from the time a read
116119
was sent to the leader to the time a reply was returned to the follower
@@ -206,6 +209,7 @@ The API returns the following results:
206209
"outstanding_write_requests" : 2,
207210
"write_buffer_operation_count" : 64,
208211
"follower_mapping_version" : 4,
212+
"follower_settings_version" : 2,
209213
"total_read_time_millis" : 32768,
210214
"total_read_remote_exec_time_millis" : 16384,
211215
"successful_read_requests" : 32,
@@ -234,6 +238,7 @@ The API returns the following results:
234238
// TESTRESPONSE[s/"outstanding_write_requests" : 2/"outstanding_write_requests" : $body.indices.0.shards.0.outstanding_write_requests/]
235239
// TESTRESPONSE[s/"write_buffer_operation_count" : 64/"write_buffer_operation_count" : $body.indices.0.shards.0.write_buffer_operation_count/]
236240
// TESTRESPONSE[s/"follower_mapping_version" : 4/"follower_mapping_version" : $body.indices.0.shards.0.follower_mapping_version/]
241+
// TESTRESPONSE[s/"follower_settings_version" : 2/"follower_settings_version" : $body.indices.0.shards.0.follower_settings_version/]
237242
// TESTRESPONSE[s/"total_read_time_millis" : 32768/"total_read_time_millis" : $body.indices.0.shards.0.total_read_time_millis/]
238243
// TESTRESPONSE[s/"total_read_remote_exec_time_millis" : 16384/"total_read_remote_exec_time_millis" : $body.indices.0.shards.0.total_read_remote_exec_time_millis/]
239244
// TESTRESPONSE[s/"successful_read_requests" : 32/"successful_read_requests" : $body.indices.0.shards.0.successful_read_requests/]

server/src/main/java/org/elasticsearch/node/Node.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -538,7 +538,7 @@ protected Node(
538538

539539
final List<PersistentTasksExecutor<?>> tasksExecutors = pluginsService
540540
.filterPlugins(PersistentTaskPlugin.class).stream()
541-
.map(p -> p.getPersistentTasksExecutor(clusterService, threadPool, client))
541+
.map(p -> p.getPersistentTasksExecutor(clusterService, threadPool, client, settingsModule))
542542
.flatMap(List::stream)
543543
.collect(toList());
544544

server/src/main/java/org/elasticsearch/plugins/PersistentTaskPlugin.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import org.elasticsearch.client.Client;
2222
import org.elasticsearch.cluster.service.ClusterService;
23+
import org.elasticsearch.common.settings.SettingsModule;
2324
import org.elasticsearch.persistent.PersistentTasksExecutor;
2425
import org.elasticsearch.threadpool.ThreadPool;
2526

@@ -35,7 +36,9 @@ public interface PersistentTaskPlugin {
3536
* Returns additional persistent tasks executors added by this plugin.
3637
*/
3738
default List<PersistentTasksExecutor<?>> getPersistentTasksExecutor(ClusterService clusterService,
38-
ThreadPool threadPool, Client client) {
39+
ThreadPool threadPool,
40+
Client client,
41+
SettingsModule settingsModule) {
3942
return Collections.emptyList();
4043
}
4144

server/src/test/java/org/elasticsearch/persistent/TestPersistentTasksPlugin.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
import org.elasticsearch.common.io.stream.StreamOutput;
4747
import org.elasticsearch.common.io.stream.Writeable;
4848
import org.elasticsearch.common.settings.Settings;
49+
import org.elasticsearch.common.settings.SettingsModule;
4950
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
5051
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
5152
import org.elasticsearch.common.xcontent.XContentBuilder;
@@ -91,7 +92,9 @@ public class TestPersistentTasksPlugin extends Plugin implements ActionPlugin, P
9192

9293
@Override
9394
public List<PersistentTasksExecutor<?>> getPersistentTasksExecutor(ClusterService clusterService,
94-
ThreadPool threadPool, Client client) {
95+
ThreadPool threadPool,
96+
Client client,
97+
SettingsModule settingsModule) {
9598
return Collections.singletonList(new TestPersistentTasksExecutor(Settings.EMPTY, clusterService));
9699
}
97100

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java

Lines changed: 23 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import org.elasticsearch.common.settings.Setting;
2020
import org.elasticsearch.common.settings.Settings;
2121
import org.elasticsearch.common.settings.SettingsFilter;
22+
import org.elasticsearch.common.settings.SettingsModule;
2223
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
2324
import org.elasticsearch.env.Environment;
2425
import org.elasticsearch.env.NodeEnvironment;
@@ -40,41 +41,41 @@
4041
import org.elasticsearch.threadpool.ThreadPool;
4142
import org.elasticsearch.watcher.ResourceWatcherService;
4243
import org.elasticsearch.xpack.ccr.action.AutoFollowCoordinator;
43-
import org.elasticsearch.xpack.ccr.action.TransportGetAutoFollowPatternAction;
44-
import org.elasticsearch.xpack.ccr.action.TransportUnfollowAction;
45-
import org.elasticsearch.xpack.ccr.rest.RestGetAutoFollowPatternAction;
46-
import org.elasticsearch.xpack.ccr.action.TransportCcrStatsAction;
47-
import org.elasticsearch.xpack.ccr.rest.RestCcrStatsAction;
48-
import org.elasticsearch.xpack.ccr.rest.RestUnfollowAction;
49-
import org.elasticsearch.xpack.core.ccr.action.CcrStatsAction;
50-
import org.elasticsearch.xpack.core.ccr.action.DeleteAutoFollowPatternAction;
51-
import org.elasticsearch.xpack.core.ccr.action.GetAutoFollowPatternAction;
52-
import org.elasticsearch.xpack.core.ccr.action.PutAutoFollowPatternAction;
5344
import org.elasticsearch.xpack.ccr.action.ShardChangesAction;
5445
import org.elasticsearch.xpack.ccr.action.ShardFollowTask;
5546
import org.elasticsearch.xpack.ccr.action.ShardFollowTasksExecutor;
47+
import org.elasticsearch.xpack.ccr.action.TransportCcrStatsAction;
48+
import org.elasticsearch.xpack.ccr.action.TransportDeleteAutoFollowPatternAction;
5649
import org.elasticsearch.xpack.ccr.action.TransportFollowStatsAction;
50+
import org.elasticsearch.xpack.ccr.action.TransportGetAutoFollowPatternAction;
51+
import org.elasticsearch.xpack.ccr.action.TransportPauseFollowAction;
52+
import org.elasticsearch.xpack.ccr.action.TransportPutAutoFollowPatternAction;
5753
import org.elasticsearch.xpack.ccr.action.TransportPutFollowAction;
58-
import org.elasticsearch.xpack.ccr.action.TransportDeleteAutoFollowPatternAction;
5954
import org.elasticsearch.xpack.ccr.action.TransportResumeFollowAction;
60-
import org.elasticsearch.xpack.ccr.action.TransportPutAutoFollowPatternAction;
61-
import org.elasticsearch.xpack.ccr.action.TransportPauseFollowAction;
55+
import org.elasticsearch.xpack.ccr.action.TransportUnfollowAction;
6256
import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsAction;
6357
import org.elasticsearch.xpack.ccr.action.bulk.TransportBulkShardOperationsAction;
6458
import org.elasticsearch.xpack.ccr.index.engine.FollowingEngineFactory;
59+
import org.elasticsearch.xpack.ccr.rest.RestCcrStatsAction;
60+
import org.elasticsearch.xpack.ccr.rest.RestDeleteAutoFollowPatternAction;
6561
import org.elasticsearch.xpack.ccr.rest.RestFollowStatsAction;
62+
import org.elasticsearch.xpack.ccr.rest.RestGetAutoFollowPatternAction;
63+
import org.elasticsearch.xpack.ccr.rest.RestPauseFollowAction;
64+
import org.elasticsearch.xpack.ccr.rest.RestPutAutoFollowPatternAction;
6665
import org.elasticsearch.xpack.ccr.rest.RestPutFollowAction;
67-
import org.elasticsearch.xpack.ccr.rest.RestDeleteAutoFollowPatternAction;
6866
import org.elasticsearch.xpack.ccr.rest.RestResumeFollowAction;
69-
import org.elasticsearch.xpack.ccr.rest.RestPutAutoFollowPatternAction;
70-
import org.elasticsearch.xpack.ccr.rest.RestPauseFollowAction;
67+
import org.elasticsearch.xpack.ccr.rest.RestUnfollowAction;
7168
import org.elasticsearch.xpack.core.XPackClientActionPlugin;
7269
import org.elasticsearch.xpack.core.XPackPlugin;
7370
import org.elasticsearch.xpack.core.ccr.ShardFollowNodeTaskStatus;
71+
import org.elasticsearch.xpack.core.ccr.action.CcrStatsAction;
72+
import org.elasticsearch.xpack.core.ccr.action.DeleteAutoFollowPatternAction;
7473
import org.elasticsearch.xpack.core.ccr.action.FollowStatsAction;
74+
import org.elasticsearch.xpack.core.ccr.action.GetAutoFollowPatternAction;
75+
import org.elasticsearch.xpack.core.ccr.action.PauseFollowAction;
76+
import org.elasticsearch.xpack.core.ccr.action.PutAutoFollowPatternAction;
7577
import org.elasticsearch.xpack.core.ccr.action.PutFollowAction;
7678
import org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction;
77-
import org.elasticsearch.xpack.core.ccr.action.PauseFollowAction;
7879
import org.elasticsearch.xpack.core.ccr.action.UnfollowAction;
7980

8081
import java.util.Arrays;
@@ -155,8 +156,11 @@ public Collection<Object> createComponents(
155156

156157
@Override
157158
public List<PersistentTasksExecutor<?>> getPersistentTasksExecutor(ClusterService clusterService,
158-
ThreadPool threadPool, Client client) {
159-
return Collections.singletonList(new ShardFollowTasksExecutor(settings, client, threadPool, clusterService));
159+
ThreadPool threadPool,
160+
Client client,
161+
SettingsModule settingsModule) {
162+
IndexScopedSettings indexScopedSettings = settingsModule.getIndexScopedSettings();
163+
return Collections.singletonList(new ShardFollowTasksExecutor(settings, client, threadPool, clusterService, indexScopedSettings));
160164
}
161165

162166
public List<ActionHandler<? extends ActionRequest, ? extends ActionResponse>> getActions() {

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java

Lines changed: 56 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import org.elasticsearch.action.support.single.shard.TransportSingleShardAction;
1919
import org.elasticsearch.client.ElasticsearchClient;
2020
import org.elasticsearch.cluster.ClusterState;
21+
import org.elasticsearch.cluster.metadata.IndexMetaData;
2122
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
2223
import org.elasticsearch.cluster.routing.ShardsIterator;
2324
import org.elasticsearch.cluster.service.ClusterService;
@@ -213,6 +214,12 @@ public long getMappingVersion() {
213214
return mappingVersion;
214215
}
215216

217+
private long settingsVersion;
218+
219+
public long getSettingsVersion() {
220+
return settingsVersion;
221+
}
222+
216223
private long globalCheckpoint;
217224

218225
public long getGlobalCheckpoint() {
@@ -248,13 +255,15 @@ public long getTookInMillis() {
248255

249256
Response(
250257
final long mappingVersion,
258+
final long settingsVersion,
251259
final long globalCheckpoint,
252260
final long maxSeqNo,
253261
final long maxSeqNoOfUpdatesOrDeletes,
254262
final Translog.Operation[] operations,
255263
final long tookInMillis) {
256264

257265
this.mappingVersion = mappingVersion;
266+
this.settingsVersion = settingsVersion;
258267
this.globalCheckpoint = globalCheckpoint;
259268
this.maxSeqNo = maxSeqNo;
260269
this.maxSeqNoOfUpdatesOrDeletes = maxSeqNoOfUpdatesOrDeletes;
@@ -266,6 +275,7 @@ public long getTookInMillis() {
266275
public void readFrom(final StreamInput in) throws IOException {
267276
super.readFrom(in);
268277
mappingVersion = in.readVLong();
278+
settingsVersion = in.readVLong();
269279
globalCheckpoint = in.readZLong();
270280
maxSeqNo = in.readZLong();
271281
maxSeqNoOfUpdatesOrDeletes = in.readZLong();
@@ -277,6 +287,7 @@ public void readFrom(final StreamInput in) throws IOException {
277287
public void writeTo(final StreamOutput out) throws IOException {
278288
super.writeTo(out);
279289
out.writeVLong(mappingVersion);
290+
out.writeVLong(settingsVersion);
280291
out.writeZLong(globalCheckpoint);
281292
out.writeZLong(maxSeqNo);
282293
out.writeZLong(maxSeqNoOfUpdatesOrDeletes);
@@ -290,6 +301,7 @@ public boolean equals(final Object o) {
290301
if (o == null || getClass() != o.getClass()) return false;
291302
final Response that = (Response) o;
292303
return mappingVersion == that.mappingVersion &&
304+
settingsVersion == that.settingsVersion &&
293305
globalCheckpoint == that.globalCheckpoint &&
294306
maxSeqNo == that.maxSeqNo &&
295307
maxSeqNoOfUpdatesOrDeletes == that.maxSeqNoOfUpdatesOrDeletes &&
@@ -299,8 +311,14 @@ public boolean equals(final Object o) {
299311

300312
@Override
301313
public int hashCode() {
302-
return Objects.hash(mappingVersion, globalCheckpoint, maxSeqNo, maxSeqNoOfUpdatesOrDeletes,
303-
Arrays.hashCode(operations), tookInMillis);
314+
return Objects.hash(
315+
mappingVersion,
316+
settingsVersion,
317+
globalCheckpoint,
318+
maxSeqNo,
319+
maxSeqNoOfUpdatesOrDeletes,
320+
Arrays.hashCode(operations),
321+
tookInMillis);
304322
}
305323
}
306324

@@ -333,7 +351,9 @@ protected Response shardOperation(Request request, ShardId shardId) throws IOExc
333351
final IndexService indexService = indicesService.indexServiceSafe(request.getShard().getIndex());
334352
final IndexShard indexShard = indexService.getShard(request.getShard().id());
335353
final SeqNoStats seqNoStats = indexShard.seqNoStats();
336-
final long mappingVersion = clusterService.state().metaData().index(shardId.getIndex()).getMappingVersion();
354+
final IndexMetaData indexMetaData = clusterService.state().metaData().index(shardId.getIndex());
355+
final long mappingVersion = indexMetaData.getMappingVersion();
356+
final long settingsVersion = indexMetaData.getSettingsVersion();
337357

338358
final Translog.Operation[] operations = getOperations(
339359
indexShard,
@@ -344,7 +364,13 @@ protected Response shardOperation(Request request, ShardId shardId) throws IOExc
344364
request.getMaxBatchSize());
345365
// must capture after after snapshotting operations to ensure this MUS is at least the highest MUS of any of these operations.
346366
final long maxSeqNoOfUpdatesOrDeletes = indexShard.getMaxSeqNoOfUpdatesOrDeletes();
347-
return getResponse(mappingVersion, seqNoStats, maxSeqNoOfUpdatesOrDeletes, operations, request.relativeStartNanos);
367+
return getResponse(
368+
mappingVersion,
369+
settingsVersion,
370+
seqNoStats,
371+
maxSeqNoOfUpdatesOrDeletes,
372+
operations,
373+
request.relativeStartNanos);
348374
}
349375

350376
@Override
@@ -420,12 +446,19 @@ private void globalCheckpointAdvancementFailure(
420446
e);
421447
if (e instanceof TimeoutException) {
422448
try {
423-
final long mappingVersion =
424-
clusterService.state().metaData().index(shardId.getIndex()).getMappingVersion();
449+
final IndexMetaData indexMetaData = clusterService.state().metaData().index(shardId.getIndex());
450+
final long mappingVersion = indexMetaData.getMappingVersion();
451+
final long settingsVersion = indexMetaData.getSettingsVersion();
425452
final SeqNoStats latestSeqNoStats = indexShard.seqNoStats();
426453
final long maxSeqNoOfUpdatesOrDeletes = indexShard.getMaxSeqNoOfUpdatesOrDeletes();
427-
listener.onResponse(getResponse(mappingVersion, latestSeqNoStats, maxSeqNoOfUpdatesOrDeletes, EMPTY_OPERATIONS_ARRAY,
428-
request.relativeStartNanos));
454+
listener.onResponse(
455+
getResponse(
456+
mappingVersion,
457+
settingsVersion,
458+
latestSeqNoStats,
459+
maxSeqNoOfUpdatesOrDeletes,
460+
EMPTY_OPERATIONS_ARRAY,
461+
request.relativeStartNanos));
429462
} catch (final Exception caught) {
430463
caught.addSuppressed(e);
431464
listener.onFailure(caught);
@@ -510,12 +543,23 @@ static Translog.Operation[] getOperations(
510543
return operations.toArray(EMPTY_OPERATIONS_ARRAY);
511544
}
512545

513-
static Response getResponse(final long mappingVersion, final SeqNoStats seqNoStats,
514-
final long maxSeqNoOfUpdates, final Translog.Operation[] operations, long relativeStartNanos) {
546+
static Response getResponse(
547+
final long mappingVersion,
548+
final long settingsVersion,
549+
final SeqNoStats seqNoStats,
550+
final long maxSeqNoOfUpdates,
551+
final Translog.Operation[] operations,
552+
long relativeStartNanos) {
515553
long tookInNanos = System.nanoTime() - relativeStartNanos;
516554
long tookInMillis = TimeUnit.NANOSECONDS.toMillis(tookInNanos);
517-
return new Response(mappingVersion, seqNoStats.getGlobalCheckpoint(), seqNoStats.getMaxSeqNo(), maxSeqNoOfUpdates,
518-
operations, tookInMillis);
555+
return new Response(
556+
mappingVersion,
557+
settingsVersion,
558+
seqNoStats.getGlobalCheckpoint(),
559+
seqNoStats.getMaxSeqNo(),
560+
maxSeqNoOfUpdates,
561+
operations,
562+
tookInMillis);
519563
}
520564

521565
}

0 commit comments

Comments
 (0)