From 324e61c38a914551d97127a16f11ed8ee4d6725a Mon Sep 17 00:00:00 2001 From: Ray Mattingly Date: Tue, 19 Sep 2023 10:16:33 -0400 Subject: [PATCH] HBASE-27981 Add connection and request attributes to slow log (#5335) Signed-off-by: Bryan Beaudreault --- .../hadoop/hbase/client/OnlineLogRecord.java | 55 +++++++++++++-- .../hbase/shaded/protobuf/ProtobufUtil.java | 30 +++++++- .../hbase/client/TestOnlineLogRecord.java | 56 ++++++++++++++- .../src/main/protobuf/TooSlowLog.proto | 4 ++ .../hbase/namequeues/RpcLogDetails.java | 20 +++++- .../namequeues/impl/SlowLogQueueService.java | 18 ++++- .../regionserver/rsOperationDetails.jsp | 9 +++ .../namequeues/TestNamedQueueRecorder.java | 70 ++++++++++++++++++- 8 files changed, 250 insertions(+), 12 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/OnlineLogRecord.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/OnlineLogRecord.java index 65e2f58f4529..d9fd51e80a95 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/OnlineLogRecord.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/OnlineLogRecord.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hbase.client; +import java.util.Map; import java.util.Optional; import org.apache.commons.lang3.builder.EqualsBuilder; import org.apache.commons.lang3.builder.HashCodeBuilder; @@ -29,6 +30,8 @@ import org.apache.hbase.thirdparty.com.google.gson.JsonObject; import org.apache.hbase.thirdparty.com.google.gson.JsonSerializer; +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; + /** * Slow/Large Log payload for hbase-client, to be used by Admin API get_slow_responses and * get_large_responses @@ -53,6 +56,18 @@ final public class OnlineLogRecord extends LogEntry { if (slowLogPayload.getMultiServiceCalls() == 0) { jsonObj.remove("multiServiceCalls"); } + if (slowLogPayload.getRequestAttributes().isEmpty()) { + jsonObj.remove("requestAttributes"); + } else { + jsonObj.add("requestAttributes", gson + .toJsonTree(ProtobufUtil.deserializeAttributes(slowLogPayload.getRequestAttributes()))); + } + if (slowLogPayload.getConnectionAttributes().isEmpty()) { + jsonObj.remove("connectionAttributes"); + } else { + jsonObj.add("connectionAttributes", gson.toJsonTree( + ProtobufUtil.deserializeAttributes(slowLogPayload.getConnectionAttributes()))); + } if (slowLogPayload.getScan().isPresent()) { jsonObj.add("scan", gson.toJsonTree(slowLogPayload.getScan().get().toMap())); } else { @@ -79,6 +94,8 @@ final public class OnlineLogRecord extends LogEntry { private final int multiMutationsCount; private final int multiServiceCalls; private final Optional scan; + private final Map requestAttributes; + private final Map connectionAttributes; public long getStartTime() { return startTime; @@ -152,11 +169,20 @@ public Optional getScan() { return scan; } + public Map getRequestAttributes() { + return requestAttributes; + } + + public Map getConnectionAttributes() { + return connectionAttributes; + } + OnlineLogRecord(final long startTime, final int processingTime, final int queueTime, final long responseSize, final long blockBytesScanned, final String clientAddress, final String serverClass, final String methodName, final String callDetails, final String param, final String regionName, final String userName, final int multiGetsCount, - final int multiMutationsCount, final int multiServiceCalls, final Scan scan) { + final int multiMutationsCount, final int multiServiceCalls, final Scan scan, + final Map requestAttributes, final Map connectionAttributes) { this.startTime = startTime; this.processingTime = processingTime; this.queueTime = queueTime; @@ -173,6 +199,8 @@ public Optional getScan() { this.multiMutationsCount = multiMutationsCount; this.multiServiceCalls = multiServiceCalls; this.scan = Optional.ofNullable(scan); + this.requestAttributes = requestAttributes; + this.connectionAttributes = connectionAttributes; } public static class OnlineLogRecordBuilder { @@ -192,6 +220,8 @@ public static class OnlineLogRecordBuilder { private int multiMutationsCount; private int multiServiceCalls; private Scan scan = null; + private Map requestAttributes; + private Map connectionAttributes; public OnlineLogRecordBuilder setStartTime(long startTime) { this.startTime = startTime; @@ -276,10 +306,22 @@ public OnlineLogRecordBuilder setScan(Scan scan) { return this; } + public OnlineLogRecordBuilder setRequestAttributes(Map requestAttributes) { + this.requestAttributes = requestAttributes; + return this; + } + + public OnlineLogRecordBuilder + setConnectionAttributes(Map connectionAttributes) { + this.connectionAttributes = connectionAttributes; + return this; + } + public OnlineLogRecord build() { return new OnlineLogRecord(startTime, processingTime, queueTime, responseSize, blockBytesScanned, clientAddress, serverClass, methodName, callDetails, param, regionName, - userName, multiGetsCount, multiMutationsCount, multiServiceCalls, scan); + userName, multiGetsCount, multiMutationsCount, multiServiceCalls, scan, requestAttributes, + connectionAttributes); } } @@ -304,7 +346,8 @@ public boolean equals(Object o) { .append(serverClass, that.serverClass).append(methodName, that.methodName) .append(callDetails, that.callDetails).append(param, that.param) .append(regionName, that.regionName).append(userName, that.userName).append(scan, that.scan) - .isEquals(); + .append(requestAttributes, that.requestAttributes) + .append(connectionAttributes, that.connectionAttributes).isEquals(); } @Override @@ -313,7 +356,7 @@ public int hashCode() { .append(responseSize).append(blockBytesScanned).append(clientAddress).append(serverClass) .append(methodName).append(callDetails).append(param).append(regionName).append(userName) .append(multiGetsCount).append(multiMutationsCount).append(multiServiceCalls).append(scan) - .toHashCode(); + .append(requestAttributes).append(connectionAttributes).toHashCode(); } @Override @@ -330,7 +373,9 @@ public String toString() { .append("methodName", methodName).append("callDetails", callDetails).append("param", param) .append("regionName", regionName).append("userName", userName) .append("multiGetsCount", multiGetsCount).append("multiMutationsCount", multiMutationsCount) - .append("multiServiceCalls", multiServiceCalls).append("scan", scan).toString(); + .append("multiServiceCalls", multiServiceCalls).append("scan", scan) + .append("requestAttributes", requestAttributes) + .append("connectionAttributes", connectionAttributes).toString(); } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java index 1f5b30240f1a..a3b55e5ef16b 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java @@ -2163,6 +2163,25 @@ public static SlowLogParams getSlowLogParams(Message message, boolean slowLogSca return new SlowLogParams(params); } + /** + * Convert a list of NameBytesPair to a more readable CSV + */ + public static String convertAttributesToCsv(List attributes) { + if (attributes.isEmpty()) { + return HConstants.EMPTY_STRING; + } + return deserializeAttributes(convertNameBytesPairsToMap(attributes)).entrySet().stream() + .map(entry -> entry.getKey() + " = " + entry.getValue()).collect(Collectors.joining(", ")); + } + + /** + * Convert a map of byte array attributes to a more readable map of binary string representations + */ + public static Map deserializeAttributes(Map attributes) { + return attributes.entrySet().stream().collect( + Collectors.toMap(Map.Entry::getKey, entry -> Bytes.toStringBinary(entry.getValue()))); + } + /** * Print out some subset of a MutationProto rather than all of it and its data * @param proto Protobuf to print out @@ -3348,7 +3367,10 @@ private static LogEntry getSlowLogRecord(final TooSlowLog.SlowLogPayload slowLog .setResponseSize(slowLogPayload.getResponseSize()) .setBlockBytesScanned(slowLogPayload.getBlockBytesScanned()) .setServerClass(slowLogPayload.getServerClass()).setStartTime(slowLogPayload.getStartTime()) - .setUserName(slowLogPayload.getUserName()); + .setUserName(slowLogPayload.getUserName()) + .setRequestAttributes(convertNameBytesPairsToMap(slowLogPayload.getRequestAttributeList())) + .setConnectionAttributes( + convertNameBytesPairsToMap(slowLogPayload.getConnectionAttributeList())); if (slowLogPayload.hasScan()) { try { onlineLogRecord.setScan(ProtobufUtil.toScan(slowLogPayload.getScan())); @@ -3359,6 +3381,12 @@ private static LogEntry getSlowLogRecord(final TooSlowLog.SlowLogPayload slowLog return onlineLogRecord.build(); } + private static Map + convertNameBytesPairsToMap(List nameBytesPairs) { + return nameBytesPairs.stream().collect(Collectors.toMap(NameBytesPair::getName, + nameBytesPair -> nameBytesPair.getValue().toByteArray())); + } + /** * Convert AdminProtos#SlowLogResponses to list of {@link OnlineLogRecord} * @param logEntry slowlog response protobuf instance diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestOnlineLogRecord.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestOnlineLogRecord.java index 846738d82987..fe753973ae20 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestOnlineLogRecord.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestOnlineLogRecord.java @@ -17,6 +17,9 @@ */ package org.apache.hadoop.hbase.client; +import java.util.Collections; +import java.util.Map; +import java.util.Set; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.testclassification.ClientTests; import org.apache.hadoop.hbase.testclassification.SmallTests; @@ -26,6 +29,9 @@ import org.junit.Test; import org.junit.experimental.categories.Category; +import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap; +import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSet; + @Category({ ClientTests.class, SmallTests.class }) public class TestOnlineLogRecord { @@ -47,10 +53,56 @@ public void itSerializesScan() { + " \"maxResultSize\": -1,\n" + " \"families\": {},\n" + " \"caching\": -1,\n" + " \"maxVersions\": 1,\n" + " \"timeRange\": [\n" + " 0,\n" + " 9223372036854775807\n" + " ]\n" + " }\n" + "}"; - OnlineLogRecord o = - new OnlineLogRecord(1, 2, 3, 4, 5, null, null, null, null, null, null, null, 6, 7, 0, scan); + OnlineLogRecord o = new OnlineLogRecord(1, 2, 3, 4, 5, null, null, null, null, null, null, null, + 6, 7, 0, scan, Collections.emptyMap(), Collections.emptyMap()); String actualOutput = o.toJsonPrettyPrint(); System.out.println(actualOutput); Assert.assertEquals(actualOutput, expectedOutput); } + + @Test + public void itSerializesRequestAttributes() { + Map requestAttributes = ImmutableMap. builder() + .put("r", Bytes.toBytes("1")).put("2", Bytes.toBytes(0.0)).build(); + Set expectedOutputs = + ImmutableSet. builder().add("requestAttributes").add("\"r\": \"1\"") + .add("\"2\": \"\\\\x00\\\\x00\\\\x00\\\\x00\\\\x00\\\\x00\\\\x00\\\\x00\"").build(); + OnlineLogRecord o = new OnlineLogRecord(1, 2, 3, 4, 5, null, null, null, null, null, null, null, + 6, 7, 0, null, requestAttributes, Collections.emptyMap()); + String actualOutput = o.toJsonPrettyPrint(); + System.out.println(actualOutput); + expectedOutputs.forEach(expected -> Assert.assertTrue(actualOutput.contains(expected))); + } + + @Test + public void itOmitsEmptyRequestAttributes() { + OnlineLogRecord o = new OnlineLogRecord(1, 2, 3, 4, 5, null, null, null, null, null, null, null, + 6, 7, 0, null, Collections.emptyMap(), Collections.emptyMap()); + String actualOutput = o.toJsonPrettyPrint(); + System.out.println(actualOutput); + Assert.assertFalse(actualOutput.contains("requestAttributes")); + } + + @Test + public void itSerializesConnectionAttributes() { + Map connectionAttributes = ImmutableMap. builder() + .put("c", Bytes.toBytes("1")).put("2", Bytes.toBytes(0.0)).build(); + Set expectedOutputs = + ImmutableSet. builder().add("connectionAttributes").add("\"c\": \"1\"") + .add("\"2\": \"\\\\x00\\\\x00\\\\x00\\\\x00\\\\x00\\\\x00\\\\x00\\\\x00\"").build(); + OnlineLogRecord o = new OnlineLogRecord(1, 2, 3, 4, 5, null, null, null, null, null, null, null, + 6, 7, 0, null, Collections.emptyMap(), connectionAttributes); + String actualOutput = o.toJsonPrettyPrint(); + System.out.println(actualOutput); + expectedOutputs.forEach(expected -> Assert.assertTrue(actualOutput.contains(expected))); + } + + @Test + public void itOmitsEmptyConnectionAttributes() { + OnlineLogRecord o = new OnlineLogRecord(1, 2, 3, 4, 5, null, null, null, null, null, null, null, + 6, 7, 0, null, Collections.emptyMap(), Collections.emptyMap()); + String actualOutput = o.toJsonPrettyPrint(); + System.out.println(actualOutput); + Assert.assertFalse(actualOutput.contains("connectionAttributes")); + } } diff --git a/hbase-protocol-shaded/src/main/protobuf/TooSlowLog.proto b/hbase-protocol-shaded/src/main/protobuf/TooSlowLog.proto index e61948519473..e4cee4a252d4 100644 --- a/hbase-protocol-shaded/src/main/protobuf/TooSlowLog.proto +++ b/hbase-protocol-shaded/src/main/protobuf/TooSlowLog.proto @@ -27,6 +27,7 @@ option java_outer_classname = "TooSlowLog"; option java_generate_equals_and_hash = true; option optimize_for = SPEED; +import "HBase.proto"; import "Client.proto"; message SlowLogPayload { @@ -49,6 +50,9 @@ message SlowLogPayload { optional int64 block_bytes_scanned = 16; optional Scan scan = 17; + repeated NameBytesPair connection_attribute = 18; + repeated NameBytesPair request_attribute = 19; + // SLOW_LOG is RPC call slow in nature whereas LARGE_LOG is RPC call quite large. // Majority of times, slow logs are also large logs and hence, ALL is combination of // both diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/RpcLogDetails.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/RpcLogDetails.java index c0baf21e4340..eb35d886bbb0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/RpcLogDetails.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/RpcLogDetails.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hbase.namequeues; +import java.util.Map; import org.apache.commons.lang3.builder.ToStringBuilder; import org.apache.hadoop.hbase.ipc.RpcCall; import org.apache.yetus.audience.InterfaceAudience; @@ -39,6 +40,8 @@ public class RpcLogDetails extends NamedQueuePayload { private final String className; private final boolean isSlowLog; private final boolean isLargeLog; + private final Map connectionAttributes; + private final Map requestAttributes; public RpcLogDetails(RpcCall rpcCall, Message param, String clientAddress, long responseSize, long blockBytesScanned, String className, boolean isSlowLog, boolean isLargeLog) { @@ -51,6 +54,12 @@ public RpcLogDetails(RpcCall rpcCall, Message param, String clientAddress, long this.className = className; this.isSlowLog = isSlowLog; this.isLargeLog = isLargeLog; + + // it's important to call getConnectionAttributes and getRequestAttributes here + // because otherwise the buffers may get released before the log details are processed which + // would result in corrupted attributes + this.connectionAttributes = rpcCall.getConnectionAttributes(); + this.requestAttributes = rpcCall.getRequestAttributes(); } public RpcCall getRpcCall() { @@ -85,11 +94,20 @@ public Message getParam() { return param; } + public Map getConnectionAttributes() { + return connectionAttributes; + } + + public Map getRequestAttributes() { + return requestAttributes; + } + @Override public String toString() { return new ToStringBuilder(this).append("rpcCall", rpcCall).append("param", param) .append("clientAddress", clientAddress).append("responseSize", responseSize) .append("className", className).append("isSlowLog", isSlowLog) - .append("isLargeLog", isLargeLog).toString(); + .append("isLargeLog", isLargeLog).append("connectionAttributes", connectionAttributes) + .append("requestAttributes", requestAttributes).toString(); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/impl/SlowLogQueueService.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/impl/SlowLogQueueService.java index 48121a8b066a..fb29b8563ef7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/impl/SlowLogQueueService.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/impl/SlowLogQueueService.java @@ -18,8 +18,10 @@ package org.apache.hadoop.hbase.namequeues.impl; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.Queue; import java.util.stream.Collectors; import org.apache.commons.lang3.StringUtils; @@ -42,12 +44,14 @@ import org.apache.hbase.thirdparty.com.google.common.collect.EvictingQueue; import org.apache.hbase.thirdparty.com.google.common.collect.Queues; +import org.apache.hbase.thirdparty.com.google.protobuf.ByteString; import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors; import org.apache.hbase.thirdparty.com.google.protobuf.Message; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; +import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.TooSlowLog; /** @@ -164,7 +168,9 @@ public void consumeEventFromDisruptor(NamedQueuePayload namedQueuePayload) { .setProcessingTime(processingTime).setQueueTime(qTime) .setRegionName(slowLogParams != null ? slowLogParams.getRegionName() : StringUtils.EMPTY) .setResponseSize(responseSize).setBlockBytesScanned(blockBytesScanned) - .setServerClass(className).setStartTime(startTime).setType(type).setUserName(userName); + .setServerClass(className).setStartTime(startTime).setType(type).setUserName(userName) + .addAllRequestAttribute(buildNameBytesPairs(rpcLogDetails.getRequestAttributes())) + .addAllConnectionAttribute(buildNameBytesPairs(rpcLogDetails.getConnectionAttributes())); if (slowLogParams != null && slowLogParams.getScan() != null) { slowLogPayloadBuilder.setScan(slowLogParams.getScan()); } @@ -177,6 +183,16 @@ public void consumeEventFromDisruptor(NamedQueuePayload namedQueuePayload) { } } + private static Collection + buildNameBytesPairs(Map attributes) { + if (attributes == null) { + return Collections.emptySet(); + } + return attributes.entrySet().stream().map(attr -> HBaseProtos.NameBytesPair.newBuilder() + .setName(attr.getKey()).setValue(ByteString.copyFrom(attr.getValue())).build()) + .collect(Collectors.toSet()); + } + @Override public boolean clearNamedQueue() { if (!isOnlineLogProviderEnabled) { diff --git a/hbase-server/src/main/resources/hbase-webapps/regionserver/rsOperationDetails.jsp b/hbase-server/src/main/resources/hbase-webapps/regionserver/rsOperationDetails.jsp index a1ff23143bad..e8944b63f435 100644 --- a/hbase-server/src/main/resources/hbase-webapps/regionserver/rsOperationDetails.jsp +++ b/hbase-server/src/main/resources/hbase-webapps/regionserver/rsOperationDetails.jsp @@ -26,6 +26,7 @@ import="org.apache.hadoop.hbase.regionserver.HRegionServer" import="org.apache.hadoop.hbase.HConstants" import="org.apache.hadoop.hbase.shaded.protobuf.generated.TooSlowLog" + import="org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil" import="org.apache.hadoop.hbase.namequeues.NamedQueueRecorder" import="org.apache.hadoop.hbase.namequeues.RpcLogDetails" import="org.apache.hadoop.hbase.namequeues.request.NamedQueueGetRequest" @@ -108,6 +109,8 @@ MultiService Calls Call Details Param + Request Attributes + Connection Attributes <% if (slowLogs != null && !slowLogs.isEmpty()) {%> <% for (TooSlowLog.SlowLogPayload r : slowLogs) { %> @@ -127,6 +130,8 @@ <%=r.getMultiServiceCalls()%> <%=r.getCallDetails()%> <%=r.getParam()%> + <%=ProtobufUtil.convertAttributesToCsv(r.getRequestAttributeList())%> + <%=ProtobufUtil.convertAttributesToCsv(r.getConnectionAttributeList())%> <% } %> <% } %> @@ -151,6 +156,8 @@ MultiService Calls Call Details Param + Request Attributes + Connection Attributes <% if (largeLogs != null && !largeLogs.isEmpty()) {%> <% for (TooSlowLog.SlowLogPayload r : largeLogs) { %> @@ -170,6 +177,8 @@ <%=r.getMultiServiceCalls()%> <%=r.getCallDetails()%> <%=r.getParam()%> + <%=ProtobufUtil.convertAttributesToCsv(r.getRequestAttributeList())%> + <%=ProtobufUtil.convertAttributesToCsv(r.getConnectionAttributeList())%> <% } %> <% } %> diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/namequeues/TestNamedQueueRecorder.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/namequeues/TestNamedQueueRecorder.java index 0135062de408..2be75e373846 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/namequeues/TestNamedQueueRecorder.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/namequeues/TestNamedQueueRecorder.java @@ -28,6 +28,7 @@ import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.HBaseClassTestRule; @@ -47,6 +48,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList; import org.apache.hbase.thirdparty.com.google.common.util.concurrent.Uninterruptibles; import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService; import org.apache.hbase.thirdparty.com.google.protobuf.ByteString; @@ -72,6 +74,20 @@ public class TestNamedQueueRecorder { private static final Logger LOG = LoggerFactory.getLogger(TestNamedQueueRecorder.class); private static final HBaseTestingUtility HBASE_TESTING_UTILITY = new HBaseTestingUtility(); + private static final List REQUEST_HEADERS = + ImmutableList. builder() + .add(HBaseProtos.NameBytesPair.newBuilder().setName("1") + .setValue(ByteString.copyFromUtf8("r")).build()) + .add(HBaseProtos.NameBytesPair.newBuilder().setName("2") + .setValue(ByteString.copyFromUtf8("h")).build()) + .build(); + private static final List CONNECTION_HEADERS = + ImmutableList. builder() + .add(HBaseProtos.NameBytesPair.newBuilder().setName("1") + .setValue(ByteString.copyFromUtf8("c")).build()) + .add(HBaseProtos.NameBytesPair.newBuilder().setName("2") + .setValue(ByteString.copyFromUtf8("h")).build()) + .build(); private NamedQueueRecorder namedQueueRecorder; @@ -600,6 +616,54 @@ public void testOnlineSlowLogScanPayloadExplicitlyEnabled() throws Exception { })); } + @Test + public void testOnlineSlowLogRequestAttributes() throws Exception { + Configuration conf = applySlowLogRecorderConf(1); + Constructor constructor = + NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class); + constructor.setAccessible(true); + namedQueueRecorder = constructor.newInstance(conf); + AdminProtos.SlowLogResponseRequest request = + AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(1).build(); + + Assert.assertEquals(getSlowLogPayloads(request).size(), 0); + LOG.debug("Initially ringbuffer of Slow Log records is empty"); + RpcLogDetails rpcLogDetails = getRpcLogDetailsOfScan(); + namedQueueRecorder.addRecord(rpcLogDetails); + Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, () -> { + Optional slowLogPayload = getSlowLogPayloads(request).stream().findAny(); + if (slowLogPayload.isPresent() && !slowLogPayload.get().getRequestAttributeList().isEmpty()) { + return slowLogPayload.get().getRequestAttributeList().containsAll(REQUEST_HEADERS); + } + return false; + })); + } + + @Test + public void testOnlineSlowLogConnectionAttributes() throws Exception { + Configuration conf = applySlowLogRecorderConf(1); + Constructor constructor = + NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class); + constructor.setAccessible(true); + namedQueueRecorder = constructor.newInstance(conf); + AdminProtos.SlowLogResponseRequest request = + AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(1).build(); + + Assert.assertEquals(getSlowLogPayloads(request).size(), 0); + LOG.debug("Initially ringbuffer of Slow Log records is empty"); + RpcLogDetails rpcLogDetails = getRpcLogDetailsOfScan(); + namedQueueRecorder.addRecord(rpcLogDetails); + Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, () -> { + Optional slowLogPayload = getSlowLogPayloads(request).stream().findAny(); + if ( + slowLogPayload.isPresent() && !slowLogPayload.get().getConnectionAttributeList().isEmpty() + ) { + return slowLogPayload.get().getConnectionAttributeList().containsAll(CONNECTION_HEADERS); + } + return false; + })); + } + static RpcLogDetails getRpcLogDetails(String userName, String clientAddress, String className, int forcedParamIndex) { RpcCall rpcCall = getRpcCall(userName, forcedParamIndex); @@ -697,12 +761,14 @@ public RPCProtos.RequestHeader getHeader() { @Override public Map getConnectionAttributes() { - return null; + return CONNECTION_HEADERS.stream().collect(Collectors + .toMap(HBaseProtos.NameBytesPair::getName, pair -> pair.getValue().toByteArray())); } @Override public Map getRequestAttributes() { - return null; + return REQUEST_HEADERS.stream().collect(Collectors.toMap(HBaseProtos.NameBytesPair::getName, + pair -> pair.getValue().toByteArray())); } @Override