Skip to content

Commit

Permalink
HBASE-26132 Backport HBASE-25535 "Set span kind to CLIENT in Abstract…
Browse files Browse the repository at this point in the history
…RpcClient" to branch-2 (#3607)

9/17 commits of HBASE-22120, original commit bb8c496

Co-authored-by: Duo Zhang <zhangduo@apache.org>

Signed-off-by: Duo Zhang <zhangduo@apache.org>
  • Loading branch information
taklwu authored Aug 20, 2021
1 parent 6a0f159 commit 9724df0
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -395,7 +395,7 @@ private void onCallFinished(Call call, HBaseRpcController hrc, Address addr,
private Call callMethod(final Descriptors.MethodDescriptor md, final HBaseRpcController hrc,
final Message param, Message returnType, final User ticket, final Address addr,
final RpcCallback<Message> callback) {
Span span = TraceUtil.createSpan("RpcClient.callMethod")
Span span = TraceUtil.createClientSpan("RpcClient.callMethod")
.setAttribute(TraceUtil.RPC_SERVICE_KEY, md.getService().getName())
.setAttribute(TraceUtil.RPC_METHOD_KEY, md.getName())
.setAttribute(TraceUtil.REMOTE_HOST_KEY, addr.getHostName())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,13 @@ public static Span createRemoteSpan(String name, Context ctx) {
return getGlobalTracer().spanBuilder(name).setParent(ctx).setSpanKind(Kind.SERVER).startSpan();
}

/**
* Create a span with {@link Kind#CLIENT}.
*/
public static Span createClientSpan(String name) {
return createSpan(name, Kind.CLIENT);
}

/**
* Trace an asynchronous operation for a table.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import static org.mockito.Mockito.verify;
import static org.mockito.internal.verification.VerificationModeFactory.times;

import io.opentelemetry.api.trace.Span.Kind;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.sdk.testing.junit4.OpenTelemetryRule;
import io.opentelemetry.sdk.trace.data.SpanData;
Expand Down Expand Up @@ -455,14 +456,22 @@ private SpanData waitSpan(String name) {
return traceRule.getSpans().stream().filter(s -> s.getName().equals(name)).findFirst().get();
}

private void assertRpcAttribute(SpanData data, String methodName, InetSocketAddress addr) {
private void assertRpcAttribute(SpanData data, String methodName, InetSocketAddress addr,
Kind kind) {
assertEquals(SERVICE.getDescriptorForType().getName(),
data.getAttributes().get(TraceUtil.RPC_SERVICE_KEY));
assertEquals(methodName, data.getAttributes().get(TraceUtil.RPC_METHOD_KEY));
if (addr != null) {
assertEquals(addr.getHostName(), data.getAttributes().get(TraceUtil.REMOTE_HOST_KEY));
assertEquals(addr.getPort(), data.getAttributes().get(TraceUtil.REMOTE_PORT_KEY).intValue());
}
assertEquals(kind, data.getKind());
}

private void assertRemoteSpan() {
SpanData data = waitSpan("RpcServer.process");
assertTrue(data.getParentSpanContext().isRemote());
assertEquals(Kind.SERVER, data.getKind());
}

@Test
Expand All @@ -474,8 +483,10 @@ public void testTracing() throws IOException, ServiceException {
rpcServer.start();
BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());
stub.pause(null, PauseRequestProto.newBuilder().setMs(100).build());
assertRpcAttribute(waitSpan("RpcClient.callMethod"), "pause", rpcServer.getListenerAddress());
assertRpcAttribute(waitSpan("RpcServer.callMethod"), "pause", null);
assertRpcAttribute(waitSpan("RpcClient.callMethod"), "pause", rpcServer.getListenerAddress(),
Kind.CLIENT);
assertRpcAttribute(waitSpan("RpcServer.callMethod"), "pause", null, Kind.INTERNAL);
assertRemoteSpan();
assertSameTraceId();
for (SpanData data : traceRule.getSpans()) {
assertThat(
Expand All @@ -487,8 +498,10 @@ public void testTracing() throws IOException, ServiceException {
traceRule.clearSpans();
assertThrows(ServiceException.class,
() -> stub.error(null, EmptyRequestProto.getDefaultInstance()));
assertRpcAttribute(waitSpan("RpcClient.callMethod"), "error", rpcServer.getListenerAddress());
assertRpcAttribute(waitSpan("RpcServer.callMethod"), "error", null);
assertRpcAttribute(waitSpan("RpcClient.callMethod"), "error", rpcServer.getListenerAddress(),
Kind.CLIENT);
assertRpcAttribute(waitSpan("RpcServer.callMethod"), "error", null, Kind.INTERNAL);
assertRemoteSpan();
assertSameTraceId();
for (SpanData data : traceRule.getSpans()) {
assertEquals(StatusCode.ERROR, data.getStatus().getStatusCode());
Expand Down

0 comments on commit 9724df0

Please sign in to comment.