-
Notifications
You must be signed in to change notification settings - Fork 25k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Avoid sending duplicate remote failed shard requests #31313
Conversation
We might lose messages between getCapturedRequestsAndClear calls. This commit makes sure that both getCapturedRequestsAndClear and getCapturedRequestsByTargetNodeAndClear are atomic.
Today if a replica fails, we will send a shard failed request for each replication request to the master node until that primary receives the new cluster state. However, if a bulk requests are large and the master node is busy, we might overwhelm the cluster with shard failed requests. This commit tries to minimize the shard failed requests in the above scenario by caching the ongoing requests. This was discussed at https://discuss.elastic.co/t/half-dead-node-lead-to-cluster-hang/113658/25
Pinging @elastic/es-distributed |
I will make another PR for local shard-failed requests and #15896. |
return requests.toArray(new CapturedRequest[0]); | ||
} | ||
|
||
private Map<String, List<CapturedRequest>> groupRequestsByTargetNode(Collection<CapturedRequest> requests) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I opened #31312 for CapturingTransport .
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @dnhatn. I've left 2 questions. I'm also wondering why you chose to do the local failures in a later PR? Can't we just reuse the same failedShardsCache
?
} | ||
} | ||
if (firstException != null) { | ||
throw new ElasticsearchException("failed to notify listener", firstException); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
where is this exception going to end up?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's an interesting question. It will be reported as unhandled by the transport threads. The same thing should happen for the current implementation.
Map<FailedShardEntry, ClusterStateTaskExecutor.TaskResult> taskResultMap = | ||
failingTasks.stream().collect(Collectors.toMap(Function.identity(), task -> ClusterStateTaskExecutor.TaskResult.failure(new RuntimeException("simulated applyFailedShards failure")))); | ||
taskResultMap.putAll(nonExistentTasks.stream().collect(Collectors.toMap(Function.identity(), task -> ClusterStateTaskExecutor.TaskResult.success()))); | ||
Map<FailedShardEntry, ClusterStateTaskExecutor.TaskResult> taskResultMap = new IdentityHashMap<>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what's the reason for turning this into a identity map?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Previously we did not override FailedShardEntry, but now we do. I used an identity map because we verify that the size of the resultMap equals the number of tasks.
I replaced these maps with a list. I think it's clearer now.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Concept looks good to me, especially where the intention is do share this with IndicesClusterStateService.
@ywelsch @bleskes Thanks for looking at this.
I did a prototype for both and realized that we might have to use two separate maps: one for local shard failed requests; one for remote shard failed requests. I prefer to do the remote first because it's tidy so that we can see the concept better. |
@elasticmachine test this please |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
Map<FailedShardEntry, ClusterStateTaskExecutor.TaskResult> taskResultMap = | ||
failingTasks.stream().collect(Collectors.toMap(Function.identity(), task -> ClusterStateTaskExecutor.TaskResult.failure(new RuntimeException("simulated applyFailedShards failure")))); | ||
taskResultMap.putAll(nonExistentTasks.stream().collect(Collectors.toMap(Function.identity(), task -> ClusterStateTaskExecutor.TaskResult.success()))); | ||
List<Tuple<FailedShardEntry, ClusterStateTaskExecutor.TaskResult>> taskResultMap = new ArrayList<>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
rename to taskResultList?
task -> ClusterStateTaskExecutor.TaskResult.failure(new ShardStateAction.NoLongerPrimaryShardException(task.shardId, | ||
"primary term [" + task.primaryTerm + "] did not match current primary term [" + | ||
currentState.metaData().index(task.shardId.getIndex()).primaryTerm(task.shardId.id()) + "]")))); | ||
List<Tuple<FailedShardEntry, ClusterStateTaskExecutor.TaskResult>> taskResultMap = tasks.stream() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same here (taskResultList)
CountDownLatch latch = new CountDownLatch(numListeners); | ||
for (int i = 0; i < numListeners; i++) { | ||
shardStateAction.remoteShardFailed(failedShard.shardId(), failedShard.allocationId().getId(), | ||
primaryTerm + 1, markAsStale, "test", getSimulatedFailure(), new ShardStateAction.Listener() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why primaryTerm + 1? just a random but fixed value would do? (we never run this task)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I pushed 496a6bd. I copied this from an existing test.
* master: Add get stored script and delete stored script to high level REST API - post backport fix Add get stored script and delete stored script to high level REST API (#31355) Core: Combine Action and GenericAction (#31405) Fix reference to XContentBuilder.string() (#31337) Avoid sending duplicate remote failed shard requests (#31313) Fix defaults in GeoShapeFieldMapper output (#31302) RestAPI: Reject forcemerge requests with a body (#30792) Packaging: Remove windows bin files from the tar distribution (#30596) Docs: Use the default distribution to test docs (#31251) [DOCS] Adds testing for security APIs (#31345) Clarify that IP range data can be specified in CIDR notation. (#31374) Use system context for cluster state update tasks (#31241) Percentile/Ranks should return null instead of NaN when empty (#30460) REST high-level client: add validate query API (#31077) Move language analyzers from server to analysis-common module. (#31300) [Test] Fix :example-plugins:rest-handler on Windows Expose lucene's RemoveDuplicatesTokenFilter (#31275) Reload secure settings for plugins (#31383) Remove some cases in FieldTypeLookupTests that are no longer relevant. (#31381) Ensure we don't use a remote profile if cluster name matches (#31331) [TEST] Double write alias fault (#30942) [DOCS] Fix version in SQL JDBC Maven template [DOCS] Improve install and setup section for SQL JDBC SQL: Fix rest endpoint names in node stats (#31371) Support for remote path in reindex api - post backport fix Closes #22913 [ML] Put ML filter API response should contain the filter (#31362) Support for remote path in reindex api (#31290) Add byte array pooling to nio http transport (#31349) Remove trial status info from start trial doc (#31365) [DOCS] Adds links to release notes and highlights add is-write-index flag to aliases (#30942) Add rollover-creation-date setting to rolled over index (#31144) [ML] Hold ML filter items in sorted set (#31338) [Tests] Fix edge case in ScriptedMetricAggregatorTests (#31357)
Today if a write replication request fails, we will send a shard-failed message to the master node to fail that replica. However, if there are many ongoing write replication requests and the master node is busy, we might overwhelm the cluster and the master node with many shard-failed requests. This commit tries to minimize the shard-failed requests in the above scenario by caching the ongoing shard-failed requests. This issue was discussed at https://discuss.elastic.co/t/half-dead-node-lead-to-cluster-hang/113658/25.
* 6.x: Avoid sending duplicate remote failed shard requests (#31313) Add get field mappings to High Level REST API Client Relates to #27205 [DOCS] Updates Watcher examples for code testing (#31152) [DOCS] Move monitoring to docs folder (#31477) [DOCS] Fixes SQL docs in nav [DOCS] Move sql to docs IndexShard should not return null stats - empty stats or AlreadyCloseException if it's closed is better Clarify that IP range data can be specified in CIDR notation. (#31374) Remove some cases in FieldTypeLookupTests that are no longer relevant. (#31381) In NumberFieldType equals and hashCode, make sure that NumberType is taken into account. (#31514) fix repository update with the same settings but different type Revert "AwaitsFix FullClusterRestartIT#testRecovery" Upgrade to Lucene 7.4.0. (#31529) Avoid deprecation warning when running the ML datafeed extractor. (#31463) Retry synced-flush in FullClusterRestartIT#testRecovery Allow multiple unicast host providers (#31509) [ML] Add ML filter update API (#31437) AwaitsFix FullClusterRestartIT#testRecovery Fix missing historyUUID in peer recovery when rolling upgrade 5.x to 6.3 (#31506) Remove QueryCachingPolicy#ALWAYS_CACHE (#31451) Rename createNewTranslog to fileBasedRecovery (#31508) [DOCS] Add code snippet testing in more ML APIs (#31339) [DOCS] Remove fixed file from build.gradle [DOCS] Creates field and document level security overview (#30937) Test: Skip assertion on windows [DOCS] Move migration APIs to docs (#31473) Add a known issue for upgrading from 5.x to 6.3.0 (#31501) Return transport addresses from UnicastHostsProvider (#31426) Add Delete Snapshot High Level REST API Reload secure settings for plugins (#31481) [DOCS] Fix JDBC Maven client group/artifact ID
Hi @hackerwin7, did you hit any issue relating to this PR? |
Yes, in 6.3.1 (before 6.4.0), we hit the pending task flooded. after reviewing this PR, it seems that there is only remote shard failed deduplicate, and you say another PR for local shard failed, so where is it? |
@hackerwin7 I haven't implemented that PR yet because of other higher priority tasks. As you are on 6.3.1, I think you have hit the remote shard failed tasks, not the local shard failed tasks. You should upgrade to a newer version to have this fix. Can you continue this discussion in the forums? This allows us to use GitHub for verified bug reports, feature requests, and pull requests. Thank you. |
|
Today if a write replication request fails, we will send a shard-failed message to the master node to fail that replica. However, if there are many ongoing write replication requests and the master node is busy, we might overwhelm the cluster and the master node with many shard-failed requests.
This commit tries to minimize the shard-failed requests in the above scenario by caching the ongoing shard-failed requests.
This issue was discussed at https://discuss.elastic.co/t/half-dead-node-lead-to-cluster-hang/113658/25.