Skip to content

Commit

Permalink
Add plumbing logic to invoke shard indexing pressure during write operation. (#478)
Browse files Browse the repository at this point in the history

Signed-off-by: Saurabh Singh <sisurab@amazon.com>
  • Loading branch information
getsaurabh02 committed Apr 5, 2021
1 parent 7a80862 commit 8ab960a
Show file tree
Hide file tree
Showing 4 changed files with 630 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,12 @@ public static IndexRequest getIndexWriteRequest(DocWriteRequest<?> docWriteReque
protected void doExecute(Task task, BulkRequest bulkRequest, ActionListener<BulkResponse> listener) {
final long indexingBytes = bulkRequest.ramBytesUsed();
final boolean isOnlySystem = isOnlySystem(bulkRequest, clusterService.state().metadata().getIndicesLookup(), systemIndices);
final Releasable releasable = indexingPressure.markCoordinatingOperationStarted(indexingBytes, isOnlySystem);
final Releasable releasable;
if (indexingPressure.isShardIndexingPressureEnabled() == false) {
releasable = indexingPressure.markCoordinatingOperationStarted(indexingBytes, isOnlySystem);
} else {
releasable = () -> {};
}
final ActionListener<BulkResponse> releasingListener = ActionListener.runBefore(listener, releasable::close);
final String executorName = isOnlySystem ? Names.SYSTEM_WRITE : Names.WRITE;
try {
Expand Down Expand Up @@ -548,7 +553,17 @@ protected void doRun() {
if (task != null) {
bulkShardRequest.setParentTask(nodeId, task.getId());
}
shardBulkAction.execute(bulkShardRequest, new ActionListener<BulkShardResponse>() {
// Add the shard level accounting for coordinating and supply the listener
final Releasable releasable;
if (indexingPressure.getShardIndexingPressure().isShardIndexingPressureEnabled()) {
final boolean isOnlySystem = isOnlySystem(bulkRequest, clusterService.state().metadata().getIndicesLookup(),
systemIndices);
releasable = indexingPressure.getShardIndexingPressure()
.markCoordinatingOperationStarted(shardId, bulkShardRequest.ramBytesUsed(), isOnlySystem);
} else {
releasable = () -> {};
}
shardBulkAction.execute(bulkShardRequest, ActionListener.runBefore(new ActionListener<BulkShardResponse>() {
@Override
public void onResponse(BulkShardResponse bulkShardResponse) {
for (BulkItemResponse bulkItemResponse : bulkShardResponse.getResponses()) {
Expand Down Expand Up @@ -581,7 +596,7 @@ private void finishHim() {
listener.onResponse(new BulkResponse(responses.toArray(new BulkItemResponse[responses.length()]),
buildTookInMillis(startTimeNanos)));
}
});
}, releasable::close));
}
bulkRequest = null; // allow memory for bulk request items to be reclaimed before all items have been completed
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -288,30 +288,46 @@ private void handleOperationRequest(final Request request, final TransportChanne
Releasable releasable = checkOperationLimits(request);
ActionListener<Response> listener =
ActionListener.runBefore(new ChannelActionListener<>(channel, actionName, request), releasable::close);
runReroutePhase(task, request, listener, false);
// Add the shard level accounting for primary and chain the listener
Releasable shardReleasable = checkShardOperationLimits(request);
ActionListener<Response> finalListener = ActionListener.runBefore(listener, shardReleasable::close);
runReroutePhase(task, request, finalListener, false);
}

/**
 * Accounting hook invoked by {@code handleOperationRequest} before the reroute phase is started.
 * This default implementation tracks nothing.
 *
 * @param request the operation request being handled
 * @return a {@link Releasable} whose {@code close()} does nothing
 */
protected Releasable checkOperationLimits(final Request request) {
    final Releasable noOp = () -> {};
    return noOp;
}

/**
 * Shard-level accounting hook invoked by {@code handleOperationRequest} before the reroute
 * phase is started. This default implementation tracks nothing.
 *
 * @param request the operation request being handled
 * @return a {@link Releasable} whose {@code close()} does nothing
 */
protected Releasable checkShardOperationLimits(final Request request) {
    final Releasable noOp = () -> {};
    return noOp;
}

/**
 * Handles a primary request received on the transport channel.
 * <p>
 * Two accounting hooks are consulted first: {@link #checkPrimaryLimits} and
 * {@link #checkShardPrimaryLimits}. The {@link Releasable}s they return are chained in front
 * of the channel listener via {@code ActionListener.runBefore}, then the request is executed
 * exactly once through {@code AsyncPrimaryAction}.
 *
 * @param request the concrete shard request wrapping the primary operation
 * @param channel the transport channel the response or failure is reported on
 * @param task    the task associated with this request; cast to {@code ReplicationTask}
 */
protected void handlePrimaryRequest(final ConcreteShardRequest<Request> request, final TransportChannel channel, final Task task) {
    final Releasable releasable = checkPrimaryLimits(request.getRequest(), request.sentFromLocalReroute(),
        request.localRerouteInitiatedByNodeClient());
    final ActionListener<Response> listener =
        ActionListener.runBefore(new ChannelActionListener<>(channel, transportPrimaryAction, request), releasable::close);

    // Add the shard level accounting for transport primary and chain the listener
    final Releasable shardReleasable;
    try {
        shardReleasable = checkShardPrimaryLimits(request.getRequest(), request.sentFromLocalReroute(),
            request.localRerouteInitiatedByNodeClient());
    } catch (RuntimeException e) {
        // The first hook already succeeded and no listener has been invoked yet, so nothing
        // else would ever close its releasable. Undo it here, then let the failure propagate
        // exactly as it did before.
        releasable.close();
        throw e;
    }
    final ActionListener<Response> finalListener = ActionListener.runBefore(listener, shardReleasable::close);

    try {
        // Only the fully chained listener is handed on, so both releasables are closed.
        new AsyncPrimaryAction(request, finalListener, (ReplicationTask) task).run();
    } catch (RuntimeException e) {
        finalListener.onFailure(e);
    }
}

/**
 * Accounting hook invoked by {@code handlePrimaryRequest} before the primary action is run.
 * This default implementation tracks nothing.
 *
 * @param request                           the primary request being handled
 * @param rerouteWasLocal                   value of {@code sentFromLocalReroute()} on the incoming shard request
 * @param localRerouteInitiatedByNodeClient value of {@code localRerouteInitiatedByNodeClient()} on the incoming shard request
 * @return a {@link Releasable} whose {@code close()} does nothing
 */
protected Releasable checkPrimaryLimits(final Request request, boolean rerouteWasLocal, boolean localRerouteInitiatedByNodeClient) {
    final Releasable noOp = () -> {};
    return noOp;
}

/**
 * Shard-level accounting hook invoked by {@code handlePrimaryRequest} before the primary
 * action is run. This default implementation tracks nothing.
 *
 * @param request                           the primary request being handled
 * @param rerouteWasLocal                   value of {@code sentFromLocalReroute()} on the incoming shard request
 * @param localRerouteInitiatedByNodeClient value of {@code localRerouteInitiatedByNodeClient()} on the incoming shard request
 * @return a {@link Releasable} whose {@code close()} does nothing
 */
protected Releasable checkShardPrimaryLimits(final Request request,
                                             boolean rerouteWasLocal,
                                             boolean localRerouteInitiatedByNodeClient) {
    final Releasable noOp = () -> {};
    return noOp;
}

class AsyncPrimaryAction extends AbstractRunnable {
private final ActionListener<Response> onCompletionListener;
private final ReplicationTask replicationTask;
Expand Down Expand Up @@ -524,18 +540,24 @@ protected void handleReplicaRequest(final ConcreteReplicaRequest<ReplicaRequest>
Releasable releasable = checkReplicaLimits(replicaRequest.getRequest());
ActionListener<ReplicaResponse> listener =
ActionListener.runBefore(new ChannelActionListener<>(channel, transportReplicaAction, replicaRequest), releasable::close);

// Add the shard level accounting for replica and chain the listener
Releasable shardReleasable = checkShardReplicaLimits(replicaRequest.getRequest());
ActionListener<ReplicaResponse> finalListener = ActionListener.runBefore(listener, shardReleasable::close);
try {
new AsyncReplicaAction(replicaRequest, listener, (ReplicationTask) task).run();
new AsyncReplicaAction(replicaRequest, finalListener, (ReplicationTask) task).run();
} catch (RuntimeException e) {
listener.onFailure(e);
finalListener.onFailure(e);
}
}

/**
 * Accounting hook invoked by {@code handleReplicaRequest} before the replica action is run.
 * This default implementation tracks nothing.
 *
 * @param request the replica request being handled
 * @return a {@link Releasable} whose {@code close()} does nothing
 */
protected Releasable checkReplicaLimits(final ReplicaRequest request) {
    final Releasable noOp = () -> {};
    return noOp;
}

/**
 * Shard-level accounting hook invoked by {@code handleReplicaRequest} before the replica
 * action is run. This default implementation tracks nothing.
 *
 * @param request the replica request being handled
 * @return a {@link Releasable} whose {@code close()} does nothing
 */
protected Releasable checkShardReplicaLimits(final ReplicaRequest request) {
    final Releasable noOp = () -> {};
    return noOp;
}

public static class RetryOnReplicaException extends OpenSearchException {

public RetryOnReplicaException(ShardId shardId, String msg) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.opensearch.common.lease.Releasable;
import org.opensearch.common.settings.Settings;
import org.opensearch.index.IndexingPressure;
import org.opensearch.index.ShardIndexingPressure;
import org.opensearch.index.engine.Engine;
import org.opensearch.index.mapper.MapperParsingException;
import org.opensearch.index.shard.IndexShard;
Expand Down Expand Up @@ -64,6 +65,7 @@ public abstract class TransportWriteAction<
> extends TransportReplicationAction<Request, ReplicaRequest, Response> {

protected final IndexingPressure indexingPressure;
private final ShardIndexingPressure shardIndexingPressure;
protected final SystemIndices systemIndices;

private final Function<IndexShard, String> executorFunction;
Expand All @@ -79,6 +81,7 @@ protected TransportWriteAction(Settings settings, String actionName, TransportSe
request, replicaRequest, ThreadPool.Names.SAME, true, forceExecutionOnPrimary);
this.executorFunction = executorFunction;
this.indexingPressure = indexingPressure;
this.shardIndexingPressure = indexingPressure.getShardIndexingPressure();
this.systemIndices = systemIndices;
}

Expand All @@ -88,7 +91,20 @@ protected String executor(IndexShard shard) {

/**
 * Marks the start of a primary operation on {@code indexingPressure}, but only while shard
 * indexing pressure is disabled. When it is enabled this hook tracks nothing, leaving the
 * accounting to {@code checkShardOperationLimits}.
 *
 * @param request the write request being handled
 * @return the releasable returned by {@code indexingPressure.markPrimaryOperationStarted},
 *         or a no-op releasable when shard indexing pressure is enabled
 */
@Override
protected Releasable checkOperationLimits(Request request) {
    if (indexingPressure.isShardIndexingPressureEnabled() == false) {
        return indexingPressure.markPrimaryOperationStarted(primaryOperationSize(request), force(request));
    } else {
        // Shard indexing pressure is on: skip this accounting so the operation is not counted twice.
        return () -> {};
    }
}

/**
 * Marks the start of a primary operation on {@code shardIndexingPressure} for the request's
 * shard, but only while shard indexing pressure is enabled.
 *
 * @param request the write request being handled; its {@code shardId} identifies the shard
 * @return the releasable returned by {@code shardIndexingPressure.markPrimaryOperationStarted},
 *         or a no-op releasable when shard indexing pressure is disabled
 */
@Override
protected Releasable checkShardOperationLimits(Request request) {
    if (shardIndexingPressure.isShardIndexingPressureEnabled() == false) {
        // Feature switched off: nothing to track at shard level.
        return () -> {};
    }
    return shardIndexingPressure.markPrimaryOperationStarted(
        request.shardId, primaryOperationSize(request), force(request));
}

protected boolean force(ReplicatedWriteRequest<?> request) {
Expand All @@ -102,19 +118,46 @@ protected boolean isSystemShard(ShardId shardId) {

@Override
protected Releasable checkPrimaryLimits(Request request, boolean rerouteWasLocal, boolean localRerouteInitiatedByNodeClient) {
if (rerouteWasLocal) {
// If this primary request was received from a local reroute initiated by the node client, we
// must mark a new primary operation local to the coordinating node.
if (localRerouteInitiatedByNodeClient) {
return indexingPressure.markPrimaryOperationLocalToCoordinatingNodeStarted(primaryOperationSize(request));
if (indexingPressure.isShardIndexingPressureEnabled() == false) {
if (rerouteWasLocal) {
// If this primary request was received from a local reroute initiated by the node client, we
// must mark a new primary operation local to the coordinating node.
if (localRerouteInitiatedByNodeClient) {
return indexingPressure.markPrimaryOperationLocalToCoordinatingNodeStarted(primaryOperationSize(request));
} else {
return () -> {};
}
} else {
return () -> {};
// If this primary request was received directly from the network, we must mark a new primary
// operation. This happens if the write action skips the reroute step (ex: rsync) or during
// primary delegation, after the primary relocation hand-off.
return indexingPressure.markPrimaryOperationStarted(primaryOperationSize(request), force(request));
}
} else {
// If this primary request was received directly from the network, we must mark a new primary
// operation. This happens if the write action skips the reroute step (ex: rsync) or during
// primary delegation, after the primary relocation hand-off.
return indexingPressure.markPrimaryOperationStarted(primaryOperationSize(request), force(request));
return () -> {};
}
}

/**
 * Accounts for a primary request on {@code shardIndexingPressure}, but only while shard
 * indexing pressure is enabled; otherwise nothing is tracked.
 *
 * @param request                           the write request being handled; its {@code shardId} identifies the shard
 * @param rerouteWasLocal                   whether the request arrived through a local reroute
 * @param localRerouteInitiatedByNodeClient whether that local reroute was initiated by the node client
 * @return the releasable from the matching {@code shardIndexingPressure} call, or a no-op releasable
 */
@Override
protected Releasable checkShardPrimaryLimits(Request request, boolean rerouteWasLocal, boolean localRerouteInitiatedByNodeClient) {
    if (shardIndexingPressure.isShardIndexingPressureEnabled() == false) {
        return () -> {};
    }

    if (rerouteWasLocal == false) {
        // The request came straight from the network, so a new primary operation must be marked.
        // This happens if the write action skips the reroute step (ex: rsync) or during
        // primary delegation, after the primary relocation hand-off.
        return shardIndexingPressure.markPrimaryOperationStarted(
            request.shardId, primaryOperationSize(request), force(request));
    }

    if (localRerouteInitiatedByNodeClient) {
        // A local reroute initiated by the node client must be marked as a new primary
        // operation local to the coordinating node.
        return shardIndexingPressure.markPrimaryOperationLocalToCoordinatingNodeStarted(
            request.shardId, primaryOperationSize(request));
    }

    // Local reroute that was not initiated by the node client: nothing is marked here.
    return () -> {};
}

Expand All @@ -124,7 +167,20 @@ protected long primaryOperationSize(Request request) {

/**
 * Marks the start of a replica operation on {@code indexingPressure}, but only while shard
 * indexing pressure is disabled. When it is enabled this hook tracks nothing, leaving the
 * accounting to {@code checkShardReplicaLimits}.
 *
 * @param request the replica request being handled
 * @return the releasable returned by {@code indexingPressure.markReplicaOperationStarted},
 *         or a no-op releasable when shard indexing pressure is enabled
 */
@Override
protected Releasable checkReplicaLimits(ReplicaRequest request) {
    if (indexingPressure.isShardIndexingPressureEnabled() == false) {
        return indexingPressure.markReplicaOperationStarted(replicaOperationSize(request), force(request));
    } else {
        // Shard indexing pressure is on: skip this accounting so the operation is not counted twice.
        return () -> {};
    }
}

/**
 * Marks the start of a replica operation on {@code shardIndexingPressure} for the request's
 * shard, but only while shard indexing pressure is enabled.
 *
 * @param request the replica request being handled; its {@code shardId} identifies the shard
 * @return the releasable returned by {@code shardIndexingPressure.markReplicaOperationStarted},
 *         or a no-op releasable when shard indexing pressure is disabled
 */
@Override
protected Releasable checkShardReplicaLimits(ReplicaRequest request) {
    if (shardIndexingPressure.isShardIndexingPressureEnabled() == false) {
        // Feature switched off: nothing to track at shard level.
        return () -> {};
    }
    return shardIndexingPressure.markReplicaOperationStarted(
        request.shardId, replicaOperationSize(request), force(request));
}

protected long replicaOperationSize(ReplicaRequest request) {
Expand Down
Loading

0 comments on commit 8ab960a

Please sign in to comment.