Skip to content

Commit

Permalink
Add tracing instrumentation for indexing paths
Browse files Browse the repository at this point in the history
Signed-off-by: Shreyansh Ray <rayshrey@amazon.com>
  • Loading branch information
rayshrey committed Sep 29, 2023
1 parent d656e3d commit 6a39f5c
Show file tree
Hide file tree
Showing 6 changed files with 131 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,11 @@
import org.opensearch.ingest.IngestService;
import org.opensearch.node.NodeClosedException;
import org.opensearch.tasks.Task;
import org.opensearch.telemetry.tracing.Span;
import org.opensearch.telemetry.tracing.SpanBuilder;
import org.opensearch.telemetry.tracing.SpanScope;
import org.opensearch.telemetry.tracing.Tracer;
import org.opensearch.telemetry.tracing.listener.TraceableActionListener;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.threadpool.ThreadPool.Names;
import org.opensearch.transport.TransportService;
Expand Down Expand Up @@ -133,6 +138,7 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
private final IndexingPressureService indexingPressureService;
private final IndicesService indicesService;
private final SystemIndices systemIndices;
private final Tracer tracer;

@Inject
public TransportBulkAction(
Expand All @@ -147,7 +153,8 @@ public TransportBulkAction(
AutoCreateIndex autoCreateIndex,
IndexingPressureService indexingPressureService,
IndicesService indicesService,
SystemIndices systemIndices
SystemIndices systemIndices,
Tracer tracer
) {
this(
threadPool,
Expand All @@ -162,7 +169,8 @@ public TransportBulkAction(
indexingPressureService,
indicesService,
systemIndices,
System::nanoTime
System::nanoTime,
tracer
);
}

Expand All @@ -179,7 +187,8 @@ public TransportBulkAction(
IndexingPressureService indexingPressureService,
IndicesService indicesService,
SystemIndices systemIndices,
LongSupplier relativeTimeProvider
LongSupplier relativeTimeProvider,
Tracer tracer
) {
super(BulkAction.NAME, transportService, actionFilters, BulkRequest::new, ThreadPool.Names.SAME);
Objects.requireNonNull(relativeTimeProvider);
Expand All @@ -196,6 +205,7 @@ public TransportBulkAction(
this.indicesService = indicesService;
this.systemIndices = systemIndices;
clusterService.addStateApplier(this.ingestForwarder);
this.tracer = tracer;
}

/**
Expand Down Expand Up @@ -642,52 +652,61 @@ protected void doRun() {
bulkShardRequest::ramBytesUsed,
isOnlySystem
);
shardBulkAction.execute(bulkShardRequest, ActionListener.runBefore(new ActionListener<BulkShardResponse>() {
@Override
public void onResponse(BulkShardResponse bulkShardResponse) {
for (BulkItemResponse bulkItemResponse : bulkShardResponse.getResponses()) {
// we may have no response if item failed
if (bulkItemResponse.getResponse() != null) {
bulkItemResponse.getResponse().setShardInfo(bulkShardResponse.getShardInfo());
}

docStatusStats.inc(bulkItemResponse.status());
responses.set(bulkItemResponse.getItemId(), bulkItemResponse);
}
final Span span = tracer.startSpan(SpanBuilder.from("bulkShardAction", nodeId, bulkShardRequest));
ActionListener traceableActionListener = TraceableActionListener.create(
ActionListener.runBefore(new ActionListener<BulkShardResponse>() {
@Override
public void onResponse(BulkShardResponse bulkShardResponse) {
for (BulkItemResponse bulkItemResponse : bulkShardResponse.getResponses()) {
// we may have no response if item failed
if (bulkItemResponse.getResponse() != null) {
bulkItemResponse.getResponse().setShardInfo(bulkShardResponse.getShardInfo());
}

if (counter.decrementAndGet() == 0) {
finishHim();
docStatusStats.inc(bulkItemResponse.status());
responses.set(bulkItemResponse.getItemId(), bulkItemResponse);
}

if (counter.decrementAndGet() == 0) {
finishHim();
}
}
}

@Override
public void onFailure(Exception e) {
// create failures for all relevant requests
for (BulkItemRequest request : requests) {
final String indexName = concreteIndices.getConcreteIndex(request.index()).getName();
final DocWriteRequest<?> docWriteRequest = request.request();
final BulkItemResponse bulkItemResponse = new BulkItemResponse(
request.id(),
docWriteRequest.opType(),
new BulkItemResponse.Failure(indexName, docWriteRequest.id(), e)
);
@Override
public void onFailure(Exception e) {
// create failures for all relevant requests
for (BulkItemRequest request : requests) {
final String indexName = concreteIndices.getConcreteIndex(request.index()).getName();
final DocWriteRequest<?> docWriteRequest = request.request();
final BulkItemResponse bulkItemResponse = new BulkItemResponse(
request.id(),
docWriteRequest.opType(),
new BulkItemResponse.Failure(indexName, docWriteRequest.id(), e)
);

docStatusStats.inc(bulkItemResponse.status());
responses.set(request.id(), bulkItemResponse);
}

docStatusStats.inc(bulkItemResponse.status());
responses.set(request.id(), bulkItemResponse);
if (counter.decrementAndGet() == 0) {
finishHim();
}
}

if (counter.decrementAndGet() == 0) {
finishHim();
private void finishHim() {
indicesService.addDocStatusStats(docStatusStats);
listener.onResponse(
new BulkResponse(responses.toArray(new BulkItemResponse[responses.length()]), buildTookInMillis(startTimeNanos))
);
}
}

private void finishHim() {
indicesService.addDocStatusStats(docStatusStats);
listener.onResponse(
new BulkResponse(responses.toArray(new BulkItemResponse[responses.length()]), buildTookInMillis(startTimeNanos))
);
}
}, releasable::close));
}, releasable::close),
span,
tracer
);
try(SpanScope spanScope = tracer.withSpanInScope(span)) {
shardBulkAction.execute(bulkShardRequest, traceableActionListener);
}
}
bulkRequest = null; // allow memory for bulk request items to be reclaimed before all items have been completed
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@
import org.opensearch.indices.SystemIndices;
import org.opensearch.node.NodeClosedException;
import org.opensearch.tasks.Task;
import org.opensearch.telemetry.tracing.Tracer;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.threadpool.ThreadPool.Names;
import org.opensearch.transport.TransportChannel;
Expand Down Expand Up @@ -146,6 +147,7 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
* term validation in presence of a new primary.
*/
private final String transportPrimaryTermValidationAction;
private final Tracer tracer;

@Inject
public TransportShardBulkAction(
Expand All @@ -161,7 +163,8 @@ public TransportShardBulkAction(
IndexingPressureService indexingPressureService,
SegmentReplicationPressureService segmentReplicationPressureService,
RemoteStorePressureService remoteStorePressureService,
SystemIndices systemIndices
SystemIndices systemIndices,
Tracer tracer
) {
super(
settings,
Expand All @@ -183,6 +186,7 @@ public TransportShardBulkAction(
this.mappingUpdatedAction = mappingUpdatedAction;
this.segmentReplicationPressureService = segmentReplicationPressureService;
this.remoteStorePressureService = remoteStorePressureService;
this.tracer = tracer;

this.transportPrimaryTermValidationAction = ACTION_NAME + "[validate_primary_term]";

Expand All @@ -196,6 +200,11 @@ public TransportShardBulkAction(
);
}

/**
 * Supplies the tracer that was handed to this action's constructor.
 *
 * @return the {@link Tracer} stored in this action's {@code tracer} field
 */
@Override
protected Tracer getTracer() {
    return this.tracer;
}

protected void handlePrimaryTermValidationRequest(
final PrimaryTermValidationRequest request,
final TransportChannel channel,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@
import org.opensearch.tasks.Task;
import org.opensearch.tasks.TaskListener;
import org.opensearch.tasks.TaskManager;
import org.opensearch.telemetry.tracing.Tracer;
import org.opensearch.telemetry.tracing.noop.NoopTracer;

import java.util.concurrent.atomic.AtomicInteger;

Expand Down Expand Up @@ -79,6 +81,10 @@ private Releasable registerChildNode(TaskId parentTask) {
}
}

/**
 * Returns the tracer this action uses. This implementation always returns
 * {@link NoopTracer#INSTANCE}; the method is {@code protected} and non-final, so a
 * subclass can override it to supply a different tracer.
 *
 * @return the no-op tracer instance
 */
protected Tracer getTracer() {
    final Tracer noopTracer = NoopTracer.INSTANCE;
    return noopTracer;
}

/**
* Use this method when the transport action call should result in creation of a new task associated with the call.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,10 @@
import org.opensearch.indices.IndicesService;
import org.opensearch.node.NodeClosedException;
import org.opensearch.tasks.Task;
import org.opensearch.telemetry.tracing.Span;
import org.opensearch.telemetry.tracing.SpanBuilder;
import org.opensearch.telemetry.tracing.SpanScope;
import org.opensearch.telemetry.tracing.listener.TraceableActionListener;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.ConnectTransportException;
import org.opensearch.transport.TransportChannel;
Expand Down Expand Up @@ -412,8 +416,9 @@ protected void handlePrimaryRequest(final ConcreteShardRequest<Request> request,
releasable::close
);

try {
new AsyncPrimaryAction(request, listener, (ReplicationTask) task).run();
final Span span = getTracer().startSpan(SpanBuilder.from("shardPrimaryWrite", clusterService.localNode().getId(), request.getRequest().shardId()));
try(SpanScope spanScope = getTracer().withSpanInScope(span)) {
new AsyncPrimaryAction(request, TraceableActionListener.create(listener, span, getTracer()), (ReplicationTask) task).run();
} catch (RuntimeException e) {
listener.onFailure(e);
}
Expand Down Expand Up @@ -693,8 +698,10 @@ protected void handleReplicaRequest(
releasable::close
);

try {
new AsyncReplicaAction(replicaRequest, listener, (ReplicationTask) task).run();
final Span span = getTracer().startSpan(SpanBuilder.from(
"shardReplicaWrite", clusterService.localNode().getId(), replicaRequest.getRequest().shardId()));
try(SpanScope spanScope = getTracer().withSpanInScope(span)) {
new AsyncReplicaAction(replicaRequest, TraceableActionListener.create(listener, span, getTracer()), (ReplicationTask) task).run();
} catch (RuntimeException e) {
listener.onFailure(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,4 +64,24 @@ private AttributeNames() {
* Action Name.
*/
public static final String TRANSPORT_ACTION = "action";

/**
 * Attribute key for the name of an index.
 */
public static final String INDEX = "index";

/**
 * Attribute key for the ID of a shard.
 */
public static final String SHARD_ID = "shard_id";

/**
 * Attribute key for the number of request items in a bulk request.
 */
public static final String NUM_REQUEST_ITEMS ="num_request_items";

/**
 * Attribute key for the ID of a node.
 */
public static final String NODE_ID = "node_id";
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,10 @@

package org.opensearch.telemetry.tracing;

import org.opensearch.action.bulk.BulkShardRequest;
import org.opensearch.common.annotation.InternalApi;
import org.opensearch.core.common.Strings;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.http.HttpRequest;
import org.opensearch.rest.RestRequest;
import org.opensearch.telemetry.tracing.attributes.Attributes;
Expand Down Expand Up @@ -67,6 +69,14 @@ public static SpanCreationContext from(String action, Transport.Connection conne
return SpanCreationContext.server().name(createSpanName(action, connection)).attributes(buildSpanAttributes(action, connection));
}

/**
 * Creates a server-kind {@link SpanCreationContext} for a shard-level bulk request.
 *
 * @param spanName         name given to the span
 * @param nodeId           node ID recorded as a span attribute
 * @param bulkShardRequest bulk shard request whose shard and item count are recorded as span attributes
 * @return the span creation context carrying the name and the derived attributes
 */
public static SpanCreationContext from(String spanName, String nodeId, BulkShardRequest bulkShardRequest) {
    final Attributes spanAttributes = buildSpanAttributes(nodeId, bulkShardRequest);
    return SpanCreationContext.server().name(spanName).attributes(spanAttributes);
}

/**
 * Creates a server-kind {@link SpanCreationContext} for an operation on a single shard.
 *
 * @param spanName name given to the span
 * @param nodeId   node ID recorded as a span attribute
 * @param shardId  shard whose index name and ID are recorded as span attributes; may be {@code null}
 * @return the span creation context carrying the name and the derived attributes
 */
public static SpanCreationContext from(String spanName, String nodeId, ShardId shardId) {
    final Attributes spanAttributes = buildSpanAttributes(nodeId, shardId);
    return SpanCreationContext.server().name(spanName).attributes(spanAttributes);
}

/**
 * Builds a span name from an HTTP request: the request method's name, then
 * {@code SEPARATOR}, then the request URI.
 *
 * @param httpRequest the HTTP request being traced
 * @return the concatenated span name
 */
private static String createSpanName(HttpRequest httpRequest) {
    final StringBuilder spanName = new StringBuilder();
    spanName.append(httpRequest.method().name());
    spanName.append(SEPARATOR);
    spanName.append(httpRequest.uri());
    return spanName.toString();
}
Expand Down Expand Up @@ -127,4 +137,18 @@ private static Attributes buildSpanAttributes(String action, Transport.Connectio
return attributes;
}

/**
 * Builds the span attributes for a bulk shard request: the shard-level attributes for
 * the request's shard, plus the number of items in the request.
 *
 * @param nodeId           node ID recorded on the span
 * @param bulkShardRequest request supplying the shard ID and the item array
 * @return the populated attributes
 */
private static Attributes buildSpanAttributes(String nodeId, BulkShardRequest bulkShardRequest) {
    // Start from the shard-level attributes, then add the bulk-specific item count.
    final Attributes shardAttributes = buildSpanAttributes(nodeId, bulkShardRequest.shardId());
    final int itemCount = bulkShardRequest.items().length;
    shardAttributes.addAttribute(AttributeNames.NUM_REQUEST_ITEMS, itemCount);
    return shardAttributes;
}

/**
 * Builds the span attributes describing a node and a shard.
 * <p>
 * When {@code shardId} is {@code null}, the index attribute is set to the literal
 * string {@code "NULL"} and the shard ID attribute to {@code -1}.
 *
 * @param nodeId  node ID recorded on the span
 * @param shardId shard whose index name and ID are recorded; may be {@code null}
 * @return the populated attributes
 */
private static Attributes buildSpanAttributes(String nodeId, ShardId shardId) {
    final String indexName;
    final int shardNumber;
    if (shardId == null) {
        // Placeholder values used when no shard is known.
        indexName = "NULL";
        shardNumber = -1;
    } else {
        indexName = shardId.getIndexName();
        shardNumber = shardId.getId();
    }
    return Attributes.create()
        .addAttribute(AttributeNames.NODE_ID, nodeId)
        .addAttribute(AttributeNames.INDEX, indexName)
        .addAttribute(AttributeNames.SHARD_ID, shardNumber);
}

}

0 comments on commit 6a39f5c

Please sign in to comment.