Skip to content

Commit

Permalink
PR Feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
ndimiduk committed Jan 24, 2022
1 parent 41092f6 commit 2e7bc1b
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 72 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,7 @@
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;
import org.apache.hadoop.hbase.client.AsyncConnectionImpl;
import org.apache.hadoop.hbase.net.Address;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.RpcSystem;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.yetus.audience.InterfaceAudience;
Expand All @@ -47,27 +45,14 @@ public class IpcClientSpanBuilder implements Supplier<Span> {
private String name;
private final Map<AttributeKey<?>, Object> attributes = new HashMap<>();

public IpcClientSpanBuilder(
final Supplier<String> connectionStringSupplier,
final Supplier<User> userSupplier
) {
// TODO: this constructor is a hack used by AbstractRpcClient because it does not have access
// to the AsyncConnectionImpl within which it resides. Use this for now, until we come back
// and plumb through the instance.
ConnectionSpanBuilder.populateConnectionAttributes(attributes, connectionStringSupplier,
userSupplier);
}

@Override
public Span get() {
return build();
}

public IpcClientSpanBuilder setMethodDescriptor(final Descriptors.MethodDescriptor md) {
// it happens that `getFullName` returns a string in the $package.$service format required by
// the otel RPC specification. Use it for now; might have to parse the value in the future.
final String packageAndService = md.getService().getFullName();
final String method = md.getName();
final String packageAndService = getRpcPackageAndService(md);
final String method = getRpcName(md);
this.name = packageAndService + "/" + method;
populateMethodDescriptorAttributes(attributes, md);
return this;
Expand Down Expand Up @@ -99,12 +84,20 @@ static void populateMethodDescriptorAttributes(
final Map<AttributeKey<?>, Object> attributes,
final Descriptors.MethodDescriptor md
) {
// it happens that `getFullName` returns a string in the $package.$service format required by
// the otel RPC specification. Use it for now; might have to parse the value in the future.
final String packageAndService = md.getService().getFullName();
final String method = md.getName();
final String packageAndService = getRpcPackageAndService(md);
final String method = getRpcName(md);
attributes.put(RPC_SYSTEM, RpcSystem.HBASE_RPC.name());
attributes.put(RPC_SERVICE, packageAndService);
attributes.put(RPC_METHOD, method);
}

private static String getRpcPackageAndService(final Descriptors.MethodDescriptor md) {
// it happens that `getFullName` returns a string in the $package.$service format required by
// the otel RPC specification. Use it for now; might have to parse the value in the future.
return md.getService().getFullName();
}

private static String getRpcName(final Descriptors.MethodDescriptor md) {
return md.getName();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -393,18 +393,10 @@ private void onCallFinished(Call call, HBaseRpcController hrc, Address addr,
}
}

private User resolveUser() {
try {
return userProvider.getCurrent();
} catch (IOException e) {
return null;
}
}

private Call callMethod(final Descriptors.MethodDescriptor md, final HBaseRpcController hrc,
final Message param, Message returnType, final User ticket, final Address addr,
final RpcCallback<Message> callback) {
Span span = new IpcClientSpanBuilder(() -> null, this::resolveUser)
Span span = new IpcClientSpanBuilder()
.setMethodDescriptor(md)
.setRemoteAddress(addr)
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,12 +112,10 @@ public static Matcher<SpanData> hasTraceId(String traceId) {
}

public static Matcher<SpanData> hasTraceId(Matcher<String> matcher) {
return new TypeSafeMatcher<SpanData>() {
@Override protected boolean matchesSafely(SpanData item) {
return item.getTraceId() != null && matcher.matches(item.getTraceId());
}
@Override public void describeTo(Description description) {
description.appendText("SpanData with a traceId that ").appendDescriptionOf(matcher);
return new FeatureMatcher<SpanData, String>(
matcher, "SpanData with a traceId that ", "traceId") {
@Override protected String featureValueOf(SpanData item) {
return item.getTraceId();
}
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,16 @@
*/
package org.apache.hadoop.hbase.ipc;

import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.SERVICE;
import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.newBlockingStub;
import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.newStub;
import static org.apache.hadoop.hbase.client.trace.hamcrest.AttributesMatchers.containsEntry;
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasAttributes;
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasDuration;
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasKind;
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasName;
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasStatusWithCode;
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasTraceId;
import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.SERVICE;
import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.newBlockingStub;
import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.newStub;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.everyItem;
Expand All @@ -43,7 +43,6 @@
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.internal.verification.VerificationModeFactory.times;

import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.api.trace.StatusCode;
Expand All @@ -68,17 +67,14 @@
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.util.StringUtils;
import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors;
import org.hamcrest.Matcher;
import org.junit.Rule;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;

import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EchoRequestProto;
import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EchoResponseProto;
import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EmptyRequestProto;
Expand Down Expand Up @@ -465,49 +461,51 @@ private SpanData waitSpan(Matcher<SpanData> matcher) {
.orElseThrow(AssertionError::new);
}

private static String buildIpcSpanName(Descriptors.MethodDescriptor md) {
final String packageAndService = md.getService().getFullName();
final String method = md.getName();
return packageAndService + "/" + method;
private static String buildIpcSpanName(final String packageAndService, final String methodName) {
return packageAndService + "/" + methodName;
}

private static Matcher<SpanData> buildIpcClientSpanMatcher(Descriptors.MethodDescriptor md) {
private static Matcher<SpanData> buildIpcClientSpanMatcher(
final String packageAndService,
final String methodName
) {
return allOf(
hasName(buildIpcSpanName(md)),
hasName(buildIpcSpanName(packageAndService, methodName)),
hasKind(SpanKind.CLIENT)
);
}

private static Matcher<SpanData> buildIpcServerSpanMatcher(Descriptors.MethodDescriptor md) {
private static Matcher<SpanData> buildIpcServerSpanMatcher(
final String packageAndService,
final String methodName
) {
return allOf(
hasName(buildIpcSpanName(md)),
hasName(buildIpcSpanName(packageAndService, methodName)),
hasKind(SpanKind.SERVER)
);
}

private static Matcher<SpanData> buildIpcClientSpanAttributesMatcher(
Descriptors.MethodDescriptor md,
InetSocketAddress isa
final String packageAndService,
final String methodName,
final InetSocketAddress isa
) {
final String packageAndService = md.getService().getFullName();
final String method = md.getName();
return hasAttributes(allOf(
containsEntry("rpc.system", "HBASE_RPC"),
containsEntry("rpc.service", packageAndService),
containsEntry("rpc.method", method),
containsEntry("rpc.method", methodName),
containsEntry("net.peer.name", isa.getHostName()),
containsEntry(AttributeKey.longKey("net.peer.port"), (long) isa.getPort())));
}

private static Matcher<SpanData> buildIpcServerSpanAttributesMatcher(
Descriptors.MethodDescriptor md
final String packageAndService,
final String methodName
) {
final String packageAndService = md.getService().getFullName();
final String method = md.getName();
return hasAttributes(allOf(
containsEntry("rpc.system", "HBASE_RPC"),
containsEntry("rpc.service", packageAndService),
containsEntry("rpc.method", method)));
containsEntry("rpc.method", methodName)));
}

private void assertRemoteSpan() {
Expand All @@ -528,12 +526,14 @@ public void testTracingSuccessIpc() throws IOException, ServiceException {
stub.pause(null, PauseRequestProto.newBuilder().setMs(100).build());
// use the ISA from the running server so that we can get the port selected.
final InetSocketAddress isa = rpcServer.getListenerAddress();
final Descriptors.MethodDescriptor pauseMd =
SERVICE.getDescriptorForType().findMethodByName("pause");
final SpanData pauseClientSpan = waitSpan(buildIpcClientSpanMatcher(pauseMd));
assertThat(pauseClientSpan, buildIpcClientSpanAttributesMatcher(pauseMd, isa));
final SpanData pauseServerSpan = waitSpan(buildIpcServerSpanMatcher(pauseMd));
assertThat(pauseServerSpan, buildIpcServerSpanAttributesMatcher(pauseMd));
final SpanData pauseClientSpan = waitSpan(buildIpcClientSpanMatcher(
"hbase.test.pb.TestProtobufRpcProto", "pause"));
assertThat(pauseClientSpan, buildIpcClientSpanAttributesMatcher(
"hbase.test.pb.TestProtobufRpcProto", "pause", isa));
final SpanData pauseServerSpan = waitSpan(buildIpcServerSpanMatcher(
"hbase.test.pb.TestProtobufRpcProto", "pause"));
assertThat(pauseServerSpan, buildIpcServerSpanAttributesMatcher(
"hbase.test.pb.TestProtobufRpcProto", "pause"));
assertRemoteSpan();
assertFalse("no spans provided", traceRule.getSpans().isEmpty());
assertThat(traceRule.getSpans(), everyItem(allOf(
Expand All @@ -556,12 +556,14 @@ public void testTracingErrorIpc() throws IOException {
assertThrows(ServiceException.class,
() -> stub.error(null, EmptyRequestProto.getDefaultInstance()));
final InetSocketAddress isa = rpcServer.getListenerAddress();
final Descriptors.MethodDescriptor errorMd =
SERVICE.getDescriptorForType().findMethodByName("error");
final SpanData errorClientSpan = waitSpan(buildIpcClientSpanMatcher(errorMd));
assertThat(errorClientSpan, buildIpcClientSpanAttributesMatcher(errorMd, isa));
final SpanData errorServerSpan = waitSpan(buildIpcServerSpanMatcher(errorMd));
assertThat(errorServerSpan, buildIpcServerSpanAttributesMatcher(errorMd));
final SpanData errorClientSpan = waitSpan(buildIpcClientSpanMatcher(
"hbase.test.pb.TestProtobufRpcProto", "error"));
assertThat(errorClientSpan, buildIpcClientSpanAttributesMatcher(
"hbase.test.pb.TestProtobufRpcProto", "error", isa));
final SpanData errorServerSpan = waitSpan(buildIpcServerSpanMatcher(
"hbase.test.pb.TestProtobufRpcProto", "error"));
assertThat(errorServerSpan, buildIpcServerSpanAttributesMatcher(
"hbase.test.pb.TestProtobufRpcProto", "error"));
assertRemoteSpan();
assertFalse("no spans provided", traceRule.getSpans().isEmpty());
assertThat(traceRule.getSpans(), everyItem(allOf(
Expand Down

0 comments on commit 2e7bc1b

Please sign in to comment.