diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/trace/ConnectionSpanBuilder.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/trace/ConnectionSpanBuilder.java
index 93a9d8fd983f..27081a750fb0 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/trace/ConnectionSpanBuilder.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/trace/ConnectionSpanBuilder.java
@@ -32,6 +32,7 @@
 import java.util.function.Supplier;
 import org.apache.hadoop.hbase.client.AsyncConnectionImpl;
 import org.apache.hadoop.hbase.client.ClusterConnection;
+import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.trace.TraceUtil;
 import org.apache.yetus.audience.InterfaceAudience;
@@ -91,15 +92,32 @@ static void populateConnectionAttributes(
    * Static utility method that performs the primary logic of this builder. It is visible to other
    * classes in this package so that other builders can use this functionality as a mix-in.
    * @param attributes the attributes map to be populated.
-   * @param conn the source of attribute values.
+   * @param conn the source of connection attribute values.
    */
   static void populateConnectionAttributes(
     final Map<AttributeKey<?>, Object> attributes,
     final AsyncConnectionImpl conn
+  ) {
+    final Supplier<String> connStringSupplier = () -> conn.getConnectionRegistry()
+      .getConnectionString();
+    populateConnectionAttributes(attributes, connStringSupplier, conn::getUser);
+  }
+
+  /**
+   * Static utility method that performs the primary logic of this builder. It is visible to other
+   * classes in this package so that other builders can use this functionality as a mix-in.
+   * @param attributes the attributes map to be populated.
+   * @param connectionStringSupplier the source of the {@code db.connection_string} attribute value.
+   * @param userSupplier the source of the {@code db.user} attribute value.
+   */
+  static void populateConnectionAttributes(
+    final Map<AttributeKey<?>, Object> attributes,
+    final Supplier<String> connectionStringSupplier,
+    final Supplier<User> userSupplier
   ) {
     attributes.put(DB_SYSTEM, DB_SYSTEM_VALUE);
-    attributes.put(DB_CONNECTION_STRING, conn.getConnectionRegistry().getConnectionString());
-    attributes.put(DB_USER, Optional.ofNullable(conn.getUser())
+    attributes.put(DB_CONNECTION_STRING, connectionStringSupplier.get());
+    attributes.put(DB_USER, Optional.ofNullable(userSupplier.get())
       .map(Object::toString)
       .orElse(null));
   }
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/trace/IpcClientSpanBuilder.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/trace/IpcClientSpanBuilder.java
new file mode 100644
index 000000000000..07edbcb2807d
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/trace/IpcClientSpanBuilder.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.client.trace;
+
+import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.NET_PEER_NAME;
+import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.NET_PEER_PORT;
+import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.RPC_METHOD;
+import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.RPC_SERVICE;
+import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.RPC_SYSTEM;
+import io.opentelemetry.api.common.AttributeKey;
+import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.api.trace.SpanBuilder;
+import io.opentelemetry.api.trace.SpanKind;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Supplier;
+import org.apache.hadoop.hbase.net.Address;
+import org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.RpcSystem;
+import org.apache.hadoop.hbase.trace.TraceUtil;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors;
+
+/**
+ * Construct {@link Span} instances originating from the client side of an IPC.
+ *
+ * @see Semantic conventions for RPC spans
+ */
+@InterfaceAudience.Private
+public class IpcClientSpanBuilder implements Supplier<Span> {
+
+  private String name;
+  private final Map<AttributeKey<?>, Object> attributes = new HashMap<>();
+
+  @Override
+  public Span get() {
+    return build();
+  }
+
+  public IpcClientSpanBuilder setMethodDescriptor(final Descriptors.MethodDescriptor md) {
+    final String packageAndService = getRpcPackageAndService(md.getService());
+    final String method = getRpcName(md);
+    this.name = buildSpanName(packageAndService, method);
+    populateMethodDescriptorAttributes(attributes, md);
+    return this;
+  }
+
+  public IpcClientSpanBuilder setRemoteAddress(final Address remoteAddress) {
+    attributes.put(NET_PEER_NAME, remoteAddress.getHostName());
+    attributes.put(NET_PEER_PORT, (long) remoteAddress.getPort());
+    return this;
+  }
+
+  @SuppressWarnings("unchecked")
+  public Span build() {
+    final SpanBuilder builder = TraceUtil.getGlobalTracer()
+      .spanBuilder(name)
+      // TODO: what about clients embedded in Master/RegionServer/Gateways/&c?
+      .setSpanKind(SpanKind.CLIENT);
+    attributes.forEach((k, v) -> builder.setAttribute((AttributeKey<? super Object>) k, v));
+    return builder.startSpan();
+  }
+
+  /**
+   * Static utility method that performs the primary logic of this builder. It is visible to other
+   * classes in this package so that other builders can use this functionality as a mix-in.
+   * @param attributes the attributes map to be populated.
+   * @param md the source of the RPC attribute values.
+   */
+  static void populateMethodDescriptorAttributes(
+    final Map<AttributeKey<?>, Object> attributes,
+    final Descriptors.MethodDescriptor md
+  ) {
+    final String packageAndService = getRpcPackageAndService(md.getService());
+    final String method = getRpcName(md);
+    attributes.put(RPC_SYSTEM, RpcSystem.HBASE_RPC.name());
+    attributes.put(RPC_SERVICE, packageAndService);
+    attributes.put(RPC_METHOD, method);
+  }
+
+  /**
+   * Retrieve the combined {@code $package.$service} value from {@code sd}.
+   */
+  public static String getRpcPackageAndService(final Descriptors.ServiceDescriptor sd) {
+    // it happens that `getFullName` returns a string in the $package.$service format required by
+    // the otel RPC specification. Use it for now; might have to parse the value in the future.
+    return sd.getFullName();
+  }
+
+  /**
+   * Retrieve the {@code $method} value from {@code md}.
+   */
+  public static String getRpcName(final Descriptors.MethodDescriptor md) {
+    return md.getName();
+  }
+
+  /**
+   * Construct an RPC span name.
+   */
+  public static String buildSpanName(final String packageAndService, final String method) {
+    return packageAndService + "/" + method;
+  }
+}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java
index b502142480a1..accb83291812 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java
@@ -20,10 +20,6 @@
 import static org.apache.hadoop.hbase.ipc.IPCUtil.toIOE;
 import static org.apache.hadoop.hbase.ipc.IPCUtil.wrapException;
-import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.REMOTE_HOST_KEY;
-import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.REMOTE_PORT_KEY;
-import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.RPC_METHOD_KEY;
-import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.RPC_SERVICE_KEY;
 
 import io.opentelemetry.api.trace.Span;
 import io.opentelemetry.api.trace.StatusCode;
@@ -40,6 +36,7 @@
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.client.MetricsConnection;
+import org.apache.hadoop.hbase.client.trace.IpcClientSpanBuilder;
 import org.apache.hadoop.hbase.codec.Codec;
 import org.apache.hadoop.hbase.codec.KeyValueCodec;
 import org.apache.hadoop.hbase.net.Address;
@@ -399,11 +396,10 @@ private void onCallFinished(Call call, HBaseRpcController hrc, Address addr,
   private Call callMethod(final Descriptors.MethodDescriptor md, final HBaseRpcController hrc,
     final Message param, Message returnType, final User ticket, final Address addr,
     final RpcCallback<Message> callback) {
-    Span span = TraceUtil.createClientSpan("RpcClient.callMethod")
-      .setAttribute(RPC_SERVICE_KEY, md.getService().getName())
-      .setAttribute(RPC_METHOD_KEY, md.getName())
-      .setAttribute(REMOTE_HOST_KEY, addr.getHostName())
-      .setAttribute(REMOTE_PORT_KEY, addr.getPort());
+    Span span = new IpcClientSpanBuilder()
+      .setMethodDescriptor(md)
+      .setRemoteAddress(addr)
+      .build();
     try (Scope scope = span.makeCurrent()) {
       final MetricsConnection.CallStats cs = MetricsConnection.newCallStats();
       cs.setStartTime(EnvironmentEdgeManager.currentTime());
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/trace/hamcrest/SpanDataMatchers.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/trace/hamcrest/SpanDataMatchers.java
index 2839e7c597c7..9697d69f18ba 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/trace/hamcrest/SpanDataMatchers.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/trace/hamcrest/SpanDataMatchers.java
@@ -24,6 +24,7 @@
 import io.opentelemetry.api.trace.StatusCode;
 import io.opentelemetry.sdk.trace.data.SpanData;
 import io.opentelemetry.sdk.trace.data.StatusData;
+import java.time.Duration;
 import org.hamcrest.Description;
 import org.hamcrest.FeatureMatcher;
 import org.hamcrest.Matcher;
@@ -46,6 +47,16 @@ public static Matcher<SpanData> hasAttributes(Matcher<Attributes> matcher) {
     };
   }
 
+  public static Matcher<SpanData> hasDuration(Matcher<Duration> matcher) {
+    return new FeatureMatcher<SpanData, Duration>(
+      matcher, "SpanData having duration that ", "duration") {
+      @Override
+      protected Duration featureValueOf(SpanData item) {
+        return Duration.ofNanos(item.getEndEpochNanos() - item.getStartEpochNanos());
+      }
+    };
+  }
+
   public static Matcher<SpanData> hasEnded() {
     return new TypeSafeMatcher<SpanData>() {
       @Override protected boolean matchesSafely(SpanData item) {
@@ -92,4 +103,17 @@ public static Matcher<SpanData> hasStatusWithCode(StatusCode statusCode) {
       }
     };
   }
+
+  public static Matcher<SpanData> hasTraceId(String traceId) {
+    return hasTraceId(is(equalTo(traceId)));
+  }
+
+  public static Matcher<SpanData> hasTraceId(Matcher<String> matcher) {
+    return new FeatureMatcher<SpanData, String>(
+      matcher, "SpanData with a traceId that ", "traceId") {
+      @Override protected String featureValueOf(SpanData item) {
+        return item.getTraceId();
+      }
+    };
+  }
 }
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/trace/HBaseSemanticAttributes.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/trace/HBaseSemanticAttributes.java
index 1a74fdcd65a2..1689a44bee9d 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/trace/HBaseSemanticAttributes.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/trace/HBaseSemanticAttributes.java
@@ -44,14 +44,13 @@ public final class HBaseSemanticAttributes {
     AttributeKey.stringArrayKey("db.hbase.container_operations");
   public static final AttributeKey<List<String>> REGION_NAMES_KEY =
     AttributeKey.stringArrayKey("db.hbase.regions");
-  public static final AttributeKey<String> RPC_SERVICE_KEY =
-    AttributeKey.stringKey("db.hbase.rpc.service");
-  public static final AttributeKey<String> RPC_METHOD_KEY =
-    AttributeKey.stringKey("db.hbase.rpc.method");
+  public static final AttributeKey<String> RPC_SYSTEM = SemanticAttributes.RPC_SYSTEM;
+  public static final AttributeKey<String> RPC_SERVICE = SemanticAttributes.RPC_SERVICE;
+  public static final AttributeKey<String> RPC_METHOD = SemanticAttributes.RPC_METHOD;
   public static final AttributeKey<String> SERVER_NAME_KEY =
     AttributeKey.stringKey("db.hbase.server.name");
-  public static final AttributeKey<String> REMOTE_HOST_KEY = SemanticAttributes.NET_PEER_NAME;
-  public static final AttributeKey<Long> REMOTE_PORT_KEY = SemanticAttributes.NET_PEER_PORT;
+  public static final AttributeKey<String> NET_PEER_NAME = SemanticAttributes.NET_PEER_NAME;
+  public static final AttributeKey<Long> NET_PEER_PORT = SemanticAttributes.NET_PEER_PORT;
   public static final AttributeKey<Boolean> ROW_LOCK_READ_LOCK_KEY =
     AttributeKey.booleanKey("db.hbase.rowlock.readlock");
   public static final AttributeKey<String> WAL_IMPL = AttributeKey.stringKey("db.hbase.wal.impl");
@@ -74,5 +73,13 @@ public enum Operation {
     SCAN,
   }
 
+  /**
+   * These are values used with {@link #RPC_SYSTEM}. Only a single value for now; more to come as
+   * we add tracing over our gateway components.
+   */
+  public enum RpcSystem {
+    HBASE_RPC,
+  }
+
   private HBaseSemanticAttributes() { }
 }
diff --git a/hbase-server/pom.xml b/hbase-server/pom.xml
index 50ca081ff920..48350c03ab0b 100644
--- a/hbase-server/pom.xml
+++ b/hbase-server/pom.xml
@@ -255,6 +255,12 @@
       <groupId>org.apache.hbase</groupId>
       <artifactId>hbase-common</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-client</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
     <dependency>
       <groupId>org.apache.hbase</groupId>
      <artifactId>hbase-http</artifactId>
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java
index ef37247f412f..9607a71f6bed 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java
@@ -17,11 +17,8 @@
  */
 package org.apache.hadoop.hbase.ipc;
 
-import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.RPC_METHOD_KEY;
-import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.RPC_SERVICE_KEY;
 import io.opentelemetry.api.trace.Span;
 import io.opentelemetry.api.trace.StatusCode;
-import io.opentelemetry.context.Context;
 import io.opentelemetry.context.Scope;
 import java.net.InetSocketAddress;
 import java.nio.channels.ClosedChannelException;
@@ -32,6 +29,7 @@
 import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
 import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
 import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.server.trace.IpcServerSpanBuilder;
 import org.apache.hadoop.hbase.trace.TraceUtil;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.Pair;
@@ -90,14 +88,6 @@ private void cleanup() {
     this.rpcServer = null;
   }
 
-  private String getServiceName() {
-    return call.getService() != null ? call.getService().getDescriptorForType().getName() : "";
-  }
-
-  private String getMethodName() {
-    return call.getMethod() != null ? call.getMethod().getName() : "";
-  }
-
   public void run() {
     try {
       if (call.disconnectSince() >= 0) {
@@ -122,12 +112,7 @@ public void run() {
       String error = null;
       Pair<Message, CellScanner> resultPair = null;
       RpcServer.CurCall.set(call);
-      String serviceName = getServiceName();
-      String methodName = getMethodName();
-      Span span = TraceUtil.getGlobalTracer().spanBuilder("RpcServer.callMethod")
-        .setParent(Context.current().with(((ServerCall<?>) call).getSpan())).startSpan()
-        .setAttribute(RPC_SERVICE_KEY, serviceName)
-        .setAttribute(RPC_METHOD_KEY, methodName);
+      Span span = new IpcServerSpanBuilder(call).build();
       try (Scope traceScope = span.makeCurrent()) {
         if (!this.rpcServer.isStarted()) {
           InetSocketAddress address = rpcServer.getListenerAddress();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/server/trace/IpcServerSpanBuilder.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/server/trace/IpcServerSpanBuilder.java
new file mode 100644
index 000000000000..259268b19f62
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/server/trace/IpcServerSpanBuilder.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.server.trace;
+
+import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.RPC_METHOD;
+import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.RPC_SERVICE;
+import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.RPC_SYSTEM;
+import io.opentelemetry.api.common.AttributeKey;
+import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.api.trace.SpanBuilder;
+import io.opentelemetry.api.trace.SpanKind;
+import io.opentelemetry.context.Context;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Supplier;
+import org.apache.hadoop.hbase.client.trace.IpcClientSpanBuilder;
+import org.apache.hadoop.hbase.ipc.RpcCall;
+import org.apache.hadoop.hbase.ipc.ServerCall;
+import org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.RpcSystem;
+import org.apache.hadoop.hbase.trace.TraceUtil;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService;
+
+/**
+ * Construct {@link Span} instances originating from the server side of an IPC.
+ *
+ * @see Semantic conventions for RPC spans
+ */
+@InterfaceAudience.Private
+public class IpcServerSpanBuilder implements Supplier<Span> {
+
+  private final RpcCall rpcCall;
+  private String name;
+  private final Map<AttributeKey<?>, Object> attributes = new HashMap<>();
+
+  public IpcServerSpanBuilder(final RpcCall rpcCall) {
+    this.rpcCall = rpcCall;
+    final String packageAndService = Optional.ofNullable(rpcCall.getService())
+      .map(BlockingService::getDescriptorForType)
+      .map(IpcClientSpanBuilder::getRpcPackageAndService)
+      .orElse("");
+    final String method = Optional.ofNullable(rpcCall.getMethod())
+      .map(IpcClientSpanBuilder::getRpcName)
+      .orElse("");
+    setName(IpcClientSpanBuilder.buildSpanName(packageAndService, method));
+    addAttribute(RPC_SYSTEM, RpcSystem.HBASE_RPC.name());
+    addAttribute(RPC_SERVICE, packageAndService);
+    addAttribute(RPC_METHOD, method);
+  }
+
+  @Override
+  public Span get() {
+    return build();
+  }
+
+  public IpcServerSpanBuilder setName(final String name) {
+    this.name = name;
+    return this;
+  }
+
+  public <T> IpcServerSpanBuilder addAttribute(final AttributeKey<T> key, T value) {
+    attributes.put(key, value);
+    return this;
+  }
+
+  @SuppressWarnings("unchecked")
+  public Span build() {
+    final SpanBuilder builder = TraceUtil.getGlobalTracer()
+      .spanBuilder(name)
+      .setSpanKind(SpanKind.SERVER);
+    attributes.forEach((k, v) -> builder.setAttribute((AttributeKey<? super Object>) k, v));
+    return builder.setParent(Context.current().with(((ServerCall<?>) rpcCall).getSpan()))
+      .startSpan();
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java
index 030b052cb1e4..1309ef7a77d3 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java
@@ -17,11 +17,21 @@
  */
 package org.apache.hadoop.hbase.ipc;
 
+import static org.apache.hadoop.hbase.client.trace.hamcrest.AttributesMatchers.containsEntry;
+import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasAttributes;
+import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasDuration;
+import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasKind;
+import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasName;
+import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasStatusWithCode;
+import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasTraceId;
 import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.SERVICE;
 import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.newBlockingStub;
 import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.newStub;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.allOf;
+import static org.hamcrest.Matchers.everyItem;
 import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.hasItem;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
@@ -33,16 +43,16 @@
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.verify;
 import static org.mockito.internal.verification.VerificationModeFactory.times;
-
+import io.opentelemetry.api.common.AttributeKey;
 import io.opentelemetry.api.trace.SpanKind;
 import io.opentelemetry.api.trace.StatusCode;
 import io.opentelemetry.sdk.testing.junit4.OpenTelemetryRule;
 import io.opentelemetry.sdk.trace.data.SpanData;
 import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.concurrent.TimeUnit;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellScanner;
@@ -50,22 +60,21 @@
 import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.MatcherPredicate;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.Waiter;
 import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface;
-import org.apache.hadoop.hbase.trace.HBaseSemanticAttributes;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.compress.GzipCodec;
 import org.apache.hadoop.util.StringUtils;
+import org.hamcrest.Matcher;
 import org.junit.Rule;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
 import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
 import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
-
 import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EchoRequestProto;
 import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EchoResponseProto;
 import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EmptyRequestProto;
@@ -442,74 +451,124 @@ public void testAsyncTimeout() throws IOException {
     }
   }
 
-  private void assertSameTraceId() {
-    String traceId = traceRule.getSpans().get(0).getTraceId();
-    for (SpanData data : traceRule.getSpans()) {
-      // assert we are the same trace
-      assertEquals(traceId, data.getTraceId());
-    }
+  private SpanData waitSpan(Matcher<SpanData> matcher) {
+    Waiter.waitFor(CONF, 1000, new MatcherPredicate<>(
+      () -> traceRule.getSpans(), hasItem(matcher)));
+    return traceRule.getSpans()
+      .stream()
+      .filter(matcher::matches)
+      .findFirst()
+      .orElseThrow(AssertionError::new);
   }
 
-  private SpanData waitSpan(String name) {
-    Waiter.waitFor(CONF, 1000,
-      () -> traceRule.getSpans().stream().map(SpanData::getName).anyMatch(s -> s.equals(name)));
-    return traceRule.getSpans().stream().filter(s -> s.getName().equals(name)).findFirst().get();
+  private static String buildIpcSpanName(final String packageAndService, final String methodName) {
+    return packageAndService + "/" + methodName;
   }
 
-  private void assertRpcAttribute(SpanData data, String methodName, InetSocketAddress addr,
-    SpanKind kind) {
-    assertEquals(SERVICE.getDescriptorForType().getName(),
-      data.getAttributes().get(HBaseSemanticAttributes.RPC_SERVICE_KEY));
-    assertEquals(methodName, data.getAttributes().get(HBaseSemanticAttributes.RPC_METHOD_KEY));
-    if (addr != null) {
-      assertEquals(
-        addr.getHostName(),
-        data.getAttributes().get(HBaseSemanticAttributes.REMOTE_HOST_KEY));
-      assertEquals(
-        addr.getPort(),
-        data.getAttributes().get(HBaseSemanticAttributes.REMOTE_PORT_KEY).intValue());
-    }
-    assertEquals(kind, data.getKind());
+  private static Matcher<SpanData> buildIpcClientSpanMatcher(
+    final String packageAndService,
+    final String methodName
+  ) {
+    return allOf(
+      hasName(buildIpcSpanName(packageAndService, methodName)),
+      hasKind(SpanKind.CLIENT)
+    );
+  }
+
+  private static Matcher<SpanData> buildIpcServerSpanMatcher(
+    final String packageAndService,
+    final String methodName
+  ) {
+    return allOf(
+      hasName(buildIpcSpanName(packageAndService, methodName)),
+      hasKind(SpanKind.SERVER)
+    );
+  }
+
+  private static Matcher<SpanData> buildIpcClientSpanAttributesMatcher(
+    final String packageAndService,
+    final String methodName,
+    final InetSocketAddress isa
+  ) {
+    return hasAttributes(allOf(
+      containsEntry("rpc.system", "HBASE_RPC"),
+      containsEntry("rpc.service", packageAndService),
+      containsEntry("rpc.method", methodName),
+      containsEntry("net.peer.name", isa.getHostName()),
+      containsEntry(AttributeKey.longKey("net.peer.port"), (long) isa.getPort())));
+  }
+
+  private static Matcher<SpanData> buildIpcServerSpanAttributesMatcher(
+    final String packageAndService,
+    final String methodName
+  ) {
+    return hasAttributes(allOf(
+      containsEntry("rpc.system", "HBASE_RPC"),
+      containsEntry("rpc.service", packageAndService),
+      containsEntry("rpc.method", methodName)));
   }
 
   private void assertRemoteSpan() {
-    SpanData data = waitSpan("RpcServer.process");
+    SpanData data = waitSpan(hasName("RpcServer.process"));
     assertTrue(data.getParentSpanContext().isRemote());
     assertEquals(SpanKind.SERVER, data.getKind());
   }
 
   @Test
-  public void testTracing() throws IOException, ServiceException {
+  public void testTracingSuccessIpc() throws IOException, ServiceException {
     RpcServer rpcServer = createRpcServer(null, "testRpcServer",
       Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)),
-      new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1));
+      new InetSocketAddress("localhost", 0), CONF,
+      new FifoRpcScheduler(CONF, 1));
     try (AbstractRpcClient<?> client = createRpcClient(CONF)) {
       rpcServer.start();
       BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());
       stub.pause(null, PauseRequestProto.newBuilder().setMs(100).build());
-      assertRpcAttribute(waitSpan("RpcClient.callMethod"), "pause", rpcServer.getListenerAddress(),
-        SpanKind.CLIENT);
-      assertRpcAttribute(waitSpan("RpcServer.callMethod"), "pause", null, SpanKind.INTERNAL);
+      // use the ISA from the running server so that we can get the port selected.
+      final InetSocketAddress isa = rpcServer.getListenerAddress();
+      final SpanData pauseClientSpan = waitSpan(buildIpcClientSpanMatcher(
+        "hbase.test.pb.TestProtobufRpcProto", "pause"));
+      assertThat(pauseClientSpan, buildIpcClientSpanAttributesMatcher(
+        "hbase.test.pb.TestProtobufRpcProto", "pause", isa));
+      final SpanData pauseServerSpan = waitSpan(buildIpcServerSpanMatcher(
+        "hbase.test.pb.TestProtobufRpcProto", "pause"));
+      assertThat(pauseServerSpan, buildIpcServerSpanAttributesMatcher(
+        "hbase.test.pb.TestProtobufRpcProto", "pause"));
       assertRemoteSpan();
-      assertSameTraceId();
-      for (SpanData data : traceRule.getSpans()) {
-        assertThat(
-          TimeUnit.NANOSECONDS.toMillis(data.getEndEpochNanos() - data.getStartEpochNanos()),
-          greaterThanOrEqualTo(100L));
-        assertEquals(StatusCode.OK, data.getStatus().getStatusCode());
-      }
+      assertFalse("no spans provided", traceRule.getSpans().isEmpty());
+      assertThat(traceRule.getSpans(), everyItem(allOf(
+        hasStatusWithCode(StatusCode.OK),
+        hasTraceId(traceRule.getSpans().iterator().next().getTraceId()),
+        hasDuration(greaterThanOrEqualTo(Duration.ofMillis(100L))))));
+    }
+  }
 
-      traceRule.clearSpans();
+  @Test
+  public void testTracingErrorIpc() throws IOException {
+    RpcServer rpcServer = createRpcServer(null, "testRpcServer",
+      Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)),
+      new InetSocketAddress("localhost", 0), CONF,
+      new FifoRpcScheduler(CONF, 1));
+    try (AbstractRpcClient<?> client = createRpcClient(CONF)) {
+      rpcServer.start();
+      BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());
+      // use the ISA from the running server so that we can get the port selected.
       assertThrows(ServiceException.class,
         () -> stub.error(null, EmptyRequestProto.getDefaultInstance()));
-      assertRpcAttribute(waitSpan("RpcClient.callMethod"), "error", rpcServer.getListenerAddress(),
-        SpanKind.CLIENT);
-      assertRpcAttribute(waitSpan("RpcServer.callMethod"), "error", null, SpanKind.INTERNAL);
+      final InetSocketAddress isa = rpcServer.getListenerAddress();
+      final SpanData errorClientSpan = waitSpan(buildIpcClientSpanMatcher(
+        "hbase.test.pb.TestProtobufRpcProto", "error"));
+      assertThat(errorClientSpan, buildIpcClientSpanAttributesMatcher(
+        "hbase.test.pb.TestProtobufRpcProto", "error", isa));
+      final SpanData errorServerSpan = waitSpan(buildIpcServerSpanMatcher(
+        "hbase.test.pb.TestProtobufRpcProto", "error"));
+      assertThat(errorServerSpan, buildIpcServerSpanAttributesMatcher(
+        "hbase.test.pb.TestProtobufRpcProto", "error"));
       assertRemoteSpan();
-      assertSameTraceId();
-      for (SpanData data : traceRule.getSpans()) {
-        assertEquals(StatusCode.ERROR, data.getStatus().getStatusCode());
-      }
+      assertFalse("no spans provided", traceRule.getSpans().isEmpty());
+      assertThat(traceRule.getSpans(), everyItem(allOf(
+        hasStatusWithCode(StatusCode.ERROR),
+        hasTraceId(traceRule.getSpans().iterator().next().getTraceId()))));
     }
   }
 }
diff --git a/pom.xml b/pom.xml
index a09c39e3d69f..e1ab843dc2b9 100755
--- a/pom.xml
+++ b/pom.xml
@@ -1838,6 +1838,13 @@
         <groupId>org.apache.hbase</groupId>
         <version>${project.version}</version>
       </dependency>
+      <dependency>
+        <artifactId>hbase-client</artifactId>
+        <groupId>org.apache.hbase</groupId>
+        <version>${project.version}</version>
+        <type>test-jar</type>
+        <scope>test</scope>
+      </dependency>
       <dependency>
        <artifactId>hbase-metrics-api</artifactId>
        <groupId>org.apache.hbase</groupId>
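
Usage sketch (not part of the patch): a minimal illustration of how the two builders introduced above are intended to be driven, mirroring the patched call sites in AbstractRpcClient.callMethod (client side) and CallRunner.run (server side). The variables md and call, and the peer host/port, are hypothetical placeholders rather than values taken from this patch.

    // Client side: one CLIENT span per outbound call, named "$package.$service/$method",
    // carrying the rpc.system / rpc.service / rpc.method / net.peer.* attributes.
    Span clientSpan = new IpcClientSpanBuilder()
      .setMethodDescriptor(md)                                        // hypothetical MethodDescriptor
      .setRemoteAddress(Address.fromParts("region-server-1", 16020))  // hypothetical peer address
      .build();
    try (Scope ignored = clientSpan.makeCurrent()) {
      // issue the RPC while the span is current
    } finally {
      clientSpan.end();
    }

    // Server side: one SERVER span per handled call, named and attributed the same way and
    // parented on the span carried by the ServerCall (so 'call' must be a ServerCall-backed RpcCall).
    Span serverSpan = new IpcServerSpanBuilder(call).build();
    try (Scope ignored = serverSpan.makeCurrent()) {
      // dispatch to the service implementation while the span is current
    } finally {
      serverSpan.end();
    }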