Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Backport "HBASE-26474 Implement connection-level attributes" to branch-2 #4014

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,11 @@ public CompletableFuture<ServerName> getActiveMaster() {
getClass().getSimpleName() + ".getActiveMaster");
}

@Override
public String getConnectionString() {
return "unimplemented";
ndimiduk marked this conversation as resolved.
Show resolved Hide resolved
}

@Override
public void close() {
trace(() -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@
* The implementation of AsyncConnection.
*/
@InterfaceAudience.Private
class AsyncConnectionImpl implements AsyncConnection {
public class AsyncConnectionImpl implements AsyncConnection {

private static final Logger LOG = LoggerFactory.getLogger(AsyncConnectionImpl.class);

Expand Down Expand Up @@ -191,6 +191,14 @@ synchronized ChoreService getChoreService() {
return choreService;
}

public User getUser() {
return user;
}

public ConnectionRegistry getConnectionRegistry() {
return registry;
}

@Override
public Configuration getConfiguration() {
return conf;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
Expand All @@ -20,24 +20,27 @@
import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME;
import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.REGION_NAMES_KEY;
import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.SERVER_NAME_KEY;
import static org.apache.hadoop.hbase.trace.TraceUtil.createSpan;
import static org.apache.hadoop.hbase.trace.TraceUtil.createTableSpan;
import static org.apache.hadoop.hbase.util.FutureUtils.addListener;

import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.context.Scope;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.trace.ConnectionSpanBuilder;
import org.apache.hadoop.hbase.client.trace.TableSpanBuilder;
import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.util.Bytes;
Expand Down Expand Up @@ -95,9 +98,12 @@ private boolean isMeta(TableName tableName) {
return TableName.isMetaTableName(tableName);
}

private <T> CompletableFuture<T> tracedLocationFuture(Supplier<CompletableFuture<T>> action,
Function<T, List<String>> getRegionNames, TableName tableName, String methodName) {
Span span = createTableSpan("AsyncRegionLocator." + methodName, tableName);
private <T> CompletableFuture<T> tracedLocationFuture(
Supplier<CompletableFuture<T>> action,
Function<T, List<String>> getRegionNames,
Supplier<Span> spanSupplier
) {
final Span span = spanSupplier.get();
try (Scope scope = span.makeCurrent()) {
CompletableFuture<T> future = action.get();
FutureUtils.addListener(future, (resp, error) -> {
Expand All @@ -116,18 +122,30 @@ private <T> CompletableFuture<T> tracedLocationFuture(Supplier<CompletableFuture
}
}

private List<String> getRegionName(RegionLocations locs) {
List<String> names = new ArrayList<>();
for (HRegionLocation loc : locs.getRegionLocations()) {
if (loc != null) {
names.add(loc.getRegion().getRegionNameAsString());
}
static List<String> getRegionNames(RegionLocations locs) {
if (locs == null || locs.getRegionLocations() == null) {
return Collections.emptyList();
}
return names;
return Arrays.stream(locs.getRegionLocations())
.filter(Objects::nonNull)
.map(HRegionLocation::getRegion)
.map(RegionInfo::getRegionNameAsString)
.collect(Collectors.toList());
}

static List<String> getRegionNames(HRegionLocation location) {
return Optional.ofNullable(location)
.map(HRegionLocation::getRegion)
.map(RegionInfo::getRegionNameAsString)
.map(Collections::singletonList)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will net an immutable list whereas Arrays.asList() (as previously used down below) was returning a mutable list. Hopefully we would not be altering this List, but noting a change in implementation to make sure it was intentional.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Noted. I had not considered mutability of the previous implementation. Consider this change to immutability a happy side-effect. At least on the master PR, this appears to have introduced no issues. Perhaps this is cause for some of the test failures I seem to have caused in the branch-2 backport. Let me investigate.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No test failures after reverting the erroneous changes pointed out by Huaxiang, so I'll take that as acceptability of the immutable collection. Shout if that's not okay by you.

.orElseGet(Collections::emptyList);
}

CompletableFuture<RegionLocations> getRegionLocations(TableName tableName, byte[] row,
RegionLocateType type, boolean reload, long timeoutNs) {
final Supplier<Span> supplier = new TableSpanBuilder(conn)
.setName("AsyncRegionLocator.getRegionLocations")
.setTableName(tableName);
return tracedLocationFuture(() -> {
CompletableFuture<RegionLocations> future = isMeta(tableName) ?
metaRegionLocator.getRegionLocations(RegionReplicaUtil.DEFAULT_REPLICA_ID, reload) :
Expand All @@ -137,11 +155,14 @@ CompletableFuture<RegionLocations> getRegionLocations(TableName tableName, byte[
() -> "Timeout(" + TimeUnit.NANOSECONDS.toMillis(timeoutNs) +
"ms) waiting for region locations for " + tableName + ", row='" +
Bytes.toStringBinary(row) + "'");
}, this::getRegionName, tableName, "getRegionLocations");
}, AsyncRegionLocator::getRegionNames, supplier);
}

CompletableFuture<HRegionLocation> getRegionLocation(TableName tableName, byte[] row,
int replicaId, RegionLocateType type, boolean reload, long timeoutNs) {
final Supplier<Span> supplier = new TableSpanBuilder(conn)
.setName("AsyncRegionLocator.getRegionLocation")
.setTableName(tableName);
return tracedLocationFuture(() -> {
// meta region can not be split right now so we always call the same method.
// Change it later if the meta table can have more than one regions.
Expand Down Expand Up @@ -172,8 +193,7 @@ CompletableFuture<HRegionLocation> getRegionLocation(TableName tableName, byte[]
() -> "Timeout(" + TimeUnit.NANOSECONDS.toMillis(timeoutNs) +
"ms) waiting for region location for " + tableName + ", row='" +
Bytes.toStringBinary(row) + "', replicaId=" + replicaId);
}, loc -> Arrays.asList(loc.getRegion().getRegionNameAsString()), tableName,
"getRegionLocation");
}, AsyncRegionLocator::getRegionNames, supplier);
}

CompletableFuture<HRegionLocation> getRegionLocation(TableName tableName, byte[] row,
Expand Down Expand Up @@ -201,31 +221,38 @@ void updateCachedLocationOnError(HRegionLocation loc, Throwable exception) {
}

void clearCache(TableName tableName) {
Supplier<Span> supplier = new TableSpanBuilder(conn)
.setName("AsyncRegionLocator.clearCache")
.setTableName(tableName);
TraceUtil.trace(() -> {
LOG.debug("Clear meta cache for {}", tableName);
if (tableName.equals(META_TABLE_NAME)) {
metaRegionLocator.clearCache();
} else {
nonMetaRegionLocator.clearCache(tableName);
}
}, () -> createTableSpan("AsyncRegionLocator.clearCache", tableName));
}, supplier);
}

void clearCache(ServerName serverName) {
Supplier<Span> supplier = new ConnectionSpanBuilder(conn)
.setName("AsyncRegionLocator.clearCache")
.addAttribute(SERVER_NAME_KEY, serverName.getServerName());
TraceUtil.trace(() -> {
LOG.debug("Clear meta cache for {}", serverName);
metaRegionLocator.clearCache(serverName);
nonMetaRegionLocator.clearCache(serverName);
conn.getConnectionMetrics().ifPresent(MetricsConnection::incrMetaCacheNumClearServer);
}, () -> createSpan("AsyncRegionLocator.clearCache").setAttribute(SERVER_NAME_KEY,
serverName.getServerName()));
}, supplier);
}

void clearCache() {
Supplier<Span> supplier = new ConnectionSpanBuilder(conn)
.setName("AsyncRegionLocator.clearCache");
TraceUtil.trace(() -> {
metaRegionLocator.clearCache();
nonMetaRegionLocator.clearCache();
}, "AsyncRegionLocator.clearCache");
}, supplier);
}

AsyncNonMetaRegionLocator getNonMetaRegionLocator() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.security.User;
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
Expand Down Expand Up @@ -175,8 +176,7 @@ void updateCachedLocations(TableName tableName, byte[] regionName, byte[] rowkey
* question
* @throws IOException if a remote or network exception occurs
*/
HRegionLocation locateRegion(final byte[] regionName)
throws IOException;
HRegionLocation locateRegion(final byte[] regionName) throws IOException;

/**
* Gets the locations of all regions in the specified table, <i>tableName</i>.
Expand Down Expand Up @@ -335,4 +335,14 @@ List<ServerName> getLiveRegionServers(Supplier<ServerName> masterAddrTracker, in
* Get the bootstrap node list of another region server.
*/
List<ServerName> getAllBootstrapNodes(ServerName regionServer) throws IOException;

/**
* Get the {@link User} associated with this connection. May be {@code null}.
*/
User getUser();

/**
* Get the {@link ConnectionRegistry} used to orient this cluster.
*/
ConnectionRegistry getConnectionRegistry();
}
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@
value="AT_OPERATION_SEQUENCE_ON_CONCURRENT_ABSTRACTION",
justification="Access to the conncurrent hash map is under a lock so should be fine.")
@InterfaceAudience.Private
class ConnectionImplementation implements ClusterConnection, Closeable {
public class ConnectionImplementation implements ClusterConnection, Closeable {
public static final String RETRIES_BY_SERVER_KEY = "hbase.client.retries.by.server";
private static final Logger LOG = LoggerFactory.getLogger(ConnectionImplementation.class);

Expand Down Expand Up @@ -513,6 +513,16 @@ public MetricsConnection getConnectionMetrics() {
return this.metrics;
}

@Override
public User getUser() {
return user;
}

@Override
public ConnectionRegistry getConnectionRegistry() {
return registry;
}

private ThreadPoolExecutor getBatchPool() {
if (batchPool == null) {
synchronized (this) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
* Internal use only.
*/
@InterfaceAudience.Private
interface ConnectionRegistry extends Closeable {
public interface ConnectionRegistry extends Closeable {

/**
* Get the location of meta region(s).
Expand All @@ -48,6 +48,13 @@ interface ConnectionRegistry extends Closeable {
*/
CompletableFuture<ServerName> getActiveMaster();

/**
* Return the connection string associated with this registry instance. This value is
* informational, used for annotating traces. Values returned may not be valid for establishing a
* working cluster connection.
*/
String getConnectionString();

/**
* Closes this instance and releases any system resources associated with it
*/
Expand Down
Loading