Skip to content

Commit

Permalink
Test and code refactoring for shard indexing pressure.
Browse files Browse the repository at this point in the history
Signed-off-by: Saurabh Singh <sisurab@amazon.com>
  • Loading branch information
getsaurabh02 committed Jul 29, 2021
1 parent ca43fa1 commit 8dbb32e
Show file tree
Hide file tree
Showing 4 changed files with 221 additions and 361 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.opensearch.index.ShardIndexingPressureTracker.CommonOperationTracker;
import org.opensearch.index.ShardIndexingPressureTracker.OperationTracker;
import org.opensearch.index.ShardIndexingPressureTracker.PerformanceTracker;
import org.opensearch.index.ShardIndexingPressureTracker.RejectionTracker;
import org.opensearch.index.ShardIndexingPressureTracker.StatsTracker;
import org.opensearch.index.shard.ShardId;
import org.opensearch.index.stats.ShardIndexingPressureStats;
Expand All @@ -22,6 +23,7 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

/**
* Shard Indexing Pressure is a framework-level artefact built on top of IndexingPressure to track incoming indexing requests, per shard.
Expand Down Expand Up @@ -50,7 +52,7 @@ public class ShardIndexingPressure extends IndexingPressure {
public Releasable markCoordinatingOperationStarted(ShardId shardId, long bytes, boolean forceExecution) {
if(0 == bytes) { return () -> {}; }

long requestStartTime = System.currentTimeMillis();
long requestStartTime = System.nanoTime();
ShardIndexingPressureTracker tracker = getShardIndexingPressureTracker(shardId);
long nodeCombinedBytes = currentCombinedCoordinatingAndPrimaryBytes.addAndGet(bytes);
long nodeReplicaBytes = currentReplicaBytes.get();
Expand All @@ -65,25 +67,11 @@ public Releasable markCoordinatingOperationStarted(ShardId shardId, long bytes,
}

if (shouldRejectRequest(nodeLevelLimitBreached, shardLevelLimitBreached)) {
long nodeBytesWithoutOperation = nodeCombinedBytes - bytes;
long nodeTotalBytesWithoutOperation = nodeTotalBytes - bytes;
long shardBytesWithoutOperation = shardCombinedBytes - bytes;

currentCombinedCoordinatingAndPrimaryBytes.addAndGet(-bytes);
coordinatingRejections.getAndIncrement();
currentCombinedCoordinatingAndPrimaryBytes.addAndGet(-bytes);
tracker.getCommonOperationTracker().incrementCurrentCombinedCoordinatingAndPrimaryBytes(-bytes);
tracker.getCoordinatingOperationTracker().getRejectionTracker().incrementTotalRejections();

throw new OpenSearchRejectedExecutionException("rejected execution of coordinating operation [" +
"shard_detail=[" + shardId.getIndexName() + "][" + shardId.id() + "][C], " +
"shard_coordinating_and_primary_bytes=" + shardBytesWithoutOperation + ", " +
"shard_operation_bytes=" + bytes + ", " +
"shard_max_coordinating_and_primary_bytes=" + tracker.getPrimaryAndCoordinatingLimits() + "] OR [" +
"node_coordinating_and_primary_bytes=" + nodeBytesWithoutOperation + ", " +
"node_replica_bytes=" + nodeReplicaBytes + ", " +
"node_all_bytes=" + nodeTotalBytesWithoutOperation + ", " +
"node_operation_bytes=" + bytes + ", " +
"node_max_coordinating_and_primary_bytes=" + primaryAndCoordinatingLimits + "]", false);
rejectShardRequest(tracker, bytes, nodeTotalBytes, shardCombinedBytes,
tracker.getCoordinatingOperationTracker().getRejectionTracker(), "coordinating");
}
}
currentCoordinatingBytes.addAndGet(bytes);
Expand Down Expand Up @@ -124,7 +112,7 @@ public Releasable markPrimaryOperationLocalToCoordinatingNodeStarted(ShardId sha
public Releasable markPrimaryOperationStarted(ShardId shardId, long bytes, boolean forceExecution) {
if(0 == bytes) { return () -> {}; }

long requestStartTime = System.currentTimeMillis();
long requestStartTime = System.nanoTime();
ShardIndexingPressureTracker tracker = getShardIndexingPressureTracker(shardId);
long nodeCombinedBytes = currentCombinedCoordinatingAndPrimaryBytes.addAndGet(bytes);
long nodeReplicaBytes = currentReplicaBytes.get();
Expand All @@ -139,25 +127,11 @@ public Releasable markPrimaryOperationStarted(ShardId shardId, long bytes, boole
}

if (shouldRejectRequest(nodeLevelLimitBreached, shardLevelLimitBreached)) {
long nodeBytesWithoutOperation = nodeCombinedBytes - bytes;
long nodeTotalBytesWithoutOperation = nodeTotalBytes - bytes;
long shardBytesWithoutOperation = shardCombinedBytes - bytes;

currentCombinedCoordinatingAndPrimaryBytes.addAndGet(-bytes);
primaryRejections.getAndIncrement();
currentCombinedCoordinatingAndPrimaryBytes.addAndGet(-bytes);
tracker.getCommonOperationTracker().incrementCurrentCombinedCoordinatingAndPrimaryBytes(-bytes);
tracker.getPrimaryOperationTracker().getRejectionTracker().incrementTotalRejections();

throw new OpenSearchRejectedExecutionException("rejected execution of primary operation [" +
"shard_detail=[" + shardId.getIndexName() + "][" + shardId.id() + "][P], " +
"shard_coordinating_and_primary_bytes=" + shardBytesWithoutOperation + ", " +
"shard_operation_bytes=" + bytes + ", " +
"shard_max_coordinating_and_primary_bytes=" + tracker.getPrimaryAndCoordinatingLimits() + "] OR [" +
"node_coordinating_and_primary_bytes=" + nodeBytesWithoutOperation + ", " +
"node_replica_bytes=" + nodeReplicaBytes + ", " +
"node_all_bytes=" + nodeTotalBytesWithoutOperation + ", " +
"node_operation_bytes=" + bytes + ", " +
"node_max_coordinating_and_primary_bytes=" + this.primaryAndCoordinatingLimits + "]", false);
rejectShardRequest(tracker, bytes, nodeTotalBytes, shardCombinedBytes,
tracker.getPrimaryOperationTracker().getRejectionTracker(), "primary");
}
}
currentPrimaryBytes.addAndGet(bytes);
Expand All @@ -182,7 +156,7 @@ public Releasable markPrimaryOperationStarted(ShardId shardId, long bytes, boole
public Releasable markReplicaOperationStarted(ShardId shardId, long bytes, boolean forceExecution) {
if(0 == bytes) { return () -> {}; }

long requestStartTime = System.currentTimeMillis();
long requestStartTime = System.nanoTime();
ShardIndexingPressureTracker tracker = getShardIndexingPressureTracker(shardId);
long nodeReplicaBytes = currentReplicaBytes.addAndGet(bytes);
long shardReplicaBytes = tracker.getReplicaOperationTracker().getStatsTracker().incrementCurrentBytes(bytes);
Expand All @@ -195,22 +169,11 @@ public Releasable markReplicaOperationStarted(ShardId shardId, long bytes, boole
}

if (shouldRejectRequest(nodeLevelLimitBreached, shardLevelLimitBreached)) {
long nodeReplicaBytesWithoutOperation = nodeReplicaBytes - bytes;
long shardReplicaBytesWithoutOperation = shardReplicaBytes - bytes;

currentReplicaBytes.addAndGet(-bytes);
replicaRejections.getAndIncrement();
currentReplicaBytes.addAndGet(-bytes);
tracker.getReplicaOperationTracker().getStatsTracker().incrementCurrentBytes(-bytes);
tracker.getReplicaOperationTracker().getRejectionTracker().incrementTotalRejections();

throw new OpenSearchRejectedExecutionException("rejected execution of replica operation [" +
"shard_detail=[" + shardId.getIndexName() + "][" + shardId.id() + "][R], " +
"shard_replica_bytes=" + shardReplicaBytesWithoutOperation + ", " +
"operation_bytes=" + bytes + ", " +
"max_coordinating_and_primary_bytes=" + tracker.getReplicaLimits() + "] OR [" +
"replica_bytes=" + nodeReplicaBytesWithoutOperation + ", " +
"operation_bytes=" + bytes + ", " +
"max_coordinating_and_primary_bytes=" + this.replicaLimits + "]", false);
rejectShardRequest(tracker, bytes, nodeReplicaBytes, shardReplicaBytes,
tracker.getReplicaOperationTracker().getRejectionTracker(), "replica");
}
}
totalReplicaBytes.addAndGet(bytes);
Expand Down Expand Up @@ -239,8 +202,8 @@ private void markShardOperationStarted(StatsTracker statsTracker, PerformanceTra

private void adjustPerformanceUponCompletion(long bytes, long requestStartTime, StatsTracker statsTracker,
PerformanceTracker performanceTracker) {
long requestEndTime = System.currentTimeMillis();
long requestLatency = requestEndTime - requestStartTime;
long requestEndTime = System.nanoTime();
long requestLatency = TimeUnit.NANOSECONDS.toMillis(requestEndTime - requestStartTime);

performanceTracker.addLatencyInMillis(requestLatency);
performanceTracker.updateLastSuccessfulRequestTimestamp(requestEndTime);
Expand Down Expand Up @@ -300,6 +263,25 @@ private double calculateMovingAverage(long currentAverage, double frontValue, do
}
}

/**
 * Counts a rejection on the supplied tracker and then always throws an
 * {@link OpenSearchRejectedExecutionException} describing shard-level and node-level occupancy.
 * This method never returns normally.
 *
 * @param tracker          shard tracker supplying the shard id and the shard-level limits printed in the message
 * @param bytes            size of the operation being rejected; subtracted from both totals before they are reported
 * @param nodeTotalBytes   node-level byte count (presumably already including {@code bytes} - confirm at call sites)
 * @param shardTotalBytes  shard-level byte count (presumably already including {@code bytes} - confirm at call sites)
 * @param rejectionTracker tracker whose total-rejections counter is incremented before throwing
 * @param operationType    label inserted verbatim into the message (callers in view pass e.g. "coordinating")
 */
