Add Shard Indexing Pressure Store (#478) #838
Conversation
✅ Gradle Wrapper Validation success 5e0cb9fa02d46309ac3ef5b18da4f837d08292a0
✅ DCO Check Passed 5e0cb9fa02d46309ac3ef5b18da4f837d08292a0
✅ Gradle Precommit success 5e0cb9fa02d46309ac3ef5b18da4f837d08292a0
Thanks, please try to address the concurrency concerns and add multi-threaded tests against those.
```java
tracker = shardIndexingPressureHotStore.putIfAbsent(shardId, newShardIndexingPressureTracker);
// Update the tracker so that we use the one actually in the hot store
tracker = tracker == null ? newShardIndexingPressureTracker : tracker;
// Write through into the cold store for future reference
updateShardIndexingPressureColdStore(tracker);
```
These operations are not executed atomically.
Suggested change:

```java
tracker = shardIndexingPressureHotStore.computeIfAbsent(shardId, (k) -> {
    ShardIndexingPressureTracker newTracker = new ShardIndexingPressureTracker(shardId,
        this.shardIndexingPressureSettings.getShardPrimaryAndCoordinatingBaseLimits(),
        this.shardIndexingPressureSettings.getShardReplicaBaseLimits());
    updateIndexingPressureColdStore(newTracker);
    return newTracker;
});
```
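The distinction the suggestion is driving at can be sketched with plain `ConcurrentHashMap` calls (the class and value names below are hypothetical stand-ins, not the actual OpenSearch types): the two-step `putIfAbsent` plus write-through leaves a window between the two map operations, while `computeIfAbsent` runs the factory at most once per absent key and publishes the mapping atomically with its creation.

```java
import java.util.concurrent.ConcurrentHashMap;

public class StoreSketch {
    static final ConcurrentHashMap<String, String> hot = new ConcurrentHashMap<>();

    // Two-step variant: the tracker is allocated even when one already
    // exists, and any write-through to a second map is a separate,
    // non-atomic step that another thread can race with.
    static String getOrCreateTwoStep(String shardId) {
        String fresh = "tracker-" + shardId;
        String prev = hot.putIfAbsent(shardId, fresh);  // atomic on the hot map only
        String tracker = (prev == null) ? fresh : prev;
        // ... cold-store write-through would happen here, non-atomically ...
        return tracker;
    }

    // computeIfAbsent variant: the factory runs at most once per absent
    // key, so all callers observe the same tracker instance.
    static String getOrCreateAtomic(String shardId) {
        return hot.computeIfAbsent(shardId, k -> "tracker-" + k);
    }

    public static void main(String[] args) {
        String first = getOrCreateAtomic("s1");
        String second = getOrCreateAtomic("s1");
        if (first != second) throw new AssertionError("expected the same instance");
        if (getOrCreateTwoStep("s1") != first) throw new AssertionError("putIfAbsent should keep the existing mapping");
        System.out.println("ok");
    }
}
```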
`computeIfAbsent` on a `ConcurrentHashMap` is a complex and relatively costly operation (it synchronizes on the bin). Keeping this non-atomic is a deliberate, use-case-driven choice: we need the `get` operations to stay highly concurrent, since they are the frequent path and must absorb a sudden spike in traffic for a new shard.
Let me know if you see any concerns with the current approach if guarantees are not being met. Will be happy to accommodate.
https://vmlens.com/articles/scale/scalability_hash_map/ Gets are just volatile reads, if that helps. See the benchmarks.
What needs thought is that you are using a `ConcurrentHashMap` and you seem to get away without synchronization. You should also look into the implementation of `putIfAbsent`.
Agree, `putIfAbsent` on a map is unavoidable here, while the need to maintain atomicity across the two hash-map operations is avoidable.
```java
// Try inserting into cold store again in case there was an eviction triggered
shardIndexingPressureColdStore.putIfAbsent(tracker.getShardId(), tracker);
// Remove from the hot store
shardIndexingPressureHotStore.remove(tracker.getShardId(), tracker);
```
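One detail worth noting about the cleanup above: the two-argument `ConcurrentHashMap.remove(key, value)` only removes the entry if the key is still mapped to a value equal to the one passed (which is identity for tracker objects that do not override `equals`), so a tracker re-created in the hot store by a concurrent thread is not accidentally dropped. A minimal sketch, with plain strings standing in for tracker objects:

```java
import java.util.concurrent.ConcurrentHashMap;

public class RemoveSketch {
    public static void main(String[] args) {
        ConcurrentHashMap<String, String> hot = new ConcurrentHashMap<>();
        hot.put("shard-0", "trackerA");

        // Conditional remove: fails, because shard-0 is not mapped to trackerB.
        boolean removedOther = hot.remove("shard-0", "trackerB");
        // Succeeds: shard-0 is still mapped to trackerA.
        boolean removed = hot.remove("shard-0", "trackerA");

        if (removedOther) throw new AssertionError("should not remove a non-matching value");
        if (!removed) throw new AssertionError("should remove the matching value");
        if (!hot.isEmpty()) throw new AssertionError("map should now be empty");
        System.out.println("ok");
    }
}
```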
This operation is again not atomic; consider `computeIfAbsent` if this is needed.
Same as above: tracker objects will be evaluated for release after every operation, and we don't want to use synchronization constructs unnecessarily. Not keeping it atomic works well here.
Depends on how we consume this, but this opens room for two distinct shard tracker objects to be tracked concurrently.
It will be used by the memory manager piece in a follow-up PR. A shard tracker, as long as it is live, will always be returned from the hot store during get operations.
```java
    return Collections.unmodifiableMap(shardIndexingPressureColdStore);
}

public void tryTrackerCleanupFromHotStore(ShardIndexingPressureTracker tracker, Supplier<Boolean> condition) {
```
Use `BooleanSupplier` instead?
++
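For reference, a hypothetical simplification of the signature change being agreed to here: `BooleanSupplier` is the primitive specialisation of `Supplier<Boolean>`, so each evaluation returns a bare `boolean` instead of a boxed `Boolean` (the method names below are illustrative, not the PR's actual code).

```java
import java.util.function.BooleanSupplier;
import java.util.function.Supplier;

public class CleanupSketch {
    // Hypothetical stand-in for tryTrackerCleanupFromHotStore's condition check:
    // getAsBoolean() yields a primitive boolean, no boxing involved.
    static boolean shouldClean(BooleanSupplier condition) {
        return condition.getAsBoolean();
    }

    // The Supplier<Boolean> variant returns a boxed Boolean (cached by the
    // JVM, but still an extra object indirection and auto-unboxing step).
    static boolean shouldCleanBoxed(Supplier<Boolean> condition) {
        return condition.get();
    }

    public static void main(String[] args) {
        if (!shouldClean(() -> 1 + 1 == 2)) throw new AssertionError();
        if (shouldCleanBoxed(() -> false)) throw new AssertionError();
        System.out.println("ok");
    }
}
```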
```java
private void updateShardIndexingPressureColdStore(ShardIndexingPressureTracker tracker) {
    if (shardIndexingPressureColdStore.size() > maxColdStoreSize) {
        shardIndexingPressureColdStore.clear();
    }
    shardIndexingPressureColdStore.put(tracker.getShardId(), tracker);
}
```
- There seems to be a concurrency bug here: you'll end up clearing the store multiple times and losing the previous entries after a clear. Hence please follow the suggestion made above.
- Also, the very idea of clearing everything when full isn't ideal; since you transition from hot to cold, the transition has rough edges.
- It is not a bug but an intended approach to clear up trackers once they are no longer in use, which allows us to reclaim the memory. In most indexing scenarios, indexes are expected to rotate, so new trackers will be created per shard to track the new operations. A tracker not present in the hot store will eventually be removed from the cold store once the purge happens; trackers already in the hot store will be written back to the cold store upon request completion. The cold store essentially acts as a transient store between successive operations for the same shard that are spaced apart. At any point, customers would use the hot trackers to track current cluster performance, stuck tasks, hot shards, etc. The size of the cold store matters for avoiding unnecessary churn, and it is provided as a dynamic setting for tuning.
- It is important to size the `shard_indexing_pressure.cache_store.max_size` setting correctly, to ensure unnecessary purges don't happen for the number of indexes being concurrently updated. LRU could be an alternative to a full flush, but it is more expensive, so I am choosing simplicity for now since it serves the purpose well. I have already added a note to revisit this later, evaluate, and change to LRU if need be, based on feedback.
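The LRU alternative mentioned above could, for illustration, be sketched with an access-ordered `LinkedHashMap` that evicts one eldest entry at a time instead of flushing everything. Note that this class is not thread-safe, which is exactly the extra cost alluded to: it would need external synchronization, unlike the lock-free reads of the current `ConcurrentHashMap` approach.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical LRU cold store: evicts only the least-recently-accessed
// entry when the cap is exceeded, rather than clearing the whole map.
public class LruColdStore<K, V> extends LinkedHashMap<K, V> {
    private final int maxSize;

    public LruColdStore(int maxSize) {
        super(16, 0.75f, true); // true = iterate in access order
        this.maxSize = maxSize;
    }

    @Override
    protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
        return size() > maxSize; // called after each put
    }

    public static void main(String[] args) {
        LruColdStore<Integer, String> store = new LruColdStore<>(2);
        store.put(1, "a");
        store.put(2, "b");
        store.get(1);      // touch 1, so 2 becomes the eldest entry
        store.put(3, "c"); // evicts only 2, the rest survive
        if (!(store.containsKey(1) && store.containsKey(3) && !store.containsKey(2))) {
            throw new AssertionError("expected {1, 3} after eviction of 2");
        }
        System.out.println("ok");
    }
}
```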
I don't think you got my point here. Let's discuss more.
I see your concern about a race between threads trying to clear the state simultaneously. However, I see that as a tradeoff to keep the approach less synchronized and simpler. The flush is an infrequent operation, can be sized based on workload, and it is okay not to have strong consistency on counters (under such rare races) as long as we can address the majority of use cases with ease.
Yes, let's walk through a few scenarios and discuss this more.
What is confusing is that you use the cold store as a collection with aggressive concurrency when, based on what you have mentioned, you don't need it.
It is required for the scenario where a large number of gets are requested concurrently for a tracker which either (1) does not exist and needs to be created and updated, or (2) exists in the cold store and needs to be added to the hot store. From a functional standpoint this is a traffic spike for a relatively new shard or a completely new shard (index rollover).
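The lookup path described here can be sketched roughly as follows (hypothetical method and value types, with strings standing in for tracker objects, not the actual PR code): check the hot store first via `computeIfAbsent`, re-use an idle tracker from the cold store if one exists, and create a fresh tracker only otherwise.

```java
import java.util.concurrent.ConcurrentHashMap;

public class LookupSketch {
    static final ConcurrentHashMap<String, String> hot = new ConcurrentHashMap<>();
    static final ConcurrentHashMap<String, String> cold = new ConcurrentHashMap<>();

    static String getTracker(String shardId) {
        return hot.computeIfAbsent(shardId, k -> {
            // Re-use an idle tracker from the cold store if one exists.
            String idle = cold.get(k);
            String tracker = (idle != null) ? idle : "tracker-" + k;
            // Write through so the cold store can serve future re-use.
            cold.put(k, tracker);
            return tracker;
        });
    }

    public static void main(String[] args) {
        cold.put("s1", "idle-s1");
        if (!getTracker("s1").equals("idle-s1")) throw new AssertionError("expected promotion from cold store");
        if (!getTracker("s2").equals("tracker-s2")) throw new AssertionError("expected a freshly created tracker");
        System.out.println("ok");
    }
}
```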
Please add a clarifying comment that such discrepancies are fine
done
Force-pushed from 5e0cb9f to 986cf2c
✅ Gradle Wrapper Validation success 986cf2ccd2d4ca9b27c910f25310d2b7f8285c97
✅ DCO Check Passed 986cf2ccd2d4ca9b27c910f25310d2b7f8285c97
Thanks @Bukhtawar for raising the concern about the need for strong guarantees from the framework's perspective when we do the full flush in the cold store. This is the case where multiple threads try to clear the store and a few additional trackers get cleared off. As discussed, this is a tradeoff we can take between (a) re-initialising a few more trackers versus (b) synchronising the purge path itself, and both are rare scenarios. Given that the cold store currently is just a means to re-use empty tracker objects and avoid re-initialisation under continuous workload, additional purges are not harmful and do not break functional consistency either: they will happen at most once per tracker object, even if it was purged, without any loss of state. Since we do not need this guarantee upfront today, I am skipping this addition for now. There is a note in the javadoc to evaluate any need to move to a pure LRU-based model if a real need is identified in the future, based on evolving use cases or community feedback.
✅ Gradle Precommit success 986cf2ccd2d4ca9b27c910f25310d2b7f8285c97
Signed-off-by: Saurabh Singh <sisurab@amazon.com>
Force-pushed from 986cf2c to adca87b
✅ Gradle Wrapper Validation success adca87b
✅ DCO Check Passed adca87b
✅ Gradle Precommit success adca87b
Let's avoid force-pushes; they drop the commit history.
```java
ShardIndexingPressureTracker newShardIndexingPressureTracker = new ShardIndexingPressureTracker(shardId,
    this.shardIndexingPressureSettings.getShardPrimaryAndCoordinatingBaseLimits(),
    this.shardIndexingPressureSettings.getShardReplicaBaseLimits());
```
Here we would be doing an unnecessary allocation if the tracker already exists. Suggest:
```java
tracker = shardIndexingPressureHotStore.computeIfAbsent(shardId, (k) -> {
    ShardIndexingPressureTracker newTracker = new ShardIndexingPressureTracker(shardId,
        this.shardIndexingPressureSettings.getShardPrimaryAndCoordinatingBaseLimits(),
        this.shardIndexingPressureSettings.getShardReplicaBaseLimits());
    return newTracker;
});
```
Yes. The tradeoff is that update operations attempted on this map by other threads will be blocked during the compute. But since this computation should be short and simple, it makes sense.
```java
}

assertEquals(0, store.getShardIndexingPressureHotStore().size());
assertTrue(store.getShardIndexingPressureColdStore().size() <= maxColdStoreSize + 1);
```
Sorry, but why do we need `maxColdStoreSize + 1`?
This is just for the boundary condition: cleanup happens only when the next insert is attempted after the limit is breached, since the condition in `shardIndexingPressureColdStore.size() > maxColdStoreSize` is `>` and not `>=`.
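The boundary can be seen with a tiny simulation of the insert path (hypothetical, with a plain `HashMap` standing in for the cold store): because the clear fires only when `size()` is already strictly greater than the limit, the store can legitimately hold `maxColdStoreSize + 1` entries between inserts, which is exactly what the test's `<= maxColdStoreSize + 1` allows for.

```java
import java.util.HashMap;
import java.util.Map;

public class BoundarySketch {
    static final int MAX = 3; // stand-in for maxColdStoreSize
    static final Map<Integer, String> cold = new HashMap<>();

    static void put(int key) {
        // '>' not '>=': the clear happens one insert after the limit is reached.
        if (cold.size() > MAX) {
            cold.clear();
        }
        cold.put(key, "t" + key);
    }

    public static void main(String[] args) {
        for (int i = 0; i < MAX + 1; i++) {
            put(i);                       // fills the store to MAX + 1 entries
        }
        if (cold.size() != MAX + 1) throw new AssertionError("limit can be exceeded by exactly one");
        put(MAX + 1);                     // this insert finally triggers the clear
        if (cold.size() != 1) throw new AssertionError("store holds only the latest entry after the clear");
        System.out.println("ok");
    }
}
```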
Signed-off-by: Saurabh Singh <sisurab@amazon.com>
✅ Gradle Wrapper Validation success 43b3bbc
✅ DCO Check Passed 43b3bbc
✅ Gradle Precommit success 43b3bbc
Merged 2c165f7 into opensearch-project:feature/478_indexBackPressure
* Add Shard Indexing Pressure Store (#478) Signed-off-by: Saurabh Singh <sisurab@amazon.com> * Added comments and shard allocation based on compute in hot store. Signed-off-by: Saurabh Singh <sisurab@amazon.com> Co-authored-by: Saurabh Singh <sisurab@amazon.com>
Shard-level indexing pressure improves the current Indexing Pressure framework, which performs memory accounting at the node level and rejects requests. This takes a step further by basing rejections on memory accounting at the shard level, along with other key performance factors like throughput and last successful requests.

**Key features**
- Granular tracking of indexing task performance at the shard level, for each node role, i.e. coordinator, primary, and replica.
- Smarter rejections by discarding only the requests intended for a problematic index or shard, while still allowing others to continue (fairness in rejection).
- Rejection thresholds governed by a combination of configurable parameters (such as memory limits on the node) and dynamic parameters (such as latency increase and throughput degradation).
- Node-level and shard-level indexing pressure statistics exposed through the stats API.
- Integration of indexing pressure stats with plugins for metric visibility and auto-tuning in the future.
- Control knobs to tune the key performance thresholds that control rejections, to address any specific requirement or issue.
- Control knobs to run the feature in shadow mode or enforced mode. In shadow mode, only internal rejection breakdown metrics are published while no actual rejections are performed.

The changes were divided into small manageable chunks as part of the following PRs against a feature branch.
- Add Shard Indexing Pressure Settings. #716
- Add Shard Indexing Pressure Tracker. #717
- Refactor IndexingPressure to allow extension. #718
- Add Shard Indexing Pressure Store. #838
- Add Shard Indexing Pressure Memory Manager. #945
- Add ShardIndexingPressure framework level construct and Stats. #1015
- Add Indexing Pressure Service which acts as orchestrator for IP. #1084
- Add plumbing logic for IndexingPressureService in Transport Actions. #1113
- Add shard indexing pressure metric/stats via rest end point. #1171
- Add shard indexing pressure integration tests. #1198

Signed-off-by: Saurabh Singh <sisurab@amazon.com>
Co-authored-by: Saurabh Singh <sisurab@amazon.com>
Co-authored-by: Rabi Panda <adnapibar@gmail.com>
Signed-off-by: Saurabh Singh sisurab@amazon.com
Description
This PR is the 4th of multiple PRs planned for Shard Indexing Pressure (#478). It introduces a store for the Shard Indexing Pressure Trackers which were introduced as part of #717. It helps manage the lifecycle of tracker objects and provides access to them.
Issues Resolved
Addresses Item 4 of #478
Check List
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.