
Add ShardIndexingPressure framework level construct and Stats #1015

Conversation

getsaurabh02
Member

This PR is the next among the PRs planned for Shard Indexing Pressure (#478). It introduces the main Shard Indexing Pressure construct and an implementation to access its stats. It is a framework-level artefact built on top of IndexingPressure to track incoming indexing requests per shard.

Overall ShardIndexingPressure provides:

  • Memory accounting at the shard level. This feature can be enabled/disabled using a dynamic setting.
  • Memory accounting at the node level. Tracking is done using the IndexingPressure constructs to support seamless feature toggling.
  • Interfaces to access the statistics (ShardIndexingPressureStats) for shard trackers.

Signed-off-by: Saurabh Singh sisurab@amazon.com

Description

[Describe what this change achieves]

Issues Resolved

Addresses Item 6 of #478

Check List

  • New functionality includes testing.
    • All tests pass
  • New functionality has been documented.
    • New functionality has javadoc added
  • Commits are signed per the DCO using --signoff

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

@opensearch-ci-bot
Collaborator

✅   Gradle Wrapper Validation success 8612f3174c405d698f30c4afa075bf9092a10dd6

@opensearch-ci-bot
Collaborator

✅   DCO Check Passed 8612f3174c405d698f30c4afa075bf9092a10dd6

@opensearch-ci-bot
Collaborator

✅   Gradle Precommit success 8612f3174c405d698f30c4afa075bf9092a10dd6

@getsaurabh02 getsaurabh02 force-pushed the 6_ShardIndexingPressure_Framework branch from 8612f31 to 76cfd74 on July 27, 2021 15:39
@opensearch-ci-bot
Collaborator

✅   DCO Check Passed 76cfd7404fa01406f9310a20c16ac204336668bc

@opensearch-ci-bot
Collaborator

✅   Gradle Wrapper Validation success 76cfd7404fa01406f9310a20c16ac204336668bc

@getsaurabh02
Member Author

getsaurabh02 commented Jul 27, 2021

ToDo's while we move this from draft to final:

  • Add tests for ShardIndexingPressureStats. Much of it is covered under the tests for ShardIndexingPressure currently.
  • Improve the tests added in ShardIndexingPressureConcurrentExecutionTests, as there seems to be some room left for refactoring.

… artefacts.

Signed-off-by: Saurabh Singh <sisurab@amazon.com>
@getsaurabh02 getsaurabh02 force-pushed the 6_ShardIndexingPressure_Framework branch from 76cfd74 to ca43fa1 on July 27, 2021 15:47
@opensearch-ci-bot
Collaborator

✅   DCO Check Passed ca43fa1

@opensearch-ci-bot
Collaborator

✅   Gradle Wrapper Validation success ca43fa1

@opensearch-ci-bot
Collaborator

❌   Gradle Precommit failure 76cfd7404fa01406f9310a20c16ac204336668bc
Log 856

@opensearch-ci-bot
Collaborator

✅   Gradle Precommit success ca43fa1

public Releasable markCoordinatingOperationStarted(ShardId shardId, long bytes, boolean forceExecution) {
    if (0 == bytes) { return () -> {}; }

    long requestStartTime = System.currentTimeMillis();
Collaborator

Use System.nanoTime() instead
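To illustrate the reviewer's point, a minimal standalone sketch (hypothetical names, not the PR's code): `System.currentTimeMillis()` follows the wall clock and can jump on clock adjustments, while `System.nanoTime()` is monotonic, so elapsed time should be taken as a difference of `nanoTime()` readings.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of elapsed-time measurement with the monotonic clock.
public class LatencySketch {
    public static long elapsedMillis(long startNanos, long endNanos) {
        // Convert the elapsed nanosecond interval to milliseconds.
        return TimeUnit.NANOSECONDS.toMillis(endNanos - startNanos);
    }

    public static void main(String[] args) throws InterruptedException {
        long start = System.nanoTime();
        Thread.sleep(10); // stand-in for the request being tracked
        long end = System.nanoTime();
        System.out.println("latency ms = " + elapsedMillis(start, end));
    }
}
```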

markShardOperationComplete(bytes, requestStartTime, isShadowModeBreach, tracker.getCoordinatingOperationTracker(),
tracker.getCommonOperationTracker());
memoryManager.decreaseShardPrimaryAndCoordinatingLimits(tracker);
tryReleaseTracker(tracker);
Collaborator

This is avoidable per request when all that's needed on the tracker is

final Map<ShardId, ShardIndexingPressureTracker> shardIndexingPressureHotStore =
        Collections.synchronizedMap(new LinkedHashMap<ShardId, ShardIndexingPressureTracker>(100, .75F, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry eldest) {
                return size() > 100;
            }
        });

Member Author

This will limit the hot shards (i.e. shards tracking in-flight requests) to 100, and may remove ones currently in use. With our approach, however, we want to allow hot shards to grow based on the actual traffic, and purge only the ones in the cold store.
We have an item in #478 to evaluate and refactor the shard store cleanup strategy as a follow-up.

Collaborator

final Map<ShardId, ShardIndexingPressureTracker> shardIndexingPressureColdStore =
        Collections.synchronizedMap(new LinkedHashMap<ShardId, ShardIndexingPressureTracker>(100, .75F, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry eldest) {
                return size() > 100;
            }
        });

Does this work now?

Member Author

Yes, this will help with the eviction strategy for the cold store; however, we will still need to move the trackers from the hot to the cold store conditionally, which is happening here. Also, this changes the cold store from a HashMap to a LinkedHashMap. I will evaluate and revisit the cold store cleanup strategy as a follow-up item listed in #478.

Collaborator
@Bukhtawar Bukhtawar Jul 30, 2021

This is a very pessimistic, unnecessary operation per request, since a small young-GC pause can cause trackers to move between hot and cold. IMHO we are complicating things here when a single tracker would have been sufficient and could have been periodically cleared by a background sweep, rather than per-request checks to move trackers back and forth.

Please think about optimizing this and open a dedicated issue highlighting this problem so that it doesn't get missed

Member Author

This is a very pessimistic, unnecessary operation per request, since a small young-GC pause can cause trackers to move between hot and cold.

Just to be clearer, this movement is not per request but happens only when all the requests for a shard have drained from the node and there are no new incoming requests. This should not cause any churn in high-TPS scenarios with continuous or concurrent workloads, which seems to be the concern here.
Also, when the tracker movement happens it is not a deep copy; no new tracker objects are created, only references are copied, so the overhead is minimal.

IMHO we are complicating things here when a single tracker would have been sufficient and could have been periodically cleared by a background sweep, rather than per-request checks to move trackers back and forth.

The two-phased lifecycle of the tracker (hot and cold) has actually helped reduce churn and GC, as found in the allocation profiling data. I will share the performance results once we complete the PR for plumbing the logic. This has also saved us from implementing any background sweep logic, for now.

Please think about optimizing this and open a dedicated issue highlighting this problem so that it doesn't get missed.

I am open to suggestions and inputs to optimise this further. Given it is not critical, I would like to defer it as a follow-up. Currently all related PRs and future pending items are being tracked at the meta #478 level, and I have added an open item entry for this there, so that it is not missed. Let me know if you still think we need a dedicated issue for it.

Collaborator

Here is what we should think about; listing a few concerns with tracking and cleanup:

  1. The current implementation delegates this responsibility externally, when it could well be placed within the tracker to manage the lifecycle. It might be hard to track cleanup and maintain over time.
  2. Just because the current implementation does some optimization doesn't mean the other options discussed aren't performant; you should consider evaluating the other proposals as well.
  3. Client-side batching delays, network delays, and connection establishment can all lead to premature drain and aggressive cleanup, which is a sub-optimal approach. Most of this is something the server cannot control and hence shouldn't control, so it would impact high-TPS traffic as well.
  4. The cleanup doesn't have to be client-facing; we need to understand the additional overhead. In contrast, background cleanups are relatively cheap and can take better decisions, at least better than the weak condition
tracker.getCommonOperationTracker().getCurrentCombinedCoordinatingAndPrimaryBytes() == 0 &&
                tracker.getReplicaOperationTracker().getStatsTracker().getCurrentBytes() == 0

All of these are suggestions; it's fine even if you don't want to take them up.

Member Author

While I agree with most of it, there are a few important considerations to make too, hence my suggestion to decouple this from the current PR. I have captured the high-level thoughts in a dedicated issue, #1033, in addition to the item in #478 we previously had, to evaluate and follow up. Let me know if you think this should not be decoupled or is critical.
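For reference, the access-ordered LinkedHashMap eviction suggested in this thread can be demonstrated standalone. Note how it evicts the least-recently-accessed entry once the cap is exceeded, even if that entry is still in use by a caller, which is the author's concern above (class and key names here are illustrative, not from the PR):

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

public class LruStoreSketch {
    // Bounded, access-ordered store: once size exceeds maxSize, the
    // least-recently-accessed entry is evicted on the next put.
    static Map<String, Object> boundedStore(int maxSize) {
        return Collections.synchronizedMap(new LinkedHashMap<String, Object>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, Object> eldest) {
                return size() > maxSize;
            }
        });
    }

    public static void main(String[] args) {
        Map<String, Object> store = boundedStore(2);
        store.put("shard-a", new Object());
        store.put("shard-b", new Object());
        store.get("shard-a");               // refresh shard-a's recency
        store.put("shard-c", new Object()); // evicts shard-b, even if still in use
        System.out.println(store.keySet()); // [shard-a, shard-c]
    }
}
```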


private void adjustPerformanceUponCompletion(long bytes, long requestStartTime, StatsTracker statsTracker,
                                             PerformanceTracker performanceTracker) {
    long requestEndTime = System.currentTimeMillis();
Collaborator

Use System.nanoTime() here and elsewhere.

Comment on lines 67 to 88
if (shouldRejectRequest(nodeLevelLimitBreached, shardLevelLimitBreached)) {
long nodeBytesWithoutOperation = nodeCombinedBytes - bytes;
long nodeTotalBytesWithoutOperation = nodeTotalBytes - bytes;
long shardBytesWithoutOperation = shardCombinedBytes - bytes;

currentCombinedCoordinatingAndPrimaryBytes.addAndGet(-bytes);
coordinatingRejections.getAndIncrement();
tracker.getCommonOperationTracker().incrementCurrentCombinedCoordinatingAndPrimaryBytes(-bytes);
tracker.getCoordinatingOperationTracker().getRejectionTracker().incrementTotalRejections();

throw new OpenSearchRejectedExecutionException("rejected execution of coordinating operation [" +
"shard_detail=[" + shardId.getIndexName() + "][" + shardId.id() + "][C], " +
"shard_coordinating_and_primary_bytes=" + shardBytesWithoutOperation + ", " +
"shard_operation_bytes=" + bytes + ", " +
"shard_max_coordinating_and_primary_bytes=" + tracker.getPrimaryAndCoordinatingLimits() + "] OR [" +
"node_coordinating_and_primary_bytes=" + nodeBytesWithoutOperation + ", " +
"node_replica_bytes=" + nodeReplicaBytes + ", " +
"node_all_bytes=" + nodeTotalBytesWithoutOperation + ", " +
"node_operation_bytes=" + bytes + ", " +
"node_max_coordinating_and_primary_bytes=" + primaryAndCoordinatingLimits + "]", false);
}
}
Collaborator

Let's reduce redundant code.

Member Author

++
Messaging has been consolidated.

Signed-off-by: Saurabh Singh <sisurab@amazon.com>
@getsaurabh02 getsaurabh02 marked this pull request as ready for review July 29, 2021 20:48
@opensearch-ci-bot
Collaborator

✅   Gradle Wrapper Validation success 8dbb32e

@opensearch-ci-bot
Collaborator

✅   DCO Check Passed 8dbb32e

@opensearch-ci-bot
Collaborator

✅   Gradle Precommit success 8dbb32e

Comment on lines +206 to +208
long requestLatency = TimeUnit.NANOSECONDS.toMillis(requestEndTime - requestStartTime);

performanceTracker.addLatencyInMillis(requestLatency);
Collaborator

Sorry, why do we need to perform this computation?

Member Author

We are keeping track of request latency, moving-window throughput, and total historical latency (sum) in milliseconds, as this provides more headroom with long data types and is more meaningful for operation-level stats.

Collaborator

Generally values are kept in nanos and converted back into other units lazily.

IndexingStats.Stats stats(boolean isThrottled, long currentThrottleMillis) {
            return new IndexingStats.Stats(
                indexMetric.count(), TimeUnit.NANOSECONDS.toMillis(indexMetric.sum()), indexCurrent.count(), indexFailed.count(),
                deleteMetric.count(), TimeUnit.NANOSECONDS.toMillis(deleteMetric.sum()), deleteCurrent.count(),
                noopUpdates.count(), isThrottled, TimeUnit.MILLISECONDS.toMillis(currentThrottleMillis));
        }

Member Author

Agreed for stats; however, here we are also using the latency values for bit-wise conversions (to floating-point values) during the moving-average calculation, and also to keep a cumulative sum of latencies over time in a long data type, which delays overflow.

Comment on lines 258 to 264
private double calculateMovingAverage(long currentAverage, double frontValue, double currentValue, int count) {
    if (count > 0) {
        return ((Double.longBitsToDouble(currentAverage) * count) + currentValue - frontValue) / count;
    } else {
        return currentValue;
    }
}
Collaborator

Let's move this to common utils.

Member Author

++
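Extracted to a common util as agreed, the helper might look like this standalone sketch (the class and method names are illustrative; the stored average is kept as raw long bits so it can live in an `AtomicLong`):

```java
public final class MovingAverageSketch {
    private MovingAverageSketch() {}

    // Sliding-window average update. currentAverageBits holds the previous
    // average as Double.doubleToLongBits (convenient for an AtomicLong field);
    // frontValue is the sample leaving the window, currentValue the one entering.
    public static double update(long currentAverageBits, double frontValue, double currentValue, int count) {
        if (count > 0) {
            return ((Double.longBitsToDouble(currentAverageBits) * count) + currentValue - frontValue) / count;
        }
        return currentValue; // empty window: the first sample is the average
    }
}
```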

@@ -80,6 +83,11 @@ public CommonStatsFlags(StreamInput in) throws IOException {
if (in.getVersion().onOrAfter(LegacyESVersion.V_7_2_0)) {
includeUnloadedSegments = in.readBoolean();
}
if (in.getVersion().onOrAfter(LegacyESVersion.V_7_9_0) &&
Collaborator

OpenSearch users won't have legacy versions >= 7.9. Right?

Member Author

Yeah, I had some confusion around it too. The legacy version for OpenSearch can be 7.10 as well; however, it may not have the shard indexing feature, depending upon the migration type. One option is to only check for onOrAfter(Version.V_1_0_0) from the OpenSearch perspective. Alternatively, we keep this check uniform across migration experiences, based on versions >= 7.9 and the feature attribute check. Let me know if you recommend keeping onOrAfter(Version.V_1_0_0) instead.

Collaborator

My question is how a user on Elasticsearch OSS 7.9 would get impacted once they upgrade to OpenSearch 1.0.0 with the current logic.

Member Author

This change will be effectively a no-op for Elasticsearch OSS 7.9/7.10 users during the upgrade flow, and they can start using the feature once on OpenSearch 1.x, if the feature settings are enabled.

Collaborator

I'd suggest that if we can remove this safely, let's do it.

Member Author

++

Signed-off-by: Saurabh Singh <sisurab@amazon.com>
@opensearch-ci-bot
Collaborator

✅   Gradle Wrapper Validation success c7721a8

@opensearch-ci-bot
Collaborator

✅   DCO Check Passed c7721a8

@opensearch-ci-bot
Collaborator

✅   Gradle Precommit success c7721a8

Collaborator
@Bukhtawar Bukhtawar left a comment

Thanks Saurabh.
I'll give it one final pass by today.

@@ -80,6 +83,11 @@ public CommonStatsFlags(StreamInput in) throws IOException {
if (in.getVersion().onOrAfter(LegacyESVersion.V_7_2_0)) {
includeUnloadedSegments = in.readBoolean();
}
if (in.getVersion().onOrAfter(LegacyESVersion.V_7_9_0) &&
Collaborator

I'd suggest that if we can remove this safely, let's do it.

long requestLatency = TimeUnit.NANOSECONDS.toMillis(requestEndTime - requestStartTime);

performanceTracker.addLatencyInMillis(requestLatency);
performanceTracker.updateLastSuccessfulRequestTimestamp(requestEndTime);
Collaborator

I guess this should be a conditional put, so that a lower TS doesn't override a higher TS.

Member Author

The TS here is requestEndTime, which is calculated right here, just above this code. Since this is after completion of the actual request, with no blocking operation in between, minor differences due to scheduling should not really matter.

Collaborator

Not a good practice to ignore correctness because it doesn't matter to you.

Member Author

Not really sure what you mean by correctness here. From the tracker's perspective, it tracks the request completion timestamp as and when a request completes, and doesn't impose a hard requirement on sequencing.


performanceTracker.addLatencyInMillis(requestLatency);
performanceTracker.updateLastSuccessfulRequestTimestamp(requestEndTime);
performanceTracker.updateTotalOutstandingRequests(0);
Collaborator

I think we should atomically decrement the outstanding requests rather than setting the count to zero.

Member Author

The objective here is to reset the outstanding counter once any pending request succeeds and the timestamp gets updated. I changed it to resetTotalOutstandingRequests to make this more explicit.

Collaborator

At any point there are concurrent requests in flight using the same tracker; not sure how we could assume 0 outstanding.

Member Author

This helps with stuck-task tracking (the black-hole scenario), and gets reset if there are executions in progress. Degradation or low throughput, on the other hand, is handled by the other latency parameters.

performanceTracker.updateLastSuccessfulRequestTimestamp(requestEndTime);
performanceTracker.updateTotalOutstandingRequests(0);

if(requestLatency > 0) {
Collaborator

Is this a defensive check, since all latencies are expected to be non-zero?

Member Author

Although nanoTime is expected to be monotonically increasing, there are a few known cases where it can be observed to go backwards. Since we use this value for the ratio calculation in calculateRequestThroughput, a negative value is completely undesirable, hence this safety check.
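The defensive check described here can be sketched standalone. This assumes throughput is a bytes-to-latency ratio, as the discussion implies; the class and method names are hypothetical:

```java
public final class ThroughputSketch {
    private ThroughputSketch() {}

    // nanoTime() is monotonic in principle, but zero (and occasionally
    // negative) deltas can be observed; guard before dividing so a bad
    // sample is skipped rather than producing a negative or infinite ratio.
    public static double requestThroughput(long bytes, long latencyMillis) {
        if (latencyMillis <= 0) {
            return 0.0; // skip the sample
        }
        return (double) bytes / latencyMillis;
    }
}
```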

};
}

public Releasable markPrimaryOperationStarted(ShardId shardId, long bytes, boolean forceExecution) {
Collaborator

It is known that the Releasable can be called multiple times. Should we wrap it to ensure it gets executed only once?

    private static Releasable wrapReleasable(Releasable releasable) {
        final AtomicBoolean called = new AtomicBoolean();
        return () -> {
            if (called.compareAndSet(false, true)) {
                releasable.close();
            } else {
                logger.error("IndexingPressure memory is adjusted twice", new IllegalStateException("Releasable is called twice"));
                assert false : "IndexingPressure is adjusted twice";
            }
        };
    }

Member Author

The original PR from Elastic had no specific reason pointed out; it was more of a safety net. However, I agree that having it wrapped, with an associated error log, will help clients of ShardIndexingPressure identify any leaks and call the Releasable at most once.
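A self-contained version of the at-most-once wrapper discussed above (the `Releasable` interface is inlined here for illustration; the real one lives in the OpenSearch codebase):

```java
import java.util.concurrent.atomic.AtomicBoolean;

public class ReleasableSketch {
    interface Releasable {
        void close();
    }

    // Guard a Releasable so the underlying close() runs at most once:
    // a double release would adjust the memory accounting twice.
    static Releasable wrapReleasable(Releasable releasable) {
        final AtomicBoolean called = new AtomicBoolean();
        return () -> {
            if (called.compareAndSet(false, true)) {
                releasable.close();
            }
            // a production version would log an error / assert on repeat calls
        };
    }

    public static void main(String[] args) {
        int[] closes = {0};
        Releasable once = wrapReleasable(() -> closes[0]++);
        once.close();
        once.close(); // ignored by the wrapper
        System.out.println("closes = " + closes[0]); // closes = 1
    }
}
```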

Signed-off-by: Saurabh Singh <sisurab@amazon.com>
@opensearch-ci-bot
Collaborator

✅   DCO Check Passed 29a2455

@opensearch-ci-bot
Collaborator

✅   Gradle Wrapper Validation success 29a2455

@opensearch-ci-bot
Collaborator

✅   Gradle Precommit success 29a2455

long requestLatency = TimeUnit.NANOSECONDS.toMillis(requestEndTime - requestStartTime);

performanceTracker.addLatencyInMillis(requestLatency);
performanceTracker.updateLastSuccessfulRequestTimestamp(requestEndTime);
Collaborator

Not a good practice to ignore correctness because it doesn't matter to you.

* 2. Memory Accounting at Node level. Tracking is done using the IndexingPressure artefacts to support seamless feature toggling.
* 3. Interfaces to access the statistics for shard trackers.
*/
public class ShardIndexingPressure extends IndexingPressure {
Collaborator

The main concern I have with this class is that it has too much logic, and I have been lost multiple times reviewing it. We say we are extending IndexingPressure, but that is not really true. For instance, I'd expect something of this type to exist to avoid duplicating the logic from IndexingPressure and clubbing it redundantly into ShardIndexingPressure.

@Override
public Releasable markCoordinatingOperationStarted(long bytes, boolean forceExecution) {
    Releasable nodeReleasable = super.markCoordinatingOperationStarted(bytes, forceExecution);
    return () -> {
        currentCombinedCoordinatingAndPrimaryBytes.addAndGet(-bytes);
        currentCoordinatingBytes.addAndGet(-bytes);
        markShardOperationComplete(bytes, requestStartTime, isShadowModeBreach, tracker.getCoordinatingOperationTracker(),
            tracker.getCommonOperationTracker());
        memoryManager.decreaseShardPrimaryAndCoordinatingLimits(tracker);
        nodeReleasable.close();
    };
}

The way it stands currently gives a false impression of inheritance, when what it is in practice trying to achieve is composition.

Member Author

I don't think the core logic here got updated more than once in this PR. It exposes 4 public methods today, which are along the lines of IndexingPressure, and take an additional shard field parameter. Each of these methods is for a different shard operation, i.e. coordinating, primary-local, primary, and replica. So it has similar responsibilities to IndexingPressure today.

Also, from the inheritance perspective it is not fake; it actually extends the properties/attributes of IndexingPressure so that values are updated appropriately, irrespective of the feature flag being on or off. I'm not really sure why you think this is composition, since IndexingPressure is not part of ShardIndexingPressure.

I have thought about this a few times and taken a conscious decision not to reuse the methods from IndexingPressure, as the overall logic is not an A+B type. It adds more complexity and tight coupling between the two classes when updating values during exception and release scenarios.

Moreover, we envision that with the rollout of ShardIndexingPressure, we will eventually be able to deprecate IndexingPressure.

Collaborator

That's exactly my concern: the moment you deprecate IndexingPressure, ShardIndexingPressure is going to become a monolith. Inheritance is not just about using attributes but also about extending methods; currently it's doing just one part and duplicating the parent logic. Handling the node level and shard level in one place makes the class overloaded, when it could have fanned out.

Collaborator

I have thought about this a few times and taken a conscious decision not to reuse the methods from IndexingPressure, as the overall logic is not an A+B type. It adds more complexity and tight coupling between the two classes when updating values during exception and release scenarios.

I would be happy to understand those cases and see if I could help with loose coupling. Let me know.

Member Author
@getsaurabh02 getsaurabh02 Aug 10, 2021

ShardIndexingPressure is not handling the core logic of node- or shard-level evaluation; it is delegating to the lower-level constructs ShardIndexingPressureMemoryManager and ShardIndexingPressureStore with the right responsibilities, hence preventing it from becoming a monolith. Also, on the responsibilities side, it exposes similar interfaces to clients as mentioned above, hence following the single-responsibility principle.

I would be happy to understand those cases and see if I could help with loose coupling. Let me know

To take a simple example, in markCoordinatingOperationStarted we need to verify the node-level limits and shard-level limits first, before we actually update any node-level counters, such as current utilization and rejection count.

I would choose to keep the logic separate, as there is not much reusability but rather tight coupling, given the logic is not additive in nature. We can instead focus on building ShardIndexingPressure and the required underlying constructs better in isolation, and simplify further if you see any more concerns.

Collaborator
@Bukhtawar Bukhtawar Aug 10, 2021

I would humbly disagree. The reason I recommended extending was based on my understanding that the logic would be reused/fanned out and delegated correctly at different levels. If you don't think that's possible, I don't think extending IndexingPressure is the right design, given there are hardly any commonalities, with not even a single piece of logic extended or reused.

Member Author
@getsaurabh02 getsaurabh02 Aug 10, 2021

Extension was a choice; we wanted to reuse the attributes of the IndexingPressure class. It is required so that ShardIndexingPressure as a feature can be toggled while the state of IndexingPressure continues. Not reusing a few methods is more of an implementation choice, to keep the coupling loose between these two classes. It also gives us a frame to build ShardIndexingPressure and its underlying constructs in isolation, with the liberty to add more features and eventually deprecate IndexingPressure.

Can you please be more specific with the concern here?

Collaborator

  1. Code duplication in the sub-class when extension is possible.
  2. Too much logic in ShardIndexingPressure, which is a maintenance overhead.
    As a reviewer I got lost; so will others looking at the code. If you don't want to take this suggestion, it's still fine.

Member Author

Code duplication in the sub-class when extension is possible.

There is no code in ShardIndexingPressure that could be offloaded to IndexingPressure directly. The core logic for the node limit check is already segregated and offloaded to ShardIndexingPressureMemoryManager. For the node-level counter updates in ShardIndexingPressure, since the logic is not additive, I still prefer not to reuse the methods, as they are not directly consumable. Changes in IndexingPressure would unnecessarily increase the coupling, which I want to avoid.

Too much logic in ShardIndexingPressure, which is a maintenance overhead.

I will be happy to take more inputs and rather focus on segregating ShardIndexingPressure and making it simpler. I don't think we should be blocked on reusing IndexingPressure just for counter updates, as it's unnecessary overhead.

Collaborator
@Bukhtawar Bukhtawar left a comment

Please ensure you have enough tests to cover the cases.

@getsaurabh02
Member Author

@Bukhtawar I will be happy to take more inputs and rather focus on segregating ShardIndexingPressure and making it simpler. I don't think we should be blocked on reusing IndexingPressure just for reusability's sake, as it's not completely additive and adds unnecessary overhead.

@adnapibar adnapibar merged commit 84c01f7 into opensearch-project:feature/478_indexBackPressure Aug 12, 2021
adnapibar pushed a commit that referenced this pull request Sep 15, 2021
* Add ShardIndexingPressure framework level construct and related Stats artefacts.
* Test and code refactoring for shard indexing pressure.
* Moved the average calculation logic to common memory manager util.
* Add wrapper for releasable in ShardIndexingPressure operations.

Signed-off-by: Saurabh Singh <sisurab@amazon.com>
getsaurabh02 added a commit to getsaurabh02/OpenSearch that referenced this pull request Oct 6, 2021
…arch-project#1015)

* Add ShardIndexingPressure framework level construct and related Stats artefacts.
* Test and code refactoring for shard indexing pressure.
* Moved the average calculation logic to common memory manager util.
* Add wrapper for releasable in ShardIndexingPressure operations.

Signed-off-by: Saurabh Singh <sisurab@amazon.com>
adnapibar added a commit that referenced this pull request Oct 7, 2021
Shard-level indexing pressure improves the current Indexing Pressure framework, which performs memory accounting at the node level and rejects requests accordingly. This takes a step further by basing rejections on memory accounting at the shard level, along with other key performance factors such as throughput and last successful requests.

**Key features**
- Granular tracking of indexing task performance, at every shard level, for each node role, i.e. coordinator, primary and replica.
- Smarter rejections by discarding only the requests intended for a problematic index or shard, while still allowing others to continue (fairness in rejection).
- Rejection thresholds governed by a combination of configurable parameters (such as memory limits on the node) and dynamic parameters (such as latency increase and throughput degradation).
- Node-level and shard-level indexing pressure statistics exposed through the stats API.
- Integration of indexing pressure stats with plugins for metric visibility and auto-tuning in future.
- Control knobs to tune the key performance thresholds which control rejections, to address any specific requirement or issue.
- Control knobs to run the feature in shadow mode or enforced mode. In shadow mode only internal rejection-breakdown metrics are published, while no actual rejections are performed.

The changes were divided into small manageable chunks as part of the following PRs against a feature branch.

- Add Shard Indexing Pressure Settings. #716
- Add Shard Indexing Pressure Tracker. #717
- Refactor IndexingPressure to allow extension. #718
- Add Shard Indexing Pressure Store #838
- Add Shard Indexing Pressure Memory Manager #945
- Add ShardIndexingPressure framework level construct and Stats #1015
- Add Indexing Pressure Service which acts as orchestrator for IP #1084
- Add plumbing logic for IndexingPressureService in Transport Actions. #1113
- Add shard indexing pressure metric/stats via rest end point. #1171
- Add shard indexing pressure integration tests. #1198

Signed-off-by: Saurabh Singh <sisurab@amazon.com>
Co-authored-by: Saurabh Singh <sisurab@amazon.com>
Co-authored-by: Rabi Panda <adnapibar@gmail.com>
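The shadow-mode/enforced-mode control knob described in the merge commit above can be sketched as a simple admission gate. This is a hypothetical illustration under assumed names (ShardMemoryGate, admit, rejectionBreakdown are not the actual OpenSearch API): in shadow mode a limit breach only increments the rejection-breakdown counter that feeds the stats, while in enforced mode the same breach also rejects the request.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch of the shadow vs enforced control knob; names are
// illustrative and do not match the real OpenSearch classes.
class ShardMemoryGate {
    enum Mode { SHADOW, ENFORCED }

    private final long shardLimitBytes;
    private volatile Mode mode;
    // Breakdown counter published via stats in both modes.
    final AtomicLong rejectionBreakdown = new AtomicLong();

    ShardMemoryGate(long shardLimitBytes, Mode mode) {
        this.shardLimitBytes = shardLimitBytes;
        this.mode = mode;
    }

    /** Returns true if the request may proceed. */
    boolean admit(long currentShardBytes, long requestBytes) {
        boolean breached = currentShardBytes + requestBytes > shardLimitBytes;
        if (breached) {
            rejectionBreakdown.incrementAndGet(); // counted even in shadow mode
        }
        // Only enforced mode turns a breach into an actual rejection.
        return !(breached && mode == Mode.ENFORCED);
    }
}
```

Running a new limit in shadow mode first lets operators observe the would-be rejection counts in the stats API before switching to enforced mode.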
getsaurabh02 added a commit to getsaurabh02/OpenSearch that referenced this pull request Oct 7, 2021
tlfeng pushed a commit that referenced this pull request Oct 11, 2021