Skip to content

Commit

Permalink
HBASE-26474 Implement connection-level attributes (apache#3952)
Browse files Browse the repository at this point in the history
Add support for `db.system`, `db.connection_string`, `db.user`.

Signed-off-by: Duo Zhang <zhangduo@apache.org>
  • Loading branch information
ndimiduk committed Jan 6, 2022
1 parent 755b3b4 commit 486eb4d
Show file tree
Hide file tree
Showing 26 changed files with 737 additions and 185 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,11 @@ public CompletableFuture<ServerName> getActiveMaster() {
getClass().getSimpleName() + ".getActiveMaster");
}

@Override
public String getConnectionString() {
return "unimplemented";
}

@Override
public void close() {
trace(() -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@
* The implementation of AsyncConnection.
*/
@InterfaceAudience.Private
class AsyncConnectionImpl implements AsyncConnection {
public class AsyncConnectionImpl implements AsyncConnection {

private static final Logger LOG = LoggerFactory.getLogger(AsyncConnectionImpl.class);

Expand Down Expand Up @@ -191,6 +191,14 @@ synchronized ChoreService getChoreService() {
return choreService;
}

public User getUser() {
return user;
}

public ConnectionRegistry getConnectionRegistry() {
return registry;
}

@Override
public Configuration getConfiguration() {
return conf;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
Expand All @@ -20,24 +20,27 @@
import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME;
import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.REGION_NAMES_KEY;
import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.SERVER_NAME_KEY;
import static org.apache.hadoop.hbase.trace.TraceUtil.createSpan;
import static org.apache.hadoop.hbase.trace.TraceUtil.createTableSpan;
import static org.apache.hadoop.hbase.util.FutureUtils.addListener;

import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.context.Scope;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.trace.ConnectionSpanBuilder;
import org.apache.hadoop.hbase.client.trace.TableSpanBuilder;
import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.util.Bytes;
Expand Down Expand Up @@ -95,9 +98,12 @@ private boolean isMeta(TableName tableName) {
return TableName.isMetaTableName(tableName);
}

private <T> CompletableFuture<T> tracedLocationFuture(Supplier<CompletableFuture<T>> action,
Function<T, List<String>> getRegionNames, TableName tableName, String methodName) {
Span span = createTableSpan("AsyncRegionLocator." + methodName, tableName);
private <T> CompletableFuture<T> tracedLocationFuture(
Supplier<CompletableFuture<T>> action,
Function<T, List<String>> getRegionNames,
Supplier<Span> spanSupplier
) {
final Span span = spanSupplier.get();
try (Scope scope = span.makeCurrent()) {
CompletableFuture<T> future = action.get();
FutureUtils.addListener(future, (resp, error) -> {
Expand All @@ -116,18 +122,30 @@ private <T> CompletableFuture<T> tracedLocationFuture(Supplier<CompletableFuture
}
}

private List<String> getRegionName(RegionLocations locs) {
List<String> names = new ArrayList<>();
for (HRegionLocation loc : locs.getRegionLocations()) {
if (loc != null) {
names.add(loc.getRegion().getRegionNameAsString());
}
static List<String> getRegionNames(RegionLocations locs) {
if (locs == null || locs.getRegionLocations() == null) {
return Collections.emptyList();
}
return names;
return Arrays.stream(locs.getRegionLocations())
.filter(Objects::nonNull)
.map(HRegionLocation::getRegion)
.map(RegionInfo::getRegionNameAsString)
.collect(Collectors.toList());
}

static List<String> getRegionNames(HRegionLocation location) {
return Optional.ofNullable(location)
.map(HRegionLocation::getRegion)
.map(RegionInfo::getRegionNameAsString)
.map(Collections::singletonList)
.orElseGet(Collections::emptyList);
}

CompletableFuture<RegionLocations> getRegionLocations(TableName tableName, byte[] row,
RegionLocateType type, boolean reload, long timeoutNs) {
final Supplier<Span> supplier = new TableSpanBuilder(conn)
.setName("AsyncRegionLocator.getRegionLocations")
.setTableName(tableName);
return tracedLocationFuture(() -> {
CompletableFuture<RegionLocations> future = isMeta(tableName) ?
metaRegionLocator.getRegionLocations(RegionReplicaUtil.DEFAULT_REPLICA_ID, reload) :
Expand All @@ -137,11 +155,14 @@ CompletableFuture<RegionLocations> getRegionLocations(TableName tableName, byte[
() -> "Timeout(" + TimeUnit.NANOSECONDS.toMillis(timeoutNs) +
"ms) waiting for region locations for " + tableName + ", row='" +
Bytes.toStringBinary(row) + "'");
}, this::getRegionName, tableName, "getRegionLocations");
}, AsyncRegionLocator::getRegionNames, supplier);
}

CompletableFuture<HRegionLocation> getRegionLocation(TableName tableName, byte[] row,
int replicaId, RegionLocateType type, boolean reload, long timeoutNs) {
final Supplier<Span> supplier = new TableSpanBuilder(conn)
.setName("AsyncRegionLocator.getRegionLocation")
.setTableName(tableName);
return tracedLocationFuture(() -> {
// meta region can not be split right now so we always call the same method.
// Change it later if the meta table can have more than one regions.
Expand Down Expand Up @@ -172,8 +193,7 @@ CompletableFuture<HRegionLocation> getRegionLocation(TableName tableName, byte[]
() -> "Timeout(" + TimeUnit.NANOSECONDS.toMillis(timeoutNs) +
"ms) waiting for region location for " + tableName + ", row='" +
Bytes.toStringBinary(row) + "', replicaId=" + replicaId);
}, loc -> Arrays.asList(loc.getRegion().getRegionNameAsString()), tableName,
"getRegionLocation");
}, AsyncRegionLocator::getRegionNames, supplier);
}

CompletableFuture<HRegionLocation> getRegionLocation(TableName tableName, byte[] row,
Expand Down Expand Up @@ -201,31 +221,38 @@ void updateCachedLocationOnError(HRegionLocation loc, Throwable exception) {
}

void clearCache(TableName tableName) {
Supplier<Span> supplier = new TableSpanBuilder(conn)
.setName("AsyncRegionLocator.clearCache")
.setTableName(tableName);
TraceUtil.trace(() -> {
LOG.debug("Clear meta cache for {}", tableName);
if (tableName.equals(META_TABLE_NAME)) {
metaRegionLocator.clearCache();
} else {
nonMetaRegionLocator.clearCache(tableName);
}
}, () -> createTableSpan("AsyncRegionLocator.clearCache", tableName));
}, supplier);
}

void clearCache(ServerName serverName) {
Supplier<Span> supplier = new ConnectionSpanBuilder(conn)
.setName("AsyncRegionLocator.clearCache")
.addAttribute(SERVER_NAME_KEY, serverName.getServerName());
TraceUtil.trace(() -> {
LOG.debug("Clear meta cache for {}", serverName);
metaRegionLocator.clearCache(serverName);
nonMetaRegionLocator.clearCache(serverName);
conn.getConnectionMetrics().ifPresent(MetricsConnection::incrMetaCacheNumClearServer);
}, () -> createSpan("AsyncRegionLocator.clearCache").setAttribute(SERVER_NAME_KEY,
serverName.getServerName()));
}, supplier);
}

void clearCache() {
Supplier<Span> supplier = new ConnectionSpanBuilder(conn)
.setName("AsyncRegionLocator.clearCache");
TraceUtil.trace(() -> {
metaRegionLocator.clearCache();
nonMetaRegionLocator.clearCache();
}, "AsyncRegionLocator.clearCache");
}, supplier);
}

AsyncNonMetaRegionLocator getNonMetaRegionLocator() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.security.User;
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
Expand Down Expand Up @@ -175,8 +176,7 @@ void updateCachedLocations(TableName tableName, byte[] regionName, byte[] rowkey
* question
* @throws IOException if a remote or network exception occurs
*/
HRegionLocation locateRegion(final byte[] regionName)
throws IOException;
HRegionLocation locateRegion(final byte[] regionName) throws IOException;

/**
* Gets the locations of all regions in the specified table, <i>tableName</i>.
Expand Down Expand Up @@ -335,4 +335,14 @@ List<ServerName> getLiveRegionServers(Supplier<ServerName> masterAddrTracker, in
* Get the bootstrap node list of another region server.
*/
List<ServerName> getAllBootstrapNodes(ServerName regionServer) throws IOException;

/**
* Get the {@link User} associated with this connection. Maybe be {@code null}.
*/
User getUser();

/**
* Get the {@link ConnectionRegistry} used to orient this cluster.
*/
ConnectionRegistry getConnectionRegistry();
}
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@
value="AT_OPERATION_SEQUENCE_ON_CONCURRENT_ABSTRACTION",
justification="Access to the conncurrent hash map is under a lock so should be fine.")
@InterfaceAudience.Private
class ConnectionImplementation implements ClusterConnection, Closeable {
public class ConnectionImplementation implements ClusterConnection, Closeable {
public static final String RETRIES_BY_SERVER_KEY = "hbase.client.retries.by.server";
private static final Logger LOG = LoggerFactory.getLogger(ConnectionImplementation.class);

Expand Down Expand Up @@ -513,6 +513,16 @@ public MetricsConnection getConnectionMetrics() {
return this.metrics;
}

@Override
public User getUser() {
return user;
}

@Override
public ConnectionRegistry getConnectionRegistry() {
return registry;
}

private ThreadPoolExecutor getBatchPool() {
if (batchPool == null) {
synchronized (this) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
* Internal use only.
*/
@InterfaceAudience.Private
interface ConnectionRegistry extends Closeable {
public interface ConnectionRegistry extends Closeable {

/**
* Get the location of meta region(s).
Expand All @@ -48,6 +48,13 @@ interface ConnectionRegistry extends Closeable {
*/
CompletableFuture<ServerName> getActiveMaster();

/**
* Return the connection string associated with this registry instance. This value is
* informational, used for annotating traces. Values returned may not be valid for establishing a
* working cluster connection.
*/
String getConnectionString();

/**
* Closes this instance and releases any system resources associated with it
*/
Expand Down
Loading

0 comments on commit 486eb4d

Please sign in to comment.