Add Shard Indexing Pressure Memory Manager (#478) #945
Conversation
…t#478) Signed-off-by: Saurabh Singh <sisurab@amazon.com>
✅ Gradle Wrapper Validation success 5b3f104
✅ DCO Check Passed 5b3f104
✅ Gradle Precommit success 5b3f104
Thanks, let's break down the redundant logic across methods.
 * Throughput of last N request divided by the total lifetime requests throughput is greater than the acceptable
 * degradation limits then we say this parameter has breached the threshold.
 */
private boolean evaluateThroughputDegradationLimitsBreached(PerformanceTracker performanceTracker, StatsTracker statsTracker,
nit: space before method name
public final AtomicLong totalNodeLimitsBreachedRejections = new AtomicLong();
public final AtomicLong totalLastSuccessfulRequestLimitsBreachedRejections = new AtomicLong();
public final AtomicLong totalThroughputDegradationLimitsBreachedRejections = new AtomicLong();
It's not a good practice to expose public non-static/non-final member variables. How do you prevent them from illegal modification from outside this class?
Yes, this was an unintended miss in the initial draft PR. This is now updated in the final PR.
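For illustration, one common way to avoid the issue is to keep the counters private and expose only explicit increment and read methods; this is a minimal sketch with assumed names, not the final PR's API:

import java.util.concurrent.atomic.AtomicLong;

// Minimal illustrative sketch (names assumed): the counters stay private, so outside
// code can only increment or read them and can never reassign or reset them.
class RejectionTracker {
    private final AtomicLong totalNodeLimitsBreachedRejections = new AtomicLong();
    private final AtomicLong totalLastSuccessfulRequestLimitsBreachedRejections = new AtomicLong();
    private final AtomicLong totalThroughputDegradationLimitsBreachedRejections = new AtomicLong();

    void incrementNodeLimitsBreachedRejections() {
        totalNodeLimitsBreachedRejections.incrementAndGet();
    }

    long getNodeLimitsBreachedRejections() {
        return totalNodeLimitsBreachedRejections.get();
    }

    void incrementLastSuccessfulRequestLimitsBreachedRejections() {
        totalLastSuccessfulRequestLimitsBreachedRejections.incrementAndGet();
    }

    long getLastSuccessfulRequestLimitsBreachedRejections() {
        return totalLastSuccessfulRequestLimitsBreachedRejections.get();
    }

    void incrementThroughputDegradationLimitsBreachedRejections() {
        totalThroughputDegradationLimitsBreachedRejections.incrementAndGet();
    }

    long getThroughputDegradationLimitsBreachedRejections() {
        return totalThroughputDegradationLimitsBreachedRejections.get();
    }
}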
if(shardMemoryLimitsBreached) {
    // Secondary Parameters (i.e. LastSuccessfulRequestDuration and Throughput) is taken into consideration when
    // the current node utilization is greater than primary_parameter.node.soft_limit of total node limits.
    if(((double)nodeTotalBytes / this.shardIndexingPressureSettings.getNodePrimaryAndCoordinatingLimits()) < this.nodeSoftLimit) {
        boolean isShardLimitsIncreased = this.increaseShardPrimaryAndCoordinatingLimits(tracker);
        if(isShardLimitsIncreased == false) {
            tracker.getPrimaryOperationTracker().getRejectionTracker().incrementNodeLimitsBreachedRejections();
            totalNodeLimitsBreachedRejections.incrementAndGet();
        }
        return !isShardLimitsIncreased;
    } else {
        boolean shardLastSuccessfulRequestDurationLimitsBreached =
            this.evaluateLastSuccessfulRequestDurationLimitsBreached(tracker.getPrimaryOperationTracker().getPerformanceTracker(),
                requestStartTime);

        if(shardLastSuccessfulRequestDurationLimitsBreached) {
            tracker.getPrimaryOperationTracker().getRejectionTracker()
                .incrementLastSuccessfulRequestLimitsBreachedRejections();
            totalLastSuccessfulRequestLimitsBreachedRejections.incrementAndGet();
            return true;
        }

        boolean shardThroughputDegradationLimitsBreached =
            this.evaluateThroughputDegradationLimitsBreached(tracker.getPrimaryOperationTracker().getPerformanceTracker(),
                tracker.getPrimaryOperationTracker().getStatsTracker(),
                primaryAndCoordinatingThroughputDegradationLimits);

        if (shardThroughputDegradationLimitsBreached) {
            tracker.getPrimaryOperationTracker().getRejectionTracker()
                .incrementThroughputDegradationLimitsBreachedRejections();
            totalThroughputDegradationLimitsBreachedRejections.incrementAndGet();
            return true;
        }

        boolean isShardLimitsIncreased = this.increaseShardPrimaryAndCoordinatingLimits(tracker);
        if(isShardLimitsIncreased == false) {
            tracker.getPrimaryOperationTracker().getRejectionTracker().incrementNodeLimitsBreachedRejections();
            totalNodeLimitsBreachedRejections.incrementAndGet();
        }

        return !isShardLimitsIncreased;
    }
The logic is redundant across the primary/replica paths and across multiple methods. This would be a serious maintainability overhead. Let's break this down as:
public void onShardMemoryLimitIncreased(OperationTracker opTracker, Predicate<OperationTracker> isShardLimitsIncreasedPredicate, double degradationLimits, double shardLimits) {
blah....
}
Agreed, this was planned and was added as a ToDo in the draft PR description. It has now been taken care of in the final PR.
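For illustration, one shape such a consolidated helper could take, so the primary, replica, and coordinating paths only supply their own trackers and counters (method and parameter names here are assumptions, not the final PR's API):

import java.util.function.BooleanSupplier;

// Illustrative sketch (assumed names): the repeated "check secondary parameters,
// record the matching rejection, otherwise try to raise the shard limit" flow,
// written once and parameterized by suppliers so each operation path can reuse it.
final class ShardLimitBreachEvaluator {

    static boolean rejectOrIncreaseShardLimits(
            BooleanSupplier lastSuccessfulRequestLimitBreached,
            BooleanSupplier throughputDegradationLimitBreached,
            BooleanSupplier tryIncreaseShardLimits,
            Runnable onLastSuccessfulRequestRejection,
            Runnable onThroughputDegradationRejection,
            Runnable onNodeLimitRejection) {

        // Secondary parameter 1: time elapsed since the last successful request.
        if (lastSuccessfulRequestLimitBreached.getAsBoolean()) {
            onLastSuccessfulRequestRejection.run();
            return true;
        }
        // Secondary parameter 2: throughput degradation.
        if (throughputDegradationLimitBreached.getAsBoolean()) {
            onThroughputDegradationRejection.run();
            return true;
        }
        // Neither secondary parameter breached: attempt to raise the shard limit,
        // and reject only if that is not possible within the node-level headroom.
        boolean increased = tryIncreaseShardLimits.getAsBoolean();
        if (increased == false) {
            onNodeLimitRejection.run();
        }
        return !increased;
    }
}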
Signed-off-by: Saurabh Singh <sisurab@amazon.com>
✅ Gradle Wrapper Validation success a927825
✅ DCO Check Passed a927825
✅ Gradle Precommit success a927825
tracker.getCoordinatingOperationTracker().getRejectionTracker().incrementNodeLimitsBreachedRejections();
this.totalNodeLimitsBreachedRejections.incrementAndGet();
Let's club these two together across methods, so that one doesn't get missed whenever we update the other?
nice idea, thanks.
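For illustration, the kind of small helper that keeps the two increments together (a sketch with assumed names, reusing the RejectionTracker shape sketched earlier):

import java.util.concurrent.atomic.AtomicLong;

// Illustrative sketch (assumed names): one helper bumps both the per-shard and the
// node-level counters together, so a future change cannot update one and miss the other.
final class NodeLimitRejectionRecorder {
    private final AtomicLong totalNodeLimitsBreachedRejections = new AtomicLong();

    void markNodeLimitBreachedRejection(RejectionTracker shardRejectionTracker) {
        shardRejectionTracker.incrementNodeLimitsBreachedRejections();
        totalNodeLimitsBreachedRejections.incrementAndGet();
    }

    long getTotalNodeLimitsBreachedRejections() {
        return totalNodeLimitsBreachedRejections.get();
    }
}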
    this.shardIndexingPressureSettings.getNodePrimaryAndCoordinatingLimits());
tracker.getCoordinatingOperationTracker().getRejectionTracker().incrementNodeLimitsBreachedRejections();
this.totalNodeLimitsBreachedRejections.incrementAndGet();
nit: extra line break
public long updateLastSuccessfulRequestTimestamp(long timeStamp) {
    return lastSuccessfulRequestTimestamp.getAndSet(timeStamp);
}
This doesn't have to be an AtomicLong; just volatile should work.
++
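For illustration, a sketch of the volatile variant, assuming the previous value does not have to be swapped atomically (the class and method names mirror the quoted snippet but are not the final PR's code):

// Illustrative sketch: a volatile long gives cross-thread visibility without the
// AtomicLong wrapper. Note the read-then-write below is not atomic, so this is only
// equivalent if racing writers occasionally overwriting each other's timestamp is acceptable.
final class LastSuccessfulRequestClock {
    private volatile long lastSuccessfulRequestTimestamp = 0L;

    long updateLastSuccessfulRequestTimestamp(long timeStamp) {
        long previous = lastSuccessfulRequestTimestamp;
        lastSuccessfulRequestTimestamp = timeStamp;
        return previous;
    }

    long getLastSuccessfulRequestTimestamp() {
        return lastSuccessfulRequestTimestamp;
    }
}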
private boolean evaluateThroughputDegradationLimitsBreached(PerformanceTracker performanceTracker, StatsTracker statsTracker,
                                                            double degradationLimits) {
    double throughputMovingAverage = Double.longBitsToDouble(performanceTracker.getThroughputMovingAverage());
    long throughputMovingQueueSize = performanceTracker.getThroughputMovingQueueSize();
    double throughputHistoricalAverage = (double)statsTracker.getTotalBytes() / performanceTracker.getLatencyInMillis();
    return throughputMovingAverage > 0 && throughputMovingQueueSize >= this.shardIndexingPressureSettings.getRequestSizeWindow() &&
        throughputHistoricalAverage / throughputMovingAverage > degradationLimits;
}

/**
 * This evaluation returns true if the difference in the current timestamp and last successful request timestamp is greater than
 * the successful request elapsed-timeout threshold, and the total number of outstanding requests is greater than
 * the maximum outstanding request-count threshold.
 */
private boolean evaluateLastSuccessfulRequestDurationLimitsBreached(PerformanceTracker performanceTracker, long requestStartTime) {
    return (performanceTracker.getLastSuccessfulRequestTimestamp() > 0) &&
        (requestStartTime - performanceTracker.getLastSuccessfulRequestTimestamp()) > this.successfulRequestElapsedTimeout &&
        performanceTracker.getTotalOutstandingRequests() > this.maxOutstandingRequests;
}
Let's make them package-private to test the logic out.
The effects of these methods are already tested under the tests for the public methods of the class, such as tests with suffixes:
SoftLimitBreachedAndLastSuccessfulRequestLimitRejections
SoftLimitBreachedAndLessOutstandingRequestsAndNoLastSuccessfulRequestLimitRejections
SoftLimitBreachedAndThroughputDegradationLimitRejections
SoftLimitBreachedAndMovingAverageQueueNotBuildUpAndNoThroughputDegradationLimitRejections
SoftLimitBreachedAndNoSecondaryParameterBreachedAndNodeLevelRejections
    }
}

private boolean increaseShardLimits(ShardId shardId, long nodeLimit,
Let's add tests for this.
All the paths for this are already covered in the tests for the public contracts of the class, such as isCoordinatingShardLimitBreached, for example tests with suffixes:
ShardLimitsNotBreached
ShardLimitsIncreasedAndSoftLimitNotBreached
SoftLimitNotBreachedAndNodeLevelRejections
SoftLimitBreachedAndNodeLevelRejections
SoftLimitBreachedAndNoSecondaryParameterBreachedAndNodeLevelRejections
tracker2.compareAndSetPrimaryAndCoordinatingLimits(tracker2.getPrimaryAndCoordinatingLimits(), 6 * 1024);
long limit1 = tracker1.getPrimaryAndCoordinatingLimits();
long limit2 = tracker2.getPrimaryAndCoordinatingLimits();
long requestStartTime = System.currentTimeMillis();
Use System.nanoTime for logical times.
Sure. I see System.currentTimeMillis frequently used across tests. Given the benefits, other than precision, is there any specific reason for the recommendation?
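For context, the usual reasoning behind preferring System.nanoTime for elapsed-time measurements (an illustrative snippet, not from the PR):

// System.currentTimeMillis() follows the wall clock, which can jump backwards or
// forwards (NTP corrections, manual clock changes); System.nanoTime() is monotonic
// within a JVM, so elapsed durations derived from it never go negative.
final class ElapsedTimeExample {
    public static void main(String[] args) throws InterruptedException {
        long start = System.nanoTime();
        Thread.sleep(50);                                   // stand-in for real work
        long elapsedMillis = (System.nanoTime() - start) / 1_000_000L;
        System.out.println("elapsed ~" + elapsedMillis + " ms");
    }
}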
if(((double)shardCurrentBytes / currentShardLimit) > this.upperOperatingFactor) {
    newShardLimit = (long)(shardCurrentBytes / this.optimalOperatingFactor);
    long totalShardLimitsExceptCurrentShard = this.shardIndexingPressureStore.getShardIndexingPressureHotStore()
        .entrySet().stream()
        .filter(entry -> (shardId != entry.getKey()))
        .map(Map.Entry::getValue)
        .mapToLong(getShardLimitFunction).sum();
This might turn out to be costly under high concurrency of incoming traffic.
These are pure computations, without any allocation or synchronisation overhead.
Yes, I meant computation overhead; even that has a cost and time complexity.
Yes, we have done Rally benchmarking for the OpenSearch indexing path with the backpressure feature enabled. I will be sharing the results once we have the framework level constructs rolled out as well.
Signed-off-by: Saurabh Singh <sisurab@amazon.com>
✅ DCO Check Passed 1546338
✅ Gradle Wrapper Validation success 1546338
✅ Gradle Precommit success 1546338
 *
 */
public class ShardIndexingPressureMemoryManager {
    private final Logger logger = LogManager.getLogger(getClass());
nit: static
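For illustration, the static form of the logger field suggested here (a sketch, not the final PR's code):

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

// Illustrative sketch: a static final logger is created once per class rather than
// once per instance.
public class ShardIndexingPressureMemoryManager {
    private static final Logger logger = LogManager.getLogger(ShardIndexingPressureMemoryManager.class);
}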
public static final Setting<Integer> SUCCESSFUL_REQUEST_ELAPSED_TIMEOUT =
    Setting.intSetting("shard_indexing_pressure.secondary_parameter.successful_request.elapsed_timeout", 300000,
        Setting.Property.NodeScope, Setting.Property.Dynamic);
Use Setting<TimeValue> instead; the current timeout doesn't capture units.
It's better if you name secondary_parameter differently, else it gets hard for users to understand and tune.
++
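For illustration, a time-based variant of the setting (a sketch assuming OpenSearch's Setting.positiveTimeSetting and TimeValue, not necessarily the exact final declaration):

import org.opensearch.common.settings.Setting;
import org.opensearch.common.unit.TimeValue;

// Illustrative sketch: a TimeValue-backed setting carries its unit, so users can
// configure values such as "300s" or "5m" instead of a bare millisecond count.
public static final Setting<TimeValue> SUCCESSFUL_REQUEST_ELAPSED_TIMEOUT =
    Setting.positiveTimeSetting("shard_indexing_pressure.secondary_parameter.successful_request.elapsed_timeout",
        TimeValue.timeValueMillis(300000),
        Setting.Property.NodeScope, Setting.Property.Dynamic);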
public static final Setting<Double> THROUGHPUT_DEGRADATION_LIMITS =
    Setting.doubleSetting("shard_indexing_pressure.secondary_parameter.throughput.degradation_factor", 5.0d, 1.0d,
        Setting.Property.NodeScope, Setting.Property.Dynamic);

/**
 * The node level soft limit determines when the secondary parameters for shard is to be evaluated for degradation.
 */
public static final Setting<Double> NODE_SOFT_LIMIT =
    Setting.doubleSetting("shard_indexing_pressure.primary_parameter.node.soft_limit", 0.7d, 0.0d,
        Setting.Property.NodeScope, Setting.Property.Dynamic);
Both these setting names are non-intuitive
I have tried grouping the settings in two buckets, primary_parameter and secondary_parameter, for ease of classification and tuning. Let me know if you have any specific suggestions. Moreover, we will have documentation with a description and role for each setting, covering more details, as part of the rollout. This should bring clarity around usage, with examples.
Signed-off-by: Saurabh Singh <sisurab@amazon.com>
✅ Gradle Wrapper Validation success 5276ee8
✅ DCO Check Passed 5276ee8
✅ Gradle Precommit success 5276ee8
Thanks. Just check on the performance of the while loop for highly concurrent requests during load tests.
It introduces a Memory Manager for Shard Indexing Pressure. It is responsible for increasing and decreasing the allocated shard limit based on incoming requests, and for validating the current values against the thresholds. Signed-off-by: Saurabh Singh <sisurab@amazon.com>
Shard level indexing pressure improves the current Indexing Pressure framework, which performs memory accounting at the node level and rejects requests. This takes a step further to have rejections based on memory accounting at the shard level, along with other key performance factors like throughput and last successful requests.

**Key features**
- Granular tracking of indexing task performance, at every shard level, for each node role i.e. coordinator, primary and replica.
- Smarter rejections by discarding the requests intended only for the problematic index or shard, while still allowing others to continue (fairness in rejection).
- Rejection thresholds governed by a combination of configurable parameters (such as memory limits on the node) and dynamic parameters (such as latency increase, throughput degradation).
- Node level and shard level indexing pressure statistics exposed through the stats API.
- Integration of indexing pressure stats with plugins for metric visibility and auto-tuning in future.
- Control knobs to tune the key performance thresholds which control rejections, to address any specific requirement or issues.
- Control knobs to run the feature in shadow-mode or enforced-mode. In shadow-mode only internal rejection breakdown metrics will be published while no actual rejections will be performed.

The changes were divided into small manageable chunks as part of the following PRs against a feature branch.
- Add Shard Indexing Pressure Settings. #716
- Add Shard Indexing Pressure Tracker. #717
- Refactor IndexingPressure to allow extension. #718
- Add Shard Indexing Pressure Store #838
- Add Shard Indexing Pressure Memory Manager #945
- Add ShardIndexingPressure framework level construct and Stats #1015
- Add Indexing Pressure Service which acts as orchestrator for IP #1084
- Add plumbing logic for IndexingPressureService in Transport Actions. #1113
- Add shard indexing pressure metric/stats via rest end point. #1171
- Add shard indexing pressure integration tests. #1198

Signed-off-by: Saurabh Singh <sisurab@amazon.com>
Co-authored-by: Saurabh Singh <sisurab@amazon.com>
Co-authored-by: Rabi Panda <adnapibar@gmail.com>
Signed-off-by: Saurabh Singh sisurab@amazon.com
Description
This PR is the 5th of multiple PRs planned for Shard Indexing Pressure (#478). It introduces a Memory Manager for Shard Indexing Pressure, which is responsible for increasing and decreasing the allocated shard limit based on incoming requests, and for validating the current values against the thresholds.
Issues Resolved
Addresses Item 5 of #478
ToDo before we move from draft to complete
Check List
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.