Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
836984b
Replicate index settings to followers
jasontedor Oct 14, 2018
310bf87
Merge remote-tracking branch 'es/master' into ccr-index-settings-version
martijnvg Oct 31, 2018
fc251f0
Fixed tests
martijnvg Oct 31, 2018
0ff9ff1
Added followerSettingsVersion to follow shard stats.
martijnvg Oct 31, 2018
b71bee2
added unit tests
martijnvg Oct 31, 2018
05b51ad
Added integration tests
martijnvg Oct 31, 2018
74f2914
Merge remote-tracking branch 'es/master' into ccr-index-settings-version
martijnvg Nov 1, 2018
02f3635
Tidy up async calls
martijnvg Nov 1, 2018
485aa47
added test for whitelisted settings
martijnvg Nov 1, 2018
91ca753
Only close index if a setting is not dynamic.
martijnvg Nov 1, 2018
acbfd12
Fixed docs
martijnvg Nov 1, 2018
b33b07f
Merge remote-tracking branch 'es/master' into ccr-index-settings-version
martijnvg Nov 2, 2018
89c5586
Merge remote-tracking branch 'es/master' into ccr-index-settings-version
martijnvg Nov 5, 2018
d53360c
Merge remote-tracking branch 'es/master' into ccr-index-settings-version
martijnvg Nov 6, 2018
e66bcae
changed `PersistentTaskPlugin#getPersistentTasksExecutor()`` to use `…
martijnvg Nov 6, 2018
e3a41bc
iter
martijnvg Nov 6, 2018
64a9592
removed unused variable
martijnvg Nov 6, 2018
1c2e8da
fixed checkstyle violation
martijnvg Nov 6, 2018
775d26d
Merge remote-tracking branch 'es/master' into ccr-index-settings-version
martijnvg Nov 7, 2018
4eb9807
renamed test
martijnvg Nov 7, 2018
53f11bb
Merge remote-tracking branch 'es/master' into ccr-index-settings-version
martijnvg Nov 7, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/reference/ccr/apis/follow/get-follow-stats.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,9 @@ The `shards` array consists of objects containing the following fields:
`indices[].shards[].follower_mapping_version`::
(long) the mapping version the follower is synced up to

`indices[].shards[].follower_settings_version`::
(long) the index settings version the follower is synced up to

