Skip to content

Commit

Permalink
conflicts
Browse files Browse the repository at this point in the history
add connection and request headers to slowlog payload

client side of slow log attributes. RS UI updates

prefer toStringBinary in attribute deserialization

rsOperationDetails UI fixes

checkstyle

spotless

fix tests
  • Loading branch information
Ray Mattingly committed Sep 15, 2023
1 parent 84ccae3 commit 46a6c1a
Show file tree
Hide file tree
Showing 7 changed files with 231 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase.client;

import java.util.Map;
import java.util.Optional;
import org.apache.commons.lang3.builder.EqualsBuilder;
import org.apache.commons.lang3.builder.HashCodeBuilder;
Expand All @@ -29,6 +30,8 @@
import org.apache.hbase.thirdparty.com.google.gson.JsonObject;
import org.apache.hbase.thirdparty.com.google.gson.JsonSerializer;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;

/**
* Slow/Large Log payload for hbase-client, to be used by Admin API get_slow_responses and
* get_large_responses
Expand All @@ -53,6 +56,18 @@ final public class OnlineLogRecord extends LogEntry {
if (slowLogPayload.getMultiServiceCalls() == 0) {
jsonObj.remove("multiServiceCalls");
}
if (slowLogPayload.getRequestAttributes().isEmpty()) {
jsonObj.remove("requestAttributes");
} else {
jsonObj.add("requestAttributes", gson
.toJsonTree(ProtobufUtil.deserializeAttributes(slowLogPayload.getRequestAttributes())));
}
if (slowLogPayload.getConnectionAttributes().isEmpty()) {
jsonObj.remove("connectionAttributes");
} else {
jsonObj.add("connectionAttributes", gson.toJsonTree(
ProtobufUtil.deserializeAttributes(slowLogPayload.getConnectionAttributes())));
}
if (slowLogPayload.getScan().isPresent()) {
jsonObj.add("scan", gson.toJsonTree(slowLogPayload.getScan().get().toMap()));
} else {
Expand All @@ -79,6 +94,8 @@ final public class OnlineLogRecord extends LogEntry {
private final int multiMutationsCount;
private final int multiServiceCalls;
private final Optional<Scan> scan;
private final Map<String, byte[]> requestAttributes;
private final Map<String, byte[]> connectionAttributes;

public long getStartTime() {
return startTime;
Expand Down Expand Up @@ -152,11 +169,20 @@ public Optional<Scan> getScan() {
return scan;
}

public Map<String, byte[]> getRequestAttributes() {
return requestAttributes;
}

public Map<String, byte[]> getConnectionAttributes() {
return connectionAttributes;
}

OnlineLogRecord(final long startTime, final int processingTime, final int queueTime,
final long responseSize, final long blockBytesScanned, final String clientAddress,
final String serverClass, final String methodName, final String callDetails, final String param,
final String regionName, final String userName, final int multiGetsCount,
final int multiMutationsCount, final int multiServiceCalls, final Scan scan) {
final int multiMutationsCount, final int multiServiceCalls, final Scan scan,
final Map<String, byte[]> requestAttributes, final Map<String, byte[]> connectionAttributes) {
this.startTime = startTime;
this.processingTime = processingTime;
this.queueTime = queueTime;
Expand All @@ -173,6 +199,8 @@ public Optional<Scan> getScan() {
this.multiMutationsCount = multiMutationsCount;
this.multiServiceCalls = multiServiceCalls;
this.scan = Optional.ofNullable(scan);
this.requestAttributes = requestAttributes;
this.connectionAttributes = connectionAttributes;
}

public static class OnlineLogRecordBuilder {
Expand All @@ -192,6 +220,8 @@ public static class OnlineLogRecordBuilder {
private int multiMutationsCount;
private int multiServiceCalls;
private Scan scan = null;
private Map<String, byte[]> requestAttributes;
private Map<String, byte[]> connectionAttributes;

public OnlineLogRecordBuilder setStartTime(long startTime) {
this.startTime = startTime;
Expand Down Expand Up @@ -276,10 +306,22 @@ public OnlineLogRecordBuilder setScan(Scan scan) {
return this;
}

public OnlineLogRecordBuilder setRequestAttributes(Map<String, byte[]> requestAttributes) {
this.requestAttributes = requestAttributes;
return this;
}

public OnlineLogRecordBuilder
setConnectionAttributes(Map<String, byte[]> connectionAttributes) {
this.connectionAttributes = connectionAttributes;
return this;
}

public OnlineLogRecord build() {
return new OnlineLogRecord(startTime, processingTime, queueTime, responseSize,
blockBytesScanned, clientAddress, serverClass, methodName, callDetails, param, regionName,
userName, multiGetsCount, multiMutationsCount, multiServiceCalls, scan);
userName, multiGetsCount, multiMutationsCount, multiServiceCalls, scan, requestAttributes,
connectionAttributes);
}
}

Expand All @@ -304,7 +346,8 @@ public boolean equals(Object o) {
.append(serverClass, that.serverClass).append(methodName, that.methodName)
.append(callDetails, that.callDetails).append(param, that.param)
.append(regionName, that.regionName).append(userName, that.userName).append(scan, that.scan)
.isEquals();
.append(requestAttributes, that.requestAttributes)
.append(connectionAttributes, that.connectionAttributes).isEquals();
}

@Override
Expand All @@ -313,7 +356,7 @@ public int hashCode() {
.append(responseSize).append(blockBytesScanned).append(clientAddress).append(serverClass)
.append(methodName).append(callDetails).append(param).append(regionName).append(userName)
.append(multiGetsCount).append(multiMutationsCount).append(multiServiceCalls).append(scan)
.toHashCode();
.append(requestAttributes).append(connectionAttributes).toHashCode();
}

@Override
Expand All @@ -330,7 +373,9 @@ public String toString() {
.append("methodName", methodName).append("callDetails", callDetails).append("param", param)
.append("regionName", regionName).append("userName", userName)
.append("multiGetsCount", multiGetsCount).append("multiMutationsCount", multiMutationsCount)
.append("multiServiceCalls", multiServiceCalls).append("scan", scan).toString();
.append("multiServiceCalls", multiServiceCalls).append("scan", scan)
.append("requestAttributes", requestAttributes)
.append("connectionAttributes", connectionAttributes).toString();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -2196,6 +2196,25 @@ public static SlowLogParams getSlowLogParams(Message message, boolean slowLogSca
return new SlowLogParams(params);
}

/**
* Convert a list of NameBytesPair to a more readable CSV
*/
public static String convertAttributesToCsv(List<NameBytesPair> attributes) {
if (attributes.isEmpty()) {
return HConstants.EMPTY_STRING;
}
return deserializeAttributes(convertNameBytesPairsToMap(attributes)).entrySet().stream()
.map(entry -> entry.getKey() + " = " + entry.getValue()).collect(Collectors.joining(", "));
}