private void rejectShardRequest(ShardIndexingPressureTracker tracker, long bytes, long nodeTotalBytes, long shardTotalBytes,
                                RejectionTracker rejectionTracker, String operationType) {
    // Report occupancy as it was without the rejected operation's bytes.
    final long nodeBytesBefore = nodeTotalBytes - bytes;
    final long shardBytesBefore = shardTotalBytes - bytes;
    final ShardId rejectedShard = tracker.getShardId();

    rejectionTracker.incrementTotalRejections();

    // Shard-scoped half of the message.
    final String shardSection =
        "shard_detail=[" + rejectedShard.getIndexName() + "][" + rejectedShard.id() + "], "
            + "shard_total_bytes=" + shardBytesBefore + ", "
            + "shard_operation_bytes=" + bytes + ", "
            + "shard_max_coordinating_and_primary_bytes=" + tracker.getPrimaryAndCoordinatingLimits() + ", "
            + "shard_max_replica_bytes=" + tracker.getReplicaLimits();

    // Node-scoped half of the message.
    final String nodeSection =
        "node_total_bytes=" + nodeBytesBefore + ", "
            + "node_operation_bytes=" + bytes + ", "
            + "node_max_coordinating_and_primary_bytes=" + primaryAndCoordinatingLimits + ", "
            + "node_max_replica_bytes=" + replicaLimits;

    throw new OpenSearchRejectedExecutionException(
        "rejected execution of " + operationType + " operation [" + shardSection + "] OR [" + nodeSection + "]", false);
}

