From 706be8a1e70ddec198366c169d216ac7afa8f0f5 Mon Sep 17 00:00:00 2001 From: Nick Dimiduk Date: Wed, 5 Jan 2022 15:32:12 -0800 Subject: [PATCH 1/4] HBASE-26474 Implement connection-level attributes (#3952) Add support for `db.system`, `db.connection_string`, `db.user`. Signed-off-by: Duo Zhang --- .../AbstractRpcBasedConnectionRegistry.java | 5 + .../hbase/client/AsyncConnectionImpl.java | 10 +- .../hbase/client/AsyncRegionLocator.java | 69 +++++++---- .../hbase/client/ClusterConnection.java | 14 ++- .../client/ConnectionImplementation.java | 12 +- .../hbase/client/ConnectionRegistry.java | 9 +- .../hadoop/hbase/client/HRegionLocator.java | 100 +++++++++++----- .../apache/hadoop/hbase/client/HTable.java | 70 ++++++----- .../hadoop/hbase/client/MasterRegistry.java | 12 ++ .../hbase/client/RawAsyncTableImpl.java | 2 +- .../hbase/client/RpcConnectionRegistry.java | 22 +++- .../hbase/client/ZKConnectionRegistry.java | 7 ++ .../client/trace/ConnectionSpanBuilder.java | 107 +++++++++++++++++ .../trace/TableOperationSpanBuilder.java | 25 ++-- .../hbase/client/trace/TableSpanBuilder.java | 99 ++++++++++++++++ .../client/DoNothingConnectionRegistry.java | 5 + .../client/TestAsyncRegionLocatorTracing.java | 112 ++++++++++++------ .../hbase/client/TestAsyncTableTracing.java | 37 ++---- .../hbase/client/TestHTableTracing.java | 8 +- .../client/TestRegionLocatorTracing.java | 64 ++++++++-- .../hadoop/hbase/client/TestTracingBase.java | 35 ++++++ .../trace/hamcrest/AttributesMatchers.java | 12 ++ .../client/trace/hamcrest/TraceTestUtil.java | 64 ++++++++++ .../hbase/trace/HBaseSemanticAttributes.java | 5 + .../apache/hadoop/hbase/trace/TraceUtil.java | 12 -- .../hbase/client/RegionServerRegistry.java | 5 + 26 files changed, 737 insertions(+), 185 deletions(-) create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/trace/ConnectionSpanBuilder.java create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/trace/TableSpanBuilder.java create mode 100644 hbase-client/src/test/java/org/apache/hadoop/hbase/client/trace/hamcrest/TraceTestUtil.java diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AbstractRpcBasedConnectionRegistry.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AbstractRpcBasedConnectionRegistry.java index 54138d30516c..0ee374d42c0f 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AbstractRpcBasedConnectionRegistry.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AbstractRpcBasedConnectionRegistry.java @@ -272,6 +272,11 @@ public CompletableFuture getActiveMaster() { getClass().getSimpleName() + ".getActiveMaster"); } + @Override + public String getConnectionString() { + return "unimplemented"; + } + @Override public void close() { trace(() -> { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java index e7a198cd5b0c..d458c6cb66ec 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java @@ -71,7 +71,7 @@ * The implementation of AsyncConnection. */ @InterfaceAudience.Private -class AsyncConnectionImpl implements AsyncConnection { +public class AsyncConnectionImpl implements AsyncConnection { private static final Logger LOG = LoggerFactory.getLogger(AsyncConnectionImpl.class); @@ -191,6 +191,14 @@ synchronized ChoreService getChoreService() { return choreService; } + public User getUser() { + return user; + } + + public ConnectionRegistry getConnectionRegistry() { + return registry; + } + @Override public Configuration getConfiguration() { return conf; diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java index 7e275a86356f..09cae3571b1a 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -20,24 +20,27 @@ import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME; import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.REGION_NAMES_KEY; import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.SERVER_NAME_KEY; -import static org.apache.hadoop.hbase.trace.TraceUtil.createSpan; -import static org.apache.hadoop.hbase.trace.TraceUtil.createTableSpan; import static org.apache.hadoop.hbase.util.FutureUtils.addListener; import io.opentelemetry.api.trace.Span; import io.opentelemetry.api.trace.StatusCode; import io.opentelemetry.context.Scope; -import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.List; +import java.util.Objects; +import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.function.Function; import java.util.function.Supplier; +import java.util.stream.Collectors; import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.RegionLocations; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.trace.ConnectionSpanBuilder; +import org.apache.hadoop.hbase.client.trace.TableSpanBuilder; import org.apache.hadoop.hbase.exceptions.TimeoutIOException; import org.apache.hadoop.hbase.trace.TraceUtil; import org.apache.hadoop.hbase.util.Bytes; @@ -95,9 +98,12 @@ private boolean isMeta(TableName tableName) { return TableName.isMetaTableName(tableName); } - private CompletableFuture tracedLocationFuture(Supplier> action, - Function> getRegionNames, TableName tableName, String methodName) { - Span span = createTableSpan("AsyncRegionLocator." + methodName, tableName); + private CompletableFuture tracedLocationFuture( + Supplier> action, + Function> getRegionNames, + Supplier spanSupplier + ) { + final Span span = spanSupplier.get(); try (Scope scope = span.makeCurrent()) { CompletableFuture future = action.get(); FutureUtils.addListener(future, (resp, error) -> { @@ -116,18 +122,30 @@ private CompletableFuture tracedLocationFuture(Supplier getRegionName(RegionLocations locs) { - List names = new ArrayList<>(); - for (HRegionLocation loc : locs.getRegionLocations()) { - if (loc != null) { - names.add(loc.getRegion().getRegionNameAsString()); - } + static List getRegionNames(RegionLocations locs) { + if (locs == null || locs.getRegionLocations() == null) { + return Collections.emptyList(); } - return names; + return Arrays.stream(locs.getRegionLocations()) + .filter(Objects::nonNull) + .map(HRegionLocation::getRegion) + .map(RegionInfo::getRegionNameAsString) + .collect(Collectors.toList()); + } + + static List getRegionNames(HRegionLocation location) { + return Optional.ofNullable(location) + .map(HRegionLocation::getRegion) + .map(RegionInfo::getRegionNameAsString) + .map(Collections::singletonList) + .orElseGet(Collections::emptyList); } CompletableFuture getRegionLocations(TableName tableName, byte[] row, RegionLocateType type, boolean reload, long timeoutNs) { + final Supplier supplier = new TableSpanBuilder(conn) + .setName("AsyncRegionLocator.getRegionLocations") + .setTableName(tableName); return tracedLocationFuture(() -> { CompletableFuture future = isMeta(tableName) ? metaRegionLocator.getRegionLocations(RegionReplicaUtil.DEFAULT_REPLICA_ID, reload) : @@ -137,11 +155,14 @@ CompletableFuture getRegionLocations(TableName tableName, byte[ () -> "Timeout(" + TimeUnit.NANOSECONDS.toMillis(timeoutNs) + "ms) waiting for region locations for " + tableName + ", row='" + Bytes.toStringBinary(row) + "'"); - }, this::getRegionName, tableName, "getRegionLocations"); + }, AsyncRegionLocator::getRegionNames, supplier); } CompletableFuture getRegionLocation(TableName tableName, byte[] row, int replicaId, RegionLocateType type, boolean reload, long timeoutNs) { + final Supplier supplier = new TableSpanBuilder(conn) + .setName("AsyncRegionLocator.getRegionLocation") + .setTableName(tableName); return tracedLocationFuture(() -> { // meta region can not be split right now so we always call the same method. // Change it later if the meta table can have more than one regions. @@ -172,8 +193,7 @@ CompletableFuture getRegionLocation(TableName tableName, byte[] () -> "Timeout(" + TimeUnit.NANOSECONDS.toMillis(timeoutNs) + "ms) waiting for region location for " + tableName + ", row='" + Bytes.toStringBinary(row) + "', replicaId=" + replicaId); - }, loc -> Arrays.asList(loc.getRegion().getRegionNameAsString()), tableName, - "getRegionLocation"); + }, AsyncRegionLocator::getRegionNames, supplier); } CompletableFuture getRegionLocation(TableName tableName, byte[] row, @@ -201,6 +221,9 @@ void updateCachedLocationOnError(HRegionLocation loc, Throwable exception) { } void clearCache(TableName tableName) { + Supplier supplier = new TableSpanBuilder(conn) + .setName("AsyncRegionLocator.clearCache") + .setTableName(tableName); TraceUtil.trace(() -> { LOG.debug("Clear meta cache for {}", tableName); if (tableName.equals(META_TABLE_NAME)) { @@ -208,24 +231,28 @@ void clearCache(TableName tableName) { } else { nonMetaRegionLocator.clearCache(tableName); } - }, () -> createTableSpan("AsyncRegionLocator.clearCache", tableName)); + }, supplier); } void clearCache(ServerName serverName) { + Supplier supplier = new ConnectionSpanBuilder(conn) + .setName("AsyncRegionLocator.clearCache") + .addAttribute(SERVER_NAME_KEY, serverName.getServerName()); TraceUtil.trace(() -> { LOG.debug("Clear meta cache for {}", serverName); metaRegionLocator.clearCache(serverName); nonMetaRegionLocator.clearCache(serverName); conn.getConnectionMetrics().ifPresent(MetricsConnection::incrMetaCacheNumClearServer); - }, () -> createSpan("AsyncRegionLocator.clearCache").setAttribute(SERVER_NAME_KEY, - serverName.getServerName())); + }, supplier); } void clearCache() { + Supplier supplier = new ConnectionSpanBuilder(conn) + .setName("AsyncRegionLocator.clearCache"); TraceUtil.trace(() -> { metaRegionLocator.clearCache(); nonMetaRegionLocator.clearCache(); - }, "AsyncRegionLocator.clearCache"); + }, supplier); } AsyncNonMetaRegionLocator getNonMetaRegionLocator() { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java index 760225967b07..921cc9f3ff65 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java @@ -31,6 +31,7 @@ import org.apache.hadoop.hbase.ZooKeeperConnectionException; import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy; import org.apache.hadoop.hbase.ipc.RpcControllerFactory; +import org.apache.hadoop.hbase.security.User; import org.apache.yetus.audience.InterfaceAudience; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService; @@ -175,8 +176,7 @@ void updateCachedLocations(TableName tableName, byte[] regionName, byte[] rowkey * question * @throws IOException if a remote or network exception occurs */ - HRegionLocation locateRegion(final byte[] regionName) - throws IOException; + HRegionLocation locateRegion(final byte[] regionName) throws IOException; /** * Gets the locations of all regions in the specified table, tableName. @@ -335,4 +335,14 @@ List getLiveRegionServers(Supplier masterAddrTracker, in * Get the bootstrap node list of another region server. */ List getAllBootstrapNodes(ServerName regionServer) throws IOException; + + /** + * Get the {@link User} associated with this connection. Maybe be {@code null}. + */ + User getUser(); + + /** + * Get the {@link ConnectionRegistry} used to orient this cluster. + */ + ConnectionRegistry getConnectionRegistry(); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java index 5fec87ef4590..04ca5ee9ed53 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java @@ -166,7 +166,7 @@ value="AT_OPERATION_SEQUENCE_ON_CONCURRENT_ABSTRACTION", justification="Access to the conncurrent hash map is under a lock so should be fine.") @InterfaceAudience.Private -class ConnectionImplementation implements ClusterConnection, Closeable { +public class ConnectionImplementation implements ClusterConnection, Closeable { public static final String RETRIES_BY_SERVER_KEY = "hbase.client.retries.by.server"; private static final Logger LOG = LoggerFactory.getLogger(ConnectionImplementation.class); @@ -513,6 +513,16 @@ public MetricsConnection getConnectionMetrics() { return this.metrics; } + @Override + public User getUser() { + return user; + } + + @Override + public ConnectionRegistry getConnectionRegistry() { + return registry; + } + private ThreadPoolExecutor getBatchPool() { if (batchPool == null) { synchronized (this) { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionRegistry.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionRegistry.java index cd22d7861b4c..975d8df71808 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionRegistry.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionRegistry.java @@ -29,7 +29,7 @@ * Internal use only. */ @InterfaceAudience.Private -interface ConnectionRegistry extends Closeable { +public interface ConnectionRegistry extends Closeable { /** * Get the location of meta region(s). @@ -48,6 +48,13 @@ interface ConnectionRegistry extends Closeable { */ CompletableFuture getActiveMaster(); + /** + * Return the connection string associated with this registry instance. This value is + * informational, used for annotating traces. Values returned may not be valid for establishing a + * working cluster connection. + */ + String getConnectionString(); + /** * Closes this instance and releases any system resources associated with it */ diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HRegionLocator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HRegionLocator.java index 1558aa8524e1..6b56b05e288e 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HRegionLocator.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HRegionLocator.java @@ -18,18 +18,26 @@ */ package org.apache.hadoop.hbase.client; +import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.REGION_NAMES_KEY; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.StatusCode; +import io.opentelemetry.context.Scope; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collections; import java.util.List; +import java.util.Objects; +import java.util.function.Function; +import java.util.function.Supplier; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.MetaTableAccessor; import org.apache.hadoop.hbase.RegionLocations; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.trace.TableSpanBuilder; import org.apache.hadoop.hbase.trace.TraceUtil; import org.apache.yetus.audience.InterfaceAudience; +import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils; /** * An implementation of {@link RegionLocator}. Used to view region location information for a single @@ -62,41 +70,47 @@ public void close() throws IOException { @Override public HRegionLocation getRegionLocation(byte[] row, int replicaId, boolean reload) throws IOException { - return TraceUtil.trace(() -> connection.locateRegion(tableName, row, !reload, true, replicaId) - .getRegionLocation(replicaId), () -> TraceUtil - .createTableSpan(getClass().getSimpleName() + ".getRegionLocation", tableName)); + final Supplier supplier = new TableSpanBuilder(connection) + .setName("HRegionLocator.getRegionLocation") + .setTableName(tableName); + return tracedLocationFuture( + () -> connection.locateRegion(tableName, row, !reload, true, replicaId) + .getRegionLocation(replicaId), + AsyncRegionLocator::getRegionNames, + supplier); } @Override public List getRegionLocations(byte[] row, boolean reload) throws IOException { - return TraceUtil.trace(() -> { - RegionLocations locs = - connection.locateRegion(tableName, row, !reload, true, RegionInfo.DEFAULT_REPLICA_ID); - return Arrays.asList(locs.getRegionLocations()); - }, () -> TraceUtil - .createTableSpan(getClass().getSimpleName() + ".getRegionLocations", tableName)); + final Supplier supplier = new TableSpanBuilder(connection) + .setName("HRegionLocator.getRegionLocations") + .setTableName(tableName); + final RegionLocations locs = tracedLocationFuture( + () -> connection.locateRegion(tableName, row, !reload, true, + RegionInfo.DEFAULT_REPLICA_ID), + AsyncRegionLocator::getRegionNames, supplier); + return Arrays.asList(locs.getRegionLocations()); } @Override public List getAllRegionLocations() throws IOException { - return TraceUtil.trace(() -> { - ArrayList regions = new ArrayList<>(); - for (RegionLocations locations : listRegionLocations()) { - for (HRegionLocation location : locations.getRegionLocations()) { - regions.add(location); - } - connection.cacheLocation(tableName, locations); - } - return regions; - }, () -> TraceUtil - .createTableSpan(getClass().getSimpleName() + ".getAllRegionLocations", tableName)); + final Supplier supplier = new TableSpanBuilder(connection) + .setName("HRegionLocator.getAllRegionLocations") + .setTableName(tableName); + final RegionLocations locs = tracedLocationFuture(() -> { + final RegionLocations rlocs = listRegionLocations(); + connection.cacheLocation(tableName, rlocs); + return rlocs; + }, AsyncRegionLocator::getRegionNames, supplier); + return Arrays.asList(locs.getRegionLocations()); } @Override public void clearRegionLocationCache() { - TraceUtil.trace(() -> - connection.clearRegionCache(tableName), () -> TraceUtil - .createTableSpan(this.getClass().getSimpleName() + ".clearRegionLocationCache", tableName)); + final Supplier supplier = new TableSpanBuilder(connection) + .setName("HRegionLocator.clearRegionLocationCache") + .setTableName(tableName); + TraceUtil.trace(() -> connection.clearRegionCache(tableName), supplier); } @Override @@ -104,10 +118,9 @@ public TableName getName() { return this.tableName; } - private List listRegionLocations() throws IOException { + private RegionLocations listRegionLocations() throws IOException { if (TableName.isMetaTableName(tableName)) { - return Collections - .singletonList(connection.locateRegion(tableName, HConstants.EMPTY_START_ROW, false, true)); + return connection.locateRegion(tableName, HConstants.EMPTY_START_ROW, false, true); } final List regions = new ArrayList<>(); MetaTableAccessor.Visitor visitor = new MetaTableAccessor.TableVisitorBase(tableName) { @@ -122,6 +135,37 @@ public boolean visitInternal(Result result) throws IOException { } }; MetaTableAccessor.scanMetaForTableRegions(connection, visitor, tableName); - return regions; + return consolidate(regions); + } + + private static RegionLocations consolidate(final List locations) { + final HRegionLocation[] consolidated = locations.stream() + .filter(Objects::nonNull) + .flatMap(locs -> Arrays.stream(locs.getRegionLocations())) + .filter(Objects::nonNull) + .toArray(HRegionLocation[]::new); + return new RegionLocations(consolidated); + } + + private T tracedLocationFuture( + TraceUtil.IOExceptionCallable action, + Function> getRegionNames, + Supplier spanSupplier + ) throws IOException { + final Span span = spanSupplier.get(); + try (Scope ignored = span.makeCurrent()) { + final T result = action.call(); + final List regionNames = getRegionNames.apply(result); + if (!CollectionUtils.isEmpty(regionNames)) { + span.setAttribute(REGION_NAMES_KEY, regionNames); + } + span.setStatus(StatusCode.OK); + span.end(); + return result; + } catch (IOException e) { + TraceUtil.setError(span, e); + span.end(); + throw e; + } } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java index f72b0651842d..2f870b7fd3b7 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java @@ -29,6 +29,7 @@ import com.google.protobuf.ServiceException; import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.SpanKind; import io.opentelemetry.context.Scope; import java.io.IOException; import java.io.InterruptedIOException; @@ -54,6 +55,7 @@ import org.apache.hadoop.hbase.client.coprocessor.Batch; import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback; import org.apache.hadoop.hbase.client.trace.TableOperationSpanBuilder; +import org.apache.hadoop.hbase.client.trace.TableSpanBuilder; import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.io.TimeRange; @@ -359,7 +361,7 @@ public ResultScanner getScanner(byte [] family, byte [] qualifier) @Override public Result get(final Get get) throws IOException { - final Supplier supplier = new TableOperationSpanBuilder() + final Supplier supplier = new TableOperationSpanBuilder(connection) .setTableName(tableName) .setOperation(get); return TraceUtil.trace(() -> get(get, get.isCheckExistenceOnly()), supplier); @@ -402,7 +404,7 @@ protected Result rpcCall() throws Exception { @Override public Result[] get(List gets) throws IOException { - final Supplier supplier = new TableOperationSpanBuilder() + final Supplier supplier = new TableOperationSpanBuilder(connection) .setTableName(tableName) .setOperation(HBaseSemanticAttributes.Operation.BATCH); return TraceUtil.trace(() -> { @@ -429,7 +431,7 @@ public Result[] get(List gets) throws IOException { @Override public void batch(final List actions, final Object[] results) throws InterruptedException, IOException { - final Supplier supplier = new TableOperationSpanBuilder() + final Supplier supplier = new TableOperationSpanBuilder(connection) .setTableName(tableName) .setOperation(HBaseSemanticAttributes.Operation.BATCH); TraceUtil.traceWithIOException(() -> { @@ -468,7 +470,7 @@ public void batch(final List actions, final Object[] results, int .setOperationTimeout(operationTimeoutMs) .setSubmittedRows(AsyncProcessTask.SubmittedRows.ALL) .build(); - final Span span = new TableOperationSpanBuilder() + final Span span = new TableOperationSpanBuilder(connection) .setTableName(tableName) .setOperation(HBaseSemanticAttributes.Operation.BATCH) .build(); @@ -507,7 +509,7 @@ public static void doBatchWithCallback(List actions, Object[] .setRpcTimeout(writeTimeout) .setSubmittedRows(AsyncProcessTask.SubmittedRows.ALL) .build(); - final Span span = new TableOperationSpanBuilder() + final Span span = new TableOperationSpanBuilder(connection) .setTableName(tableName) .setOperation(HBaseSemanticAttributes.Operation.BATCH) .build(); @@ -525,7 +527,7 @@ public static void doBatchWithCallback(List actions, Object[] @Override public void delete(final Delete delete) throws IOException { - final Supplier supplier = new TableOperationSpanBuilder() + final Supplier supplier = new TableOperationSpanBuilder(connection) .setTableName(tableName) .setOperation(delete); TraceUtil.traceWithIOException(() -> { @@ -547,7 +549,7 @@ protected Void rpcCall() throws Exception { @Override public void delete(final List deletes) throws IOException { - final Supplier supplier = new TableOperationSpanBuilder() + final Supplier supplier = new TableOperationSpanBuilder(connection) .setTableName(tableName) .setOperation(HBaseSemanticAttributes.Operation.BATCH); TraceUtil.traceWithIOException(() -> { @@ -573,7 +575,7 @@ public void delete(final List deletes) throws IOException { @Override public void put(final Put put) throws IOException { - final Supplier supplier = new TableOperationSpanBuilder() + final Supplier supplier = new TableOperationSpanBuilder(connection) .setTableName(tableName) .setOperation(put); TraceUtil.traceWithIOException(() -> { @@ -596,7 +598,7 @@ protected Void rpcCall() throws Exception { @Override public void put(final List puts) throws IOException { - final Supplier supplier = new TableOperationSpanBuilder() + final Supplier supplier = new TableOperationSpanBuilder(connection) .setTableName(tableName) .setOperation(HBaseSemanticAttributes.Operation.BATCH); TraceUtil.traceWithIOException(() -> { @@ -614,7 +616,7 @@ public void put(final List puts) throws IOException { @Override public Result mutateRow(final RowMutations rm) throws IOException { - final Supplier supplier = new TableOperationSpanBuilder() + final Supplier supplier = new TableOperationSpanBuilder(connection) .setTableName(tableName) .setOperation(HBaseSemanticAttributes.Operation.BATCH); return TraceUtil.trace(() -> { @@ -670,7 +672,7 @@ private long getNonce() { @Override public Result append(final Append append) throws IOException { - final Supplier supplier = new TableOperationSpanBuilder() + final Supplier supplier = new TableOperationSpanBuilder(connection) .setTableName(tableName) .setOperation(append); return TraceUtil.trace(() -> { @@ -697,7 +699,7 @@ protected Result rpcCall() throws Exception { @Override public Result increment(final Increment increment) throws IOException { - final Supplier supplier = new TableOperationSpanBuilder() + final Supplier supplier = new TableOperationSpanBuilder(connection) .setTableName(tableName) .setOperation(increment); return TraceUtil.trace(() -> { @@ -731,7 +733,7 @@ public long incrementColumnValue(final byte [] row, final byte [] family, public long incrementColumnValue(final byte [] row, final byte [] family, final byte [] qualifier, final long amount, final Durability durability) throws IOException { - final Supplier supplier = new TableOperationSpanBuilder() + final Supplier supplier = new TableOperationSpanBuilder(connection) .setTableName(tableName) .setOperation(HBaseSemanticAttributes.Operation.INCREMENT); return TraceUtil.trace(() -> { @@ -769,7 +771,7 @@ protected Long rpcCall() throws Exception { @Deprecated public boolean checkAndPut(final byte [] row, final byte [] family, final byte [] qualifier, final byte [] value, final Put put) throws IOException { - final Supplier supplier = new TableOperationSpanBuilder() + final Supplier supplier = new TableOperationSpanBuilder(connection) .setTableName(tableName) .setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE); return TraceUtil.trace( @@ -782,7 +784,7 @@ public boolean checkAndPut(final byte [] row, final byte [] family, final byte [ @Deprecated public boolean checkAndPut(final byte [] row, final byte [] family, final byte [] qualifier, final CompareOp compareOp, final byte [] value, final Put put) throws IOException { - final Supplier supplier = new TableOperationSpanBuilder() + final Supplier supplier = new TableOperationSpanBuilder(connection) .setTableName(tableName) .setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE); return TraceUtil.trace( @@ -795,7 +797,7 @@ public boolean checkAndPut(final byte [] row, final byte [] family, final byte [ @Deprecated public boolean checkAndPut(final byte [] row, final byte [] family, final byte [] qualifier, final CompareOperator op, final byte [] value, final Put put) throws IOException { - final Supplier supplier = new TableOperationSpanBuilder() + final Supplier supplier = new TableOperationSpanBuilder(connection) .setTableName(tableName) .setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE); return TraceUtil.trace( @@ -807,7 +809,7 @@ public boolean checkAndPut(final byte [] row, final byte [] family, final byte [ @Deprecated public boolean checkAndDelete(final byte[] row, final byte[] family, final byte[] qualifier, final byte[] value, final Delete delete) throws IOException { - final Supplier supplier = new TableOperationSpanBuilder() + final Supplier supplier = new TableOperationSpanBuilder(connection) .setTableName(tableName) .setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE); return TraceUtil.trace( @@ -820,7 +822,7 @@ public boolean checkAndDelete(final byte[] row, final byte[] family, final byte[ @Deprecated public boolean checkAndDelete(final byte[] row, final byte[] family, final byte[] qualifier, final CompareOp compareOp, final byte[] value, final Delete delete) throws IOException { - final Supplier supplier = new TableOperationSpanBuilder() + final Supplier supplier = new TableOperationSpanBuilder(connection) .setTableName(tableName) .setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE); return TraceUtil.trace( @@ -833,7 +835,7 @@ public boolean checkAndDelete(final byte[] row, final byte[] family, final byte[ @Deprecated public boolean checkAndDelete(final byte[] row, final byte[] family, final byte[] qualifier, final CompareOperator op, final byte[] value, final Delete delete) throws IOException { - final Supplier supplier = new TableOperationSpanBuilder() + final Supplier supplier = new TableOperationSpanBuilder(connection) .setTableName(tableName) .setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE); return TraceUtil.trace( @@ -910,7 +912,7 @@ protected MultiResponse rpcCall() throws Exception { @Deprecated public boolean checkAndMutate(final byte [] row, final byte [] family, final byte [] qualifier, final CompareOp compareOp, final byte [] value, final RowMutations rm) throws IOException { - final Supplier supplier = new TableOperationSpanBuilder() + final Supplier supplier = new TableOperationSpanBuilder(connection) .setTableName(tableName) .setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE); return TraceUtil.trace( @@ -923,7 +925,7 @@ public boolean checkAndMutate(final byte [] row, final byte [] family, final byt @Deprecated public boolean checkAndMutate(final byte [] row, final byte [] family, final byte [] qualifier, final CompareOperator op, final byte [] value, final RowMutations rm) throws IOException { - final Supplier supplier = new TableOperationSpanBuilder() + final Supplier supplier = new TableOperationSpanBuilder(connection) .setTableName(tableName) .setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE); return TraceUtil.trace( @@ -933,7 +935,7 @@ public boolean checkAndMutate(final byte [] row, final byte [] family, final byt @Override public CheckAndMutateResult checkAndMutate(CheckAndMutate checkAndMutate) throws IOException { - final Supplier supplier = new TableOperationSpanBuilder() + final Supplier supplier = new TableOperationSpanBuilder(connection) .setTableName(tableName) .setOperation(checkAndMutate); return TraceUtil.trace(() -> { @@ -982,7 +984,7 @@ protected CheckAndMutateResult rpcCall() throws Exception { @Override public List checkAndMutate(List checkAndMutates) throws IOException { - final Supplier supplier = new TableOperationSpanBuilder() + final Supplier supplier = new TableOperationSpanBuilder(connection) .setTableName(tableName) .setOperation(HBaseSemanticAttributes.Operation.BATCH); return TraceUtil.trace(() -> { @@ -1040,7 +1042,7 @@ private CompareOperator toCompareOperator(CompareOp compareOp) { @Override public boolean exists(final Get get) throws IOException { - final Supplier supplier = new TableOperationSpanBuilder() + final Supplier supplier = new TableOperationSpanBuilder(connection) .setTableName(tableName) .setOperation(get); return TraceUtil.trace(() -> { @@ -1052,7 +1054,7 @@ public boolean exists(final Get get) throws IOException { @Override public boolean[] exists(List gets) throws IOException { - final Supplier supplier = new TableOperationSpanBuilder() + final Supplier supplier = new TableOperationSpanBuilder(connection) .setTableName(tableName) .setOperation(HBaseSemanticAttributes.Operation.BATCH); return TraceUtil.trace(() -> { @@ -1108,6 +1110,10 @@ public void processBatchCallback( @Override public void close() throws IOException { + final Supplier supplier = new TableSpanBuilder(connection) + .setName("HTable.close") + .setTableName(tableName) + .setSpanKind(SpanKind.INTERNAL); TraceUtil.traceWithIOException(() -> { if (this.closed) { return; @@ -1126,7 +1132,7 @@ public void close() throws IOException { } } this.closed = true; - }, () -> TraceUtil.createTableSpan(getClass().getSimpleName() + ".close", tableName)); + }, supplier); } // validate for well-formedness @@ -1456,7 +1462,7 @@ private void preCheck() { @Override public boolean thenPut(Put put) throws IOException { - final Supplier supplier = new TableOperationSpanBuilder() + final Supplier supplier = new TableOperationSpanBuilder(connection) .setTableName(tableName) .setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE); return TraceUtil.trace(() -> { @@ -1469,7 +1475,7 @@ public boolean thenPut(Put put) throws IOException { @Override public boolean thenDelete(Delete delete) throws IOException { - final Supplier supplier = new TableOperationSpanBuilder() + final Supplier supplier = new TableOperationSpanBuilder(connection) .setTableName(tableName) .setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE); return TraceUtil.trace(() -> { @@ -1481,7 +1487,7 @@ public boolean thenDelete(Delete delete) throws IOException { @Override public boolean thenMutate(RowMutations mutation) throws IOException { - final Supplier supplier = new TableOperationSpanBuilder() + final Supplier supplier = new TableOperationSpanBuilder(connection) .setTableName(tableName) .setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE); return TraceUtil.trace(() -> { @@ -1511,7 +1517,7 @@ public CheckAndMutateWithFilterBuilder timeRange(TimeRange timeRange) { @Override public boolean thenPut(Put put) throws IOException { - final Supplier supplier = new TableOperationSpanBuilder() + final Supplier supplier = new TableOperationSpanBuilder(connection) .setTableName(tableName) .setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE); return TraceUtil.trace(() -> { @@ -1523,7 +1529,7 @@ public boolean thenPut(Put put) throws IOException { @Override public boolean thenDelete(Delete delete) throws IOException { - final Supplier supplier = new TableOperationSpanBuilder() + final Supplier supplier = new TableOperationSpanBuilder(connection) .setTableName(tableName) .setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE); return TraceUtil.trace( @@ -1533,7 +1539,7 @@ public boolean thenDelete(Delete delete) throws IOException { @Override public boolean thenMutate(RowMutations mutation) throws IOException { - final Supplier supplier = new TableOperationSpanBuilder() + final Supplier supplier = new TableOperationSpanBuilder(connection) .setTableName(tableName) .setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE); return TraceUtil.trace( diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterRegistry.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterRegistry.java index 64e389cf35e2..05773d0b4195 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterRegistry.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterRegistry.java @@ -87,9 +87,12 @@ public static Set parseMasterAddrs(Configuration conf) throws Unknow return masterAddrs; } + private final String connectionString; + MasterRegistry(Configuration conf) throws IOException { super(conf, MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY, MASTER_REGISTRY_INITIAL_REFRESH_DELAY_SECS, MASTER_REGISTRY_PERIODIC_REFRESH_INTERVAL_SECS, MASTER_REGISTRY_MIN_SECS_BETWEEN_REFRESHES); + connectionString = getConnectionString(conf); } @Override @@ -102,6 +105,15 @@ protected CompletableFuture> fetchEndpoints() { return getMasters(); } + @Override + public String getConnectionString() { + return connectionString; + } + + static String getConnectionString(Configuration conf) throws UnknownHostException { + return getMasterAddr(conf); + } + /** * Builds the default master address end point if it is not specified in the configuration. *

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java index 5ed5e7ef6709..c3cc1fb36f5c 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java @@ -261,7 +261,7 @@ private CompletableFuture get(Get get, int replicaId) { } private TableOperationSpanBuilder newTableOperationSpanBuilder() { - return new TableOperationSpanBuilder().setTableName(tableName); + return new TableOperationSpanBuilder(conn).setTableName(tableName); } @Override diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcConnectionRegistry.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcConnectionRegistry.java index 731d6202b3ef..660d74e74c28 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcConnectionRegistry.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcConnectionRegistry.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -18,6 +18,7 @@ package org.apache.hadoop.hbase.client; import java.io.IOException; +import java.net.UnknownHostException; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.stream.Collectors; @@ -72,9 +73,23 @@ public class RpcConnectionRegistry extends AbstractRpcBasedConnectionRegistry { private static final char ADDRS_CONF_SEPARATOR = ','; + private final String connectionString; + RpcConnectionRegistry(Configuration conf) throws IOException { super(conf, HEDGED_REQS_FANOUT_KEY, INITIAL_REFRESH_DELAY_SECS, PERIODIC_REFRESH_INTERVAL_SECS, MIN_SECS_BETWEEN_REFRESHES); + connectionString = buildConnectionString(conf); + } + + private String buildConnectionString(Configuration conf) throws UnknownHostException { + final String configuredBootstrapNodes = conf.get(BOOTSTRAP_NODES); + if (StringUtils.isBlank(configuredBootstrapNodes)) { + return MasterRegistry.getConnectionString(conf); + } + return Splitter.on(ADDRS_CONF_SEPARATOR) + .trimResults() + .splitToStream(configuredBootstrapNodes) + .collect(Collectors.joining(String.valueOf(ADDRS_CONF_SEPARATOR))); } @Override @@ -91,6 +106,11 @@ protected Set getBootstrapNodes(Configuration conf) throws IOExcepti } } + @Override + public String getConnectionString() { + return connectionString; + } + private static Set transformServerNames(GetBootstrapNodesResponse resp) { return resp.getServerNameList().stream().map(ProtobufUtil::toServerName) .collect(Collectors.toSet()); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZKConnectionRegistry.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZKConnectionRegistry.java index 6e94afe00a79..abb98569bfd3 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZKConnectionRegistry.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZKConnectionRegistry.java @@ -247,6 +247,13 @@ public CompletableFuture getActiveMaster() { "ZKConnectionRegistry.getActiveMaster"); } + @Override + public String getConnectionString() { + final String serverList = zk.getConnectString(); + final String baseZNode = znodePaths.baseZNode; + return serverList + ":" + baseZNode; + } + @Override public void close() { zk.close(); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/trace/ConnectionSpanBuilder.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/trace/ConnectionSpanBuilder.java new file mode 100644 index 000000000000..afb34cad15c2 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/trace/ConnectionSpanBuilder.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.client.trace; + +import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.DB_CONNECTION_STRING; +import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.DB_SYSTEM; +import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.DB_SYSTEM_VALUE; +import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.DB_USER; +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.SpanBuilder; +import io.opentelemetry.api.trace.SpanKind; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.function.Supplier; +import org.apache.hadoop.hbase.client.AsyncConnectionImpl; +import org.apache.hadoop.hbase.client.ClusterConnection; +import org.apache.hadoop.hbase.client.ConnectionImplementation; +import org.apache.hadoop.hbase.trace.TraceUtil; +import org.apache.yetus.audience.InterfaceAudience; + +/** + * Construct {@link Span} instances originating from the client side of a connection. + */ +@InterfaceAudience.Private +public class ConnectionSpanBuilder implements Supplier { + + private String name; + private final Map, Object> attributes = new HashMap<>(); + + public ConnectionSpanBuilder(final AsyncConnectionImpl conn) { + populateConnectionAttributes(attributes, conn); + } + + @Override + public Span get() { + return build(); + } + + public ConnectionSpanBuilder setName(final String name) { + this.name = name; + return this; + } + + public ConnectionSpanBuilder addAttribute(final AttributeKey key, T value) { + attributes.put(key, value); + return this; + } + + @SuppressWarnings("unchecked") + public Span build() { + final SpanBuilder builder = TraceUtil.getGlobalTracer() + .spanBuilder(name) + // TODO: what about clients embedded in Master/RegionServer/Gateways/&c? + .setSpanKind(SpanKind.CLIENT); + attributes.forEach((k, v) -> builder.setAttribute((AttributeKey) k, v)); + return builder.startSpan(); + } + + /** + * @see #populateConnectionAttributes(Map, AsyncConnectionImpl) + */ + static void populateConnectionAttributes( + final Map, Object> attributes, + final ClusterConnection conn + ) { + attributes.put(DB_SYSTEM, DB_SYSTEM_VALUE); + attributes.put(DB_CONNECTION_STRING, conn.getConnectionRegistry().getConnectionString()); + attributes.put(DB_USER, Optional.ofNullable(conn.getUser()) + .map(Object::toString) + .orElse(null)); + } + + /** + * Static utility method that performs the primary logic of this builder. It is visible to other + * classes in this package so that other builders can use this functionality as a mix-in. + * @param attributes the attributes map to be populated. + * @param conn the source of attribute values. + */ + static void populateConnectionAttributes( + final Map, Object> attributes, + final AsyncConnectionImpl conn + ) { + attributes.put(DB_SYSTEM, DB_SYSTEM_VALUE); + attributes.put(DB_CONNECTION_STRING, conn.getConnectionRegistry().getConnectionString()); + attributes.put(DB_USER, Optional.ofNullable(conn.getUser()) + .map(Object::toString) + .orElse(null)); + } +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/trace/TableOperationSpanBuilder.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/trace/TableOperationSpanBuilder.java index aaa53610321e..e8620de2a6c5 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/trace/TableOperationSpanBuilder.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/trace/TableOperationSpanBuilder.java @@ -18,10 +18,7 @@ package org.apache.hadoop.hbase.client.trace; -import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.DB_NAME; import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.DB_OPERATION; -import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.NAMESPACE_KEY; -import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.TABLE_KEY; import io.opentelemetry.api.common.AttributeKey; import io.opentelemetry.api.trace.Span; import io.opentelemetry.api.trace.SpanBuilder; @@ -32,7 +29,10 @@ import java.util.function.Supplier; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Append; +import org.apache.hadoop.hbase.client.AsyncConnectionImpl; import org.apache.hadoop.hbase.client.CheckAndMutate; +import org.apache.hadoop.hbase.client.ClusterConnection; +import org.apache.hadoop.hbase.client.ConnectionImplementation; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Increment; @@ -46,8 +46,8 @@ import org.apache.yetus.audience.InterfaceAudience; /** - * Construct {@link io.opentelemetry.api.trace.Span} instances originating from - * "table operations" -- the verbs in our public API that interact with data in tables. + * Construct {@link Span} instances originating from "table operations" -- the verbs in our public + * API that interact with data in tables. */ @InterfaceAudience.Private public class TableOperationSpanBuilder implements Supplier { @@ -60,7 +60,16 @@ public class TableOperationSpanBuilder implements Supplier { private TableName tableName; private final Map, Object> attributes = new HashMap<>(); - @Override public Span get() { + public TableOperationSpanBuilder(final ClusterConnection conn) { + ConnectionSpanBuilder.populateConnectionAttributes(attributes, conn); + } + + public TableOperationSpanBuilder(final AsyncConnectionImpl conn) { + ConnectionSpanBuilder.populateConnectionAttributes(attributes, conn); + } + + @Override + public Span get() { return build(); } @@ -84,9 +93,7 @@ public TableOperationSpanBuilder setOperation(final Operation operation) { public TableOperationSpanBuilder setTableName(final TableName tableName) { this.tableName = tableName; - attributes.put(NAMESPACE_KEY, tableName.getNamespaceAsString()); - attributes.put(DB_NAME, tableName.getNamespaceAsString()); - attributes.put(TABLE_KEY, tableName.getNameAsString()); + TableSpanBuilder.populateTableNameAttributes(attributes, tableName); return this; } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/trace/TableSpanBuilder.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/trace/TableSpanBuilder.java new file mode 100644 index 000000000000..8973da66be76 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/trace/TableSpanBuilder.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.client.trace; + +import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.DB_NAME; +import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.NAMESPACE_KEY; +import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.TABLE_KEY; +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.SpanBuilder; +import io.opentelemetry.api.trace.SpanKind; +import java.util.HashMap; +import java.util.Map; +import java.util.function.Supplier; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.AsyncConnectionImpl; +import org.apache.hadoop.hbase.client.ClusterConnection; +import org.apache.hadoop.hbase.trace.TraceUtil; +import org.apache.yetus.audience.InterfaceAudience; + +/** + * Construct {@link Span} instances involving data tables. + */ +@InterfaceAudience.Private +public class TableSpanBuilder implements Supplier { + + private String name; + private SpanKind spanKind = SpanKind.CLIENT; + private final Map, Object> attributes = new HashMap<>(); + + public TableSpanBuilder(ClusterConnection conn) { + ConnectionSpanBuilder.populateConnectionAttributes(attributes, conn); + } + + public TableSpanBuilder(AsyncConnectionImpl conn) { + ConnectionSpanBuilder.populateConnectionAttributes(attributes, conn); + } + + @Override + public Span get() { + return build(); + } + + public TableSpanBuilder setName(final String name) { + this.name = name; + return this; + } + + public TableSpanBuilder setSpanKind(final SpanKind spanKind) { + this.spanKind = spanKind; + return this; + } + + public TableSpanBuilder setTableName(final TableName tableName) { + populateTableNameAttributes(attributes, tableName); + return this; + } + + @SuppressWarnings("unchecked") + public Span build() { + final SpanBuilder builder = TraceUtil.getGlobalTracer() + .spanBuilder(name) + // TODO: what about clients embedded in Master/RegionServer/Gateways/&c? + .setSpanKind(spanKind); + attributes.forEach((k, v) -> builder.setAttribute((AttributeKey) k, v)); + return builder.startSpan(); + } + + /** + * Static utility method that performs the primary logic of this builder. It is visible to other + * classes in this package so that other builders can use this functionality as a mix-in. + * @param attributes the attributes map to be populated. + * @param tableName the source of attribute values. + */ + static void populateTableNameAttributes( + final Map, Object> attributes, + final TableName tableName + ) { + attributes.put(NAMESPACE_KEY, tableName.getNamespaceAsString()); + attributes.put(DB_NAME, tableName.getNamespaceAsString()); + attributes.put(TABLE_KEY, tableName.getNameAsString()); + } +} diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/DoNothingConnectionRegistry.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/DoNothingConnectionRegistry.java index 4bd66877b1b4..3b792a5bd15f 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/DoNothingConnectionRegistry.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/DoNothingConnectionRegistry.java @@ -47,6 +47,11 @@ public CompletableFuture getActiveMaster() { return CompletableFuture.completedFuture(null); } + @Override + public String getConnectionString() { + return "nothing"; + } + @Override public void close() { } diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionLocatorTracing.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionLocatorTracing.java index 4f4a29cab0d0..47e09f8113fe 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionLocatorTracing.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionLocatorTracing.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -17,13 +17,25 @@ */ package org.apache.hadoop.hbase.client; -import static org.junit.Assert.assertEquals; - +import static org.apache.hadoop.hbase.client.trace.hamcrest.AttributesMatchers.containsEntry; +import static org.apache.hadoop.hbase.client.trace.hamcrest.AttributesMatchers.containsEntryWithStringValuesOf; +import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasAttributes; +import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasEnded; +import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasKind; +import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasName; +import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasStatusWithCode; +import static org.apache.hadoop.hbase.client.trace.hamcrest.TraceTestUtil.buildConnectionAttributesMatcher; +import static org.apache.hadoop.hbase.client.trace.hamcrest.TraceTestUtil.buildTableAttributesMatcher; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.allOf; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.hasItem; +import io.opentelemetry.api.trace.SpanKind; import io.opentelemetry.api.trace.StatusCode; import io.opentelemetry.sdk.testing.junit4.OpenTelemetryRule; import io.opentelemetry.sdk.trace.data.SpanData; import java.io.IOException; -import java.util.List; +import java.util.Arrays; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import org.apache.hadoop.conf.Configuration; @@ -31,6 +43,7 @@ import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.MatcherPredicate; import org.apache.hadoop.hbase.RegionLocations; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; @@ -38,24 +51,26 @@ import org.apache.hadoop.hbase.security.UserProvider; import org.apache.hadoop.hbase.testclassification.ClientTests; import org.apache.hadoop.hbase.testclassification.MediumTests; -import org.apache.hadoop.hbase.trace.HBaseSemanticAttributes; +import org.hamcrest.Matcher; import org.junit.After; import org.junit.Before; import org.junit.ClassRule; import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; - +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.apache.hbase.thirdparty.com.google.common.io.Closeables; @Category({ ClientTests.class, MediumTests.class }) public class TestAsyncRegionLocatorTracing { + private static final Logger LOG = LoggerFactory.getLogger(TestAsyncRegionLocatorTracing.class); @ClassRule public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule.forClass(TestAsyncRegionLocatorTracing.class); - private static Configuration CONF = HBaseConfiguration.create(); + private static final Configuration CONF = HBaseConfiguration.create(); private AsyncConnectionImpl conn; @@ -89,16 +104,35 @@ public void tearDown() throws IOException { } private SpanData waitSpan(String name) { - Waiter.waitFor(CONF, 1000, - () -> traceRule.getSpans().stream().map(SpanData::getName).anyMatch(s -> s.equals(name))); - return traceRule.getSpans().stream().filter(s -> s.getName().equals(name)).findFirst().get(); + return waitSpan(hasName(name)); + } + + private SpanData waitSpan(Matcher matcher) { + Matcher spanLocator = allOf(matcher, hasEnded()); + try { + Waiter.waitFor(CONF, 1000, new MatcherPredicate<>( + "waiting for span", + () -> traceRule.getSpans(), hasItem(spanLocator))); + } catch (AssertionError e) { + LOG.error("AssertionError while waiting for matching span. Span reservoir contains: {}", + traceRule.getSpans()); + throw e; + } + return traceRule.getSpans() + .stream() + .filter(spanLocator::matches) + .findFirst() + .orElseThrow(AssertionError::new); } @Test public void testClearCache() { conn.getLocator().clearCache(); SpanData span = waitSpan("AsyncRegionLocator.clearCache"); - assertEquals(StatusCode.OK, span.getStatus().getStatusCode()); + assertThat(span, allOf( + hasStatusWithCode(StatusCode.OK), + hasKind(SpanKind.CLIENT), + buildConnectionAttributesMatcher(conn))); } @Test @@ -106,19 +140,22 @@ public void testClearCacheServerName() { ServerName sn = ServerName.valueOf("127.0.0.1", 12345, System.currentTimeMillis()); conn.getLocator().clearCache(sn); SpanData span = waitSpan("AsyncRegionLocator.clearCache"); - assertEquals(StatusCode.OK, span.getStatus().getStatusCode()); - assertEquals(sn.toString(), span.getAttributes().get(HBaseSemanticAttributes.SERVER_NAME_KEY)); + assertThat(span, allOf( + hasStatusWithCode(StatusCode.OK), + hasKind(SpanKind.CLIENT), + buildConnectionAttributesMatcher(conn), + hasAttributes(containsEntry("db.hbase.server.name", sn.getServerName())))); } @Test public void testClearCacheTableName() { conn.getLocator().clearCache(TableName.META_TABLE_NAME); SpanData span = waitSpan("AsyncRegionLocator.clearCache"); - assertEquals(StatusCode.OK, span.getStatus().getStatusCode()); - assertEquals(TableName.META_TABLE_NAME.getNamespaceAsString(), - span.getAttributes().get(HBaseSemanticAttributes.NAMESPACE_KEY)); - assertEquals(TableName.META_TABLE_NAME.getNameAsString(), - span.getAttributes().get(HBaseSemanticAttributes.TABLE_KEY)); + assertThat(span, allOf( + hasStatusWithCode(StatusCode.OK), + hasKind(SpanKind.CLIENT), + buildConnectionAttributesMatcher(conn), + buildTableAttributesMatcher(TableName.META_TABLE_NAME))); } @Test @@ -126,15 +163,14 @@ public void testGetRegionLocation() { conn.getLocator().getRegionLocation(TableName.META_TABLE_NAME, HConstants.EMPTY_START_ROW, RegionLocateType.CURRENT, TimeUnit.SECONDS.toNanos(1)).join(); SpanData span = waitSpan("AsyncRegionLocator.getRegionLocation"); - assertEquals(StatusCode.OK, span.getStatus().getStatusCode()); - assertEquals(TableName.META_TABLE_NAME.getNamespaceAsString(), - span.getAttributes().get(HBaseSemanticAttributes.NAMESPACE_KEY)); - assertEquals(TableName.META_TABLE_NAME.getNameAsString(), - span.getAttributes().get(HBaseSemanticAttributes.TABLE_KEY)); - List regionNames = span.getAttributes().get(HBaseSemanticAttributes.REGION_NAMES_KEY); - assertEquals(1, regionNames.size()); - assertEquals(locs.getDefaultRegionLocation().getRegion().getRegionNameAsString(), - regionNames.get(0)); + assertThat(span, allOf( + hasStatusWithCode(StatusCode.OK), + hasKind(SpanKind.CLIENT), + buildConnectionAttributesMatcher(conn), + buildTableAttributesMatcher(TableName.META_TABLE_NAME), + hasAttributes( + containsEntryWithStringValuesOf("db.hbase.regions", + locs.getDefaultRegionLocation().getRegion().getRegionNameAsString())))); } @Test @@ -142,16 +178,16 @@ public void testGetRegionLocations() { conn.getLocator().getRegionLocations(TableName.META_TABLE_NAME, HConstants.EMPTY_START_ROW, RegionLocateType.CURRENT, false, TimeUnit.SECONDS.toNanos(1)).join(); SpanData span = waitSpan("AsyncRegionLocator.getRegionLocations"); - assertEquals(StatusCode.OK, span.getStatus().getStatusCode()); - assertEquals(TableName.META_TABLE_NAME.getNamespaceAsString(), - span.getAttributes().get(HBaseSemanticAttributes.NAMESPACE_KEY)); - assertEquals(TableName.META_TABLE_NAME.getNameAsString(), - span.getAttributes().get(HBaseSemanticAttributes.TABLE_KEY)); - List regionNames = span.getAttributes().get(HBaseSemanticAttributes.REGION_NAMES_KEY); - assertEquals(3, regionNames.size()); - for (int i = 0; i < 3; i++) { - assertEquals(locs.getRegionLocation(i).getRegion().getRegionNameAsString(), - regionNames.get(i)); - } + String[] expectedRegions = Arrays.stream(locs.getRegionLocations()) + .map(HRegionLocation::getRegion) + .map(RegionInfo::getRegionNameAsString) + .toArray(String[]::new); + assertThat(span, allOf( + hasStatusWithCode(StatusCode.OK), + hasKind(SpanKind.CLIENT), + buildConnectionAttributesMatcher(conn), + buildTableAttributesMatcher(TableName.META_TABLE_NAME), + hasAttributes( + containsEntryWithStringValuesOf("db.hbase.regions", containsInAnyOrder(expectedRegions))))); } } diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableTracing.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableTracing.java index af7ee8b2bafc..d507a312edc1 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableTracing.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableTracing.java @@ -23,11 +23,12 @@ import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasKind; import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasName; import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasStatusWithCode; +import static org.apache.hadoop.hbase.client.trace.hamcrest.TraceTestUtil.buildConnectionAttributesMatcher; +import static org.apache.hadoop.hbase.client.trace.hamcrest.TraceTestUtil.buildTableAttributesMatcher; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.allOf; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.hasItem; -import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; @@ -57,6 +58,7 @@ import org.apache.hadoop.hbase.Waiter; import org.apache.hadoop.hbase.filter.PrefixFilter; import org.apache.hadoop.hbase.ipc.HBaseRpcController; +import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.UserProvider; import org.apache.hadoop.hbase.testclassification.ClientTests; import org.apache.hadoop.hbase.testclassification.MediumTests; @@ -99,7 +101,7 @@ public class TestAsyncTableTracing { private ClientService.Interface stub; - private AsyncConnection conn; + private AsyncConnectionImpl conn; private AsyncTable table; @@ -197,8 +199,9 @@ public Void answer(InvocationOnMock invocation) throws Throwable { return null; } }).when(stub).get(any(HBaseRpcController.class), any(GetRequest.class), any()); + final User user = UserProvider.instantiate(CONF).getCurrent(); conn = new AsyncConnectionImpl(CONF, new DoNothingConnectionRegistry(CONF), "test", - UserProvider.instantiate(CONF).getCurrent()) { + user) { @Override AsyncRegionLocator getLocator() { @@ -236,22 +239,13 @@ public void tearDown() throws IOException { Closeables.close(conn, true); } - /** - * All {@link Span}s generated from table data access operations over {@code tableName} should - * include these attributes. - */ - static Matcher buildBaseAttributesMatcher(TableName tableName) { - return hasAttributes(allOf( - containsEntry("db.name", tableName.getNamespaceAsString()), - containsEntry("db.hbase.namespace", tableName.getNamespaceAsString()), - containsEntry("db.hbase.table", tableName.getNameAsString()))); - } - private void assertTrace(String tableOperation) { assertTrace(tableOperation, new IsAnything<>()); } private void assertTrace(String tableOperation, Matcher matcher) { + // n.b. this method implementation must match the one of the same name found in + // TestHTableTracing final TableName tableName = table.getName(); final Matcher spanLocator = allOf( hasName(containsString(tableOperation)), hasEnded()); @@ -269,7 +263,8 @@ private void assertTrace(String tableOperation, Matcher matcher) { hasName(expectedName), hasKind(SpanKind.CLIENT), hasStatusWithCode(StatusCode.OK), - buildBaseAttributesMatcher(tableName), + buildConnectionAttributesMatcher(conn), + buildTableAttributesMatcher(tableName), matcher)); } @@ -524,16 +519,4 @@ public void testBatchAll() { table.batchAll(Arrays.asList(new Delete(Bytes.toBytes(0)))).join(); assertTrace("BATCH"); } - - @Test - public void testConnClose() throws IOException { - conn.close(); - Waiter.waitFor(CONF, 1000, - () -> traceRule.getSpans().stream() - .anyMatch(span -> span.getName().equals("AsyncConnection.close") && - span.getKind() == SpanKind.INTERNAL && span.hasEnded())); - SpanData data = traceRule.getSpans().stream() - .filter(s -> s.getName().equals("AsyncConnection.close")).findFirst().get(); - assertEquals(StatusCode.OK, data.getStatus().getStatusCode()); - } } diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestHTableTracing.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestHTableTracing.java index 8e6409ef3c92..80db9a122e37 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestHTableTracing.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestHTableTracing.java @@ -17,11 +17,12 @@ */ package org.apache.hadoop.hbase.client; -import static org.apache.hadoop.hbase.client.TestAsyncTableTracing.buildBaseAttributesMatcher; import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasEnded; import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasKind; import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasName; import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasStatusWithCode; +import static org.apache.hadoop.hbase.client.trace.hamcrest.TraceTestUtil.buildConnectionAttributesMatcher; +import static org.apache.hadoop.hbase.client.trace.hamcrest.TraceTestUtil.buildTableAttributesMatcher; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.allOf; import static org.hamcrest.Matchers.containsString; @@ -238,6 +239,8 @@ private void assertTrace(String tableOperation) { } private void assertTrace(String tableOperation, Matcher matcher) { + // n.b. this method implementation must match the one of the same name found in + // TestAsyncTableTracing final TableName tableName = table.getName(); final Matcher spanLocator = allOf( hasName(containsString(tableOperation)), hasEnded()); @@ -255,7 +258,8 @@ private void assertTrace(String tableOperation, Matcher matcher) { hasName(expectedName), hasKind(SpanKind.CLIENT), hasStatusWithCode(StatusCode.OK), - buildBaseAttributesMatcher(tableName), + buildConnectionAttributesMatcher(conn), + buildTableAttributesMatcher(tableName), matcher)); } diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestRegionLocatorTracing.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestRegionLocatorTracing.java index 8d93d2ae2975..a0415da28d05 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestRegionLocatorTracing.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestRegionLocatorTracing.java @@ -17,9 +17,23 @@ */ package org.apache.hadoop.hbase.client; +import static org.apache.hadoop.hbase.client.trace.hamcrest.AttributesMatchers.containsEntryWithStringValuesOf; +import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasAttributes; +import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasKind; +import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasStatusWithCode; +import static org.apache.hadoop.hbase.client.trace.hamcrest.TraceTestUtil.buildConnectionAttributesMatcher; +import static org.apache.hadoop.hbase.client.trace.hamcrest.TraceTestUtil.buildTableAttributesMatcher; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.allOf; +import static org.hamcrest.Matchers.containsInAnyOrder; +import io.opentelemetry.api.trace.SpanKind; +import io.opentelemetry.api.trace.StatusCode; +import io.opentelemetry.sdk.trace.data.SpanData; import java.io.IOException; +import java.util.Arrays; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.security.UserProvider; import org.apache.hadoop.hbase.testclassification.ClientTests; @@ -51,33 +65,65 @@ public void tearDown() throws IOException { Closeables.close(conn, true); } - @Test public void testGetRegionLocation() throws IOException { conn.getRegionLocator(TableName.META_TABLE_NAME).getRegionLocation(HConstants.EMPTY_START_ROW); - assertTrace(HRegionLocator.class.getSimpleName(), "getRegionLocation", - null, TableName.META_TABLE_NAME); + SpanData span = waitSpan("HRegionLocator.getRegionLocation"); + assertThat(span, allOf( + hasStatusWithCode(StatusCode.OK), + hasKind(SpanKind.CLIENT), + buildConnectionAttributesMatcher(conn), + buildTableAttributesMatcher(TableName.META_TABLE_NAME), + hasAttributes( + containsEntryWithStringValuesOf("db.hbase.regions", + META_REGION_LOCATION.getDefaultRegionLocation().getRegion().getRegionNameAsString())))); } @Test public void testGetRegionLocations() throws IOException { conn.getRegionLocator(TableName.META_TABLE_NAME).getRegionLocations(HConstants.EMPTY_START_ROW); - assertTrace(HRegionLocator.class.getSimpleName(), "getRegionLocations", - null, TableName.META_TABLE_NAME); + SpanData span = waitSpan("HRegionLocator.getRegionLocations"); + // TODO: Use a value of `META_REGION_LOCATION` that contains multiple region locations. + String[] expectedRegions = Arrays.stream(META_REGION_LOCATION.getRegionLocations()) + .map(HRegionLocation::getRegion) + .map(RegionInfo::getRegionNameAsString) + .toArray(String[]::new); + assertThat(span, allOf( + hasStatusWithCode(StatusCode.OK), + hasKind(SpanKind.CLIENT), + buildConnectionAttributesMatcher(conn), + buildTableAttributesMatcher(TableName.META_TABLE_NAME), + hasAttributes( + containsEntryWithStringValuesOf("db.hbase.regions", containsInAnyOrder(expectedRegions))))); } @Test public void testGetAllRegionLocations() throws IOException { conn.getRegionLocator(TableName.META_TABLE_NAME).getAllRegionLocations(); - assertTrace(HRegionLocator.class.getSimpleName(), "getAllRegionLocations", - null, TableName.META_TABLE_NAME); + SpanData span = waitSpan("HRegionLocator.getAllRegionLocations"); + // TODO: Use a value of `META_REGION_LOCATION` that contains multiple region locations. + String[] expectedRegions = Arrays.stream(META_REGION_LOCATION.getRegionLocations()) + .map(HRegionLocation::getRegion) + .map(RegionInfo::getRegionNameAsString) + .toArray(String[]::new); + assertThat(span, allOf( + hasStatusWithCode(StatusCode.OK), + hasKind(SpanKind.CLIENT), + buildConnectionAttributesMatcher(conn), + buildTableAttributesMatcher(TableName.META_TABLE_NAME), + hasAttributes( + containsEntryWithStringValuesOf("db.hbase.regions", containsInAnyOrder(expectedRegions))))); } @Test public void testClearRegionLocationCache() throws IOException { conn.getRegionLocator(TableName.META_TABLE_NAME).clearRegionLocationCache(); - assertTrace(HRegionLocator.class.getSimpleName(), "clearRegionLocationCache", - null, TableName.META_TABLE_NAME); + SpanData span = waitSpan("HRegionLocator.clearRegionLocationCache"); + assertThat(span, allOf( + hasStatusWithCode(StatusCode.OK), + hasKind(SpanKind.CLIENT), + buildConnectionAttributesMatcher(conn), + buildTableAttributesMatcher(TableName.META_TABLE_NAME))); } } diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestTracingBase.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestTracingBase.java index a172733faed6..c2067e75c671 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestTracingBase.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestTracingBase.java @@ -17,6 +17,10 @@ */ package org.apache.hadoop.hbase.client; +import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasEnded; +import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasName; +import static org.hamcrest.Matchers.allOf; +import static org.hamcrest.Matchers.hasItem; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import io.opentelemetry.api.trace.SpanKind; @@ -30,15 +34,20 @@ import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.MatcherPredicate; import org.apache.hadoop.hbase.RegionLocations; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.Waiter; import org.apache.hadoop.hbase.trace.HBaseSemanticAttributes; +import org.hamcrest.Matcher; import org.junit.Before; import org.junit.ClassRule; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class TestTracingBase { + private static final Logger LOG = LoggerFactory.getLogger(TestTracingBase.class); protected static final ServerName MASTER_HOST = ServerName.valueOf("localhost", 16010, 12345); protected static final RegionLocations META_REGION_LOCATION = @@ -86,6 +95,28 @@ protected void assertTrace(String className, String methodName, ServerName serve } } + protected SpanData waitSpan(String name) { + return waitSpan(hasName(name)); + } + + protected SpanData waitSpan(Matcher matcher) { + Matcher spanLocator = allOf(matcher, hasEnded()); + try { + Waiter.waitFor(conf, 1000, new MatcherPredicate<>( + "waiting for span", + () -> TRACE_RULE.getSpans(), hasItem(spanLocator))); + } catch (AssertionError e) { + LOG.error("AssertionError while waiting for matching span. Span reservoir contains: {}", + TRACE_RULE.getSpans()); + throw e; + } + return TRACE_RULE.getSpans() + .stream() + .filter(spanLocator::matches) + .findFirst() + .orElseThrow(AssertionError::new); + } + static class RegistryForTracingTest implements ConnectionRegistry { public RegistryForTracingTest(Configuration conf) { @@ -106,6 +137,10 @@ public CompletableFuture getActiveMaster() { return CompletableFuture.completedFuture(MASTER_HOST); } + @Override public String getConnectionString() { + return "nothing"; + } + @Override public void close() { } diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/trace/hamcrest/AttributesMatchers.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/trace/hamcrest/AttributesMatchers.java index c3bf3bee59e5..c7bb205076cd 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/trace/hamcrest/AttributesMatchers.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/trace/hamcrest/AttributesMatchers.java @@ -22,6 +22,7 @@ import static org.hamcrest.Matchers.hasProperty; import io.opentelemetry.api.common.AttributeKey; import io.opentelemetry.api.common.Attributes; +import java.util.Arrays; import org.hamcrest.Description; import org.hamcrest.Matcher; import org.hamcrest.TypeSafeMatcher; @@ -48,6 +49,17 @@ public static Matcher containsEntry(String key, String value) { return containsEntry(AttributeKey.stringKey(key), value); } + public static Matcher containsEntryWithStringValuesOf(String key, String... values) { + return containsEntry(AttributeKey.stringArrayKey(key), Arrays.asList(values)); + } + + public static Matcher containsEntryWithStringValuesOf( + String key, + Matcher> matcher + ) { + return new IsAttributesContaining<>(equalTo(AttributeKey.stringArrayKey(key)), matcher); + } + private static final class IsAttributesContaining extends TypeSafeMatcher { private final Matcher> keyMatcher; private final Matcher valueMatcher; diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/trace/hamcrest/TraceTestUtil.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/trace/hamcrest/TraceTestUtil.java new file mode 100644 index 000000000000..990621ad3a65 --- /dev/null +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/trace/hamcrest/TraceTestUtil.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.client.trace.hamcrest; + +import static org.apache.hadoop.hbase.client.trace.hamcrest.AttributesMatchers.containsEntry; +import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasAttributes; +import static org.hamcrest.Matchers.allOf; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.sdk.trace.data.SpanData; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.AsyncConnectionImpl; +import org.apache.hadoop.hbase.client.ConnectionImplementation; +import org.hamcrest.Matcher; + +public final class TraceTestUtil { + + private TraceTestUtil() { } + + /** + * All {@link Span}s involving {@code conn} should include these attributes. + */ + public static Matcher buildConnectionAttributesMatcher(AsyncConnectionImpl conn) { + return hasAttributes(allOf( + containsEntry("db.system", "hbase"), + containsEntry("db.connection_string", "nothing"), + containsEntry("db.user", conn.getUser().toString()))); + } + + /** + * @see #buildConnectionAttributesMatcher(AsyncConnectionImpl) + */ + public static Matcher buildConnectionAttributesMatcher(ConnectionImplementation conn) { + return hasAttributes(allOf( + containsEntry("db.system", "hbase"), + containsEntry("db.connection_string", "nothing"), + containsEntry("db.user", conn.getUser().toString()))); + } + + /** + * All {@link Span}s involving {@code tableName} should include these attributes. + */ + public static Matcher buildTableAttributesMatcher(TableName tableName) { + return hasAttributes(allOf( + containsEntry("db.name", tableName.getNamespaceAsString()), + containsEntry("db.hbase.namespace", tableName.getNamespaceAsString()), + containsEntry("db.hbase.table", tableName.getNameAsString()))); + } +} diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/trace/HBaseSemanticAttributes.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/trace/HBaseSemanticAttributes.java index 90c3c858a706..59c372f8a782 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/trace/HBaseSemanticAttributes.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/trace/HBaseSemanticAttributes.java @@ -28,6 +28,11 @@ */ @InterfaceAudience.Private public final class HBaseSemanticAttributes { + public static final AttributeKey DB_SYSTEM = SemanticAttributes.DB_SYSTEM; + public static final String DB_SYSTEM_VALUE = SemanticAttributes.DbSystemValues.HBASE; + public static final AttributeKey DB_CONNECTION_STRING = + SemanticAttributes.DB_CONNECTION_STRING; + public static final AttributeKey DB_USER = SemanticAttributes.DB_USER; public static final AttributeKey DB_NAME = SemanticAttributes.DB_NAME; public static final AttributeKey NAMESPACE_KEY = SemanticAttributes.DB_HBASE_NAMESPACE; public static final AttributeKey DB_OPERATION = SemanticAttributes.DB_OPERATION; diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/trace/TraceUtil.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/trace/TraceUtil.java index 1c428ae5608d..a01962e9946b 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/trace/TraceUtil.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/trace/TraceUtil.java @@ -17,8 +17,6 @@ */ package org.apache.hadoop.hbase.trace; -import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.NAMESPACE_KEY; -import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.TABLE_KEY; import io.opentelemetry.api.GlobalOpenTelemetry; import io.opentelemetry.api.trace.Span; import io.opentelemetry.api.trace.SpanKind; @@ -30,7 +28,6 @@ import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.function.Supplier; -import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.Version; import org.apache.hadoop.hbase.util.FutureUtils; import org.apache.yetus.audience.InterfaceAudience; @@ -52,15 +49,6 @@ public static Span createSpan(String name) { return createSpan(name, SpanKind.INTERNAL); } - /** - * Create a {@link SpanKind#INTERNAL} span and set table related attributes. - */ - public static Span createTableSpan(String spanName, TableName tableName) { - return createSpan(spanName) - .setAttribute(NAMESPACE_KEY, tableName.getNamespaceAsString()) - .setAttribute(TABLE_KEY, tableName.getNameAsString()); - } - /** * Create a span with the given {@code kind}. Notice that, OpenTelemetry only expects one * {@link SpanKind#CLIENT} span and one {@link SpanKind#SERVER} span for a traced request, so use diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/RegionServerRegistry.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/RegionServerRegistry.java index cdfbb6d925f4..93eb7e8b9996 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/RegionServerRegistry.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/RegionServerRegistry.java @@ -74,6 +74,11 @@ public CompletableFuture getActiveMaster() { return future; } + @Override + public String getConnectionString() { + return "short-circuit"; + } + @Override public void close() { // nothing From d7a0b8640837236f79972382468ff017710d1455 Mon Sep 17 00:00:00 2001 From: Nick Dimiduk Date: Thu, 6 Jan 2022 13:53:14 -0800 Subject: [PATCH 2/4] Address precommit static analysis nits --- .../hadoop/hbase/client/trace/ConnectionSpanBuilder.java | 1 - .../hadoop/hbase/client/trace/TableOperationSpanBuilder.java | 1 - .../org/apache/hadoop/hbase/client/TestAsyncTableTracing.java | 3 --- .../hadoop/hbase/client/trace/hamcrest/TraceTestUtil.java | 1 + 4 files changed, 1 insertion(+), 5 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/trace/ConnectionSpanBuilder.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/trace/ConnectionSpanBuilder.java index afb34cad15c2..93a9d8fd983f 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/trace/ConnectionSpanBuilder.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/trace/ConnectionSpanBuilder.java @@ -32,7 +32,6 @@ import java.util.function.Supplier; import org.apache.hadoop.hbase.client.AsyncConnectionImpl; import org.apache.hadoop.hbase.client.ClusterConnection; -import org.apache.hadoop.hbase.client.ConnectionImplementation; import org.apache.hadoop.hbase.trace.TraceUtil; import org.apache.yetus.audience.InterfaceAudience; diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/trace/TableOperationSpanBuilder.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/trace/TableOperationSpanBuilder.java index e8620de2a6c5..2b9314a7ee84 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/trace/TableOperationSpanBuilder.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/trace/TableOperationSpanBuilder.java @@ -32,7 +32,6 @@ import org.apache.hadoop.hbase.client.AsyncConnectionImpl; import org.apache.hadoop.hbase.client.CheckAndMutate; import org.apache.hadoop.hbase.client.ClusterConnection; -import org.apache.hadoop.hbase.client.ConnectionImplementation; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Increment; diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableTracing.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableTracing.java index d507a312edc1..05a8ec3e21d4 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableTracing.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableTracing.java @@ -17,8 +17,6 @@ */ package org.apache.hadoop.hbase.client; -import static org.apache.hadoop.hbase.client.trace.hamcrest.AttributesMatchers.containsEntry; -import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasAttributes; import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasEnded; import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasKind; import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasName; @@ -35,7 +33,6 @@ import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; -import io.opentelemetry.api.trace.Span; import io.opentelemetry.api.trace.SpanKind; import io.opentelemetry.api.trace.StatusCode; import io.opentelemetry.sdk.testing.junit4.OpenTelemetryRule; diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/trace/hamcrest/TraceTestUtil.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/trace/hamcrest/TraceTestUtil.java index 990621ad3a65..21d37e84c664 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/trace/hamcrest/TraceTestUtil.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/trace/hamcrest/TraceTestUtil.java @@ -43,6 +43,7 @@ public static Matcher buildConnectionAttributesMatcher(AsyncConnection } /** + * All {@link Span}s involving {@code conn} should include these attributes. * @see #buildConnectionAttributesMatcher(AsyncConnectionImpl) */ public static Matcher buildConnectionAttributesMatcher(ConnectionImplementation conn) { From b28307b10fa26d945d5caa87290db072a5d71475 Mon Sep 17 00:00:00 2001 From: Nick Dimiduk Date: Tue, 11 Jan 2022 18:45:51 -0800 Subject: [PATCH 3/4] undo changes in population of RegionLocation return values --- .../hadoop/hbase/client/HRegionLocator.java | 46 +++++++++++-------- 1 file changed, 28 insertions(+), 18 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HRegionLocator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HRegionLocator.java index 6b56b05e288e..b62f090fbf48 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HRegionLocator.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HRegionLocator.java @@ -25,10 +25,12 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.Objects; import java.util.function.Function; import java.util.function.Supplier; +import java.util.stream.Collectors; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.MetaTableAccessor; @@ -97,12 +99,28 @@ public List getAllRegionLocations() throws IOException { final Supplier supplier = new TableSpanBuilder(connection) .setName("HRegionLocator.getAllRegionLocations") .setTableName(tableName); - final RegionLocations locs = tracedLocationFuture(() -> { - final RegionLocations rlocs = listRegionLocations(); - connection.cacheLocation(tableName, rlocs); - return rlocs; - }, AsyncRegionLocator::getRegionNames, supplier); - return Arrays.asList(locs.getRegionLocations()); + return tracedLocationFuture(() -> { + ArrayList regions = new ArrayList<>(); + for (RegionLocations locations : listRegionLocations()) { + for (HRegionLocation location : locations.getRegionLocations()) { + regions.add(location); + } + connection.cacheLocation(tableName, locations); + } + return regions; + }, HRegionLocator::getRegionNames, supplier); + } + + private static List getRegionNames(List locations) { + if (CollectionUtils.isEmpty(locations)) { + return Collections.emptyList(); + } + return locations.stream() + .filter(Objects::nonNull) + .map(AsyncRegionLocator::getRegionNames) + .filter(Objects::nonNull) + .flatMap(List::stream) + .collect(Collectors.toList()); } @Override @@ -118,9 +136,10 @@ public TableName getName() { return this.tableName; } - private RegionLocations listRegionLocations() throws IOException { + private List listRegionLocations() throws IOException { if (TableName.isMetaTableName(tableName)) { - return connection.locateRegion(tableName, HConstants.EMPTY_START_ROW, false, true); + return Collections + .singletonList(connection.locateRegion(tableName, HConstants.EMPTY_START_ROW, false, true)); } final List regions = new ArrayList<>(); MetaTableAccessor.Visitor visitor = new MetaTableAccessor.TableVisitorBase(tableName) { @@ -135,16 +154,7 @@ public boolean visitInternal(Result result) throws IOException { } }; MetaTableAccessor.scanMetaForTableRegions(connection, visitor, tableName); - return consolidate(regions); - } - - private static RegionLocations consolidate(final List locations) { - final HRegionLocation[] consolidated = locations.stream() - .filter(Objects::nonNull) - .flatMap(locs -> Arrays.stream(locs.getRegionLocations())) - .filter(Objects::nonNull) - .toArray(HRegionLocation[]::new); - return new RegionLocations(consolidated); + return regions; } private T tracedLocationFuture( From b8d25a10c0823bc4f2a88c8a765af2ba18fd1fe1 Mon Sep 17 00:00:00 2001 From: Nick Dimiduk Date: Wed, 12 Jan 2022 09:14:31 -0800 Subject: [PATCH 4/4] PR Feedback Co-authored-by: Josh Elser --- .../java/org/apache/hadoop/hbase/client/ClusterConnection.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java index 921cc9f3ff65..277056137f76 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java @@ -337,7 +337,7 @@ List getLiveRegionServers(Supplier masterAddrTracker, in List getAllBootstrapNodes(ServerName regionServer) throws IOException; /** - * Get the {@link User} associated with this connection. Maybe be {@code null}. + * Get the {@link User} associated with this connection. May be {@code null}. */ User getUser();