Skip to content

Commit

Permalink
HBASE-25454 Add trace support for connection registry (#2828)
Browse files Browse the repository at this point in the history
Signed-off-by: stack <stack@apache.org>
  • Loading branch information
Apache9 committed Mar 2, 2021
1 parent df1a8bb commit da4b212
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
package org.apache.hadoop.hbase.client;

import static org.apache.hadoop.hbase.HConstants.MASTER_ADDRS_KEY;
import static org.apache.hadoop.hbase.trace.TraceUtil.trace;
import static org.apache.hadoop.hbase.trace.TraceUtil.tracedFuture;
import static org.apache.hadoop.hbase.util.DNS.getHostname;
import static org.apache.hadoop.hbase.util.FutureUtils.addListener;

Expand Down Expand Up @@ -266,18 +268,23 @@ private static RegionLocations transformMetaRegionLocations(GetMetaRegionLocatio

@Override
public CompletableFuture<RegionLocations> getMetaRegionLocations() {
return this.<GetMetaRegionLocationsResponse> call((c, s, d) -> s.getMetaRegionLocations(c,
GetMetaRegionLocationsRequest.getDefaultInstance(), d), r -> r.getMetaLocationsCount() != 0,
"getMetaLocationsCount").thenApply(MasterRegistry::transformMetaRegionLocations);
return tracedFuture(
() -> this
.<GetMetaRegionLocationsResponse> call(
(c, s, d) -> s.getMetaRegionLocations(c,
GetMetaRegionLocationsRequest.getDefaultInstance(), d),
r -> r.getMetaLocationsCount() != 0, "getMetaLocationsCount")
.thenApply(MasterRegistry::transformMetaRegionLocations),
"MasterRegistry.getMetaRegionLocations");
}

@Override
public CompletableFuture<String> getClusterId() {
return this
return tracedFuture(() -> this
.<GetClusterIdResponse> call(
(c, s, d) -> s.getClusterId(c, GetClusterIdRequest.getDefaultInstance(), d),
GetClusterIdResponse::hasClusterId, "getClusterId()")
.thenApply(GetClusterIdResponse::getClusterId);
.thenApply(GetClusterIdResponse::getClusterId), "MasterRegistry.getClusterId");
}

private static boolean hasActiveMaster(GetMastersResponse resp) {
Expand All @@ -300,21 +307,23 @@ private static ServerName filterActiveMaster(GetMastersResponse resp) throws IOE

@Override
public CompletableFuture<ServerName> getActiveMaster() {
CompletableFuture<ServerName> future = new CompletableFuture<>();
addListener(call((c, s, d) -> s.getMasters(c, GetMastersRequest.getDefaultInstance(), d),
MasterRegistry::hasActiveMaster, "getMasters()"), (resp, ex) -> {
if (ex != null) {
future.completeExceptionally(ex);
}
ServerName result = null;
try {
result = filterActiveMaster((GetMastersResponse)resp);
} catch (IOException e) {
future.completeExceptionally(e);
}
future.complete(result);
});
return future;
return tracedFuture(() -> {
CompletableFuture<ServerName> future = new CompletableFuture<>();
addListener(call((c, s, d) -> s.getMasters(c, GetMastersRequest.getDefaultInstance(), d),
MasterRegistry::hasActiveMaster, "getMasters()"), (resp, ex) -> {
if (ex != null) {
future.completeExceptionally(ex);
}
ServerName result = null;
try {
result = filterActiveMaster((GetMastersResponse) resp);
} catch (IOException e) {
future.completeExceptionally(e);
}
future.complete(result);
});
return future;
}, "MasterRegistry.getActiveMaster");
}

private static List<ServerName> transformServerNames(GetMastersResponse resp) {
Expand All @@ -335,11 +344,13 @@ Set<ServerName> getParsedMasterServers() {

@Override
public void close() {
if (masterAddressRefresher != null) {
masterAddressRefresher.close();
}
if (rpcClient != null) {
rpcClient.close();
}
trace(() -> {
if (masterAddressRefresher != null) {
masterAddressRefresher.close();
}
if (rpcClient != null) {
rpcClient.close();
}
}, "MasterRegistry.close");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import static org.apache.hadoop.hbase.client.RegionReplicaUtil.getRegionInfoForDefaultReplica;
import static org.apache.hadoop.hbase.client.RegionReplicaUtil.getRegionInfoForReplica;
import static org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil.lengthOfPBMagic;
import static org.apache.hadoop.hbase.trace.TraceUtil.tracedFuture;
import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
import static org.apache.hadoop.hbase.zookeeper.ZKMetadata.removeMetaData;

Expand Down Expand Up @@ -94,7 +95,9 @@ private static String getClusterId(byte[] data) throws DeserializationException

@Override
public CompletableFuture<String> getClusterId() {
return getAndConvert(znodePaths.clusterIdZNode, ZKConnectionRegistry::getClusterId);
return tracedFuture(
() -> getAndConvert(znodePaths.clusterIdZNode, ZKConnectionRegistry::getClusterId),
"ZKConnectionRegistry.getClusterId");
}

ReadOnlyZKClient getZKClient() {
Expand Down Expand Up @@ -192,19 +195,20 @@ private void getMetaRegionLocation(CompletableFuture<RegionLocations> future,

@Override
public CompletableFuture<RegionLocations> getMetaRegionLocations() {
CompletableFuture<RegionLocations> future = new CompletableFuture<>();
addListener(
zk.list(znodePaths.baseZNode)
.thenApply(children -> children.stream()
return tracedFuture(() -> {
CompletableFuture<RegionLocations> future = new CompletableFuture<>();
addListener(
zk.list(znodePaths.baseZNode).thenApply(children -> children.stream()
.filter(c -> this.znodePaths.isMetaZNodePrefix(c)).collect(Collectors.toList())),
(metaReplicaZNodes, error) -> {
if (error != null) {
future.completeExceptionally(error);
return;
}
getMetaRegionLocation(future, metaReplicaZNodes);
});
return future;
(metaReplicaZNodes, error) -> {
if (error != null) {
future.completeExceptionally(error);
return;
}
getMetaRegionLocation(future, metaReplicaZNodes);
});
return future;
}, "ZKConnectionRegistry.getMetaRegionLocations");
}

private static ZooKeeperProtos.Master getMasterProto(byte[] data) throws IOException {
Expand All @@ -218,15 +222,17 @@ private static ZooKeeperProtos.Master getMasterProto(byte[] data) throws IOExcep

@Override
public CompletableFuture<ServerName> getActiveMaster() {
return getAndConvert(znodePaths.masterAddressZNode, ZKConnectionRegistry::getMasterProto)
return tracedFuture(
() -> getAndConvert(znodePaths.masterAddressZNode, ZKConnectionRegistry::getMasterProto)
.thenApply(proto -> {
if (proto == null) {
return null;
}
HBaseProtos.ServerName snProto = proto.getMaster();
return ServerName.valueOf(snProto.getHostName(), snProto.getPort(),
snProto.getStartCode());
});
}),
"ZKConnectionRegistry.getActiveMaster");
}

@Override
Expand Down

0 comments on commit da4b212

Please sign in to comment.