HBASE-23898 Add trace support for simple apis in async client (#2813)
Signed-off-by: Guanghao Zhang <zghao@apache.org>
Apache9 committed Apr 9, 2021
1 parent 8874284 commit 852bbd3
Showing 16 changed files with 1,169 additions and 346 deletions.
10 changes: 10 additions & 0 deletions hbase-client/pom.xml
@@ -144,6 +144,16 @@
       <groupId>org.jruby.joni</groupId>
       <artifactId>joni</artifactId>
     </dependency>
+    <dependency>
+      <groupId>io.opentelemetry</groupId>
+      <artifactId>opentelemetry-sdk</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>io.opentelemetry</groupId>
+      <artifactId>opentelemetry-sdk-testing</artifactId>
+      <scope>test</scope>
+    </dependency>
     <dependency>
       <groupId>org.slf4j</groupId>
       <artifactId>jcl-over-slf4j</artifactId>
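The opentelemetry-sdk-testing artifact pulled in at test scope ships JUnit helpers such as OpenTelemetryRule, which installs an in-memory span exporter so client tests can assert on the spans the code under test emits, while production code keeps programming against the plain OpenTelemetry API. A minimal sketch of how such a test could look; the test class name and the exercised operation are illustrative and not taken from this commit, only the asserted span name ("AsyncConnection.close") comes from the diff below.

import static org.junit.Assert.assertTrue;

import io.opentelemetry.sdk.testing.junit4.OpenTelemetryRule;
import io.opentelemetry.sdk.trace.data.SpanData;
import org.junit.Rule;
import org.junit.Test;

public class TestAsyncConnectionTracing {

  // Registers an in-memory tracer for the test and clears recorded spans between tests.
  @Rule
  public final OpenTelemetryRule traceRule = OpenTelemetryRule.create();

  @Test
  public void testCloseEmitsSpan() throws Exception {
    // ... create an AsyncConnection against a test cluster and close it here ...

    // Finished spans are exposed as SpanData, so span names can be asserted directly.
    assertTrue(traceRule.getSpans().stream().map(SpanData::getName)
      .anyMatch("AsyncConnection.close"::equals));
  }
}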
hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnection.java

@@ -64,8 +64,8 @@ public interface AsyncConnection extends Closeable {
   /**
    * Retrieve an {@link AsyncTable} implementation for accessing a table.
    * <p>
-   * The returned instance will use default configs. Use {@link #getTableBuilder(TableName)} if
-   * you want to customize some configs.
+   * The returned instance will use default configs. Use {@link #getTableBuilder(TableName)} if you
+   * want to customize some configs.
    * <p>
    * This method no longer checks table existence. An exception will be thrown if the table does not
    * exist only when the first operation is attempted.
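For context on the javadoc being reflowed here: getTable(TableName) hands back a table using the connection's default configs, while getTableBuilder(TableName) lets callers override per-table settings before building, and a missing table only surfaces when the first operation runs. A small illustrative sketch; the helper class, table name, timeout and retry values are made up for this example.

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.AdvancedScanResultConsumer;
import org.apache.hadoop.hbase.client.AsyncConnection;
import org.apache.hadoop.hbase.client.AsyncTable;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;

public class AsyncTableUsageSketch {

  static CompletableFuture<Result> example(AsyncConnection conn) {
    TableName tableName = TableName.valueOf("example_table");

    // Default configs; note that table existence is NOT checked here.
    AsyncTable<AdvancedScanResultConsumer> table = conn.getTable(tableName);

    // Customized configs via the builder.
    AsyncTable<AdvancedScanResultConsumer> tunedTable = conn.getTableBuilder(tableName)
      .setRpcTimeout(2, TimeUnit.SECONDS)
      .setMaxRetries(3)
      .build();

    // A nonexistent table shows up as a failed future on the first actual operation.
    return tunedTable.get(new Get(Bytes.toBytes("row-1")));
  }
}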
hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
@@ -27,6 +27,8 @@
 import static org.apache.hadoop.hbase.client.NonceGenerator.CLIENT_NONCES_ENABLED_KEY;
 import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
 
+import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.context.Scope;
 import java.io.IOException;
 import java.net.SocketAddress;
 import java.util.Optional;
@@ -51,6 +53,7 @@
 import org.apache.hadoop.hbase.ipc.RpcClientFactory;
 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
 import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.trace.TraceUtil;
 import org.apache.hadoop.hbase.util.ConcurrentMapUtils;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -121,7 +124,7 @@ class AsyncConnectionImpl implements AsyncConnection {
   private volatile ConnectionOverAsyncConnection conn;
 
   public AsyncConnectionImpl(Configuration conf, ConnectionRegistry registry, String clusterId,
-      SocketAddress localAddress, User user) {
+    SocketAddress localAddress, User user) {
     this.conf = conf;
     this.user = user;
 
@@ -135,8 +138,8 @@ public AsyncConnectionImpl(Configuration conf, ConnectionRegistry registry, String clusterId,
     } else {
       this.metrics = Optional.empty();
     }
-    this.rpcClient = RpcClientFactory.createClient(
-      conf, clusterId, localAddress, metrics.orElse(null));
+    this.rpcClient =
+      RpcClientFactory.createClient(conf, clusterId, localAddress, metrics.orElse(null));
     this.rpcControllerFactory = RpcControllerFactory.instantiate(conf);
     this.rpcTimeout =
       (int) Math.min(Integer.MAX_VALUE, TimeUnit.NANOSECONDS.toMillis(connConf.getRpcTimeoutNs()));
@@ -159,14 +162,13 @@ public AsyncConnectionImpl(Configuration conf, ConnectionRegistry registry, String clusterId,
       LOG.warn("{} is true, but {} is not set", STATUS_PUBLISHED, STATUS_LISTENER_CLASS);
     } else {
       try {
-        listener = new ClusterStatusListener(
-          new ClusterStatusListener.DeadServerHandler() {
-            @Override
-            public void newDead(ServerName sn) {
-              locator.clearCache(sn);
-              rpcClient.cancelConnections(sn);
-            }
-          }, conf, listenerClass);
+        listener = new ClusterStatusListener(new ClusterStatusListener.DeadServerHandler() {
+          @Override
+          public void newDead(ServerName sn) {
+            locator.clearCache(sn);
+            rpcClient.cancelConnections(sn);
+          }
+        }, conf, listenerClass);
       } catch (IOException e) {
         LOG.warn("Failed create of ClusterStatusListener, not a critical, ignoring...", e);
       }
@@ -206,28 +208,30 @@ public boolean isClosed() {
 
   @Override
   public void close() {
-    if (!closed.compareAndSet(false, true)) {
-      return;
-    }
-    LOG.info("Connection has been closed by {}.", Thread.currentThread().getName());
-    if(LOG.isDebugEnabled()){
-      logCallStack(Thread.currentThread().getStackTrace());
-    }
-    IOUtils.closeQuietly(clusterStatusListener,
-      e -> LOG.warn("failed to close clusterStatusListener", e));
-    IOUtils.closeQuietly(rpcClient, e -> LOG.warn("failed to close rpcClient", e));
-    IOUtils.closeQuietly(registry, e -> LOG.warn("failed to close registry", e));
-    synchronized (this) {
-      if (choreService != null) {
-        choreService.shutdown();
-        choreService = null;
+    TraceUtil.trace(() -> {
+      if (!closed.compareAndSet(false, true)) {
+        return;
       }
-    }
-    metrics.ifPresent(MetricsConnection::shutdown);
-    ConnectionOverAsyncConnection c = this.conn;
-    if (c != null) {
-      c.closePool();
-    }
+      LOG.info("Connection has been closed by {}.", Thread.currentThread().getName());
+      if (LOG.isDebugEnabled()) {
+        logCallStack(Thread.currentThread().getStackTrace());
+      }
+      IOUtils.closeQuietly(clusterStatusListener,
+        e -> LOG.warn("failed to close clusterStatusListener", e));
+      IOUtils.closeQuietly(rpcClient, e -> LOG.warn("failed to close rpcClient", e));
+      IOUtils.closeQuietly(registry, e -> LOG.warn("failed to close registry", e));
+      synchronized (this) {
+        if (choreService != null) {
+          choreService.shutdown();
+          choreService = null;
+        }
+      }
+      metrics.ifPresent(MetricsConnection::shutdown);
+      ConnectionOverAsyncConnection c = this.conn;
+      if (c != null) {
+        c.closePool();
+      }
+    }, "AsyncConnection.close");
   }
 
   private void logCallStack(StackTraceElement[] stackTraceElements) {
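The TraceUtil.trace(...) helper used in close() above belongs to this change set but its body is not part of the hunks shown here. As a rough sketch of the pattern with the plain OpenTelemetry API — the class name, tracer name and error handling below are illustrative assumptions, not HBase's actual TraceUtil implementation — wrapping a synchronous action in a named span looks roughly like this:

import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.context.Scope;

public final class TraceRunnableSketch {

  private TraceRunnableSketch() {
  }

  // Wraps a synchronous action in a span. The span is made current so nested
  // instrumented calls are recorded as children, and it is always ended, even
  // when the action throws.
  public static void trace(Runnable action, String spanName) {
    Span span =
      GlobalOpenTelemetry.getTracer("org.apache.hbase.sketch").spanBuilder(spanName).startSpan();
    try (Scope ignored = span.makeCurrent()) {
      action.run();
      span.setStatus(StatusCode.OK);
    } catch (Throwable t) {
      span.recordException(t);
      span.setStatus(StatusCode.ERROR);
      throw t;
    } finally {
      span.end();
    }
  }
}

Making the span current for the duration of the runnable is what allows any nested instrumented client calls to show up as children of the "AsyncConnection.close" span.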
@@ -341,7 +345,7 @@ public AsyncTable<AdvancedScanResultConsumer> build() {
 
   @Override
   public AsyncTableBuilder<ScanResultConsumer> getTableBuilder(TableName tableName,
-      ExecutorService pool) {
+    ExecutorService pool) {
     return new AsyncTableBuilderBase<ScanResultConsumer>(tableName, connConf) {
 
       @Override
@@ -382,7 +386,7 @@ public AsyncBufferedMutatorBuilder getBufferedMutatorBuilder(TableName tableName,
 
   @Override
   public AsyncBufferedMutatorBuilder getBufferedMutatorBuilder(TableName tableName,
-      ExecutorService pool) {
+    ExecutorService pool) {
     return new AsyncBufferedMutatorBuilderImpl(connConf, getTableBuilder(tableName, pool),
       RETRY_TIMER);
   }
@@ -406,28 +410,36 @@ public Connection toConnection() {
 
   @Override
   public CompletableFuture<Hbck> getHbck() {
-    CompletableFuture<Hbck> future = new CompletableFuture<>();
-    addListener(registry.getActiveMaster(), (sn, error) -> {
-      if (error != null) {
-        future.completeExceptionally(error);
-      } else {
-        try {
-          future.complete(getHbck(sn));
-        } catch (IOException e) {
-          future.completeExceptionally(e);
+    return TraceUtil.tracedFuture(() -> {
+      CompletableFuture<Hbck> future = new CompletableFuture<>();
+      addListener(registry.getActiveMaster(), (sn, error) -> {
+        if (error != null) {
+          future.completeExceptionally(error);
+        } else {
+          try {
+            future.complete(getHbck(sn));
+          } catch (IOException e) {
+            future.completeExceptionally(e);
+          }
         }
-      }
-    });
-    return future;
+      });
+      return future;
+    }, getClass().getName() + ".getHbck");
   }
 
   @Override
   public Hbck getHbck(ServerName masterServer) throws IOException {
-    // we will not create a new connection when creating a new protobuf stub, and for hbck there
-    // will be no performance consideration, so for simplification we will create a new stub every
-    // time instead of caching the stub here.
-    return new HBaseHbck(MasterProtos.HbckService.newBlockingStub(
-      rpcClient.createBlockingRpcChannel(masterServer, user, rpcTimeout)), rpcControllerFactory);
+    Span span = TraceUtil.createSpan(getClass().getName() + ".getHbck")
+      .setAttribute(TraceUtil.SERVER_NAME_KEY, masterServer.getServerName());
+    try (Scope scope = span.makeCurrent()) {
+      // we will not create a new connection when creating a new protobuf stub, and for hbck there
+      // will be no performance consideration, so for simplification we will create a new stub every
+      // time instead of caching the stub here.
+      return new HBaseHbck(
+        MasterProtos.HbckService
+          .newBlockingStub(rpcClient.createBlockingRpcChannel(masterServer, user, rpcTimeout)),
+        rpcControllerFactory);
+    }
+  }
 
   Optional<MetricsConnection> getConnectionMetrics() {
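getHbck() returns as soon as the future is created, so the span has to stay open until the master lookup actually completes; that is the job of a helper like TraceUtil.tracedFuture. A rough sketch of that pattern with the plain OpenTelemetry API — again the class name, tracer name and status handling are illustrative assumptions, not the TraceUtil code added by this commit:

import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.context.Scope;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

public final class TraceFutureSketch {

  private TraceFutureSketch() {
  }

  // Traces an asynchronous call: the span is ended only once the returned
  // future completes, so its duration covers the whole operation rather than
  // just the synchronous part of the method.
  public static <T> CompletableFuture<T> tracedFuture(Supplier<CompletableFuture<T>> action,
      String spanName) {
    Span span =
      GlobalOpenTelemetry.getTracer("org.apache.hbase.sketch").spanBuilder(spanName).startSpan();
    try (Scope ignored = span.makeCurrent()) {
      CompletableFuture<T> future = action.get();
      future.whenComplete((result, error) -> {
        if (error != null) {
          span.recordException(error);
          span.setStatus(StatusCode.ERROR);
        } else {
          span.setStatus(StatusCode.OK);
        }
        span.end();
      });
      return future;
    } catch (Throwable t) {
      // If the supplier itself fails, close out the span before rethrowing.
      span.recordException(t);
      span.setStatus(StatusCode.ERROR);
      span.end();
      throw t;
    }
  }
}

The synchronous getHbck(ServerName) variant shown in the diff above instead opens a span directly, attaches the target server name as a span attribute, and uses the try-with-resources Scope to restore the previous context when the method returns.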
(Diffs for the remaining changed files are not shown here.)
