diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdminRequestRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdminRequestRetryingCaller.java index d3bec8b3cfbf..f7fa7e9f03fa 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdminRequestRetryingCaller.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdminRequestRetryingCaller.java @@ -20,6 +20,7 @@ import static org.apache.hadoop.hbase.util.FutureUtils.addListener; import java.io.IOException; +import java.util.Collections; import java.util.concurrent.CompletableFuture; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.ipc.HBaseRpcController; @@ -44,7 +45,7 @@ public AsyncAdminRequestRetryingCaller(Timer retryTimer, AsyncConnectionImpl con long pauseNs, long pauseNsForServerOverloaded, int maxAttempts, long operationTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt, ServerName serverName, Callable callable) { super(retryTimer, conn, priority, pauseNs, pauseNsForServerOverloaded, maxAttempts, - operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt); + operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt, Collections.emptyMap()); this.serverName = serverName; this.callable = callable; } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java index 7a8bbeb9420b..c485a0a2c05c 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java @@ -114,6 +114,8 @@ class AsyncBatchRpcRetryingCaller { private final HBaseServerExceptionPauseManager pauseManager; + private final Map requestAttributes; + // we can not use HRegionLocation as the map key because the hashCode and equals method of // HRegionLocation only consider serverName. private static final class RegionRequest { @@ -149,7 +151,8 @@ public int getPriority() { public AsyncBatchRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn, TableName tableName, List actions, long pauseNs, long pauseNsForServerOverloaded, - int maxAttempts, long operationTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) { + int maxAttempts, long operationTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt, + Map requestAttributes) { this.retryTimer = retryTimer; this.conn = conn; this.tableName = tableName; @@ -180,6 +183,7 @@ public AsyncBatchRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn, this.startNs = System.nanoTime(); this.pauseManager = new HBaseServerExceptionPauseManager(pauseNs, pauseNsForServerOverloaded, operationTimeoutNs); + this.requestAttributes = requestAttributes; } private static boolean hasIncrementOrAppend(Row action) { @@ -392,6 +396,7 @@ private void sendToServer(ServerName serverName, ServerRequest serverReq, int tr HBaseRpcController controller = conn.rpcControllerFactory.newController(); resetController(controller, Math.min(rpcTimeoutNs, remainingNs), calcPriority(serverReq.getPriority(), tableName)); + controller.setRequestAttributes(requestAttributes); if (!cells.isEmpty()) { controller.setCellScanner(createCellScanner(cells)); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java index ed381df7e0da..b61f5b80c9e7 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java @@ -32,6 +32,7 @@ import io.opentelemetry.api.trace.StatusCode; import io.opentelemetry.context.Scope; import java.io.IOException; +import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -92,9 +93,12 @@ class AsyncClientScanner { private final Span span; + private final Map requestAttributes; + public AsyncClientScanner(Scan scan, AdvancedScanResultConsumer consumer, TableName tableName, AsyncConnectionImpl conn, Timer retryTimer, long pauseNs, long pauseNsForServerOverloaded, - int maxAttempts, long scanTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) { + int maxAttempts, long scanTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt, + Map requestAttributes) { if (scan.getStartRow() == null) { scan.withStartRow(EMPTY_START_ROW, scan.includeStartRow()); } @@ -113,6 +117,7 @@ public AsyncClientScanner(Scan scan, AdvancedScanResultConsumer consumer, TableN this.rpcTimeoutNs = rpcTimeoutNs; this.startLogErrorsCnt = startLogErrorsCnt; this.resultCache = createScanResultCache(scan); + this.requestAttributes = requestAttributes; if (scan.isScanMetricsEnabled()) { this.scanMetrics = new ScanMetrics(); consumer.onScanMetricsCreated(scanMetrics); @@ -191,15 +196,17 @@ private CompletableFuture callOpenScanner(HBaseRpcControlle } private void startScan(OpenScannerResponse resp) { - addListener(conn.callerFactory.scanSingleRegion().id(resp.resp.getScannerId()) - .location(resp.loc).remote(resp.isRegionServerRemote) - .scannerLeaseTimeoutPeriod(resp.resp.getTtl(), TimeUnit.MILLISECONDS).stub(resp.stub) - .setScan(scan).metrics(scanMetrics).consumer(consumer).resultCache(resultCache) - .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS) - .scanTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS) - .pauseForServerOverloaded(pauseNsForServerOverloaded, TimeUnit.NANOSECONDS) - .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt) - .start(resp.controller, resp.resp), (hasMore, error) -> { + addListener( + conn.callerFactory.scanSingleRegion().id(resp.resp.getScannerId()).location(resp.loc) + .remote(resp.isRegionServerRemote) + .scannerLeaseTimeoutPeriod(resp.resp.getTtl(), TimeUnit.MILLISECONDS).stub(resp.stub) + .setScan(scan).metrics(scanMetrics).consumer(consumer).resultCache(resultCache) + .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS) + .scanTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS) + .pauseForServerOverloaded(pauseNsForServerOverloaded, TimeUnit.NANOSECONDS) + .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt) + .setRequestAttributes(requestAttributes).start(resp.controller, resp.resp), + (hasMore, error) -> { try (Scope ignored = span.makeCurrent()) { if (error != null) { try { @@ -231,8 +238,8 @@ private CompletableFuture openScanner(int replicaId) { .priority(scan.getPriority()).rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS) .operationTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS) .pauseForServerOverloaded(pauseNsForServerOverloaded, TimeUnit.NANOSECONDS) - .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt).action(this::callOpenScanner) - .call(); + .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt) + .setRequestAttributes(requestAttributes).action(this::callOpenScanner).call(); } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java index 3af574cfc0b2..4900581c69ad 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java @@ -31,6 +31,8 @@ import io.opentelemetry.api.trace.Span; import java.io.IOException; import java.net.SocketAddress; +import java.util.Collections; +import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; @@ -127,6 +129,11 @@ public class AsyncConnectionImpl implements AsyncConnection { public AsyncConnectionImpl(Configuration conf, ConnectionRegistry registry, String clusterId, SocketAddress localAddress, User user) { + this(conf, registry, clusterId, localAddress, user, Collections.emptyMap()); + } + + public AsyncConnectionImpl(Configuration conf, ConnectionRegistry registry, String clusterId, + SocketAddress localAddress, User user, Map connectionAttributes) { this.conf = conf; this.user = user; this.metricsScope = MetricsConnection.getScope(conf, clusterId, this); @@ -142,8 +149,8 @@ public AsyncConnectionImpl(Configuration conf, ConnectionRegistry registry, Stri } else { this.metrics = Optional.empty(); } - this.rpcClient = - RpcClientFactory.createClient(conf, clusterId, localAddress, metrics.orElse(null)); + this.rpcClient = RpcClientFactory.createClient(conf, clusterId, localAddress, + metrics.orElse(null), connectionAttributes); this.rpcControllerFactory = RpcControllerFactory.instantiate(conf); this.rpcTimeout = (int) Math.min(Integer.MAX_VALUE, TimeUnit.NANOSECONDS.toMillis(connConf.getRpcTimeoutNs())); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMasterRequestRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMasterRequestRpcRetryingCaller.java index c02b80c666ae..42585ea1c919 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMasterRequestRpcRetryingCaller.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMasterRequestRpcRetryingCaller.java @@ -19,6 +19,7 @@ import static org.apache.hadoop.hbase.util.FutureUtils.addListener; +import java.util.Collections; import java.util.concurrent.CompletableFuture; import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil; import org.apache.hadoop.hbase.ipc.HBaseRpcController; @@ -47,7 +48,7 @@ public AsyncMasterRequestRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl Callable callable, int priority, long pauseNs, long pauseNsForServerOverloaded, int maxRetries, long operationTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) { super(retryTimer, conn, priority, pauseNs, pauseNsForServerOverloaded, maxRetries, - operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt); + operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt, Collections.emptyMap()); this.callable = callable; } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java index 8b317bfec2c2..c3dd8740854e 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java @@ -22,6 +22,7 @@ import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.OptionalLong; import java.util.concurrent.CompletableFuture; @@ -78,7 +79,7 @@ public abstract class AsyncRpcRetryingCaller { public AsyncRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn, int priority, long pauseNs, long pauseNsForServerOverloaded, int maxAttempts, long operationTimeoutNs, - long rpcTimeoutNs, int startLogErrorsCnt) { + long rpcTimeoutNs, int startLogErrorsCnt, Map requestAttributes) { this.retryTimer = retryTimer; this.conn = conn; this.priority = priority; @@ -89,6 +90,7 @@ public AsyncRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn, int pr this.future = new CompletableFuture<>(); this.controller = conn.rpcControllerFactory.newController(); this.controller.setPriority(priority); + this.controller.setRequestAttributes(requestAttributes); this.exceptions = new ArrayList<>(); this.startNs = System.nanoTime(); this.pauseManager = diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java index 2d8e7b7aabe9..1ea2a1ad7dd4 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java @@ -23,7 +23,9 @@ import static org.apache.hbase.thirdparty.com.google.common.base.Preconditions.checkArgument; import static org.apache.hbase.thirdparty.com.google.common.base.Preconditions.checkNotNull; +import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import org.apache.hadoop.hbase.HRegionLocation; @@ -83,6 +85,8 @@ public class SingleRequestCallerBuilder extends BuilderBase { private int priority = PRIORITY_UNSET; + private Map requestAttributes = Collections.emptyMap(); + public SingleRequestCallerBuilder table(TableName tableName) { this.tableName = tableName; return this; @@ -144,6 +148,12 @@ public SingleRequestCallerBuilder priority(int priority) { return this; } + public SingleRequestCallerBuilder + setRequestAttributes(Map requestAttributes) { + this.requestAttributes = requestAttributes; + return this; + } + private void preCheck() { checkArgument(replicaId >= 0, "invalid replica id %s", replicaId); checkNotNull(tableName, "tableName is null"); @@ -157,7 +167,7 @@ public AsyncSingleRequestRpcRetryingCaller build() { preCheck(); return new AsyncSingleRequestRpcRetryingCaller<>(retryTimer, conn, tableName, row, replicaId, locateType, callable, priority, pauseNs, pauseNsForServerOverloaded, maxAttempts, - operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt); + operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt, requestAttributes); } /** @@ -201,6 +211,8 @@ public class ScanSingleRegionCallerBuilder extends BuilderBase { private int priority = PRIORITY_UNSET; + private Map requestAttributes = Collections.emptyMap(); + public ScanSingleRegionCallerBuilder id(long scannerId) { this.scannerId = scannerId; return this; @@ -278,6 +290,12 @@ public ScanSingleRegionCallerBuilder startLogErrorsCnt(int startLogErrorsCnt) { return this; } + public ScanSingleRegionCallerBuilder + setRequestAttributes(Map requestAttributes) { + this.requestAttributes = requestAttributes; + return this; + } + private void preCheck() { checkArgument(scannerId != null, "invalid scannerId %d", scannerId); checkNotNull(scan, "scan is null"); @@ -293,7 +311,7 @@ public AsyncScanSingleRegionRpcRetryingCaller build() { return new AsyncScanSingleRegionRpcRetryingCaller(retryTimer, conn, scan, scanMetrics, scannerId, resultCache, consumer, stub, loc, isRegionServerRemote, priority, scannerLeaseTimeoutPeriodNs, pauseNs, pauseNsForServerOverloaded, maxAttempts, - scanTimeoutNs, rpcTimeoutNs, startLogErrorsCnt); + scanTimeoutNs, rpcTimeoutNs, startLogErrorsCnt, requestAttributes); } /** @@ -322,6 +340,8 @@ public class BatchCallerBuilder extends BuilderBase { private long rpcTimeoutNs = -1L; + private Map requestAttributes = Collections.emptyMap(); + public BatchCallerBuilder table(TableName tableName) { this.tableName = tableName; return this; @@ -362,10 +382,15 @@ public BatchCallerBuilder startLogErrorsCnt(int startLogErrorsCnt) { return this; } + public BatchCallerBuilder setRequestAttributes(Map requestAttributes) { + this.requestAttributes = requestAttributes; + return this; + } + public AsyncBatchRpcRetryingCaller build() { return new AsyncBatchRpcRetryingCaller<>(retryTimer, conn, tableName, actions, pauseNs, pauseNsForServerOverloaded, maxAttempts, operationTimeoutNs, rpcTimeoutNs, - startLogErrorsCnt); + startLogErrorsCnt, requestAttributes); } public List> call() { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java index ca39051de84d..a5d4ef6407e1 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java @@ -31,6 +31,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.OptionalLong; import java.util.concurrent.CompletableFuture; @@ -316,7 +317,7 @@ public AsyncScanSingleRegionRpcRetryingCaller(Timer retryTimer, AsyncConnectionI AdvancedScanResultConsumer consumer, Interface stub, HRegionLocation loc, boolean isRegionServerRemote, int priority, long scannerLeaseTimeoutPeriodNs, long pauseNs, long pauseNsForServerOverloaded, int maxAttempts, long scanTimeoutNs, long rpcTimeoutNs, - int startLogErrorsCnt) { + int startLogErrorsCnt, Map requestAttributes) { this.retryTimer = retryTimer; this.conn = conn; this.scan = scan; @@ -341,6 +342,7 @@ public AsyncScanSingleRegionRpcRetryingCaller(Timer retryTimer, AsyncConnectionI this.priority = priority; this.controller = conn.rpcControllerFactory.newController(); this.controller.setPriority(priority); + this.controller.setRequestAttributes(requestAttributes); this.exceptions = new ArrayList<>(); this.pauseManager = new HBaseServerExceptionPauseManager(pauseNs, pauseNsForServerOverloaded, scanTimeoutNs); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncServerRequestRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncServerRequestRpcRetryingCaller.java index 40cd3b87e928..d4484ba87bf1 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncServerRequestRpcRetryingCaller.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncServerRequestRpcRetryingCaller.java @@ -20,6 +20,7 @@ import static org.apache.hadoop.hbase.util.FutureUtils.addListener; import java.io.IOException; +import java.util.Collections; import java.util.concurrent.CompletableFuture; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.ServerName; @@ -49,7 +50,7 @@ public AsyncServerRequestRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl long pauseNs, long pauseNsForServerOverloaded, int maxAttempts, long operationTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt, ServerName serverName, Callable callable) { super(retryTimer, conn, HConstants.NORMAL_QOS, pauseNs, pauseNsForServerOverloaded, maxAttempts, - operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt); + operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt, Collections.emptyMap()); this.serverName = serverName; this.callable = callable; } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java index 9c115af97b5b..a0d536aef5f7 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java @@ -20,6 +20,7 @@ import static org.apache.hadoop.hbase.util.FutureUtils.addListener; import java.io.IOException; +import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; import org.apache.hadoop.hbase.HRegionLocation; @@ -57,9 +58,10 @@ CompletableFuture call(HBaseRpcController controller, HRegionLocation loc, public AsyncSingleRequestRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn, TableName tableName, byte[] row, int replicaId, RegionLocateType locateType, Callable callable, int priority, long pauseNs, long pauseNsForServerOverloaded, - int maxAttempts, long operationTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) { + int maxAttempts, long operationTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt, + Map requestAttributes) { super(retryTimer, conn, priority, pauseNs, pauseNsForServerOverloaded, maxAttempts, - operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt); + operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt, requestAttributes); this.tableName = tableName; this.row = row; this.replicaId = replicaId; diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java index 3c03444cfbbc..2979c6689884 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java @@ -22,9 +22,11 @@ import static org.apache.hadoop.hbase.util.FutureUtils.allOf; import java.util.List; +import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.function.Function; +import org.apache.commons.lang3.NotImplementedException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.CompareOperator; import org.apache.hadoop.hbase.TableName; @@ -110,6 +112,14 @@ public interface AsyncTable { */ long getScanTimeout(TimeUnit unit); + /** + * Get the map of request attributes + * @return a map of request attributes supplied by the client + */ + default Map getRequestAttributes() { + throw new NotImplementedException("Add an implementation!"); + } + /** * Test for the existence of columns in the table, as specified by the Get. *

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilder.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilder.java index f6db89f82bf5..007f7ad48685 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilder.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilder.java @@ -137,6 +137,11 @@ default AsyncTableBuilder setMaxRetries(int maxRetries) { */ AsyncTableBuilder setStartLogErrorsCnt(int startLogErrorsCnt); + /** + * Set a request attribute + */ + AsyncTableBuilder setRequestAttribute(String key, byte[] value); + /** * Create the {@link AsyncTable} instance. */ diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilderBase.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilderBase.java index 624d6e1dbb0a..02e9da0770b4 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilderBase.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilderBase.java @@ -19,6 +19,9 @@ import static org.apache.hadoop.hbase.client.ConnectionUtils.retries2Attempts; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; import java.util.concurrent.TimeUnit; import org.apache.hadoop.hbase.TableName; import org.apache.yetus.audience.InterfaceAudience; @@ -50,6 +53,8 @@ abstract class AsyncTableBuilderBase protected int startLogErrorsCnt; + protected Map requestAttributes = Collections.emptyMap(); + AsyncTableBuilderBase(TableName tableName, AsyncConnectionConfiguration connConf) { this.tableName = tableName; this.operationTimeoutNs = tableName.isSystemTable() @@ -121,4 +126,13 @@ public AsyncTableBuilderBase setStartLogErrorsCnt(int startLogErrorsCnt) { this.startLogErrorsCnt = startLogErrorsCnt; return this; } + + @Override + public AsyncTableBuilder setRequestAttribute(String key, byte[] value) { + if (this.requestAttributes.isEmpty()) { + this.requestAttributes = new HashMap<>(); + } + this.requestAttributes.put(key, value); + return this; + } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java index e785e587ab36..590ee9bc47a3 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java @@ -24,6 +24,7 @@ import io.opentelemetry.context.Scope; import java.io.IOException; import java.util.List; +import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; @@ -101,6 +102,11 @@ public long getScanTimeout(TimeUnit unit) { return rawTable.getScanTimeout(unit); } + @Override + public Map getRequestAttributes() { + return rawTable.getRequestAttributes(); + } + private CompletableFuture wrap(CompletableFuture future) { return FutureUtils.wrapFuture(future, pool); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionFactory.java index 4d4559f4b7a9..ac70091dcf65 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionFactory.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionFactory.java @@ -22,6 +22,8 @@ import java.io.IOException; import java.lang.reflect.Constructor; import java.security.PrivilegedExceptionAction; +import java.util.Collections; +import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import org.apache.hadoop.conf.Configuration; @@ -216,21 +218,53 @@ public static Connection createConnection(Configuration conf, User user) throws */ public static Connection createConnection(Configuration conf, ExecutorService pool, final User user) throws IOException { + return createConnection(conf, pool, user, Collections.emptyMap()); + } + + /** + * Create a new Connection instance using the passed conf instance. Connection + * encapsulates all housekeeping for a connection to the cluster. All tables and interfaces + * created from returned connection share zookeeper connection, meta cache, and connections to + * region servers and masters.
+ * The caller is responsible for calling {@link Connection#close()} on the returned connection + * instance. Typical usage: + * + *

+   * Connection connection = ConnectionFactory.createConnection(conf);
+   * Table table = connection.getTable(TableName.valueOf("table1"));
+   * try {
+   *   table.get(...);
+   *   ...
+   * } finally {
+   *   table.close();
+   *   connection.close();
+   * }
+   * 
+ * + * @param conf configuration + * @param user the user the connection is for + * @param pool the thread pool to use for batch operations + * @param connectionAttributes attributes to be sent along to server during connection establish + * @return Connection object for conf + */ + public static Connection createConnection(Configuration conf, ExecutorService pool, + final User user, Map connectionAttributes) throws IOException { Class clazz = conf.getClass(ConnectionUtils.HBASE_CLIENT_CONNECTION_IMPL, ConnectionOverAsyncConnection.class, Connection.class); if (clazz != ConnectionOverAsyncConnection.class) { try { // Default HCM#HCI is not accessible; make it so before invoking. - Constructor constructor = - clazz.getDeclaredConstructor(Configuration.class, ExecutorService.class, User.class); + Constructor constructor = clazz.getDeclaredConstructor(Configuration.class, + ExecutorService.class, User.class, Map.class); constructor.setAccessible(true); - return user.runAs((PrivilegedExceptionAction< - Connection>) () -> (Connection) constructor.newInstance(conf, pool, user)); + return user.runAs((PrivilegedExceptionAction) () -> (Connection) constructor + .newInstance(conf, pool, user, connectionAttributes)); } catch (Exception e) { throw new IOException(e); } } else { - return FutureUtils.get(createAsyncConnection(conf, user)).toConnection(); + return FutureUtils.get(createAsyncConnection(conf, user, connectionAttributes)) + .toConnection(); } } @@ -281,6 +315,27 @@ public static CompletableFuture createAsyncConnection(Configura */ public static CompletableFuture createAsyncConnection(Configuration conf, final User user) { + return createAsyncConnection(conf, user, null); + } + + /** + * Create a new AsyncConnection instance using the passed {@code conf} and {@code user}. + * AsyncConnection encapsulates all housekeeping for a connection to the cluster. All tables and + * interfaces created from returned connection share zookeeper connection, meta cache, and + * connections to region servers and masters. + *

+ * The caller is responsible for calling {@link AsyncConnection#close()} on the returned + * connection instance. + *

+ * Usually you should only create one AsyncConnection instance in your code and use it everywhere + * as it is thread safe. + * @param conf configuration + * @param user the user the asynchronous connection is for + * @param connectionAttributes attributes to be sent along to server during connection establish + * @return AsyncConnection object wrapped by CompletableFuture + */ + public static CompletableFuture createAsyncConnection(Configuration conf, + final User user, Map connectionAttributes) { return TraceUtil.tracedFuture(() -> { CompletableFuture future = new CompletableFuture<>(); ConnectionRegistry registry = ConnectionRegistryFactory.getRegistry(conf); @@ -300,7 +355,7 @@ public static CompletableFuture createAsyncConnection(Configura try { future.complete( user.runAs((PrivilegedExceptionAction) () -> ReflectionUtils - .newInstance(clazz, conf, registry, clusterId, null, user))); + .newInstance(clazz, conf, registry, clusterId, null, user, connectionAttributes))); } catch (Exception e) { registry.close(); future.completeExceptionally(e); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionOverAsyncConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionOverAsyncConnection.java index 7a7b38a4df6a..51368fc23c15 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionOverAsyncConnection.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionOverAsyncConnection.java @@ -189,12 +189,13 @@ public TableBuilder getTableBuilder(TableName tableName, ExecutorService pool) { public Table build() { IOExceptionSupplier poolSupplier = pool != null ? () -> pool : ConnectionOverAsyncConnection.this::getBatchPool; - return new TableOverAsyncTable(conn, + AsyncTableBuilder tableBuilder = conn.getTableBuilder(tableName).setRpcTimeout(rpcTimeout, TimeUnit.MILLISECONDS) .setReadRpcTimeout(readRpcTimeout, TimeUnit.MILLISECONDS) .setWriteRpcTimeout(writeRpcTimeout, TimeUnit.MILLISECONDS) - .setOperationTimeout(operationTimeout, TimeUnit.MILLISECONDS).build(), - poolSupplier); + .setOperationTimeout(operationTimeout, TimeUnit.MILLISECONDS); + requestAttributes.forEach(tableBuilder::setRequestAttribute); + return new TableOverAsyncTable(conn, tableBuilder.build(), poolSupplier); } }; } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java index ff75c0725ce5..342cf89acf1a 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java @@ -34,6 +34,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -119,6 +120,8 @@ class RawAsyncTableImpl implements AsyncTable { private final int startLogErrorsCnt; + private final Map requestAttributes; + RawAsyncTableImpl(AsyncConnectionImpl conn, Timer retryTimer, AsyncTableBuilderBase builder) { this.conn = conn; this.retryTimer = retryTimer; @@ -145,6 +148,7 @@ class RawAsyncTableImpl implements AsyncTable { ? conn.connConf.getMetaScannerCaching() : conn.connConf.getScannerCaching(); this.defaultScannerMaxResultSize = conn.connConf.getScannerMaxResultSize(); + this.requestAttributes = builder.requestAttributes; } @Override @@ -210,7 +214,8 @@ private SingleRequestCallerBuilder newCaller(byte[] row, int priority, lo .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS) .pause(pauseNs, TimeUnit.NANOSECONDS) .pauseForServerOverloaded(pauseNsForServerOverloaded, TimeUnit.NANOSECONDS) - .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt); + .maxAttempts(maxAttempts).setRequestAttributes(requestAttributes) + .startLogErrorsCnt(startLogErrorsCnt).setRequestAttributes(requestAttributes); } private SingleRequestCallerBuilder @@ -608,7 +613,7 @@ private Scan setDefaultScanConfig(Scan scan) { public void scan(Scan scan, AdvancedScanResultConsumer consumer) { new AsyncClientScanner(setDefaultScanConfig(scan), consumer, tableName, conn, retryTimer, pauseNs, pauseNsForServerOverloaded, maxAttempts, scanTimeoutNs, readRpcTimeoutNs, - startLogErrorsCnt).start(); + startLogErrorsCnt, requestAttributes).start(); } private long resultSize2CacheSize(long maxResultSize) { @@ -704,7 +709,8 @@ private List> batch(List actions, long r .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS) .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS) .pauseForServerOverloaded(pauseNsForServerOverloaded, TimeUnit.NANOSECONDS) - .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt).call(); + .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt) + .setRequestAttributes(requestAttributes).call(); } @Override @@ -732,6 +738,11 @@ public long getScanTimeout(TimeUnit unit) { return unit.convert(scanTimeoutNs, TimeUnit.NANOSECONDS); } + @Override + public Map getRequestAttributes() { + return requestAttributes; + } + private CompletableFuture coprocessorService(Function stubMaker, ServiceCaller callable, RegionInfo region, byte[] row) { RegionCoprocessorRpcChannelImpl channel = new RegionCoprocessorRpcChannelImpl(conn, tableName, diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java index 7feefc831ca0..3941c0d18540 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java @@ -751,4 +751,12 @@ default long getWriteRpcTimeout(TimeUnit unit) { default long getOperationTimeout(TimeUnit unit) { throw new NotImplementedException("Add an implementation!"); } + + /** + * Get the attributes to be submitted with requests + * @return map of request attributes + */ + default Map getRequestAttributes() { + throw new NotImplementedException("Add an implementation!"); + } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableBuilder.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableBuilder.java index 75e16e89a5de..eee985555b34 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableBuilder.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableBuilder.java @@ -55,6 +55,11 @@ public interface TableBuilder { */ TableBuilder setWriteRpcTimeout(int timeout); + /** + * Set a request attribute + */ + TableBuilder setRequestAttribute(String key, byte[] value); + /** * Create the {@link Table} instance. */ diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableBuilderBase.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableBuilderBase.java index c74340259f3f..dc3111b0c79d 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableBuilderBase.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableBuilderBase.java @@ -17,6 +17,9 @@ */ package org.apache.hadoop.hbase.client; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; import org.apache.hadoop.hbase.TableName; import org.apache.yetus.audience.InterfaceAudience; @@ -36,6 +39,8 @@ abstract class TableBuilderBase implements TableBuilder { protected int writeRpcTimeout; + protected Map requestAttributes = Collections.emptyMap(); + TableBuilderBase(TableName tableName, ConnectionConfiguration connConf) { if (tableName == null) { throw new IllegalArgumentException("Given table name is null"); @@ -73,4 +78,13 @@ public TableBuilderBase setWriteRpcTimeout(int timeout) { this.writeRpcTimeout = timeout; return this; } + + @Override + public TableBuilderBase setRequestAttribute(String key, byte[] value) { + if (this.requestAttributes.isEmpty()) { + this.requestAttributes = new HashMap<>(); + } + this.requestAttributes.put(key, value); + return this; + } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableOverAsyncTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableOverAsyncTable.java index e1565f18159a..0a7dabd476ce 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableOverAsyncTable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableOverAsyncTable.java @@ -560,6 +560,11 @@ public long getOperationTimeout(TimeUnit unit) { return table.getOperationTimeout(unit); } + @Override + public Map getRequestAttributes() { + return table.getRequestAttributes(); + } + @Override public RegionLocator getRegionLocator() throws IOException { return conn.toConnection().getRegionLocator(getName()); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java index 23d14c272d2b..5e42558671b7 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java @@ -26,6 +26,7 @@ import java.io.IOException; import java.net.SocketAddress; import java.util.Collection; +import java.util.Map; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; @@ -106,6 +107,7 @@ public abstract class AbstractRpcClient implements RpcC private boolean running = true; // if client runs protected final Configuration conf; + protected final Map connectionAttributes; protected final String clusterId; protected final SocketAddress localAddr; protected final MetricsConnection metrics; @@ -154,7 +156,7 @@ public AtomicInteger load(Address key) throws Exception { * @param metrics the connection metrics */ public AbstractRpcClient(Configuration conf, String clusterId, SocketAddress localAddr, - MetricsConnection metrics) { + MetricsConnection metrics, Map connectionAttributes) { this.userProvider = UserProvider.instantiate(conf); this.localAddr = localAddr; this.tcpKeepAlive = conf.getBoolean("hbase.ipc.client.tcpkeepalive", true); @@ -167,6 +169,7 @@ public AbstractRpcClient(Configuration conf, String clusterId, SocketAddress loc this.minIdleTimeBeforeClose = conf.getInt(IDLE_TIME, 120000); // 2 minutes this.conf = conf; + this.connectionAttributes = connectionAttributes; this.codec = getCodec(); this.compressor = getCompressor(conf); this.fallbackAllowed = conf.getBoolean(IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY, @@ -416,23 +419,24 @@ private Call callMethod(final Descriptors.MethodDescriptor md, final HBaseRpcCon } final AtomicInteger counter = concurrentCounterCache.getUnchecked(addr); - Call call = new Call(nextCallId(), md, param, hrc.cellScanner(), returnType, - hrc.getCallTimeout(), hrc.getPriority(), new RpcCallback() { - @Override - public void run(Call call) { - try (Scope scope = call.span.makeCurrent()) { - counter.decrementAndGet(); - onCallFinished(call, hrc, addr, callback); - } finally { - if (hrc.failed()) { - TraceUtil.setError(span, hrc.getFailed()); - } else { - span.setStatus(StatusCode.OK); + Call call = + new Call(nextCallId(), md, param, hrc.cellScanner(), returnType, hrc.getCallTimeout(), + hrc.getPriority(), hrc.getRequestAttributes(), new RpcCallback() { + @Override + public void run(Call call) { + try (Scope scope = call.span.makeCurrent()) { + counter.decrementAndGet(); + onCallFinished(call, hrc, addr, callback); + } finally { + if (hrc.failed()) { + TraceUtil.setError(span, hrc.getFailed()); + } else { + span.setStatus(StatusCode.OK); + } + span.end(); } - span.end(); } - } - }, cs); + }, cs); ConnectionId remoteId = new ConnectionId(ticket, md.getService().getName(), addr); int count = counter.incrementAndGet(); try { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcClient.java index 7fffdad935fc..3da00c5395d3 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcClient.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcClient.java @@ -19,6 +19,8 @@ import java.io.IOException; import java.net.SocketAddress; +import java.util.Collections; +import java.util.Map; import javax.net.SocketFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HConstants; @@ -41,7 +43,7 @@ public class BlockingRpcClient extends AbstractRpcClient * SocketFactory */ BlockingRpcClient(Configuration conf) { - this(conf, HConstants.CLUSTER_ID_DEFAULT, null, null); + this(conf, HConstants.CLUSTER_ID_DEFAULT, null, null, Collections.emptyMap()); } /** @@ -53,8 +55,8 @@ public class BlockingRpcClient extends AbstractRpcClient * @param metrics the connection metrics */ public BlockingRpcClient(Configuration conf, String clusterId, SocketAddress localAddr, - MetricsConnection metrics) { - super(conf, clusterId, localAddr, metrics); + MetricsConnection metrics, Map connectionAttributes) { + super(conf, clusterId, localAddr, metrics, connectionAttributes); this.socketFactory = NetUtils.getDefaultSocketFactory(conf); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java index d63d14940e78..81ad4d2f056d 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java @@ -219,7 +219,7 @@ public void cleanup(IOException e) { BlockingRpcConnection(BlockingRpcClient rpcClient, ConnectionId remoteId) throws IOException { super(rpcClient.conf, AbstractRpcClient.WHEEL_TIMER, remoteId, rpcClient.clusterId, rpcClient.userProvider.isHBaseSecurityEnabled(), rpcClient.codec, rpcClient.compressor, - rpcClient.metrics); + rpcClient.metrics, rpcClient.connectionAttributes); this.rpcClient = rpcClient; this.connectionHeaderPreamble = getConnectionHeaderPreamble(); ConnectionHeader header = getConnectionHeader(); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/Call.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/Call.java index 3c0e24e57145..669fc73a3bfa 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/Call.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/Call.java @@ -19,6 +19,7 @@ import io.opentelemetry.api.trace.Span; import java.io.IOException; +import java.util.Map; import java.util.Optional; import org.apache.commons.lang3.builder.ToStringBuilder; import org.apache.commons.lang3.builder.ToStringStyle; @@ -56,14 +57,15 @@ class Call { final Descriptors.MethodDescriptor md; final int timeout; // timeout in millisecond for this call; 0 means infinite. final int priority; + final Map attributes; final MetricsConnection.CallStats callStats; private final RpcCallback callback; final Span span; Timeout timeoutTask; Call(int id, final Descriptors.MethodDescriptor md, Message param, final CellScanner cells, - final Message responseDefaultType, int timeout, int priority, RpcCallback callback, - MetricsConnection.CallStats callStats) { + final Message responseDefaultType, int timeout, int priority, Map attributes, + RpcCallback callback, MetricsConnection.CallStats callStats) { this.param = param; this.md = md; this.cells = cells; @@ -73,6 +75,7 @@ class Call { this.id = id; this.timeout = timeout; this.priority = priority; + this.attributes = attributes; this.callback = callback; this.span = Span.current(); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/DelegatingHBaseRpcController.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/DelegatingHBaseRpcController.java index 9bee88d599f7..c752f4c18355 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/DelegatingHBaseRpcController.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/DelegatingHBaseRpcController.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hbase.ipc; import java.io.IOException; +import java.util.Map; import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.TableName; import org.apache.yetus.audience.InterfaceAudience; @@ -112,6 +113,16 @@ public boolean hasCallTimeout() { return delegate.hasCallTimeout(); } + @Override + public Map getRequestAttributes() { + return delegate.getRequestAttributes(); + } + + @Override + public void setRequestAttributes(Map requestAttributes) { + delegate.setRequestAttributes(requestAttributes); + } + @Override public void setFailed(IOException e) { delegate.setFailed(e); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcController.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcController.java index c60de7658f3d..cd303a5eda77 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcController.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcController.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hbase.ipc; import java.io.IOException; +import java.util.Map; import org.apache.hadoop.hbase.CellScannable; import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.HBaseInterfaceAudience; @@ -71,6 +72,16 @@ public interface HBaseRpcController extends RpcController, CellScannable { boolean hasCallTimeout(); + /** + * Get the map of request attributes + */ + Map getRequestAttributes(); + + /** + * Set the map of request attributes + */ + void setRequestAttributes(Map requestAttributes); + /** * Set failed with an exception to pass on. For use in async rpc clients * @param e exception to set with diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcControllerImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcControllerImpl.java index 99ed5c4d48b6..425c5e77afcd 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcControllerImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcControllerImpl.java @@ -19,7 +19,9 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; import java.util.List; +import java.util.Map; import org.apache.hadoop.hbase.CellScannable; import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.CellUtil; @@ -70,6 +72,8 @@ public class HBaseRpcControllerImpl implements HBaseRpcController { */ private CellScanner cellScanner; + private Map requestAttributes = Collections.emptyMap(); + public HBaseRpcControllerImpl() { this(null, (CellScanner) null); } @@ -166,6 +170,16 @@ public boolean hasCallTimeout() { return callTimeout != null; } + @Override + public Map getRequestAttributes() { + return requestAttributes; + } + + @Override + public void setRequestAttributes(Map requestAttributes) { + this.requestAttributes = requestAttributes; + } + @Override public synchronized String errorText() { if (!done || exception == null) { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java index b509dcbd27b7..d6df6c974ccf 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java @@ -25,6 +25,7 @@ import java.net.ConnectException; import java.net.SocketTimeoutException; import java.nio.channels.ClosedChannelException; +import java.util.Map; import java.util.concurrent.TimeoutException; import org.apache.commons.lang3.mutable.MutableInt; import org.apache.hadoop.hbase.DoNotRetryIOException; @@ -44,10 +45,12 @@ import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; import org.apache.hbase.thirdparty.com.google.protobuf.CodedOutputStream; import org.apache.hbase.thirdparty.com.google.protobuf.Message; +import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations; import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf; import org.apache.hbase.thirdparty.io.netty.channel.EventLoop; import org.apache.hbase.thirdparty.io.netty.util.concurrent.FastThreadLocal; +import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.CellBlockMeta; import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ExceptionResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader; @@ -126,6 +129,14 @@ static RequestHeader buildRequestHeader(Call call, CellBlockMeta cellBlockMeta) if (call.priority != HConstants.PRIORITY_UNSET) { builder.setPriority(call.priority); } + if (call.attributes != null && !call.attributes.isEmpty()) { + HBaseProtos.NameBytesPair.Builder attributeBuilder = HBaseProtos.NameBytesPair.newBuilder(); + for (Map.Entry attribute : call.attributes.entrySet()) { + attributeBuilder.setName(attribute.getKey()); + attributeBuilder.setValue(UnsafeByteOperations.unsafeWrap(attribute.getValue())); + builder.addAttribute(attributeBuilder.build()); + } + } builder.setTimeout(call.timeout); return builder.build(); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcClient.java index 231caa40a89e..ed0c4fffc724 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcClient.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcClient.java @@ -19,6 +19,8 @@ import java.io.IOException; import java.net.SocketAddress; +import java.util.Collections; +import java.util.Map; import java.util.concurrent.atomic.AtomicReference; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseInterfaceAudience; @@ -55,7 +57,12 @@ public class NettyRpcClient extends AbstractRpcClient { public NettyRpcClient(Configuration configuration, String clusterId, SocketAddress localAddress, MetricsConnection metrics) { - super(configuration, clusterId, localAddress, metrics); + this(configuration, clusterId, localAddress, metrics, Collections.emptyMap()); + } + + public NettyRpcClient(Configuration configuration, String clusterId, SocketAddress localAddress, + MetricsConnection metrics, Map connectionAttributes) { + super(configuration, clusterId, localAddress, metrics, connectionAttributes); Pair> groupAndChannelClass = NettyRpcClientConfigHelper.getEventLoopConfig(conf); if (groupAndChannelClass == null) { @@ -75,7 +82,7 @@ public NettyRpcClient(Configuration configuration, String clusterId, SocketAddre /** Used in test only. */ public NettyRpcClient(Configuration configuration) { - this(configuration, HConstants.CLUSTER_ID_DEFAULT, null, null); + this(configuration, HConstants.CLUSTER_ID_DEFAULT, null, null, Collections.emptyMap()); } @Override diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java index 48104038c217..3f9a58d51263 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java @@ -104,7 +104,7 @@ class NettyRpcConnection extends RpcConnection { NettyRpcConnection(NettyRpcClient rpcClient, ConnectionId remoteId) throws IOException { super(rpcClient.conf, AbstractRpcClient.WHEEL_TIMER, remoteId, rpcClient.clusterId, rpcClient.userProvider.isHBaseSecurityEnabled(), rpcClient.codec, rpcClient.compressor, - rpcClient.metrics); + rpcClient.metrics, rpcClient.connectionAttributes); this.rpcClient = rpcClient; this.eventLoop = rpcClient.group.next(); byte[] connectionHeaderPreamble = getConnectionHeaderPreamble(); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientFactory.java index 9b69b5234050..f1df572675c7 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientFactory.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientFactory.java @@ -18,6 +18,8 @@ package org.apache.hadoop.hbase.ipc; import java.net.SocketAddress; +import java.util.Collections; +import java.util.Map; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.client.MetricsConnection; import org.apache.hadoop.hbase.util.ReflectionUtils; @@ -59,7 +61,7 @@ public static RpcClient createClient(Configuration conf, String clusterId) { */ public static RpcClient createClient(Configuration conf, String clusterId, MetricsConnection metrics) { - return createClient(conf, clusterId, null, metrics); + return createClient(conf, clusterId, null, metrics, Collections.emptyMap()); } private static String getRpcClientClass(Configuration conf) { @@ -81,10 +83,11 @@ private static String getRpcClientClass(Configuration conf) { * @return newly created RpcClient */ public static RpcClient createClient(Configuration conf, String clusterId, - SocketAddress localAddr, MetricsConnection metrics) { + SocketAddress localAddr, MetricsConnection metrics, Map connectionAttributes) { String rpcClientClass = getRpcClientClass(conf); - return ReflectionUtils.instantiateWithCustomCtor(rpcClientClass, new Class[] { - Configuration.class, String.class, SocketAddress.class, MetricsConnection.class }, - new Object[] { conf, clusterId, localAddr, metrics }); + return ReflectionUtils.instantiateWithCustomCtor( + rpcClientClass, new Class[] { Configuration.class, String.class, SocketAddress.class, + MetricsConnection.class, Map.class }, + new Object[] { conf, clusterId, localAddr, metrics, connectionAttributes }); } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcConnection.java index 912fa4fb0654..31698a1a1e8e 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcConnection.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcConnection.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.net.InetSocketAddress; import java.net.UnknownHostException; +import java.util.Map; import java.util.concurrent.TimeUnit; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HConstants; @@ -39,11 +40,13 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations; import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer; import org.apache.hbase.thirdparty.io.netty.util.Timeout; import org.apache.hbase.thirdparty.io.netty.util.TimerTask; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ConnectionHeader; import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.UserInformation; @@ -70,6 +73,7 @@ abstract class RpcConnection { protected final CompressionCodec compressor; protected final MetricsConnection metrics; + private final Map connectionAttributes; protected final HashedWheelTimer timeoutTimer; @@ -86,12 +90,13 @@ abstract class RpcConnection { protected RpcConnection(Configuration conf, HashedWheelTimer timeoutTimer, ConnectionId remoteId, String clusterId, boolean isSecurityEnabled, Codec codec, CompressionCodec compressor, - MetricsConnection metrics) throws IOException { + MetricsConnection metrics, Map connectionAttributes) throws IOException { this.timeoutTimer = timeoutTimer; this.codec = codec; this.compressor = compressor; this.conf = conf; this.metrics = metrics; + this.connectionAttributes = connectionAttributes; User ticket = remoteId.getTicket(); this.securityInfo = SecurityInfo.getInfo(remoteId.getServiceName()); this.useSasl = isSecurityEnabled; @@ -169,6 +174,14 @@ protected final ConnectionHeader getConnectionHeader() { if (this.compressor != null) { builder.setCellBlockCompressorClass(this.compressor.getClass().getCanonicalName()); } + if (connectionAttributes != null && !connectionAttributes.isEmpty()) { + HBaseProtos.NameBytesPair.Builder attributeBuilder = HBaseProtos.NameBytesPair.newBuilder(); + for (Map.Entry attribute : connectionAttributes.entrySet()) { + attributeBuilder.setName(attribute.getKey()); + attributeBuilder.setValue(UnsafeByteOperations.unsafeWrap(attribute.getValue())); + builder.addAttribute(attributeBuilder.build()); + } + } builder.setVersionInfo(ProtobufUtil.getVersionInfo()); boolean isCryptoAESEnable = conf.getBoolean(CRYPTO_AES_ENABLED_KEY, CRYPTO_AES_ENABLED_DEFAULT); // if Crypto AES enable, setup Cipher transformation diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestRpcBasedRegistryHedgedReads.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestRpcBasedRegistryHedgedReads.java index 6c97c19f96cc..54b351f00a3b 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestRpcBasedRegistryHedgedReads.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestRpcBasedRegistryHedgedReads.java @@ -23,6 +23,7 @@ import java.io.IOException; import java.net.SocketAddress; import java.util.Collections; +import java.util.Map; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; @@ -95,7 +96,7 @@ public class TestRpcBasedRegistryHedgedReads { public static final class RpcClientImpl implements RpcClient { public RpcClientImpl(Configuration configuration, String clusterId, SocketAddress localAddress, - MetricsConnection metrics) { + MetricsConnection metrics, Map attributes) { } @Override diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestTLSHandshadeFailure.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestTLSHandshadeFailure.java index 7375388e4a04..10948358ff92 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestTLSHandshadeFailure.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestTLSHandshadeFailure.java @@ -30,6 +30,7 @@ import java.io.IOException; import java.net.ServerSocket; import java.net.Socket; +import java.util.Collections; import java.util.Random; import java.util.concurrent.atomic.AtomicReference; import org.apache.hadoop.conf.Configuration; @@ -148,7 +149,8 @@ public Void answer(InvocationOnMock invocation) throws Throwable { Address.fromParts("127.0.0.1", server.getLocalPort())); NettyRpcConnection conn = client.createConnection(id); BlockingRpcCallback done = new BlockingRpcCallback<>(); - Call call = new Call(1, null, null, null, null, 0, 0, done, new CallStats()); + Call call = + new Call(1, null, null, null, null, 0, 0, Collections.emptyMap(), done, new CallStats()); HBaseRpcController hrc = new HBaseRpcControllerImpl(); conn.sendRequest(call, hrc); done.get(); diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java index 51e9e1e7755f..fc7f66129d35 100644 --- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java +++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java @@ -1667,9 +1667,10 @@ private static class ConfigurationCaptorConnection implements Connection { private final Connection delegate; - public ConfigurationCaptorConnection(Configuration conf, ExecutorService es, User user) - throws IOException { - delegate = FutureUtils.get(createAsyncConnection(conf, user)).toConnection(); + public ConfigurationCaptorConnection(Configuration conf, ExecutorService es, User user, + Map connectionAttributes) throws IOException { + delegate = + FutureUtils.get(createAsyncConnection(conf, user, connectionAttributes)).toConnection(); final String uuid = conf.get(UUID_KEY); if (uuid != null) { diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiTableInputFormatBase.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiTableInputFormatBase.java index 7d099aa44e24..0c879bd5ace3 100644 --- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiTableInputFormatBase.java +++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiTableInputFormatBase.java @@ -123,7 +123,8 @@ public static class MRSplitsConnection implements Connection { private final Configuration configuration; static final AtomicInteger creations = new AtomicInteger(0); - MRSplitsConnection(Configuration conf, ExecutorService pool, User user) throws IOException { + MRSplitsConnection(Configuration conf, ExecutorService pool, User user, + Map connectionAttributes) throws IOException { this.configuration = conf; creations.incrementAndGet(); } diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatBase.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatBase.java index 13e3831f6df6..f41282b8f4f8 100644 --- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatBase.java +++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatBase.java @@ -212,8 +212,8 @@ private static class ConnectionForMergeTesting implements Connection { SIZE_MAP.put(Bytes.toBytes("p"), 200L * 1024L * 1024L); } - ConnectionForMergeTesting(Configuration conf, ExecutorService pool, User user) - throws IOException { + ConnectionForMergeTesting(Configuration conf, ExecutorService pool, User user, + Map connectionAttributes) throws IOException { } @Override diff --git a/hbase-protocol-shaded/src/main/protobuf/rpc/RPC.proto b/hbase-protocol-shaded/src/main/protobuf/rpc/RPC.proto index 6426f0cb06cb..e992e681fbff 100644 --- a/hbase-protocol-shaded/src/main/protobuf/rpc/RPC.proto +++ b/hbase-protocol-shaded/src/main/protobuf/rpc/RPC.proto @@ -92,6 +92,7 @@ message ConnectionHeader { optional VersionInfo version_info = 5; // the transformation for rpc AES encryption with Apache Commons Crypto optional string rpc_crypto_cipher_transformation = 6; + repeated NameBytesPair attribute = 7; } // This is sent by rpc server to negotiate the data if necessary @@ -148,6 +149,7 @@ message RequestHeader { // See HConstants. optional uint32 priority = 6; optional uint32 timeout = 7; + repeated NameBytesPair attribute = 8; } message ResponseHeader { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnectionImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnectionImpl.java index 1dda6c32ca04..e2c11ab1d5e2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnectionImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnectionImpl.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hbase.client; import java.net.SocketAddress; +import java.util.Collections; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.stream.Collectors; @@ -59,7 +60,7 @@ class AsyncClusterConnectionImpl extends AsyncConnectionImpl implements AsyncClu public AsyncClusterConnectionImpl(Configuration conf, ConnectionRegistry registry, String clusterId, SocketAddress localAddress, User user) { - super(conf, registry, clusterId, localAddress, user); + super(conf, registry, clusterId, localAddress, user, Collections.emptyMap()); } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionReplicationRetryingCaller.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionReplicationRetryingCaller.java index 02718145c9b7..e2b45fe30c3c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionReplicationRetryingCaller.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionReplicationRetryingCaller.java @@ -20,6 +20,7 @@ import static org.apache.hadoop.hbase.util.FutureUtils.addListener; import java.io.IOException; +import java.util.Collections; import java.util.List; import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.DoNotRetryIOException; @@ -54,7 +55,8 @@ public AsyncRegionReplicationRetryingCaller(HashedWheelTimer retryTimer, RegionInfo replica, List entries) { super(retryTimer, conn, ConnectionUtils.getPriority(replica.getTable()), conn.connConf.getPauseNs(), conn.connConf.getPauseNsForServerOverloaded(), maxAttempts, - operationTimeoutNs, rpcTimeoutNs, conn.connConf.getStartLogErrorsCnt()); + operationTimeoutNs, rpcTimeoutNs, conn.connConf.getStartLogErrorsCnt(), + Collections.emptyMap()); this.replica = replica; this.entries = entries.toArray(new Entry[0]); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCall.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCall.java index 197ddb71d7e6..cc97a39c7ee4 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCall.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCall.java @@ -27,6 +27,7 @@ import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor; import org.apache.hbase.thirdparty.com.google.protobuf.Message; +import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ConnectionHeader; import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader; /** @@ -82,6 +83,8 @@ public interface RpcCall extends RpcCallContext { /** Returns The request header of this call. */ RequestHeader getHeader(); + ConnectionHeader getConnectionHeader(); + /** Returns Port of remote address in this call */ int getRemotePort(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java index 2188795914db..f3568a36f144 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java @@ -49,6 +49,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.VersionInfo; +import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.CellBlockMeta; import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ExceptionResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader; @@ -207,6 +208,11 @@ public RequestHeader getHeader() { return this.header; } + @Override + public RPCProtos.ConnectionHeader getConnectionHeader() { + return this.connection.connectionHeader; + } + @Override public int getPriority() { return this.header.getPriority(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/DummyAsyncTable.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/DummyAsyncTable.java index a87babad0d27..45e59def7216 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/DummyAsyncTable.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/DummyAsyncTable.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hbase.client; import java.util.List; +import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.function.Function; @@ -77,6 +78,11 @@ public long getScanTimeout(TimeUnit unit) { return 0; } + @Override + public Map getRequestAttributes() { + return null; + } + @Override public CompletableFuture get(Get get) { return null; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientTimeouts.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientTimeouts.java index 65def75fff1b..d358695c5f9b 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientTimeouts.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientTimeouts.java @@ -22,6 +22,7 @@ import java.net.SocketAddress; import java.net.SocketTimeoutException; +import java.util.Map; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicInteger; import org.apache.hadoop.conf.Configuration; @@ -129,8 +130,8 @@ public void testAdminTimeout() throws Exception { */ public static class RandomTimeoutRpcClient extends BlockingRpcClient { public RandomTimeoutRpcClient(Configuration conf, String clusterId, SocketAddress localAddr, - MetricsConnection metrics) { - super(conf, clusterId, localAddr, metrics); + MetricsConnection metrics, Map connectionAttributes) { + super(conf, clusterId, localAddr, metrics, connectionAttributes); } // Return my own instance, one that does random timeouts diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRequestAndConnectionAttributes.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRequestAndConnectionAttributes.java new file mode 100644 index 000000000000..b376bfc18557 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRequestAndConnectionAttributes.java @@ -0,0 +1,317 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.UUID; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.AuthUtil; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellComparator; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtil; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.coprocessor.ObserverContext; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; +import org.apache.hadoop.hbase.coprocessor.RegionObserver; +import org.apache.hadoop.hbase.ipc.RpcCall; +import org.apache.hadoop.hbase.ipc.RpcServer; +import org.apache.hadoop.hbase.regionserver.InternalScanner; +import org.apache.hadoop.hbase.testclassification.ClientTests; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.wal.WALEdit; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList; + +import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; + +@Category({ ClientTests.class, MediumTests.class }) +public class TestRequestAndConnectionAttributes { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestRequestAndConnectionAttributes.class); + + private static final Map CONNECTION_ATTRIBUTES = new HashMap<>(); + static { + CONNECTION_ATTRIBUTES.put("clientId", Bytes.toBytes("foo")); + } + private static final Map REQUEST_ATTRIBUTES = new HashMap<>(); + private static final ExecutorService EXECUTOR_SERVICE = Executors.newFixedThreadPool(100); + private static final AtomicBoolean REQUEST_ATTRIBUTES_VALIDATED = new AtomicBoolean(false); + private static final byte[] REQUEST_ATTRIBUTES_TEST_TABLE_CF = Bytes.toBytes("0"); + private static final TableName REQUEST_ATTRIBUTES_TEST_TABLE = + TableName.valueOf("testRequestAttributes"); + + private static HBaseTestingUtil TEST_UTIL = null; + + @BeforeClass + public static void setUp() throws Exception { + TEST_UTIL = new HBaseTestingUtil(); + TEST_UTIL.startMiniCluster(1); + TEST_UTIL.createTable(REQUEST_ATTRIBUTES_TEST_TABLE, + new byte[][] { REQUEST_ATTRIBUTES_TEST_TABLE_CF }, 1, HConstants.DEFAULT_BLOCKSIZE, + AttributesCoprocessor.class.getName()); + } + + @AfterClass + public static void afterClass() throws Exception { + TEST_UTIL.shutdownMiniCluster(); + } + + @Before + public void setup() { + REQUEST_ATTRIBUTES_VALIDATED.getAndSet(false); + } + + @Test + public void testConnectionAttributes() throws IOException { + TableName tableName = TableName.valueOf("testConnectionAttributes"); + TEST_UTIL.createTable(tableName, new byte[][] { Bytes.toBytes("0") }, 1, + HConstants.DEFAULT_BLOCKSIZE, AttributesCoprocessor.class.getName()); + + Configuration conf = TEST_UTIL.getConfiguration(); + try (Connection conn = ConnectionFactory.createConnection(conf, null, + AuthUtil.loginClient(conf), CONNECTION_ATTRIBUTES); Table table = conn.getTable(tableName)) { + Result result = table.get(new Get(Bytes.toBytes(0))); + assertEquals(CONNECTION_ATTRIBUTES.size(), result.size()); + for (Map.Entry attr : CONNECTION_ATTRIBUTES.entrySet()) { + byte[] val = result.getValue(Bytes.toBytes("c"), Bytes.toBytes(attr.getKey())); + assertEquals(Bytes.toStringBinary(attr.getValue()), Bytes.toStringBinary(val)); + } + } + } + + @Test + public void testRequestAttributesGet() throws IOException { + addRandomRequestAttributes(); + + Configuration conf = TEST_UTIL.getConfiguration(); + try ( + Connection conn = ConnectionFactory.createConnection(conf, null, AuthUtil.loginClient(conf), + CONNECTION_ATTRIBUTES); + Table table = configureRequestAttributes( + conn.getTableBuilder(REQUEST_ATTRIBUTES_TEST_TABLE, EXECUTOR_SERVICE)).build()) { + + table.get(new Get(Bytes.toBytes(0))); + } + + assertTrue(REQUEST_ATTRIBUTES_VALIDATED.get()); + } + + @Test + public void testRequestAttributesMultiGet() throws IOException { + assertFalse(REQUEST_ATTRIBUTES_VALIDATED.get()); + addRandomRequestAttributes(); + + Configuration conf = TEST_UTIL.getConfiguration(); + try ( + Connection conn = ConnectionFactory.createConnection(conf, null, AuthUtil.loginClient(conf), + CONNECTION_ATTRIBUTES); + Table table = configureRequestAttributes( + conn.getTableBuilder(REQUEST_ATTRIBUTES_TEST_TABLE, EXECUTOR_SERVICE)).build()) { + List gets = ImmutableList.of(new Get(Bytes.toBytes(0)), new Get(Bytes.toBytes(1))); + table.get(gets); + } + + assertTrue(REQUEST_ATTRIBUTES_VALIDATED.get()); + } + + @Test + public void testRequestAttributesExists() throws IOException { + assertFalse(REQUEST_ATTRIBUTES_VALIDATED.get()); + addRandomRequestAttributes(); + + Configuration conf = TEST_UTIL.getConfiguration(); + try ( + Connection conn = ConnectionFactory.createConnection(conf, null, AuthUtil.loginClient(conf), + CONNECTION_ATTRIBUTES); + Table table = configureRequestAttributes( + conn.getTableBuilder(REQUEST_ATTRIBUTES_TEST_TABLE, EXECUTOR_SERVICE)).build()) { + + table.exists(new Get(Bytes.toBytes(0))); + } + + assertTrue(REQUEST_ATTRIBUTES_VALIDATED.get()); + } + + @Test + public void testRequestAttributesScan() throws IOException { + assertFalse(REQUEST_ATTRIBUTES_VALIDATED.get()); + addRandomRequestAttributes(); + + Configuration conf = TEST_UTIL.getConfiguration(); + try ( + Connection conn = ConnectionFactory.createConnection(conf, null, AuthUtil.loginClient(conf), + CONNECTION_ATTRIBUTES); + Table table = configureRequestAttributes( + conn.getTableBuilder(REQUEST_ATTRIBUTES_TEST_TABLE, EXECUTOR_SERVICE)).build()) { + ResultScanner scanner = table.getScanner(new Scan()); + scanner.next(); + } + assertTrue(REQUEST_ATTRIBUTES_VALIDATED.get()); + } + + @Test + public void testRequestAttributesPut() throws IOException { + assertFalse(REQUEST_ATTRIBUTES_VALIDATED.get()); + addRandomRequestAttributes(); + + Configuration conf = TEST_UTIL.getConfiguration(); + try ( + Connection conn = ConnectionFactory.createConnection(conf, null, AuthUtil.loginClient(conf), + CONNECTION_ATTRIBUTES); + Table table = configureRequestAttributes( + conn.getTableBuilder(REQUEST_ATTRIBUTES_TEST_TABLE, EXECUTOR_SERVICE)).build()) { + Put put = new Put(Bytes.toBytes("a")); + put.addColumn(REQUEST_ATTRIBUTES_TEST_TABLE_CF, Bytes.toBytes("c"), Bytes.toBytes("v")); + table.put(put); + } + assertTrue(REQUEST_ATTRIBUTES_VALIDATED.get()); + } + + @Test + public void testRequestAttributesMultiPut() throws IOException { + assertFalse(REQUEST_ATTRIBUTES_VALIDATED.get()); + addRandomRequestAttributes(); + + Configuration conf = TEST_UTIL.getConfiguration(); + try ( + Connection conn = ConnectionFactory.createConnection(conf, null, AuthUtil.loginClient(conf), + CONNECTION_ATTRIBUTES); + Table table = configureRequestAttributes( + conn.getTableBuilder(REQUEST_ATTRIBUTES_TEST_TABLE, EXECUTOR_SERVICE)).build()) { + Put put = new Put(Bytes.toBytes("a")); + put.addColumn(REQUEST_ATTRIBUTES_TEST_TABLE_CF, Bytes.toBytes("c"), Bytes.toBytes("v")); + table.put(put); + } + assertTrue(REQUEST_ATTRIBUTES_VALIDATED.get()); + } + + @Test + public void testNoRequestAttributes() throws IOException { + assertFalse(REQUEST_ATTRIBUTES_VALIDATED.get()); + TableName tableName = TableName.valueOf("testNoRequestAttributesScan"); + TEST_UTIL.createTable(tableName, new byte[][] { Bytes.toBytes("0") }, 1, + HConstants.DEFAULT_BLOCKSIZE, AttributesCoprocessor.class.getName()); + + REQUEST_ATTRIBUTES.clear(); + Configuration conf = TEST_UTIL.getConfiguration(); + try (Connection conn = ConnectionFactory.createConnection(conf, null, + AuthUtil.loginClient(conf), CONNECTION_ATTRIBUTES)) { + TableBuilder tableBuilder = conn.getTableBuilder(tableName, null); + try (Table table = tableBuilder.build()) { + table.get(new Get(Bytes.toBytes(0))); + assertTrue(REQUEST_ATTRIBUTES_VALIDATED.get()); + } + } + } + + private void addRandomRequestAttributes() { + REQUEST_ATTRIBUTES.clear(); + int j = Math.max(2, (int) (10 * Math.random())); + for (int i = 0; i < j; i++) { + REQUEST_ATTRIBUTES.put(String.valueOf(i), Bytes.toBytes(UUID.randomUUID().toString())); + } + } + + private static TableBuilder configureRequestAttributes(TableBuilder tableBuilder) { + REQUEST_ATTRIBUTES.forEach(tableBuilder::setRequestAttribute); + return tableBuilder; + } + + public static class AttributesCoprocessor implements RegionObserver, RegionCoprocessor { + + @Override + public Optional getRegionObserver() { + return Optional.of(this); + } + + @Override + public void preGetOp(ObserverContext c, Get get, + List result) throws IOException { + validateRequestAttributes(); + + // for connection attrs test + RpcCall rpcCall = RpcServer.getCurrentCall().get(); + for (HBaseProtos.NameBytesPair attr : rpcCall.getHeader().getAttributeList()) { + result.add(c.getEnvironment().getCellBuilder().clear().setRow(get.getRow()) + .setFamily(Bytes.toBytes("r")).setQualifier(Bytes.toBytes(attr.getName())) + .setValue(attr.getValue().toByteArray()).setType(Cell.Type.Put).setTimestamp(1).build()); + } + for (HBaseProtos.NameBytesPair attr : rpcCall.getConnectionHeader().getAttributeList()) { + result.add(c.getEnvironment().getCellBuilder().clear().setRow(get.getRow()) + .setFamily(Bytes.toBytes("c")).setQualifier(Bytes.toBytes(attr.getName())) + .setValue(attr.getValue().toByteArray()).setType(Cell.Type.Put).setTimestamp(1).build()); + } + result.sort(CellComparator.getInstance()); + c.bypass(); + } + + @Override + public boolean preScannerNext(ObserverContext c, + InternalScanner s, List result, int limit, boolean hasNext) throws IOException { + validateRequestAttributes(); + return hasNext; + } + + @Override + public void prePut(ObserverContext c, Put put, WALEdit edit) + throws IOException { + validateRequestAttributes(); + } + + private void validateRequestAttributes() { + RpcCall rpcCall = RpcServer.getCurrentCall().get(); + List attrs = rpcCall.getHeader().getAttributeList(); + if (attrs.size() != REQUEST_ATTRIBUTES.size()) { + return; + } + for (HBaseProtos.NameBytesPair attr : attrs) { + if (!REQUEST_ATTRIBUTES.containsKey(attr.getName())) { + return; + } + if (!Arrays.equals(REQUEST_ATTRIBUTES.get(attr.getName()), attr.getValue().toByteArray())) { + return; + } + } + REQUEST_ATTRIBUTES_VALIDATED.getAndSet(true); + } + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcClientLeaks.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcClientLeaks.java index f36fef186f08..feaf44e0b84e 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcClientLeaks.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcClientLeaks.java @@ -24,6 +24,7 @@ import java.io.IOException; import java.net.Socket; import java.net.SocketAddress; +import java.util.Map; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import org.apache.hadoop.conf.Configuration; @@ -71,8 +72,8 @@ public MyRpcClientImpl(Configuration conf) { } public MyRpcClientImpl(Configuration conf, String clusterId, SocketAddress address, - MetricsConnection metrics) { - super(conf, clusterId, address, metrics); + MetricsConnection metrics, Map connectionAttributes) { + super(conf, clusterId, address, metrics, connectionAttributes); } @Override diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcServerSlowConnectionSetup.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcServerSlowConnectionSetup.java index e14b710647d1..80b3845d6688 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcServerSlowConnectionSetup.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcServerSlowConnectionSetup.java @@ -124,7 +124,7 @@ public void test() throws IOException, InterruptedException { int callId = 10; Call call = new Call(callId, TestProtobufRpcProto.getDescriptor().findMethodByName("ping"), EmptyRequestProto.getDefaultInstance(), null, EmptyResponseProto.getDefaultInstance(), 1000, - HConstants.NORMAL_QOS, null, MetricsConnection.newCallStats()); + HConstants.NORMAL_QOS, null, null, MetricsConnection.newCallStats()); RequestHeader requestHeader = IPCUtil.buildRequestHeader(call, null); dos.writeInt(IPCUtil.getTotalSizeWhenWrittenDelimited(requestHeader, call.param)); requestHeader.writeDelimitedTo(dos); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/namequeues/TestNamedQueueRecorder.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/namequeues/TestNamedQueueRecorder.java index 909e7fdb7f3d..7a3ca0b7cf9f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/namequeues/TestNamedQueueRecorder.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/namequeues/TestNamedQueueRecorder.java @@ -632,6 +632,7 @@ private static RpcCall getRpcCall(String userName, int forcedParamIndex) { return getRpcCall(userName, Optional.of(forcedParamIndex)); } + @SuppressWarnings("checkstyle:methodlength") private static RpcCall getRpcCall(String userName, Optional forcedParamIndex) { RpcCall rpcCall = new RpcCall() { @Override @@ -666,7 +667,6 @@ public long getStartTime() { @Override public void setStartTime(long startTime) { - } @Override @@ -694,6 +694,11 @@ public RPCProtos.RequestHeader getHeader() { return null; } + @Override + public RPCProtos.ConnectionHeader getConnectionHeader() { + return null; + } + @Override public int getRemotePort() { return 0; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/TestRegionProcedureStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/TestRegionProcedureStore.java index d26870b77dfd..dd49d00ac3a1 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/TestRegionProcedureStore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/TestRegionProcedureStore.java @@ -158,6 +158,7 @@ public void testInsertWithRpcCall() throws Exception { RpcServer.setCurrentCall(null); } + @SuppressWarnings("checkstyle:methodlength") private RpcCall newRpcCallWithDeadline() { return new RpcCall() { @Override @@ -220,6 +221,11 @@ public RPCProtos.RequestHeader getHeader() { return null; } + @Override + public RPCProtos.ConnectionHeader getConnectionHeader() { + return null; + } + @Override public int getRemotePort() { return 0; diff --git a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/client/ThriftConnection.java b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/client/ThriftConnection.java index 250b8a74f030..db1b1e1c9870 100644 --- a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/client/ThriftConnection.java +++ b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/client/ThriftConnection.java @@ -89,8 +89,8 @@ public class ThriftConnection implements Connection { private int operationTimeout; private int connectTimeout; - public ThriftConnection(Configuration conf, ExecutorService pool, final User user) - throws IOException { + public ThriftConnection(Configuration conf, ExecutorService pool, final User user, + Map connectionAttributes) throws IOException { this.conf = conf; this.user = user; this.host = conf.get(Constants.HBASE_THRIFT_SERVER_NAME); @@ -322,6 +322,11 @@ public TableBuilder setWriteRpcTimeout(int timeout) { return this; } + @Override + public TableBuilder setRequestAttribute(String key, byte[] value) { + return this; + } + @Override public Table build() { try {