`indices[].shards[].total_read_time_millis`::
(long) the total time reads were outstanding, measured from the time a read
was sent to the leader to the time a reply was returned to the follower
Expand Down Expand Up @@ -206,6 +209,7 @@ The API returns the following results:
"outstanding_write_requests" : 2,
"write_buffer_operation_count" : 64,
"follower_mapping_version" : 4,
"follower_settings_version" : 2,
"total_read_time_millis" : 32768,
"total_read_remote_exec_time_millis" : 16384,
"successful_read_requests" : 32,
Expand Down Expand Up @@ -234,6 +238,7 @@ The API returns the following results:
// TESTRESPONSE[s/"outstanding_write_requests" : 2/"outstanding_write_requests" : $body.indices.0.shards.0.outstanding_write_requests/]
// TESTRESPONSE[s/"write_buffer_operation_count" : 64/"write_buffer_operation_count" : $body.indices.0.shards.0.write_buffer_operation_count/]
// TESTRESPONSE[s/"follower_mapping_version" : 4/"follower_mapping_version" : $body.indices.0.shards.0.follower_mapping_version/]
// TESTRESPONSE[s/"follower_settings_version" : 2/"follower_settings_version" : $body.indices.0.shards.0.follower_settings_version/]
// TESTRESPONSE[s/"total_read_time_millis" : 32768/"total_read_time_millis" : $body.indices.0.shards.0.total_read_time_millis/]
// TESTRESPONSE[s/"total_read_remote_exec_time_millis" : 16384/"total_read_remote_exec_time_millis" : $body.indices.0.shards.0.total_read_remote_exec_time_millis/]
// TESTRESPONSE[s/"successful_read_requests" : 32/"successful_read_requests" : $body.indices.0.shards.0.successful_read_requests/]
Expand Down
2 changes: 1 addition & 1 deletion server/src/main/java/org/elasticsearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -493,7 +493,7 @@ protected Node(

final List<PersistentTasksExecutor<?>> tasksExecutors = pluginsService
.filterPlugins(PersistentTaskPlugin.class).stream()
.map(p -> p.getPersistentTasksExecutor(clusterService, threadPool, client))
.map(p -> p.getPersistentTasksExecutor(clusterService, threadPool, client, settingsModule))
.flatMap(List::stream)
.collect(toList());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.SettingsModule;
import org.elasticsearch.persistent.PersistentTasksExecutor;
import org.elasticsearch.threadpool.ThreadPool;

Expand All @@ -35,7 +36,9 @@ public interface PersistentTaskPlugin {
* Returns additional persistent tasks executors added by this plugin.
*/
default List<PersistentTasksExecutor<?>> getPersistentTasksExecutor(ClusterService clusterService,
ThreadPool threadPool, Client client) {
ThreadPool threadPool,
Client client,
SettingsModule settingsModule) {
return Collections.emptyList();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.settings.SettingsModule;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentBuilder;
Expand Down Expand Up @@ -89,7 +90,9 @@ public class TestPersistentTasksPlugin extends Plugin implements ActionPlugin, P

@Override
public List<PersistentTasksExecutor<?>> getPersistentTasksExecutor(ClusterService clusterService,
ThreadPool threadPool, Client client) {
ThreadPool threadPool,
Client client,
SettingsModule settingsModule) {
return Collections.singletonList(new TestPersistentTasksExecutor(clusterService));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsFilter;
import org.elasticsearch.common.settings.SettingsModule;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.env.Environment;
import org.elasticsearch.env.NodeEnvironment;
Expand Down Expand Up @@ -149,8 +150,11 @@ public Collection<Object> createComponents(

@Override
public List<PersistentTasksExecutor<?>> getPersistentTasksExecutor(ClusterService clusterService,
ThreadPool threadPool, Client client) {
return Collections.singletonList(new ShardFollowTasksExecutor(client, threadPool, clusterService));
ThreadPool threadPool,
Client client,
SettingsModule settingsModule) {
IndexScopedSettings indexScopedSettings = settingsModule.getIndexScopedSettings();
return Collections.singletonList(new ShardFollowTasksExecutor(client, threadPool, clusterService, indexScopedSettings));
}

public List<ActionHandler<? extends ActionRequest, ? extends ActionResponse>> getActions() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.elasticsearch.action.support.single.shard.SingleShardRequest;
import org.elasticsearch.action.support.single.shard.TransportSingleShardAction;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.routing.ShardsIterator;
import org.elasticsearch.cluster.service.ClusterService;
Expand Down Expand Up @@ -205,6 +206,12 @@ public long getMappingVersion() {
return mappingVersion;
}

private long settingsVersion;

public long getSettingsVersion() {
return settingsVersion;
}

private long globalCheckpoint;

public long getGlobalCheckpoint() {
Expand Down Expand Up @@ -240,13 +247,15 @@ public long getTookInMillis() {

Response(
final long mappingVersion,
final long settingsVersion,
final long globalCheckpoint,
final long maxSeqNo,
final long maxSeqNoOfUpdatesOrDeletes,
final Translog.Operation[] operations,
final long tookInMillis) {

this.mappingVersion = mappingVersion;
this.settingsVersion = settingsVersion;
this.globalCheckpoint = globalCheckpoint;
this.maxSeqNo = maxSeqNo;
this.maxSeqNoOfUpdatesOrDeletes = maxSeqNoOfUpdatesOrDeletes;
Expand All @@ -258,6 +267,7 @@ public long getTookInMillis() {
public void readFrom(final StreamInput in) throws IOException {
super.readFrom(in);
mappingVersion = in.readVLong();
settingsVersion = in.readVLong();
globalCheckpoint = in.readZLong();
maxSeqNo = in.readZLong();
maxSeqNoOfUpdatesOrDeletes = in.readZLong();
Expand All @@ -269,6 +279,7 @@ public void readFrom(final StreamInput in) throws IOException {
public void writeTo(final StreamOutput out) throws IOException {
super.writeTo(out);
out.writeVLong(mappingVersion);
out.writeVLong(settingsVersion);
out.writeZLong(globalCheckpoint);
out.writeZLong(maxSeqNo);
out.writeZLong(maxSeqNoOfUpdatesOrDeletes);
Expand All @@ -282,6 +293,7 @@ public boolean equals(final Object o) {
if (o == null || getClass() != o.getClass()) return false;
final Response that = (Response) o;
return mappingVersion == that.mappingVersion &&
settingsVersion == that.settingsVersion &&
globalCheckpoint == that.globalCheckpoint &&
maxSeqNo == that.maxSeqNo &&
maxSeqNoOfUpdatesOrDeletes == that.maxSeqNoOfUpdatesOrDeletes &&
Expand All @@ -291,8 +303,14 @@ public boolean equals(final Object o) {

@Override
public int hashCode() {
return Objects.hash(mappingVersion, globalCheckpoint, maxSeqNo, maxSeqNoOfUpdatesOrDeletes,
Arrays.hashCode(operations), tookInMillis);
return Objects.hash(
mappingVersion,
settingsVersion,
globalCheckpoint,
maxSeqNo,
maxSeqNoOfUpdatesOrDeletes,
Arrays.hashCode(operations),
tookInMillis);
}
}

Expand All @@ -317,7 +335,9 @@ protected Response shardOperation(Request request, ShardId shardId) throws IOExc
final IndexService indexService = indicesService.indexServiceSafe(request.getShard().getIndex());
final IndexShard indexShard = indexService.getShard(request.getShard().id());
final SeqNoStats seqNoStats = indexShard.seqNoStats();
final long mappingVersion = clusterService.state().metaData().index(shardId.getIndex()).getMappingVersion();
final IndexMetaData indexMetaData = clusterService.state().metaData().index(shardId.getIndex());
final long mappingVersion = indexMetaData.getMappingVersion();
final long settingsVersion = indexMetaData.getSettingsVersion();

final Translog.Operation[] operations = getOperations(
indexShard,
Expand All @@ -328,7 +348,13 @@ protected Response shardOperation(Request request, ShardId shardId) throws IOExc
request.getMaxBatchSize());
// must capture after after snapshotting operations to ensure this MUS is at least the highest MUS of any of these operations.
final long maxSeqNoOfUpdatesOrDeletes = indexShard.getMaxSeqNoOfUpdatesOrDeletes();
return getResponse(mappingVersion, seqNoStats, maxSeqNoOfUpdatesOrDeletes, operations, request.relativeStartNanos);
return getResponse(
mappingVersion,
settingsVersion,
seqNoStats,
maxSeqNoOfUpdatesOrDeletes,
operations,
request.relativeStartNanos);
}

@Override
Expand Down Expand Up @@ -404,12 +430,19 @@ private void globalCheckpointAdvancementFailure(
e);
if (e instanceof TimeoutException) {
try {
final long mappingVersion =
clusterService.state().metaData().index(shardId.getIndex()).getMappingVersion();
final IndexMetaData indexMetaData = clusterService.state().metaData().index(shardId.getIndex());
final long mappingVersion = indexMetaData.getMappingVersion();
final long settingsVersion = indexMetaData.getSettingsVersion();
final SeqNoStats latestSeqNoStats = indexShard.seqNoStats();
final long maxSeqNoOfUpdatesOrDeletes = indexShard.getMaxSeqNoOfUpdatesOrDeletes();
listener.onResponse(getResponse(mappingVersion, latestSeqNoStats, maxSeqNoOfUpdatesOrDeletes, EMPTY_OPERATIONS_ARRAY,
request.relativeStartNanos));
listener.onResponse(
getResponse(
mappingVersion,
settingsVersion,
latestSeqNoStats,
maxSeqNoOfUpdatesOrDeletes,
EMPTY_OPERATIONS_ARRAY,
request.relativeStartNanos));
} catch (final Exception caught) {
caught.addSuppressed(e);
listener.onFailure(caught);
Expand Down Expand Up @@ -494,12 +527,23 @@ static Translog.Operation[] getOperations(
return operations.toArray(EMPTY_OPERATIONS_ARRAY);
}

static Response getResponse(final long mappingVersion, final SeqNoStats seqNoStats,
final long maxSeqNoOfUpdates, final Translog.Operation[] operations, long relativeStartNanos) {
static Response getResponse(
final long mappingVersion,
final long settingsVersion,
final SeqNoStats seqNoStats,
final long maxSeqNoOfUpdates,
final Translog.Operation[] operations,
long relativeStartNanos) {
long tookInNanos = System.nanoTime() - relativeStartNanos;
long tookInMillis = TimeUnit.NANOSECONDS.toMillis(tookInNanos);
return new Response(mappingVersion, seqNoStats.getGlobalCheckpoint(), seqNoStats.getMaxSeqNo(), maxSeqNoOfUpdates,
operations, tookInMillis);
return new Response(
mappingVersion,
settingsVersion,
seqNoStats.getGlobalCheckpoint(),
seqNoStats.getMaxSeqNo(),
maxSeqNoOfUpdates,
operations,
tookInMillis);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.transport.NetworkExceptionHelper;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.shard.IllegalIndexShardStateException;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.IllegalIndexShardStateException;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardNotFoundException;
import org.elasticsearch.index.translog.Translog;
Expand Down Expand Up @@ -74,6 +74,7 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
private int numOutstandingReads = 0;
private int numOutstandingWrites = 0;
private long currentMappingVersion = 0;
private long currentSettingsVersion = 0;
private long totalReadRemoteExecTimeMillis = 0;
private long totalReadTimeMillis = 0;
private long successfulReadRequests = 0;
Expand Down Expand Up @@ -134,9 +135,19 @@ void start(
synchronized (ShardFollowNodeTask.this) {
currentMappingVersion = followerMappingVersion;
}
LOGGER.info("{} Started to follow leader shard {}, followGlobalCheckPoint={}, followerMappingVersion={}",
params.getFollowShardId(), params.getLeaderShardId(), followerGlobalCheckpoint, followerMappingVersion);
coordinateReads();
updateSettings(followerSettingsVersion -> {
synchronized (ShardFollowNodeTask.this) {
currentSettingsVersion = followerSettingsVersion;
}
LOGGER.info(
"{} following leader shard {}, follower global checkpoint=[{}], mapping version=[{}], settings version=[{}]",
params.getFollowShardId(),
params.getLeaderShardId(),
followerGlobalCheckpoint,
followerMappingVersion,
followerSettingsVersion);
coordinateReads();
});
});
}

Expand Down Expand Up @@ -269,7 +280,15 @@ private void sendShardChangesRequest(long from, int maxOperationCount, long maxR
}

void handleReadResponse(long from, long maxRequiredSeqNo, ShardChangesAction.Response response) {
maybeUpdateMapping(response.getMappingVersion(), () -> innerHandleReadResponse(from, maxRequiredSeqNo, response));
// In order to process this read response (3), we need to check and potentially update the follow index's setting (1) and
// check and potentially update the follow index's mappings (2).

// 3) handle read response:
Runnable handleResponseTask = () -> innerHandleReadResponse(from, maxRequiredSeqNo, response);
// 2) update follow index mapping:
Runnable updateMappingsTask = () -> maybeUpdateMapping(response.getMappingVersion(), handleResponseTask);
// 1) update follow index settings:
maybeUpdateSettings(response.getSettingsVersion(), updateMappingsTask);
}

/** Called when some operations are fetched from the leading */
Expand Down Expand Up @@ -367,6 +386,21 @@ private synchronized void maybeUpdateMapping(Long minimumRequiredMappingVersion,
}
}

private synchronized void maybeUpdateSettings(final Long minimumRequiredSettingsVersion, Runnable task) {
if (currentSettingsVersion >= minimumRequiredSettingsVersion) {
LOGGER.trace("{} settings version [{}] is higher or equal than minimum required mapping version [{}]",
params.getFollowShardId(), currentSettingsVersion, minimumRequiredSettingsVersion);
task.run();
} else {
LOGGER.trace("{} updating settings, settings version [{}] is lower than minimum required settings version [{}]",
params.getFollowShardId(), currentSettingsVersion, minimumRequiredSettingsVersion);
updateSettings(settingsVersion -> {
currentSettingsVersion = settingsVersion;
task.run();
});
}
}

private void updateMapping(LongConsumer handler) {
updateMapping(handler, new AtomicInteger(0));
}
Expand All @@ -375,6 +409,14 @@ private void updateMapping(LongConsumer handler, AtomicInteger retryCounter) {
innerUpdateMapping(handler, e -> handleFailure(e, retryCounter, () -> updateMapping(handler, retryCounter)));
}

private void updateSettings(final LongConsumer handler) {
updateSettings(handler, new AtomicInteger(0));
}

private void updateSettings(final LongConsumer handler, final AtomicInteger retryCounter) {
innerUpdateSettings(handler, e -> handleFailure(e, retryCounter, () -> updateSettings(handler, retryCounter)));
}

private void handleFailure(Exception e, AtomicInteger retryCounter, Runnable task) {
assert e != null;
if (shouldRetry(e) && isStopped() == false) {
Expand Down Expand Up @@ -424,6 +466,8 @@ static boolean shouldRetry(Exception e) {
// These methods are protected for testing purposes:
protected abstract void innerUpdateMapping(LongConsumer handler, Consumer<Exception> errorHandler);

protected abstract void innerUpdateSettings(LongConsumer handler, Consumer<Exception> errorHandler);

protected abstract void innerSendBulkShardOperationsRequest(String followerHistoryUUID,
List<Translog.Operation> operations,
long leaderMaxSeqNoOfUpdatesOrDeletes,
Expand Down Expand Up @@ -470,6 +514,7 @@ public synchronized ShardFollowNodeTaskStatus getStatus() {
buffer.size(),
bufferSizeInBytes,
currentMappingVersion,
currentSettingsVersion,
totalReadTimeMillis,
totalReadRemoteExecTimeMillis,
successfulReadRequests,
Expand Down
Loading