From 71c40180720454c482a4e67916dfac2b37a8a1ac Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Dharmesh=20=F0=9F=92=A4?=
Date: Fri, 9 Apr 2021 17:25:16 +0530
Subject: [PATCH] Added ShardIndexingPressureMemoryManager Tests
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Signed-off-by: Dharmesh 💤
---
 .../ShardIndexingPressureMemoryManager.java   | 24 +-
 ...ardIndexingPressureMemoryManagerTests.java | 587 ++++++++++++++++++
 2 files changed, 599 insertions(+), 12 deletions(-)
 create mode 100644 server/src/test/java/org/opensearch/index/ShardIndexingPressureMemoryManagerTests.java

diff --git a/server/src/main/java/org/opensearch/index/ShardIndexingPressureMemoryManager.java b/server/src/main/java/org/opensearch/index/ShardIndexingPressureMemoryManager.java
index 46edc8d0074d0..eeaafe3e9e13d 100644
--- a/server/src/main/java/org/opensearch/index/ShardIndexingPressureMemoryManager.java
+++ b/server/src/main/java/org/opensearch/index/ShardIndexingPressureMemoryManager.java
@@ -21,11 +21,11 @@
  * and the values will be modified in certain scenarios.
  *
  * 1. If the limits assigned to the shard is breached(Primary Parameter) and the node level occupancy of all shards
- * is not greater than 70%(Primary Parameter), we will be increasing the shard limits without any further evaluation.
+ * is not greater than primary_parameter.node.soft_limit, we will be increasing the shard limits without any further evaluation.
  * 2. If the limits assigned to the shard is breached(Primary Parameter) and the node level occupancy of all the shards
- * is greater than 70%(Primary Parameter) is when we will evaluate certain parameters like throughput degradation(Secondary Parameter)
- * and last successful request elapsed timeout(Secondary Parameter) to evaluate if the limits for the shard needs to
- * be modified or not.
+ * is greater than primary_parameter.node.soft_limit, we evaluate secondary parameters such as
+ * throughput degradation(Secondary Parameter) and last successful request elapsed timeout(Secondary Parameter) to decide if the limits
+ * for the shard need to be modified or not.
  *
  * Secondary Parameters
  * 1. ThroughputDegradationLimitsBreached - When the moving window throughput average has increased by some factor than
@@ -35,8 +35,8 @@
  * current request timestamp is greater than the max timeout value and the number of outstanding requests is greater
  * than the max outstanding requests then this parameter is said to be breached.
  *
- * Note : Every time we try to increase of decrease the shard limits. In case the shard utilization goes below 75% or
- * goes above 95% of current shard limits then we try to set the new shard limit to be 85% of
+ * Note : Every time we try to increase or decrease the shard limits, if the shard utilization goes below operating_factor.lower or
+ * goes above operating_factor.upper of current shard limits then we try to set the new shard limit to be operating_factor.optimal of
  * current shard utilization.
  *
  */
@@ -150,7 +150,7 @@ boolean isPrimaryNodeLimitBreached(ShardIndexingPressureTracker tracker, long no
 
     boolean isPrimaryShardLimitBreached(ShardIndexingPressureTracker tracker, long requestStartTime,
                                         Map<Long, ShardIndexingPressureTracker> shardIndexingPressureStore, long nodeTotalBytes) {
 
-        //Memory limits is breached when the current utilization is greater than 95% of total shard limits.
+        /* Memory limits is breached when the current utilization is greater than operating_factor.upper of total shard limits.
*/ long shardCombinedBytes = tracker.currentCombinedCoordinatingAndPrimaryBytes.get(); long shardPrimaryAndCoordinatingLimits = tracker.primaryAndCoordinatingLimits.get(); boolean shardMemoryLimitsBreached = @@ -159,7 +159,7 @@ boolean isPrimaryShardLimitBreached(ShardIndexingPressureTracker tracker, long r if(shardMemoryLimitsBreached) { /* Secondary Parameters(i.e. LastSuccessfulRequestDuration and Throughput) is taken into consideration when - the current node utilization is greater than 70% of total node limits. + the current node utilization is greater than primary_parameter.node.soft_limit of total node limits. */ if(((double)nodeTotalBytes / this.shardIndexingPressureSettings.getNodePrimaryAndCoordinatingLimits()) < this.nodeSoftLimit) { boolean isShardLimitsIncreased = @@ -225,7 +225,7 @@ boolean isCoordinatingNodeLimitBreached(ShardIndexingPressureTracker tracker, lo boolean isCoordinatingShardLimitBreached(ShardIndexingPressureTracker tracker, long requestStartTime, Map shardIndexingPressureStore, long nodeTotalBytes) { - //Shard memory limit is breached when the current utilization is greater than 95% of total shard limits. + //Shard memory limit is breached when the current utilization is greater than operating_factor.upper of total shard limits. long shardCombinedBytes = tracker.currentCombinedCoordinatingAndPrimaryBytes.get(); long shardPrimaryAndCoordinatingLimits = tracker.primaryAndCoordinatingLimits.get(); boolean shardMemoryLimitsBreached = @@ -234,7 +234,7 @@ boolean isCoordinatingShardLimitBreached(ShardIndexingPressureTracker tracker, l if(shardMemoryLimitsBreached) { /* Secondary Parameters(i.e. LastSuccessfulRequestDuration and Throughput) is taken into consideration when - the current node utilization is greater than 70% of total node limits. + the current node utilization is greater than primary_parameter.node.soft_limit of total node limits. */ if(((double)nodeTotalBytes / this.shardIndexingPressureSettings.getNodePrimaryAndCoordinatingLimits()) < this.nodeSoftLimit) { boolean isShardLimitsIncreased = @@ -299,7 +299,7 @@ boolean isReplicaNodeLimitBreached(ShardIndexingPressureTracker tracker, long no boolean isReplicaShardLimitBreached(ShardIndexingPressureTracker tracker, long requestStartTime, Map shardIndexingPressureStore, long nodeReplicaBytes) { - //Memory limits is breached when the current utilization is greater than 95% of total shard limits. + //Memory limits is breached when the current utilization is greater than operating_factor.upper of total shard limits. long shardReplicaBytes = tracker.currentReplicaBytes.get(); long shardReplicaLimits = tracker.replicaLimits.get(); final boolean shardMemoryLimitsBreached = @@ -308,7 +308,7 @@ boolean isReplicaShardLimitBreached(ShardIndexingPressureTracker tracker, long r if(shardMemoryLimitsBreached) { /* Secondary Parameters(i.e. LastSuccessfulRequestDuration and Throughput) is taken into consideration when - the current node utilization is greater than 70% of total node limits. + the current node utilization is greater than primary_parameter.node.soft_limit of total node limits. 
*/ if(((double)nodeReplicaBytes / this.shardIndexingPressureSettings.getNodeReplicaLimits()) < this.nodeSoftLimit) { boolean isShardLimitsIncreased = diff --git a/server/src/test/java/org/opensearch/index/ShardIndexingPressureMemoryManagerTests.java b/server/src/test/java/org/opensearch/index/ShardIndexingPressureMemoryManagerTests.java new file mode 100644 index 0000000000000..cd9cfe0326f3a --- /dev/null +++ b/server/src/test/java/org/opensearch/index/ShardIndexingPressureMemoryManagerTests.java @@ -0,0 +1,587 @@ +/* + * Copyright OpenSearch Contributors. + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.index; + +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; +import org.opensearch.index.shard.ShardId; +import org.opensearch.test.OpenSearchTestCase; + +import java.util.Collections; +import java.util.Map; + +public class ShardIndexingPressureMemoryManagerTests extends OpenSearchTestCase { + + private final Settings settings = Settings.builder().put(IndexingPressure.MAX_INDEXING_BYTES.getKey(), "10KB") + .put(ShardIndexingPressureMemoryManager.MAX_OUTSTANDING_REQUESTS.getKey(), 1) + .put(ShardIndexingPressureMemoryManager.SUCCESSFUL_REQUEST_ELAPSED_TIMEOUT.getKey(), 20) + .put(ShardIndexingPressureSettings.REQUEST_SIZE_WINDOW.getKey(), 2) + .build(); + private final ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + private final ShardIndexingPressureSettings shardIndexingPressureSettings = + new ShardIndexingPressureSettings(clusterSettings, settings, IndexingPressure.MAX_INDEXING_BYTES.get(settings).getBytes()); + + private final Index index = new Index("IndexName", "UUID"); + private final ShardId shardId1 = new ShardId(index, 0); + private final ShardId shardId2 = new ShardId(index, 1); + + public void testCoordinatingPrimaryShardLimitsNotBreached() { + ShardIndexingPressureMemoryManager memoryManager = new ShardIndexingPressureMemoryManager(shardIndexingPressureSettings, + clusterSettings, settings); + ShardIndexingPressureStore store = new ShardIndexingPressureStore(shardIndexingPressureSettings, clusterSettings, settings); + ShardIndexingPressureTracker tracker = store.getShardIndexingPressureTracker(shardId1); + tracker.currentCombinedCoordinatingAndPrimaryBytes.addAndGet(1); + long requestStartTime = System.currentTimeMillis(); + boolean randomBoolean = randomBoolean(); + Map hotStore = store.getShardIndexingPressureHotStore(); + + if(randomBoolean) { + assertFalse(memoryManager.isCoordinatingShardLimitBreached(tracker, requestStartTime, hotStore, 1 * 1024)); + } else { + assertFalse(memoryManager.isPrimaryShardLimitBreached(tracker, requestStartTime, hotStore, 1 * 1024)); + } + } + + public void testReplicaShardLimitsNotBreached() { + ShardIndexingPressureMemoryManager memoryManager = new ShardIndexingPressureMemoryManager(shardIndexingPressureSettings, + clusterSettings, settings); + ShardIndexingPressureStore store = new ShardIndexingPressureStore(shardIndexingPressureSettings, clusterSettings, settings); + ShardIndexingPressureTracker tracker = store.getShardIndexingPressureTracker(shardId1); + tracker.currentReplicaBytes.addAndGet(1); + long requestStartTime = System.currentTimeMillis(); + Map hotStore = Collections.singletonMap((long) shardId1.hashCode(), tracker); + + assertFalse(memoryManager.isReplicaShardLimitBreached(tracker, requestStartTime, hotStore, 1 * 1024)); + } + + public void 
testCoordinatingPrimaryShardLimitsIncreasedAndSoftLimitNotBreached() { + ShardIndexingPressureMemoryManager memoryManager = new ShardIndexingPressureMemoryManager(shardIndexingPressureSettings, + clusterSettings, settings); + ShardIndexingPressureStore store = new ShardIndexingPressureStore(shardIndexingPressureSettings, clusterSettings, settings); + ShardIndexingPressureTracker tracker = store.getShardIndexingPressureTracker(shardId1); + tracker.currentCombinedCoordinatingAndPrimaryBytes.addAndGet(10); + long baseLimit = tracker.primaryAndCoordinatingLimits.get(); + long requestStartTime = System.currentTimeMillis(); + boolean randomBoolean = randomBoolean(); + Map hotStore = store.getShardIndexingPressureHotStore(); + + if(randomBoolean) { + assertFalse(memoryManager.isCoordinatingShardLimitBreached(tracker, requestStartTime, hotStore, 1 * 1024)); + } else { + assertFalse(memoryManager.isPrimaryShardLimitBreached(tracker, requestStartTime, hotStore, 1 * 1024)); + } + + assertTrue(tracker.primaryAndCoordinatingLimits.get() > baseLimit); + assertEquals(tracker.primaryAndCoordinatingLimits.get(), (long)(baseLimit/0.85)); + } + + public void testReplicaShardLimitsIncreasedAndSoftLimitNotBreached() { + ShardIndexingPressureMemoryManager memoryManager = new ShardIndexingPressureMemoryManager(shardIndexingPressureSettings, + clusterSettings, settings); + ShardIndexingPressureStore store = new ShardIndexingPressureStore(shardIndexingPressureSettings, clusterSettings, settings); + ShardIndexingPressureTracker tracker = store.getShardIndexingPressureTracker(shardId1); + tracker.currentReplicaBytes.addAndGet(15); + long baseLimit = tracker.replicaLimits.get(); + long requestStartTime = System.currentTimeMillis(); + Map hotStore = store.getShardIndexingPressureHotStore(); + + assertFalse(memoryManager.isReplicaShardLimitBreached(tracker, requestStartTime, hotStore, 1 * 1024)); + assertTrue(tracker.replicaLimits.get() > baseLimit); + assertEquals(tracker.replicaLimits.get(), (long)(baseLimit/0.85)); + } + + public void testCoordinatingPrimarySoftLimitNotBreachedAndNodeLevelRejections() { + ShardIndexingPressureMemoryManager memoryManager = new ShardIndexingPressureMemoryManager(shardIndexingPressureSettings, + clusterSettings, settings); + ShardIndexingPressureStore store = new ShardIndexingPressureStore(shardIndexingPressureSettings, clusterSettings, settings); + ShardIndexingPressureTracker tracker1 = store.getShardIndexingPressureTracker(shardId1); + ShardIndexingPressureTracker tracker2 = store.getShardIndexingPressureTracker(shardId2); + tracker1.currentCombinedCoordinatingAndPrimaryBytes.addAndGet(4 * 1024); + tracker2.primaryAndCoordinatingLimits.addAndGet(6 * 1024); + long limit1 = tracker1.primaryAndCoordinatingLimits.get(); + long limit2 = tracker2.primaryAndCoordinatingLimits.get(); + long requestStartTime = System.currentTimeMillis(); + boolean randomBoolean = randomBoolean(); + Map hotStore = store.getShardIndexingPressureHotStore(); + + if(randomBoolean) { + assertTrue(memoryManager.isCoordinatingShardLimitBreached(tracker1, requestStartTime, hotStore, 6 * 1024)); + assertEquals(1, tracker1.coordinatingNodeLimitsBreachedRejections.get()); + assertEquals(0, tracker2.coordinatingNodeLimitsBreachedRejections.get()); + } else { + assertTrue(memoryManager.isPrimaryShardLimitBreached(tracker1, requestStartTime, hotStore, 6 * 1024)); + assertEquals(1, tracker1.primaryNodeLimitsBreachedRejections.get()); + assertEquals(0, tracker2.primaryNodeLimitsBreachedRejections.get()); + } + + 
assertEquals(limit1, tracker1.primaryAndCoordinatingLimits.get()); + assertEquals(limit2, tracker2.primaryAndCoordinatingLimits.get()); + assertEquals(1, memoryManager.totalNodeLimitsBreachedRejections.get()); + } + + public void testReplicaShardLimitsSoftLimitNotBreachedAndNodeLevelRejections() { + ShardIndexingPressureMemoryManager memoryManager = new ShardIndexingPressureMemoryManager(shardIndexingPressureSettings, + clusterSettings, settings); + ShardIndexingPressureStore store = new ShardIndexingPressureStore(shardIndexingPressureSettings, clusterSettings, settings); + ShardIndexingPressureTracker tracker1 = store.getShardIndexingPressureTracker(shardId1); + ShardIndexingPressureTracker tracker2 = store.getShardIndexingPressureTracker(shardId2); + tracker1.currentReplicaBytes.addAndGet(5 * 1024); + tracker2.replicaLimits.addAndGet(10 * 1024); + long limit1 = tracker1.replicaLimits.get(); + long limit2 = tracker2.replicaLimits.get(); + long requestStartTime = System.currentTimeMillis(); + Map hotStore = store.getShardIndexingPressureHotStore(); + + assertTrue(memoryManager.isReplicaShardLimitBreached(tracker1, requestStartTime, hotStore, 10 * 1024)); + assertEquals(limit1, tracker1.replicaLimits.get()); + assertEquals(limit2, tracker2.replicaLimits.get()); + assertEquals(1, tracker1.replicaNodeLimitsBreachedRejections.get()); + assertEquals(0, tracker2.replicaNodeLimitsBreachedRejections.get()); + assertEquals(1, memoryManager.totalNodeLimitsBreachedRejections.get()); + } + + public void testCoordinatingPrimarySoftLimitBreachedAndNodeLevelRejections() { + ShardIndexingPressureMemoryManager memoryManager = new ShardIndexingPressureMemoryManager(shardIndexingPressureSettings, + clusterSettings, settings); + ShardIndexingPressureStore store = new ShardIndexingPressureStore(shardIndexingPressureSettings, clusterSettings, settings); + ShardIndexingPressureTracker tracker1 = store.getShardIndexingPressureTracker(shardId1); + ShardIndexingPressureTracker tracker2 = store.getShardIndexingPressureTracker(shardId2); + tracker1.currentCombinedCoordinatingAndPrimaryBytes.addAndGet(4 * 1024); + tracker2.primaryAndCoordinatingLimits.addAndGet(6 * 1024); + long limit1 = tracker1.primaryAndCoordinatingLimits.get(); + long limit2 = tracker2.primaryAndCoordinatingLimits.get(); + long requestStartTime = System.currentTimeMillis(); + boolean randomBoolean = randomBoolean(); + Map hotStore = store.getShardIndexingPressureHotStore(); + + if(randomBoolean) { + assertTrue(memoryManager.isCoordinatingShardLimitBreached(tracker1, requestStartTime, hotStore, 8 * 1024)); + assertEquals(1, tracker1.coordinatingNodeLimitsBreachedRejections.get()); + assertEquals(0, tracker2.coordinatingNodeLimitsBreachedRejections.get()); + } else { + assertTrue(memoryManager.isPrimaryShardLimitBreached(tracker1, requestStartTime, hotStore, 8 * 1024)); + assertEquals(1, tracker1.primaryNodeLimitsBreachedRejections.get()); + assertEquals(0, tracker2.primaryNodeLimitsBreachedRejections.get()); + } + + assertEquals(limit1, tracker1.primaryAndCoordinatingLimits.get()); + assertEquals(limit2, tracker2.primaryAndCoordinatingLimits.get()); + assertEquals(1, memoryManager.totalNodeLimitsBreachedRejections.get()); + } + + public void testReplicaShardLimitsSoftLimitBreachedAndNodeLevelRejections() { + ShardIndexingPressureMemoryManager memoryManager = new ShardIndexingPressureMemoryManager(shardIndexingPressureSettings, + clusterSettings, settings); + ShardIndexingPressureStore store = new 
ShardIndexingPressureStore(shardIndexingPressureSettings, clusterSettings, settings); + ShardIndexingPressureTracker tracker1 = store.getShardIndexingPressureTracker(shardId1); + ShardIndexingPressureTracker tracker2 = store.getShardIndexingPressureTracker(shardId2); + tracker1.currentReplicaBytes.addAndGet(5 * 1024); + tracker2.replicaLimits.addAndGet(12 * 1024); + long limit1 = tracker1.replicaLimits.get(); + long limit2 = tracker2.replicaLimits.get(); + long requestStartTime = System.currentTimeMillis(); + Map hotStore = store.getShardIndexingPressureHotStore(); + + assertTrue(memoryManager.isReplicaShardLimitBreached(tracker1, requestStartTime, hotStore, 12 * 1024)); + assertEquals(limit1, tracker1.replicaLimits.get()); + assertEquals(limit2, tracker2.replicaLimits.get()); + assertEquals(1, tracker1.replicaNodeLimitsBreachedRejections.get()); + assertEquals(0, tracker2.replicaNodeLimitsBreachedRejections.get()); + assertEquals(1, memoryManager.totalNodeLimitsBreachedRejections.get()); + } + + public void testCoordinatingPrimarySoftLimitBreachedAndLastSuccessfulRequestLimitRejections() { + ShardIndexingPressureMemoryManager memoryManager = new ShardIndexingPressureMemoryManager(shardIndexingPressureSettings, + clusterSettings, settings); + ShardIndexingPressureStore store = new ShardIndexingPressureStore(shardIndexingPressureSettings, clusterSettings, settings); + ShardIndexingPressureTracker tracker1 = store.getShardIndexingPressureTracker(shardId1); + ShardIndexingPressureTracker tracker2 = store.getShardIndexingPressureTracker(shardId2); + tracker1.currentCombinedCoordinatingAndPrimaryBytes.addAndGet(4 * 1024); + tracker2.primaryAndCoordinatingLimits.addAndGet(6 * 1024); + long limit1 = tracker1.primaryAndCoordinatingLimits.get(); + long limit2 = tracker2.primaryAndCoordinatingLimits.get(); + long requestStartTime = System.currentTimeMillis(); + boolean randomBoolean = randomBoolean(); + Map hotStore = store.getShardIndexingPressureHotStore(); + + if(randomBoolean) { + tracker1.lastSuccessfulCoordinatingRequestTimestamp.addAndGet(requestStartTime - 100); + tracker1.totalOutstandingCoordinatingRequests.addAndGet(2); + + assertTrue(memoryManager.isCoordinatingShardLimitBreached(tracker1, requestStartTime, hotStore, 8 * 1024)); + assertEquals(1, tracker1.coordinatingLastSuccessfulRequestLimitsBreachedRejections.get()); + assertEquals(0, tracker2.coordinatingLastSuccessfulRequestLimitsBreachedRejections.get()); + } else { + tracker1.lastSuccessfulPrimaryRequestTimestamp.addAndGet(requestStartTime - 100); + tracker1.totalOutstandingPrimaryRequests.addAndGet(2); + + assertTrue(memoryManager.isPrimaryShardLimitBreached(tracker1, requestStartTime, hotStore, 8 * 1024)); + assertEquals(1, tracker1.primaryLastSuccessfulRequestLimitsBreachedRejections.get()); + assertEquals(0, tracker2.primaryLastSuccessfulRequestLimitsBreachedRejections.get()); + } + + assertEquals(limit1, tracker1.primaryAndCoordinatingLimits.get()); + assertEquals(limit2, tracker2.primaryAndCoordinatingLimits.get()); + assertEquals(1, memoryManager.totalLastSuccessfulRequestLimitsBreachedRejections.get()); + } + + public void testReplicaShardLimitsSoftLimitBreachedAndLastSuccessfulRequestLimitRejections() { + ShardIndexingPressureMemoryManager memoryManager = new ShardIndexingPressureMemoryManager(shardIndexingPressureSettings, + clusterSettings, settings); + ShardIndexingPressureStore store = new ShardIndexingPressureStore(shardIndexingPressureSettings, clusterSettings, settings); + ShardIndexingPressureTracker tracker1 = 
store.getShardIndexingPressureTracker(shardId1); + ShardIndexingPressureTracker tracker2 = store.getShardIndexingPressureTracker(shardId2); + tracker1.currentReplicaBytes.addAndGet(5 * 1024); + tracker2.replicaLimits.addAndGet(12 * 1024); + long limit1 = tracker1.replicaLimits.get(); + long limit2 = tracker2.replicaLimits.get(); + long requestStartTime = System.currentTimeMillis(); + tracker1.lastSuccessfulReplicaRequestTimestamp.addAndGet(requestStartTime - 100); + tracker1.totalOutstandingReplicaRequests.addAndGet(2); + + Map hotStore = store.getShardIndexingPressureHotStore(); + + assertTrue(memoryManager.isReplicaShardLimitBreached(tracker1, requestStartTime, hotStore, 12 * 1024)); + assertEquals(limit1, tracker1.replicaLimits.get()); + assertEquals(limit2, tracker2.replicaLimits.get()); + assertEquals(1, tracker1.replicaLastSuccessfulRequestLimitsBreachedRejections.get()); + assertEquals(0, tracker2.replicaLastSuccessfulRequestLimitsBreachedRejections.get()); + assertEquals(1, memoryManager.totalLastSuccessfulRequestLimitsBreachedRejections.get()); + } + + public void testCoordinatingPrimarySoftLimitBreachedAndLessOutstandingRequestsAndNoLastSuccessfulRequestLimitRejections() { + ShardIndexingPressureMemoryManager memoryManager = new ShardIndexingPressureMemoryManager(shardIndexingPressureSettings, + clusterSettings, settings); + ShardIndexingPressureStore store = new ShardIndexingPressureStore(shardIndexingPressureSettings, clusterSettings, settings); + ShardIndexingPressureTracker tracker1 = store.getShardIndexingPressureTracker(shardId1); + ShardIndexingPressureTracker tracker2 = store.getShardIndexingPressureTracker(shardId2); + tracker1.currentCombinedCoordinatingAndPrimaryBytes.addAndGet(1 * 1024); + tracker2.primaryAndCoordinatingLimits.addAndGet(6 * 1024); + long limit1 = tracker1.primaryAndCoordinatingLimits.get(); + long limit2 = tracker2.primaryAndCoordinatingLimits.get(); + long requestStartTime = System.currentTimeMillis(); + boolean randomBoolean = randomBoolean(); + Map hotStore = store.getShardIndexingPressureHotStore(); + + if(randomBoolean) { + tracker1.lastSuccessfulCoordinatingRequestTimestamp.addAndGet(requestStartTime - 100); + tracker1.totalOutstandingCoordinatingRequests.addAndGet(1); + + assertFalse(memoryManager.isCoordinatingShardLimitBreached(tracker1, requestStartTime, hotStore, 8 * 1024)); + assertEquals(0, tracker1.coordinatingLastSuccessfulRequestLimitsBreachedRejections.get()); + assertEquals(0, tracker2.coordinatingLastSuccessfulRequestLimitsBreachedRejections.get()); + } else { + tracker1.lastSuccessfulPrimaryRequestTimestamp.addAndGet(requestStartTime - 100); + tracker1.totalOutstandingPrimaryRequests.addAndGet(1); + + assertFalse(memoryManager.isPrimaryShardLimitBreached(tracker1, requestStartTime, hotStore, 8 * 1024)); + assertEquals(0, tracker1.primaryLastSuccessfulRequestLimitsBreachedRejections.get()); + assertEquals(0, tracker2.primaryLastSuccessfulRequestLimitsBreachedRejections.get()); + } + + assertTrue(tracker1.primaryAndCoordinatingLimits.get() > limit1); + assertEquals((long)(1 * 1024/0.85), tracker1.primaryAndCoordinatingLimits.get()); + assertEquals(limit2, tracker2.primaryAndCoordinatingLimits.get()); + assertEquals(0, memoryManager.totalLastSuccessfulRequestLimitsBreachedRejections.get()); + } + + public void testReplicaShardLimitsSoftLimitBreachedAndLessOutstandingRequestsAndNoLastSuccessfulRequestLimitRejections() { + ShardIndexingPressureMemoryManager memoryManager = new 
ShardIndexingPressureMemoryManager(shardIndexingPressureSettings, + clusterSettings, settings); + ShardIndexingPressureStore store = new ShardIndexingPressureStore(shardIndexingPressureSettings, clusterSettings, settings); + ShardIndexingPressureTracker tracker1 = store.getShardIndexingPressureTracker(shardId1); + ShardIndexingPressureTracker tracker2 = store.getShardIndexingPressureTracker(shardId2); + tracker1.currentReplicaBytes.addAndGet(2 * 1024); + tracker2.replicaLimits.addAndGet(12 * 1024); + long limit1 = tracker1.replicaLimits.get(); + long limit2 = tracker2.replicaLimits.get(); + long requestStartTime = System.currentTimeMillis(); + tracker1.lastSuccessfulReplicaRequestTimestamp.addAndGet(requestStartTime - 100); + tracker1.totalOutstandingReplicaRequests.addAndGet(1); + + Map hotStore = store.getShardIndexingPressureHotStore(); + + assertFalse(memoryManager.isReplicaShardLimitBreached(tracker1, requestStartTime, hotStore, 12 * 1024)); + assertTrue(tracker1.replicaLimits.get() > limit1); + assertEquals((long)(2 * 1024/0.85), tracker1.replicaLimits.get()); + assertEquals(limit2, tracker2.replicaLimits.get()); + assertEquals(0, tracker1.replicaLastSuccessfulRequestLimitsBreachedRejections.get()); + assertEquals(0, tracker2.replicaLastSuccessfulRequestLimitsBreachedRejections.get()); + assertEquals(0, memoryManager.totalLastSuccessfulRequestLimitsBreachedRejections.get()); + } + + public void testCoordinatingPrimarySoftLimitBreachedAndThroughputDegradationLimitRejections() { + ShardIndexingPressureMemoryManager memoryManager = new ShardIndexingPressureMemoryManager(shardIndexingPressureSettings, + clusterSettings, settings); + ShardIndexingPressureStore store = new ShardIndexingPressureStore(shardIndexingPressureSettings, clusterSettings, settings); + ShardIndexingPressureTracker tracker1 = store.getShardIndexingPressureTracker(shardId1); + ShardIndexingPressureTracker tracker2 = store.getShardIndexingPressureTracker(shardId2); + tracker1.currentCombinedCoordinatingAndPrimaryBytes.addAndGet(4 * 1024); + tracker2.primaryAndCoordinatingLimits.addAndGet(6 * 1024); + long limit1 = tracker1.primaryAndCoordinatingLimits.get(); + long limit2 = tracker2.primaryAndCoordinatingLimits.get(); + long requestStartTime = System.currentTimeMillis(); + boolean randomBoolean = randomBoolean(); + Map hotStore = store.getShardIndexingPressureHotStore(); + + if(randomBoolean) { + tracker1.coordinatingThroughputMovingAverage.addAndGet(Double.doubleToLongBits(1d)); + tracker1.totalOutstandingCoordinatingRequests.addAndGet(2); + tracker1.totalCoordinatingBytes.addAndGet(60); + tracker1.coordinatingTimeInMillis.addAndGet(10); + tracker1.coordinatingThroughputMovingQueue.offer(1d); + tracker1.coordinatingThroughputMovingQueue.offer(2d); + + assertTrue(memoryManager.isCoordinatingShardLimitBreached(tracker1, requestStartTime, hotStore, 8 * 1024)); + assertEquals(1, tracker1.coordinatingThroughputDegradationLimitsBreachedRejections.get()); + assertEquals(0, tracker2.coordinatingThroughputDegradationLimitsBreachedRejections.get()); + } else { + tracker1.primaryThroughputMovingAverage.addAndGet(Double.doubleToLongBits(1d)); + tracker1.totalOutstandingPrimaryRequests.addAndGet(2); + tracker1.totalPrimaryBytes.addAndGet(60); + tracker1.primaryTimeInMillis.addAndGet(10); + tracker1.primaryThroughputMovingQueue.offer(1d); + tracker1.primaryThroughputMovingQueue.offer(2d); + + assertTrue(memoryManager.isPrimaryShardLimitBreached(tracker1, requestStartTime, hotStore, 8 * 1024)); + assertEquals(1, 
tracker1.primaryThroughputDegradationLimitsBreachedRejections.get()); + assertEquals(0, tracker2.primaryThroughputDegradationLimitsBreachedRejections.get()); + } + + assertEquals(limit1, tracker1.primaryAndCoordinatingLimits.get()); + assertEquals(limit2, tracker2.primaryAndCoordinatingLimits.get()); + assertEquals(1, memoryManager.totalThroughputDegradationLimitsBreachedRejections.get()); + } + + public void testReplicaShardLimitsSoftLimitBreachedAndThroughputDegradationLimitRejections() { + ShardIndexingPressureMemoryManager memoryManager = new ShardIndexingPressureMemoryManager(shardIndexingPressureSettings, + clusterSettings, settings); + ShardIndexingPressureStore store = new ShardIndexingPressureStore(shardIndexingPressureSettings, clusterSettings, settings); + ShardIndexingPressureTracker tracker1 = store.getShardIndexingPressureTracker(shardId1); + ShardIndexingPressureTracker tracker2 = store.getShardIndexingPressureTracker(shardId2); + tracker1.currentReplicaBytes.addAndGet(5 * 1024); + tracker2.replicaLimits.addAndGet(12 * 1024); + long limit1 = tracker1.replicaLimits.get(); + long limit2 = tracker2.replicaLimits.get(); + long requestStartTime = System.currentTimeMillis(); + tracker1.replicaThroughputMovingAverage.addAndGet(Double.doubleToLongBits(1d)); + tracker1.totalOutstandingReplicaRequests.addAndGet(2); + tracker1.totalReplicaBytes.addAndGet(80); + tracker1.replicaTimeInMillis.addAndGet(10); + tracker1.replicaThroughputMovingQueue.offer(1d); + tracker1.replicaThroughputMovingQueue.offer(2d); + + Map hotStore = store.getShardIndexingPressureHotStore(); + + assertTrue(memoryManager.isReplicaShardLimitBreached(tracker1, requestStartTime, hotStore, 12 * 1024)); + assertEquals(limit1, tracker1.replicaLimits.get()); + assertEquals(limit2, tracker2.replicaLimits.get()); + assertEquals(1, tracker1.replicaThroughputDegradationLimitsBreachedRejections.get()); + assertEquals(0, tracker2.replicaThroughputDegradationLimitsBreachedRejections.get()); + assertEquals(1, memoryManager.totalThroughputDegradationLimitsBreachedRejections.get()); + } + + public void testCoordinatingPrimarySoftLimitBreachedAndMovingAverageQueueNotBuildUpAndNoThroughputDegradationLimitRejections() { + ShardIndexingPressureMemoryManager memoryManager = new ShardIndexingPressureMemoryManager(shardIndexingPressureSettings, + clusterSettings, settings); + ShardIndexingPressureStore store = new ShardIndexingPressureStore(shardIndexingPressureSettings, clusterSettings, settings); + ShardIndexingPressureTracker tracker1 = store.getShardIndexingPressureTracker(shardId1); + ShardIndexingPressureTracker tracker2 = store.getShardIndexingPressureTracker(shardId2); + tracker1.currentCombinedCoordinatingAndPrimaryBytes.addAndGet(1 * 1024); + tracker2.primaryAndCoordinatingLimits.addAndGet(6 * 1024); + long limit1 = tracker1.primaryAndCoordinatingLimits.get(); + long limit2 = tracker2.primaryAndCoordinatingLimits.get(); + long requestStartTime = System.currentTimeMillis(); + boolean randomBoolean = randomBoolean(); + Map hotStore = store.getShardIndexingPressureHotStore(); + + if(randomBoolean) { + tracker1.coordinatingThroughputMovingAverage.addAndGet(Double.doubleToLongBits(1d)); + tracker1.totalOutstandingCoordinatingRequests.addAndGet(1); + tracker1.totalCoordinatingBytes.addAndGet(60); + tracker1.coordinatingTimeInMillis.addAndGet(10); + tracker1.coordinatingThroughputMovingQueue.offer(1d); + + assertFalse(memoryManager.isCoordinatingShardLimitBreached(tracker1, requestStartTime, hotStore, 8 * 1024)); + assertEquals(0, 
tracker1.coordinatingThroughputDegradationLimitsBreachedRejections.get()); + assertEquals(0, tracker2.coordinatingThroughputDegradationLimitsBreachedRejections.get()); + assertEquals(0, tracker1.coordinatingNodeLimitsBreachedRejections.get()); + assertEquals(0, tracker2.coordinatingNodeLimitsBreachedRejections.get()); + } else { + tracker1.primaryThroughputMovingAverage.addAndGet(Double.doubleToLongBits(1d)); + tracker1.totalOutstandingPrimaryRequests.addAndGet(1); + tracker1.totalPrimaryBytes.addAndGet(60); + tracker1.primaryTimeInMillis.addAndGet(10); + tracker1.primaryThroughputMovingQueue.offer(1d); + + assertFalse(memoryManager.isPrimaryShardLimitBreached(tracker1, requestStartTime, hotStore, 8 * 1024)); + assertEquals(0, tracker1.primaryThroughputDegradationLimitsBreachedRejections.get()); + assertEquals(0, tracker2.primaryThroughputDegradationLimitsBreachedRejections.get()); + assertEquals(0, tracker1.primaryNodeLimitsBreachedRejections.get()); + assertEquals(0, tracker2.primaryNodeLimitsBreachedRejections.get()); + } + + assertTrue(tracker1.primaryAndCoordinatingLimits.get() > limit1); + assertEquals((long)(1 * 1024/0.85), tracker1.primaryAndCoordinatingLimits.get()); + assertEquals(limit2, tracker2.primaryAndCoordinatingLimits.get()); + assertEquals(0, memoryManager.totalThroughputDegradationLimitsBreachedRejections.get()); + } + + public void testReplicaShardLimitsSoftLimitBreachedAndMovingAverageQueueNotBuildUpAndNThroughputDegradationLimitRejections() { + ShardIndexingPressureMemoryManager memoryManager = new ShardIndexingPressureMemoryManager(shardIndexingPressureSettings, + clusterSettings, settings); + ShardIndexingPressureStore store = new ShardIndexingPressureStore(shardIndexingPressureSettings, clusterSettings, settings); + ShardIndexingPressureTracker tracker1 = store.getShardIndexingPressureTracker(shardId1); + ShardIndexingPressureTracker tracker2 = store.getShardIndexingPressureTracker(shardId2); + tracker1.currentReplicaBytes.addAndGet(2 * 1024); + tracker2.replicaLimits.addAndGet(12 * 1024); + long limit1 = tracker1.replicaLimits.get(); + long limit2 = tracker2.replicaLimits.get(); + long requestStartTime = System.currentTimeMillis(); + tracker1.replicaThroughputMovingAverage.addAndGet(Double.doubleToLongBits(1d)); + tracker1.totalOutstandingReplicaRequests.addAndGet(1); + tracker1.totalReplicaBytes.addAndGet(80); + tracker1.replicaTimeInMillis.addAndGet(10); + tracker1.replicaThroughputMovingQueue.offer(1d); + + Map hotStore = store.getShardIndexingPressureHotStore(); + + assertFalse(memoryManager.isReplicaShardLimitBreached(tracker1, requestStartTime, hotStore, 12 * 1024)); + assertTrue(tracker1.replicaLimits.get() > limit1); + assertEquals((long)(2 * 1024/0.85), tracker1.replicaLimits.get()); + assertEquals(limit2, tracker2.replicaLimits.get()); + assertEquals(0, tracker1.replicaThroughputDegradationLimitsBreachedRejections.get()); + assertEquals(0, tracker2.replicaThroughputDegradationLimitsBreachedRejections.get()); + assertEquals(0, tracker1.replicaNodeLimitsBreachedRejections.get()); + assertEquals(0, memoryManager.totalThroughputDegradationLimitsBreachedRejections.get()); + } + + public void testCoordinatingPrimarySoftLimitBreachedAndNoSecondaryParameterBreachedAndNodeLevelRejections() { + ShardIndexingPressureMemoryManager memoryManager = new ShardIndexingPressureMemoryManager(shardIndexingPressureSettings, + clusterSettings, settings); + ShardIndexingPressureStore store = new ShardIndexingPressureStore(shardIndexingPressureSettings, clusterSettings, 
settings); + ShardIndexingPressureTracker tracker1 = store.getShardIndexingPressureTracker(shardId1); + ShardIndexingPressureTracker tracker2 = store.getShardIndexingPressureTracker(shardId2); + tracker1.currentCombinedCoordinatingAndPrimaryBytes.addAndGet(4 * 1024); + tracker2.primaryAndCoordinatingLimits.addAndGet(6 * 1024); + long limit1 = tracker1.primaryAndCoordinatingLimits.get(); + long limit2 = tracker2.primaryAndCoordinatingLimits.get(); + long requestStartTime = System.currentTimeMillis(); + boolean randomBoolean = randomBoolean(); + Map hotStore = store.getShardIndexingPressureHotStore(); + + if(randomBoolean) { + tracker1.coordinatingThroughputMovingAverage.addAndGet(Double.doubleToLongBits(1d)); + tracker1.totalOutstandingCoordinatingRequests.addAndGet(1); + tracker1.totalCoordinatingBytes.addAndGet(60); + tracker1.coordinatingTimeInMillis.addAndGet(10); + tracker1.coordinatingThroughputMovingQueue.offer(1d); + + assertTrue(memoryManager.isCoordinatingShardLimitBreached(tracker1, requestStartTime, hotStore, 8 * 1024)); + assertEquals(1, tracker1.coordinatingNodeLimitsBreachedRejections.get()); + assertEquals(0, tracker2.coordinatingNodeLimitsBreachedRejections.get()); + } else { + tracker1.primaryThroughputMovingAverage.addAndGet(Double.doubleToLongBits(1d)); + tracker1.totalOutstandingPrimaryRequests.addAndGet(1); + tracker1.totalPrimaryBytes.addAndGet(60); + tracker1.primaryTimeInMillis.addAndGet(10); + tracker1.primaryThroughputMovingQueue.offer(1d); + + assertTrue(memoryManager.isPrimaryShardLimitBreached(tracker1, requestStartTime, hotStore, 8 * 1024)); + assertEquals(1, tracker1.primaryNodeLimitsBreachedRejections.get()); + assertEquals(0, tracker2.primaryNodeLimitsBreachedRejections.get()); + } + + assertEquals(limit1, tracker1.primaryAndCoordinatingLimits.get()); + assertEquals(limit2, tracker2.primaryAndCoordinatingLimits.get()); + assertEquals(1, memoryManager.totalNodeLimitsBreachedRejections.get()); + } + + public void testReplicaShardLimitsSoftLimitBreachedAndNoSecondaryParameterBreachedAndNodeLevelRejections() { + ShardIndexingPressureMemoryManager memoryManager = new ShardIndexingPressureMemoryManager(shardIndexingPressureSettings, + clusterSettings, settings); + ShardIndexingPressureStore store = new ShardIndexingPressureStore(shardIndexingPressureSettings, clusterSettings, settings); + ShardIndexingPressureTracker tracker1 = store.getShardIndexingPressureTracker(shardId1); + ShardIndexingPressureTracker tracker2 = store.getShardIndexingPressureTracker(shardId2); + tracker1.currentReplicaBytes.addAndGet(5 * 1024); + tracker2.replicaLimits.addAndGet(12 * 1024); + long limit1 = tracker1.replicaLimits.get(); + long limit2 = tracker2.replicaLimits.get(); + long requestStartTime = System.currentTimeMillis(); + tracker1.replicaThroughputMovingAverage.addAndGet(Double.doubleToLongBits(1d)); + tracker1.totalOutstandingReplicaRequests.addAndGet(1); + tracker1.totalReplicaBytes.addAndGet(80); + tracker1.replicaTimeInMillis.addAndGet(10); + tracker1.replicaThroughputMovingQueue.offer(1d); + + Map hotStore = store.getShardIndexingPressureHotStore(); + + assertTrue(memoryManager.isReplicaShardLimitBreached(tracker1, requestStartTime, hotStore, 12 * 1024)); + assertEquals(limit1, tracker1.replicaLimits.get()); + assertEquals(limit2, tracker2.replicaLimits.get()); + assertEquals(1, tracker1.replicaNodeLimitsBreachedRejections.get()); + assertEquals(0, tracker2.replicaNodeLimitsBreachedRejections.get()); + assertEquals(1, memoryManager.totalNodeLimitsBreachedRejections.get()); 
+ } + + public void testDecreaseShardPrimaryAndCoordinatingLimitsToBaseLimit() { + ShardIndexingPressureMemoryManager memoryManager = new ShardIndexingPressureMemoryManager(shardIndexingPressureSettings, + clusterSettings, settings); + ShardIndexingPressureStore store = new ShardIndexingPressureStore(shardIndexingPressureSettings, clusterSettings, settings); + ShardIndexingPressureTracker tracker1 = store.getShardIndexingPressureTracker(shardId1); + tracker1.primaryAndCoordinatingLimits.addAndGet(1 * 1024); + tracker1.currentCombinedCoordinatingAndPrimaryBytes.addAndGet(0); + long limit1 = tracker1.primaryAndCoordinatingLimits.get(); + memoryManager.decreaseShardPrimaryAndCoordinatingLimits(tracker1); + + assertTrue(tracker1.primaryAndCoordinatingLimits.get() < limit1); + assertEquals(10, tracker1.primaryAndCoordinatingLimits.get()); + } + + public void testDecreaseShardReplicaLimitsToBaseLimit() { + ShardIndexingPressureMemoryManager memoryManager = new ShardIndexingPressureMemoryManager(shardIndexingPressureSettings, + clusterSettings, settings); + ShardIndexingPressureStore store = new ShardIndexingPressureStore(shardIndexingPressureSettings, clusterSettings, settings); + ShardIndexingPressureTracker tracker1 = store.getShardIndexingPressureTracker(shardId1); + + tracker1.replicaLimits.addAndGet(1 * 1024); + tracker1.currentReplicaBytes.addAndGet(0); + long limit1 = tracker1.replicaLimits.get(); + memoryManager.decreaseShardReplicaLimits(tracker1); + + assertTrue(tracker1.replicaLimits.get() < limit1); + assertEquals(15, tracker1.replicaLimits.get()); + } + + public void testDecreaseShardPrimaryAndCoordinatingLimits() { + ShardIndexingPressureMemoryManager memoryManager = new ShardIndexingPressureMemoryManager(shardIndexingPressureSettings, + clusterSettings, settings); + ShardIndexingPressureStore store = new ShardIndexingPressureStore(shardIndexingPressureSettings, clusterSettings, settings); + ShardIndexingPressureTracker tracker1 = store.getShardIndexingPressureTracker(shardId1); + tracker1.primaryAndCoordinatingLimits.addAndGet(1 * 1024); + tracker1.currentCombinedCoordinatingAndPrimaryBytes.addAndGet(512); + long limit1 = tracker1.primaryAndCoordinatingLimits.get(); + memoryManager.decreaseShardPrimaryAndCoordinatingLimits(tracker1); + + assertTrue(tracker1.primaryAndCoordinatingLimits.get() < limit1); + assertEquals((long)(512/0.85), tracker1.primaryAndCoordinatingLimits.get()); + } + + public void testDecreaseShardReplicaLimits() { + ShardIndexingPressureMemoryManager memoryManager = new ShardIndexingPressureMemoryManager(shardIndexingPressureSettings, + clusterSettings, settings); + ShardIndexingPressureStore store = new ShardIndexingPressureStore(shardIndexingPressureSettings, clusterSettings, settings); + ShardIndexingPressureTracker tracker1 = store.getShardIndexingPressureTracker(shardId1); + + tracker1.replicaLimits.addAndGet(1 * 1024); + tracker1.currentReplicaBytes.addAndGet(512); + long limit1 = tracker1.replicaLimits.get(); + memoryManager.decreaseShardReplicaLimits(tracker1); + + assertTrue(tracker1.replicaLimits.get() < limit1); + assertEquals((long)(512/0.85), tracker1.replicaLimits.get()); + } +}
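
For reviewers cross-checking the expected values asserted in these tests, the following is a minimal, self-contained sketch of the arithmetic that the ShardIndexingPressureMemoryManager documentation at the top of this patch describes. It is not the patch's implementation: the class and method names are hypothetical, the hard-coded 0.70/0.75/0.95/0.85 factors and the 20 ms / 1-request thresholds are assumptions standing in for primary_parameter.node.soft_limit, the operating_factor.* settings, and the test settings above, and the real memory manager additionally evaluates throughput degradation, which this sketch omits.

// Hypothetical illustration only; not part of this patch or of the OpenSearch codebase.
public class ShardIndexingPressureLimitSketch {

    // Assumed values behind the setting names used in the documentation above:
    // primary_parameter.node.soft_limit ~ 0.70, operating_factor.lower ~ 0.75,
    // operating_factor.upper ~ 0.95, operating_factor.optimal ~ 0.85, plus the test
    // settings max_outstanding_requests = 1 and successful_request.elapsed_timeout = 20 ms.
    static final double NODE_SOFT_LIMIT = 0.70;
    static final double LOWER_OPERATING_FACTOR = 0.75;
    static final double UPPER_OPERATING_FACTOR = 0.95;
    static final double OPTIMAL_OPERATING_FACTOR = 0.85;
    static final int MAX_OUTSTANDING_REQUESTS = 1;
    static final long SUCCESSFUL_REQUEST_ELAPSED_TIMEOUT_MILLIS = 20;

    // "Increase without further evaluation" branch: the shard limit is breached while the
    // node is still below its soft limit. Secondary parameters are deliberately omitted.
    static long increaseIfEligible(long shardBytes, long shardLimit, long nodeBytes, long nodeLimit) {
        boolean shardLimitBreached = shardBytes > shardLimit * UPPER_OPERATING_FACTOR;
        boolean nodeBelowSoftLimit = ((double) nodeBytes / nodeLimit) < NODE_SOFT_LIMIT;
        if (shardLimitBreached && nodeBelowSoftLimit) {
            // Re-center the limit so that current utilization sits at operating_factor.optimal of it.
            return (long) (shardBytes / OPTIMAL_OPERATING_FACTOR);
        }
        return shardLimit;
    }

    // Shrink back towards the base limit once utilization drops below operating_factor.lower,
    // mirroring what the decreaseShard*Limits tests assert.
    static long decreaseIfEligible(long shardBytes, long shardLimit, long baseLimit) {
        if (shardBytes < shardLimit * LOWER_OPERATING_FACTOR) {
            return Math.max(baseLimit, (long) (shardBytes / OPTIMAL_OPERATING_FACTOR));
        }
        return shardLimit;
    }

    // Secondary parameter from the documentation: the last successful request is considered
    // stale when both the elapsed time and the outstanding request count exceed their maxima.
    static boolean lastSuccessfulRequestBreached(long requestStartTime, long lastSuccessfulTimestamp,
                                                 long outstandingRequests) {
        return (requestStartTime - lastSuccessfulTimestamp) > SUCCESSFUL_REQUEST_ELAPSED_TIMEOUT_MILLIS
            && outstandingRequests > MAX_OUTSTANDING_REQUESTS;
    }

    public static void main(String[] args) {
        // 10 KB node limit and 10-byte base shard limit, as in the test settings above.
        System.out.println(increaseIfEligible(10, 10, 1024, 10 * 1024));  // 11  == (long) (10 / 0.85)
        System.out.println(decreaseIfEligible(512, 1024 + 10, 10));       // 602 == (long) (512 / 0.85)
        System.out.println(decreaseIfEligible(0, 1024 + 10, 10));         // 10  == back to the base limit
        System.out.println(lastSuccessfulRequestBreached(1000, 900, 2));  // true: 100 ms elapsed, 2 outstanding
    }
}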