-
Notifications
You must be signed in to change notification settings - Fork 1.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add framework level constructs to track shard indexing pressure. #496
Add framework level constructs to track shard indexing pressure. #496
Conversation
✅ Gradle Wrapper Validation success 225fb0fc3a4b4c2e22599811861434e5bb519429 |
❌ DCO Check Failed 225fb0fc3a4b4c2e22599811861434e5bb519429 |
✅ Gradle Precommit success 225fb0fc3a4b4c2e22599811861434e5bb519429 |
Signed-off-by: Saurabh Singh <sisurab@amazon.com>
225fb0f
to
7a80862
Compare
✅ Gradle Wrapper Validation success 7a80862 |
✅ DCO Check Passed 7a80862 |
✅ Gradle Precommit success 7a80862 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the changes. Few comments some good to have.
It might be good to see some unit tests around ShardIndexingPressureMemoryManager
while I see we have ITs around this
@@ -190,6 +193,22 @@ public MasterService getMasterService() { | |||
return masterService; | |||
} | |||
|
|||
/** | |||
* Getter and Setter for Indexing Pressure, This method is added specifically for getting IndexingPressure | |||
* instance in ODFE PA plugin via ClusterService. Indexing Pressure instances can be accessible only via |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Might be good to reword this as This method exposes IndexingPressure stats to other plugins
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure
long shardCombinedBytes = tracker.currentCombinedCoordinatingAndPrimaryBytes.addAndGet(bytes); | ||
|
||
boolean shardLevelLimitBreached = false; | ||
if (!forceExecution) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Following the convention lets use forceExecution == false
and elsewhere
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure, will update across.
* 4. Calling methods of {@link ShardIndexingPressureMemoryManager} to evaluate if a request can be process successfully | ||
* and can increase the memory limits for a shard under certain scenarios | ||
*/ | ||
public class ShardIndexingPressure { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Serves as a good candidate for extending IndexingPressure
. Maybe good to extend
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We intend to keep ShardIndexingPressure
as a sub feature and is contained with IndexingPressure
. Similarly other sub-features or related improvements can be added to IndexingPressure
, while keeping IndexingPressure
as a top level object for accessibility in plugins/actions. Hence will like to keep it same and can revisit this refactoring later if really needed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Have introduced IndexingPressureService
which acts as an orchestrator for method invocation of ShardIndexingPressure
vs IndexingPressure
(base), based on the dynamic setting value. Going forward IndexingPressureService
can be the required construct for exposing lister/interfaces or building new constructs in parallel to current one for back-pressure.
public static final Setting<Boolean> SHARD_INDEXING_PRESSURE_ENABLED = | ||
Setting.boolSetting("aes.shard_indexing_pressure.enabled", false, Setting.Property.Dynamic, Setting.Property.NodeScope); | ||
|
||
/** | ||
* Feature level setting to operate in shadow-mode or in enforced-mode. If enforced field is set to true, shard level | ||
* rejection will be performed, otherwise only rejection metrics will be populated. | ||
*/ | ||
public static final Setting<Boolean> SHARD_INDEXING_PRESSURE_ENFORCED = | ||
Setting.boolSetting("aes.shard_indexing_pressure.enforced", false, Setting.Property.Dynamic, Setting.Property.NodeScope); | ||
|
||
// This represents the last N request samples that will be considered for secondary parameter evaluation. | ||
public static final Setting<Integer> REQUEST_SIZE_WINDOW = | ||
Setting.intSetting("aes.shard_indexing_pressure.secondary_parameter.throughput.request_size_window", 2000, | ||
Setting.Property.NodeScope, Setting.Property.Dynamic); | ||
|
||
//Each shard will be initially given 1/1000th bytes of node limits. | ||
public static final Setting<Double> SHARD_MIN_LIMIT = | ||
Setting.doubleSetting("aes.shard_indexing_pressure.primary_parameter.shard.min_limit", 0.001d, 0.0d, | ||
Setting.Property.NodeScope, Setting.Property.Dynamic); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Lets check if aes
is still complaint to be used. Good to have enabled/enforced as discrete values supported against the same settings
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will confirm on the aes
compliance. It provides control access from managed service perspective.
We have kept enabled & enforced
as boolean settings for intuitiveness and simplicity. We also envision that in ideal state enforced
as a setting can be independently deprecated faster, once we achieve a self sustainable model, hence easier cleanup/maintenance.
Let me know if you think otherwise.
* | ||
* For more details on 6,7,8,9,10,11 see {@link ShardIndexingPressureMemoryManager} | ||
*/ | ||
public class ShardIndexingPressureTracker { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Might be good to break down the tracker
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ShardIndexingPressureTracker
is currently a simple object which acts as a container for about 12 fields related to shard level information. These are directly accessed/updated and breaking them further would add more indirections. Since the fields are not complex (only long values), we have clubbed them together with comments for added maintainability.
I believe we will learn from the first release, to add or remove more fields, and then based on the need we can break it down further.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The code blocks on related properties itself seem to suggest it can be broken down
In the spirit of encapsulation, lets also make fields private
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Makes sense. Have introduced multiple inner classes by logically clubbing fields to add maintainability inside ShardIndexingPressureTracker
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's open a follow up on this. I propose something like
private RoleTracker primaryRoleTracker;
private RoleTracker coordinatingRoleTracker;
private RoleTracker replicaRoleTracker;
private CommonTracker commonTracker;
class RoleTracker {
private final StatsTracker statsTracker = new StatsTracker();
private final PerformanceTracker performanceTracker = new PerformanceTracker();
}
class PerformanceTracker {
private final ConcurrentLinkedQueue<Double> throughputMovingQueue = new ConcurrentLinkedQueue();
private final AtomicLong throughputMovingAverage = new AtomicLong();
private final AtomicLong outstandingRequests = new AtomicLong();
private final AtomicLong lastSuccessfulRequestTimestamp = new AtomicLong();
private final AtomicLong lastSuccessfulRequestTimestampNew = new AtomicLong();
}
class StatsTracker {
private final AtomicLong timeInMillis = new AtomicLong();
private final AtomicLong count = new AtomicLong();
private final AtomicLong rejection = new AtomicLong();
}
class CommonTracker {
//common tracking properties go here
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
++ Have added a list item in #478 to revisit the modelling.
* current shard utilization. | ||
* | ||
*/ | ||
public class ShardIndexingPressureMemoryManager { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe changing limits on-demand when we are on the verge of soft limits breaching, would help simplify computations
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree there is some scope of tuning and multiple ways with respect to how memory management can be done. Its hard to arrive at a perfect solution in the first go. I will like to keep this approach for the first release as planned, and then based on the learnings from the shadow/enforced mode metrics, we can improve the approaches around. We have performed performance and load testing around the changes using rally workload and have found no regressions.
…of comments. Signed-off-by: Saurabh Singh <sisurab@amazon.com>
✅ DCO Check Passed afabf25 |
✅ Gradle Wrapper Validation success afabf25 |
✅ Gradle Precommit success afabf25 |
Signed-off-by: Dharmesh 💤 <sdharms@amazon.com>
✅ Gradle Wrapper Validation success 71c4018 |
✅ DCO Check Passed 71c4018 |
✅ Gradle Precommit success 71c4018 |
private final Logger logger = LogManager.getLogger(getClass()); | ||
|
||
private final IndexingPressure indexingPressure; | ||
private static ClusterService clusterService; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Any reason why this is static
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is used to access & verify the node level attribute for shard-indexing-pressure for ser/de from Node Stats.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm yet to review tests. I'll share more comments as I look through deeper
* | ||
* For more details on 6,7,8,9,10,11 see {@link ShardIndexingPressureMemoryManager} | ||
*/ | ||
public class ShardIndexingPressureTracker { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The code blocks on related properties itself seem to suggest it can be broken down
In the spirit of encapsulation, lets also make fields private
ShardIndexingPressureSettings.SHARD_INDEXING_PRESSURE_ENABLED, | ||
ShardIndexingPressureSettings.SHARD_INDEXING_PRESSURE_ENFORCED, | ||
ShardIndexingPressureSettings.REQUEST_SIZE_WINDOW, | ||
ShardIndexingPressureSettings.SHARD_MIN_LIMIT, | ||
ShardIndexingPressureStore.MAX_CACHE_STORE_SIZE, | ||
ShardIndexingPressureMemoryManager.LOWER_OPERATING_FACTOR, | ||
ShardIndexingPressureMemoryManager.OPTIMAL_OPERATING_FACTOR, | ||
ShardIndexingPressureMemoryManager.UPPER_OPERATING_FACTOR, | ||
ShardIndexingPressureMemoryManager.NODE_SOFT_LIMIT, | ||
ShardIndexingPressureMemoryManager.THROUGHPUT_DEGRADATION_LIMITS, | ||
ShardIndexingPressureMemoryManager.SUCCESSFUL_REQUEST_ELAPSED_TIMEOUT, | ||
ShardIndexingPressureMemoryManager.MAX_OUTSTANDING_REQUESTS, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These might need doc update whenever we have one
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
++
@@ -732,6 +737,14 @@ protected Node(final Environment initialEnvironment, | |||
} | |||
} | |||
|
|||
private static Settings addShardIndexingBackPressureAttributeSettings(Settings settings) { | |||
// Shard Indexing BackPressure is enabled from AES-7.9 onwards. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Lets scan through all AES specific interactions
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for pointing this. Have verified and removed
@@ -586,7 +590,8 @@ protected Node(final Environment initialEnvironment, | |||
final SearchTransportService searchTransportService = new SearchTransportService(transportService, | |||
SearchExecutionStatsCollector.makeWrapper(responseCollectorService)); | |||
final HttpServerTransport httpServerTransport = newHttpTransport(networkModule); | |||
final IndexingPressure indexingLimits = new IndexingPressure(settings); | |||
final IndexingPressure indexingLimits = new IndexingPressure(settings, clusterService); | |||
clusterService.setIndexingPressure(indexingLimits); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Lets think for a more generic way to expose metrics preferably through a ShardIndexingPressureListener
that exposes methods to be extended by the plugins.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ideally we want plugins to be able to do an on-demand fetch and get the current view of the system. This allows a quick way for plugin for now. Event listener for shard tracker might get too overwhelmed by huge amount of concurrent updates during indexing workload. Will need to think through on how we can do this in a generic and optimised way. Added item to the the meta issue #478 list for a followup.
String ShardIndexingBackPressureEnabledValue = "true"; | ||
return Settings.builder().put(settings) | ||
.put(NODE_ATTRIBUTES.getKey() + SHARD_INDEXING_PRESSURE_ENABLED_ATTRIBUTE_KEY, ShardIndexingBackPressureEnabledValue) | ||
.build(); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wondering if this can go in the yaml instead of code here
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Keeping this in code prevent unintentional updates or issues due to yml copy etc, as this can lead to severe serialisation issue otherwise.
.getShardPrimaryAndCoordinatingBaseLimits()); | ||
newShardIndexingPressureTracker.replicaLimits.set(this.shardIndexingPressureSettings.getShardReplicaBaseLimits()); | ||
// Try update the new shard stat to the hot store | ||
tracker = shardIndexingPressureHotStore.putIfAbsent((long) shardId.hashCode(), newShardIndexingPressureTracker); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Any reason why we couldn't have Map<ShardId, ShardIndexingPressureTracker>
Should primary and replica shards be treated independently
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ShardId
provides hashcode directly which saves on equals()
comparison . Yes primary and replica trackers are treated independently.
public static final Setting<Integer> MAX_CACHE_STORE_SIZE = | ||
Setting.intSetting("aes.shard_indexing_pressure.cache_store.max_size", 200, Setting.Property.NodeScope, Setting.Property.Dynamic); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we limit the cache store with a max limit to prevent misuse. Watch out on the settings name
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good points. I have updated to keep max as 1000.
ConcurrentCollections.newConcurrentMapWithAggressiveConcurrency(); | ||
private final ShardIndexingPressureSettings shardIndexingPressureSettings; | ||
|
||
private int maxColdStoreSize; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
volatile if you want it dynamic?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Was wondering if we should optimise for reads since concurrent updates will be rare for this. Have updated this as volatile now.
private ShardIndexingPressureTracker getIndexingPressureTrackerFromColdStore(ShardId shardId) { | ||
return shardIndexingPressureColdStore.get((long)shardId.hashCode()); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Lets inline this
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
++
server/src/main/java/org/opensearch/index/ShardIndexingPressureStore.java
Outdated
Show resolved
Hide resolved
return false; | ||
} | ||
|
||
boolean isReplicaShardLimitBreached(ShardIndexingPressureTracker tracker, long requestStartTime, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A lot of scope for breaking this method down and abstracting common logic out across
- isCoordinatingShardLimitBreached
- isReplicaShardLimitBreached
- isPrimaryShardLimitBreached
Please reconsider as bugs are very likely to creep in due to manual maintenance overhead keeping logic in sync
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If you look on the logic part, the common logic such as increaseShardReplicaLimits
, evaluateLastSuccessfulRequestDurationLimitsBreached
, evaluateThroughputDegradationLimitsBreached
are already abstracted and reused across. Other part of the code vary largely on the tracker fields they operate and update. Breaking it further will hamper readability as well. With abstraction of Tracker
now it should be easier to maintain the logic.
if(randomBoolean) { | ||
assertFalse(memoryManager.isCoordinatingShardLimitBreached(tracker, requestStartTime, hotStore, 1 * 1024)); | ||
} else { | ||
assertFalse(memoryManager.isPrimaryShardLimitBreached(tracker, requestStartTime, hotStore, 1 * 1024)); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why can't we assert both. Lets remove randomBoolean since this is not helping with randomized tests
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
++
if(randomBoolean) { | ||
assertTrue(memoryManager.isCoordinatingShardLimitBreached(tracker1, requestStartTime, hotStore, 6 * 1024)); | ||
assertEquals(1, tracker1.coordinatingNodeLimitsBreachedRejections.get()); | ||
assertEquals(0, tracker2.coordinatingNodeLimitsBreachedRejections.get()); | ||
} else { | ||
assertTrue(memoryManager.isPrimaryShardLimitBreached(tracker1, requestStartTime, hotStore, 6 * 1024)); | ||
assertEquals(1, tracker1.primaryNodeLimitsBreachedRejections.get()); | ||
assertEquals(0, tracker2.primaryNodeLimitsBreachedRejections.get()); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Lets remove randomization and assert all
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
++
✅ DCO Check Passed 68775510a541ccd2738936be9722de41b5bbdf7c |
✅ Gradle Wrapper Validation success 68775510a541ccd2738936be9722de41b5bbdf7c |
✅ Gradle Wrapper Validation success 907e05235eb0e4b209fdaac128ac1be49cfa8fb5 |
✅ DCO Check Passed 907e05235eb0e4b209fdaac128ac1be49cfa8fb5 |
✅ Gradle Precommit success 907e05235eb0e4b209fdaac128ac1be49cfa8fb5 |
Signed-off-by: Saurabh Singh <sisurab@amazon.com>
907e052
to
536e5a5
Compare
✅ Gradle Wrapper Validation success 536e5a5 |
✅ DCO Check Passed 536e5a5 |
✅ Gradle Precommit success 536e5a5 |
…objective better. Signed-off-by: Saurabh Singh <sisurab@amazon.com>
b42d491
to
0de959a
Compare
❌ Gradle Precommit failure b42d491982552fa49117ce19d24e775030513acd |
❌ Gradle Wrapper Validation failure b42d491982552fa49117ce19d24e775030513acd :alert: Gradle Wrapper integrity has been altered |
✅ DCO Check Passed 0de959a |
✅ Gradle Wrapper Validation success 0de959a |
✅ Gradle Precommit success 0de959a |
✅ DCO Check Passed 13abb7ab3cc59b37155c18a340ffd1d44c5d5701 |
❌ Gradle Precommit failure 13abb7ab3cc59b37155c18a340ffd1d44c5d5701 |
❌ Gradle Wrapper Validation failure 13abb7ab3cc59b37155c18a340ffd1d44c5d5701 :alert: Gradle Wrapper integrity has been altered |
260c59e
to
13abb7a
Compare
32f0926
to
0de959a
Compare
✅ DCO Check Passed 0de959a |
32f0926
to
0de959a
Compare
✅ Gradle Precommit success 0de959a |
396f8b9
to
0de959a
Compare
Signed-off-by: Dharmesh 💤 <sdharms@amazon.com>
✅ DCO Check Passed 14426c2 |
✅ Gradle Wrapper Validation success 14426c2 |
✅ Gradle Precommit success 14426c2 |
Closing this in favour of the new PR #525 as the fork repository changed post public launched. |
Description
This PR is 1st of the 4 planned PRs for Shard Indexing Pressure ((#478). It provides the framework level constructs to track shard indexing pressure.
Shard Indexing Pressure introduces smart rejections of indexing requests when there are too many stuck/slow requests in the cluster, breaching key performance thresholds. This prevents the nodes in cluster to run into cascading effects of failures.
Co-authored-by: Dharmesh Singh sdharms@amazon.com
Issues Resolved
Addresses Item 1 of #478
Check List
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.