From 347bcc506287f5f3387fc6eccf8584f11ab1fe69 Mon Sep 17 00:00:00 2001 From: Saurabh Singh Date: Tue, 20 Jul 2021 05:58:05 +0530 Subject: [PATCH] Add Shard Indexing Pressure Memory Manager (#478) (#945) It introduces a Memory Manager for Shard Indexing Pressure. It is responsible for increasing and decreasing the allocated shard limit based on incoming requests, and validate the current values against the thresholds. Signed-off-by: Saurabh Singh --- .../common/settings/ClusterSettings.java | 20 +- .../ShardIndexingPressureMemoryManager.java | 457 +++++++++++++++ .../index/ShardIndexingPressureTracker.java | 52 +- ...ardIndexingPressureMemoryManagerTests.java | 526 ++++++++++++++++++ 4 files changed, 1047 insertions(+), 8 deletions(-) create mode 100644 server/src/main/java/org/opensearch/index/ShardIndexingPressureMemoryManager.java create mode 100644 server/src/test/java/org/opensearch/index/ShardIndexingPressureMemoryManagerTests.java diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index 6be770e3af62e..e0f2fdb435599 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -33,6 +33,12 @@ import org.apache.logging.log4j.LogManager; import org.opensearch.action.main.TransportMainAction; +import org.opensearch.index.IndexModule; +import org.opensearch.index.IndexSettings; +import org.opensearch.index.IndexingPressure; +import org.opensearch.index.ShardIndexingPressureMemoryManager; +import org.opensearch.index.ShardIndexingPressureSettings; +import org.opensearch.index.ShardIndexingPressureStore; import org.opensearch.watcher.ResourceWatcherService; import org.opensearch.action.admin.cluster.configuration.TransportAddVotingConfigExclusionsAction; import org.opensearch.action.admin.indices.close.TransportCloseIndexAction; @@ -100,11 +106,6 @@ 
import org.opensearch.gateway.GatewayService; import org.opensearch.gateway.PersistedClusterStateService; import org.opensearch.http.HttpTransportSettings; -import org.opensearch.index.IndexModule; -import org.opensearch.index.IndexSettings; -import org.opensearch.index.IndexingPressure; -import org.opensearch.index.ShardIndexingPressureSettings; -import org.opensearch.index.ShardIndexingPressureStore; import org.opensearch.indices.IndexingMemoryController; import org.opensearch.indices.IndicesQueryCache; import org.opensearch.indices.IndicesRequestCache; @@ -587,7 +588,14 @@ public void apply(Settings value, Settings current, Settings previous) { ShardIndexingPressureSettings.SHARD_INDEXING_PRESSURE_ENFORCED, ShardIndexingPressureSettings.REQUEST_SIZE_WINDOW, ShardIndexingPressureSettings.SHARD_MIN_LIMIT, - ShardIndexingPressureStore.MAX_COLD_STORE_SIZE))); + ShardIndexingPressureStore.MAX_COLD_STORE_SIZE, + ShardIndexingPressureMemoryManager.LOWER_OPERATING_FACTOR, + ShardIndexingPressureMemoryManager.OPTIMAL_OPERATING_FACTOR, + ShardIndexingPressureMemoryManager.UPPER_OPERATING_FACTOR, + ShardIndexingPressureMemoryManager.NODE_SOFT_LIMIT, + ShardIndexingPressureMemoryManager.THROUGHPUT_DEGRADATION_LIMITS, + ShardIndexingPressureMemoryManager.SUCCESSFUL_REQUEST_ELAPSED_TIMEOUT, + ShardIndexingPressureMemoryManager.MAX_OUTSTANDING_REQUESTS))); public static List> BUILT_IN_SETTING_UPGRADERS = Collections.unmodifiableList(Arrays.asList( SniffConnectionStrategy.SEARCH_REMOTE_CLUSTER_SEEDS_UPGRADER, diff --git a/server/src/main/java/org/opensearch/index/ShardIndexingPressureMemoryManager.java b/server/src/main/java/org/opensearch/index/ShardIndexingPressureMemoryManager.java new file mode 100644 index 0000000000000..d65ff885b8375 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/ShardIndexingPressureMemoryManager.java @@ -0,0 +1,457 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this 
file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Setting; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.index.ShardIndexingPressureTracker.OperationTracker; +import org.opensearch.index.ShardIndexingPressureTracker.PerformanceTracker; +import org.opensearch.index.ShardIndexingPressureTracker.RejectionTracker; +import org.opensearch.index.ShardIndexingPressureTracker.StatsTracker; +import org.opensearch.index.shard.ShardId; + +import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.BiPredicate; +import java.util.function.BooleanSupplier; +import java.util.function.LongSupplier; +import java.util.function.ToLongFunction; + +/** + * The Shard Indexing Pressure Memory Manager is the construct responsible for increasing and decreasing the allocated shard limit + * based on incoming requests. A shard limits defines the maximum memory that a shard can occupy in the heap for request objects. + * + * Based on the overall memory utilization on the node, and current traffic needs shard limits will be modified: + * + * 1. If the limits assigned to a shard is breached (Primary Parameter) while the node level overall occupancy across all shards + * is not greater than primary_parameter.node.soft_limit, MemoryManager will increase the shard limits without any deeper evaluation. + * 2. 
If the limits assigned to the shard are breached (Primary Parameter) and the node level overall occupancy across all shards + * is greater than primary_parameter.node.soft_limit, then MemoryManager will evaluate deeper parameters for shards to identify any + * issues, such as throughput degradation (Secondary Parameter - 1) and time since last request was successful (Secondary Parameter - 2). + * This helps detect any duress state with the shard, requesting more memory. + * + * Secondary Parameters covered above: + * 1. ThroughputDegradationLimitsBreached - When the moving window throughput average has increased by a factor compared to + * the historical throughput average. If the factor by which it has increased is greater than the degradation limit threshold, this + * parameter is considered to be breached. + * 2. LastSuccessfulRequestDurationLimitsBreached - When the time since the last successful request completed is greater than the max + * timeout threshold value, while there are a number of outstanding requests greater than the max outstanding requests, then this parameter + * is considered to be breached. + * + * MemoryManager attempts to increase or decrease the shard limits in case the shard utilization goes below operating_factor.lower or + * goes above operating_factor.upper of current shard limits. MemoryManager attempts to update the new shard limit such that the new value + * remains within the operating_factor.optimal range of current shard utilization. + * + */ +public class ShardIndexingPressureMemoryManager { + private static final Logger logger = LogManager.getLogger(ShardIndexingPressureMemoryManager.class); + + /** + * Shard operating factor can be evaluated using currentShardBytes/shardLimits. Outcome of this expression is categorized as + * lower, optimal and upper boundary, and appropriate action is taken once the below defined threshold values are breached. 
+ */ + public static final Setting<Double> LOWER_OPERATING_FACTOR = + Setting.doubleSetting("shard_indexing_pressure.operating_factor.lower", 0.75d, 0.0d, + Setting.Property.NodeScope, Setting.Property.Dynamic); + public static final Setting<Double> OPTIMAL_OPERATING_FACTOR = + Setting.doubleSetting("shard_indexing_pressure.operating_factor.optimal", 0.85d, 0.0d, + Setting.Property.NodeScope, Setting.Property.Dynamic); + public static final Setting<Double> UPPER_OPERATING_FACTOR = + Setting.doubleSetting("shard_indexing_pressure.operating_factor.upper", 0.95d, 0.0d, + Setting.Property.NodeScope, Setting.Property.Dynamic); + + /** + * This determines the max time elapsed since any request was processed successfully. Appropriate action is taken + * once the below defined threshold value is breached. + */ + public static final Setting<TimeValue> SUCCESSFUL_REQUEST_ELAPSED_TIMEOUT = + Setting.positiveTimeSetting("shard_indexing_pressure.secondary_parameter.successful_request.elapsed_timeout", + TimeValue.timeValueMillis(300000), Setting.Property.NodeScope, Setting.Property.Dynamic); + + /** + * This determines the max outstanding requests that are yet to be processed successfully. Appropriate + * action is taken once the below defined threshold value is breached. + */ + public static final Setting<Integer> MAX_OUTSTANDING_REQUESTS = + Setting.intSetting("shard_indexing_pressure.secondary_parameter.successful_request.max_outstanding_requests", + 100, Setting.Property.NodeScope, Setting.Property.Dynamic); + + /** + * Degradation for a shard can be evaluated using average throughput of last N requests, + * where N being {@link ShardIndexingPressureSettings#REQUEST_SIZE_WINDOW}, divided by lifetime average throughput. + * Appropriate action is taken once the outcome of the above expression breaches the below defined threshold value. 
+ */ + public static final Setting THROUGHPUT_DEGRADATION_LIMITS = + Setting.doubleSetting("shard_indexing_pressure.secondary_parameter.throughput.degradation_factor", 5.0d, 1.0d, + Setting.Property.NodeScope, Setting.Property.Dynamic); + + /** + * The node level soft limit determines when the secondary parameters for shard is to be evaluated for degradation. + */ + public static final Setting NODE_SOFT_LIMIT = + Setting.doubleSetting("shard_indexing_pressure.primary_parameter.node.soft_limit", 0.7d, 0.0d, + Setting.Property.NodeScope, Setting.Property.Dynamic); + + private final AtomicLong totalNodeLimitsBreachedRejections = new AtomicLong(); + private final AtomicLong totalLastSuccessfulRequestLimitsBreachedRejections = new AtomicLong(); + private final AtomicLong totalThroughputDegradationLimitsBreachedRejections = new AtomicLong(); + + private final ShardIndexingPressureSettings shardIndexingPressureSettings; + private final ShardIndexingPressureStore shardIndexingPressureStore; + + private volatile double lowerOperatingFactor; + private volatile double optimalOperatingFactor; + private volatile double upperOperatingFactor; + + private volatile TimeValue successfulRequestElapsedTimeout; + private volatile int maxOutstandingRequests; + + private volatile double primaryAndCoordinatingThroughputDegradationLimits; + private volatile double replicaThroughputDegradationLimits; + + private volatile double nodeSoftLimit; + + public ShardIndexingPressureMemoryManager(ShardIndexingPressureSettings shardIndexingPressureSettings, + ClusterSettings clusterSettings, Settings settings) { + this.shardIndexingPressureSettings = shardIndexingPressureSettings; + this.shardIndexingPressureStore = new ShardIndexingPressureStore(shardIndexingPressureSettings, clusterSettings, settings); + + this.lowerOperatingFactor = LOWER_OPERATING_FACTOR.get(settings).doubleValue(); + clusterSettings.addSettingsUpdateConsumer(LOWER_OPERATING_FACTOR, this::setLowerOperatingFactor); + + 
this.optimalOperatingFactor = OPTIMAL_OPERATING_FACTOR.get(settings).doubleValue(); + clusterSettings.addSettingsUpdateConsumer(OPTIMAL_OPERATING_FACTOR, this::setOptimalOperatingFactor); + + this.upperOperatingFactor = UPPER_OPERATING_FACTOR.get(settings).doubleValue(); + clusterSettings.addSettingsUpdateConsumer(UPPER_OPERATING_FACTOR, this::setUpperOperatingFactor); + + this.successfulRequestElapsedTimeout = SUCCESSFUL_REQUEST_ELAPSED_TIMEOUT.get(settings); + clusterSettings.addSettingsUpdateConsumer(SUCCESSFUL_REQUEST_ELAPSED_TIMEOUT, this::setSuccessfulRequestElapsedTimeout); + + this.maxOutstandingRequests = MAX_OUTSTANDING_REQUESTS.get(settings).intValue(); + clusterSettings.addSettingsUpdateConsumer(MAX_OUTSTANDING_REQUESTS, this::setMaxOutstandingRequests); + + this.primaryAndCoordinatingThroughputDegradationLimits = THROUGHPUT_DEGRADATION_LIMITS.get(settings).doubleValue(); + this.replicaThroughputDegradationLimits = this.primaryAndCoordinatingThroughputDegradationLimits * 1.5; + clusterSettings.addSettingsUpdateConsumer(THROUGHPUT_DEGRADATION_LIMITS, this::setThroughputDegradationLimits); + + this.nodeSoftLimit = NODE_SOFT_LIMIT.get(settings).doubleValue(); + clusterSettings.addSettingsUpdateConsumer(NODE_SOFT_LIMIT, this::setNodeSoftLimit); + } + + /** + * Checks if the node level memory threshold is breached for coordinating operations. 
+ */ + boolean isCoordinatingNodeLimitBreached(ShardIndexingPressureTracker tracker, long nodeTotalBytes) { + if(nodeTotalBytes > this.shardIndexingPressureSettings.getNodePrimaryAndCoordinatingLimits()) { + logger.debug("Node limits breached for coordinating operation [node_total_bytes={} , " + + "node_primary_and_coordinating_limits={}]", nodeTotalBytes, + this.shardIndexingPressureSettings.getNodePrimaryAndCoordinatingLimits()); + incrementNodeLimitBreachedRejectionCount(tracker.getCoordinatingOperationTracker().getRejectionTracker()); + return true; + } + return false; + } + + /** + * Checks if the shard level memory threshold is breached for coordinating operations. + */ + boolean isCoordinatingShardLimitBreached(ShardIndexingPressureTracker tracker, long nodeTotalBytes, long requestStartTime) { + // Shard memory limits is breached when the current utilization is greater than operating_factor.upper limit. + long shardCombinedBytes = tracker.getCommonOperationTracker().getCurrentCombinedCoordinatingAndPrimaryBytes(); + long shardPrimaryAndCoordinatingLimits = tracker.getPrimaryAndCoordinatingLimits(); + boolean shardMemoryLimitsBreached = ((double)shardCombinedBytes / shardPrimaryAndCoordinatingLimits) > this.upperOperatingFactor; + + if(shardMemoryLimitsBreached) { + BooleanSupplier increaseShardLimitSupplier = () -> increaseShardLimits(tracker.getShardId(), + this.shardIndexingPressureSettings.getNodePrimaryAndCoordinatingLimits(), + () -> tracker.getCommonOperationTracker().getCurrentCombinedCoordinatingAndPrimaryBytes(), + tracker::getPrimaryAndCoordinatingLimits, + ShardIndexingPressureTracker::getPrimaryAndCoordinatingLimits, + tracker::compareAndSetPrimaryAndCoordinatingLimits); + + return onShardLimitBreached(nodeTotalBytes, this.shardIndexingPressureSettings.getNodePrimaryAndCoordinatingLimits(), + requestStartTime, tracker.getCoordinatingOperationTracker(), increaseShardLimitSupplier); + } else { + return false; + } + } + + /** + * Checks if the node 
level memory threshold is breached for primary operations. + */ + boolean isPrimaryNodeLimitBreached(ShardIndexingPressureTracker tracker, long nodeTotalBytes) { + if(nodeTotalBytes > this.shardIndexingPressureSettings.getNodePrimaryAndCoordinatingLimits()) { + logger.debug("Node limits breached for primary operation [node_total_bytes={}, " + + "node_primary_and_coordinating_limits={}]", nodeTotalBytes, + this.shardIndexingPressureSettings.getNodePrimaryAndCoordinatingLimits()); + incrementNodeLimitBreachedRejectionCount(tracker.getPrimaryOperationTracker().getRejectionTracker()); + return true; + } + return false; + } + + /** + * Checks if the shard level memory threshold is breached for primary operations. + */ + boolean isPrimaryShardLimitBreached(ShardIndexingPressureTracker tracker, long nodeTotalBytes, long requestStartTime) { + // Shard memory limits is breached when the current utilization is greater than operating_factor.upper limit. + long shardCombinedBytes = tracker.getCommonOperationTracker().getCurrentCombinedCoordinatingAndPrimaryBytes(); + long shardPrimaryAndCoordinatingLimits = tracker.getPrimaryAndCoordinatingLimits(); + boolean shardMemoryLimitsBreached = ((double)shardCombinedBytes / shardPrimaryAndCoordinatingLimits) > this.upperOperatingFactor; + + if(shardMemoryLimitsBreached) { + BooleanSupplier increaseShardLimitSupplier = () -> increaseShardLimits(tracker.getShardId(), + this.shardIndexingPressureSettings.getNodePrimaryAndCoordinatingLimits(), + () -> tracker.getCommonOperationTracker().getCurrentCombinedCoordinatingAndPrimaryBytes(), + tracker::getPrimaryAndCoordinatingLimits, + ShardIndexingPressureTracker::getPrimaryAndCoordinatingLimits, + tracker::compareAndSetPrimaryAndCoordinatingLimits); + + return onShardLimitBreached(nodeTotalBytes, this.shardIndexingPressureSettings.getNodePrimaryAndCoordinatingLimits(), + requestStartTime, tracker.getPrimaryOperationTracker(), increaseShardLimitSupplier); + } else { + return false; + } + } + + 
/** + * Checks if the node level memory threshold is breached for replica operations. + */ + boolean isReplicaNodeLimitBreached(ShardIndexingPressureTracker tracker, long nodeReplicaBytes) { + if(nodeReplicaBytes > this.shardIndexingPressureSettings.getNodeReplicaLimits()) { + logger.debug("Node limits breached for replica operation [node_replica_bytes={} , " + + "node_replica_limits={}]", nodeReplicaBytes, this.shardIndexingPressureSettings.getNodeReplicaLimits()); + incrementNodeLimitBreachedRejectionCount(tracker.getReplicaOperationTracker().getRejectionTracker()); + return true; + } + return false; + } + + /** + * Checks if the shard level memory threshold is breached for replica operations. + */ + boolean isReplicaShardLimitBreached(ShardIndexingPressureTracker tracker, long nodeReplicaBytes, long requestStartTime) { + // Shard memory limits is breached when the current utilization is greater than operating_factor.upper limit. + long shardReplicaBytes = tracker.getReplicaOperationTracker().getStatsTracker().getCurrentBytes(); + long shardReplicaLimits = tracker.getReplicaLimits(); + final boolean shardMemoryLimitsBreached = ((double)shardReplicaBytes / shardReplicaLimits) > this.upperOperatingFactor; + + if(shardMemoryLimitsBreached) { + BooleanSupplier increaseShardLimitSupplier = () -> increaseShardLimits(tracker.getShardId(), + this.shardIndexingPressureSettings.getNodeReplicaLimits(), + () -> tracker.getReplicaOperationTracker().getStatsTracker().getCurrentBytes(), + tracker::getReplicaLimits, + ShardIndexingPressureTracker::getReplicaLimits, + tracker::compareAndSetReplicaLimits); + + return onShardLimitBreached(nodeReplicaBytes, this.shardIndexingPressureSettings.getNodeReplicaLimits(), + requestStartTime, tracker.getReplicaOperationTracker(), increaseShardLimitSupplier); + } else { + return false; + } + } + + void decreaseShardPrimaryAndCoordinatingLimits(ShardIndexingPressureTracker tracker) { + decreaseShardLimits(tracker.getShardId(), + () -> 
tracker.getCommonOperationTracker().getCurrentCombinedCoordinatingAndPrimaryBytes(), + tracker::getPrimaryAndCoordinatingLimits, + tracker::compareAndSetPrimaryAndCoordinatingLimits, + shardIndexingPressureSettings.getShardPrimaryAndCoordinatingBaseLimits()); + } + + void decreaseShardReplicaLimits(ShardIndexingPressureTracker tracker) { + decreaseShardLimits(tracker.getShardId(), + () -> tracker.getReplicaOperationTracker().getStatsTracker().getCurrentBytes(), + tracker::getReplicaLimits, + tracker::compareAndSetReplicaLimits, + shardIndexingPressureSettings.getShardReplicaBaseLimits()); + } + + ShardIndexingPressureTracker getShardIndexingPressureTracker(ShardId shardId) { + return shardIndexingPressureStore.getShardIndexingPressureTracker(shardId); + } + + long getTotalNodeLimitsBreachedRejections() { + return totalNodeLimitsBreachedRejections.get(); + } + + long getTotalLastSuccessfulRequestLimitsBreachedRejections() { + return totalLastSuccessfulRequestLimitsBreachedRejections.get(); + } + + long getTotalThroughputDegradationLimitsBreachedRejections() { + return totalThroughputDegradationLimitsBreachedRejections.get(); + } + + /** + * Verifies and returns true if the shard limit is hard-breached i.e. shard limit cannot be increased further. Otherwise + * increases the shard limit and returns false. + */ + private boolean onShardLimitBreached(long nodeTotalBytes, long nodeLimit, long requestStartTime, OperationTracker operationTracker, + BooleanSupplier increaseShardLimitSupplier) { + + // Secondary Parameters (i.e. LastSuccessfulRequestDuration and Throughput) is taken into consideration when + // the current node utilization is greater than primary_parameter.node.soft_limit of total node limits. 
+ if(((double)nodeTotalBytes / nodeLimit) < this.nodeSoftLimit) { + boolean isShardLimitsIncreased = increaseShardLimitSupplier.getAsBoolean(); + if (isShardLimitsIncreased == false) { + incrementNodeLimitBreachedRejectionCount(operationTracker.getRejectionTracker()); + } + return !isShardLimitsIncreased; + } else { + boolean shardLastSuccessfulRequestDurationLimitsBreached = + evaluateLastSuccessfulRequestDurationLimitsBreached(operationTracker.getPerformanceTracker(), requestStartTime); + + if (shardLastSuccessfulRequestDurationLimitsBreached) { + operationTracker.getRejectionTracker().incrementLastSuccessfulRequestLimitsBreachedRejections(); + this.totalLastSuccessfulRequestLimitsBreachedRejections.incrementAndGet(); + return true; + } + + boolean shardThroughputDegradationLimitsBreached = + evaluateThroughputDegradationLimitsBreached(operationTracker.getPerformanceTracker(), + operationTracker.getStatsTracker(), primaryAndCoordinatingThroughputDegradationLimits); + + if (shardThroughputDegradationLimitsBreached) { + operationTracker.getRejectionTracker().incrementThroughputDegradationLimitsBreachedRejections(); + this.totalThroughputDegradationLimitsBreachedRejections.incrementAndGet(); + return true; + } + + boolean isShardLimitsIncreased = increaseShardLimitSupplier.getAsBoolean(); + if (isShardLimitsIncreased == false) { + incrementNodeLimitBreachedRejectionCount(operationTracker.getRejectionTracker()); + } + return !isShardLimitsIncreased; + } + } + + private boolean increaseShardLimits(ShardId shardId, long nodeLimit, + LongSupplier shardCurrentBytesSupplier, LongSupplier shardLimitSupplier, + ToLongFunction getShardLimitFunction, + BiPredicate updateShardLimitPredicate) { + long currentShardLimit; + long newShardLimit; + do { + currentShardLimit = shardLimitSupplier.getAsLong(); + long shardCurrentBytes = shardCurrentBytesSupplier.getAsLong(); + + if(((double)shardCurrentBytes / currentShardLimit) > this.upperOperatingFactor) { + newShardLimit = 
(long)(shardCurrentBytes / this.optimalOperatingFactor); + long totalShardLimitsExceptCurrentShard = this.shardIndexingPressureStore.getShardIndexingPressureHotStore() + .entrySet().stream() + .filter(entry -> (shardId != entry.getKey())) + .map(Map.Entry::getValue) + .mapToLong(getShardLimitFunction).sum(); + + if (totalShardLimitsExceptCurrentShard + newShardLimit > nodeLimit) { + logger.debug("Failed To Increase Shard Limit [shard_detail=[{}][{}}], " + + "shard_current_limit_bytes={}, " + "total_shard_limits_bytes_except_current_shard={}, " + + "expected_shard_limits_bytes={}]", + shardId.getIndexName(), shardId.id(), currentShardLimit, totalShardLimitsExceptCurrentShard, newShardLimit); + return false; + } + } else { + return true; + } + } while(!updateShardLimitPredicate.test(currentShardLimit, newShardLimit)); + + logger.debug("Increased Shard Limit [" + + "shard_detail=[{}][{}], old_shard_limit_bytes={}, " + "new_shard_limit_bytes={}]", + shardId.getIndexName(), shardId.id(), currentShardLimit, newShardLimit); + return true; + } + + private void decreaseShardLimits(ShardId shardId, LongSupplier shardCurrentBytesSupplier, LongSupplier shardLimitSupplier, + BiPredicate updateShardLimitPredicate, long shardBaseLimit) { + + long currentShardLimit; + long newShardLimit; + do { + currentShardLimit = shardLimitSupplier.getAsLong(); + long shardCurrentBytes = shardCurrentBytesSupplier.getAsLong(); + newShardLimit = Math.max((long) (shardCurrentBytes / this.optimalOperatingFactor), shardBaseLimit); + + if (((double)shardCurrentBytes / currentShardLimit) > this.lowerOperatingFactor) { + logger.debug("Shard Limits Already Decreased [" + + "shard_detail=[{}][{}], " + "current_shard_limit_bytes={}, " + + "expected_shard_limit_bytes={}]", + shardId.getIndexName(), shardId.id(), currentShardLimit, newShardLimit); + return; + } + } while(!updateShardLimitPredicate.test(currentShardLimit,newShardLimit)); + + logger.debug("Decreased Shard Limit [shard_detail=[{}][{}], " + + 
"old_shard_limit_bytes={}, new_shard_limit_bytes={}]", + shardId.getIndexName(), shardId.id(), currentShardLimit, newShardLimit); + } + + /** + * This evaluation returns true if throughput of last N request divided by the total lifetime requests throughput is greater than + * the degradation limits threshold. + */ + private boolean evaluateThroughputDegradationLimitsBreached(PerformanceTracker performanceTracker, StatsTracker statsTracker, + double degradationLimits) { + double throughputMovingAverage = Double.longBitsToDouble(performanceTracker.getThroughputMovingAverage()); + long throughputMovingQueueSize = performanceTracker.getThroughputMovingQueueSize(); + double throughputHistoricalAverage = (double)statsTracker.getTotalBytes() / performanceTracker.getLatencyInMillis(); + return throughputMovingAverage > 0 && throughputMovingQueueSize >= this.shardIndexingPressureSettings.getRequestSizeWindow() && + throughputHistoricalAverage / throughputMovingAverage > degradationLimits; + } + + /** + * This evaluation returns true if the difference in the current timestamp and last successful request timestamp is greater than + * the successful request elapsed-timeout threshold, and the total number of outstanding requests is greater than + * the maximum outstanding request-count threshold. 
+ */ + private boolean evaluateLastSuccessfulRequestDurationLimitsBreached(PerformanceTracker performanceTracker, long requestStartTime) { + return (performanceTracker.getLastSuccessfulRequestTimestamp() > 0) && + (requestStartTime - performanceTracker.getLastSuccessfulRequestTimestamp()) > this.successfulRequestElapsedTimeout.millis() && + performanceTracker.getTotalOutstandingRequests() > this.maxOutstandingRequests; + } + + private void setLowerOperatingFactor(double lowerOperatingFactor) { + this.lowerOperatingFactor = lowerOperatingFactor; + } + + private void setOptimalOperatingFactor(double optimalOperatingFactor) { + this.optimalOperatingFactor = optimalOperatingFactor; + } + + private void setUpperOperatingFactor(double upperOperatingFactor) { + this.upperOperatingFactor = upperOperatingFactor; + } + + private void setSuccessfulRequestElapsedTimeout(TimeValue successfulRequestElapsedTimeout) { + this.successfulRequestElapsedTimeout = successfulRequestElapsedTimeout; + } + + private void setMaxOutstandingRequests(int maxOutstandingRequests) { + this.maxOutstandingRequests = maxOutstandingRequests; + } + + private void setThroughputDegradationLimits(double throughputDegradationLimits) { + this.primaryAndCoordinatingThroughputDegradationLimits = throughputDegradationLimits; + this.replicaThroughputDegradationLimits = this.primaryAndCoordinatingThroughputDegradationLimits * 1.5; + } + + private void setNodeSoftLimit(double nodeSoftLimit) { + this.nodeSoftLimit = nodeSoftLimit; + } + + private void incrementNodeLimitBreachedRejectionCount(RejectionTracker rejectionTracker) { + rejectionTracker.incrementNodeLimitsBreachedRejections(); + this.totalNodeLimitsBreachedRejections.incrementAndGet(); + } +} diff --git a/server/src/main/java/org/opensearch/index/ShardIndexingPressureTracker.java b/server/src/main/java/org/opensearch/index/ShardIndexingPressureTracker.java index 6378c303782c1..a2ebc45cf1226 100644 --- 
a/server/src/main/java/org/opensearch/index/ShardIndexingPressureTracker.java +++ b/server/src/main/java/org/opensearch/index/ShardIndexingPressureTracker.java @@ -57,10 +57,18 @@ public long getPrimaryAndCoordinatingLimits() { return primaryAndCoordinatingLimits.get(); } + public boolean compareAndSetPrimaryAndCoordinatingLimits(long expectedValue, long newValue) { + return primaryAndCoordinatingLimits.compareAndSet(expectedValue, newValue); + } + public long getReplicaLimits() { return replicaLimits.get(); } + public boolean compareAndSetReplicaLimits(long expectedValue, long newValue) { + return replicaLimits.compareAndSet(expectedValue, newValue); + } + public OperationTracker getCoordinatingOperationTracker() { return coordinatingOperationTracker; } @@ -116,10 +124,18 @@ public long getCurrentBytes() { return currentBytes.get(); } + public long incrementCurrentBytes(long bytes) { + return currentBytes.addAndGet(bytes); + } + public long getTotalBytes() { return totalBytes.get(); } + public long incrementTotalBytes(long bytes) { + return totalBytes.addAndGet(bytes); + } + public long getRequestCount() { return requestCount.get(); } @@ -149,13 +165,25 @@ public long getNodeLimitsBreachedRejections() { return nodeLimitsBreachedRejections.get(); } + public long incrementNodeLimitsBreachedRejections() { + return nodeLimitsBreachedRejections.incrementAndGet(); + } + public long getLastSuccessfulRequestLimitsBreachedRejections() { return lastSuccessfulRequestLimitsBreachedRejections.get(); } + public long incrementLastSuccessfulRequestLimitsBreachedRejections() { + return lastSuccessfulRequestLimitsBreachedRejections.incrementAndGet(); + } + public long getThroughputDegradationLimitsBreachedRejections() { return throughputDegradationLimitsBreachedRejections.get(); } + + public long incrementThroughputDegradationLimitsBreachedRejections() { + return throughputDegradationLimitsBreachedRejections.incrementAndGet(); + } } /** @@ -170,7 +198,7 @@ public long 
getThroughputDegradationLimitsBreachedRejections() { */ public static class PerformanceTracker { private final AtomicLong latencyInMillis = new AtomicLong(); - private final AtomicLong lastSuccessfulRequestTimestamp = new AtomicLong(); + private volatile long lastSuccessfulRequestTimestamp = 0; private final AtomicLong totalOutstandingRequests = new AtomicLong(); /** * Shard Window Throughput Tracker. @@ -184,18 +212,34 @@ public long getLatencyInMillis() { return latencyInMillis.get(); } + public long addLatencyInMillis(long latency) { + return latencyInMillis.addAndGet(latency); + } + public long getLastSuccessfulRequestTimestamp() { - return lastSuccessfulRequestTimestamp.get(); + return lastSuccessfulRequestTimestamp; + } + + public void updateLastSuccessfulRequestTimestamp(long timeStamp) { + lastSuccessfulRequestTimestamp = timeStamp; } public long getTotalOutstandingRequests() { return totalOutstandingRequests.get(); } + public long incrementTotalOutstandingRequests() { + return totalOutstandingRequests.incrementAndGet(); + } + public long getThroughputMovingAverage() { return throughputMovingAverage.get(); } + public long updateThroughputMovingAverage(long newAvg) { + return throughputMovingAverage.getAndSet(newAvg); + } + public boolean addNewThroughout(Double newThroughput) { return throughputMovingQueue.offer(newThroughput); } @@ -224,6 +268,10 @@ public long getCurrentCombinedCoordinatingAndPrimaryBytes() { return currentCombinedCoordinatingAndPrimaryBytes.get(); } + public long incrementCurrentCombinedCoordinatingAndPrimaryBytes(long bytes) { + return currentCombinedCoordinatingAndPrimaryBytes.addAndGet(bytes); + } + public long getTotalCombinedCoordinatingAndPrimaryBytes() { return totalCombinedCoordinatingAndPrimaryBytes.get(); } diff --git a/server/src/test/java/org/opensearch/index/ShardIndexingPressureMemoryManagerTests.java b/server/src/test/java/org/opensearch/index/ShardIndexingPressureMemoryManagerTests.java new file mode 100644 index 
0000000000000..41e7a9a6de2d1 --- /dev/null +++ b/server/src/test/java/org/opensearch/index/ShardIndexingPressureMemoryManagerTests.java @@ -0,0 +1,526 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index; + +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; +import org.opensearch.index.shard.ShardId; +import org.opensearch.test.OpenSearchTestCase; + +public class ShardIndexingPressureMemoryManagerTests extends OpenSearchTestCase { + + private final Settings settings = Settings.builder().put(IndexingPressure.MAX_INDEXING_BYTES.getKey(), "10KB") + .put(ShardIndexingPressureMemoryManager.MAX_OUTSTANDING_REQUESTS.getKey(), 1) + .put(ShardIndexingPressureMemoryManager.SUCCESSFUL_REQUEST_ELAPSED_TIMEOUT.getKey(), 20) + .put(ShardIndexingPressureSettings.REQUEST_SIZE_WINDOW.getKey(), 2) + .build(); + private final ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + private final ShardIndexingPressureSettings shardIndexingPressureSettings = + new ShardIndexingPressureSettings(new ClusterService(settings, clusterSettings, null), settings, + IndexingPressure.MAX_INDEXING_BYTES.get(settings).getBytes()); + + private final Index index = new Index("IndexName", "UUID"); + private final ShardId shardId1 = new ShardId(index, 0); + private final ShardId shardId2 = new ShardId(index, 1); + + private final ShardIndexingPressureMemoryManager memoryManager = new ShardIndexingPressureMemoryManager(shardIndexingPressureSettings, + clusterSettings, settings); + + public void testCoordinatingNodeLevelBreach() { + ShardIndexingPressureTracker tracker = memoryManager.getShardIndexingPressureTracker(shardId1); + + 
assertFalse(memoryManager.isCoordinatingNodeLimitBreached(tracker, 1 * 1024)); + assertTrue(memoryManager.isCoordinatingNodeLimitBreached(tracker, 11 * 1024)); + } + + public void testPrimaryNodeLevelBreach() { + ShardIndexingPressureTracker tracker = memoryManager.getShardIndexingPressureTracker(shardId1); + + assertFalse(memoryManager.isPrimaryNodeLimitBreached(tracker, 1 * 1024)); + assertTrue(memoryManager.isPrimaryNodeLimitBreached(tracker, 11 * 1024)); + } + + public void testReplicaNodeLevelBreach() { + ShardIndexingPressureTracker tracker = memoryManager.getShardIndexingPressureTracker(shardId1); + + assertFalse(memoryManager.isReplicaNodeLimitBreached(tracker, 1 * 1024)); + assertTrue(memoryManager.isReplicaNodeLimitBreached(tracker, 16 * 1024)); + } + + public void testCoordinatingPrimaryShardLimitsNotBreached() { + ShardIndexingPressureTracker tracker = memoryManager.getShardIndexingPressureTracker(shardId1); + tracker.getCommonOperationTracker().incrementCurrentCombinedCoordinatingAndPrimaryBytes(1); + long requestStartTime = System.nanoTime(); + + assertFalse(memoryManager.isCoordinatingShardLimitBreached(tracker, 1 * 1024, requestStartTime)); + assertFalse(memoryManager.isPrimaryShardLimitBreached(tracker, 1 * 1024, requestStartTime)); + } + + public void testReplicaShardLimitsNotBreached() { + ShardIndexingPressureTracker tracker = memoryManager.getShardIndexingPressureTracker(shardId1); + tracker.getReplicaOperationTracker().getStatsTracker().incrementCurrentBytes(1); + long requestStartTime = System.nanoTime(); + + assertFalse(memoryManager.isReplicaShardLimitBreached(tracker, 1 * 1024, requestStartTime)); + } + + public void testCoordinatingPrimaryShardLimitsIncreasedAndSoftLimitNotBreached() { + ShardIndexingPressureTracker tracker = memoryManager.getShardIndexingPressureTracker(shardId1); + tracker.getCommonOperationTracker().incrementCurrentCombinedCoordinatingAndPrimaryBytes(10); + long baseLimit = tracker.getPrimaryAndCoordinatingLimits(); + 
long requestStartTime = System.nanoTime(); + + assertFalse(memoryManager.isCoordinatingShardLimitBreached(tracker, 1 * 1024, requestStartTime)); + assertFalse(memoryManager.isPrimaryShardLimitBreached(tracker, 1 * 1024, requestStartTime)); + + assertTrue(tracker.getPrimaryAndCoordinatingLimits() > baseLimit); + assertEquals(tracker.getPrimaryAndCoordinatingLimits(), (long)(baseLimit/0.85)); + } + + public void testReplicaShardLimitsIncreasedAndSoftLimitNotBreached() { + ShardIndexingPressureTracker tracker = memoryManager.getShardIndexingPressureTracker(shardId1); + tracker.getReplicaOperationTracker().getStatsTracker().incrementCurrentBytes(15); + long baseLimit = tracker.getReplicaLimits(); + long requestStartTime = System.nanoTime(); + + assertFalse(memoryManager.isReplicaShardLimitBreached(tracker, 1 * 1024, requestStartTime)); + assertTrue(tracker.getReplicaLimits() > baseLimit); + assertEquals(tracker.getReplicaLimits(), (long)(baseLimit/0.85)); + } + + public void testCoordinatingPrimarySoftLimitNotBreachedAndNodeLevelRejections() { + ShardIndexingPressureTracker tracker1 = memoryManager.getShardIndexingPressureTracker(shardId1); + ShardIndexingPressureTracker tracker2 = memoryManager.getShardIndexingPressureTracker(shardId2); + tracker1.getCommonOperationTracker().incrementCurrentCombinedCoordinatingAndPrimaryBytes(4 * 1024); + tracker2.compareAndSetPrimaryAndCoordinatingLimits(tracker2.getPrimaryAndCoordinatingLimits(), 6 * 1024); + long limit1 = tracker1.getPrimaryAndCoordinatingLimits(); + long limit2 = tracker2.getPrimaryAndCoordinatingLimits(); + long requestStartTime = System.nanoTime(); + + assertTrue(memoryManager.isCoordinatingShardLimitBreached(tracker1, 8 * 1024, requestStartTime)); + assertEquals(1, tracker1.getCoordinatingOperationTracker().getRejectionTracker().getNodeLimitsBreachedRejections()); + assertEquals(0, tracker2.getCoordinatingOperationTracker().getRejectionTracker().getNodeLimitsBreachedRejections()); + + 
assertTrue(memoryManager.isPrimaryShardLimitBreached(tracker1, 8 * 1024, requestStartTime)); + assertEquals(1, tracker1.getPrimaryOperationTracker().getRejectionTracker().getNodeLimitsBreachedRejections()); + assertEquals(0, tracker2.getPrimaryOperationTracker().getRejectionTracker().getNodeLimitsBreachedRejections()); + + assertEquals(limit1, tracker1.getPrimaryAndCoordinatingLimits()); + assertEquals(limit2, tracker2.getPrimaryAndCoordinatingLimits()); + assertEquals(2, memoryManager.getTotalNodeLimitsBreachedRejections()); + } + + public void testReplicaShardLimitsSoftLimitNotBreachedAndNodeLevelRejections() { + ShardIndexingPressureTracker tracker1 = memoryManager.getShardIndexingPressureTracker(shardId1); + ShardIndexingPressureTracker tracker2 = memoryManager.getShardIndexingPressureTracker(shardId2); + tracker1.getReplicaOperationTracker().getStatsTracker().incrementCurrentBytes(5 * 1024); + tracker2.compareAndSetReplicaLimits(tracker2.getReplicaLimits(), 10 * 1024); + long limit1 = tracker1.getReplicaLimits(); + long limit2 = tracker2.getReplicaLimits(); + long requestStartTime = System.nanoTime(); + + assertTrue(memoryManager.isReplicaShardLimitBreached(tracker1, 10 * 1024, requestStartTime)); + assertEquals(limit1, tracker1.getReplicaLimits()); + assertEquals(limit2, tracker2.getReplicaLimits()); + assertEquals(1, tracker1.getReplicaOperationTracker().getRejectionTracker().getNodeLimitsBreachedRejections()); + assertEquals(0, tracker2.getReplicaOperationTracker().getRejectionTracker().getNodeLimitsBreachedRejections()); + assertEquals(1, memoryManager.getTotalNodeLimitsBreachedRejections()); + } + + public void testCoordinatingPrimarySoftLimitBreachedAndNodeLevelRejections() { + ShardIndexingPressureTracker tracker1 = memoryManager.getShardIndexingPressureTracker(shardId1); + ShardIndexingPressureTracker tracker2 = memoryManager.getShardIndexingPressureTracker(shardId2); + 
tracker1.getCommonOperationTracker().incrementCurrentCombinedCoordinatingAndPrimaryBytes(4 * 1024); + tracker2.compareAndSetPrimaryAndCoordinatingLimits(tracker2.getPrimaryAndCoordinatingLimits(), 6 * 1024); + long limit1 = tracker1.getPrimaryAndCoordinatingLimits(); + long limit2 = tracker2.getPrimaryAndCoordinatingLimits(); + long requestStartTime = System.nanoTime(); + + assertTrue(memoryManager.isCoordinatingShardLimitBreached(tracker1, 8 * 1024, requestStartTime)); + assertEquals(1, tracker1.getCoordinatingOperationTracker().getRejectionTracker().getNodeLimitsBreachedRejections()); + assertEquals(0, tracker2.getCoordinatingOperationTracker().getRejectionTracker().getNodeLimitsBreachedRejections()); + + assertTrue(memoryManager.isPrimaryShardLimitBreached(tracker1, 8 * 1024, requestStartTime)); + assertEquals(1, tracker1.getPrimaryOperationTracker().getRejectionTracker().getNodeLimitsBreachedRejections()); + assertEquals(0, tracker2.getPrimaryOperationTracker().getRejectionTracker().getNodeLimitsBreachedRejections()); + + assertEquals(limit1, tracker1.getPrimaryAndCoordinatingLimits()); + assertEquals(limit2, tracker2.getPrimaryAndCoordinatingLimits()); + assertEquals(2, memoryManager.getTotalNodeLimitsBreachedRejections()); + } + + public void testReplicaShardLimitsSoftLimitBreachedAndNodeLevelRejections() { + ShardIndexingPressureTracker tracker1 = memoryManager.getShardIndexingPressureTracker(shardId1); + ShardIndexingPressureTracker tracker2 = memoryManager.getShardIndexingPressureTracker(shardId2); + tracker1.getReplicaOperationTracker().getStatsTracker().incrementCurrentBytes(5 * 1024); + tracker2.compareAndSetReplicaLimits(tracker2.getReplicaLimits(), 12 * 1024); + long limit1 = tracker1.getReplicaLimits(); + long limit2 = tracker2.getReplicaLimits(); + long requestStartTime = System.nanoTime(); + + assertTrue(memoryManager.isReplicaShardLimitBreached(tracker1, 12 * 1024, requestStartTime)); + assertEquals(limit1, tracker1.getReplicaLimits()); + 
assertEquals(limit2, tracker2.getReplicaLimits()); + assertEquals(1, tracker1.getReplicaOperationTracker().getRejectionTracker().getNodeLimitsBreachedRejections()); + assertEquals(0, tracker2.getReplicaOperationTracker().getRejectionTracker().getNodeLimitsBreachedRejections()); + assertEquals(1, memoryManager.getTotalNodeLimitsBreachedRejections()); + } + + public void testCoordinatingPrimarySoftLimitBreachedAndLastSuccessfulRequestLimitRejections() { + ShardIndexingPressureTracker tracker1 = memoryManager.getShardIndexingPressureTracker(shardId1); + ShardIndexingPressureTracker tracker2 = memoryManager.getShardIndexingPressureTracker(shardId2); + tracker1.getCommonOperationTracker().incrementCurrentCombinedCoordinatingAndPrimaryBytes(4 * 1024); + tracker2.compareAndSetPrimaryAndCoordinatingLimits(tracker2.getPrimaryAndCoordinatingLimits(), 6 * 1024); + long limit1 = tracker1.getPrimaryAndCoordinatingLimits(); + long limit2 = tracker2.getPrimaryAndCoordinatingLimits(); + long requestStartTime = System.nanoTime(); + + tracker1.getCoordinatingOperationTracker().getPerformanceTracker().updateLastSuccessfulRequestTimestamp(requestStartTime - 100); + tracker1.getCoordinatingOperationTracker().getPerformanceTracker().incrementTotalOutstandingRequests(); + tracker1.getCoordinatingOperationTracker().getPerformanceTracker().incrementTotalOutstandingRequests(); + + assertTrue(memoryManager.isCoordinatingShardLimitBreached(tracker1, 8 * 1024, requestStartTime)); + assertEquals(1, tracker1.getCoordinatingOperationTracker().getRejectionTracker() + .getLastSuccessfulRequestLimitsBreachedRejections()); + assertEquals(0, tracker2.getCoordinatingOperationTracker().getRejectionTracker() + .getLastSuccessfulRequestLimitsBreachedRejections()); + + tracker1.getPrimaryOperationTracker().getPerformanceTracker().updateLastSuccessfulRequestTimestamp(requestStartTime - 100); + tracker1.getPrimaryOperationTracker().getPerformanceTracker().incrementTotalOutstandingRequests(); + 
tracker1.getPrimaryOperationTracker().getPerformanceTracker().incrementTotalOutstandingRequests(); + + assertTrue(memoryManager.isPrimaryShardLimitBreached(tracker1, 8 * 1024, requestStartTime)); + assertEquals(1, tracker1.getPrimaryOperationTracker().getRejectionTracker() + .getLastSuccessfulRequestLimitsBreachedRejections()); + assertEquals(0, tracker2.getPrimaryOperationTracker().getRejectionTracker() + .getLastSuccessfulRequestLimitsBreachedRejections()); + + assertEquals(limit1, tracker1.getPrimaryAndCoordinatingLimits()); + assertEquals(limit2, tracker2.getPrimaryAndCoordinatingLimits()); + assertEquals(2, memoryManager.getTotalLastSuccessfulRequestLimitsBreachedRejections()); + } + + public void testReplicaShardLimitsSoftLimitBreachedAndLastSuccessfulRequestLimitRejections() { + ShardIndexingPressureTracker tracker1 = memoryManager.getShardIndexingPressureTracker(shardId1); + ShardIndexingPressureTracker tracker2 = memoryManager.getShardIndexingPressureTracker(shardId2); + tracker1.getReplicaOperationTracker().getStatsTracker().incrementCurrentBytes(5 * 1024); + tracker2.compareAndSetReplicaLimits(tracker2.getReplicaLimits(), 12 * 1024); + long limit1 = tracker1.getReplicaLimits(); + long limit2 = tracker2.getReplicaLimits(); + long requestStartTime = System.nanoTime(); + tracker1.getReplicaOperationTracker().getPerformanceTracker().updateLastSuccessfulRequestTimestamp(requestStartTime - 100); + tracker1.getReplicaOperationTracker().getPerformanceTracker().incrementTotalOutstandingRequests(); + tracker1.getReplicaOperationTracker().getPerformanceTracker().incrementTotalOutstandingRequests(); + + assertTrue(memoryManager.isReplicaShardLimitBreached(tracker1, 12 * 1024, requestStartTime)); + assertEquals(limit1, tracker1.getReplicaLimits()); + assertEquals(limit2, tracker2.getReplicaLimits()); + assertEquals(1, tracker1.getReplicaOperationTracker().getRejectionTracker() + .getLastSuccessfulRequestLimitsBreachedRejections()); + assertEquals(0, 
tracker2.getReplicaOperationTracker().getRejectionTracker() + .getLastSuccessfulRequestLimitsBreachedRejections()); + assertEquals(1, memoryManager.getTotalLastSuccessfulRequestLimitsBreachedRejections()); + } + + public void testCoordinatingPrimarySoftLimitBreachedAndLessOutstandingRequestsAndNoLastSuccessfulRequestLimitRejections() { + ShardIndexingPressureTracker tracker1 = memoryManager.getShardIndexingPressureTracker(shardId1); + ShardIndexingPressureTracker tracker2 = memoryManager.getShardIndexingPressureTracker(shardId2); + tracker1.getCommonOperationTracker().incrementCurrentCombinedCoordinatingAndPrimaryBytes(1 * 1024); + tracker2.compareAndSetPrimaryAndCoordinatingLimits(tracker2.getPrimaryAndCoordinatingLimits(), 6 * 1024); + long limit1 = tracker1.getPrimaryAndCoordinatingLimits(); + long limit2 = tracker2.getPrimaryAndCoordinatingLimits(); + long requestStartTime = System.nanoTime(); + + tracker1.getCoordinatingOperationTracker().getPerformanceTracker().updateLastSuccessfulRequestTimestamp(requestStartTime - 100); + tracker1.getCoordinatingOperationTracker().getPerformanceTracker().incrementTotalOutstandingRequests(); + + assertFalse(memoryManager.isCoordinatingShardLimitBreached(tracker1, 8 * 1024, requestStartTime)); + assertEquals(0, tracker1.getCoordinatingOperationTracker().getRejectionTracker() + .getLastSuccessfulRequestLimitsBreachedRejections()); + assertEquals(0, tracker2.getCoordinatingOperationTracker().getRejectionTracker() + .getLastSuccessfulRequestLimitsBreachedRejections()); + + tracker1.getPrimaryOperationTracker().getPerformanceTracker().updateLastSuccessfulRequestTimestamp(requestStartTime - 100); + tracker1.getPrimaryOperationTracker().getPerformanceTracker().incrementTotalOutstandingRequests(); + + assertFalse(memoryManager.isPrimaryShardLimitBreached(tracker1, 8 * 1024, requestStartTime)); + assertEquals(0, tracker1.getPrimaryOperationTracker().getRejectionTracker() + .getLastSuccessfulRequestLimitsBreachedRejections()); + 
assertEquals(0, tracker2.getPrimaryOperationTracker().getRejectionTracker() + .getLastSuccessfulRequestLimitsBreachedRejections()); + + assertTrue(tracker1.getPrimaryAndCoordinatingLimits() > limit1); + assertEquals((long)(1 * 1024/0.85), tracker1.getPrimaryAndCoordinatingLimits()); + assertEquals(limit2, tracker2.getPrimaryAndCoordinatingLimits()); + assertEquals(0, memoryManager.getTotalLastSuccessfulRequestLimitsBreachedRejections()); + } + + public void testReplicaShardLimitsSoftLimitBreachedAndLessOutstandingRequestsAndNoLastSuccessfulRequestLimitRejections() { + ShardIndexingPressureTracker tracker1 = memoryManager.getShardIndexingPressureTracker(shardId1); + ShardIndexingPressureTracker tracker2 = memoryManager.getShardIndexingPressureTracker(shardId2); + tracker1.getReplicaOperationTracker().getStatsTracker().incrementCurrentBytes(2 * 1024); + tracker2.compareAndSetReplicaLimits(tracker2.getReplicaLimits(), 12 * 1024); + long limit1 = tracker1.getReplicaLimits(); + long limit2 = tracker2.getReplicaLimits(); + long requestStartTime = System.nanoTime(); + tracker1.getReplicaOperationTracker().getPerformanceTracker().updateLastSuccessfulRequestTimestamp(requestStartTime - 100); + tracker1.getReplicaOperationTracker().getPerformanceTracker().incrementTotalOutstandingRequests(); + + assertFalse(memoryManager.isReplicaShardLimitBreached(tracker1, 12 * 1024, requestStartTime)); + assertTrue(tracker1.getReplicaLimits() > limit1); + assertEquals((long)(2 * 1024/0.85), tracker1.getReplicaLimits()); + assertEquals(limit2, tracker2.getReplicaLimits()); + assertEquals(0, tracker1.getReplicaOperationTracker().getRejectionTracker() + .getLastSuccessfulRequestLimitsBreachedRejections()); + assertEquals(0, tracker2.getReplicaOperationTracker().getRejectionTracker() + .getLastSuccessfulRequestLimitsBreachedRejections()); + assertEquals(0, memoryManager.getTotalLastSuccessfulRequestLimitsBreachedRejections()); + } + + public void 
testCoordinatingPrimarySoftLimitBreachedAndThroughputDegradationLimitRejections() { + ShardIndexingPressureTracker tracker1 = memoryManager.getShardIndexingPressureTracker(shardId1); + ShardIndexingPressureTracker tracker2 = memoryManager.getShardIndexingPressureTracker(shardId2); + tracker1.getCommonOperationTracker().incrementCurrentCombinedCoordinatingAndPrimaryBytes(4 * 1024); + tracker2.compareAndSetPrimaryAndCoordinatingLimits(tracker2.getPrimaryAndCoordinatingLimits(), 6 * 1024); + long limit1 = tracker1.getPrimaryAndCoordinatingLimits(); + long limit2 = tracker2.getPrimaryAndCoordinatingLimits(); + long requestStartTime = System.nanoTime(); + + tracker1.getCoordinatingOperationTracker().getPerformanceTracker().updateThroughputMovingAverage(Double.doubleToLongBits(1d)); + tracker1.getCoordinatingOperationTracker().getPerformanceTracker().incrementTotalOutstandingRequests(); + tracker1.getCoordinatingOperationTracker().getPerformanceTracker().incrementTotalOutstandingRequests(); + tracker1.getCoordinatingOperationTracker().getStatsTracker().incrementTotalBytes(60); + tracker1.getCoordinatingOperationTracker().getPerformanceTracker().addLatencyInMillis(10); + tracker1.getCoordinatingOperationTracker().getPerformanceTracker().addNewThroughout(1d); + tracker1.getCoordinatingOperationTracker().getPerformanceTracker().addNewThroughout(2d); + + assertTrue(memoryManager.isCoordinatingShardLimitBreached(tracker1, 8 * 1024, requestStartTime)); + assertEquals(1, tracker1.getCoordinatingOperationTracker().getRejectionTracker() + .getThroughputDegradationLimitsBreachedRejections()); + assertEquals(0, tracker2.getCoordinatingOperationTracker().getRejectionTracker() + .getThroughputDegradationLimitsBreachedRejections()); + + tracker1.getPrimaryOperationTracker().getPerformanceTracker().updateThroughputMovingAverage(Double.doubleToLongBits(1d)); + tracker1.getPrimaryOperationTracker().getPerformanceTracker().incrementTotalOutstandingRequests(); + 
tracker1.getPrimaryOperationTracker().getPerformanceTracker().incrementTotalOutstandingRequests(); + tracker1.getPrimaryOperationTracker().getStatsTracker().incrementTotalBytes(60); + tracker1.getPrimaryOperationTracker().getPerformanceTracker().addLatencyInMillis(10); + tracker1.getPrimaryOperationTracker().getPerformanceTracker().addNewThroughout(1d); + tracker1.getPrimaryOperationTracker().getPerformanceTracker().addNewThroughout(2d); + + assertTrue(memoryManager.isPrimaryShardLimitBreached(tracker1, 8 * 1024, requestStartTime)); + assertEquals(1, tracker1.getPrimaryOperationTracker().getRejectionTracker() + .getThroughputDegradationLimitsBreachedRejections()); + assertEquals(0, tracker2.getPrimaryOperationTracker().getRejectionTracker() + .getThroughputDegradationLimitsBreachedRejections()); + + assertEquals(limit1, tracker1.getPrimaryAndCoordinatingLimits()); + assertEquals(limit2, tracker2.getPrimaryAndCoordinatingLimits()); + assertEquals(2, memoryManager.getTotalThroughputDegradationLimitsBreachedRejections()); + } + + public void testReplicaShardLimitsSoftLimitBreachedAndThroughputDegradationLimitRejections() { + ShardIndexingPressureTracker tracker1 = memoryManager.getShardIndexingPressureTracker(shardId1); + ShardIndexingPressureTracker tracker2 = memoryManager.getShardIndexingPressureTracker(shardId2); + tracker1.getReplicaOperationTracker().getStatsTracker().incrementCurrentBytes(5 * 1024); + tracker2.compareAndSetReplicaLimits(tracker2.getReplicaLimits(), 12 * 1024); + long limit1 = tracker1.getReplicaLimits(); + long limit2 = tracker2.getReplicaLimits(); + long requestStartTime = System.nanoTime(); + tracker1.getReplicaOperationTracker().getPerformanceTracker().updateThroughputMovingAverage(Double.doubleToLongBits(1d)); + tracker1.getReplicaOperationTracker().getPerformanceTracker().incrementTotalOutstandingRequests(); + tracker1.getReplicaOperationTracker().getPerformanceTracker().incrementTotalOutstandingRequests(); + 
tracker1.getReplicaOperationTracker().getStatsTracker().incrementTotalBytes(80); + tracker1.getReplicaOperationTracker().getPerformanceTracker().addLatencyInMillis(10); + tracker1.getReplicaOperationTracker().getPerformanceTracker().addNewThroughout(1d); + tracker1.getReplicaOperationTracker().getPerformanceTracker().addNewThroughout(2d); + + assertTrue(memoryManager.isReplicaShardLimitBreached(tracker1, 12 * 1024, requestStartTime)); + assertEquals(limit1, tracker1.getReplicaLimits()); + assertEquals(limit2, tracker2.getReplicaLimits()); + assertEquals(1, tracker1.getReplicaOperationTracker().getRejectionTracker() + .getThroughputDegradationLimitsBreachedRejections()); + assertEquals(0, tracker2.getReplicaOperationTracker().getRejectionTracker() + .getThroughputDegradationLimitsBreachedRejections()); + assertEquals(1, memoryManager.getTotalThroughputDegradationLimitsBreachedRejections()); + } + + public void testCoordinatingPrimarySoftLimitBreachedAndMovingAverageQueueNotBuildUpAndNoThroughputDegradationLimitRejections() { + ShardIndexingPressureTracker tracker1 = memoryManager.getShardIndexingPressureTracker(shardId1); + ShardIndexingPressureTracker tracker2 = memoryManager.getShardIndexingPressureTracker(shardId2); + tracker1.getCommonOperationTracker().incrementCurrentCombinedCoordinatingAndPrimaryBytes(1 * 1024); + tracker2.compareAndSetPrimaryAndCoordinatingLimits(tracker2.getPrimaryAndCoordinatingLimits(), 6 * 1024); + long limit1 = tracker1.getPrimaryAndCoordinatingLimits(); + long limit2 = tracker2.getPrimaryAndCoordinatingLimits(); + long requestStartTime = System.nanoTime(); + + tracker1.getCoordinatingOperationTracker().getPerformanceTracker().updateThroughputMovingAverage + (Double.doubleToLongBits(1d)); + tracker1.getCoordinatingOperationTracker().getPerformanceTracker().incrementTotalOutstandingRequests(); + tracker1.getCoordinatingOperationTracker().getStatsTracker().incrementTotalBytes(60); + 
tracker1.getCoordinatingOperationTracker().getPerformanceTracker().addLatencyInMillis(10); + tracker1.getCoordinatingOperationTracker().getPerformanceTracker().addNewThroughout(1d); + + assertFalse(memoryManager.isCoordinatingShardLimitBreached(tracker1, 8 * 1024, requestStartTime)); + assertEquals(0, tracker1.getCoordinatingOperationTracker().getRejectionTracker() + .getThroughputDegradationLimitsBreachedRejections()); + assertEquals(0, tracker2.getCoordinatingOperationTracker().getRejectionTracker() + .getThroughputDegradationLimitsBreachedRejections()); + assertEquals(0, tracker1.getCoordinatingOperationTracker().getRejectionTracker().getNodeLimitsBreachedRejections()); + assertEquals(0, tracker2.getCoordinatingOperationTracker().getRejectionTracker().getNodeLimitsBreachedRejections()); + + tracker1.getPrimaryOperationTracker().getPerformanceTracker().updateThroughputMovingAverage(Double.doubleToLongBits(1d)); + tracker1.getPrimaryOperationTracker().getPerformanceTracker().incrementTotalOutstandingRequests(); + tracker1.getPrimaryOperationTracker().getStatsTracker().incrementTotalBytes(60); + tracker1.getPrimaryOperationTracker().getPerformanceTracker().addLatencyInMillis(10); + tracker1.getPrimaryOperationTracker().getPerformanceTracker().addNewThroughout(1d); + + assertFalse(memoryManager.isPrimaryShardLimitBreached(tracker1, 8 * 1024, requestStartTime)); + assertEquals(0, tracker1.getPrimaryOperationTracker().getRejectionTracker() + .getThroughputDegradationLimitsBreachedRejections()); + assertEquals(0, tracker2.getPrimaryOperationTracker().getRejectionTracker() + .getThroughputDegradationLimitsBreachedRejections()); + assertEquals(0, tracker1.getPrimaryOperationTracker().getRejectionTracker().getNodeLimitsBreachedRejections()); + assertEquals(0, tracker2.getPrimaryOperationTracker().getRejectionTracker().getNodeLimitsBreachedRejections()); + + assertTrue(tracker1.getPrimaryAndCoordinatingLimits() > limit1); + assertEquals((long)(1 * 1024/0.85), 
tracker1.getPrimaryAndCoordinatingLimits()); + assertEquals(limit2, tracker2.getPrimaryAndCoordinatingLimits()); + assertEquals(0, memoryManager.getTotalThroughputDegradationLimitsBreachedRejections()); + } + + public void testReplicaShardLimitsSoftLimitBreachedAndMovingAverageQueueNotBuildUpAndNoThroughputDegradationLimitRejections() { + ShardIndexingPressureTracker tracker1 = memoryManager.getShardIndexingPressureTracker(shardId1); + ShardIndexingPressureTracker tracker2 = memoryManager.getShardIndexingPressureTracker(shardId2); + tracker1.getReplicaOperationTracker().getStatsTracker().incrementCurrentBytes(2 * 1024); + tracker2.compareAndSetReplicaLimits(tracker2.getReplicaLimits(), 12 * 1024); + long limit1 = tracker1.getReplicaLimits(); + long limit2 = tracker2.getReplicaLimits(); + long requestStartTime = System.nanoTime(); + tracker1.getReplicaOperationTracker().getPerformanceTracker().updateThroughputMovingAverage(Double.doubleToLongBits(1d)); + tracker1.getReplicaOperationTracker().getPerformanceTracker().incrementTotalOutstandingRequests(); + tracker1.getReplicaOperationTracker().getStatsTracker().incrementTotalBytes(80); + tracker1.getReplicaOperationTracker().getPerformanceTracker().addLatencyInMillis(10); + tracker1.getReplicaOperationTracker().getPerformanceTracker().addNewThroughout(1d); + + assertFalse(memoryManager.isReplicaShardLimitBreached(tracker1, 12 * 1024, requestStartTime)); + assertTrue(tracker1.getReplicaLimits() > limit1); + assertEquals((long)(2 * 1024/0.85), tracker1.getReplicaLimits()); + assertEquals(limit2, tracker2.getReplicaLimits()); + assertEquals(0, tracker1.getReplicaOperationTracker().getRejectionTracker() + .getThroughputDegradationLimitsBreachedRejections()); + assertEquals(0, tracker2.getReplicaOperationTracker().getRejectionTracker() + .getThroughputDegradationLimitsBreachedRejections()); + assertEquals(0, tracker1.getReplicaOperationTracker().getRejectionTracker().getNodeLimitsBreachedRejections()); + assertEquals(0, 
memoryManager.getTotalThroughputDegradationLimitsBreachedRejections()); + } + + public void testCoordinatingPrimarySoftLimitBreachedAndNoSecondaryParameterBreachedAndNodeLevelRejections() { + ShardIndexingPressureTracker tracker1 = memoryManager.getShardIndexingPressureTracker(shardId1); + ShardIndexingPressureTracker tracker2 = memoryManager.getShardIndexingPressureTracker(shardId2); + tracker1.getCommonOperationTracker().incrementCurrentCombinedCoordinatingAndPrimaryBytes(4 * 1024); + tracker2.compareAndSetPrimaryAndCoordinatingLimits(tracker2.getPrimaryAndCoordinatingLimits(), 6 * 1024); + long limit1 = tracker1.getPrimaryAndCoordinatingLimits(); + long limit2 = tracker2.getPrimaryAndCoordinatingLimits(); + long requestStartTime = System.nanoTime(); + + tracker1.getCoordinatingOperationTracker().getPerformanceTracker().updateThroughputMovingAverage(Double.doubleToLongBits(1d)); + tracker1.getCoordinatingOperationTracker().getPerformanceTracker().incrementTotalOutstandingRequests(); + tracker1.getCoordinatingOperationTracker().getStatsTracker().incrementTotalBytes(60); + tracker1.getCoordinatingOperationTracker().getPerformanceTracker().addLatencyInMillis(10); + tracker1.getCoordinatingOperationTracker().getPerformanceTracker().addNewThroughout(1d); + + assertTrue(memoryManager.isCoordinatingShardLimitBreached(tracker1, 8 * 1024, requestStartTime)); + assertEquals(1, tracker1.getCoordinatingOperationTracker().getRejectionTracker().getNodeLimitsBreachedRejections()); + assertEquals(0, tracker2.getCoordinatingOperationTracker().getRejectionTracker().getNodeLimitsBreachedRejections()); + + tracker1.getPrimaryOperationTracker().getPerformanceTracker().updateThroughputMovingAverage(Double.doubleToLongBits(1d)); + tracker1.getPrimaryOperationTracker().getPerformanceTracker().incrementTotalOutstandingRequests(); + tracker1.getPrimaryOperationTracker().getStatsTracker().incrementTotalBytes(60); + 
tracker1.getPrimaryOperationTracker().getPerformanceTracker().addLatencyInMillis(10); + tracker1.getPrimaryOperationTracker().getPerformanceTracker().addNewThroughout(1d); + + assertTrue(memoryManager.isPrimaryShardLimitBreached(tracker1, 8 * 1024, requestStartTime)); + assertEquals(1, tracker1.getPrimaryOperationTracker().getRejectionTracker().getNodeLimitsBreachedRejections()); + assertEquals(0, tracker2.getPrimaryOperationTracker().getRejectionTracker().getNodeLimitsBreachedRejections()); + + assertEquals(limit1, tracker1.getPrimaryAndCoordinatingLimits()); + assertEquals(limit2, tracker2.getPrimaryAndCoordinatingLimits()); + assertEquals(2, memoryManager.getTotalNodeLimitsBreachedRejections()); + } + + public void testReplicaShardLimitsSoftLimitBreachedAndNoSecondaryParameterBreachedAndNodeLevelRejections() { + ShardIndexingPressureTracker tracker1 = memoryManager.getShardIndexingPressureTracker(shardId1); + ShardIndexingPressureTracker tracker2 = memoryManager.getShardIndexingPressureTracker(shardId2); + tracker1.getReplicaOperationTracker().getStatsTracker().incrementCurrentBytes(5 * 1024); + tracker2.compareAndSetReplicaLimits(tracker2.getReplicaLimits(), 12 * 1024); + long limit1 = tracker1.getReplicaLimits(); + long limit2 = tracker2.getReplicaLimits(); + long requestStartTime = System.nanoTime(); + tracker1.getReplicaOperationTracker().getPerformanceTracker().updateThroughputMovingAverage(Double.doubleToLongBits(1d)); + tracker1.getReplicaOperationTracker().getPerformanceTracker().incrementTotalOutstandingRequests(); + tracker1.getReplicaOperationTracker().getStatsTracker().incrementTotalBytes(80); + tracker1.getReplicaOperationTracker().getPerformanceTracker().addLatencyInMillis(10); + tracker1.getReplicaOperationTracker().getPerformanceTracker().addNewThroughout(1d); + + assertTrue(memoryManager.isReplicaShardLimitBreached(tracker1, 12 * 1024, requestStartTime)); + assertEquals(limit1, tracker1.getReplicaLimits()); + assertEquals(limit2, 
tracker2.getReplicaLimits()); + assertEquals(1, tracker1.getReplicaOperationTracker().getRejectionTracker().getNodeLimitsBreachedRejections()); + assertEquals(0, tracker2.getReplicaOperationTracker().getRejectionTracker().getNodeLimitsBreachedRejections()); + assertEquals(1, memoryManager.getTotalNodeLimitsBreachedRejections()); + } + + public void testDecreaseShardPrimaryAndCoordinatingLimitsToBaseLimit() { + ShardIndexingPressureTracker tracker1 = memoryManager.getShardIndexingPressureTracker(shardId1); + tracker1.compareAndSetPrimaryAndCoordinatingLimits(tracker1.getPrimaryAndCoordinatingLimits(), 1 * 1024); + tracker1.getCommonOperationTracker().incrementCurrentCombinedCoordinatingAndPrimaryBytes(0); + long limit1 = tracker1.getPrimaryAndCoordinatingLimits(); + memoryManager.decreaseShardPrimaryAndCoordinatingLimits(tracker1); + + assertTrue(tracker1.getPrimaryAndCoordinatingLimits() < limit1); + assertEquals(10, tracker1.getPrimaryAndCoordinatingLimits()); + } + + public void testDecreaseShardReplicaLimitsToBaseLimit() { + ShardIndexingPressureTracker tracker1 = memoryManager.getShardIndexingPressureTracker(shardId1); + + tracker1.compareAndSetReplicaLimits(tracker1.getReplicaLimits(), 1 * 1024); + tracker1.getReplicaOperationTracker().getStatsTracker().incrementCurrentBytes(0); + long limit1 = tracker1.getReplicaLimits(); + memoryManager.decreaseShardReplicaLimits(tracker1); + + assertTrue(tracker1.getReplicaLimits() < limit1); + assertEquals(15, tracker1.getReplicaLimits()); + } + + public void testDecreaseShardPrimaryAndCoordinatingLimits() { + ShardIndexingPressureTracker tracker1 = memoryManager.getShardIndexingPressureTracker(shardId1); + tracker1.compareAndSetPrimaryAndCoordinatingLimits(tracker1.getPrimaryAndCoordinatingLimits(), 1 * 1024); + tracker1.getCommonOperationTracker().incrementCurrentCombinedCoordinatingAndPrimaryBytes(512); + long limit1 = tracker1.getPrimaryAndCoordinatingLimits(); + 
memoryManager.decreaseShardPrimaryAndCoordinatingLimits(tracker1); + + assertTrue(tracker1.getPrimaryAndCoordinatingLimits() < limit1); + assertEquals((long)(512/0.85), tracker1.getPrimaryAndCoordinatingLimits()); + } + + public void testDecreaseShardReplicaLimits() { + ShardIndexingPressureTracker tracker1 = memoryManager.getShardIndexingPressureTracker(shardId1); + + tracker1.compareAndSetReplicaLimits(tracker1.getReplicaLimits(), 1 * 1024); + tracker1.getReplicaOperationTracker().getStatsTracker().incrementCurrentBytes(512); + long limit1 = tracker1.getReplicaLimits(); + memoryManager.decreaseShardReplicaLimits(tracker1); + + assertTrue(tracker1.getReplicaLimits() < limit1); + assertEquals((long)(512/0.85), tracker1.getReplicaLimits()); + } +}