From f755b3f52c5873da01a715bbc805da33e6d0d5d6 Mon Sep 17 00:00:00 2001 From: Anuraag Agrawal Date: Mon, 31 May 2021 11:31:17 +0900 Subject: [PATCH] Use AtomicLongUpdater for gRPC message ID (#3137) --- .../grpc/v1_6/TracingClientInterceptor.java | 16 +++++++++++++--- .../grpc/v1_6/TracingServerInterceptor.java | 12 +++++++++--- 2 files changed, 22 insertions(+), 6 deletions(-) diff --git a/instrumentation/grpc-1.6/library/src/main/java/io/opentelemetry/instrumentation/grpc/v1_6/TracingClientInterceptor.java b/instrumentation/grpc-1.6/library/src/main/java/io/opentelemetry/instrumentation/grpc/v1_6/TracingClientInterceptor.java index 8dba29f74c53..1edd68e152e0 100644 --- a/instrumentation/grpc-1.6/library/src/main/java/io/opentelemetry/instrumentation/grpc/v1_6/TracingClientInterceptor.java +++ b/instrumentation/grpc-1.6/library/src/main/java/io/opentelemetry/instrumentation/grpc/v1_6/TracingClientInterceptor.java @@ -25,10 +25,14 @@ import io.opentelemetry.context.propagation.ContextPropagators; import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; import java.net.SocketAddress; -import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicLongFieldUpdater; final class TracingClientInterceptor implements ClientInterceptor { + @SuppressWarnings("rawtypes") + private static final AtomicLongFieldUpdater MESSAGE_ID_UPDATER = + AtomicLongFieldUpdater.newUpdater(TracingClientCallListener.class, "messageId"); + private final Instrumenter instrumenter; private final ContextPropagators propagators; @@ -97,10 +101,13 @@ public void sendMessage(REQUEST message) { final class TracingClientCallListener extends ForwardingClientCallListener.SimpleForwardingClientCallListener { + private final Context context; private final GrpcRequest request; - private final AtomicLong messageId = new AtomicLong(); + // Used by MESSAGE_ID_UPDATER + @SuppressWarnings("UnusedVariable") + volatile long messageId; TracingClientCallListener(Listener delegate, Context context, GrpcRequest request) { super(delegate); @@ -113,7 +120,10 @@ public void onMessage(RESPONSE message) { Span span = Span.fromContext(context); Attributes attributes = Attributes.of( - GrpcHelper.MESSAGE_TYPE, "SENT", GrpcHelper.MESSAGE_ID, messageId.incrementAndGet()); + GrpcHelper.MESSAGE_TYPE, + "SENT", + GrpcHelper.MESSAGE_ID, + MESSAGE_ID_UPDATER.incrementAndGet(this)); span.addEvent("message", attributes); try (Scope ignored = context.makeCurrent()) { delegate().onMessage(message); diff --git a/instrumentation/grpc-1.6/library/src/main/java/io/opentelemetry/instrumentation/grpc/v1_6/TracingServerInterceptor.java b/instrumentation/grpc-1.6/library/src/main/java/io/opentelemetry/instrumentation/grpc/v1_6/TracingServerInterceptor.java index dff0e139bf06..ddfd6bb338bc 100644 --- a/instrumentation/grpc-1.6/library/src/main/java/io/opentelemetry/instrumentation/grpc/v1_6/TracingServerInterceptor.java +++ b/instrumentation/grpc-1.6/library/src/main/java/io/opentelemetry/instrumentation/grpc/v1_6/TracingServerInterceptor.java @@ -20,10 +20,14 @@ import io.opentelemetry.context.Context; import io.opentelemetry.context.Scope; import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; -import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicLongFieldUpdater; final class TracingServerInterceptor implements ServerInterceptor { + @SuppressWarnings("rawtypes") + private static final AtomicLongFieldUpdater MESSAGE_ID_UPDATER = + AtomicLongFieldUpdater.newUpdater(TracingServerCallListener.class, "messageId"); + private final Instrumenter instrumenter; private final boolean captureExperimentalSpanAttributes; @@ -89,7 +93,9 @@ final class TracingServerCallListener private final Context context; private final GrpcRequest request; - private final AtomicLong messageId = new AtomicLong(); + // Used by MESSAGE_ID_UPDATER + @SuppressWarnings("UnusedVariable") + volatile long messageId; TracingServerCallListener(Listener delegate, Context context, GrpcRequest request) { super(delegate); @@ -105,7 +111,7 @@ public void onMessage(REQUEST message) { GrpcHelper.MESSAGE_TYPE, "RECEIVED", GrpcHelper.MESSAGE_ID, - messageId.incrementAndGet()); + MESSAGE_ID_UPDATER.incrementAndGet(this)); Span.fromContext(context).addEvent("message", attributes); delegate().onMessage(message); }