public ShardIndexingPressureStats shardStats(CommonStatsFlags statsFlags) {
if (statsFlags.includeOnlyTopIndexingPressureMetrics()) {
return topStats();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -429,7 +429,7 @@ private boolean evaluateThroughputDegradationLimitsBreached(PerformanceTracker p
*/
// Returns true only when every condition below holds: a last-successful-request timestamp has been
// recorded (> 0), the time elapsed between that timestamp and requestStartTime exceeds
// successfulRequestElapsedTimeout, and the outstanding request count exceeds maxOutstandingRequests.
private boolean evaluateLastSuccessfulRequestDurationLimitsBreached(PerformanceTracker performanceTracker, long requestStartTime) {
// NOTE(review): the "> 0" guard treats a non-positive timestamp as "no successful request yet".
// If the stored timestamp comes from System.nanoTime() (presumably - confirm against the writer),
// its value may legitimately be negative, so this guard may need revisiting.
return (performanceTracker.getLastSuccessfulRequestTimestamp() > 0) &&
// NOTE(review): the millis() comparison and the nanos() comparison that follows look like the
// before/after sides of one change rendered together. Only the one matching the time unit of
// requestStartTime and the stored timestamp should remain - TODO confirm which unit is in use.
(requestStartTime - performanceTracker.getLastSuccessfulRequestTimestamp()) > this.successfulRequestElapsedTimeout.millis() &&
(requestStartTime - performanceTracker.getLastSuccessfulRequestTimestamp()) > this.successfulRequestElapsedTimeout.nanos() &&
performanceTracker.getTotalOutstandingRequests() > this.maxOutstandingRequests;
}

Expand Down
Loading

0 comments on commit 8dbb32e

Please sign in to comment.