/**
* Convert a map of byte array attributes to a more readable map of binary string representations
*/
public static Map<String, String> deserializeAttributes(Map<String, byte[]> attributes) {
return attributes.entrySet().stream().collect(
Collectors.toMap(Map.Entry::getKey, entry -> Bytes.toStringBinary(entry.getValue())));
}

/**
* Print out some subset of a MutationProto rather than all of it and its data
* @param proto Protobuf to print out
Expand Down Expand Up @@ -3389,7 +3408,10 @@ private static LogEntry getSlowLogRecord(final TooSlowLog.SlowLogPayload slowLog
.setResponseSize(slowLogPayload.getResponseSize())
.setBlockBytesScanned(slowLogPayload.getBlockBytesScanned())
.setServerClass(slowLogPayload.getServerClass()).setStartTime(slowLogPayload.getStartTime())
.setUserName(slowLogPayload.getUserName());
.setUserName(slowLogPayload.getUserName())
.setRequestAttributes(convertNameBytesPairsToMap(slowLogPayload.getRequestAttributeList()))
.setConnectionAttributes(
convertNameBytesPairsToMap(slowLogPayload.getConnectionAttributeList()));
if (slowLogPayload.hasScan()) {
try {
onlineLogRecord.setScan(ProtobufUtil.toScan(slowLogPayload.getScan()));
Expand All @@ -3400,6 +3422,12 @@ private static LogEntry getSlowLogRecord(final TooSlowLog.SlowLogPayload slowLog
return onlineLogRecord.build();
}

private static Map<String, byte[]>
convertNameBytesPairsToMap(List<NameBytesPair> nameBytesPairs) {
return nameBytesPairs.stream().collect(Collectors.toMap(NameBytesPair::getName,
nameBytesPair -> nameBytesPair.getValue().toByteArray()));
}

/**
* Convert AdminProtos#SlowLogResponses to list of {@link OnlineLogRecord}
* @param logEntry slowlog response protobuf instance
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
*/
package org.apache.hadoop.hbase.client;

