Skip to content

Commit

Permalink
Use AtomicLongUpdater for gRPC message ID (#3137)
Browse files Browse the repository at this point in the history
  • Loading branch information
Anuraag Agrawal authored May 31, 2021
1 parent c3ed9a3 commit f755b3f
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,14 @@
import io.opentelemetry.context.propagation.ContextPropagators;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import java.net.SocketAddress;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;

final class TracingClientInterceptor implements ClientInterceptor {

@SuppressWarnings("rawtypes")
private static final AtomicLongFieldUpdater<TracingClientCallListener> MESSAGE_ID_UPDATER =
AtomicLongFieldUpdater.newUpdater(TracingClientCallListener.class, "messageId");

private final Instrumenter<GrpcRequest, Status> instrumenter;
private final ContextPropagators propagators;

Expand Down Expand Up @@ -97,10 +101,13 @@ public void sendMessage(REQUEST message) {

final class TracingClientCallListener<RESPONSE>
extends ForwardingClientCallListener.SimpleForwardingClientCallListener<RESPONSE> {

private final Context context;
private final GrpcRequest request;

private final AtomicLong messageId = new AtomicLong();
// Used by MESSAGE_ID_UPDATER
@SuppressWarnings("UnusedVariable")
volatile long messageId;

TracingClientCallListener(Listener<RESPONSE> delegate, Context context, GrpcRequest request) {
super(delegate);
Expand All @@ -113,7 +120,10 @@ public void onMessage(RESPONSE message) {
Span span = Span.fromContext(context);
Attributes attributes =
Attributes.of(
GrpcHelper.MESSAGE_TYPE, "SENT", GrpcHelper.MESSAGE_ID, messageId.incrementAndGet());
GrpcHelper.MESSAGE_TYPE,
"SENT",
GrpcHelper.MESSAGE_ID,
MESSAGE_ID_UPDATER.incrementAndGet(this));
span.addEvent("message", attributes);
try (Scope ignored = context.makeCurrent()) {
delegate().onMessage(message);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,14 @@
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;

final class TracingServerInterceptor implements ServerInterceptor {

@SuppressWarnings("rawtypes")
private static final AtomicLongFieldUpdater<TracingServerCallListener> MESSAGE_ID_UPDATER =
AtomicLongFieldUpdater.newUpdater(TracingServerCallListener.class, "messageId");

private final Instrumenter<GrpcRequest, Status> instrumenter;
private final boolean captureExperimentalSpanAttributes;

Expand Down Expand Up @@ -89,7 +93,9 @@ final class TracingServerCallListener<REQUEST>
private final Context context;
private final GrpcRequest request;

private final AtomicLong messageId = new AtomicLong();
// Used by MESSAGE_ID_UPDATER
@SuppressWarnings("UnusedVariable")
volatile long messageId;

TracingServerCallListener(Listener<REQUEST> delegate, Context context, GrpcRequest request) {
super(delegate);
Expand All @@ -105,7 +111,7 @@ public void onMessage(REQUEST message) {
GrpcHelper.MESSAGE_TYPE,
"RECEIVED",
GrpcHelper.MESSAGE_ID,
messageId.incrementAndGet());
MESSAGE_ID_UPDATER.incrementAndGet(this));
Span.fromContext(context).addEvent("message", attributes);
delegate().onMessage(message);
}
Expand Down

0 comments on commit f755b3f

Please sign in to comment.