Skip to content

Commit

Permalink
Add framework level constructs to track shard indexing pressure. (#478)
Browse files Browse the repository at this point in the history
Signed-off-by: Saurabh Singh <sisurab@amazon.com>
  • Loading branch information
getsaurabh02 committed Apr 5, 2021
1 parent 5971a51 commit 7a80862
Show file tree
Hide file tree
Showing 23 changed files with 3,837 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.common.io.stream.Writeable;
import org.opensearch.index.ShardIndexingPressure;

import java.io.IOException;
import java.util.Collections;
Expand All @@ -40,6 +41,8 @@ public class CommonStatsFlags implements Writeable, Cloneable {
private String[] completionDataFields = null;
private boolean includeSegmentFileSizes = false;
private boolean includeUnloadedSegments = false;
private boolean includeAllShardIndexingPressureTrackers = false;
private boolean includeOnlyTopIndexingPressureMetrics = false;

/**
* @param flags flags to set. If no flags are supplied, default flags will be set.
Expand Down Expand Up @@ -67,6 +70,15 @@ public CommonStatsFlags(StreamInput in) throws IOException {
if (in.getVersion().onOrAfter(Version.V_7_2_0)) {
includeUnloadedSegments = in.readBoolean();
}
if (in.getVersion().onOrAfter(Version.V_7_10_2)) {
includeAllShardIndexingPressureTrackers = in.readBoolean();
includeOnlyTopIndexingPressureMetrics = in.readBoolean();
} else if (in.getVersion().onOrAfter(Version.V_7_9_0)) {
if (ShardIndexingPressure.isShardIndexingPressureAttributeEnabled()) {
includeAllShardIndexingPressureTrackers = in.readBoolean();
includeOnlyTopIndexingPressureMetrics = in.readBoolean();
}
}
}

@Override
Expand All @@ -85,6 +97,15 @@ public void writeTo(StreamOutput out) throws IOException {
if (out.getVersion().onOrAfter(Version.V_7_2_0)) {
out.writeBoolean(includeUnloadedSegments);
}
if (out.getVersion().onOrAfter(Version.V_7_10_2)) {
out.writeBoolean(includeAllShardIndexingPressureTrackers);
out.writeBoolean(includeOnlyTopIndexingPressureMetrics);
} else if (out.getVersion().onOrAfter(Version.V_7_9_0)) {
if (ShardIndexingPressure.isShardIndexingPressureAttributeEnabled()) {
out.writeBoolean(includeAllShardIndexingPressureTrackers);
out.writeBoolean(includeOnlyTopIndexingPressureMetrics);
}
}
}

/**
Expand All @@ -98,6 +119,8 @@ public CommonStatsFlags all() {
completionDataFields = null;
includeSegmentFileSizes = false;
includeUnloadedSegments = false;
includeAllShardIndexingPressureTrackers = false;
includeOnlyTopIndexingPressureMetrics = false;
return this;
}

Expand All @@ -112,6 +135,8 @@ public CommonStatsFlags clear() {
completionDataFields = null;
includeSegmentFileSizes = false;
includeUnloadedSegments = false;
includeAllShardIndexingPressureTrackers = false;
includeOnlyTopIndexingPressureMetrics = false;
return this;
}

Expand Down Expand Up @@ -185,10 +210,28 @@ public CommonStatsFlags includeUnloadedSegments(boolean includeUnloadedSegments)
return this;
}

public CommonStatsFlags includeAllShardIndexingPressureTrackers(boolean includeAllShardPressureTrackers) {
this.includeAllShardIndexingPressureTrackers = includeAllShardPressureTrackers;
return this;
}

public CommonStatsFlags includeOnlyTopIndexingPressureMetrics(boolean includeOnlyTopIndexingPressureMetrics) {
this.includeOnlyTopIndexingPressureMetrics = includeOnlyTopIndexingPressureMetrics;
return this;
}

public boolean includeUnloadedSegments() {
return this.includeUnloadedSegments;
}

public boolean includeAllShardIndexingPressureTrackers() {
return this.includeAllShardIndexingPressureTrackers;
}

public boolean includeOnlyTopIndexingPressureMetrics() {
return this.includeOnlyTopIndexingPressureMetrics;
}

public boolean includeSegmentFileSizes() {
return this.includeSegmentFileSizes;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.opensearch.common.component.AbstractLifecycleComponent;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Setting;
import org.opensearch.index.IndexingPressure;
import org.opensearch.common.settings.Setting.Property;
import org.opensearch.common.settings.Settings;
import org.opensearch.node.Node;
Expand Down Expand Up @@ -65,6 +66,8 @@ public class ClusterService extends AbstractLifecycleComponent {

private RerouteService rerouteService;

private IndexingPressure indexingPressure;

public ClusterService(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool) {
this(settings, clusterSettings, new MasterService(settings, clusterSettings, threadPool),
new ClusterApplierService(Node.NODE_NAME_SETTING.get(settings), settings, clusterSettings, threadPool));
Expand Down Expand Up @@ -190,6 +193,22 @@ public MasterService getMasterService() {
return masterService;
}

/**
* Getter and Setter for Indexing Pressure, This method is added specifically for getting IndexingPressure
* instance in ODFE PA plugin via ClusterService. Indexing Pressure instances can be accessible only via
* Node and NodeService class but none of them are present in the createComponents signature of ES OSS Plugin
* interface.
* {@link org.opensearch.plugins.Plugin#createComponents}
*/

public void setIndexingPressure(IndexingPressure indexingPressure) {
this.indexingPressure = indexingPressure;
}

public IndexingPressure getIndexingPressure() {
return indexingPressure;
}

public ClusterApplierService getClusterApplierService() {
return clusterApplierService;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,9 @@
import org.opensearch.index.IndexModule;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.IndexingPressure;
import org.opensearch.index.ShardIndexingPressureMemoryManager;
import org.opensearch.index.ShardIndexingPressureSettings;
import org.opensearch.index.ShardIndexingPressureStore;
import org.opensearch.indices.IndexingMemoryController;
import org.opensearch.indices.IndicesQueryCache;
import org.opensearch.indices.IndicesRequestCache;
Expand Down Expand Up @@ -253,6 +256,18 @@ public void apply(Settings value, Settings current, Settings previous) {
DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_REROUTE_INTERVAL_SETTING,
SameShardAllocationDecider.CLUSTER_ROUTING_ALLOCATION_SAME_HOST_SETTING,
ShardStateAction.FOLLOW_UP_REROUTE_PRIORITY_SETTING,
ShardIndexingPressureSettings.SHARD_INDEXING_PRESSURE_ENABLED,
ShardIndexingPressureSettings.SHARD_INDEXING_PRESSURE_ENFORCED,
ShardIndexingPressureSettings.REQUEST_SIZE_WINDOW,
ShardIndexingPressureSettings.SHARD_MIN_LIMIT,
ShardIndexingPressureStore.MAX_CACHE_STORE_SIZE,
ShardIndexingPressureMemoryManager.LOWER_OPERATING_FACTOR,
ShardIndexingPressureMemoryManager.OPTIMAL_OPERATING_FACTOR,
ShardIndexingPressureMemoryManager.UPPER_OPERATING_FACTOR,
ShardIndexingPressureMemoryManager.NODE_SOFT_LIMIT,
ShardIndexingPressureMemoryManager.THROUGHPUT_DEGRADATION_LIMITS,
ShardIndexingPressureMemoryManager.SUCCESSFUL_REQUEST_ELAPSED_TIMEOUT,
ShardIndexingPressureMemoryManager.MAX_OUTSTANDING_REQUESTS,
InternalClusterInfoService.INTERNAL_CLUSTER_INFO_UPDATE_INTERVAL_SETTING,
InternalClusterInfoService.INTERNAL_CLUSTER_INFO_TIMEOUT_SETTING,
InternalSnapshotsInfoService.INTERNAL_SNAPSHOT_INFO_MAX_CONCURRENT_FETCHES_SETTING,
Expand Down
67 changes: 66 additions & 1 deletion server/src/main/java/org/opensearch/index/IndexingPressure.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.lease.Releasable;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
Expand All @@ -36,6 +37,8 @@ public class IndexingPressure {
public static final Setting<ByteSizeValue> MAX_INDEXING_BYTES =
Setting.memorySizeSetting("indexing_pressure.memory.limit", "10%", Setting.Property.NodeScope);

private final ShardIndexingPressure shardIndexingPressure;

private static final Logger logger = LogManager.getLogger(IndexingPressure.class);

private final AtomicLong currentCombinedCoordinatingAndPrimaryBytes = new AtomicLong(0);
Expand All @@ -55,9 +58,11 @@ public class IndexingPressure {
private final long primaryAndCoordinatingLimits;
private final long replicaLimits;

public IndexingPressure(Settings settings) {
public IndexingPressure(Settings settings, ClusterService clusterService) {
this.primaryAndCoordinatingLimits = MAX_INDEXING_BYTES.get(settings).getBytes();
this.replicaLimits = (long) (this.primaryAndCoordinatingLimits * 1.5);

shardIndexingPressure = new ShardIndexingPressure(this, clusterService, settings);
}


Expand Down Expand Up @@ -160,6 +165,66 @@ public long getCurrentReplicaBytes() {
return currentReplicaBytes.get();
}

public long addAndGetCurrentCombinedCoordinatingAndPrimaryBytes(long bytes) {
return currentCombinedCoordinatingAndPrimaryBytes.addAndGet(bytes);
}

public long addAndGetCurrentCoordinatingBytes(long bytes) {
return currentCoordinatingBytes.addAndGet(bytes);
}

public long addAndGetCurrentPrimaryBytes(long bytes) {
return currentPrimaryBytes.addAndGet(bytes);
}

public long addAndGetCurrentReplicaBytes(long bytes) {
return currentReplicaBytes.addAndGet(bytes);
}

public long addAndGetTotalCombinedCoordinatingAndPrimaryBytes(long bytes) {
return totalCombinedCoordinatingAndPrimaryBytes.addAndGet(bytes);
}

public long addAndGetTotalCoordinatingBytes(long bytes) {
return totalCoordinatingBytes.addAndGet(bytes);
}

public long addAndGetTotalPrimaryBytes(long bytes) {
return totalPrimaryBytes.addAndGet(bytes);
}

public long addAndGetTotalReplicaBytes(long bytes) {
return totalReplicaBytes.addAndGet(bytes);
}

public long getAndIncrementCoordinatingRejections() {
return coordinatingRejections.getAndIncrement();
}

public long getAndIncrementPrimaryRejections() {
return primaryRejections.getAndIncrement();
}

public long getAndIncrementReplicaRejections() {
return replicaRejections.getAndIncrement();
}

public long getPrimaryAndCoordinatingLimits() {
return this.primaryAndCoordinatingLimits;
}

public long getReplicaLimits() {
return this.replicaLimits;
}

public ShardIndexingPressure getShardIndexingPressure() {
return shardIndexingPressure;
}

public boolean isShardIndexingPressureEnabled() {
return shardIndexingPressure.isShardIndexingPressureEnabled();
}

public IndexingPressureStats stats() {
return new IndexingPressureStats(totalCombinedCoordinatingAndPrimaryBytes.get(), totalCoordinatingBytes.get(),
totalPrimaryBytes.get(), totalReplicaBytes.get(), currentCombinedCoordinatingAndPrimaryBytes.get(),
Expand Down
Loading

0 comments on commit 7a80862

Please sign in to comment.