Skip to content

Commit

Permalink
Added some refactoring based on PR comments.
Browse files Browse the repository at this point in the history
Signed-off-by: Saurabh Singh <sisurab@amazon.com>
  • Loading branch information
getsaurabh02 committed Apr 12, 2021
1 parent 14426c2 commit 57a0032
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 23 deletions.
32 changes: 16 additions & 16 deletions server/src/main/java/org/opensearch/index/IndexingPressure.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,22 +38,22 @@ public class IndexingPressure {

private static final Logger logger = LogManager.getLogger(IndexingPressure.class);

final AtomicLong currentCombinedCoordinatingAndPrimaryBytes = new AtomicLong(0);
final AtomicLong currentCoordinatingBytes = new AtomicLong(0);
final AtomicLong currentPrimaryBytes = new AtomicLong(0);
final AtomicLong currentReplicaBytes = new AtomicLong(0);

final AtomicLong totalCombinedCoordinatingAndPrimaryBytes = new AtomicLong(0);
final AtomicLong totalCoordinatingBytes = new AtomicLong(0);
final AtomicLong totalPrimaryBytes = new AtomicLong(0);
final AtomicLong totalReplicaBytes = new AtomicLong(0);

final AtomicLong coordinatingRejections = new AtomicLong(0);
final AtomicLong primaryRejections = new AtomicLong(0);
final AtomicLong replicaRejections = new AtomicLong(0);

final long primaryAndCoordinatingLimits;
final long replicaLimits;
protected final AtomicLong currentCombinedCoordinatingAndPrimaryBytes = new AtomicLong(0);
protected final AtomicLong currentCoordinatingBytes = new AtomicLong(0);
protected final AtomicLong currentPrimaryBytes = new AtomicLong(0);
protected final AtomicLong currentReplicaBytes = new AtomicLong(0);

protected final AtomicLong totalCombinedCoordinatingAndPrimaryBytes = new AtomicLong(0);
protected final AtomicLong totalCoordinatingBytes = new AtomicLong(0);
protected final AtomicLong totalPrimaryBytes = new AtomicLong(0);
protected final AtomicLong totalReplicaBytes = new AtomicLong(0);

protected final AtomicLong coordinatingRejections = new AtomicLong(0);
protected final AtomicLong primaryRejections = new AtomicLong(0);
protected final AtomicLong replicaRejections = new AtomicLong(0);

protected final long primaryAndCoordinatingLimits;
protected final long replicaLimits;

public IndexingPressure(Settings settings) {
this.primaryAndCoordinatingLimits = MAX_INDEXING_BYTES.get(settings).getBytes();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,9 @@ public ShardIndexingPressureTracker getShardIndexingPressureTracker(ShardId shar
tracker = shardIndexingPressureColdStore.get((long)shardId.hashCode());
// If not present in cold store so instantiate a new one
if (isNull(tracker)) {
ShardIndexingPressureTracker newShardIndexingPressureTracker = new ShardIndexingPressureTracker(shardId);
newShardIndexingPressureTracker.getPrimaryAndCoordinatingLimits().set(this.shardIndexingPressureSettings
.getShardPrimaryAndCoordinatingBaseLimits());
newShardIndexingPressureTracker.getReplicaLimits().set(this.shardIndexingPressureSettings.getShardReplicaBaseLimits());
ShardIndexingPressureTracker newShardIndexingPressureTracker = new ShardIndexingPressureTracker(shardId,
this.shardIndexingPressureSettings.getShardPrimaryAndCoordinatingBaseLimits(),
this.shardIndexingPressureSettings.getShardReplicaBaseLimits());
// Try update the new shard stat to the hot store
tracker = shardIndexingPressureHotStore.putIfAbsent((long) shardId.hashCode(), newShardIndexingPressureTracker);
// Update the tracker so that we use the one actual in the hot store
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,15 +74,17 @@ public class ShardIndexingPressureTracker {
private final OutstandingRequestTracker outstandingRequestTracker = new OutstandingRequestTracker();
private final ThroughputTracker throughputTracker = new ThroughputTracker();

private final AtomicLong primaryAndCoordinatingLimits = new AtomicLong(0);
private final AtomicLong replicaLimits = new AtomicLong(0);
private final AtomicLong primaryAndCoordinatingLimits;
private final AtomicLong replicaLimits;

public ShardId getShardId() {
return shardId;
}

public ShardIndexingPressureTracker(ShardId shardId) {
public ShardIndexingPressureTracker(ShardId shardId, long primaryAndCoordinatingLimits, long replicaLimits) {
this.shardId = shardId;
this.primaryAndCoordinatingLimits = new AtomicLong(primaryAndCoordinatingLimits);
this.replicaLimits = new AtomicLong(replicaLimits);
}

public MemoryTracker memory() {
Expand Down

0 comments on commit 57a0032

Please sign in to comment.