diff --git a/server/src/main/java/org/opensearch/index/IndexingPressure.java b/server/src/main/java/org/opensearch/index/IndexingPressure.java index 647fa74a875f7..267397e05e28c 100644 --- a/server/src/main/java/org/opensearch/index/IndexingPressure.java +++ b/server/src/main/java/org/opensearch/index/IndexingPressure.java @@ -38,22 +38,22 @@ public class IndexingPressure { private static final Logger logger = LogManager.getLogger(IndexingPressure.class); - final AtomicLong currentCombinedCoordinatingAndPrimaryBytes = new AtomicLong(0); - final AtomicLong currentCoordinatingBytes = new AtomicLong(0); - final AtomicLong currentPrimaryBytes = new AtomicLong(0); - final AtomicLong currentReplicaBytes = new AtomicLong(0); - - final AtomicLong totalCombinedCoordinatingAndPrimaryBytes = new AtomicLong(0); - final AtomicLong totalCoordinatingBytes = new AtomicLong(0); - final AtomicLong totalPrimaryBytes = new AtomicLong(0); - final AtomicLong totalReplicaBytes = new AtomicLong(0); - - final AtomicLong coordinatingRejections = new AtomicLong(0); - final AtomicLong primaryRejections = new AtomicLong(0); - final AtomicLong replicaRejections = new AtomicLong(0); - - final long primaryAndCoordinatingLimits; - final long replicaLimits; + protected final AtomicLong currentCombinedCoordinatingAndPrimaryBytes = new AtomicLong(0); + protected final AtomicLong currentCoordinatingBytes = new AtomicLong(0); + protected final AtomicLong currentPrimaryBytes = new AtomicLong(0); + protected final AtomicLong currentReplicaBytes = new AtomicLong(0); + + protected final AtomicLong totalCombinedCoordinatingAndPrimaryBytes = new AtomicLong(0); + protected final AtomicLong totalCoordinatingBytes = new AtomicLong(0); + protected final AtomicLong totalPrimaryBytes = new AtomicLong(0); + protected final AtomicLong totalReplicaBytes = new AtomicLong(0); + + protected final AtomicLong coordinatingRejections = new AtomicLong(0); + protected final AtomicLong primaryRejections = new AtomicLong(0); + protected final AtomicLong replicaRejections = new AtomicLong(0); + + protected final long primaryAndCoordinatingLimits; + protected final long replicaLimits; public IndexingPressure(Settings settings) { this.primaryAndCoordinatingLimits = MAX_INDEXING_BYTES.get(settings).getBytes(); diff --git a/server/src/main/java/org/opensearch/index/ShardIndexingPressureStore.java b/server/src/main/java/org/opensearch/index/ShardIndexingPressureStore.java index 8d30d44819aee..9a96998c88a18 100644 --- a/server/src/main/java/org/opensearch/index/ShardIndexingPressureStore.java +++ b/server/src/main/java/org/opensearch/index/ShardIndexingPressureStore.java @@ -65,10 +65,9 @@ public ShardIndexingPressureTracker getShardIndexingPressureTracker(ShardId shar tracker = shardIndexingPressureColdStore.get((long)shardId.hashCode()); // If not present in cold store so instantiate a new one if (isNull(tracker)) { - ShardIndexingPressureTracker newShardIndexingPressureTracker = new ShardIndexingPressureTracker(shardId); - newShardIndexingPressureTracker.getPrimaryAndCoordinatingLimits().set(this.shardIndexingPressureSettings - .getShardPrimaryAndCoordinatingBaseLimits()); - newShardIndexingPressureTracker.getReplicaLimits().set(this.shardIndexingPressureSettings.getShardReplicaBaseLimits()); + ShardIndexingPressureTracker newShardIndexingPressureTracker = new ShardIndexingPressureTracker(shardId, + this.shardIndexingPressureSettings.getShardPrimaryAndCoordinatingBaseLimits(), + this.shardIndexingPressureSettings.getShardReplicaBaseLimits()); // Try update the new shard stat to the hot store tracker = shardIndexingPressureHotStore.putIfAbsent((long) shardId.hashCode(), newShardIndexingPressureTracker); // Update the tracker so that we use the one actual in the hot store diff --git a/server/src/main/java/org/opensearch/index/ShardIndexingPressureTracker.java b/server/src/main/java/org/opensearch/index/ShardIndexingPressureTracker.java index 2b42775d42c17..7b4663c29dc86 100644 --- a/server/src/main/java/org/opensearch/index/ShardIndexingPressureTracker.java +++ b/server/src/main/java/org/opensearch/index/ShardIndexingPressureTracker.java @@ -74,15 +74,17 @@ public class ShardIndexingPressureTracker { private final OutstandingRequestTracker outstandingRequestTracker = new OutstandingRequestTracker(); private final ThroughputTracker throughputTracker = new ThroughputTracker(); - private final AtomicLong primaryAndCoordinatingLimits = new AtomicLong(0); - private final AtomicLong replicaLimits = new AtomicLong(0); + private final AtomicLong primaryAndCoordinatingLimits; + private final AtomicLong replicaLimits; public ShardId getShardId() { return shardId; } - public ShardIndexingPressureTracker(ShardId shardId) { + public ShardIndexingPressureTracker(ShardId shardId, long primaryAndCoordinatingLimits, long replicaLimits) { this.shardId = shardId; + this.primaryAndCoordinatingLimits = new AtomicLong(primaryAndCoordinatingLimits); + this.replicaLimits = new AtomicLong(replicaLimits); } public MemoryTracker memory() {