Add framework level constructs to track shard indexing pressure. #525
Conversation
✅ DCO Check Passed 14426c2
✅ Gradle Wrapper Validation success 14426c2
✅ Gradle Precommit success 14426c2
```diff
-    private final AtomicLong currentCombinedCoordinatingAndPrimaryBytes = new AtomicLong(0);
-    private final AtomicLong currentCoordinatingBytes = new AtomicLong(0);
-    private final AtomicLong currentPrimaryBytes = new AtomicLong(0);
-    private final AtomicLong currentReplicaBytes = new AtomicLong(0);
+    final AtomicLong currentCombinedCoordinatingAndPrimaryBytes = new AtomicLong(0);
+    final AtomicLong currentCoordinatingBytes = new AtomicLong(0);
+    final AtomicLong currentPrimaryBytes = new AtomicLong(0);
+    final AtomicLong currentReplicaBytes = new AtomicLong(0);
```
I guess the intent is to make it visible to the subclass... if yes, make this protected.
The intent was also to keep this package-private. Agree there are tradeoffs based on which instance is created, i.e. IndexingPressure vs ShardIndexingPressure.
```java
ShardIndexingPressureTracker newShardIndexingPressureTracker = new ShardIndexingPressureTracker(shardId);
newShardIndexingPressureTracker.getPrimaryAndCoordinatingLimits().set(this.shardIndexingPressureSettings
    .getShardPrimaryAndCoordinatingBaseLimits());
newShardIndexingPressureTracker.getReplicaLimits().set(this.shardIndexingPressureSettings.getShardReplicaBaseLimits());
```
Let's add base limits to the constructor itself, so that every new instance gets created with a base limit.
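The suggestion above can be sketched roughly as follows. This is an illustrative sketch only, assuming simplified class and accessor names that are not the PR's actual code:

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch of a tracker whose base limits are set in the
// constructor, so no instance can be observed with unset limits.
// Class and field names are illustrative, not the actual OpenSearch classes.
class ShardTrackerSketch {
    private final AtomicLong primaryAndCoordinatingLimits;
    private final AtomicLong replicaLimits;

    ShardTrackerSketch(long primaryAndCoordinatingBaseLimit, long replicaBaseLimit) {
        this.primaryAndCoordinatingLimits = new AtomicLong(primaryAndCoordinatingBaseLimit);
        this.replicaLimits = new AtomicLong(replicaBaseLimit);
    }

    long getPrimaryAndCoordinatingLimits() {
        return primaryAndCoordinatingLimits.get();
    }

    long getReplicaLimits() {
        return replicaLimits.get();
    }
}
```

This removes the two-step create-then-set pattern, so the tracker is always fully initialized when published to other threads.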
✅ DCO Check Passed 57a0032
✅ Gradle Wrapper Validation success 57a0032
✅ Gradle Precommit success 57a0032
```java
public static boolean isShardIndexingPressureAttributeEnabled() {
    Iterator<DiscoveryNode> nodes = clusterService.state().getNodes().getNodes().valuesIt();
    while (nodes.hasNext()) {
        if (Boolean.parseBoolean(nodes.next().getAttributes().get(SHARD_INDEXING_PRESSURE_ENABLED_ATTRIBUTE_KEY)) == false) {
            return false;
        }
    }
    return true;
}
```
Let's move this to ShardIndexingPressureSettings
Makes sense. Moved to ShardIndexingPressureSettings for containment of logic.
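As a rough sketch of what the check could look like after the move, assuming the node list is simplified to plain attribute maps (the real code iterates DiscoveryNode instances from cluster state; all names here are illustrative):

```java
import java.util.List;
import java.util.Map;

// Hypothetical sketch: a static helper on the settings class that returns
// true only when every node reports the attribute as enabled. A node with
// a missing attribute counts as disabled, since Boolean.parseBoolean(null)
// returns false.
class ShardIndexingPressureSettingsSketch {
    static final String ATTR_KEY = "shard_indexing_pressure_enabled";

    static boolean isAttributeEnabledOnAllNodes(List<Map<String, String>> nodeAttributes) {
        for (Map<String, String> attrs : nodeAttributes) {
            if (!Boolean.parseBoolean(attrs.get(ATTR_KEY))) {
                return false; // any node missing or reporting "false" disables the feature
            }
        }
        return true;
    }
}
```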
```java
boolean isReplicaShardLimitBreached(ShardIndexingPressureTracker tracker, long requestStartTime,
                                    Map<Long, ShardIndexingPressureTracker> shardIndexingPressureStore, long nodeReplicaBytes) {

    // The memory limit is breached when the current utilization is greater than
    // operating_factor.upper of the total shard limits.
    long shardReplicaBytes = tracker.memory().getCurrentReplicaBytes().get();
    long shardReplicaLimits = tracker.getReplicaLimits().get();
    final boolean shardMemoryLimitsBreached =
        ((double) shardReplicaBytes / shardReplicaLimits) > this.upperOperatingFactor;

    if (shardMemoryLimitsBreached) {
        /*
         * Secondary parameters (i.e. LastSuccessfulRequestDuration and Throughput) are taken into
         * consideration when the current node utilization is greater than
         * primary_parameter.node.soft_limit of the total node limits.
         */
        if (((double) nodeReplicaBytes / this.shardIndexingPressureSettings.getNodeReplicaLimits()) < this.nodeSoftLimit) {
            boolean isShardLimitsIncreased =
                this.increaseShardReplicaLimits(tracker, shardIndexingPressureStore);
            if (isShardLimitsIncreased == false) {
                tracker.rejection().getReplicaNodeLimitsBreachedRejections().incrementAndGet();
                totalNodeLimitsBreachedRejections.incrementAndGet();
            }

            return !isShardLimitsIncreased;
        } else {
            boolean shardLastSuccessfulRequestDurationLimitsBreached =
                this.evaluateLastSuccessfulRequestDurationLimitsBreached(
                    tracker.timeStamp().getLastSuccessfulReplicaRequestTimestamp().get(),
                    requestStartTime, tracker.outstandingRequest().getTotalOutstandingReplicaRequests().get());

            boolean shardThroughputDegradationLimitsBreached =
                this.evaluateThroughputDegradationLimitsBreached(
                    Double.longBitsToDouble(tracker.throughput().getReplicaThroughputMovingAverage().get()),
                    tracker.memory().getTotalReplicaBytes().get(), tracker.latency().getReplicaTimeInMillis().get(),
                    tracker.throughput().getReplicaThroughputMovingQueue().size(), replicaThroughputDegradationLimits);

            if (shardLastSuccessfulRequestDurationLimitsBreached || shardThroughputDegradationLimitsBreached) {
                if (shardLastSuccessfulRequestDurationLimitsBreached) {
                    tracker.rejection().getReplicaLastSuccessfulRequestLimitsBreachedRejections().incrementAndGet();
                    totalLastSuccessfulRequestLimitsBreachedRejections.incrementAndGet();
                } else {
                    tracker.rejection().getReplicaThroughputDegradationLimitsBreachedRejections().incrementAndGet();
                    totalThroughputDegradationLimitsBreachedRejections.incrementAndGet();
                }

                return true;
            } else {
                boolean isShardLimitsIncreased =
                    this.increaseShardReplicaLimits(tracker, shardIndexingPressureStore);
                if (isShardLimitsIncreased == false) {
                    tracker.rejection().getReplicaNodeLimitsBreachedRejections().incrementAndGet();
                    totalNodeLimitsBreachedRejections.incrementAndGet();
                }

                return !isShardLimitsIncreased;
            }
        }
    } else {
        return false;
    }
}
```
Let's add this to the meta issue as a fast follow-up to break up this monolith.
Yes, added in #478 as an item list.
```java
private void updateIndexingPressureColdStore(ShardIndexingPressureTracker tracker) {
    if (shardIndexingPressureColdStore.size() > maxColdStoreSize) {
        shardIndexingPressureColdStore.clear();
    }
    shardIndexingPressureColdStore.put((long) tracker.getShardId().hashCode(), tracker);
}
```
Clearing the cold store just because it got full doesn't seem ideal. Maybe we can improve the data structure used here
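One possible direction for that improvement, sketched below: a size-bounded LRU map that evicts only the eldest entry instead of clearing the whole store when it fills up. This is an assumption about the suggestion, not the PR's actual fix; the class name is illustrative.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch of an alternative cold-store structure. LinkedHashMap
// in access-order mode with removeEldestEntry gives a simple LRU cache:
// when the store exceeds maxSize, only the least recently used entry is
// dropped, so the rest of the tracked state survives.
class BoundedColdStore<K, V> extends LinkedHashMap<K, V> {
    private final int maxSize;

    BoundedColdStore(int maxSize) {
        super(16, 0.75f, true); // accessOrder = true enables LRU ordering
        this.maxSize = maxSize;
    }

    @Override
    protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
        return size() > maxSize; // evict one entry, not the whole store
    }
}
```

Note this sketch is not thread-safe on its own; a concurrent store would need external synchronization or a concurrent cache implementation.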
Thanks Saurabh/Dharmesh for the changes. LGTM overall, lets revisit this.
✅ Gradle Wrapper Validation success bce6e90
✅ DCO Check Passed bce6e90
✅ Gradle Precommit success bce6e90
start gradle check
✅ Gradle Wrapper Validation success 7c4c654ccb6187edf30ca39e2434b68f4552795f
✅ DCO Check Passed 7c4c654ccb6187edf30ca39e2434b68f4552795f
✅ Gradle Precommit success 7c4c654ccb6187edf30ca39e2434b68f4552795f
@nknize Fixed some of the integ tests. Can you please trigger
start gradle check
❌ Gradle Check failure 7c4c654ccb6187edf30ca39e2434b68f4552795f
Force-pushed from 0a91e90 to 13cf3d2
❌ Gradle Precommit failure 0a91e90452a4110e00c7757ec9f527cfe1059a6f
❌ Gradle Wrapper Validation failure 0a91e90452a4110e00c7757ec9f527cfe1059a6f: Gradle Wrapper integrity has been altered
✅ DCO Check Passed 13cf3d26fff187dcc308530cbf9d95085e8ad43c
✅ Gradle Wrapper Validation success 13cf3d26fff187dcc308530cbf9d95085e8ad43c
✅ Gradle Precommit success da37500
start gradle check
Signed-off-by: Dharmesh 💤 <sdharms@amazon.com>
✅ Gradle Wrapper Validation success c069b09
✅ DCO Check Passed c069b09
✅ Gradle Precommit success c069b09
start gradle check
Signed-off-by: Saurabh Singh <sisurab@amazon.com>
✅ DCO Check Passed 1b6bc7d
✅ Gradle Wrapper Validation success 1b6bc7d
✅ Gradle Precommit success 1b6bc7d
start gradle check
Just starting to look at the framework changes, so I'll dig a little deeper. First thought: I'm curious why this introduces an IndexingPressure-specific service instead of building into IndicesService, which is used at the node level anyway?
Thanks @nknize.
+1 to this plan! I understand the benefits of isolating the logic for "future" considerations and am not questioning that choice. I'm more specifically curious (keeping in mind that I haven't dug too deep into the changes yet) why this is a new service at the NodeService layer instead of placing that isolated index monitoring logic in the existing
This is somewhat true. And the reason behind it is that the tracking/monitoring done by the Also the implementation of
Hi @nknize Please let me know if you have more thoughts.
✅ Gradle Wrapper Validation success 1b6bc7d
✅ DCO Check Passed 1b6bc7d
✅ Gradle Precommit success 1b6bc7d
Can one of the admins verify this patch?
Description
This PR is the 1st of 4 planned PRs for Shard Indexing Pressure (#478). It provides the framework-level constructs to track shard indexing pressure.
Shard Indexing Pressure introduces smart rejections of indexing requests when there are too many stuck or slow requests in the cluster, breaching key performance thresholds. This prevents nodes in the cluster from running into cascading failures.
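The core threshold check behind these rejections, comparing a shard's current memory usage against its limit scaled by an upper operating factor, can be sketched as below. The method name and the sample values are illustrative, not taken from the PR:

```java
// Minimal sketch of the breach condition discussed in this PR: a shard is
// considered over its memory limit when current bytes exceed the configured
// fraction (upper operating factor) of its limit.
class BreachCheckSketch {
    static boolean isMemoryLimitBreached(long currentBytes, long limitBytes, double upperOperatingFactor) {
        // e.g. with limitBytes = 100 and upperOperatingFactor = 0.85,
        // anything above 85 bytes in flight counts as a breach
        return ((double) currentBytes / limitBytes) > upperOperatingFactor;
    }
}
```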
Co-authored-by: Dharmesh Singh sdharms@amazon.com
Issues Resolved
Addresses Item 1 of #478
Check List
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.