Add framework level constructs to track shard indexing pressure. #525

Closed
@@ -1283,7 +1283,15 @@ private void verifyShardInfo(XContentParser parser, boolean primary, boolean inc
assertTrue(parser.currentName().equals("id")
|| parser.currentName().equals("name")
|| parser.currentName().equals("transport_address")
|| parser.currentName().equals("weight_ranking"));
|| parser.currentName().equals("weight_ranking")
|| parser.currentName().equals("attributes"));
// Skip past all the attributes object
if (parser.currentName().equals("attributes")) {
while(!parser.nextToken().equals(Token.END_OBJECT)) {
parser.nextToken();
}
break;
}
} else {
assertTrue(token.isValue());
assertNotNull(parser.text());
@@ -1403,6 +1411,11 @@ private String verifyNodeDecisionPrologue(XContentParser parser) throws IOExcept
parser.nextToken();
assertNotNull(parser.text());
parser.nextToken();
assertEquals("node_attributes", parser.currentName());
// skip past all the node_attributes object
while (!parser.currentName().equals("node_decision")) {
parser.nextToken();
}
assertEquals("node_decision", parser.currentName());
parser.nextToken();
return nodeName;
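The test changes above have to tolerate the new "attributes" and "node_attributes" objects in the allocation-explain output, so the parsers loop past them token by token. As an aside, a more compact way to skip a nested object is XContentParser.skipChildren(); the helper below is a hypothetical sketch, not code from this PR, and assumes the parser is positioned on the field name of the object to skip.

import java.io.IOException;

import org.opensearch.common.xcontent.XContentParser;

// Hypothetical helper (not part of this PR): skip an entire nested object such as
// "attributes" instead of looping until END_OBJECT by hand.
final class XContentSkipSketch {
    static void skipObjectField(XContentParser parser) throws IOException {
        assert parser.currentToken() == XContentParser.Token.FIELD_NAME;
        // Move onto the START_OBJECT token, then let the parser consume the whole object.
        if (parser.nextToken() == XContentParser.Token.START_OBJECT) {
            parser.skipChildren(); // leaves the parser on the matching END_OBJECT
        }
    }
}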

Large diffs are not rendered by default.

@@ -36,6 +36,7 @@
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.common.io.stream.Writeable;
import org.opensearch.index.ShardIndexingPressureSettings;

import java.io.IOException;
import java.util.Collections;
@@ -53,6 +54,8 @@ public class CommonStatsFlags implements Writeable, Cloneable {
private String[] completionDataFields = null;
private boolean includeSegmentFileSizes = false;
private boolean includeUnloadedSegments = false;
private boolean includeAllShardIndexingPressureTrackers = false;
private boolean includeOnlyTopIndexingPressureMetrics = false;

/**
* @param flags flags to set. If no flags are supplied, default flags will be set.
@@ -80,6 +83,15 @@ public CommonStatsFlags(StreamInput in) throws IOException {
if (in.getVersion().onOrAfter(LegacyESVersion.V_7_2_0)) {
includeUnloadedSegments = in.readBoolean();
}
if (in.getVersion().onOrAfter(LegacyESVersion.V_7_10_2)) {
includeAllShardIndexingPressureTrackers = in.readBoolean();
includeOnlyTopIndexingPressureMetrics = in.readBoolean();
} else if (in.getVersion().onOrAfter(LegacyESVersion.V_7_9_0)) {
if (ShardIndexingPressureSettings.isShardIndexingPressureAttributeEnabled()) {
includeAllShardIndexingPressureTrackers = in.readBoolean();
includeOnlyTopIndexingPressureMetrics = in.readBoolean();
}
}
}

@Override
@@ -98,6 +110,15 @@ public void writeTo(StreamOutput out) throws IOException {
if (out.getVersion().onOrAfter(LegacyESVersion.V_7_2_0)) {
out.writeBoolean(includeUnloadedSegments);
}
if (out.getVersion().onOrAfter(LegacyESVersion.V_7_10_2)) {
out.writeBoolean(includeAllShardIndexingPressureTrackers);
out.writeBoolean(includeOnlyTopIndexingPressureMetrics);
} else if (out.getVersion().onOrAfter(LegacyESVersion.V_7_9_0)) {
if (ShardIndexingPressureSettings.isShardIndexingPressureAttributeEnabled()) {
out.writeBoolean(includeAllShardIndexingPressureTrackers);
out.writeBoolean(includeOnlyTopIndexingPressureMetrics);
}
}
}

/**
@@ -111,6 +132,8 @@ public CommonStatsFlags all() {
completionDataFields = null;
includeSegmentFileSizes = false;
includeUnloadedSegments = false;
includeAllShardIndexingPressureTrackers = false;
includeOnlyTopIndexingPressureMetrics = false;
return this;
}

@@ -125,6 +148,8 @@ public CommonStatsFlags clear() {
completionDataFields = null;
includeSegmentFileSizes = false;
includeUnloadedSegments = false;
includeAllShardIndexingPressureTrackers = false;
includeOnlyTopIndexingPressureMetrics = false;
return this;
}

@@ -198,10 +223,28 @@ public CommonStatsFlags includeUnloadedSegments(boolean includeUnloadedSegments)
return this;
}

public CommonStatsFlags includeAllShardIndexingPressureTrackers(boolean includeAllShardPressureTrackers) {
this.includeAllShardIndexingPressureTrackers = includeAllShardPressureTrackers;
return this;
}

public CommonStatsFlags includeOnlyTopIndexingPressureMetrics(boolean includeOnlyTopIndexingPressureMetrics) {
this.includeOnlyTopIndexingPressureMetrics = includeOnlyTopIndexingPressureMetrics;
return this;
}

public boolean includeUnloadedSegments() {
return this.includeUnloadedSegments;
}

public boolean includeAllShardIndexingPressureTrackers() {
return this.includeAllShardIndexingPressureTrackers;
}

public boolean includeOnlyTopIndexingPressureMetrics() {
return this.includeOnlyTopIndexingPressureMetrics;
}

public boolean includeSegmentFileSizes() {
return this.includeSegmentFileSizes;
}
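The two new flags travel over the wire behind version checks: nodes on 7.10.2 or later always serialize them, while 7.9.x nodes only do so when the shard indexing pressure attribute is enabled, and the read path must take exactly the same branch as the write path or the stream desynchronizes. The standalone sketch below illustrates that version-gated pattern with plain java.io types and stand-in version ids; it is not the CommonStatsFlags code itself.

import java.io.*;

// Standalone sketch of the version-gated wire pattern above, using plain java.io.
// V_7_9_0 and V_7_10_2 are stand-ins for the LegacyESVersion ids in the diff.
public final class VersionGatedFlagsSketch {
    static final int V_7_9_0 = 7_09_00, V_7_10_2 = 7_10_02;

    static void writeFlags(DataOutputStream out, int peerVersion, boolean attributeEnabled,
                           boolean allTrackers, boolean onlyTop) throws IOException {
        // Same condition as the diff, collapsed: always on 7.10.2+, attribute-gated on 7.9.x.
        if (peerVersion >= V_7_10_2 || (peerVersion >= V_7_9_0 && attributeEnabled)) {
            out.writeBoolean(allTrackers);
            out.writeBoolean(onlyTop);
        } // older peers get nothing extra
    }

    static boolean[] readFlags(DataInputStream in, int peerVersion, boolean attributeEnabled) throws IOException {
        // Mirror the writer branch-for-branch, otherwise the booleans are read against the wrong bytes.
        if (peerVersion >= V_7_10_2 || (peerVersion >= V_7_9_0 && attributeEnabled)) {
            return new boolean[] { in.readBoolean(), in.readBoolean() };
        }
        return new boolean[] { false, false }; // defaults when the peer never wrote them
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        writeFlags(new DataOutputStream(bytes), V_7_10_2, false, true, false);
        boolean[] flags = readFlags(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())), V_7_10_2, false);
        System.out.println("allTrackers=" + flags[0] + ", onlyTop=" + flags[1]);
    }
}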
@@ -75,7 +75,7 @@
import org.opensearch.common.util.concurrent.AtomicArray;
import org.opensearch.index.Index;
import org.opensearch.index.IndexNotFoundException;
import org.opensearch.index.IndexingPressure;
import org.opensearch.index.IndexingPressureService;
import org.opensearch.index.VersionType;
import org.opensearch.index.seqno.SequenceNumbers;
import org.opensearch.index.shard.ShardId;
@@ -127,25 +127,26 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
private final NodeClient client;
private final IndexNameExpressionResolver indexNameExpressionResolver;
private static final String DROPPED_ITEM_WITH_AUTO_GENERATED_ID = "auto-generated";
private final IndexingPressure indexingPressure;
private final IndexingPressureService indexingPressureService;
private final SystemIndices systemIndices;

@Inject
public TransportBulkAction(ThreadPool threadPool, TransportService transportService,
ClusterService clusterService, IngestService ingestService,
TransportShardBulkAction shardBulkAction, NodeClient client, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver,
AutoCreateIndex autoCreateIndex, IndexingPressure indexingPressure, SystemIndices systemIndices) {
TransportShardBulkAction shardBulkAction, NodeClient client, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver,
AutoCreateIndex autoCreateIndex, IndexingPressureService indexingPressureService,
SystemIndices systemIndices) {
this(threadPool, transportService, clusterService, ingestService, shardBulkAction, client, actionFilters,
indexNameExpressionResolver, autoCreateIndex, indexingPressure, systemIndices, System::nanoTime);
indexNameExpressionResolver, autoCreateIndex, indexingPressureService, systemIndices, System::nanoTime);
}

public TransportBulkAction(ThreadPool threadPool, TransportService transportService,
ClusterService clusterService, IngestService ingestService,
TransportShardBulkAction shardBulkAction, NodeClient client,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
AutoCreateIndex autoCreateIndex, IndexingPressure indexingPressure, SystemIndices systemIndices,
LongSupplier relativeTimeProvider) {
AutoCreateIndex autoCreateIndex, IndexingPressureService indexingPressureService,
SystemIndices systemIndices, LongSupplier relativeTimeProvider) {
super(BulkAction.NAME, transportService, actionFilters, BulkRequest::new, ThreadPool.Names.SAME);
Objects.requireNonNull(relativeTimeProvider);
this.threadPool = threadPool;
@@ -157,7 +158,7 @@ public TransportBulkAction(ThreadPool threadPool, TransportService transportServ
this.ingestForwarder = new IngestActionForwarder(transportService);
this.client = client;
this.indexNameExpressionResolver = indexNameExpressionResolver;
this.indexingPressure = indexingPressure;
this.indexingPressureService = indexingPressureService;
this.systemIndices = systemIndices;
clusterService.addStateApplier(this.ingestForwarder);
}
@@ -184,7 +185,7 @@ public static IndexRequest getIndexWriteRequest(DocWriteRequest<?> docWriteReque
protected void doExecute(Task task, BulkRequest bulkRequest, ActionListener<BulkResponse> listener) {
final long indexingBytes = bulkRequest.ramBytesUsed();
final boolean isOnlySystem = isOnlySystem(bulkRequest, clusterService.state().metadata().getIndicesLookup(), systemIndices);
final Releasable releasable = indexingPressure.markCoordinatingOperationStarted(indexingBytes, isOnlySystem);
final Releasable releasable = indexingPressureService.markCoordinatingOperationStarted(indexingBytes, isOnlySystem);
final ActionListener<BulkResponse> releasingListener = ActionListener.runBefore(listener, releasable::close);
final String executorName = isOnlySystem ? Names.SYSTEM_WRITE : Names.WRITE;
try {
@@ -562,7 +563,12 @@ protected void doRun() {
if (task != null) {
bulkShardRequest.setParentTask(nodeId, task.getId());
}
shardBulkAction.execute(bulkShardRequest, new ActionListener<BulkShardResponse>() {
// Add the shard level accounting for coordinating and supply the listener
final boolean isOnlySystem = isOnlySystem(bulkRequest, clusterService.state().metadata().getIndicesLookup(), systemIndices);
final Releasable releasable = indexingPressureService.markCoordinatingOperationStarted(shardId,
bulkShardRequest.ramBytesUsed(), isOnlySystem);

shardBulkAction.execute(bulkShardRequest, ActionListener.runBefore(new ActionListener<BulkShardResponse>() {
@Override
public void onResponse(BulkShardResponse bulkShardResponse) {
for (BulkItemResponse bulkItemResponse : bulkShardResponse.getResponses()) {
@@ -595,7 +601,7 @@ private void finishHim() {
listener.onResponse(new BulkResponse(responses.toArray(new BulkItemResponse[responses.length()]),
buildTookInMillis(startTimeNanos)));
}
});
}, releasable::close));
}
bulkRequest = null; // allow memory for bulk request items to be reclaimed before all items have been completed
}
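The bulk path now accounts coordinating bytes per shard: they are marked before each shard bulk request is dispatched, and ActionListener.runBefore guarantees the Releasable is closed before the caller's listener runs, whether the shard request succeeded or failed. Below is a standalone sketch of that acquire-then-release-in-callback pattern with plain Java types; the counter and CompletableFuture stand in for the pressure service and the transport call, and are not the PR's code.

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;

// Standalone sketch of the per-shard coordinating accounting pattern above.
public final class CoordinatingAccountingSketch {
    private static final AtomicLong IN_FLIGHT_COORDINATING_BYTES = new AtomicLong();

    // Analogue of markCoordinatingOperationStarted(shardId, bytes, isOnlySystem):
    // add the bytes now and hand back the action that gives them back later.
    static Runnable markStarted(long bytes) {
        IN_FLIGHT_COORDINATING_BYTES.addAndGet(bytes);
        return () -> IN_FLIGHT_COORDINATING_BYTES.addAndGet(-bytes);
    }

    public static void main(String[] args) {
        Runnable release = markStarted(1024);                   // stand-in for bulkShardRequest.ramBytesUsed()
        CompletableFuture.supplyAsync(() -> "bulk shard response")
            .whenComplete((response, failure) -> release.run()) // runs on success or failure,
            .thenAccept(response ->                             // before the downstream handler sees the result
                System.out.println(response + ", in-flight bytes now " + IN_FLIGHT_COORDINATING_BYTES.get()))
            .join();
    }
}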
@@ -68,7 +68,7 @@
import org.opensearch.common.xcontent.ToXContent;
import org.opensearch.common.xcontent.XContentHelper;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.index.IndexingPressure;
import org.opensearch.index.IndexingPressureService;
import org.opensearch.index.engine.Engine;
import org.opensearch.index.engine.VersionConflictEngineException;
import org.opensearch.index.get.GetResult;
@@ -115,9 +115,9 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
public TransportShardBulkAction(Settings settings, TransportService transportService, ClusterService clusterService,
IndicesService indicesService, ThreadPool threadPool, ShardStateAction shardStateAction,
MappingUpdatedAction mappingUpdatedAction, UpdateHelper updateHelper, ActionFilters actionFilters,
IndexingPressure indexingPressure, SystemIndices systemIndices) {
IndexingPressureService indexingPressureService, SystemIndices systemIndices) {
super(settings, ACTION_NAME, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters,
BulkShardRequest::new, BulkShardRequest::new, EXECUTOR_NAME_FUNCTION, false, indexingPressure, systemIndices);
BulkShardRequest::new, BulkShardRequest::new, EXECUTOR_NAME_FUNCTION, false, indexingPressureService, systemIndices);
this.updateHelper = updateHelper;
this.mappingUpdatedAction = mappingUpdatedAction;
}
@@ -45,7 +45,7 @@
import org.opensearch.common.inject.Inject;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.settings.Settings;
import org.opensearch.index.IndexingPressure;
import org.opensearch.index.IndexingPressureService;
import org.opensearch.index.engine.Engine;
import org.opensearch.index.seqno.SequenceNumbers;
import org.opensearch.index.shard.IndexShard;
@@ -80,11 +80,11 @@ public class TransportResyncReplicationAction extends TransportWriteAction<Resyn
public TransportResyncReplicationAction(Settings settings, TransportService transportService,
ClusterService clusterService, IndicesService indicesService, ThreadPool threadPool,
ShardStateAction shardStateAction, ActionFilters actionFilters,
IndexingPressure indexingPressure, SystemIndices systemIndices) {
IndexingPressureService indexingPressureService, SystemIndices systemIndices) {
super(settings, ACTION_NAME, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters,
ResyncReplicationRequest::new, ResyncReplicationRequest::new, EXECUTOR_NAME_FUNCTION,
true, /* we should never reject resync because of thread pool capacity on primary */
indexingPressure, systemIndices);
indexingPressureService, systemIndices);
}

@Override
@@ -49,7 +49,7 @@
import org.opensearch.common.io.stream.Writeable;
import org.opensearch.common.lease.Releasable;
import org.opensearch.common.settings.Settings;
import org.opensearch.index.IndexingPressure;
import org.opensearch.index.IndexingPressureService;
import org.opensearch.index.engine.Engine;
import org.opensearch.index.mapper.MapperParsingException;
import org.opensearch.index.shard.IndexShard;
@@ -76,7 +76,7 @@ public abstract class TransportWriteAction<
Response extends ReplicationResponse & WriteResponse
> extends TransportReplicationAction<Request, ReplicaRequest, Response> {

protected final IndexingPressure indexingPressure;
protected final IndexingPressureService indexingPressureService;
protected final SystemIndices systemIndices;

private final Function<IndexShard, String> executorFunction;
@@ -85,13 +85,14 @@ protected TransportWriteAction(Settings settings, String actionName, TransportSe
ClusterService clusterService, IndicesService indicesService, ThreadPool threadPool,
ShardStateAction shardStateAction, ActionFilters actionFilters, Writeable.Reader<Request> request,
Writeable.Reader<ReplicaRequest> replicaRequest, Function<IndexShard, String> executorFunction,
boolean forceExecutionOnPrimary, IndexingPressure indexingPressure, SystemIndices systemIndices) {
boolean forceExecutionOnPrimary, IndexingPressureService indexingPressureService,
SystemIndices systemIndices) {
// We pass ThreadPool.Names.SAME to the super class as we control the dispatching to the
// ThreadPool.Names.WRITE/ThreadPool.Names.SYSTEM_WRITE thread pools in this class.
super(settings, actionName, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters,
request, replicaRequest, ThreadPool.Names.SAME, true, forceExecutionOnPrimary);
this.executorFunction = executorFunction;
this.indexingPressure = indexingPressure;
this.indexingPressureService = indexingPressureService;
this.systemIndices = systemIndices;
}

@@ -101,7 +102,7 @@ protected String executor(IndexShard shard) {

@Override
protected Releasable checkOperationLimits(Request request) {
return indexingPressure.markPrimaryOperationStarted(primaryOperationSize(request), force(request));
return indexingPressureService.markPrimaryOperationStarted(request.shardId, primaryOperationSize(request), force(request));
}

protected boolean force(ReplicatedWriteRequest<?> request) {
@@ -119,15 +120,16 @@ protected Releasable checkPrimaryLimits(Request request, boolean rerouteWasLocal
// If this primary request was received from a local reroute initiated by the node client, we
// must mark a new primary operation local to the coordinating node.
if (localRerouteInitiatedByNodeClient) {
return indexingPressure.markPrimaryOperationLocalToCoordinatingNodeStarted(primaryOperationSize(request));
return indexingPressureService.markPrimaryOperationLocalToCoordinatingNodeStarted(request.shardId,
primaryOperationSize(request));
} else {
return () -> {};
}
} else {
// If this primary request was received directly from the network, we must mark a new primary
// operation. This happens if the write action skips the reroute step (ex: rsync) or during
// primary delegation, after the primary relocation hand-off.
return indexingPressure.markPrimaryOperationStarted(primaryOperationSize(request), force(request));
return indexingPressureService.markPrimaryOperationStarted(request.shardId, primaryOperationSize(request), force(request));
}
}

@@ -137,7 +139,7 @@ protected long primaryOperationSize(Request request) {

@Override
protected Releasable checkReplicaLimits(ReplicaRequest request) {
return indexingPressure.markReplicaOperationStarted(replicaOperationSize(request), force(request));
return indexingPressureService.markReplicaOperationStarted(request.shardId, replicaOperationSize(request), force(request));
}

protected long replicaOperationSize(ReplicaRequest request) {
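Every limit check in TransportWriteAction now passes the request's ShardId to IndexingPressureService instead of bumping a single node-wide counter. For intuition, here is a deliberately simplified, hypothetical sketch of a shard-aware pressure tracker: per-shard counters behind one entry point that returns a release action, with rejection when a shard's budget is exceeded and the operation is not forced. It illustrates the idea only and is not the PR's IndexingPressureService.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical, simplified shard-aware pressure tracker; not the PR's implementation.
final class ShardPressureSketch {
    private final Map<String, AtomicLong> primaryBytesPerShard = new ConcurrentHashMap<>();
    private final long shardLimitBytes;

    ShardPressureSketch(long shardLimitBytes) {
        this.shardLimitBytes = shardLimitBytes;
    }

    // Analogue of markPrimaryOperationStarted(shardId, bytes, force): account the bytes
    // against the shard, reject if over budget unless forced, and return the release action.
    Runnable markPrimaryOperationStarted(String shardId, long bytes, boolean force) {
        AtomicLong counter = primaryBytesPerShard.computeIfAbsent(shardId, id -> new AtomicLong());
        if (counter.addAndGet(bytes) > shardLimitBytes && force == false) {
            counter.addAndGet(-bytes); // roll back before rejecting
            throw new IllegalStateException("shard [" + shardId + "] over its indexing pressure budget");
        }
        return () -> counter.addAndGet(-bytes);
    }

    public static void main(String[] args) {
        ShardPressureSketch pressure = new ShardPressureSketch(10_000);
        Runnable release = pressure.markPrimaryOperationStarted("[index][0]", 2_048, false);
        // ... perform the write ...
        release.run();
    }
}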