Skip to content

Commit

Permalink
HBASE-27981 Add connection and request attributes to slow log (#5335)
Browse files Browse the repository at this point in the history
Signed-off-by: Bryan Beaudreault <bbeaudreault@apache.org>
  • Loading branch information
rmdmattingly authored and bbeaudreault committed Sep 19, 2023
1 parent 18a2af4 commit f80b2a0
Show file tree
Hide file tree
Showing 8 changed files with 250 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase.client;

import java.util.Map;
import java.util.Optional;
import org.apache.commons.lang3.builder.EqualsBuilder;
import org.apache.commons.lang3.builder.HashCodeBuilder;
Expand All @@ -29,6 +30,8 @@
import org.apache.hbase.thirdparty.com.google.gson.JsonObject;
import org.apache.hbase.thirdparty.com.google.gson.JsonSerializer;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;

/**
* Slow/Large Log payload for hbase-client, to be used by Admin API get_slow_responses and
* get_large_responses
Expand All @@ -53,6 +56,18 @@ final public class OnlineLogRecord extends LogEntry {
if (slowLogPayload.getMultiServiceCalls() == 0) {
jsonObj.remove("multiServiceCalls");
}
if (slowLogPayload.getRequestAttributes().isEmpty()) {
jsonObj.remove("requestAttributes");
} else {
jsonObj.add("requestAttributes", gson
.toJsonTree(ProtobufUtil.deserializeAttributes(slowLogPayload.getRequestAttributes())));
}
if (slowLogPayload.getConnectionAttributes().isEmpty()) {
jsonObj.remove("connectionAttributes");
} else {
jsonObj.add("connectionAttributes", gson.toJsonTree(
ProtobufUtil.deserializeAttributes(slowLogPayload.getConnectionAttributes())));
}
if (slowLogPayload.getScan().isPresent()) {
jsonObj.add("scan", gson.toJsonTree(slowLogPayload.getScan().get().toMap()));
} else {
Expand All @@ -79,6 +94,8 @@ final public class OnlineLogRecord extends LogEntry {
private final int multiMutationsCount;
private final int multiServiceCalls;
private final Optional<Scan> scan;
private final Map<String, byte[]> requestAttributes;
private final Map<String, byte[]> connectionAttributes;

public long getStartTime() {
return startTime;
Expand Down Expand Up @@ -152,11 +169,20 @@ public Optional<Scan> getScan() {
return scan;
}

public Map<String, byte[]> getRequestAttributes() {
return requestAttributes;
}

public Map<String, byte[]> getConnectionAttributes() {
return connectionAttributes;
}

OnlineLogRecord(final long startTime, final int processingTime, final int queueTime,
final long responseSize, final long blockBytesScanned, final String clientAddress,
final String serverClass, final String methodName, final String callDetails, final String param,
final String regionName, final String userName, final int multiGetsCount,
final int multiMutationsCount, final int multiServiceCalls, final Scan scan) {
final int multiMutationsCount, final int multiServiceCalls, final Scan scan,
final Map<String, byte[]> requestAttributes, final Map<String, byte[]> connectionAttributes) {
this.startTime = startTime;
this.processingTime = processingTime;
this.queueTime = queueTime;
Expand All @@ -173,6 +199,8 @@ public Optional<Scan> getScan() {
this.multiMutationsCount = multiMutationsCount;
this.multiServiceCalls = multiServiceCalls;
this.scan = Optional.ofNullable(scan);
this.requestAttributes = requestAttributes;
this.connectionAttributes = connectionAttributes;
}

public static class OnlineLogRecordBuilder {
Expand All @@ -192,6 +220,8 @@ public static class OnlineLogRecordBuilder {
private int multiMutationsCount;
private int multiServiceCalls;
private Scan scan = null;
private Map<String, byte[]> requestAttributes;
private Map<String, byte[]> connectionAttributes;

public OnlineLogRecordBuilder setStartTime(long startTime) {
this.startTime = startTime;
Expand Down Expand Up @@ -276,10 +306,22 @@ public OnlineLogRecordBuilder setScan(Scan scan) {
return this;
}

public OnlineLogRecordBuilder setRequestAttributes(Map<String, byte[]> requestAttributes) {
this.requestAttributes = requestAttributes;
return this;
}

public OnlineLogRecordBuilder
setConnectionAttributes(Map<String, byte[]> connectionAttributes) {
this.connectionAttributes = connectionAttributes;
return this;
}

public OnlineLogRecord build() {
return new OnlineLogRecord(startTime, processingTime, queueTime, responseSize,
blockBytesScanned, clientAddress, serverClass, methodName, callDetails, param, regionName,
userName, multiGetsCount, multiMutationsCount, multiServiceCalls, scan);
userName, multiGetsCount, multiMutationsCount, multiServiceCalls, scan, requestAttributes,
connectionAttributes);
}
}

Expand All @@ -304,7 +346,8 @@ public boolean equals(Object o) {
.append(serverClass, that.serverClass).append(methodName, that.methodName)
.append(callDetails, that.callDetails).append(param, that.param)
.append(regionName, that.regionName).append(userName, that.userName).append(scan, that.scan)
.isEquals();
.append(requestAttributes, that.requestAttributes)
.append(connectionAttributes, that.connectionAttributes).isEquals();
}

@Override
Expand All @@ -313,7 +356,7 @@ public int hashCode() {
.append(responseSize).append(blockBytesScanned).append(clientAddress).append(serverClass)
.append(methodName).append(callDetails).append(param).append(regionName).append(userName)
.append(multiGetsCount).append(multiMutationsCount).append(multiServiceCalls).append(scan)
.toHashCode();
.append(requestAttributes).append(connectionAttributes).toHashCode();
}

@Override
Expand All @@ -330,7 +373,9 @@ public String toString() {
.append("methodName", methodName).append("callDetails", callDetails).append("param", param)
.append("regionName", regionName).append("userName", userName)
.append("multiGetsCount", multiGetsCount).append("multiMutationsCount", multiMutationsCount)
.append("multiServiceCalls", multiServiceCalls).append("scan", scan).toString();
.append("multiServiceCalls", multiServiceCalls).append("scan", scan)
.append("requestAttributes", requestAttributes)
.append("connectionAttributes", connectionAttributes).toString();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -2196,6 +2196,25 @@ public static SlowLogParams getSlowLogParams(Message message, boolean slowLogSca
return new SlowLogParams(params);
}

/**
* Convert a list of NameBytesPair to a more readable CSV
*/
public static String convertAttributesToCsv(List<NameBytesPair> attributes) {
if (attributes.isEmpty()) {
return HConstants.EMPTY_STRING;
}
return deserializeAttributes(convertNameBytesPairsToMap(attributes)).entrySet().stream()
.map(entry -> entry.getKey() + " = " + entry.getValue()).collect(Collectors.joining(", "));
}

/**
* Convert a map of byte array attributes to a more readable map of binary string representations
*/
public static Map<String, String> deserializeAttributes(Map<String, byte[]> attributes) {
return attributes.entrySet().stream().collect(
Collectors.toMap(Map.Entry::getKey, entry -> Bytes.toStringBinary(entry.getValue())));
}

/**
* Print out some subset of a MutationProto rather than all of it and its data
* @param proto Protobuf to print out
Expand Down Expand Up @@ -3389,7 +3408,10 @@ private static LogEntry getSlowLogRecord(final TooSlowLog.SlowLogPayload slowLog
.setResponseSize(slowLogPayload.getResponseSize())
.setBlockBytesScanned(slowLogPayload.getBlockBytesScanned())
.setServerClass(slowLogPayload.getServerClass()).setStartTime(slowLogPayload.getStartTime())
.setUserName(slowLogPayload.getUserName());
.setUserName(slowLogPayload.getUserName())
.setRequestAttributes(convertNameBytesPairsToMap(slowLogPayload.getRequestAttributeList()))
.setConnectionAttributes(
convertNameBytesPairsToMap(slowLogPayload.getConnectionAttributeList()));
if (slowLogPayload.hasScan()) {
try {
onlineLogRecord.setScan(ProtobufUtil.toScan(slowLogPayload.getScan()));
Expand All @@ -3400,6 +3422,12 @@ private static LogEntry getSlowLogRecord(final TooSlowLog.SlowLogPayload slowLog
return onlineLogRecord.build();
}

private static Map<String, byte[]>
convertNameBytesPairsToMap(List<NameBytesPair> nameBytesPairs) {
return nameBytesPairs.stream().collect(Collectors.toMap(NameBytesPair::getName,
nameBytesPair -> nameBytesPair.getValue().toByteArray()));
}

/**
* Convert AdminProtos#SlowLogResponses to list of {@link OnlineLogRecord}
* @param logEntry slowlog response protobuf instance
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
*/
package org.apache.hadoop.hbase.client;

import java.util.Collections;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
Expand All @@ -26,6 +29,9 @@
import org.junit.Test;
import org.junit.experimental.categories.Category;

import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSet;

@Category({ ClientTests.class, SmallTests.class })
public class TestOnlineLogRecord {

Expand All @@ -47,10 +53,56 @@ public void itSerializesScan() {
+ " \"maxResultSize\": -1,\n" + " \"families\": {},\n" + " \"caching\": -1,\n"
+ " \"maxVersions\": 1,\n" + " \"timeRange\": [\n" + " 0,\n"
+ " 9223372036854775807\n" + " ]\n" + " }\n" + "}";
OnlineLogRecord o =
new OnlineLogRecord(1, 2, 3, 4, 5, null, null, null, null, null, null, null, 6, 7, 0, scan);
OnlineLogRecord o = new OnlineLogRecord(1, 2, 3, 4, 5, null, null, null, null, null, null, null,
6, 7, 0, scan, Collections.emptyMap(), Collections.emptyMap());
String actualOutput = o.toJsonPrettyPrint();
System.out.println(actualOutput);
Assert.assertEquals(actualOutput, expectedOutput);
}

@Test
public void itSerializesRequestAttributes() {
Map<String, byte[]> requestAttributes = ImmutableMap.<String, byte[]> builder()
.put("r", Bytes.toBytes("1")).put("2", Bytes.toBytes(0.0)).build();
Set<String> expectedOutputs =
ImmutableSet.<String> builder().add("requestAttributes").add("\"r\": \"1\"")
.add("\"2\": \"\\\\x00\\\\x00\\\\x00\\\\x00\\\\x00\\\\x00\\\\x00\\\\x00\"").build();
OnlineLogRecord o = new OnlineLogRecord(1, 2, 3, 4, 5, null, null, null, null, null, null, null,
6, 7, 0, null, requestAttributes, Collections.emptyMap());
String actualOutput = o.toJsonPrettyPrint();
System.out.println(actualOutput);
expectedOutputs.forEach(expected -> Assert.assertTrue(actualOutput.contains(expected)));
}

@Test
public void itOmitsEmptyRequestAttributes() {
OnlineLogRecord o = new OnlineLogRecord(1, 2, 3, 4, 5, null, null, null, null, null, null, null,
6, 7, 0, null, Collections.emptyMap(), Collections.emptyMap());
String actualOutput = o.toJsonPrettyPrint();
System.out.println(actualOutput);
Assert.assertFalse(actualOutput.contains("requestAttributes"));
}

@Test
public void itSerializesConnectionAttributes() {
Map<String, byte[]> connectionAttributes = ImmutableMap.<String, byte[]> builder()
.put("c", Bytes.toBytes("1")).put("2", Bytes.toBytes(0.0)).build();
Set<String> expectedOutputs =
ImmutableSet.<String> builder().add("connectionAttributes").add("\"c\": \"1\"")
.add("\"2\": \"\\\\x00\\\\x00\\\\x00\\\\x00\\\\x00\\\\x00\\\\x00\\\\x00\"").build();
OnlineLogRecord o = new OnlineLogRecord(1, 2, 3, 4, 5, null, null, null, null, null, null, null,
6, 7, 0, null, Collections.emptyMap(), connectionAttributes);
String actualOutput = o.toJsonPrettyPrint();
System.out.println(actualOutput);
expectedOutputs.forEach(expected -> Assert.assertTrue(actualOutput.contains(expected)));
}

@Test
public void itOmitsEmptyConnectionAttributes() {
OnlineLogRecord o = new OnlineLogRecord(1, 2, 3, 4, 5, null, null, null, null, null, null, null,
6, 7, 0, null, Collections.emptyMap(), Collections.emptyMap());
String actualOutput = o.toJsonPrettyPrint();
System.out.println(actualOutput);
Assert.assertFalse(actualOutput.contains("connectionAttributes"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ option java_outer_classname = "TooSlowLog";
option java_generate_equals_and_hash = true;
option optimize_for = SPEED;

import "HBase.proto";
import "client/Client.proto";

message SlowLogPayload {
Expand All @@ -49,6 +50,9 @@ message SlowLogPayload {
optional int64 block_bytes_scanned = 16;
optional Scan scan = 17;

repeated NameBytesPair connection_attribute = 18;
repeated NameBytesPair request_attribute = 19;

// SLOW_LOG is RPC call slow in nature whereas LARGE_LOG is RPC call quite large.
// Majority of times, slow logs are also large logs and hence, ALL is combination of
// both
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase.namequeues;

import java.util.Map;
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.hadoop.hbase.ipc.RpcCall;
import org.apache.yetus.audience.InterfaceAudience;
Expand All @@ -39,6 +40,8 @@ public class RpcLogDetails extends NamedQueuePayload {
private final String className;
private final boolean isSlowLog;
private final boolean isLargeLog;
private final Map<String, byte[]> connectionAttributes;
private final Map<String, byte[]> requestAttributes;

public RpcLogDetails(RpcCall rpcCall, Message param, String clientAddress, long responseSize,
long blockBytesScanned, String className, boolean isSlowLog, boolean isLargeLog) {
Expand All @@ -51,6 +54,12 @@ public RpcLogDetails(RpcCall rpcCall, Message param, String clientAddress, long
this.className = className;
this.isSlowLog = isSlowLog;
this.isLargeLog = isLargeLog;

// it's important to call getConnectionAttributes and getRequestAttributes here
// because otherwise the buffers may get released before the log details are processed which
// would result in corrupted attributes
this.connectionAttributes = rpcCall.getConnectionAttributes();
this.requestAttributes = rpcCall.getRequestAttributes();
}

public RpcCall getRpcCall() {
Expand Down Expand Up @@ -85,11 +94,20 @@ public Message getParam() {
return param;
}

public Map<String, byte[]> getConnectionAttributes() {
return connectionAttributes;
}

public Map<String, byte[]> getRequestAttributes() {
return requestAttributes;
}

@Override
public String toString() {
return new ToStringBuilder(this).append("rpcCall", rpcCall).append("param", param)
.append("clientAddress", clientAddress).append("responseSize", responseSize)
.append("className", className).append("isSlowLog", isSlowLog)
.append("isLargeLog", isLargeLog).toString();
.append("isLargeLog", isLargeLog).append("connectionAttributes", connectionAttributes)
.append("requestAttributes", requestAttributes).toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@
package org.apache.hadoop.hbase.namequeues.impl;

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
Expand All @@ -42,12 +44,14 @@

import org.apache.hbase.thirdparty.com.google.common.collect.EvictingQueue;
import org.apache.hbase.thirdparty.com.google.common.collect.Queues;
import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors;
import org.apache.hbase.thirdparty.com.google.protobuf.Message;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.TooSlowLog;

/**
Expand Down Expand Up @@ -164,7 +168,9 @@ public void consumeEventFromDisruptor(NamedQueuePayload namedQueuePayload) {
.setProcessingTime(processingTime).setQueueTime(qTime)
.setRegionName(slowLogParams != null ? slowLogParams.getRegionName() : StringUtils.EMPTY)
.setResponseSize(responseSize).setBlockBytesScanned(blockBytesScanned)
.setServerClass(className).setStartTime(startTime).setType(type).setUserName(userName);
.setServerClass(className).setStartTime(startTime).setType(type).setUserName(userName)
.addAllRequestAttribute(buildNameBytesPairs(rpcLogDetails.getRequestAttributes()))
.addAllConnectionAttribute(buildNameBytesPairs(rpcLogDetails.getConnectionAttributes()));
if (slowLogParams != null && slowLogParams.getScan() != null) {
slowLogPayloadBuilder.setScan(slowLogParams.getScan());
}
Expand All @@ -177,6 +183,16 @@ public void consumeEventFromDisruptor(NamedQueuePayload namedQueuePayload) {
}
}

private static Collection<HBaseProtos.NameBytesPair>
buildNameBytesPairs(Map<String, byte[]> attributes) {
if (attributes == null) {
return Collections.emptySet();
}
return attributes.entrySet().stream().map(attr -> HBaseProtos.NameBytesPair.newBuilder()
.setName(attr.getKey()).setValue(ByteString.copyFrom(attr.getValue())).build())
.collect(Collectors.toSet());
}

@Override
public boolean clearNamedQueue() {
if (!isOnlineLogProviderEnabled) {
Expand Down
Loading

0 comments on commit f80b2a0

Please sign in to comment.