Skip to content

Commit 892e175

Browse files
committed
iter
1 parent d82c9c1 commit 892e175

File tree

2 files changed

+64
-53
lines changed

2 files changed

+64
-53
lines changed

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/FollowIndexAction.java

Lines changed: 53 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -193,6 +193,7 @@ protected void doExecute(Request request, ActionListener<Response> listener) {
193193
start(request, null, leaderIndexMetadata, followIndexMetadata, listener);
194194
} catch (IOException e) {
195195
listener.onFailure(e);
196+
return;
196197
}
197198
} else {
198199
// Following an index in remote cluster, so use remote client to fetch leader IndexMetaData:
@@ -229,75 +230,75 @@ void start(Request request, String clusterNameAlias, IndexMetaData leaderIndexMe
229230
ActionListener<Response> handler) throws IOException {
230231
MapperService mapperService = followIndexMetadata != null ? indicesService.createIndexMapperService(followIndexMetadata) : null;
231232
validate(request, leaderIndexMetadata, followIndexMetadata, mapperService);
232-
final int numShards = followIndexMetadata.getNumberOfShards();
233-
final AtomicInteger counter = new AtomicInteger(numShards);
234-
final AtomicReferenceArray<Object> responses = new AtomicReferenceArray<>(followIndexMetadata.getNumberOfShards());
235-
Map<String, String> filteredHeaders = threadPool.getThreadContext().getHeaders().entrySet().stream()
236-
.filter(e -> ShardFollowTask.HEADER_FILTERS.contains(e.getKey()))
237-
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));for (int i = 0; i < numShards; i++) {
238-
final int shardId = i;
239-
String taskId = followIndexMetadata.getIndexUUID() + "-" + shardId;
240-
ShardFollowTask shardFollowTask = new ShardFollowTask(clusterNameAlias,
241-
new ShardId(followIndexMetadata.getIndex(), shardId),
242-
new ShardId(leaderIndexMetadata.getIndex(), shardId),
243-
request.batchSize, request.concurrentProcessors, request.processorMaxTranslogBytes, filteredHeaders);
244-
persistentTasksService.sendStartRequest(taskId, ShardFollowTask.NAME, shardFollowTask,
245-
new ActionListener<PersistentTasksCustomMetaData.PersistentTask<ShardFollowTask>>() {
246-
@Override
247-
public void onResponse(PersistentTasksCustomMetaData.PersistentTask<ShardFollowTask> task) {
248-
responses.set(shardId, task);
249-
finalizeResponse();
250-
}
251-
233+
final int numShards = followIndexMetadata.getNumberOfShards();
234+
final AtomicInteger counter = new AtomicInteger(numShards);
235+
final AtomicReferenceArray<Object> responses = new AtomicReferenceArray<>(followIndexMetadata.getNumberOfShards());
236+
Map<String, String> filteredHeaders = threadPool.getThreadContext().getHeaders().entrySet().stream()
237+
.filter(e -> ShardFollowTask.HEADER_FILTERS.contains(e.getKey()))
238+
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));for (int i = 0; i < numShards; i++) {
239+
final int shardId = i;
240+
String taskId = followIndexMetadata.getIndexUUID() + "-" + shardId;
241+
ShardFollowTask shardFollowTask = new ShardFollowTask(clusterNameAlias,
242+
new ShardId(followIndexMetadata.getIndex(), shardId),
243+
new ShardId(leaderIndexMetadata.getIndex(), shardId),
244+
request.batchSize, request.concurrentProcessors, request.processorMaxTranslogBytes, filteredHeaders);
245+
persistentTasksService.sendStartRequest(taskId, ShardFollowTask.NAME, shardFollowTask,
246+
new ActionListener<PersistentTasksCustomMetaData.PersistentTask<ShardFollowTask>>() {
252247
@Override
253-
public void onFailure(Exception e) {
254-
responses.set(shardId, e);
248+
public void onResponse(PersistentTasksCustomMetaData.PersistentTask<ShardFollowTask> task) {
249+
responses.set(shardId, task);
255250
finalizeResponse();
256251
}
257252

258-
void finalizeResponse() {
259-
Exception error = null;
260-
if (counter.decrementAndGet() == 0) {
261-
for (int j = 0; j < responses.length(); j++) {
262-
Object response = responses.get(j);
263-
if (response instanceof Exception) {
264-
if (error == null) {
265-
error = (Exception) response;
266-
} else {
267-
error.addSuppressed((Throwable) response);
268-
}
253+
@Override
254+
public void onFailure(Exception e) {
255+
responses.set(shardId, e);
256+
finalizeResponse();
257+
}
258+
259+
void finalizeResponse() {
260+
Exception error = null;
261+
if (counter.decrementAndGet() == 0) {
262+
for (int j = 0; j < responses.length(); j++) {
263+
Object response = responses.get(j);
264+
if (response instanceof Exception) {
265+
if (error == null) {
266+
error = (Exception) response;
267+
} else {
268+
error.addSuppressed((Throwable) response);
269269
}
270270
}
271+
}
271272

272-
if (error == null) {
273-
// include task ids?
274-
handler.onResponse(new Response(true));
275-
} else {
276-
// TODO: cancel all started tasks
277-
handler.onFailure(error);
278-
}
273+
if (error == null) {
274+
// include task ids?
275+
handler.onResponse(new Response(true));
276+
} else {
277+
// TODO: cancel all started tasks
278+
handler.onFailure(error);
279279
}
280280
}
281281
}
282+
}
282283
);
283284
}
284285
}
285286
}
286-
287+
287288
private static final Set<Setting<?>> WHITELISTED_SETTINGS;
288-
289+
289290
static {
290291
Set<Setting<?>> whiteListedSettings = new HashSet<>();
291292
whiteListedSettings.add(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING);
292293
whiteListedSettings.add(IndexMetaData.INDEX_AUTO_EXPAND_REPLICAS_SETTING);
293-
294+
294295
whiteListedSettings.add(IndexMetaData.INDEX_ROUTING_EXCLUDE_GROUP_SETTING);
295296
whiteListedSettings.add(IndexMetaData.INDEX_ROUTING_INCLUDE_GROUP_SETTING);
296297
whiteListedSettings.add(IndexMetaData.INDEX_ROUTING_REQUIRE_GROUP_SETTING);
297298
whiteListedSettings.add(EnableAllocationDecider.INDEX_ROUTING_REBALANCE_ENABLE_SETTING);
298299
whiteListedSettings.add(EnableAllocationDecider.INDEX_ROUTING_ALLOCATION_ENABLE_SETTING);
299300
whiteListedSettings.add(ShardsLimitAllocationDecider.INDEX_TOTAL_SHARDS_PER_NODE_SETTING);
300-
301+
301302
whiteListedSettings.add(IndicesRequestCache.INDEX_CACHE_REQUEST_ENABLED_SETTING);
302303
whiteListedSettings.add(IndexSettings.MAX_RESULT_WINDOW_SETTING);
303304
whiteListedSettings.add(IndexSettings.INDEX_WARMER_ENABLED_SETTING);
@@ -311,7 +312,7 @@ void finalizeResponse() {
311312
whiteListedSettings.add(IndexSettings.ALLOW_UNMAPPED);
312313
whiteListedSettings.add(IndexSettings.INDEX_SEARCH_IDLE_AFTER);
313314
whiteListedSettings.add(BitsetFilterCache.INDEX_LOAD_RANDOM_ACCESS_FILTERS_EAGERLY_SETTING);
314-
315+
315316
whiteListedSettings.add(SearchSlowLog.INDEX_SEARCH_SLOWLOG_THRESHOLD_FETCH_DEBUG_SETTING);
316317
whiteListedSettings.add(SearchSlowLog.INDEX_SEARCH_SLOWLOG_THRESHOLD_FETCH_WARN_SETTING);
317318
whiteListedSettings.add(SearchSlowLog.INDEX_SEARCH_SLOWLOG_THRESHOLD_FETCH_INFO_SETTING);
@@ -328,10 +329,10 @@ void finalizeResponse() {
328329
whiteListedSettings.add(IndexingSlowLog.INDEX_INDEXING_SLOWLOG_LEVEL_SETTING);
329330
whiteListedSettings.add(IndexingSlowLog.INDEX_INDEXING_SLOWLOG_REFORMAT_SETTING);
330331
whiteListedSettings.add(IndexingSlowLog.INDEX_INDEXING_SLOWLOG_MAX_SOURCE_CHARS_TO_LOG_SETTING);
331-
332+
332333
whiteListedSettings.add(IndexSettings.INDEX_SOFT_DELETES_SETTING);
333334
whiteListedSettings.add(IndexSettings.INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING);
334-
335+
335336
WHITELISTED_SETTINGS = Collections.unmodifiableSet(whiteListedSettings);
336337
}
337338

@@ -356,27 +357,27 @@ static void validate(Request request, IndexMetaData leaderIndex, IndexMetaData f
356357
if (leaderIndex.getState() != IndexMetaData.State.OPEN || followIndex.getState() != IndexMetaData.State.OPEN) {
357358
throw new IllegalArgumentException("leader and follow index must be open");
358359
}
359-
360+
360361
// Make a copy, remove settings that are allowed to be different and then compare if the settings are equal.
361362
Settings leaderSettings = filter(leaderIndex.getSettings());
362363
Settings followerSettings = filter(followIndex.getSettings());
363364
if (leaderSettings.equals(followerSettings) == false) {
364365
throw new IllegalArgumentException("the leader and follower index settings must be identical");
365366
}
366-
367+
367368
// Validates if the current follower mapping is mergable with the leader mapping.
368369
// This also validates for example whether specific mapper plugins have been installed
369370
followerMapperService.merge(leaderIndex, MapperService.MergeReason.MAPPING_RECOVERY);
370371
}
371-
372+
372373
private static Settings filter(Settings originalSettings) {
373374
Settings.Builder settings = Settings.builder().put(originalSettings);
374375
// Remove settings that are always going to be different between leader and follow index:
375376
settings.remove(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey());
376377
settings.remove(IndexMetaData.SETTING_INDEX_UUID);
377378
settings.remove(IndexMetaData.SETTING_INDEX_PROVIDED_NAME);
378379
settings.remove(IndexMetaData.SETTING_CREATION_DATE);
379-
380+
380381
Iterator<String> iterator = settings.keys().iterator();
381382
while (iterator.hasNext()) {
382383
String key = iterator.next();
@@ -389,5 +390,5 @@ private static Settings filter(Settings originalSettings) {
389390
}
390391
return settings.build();
391392
}
392-
393+
393394
}

x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/FollowIndexActionTests.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,22 +27,26 @@ public void testValidation() throws IOException {
2727
request.setFollowIndex("index2");
2828

2929
{
30+
// should fail, because leader index does not exist
3031
Exception e = expectThrows(IllegalArgumentException.class, () -> FollowIndexAction.validate(request, null, null, null));
3132
assertThat(e.getMessage(), equalTo("leader index [index1] does not exist"));
3233
}
3334
{
35+
// should fail, because follow index does not exist
3436
IndexMetaData leaderIMD = createIMD("index1", 5);
3537
Exception e = expectThrows(IllegalArgumentException.class, () -> FollowIndexAction.validate(request, leaderIMD, null, null));
3638
assertThat(e.getMessage(), equalTo("follow index [index2] does not exist"));
3739
}
3840
{
41+
// should fail because leader index does not have soft deletes enabled
3942
IndexMetaData leaderIMD = createIMD("index1", 5);
4043
IndexMetaData followIMD = createIMD("index2", 5);
4144
Exception e = expectThrows(IllegalArgumentException.class,
4245
() -> FollowIndexAction.validate(request, leaderIMD, followIMD, null));
4346
assertThat(e.getMessage(), equalTo("leader index [index1] does not have soft deletes enabled"));
4447
}
4548
{
49+
// should fail because the number of primary shards between leader and follow index are not equal
4650
IndexMetaData leaderIMD = createIMD("index1", 5, new Tuple<>(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"));
4751
IndexMetaData followIMD = createIMD("index2", 4);
4852
Exception e = expectThrows(IllegalArgumentException.class,
@@ -51,6 +55,7 @@ public void testValidation() throws IOException {
5155
equalTo("leader index primary shards [5] does not match with the number of shards of the follow index [4]"));
5256
}
5357
{
58+
// should fail, because leader index is closed
5459
IndexMetaData leaderIMD = createIMD("index1", State.CLOSE, "{}", 5,
5560
new Tuple<>(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"));
5661
IndexMetaData followIMD = createIMD("index2", State.OPEN, "{}", 5,
@@ -60,6 +65,7 @@ public void testValidation() throws IOException {
6065
assertThat(e.getMessage(), equalTo("leader and follow index must be open"));
6166
}
6267
{
68+
// should fail, because leader has a field with the same name mapped as keyword and follower as text
6369
IndexMetaData leaderIMD = createIMD("index1", State.OPEN, "{\"properties\": {\"field\": {\"type\": \"keyword\"}}}", 5,
6470
new Tuple<>(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"));
6571
IndexMetaData followIMD = createIMD("index2", State.OPEN, "{\"properties\": {\"field\": {\"type\": \"text\"}}}", 5);
@@ -70,6 +76,7 @@ public void testValidation() throws IOException {
7076
assertThat(e.getMessage(), equalTo("mapper [field] of different type, current_type [text], merged_type [keyword]"));
7177
}
7278
{
79+
// should fail because of non whitelisted settings not the same between leader and follow index
7380
String mapping = "{\"properties\": {\"field\": {\"type\": \"text\", \"analyzer\": \"my_analyzer\"}}}";
7481
IndexMetaData leaderIMD = createIMD("index1", State.OPEN, mapping, 5,
7582
new Tuple<>(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"),
@@ -83,13 +90,15 @@ public void testValidation() throws IOException {
8390
assertThat(e.getMessage(), equalTo("the leader and follower index settings must be identical"));
8491
}
8592
{
93+
// should succeed
8694
IndexMetaData leaderIMD = createIMD("index1", 5, new Tuple<>(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"));
8795
IndexMetaData followIMD = createIMD("index2", 5);
8896
MapperService mapperService = MapperTestUtils.newMapperService(xContentRegistry(), createTempDir(), Settings.EMPTY, "index2");
8997
mapperService.updateMapping(followIMD);
9098
FollowIndexAction.validate(request, leaderIMD, followIMD, mapperService);
9199
}
92100
{
101+
// should succeed, index settings are identical
93102
String mapping = "{\"properties\": {\"field\": {\"type\": \"text\", \"analyzer\": \"my_analyzer\"}}}";
94103
IndexMetaData leaderIMD = createIMD("index1", State.OPEN, mapping, 5,
95104
new Tuple<>(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"),
@@ -104,6 +113,7 @@ public void testValidation() throws IOException {
104113
FollowIndexAction.validate(request, leaderIMD, followIMD, mapperService);
105114
}
106115
{
116+
// should succeed despite whitelisted settings being different
107117
String mapping = "{\"properties\": {\"field\": {\"type\": \"text\", \"analyzer\": \"my_analyzer\"}}}";
108118
IndexMetaData leaderIMD = createIMD("index1", State.OPEN, mapping, 5,
109119
new Tuple<>(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"),
@@ -124,7 +134,7 @@ public void testValidation() throws IOException {
124134
private static IndexMetaData createIMD(String index, int numShards, Tuple<?, ?>... settings) throws IOException {
125135
return createIMD(index, State.OPEN, "{\"properties\": {}}", numShards, settings);
126136
}
127-
137+
128138
private static IndexMetaData createIMD(String index, State state, String mapping, int numShards,
129139
Tuple<?, ?>... settings) throws IOException {
130140
Settings.Builder settingsBuilder = settings(Version.CURRENT);

0 commit comments

Comments
 (0)