import java.util.Collections;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
Expand All @@ -26,6 +29,9 @@
import org.junit.Test;
import org.junit.experimental.categories.Category;

import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSet;

@Category({ ClientTests.class, SmallTests.class })
public class TestOnlineLogRecord {

Expand All @@ -47,10 +53,56 @@ public void itSerializesScan() {
+ " \"maxResultSize\": -1,\n" + " \"families\": {},\n" + " \"caching\": -1,\n"
+ " \"maxVersions\": 1,\n" + " \"timeRange\": [\n" + " 0,\n"
+ " 9223372036854775807\n" + " ]\n" + " }\n" + "}";
OnlineLogRecord o =
new OnlineLogRecord(1, 2, 3, 4, 5, null, null, null, null, null, null, null, 6, 7, 0, scan);
OnlineLogRecord o = new OnlineLogRecord(1, 2, 3, 4, 5, null, null, null, null, null, null, null,
6, 7, 0, scan, Collections.emptyMap(), Collections.emptyMap());
String actualOutput = o.toJsonPrettyPrint();
System.out.println(actualOutput);
Assert.assertEquals(actualOutput, expectedOutput);
}

@Test
public void itSerializesRequestAttributes() {
Map<String, byte[]> requestAttributes = ImmutableMap.<String, byte[]> builder()
.put("r", Bytes.toBytes("1")).put("2", Bytes.toBytes(0.0)).build();
Set<String> expectedOutputs =
ImmutableSet.<String> builder().add("requestAttributes").add("\"r\": \"1\"")
.add("\"2\": \"\\\\x00\\\\x00\\\\x00\\\\x00\\\\x00\\\\x00\\\\x00\\\\x00\"").build();
OnlineLogRecord o = new OnlineLogRecord(1, 2, 3, 4, 5, null, null, null, null, null, null, null,
6, 7, 0, null, requestAttributes, Collections.emptyMap());
String actualOutput = o.toJsonPrettyPrint();
System.out.println(actualOutput);
expectedOutputs.forEach(expected -> Assert.assertTrue(actualOutput.contains(expected)));
}

@Test
public void itOmitsEmptyRequestAttributes() {
OnlineLogRecord o = new OnlineLogRecord(1, 2, 3, 4, 5, null, null, null, null, null, null, null,
6, 7, 0, null, Collections.emptyMap(), Collections.emptyMap());
String actualOutput = o.toJsonPrettyPrint();
System.out.println(actualOutput);
Assert.assertFalse(actualOutput.contains("requestAttributes"));
}

@Test
public void itSerializesConnectionAttributes() {
Map<String, byte[]> connectionAttributes = ImmutableMap.<String, byte[]> builder()
.put("c", Bytes.toBytes("1")).put("2", Bytes.toBytes(0.0)).build();
Set<String> expectedOutputs =
ImmutableSet.<String> builder().add("connectionAttributes").add("\"c\": \"1\"")
.add("\"2\": \"\\\\x00\\\\x00\\\\x00\\\\x00\\\\x00\\\\x00\\\\x00\\\\x00\"").build();
OnlineLogRecord o = new OnlineLogRecord(1, 2, 3, 4, 5, null, null, null, null, null, null, null,
6, 7, 0, null, Collections.emptyMap(), connectionAttributes);
String actualOutput = o.toJsonPrettyPrint();
System.out.println(actualOutput);
expectedOutputs.forEach(expected -> Assert.assertTrue(actualOutput.contains(expected)));
}

@Test
public void itOmitsEmptyConnectionAttributes() {
OnlineLogRecord o = new OnlineLogRecord(1, 2, 3, 4, 5, null, null, null, null, null, null, null,
6, 7, 0, null, Collections.emptyMap(), Collections.emptyMap());
String actualOutput = o.toJsonPrettyPrint();
System.out.println(actualOutput);
Assert.assertFalse(actualOutput.contains("connectionAttributes"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ option java_outer_classname = "TooSlowLog";
option java_generate_equals_and_hash = true;
option optimize_for = SPEED;

import "HBase.proto";
import "client/Client.proto";

message SlowLogPayload {
Expand All @@ -49,6 +50,9 @@ message SlowLogPayload {
optional int64 block_bytes_scanned = 16;
optional Scan scan = 17;

repeated NameBytesPair connection_attribute = 18;
repeated NameBytesPair request_attribute = 19;

// SLOW_LOG is RPC call slow in nature whereas LARGE_LOG is RPC call quite large.
// Majority of times, slow logs are also large logs and hence, ALL is combination of
// both
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@
package org.apache.hadoop.hbase.namequeues.impl;

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
Expand All @@ -42,12 +44,14 @@

import org.apache.hbase.thirdparty.com.google.common.collect.EvictingQueue;
import org.apache.hbase.thirdparty.com.google.common.collect.Queues;
import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors;
import org.apache.hbase.thirdparty.com.google.protobuf.Message;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.TooSlowLog;

/**
Expand Down Expand Up @@ -164,7 +168,9 @@ public void consumeEventFromDisruptor(NamedQueuePayload namedQueuePayload) {
.setProcessingTime(processingTime).setQueueTime(qTime)
.setRegionName(slowLogParams != null ? slowLogParams.getRegionName() : StringUtils.EMPTY)
.setResponseSize(responseSize).setBlockBytesScanned(blockBytesScanned)
.setServerClass(className).setStartTime(startTime).setType(type).setUserName(userName);
.setServerClass(className).setStartTime(startTime).setType(type).setUserName(userName)
.addAllRequestAttribute(buildNameBytesPairs(rpcCall.getRequestAttributes()))
.addAllConnectionAttribute(buildNameBytesPairs(rpcCall.getConnectionAttributes()));
if (slowLogParams != null && slowLogParams.getScan() != null) {
slowLogPayloadBuilder.setScan(slowLogParams.getScan());
}
Expand All @@ -177,6 +183,16 @@ public void consumeEventFromDisruptor(NamedQueuePayload namedQueuePayload) {
}
}

private static Collection<HBaseProtos.NameBytesPair>
buildNameBytesPairs(Map<String, byte[]> attributes) {
if (attributes == null) {
return Collections.emptySet();
}
return attributes.entrySet().stream().map(attr -> HBaseProtos.NameBytesPair.newBuilder()
.setName(attr.getKey()).setValue(ByteString.copyFrom(attr.getValue())).build())
.collect(Collectors.toSet());
}

@Override
public boolean clearNamedQueue() {
if (!isOnlineLogProviderEnabled) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import="org.apache.hadoop.hbase.regionserver.HRegionServer"
import="org.apache.hadoop.hbase.HConstants"
import="org.apache.hadoop.hbase.shaded.protobuf.generated.TooSlowLog"
import="org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil"
import="org.apache.hadoop.hbase.namequeues.NamedQueueRecorder"
import="org.apache.hadoop.hbase.namequeues.RpcLogDetails"
import="org.apache.hadoop.hbase.namequeues.request.NamedQueueGetRequest"
Expand Down Expand Up @@ -108,6 +109,8 @@
<th>MultiService Calls</th>
<th>Call Details</th>
<th>Param</th>
<th>Request Attributes</th>
<th>Connection Attributes</th>
</tr>
<% if (slowLogs != null && !slowLogs.isEmpty()) {%>
<% for (TooSlowLog.SlowLogPayload r : slowLogs) { %>
Expand All @@ -127,6 +130,8 @@
<td><%=r.getMultiServiceCalls()%></td>
<td><%=r.getCallDetails()%></td>
<td><%=r.getParam()%></td>
<td><%=ProtobufUtil.convertAttributesToCsv(r.getRequestAttributeList())%></td>
<td><%=ProtobufUtil.convertAttributesToCsv(r.getConnectionAttributeList())%></td>
</tr>
<% } %>
<% } %>
Expand All @@ -151,6 +156,8 @@
<th>MultiService Calls</th>
<th>Call Details</th>
<th>Param</th>
<th>Request Attributes</th>
<th>Connection Attributes</th>
</tr>
<% if (largeLogs != null && !largeLogs.isEmpty()) {%>
<% for (TooSlowLog.SlowLogPayload r : largeLogs) { %>
Expand All @@ -170,6 +177,8 @@
<td><%=r.getMultiServiceCalls()%></td>
<td><%=r.getCallDetails()%></td>
<td><%=r.getParam()%></td>
<td><%=ProtobufUtil.convertAttributesToCsv(r.getRequestAttributeList())%></td>
<td><%=ProtobufUtil.convertAttributesToCsv(r.getConnectionAttributeList())%></td>
</tr>
<% } %>
<% } %>
Expand Down
Loading

0 comments on commit 46a6c1a

Please sign in to comment.